IMM Kalman Beta Regime-Adaptive Design (v4.0)

1. Background

The current system has the following structural defects in beta estimation:

Component              Problem                                                  Impact
analysis_core.py       OLS with fixed BETA_WINDOW=100                           β estimate lags by ≈17 days
risk_manager.py        Equal notional on both legs, β unused                    Wrong hedge ratio
strategy.py            Fixed thresholds, no regime detection                    Repeated bad entries while β spikes
Gaussian likelihood    Over-sensitive under crypto fat tails (kurtosis 5-20)    Model probabilities oscillate
Random-walk state eq.  P_β has no steady-state solution                         Degrades to OLS after long runs

2. Architecture Overview

An IMM (Interacting Multiple Model) Kalman Filter with OU mean reversion estimates the time-varying [α, β]; its output serves two purposes at once:

                   ┌──────────────────────────────────────────────────────┐
                   │  IMMKalmanBetaEstimator (4h)                         │
                   │  M=5 parallel models, OU state eq. (per-model Φ_β),  │
                   │  Student-t likelihood (online ν), Robbins-Monro R,   │
                   │  continuous observability weighting, online x̄ drift  │
                   └──────────────┬───────────────────────────────────────┘
                                  │
                  ┌───────────────┼───────────────┐
                  ▼                               ▼
          Use 1: regime detection          Use 2: hedge ratio
          ┌─────────────────────┐          ┌─────────────────────┐
          │ IMM model-prob      │          │ position sizing     │
          │ P(high-Q) threshold │          │ alt_notional = β ×  │
          │ → regime_score      │          │ base_notional       │
          └────────┬────────────┘          └──────────┬──────────┘
                   │                                  │
                   ▼                                  ▼
          Entry-signal filtering             Beta-weighted entries
          (hard block / threshold scaling)
                   │
                   ▼
          Cross-pair systemic-risk aggregation
          (dual thresholds + confirmation window)

Four-layer architecture

Component                                       Function
Layer 1: IMM Kalman Filter                      M=5 parallel OU Kalman filters + Student-t likelihood + online R/ν + observability weighting + Bayesian model-probability fusion
Layer 2: IMM regime detection                   P(high-Q models) as the regime indicator → entry-signal filtering
Layer 3: Cross-pair systemic-risk aggregation   Independently set dual thresholds + confirmation window → systemic-event blocking
Layer 4: Beta-weighted position sizing          kalman_beta used directly as the hedge ratio

Core design points

  • IMM multi-model fusion: M=5 Q_β values compete in parallel; Bayesian probabilities pick the best model automatically, with no heuristic thresholds
  • OU mean reversion + per-model Φ_β: each model's reversion strength is paired with its Q_β — low-Q models revert strongly (anchoring β to its mean while stable), high-Q models revert weakly (not fighting β while it moves)
  • Student-t likelihood + online ν: fat-tail robustness plus adaptive tail thickness
  • Continuous observability weighting: replaces a binary threshold; when |r_btc| is small, likelihood tempering gently weakens the model-probability update, removing boundary discontinuities
  • Online x̄ drift: the OU long-run mean slowly tracks persistent migration of β, so that after long runs the reversion force does not turn into a drag

3. Algorithm Design

3.1 Mathematical model

M parallel state-space models; each model j uses a distinct (Q_β^(j), Φ_β^(j)) pair:

State equation (OU process, per-model Φ_β):
  x_t = Φ^(j) × x_{t-1} + (I - Φ^(j)) × x̄_t + w_t,    w_t ~ N(0, Q^(j))

  x_t = [α_t, β_t]'
  x̄_t = [ᾱ_t, β̄_t]'          long-run mean, drifting slowly online
  Φ^(j) = diag(Φ_α, Φ_β^(j))  per-model diagonal reversion matrix
  Q^(j) = diag(q_α, q_β^(j))  q_α shared, q_β^(j) model-specific

Observation equation (shared by all models):
  r_alt_t = H_t × x_t + v_t,    v_t ~ N(0, R_t)
  H_t = [1, r_btc_t]
  R_t updated online via a Robbins-Monro EMA

Paired Q_β / Φ_β grid

Model j  Q_β^(j)  Φ_β^(j)  Physical meaning                       P_∞_β
0        1e-6     0.950    β nearly constant, strong reversion    1.0e-5
1        1e-5     0.970    β drifts slowly                        1.7e-4
2        1e-4     0.980    β changes normally                     2.5e-3
3        1e-3     0.990    β changes fast, weak reversion         5.0e-2
4        1e-2     0.995    structural break, near free drift      1.0

Why per-model Φ_β: high-Q models let β depart quickly from its mean (Φ close to 1), while low-Q models anchor β near x̄ (smaller Φ). P_∞ spans five orders of magnitude, giving sharper separation between models.
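
The P_∞_β column follows from the stationary variance of a scalar AR(1)/discretized OU process, P_∞ = q_β / (1 − Φ_β²). A minimal sketch (helper name is illustrative, not part of the implementation) reproducing the table:

```python
q_grid = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
phi_grid = [0.95, 0.97, 0.98, 0.99, 0.995]

def stationary_var(q, phi):
    # AR(1) stationary variance: P = phi^2 * P + q  =>  P = q / (1 - phi^2)
    return q / (1.0 - phi * phi)

p_inf = [stationary_var(q, p) for q, p in zip(q_grid, phi_grid)]
# spans ~5 orders of magnitude, matching the table (1.0e-5 ... 1.0)
```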

3.2 IMM recursion

Inputs: r_btc_t, r_alt_t, and the previous state {x^(j), P^(j), μ_j}

Step 1: observability weight
  btc_var_ema = (1-γ_obs) × btc_var_ema + γ_obs × r_btc²
  obs_weight = min(r_btc² / max(btc_var_ema, 1e-12), 1.0)

  Meaning: obs_weight ∈ [0,1]; ≈1 when |r_btc| is at its typical level, →0 when far below it
  Role: tempers the likelihood in Step 4, so model probabilities barely update when r_btc≈0
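
Step 1 is two lines of arithmetic; a standalone sketch of the weight alone (function name is illustrative):

```python
def obs_weight(r_btc, btc_var_ema):
    # ratio of the squared return to its EMA, capped at 1.0
    return min(r_btc ** 2 / max(btc_var_ema, 1e-12), 1.0)
```

With btc_var_ema = 4e-4 (a 2% typical 4h move), a 2% bar gives weight 1.0, a 1% bar gives 0.25, and a flat bar gives 0.0, which is exactly the tempering exponent Step 4 needs.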

Step 2: Interaction (mixing)
  ⚠️ Copies of all models' original states must be saved before mixing

  Predicted model probabilities:  μ̃_j = Σ_i π_{ij} × μ_i
  Mixing weights:                 w_{i|j} = π_{ij} × μ_i / μ̃_j

  # save the original states
  orig_x = [model_i.x.copy() for i]
  orig_P = [model_i.P.copy() for i]

  # build the mixed initial condition for each model j (from the original copies)
  x̃^(j) = Σ_i w_{i|j} × orig_x[i]
  P̃^(j) = Σ_i w_{i|j} × [orig_P[i] + (orig_x[i] - x̃^(j))(orig_x[i] - x̃^(j))']

Step 3: Parallel filtering (each model j runs its own OU Kalman step)
  OU prediction:
    x_pred^(j) = Φ^(j) × x̃^(j) + (I - Φ^(j)) × x̄
    P_pred^(j) = Φ^(j) × P̃^(j) × Φ^(j)' + Q^(j)

  Innovation:
    ε^(j) = r_alt_t - H_t × x_pred^(j)
    S^(j) = H_t × P_pred^(j) × H_t' + R

  Student-t likelihood (on the raw innovation):
    L_j = t_ν(ε^(j); 0, S^(j))

  Huber clipping (protects the state update, does not affect the likelihood):
    ε̃^(j) = clip(ε^(j), ±c×√S^(j))

  Kalman update (Joseph stabilized form):
    K^(j) = P_pred^(j) × H_t' / S^(j)
    x^(j) = x_pred^(j) + K^(j) × ε̃^(j)
    A = I - K^(j) × H_t
    P^(j) = A × P_pred^(j) × A' + R × K^(j) × K^(j)'

Step 4: Model-probability update (with observability tempering)
  Likelihood tempering: log_L_tempered_j = obs_weight × log_L_j
    obs_weight=1 → ordinary Bayesian update
    obs_weight=0 → L_tempered=1 (no difference across models) → μ stays at μ̃

  log_μ_j = log(μ̃_j) + log_L_tempered_j
  μ_j = softmax(log_μ)
  μ_j = max(μ_j, μ_floor), then renormalize

Step 5: Online R update (Robbins-Monro EMA, no truncation bias)
  weighted_innov_sq = Σ_j μ̃_j × ε^(j)²       (predicted probabilities, avoiding a circular dependency)
  weighted_HP_H = Σ_j μ̃_j × H P_pred^(j) H'
  R_update = weighted_innov_sq - weighted_HP_H   (may go negative; no max(.,0))
  R = (1-γ_R) × R + γ_R × R_update
  R = max(R, R_floor)
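
The point of Step 5 is that per-step targets may be negative (when H P H' exceeds the squared innovation) and are averaged in as-is; only the running R is floored, so no truncation bias accumulates. A minimal sketch (helper name is illustrative):

```python
def update_R(R, innov_sq, hph, gamma=0.02, r_floor=1e-8):
    # EMA toward E[eps^2] - H P H'; the per-step target may be negative,
    # only the running estimate is floored
    target = innov_sq - hph
    return max((1.0 - gamma) * R + gamma * target, r_floor)
```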

Step 6: Fused output
  β = Σ_j μ_j × β^(j)
  P_β = Σ_j μ_j × [P_β^(j) + (β^(j) - β)²]
  regime_score = Σ_{j: q_β^(j) ≥ q_high} μ_j

Step 7: Online ν adaptation
  weighted_norm_innov = Σ_j μ_j × (ε^(j) / √S^(j))
  kurt_ema = (1-γ_ν) × kurt_ema + γ_ν × weighted_norm_innov⁴
  excess_kurt = kurt_ema - 3.0
  if excess_kurt > 0.2:
    ν = clip(6/excess_kurt + 4, 3, 30)
    propagate to all M models

  Derivation: the excess kurtosis of Student-t(ν) is 6/(ν-4); solving for ν gives ν = 6/κ + 4
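
The moment-matching inversion in Step 7 is a one-liner; a sketch with an illustrative function name:

```python
def nu_from_excess_kurtosis(kappa, lo=3.0, hi=30.0):
    # moment matching: Student-t(nu) excess kurtosis = 6/(nu-4)  =>  nu = 6/kappa + 4
    return min(max(6.0 / kappa + 4.0, lo), hi)
```

Heavy tails (large κ) push ν down toward 3; light tails (κ → 0) push ν to the near-Gaussian cap of 30.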

Step 8: Online x̄ drift
  x̄[α] = (1-γ_bar) × x̄[α] + γ_bar × α_fused
  x̄[β] = (1-γ_bar) × x̄[β] + γ_bar × β_fused
  propagate to all M models

  γ_bar=0.005 → effective window ~200 steps ≈ 33 days, far slower than β tracking (~50 steps)
  This ensures x̄ drifts only under persistent migration of β, not short-term noise

Model transition probability matrix Π (banded TPM)

Π[i,j] = { p_stay                             if i == j
          { (1-p_stay) × exp(-|i-j|) / Z_i    if i ≠ j

Jumps to adjacent models are more likely than jumps to distant ones, matching the physics of gradually changing β dynamics.
p_stay=0.98 → a model persists for ~50 steps (~8 days) on average.
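
The banded TPM above can be sketched directly (mirroring the construction later used in §4.2; the function name is illustrative):

```python
import numpy as np

def banded_tpm(M=5, p_stay=0.98):
    # off-diagonal mass (1 - p_stay) decays as exp(-|i-j|), normalized per row
    tpm = np.zeros((M, M))
    for i in range(M):
        w = np.array([np.exp(-abs(i - j)) if i != j else 0.0 for j in range(M)])
        tpm[i] = w / w.sum() * (1.0 - p_stay)
        tpm[i, i] = p_stay
    return tpm

tpm = banded_tpm()
# rows sum to 1; expected dwell time is 1/(1 - p_stay) = 50 steps (~8.3 days at 4h)
```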

3.3 Parameters

Parameter    Default                            Meaning                               Notes
Q_β grid     [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]     β process noise for the M=5 models    log-uniform, spans 4 orders of magnitude
Φ_β grid     [0.95, 0.97, 0.98, 0.99, 0.995]    per-model OU reversion strength       strong reversion at low Q, weak at high Q
q_α          1e-5                               α process noise (shared)              rarely tuned
Φ_α          0.995                              α reversion coefficient (shared)      characteristic time ~33 days
R_floor      1e-8                               positive lower bound on R
γ_R          0.02                               R learning rate                       effective window ~50 steps
clip_sigma   3.0                                Huber clipping bound
ν_init       per-pair                           initial Student-t dof                 inferred from OLS residual kurtosis
γ_ν          0.01                               online ν learning rate                effective window ~100 steps
p_stay       0.98                               TPM diagonal                          model persists ~8 days
q_high       1e-3                               high-Q threshold
μ_floor      1e-6                               model-probability floor               prevents underflow
γ_obs        0.05                               BTC variance EMA decay                observability weight
γ_bar        0.005                              x̄ drift learning rate                 effective window ~33 days
P₀           diag(0.1, 1.0)                     initial uncertainty

Consistency constraints among parameters

  • q_α = Q_β_grid[2]/10 → α moves an order of magnitude slower than the median β
  • Φ_β_grid is paired with Q_β_grid → high-Q models revert weakly
  • 1/γ_bar ≫ 1/(1 - Φ_β_grid[2]) → x̄ migrates far slower than β tracking (~200 vs ~50 steps)
  • the effective window of γ_R matches the characteristic time of the median-β model

3.4 Regime detection

regime_score = Σ_{j: q_β^(j) ≥ q_high} μ_j serves directly as the regime indicator:

regime_score              Meaning                              Action
< soft_prob (0.3)         low-Q models dominate, β stable      enter normally
[soft_prob, hard_prob)    β starting to move                   threshold scaling (linear interpolation up to scale_max)
≥ hard_prob (0.7)         β moving violently                   hard block

Observability protection: when obs_weight ≈ 0 the model probabilities barely update, so regime_score naturally stays stable.
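
The three-band mapping above reduces to a small pure function; a sketch with illustrative names (the production version lives in `_IMMRegimeDetector`, §4.4):

```python
def threshold_scale(regime_score, soft_prob=0.3, hard_prob=0.7, scale_max=2.0):
    # returns (entry-threshold scale, hard_block flag)
    if regime_score >= hard_prob:
        return scale_max, True                      # hard block
    if regime_score >= soft_prob:
        t = (regime_score - soft_prob) / (hard_prob - soft_prob)
        return 1.0 + t * (scale_max - 1.0), False   # linear interpolation
    return 1.0, False                               # normal entry
```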

3.5 Cross-pair systemic-risk aggregation

Two independent thresholds plus a confirmation window:

Condition 1: share of expanding pairs ≥ ratio_threshold (0.3)
Condition 2: weighted regime_score ≥ weighted_threshold (0.25)

Either condition firing → increment the confirmation counter
Fires on confirm_bars (2) consecutive checks → global block
Any recovery in between → counter resets to zero

3.6 Beta-weighted position sizing

hedge_beta selection: Kalman β (when P_β ≤ P_max) → OLS β (fallback) → 1.0 (last resort).

Sizing formula: base_notional = total / (1 + |β|), alt_notional = |β| × base_notional. With β=1.0 this degenerates to equal notionals.
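
A worked sketch of the sizing formula (illustrative helper name; the production version is `calculate_position_size`, §4.7):

```python
def beta_weighted_notional(total, beta):
    # base_notional = total / (1 + |beta|); alt_notional = |beta| * base_notional
    b = abs(beta)
    base = total / (1.0 + b)
    return base, b * base
```

For total = 1000 and β = 1.5 this allocates 400 to the base leg and 600 to the alt leg; with β = 1.0 it is the old 500/500 equal split.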


4. Implementation

4.1 src/config.py

# ═══ IMM Kalman Filter parameters (v4.0) ═══
IMM_Q_BETA_GRID: list[float] = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
IMM_PHI_BETA_GRID: list[float] = [0.95, 0.97, 0.98, 0.99, 0.995]
IMM_Q_ALPHA: float = 1e-5
IMM_PHI_ALPHA: float = 0.995
IMM_P0_ALPHA: float = 0.1
IMM_P0_BETA: float = 1.0
IMM_R_FLOOR: float = 1e-8
IMM_R_GAMMA: float = 0.02
IMM_CLIP_SIGMA: float = 3.0
IMM_STUDENT_T_NU: float = 5.0       # default ν (overridden at per-pair initialization)
IMM_NU_GAMMA: float = 0.01          # online ν learning rate
IMM_TRANSITION_PROB: float = 0.98
IMM_HIGH_Q_THRESHOLD: float = 1e-3
IMM_MU_FLOOR: float = 1e-6
IMM_OBS_GAMMA: float = 0.05         # BTC variance EMA decay (observability)
IMM_XBAR_GAMMA: float = 0.005       # x̄ online-drift learning rate

# ═══ Hedge ratio parameters ═══
HEDGE_BETA_MIN: float = 0.1
HEDGE_BETA_MAX: float = 5.0
HEDGE_BETA_P_MAX: float = 0.5

4.2 src/utils/analysis/analysis_core.py — core classes

import numpy as np
from scipy.special import gammaln
from scipy.stats import kurtosis as sp_kurtosis


def _estimate_student_t_nu(residuals: np.ndarray, default: float = 5.0) -> float:
    """Estimate the Student-t dof ν from the excess kurtosis of OLS residuals.

    Moment matching: the excess kurtosis of Student-t(ν) is 6/(ν-4).
    Solving for ν: ν = 6/κ + 4.
    """
    if len(residuals) < 20:
        return default
    kurt = float(sp_kurtosis(residuals, fisher=True))  # excess kurtosis
    if kurt <= 0.2:
        return 30.0  # light tails → near-Gaussian
    nu = 6.0 / kurt + 4.0
    return float(np.clip(nu, 3.0, 30.0))


class _KalmanModel:
    """Single Kalman filter model (internal IMM component).

    2D state [α, β], OU mean reversion + fixed Q + shared R.
    Student-t likelihood + Joseph stabilized form + Huber clipping.
    """

    def __init__(
        self,
        x: np.ndarray,
        P: np.ndarray,
        q_alpha: float,
        q_beta: float,
        R: float,
        x_bar: np.ndarray,
        phi: np.ndarray,       # [Φ_α, Φ_β^(j)] — per-model
        r_floor: float = 1e-8,
        clip_sigma: float = 3.0,
        nu: float = 5.0,
    ):
        self.x = x.copy()
        self.P = P.copy()
        self.Q = np.diag([q_alpha, q_beta]).astype(np.float64)
        self.R = max(R, r_floor)
        self.x_bar = x_bar.copy()
        self._phi = np.diag(phi).astype(np.float64)
        self._I_phi = np.eye(2) - self._phi
        self._r_floor = r_floor
        self._clip_sigma = clip_sigma
        self.nu = nu
        self._update_t_const()

    def _update_t_const(self):
        self._log_t_const = (
            gammaln((self.nu + 1) / 2) - gammaln(self.nu / 2)
            - 0.5 * np.log(self.nu * np.pi)
        )

    def predict(self):
        self.x = self._phi @ self.x + self._I_phi @ self.x_bar
        self.P = self._phi @ self.P @ self._phi.T + self.Q

    def update(self, r_btc: float, r_alt: float) -> dict:
        H = np.array([1.0, r_btc], dtype=np.float64)

        innovation = r_alt - H @ self.x
        HP_H = float(H @ self.P @ H)
        S = HP_H + self.R
        S = max(S, 1e-15)

        # Student-t likelihood (raw innovation)
        log_likelihood = (
            self._log_t_const - 0.5 * np.log(S)
            - ((self.nu + 1) / 2) * np.log(1 + innovation ** 2 / (self.nu * S))
        )

        # Huber clipping (protects the state update)
        sqrt_S = S ** 0.5
        norm_innov = innovation / sqrt_S
        clipped = False
        innov_update = innovation
        if abs(norm_innov) > self._clip_sigma:
            innov_update = self._clip_sigma * sqrt_S * (1.0 if innovation > 0 else -1.0)
            clipped = True

        K = (self.P @ H) / S
        self.x = self.x + K * innov_update

        # Joseph stabilized form
        A = np.eye(2) - np.outer(K, H)
        self.P = A @ self.P @ A.T + self.R * np.outer(K, K)

        return {
            'alpha': float(self.x[0]),
            'beta': float(self.x[1]),
            'P_beta': float(self.P[1, 1]),
            'P_alpha': float(self.P[0, 0]),
            'innovation': innovation,
            'normalized_innovation': norm_innov,
            'S': S,
            'HP_H': HP_H,
            'log_likelihood': log_likelihood,
            'clipped': clipped,
            'kalman_gain_beta': float(K[1]),
        }


class IMMKalmanBetaEstimator:
    """Interacting Multiple Model Kalman filter for joint time-varying [α, β] estimation.

    Core idea (Blom & Bar-Shalom, 1988):
        M Kalman filters run in parallel, each with a distinct (Q_β, Φ_β) pair,
        fused in real time via Bayesian model probabilities.

    Dual-purpose output:
        - beta → hedge ratio
        - regime_score → regime detection (total probability of the high-Q models)

    Call update() once per closed 4h bar.
    """

    def __init__(
        self,
        alpha_init: float,
        beta_init: float,
        q_beta_grid: list[float] | None = None,
        phi_beta_grid: list[float] | None = None,
        q_alpha: float = 1e-5,
        phi_alpha: float = 0.995,
        r_init: float = 1e-2,
        p0_alpha: float = 0.1,
        p0_beta: float = 1.0,
        r_floor: float = 1e-8,
        r_gamma: float = 0.02,
        clip_sigma: float = 3.0,
        nu: float = 5.0,
        nu_gamma: float = 0.01,
        transition_prob: float = 0.98,
        high_q_threshold: float = 1e-3,
        mu_floor: float = 1e-6,
        obs_gamma: float = 0.05,
        xbar_gamma: float = 0.005,
        btc_var_init: float = 4e-4,
    ):
        if q_beta_grid is None:
            q_beta_grid = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
        if phi_beta_grid is None:
            phi_beta_grid = [0.95, 0.97, 0.98, 0.99, 0.995]
        assert len(q_beta_grid) == len(phi_beta_grid), "Q_β and Φ_β grids must have equal length"

        self.M = len(q_beta_grid)
        self._q_beta_grid = list(q_beta_grid)
        self._phi_beta_grid = list(phi_beta_grid)
        self._high_q_threshold = high_q_threshold
        self._r_gamma = r_gamma
        self._r_floor = r_floor
        self._clip_sigma = clip_sigma
        self._mu_floor = mu_floor
        self._n_updates = 0

        # online ν adaptation
        self._nu = nu
        self._nu_gamma = nu_gamma
        # kurt_ema init: back out E[z⁴] = 3 + 6/(ν-4) from ν
        self._kurt_ema = 3.0 + 6.0 / max(nu - 4.0, 0.5)

        # continuous observability
        self._obs_gamma = obs_gamma
        self._btc_var_ema = btc_var_init

        # online x̄ drift
        self._xbar_gamma = xbar_gamma
        x_bar = np.array([alpha_init, beta_init], dtype=np.float64)
        self._x_bar = x_bar.copy()

        # initialize the M Kalman models (per-model Φ_β)
        x0 = np.array([alpha_init, beta_init], dtype=np.float64)
        P0 = np.diag([p0_alpha, p0_beta]).astype(np.float64)
        self._models = [
            _KalmanModel(
                x0, P0, q_alpha, q_beta_grid[j], r_init, x_bar,
                np.array([phi_alpha, phi_beta_grid[j]]),
                r_floor, clip_sigma, nu,
            )
            for j in range(self.M)
        ]

        # model probabilities (uniform init)
        self._mu = np.ones(self.M) / self.M

        # banded transition probability matrix
        self._TPM = np.zeros((self.M, self.M))
        for i in range(self.M):
            weights = np.array([
                np.exp(-abs(i - j)) if i != j else 0.0
                for j in range(self.M)
            ])
            w_sum = weights.sum()
            if w_sum > 0:
                self._TPM[i] = weights / w_sum * (1.0 - transition_prob)
            self._TPM[i, i] = transition_prob

        # indices of the high-Q models
        self._high_q_indices = [
            j for j, q in enumerate(q_beta_grid) if q >= high_q_threshold
        ]

    @property
    def beta(self) -> float:
        return float(sum(self._mu[j] * self._models[j].x[1] for j in range(self.M)))

    @property
    def beta_variance(self) -> float:
        b = self.beta
        return float(sum(
            self._mu[j] * (self._models[j].P[1, 1] + (self._models[j].x[1] - b) ** 2)
            for j in range(self.M)
        ))

    @property
    def regime_score(self) -> float:
        return float(sum(self._mu[j] for j in self._high_q_indices))

    @property
    def effective_q_beta(self) -> float:
        return float(sum(self._mu[j] * self._q_beta_grid[j] for j in range(self.M)))

    def update(self, r_btc: float, r_alt: float) -> dict:
        """接收一对新的对数收益率,执行完整 IMM 更新"""
        self._n_updates += 1

        # ── Step 1: observability weight (continuous) ──
        self._btc_var_ema = ((1 - self._obs_gamma) * self._btc_var_ema
                             + self._obs_gamma * r_btc ** 2)
        obs_weight = min(r_btc ** 2 / max(self._btc_var_ema, 1e-12), 1.0)

        # ── Step 2: Interaction (mixing) ──
        mu_predicted = self._TPM.T @ self._mu

        mix_weights = np.zeros((self.M, self.M))
        for j in range(self.M):
            if mu_predicted[j] > 1e-30:
                for i in range(self.M):
                    mix_weights[i, j] = self._TPM[i, j] * self._mu[i] / mu_predicted[j]

        # ⚠️ save copies of the original states so mixing never reads overwritten values
        orig_x = [m.x.copy() for m in self._models]
        orig_P = [m.P.copy() for m in self._models]

        for j in range(self.M):
            x_mixed = np.zeros(2)
            for i in range(self.M):
                x_mixed += mix_weights[i, j] * orig_x[i]

            P_mixed = np.zeros((2, 2))
            for i in range(self.M):
                dx = orig_x[i] - x_mixed
                P_mixed += mix_weights[i, j] * (orig_P[i] + np.outer(dx, dx))

            self._models[j].x = x_mixed
            self._models[j].P = P_mixed

        # ── Step 3: parallel filtering (OU prediction + measurement update) ──
        results = []
        log_likelihoods = np.zeros(self.M)
        for j in range(self.M):
            self._models[j].predict()
            r = self._models[j].update(r_btc, r_alt)
            results.append(r)
            log_likelihoods[j] = r['log_likelihood']

        # ── Step 4: model-probability update (likelihood tempering + probability floor) ──
        log_L_tempered = obs_weight * log_likelihoods
        log_mu = np.log(np.maximum(mu_predicted, 1e-30)) + log_L_tempered
        log_mu -= np.max(log_mu)
        self._mu = np.exp(log_mu)
        total = np.sum(self._mu)
        if total > 0:
            self._mu /= total
        else:
            self._mu = np.ones(self.M) / self.M

        self._mu = np.maximum(self._mu, self._mu_floor)
        self._mu /= self._mu.sum()

        # ── Step 5: online R update (Robbins-Monro, no truncation) ──
        if self._r_gamma > 0:
            weighted_innov_sq = sum(
                mu_predicted[j] * results[j]['innovation'] ** 2 for j in range(self.M)
            )
            weighted_HP_H = sum(
                mu_predicted[j] * results[j]['HP_H'] for j in range(self.M)
            )
            r_update = weighted_innov_sq - weighted_HP_H
            new_R = (1 - self._r_gamma) * self._models[0].R + self._r_gamma * r_update
            new_R = max(new_R, self._r_floor)
            for m in self._models:
                m.R = new_R

        # ── Step 6: fused output ──
        beta = sum(self._mu[j] * results[j]['beta'] for j in range(self.M))
        alpha = sum(self._mu[j] * results[j]['alpha'] for j in range(self.M))
        P_beta = sum(
            self._mu[j] * (results[j]['P_beta'] + (results[j]['beta'] - beta) ** 2)
            for j in range(self.M)
        )
        P_alpha = sum(
            self._mu[j] * (results[j]['P_alpha'] + (results[j]['alpha'] - alpha) ** 2)
            for j in range(self.M)
        )
        regime_score = sum(self._mu[j] for j in self._high_q_indices)
        effective_q = sum(self._mu[j] * self._q_beta_grid[j] for j in range(self.M))
        dominant = int(np.argmax(self._mu))

        weighted_innovation = sum(self._mu[j] * results[j]['innovation'] for j in range(self.M))
        weighted_norm_innov = sum(
            self._mu[j] * results[j]['normalized_innovation'] for j in range(self.M)
        )
        any_clipped = any(results[j]['clipped'] for j in range(self.M) if self._mu[j] > 0.1)

        # ── Step 7: online ν adaptation ──
        if self._nu_gamma > 0 and obs_weight > 0.3:
            self._kurt_ema = ((1 - self._nu_gamma) * self._kurt_ema
                              + self._nu_gamma * weighted_norm_innov ** 4)
            excess_kurt = self._kurt_ema - 3.0
            if excess_kurt > 0.2:
                new_nu = float(np.clip(6.0 / excess_kurt + 4.0, 3.0, 30.0))
                self._nu = new_nu
                for m in self._models:
                    m.nu = new_nu
                    m._update_t_const()

        # ── Step 8: online x̄ drift ──
        if self._xbar_gamma > 0:
            self._x_bar[0] = (1 - self._xbar_gamma) * self._x_bar[0] + self._xbar_gamma * alpha
            self._x_bar[1] = (1 - self._xbar_gamma) * self._x_bar[1] + self._xbar_gamma * beta
            for m in self._models:
                m.x_bar = self._x_bar.copy()

        return {
            'alpha': float(alpha),
            'beta': float(beta),
            'P_beta': float(P_beta),
            'P_alpha': float(P_alpha),
            'regime_score': float(regime_score),
            'effective_q_beta': float(effective_q),
            'model_probs': self._mu.copy(),
            'dominant_model_idx': dominant,
            'innovation': float(weighted_innovation),
            'normalized_innovation': float(weighted_norm_innov),
            'clipped': any_clipped,
            'obs_weight': float(obs_weight),
            'kalman_gain_beta': results[dominant]['kalman_gain_beta'],
            'R': self._models[0].R,
            'nu': self._nu,
        }

    def state_dict(self) -> dict:
        return {
            'models': [
                {
                    'x': m.x.tolist(), 'P': m.P.tolist(),
                    'Q': m.Q.tolist(), 'R': m.R,
                    'phi': m._phi.tolist(),
                }
                for m in self._models
            ],
            'mu': self._mu.tolist(),
            'q_beta_grid': self._q_beta_grid,
            'phi_beta_grid': self._phi_beta_grid,
            'high_q_threshold': self._high_q_threshold,
            'n_updates': self._n_updates,
            'TPM': self._TPM.tolist(),
            'r_gamma': self._r_gamma,
            'r_floor': self._r_floor,
            'nu': self._nu,
            'nu_gamma': self._nu_gamma,
            'kurt_ema': self._kurt_ema,
            'clip_sigma': self._clip_sigma,
            'x_bar': self._x_bar.tolist(),
            'xbar_gamma': self._xbar_gamma,
            'obs_gamma': self._obs_gamma,
            'btc_var_ema': self._btc_var_ema,
            'mu_floor': self._mu_floor,
        }

    @classmethod
    def from_state_dict(cls, d: dict) -> 'IMMKalmanBetaEstimator':
        obj = cls.__new__(cls)
        obj.M = len(d['models'])
        obj._q_beta_grid = d['q_beta_grid']
        obj._phi_beta_grid = d.get('phi_beta_grid', [0.95, 0.97, 0.98, 0.99, 0.995])
        obj._high_q_threshold = d['high_q_threshold']
        obj._n_updates = d['n_updates']
        obj._mu = np.array(d['mu'], dtype=np.float64)
        obj._TPM = np.array(d['TPM'], dtype=np.float64)
        obj._r_gamma = d.get('r_gamma', 0.02)
        obj._r_floor = d.get('r_floor', 1e-8)
        obj._nu = d.get('nu', 5.0)
        obj._nu_gamma = d.get('nu_gamma', 0.01)
        obj._kurt_ema = d.get('kurt_ema', 9.0)
        obj._clip_sigma = d.get('clip_sigma', 3.0)
        obj._x_bar = np.array(d.get('x_bar', [0.0, 1.0]), dtype=np.float64)
        obj._xbar_gamma = d.get('xbar_gamma', 0.005)
        obj._obs_gamma = d.get('obs_gamma', 0.05)
        obj._btc_var_ema = d.get('btc_var_ema', 4e-4)
        obj._mu_floor = d.get('mu_floor', 1e-6)

        obj._models = []
        for md in d['models']:
            m = _KalmanModel.__new__(_KalmanModel)
            m.x = np.array(md['x'], dtype=np.float64)
            m.P = np.array(md['P'], dtype=np.float64)
            m.Q = np.array(md['Q'], dtype=np.float64)
            m.R = md['R']
            m.x_bar = obj._x_bar.copy()
            m._phi = np.array(md['phi'], dtype=np.float64)
            m._I_phi = np.eye(2) - m._phi
            m._r_floor = obj._r_floor
            m._clip_sigma = obj._clip_sigma
            m.nu = obj._nu
            m._update_t_const()
            obj._models.append(m)

        obj._high_q_indices = [
            j for j, q in enumerate(obj._q_beta_grid) if q >= obj._high_q_threshold
        ]
        return obj

4.3 src/utils/analysis/analysis_core.py — integration point

calculate_cointegration_params_dual_window() adds the IMM update after the existing OLS step:

def calculate_cointegration_params_dual_window(
    base_klines, alt_klines,
    beta_window=None, zscore_window=None,
    kalman_state: dict | None = None,
):
    # ... existing OLS logic unchanged ...

    # ═══ IMM Kalman Filter update ═══
    kalman_result = None
    kalman_state_out = kalman_state

    if len(aligned) >= 2:
        r_btc = np.log(aligned['base'].iloc[-1] / aligned['base'].iloc[-2])
        r_alt = np.log(aligned['alt'].iloc[-1] / aligned['alt'].iloc[-2])

        if kalman_state is not None:
            kf = IMMKalmanBetaEstimator.from_state_dict(kalman_state)
        else:
            ols_residual_var = float(np.var(model.resid)) if hasattr(model, 'resid') else 1e-2
            nu_init = _estimate_student_t_nu(
                model.resid if hasattr(model, 'resid') else np.array([]),
                default=IMM_STUDENT_T_NU,
            )
            kf = IMMKalmanBetaEstimator(
                alpha_init=alpha_ols if use_alpha else 0.0,
                beta_init=beta_ols,
                q_beta_grid=IMM_Q_BETA_GRID,
                phi_beta_grid=IMM_PHI_BETA_GRID,
                q_alpha=IMM_Q_ALPHA, phi_alpha=IMM_PHI_ALPHA,
                r_init=max(ols_residual_var, 1e-6),
                p0_alpha=IMM_P0_ALPHA, p0_beta=IMM_P0_BETA,
                r_floor=IMM_R_FLOOR, r_gamma=IMM_R_GAMMA,
                clip_sigma=IMM_CLIP_SIGMA,
                nu=nu_init, nu_gamma=IMM_NU_GAMMA,
                transition_prob=IMM_TRANSITION_PROB,
                high_q_threshold=IMM_HIGH_Q_THRESHOLD,
                mu_floor=IMM_MU_FLOOR,
                obs_gamma=IMM_OBS_GAMMA,
                xbar_gamma=IMM_XBAR_GAMMA,
            )

        kalman_result = kf.update(r_btc, r_alt)
        kalman_state_out = kf.state_dict()

    return {
        # ... existing fields unchanged ...
        'kalman_beta': kalman_result['beta'] if kalman_result else beta_ols,
        'kalman_alpha': kalman_result['alpha'] if kalman_result else (alpha_ols if use_alpha else 0.0),
        'kalman_P_beta': kalman_result['P_beta'] if kalman_result else 1.0,
        'kalman_regime_score': kalman_result['regime_score'] if kalman_result else 0.0,
        'kalman_effective_q': kalman_result['effective_q_beta'] if kalman_result else 1e-4,
        'kalman_model_probs': kalman_result['model_probs'].tolist() if kalman_result else None,
        'kalman_innovation': kalman_result['normalized_innovation'] if kalman_result else 0.0,
        'kalman_clipped': kalman_result['clipped'] if kalman_result else False,
        'kalman_obs_weight': kalman_result['obs_weight'] if kalman_result else 1.0,
        'kalman_nu': kalman_result['nu'] if kalman_result else 5.0,
        'kalman_state': kalman_state_out,
    }

4.4 src/trading/strategy.py — regime detection and systemic risk

@dataclass
class _BetaRegimeState:
    """Result of beta regime detection"""
    regime: str               # 'stable' | 'expanding'
    regime_score: float
    kalman_P_beta: float
    effective_q_beta: float
    threshold_scale: float    # ≥1.0
    hard_block: bool
    reason: str


class _IMMRegimeDetector:
    """Beta regime detector driven by IMM model probabilities.

    Uses regime_score (total probability of the high-Q models) directly to judge the β state.
    O(1) memory per pair, O(1) compute per check.
    """

    def __init__(self):
        self._pair_states: dict[PairKey, _BetaRegimeState] = {}
        self._n_updates: dict[PairKey, int] = {}

    def update(
        self, key: PairKey, regime_score: float,
        kalman_P_beta: float, effective_q_beta: float,
        kline_time: str,
        soft_prob: float, hard_prob: float,
        scale_max: float, warmup: int,
    ) -> _BetaRegimeState:
        n = self._n_updates.get(key, 0) + 1
        self._n_updates[key] = n

        if n < warmup:
            state = _BetaRegimeState(
                'stable', regime_score, kalman_P_beta,
                effective_q_beta, 1.0, False, "insufficient data"
            )
        elif regime_score >= hard_prob:
            state = _BetaRegimeState(
                'expanding', regime_score, kalman_P_beta,
                effective_q_beta, scale_max, True,
                f"beta hard block: regime={regime_score:.3f}>={hard_prob} eff_Q={effective_q_beta:.1e}"
            )
        elif regime_score >= soft_prob:
            t = min(max((regime_score - soft_prob) / max(hard_prob - soft_prob, 0.01), 0), 1)
            scale = 1.0 + t * (scale_max - 1.0)
            state = _BetaRegimeState(
                'expanding', regime_score, kalman_P_beta,
                effective_q_beta, scale, False,
                f"beta scaling: regime={regime_score:.3f} scale={scale:.2f}"
            )
        else:
            state = _BetaRegimeState(
                'stable', regime_score, kalman_P_beta,
                effective_q_beta, 1.0, False,
                f"beta stable: regime={regime_score:.3f}"
            )

        self._pair_states[key] = state
        return state

    def get_all_states(self) -> dict[PairKey, _BetaRegimeState]:
        return self._pair_states

    def cleanup_pair(self, key: PairKey) -> None:
        self._pair_states.pop(key, None)
        self._n_updates.pop(key, None)


class _SystemicRiskAggregator:
    """Systemic-risk aggregation with two independent thresholds and a confirmation window"""

    def __init__(self, enabled: bool = True):
        self._enabled = enabled
        self._consecutive_triggers = 0

    def check(
        self,
        pair_states: dict[PairKey, _BetaRegimeState],
        ratio_threshold: float = 0.3,
        weighted_threshold: float = 0.25,
        confirm_bars: int = 2,
        position_weights: dict[PairKey, float] | None = None,
    ) -> tuple[bool, str]:
        if not self._enabled or not pair_states:
            self._consecutive_triggers = 0
            return False, ""

        total = len(pair_states)
        n_expanding = sum(1 for s in pair_states.values() if s.regime == 'expanding')
        ratio = n_expanding / total

        if position_weights:
            total_w = sum(position_weights.get(k, 1.0) for k in pair_states)
            weighted_score = sum(
                position_weights.get(k, 1.0) * s.regime_score
                for k, s in pair_states.items()
            ) / total_w
        else:
            weighted_score = sum(s.regime_score for s in pair_states.values()) / total

        ratio_hit = ratio >= ratio_threshold
        weighted_hit = weighted_score >= weighted_threshold

        if ratio_hit or weighted_hit:
            self._consecutive_triggers += 1
            if self._consecutive_triggers >= confirm_bars:
                return True, (
                    f"systemic risk ({self._consecutive_triggers} consecutive hits): "
                    f"{n_expanding}/{total} ({ratio:.0%}) pairs expanding"
                    f"{' (ratio trigger)' if ratio_hit else ''}, "
                    f"weighted regime={weighted_score:.3f}"
                    f"{' (weighted trigger)' if weighted_hit else ''}"
                )
            return False, f"systemic risk pending confirmation ({self._consecutive_triggers}/{confirm_bars})"

        self._consecutive_triggers = 0
        return False, ""

4.5 src/trading/config.py

@dataclass(frozen=True)
class StrategyParams:
    # ... existing fields ...

    # beta regime filter
    beta_regime_enabled: bool = True
    beta_regime_soft_prob: float = 0.3
    beta_regime_hard_prob: float = 0.7
    beta_regime_scale_max: float = 2.0
    beta_regime_warmup: int = 5

    # systemic risk
    systemic_risk_enabled: bool = True
    systemic_ratio_threshold: float = 0.3
    systemic_weighted_threshold: float = 0.25
    systemic_confirm_bars: int = 2

4.6 src/trading/orchestrator.py

def resolve_hedge_beta(
    kalman_beta: float | None,
    kalman_P_beta: float | None,
    ols_beta: float | None,
    p_beta_max: float = HEDGE_BETA_P_MAX,
    beta_min: float = HEDGE_BETA_MIN,
    beta_max: float = HEDGE_BETA_MAX,
) -> tuple[float, str, bool]:
    """选择 hedge_beta: Kalman → OLS → 1.0 兜底"""
    raw_beta = None
    source = 'default'

    if kalman_beta is not None and kalman_P_beta is not None:
        if kalman_P_beta <= p_beta_max:
            raw_beta = kalman_beta
            source = 'kalman'

    if raw_beta is None and ols_beta is not None:
        raw_beta = ols_beta
        source = 'ols'

    if raw_beta is None:
        return 1.0, 'default', False

    beta_negative = raw_beta < 0
    beta = np.clip(abs(raw_beta), beta_min, beta_max)
    return float(beta), source, beta_negative

process_analysis() extracts the IMM outputs and passes them through to strategy.process_tick() and on_entry_signal().

4.7 src/trading/risk_manager.py

def calculate_position_size(self, signal, alt_price, base_price=0.0,
                            available_balance=0.0,
                            hedge_beta=1.0, hedge_beta_source=''):
    # ... existing logic ...
    alt_size, base_size = 0.0, 0.0  # base_size must be initialized: only the pair branch sets it
    if self._config.pair_mode == "pair" and base_price > 0 and alt_price > 0:
        abs_beta = max(abs(hedge_beta), 0.1)
        total_position = position_usd * 2
        base_notional = total_position / (1.0 + abs_beta)
        alt_notional = abs_beta * base_notional
        alt_size = alt_notional / alt_price
        base_size = base_notional / base_price
    elif alt_price > 0:
        alt_size = position_usd / alt_price
    return alt_size, base_size

4.8 src/trading/models.py

@dataclass
class PairTradeSignal:
    # ... existing fields ...
    hedge_beta: float = 1.0
    hedge_beta_source: str = 'default'
    beta_negative: bool = False

@dataclass
class PairPosition:
    # ... existing fields ...
    entry_hedge_beta: float = 1.0

5. Data Flow

1. WebSocket 4h bar closes
   ↓
2. realtime_kline_service_base triggers analysis
   ↓
3. analyze_multi_period()
   ├─ calculate_cointegration_params_dual_window(kalman_state=cached)
   │   ├─ OLS regression → β_ols, spread, adf_pvalue (existing)
   │   └─ IMM Kalman update:
   │       ├─ Step 1: continuous observability weight
   │       ├─ Step 2: mixing (save original states → banded-TPM weighting)
   │       ├─ Step 3: M=5 parallel OU predictions + Student-t updates
   │       ├─ Step 4: likelihood tempering + Bayesian model-probability update
   │       ├─ Step 5: Robbins-Monro online R (no truncation)
   │       ├─ Step 6: fusion → beta, P_beta, regime_score
   │       ├─ Step 7: online ν adaptation
   │       └─ Step 8: online x̄ drift
   └─ emit multi_period_result (with the kalman_* fields)
   ↓
4. orchestrator.process_analysis()
   ├─ cache kalman_state
   ├─ → strategy.process_tick()
   │   ├─ _IMMRegimeDetector.update() → hard block / threshold scaling
   │   └─ _SystemicRiskAggregator.check() → systemic block
   ├─ if an EntrySignal is produced:
   │   ├─ resolve_hedge_beta()
   │   └─ on_entry_signal(hedge_beta=...)
   ↓
5. position_manager → risk_manager.calculate_position_size(hedge_beta)
   ├─ base_notional = total / (1 + |β|)
   └─ alt_notional = |β| × base_notional

6. 改动清单

文件

| 文件 | 改动 |
| --- | --- |
| src/config.py | +17 IMM 常量(含 Φ_β_grid, γ_ν, γ_obs, γ_bar)+ 3 Hedge Ratio 常量 |
| src/utils/analysis/analysis_core.py | +_estimate_student_t_nu() + _KalmanModel + IMMKalmanBetaEstimator;增强 calculate_cointegration_params_dual_window() |
| src/trading/config.py | StrategyParams +4 体制检测字段 + 3 系统性风险字段 |
| src/trading/models.py | PairTradeSignal +3 字段;PairPosition +1 字段 |
| src/trading/orchestrator.py | +_kalman_states 字典 + resolve_hedge_beta();增强 process_analysis(), on_entry_signal() |
| src/trading/strategy.py | +_BetaRegimeState + _IMMRegimeDetector + _SystemicRiskAggregator;增强 process_tick(), _check_entry() |
| src/trading/risk_manager.py | calculate_position_size() 支持 β 加权 |

不改动:momentum_filter.py、executor.py

依赖:scipy.special.gammaln、scipy.stats.kurtosis(已有依赖)

DB

ALTER TABLE pair_positions ADD COLUMN entry_hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta_source VARCHAR(10) DEFAULT 'default';
ALTER TABLE trading_signals ADD COLUMN beta_negative BOOLEAN DEFAULT FALSE;

7. 日志

| 时机 | 级别 | 格式 |
| --- | --- | --- |
| 初始化 | INFO | IMM初始化 \| M=5 Q=[1e-6..1e-2] Φ=[0.95..0.995] ν={ν:.1f} soft=0.30 hard=0.70 |
| IMM 更新 | DEBUG | IMM \| {pair} \| β̂={β:.3f} P_β={P:.4f} regime={rs:.3f} eff_Q={q:.1e} ν={ν:.1f} obs_w={ow:.2f} R={R:.1e} |
| hedge_beta | DEBUG | hedge_beta={β:.3f}({src}) P_β={P:.4f} |
| 硬拦截 | INFO | Beta硬拦截 \| {pair} \| regime={rs:.3f}>={hard} |
| 缩放拦截 | INFO | Beta缩放拦截 \| {pair} \| az={az:.4f} 阈值={th:.2f} ({base}×{scale:.2f}) |
| 系统性待确认 | DEBUG | 系统性风险待确认({n}/{bars}) |
| 系统性拦截 | INFO | 系统性风险(连续{n}次): {expanding}/{total} 配对扩张 |
| β 加权仓位 | INFO | 仓位(β加权) β={β:.3f}({src}) Alt≈${alt:.0f} Base≈${base:.0f} |

8. 验证方案

单元测试

import numpy as np

def test_imm_convergence():
    """从初始值收敛到真实 β"""
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        r_btc = rng.normal(0, 0.02)
        result = kf.update(r_btc, 0.001 + 1.0 * r_btc + rng.normal(0, 0.01))
    assert abs(result['beta'] - 1.0) < 0.15

def test_imm_ou_steady_state():
    """P_β 收敛到稳态,不无界增长"""
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    p_betas = []
    for _ in range(500):
        r_btc = rng.normal(0, 0.02)
        result = kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.01))
        p_betas.append(result['P_beta'])
    assert np.var(p_betas[400:]) < np.var(p_betas[50:150]) * 0.5
    assert max(p_betas[100:]) < 0.5
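
该测试背后的稳态事实(无观测时的标准 Kalman 先验方差递推):

    P∞ = Φ_β² · P∞ + Q_β  ⟹  P∞ = Q_β / (1 − Φ_β²)

随机游走(Φ_β=1)时分母为 0,P_β 每步累加 Q_β、无稳态;例如 Φ_β=0.99、Q_β=1e-4 时 P∞ ≈ 5.0e-3,有界。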

def test_imm_interaction_no_corruption():
    """混合步骤不会因原地覆写污染状态"""
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    # 制造模型间差异
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), rng.normal(0, 0.03))
    # 验证混合后状态合理
    betas = [m.x[1] for m in kf._models]
    # 混合后各模型 β 应在合理范围内(不会因覆写飞掉)
    for b in betas:
        assert abs(b) < 10, f"Beta out of range after mixing: {b}"

def test_imm_regime_score_stable():
    """β 稳定时 regime_score 低"""
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(50):
        result = kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    assert result['regime_score'] < 0.3

def test_imm_regime_score_spike():
    """β 突变时 regime_score 飙升"""
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    for _ in range(5):
        r_btc = rng.normal(0, 0.02)
        result = kf.update(r_btc, 3.0 * r_btc + rng.normal(0, 0.01))
    assert result['regime_score'] > 0.3

def test_imm_no_signal_competition():
    """regime_score 不被 β 追踪吸收(无信号竞争)"""
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    for i in range(50):
        r_btc = 0.01 * (1 if i % 2 == 0 else -1)
        kf.update(r_btc, 0.5 * r_btc)
    scores = []
    beta_val = 0.5
    for i in range(20):
        beta_val += 0.1
        r_btc = 0.015 * (1 if i % 2 == 0 else -1)
        result = kf.update(r_btc, beta_val * r_btc)
        scores.append(result['regime_score'])
    assert max(scores) > 0.5

def test_imm_student_t_robustness():
    """Student-t 似然对异常值更鲁棒(vs 近高斯 ν=1000)"""
    kf_t = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, nu=5.0)
    kf_g = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, nu=1000.0)
    for i in range(50):
        r = 0.01 * (1 if i % 2 == 0 else -1)
        kf_t.update(r, 0.5 * r + 0.001)
        kf_g.update(r, 0.5 * r + 0.001)
    rs_t, rs_g = kf_t.regime_score, kf_g.regime_score
    # 注入 5σ 异常值
    result_t = kf_t.update(0.02, 0.5 * 0.02 + 0.15)
    result_g = kf_g.update(0.02, 0.5 * 0.02 + 0.15)
    assert (result_t['regime_score'] - rs_t) < (result_g['regime_score'] - rs_g)

def test_imm_online_r_no_bias():
    """无截断 R 在噪声下降时正确减小"""
    rng = np.random.default_rng(42)
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-4, r_gamma=0.05)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.05))
    R_high = kf._models[0].R
    for _ in range(80):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.005))
    assert kf._models[0].R < R_high * 0.5
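
"无截断 R" 的更新形式未在本节给出;一个常见的 Robbins-Monro 形式如下(假设性草图,函数名 rm_update_R 为示意,实际以 _KalmanModel 实现为准):

```python
def rm_update_R(R, innovation, HPHt, gamma):
    """R ← (1-γ)·R + γ·(ε² − H·P⁻·Hᵀ)
    无截断:允许 (ε² − HPH') 为负,噪声下降时 R 能真正减小(期望意义下无偏)。"""
    return (1.0 - gamma) * R + gamma * (innovation ** 2 - HPHt)

R = 1e-2
for eps in [0.001, -0.002, 0.0015]:   # 连续小新息 → R 逐步下调
    R = rm_update_R(R, eps, 5e-7, 0.05)
```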

def test_imm_online_nu_adaptation():
    """在线 ν: 重尾数据使 ν 减小"""
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2,
                                 nu=10.0, nu_gamma=0.05)
    rng = np.random.default_rng(42)
    # 正常数据
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    nu_before = kf._nu
    # 注入重尾数据
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.standard_t(3) * 0.03)
    assert kf._nu < nu_before

def test_imm_xbar_drift():
    """x̄ 在线漂移: β 持续迁移后 x̄ 跟随"""
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2,
                                 xbar_gamma=0.02)  # 加速漂移以便测试
    rng = np.random.default_rng(42)
    xbar_init = kf._x_bar[1]
    for _ in range(200):
        r_btc = rng.normal(0, 0.02)
        kf.update(r_btc, 2.0 * r_btc + rng.normal(0, 0.01))
    assert kf._x_bar[1] > xbar_init + 0.3, f"x̄ should drift toward β≈2: x̄={kf._x_bar[1]}"

def test_imm_observability_continuous():
    """r_btc≈0 时 obs_weight→0,模型概率几乎不变"""
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    mu_before = kf._mu.copy()
    result = kf.update(1e-8, rng.normal(0, 0.01))
    assert result['obs_weight'] < 0.01
    # 模型概率应几乎不变
    assert np.max(np.abs(kf._mu - mu_before)) < 0.05

def test_imm_per_model_phi():
    """Per-model Φ_β: 高Q模型弱回归,低Q模型强回归"""
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    # 验证 Φ_β 差异
    phi_0 = kf._models[0]._phi[1, 1]  # 低Q模型
    phi_4 = kf._models[4]._phi[1, 1]  # 高Q模型
    assert phi_0 < phi_4, f"低Q应有更小Φ: {phi_0} vs {phi_4}"
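
Q 网格与 Φ 网格的配对可按初始化日志(Q=[1e-6..1e-2]、Φ=[0.95..0.995])构造:低 Q 模型配小 Φ(强回归),高 Q 模型配大 Φ(弱回归)。网格的具体间隔(对数/线性)为此处假设:

```python
import numpy as np

Q_grid = np.logspace(-6, -2, 5)          # [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
phi_grid = np.linspace(0.95, 0.995, 5)   # 与 Q 单调配对:Q 越大,Φ 越接近 1(回归越弱)
```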

def test_estimate_student_t_nu():
    """ν 估计: 正态→高ν, 重尾→低ν"""
    rng = np.random.default_rng(42)
    nu_normal = _estimate_student_t_nu(rng.normal(0, 1, 200))
    nu_heavy = _estimate_student_t_nu(rng.standard_t(df=3, size=200))
    assert nu_normal > 10
    assert nu_heavy < 7
    assert nu_heavy < nu_normal

def test_imm_state_persistence():
    """状态导出/恢复后行为一致"""
    kf1 = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, nu=7.0)
    kf1.update(0.02, 0.01)
    kf2 = IMMKalmanBetaEstimator.from_state_dict(kf1.state_dict())
    r1, r2 = kf1.update(0.03, 0.015), kf2.update(0.03, 0.015)
    assert abs(r1['beta'] - r2['beta']) < 1e-10
    assert abs(r1['R'] - r2['R']) < 1e-15

def test_imm_model_probs_sum_to_one():
    """模型概率总和始终为 1"""
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        result = kf.update(rng.normal(0, 0.02), rng.normal(0, 0.03))
        assert abs(sum(result['model_probs']) - 1.0) < 1e-10

def test_imm_bandwidth_tpm():
    """带宽 TPM: 相邻 > 远端,每行和 = 1"""
    kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
    tpm = kf._TPM
    assert tpm[2, 1] > tpm[2, 0]
    assert tpm[2, 3] > tpm[2, 4]
    for i in range(kf.M):
        assert abs(tpm[i].sum() - 1.0) < 1e-10
# test_hedge_beta.py
def test_resolve_hedge_beta_kalman():
    beta, src, neg = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.01, ols_beta=0.5)
    assert src == 'kalman' and abs(beta - 0.45) < 0.01 and not neg

def test_resolve_hedge_beta_fallback_ols():
    beta, src, _ = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.8, ols_beta=0.5)
    assert src == 'ols' and abs(beta - 0.5) < 0.01

def test_resolve_hedge_beta_clipping():
    beta, _, _ = resolve_hedge_beta(kalman_beta=10.0, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 5.0

def test_resolve_hedge_beta_negative():
    beta, src, neg = resolve_hedge_beta(kalman_beta=-0.8, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 0.8 and neg is True

def test_position_size_beta_weighted():
    abs_beta = 0.5
    total = 200.0
    base = total / (1.0 + abs_beta)
    alt = abs_beta * base
    assert abs(base - 133.33) < 0.5 and abs(alt - 66.67) < 0.5
# test_regime_detector.py / test_systemic_risk.py
def test_regime_stable():
    d = _IMMRegimeDetector()
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    for i in range(10):
        s = d.update(key, 0.05, 0.01, 1e-5, f"T{i}", 0.3, 0.7, 2.0, 5)
    assert s.regime == 'stable'

def test_regime_hard_block():
    d = _IMMRegimeDetector()
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    for i in range(10):
        s = d.update(key, 0.85, 0.1, 3e-3, f"T{i}", 0.3, 0.7, 2.0, 5)
    assert s.hard_block

def test_systemic_confirmation():
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 4 else 'stable', 0.5 if i < 4 else 0.1,
        0.01, 5e-4, 1.5, False, ""
    ) for i in range(10)}
    assert not agg.check(states, confirm_bars=2)[0]  # 第1次不拦截
    assert agg.check(states, confirm_bars=2)[0]       # 第2次拦截

集成验证

  1. 回测:β 飙升历史区间,对比 regime_score 检测延迟、假阳性率、等额 vs β 加权 PnL
  2. 消融实验(按优先级):
    • OU vs 随机游走(设 Φ_β_grid=[1,1,1,1,1])
    • Student-t vs 高斯(设 ν=1000)
    • Per-model Φ vs 共享 Φ(设 Φ_β_grid=[0.98]*5)
    • 在线 ν vs 固定 ν(设 γ_ν=0)
    • 似然温和 vs 二值跳过
    • x̄ 漂移 vs 固定 x̄(设 γ_bar=0)
  3. 实盘监控:model_probs 分布、effective_q 差异(MEME >> L1)、per-pair ν 走势、obs_weight 分布、x̄ 漂移轨迹
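
前三项消融的等价性可直接由状态方程读出(离散 OU 的标准写法,具体记法为本文假设):

    β_t = Φ_β · β_{t-1} + (1 − Φ_β) · x̄_β + w_t,  w_t ~ N(0, Q_β)

Φ_β_grid=[1,1,1,1,1] 时 (1−Φ_β)=0,方程退化为 β_t = β_{t-1} + w_t(随机游走);ν=1000 时 Student-t 似然逼近高斯;γ_ν=0 与 γ_bar=0 分别冻结在线 ν 与 x̄ 漂移。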

9. 风险与边界条件

| 风险 | 严重度 | 缓解 |
| --- | --- | --- |
| 已持仓 hedge ratio 偏离 |  | 入场时计算;持仓 rebalance 作为 P0 后续 |
| β 符号翻转 |  | resolve_hedge_beta 返回 beta_negative 标记;方向翻转 P1 |
| x̄ 漂移速度不足 |  | γ_bar=0.005 保守;可按需调大 |
| IMM 概率塌缩 | 极低 | TPM 保证概率流入 + μ_floor 双重保护 |
| 重启丢失 Kalman 状态 |  | OLS 重初始化,3-5 根 K 线收敛;OU 保证 P_β 有稳态 |
| β 加权导致极端仓位 |  | clip([0.1, 5.0]) |

不在本次范围:退场逻辑调整、持仓 rebalance、Kalman 持久化到 DB、飞书告警、负 β 方向翻转


10. 后续演进

| 优先级 | 方向 | 预期收益 |
| --- | --- | --- |
| P0 | 退场保护(β 飙升时收紧止损) | 减少已持仓亏损 |
| P0 | 持仓 hedge rebalance | 维持对冲质量 |
| P0 | Kalman 状态持久化到 DB | 消除冷启动 |
| P1 | VB-AKF 联合 Q+R 后验估计(Sarkka 2009) | 消除 Q 网格 + Robbins-Monro;连续 Q 后验 |
| P1 | 负 β 方向翻转逻辑 | 正确处理反向相关 |
| P1 | 跨配对层次贝叶斯(同赛道共享 Q 超先验) | 加速冷启动 |
| P2 | 扩展状态 [α, β, β̇](位置-速度模型) | 检测 β 加速/减速,提前 1-2 根 K 线响应 |
| P2 | GARCH-inspired R 建模 | 捕获波动率聚集效应 |
| P2 | HMM / MS-SSM 统一体制框架 | 多体制分类 |
