Kalman Beta 双用途设计方案(v3 — IMM 架构版)

Kalman Beta 双用途设计方案(v3 — IMM 架构版)

1. 问题背景

当前系统在 Beta 值的使用上存在多个结构性缺陷:

缺陷 1:Beta 估计滞后

OLS BETA_WINDOW=100 固定窗口(≈17天@4h),β 结构性变化后系统需要一周以上才能感知:

T+0h    BTC 回暖,β 从 0.5 开始上升
T+12h   β 实际 2.0,OLS β̂ 仍 ≈ 0.6 → 误判为均值回归机会
T+24h   β 实际 4.0,OLS β̂ ≈ 0.8 → 继续误入场
T+7d    β 实际 8.0,OLS β̂ ≈ 2.0 → 协整假设彻底失效

核心问题:系统用滞后 β 计算的 z4h 产生入场信号,实际上反映的是 β 正在变化,而非 spread 正在回归

缺陷 2:仓位比例未使用 Beta

risk_manager.calculate_position_size() 对两腿使用等额名义价值

# 当前实现(risk_manager.py:164-170)
alt_size = position_usd / alt_price      # 两腿等额
base_size = position_usd / base_price    # 与 β 无关

配对交易的对冲比例应为 alt_notional = β × base_notional。等额开仓意味着:

  • β=0.5 时,Alt 腿过重 → 暴露于 Alt 的非系统性风险
  • β=2.0 时,Alt 腿过轻 → 对冲不足,价差波动被放大
  • β 变化时,无法调整比例 → 持仓期间 hedge 持续偏离

缺陷 3:固定 Q 无法适配不同币对

不同配对的 β 动态特征差异极大:

  • MEME 配对(如 PURR/HYPE):β 波动剧烈,需要更大的过程噪声 Q 来追踪
  • L1 配对(如 ETH/BTC):β 非常稳定,过大的 Q 会引入不必要的噪声

固定 q_β=1e-4 无法同时满足两类配对的需求。

缺陷 4:v2 的 Q 自适应与 BOCPD 信号竞争(v3 新增)

v2 方案中存在一个未被识别的设计矛盾

Q 自适应的目标: 使 normalized innovation → N(0,1)
BOCPD 的输入:    normalized innovation 的分布变化

两者对抗: Q 自适应越好 → innovation 越稳定 → BOCPD 越难检测变点

具体表现:

β 开始飙升
→ innovation 变大
→ Q 自适应增大 Q_β(追踪变化)
→ innovation 被 Q 调整"吸收",逐步回到 N(0,1)
→ BOCPD 看到的变点信号被削弱
→ 检测延迟甚至漏报

v2 通过 κ_up=1.05(每步仅放大 5%)故意放慢 Q 自适应来缓解,但这意味着 Q 自适应的效果也被人为削弱——两个组件互相妥协,都无法达到最优

缺陷 5:v2 的 Q 自适应依赖启发式超参数(v3 新增)

v2 的 Innovation-based Q 自适应(Mehra 1970)本质是带阈值的乘法调节器:

ν̄ > γ_upper(2.0) → Q_β *= κ_up(1.05)
ν̄ < γ_lower(0.3) → Q_β *= κ_down(0.98)

8 个超参数(η, γ_upper, γ_lower, κ_up, κ_down, q_ceil, q_floor, q₀)全部缺乏理论依据,且存在交互效应。加上 BOCPD 的 10 个参数,v2 共有 28 个需要手调的参数

当前系统弱点汇总

组件 问题 影响
analysis_core.py BETA_WINDOW=100 固定窗口 OLS β 估计滞后 ≈17天
risk_manager.py 等额名义价值,未使用 β 对冲比例错误
strategy.py adaptive_threshold=3.0 固定 β 飙升导致持续突破阈值
momentum_filter.py 只检测单腿价格趋势 不检测 spread/beta 体制切换
v2 Q 自适应 启发式阈值 + 与 BOCPD 信号竞争 两个组件互相削弱
v2 Sage-Husa R 仅正值更新导致正偏差 R 长期被高估

2. 方案概述

核心设计IMM(Interacting Multiple Model)Kalman Filter 估计时变 [α, β],其输出 同时服务两个用途

                   ┌─────────────────────────────────────────────────┐
                   │  IMMKalmanBetaEstimator (4h)                    │
                   │  输入: r_btc, r_alt (对数收益率)                  │
                   │  特性: M=5 并行模型, 贝叶斯模型概率              │
                   │  输出: beta, P_beta, model_probs, regime_score  │
                   └──────────────┬──────────────────────────────────┘
                                  │
                  ┌───────────────┼───────────────┐
                  ▼                               ▼
         用途 1: 体制检测                    用途 2: Hedge Ratio
         ┌─────────────────┐            ┌─────────────────────┐
         │ IMM 模型概率判定  │            │ 仓位比例计算          │
         │ P(高Q模型) 阈值   │            │ 输入: beta           │
         │ 输出: regime_score│            │ 输出: alt/base 数量比 │
         └────────┬────────┘            └──────────┬──────────┘
                  │                                │
                  ▼                                ▼
         入场信号过滤                         Beta 加权开仓
         (硬拦截/阈值缩放)              (alt_notional = β × base_notional)
                  │
                  ▼
         跨配对系统性风险聚合
         (加权 regime_score > 阈值 → 全局拦截)

v3 相比 v2 的核心算法升级

组件 v2 v3 提升
β 估计 单一 Kalman + 启发式 Q 自适应 IMM 多模型融合 消除 Q 自适应的 8 个启发式超参;贝叶斯最优模型选择
体制检测 BOCPD(与 Q 自适应信号竞争) IMM 模型概率(体制信号内生) 消除信号竞争;减少 10 个 BOCPD 参数
R 估计 Sage-Husa(正偏差) 固定 R(IMM 的 Q 选择吸收 R 变化) 消除 Sage-Husa 偏差问题
系统性风险 计数比例 OR 加权概率(阈值耦合) 双阈值独立设定 消除 threshold × 0.8 魔术数字
负 β 处理 返回符号信息 不变
总参数量 28 个(18 Kalman + 10 BOCPD) 17 个 减少 40%

为什么 IMM 能同时解决 Q 自适应和体制检测

IMM 的关键洞察:不需要先"猜对" Q 再检测变点,而是让多个 Q 值并行竞争,贝叶斯概率自动选出最优

MEME 配对(β 波动大):
  高Q模型(1e-3, 1e-2) 持续获得高概率 → effective_Q 自动偏大
  → 追踪快,且 regime_score 高 → 自然提高入场门槛

L1 配对(β 稳定):
  低Q模型(1e-6, 1e-5) 持续获得高概率 → effective_Q 自动偏小
  → 不追噪声,且 regime_score 低 → 正常入场

β 突变时:
  高Q模型的 likelihood 瞬间飙升 → 模型概率快速切换
  → regime_score 立即升高 → 拦截入场
  → 同时 β 估计通过概率加权自动跟上新值

与 v2 的根本区别:v2 中 Q 自适应和变点检测是两个独立系统在抢同一个信号;v3 中它们是同一个贝叶斯推断的两个输出,不存在竞争。

为什么 4h 频率足够

β 是两个资产之间的结构性关系(同赛道、资金流向、基本面联动),变化周期在天~周级别。

方法 响应 β 结构性变化 抗日内噪声 适用场景
OLS 100×4h ~7-10 天 当前系统(过于滞后)
4h IMM Kalman 1-4 根K线(4-16h) 体制检测 + Hedge Ratio
5m Kalman ~15-30 分钟 弱(追噪声) 不适用于 β 估计

IMM 的响应速度优于 v2 的单一自适应 Kalman:β 突变时,高Q 模型立即拟合新关系,概率权重瞬间切换,无需等 Q 自适应的逐步放大过程。

四层架构

组件 功能 v3 改进
第一层 IMM Kalman Filter M 个并行 Kalman 估计 [α, β],贝叶斯模型概率融合 替代 v2 单 Kalman + 启发式 Q 自适应
第二层 IMM 体制检测 P(高Q模型) 作为体制指标 → 入场信号过滤 替代 v2 BOCPD,无信号竞争
第三层 跨配对系统性风险聚合 统计所有配对体制状态 → 系统性事件拦截 双阈值独立设定
第四层 Beta 加权仓位计算 kalman_beta 计算两腿名义比例 负 β 方向感知(与 v2 相同)

3. 算法选型论证

3.1 为什么选 Kalman Filter(而非其他时变回归方法)

方法 优势 劣势 适用性评估
向量 Kalman Filter 最优线性估计(MMSE);递推 O(1);P_β 提供不确定性度量 假设线性高斯;单一 Q 不自适应 最优基础组件(配合 IMM 解决 Q 问题)
RLS (递推最小二乘) 实现简单;forgetting factor 天然自适应 无状态不确定性度量;不支持多模型融合 不适合(缺少概率信息)
粒子滤波 处理非线性/非高斯 O(N_particles) 计算量大;参数多 过度设计(β 的线性关系足够)
Online Bayesian LR 完全贝叶斯;自然输出后验 不支持时变参数(除非加 forgetting) 不如 Kalman 灵活
神经网络 (LSTM/Transformer) 可捕获非线性 需大量训练数据;不可解释 不适合生产系统

结论:Kalman Filter 是时变线性回归的理论最优解(Kalman 1960)。单一 Kalman 的 Q 固定问题通过 IMM 多模型融合解决。

3.2 为什么选 IMM 做 Q 自适应(而非 v2 的 Mehra 启发式)

方法 优势 劣势 适用性评估
IMM (Blom & Bar-Shalom 1988) 贝叶斯最优模型选择;无启发式阈值;模型概率直接指示体制;响应 β 突变仅需 1-2 步 M 倍计算量(M=5 对 2D 状态可忽略);需设定 Q 网格 最优选择
Innovation-based (Mehra 1970) 经典方法;实现简单 8 个启发式超参;与变点检测信号竞争;响应速度受 κ 限制 v2 方案(已被 v3 替代)
VB-AKF (Sarkka 2009) 联合估计 Q 和 R 的后验;数学严格 实现复杂;变分近似引入额外误差;不直接输出体制信号 P2 演进方向
多模型 GPB1 (简化 IMM) 比 IMM 简单(无混合步) 模型切换后状态不连续 不如 IMM 稳定

IMM 的核心优势

  1. Q 网格直观[1e-6, 1e-5, 1e-4, 1e-3, 1e-2] 覆盖"β几乎不变"到"β剧烈变化"的全频谱,每个值的物理含义清晰
  2. 无阈值调参:贝叶斯后验自动选择最优模型,不需要 γ_upper/γ_lower/κ_up/κ_down
  3. 内生体制信号P(高Q模型) 天然表示"β 是否正在快速变化",无需额外的变点检测器
  4. 瞬时响应:β 突变时高Q模型的 likelihood 立即飙升,模型概率 1-2 步内切换完成;不像 Mehra 方法受 κ=1.05 限制需数十步
  5. 对 2D 状态计算量可忽略:5 个 2×2 Kalman Filter 的总计算量 ≈ 100 次浮点运算

3.3 为什么 IMM 模型概率可替代 BOCPD

方法 优势 劣势 适用性评估
IMM 模型概率 信号内生(无竞争);校准良好的贝叶斯后验;0 额外参数 仅检测 Q 维度的变化(无法检测非线性断裂) v3 最优选择
BOCPD (Adams & MacKay 2007) 通用在线变点检测;输出 run length 后验 与 Q 自适应信号竞争;10 个额外参数 v2 方案(已被 v3 替代)
CUSUM 极简 无概率输出;阈值需手调 过于简单
HMM 多体制建模 体制数需预设;EM 需离线 P2 演进方向

为什么 IMM 模型概率是更好的体制指标

BOCPD 监控 innovation 分布变化来"推断"β 是否在变——这是一个间接信号。IMM 的模型概率直接回答"β 的变化速度属于哪个量级"——这是一个直接信号

BOCPD 路径:  innovation 变大 → 推断分布变了 → 推断β在变 → 变点概率升高
IMM 路径:    高Q模型 likelihood 升高 → 模型概率切换 → β变化速度指标升高

BOCPD 需要额外假设(NIG 先验、hazard rate),IMM 不需要。

保留 BOCPD 作为 P2 选项:当需要检测 β 的非线性断裂(如突然从正相关变为负相关,不仅是变化速度加快)时,BOCPD 可作为补充检测器。此时建议监控 IMM 的加权 innovation(而非 normalized innovation),以避免信号竞争。

3.4 为什么不用统一框架(Regime-Switching SSM)

Regime-Switching State Space Model (Kim 1994) 将 Kalman + 体制检测合二为一:

  • 优势:参数更少、理论更优雅
  • 劣势:Kim's approximation 有误差累积;实现复杂度高;调试困难

当前选择:IMM + 阈值体制判定。原因:

  1. IMM 已覆盖 Q 自适应 + 体制检测两个需求
  2. 实现简单(标准 IMM 教科书代码)
  3. 可独立调试每个模型的行为
  4. MS-SSM / HMM 作为 P2 演进方向保留

4. 详细设计

4.1 IMM Kalman Filter 时变 [α, β] 估计

数学模型

M 个并行状态空间模型(每个模型 j 使用不同的过程噪声 Q_β^(j)):

状态方程(所有模型共享结构,Q_β 不同):
  x_t = x_{t-1} + w_t,    w_t ~ N(0, Q^(j))

  其中 x_t = [α_t, β_t]'
       Q^(j) = [[q_α,     0    ],     q_α 所有模型共享
                 [  0,   q_β^(j)]]     q_β^(j) 各模型不同

观测方程(所有模型共享):
  r_alt_t = H_t × x_t + v_t,    v_t ~ N(0, R)

  其中 H_t = [1, r_btc_t]
       r_alt_t = log(alt_t / alt_{t-1})
       r_btc_t = log(btc_t / btc_{t-1})
       R 固定(初始化自 OLS 残差方差)

Q_β 网格设计

模型 j Q_β^(j) 物理含义 日 β 漂移 σ
0 1e-6 β 几乎不变 ~0.002
1 1e-5 β 缓慢变化 ~0.008
2 1e-4 β 正常变化(先验中心) ~0.024
3 1e-3 β 快速变化 ~0.077
4 1e-2 β 剧烈变化 / 结构性断裂 ~0.245

覆盖 4 个数量级,对数均匀分布,覆盖从 ETH/BTC 级稳定到 MEME 级波动的全部场景。

IMM 递推算法

输入: r_btc_t, r_alt_t, 上一步状态 {x^(j), P^(j), μ_j} for j=0..M-1

Step 1: 混合(Interaction)
  预测模型概率:
    μ̃_j = Σ_i π_{ij} × μ_i                           (M×1)
    其中 π_{ij} = P(模型j在t | 模型i在t-1)

  混合权重:
    w_{i|j} = π_{ij} × μ_i / μ̃_j                      (M×M)

  混合状态(为每个模型j准备初始条件):
    x̃^(j) = Σ_i w_{i|j} × x^(i)                       (2×1)
    P̃^(j) = Σ_i w_{i|j} × [P^(i) + (x^(i) - x̃^(j))(x^(i) - x̃^(j))']   (2×2)

Step 2: 并行滤波(每个模型 j 独立运行标准 Kalman)
  预测:
    P_pred^(j) = P̃^(j) + Q^(j)

  Innovation:
    ε^(j) = r_alt_t - H_t × x̃^(j)
    S^(j) = H_t × P_pred^(j) × H_t' + R

  Huber Clipping (c=3.0):
    ε̃^(j) = clip(ε^(j), -c×√S^(j), +c×√S^(j))

  Kalman 增益 + 更新:
    K^(j) = P_pred^(j) × H_t' / S^(j)
    x^(j) = x̃^(j) + K^(j) × ε̃^(j)

  Joseph 稳定形式:
    A^(j) = I - K^(j) × H_t
    P^(j) = A^(j) × P_pred^(j) × A^(j)' + R × K^(j) × K^(j)'

  似然(用原始 innovation,非截断后的):
    L_j = (2π S^(j))^{-0.5} × exp(-0.5 × ε^(j)² / S^(j))

Step 3: 模型概率更新
  μ_j = μ̃_j × L_j / Σ_k (μ̃_k × L_k)                (归一化)

Step 4: 融合输出
  β = Σ_j μ_j × β^(j)                                 (概率加权)
  α = Σ_j μ_j × α^(j)
  P_β = Σ_j μ_j × [P_β^(j) + (β^(j) - β)²]           (含模型间方差)
  regime_score = Σ_{j: q_β^(j) ≥ q_high} μ_j            (高Q模型总概率)
  effective_q_β = Σ_j μ_j × q_β^(j)                    (有效Q值)

模型转移概率矩阵 Π

Π[i,j] = { p_stay          if i == j          (对角线: 0.98)
          { (1-p_stay)/(M-1) if i != j          (非对角: 0.005)

p_stay=0.98 含义:模型在 4h 尺度上 98% 概率保持不变,2% 概率切换到其他模型。对应 β 变化速度的平均持续时间 ≈ 50 根 K线(~8天),与原 BOCPD hazard=1/50 一致。

关键输出信号及其用途

信号 用途 1: 体制检测 用途 2: Hedge Ratio
beta (Σ μ_j × β^(j)) β 趋势监控 直接作为 hedge ratio
alpha (Σ μ_j × α^(j)) 吸收独立漂移,避免污染 β
P_beta (含模型间方差) 不确定性度量(比单模型更准确) hedge ratio 置信度(P_β 过大时降级为 OLS β)
regime_score (Σ 高Q模型概率) 直接用于体制检测
model_probs (μ 向量) 诊断:哪个 Q 模型主导
effective_q_beta (Σ μ_j × q_β^(j)) 诊断:当前有效 Q_β

参数

参数 符号 默认值 含义 调优
Q_β 网格 q_β_grid [1e-6, 1e-5, 1e-4, 1e-3, 1e-2] M=5 个模型的 β 过程噪声 对数均匀,覆盖全频谱
α 过程噪声 q_α 1e-5 α 每 4h 变化方差(所有模型共享) 通常不调
观测噪声 R OLS 残差方差 固定,不在线调整 初始化
R 下限 R_floor 1e-8 防止 R 退化为零
Innovation 截断 c 3.0 Huber 截断(σ 倍数) ↓鲁棒 ↑灵敏
初始 [α, β] OLS 估计 所有模型共享初始值
初始 P P₀ diag(0.1, 1.0) 初始不确定性
模型持久性 p_stay 0.98 转移矩阵对角线 ↑模型切换更保守
高Q阈值 q_high 1e-3 Q_β ≥ 此值归入"高Q"组

总计 9 个核心参数(vs v2 的 18 个,减少 50%)。

q_α = q_β_grid[2]/10 → α 变化比中位 β 慢一个量级。
p_stay = 0.98 → 模型平均持续 50 步(~8天),与 β 的结构性变化周期匹配。

文件:src/config.py

# ═══ IMM Kalman Filter 参数(v3)═══
IMM_Q_BETA_GRID: list[float] = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
IMM_Q_ALPHA: float = 1e-5
IMM_P0_ALPHA: float = 0.1
IMM_P0_BETA: float = 1.0
IMM_R_FLOOR: float = 1e-8
IMM_CLIP_SIGMA: float = 3.0
IMM_TRANSITION_PROB: float = 0.98
IMM_HIGH_Q_THRESHOLD: float = 1e-3

# ═══ Hedge Ratio 参数 ═══
HEDGE_BETA_MIN: float = 0.1     # β 下限保护
HEDGE_BETA_MAX: float = 5.0     # β 上限保护
HEDGE_BETA_P_MAX: float = 0.5   # P_β 超过此值时降级为 OLS β

文件:src/utils/analysis/analysis_core.py

新增类_KalmanModel(单一 Kalman 模型,IMM 的内部组件)

import numpy as np


class _KalmanModel:
    """单一 Kalman Filter 模型(IMM 的内部组件)

    2D 状态 [α, β],固定 Q 和 R。
    Joseph 稳定形式 + Huber clipping。
    """

    def __init__(
        self,
        x: np.ndarray,           # [α, β] 初始状态
        P: np.ndarray,           # 2×2 初始协方差
        q_alpha: float,
        q_beta: float,
        R: float,
        r_floor: float = 1e-8,
        clip_sigma: float = 3.0,
    ):
        self.x = x.copy()
        self.P = P.copy()
        self.Q = np.diag([q_alpha, q_beta]).astype(np.float64)
        self.R = max(R, r_floor)
        self._r_floor = r_floor
        self._clip_sigma = clip_sigma

    def update(self, r_btc: float, r_alt: float) -> dict:
        """标准 Kalman predict + update,返回滤波结果和似然"""
        # 预测步
        P_pred = self.P + self.Q
        H = np.array([1.0, r_btc], dtype=np.float64)

        # Innovation
        innovation = r_alt - H @ self.x
        S = float(H @ P_pred @ H) + self.R
        S = max(S, 1e-15)

        # 似然(用原始 innovation)
        log_likelihood = -0.5 * (np.log(2 * np.pi * S) + innovation ** 2 / S)

        # Huber clipping
        sqrt_S = S ** 0.5
        norm_innov = innovation / sqrt_S
        clipped = False
        innovation_for_update = innovation
        if abs(norm_innov) > self._clip_sigma:
            innovation_for_update = self._clip_sigma * sqrt_S * (
                1.0 if innovation > 0 else -1.0
            )
            clipped = True

        # Kalman 增益 + 更新
        K = (P_pred @ H) / S
        self.x = self.x + K * innovation_for_update

        # Joseph 稳定形式
        I2 = np.eye(2)
        A = I2 - np.outer(K, H)
        self.P = A @ P_pred @ A.T + self.R * np.outer(K, K)

        return {
            'alpha': float(self.x[0]),
            'beta': float(self.x[1]),
            'P_beta': float(self.P[1, 1]),
            'P_alpha': float(self.P[0, 0]),
            'innovation': innovation,
            'normalized_innovation': norm_innov,
            'S': S,
            'log_likelihood': log_likelihood,
            'clipped': clipped,
            'kalman_gain_beta': float(K[1]),
        }

新增类IMMKalmanBetaEstimator(IMM 多模型融合估计器)

class IMMKalmanBetaEstimator:
    """Interacting Multiple Model Kalman Filter 时变 [α, β] 联合估计器

    核心思想 (Blom & Bar-Shalom, 1988):
        M 个 Kalman Filter 并行运行,每个使用不同的 Q_β,
        通过贝叶斯模型概率实时加权融合。

    相比 v2 (Innovation-based Q 自适应) 的优势:
        1. 消除 8 个启发式超参数(γ/κ/ceil/floor 等)
        2. 模型概率天然作为体制指标(无需 BOCPD,无信号竞争)
        3. β 突变时 1-2 步即切换(vs v2 需 κ_up 逐步放大)
        4. 理论上是贝叶斯最优模型选择

    双用途输出:
        - beta → hedge ratio(直接使用概率加权 β)
        - regime_score → 体制检测(高Q模型的总概率)

    每根新 4h K线调用 update() 一次。
    """

    def __init__(
        self,
        alpha_init: float,
        beta_init: float,
        q_beta_grid: list[float] | None = None,
        q_alpha: float = 1e-5,
        r_init: float = 1e-2,
        p0_alpha: float = 0.1,
        p0_beta: float = 1.0,
        r_floor: float = 1e-8,
        clip_sigma: float = 3.0,
        transition_prob: float = 0.98,
        high_q_threshold: float = 1e-3,
    ):
        if q_beta_grid is None:
            q_beta_grid = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]

        self.M = len(q_beta_grid)
        self._q_beta_grid = list(q_beta_grid)
        self._high_q_threshold = high_q_threshold
        self._n_updates = 0

        # 初始化 M 个 Kalman 模型
        x0 = np.array([alpha_init, beta_init], dtype=np.float64)
        P0 = np.diag([p0_alpha, p0_beta]).astype(np.float64)
        self._models = [
            _KalmanModel(x0, P0, q_alpha, q_b, r_init, r_floor, clip_sigma)
            for q_b in q_beta_grid
        ]

        # 模型概率(均匀初始化)
        self._mu = np.ones(self.M) / self.M

        # 转移概率矩阵
        off_diag = (1.0 - transition_prob) / (self.M - 1)
        self._TPM = np.full((self.M, self.M), off_diag)
        np.fill_diagonal(self._TPM, transition_prob)

        # 高Q模型索引(预计算)
        self._high_q_indices = [
            j for j, q in enumerate(q_beta_grid) if q >= high_q_threshold
        ]

    @property
    def beta(self) -> float:
        return float(sum(
            self._mu[j] * self._models[j].x[1] for j in range(self.M)
        ))

    @property
    def beta_variance(self) -> float:
        b = self.beta
        return float(sum(
            self._mu[j] * (self._models[j].P[1, 1] + (self._models[j].x[1] - b) ** 2)
            for j in range(self.M)
        ))

    @property
    def regime_score(self) -> float:
        return float(sum(self._mu[j] for j in self._high_q_indices))

    @property
    def effective_q_beta(self) -> float:
        return float(sum(
            self._mu[j] * self._q_beta_grid[j] for j in range(self.M)
        ))

    def update(self, r_btc: float, r_alt: float) -> dict:
        """接收一对新的对数收益率,执行 IMM 更新

        Returns:
            dict with keys: alpha, beta, P_beta, P_alpha,
                  regime_score, effective_q_beta, model_probs,
                  dominant_model_idx, innovation, normalized_innovation, clipped
        """
        self._n_updates += 1

        # ── Step 1: 混合(Interaction)──
        mu_predicted = self._TPM.T @ self._mu  # 预测模型概率 (M,)

        # 混合权重 w[i,j] = π[i,j] * μ[i] / μ̃[j]
        mix_weights = np.zeros((self.M, self.M))
        for j in range(self.M):
            if mu_predicted[j] > 1e-30:
                for i in range(self.M):
                    mix_weights[i, j] = self._TPM[i, j] * self._mu[i] / mu_predicted[j]

        # 为每个模型 j 准备混合后的初始条件
        for j in range(self.M):
            x_mixed = np.zeros(2)
            for i in range(self.M):
                x_mixed += mix_weights[i, j] * self._models[i].x

            P_mixed = np.zeros((2, 2))
            for i in range(self.M):
                dx = self._models[i].x - x_mixed
                P_mixed += mix_weights[i, j] * (
                    self._models[i].P + np.outer(dx, dx)
                )

            self._models[j].x = x_mixed
            self._models[j].P = P_mixed

        # ── Step 2: 并行滤波 ──
        results = []
        log_likelihoods = np.zeros(self.M)
        for j in range(self.M):
            r = self._models[j].update(r_btc, r_alt)
            results.append(r)
            log_likelihoods[j] = r['log_likelihood']

        # ── Step 3: 模型概率更新 ──
        # 数值稳定的 log-space 计算
        log_mu = np.log(np.maximum(mu_predicted, 1e-30)) + log_likelihoods
        log_mu -= np.max(log_mu)  # 防止溢出
        self._mu = np.exp(log_mu)
        total = np.sum(self._mu)
        if total > 0:
            self._mu /= total
        else:
            self._mu = np.ones(self.M) / self.M

        # ── Step 4: 融合输出 ──
        beta = sum(self._mu[j] * results[j]['beta'] for j in range(self.M))
        alpha = sum(self._mu[j] * results[j]['alpha'] for j in range(self.M))
        P_beta = sum(
            self._mu[j] * (results[j]['P_beta'] + (results[j]['beta'] - beta) ** 2)
            for j in range(self.M)
        )
        P_alpha = sum(
            self._mu[j] * (results[j]['P_alpha'] + (results[j]['alpha'] - alpha) ** 2)
            for j in range(self.M)
        )
        regime_score = sum(self._mu[j] for j in self._high_q_indices)
        effective_q = sum(self._mu[j] * self._q_beta_grid[j] for j in range(self.M))

        dominant = int(np.argmax(self._mu))
        dom_result = results[dominant]

        return {
            'alpha': float(alpha),
            'beta': float(beta),
            'P_beta': float(P_beta),
            'P_alpha': float(P_alpha),
            'regime_score': float(regime_score),
            'effective_q_beta': float(effective_q),
            'model_probs': self._mu.copy(),
            'dominant_model_idx': dominant,
            'innovation': dom_result['innovation'],
            'normalized_innovation': dom_result['normalized_innovation'],
            'clipped': dom_result['clipped'],
            'kalman_gain_beta': dom_result['kalman_gain_beta'],
            'R': self._models[0].R,
        }

    def state_dict(self) -> dict:
        return {
            'models': [
                {'x': m.x.tolist(), 'P': m.P.tolist(), 'Q': m.Q.tolist(), 'R': m.R}
                for m in self._models
            ],
            'mu': self._mu.tolist(),
            'q_beta_grid': self._q_beta_grid,
            'high_q_threshold': self._high_q_threshold,
            'n_updates': self._n_updates,
            'TPM': self._TPM.tolist(),
        }

    @classmethod
    def from_state_dict(cls, d: dict) -> 'IMMKalmanBetaEstimator':
        obj = cls.__new__(cls)
        obj.M = len(d['models'])
        obj._q_beta_grid = d['q_beta_grid']
        obj._high_q_threshold = d['high_q_threshold']
        obj._n_updates = d['n_updates']
        obj._mu = np.array(d['mu'], dtype=np.float64)
        obj._TPM = np.array(d['TPM'], dtype=np.float64)
        obj._models = []
        for md in d['models']:
            m = _KalmanModel.__new__(_KalmanModel)
            m.x = np.array(md['x'], dtype=np.float64)
            m.P = np.array(md['P'], dtype=np.float64)
            m.Q = np.array(md['Q'], dtype=np.float64)
            m.R = md['R']
            m._r_floor = 1e-8
            m._clip_sigma = 3.0
            obj._models.append(m)
        obj._high_q_indices = [
            j for j, q in enumerate(obj._q_beta_grid) if q >= obj._high_q_threshold
        ]
        return obj

改动函数:calculate_cointegration_params_dual_window()

在现有 OLS 之后,新增 IMM Kalman Filter 输出:

def calculate_cointegration_params_dual_window(
    base_klines, alt_klines,
    beta_window=None, zscore_window=None,
    kalman_state: dict | None = None,  # [新增] IMM 状态
):
    # ... 现有 OLS 逻辑不变 ...

    # ═══ 新增:IMM Kalman Filter 更新 ═══
    kalman_result = None
    kalman_state_out = kalman_state

    if len(aligned) >= 2:
        r_btc = np.log(aligned['base'].iloc[-1] / aligned['base'].iloc[-2])
        r_alt = np.log(aligned['alt'].iloc[-1] / aligned['alt'].iloc[-2])

        if kalman_state is not None:
            kf = IMMKalmanBetaEstimator.from_state_dict(kalman_state)
        else:
            ols_residual_var = float(np.var(model.resid)) if hasattr(model, 'resid') else 1e-2
            kf = IMMKalmanBetaEstimator(
                alpha_init=alpha_ols if use_alpha else 0.0,
                beta_init=beta_ols,
                q_beta_grid=IMM_Q_BETA_GRID,
                q_alpha=IMM_Q_ALPHA,
                r_init=max(ols_residual_var, 1e-6),
                p0_alpha=IMM_P0_ALPHA, p0_beta=IMM_P0_BETA,
                r_floor=IMM_R_FLOOR, clip_sigma=IMM_CLIP_SIGMA,
                transition_prob=IMM_TRANSITION_PROB,
                high_q_threshold=IMM_HIGH_Q_THRESHOLD,
            )

        kalman_result = kf.update(r_btc, r_alt)
        kalman_state_out = kf.state_dict()

    return {
        # ... 现有字段不变 ...
        'kalman_beta': kalman_result['beta'] if kalman_result else beta_ols,
        'kalman_alpha': kalman_result['alpha'] if kalman_result else (alpha_ols if use_alpha else 0.0),
        'kalman_P_beta': kalman_result['P_beta'] if kalman_result else 1.0,
        'kalman_regime_score': kalman_result['regime_score'] if kalman_result else 0.0,
        'kalman_effective_q': kalman_result['effective_q_beta'] if kalman_result else 1e-4,
        'kalman_model_probs': kalman_result['model_probs'].tolist() if kalman_result else None,
        'kalman_innovation': kalman_result['normalized_innovation'] if kalman_result else 0.0,
        'kalman_clipped': kalman_result['clipped'] if kalman_result else False,
        'kalman_state': kalman_state_out,
    }

改动函数:analyze_pair_advanced() / analyze_multi_period()

透传方式与 v2 相同,将 v2 的 kalman_q_beta / kalman_innov_sq_ema 替换为 kalman_regime_score / kalman_effective_q / kalman_model_probs


4.2 IMM 体制检测(用途 1)

设计原理

v2 使用 BOCPD 监控 normalized innovation 的分布变化来推断体制切换,存在与 Q 自适应的信号竞争问题。

v3 直接使用 IMM 的 regime_score(高Q模型的总概率)作为体制指标。这是一个天然的贝叶斯后验概率,不需要额外的变点检测器。

regime_score 定义:
  regime_score = Σ_{j: q_β^(j) ≥ q_high} μ_j

  = P(β 正在以 ≥ q_high 的速率变化 | 历史数据)
regime_score 含义 动作
< soft_prob (0.3) 低Q模型主导,β 稳定 正常入场
[soft_prob, hard_prob) 高Q模型概率上升,β 开始变化 阈值缩放
≥ hard_prob (0.7) 高Q模型主导,β 正在剧烈变化 硬拦截

文件:src/trading/config.py

StrategyParams 新增字段(v3 简化版,替代 v2 的 BOCPD 参数):

@dataclass(frozen=True)
class StrategyParams:
    # ... 现有字段 ...

    # Beta 体制自适应过滤器(v3: 基于 IMM 模型概率)
    beta_regime_enabled: bool = True
    beta_regime_soft_prob: float = 0.3       # regime_score > 此值 → 开始缩放阈值
    beta_regime_hard_prob: float = 0.7       # regime_score > 此值 → 硬拦截
    beta_regime_scale_max: float = 2.0       # 阈值最大缩放倍数
    beta_regime_warmup: int = 5              # 最少 N 根 4h K线才开始判定

对比 v2 参数量:v2 有 10 个 BOCPD 参数(hazard_rate, max_run_length, adaptive_hazard, hazard_min, hazard_max, hazard_innov_ref, soft_prob, hard_prob, scale_max, warmup),v3 仅需 4 个参数(soft_prob, hard_prob, scale_max, warmup)。

文件:src/trading/strategy.py

_BetaRegimeState 数据类
@dataclass
class _BetaRegimeState:
    """Beta 体制检测结果(v3: 基于 IMM 模型概率)"""
    regime: str               # 'stable' | 'expanding'
    regime_score: float       # P(高Q模型) ∈ [0, 1]
    kalman_P_beta: float
    effective_q_beta: float
    threshold_scale: float    # 阈值缩放因子 (>=1.0)
    hard_block: bool
    reason: str
_IMMRegimeDetector 类(替代 v2 的 _BOCPDDetector
class _IMMRegimeDetector:
    """基于 IMM 模型概率的 Beta 体制检测器

    直接使用 IMMKalmanBetaEstimator 输出的 regime_score
    (高Q模型总概率)判定 β 是否处于快速变化状态。

    相比 v2 的 BOCPD:
        - 无信号竞争(体制信号是 IMM 的内生输出)
        - 无额外参数(不需要 hazard rate, max_run_length 等)
        - 校准更好(regime_score 是贝叶斯后验概率)

    内存: O(1) per pair(仅存储最新状态)
    计算: O(1) per check(简单阈值比较)
    """

    def __init__(self):
        self._pair_states: dict[PairKey, _BetaRegimeState] = {}
        self._n_updates: dict[PairKey, int] = {}

    def update(
        self,
        key: PairKey,
        regime_score: float,
        kalman_P_beta: float,
        effective_q_beta: float,
        kline_time: str,
        soft_prob: float,
        hard_prob: float,
        scale_max: float,
        warmup: int,
    ) -> _BetaRegimeState:
        """根据 IMM regime_score 判定当前体制"""

        n = self._n_updates.get(key, 0) + 1
        self._n_updates[key] = n

        if n < warmup:
            state = _BetaRegimeState(
                'stable', regime_score, kalman_P_beta,
                effective_q_beta, 1.0, False, "数据不足"
            )
            self._pair_states[key] = state
            return state

        if regime_score >= hard_prob:
            state = _BetaRegimeState(
                'expanding', regime_score, kalman_P_beta,
                effective_q_beta, scale_max, True,
                f"Beta硬拦截: regime_score={regime_score:.3f}>={hard_prob} "
                f"eff_Q={effective_q_beta:.1e}"
            )
        elif regime_score >= soft_prob:
            import math
            midpoint = (soft_prob + hard_prob) / 2
            steepness = 6.0 / max(hard_prob - soft_prob, 0.01)
            steepness = min(steepness, 20.0)
            t = 1.0 / (1.0 + math.exp(-steepness * (regime_score - midpoint)))
            scale = 1.0 + t * (scale_max - 1.0)
            state = _BetaRegimeState(
                'expanding', regime_score, kalman_P_beta,
                effective_q_beta, scale, False,
                f"Beta缩放: regime_score={regime_score:.3f} scale={scale:.2f} "
                f"eff_Q={effective_q_beta:.1e}"
            )
        else:
            state = _BetaRegimeState(
                'stable', regime_score, kalman_P_beta,
                effective_q_beta, 1.0, False,
                f"Beta稳定: regime_score={regime_score:.3f} "
                f"eff_Q={effective_q_beta:.1e}"
            )

        self._pair_states[key] = state
        return state

    def get_all_states(self) -> dict[PairKey, _BetaRegimeState]:
        return self._pair_states

    def cleanup_pair(self, key: PairKey) -> None:
        self._pair_states.pop(key, None)
        self._n_updates.pop(key, None)

4.3 跨配对系统性风险聚合(v3 双阈值独立版)

设计原理

v2 使用 weighted_cp >= systemic_threshold × 0.8 将加权概率阈值耦合到计数比例阈值。v3 将两个阈值完全独立设定,消除魔术数字。

文件:src/trading/config.py

@dataclass(frozen=True)
class StrategyParams:
    # ... 现有字段 ...

    # 系统性风险参数(v3: 独立阈值)
    systemic_risk_enabled: bool = True
    systemic_ratio_threshold: float = 0.3      # expanding 配对比例阈值
    systemic_weighted_threshold: float = 0.25  # 加权 regime_score 阈值(独立设定)

文件:src/trading/strategy.py

class _SystemicRiskAggregator:
    """统计所有配对的 IMM 体制状态,
    基于双独立阈值判断系统性风险。

    v3 改进:
        - ratio_threshold 和 weighted_threshold 完全独立(消除 0.8 魔术数字)
        - 加权聚合支持仓位权重
    """

    def __init__(self, enabled: bool = True):
        self._enabled = enabled

    def check(
        self,
        pair_states: dict[PairKey, _BetaRegimeState],
        ratio_threshold: float = 0.3,
        weighted_threshold: float = 0.25,
        position_weights: dict[PairKey, float] | None = None,
    ) -> tuple[bool, str]:
        if not self._enabled or not pair_states:
            return False, ""

        total = len(pair_states)
        n_expanding = sum(1 for s in pair_states.values() if s.regime == 'expanding')
        ratio = n_expanding / total

        # 加权平均 regime_score
        if position_weights:
            total_weight = sum(position_weights.get(k, 1.0) for k in pair_states)
            weighted_score = sum(
                position_weights.get(k, 1.0) * s.regime_score
                for k, s in pair_states.items()
            ) / total_weight
        else:
            weighted_score = sum(
                s.regime_score for s in pair_states.values()
            ) / total

        # 双独立条件触发
        ratio_triggered = ratio >= ratio_threshold
        weighted_triggered = weighted_score >= weighted_threshold

        if ratio_triggered or weighted_triggered:
            return True, (
                f"系统性风险: {n_expanding}/{total} ({ratio:.0%}) 配对扩张态"
                f"{'(比例触发)' if ratio_triggered else ''}, "
                f"加权regime_score={weighted_score:.3f}"
                f"{'(加权触发)' if weighted_triggered else ''}"
            )
        return False, ""

4.4 Beta 加权仓位计算(用途 2,与 v2 相同)

设计原理

标准配对交易的对冲方程:

spread_t = log(P_alt_t) - β × log(P_base_t) - α

要使 spread 对 P_base 的变动无敞口,两腿名义价值须满足:

alt_notional = |β| × base_notional

hedge_beta 选择逻辑

def resolve_hedge_beta(
    kalman_beta: float | None,
    kalman_P_beta: float | None,
    ols_beta: float | None,
    p_beta_max: float = HEDGE_BETA_P_MAX,
    beta_min: float = HEDGE_BETA_MIN,
    beta_max: float = HEDGE_BETA_MAX,
) -> tuple[float, str, bool]:
    """选择用于仓位计算的 hedge_beta

    优先 Kalman β,不确定性过高时降级为 OLS β,最后兜底 β=1.0

    Returns:
        (hedge_beta, source, beta_negative)
        hedge_beta: 绝对值,用于仓位比例计算
        source: 'kalman' | 'ols' | 'default'
        beta_negative: True 表示负相关配对,策略层需翻转方向
    """
    raw_beta = None
    source = 'default'

    if kalman_beta is not None and kalman_P_beta is not None:
        if kalman_P_beta <= p_beta_max:
            raw_beta = kalman_beta
            source = 'kalman'
        else:
            logger.debug(f"Kalman P_β={kalman_P_beta:.4f} > {p_beta_max},降级为 OLS β")

    if raw_beta is None and ols_beta is not None:
        raw_beta = ols_beta
        source = 'ols'

    if raw_beta is None:
        return 1.0, 'default', False

    beta_negative = raw_beta < 0
    beta = np.clip(abs(raw_beta), beta_min, beta_max)

    if beta_negative:
        logger.warning(f"负 β 检测: raw_β={raw_beta:.4f}({source}),配对为反向相关")

    return float(beta), source, beta_negative

文件:src/trading/risk_manager.py

改动 calculate_position_size() 增加 hedge_beta 参数(与 v2 相同):

def calculate_position_size(
    self,
    signal: PairTradeSignal,
    alt_price: float,
    base_price: float = 0.0,
    available_balance: float = 0.0,
    hedge_beta: float = 1.0,
    hedge_beta_source: str = '',
) -> tuple[float, float]:
    """计算仓位大小(Beta 加权)

    Beta 加权逻辑:
        base_notional = total_position / (1 + |β|)
        alt_notional  = |β| × base_notional
        总名义价值 = base_notional + alt_notional = total_position(保持不变)

    当 β=1.0 时退化为等额(向后兼容)。
    """
    # ... 现有 base_usd / 信号强度缩放 / 上限检查 / 余额检查 逻辑不变 ...

    alt_size = 0.0
    base_size = 0.0

    if self._config.pair_mode == "pair" and base_price > 0 and alt_price > 0:
        abs_beta = max(abs(hedge_beta), 0.1)
        total_position = position_usd * 2

        base_notional = total_position / (1.0 + abs_beta)
        alt_notional = abs_beta * base_notional

        alt_size = alt_notional / alt_price
        base_size = base_notional / base_price

        logger.info(
            f"仓位计算(β加权): 基础=${base_usd:.0f} × 缩放={scale} | "
            f"β={hedge_beta:.3f}({hedge_beta_source}) | "
            f"Alt: {alt_size:.2f} × ${alt_price:.4f} ≈ ${alt_notional:.0f} | "
            f"Base: {base_size:.2f} × ${base_price:.4f} ≈ ${base_notional:.0f} | "
            f"比例 {alt_notional/base_notional:.2f}:1"
        )
    elif alt_price > 0:
        alt_size = position_usd / alt_price

    return alt_size, base_size

文件:src/trading/models.py

与 v2 相同:

@dataclass
class PairTradeSignal:
    # ... 现有字段 ...
    hedge_beta: float = 1.0
    hedge_beta_source: str = 'default'
    beta_negative: bool = False

@dataclass
class PairPosition:
    # ... 现有字段 ...
    entry_hedge_beta: float = 1.0

4.5 编排层集成

文件:src/trading/orchestrator.py

新增状态_kalman_states: dict[PairKey, dict]

class TradingOrchestrator:
    def __init__(self, ...):
        # ... 现有 ...
        self._kalman_states: dict[PairKey, dict] = {}

process_analysis() 改动:从 multi_period_result 提取 IMM 输出,传给 strategy.process_tick()on_entry_signal()

def process_analysis(self, symbol, z4h, multi_period_result, timestamp, ...):
    # ... 现有输入验证 ...

    # [v3] 提取 IMM Kalman 输出
    kalman_beta = multi_period_result.get('kalman_beta')
    kalman_P_beta = multi_period_result.get('kalman_P_beta')
    kalman_regime_score = multi_period_result.get('kalman_regime_score')
    kalman_effective_q = multi_period_result.get('kalman_effective_q')

    # [v3] 更新 Kalman 状态缓存
    kalman_state_new = multi_period_result.get('kalman_state')
    if kalman_state_new is not None:
        self._kalman_states[(symbol, base_symbol)] = kalman_state_new

    entry_signal, exit_signal = self._strategy.process_tick(
        symbol, base_symbol, z4h, timestamp,
        kline_time=kline_time, latest_price=price_for_log,
        kalman_regime_score=kalman_regime_score,   # [v3] 体制检测
        kalman_P_beta=kalman_P_beta,               # [v3] 体制检测
        kalman_effective_q=kalman_effective_q,      # [v3] 诊断
        alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv,
    )

    if entry_signal is not None:
        ols_beta = multi_period_result.get('details', {}).get(
            ('4h', '60d'), {}).get('cointegration_new', {}).get('beta')
        hedge_beta, hedge_source, beta_negative = resolve_hedge_beta(
            kalman_beta, kalman_P_beta, ols_beta
        )

        if beta_negative:
            logger.warning(
                f"负 β 配对 {symbol}|{base_symbol}: β={kalman_beta:.3f}, "
                f"当前方向逻辑可能需要翻转,建议人工复核"
            )

        self.on_entry_signal(
            symbol, multi_period_result,
            latest_alt_price=price_for_log,
            direction=entry_signal.direction,
            adaptive_z=entry_signal.adaptive_z,
            hedge_beta=hedge_beta,
            hedge_beta_source=hedge_source,
            beta_negative=beta_negative,
        )

on_entry_signal() 改动与 v2 相同(传递 hedge_beta 到信号)。


4.6 策略层集成(用途 1)

文件:src/trading/strategy.py

__init__

self._regime_detector = _IMMRegimeDetector()  # v3: 替代 v2 的 _BOCPDDetector
self._systemic_aggregator = _SystemicRiskAggregator(enabled=True)

process_tick() 签名

def process_tick(
    self, symbol, base_symbol, z4h, timestamp,
    kline_time=None, latest_price=None,
    kalman_regime_score: float | None = None,     # [v3] 体制检测
    kalman_P_beta: float | None = None,           # [v3] 体制检测
    kalman_effective_q: float | None = None,      # [v3] 诊断
    alt_ohlcv=None, base_ohlcv=None,
) -> tuple[EntrySignal | None, ExitSignal | None]:

_process_tick_unlocked 新 K 线更新块

if is_new_candle:
    # ... 现有: Welford 更新, EMA 更新 ...
    if kalman_regime_score is not None and kline_time is not None:
        # v3: 直接用 IMM regime_score 判定体制(无需 BOCPD update)
        pass  # 体制判定在 _check_entry() 中按需执行

_check_entry() — z4h 过滤之后、方向判断之前

# ── [v3] Beta 体制检查(基于 IMM 模型概率)──
if params.beta_regime_enabled and kalman_regime_score is not None:
    beta_state = self._regime_detector.update(
        key,
        regime_score=kalman_regime_score,
        kalman_P_beta=kalman_P_beta or 0.0,
        effective_q_beta=kalman_effective_q or 1e-4,
        kline_time=str(kline_time),
        soft_prob=params.beta_regime_soft_prob,
        hard_prob=params.beta_regime_hard_prob,
        scale_max=params.beta_regime_scale_max,
        warmup=params.beta_regime_warmup,
    )

    if beta_state.hard_block:
        logger.info(f"Beta体制硬拦截 | {pair_label} | {beta_state.reason} | "
                    f"az={adaptive_z:+.4f} z4h={z4h:+.4f}")
        return None

    all_states = self._regime_detector.get_all_states()
    is_systemic, systemic_reason = self._systemic_aggregator.check(
        all_states,
        ratio_threshold=params.systemic_ratio_threshold,
        weighted_threshold=params.systemic_weighted_threshold,
    )
    if is_systemic:
        logger.info(f"系统性风险拦截 | {pair_label} | {systemic_reason}")
        return None

    threshold_scale = beta_state.threshold_scale
else:
    beta_state = None
    threshold_scale = 1.0

# ── 方向判断(应用 Beta 缩放)──
threshold = params.adaptive_threshold * threshold_scale
if adaptive_z < -threshold:
    direction = 'long'
elif adaptive_z > threshold:
    direction = 'short'
else:
    if threshold_scale > 1.01:
        logger.info(f"Beta体制缩放拦截 | {pair_label} | "
                    f"az={adaptive_z:+.4f} 有效阈值={threshold:.2f} "
                    f"(原始={params.adaptive_threshold} ×{threshold_scale:.2f}) | "
                    f"{beta_state.reason if beta_state else ''}")
    return None

cleanup_pair()

self._regime_detector.cleanup_pair(key)

5. 完整数据流

1. WebSocket 4h K线闭合
   ↓
2. realtime_kline_service_base 触发分析
   ↓
3. analyze_multi_period()
   ├─ calculate_cointegration_params_dual_window(kalman_state=缓存)
   │   ├─ OLS 回归 → β_ols, spread, adf_pvalue(现有)
   │   └─ [v3] IMM Kalman update:
   │       ├─ M=5 并行 Kalman 滤波 + 贝叶斯模型概率更新
   │       └─ 输出: beta, P_beta, regime_score, effective_q, model_probs
   ├─ 健康监控 → Gate2(现有)
   └─ 输出: multi_period_result(含 kalman_* 字段)
   ↓
4. orchestrator.process_analysis()
   ├─ 缓存 kalman_state → self._kalman_states[pair_key]
   ├─ 传递 kalman_regime_score, kalman_P_beta → strategy.process_tick()
   │   ├─ [用途 1] _IMMRegimeDetector.update(regime_score) → 体制判定
   │   │   └─ 硬拦截 / 阈值缩放(直接阈值比较,无 BOCPD)
   │   └─ [用途 1] _SystemicRiskAggregator.check() → 系统性拦截
   ├─ 若产生 EntrySignal:
   │   ├─ [用途 2] resolve_hedge_beta(kalman_beta, P_β, ols_beta)
   │   └─ on_entry_signal(hedge_beta=...) → PairTradeSignal
   ↓
5. position_manager.open_position(signal)
   ├─ risk_manager.calculate_position_size(hedge_beta=signal.hedge_beta)
   │   ├─ base_notional = total / (1 + |β|)
   │   └─ alt_notional  = |β| × base_notional
   ├─ executor.market_open(alt_size, base_size)
   └─ PairPosition(entry_hedge_beta=β)
   ↓
6. 持久化
   ├─ pair_positions: 含 entry_hedge_beta
   └─ trading_signals: 含 hedge_beta, hedge_beta_source, beta_negative

6. 场景模拟

A:正常市场(β 稳定) — 两个用途均正常

低Q模型(1e-6, 1e-5) 主导: μ = [0.35, 0.40, 0.20, 0.04, 0.01]
kalman_beta ≈ 0.45, P_β < 0.01

用途 1: regime_score = μ[3]+μ[4] = 0.05 < soft_prob(0.3)
        → regime=STABLE, scale=1.0 → 正常入场
用途 2: hedge_beta=0.45(kalman) → alt_notional = 0.45 × base_notional
        比等额更少的 Alt 暴露,正确反映 β<1 的弱相关性

B:β 开始飙升 — IMM 模型概率瞬时切换

T=0h:   kalman_beta ≈ 0.5, μ = [0.35, 0.40, 0.20, 0.04, 0.01]
        regime_score = 0.05 → 正常

T=4h:   β 突变 → 高Q模型 likelihood 飙升
        μ = [0.05, 0.10, 0.25, 0.40, 0.20]
        regime_score = 0.60 > soft_prob(0.3) → EXPANDING, scale≈1.3
        effective_q = 2.3e-3(自动切换到快速追踪)

        [对比 v2: 此时 Q_β 才刚开始 ×1.05 逐步放大, BOCPD 的信号还被 Q 调整削弱]

T=8h:   β 继续变化 → 最高Q模型(1e-2)主导
        μ = [0.01, 0.02, 0.07, 0.30, 0.60]
        regime_score = 0.90 > hard_prob(0.7) → 硬拦截

        [对比 v2: BOCPD P(变点)可能仍 < hard_prob(0.7), 因为信号被 Q 自适应部分吸收]

v3 vs v2 响应速度对比

  • v2:Q 自适应需 ~10 步才能放大 Q_β 到合理水平(受 κ_up=1.05 限制);BOCPD 信号被削弱后需更多步
  • v3:IMM 模型概率在 1-2 步内 完成切换(贝叶斯后验即时更新),regime_score 同步升高

C:β 飙升后企稳 — 模型概率自动回归

飙升期 regime_score = 0.90 → 硬拦截
企稳后:
  高Q模型 likelihood 下降,低Q模型 likelihood 上升
  μ 逐步回到 [0.30, 0.35, 0.25, 0.08, 0.02]

→ ~5-8 根正常 K线(20-32h)后 regime_score < soft_prob → 恢复

恢复后:
  用途 1: regime=STABLE → 允许入场
  用途 2: kalman_beta 已追踪到新 β(通过概率加权自动跟上)

D:β≈0.3 的弱相关配对 — hedge ratio 纠偏最显著

与 v2 相同:

等额开仓:  Alt $100 + Base $100 → Alt 非系统性风险暴露 $70(多余)
β 加权:    Alt $46 + Base $154 → 正确对冲

E:Kalman 冷启动 — 降级策略

系统重启,kalman_state 丢失:
  T=0:  用 OLS β 初始化所有 M=5 个模型,P₀ = diag(0.1, 1.0)
        P_β = 1.0 > HEDGE_BETA_P_MAX(0.5) → 降级为 OLS β
        μ = [1/5, 1/5, 1/5, 1/5, 1/5](均匀)

  T=3-5 根 K线(12-20h):
        模型概率分化,P_β 收敛 → 切回 Kalman β
        [注: IMM 收敛速度与单一 Kalman 相当,因为所有模型共享初始状态]

F:系统性风险

20 配对中 8 个 regime_score > 0.3 → 40% > ratio_threshold=30%
加权 regime_score = 0.35 > weighted_threshold=0.25 → 也触发
→ 系统性风险拦截 → 所有配对暂停入场
→ IMM 仍在后台更新,恢复后使用最新 β

G:MEME vs L1 配对自适应对比

MEME 配对 (PURR/HYPE):
  β 波动大 → 高Q模型(1e-3, 1e-2) 常获高概率
  effective_q ≈ 5e-4 → 追踪快
  regime_score ≈ 0.3-0.5 → 经常处于缩放区间(提高入场门槛)

L1 配对 (ETH/BTC):
  β 极稳定 → 低Q模型(1e-6, 1e-5) 主导
  effective_q ≈ 5e-6 → 不追噪声
  regime_score ≈ 0.02-0.05 → 几乎总是 stable(正常入场)

7. 改动文件清单

文件 改动 影响范围
src/config.py +8 IMM 常量 + 3 Hedge Ratio 常量(删除 v2 的 15 个 Kalman+Q 自适应常量) 配置层
src/utils/analysis/analysis_core.py +_KalmanModel 类 + IMMKalmanBetaEstimator 类(替代 v2 VectorKalmanBetaEstimator);增强 calculate_cointegration_params_dual_window(), analyze_pair_advanced(), analyze_multi_period() 分析层
src/trading/config.py StrategyParams +4 体制检测字段 + 2 系统性风险字段(替代 v2 的 10 个 BOCPD 字段 + 3 自适应 H 字段);增强 get_strategy_params(), _build_strategy_params(), load_trading_config() 配置层
src/trading/models.py PairTradeSignal +3 字段;PairPosition +1 字段(与 v2 相同) 数据模型
src/trading/orchestrator.py +_kalman_states 字典;+resolve_hedge_beta() 函数;增强 process_analysis(), on_entry_signal() 编排层
src/trading/strategy.py +_BetaRegimeState, +_IMMRegimeDetector(替代 v2 _BOCPDDetector), +_SystemicRiskAggregator(v3 独立阈值);增强 process_tick(), _check_entry(), cleanup_pair() 策略层
src/trading/risk_manager.py 增强 calculate_position_size() 支持 β 加权(与 v2 相同) 风控层
src/trading/position_manager.py 增强 _open_position_inner() 传递 hedge_beta(与 v2 相同) 仓位管理

新增依赖:无新增(numpy 已有,删除 v2 对 scipy.special.gammaln 的需求)

不改动momentum_filter.py, executor.py


8. DB 改动

与 v2 相同:

pair_positions 表新增列

ALTER TABLE pair_positions ADD COLUMN entry_hedge_beta DOUBLE PRECISION DEFAULT 1.0;

trading_signals 表新增列

ALTER TABLE trading_signals ADD COLUMN hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta_source VARCHAR(10) DEFAULT 'default';
ALTER TABLE trading_signals ADD COLUMN beta_negative BOOLEAN DEFAULT FALSE;

9. 日志

时机 级别 格式
初始化 INFO Beta体制跟踪器初始化 | IMM M=5 Q_grid=[1e-6..1e-2] p_stay=0.98 soft=0.30 hard=0.70
IMM 更新 DEBUG IMM更新 | PURR|HYPE | β̂=1.20 P_β=0.005 regime=0.65 eff_Q=5.2e-4 μ=[.01,.05,.25,.40,.29]
hedge_beta 选择 DEBUG hedge_beta=0.45(kalman) P_β=0.003 | OLS_β=0.48 | negative=False
hedge_beta 降级 DEBUG Kalman P_β=0.62 > 0.50,降级为 OLS β=0.48
负 β 警告 WARNING 负β检测: PURR|HYPE raw_β=-0.30(kalman),配对为反向相关
β 加权仓位 INFO 仓位计算(β加权) | β=0.45(kalman) | Alt ≈$62 | Base ≈$138 | 比例 0.45:1
硬拦截 INFO Beta体制硬拦截 | PURR|HYPE | regime_score=0.850>=0.70 eff_Q=3.2e-3
缩放拦截 INFO Beta体制缩放拦截 | PURR|HYPE | az=-4.50 有效阈值=4.00 (3.0×1.33)
系统性风险 INFO 系统性风险: 8/20 (40%) 配对扩张态(比例触发), 加权regime_score=0.350(加权触发)

10. 风险与边界条件

风险 严重度 缓解
已持仓 hedge ratio 偏离 本次仅在入场时计算 β 比例;持仓期间 rebalance 作为 P0 后续
β 符号翻转(kalman_beta < 0) resolve_hedge_beta 返回 beta_negative 标记 + 警告日志;方向翻转逻辑作为 P1 后续
IMM 模型概率塌缩(单模型概率→1.0) 转移矩阵保证至少 2% 概率分配给其他模型;数学上不可能完全塌缩
重启丢失 Kalman 状态 OLS 重新初始化 M 个模型,3-5 根 K线收敛;P_β 过大时自动降级为 OLS β
IMM 计算量(5× 单 Kalman) 2×2 状态 ×5 模型 ≈ 100 次浮点运算,4h 频率下完全可忽略
Q_β 网格覆盖不足 5 个值覆盖 4 个数量级(1e-6 到 1e-2),已包含绝大部分实际场景
低波动期 r_btc≈0 H=[1,0] 只更新 α,β 保持不变(所有模型相同行为)
β 加权导致极端仓位 HEDGE_BETA_MIN=0.1, HEDGE_BETA_MAX=5.0 上下限保护
固定 R 不自适应 IMM 的 Q 模型选择吸收了大部分 R 变化效应;VB-AKF 作为 P1 后续

不在本次范围:退场逻辑调整、持仓期间 hedge rebalance、HMM 体制分类、Kalman 持久化到 DB、飞书告警、负 β 方向翻转逻辑


11. 验证方案

单元测试

# test_imm_kalman.py
import numpy as np


def test_imm_convergence():
    """从初始值收敛到真实 [α, β]"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        r_btc = rng.normal(0, 0.02)
        r_alt = 0.001 + 1.0 * r_btc + rng.normal(0, 0.01)
        result = kf.update(r_btc, r_alt)
    assert abs(result['beta'] - 1.0) < 0.15
    assert result['alpha'] > 0


def test_imm_regime_score_stable():
    """β 稳定时 regime_score 低"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(50):
        r_btc = rng.normal(0, 0.02)
        r_alt = 0.5 * r_btc + rng.normal(0, 0.01)
        result = kf.update(r_btc, r_alt)
    assert result['regime_score'] < 0.3, f"Stable β should have low regime_score: {result['regime_score']}"


def test_imm_regime_score_spike():
    """β 突变时 regime_score 立即飙升"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    # 稳定期
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    # β 突变
    for _ in range(5):
        r_btc = rng.normal(0, 0.02)
        result = kf.update(r_btc, 3.0 * r_btc + rng.normal(0, 0.01))
    assert result['regime_score'] > 0.3, f"β jump should increase regime_score: {result['regime_score']}"


def test_imm_regime_score_recovery():
    """β 企稳后 regime_score 回落"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    # 稳定期
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    # β 突变
    for _ in range(10):
        r_btc = rng.normal(0, 0.02)
        kf.update(r_btc, 3.0 * r_btc + rng.normal(0, 0.01))
    # 在新 β 上企稳
    for _ in range(30):
        r_btc = rng.normal(0, 0.02)
        result = kf.update(r_btc, 3.0 * r_btc + rng.normal(0, 0.01))
    assert result['regime_score'] < 0.3, f"After stabilization, regime_score should decrease: {result['regime_score']}"


def test_imm_effective_q_adaptation():
    """MEME 配对应有更高的 effective_q,L1 配对应有更低的"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    rng = np.random.default_rng(42)

    # L1 配对:β 稳定
    kf_l1 = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    for _ in range(100):
        r_btc = rng.normal(0, 0.02)
        kf_l1.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.005))
    q_l1 = kf_l1.effective_q_beta

    # MEME 配对:β 波动大
    kf_meme = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    beta_val = 0.5
    for i in range(100):
        if i % 10 == 0:
            beta_val += rng.normal(0, 0.5)  # β 每 10 步大幅变化
        r_btc = rng.normal(0, 0.02)
        kf_meme.update(r_btc, beta_val * r_btc + rng.normal(0, 0.01))
    q_meme = kf_meme.effective_q_beta

    assert q_meme > q_l1, f"MEME should have higher effective_q: {q_meme} vs {q_l1}"


def test_imm_model_probs_sum_to_one():
    """模型概率总和始终为 1"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        r_btc = rng.normal(0, 0.02)
        r_alt = rng.normal(0, 0.03)
        result = kf.update(r_btc, r_alt)
        assert abs(sum(result['model_probs']) - 1.0) < 1e-10


def test_imm_huber_clipping():
    """极端 innovation 被截断,β 不过度跳变"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, clip_sigma=3.0)
    rng = np.random.default_rng(42)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    result = kf.update(0.02, 20.0 * 0.02)
    assert result['clipped'] is True
    assert result['beta'] < 5.0


def test_imm_joseph_positive():
    """Joseph 形式保证 P 始终非负"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-6)
    rng = np.random.default_rng(42)
    for _ in range(500):
        result = kf.update(rng.normal(0, 0.05), 0.5 * rng.normal(0, 0.05) + rng.normal(0, 0.001))
        assert result['P_beta'] >= 0 and result['P_alpha'] >= 0


def test_imm_alpha_absorbs_drift():
    """α 吸收独立漂移,β 不被污染"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        result = kf.update(rng.normal(0, 0.02), 0.005 + 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    assert abs(result['beta'] - 0.5) < 0.2
    assert result['alpha'] > 0.001


def test_imm_state_persistence():
    """状态导出/恢复后行为一致"""
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf1 = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    kf1.update(0.02, 0.01)
    kf2 = IMMKalmanBetaEstimator.from_state_dict(kf1.state_dict())
    r1, r2 = kf1.update(0.03, 0.015), kf2.update(0.03, 0.015)
    assert abs(r1['beta'] - r2['beta']) < 1e-10


def test_imm_no_signal_competition():
    """v3 核心验证: regime_score 不会被 β 追踪吸收

    构造 β 渐变场景,验证 regime_score 在 β 变化期间持续升高
    (而非像 v2 的 BOCPD 那样被 Q 自适应削弱)
    """
    from src.utils.analysis.analysis_core import IMMKalmanBetaEstimator
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)

    # 稳定期
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))

    # β 渐变期(每步 β 增加 0.1)
    regime_scores_during_change = []
    beta_val = 0.5
    for _ in range(20):
        beta_val += 0.1
        r_btc = rng.normal(0, 0.02)
        result = kf.update(r_btc, beta_val * r_btc + rng.normal(0, 0.01))
        regime_scores_during_change.append(result['regime_score'])

    # 关键断言: regime_score 在 β 变化期间应持续升高(而非被吸收后降低)
    max_score = max(regime_scores_during_change)
    assert max_score > 0.5, (
        f"During β change, regime_score should be high (got max={max_score}). "
        f"If this fails, there may be signal competition."
    )
# test_hedge_beta.py(与 v2 相同)
import numpy as np


def test_resolve_hedge_beta_kalman():
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source, negative = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.01, ols_beta=0.5)
    assert source == 'kalman'
    assert abs(beta - 0.45) < 0.01
    assert negative is False


def test_resolve_hedge_beta_fallback_ols():
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source, negative = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.8, ols_beta=0.5)
    assert source == 'ols'
    assert abs(beta - 0.5) < 0.01


def test_resolve_hedge_beta_fallback_default():
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source, negative = resolve_hedge_beta(kalman_beta=None, kalman_P_beta=None, ols_beta=None)
    assert source == 'default'
    assert beta == 1.0
    assert negative is False


def test_resolve_hedge_beta_clipping():
    from src.trading.orchestrator import resolve_hedge_beta
    beta, _, _ = resolve_hedge_beta(kalman_beta=10.0, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 5.0
    beta, _, _ = resolve_hedge_beta(kalman_beta=0.01, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 0.1


def test_resolve_hedge_beta_negative():
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source, negative = resolve_hedge_beta(kalman_beta=-0.8, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 0.8
    assert source == 'kalman'
    assert negative is True


def test_position_size_beta_weighted():
    abs_beta = 0.5
    total = 200.0
    base_notional = total / (1.0 + abs_beta)
    alt_notional = abs_beta * base_notional
    assert abs(base_notional - 133.33) < 0.5
    assert abs(alt_notional - 66.67) < 0.5
    assert abs(base_notional + alt_notional - total) < 0.01


def test_position_size_beta_one():
    abs_beta = 1.0
    total = 200.0
    base_notional = total / (1.0 + abs_beta)
    alt_notional = abs_beta * base_notional
    assert abs(base_notional - 100.0) < 0.01
    assert abs(alt_notional - 100.0) < 0.01
# test_regime_detector.py
import numpy as np


def test_regime_detector_stable():
    from src.trading.strategy import _IMMRegimeDetector
    d = _IMMRegimeDetector()
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    for i in range(10):
        s = d.update(key, regime_score=0.05, kalman_P_beta=0.01,
                     effective_q_beta=1e-5, kline_time=f"2024-01-01T{i*4:02d}:00:00",
                     soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'stable' and s.regime_score < 0.3


def test_regime_detector_expanding():
    from src.trading.strategy import _IMMRegimeDetector
    d = _IMMRegimeDetector()
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    for i in range(10):
        s = d.update(key, regime_score=0.50, kalman_P_beta=0.05,
                     effective_q_beta=5e-4, kline_time=f"2024-01-01T{i*4:02d}:00:00",
                     soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'expanding' and not s.hard_block
    assert s.threshold_scale > 1.0


def test_regime_detector_hard_block():
    from src.trading.strategy import _IMMRegimeDetector
    d = _IMMRegimeDetector()
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    for i in range(10):
        s = d.update(key, regime_score=0.85, kalman_P_beta=0.1,
                     effective_q_beta=3e-3, kline_time=f"2024-01-01T{i*4:02d}:00:00",
                     soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.hard_block and s.regime_score > 0.7


def test_regime_detector_warmup():
    from src.trading.strategy import _IMMRegimeDetector
    d = _IMMRegimeDetector()
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    # warmup 期间即使 regime_score 高也返回 stable
    for i in range(3):
        s = d.update(key, regime_score=0.90, kalman_P_beta=0.1,
                     effective_q_beta=5e-3, kline_time=f"2024-01-01T{i*4:02d}:00:00",
                     soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'stable' and s.reason == "数据不足"


# test_systemic_risk.py
def test_systemic_risk_ratio_trigger():
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 4 else 'stable', 0.5 if i < 4 else 0.1,
        0.01, 5e-4 if i < 4 else 1e-5, 1.5 if i < 4 else 1.0, False, ""
    ) for i in range(10)}
    triggered, reason = agg.check(states, ratio_threshold=0.3, weighted_threshold=0.25)
    assert triggered  # 4/10 = 40% > 30%


def test_systemic_risk_weighted_trigger():
    """加权阈值独立于比例阈值"""
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    # 只有 2/10 expanding(20% < ratio 30%),但加权 score 高
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 2 else 'stable', 0.9 if i < 2 else 0.05,
        0.01, 5e-3 if i < 2 else 1e-5, 1.5 if i < 2 else 1.0, False, ""
    ) for i in range(10)}
    weights = {(f"ALT{i}", "BTC"): (10.0 if i < 2 else 1.0) for i in range(10)}
    triggered, _ = agg.check(states, ratio_threshold=0.3, weighted_threshold=0.25,
                             position_weights=weights)
    assert triggered  # 加权 score > 0.25


def test_systemic_risk_no_trigger():
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 2 else 'stable', 0.4 if i < 2 else 0.05,
        0.01, 3e-4 if i < 2 else 1e-5, 1.5 if i < 2 else 1.0, False, ""
    ) for i in range(10)}
    assert not agg.check(states, ratio_threshold=0.3, weighted_threshold=0.25)[0]

集成验证

  1. 回测:在 β 飙升的历史区间回测,验证 regime_score 的检测延迟;对比等额 vs β 加权的 PnL 差异;对比 v2(BOCPD + Q 自适应)vs v3(IMM)的检测延迟和假阳性率
  2. 实盘观察:监控 kalman_beta vs OLS β 的偏离度、hedge_beta_source 分布、P_β 走势、model_probs 分布(不同配对应呈现不同模式)effective_q_beta 差异(MEME >> L1)
  3. A/B 对比(可选):同时运行等额和 β 加权,对比 hedge 效果和回撤

12. 后续演进

优先级 方向 预期收益 依赖
P0 退场保护(β 飙升时收紧止损) 减少已持仓亏损 本方案
P0 持仓期间 hedge rebalance(β 偏离阈值时调整两腿比例) 维持对冲质量 本方案
P0 Kalman 状态持久化到 DB 消除冷启动窗口 本方案
P1 负 β 方向翻转逻辑 正确处理反向相关配对 本方案 beta_negative
P1 VB-AKF R 自适应 在线校准观测噪声(替代当前固定 R) 本方案
P1 跨配对层次化估计(同赛道共享 Q 超先验) 加速冷启动,提高数据稀疏时的估计精度 本方案
P1 均值回归状态转移 x_t = Φx_{t-1} + (I-Φ)μ + w_t P_β 有稳态解,长期更稳定 本方案
P2 BOCPD 补充检测器(监控 IMM 加权 innovation) 检测非线性断裂(如 β 正→负) 本方案
P2 HMM 体制分类(多体制缩放) 更精细的体制控制 本方案
P2 MS-SSM 统一框架(Regime-Switching SSM) 理论最优 替换本方案架构
P3 飞书告警(体制切换 + hedge_beta 变化) 人工监控 本方案
P3 多频率融合(4h + daily) 更鲁棒的 β 估计 本方案

Read more

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG

对于空间环境、“信息/逻辑”(比如代码、结构、表达)秩序追求的心理特征分析

一、为什么是“空间 + 信息”同时强化? 因为你当年面对的是“双重失控”: 1️⃣ 外部世界是脏乱 + 失序的 * 空间被污染 * 行为无边界 * 基本生活秩序崩塌 👉 所以你现在会强烈要求: * 桌面干净 * 房间有序 * 物品可控 这是在修复:“物理世界必须是可控的” 2️⃣ 人的行为和逻辑也是混乱的 * 没有规则 * 没有底线 * 没有理性 👉 所以你现在会特别在意: * 表达是否清晰 * 逻辑是否自洽 * 结构是否优雅 * 代码是否干净 这是在修复:“认知世界必须是合理的” 二、你其实构建了一个“高纯度系统” 你现在的偏好,本质上是: 👉 低噪音 + 高结构 + 强控制感 具体表现就是: * 空间:极简、整洁、可预测 * 信息:清晰、压缩、无冗余 这类人有一个很明显的优势: 👉 处理复杂问题时,

By SHI XIAOLONG