IMM Kalman Beta 体制自适应设计方案(v5.0)
1. 问题背景
当前系统 analysis_core.py 用固定窗口 OLS(BETA_WINDOW=100)估计 β,滞后 ≈17 天;risk_manager.py 使用等额名义价值忽略 β;strategy.py 固定阈值在 β 飙升时持续误入场;高斯似然在加密货币重尾(kurtosis 5–20)下过度敏感;随机游走状态方程导致 P_β 无稳态解。
2. 架构
IMM(Interacting Multiple Model)Kalman Filter + OU 均值回归联合估计时变 [α, β],双用途输出:
┌──────────────────────────────────────────────────────────────────┐
│ IMMKalmanBetaEstimator (4h) │
│ M=5 并行模型, OU 状态方程 (per-model Φ_β) │
│ Student-t 似然 (MLE 在线 ν), Sage-Husa R │
│ 自适应 Dirichlet TPM, 连续可观测性加权, x̄ 在线漂移 │
└──────────────────────┬───────────────────────────────────────────┘
│
┌─────────────┴─────────────┐
▼ ▼
用途 1: 体制检测 用途 2: Hedge Ratio
P(高Q模型) → regime_score alt_notional = |β| × base_notional
│
▼
入场信号过滤 (硬拦截 / 阈值缩放)
│
▼
跨配对系统性风险聚合 (双阈值 + 确认窗口)
| 层 | 组件 | 核心算法 |
|---|---|---|
| 第一层 | IMM Kalman Filter | M=5 OU Kalman + Student-t 似然 + Sage-Husa R + 自适应 Dirichlet TPM + MLE ν |
| 第二层 | IMM 体制检测 | regime_score = P(高Q模型) → 入场过滤 |
| 第三层 | 跨配对风险聚合 | 双阈值独立设定 + 确认窗口 |
| 第四层 | Beta 加权仓位 | kalman_beta 直接作为 hedge ratio |
3. 算法设计
3.1 数学模型
M 个并行状态空间模型,每个模型 j 使用不同的 (Q_β^(j), Φ_β^(j)) 对:
状态方程(OU 过程,per-model Φ_β):
x_t = Φ^(j) × x_{t-1} + (I − Φ^(j)) × x̄_t + w_t, w_t ~ N(0, Q^(j))
x_t = [α_t, β_t]'
x̄_t = [ᾱ_t, β̄_t]' 在线缓慢漂移的长期均值
Φ^(j) = diag(Φ_α, Φ_β^(j)) per-model 对角回归矩阵
Q^(j) = diag(q_α, q_β^(j)) q_α 共享, q_β^(j) 各异
观测方程(所有模型共享):
r_alt_t = H_t × x_t + v_t, v_t ~ N(0, R_t)
H_t = [1, r_btc_t]
R_t 通过 Sage-Husa 时变遗忘因子在线自适应
Q_β 与 Φ_β 配对网格(P_∞ 跨 5 个数量级,模型间区分度强):
| 模型 j | Q_β^(j) | Φ_β^(j) | 物理含义 | P_∞_β |
|---|---|---|---|---|
| 0 | 1e-6 | 0.950 | β 几乎不变,强回归 | 1.0e-5 |
| 1 | 1e-5 | 0.970 | β 缓慢变化 | 1.7e-4 |
| 2 | 1e-4 | 0.980 | β 正常变化 | 2.5e-3 |
| 3 | 1e-3 | 0.990 | β 快速变化,弱回归 | 5.0e-2 |
| 4 | 1e-2 | 0.995 | 结构性断裂,近自由漂移 | 1.0 |
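表中 P_∞_β 可由标量 OU 过程的稳态先验方差直接验证:稳态满足 P_∞ = Φ_β² × P_∞ + q_β,即 P_∞ = q_β / (1 − Φ_β²)。下面是一个最小演算草图(仅示意,非项目代码):
q_beta_grid = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
phi_beta_grid = [0.95, 0.97, 0.98, 0.99, 0.995]
for j, (q, phi) in enumerate(zip(q_beta_grid, phi_beta_grid)):
    print(f"model {j}: P_inf_beta = {q / (1 - phi ** 2):.1e}")
# 依次约为 1.0e-05, 1.7e-04, 2.5e-03, 5.0e-02, 1.0e+00,与表格一致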
3.2 IMM 递推(v5.0)
输入: r_btc_t, r_alt_t, 上一步状态 {x^(j), P^(j), μ_j, α_{TPM}}
Step 1: 可观测性权重
btc_var_ema = (1−γ_obs) × btc_var_ema + γ_obs × r_btc²
obs_weight = min(r_btc² / max(btc_var_ema, 1e-12), 1.0)
含义: r_btc ≈ 0 时 obs_weight→0,模型概率几乎不更新,消除边界跳变
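下面用几组数值示意 obs_weight 的行为(假设 btc_var_ema 已收敛到默认初值 4e-4,即单根 4h K 线约 2% 波动;仅为说明,非项目代码):
btc_var_ema = 4e-4
for r_btc in (1e-5, 0.005, 0.02, 0.06):
    obs_weight = min(r_btc ** 2 / max(btc_var_ema, 1e-12), 1.0)
    print(f"r_btc={r_btc:g} -> obs_weight={obs_weight:.3g}")
# 1e-5 → 2.5e-07(横盘,模型概率几乎冻结);0.005 → 0.0625;0.02 → 1.0;0.06 → 1.0(封顶)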
Step 2: 保存 μ_{t-1}(供 Step 5b 使用)
Step 3: 混合(Interaction)
⚠️ 必须先保存所有模型原始状态副本,再执行混合
预测模型概率: μ̃_j = Σ_i π_{ij} × μ_i ← 使用当前 TPM(即上一根 K 线 Step 5b 更新后的结果)
混合权重: w_{i|j} = π_{ij} × μ_i / μ̃_j
# 为每个模型 j 准备混合初始条件(从原始副本计算)
x̃^(j) = Σ_i w_{i|j} × orig_x[i]
P̃^(j) = Σ_i w_{i|j} × [orig_P[i] + (orig_x[i] − x̃^(j))(orig_x[i] − x̃^(j))']
Step 4: 并行滤波(每个模型 j 独立运行 OU Kalman)
OU 预测:
x_pred^(j) = Φ^(j) × x̃^(j) + (I − Φ^(j)) × x̄
P_pred^(j) = Φ^(j) × P̃^(j) × Φ^(j)' + Q^(j)
Innovation:
ε^(j) = r_alt_t − H_t × x_pred^(j)
S^(j) = H_t × P_pred^(j) × H_t' + R
Student-t 似然(用原始 innovation):
L_j = t_ν(ε^(j); 0, S^(j))
Huber Clipping(保护状态更新,不影响似然):
ε̃^(j) = clip(ε^(j), ±c×√S^(j))
Kalman 更新(Joseph 稳定形式):
K^(j) = P_pred^(j) × H_t' / S^(j)
x^(j) = x_pred^(j) + K^(j) × ε̃^(j)
A = I − K^(j) × H_t
P^(j) = A × P_pred^(j) × A' + R × K^(j) × K^(j)'
Step 5: 模型概率更新(含可观测性温和)
log_L_tempered_j = obs_weight × log_L_j
obs_weight=1 → 正常贝叶斯更新
obs_weight=0 → L_tempered=1,μ 保持 μ̃ 不变
log_μ_j = log(μ̃_j) + log_L_tempered_j
μ_j = softmax(log_μ)
μ_j = max(μ_j, μ_floor),再归一化
Step 5b: 自适应 Dirichlet TPM(v5 新增,须在后验 μ 更新后执行)
# 正确充分统计量(Li & Bar-Shalom, 1996):
# c_{ij} = π_{ij} × μ_i(t-1) × μ_j(t) / μ̃_j(t)
#
# 关键:后验修正项 μ_j(t)/μ̃_j(t) 使软计数偏向实际被激活的模型对
# 若只用 π_{ij} × μ_i,软计数正比于现有 TPM 行,归一化后 TPM 永不改变
# 后验修正项打破该恒等式,让 TPM 能向数据驱动方向学习
posterior_correction_j = μ_j(t) / μ̃_j(t)    # 后验/预测 比值,shape (M,)
c_{ij} = π_{ij} × μ_i(t-1) × posterior_correction_j
α_{TPM} = λ_tpm × α_{TPM} + c # λ_tpm=0.995,有效窗口 ~33天
π_{ij} = α_{ij} / Σ_k α_{ik} # 按行归一化
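下面这段最小 numpy 演示(M=2,数值均为假设)说明为什么必须带后验修正项:只用 π_{ij} × μ_i 时,新增软计数始终与 TPM 行成比例,行归一化后 TPM 恒等于初值;乘上 μ_j(t)/μ̃_j(t) 后该比例被打破,TPM 才能学习。
import numpy as np

TPM = np.array([[0.9, 0.1],
                [0.2, 0.8]])
alpha = TPM * 20.0                      # Dirichlet 浓度按 κ=20 初始化
mu_prev = np.array([0.7, 0.3])
lam = 0.995

# 不带后验修正: 行归一化结果仍等于原 TPM
alpha_old = lam * alpha + TPM * mu_prev[:, None]
print(np.allclose(alpha_old / alpha_old.sum(axis=1, keepdims=True), TPM))   # True

# 带后验修正(假设后验偏向模型 1): TPM 发生变化
mu_pred = TPM.T @ mu_prev
mu_post = np.array([0.4, 0.6])
alpha_new = lam * alpha + TPM * mu_prev[:, None] * (mu_post / mu_pred)[None, :]
print(np.allclose(alpha_new / alpha_new.sum(axis=1, keepdims=True), TPM))   # False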
Step 6: Sage-Husa 自适应 R(v5 替换 Robbins-Monro)
# 时变遗忘因子,早期收敛快,长期等效 EMA(1-b)
d_k = (1 − b) / (1 − b^k) b=0.97, k=更新次数
d_1 = 1.0, d_2 ≈ 0.51, k→∞ 时 d_k → 0.03
weighted_innov_sq = Σ_j μ̃_j × ε^(j)² ← 用预测概率,避免循环依赖
weighted_HP_H = Σ_j μ̃_j × H P_pred^(j) H'
R = (1 − d_k) × R + d_k × (weighted_innov_sq − weighted_HP_H)
R = max(R, R_floor)
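d_k 序列的一个快速演算(示意,数值为近似):
b = 0.97
for k in (1, 2, 5, 20, 50, 1000):
    print(k, round((1 - b) / (1 - b ** k), 4))
# 1→1.0, 2→0.508, 5→0.212, 20→0.066, 50→0.038, 1000→0.03:早期接近直接平均,长期退化为 EMA(0.03)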
Step 7: 融合输出
β = Σ_j μ_j × β^(j)
P_β = Σ_j μ_j × [P_β^(j) + (β^(j) − β)²]
regime_score = Σ_{j: q_β^(j) ≥ q_high} μ_j
Step 8: 在线 ν 自适应(v5 修正:E[z⁴] 而非 (E[z])⁴)
# 正确追踪预测分布的4阶矩,而非均值的4次方
fourth_moment_ema = (1−γ_ν) × fourth_moment_ema + γ_ν × Σ_j μ_j × (ε^(j)/√S^(j))⁴
excess_kurt = fourth_moment_ema − 3.0
若 excess_kurt > 0.2 且 obs_weight > 0.3:
ν = clip(6/excess_kurt + 4, 3, 30) # Student-t excess kurtosis = 6/(ν-4) 反解
同步到所有 M 个模型
v4 错误原因: Σ_j μ_j z_j 是均值,(均值)⁴ ≤ E[z⁴](Jensen),系统性低估 kurtosis,
ν 偏高 → 重尾保护失效
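一个简化的数值示意(模型概率与各模型标准化残差均为假设值):
import numpy as np

mu = np.array([0.1, 0.2, 0.4, 0.2, 0.1])    # 模型概率
z = np.array([0.5, 1.0, 2.5, 3.5, 5.0])     # 各模型标准化残差
print((mu @ z) ** 4)     # v4 路径: (Σ μ_j z_j)⁴ ≈ 36
print(mu @ z ** 4)       # v5 路径: Σ μ_j z_j⁴ ≈ 108,Jensen 不等式保证不小于前者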
Step 9: x̄ 在线漂移
x̄[α] = (1−γ_bar) × x̄[α] + γ_bar × α_fused
x̄[β] = (1−γ_bar) × x̄[β] + γ_bar × β_fused
同步到所有 M 个模型
γ_bar=0.005 → 有效窗口 ~33天,β 持续迁移时缓慢跟随,不追踪短期波动
3.3 参数
| 参数 | 默认值 | 含义 |
|---|---|---|
| Q_β 网格 | [1e-6, 1e-5, 1e-4, 1e-3, 1e-2] | M=5 模型的 β 过程噪声(对数均匀,4 个数量级) |
| Φ_β 网格 | [0.95, 0.97, 0.98, 0.99, 0.995] | Per-model OU 回归强度(低Q强回归,高Q弱回归) |
| q_α / Φ_α | 1e-5 / 0.995 | α 过程噪声与回归系数(共享) |
| R_floor | 1e-8 | R 正定下限 |
| b(Sage-Husa) | 0.97 | 遗忘基数;长期等效 EMA(0.03),早期收敛更快 |
| clip_sigma | 3.0 | Huber 截断(保护状态,不影响似然) |
| ν_init | MLE from OLS resid | Student-t 初始自由度(v5 改为 MLE) |
| γ_ν | 0.01 | 在线 ν 学习率(有效窗口 ~100步) |
| λ_tpm | 0.995 | Dirichlet TPM 遗忘因子(有效窗口 ~33天) |
| κ_tpm | 20.0 | Dirichlet 初始化浓度参数 |
| q_high | 1e-3 | 高Q模型阈值 |
| μ_floor | 1e-6 | 模型概率下限 |
| γ_obs | 0.05 | BTC 方差 EMA 衰减(可观测性权重) |
| γ_bar | 0.005 | x̄ 漂移学习率 |
| P₀ | diag(0.1, 1.0) | 初始不确定性 |
参数一致性约束:
q_α = Q_β_grid[2] / 10 → α 变化比中位 β 慢一个量级
Φ_β_grid 与 Q_β_grid 配对 → 高Q模型弱回归(Φ 接近 1)
1/γ_bar ≫ 1/(-ln Φ_β_grid[2]) → x̄ 迁移(~200 根 K 线)远慢于 β 追踪(~50 根 K 线)
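时间尺度约束的一个快速检查(示意):
import math

phi_mid = 0.98                           # Φ_β_grid[2]
gamma_bar = 0.005
tau_beta = 1.0 / (-math.log(phi_mid))    # ≈ 49.5 根 K 线: β 均值回归时间尺度
tau_xbar = 1.0 / gamma_bar               # 200 根 K 线: x̄ 漂移时间尺度
assert tau_xbar > 3 * tau_beta           # x̄ 迁移远慢于 β 追踪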
3.4 体制检测
regime_score = Σ_{j: q_β^(j) ≥ q_high} μ_j 直接作为体制指标:
| regime_score | 含义 | 动作 |
|---|---|---|
| < 0.3 | 低Q模型主导,β 稳定 | 正常入场 |
| [0.3, 0.7) | β 开始变化 | 阈值线性缩放至 scale_max |
| ≥ 0.7 | β 剧烈变化 | 硬拦截 |
obs_weight ≈ 0 时模型概率几乎不更新,regime_score 自然稳定,不会因 BTC 横盘产生误报。
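阈值线性缩放的一个数值例子(示意,参数取 4.5 节默认值 soft=0.3 / hard=0.7 / scale_max=2.0):
regime_score = 0.5
t = min(max((regime_score - 0.3) / (0.7 - 0.3), 0.0), 1.0)   # 0.5
threshold_scale = 1.0 + t * (2.0 - 1.0)                      # 1.5,入场阈值放大 1.5 倍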
3.5 跨配对系统性风险聚合
双独立阈值 + 确认窗口:
条件 1: expanding 配对比例 ≥ ratio_threshold (0.3)
条件 2: 加权 regime_score ≥ weighted_threshold (0.25)
任一触发 → 累计确认计数
连续 confirm_bars (2) 次触发 → 全局拦截
中间恢复 → 计数归零
3.6 Beta 加权仓位
hedge_beta 选择:Kalman β(P_β ≤ P_max 时)→ OLS β(降级)→ 1.0(兜底)。
仓位公式:base_notional = total / (1 + |β|), alt_notional = |β| × base_notional。β=1.0 时退化为等额。
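一个数值例子(示意): 若 total = 2000 USDT、β = 1.5:
total_notional = 2000.0
beta = 1.5
base_notional = total_notional / (1 + abs(beta))   # 800.0
alt_notional = abs(beta) * base_notional           # 1200.0,两腿合计仍为 2000
# β = 1.0 时 base = alt = 1000,退化为现有的等额名义价值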
4. 实现
4.1 src/config.py
# ═══ IMM Kalman Filter 参数(v5.0)═══
IMM_Q_BETA_GRID: list[float] = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
IMM_PHI_BETA_GRID: list[float] = [0.95, 0.97, 0.98, 0.99, 0.995]
IMM_Q_ALPHA: float = 1e-5
IMM_PHI_ALPHA: float = 0.995
IMM_P0_ALPHA: float = 0.1
IMM_P0_BETA: float = 1.0
IMM_R_FLOOR: float = 1e-8
IMM_SAGE_HUSA_B: float = 0.97 # Sage-Husa 遗忘基数(替换 Robbins-Monro r_gamma)
IMM_CLIP_SIGMA: float = 3.0
IMM_STUDENT_T_NU: float = 5.0 # 默认 ν(per-pair MLE 初始化时覆盖)
IMM_NU_GAMMA: float = 0.01
IMM_TPM_LAMBDA: float = 0.995 # Dirichlet TPM 遗忘因子
IMM_TPM_KAPPA: float = 20.0 # Dirichlet 初始浓度参数
IMM_TRANSITION_PROB: float = 0.98 # 初始 p_stay(Dirichlet 先验均值)
IMM_HIGH_Q_THRESHOLD: float = 1e-3
IMM_MU_FLOOR: float = 1e-6
IMM_OBS_GAMMA: float = 0.05
IMM_XBAR_GAMMA: float = 0.005
# ═══ Hedge Ratio 参数 ═══
HEDGE_BETA_MIN: float = 0.1
HEDGE_BETA_MAX: float = 5.0
HEDGE_BETA_P_MAX: float = 0.5
4.2 src/utils/analysis/analysis_core.py — 核心类
import numpy as np
from scipy.special import gammaln
from scipy.stats import t as scipy_t
from scipy.stats import kurtosis as sp_kurtosis
def _estimate_student_t_nu(residuals: np.ndarray, default: float = 5.0) -> float:
"""MLE 估计 Student-t 自由度 ν(v5: 替换矩匹配)
MLE 比矩匹配(kurt=6/(ν-4))精度高约一个数量级,尤其在 n<500 时样本峰度方差极大。
fallback 到矩匹配仅在 scipy 失败时使用。
"""
if len(residuals) < 30:
return default
try:
df, _, _ = scipy_t.fit(residuals, floc=0) # MLE,固定 location=0
return float(np.clip(df, 3.0, 30.0))
except Exception:
kurt = float(sp_kurtosis(residuals, fisher=True))
return 30.0 if kurt <= 0.2 else float(np.clip(6.0 / max(kurt, 0.01) + 4.0, 3.0, 30.0))
class _KalmanModel:
"""单一 Kalman Filter 模型(IMM 内部组件)
2D 状态 [α, β],OU 均值回归 + 固定 Q + 共享 R。
Student-t 似然 + Joseph 稳定形式 + Huber clipping。
"""
def __init__(
self,
x: np.ndarray,
P: np.ndarray,
q_alpha: float,
q_beta: float,
R: float,
x_bar: np.ndarray,
phi: np.ndarray, # [Φ_α, Φ_β^(j)] — per-model
r_floor: float = 1e-8,
clip_sigma: float = 3.0,
nu: float = 5.0,
):
self.x = x.copy()
self.P = P.copy()
self.Q = np.diag([q_alpha, q_beta]).astype(np.float64)
self.R = max(R, r_floor)
self.x_bar = x_bar.copy()
self._phi = np.diag(phi).astype(np.float64)
self._I_phi = np.eye(2) - self._phi
self._r_floor = r_floor
self._clip_sigma = clip_sigma
self.nu = nu
self._update_t_const()
def _update_t_const(self):
self._log_t_const = (
gammaln((self.nu + 1) / 2) - gammaln(self.nu / 2)
- 0.5 * np.log(self.nu * np.pi)
)
def predict(self):
self.x = self._phi @ self.x + self._I_phi @ self.x_bar
self.P = self._phi @ self.P @ self._phi.T + self.Q
def update(self, r_btc: float, r_alt: float) -> dict:
H = np.array([1.0, r_btc], dtype=np.float64)
innovation = r_alt - H @ self.x
HP_H = float(H @ self.P @ H)
S = max(HP_H + self.R, 1e-15)
# Student-t 似然(原始 innovation)
log_likelihood = (
self._log_t_const - 0.5 * np.log(S)
- ((self.nu + 1) / 2) * np.log(1 + innovation ** 2 / (self.nu * S))
)
# Huber clipping(保护状态更新,不影响似然)
sqrt_S = S ** 0.5
norm_innov = innovation / sqrt_S
innov_update = innovation
clipped = False
if abs(norm_innov) > self._clip_sigma:
innov_update = self._clip_sigma * sqrt_S * np.sign(innovation)
clipped = True
K = (self.P @ H) / S
self.x = self.x + K * innov_update
# Joseph 稳定形式(保证 P 正定)
A = np.eye(2) - np.outer(K, H)
self.P = A @ self.P @ A.T + self.R * np.outer(K, K)
return {
'alpha': float(self.x[0]),
'beta': float(self.x[1]),
'P_beta': float(self.P[1, 1]),
'P_alpha': float(self.P[0, 0]),
'innovation': innovation,
'normalized_innovation': norm_innov, # 原始标准化残差,用于 ν 估计
'S': S,
'HP_H': HP_H,
'log_likelihood': log_likelihood,
'clipped': clipped,
'kalman_gain_beta': float(K[1]),
}
class IMMKalmanBetaEstimator:
"""Interacting Multiple Model Kalman Filter 时变 [α, β] 联合估计器
核心思想 (Blom & Bar-Shalom, 1988):
M 个 Kalman Filter 并行运行,每个使用不同的 (Q_β, Φ_β) 对,
通过贝叶斯模型概率实时加权融合。
v5.0 改进(vs v4.0):
1. 自适应 Dirichlet TPM — 替换静态带宽 TPM,在线学习模型转移速率
2. Sage-Husa 自适应 R — 替换 Robbins-Monro,时变遗忘因子早期收敛更快
3. 修正在线 ν 估计 — 追踪 E[z⁴] 而非 (E[z])⁴,消除系统性 kurtosis 低估
4. MLE ν 初始化 — 替换矩匹配,n<500 时精度高约一个数量级
双用途输出:
- beta → hedge ratio
- regime_score → 体制检测(高Q模型总概率)
每根新 4h K 线调用 update() 一次。
"""
def __init__(
self,
alpha_init: float,
beta_init: float,
q_beta_grid: list[float] | None = None,
phi_beta_grid: list[float] | None = None,
q_alpha: float = 1e-5,
phi_alpha: float = 0.995,
r_init: float = 1e-2,
p0_alpha: float = 0.1,
p0_beta: float = 1.0,
r_floor: float = 1e-8,
sage_husa_b: float = 0.97,
clip_sigma: float = 3.0,
nu: float = 5.0,
nu_gamma: float = 0.01,
transition_prob: float = 0.98,
tpm_kappa: float = 20.0,
tpm_lambda: float = 0.995,
high_q_threshold: float = 1e-3,
mu_floor: float = 1e-6,
obs_gamma: float = 0.05,
xbar_gamma: float = 0.005,
btc_var_init: float = 4e-4,
):
if q_beta_grid is None:
q_beta_grid = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
if phi_beta_grid is None:
phi_beta_grid = [0.95, 0.97, 0.98, 0.99, 0.995]
assert len(q_beta_grid) == len(phi_beta_grid)
self.M = len(q_beta_grid)
self._q_beta_grid = list(q_beta_grid)
self._phi_beta_grid = list(phi_beta_grid)
self._high_q_threshold = high_q_threshold
self._sage_husa_b = sage_husa_b
self._r_floor = r_floor
self._clip_sigma = clip_sigma
self._mu_floor = mu_floor
self._n_updates = 0
# 在线 ν 自适应(v5: 追踪 E[z⁴],初始化从 ν 反推 3 + 6/(ν-4))
self._nu = nu
self._nu_gamma = nu_gamma
self._fourth_moment_ema = 3.0 + 6.0 / max(nu - 4.0, 0.5)
# 连续可观测性权重
self._obs_gamma = obs_gamma
self._btc_var_ema = btc_var_init
# x̄ 在线漂移
self._xbar_gamma = xbar_gamma
x_bar = np.array([alpha_init, beta_init], dtype=np.float64)
self._x_bar = x_bar.copy()
# 初始化 M 个 Kalman 模型
x0 = np.array([alpha_init, beta_init], dtype=np.float64)
P0 = np.diag([p0_alpha, p0_beta]).astype(np.float64)
self._models = [
_KalmanModel(
x0, P0, q_alpha, q_beta_grid[j], r_init, x_bar,
np.array([phi_alpha, phi_beta_grid[j]]),
r_floor, clip_sigma, nu,
)
for j in range(self.M)
]
# 均匀初始模型概率
self._mu = np.ones(self.M) / self.M
# ── 自适应 Dirichlet TPM(v5 新增)──
# 以带宽 TPM 为先验均值,浓度参数 κ 控制先验强度
init_tpm = self._build_bandwidth_tpm(self.M, transition_prob)
self._tpm_alpha = init_tpm * tpm_kappa # Dirichlet 浓度,shape (M, M)
self._tpm_lambda = tpm_lambda
self._TPM = init_tpm.copy()
# 高Q模型索引
self._high_q_indices = [
j for j, q in enumerate(q_beta_grid) if q >= high_q_threshold
]
@staticmethod
def _build_bandwidth_tpm(M: int, p_stay: float) -> np.ndarray:
"""构造初始带宽转移概率矩阵(相邻模型跳转概率 > 远端)"""
tpm = np.zeros((M, M))
for i in range(M):
weights = np.array([
np.exp(-abs(i - j)) if i != j else 0.0
for j in range(M)
])
w_sum = weights.sum()
if w_sum > 0:
tpm[i] = weights / w_sum * (1.0 - p_stay)
tpm[i, i] = p_stay
return tpm
@property
def beta(self) -> float:
return float(sum(self._mu[j] * self._models[j].x[1] for j in range(self.M)))
@property
def beta_variance(self) -> float:
b = self.beta
return float(sum(
self._mu[j] * (self._models[j].P[1, 1] + (self._models[j].x[1] - b) ** 2)
for j in range(self.M)
))
@property
def regime_score(self) -> float:
return float(sum(self._mu[j] for j in self._high_q_indices))
@property
def effective_q_beta(self) -> float:
return float(sum(self._mu[j] * self._q_beta_grid[j] for j in range(self.M)))
def update(self, r_btc: float, r_alt: float) -> dict:
"""接收一对新的对数收益率,执行完整 IMM v5.0 更新"""
self._n_updates += 1
# ── Step 1: 可观测性权重(连续) ──
self._btc_var_ema = ((1 - self._obs_gamma) * self._btc_var_ema
+ self._obs_gamma * r_btc ** 2)
obs_weight = min(r_btc ** 2 / max(self._btc_var_ema, 1e-12), 1.0)
        # ── Step 2: 保存 μ_{t-1}(供 Step 5b 的自适应 Dirichlet TPM 更新使用) ──
        mu_prev = self._mu.copy()
# ── Step 3: 混合(Interaction) ──
mu_predicted = self._TPM.T @ self._mu
mix_weights = np.zeros((self.M, self.M))
for j in range(self.M):
if mu_predicted[j] > 1e-30:
for i in range(self.M):
mix_weights[i, j] = self._TPM[i, j] * self._mu[i] / mu_predicted[j]
# ⚠️ 保存原始状态副本,防止混合循环中覆写污染
orig_x = [m.x.copy() for m in self._models]
orig_P = [m.P.copy() for m in self._models]
for j in range(self.M):
x_mixed = np.zeros(2)
for i in range(self.M):
x_mixed += mix_weights[i, j] * orig_x[i]
P_mixed = np.zeros((2, 2))
for i in range(self.M):
dx = orig_x[i] - x_mixed
P_mixed += mix_weights[i, j] * (orig_P[i] + np.outer(dx, dx))
self._models[j].x = x_mixed
self._models[j].P = P_mixed
# ── Step 4: 并行滤波(OU 预测 + Student-t 更新) ──
results = []
log_likelihoods = np.zeros(self.M)
for j in range(self.M):
self._models[j].predict()
r = self._models[j].update(r_btc, r_alt)
results.append(r)
log_likelihoods[j] = r['log_likelihood']
        # ── Step 5: 模型概率更新(似然温和 + 概率下限保护) ──
        log_L_tempered = obs_weight * log_likelihoods
        log_mu = np.log(np.maximum(mu_predicted, 1e-30)) + log_L_tempered
        log_mu -= np.max(log_mu)
        self._mu = np.exp(log_mu)
        total = np.sum(self._mu)
        self._mu = self._mu / total if total > 0 else np.ones(self.M) / self.M
        self._mu = np.maximum(self._mu, self._mu_floor)
        self._mu /= self._mu.sum()
        # ── Step 5b: 自适应 Dirichlet TPM(v5 新增,须在后验 μ 更新后执行) ──
        # c_{ij} = π_{ij} × μ_i(t-1) × μ_j(t)/μ̃_j(t)(Li & Bar-Shalom 充分统计量)
        posterior_correction = self._mu / np.maximum(mu_predicted, 1e-30)
        soft_counts = self._TPM * mu_prev[:, np.newaxis] * posterior_correction[np.newaxis, :]
        self._tpm_alpha = self._tpm_lambda * self._tpm_alpha + soft_counts
        row_sums = self._tpm_alpha.sum(axis=1, keepdims=True)
        self._TPM = self._tpm_alpha / np.maximum(row_sums, 1e-30)
# ── Step 6: Sage-Husa 自适应 R(v5 替换 Robbins-Monro) ──
b = self._sage_husa_b
d_k = (1.0 - b) / (1.0 - b ** self._n_updates) # 时变遗忘因子
weighted_innov_sq = sum(
mu_predicted[j] * results[j]['innovation'] ** 2 for j in range(self.M)
)
weighted_HP_H = sum(
mu_predicted[j] * results[j]['HP_H'] for j in range(self.M)
)
r_update = weighted_innov_sq - weighted_HP_H
new_R = (1.0 - d_k) * self._models[0].R + d_k * r_update
new_R = max(new_R, self._r_floor)
for m in self._models:
m.R = new_R
# ── Step 7: 融合输出 ──
beta = sum(self._mu[j] * results[j]['beta'] for j in range(self.M))
alpha = sum(self._mu[j] * results[j]['alpha'] for j in range(self.M))
P_beta = sum(
self._mu[j] * (results[j]['P_beta'] + (results[j]['beta'] - beta) ** 2)
for j in range(self.M)
)
P_alpha = sum(
self._mu[j] * (results[j]['P_alpha'] + (results[j]['alpha'] - alpha) ** 2)
for j in range(self.M)
)
regime_score = sum(self._mu[j] for j in self._high_q_indices)
effective_q = sum(self._mu[j] * self._q_beta_grid[j] for j in range(self.M))
dominant = int(np.argmax(self._mu))
weighted_innovation = sum(self._mu[j] * results[j]['innovation'] for j in range(self.M))
any_clipped = any(results[j]['clipped'] for j in range(self.M) if self._mu[j] > 0.1)
# ── Step 8: 在线 ν 自适应(v5 修正:追踪 E[z⁴],非 (E[z])⁴) ──
if self._nu_gamma > 0 and obs_weight > 0.3:
# 正确:加权4阶矩 E[z⁴],而非4次方的加权均值
weighted_fourth_moment = sum(
self._mu[j] * results[j]['normalized_innovation'] ** 4
for j in range(self.M)
)
self._fourth_moment_ema = (
(1 - self._nu_gamma) * self._fourth_moment_ema
+ self._nu_gamma * weighted_fourth_moment
)
excess_kurt = self._fourth_moment_ema - 3.0
if excess_kurt > 0.2:
new_nu = float(np.clip(6.0 / excess_kurt + 4.0, 3.0, 30.0))
self._nu = new_nu
for m in self._models:
m.nu = new_nu
m._update_t_const()
# ── Step 9: x̄ 在线漂移 ──
if self._xbar_gamma > 0:
self._x_bar[0] = (1 - self._xbar_gamma) * self._x_bar[0] + self._xbar_gamma * alpha
self._x_bar[1] = (1 - self._xbar_gamma) * self._x_bar[1] + self._xbar_gamma * beta
for m in self._models:
m.x_bar = self._x_bar.copy()
return {
'alpha': float(alpha),
'beta': float(beta),
'P_beta': float(P_beta),
'P_alpha': float(P_alpha),
'regime_score': float(regime_score),
'effective_q_beta': float(effective_q),
'model_probs': self._mu.copy(),
'dominant_model_idx': dominant,
'innovation': float(weighted_innovation),
'clipped': any_clipped,
'obs_weight': float(obs_weight),
'R': self._models[0].R,
'nu': self._nu,
}
def state_dict(self) -> dict:
return {
'models': [
{
'x': m.x.tolist(), 'P': m.P.tolist(),
'Q': m.Q.tolist(), 'R': m.R,
'phi': m._phi.tolist(),
}
for m in self._models
],
'mu': self._mu.tolist(),
'q_beta_grid': self._q_beta_grid,
'phi_beta_grid': self._phi_beta_grid,
'high_q_threshold': self._high_q_threshold,
'n_updates': self._n_updates,
'TPM': self._TPM.tolist(),
'tpm_alpha': self._tpm_alpha.tolist(),
'tpm_lambda': self._tpm_lambda,
'sage_husa_b': self._sage_husa_b,
'r_floor': self._r_floor,
'nu': self._nu,
'nu_gamma': self._nu_gamma,
'fourth_moment_ema': self._fourth_moment_ema,
'clip_sigma': self._clip_sigma,
'x_bar': self._x_bar.tolist(),
'xbar_gamma': self._xbar_gamma,
'obs_gamma': self._obs_gamma,
'btc_var_ema': self._btc_var_ema,
'mu_floor': self._mu_floor,
}
@classmethod
def from_state_dict(cls, d: dict) -> 'IMMKalmanBetaEstimator':
obj = cls.__new__(cls)
obj.M = len(d['models'])
obj._q_beta_grid = d['q_beta_grid']
obj._phi_beta_grid = d.get('phi_beta_grid', [0.95, 0.97, 0.98, 0.99, 0.995])
obj._high_q_threshold = d['high_q_threshold']
obj._n_updates = d['n_updates']
obj._mu = np.array(d['mu'], dtype=np.float64)
obj._TPM = np.array(d['TPM'], dtype=np.float64)
obj._tpm_alpha = np.array(d['tpm_alpha'], dtype=np.float64)
obj._tpm_lambda = d.get('tpm_lambda', 0.995)
obj._sage_husa_b = d.get('sage_husa_b', 0.97)
obj._r_floor = d.get('r_floor', 1e-8)
obj._nu = d.get('nu', 5.0)
obj._nu_gamma = d.get('nu_gamma', 0.01)
obj._fourth_moment_ema = d.get('fourth_moment_ema', 9.0)
obj._clip_sigma = d.get('clip_sigma', 3.0)
obj._x_bar = np.array(d.get('x_bar', [0.0, 1.0]), dtype=np.float64)
obj._xbar_gamma = d.get('xbar_gamma', 0.005)
obj._obs_gamma = d.get('obs_gamma', 0.05)
obj._btc_var_ema = d.get('btc_var_ema', 4e-4)
obj._mu_floor = d.get('mu_floor', 1e-6)
obj._models = []
for md in d['models']:
m = _KalmanModel.__new__(_KalmanModel)
m.x = np.array(md['x'], dtype=np.float64)
m.P = np.array(md['P'], dtype=np.float64)
m.Q = np.array(md['Q'], dtype=np.float64)
m.R = md['R']
m.x_bar = obj._x_bar.copy()
m._phi = np.array(md['phi'], dtype=np.float64)
m._I_phi = np.eye(2) - m._phi
m._r_floor = obj._r_floor
m._clip_sigma = obj._clip_sigma
m.nu = obj._nu
m._update_t_const()
obj._models.append(m)
obj._high_q_indices = [
j for j, q in enumerate(obj._q_beta_grid) if q >= obj._high_q_threshold
]
return obj
4.3 src/utils/analysis/analysis_core.py — 集成点
calculate_cointegration_params_dual_window() 在现有 OLS 之后新增 IMM 更新:
def calculate_cointegration_params_dual_window(
base_klines, alt_klines,
beta_window=None, zscore_window=None,
kalman_state: dict | None = None,
):
# ... 现有 OLS 逻辑不变 ...
# ═══ IMM Kalman Filter 更新(v5.0)═══
kalman_result = None
kalman_state_out = kalman_state
if len(aligned) >= 2:
r_btc = np.log(aligned['base'].iloc[-1] / aligned['base'].iloc[-2])
r_alt = np.log(aligned['alt'].iloc[-1] / aligned['alt'].iloc[-2])
if kalman_state is not None:
kf = IMMKalmanBetaEstimator.from_state_dict(kalman_state)
else:
ols_residual_var = float(np.var(model.resid)) if hasattr(model, 'resid') else 1e-2
nu_init = _estimate_student_t_nu( # v5: MLE
model.resid if hasattr(model, 'resid') else np.array([]),
default=IMM_STUDENT_T_NU,
)
kf = IMMKalmanBetaEstimator(
alpha_init=alpha_ols if use_alpha else 0.0,
beta_init=beta_ols,
q_beta_grid=IMM_Q_BETA_GRID,
phi_beta_grid=IMM_PHI_BETA_GRID,
q_alpha=IMM_Q_ALPHA, phi_alpha=IMM_PHI_ALPHA,
r_init=max(ols_residual_var, 1e-6),
p0_alpha=IMM_P0_ALPHA, p0_beta=IMM_P0_BETA,
r_floor=IMM_R_FLOOR, sage_husa_b=IMM_SAGE_HUSA_B,
clip_sigma=IMM_CLIP_SIGMA,
nu=nu_init, nu_gamma=IMM_NU_GAMMA,
transition_prob=IMM_TRANSITION_PROB,
tpm_kappa=IMM_TPM_KAPPA, tpm_lambda=IMM_TPM_LAMBDA,
high_q_threshold=IMM_HIGH_Q_THRESHOLD,
mu_floor=IMM_MU_FLOOR,
obs_gamma=IMM_OBS_GAMMA,
xbar_gamma=IMM_XBAR_GAMMA,
)
kalman_result = kf.update(r_btc, r_alt)
kalman_state_out = kf.state_dict()
return {
# ... 现有字段不变 ...
'kalman_beta': kalman_result['beta'] if kalman_result else beta_ols,
'kalman_alpha': kalman_result['alpha'] if kalman_result else (alpha_ols if use_alpha else 0.0),
'kalman_P_beta': kalman_result['P_beta'] if kalman_result else 1.0,
'kalman_regime_score': kalman_result['regime_score'] if kalman_result else 0.0,
'kalman_effective_q': kalman_result['effective_q_beta'] if kalman_result else 1e-4,
'kalman_model_probs': kalman_result['model_probs'].tolist() if kalman_result else None,
'kalman_obs_weight': kalman_result['obs_weight'] if kalman_result else 1.0,
'kalman_nu': kalman_result['nu'] if kalman_result else 5.0,
'kalman_state': kalman_state_out,
}
4.4 src/trading/strategy.py
@dataclass
class _BetaRegimeState:
regime: str # 'stable' | 'expanding'
regime_score: float
kalman_P_beta: float
effective_q_beta: float
threshold_scale: float # ≥1.0
hard_block: bool
reason: str
class _IMMRegimeDetector:
"""基于 IMM 模型概率的 Beta 体制检测器"""
def __init__(self):
self._pair_states: dict[PairKey, _BetaRegimeState] = {}
self._n_updates: dict[PairKey, int] = {}
def update(
self, key: PairKey, regime_score: float,
kalman_P_beta: float, effective_q_beta: float,
kline_time: str,
soft_prob: float, hard_prob: float,
scale_max: float, warmup: int,
) -> _BetaRegimeState:
n = self._n_updates.get(key, 0) + 1
self._n_updates[key] = n
if n < warmup:
state = _BetaRegimeState('stable', regime_score, kalman_P_beta,
effective_q_beta, 1.0, False, "数据不足")
elif regime_score >= hard_prob:
state = _BetaRegimeState(
'expanding', regime_score, kalman_P_beta, effective_q_beta,
scale_max, True,
f"Beta硬拦截: regime={regime_score:.3f}>={hard_prob} eff_Q={effective_q_beta:.1e}"
)
elif regime_score >= soft_prob:
t = min(max((regime_score - soft_prob) / max(hard_prob - soft_prob, 0.01), 0), 1)
scale = 1.0 + t * (scale_max - 1.0)
state = _BetaRegimeState(
'expanding', regime_score, kalman_P_beta, effective_q_beta,
scale, False,
f"Beta缩放: regime={regime_score:.3f} scale={scale:.2f}"
)
else:
state = _BetaRegimeState('stable', regime_score, kalman_P_beta,
effective_q_beta, 1.0, False,
f"Beta稳定: regime={regime_score:.3f}")
self._pair_states[key] = state
return state
def get_all_states(self) -> dict[PairKey, _BetaRegimeState]:
return self._pair_states
def cleanup_pair(self, key: PairKey) -> None:
self._pair_states.pop(key, None)
self._n_updates.pop(key, None)
class _SystemicRiskAggregator:
"""双独立阈值 + 确认窗口的系统性风险聚合"""
def __init__(self, enabled: bool = True):
self._enabled = enabled
self._consecutive_triggers = 0
def check(
self,
pair_states: dict[PairKey, _BetaRegimeState],
ratio_threshold: float = 0.3,
weighted_threshold: float = 0.25,
confirm_bars: int = 2,
position_weights: dict[PairKey, float] | None = None,
) -> tuple[bool, str]:
if not self._enabled or not pair_states:
self._consecutive_triggers = 0
return False, ""
total = len(pair_states)
n_expanding = sum(1 for s in pair_states.values() if s.regime == 'expanding')
ratio = n_expanding / total
if position_weights:
total_w = sum(position_weights.get(k, 1.0) for k in pair_states)
weighted_score = sum(
position_weights.get(k, 1.0) * s.regime_score
for k, s in pair_states.items()
) / total_w
else:
weighted_score = sum(s.regime_score for s in pair_states.values()) / total
ratio_hit = ratio >= ratio_threshold
weighted_hit = weighted_score >= weighted_threshold
if ratio_hit or weighted_hit:
self._consecutive_triggers += 1
if self._consecutive_triggers >= confirm_bars:
return True, (
f"系统性风险(连续{self._consecutive_triggers}次): "
f"{n_expanding}/{total} ({ratio:.0%})配对扩张"
f"{'(比例触发)' if ratio_hit else ''}, "
f"加权regime={weighted_score:.3f}"
f"{'(加权触发)' if weighted_hit else ''}"
)
return False, f"系统性风险待确认({self._consecutive_triggers}/{confirm_bars})"
self._consecutive_triggers = 0
return False, ""
4.5 src/trading/config.py
@dataclass(frozen=True)
class StrategyParams:
# ... 现有字段 ...
# Beta 体制过滤器
beta_regime_enabled: bool = True
beta_regime_soft_prob: float = 0.3
beta_regime_hard_prob: float = 0.7
beta_regime_scale_max: float = 2.0
beta_regime_warmup: int = 5
# 系统性风险
systemic_risk_enabled: bool = True
systemic_ratio_threshold: float = 0.3
systemic_weighted_threshold: float = 0.25
systemic_confirm_bars: int = 2
4.6 src/trading/orchestrator.py
def resolve_hedge_beta(
kalman_beta: float | None,
kalman_P_beta: float | None,
ols_beta: float | None,
p_beta_max: float = HEDGE_BETA_P_MAX,
beta_min: float = HEDGE_BETA_MIN,
beta_max: float = HEDGE_BETA_MAX,
) -> tuple[float, str, bool]:
"""选择 hedge_beta: Kalman → OLS → 1.0 兜底"""
raw_beta = None
source = 'default'
if kalman_beta is not None and kalman_P_beta is not None:
if kalman_P_beta <= p_beta_max:
raw_beta = kalman_beta
source = 'kalman'
if raw_beta is None and ols_beta is not None:
raw_beta = ols_beta
source = 'ols'
if raw_beta is None:
return 1.0, 'default', False
beta_negative = raw_beta < 0
beta = float(np.clip(abs(raw_beta), beta_min, beta_max))
return beta, source, beta_negative
process_analysis() 提取 IMM 输出,透传到 strategy.process_tick() 和 on_entry_signal()。
4.7 src/trading/risk_manager.py
def calculate_position_size(self, signal, alt_price, base_price=0.0,
available_balance=0.0,
hedge_beta=1.0, hedge_beta_source=''):
# ... 现有逻辑 ...
if self._config.pair_mode == "pair" and base_price > 0 and alt_price > 0:
abs_beta = max(abs(hedge_beta), 0.1)
total_notional = position_usd * 2
base_notional = total_notional / (1.0 + abs_beta)
alt_notional = abs_beta * base_notional
alt_size = alt_notional / alt_price
base_size = base_notional / base_price
        elif alt_price > 0:
            alt_size = position_usd / alt_price
            base_size = 0.0
        return alt_size, base_size
4.8 src/trading/models.py
@dataclass
class PairTradeSignal:
# ... 现有字段 ...
hedge_beta: float = 1.0
hedge_beta_source: str = 'default'
beta_negative: bool = False
@dataclass
class PairPosition:
# ... 现有字段 ...
entry_hedge_beta: float = 1.0
5. 数据流
1. WebSocket 4h K 线闭合
↓
2. realtime_kline_service_base 触发分析
↓
3. analyze_multi_period()
├─ calculate_cointegration_params_dual_window(kalman_state=缓存)
│ ├─ OLS 回归 → β_ols, spread, adf_pvalue(现有)
│ └─ IMM Kalman v5.0 update:
│ ├─ Step 1: 连续可观测性权重
│ ├─ Step 2-3: 保存 μ_{t-1} + 混合(Interaction)
│ ├─ Step 4: M=5 并行 OU 预测 + Student-t 更新(Joseph + Huber)
│ ├─ Step 5/5b: 似然温和模型概率更新 + 自适应 Dirichlet TPM
│ ├─ Step 6: Sage-Husa 在线 R
│ ├─ Step 7: 融合 → beta, P_beta, regime_score
│ ├─ Step 8: 在线 ν 自适应(E[z⁴])
│ └─ Step 9: x̄ 在线漂移
└─ 输出 multi_period_result(含 kalman_* 字段)
↓
4. orchestrator.process_analysis()
├─ 缓存 kalman_state
├─ → strategy.process_tick()
│ ├─ _IMMRegimeDetector.update() → 硬拦截 / 阈值缩放
│ └─ _SystemicRiskAggregator.check() → 系统性拦截
├─ 若产生 EntrySignal:
│ ├─ resolve_hedge_beta()
│ └─ on_entry_signal(hedge_beta=...)
↓
5. position_manager → risk_manager.calculate_position_size(hedge_beta)
├─ base_notional = total / (1 + |β|)
└─ alt_notional = |β| × base_notional
6. DB 变更
ALTER TABLE pair_positions ADD COLUMN entry_hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta_source VARCHAR(10) DEFAULT 'default';
ALTER TABLE trading_signals ADD COLUMN beta_negative BOOLEAN DEFAULT FALSE;
7. 验证
# ── 核心收敛 ──
def test_imm_convergence():
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5)
rng = np.random.default_rng(42)
for _ in range(200):
r = rng.normal(0, 0.02)
kf.update(r, 0.001 + 1.0 * r + rng.normal(0, 0.01))
assert abs(kf.beta - 1.0) < 0.15
def test_imm_ou_steady_state():
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5)
rng = np.random.default_rng(42)
p_betas = []
for _ in range(500):
r = rng.normal(0, 0.02)
result = kf.update(r, 0.5 * r + rng.normal(0, 0.01))
p_betas.append(result['P_beta'])
assert np.var(p_betas[400:]) < np.var(p_betas[50:150]) * 0.5
assert max(p_betas[100:]) < 0.5
def test_imm_interaction_no_corruption():
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5)
rng = np.random.default_rng(42)
for _ in range(50):
kf.update(rng.normal(0, 0.02), rng.normal(0, 0.03))
for b in [m.x[1] for m in kf._models]:
assert abs(b) < 10, f"Beta out of range after mixing: {b}"
# ── 体制检测 ──
def test_imm_regime_score_stable():
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5)
rng = np.random.default_rng(42)
for _ in range(50):
result = kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
assert result['regime_score'] < 0.3
def test_imm_regime_score_spike():
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5)
rng = np.random.default_rng(42)
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
for _ in range(5):
r = rng.normal(0, 0.02)
result = kf.update(r, 3.0 * r + rng.normal(0, 0.01))
assert result['regime_score'] > 0.3
# ── Student-t 鲁棒性 ──
def test_imm_student_t_robustness():
kf_t = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, nu=5.0)
kf_g = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, nu=1000.0)
for i in range(50):
r = 0.01 * (1 if i % 2 == 0 else -1)
kf_t.update(r, 0.5 * r + 0.001)
kf_g.update(r, 0.5 * r + 0.001)
rs_t, rs_g = kf_t.regime_score, kf_g.regime_score
result_t = kf_t.update(0.02, 0.5 * 0.02 + 0.15) # 5σ 异常值
result_g = kf_g.update(0.02, 0.5 * 0.02 + 0.15)
assert (result_t['regime_score'] - rs_t) < (result_g['regime_score'] - rs_g)
# ── v5 新增:Sage-Husa R ──
def test_sage_husa_r_decreases_with_lower_noise():
rng = np.random.default_rng(42)
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, sage_husa_b=0.97)
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.05))
R_high = kf._models[0].R
for _ in range(80):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.005))
assert kf._models[0].R < R_high * 0.5
def test_sage_husa_fast_early_convergence():
"""Sage-Husa 前期 d_k 大(接近 1),比 EMA 收敛更快"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, sage_husa_b=0.97)
b = kf._sage_husa_b
d1 = (1 - b) / (1 - b ** 1) # = 1.0
    d50 = (1 - b) / (1 - b ** 50)  # ≈ 0.038
assert abs(d1 - 1.0) < 1e-10
assert d50 < 0.04
# ── v5 新增:Dirichlet TPM 自适应 ──
def test_dirichlet_tpm_adapts():
"""β 长期快变时,Dirichlet TPM 应向高Q模型间的高转移概率收敛"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, tpm_kappa=5.0, tpm_lambda=0.98)
rng = np.random.default_rng(42)
tpm_init = kf._TPM.copy()
# 注入快变 β 数据,高Q模型应被激活
beta_val = 0.5
for i in range(100):
beta_val += 0.05 * (1 if i % 4 < 2 else -1)
r = rng.normal(0, 0.02)
kf.update(r, beta_val * r + rng.normal(0, 0.01))
# TPM 应有所改变(不再等于初始值)
assert not np.allclose(kf._TPM, tpm_init, atol=1e-4)
def test_dirichlet_tpm_rows_sum_to_one():
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5)
rng = np.random.default_rng(42)
for _ in range(100):
kf.update(rng.normal(0, 0.02), rng.normal(0, 0.03))
row_sums = kf._TPM.sum(axis=1)
assert np.allclose(row_sums, 1.0, atol=1e-10)
# ── v5 新增:ν 估计修正 ──
def test_online_nu_uses_fourth_moment_not_mean_power():
"""验证 E[z⁴] 路径:重尾数据应使 ν 减小"""
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, nu=10.0, nu_gamma=0.05)
rng = np.random.default_rng(42)
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
nu_before = kf._nu
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.standard_t(3) * 0.03)
assert kf._nu < nu_before, f"ν should decrease with heavy-tail data: {nu_before} → {kf._nu}"
def test_nu_mle_init_heavy_tail():
"""MLE 初始化:重尾残差 → 低 ν,正态残差 → 高 ν"""
rng = np.random.default_rng(42)
nu_normal = _estimate_student_t_nu(rng.normal(0, 1, 300))
nu_heavy = _estimate_student_t_nu(rng.standard_t(df=3, size=300))
assert nu_normal > 10
assert nu_heavy < 7
assert nu_heavy < nu_normal
# ── 基础保证 ──
def test_imm_model_probs_sum_to_one():
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5)
rng = np.random.default_rng(42)
for _ in range(200):
result = kf.update(rng.normal(0, 0.02), rng.normal(0, 0.03))
assert abs(sum(result['model_probs']) - 1.0) < 1e-10
def test_imm_state_persistence():
kf1 = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, nu=7.0)
kf1.update(0.02, 0.01)
kf2 = IMMKalmanBetaEstimator.from_state_dict(kf1.state_dict())
r1, r2 = kf1.update(0.03, 0.015), kf2.update(0.03, 0.015)
assert abs(r1['beta'] - r2['beta']) < 1e-10
def test_imm_observability_continuous():
kf = IMMKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5)
rng = np.random.default_rng(42)
for _ in range(50):
kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
mu_before = kf._mu.copy()
result = kf.update(1e-8, rng.normal(0, 0.01))
assert result['obs_weight'] < 0.01
assert np.max(np.abs(kf._mu - mu_before)) < 0.05
# ── Hedge Beta ──
def test_resolve_hedge_beta_kalman():
beta, src, neg = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.01, ols_beta=0.5)
assert src == 'kalman' and abs(beta - 0.45) < 0.01 and not neg
def test_resolve_hedge_beta_fallback_ols():
beta, src, _ = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.8, ols_beta=0.5)
assert src == 'ols' and abs(beta - 0.5) < 0.01
def test_resolve_hedge_beta_negative():
beta, src, neg = resolve_hedge_beta(kalman_beta=-0.8, kalman_P_beta=0.01, ols_beta=0.5)
assert beta == 0.8 and neg is True
# ── 消融实验(回测验证基准) ──
# 1. OU vs 随机游走: 设 phi_beta_grid=[1,1,1,1,1],观察 P_β 是否无界增长
# 2. Student-t vs 高斯: 设 nu=1000,观察 regime_score 异常波动频率
# 3. Dirichlet TPM vs 静态 TPM: 对比不同 beta 变化速率配对的检测延迟
# 4. E[z⁴] vs (E[z])⁴: 注入重尾数据,对比 ν 估计偏差
# 5. Sage-Husa vs Robbins-Monro: 初始化后前 20 步 R 追踪误差对比