Beta 体制自适应过滤器设计方案 v2
Beta 体制自适应过滤器设计方案 v2
1. 问题背景
1.1 市场现象
当锚定物(BTC)回暖后,Alt 资产的 Beta 值会持续飙升:
正常态: β ≈ 0.3 ~ 0.5 (BTC 涨 1%,Alt 涨 0.3~0.5%)
扩张态: β ≈ 4 ~ 10 (BTC 涨 1%,Alt 涨 4~10%)
这不是价格的普通上涨,而是价差的持续过大 — 协整关系的阶段性破裂。
1.2 当前系统弱点
| 组件 | 问题 | 影响 |
|---|---|---|
analysis_core.py |
BETA_WINDOW=100 固定窗口 OLS |
β 估计滞后 ≈17天才能跟上新体制 |
strategy.py |
adaptive_threshold=3.0 固定 |
β 飙升导致 z4h 极端,持续突破阈值 |
momentum_filter.py |
只检测单腿价格趋势 | 不检测 spread/beta 层面的体制切换 |
| 健康监控 | 长窗口(200)/短窗口(100) | 反应迟钝,等检测到已经亏了 |
1.3 故障场景还原
时间线(无 Beta 体制过滤器):
T+0h BTC 开始回暖,β 从 0.5 开始上升
T+12h β 实际已达 2.0,但 OLS(100期) β̂ 仍 ≈ 0.6
→ spread = log(alt) - 0.6×log(btc) 持续偏大
→ z4h 持续走正,adaptive_z 突破阈值
→ 系统产生做空信号(误判为均值回归机会)
T+24h β 实际达 4.0,OLS β̂ ≈ 0.8
→ z4h 继续走高,系统再次尝试入场
→ 开仓即被 β 持续扩张吞噬利润
T+7d β 实际达 8.0,OLS β̂ ≈ 2.0
→ 仍在追赶,z4h 仍然极端
→ 协整假设彻底失效,均值回归策略不成立
核心问题:系统用滞后 β 计算的 z4h 产生入场信号,实际上这些信号反映的是 β 正在变化,而非 spread 正在回归。
2. 算法选型与对比
2.1 业界主流时变 Beta 估计方案
| 方案 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 双窗口 OLS | 两个不同长度的滑动窗口 OLS 对比 | 实现简单 | 两个估计都滞后,信噪比低 | 快速原型 |
| EWMA 回归 | 指数加权最小二乘 | 比 OLS 更快响应 | 无最优性保证,无不确定性估计 | 中等需求 |
| Kalman Filter | 状态空间模型,递归最优估计 | 最优加权、实时追踪、自带不确定性 | 需调 q/r 参数 | 本项目首选 |
| DCC-GARCH | 动态条件相关模型 | 完整的波动率建模 | 计算重,参数多 | 学术研究 |
| HMM 体制切换 | 隐马尔可夫模型 | 概率化体制判定 | 离线训练,在线更新复杂 | 体制分类(可作后续增强) |
2.2 业界主流变点检测方案
| 方案 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 均值阈值 | mean(divergence) > threshold |
极简 | 无统计基础,硬编码阈值 | v1 原型 |
| CUSUM | 累积偏离检测 | 对持续漂移极敏感,有统计基础 | 需设参考值和阈值 | 本项目首选 |
| Bayesian Online CPD | 贝叶斯在线变点检测 | 概率化,无需预设阈值 | 实现较复杂 | 高要求场景 |
| Bai-Perron | 结构性断点检验 | 严格统计检验 | 离线方法 | 回测分析 |
2.3 本方案选择:Kalman Filter + CUSUM
第一层:Kalman Filter 估计时变 β
- 递归最优估计,每个 4h 数据点更新一次
- 自然输出 β 的估计不确定性 P(Kalman 协方差)
- 自然输出innovation(预测误差),直接反映 β 的突变
- 替代 v1 的双窗口 OLS + β_divergence 衍生指标
第二层:CUSUM 检测 β 的结构性漂移
- 监控 Kalman innovation 序列的累积偏离
- 对持续漂移(β 飙升)极其敏感
- 比均值阈值有更强的统计基础(Wald 序贯检验的特例)
- 双向检测:上行扩张 + 下行急变
2.4 v1 → v2 对比
| 维度 | v1(双窗口 OLS + 均值阈值) | v2(Kalman + CUSUM) |
|---|---|---|
| β 估计 | 两个滞后的离散快照 | 连续最优递归估计 |
| 体制检测信号 | β_divergence(两个不准的估计求差) | innovation + P(卡尔曼原生信号) |
| 变点检测 | mean > threshold(硬编码) |
CUSUM(累积偏离,统计有据) |
| 检测延迟 | 需 12h+(MIN_SAMPLES=3 根 4h) | 1 根 4h 即可(innovation 立即飙升) |
| 方向覆盖 | 仅上行 | 上行 + 下行急变 |
| 去重机制 | 浮点 epsilon 比较 | kline_time 时间戳 |
| 阈值缩放 | 线性插值 | Sigmoid 平滑 |
| 参数数量 | 4 个(divergence_threshold, hard_block_threshold, scale_max, buffer_size) | 3 个(q, cusum_threshold, scale_max) |
| 冷启动 | 3 根 4h(12h) | 1 根 4h(4h) |
3. 详细设计
3.1 Kalman Filter 时变 Beta 估计
数学模型
状态方程(β 缓慢演化):
β_t = β_{t-1} + w_t, w_t ~ N(0, Q)
观测方程(对数收益率关系):
r_alt_t = β_t × r_btc_t + v_t, v_t ~ N(0, R)
其中:
r_alt_t = log(alt_t) - log(alt_{t-1}):Alt 对数收益率r_btc_t = log(btc_t) - log(btc_{t-1}):BTC 对数收益率Q:过程噪声方差(控制 β 的先验变化速率)R:观测噪声方差(收益率残差的方差)
递推公式:
预测:
β̂_t|t-1 = β̂_{t-1} (β 的先验预测)
P_t|t-1 = P_{t-1} + Q (协方差预测)
更新:
innovation_t = r_alt_t - β̂_t|t-1 × r_btc_t (预测误差)
S_t = r_btc_t² × P_t|t-1 + R (innovation 方差)
K_t = P_t|t-1 × r_btc_t / S_t (Kalman 增益)
β̂_t = β̂_t|t-1 + K_t × innovation_t (β 最优估计)
P_t = (1 - K_t × r_btc_t) × P_t|t-1 (估计协方差)
关键信号:
innovation_t:预测误差,β 突变时立即飙升P_t:估计不确定性,β 不稳定时持续增大normalized_innovation_t = innovation_t / √S_t:标准化预测误差,服从 N(0,1)
参数选择
| 参数 | 符号 | 默认值 | 含义 | 调优方向 |
|---|---|---|---|---|
| 过程噪声 | Q | 1e-4 | β 每 4h 的先验变化方差 | ↑更快追踪 ↓更稳定 |
| 观测噪声 | R | 自适应 | 收益率残差方差,从历史数据估计 | 自动校准 |
| 初始 Beta | β₀ | OLS(100) | 用现有 OLS 估计初始化 | — |
| 初始协方差 | P₀ | 1.0 | 初始不确定性(较大,让滤波器快速收敛) | — |
Q 的选择依据:
- Q = 1e-4 意味着 β 每 4h 的标准差先验为 0.01
- 一天 6 根 4h K线 → 日 β 漂移标准差 ≈ 0.01 × √6 ≈ 0.024
- 一周 → 周 β 漂移标准差 ≈ 0.01 × √42 ≈ 0.065
- 合理地允许 β 在正常市场中缓慢变化,同时在突变时 innovation 会显著偏大
R 的自适应估计:
# 使用最近 N 根 4h K线的收益率残差方差估计 R
# 初始化时从 OLS 残差估计;运行中通过指数加权更新
R_t = α_R × (innovation_t² / r_btc_t²) + (1 - α_R) × R_{t-1}
文件:src/config.py
# Kalman Filter Beta 估计参数
KALMAN_Q: float = 1e-4 # [新增] 过程噪声方差(β 变化速率先验)
KALMAN_P0: float = 1.0 # [新增] 初始估计协方差
KALMAN_R_ALPHA: float = 0.05 # [新增] R 自适应更新衰减因子
文件:src/utils/analysis/analysis_core.py
新增类:KalmanBetaEstimator
class KalmanBetaEstimator:
"""Kalman Filter 时变 Beta 估计器
状态空间模型:
状态方程: β_t = β_{t-1} + w_t, w_t ~ N(0, Q)
观测方程: r_alt_t = β_t × r_btc_t + v_t, v_t ~ N(0, R)
每根新 4h K线调用 update() 一次,输出:
beta: β 的最优估计
P: 估计协方差(不确定性)
innovation: 标准化预测误差(用于 CUSUM 变点检测)
"""
def __init__(self, beta_init: float, q: float = 1e-4, r_init: float = 1e-2,
p_init: float = 1.0, r_alpha: float = 0.05):
self.beta = beta_init # 状态估计 β̂
self.P = p_init # 估计协方差
self.Q = q # 过程噪声方差
self.R = r_init # 观测噪声方差(自适应更新)
self._r_alpha = r_alpha # R 更新衰减因子
self._n_updates = 0
def update(self, r_btc: float, r_alt: float) -> dict:
"""接收一对新的对数收益率,更新 β 估计
Args:
r_btc: BTC 对数收益率 log(btc_t / btc_{t-1})
r_alt: Alt 对数收益率 log(alt_t / alt_{t-1})
Returns:
{
'beta': float, # β 最优估计
'P': float, # 估计协方差
'innovation': float, # 原始预测误差
'normalized_innovation': float, # 标准化预测误差 (~ N(0,1))
'kalman_gain': float, # Kalman 增益
}
"""
# 预测步
P_pred = self.P + self.Q
# 避免 r_btc ≈ 0 时的数值问题(BTC 无波动时无法更新 β)
if abs(r_btc) < 1e-10:
return {
'beta': self.beta,
'P': P_pred,
'innovation': 0.0,
'normalized_innovation': 0.0,
'kalman_gain': 0.0,
}
# Innovation(预测误差)
innovation = r_alt - self.beta * r_btc
# Innovation 方差
S = r_btc * r_btc * P_pred + self.R
# Kalman 增益
K = P_pred * r_btc / S
# 更新步
self.beta += K * innovation
self.P = (1 - K * r_btc) * P_pred
# 确保 P 不会因数值误差变负
if self.P < 0:
self.P = 1e-8
# R 自适应更新(指数加权移动平均)
if self._n_updates > 10: # 预热期后开始自适应
r_sample = innovation * innovation / (r_btc * r_btc) if abs(r_btc) > 1e-10 else self.R
self.R = self._r_alpha * r_sample + (1 - self._r_alpha) * self.R
self.R = max(self.R, 1e-8) # 下限保护
self._n_updates += 1
# 标准化 innovation(理论上 ~ N(0,1))
norm_innov = innovation / max(S ** 0.5, 1e-10)
return {
'beta': self.beta,
'P': self.P,
'innovation': innovation,
'normalized_innovation': norm_innov,
'kalman_gain': K,
}
def state_dict(self) -> dict:
"""导出状态(用于持久化)"""
return {
'beta': self.beta, 'P': self.P, 'Q': self.Q,
'R': self.R, '_r_alpha': self._r_alpha,
'_n_updates': self._n_updates,
}
@classmethod
def from_state_dict(cls, d: dict) -> 'KalmanBetaEstimator':
"""从持久化状态恢复"""
obj = cls(beta_init=d['beta'], q=d['Q'], r_init=d['R'],
p_init=d['P'], r_alpha=d['_r_alpha'])
obj._n_updates = d['_n_updates']
return obj
改动函数:calculate_cointegration_params_dual_window()
在现有 OLS 之后,新增 Kalman Filter 输出:
def calculate_cointegration_params_dual_window(
base_klines, alt_klines,
beta_window=None, zscore_window=None,
kalman_state: dict | None = None, # [新增] Kalman 状态(跨调用持久化)
):
# ... 现有 OLS 逻辑不变 ...
# beta_ols = 现有的 β_long
# ═══ 新增:Kalman Filter 更新 ═══
kalman_result = None
kalman_state_out = kalman_state # 透传,无更新时原样返回
if len(aligned) >= 2:
# 计算最新一根 K线的对数收益率
r_btc = np.log(aligned['base'].iloc[-1] / aligned['base'].iloc[-2])
r_alt = np.log(aligned['alt'].iloc[-1] / aligned['alt'].iloc[-2])
# 初始化或恢复 Kalman 估计器
if kalman_state is not None:
kf = KalmanBetaEstimator.from_state_dict(kalman_state)
else:
# 首次:用 OLS β 初始化,R 从 OLS 残差估计
ols_residual_var = float(np.var(model.resid)) if hasattr(model, 'resid') else 1e-2
kf = KalmanBetaEstimator(
beta_init=beta_ols,
q=KALMAN_Q,
r_init=max(ols_residual_var, 1e-6),
p_init=KALMAN_P0,
r_alpha=KALMAN_R_ALPHA,
)
kalman_result = kf.update(r_btc, r_alt)
kalman_state_out = kf.state_dict()
return {
# ... 现有字段不变 ...
'kalman_beta': kalman_result['beta'] if kalman_result else beta_ols,
'kalman_P': kalman_result['P'] if kalman_result else 1.0,
'kalman_innovation': kalman_result['normalized_innovation'] if kalman_result else 0.0,
'kalman_state': kalman_state_out,
}
Kalman 输出信号语义
| 信号 | 正常范围 | β 飙升时 | 用途 |
|---|---|---|---|
kalman_beta |
≈ OLS β | 快速追踪真实 β | 更准的 β 估计 |
kalman_P |
小(~1e-3) | 增大(β 不稳定) | 不确定性度量 |
normalized_innovation |
~ N(0,1) | 持续 > 2σ | CUSUM 输入信号 |
3.2 CUSUM 变点检测
算法原理
CUSUM(Cumulative Sum)检测统计量监控 Kalman normalized_innovation 的累积偏离:
上行 CUSUM: S⁺_t = max(0, S⁺_{t-1} + z_t - k)
下行 CUSUM: S⁻_t = max(0, S⁻_{t-1} - z_t - k)
其中:
z_t=normalized_innovation(标准化预测误差)k= 参考值(slack parameter),过滤正常波动。取 0.5 是经典选择(对应检测 1σ 偏移的最优值)- 当
S⁺_t > h或S⁻_t > h→ 检测到体制切换
CUSUM 为什么优于均值阈值:
- 均值阈值需要积累多个样本才能判定(v1 需 MIN_SAMPLES=3 根,即 12h)
- CUSUM 是累积量,第一根偏离的 K 线就开始积累,持续偏离时快速超过阈值
- 如果偏离是暂时的(一根 K 线),CUSUM 会因
-k项自动衰减,不会误报 - 这正是 Wald 序贯概率比检验的核心思想:以最小样本量检测分布漂移
文件:src/trading/config.py
StrategyParams 新增字段:
@dataclass(frozen=True)
class StrategyParams:
# ... 现有字段 ...
# Beta 体制自适应过滤器(Kalman + CUSUM)
beta_regime_enabled: bool = True
beta_regime_cusum_k: float = 0.5 # CUSUM 参考值(过滤正常波动)
beta_regime_cusum_h: float = 4.0 # CUSUM 报警阈值(体制切换判定)
beta_regime_hard_block_h: float = 8.0 # CUSUM 硬拦截阈值
beta_regime_scale_max: float = 2.0 # 阈值最大缩放倍数
参数语义:
| 参数 | 默认值 | 含义 | 与 v1 对应 |
|---|---|---|---|
cusum_k |
0.5 | innovation 偏离超过 0.5σ 才开始累积 | — |
cusum_h |
4.0 | CUSUM 累积到 4.0 开始缩放阈值 | divergence_threshold=0.8 |
hard_block_h |
8.0 | CUSUM 累积到 8.0 硬拦截 | hard_block_threshold=2.0 |
scale_max |
2.0 | 最大缩放倍数(同 v1) | scale_max=2.0 |
cusum_h = 4.0 的选择依据:
- 正常市场中
normalized_innovation ~ N(0,1),单次偏离 >0.5σ 的概率 ≈ 31% - 即使每根 K线 都偏离 1.0σ,CUSUM 每次也只累积 0.5(扣除 k),需 8 根 K线(32h)才触发
- 但如果 β 真正飙升导致 innovation 持续 3σ+,每次累积 2.5,仅需 2 根 K线(8h)即触发
- 对比 v1 的 12h 最快检测 → 检测速度提升 33%
hard_block_h = 8.0 的选择依据:
- 持续 3σ 偏离时 ≈ 4 根 K线(16h)触发
- 持续 5σ 偏离时 ≈ 2 根 K线(8h)触发
- 对应 β 已经发生剧烈变化,协整关系完全失效
文件:src/trading/strategy.py
3.2.1 _BetaRegimeState 数据类
@dataclass
class _BetaRegimeState:
"""Beta 体制检测结果"""
regime: str # 'stable' | 'expanding'
cusum_up: float # 上行 CUSUM 值
cusum_down: float # 下行 CUSUM 值
kalman_P: float # Kalman 估计协方差
threshold_scale: float # 阈值缩放因子 (>=1.0)
hard_block: bool # True = 硬拦截
reason: str # 诊断信息
3.2.2 _BetaRegimeTracker 类
class _BetaRegimeTracker:
"""Beta 体制自适应跟踪器(Kalman + CUSUM)
跟踪 Kalman Filter 输出的 normalized_innovation 序列,
通过 CUSUM 检测 β 的结构性漂移。
数据策略:
- 每个 5m tick 接收一次 Kalman 输出
- 基于 kline_time 时间戳去重,仅新 4h K线才更新 CUSUM
- CUSUM 是递推计算,无需缓冲区
判定逻辑:
- max(S⁺, S⁻) >= hard_block_h → 硬拦截
- max(S⁺, S⁻) >= cusum_h → 阈值缩放
- 否则 → 正常交易
"""
_MIN_UPDATES = 2 # 至少 2 根 4h K线才开始判定
def __init__(self, enabled: bool = True):
self._enabled = enabled
# 每配对的 CUSUM 状态
self._cusum_up: dict[PairKey, float] = {}
self._cusum_down: dict[PairKey, float] = {}
self._last_kline_time: dict[PairKey, str] = {} # 时间戳去重
self._n_updates: dict[PairKey, int] = {}
self._last_kalman_P: dict[PairKey, float] = {}
def update(
self,
key: PairKey,
normalized_innovation: float,
kalman_P: float,
kline_time: str,
cusum_k: float,
) -> None:
"""每 5m tick 调用,基于 kline_time 去重
Args:
key: 配对标识
normalized_innovation: Kalman 标准化预测误差
kalman_P: Kalman 估计协方差
kline_time: 4h K线时间戳(去重依据)
cusum_k: CUSUM 参考值
"""
if not self._enabled or normalized_innovation is None:
return
# 时间戳去重:同一根 4h K线只处理一次
last_time = self._last_kline_time.get(key)
if last_time == kline_time:
return # 同一根 K线,跳过
self._last_kline_time[key] = kline_time
self._last_kalman_P[key] = kalman_P
# 初始化
if key not in self._cusum_up:
self._cusum_up[key] = 0.0
self._cusum_down[key] = 0.0
self._n_updates[key] = 0
z = normalized_innovation
# CUSUM 递推
self._cusum_up[key] = max(0.0, self._cusum_up[key] + z - cusum_k)
self._cusum_down[key] = max(0.0, self._cusum_down[key] - z - cusum_k)
self._n_updates[key] = self._n_updates.get(key, 0) + 1
def check(
self,
key: PairKey,
cusum_h: float,
hard_block_h: float,
scale_max: float,
) -> _BetaRegimeState:
"""检测当前 Beta 体制状态
Args:
key: 配对标识
cusum_h: CUSUM 报警阈值(开始缩放)
hard_block_h: CUSUM 硬拦截阈值
scale_max: 最大阈值缩放倍数
Returns:
_BetaRegimeState
"""
if not self._enabled:
return _BetaRegimeState('stable', 0.0, 0.0, 0.0, 1.0, False, "")
n = self._n_updates.get(key, 0)
if n < self._MIN_UPDATES:
return _BetaRegimeState('stable', 0.0, 0.0, 0.0, 1.0, False, "数据不足")
s_up = self._cusum_up.get(key, 0.0)
s_down = self._cusum_down.get(key, 0.0)
s_max = max(s_up, s_down)
kalman_P = self._last_kalman_P.get(key, 0.0)
# ── 判定 ──
if s_max >= hard_block_h:
direction = "上行" if s_up >= s_down else "下行"
return _BetaRegimeState(
'expanding', s_up, s_down, kalman_P, scale_max, True,
f"Beta硬拦截({direction}): S⁺={s_up:.2f} S⁻={s_down:.2f} "
f">={hard_block_h} P={kalman_P:.6f}"
)
if s_max >= cusum_h:
# Sigmoid 缩放: cusum_h → scale≈1.0, hard_block_h → scale≈scale_max
# 使用 sigmoid 而非线性插值,在边界处更平滑
midpoint = (cusum_h + hard_block_h) / 2
steepness = 6.0 / (hard_block_h - cusum_h) # 确保 [cusum_h, hard_block_h] 覆盖 ~95% sigmoid
import math
t = 1.0 / (1.0 + math.exp(-steepness * (s_max - midpoint)))
scale = 1.0 + t * (scale_max - 1.0)
direction = "上行" if s_up >= s_down else "下行"
return _BetaRegimeState(
'expanding', s_up, s_down, kalman_P, scale, False,
f"Beta缩放({direction}): S⁺={s_up:.2f} S⁻={s_down:.2f} "
f"scale={scale:.2f} P={kalman_P:.6f}"
)
return _BetaRegimeState(
'stable', s_up, s_down, kalman_P, 1.0, False,
f"Beta稳定: S⁺={s_up:.2f} S⁻={s_down:.2f} P={kalman_P:.6f}"
)
def reset_cusum(self, key: PairKey) -> None:
"""体制切换确认后重置 CUSUM(可选,用于切换后重新开始监控)"""
self._cusum_up[key] = 0.0
self._cusum_down[key] = 0.0
def cleanup_pair(self, key: PairKey) -> None:
self._cusum_up.pop(key, None)
self._cusum_down.pop(key, None)
self._last_kline_time.pop(key, None)
self._n_updates.pop(key, None)
self._last_kalman_P.pop(key, None)
关键设计决策:
-
时间戳去重(vs v1 浮点 epsilon):
- 直接比较
kline_time字符串,100% 准确 - 不存在 v1 中边界值
abs(new-old) < 0.001误判的风险 kline_time已由_process_tick_unlocked()传入,无额外成本
- 直接比较
-
CUSUM 递推(vs v1 缓冲区均值):
- 无需 deque 缓冲区,O(1) 空间
- 递推计算,O(1) 时间
- CUSUM 的
max(0, ...)自带"遗忘"机制:当 innovation 回归正常时自动衰减
-
双向检测(vs v1 仅上行):
S⁺检测 β 上行飙升(Alt 跑赢 BTC)S⁻检测 β 下行急变(Alt 急速回落)- 两种情况都意味着协整关系不稳定
take max(S⁺, S⁻)统一处理
-
Sigmoid 缩放(vs v1 线性插值):
- 在
cusum_h附近缓慢启动,避免刚到阈值就大幅缩放 - 在
hard_block_h附近快速趋近上限 - 过渡更平滑,无硬拐点
- 在
-
CUSUM 自动衰减:
- 体制恢复后 innovation 回归 N(0,1),CUSUM 因
-k项自然下降 - 无需 v1 中"等 buffer 中高值衰减"的问题
- 衰减速度由
k控制:k=0.5 → 每根正常 K线约下降 0.5
- 体制恢复后 innovation 回归 N(0,1),CUSUM 因
3.3 编排层:传递 Kalman 输出
文件:src/trading/orchestrator.py
改动函数:process_analysis()
从 multi_period_result 提取 Kalman 输出,传给 strategy.process_tick():
# 在调用 strategy.process_tick() 之前
kalman_innovation = multi_period_result.get('kalman_innovation')
kalman_P = multi_period_result.get('kalman_P')
entry_signal, exit_signal = self._strategy.process_tick(
symbol, base_symbol, z4h, timestamp,
kline_time=kline_time,
latest_price=price_for_log,
kalman_innovation=kalman_innovation, # [新增]
kalman_P=kalman_P, # [新增]
)
无需改 process_analysis() 的方法签名 — multi_period_result 已经作为参数传入。
向上传播
analyze_pair_advanced():从 coint_new 结果中提取 Kalman 输出,放入 cointegration_new dict。
analyze_multi_period():从 4h/60d 周期的 detail 中提取 Kalman 信号,放入返回值:
return {
# ... 现有字段 ...
'kalman_innovation': details.get(('4h', '60d'), {}).get(
'cointegration_new', {}
).get('kalman_innovation', 0.0),
'kalman_P': details.get(('4h', '60d'), {}).get(
'cointegration_new', {}
).get('kalman_P', 1.0),
'kalman_state': details.get(('4h', '60d'), {}).get(
'cointegration_new', {}
).get('kalman_state'),
}
只用 4h 周期的 Kalman 输出,因为:
- 4h 是协整分析和交易信号的主时间框架
- 5m/1h 周期的 β 波动大、噪声多,不适合体制判定
Kalman 状态持久化
kalman_state 是一个 dict,需要跨调用传递以保持 Kalman Filter 的连续性。
持久化方案:
- 在
orchestrator.py中维护_kalman_states: dict[PairKey, dict]字典 - 每次调用
analyze_multi_period()时传入,返回后更新 - 重启后 Kalman 会用 OLS β 重新初始化,2-3 根 K线后收敛
3.4 策略层集成
文件:src/trading/strategy.py
__init__:
self._beta_regime = _BetaRegimeTracker(
enabled=default_params.beta_regime_enabled,
)
process_tick() / _process_tick_unlocked() 签名新增:
def process_tick(
self, symbol, base_symbol, z4h, timestamp,
kline_time=None, latest_price=None,
kalman_innovation: float | None = None, # [新增]
kalman_P: float | None = None, # [新增]
) -> tuple[EntrySignal | None, ExitSignal | None]:
在 _process_tick_unlocked 的新 K 线更新块中:
if is_new_candle:
# ... 现有: Welford 更新, EMA 更新 ...
# [新增] Beta 体制跟踪(基于 kline_time 去重)
if kalman_innovation is not None and kline_time is not None:
self._beta_regime.update(
key,
normalized_innovation=kalman_innovation,
kalman_P=kalman_P or 0.0,
kline_time=str(kline_time),
cusum_k=params.beta_regime_cusum_k,
)
_check_entry() 改动(在 z4h 过滤之后、方向判断之前):
# ── 现有步骤 3: z4h 绝对值过滤 ──
if abs(z4h) < params.min_zscore_abs:
...
return None
# ── [新增] 步骤 4: Beta 体制检查 ──
if params.beta_regime_enabled:
beta_state = self._beta_regime.check(
key,
cusum_h=params.beta_regime_cusum_h,
hard_block_h=params.beta_regime_hard_block_h,
scale_max=params.beta_regime_scale_max,
)
if beta_state.hard_block:
logger.info(
f"🛡️ Beta体制硬拦截 | {pair_label} | {beta_state.reason} | "
f"az={adaptive_z:+.4f} z4h={z4h:+.4f}"
)
return None
threshold_scale = beta_state.threshold_scale
else:
beta_state = None
threshold_scale = 1.0
# ── 步骤 5: 方向判断(应用 Beta 缩放)──
threshold = params.adaptive_threshold * threshold_scale
if adaptive_z < -threshold:
direction = 'long'
elif adaptive_z > threshold:
direction = 'short'
else:
if threshold_scale > 1.01:
logger.info(
f"🛡️ Beta体制缩放拦截 | {pair_label} | "
f"az={adaptive_z:+.4f} 有效阈值={threshold:.2f} "
f"(原始={params.adaptive_threshold} ×{threshold_scale:.2f}) | "
f"{beta_state.reason if beta_state else ''}"
)
return None
cleanup_pair():
self._beta_regime.cleanup_pair(key)
4. 参数调优指南
4.1 完整参数表
| 参数 | 默认值 | 位置 | 含义 | 调优方向 |
|---|---|---|---|---|
KALMAN_Q |
1e-4 | config.py | β 变化速率先验 | ↑更快追踪 ↓更稳定 |
KALMAN_P0 |
1.0 | config.py | 初始不确定性 | 较大值让滤波器快速收敛 |
KALMAN_R_ALPHA |
0.05 | config.py | R 自适应衰减 | ↑更快适应噪声变化 |
beta_regime_cusum_k |
0.5 | StrategyParams | CUSUM 参考值 | ↓更敏感 ↑更多过滤 |
beta_regime_cusum_h |
4.0 | StrategyParams | 开始缩放的 CUSUM 值 | ↓更早介入 ↑更晚介入 |
beta_regime_hard_block_h |
8.0 | StrategyParams | 硬拦截的 CUSUM 值 | ↓更保守 ↑更激进 |
beta_regime_scale_max |
2.0 | StrategyParams | 最大阈值缩放倍数 | ↑更难入场 ↓影响小 |
4.2 环境变量约定
# Kalman Filter 参数(全局)
KALMAN_Q=0.0001
KALMAN_P0=1.0
KALMAN_R_ALPHA=0.05
# CUSUM 参数(策略层,支持全局 / 币种级 / 配对级)
TRADING_STRATEGY_BETA_REGIME_ENABLED=true
TRADING_STRATEGY_BETA_REGIME_CUSUM_K=0.5
TRADING_STRATEGY_BETA_REGIME_CUSUM_H=4.0
TRADING_STRATEGY_BETA_REGIME_HARD_BLOCK_H=8.0
TRADING_STRATEGY_BETA_REGIME_SCALE_MAX=2.0
# 币种级覆盖 (PURR 的 β 天然波动大,提高阈值)
TRADING_STRATEGY_PURR_BETA_REGIME_CUSUM_H=5.0
4.3 场景模拟
场景 A:正常市场(β 稳定)
每根 4h K线: normalized_innovation ~ N(0,1)
典型值: z = [-0.3, 0.8, -0.5, 0.2, -0.7, ...]
CUSUM 演化:
S⁺: max(0, 0 + (-0.3) - 0.5) = 0 → 0
S⁺: max(0, 0 + 0.8 - 0.5) = 0.3 → 0.3
S⁺: max(0, 0.3 + (-0.5) - 0.5) = 0 → 0 ← 自动衰减归零
S⁺: max(0, 0 + 0.2 - 0.5) = 0 → 0
→ S⁺ 始终在 0 附近波动,远低于 cusum_h=4.0
→ regime=STABLE, scale=1.0
→ threshold=3.0(无变化)
→ 正常入场
场景 B:β 开始飙升(早期检测)
β 飙升导致 innovation 持续偏正:
z = [2.5, 3.1, 2.8, 2.2, ...]
CUSUM 演化:
S⁺: max(0, 0 + 2.5 - 0.5) = 2.0 → 2.0
S⁺: max(0, 2.0 + 3.1 - 0.5) = 4.6 → 4.6 ← 第 2 根K线!超过 cusum_h=4.0
S⁺: max(0, 4.6 + 2.8 - 0.5) = 6.9 → 6.9
→ 仅 2 根 4h K线(8h)即检测到 β 飙升
→ 对比 v1 的 3 根 K线(12h)最快检测 → 提前 4h
→ T=8h: regime=EXPANDING, scale=1.15(Sigmoid 缩放刚启动)
→ T=12h: S⁺=6.9, scale=1.75(接近上限)
→ T=16h: S⁺ 可能超过 hard_block_h=8.0 → 硬拦截
场景 C:β 全面飙升(硬拦截)
z = [5.0, 4.5, 5.2, ...]
CUSUM 演化:
S⁺: max(0, 0 + 5.0 - 0.5) = 4.5 → 4.5 ← 第 1 根即超 cusum_h!
S⁺: max(0, 4.5 + 4.5 - 0.5) = 8.5 → 8.5 ← 第 2 根超 hard_block_h
→ 硬拦截,8h 内完成
→ 极端 β 飙升仅需 8h 即触发硬拦截
→ 日志: "🛡️ Beta体制硬拦截(上行) | PURR|HYPE | S⁺=8.50 S⁻=0.00 >=8.0 P=0.012"
场景 D:β 飙升后企稳
β 飙升阶段 S⁺ 累积到 10.0
β 企稳后 innovation 回归 N(0,1):
z = [0.3, -0.2, 0.5, -0.4, 0.1, ...]
CUSUM 衰减:
S⁺: max(0, 10.0 + 0.3 - 0.5) = 9.8 → 9.8
S⁺: max(0, 9.8 + (-0.2) - 0.5) = 9.1 → 9.1
S⁺: max(0, 9.1 + 0.5 - 0.5) = 9.1 → 9.1
S⁺: max(0, 9.1 + (-0.4) - 0.5) = 8.2 → 8.2
...
→ 每根正常 K线 CUSUM 约下降 0.5(= k)
→ 从 10.0 降至 4.0 需约 12 根 K线(48h ≈ 2天)
→ 降至 0 需约 20 根 K线(80h ≈ 3.3天)
→ 保守恢复,确保 β 真正企稳后才放行交易
场景 E:β 急剧下降(下行急变检测 — v1 无此能力)
Alt 急速回落, β 从 5.0 回落到 1.0:
z = [-3.0, -2.8, -3.5, ...]
CUSUM 演化:
S⁻: max(0, 0 + 3.0 - 0.5) = 2.5 → 2.5
S⁻: max(0, 2.5 + 2.8 - 0.5) = 4.8 → 4.8 ← 超过 cusum_h=4.0
→ 下行急变 2 根 K线即检测到
→ 虽然 β 在下降,但协整关系同样不稳定
→ regime=EXPANDING, 提高入场门槛
→ v1 的 max(0, β_short-β_long) 在此场景下 divergence=0,完全漏检
场景 F:短暂噪声冲击(不误报)
z = [2.5, -0.3, 0.1, -0.4, ...] ← 单根 K线偏离,后续正常
CUSUM 演化:
S⁺: max(0, 0 + 2.5 - 0.5) = 2.0 → 2.0
S⁺: max(0, 2.0 + (-0.3) - 0.5) = 1.2 → 1.2 ← 快速衰减
S⁺: max(0, 1.2 + 0.1 - 0.5) = 0.8 → 0.8
S⁺: max(0, 0.8 + (-0.4) - 0.5) = 0 → 0 ← 3 根后归零
→ 单次冲击被 CUSUM 的 -k 项吸收,不会触发(远低于 cusum_h=4.0)
→ 对比 v1: 如果 buffer 只有 3-4 个值,mean 可能被单次高值拉高导致误触发
5. 改动文件清单
| 文件 | 改动类型 | 改动内容 |
|---|---|---|
src/config.py |
新增常量 | KALMAN_Q, KALMAN_P0, KALMAN_R_ALPHA |
src/utils/analysis/analysis_core.py |
新增类 | KalmanBetaEstimator(Kalman Filter 实现) |
src/utils/analysis/analysis_core.py |
增强函数 | calculate_cointegration_params_dual_window() 新增 kalman_state 参数和 Kalman 输出 |
src/utils/analysis/analysis_core.py |
增强函数 | analyze_pair_advanced() 传播 Kalman 输出 |
src/utils/analysis/analysis_core.py |
增强函数 | analyze_multi_period() 返回 kalman_innovation, kalman_P, kalman_state |
src/trading/config.py |
新增字段 | StrategyParams 增加 5 个 beta_regime 参数 |
src/trading/config.py |
增强函数 | get_strategy_params(), _build_strategy_params(), load_trading_config() 适配 |
src/trading/orchestrator.py |
新增状态 | _kalman_states: dict[PairKey, dict] Kalman 状态缓存 |
src/trading/orchestrator.py |
增强调用 | process_analysis() 提取并传递 Kalman 输出 |
src/trading/strategy.py |
新增类 | _BetaRegimeState, _BetaRegimeTracker(CUSUM 实现) |
src/trading/strategy.py |
增强方法 | process_tick() 新增 kalman_innovation, kalman_P 参数 |
src/trading/strategy.py |
增强方法 | _check_entry() 新增 Beta 体制检查 + Sigmoid 缩放 |
src/trading/strategy.py |
增强方法 | cleanup_pair() 清理 tracker 状态 |
不涉及改动的文件:
momentum_filter.py— 不变position_manager.py— 不变(不涉及平仓逻辑,见 §7.1 风险评估)executor.py— 不变models.py— 不变(PairTradeSignal 暂不加字段)
6. 日志设计
6.1 新增日志
| 时机 | 级别 | 格式 |
|---|---|---|
| 初始化 | INFO | 🔬 Beta体制跟踪器初始化 | enabled=True cusum_k=0.50 cusum_h=4.00 hard_block=8.00 scale_max=2.0 |
| 硬拦截 | INFO | 🛡️ Beta体制硬拦截 | PURR|HYPE | Beta硬拦截(上行): S⁺=8.50 S⁻=0.30 >=8.0 P=0.012 | az=-8.50 z4h=-6.20 |
| 缩放拦截 | INFO | 🛡️ Beta体制缩放拦截 | PURR|HYPE | az=-4.50 有效阈值=4.00 (原始=3.0 ×1.33) | Beta缩放(上行): S⁺=5.20 S⁻=0.10 scale=1.33 P=0.008 |
| 5min摘要 | INFO | 📋 状态摘要 | ... | beta_regime=EXPANDING S⁺=5.20 S⁻=0.10 scale=1.33 kalman_β=1.85 |
| 分析层 | DEBUG | Kalman更新 | ... | β̂=1.20 P=0.005 innov=2.85 K=0.12 |
6.2 飞书告警集成
当 Beta 体制从 STABLE → EXPANDING 转换时,可在 orchestrator 层发送飞书告警(可选增强,不在本次范围内)。
7. 风险与边界条件
7.1 已识别风险
| 风险 | 严重度 | 影响 | 缓解 |
|---|---|---|---|
| 已持仓风险暴露 | 高 | β 飙升期间只拦截新入场,已有持仓的止损/止盈不受影响 | 本次仅做入场拦截;退场加固需作为 P0 后续任务 |
| Kalman Q 不适配 | 中 | Q 太大 → β 追踪过敏(对噪声过度反应);Q 太小 → 追踪迟钝 | R 自适应 + 回测调优;默认 1e-4 在多数币对上合理 |
| 重启丢失 Kalman 状态 | 中 | 重启后 Kalman 用 OLS β 重新初始化,2-3 根 K线后收敛 | 4h 级别,最多 12h 无精确追踪;考虑后续持久化到 DB |
| CUSUM 衰减慢 | 低 | β 企稳后 CUSUM 需 2-3 天归零 | 保守策略,确保真正企稳;可通过降低 hard_block_h 缓解 |
| 低波动期 BTC r≈0 | 低 | r_btc ≈ 0 时无法更新 Kalman | 代码中已有 abs(r_btc) < 1e-10 保护,跳过更新 |
7.2 不在本次范围
- 退场逻辑调整(β 飙升时的持仓保护)— 建议作为 P0 后续任务
- HMM 体制分类(概率化体制判定,作为 CUSUM 的后续升级)
- Kalman 状态持久化到 DB(消除重启冷启动)
- 飞书告警集成
- PairTradeSignal 增加 kalman_beta / cusum 字段
- Q/R 的自适应调优(如 Expectation Maximization)
8. 验证方案
8.1 单元测试
# test_kalman_beta.py
import math
def test_kalman_convergence():
"""Kalman Filter 从初始值收敛到真实 β"""
from src.utils.analysis.analysis_core import KalmanBetaEstimator
kf = KalmanBetaEstimator(beta_init=0.5, q=1e-4, r_init=1e-2)
# 模拟真实 β=1.0 的观测
import random
random.seed(42)
for _ in range(100):
r_btc = random.gauss(0, 0.02)
r_alt = 1.0 * r_btc + random.gauss(0, 0.01)
result = kf.update(r_btc, r_alt)
assert abs(result['beta'] - 1.0) < 0.1 # 收敛到真实值附近
def test_kalman_innovation_spike():
"""β 突变时 innovation 立即飙升"""
from src.utils.analysis.analysis_core import KalmanBetaEstimator
kf = KalmanBetaEstimator(beta_init=0.5, q=1e-4, r_init=1e-2)
# 预热:β=0.5
import random
random.seed(42)
for _ in range(50):
r_btc = random.gauss(0, 0.02)
r_alt = 0.5 * r_btc + random.gauss(0, 0.01)
kf.update(r_btc, r_alt)
# β 突变为 3.0
r_btc = 0.02
r_alt = 3.0 * r_btc
result = kf.update(r_btc, r_alt)
assert abs(result['normalized_innovation']) > 2.0 # 显著偏离
def test_kalman_zero_r_btc():
"""r_btc ≈ 0 时安全跳过"""
from src.utils.analysis.analysis_core import KalmanBetaEstimator
kf = KalmanBetaEstimator(beta_init=0.5, q=1e-4, r_init=1e-2)
result = kf.update(0.0, 0.001)
assert result['innovation'] == 0.0
assert result['kalman_gain'] == 0.0
def test_kalman_state_persistence():
"""状态导出/恢复后行为一致"""
from src.utils.analysis.analysis_core import KalmanBetaEstimator
kf1 = KalmanBetaEstimator(beta_init=0.5, q=1e-4, r_init=1e-2)
kf1.update(0.02, 0.01)
state = kf1.state_dict()
kf2 = KalmanBetaEstimator.from_state_dict(state)
r1 = kf1.update(0.03, 0.015)
r2 = kf2.update(0.03, 0.015)
assert abs(r1['beta'] - r2['beta']) < 1e-10
# test_beta_regime_cusum.py
def test_stable_regime():
"""正常 innovation → regime=STABLE"""
from src.trading.strategy import _BetaRegimeTracker
tracker = _BetaRegimeTracker(enabled=True)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
innovations = [0.3, -0.2, 0.5, -0.4, 0.1, -0.3, 0.2, -0.1, 0.4, -0.5]
for i, z in enumerate(innovations):
tracker.update(key, z, 0.001, f"2024-01-01T{i*4:02d}:00:00", cusum_k=0.5)
state = tracker.check(key, cusum_h=4.0, hard_block_h=8.0, scale_max=2.0)
assert state.regime == 'stable'
assert state.threshold_scale == 1.0
def test_expanding_soft():
"""持续偏正 innovation → CUSUM 超过 cusum_h → 缩放"""
from src.trading.strategy import _BetaRegimeTracker
tracker = _BetaRegimeTracker(enabled=True)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
# 持续 3σ 偏离
for i in range(5):
tracker.update(key, 3.0, 0.01, f"2024-01-01T{i*4:02d}:00:00", cusum_k=0.5)
state = tracker.check(key, cusum_h=4.0, hard_block_h=8.0, scale_max=2.0)
assert state.regime == 'expanding'
assert 1.0 < state.threshold_scale <= 2.0
assert not state.hard_block
def test_expanding_hard():
"""极端 innovation → CUSUM 超过 hard_block_h → 硬拦截"""
from src.trading.strategy import _BetaRegimeTracker
tracker = _BetaRegimeTracker(enabled=True)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
for i in range(5):
tracker.update(key, 5.0, 0.02, f"2024-01-01T{i*4:02d}:00:00", cusum_k=0.5)
state = tracker.check(key, cusum_h=4.0, hard_block_h=8.0, scale_max=2.0)
assert state.regime == 'expanding'
assert state.hard_block
def test_cusum_decay():
"""CUSUM 在 innovation 恢复正常后自动衰减"""
from src.trading.strategy import _BetaRegimeTracker
tracker = _BetaRegimeTracker(enabled=True)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
# 先累积 CUSUM
for i in range(3):
tracker.update(key, 3.0, 0.01, f"2024-01-01T{i*4:02d}:00:00", cusum_k=0.5)
state = tracker.check(key, cusum_h=4.0, hard_block_h=8.0, scale_max=2.0)
cusum_peak = state.cusum_up
# 正常 innovation → CUSUM 衰减
for i in range(20):
tracker.update(key, 0.0, 0.001, f"2024-01-02T{i*4:02d}:00:00", cusum_k=0.5)
state = tracker.check(key, cusum_h=4.0, hard_block_h=8.0, scale_max=2.0)
assert state.cusum_up < cusum_peak # 已衰减
assert state.regime == 'stable' # 回到稳定态
def test_downward_detection():
"""下行急变检测(v1 无此能力)"""
from src.trading.strategy import _BetaRegimeTracker
tracker = _BetaRegimeTracker(enabled=True)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
# 持续负 innovation(β 急剧下降)
for i in range(5):
tracker.update(key, -3.0, 0.01, f"2024-01-01T{i*4:02d}:00:00", cusum_k=0.5)
state = tracker.check(key, cusum_h=4.0, hard_block_h=8.0, scale_max=2.0)
assert state.regime == 'expanding'
assert state.cusum_down > state.cusum_up # 下行 CUSUM 更大
def test_timestamp_dedup():
"""同一 kline_time 不重复更新 CUSUM"""
from src.trading.strategy import _BetaRegimeTracker
tracker = _BetaRegimeTracker(enabled=True)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
# 同一根 K线的 48 个 5m tick
for _ in range(48):
tracker.update(key, 5.0, 0.01, "2024-01-01T00:00:00", cusum_k=0.5)
# 只应更新 1 次,不满足 MIN_UPDATES=2
state = tracker.check(key, cusum_h=4.0, hard_block_h=8.0, scale_max=2.0)
assert state.regime == 'stable'
assert state.reason == "数据不足"
def test_disabled():
"""禁用时始终返回 stable"""
from src.trading.strategy import _BetaRegimeTracker
tracker = _BetaRegimeTracker(enabled=False)
key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
for i in range(10):
tracker.update(key, 5.0, 0.02, f"2024-01-01T{i*4:02d}:00:00", cusum_k=0.5)
state = tracker.check(key, cusum_h=4.0, hard_block_h=8.0, scale_max=2.0)
assert state.regime == 'stable'
assert state.threshold_scale == 1.0
8.2 集成验证
-
回测:在 β 飙升的历史区间(如 2024 年 BTC 减半后的 alt season)上回测:
- 对比有无 Beta 体制过滤的入场次数和胜率
- 对比 v1 (OLS+均值阈值) vs v2 (Kalman+CUSUM) 的检测延迟和假阳性率
- 预期:v2 检测延迟降低 30-50%,假阳性率降低(CUSUM 抗噪声能力更强)
-
实盘观察:
- 监控 Kalman β 与 OLS β 的对比(Kalman 应更快追踪实际 β)
- 监控 CUSUM S⁺/S⁻ 的演化,验证正常市场中保持低位
- 观察
🛡️ Beta体制硬拦截和🛡️ Beta体制缩放拦截日志频率 - 验证下行急变场景的检测(v1 完全漏检的情况)
- 根据实际情况调整
cusum_h和hard_block_h
-
A/B 对比(可选):
- 同时运行 v1 和 v2 的体制检测逻辑(v2 实际决策,v1 仅日志)
- 对比两者的检测时间差、拦截次数、误报次数
9. 后续演进路线
| 优先级 | 方向 | 描述 | 预期收益 |
|---|---|---|---|
| P0 | 退场保护 | β 飙升时收紧已有仓位的止损 | 减少已持仓的亏损 |
| P1 | Kalman 状态持久化 | 将 kalman_state 存入 DB,消除重启冷启动 | 消除 12h 无保护窗口 |
| P1 | Q 自适应 | 用 EM 或在线 MLE 自适应 Q | 不同币对自动最优 |
| P2 | HMM 体制分类 | 在 CUSUM 之上叠加 HMM,输出体制概率 | 更精细的缩放控制 |
| P2 | 冷却期 | EXPANDING → STABLE 过渡期加额外保护 | 避免体制切换边界的误入场 |
| P3 | 飞书告警 | 体制切换时发送飞书通知 | 人工监控 |