IMM Kalman Beta 体制自适应设计方案(v4.0)
1. 问题背景
当前系统在 Beta 估计上存在以下结构性缺陷:
| 组件 | 问题 | 影响 |
|---|---|---|
| analysis_core.py | OLS BETA_WINDOW=100 固定窗口 | β 估计滞后 ≈17天 |
| risk_manager.py | 等额名义价值,未使用 β | 对冲比例错误 |
| strategy.py | 固定阈值,无体制检测 | β 飙升时持续误入场 |
| 高斯似然 | 加密货币重尾(kurtosis 5-20)下过度敏感 | 模型概率频繁震荡 |
| 随机游走状态方程 | P_β 无稳态解 | 长期运行后降级到 OLS |
2. 架构概述
以 IMM(Interacting Multiple Model)Kalman Filter + OU 均值回归估计时变 [α, β],输出同时服务两个用途:
┌──────────────────────────────────────────────────────┐
│ IMMKalmanBetaEstimator (4h) │
│ M=5 并行模型, OU 状态方程(per-model Φ_β), │
│ Student-t 似然(在线 ν), Robbins-Monro R, │
│ 连续可观测性加权, x̄ 在线漂移 │
└──────────────┬───────────────────────────────────────┘
│
┌───────────────┴───────────────┐
▼ ▼
用途 1: 体制检测 用途 2: Hedge Ratio
┌─────────────────┐ ┌─────────────────────┐
│ IMM 模型概率判定 │ │ 仓位比例计算 │
│ P(高Q模型) 阈值 │ │ alt_notional = β × │
│ → regime_score │ │ base_notional │
└────────┬────────┘ └──────────┬──────────┘
│ │
▼ ▼
入场信号过滤 Beta 加权开仓
(硬拦截/阈值缩放)
│
▼
跨配对系统性风险聚合
(双阈值 + 确认窗口)
四层架构
| 层 | 组件 | 功能 |
|---|---|---|
| 第一层 | IMM Kalman Filter | M=5 并行 OU Kalman + Student-t 似然 + 在线 R/ν + 可观测性加权 + 贝叶斯模型概率融合 |
| 第二层 | IMM 体制检测 | P(高Q模型) 作为体制指标 → 入场信号过滤 |
| 第三层 | 跨配对系统性风险聚合 | 双阈值独立设定 + 确认窗口 → 系统性事件拦截 |
| 第四层 | Beta 加权仓位计算 | kalman_beta 直接作为 hedge ratio |
核心设计要点
- IMM 多模型融合:M=5 个 Q_β 值并行竞争,贝叶斯概率自动选出最优,无启发式阈值
- OU 均值回归 + Per-model Φ_β:每个模型的回归强度与 Q_β 配对——低Q模型强回归(β 稳定时锚定均值),高Q模型弱回归(β 变化时不抵抗)
- Student-t 似然 + 在线 ν:重尾鲁棒 + 自适应尾部厚度
- 连续可观测性加权:替代二值阈值,|r_btc| 很小时通过似然温和(likelihood tempering)自然降低模型概率更新力度,消除边界不连续
- x̄ 在线漂移:OU 长期均值缓慢追踪 β 的持续迁移,防止长期运行后回归力变成阻力
3. 算法设计
3.1 数学模型
M 个并行状态空间模型,每个模型 j 使用不同的 (Q_β^(j), Φ_β^(j)) 对:
状态方程(OU 过程,per-model Φ_β):
x_t = Φ^(j) × x_{t-1} + (I - Φ^(j)) × x̄_t + w_t, w_t ~ N(0, Q^(j))
x_t = [α_t, β_t]'
x̄_t = [ᾱ_t, β̄_t]' 在线缓慢漂移的长期均值
Φ^(j) = diag(Φ_α, Φ_β^(j)) per-model 对角回归矩阵
Q^(j) = diag(q_α, q_β^(j)) q_α 共享, q_β^(j) 各异
观测方程(所有模型共享):
r_alt_t = H_t × x_t + v_t, v_t ~ N(0, R_t)
H_t = [1, r_btc_t]
R_t 通过 Robbins-Monro EMA 在线更新
Q_β 与 Φ_β 配对网格:
| 模型 j | Q_β^(j) | Φ_β^(j) | 物理含义 | P_∞_β |
|---|---|---|---|---|
| 0 | 1e-6 | 0.950 | β 几乎不变,强回归 | 1.0e-5 |
| 1 | 1e-5 | 0.970 | β 缓慢变化 | 1.7e-4 |
| 2 | 1e-4 | 0.980 | β 正常变化 | 2.5e-3 |
| 3 | 1e-3 | 0.990 | β 快速变化,弱回归 | 5.0e-2 |
| 4 | 1e-2 | 0.995 | 结构性断裂,近自由漂移 | 1.0 |
Per-model Φ_β 的意义:高Q模型允许 β 快速偏离均值(Φ 接近 1),低Q模型将 β 锚定在 x̄ 附近(Φ 较小)。P_∞ 跨 5 个数量级,模型间区分度更强。
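表中 P_∞_β 列可由 OU 过程的标量稳态方差公式 P_∞ = Q_β / (1 − Φ_β²) 直接验证(示意脚本,网格值取自上表):

```python
# OU 过程标量稳态方差: P_inf = Q / (1 - Phi^2)
# 逐模型复算 3.1 节表格中的 P_inf_beta 列
q_grid = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
phi_grid = [0.950, 0.970, 0.980, 0.990, 0.995]

p_inf = [q / (1.0 - phi ** 2) for q, phi in zip(q_grid, phi_grid)]
for j, p in enumerate(p_inf):
    print(f"model {j}: P_inf_beta = {p:.1e}")
```

模型 0 与模型 4 的稳态方差相差约 5 个数量级,这正是"模型间区分度更强"的来源。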
3.2 IMM 递推算法
输入: r_btc_t, r_alt_t, 上一步状态 {x^(j), P^(j), μ_j}
Step 1: 可观测性权重
btc_var_ema = (1-γ_obs) × btc_var_ema + γ_obs × r_btc²
obs_weight = min(r_btc² / max(btc_var_ema, 1e-12), 1.0)
含义: obs_weight ∈ [0,1],|r_btc| 在典型水平时 ≈1,远小于典型值时 →0
作用: 在 Step 4 中对似然温和,r_btc≈0 时模型概率几乎不更新
Step 2: 混合(Interaction)
⚠️ 必须先保存所有模型的原始状态副本,再执行混合
预测模型概率: μ̃_j = Σ_i π_{ij} × μ_i
混合权重: w_{i|j} = π_{ij} × μ_i / μ̃_j
# 保存原始状态
orig_x = [m.x.copy() for m in models]
orig_P = [m.P.copy() for m in models]
# 为每个模型 j 准备混合初始条件(从原始副本计算)
x̃^(j) = Σ_i w_{i|j} × orig_x[i]
P̃^(j) = Σ_i w_{i|j} × [orig_P[i] + (orig_x[i] - x̃^(j))(orig_x[i] - x̃^(j))']
Step 3: 并行滤波(每个模型 j 独立运行 OU Kalman)
OU 预测:
x_pred^(j) = Φ^(j) × x̃^(j) + (I - Φ^(j)) × x̄
P_pred^(j) = Φ^(j) × P̃^(j) × Φ^(j)' + Q^(j)
Innovation:
ε^(j) = r_alt_t - H_t × x_pred^(j)
S^(j) = H_t × P_pred^(j) × H_t' + R
Student-t 似然(用原始 innovation):
L_j = t_ν(ε^(j); 0, S^(j))
Huber Clipping(保护状态更新,不影响似然):
ε̃^(j) = clip(ε^(j), ±c×√S^(j))
Kalman 更新(Joseph 稳定形式):
K^(j) = P_pred^(j) × H_t' / S^(j)
x^(j) = x_pred^(j) + K^(j) × ε̃^(j)
A = I - K^(j) × H_t
P^(j) = A × P_pred^(j) × A' + R × K^(j) × K^(j)'
Step 4: 模型概率更新(含可观测性温和)
似然温和: log_L_tempered_j = obs_weight × log_L_j
obs_weight=1 → 正常贝叶斯更新
obs_weight=0 → L_tempered=1(各模型无差异)→ μ 保持 μ̃ 不变
log_μ_j = log(μ̃_j) + log_L_tempered_j
μ_j = softmax(log_μ)
μ_j = max(μ_j, μ_floor),再归一化
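Step 4 的温和性质可以用一个极简片段验证:obs_weight=0 时各模型的对数似然被整体抹平,softmax 后 μ 退回 μ̃;obs_weight=1 时即标准贝叶斯更新(示意,`tempered_update` 为说明用函数,似然数值为假设):

```python
import numpy as np

def tempered_update(mu_pred, log_L, obs_weight, mu_floor=1e-6):
    """似然温和的模型概率更新: log mu_j = log mu_pred_j + w * log L_j."""
    log_mu = np.log(np.maximum(mu_pred, 1e-30)) + obs_weight * log_L
    log_mu -= log_mu.max()            # log-sum-exp 数值稳定
    mu = np.exp(log_mu)
    mu /= mu.sum()
    mu = np.maximum(mu, mu_floor)     # 概率下限防下溢
    return mu / mu.sum()

mu_pred = np.array([0.5, 0.3, 0.2])
log_L = np.array([-1.0, -2.0, -5.0])  # 假设的各模型对数似然
print(tempered_update(mu_pred, log_L, obs_weight=0.0))  # ≈ mu_pred,不更新
print(tempered_update(mu_pred, log_L, obs_weight=1.0))  # 向高似然模型集中
```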
Step 5: 在线 R 更新(Robbins-Monro EMA,无截断偏差)
weighted_innov_sq = Σ_j μ̃_j × ε^(j)² (用预测概率,避免循环依赖)
weighted_HP_H = Σ_j μ̃_j × H P_pred^(j) H'
R_update = weighted_innov_sq - weighted_HP_H (允许负值,无 max(.,0))
R = (1-γ_R) × R + γ_R × R_update
R = max(R, R_floor)
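Step 5 的无偏性来自 E[ε²] = H P_pred H' + R:innovation 平方减去 HPH' 后,其期望恰为 R,单步可为负但 EMA 均值正确,故无需 max(·, 0) 截断。用模拟可直接检验(示意,HP_H 与 R 数值为假设):

```python
import numpy as np

# epsilon ~ N(0, S), S = HP_H + R
# => E[epsilon^2 - HP_H] = R,Robbins-Monro 目标无偏
rng = np.random.default_rng(0)
HP_H, R_true = 2e-4, 1e-4
S = HP_H + R_true
eps = rng.normal(0.0, np.sqrt(S), size=200_000)
r_update = eps ** 2 - HP_H
print(f"mean(R_update) = {r_update.mean():.2e}  (true R = {R_true:.1e})")
```

若加 max(·, 0) 截断,负样本被归零,EMA 会系统性高估 R,这正是 v4.0 移除截断的原因。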
Step 6: 融合输出
β = Σ_j μ_j × β^(j)
P_β = Σ_j μ_j × [P_β^(j) + (β^(j) - β)²]
regime_score = Σ_{j: q_β^(j) ≥ q_high} μ_j
Step 7: 在线 ν 自适应
weighted_norm_innov = Σ_j μ_j × (ε^(j) / √S^(j))
kurt_ema = (1-γ_ν) × kurt_ema + γ_ν × weighted_norm_innov⁴
excess_kurt = kurt_ema - 3.0
若 excess_kurt > 0.2:
ν = clip(6/excess_kurt + 4, 3, 30)
同步到所有 M 个模型
公式推导: Student-t(ν) 的 excess kurtosis = 6/(ν-4),反解得 ν = 6/κ + 4
Step 8: x̄ 在线漂移
x̄[α] = (1-γ_bar) × x̄[α] + γ_bar × α_fused
x̄[β] = (1-γ_bar) × x̄[β] + γ_bar × β_fused
同步到所有 M 个模型
γ_bar=0.005 → 有效窗口 ~200步 ≈ 33天,远慢于 β 追踪(~50步)
确保 x̄ 仅在 β 持续迁移时才漂移,不追踪短期波动
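各 EMA 学习率的"有效窗口"均按 1/γ 估算,按 4h K线换算成天数(示意计算):

```python
# EMA 有效窗口 ≈ 1/gamma(步数),4h K线下换算为天数
for name, gamma in [("gamma_bar", 0.005), ("gamma_R", 0.02), ("gamma_nu", 0.01)]:
    steps = 1.0 / gamma
    days = steps * 4 / 24
    print(f"{name}: ~{steps:.0f} 步 ≈ {days:.1f} 天")
```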
模型转移概率矩阵 Π(带宽 TPM):
Π[i,j] = { p_stay if i == j
{ (1-p_stay) × exp(-|i-j|) / Z_i if i ≠ j
相邻模型跳转概率 > 远端模型,符合 β 变化速度渐变的物理特征。
p_stay=0.98 → 模型平均持续 ~50 步(~8天)。
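带宽 TPM 可按上式直接构造(与 4.2 节实现一致的示意),每行行和恒为 1,相邻模型跳转概率按 exp(-|i-j|) 衰减:

```python
import numpy as np

def make_banded_tpm(M: int = 5, p_stay: float = 0.98) -> np.ndarray:
    """带宽转移矩阵: 对角 p_stay,离对角按 exp(-|i-j|) 衰减后归一化."""
    tpm = np.zeros((M, M))
    for i in range(M):
        w = np.array([np.exp(-abs(i - j)) if i != j else 0.0 for j in range(M)])
        tpm[i] = w / w.sum() * (1.0 - p_stay)
        tpm[i, i] = p_stay
    return tpm

tpm = make_banded_tpm()
print(np.round(tpm[0], 4))   # 第 0 行: 相邻模型概率 > 远端模型
```

模型平均持续步数为几何分布均值 1/(1-p_stay) = 50 步,即 ~8 天。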
3.3 参数
| 参数 | 默认值 | 含义 | 备注 |
|---|---|---|---|
| Q_β 网格 | [1e-6, 1e-5, 1e-4, 1e-3, 1e-2] | M=5 模型的 β 过程噪声 | 对数均匀,4 个数量级 |
| Φ_β 网格 | [0.95, 0.97, 0.98, 0.99, 0.995] | Per-model OU 回归强度 | 低Q强回归,高Q弱回归 |
| q_α | 1e-5 | α 过程噪声(共享) | 通常不调 |
| Φ_α | 0.995 | α 回归系数(共享) | 特征时间 ~33天 |
| R_floor | 1e-8 | R 正定下限 | — |
| γ_R | 0.02 | R 学习率 | 有效窗口 ~50步 |
| clip_sigma | 3.0 | Huber 截断 | — |
| ν_init | per-pair | Student-t 初始自由度 | 从 OLS 残差 kurtosis 推算 |
| γ_ν | 0.01 | 在线 ν 学习率 | 有效窗口 ~100步 |
| p_stay | 0.98 | TPM 对角线 | 模型持续 ~8天 |
| q_high | 1e-3 | 高Q阈值 | — |
| μ_floor | 1e-6 | 模型概率下限 | 防下溢 |
| γ_obs | 0.05 | BTC 方差 EMA 衰减 | 可观测性权重 |
| γ_bar | 0.005 | x̄ 漂移学习率 | 有效窗口 ~33天 |
| P₀ | diag(0.1, 1.0) | 初始不确定性 | — |
参数间一致性约束:
- q_α = Q_β_grid[2] / 10 → α 变化比中位 β 慢一个量级
- Φ_β_grid 与 Q_β_grid 单调配对 → 高Q模型弱回归
- 1/γ_bar ≫ 1/(-ln Φ_β_grid[2]) → x̄ 迁移(~200步)远慢于 β 追踪(~50步)
- γ_R 有效窗口(1/γ_R ≈ 50步)与中位模型 β 特征时间匹配
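这些约束可用几行断言在配置加载时自检(示意,常量名取自 3.3 参数表):

```python
import math

Q_BETA_GRID = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
PHI_BETA_GRID = [0.95, 0.97, 0.98, 0.99, 0.995]
Q_ALPHA, GAMMA_BAR, GAMMA_R = 1e-5, 0.005, 0.02

# q_alpha = 中位 Q_beta / 10 → alpha 变化比中位 beta 慢一个量级
assert math.isclose(Q_ALPHA, Q_BETA_GRID[2] / 10)
# 高Q模型 Phi 更接近 1(弱回归),网格单调配对
assert all(a < b for a, b in zip(PHI_BETA_GRID, PHI_BETA_GRID[1:]))
# x_bar 漂移速率远慢于中位模型 beta 的回归速率 -ln(Phi)
assert GAMMA_BAR < -math.log(PHI_BETA_GRID[2]) / 2
# gamma_R 有效窗口(1/gamma)与中位 beta 特征时间同量级
assert 0.5 < (1 / GAMMA_R) / (1 / -math.log(PHI_BETA_GRID[2])) < 2
print("参数一致性约束全部通过")
```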
3.4 体制检测
regime_score = Σ_{j: q_β^(j) ≥ q_high} μ_j 直接作为体制指标:
| regime_score | 含义 | 动作 |
|---|---|---|
| < soft_prob (0.3) | 低Q模型主导,β 稳定 | 正常入场 |
| [soft_prob, hard_prob) | β 开始变化 | 阈值缩放(线性插值到 scale_max) |
| ≥ hard_prob (0.7) | β 剧烈变化 | 硬拦截 |
可观测性保护:当 obs_weight ≈ 0 时模型概率几乎不更新,regime_score 自然保持稳定。
3.5 跨配对系统性风险聚合
双独立阈值 + 确认窗口:
条件 1: expanding 配对比例 ≥ ratio_threshold (0.3)
条件 2: 加权 regime_score ≥ weighted_threshold (0.25)
任一触发 → 累计确认计数
连续 confirm_bars (2) 次触发 → 全局拦截
中间恢复 → 计数归零
3.6 Beta 加权仓位
hedge_beta 选择:Kalman β(P_β ≤ P_max 时)→ OLS β(降级)→ 1.0(兜底)。
仓位公式:base_notional = total / (1 + |β|), alt_notional = |β| × base_notional。β=1.0 时退化为等额。
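仓位公式的数值示例(示意,`split_notional` 为说明用函数,β 与总名义价值为假设数字):

```python
def split_notional(total: float, beta: float) -> tuple[float, float]:
    """按 |beta| 拆分总名义价值: base + alt = total, alt = |beta| * base."""
    abs_beta = abs(beta)
    base_notional = total / (1.0 + abs_beta)
    alt_notional = abs_beta * base_notional
    return base_notional, alt_notional

print(split_notional(10_000, 1.5))   # base=4000, alt=6000
print(split_notional(10_000, 1.0))   # 等额退化: 5000 / 5000
```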
4. 实现
4.1 src/config.py
# ═══ IMM Kalman Filter 参数(v4.0)═══
IMM_Q_BETA_GRID: list[float] = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
IMM_PHI_BETA_GRID: list[float] = [0.95, 0.97, 0.98, 0.99, 0.995]
IMM_Q_ALPHA: float = 1e-5
IMM_PHI_ALPHA: float = 0.995
IMM_P0_ALPHA: float = 0.1
IMM_P0_BETA: float = 1.0
IMM_R_FLOOR: float = 1e-8
IMM_R_GAMMA: float = 0.02
IMM_CLIP_SIGMA: float = 3.0
IMM_STUDENT_T_NU: float = 5.0 # 默认 ν(per-pair 初始化时覆盖)
IMM_NU_GAMMA: float = 0.01 # 在线 ν 学习率
IMM_TRANSITION_PROB: float = 0.98
IMM_HIGH_Q_THRESHOLD: float = 1e-3
IMM_MU_FLOOR: float = 1e-6
IMM_OBS_GAMMA: float = 0.05 # BTC 方差 EMA 衰减(可观测性)
IMM_XBAR_GAMMA: float = 0.005 # x̄ 在线漂移学习率
# ═══ Hedge Ratio 参数 ═══
HEDGE_BETA_MIN: float = 0.1
HEDGE_BETA_MAX: float = 5.0
HEDGE_BETA_P_MAX: float = 0.5
4.2 src/utils/analysis/analysis_core.py — 核心类
import numpy as np
from scipy.special import gammaln
from scipy.stats import kurtosis as sp_kurtosis
def _estimate_student_t_nu(residuals: np.ndarray, default: float = 5.0) -> float:
"""从 OLS 残差的 excess kurtosis 估计 Student-t 自由度 ν
矩匹配: Student-t(ν) 的 excess kurtosis = 6/(ν-4)
反解: ν = 6/κ + 4
"""
if len(residuals) < 20:
return default
kurt = float(sp_kurtosis(residuals, fisher=True)) # excess kurtosis
if kurt <= 0.2:
return 30.0 # 轻尾 → 近高斯
nu = 6.0 / kurt + 4.0
return float(np.clip(nu, 3.0, 30.0))
class _KalmanModel:
"""单一 Kalman Filter 模型(IMM 内部组件)
2D 状态 [α, β],OU 均值回归 + 固定 Q + 共享 R。
Student-t 似然 + Joseph 稳定形式 + Huber clipping。
"""
def __init__(
self,
x: np.ndarray,
P: np.ndarray,
q_alpha: float,
q_beta: float,
R: float,
x_bar: np.ndarray,
phi: np.ndarray, # [Φ_α, Φ_β^(j)] — per-model
r_floor: float = 1e-8,
clip_sigma: float = 3.0,
nu: float = 5.0,
):
self.x = x.copy()
self.P = P.copy()
self.Q = np.diag([q_alpha, q_beta]).astype(np.float64)
self.R = max(R, r_floor)
self.x_bar = x_bar.copy()
self._phi = np.diag(phi).astype(np.float64)
self._I_phi = np.eye(2) - self._phi
self._r_floor = r_floor
self._clip_sigma = clip_sigma
self.nu = nu
self._update_t_const()
def _update_t_const(self):
self._log_t_const = (
gammaln((self.nu + 1) / 2) - gammaln(self.nu / 2)
- 0.5 * np.log(self.nu * np.pi)
)
def predict(self):
self.x = self._phi @ self.x + self._I_phi @ self.x_bar
self.P = self._phi @ self.P @ self._phi.T + self.Q
def update(self, r_btc: float, r_alt: float) -> dict:
H = np.array([1.0, r_btc], dtype=np.float64)
innovation = r_alt - H @ self.x
HP_H = float(H @ self.P @ H)
S = HP_H + self.R
S = max(S, 1e-15)
# Student-t 似然(原始 innovation)
log_likelihood = (
self._log_t_const - 0.5 * np.log(S)
- ((self.nu + 1) / 2) * np.log(1 + innovation ** 2 / (self.nu * S))
)
# Huber clipping(保护状态更新)
sqrt_S = S ** 0.5
norm_innov = innovation / sqrt_S
clipped = False
innov_update = innovation
if abs(norm_innov) > self._clip_sigma:
innov_update = self._clip_sigma * sqrt_S * (1.0 if innovation > 0 else -1.0)
clipped = True
K = (self.P @ H) / S
self.x = self.x + K * innov_update
# Joseph 稳定形式
A = np.eye(2) - np.outer(K, H)
self.P = A @ self.P @ A.T + self.R * np.outer(K, K)
return {
'alpha': float(self.x[0]),
'beta': float(self.x[1]),
'P_beta': float(self.P[1, 1]),
'P_alpha': float(self.P[0, 0]),
'innovation': innovation,
'normalized_innovation': norm_innov,
'S': S,
'HP_H': HP_H,
'log_likelihood': log_likelihood,
'clipped': clipped,
'kalman_gain_beta': float(K[1]),
}
class IMMKalmanBetaEstimator:
"""Interacting Multiple Model Kalman Filter 时变 [α, β] 联合估计器
核心思想 (Blom & Bar-Shalom, 1988):
M 个 Kalman Filter 并行运行,每个使用不同的 (Q_β, Φ_β) 对,
通过贝叶斯模型概率实时加权融合。
双用途输出:
- beta → hedge ratio
- regime_score → 体制检测(高Q模型总概率)
每根新 4h K线调用 update() 一次。
"""
def __init__(
self,
alpha_init: float,
beta_init: float,
q_beta_grid: list[float] | None = None,
phi_beta_grid: list[float] | None = None,
q_alpha: float = 1e-5,
phi_alpha: float = 0.995,
r_init: float = 1e-2,
p0_alpha: float = 0.1,
p0_beta: float = 1.0,
r_floor: float = 1e-8,
r_gamma: float = 0.02,
clip_sigma: float = 3.0,
nu: float = 5.0,
nu_gamma: float = 0.01,
transition_prob: float = 0.98,
high_q_threshold: float = 1e-3,
mu_floor: float = 1e-6,
obs_gamma: float = 0.05,
xbar_gamma: float = 0.005,
btc_var_init: float = 4e-4,
):
if q_beta_grid is None:
q_beta_grid = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
if phi_beta_grid is None:
phi_beta_grid = [0.95, 0.97, 0.98, 0.99, 0.995]
assert len(q_beta_grid) == len(phi_beta_grid), "Q_β 和 Φ_β 网格长度必须一致"
self.M = len(q_beta_grid)
self._q_beta_grid = list(q_beta_grid)
self._phi_beta_grid = list(phi_beta_grid)
self._high_q_threshold = high_q_threshold
self._r_gamma = r_gamma
self._r_floor = r_floor
self._clip_sigma = clip_sigma
self._mu_floor = mu_floor
self._n_updates = 0
# 在线 ν 自适应
self._nu = nu
self._nu_gamma = nu_gamma
# kurt_ema 初始化: 从 ν 反推 E[z⁴] = 3 + 6/(ν-4)
self._kurt_ema = 3.0 + 6.0 / max(nu - 4.0, 0.5)
# 连续可观测性
self._obs_gamma = obs_gamma
self._btc_var_ema = btc_var_init
# x̄ 在线漂移
self._xbar_gamma = xbar_gamma
x_bar = np.array([alpha_init, beta_init], dtype=np.float64)
self._x_bar = x_bar.copy()
# 初始化 M 个 Kalman 模型(per-model Φ_β)
x0 = np.array([alpha_init, beta_init], dtype=np.float64)
P0 = np.diag([p0_alpha, p0_beta]).astype(np.float64)
self._models = [
_KalmanModel(
x0, P0, q_alpha, q_beta_grid[j], r_init, x_bar,
np.array([phi_alpha, phi_beta_grid[j]]),
r_floor, clip_sigma, nu,
)
for j in range(self.M)
]
# 模型概率(均匀初始化)
self._mu = np.ones(self.M) / self.M
# 带宽转移概率矩阵
self._TPM = np.zeros((self.M, self.M))
for i in range(self.M):
weights = np.array([
np.exp(-abs(i - j)) if i != j else 0.0
for j in range(self.M)
])
w_sum = weights.sum()
if w_sum > 0:
self._TPM[i] = weights / w_sum * (1.0 - transition_prob)
self._TPM[i, i] = transition_prob
# 高Q模型索引
self._high_q_indices = [
j for j, q in enumerate(q_beta_grid) if q >= high_q_threshold
]
@property
def beta(self) -> float:
return float(sum(self._mu[j] * self._models[j].x[1] for j in range(self.M)))
@property
def beta_variance(self) -> float:
b = self.beta
return float(sum(
self._mu[j] * (self._models[j].P[1, 1] + (self._models[j].x[1] - b) ** 2)
for j in range(self.M)
))
@property
def regime_score(self) -> float:
return float(sum(self._mu[j] for j in self._high_q_indices))
@property
def effective_q_beta(self) -> float:
return float(sum(self._mu[j] * self._q_beta_grid[j] for j in range(self.M)))
def update(self, r_btc: float, r_alt: float) -> dict:
"""接收一对新的对数收益率,执行完整 IMM 更新"""
self._n_updates += 1
# ── Step 1: 可观测性权重(连续) ──
self._btc_var_ema = ((1 - self._obs_gamma) * self._btc_var_ema
+ self._obs_gamma * r_btc ** 2)
obs_weight = min(r_btc ** 2 / max(self._btc_var_ema, 1e-12), 1.0)
# ── Step 2: 混合(Interaction) ──
mu_predicted = self._TPM.T @ self._mu
mix_weights = np.zeros((self.M, self.M))
for j in range(self.M):
if mu_predicted[j] > 1e-30:
for i in range(self.M):
mix_weights[i, j] = self._TPM[i, j] * self._mu[i] / mu_predicted[j]
# ⚠️ 保存原始状态副本,防止混合循环中覆写污染
orig_x = [m.x.copy() for m in self._models]
orig_P = [m.P.copy() for m in self._models]
for j in range(self.M):
x_mixed = np.zeros(2)
for i in range(self.M):
x_mixed += mix_weights[i, j] * orig_x[i]
P_mixed = np.zeros((2, 2))
for i in range(self.M):
dx = orig_x[i] - x_mixed
P_mixed += mix_weights[i, j] * (orig_P[i] + np.outer(dx, dx))
self._models[j].x = x_mixed
self._models[j].P = P_mixed
# ── Step 3: 并行滤波(OU 预测 + 观测更新) ──
results = []
log_likelihoods = np.zeros(self.M)
for j in range(self.M):
self._models[j].predict()
r = self._models[j].update(r_btc, r_alt)
results.append(r)
log_likelihoods[j] = r['log_likelihood']
# ── Step 4: 模型概率更新(似然温和 + 概率下限保护) ──
log_L_tempered = obs_weight * log_likelihoods
log_mu = np.log(np.maximum(mu_predicted, 1e-30)) + log_L_tempered
log_mu -= np.max(log_mu)
self._mu = np.exp(log_mu)
total = np.sum(self._mu)
if total > 0:
self._mu /= total
else:
self._mu = np.ones(self.M) / self.M
self._mu = np.maximum(self._mu, self._mu_floor)
self._mu /= self._mu.sum()
# ── Step 5: 在线 R 更新(Robbins-Monro,无截断) ──
if self._r_gamma > 0:
weighted_innov_sq = sum(
mu_predicted[j] * results[j]['innovation'] ** 2 for j in range(self.M)
)
weighted_HP_H = sum(
mu_predicted[j] * results[j]['HP_H'] for j in range(self.M)
)
r_update = weighted_innov_sq - weighted_HP_H
new_R = (1 - self._r_gamma) * self._models[0].R + self._r_gamma * r_update
new_R = max(new_R, self._r_floor)
for m in self._models:
m.R = new_R
# ── Step 6: 融合输出 ──
beta = sum(self._mu[j] * results[j]['beta'] for j in range(self.M))
alpha = sum(self._mu[j] * results[j]['alpha'] for j in range(self.M))
P_beta = sum(
self._mu[j] * (results[j]['P_beta'] + (results[j]['beta'] - beta) ** 2)
for j in range(self.M)
)
P_alpha = sum(
self._mu[j] * (results[j]['P_alpha'] + (results[j]['alpha'] - alpha) ** 2)
for j in range(self.M)
)
regime_score = sum(self._mu[j] for j in self._high_q_indices)
effective_q = sum(self._mu[j] * self._q_beta_grid[j] for j in range(self.M))
dominant = int(np.argmax(self._mu))
weighted_innovation = sum(self._mu[j] * results[j]['innovation'] for j in range(self.M))
weighted_norm_innov = sum(
self._mu[j] * results[j]['normalized_innovation'] for j in range(self.M)
)
any_clipped = any(results[j]['clipped'] for j in range(self.M) if self._mu[j] > 0.1)
# ── Step 7: 在线 ν 自适应 ──
if self._nu_gamma > 0 and obs_weight > 0.3:
self._kurt_ema = ((1 - self._nu_gamma) * self._kurt_ema
+ self._nu_gamma * weighted_norm_innov ** 4)
excess_kurt = self._kurt_ema - 3.0
if excess_kurt > 0.2:
new_nu = float(np.clip(6.0 / excess_kurt + 4.0, 3.0, 30.0))
self._nu = new_nu
for m in self._models:
m.nu = new_nu
m._update_t_const()
# ── Step 8: x̄ 在线漂移 ──
if self._xbar_gamma > 0:
self._x_bar[0] = (1 - self._xbar_gamma) * self._x_bar[0] + self._xbar_gamma * alpha
self._x_bar[1] = (1 - self._xbar_gamma) * self._x_bar[1] + self._xbar_gamma * beta
for m in self._models:
m.x_bar = self._x_bar.copy()
return {
'alpha': float(alpha),
'beta': float(beta),
'P_beta': float(P_beta),
'P_alpha': float(P_alpha),
'regime_score': float(regime_score),
'effective_q_beta': float(effective_q),
'model_probs': self._mu.copy(),
'dominant_model_idx': dominant,
'innovation': float(weighted_innovation),
'normalized_innovation': float(weighted_norm_innov),
'clipped': any_clipped,
'obs_weight': float(obs_weight),
'kalman_gain_beta': results[dominant]['kalman_gain_beta'],
'R': self._models[0].R,
'nu': self._nu,
}
def state_dict(self) -> dict:
return {
'models': [
{
'x': m.x.tolist(), 'P': m.P.tolist(),
'Q': m.Q.tolist(), 'R': m.R,
'phi': m._phi.tolist(),
}
for m in self._models
],
'mu': self._mu.tolist(),
'q_beta_grid': self._q_beta_grid,
'phi_beta_grid': self._phi_beta_grid,
'high_q_threshold': self._high_q_threshold,
'n_updates': self._n_updates,
'TPM': self._TPM.tolist(),
'r_gamma': self._r_gamma,
'r_floor': self._r_floor,
'nu': self._nu,
'nu_gamma': self._nu_gamma,
'kurt_ema': self._kurt_ema,
'clip_sigma': self._clip_sigma,
'x_bar': self._x_bar.tolist(),
'xbar_gamma': self._xbar_gamma,
'obs_gamma': self._obs_gamma,
'btc_var_ema': self._btc_var_ema,
'mu_floor': self._mu_floor,
}
@classmethod
def from_state_dict(cls, d: dict) -> 'IMMKalmanBetaEstimator':
obj = cls.__new__(cls)
obj.M = len(d['models'])
obj._q_beta_grid = d['q_beta_grid']
obj._phi_beta_grid = d.get('phi_beta_grid', [0.95, 0.97, 0.98, 0.99, 0.995])
obj._high_q_threshold = d['high_q_threshold']
obj._n_updates = d['n_updates']
obj._mu = np.array(d['mu'], dtype=np.float64)
obj._TPM = np.array(d['TPM'], dtype=np.float64)
obj._r_gamma = d.get('r_gamma', 0.02)
obj._r_floor = d.get('r_floor', 1e-8)
obj._nu = d.get('nu', 5.0)
obj._nu_gamma = d.get('nu_gamma', 0.01)
obj._kurt_ema = d.get('kurt_ema', 9.0)
obj._clip_sigma = d.get('clip_sigma', 3.0)
obj._x_bar = np.array(d.get('x_bar', [0.0, 1.0]), dtype=np.float64)
obj._xbar_gamma = d.get('xbar_gamma', 0.005)
obj._obs_gamma = d.get('obs_gamma', 0.05)
obj._btc_var_ema = d.get('btc_var_ema', 4e-4)
obj._mu_floor = d.get('mu_floor', 1e-6)
obj._models = []
for md in d['models']:
m = _KalmanModel.__new__(_KalmanModel)
m.x = np.array(md['x'], dtype=np.float64)
m.P = np.array(md['P'], dtype=np.float64)
m.Q = np.array(md['Q'], dtype=np.float64)
m.R = md['R']
m.x_bar = obj._x_bar.copy()
m._phi = np.array(md['phi'], dtype=np.float64)
m._I_phi = np.eye(2) - m._phi
m._r_floor = obj._r_floor
m._clip_sigma = obj._clip_sigma
m.nu = obj._nu
m._update_t_const()
obj._models.append(m)
obj._high_q_indices = [
j for j, q in enumerate(obj._q_beta_grid) if q >= obj._high_q_threshold
]
return obj
4.3 src/utils/analysis/analysis_core.py — 集成点
calculate_cointegration_params_dual_window() 在现有 OLS 之后新增 IMM 更新:
def calculate_cointegration_params_dual_window(
base_klines, alt_klines,
beta_window=None, zscore_window=None,
kalman_state: dict | None = None,
):
# ... 现有 OLS 逻辑不变 ...
# ═══ IMM Kalman Filter 更新 ═══
kalman_result = None
kalman_state_out = kalman_state
if len(aligned) >= 2:
r_btc = np.log(aligned['base'].iloc[-1] / aligned['base'].iloc[-2])
r_alt = np.log(aligned['alt'].iloc[-1] / aligned['alt'].iloc[-2])
if kalman_state is not None:
kf = IMMKalmanBetaEstimator.from_state_dict(kalman_state)
else:
ols_residual_var = float(np.var(model.resid)) if hasattr(model, 'resid') else 1e-2
nu_init = _estimate_student_t_nu(
model.resid if hasattr(model, 'resid') else np.array([]),
default=IMM_STUDENT_T_NU,
)
kf = IMMKalmanBetaEstimator(
alpha_init=alpha_ols if use_alpha else 0.0,
beta_init=beta_ols,
q_beta_grid=IMM_Q_BETA_GRID,
phi_beta_grid=IMM_PHI_BETA_GRID,
q_alpha=IMM_Q_ALPHA, phi_alpha=IMM_PHI_ALPHA,
r_init=max(ols_residual_var, 1e-6),
p0_alpha=IMM_P0_ALPHA, p0_beta=IMM_P0_BETA,
r_floor=IMM_R_FLOOR, r_gamma=IMM_R_GAMMA,
clip_sigma=IMM_CLIP_SIGMA,
nu=nu_init, nu_gamma=IMM_NU_GAMMA,
transition_prob=IMM_TRANSITION_PROB,
high_q_threshold=IMM_HIGH_Q_THRESHOLD,
mu_floor=IMM_MU_FLOOR,
obs_gamma=IMM_OBS_GAMMA,
xbar_gamma=IMM_XBAR_GAMMA,
)
kalman_result = kf.update(r_btc, r_alt)
kalman_state_out = kf.state_dict()
return {
# ... 现有字段不变 ...
'kalman_beta': kalman_result['beta'] if kalman_result else beta_ols,
'kalman_alpha': kalman_result['alpha'] if kalman_result else (alpha_ols if use_alpha else 0.0),
'kalman_P_beta': kalman_result['P_beta'] if kalman_result else 1.0,
'kalman_regime_score': kalman_result['regime_score'] if kalman_result else 0.0,
'kalman_effective_q': kalman_result['effective_q_beta'] if kalman_result else 1e-4,
'kalman_model_probs': kalman_result['model_probs'].tolist() if kalman_result else None,
'kalman_innovation': kalman_result['normalized_innovation'] if kalman_result else 0.0,
'kalman_clipped': kalman_result['clipped'] if kalman_result else False,
'kalman_obs_weight': kalman_result['obs_weight'] if kalman_result else 1.0,
'kalman_nu': kalman_result['nu'] if kalman_result else 5.0,
'kalman_state': kalman_state_out,
}
4.4 src/trading/strategy.py — 体制检测与系统性风险
@dataclass
class _BetaRegimeState:
"""Beta 体制检测结果"""
regime: str # 'stable' | 'expanding'
regime_score: float
kalman_P_beta: float
effective_q_beta: float
threshold_scale: float # ≥1.0
hard_block: bool
reason: str
class _IMMRegimeDetector:
"""基于 IMM 模型概率的 Beta 体制检测器
直接使用 regime_score(高Q模型总概率)判定 β 状态。
O(1) 内存 per pair, O(1) 计算 per check。
"""
def __init__(self):
self._pair_states: dict[PairKey, _BetaRegimeState] = {}
self._n_updates: dict[PairKey, int] = {}
def update(
self, key: PairKey, regime_score: float,
kalman_P_beta: float, effective_q_beta: float,
kline_time: str,
soft_prob: float, hard_prob: float,
scale_max: float, warmup: int,
) -> _BetaRegimeState:
n = self._n_updates.get(key, 0) + 1
self._n_updates[key] = n
if n < warmup:
state = _BetaRegimeState(
'stable', regime_score, kalman_P_beta,
effective_q_beta, 1.0, False, "数据不足"
)
elif regime_score >= hard_prob:
state = _BetaRegimeState(
'expanding', regime_score, kalman_P_beta,
effective_q_beta, scale_max, True,
f"Beta硬拦截: regime={regime_score:.3f}>={hard_prob} eff_Q={effective_q_beta:.1e}"
)
elif regime_score >= soft_prob:
t = min(max((regime_score - soft_prob) / max(hard_prob - soft_prob, 0.01), 0), 1)
scale = 1.0 + t * (scale_max - 1.0)
state = _BetaRegimeState(
'expanding', regime_score, kalman_P_beta,
effective_q_beta, scale, False,
f"Beta缩放: regime={regime_score:.3f} scale={scale:.2f}"
)
else:
state = _BetaRegimeState(
'stable', regime_score, kalman_P_beta,
effective_q_beta, 1.0, False,
f"Beta稳定: regime={regime_score:.3f}"
)
self._pair_states[key] = state
return state
def get_all_states(self) -> dict[PairKey, _BetaRegimeState]:
return self._pair_states
def cleanup_pair(self, key: PairKey) -> None:
self._pair_states.pop(key, None)
self._n_updates.pop(key, None)
class _SystemicRiskAggregator:
"""双独立阈值 + 确认窗口的系统性风险聚合"""
def __init__(self, enabled: bool = True):
self._enabled = enabled
self._consecutive_triggers = 0
def check(
self,
pair_states: dict[PairKey, _BetaRegimeState],
ratio_threshold: float = 0.3,
weighted_threshold: float = 0.25,
confirm_bars: int = 2,
position_weights: dict[PairKey, float] | None = None,
) -> tuple[bool, str]:
if not self._enabled or not pair_states:
self._consecutive_triggers = 0
return False, ""
total = len(pair_states)
n_expanding = sum(1 for s in pair_states.values() if s.regime == 'expanding')
ratio = n_expanding / total
if position_weights:
total_w = sum(position_weights.get(k, 1.0) for k in pair_states)
weighted_score = sum(
position_weights.get(k, 1.0) * s.regime_score
for k, s in pair_states.items()
) / total_w
else:
weighted_score = sum(s.regime_score for s in pair_states.values()) / total
ratio_hit = ratio >= ratio_threshold
weighted_hit = weighted_score >= weighted_threshold
if ratio_hit or weighted_hit:
self._consecutive_triggers += 1
if self._consecutive_triggers >= confirm_bars:
return True, (
f"系统性风险(连续{self._consecutive_triggers}次): "
f"{n_expanding}/{total} ({ratio:.0%})配对扩张"
f"{'(比例触发)' if ratio_hit else ''}, "
f"加权regime={weighted_score:.3f}"
f"{'(加权触发)' if weighted_hit else ''}"
)
return False, f"系统性风险待确认({self._consecutive_triggers}/{confirm_bars})"
self._consecutive_triggers = 0
return False, ""
4.5 src/trading/config.py
@dataclass(frozen=True)
class StrategyParams:
# ... 现有字段 ...
# Beta 体制过滤器
beta_regime_enabled: bool = True
beta_regime_soft_prob: float = 0.3
beta_regime_hard_prob: float = 0.7
beta_regime_scale_max: float = 2.0
beta_regime_warmup: int = 5
# 系统性风险
systemic_risk_enabled: bool = True
systemic_ratio_threshold: float = 0.3
systemic_weighted_threshold: float = 0.25
systemic_confirm_bars: int = 2
4.6 src/trading/orchestrator.py
def resolve_hedge_beta(
kalman_beta: float | None,
kalman_P_beta: float | None,
ols_beta: float | None,
p_beta_max: float = HEDGE_BETA_P_MAX,
beta_min: float = HEDGE_BETA_MIN,
beta_max: float = HEDGE_BETA_MAX,
) -> tuple[float, str, bool]:
"""选择 hedge_beta: Kalman → OLS → 1.0 兜底"""
raw_beta = None
source = 'default'
if kalman_beta is not None and kalman_P_beta is not None:
if kalman_P_beta <= p_beta_max:
raw_beta = kalman_beta
source = 'kalman'
if raw_beta is None and ols_beta is not None:
raw_beta = ols_beta
source = 'ols'
if raw_beta is None:
return 1.0, 'default', False
beta_negative = raw_beta < 0
beta = np.clip(abs(raw_beta), beta_min, beta_max)
return float(beta), source, beta_negative
process_analysis() 提取 IMM 输出,透传到 strategy.process_tick() 和 on_entry_signal()。
4.7 src/trading/risk_manager.py
def calculate_position_size(self, signal, alt_price, base_price=0.0,
available_balance=0.0,
hedge_beta=1.0, hedge_beta_source=''):
# ... 现有逻辑 ...
alt_size = base_size = 0.0  # 默认值,防止分支未覆盖时未定义
if self._config.pair_mode == "pair" and base_price > 0 and alt_price > 0:
abs_beta = max(abs(hedge_beta), 0.1)
total_position = position_usd * 2
base_notional = total_position / (1.0 + abs_beta)
alt_notional = abs_beta * base_notional
alt_size = alt_notional / alt_price
base_size = base_notional / base_price
elif alt_price > 0:
alt_size = position_usd / alt_price
return alt_size, base_size
4.8 src/trading/models.py
@dataclass
class PairTradeSignal:
# ... 现有字段 ...
hedge_beta: float = 1.0
hedge_beta_source: str = 'default'
beta_negative: bool = False
@dataclass
class PairPosition:
# ... 现有字段 ...
entry_hedge_beta: float = 1.0
5. 数据流
1. WebSocket 4h K线闭合
↓
2. realtime_kline_service_base 触发分析
↓
3. analyze_multi_period()
├─ calculate_cointegration_params_dual_window(kalman_state=缓存)
│ ├─ OLS 回归 → β_ols, spread, adf_pvalue(现有)
│ └─ IMM Kalman update:
│ ├─ Step 1: 连续可观测性权重
│ ├─ Step 2: 混合(保存原始状态 → 带宽 TPM 加权)
│ ├─ Step 3: M=5 并行 OU 预测 + Student-t 更新
│ ├─ Step 4: 似然温和 + 贝叶斯模型概率更新
│ ├─ Step 5: Robbins-Monro 在线 R(无截断)
│ ├─ Step 6: 融合 → beta, P_beta, regime_score
│ ├─ Step 7: 在线 ν 自适应
│ └─ Step 8: x̄ 在线漂移
└─ 输出 multi_period_result(含 kalman_* 字段)
↓
4. orchestrator.process_analysis()
├─ 缓存 kalman_state
├─ → strategy.process_tick()
│ ├─ _IMMRegimeDetector.update() → 硬拦截 / 阈值缩放
│ └─ _SystemicRiskAggregator.check() → 系统性拦截
├─ 若产生 EntrySignal:
│ ├─ resolve_hedge_beta()
│ └─ on_entry_signal(hedge_beta=...)
↓
5. position_manager → risk_manager.calculate_position_size(hedge_beta)
├─ base_notional = total / (1 + |β|)
└─ alt_notional = |β| × base_notional
6. 改动清单
文件
| 文件 | 改动 |
|---|---|
| src/config.py | +17 IMM 常量(含 Φ_β_grid, γ_ν, γ_obs, γ_bar)+ 3 Hedge Ratio 常量 |
| src/utils/analysis/analysis_core.py | +_estimate_student_t_nu() + _KalmanModel + IMMKalmanBetaEstimator;增强 calculate_cointegration_params_dual_window() |
| src/trading/config.py | StrategyParams +4 体制检测字段 + 3 系统性风险字段 |
| src/trading/models.py | PairTradeSignal +3 字段;PairPosition +1 字段 |
| src/trading/orchestrator.py | +_kalman_states 字典 + resolve_hedge_beta();增强 process_analysis(), on_entry_signal() |
| src/trading/strategy.py | +_BetaRegimeState + _IMMRegimeDetector + _SystemicRiskAggregator;增强 process_tick(), _check_entry() |
| src/trading/risk_manager.py | calculate_position_size() 支持 β 加权 |
不改动:momentum_filter.py, executor.py
依赖:scipy.special.gammaln, scipy.stats.kurtosis(已有依赖)
DB
ALTER TABLE pair_positions ADD COLUMN entry_hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta_source VARCHAR(10) DEFAULT 'default';
ALTER TABLE trading_signals ADD COLUMN beta_negative BOOLEAN DEFAULT FALSE;
7. 日志
| 时机 | 级别 | 格式 |
|---|---|---|
| 初始化 | INFO | IMM初始化 | M=5 Q=[1e-6..1e-2] Φ=[0.95..0.995] ν={ν:.1f} soft=0.30 hard=0.70 |
| IMM 更新 | DEBUG | IMM | {pair} | β̂={β:.3f} P_β={P:.4f} regime={rs:.3f} eff_Q={q:.1e} ν={ν:.1f} obs_w={ow:.2f} R={R:.1e} |
| hedge_beta | DEBUG | hedge_beta={β:.3f}({src}) P_β={P:.4f} |
| 硬拦截 | INFO | Beta硬拦截 | {pair} | regime={rs:.3f}>={hard} |
| 缩放拦截 | INFO | Beta缩放拦截 | {pair} | az={az:.4f} 阈值={th:.2f} ({base}×{scale:.2f}) |
| 系统性待确认 | DEBUG | 系统性风险待确认({n}/{bars}) |
| 系统性拦截 | INFO | 系统性风险(连续{n}次): {expanding}/{total} 配对扩张 |
| β 加权仓位 | INFO | 仓位(β加权) β={β:.3f}({src}) Alt≈${alt:.0f} Base≈${base:.0f} |
8. 验证方案
单元测试
import numpy as np
def test_imm_convergence():
"""从初始值收敛到真实 β"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(200):
r_btc = rng.normal(0, 0.02)
result = kf.update(r_btc, 0.001 + 1.0 * r_btc + rng.normal(0, 0.01))
assert abs(result['beta'] - 1.0) < 0.15
def test_imm_ou_steady_state():
"""P_β 收敛到稳态,不无界增长"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
p_betas = []
for _ in range(500):
r_btc = rng.normal(0, 0.02)
result = kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.01))
p_betas.append(result['P_beta'])
assert np.var(p_betas[400:]) < np.var(p_betas[50:150]) * 0.5
assert max(p_betas[100:]) < 0.5
def test_imm_interaction_no_corruption():
"""混合步骤不会因原地覆写污染状态"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
# 制造模型间差异
for _ in range(50):
kf.update(rng.normal(0, 0.02), rng.normal(0, 0.03))
# 验证混合后状态合理
betas = [m.x[1] for m in kf._models]
# 混合后各模型 β 应在合理范围内(不会因覆写飞掉)
for b in betas:
assert abs(b) < 10, f"Beta out of range after mixing: {b}"
def test_imm_regime_score_stable():
"""β 稳定时 regime_score 低"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(50):
result = kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
assert result['regime_score'] < 0.3
def test_imm_regime_score_spike():
"""β 突变时 regime_score 飙升"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
for _ in range(5):
r_btc = rng.normal(0, 0.02)
result = kf.update(r_btc, 3.0 * r_btc + rng.normal(0, 0.01))
assert result['regime_score'] > 0.3
def test_imm_no_signal_competition():
"""regime_score 不被 β 追踪吸收(无信号竞争)"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
for i in range(50):
r_btc = 0.01 * (1 if i % 2 == 0 else -1)
kf.update(r_btc, 0.5 * r_btc)
scores = []
beta_val = 0.5
for i in range(20):
beta_val += 0.1
r_btc = 0.015 * (1 if i % 2 == 0 else -1)
result = kf.update(r_btc, beta_val * r_btc)
scores.append(result['regime_score'])
assert max(scores) > 0.5
def test_imm_student_t_robustness():
"""Student-t 似然对异常值更鲁棒(vs 近高斯 ν=1000)"""
kf_t = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, nu=5.0)
kf_g = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, nu=1000.0)
for i in range(50):
r = 0.01 * (1 if i % 2 == 0 else -1)
kf_t.update(r, 0.5 * r + 0.001)
kf_g.update(r, 0.5 * r + 0.001)
rs_t, rs_g = kf_t.regime_score, kf_g.regime_score
# 注入 5σ 异常值
result_t = kf_t.update(0.02, 0.5 * 0.02 + 0.15)
result_g = kf_g.update(0.02, 0.5 * 0.02 + 0.15)
assert (result_t['regime_score'] - rs_t) < (result_g['regime_score'] - rs_g)
def test_imm_online_r_no_bias():
"""无截断 R 在噪声下降时正确减小"""
rng = np.random.default_rng(42)
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-4, r_gamma=0.05)
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.05))
R_high = kf._models[0].R
for _ in range(80):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.005))
assert kf._models[0].R < R_high * 0.5
def test_imm_online_nu_adaptation():
"""在线 ν: 重尾数据使 ν 减小"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2,
nu=10.0, nu_gamma=0.05)
rng = np.random.default_rng(42)
# 正常数据
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
nu_before = kf._nu
# 注入重尾数据
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.standard_t(3) * 0.03)
assert kf._nu < nu_before
def test_imm_xbar_drift():
"""x̄ 在线漂移: β 持续迁移后 x̄ 跟随"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2,
xbar_gamma=0.02) # 加速漂移以便测试
rng = np.random.default_rng(42)
xbar_init = kf._x_bar[1]
for _ in range(200):
r_btc = rng.normal(0, 0.02)
kf.update(r_btc, 2.0 * r_btc + rng.normal(0, 0.01))
assert kf._x_bar[1] > xbar_init + 0.3, f"x̄ should drift toward β≈2: x̄={kf._x_bar[1]}"
def test_imm_observability_continuous():
"""r_btc≈0 时 obs_weight→0,模型概率几乎不变"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
mu_before = kf._mu.copy()
result = kf.update(1e-8, rng.normal(0, 0.01))
assert result['obs_weight'] < 0.01
# 模型概率应几乎不变
assert np.max(np.abs(kf._mu - mu_before)) < 0.05
def test_imm_per_model_phi():
"""Per-model Φ_β: 高Q模型弱回归,低Q模型强回归"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
# 验证 Φ_β 差异
phi_0 = kf._models[0]._phi[1, 1] # 低Q模型
phi_4 = kf._models[4]._phi[1, 1] # 高Q模型
assert phi_0 < phi_4, f"低Q应有更小Φ: {phi_0} vs {phi_4}"
def test_estimate_student_t_nu():
"""ν 估计: 正态→高ν, 重尾→低ν"""
rng = np.random.default_rng(42)
nu_normal = _estimate_student_t_nu(rng.normal(0, 1, 200))
nu_heavy = _estimate_student_t_nu(rng.standard_t(df=3, size=200))
assert nu_normal > 10
assert nu_heavy < 7
assert nu_heavy < nu_normal
def test_imm_state_persistence():
"""状态导出/恢复后行为一致"""
kf1 = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2, nu=7.0)
kf1.update(0.02, 0.01)
kf2 = IMMKalmanBetaEstimator.from_state_dict(kf1.state_dict())
r1, r2 = kf1.update(0.03, 0.015), kf2.update(0.03, 0.015)
assert abs(r1['beta'] - r2['beta']) < 1e-10
assert abs(r1['R'] - r2['R']) < 1e-15
def test_imm_model_probs_sum_to_one():
"""模型概率总和始终为 1"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
rng = np.random.default_rng(42)
for _ in range(200):
result = kf.update(rng.normal(0, 0.02), rng.normal(0, 0.03))
assert abs(sum(result['model_probs']) - 1.0) < 1e-10
def test_imm_bandwidth_tpm():
"""带宽 TPM: 相邻 > 远端,每行和 = 1"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, r_init=1e-2)
tpm = kf._TPM
assert tpm[2, 1] > tpm[2, 0]
assert tpm[2, 3] > tpm[2, 4]
for i in range(kf.M):
assert abs(tpm[i].sum() - 1.0) < 1e-10
# test_hedge_beta.py
def test_resolve_hedge_beta_kalman():
beta, src, neg = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.01, ols_beta=0.5)
assert src == 'kalman' and abs(beta - 0.45) < 0.01 and not neg
def test_resolve_hedge_beta_fallback_ols():
beta, src, _ = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.8, ols_beta=0.5)
assert src == 'ols' and abs(beta - 0.5) < 0.01
def test_resolve_hedge_beta_clipping():
beta, _, _ = resolve_hedge_beta(kalman_beta=10.0, kalman_P_beta=0.01, ols_beta=0.5)
assert beta == 5.0
def test_resolve_hedge_beta_negative():
beta, src, neg = resolve_hedge_beta(kalman_beta=-0.8, kalman_P_beta=0.01, ols_beta=0.5)
assert beta == 0.8 and neg is True
def test_position_size_beta_weighted():
abs_beta = 0.5
total = 200.0
base = total / (1.0 + abs_beta)
alt = abs_beta * base
assert abs(base - 133.33) < 0.5 and abs(alt - 66.67) < 0.5
# test_regime_detector.py / test_systemic_risk.py
def test_regime_stable():
d = _IMMRegimeDetector()
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
for i in range(10):
s = d.update(key, 0.05, 0.01, 1e-5, f"T{i}", 0.3, 0.7, 2.0, 5)
assert s.regime == 'stable'
def test_regime_hard_block():
d = _IMMRegimeDetector()
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
for i in range(10):
s = d.update(key, 0.85, 0.1, 3e-3, f"T{i}", 0.3, 0.7, 2.0, 5)
assert s.hard_block
def test_systemic_confirmation():
agg = _SystemicRiskAggregator(enabled=True)
states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
'expanding' if i < 4 else 'stable', 0.5 if i < 4 else 0.1,
0.01, 5e-4, 1.5, False, ""
) for i in range(10)}
assert not agg.check(states, confirm_bars=2)[0] # 第1次不拦截
assert agg.check(states, confirm_bars=2)[0] # 第2次拦截
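上述测试约定了 resolve_hedge_beta 与 β 加权仓位拆分的行为。下面给出一个与这些断言一致的最小示意实现(其中 p_beta_max=0.5 的阈值与具体函数签名为假设,并非源码接口,仅需满足测试中 P_β=0.01 走 Kalman、P_β=0.8 回退 OLS 的约束):

```python
import numpy as np

def resolve_hedge_beta(kalman_beta, kalman_P_beta, ols_beta,
                       p_beta_max=0.5, clip_range=(0.1, 5.0)):
    """Kalman P_β 过大时回退 OLS; 返回 (截断后的 |β|, 来源, 是否负β)。

    阈值 p_beta_max=0.5 为示意假设值; clip_range 对应风险表中的
    极端仓位保护 clip([0.1, 5.0])。
    """
    if kalman_P_beta <= p_beta_max:
        beta, source = kalman_beta, 'kalman'
    else:
        beta, source = ols_beta, 'ols'
    negative = bool(beta < 0)
    clipped = float(np.clip(abs(beta), *clip_range))
    return clipped, source, negative

def beta_weighted_notionals(total_notional, abs_beta):
    """按 alt = |β| × base 拆分总名义价值: base = total / (1 + |β|)。"""
    base = total_notional / (1.0 + abs_beta)
    return base, abs_beta * base
```

例如 total=200、|β|=0.5 时 base≈133.33、alt≈66.67,与 test_position_size_beta_weighted 的断言一致。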
集成验证
- 回测:β 飙升历史区间,对比 regime_score 检测延迟、假阳性率、等额 vs β 加权 PnL
- 消融实验(按优先级):
  - OU vs 随机游走(设 Φ_β_grid=[1,1,1,1,1])
  - Student-t vs 高斯(设 ν=1000)
  - Per-model Φ vs 共享 Φ(设 Φ_β_grid=[0.98]*5)
  - 在线 ν vs 固定 ν(设 γ_ν=0)
  - 似然温和 vs 二值跳过
  - x̄ 漂移 vs 固定 x̄(设 γ_bar=0)
- 实盘监控:model_probs 分布、effective_q 差异(MEME >> L1)、per-pair ν 走势、obs_weight 分布、x̄ 漂移轨迹
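上面每个消融项都对应关闭一个机制,可以统一组织为构造参数覆盖。下面是一个配置组织示意(参数名 phi_beta_grid 为假设;nu、nu_gamma、xbar_gamma 与测试中出现的构造参数一致):

```python
# 消融开关 → 构造参数覆盖的对应关系示意(phi_beta_grid 为假设参数名)
ABLATIONS = {
    'ou_vs_random_walk':   dict(phi_beta_grid=[1.0] * 5),    # Φ_β=1 ⇒ 退化为随机游走
    'gaussian_likelihood': dict(nu=1000.0, nu_gamma=0.0),    # ν 很大 ⇒ 近高斯似然
    'shared_phi':          dict(phi_beta_grid=[0.98] * 5),   # 所有模型共享同一回归强度
    'fixed_nu':            dict(nu_gamma=0.0),               # 关闭在线 ν 更新
    'fixed_xbar':          dict(xbar_gamma=0.0),             # 关闭 x̄ 漂移
}

# 用法示意: IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5,
#                                   r_init=1e-2, **ABLATIONS['fixed_nu'])
```

这样每个消融实验只改一个开关,回测代码本身保持不变。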
9. 风险与边界条件
| 风险 | 严重度 | 缓解 |
|---|---|---|
| 已持仓 hedge ratio 偏离 | 高 | 入场时计算;持仓 rebalance 作为 P0 后续 |
| β 符号翻转 | 高 | resolve_hedge_beta 返回 beta_negative 标记;方向翻转 P1 |
| x̄ 漂移速度不足 | 中 | γ_bar=0.005 保守;可按需调大 |
| IMM 概率塌缩 | 极低 | TPM 保证概率流入 + μ_floor 双重保护 |
| 重启丢失 Kalman 状态 | 中 | OLS 重初始化,3-5 根 K 线收敛;OU 保证 P_β 有稳态 |
| β 加权导致极端仓位 | 低 | clip([0.1, 5.0]) |
不在本次范围:退场逻辑调整、持仓 rebalance、Kalman 持久化到 DB、飞书告警、负 β 方向翻转
10. 后续演进
| 优先级 | 方向 | 预期收益 |
|---|---|---|
| P0 | 退场保护(β 飙升时收紧止损) | 减少已持仓亏损 |
| P0 | 持仓 hedge rebalance | 维持对冲质量 |
| P0 | Kalman 状态持久化到 DB | 消除冷启动 |
| P1 | VB-AKF 联合 Q+R 后验估计(Sarkka 2009) | 消除 Q 网格 + Robbins-Monro;连续 Q 后验 |
| P1 | 负 β 方向翻转逻辑 | 正确处理反向相关 |
| P1 | 跨配对层次贝叶斯(同赛道共享 Q 超先验) | 加速冷启动 |
| P2 | 扩展状态 [α, β, β̇](位置-速度模型) | 检测 β 加速/减速,提前 1-2 根 K 线响应 |
| P2 | GARCH-inspired R 建模 | 捕获波动率聚集效应 |
| P2 | HMM / MS-SSM 统一体制框架 | 多体制分类 |
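P2 的位置-速度模型可以用如下状态转移结构示意(ρ 为 β̇ 的衰减系数,数值为假设;仅演示扩展状态的转移/观测矩阵形状,不是完整滤波器):

```python
import numpy as np

# 扩展状态 x = [α, β, β̇]: β 的变化率 β̇ 作为隐状态被联合估计
rho = 0.9  # β̇ 衰减系数(假设值, 防止速度项发散)
F = np.array([
    [1.0, 0.0, 0.0],  # α: 随机游走(示意中简化)
    [0.0, 1.0, 1.0],  # β_t = β_{t-1} + β̇_{t-1}(步长取 1 根 4h K 线)
    [0.0, 0.0, rho],  # β̇_t = ρ·β̇_{t-1} + 噪声
])

def observation_row(r_btc):
    """观测方程 H_t = [1, r_btc, 0]: β̇ 不直接可观测, 仅通过 β 的演化被推断。"""
    return np.array([1.0, r_btc, 0.0])

# β̇ > 0 时, 预测步会把 β 向上外推, 这正是"提前 1-2 根 K 线响应"的来源
x = np.array([0.0, 0.5, 0.1])
x_pred = F @ x  # β 预测值 0.6, β̇ 衰减为 0.09
```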