Kalman Beta 双用途设计方案(v3.1 — IMM + OU 架构版)
Kalman Beta 双用途设计方案(v3.1 — IMM + OU 架构版)
1. 问题背景
当前系统在 Beta 值的使用上存在多个结构性缺陷:
缺陷 1:Beta 估计滞后
OLS BETA_WINDOW=100 固定窗口(≈17天@4h),β 结构性变化后系统需要一周以上才能感知:
T+0h BTC 回暖,β 从 0.5 开始上升
T+12h β 实际 2.0,OLS β̂ 仍 ≈ 0.6 → 误判为均值回归机会
T+24h β 实际 4.0,OLS β̂ ≈ 0.8 → 继续误入场
T+7d β 实际 8.0,OLS β̂ ≈ 2.0 → 协整假设彻底失效
核心问题:系统用滞后 β 计算的 z4h 产生入场信号,实际上反映的是 β 正在变化,而非 spread 正在回归。
缺陷 2:仓位比例未使用 Beta
risk_manager.calculate_position_size() 对两腿使用等额名义价值:
# 当前实现(risk_manager.py:164-170)
alt_size = position_usd / alt_price # 两腿等额
base_size = position_usd / base_price # 与 β 无关
配对交易的对冲比例应为 alt_notional = β × base_notional。等额开仓意味着:
- β=0.5 时,Alt 腿过重 → 暴露于 Alt 的非系统性风险
- β=2.0 时,Alt 腿过轻 → 对冲不足,价差波动被放大
- β 变化时,无法调整比例 → 持仓期间 hedge 持续偏离
缺陷 3:固定 Q 无法适配不同币对
不同配对的 β 动态特征差异极大:
- MEME 配对(如 PURR/HYPE):β 波动剧烈,需要更大的过程噪声 Q 来追踪
- L1 配对(如 ETH/BTC):β 非常稳定,过大的 Q 会引入不必要的噪声
固定 q_β=1e-4 无法同时满足两类配对的需求。
缺陷 4:v2 的 Q 自适应与 BOCPD 信号竞争(v3 新增)
v2 方案中存在一个未被识别的设计矛盾:
Q 自适应的目标: 使 normalized innovation → N(0,1)
BOCPD 的输入: normalized innovation 的分布变化
两者对抗: Q 自适应越好 → innovation 越稳定 → BOCPD 越难检测变点
具体表现:
β 开始飙升
→ innovation 变大
→ Q 自适应增大 Q_β(追踪变化)
→ innovation 被 Q 调整"吸收",逐步回到 N(0,1)
→ BOCPD 看到的变点信号被削弱
→ 检测延迟甚至漏报
v2 通过 κ_up=1.05(每步仅放大 5%)故意放慢 Q 自适应来缓解,但这意味着 Q 自适应的效果也被人为削弱——两个组件互相妥协,都无法达到最优。
缺陷 5:v2 的 Q 自适应依赖启发式超参数(v3 新增)
v2 的 Innovation-based Q 自适应(Mehra 1970)本质是带阈值的乘法调节器:
ν̄ > γ_upper(2.0) → Q_β *= κ_up(1.05)
ν̄ < γ_lower(0.3) → Q_β *= κ_down(0.98)
8 个超参数(η, γ_upper, γ_lower, κ_up, κ_down, q_ceil, q_floor, q₀)全部缺乏理论依据,且存在交互效应。加上 BOCPD 的 10 个参数,v2 共有 28 个需要手调的参数。
缺陷 6:高斯似然对加密货币重尾分布失效(v3 新增)
v2 的 Kalman Filter 及 BOCPD 均假设 innovation 服从高斯分布。但加密货币收益率是典型的重尾分布(kurtosis 通常 5-20):
高斯分布下 5σ 事件概率: ≈ 3e-7
Student-t(ν=5) 下 5σ 事件概率: ≈ 1e-3
差距: ~4 个数量级
影响:
- 模型概率对异常值过度敏感:一个重尾正常值在高斯似然下被判为"极不可能",导致模型概率剧烈跳变
- Huber clipping 只保护了状态更新,未保护模型概率更新(似然仍用原始 innovation)
- 模型概率频繁震荡 → regime_score 不稳定 → 误拦截或漏拦截
解决方案:将似然函数替换为 Student-t 分布(Roth et al. 2013; Agamennoni et al. 2012),仅改似然计算,Kalman 更新步骤不变。
缺陷 7:随机游走状态方程导致协方差无界增长(v3.1 新增)
v2/v3 原方案使用纯随机游走状态方程 x_t = x_{t-1} + w_t,导致:
P_β 的递推:
P_pred = P + Q
P_post = (1 - KH) × P_pred
当 r_btc ≈ 0(BTC 横盘)时:
H = [1, ~0] → β 不可观测
K_β ≈ 0 → P_β 每步增长 Q_β
→ P_β 线性增长,永无稳态
影响:
- 长期运行后 P_β 漂移:β 估计的不确定性无稳态解,取决于何时启动
- 低波动期 P_β 膨胀:BTC 横盘数天后 P_β 可能超过降级阈值,导致不必要地回退到 OLS
- β 估计可漂移至任意值:无均值回归约束,纯随机游走允许 β 无界漂移
解决方案:引入 OU(Ornstein-Uhlenbeck)均值回归状态方程,仅新增 1 个参数 Φ_β,提供 P_β 稳态解 P_∞ = Q/(1-Φ²)。
当前系统弱点汇总
| 组件 | 问题 | 影响 |
|---|---|---|
analysis_core.py |
BETA_WINDOW=100 固定窗口 OLS |
β 估计滞后 ≈17天 |
risk_manager.py |
等额名义价值,未使用 β | 对冲比例错误 |
strategy.py |
adaptive_threshold=3.0 固定 |
β 飙升导致持续突破阈值 |
momentum_filter.py |
只检测单腿价格趋势 | 不检测 spread/beta 体制切换 |
| v2 Q 自适应 | 启发式阈值 + 与 BOCPD 信号竞争 | 两个组件互相削弱 |
| v2 Sage-Husa R | 仅正值更新导致正偏差 | R 长期被高估 |
| v2 高斯似然 | 加密货币重尾分布下过度敏感 | 模型概率频繁震荡 |
| v2/v3 随机游走 | P_β 无稳态解,低波动期膨胀 | 长期运行后降级到 OLS |
2. 方案概述
核心设计:IMM(Interacting Multiple Model)Kalman Filter + OU 均值回归状态方程 估计时变 [α, β],其输出 同时服务两个用途:
┌──────────────────────────────────────────────────────┐
│ IMMKalmanBetaEstimator (4h) │
│ 输入: r_btc, r_alt (对数收益率) │
│ 特性: M=5 并行模型, OU 状态方程, Student-t 似然, │
│ Per-pair ν, 在线 R 估计, 可观测性保护 │
│ 输出: beta, P_beta, model_probs, regime_score │
└──────────────┬───────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼
用途 1: 体制检测 用途 2: Hedge Ratio
┌─────────────────┐ ┌─────────────────────┐
│ IMM 模型概率判定 │ │ 仓位比例计算 │
│ P(高Q模型) 阈值 │ │ 输入: beta │
│ 输出: regime_score│ │ 输出: alt/base 数量比 │
└────────┬────────┘ └──────────┬──────────┘
│ │
▼ ▼
入场信号过滤 Beta 加权开仓
(硬拦截/阈值缩放) (alt_notional = β × base_notional)
│
▼
跨配对系统性风险聚合
(加权 regime_score > 阈值 → 全局拦截)
v3.1 相比 v2 的核心算法升级
| 组件 | v2 | v3.1 | 提升 |
|---|---|---|---|
| β 估计 | 单一 Kalman + 启发式 Q 自适应 | IMM 多模型融合 | 消除 Q 自适应的 8 个启发式超参;贝叶斯最优模型选择 |
| 状态方程 | 随机游走 x_t = x_{t-1} + w_t | OU 均值回归 x_t = Φx_{t-1} + (I-Φ)x̄ + w_t | P_β 有稳态解;β 长期运行不漂移 |
| 体制检测 | BOCPD(与 Q 自适应信号竞争) | IMM 模型概率(体制信号内生) | 消除信号竞争;减少 10 个 BOCPD 参数 |
| 似然函数 | 高斯(对重尾过敏) | Student-t(per-pair ν 自适应初始化) | 模型概率对加密货币异常值鲁棒;不同币对自动适配 |
| R 估计 | Sage-Husa(正偏差) | Robbins-Monro EMA(γ=0.02,无截断偏差) | 消除正偏差;在线追踪观测噪声变化 |
| 转移概率 | — | 带宽 TPM(相邻模型更可能跳转) | 符合 β 渐变的物理特征 |
| 可观测性 | 无保护 | r_btc ≈ 0 时跳过 β 更新 | 防止低波动期 P_β 膨胀和 regime_score 失真 |
| 系统性风险 | 计数比例 OR 加权概率(阈值耦合) | 双阈值独立 + 确认窗口 | 消除魔术数字;防止单根 K 线噪声导致全局停摆 |
| 负 β 处理 | 返回符号信息 | 不变 | — |
| 总参数量 | 28 个(18 Kalman + 10 BOCPD) | 17 个 | 减少 39%(均有理论依据) |
为什么 IMM 能同时解决 Q 自适应和体制检测
IMM 的关键洞察:不需要先"猜对" Q 再检测变点,而是让多个 Q 值并行竞争,贝叶斯概率自动选出最优。
MEME 配对(β 波动大):
高Q模型(1e-3, 1e-2) 持续获得高概率 → effective_Q 自动偏大
→ 追踪快,且 regime_score 高 → 自然提高入场门槛
L1 配对(β 稳定):
低Q模型(1e-6, 1e-5) 持续获得高概率 → effective_Q 自动偏小
→ 不追噪声,且 regime_score 低 → 正常入场
β 突变时:
高Q模型的 likelihood 瞬间飙升 → 模型概率快速切换
→ regime_score 立即升高 → 拦截入场
→ 同时 β 估计通过概率加权自动跟上新值
与 v2 的根本区别:v2 中 Q 自适应和变点检测是两个独立系统在抢同一个信号;v3.1 中它们是同一个贝叶斯推断的两个输出,不存在竞争。
v3.1 的附加改进(相比 v3 原方案):
- OU 均值回归状态方程:P_β 有稳态解
P_∞ = Q/(1-Φ²),消除长期协方差漂移 - Per-pair Student-t ν 初始化:基于历史残差 kurtosis 自动设定,不同币对自动适配尾部厚度
- Robbins-Monro 无截断偏差:去掉
max(., 0)截断,仅用 R_floor 保护正定,消除与 Sage-Husa 相同性质的正偏差 - 可观测性保护:
|r_btc| < ε_obs时跳过 β 更新,防止 regime_score 在低信息量时失真 - 系统性风险确认窗口:连续 N 根 K 线触发才执行全局拦截,避免单根 K 线噪声导致停摆
- 模型概率下限保护:
μ_floor=1e-6防止长期运行后某模型概率下溢到 0
为什么 4h 频率足够
β 是两个资产之间的结构性关系(同赛道、资金流向、基本面联动),变化周期在天~周级别。
| 方法 | 响应 β 结构性变化 | 抗日内噪声 | 适用场景 |
|---|---|---|---|
| OLS 100×4h | ~7-10 天 | 强 | 当前系统(过于滞后) |
| 4h IMM Kalman | 1-4 根K线(4-16h) | 强 | 体制检测 + Hedge Ratio |
| 5m Kalman | ~15-30 分钟 | 弱(追噪声) | 不适用于 β 估计 |
IMM 的响应速度优于 v2 的单一自适应 Kalman:β 突变时,高Q 模型立即拟合新关系,概率权重瞬间切换,无需等 Q 自适应的逐步放大过程。
四层架构
| 层 | 组件 | 功能 | v3.1 改进 |
|---|---|---|---|
| 第一层 | IMM Kalman Filter | M 个并行 Kalman 估计 [α, β],OU 状态方程 + Student-t 似然 + 在线 R + 带宽 TPM + 可观测性保护 + 贝叶斯模型概率融合 | 替代 v2 单 Kalman + 启发式 Q 自适应 + Sage-Husa R;新增 OU 过程和可观测性保护 |
| 第二层 | IMM 体制检测 | P(高Q模型) 作为体制指标 → 入场信号过滤 | 替代 v2 BOCPD,无信号竞争 |
| 第三层 | 跨配对系统性风险聚合 | 统计所有配对体制状态 → 系统性事件拦截 | 双阈值独立设定 + 确认窗口 |
| 第四层 | Beta 加权仓位计算 | 用 kalman_beta 计算两腿名义比例 |
负 β 方向感知(与 v2 相同) |
3. 算法选型论证
3.1 为什么选 Kalman Filter(而非其他时变回归方法)
| 方法 | 优势 | 劣势 | 适用性评估 |
|---|---|---|---|
| 向量 Kalman Filter | 最优线性估计(MMSE);递推 O(1);P_β 提供不确定性度量 | 假设线性高斯;单一 Q 不自适应 | 最优基础组件(配合 IMM 解决 Q 问题) |
| RLS (递推最小二乘) | 实现简单;forgetting factor 天然自适应 | 无状态不确定性度量;不支持多模型融合 | 不适合(缺少概率信息) |
| 粒子滤波 | 处理非线性/非高斯 | O(N_particles) 计算量大;参数多 | 过度设计(β 的线性关系足够) |
| Online Bayesian LR | 完全贝叶斯;自然输出后验 | 不支持时变参数(除非加 forgetting) | 不如 Kalman 灵活 |
| 神经网络 (LSTM/Transformer) | 可捕获非线性 | 需大量训练数据;不可解释 | 不适合生产系统 |
结论:Kalman Filter 是时变线性回归的理论最优解(Kalman 1960)。单一 Kalman 的 Q 固定问题通过 IMM 多模型融合解决。
3.2 为什么选 IMM 做 Q 自适应(而非 v2 的 Mehra 启发式)
| 方法 | 优势 | 劣势 | 适用性评估 |
|---|---|---|---|
| IMM (Blom & Bar-Shalom 1988) | 贝叶斯最优模型选择;无启发式阈值;模型概率直接指示体制;响应 β 突变仅需 1-2 步 | M 倍计算量(M=5 对 2D 状态可忽略);需设定 Q 网格 | 最优选择 |
| Innovation-based (Mehra 1970) | 经典方法;实现简单 | 8 个启发式超参;与变点检测信号竞争;响应速度受 κ 限制 | v2 方案(已被 v3 替代) |
| VB-AKF (Sarkka 2009) | 联合估计 Q 和 R 的后验;数学严格 | 实现复杂;变分近似引入额外误差;不直接输出体制信号 | P1 演进方向 |
| 多模型 GPB1 (简化 IMM) | 比 IMM 简单(无混合步) | 模型切换后状态不连续 | 不如 IMM 稳定 |
| IMM-ATP (自适应转移概率) | 在线学习 p_stay | 额外复杂度;收益有限 | P2 增强选项 |
IMM 的核心优势:
- Q 网格直观:
[1e-6, 1e-5, 1e-4, 1e-3, 1e-2]覆盖"β几乎不变"到"β剧烈变化"的全频谱,每个值的物理含义清晰 - 无阈值调参:贝叶斯后验自动选择最优模型,不需要 γ_upper/γ_lower/κ_up/κ_down
- 内生体制信号:
P(高Q模型)天然表示"β 是否正在快速变化",无需额外的变点检测器 - 瞬时响应:β 突变时高Q模型的 likelihood 立即飙升,模型概率 1-2 步内切换完成;不像 Mehra 方法受 κ=1.05 限制需数十步
- 对 2D 状态计算量可忽略:5 个 2×2 Kalman Filter 的总计算量 ≈ 100 次浮点运算
- 与 Student-t 天然兼容:只需替换似然函数,Kalman 更新步骤不变;重尾鲁棒性对加密市场至关重要
- 在线 R 估计平滑化:IMM 概率加权的 innovation 比单模型的更稳定,Robbins-Monro 估计更准确
3.3 为什么 IMM 模型概率可替代 BOCPD
| 方法 | 优势 | 劣势 | 适用性评估 |
|---|---|---|---|
| IMM 模型概率 | 信号内生(无竞争);校准良好的贝叶斯后验;0 额外参数 | 仅检测 Q 维度的变化(无法检测非线性断裂) | v3.1 最优选择 |
| BOCPD (Adams & MacKay 2007) | 通用在线变点检测;输出 run length 后验 | 与 Q 自适应信号竞争;10 个额外参数 | v2 方案(已被 v3 替代) |
| CUSUM | 极简 | 无概率输出;阈值需手调 | 过于简单 |
| HMM | 多体制建模 | 体制数需预设;EM 需离线 | P2 演进方向 |
为什么 IMM 模型概率是更好的体制指标:
BOCPD 监控 innovation 分布变化来"推断"β 是否在变——这是一个间接信号。IMM 的模型概率直接回答"β 的变化速度属于哪个量级"——这是一个直接信号。
BOCPD 路径: innovation 变大 → 推断分布变了 → 推断β在变 → 变点概率升高
IMM 路径: 高Q模型 likelihood 升高 → 模型概率切换 → β变化速度指标升高
BOCPD 需要额外假设(NIG 先验、hazard rate),IMM 不需要。
保留 BOCPD 作为 P2 选项:当需要检测 β 的非线性断裂(如突然从正相关变为负相关,不仅是变化速度加快)时,BOCPD 可作为补充检测器。此时建议监控 IMM 的加权 innovation(而非 normalized innovation),以避免信号竞争。
3.4 为什么用 Student-t 似然(而非高斯)
| 方法 | 优势 | 劣势 | 适用性评估 |
|---|---|---|---|
| Student-t 似然 (Roth 2013, Agamennoni 2012) | 重尾鲁棒;5σ 事件概率 ~1e-3(vs 高斯 3e-7);模型概率更新平滑 | ν 参数需设定 | v3.1 最优选择 |
| 高斯似然 | 经典 Kalman 标准;实现最简 | 加密货币 kurtosis 5-20,异常值导致模型概率剧烈震荡 | v2(不适合加密市场) |
| Huber 似然 (Huber 1964) | 对异常值鲁棒 | 非概率模型,不输出校准的似然值,无法用于模型概率更新 | 不适合 IMM |
| 混合高斯 | 可建模重尾 | 参数多(每个混合成分需 μ, σ, π) | 过度设计 |
Student-t 的关键优势:
Kalman Filter 的状态更新步骤(predict + update)不需要改动,只需将模型概率更新中的似然函数从高斯替换为 Student-t:
高斯: L_j = N(ε; 0, S) = (2πS)^{-0.5} exp(-ε²/2S)
Student-t: L_j = t_ν(ε; 0, S) ∝ (1 + ε²/(νS))^{-(ν+1)/2}
v3.1 Per-pair ν 初始化(替代固定 ν=5):
不同币对的尾部厚度差异显著(ETH/BTC kurtosis ≈ 5-8 vs MEME kurtosis ≈ 15-30),固定 ν=5 不够精细。v3.1 从 OLS 初始化阶段的残差 kurtosis 自动推算 ν:
ν_init = max(3.0, 6.0 / (kurtosis/3 - 1) + 2)
示例:
ETH/BTC (kurtosis ≈ 6): ν = 6/(6/3-1) + 2 = 8.0 → 接近高斯(合理)
MEME (kurtosis ≈ 15): ν = 6/(15/3-1) + 2 = 3.5 → 重尾保护强
中等 (kurtosis ≈ 9): ν = 6/(9/3-1) + 2 = 5.0 → 与 v3 默认一致
上下限保护: ν = clip(ν_init, 3.0, 30.0)
ν < 3: 方差不存在(Student-t 性质)
ν > 30: 趋近高斯,可直接用高斯
实现成本极低(初始化时 1 行计算),但对异构币对提供自动适配。
3.5 为什么用 Robbins-Monro 在线 R 估计(而非固定 R 或 Sage-Husa)
| 方法 | 优势 | 劣势 | 适用性评估 |
|---|---|---|---|
| Robbins-Monro EMA(v3.1 无截断版) | 无偏;实现极简(1行);追踪 R 时变 | 步长 γ 需手设 | v3.1 最优选择 |
| 固定 R | 最简;IMM Q 选择部分吸收 R 变化 | 观测噪声实际时变(日间 vs 周末,牛市 vs 熊市) | v3 初版(已升级) |
| Sage-Husa (1969) | 经典自适应方法 | max(., 0) 截断 → R 长期正偏差 |
v2(已弃用) |
Robbins-Monro EMA(v3 原版,含 max(.,0)) |
偏差比 Sage-Husa 小 | max(.,0) 截断仍引入同性质正偏差(只是程度较轻) |
v3 原方案(v3.1 修复) |
| VB-AKF (Sarkka 2009) | 联合估计 Q 和 R 的后验;数学严格 | 实现复杂;变分近似引入额外误差 | P1 演进方向 |
v3.1 修复:去掉 max(., 0) 截断
v3 原方案的 R 更新公式:
R_t = (1-γ) × R_{t-1} + γ × max(ε_t² - H_t P_pred H_t', 0) ← 有正偏差
max(., 0) 使得 R 只能被增大、不能被减小(当 ε² < HPH' 时更新被截断为 0)。这与文档对 Sage-Husa 的批评("仅正值更新导致正偏差")构成逻辑矛盾。
v3.1 修正为:
R_update = ε_t² - H_t P_pred H_t' ← 允许负值
R_t = (1-γ) × R_{t-1} + γ × R_update ← 无截断
R_t = max(R_t, R_floor) ← 仅 floor 保护正定
R_floor = 1e-8 已足够保证 R > 0(正定性),不需要逐步截断。ε² < HPH' 是正常统计波动(约 50% 的时间会发生),截断这些事件会系统性高估 R。
3.6 为什么用 OU 状态方程(而非随机游走)
| 方法 | P_β 稳态解 | β 漂移控制 | 参数量 | 适用性 |
|---|---|---|---|---|
OU 过程 x_t = Φx_{t-1} + (I-Φ)x̄ + w_t |
✅ P_∞ = Q/(1-Φ²) |
✅ 向 x̄ 回归 | +1 (Φ_β) | v3.1 最优 |
随机游走 x_t = x_{t-1} + w_t |
❌ 无稳态 | ❌ 无约束漂移 | 0 | v3 原方案 |
固定参数 x_t = x_0 |
完全不追踪 | 完全不漂移 | 0 | OLS(过于保守) |
OU 过程的物理意义:
β 描述两个加密货币之间的结构性关系。虽然 β 会随市场环境变化(时变),但它有回归倾向——两个同赛道币的关系不会无限偏离初始值。OU 过程精确建模了这一特征:
Φ_β = 0.98 的含义:
- 每步 β 向 β̄(长期均值)回归 2%
- 特征时间尺度 τ = -1/ln(Φ) ≈ 50 步 ≈ 8 天
- 与 p_stay=0.98(模型持续时间)一致
P_β 稳态解:
对 Q_β=1e-4, Φ_β=0.98:
P_∞ = 1e-4 / (1 - 0.98²) = 1e-4 / 0.0396 ≈ 2.5e-3
→ P_β 永远不会超过此值(即使长期运行/低波动期)
→ 不会不必要地降级到 OLS
对 IMM 的影响:每个模型的 Q_β^(j) 不同,因此每个模型的 P_∞^(j) 不同——高Q模型天然有更大的稳态不确定性,低Q模型有更小的。这与 IMM 的设计意图完全一致。
3.7 为什么不用统一框架(Regime-Switching SSM)
Regime-Switching State Space Model (Kim 1994) 将 Kalman + 体制检测合二为一:
- 优势:参数更少、理论更优雅
- 劣势:Kim's approximation 有误差累积;实现复杂度高;调试困难
当前选择:IMM + 阈值体制判定。原因:
- IMM 已覆盖 Q 自适应 + 体制检测两个需求
- 实现简单(标准 IMM 教科书代码)
- 可独立调试每个模型的行为
- MS-SSM / HMM 作为 P2 演进方向保留
4. 详细设计
4.1 IMM Kalman Filter 时变 [α, β] 估计
数学模型
M 个并行状态空间模型(每个模型 j 使用不同的过程噪声 Q_β^(j)),共享 OU 均值回归结构:
状态方程(OU 过程,所有模型共享 Φ,Q_β 不同):
x_t = Φ × x_{t-1} + (I - Φ) × x̄ + w_t, w_t ~ N(0, Q^(j))
其中 x_t = [α_t, β_t]'
x̄ = [α_init, β_init]' OLS 初始值作为长期均值
Φ = [[Φ_α, 0 ], 对角回归矩阵
[ 0, Φ_β]]
Q^(j) = [[q_α, 0 ], q_α 所有模型共享
[ 0, q_β^(j)]] q_β^(j) 各模型不同
Φ_α = 0.995 → α 回归极慢(特征时间 ~200步 ≈ 33天)
Φ_β = 0.98 → β 回归适中(特征时间 ~50步 ≈ 8天)
观测方程(所有模型共享):
r_alt_t = H_t × x_t + v_t, v_t ~ N(0, R)
其中 H_t = [1, r_btc_t]
r_alt_t = log(alt_t / alt_{t-1})
r_btc_t = log(btc_t / btc_{t-1})
R 初始化自 OLS 残差方差,在线 Robbins-Monro EMA 更新
OU 过程的 P_β 稳态解(推导):
稳态条件: P_∞ = Φ P_∞ Φ' + Q (忽略观测更新,仅考虑预测步)
对角情形:
P_∞_β = Φ_β² × P_∞_β + Q_β
→ P_∞_β = Q_β / (1 - Φ_β²)
各模型的稳态 P_β:
模型 0 (Q_β=1e-6): P_∞ = 1e-6 / 0.0396 ≈ 2.5e-5
模型 1 (Q_β=1e-5): P_∞ = 1e-5 / 0.0396 ≈ 2.5e-4
模型 2 (Q_β=1e-4): P_∞ = 1e-4 / 0.0396 ≈ 2.5e-3
模型 3 (Q_β=1e-3): P_∞ = 1e-3 / 0.0396 ≈ 2.5e-2
模型 4 (Q_β=1e-2): P_∞ = 1e-2 / 0.0396 ≈ 2.5e-1
观测更新会进一步减小 P_β → 实际值更低。
关键: 融合后的 P_β = Σ μ_j [P_β^(j) + (β^(j) - β̄)²] 也有稳态,不会无界增长。
Q_β 网格设计:
| 模型 j | Q_β^(j) | 物理含义 | P_∞_β | 日 β 漂移 σ |
|---|---|---|---|---|
| 0 | 1e-6 | β 几乎不变 | 2.5e-5 | ~0.002 |
| 1 | 1e-5 | β 缓慢变化 | 2.5e-4 | ~0.008 |
| 2 | 1e-4 | β 正常变化(先验中心) | 2.5e-3 | ~0.024 |
| 3 | 1e-3 | β 快速变化 | 2.5e-2 | ~0.077 |
| 4 | 1e-2 | β 剧烈变化 / 结构性断裂 | 2.5e-1 | ~0.245 |
覆盖 4 个数量级,对数均匀分布,覆盖从 ETH/BTC 级稳定到 MEME 级波动的全部场景。
IMM 递推算法
输入: r_btc_t, r_alt_t, 上一步状态 {x^(j), P^(j), μ_j} for j=0..M-1
Step 0: 可观测性检查
若 |r_btc_t| < ε_obs (默认 1e-6):
→ 跳过 β 更新: 仅用 H=[1, 0] 更新 α
→ 标记 regime_score 为上一步值(不更新模型概率)
→ 原因: r_btc≈0 时 β 不可观测,强行更新会
使 P_β 膨胀、regime_score 失真
Step 1: 混合(Interaction)
预测模型概率:
μ̃_j = Σ_i π_{ij} × μ_i (M×1)
其中 π_{ij} = P(模型j在t | 模型i在t-1)
混合权重:
w_{i|j} = π_{ij} × μ_i / μ̃_j (M×M)
混合状态(为每个模型j准备初始条件):
x̃^(j) = Σ_i w_{i|j} × x^(i) (2×1)
P̃^(j) = Σ_i w_{i|j} × [P^(i) + (x^(i) - x̃^(j))(x^(i) - x̃^(j))'] (2×2)
Step 2: 并行滤波(每个模型 j 独立运行 OU Kalman)
OU 预测(关键改动: 随机游走 → OU):
x_pred^(j) = Φ × x̃^(j) + (I - Φ) × x̄
P_pred^(j) = Φ × P̃^(j) × Φ' + Q^(j)
Innovation:
ε^(j) = r_alt_t - H_t × x_pred^(j)
S^(j) = H_t × P_pred^(j) × H_t' + R
S^(j) = max(S^(j), 1e-15)
Huber Clipping (c=3.0):
ε̃^(j) = clip(ε^(j), -c×√S^(j), +c×√S^(j))
Kalman 增益 + 更新:
K^(j) = P_pred^(j) × H_t' / S^(j)
x^(j) = x_pred^(j) + K^(j) × ε̃^(j)
Joseph 稳定形式:
A^(j) = I - K^(j) × H_t
P^(j) = A^(j) × P_pred^(j) × A^(j)' + R × K^(j) × K^(j)'
似然(Student-t,用原始 innovation,非截断后的):
L_j = Γ((ν+1)/2) / (Γ(ν/2) × √(νπS^(j))) × (1 + ε^(j)²/(ν×S^(j)))^{-(ν+1)/2}
其中 ν = Student-t 自由度(per-pair 初始化,默认 5)
Step 3: 模型概率更新
μ_j = μ̃_j × L_j / Σ_k (μ̃_k × L_k) (归一化)
μ_j = max(μ_j, μ_floor) (下限保护,μ_floor=1e-6)
归一化: μ_j /= Σ_k μ_k
Step 4: 在线 R 更新(Robbins-Monro EMA,v3.1 无截断版)
若 γ_R > 0:
weighted_innov_sq = Σ_j μ̃_j × ε^(j)² (注: 用预测概率 μ̃,避免循环依赖)
weighted_HP_H = Σ_j μ̃_j × H P_pred^(j) H'
R_update = weighted_innov_sq - weighted_HP_H (允许负值,无 max(.,0) 截断)
R = (1-γ_R) × R + γ_R × R_update
R = max(R, R_floor) (仅 floor 保护正定)
同步到所有 M 个模型
Step 5: 融合输出
β = Σ_j μ_j × β^(j) (概率加权)
α = Σ_j μ_j × α^(j)
P_β = Σ_j μ_j × [P_β^(j) + (β^(j) - β)²] (含模型间方差)
regime_score = Σ_{j: q_β^(j) ≥ q_high} μ_j (高Q模型总概率)
effective_q_β = Σ_j μ_j × q_β^(j) (有效Q值)
innovation = Σ_j μ_j × ε^(j) (概率加权)
v3.1 Step 4 改进要点:
- 无截断:
R_update允许为负值,消除 Sage-Husa 同性质的正偏差 - 用预测概率 μ̃:R 更新使用 Step 1 的
μ̃_j(而非 Step 3 更新后的μ_j),避免似然→R→似然的循环依赖 - R_floor 兜底:
R_floor=1e-8保证正定性,是充分的下限保护
模型转移概率矩阵 Π(带宽 TPM):
Π[i,j] = { p_stay if i == j (对角线: 0.98)
{ (1-p_stay) × exp(-|i-j|) / Z_i if i ≠ j (带宽衰减)
其中 Z_i = Σ_{j≠i} exp(-|i-j|) (逐行归一化常数)
相比均匀非对角线的改进:
均匀 TPM 假设"从 Q=1e-6 跳到 Q=1e-2 和跳到 Q=1e-5 的概率相同"——不合理。实际中 β 变化速度通常渐变(先加速再进入快速变化),带宽 TPM 使相邻 Q 模型更可能互相跳转:
M=5 示例(中间行 i=2 的非对角权重):
j=0 (|i-j|=2): exp(-2) ≈ 0.14 → 概率 0.003
j=1 (|i-j|=1): exp(-1) ≈ 0.37 → 概率 0.008
j=3 (|i-j|=1): exp(-1) ≈ 0.37 → 概率 0.008
j=4 (|i-j|=2): exp(-2) ≈ 0.14 → 概率 0.003
对比均匀: 每个非对角 = 0.005
p_stay=0.98 含义:模型在 4h 尺度上 98% 概率保持不变,2% 概率切换到其他模型。对应 β 变化速度的平均持续时间 ≈ 50 根 K线(~8天),与 Φ_β=0.98 的特征时间一致。带宽衰减使跳转更倾向相邻模型,对 β 突变仍可响应(只是跳到远端模型的概率更低,但 likelihood 主导时仍会快速切换)。
关键输出信号及其用途
| 信号 | 用途 1: 体制检测 | 用途 2: Hedge Ratio |
|---|---|---|
beta (Σ μ_j × β^(j)) |
β 趋势监控 | 直接作为 hedge ratio |
alpha (Σ μ_j × α^(j)) |
吸收独立漂移,避免污染 β | — |
P_beta (含模型间方差) |
不确定性度量(OU 过程保证有稳态) | hedge ratio 置信度(P_β 过大时降级为 OLS β) |
regime_score (Σ 高Q模型概率) |
直接用于体制检测 | — |
model_probs (μ 向量) |
诊断:哪个 Q 模型主导 | — |
effective_q_beta (Σ μ_j × q_β^(j)) |
诊断:当前有效 Q_β | — |
observable (bool) |
r_btc ≈ 0 时 regime_score 不可靠 | — |
参数
| 参数 | 符号 | 默认值 | 含义 | 调优 |
|---|---|---|---|---|
| Q_β 网格 | q_β_grid | [1e-6, 1e-5, 1e-4, 1e-3, 1e-2] | M=5 个模型的 β 过程噪声 | 对数均匀,覆盖全频谱 |
| α 过程噪声 | q_α | 1e-5 | α 每 4h 变化方差(所有模型共享) | 通常不调 |
| β 回归系数 | Φ_β | 0.98 | OU 过程均值回归速率(特征时间 ~8天) | ↑更保守 ↓追踪更快 |
| α 回归系数 | Φ_α | 0.995 | α 回归速率(特征时间 ~33天) | 通常不调 |
| 观测噪声初始值 | R_init | OLS 残差方差 | 初始 R,后续在线更新 | 初始化 |
| R 下限 | R_floor | 1e-8 | 防止 R 退化为零 | — |
| R 学习率 | γ_R | 0.02 | Robbins-Monro EMA 步长(0=禁用在线R) | ↓平滑 ↑响应 |
| Innovation 截断 | c | 3.0 | Huber 截断(σ 倍数) | ↓鲁棒 ↑灵敏 |
| Student-t 自由度 | ν | per-pair (默认 5.0) | 似然函数重尾度(从 OLS 残差 kurtosis 自动推算) | 3-30 范围 |
| 可观测性阈值 | ε_obs | 1e-6 | |r_btc| < 此值时跳过 β 更新 | — |
| 初始 [α, β] / 长期均值 x̄ | — | OLS 估计 | 所有模型共享初始值和 OU 回归目标 | — |
| 初始 P | P₀ | diag(0.1, 1.0) | 初始不确定性 | — |
| 模型持久性 | p_stay | 0.98 | 带宽 TPM 对角线 | ↑模型切换更保守 |
| 高Q阈值 | q_high | 1e-3 | Q_β ≥ 此值归入"高Q"组 | — |
| 概率下限 | μ_floor | 1e-6 | 防止模型概率下溢到 0 | — |
总计 17 个核心参数(vs v2 的 28 个,减少 39%)。相比 v3 原方案(15 个)新增 Φ_β、Φ_α、ε_obs,均有明确物理含义和理论依据。
参数间的一致性约束:
q_α = q_β_grid[2]/10→ α 变化比中位 β 慢一个量级p_stay = Φ_β = 0.98→ 模型持续时间和 β 特征时间对齐(~8天)Φ_α = 0.995→ α 回归比 β 更慢,允许独立漂移被缓慢吸收ν从 OLS 残差 kurtosis 自动推算,覆盖 3-30 范围γ_R = 0.02→ R 有效窗口 ≈50 步,与 β 特征时间匹配
文件:src/config.py
# ═══ IMM Kalman Filter 参数(v3.1)═══
IMM_Q_BETA_GRID: list[float] = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
IMM_Q_ALPHA: float = 1e-5
IMM_P0_ALPHA: float = 0.1
IMM_P0_BETA: float = 1.0
IMM_R_FLOOR: float = 1e-8
IMM_R_GAMMA: float = 0.02 # Robbins-Monro R 学习率(0=禁用在线R估计)
IMM_CLIP_SIGMA: float = 3.0
IMM_STUDENT_T_NU: float = 5.0 # Student-t 默认自由度(per-pair 初始化时覆盖)
IMM_TRANSITION_PROB: float = 0.98
IMM_HIGH_Q_THRESHOLD: float = 1e-3
IMM_PHI_BETA: float = 0.98 # OU 均值回归系数(β)
IMM_PHI_ALPHA: float = 0.995 # OU 均值回归系数(α)
IMM_OBS_THRESHOLD: float = 1e-6 # 可观测性阈值(|r_btc| < 此值跳过 β 更新)
IMM_MU_FLOOR: float = 1e-6 # 模型概率下限
# ═══ Hedge Ratio 参数 ═══
HEDGE_BETA_MIN: float = 0.1 # β 下限保护
HEDGE_BETA_MAX: float = 5.0 # β 上限保护
HEDGE_BETA_P_MAX: float = 0.5 # P_β 超过此值时降级为 OLS β
文件:src/utils/analysis/analysis_core.py
新增类:_KalmanModel(单一 Kalman 模型,IMM 的内部组件)
import numpy as np
from scipy.special import gammaln
from scipy.stats import kurtosis as sp_kurtosis
def _estimate_student_t_nu(residuals: np.ndarray, default: float = 5.0) -> float:
"""从 OLS 残差的 kurtosis 估计 Student-t 自由度 ν
公式: ν = max(3, 6 / (kurtosis/3 - 1) + 2)
其中 kurtosis 使用 excess kurtosis(正态分布为 0)
"""
if len(residuals) < 20:
return default
kurt = float(sp_kurtosis(residuals, fisher=True)) # excess kurtosis
if kurt <= 0:
return min(default, 30.0) # 轻尾或正态 → 用较大 ν
nu = 6.0 / (kurt / 3.0) + 2.0 # 简化: 6/(excess_kurt/3) + 2
return float(np.clip(nu, 3.0, 30.0))
class _KalmanModel:
"""单一 Kalman Filter 模型(IMM 的内部组件)
2D 状态 [α, β],OU 均值回归 + 固定 Q + 共享 R。
Student-t 似然 + Joseph 稳定形式 + Huber clipping。
"""
def __init__(
self,
x: np.ndarray, # [α, β] 初始状态
P: np.ndarray, # 2×2 初始协方差
q_alpha: float,
q_beta: float,
R: float,
x_bar: np.ndarray, # OU 长期均值 [ᾱ, β̄]
phi: np.ndarray, # OU 回归系数 [Φ_α, Φ_β]
r_floor: float = 1e-8,
clip_sigma: float = 3.0,
nu: float = 5.0, # Student-t 自由度
):
self.x = x.copy()
self.P = P.copy()
self.Q = np.diag([q_alpha, q_beta]).astype(np.float64)
self.R = max(R, r_floor)
self._x_bar = x_bar.copy()
self._phi = np.diag(phi).astype(np.float64) # 2×2 对角矩阵
self._I_phi = np.eye(2) - self._phi # I - Φ
self._r_floor = r_floor
self._clip_sigma = clip_sigma
self._nu = nu
# Student-t 常数预计算
self._log_t_const = (
gammaln((nu + 1) / 2) - gammaln(nu / 2)
- 0.5 * np.log(nu * np.pi)
)
def predict(self):
"""OU 预测步(从混合后的状态出发)"""
self.x = self._phi @ self.x + self._I_phi @ self._x_bar
self.P = self._phi @ self.P @ self._phi.T + self.Q
def update(self, r_btc: float, r_alt: float) -> dict:
"""Kalman 观测更新,返回滤波结果和 Student-t 似然
注意: 调用前必须先调用 predict()(IMM 流程中由外层控制)
"""
H = np.array([1.0, r_btc], dtype=np.float64)
# Innovation
innovation = r_alt - H @ self.x
HP_H = float(H @ self.P @ H)
S = HP_H + self.R
S = max(S, 1e-15)
# Student-t 似然(用原始 innovation,对重尾鲁棒)
log_likelihood = (
self._log_t_const
- 0.5 * np.log(S)
- ((self._nu + 1) / 2) * np.log(1 + innovation ** 2 / (self._nu * S))
)
# Huber clipping(保护状态更新,不影响似然)
sqrt_S = S ** 0.5
norm_innov = innovation / sqrt_S
clipped = False
innovation_for_update = innovation
if abs(norm_innov) > self._clip_sigma:
innovation_for_update = self._clip_sigma * sqrt_S * (
1.0 if innovation > 0 else -1.0
)
clipped = True
# Kalman 增益 + 更新
K = (self.P @ H) / S
self.x = self.x + K * innovation_for_update
# Joseph 稳定形式
I2 = np.eye(2)
A = I2 - np.outer(K, H)
self.P = A @ self.P @ A.T + self.R * np.outer(K, K)
return {
'alpha': float(self.x[0]),
'beta': float(self.x[1]),
'P_beta': float(self.P[1, 1]),
'P_alpha': float(self.P[0, 0]),
'innovation': innovation,
'normalized_innovation': norm_innov,
'S': S,
'HP_H': HP_H,
'log_likelihood': log_likelihood,
'clipped': clipped,
'kalman_gain_beta': float(K[1]),
}
新增类:IMMKalmanBetaEstimator(IMM 多模型融合估计器)
class IMMKalmanBetaEstimator:
"""Interacting Multiple Model Kalman Filter 时变 [α, β] 联合估计器
核心思想 (Blom & Bar-Shalom, 1988):
M 个 Kalman Filter 并行运行,每个使用不同的 Q_β,
通过贝叶斯模型概率实时加权融合。
v3.1 改进(相比 v3 原方案):
1. OU 均值回归状态方程: x_t = Φx_{t-1} + (I-Φ)x̄ + w_t
→ P_β 有稳态解 P_∞ = Q/(1-Φ²),消除长期协方差漂移
2. Per-pair Student-t ν: 从 OLS 残差 kurtosis 自动推算
→ 不同币对自动适配尾部厚度
3. Robbins-Monro 无截断: 去掉 max(.,0),消除正偏差
→ R 更新使用预测概率 μ̃ 避免循环依赖
4. 可观测性保护: |r_btc| < ε_obs 时跳过 β 更新
→ 防止低波动期 P_β 膨胀和 regime_score 失真
双用途输出:
- beta → hedge ratio(直接使用概率加权 β)
- regime_score → 体制检测(高Q模型的总概率)
每根新 4h K线调用 update() 一次。
"""
def __init__(
self,
alpha_init: float,
beta_init: float,
q_beta_grid: list[float] | None = None,
q_alpha: float = 1e-5,
r_init: float = 1e-2,
p0_alpha: float = 0.1,
p0_beta: float = 1.0,
r_floor: float = 1e-8,
r_gamma: float = 0.02,
clip_sigma: float = 3.0,
nu: float = 5.0,
transition_prob: float = 0.98,
high_q_threshold: float = 1e-3,
phi_beta: float = 0.98,
phi_alpha: float = 0.995,
obs_threshold: float = 1e-6,
mu_floor: float = 1e-6,
):
if q_beta_grid is None:
q_beta_grid = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
self.M = len(q_beta_grid)
self._q_beta_grid = list(q_beta_grid)
self._high_q_threshold = high_q_threshold
self._r_gamma = r_gamma
self._r_floor = r_floor
self._nu = nu
self._clip_sigma = clip_sigma
self._obs_threshold = obs_threshold
self._mu_floor = mu_floor
self._n_updates = 0
self._last_observable = True
# OU 参数
x_bar = np.array([alpha_init, beta_init], dtype=np.float64)
phi = np.array([phi_alpha, phi_beta], dtype=np.float64)
self._x_bar = x_bar
self._phi_vec = phi
# 初始化 M 个 Kalman 模型
x0 = np.array([alpha_init, beta_init], dtype=np.float64)
P0 = np.diag([p0_alpha, p0_beta]).astype(np.float64)
self._models = [
_KalmanModel(x0, P0, q_alpha, q_b, r_init, x_bar, phi,
r_floor, clip_sigma, nu)
for q_b in q_beta_grid
]
# 模型概率(均匀初始化)
self._mu = np.ones(self.M) / self.M
# 带宽转移概率矩阵(相邻模型跳转概率更高)
self._TPM = np.zeros((self.M, self.M))
for i in range(self.M):
weights = np.array([
np.exp(-abs(i - j)) if i != j else 0.0
for j in range(self.M)
])
w_sum = weights.sum()
if w_sum > 0:
self._TPM[i] = weights / w_sum * (1.0 - transition_prob)
self._TPM[i, i] = transition_prob
# 高Q模型索引(预计算)
self._high_q_indices = [
j for j, q in enumerate(q_beta_grid) if q >= high_q_threshold
]
@property
def beta(self) -> float:
return float(sum(
self._mu[j] * self._models[j].x[1] for j in range(self.M)
))
@property
def beta_variance(self) -> float:
b = self.beta
return float(sum(
self._mu[j] * (self._models[j].P[1, 1] + (self._models[j].x[1] - b) ** 2)
for j in range(self.M)
))
@property
def regime_score(self) -> float:
return float(sum(self._mu[j] for j in self._high_q_indices))
@property
def effective_q_beta(self) -> float:
return float(sum(
self._mu[j] * self._q_beta_grid[j] for j in range(self.M)
))
def update(self, r_btc: float, r_alt: float) -> dict:
"""接收一对新的对数收益率,执行 IMM 更新
Returns:
dict with keys: alpha, beta, P_beta, P_alpha,
regime_score, effective_q_beta, model_probs,
dominant_model_idx, innovation, normalized_innovation,
clipped, observable, R
"""
self._n_updates += 1
# ── Step 0: 可观测性检查 ──
observable = abs(r_btc) >= self._obs_threshold
self._last_observable = observable
if not observable:
# r_btc ≈ 0: β 不可观测,仅用 H=[1,0] 更新 α
# 不更新模型概率,保持上一步的 regime_score
for m in self._models:
m.predict()
# 仅更新 α(用 r_alt 作为 α 的观测)
for m in self._models:
H_alpha = np.array([1.0, 0.0], dtype=np.float64)
innov = r_alt - H_alpha @ m.x
S = m.P[0, 0] + m.R
S = max(S, 1e-15)
K = m.P @ H_alpha / S
m.x = m.x + K * innov
A = np.eye(2) - np.outer(K, H_alpha)
m.P = A @ m.P @ A.T + m.R * np.outer(K, K)
beta = sum(self._mu[j] * self._models[j].x[1] for j in range(self.M))
alpha = sum(self._mu[j] * self._models[j].x[0] for j in range(self.M))
P_beta = sum(
self._mu[j] * (self._models[j].P[1, 1] + (self._models[j].x[1] - beta) ** 2)
for j in range(self.M)
)
return {
'alpha': float(alpha), 'beta': float(beta),
'P_beta': float(P_beta), 'P_alpha': 0.0,
'regime_score': self.regime_score,
'effective_q_beta': self.effective_q_beta,
'model_probs': self._mu.copy(),
'dominant_model_idx': int(np.argmax(self._mu)),
'innovation': 0.0, 'normalized_innovation': 0.0,
'clipped': False, 'observable': False,
'kalman_gain_beta': 0.0, 'R': self._models[0].R,
}
# ── Step 1: 混合(Interaction)──
mu_predicted = self._TPM.T @ self._mu # 预测模型概率 (M,)
# 混合权重 w[i,j] = π[i,j] * μ[i] / μ̃[j]
mix_weights = np.zeros((self.M, self.M))
for j in range(self.M):
if mu_predicted[j] > 1e-30:
for i in range(self.M):
mix_weights[i, j] = self._TPM[i, j] * self._mu[i] / mu_predicted[j]
# 为每个模型 j 准备混合后的初始条件
for j in range(self.M):
x_mixed = np.zeros(2)
for i in range(self.M):
x_mixed += mix_weights[i, j] * self._models[i].x
P_mixed = np.zeros((2, 2))
for i in range(self.M):
dx = self._models[i].x - x_mixed
P_mixed += mix_weights[i, j] * (
self._models[i].P + np.outer(dx, dx)
)
self._models[j].x = x_mixed
self._models[j].P = P_mixed
# ── Step 2: 并行滤波(OU 预测 + 观测更新)──
results = []
log_likelihoods = np.zeros(self.M)
for j in range(self.M):
self._models[j].predict() # OU 预测步
r = self._models[j].update(r_btc, r_alt) # 观测更新
results.append(r)
log_likelihoods[j] = r['log_likelihood']
# ── Step 3: 模型概率更新 ──
# 数值稳定的 log-space 计算
log_mu = np.log(np.maximum(mu_predicted, 1e-30)) + log_likelihoods
log_mu -= np.max(log_mu) # 防止溢出
self._mu = np.exp(log_mu)
total = np.sum(self._mu)
if total > 0:
self._mu /= total
else:
self._mu = np.ones(self.M) / self.M
# 模型概率下限保护(防止长期运行后某模型概率下溢到 0)
self._mu = np.maximum(self._mu, self._mu_floor)
self._mu /= self._mu.sum()
# ── Step 4: 在线 R 更新(Robbins-Monro EMA,v3.1 无截断版)──
if self._r_gamma > 0:
# 用预测概率 μ̃ 加权,避免与 Step 3 的循环依赖
weighted_innov_sq = sum(
mu_predicted[j] * results[j]['innovation'] ** 2 for j in range(self.M)
)
weighted_HP_H = sum(
mu_predicted[j] * results[j]['HP_H'] for j in range(self.M)
)
# v3.1: 不截断,允许负值,仅用 R_floor 保护正定
r_update = weighted_innov_sq - weighted_HP_H
new_R = (1 - self._r_gamma) * self._models[0].R + self._r_gamma * r_update
new_R = max(new_R, self._r_floor)
for m in self._models:
m.R = new_R
# ── Step 5: 融合输出 ──
beta = sum(self._mu[j] * results[j]['beta'] for j in range(self.M))
alpha = sum(self._mu[j] * results[j]['alpha'] for j in range(self.M))
P_beta = sum(
self._mu[j] * (results[j]['P_beta'] + (results[j]['beta'] - beta) ** 2)
for j in range(self.M)
)
P_alpha = sum(
self._mu[j] * (results[j]['P_alpha'] + (results[j]['alpha'] - alpha) ** 2)
for j in range(self.M)
)
regime_score = sum(self._mu[j] for j in self._high_q_indices)
effective_q = sum(self._mu[j] * self._q_beta_grid[j] for j in range(self.M))
dominant = int(np.argmax(self._mu))
# 概率加权 innovation
weighted_innovation = sum(
self._mu[j] * results[j]['innovation'] for j in range(self.M)
)
weighted_norm_innov = sum(
self._mu[j] * results[j]['normalized_innovation'] for j in range(self.M)
)
any_clipped = any(results[j]['clipped'] for j in range(self.M)
if self._mu[j] > 0.1)
return {
'alpha': float(alpha),
'beta': float(beta),
'P_beta': float(P_beta),
'P_alpha': float(P_alpha),
'regime_score': float(regime_score),
'effective_q_beta': float(effective_q),
'model_probs': self._mu.copy(),
'dominant_model_idx': dominant,
'innovation': float(weighted_innovation),
'normalized_innovation': float(weighted_norm_innov),
'clipped': any_clipped,
'observable': True,
'kalman_gain_beta': results[dominant]['kalman_gain_beta'],
'R': self._models[0].R,
}
def state_dict(self) -> dict:
return {
'models': [
{'x': m.x.tolist(), 'P': m.P.tolist(), 'Q': m.Q.tolist(), 'R': m.R}
for m in self._models
],
'mu': self._mu.tolist(),
'q_beta_grid': self._q_beta_grid,
'high_q_threshold': self._high_q_threshold,
'n_updates': self._n_updates,
'TPM': self._TPM.tolist(),
'r_gamma': self._r_gamma,
'r_floor': self._r_floor,
'nu': self._nu,
'clip_sigma': self._clip_sigma,
'x_bar': self._x_bar.tolist(),
'phi_vec': self._phi_vec.tolist(),
'obs_threshold': self._obs_threshold,
'mu_floor': self._mu_floor,
}
@classmethod
def from_state_dict(cls, d: dict) -> 'IMMKalmanBetaEstimator':
obj = cls.__new__(cls)
obj.M = len(d['models'])
obj._q_beta_grid = d['q_beta_grid']
obj._high_q_threshold = d['high_q_threshold']
obj._n_updates = d['n_updates']
obj._mu = np.array(d['mu'], dtype=np.float64)
obj._TPM = np.array(d['TPM'], dtype=np.float64)
obj._r_gamma = d.get('r_gamma', 0.02)
obj._r_floor = d.get('r_floor', 1e-8)
obj._nu = d.get('nu', 5.0)
obj._clip_sigma = d.get('clip_sigma', 3.0)
obj._x_bar = np.array(d.get('x_bar', [0.0, 1.0]), dtype=np.float64)
obj._phi_vec = np.array(d.get('phi_vec', [0.995, 0.98]), dtype=np.float64)
obj._obs_threshold = d.get('obs_threshold', 1e-6)
obj._mu_floor = d.get('mu_floor', 1e-6)
obj._last_observable = True
nu = obj._nu
log_t_const = (
gammaln((nu + 1) / 2) - gammaln(nu / 2)
- 0.5 * np.log(nu * np.pi)
)
phi = np.diag(obj._phi_vec)
I_phi = np.eye(2) - phi
obj._models = []
for md in d['models']:
m = _KalmanModel.__new__(_KalmanModel)
m.x = np.array(md['x'], dtype=np.float64)
m.P = np.array(md['P'], dtype=np.float64)
m.Q = np.array(md['Q'], dtype=np.float64)
m.R = md['R']
m._x_bar = obj._x_bar.copy()
m._phi = phi.copy()
m._I_phi = I_phi.copy()
m._r_floor = obj._r_floor
m._clip_sigma = obj._clip_sigma
m._nu = nu
m._log_t_const = log_t_const
obj._models.append(m)
obj._high_q_indices = [
j for j, q in enumerate(obj._q_beta_grid) if q >= obj._high_q_threshold
]
return obj
改动函数:calculate_cointegration_params_dual_window()
在现有 OLS 之后,新增 IMM Kalman Filter 输出:
def calculate_cointegration_params_dual_window(
base_klines, alt_klines,
beta_window=None, zscore_window=None,
kalman_state: dict | None = None, # [新增] IMM 状态
):
# ... 现有 OLS 逻辑不变 ...
# ═══ 新增:IMM Kalman Filter 更新 ═══
kalman_result = None
kalman_state_out = kalman_state
if len(aligned) >= 2:
r_btc = np.log(aligned['base'].iloc[-1] / aligned['base'].iloc[-2])
r_alt = np.log(aligned['alt'].iloc[-1] / aligned['alt'].iloc[-2])
if kalman_state is not None:
kf = IMMKalmanBetaEstimator.from_state_dict(kalman_state)
else:
ols_residual_var = float(np.var(model.resid)) if hasattr(model, 'resid') else 1e-2
# v3.1: per-pair ν 从 OLS 残差 kurtosis 自动推算
nu_init = _estimate_student_t_nu(
model.resid if hasattr(model, 'resid') else np.array([]),
default=IMM_STUDENT_T_NU,
)
kf = IMMKalmanBetaEstimator(
alpha_init=alpha_ols if use_alpha else 0.0,
beta_init=beta_ols,
q_beta_grid=IMM_Q_BETA_GRID,
q_alpha=IMM_Q_ALPHA,
r_init=max(ols_residual_var, 1e-6),
p0_alpha=IMM_P0_ALPHA, p0_beta=IMM_P0_BETA,
r_floor=IMM_R_FLOOR, r_gamma=IMM_R_GAMMA,
clip_sigma=IMM_CLIP_SIGMA, nu=nu_init,
transition_prob=IMM_TRANSITION_PROB,
high_q_threshold=IMM_HIGH_Q_THRESHOLD,
phi_beta=IMM_PHI_BETA, phi_alpha=IMM_PHI_ALPHA,
obs_threshold=IMM_OBS_THRESHOLD,
mu_floor=IMM_MU_FLOOR,
)
kalman_result = kf.update(r_btc, r_alt)
kalman_state_out = kf.state_dict()
return {
# ... 现有字段不变 ...
'kalman_beta': kalman_result['beta'] if kalman_result else beta_ols,
'kalman_alpha': kalman_result['alpha'] if kalman_result else (alpha_ols if use_alpha else 0.0),
'kalman_P_beta': kalman_result['P_beta'] if kalman_result else 1.0,
'kalman_regime_score': kalman_result['regime_score'] if kalman_result else 0.0,
'kalman_effective_q': kalman_result['effective_q_beta'] if kalman_result else 1e-4,
'kalman_model_probs': kalman_result['model_probs'].tolist() if kalman_result else None,
'kalman_innovation': kalman_result['normalized_innovation'] if kalman_result else 0.0,
'kalman_clipped': kalman_result['clipped'] if kalman_result else False,
'kalman_observable': kalman_result['observable'] if kalman_result else True,
'kalman_state': kalman_state_out,
}
改动函数:analyze_pair_advanced() / analyze_multi_period()
透传方式与 v2 相同,将 v2 的 kalman_q_beta / kalman_innov_sq_ema 替换为 kalman_regime_score / kalman_effective_q / kalman_model_probs / kalman_observable。
4.2 IMM 体制检测(用途 1)
设计原理
v2 使用 BOCPD 监控 normalized innovation 的分布变化来推断体制切换,存在与 Q 自适应的信号竞争问题。
v3.1 直接使用 IMM 的 regime_score(高Q模型的总概率)作为体制指标。这是一个天然的贝叶斯后验概率,不需要额外的变点检测器。
regime_score 定义:
regime_score = Σ_{j: q_β^(j) ≥ q_high} μ_j
= P(β 正在以 ≥ q_high 的速率变化 | 历史数据)
| regime_score | 含义 | 动作 |
|---|---|---|
| < soft_prob (0.3) | 低Q模型主导,β 稳定 | 正常入场 |
| [soft_prob, hard_prob) | 高Q模型概率上升,β 开始变化 | 阈值缩放 |
| ≥ hard_prob (0.7) | 高Q模型主导,β 正在剧烈变化 | 硬拦截 |
可观测性保护:当 kalman_observable=False(r_btc ≈ 0)时,regime_score 保持上一步值,体制判定不做更新。
文件:src/trading/config.py
StrategyParams 新增字段(v3.1 简化版,替代 v2 的 BOCPD 参数):
@dataclass(frozen=True)
class StrategyParams:
# ... 现有字段 ...
# Beta 体制自适应过滤器(v3.1: 基于 IMM 模型概率)
beta_regime_enabled: bool = True
beta_regime_soft_prob: float = 0.3 # regime_score > 此值 → 开始缩放阈值
beta_regime_hard_prob: float = 0.7 # regime_score > 此值 → 硬拦截
beta_regime_scale_max: float = 2.0 # 阈值最大缩放倍数
beta_regime_warmup: int = 5 # 最少 N 根 4h K线才开始判定
对比 v2 参数量:v2 有 10 个 BOCPD 参数(hazard_rate, max_run_length, adaptive_hazard, hazard_min, hazard_max, hazard_innov_ref, soft_prob, hard_prob, scale_max, warmup),v3.1 仅需 4 个参数(soft_prob, hard_prob, scale_max, warmup)。
文件:src/trading/strategy.py
_BetaRegimeState 数据类
@dataclass
class _BetaRegimeState:
"""Beta 体制检测结果(v3.1: 基于 IMM 模型概率)"""
regime: str # 'stable' | 'expanding'
regime_score: float # P(高Q模型) ∈ [0, 1]
kalman_P_beta: float
effective_q_beta: float
threshold_scale: float # 阈值缩放因子 (>=1.0)
hard_block: bool
reason: str
observable: bool # r_btc 是否足够大(β 可观测)
_IMMRegimeDetector 类(替代 v2 的 _BOCPDDetector)
class _IMMRegimeDetector:
"""基于 IMM 模型概率的 Beta 体制检测器
直接使用 IMMKalmanBetaEstimator 输出的 regime_score
(高Q模型总概率)判定 β 是否处于快速变化状态。
相比 v2 的 BOCPD:
- 无信号竞争(体制信号是 IMM 的内生输出)
- 无额外参数(不需要 hazard rate, max_run_length 等)
- 校准更好(regime_score 是贝叶斯后验概率)
- 可观测性感知(r_btc ≈ 0 时不更新判定)
内存: O(1) per pair(仅存储最新状态)
计算: O(1) per check(简单阈值比较)
"""
def __init__(self):
self._pair_states: dict[PairKey, _BetaRegimeState] = {}
self._n_updates: dict[PairKey, int] = {}
def update(
self,
key: PairKey,
regime_score: float,
kalman_P_beta: float,
effective_q_beta: float,
kline_time: str,
soft_prob: float,
hard_prob: float,
scale_max: float,
warmup: int,
observable: bool = True,
) -> _BetaRegimeState:
"""根据 IMM regime_score 判定当前体制"""
n = self._n_updates.get(key, 0) + 1
self._n_updates[key] = n
if n < warmup:
state = _BetaRegimeState(
'stable', regime_score, kalman_P_beta,
effective_q_beta, 1.0, False, "数据不足", observable
)
self._pair_states[key] = state
return state
# 可观测性保护: r_btc ≈ 0 时保持上一步判定
if not observable:
prev = self._pair_states.get(key)
if prev is not None:
state = _BetaRegimeState(
prev.regime, prev.regime_score, kalman_P_beta,
effective_q_beta, prev.threshold_scale, prev.hard_block,
f"{prev.reason} (β不可观测,保持)", False
)
else:
state = _BetaRegimeState(
'stable', regime_score, kalman_P_beta,
effective_q_beta, 1.0, False, "β不可观测", False
)
self._pair_states[key] = state
return state
if regime_score >= hard_prob:
state = _BetaRegimeState(
'expanding', regime_score, kalman_P_beta,
effective_q_beta, scale_max, True,
f"Beta硬拦截: regime_score={regime_score:.3f}>={hard_prob} "
f"eff_Q={effective_q_beta:.1e}", True
)
elif regime_score >= soft_prob:
t = (regime_score - soft_prob) / max(hard_prob - soft_prob, 0.01)
t = min(max(t, 0.0), 1.0)
scale = 1.0 + t * (scale_max - 1.0)
state = _BetaRegimeState(
'expanding', regime_score, kalman_P_beta,
effective_q_beta, scale, False,
f"Beta缩放: regime_score={regime_score:.3f} scale={scale:.2f} "
f"eff_Q={effective_q_beta:.1e}", True
)
else:
state = _BetaRegimeState(
'stable', regime_score, kalman_P_beta,
effective_q_beta, 1.0, False,
f"Beta稳定: regime_score={regime_score:.3f} "
f"eff_Q={effective_q_beta:.1e}", True
)
self._pair_states[key] = state
return state
def get_all_states(self) -> dict[PairKey, _BetaRegimeState]:
return self._pair_states
def cleanup_pair(self, key: PairKey) -> None:
self._pair_states.pop(key, None)
self._n_updates.pop(key, None)
4.3 跨配对系统性风险聚合(v3.1 双阈值 + 确认窗口版)
设计原理
v2 使用 weighted_cp >= systemic_threshold × 0.8 将加权概率阈值耦合到计数比例阈值。v3 将两个阈值完全独立设定。v3.1 进一步新增确认窗口,要求连续 N 根 K 线都触发才执行全局拦截,避免单根 K 线噪声导致全系统停摆。
文件:src/trading/config.py
@dataclass(frozen=True)
class StrategyParams:
# ... 现有字段 ...
# 系统性风险参数(v3.1: 独立阈值 + 确认窗口)
systemic_risk_enabled: bool = True
systemic_ratio_threshold: float = 0.3 # expanding 配对比例阈值
systemic_weighted_threshold: float = 0.25 # 加权 regime_score 阈值(独立设定)
systemic_confirm_bars: int = 2 # 连续触发 N 根 K 线才拦截(防噪声)
文件:src/trading/strategy.py
class _SystemicRiskAggregator:
"""统计所有配对的 IMM 体制状态,
基于双独立阈值 + 确认窗口判断系统性风险。
v3.1 改进:
- ratio_threshold 和 weighted_threshold 完全独立(消除 0.8 魔术数字)
- 确认窗口: 连续 N 根 K 线触发才执行拦截(防止单根噪声停摆)
- 加权聚合支持仓位权重
"""
def __init__(self, enabled: bool = True):
self._enabled = enabled
self._consecutive_triggers = 0
def check(
self,
pair_states: dict[PairKey, _BetaRegimeState],
ratio_threshold: float = 0.3,
weighted_threshold: float = 0.25,
confirm_bars: int = 2,
position_weights: dict[PairKey, float] | None = None,
) -> tuple[bool, str]:
if not self._enabled or not pair_states:
self._consecutive_triggers = 0
return False, ""
total = len(pair_states)
n_expanding = sum(1 for s in pair_states.values() if s.regime == 'expanding')
ratio = n_expanding / total
# 加权平均 regime_score
if position_weights:
total_weight = sum(position_weights.get(k, 1.0) for k in pair_states)
weighted_score = sum(
position_weights.get(k, 1.0) * s.regime_score
for k, s in pair_states.items()
) / total_weight
else:
weighted_score = sum(
s.regime_score for s in pair_states.values()
) / total
# 双独立条件
ratio_triggered = ratio >= ratio_threshold
weighted_triggered = weighted_score >= weighted_threshold
if ratio_triggered or weighted_triggered:
self._consecutive_triggers += 1
if self._consecutive_triggers >= confirm_bars:
return True, (
f"系统性风险(连续{self._consecutive_triggers}次): "
f"{n_expanding}/{total} ({ratio:.0%}) 配对扩张态"
f"{'(比例触发)' if ratio_triggered else ''}, "
f"加权regime_score={weighted_score:.3f}"
f"{'(加权触发)' if weighted_triggered else ''}"
)
return False, (
f"系统性风险待确认({self._consecutive_triggers}/{confirm_bars}): "
f"{n_expanding}/{total} ({ratio:.0%})"
)
self._consecutive_triggers = 0
return False, ""
4.4 Beta 加权仓位计算(用途 2,与 v2 相同)
设计原理
标准配对交易的对冲方程:
spread_t = log(P_alt_t) - β × log(P_base_t) - α
要使 spread 对 P_base 的变动无敞口,两腿名义价值须满足:
alt_notional = |β| × base_notional
hedge_beta 选择逻辑
def resolve_hedge_beta(
kalman_beta: float | None,
kalman_P_beta: float | None,
ols_beta: float | None,
p_beta_max: float = HEDGE_BETA_P_MAX,
beta_min: float = HEDGE_BETA_MIN,
beta_max: float = HEDGE_BETA_MAX,
) -> tuple[float, str, bool]:
"""选择用于仓位计算的 hedge_beta
优先 Kalman β,不确定性过高时降级为 OLS β,最后兜底 β=1.0
Returns:
(hedge_beta, source, beta_negative)
hedge_beta: 绝对值,用于仓位比例计算
source: 'kalman' | 'ols' | 'default'
beta_negative: True 表示负相关配对,策略层需翻转方向
"""
raw_beta = None
source = 'default'
if kalman_beta is not None and kalman_P_beta is not None:
if kalman_P_beta <= p_beta_max:
raw_beta = kalman_beta
source = 'kalman'
else:
logger.debug(f"Kalman P_β={kalman_P_beta:.4f} > {p_beta_max},降级为 OLS β")
if raw_beta is None and ols_beta is not None:
raw_beta = ols_beta
source = 'ols'
if raw_beta is None:
return 1.0, 'default', False
beta_negative = raw_beta < 0
beta = np.clip(abs(raw_beta), beta_min, beta_max)
if beta_negative:
logger.warning(f"负 β 检测: raw_β={raw_beta:.4f}({source}),配对为反向相关")
return float(beta), source, beta_negative
文件:src/trading/risk_manager.py
改动 calculate_position_size() 增加 hedge_beta 参数(与 v2 相同):
def calculate_position_size(
self,
signal: PairTradeSignal,
alt_price: float,
base_price: float = 0.0,
available_balance: float = 0.0,
hedge_beta: float = 1.0,
hedge_beta_source: str = '',
) -> tuple[float, float]:
"""计算仓位大小(Beta 加权)
Beta 加权逻辑:
base_notional = total_position / (1 + |β|)
alt_notional = |β| × base_notional
总名义价值 = base_notional + alt_notional = total_position(保持不变)
当 β=1.0 时退化为等额(向后兼容)。
"""
# ... 现有 base_usd / 信号强度缩放 / 上限检查 / 余额检查 逻辑不变 ...
alt_size = 0.0
base_size = 0.0
if self._config.pair_mode == "pair" and base_price > 0 and alt_price > 0:
abs_beta = max(abs(hedge_beta), 0.1)
total_position = position_usd * 2
base_notional = total_position / (1.0 + abs_beta)
alt_notional = abs_beta * base_notional
alt_size = alt_notional / alt_price
base_size = base_notional / base_price
logger.info(
f"仓位计算(β加权): 基础=${base_usd:.0f} × 缩放={scale} | "
f"β={hedge_beta:.3f}({hedge_beta_source}) | "
f"Alt: {alt_size:.2f} × ${alt_price:.4f} ≈ ${alt_notional:.0f} | "
f"Base: {base_size:.2f} × ${base_price:.4f} ≈ ${base_notional:.0f} | "
f"比例 {alt_notional/base_notional:.2f}:1"
)
elif alt_price > 0:
alt_size = position_usd / alt_price
return alt_size, base_size
文件:src/trading/models.py
与 v2 相同:
@dataclass
class PairTradeSignal:
# ... 现有字段 ...
hedge_beta: float = 1.0
hedge_beta_source: str = 'default'
beta_negative: bool = False
@dataclass
class PairPosition:
# ... 现有字段 ...
entry_hedge_beta: float = 1.0
4.5 编排层集成
文件:src/trading/orchestrator.py
新增状态:_kalman_states: dict[PairKey, dict]
class TradingOrchestrator:
def __init__(self, ...):
# ... 现有 ...
self._kalman_states: dict[PairKey, dict] = {}
process_analysis() 改动:从 multi_period_result 提取 IMM 输出,传给 strategy.process_tick() 和 on_entry_signal():
def process_analysis(self, symbol, z4h, multi_period_result, timestamp, ...):
# ... 现有输入验证 ...
# [v3.1] 提取 IMM Kalman 输出
kalman_beta = multi_period_result.get('kalman_beta')
kalman_P_beta = multi_period_result.get('kalman_P_beta')
kalman_regime_score = multi_period_result.get('kalman_regime_score')
kalman_effective_q = multi_period_result.get('kalman_effective_q')
kalman_observable = multi_period_result.get('kalman_observable', True)
# [v3.1] 更新 Kalman 状态缓存
kalman_state_new = multi_period_result.get('kalman_state')
if kalman_state_new is not None:
self._kalman_states[(symbol, base_symbol)] = kalman_state_new
entry_signal, exit_signal = self._strategy.process_tick(
symbol, base_symbol, z4h, timestamp,
kline_time=kline_time, latest_price=price_for_log,
kalman_regime_score=kalman_regime_score,
kalman_P_beta=kalman_P_beta,
kalman_effective_q=kalman_effective_q,
kalman_observable=kalman_observable,
alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv,
)
if entry_signal is not None:
ols_beta = multi_period_result.get('details', {}).get(
('4h', '60d'), {}).get('cointegration_new', {}).get('beta')
hedge_beta, hedge_source, beta_negative = resolve_hedge_beta(
kalman_beta, kalman_P_beta, ols_beta
)
if beta_negative:
logger.warning(
f"负 β 配对 {symbol}|{base_symbol}: β={kalman_beta:.3f}, "
f"当前方向逻辑可能需要翻转,建议人工复核"
)
self.on_entry_signal(
symbol, multi_period_result,
latest_alt_price=price_for_log,
direction=entry_signal.direction,
adaptive_z=entry_signal.adaptive_z,
hedge_beta=hedge_beta,
hedge_beta_source=hedge_source,
beta_negative=beta_negative,
)
on_entry_signal() 改动与 v2 相同(传递 hedge_beta 到信号)。
4.6 策略层集成(用途 1)
文件:src/trading/strategy.py
__init__:
self._regime_detector = _IMMRegimeDetector() # v3.1: 替代 v2 的 _BOCPDDetector
self._systemic_aggregator = _SystemicRiskAggregator(enabled=True)
process_tick() 签名:
def process_tick(
self, symbol, base_symbol, z4h, timestamp,
kline_time=None, latest_price=None,
kalman_regime_score: float | None = None,
kalman_P_beta: float | None = None,
kalman_effective_q: float | None = None,
kalman_observable: bool = True,
alt_ohlcv=None, base_ohlcv=None,
) -> tuple[EntrySignal | None, ExitSignal | None]:
_check_entry() — z4h 过滤之后、方向判断之前:
# ── [v3.1] Beta 体制检查(基于 IMM 模型概率)──
if params.beta_regime_enabled and kalman_regime_score is not None:
beta_state = self._regime_detector.update(
key,
regime_score=kalman_regime_score,
kalman_P_beta=kalman_P_beta or 0.0,
effective_q_beta=kalman_effective_q or 1e-4,
kline_time=str(kline_time),
soft_prob=params.beta_regime_soft_prob,
hard_prob=params.beta_regime_hard_prob,
scale_max=params.beta_regime_scale_max,
warmup=params.beta_regime_warmup,
observable=kalman_observable,
)
if beta_state.hard_block:
logger.info(f"Beta体制硬拦截 | {pair_label} | {beta_state.reason} | "
f"az={adaptive_z:+.4f} z4h={z4h:+.4f}")
return None
all_states = self._regime_detector.get_all_states()
is_systemic, systemic_reason = self._systemic_aggregator.check(
all_states,
ratio_threshold=params.systemic_ratio_threshold,
weighted_threshold=params.systemic_weighted_threshold,
confirm_bars=params.systemic_confirm_bars,
)
if is_systemic:
logger.info(f"系统性风险拦截 | {pair_label} | {systemic_reason}")
return None
threshold_scale = beta_state.threshold_scale
else:
beta_state = None
threshold_scale = 1.0
# ── 方向判断(应用 Beta 缩放)──
threshold = params.adaptive_threshold * threshold_scale
if adaptive_z < -threshold:
direction = 'long'
elif adaptive_z > threshold:
direction = 'short'
else:
if threshold_scale > 1.01:
logger.info(f"Beta体制缩放拦截 | {pair_label} | "
f"az={adaptive_z:+.4f} 有效阈值={threshold:.2f} "
f"(原始={params.adaptive_threshold} ×{threshold_scale:.2f}) | "
f"{beta_state.reason if beta_state else ''}")
return None
cleanup_pair():
self._regime_detector.cleanup_pair(key)
5. 完整数据流
1. WebSocket 4h K线闭合
↓
2. realtime_kline_service_base 触发分析
↓
3. analyze_multi_period()
├─ calculate_cointegration_params_dual_window(kalman_state=缓存)
│ ├─ OLS 回归 → β_ols, spread, adf_pvalue(现有)
│ └─ [v3.1] IMM Kalman update:
│ ├─ Step 0: 可观测性检查(|r_btc| < ε_obs → 跳过 β 更新)
│ ├─ Step 1: 混合(带宽 TPM 加权)
│ ├─ Step 2: M=5 并行 OU 预测 + Kalman 更新(Student-t 似然)
│ ├─ Step 3: 贝叶斯模型概率更新(概率下限保护)
│ ├─ Step 4: Robbins-Monro 在线 R 估计(无截断偏差,用 μ̃ 加权)
│ └─ Step 5: 输出 beta, P_beta, regime_score, effective_q, model_probs, R, observable
├─ 健康监控 → Gate2(现有)
└─ 输出: multi_period_result(含 kalman_* 字段)
↓
4. orchestrator.process_analysis()
├─ 缓存 kalman_state → self._kalman_states[pair_key]
├─ 传递 kalman_regime_score, kalman_P_beta, kalman_observable → strategy.process_tick()
│ ├─ [用途 1] _IMMRegimeDetector.update(regime_score, observable) → 体制判定
│ │ └─ 硬拦截 / 阈值缩放 / 不可观测时保持(直接阈值比较,无 BOCPD)
│ └─ [用途 1] _SystemicRiskAggregator.check(confirm_bars) → 系统性拦截(需连续确认)
├─ 若产生 EntrySignal:
│ ├─ [用途 2] resolve_hedge_beta(kalman_beta, P_β, ols_beta)
│ └─ on_entry_signal(hedge_beta=...) → PairTradeSignal
↓
5. position_manager.open_position(signal)
├─ risk_manager.calculate_position_size(hedge_beta=signal.hedge_beta)
│ ├─ base_notional = total / (1 + |β|)
│ └─ alt_notional = |β| × base_notional
├─ executor.market_open(alt_size, base_size)
└─ PairPosition(entry_hedge_beta=β)
↓
6. 持久化
├─ pair_positions: 含 entry_hedge_beta
└─ trading_signals: 含 hedge_beta, hedge_beta_source, beta_negative
6. 场景模拟
A:正常市场(β 稳定) — 两个用途均正常
低Q模型(1e-6, 1e-5) 主导: μ = [0.35, 0.40, 0.20, 0.04, 0.01]
kalman_beta ≈ 0.45, P_β ≈ 0.003 (远低于 OU 稳态 P_∞≈2.5e-3)
用途 1: regime_score = μ[3]+μ[4] = 0.05 < soft_prob(0.3)
→ regime=STABLE, scale=1.0 → 正常入场
用途 2: hedge_beta=0.45(kalman) → alt_notional = 0.45 × base_notional
比等额更少的 Alt 暴露,正确反映 β<1 的弱相关性
B:β 开始飙升 — IMM 模型概率瞬时切换
T=0h: kalman_beta ≈ 0.5, μ = [0.35, 0.40, 0.20, 0.04, 0.01]
regime_score = 0.05 → 正常
T=4h: β 突变 → 高Q模型 likelihood 飙升
μ = [0.05, 0.10, 0.25, 0.40, 0.20]
regime_score = 0.60 > soft_prob(0.3) → EXPANDING, scale≈1.3
effective_q = 2.3e-3(自动切换到快速追踪)
OU 回归约束: β 向 β̄ 回归 2%/步,不会无限漂移
[对比 v2: 此时 Q_β 才刚开始 ×1.05 逐步放大, BOCPD 的信号还被 Q 调整削弱]
T=8h: β 继续变化 → 最高Q模型(1e-2)主导
μ = [0.01, 0.02, 0.07, 0.30, 0.60]
regime_score = 0.90 > hard_prob(0.7) → 硬拦截
[对比 v2: BOCPD P(变点)可能仍 < hard_prob(0.7), 因为信号被 Q 自适应部分吸收]
v3.1 vs v2 响应速度对比:
- v2:Q 自适应需 ~10 步才能放大 Q_β 到合理水平(受 κ_up=1.05 限制);BOCPD 信号被削弱后需更多步
- v3.1:IMM 模型概率在 1-2 步内 完成切换(贝叶斯后验即时更新),regime_score 同步升高
C:β 飙升后企稳 — 模型概率自动回归 + OU 回归
飙升期 regime_score = 0.90 → 硬拦截
企稳后:
高Q模型 likelihood 下降,低Q模型 likelihood 上升
μ 逐步回到 [0.30, 0.35, 0.25, 0.08, 0.02]
→ ~5-8 根正常 K线(20-32h)后 regime_score < soft_prob → 恢复
恢复后:
用途 1: regime=STABLE → 允许入场
用途 2: kalman_beta 已追踪到新 β
OU 过程使 β 向 β̄ 缓慢回归(Φ_β=0.98, 每步回归 2%)
若新 β 持续偏离 → x̄ 应在后续版本支持在线更新
D:β≈0.3 的弱相关配对 — hedge ratio 纠偏最显著
等额开仓: Alt $100 + Base $100 → Alt 非系统性风险暴露 $70(多余)
β 加权: Alt $46 + Base $154 → 正确对冲
E:Kalman 冷启动 — 降级策略
系统重启,kalman_state 丢失:
T=0: 用 OLS β 初始化所有 M=5 个模型,P₀ = diag(0.1, 1.0)
P_β = 1.0 > HEDGE_BETA_P_MAX(0.5) → 降级为 OLS β
μ = [1/5, 1/5, 1/5, 1/5, 1/5](均匀)
x̄ = [α_ols, β_ols](OU 长期均值设为 OLS 估计)
T=3-5 根 K线(12-20h):
模型概率分化,P_β 收敛(OU 保证收敛到稳态)→ 切回 Kalman β
F:系统性风险 + 确认窗口
T=0h: 20 配对中 8 个 regime_score > 0.3 → 40% > ratio_threshold=30%
→ 触发但确认计数=1 < confirm_bars=2 → 不拦截,记录日志
T=4h: 仍有 7 个 expanding → 确认计数=2 ≥ confirm_bars=2
→ 系统性风险拦截 → 所有配对暂停入场
→ IMM 仍在后台更新,恢复后使用最新 β
[对比 v3 原方案: 单根 K 线触发即拦截 → 可能因噪声导致不必要的停摆]
G:MEME vs L1 配对自适应对比
MEME 配对 (PURR/HYPE):
β 波动大 → 高Q模型(1e-3, 1e-2) 常获高概率
effective_q ≈ 5e-4 → 追踪快
regime_score ≈ 0.3-0.5 → 经常处于缩放区间(提高入场门槛)
per-pair ν ≈ 3.5(kurtosis ≈ 15)→ 重尾保护强
L1 配对 (ETH/BTC):
β 极稳定 → 低Q模型(1e-6, 1e-5) 主导
effective_q ≈ 5e-6 → 不追噪声
regime_score ≈ 0.02-0.05 → 几乎总是 stable(正常入场)
per-pair ν ≈ 8.0(kurtosis ≈ 6)→ 接近高斯
H:重尾异常值场景 — Student-t vs 高斯似然对比
稳定期 β≈0.5,某根 K线出现 5σ 异常收益率(闪崩/插针):
高斯似然:
5σ 事件: L_j = exp(-12.5) ≈ 3.7e-6(几乎为零)
→ 高Q模型 likelihood 相对飙升(因为高Q模型的 S 更大,惩罚更轻)
→ 模型概率瞬间切换到高Q,regime_score 可能从 0.05 → 0.70+
→ 误触发硬拦截(实际上 β 并未变化,只是一个异常值)
Student-t(ν=5) 似然:
5σ 事件: L_j ∝ (1 + 25/5)^{-3} ≈ 0.005(合理的小概率)
→ 各模型 likelihood 差异缩小(重尾下异常值不那么"不可能")
→ 模型概率温和调整,regime_score 从 0.05 → 0.15(仍在 stable)
→ 无误拦截
关键差异: 高斯下 4 个数量级的 likelihood 差距 → Student-t 下仅 1 个数量级
I:在线 R 自适应场景 — v3.1 无截断 vs v3 有截断
牛市期(收益率波动 σ≈3%):
R 通过 Robbins-Monro EMA 跟踪到 ≈ 9e-4
突然进入熊市恐慌(σ 从 3% 升到 8%):
v3.1 (无截断): R 在 ~20 步(~3天)内追踪到新波动率水平
v3 (有截断): R 类似追踪,但因 max(.,0) 导致轻微正偏差
σ 从 8% 回落到 2%(恐慌结束):
v3.1 (无截断): ε²-HPH' 频繁为负 → R 正常下降
v3 (有截断): ε²-HPH' < 0 时被截断为 0 → R 下降更慢(正偏差累积)
长期影响: v3 的 R 会系统性高估 → S 偏大 → 各模型 likelihood 差异缩小
→ regime_score 的区分度降低 → 体制检测灵敏度下降
J:r_btc ≈ 0(BTC 横盘)— 可观测性保护
BTC 连续 3 天窄幅震荡(|r_btc| < 1e-6):
无保护 (v3 原方案):
H = [1, ~0] → β 不可观测 → P_β 每步增长 Q_β
3 天 = 18 步 → P_β 增长 18 × Q_β
对 Q_β=1e-4 的模型: ΔP_β = 1.8e-3 → 可能触发 OLS 降级
模型概率更新基于无信息量的似然 → regime_score 随机波动
有保护 (v3.1):
|r_btc| < ε_obs → 跳过整个 IMM 更新
P_β 仅通过 OU 预测步增长,但被 Φ_β<1 抑制: P_β → P_∞(有稳态)
regime_score 保持上一步值(不做无意义的更新)
体制判定标记 observable=False → 不触发任何新的拦截决策
7. 改动文件清单
| 文件 | 改动 | 影响范围 |
|---|---|---|
src/config.py |
+13 IMM 常量(含 Φ_β, Φ_α, ε_obs, μ_floor, Student-t ν)+ 3 Hedge Ratio 常量(删除 v2 的 15 个 Kalman+Q 自适应常量) | 配置层 |
src/utils/analysis/analysis_core.py |
+_estimate_student_t_nu() 函数 + _KalmanModel 类(含 OU 预测)+ IMMKalmanBetaEstimator 类(含可观测性保护、无截断 R 更新、per-pair ν);增强 calculate_cointegration_params_dual_window(), analyze_pair_advanced(), analyze_multi_period() |
分析层 |
src/trading/config.py |
StrategyParams +4 体制检测字段 + 3 系统性风险字段(含 confirm_bars)(替代 v2 的 10 个 BOCPD 字段 + 3 自适应 H 字段);增强 get_strategy_params(), _build_strategy_params(), load_trading_config() |
配置层 |
src/trading/models.py |
PairTradeSignal +3 字段;PairPosition +1 字段(与 v2 相同) |
数据模型 |
src/trading/orchestrator.py |
+_kalman_states 字典;+resolve_hedge_beta() 函数;增强 process_analysis(), on_entry_signal();透传 kalman_observable |
编排层 |
src/trading/strategy.py |
+_BetaRegimeState(含 observable), +_IMMRegimeDetector(含可观测性保护), +_SystemicRiskAggregator(含确认窗口);增强 process_tick(), _check_entry(), cleanup_pair() |
策略层 |
src/trading/risk_manager.py |
增强 calculate_position_size() 支持 β 加权(与 v2 相同) |
风控层 |
src/trading/position_manager.py |
增强 _open_position_inner() 传递 hedge_beta(与 v2 相同) |
仓位管理 |
新增依赖:scipy.special.gammaln(Student-t 似然), scipy.stats.kurtosis(per-pair ν 初始化);scipy 已是项目依赖。
不改动:momentum_filter.py, executor.py
8. DB 改动
与 v2 相同:
pair_positions 表新增列
ALTER TABLE pair_positions ADD COLUMN entry_hedge_beta DOUBLE PRECISION DEFAULT 1.0;
trading_signals 表新增列
ALTER TABLE trading_signals ADD COLUMN hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta_source VARCHAR(10) DEFAULT 'default';
ALTER TABLE trading_signals ADD COLUMN beta_negative BOOLEAN DEFAULT FALSE;
9. 日志
| 时机 | 级别 | 格式 |
|---|---|---|
| 初始化 | INFO | Beta体制跟踪器初始化 | IMM M=5 Q_grid=[1e-6..1e-2] ν={nu:.1f} Φ_β=0.98 γ_R=0.02 p_stay=0.98 soft=0.30 hard=0.70 |
| IMM 更新 | DEBUG | IMM更新 | PURR|HYPE | β̂=1.20 P_β=0.005 regime=0.65 eff_Q=5.2e-4 R=8.3e-4 μ=[.01,.05,.25,.40,.29] obs=True |
| 不可观测 | DEBUG | IMM跳过β更新 | PURR|HYPE | |r_btc|=3e-7 < ε_obs=1e-6 → 保持regime=0.65 |
| hedge_beta 选择 | DEBUG | hedge_beta=0.45(kalman) P_β=0.003 | OLS_β=0.48 | negative=False |
| hedge_beta 降级 | DEBUG | Kalman P_β=0.62 > 0.50,降级为 OLS β=0.48 |
| 负 β 警告 | WARNING | 负β检测: PURR|HYPE raw_β=-0.30(kalman),配对为反向相关 |
| β 加权仓位 | INFO | 仓位计算(β加权) | β=0.45(kalman) | Alt ≈$62 | Base ≈$138 | 比例 0.45:1 |
| 硬拦截 | INFO | Beta体制硬拦截 | PURR|HYPE | regime_score=0.850>=0.70 eff_Q=3.2e-3 |
| 缩放拦截 | INFO | Beta体制缩放拦截 | PURR|HYPE | az=-4.50 有效阈值=4.00 (3.0×1.33) |
| 系统性待确认 | DEBUG | 系统性风险待确认(1/2): 8/20 (40%) 配对扩张态 |
| 系统性拦截 | INFO | 系统性风险(连续2次): 8/20 (40%) 配对扩张态(比例触发), 加权regime_score=0.350(加权触发) |
10. 风险与边界条件
| 风险 | 严重度 | 缓解 |
|---|---|---|
| 已持仓 hedge ratio 偏离 | 高 | 本次仅在入场时计算 β 比例;持仓期间 rebalance 作为 P0 后续 |
| β 符号翻转(kalman_beta < 0) | 高 | resolve_hedge_beta 返回 beta_negative 标记 + 警告日志;方向翻转逻辑作为 P1 后续 |
| OU 长期均值 x̄ 过时 | 中 | x̄ 初始化为 OLS 估计,β 真值长期偏离时 OU 会拉回旧值。缓解:Φ_β=0.98 的回归力很弱(每步仅 2%),高Q模型可临时克服回归力追踪新值。x̄ 在线更新作为 P1 后续 |
| IMM 模型概率塌缩(单模型概率→1.0) | 极低 | 转移矩阵保证至少 2% 概率流入 + 概率下限保护 μ_floor=1e-6;双重机制防止塌缩 |
| 重启丢失 Kalman 状态 | 中 | OLS 重新初始化 M 个模型,3-5 根 K线收敛;P_β 过大时自动降级为 OLS β;OU 保证 P_β 收敛到稳态 |
| IMM 计算量(5× 单 Kalman) | 低 | 2×2 状态 ×5 模型 ≈ 100 次浮点运算,4h 频率下完全可忽略 |
| Q_β 网格覆盖不足 | 低 | 5 个值覆盖 4 个数量级(1e-6 到 1e-2),已包含绝大部分实际场景 |
| 低波动期 r_btc≈0 | 低 | v3.1 可观测性保护:跳过 β 更新 + OU 约束 P_β 增长 |
| β 加权导致极端仓位 | 低 | HEDGE_BETA_MIN=0.1, HEDGE_BETA_MAX=5.0 上下限保护 |
| R 在线估计暂态振荡 | 低 | γ_R=0.02 的低通滤波 + R_floor 兜底;无截断偏差使 R 在暂态后准确收敛 |
| Per-pair ν 初始化偏差 | 低 | OLS 残差可能因样本量不足导致 kurtosis 估计偏差;ν 被 clip 到 [3,30];回退到默认 ν=5.0 |
不在本次范围:退场逻辑调整、持仓期间 hedge rebalance、HMM 体制分类、Kalman 持久化到 DB、飞书告警、负 β 方向翻转逻辑、x̄ 在线更新
11. 验证方案
单元测试
# test_imm_kalman.py
import numpy as np
def test_imm_convergence():
"""从初始值收敛到真实 [α, β]"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(200):
r_btc = rng.normal(0, 0.02)
r_alt = 0.001 + 1.0 * r_btc + rng.normal(0, 0.01)
result = kf.update(r_btc, r_alt)
assert abs(result['beta'] - 1.0) < 0.15
assert result['alpha'] > 0
def test_imm_ou_steady_state():
"""OU 过程: P_β 收敛到稳态,不会无界增长"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
p_betas = []
for _ in range(500):
r_btc = rng.normal(0, 0.02)
result = kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.01))
p_betas.append(result['P_beta'])
# P_β 应收敛(后 100 步的方差远小于前 100 步)
var_early = np.var(p_betas[50:150])
var_late = np.var(p_betas[400:500])
assert var_late < var_early * 0.5, f"P_β should stabilize: var_early={var_early}, var_late={var_late}"
# P_β 不应超过理论稳态上限(最大模型的 P_∞ ≈ 0.25)
assert max(p_betas[100:]) < 0.5, f"P_β should not grow unbounded: max={max(p_betas[100:])}"
def test_imm_ou_mean_reversion():
"""OU 过程: β 在无观测信息时向 β̄ 回归"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2,
obs_threshold=1e-6)
rng = np.random.default_rng(42)
# 先训练到 β≈1.0
for _ in range(100):
r_btc = rng.normal(0, 0.02)
kf.update(r_btc, 1.0 * r_btc + rng.normal(0, 0.01))
beta_before = kf.beta
# 30 步 r_btc=0(不可观测,触发 OU 仅预测)
for _ in range(30):
kf.update(0.0, rng.normal(0, 0.001))
beta_after = kf.beta
# β 应向 β̄=0.5 方向回归
assert abs(beta_after - 0.5) < abs(beta_before - 0.5), \
f"β should revert toward β̄=0.5: before={beta_before:.3f}, after={beta_after:.3f}"
def test_imm_regime_score_stable():
"""β 稳定时 regime_score 低"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(50):
r_btc = rng.normal(0, 0.02)
result = kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.01))
assert result['regime_score'] < 0.3, f"Stable β should have low regime_score: {result['regime_score']}"
def test_imm_regime_score_spike():
"""β 突变时 regime_score 立即飙升"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
for _ in range(5):
r_btc = rng.normal(0, 0.02)
result = kf.update(r_btc, 3.0 * r_btc + rng.normal(0, 0.01))
assert result['regime_score'] > 0.3, f"β jump should increase regime_score: {result['regime_score']}"
def test_imm_regime_score_recovery():
"""β 企稳后 regime_score 回落"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
for _ in range(10):
r_btc = rng.normal(0, 0.02)
kf.update(r_btc, 3.0 * r_btc + rng.normal(0, 0.01))
for _ in range(30):
r_btc = rng.normal(0, 0.02)
result = kf.update(r_btc, 3.0 * r_btc + rng.normal(0, 0.01))
assert result['regime_score'] < 0.3, f"After stabilization, regime_score should decrease: {result['regime_score']}"
def test_imm_effective_q_adaptation():
"""MEME 配对应有更高的 effective_q,L1 配对应有更低的"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
rng = np.random.default_rng(42)
kf_l1 = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
for _ in range(100):
r_btc = rng.normal(0, 0.02)
kf_l1.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.005))
q_l1 = kf_l1.effective_q_beta
kf_meme = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
beta_val = 0.5
for i in range(100):
if i % 10 == 0:
beta_val += rng.normal(0, 0.5)
r_btc = rng.normal(0, 0.02)
kf_meme.update(r_btc, beta_val * r_btc + rng.normal(0, 0.01))
q_meme = kf_meme.effective_q_beta
assert q_meme > q_l1, f"MEME should have higher effective_q: {q_meme} vs {q_l1}"
def test_imm_model_probs_sum_to_one():
"""模型概率总和始终为 1"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(200):
r_btc = rng.normal(0, 0.02)
result = kf.update(r_btc, rng.normal(0, 0.03))
assert abs(sum(result['model_probs']) - 1.0) < 1e-10
def test_imm_huber_clipping():
"""极端 innovation 被截断,β 不过度跳变"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, clip_sigma=3.0)
rng = np.random.default_rng(42)
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
result = kf.update(0.02, 20.0 * 0.02)
assert result['clipped'] is True
assert result['beta'] < 5.0
def test_imm_joseph_positive():
"""Joseph 形式保证 P 始终非负"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-6)
rng = np.random.default_rng(42)
for _ in range(500):
result = kf.update(rng.normal(0, 0.05), 0.5 * rng.normal(0, 0.05) + rng.normal(0, 0.001))
assert result['P_beta'] >= 0 and result['P_alpha'] >= 0
def test_imm_alpha_absorbs_drift():
"""α 吸收独立漂移,β 不被污染"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(200):
result = kf.update(rng.normal(0, 0.02), 0.005 + 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
assert abs(result['beta'] - 0.5) < 0.2
assert result['alpha'] > 0.001
def test_imm_observability_guard():
"""r_btc ≈ 0 时跳过 β 更新,标记 observable=False"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2,
obs_threshold=1e-6)
rng = np.random.default_rng(42)
# 正常更新
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
rs_before = kf.regime_score
beta_before = kf.beta
# r_btc ≈ 0
result = kf.update(1e-8, rng.normal(0, 0.01))
assert result['observable'] is False
# regime_score 应保持不变(不更新模型概率)
assert abs(result['regime_score'] - rs_before) < 1e-10
def test_imm_state_persistence():
"""状态导出/恢复后行为一致(含 OU 参数)"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf1 = IMMKalmanBetaEstimator(
alpha_init=0.0, beta_init=0.5, r_init=1e-2,
nu=7.0, r_gamma=0.05, clip_sigma=2.5, r_floor=1e-7,
phi_beta=0.97, phi_alpha=0.99
)
kf1.update(0.02, 0.01)
state = kf1.state_dict()
assert state['nu'] == 7.0
assert state['r_gamma'] == 0.05
assert state['phi_vec'] == [0.99, 0.97]
kf2 = IMMKalmanBetaEstimator.from_state_dict(state)
r1, r2 = kf1.update(0.03, 0.015), kf2.update(0.03, 0.015)
assert abs(r1['beta'] - r2['beta']) < 1e-10
assert abs(r1['R'] - r2['R']) < 1e-15
def test_imm_no_signal_competition():
"""v3.1 核心验证: regime_score 不会被 β 追踪吸收"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
for i in range(50):
r_btc = 0.01 * (1 if i % 2 == 0 else -1)
kf.update(r_btc, 0.5 * r_btc)
regime_scores_during_change = []
beta_val = 0.5
for i in range(20):
beta_val += 0.1
r_btc = 0.015 * (1 if i % 2 == 0 else -1)
result = kf.update(r_btc, beta_val * r_btc)
regime_scores_during_change.append(result['regime_score'])
max_score = max(regime_scores_during_change)
assert max_score > 0.5, (
f"During β change, regime_score should be high (got max={max_score}). "
f"If this fails, there may be signal competition."
)
def test_imm_student_t_robustness():
"""Student-t 似然使模型概率对异常值更鲁棒"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf_t = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, nu=5.0)
kf_g = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, nu=1000.0)
for i in range(50):
r_btc = 0.01 * (1 if i % 2 == 0 else -1)
kf_t.update(r_btc, 0.5 * r_btc + 0.001)
kf_g.update(r_btc, 0.5 * r_btc + 0.001)
rs_t_before = kf_t.regime_score
rs_g_before = kf_g.regime_score
result_t = kf_t.update(0.02, 0.5 * 0.02 + 0.15)
result_g = kf_g.update(0.02, 0.5 * 0.02 + 0.15)
delta_t = result_t['regime_score'] - rs_t_before
delta_g = result_g['regime_score'] - rs_g_before
assert delta_t < delta_g, (
f"Student-t should be more robust: Δ_t={delta_t:.4f} vs Δ_g={delta_g:.4f}"
)
def test_imm_online_r_no_bias():
"""v3.1: 无截断 R 估计在噪声下降时正确减小 R"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
rng = np.random.default_rng(42)
kf = IMMKalmanBetaEstimator(
alpha_init=0.0, beta_init=0.5, r_init=1e-4, r_gamma=0.05
)
# 高噪声期
for _ in range(50):
r_btc = rng.normal(0, 0.02)
kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.05))
R_high = kf._models[0].R
# 低噪声期
for _ in range(80):
r_btc = rng.normal(0, 0.02)
kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.005))
R_low = kf._models[0].R
assert R_low < R_high * 0.5, (
f"R should decrease when noise drops: R_high={R_high:.6f}, R_low={R_low:.6f}"
)
def test_imm_per_pair_nu():
"""Per-pair ν: 高 kurtosis 数据应得到更小的 ν"""
from src.utils.analysis.analysis_core import _estimate_student_t_nu
# 正态残差 → 高 ν
rng = np.random.default_rng(42)
normal_resid = rng.normal(0, 1, 200)
nu_normal = _estimate_student_t_nu(normal_resid)
assert nu_normal > 10, f"Normal data should give high ν: {nu_normal}"
# 重尾残差 → 低 ν
heavy_resid = rng.standard_t(df=3, size=200)
nu_heavy = _estimate_student_t_nu(heavy_resid)
assert nu_heavy < 7, f"Heavy-tailed data should give low ν: {nu_heavy}"
assert nu_heavy < nu_normal, f"Heavy-tail ν should be smaller: {nu_heavy} vs {nu_normal}"
def test_imm_bandwidth_tpm():
"""带宽 TPM: 相邻模型跳转概率 > 远端模型"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
tpm = kf._TPM
assert tpm[2, 1] > tpm[2, 0], "Adjacent model should have higher transition prob"
assert tpm[2, 3] > tpm[2, 4], "Adjacent model should have higher transition prob"
for i in range(kf.M):
assert abs(tpm[i].sum() - 1.0) < 1e-10, f"Row {i} should sum to 1"
def test_imm_model_prob_floor():
"""模型概率下限保护: 长期运行后没有模型概率降为 0"""
from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
for i in range(500):
r_btc = 0.01 * (1 if i % 2 == 0 else -1)
result = kf.update(r_btc, 0.5 * r_btc)
for j, p in enumerate(result['model_probs']):
assert p >= 1e-6, f"Model {j} prob should not collapse to 0: {p}"
# test_hedge_beta.py(与 v2 相同)
import numpy as np
def test_resolve_hedge_beta_kalman():
from src.trading.orchestrator import resolve_hedge_beta
beta, source, negative = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.01, ols_beta=0.5)
assert source == 'kalman'
assert abs(beta - 0.45) < 0.01
assert negative is False
def test_resolve_hedge_beta_fallback_ols():
from src.trading.orchestrator import resolve_hedge_beta
beta, source, negative = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.8, ols_beta=0.5)
assert source == 'ols'
assert abs(beta - 0.5) < 0.01
def test_resolve_hedge_beta_fallback_default():
from src.trading.orchestrator import resolve_hedge_beta
beta, source, negative = resolve_hedge_beta(kalman_beta=None, kalman_P_beta=None, ols_beta=None)
assert source == 'default'
assert beta == 1.0
assert negative is False
def test_resolve_hedge_beta_clipping():
from src.trading.orchestrator import resolve_hedge_beta
beta, _, _ = resolve_hedge_beta(kalman_beta=10.0, kalman_P_beta=0.01, ols_beta=0.5)
assert beta == 5.0
beta, _, _ = resolve_hedge_beta(kalman_beta=0.01, kalman_P_beta=0.01, ols_beta=0.5)
assert beta == 0.1
def test_resolve_hedge_beta_negative():
from src.trading.orchestrator import resolve_hedge_beta
beta, source, negative = resolve_hedge_beta(kalman_beta=-0.8, kalman_P_beta=0.01, ols_beta=0.5)
assert beta == 0.8
assert source == 'kalman'
assert negative is True
def test_position_size_beta_weighted():
abs_beta = 0.5
total = 200.0
base_notional = total / (1.0 + abs_beta)
alt_notional = abs_beta * base_notional
assert abs(base_notional - 133.33) < 0.5
assert abs(alt_notional - 66.67) < 0.5
assert abs(base_notional + alt_notional - total) < 0.01
def test_position_size_beta_one():
abs_beta = 1.0
total = 200.0
base_notional = total / (1.0 + abs_beta)
alt_notional = abs_beta * base_notional
assert abs(base_notional - 100.0) < 0.01
assert abs(alt_notional - 100.0) < 0.01
# test_regime_detector.py
import numpy as np
def test_regime_detector_stable():
from src.trading.strategy import _IMMRegimeDetector
d = _IMMRegimeDetector()
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
for i in range(10):
s = d.update(key, regime_score=0.05, kalman_P_beta=0.01,
effective_q_beta=1e-5, kline_time=f"2024-01-01T{i*4:02d}:00:00",
soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
assert s.regime == 'stable' and s.regime_score < 0.3
def test_regime_detector_expanding():
from src.trading.strategy import _IMMRegimeDetector
d = _IMMRegimeDetector()
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
for i in range(10):
s = d.update(key, regime_score=0.50, kalman_P_beta=0.05,
effective_q_beta=5e-4, kline_time=f"2024-01-01T{i*4:02d}:00:00",
soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
assert s.regime == 'expanding' and not s.hard_block
assert s.threshold_scale > 1.0
def test_regime_detector_hard_block():
from src.trading.strategy import _IMMRegimeDetector
d = _IMMRegimeDetector()
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
for i in range(10):
s = d.update(key, regime_score=0.85, kalman_P_beta=0.1,
effective_q_beta=3e-3, kline_time=f"2024-01-01T{i*4:02d}:00:00",
soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
assert s.hard_block and s.regime_score > 0.7
def test_regime_detector_warmup():
from src.trading.strategy import _IMMRegimeDetector
d = _IMMRegimeDetector()
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
for i in range(3):
s = d.update(key, regime_score=0.90, kalman_P_beta=0.1,
effective_q_beta=5e-3, kline_time=f"2024-01-01T{i*4:02d}:00:00",
soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
assert s.regime == 'stable' and s.reason == "数据不足"
def test_regime_detector_unobservable():
"""不可观测时保持上一步判定"""
from src.trading.strategy import _IMMRegimeDetector
d = _IMMRegimeDetector()
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
# 先建立 expanding 状态
for i in range(10):
d.update(key, regime_score=0.60, kalman_P_beta=0.05,
effective_q_beta=5e-4, kline_time=f"2024-01-01T{i*4:02d}:00:00",
soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5, observable=True)
# 不可观测
s = d.update(key, regime_score=0.01, kalman_P_beta=0.05,
effective_q_beta=5e-4, kline_time="2024-01-01T40:00:00",
soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5, observable=False)
assert s.regime == 'expanding', "Should keep previous regime when unobservable"
assert not s.observable
# test_systemic_risk.py
def test_systemic_risk_with_confirmation():
"""确认窗口: 需连续 2 次触发才拦截"""
from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
agg = _SystemicRiskAggregator(enabled=True)
states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
'expanding' if i < 4 else 'stable', 0.5 if i < 4 else 0.1,
0.01, 5e-4 if i < 4 else 1e-5, 1.5 if i < 4 else 1.0, False, "", True
) for i in range(10)}
# 第 1 次触发 → 不拦截
triggered, _ = agg.check(states, ratio_threshold=0.3, weighted_threshold=0.25, confirm_bars=2)
assert not triggered
# 第 2 次触发 → 拦截
triggered, reason = agg.check(states, ratio_threshold=0.3, weighted_threshold=0.25, confirm_bars=2)
assert triggered
assert "连续2次" in reason
def test_systemic_risk_reset_on_recovery():
"""恢复后确认计数归零"""
from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
agg = _SystemicRiskAggregator(enabled=True)
expanding_states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
'expanding' if i < 4 else 'stable', 0.5 if i < 4 else 0.1,
0.01, 5e-4, 1.5, False, "", True
) for i in range(10)}
stable_states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
'stable', 0.05, 0.01, 1e-5, 1.0, False, "", True
) for i in range(10)}
# 触发 1 次
agg.check(expanding_states, confirm_bars=2)
# 恢复
agg.check(stable_states, confirm_bars=2)
# 再触发 1 次 → 不应拦截(计数已重置)
triggered, _ = agg.check(expanding_states, confirm_bars=2)
assert not triggered
def test_systemic_risk_no_trigger():
from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
agg = _SystemicRiskAggregator(enabled=True)
states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
'expanding' if i < 2 else 'stable', 0.4 if i < 2 else 0.05,
0.01, 3e-4 if i < 2 else 1e-5, 1.5 if i < 2 else 1.0, False, "", True
) for i in range(10)}
assert not agg.check(states, ratio_threshold=0.3, weighted_threshold=0.25, confirm_bars=2)[0]
集成验证
- 回测:在 β 飙升的历史区间回测,验证 regime_score 的检测延迟;对比等额 vs β 加权的 PnL 差异;对比 v2(BOCPD + Q 自适应)vs v3.1(IMM + OU)的检测延迟和假阳性率;对比 Student-t vs 高斯似然在闪崩/插针期间的误拦截率
- 实盘观察:监控 kalman_beta vs OLS β 的偏离度、hedge_beta_source 分布、P_β 走势(应收敛到 OU 稳态)、model_probs 分布、effective_q_beta 差异(MEME >> L1)、per-pair ν 分布、R 在线估计值的走势、observable 标记的频率
- A/B 对比(可选):同时运行等额和 β 加权,对比 hedge 效果和回撤
- 消融实验(推荐):分别关闭各改进组件,量化每项的独立贡献:
- OU vs 随机游走:设
Φ_β=1.0(退化为随机游走)vsΦ_β=0.98,对比 P_β 的长期行为 - Student-t vs 高斯:设
ν=1000(近似高斯)vs per-pair ν - Per-pair ν vs 固定 ν:全部用 ν=5 vs 自动推算
- 在线 R vs 固定 R:设
γ_R=0vsγ_R=0.02 - 无截断 R vs 有截断 R:v3.1 vs v3 原方案(加
max(.,0)),对比 R 在波动率下降期的跟踪速度 - 带宽 TPM vs 均匀 TPM:直接对比
- 可观测性保护 vs 无保护:对比 BTC 横盘期间 regime_score 的稳定性
- 确认窗口 vs 无确认:对比系统性拦截的误触发率
- 概率下限 vs 无下限:设
μ_floor=0vsμ_floor=1e-6
- OU vs 随机游走:设
12. 后续演进
| 优先级 | 方向 | 预期收益 | 依赖 |
|---|---|---|---|
| P0 | 退场保护(β 飙升时收紧止损) | 减少已持仓亏损 | 本方案 |
| P0 | 持仓期间 hedge rebalance(β 偏离阈值时调整两腿比例) | 维持对冲质量 | 本方案 |
| P0 | Kalman 状态持久化到 DB | 消除冷启动窗口 | 本方案 |
| P1 | VB-AKF 联合 Q+R 后验估计(Sarkka 2009, Huang 2017) | 消除 Q 网格和 Robbins-Monro 两个启发式组件;理论最优的噪声参数估计 | 替代本方案的 Q 网格 + R EMA |
| P1 | 负 β 方向翻转逻辑 | 正确处理反向相关配对 | 本方案 beta_negative |
| P1 | x̄ 在线更新(OU 长期均值随 β 缓慢迁移) | 防止 β 真值长期偏离后 OU 回归力变成阻力 | 本方案 |
| P1 | ν 在线自适应(监控残差 kurtosis 动态调整 Student-t 自由度) | 适配市场微结构变化(如波动率聚集导致 kurtosis 时变) | 本方案 per-pair ν |
| P1 | 跨配对层次化估计(同赛道共享 Q 超先验) | 加速冷启动,提高数据稀疏时的估计精度 | 本方案 |
| P2 | Robust Student-t 过程噪声(Agamennoni 2012) | 同时保护观测层和状态转移层的重尾鲁棒性 | 本方案 |
| P2 | BOCPD 补充检测器(监控 IMM 加权 innovation) | 检测非线性断裂(如 β 正→负) | 本方案 |
| P2 | HMM 体制分类(多体制缩放) | 更精细的体制控制 | 本方案 |
| P2 | IMM-ATP 自适应转移概率(在线学习 p_stay) | 消除固定 p_stay=0.98 的假设 | 本方案 |
| P2 | MS-SSM 统一框架(Regime-Switching SSM) | 理论最优 | 替换本方案架构 |
| P3 | 飞书告警(体制切换 + hedge_beta 变化) | 人工监控 | 本方案 |
| P3 | 多频率融合(4h + daily) | 更鲁棒的 β 估计 | 本方案 |
v3.1 vs v3 改进汇总
| 改进项 | v3 原方案 | v3.1 | 理论依据 |
|---|---|---|---|
| 状态方程 | 随机游走 | OU 均值回归 | P_β 有稳态解 P_∞=Q/(1-Φ²) |
| Student-t ν | 固定 ν=5 | Per-pair 从 kurtosis 推算 | 不同币对尾部厚度差异 |
| R 更新 | max(ε²-HPH', 0) 截断 | 无截断 + R_floor | 消除与 Sage-Husa 同性质的正偏差 |
| R 更新权重 | 后验 μ | 预测 μ̃ | 避免似然→R→似然的循环依赖 |
| 可观测性 | 无保护 | |r_btc| < ε 时跳过 β 更新 | 防止不可观测时 P_β 膨胀 |
| 系统性风险 | 单次触发拦截 | 确认窗口(连续 N 次) | 防止单根 K 线噪声停摆 |
| 参数量 | 15 | 17 (+Φ_β, Φ_α, ε_obs) | 均有明确物理含义 |