Kalman Beta 双用途设计方案(v2 — 算法优化版)
Kalman Beta 双用途设计方案(v2 — 算法优化版)
1. 问题背景
当前系统在 Beta 值的使用上存在两个结构性缺陷:
缺陷 1:Beta 估计滞后
OLS BETA_WINDOW=100 固定窗口(≈17天@4h),β 结构性变化后系统需要一周以上才能感知:
T+0h BTC 回暖,β 从 0.5 开始上升
T+12h β 实际 2.0,OLS β̂ 仍 ≈ 0.6 → 误判为均值回归机会
T+24h β 实际 4.0,OLS β̂ ≈ 0.8 → 继续误入场
T+7d β 实际 8.0,OLS β̂ ≈ 2.0 → 协整假设彻底失效
核心问题:系统用滞后 β 计算的 z4h 产生入场信号,实际上反映的是 β 正在变化,而非 spread 正在回归。
缺陷 2:仓位比例未使用 Beta
risk_manager.calculate_position_size() 对两腿使用等额名义价值:
# 当前实现(risk_manager.py:164-170)
alt_size = position_usd / alt_price # 两腿等额
base_size = position_usd / base_price # 与 β 无关
配对交易的对冲比例应为 alt_notional = β × base_notional。等额开仓意味着:
- β=0.5 时,Alt 腿过重 → 暴露于 Alt 的非系统性风险
- β=2.0 时,Alt 腿过轻 → 对冲不足,价差波动被放大
- β 变化时,无法调整比例 → 持仓期间 hedge 持续偏离
缺陷 3:固定 Q 无法适配不同币对(v2 新增)
不同配对的 β 动态特征差异极大:
- MEME 配对(如 PURR/HYPE):β 波动剧烈,需要更大的过程噪声 Q 来追踪
- L1 配对(如 ETH/BTC):β 非常稳定,过大的 Q 会引入不必要的噪声
固定 q_β=1e-4 无法同时满足两类配对的需求,导致要么追踪不足(漏报体制切换),要么过度追踪(误报)。
当前系统弱点汇总
| 组件 | 问题 | 影响 |
|---|---|---|
analysis_core.py |
BETA_WINDOW=100 固定窗口 OLS |
β 估计滞后 ≈17天 |
risk_manager.py |
等额名义价值,未使用 β | 对冲比例错误 |
strategy.py |
adaptive_threshold=3.0 固定 |
β 飙升导致持续突破阈值 |
momentum_filter.py |
只检测单腿价格趋势 | 不检测 spread/beta 体制切换 |
| — | 固定过程噪声 Q | 不同币对无法自适应追踪 |
2. 方案概述
核心设计:向量 Kalman Filter 估计时变 [α, β],其输出 同时服务两个用途:
┌───────────────────────────────────────────┐
│ VectorKalmanBetaEstimator (4h) │
│ 输入: r_btc, r_alt (对数收益率) │
│ 特性: Innovation-based Q 自适应 │
│ 输出: kalman_beta, innovation, P_β │
└──────────────┬────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼
用途 1: 体制检测 用途 2: Hedge Ratio
┌─────────────────┐ ┌─────────────────────┐
│ BOCPD 变点检测 │ │ 仓位比例计算 │
│ 特性: 自适应 H │ │ 输入: kalman_beta │
│ 输出: 变点概率 │ │ 输出: alt/base 数量比 │
└────────┬────────┘ └──────────┬──────────┘
│ │
▼ ▼
入场信号过滤 Beta 加权开仓
(硬拦截/阈值缩放) (alt_notional = β × base_notional)
│
▼
跨配对系统性风险聚合
(加权变点概率 > 阈值 → 全局拦截)
v2 相比 v1 的核心算法升级
| 组件 | v1 | v2 | 提升 |
|---|---|---|---|
| Kalman Q | 固定 q_β=1e-4 |
Innovation-based 自适应 Q | 每配对自动最优追踪速度 |
| Sage-Husa R | 负值时强制设为 R_floor(系统性低估) | 仅正值时更新,否则保持 | 消除 R 低估偏差 |
| BOCPD Hazard | 固定 H=1/50 |
Innovation 方差驱动自适应 H | 变化剧烈时更快检测 |
| 系统性风险 | 简单计数比例 | 加权聚合(P(变点) × 仓位权重) | 减少误报/漏报 |
| 负 β 处理 | abs() 一刀切 |
返回符号信息,策略层方向翻转 | 避免反向配对错误 |
| Sigmoid 缩放 | steepness 无上限 | 上限 20.0 | 防止退化为阶跃函数 |
为什么 4h 频率足够
β 是两个资产之间的结构性关系(同赛道、资金流向、基本面联动),变化周期在天~周级别。
| 方法 | 响应 β 结构性变化 | 抗日内噪声 | 适用场景 |
|---|---|---|---|
| OLS 100×4h | ~7-10 天 | 强 | 当前系统(过于滞后) |
| 4h Kalman (自适应 Q) | 4-16 小时 | 强 | 体制检测 + Hedge Ratio |
| 5m Kalman | ~15-30 分钟 | 弱(追噪声) | 不适用于 β 估计 |
四层架构
| 层 | 组件 | 功能 | v2 改进 |
|---|---|---|---|
| 第一层 | 向量 Kalman Filter | 估计时变 [α, β] | Innovation-based Q 自适应 + R 修复 |
| 第二层 | BOCPD 变点检测 | 监控 innovation 分布变化 → 入场信号过滤 | 自适应 Hazard Rate |
| 第三层 | 跨配对系统性风险聚合 | 统计所有配对体制状态 → 系统性事件拦截 | 加权聚合 |
| 第四层 | Beta 加权仓位计算 | 用 kalman_beta 计算两腿名义比例 |
负 β 方向感知 |
3. 算法选型论证
3.1 为什么选 Kalman Filter(而非其他时变回归方法)
| 方法 | 优势 | 劣势 | 适用性评估 |
|---|---|---|---|
| 向量 Kalman Filter | 最优线性估计(MMSE);递推 O(1);输出 innovation 可直接用于变点检测;P_β 提供不确定性度量 | 假设线性高斯;固定 Q 时不自适应 | 最优选择(本方案用 Q 自适应解决劣势) |
| RLS (递推最小二乘) | 实现简单;forgetting factor 天然自适应 | 不输出 innovation 分布(无法接 BOCPD);无状态不确定性度量 | 不适合(缺少 BOCPD 所需信号) |
| 粒子滤波 | 处理非线性/非高斯 | O(N_particles) 计算量大;参数多;难调试 | 过度设计(β 的线性关系足够) |
| Online Bayesian LR | 完全贝叶斯;自然输出后验 | 不支持时变参数(除非加 forgetting) | 不如 Kalman 灵活 |
| 神经网络 (LSTM/Transformer) | 可捕获非线性 | 需大量训练数据;不可解释;延迟高 | 不适合(生产系统需要可解释性) |
结论:Kalman Filter 是时变线性回归的理论最优解(Kalman 1960),且其 innovation 序列是 BOCPD 的天然输入。配合 Q 自适应后,覆盖了 RLS forgetting factor 的自适应能力。
3.2 为什么选 BOCPD(而非其他变点检测方法)
| 方法 | 优势 | 劣势 | 适用性评估 |
|---|---|---|---|
| BOCPD (Adams & MacKay 2007) | 在线递推;输出变点后验概率(非二元判断);Normal-Inverse-Gamma 共轭高效;已在 momentum_filter 验证 | 固定 hazard 不够灵活;O(max_rl) 内存 | 最优选择(本方案加自适应 H) |
| CUSUM | 极简;延迟低 | 只检测均值偏移;无概率输出;阈值需手调 | 过于简单 |
| PELT | 精确(离线最优) | 离线算法,不支持在线 | 不适用 |
| Conformal CPD (Vovk 2021) | 无分布假设 | 在线版本不成熟;无 run length 信息 | 实验阶段 |
| HMM | 多体制建模 | 体制数需预设;EM 训练需离线 | 适合离线分析(P2 演进) |
| GP-BOCPD (Turner 2009) | 段内非平稳 | O(n²) 计算量 | 过度设计 |
结论:BOCPD 是在线变点检测的标杆算法,且本项目 momentum_filter.py 已使用同一框架(P(trending) 检测),技术栈一致。
3.3 为什么不用统一框架(Regime-Switching SSM)
Regime-Switching State Space Model (Kim 1994) 将 Kalman + 体制检测合二为一:
- 优势:参数更少、理论更优雅
- 劣势:Kim's approximation 有误差累积;实现复杂度高;调试困难;两层概率耦合
当前选择:模块化 Kalman + BOCPD 架构。原因:
- 可独立调试和验证每一层
- BOCPD 已有 momentum_filter 经验
- 出问题时容易定位(是 β 估计问题还是变点检测问题)
- MS-SSM 作为 P2 演进方向保留
4. 详细设计
4.1 向量 Kalman Filter 时变 [α, β] 估计
数学模型
状态方程([α, β] 缓慢演化):
x_t = x_{t-1} + w_t, w_t ~ N(0, Q_t)
其中 x_t = [α_t, β_t]'
Q_t = [[q_α_t, 0 ], 对角阵,α/β 独立演化
[ 0, q_β_t]]
q_β_t 由 Innovation-based 自适应调整(见下文)
观测方程(对数收益率关系):
r_alt_t = H_t × x_t + v_t, v_t ~ N(0, R_t)
其中 H_t = [1, r_btc_t]
r_alt_t = log(alt_t / alt_{t-1})
r_btc_t = log(btc_t / btc_{t-1})
R_t 由 Sage-Husa 自适应调整
递推公式:
预测步:
x̂_t|t-1 = x̂_{t-1}
P_t|t-1 = P_{t-1} + Q_t (2×2)
Innovation:
ε_t = r_alt_t - H_t × x̂_t|t-1 (标量)
S_t = H_t × P_t|t-1 × H_t' + R_t (标量)
Huber Clipping (c=3.0):
ε̃_t = clip(ε_t, -c×√S_t, +c×√S_t)
Kalman 增益:
K_t = P_t|t-1 × H_t' / S_t (2×1)
更新步:
x̂_t = x̂_t|t-1 + K_t × ε̃_t
Joseph 稳定形式:
A = I - K_t × H_t (2×2)
P_t = A × P_t|t-1 × A' + K_t × R_t × K_t' (保证半正定)
Sage-Husa R 自适应 (v2 修复):
d_t = (1 - b) / (1 - b^(t+1)), b ∈ (0,1)
r_sample = ε_t² - H_t × P_t|t-1 × H_t'
若 r_sample > 0: ← v2: 仅正值时更新
R_t = (1 - d_t) × R_{t-1} + d_t × r_sample
R_t = max(R_t, R_floor)
否则:
R_t = R_{t-1} ← v2: 保持不变
Innovation-based Q_β 自适应 (v2 新增):
ν_t = (ε_t / √S_t)² (归一化 innovation 平方)
ν̄_t = (1 - η) × ν̄_{t-1} + η × ν_t (EMA 追踪)
理论基础: 若模型正确,E[ν_t] = 1
若 ν̄_t > γ_upper (如 2.0):
Q_β 过小(追踪不足)→ q_β_t = min(q_β_{t-1} × κ_up, q_β_ceil)
若 ν̄_t < γ_lower (如 0.3):
Q_β 过大(追踪过度)→ q_β_t = max(q_β_{t-1} × κ_down, q_β_floor)
否则:
q_β_t = q_β_{t-1} (保持)
Innovation-based Q 自适应原理
核心洞察(Mehra 1970, Mohamed & Schwarz 1999):
正确指定的 Kalman Filter 的归一化 innovation ε_t/√S_t 应服从 N(0,1),因此 (ε_t/√S_t)² 的期望值为 1。
ν̄_t 状态 |
含义 | 动作 |
|---|---|---|
| ν̄ > γ_upper | Innovation 持续偏大 → Q 过小,滤波器追踪不足 | 增大 Q_β |
| ν̄ < γ_lower | Innovation 持续偏小 → Q 过大,滤波器追踪过度(追噪声) | 减小 Q_β |
| γ_lower ≤ ν̄ ≤ γ_upper | Innovation 符合模型预期 | 保持 Q_β |
自适应效果:
- MEME 配对:β 波动大 → innovation 频繁偏大 → Q_β 自动上调 → 追踪更快
- L1 配对:β 稳定 → innovation 正常 → Q_β 保持或下调 → 避免追噪声
关键输出信号及其用途
| 信号 | 用途 1: 体制检测 | 用途 2: Hedge Ratio |
|---|---|---|
beta (x̂[1]) |
β 趋势监控 | 直接作为 hedge ratio |
alpha (x̂[0]) |
吸收独立漂移,避免污染 β | — |
P_beta (P[1,1]) |
不确定性度量 | hedge ratio 置信度(P_β 过大时降级为 OLS β) |
ε/√S (normalized_innovation) |
BOCPD 输入 | — |
innov_sq_ema (ν̄_t) |
Q 自适应驱动 | — |
参数
| 参数 | 符号 | 默认值 | 含义 | 调优 |
|---|---|---|---|---|
| β 过程噪声初始值 | q_β₀ | 1e-4 | β 每 4h 变化方差先验(由自适应调整) | 初始值,自动调整 |
| α 过程噪声 | q_α | 1e-5 | α 每 4h 变化方差先验(比 β 慢) | 通常不调 |
| R 遗忘因子 | b | 0.95 | Sage-Husa 遗忘因子 | ↑平滑 ↓快适应 |
| R 下限 | R_floor | 1e-8 | 防止 R 退化为零 | — |
| Innovation 截断 | c | 3.0 | Huber 截断(σ 倍数) | ↓鲁棒 ↑灵敏 |
| 初始 [α, β] | — | OLS 估计 | 用现有 OLS 初始化 | — |
| 初始 P | P₀ | diag(0.1, 1.0) | 初始不确定性 | — |
| Q 自适应 EMA 速率 | η | 0.05 | ν̄ 的 EMA 衰减 | ↑快响应 ↓平滑 |
| Q 放大阈值 | γ_upper | 2.0 | ν̄ > 此值时增大 Q_β | — |
| Q 缩小阈值 | γ_lower | 0.3 | ν̄ < 此值时减小 Q_β | — |
| Q 放大因子 | κ_up | 1.05 | Q_β 每步最多放大 5% | — |
| Q 缩小因子 | κ_down | 0.98 | Q_β 每步最多缩小 2% | — |
| Q_β 上限 | q_β_ceil | 1e-2 | 防止 Q 过大 | — |
| Q_β 下限 | q_β_floor | 1e-6 | 防止 Q 过小 | — |
q_β₀ = 1e-4 → 日 β 漂移 ≈ 0.024,周 ≈ 0.065,合理覆盖正常变化。
q_α = q_β/10 → α 变化比 β 慢一个量级,避免争夺解释力。
κ_up > κ_down(放大快、缩小慢):保守策略,宁可追踪快一点也不要漏掉体制变化。
文件:src/config.py
# ═══ Kalman Filter Beta 估计参数 ═══
KALMAN_Q_BETA: float = 1e-4
KALMAN_Q_ALPHA: float = 1e-5
KALMAN_P0_ALPHA: float = 0.1
KALMAN_P0_BETA: float = 1.0
KALMAN_R_FORGET: float = 0.95
KALMAN_R_FLOOR: float = 1e-8
KALMAN_CLIP_SIGMA: float = 3.0
# ═══ Q 自适应参数(v2 新增)═══
KALMAN_Q_ADAPT_ENABLED: bool = True
KALMAN_Q_ADAPT_RATE: float = 0.05 # ν̄ 的 EMA 速率
KALMAN_Q_ADAPT_UPPER: float = 2.0 # ν̄ > 此值 → 增大 Q_β
KALMAN_Q_ADAPT_LOWER: float = 0.3 # ν̄ < 此值 → 减小 Q_β
KALMAN_Q_SCALE_UP: float = 1.05 # Q_β 放大因子
KALMAN_Q_SCALE_DOWN: float = 0.98 # Q_β 缩小因子
KALMAN_Q_BETA_CEIL: float = 1e-2 # Q_β 上限
KALMAN_Q_BETA_FLOOR: float = 1e-6 # Q_β 下限
# ═══ Hedge Ratio 参数 ═══
HEDGE_BETA_MIN: float = 0.1 # β 下限保护(防止极端比例)
HEDGE_BETA_MAX: float = 5.0 # β 上限保护
HEDGE_BETA_P_MAX: float = 0.5 # P_β 超过此值时降级为 OLS β(Kalman 不确定性过高)
文件:src/utils/analysis/analysis_core.py
新增类:VectorKalmanBetaEstimator
import numpy as np
from scipy.special import gammaln # 模块级导入,避免热路径 import
class VectorKalmanBetaEstimator:
"""向量 Kalman Filter 时变 [α, β] 联合估计器
状态空间模型:
状态方程: x_t = x_{t-1} + w_t, w_t ~ N(0, Q_t), x = [α, β]'
观测方程: r_alt_t = [1, r_btc_t] × x_t + v_t, v_t ~ N(0, R_t)
v2 特性:
- 二维状态 [α, β]:截距吸收独立漂移,消除虚假 β 信号
- Joseph 稳定形式:P 更新保证半正定
- Huber innovation clipping:抗厚尾分布
- Sage-Husa 自适应 R:在线校准观测噪声(仅正值更新,修复低估偏差)
- Innovation-based Q_β 自适应:每配对自动最优追踪速度
双用途输出:
- kalman_beta → BOCPD 体制检测(via innovation)+ hedge ratio(直接使用)
- P_beta → hedge ratio 置信度(过大时降级为 OLS β)
每根新 4h K线调用 update() 一次。
"""
def __init__(
self,
alpha_init: float,
beta_init: float,
q_alpha: float = 1e-5,
q_beta: float = 1e-4,
r_init: float = 1e-2,
p0_alpha: float = 0.1,
p0_beta: float = 1.0,
r_forget: float = 0.95,
r_floor: float = 1e-8,
clip_sigma: float = 3.0,
# Q 自适应参数(v2 新增)
q_adapt_enabled: bool = True,
q_adapt_rate: float = 0.05,
q_adapt_upper: float = 2.0,
q_adapt_lower: float = 0.3,
q_scale_up: float = 1.05,
q_scale_down: float = 0.98,
q_beta_ceil: float = 1e-2,
q_beta_floor: float = 1e-6,
):
self.x = np.array([alpha_init, beta_init], dtype=np.float64)
self.Q = np.diag([q_alpha, q_beta]).astype(np.float64)
self.P = np.diag([p0_alpha, p0_beta]).astype(np.float64)
self.R = float(r_init)
self._r_forget = r_forget
self._r_floor = r_floor
self._clip_sigma = clip_sigma
self._n_updates = 0
# Q 自适应状态(v2 新增)
self._q_adapt_enabled = q_adapt_enabled
self._q_adapt_rate = q_adapt_rate
self._q_adapt_upper = q_adapt_upper
self._q_adapt_lower = q_adapt_lower
self._q_scale_up = q_scale_up
self._q_scale_down = q_scale_down
self._q_beta_ceil = q_beta_ceil
self._q_beta_floor = q_beta_floor
self._innov_sq_ema = 1.0 # ν̄: EMA of normalized innovation squared
@property
def alpha(self) -> float:
return float(self.x[0])
@property
def beta(self) -> float:
return float(self.x[1])
@property
def beta_variance(self) -> float:
return float(self.P[1, 1])
@property
def q_beta(self) -> float:
return float(self.Q[1, 1])
@property
def innov_sq_ema(self) -> float:
return self._innov_sq_ema
def update(self, r_btc: float, r_alt: float) -> dict:
"""接收一对新的对数收益率,更新 [α, β] 估计
Returns:
dict with keys: alpha, beta, P_beta, P_alpha,
innovation, normalized_innovation, clipped,
kalman_gain_beta, R, q_beta, innov_sq_ema
"""
# 预测步
P_pred = self.P + self.Q
H = np.array([1.0, r_btc], dtype=np.float64)
# Innovation
innovation = r_alt - H @ self.x
S = float(H @ P_pred @ H) + self.R
S = max(S, 1e-15)
sqrt_S = S ** 0.5
norm_innov = innovation / sqrt_S
# Huber clipping
clipped = False
innovation_for_update = innovation
if abs(norm_innov) > self._clip_sigma:
innovation_for_update = self._clip_sigma * sqrt_S * (
1.0 if innovation > 0 else -1.0
)
clipped = True
# Kalman 增益 + 更新
K = (P_pred @ H) / S
self.x = self.x + K * innovation_for_update
# Joseph 稳定形式
I2 = np.eye(2)
A = I2 - np.outer(K, H)
self.P = A @ P_pred @ A.T + self.R * np.outer(K, K)
# Sage-Husa R 自适应(v2 修复:仅正值更新)
self._n_updates += 1
if self._n_updates > 10:
b = self._r_forget
d_t = (1.0 - b) / (1.0 - b ** (self._n_updates + 1))
r_sample = innovation * innovation - float(H @ P_pred @ H)
if r_sample > 0: # v2: 仅正值时更新,避免系统性低估
self.R = (1.0 - d_t) * self.R + d_t * r_sample
self.R = max(self.R, self._r_floor)
# else: R 保持不变(innovation 与模型一致)
# Innovation-based Q_β 自适应(v2 新增)
if self._q_adapt_enabled and self._n_updates > 20:
nu = norm_innov * norm_innov # ν_t
self._innov_sq_ema = (
(1.0 - self._q_adapt_rate) * self._innov_sq_ema
+ self._q_adapt_rate * nu
)
current_q_beta = self.Q[1, 1]
if self._innov_sq_ema > self._q_adapt_upper:
new_q_beta = min(current_q_beta * self._q_scale_up, self._q_beta_ceil)
elif self._innov_sq_ema < self._q_adapt_lower:
new_q_beta = max(current_q_beta * self._q_scale_down, self._q_beta_floor)
else:
new_q_beta = current_q_beta
self.Q[1, 1] = new_q_beta
return {
'alpha': float(self.x[0]),
'beta': float(self.x[1]),
'P_beta': float(self.P[1, 1]),
'P_alpha': float(self.P[0, 0]),
'innovation': innovation,
'normalized_innovation': norm_innov,
'clipped': clipped,
'kalman_gain_beta': float(K[1]),
'R': self.R,
'q_beta': float(self.Q[1, 1]),
'innov_sq_ema': self._innov_sq_ema,
}
def state_dict(self) -> dict:
return {
'x': self.x.tolist(), 'P': self.P.tolist(), 'Q': self.Q.tolist(),
'R': self.R, '_r_forget': self._r_forget, '_r_floor': self._r_floor,
'_clip_sigma': self._clip_sigma, '_n_updates': self._n_updates,
'_q_adapt_enabled': self._q_adapt_enabled,
'_q_adapt_rate': self._q_adapt_rate,
'_q_adapt_upper': self._q_adapt_upper,
'_q_adapt_lower': self._q_adapt_lower,
'_q_scale_up': self._q_scale_up,
'_q_scale_down': self._q_scale_down,
'_q_beta_ceil': self._q_beta_ceil,
'_q_beta_floor': self._q_beta_floor,
'_innov_sq_ema': self._innov_sq_ema,
}
@classmethod
def from_state_dict(cls, d: dict) -> 'VectorKalmanBetaEstimator':
obj = cls.__new__(cls)
obj.x = np.array(d['x'], dtype=np.float64)
obj.P = np.array(d['P'], dtype=np.float64)
obj.Q = np.array(d['Q'], dtype=np.float64)
obj.R = d['R']
obj._r_forget = d['_r_forget']
obj._r_floor = d['_r_floor']
obj._clip_sigma = d['_clip_sigma']
obj._n_updates = d['_n_updates']
obj._q_adapt_enabled = d.get('_q_adapt_enabled', True)
obj._q_adapt_rate = d.get('_q_adapt_rate', 0.05)
obj._q_adapt_upper = d.get('_q_adapt_upper', 2.0)
obj._q_adapt_lower = d.get('_q_adapt_lower', 0.3)
obj._q_scale_up = d.get('_q_scale_up', 1.05)
obj._q_scale_down = d.get('_q_scale_down', 0.98)
obj._q_beta_ceil = d.get('_q_beta_ceil', 1e-2)
obj._q_beta_floor = d.get('_q_beta_floor', 1e-6)
obj._innov_sq_ema = d.get('_innov_sq_ema', 1.0)
return obj
改动函数:calculate_cointegration_params_dual_window()
在现有 OLS 之后,新增 Kalman Filter 输出:
def calculate_cointegration_params_dual_window(
base_klines, alt_klines,
beta_window=None, zscore_window=None,
kalman_state: dict | None = None, # [新增]
):
# ... 现有 OLS 逻辑不变 ...
# ═══ 新增:向量 Kalman Filter 更新 ═══
kalman_result = None
kalman_state_out = kalman_state
if len(aligned) >= 2:
r_btc = np.log(aligned['base'].iloc[-1] / aligned['base'].iloc[-2])
r_alt = np.log(aligned['alt'].iloc[-1] / aligned['alt'].iloc[-2])
if kalman_state is not None:
kf = VectorKalmanBetaEstimator.from_state_dict(kalman_state)
else:
ols_residual_var = float(np.var(model.resid)) if hasattr(model, 'resid') else 1e-2
kf = VectorKalmanBetaEstimator(
alpha_init=alpha_ols if use_alpha else 0.0,
beta_init=beta_ols,
q_alpha=KALMAN_Q_ALPHA, q_beta=KALMAN_Q_BETA,
r_init=max(ols_residual_var, 1e-6),
p0_alpha=KALMAN_P0_ALPHA, p0_beta=KALMAN_P0_BETA,
r_forget=KALMAN_R_FORGET, r_floor=KALMAN_R_FLOOR,
clip_sigma=KALMAN_CLIP_SIGMA,
q_adapt_enabled=KALMAN_Q_ADAPT_ENABLED,
q_adapt_rate=KALMAN_Q_ADAPT_RATE,
q_adapt_upper=KALMAN_Q_ADAPT_UPPER,
q_adapt_lower=KALMAN_Q_ADAPT_LOWER,
q_scale_up=KALMAN_Q_SCALE_UP,
q_scale_down=KALMAN_Q_SCALE_DOWN,
q_beta_ceil=KALMAN_Q_BETA_CEIL,
q_beta_floor=KALMAN_Q_BETA_FLOOR,
)
kalman_result = kf.update(r_btc, r_alt)
kalman_state_out = kf.state_dict()
return {
# ... 现有字段不变 ...
'kalman_beta': kalman_result['beta'] if kalman_result else beta_ols,
'kalman_alpha': kalman_result['alpha'] if kalman_result else (alpha_ols if use_alpha else 0.0),
'kalman_P_beta': kalman_result['P_beta'] if kalman_result else 1.0,
'kalman_innovation': kalman_result['normalized_innovation'] if kalman_result else 0.0,
'kalman_clipped': kalman_result['clipped'] if kalman_result else False,
'kalman_q_beta': kalman_result['q_beta'] if kalman_result else KALMAN_Q_BETA,
'kalman_innov_sq_ema': kalman_result['innov_sq_ema'] if kalman_result else 1.0,
'kalman_state': kalman_state_out,
}
改动函数:analyze_pair_advanced() / analyze_multi_period()
透传方式与 v1 相同,新增透传 kalman_q_beta 和 kalman_innov_sq_ema 字段。
4.2 BOCPD 贝叶斯在线变点检测(用途 1)
算法原理
维护 run length(距上次变点的步数)的后验分布:
P(r_t | data_1:t)
r_t = 0 → 当前时刻刚发生变点
r_t = n → 距上次变点已过 n 步
递推公式:
1. 预测概率(当前观测在 run length = r 下的似然):
π_t(r) = P(x_t | x_{t-r:t-1})
2. Growth(无变点):
P(r_t = r+1, data) = P(r_{t-1} = r, data) × π_t(r) × (1 - H_t)
3. Changepoint(变点,run 重置):
P(r_t = 0, data) = Σ_r P(r_{t-1} = r, data) × π_t(r) × H_t
4. 归一化
其中 H_t = hazard rate(先验变点概率)。v2 改进:H_t 由 innovation 方差自适应驱动(见下文)。
观测模型:监控 Kalman normalized_innovation 序列,采用 正态-逆Gamma 共轭先验,预测分布为 Student-t:
充分统计量(每个 run 维护):
κ_r = κ₀ + n_r
μ̂_r = (κ₀ × μ₀ + Σ x_i) / κ_r
α_r = α₀ + n_r / 2
β_nig_r = β_nig₀ + 0.5 × (Σ x_i² - κ_r × μ̂_r² + κ₀ × μ₀²)
预测分布: Student-t(2α_r), location=μ̂_r, scale²=β_nig_r(κ_r+1)/(α_r κ_r)
v2 命名改进:充分统计量中的
β改为β_nig(Normal-Inverse-Gamma 的 β 参数),避免与回归系数 β 混淆。
自适应 Hazard Rate(v2 新增)
固定 H=1/50 的问题:β 剧烈波动时检测太慢,β 稳定时假阳性过多。
自适应策略:利用 Kalman 输出的 innov_sq_ema(ν̄_t)调整 H:
H_t = H_base × max(1.0, ν̄_t / ν_ref)
H_t = clip(H_t, H_min, H_max)
H_base = 1/50 ≈ 0.02
ν_ref = 1.5 (开始放大的阈值)
H_min = 1/200 = 0.005
H_max = 1/10 = 0.1
| ν̄_t 状态 | 含义 | H_t | 效果 |
|---|---|---|---|
| ν̄ ≈ 1.0 | innovation 正常 | H_base (0.02) | 标准检测灵敏度 |
| ν̄ ≈ 3.0 | innovation 偏大 | H_base × 2.0 (0.04) | 更容易检测变点 |
| ν̄ ≈ 6.0 | innovation 很大 | H_max (0.10) | 最高灵敏度 |
原理:ν̄_t 大意味着 Kalman 模型不适配(Q 正在调整中或 β 正在快速变化),此时应提高变点先验概率。
文件:src/trading/config.py
StrategyParams 新增字段:
@dataclass(frozen=True)
class StrategyParams:
# ... 现有字段 ...
# Beta 体制自适应过滤器(Vector Kalman + BOCPD)
beta_regime_enabled: bool = True
beta_regime_hazard_rate: float = 1/50 # 基础先验变点概率 ≈ 每50根4h K线(~8天)一次
beta_regime_max_run_length: int = 200 # run length 截断(内存控制)
beta_regime_soft_prob: float = 0.3 # P(变点) > 此值 → 开始缩放阈值
beta_regime_hard_prob: float = 0.7 # P(变点) > 此值 → 硬拦截
beta_regime_scale_max: float = 2.0 # 阈值最大缩放倍数
beta_regime_warmup: int = 5 # 最少 N 根 4h K线才开始判定
# v2 新增
beta_regime_adaptive_hazard: bool = True # 启用自适应 Hazard Rate
beta_regime_hazard_min: float = 1/200 # H 下限
beta_regime_hazard_max: float = 1/10 # H 上限
beta_regime_hazard_innov_ref: float = 1.5 # ν̄ 开始放大 H 的阈值
文件:src/trading/strategy.py
_BetaRegimeState 数据类
@dataclass
class _BetaRegimeState:
"""Beta 体制检测结果"""
regime: str # 'stable' | 'expanding'
changepoint_prob: float # 变点概率 P(r_t < warmup)
expected_run_length: float
kalman_P_beta: float
threshold_scale: float # 阈值缩放因子 (>=1.0)
hard_block: bool
reason: str
_BOCPDDetector 类
class _BOCPDDetector:
"""Bayesian Online Changepoint Detection (Adams & MacKay, 2007)
监控 Kalman normalized_innovation 的分布变化,
通过 run length 后验分布检测 β 的结构性变点。
v2 改进:
- 自适应 Hazard Rate(基于 innovation 方差动态调整)
- 充分统计量命名消歧(β_nig vs 回归 β)
- 截断尾部概率合并(减少归一化偏差)
- scipy.special.gammaln 模块级导入
内存: O(max_run_length) per pair
计算: O(max_run_length) per update
"""
# Normal-Inverse-Gamma 先验超参
_MU0 = 0.0
_KAPPA0 = 1.0
_ALPHA0 = 1.0
_BETA_NIG0 = 0.5 # v2: 改名避免与回归 β 混淆
def __init__(
self,
hazard_rate: float = 1/50,
max_run_length: int = 200,
# v2 自适应 Hazard 参数
adaptive_hazard: bool = True,
hazard_min: float = 1/200,
hazard_max: float = 1/10,
hazard_innov_ref: float = 1.5,
):
self._H_base = hazard_rate
self._max_rl = max_run_length
self._adaptive_hazard = adaptive_hazard
self._hazard_min = hazard_min
self._hazard_max = hazard_max
self._hazard_innov_ref = hazard_innov_ref
self._run_length_probs: dict[PairKey, np.ndarray] = {}
self._suf_stats: dict[PairKey, dict] = {}
self._last_kline_time: dict[PairKey, str] = {}
self._n_updates: dict[PairKey, int] = {}
def _get_hazard(self, innov_sq_ema: float | None) -> float:
"""计算当前 hazard rate(v2 自适应)"""
if not self._adaptive_hazard or innov_sq_ema is None:
return self._H_base
scale = max(1.0, innov_sq_ema / self._hazard_innov_ref)
h = self._H_base * scale
return np.clip(h, self._hazard_min, self._hazard_max)
def update(self, key: PairKey, normalized_innovation: float,
kline_time: str,
innov_sq_ema: float | None = None, # v2: 用于自适应 H
) -> None:
"""每 5m tick 调用,基于 kline_time 去重"""
if normalized_innovation is None:
return
if self._last_kline_time.get(key) == kline_time:
return
self._last_kline_time[key] = kline_time
x = normalized_innovation
H = self._get_hazard(innov_sq_ema) # v2: 自适应 hazard
# 初始化
if key not in self._run_length_probs:
self._run_length_probs[key] = np.array([1.0])
self._suf_stats[key] = {
'kappa': np.array([self._KAPPA0]),
'mu': np.array([self._MU0]),
'alpha': np.array([self._ALPHA0]),
'beta_nig': np.array([self._BETA_NIG0]), # v2: 改名
}
self._n_updates[key] = 0
self._n_updates[key] = self._n_updates.get(key, 0) + 1
probs = self._run_length_probs[key]
ss = self._suf_stats[key]
n_rl = len(probs)
# Step 1: 每个 run length 下的 Student-t 预测概率
kappa, mu, alpha = ss['kappa'], ss['mu'], ss['alpha']
beta_nig = ss['beta_nig'] # v2: 改名
df = 2.0 * alpha
scale = np.sqrt(np.maximum(beta_nig * (kappa + 1.0) / (alpha * kappa), 1e-15))
pred_probs = self._student_t_pdf(x, df, mu, scale)
# Step 2-3: Growth + Changepoint
growth_probs = probs * pred_probs * (1.0 - H)
cp_prob = np.sum(probs * pred_probs * H)
# Step 4: 组合 + 归一化
new_probs = np.empty(n_rl + 1)
new_probs[0] = cp_prob
new_probs[1:] = growth_probs
total = np.sum(new_probs)
if total > 0:
new_probs /= total
else:
new_probs = np.zeros(n_rl + 1)
new_probs[0] = 1.0
# Step 5: 更新充分统计量
new_kappa = np.empty(n_rl + 1)
new_mu = np.empty(n_rl + 1)
new_alpha = np.empty(n_rl + 1)
new_beta_nig = np.empty(n_rl + 1)
new_kappa[0], new_mu[0] = self._KAPPA0, self._MU0
new_alpha[0], new_beta_nig[0] = self._ALPHA0, self._BETA_NIG0
new_kappa[1:] = kappa + 1.0
new_mu[1:] = (kappa * mu + x) / (kappa + 1.0)
new_alpha[1:] = alpha + 0.5
new_beta_nig[1:] = beta_nig + kappa * (x - mu) ** 2 / (2.0 * (kappa + 1.0))
# Step 6: 截断(v2 改进:尾部概率合并到最后一个 run length)
if len(new_probs) > self._max_rl:
tail_mass = np.sum(new_probs[self._max_rl:])
new_probs = new_probs[:self._max_rl]
new_probs[-1] += tail_mass # v2: 合并而非丢弃
new_kappa = new_kappa[:self._max_rl]
new_mu = new_mu[:self._max_rl]
new_alpha = new_alpha[:self._max_rl]
new_beta_nig = new_beta_nig[:self._max_rl]
total = np.sum(new_probs)
if total > 0:
new_probs /= total
self._run_length_probs[key] = new_probs
self._suf_stats[key] = {
'kappa': new_kappa, 'mu': new_mu,
'alpha': new_alpha, 'beta_nig': new_beta_nig,
}
@staticmethod
def _student_t_pdf(x: float, df: np.ndarray, loc: np.ndarray,
scale: np.ndarray) -> np.ndarray:
"""向量化 Student-t PDF(gammaln 已在模块级导入)"""
z = (x - loc) / scale
log_pdf = (
gammaln((df + 1) / 2) - gammaln(df / 2)
- 0.5 * np.log(df * np.pi) - np.log(scale)
- (df + 1) / 2 * np.log(1 + z * z / df)
)
return np.exp(log_pdf)
def check(self, key: PairKey, soft_prob: float, hard_prob: float,
scale_max: float, warmup: int) -> _BetaRegimeState:
"""检测当前 Beta 体制状态
变点概率定义: P(r_t < warmup) — 最近 warmup 步内是否有变点
"""
n = self._n_updates.get(key, 0)
if n < warmup:
return _BetaRegimeState('stable', 0.0, float(n), 0.0, 1.0, False, "数据不足")
probs = self._run_length_probs.get(key)
if probs is None or len(probs) == 0:
return _BetaRegimeState('stable', 0.0, 0.0, 0.0, 1.0, False, "无数据")
cp_prob = float(np.sum(probs[:min(warmup, len(probs))]))
expected_rl = float(np.sum(np.arange(len(probs)) * probs))
if cp_prob >= hard_prob:
return _BetaRegimeState(
'expanding', cp_prob, expected_rl, 0.0, scale_max, True,
f"Beta硬拦截: P(变点)={cp_prob:.3f}>={hard_prob} E[rl]={expected_rl:.1f}"
)
if cp_prob >= soft_prob:
import math
midpoint = (soft_prob + hard_prob) / 2
steepness = 6.0 / max(hard_prob - soft_prob, 0.01)
steepness = min(steepness, 20.0) # v2: 上限防止退化为阶跃函数
t = 1.0 / (1.0 + math.exp(-steepness * (cp_prob - midpoint)))
scale = 1.0 + t * (scale_max - 1.0)
return _BetaRegimeState(
'expanding', cp_prob, expected_rl, 0.0, scale, False,
f"Beta缩放: P(变点)={cp_prob:.3f} scale={scale:.2f} E[rl]={expected_rl:.1f}"
)
return _BetaRegimeState(
'stable', cp_prob, expected_rl, 0.0, 1.0, False,
f"Beta稳定: P(变点)={cp_prob:.3f} E[rl]={expected_rl:.1f}"
)
def cleanup_pair(self, key: PairKey) -> None:
self._run_length_probs.pop(key, None)
self._suf_stats.pop(key, None)
self._last_kline_time.pop(key, None)
self._n_updates.pop(key, None)
4.3 跨配对系统性风险聚合(v2 加权版)
设计原理
v1 使用简单的 expanding 配对计数比例,存在两个问题:
- 忽略配对间差异:8 个同赛道配对 expanding 与 8 个分散配对含义不同
- 无仓位加权:持仓量大的配对风险更高
v2 使用加权变点概率作为系统性风险指标,同时保留计数比例作为辅助判断。
文件:src/trading/strategy.py
class _SystemicRiskAggregator:
"""统计所有配对的 BOCPD 体制状态,
基于加权变点概率判断系统性风险。
v2 改进:
- 加权聚合(支持仓位权重)
- 双条件触发(加权概率 OR 计数比例)
- 输出更详细的诊断信息
"""
def __init__(self, enabled: bool = True):
self._enabled = enabled
def check(
self,
pair_states: dict[PairKey, _BetaRegimeState],
systemic_threshold: float = 0.3,
position_weights: dict[PairKey, float] | None = None,
) -> tuple[bool, str]:
if not self._enabled or not pair_states:
return False, ""
total = len(pair_states)
n_expanding = sum(1 for s in pair_states.values() if s.regime == 'expanding')
ratio = n_expanding / total
# 加权平均变点概率(v2 新增)
if position_weights:
total_weight = sum(position_weights.get(k, 1.0) for k in pair_states)
weighted_cp = sum(
position_weights.get(k, 1.0) * s.changepoint_prob
for k, s in pair_states.items()
) / total_weight
else:
# 无仓位权重时用等权平均
weighted_cp = sum(s.changepoint_prob for s in pair_states.values()) / total
# 双条件触发:计数比例 OR 加权概率
triggered = ratio >= systemic_threshold or weighted_cp >= systemic_threshold * 0.8
if triggered:
return True, (
f"系统性风险: {n_expanding}/{total} ({ratio:.0%}) 配对扩张态, "
f"加权P(变点)={weighted_cp:.3f}"
)
return False, ""
4.4 Beta 加权仓位计算(用途 2,v2 负 β 感知版)
设计原理
标准配对交易的对冲方程:
spread_t = log(P_alt_t) - β × log(P_base_t) - α
要使 spread 对 P_base 的变动无敞口,两腿名义价值须满足:
alt_notional = |β| × base_notional
当前系统使用 alt_notional = base_notional(等额),等价于隐含 β=1,对所有配对都错。
hedge_beta 选择逻辑(v2 改进)
v2 改进:返回 β 的符号信息(beta_negative),供策略层判断方向逻辑。当 β < 0 时两资产反向相关,long alt + long base 才是对冲(而非 long alt + short base)。
def resolve_hedge_beta(
kalman_beta: float | None,
kalman_P_beta: float | None,
ols_beta: float | None,
p_beta_max: float = HEDGE_BETA_P_MAX,
beta_min: float = HEDGE_BETA_MIN,
beta_max: float = HEDGE_BETA_MAX,
) -> tuple[float, str, bool]:
"""选择用于仓位计算的 hedge_beta
优先 Kalman β,不确定性过高时降级为 OLS β,最后兜底 β=1.0
Returns:
(hedge_beta, source, beta_negative)
hedge_beta: 绝对值,用于仓位比例计算
source: 'kalman' | 'ols' | 'default'
beta_negative: True 表示负相关配对,策略层需翻转方向
"""
raw_beta = None
source = 'default'
# 优先使用 Kalman β
if kalman_beta is not None and kalman_P_beta is not None:
if kalman_P_beta <= p_beta_max:
raw_beta = kalman_beta
source = 'kalman'
else:
logger.debug(f"Kalman P_β={kalman_P_beta:.4f} > {p_beta_max},降级为 OLS β")
# 降级使用 OLS β
if raw_beta is None and ols_beta is not None:
raw_beta = ols_beta
source = 'ols'
# 兜底
if raw_beta is None:
return 1.0, 'default', False
beta_negative = raw_beta < 0
beta = np.clip(abs(raw_beta), beta_min, beta_max)
if beta_negative:
logger.warning(f"负 β 检测: raw_β={raw_beta:.4f}({source}),配对为反向相关")
return float(beta), source, beta_negative
文件:src/trading/risk_manager.py
改动 calculate_position_size() 增加 hedge_beta 参数:
def calculate_position_size(
self,
signal: PairTradeSignal,
alt_price: float,
base_price: float = 0.0,
available_balance: float = 0.0,
hedge_beta: float = 1.0, # [新增] hedge ratio
hedge_beta_source: str = '', # [新增] 来源标记 ('kalman'/'ols'/'default')
) -> tuple[float, float]:
"""计算仓位大小(Beta 加权)
Beta 加权逻辑:
base_notional = total_position / (1 + |β|)
alt_notional = |β| × base_notional
总名义价值 = base_notional + alt_notional = total_position(保持不变)
当 β=1.0 时退化为等额(向后兼容)。
"""
# ... 现有 base_usd / 信号强度缩放 / 上限检查 / 余额检查 逻辑不变 ...
# 转换为币种数量(Beta 加权)
alt_size = 0.0
base_size = 0.0
if self._config.pair_mode == "pair" and base_price > 0 and alt_price > 0:
abs_beta = max(abs(hedge_beta), 0.1)
total_position = position_usd * 2 # 两腿总名义(原逻辑中每腿 position_usd)
base_notional = total_position / (1.0 + abs_beta)
alt_notional = abs_beta * base_notional
alt_size = alt_notional / alt_price
base_size = base_notional / base_price
logger.info(
f"仓位计算(β加权): 基础=${base_usd:.0f} × 缩放={scale} | "
f"β={hedge_beta:.3f}({hedge_beta_source}) | "
f"Alt: {alt_size:.2f} × ${alt_price:.4f} ≈ ${alt_notional:.0f} | "
f"Base: {base_size:.2f} × ${base_price:.4f} ≈ ${base_notional:.0f} | "
f"比例 {alt_notional/base_notional:.2f}:1"
)
elif alt_price > 0:
alt_size = position_usd / alt_price
logger.info(
f"仓位计算(单腿): 基础=${base_usd:.0f} × 缩放={scale} = ${position_usd:.0f} | "
f"开仓: {alt_size:.2f} × ${alt_price:.4f}"
)
return alt_size, base_size
β 加权的名义分配数学:
| β 值 | base_notional | alt_notional | 比例 |
|---|---|---|---|
| 0.3 | $154 | $46 | 0.3:1 |
| 0.5 | $133 | $67 | 0.5:1 |
| 1.0 | $100 | $100 | 1.0:1(等额,向后兼容) |
| 2.0 | $67 | $133 | 2.0:1 |
| 3.0 | $50 | $150 | 3.0:1 |
(以每腿 $100 基准,total_position=$200 为例)
文件:src/trading/models.py
PairTradeSignal 新增字段:
@dataclass
class PairTradeSignal:
# ... 现有字段 ...
hedge_beta: float = 1.0 # [新增] 用于仓位比例的 β
hedge_beta_source: str = 'default' # [新增] β 来源 ('kalman'/'ols'/'default')
beta_negative: bool = False # [v2 新增] 负 β 标记
PairPosition 新增字段:
@dataclass
class PairPosition:
# ... 现有字段 ...
entry_hedge_beta: float = 1.0 # [新增] 入场时的 hedge β(用于复盘分析)
4.5 编排层集成
文件:src/trading/orchestrator.py
新增状态:_kalman_states: dict[PairKey, dict]
class TradingOrchestrator:
def __init__(self, ...):
# ... 现有 ...
self._kalman_states: dict[PairKey, dict] = {} # [新增] Kalman 状态持久化
process_analysis() 改动:从 multi_period_result 提取 Kalman 输出,传给 strategy.process_tick() 和 on_entry_signal():
def process_analysis(self, symbol, z4h, multi_period_result, timestamp, ...):
# ... 现有输入验证 ...
# [新增] 提取 Kalman 输出
kalman_beta = multi_period_result.get('kalman_beta')
kalman_P_beta = multi_period_result.get('kalman_P_beta')
kalman_innovation = multi_period_result.get('kalman_innovation')
kalman_innov_sq_ema = multi_period_result.get('kalman_innov_sq_ema') # v2
# [新增] 更新 Kalman 状态缓存
kalman_state_new = multi_period_result.get('kalman_state')
if kalman_state_new is not None:
self._kalman_states[(symbol, base_symbol)] = kalman_state_new
entry_signal, exit_signal = self._strategy.process_tick(
symbol, base_symbol, z4h, timestamp,
kline_time=kline_time, latest_price=price_for_log,
kalman_innovation=kalman_innovation, # [新增] 用途 1
kalman_P_beta=kalman_P_beta, # [新增] 用途 1
kalman_innov_sq_ema=kalman_innov_sq_ema, # [v2] 用于自适应 H
alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv,
)
# 处理入场信号
if entry_signal is not None:
# [新增] 计算 hedge_beta(用途 2)
ols_beta = multi_period_result.get('details', {}).get(
('4h', '60d'), {}).get('cointegration_new', {}).get('beta')
hedge_beta, hedge_source, beta_negative = resolve_hedge_beta(
kalman_beta, kalman_P_beta, ols_beta
)
# v2: 负 β 时记录警告(方向逻辑调整为 P2 后续)
if beta_negative:
logger.warning(
f"负 β 配对 {symbol}|{base_symbol}: β={kalman_beta:.3f}, "
f"当前方向逻辑可能需要翻转,建议人工复核"
)
self.on_entry_signal(
symbol, multi_period_result,
latest_alt_price=price_for_log,
direction=entry_signal.direction,
adaptive_z=entry_signal.adaptive_z,
hedge_beta=hedge_beta, # [新增]
hedge_beta_source=hedge_source, # [新增]
beta_negative=beta_negative, # [v2]
)
on_entry_signal() 改动:传递 hedge_beta 到信号:
def on_entry_signal(
self, symbol, multi_period_result,
latest_alt_price=None, avg_zscore_4h=None,
direction="", signal_strength="medium", adaptive_z=0.0,
l2_snapshot=None,
hedge_beta: float = 1.0, # [新增]
hedge_beta_source: str = 'default', # [新增]
beta_negative: bool = False, # [v2]
) -> bool:
# ...
signal = PairTradeSignal(
# ... 现有字段 ...
hedge_beta=hedge_beta, # [新增]
hedge_beta_source=hedge_beta_source, # [新增]
beta_negative=beta_negative, # [v2]
)
# ...
文件:src/trading/position_manager.py
_open_position_inner() 改动:传递 hedge_beta 到 risk_manager:
def _open_position_inner(self, signal: PairTradeSignal, adaptive_z: float = 0.0):
# ... 现有价格获取 ...
alt_size, base_size = self._risk_manager.calculate_position_size(
signal, alt_price, base_price, available_balance,
hedge_beta=signal.hedge_beta, # [新增]
hedge_beta_source=signal.hedge_beta_source, # [新增]
)
position = PairPosition(
# ... 现有字段 ...
entry_hedge_beta=signal.hedge_beta, # [新增] 记录入场 β
)
4.6 策略层集成(用途 1)
文件:src/trading/strategy.py
__init__:
self._bocpd = _BOCPDDetector(
hazard_rate=default_params.beta_regime_hazard_rate,
max_run_length=default_params.beta_regime_max_run_length,
adaptive_hazard=default_params.beta_regime_adaptive_hazard, # v2
hazard_min=default_params.beta_regime_hazard_min, # v2
hazard_max=default_params.beta_regime_hazard_max, # v2
hazard_innov_ref=default_params.beta_regime_hazard_innov_ref, # v2
)
self._systemic_aggregator = _SystemicRiskAggregator(enabled=True)
self._all_pair_states: dict[PairKey, _BetaRegimeState] = {}
process_tick() 签名新增:
def process_tick(
self, symbol, base_symbol, z4h, timestamp,
kline_time=None, latest_price=None,
kalman_innovation: float | None = None, # [新增] 用途 1
kalman_P_beta: float | None = None, # [新增] 用途 1
kalman_innov_sq_ema: float | None = None, # [v2] 用于自适应 H
alt_ohlcv=None, base_ohlcv=None,
) -> tuple[EntrySignal | None, ExitSignal | None]:
_process_tick_unlocked 新 K 线更新块:
if is_new_candle:
# ... 现有: Welford 更新, EMA 更新 ...
if kalman_innovation is not None and kline_time is not None:
self._bocpd.update(
key, kalman_innovation, str(kline_time),
innov_sq_ema=kalman_innov_sq_ema, # v2: 传递给自适应 H
)
_check_entry() — z4h 过滤之后、方向判断之前:
# ── [新增] Beta 体制检查 ──
if params.beta_regime_enabled:
beta_state = self._bocpd.check(
key,
soft_prob=params.beta_regime_soft_prob,
hard_prob=params.beta_regime_hard_prob,
scale_max=params.beta_regime_scale_max,
warmup=params.beta_regime_warmup,
)
self._all_pair_states[key] = beta_state
if beta_state.hard_block:
logger.info(f"Beta体制硬拦截 | {pair_label} | {beta_state.reason} | "
f"az={adaptive_z:+.4f} z4h={z4h:+.4f}")
return None
is_systemic, systemic_reason = self._systemic_aggregator.check(self._all_pair_states)
if is_systemic:
logger.info(f"系统性风险拦截 | {pair_label} | {systemic_reason}")
return None
threshold_scale = beta_state.threshold_scale
else:
beta_state = None
threshold_scale = 1.0
# ── 方向判断(应用 Beta 缩放)──
threshold = params.adaptive_threshold * threshold_scale
if adaptive_z < -threshold:
direction = 'long'
elif adaptive_z > threshold:
direction = 'short'
else:
if threshold_scale > 1.01:
logger.info(f"Beta体制缩放拦截 | {pair_label} | "
f"az={adaptive_z:+.4f} 有效阈值={threshold:.2f} "
f"(原始={params.adaptive_threshold} ×{threshold_scale:.2f}) | "
f"{beta_state.reason if beta_state else ''}")
return None
cleanup_pair():
self._bocpd.cleanup_pair(key)
self._all_pair_states.pop(key, None)
5. 完整数据流
1. WebSocket 4h K线闭合
↓
2. realtime_kline_service_base 触发分析
↓
3. analyze_multi_period()
├─ calculate_cointegration_params_dual_window(kalman_state=缓存)
│ ├─ OLS 回归 → β_ols, spread, adf_pvalue(现有)
│ └─ [新增] Kalman update → kalman_beta, innovation, P_β, kalman_state
│ └─ [v2] Q_β 自适应 → q_beta 自动调整, innov_sq_ema 输出
├─ 健康监控 → Gate2(现有)
└─ 输出: multi_period_result(含 kalman_* 字段)
↓
4. orchestrator.process_analysis()
├─ 缓存 kalman_state → self._kalman_states[pair_key]
├─ 传递 kalman_innovation, kalman_P_beta, kalman_innov_sq_ema → strategy.process_tick()
│ ├─ [用途 1] BOCPD.update(innovation, innov_sq_ema) → 体制检测
│ │ └─ [v2] 自适应 H: innov_sq_ema 驱动 hazard rate 调整
│ └─ [用途 1] _check_entry() → 硬拦截 / 阈值缩放 / 系统性风险
├─ 若产生 EntrySignal:
│ ├─ [用途 2] resolve_hedge_beta(kalman_beta, P_β, ols_beta) → (hedge_beta, source, beta_negative)
│ └─ on_entry_signal(hedge_beta=...) → PairTradeSignal
↓
5. position_manager.open_position(signal)
├─ risk_manager.calculate_position_size(hedge_beta=signal.hedge_beta)
│ ├─ base_notional = total / (1 + |β|)
│ └─ alt_notional = |β| × base_notional
├─ executor.market_open(alt_size, base_size)
└─ PairPosition(entry_hedge_beta=β)
↓
6. 持久化
├─ pair_positions: 含 entry_hedge_beta
└─ trading_signals: 含 hedge_beta, hedge_beta_source, beta_negative
6. 场景模拟
A:正常市场(β 稳定) — 两个用途均正常
innovation ~ N(0,1): z = [-0.3, 0.8, -0.5, 0.2, ...]
kalman_beta ≈ 0.45, P_β < 0.01
innov_sq_ema ≈ 0.9 → Q_β 保持不变
用途 1: BOCPD P(变点) ≈ 0.02~0.05 → regime=STABLE, scale=1.0 → 正常入场
H_t = H_base (0.02)(自适应 H 不放大)
用途 2: hedge_beta=0.45(kalman) → alt_notional = 0.45 × base_notional
比等额更少的 Alt 暴露,正确反映 β<1 的弱相关性
B:β 开始飙升 — Q 自适应 + 自适应 H 联合响应
T=0h: kalman_beta ≈ 0.5, innov_sq_ema ≈ 1.0 → 正常
T=4h: innovation 开始偏大, innov_sq_ema → 1.8
Q_β 自适应: innov_sq_ema < γ_upper(2.0),Q_β 暂不调整
T=8h: innovation 持续偏大, innov_sq_ema → 2.5
Q_β 自适应: innov_sq_ema > γ_upper → Q_β *= 1.05(开始加速追踪)
自适应 H: innov_sq_ema/1.5 = 1.67 → H_t = 0.02 × 1.67 = 0.033
T=12h: kalman_beta ≈ 1.2, innovation 持续偏正
用途 1: P(变点) ≈ 0.40 > soft_prob(0.3) → EXPANDING, scale≈1.15
用途 2: hedge_beta=1.2 → Alt 腿名义增大(如果入场的话)
T=16h: P(变点) ≈ 0.80 > hard_prob(0.7) → 硬拦截
用途 2: 不会执行(入场被拦截)
对比 v1(固定 Q + 固定 H):
- v1 的 Q 过小时 kalman_beta 追踪慢 → innovation 持续偏大更久 → 拦截延迟
- v2 的 Q 自适应加速追踪 + H 自适应提高灵敏度 → 更早检测到变点
C:β 飙升后企稳 — 恢复后使用新 β
飙升期 P(变点) = 0.85 → 硬拦截
企稳后 innovation 恢复 N(0,1):
innov_sq_ema → 1.0, Q_β 自适应开始缩小 Q_β
H_t 恢复到 H_base (0.02)
→ ~6-8 根正常 K线(24-32h)后 P(变点) < soft_prob → 恢复
恢复后:
用途 1: regime=STABLE → 允许入场
用途 2: kalman_beta 已追踪到新 β(如 1.8),开仓自动使用新比例
不像 OLS 还在用旧 β≈0.6
D:β≈0.3 的弱相关配对 — hedge ratio 纠偏最显著
OLS β ≈ 0.31,Kalman β ≈ 0.29
等额开仓(当前系统):
Alt $100 + Base $100
Alt 的非系统性风险暴露 = $100 - 0.3×$100 = $70(多余)
β 加权开仓:
base_notional = $200 / 1.3 ≈ $154
alt_notional = 0.3 × $154 ≈ $46
→ Alt 仅 $46,正确对冲 Base $154 的 0.3 倍
E:Kalman 冷启动 — 降级策略
系统重启,kalman_state 丢失:
T=0: 用 OLS β 初始化 Kalman,P₀ = diag(0.1, 1.0)
P_β = 1.0 > HEDGE_BETA_P_MAX(0.5)
→ resolve_hedge_beta 降级为 OLS β
T=3-5 根 K线(12-20h):
Kalman 收敛,P_β < 0.1
→ resolve_hedge_beta 切回 Kalman β
Q_β 自适应也完成校准
F:系统性风险(v2 加权版)
20 配对中 8 个 P(变点) > 0.3 → 40% > systemic_threshold=30%
加权P(变点) = 0.35 > systemic_threshold × 0.8 = 0.24 → 也触发
→ 系统性风险拦截 → 所有配对暂停入场
→ hedge_beta 仍在后台更新,恢复后使用最新值
G:MEME vs L1 配对 Q 自适应对比(v2 新增场景)
MEME 配对 (PURR/HYPE):
β 波动大 → innovation 频繁偏大 → innov_sq_ema ≈ 3.0
→ Q_β 自动从 1e-4 上调到 ~5e-4
→ Kalman 追踪更快:8-12h 响应 β 变化
→ 自适应 H = 0.02 × 2.0 = 0.04:变点检测更灵敏
L1 配对 (ETH/BTC):
β 极稳定 → innovation 正常 → innov_sq_ema ≈ 0.5
→ Q_β 自动从 1e-4 下调到 ~3e-5
→ Kalman 更平滑:不追日内噪声
→ 自适应 H 保持 0.02:不过度检测
7. 改动文件清单
| 文件 | 改动 | 影响范围 |
|---|---|---|
src/config.py |
+7 Kalman 常量 + 8 Q 自适应常量(v2) + 3 Hedge Ratio 常量 | 配置层 |
src/utils/analysis/analysis_core.py |
+VectorKalmanBetaEstimator 类(含 Q 自适应 v2);增强 calculate_cointegration_params_dual_window(), analyze_pair_advanced(), analyze_multi_period() |
分析层 |
src/trading/config.py |
StrategyParams +7 字段 + 3 自适应 H 字段(v2);增强 get_strategy_params(), _build_strategy_params(), load_trading_config() |
配置层 |
src/trading/models.py |
PairTradeSignal +3 字段 (hedge_beta, hedge_beta_source, beta_negative);PairPosition +1 字段 (entry_hedge_beta) |
数据模型 |
src/trading/orchestrator.py |
+_kalman_states 字典;+resolve_hedge_beta() 函数(v2 返回 beta_negative);增强 process_analysis() 提取 Kalman 输出 + 计算 hedge_beta;增强 on_entry_signal() 传递 hedge_beta |
编排层 |
src/trading/strategy.py |
+_BetaRegimeState, _BOCPDDetector(v2 自适应 H + 命名消歧 + 截断改进), _SystemicRiskAggregator(v2 加权);增强 process_tick(), _check_entry(), cleanup_pair() |
策略层 |
src/trading/risk_manager.py |
增强 calculate_position_size() 支持 β 加权 |
风控层 |
src/trading/position_manager.py |
增强 _open_position_inner() 传递 hedge_beta |
仓位管理 |
新增依赖:scipy.special.gammaln(scipy 已在项目中,模块级导入)
不改动:momentum_filter.py, executor.py
8. DB 改动
pair_positions 表新增列
ALTER TABLE pair_positions ADD COLUMN entry_hedge_beta DOUBLE PRECISION DEFAULT 1.0;
trading_signals 表新增列
ALTER TABLE trading_signals ADD COLUMN hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta_source VARCHAR(10) DEFAULT 'default';
ALTER TABLE trading_signals ADD COLUMN beta_negative BOOLEAN DEFAULT FALSE; -- v2 新增
9. 日志
| 时机 | 级别 | 格式 |
|---|---|---|
| 初始化 | INFO | Beta体制跟踪器初始化 | BOCPD hazard=0.020 adaptive=True soft=0.30 hard=0.70 |
| Kalman 更新 | DEBUG | Kalman更新 | PURR|HYPE | α̂=0.002 β̂=1.20 P_β=0.005 innov=2.85 q_β=1.2e-4 ν̄=1.8 |
| Q 自适应调整 | DEBUG | Kalman Q自适应 | PURR|HYPE | ν̄=2.50>γ_upper(2.0) q_β: 1.0e-4→1.05e-4 |
| hedge_beta 选择 | DEBUG | hedge_beta=0.45(kalman) P_β=0.003 | OLS_β=0.48 | negative=False |
| hedge_beta 降级 | DEBUG | Kalman P_β=0.62 > 0.50,降级为 OLS β=0.48 |
| 负 β 警告 | WARNING | 负β检测: PURR|HYPE raw_β=-0.30(kalman),配对为反向相关 |
| β 加权仓位 | INFO | 仓位计算(β加权) | β=0.45(kalman) | Alt ≈$62 | Base ≈$138 | 比例 0.45:1 |
| 硬拦截 | INFO | Beta体制硬拦截 | PURR|HYPE | P(变点)=0.850>=0.70 E[rl]=1.2 |
| 缩放拦截 | INFO | Beta体制缩放拦截 | PURR|HYPE | az=-4.50 有效阈值=4.00 (3.0×1.33) |
| 系统性风险 | INFO | 系统性风险: 8/20 (40%) 配对扩张态, 加权P(变点)=0.350 |
| 自适应 H | DEBUG | 自适应H | PURR|HYPE | ν̄=3.0 H: 0.020→0.040 |
10. 风险与边界条件
| 风险 | 严重度 | 缓解 |
|---|---|---|
| 已持仓 hedge ratio 偏离 | 高 | 本次仅在入场时计算 β 比例;持仓期间 rebalance 作为 P0 后续 |
| β 符号翻转(kalman_beta < 0) | 高 | v2: resolve_hedge_beta 返回 beta_negative 标记 + 警告日志;方向翻转逻辑作为 P1 后续 |
| Q 自适应振荡 | 中 | κ_up/κ_down 不对称(放大快缩小慢)+ Q_β 上下限保护 + EMA 平滑 |
| 自适应 H 过于灵敏 | 中 | H_max=0.1 上限保护;innov_ref=1.5 设定合理启动点 |
| Kalman Q 初始值不适配 | 低 | Q 自适应会在 20 步(~3.3天)内校准到合理范围 |
| 重启丢失 Kalman 状态 | 中 | OLS 重新初始化,3-5 根 K线收敛;P_β 过大时自动降级为 OLS β |
| BOCPD 冷启动 | 中 | warmup=5(20h)内保持 stable 默认 |
| 低波动期 r_btc≈0 | 低 | H=[1,0] 只更新 α,符合物理意义;hedge_beta 保持上一值 |
| β 加权导致极端仓位 | 低 | HEDGE_BETA_MIN=0.1, HEDGE_BETA_MAX=5.0 上下限保护 |
| Sage-Husa R 偏差 | 低 | v2 修复:仅正值更新,消除系统性低估 |
| BOCPD 截断偏差 | 低 | v2 改进:尾部概率合并到最后一个 run length,而非丢弃 |
不在本次范围:退场逻辑调整、持仓期间 hedge rebalance、HMM 体制分类、Kalman 持久化到 DB、飞书告警、负 β 方向翻转逻辑
11. 验证方案
单元测试
# test_vector_kalman.py
import numpy as np
def test_kalman_convergence():
"""从初始值收敛到真实 [α, β]"""
from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(200):
r_btc = rng.normal(0, 0.02)
r_alt = 0.001 + 1.0 * r_btc + rng.normal(0, 0.01)
result = kf.update(r_btc, r_alt)
assert abs(result['beta'] - 1.0) < 0.15
assert result['alpha'] > 0
def test_kalman_innovation_spike():
"""β 突变时 innovation 立即飙升"""
from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
result = kf.update(0.02, 3.0 * 0.02)
assert abs(result['normalized_innovation']) > 2.0
def test_kalman_huber_clipping():
"""极端 innovation 被截断,β 不过度跳变"""
from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2, clip_sigma=3.0)
rng = np.random.default_rng(42)
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
result = kf.update(0.02, 20.0 * 0.02)
assert result['clipped'] is True
assert result['beta'] < 5.0
def test_kalman_joseph_positive():
"""Joseph 形式保证 P 始终非负"""
from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-6)
rng = np.random.default_rng(42)
for _ in range(500):
result = kf.update(rng.normal(0, 0.05), 0.5 * rng.normal(0, 0.05) + rng.normal(0, 0.001))
assert result['P_beta'] >= 0 and result['P_alpha'] >= 0
def test_kalman_alpha_absorbs_drift():
"""α 吸收独立漂移,β 不被污染"""
from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(200):
result = kf.update(rng.normal(0, 0.02), 0.005 + 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
assert abs(result['beta'] - 0.5) < 0.2
assert result['alpha'] > 0.001
def test_kalman_state_persistence():
"""状态导出/恢复后行为一致"""
from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
kf1 = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2)
kf1.update(0.02, 0.01)
kf2 = VectorKalmanBetaEstimator.from_state_dict(kf1.state_dict())
r1, r2 = kf1.update(0.03, 0.015), kf2.update(0.03, 0.015)
assert abs(r1['beta'] - r2['beta']) < 1e-10
def test_kalman_q_adapt_increase():
"""v2: innovation 持续偏大时 Q_β 自动增大"""
from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
kf = VectorKalmanBetaEstimator(
alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2,
q_adapt_enabled=True, q_adapt_upper=2.0,
)
rng = np.random.default_rng(42)
# 先稳定
for _ in range(30):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
q_before = kf.q_beta
# 制造大 innovation(β 突变)
for _ in range(30):
kf.update(rng.normal(0, 0.02), 2.0 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
q_after = kf.q_beta
assert q_after > q_before, f"Q_β should increase: {q_before} → {q_after}"
def test_kalman_q_adapt_decrease():
"""v2: innovation 持续偏小时 Q_β 自动减小"""
from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
kf = VectorKalmanBetaEstimator(
alpha_init=0.0, beta_init=0.5, q_beta=1e-3, r_init=1e-2,
q_adapt_enabled=True, q_adapt_lower=0.3,
)
rng = np.random.default_rng(42)
# 先稳定,用大 Q
for _ in range(30):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
q_before = kf.q_beta
# 长期稳定(innovation 偏小因为 Q 过大)
for _ in range(100):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
q_after = kf.q_beta
assert q_after < q_before, f"Q_β should decrease: {q_before} → {q_after}"
def test_kalman_q_adapt_bounds():
"""v2: Q_β 不超出上下限"""
from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
kf = VectorKalmanBetaEstimator(
alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2,
q_adapt_enabled=True, q_beta_ceil=1e-2, q_beta_floor=1e-6,
)
rng = np.random.default_rng(42)
for _ in range(500):
kf.update(rng.normal(0, 0.02), 5.0 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
assert kf.q_beta <= 1e-2
assert kf.q_beta >= 1e-6
def test_kalman_sage_husa_no_negative_bias():
"""v2: Sage-Husa R 不会被系统性低估"""
from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
true_R = 1e-3
kf = VectorKalmanBetaEstimator(
alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=true_R,
q_adapt_enabled=False,
)
rng = np.random.default_rng(42)
for _ in range(200):
r_btc = rng.normal(0, 0.02)
r_alt = 0.5 * r_btc + rng.normal(0, true_R ** 0.5)
kf.update(r_btc, r_alt)
# R 应该不会远低于 true_R
assert kf.R > true_R * 0.1, f"R={kf.R} should not be much below true_R={true_R}"
# test_hedge_beta.py
import numpy as np
def test_resolve_hedge_beta_kalman():
"""Kalman β 优先,P_β 合格时使用"""
from src.trading.orchestrator import resolve_hedge_beta
beta, source, negative = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.01, ols_beta=0.5)
assert source == 'kalman'
assert abs(beta - 0.45) < 0.01
assert negative is False
def test_resolve_hedge_beta_fallback_ols():
"""P_β 过大时降级为 OLS"""
from src.trading.orchestrator import resolve_hedge_beta
beta, source, negative = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.8, ols_beta=0.5)
assert source == 'ols'
assert abs(beta - 0.5) < 0.01
def test_resolve_hedge_beta_fallback_default():
"""无 β 时兜底 1.0"""
from src.trading.orchestrator import resolve_hedge_beta
beta, source, negative = resolve_hedge_beta(kalman_beta=None, kalman_P_beta=None, ols_beta=None)
assert source == 'default'
assert beta == 1.0
assert negative is False
def test_resolve_hedge_beta_clipping():
"""β 超限被裁剪"""
from src.trading.orchestrator import resolve_hedge_beta
beta, _, _ = resolve_hedge_beta(kalman_beta=10.0, kalman_P_beta=0.01, ols_beta=0.5)
assert beta == 5.0 # HEDGE_BETA_MAX
beta, _, _ = resolve_hedge_beta(kalman_beta=0.01, kalman_P_beta=0.01, ols_beta=0.5)
assert beta == 0.1 # HEDGE_BETA_MIN
def test_resolve_hedge_beta_negative():
"""v2: 负 β 返回 beta_negative=True"""
from src.trading.orchestrator import resolve_hedge_beta
beta, source, negative = resolve_hedge_beta(kalman_beta=-0.8, kalman_P_beta=0.01, ols_beta=0.5)
assert beta == 0.8 # abs(-0.8)
assert source == 'kalman'
assert negative is True
def test_position_size_beta_weighted():
"""β 加权仓位:alt_notional = β × base_notional"""
abs_beta = 0.5
total = 200.0
base_notional = total / (1.0 + abs_beta)
alt_notional = abs_beta * base_notional
assert abs(base_notional - 133.33) < 0.5
assert abs(alt_notional - 66.67) < 0.5
assert abs(base_notional + alt_notional - total) < 0.01
def test_position_size_beta_one():
"""β=1.0 退化为等额(向后兼容)"""
abs_beta = 1.0
total = 200.0
base_notional = total / (1.0 + abs_beta)
alt_notional = abs_beta * base_notional
assert abs(base_notional - 100.0) < 0.01
assert abs(alt_notional - 100.0) < 0.01
# test_bocpd.py
import numpy as np
def test_bocpd_stable():
from src.trading.strategy import _BOCPDDetector
d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200, adaptive_hazard=False)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
rng = np.random.default_rng(42)
for i in range(20):
d.update(key, rng.normal(0, 1), f"2024-01-01T{i*4:02d}:00:00")
s = d.check(key, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
assert s.regime == 'stable' and s.changepoint_prob < 0.3
def test_bocpd_detects_shift():
from src.trading.strategy import _BOCPDDetector
d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200, adaptive_hazard=False)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
rng = np.random.default_rng(42)
for i in range(10):
d.update(key, rng.normal(0, 1), f"2024-01-01T{i*4:02d}:00:00")
for i in range(10, 18):
d.update(key, rng.normal(3.0, 1), f"2024-01-02T{(i-10)*4:02d}:00:00")
s = d.check(key, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
assert s.regime == 'expanding' and s.changepoint_prob > 0.3
def test_bocpd_hard_block():
from src.trading.strategy import _BOCPDDetector
d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200, adaptive_hazard=False)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
rng = np.random.default_rng(42)
for i in range(10):
d.update(key, rng.normal(0, 1), f"2024-01-01T{i*4:02d}:00:00")
for i in range(10, 20):
d.update(key, rng.normal(5.0, 0.5), f"2024-01-02T{(i-10)*4:02d}:00:00")
s = d.check(key, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
assert s.hard_block and s.changepoint_prob > 0.7
def test_bocpd_recovery():
from src.trading.strategy import _BOCPDDetector
d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200, adaptive_hazard=False)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
rng = np.random.default_rng(42)
for i in range(10):
d.update(key, rng.normal(0, 1), f"2024-01-01T{i*4:02d}:00:00")
for i in range(10):
d.update(key, rng.normal(4.0, 1), f"2024-01-02T{i*4:02d}:00:00")
assert d.check(key, 0.3, 0.7, 2.0, 5).regime == 'expanding'
for i in range(15):
d.update(key, rng.normal(0, 1), f"2024-01-03T{i*4:02d}:00:00")
s = d.check(key, 0.3, 0.7, 2.0, 5)
assert s.regime == 'stable' and s.changepoint_prob < 0.3
def test_bocpd_timestamp_dedup():
from src.trading.strategy import _BOCPDDetector
d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
for _ in range(48):
d.update(key, 5.0, "2024-01-01T00:00:00")
assert d.check(key, 0.3, 0.7, 2.0, 5).reason == "数据不足"
def test_bocpd_adaptive_hazard():
"""v2: innov_sq_ema 大时 hazard rate 增大,变点检测更快"""
from src.trading.strategy import _BOCPDDetector
d_fixed = _BOCPDDetector(hazard_rate=1/50, max_run_length=200, adaptive_hazard=False)
d_adapt = _BOCPDDetector(hazard_rate=1/50, max_run_length=200, adaptive_hazard=True)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
rng = np.random.default_rng(42)
# 稳定期
for i in range(10):
x = rng.normal(0, 1)
d_fixed.update(key, x, f"2024-01-01T{i*4:02d}:00:00")
d_adapt.update(key, x, f"2024-01-01T{i*4:02d}:00:00", innov_sq_ema=1.0)
# 变化期(高 innov_sq_ema)
for i in range(10, 16):
x = rng.normal(3.0, 1)
d_fixed.update(key, x, f"2024-01-02T{(i-10)*4:02d}:00:00")
d_adapt.update(key, x, f"2024-01-02T{(i-10)*4:02d}:00:00", innov_sq_ema=4.0)
s_fixed = d_fixed.check(key, 0.3, 0.7, 2.0, 5)
s_adapt = d_adapt.check(key, 0.3, 0.7, 2.0, 5)
# 自适应版本应该有更高的变点概率
assert s_adapt.changepoint_prob >= s_fixed.changepoint_prob
# test_systemic_risk.py
def test_systemic_risk_trigger_weighted():
"""v2: 加权聚合"""
from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
agg = _SystemicRiskAggregator(enabled=True)
states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
'expanding' if i < 4 else 'stable', 0.5 if i < 4 else 0.1,
10.0, 0.01, 1.5 if i < 4 else 1.0, False, ""
) for i in range(10)}
triggered, reason = agg.check(states, 0.3)
assert triggered # 4/10 = 40% > 30%
assert "加权P(变点)" in reason
def test_systemic_risk_no_trigger():
from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
agg = _SystemicRiskAggregator(enabled=True)
states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
'expanding' if i < 2 else 'stable', 0.5 if i < 2 else 0.1,
10.0, 0.01, 1.5 if i < 2 else 1.0, False, ""
) for i in range(10)}
assert not agg.check(states, 0.3)[0] # 2/10 = 20% < 30%
def test_systemic_risk_position_weighted():
"""v2: 仓位加权 — 大仓位配对的变点更重要"""
from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
agg = _SystemicRiskAggregator(enabled=True)
# 只有 2/10 配对 expanding,但它们仓位很大
states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
'expanding' if i < 2 else 'stable', 0.8 if i < 2 else 0.05,
10.0, 0.01, 1.5 if i < 2 else 1.0, False, ""
) for i in range(10)}
weights = {(f"ALT{i}", "BTC"): (10.0 if i < 2 else 1.0) for i in range(10)}
# 加权概率 = (2 × 10.0 × 0.8 + 8 × 1.0 × 0.05) / (20 + 8) = 16.4/28 ≈ 0.586
triggered, _ = agg.check(states, 0.3, position_weights=weights)
assert triggered # 加权概率 > 0.24 (0.3 × 0.8)
集成验证
- 回测:在 β 飙升的历史区间回测,验证检测延迟和假阳性率;对比等额 vs β 加权的 PnL 差异;对比 v1(固定 Q/H)vs v2(自适应)的检测延迟
- 实盘观察:监控 kalman_beta vs OLS β 的偏离度、hedge_beta_source 分布、P_β 走势、Q_β 自适应轨迹(不同配对应收敛到不同值)、innov_sq_ema 分布
- A/B 对比(可选):同时运行等额和 β 加权,对比 hedge 效果和回撤
12. 后续演进
| 优先级 | 方向 | 预期收益 | 依赖 |
|---|---|---|---|
| P0 | 退场保护(β 飙升时收紧止损) | 减少已持仓亏损 | 本方案 |
| P0 | 持仓期间 hedge rebalance(β 偏离阈值时调整两腿比例) | 维持对冲质量 | 本方案 |
| P1 | Kalman 状态持久化到 DB | 消除 20h 冷启动窗口 | 本方案 |
| P1 | 负 β 方向翻转逻辑 | 正确处理反向相关配对 | 本方案 v2 beta_negative |
| P1 | 均值回归状态转移 x_t = Φx_{t-1} + (I-Φ)μ + w_t |
P_β 有稳态解,长期更稳定 | 本方案 |
| P2 | HMM 体制分类(多体制缩放) | 更精细的体制控制 | 本方案 |
| P2 | 系统性风险分级(警告/严重/紧急) | 更精细的全局风控 | 本方案 |
| P2 | MS-SSM 统一框架(Regime-Switching SSM) | 理论最优,参数更少 | 替换本方案架构 |
| P3 | 飞书告警 | 体制切换和 hedge_beta 变化时人工监控 | 本方案 |
| P3 | 多频率融合(4h + daily) | 更鲁棒的 β 估计 | 本方案 |