Beta Regime-Adaptive Filter Design (Condensed) (β is used to compute the leg ratio when opening a pair trade)
Dual-Purpose Kalman Beta Design
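1. Problem Background
The current system has two structural defects in how it uses β:
Defect 1: Lagging β estimate
OLS uses a fixed BETA_WINDOW=100 (≈17 days of 4h bars). After a structural change in β, the system needs more than a week to notice it:
T+0h  BTC recovers; β starts rising from 0.5
T+12h true β 2.0, OLS β̂ still ≈ 0.6 → misread as a mean-reversion opportunity
T+24h true β 4.0, OLS β̂ ≈ 0.8 → further mistaken entries
T+7d  true β 8.0, OLS β̂ ≈ 2.0 → the cointegration assumption has broken down completely
Core problem: the entry signal comes from a z4h computed with a lagging β, so it actually reflects β changing rather than the spread reverting.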
Defect 2: Position ratio ignores β
risk_manager.calculate_position_size() sizes both legs at equal notional:
# Current implementation (risk_manager.py:164-170)
alt_size = position_usd / alt_price    # equal notional on both legs
base_size = position_usd / base_price  # independent of β
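For a pair trade, the notional hedge ratio should be base_notional = β × alt_notional. Under r_alt ≈ α + β·r_base, each $1 of Alt carries $β of Base exposure, so hedging it takes $β of Base. Equal notional therefore means:
- β=0.5: the Base leg is too heavy → over-hedged, leaving a net short Base exposure
- β=2.0: the Base leg is too light → under-hedged, so residual Base exposure amplifies spread volatility
- when β changes, the ratio cannot be adjusted → the hedge keeps drifting during the holding period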
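Below is a minimal sketch of the exposure arithmetic behind the list above. It assumes the single-factor return model r_alt ≈ α + β·r_base. `net_base_exposure` is a hypothetical helper and is not part of the codebase:

```python
def net_base_exposure(alt_notional: float, base_notional: float, beta: float) -> float:
    """Net dollar exposure of a long-Alt / short-Base pair to a Base log return.

    Assumes r_alt ≈ α + β·r_base, so $1 of Alt carries $β of Base exposure,
    while the short Base leg carries -$1 per $1 of notional.
    """
    return beta * alt_notional - base_notional


for beta in (0.5, 1.0, 2.0):
    equal = net_base_exposure(100.0, 100.0, beta)            # current equal-notional sizing
    weighted = net_base_exposure(100.0, beta * 100.0, beta)  # base_notional = β × alt_notional
    print(f"beta={beta}: equal-notional {equal:+.1f}, beta-weighted {weighted:+.1f}")
```

At β=0.5, equal notional leaves -$50 of Base exposure per $100 of Alt (over-hedged). At β=2.0 it leaves +$100 (under-hedged). Sizing the Base leg at β × alt_notional brings both cases to zero.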
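Summary of current weaknesses
| Component | Problem | Impact |
|---|---|---|
| analysis_core.py | Fixed-window OLS with BETA_WINDOW=100 | β estimate lags (window spans ≈17 days) |
| risk_manager.py | Equal notional, β unused | Wrong hedge ratio |
| strategy.py | Fixed adaptive_threshold=3.0 | A surging β keeps breaching the threshold |
| momentum_filter.py | Only detects single-leg price trends | Does not detect spread/β regime switches |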
2. Solution Overview
Core design: a vector Kalman Filter estimates the time-varying [α, β], and its output serves two purposes at once:
┌─────────────────────────────────────┐
│ VectorKalmanBetaEstimator (4h) │
│ Input: r_btc, r_alt (log returns) │
│ Output: kalman_beta, innovation, P_β │
└──────────────┬──────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼
Purpose 1: regime detection Purpose 2: hedge ratio
┌─────────────────┐ ┌─────────────────────┐
│ BOCPD changepoint detection │ │ Position-ratio calculation │
│ Input: innovation │ │ Input: kalman_beta │
│ Output: changepoint probability │ │ Output: alt/base quantity ratio │
└────────┬────────┘ └──────────┬──────────┘
│ │
▼ ▼
Entry-signal filter β-weighted entry
(hard block / threshold scaling) (base_notional = β × alt_notional)
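Why a 4h frequency is enough
β describes a structural relationship between two assets (same sector, capital flows, fundamental linkage). It changes over days to weeks.
| Method | Response to a structural β change | Robustness to intraday noise | Use case |
|---|---|---|---|
| OLS 100×4h | ~7-10 days | Strong | Current system (too slow) |
| 4h Kalman (q_β=1e-4) | 8-16 hours | Strong | Regime detection + hedge ratio |
| 5m Kalman | ~15-30 minutes | Weak (chases noise) | Not suitable for β estimation |
With q_β=1e-4 on 4h bars, the prior β drift (standard deviation √(n·q_β) after n steps) is ≈0.01 per step, ≈0.024 per day (6 steps) and ≈0.065 per week (42 steps). That is enough to track genuine structural change without being pulled around by intraday noise.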
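The drift figures above follow from the random-walk state equation: after n independent steps the variances add up to n·q_β. Here is a small check, assuming 6 four-hour bars per day:

```python
import math

Q_BETA = 1e-4          # β process-noise variance per 4h step (value from the text)
BARS_PER_DAY = 24 // 4  # 6 four-hour bars per day


def beta_drift_std(q_beta: float, n_steps: int) -> float:
    """Prior standard deviation of β after n random-walk steps: sqrt(n · q_β)."""
    return math.sqrt(n_steps * q_beta)


per_step = beta_drift_std(Q_BETA, 1)                 # ≈ 0.01
per_day = beta_drift_std(Q_BETA, BARS_PER_DAY)       # ≈ 0.0245
per_week = beta_drift_std(Q_BETA, 7 * BARS_PER_DAY)  # ≈ 0.0648
print(f"step={per_step:.4f} day={per_day:.4f} week={per_week:.4f}")
```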
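Four-layer architecture
| Layer | Component | Function |
|---|---|---|
| Layer 1 | Vector Kalman Filter | Estimates time-varying [α, β]; outputs kalman_beta and normalized_innovation |
| Layer 2 | BOCPD changepoint detection | Monitors changes in the innovation distribution and outputs a changepoint probability → entry-signal filter |
| Layer 3 | Cross-pair systemic risk aggregation | Aggregates the regime state of all pairs to identify systemic events |
| Layer 4 | β-weighted position sizing | Uses kalman_beta to set the notional ratio of the two legs, correcting the equal-notional hedge bias |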
3. Detailed Design
3.1 Vector Kalman Filter: time-varying [α, β] estimation
Mathematical model
State equation ([α, β] evolves slowly as a random walk):
x_t = x_{t-1} + w_t, w_t ~ N(0, Q)
where x_t = [α_t, β_t]'
Q = [[q_α, 0], [0, q_β]] (diagonal: α and β evolve independently)
Observation equation (log-return relationship):
r_alt_t = H_t × x_t + v_t, v_t ~ N(0, R)
where H_t = [1, r_btc_t]
r_alt_t = log(alt_t / alt_{t-1})
r_btc_t = log(btc_t / btc_{t-1})
Recursion:
Predict:
x̂_t|t-1 = x̂_{t-1}
P_t|t-1 = P_{t-1} + Q (2×2)
Innovation:
ε_t = r_alt_t - H_t × x̂_t|t-1 (scalar)
S_t = H_t × P_t|t-1 × H_t' + R (scalar)
Huber clipping (c=3.0):
ε̃_t = clip(ε_t, -c×√S_t, +c×√S_t)
Kalman gain:
K_t = P_t|t-1 × H_t' / S_t (2×1)
Update:
x̂_t = x̂_t|t-1 + K_t × ε̃_t
Joseph stabilized form:
A = I - K_t × H_t (2×2)
P_t = A × P_t|t-1 × A' + K_t × R × K_t' (keeps P symmetric positive semi-definite)
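Because the Kalman gain above is the optimal one, the Joseph form yields the same P as the shorter (I - K·H)·P_t|t-1 in exact arithmetic. Its advantage is that P stays symmetric and positive semi-definite even when K is perturbed by rounding. The numerical check below uses illustrative, made-up values for P_t|t-1, r_btc and R:

```python
import numpy as np


def joseph_update(P_pred: np.ndarray, H: np.ndarray, R: float) -> np.ndarray:
    """P = A·P_pred·A' + K·R·K' with A = I - K·H (Joseph stabilized form)."""
    S = float(H @ P_pred @ H) + R
    K = (P_pred @ H) / S
    A = np.eye(len(H)) - np.outer(K, H)
    return A @ P_pred @ A.T + R * np.outer(K, K)


def standard_update(P_pred: np.ndarray, H: np.ndarray, R: float) -> np.ndarray:
    """P = (I - K·H)·P_pred, only equivalent when K is exactly optimal."""
    S = float(H @ P_pred @ H) + R
    K = (P_pred @ H) / S
    return (np.eye(len(H)) - np.outer(K, H)) @ P_pred


P_pred = np.array([[0.1, 0.02], [0.02, 1.0]])  # illustrative predicted covariance
H = np.array([1.0, 0.03])                      # [1, r_btc] with a 3% BTC move
R = 1e-4
P_j = joseph_update(P_pred, H, R)
P_s = standard_update(P_pred, H, R)
```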
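Sage-Husa adaptive R (the implementation starts adapting only after the first 10 updates and uses the unclipped ε_t):
d_t = (1 - b) / (1 - b^(t+1)), b ∈ (0,1)
R_t = (1 - d_t) × R_{t-1} + d_t × (ε_t² - H_t × P_t|t-1 × H_t')
R_t = max(R_t, R_floor)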
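The weight d_t starts at 1 and decays towards 1 - b, so with b=0.95 the long-run memory of R is about 1/(1-b) = 20 bars (≈3.3 days). The sketch below mirrors the class below, which begins adapting at update index 11. It shows how fast an initial R that is 100× too large is forgotten. The helper names and the constant sample value are hypothetical:

```python
def sage_husa_weight(t: int, b: float = 0.95) -> float:
    """d_t = (1 - b) / (1 - b^(t+1)): equals 1 at t=0 and decays towards 1 - b."""
    return (1.0 - b) / (1.0 - b ** (t + 1))


def sage_husa_r(r0: float, samples, t0: int = 11, b: float = 0.95, r_floor: float = 1e-8) -> float:
    """Blend per-bar samples (ε_t² - H·P·H') into R, starting at update index t0."""
    r = r0
    for t, s in enumerate(samples, start=t0):
        d = sage_husa_weight(t, b)
        r = max((1.0 - d) * r + d * max(s, r_floor), r_floor)
    return r


weights = [sage_husa_weight(t) for t in range(200)]
r_after = sage_husa_r(1e-2, [1e-4] * 100)  # R starts 100x too large; ~100 bars later it is close to 1e-4
```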
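Key output signals and their uses
| Signal | Purpose 1: regime detection | Purpose 2: hedge ratio |
|---|---|---|
| beta (x̂[1]) | β trend monitoring | Used directly as the hedge ratio |
| alpha (x̂[0]) | Absorbs independent drift so that it does not contaminate β | n/a |
| P_beta (P[1,1]) | Uncertainty measure | Hedge-ratio confidence (fall back to OLS β when P_β is too large) |
| ε/√S (normalized_innovation) | BOCPD input | n/a |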
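Parameters
| Parameter | Symbol | Default | Meaning | Tuning |
|---|---|---|---|---|
| β process noise | q_β | 1e-4 | Prior variance of the β change per 4h step | ↑ faster tracking, ↓ more stable |
| α process noise | q_α | 1e-5 | Prior variance of the α change per 4h step (slower than β) | Usually left as is |
| R forgetting factor | b | 0.95 | Sage-Husa forgetting factor | ↑ smoother, ↓ faster adaptation |
| R floor | R_floor | 1e-8 | Prevents R from collapsing to zero | n/a |
| Innovation clipping | c | 3.0 | Huber clipping threshold (multiples of σ) | ↓ more robust, ↑ more sensitive |
| Initial [α, β] | n/a | α = 0, β from OLS | α is a per-bar return drift, so it starts at 0; β is initialized from the existing OLS | n/a |
| Initial P | P₀ | diag(0.1, 1.0) | Initial uncertainty | n/a |
q_β = 1e-4 → daily β drift ≈ 0.024 and weekly ≈ 0.065 (standard deviations), which reasonably covers normal variation.
q_α = q_β/10 → in parameter units α varies an order of magnitude more slowly than β. Note that β enters the observation multiplied by r_btc (|r_btc| ≈ 0.02 per 4h bar). In terms of r_alt variance, α's random walk (1e-5 per step) is therefore far larger than β's (≈ r_btc² × q_β ≈ 4e-8). q_α should be validated in backtests so that α does not absorb persistent innovations that belong to β.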
File: src/config.py
# ═══ Kalman Filter β estimation parameters ═══
KALMAN_Q_BETA: float = 1e-4
KALMAN_Q_ALPHA: float = 1e-5
KALMAN_P0_ALPHA: float = 0.1
KALMAN_P0_BETA: float = 1.0
KALMAN_R_FORGET: float = 0.95
KALMAN_R_FLOOR: float = 1e-8
KALMAN_CLIP_SIGMA: float = 3.0
# ═══ Hedge-ratio parameters ═══
HEDGE_BETA_MIN: float = 0.1    # lower bound on β (prevents extreme ratios)
HEDGE_BETA_MAX: float = 5.0    # upper bound on β
HEDGE_BETA_P_MAX: float = 0.5  # fall back to OLS β when P_β exceeds this (Kalman too uncertain)
File: src/utils/analysis/analysis_core.py
New class: VectorKalmanBetaEstimator
import numpy as np


class VectorKalmanBetaEstimator:
    """Vector Kalman Filter for joint estimation of time-varying [α, β]

    State-space model:
        State equation:       x_t = x_{t-1} + w_t,  w_t ~ N(0, Q),  x = [α, β]'
        Observation equation: r_alt_t = [1, r_btc_t] × x_t + v_t,  v_t ~ N(0, R)

    Features:
        - 2-D state [α, β]: the intercept absorbs independent drift, removing spurious β signals
        - Joseph stabilized form: the P update stays positive semi-definite
        - Huber innovation clipping: robust to fat-tailed returns
        - Sage-Husa adaptive R: online calibration of the observation noise

    Dual-purpose outputs:
        - kalman_beta → BOCPD regime detection (via the innovation) + hedge ratio (used directly)
        - P_beta → hedge-ratio confidence (fall back to OLS β when it is too large)

    Call update() exactly once per new 4h bar.
    """

    def __init__(
        self,
        alpha_init: float,
        beta_init: float,
        q_alpha: float = 1e-5,
        q_beta: float = 1e-4,
        r_init: float = 1e-2,
        p0_alpha: float = 0.1,
        p0_beta: float = 1.0,
        r_forget: float = 0.95,
        r_floor: float = 1e-8,
        clip_sigma: float = 3.0,
    ):
        self.x = np.array([alpha_init, beta_init], dtype=np.float64)
        self.Q = np.diag([q_alpha, q_beta]).astype(np.float64)
        self.P = np.diag([p0_alpha, p0_beta]).astype(np.float64)
        self.R = float(r_init)
        self._r_forget = r_forget
        self._r_floor = r_floor
        self._clip_sigma = clip_sigma
        self._n_updates = 0

    @property
    def alpha(self) -> float:
        return float(self.x[0])

    @property
    def beta(self) -> float:
        return float(self.x[1])

    @property
    def beta_variance(self) -> float:
        return float(self.P[1, 1])

    def update(self, r_btc: float, r_alt: float) -> dict:
        """Consume one new pair of log returns and update the [α, β] estimate

        Returns:
            dict with keys: alpha, beta, P_beta, P_alpha,
            innovation, normalized_innovation, clipped,
            kalman_gain_beta, R
        """
        # Predict step (random walk: the mean is unchanged, the covariance grows by Q)
        P_pred = self.P + self.Q
        H = np.array([1.0, r_btc], dtype=np.float64)

        # Innovation and its variance
        innovation = float(r_alt - H @ self.x)
        S = max(float(H @ P_pred @ H) + self.R, 1e-15)
        sqrt_S = S ** 0.5
        norm_innov = innovation / sqrt_S

        # Huber clipping: cap the innovation used for the state update at ±c·√S
        clipped = abs(norm_innov) > self._clip_sigma
        innovation_for_update = innovation
        if clipped:
            innovation_for_update = self._clip_sigma * sqrt_S * (1.0 if innovation > 0 else -1.0)

        # Kalman gain + state update
        K = (P_pred @ H) / S
        self.x = self.x + K * innovation_for_update

        # Joseph stabilized covariance update
        A = np.eye(2) - np.outer(K, H)
        self.P = A @ P_pred @ A.T + self.R * np.outer(K, K)

        # Sage-Husa adaptive R (starts after the first 10 updates; uses the unclipped innovation)
        self._n_updates += 1
        if self._n_updates > 10:
            b = self._r_forget
            d_t = (1.0 - b) / (1.0 - b ** (self._n_updates + 1))
            r_sample = max(innovation * innovation - float(H @ P_pred @ H), self._r_floor)
            self.R = max((1.0 - d_t) * self.R + d_t * r_sample, self._r_floor)

        return {
            'alpha': float(self.x[0]),
            'beta': float(self.x[1]),
            'P_beta': float(self.P[1, 1]),
            'P_alpha': float(self.P[0, 0]),
            'innovation': innovation,
            'normalized_innovation': norm_innov,
            'clipped': clipped,
            'kalman_gain_beta': float(K[1]),
            'R': self.R,
        }

    def state_dict(self) -> dict:
        """Plain-Python snapshot of the filter (JSON-serializable) for caching between bars"""
        return {
            'x': self.x.tolist(), 'P': self.P.tolist(), 'Q': self.Q.tolist(),
            'R': self.R, '_r_forget': self._r_forget, '_r_floor': self._r_floor,
            '_clip_sigma': self._clip_sigma, '_n_updates': self._n_updates,
        }

    @classmethod
    def from_state_dict(cls, d: dict) -> 'VectorKalmanBetaEstimator':
        """Rebuild a filter from state_dict() output without going through __init__"""
        obj = cls.__new__(cls)
        obj.x = np.array(d['x'], dtype=np.float64)
        obj.P = np.array(d['P'], dtype=np.float64)
        obj.Q = np.array(d['Q'], dtype=np.float64)
        obj.R = d['R']
        obj._r_forget = d['_r_forget']
        obj._r_floor = d['_r_floor']
        obj._clip_sigma = d['_clip_sigma']
        obj._n_updates = d['_n_updates']
        return obj
Modified function: calculate_cointegration_params_dual_window()
Add the Kalman Filter output after the existing OLS:
def calculate_cointegration_params_dual_window(
    base_klines, alt_klines,
    beta_window=None, zscore_window=None,
    kalman_state: dict | None = None,  # [new]
):
    # ... existing OLS logic unchanged ...

    # ═══ New: vector Kalman Filter update ═══
    kalman_result = None
    kalman_state_out = kalman_state
    if len(aligned) >= 2:
        # One filter step per call, from the log returns of the last two aligned bars
        r_btc = float(np.log(aligned['base'].iloc[-1] / aligned['base'].iloc[-2]))
        r_alt = float(np.log(aligned['alt'].iloc[-1] / aligned['alt'].iloc[-2]))
        if kalman_state is not None:
            kf = VectorKalmanBetaEstimator.from_state_dict(kalman_state)
        else:
            # Cold start. R is a return-space noise variance, so initialize it from the
            # variance of the return residual r_alt - β_ols·r_btc over the aligned window;
            # the price-level OLS residual (model.resid) is on a different scale.
            r_base_hist = np.diff(np.log(aligned['base'].to_numpy(dtype=float)))
            r_alt_hist = np.diff(np.log(aligned['alt'].to_numpy(dtype=float)))
            ret_resid_var = float(np.var(r_alt_hist - beta_ols * r_base_hist))
            kf = VectorKalmanBetaEstimator(
                # α in the return model is a per-bar drift; the price-level OLS intercept
                # (alpha_ols) is not comparable, so α starts at 0
                alpha_init=0.0,
                beta_init=beta_ols,
                q_alpha=KALMAN_Q_ALPHA, q_beta=KALMAN_Q_BETA,
                r_init=max(ret_resid_var, 1e-6),
                p0_alpha=KALMAN_P0_ALPHA, p0_beta=KALMAN_P0_BETA,
                r_forget=KALMAN_R_FORGET, r_floor=KALMAN_R_FLOOR,
                clip_sigma=KALMAN_CLIP_SIGMA,
            )
        kalman_result = kf.update(r_btc, r_alt)
        kalman_state_out = kf.state_dict()

    return {
        # ... existing fields unchanged ...
        'kalman_beta': kalman_result['beta'] if kalman_result else beta_ols,
        'kalman_alpha': kalman_result['alpha'] if kalman_result else 0.0,
        'kalman_P_beta': kalman_result['P_beta'] if kalman_result else 1.0,
        'kalman_innovation': kalman_result['normalized_innovation'] if kalman_result else 0.0,
        'kalman_clipped': kalman_result['clipped'] if kalman_result else False,
        'kalman_state': kalman_state_out,
    }
Modified function: analyze_pair_advanced()
Pass kalman_state through and extract the Kalman outputs:
def analyze_pair_advanced(
    base_klines, alt_klines,
    beta_window=None, zscore_window=None,
    enable_health_monitor=True,
    stats_period_key=None,
    kalman_state: dict | None = None,  # [new]
) -> Dict:
    # ...
    # 3. New method: dual-window OLS cointegration analysis + Kalman
    coint_new = calculate_cointegration_params_dual_window(
        base_klines, alt_klines, beta_window, zscore_window,
        kalman_state=kalman_state,  # [new] passed through
    )
    if coint_new:
        result['cointegration_new'] = {
            # ... existing fields ...
            'kalman_beta': coint_new.get('kalman_beta'),              # [new]
            'kalman_P_beta': coint_new.get('kalman_P_beta'),          # [new]
            'kalman_innovation': coint_new.get('kalman_innovation'),  # [new]
            'kalman_state': coint_new.get('kalman_state'),            # [new]
        }
    # ...
Modified function: analyze_multi_period()
Pass the Kalman outputs through in the return value:
def analyze_multi_period(...) -> Optional[Dict]:
    # ...
    # Kalman outputs come from the 4h / 60d analysis
    coint_4h = details.get(('4h', '60d'), {}).get('cointegration_new', {})
    return {
        # ... existing fields ...
        'kalman_beta': coint_4h.get('kalman_beta'),
        'kalman_P_beta': coint_4h.get('kalman_P_beta', 1.0),
        'kalman_innovation': coint_4h.get('kalman_innovation', 0.0),
        'kalman_state': coint_4h.get('kalman_state'),
    }
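3.2 BOCPD: Bayesian online changepoint detection (Purpose 1)
Algorithm
Maintain the posterior distribution of the run length (number of steps since the last changepoint):
P(r_t | data_1:t)
r_t = 0 → a changepoint has just occurred
r_t = n → n steps have elapsed since the last changepoint
Recursion:
1. Predictive probability (likelihood of the current observation given run length r):
π_t(r) = P(x_t | x_{t-r:t-1})
2. Growth (no changepoint):
P(r_t = r+1, data) = P(r_{t-1} = r, data) × π_t(r) × (1 - H)
3. Changepoint (the run resets):
P(r_t = 0, data) = Σ_r P(r_{t-1} = r, data) × π_t(r) × H
4. Normalize.
Here H is the hazard rate, the prior changepoint probability per step. It defaults to 1/50, i.e. about once every 50 4h bars (≈8 days). It is unrelated to the Kalman observation vector H_t.
Observation model: monitor the Kalman normalized_innovation series with a Normal-Inverse-Gamma conjugate prior, which gives a Student-t predictive distribution.
Sufficient statistics (maintained per run; n_r is the number of observations in the run and the sums run over those observations):
κ_r = κ₀ + n_r
μ̂_r = (κ₀ × μ₀ + Σ x_i) / κ_r
α_r = α₀ + n_r / 2
β_r = β₀ + 0.5 × (Σ x_i² - κ_r × μ̂_r² + κ₀ × μ₀²)
Predictive distribution: Student-t with 2α_r degrees of freedom, location = μ̂_r, scale² = β_r(κ_r+1)/(α_r κ_r). Here α_r and β_r are Normal-Inverse-Gamma hyperparameters, not the regression [α, β].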
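The incremental updates in Step 5 of _BOCPDDetector.update() should reproduce these batch formulas. The consistency check below uses the same prior hyperparameters as the detector (μ₀=0, κ₀=1, α₀=1, β₀=0.5) and arbitrary sample values:

```python
import numpy as np

MU0, KAPPA0, ALPHA0, BETA0 = 0.0, 1.0, 1.0, 0.5  # same prior as _BOCPDDetector


def nig_batch(xs):
    """Posterior NIG hyperparameters computed in one shot from all observations."""
    xs = np.asarray(xs, dtype=float)
    n = len(xs)
    kappa = KAPPA0 + n
    mu = (KAPPA0 * MU0 + xs.sum()) / kappa
    alpha = ALPHA0 + n / 2
    beta = BETA0 + 0.5 * (np.sum(xs ** 2) - kappa * mu ** 2 + KAPPA0 * MU0 ** 2)
    return kappa, mu, alpha, beta


def nig_incremental(xs):
    """The same posterior built one observation at a time (as in Step 5)."""
    kappa, mu, alpha, beta = KAPPA0, MU0, ALPHA0, BETA0
    for x in xs:
        beta = beta + kappa * (x - mu) ** 2 / (2.0 * (kappa + 1.0))  # uses the old kappa, mu
        mu = (kappa * mu + x) / (kappa + 1.0)
        kappa = kappa + 1.0
        alpha = alpha + 0.5
    return kappa, mu, alpha, beta


xs = [0.3, -1.2, 2.5, 0.1]
batch = nig_batch(xs)
incremental = nig_incremental(xs)
```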
File: src/trading/config.py
New StrategyParams fields:
@dataclass(frozen=True)
class StrategyParams:
    # ... existing fields ...
    # β regime-adaptive filter (Vector Kalman + BOCPD)
    beta_regime_enabled: bool = True
    beta_regime_hazard_rate: float = 1/50   # prior changepoint probability ≈ once every 50 4h bars (~8 days)
    beta_regime_max_run_length: int = 200   # run-length truncation (memory bound)
    beta_regime_soft_prob: float = 0.3      # P(changepoint) ≥ this → start scaling the threshold
    beta_regime_hard_prob: float = 0.7      # P(changepoint) ≥ this → hard block
    beta_regime_scale_max: float = 2.0      # maximum threshold scaling factor
    beta_regime_warmup: int = 5             # minimum number of 4h bars before judging; also the window of P(r_t < warmup)
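The soft/hard/scale_max fields feed the logistic ramp that _BOCPDDetector.check() applies between soft_prob and hard_prob. The standalone restatement below is a hypothetical helper, not part of the codebase. It makes the curve easy to inspect:

```python
import math


def threshold_scale(cp_prob: float, soft_prob: float = 0.3, hard_prob: float = 0.7,
                    scale_max: float = 2.0) -> float:
    """Threshold multiplier implied by check(): 1.0 below soft, scale_max at or above hard."""
    if cp_prob >= hard_prob:
        return scale_max  # check() reports scale_max together with hard_block=True
    if cp_prob < soft_prob:
        return 1.0
    midpoint = (soft_prob + hard_prob) / 2
    steepness = 6.0 / max(hard_prob - soft_prob, 0.01)
    t = 1.0 / (1.0 + math.exp(-steepness * (cp_prob - midpoint)))
    return 1.0 + t * (scale_max - 1.0)


for p in (0.1, 0.3, 0.35, 0.5, 0.69, 0.7):
    print(f"P(changepoint)={p:.2f} → scale={threshold_scale(p):.3f}")
```

With the defaults the ramp is not continuous at the edges. It jumps from 1.0 to ≈1.047 at soft_prob and from ≈1.95 to the hard block at hard_prob. The value ≈1.095 at P=0.35 matches the "scale≈1.1" in Scenario B.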
File: src/trading/strategy.py
_BetaRegimeState dataclass
@dataclass
class _BetaRegimeState:
    """β regime detection result"""
    regime: str                # 'stable' | 'expanding'
    changepoint_prob: float    # changepoint probability P(r_t < warmup)
    expected_run_length: float
    kalman_P_beta: float
    threshold_scale: float     # threshold scaling factor (>= 1.0)
    hard_block: bool
    reason: str
_BOCPDDetector class
import math

import numpy as np
from scipy.special import gammaln


class _BOCPDDetector:
    """Bayesian Online Changepoint Detection (Adams & MacKay, 2007)

    Monitors distribution changes in the Kalman normalized_innovation and detects
    structural changepoints in β through the run-length posterior.

    Memory:  O(max_run_length) per pair
    Compute: O(max_run_length) per update
    """

    # Normal-Inverse-Gamma prior hyperparameters
    _MU0 = 0.0
    _KAPPA0 = 1.0
    _ALPHA0 = 1.0
    _BETA0 = 0.5

    def __init__(self, hazard_rate: float = 1/50, max_run_length: int = 200):
        self._H = hazard_rate
        self._max_rl = max_run_length
        self._run_length_probs: dict[PairKey, np.ndarray] = {}
        self._suf_stats: dict[PairKey, dict] = {}
        self._last_kline_time: dict[PairKey, str] = {}
        self._n_updates: dict[PairKey, int] = {}

    def update(self, key: PairKey, normalized_innovation: float,
               kline_time: str) -> None:
        """Called on every 5m tick; deduplicated by kline_time so each 4h bar counts once"""
        if normalized_innovation is None:
            return
        if self._last_kline_time.get(key) == kline_time:
            return
        self._last_kline_time[key] = kline_time
        x = normalized_innovation

        # Initialize: all mass on run length 0, statistics at the prior
        if key not in self._run_length_probs:
            self._run_length_probs[key] = np.array([1.0])
            self._suf_stats[key] = {
                'kappa': np.array([self._KAPPA0]),
                'mu': np.array([self._MU0]),
                'alpha': np.array([self._ALPHA0]),
                'beta': np.array([self._BETA0]),
            }
            self._n_updates[key] = 0
        self._n_updates[key] = self._n_updates.get(key, 0) + 1

        probs = self._run_length_probs[key]
        ss = self._suf_stats[key]
        n_rl = len(probs)

        # Step 1: Student-t predictive probability under each run length
        kappa, mu, alpha, beta = ss['kappa'], ss['mu'], ss['alpha'], ss['beta']
        df = 2.0 * alpha
        scale = np.sqrt(np.maximum(beta * (kappa + 1.0) / (alpha * kappa), 1e-15))
        pred_probs = self._student_t_pdf(x, df, mu, scale)

        # Steps 2-3: growth (r → r+1) and changepoint (→ r = 0)
        growth_probs = probs * pred_probs * (1.0 - self._H)
        cp_prob = np.sum(probs * pred_probs * self._H)

        # Step 4: combine + normalize (reset to r = 0 if everything underflows)
        new_probs = np.empty(n_rl + 1)
        new_probs[0] = cp_prob
        new_probs[1:] = growth_probs
        total = np.sum(new_probs)
        if total > 0:
            new_probs /= total
        else:
            new_probs = np.zeros(n_rl + 1)
            new_probs[0] = 1.0

        # Step 5: update sufficient statistics (index 0 = fresh run at the prior)
        new_kappa = np.empty(n_rl + 1)
        new_mu = np.empty(n_rl + 1)
        new_alpha = np.empty(n_rl + 1)
        new_beta = np.empty(n_rl + 1)
        new_kappa[0], new_mu[0] = self._KAPPA0, self._MU0
        new_alpha[0], new_beta[0] = self._ALPHA0, self._BETA0
        new_kappa[1:] = kappa + 1.0
        new_mu[1:] = (kappa * mu + x) / (kappa + 1.0)
        new_alpha[1:] = alpha + 0.5
        new_beta[1:] = beta + kappa * (x - mu) ** 2 / (2.0 * (kappa + 1.0))

        # Step 6: truncate the longest run lengths and renormalize
        if len(new_probs) > self._max_rl:
            new_probs = new_probs[:self._max_rl]
            new_kappa = new_kappa[:self._max_rl]
            new_mu = new_mu[:self._max_rl]
            new_alpha = new_alpha[:self._max_rl]
            new_beta = new_beta[:self._max_rl]
            total = np.sum(new_probs)
            if total > 0:
                new_probs /= total

        self._run_length_probs[key] = new_probs
        self._suf_stats[key] = {
            'kappa': new_kappa, 'mu': new_mu,
            'alpha': new_alpha, 'beta': new_beta,
        }

    @staticmethod
    def _student_t_pdf(x: float, df: np.ndarray, loc: np.ndarray,
                       scale: np.ndarray) -> np.ndarray:
        """Location-scale Student-t density, evaluated element-wise in log space"""
        z = (x - loc) / scale
        log_pdf = (
            gammaln((df + 1) / 2) - gammaln(df / 2)
            - 0.5 * np.log(df * np.pi) - np.log(scale)
            - (df + 1) / 2 * np.log1p(z * z / df)
        )
        return np.exp(log_pdf)

    def check(self, key: PairKey, soft_prob: float, hard_prob: float,
              scale_max: float, warmup: int) -> _BetaRegimeState:
        """Check the current β regime state

        Changepoint probability: P(r_t < warmup), i.e. whether a changepoint
        occurred within the last warmup steps.
        """
        n = self._n_updates.get(key, 0)
        if n < warmup:
            return _BetaRegimeState('stable', 0.0, float(n), 0.0, 1.0, False, "insufficient data")
        probs = self._run_length_probs.get(key)
        if probs is None or len(probs) == 0:
            return _BetaRegimeState('stable', 0.0, 0.0, 0.0, 1.0, False, "no data")
        cp_prob = float(np.sum(probs[:min(warmup, len(probs))]))
        expected_rl = float(np.sum(np.arange(len(probs)) * probs))
        if cp_prob >= hard_prob:
            return _BetaRegimeState(
                'expanding', cp_prob, expected_rl, 0.0, scale_max, True,
                f"Beta hard block: P(changepoint)={cp_prob:.3f}>={hard_prob} E[rl]={expected_rl:.1f}"
            )
        if cp_prob >= soft_prob:
            # Logistic ramp between 1.0 and scale_max, centered halfway between soft_prob and hard_prob
            midpoint = (soft_prob + hard_prob) / 2
            steepness = 6.0 / max(hard_prob - soft_prob, 0.01)
            t = 1.0 / (1.0 + math.exp(-steepness * (cp_prob - midpoint)))
            scale = 1.0 + t * (scale_max - 1.0)
            return _BetaRegimeState(
                'expanding', cp_prob, expected_rl, 0.0, scale, False,
                f"Beta scaling: P(changepoint)={cp_prob:.3f} scale={scale:.2f} E[rl]={expected_rl:.1f}"
            )
        return _BetaRegimeState(
            'stable', cp_prob, expected_rl, 0.0, 1.0, False,
            f"Beta stable: P(changepoint)={cp_prob:.3f} E[rl]={expected_rl:.1f}"
        )

    def cleanup_pair(self, key: PairKey) -> None:
        self._run_length_probs.pop(key, None)
        self._suf_stats.pop(key, None)
        self._last_kline_time.pop(key, None)
        self._n_updates.pop(key, None)
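The toy replay below restates the same recursion (prior, growth/changepoint split, sufficient statistics) without run-length truncation. It is a dependency-free sketch: it uses `math.lgamma` in place of scipy's `gammaln` and is not the production class. It feeds well-behaved innovations followed by a persistent mean shift and records P(r_t < 5) after each bar:

```python
import math

import numpy as np

MU0, KAPPA0, ALPHA0, BETA0 = 0.0, 1.0, 1.0, 0.5  # same NIG prior as _BOCPDDetector
_lgamma = np.vectorize(math.lgamma)


def student_t_pdf(x, df, loc, scale):
    z = (x - loc) / scale
    log_pdf = (_lgamma((df + 1) / 2) - _lgamma(df / 2)
               - 0.5 * np.log(df * np.pi) - np.log(scale)
               - (df + 1) / 2 * np.log1p(z * z / df))
    return np.exp(log_pdf)


def replay_bocpd(xs, hazard=1 / 50, window=5):
    """Return P(r_t < window) after each observation."""
    probs = np.array([1.0])
    kappa, mu = np.array([KAPPA0]), np.array([MU0])
    alpha, beta = np.array([ALPHA0]), np.array([BETA0])
    out = []
    for x in xs:
        scale = np.sqrt(beta * (kappa + 1.0) / (alpha * kappa))
        pred = student_t_pdf(x, 2.0 * alpha, mu, scale)
        cp = np.sum(probs * pred * hazard)      # mass that resets to r_t = 0
        growth = probs * pred * (1.0 - hazard)  # mass that moves r -> r + 1
        probs = np.concatenate(([cp], growth))
        probs /= probs.sum()
        # The right-hand side is evaluated with the old statistics before rebinding.
        kappa, mu, alpha, beta = (
            np.concatenate(([KAPPA0], kappa + 1.0)),
            np.concatenate(([MU0], (kappa * mu + x) / (kappa + 1.0))),
            np.concatenate(([ALPHA0], alpha + 0.5)),
            np.concatenate(([BETA0], beta + kappa * (x - mu) ** 2 / (2.0 * (kappa + 1.0)))),
        )
        out.append(float(probs[:window].sum()))
    return out


stable = [0.5, -0.5] * 40  # well-behaved innovations (80 bars)
shifted = [3.0] * 8        # persistent mean shift, as when β starts moving
p = replay_bocpd(stable + shifted)
```

In this replay, P(r_t < 5) stays low on stable data and is close to 1 within a few bars of the shift. It then falls back once the new run has lasted longer than the window. P(r_t < warmup) measures "a changepoint within the last warmup bars", so a persistent new regime eventually reads as stable again.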
3.3 Cross-pair systemic risk aggregation
File: src/trading/strategy.py
class _SystemicRiskAggregator:
    """Aggregates the BOCPD regime state of all pairs. When the fraction of pairs
    in the expanding regime reaches systemic_threshold, a global block is triggered.
    Stateless: recomputed on every call.
    """

    def __init__(self, enabled: bool = True):
        self._enabled = enabled

    def check(self, pair_states: dict[PairKey, _BetaRegimeState],
              systemic_threshold: float = 0.3) -> tuple[bool, str]:
        if not self._enabled or not pair_states:
            return False, ""
        total = len(pair_states)
        n_expanding = sum(1 for s in pair_states.values() if s.regime == 'expanding')
        ratio = n_expanding / total
        avg_cp = sum(s.changepoint_prob for s in pair_states.values()) / total
        if ratio >= systemic_threshold:
            return True, (
                f"Systemic risk: {n_expanding}/{total} ({ratio:.0%}) pairs in expanding regime, "
                f"mean P(changepoint)={avg_cp:.3f}"
            )
        return False, ""
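3.4 β-weighted position sizing (Purpose 2)
Design rationale
Standard pair-trading hedge equation:
spread_t = log(P_alt_t) - β × log(P_base_t) - α
One unit of spread means $1 long Alt against $β short Base. For the position to carry no exposure to moves in P_base, the two legs' notionals must satisfy:
base_notional = |β| × alt_notional
The current system uses alt_notional = base_notional (equal notional). This implicitly assumes β=1, which is wrong for every pair whose β differs from 1.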
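The derivation in notional terms is shown below. N_alt and N_base are the two legs' dollar notionals and T is total_position (this notation is introduced only for the derivation). The return model is the one the Kalman Filter estimates:

```latex
\begin{aligned}
\text{PnL} &\approx N_{alt}\, r_{alt} - N_{base}\, r_{base},
  \qquad r_{alt} = \alpha + \beta\, r_{base} + \varepsilon \\
&= N_{alt}\,(\alpha + \varepsilon) + \left(\beta N_{alt} - N_{base}\right) r_{base} \\
\beta N_{alt} - N_{base} = 0
  &\;\Rightarrow\; N_{base} = \beta\, N_{alt} \\
N_{alt} + N_{base} = T
  &\;\Rightarrow\; N_{alt} = \frac{T}{1+\beta}, \qquad N_{base} = \frac{\beta\, T}{1+\beta}
\end{aligned}
```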
hedge_beta selection logic
import numpy as np

from src.config import HEDGE_BETA_MAX, HEDGE_BETA_MIN, HEDGE_BETA_P_MAX


def resolve_hedge_beta(
    kalman_beta: float | None,
    kalman_P_beta: float | None,
    ols_beta: float | None,
    p_beta_max: float = HEDGE_BETA_P_MAX,
    beta_min: float = HEDGE_BETA_MIN,
    beta_max: float = HEDGE_BETA_MAX,
) -> tuple[float, str]:
    """Choose the hedge_beta used for position sizing

    Prefer the Kalman β; fall back to the OLS β when the Kalman uncertainty is
    too high, and finally default to β=1.0.

    Returns:
        (hedge_beta, source), where source is 'kalman' | 'ols' | 'default'
    """
    # Prefer the Kalman β
    if kalman_beta is not None and kalman_P_beta is not None:
        if kalman_P_beta <= p_beta_max:
            beta = np.clip(abs(kalman_beta), beta_min, beta_max)
            return float(beta), 'kalman'
        # P_β too large: the Kalman estimate is uncertain (cold start or mid regime switch), so fall back
        logger.debug(f"Kalman P_β={kalman_P_beta:.4f} > {p_beta_max}, falling back to OLS β")

    # Fall back to the OLS β
    if ols_beta is not None:
        beta = np.clip(abs(ols_beta), beta_min, beta_max)
        return float(beta), 'ols'

    # Last resort
    return 1.0, 'default'
File: src/trading/risk_manager.py
Modify calculate_position_size() to accept a hedge_beta parameter:
def calculate_position_size(
    self,
    signal: PairTradeSignal,
    alt_price: float,
    base_price: float = 0.0,
    available_balance: float = 0.0,
    hedge_beta: float = 1.0,      # [new] hedge ratio
    hedge_beta_source: str = '',  # [new] source tag ('kalman'/'ols'/'default')
) -> tuple[float, float]:
    """Compute position size (β-weighted)

    β weighting:
        alt_notional  = total_position / (1 + |β|)
        base_notional = |β| × alt_notional
        total notional = alt_notional + base_notional = total_position (unchanged)
    With β=1.0 this reduces to equal notional (backward compatible).
    """
    # ... existing base_usd / signal-strength scaling / cap check / balance check logic unchanged ...

    # Convert to coin quantities (β-weighted)
    alt_size = 0.0
    base_size = 0.0
    if self._config.pair_mode == "pair" and base_price > 0 and alt_price > 0:
        abs_beta = max(abs(hedge_beta), 0.1)
        total_position = position_usd * 2  # total notional of both legs (the old logic used position_usd per leg)
        alt_notional = total_position / (1.0 + abs_beta)
        base_notional = abs_beta * alt_notional  # $|β| of Base hedges each $1 of Alt
        alt_size = alt_notional / alt_price
        base_size = base_notional / base_price
        logger.info(
            f"Position sizing (β-weighted): budget=${base_usd:.0f} × scale={scale} | "
            f"β={hedge_beta:.3f}({hedge_beta_source}) | "
            f"Alt: {alt_size:.2f} × ${alt_price:.4f} ≈ ${alt_notional:.0f} | "
            f"Base: {base_size:.2f} × ${base_price:.4f} ≈ ${base_notional:.0f} | "
            f"alt:base = 1:{base_notional / alt_notional:.2f}"
        )
    elif alt_price > 0:
        alt_size = position_usd / alt_price
        logger.info(
            f"Position sizing (single leg): budget=${base_usd:.0f} × scale={scale} = ${position_usd:.0f} | "
            f"open: {alt_size:.2f} × ${alt_price:.4f}"
        )
    return alt_size, base_size
β-weighted notional allocation:
| β | alt_notional | base_notional | base/alt |
|---|---|---|---|
| 0.3 | $154 | $46 | 0.3 |
| 0.5 | $133 | $67 | 0.5 |
| 1.0 | $100 | $100 | 1.0 (equal notional, backward compatible) |
| 2.0 | $67 | $133 | 2.0 |
| 3.0 | $50 | $150 | 3.0 |
(Example: $100 per leg as the baseline, total_position = $200)
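The table can be regenerated from the allocation rule base_notional = |β| × alt_notional, which is the notional-neutral hedge under r_alt ≈ α + β·r_base, with total notional held fixed. `split_notional` is a hypothetical standalone helper. It mirrors the β floor of 0.1 in calculate_position_size() and is not part of the codebase:

```python
def split_notional(total_position: float, hedge_beta: float, beta_floor: float = 0.1):
    """Split a two-leg budget so that base_notional = |β| × alt_notional."""
    b = max(abs(hedge_beta), beta_floor)
    alt_notional = total_position / (1.0 + b)
    base_notional = b * alt_notional
    return alt_notional, base_notional


for beta in (0.3, 0.5, 1.0, 2.0, 3.0):
    alt, base = split_notional(200.0, beta)
    print(f"beta={beta}: alt=${alt:.0f} base=${base:.0f} base/alt={base / alt:.1f}")
```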
File: src/trading/models.py
New PairTradeSignal fields:
@dataclass
class PairTradeSignal:
    # ... existing fields ...
    hedge_beta: float = 1.0             # [new] β used for the leg ratio
    hedge_beta_source: str = 'default'  # [new] β source ('kalman'/'ols'/'default')
New PairPosition field:
@dataclass
class PairPosition:
    # ... existing fields ...
    entry_hedge_beta: float = 1.0  # [new] hedge β at entry (for post-trade analysis)
3.5 Orchestration-layer integration
File: src/trading/orchestrator.py
New state: _kalman_states: dict[PairKey, dict]
class TradingOrchestrator:
    def __init__(self, ...):
        # ... existing ...
        self._kalman_states: dict[PairKey, dict] = {}  # [new] in-memory Kalman state cache
process_analysis() changes: extract the Kalman outputs from multi_period_result and pass them to strategy.process_tick() and on_entry_signal():
def process_analysis(self, symbol, z4h, multi_period_result, timestamp, ...):
    # ... existing input validation ...

    # [new] Extract the Kalman outputs
    kalman_beta = multi_period_result.get('kalman_beta')
    kalman_P_beta = multi_period_result.get('kalman_P_beta')
    kalman_innovation = multi_period_result.get('kalman_innovation')

    # [new] Update the Kalman state cache
    kalman_state_new = multi_period_result.get('kalman_state')
    if kalman_state_new is not None:
        self._kalman_states[(symbol, base_symbol)] = kalman_state_new

    entry_signal, exit_signal = self._strategy.process_tick(
        symbol, base_symbol, z4h, timestamp,
        kline_time=kline_time, latest_price=price_for_log,
        kalman_innovation=kalman_innovation,  # [new] purpose 1
        kalman_P_beta=kalman_P_beta,          # [new] purpose 1
        alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv,
    )

    # Handle the entry signal
    if entry_signal is not None:
        # [new] Compute hedge_beta (purpose 2)
        ols_beta = multi_period_result.get('details', {}).get(
            ('4h', '60d'), {}).get('cointegration_new', {}).get('beta')
        hedge_beta, hedge_source = resolve_hedge_beta(
            kalman_beta, kalman_P_beta, ols_beta
        )
        self.on_entry_signal(
            symbol, multi_period_result,
            latest_alt_price=price_for_log,
            direction=entry_signal.direction,
            adaptive_z=entry_signal.adaptive_z,
            hedge_beta=hedge_beta,             # [new]
            hedge_beta_source=hedge_source,    # [new]
        )
on_entry_signal() changes: pass hedge_beta into the signal:
def on_entry_signal(
    self, symbol, multi_period_result,
    latest_alt_price=None, avg_zscore_4h=None,
    direction="", signal_strength="medium", adaptive_z=0.0,
    l2_snapshot=None,
    hedge_beta: float = 1.0,             # [new]
    hedge_beta_source: str = 'default',  # [new]
) -> bool:
    # ...
    signal = PairTradeSignal(
        # ... existing fields ...
        hedge_beta=hedge_beta,                # [new]
        hedge_beta_source=hedge_beta_source,  # [new]
    )
    # ...
File: src/trading/position_manager.py
_open_position_inner() changes: pass hedge_beta to risk_manager:
def _open_position_inner(self, signal: PairTradeSignal, adaptive_z: float = 0.0):
    # ... existing price retrieval ...
    alt_size, base_size = self._risk_manager.calculate_position_size(
        signal, alt_price, base_price, available_balance,
        hedge_beta=signal.hedge_beta,                # [new]
        hedge_beta_source=signal.hedge_beta_source,  # [new]
    )
    position = PairPosition(
        # ... existing fields ...
        entry_hedge_beta=signal.hedge_beta,  # [new] record the entry β
    )
3.6 Strategy-layer integration (Purpose 1)
File: src/trading/strategy.py
__init__:
self._bocpd = _BOCPDDetector(
    hazard_rate=default_params.beta_regime_hazard_rate,
    max_run_length=default_params.beta_regime_max_run_length,
)
self._systemic_aggregator = _SystemicRiskAggregator(enabled=True)
self._all_pair_states: dict[PairKey, _BetaRegimeState] = {}
New process_tick() signature:
def process_tick(
    self, symbol, base_symbol, z4h, timestamp,
    kline_time=None, latest_price=None,
    kalman_innovation: float | None = None,  # [new] purpose 1
    kalman_P_beta: float | None = None,      # [new] purpose 1
    alt_ohlcv=None, base_ohlcv=None,
) -> tuple[EntrySignal | None, ExitSignal | None]:
New-bar update block in _process_tick_unlocked:
if is_new_candle:
    # ... existing: Welford update, EMA update ...
    if kalman_innovation is not None and kline_time is not None:
        self._bocpd.update(key, kalman_innovation, str(kline_time))
_check_entry(), after the z4h filter and before the direction decision:
# ── [new] β regime check ──
if params.beta_regime_enabled:
    beta_state = self._bocpd.check(
        key,
        soft_prob=params.beta_regime_soft_prob,
        hard_prob=params.beta_regime_hard_prob,
        scale_max=params.beta_regime_scale_max,
        warmup=params.beta_regime_warmup,
    )
    self._all_pair_states[key] = beta_state
    if beta_state.hard_block:
        logger.info(f"Beta regime hard block | {pair_label} | {beta_state.reason} | "
                    f"az={adaptive_z:+.4f} z4h={z4h:+.4f}")
        return None
    is_systemic, systemic_reason = self._systemic_aggregator.check(self._all_pair_states)
    if is_systemic:
        logger.info(f"Systemic risk block | {pair_label} | {systemic_reason}")
        return None
    threshold_scale = beta_state.threshold_scale
else:
    beta_state = None
    threshold_scale = 1.0

# ── Direction decision (with β scaling applied) ──
threshold = params.adaptive_threshold * threshold_scale
if adaptive_z < -threshold:
    direction = 'long'
elif adaptive_z > threshold:
    direction = 'short'
else:
    if threshold_scale > 1.01:
        logger.info(f"Beta regime scaling block | {pair_label} | "
                    f"az={adaptive_z:+.4f} effective threshold={threshold:.2f} "
                    f"(base={params.adaptive_threshold} ×{threshold_scale:.2f}) | "
                    f"{beta_state.reason if beta_state else ''}")
    return None
cleanup_pair():
self._bocpd.cleanup_pair(key)
self._all_pair_states.pop(key, None)
4. End-to-end data flow
1. WebSocket 4h bar closes
↓
2. realtime_kline_service_base triggers analysis
↓
3. analyze_multi_period()
├─ calculate_cointegration_params_dual_window(kalman_state=cached state)
│ ├─ OLS regression → β_ols, spread, adf_pvalue (existing)
│ └─ [new] Kalman update → kalman_beta, innovation, P_β, kalman_state
├─ health monitoring → Gate2 (existing)
└─ output: multi_period_result (including the kalman_* fields)
↓
4. orchestrator.process_analysis()
├─ cache kalman_state → self._kalman_states[pair_key]
├─ pass kalman_innovation, kalman_P_beta → strategy.process_tick()
│ ├─ [purpose 1] BOCPD.update(innovation) → regime detection
│ └─ [purpose 1] _check_entry() → hard block / threshold scaling
├─ if an EntrySignal is produced:
│ ├─ [purpose 2] resolve_hedge_beta(kalman_beta, P_β, ols_beta) → hedge_beta
│ └─ on_entry_signal(hedge_beta=...) → PairTradeSignal
↓
5. position_manager.open_position(signal)
├─ risk_manager.calculate_position_size(hedge_beta=signal.hedge_beta)
│ ├─ alt_notional = total / (1 + |β|)
│ └─ base_notional = |β| × alt_notional
├─ executor.market_open(alt_size, base_size)
└─ PairPosition(entry_hedge_beta=β)
↓
6. Persistence
├─ pair_positions: includes entry_hedge_beta
└─ trading_signals: includes hedge_beta, hedge_beta_source
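5. Scenario walkthroughs
A: Normal market (stable β): both purposes behave normally
innovation ~ N(0,1): z = [-0.3, 0.8, -0.5, 0.2, ...]
kalman_beta ≈ 0.45, P_β < 0.01
Purpose 1: BOCPD P(changepoint) ≈ 0.02~0.05 → regime=STABLE, scale=1.0 → normal entry
Purpose 2: hedge_beta=0.45 (kalman) → base_notional = 0.45 × alt_notional
A smaller Base hedge than equal notional, correctly reflecting Alt's weaker (β<1) co-movement with Base
B: β starts surging: Purpose 1 blocks, Purpose 2 adjusts automatically
T=0h: kalman_beta ≈ 0.5 → normal
T=12h: kalman_beta ≈ 1.2, innovation persistently positive
Purpose 1: P(changepoint) ≈ 0.35 > soft_prob → EXPANDING, scale ≈ 1.1
Purpose 2: hedge_beta=1.2 → larger Base-leg notional (if an entry were made)
T=16h: P(changepoint) ≈ 0.75 > hard_prob → hard block
Purpose 2: not executed (entry blocked)
C: β stabilizes after surging: the new β is used after recovery
During the surge P(changepoint) = 0.85 → hard block
After stabilization the innovation returns to N(0,1):
→ after ~6-8 normal bars (24-32h) P(changepoint) < soft_prob → recovery
After recovery:
Purpose 1: regime=STABLE → entries allowed
Purpose 2: kalman_beta has already tracked the new β (e.g. 1.8), so new positions automatically use the new ratio
unlike OLS, which would still be using the old β≈0.6
D: Weakly correlated pair with β≈0.3: the hedge-ratio correction matters most
OLS β ≈ 0.31, Kalman β ≈ 0.29
Equal notional (current system):
Alt $100 + Base $100
Alt carries only 0.3 × $100 = $30 of Base exposure, so the $100 Base short over-hedges by $70 (net short $70 of Base)
β-weighted entry:
alt_notional = $200 / 1.3 ≈ $154
base_notional = 0.3 × $154 ≈ $46
→ Base $46 exactly offsets the 0.3× Base exposure carried by Alt $154
E: Kalman cold start: fallback strategy
After a system restart the kalman_state is lost:
T=0: initialize the Kalman Filter from OLS β, P₀ = diag(0.1, 1.0)
P_β = 1.0 > HEDGE_BETA_P_MAX (0.5)
→ resolve_hedge_beta falls back to OLS β
T=3-5 bars (12-20h), provided R starts near the actual return-residual variance:
the Kalman Filter converges, P_β < 0.1
→ resolve_hedge_beta switches back to Kalman β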
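How quickly P_β falls below HEDGE_BETA_P_MAX depends strongly on R. The sketch runs only the covariance recursion of the 2-state filter, which does not depend on the observed r_alt. It assumes a hypothetical |r_btc| = 0.02 on every bar, alternating in sign to keep the result deterministic:

```python
import numpy as np


def p_beta_after(n_bars: int, R: float, p0=(0.1, 1.0), q=(1e-5, 1e-4), r_btc: float = 0.02) -> float:
    """P_β after n_bars steps of the predict + Joseph-update covariance recursion."""
    P = np.diag(p0).astype(float)
    Q = np.diag(q).astype(float)
    for t in range(n_bars):
        H = np.array([1.0, r_btc if t % 2 == 0 else -r_btc])  # alternating-sign BTC moves
        P_pred = P + Q
        S = float(H @ P_pred @ H) + R
        K = (P_pred @ H) / S
        A = np.eye(2) - np.outer(K, H)
        P = A @ P_pred @ A.T + R * np.outer(K, K)
    return float(P[1, 1])


p_small_r = p_beta_after(5, R=1e-4)  # R on the scale of a ~1% per-bar return residual
p_large_r = p_beta_after(5, R=1e-2)  # R 100x larger (illustrative)
print(f"P_beta after 5 bars: R=1e-4 → {p_small_r:.3f}, R=1e-2 → {p_large_r:.3f}")
```

With R near 1e-4, P_β drops to roughly 0.05 within five bars. With R = 1e-2 it is still above 0.5 after five bars, so resolve_hedge_beta would keep using OLS β much longer.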
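F: Systemic risk
8 of 20 pairs have P(changepoint) ≥ 0.3 → 40% ≥ systemic_threshold = 30%
→ systemic-risk block → all pairs pause new entries
→ hedge_beta keeps updating in the background, and the latest value is used after recovery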
6. Changed files
| File | Change | Scope |
|---|---|---|
| src/config.py | +7 Kalman constants + 3 hedge-ratio constants | Config layer |
| src/utils/analysis/analysis_core.py | +VectorKalmanBetaEstimator class; extend calculate_cointegration_params_dual_window(), analyze_pair_advanced(), analyze_multi_period() | Analysis layer |
| src/trading/config.py | StrategyParams +7 fields; extend get_strategy_params(), _build_strategy_params(), load_trading_config() | Config layer |
| src/trading/models.py | PairTradeSignal +2 fields (hedge_beta, hedge_beta_source); PairPosition +1 field (entry_hedge_beta) | Data model |
| src/trading/orchestrator.py | +_kalman_states dict; extend process_analysis() to extract the Kalman outputs and compute hedge_beta; extend on_entry_signal() to pass hedge_beta | Orchestration layer |
| src/trading/strategy.py | +_BetaRegimeState, _BOCPDDetector, _SystemicRiskAggregator; extend process_tick(), _check_entry(), cleanup_pair() | Strategy layer |
| src/trading/risk_manager.py | Extend calculate_position_size() with β weighting | Risk layer |
| src/trading/position_manager.py | Extend _open_position_inner() to pass hedge_beta | Position management |
New dependency: scipy.special.gammaln (scipy is already in the project)
Unchanged: momentum_filter.py, executor.py
7. DB changes
New column on the pair_positions table
ALTER TABLE pair_positions ADD COLUMN entry_hedge_beta DOUBLE PRECISION DEFAULT 1.0;
New columns on the trading_signals table
ALTER TABLE trading_signals ADD COLUMN hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta_source VARCHAR(10) DEFAULT 'default';
8. Logging
| When | Level | Format |
|---|---|---|
| Initialization | INFO | `Beta regime tracker initialized \| BOCPD hazard=0.020 soft=0.30 hard=0.70` |
| Kalman update | DEBUG | `Kalman update \| PURR\|HYPE \| α̂=0.002 β̂=1.20 P_β=0.005 innov=2.85` |
| hedge_beta selection | DEBUG | `hedge_beta=0.45(kalman) P_β=0.003 \| OLS_β=0.48` |
| hedge_beta fallback | DEBUG | `Kalman P_β=0.62 > 0.50, falling back to OLS β=0.48` |
| β-weighted sizing | INFO | `Position sizing (β-weighted) \| β=0.45(kalman) \| Alt ≈$138 \| Base ≈$62 \| alt:base = 1:0.45` |
| Hard block | INFO | `Beta regime hard block \| PURR\|HYPE \| P(changepoint)=0.850>=0.70 E[rl]=1.2` |
| Scaling block | INFO | `Beta regime scaling block \| PURR\|HYPE \| az=-3.50 effective threshold=4.00 (3.0×1.33)` |
| Systemic risk | INFO | `Systemic risk: 8/20 (40%) pairs in expanding regime` |
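9. Risks and edge cases
| Risk | Severity | Mitigation |
|---|---|---|
| Hedge ratio of open positions drifts | High | This change applies the β ratio only at entry; rebalancing during the holding period is a P0 follow-up |
| β sign flip (kalman_beta < 0) | Medium | resolve_hedge_beta takes the absolute value, plus the lower bound HEDGE_BETA_MIN=0.1 |
| Kalman Q mis-specified | Medium | Adaptive R + Huber clipping + backtest tuning |
| Kalman state lost on restart | Medium | Re-initialize from OLS; converges in 3-5 bars; automatic fallback to OLS β while P_β is too large |
| BOCPD cold start | Medium | Stay at the stable default during warmup=5 (20h) |
| Low-volatility period with r_btc≈0 | Low | H≈[1,0], so the update goes mainly to α (the intended behavior); β moves only through the α/β cross-covariance P[0,1], so hedge_beta stays close to its previous value |
| β weighting produces extreme positions | Low | Bounds HEDGE_BETA_MIN=0.1 and HEDGE_BETA_MAX=5.0 |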
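The r_btc≈0 row can be checked from the gain formula K = P_t|t-1·H'/S. With H = [1, 0], the β component of the gain is P[1,0]/S, so β moves only to the extent that the α and β estimates have become correlated. The covariance values below are illustrative:

```python
import numpy as np


def kalman_gain(P_pred: np.ndarray, r_btc: float, R: float) -> np.ndarray:
    """Gain K = P_pred·H' / S for the observation vector H = [1, r_btc]."""
    H = np.array([1.0, r_btc])
    S = float(H @ P_pred @ H) + R
    return (P_pred @ H) / S


R = 1e-4
P_uncorrelated = np.diag([1e-4, 5e-3])
P_correlated = np.array([[1e-4, -2e-5],
                         [-2e-5, 5e-3]])  # α/β estimates have become correlated

K_unc = kalman_gain(P_uncorrelated, r_btc=0.0, R=R)  # β gain is exactly 0
K_cor = kalman_gain(P_correlated, r_btc=0.0, R=R)    # β gain is -0.1
```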
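Out of scope for this change: exit-logic adjustments, hedge rebalancing during the holding period, HMM regime classification, persisting the Kalman state to the DB, Feishu alerts, adaptive Q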
10. Verification plan
Unit tests
# test_vector_kalman.py
import numpy as np

from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator


def test_kalman_convergence():
    """Converges from the initial value to the true [α, β]"""
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        r_btc = rng.normal(0, 0.02)
        r_alt = 0.001 + 1.0 * r_btc + rng.normal(0, 0.01)
        result = kf.update(r_btc, r_alt)
    assert abs(result['beta'] - 1.0) < 0.15
    assert result['alpha'] > 0


def test_kalman_innovation_spike():
    """The innovation spikes immediately when β jumps"""
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(50):
        r_btc = rng.normal(0, 0.02)  # draw once so r_alt really depends on r_btc (true β = 0.5)
        kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.01))
    result = kf.update(0.05, 3.0 * 0.05)  # β jumps from 0.5 to 3.0 on a 5% BTC move
    assert abs(result['normalized_innovation']) > 2.0


def test_kalman_huber_clipping():
    """Extreme innovations are clipped, so β does not jump excessively"""
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2, clip_sigma=3.0)
    rng = np.random.default_rng(42)
    for _ in range(50):
        r_btc = rng.normal(0, 0.02)
        kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.01))
    result = kf.update(0.02, 20.0 * 0.02)
    assert result['clipped'] is True
    assert result['beta'] < 5.0


def test_kalman_joseph_positive():
    """The Joseph form keeps the diagonal of P non-negative"""
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-6)
    rng = np.random.default_rng(42)
    for _ in range(500):
        r_btc = rng.normal(0, 0.05)
        result = kf.update(r_btc, 0.5 * r_btc + rng.normal(0, 0.001))
        assert result['P_beta'] >= 0 and result['P_alpha'] >= 0


def test_kalman_alpha_absorbs_drift():
    """α absorbs independent drift, so β is not contaminated"""
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        r_btc = rng.normal(0, 0.02)
        result = kf.update(r_btc, 0.005 + 0.5 * r_btc + rng.normal(0, 0.01))
    assert abs(result['beta'] - 0.5) < 0.2
    assert result['alpha'] > 0.001


def test_kalman_state_persistence():
    """Behavior is identical after exporting and restoring the state"""
    kf1 = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2)
    kf1.update(0.02, 0.01)
    kf2 = VectorKalmanBetaEstimator.from_state_dict(kf1.state_dict())
    r1, r2 = kf1.update(0.03, 0.015), kf2.update(0.03, 0.015)
    assert abs(r1['beta'] - r2['beta']) < 1e-10
# test_hedge_beta.py
def test_resolve_hedge_beta_kalman():
    """Kalman β takes priority when P_β is acceptable."""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.01, ols_beta=0.5)
    assert source == 'kalman'
    assert abs(beta - 0.45) < 0.01
def test_resolve_hedge_beta_fallback_ols():
    """Falls back to OLS when P_β is too large."""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.8, ols_beta=0.5)
    assert source == 'ols'
    assert abs(beta - 0.5) < 0.01
def test_resolve_hedge_beta_fallback_default():
    """Falls back to 1.0 when no β is available."""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source = resolve_hedge_beta(kalman_beta=None, kalman_P_beta=None, ols_beta=None)
    assert source == 'default'
    assert beta == 1.0
def test_resolve_hedge_beta_clipping():
    """Out-of-range β is clipped."""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, _ = resolve_hedge_beta(kalman_beta=10.0, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 5.0  # HEDGE_BETA_MAX
    beta, _ = resolve_hedge_beta(kalman_beta=0.01, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 0.1  # HEDGE_BETA_MIN
def test_position_size_beta_weighted():
    """β-weighted sizing: alt_notional = β × base_notional."""
    # β=0.5, total=200 → base=133.3, alt=66.7
    abs_beta = 0.5
    total = 200.0
    base_notional = total / (1.0 + abs_beta)
    alt_notional = abs_beta * base_notional
    assert abs(base_notional - 133.33) < 0.5
    assert abs(alt_notional - 66.67) < 0.5
    assert abs(base_notional + alt_notional - total) < 0.01
def test_position_size_beta_one():
    """β=1.0 reduces to equal notional (backward compatible)."""
    abs_beta = 1.0
    total = 200.0
    base_notional = total / (1.0 + abs_beta)
    alt_notional = abs_beta * base_notional
    assert abs(base_notional - 100.0) < 0.01
    assert abs(alt_notional - 100.0) < 0.01
def test_position_size_negative_beta():
    """A negative β is replaced by its absolute value."""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, _ = resolve_hedge_beta(kalman_beta=-0.8, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 0.8  # abs(-0.8)
# test_bocpd.py
from datetime import datetime, timedelta
import numpy as np
def _ts(i):
    """ISO timestamp of the i-th 4h bar, starting at 2024-01-01T00:00:00."""
    return (datetime(2024, 1, 1) + timedelta(hours=4 * i)).isoformat()
def test_bocpd_stable():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    for i in range(20):
        d.update(key, rng.normal(0, 1), _ts(i))
    s = d.check(key, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'stable' and s.changepoint_prob < 0.3
def test_bocpd_detects_shift():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    for i in range(10):
        d.update(key, rng.normal(0, 1), _ts(i))
    for i in range(10, 18):
        d.update(key, rng.normal(3.0, 1), _ts(i))
    s = d.check(key, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'expanding' and s.changepoint_prob > 0.3
def test_bocpd_hard_block():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    for i in range(10):
        d.update(key, rng.normal(0, 1), _ts(i))
    for i in range(10, 20):
        d.update(key, rng.normal(5.0, 0.5), _ts(i))
    s = d.check(key, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.hard_block and s.changepoint_prob > 0.7
def test_bocpd_recovery():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    for i in range(10):
        d.update(key, rng.normal(0, 1), _ts(i))
    for i in range(10, 20):
        d.update(key, rng.normal(4.0, 1), _ts(i))
    assert d.check(key, 0.3, 0.7, 2.0, 5).regime == 'expanding'
    for i in range(20, 35):
        d.update(key, rng.normal(0, 1), _ts(i))
    s = d.check(key, 0.3, 0.7, 2.0, 5)
    assert s.regime == 'stable' and s.changepoint_prob < 0.3
def test_bocpd_timestamp_dedup():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    # All 48 updates share one timestamp, so dedup should keep a single observation (< warmup=5)
    for _ in range(48):
        d.update(key, 5.0, "2024-01-01T00:00:00")
    assert d.check(key, 0.3, 0.7, 2.0, 5).reason == "数据不足"  # "insufficient data"
# test_systemic_risk.py
def test_systemic_risk_trigger():
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 4 else 'stable', 0.5 if i < 4 else 0.1,
        10.0, 0.01, 1.5 if i < 4 else 1.0, False, ""
    ) for i in range(10)}
    assert agg.check(states, 0.3)[0]  # 4/10 = 40% > 30%
def test_systemic_risk_no_trigger():
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 2 else 'stable', 0.5 if i < 2 else 0.1,
        10.0, 0.01, 1.5 if i < 2 else 1.0, False, ""
    ) for i in range(10)}
    assert not agg.check(states, 0.3)[0]  # 2/10 = 20% < 30%
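The hedge-β tests above pin down a selection order: use the Kalman β when its posterior variance P_β is small enough, otherwise the OLS β, otherwise 1.0; then drop the sign and clip to [HEDGE_BETA_MIN, HEDGE_BETA_MAX] = [0.1, 5.0]. The following is a minimal sketch that satisfies those tests, not the `src.trading.orchestrator` implementation. The P_β cut-off is an assumption: the tests only show that 0.01 is accepted and 0.8 is rejected, so `P_BETA_MAX = 0.1` is a hypothetical value, and the non-finite (NaN/inf) guard is likewise an added assumption.

```python
import math
from typing import Optional, Tuple

HEDGE_BETA_MIN = 0.1  # lower clip bound, from test_resolve_hedge_beta_clipping
HEDGE_BETA_MAX = 5.0  # upper clip bound, from test_resolve_hedge_beta_clipping
P_BETA_MAX = 0.1      # hypothetical: largest Kalman P_β still trusted for sizing


def _usable(x: Optional[float]) -> bool:
    """True if x is present and finite (assumed guard against NaN/inf)."""
    return x is not None and math.isfinite(x)


def resolve_hedge_beta(
    kalman_beta: Optional[float],
    kalman_P_beta: Optional[float],
    ols_beta: Optional[float],
) -> Tuple[float, str]:
    """Sketch: pick a hedge β by priority Kalman > OLS > default 1.0."""
    if _usable(kalman_beta) and _usable(kalman_P_beta) and kalman_P_beta <= P_BETA_MAX:
        beta, source = kalman_beta, "kalman"
    elif _usable(ols_beta):
        beta, source = ols_beta, "ols"
    else:
        return 1.0, "default"
    # Drop the sign, then clip the magnitude to the allowed hedge range.
    beta = min(max(abs(beta), HEDGE_BETA_MIN), HEDGE_BETA_MAX)
    return beta, source
```

Clipping after the source is chosen means an OLS fallback is bounded exactly like a Kalman estimate, so the sizing code never sees an extreme ratio regardless of where β came from.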
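Integration Validation
- Backtesting: replay historical windows in which β surged, measure detection delay and false-positive rate, and compare PnL between equal-notional and β-weighted sizing.
- Live monitoring: track the deviation of kalman_beta from the OLS β, the distribution of hedge_beta_source, and the trend of P_β.
- A/B comparison (optional): run equal-notional and β-weighted sizing side by side and compare hedge quality and drawdowns.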
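Measuring detection delay and false-positive rate in the backtest requires labelled changepoints. Below is a minimal scoring sketch. It assumes the changepoint bars inside the historical β-surge windows have been labelled by hand, and it treats an "alarm" as any 4h bar where the detector reports a soft or hard signal. The function `evaluate_detector` and the 6-bar (24h) tolerance are hypothetical.

```python
from typing import Dict, List, Optional, Sequence


def evaluate_detector(
    n_bars: int,
    true_cps: Sequence[int],
    alarms: Sequence[int],
    max_delay: int = 6,  # hypothetical tolerance: 6 × 4h bars = 24h
) -> Dict[str, object]:
    """Score alarm bars against labelled changepoint bars (indices 0..n_bars-1).

    An alarm inside [cp, cp + max_delay] counts as detecting cp; an alarm
    outside every such window is a false positive.
    """
    # Mark every bar that lies inside some detection window.
    in_window = [False] * n_bars
    for cp in true_cps:
        for i in range(cp, min(cp + max_delay + 1, n_bars)):
            in_window[i] = True

    alarm_bars = sorted({a for a in alarms if 0 <= a < n_bars})

    # Delay = first alarm in the window minus the changepoint bar; None = missed.
    delays: List[Optional[int]] = []
    for cp in true_cps:
        hits = [a for a in alarm_bars if cp <= a <= cp + max_delay]
        delays.append(hits[0] - cp if hits else None)

    # False-positive rate per bar: false alarms / bars outside all windows.
    false_pos = sum(1 for a in alarm_bars if not in_window[a])
    negatives = in_window.count(False)
    fpr = false_pos / negatives if negatives else 0.0
    return {"delays": delays, "false_positives": false_pos, "fpr": fpr}
```

For example, with 100 bars, one changepoint at bar 40 and alarms at bars 10, 42, 43 and 80, the changepoint is caught with a 2-bar (8h) delay, and bars 10 and 80 count as false positives out of 93 bars outside the window.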
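11. Future Work
| Priority | Direction | Expected Benefit |
|---|---|---|
| P0 | Exit protection (tighten stop-losses when β surges) | Fewer losses on open positions |
| P0 | Hedge rebalancing while in position (adjust the leg ratio when β drifts past a threshold) | Maintain hedge quality |
| P1 | Persist Kalman state to the DB | Eliminate the 20h cold-start window |
| P1 | Innovation-based adaptive Q | Automatically tuned Q for each pair |
| P2 | HMM regime classification | Finer-grained multi-regime scaling control |
| P2 | Tiered systemic-risk levels (warning/severe/critical) | Finer-grained global risk control |
| P3 | Feishu alerts | Human monitoring of regime switches and hedge_beta changes |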
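The P0 rebalancing item works as follows. While a position is open, compare the current hedge β with the β used at entry. If the relative change exceeds a threshold, move both legs back to the β-weighted split alt_notional = β × base_notional. Below is a minimal sketch, assuming gross notional (base + alt) is held constant across the rebalance. `plan_rebalance`, `RebalancePlan` and the 20% threshold are hypothetical. The clip bounds reuse HEDGE_BETA_MIN/MAX = 0.1/5.0 from the tests.

```python
from dataclasses import dataclass

HEDGE_BETA_MIN, HEDGE_BETA_MAX = 0.1, 5.0  # clip bounds taken from the tests
REBALANCE_REL_THRESHOLD = 0.2  # hypothetical: rebalance when β moves more than 20%


@dataclass
class RebalancePlan:
    should_rebalance: bool
    target_base: float
    target_alt: float
    delta_base: float  # > 0 means add to the base leg
    delta_alt: float   # > 0 means add to the alt leg


def plan_rebalance(
    entry_beta: float,
    current_beta: float,
    base_notional: float,
    alt_notional: float,
    threshold: float = REBALANCE_REL_THRESHOLD,
) -> RebalancePlan:
    """Hypothetical helper: re-split gross notional when the hedge β drifts."""
    beta = min(max(abs(current_beta), HEDGE_BETA_MIN), HEDGE_BETA_MAX)
    ref = max(abs(entry_beta), HEDGE_BETA_MIN)  # avoids division by zero
    if abs(beta - ref) / ref <= threshold:
        # Drift is within tolerance: keep the current legs unchanged.
        return RebalancePlan(False, base_notional, alt_notional, 0.0, 0.0)
    total = base_notional + alt_notional  # assumption: gross notional held fixed
    target_base = total / (1.0 + beta)
    target_alt = beta * target_base  # alt_notional = β × base_notional
    return RebalancePlan(
        True, target_base, target_alt,
        target_base - base_notional, target_alt - alt_notional,
    )
```

For a position opened at β=1.0 with 100 on each leg, a drop to β=0.5 triggers a rebalance to roughly 133.3 base and 66.7 alt. This matches the split in test_position_size_beta_weighted. A move to β=1.1 stays within the 20% band and leaves the legs unchanged.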