Kalman Beta 双用途设计方案(v3.1 — IMM + OU 架构版)

Kalman Beta 双用途设计方案(v3.1 — IMM + OU 架构版)

1. 问题背景

当前系统在 Beta 值的使用上存在多个结构性缺陷:

缺陷 1:Beta 估计滞后

OLS BETA_WINDOW=100 固定窗口(≈17天@4h),β 结构性变化后系统需要一周以上才能感知:

T+0h    BTC 回暖,β 从 0.5 开始上升
T+12h   β 实际 2.0,OLS β̂ 仍 ≈ 0.6 → 误判为均值回归机会
T+24h   β 实际 4.0,OLS β̂ ≈ 0.8 → 继续误入场
T+7d    β 实际 8.0,OLS β̂ ≈ 2.0 → 协整假设彻底失效

核心问题:系统用滞后 β 计算的 z4h 产生入场信号,实际上反映的是 β 正在变化,而非 spread 正在回归

缺陷 2:仓位比例未使用 Beta

risk_manager.calculate_position_size() 对两腿使用等额名义价值

# 当前实现(risk_manager.py:164-170)
alt_size = position_usd / alt_price      # 两腿等额
base_size = position_usd / base_price    # 与 β 无关

配对交易的对冲比例应为 alt_notional = β × base_notional。等额开仓意味着:

  • β=0.5 时,Alt 腿过重 → 暴露于 Alt 的非系统性风险
  • β=2.0 时,Alt 腿过轻 → 对冲不足,价差波动被放大
  • β 变化时,无法调整比例 → 持仓期间 hedge 持续偏离

缺陷 3:固定 Q 无法适配不同币对

不同配对的 β 动态特征差异极大:

  • MEME 配对(如 PURR/HYPE):β 波动剧烈,需要更大的过程噪声 Q 来追踪
  • L1 配对(如 ETH/BTC):β 非常稳定,过大的 Q 会引入不必要的噪声

固定 q_β=1e-4 无法同时满足两类配对的需求。

缺陷 4:v2 的 Q 自适应与 BOCPD 信号竞争(v3 新增)

v2 方案中存在一个未被识别的设计矛盾

Q 自适应的目标: 使 normalized innovation → N(0,1)
BOCPD 的输入:    normalized innovation 的分布变化

两者对抗: Q 自适应越好 → innovation 越稳定 → BOCPD 越难检测变点

具体表现:

β 开始飙升
→ innovation 变大
→ Q 自适应增大 Q_β(追踪变化)
→ innovation 被 Q 调整"吸收",逐步回到 N(0,1)
→ BOCPD 看到的变点信号被削弱
→ 检测延迟甚至漏报

v2 通过 κ_up=1.05(每步仅放大 5%)故意放慢 Q 自适应来缓解,但这意味着 Q 自适应的效果也被人为削弱——两个组件互相妥协,都无法达到最优

缺陷 5:v2 的 Q 自适应依赖启发式超参数(v3 新增)

v2 的 Innovation-based Q 自适应(Mehra 1970)本质是带阈值的乘法调节器:

ν̄ > γ_upper(2.0) → Q_β *= κ_up(1.05)
ν̄ < γ_lower(0.3) → Q_β *= κ_down(0.98)

8 个超参数(η, γ_upper, γ_lower, κ_up, κ_down, q_ceil, q_floor, q₀)全部缺乏理论依据,且存在交互效应。加上 BOCPD 的 10 个参数,v2 共有 28 个需要手调的参数

缺陷 6:高斯似然对加密货币重尾分布失效(v3 新增)

v2 的 Kalman Filter 及 BOCPD 均假设 innovation 服从高斯分布。但加密货币收益率是典型的重尾分布(kurtosis 通常 5-20):

高斯分布下 5σ 事件概率:    ≈ 3e-7
Student-t(ν=5) 下 5σ 事件概率: ≈ 1e-3

差距: ~4 个数量级

影响:

  • 模型概率对异常值过度敏感:一个重尾正常值在高斯似然下被判为"极不可能",导致模型概率剧烈跳变
  • Huber clipping 只保护了状态更新,未保护模型概率更新(似然仍用原始 innovation)
  • 模型概率频繁震荡 → regime_score 不稳定 → 误拦截或漏拦截

解决方案:将似然函数替换为 Student-t 分布(Roth et al. 2013; Agamennoni et al. 2012),仅改似然计算,Kalman 更新步骤不变。

缺陷 7:随机游走状态方程导致协方差无界增长(v3.1 新增)

v2/v3 原方案使用纯随机游走状态方程 x_t = x_{t-1} + w_t,导致:

P_β 的递推:
  P_pred = P + Q
  P_post = (1 - KH) × P_pred

当 r_btc ≈ 0(BTC 横盘)时:
  H = [1, ~0] → β 不可观测
  K_β ≈ 0 → P_β 每步增长 Q_β
  → P_β 线性增长,永无稳态

影响:

  • 长期运行后 P_β 漂移:β 估计的不确定性无稳态解,取决于何时启动
  • 低波动期 P_β 膨胀:BTC 横盘数天后 P_β 可能超过降级阈值,导致不必要地回退到 OLS
  • β 估计可漂移至任意值:无均值回归约束,纯随机游走允许 β 无界漂移

解决方案:引入 OU(Ornstein-Uhlenbeck)均值回归状态方程,仅新增 1 个参数 Φ_β,提供 P_β 稳态解 P_∞ = Q/(1-Φ²)

当前系统弱点汇总

组件 问题 影响
analysis_core.py BETA_WINDOW=100 固定窗口 OLS β 估计滞后 ≈17天
risk_manager.py 等额名义价值,未使用 β 对冲比例错误
strategy.py adaptive_threshold=3.0 固定 β 飙升导致持续突破阈值
momentum_filter.py 只检测单腿价格趋势 不检测 spread/beta 体制切换
v2 Q 自适应 启发式阈值 + 与 BOCPD 信号竞争 两个组件互相削弱
v2 Sage-Husa R 仅正值更新导致正偏差 R 长期被高估
v2 高斯似然 加密货币重尾分布下过度敏感 模型概率频繁震荡
v2/v3 随机游走 P_β 无稳态解,低波动期膨胀 长期运行后降级到 OLS

2. 方案概述

核心设计IMM(Interacting Multiple Model)Kalman Filter + OU 均值回归状态方程 估计时变 [α, β],其输出 同时服务两个用途

                   ┌──────────────────────────────────────────────────────┐
                   │  IMMKalmanBetaEstimator (4h)                         │
                   │  输入: r_btc, r_alt (对数收益率)                       │
                   │  特性: M=5 并行模型, OU 状态方程, Student-t 似然,       │
                   │       Per-pair ν, 在线 R 估计, 可观测性保护            │
                   │  输出: beta, P_beta, model_probs, regime_score        │
                   └──────────────┬───────────────────────────────────────┘
                                  │
                  ┌───────────────┼───────────────┐
                  ▼                               ▼
         用途 1: 体制检测                    用途 2: Hedge Ratio
         ┌─────────────────┐            ┌─────────────────────┐
         │ IMM 模型概率判定  │            │ 仓位比例计算          │
         │ P(高Q模型) 阈值   │            │ 输入: beta           │
         │ 输出: regime_score│            │ 输出: alt/base 数量比 │
         └────────┬────────┘            └──────────┬──────────┘
                  │                                │
                  ▼                                ▼
         入场信号过滤                         Beta 加权开仓
         (硬拦截/阈值缩放)              (alt_notional = β × base_notional)
                  │
                  ▼
         跨配对系统性风险聚合
         (加权 regime_score > 阈值 → 全局拦截)

v3.1 相比 v2 的核心算法升级

组件 v2 v3.1 提升
β 估计 单一 Kalman + 启发式 Q 自适应 IMM 多模型融合 消除 Q 自适应的 8 个启发式超参;贝叶斯最优模型选择
状态方程 随机游走 x_t = x_{t-1} + w_t OU 均值回归 x_t = Φx_{t-1} + (I-Φ)x̄ + w_t P_β 有稳态解;β 长期运行不漂移
体制检测 BOCPD(与 Q 自适应信号竞争) IMM 模型概率(体制信号内生) 消除信号竞争;减少 10 个 BOCPD 参数
似然函数 高斯(对重尾过敏) Student-t(per-pair ν 自适应初始化) 模型概率对加密货币异常值鲁棒;不同币对自动适配
R 估计 Sage-Husa(正偏差) Robbins-Monro EMA(γ=0.02,无截断偏差) 消除正偏差;在线追踪观测噪声变化
转移概率 带宽 TPM(相邻模型更可能跳转) 符合 β 渐变的物理特征
可观测性 无保护 r_btc ≈ 0 时跳过 β 更新 防止低波动期 P_β 膨胀和 regime_score 失真
系统性风险 计数比例 OR 加权概率(阈值耦合) 双阈值独立 + 确认窗口 消除魔术数字;防止单根 K 线噪声导致全局停摆
负 β 处理 返回符号信息 不变
总参数量 28 个(18 Kalman + 10 BOCPD) 17 个 减少 39%(均有理论依据)

为什么 IMM 能同时解决 Q 自适应和体制检测

IMM 的关键洞察:不需要先"猜对" Q 再检测变点,而是让多个 Q 值并行竞争,贝叶斯概率自动选出最优

MEME 配对(β 波动大):
  高Q模型(1e-3, 1e-2) 持续获得高概率 → effective_Q 自动偏大
  → 追踪快,且 regime_score 高 → 自然提高入场门槛

L1 配对(β 稳定):
  低Q模型(1e-6, 1e-5) 持续获得高概率 → effective_Q 自动偏小
  → 不追噪声,且 regime_score 低 → 正常入场

β 突变时:
  高Q模型的 likelihood 瞬间飙升 → 模型概率快速切换
  → regime_score 立即升高 → 拦截入场
  → 同时 β 估计通过概率加权自动跟上新值

与 v2 的根本区别:v2 中 Q 自适应和变点检测是两个独立系统在抢同一个信号;v3.1 中它们是同一个贝叶斯推断的两个输出,不存在竞争。

v3.1 的附加改进(相比 v3 原方案)

  • OU 均值回归状态方程:P_β 有稳态解 P_∞ = Q/(1-Φ²),消除长期协方差漂移
  • Per-pair Student-t ν 初始化:基于历史残差 kurtosis 自动设定,不同币对自动适配尾部厚度
  • Robbins-Monro 无截断偏差:去掉 max(., 0) 截断,仅用 R_floor 保护正定,消除与 Sage-Husa 相同性质的正偏差
  • 可观测性保护|r_btc| < ε_obs 时跳过 β 更新,防止 regime_score 在低信息量时失真
  • 系统性风险确认窗口:连续 N 根 K 线触发才执行全局拦截,避免单根 K 线噪声导致停摆
  • 模型概率下限保护μ_floor=1e-6 防止长期运行后某模型概率下溢到 0

为什么 4h 频率足够

β 是两个资产之间的结构性关系(同赛道、资金流向、基本面联动),变化周期在天~周级别。

方法 响应 β 结构性变化 抗日内噪声 适用场景
OLS 100×4h ~7-10 天 当前系统(过于滞后)
4h IMM Kalman 1-4 根K线(4-16h) 体制检测 + Hedge Ratio
5m Kalman ~15-30 分钟 弱(追噪声) 不适用于 β 估计

IMM 的响应速度优于 v2 的单一自适应 Kalman:β 突变时,高Q 模型立即拟合新关系,概率权重瞬间切换,无需等 Q 自适应的逐步放大过程。

四层架构

组件 功能 v3.1 改进
第一层 IMM Kalman Filter M 个并行 Kalman 估计 [α, β],OU 状态方程 + Student-t 似然 + 在线 R + 带宽 TPM + 可观测性保护 + 贝叶斯模型概率融合 替代 v2 单 Kalman + 启发式 Q 自适应 + Sage-Husa R;新增 OU 过程和可观测性保护
第二层 IMM 体制检测 P(高Q模型) 作为体制指标 → 入场信号过滤 替代 v2 BOCPD,无信号竞争
第三层 跨配对系统性风险聚合 统计所有配对体制状态 → 系统性事件拦截 双阈值独立设定 + 确认窗口
第四层 Beta 加权仓位计算 kalman_beta 计算两腿名义比例 负 β 方向感知(与 v2 相同)

3. 算法选型论证

3.1 为什么选 Kalman Filter(而非其他时变回归方法)

方法 优势 劣势 适用性评估
向量 Kalman Filter 最优线性估计(MMSE);递推 O(1);P_β 提供不确定性度量 假设线性高斯;单一 Q 不自适应 最优基础组件(配合 IMM 解决 Q 问题)
RLS (递推最小二乘) 实现简单;forgetting factor 天然自适应 无状态不确定性度量;不支持多模型融合 不适合(缺少概率信息)
粒子滤波 处理非线性/非高斯 O(N_particles) 计算量大;参数多 过度设计(β 的线性关系足够)
Online Bayesian LR 完全贝叶斯;自然输出后验 不支持时变参数(除非加 forgetting) 不如 Kalman 灵活
神经网络 (LSTM/Transformer) 可捕获非线性 需大量训练数据;不可解释 不适合生产系统

结论:Kalman Filter 是时变线性回归的理论最优解(Kalman 1960)。单一 Kalman 的 Q 固定问题通过 IMM 多模型融合解决。

3.2 为什么选 IMM 做 Q 自适应(而非 v2 的 Mehra 启发式)

方法 优势 劣势 适用性评估
IMM (Blom & Bar-Shalom 1988) 贝叶斯最优模型选择;无启发式阈值;模型概率直接指示体制;响应 β 突变仅需 1-2 步 M 倍计算量(M=5 对 2D 状态可忽略);需设定 Q 网格 最优选择
Innovation-based (Mehra 1970) 经典方法;实现简单 8 个启发式超参;与变点检测信号竞争;响应速度受 κ 限制 v2 方案(已被 v3 替代)
VB-AKF (Sarkka 2009) 联合估计 Q 和 R 的后验;数学严格 实现复杂;变分近似引入额外误差;不直接输出体制信号 P1 演进方向
多模型 GPB1 (简化 IMM) 比 IMM 简单(无混合步) 模型切换后状态不连续 不如 IMM 稳定
IMM-ATP (自适应转移概率) 在线学习 p_stay 额外复杂度;收益有限 P2 增强选项

IMM 的核心优势

  1. Q 网格直观[1e-6, 1e-5, 1e-4, 1e-3, 1e-2] 覆盖"β几乎不变"到"β剧烈变化"的全频谱,每个值的物理含义清晰
  2. 无阈值调参:贝叶斯后验自动选择最优模型,不需要 γ_upper/γ_lower/κ_up/κ_down
  3. 内生体制信号P(高Q模型) 天然表示"β 是否正在快速变化",无需额外的变点检测器
  4. 瞬时响应:β 突变时高Q模型的 likelihood 立即飙升,模型概率 1-2 步内切换完成;不像 Mehra 方法受 κ=1.05 限制需数十步
  5. 对 2D 状态计算量可忽略:5 个 2×2 Kalman Filter 的总计算量 ≈ 100 次浮点运算
  6. 与 Student-t 天然兼容:只需替换似然函数,Kalman 更新步骤不变;重尾鲁棒性对加密市场至关重要
  7. 在线 R 估计平滑化:IMM 概率加权的 innovation 比单模型的更稳定,Robbins-Monro 估计更准确

3.3 为什么 IMM 模型概率可替代 BOCPD

方法 优势 劣势 适用性评估
IMM 模型概率 信号内生(无竞争);校准良好的贝叶斯后验;0 额外参数 仅检测 Q 维度的变化(无法检测非线性断裂) v3.1 最优选择
BOCPD (Adams & MacKay 2007) 通用在线变点检测;输出 run length 后验 与 Q 自适应信号竞争;10 个额外参数 v2 方案(已被 v3 替代)
CUSUM 极简 无概率输出;阈值需手调 过于简单
HMM 多体制建模 体制数需预设;EM 需离线 P2 演进方向

为什么 IMM 模型概率是更好的体制指标

BOCPD 监控 innovation 分布变化来"推断"β 是否在变——这是一个间接信号。IMM 的模型概率直接回答"β 的变化速度属于哪个量级"——这是一个直接信号

BOCPD 路径:  innovation 变大 → 推断分布变了 → 推断β在变 → 变点概率升高
IMM 路径:    高Q模型 likelihood 升高 → 模型概率切换 → β变化速度指标升高

BOCPD 需要额外假设(NIG 先验、hazard rate),IMM 不需要。

保留 BOCPD 作为 P2 选项:当需要检测 β 的非线性断裂(如突然从正相关变为负相关,不仅是变化速度加快)时,BOCPD 可作为补充检测器。此时建议监控 IMM 的加权 innovation(而非 normalized innovation),以避免信号竞争。

3.4 为什么用 Student-t 似然(而非高斯)

方法 优势 劣势 适用性评估
Student-t 似然 (Roth 2013, Agamennoni 2012) 重尾鲁棒;5σ 事件概率 ~1e-3(vs 高斯 3e-7);模型概率更新平滑 ν 参数需设定 v3.1 最优选择
高斯似然 经典 Kalman 标准;实现最简 加密货币 kurtosis 5-20,异常值导致模型概率剧烈震荡 v2(不适合加密市场)
Huber 似然 (Huber 1964) 对异常值鲁棒 非概率模型,不输出校准的似然值,无法用于模型概率更新 不适合 IMM
混合高斯 可建模重尾 参数多(每个混合成分需 μ, σ, π) 过度设计

Student-t 的关键优势

Kalman Filter 的状态更新步骤(predict + update)不需要改动,只需将模型概率更新中的似然函数从高斯替换为 Student-t:

高斯:     L_j = N(ε; 0, S) = (2πS)^{-0.5} exp(-ε²/2S)
Student-t: L_j = t_ν(ε; 0, S) ∝ (1 + ε²/(νS))^{-(ν+1)/2}

v3.1 Per-pair ν 初始化(替代固定 ν=5):

不同币对的尾部厚度差异显著(ETH/BTC kurtosis ≈ 5-8 vs MEME kurtosis ≈ 15-30),固定 ν=5 不够精细。v3.1 从 OLS 初始化阶段的残差 kurtosis 自动推算 ν:

ν_init = max(3.0, 6.0 / (kurtosis/3 - 1) + 2)

示例:
  ETH/BTC (kurtosis ≈ 6):  ν = 6/(6/3-1) + 2 = 8.0 → 接近高斯(合理)
  MEME    (kurtosis ≈ 15): ν = 6/(15/3-1) + 2 = 3.5 → 重尾保护强
  中等    (kurtosis ≈ 9):  ν = 6/(9/3-1) + 2 = 5.0 → 与 v3 默认一致

上下限保护: ν = clip(ν_init, 3.0, 30.0)
  ν < 3: 方差不存在(Student-t 性质)
  ν > 30: 趋近高斯,可直接用高斯

实现成本极低(初始化时 1 行计算),但对异构币对提供自动适配。

3.5 为什么用 Robbins-Monro 在线 R 估计(而非固定 R 或 Sage-Husa)

方法 优势 劣势 适用性评估
Robbins-Monro EMA(v3.1 无截断版) 无偏;实现极简(1行);追踪 R 时变 步长 γ 需手设 v3.1 最优选择
固定 R 最简;IMM Q 选择部分吸收 R 变化 观测噪声实际时变(日间 vs 周末,牛市 vs 熊市) v3 初版(已升级)
Sage-Husa (1969) 经典自适应方法 max(., 0) 截断 → R 长期正偏差 v2(已弃用)
Robbins-Monro EMA(v3 原版,含 max(.,0) 偏差比 Sage-Husa 小 max(.,0) 截断仍引入同性质正偏差(只是程度较轻) v3 原方案(v3.1 修复)
VB-AKF (Sarkka 2009) 联合估计 Q 和 R 的后验;数学严格 实现复杂;变分近似引入额外误差 P1 演进方向

v3.1 修复:去掉 max(., 0) 截断

v3 原方案的 R 更新公式:

R_t = (1-γ) × R_{t-1} + γ × max(ε_t² - H_t P_pred H_t', 0)    ← 有正偏差

max(., 0) 使得 R 只能被增大、不能被减小(当 ε² < HPH' 时更新被截断为 0)。这与文档对 Sage-Husa 的批评("仅正值更新导致正偏差")构成逻辑矛盾

v3.1 修正为:

R_update = ε_t² - H_t P_pred H_t'           ← 允许负值
R_t = (1-γ) × R_{t-1} + γ × R_update        ← 无截断
R_t = max(R_t, R_floor)                      ← 仅 floor 保护正定

R_floor = 1e-8 已足够保证 R > 0(正定性),不需要逐步截断。ε² < HPH' 是正常统计波动(约 50% 的时间会发生),截断这些事件会系统性高估 R。

3.6 为什么用 OU 状态方程(而非随机游走)

方法 P_β 稳态解 β 漂移控制 参数量 适用性
OU 过程 x_t = Φx_{t-1} + (I-Φ)x̄ + w_t P_∞ = Q/(1-Φ²) ✅ 向 x̄ 回归 +1 (Φ_β) v3.1 最优
随机游走 x_t = x_{t-1} + w_t ❌ 无稳态 ❌ 无约束漂移 0 v3 原方案
固定参数 x_t = x_0 完全不追踪 完全不漂移 0 OLS(过于保守)

OU 过程的物理意义

β 描述两个加密货币之间的结构性关系。虽然 β 会随市场环境变化(时变),但它有回归倾向——两个同赛道币的关系不会无限偏离初始值。OU 过程精确建模了这一特征:

Φ_β = 0.98 的含义:
  - 每步 β 向 β̄(长期均值)回归 2%
  - 特征时间尺度 τ = -1/ln(Φ) ≈ 50 步 ≈ 8 天
  - 与 p_stay=0.98(模型持续时间)一致

P_β 稳态解:
  对 Q_β=1e-4, Φ_β=0.98:
    P_∞ = 1e-4 / (1 - 0.98²) = 1e-4 / 0.0396 ≈ 2.5e-3
  → P_β 永远不会超过此值(即使长期运行/低波动期)
  → 不会不必要地降级到 OLS

对 IMM 的影响:每个模型的 Q_β^(j) 不同,因此每个模型的 P_∞^(j) 不同——高Q模型天然有更大的稳态不确定性,低Q模型有更小的。这与 IMM 的设计意图完全一致。

3.7 为什么不用统一框架(Regime-Switching SSM)

Regime-Switching State Space Model (Kim 1994) 将 Kalman + 体制检测合二为一:

  • 优势:参数更少、理论更优雅
  • 劣势:Kim's approximation 有误差累积;实现复杂度高;调试困难

当前选择:IMM + 阈值体制判定。原因:

  1. IMM 已覆盖 Q 自适应 + 体制检测两个需求
  2. 实现简单(标准 IMM 教科书代码)
  3. 可独立调试每个模型的行为
  4. MS-SSM / HMM 作为 P2 演进方向保留

4. 详细设计

4.1 IMM Kalman Filter 时变 [α, β] 估计

数学模型

M 个并行状态空间模型(每个模型 j 使用不同的过程噪声 Q_β^(j)),共享 OU 均值回归结构

状态方程(OU 过程,所有模型共享 Φ,Q_β 不同):
  x_t = Φ × x_{t-1} + (I - Φ) × x̄ + w_t,    w_t ~ N(0, Q^(j))

  其中 x_t = [α_t, β_t]'
       x̄ = [α_init, β_init]'    OLS 初始值作为长期均值
       Φ = [[Φ_α,  0  ],         对角回归矩阵
            [ 0,  Φ_β]]
       Q^(j) = [[q_α,     0    ],     q_α 所有模型共享
                 [  0,   q_β^(j)]]     q_β^(j) 各模型不同

  Φ_α = 0.995 → α 回归极慢(特征时间 ~200步 ≈ 33天)
  Φ_β = 0.98  → β 回归适中(特征时间 ~50步 ≈ 8天)

观测方程(所有模型共享):
  r_alt_t = H_t × x_t + v_t,    v_t ~ N(0, R)

  其中 H_t = [1, r_btc_t]
       r_alt_t = log(alt_t / alt_{t-1})
       r_btc_t = log(btc_t / btc_{t-1})
       R 初始化自 OLS 残差方差,在线 Robbins-Monro EMA 更新

OU 过程的 P_β 稳态解(推导):

稳态条件: P_∞ = Φ P_∞ Φ' + Q  (忽略观测更新,仅考虑预测步)

对角情形:
  P_∞_β = Φ_β² × P_∞_β + Q_β
  → P_∞_β = Q_β / (1 - Φ_β²)

各模型的稳态 P_β:
  模型 0 (Q_β=1e-6): P_∞ = 1e-6 / 0.0396 ≈ 2.5e-5
  模型 1 (Q_β=1e-5): P_∞ = 1e-5 / 0.0396 ≈ 2.5e-4
  模型 2 (Q_β=1e-4): P_∞ = 1e-4 / 0.0396 ≈ 2.5e-3
  模型 3 (Q_β=1e-3): P_∞ = 1e-3 / 0.0396 ≈ 2.5e-2
  模型 4 (Q_β=1e-2): P_∞ = 1e-2 / 0.0396 ≈ 2.5e-1

观测更新会进一步减小 P_β → 实际值更低。
关键: 融合后的 P_β = Σ μ_j [P_β^(j) + (β^(j) - β̄)²] 也有稳态,不会无界增长。

Q_β 网格设计

模型 j Q_β^(j) 物理含义 P_∞_β 日 β 漂移 σ
0 1e-6 β 几乎不变 2.5e-5 ~0.002
1 1e-5 β 缓慢变化 2.5e-4 ~0.008
2 1e-4 β 正常变化(先验中心) 2.5e-3 ~0.024
3 1e-3 β 快速变化 2.5e-2 ~0.077
4 1e-2 β 剧烈变化 / 结构性断裂 2.5e-1 ~0.245

覆盖 4 个数量级,对数均匀分布,覆盖从 ETH/BTC 级稳定到 MEME 级波动的全部场景。

IMM 递推算法

输入: r_btc_t, r_alt_t, 上一步状态 {x^(j), P^(j), μ_j} for j=0..M-1

Step 0: 可观测性检查
  若 |r_btc_t| < ε_obs (默认 1e-6):
    → 跳过 β 更新: 仅用 H=[1, 0] 更新 α
    → 标记 regime_score 为上一步值(不更新模型概率)
    → 原因: r_btc≈0 时 β 不可观测,强行更新会
            使 P_β 膨胀、regime_score 失真

Step 1: 混合(Interaction)
  预测模型概率:
    μ̃_j = Σ_i π_{ij} × μ_i                           (M×1)
    其中 π_{ij} = P(模型j在t | 模型i在t-1)

  混合权重:
    w_{i|j} = π_{ij} × μ_i / μ̃_j                      (M×M)

  混合状态(为每个模型j准备初始条件):
    x̃^(j) = Σ_i w_{i|j} × x^(i)                       (2×1)
    P̃^(j) = Σ_i w_{i|j} × [P^(i) + (x^(i) - x̃^(j))(x^(i) - x̃^(j))']   (2×2)

Step 2: 并行滤波(每个模型 j 独立运行 OU Kalman)
  OU 预测(关键改动: 随机游走 → OU):
    x_pred^(j) = Φ × x̃^(j) + (I - Φ) × x̄
    P_pred^(j) = Φ × P̃^(j) × Φ' + Q^(j)

  Innovation:
    ε^(j) = r_alt_t - H_t × x_pred^(j)
    S^(j) = H_t × P_pred^(j) × H_t' + R
    S^(j) = max(S^(j), 1e-15)

  Huber Clipping (c=3.0):
    ε̃^(j) = clip(ε^(j), -c×√S^(j), +c×√S^(j))

  Kalman 增益 + 更新:
    K^(j) = P_pred^(j) × H_t' / S^(j)
    x^(j) = x_pred^(j) + K^(j) × ε̃^(j)

  Joseph 稳定形式:
    A^(j) = I - K^(j) × H_t
    P^(j) = A^(j) × P_pred^(j) × A^(j)' + R × K^(j) × K^(j)'

  似然(Student-t,用原始 innovation,非截断后的):
    L_j = Γ((ν+1)/2) / (Γ(ν/2) × √(νπS^(j))) × (1 + ε^(j)²/(ν×S^(j)))^{-(ν+1)/2}
    其中 ν = Student-t 自由度(per-pair 初始化,默认 5)

Step 3: 模型概率更新
  μ_j = μ̃_j × L_j / Σ_k (μ̃_k × L_k)                (归一化)
  μ_j = max(μ_j, μ_floor)                              (下限保护,μ_floor=1e-6)
  归一化: μ_j /= Σ_k μ_k

Step 4: 在线 R 更新(Robbins-Monro EMA,v3.1 无截断版)
  若 γ_R > 0:
    weighted_innov_sq = Σ_j μ̃_j × ε^(j)²              (注: 用预测概率 μ̃,避免循环依赖)
    weighted_HP_H = Σ_j μ̃_j × H P_pred^(j) H'
    R_update = weighted_innov_sq - weighted_HP_H        (允许负值,无 max(.,0) 截断)
    R = (1-γ_R) × R + γ_R × R_update
    R = max(R, R_floor)                                 (仅 floor 保护正定)
    同步到所有 M 个模型

Step 5: 融合输出
  β = Σ_j μ_j × β^(j)                                 (概率加权)
  α = Σ_j μ_j × α^(j)
  P_β = Σ_j μ_j × [P_β^(j) + (β^(j) - β)²]           (含模型间方差)
  regime_score = Σ_{j: q_β^(j) ≥ q_high} μ_j            (高Q模型总概率)
  effective_q_β = Σ_j μ_j × q_β^(j)                    (有效Q值)
  innovation = Σ_j μ_j × ε^(j)                          (概率加权)

v3.1 Step 4 改进要点

  1. 无截断R_update 允许为负值,消除 Sage-Husa 同性质的正偏差
  2. 用预测概率 μ̃:R 更新使用 Step 1 的 μ̃_j(而非 Step 3 更新后的 μ_j),避免似然→R→似然的循环依赖
  3. R_floor 兜底R_floor=1e-8 保证正定性,是充分的下限保护

模型转移概率矩阵 Π(带宽 TPM)

Π[i,j] = { p_stay                                if i == j    (对角线: 0.98)
          { (1-p_stay) × exp(-|i-j|) / Z_i        if i ≠ j    (带宽衰减)

其中 Z_i = Σ_{j≠i} exp(-|i-j|)    (逐行归一化常数)

相比均匀非对角线的改进

均匀 TPM 假设"从 Q=1e-6 跳到 Q=1e-2 和跳到 Q=1e-5 的概率相同"——不合理。实际中 β 变化速度通常渐变(先加速再进入快速变化),带宽 TPM 使相邻 Q 模型更可能互相跳转:

M=5 示例(中间行 i=2 的非对角权重):
  j=0 (|i-j|=2): exp(-2) ≈ 0.14  → 概率 0.003
  j=1 (|i-j|=1): exp(-1) ≈ 0.37  → 概率 0.008
  j=3 (|i-j|=1): exp(-1) ≈ 0.37  → 概率 0.008
  j=4 (|i-j|=2): exp(-2) ≈ 0.14  → 概率 0.003

对比均匀: 每个非对角 = 0.005

p_stay=0.98 含义:模型在 4h 尺度上 98% 概率保持不变,2% 概率切换到其他模型。对应 β 变化速度的平均持续时间 ≈ 50 根 K线(~8天),与 Φ_β=0.98 的特征时间一致。带宽衰减使跳转更倾向相邻模型,对 β 突变仍可响应(只是跳到远端模型的概率更低,但 likelihood 主导时仍会快速切换)。

关键输出信号及其用途

信号 用途 1: 体制检测 用途 2: Hedge Ratio
beta (Σ μ_j × β^(j)) β 趋势监控 直接作为 hedge ratio
alpha (Σ μ_j × α^(j)) 吸收独立漂移,避免污染 β
P_beta (含模型间方差) 不确定性度量(OU 过程保证有稳态) hedge ratio 置信度(P_β 过大时降级为 OLS β)
regime_score (Σ 高Q模型概率) 直接用于体制检测
model_probs (μ 向量) 诊断:哪个 Q 模型主导
effective_q_beta (Σ μ_j × q_β^(j)) 诊断:当前有效 Q_β
observable (bool) r_btc ≈ 0 时 regime_score 不可靠

参数

参数 符号 默认值 含义 调优
Q_β 网格 q_β_grid [1e-6, 1e-5, 1e-4, 1e-3, 1e-2] M=5 个模型的 β 过程噪声 对数均匀,覆盖全频谱
α 过程噪声 q_α 1e-5 α 每 4h 变化方差(所有模型共享) 通常不调
β 回归系数 Φ_β 0.98 OU 过程均值回归速率(特征时间 ~8天) ↑更保守 ↓追踪更快
α 回归系数 Φ_α 0.995 α 回归速率(特征时间 ~33天) 通常不调
观测噪声初始值 R_init OLS 残差方差 初始 R,后续在线更新 初始化
R 下限 R_floor 1e-8 防止 R 退化为零
R 学习率 γ_R 0.02 Robbins-Monro EMA 步长(0=禁用在线R) ↓平滑 ↑响应
Innovation 截断 c 3.0 Huber 截断(σ 倍数) ↓鲁棒 ↑灵敏
Student-t 自由度 ν per-pair (默认 5.0) 似然函数重尾度(从 OLS 残差 kurtosis 自动推算) 3-30 范围
可观测性阈值 ε_obs 1e-6 |r_btc| < 此值时跳过 β 更新
初始 [α, β] / 长期均值 x̄ OLS 估计 所有模型共享初始值和 OU 回归目标
初始 P P₀ diag(0.1, 1.0) 初始不确定性
模型持久性 p_stay 0.98 带宽 TPM 对角线 ↑模型切换更保守
高Q阈值 q_high 1e-3 Q_β ≥ 此值归入"高Q"组
概率下限 μ_floor 1e-6 防止模型概率下溢到 0

总计 17 个核心参数(vs v2 的 28 个,减少 39%)。相比 v3 原方案(15 个)新增 Φ_β、Φ_α、ε_obs,均有明确物理含义和理论依据。

参数间的一致性约束

  • q_α = q_β_grid[2]/10 → α 变化比中位 β 慢一个量级
  • p_stay = Φ_β = 0.98 → 模型持续时间和 β 特征时间对齐(~8天)
  • Φ_α = 0.995 → α 回归比 β 更慢,允许独立漂移被缓慢吸收
  • ν 从 OLS 残差 kurtosis 自动推算,覆盖 3-30 范围
  • γ_R = 0.02 → R 有效窗口 ≈50 步,与 β 特征时间匹配

文件:src/config.py

# ═══ IMM Kalman Filter 参数(v3.1)═══
IMM_Q_BETA_GRID: list[float] = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
IMM_Q_ALPHA: float = 1e-5
IMM_P0_ALPHA: float = 0.1
IMM_P0_BETA: float = 1.0
IMM_R_FLOOR: float = 1e-8
IMM_R_GAMMA: float = 0.02       # Robbins-Monro R 学习率(0=禁用在线R估计)
IMM_CLIP_SIGMA: float = 3.0
IMM_STUDENT_T_NU: float = 5.0   # Student-t 默认自由度(per-pair 初始化时覆盖)
IMM_TRANSITION_PROB: float = 0.98
IMM_HIGH_Q_THRESHOLD: float = 1e-3
IMM_PHI_BETA: float = 0.98      # OU 均值回归系数(β)
IMM_PHI_ALPHA: float = 0.995    # OU 均值回归系数(α)
IMM_OBS_THRESHOLD: float = 1e-6 # 可观测性阈值(|r_btc| < 此值跳过 β 更新)
IMM_MU_FLOOR: float = 1e-6      # 模型概率下限

# ═══ Hedge Ratio 参数 ═══
HEDGE_BETA_MIN: float = 0.1     # β 下限保护
HEDGE_BETA_MAX: float = 5.0     # β 上限保护
HEDGE_BETA_P_MAX: float = 0.5   # P_β 超过此值时降级为 OLS β

文件:src/utils/analysis/analysis_core.py

新增类_KalmanModel(单一 Kalman 模型,IMM 的内部组件)

import numpy as np
from scipy.special import gammaln
from scipy.stats import kurtosis as sp_kurtosis


def _estimate_student_t_nu(residuals: np.ndarray, default: float = 5.0) -> float:
    """从 OLS 残差的 kurtosis 估计 Student-t 自由度 ν

    公式: ν = max(3, 6 / (kurtosis/3 - 1) + 2)
    其中 kurtosis 使用 excess kurtosis(正态分布为 0)
    """
    if len(residuals) < 20:
        return default
    kurt = float(sp_kurtosis(residuals, fisher=True))  # excess kurtosis
    if kurt <= 0:
        return min(default, 30.0)  # 轻尾或正态 → 用较大 ν
    nu = 6.0 / (kurt / 3.0) + 2.0  # 简化: 6/(excess_kurt/3) + 2
    return float(np.clip(nu, 3.0, 30.0))


class _KalmanModel:
    """单一 Kalman Filter 模型(IMM 的内部组件)

    2D 状态 [α, β],OU 均值回归 + 固定 Q + 共享 R。
    Student-t 似然 + Joseph 稳定形式 + Huber clipping。
    """

    def __init__(
        self,
        x: np.ndarray,           # [α, β] 初始状态
        P: np.ndarray,           # 2×2 初始协方差
        q_alpha: float,
        q_beta: float,
        R: float,
        x_bar: np.ndarray,       # OU 长期均值 [ᾱ, β̄]
        phi: np.ndarray,         # OU 回归系数 [Φ_α, Φ_β]
        r_floor: float = 1e-8,
        clip_sigma: float = 3.0,
        nu: float = 5.0,        # Student-t 自由度
    ):
        self.x = x.copy()
        self.P = P.copy()
        self.Q = np.diag([q_alpha, q_beta]).astype(np.float64)
        self.R = max(R, r_floor)
        self._x_bar = x_bar.copy()
        self._phi = np.diag(phi).astype(np.float64)    # 2×2 对角矩阵
        self._I_phi = np.eye(2) - self._phi             # I - Φ
        self._r_floor = r_floor
        self._clip_sigma = clip_sigma
        self._nu = nu

        # Student-t 常数预计算
        self._log_t_const = (
            gammaln((nu + 1) / 2) - gammaln(nu / 2)
            - 0.5 * np.log(nu * np.pi)
        )

    def predict(self):
        """OU 预测步(从混合后的状态出发)"""
        self.x = self._phi @ self.x + self._I_phi @ self._x_bar
        self.P = self._phi @ self.P @ self._phi.T + self.Q

    def update(self, r_btc: float, r_alt: float) -> dict:
        """Kalman 观测更新,返回滤波结果和 Student-t 似然

        注意: 调用前必须先调用 predict()(IMM 流程中由外层控制)
        """
        H = np.array([1.0, r_btc], dtype=np.float64)

        # Innovation
        innovation = r_alt - H @ self.x
        HP_H = float(H @ self.P @ H)
        S = HP_H + self.R
        S = max(S, 1e-15)

        # Student-t 似然(用原始 innovation,对重尾鲁棒)
        log_likelihood = (
            self._log_t_const
            - 0.5 * np.log(S)
            - ((self._nu + 1) / 2) * np.log(1 + innovation ** 2 / (self._nu * S))
        )

        # Huber clipping(保护状态更新,不影响似然)
        sqrt_S = S ** 0.5
        norm_innov = innovation / sqrt_S
        clipped = False
        innovation_for_update = innovation
        if abs(norm_innov) > self._clip_sigma:
            innovation_for_update = self._clip_sigma * sqrt_S * (
                1.0 if innovation > 0 else -1.0
            )
            clipped = True

        # Kalman 增益 + 更新
        K = (self.P @ H) / S
        self.x = self.x + K * innovation_for_update

        # Joseph 稳定形式
        I2 = np.eye(2)
        A = I2 - np.outer(K, H)
        self.P = A @ self.P @ A.T + self.R * np.outer(K, K)

        return {
            'alpha': float(self.x[0]),
            'beta': float(self.x[1]),
            'P_beta': float(self.P[1, 1]),
            'P_alpha': float(self.P[0, 0]),
            'innovation': innovation,
            'normalized_innovation': norm_innov,
            'S': S,
            'HP_H': HP_H,
            'log_likelihood': log_likelihood,
            'clipped': clipped,
            'kalman_gain_beta': float(K[1]),
        }

新增类IMMKalmanBetaEstimator(IMM 多模型融合估计器)

class IMMKalmanBetaEstimator:
    """Interacting Multiple Model Kalman Filter 时变 [α, β] 联合估计器

    核心思想 (Blom & Bar-Shalom, 1988):
        M 个 Kalman Filter 并行运行,每个使用不同的 Q_β,
        通过贝叶斯模型概率实时加权融合。

    v3.1 改进(相比 v3 原方案):
        1. OU 均值回归状态方程: x_t = Φx_{t-1} + (I-Φ)x̄ + w_t
           → P_β 有稳态解 P_∞ = Q/(1-Φ²),消除长期协方差漂移
        2. Per-pair Student-t ν: 从 OLS 残差 kurtosis 自动推算
           → 不同币对自动适配尾部厚度
        3. Robbins-Monro 无截断: 去掉 max(.,0),消除正偏差
           → R 更新使用预测概率 μ̃ 避免循环依赖
        4. 可观测性保护: |r_btc| < ε_obs 时跳过 β 更新
           → 防止低波动期 P_β 膨胀和 regime_score 失真

    双用途输出:
        - beta → hedge ratio(直接使用概率加权 β)
        - regime_score → 体制检测(高Q模型的总概率)

    每根新 4h K线调用 update() 一次。
    """

    def __init__(
        self,
        alpha_init: float,
        beta_init: float,
        q_beta_grid: list[float] | None = None,
        q_alpha: float = 1e-5,
        r_init: float = 1e-2,
        p0_alpha: float = 0.1,
        p0_beta: float = 1.0,
        r_floor: float = 1e-8,
        r_gamma: float = 0.02,
        clip_sigma: float = 3.0,
        nu: float = 5.0,
        transition_prob: float = 0.98,
        high_q_threshold: float = 1e-3,
        phi_beta: float = 0.98,
        phi_alpha: float = 0.995,
        obs_threshold: float = 1e-6,
        mu_floor: float = 1e-6,
    ):
        if q_beta_grid is None:
            q_beta_grid = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]

        self.M = len(q_beta_grid)
        self._q_beta_grid = list(q_beta_grid)
        self._high_q_threshold = high_q_threshold
        self._r_gamma = r_gamma
        self._r_floor = r_floor
        self._nu = nu
        self._clip_sigma = clip_sigma
        self._obs_threshold = obs_threshold
        self._mu_floor = mu_floor
        self._n_updates = 0
        self._last_observable = True

        # OU 参数
        x_bar = np.array([alpha_init, beta_init], dtype=np.float64)
        phi = np.array([phi_alpha, phi_beta], dtype=np.float64)
        self._x_bar = x_bar
        self._phi_vec = phi

        # 初始化 M 个 Kalman 模型
        x0 = np.array([alpha_init, beta_init], dtype=np.float64)
        P0 = np.diag([p0_alpha, p0_beta]).astype(np.float64)
        self._models = [
            _KalmanModel(x0, P0, q_alpha, q_b, r_init, x_bar, phi,
                         r_floor, clip_sigma, nu)
            for q_b in q_beta_grid
        ]

        # 模型概率(均匀初始化)
        self._mu = np.ones(self.M) / self.M

        # 带宽转移概率矩阵(相邻模型跳转概率更高)
        self._TPM = np.zeros((self.M, self.M))
        for i in range(self.M):
            weights = np.array([
                np.exp(-abs(i - j)) if i != j else 0.0
                for j in range(self.M)
            ])
            w_sum = weights.sum()
            if w_sum > 0:
                self._TPM[i] = weights / w_sum * (1.0 - transition_prob)
            self._TPM[i, i] = transition_prob

        # 高Q模型索引(预计算)
        self._high_q_indices = [
            j for j, q in enumerate(q_beta_grid) if q >= high_q_threshold
        ]

    @property
    def beta(self) -> float:
        return float(sum(
            self._mu[j] * self._models[j].x[1] for j in range(self.M)
        ))

    @property
    def beta_variance(self) -> float:
        b = self.beta
        return float(sum(
            self._mu[j] * (self._models[j].P[1, 1] + (self._models[j].x[1] - b) ** 2)
            for j in range(self.M)
        ))

    @property
    def regime_score(self) -> float:
        return float(sum(self._mu[j] for j in self._high_q_indices))

    @property
    def effective_q_beta(self) -> float:
        return float(sum(
            self._mu[j] * self._q_beta_grid[j] for j in range(self.M)
        ))

    def update(self, r_btc: float, r_alt: float) -> dict:
        """接收一对新的对数收益率,执行 IMM 更新

        Returns:
            dict with keys: alpha, beta, P_beta, P_alpha,
                  regime_score, effective_q_beta, model_probs,
                  dominant_model_idx, innovation, normalized_innovation,
                  clipped, observable, R
        """
        self._n_updates += 1

        # ── Step 0: 可观测性检查 ──
        observable = abs(r_btc) >= self._obs_threshold
        self._last_observable = observable

        if not observable:
            # r_btc ≈ 0: β 不可观测,仅用 H=[1,0] 更新 α
            # 不更新模型概率,保持上一步的 regime_score
            for m in self._models:
                m.predict()
            # 仅更新 α(用 r_alt 作为 α 的观测)
            for m in self._models:
                H_alpha = np.array([1.0, 0.0], dtype=np.float64)
                innov = r_alt - H_alpha @ m.x
                S = m.P[0, 0] + m.R
                S = max(S, 1e-15)
                K = m.P @ H_alpha / S
                m.x = m.x + K * innov
                A = np.eye(2) - np.outer(K, H_alpha)
                m.P = A @ m.P @ A.T + m.R * np.outer(K, K)

            beta = sum(self._mu[j] * self._models[j].x[1] for j in range(self.M))
            alpha = sum(self._mu[j] * self._models[j].x[0] for j in range(self.M))
            P_beta = sum(
                self._mu[j] * (self._models[j].P[1, 1] + (self._models[j].x[1] - beta) ** 2)
                for j in range(self.M)
            )
            return {
                'alpha': float(alpha), 'beta': float(beta),
                'P_beta': float(P_beta), 'P_alpha': 0.0,
                'regime_score': self.regime_score,
                'effective_q_beta': self.effective_q_beta,
                'model_probs': self._mu.copy(),
                'dominant_model_idx': int(np.argmax(self._mu)),
                'innovation': 0.0, 'normalized_innovation': 0.0,
                'clipped': False, 'observable': False,
                'kalman_gain_beta': 0.0, 'R': self._models[0].R,
            }

        # ── Step 1: 混合(Interaction)──
        mu_predicted = self._TPM.T @ self._mu  # 预测模型概率 (M,)

        # 混合权重 w[i,j] = π[i,j] * μ[i] / μ̃[j]
        mix_weights = np.zeros((self.M, self.M))
        for j in range(self.M):
            if mu_predicted[j] > 1e-30:
                for i in range(self.M):
                    mix_weights[i, j] = self._TPM[i, j] * self._mu[i] / mu_predicted[j]

        # 为每个模型 j 准备混合后的初始条件
        for j in range(self.M):
            x_mixed = np.zeros(2)
            for i in range(self.M):
                x_mixed += mix_weights[i, j] * self._models[i].x

            P_mixed = np.zeros((2, 2))
            for i in range(self.M):
                dx = self._models[i].x - x_mixed
                P_mixed += mix_weights[i, j] * (
                    self._models[i].P + np.outer(dx, dx)
                )

            self._models[j].x = x_mixed
            self._models[j].P = P_mixed

        # ── Step 2: 并行滤波(OU 预测 + 观测更新)──
        results = []
        log_likelihoods = np.zeros(self.M)
        for j in range(self.M):
            self._models[j].predict()       # OU 预测步
            r = self._models[j].update(r_btc, r_alt)  # 观测更新
            results.append(r)
            log_likelihoods[j] = r['log_likelihood']

        # ── Step 3: 模型概率更新 ──
        # 数值稳定的 log-space 计算
        log_mu = np.log(np.maximum(mu_predicted, 1e-30)) + log_likelihoods
        log_mu -= np.max(log_mu)  # 防止溢出
        self._mu = np.exp(log_mu)
        total = np.sum(self._mu)
        if total > 0:
            self._mu /= total
        else:
            self._mu = np.ones(self.M) / self.M

        # 模型概率下限保护(防止长期运行后某模型概率下溢到 0)
        self._mu = np.maximum(self._mu, self._mu_floor)
        self._mu /= self._mu.sum()

        # ── Step 4: 在线 R 更新(Robbins-Monro EMA,v3.1 无截断版)──
        if self._r_gamma > 0:
            # 用预测概率 μ̃ 加权,避免与 Step 3 的循环依赖
            weighted_innov_sq = sum(
                mu_predicted[j] * results[j]['innovation'] ** 2 for j in range(self.M)
            )
            weighted_HP_H = sum(
                mu_predicted[j] * results[j]['HP_H'] for j in range(self.M)
            )
            # v3.1: 不截断,允许负值,仅用 R_floor 保护正定
            r_update = weighted_innov_sq - weighted_HP_H
            new_R = (1 - self._r_gamma) * self._models[0].R + self._r_gamma * r_update
            new_R = max(new_R, self._r_floor)
            for m in self._models:
                m.R = new_R

        # ── Step 5: 融合输出 ──
        beta = sum(self._mu[j] * results[j]['beta'] for j in range(self.M))
        alpha = sum(self._mu[j] * results[j]['alpha'] for j in range(self.M))
        P_beta = sum(
            self._mu[j] * (results[j]['P_beta'] + (results[j]['beta'] - beta) ** 2)
            for j in range(self.M)
        )
        P_alpha = sum(
            self._mu[j] * (results[j]['P_alpha'] + (results[j]['alpha'] - alpha) ** 2)
            for j in range(self.M)
        )
        regime_score = sum(self._mu[j] for j in self._high_q_indices)
        effective_q = sum(self._mu[j] * self._q_beta_grid[j] for j in range(self.M))

        dominant = int(np.argmax(self._mu))

        # 概率加权 innovation
        weighted_innovation = sum(
            self._mu[j] * results[j]['innovation'] for j in range(self.M)
        )
        weighted_norm_innov = sum(
            self._mu[j] * results[j]['normalized_innovation'] for j in range(self.M)
        )
        any_clipped = any(results[j]['clipped'] for j in range(self.M)
                         if self._mu[j] > 0.1)

        return {
            'alpha': float(alpha),
            'beta': float(beta),
            'P_beta': float(P_beta),
            'P_alpha': float(P_alpha),
            'regime_score': float(regime_score),
            'effective_q_beta': float(effective_q),
            'model_probs': self._mu.copy(),
            'dominant_model_idx': dominant,
            'innovation': float(weighted_innovation),
            'normalized_innovation': float(weighted_norm_innov),
            'clipped': any_clipped,
            'observable': True,
            'kalman_gain_beta': results[dominant]['kalman_gain_beta'],
            'R': self._models[0].R,
        }

    def state_dict(self) -> dict:
        return {
            'models': [
                {'x': m.x.tolist(), 'P': m.P.tolist(), 'Q': m.Q.tolist(), 'R': m.R}
                for m in self._models
            ],
            'mu': self._mu.tolist(),
            'q_beta_grid': self._q_beta_grid,
            'high_q_threshold': self._high_q_threshold,
            'n_updates': self._n_updates,
            'TPM': self._TPM.tolist(),
            'r_gamma': self._r_gamma,
            'r_floor': self._r_floor,
            'nu': self._nu,
            'clip_sigma': self._clip_sigma,
            'x_bar': self._x_bar.tolist(),
            'phi_vec': self._phi_vec.tolist(),
            'obs_threshold': self._obs_threshold,
            'mu_floor': self._mu_floor,
        }

    @classmethod
    def from_state_dict(cls, d: dict) -> 'IMMKalmanBetaEstimator':
        obj = cls.__new__(cls)
        obj.M = len(d['models'])
        obj._q_beta_grid = d['q_beta_grid']
        obj._high_q_threshold = d['high_q_threshold']
        obj._n_updates = d['n_updates']
        obj._mu = np.array(d['mu'], dtype=np.float64)
        obj._TPM = np.array(d['TPM'], dtype=np.float64)
        obj._r_gamma = d.get('r_gamma', 0.02)
        obj._r_floor = d.get('r_floor', 1e-8)
        obj._nu = d.get('nu', 5.0)
        obj._clip_sigma = d.get('clip_sigma', 3.0)
        obj._x_bar = np.array(d.get('x_bar', [0.0, 1.0]), dtype=np.float64)
        obj._phi_vec = np.array(d.get('phi_vec', [0.995, 0.98]), dtype=np.float64)
        obj._obs_threshold = d.get('obs_threshold', 1e-6)
        obj._mu_floor = d.get('mu_floor', 1e-6)
        obj._last_observable = True

        nu = obj._nu
        log_t_const = (
            gammaln((nu + 1) / 2) - gammaln(nu / 2)
            - 0.5 * np.log(nu * np.pi)
        )

        phi = np.diag(obj._phi_vec)
        I_phi = np.eye(2) - phi

        obj._models = []
        for md in d['models']:
            m = _KalmanModel.__new__(_KalmanModel)
            m.x = np.array(md['x'], dtype=np.float64)
            m.P = np.array(md['P'], dtype=np.float64)
            m.Q = np.array(md['Q'], dtype=np.float64)
            m.R = md['R']
            m._x_bar = obj._x_bar.copy()
            m._phi = phi.copy()
            m._I_phi = I_phi.copy()
            m._r_floor = obj._r_floor
            m._clip_sigma = obj._clip_sigma
            m._nu = nu
            m._log_t_const = log_t_const
            obj._models.append(m)
        obj._high_q_indices = [
            j for j, q in enumerate(obj._q_beta_grid) if q >= obj._high_q_threshold
        ]
        return obj

改动函数:calculate_cointegration_params_dual_window()

在现有 OLS 之后,新增 IMM Kalman Filter 输出:

def calculate_cointegration_params_dual_window(
    base_klines, alt_klines,
    beta_window=None, zscore_window=None,
    kalman_state: dict | None = None,  # [新增] IMM 状态
):
    # ... 现有 OLS 逻辑不变 ...

    # ═══ 新增:IMM Kalman Filter 更新 ═══
    kalman_result = None
    kalman_state_out = kalman_state

    if len(aligned) >= 2:
        r_btc = np.log(aligned['base'].iloc[-1] / aligned['base'].iloc[-2])
        r_alt = np.log(aligned['alt'].iloc[-1] / aligned['alt'].iloc[-2])

        if kalman_state is not None:
            kf = IMMKalmanBetaEstimator.from_state_dict(kalman_state)
        else:
            ols_residual_var = float(np.var(model.resid)) if hasattr(model, 'resid') else 1e-2
            # v3.1: per-pair ν 从 OLS 残差 kurtosis 自动推算
            nu_init = _estimate_student_t_nu(
                model.resid if hasattr(model, 'resid') else np.array([]),
                default=IMM_STUDENT_T_NU,
            )
            kf = IMMKalmanBetaEstimator(
                alpha_init=alpha_ols if use_alpha else 0.0,
                beta_init=beta_ols,
                q_beta_grid=IMM_Q_BETA_GRID,
                q_alpha=IMM_Q_ALPHA,
                r_init=max(ols_residual_var, 1e-6),
                p0_alpha=IMM_P0_ALPHA, p0_beta=IMM_P0_BETA,
                r_floor=IMM_R_FLOOR, r_gamma=IMM_R_GAMMA,
                clip_sigma=IMM_CLIP_SIGMA, nu=nu_init,
                transition_prob=IMM_TRANSITION_PROB,
                high_q_threshold=IMM_HIGH_Q_THRESHOLD,
                phi_beta=IMM_PHI_BETA, phi_alpha=IMM_PHI_ALPHA,
                obs_threshold=IMM_OBS_THRESHOLD,
                mu_floor=IMM_MU_FLOOR,
            )

        kalman_result = kf.update(r_btc, r_alt)
        kalman_state_out = kf.state_dict()

    return {
        # ... 现有字段不变 ...
        'kalman_beta': kalman_result['beta'] if kalman_result else beta_ols,
        'kalman_alpha': kalman_result['alpha'] if kalman_result else (alpha_ols if use_alpha else 0.0),
        'kalman_P_beta': kalman_result['P_beta'] if kalman_result else 1.0,
        'kalman_regime_score': kalman_result['regime_score'] if kalman_result else 0.0,
        'kalman_effective_q': kalman_result['effective_q_beta'] if kalman_result else 1e-4,
        'kalman_model_probs': kalman_result['model_probs'].tolist() if kalman_result else None,
        'kalman_innovation': kalman_result['normalized_innovation'] if kalman_result else 0.0,
        'kalman_clipped': kalman_result['clipped'] if kalman_result else False,
        'kalman_observable': kalman_result['observable'] if kalman_result else True,
        'kalman_state': kalman_state_out,
    }

改动函数:analyze_pair_advanced() / analyze_multi_period()

透传方式与 v2 相同,将 v2 的 kalman_q_beta / kalman_innov_sq_ema 替换为 kalman_regime_score / kalman_effective_q / kalman_model_probs / kalman_observable


4.2 IMM 体制检测(用途 1)

设计原理

v2 使用 BOCPD 监控 normalized innovation 的分布变化来推断体制切换,存在与 Q 自适应的信号竞争问题。

v3.1 直接使用 IMM 的 regime_score(高Q模型的总概率)作为体制指标。这是一个天然的贝叶斯后验概率,不需要额外的变点检测器。

regime_score 定义:
  regime_score = Σ_{j: q_β^(j) ≥ q_high} μ_j

  = P(β 正在以 ≥ q_high 的速率变化 | 历史数据)
regime_score 含义 动作
< soft_prob (0.3) 低Q模型主导,β 稳定 正常入场
[soft_prob, hard_prob) 高Q模型概率上升,β 开始变化 阈值缩放
≥ hard_prob (0.7) 高Q模型主导,β 正在剧烈变化 硬拦截

可观测性保护:当 kalman_observable=False(r_btc ≈ 0)时,regime_score 保持上一步值,体制判定不做更新。

文件:src/trading/config.py

StrategyParams 新增字段(v3.1 简化版,替代 v2 的 BOCPD 参数):

@dataclass(frozen=True)
class StrategyParams:
    # ... 现有字段 ...

    # Beta 体制自适应过滤器(v3.1: 基于 IMM 模型概率)
    beta_regime_enabled: bool = True
    beta_regime_soft_prob: float = 0.3       # regime_score > 此值 → 开始缩放阈值
    beta_regime_hard_prob: float = 0.7       # regime_score > 此值 → 硬拦截
    beta_regime_scale_max: float = 2.0       # 阈值最大缩放倍数
    beta_regime_warmup: int = 5              # 最少 N 根 4h K线才开始判定

对比 v2 参数量:v2 有 10 个 BOCPD 参数(hazard_rate, max_run_length, adaptive_hazard, hazard_min, hazard_max, hazard_innov_ref, soft_prob, hard_prob, scale_max, warmup),v3.1 仅需 4 个参数(soft_prob, hard_prob, scale_max, warmup)。

文件:src/trading/strategy.py

_BetaRegimeState 数据类
@dataclass
class _BetaRegimeState:
    """Beta 体制检测结果(v3.1: 基于 IMM 模型概率)"""
    regime: str               # 'stable' | 'expanding'
    regime_score: float       # P(高Q模型) ∈ [0, 1]
    kalman_P_beta: float
    effective_q_beta: float
    threshold_scale: float    # 阈值缩放因子 (>=1.0)
    hard_block: bool
    reason: str
    observable: bool          # r_btc 是否足够大(β 可观测)
_IMMRegimeDetector 类(替代 v2 的 _BOCPDDetector
class _IMMRegimeDetector:
    """基于 IMM 模型概率的 Beta 体制检测器

    直接使用 IMMKalmanBetaEstimator 输出的 regime_score
    (高Q模型总概率)判定 β 是否处于快速变化状态。

    相比 v2 的 BOCPD:
        - 无信号竞争(体制信号是 IMM 的内生输出)
        - 无额外参数(不需要 hazard rate, max_run_length 等)
        - 校准更好(regime_score 是贝叶斯后验概率)
        - 可观测性感知(r_btc ≈ 0 时不更新判定)

    内存: O(1) per pair(仅存储最新状态)
    计算: O(1) per check(简单阈值比较)
    """

    def __init__(self):
        self._pair_states: dict[PairKey, _BetaRegimeState] = {}
        self._n_updates: dict[PairKey, int] = {}

    def update(
        self,
        key: PairKey,
        regime_score: float,
        kalman_P_beta: float,
        effective_q_beta: float,
        kline_time: str,
        soft_prob: float,
        hard_prob: float,
        scale_max: float,
        warmup: int,
        observable: bool = True,
    ) -> _BetaRegimeState:
        """根据 IMM regime_score 判定当前体制"""

        n = self._n_updates.get(key, 0) + 1
        self._n_updates[key] = n

        if n < warmup:
            state = _BetaRegimeState(
                'stable', regime_score, kalman_P_beta,
                effective_q_beta, 1.0, False, "数据不足", observable
            )
            self._pair_states[key] = state
            return state

        # 可观测性保护: r_btc ≈ 0 时保持上一步判定
        if not observable:
            prev = self._pair_states.get(key)
            if prev is not None:
                state = _BetaRegimeState(
                    prev.regime, prev.regime_score, kalman_P_beta,
                    effective_q_beta, prev.threshold_scale, prev.hard_block,
                    f"{prev.reason} (β不可观测,保持)", False
                )
            else:
                state = _BetaRegimeState(
                    'stable', regime_score, kalman_P_beta,
                    effective_q_beta, 1.0, False, "β不可观测", False
                )
            self._pair_states[key] = state
            return state

        if regime_score >= hard_prob:
            state = _BetaRegimeState(
                'expanding', regime_score, kalman_P_beta,
                effective_q_beta, scale_max, True,
                f"Beta硬拦截: regime_score={regime_score:.3f}>={hard_prob} "
                f"eff_Q={effective_q_beta:.1e}", True
            )
        elif regime_score >= soft_prob:
            t = (regime_score - soft_prob) / max(hard_prob - soft_prob, 0.01)
            t = min(max(t, 0.0), 1.0)
            scale = 1.0 + t * (scale_max - 1.0)
            state = _BetaRegimeState(
                'expanding', regime_score, kalman_P_beta,
                effective_q_beta, scale, False,
                f"Beta缩放: regime_score={regime_score:.3f} scale={scale:.2f} "
                f"eff_Q={effective_q_beta:.1e}", True
            )
        else:
            state = _BetaRegimeState(
                'stable', regime_score, kalman_P_beta,
                effective_q_beta, 1.0, False,
                f"Beta稳定: regime_score={regime_score:.3f} "
                f"eff_Q={effective_q_beta:.1e}", True
            )

        self._pair_states[key] = state
        return state

    def get_all_states(self) -> dict[PairKey, _BetaRegimeState]:
        return self._pair_states

    def cleanup_pair(self, key: PairKey) -> None:
        self._pair_states.pop(key, None)
        self._n_updates.pop(key, None)

4.3 跨配对系统性风险聚合(v3.1 双阈值 + 确认窗口版)

设计原理

v2 使用 weighted_cp >= systemic_threshold × 0.8 将加权概率阈值耦合到计数比例阈值。v3 将两个阈值完全独立设定。v3.1 进一步新增确认窗口,要求连续 N 根 K 线都触发才执行全局拦截,避免单根 K 线噪声导致全系统停摆。

文件:src/trading/config.py

@dataclass(frozen=True)
class StrategyParams:
    # ... 现有字段 ...

    # 系统性风险参数(v3.1: 独立阈值 + 确认窗口)
    systemic_risk_enabled: bool = True
    systemic_ratio_threshold: float = 0.3      # expanding 配对比例阈值
    systemic_weighted_threshold: float = 0.25  # 加权 regime_score 阈值(独立设定)
    systemic_confirm_bars: int = 2             # 连续触发 N 根 K 线才拦截(防噪声)

文件:src/trading/strategy.py

class _SystemicRiskAggregator:
    """统计所有配对的 IMM 体制状态,
    基于双独立阈值 + 确认窗口判断系统性风险。

    v3.1 改进:
        - ratio_threshold 和 weighted_threshold 完全独立(消除 0.8 魔术数字)
        - 确认窗口: 连续 N 根 K 线触发才执行拦截(防止单根噪声停摆)
        - 加权聚合支持仓位权重
    """

    def __init__(self, enabled: bool = True):
        self._enabled = enabled
        self._consecutive_triggers = 0

    def check(
        self,
        pair_states: dict[PairKey, _BetaRegimeState],
        ratio_threshold: float = 0.3,
        weighted_threshold: float = 0.25,
        confirm_bars: int = 2,
        position_weights: dict[PairKey, float] | None = None,
    ) -> tuple[bool, str]:
        if not self._enabled or not pair_states:
            self._consecutive_triggers = 0
            return False, ""

        total = len(pair_states)
        n_expanding = sum(1 for s in pair_states.values() if s.regime == 'expanding')
        ratio = n_expanding / total

        # 加权平均 regime_score
        if position_weights:
            total_weight = sum(position_weights.get(k, 1.0) for k in pair_states)
            weighted_score = sum(
                position_weights.get(k, 1.0) * s.regime_score
                for k, s in pair_states.items()
            ) / total_weight
        else:
            weighted_score = sum(
                s.regime_score for s in pair_states.values()
            ) / total

        # 双独立条件
        ratio_triggered = ratio >= ratio_threshold
        weighted_triggered = weighted_score >= weighted_threshold

        if ratio_triggered or weighted_triggered:
            self._consecutive_triggers += 1
            if self._consecutive_triggers >= confirm_bars:
                return True, (
                    f"系统性风险(连续{self._consecutive_triggers}次): "
                    f"{n_expanding}/{total} ({ratio:.0%}) 配对扩张态"
                    f"{'(比例触发)' if ratio_triggered else ''}, "
                    f"加权regime_score={weighted_score:.3f}"
                    f"{'(加权触发)' if weighted_triggered else ''}"
                )
            return False, (
                f"系统性风险待确认({self._consecutive_triggers}/{confirm_bars}): "
                f"{n_expanding}/{total} ({ratio:.0%})"
            )

        self._consecutive_triggers = 0
        return False, ""

4.4 Beta 加权仓位计算(用途 2,与 v2 相同)

设计原理

标准配对交易的对冲方程:

spread_t = log(P_alt_t) - β × log(P_base_t) - α

要使 spread 对 P_base 的变动无敞口,两腿名义价值须满足:

alt_notional = |β| × base_notional

hedge_beta 选择逻辑

def resolve_hedge_beta(
    kalman_beta: float | None,
    kalman_P_beta: float | None,
    ols_beta: float | None,
    p_beta_max: float = HEDGE_BETA_P_MAX,
    beta_min: float = HEDGE_BETA_MIN,
    beta_max: float = HEDGE_BETA_MAX,
) -> tuple[float, str, bool]:
    """选择用于仓位计算的 hedge_beta

    优先 Kalman β,不确定性过高时降级为 OLS β,最后兜底 β=1.0

    Returns:
        (hedge_beta, source, beta_negative)
        hedge_beta: 绝对值,用于仓位比例计算
        source: 'kalman' | 'ols' | 'default'
        beta_negative: True 表示负相关配对,策略层需翻转方向
    """
    raw_beta = None
    source = 'default'

    if kalman_beta is not None and kalman_P_beta is not None:
        if kalman_P_beta <= p_beta_max:
            raw_beta = kalman_beta
            source = 'kalman'
        else:
            logger.debug(f"Kalman P_β={kalman_P_beta:.4f} > {p_beta_max},降级为 OLS β")

    if raw_beta is None and ols_beta is not None:
        raw_beta = ols_beta
        source = 'ols'

    if raw_beta is None:
        return 1.0, 'default', False

    beta_negative = raw_beta < 0
    beta = np.clip(abs(raw_beta), beta_min, beta_max)

    if beta_negative:
        logger.warning(f"负 β 检测: raw_β={raw_beta:.4f}({source}),配对为反向相关")

    return float(beta), source, beta_negative

文件:src/trading/risk_manager.py

改动 calculate_position_size() 增加 hedge_beta 参数(与 v2 相同):

def calculate_position_size(
    self,
    signal: PairTradeSignal,
    alt_price: float,
    base_price: float = 0.0,
    available_balance: float = 0.0,
    hedge_beta: float = 1.0,
    hedge_beta_source: str = '',
) -> tuple[float, float]:
    """计算仓位大小(Beta 加权)

    Beta 加权逻辑:
        base_notional = total_position / (1 + |β|)
        alt_notional  = |β| × base_notional
        总名义价值 = base_notional + alt_notional = total_position(保持不变)

    当 β=1.0 时退化为等额(向后兼容)。
    """
    # ... 现有 base_usd / 信号强度缩放 / 上限检查 / 余额检查 逻辑不变 ...

    alt_size = 0.0
    base_size = 0.0

    if self._config.pair_mode == "pair" and base_price > 0 and alt_price > 0:
        abs_beta = max(abs(hedge_beta), 0.1)
        total_position = position_usd * 2

        base_notional = total_position / (1.0 + abs_beta)
        alt_notional = abs_beta * base_notional

        alt_size = alt_notional / alt_price
        base_size = base_notional / base_price

        logger.info(
            f"仓位计算(β加权): 基础=${base_usd:.0f} × 缩放={scale} | "
            f"β={hedge_beta:.3f}({hedge_beta_source}) | "
            f"Alt: {alt_size:.2f} × ${alt_price:.4f} ≈ ${alt_notional:.0f} | "
            f"Base: {base_size:.2f} × ${base_price:.4f} ≈ ${base_notional:.0f} | "
            f"比例 {alt_notional/base_notional:.2f}:1"
        )
    elif alt_price > 0:
        alt_size = position_usd / alt_price

    return alt_size, base_size

文件:src/trading/models.py

与 v2 相同:

@dataclass
class PairTradeSignal:
    # ... 现有字段 ...
    hedge_beta: float = 1.0
    hedge_beta_source: str = 'default'
    beta_negative: bool = False

@dataclass
class PairPosition:
    # ... 现有字段 ...
    entry_hedge_beta: float = 1.0

4.5 编排层集成

文件:src/trading/orchestrator.py

新增状态_kalman_states: dict[PairKey, dict]

class TradingOrchestrator:
    def __init__(self, ...):
        # ... 现有 ...
        self._kalman_states: dict[PairKey, dict] = {}

process_analysis() 改动:从 multi_period_result 提取 IMM 输出,传给 strategy.process_tick()on_entry_signal()

def process_analysis(self, symbol, z4h, multi_period_result, timestamp, ...):
    # ... 现有输入验证 ...

    # [v3.1] 提取 IMM Kalman 输出
    kalman_beta = multi_period_result.get('kalman_beta')
    kalman_P_beta = multi_period_result.get('kalman_P_beta')
    kalman_regime_score = multi_period_result.get('kalman_regime_score')
    kalman_effective_q = multi_period_result.get('kalman_effective_q')
    kalman_observable = multi_period_result.get('kalman_observable', True)

    # [v3.1] 更新 Kalman 状态缓存
    kalman_state_new = multi_period_result.get('kalman_state')
    if kalman_state_new is not None:
        self._kalman_states[(symbol, base_symbol)] = kalman_state_new

    entry_signal, exit_signal = self._strategy.process_tick(
        symbol, base_symbol, z4h, timestamp,
        kline_time=kline_time, latest_price=price_for_log,
        kalman_regime_score=kalman_regime_score,
        kalman_P_beta=kalman_P_beta,
        kalman_effective_q=kalman_effective_q,
        kalman_observable=kalman_observable,
        alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv,
    )

    if entry_signal is not None:
        ols_beta = multi_period_result.get('details', {}).get(
            ('4h', '60d'), {}).get('cointegration_new', {}).get('beta')
        hedge_beta, hedge_source, beta_negative = resolve_hedge_beta(
            kalman_beta, kalman_P_beta, ols_beta
        )

        if beta_negative:
            logger.warning(
                f"负 β 配对 {symbol}|{base_symbol}: β={kalman_beta:.3f}, "
                f"当前方向逻辑可能需要翻转,建议人工复核"
            )

        self.on_entry_signal(
            symbol, multi_period_result,
            latest_alt_price=price_for_log,
            direction=entry_signal.direction,
            adaptive_z=entry_signal.adaptive_z,
            hedge_beta=hedge_beta,
            hedge_beta_source=hedge_source,
            beta_negative=beta_negative,
        )

on_entry_signal() 改动与 v2 相同(传递 hedge_beta 到信号)。


4.6 策略层集成(用途 1)

文件:src/trading/strategy.py

__init__

self._regime_detector = _IMMRegimeDetector()  # v3.1: 替代 v2 的 _BOCPDDetector
self._systemic_aggregator = _SystemicRiskAggregator(enabled=True)

process_tick() 签名

def process_tick(
    self, symbol, base_symbol, z4h, timestamp,
    kline_time=None, latest_price=None,
    kalman_regime_score: float | None = None,
    kalman_P_beta: float | None = None,
    kalman_effective_q: float | None = None,
    kalman_observable: bool = True,
    alt_ohlcv=None, base_ohlcv=None,
) -> tuple[EntrySignal | None, ExitSignal | None]:

_check_entry() — z4h 过滤之后、方向判断之前

# ── [v3.1] Beta 体制检查(基于 IMM 模型概率)──
if params.beta_regime_enabled and kalman_regime_score is not None:
    beta_state = self._regime_detector.update(
        key,
        regime_score=kalman_regime_score,
        kalman_P_beta=kalman_P_beta or 0.0,
        effective_q_beta=kalman_effective_q or 1e-4,
        kline_time=str(kline_time),
        soft_prob=params.beta_regime_soft_prob,
        hard_prob=params.beta_regime_hard_prob,
        scale_max=params.beta_regime_scale_max,
        warmup=params.beta_regime_warmup,
        observable=kalman_observable,
    )

    if beta_state.hard_block:
        logger.info(f"Beta体制硬拦截 | {pair_label} | {beta_state.reason} | "
                    f"az={adaptive_z:+.4f} z4h={z4h:+.4f}")
        return None

    all_states = self._regime_detector.get_all_states()
    is_systemic, systemic_reason = self._systemic_aggregator.check(
        all_states,
        ratio_threshold=params.systemic_ratio_threshold,
        weighted_threshold=params.systemic_weighted_threshold,
        confirm_bars=params.systemic_confirm_bars,
    )
    if is_systemic:
        logger.info(f"系统性风险拦截 | {pair_label} | {systemic_reason}")
        return None

    threshold_scale = beta_state.threshold_scale
else:
    beta_state = None
    threshold_scale = 1.0

# ── 方向判断(应用 Beta 缩放)──
threshold = params.adaptive_threshold * threshold_scale
if adaptive_z < -threshold:
    direction = 'long'
elif adaptive_z > threshold:
    direction = 'short'
else:
    if threshold_scale > 1.01:
        logger.info(f"Beta体制缩放拦截 | {pair_label} | "
                    f"az={adaptive_z:+.4f} 有效阈值={threshold:.2f} "
                    f"(原始={params.adaptive_threshold} ×{threshold_scale:.2f}) | "
                    f"{beta_state.reason if beta_state else ''}")
    return None

cleanup_pair()

self._regime_detector.cleanup_pair(key)

5. 完整数据流

1. WebSocket 4h K线闭合
   ↓
2. realtime_kline_service_base 触发分析
   ↓
3. analyze_multi_period()
   ├─ calculate_cointegration_params_dual_window(kalman_state=缓存)
   │   ├─ OLS 回归 → β_ols, spread, adf_pvalue(现有)
   │   └─ [v3.1] IMM Kalman update:
   │       ├─ Step 0: 可观测性检查(|r_btc| < ε_obs → 跳过 β 更新)
   │       ├─ Step 1: 混合(带宽 TPM 加权)
   │       ├─ Step 2: M=5 并行 OU 预测 + Kalman 更新(Student-t 似然)
   │       ├─ Step 3: 贝叶斯模型概率更新(概率下限保护)
   │       ├─ Step 4: Robbins-Monro 在线 R 估计(无截断偏差,用 μ̃ 加权)
   │       └─ Step 5: 输出 beta, P_beta, regime_score, effective_q, model_probs, R, observable
   ├─ 健康监控 → Gate2(现有)
   └─ 输出: multi_period_result(含 kalman_* 字段)
   ↓
4. orchestrator.process_analysis()
   ├─ 缓存 kalman_state → self._kalman_states[pair_key]
   ├─ 传递 kalman_regime_score, kalman_P_beta, kalman_observable → strategy.process_tick()
   │   ├─ [用途 1] _IMMRegimeDetector.update(regime_score, observable) → 体制判定
   │   │   └─ 硬拦截 / 阈值缩放 / 不可观测时保持(直接阈值比较,无 BOCPD)
   │   └─ [用途 1] _SystemicRiskAggregator.check(confirm_bars) → 系统性拦截(需连续确认)
   ├─ 若产生 EntrySignal:
   │   ├─ [用途 2] resolve_hedge_beta(kalman_beta, P_β, ols_beta)
   │   └─ on_entry_signal(hedge_beta=...) → PairTradeSignal
   ↓
5. position_manager.open_position(signal)
   ├─ risk_manager.calculate_position_size(hedge_beta=signal.hedge_beta)
   │   ├─ base_notional = total / (1 + |β|)
   │   └─ alt_notional  = |β| × base_notional
   ├─ executor.market_open(alt_size, base_size)
   └─ PairPosition(entry_hedge_beta=β)
   ↓
6. 持久化
   ├─ pair_positions: 含 entry_hedge_beta
   └─ trading_signals: 含 hedge_beta, hedge_beta_source, beta_negative

6. 场景模拟

A:正常市场(β 稳定) — 两个用途均正常

低Q模型(1e-6, 1e-5) 主导: μ = [0.35, 0.40, 0.20, 0.04, 0.01]
kalman_beta ≈ 0.45, P_β ≈ 0.003 (远低于 OU 稳态 P_∞≈2.5e-3)

用途 1: regime_score = μ[3]+μ[4] = 0.05 < soft_prob(0.3)
        → regime=STABLE, scale=1.0 → 正常入场
用途 2: hedge_beta=0.45(kalman) → alt_notional = 0.45 × base_notional
        比等额更少的 Alt 暴露,正确反映 β<1 的弱相关性

B:β 开始飙升 — IMM 模型概率瞬时切换

T=0h:   kalman_beta ≈ 0.5, μ = [0.35, 0.40, 0.20, 0.04, 0.01]
        regime_score = 0.05 → 正常

T=4h:   β 突变 → 高Q模型 likelihood 飙升
        μ = [0.05, 0.10, 0.25, 0.40, 0.20]
        regime_score = 0.60 > soft_prob(0.3) → EXPANDING, scale≈1.3
        effective_q = 2.3e-3(自动切换到快速追踪)
        OU 回归约束: β 向 β̄ 回归 2%/步,不会无限漂移

        [对比 v2: 此时 Q_β 才刚开始 ×1.05 逐步放大, BOCPD 的信号还被 Q 调整削弱]

T=8h:   β 继续变化 → 最高Q模型(1e-2)主导
        μ = [0.01, 0.02, 0.07, 0.30, 0.60]
        regime_score = 0.90 > hard_prob(0.7) → 硬拦截

        [对比 v2: BOCPD P(变点)可能仍 < hard_prob(0.7), 因为信号被 Q 自适应部分吸收]

v3.1 vs v2 响应速度对比

  • v2:Q 自适应需 ~10 步才能放大 Q_β 到合理水平(受 κ_up=1.05 限制);BOCPD 信号被削弱后需更多步
  • v3.1:IMM 模型概率在 1-2 步内 完成切换(贝叶斯后验即时更新),regime_score 同步升高

C:β 飙升后企稳 — 模型概率自动回归 + OU 回归

飙升期 regime_score = 0.90 → 硬拦截
企稳后:
  高Q模型 likelihood 下降,低Q模型 likelihood 上升
  μ 逐步回到 [0.30, 0.35, 0.25, 0.08, 0.02]

→ ~5-8 根正常 K线(20-32h)后 regime_score < soft_prob → 恢复

恢复后:
  用途 1: regime=STABLE → 允许入场
  用途 2: kalman_beta 已追踪到新 β
          OU 过程使 β 向 β̄ 缓慢回归(Φ_β=0.98, 每步回归 2%)
          若新 β 持续偏离 → x̄ 应在后续版本支持在线更新

D:β≈0.3 的弱相关配对 — hedge ratio 纠偏最显著

等额开仓:  Alt $100 + Base $100 → Alt 非系统性风险暴露 $70(多余)
β 加权:    Alt $46 + Base $154 → 正确对冲

E:Kalman 冷启动 — 降级策略

系统重启,kalman_state 丢失:
  T=0:  用 OLS β 初始化所有 M=5 个模型,P₀ = diag(0.1, 1.0)
        P_β = 1.0 > HEDGE_BETA_P_MAX(0.5) → 降级为 OLS β
        μ = [1/5, 1/5, 1/5, 1/5, 1/5](均匀)
        x̄ = [α_ols, β_ols](OU 长期均值设为 OLS 估计)

  T=3-5 根 K线(12-20h):
        模型概率分化,P_β 收敛(OU 保证收敛到稳态)→ 切回 Kalman β

F:系统性风险 + 确认窗口

T=0h: 20 配对中 8 个 regime_score > 0.3 → 40% > ratio_threshold=30%
      → 触发但确认计数=1 < confirm_bars=2 → 不拦截,记录日志
T=4h: 仍有 7 个 expanding → 确认计数=2 ≥ confirm_bars=2
      → 系统性风险拦截 → 所有配对暂停入场
      → IMM 仍在后台更新,恢复后使用最新 β

[对比 v3 原方案: 单根 K 线触发即拦截 → 可能因噪声导致不必要的停摆]

G:MEME vs L1 配对自适应对比

MEME 配对 (PURR/HYPE):
  β 波动大 → 高Q模型(1e-3, 1e-2) 常获高概率
  effective_q ≈ 5e-4 → 追踪快
  regime_score ≈ 0.3-0.5 → 经常处于缩放区间(提高入场门槛)
  per-pair ν ≈ 3.5(kurtosis ≈ 15)→ 重尾保护强

L1 配对 (ETH/BTC):
  β 极稳定 → 低Q模型(1e-6, 1e-5) 主导
  effective_q ≈ 5e-6 → 不追噪声
  regime_score ≈ 0.02-0.05 → 几乎总是 stable(正常入场)
  per-pair ν ≈ 8.0(kurtosis ≈ 6)→ 接近高斯

H:重尾异常值场景 — Student-t vs 高斯似然对比

稳定期 β≈0.5,某根 K线出现 5σ 异常收益率(闪崩/插针):

高斯似然:
  5σ 事件: L_j = exp(-12.5) ≈ 3.7e-6(几乎为零)
  → 高Q模型 likelihood 相对飙升(因为高Q模型的 S 更大,惩罚更轻)
  → 模型概率瞬间切换到高Q,regime_score 可能从 0.05 → 0.70+
  → 误触发硬拦截(实际上 β 并未变化,只是一个异常值)

Student-t(ν=5) 似然:
  5σ 事件: L_j ∝ (1 + 25/5)^{-3} ≈ 0.005(合理的小概率)
  → 各模型 likelihood 差异缩小(重尾下异常值不那么"不可能")
  → 模型概率温和调整,regime_score 从 0.05 → 0.15(仍在 stable)
  → 无误拦截

关键差异: 高斯下 4 个数量级的 likelihood 差距 → Student-t 下仅 1 个数量级

I:在线 R 自适应场景 — v3.1 无截断 vs v3 有截断

牛市期(收益率波动 σ≈3%):
  R 通过 Robbins-Monro EMA 跟踪到 ≈ 9e-4

突然进入熊市恐慌(σ 从 3% 升到 8%):
  v3.1 (无截断): R 在 ~20 步(~3天)内追踪到新波动率水平
  v3   (有截断): R 类似追踪,但因 max(.,0) 导致轻微正偏差

σ 从 8% 回落到 2%(恐慌结束):
  v3.1 (无截断): ε²-HPH' 频繁为负 → R 正常下降
  v3   (有截断): ε²-HPH' < 0 时被截断为 0 → R 下降更慢(正偏差累积)

长期影响: v3 的 R 会系统性高估 → S 偏大 → 各模型 likelihood 差异缩小
         → regime_score 的区分度降低 → 体制检测灵敏度下降

J:r_btc ≈ 0(BTC 横盘)— 可观测性保护

BTC 连续 3 天窄幅震荡(|r_btc| < 1e-6):

无保护 (v3 原方案):
  H = [1, ~0] → β 不可观测 → P_β 每步增长 Q_β
  3 天 = 18 步 → P_β 增长 18 × Q_β
  对 Q_β=1e-4 的模型: ΔP_β = 1.8e-3 → 可能触发 OLS 降级
  模型概率更新基于无信息量的似然 → regime_score 随机波动

有保护 (v3.1):
  |r_btc| < ε_obs → 跳过整个 IMM 更新
  P_β 仅通过 OU 预测步增长,但被 Φ_β<1 抑制: P_β → P_∞(有稳态)
  regime_score 保持上一步值(不做无意义的更新)
  体制判定标记 observable=False → 不触发任何新的拦截决策

7. 改动文件清单

文件 改动 影响范围
src/config.py +13 IMM 常量(含 Φ_β, Φ_α, ε_obs, μ_floor, Student-t ν)+ 3 Hedge Ratio 常量(删除 v2 的 15 个 Kalman+Q 自适应常量) 配置层
src/utils/analysis/analysis_core.py +_estimate_student_t_nu() 函数 + _KalmanModel 类(含 OU 预测)+ IMMKalmanBetaEstimator 类(含可观测性保护、无截断 R 更新、per-pair ν);增强 calculate_cointegration_params_dual_window(), analyze_pair_advanced(), analyze_multi_period() 分析层
src/trading/config.py StrategyParams +4 体制检测字段 + 3 系统性风险字段(含 confirm_bars)(替代 v2 的 10 个 BOCPD 字段 + 3 自适应 H 字段);增强 get_strategy_params(), _build_strategy_params(), load_trading_config() 配置层
src/trading/models.py PairTradeSignal +3 字段;PairPosition +1 字段(与 v2 相同) 数据模型
src/trading/orchestrator.py +_kalman_states 字典;+resolve_hedge_beta() 函数;增强 process_analysis(), on_entry_signal();透传 kalman_observable 编排层
src/trading/strategy.py +_BetaRegimeState(含 observable), +_IMMRegimeDetector(含可观测性保护), +_SystemicRiskAggregator(含确认窗口);增强 process_tick(), _check_entry(), cleanup_pair() 策略层
src/trading/risk_manager.py 增强 calculate_position_size() 支持 β 加权(与 v2 相同) 风控层
src/trading/position_manager.py 增强 _open_position_inner() 传递 hedge_beta(与 v2 相同) 仓位管理

新增依赖scipy.special.gammaln(Student-t 似然), scipy.stats.kurtosis(per-pair ν 初始化);scipy 已是项目依赖。

不改动momentum_filter.py, executor.py


8. DB 改动

与 v2 相同:

pair_positions 表新增列

ALTER TABLE pair_positions ADD COLUMN entry_hedge_beta DOUBLE PRECISION DEFAULT 1.0;

trading_signals 表新增列

ALTER TABLE trading_signals ADD COLUMN hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta_source VARCHAR(10) DEFAULT 'default';
ALTER TABLE trading_signals ADD COLUMN beta_negative BOOLEAN DEFAULT FALSE;

9. 日志

时机 级别 格式
初始化 INFO Beta体制跟踪器初始化 | IMM M=5 Q_grid=[1e-6..1e-2] ν={nu:.1f} Φ_β=0.98 γ_R=0.02 p_stay=0.98 soft=0.30 hard=0.70
IMM 更新 DEBUG IMM更新 | PURR|HYPE | β̂=1.20 P_β=0.005 regime=0.65 eff_Q=5.2e-4 R=8.3e-4 μ=[.01,.05,.25,.40,.29] obs=True
不可观测 DEBUG IMM跳过β更新 | PURR|HYPE | |r_btc|=3e-7 < ε_obs=1e-6 → 保持regime=0.65
hedge_beta 选择 DEBUG hedge_beta=0.45(kalman) P_β=0.003 | OLS_β=0.48 | negative=False
hedge_beta 降级 DEBUG Kalman P_β=0.62 > 0.50,降级为 OLS β=0.48
负 β 警告 WARNING 负β检测: PURR|HYPE raw_β=-0.30(kalman),配对为反向相关
β 加权仓位 INFO 仓位计算(β加权) | β=0.45(kalman) | Alt ≈$62 | Base ≈$138 | 比例 0.45:1
硬拦截 INFO Beta体制硬拦截 | PURR|HYPE | regime_score=0.850>=0.70 eff_Q=3.2e-3
缩放拦截 INFO Beta体制缩放拦截 | PURR|HYPE | az=-4.50 有效阈值=4.00 (3.0×1.33)
系统性待确认 DEBUG 系统性风险待确认(1/2): 8/20 (40%) 配对扩张态
系统性拦截 INFO 系统性风险(连续2次): 8/20 (40%) 配对扩张态(比例触发), 加权regime_score=0.350(加权触发)

10. 风险与边界条件

风险 严重度 缓解
已持仓 hedge ratio 偏离 本次仅在入场时计算 β 比例;持仓期间 rebalance 作为 P0 后续
β 符号翻转(kalman_beta < 0) resolve_hedge_beta 返回 beta_negative 标记 + 警告日志;方向翻转逻辑作为 P1 后续
OU 长期均值 x̄ 过时 x̄ 初始化为 OLS 估计,β 真值长期偏离时 OU 会拉回旧值。缓解:Φ_β=0.98 的回归力很弱(每步仅 2%),高Q模型可临时克服回归力追踪新值。x̄ 在线更新作为 P1 后续
IMM 模型概率塌缩(单模型概率→1.0) 极低 转移矩阵保证至少 2% 概率流入 + 概率下限保护 μ_floor=1e-6;双重机制防止塌缩
重启丢失 Kalman 状态 OLS 重新初始化 M 个模型,3-5 根 K线收敛;P_β 过大时自动降级为 OLS β;OU 保证 P_β 收敛到稳态
IMM 计算量(5× 单 Kalman) 2×2 状态 ×5 模型 ≈ 100 次浮点运算,4h 频率下完全可忽略
Q_β 网格覆盖不足 5 个值覆盖 4 个数量级(1e-6 到 1e-2),已包含绝大部分实际场景
低波动期 r_btc≈0 v3.1 可观测性保护:跳过 β 更新 + OU 约束 P_β 增长
β 加权导致极端仓位 HEDGE_BETA_MIN=0.1, HEDGE_BETA_MAX=5.0 上下限保护
R 在线估计暂态振荡 γ_R=0.02 的低通滤波 + R_floor 兜底;无截断偏差使 R 在暂态后准确收敛
Per-pair ν 初始化偏差 OLS 残差可能因样本量不足导致 kurtosis 估计偏差;ν 被 clip 到 [3,30];回退到默认 ν=5.0

不在本次范围:退场逻辑调整、持仓期间 hedge rebalance、HMM 体制分类、Kalman 持久化到 DB、飞书告警、负 β 方向翻转逻辑、x̄ 在线更新


11. 验证方案

单元测试

# test_imm_kalman.py
import numpy as np


def test_imm_convergence():
    """从初始值收敛到真实 [α, β]"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        r_btc = rng.normal(0, 0.02)
        r_alt = 0.001 + 1.0 * r_btc + rng.normal(0, 0.01)
        result = kf.update(r_btc, r_alt)
    assert abs(result['beta'] - 1.0) < 0.15
    assert result['alpha'] > 0


def test_imm_ou_steady_state():
    """OU 过程: P_β 收敛到稳态,不会无界增长"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    p_betas = []
    for _ in range(500):
        r_btc = rng.normal(0, 0.02)
        result = kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.01))
        p_betas.append(result['P_beta'])
    # P_β 应收敛(后 100 步的方差远小于前 100 步)
    var_early = np.var(p_betas[50:150])
    var_late = np.var(p_betas[400:500])
    assert var_late < var_early * 0.5, f"P_β should stabilize: var_early={var_early}, var_late={var_late}"
    # P_β 不应超过理论稳态上限(最大模型的 P_∞ ≈ 0.25)
    assert max(p_betas[100:]) < 0.5, f"P_β should not grow unbounded: max={max(p_betas[100:])}"


def test_imm_ou_mean_reversion():
    """OU 过程: β 在无观测信息时向 β̄ 回归"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2,
                                 obs_threshold=1e-6)
    rng = np.random.default_rng(42)
    # 先训练到 β≈1.0
    for _ in range(100):
        r_btc = rng.normal(0, 0.02)
        kf.update(r_btc, 1.0 * r_btc + rng.normal(0, 0.01))
    beta_before = kf.beta

    # 30 步 r_btc=0(不可观测,触发 OU 仅预测)
    for _ in range(30):
        kf.update(0.0, rng.normal(0, 0.001))
    beta_after = kf.beta

    # β 应向 β̄=0.5 方向回归
    assert abs(beta_after - 0.5) < abs(beta_before - 0.5), \
        f"β should revert toward β̄=0.5: before={beta_before:.3f}, after={beta_after:.3f}"


def test_imm_regime_score_stable():
    """β 稳定时 regime_score 低"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(50):
        r_btc = rng.normal(0, 0.02)
        result = kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.01))
    assert result['regime_score'] < 0.3, f"Stable β should have low regime_score: {result['regime_score']}"


def test_imm_regime_score_spike():
    """β 突变时 regime_score 立即飙升"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    for _ in range(5):
        r_btc = rng.normal(0, 0.02)
        result = kf.update(r_btc, 3.0 * r_btc + rng.normal(0, 0.01))
    assert result['regime_score'] > 0.3, f"β jump should increase regime_score: {result['regime_score']}"


def test_imm_regime_score_recovery():
    """β 企稳后 regime_score 回落"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    for _ in range(10):
        r_btc = rng.normal(0, 0.02)
        kf.update(r_btc, 3.0 * r_btc + rng.normal(0, 0.01))
    for _ in range(30):
        r_btc = rng.normal(0, 0.02)
        result = kf.update(r_btc, 3.0 * r_btc + rng.normal(0, 0.01))
    assert result['regime_score'] < 0.3, f"After stabilization, regime_score should decrease: {result['regime_score']}"


def test_imm_effective_q_adaptation():
    """MEME 配对应有更高的 effective_q,L1 配对应有更低的"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    rng = np.random.default_rng(42)

    kf_l1 = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    for _ in range(100):
        r_btc = rng.normal(0, 0.02)
        kf_l1.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.005))
    q_l1 = kf_l1.effective_q_beta

    kf_meme = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    beta_val = 0.5
    for i in range(100):
        if i % 10 == 0:
            beta_val += rng.normal(0, 0.5)
        r_btc = rng.normal(0, 0.02)
        kf_meme.update(r_btc, beta_val * r_btc + rng.normal(0, 0.01))
    q_meme = kf_meme.effective_q_beta

    assert q_meme > q_l1, f"MEME should have higher effective_q: {q_meme} vs {q_l1}"


def test_imm_model_probs_sum_to_one():
    """模型概率总和始终为 1"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        r_btc = rng.normal(0, 0.02)
        result = kf.update(r_btc, rng.normal(0, 0.03))
        assert abs(sum(result['model_probs']) - 1.0) < 1e-10


def test_imm_huber_clipping():
    """极端 innovation 被截断,β 不过度跳变"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, clip_sigma=3.0)
    rng = np.random.default_rng(42)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    result = kf.update(0.02, 20.0 * 0.02)
    assert result['clipped'] is True
    assert result['beta'] < 5.0


def test_imm_joseph_positive():
    """Joseph 形式保证 P 始终非负"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-6)
    rng = np.random.default_rng(42)
    for _ in range(500):
        result = kf.update(rng.normal(0, 0.05), 0.5 * rng.normal(0, 0.05) + rng.normal(0, 0.001))
        assert result['P_beta'] >= 0 and result['P_alpha'] >= 0


def test_imm_alpha_absorbs_drift():
    """α 吸收独立漂移,β 不被污染"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        result = kf.update(rng.normal(0, 0.02), 0.005 + 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    assert abs(result['beta'] - 0.5) < 0.2
    assert result['alpha'] > 0.001


def test_imm_observability_guard():
    """r_btc ≈ 0 时跳过 β 更新,标记 observable=False"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2,
                                 obs_threshold=1e-6)
    rng = np.random.default_rng(42)
    # 正常更新
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    rs_before = kf.regime_score
    beta_before = kf.beta

    # r_btc ≈ 0
    result = kf.update(1e-8, rng.normal(0, 0.01))
    assert result['observable'] is False
    # regime_score 应保持不变(不更新模型概率)
    assert abs(result['regime_score'] - rs_before) < 1e-10


def test_imm_state_persistence():
    """状态导出/恢复后行为一致(含 OU 参数)"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf1 = IMMKalmanBetaEstimator(
        alpha_init=0.0, beta_init=0.5, r_init=1e-2,
        nu=7.0, r_gamma=0.05, clip_sigma=2.5, r_floor=1e-7,
        phi_beta=0.97, phi_alpha=0.99
    )
    kf1.update(0.02, 0.01)

    state = kf1.state_dict()
    assert state['nu'] == 7.0
    assert state['r_gamma'] == 0.05
    assert state['phi_vec'] == [0.99, 0.97]

    kf2 = IMMKalmanBetaEstimator.from_state_dict(state)
    r1, r2 = kf1.update(0.03, 0.015), kf2.update(0.03, 0.015)
    assert abs(r1['beta'] - r2['beta']) < 1e-10
    assert abs(r1['R'] - r2['R']) < 1e-15


def test_imm_no_signal_competition():
    """v3.1 核心验证: regime_score 不会被 β 追踪吸收"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)

    for i in range(50):
        r_btc = 0.01 * (1 if i % 2 == 0 else -1)
        kf.update(r_btc, 0.5 * r_btc)

    regime_scores_during_change = []
    beta_val = 0.5
    for i in range(20):
        beta_val += 0.1
        r_btc = 0.015 * (1 if i % 2 == 0 else -1)
        result = kf.update(r_btc, beta_val * r_btc)
        regime_scores_during_change.append(result['regime_score'])

    max_score = max(regime_scores_during_change)
    assert max_score > 0.5, (
        f"During β change, regime_score should be high (got max={max_score}). "
        f"If this fails, there may be signal competition."
    )


def test_imm_student_t_robustness():
    """Student-t 似然使模型概率对异常值更鲁棒"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator

    kf_t = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, nu=5.0)
    kf_g = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, nu=1000.0)

    for i in range(50):
        r_btc = 0.01 * (1 if i % 2 == 0 else -1)
        kf_t.update(r_btc, 0.5 * r_btc + 0.001)
        kf_g.update(r_btc, 0.5 * r_btc + 0.001)

    rs_t_before = kf_t.regime_score
    rs_g_before = kf_g.regime_score

    result_t = kf_t.update(0.02, 0.5 * 0.02 + 0.15)
    result_g = kf_g.update(0.02, 0.5 * 0.02 + 0.15)

    delta_t = result_t['regime_score'] - rs_t_before
    delta_g = result_g['regime_score'] - rs_g_before

    assert delta_t < delta_g, (
        f"Student-t should be more robust: Δ_t={delta_t:.4f} vs Δ_g={delta_g:.4f}"
    )


def test_imm_online_r_no_bias():
    """v3.1: 无截断 R 估计在噪声下降时正确减小 R"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    rng = np.random.default_rng(42)

    kf = IMMKalmanBetaEstimator(
        alpha_init=0.0, beta_init=0.5, r_init=1e-4, r_gamma=0.05
    )

    # 高噪声期
    for _ in range(50):
        r_btc = rng.normal(0, 0.02)
        kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.05))
    R_high = kf._models[0].R

    # 低噪声期
    for _ in range(80):
        r_btc = rng.normal(0, 0.02)
        kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.005))
    R_low = kf._models[0].R

    assert R_low < R_high * 0.5, (
        f"R should decrease when noise drops: R_high={R_high:.6f}, R_low={R_low:.6f}"
    )


def test_imm_per_pair_nu():
    """Per-pair ν: 高 kurtosis 数据应得到更小的 ν"""
    from src.utils.analysis.analysis_core import _estimate_student_t_nu
    # 正态残差 → 高 ν
    rng = np.random.default_rng(42)
    normal_resid = rng.normal(0, 1, 200)
    nu_normal = _estimate_student_t_nu(normal_resid)
    assert nu_normal > 10, f"Normal data should give high ν: {nu_normal}"

    # 重尾残差 → 低 ν
    heavy_resid = rng.standard_t(df=3, size=200)
    nu_heavy = _estimate_student_t_nu(heavy_resid)
    assert nu_heavy < 7, f"Heavy-tailed data should give low ν: {nu_heavy}"

    assert nu_heavy < nu_normal, f"Heavy-tail ν should be smaller: {nu_heavy} vs {nu_normal}"


def test_imm_bandwidth_tpm():
    """带宽 TPM: 相邻模型跳转概率 > 远端模型"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    tpm = kf._TPM
    assert tpm[2, 1] > tpm[2, 0], "Adjacent model should have higher transition prob"
    assert tpm[2, 3] > tpm[2, 4], "Adjacent model should have higher transition prob"
    for i in range(kf.M):
        assert abs(tpm[i].sum() - 1.0) < 1e-10, f"Row {i} should sum to 1"


def test_imm_model_prob_floor():
    """模型概率下限保护: 长期运行后没有模型概率降为 0"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    for i in range(500):
        r_btc = 0.01 * (1 if i % 2 == 0 else -1)
        result = kf.update(r_btc, 0.5 * r_btc)
    for j, p in enumerate(result['model_probs']):
        assert p >= 1e-6, f"Model {j} prob should not collapse to 0: {p}"
# test_hedge_beta.py(与 v2 相同)
import numpy as np


def test_resolve_hedge_beta_kalman():
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source, negative = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.01, ols_beta=0.5)
    assert source == 'kalman'
    assert abs(beta - 0.45) < 0.01
    assert negative is False


def test_resolve_hedge_beta_fallback_ols():
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source, negative = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.8, ols_beta=0.5)
    assert source == 'ols'
    assert abs(beta - 0.5) < 0.01


def test_resolve_hedge_beta_fallback_default():
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source, negative = resolve_hedge_beta(kalman_beta=None, kalman_P_beta=None, ols_beta=None)
    assert source == 'default'
    assert beta == 1.0
    assert negative is False


def test_resolve_hedge_beta_clipping():
    from src.trading.orchestrator import resolve_hedge_beta
    beta, _, _ = resolve_hedge_beta(kalman_beta=10.0, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 5.0
    beta, _, _ = resolve_hedge_beta(kalman_beta=0.01, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 0.1


def test_resolve_hedge_beta_negative():
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source, negative = resolve_hedge_beta(kalman_beta=-0.8, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 0.8
    assert source == 'kalman'
    assert negative is True


def test_position_size_beta_weighted():
    abs_beta = 0.5
    total = 200.0
    base_notional = total / (1.0 + abs_beta)
    alt_notional = abs_beta * base_notional
    assert abs(base_notional - 133.33) < 0.5
    assert abs(alt_notional - 66.67) < 0.5
    assert abs(base_notional + alt_notional - total) < 0.01


def test_position_size_beta_one():
    abs_beta = 1.0
    total = 200.0
    base_notional = total / (1.0 + abs_beta)
    alt_notional = abs_beta * base_notional
    assert abs(base_notional - 100.0) < 0.01
    assert abs(alt_notional - 100.0) < 0.01
# test_regime_detector.py
import numpy as np


def test_regime_detector_stable():
    from src.trading.strategy import _IMMRegimeDetector
    d = _IMMRegimeDetector()
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    for i in range(10):
        s = d.update(key, regime_score=0.05, kalman_P_beta=0.01,
                     effective_q_beta=1e-5, kline_time=f"2024-01-01T{i*4:02d}:00:00",
                     soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'stable' and s.regime_score < 0.3


def test_regime_detector_expanding():
    from src.trading.strategy import _IMMRegimeDetector
    d = _IMMRegimeDetector()
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    for i in range(10):
        s = d.update(key, regime_score=0.50, kalman_P_beta=0.05,
                     effective_q_beta=5e-4, kline_time=f"2024-01-01T{i*4:02d}:00:00",
                     soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'expanding' and not s.hard_block
    assert s.threshold_scale > 1.0


def test_regime_detector_hard_block():
    from src.trading.strategy import _IMMRegimeDetector
    d = _IMMRegimeDetector()
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    for i in range(10):
        s = d.update(key, regime_score=0.85, kalman_P_beta=0.1,
                     effective_q_beta=3e-3, kline_time=f"2024-01-01T{i*4:02d}:00:00",
                     soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.hard_block and s.regime_score > 0.7


def test_regime_detector_warmup():
    from src.trading.strategy import _IMMRegimeDetector
    d = _IMMRegimeDetector()
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    for i in range(3):
        s = d.update(key, regime_score=0.90, kalman_P_beta=0.1,
                     effective_q_beta=5e-3, kline_time=f"2024-01-01T{i*4:02d}:00:00",
                     soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'stable' and s.reason == "数据不足"


def test_regime_detector_unobservable():
    """不可观测时保持上一步判定"""
    from src.trading.strategy import _IMMRegimeDetector
    d = _IMMRegimeDetector()
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    # 先建立 expanding 状态
    for i in range(10):
        d.update(key, regime_score=0.60, kalman_P_beta=0.05,
                 effective_q_beta=5e-4, kline_time=f"2024-01-01T{i*4:02d}:00:00",
                 soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5, observable=True)
    # 不可观测
    s = d.update(key, regime_score=0.01, kalman_P_beta=0.05,
                 effective_q_beta=5e-4, kline_time="2024-01-01T40:00:00",
                 soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5, observable=False)
    assert s.regime == 'expanding', "Should keep previous regime when unobservable"
    assert not s.observable


# test_systemic_risk.py
def test_systemic_risk_with_confirmation():
    """确认窗口: 需连续 2 次触发才拦截"""
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 4 else 'stable', 0.5 if i < 4 else 0.1,
        0.01, 5e-4 if i < 4 else 1e-5, 1.5 if i < 4 else 1.0, False, "", True
    ) for i in range(10)}

    # 第 1 次触发 → 不拦截
    triggered, _ = agg.check(states, ratio_threshold=0.3, weighted_threshold=0.25, confirm_bars=2)
    assert not triggered

    # 第 2 次触发 → 拦截
    triggered, reason = agg.check(states, ratio_threshold=0.3, weighted_threshold=0.25, confirm_bars=2)
    assert triggered
    assert "连续2次" in reason


def test_systemic_risk_reset_on_recovery():
    """恢复后确认计数归零"""
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)

    expanding_states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 4 else 'stable', 0.5 if i < 4 else 0.1,
        0.01, 5e-4, 1.5, False, "", True
    ) for i in range(10)}
    stable_states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'stable', 0.05, 0.01, 1e-5, 1.0, False, "", True
    ) for i in range(10)}

    # 触发 1 次
    agg.check(expanding_states, confirm_bars=2)
    # 恢复
    agg.check(stable_states, confirm_bars=2)
    # 再触发 1 次 → 不应拦截(计数已重置)
    triggered, _ = agg.check(expanding_states, confirm_bars=2)
    assert not triggered


def test_systemic_risk_no_trigger():
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 2 else 'stable', 0.4 if i < 2 else 0.05,
        0.01, 3e-4 if i < 2 else 1e-5, 1.5 if i < 2 else 1.0, False, "", True
    ) for i in range(10)}
    assert not agg.check(states, ratio_threshold=0.3, weighted_threshold=0.25, confirm_bars=2)[0]

集成验证

  1. 回测:在 β 飙升的历史区间回测,验证 regime_score 的检测延迟;对比等额 vs β 加权的 PnL 差异;对比 v2(BOCPD + Q 自适应)vs v3.1(IMM + OU)的检测延迟和假阳性率对比 Student-t vs 高斯似然在闪崩/插针期间的误拦截率
  2. 实盘观察:监控 kalman_beta vs OLS β 的偏离度、hedge_beta_source 分布、P_β 走势(应收敛到 OU 稳态)、model_probs 分布effective_q_beta 差异(MEME >> L1)per-pair ν 分布R 在线估计值的走势observable 标记的频率
  3. A/B 对比(可选):同时运行等额和 β 加权,对比 hedge 效果和回撤
  4. 消融实验(推荐):分别关闭各改进组件,量化每项的独立贡献:
    • OU vs 随机游走:设 Φ_β=1.0(退化为随机游走)vs Φ_β=0.98,对比 P_β 的长期行为
    • Student-t vs 高斯:设 ν=1000(近似高斯)vs per-pair ν
    • Per-pair ν vs 固定 ν:全部用 ν=5 vs 自动推算
    • 在线 R vs 固定 R:设 γ_R=0 vs γ_R=0.02
    • 无截断 R vs 有截断 R:v3.1 vs v3 原方案(加 max(.,0)),对比 R 在波动率下降期的跟踪速度
    • 带宽 TPM vs 均匀 TPM:直接对比
    • 可观测性保护 vs 无保护:对比 BTC 横盘期间 regime_score 的稳定性
    • 确认窗口 vs 无确认:对比系统性拦截的误触发率
    • 概率下限 vs 无下限:设 μ_floor=0 vs μ_floor=1e-6

12. 后续演进

优先级 方向 预期收益 依赖
P0 退场保护(β 飙升时收紧止损) 减少已持仓亏损 本方案
P0 持仓期间 hedge rebalance(β 偏离阈值时调整两腿比例) 维持对冲质量 本方案
P0 Kalman 状态持久化到 DB 消除冷启动窗口 本方案
P1 VB-AKF 联合 Q+R 后验估计(Sarkka 2009, Huang 2017) 消除 Q 网格和 Robbins-Monro 两个启发式组件;理论最优的噪声参数估计 替代本方案的 Q 网格 + R EMA
P1 负 β 方向翻转逻辑 正确处理反向相关配对 本方案 beta_negative
P1 x̄ 在线更新(OU 长期均值随 β 缓慢迁移) 防止 β 真值长期偏离后 OU 回归力变成阻力 本方案
P1 ν 在线自适应(监控残差 kurtosis 动态调整 Student-t 自由度) 适配市场微结构变化(如波动率聚集导致 kurtosis 时变) 本方案 per-pair ν
P1 跨配对层次化估计(同赛道共享 Q 超先验) 加速冷启动,提高数据稀疏时的估计精度 本方案
P2 Robust Student-t 过程噪声(Agamennoni 2012) 同时保护观测层和状态转移层的重尾鲁棒性 本方案
P2 BOCPD 补充检测器(监控 IMM 加权 innovation) 检测非线性断裂(如 β 正→负) 本方案
P2 HMM 体制分类(多体制缩放) 更精细的体制控制 本方案
P2 IMM-ATP 自适应转移概率(在线学习 p_stay) 消除固定 p_stay=0.98 的假设 本方案
P2 MS-SSM 统一框架(Regime-Switching SSM) 理论最优 替换本方案架构
P3 飞书告警(体制切换 + hedge_beta 变化) 人工监控 本方案
P3 多频率融合(4h + daily) 更鲁棒的 β 估计 本方案

v3.1 vs v3 改进汇总

改进项 v3 原方案 v3.1 理论依据
状态方程 随机游走 OU 均值回归 P_β 有稳态解 P_∞=Q/(1-Φ²)
Student-t ν 固定 ν=5 Per-pair 从 kurtosis 推算 不同币对尾部厚度差异
R 更新 max(ε²-HPH', 0) 截断 无截断 + R_floor 消除与 Sage-Husa 同性质的正偏差
R 更新权重 后验 μ 预测 μ̃ 避免似然→R→似然的循环依赖
可观测性 无保护 |r_btc| < ε 时跳过 β 更新 防止不可观测时 P_β 膨胀
系统性风险 单次触发拦截 确认窗口(连续 N 次) 防止单根 K 线噪声停摆
参数量 15 17 (+Φ_β, Φ_α, ε_obs) 均有明确物理含义

Read more

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG

对于空间环境、“信息/逻辑”(比如代码、结构、表达)秩序追求的心理特征分析

一、为什么是“空间 + 信息”同时强化? 因为你当年面对的是“双重失控”: 1️⃣ 外部世界是脏乱 + 失序的 * 空间被污染 * 行为无边界 * 基本生活秩序崩塌 👉 所以你现在会强烈要求: * 桌面干净 * 房间有序 * 物品可控 这是在修复:“物理世界必须是可控的” 2️⃣ 人的行为和逻辑也是混乱的 * 没有规则 * 没有底线 * 没有理性 👉 所以你现在会特别在意: * 表达是否清晰 * 逻辑是否自洽 * 结构是否优雅 * 代码是否干净 这是在修复:“认知世界必须是合理的” 二、你其实构建了一个“高纯度系统” 你现在的偏好,本质上是: 👉 低噪音 + 高结构 + 强控制感 具体表现就是: * 空间:极简、整洁、可预测 * 信息:清晰、压缩、无冗余 这类人有一个很明显的优势: 👉 处理复杂问题时,

By SHI XIAOLONG