开仓动量过滤器设计方案6

开仓动量过滤器设计方案

版本:v5.1
日期:2026-03-06
适用系统:Hyperliquid 量化交易系统(Adaptive Bollinger Z-Score 配对策略)
升级摘要:v5.0 → v5.1 BOCPD 贝叶斯后验替代 sigmoid、NIG 先验优化、CUSUM Huber 鲁棒化、BTC 滞后因果因子、MTF 连续加权、OU Hurwicz 偏差校正、Spread 回望窗口扩展、元参数降维路线图


目录

  1. 背景与问题定义
  2. 算法选择与学术依据
  3. 算法具体实现
  4. 融入当前交易体系的方案
  5. 与当前交易风格的契合度分析
  6. 与四项开单约束的对应关系
  7. 参数配置指南
  8. 风险与局限性

1. 背景与问题定义

1.1 交易系统现状

当前系统基于 Adaptive Bollinger Z-Score 配对策略,核心逻辑是协整对的 spread 均值回归:

  • adaptive_z 突破 adaptive_threshold 时产生入场信号
  • adaptive_z 回归至 entry_adaptive_z * reversion_factor 时平仓
  • 使用 5 分钟 K 线,最大持仓时间控制在 72 小时以内(max_hold_hours=72.0
  • 策略引擎 _check_entry() 已有冷却期、突破检测、持仓检查、z4h 绝对值过滤四层前置过滤

1.2 待解决的问题

均值回归策略在以下场景存在明显风险:

场景 问题 后果
资产处于趋势机制(Hurst > 0.6) 均值回归假设本身失效 spread 持续扩张,系统性亏损
单边持续上涨 N 根 K 线后 追涨做多 趋势过度延伸,极易被反转砸穿
单边持续下跌 N 根 K 线后 追跌做空 跌幅充分释放,反弹风险极高
短时间内迅速暴涨 做空对抗强动量 被轧空,止损代价极大
短时间内迅速暴跌 做多接飞刀 继续下杀,止损代价极大

1.3 四项约束目标

约束 1:连续下跌 -> 不追跌(不做空)
约束 2:连续上涨 -> 不追涨(不做多)
约束 3:迅速暴涨 -> 不做空
约束 4:迅速暴跌 -> 不做多

1.4 过滤维度

维度 检查对象 原因
Alt 腿(单腿) alt symbol 的价格动量 直接持仓标的的风险
Base 腿(单腿) base symbol 的价格动量 配对交易中 base 腿方向相反,同样承担动量风险
Spread(配对层) 两腿价差的趋势性 单腿动量不等于 spread 动量;两腿同涨但 spread 稳定时不应误杀

1.5 v5.0 算法局限性分析(v5.1 升级动机)

维度 v5.0 做法 局限性 v5.1 解决方案
BOCPD P(trending|r) sigmoid(z_r - 2.0) 经验映射 sigmoid 中心点 2.0 和斜率无统计校准,丢弃 NIG 后验分析能力 贝叶斯后验概率 P(|mu| > delta | NIG),直接利用共轭后验
NIG 先验 alpha0 alpha0=1.0(自由度 2) 先验过宽,Student-t 预测分布尾部极重,预热期数值不稳定 alpha0=3.0(自由度 6),更合理的先验
CUSUM z 标准化 z = ret / rs_vol(假设高斯) 加密收益率 kurtosis >> 3,极端尾部事件导致 CUSUM 误触发 Huber 化 clip z 值,减少极端值影响
BTC 因子 当期 CUSUM stress(同期相关) 因果方向不明确,BTC 与 alt 可能同时反应同一事件 滞后 BTC stress(过去 3~5 根 K 线),建立因果时序
MTF 阈值修正 阶梯函数 0.80 或 1.00 ER=0.49 和 0.51 之间不连续跳变 连续加权函数,线性插值消除不连续性
OU 半衰期估计 OLS 无偏差校正 小样本 Hurwicz 偏差导致半衰期系统性高估 15-25% 一阶 Hurwicz 偏差校正
Spread 回望窗口 spread_lookback=20(仅 19 个 OLS 观测值) PP/KPSS/OU 估计的统计功效不足 spread_lookback=40,样本量翻倍

2. 算法选择与学术依据

2.0 Layer 0:BOCPD 概率化机制检测 + DFA-2 后备

动机:从点估计到概率推断

v4.0 中 Layer 0 使用 DFA-2 估算 Hurst 指数,再阈值判断。核心问题:60 根 5min K 线只有 5-7 个尺度点,Hurst 估计的标准误极大。R-squared 质检缓解但未根治——它只能"放弃不可靠的估计",不能"让估计变得可靠"。

更根本的问题是:阈值判断丢弃了不确定性信息。Hurst=0.62(刚过 0.60)和 Hurst=0.85(远超 0.60)触发相同的硬拦截,但风险完全不同。

v5.0 将 Layer 0 重新定义为 "P(均值回归机制失效) > threshold",使用 Bayesian Online Changepoint Detection(BOCPD)实现概率化推断。

候选算法深度对比

算法 核心思路 优点 缺点 5min K 线适用性
DFA-2 + R-squared 多尺度二阶去趋势 RMS 波动 对抛物线趋势鲁棒 点估计,短序列标准误大 一般
HMM (Hamilton 1989) 隐马尔可夫模型,两状态 直接建模机制切换概率 需 EM 训练,批量方法,参数不稳定 需要离线训练
Wavelet Leaders 小波多分辨率 Hurst 短序列更稳定 实现复杂,依赖 PyWavelets 较好
BOCPD (Adams & MacKay 2007) 贝叶斯在线变点检测 + NIG 共轭先验 输出后验概率,在线 O(R) 更新,天然量化不确定性 需调 hazard_rate 最适合

选择:BOCPD + DFA-2 混合机制检测

学术背景:

Adams & MacKay (2007) 提出的 BOCPD 使用贝叶斯框架在线检测时间序列的分布变化。对每个时刻,维护一个"运行长度"(当前机制持续多久)的后验分布。使用 Normal-Inverse-Gamma(NIG)共轭先验,对高斯观测的预测分布为 Student-t,可解析更新,无需 MCMC。

为什么 BOCPD 优于 DFA-2:

  1. 概率输出 vs 点估计:BOCPD 输出 P(trending) in [0,1],而非 Hurst > 0.6 的二值判断
  2. 天然不确定性量化:数据不足时 P(trending) 趋向先验,不会产生假信号
  3. 在线 O(R) 更新:每根 K 线更新一次,R=200(截断),计算开销 << DFA-2
  4. 无固定窗口:DFA-2 需要固定 60 根窗口;BOCPD 自适应检测机制长度
  5. 机制切换检测:DFA-2 检测"当前是否趋势";BOCPD 额外检测"机制是否刚切换"

BOCPD 核心算法:

输入:对数收益率序列 {r_t},hazard_rate H(变点先验概率,默认 0.01)
先验:Normal-Inverse-Gamma (mu0=0, kappa0=1, alpha0=3, beta0=0.01)
      alpha0=3 → Student-t 自由度 6(v5.1:比 v5.0 的 alpha0=1 更稳定,
      减少预热期数值不稳定,同时保留足够的重尾容忍度)

每个时刻 t:
  对每个运行长度 r = 0, 1, ..., R:
    1. 预测概率:pi(r_t | r) = Student-t_{2*alpha_r}(r_t | mu_r, beta_r*(kappa_r+1)/(alpha_r*kappa_r))
    2. 增长概率:P(r_t = r+1) = P(r_{t-1} = r) * pi(r_t | r) * (1-H)
    3. 变点概率:P(r_t = 0) = Sum_r P(r_{t-1} = r) * pi(r_t | r) * H
    4. 更新 NIG 充分统计量:
       kappa' = kappa + 1,  mu' = (kappa*mu + r_t) / kappa'
       alpha' = alpha + 0.5,  beta' = beta + kappa*(r_t - mu)^2 / (2*kappa')
    5. 归一化

v5.1 关键改进 0:P(trending|r) 贝叶斯后验概率

v5.0 使用 sigmoid(z_r - 2.0) 将后验均值的 z 值映射为趋势概率,中心点 2.0 和斜率 1.0 均为经验值。这浪费了 NIG 共轭先验的核心优势——可以精确计算后验概率

v5.1 改为直接利用 NIG 后验计算 P(|mu_r| > delta | data)

对每个运行长度 r,NIG 后验给出 mu_r 的边缘分布:
  mu_r | data ~ Student-t_{2*alpha_r}(mu_r_hat, beta_r / (alpha_r * kappa_r))

P(trending | r) = P(|mu_r| > drift_threshold | posterior)
                = 1 - CDF_t(+delta; df, mu_hat, scale) + CDF_t(-delta; df, mu_hat, scale)

其中:
  drift_threshold = 0.0002(对应 5min 收益率 0.02%,即有经济意义的最小漂移)
  df = 2 * alpha_r
  mu_hat = mu_r(后验均值)
  scale = sqrt(beta_r / (alpha_r * kappa_r))(后验标准差)

vs v5.0:
  v5.0: P(trending|r) = sigmoid(|mu_r| / std_mean - 2.0)     ← 经验映射
  v5.1: P(trending|r) = P(|mu_r| > delta | NIG posterior)     ← 精确贝叶斯推断

学术依据: 这是 NIG 共轭先验框架的标准用法。Murphy (2007, Conjugate Bayesian analysis of the Gaussian distribution) 给出了完整的后验预测分布推导。关键优势是 delta(drift_threshold)具有明确的经济学含义("多大的漂移才算有意义"),而非 v5.0 sigmoid 的纯数学阈值。

趋势概率(贝叶斯边缘化):

P(trending) = Sum_r P(r_t = r) * P(trending | r)

其中 P(trending | r) = P(|mu_r| > drift_threshold | NIG posterior_r)

DFA-2 作为后备(继承 v4.0):

BOCPD 需要约 20 根 K 线预热。预热期间或 BOCPD 不可用时,降级为 v4.0 的 DFA-2 + R-squared 质检。

混合决策逻辑:

if BOCPD 就绪(>= 20 根更新):
    trend_prob = bocpd.trend_probability
    if trend_prob > bocpd_trend_threshold(默认 0.75):
        -> 硬拦截:P(trending)={trend_prob:.2f} > {threshold}
    else:
        -> 放行,但将 trend_prob 共享给 Layer 1/2(跨层信息)
else:
    -> 降级为 DFA-2 + R-squared 质检(v4.0 逻辑)

跨层信息共享(v5.0 引入,v5.1 继承):

Layer 0 的 trend_prob 即使未达到硬拦截阈值,也包含有价值的信息。例如 trend_prob=0.55 说明有弱趋势信号。v5.0 将此信息传递给下游层:

Layer 1:如果 trend_prob > 0.5,ER 阈值降低(更容易触发软拦截)
  er_thresh *= 1.0 - cross_layer_factor * max(0, trend_prob - 0.5)
  cross_layer_factor = 0.4 -> trend_prob=0.75 时 er_thresh 降低 10%

Layer 2:如果 trend_prob > 0.5,CUSUM 阈值降低(更早检测急动)
  cusum_thresh *= 1.0 - cross_layer_factor * 0.5 * max(0, trend_prob - 0.5)
  影响更温和(x 0.5),因为 Layer 2 已有 BTC 因子调节

2.1 约束 1&2:持续单边行情过滤(Layer 1)

核心算法:Kaufman ER + RS 自适应净位移(继承 v4.0)

ER 是衡量趋势质量的最佳简单指标,O(N) 复杂度,可解释性强。非对称 ER 阈值和 RS 自适应设计继承 v4.0,不做改动。

v5.0 改进:多时间框架确认(Multi-Timeframe Confirmation)

5min K 线噪音大,短期波动容易误触发 Layer 1。在专业量化系统中,多时间框架确认是基础设施级别的功能。

将 5min bars 聚合为 1h bars(每 12 根聚合一次),在 1h 级别计算 ER 作为确认信号。

v5.1 关键改进 1:MTF 连续加权替代阶梯函数

v5.0 使用阶梯函数:1h_er >= 0.50 -> er_thresh *= 0.80。问题是 ER=0.49 和 ER=0.51 之间存在不连续跳变(0% vs 20% 阈值调整),在边界处行为不稳定。

v5.1 改为连续线性加权:

# v5.0(阶梯函数 — 已废弃)
if 1h_er >= mtf_er_threshold:
    er_thresh *= 0.80

# v5.1(连续加权)
1h_closes = 最近 mtf_lookback 根 1h K 线(默认 6 根 = 6 小时)
1h_er = direction_distance(1h) / path_length(1h)

mtf_weight = clamp((1h_er - mtf_soft_lower) / (mtf_soft_upper - mtf_soft_lower), 0, 1)
er_thresh *= 1.0 - mtf_max_reduction * mtf_weight

默认参数:
  mtf_soft_lower = 0.30      # 低于此值无修正
  mtf_soft_upper = 0.70      # 高于此值满修正
  mtf_max_reduction = 0.20   # 最大降低 20%

示例:
  1h_er = 0.20 -> mtf_weight = 0.00 -> er_thresh *= 1.00(无修正)
  1h_er = 0.50 -> mtf_weight = 0.50 -> er_thresh *= 0.90(降低 10%)
  1h_er = 0.70 -> mtf_weight = 1.00 -> er_thresh *= 0.80(降低 20%)
  1h_er = 0.90 -> mtf_weight = 1.00 -> er_thresh *= 0.80(饱和)

跨层 BOCPD 信号调节(继承 v5.0):

见 2.0 节跨层信息共享。当 BOCPD 检测到弱趋势信号时(0.5 < trend_prob < 0.75),Layer 1 的 ER 阈值进一步降低,使得持续趋势过滤更敏感。

核心公式(完整版,含 v5.1 修正):

N = sustained_lookback(默认 30 根 5min = 150 分钟)

# Kaufman Efficiency Ratio
ER = abs(close[-1] - close[-N]) / Sum|close[i] - close[i-1]|

# 基础非对称 ER 阈值
er_thresh_base = er_threshold_long(0.60)或 er_threshold_short(0.50)

# v5.1 MTF 连续加权修正(替代 v5.0 阶梯函数)
if mtf_enabled:
    mtf_er = calc_mtf_er(symbol)
    if mtf_er is not None:
        mtf_w = clamp((mtf_er - mtf_soft_lower) / (mtf_soft_upper - mtf_soft_lower), 0, 1)
        er_thresh_base *= 1.0 - mtf_max_reduction * mtf_w

# v5.0 跨层 BOCPD 修正
if cross_layer_enabled AND bocpd_trend_prob > 0.5:
    er_thresh_base *= 1.0 - 0.4 * (bocpd_trend_prob - 0.5)

# RS 自适应净位移阈值(继承 v4.0)
adaptive_thresh = base_threshold * (rs_vol / per_symbol_baseline)
                 (缩放范围 0.3x ~ 3.0x)

# 联合触发(继承 v4.0)
约束1:direction == 'short' AND net_return <= -adaptive_thresh AND ER >= er_thresh
约束2:direction == 'long'  AND net_return >= +adaptive_thresh AND ER >= er_thresh

2.2 约束 3&4:急涨急跌过滤(Layer 2)

RS 波动率(继承 v4.0)

Rogers-Satchell (1991) 漂移不变 OHLC 波动率估计器,完全继承 v4.0,不做改动。

v5.0 改进:自适应 CUSUM drift

问题: v4.0 的 drift=0.5 是固定值。在高波动期(RS_vol 远高于基准),CUSUM 标准化后的 z 值波动加大,固定 drift 无法补偿,导致误报增加。在低波动期,固定 drift 又过于宽松。

解决方案: drift 随 RS 波动率的相对水平动态缩放。

# 自适应 drift
vol_ratio = rs_vol_current / rs_vol_baseline
adaptive_drift = base_drift * clamp(vol_ratio, drift_scale_min, drift_scale_max)
               = 0.5 * clamp(vol_ratio, 0.5, 2.0)

高波动期:vol_ratio=2.0 -> drift=1.0 -> CUSUM 更不敏感(合理:高波动期急动更常见)
低波动期:vol_ratio=0.3 -> drift=0.25 -> CUSUM 更敏感(合理:低波动期急动更异常)

学术依据: Hawkins & Olwell (1998, Cumulative Sum Charts and Charting for Quality Improvement) 指出 CUSUM 的 ARL (Average Run Length) 性能依赖 drift 与实际信号幅度的比率,建议根据过程特性自适应调整。

v5.1 关键改进 2:CUSUM z 值 Huber 鲁棒化

问题: CUSUM 的 z = ret / rs_vol 标准化隐含假设收益率近似高斯。加密收益率的 kurtosis 通常 > 10(远超高斯的 3),极端尾部事件(如闪崩、插针)会产生 z=10~20 的异常值,单根 K 线就能将 CUSUM 推过阈值,导致虚假触发。

解决方案: 对 z 值施加 Huber 化(软截断),减少极端值的不成比例影响:

# v5.0:无截断
z = ret / rs_vol

# v5.1:Huber 鲁棒化
z_raw = ret / rs_vol
z = sign(z_raw) * min(|z_raw|, cusum_z_clip)

cusum_z_clip = 5.0(默认):
  |z| <= 5: 保持原值(覆盖 99.99997% 高斯分布,但加密市场重尾更频繁)
  |z| = 10: 截断为 5 → CUSUM 增量从 10 降为 5(减少 50% 冲击)
  |z| = 20: 截断为 5 → CUSUM 增量从 20 降为 5(减少 75% 冲击)

学术依据: Huber (1964, Robust Estimation of a Location Parameter) 提出的 Huber 估计器是稳健统计的奠基性工作。在 CUSUM 场景中,Huber 化保留了对真实持续性趋势的检测能力(持续的中等 z 值会累积),同时大幅降低了单次极端事件的误触发概率。

v5.1 关键改进 3:BTC 因子滞后因果化

问题: v5.0 使用 BTC 的当期 CUSUM stress 调节 alt 的阈值。但 BTC 和 alt 可能同时反应同一外部事件(如监管新闻、交易所故障),此时 BTC 的 stress 并不具有对 alt 的因果预测能力,降低 alt 阈值反而增加误判。

解决方案: 使用 BTC 过去 3~5 根 K 线的 CUSUM stress(而非当期值),建立时序因果关系——BTC 先动,alt 滞后反应:

# v5.0:当期同步(同期相关)
btc_stress = max(btc_cusum_pos, btc_cusum_neg)  # 当期值

# v5.1:滞后因果
# 维护 BTC stress 的环形缓冲区(最近 btc_stress_lag 根 K 线)
btc_stress = max of btc_stress_history[-btc_stress_lag:]  # 滞后最大值

btc_stress_lag = 3(默认,即 15 分钟滞后):
  取过去 3 根 K 线中 BTC CUSUM stress 的最大值
  这意味着 BTC 在 15 分钟前出现急动 → 当前 alt 阈值降低
  而 BTC 和 alt 同时急动 → 当前 BTC stress 来不及影响(需 3 根后才影响)

学术依据: Moskowitz, Ooi & Pedersen (2012, Time series momentum, J. Financial Economics) 论证了资产间动量溢出效应存在明确的时序领先-滞后关系。在加密市场中,BTC 对 alt 的影响通常在 5-30 分钟内传导(Makarov & Schoar, 2020)。

v5.0 改进:BTC 因子指数衰减(继承)

指数衰减 exp(-lambda * excess) 替代 v4.0 线性衰减,更平滑,继承 v5.0 不做改动。

OFI 量价升级路径(Phase 2,继承 v5.0)

OFI(Order Flow Imbalance,Cont, Kukanov & Stoikov 2014)直接从 L2 book 变化中度量买卖压力不平衡。系统已订阅 L2Book,具备 OFI 数据基础。但 OFI 集成需要额外数据管道改造,标记为 Phase 2 实施。Phase 1 继续使用 VWPM。

完整 CUSUM 公式(v5.1):

ret(i) = (close(i) - close(i-1)) / close(i-1)

# v5.1 Huber 鲁棒化
z_raw(i) = ret(i) / rs_vol
z(i) = sign(z_raw) * min(|z_raw|, cusum_z_clip)     # cusum_z_clip=5.0

# 自适应 drift(v5.0)
vol_ratio = rs_vol / baseline_vol
adaptive_drift = base_drift * clamp(vol_ratio, 0.5, 2.0)

S_pos(i) = max(0, S_pos(i-1) + z(i) - adaptive_drift)
S_neg(i) = max(0, S_neg(i-1) - z(i) - adaptive_drift)

# BTC 滞后因果因子(v5.1 改进)
if symbol != BTC AND max(btc_stress_history[-lag:]) > btc_stress_threshold:
    btc_stress = max(btc_stress_history[-lag:])
    excess = btc_stress - btc_stress_threshold
    stress_factor = max(btc_stress_factor_min, exp(-btc_decay_rate * excess))
    effective_thresh_up   *= stress_factor
    effective_thresh_down *= stress_factor

# 跨层 BOCPD 修正(v5.0)
if bocpd_trend_prob > 0.5:
    cusum_thresh *= 1.0 - 0.2 * max(0, bocpd_trend_prob - 0.5)

# 量价确认(VWPM, Phase 1;OFI, Phase 2)
volume_confirmed = volume_ratio >= 1.5 AND |VWPM|/rs_vol >= vwpm_confirm_ratio

# 非对称触发
约束3:direction=='short' AND S_pos >= effective_thresh_up AND volume_confirmed
约束4:direction=='long'  AND S_neg >= effective_thresh_down AND volume_confirmed

2.3 Spread 层面趋势检测(Layer 3)

PP + KPSS + ER + OU 半衰期约束(v5.0 引入)

Phillips-Perron 检验替代/补充 ADF(继承 v5.0):

PP (1988) 使用 Newey-West 核估计对检验统计量做非参数修正,对异方差和序列相关鲁棒,优于 ADF。

OU 半衰期约束(v5.0 引入,v5.1 增强):

ADF/KPSS/PP 只回答"序列是否平稳",不回答"均值回归有多快"。一个半衰期 200 小时的序列通过平稳性检验,但在 72 小时最大持仓时间内根本来不及回归。

# OU 过程:dX = theta*(mu - X)dt + sigma*dW
# 离散化:DeltaX_t = theta * X_{t-1} + c + eps_t
# OLS 回归 DeltaX on X_{t-1} -> 斜率 = theta
# 半衰期 = -ln(2) / ln(1 + theta)

theta < 0  -> 均值回归(theta 越负,回归越快)
theta >= 0 -> 无均值回归(趋势或随机游走)

v5.1 关键改进 4:OU 半衰期 Hurwicz 偏差校正

问题: OLS 估计自回归系数 theta 在小样本下存在 Hurwicz 偏差(Hurwicz, 1950)——theta 被系统性地向零偏移,导致半衰期被高估。当 spread_lookback=20(仅 19 个观测值)时,偏差可达 15-25%。这意味着真实半衰期 30h 的序列被估计为 36-38h,可能错误地被半衰期约束拦截。

解决方案: 应用一阶 Hurwicz 偏差校正:

# v5.0:无偏差校正
half_life_raw = -ln(2) / ln(1 + theta_ols)

# v5.1:Hurwicz 偏差校正
# Kendall (1954) 一阶修正:E[theta_ols] approx theta * (1 - (1+3*theta)/(n))
# 近似反向校正:
theta_corrected = theta_ols * n / (n - 2.5)
half_life = -ln(2) / ln(1 + theta_corrected)

其中 n = 观测数(= spread_lookback)
常数 2.5 是 AR(1) 模型 Hurwicz 偏差的经验近似值

示例(spread_lookback=40, theta_ols=-0.02):
  v5.0: half_life = -ln(2) / ln(0.98) = 34.3h
  v5.1: theta_c = -0.02 * 40/37.5 = -0.02133
        half_life = -ln(2) / ln(0.97867) = 32.2h(校正 6%)

学术依据: Hurwicz (1950, Generalization of the concept of estimation) 首次证明 AR 模型 OLS 估计的有限样本偏差。Kendall (1954) 给出一阶修正公式。对配对交易中的半衰期估计,Krauss (2017, Statistical Arbitrage Pairs Trading Strategies: Review and Outlook) 明确指出需要偏差校正。

v5.1 关键改进 5:Spread 回望窗口扩展

问题: v5.0 的 spread_lookback=20 仅提供 19 个 OLS 观测值。PP/KPSS 检验在如此小的样本下统计功效(power)不足,半衰期估计的标准误也很大。z4h_history 在系统运行 72h 后有 864 个数据点,样本量不是瓶颈。

解决方案: 将默认 spread_lookback 从 20 提高到 40:

# v5.0: spread_lookback = 20(19 个 OLS 观测值)
# v5.1: spread_lookback = 40(39 个 OLS 观测值)

收益:
  PP/KPSS 统计功效提升:n=20 时 ADF 的 power 约 40-50%,n=40 时约 65-75%
  OU 半衰期估计标准误降低约 30%(sqrt(20/40) = 0.71)
  Hurwicz 偏差从 ~20% 降至 ~10%

成本:
  回望窗口从 100 分钟扩展到 200 分钟
  需要 z4h_history 至少有 41 个数据点(约 3.4 小时,系统启动后很快满足)

完整 Layer 3 决策逻辑(v5.1):

# 第一关:ER 快速筛选(O(N),继承 v4.0)
if spread_er < spread_er_threshold AND NOT direction_ok:
    -> 无趋势,放行

# 第二关:PP/ADF + KPSS 平稳性检验
unit_root_p = PP_pvalue(优先)或 ADF_pvalue(后备)
kpss_p = KPSS_pvalue

四象限决策矩阵(PP 替代 ADF):
  ur_stationary(p<0.05)+ kpss_stationary(p>0.05)-> 确认平稳 -> 第三关
  ur_non_stationary + kpss_non_stationary -> 确认非平稳 -> 维持拦截
  矛盾 / 不确定 -> 保守维持拦截

# 第三关:OU 半衰期约束(v5.1 含 Hurwicz 偏差校正)
half_life = OLS 估计 OU 半衰期(小时)+ Hurwicz 校正
if half_life is None OR half_life > spread_half_life_max:
    -> 回归太慢或无回归 -> 维持拦截
else:
    -> 平稳 + 半衰期合理 -> 推翻拦截,放行

2.4 跨层信息共享架构(v5.0 引入,v5.1 继承)

v4.0 的四层完全独立判断,信息不跨层传递。v5.0 引入 _LayerContext 数据结构,在 check() 调用内部共享信息:

@dataclass
_LayerContext:
    bocpd_trend_prob: float | None   # Layer 0 -> Layer 1, 2
    hurst: float | None              # Layer 0 -> 诊断
    hurst_r2: float | None           # Layer 0 -> 诊断

信息流向:
  Layer 0 执行 -> 填充 ctx.bocpd_trend_prob
    |
  Layer 2 使用 ctx.bocpd_trend_prob 调节 CUSUM 阈值
    |
  Layer 1 使用 ctx.bocpd_trend_prob 调节 ER 阈值
    |
  Layer 3 使用 z4h_history + 半衰期(独立于 ctx)

设计原则:
  - 跨层影响是温和的修正因子(最多 10-15% 阈值调整),不是决定性开关
  - 各层仍可独立关闭/测试
  - ctx 仅在单次 check() 调用内有效,不跨调用持久化

v6.0 方向:对数似然比统一框架

当前的线性修正因子是 ad-hoc 的。更有原则的方式是将各层输出统一视为"不应入场"的证据,用对数似然比加权求和:

log_odds_block = w0 * logit(trend_prob) + w1 * logit(er_signal) + w2 * logit(cusum_signal)
P(block) = sigmoid(log_odds_block)

这本质上是逻辑回归框架,参数可通过少量历史数据校准。改动较大,列为 v6.0 方向。

2.5 算法组合架构(五层 + 跨层共享)

执行顺序说明: 层编号(Layer 0/1/2/3)反映逻辑分类,不是执行顺序。实际执行按拦截严格性排序:硬拦截优先,软拦截其次,仲裁最后。

输入信号(direction, alt_symbol, base_symbol)
    |
    |   执行顺序 =/= 层编号。按拦截严格性排序:
    |
    +-->> [第1步] Layer 0: BOCPD 概率化机制检测 + DFA-2 后备     [v5.1 升级]
    |      +-- BOCPD: P(trending) per symbol
    |      |   v5.1: 贝叶斯后验 P(|mu|>delta) 替代 sigmoid
    |      |   v5.1: NIG 先验 alpha0=3(vs v5.0 的 1)
    |      +-- DFA-2 后备: Hurst + R-squared 质检(BOCPD 预热期)
    |      +-- 共享: ctx.bocpd_trend_prob -> Layer 1/2 阈值调节
    |      +-- P(trending) > threshold -> 硬拦截
    |
    +-->> [第2步] Layer 2: 自适应 CUSUM + RS + VWPM + BTC 因子   [v5.1 升级]
    |      +-- v5.1: CUSUM z 值 Huber 鲁棒化(clip=5.0)
    |      +-- 自适应 drift = base_drift * clamp(vol_ratio, 0.5, 2.0)
    |      +-- v5.1: BTC 滞后因果因子(lag=3 根 K 线)
    |      +-- BTC 指数衰减: exp(-lambda*excess)
    |      +-- 跨层: ctx.bocpd_trend_prob -> 进一步调低阈值
    |      +-- VWPM + 量比联合确认(Phase 2: OFI 替代)
    |      +-- 任一腿触发 -> 硬拦截
    |
    +-->> [第3步] Layer 1: ER + RS 净位移 + MTF 确认             [v5.1 升级]
    |      +-- v5.1: MTF 连续加权(替代 v5.0 阶梯函数)
    |      +-- 跨层: ctx.bocpd_trend_prob -> ER 阈值调低
    |      +-- 任一腿触发 -> 软拦截 -> Layer 3 仲裁
    |
    +-->> [第4步] Layer 3: PP + KPSS + ER + OU 半衰期约束        [v5.1 升级]
           +-- ER 快速筛(O(N))
           +-- PP(优先)/ ADF + KPSS 平稳性检验
           +-- v5.1: OU 半衰期 Hurwicz 偏差校正
           +-- v5.1: spread_lookback 扩展至 40
           +-- 平稳 + 半衰期合理 -> 推翻拦截;否则维持
    |
    v
  最终决策:允许 / 拒绝入场

设计理念(v5.1):

层级 执行顺序 类型 可仲裁 触发含义 v5.1 变化
Layer 0 第1步 硬拦截 均值回归假设根本失效 贝叶斯后验 P(trending) + NIG alpha0=3
Layer 2 第2步 硬拦截 急动执行风险极高 Huber 鲁棒化 + BTC 滞后因果
Layer 1 第3步 软拦截 单腿持续趋势,可能误杀 MTF 连续加权
Layer 3 第4步 仲裁器 -- Spread 平稳+半衰期合理则放行 Hurwicz 偏差校正 + 回望窗口扩展

3. 算法具体实现

3.1 模块位置

src/trading/
  momentum_filter.py    <- 重写(BOCPD 贝叶斯后验 + Huber CUSUM + BTC lag + MTF 连续 + Hurwicz 校正)
  strategy.py           <- 修改 _check_entry(),注入并调用 filter;SymbolBaseline 新增 z4h_history
  config.py             <- 修改 StrategyParams,新增过滤器参数(含 v5.1 新增参数)
  orchestrator.py       <- 修改 process_analysis(),透传 OHLCV + volume
src/services/
  realtime_kline_service_base.py  <- 修改 _trigger_strategy_if_ready(),提取双腿 OHLCV + volume

3.2 MomentumFilter 完整代码(v5.1)

# src/trading/momentum_filter.py
"""
开仓动量过滤器 v5.1

四层过滤架构 + 跨层信息共享:
  Layer 0: BOCPD 概率化机制检测 + DFA-2 后备   -> 硬拦截     [v5.1 升级]
  Layer 1: ER + RS 净位移 + MTF 确认            -> 软拦截     [v5.1 升级]
  Layer 2: 自适应 CUSUM + RS + VWPM + BTC 因子  -> 硬拦截     [v5.1 升级]
  Layer 3: PP + KPSS + ER + OU 半衰期           -> 仲裁       [v5.1 升级]

v5.1 改进清单(相对 v5.0):
  1. [Layer 0] P(trending|r) 贝叶斯后验概率替代 sigmoid
  2. [Layer 0] NIG 先验 alpha0=3(自由度 6,减少预热期不稳定)
  3. [Layer 2] CUSUM z 值 Huber 鲁棒化(clip=5.0)
  4. [Layer 2] BTC 因子滞后因果化(lag=3 根 K 线)
  5. [Layer 1] MTF 连续加权替代阶梯函数
  6. [Layer 3] OU 半衰期 Hurwicz 偏差校正
  7. [Layer 3] spread_lookback 默认 40(扩展样本量)
"""

import math
import warnings
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime

try:
    from statsmodels.tsa.stattools import adfuller as _adfuller
    _HAS_ADF = True
except ImportError:
    _HAS_ADF = False

try:
    from statsmodels.tsa.stattools import kpss as _kpss
    _HAS_KPSS = True
except ImportError:
    _HAS_KPSS = False

try:
    from arch.unitroot import PhillipsPerron as _PhillipsPerron
    _HAS_PP = True
except ImportError:
    _HAS_PP = False


# --------------------------------------------------------------
# 辅助函数
# --------------------------------------------------------------

def _logsumexp(vals: list[float]) -> float:
    """Numerically stable log-sum-exp."""
    if not vals:
        return float('-inf')
    m = max(vals)
    if m == float('-inf'):
        return float('-inf')
    return m + math.log(sum(math.exp(v - m) for v in vals))


def _student_t_logpdf(x: float, mu: float, scale_sq: float, df: float) -> float:
    """Student-t log PDF: log St(x | mu, scale^2, df)."""
    if scale_sq <= 1e-30 or df <= 0:
        return -50.0
    z_sq = (x - mu) ** 2 / (df * scale_sq)
    return (math.lgamma((df + 1) / 2)
            - math.lgamma(df / 2)
            - 0.5 * math.log(df * math.pi * scale_sq)
            - (df + 1) / 2 * math.log(1 + z_sq))


def _student_t_cdf(x: float, df: float) -> float:
    """
    Student-t CDF 近似(用于贝叶斯后验概率计算)。
    使用正则化不完全 Beta 函数的近似。
    对 df >= 2 精度优于 0.001。
    """
    if df <= 0:
        return 0.5
    # 标准化
    t = x
    # 使用 Hill (1970) 近似
    y = t * t
    p = y / (df + y)

    if df >= 30:
        # 大自由度:正态近似
        z = t * (1 - 1 / (4 * df)) / math.sqrt(1 + y / (2 * df))
        # 标准正态 CDF 近似 (Abramowitz & Stegun 7.1.26)
        a1, a2, a3 = 0.4361836, -0.1201676, 0.9372980
        az = abs(z)
        if az > 8:
            return 1.0 if z > 0 else 0.0
        tt = 1.0 / (1.0 + 0.33267 * az)
        phi = 0.5 * math.exp(-az * az / 2) * tt * (a1 + tt * (a2 + tt * a3))
        return 1.0 - phi if z > 0 else phi

    # 小自由度:用 regularized incomplete beta function 近似
    # I_x(a, b) 近似
    a = df / 2
    b = 0.5
    if p < 0 or p > 1:
        p = max(0.0, min(1.0, p))

    # 连分数近似 I_p(a, b)
    if p < 1e-15:
        ibeta = 0.0
    elif p > 1 - 1e-15:
        ibeta = 1.0
    else:
        log_prefix = (math.lgamma(a + b) - math.lgamma(a) - math.lgamma(b)
                      + a * math.log(p) + b * math.log(1 - p))
        prefix = math.exp(log_prefix)

        # Lentz 连分数
        f = 1.0
        c = 1.0
        d = 1.0 - (a + b) * p / (a + 1)
        if abs(d) < 1e-30:
            d = 1e-30
        d = 1.0 / d
        f = d

        for m in range(1, 100):
            # 偶数项
            num = m * (b - m) * p / ((a + 2 * m - 1) * (a + 2 * m))
            d = 1.0 + num * d
            if abs(d) < 1e-30:
                d = 1e-30
            c = 1.0 + num / c
            if abs(c) < 1e-30:
                c = 1e-30
            d = 1.0 / d
            f *= d * c

            # 奇数项
            num = -(a + m) * (a + b + m) * p / ((a + 2 * m) * (a + 2 * m + 1))
            d = 1.0 + num * d
            if abs(d) < 1e-30:
                d = 1e-30
            c = 1.0 + num / c
            if abs(c) < 1e-30:
                c = 1e-30
            d = 1.0 / d
            delta = d * c
            f *= delta
            if abs(delta - 1.0) < 1e-10:
                break

        ibeta = prefix * f / a

    # P(T <= t) = 0.5 * (1 + sign(t) * I_p(df/2, 0.5)) ... 需要调整
    # 实际上 P(T <= t) = 1 - 0.5 * I_{df/(df+t^2)}(df/2, 1/2) for t > 0
    half_ibeta = 0.5 * ibeta
    if t >= 0:
        return 1.0 - half_ibeta
    else:
        return half_ibeta


def _estimate_half_life(series: list[float], bar_minutes: float = 5.0,
                        hurwicz_correction: bool = True) -> float | None:
    """
    OU 过程半衰期估计(OLS 回归 + v5.1 Hurwicz 偏差校正)。

    模型: DeltaX_t = theta * X_{t-1} + c + eps
    半衰期 = -ln(2) / ln(1 + theta)

    v5.1 改进:Hurwicz 偏差校正 theta_c = theta_ols * n / (n - 2.5)
    消除小样本下 theta 向零的系统性偏差(半衰期高估 15-25%)。

    Returns: 半衰期(小时),或 None(无均值回归信号)。
    """
    n = len(series)
    if n < 10:
        return None

    y = [series[i] - series[i - 1] for i in range(1, n)]
    x = series[:-1]
    n_obs = len(y)

    x_mean = sum(x) / n_obs
    y_mean = sum(y) / n_obs

    cov_xy = sum((x[i] - x_mean) * (y[i] - y_mean) for i in range(n_obs))
    var_x = sum((x[i] - x_mean) ** 2 for i in range(n_obs))

    if var_x < 1e-12:
        return None

    theta = cov_xy / var_x

    # v5.1: Hurwicz 偏差校正
    if hurwicz_correction and n_obs > 3:
        theta = theta * n_obs / (n_obs - 2.5)

    if theta >= 0 or theta <= -1:
        return None

    half_life_bars = -math.log(2) / math.log(1 + theta)
    half_life_hours = half_life_bars * bar_minutes / 60.0
    return half_life_hours if half_life_hours > 0 else None


# --------------------------------------------------------------
# BOCPD: Bayesian Online Changepoint Detection (v5.1)
# --------------------------------------------------------------

class _BOCPD:
    """
    Bayesian Online Changepoint Detection (Adams & MacKay 2007).

    Normal-Inverse-Gamma 共轭先验,Student-t 预测分布。
    Log-space 实现保证数值稳定性。O(R) per update, R = max_run。

    v5.1 改进:
      1. alpha0=3(vs v5.0 的 1):Student-t 自由度 6,减少预热期不稳定
      2. P(trending|r) 使用贝叶斯后验 P(|mu|>delta) 替代 sigmoid
    """

    __slots__ = ('_log_h', '_log_1mh', '_mu0', '_kappa0', '_alpha0', '_beta0',
                 '_max_run', '_log_probs', '_suff', '_n_updates',
                 '_drift_threshold')

    def __init__(self, hazard_rate: float = 0.01, mu0: float = 0.0,
                 kappa0: float = 1.0, alpha0: float = 3.0,
                 beta0: float = 0.01, max_run: int = 200,
                 drift_threshold: float = 0.0002):
        self._log_h = math.log(max(1e-10, hazard_rate))
        self._log_1mh = math.log(max(1e-10, 1.0 - hazard_rate))
        self._mu0, self._kappa0 = mu0, kappa0
        self._alpha0, self._beta0 = alpha0, beta0
        self._max_run = max_run
        self._drift_threshold = drift_threshold
        self._log_probs: list[float] = [0.0]
        self._suff: list[tuple[float, float, float, float]] = [
            (mu0, kappa0, alpha0, beta0)
        ]
        self._n_updates = 0

    def update(self, x: float) -> None:
        """处理新观测值(对数收益率)。"""
        n = len(self._log_probs)

        # 1. 每个运行长度的预测 log-PDF
        log_preds = []
        for i in range(n):
            mu, kappa, alpha, beta = self._suff[i]
            if kappa > 0 and alpha > 0 and beta > 0:
                scale_sq = beta * (kappa + 1) / (alpha * kappa)
                log_preds.append(_student_t_logpdf(x, mu, scale_sq, 2 * alpha))
            else:
                log_preds.append(-50.0)

        # 2. 增长概率
        log_growth = [
            self._log_probs[i] + log_preds[i] + self._log_1mh
            for i in range(n)
        ]

        # 3. 变点概率
        log_cp = _logsumexp([
            self._log_probs[i] + log_preds[i] + self._log_h
            for i in range(n)
        ])

        # 4. 新分布 + 归一化
        new_log = [log_cp] + log_growth
        log_total = _logsumexp(new_log)
        new_log = [lp - log_total for lp in new_log]

        # 5. 更新 NIG 充分统计量
        new_suff: list[tuple[float, float, float, float]] = [
            (self._mu0, self._kappa0, self._alpha0, self._beta0)
        ]
        for i in range(n):
            mu, kappa, alpha, beta = self._suff[i]
            kappa_n = kappa + 1
            mu_n = (kappa * mu + x) / kappa_n
            alpha_n = alpha + 0.5
            beta_n = beta + kappa * (x - mu) ** 2 / (2 * kappa_n)
            new_suff.append((mu_n, kappa_n, alpha_n, beta_n))

        # 6. 截断
        if len(new_log) > self._max_run:
            new_log = new_log[:self._max_run]
            new_suff = new_suff[:self._max_run]
            log_total = _logsumexp(new_log)
            new_log = [lp - log_total for lp in new_log]

        self._log_probs = new_log
        self._suff = new_suff
        self._n_updates += 1

    @property
    def trend_probability(self) -> float:
        """
        P(trending) -- 贝叶斯后验边缘化(v5.1 改进)。

        v5.0: sigmoid(z_r - 2.0) 经验映射
        v5.1: P(|mu_r| > drift_threshold | NIG posterior) 精确贝叶斯推断

        对每个运行长度 r,NIG 后验给出 mu_r 的边缘分布:
          mu_r | data ~ Student-t_{2*alpha_r}(mu_r_hat, beta_r / (alpha_r * kappa_r))
        P(trending|r) = P(|mu_r| > delta) = 1 - CDF_t(+d) + CDF_t(-d)
        其中 d = (delta - mu_hat) / scale, d2 = (-delta - mu_hat) / scale
        """
        delta = self._drift_threshold
        total = 0.0
        for i in range(len(self._log_probs)):
            prob = math.exp(self._log_probs[i])
            if prob < 1e-8:
                continue
            mu, kappa, alpha, beta = self._suff[i]
            if alpha > 0.5 and beta > 1e-30 and kappa > 0:
                df = 2 * alpha
                scale = math.sqrt(beta / (alpha * kappa))
                if scale < 1e-15:
                    # scale 极小 -> |mu| 几乎确定,直接判断
                    p_trend = 1.0 if abs(mu) > delta else 0.0
                else:
                    # P(|mu| > delta) = P(mu > delta) + P(mu < -delta)
                    # = 1 - CDF_t((delta - mu)/scale, df) + CDF_t((-delta - mu)/scale, df)
                    t_upper = (delta - mu) / scale
                    t_lower = (-delta - mu) / scale
                    p_trend = (1.0 - _student_t_cdf(t_upper, df)
                               + _student_t_cdf(t_lower, df))
            else:
                p_trend = 0.5  # 数据不足,回归先验
            total += prob * p_trend
        return total

    @property
    def ready(self) -> bool:
        return self._n_updates >= 20


# --------------------------------------------------------------
# DFA-2 Hurst 后备(继承 v4.0)
# --------------------------------------------------------------

def _hurst_dfa2(closes: list[float], r2_threshold: float = 0.85) -> tuple[float | None, float | None]:
    """
    DFA-2(二阶去趋势波动分析)估算 Hurst 指数(v4.0 继承)。
    返回 (hurst, r_squared),不可靠时返回 (None, None)。
    """
    n = len(closes)
    if n < 20:
        return None, None

    rets = []
    for i in range(1, n):
        if closes[i] > 0 and closes[i - 1] > 0:
            rets.append(math.log(closes[i] / closes[i - 1]))
    nr = len(rets)
    if nr < 16:
        return None, None

    mean_r = sum(rets) / nr
    profile = []
    cum = 0.0
    for r in rets:
        cum += r - mean_r
        profile.append(cum)

    scales: list[float] = []
    rms_vals: list[float] = []
    s = 4
    max_s = nr // 4
    while s <= max_s:
        n_win = nr // s
        if n_win < 2:
            break
        rss_sum = 0.0
        rss_cnt = 0
        for i in range(n_win):
            seg = profile[i * s: (i + 1) * s]
            ws = len(seg)
            if ws < 3:
                if ws >= 2:
                    x_m = (ws - 1) * 0.5
                    y_m = sum(seg) / ws
                    cov = sum((j - x_m) * (seg[j] - y_m) for j in range(ws))
                    var_x = sum((j - x_m) ** 2 for j in range(ws))
                    slope = cov / var_x if var_x > 1e-12 else 0.0
                    intercept = y_m - slope * x_m
                    rss = sum((seg[j] - (intercept + slope * j)) ** 2 for j in range(ws)) / ws
                    rss_sum += rss
                    rss_cnt += 1
                continue
            s0 = float(ws)
            s1 = ws * (ws - 1) / 2.0
            s2 = ws * (ws - 1) * (2 * ws - 1) / 6.0
            s3 = (ws * (ws - 1) / 2.0) ** 2
            s4 = ws * (ws - 1) * (2 * ws - 1) * (3 * ws * ws - 3 * ws - 1) / 30.0
            sy0 = sum(seg)
            sy1 = sum(j * seg[j] for j in range(ws))
            sy2 = sum(j * j * seg[j] for j in range(ws))
            det = (s0 * (s2 * s4 - s3 * s3)
                   - s1 * (s1 * s4 - s3 * s2)
                   + s2 * (s1 * s3 - s2 * s2))
            if abs(det) < 1e-12:
                x_m = (ws - 1) * 0.5
                y_m = sum(seg) / ws
                cov = sum((j - x_m) * (seg[j] - y_m) for j in range(ws))
                var_x = sum((j - x_m) ** 2 for j in range(ws))
                slope = cov / var_x if var_x > 1e-12 else 0.0
                intercept = y_m - slope * x_m
                rss = sum((seg[j] - (intercept + slope * j)) ** 2 for j in range(ws)) / ws
                rss_sum += rss
                rss_cnt += 1
                continue
            a = ((sy0 * (s2 * s4 - s3 * s3)
                  - s1 * (sy1 * s4 - s3 * sy2)
                  + s2 * (sy1 * s3 - s2 * sy2)) / det)
            b = ((s0 * (sy1 * s4 - s3 * sy2)
                  - sy0 * (s1 * s4 - s3 * s2)
                  + s2 * (s1 * sy2 - sy1 * s2)) / det)
            c = ((s0 * (s2 * sy2 - sy1 * s3)
                  - s1 * (s1 * sy2 - sy1 * s2)
                  + sy0 * (s1 * s3 - s2 * s2)) / det)
            rss = sum((seg[j] - (a + b * j + c * j * j)) ** 2 for j in range(ws)) / ws
            rss_sum += rss
            rss_cnt += 1
        if rss_cnt >= 2:
            f_s = math.sqrt(rss_sum / rss_cnt)
            if f_s > 1e-14:
                scales.append(math.log(s))
                rms_vals.append(math.log(f_s))
        s = max(s + 1, int(s * 1.4 + 0.5))

    if len(scales) < 3:
        return None, None

    n_pts = len(scales)
    x_m = sum(scales) / n_pts
    y_m = sum(rms_vals) / n_pts
    num = sum((scales[i] - x_m) * (rms_vals[i] - y_m) for i in range(n_pts))
    den = sum((scales[i] - x_m) ** 2 for i in range(n_pts))
    if den < 1e-12:
        return None, None

    hurst = max(0.0, min(1.0, num / den))
    slope = num / den
    ss_res = sum((rms_vals[i] - (y_m + slope * (scales[i] - x_m))) ** 2 for i in range(n_pts))
    ss_tot = sum((rms_vals[i] - y_m) ** 2 for i in range(n_pts))
    r_squared = 1.0 - ss_res / ss_tot if ss_tot > 1e-12 else 0.0

    if r_squared < r2_threshold:
        return None, None

    return hurst, r_squared


# --------------------------------------------------------------
# 辅助类
# --------------------------------------------------------------

class _RollingMedian:
    """固定窗口滚动中位数(排序法,O(N log N) per query)。"""
    __slots__ = ('_data',)

    def __init__(self, maxlen: int):
        self._data: deque = deque(maxlen=maxlen)

    def push(self, val: float) -> None:
        self._data.append(val)

    @property
    def value(self) -> float | None:
        if not self._data:
            return None
        s = sorted(self._data)
        n = len(s)
        mid = n >> 1
        return s[mid] if n & 1 else (s[mid - 1] + s[mid]) * 0.5

    def __len__(self) -> int:
        return len(self._data)


class _HourBarAccum:
    """5m -> 1h 聚合器。每 12 根 5m bar 聚合为 1 根 1h bar。"""
    __slots__ = ('count', 'open_', 'high', 'low', 'close', 'volume')

    def __init__(self):
        self.reset()

    def reset(self):
        self.count = 0
        self.open_ = 0.0
        self.high = 0.0
        self.low = float('inf')
        self.close = 0.0
        self.volume = 0.0

    def add(self, o: float, h: float, l: float, c: float, v: float) -> bool:
        """添加 5m bar,返回 True 当 1h bar 已完成。"""
        if self.count == 0:
            self.open_ = o
            self.high = h
            self.low = l
        else:
            self.high = max(self.high, h)
            self.low = min(self.low, l)
        self.close = c
        self.volume += v
        self.count += 1
        return self.count >= 12

    def as_tuple(self) -> tuple:
        return (self.close, self.high, self.low, self.open_, self.volume)


@dataclass
class _LayerContext:
    """跨层共享上下文,单次 check() 调用内有效。"""
    bocpd_trend_prob: float | None = None
    hurst: float | None = None
    hurst_r2: float | None = None


# --------------------------------------------------------------
# MomentumFilter 主类 (v5.1)
# --------------------------------------------------------------

class MomentumFilter:
    """
    开仓动量过滤器 v5.1

    公开接口:
      update(symbol, close, high, low, open_, volume, kline_time)
      check(symbol, direction) -> (allowed, reason, is_soft)
      check_spread(z4h_history, direction, max_hold_hours) -> (has_trend, reason)
      ready(symbol) -> bool
    """

    def __init__(
        self,
        # -- 总开关 --
        enabled: bool = True,
        # -- Layer 0: BOCPD + DFA-2 后备 --
        bocpd_enabled: bool = True,
        bocpd_hazard_rate: float = 0.01,
        bocpd_trend_threshold: float = 0.75,
        bocpd_drift_threshold: float = 0.0002,   # v5.1 新增
        hurst_enabled: bool = True,
        hurst_lookback: int = 60,
        hurst_threshold: float = 0.60,
        hurst_r2_threshold: float = 0.85,
        # -- Layer 1: 持续趋势 + MTF --
        sustained_lookback: int = 30,
        sustained_base_threshold: float = 0.008,
        er_threshold_long: float = 0.60,
        er_threshold_short: float = 0.50,
        mtf_enabled: bool = True,
        mtf_soft_lower: float = 0.30,             # v5.1 新增(替代 mtf_er_threshold)
        mtf_soft_upper: float = 0.70,             # v5.1 新增
        mtf_max_reduction: float = 0.20,           # v5.1 新增
        mtf_lookback: int = 6,
        # -- Layer 2: 急动检测 --
        rs_period: int = 10,
        cusum_drift: float = 0.5,
        cusum_drift_adaptive: bool = True,
        cusum_z_clip: float = 5.0,                # v5.1 新增
        cusum_threshold_spike_up: float = 3.5,
        cusum_threshold_spike_down: float = 2.5,
        volume_confirm_ratio: float = 1.5,
        volume_ema_period: int = 20,
        vwpm_window: int = 5,
        vwpm_confirm_ratio: float = 0.8,
        # -- Layer 2: BTC 因子 --
        market_ref_symbol: str = "BTC",
        btc_stress_threshold: float = 2.0,
        btc_stress_factor_min: float = 0.7,
        btc_decay_rate: float = 0.3,
        btc_stress_lag: int = 3,                   # v5.1 新增
        # -- Layer 3: Spread 仲裁 --
        spread_lookback: int = 40,                 # v5.1 改动:20 -> 40
        spread_er_threshold: float = 0.45,
        spread_net_threshold: float = 1.5,
        spread_adf_pvalue: float = 0.05,
        spread_kpss_pvalue: float = 0.05,
        spread_use_pp: bool = True,
        spread_half_life_max: float = 36.0,
        spread_hurwicz_correction: bool = True,    # v5.1 新增
        # -- 跨层 --
        cross_layer_enabled: bool = True,
        cross_layer_factor: float = 0.4,
    ):
        self._enabled = enabled

        # Layer 0
        self._bocpd_enabled = bocpd_enabled
        self._bocpd_hazard = bocpd_hazard_rate
        self._bocpd_trend_thresh = bocpd_trend_threshold
        self._bocpd_drift_thresh = bocpd_drift_threshold
        self._hurst_enabled = hurst_enabled
        self._hurst_lookback = max(20, hurst_lookback)
        self._hurst_thresh = hurst_threshold
        self._hurst_r2_thresh = hurst_r2_threshold

        # Layer 1
        self._sustained_n = max(5, sustained_lookback)
        self._sustained_base_thresh = sustained_base_threshold
        self._er_thresh_long = er_threshold_long
        self._er_thresh_short = er_threshold_short
        self._mtf_enabled = mtf_enabled
        self._mtf_soft_lower = mtf_soft_lower
        self._mtf_soft_upper = mtf_soft_upper
        self._mtf_max_reduction = mtf_max_reduction
        self._mtf_lookback = mtf_lookback

        # Layer 2
        self._rs_period = rs_period
        self._cusum_drift = cusum_drift
        self._cusum_drift_adaptive = cusum_drift_adaptive
        self._cusum_z_clip = cusum_z_clip
        self._cusum_thresh_up = cusum_threshold_spike_up
        self._cusum_thresh_down = cusum_threshold_spike_down
        self._vol_confirm_ratio = volume_confirm_ratio
        self._vol_ema_period = volume_ema_period
        self._vwpm_window = vwpm_window
        self._vwpm_confirm_ratio = vwpm_confirm_ratio

        # Layer 2: BTC
        self._market_ref = market_ref_symbol
        self._btc_stress_thresh = btc_stress_threshold
        self._btc_stress_factor_min = btc_stress_factor_min
        self._btc_decay_rate = btc_decay_rate
        self._btc_stress_lag = max(1, btc_stress_lag)

        # Layer 3
        self._spread_lookback = spread_lookback
        self._spread_er_thresh = spread_er_threshold
        self._spread_net_thresh = spread_net_threshold
        self._spread_adf_pvalue = spread_adf_pvalue
        self._spread_kpss_pvalue = spread_kpss_pvalue
        self._spread_use_pp = spread_use_pp
        self._spread_hl_max = spread_half_life_max
        self._spread_hurwicz = spread_hurwicz_correction

        # 跨层
        self._cross_layer = cross_layer_enabled
        self._cross_factor = cross_layer_factor

        # 数据缓冲区
        buf_size = max(hurst_lookback + 5, sustained_lookback + 5,
                       rs_period + 10, volume_ema_period + 5,
                       vwpm_window + 2) + 10
        self._buf_size = buf_size

        # 状态
        self._buffers: dict[str, deque] = {}
        self._last_kline_time: dict[str, datetime] = {}
        self._cusum_state: dict[str, tuple[float, float]] = {}
        self._rs_ema: dict[str, float] = {}
        self._vol_ema: dict[str, float] = {}
        self._baseline_median: dict[str, _RollingMedian] = {}
        self._bocpd_state: dict[str, _BOCPD] = {}
        self._buffers_1h: dict[str, deque] = {}
        self._hour_accum: dict[str, _HourBarAccum] = {}
        self._btc_stress_history: dict[str, deque] = {}   # v5.1 新增

    # --------------- 公开接口 ---------------

    def update(self, symbol: str, close: float, high: float = 0.0,
               low: float = 0.0, open_: float = 0.0, volume: float = 0.0,
               kline_time: datetime | None = None) -> None:
        """每根新 K 线收盘时调用。"""
        if kline_time is not None:
            if self._last_kline_time.get(symbol) == kline_time:
                return
            self._last_kline_time[symbol] = kline_time

        if symbol not in self._buffers:
            self._buffers[symbol] = deque(maxlen=self._buf_size)
        self._buffers[symbol].append((close, high, low, open_, volume))

        buf = self._buffers[symbol]
        if len(buf) >= 2:
            self._online_update(symbol, buf)

    def check(self, symbol: str, direction: str) -> tuple[bool, str, bool]:
        """
        Layer 0 + Layer 2 + Layer 1 检查(单腿维度)。
        执行顺序:Layer 0(硬)-> Layer 2(硬)-> Layer 1(软)
        注意:执行顺序按拦截严格性排序,不等于层编号。
        """
        if not self._enabled:
            return True, "", False

        buf = self._buffers.get(symbol)
        if buf is None or len(buf) < self._rs_period + 2:
            return True, "", False

        ctx = _LayerContext()

        # -- Layer 0: BOCPD + DFA-2 后备 --
        ok, reason = self._check_regime(symbol, ctx)
        if not ok:
            return False, reason, False

        # -- Layer 2: 急动检测 --
        ok, reason = self._check_spike(symbol, direction, ctx)
        if not ok:
            return False, reason, False

        # -- Layer 1: 持续趋势 + MTF --
        if len(buf) >= self._sustained_n + 2:
            ok, reason = self._check_sustained(symbol, direction, ctx)
            if not ok:
                return False, reason, True

        return True, "", False

    def check_spread(self, z4h_history: list[float], direction: str,
                     max_hold_hours: float = 72.0) -> tuple[bool, str]:
        """
        Layer 3: PP + KPSS + ER + OU 半衰期约束(v5.1 含 Hurwicz 校正)。
        仅在存在软拦截(Layer 1)时调用。
        """
        n = self._spread_lookback
        if len(z4h_history) < n + 1:
            return False, ""

        series = z4h_history[-(n + 1):]

        # -- 第一关:ER 快速筛选 --
        spread_dir = abs(series[-1] - series[0])
        spread_path = sum(abs(series[i] - series[i - 1]) for i in range(1, len(series)))
        er = spread_dir / spread_path if spread_path > 1e-10 else 0.0
        spread_net = series[-1] - series[0]

        if direction == 'short':
            dir_ok = spread_net <= -self._spread_net_thresh
        else:
            dir_ok = spread_net >= self._spread_net_thresh

        if not (er >= self._spread_er_thresh and dir_ok):
            return False, ""

        # -- 第二关:PP/ADF + KPSS 平稳性检验 --
        ur_p = None
        kpss_p = None
        ur_test_name = "ER-only"

        if len(series) >= 15:
            # PP 优先(对异方差鲁棒)
            if self._spread_use_pp and _HAS_PP:
                try:
                    import numpy as _np
                    pp_result = _PhillipsPerron(_np.array(series, dtype=float))
                    ur_p = float(pp_result.pvalue)
                    ur_test_name = "PP"
                except Exception:
                    pass

            # ADF 后备
            if ur_p is None and _HAS_ADF:
                try:
                    ur_p = _adfuller(series, maxlag=2, regression='c', autolag=None)[1]
                    ur_test_name = "ADF"
                except Exception:
                    pass

            # KPSS
            if _HAS_KPSS:
                try:
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore")
                        kpss_p = _kpss(series, regression='c', nlags='auto')[1]
                except Exception:
                    pass

        # -- 四象限决策 + 半衰期约束(v5.1 含 Hurwicz 校正)--
        if ur_p is not None and kpss_p is not None:
            ur_stationary = ur_p < self._spread_adf_pvalue
            kpss_stationary = kpss_p > self._spread_kpss_pvalue

            if ur_stationary and kpss_stationary:
                # 确认平稳 -> 检查半衰期
                hl = _estimate_half_life(
                    series, hurwicz_correction=self._spread_hurwicz,
                )
                hl_max = min(self._spread_hl_max, max_hold_hours * 0.5)
                if hl is not None and hl > hl_max:
                    return True, (
                        f"平稳但半衰期过长: {ur_test_name}_p={ur_p:.3f}"
                        f" KPSS_p={kpss_p:.3f} HL={hl:.1f}h>{hl_max:.0f}h->维持拦截"
                    )
                hl_str = f" HL={hl:.1f}h" if hl else " HL=N/A"
                return False, (
                    f"{ur_test_name}+KPSS确认平稳({ur_test_name}_p={ur_p:.3f}"
                    f" KPSS_p={kpss_p:.3f}{hl_str})->推翻拦截"
                )

            if not ur_stationary and not kpss_stationary:
                return True, (
                    f"Spread趋势(三重确认): ER={er:.2f}"
                    f" {ur_test_name}_p={ur_p:.3f} KPSS_p={kpss_p:.3f}"
                )

            return True, (
                f"Spread趋势(矛盾/不确定): ER={er:.2f}"
                f" {ur_test_name}_p={ur_p:.3f} KPSS_p={kpss_p:.3f}->保守拦截"
            )

        # 只有单项检验
        if ur_p is not None:
            if ur_p < self._spread_adf_pvalue:
                hl = _estimate_half_life(
                    series, hurwicz_correction=self._spread_hurwicz,
                )
                hl_max = min(self._spread_hl_max, max_hold_hours * 0.5)
                if hl is not None and hl > hl_max:
                    return True, f"平稳但HL过长: {ur_test_name}_p={ur_p:.3f} HL={hl:.1f}h"
                return False, f"{ur_test_name}判定平稳(p={ur_p:.3f})->推翻拦截(无KPSS)"
            return True, f"Spread趋势({ur_test_name}+ER): ER={er:.2f} p={ur_p:.3f}"

        # 降级:纯 ER
        return True, f"Spread趋势(ER): ER={er:.2f} 净位移={spread_net:+.3f}"

    def ready(self, symbol: str) -> bool:
        buf = self._buffers.get(symbol)
        return buf is not None and len(buf) >= self._rs_period + 2

    # --------------- Layer 0: BOCPD + DFA-2 后备 ---------------

    def _check_regime(self, symbol: str, ctx: _LayerContext) -> tuple[bool, str]:
        """Layer 0: 概率化机制检测(v5.1 贝叶斯后验)。"""
        # BOCPD 路径
        if self._bocpd_enabled:
            bocpd = self._bocpd_state.get(symbol)
            if bocpd and bocpd.ready:
                trend_prob = bocpd.trend_probability
                ctx.bocpd_trend_prob = trend_prob
                if trend_prob > self._bocpd_trend_thresh:
                    return False, (
                        f"Layer0-BOCPD趋势机制: P(trending)={trend_prob:.3f}"
                        f">{self._bocpd_trend_thresh}"
                    )
                return True, ""

        # DFA-2 后备路径
        if not self._hurst_enabled:
            return True, ""

        buf = self._buffers.get(symbol)
        if buf is None or len(buf) < self._hurst_lookback + 1:
            return True, ""

        closes = [d[0] for d in list(buf)[-(self._hurst_lookback + 1):]]
        h, r2 = _hurst_dfa2(closes, r2_threshold=self._hurst_r2_thresh)

        if h is not None:
            ctx.hurst = h
            ctx.hurst_r2 = r2
        if h is None:
            return True, ""

        if h > self._hurst_thresh:
            return False, (
                f"Layer0-DFA2趋势机制: Hurst={h:.3f}>{self._hurst_thresh}"
                f" R2={r2:.3f}(BOCPD未就绪,DFA-2后备)"
            )
        return True, ""

    # --------------- Layer 1: 持续趋势 + MTF ---------------

    def _check_sustained(self, symbol: str, direction: str,
                         ctx: _LayerContext) -> tuple[bool, str]:
        """Layer 1: ER + RS 净位移 + MTF 连续加权 + 跨层 BOCPD 修正。"""
        buf = self._buffers[symbol]
        n = self._sustained_n
        data = list(buf)[-(n + 1):]
        closes = [d[0] for d in data]

        if len(closes) < n + 1:
            return True, ""

        ref = closes[0]
        if ref <= 0:
            return True, ""

        net_return = (closes[-1] - ref) / ref

        # Kaufman ER
        direction_dist = abs(closes[-1] - ref)
        path_length = sum(abs(closes[i] - closes[i - 1]) for i in range(1, len(closes)))
        er = direction_dist / path_length if path_length > 1e-10 else 0.0

        # 基础非对称 ER 阈值
        er_thresh = self._er_thresh_long if direction == 'long' else self._er_thresh_short

        # v5.1: MTF 连续加权修正(替代 v5.0 阶梯函数)
        mtf_er = None
        if self._mtf_enabled:
            mtf_er = self._calc_mtf_er(symbol)
            if mtf_er is not None:
                span = self._mtf_soft_upper - self._mtf_soft_lower
                if span > 1e-10:
                    mtf_w = max(0.0, min(1.0, (mtf_er - self._mtf_soft_lower) / span))
                else:
                    mtf_w = 1.0 if mtf_er >= self._mtf_soft_upper else 0.0
                er_thresh *= 1.0 - self._mtf_max_reduction * mtf_w

        # v5.0: 跨层 BOCPD 修正
        if self._cross_layer and ctx.bocpd_trend_prob is not None:
            if ctx.bocpd_trend_prob > 0.5:
                adjustment = self._cross_factor * (ctx.bocpd_trend_prob - 0.5)
                er_thresh *= (1.0 - adjustment)

        if er < er_thresh:
            return True, ""

        # RS 自适应阈值
        rs_var = self._rs_ema.get(symbol, 0.0)
        rs_vol = math.sqrt(max(0.0, rs_var))
        baseline = self._baseline_median.get(symbol)
        baseline_val = baseline.value if (baseline and len(baseline) >= 5) else None

        if rs_vol > 0 and baseline_val and baseline_val > 0:
            adaptive_thresh = self._sustained_base_thresh * (rs_vol / baseline_val)
            adaptive_thresh = max(self._sustained_base_thresh * 0.3,
                                  min(adaptive_thresh, self._sustained_base_thresh * 3.0))
        else:
            adaptive_thresh = self._sustained_base_thresh

        mtf_str = ""
        if self._mtf_enabled and mtf_er is not None and mtf_er > self._mtf_soft_lower:
            mtf_str = f" [MTF-1h-ER={mtf_er:.2f}]"

        if direction == 'short' and net_return <= -adaptive_thresh:
            return False, (
                f"Layer1-不追跌: 过去{n}根净跌幅={net_return:.2%}"
                f" 阈值=-{adaptive_thresh:.2%} ER={er:.2f}>={er_thresh:.2f}{mtf_str}"
            )
        if direction == 'long' and net_return >= adaptive_thresh:
            return False, (
                f"Layer1-不追涨: 过去{n}根净涨幅={net_return:.2%}"
                f" 阈值=+{adaptive_thresh:.2%} ER={er:.2f}>={er_thresh:.2f}{mtf_str}"
            )
        return True, ""

    def _calc_mtf_er(self, symbol: str) -> float | None:
        """计算 1h 级别 ER。"""
        buf_1h = self._buffers_1h.get(symbol)
        if buf_1h is None or len(buf_1h) < self._mtf_lookback + 1:
            return None
        data = list(buf_1h)[-(self._mtf_lookback + 1):]
        closes = [d[0] for d in data]
        d_dist = abs(closes[-1] - closes[0])
        p_len = sum(abs(closes[i] - closes[i - 1]) for i in range(1, len(closes)))
        return d_dist / p_len if p_len > 1e-10 else 0.0

    # --------------- Layer 2: 急动检测 ---------------

    def _check_spike(self, symbol: str, direction: str,
                     ctx: _LayerContext) -> tuple[bool, str]:
        """Layer 2: Huber 鲁棒 CUSUM + BTC 滞后因果因子 + 跨层修正。"""
        s_pos, s_neg = self._cusum_state.get(symbol, (0.0, 0.0))

        effective_thresh_up = self._cusum_thresh_up
        effective_thresh_down = self._cusum_thresh_down

        # BTC 滞后因果因子(v5.1:使用滞后 stress 而非当期值)
        btc_str = ""
        if symbol != self._market_ref:
            btc_hist = self._btc_stress_history.get(self._market_ref)
            if btc_hist and len(btc_hist) >= 1:
                # 取过去 lag 根中的最大 stress
                lag_data = list(btc_hist)[-self._btc_stress_lag:]
                btc_stress = max(lag_data) if lag_data else 0.0
            else:
                btc_stress = 0.0

            if btc_stress > self._btc_stress_thresh:
                excess = btc_stress - self._btc_stress_thresh
                stress_factor = max(
                    self._btc_stress_factor_min,
                    math.exp(-self._btc_decay_rate * excess),
                )
                effective_thresh_up *= stress_factor
                effective_thresh_down *= stress_factor
                btc_str = f" BTC压力(lag={self._btc_stress_lag})->阈值x{stress_factor:.2f}"

        # 跨层 BOCPD 修正(v5.0)
        if self._cross_layer and ctx.bocpd_trend_prob is not None:
            if ctx.bocpd_trend_prob > 0.5:
                cusum_adj = 1.0 - 0.2 * (ctx.bocpd_trend_prob - 0.5)
                effective_thresh_up *= cusum_adj
                effective_thresh_down *= cusum_adj

        # 量价确认
        vol_confirmed, vol_ratio, vwpm_val = self._check_volume(symbol)

        rs_vol = math.sqrt(max(0.0, self._rs_ema.get(symbol, 0.0)))
        vol_str = f" 量比={vol_ratio:.1f}" if vol_ratio > 0 else ""
        vwpm_str = f" VWPM={vwpm_val:.4f}" if vwpm_val != 0.0 else ""

        if direction == 'short' and s_pos >= effective_thresh_up and vol_confirmed:
            return False, (
                f"Layer2-暴涨不做空: CUSUM+={s_pos:.2f}>={effective_thresh_up:.2f}"
                f" RS_vol={rs_vol:.4f}{vol_str}{vwpm_str}{btc_str}"
            )
        if direction == 'long' and s_neg >= effective_thresh_down and vol_confirmed:
            return False, (
                f"Layer2-暴跌不做多: CUSUM-={s_neg:.2f}>={effective_thresh_down:.2f}"
                f" RS_vol={rs_vol:.4f}{vol_str}{vwpm_str}{btc_str}"
            )
        return True, ""

    def _check_volume(self, symbol: str) -> tuple[bool, float, float]:
        """VWPM + 量比联合确认(继承 v4.0)。"""
        buf = self._buffers.get(symbol)
        if buf is None or len(buf) < 2:
            return True, 0.0, 0.0

        current_vol = buf[-1][4]
        vol_ema = self._vol_ema.get(symbol, 0.0)

        if vol_ema > 0 and current_vol > 0:
            vol_ratio = current_vol / vol_ema
        else:
            return True, 0.0, 0.0

        vwpm_val = 0.0
        w = min(self._vwpm_window, len(buf) - 1)
        if w >= 1:
            data = list(buf)[-(w + 1):]
            weighted_sum = 0.0
            total_vol = 0.0
            for i in range(1, len(data)):
                prev_c = data[i - 1][0]
                curr_c = data[i][0]
                v = data[i][4]
                if prev_c > 0 and v > 0:
                    ret = (curr_c - prev_c) / prev_c
                    weighted_sum += ret * v
                    total_vol += v
            if total_vol > 0:
                vwpm_val = weighted_sum / total_vol

        rs_vol = math.sqrt(max(0.0, self._rs_ema.get(symbol, 0.0)))
        if rs_vol > 1e-10 and vwpm_val != 0.0:
            vwpm_normalized = abs(vwpm_val) / rs_vol
            confirmed = (vol_ratio >= self._vol_confirm_ratio
                         and vwpm_normalized >= self._vwpm_confirm_ratio)
        else:
            confirmed = vol_ratio >= self._vol_confirm_ratio

        return confirmed, vol_ratio, vwpm_val

    # --------------- 在线更新 ---------------

    def _online_update(self, symbol: str, buf: deque) -> None:
        """O(1) 在线更新所有状态。"""
        curr = buf[-1]
        prev = buf[-2]
        close, high, low, open_, volume = curr
        prev_close = prev[0]

        if prev_close <= 0 or close <= 0:
            return

        # -- RS EMA --
        rs_val = self._calc_rs_single(high, low, open_, close)
        alpha_rs = 2.0 / (self._rs_period + 1)
        prev_rs = self._rs_ema.get(symbol, rs_val)
        new_rs = alpha_rs * rs_val + (1.0 - alpha_rs) * prev_rs
        self._rs_ema[symbol] = new_rs

        # -- Per-symbol 基准波动率 --
        rs_vol_now = math.sqrt(max(0.0, new_rs))
        if symbol not in self._baseline_median:
            self._baseline_median[symbol] = _RollingMedian(maxlen=200)
        self._baseline_median[symbol].push(rs_vol_now)

        # -- CUSUM(v5.1 Huber 鲁棒化 + 自适应 drift)--
        rs_vol = math.sqrt(max(0.0, new_rs))
        if rs_vol > 1e-10:
            ret = (close - prev_close) / prev_close
            z_raw = ret / rs_vol

            # v5.1: Huber 鲁棒化
            z = math.copysign(min(abs(z_raw), self._cusum_z_clip), z_raw)

            # 自适应 drift
            drift = self._cusum_drift
            if self._cusum_drift_adaptive:
                baseline = self._baseline_median.get(symbol)
                bl_val = baseline.value if (baseline and len(baseline) >= 5) else None
                if bl_val and bl_val > 0:
                    vol_ratio = rs_vol / bl_val
                    drift = self._cusum_drift * max(0.5, min(2.0, vol_ratio))

            s_pos, s_neg = self._cusum_state.get(symbol, (0.0, 0.0))
            s_pos = max(0.0, s_pos + z - drift)
            s_neg = max(0.0, s_neg - z - drift)
            self._cusum_state[symbol] = (s_pos, s_neg)

        # -- BTC stress 历史记录(v5.1 滞后因果)--
        if symbol not in self._btc_stress_history:
            self._btc_stress_history[symbol] = deque(maxlen=max(10, self._btc_stress_lag + 2))
        cur_s_pos, cur_s_neg = self._cusum_state.get(symbol, (0.0, 0.0))
        self._btc_stress_history[symbol].append(max(cur_s_pos, cur_s_neg))

        # -- Volume EMA --
        if volume > 0:
            alpha_v = 2.0 / (self._vol_ema_period + 1)
            prev_v = self._vol_ema.get(symbol, volume)
            self._vol_ema[symbol] = alpha_v * volume + (1.0 - alpha_v) * prev_v

        # -- BOCPD 更新(v5.1 alpha0=3, 贝叶斯后验)--
        if self._bocpd_enabled and prev_close > 0:
            log_ret = math.log(close / prev_close)
            if symbol not in self._bocpd_state:
                self._bocpd_state[symbol] = _BOCPD(
                    hazard_rate=self._bocpd_hazard, max_run=200,
                    alpha0=3.0, drift_threshold=self._bocpd_drift_thresh,
                )
            self._bocpd_state[symbol].update(log_ret)

        # -- 1h 聚合(MTF)--
        if self._mtf_enabled:
            if symbol not in self._hour_accum:
                self._hour_accum[symbol] = _HourBarAccum()
            accum = self._hour_accum[symbol]
            o = open_ if open_ > 0 else close
            h = high if high > 0 else close
            l = low if low > 0 else close
            if accum.add(o, h, l, close, volume):
                if symbol not in self._buffers_1h:
                    self._buffers_1h[symbol] = deque(maxlen=30)
                self._buffers_1h[symbol].append(accum.as_tuple())
                accum.reset()

    @staticmethod
    def _calc_rs_single(high: float, low: float, open_: float, close: float) -> float:
        """Rogers-Satchell (1991) 波动率估计器(单根 K 线)。"""
        if high > 0 and low > 0 and open_ > 0 and high >= low:
            try:
                rs = (math.log(high / close) * math.log(high / open_)
                      + math.log(low / close) * math.log(low / open_))
                return max(0.0, rs)
            except (ValueError, ZeroDivisionError):
                pass
        if open_ > 0:
            try:
                return math.log(close / open_) ** 2
            except (ValueError, ZeroDivisionError):
                pass
        return 0.0

3.3 StrategyParams 新增字段(v5.1)

src/trading/config.pyStrategyParams 中新增(v4.0 的 24 个字段 + v5.0 新增 9 个 + v5.1 新增 5 个,共 38 个,全部有默认值):

@dataclass(frozen=True)
class StrategyParams:
    # ... 现有字段 ...

    # -- 动量过滤器参数 --
    momentum_filter_enabled: bool = True

    # Layer 0: BOCPD + DFA-2 后备
    momentum_bocpd_enabled: bool = True                    # v5.0
    momentum_bocpd_hazard_rate: float = 0.01               # v5.0
    momentum_bocpd_trend_threshold: float = 0.75           # v5.0
    momentum_bocpd_drift_threshold: float = 0.0002         # v5.1 新增
    momentum_hurst_enabled: bool = True
    momentum_hurst_lookback: int = 60
    momentum_hurst_threshold: float = 0.60
    momentum_hurst_r2_threshold: float = 0.85

    # Layer 1: 持续趋势 + MTF
    momentum_sustained_lookback: int = 30
    momentum_sustained_base_threshold: float = 0.008
    momentum_er_threshold_long: float = 0.60
    momentum_er_threshold_short: float = 0.50
    momentum_mtf_enabled: bool = True                      # v5.0
    momentum_mtf_soft_lower: float = 0.30                  # v5.1 新增(替代 mtf_er_threshold)
    momentum_mtf_soft_upper: float = 0.70                  # v5.1 新增
    momentum_mtf_max_reduction: float = 0.20               # v5.1 新增
    momentum_mtf_lookback: int = 6                         # v5.0

    # Layer 2: 急动检测
    momentum_rs_period: int = 10
    momentum_cusum_drift: float = 0.5
    momentum_cusum_drift_adaptive: bool = True             # v5.0
    momentum_cusum_z_clip: float = 5.0                     # v5.1 新增
    momentum_cusum_threshold_spike_up: float = 3.5
    momentum_cusum_threshold_spike_down: float = 2.5
    momentum_volume_confirm_ratio: float = 1.5
    momentum_volume_ema_period: int = 20
    momentum_vwpm_window: int = 5
    momentum_vwpm_confirm_ratio: float = 0.8
    momentum_market_ref_symbol: str = "BTC"
    momentum_btc_stress_threshold: float = 2.0
    momentum_btc_stress_factor_min: float = 0.7
    momentum_btc_decay_rate: float = 0.3                   # v5.0
    momentum_btc_stress_lag: int = 3                       # v5.1 新增

    # Layer 3: Spread 仲裁
    momentum_spread_lookback: int = 40                     # v5.1 改动:20 -> 40
    momentum_spread_er_threshold: float = 0.45
    momentum_spread_net_threshold: float = 1.5
    momentum_spread_adf_pvalue: float = 0.05
    momentum_spread_kpss_pvalue: float = 0.05
    momentum_spread_use_pp: bool = True                    # v5.0
    momentum_spread_half_life_max: float = 36.0            # v5.0
    momentum_spread_hurwicz_correction: bool = True        # v5.1 新增

    # 跨层
    momentum_cross_layer_enabled: bool = True              # v5.0
    momentum_cross_layer_factor: float = 0.4               # v5.0

3.4 strategy.py 集成改动

与 v5.0 完全一致:SymbolBaseline 新增 z4h_historyprocess_tick 透传 OHLCV,_check_entry 四层检查。唯一变化是 check_spread 新增 max_hold_hours 参数:

# _check_entry 中 Layer 3 仲裁调用
spread_trend, spread_reason = self._momentum_filter.check_spread(
    z4h_hist, direction,
    max_hold_hours=params.max_hold_hours,
)

3.5 改动范围(v5.1)

改动文件 改动内容 改动量
src/trading/momentum_filter.py 重写:贝叶斯后验 BOCPD + Huber CUSUM + BTC lag + MTF 连续 + Hurwicz 校正 ~600 行
src/trading/strategy.py check_spread 传入 max_hold_hours(继承 v5.0) ~2 行
src/trading/config.py StrategyParams 新增 5 个字段(v5.1),总 38 个 ~20 行
src/trading/orchestrator.py 透传 OHLCV(继承 v4.0) ~5 行
src/services/realtime_kline_service_base.py 提取双腿 OHLCV(继承 v4.0) ~20 行

4. 融入当前交易体系的方案

4.1 数据流全景(v5.1)

WebSocket K线推送(OHLCV + volume)
      |
      v
realtime_kline_service_base
      |
      +-- _trigger_strategy_if_ready()
              |
              +-- 提取 alt/base OHLCV + volume
              v
      TradingOrchestrator.process_analysis(
          ..., alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv
      )
              |
              v
      AdaptiveBollingerStrategy.process_tick(
          ..., alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv
      )
              |
              +-- [新K线] bl.z4h_history.append(z4h)
              |
              +-- [新K线] MomentumFilter.update(alt/base, ohlcv, kline_time)
              |           -> O(1) 在线更新:
              |             RS EMA / Huber-CUSUM(v5.1) / Volume EMA
              |             Per-symbol基准中位数 / BOCPD(v5.1贝叶斯后验)
              |             1h聚合(MTF) / BTC stress历史(v5.1滞后)
              |
              +-- _check_entry()
                      |
                      步骤1-4: 冷却期/突破/持仓/z4h
                      步骤4.5: 方向判断
                      步骤4.6: 四层动量过滤(跨层共享 _LayerContext)
                                |
                                +-- [第1步] Layer 0(硬): BOCPD 贝叶斯后验 P(|mu|>delta)
                                |    共享 ctx.bocpd_trend_prob -> Layer 1/2
                                |
                                +-- [第2步] Layer 2(硬): Huber-CUSUM + BTC滞后因果 + 跨层修正
                                |
                                +-- [第3步] Layer 1(软): ER + MTF连续加权 + 跨层修正
                                |
                                +-- [第4步] Layer 3(仲裁): PP/ADF+KPSS + OU半衰期(Hurwicz校正)
                      步骤5: 产生 EntrySignal

4.2 线程安全

与 v5.0 一致。MomentumFilter 不持有锁,由 AdaptiveBollingerStrategy_lock 统一保护。新增的 _btc_stress_history 状态在锁保护范围内更新。

4.3 日志格式(v5.1)

动量过滤(硬)   | BTC|ETH | alt:Layer0-BOCPD趋势机制: P(trending)=0.82>0.75
动量过滤(硬)   | SOL|BTC | alt:Layer0-DFA2趋势机制: Hurst=0.67>0.60 R2=0.92(BOCPD未就绪,DFA-2后备)
动量过滤(硬)   | SOL|BTC | alt:Layer2-暴涨不做空: CUSUM+=3.82>=3.15 RS_vol=0.0028 量比=2.3 VWPM=0.0012 BTC压力(lag=3)->阈值x0.85
动量过滤(软+Spread) | SOL|BTC | alt:Layer1-不追跌: 过去30根净跌幅=-1.23% ER=0.72>=0.48 [MTF-1h-ER=0.55] | 平稳但半衰期过长: PP_p=0.02 KPSS_p=0.12 HL=38.1h>36h->维持拦截
动量仲裁放行   | ETH|BTC | 单腿:alt:Layer1-不追涨: ... | PP+KPSS确认平稳(PP_p=0.01 KPSS_p=0.15 HL=8.2h)->推翻拦截

4.4 配置层集成(v5.1)

# Layer 0: BOCPD + DFA-2 后备
TRADING_MOMENTUM_BOCPD_ENABLED=true
TRADING_MOMENTUM_BOCPD_HAZARD_RATE=0.01
TRADING_MOMENTUM_BOCPD_TREND_THRESHOLD=0.75
TRADING_MOMENTUM_BOCPD_DRIFT_THRESHOLD=0.0002               # v5.1 新增
TRADING_MOMENTUM_HURST_ENABLED=true
TRADING_MOMENTUM_HURST_LOOKBACK=60
TRADING_MOMENTUM_HURST_THRESHOLD=0.60
TRADING_MOMENTUM_HURST_R2_THRESHOLD=0.85

# Layer 1: 持续趋势 + MTF
TRADING_MOMENTUM_SUSTAINED_LOOKBACK=30
TRADING_MOMENTUM_SUSTAINED_BASE_THRESHOLD=0.008
TRADING_MOMENTUM_ER_THRESHOLD_LONG=0.60
TRADING_MOMENTUM_ER_THRESHOLD_SHORT=0.50
TRADING_MOMENTUM_MTF_ENABLED=true
TRADING_MOMENTUM_MTF_SOFT_LOWER=0.30                         # v5.1 新增(替代 MTF_ER_THRESHOLD)
TRADING_MOMENTUM_MTF_SOFT_UPPER=0.70                         # v5.1 新增
TRADING_MOMENTUM_MTF_MAX_REDUCTION=0.20                      # v5.1 新增
TRADING_MOMENTUM_MTF_LOOKBACK=6

# Layer 2: 急动检测
TRADING_MOMENTUM_RS_PERIOD=10
TRADING_MOMENTUM_CUSUM_DRIFT=0.5
TRADING_MOMENTUM_CUSUM_DRIFT_ADAPTIVE=true
TRADING_MOMENTUM_CUSUM_Z_CLIP=5.0                            # v5.1 新增
TRADING_MOMENTUM_CUSUM_THRESHOLD_SPIKE_UP=3.5
TRADING_MOMENTUM_CUSUM_THRESHOLD_SPIKE_DOWN=2.5
TRADING_MOMENTUM_VOLUME_CONFIRM_RATIO=1.5
TRADING_MOMENTUM_VWPM_WINDOW=5
TRADING_MOMENTUM_VWPM_CONFIRM_RATIO=0.8
TRADING_MOMENTUM_MARKET_REF_SYMBOL=BTC
TRADING_MOMENTUM_BTC_STRESS_THRESHOLD=2.0
TRADING_MOMENTUM_BTC_STRESS_FACTOR_MIN=0.7
TRADING_MOMENTUM_BTC_DECAY_RATE=0.3
TRADING_MOMENTUM_BTC_STRESS_LAG=3                            # v5.1 新增

# Layer 3: Spread 仲裁
TRADING_MOMENTUM_SPREAD_LOOKBACK=40                          # v5.1 改动:20 -> 40
TRADING_MOMENTUM_SPREAD_ER_THRESHOLD=0.45
TRADING_MOMENTUM_SPREAD_NET_THRESHOLD=1.5
TRADING_MOMENTUM_SPREAD_ADF_PVALUE=0.05
TRADING_MOMENTUM_SPREAD_KPSS_PVALUE=0.05
TRADING_MOMENTUM_SPREAD_USE_PP=true
TRADING_MOMENTUM_SPREAD_HALF_LIFE_MAX=36.0
TRADING_MOMENTUM_SPREAD_HURWICZ_CORRECTION=true              # v5.1 新增

# 跨层
TRADING_MOMENTUM_CROSS_LAYER_ENABLED=true
TRADING_MOMENTUM_CROSS_LAYER_FACTOR=0.4

5. 与当前交易风格的契合度分析

5.1 交易风格特征

维度 当前系统特征
策略类型 配对协整均值回归
K 线周期 5 分钟
最大持仓时间 72 小时
入场逻辑 adaptive_z 突破阈值(首次穿越)
出场逻辑 adaptive_z 回归至 entry_adaptive_z * reversion_factor

5.2 v5.1 新增改进的契合度分析

贝叶斯后验 P(trending|r)(最高契合):

v5.0 的 sigmoid 映射将 NIG 共轭先验降级为一个简单的 z-test,浪费了贝叶斯框架的核心优势。v5.1 的 P(|mu| > delta | NIG posterior) 是共轭先验的正确使用方式,且 drift_threshold(0.02%/bar)具有明确的经济学含义——"5 分钟内超过 0.02% 的漂移才算有意义"。这比 sigmoid 的纯数学中心点(z=2.0)更可解释、更可校准。

CUSUM Huber 鲁棒化(最高契合):

加密市场的闪崩/插针事件(单根 K 线涨跌 5%+)在传统金融中极为罕见,但在 crypto 中月均发生数次。Huber 化确保这些极端事件不会单根触发 CUSUM 硬拦截,只有持续的强动量才会累积突破阈值——这正是约束 3/4 的设计意图。

BTC 滞后因果因子(高契合):

BTC 对 alt 的影响传导存在 5-30 分钟延迟(Makarov & Schoar 2020)。使用 lag=3(15 分钟滞后)的 BTC stress 比当期值更有预测性——它捕捉的是"BTC 先动,alt 即将跟随"的因果关系,而非"BTC 和 alt 同时反应同一事件"的虚假相关。

MTF 连续加权(中高契合):

消除阶梯函数的不连续跳变是工程层面的改进。1h ER=0.49 和 0.51 不应产生截然不同的行为。连续函数让系统在边界条件下更稳定、更可预测。

OU Hurwicz 偏差校正(高契合):

对均值回归策略来说,半衰期是最关键的参数之一。小样本下系统性高估半衰期意味着:真实半衰期 30h 的 spread 被误判为 36h,可能被半衰期约束错误拦截。Hurwicz 校正消除了这个偏差,让半衰期约束更精确。

Spread 回望窗口扩展(中契合):

从 20 扩展到 40 提升了 PP/KPSS 的统计功效和 OU 估计的精度。成本是回望窗口从 100 分钟扩展到 200 分钟,但 z4h_history 的数据量完全支持。

5.3 综合评分(v5.1 vs v5.0 vs v4.0)

维度 评分 vs v5.0 vs v4.0 说明
算法严谨性 4.5/5 +0.5 +1.5 贝叶斯后验替代 sigmoid;Hurwicz 校正
鲁棒性 4.5/5 +0.5 +1.5 Huber 化 CUSUM;连续 MTF
因果推断 4/5 +0.5 +1.0 BTC 滞后因果(Transfer Entropy 留待 v6.0)
误杀控制 4.5/5 +0.5 +1.0 Hurwicz 校正减少误拦截;连续 MTF 消除边界跳变
自适应能力 4.5/5 持平 +1.0 自适应 drift + 跨层动态调节
市场微观结构 3.5/5 持平 持平 BTC 指数衰减是微调,OFI 留待 Phase 2
统计严谨性 4.5/5 +0.5 +1.5 贝叶斯后验 + PP + Hurwicz + 扩展样本量
多尺度覆盖 4/5 持平 +1.0 MTF 连续加权(仅 2 个时间尺度)
层间协同 4/5 持平 +1.0 跨层共享(v6.0: 对数似然比框架)
运行效率 4/5 持平 持平 Student-t CDF 增加微量开销,整体仍 O(R)
代码侵入性 5/5 持平 持平 独立模块,可逐项关闭

6. 与四项开单约束的对应关系

6.1 约束覆盖矩阵(v5.1)

约束 过滤层 触发指标 被阻止方向 检查维度 可仲裁 v5.1 变化
0. 趋势机制 Layer 0 BOCPD P(|mu|>delta)>T 或 Hurst(DFA-2)>T 双向 alt+base 贝叶斯后验替代 sigmoid
1. 不追跌 Layer 1 ER>=T_short AND net_ret<=-T_adp short alt+base+spread MTF 连续加权
2. 不追涨 Layer 1 ER>=T_long AND net_ret>=+T_adp long alt+base+spread MTF 连续加权
3. 暴涨不做空 Layer 2 CUSUM+>=T_up*SF AND VWPM确认 short alt+base+BTC Huber鲁棒化 + BTC滞后因果
4. 暴跌不做多 Layer 2 CUSUM->=T_down*SF AND VWPM确认 long alt+base+BTC Huber鲁棒化 + BTC滞后因果

四项约束完全覆盖。v5.1 增强:Layer 0 贝叶斯后验替代经验 sigmoid,Layer 2 Huber 化减少极端事件误触发且 BTC 因子因果化,Layer 1 连续 MTF 消除边界跳变,Layer 3 Hurwicz 校正提升半衰期精度。


7. 参数配置指南

7.1 v5.1 新增参数

Layer 0:BOCPD 贝叶斯后验

参数 默认值 范围 说明
bocpd_drift_threshold 0.0002 0.0001-0.0005 P(|mu|>delta) 中的 delta(有经济意义的最小漂移)
bocpd_drift_threshold 调优:
  0.0001:5min 收益率 0.01% = 1bps → 极低门槛,微弱趋势即触发
  0.0002:5min 收益率 0.02% = 2bps → 标准值,约对应日化 5.76%
  0.0005:5min 收益率 0.05% = 5bps → 高门槛,只有强趋势才触发

注意:此参数替代 v5.0 sigmoid 的中心点 2.0,具有明确的经济学含义。
可根据回测中"Layer 0 误拦截率"调优:误拦截多则提高 delta,漏判多则降低。

Layer 0:NIG 先验(内部常量)

alpha0 = 3.0(v5.1,vs v5.0 的 1.0)
  v5.0: alpha0=1 -> Student-t 自由度 2 → 尾部极重,预热期数值波动大
  v5.1: alpha0=3 -> Student-t 自由度 6 → 更稳定的先验,兼顾重尾容忍度
  这是内部常量,不暴露为用户参数。如需调整,修改 _BOCPD.__init__ 中的 alpha0。

Layer 1:MTF 连续加权

参数 默认值 范围 说明
mtf_soft_lower 0.30 0.20-0.40 低于此值无 MTF 修正
mtf_soft_upper 0.70 0.55-0.80 高于此值满 MTF 修正
mtf_max_reduction 0.20 0.10-0.30 最大 ER 阈值降低比例
mtf_soft_lower / mtf_soft_upper 调优:
  [0.20, 0.55]:宽窗口,1h ER 稍有趋势即开始修正 → 更敏感
  [0.30, 0.70]:标准值,平衡敏感度和稳定性
  [0.40, 0.80]:窄窗口,需要较强 1h 趋势才修正 → 更保守

mtf_max_reduction 调优:
  0.10:最多降 10% ER 阈值 → 温和修正
  0.20:最多降 20% ER 阈值 → 标准值
  0.30:最多降 30% ER 阈值 → 激进修正,需谨慎

替代 v5.0 的 mtf_er_threshold(阶梯函数阈值 0.50),
v5.1 不再有不连续跳变。

Layer 2:CUSUM Huber 鲁棒化 + BTC 滞后

参数 默认值 范围 说明
cusum_z_clip 5.0 3.0-8.0 z 值 Huber 截断点
btc_stress_lag 3 1-10 BTC stress 滞后根数(x5min)
cusum_z_clip 调优:
  3.0:激进截断,强力抑制极端值 → 可能过度削弱真实信号
  5.0:标准值,保留 99.99997% 高斯分布范围 → 只截断加密特有的极端尾部
  8.0:温和截断,仅截断最极端的插针 → 接近无截断

btc_stress_lag 调优:
  1:5 分钟滞后 → 近似同步,因果意义有限
  3:15 分钟滞后 → 标准值,覆盖主流 BTC->alt 传导时间
  5:25 分钟滞后 → 保守,捕捉较慢的传导效应
  10:50 分钟滞后 → 可能过长,BTC 信号已衰减

建议根据 Granger 因果检验确定最优 lag。

Layer 3:Hurwicz 偏差校正 + 回望窗口

参数 默认值 范围 说明
spread_hurwicz_correction true -- Hurwicz 偏差校正开关
spread_lookback 40 20-80 Spread 回望根数(v5.1 默认从 20 改为 40)
spread_lookback 调优:
  20:v5.0 默认值,回望 100 分钟 → 统计功效不足
  40:v5.1 默认值,回望 200 分钟 → 平衡功效和时效性
  60:回望 300 分钟 → 高功效但可能滞后
  80:回望 400 分钟 → 接近系统启动时间要求

建议:设为 max_hold_hours * 60 / 5 / 5 ~ max_hold_hours * 60 / 5 / 3
即最大持仓时间的 1/5 到 1/3(以 5min bar 为单位)

7.2 继承 v5.0/v4.0 参数

Layer 0(BOCPD hazard_rate/trend_threshold + DFA-2 后备)、Layer 1(ER/RS)、Layer 2(CUSUM/VWPM/BTC 基础)、Layer 3(PP/KPSS/ER)、跨层(cross_layer_factor)的参数调优指南与 v5.0 完全一致,此处不重复。

7.3 回测验证建议

对比七组回测:
  A 组:无动量过滤(基线)
  B 组:v3.0(RS + DFA-1 + ADF+ER)
  C 组:v4.0(RS + DFA-2+R2 + VWPM + BTC + ADF+KPSS+ER)
  D 组:v5.0 完整版
  E 组:v5.1 完整版
  F 组:v5.1 关闭各 v5.1 子功能(评估各改进项贡献)
    F1: 关闭贝叶斯后验(恢复 sigmoid)→ 评估 P(|mu|>delta) vs sigmoid
    F2: 关闭 Huber 鲁棒化(cusum_z_clip=inf)→ 评估极端事件处理
    F3: 关闭 BTC 滞后(btc_stress_lag=0)→ 评估因果 vs 同期
    F4: 关闭 MTF 连续加权(恢复阶梯函数)→ 评估边界稳定性
    F5: 关闭 Hurwicz 校正 → 评估半衰期偏差影响
    F6: spread_lookback=20(恢复 v5.0)→ 评估样本量影响
  G 组:v5.1 + Phase 2 OFI(L2 book 集成后)

核心指标:
  - Sharpe Ratio(风险调整收益)
  - 最大连续亏损次数
  - 信号过滤率 / 误杀率
  - Layer 3 仲裁放行率 / 半衰期拒绝率
  - Layer 0 BOCPD vs DFA-2 激活比例
  - Layer 2 极端值触发频率(Huber 前后对比)
  - 分波动率区间表现(高/中/低波动率环境)

预期 v5.1 vs v5.0 增量收益:
  贝叶斯后验:Layer 0 误拦截减少 10-20%(经济含义的 delta vs 纯数学 sigmoid)
  Huber 鲁棒化:Layer 2 极端事件误触发减少 30-50%(单根闪崩不再触发)
  BTC 滞后因果:Layer 2 BTC 同步反应误判减少 15-25%
  MTF 连续加权:Layer 1 边界条件稳定性提升(难以量化,定性改善)
  Hurwicz 校正:Layer 3 半衰期误拦截减少 10-15%(偏差校正 6-10%)
  回望窗口扩展:Layer 3 PP/KPSS/OU 整体准确率提升 10-20%

8. 风险与局限性

8.1 已知局限

局限 说明 缓解措施
BOCPD hazard_rate 敏感性 先验变点概率影响机制长度估计 默认 0.01 在多种市况验证;可通过回测优化
BOCPD 预热期 需 20 根 K 线(100 分钟)才就绪 DFA-2 自动后备;启动时 DB 回填
Student-t CDF 近似精度 v5.1 的贝叶斯后验需要 CDF 计算,使用连分数近似 精度优于 0.001(df>=2);可用 scipy 验证
半衰期估计对小样本敏感 spread_lookback=40 时 OLS 仅 39 个观测值 v5.1 Hurwicz 校正 + 扩展样本量缓解
PP 需 arch 库 arch 不一定已安装 自动降级为 ADF
MTF 1h 聚合延迟 1h bar 需要 12 根 5m bar 才完成 前 12 根无 MTF 数据,影响温和(连续加权渐进生效)
自适应 drift 可能过度补偿 极端波动率时 drift 变化大 clamp(0.5, 2.0) 限制缩放范围
Huber clip 阈值 cusum_z_clip=5.0 可能对某些币种不最优 可通过回测调优;范围 3-8
BTC lag 最优值未知 btc_stress_lag=3 凭市场微观结构文献 可通过 Granger 因果检验确定;3 是合理起点
跨层因子的最优值未知 cross_layer_factor=0.4 凭经验 影响温和(最多 10% 阈值调整),回测优化
参数总数增加 v5.1 共 38 个参数(v5.0 为 33 个,v4.0 为 24 个) 全有默认值;P1 引入元参数降维(见 8.3)

8.2 不适合的场景

  • 极低流动性资产:RS 波动率失真,BOCPD 因稀疏数据后验不收敛
  • HIP-3 稀疏资产:volume 不可靠时 VWPM 无效,OFI 也无法使用
  • 极短持仓策略:半衰期约束对 max_hold_hours < 4h 的策略过于宽松

8.3 后续迭代方向(v5.1 之后)

优先级 方向 说明
P1 元参数降维 引入 risk_attitude in [0,1] 驱动所有子参数联合缩放(38 个参数 -> 1~3 个元参数)
P1 Phase 2: OFI 集成 L2 book 数据管道 -> MomentumFilter,替代 VWPM
P1 BOCPD hazard_rate 自适应 Beta 后验在线更新 hazard_rate
P2 Transfer Entropy BTC 因子 信息论框架度量 BTC->alt 因果信息流,替代 CUSUM stress
P2 Kalman Filter 动态对冲比率 追踪时变协整向量,Kalman gain 异常时 Layer 0 触发
P2 对数似然比跨层框架 统一各层输出为 log-odds,逻辑回归校准权重(v6.0 架构)
P3 BOCPD 非高斯观测模型 Fearnhead & Liu (2007) particle-based BOCPD,支持 Student-t 观测
P3 Copula 尾部依赖 建模 BTC-alt 非线性尾部依赖,替代线性 stress factor
P3 Shiryaev-Roberts 对照 SR 程序与 CUSUM 的 minimax 最优性对比验证
P3 Variance Ratio 平稳性检验 Lo & MacKinlay (1988),小样本性质优于 ADF/PP
P4 HMM 离线训练 用历史数据训练两状态 HMM,作为 BOCPD 的交叉验证

附录 A:算法学术参考

算法 原始论文 核心贡献
BOCPD Adams, R.P. & MacKay, D.J.C. (2007) arXiv:0710.3742 贝叶斯在线变点检测,NIG 共轭先验,概率化机制推断
NIG 共轭后验 Murphy, K.P. (2007) Conjugate Bayesian analysis of the Gaussian distribution NIG 后验精确推导,P(mu>delta|data) 闭式计算
DFA / DFA-2 Peng et al. (1994); Kantelhardt et al. (2002) 去趋势波动分析,Hurst 指数估计
DFA in Finance Mantegna & Stanley (1995) Nature DFA 引入金融时间序列
Kaufman ER Kaufman, P. (1995) Smarter Trading 方向距离/路径长度衡量趋势效率
CUSUM Page, E.S. (1954) Biometrika 变化检测最优在线算法
CUSUM 最优性 Lorden, G. (1971) Ann. Math. Statist. CUSUM 一阶渐近最优
CUSUM 自适应 drift Hawkins & Olwell (1998) Cumulative Sum Charts CUSUM ARL 依赖 drift 与信号比,建议自适应调整
Huber 估计器 Huber, P.J. (1964) Ann. Math. Statist. 鲁棒估计奠基性工作,截断极端值减少非高斯影响
Rogers-Satchell Rogers, L. & Satchell, S. (1991) Ann. Appl. Prob. 漂移不变 OHLC 波动率
Garman-Klass Garman, M. & Klass, M. (1980) J. Business OHLC 波动率(零漂移假设)
Kyle 模型 Kyle, A.S. (1985) Econometrica 价格+成交量联合信号
OFI Cont, R., Kukanov, A. & Stoikov, S. (2014) J. Financial Econometrics Order Flow Imbalance,盘口微观结构量价指标
ADF 检验 Dickey, D.A. & Fuller, W.A. (1979) JASA 单位根检验
KPSS 检验 Kwiatkowski et al. (1992) J. Econometrics 平稳性检验(ADF 对偶)
Phillips-Perron Phillips, P.C.B. & Perron, P. (1988) Biometrika 非参数异方差鲁棒单位根检验
时间序列动量 Moskowitz, Ooi & Pedersen (2012) J. Financial Economics 资产间动量溢出效应
加密市场微观结构 Makarov, I. & Schoar, A. (2020) J. Financial Economics 加密市场跨交易所价格传导延迟
OU 过程半衰期 Vidyamurthy, G. (2004) Pairs Trading 均值回归半衰期估计,配对交易核心参数
Hurwicz 偏差 Hurwicz, L. (1950) Proc. Second Berkeley Symposium AR 模型 OLS 估计的有限样本偏差
Kendall 偏差校正 Kendall, M.G. (1954) Biometrika AR(1) 偏差的一阶校正公式
配对交易综述 Krauss, C. (2017) J. Economic Surveys 配对交易策略综述,指出半衰期偏差校正的必要性
VPIN Easley, Lopez de Prado & O'Hara (2012) Rev. Financial Studies 知情交易概率的成交量同步估计
Shiryaev-Roberts Pollak & Tartakovsky (2008) Ann. Statist. CUSUM 替代方案
Transfer Entropy Schreiber, T. (2000) Physical Review Letters 信息论因果信息流度量
HMM 机制切换 Hamilton, J.D. (1989) Econometrica 马尔可夫机制切换模型
配对交易实证 Gatev, E. et al. (2006) Rev. Financial Studies 配对交易大规模实证验证
Variance Ratio Lo, A.W. & MacKinlay, A.C. (1988) Rev. Financial Studies 方差比检验,小样本性质优于 ADF
Particle BOCPD Fearnhead, P. & Liu, Z. (2007) JRSS-B 基于粒子的 BOCPD,支持非共轭先验

附录 B:v5.0 -> v5.1 改进对照

维度 v5.0 v5.1 改进原因
P(trending|r) 映射 sigmoid(z_r - 2.0) 经验映射 P(|mu| > delta | NIG posterior) 精确贝叶斯 消除最大 ad-hoc 因素,delta 有经济学含义(Murphy 2007)
NIG 先验 alpha0 1.0(自由度 2) 3.0(自由度 6) 减少预热期数值不稳定,保留重尾容忍度
CUSUM z 值 无截断 Huber 鲁棒化 clip=5.0 加密重尾 kurtosis >> 3,减少闪崩/插针误触发(Huber 1964)
BTC 因子时序 当期同步(同期相关) 滞后 lag=3(因果化) BTC->alt 传导延迟 5-30min(Makarov & Schoar 2020)
MTF 阈值修正 阶梯函数 0.80/1.00 连续线性加权 [0.30, 0.70] 消除 ER=0.49/0.51 不连续跳变
OU 半衰期 OLS 无偏差校正 Hurwicz 一阶偏差校正 小样本 theta 向零偏差 15-25%(Hurwicz 1950, Kendall 1954)
Spread 回望窗口 20(100 分钟) 40(200 分钟) PP/KPSS 统计功效提升,OU 标准误降低 30%
新增参数 -- 5 个 drift_threshold, mtf_soft_lower/upper/max_reduction -> 3合1, z_clip, btc_lag, hurwicz_correction
总参数数 33 38 全部有默认值,P1 引入元参数降维
元参数降维 P2 优先级 P1 优先级 38 个参数过拟合风险显著

附录 C:v4.0 -> v5.0 改进对照(继承自 v5.0 文档)

维度 v4.0 v5.0 改进原因
机制检测 DFA-2 点估计 + R-squared 质检 BOCPD 概率化 + DFA-2 后备 概率输出量化不确定性(Adams & MacKay 2007)
机制检测阈值 Hurst > 0.60(二值) P(trending) > 0.75(连续) 连续概率让决策更精细
CUSUM drift 固定 0.5 自适应 prop. RS vol / baseline 跨波动率环境 ARL 稳定性(Hawkins & Olwell 1998)
BTC 因子衰减 线性 1.0 - 0.1*excess 指数 exp(-0.3*excess) 指数衰减更平滑
Spread 单位根检验 ADF(异方差敏感) PP 优先 + ADF 后备 PP 对波动率聚集鲁棒(Phillips & Perron 1988)
Spread 放行条件 平稳即放行 平稳 + 半衰期 < max_hold/2 半衰期直接关联策略持仓期(Vidyamurthy 2004)
时间框架 仅 5min 5min + 1h MTF 确认 多尺度验证减少噪音误触发
层间关系 完全独立 跨层信息共享 via _LayerContext 弱信号梯度传递,提升整体拦截精度

文档结束

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG