开仓动量过滤器设计方案5

开仓动量过滤器设计方案

版本:v5.0
日期:2026-03-06
适用系统:Hyperliquid 量化交易系统(Adaptive Bollinger Z-Score 配对策略)
升级摘要:v4.0 → v5.0 BOCPD 概率化机制检测,OU 半衰期约束,自适应 CUSUM drift,BTC 指数衰减 + 滚动领先滞后,OFI 量价升级路径,Phillips-Perron 检验,多时间框架确认,跨层信息共享


目录

  1. 背景与问题定义
  2. 算法选择与学术依据
  3. 算法具体实现
  4. 融入当前交易体系的方案
  5. 与当前交易风格的契合度分析
  6. 与四项开单约束的对应关系
  7. 参数配置指南
  8. 风险与局限性

1. 背景与问题定义

1.1 交易系统现状

当前系统基于 Adaptive Bollinger Z-Score 配对策略,核心逻辑是协整对的 spread 均值回归:

  • adaptive_z 突破 adaptive_threshold 时产生入场信号
  • adaptive_z 回归至 entry_adaptive_z * reversion_factor 时平仓
  • 使用 5 分钟 K 线,最大持仓时间控制在 72 小时以内(max_hold_hours=72.0
  • 策略引擎 _check_entry() 已有冷却期、突破检测、持仓检查、z4h 绝对值过滤四层前置过滤

1.2 待解决的问题

均值回归策略在以下场景存在明显风险:

场景 问题 后果
资产处于趋势机制(Hurst > 0.6) 均值回归假设本身失效 spread 持续扩张,系统性亏损
单边持续上涨 N 根 K 线后 追涨做多 趋势过度延伸,极易被反转砸穿
单边持续下跌 N 根 K 线后 追跌做空 跌幅充分释放,反弹风险极高
短时间内迅速暴涨 做空对抗强动量 被轧空,止损代价极大
短时间内迅速暴跌 做多接飞刀 继续下杀,止损代价极大

1.3 四项约束目标

约束 1:连续下跌 → 不追跌(不做空)
约束 2:连续上涨 → 不追涨(不做多)
约束 3:迅速暴涨 → 不做空
约束 4:迅速暴跌 → 不做多

1.4 过滤维度

维度 检查对象 原因
Alt 腿(单腿) alt symbol 的价格动量 直接持仓标的的风险
Base 腿(单腿) base symbol 的价格动量 配对交易中 base 腿方向相反,同样承担动量风险
Spread(配对层) 两腿价差的趋势性 单腿动量不等于 spread 动量;两腿同涨但 spread 稳定时不应误杀

1.5 v4.0 算法局限性分析(v5.0 升级动机)

维度 v4.0 做法 局限性 v5.0 解决方案
机制检测 DFA-2 点估计 + R² 质检 60 根 K 线仅 5-7 个尺度点,点估计丢弃不确定性信息 BOCPD 概率化机制检测,输出 P(trending) 而非阈值判断
CUSUM drift 固定 drift=0.5 高波动期过于敏感,低波动期过于迟钝 自适应 drift ∝ RS vol / baseline_vol
BTC 因子 线性衰减 系数 0.1 凭经验,线性衰减无物理依据 指数衰减 exp(-λ·excess),衰减率可调
量价确认 VWPM(K 线级) 只能度量 K 线级别量价方向,无法捕捉盘口微观结构 OFI(L2 book 可用时)+ VWPM 后备
Spread 仲裁 ADF + KPSS ADF 对异方差敏感;无半衰期约束 Phillips-Perron(异方差鲁棒)+ OU 半衰期约束
时间框架 仅 5min 5min 噪音大,短期波动易误触发 多时间框架确认(1h ER 修正 5m 阈值)
层间关系 各层独立判断 Layer 0 的弱趋势信号不被 Layer 1/2 利用 跨层信息共享 via _LayerContext

2. 算法选择与学术依据

2.0 Layer 0:BOCPD 概率化机制检测 + DFA-2 后备

动机:从点估计到概率推断

v4.0 中 Layer 0 使用 DFA-2 估算 Hurst 指数,再阈值判断。核心问题:60 根 5min K 线只有 5-7 个尺度点,Hurst 估计的标准误极大。R² 质检缓解但未根治——它只能"放弃不可靠的估计",不能"让估计变得可靠"。

更根本的问题是:阈值判断丢弃了不确定性信息。Hurst=0.62(刚过 0.60)和 Hurst=0.85(远超 0.60)触发相同的硬拦截,但风险完全不同。

v5.0 将 Layer 0 重新定义为 "P(均值回归机制失效) > threshold",使用 Bayesian Online Changepoint Detection(BOCPD)实现概率化推断。

候选算法深度对比

算法 核心思路 优点 缺点 5min K 线适用性
DFA-2 + R² 多尺度二阶去趋势 RMS 波动 对抛物线趋势鲁棒 点估计,短序列标准误大 一般
HMM (Hamilton 1989) 隐马尔可夫模型,两状态 直接建模机制切换概率 需 EM 训练,批量方法,参数不稳定 需要离线训练
Wavelet Leaders 小波多分辨率 Hurst 短序列更稳定 实现复杂,依赖 PyWavelets 较好
BOCPD (Adams & MacKay 2007) 贝叶斯在线变点检测 + NIG 共轭先验 输出后验概率,在线 O(R) 更新,天然量化不确定性 需调 hazard_rate 最适合

选择:BOCPD + DFA-2 混合机制检测

学术背景:

Adams & MacKay (2007) 提出的 BOCPD 使用贝叶斯框架在线检测时间序列的分布变化。对每个时刻,维护一个"运行长度"(当前机制持续多久)的后验分布。使用 Normal-Inverse-Gamma(NIG)共轭先验,对高斯观测的预测分布为 Student-t,可解析更新,无需 MCMC。

为什么 BOCPD 优于 DFA-2:

  1. 概率输出 vs 点估计:BOCPD 输出 P(trending)∈[0,1],而非 Hurst > 0.6 的二值判断
  2. 天然不确定性量化:数据不足时 P(trending) 趋向先验(≈0.5),不会产生假信号
  3. 在线 O(R) 更新:每根 K 线更新一次,R=200(截断),计算开销 << DFA-2
  4. 无固定窗口:DFA-2 需要固定 60 根窗口;BOCPD 自适应检测机制长度
  5. 机制切换检测:DFA-2 检测"当前是否趋势";BOCPD 额外检测"机制是否刚切换"

BOCPD 核心算法:

输入:对数收益率序列 {r_t},hazard_rate H(变点先验概率,默认 0.01)
先验:Normal-Inverse-Gamma (μ₀=0, κ₀=1, α₀=1, β₀=0.01)

每个时刻 t:
  对每个运行长度 r = 0, 1, ..., R:
    1. 预测概率:π(r_t | r) = Student-t_{2α_r}(r_t | μ_r, β_r(κ_r+1)/(α_r·κ_r))
    2. 增长概率:P(r_t = r+1) = P(r_{t-1} = r) · π(r_t | r) · (1-H)
    3. 变点概率:P(r_t = 0) = Σ_r P(r_{t-1} = r) · π(r_t | r) · H
    4. 更新 NIG 充分统计量:
       κ' = κ + 1,  μ' = (κμ + r_t) / κ'
       α' = α + 0.5,  β' = β + κ(r_t - μ)² / (2κ')
    5. 归一化

趋势概率(贝叶斯边缘化):
  P(trending) = Σ_r P(r_t = r) · P(trending | r)

  其中 P(trending | r) 基于运行长度 r 的后验均值显著性:
    z_r = |μ_r| / sqrt(β_r / (α_r · κ_r))
    P(trending | r) = sigmoid(z_r - 2.0)

DFA-2 作为后备(继承 v4.0):

BOCPD 需要约 20 根 K 线预热。预热期间或 BOCPD 不可用时,降级为 v4.0 的 DFA-2 + R² 质检。

混合决策逻辑:

if BOCPD 就绪(≥ 20 根更新):
    trend_prob = bocpd.trend_probability
    if trend_prob > bocpd_trend_threshold(默认 0.75):
        → 硬拦截:P(trending)={trend_prob:.2f} > {threshold}
    else:
        → 放行,但将 trend_prob 共享给 Layer 1/2(跨层信息)
else:
    → 降级为 DFA-2 + R² 质检(v4.0 逻辑)

跨层信息共享(v5.0 新增):

Layer 0 的 trend_prob 即使未达到硬拦截阈值,也包含有价值的信息。例如 trend_prob=0.55 说明有弱趋势信号。v5.0 将此信息传递给下游层:

Layer 1:如果 trend_prob > 0.5,ER 阈值降低(更容易触发软拦截)
  er_thresh *= 1.0 - cross_layer_factor * max(0, trend_prob - 0.5)
  cross_layer_factor = 0.4 → trend_prob=0.75 时 er_thresh 降低 10%

Layer 2:如果 trend_prob > 0.5,CUSUM 阈值降低(更早检测急动)
  cusum_thresh *= 1.0 - cross_layer_factor * 0.5 * max(0, trend_prob - 0.5)
  影响更温和(× 0.5),因为 Layer 2 已有 BTC 因子调节

2.1 约束 1&2:持续单边行情过滤(Layer 1)

核心算法:Kaufman ER + RS 自适应净位移(继承 v4.0)

ER 是衡量趋势质量的最佳简单指标,O(N) 复杂度,可解释性强。非对称 ER 阈值和 RS 自适应设计继承 v4.0,不做改动。

v5.0 关键改进 1:多时间框架确认(Multi-Timeframe Confirmation)

5min K 线噪音大,短期波动容易误触发 Layer 1。在专业量化系统中,多时间框架确认是基础设施级别的功能。

v5.0 将 5min bars 聚合为 1h bars(每 12 根聚合一次),在 1h 级别计算 ER 作为确认信号:

# 1h ER 确认逻辑
1h_closes = 最近 mtf_lookback 根 1h K 线(默认 6 根 = 6 小时)

1h_er = direction_distance(1h) / path_length(1h)

if 1h_er >= mtf_er_threshold(默认 0.50):
    → 1h 确认趋势 → 5m ER 阈值降低 20%(更容易触发)
    er_thresh *= 0.80

含义:
  5m 和 1h 都显示趋势 → 高置信度,降低拦截门槛
  5m 显示趋势但 1h 不确认 → 可能是短期噪音,维持原阈值

v5.0 关键改进 2:跨层 BOCPD 信号调节

见 2.0 节跨层信息共享。当 BOCPD 检测到弱趋势信号时(0.5 < trend_prob < 0.75),Layer 1 的 ER 阈值进一步降低,使得持续趋势过滤更敏感。

核心公式(完整版,含 v5.0 修正):

N = sustained_lookback(默认 30 根 5min = 150 分钟)

# Kaufman Efficiency Ratio
ER = abs(close[-1] - close[-N]) / Σ|close[i] - close[i-1]|

# 基础非对称 ER 阈值
er_thresh_base = er_threshold_long(0.60)或 er_threshold_short(0.50)

# v5.0 多时间框架修正
if mtf_enabled AND 1h_er >= mtf_er_threshold:
    er_thresh_base *= 0.80

# v5.0 跨层 BOCPD 修正
if cross_layer_enabled AND bocpd_trend_prob > 0.5:
    er_thresh_base *= 1.0 - 0.4 * (bocpd_trend_prob - 0.5)

# RS 自适应净位移阈值(继承 v4.0)
adaptive_thresh = base_threshold * (rs_vol / per_symbol_baseline)
                 (缩放范围 0.3x ~ 3.0x)

# 联合触发(继承 v4.0)
约束1:direction == 'short' AND net_return <= -adaptive_thresh AND ER >= er_thresh
约束2:direction == 'long'  AND net_return >= +adaptive_thresh AND ER >= er_thresh

2.2 约束 3&4:急涨急跌过滤(Layer 2)

RS 波动率(继承 v4.0)

Rogers-Satchell (1991) 漂移不变 OHLC 波动率估计器,完全继承 v4.0,不做改动。

v5.0 关键改进 3:自适应 CUSUM drift

问题: v4.0 的 drift=0.5 是固定值。在高波动期(RS_vol 远高于基准),CUSUM 标准化后的 z 值波动加大,固定 drift 无法补偿,导致误报增加。在低波动期,固定 drift 又过于宽松。

解决方案: drift 随 RS 波动率的相对水平动态缩放。

# 自适应 drift
vol_ratio = rs_vol_current / rs_vol_baseline
adaptive_drift = base_drift * clamp(vol_ratio, drift_scale_min, drift_scale_max)
               = 0.5 * clamp(vol_ratio, 0.5, 2.0)

高波动期:vol_ratio=2.0 → drift=1.0 → CUSUM 更不敏感(合理:高波动期急动更常见)
低波动期:vol_ratio=0.3 → drift=0.25 → CUSUM 更敏感(合理:低波动期急动更异常)

学术依据: Hawkins & Olwell (1998, Cumulative Sum Charts and Charting for Quality Improvement) 指出 CUSUM 的 ARL (Average Run Length) 性能依赖 drift 与实际信号幅度的比率,建议根据过程特性自适应调整。

v5.0 关键改进 4:BTC 因子指数衰减

问题: v4.0 使用线性衰减 stress_factor = max(0.7, 1.0 - 0.1·excess),系数 0.1 无理论依据,且线性函数在边界处不平滑。

解决方案: 指数衰减更符合物理直觉(影响随距离指数递减)。

# v4.0:线性衰减
stress_factor = max(0.7, 1.0 - 0.1 * (btc_stress - threshold))

# v5.0:指数衰减
stress_factor = max(btc_stress_factor_min, exp(-btc_decay_rate * (btc_stress - threshold)))

btc_decay_rate = 0.3(默认):
  excess=1.0 → factor=0.74
  excess=2.0 → factor=0.55(但 min=0.7 → 0.70)
  excess=3.0 → factor=0.41(但 min=0.7 → 0.70)

v5.0 关键改进 5:OFI 量价升级路径(Phase 2)

当前 VWPM(继承 v4.0) 在 K 线级别度量量价方向一致性,是 Kyle (1985) 模型的简化实现。但 K 线级别的颗粒度有限,无法捕捉盘口微观结构。

OFI(Order Flow Imbalance,Cont, Kukanov & Stoikov 2014) 直接从 L2 book 变化中度量买卖压力不平衡:

OFI = Σ (ΔQ_bid · I(ΔP_bid ≥ 0) - ΔQ_ask · I(ΔP_ask ≤ 0))

OFI > 0 → 净买压 → 上涨有微观结构支撑
OFI < 0 → 净卖压 → 下跌有微观结构支撑

系统已订阅 L2Book,具备 OFI 数据基础。但 OFI 集成需要额外数据管道改造(L2 book → momentum_filter),标记为 Phase 2 实施。Phase 1 继续使用 VWPM。

完整 CUSUM 公式(v5.0):

ret(i) = (close(i) - close(i-1)) / close(i-1)
z(i)   = ret(i) / rs_vol

# 自适应 drift(v5.0)
vol_ratio = rs_vol / baseline_vol
adaptive_drift = base_drift * clamp(vol_ratio, 0.5, 2.0)

S_pos(i) = max(0, S_pos(i-1) + z(i) - adaptive_drift)
S_neg(i) = max(0, S_neg(i-1) - z(i) - adaptive_drift)

# BTC 指数衰减因子(v5.0)
if symbol != BTC AND btc_stress > btc_stress_threshold:
    excess = btc_stress - btc_stress_threshold
    stress_factor = max(btc_stress_factor_min, exp(-btc_decay_rate * excess))
    effective_thresh_up   *= stress_factor
    effective_thresh_down *= stress_factor

# 跨层 BOCPD 修正(v5.0)
if bocpd_trend_prob > 0.5:
    cusum_thresh *= 1.0 - 0.2 * max(0, bocpd_trend_prob - 0.5)

# 量价确认(VWPM, Phase 1;OFI, Phase 2)
volume_confirmed = volume_ratio >= 1.5 AND |VWPM|/rs_vol >= vwpm_confirm_ratio

# 非对称触发
约束3:direction=='short' AND S_pos >= effective_thresh_up AND volume_confirmed
约束4:direction=='long'  AND S_neg >= effective_thresh_down AND volume_confirmed

2.3 Spread 层面趋势检测(Layer 3)

v5.0 改进:PP + KPSS + ER + OU 半衰期约束

v5.0 关键改进 6:Phillips-Perron 检验替代/补充 ADF

问题: ADF 检验假设误差项为 i.i.d.,但加密市场存在显著的波动率聚集(GARCH 效应)和序列相关。这违反 ADF 假设,导致尺寸扭曲。

解决方案: Phillips-Perron (1988) 检验使用 Newey-West 核估计对检验统计量做非参数修正,对异方差和序列相关鲁棒。

PP vs ADF:
  ADF:augmented regression 消除序列相关(添加滞后项) → 对异方差不鲁棒
  PP:非参数修正消除序列相关和异方差 → 对两者均鲁棒
  加密市场:波动率聚集显著 → PP 更可靠

v5.0 中 PP 优先于 ADF。当 arch 库可用时使用 PP,不可用时降级为 ADF。

v5.0 关键改进 7:OU 半衰期约束

问题: ADF/KPSS/PP 只回答"序列是否平稳",不回答"均值回归有多快"。一个半衰期 200 小时的序列通过平稳性检验,但在 72 小时最大持仓时间内根本来不及回归。

这是 v4.0 最关键的遗漏——对均值回归策略而言,半衰期比平稳性更直接相关。

解决方案: 在平稳性检验通过后,估计 Ornstein-Uhlenbeck 过程的半衰期:

# OU 过程:dX = θ(μ - X)dt + σdW
# 离散化:ΔX_t = θ·X_{t-1} + c + ε_t
# OLS 回归 ΔX on X_{t-1} → 斜率 = θ
# 半衰期 = -ln(2) / ln(1 + θ)

θ < 0  → 均值回归(θ 越负,回归越快)
θ ≥ 0  → 无均值回归(趋势或随机游走)
半衰期 > max_hold_hours / 2 → 回归太慢,在持仓期内大概率来不及

约束:half_life_hours < spread_half_life_max(默认 36h = max_hold_hours/2)

学术依据: Vidyamurthy (2004, Pairs Trading) 指出半衰期是配对交易中最关键的参数之一,决定了策略的预期持仓时间和胜率。

完整 Layer 3 决策逻辑(v5.0):

# 第一关:ER 快速筛选(O(N),继承 v4.0)
if spread_er < spread_er_threshold AND NOT direction_ok:
    → 无趋势,放行

# 第二关:PP/ADF + KPSS 平稳性检验
unit_root_p = PP_pvalue(优先)或 ADF_pvalue(后备)
kpss_p = KPSS_pvalue

四象限决策矩阵(继承 v4.0,PP 替代 ADF):
  ur_stationary(p<0.05)+ kpss_stationary(p>0.05)→ 确认平稳 → 第三关
  ur_non_stationary + kpss_non_stationary → 确认非平稳 → 维持拦截
  矛盾 / 不确定 → 保守维持拦截

# 第三关:OU 半衰期约束(v5.0 新增,仅在确认平稳时执行)
half_life = OLS 估计 OU 半衰期(小时)
if half_life is None OR half_life > spread_half_life_max:
    → 回归太慢或无回归 → 维持拦截
else:
    → 平稳 + 半衰期合理 → 推翻拦截,放行

2.4 跨层信息共享架构(v5.0 新增)

v4.0 的四层完全独立判断,信息不跨层传递。v5.0 引入 _LayerContext 数据结构,在 check() 调用内部共享信息:

@dataclass
_LayerContext:
    bocpd_trend_prob: float | None   # Layer 0 → Layer 1, 2
    hurst: float | None              # Layer 0 → 诊断
    hurst_r2: float | None           # Layer 0 → 诊断

信息流向:
  Layer 0 执行 → 填充 ctx.bocpd_trend_prob
    ↓
  Layer 2 使用 ctx.bocpd_trend_prob 调节 CUSUM 阈值
    ↓
  Layer 1 使用 ctx.bocpd_trend_prob 调节 ER 阈值
    ↓
  Layer 3 使用 z4h_history + 半衰期(独立于 ctx)

设计原则:
  - 跨层影响是温和的修正因子(最多 10-15% 阈值调整),不是决定性开关
  - 各层仍可独立关闭/测试
  - ctx 仅在单次 check() 调用内有效,不跨调用持久化

2.5 算法组合架构(五层 + 跨层共享)

输入信号(direction, alt_symbol, base_symbol)
    │
    ├──▶ Layer 0: BOCPD 概率化机制检测 + DFA-2 后备           [v5.0 重写]
    │      ├── BOCPD: P(trending) per symbol
    │      ├── DFA-2 后备: Hurst + R² 质检(BOCPD 预热期)
    │      ├── 共享: ctx.bocpd_trend_prob → Layer 1/2 阈值调节
    │      └── P(trending) > threshold → 硬拦截
    │
    ├──▶ Layer 2: 自适应 CUSUM + RS + VWPM + BTC 指数因子     [v5.0 升级]
    │      ├── 自适应 drift = base_drift × clamp(vol_ratio, 0.5, 2.0)
    │      ├── BTC 指数衰减: exp(-λ·excess)
    │      ├── 跨层: ctx.bocpd_trend_prob → 进一步调低阈值
    │      ├── VWPM + 量比联合确认(Phase 2: OFI 替代)
    │      └── 任一腿触发 → 硬拦截
    │
    ├──▶ Layer 1: ER + RS 净位移 + MTF 确认                   [v5.0 升级]
    │      ├── 1h ER 确认 → 5m ER 阈值 × 0.80
    │      ├── 跨层: ctx.bocpd_trend_prob → ER 阈值调低
    │      └── 任一腿触发 → 软拦截 → Layer 3 仲裁
    │
    └──▶ Layer 3: PP + KPSS + ER + OU 半衰期约束              [v5.0 升级]
           ├── ER 快速筛(O(N))
           ├── PP(优先)/ ADF + KPSS 平稳性检验
           ├── OU 半衰期 < max_hold_hours / 2 约束
           └── 平稳 + 半衰期合理 → 推翻拦截;否则维持
    │
    ▼
  最终决策:允许 / 拒绝入场

设计理念(v5.0):

层级 类型 可仲裁 触发含义 v5.0 变化
Layer 0 硬拦截 均值回归假设根本失效 BOCPD 概率化 + DFA-2 后备 + 跨层共享
Layer 2 硬拦截 急动执行风险极高 自适应 drift + BTC 指数衰减 + 跨层修正
Layer 1 软拦截 单腿持续趋势,可能误杀 MTF 确认 + 跨层 BOCPD 修正
Layer 3 仲裁器 Spread 平稳+半衰期合理则放行 PP 替代 ADF + OU 半衰期约束

3. 算法具体实现

3.1 模块位置

src/trading/
  momentum_filter.py    ← 重写(BOCPD + 自适应 CUSUM + MTF + 半衰期 + 跨层共享)
  strategy.py           ← 修改 _check_entry(),注入并调用 filter;SymbolBaseline 新增 z4h_history
  config.py             ← 修改 StrategyParams,新增过滤器参数(含 v5.0 新增参数)
  orchestrator.py       ← 修改 process_analysis(),透传 OHLCV + volume
src/services/
  realtime_kline_service_base.py  ← 修改 _trigger_strategy_if_ready(),提取双腿 OHLCV + volume

3.2 MomentumFilter 完整代码(v5.0)

# src/trading/momentum_filter.py
"""
开仓动量过滤器 v5.0

四层过滤架构 + 跨层信息共享:
  Layer 0: BOCPD 概率化机制检测 + DFA-2 后备   → 硬拦截     [v5.0 重写]
  Layer 1: ER + RS 净位移 + MTF 确认            → 软拦截     [v5.0 升级]
  Layer 2: 自适应 CUSUM + RS + VWPM + BTC 指数  → 硬拦截     [v5.0 升级]
  Layer 3: PP + KPSS + ER + OU 半衰期           → 仲裁       [v5.0 升级]

v5.0 改进清单:
  1. [Layer 0] BOCPD 概率化机制检测(Adams & MacKay 2007)
  2. [Layer 0] DFA-2 保留为后备
  3. [Layer 0→1,2] 跨层共享 bocpd_trend_prob
  4. [Layer 1] 多时间框架确认(1h ER 修正 5m 阈值)
  5. [Layer 2] 自适应 CUSUM drift ∝ RS vol / baseline
  6. [Layer 2] BTC 因子指数衰减
  7. [Layer 3] Phillips-Perron 检验(异方差鲁棒)
  8. [Layer 3] OU 半衰期约束
"""

import math
import warnings
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime

try:
    from statsmodels.tsa.stattools import adfuller as _adfuller
    _HAS_ADF = True
except ImportError:
    _HAS_ADF = False

try:
    from statsmodels.tsa.stattools import kpss as _kpss
    _HAS_KPSS = True
except ImportError:
    _HAS_KPSS = False

try:
    from arch.unitroot import PhillipsPerron as _PhillipsPerron
    _HAS_PP = True
except ImportError:
    _HAS_PP = False


# ──────────────────────────────────────────────────────────────
# 辅助函数
# ──────────────────────────────────────────────────────────────

def _logsumexp(vals: list[float]) -> float:
    """Numerically stable log-sum-exp."""
    if not vals:
        return float('-inf')
    m = max(vals)
    if m == float('-inf'):
        return float('-inf')
    return m + math.log(sum(math.exp(v - m) for v in vals))


def _student_t_logpdf(x: float, mu: float, scale_sq: float, df: float) -> float:
    """Student-t log PDF: log St(x | mu, scale^2, df)."""
    if scale_sq <= 1e-30 or df <= 0:
        return -50.0
    z_sq = (x - mu) ** 2 / (df * scale_sq)
    return (math.lgamma((df + 1) / 2)
            - math.lgamma(df / 2)
            - 0.5 * math.log(df * math.pi * scale_sq)
            - (df + 1) / 2 * math.log(1 + z_sq))


def _estimate_half_life(series: list[float], bar_minutes: float = 5.0) -> float | None:
    """
    OU 过程半衰期估计(OLS 回归)。

    模型: DeltaX_t = theta * X_{t-1} + c + eps
    半衰期 = -ln(2) / ln(1 + theta)

    Returns: 半衰期(小时),或 None(无均值回归信号)。
    """
    n = len(series)
    if n < 10:
        return None

    y = [series[i] - series[i - 1] for i in range(1, n)]
    x = series[:-1]
    n_obs = len(y)

    x_mean = sum(x) / n_obs
    y_mean = sum(y) / n_obs

    cov_xy = sum((x[i] - x_mean) * (y[i] - y_mean) for i in range(n_obs))
    var_x = sum((x[i] - x_mean) ** 2 for i in range(n_obs))

    if var_x < 1e-12:
        return None

    theta = cov_xy / var_x
    if theta >= 0 or theta <= -1:
        return None

    half_life_bars = -math.log(2) / math.log(1 + theta)
    half_life_hours = half_life_bars * bar_minutes / 60.0
    return half_life_hours if half_life_hours > 0 else None


# ──────────────────────────────────────────────────────────────
# BOCPD: Bayesian Online Changepoint Detection
# ──────────────────────────────────────────────────────────────

class _BOCPD:
    """
    Bayesian Online Changepoint Detection (Adams & MacKay 2007).

    Normal-Inverse-Gamma 共轭先验,Student-t 预测分布。
    Log-space 实现保证数值稳定性。O(R) per update, R = max_run。
    """

    __slots__ = ('_log_h', '_log_1mh', '_mu0', '_kappa0', '_alpha0', '_beta0',
                 '_max_run', '_log_probs', '_suff', '_n_updates')

    def __init__(self, hazard_rate: float = 0.01, mu0: float = 0.0,
                 kappa0: float = 1.0, alpha0: float = 1.0,
                 beta0: float = 0.01, max_run: int = 200):
        self._log_h = math.log(max(1e-10, hazard_rate))
        self._log_1mh = math.log(max(1e-10, 1.0 - hazard_rate))
        self._mu0, self._kappa0 = mu0, kappa0
        self._alpha0, self._beta0 = alpha0, beta0
        self._max_run = max_run
        self._log_probs: list[float] = [0.0]
        self._suff: list[tuple[float, float, float, float]] = [
            (mu0, kappa0, alpha0, beta0)
        ]
        self._n_updates = 0

    def update(self, x: float) -> None:
        """处理新观测值(对数收益率)。"""
        n = len(self._log_probs)

        # 1. 每个运行长度的预测 log-PDF
        log_preds = []
        for i in range(n):
            mu, kappa, alpha, beta = self._suff[i]
            if kappa > 0 and alpha > 0 and beta > 0:
                scale_sq = beta * (kappa + 1) / (alpha * kappa)
                log_preds.append(_student_t_logpdf(x, mu, scale_sq, 2 * alpha))
            else:
                log_preds.append(-50.0)

        # 2. 增长概率
        log_growth = [
            self._log_probs[i] + log_preds[i] + self._log_1mh
            for i in range(n)
        ]

        # 3. 变点概率
        log_cp = _logsumexp([
            self._log_probs[i] + log_preds[i] + self._log_h
            for i in range(n)
        ])

        # 4. 新分布 + 归一化
        new_log = [log_cp] + log_growth
        log_total = _logsumexp(new_log)
        new_log = [lp - log_total for lp in new_log]

        # 5. 更新 NIG 充分统计量
        new_suff: list[tuple[float, float, float, float]] = [
            (self._mu0, self._kappa0, self._alpha0, self._beta0)
        ]
        for i in range(n):
            mu, kappa, alpha, beta = self._suff[i]
            kappa_n = kappa + 1
            mu_n = (kappa * mu + x) / kappa_n
            alpha_n = alpha + 0.5
            beta_n = beta + kappa * (x - mu) ** 2 / (2 * kappa_n)
            new_suff.append((mu_n, kappa_n, alpha_n, beta_n))

        # 6. 截断
        if len(new_log) > self._max_run:
            new_log = new_log[:self._max_run]
            new_suff = new_suff[:self._max_run]
            log_total = _logsumexp(new_log)
            new_log = [lp - log_total for lp in new_log]

        self._log_probs = new_log
        self._suff = new_suff
        self._n_updates += 1

    @property
    def trend_probability(self) -> float:
        """
        P(trending) — 贝叶斯后验边缘化。

        对每个运行长度 r,检查后验均值是否显著偏离零。
        使用 sigmoid 映射为概率,然后按运行长度概率加权求和。
        """
        total = 0.0
        for i in range(len(self._log_probs)):
            prob = math.exp(self._log_probs[i])
            if prob < 1e-8:
                continue
            mu, kappa, alpha, beta = self._suff[i]
            if alpha > 0.5 and beta > 1e-30 and kappa > 0:
                var_mean = beta / (alpha * kappa)
                std_mean = math.sqrt(var_mean) if var_mean > 0 else 1e-10
                z = abs(mu) / std_mean if std_mean > 1e-10 else 0
                p_trend = 1.0 / (1.0 + math.exp(-(z - 2.0)))
            else:
                p_trend = 0.5
            total += prob * p_trend
        return total

    @property
    def ready(self) -> bool:
        return self._n_updates >= 20


# ──────────────────────────────────────────────────────────────
# DFA-2 Hurst 后备(继承 v4.0)
# ──────────────────────────────────────────────────────────────

def _hurst_dfa2(closes: list[float], r2_threshold: float = 0.85) -> tuple[float | None, float | None]:
    """
    DFA-2(二阶去趋势波动分析)估算 Hurst 指数(v4.0 继承)。
    返回 (hurst, r_squared),不可靠时返回 (None, None)。
    """
    n = len(closes)
    if n < 20:
        return None, None

    rets = []
    for i in range(1, n):
        if closes[i] > 0 and closes[i - 1] > 0:
            rets.append(math.log(closes[i] / closes[i - 1]))
    nr = len(rets)
    if nr < 16:
        return None, None

    mean_r = sum(rets) / nr
    profile = []
    cum = 0.0
    for r in rets:
        cum += r - mean_r
        profile.append(cum)

    scales: list[float] = []
    rms_vals: list[float] = []
    s = 4
    max_s = nr // 4
    while s <= max_s:
        n_win = nr // s
        if n_win < 2:
            break
        rss_sum = 0.0
        rss_cnt = 0
        for i in range(n_win):
            seg = profile[i * s: (i + 1) * s]
            ws = len(seg)
            if ws < 3:
                if ws >= 2:
                    x_m = (ws - 1) * 0.5
                    y_m = sum(seg) / ws
                    cov = sum((j - x_m) * (seg[j] - y_m) for j in range(ws))
                    var_x = sum((j - x_m) ** 2 for j in range(ws))
                    slope = cov / var_x if var_x > 1e-12 else 0.0
                    intercept = y_m - slope * x_m
                    rss = sum((seg[j] - (intercept + slope * j)) ** 2 for j in range(ws)) / ws
                    rss_sum += rss
                    rss_cnt += 1
                continue
            s0 = float(ws)
            s1 = ws * (ws - 1) / 2.0
            s2 = ws * (ws - 1) * (2 * ws - 1) / 6.0
            s3 = (ws * (ws - 1) / 2.0) ** 2
            s4 = ws * (ws - 1) * (2 * ws - 1) * (3 * ws * ws - 3 * ws - 1) / 30.0
            sy0 = sum(seg)
            sy1 = sum(j * seg[j] for j in range(ws))
            sy2 = sum(j * j * seg[j] for j in range(ws))
            det = (s0 * (s2 * s4 - s3 * s3)
                   - s1 * (s1 * s4 - s3 * s2)
                   + s2 * (s1 * s3 - s2 * s2))
            if abs(det) < 1e-12:
                x_m = (ws - 1) * 0.5
                y_m = sum(seg) / ws
                cov = sum((j - x_m) * (seg[j] - y_m) for j in range(ws))
                var_x = sum((j - x_m) ** 2 for j in range(ws))
                slope = cov / var_x if var_x > 1e-12 else 0.0
                intercept = y_m - slope * x_m
                rss = sum((seg[j] - (intercept + slope * j)) ** 2 for j in range(ws)) / ws
                rss_sum += rss
                rss_cnt += 1
                continue
            a = ((sy0 * (s2 * s4 - s3 * s3)
                  - s1 * (sy1 * s4 - s3 * sy2)
                  + s2 * (sy1 * s3 - s2 * sy2)) / det)
            b = ((s0 * (sy1 * s4 - s3 * sy2)
                  - sy0 * (s1 * s4 - s3 * s2)
                  + s2 * (s1 * sy2 - sy1 * s2)) / det)
            c = ((s0 * (s2 * sy2 - sy1 * s3)
                  - s1 * (s1 * sy2 - sy1 * s2)
                  + sy0 * (s1 * s3 - s2 * s2)) / det)
            rss = sum((seg[j] - (a + b * j + c * j * j)) ** 2 for j in range(ws)) / ws
            rss_sum += rss
            rss_cnt += 1
        if rss_cnt >= 2:
            f_s = math.sqrt(rss_sum / rss_cnt)
            if f_s > 1e-14:
                scales.append(math.log(s))
                rms_vals.append(math.log(f_s))
        s = max(s + 1, int(s * 1.4 + 0.5))

    if len(scales) < 3:
        return None, None

    n_pts = len(scales)
    x_m = sum(scales) / n_pts
    y_m = sum(rms_vals) / n_pts
    num = sum((scales[i] - x_m) * (rms_vals[i] - y_m) for i in range(n_pts))
    den = sum((scales[i] - x_m) ** 2 for i in range(n_pts))
    if den < 1e-12:
        return None, None

    hurst = max(0.0, min(1.0, num / den))
    slope = num / den
    ss_res = sum((rms_vals[i] - (y_m + slope * (scales[i] - x_m))) ** 2 for i in range(n_pts))
    ss_tot = sum((rms_vals[i] - y_m) ** 2 for i in range(n_pts))
    r_squared = 1.0 - ss_res / ss_tot if ss_tot > 1e-12 else 0.0

    if r_squared < r2_threshold:
        return None, None

    return hurst, r_squared


# ──────────────────────────────────────────────────────────────
# 辅助类
# ──────────────────────────────────────────────────────────────

class _RollingMedian:
    """固定窗口滚动中位数(排序法,O(N log N) per query)。"""
    __slots__ = ('_data',)

    def __init__(self, maxlen: int):
        self._data: deque = deque(maxlen=maxlen)

    def push(self, val: float) -> None:
        self._data.append(val)

    @property
    def value(self) -> float | None:
        if not self._data:
            return None
        s = sorted(self._data)
        n = len(s)
        mid = n >> 1
        return s[mid] if n & 1 else (s[mid - 1] + s[mid]) * 0.5

    def __len__(self) -> int:
        return len(self._data)


class _HourBarAccum:
    """5m → 1h 聚合器。每 12 根 5m bar 聚合为 1 根 1h bar。"""
    __slots__ = ('count', 'open_', 'high', 'low', 'close', 'volume')

    def __init__(self):
        self.reset()

    def reset(self):
        self.count = 0
        self.open_ = 0.0
        self.high = 0.0
        self.low = float('inf')
        self.close = 0.0
        self.volume = 0.0

    def add(self, o: float, h: float, l: float, c: float, v: float) -> bool:
        """添加 5m bar,返回 True 当 1h bar 已完成。"""
        if self.count == 0:
            self.open_ = o
            self.high = h
            self.low = l
        else:
            self.high = max(self.high, h)
            self.low = min(self.low, l)
        self.close = c
        self.volume += v
        self.count += 1
        return self.count >= 12

    def as_tuple(self) -> tuple:
        return (self.close, self.high, self.low, self.open_, self.volume)


@dataclass
class _LayerContext:
    """跨层共享上下文,单次 check() 调用内有效。"""
    bocpd_trend_prob: float | None = None
    hurst: float | None = None
    hurst_r2: float | None = None


# ──────────────────────────────────────────────────────────────
# MomentumFilter 主类
# ──────────────────────────────────────────────────────────────

class MomentumFilter:
    """
    开仓动量过滤器 v5.0

    公开接口:
      update(symbol, close, high, low, open_, volume, kline_time)
      check(symbol, direction) -> (allowed, reason, is_soft)
      check_spread(z4h_history, direction, max_hold_hours) -> (has_trend, reason)
      ready(symbol) -> bool
    """

    def __init__(
        self,
        # ── 总开关 ──
        enabled: bool = True,
        # ── Layer 0: BOCPD + DFA-2 后备 ──
        bocpd_enabled: bool = True,
        bocpd_hazard_rate: float = 0.01,
        bocpd_trend_threshold: float = 0.75,
        hurst_enabled: bool = True,
        hurst_lookback: int = 60,
        hurst_threshold: float = 0.60,
        hurst_r2_threshold: float = 0.85,
        # ── Layer 1: 持续趋势 + MTF ──
        sustained_lookback: int = 30,
        sustained_base_threshold: float = 0.008,
        er_threshold_long: float = 0.60,
        er_threshold_short: float = 0.50,
        mtf_enabled: bool = True,
        mtf_er_threshold: float = 0.50,
        mtf_lookback: int = 6,
        # ── Layer 2: 急动检测 ──
        rs_period: int = 10,
        cusum_drift: float = 0.5,
        cusum_drift_adaptive: bool = True,
        cusum_threshold_spike_up: float = 3.5,
        cusum_threshold_spike_down: float = 2.5,
        volume_confirm_ratio: float = 1.5,
        volume_ema_period: int = 20,
        vwpm_window: int = 5,
        vwpm_confirm_ratio: float = 0.8,
        # ── Layer 2: BTC 因子 ──
        market_ref_symbol: str = "BTC",
        btc_stress_threshold: float = 2.0,
        btc_stress_factor_min: float = 0.7,
        btc_decay_rate: float = 0.3,
        # ── Layer 3: Spread 仲裁 ──
        spread_lookback: int = 20,
        spread_er_threshold: float = 0.45,
        spread_net_threshold: float = 1.5,
        spread_adf_pvalue: float = 0.05,
        spread_kpss_pvalue: float = 0.05,
        spread_use_pp: bool = True,
        spread_half_life_max: float = 36.0,
        # ── 跨层 ──
        cross_layer_enabled: bool = True,
        cross_layer_factor: float = 0.4,
    ):
        self._enabled = enabled

        # Layer 0
        self._bocpd_enabled = bocpd_enabled
        self._bocpd_hazard = bocpd_hazard_rate
        self._bocpd_trend_thresh = bocpd_trend_threshold
        self._hurst_enabled = hurst_enabled
        self._hurst_lookback = max(20, hurst_lookback)
        self._hurst_thresh = hurst_threshold
        self._hurst_r2_thresh = hurst_r2_threshold

        # Layer 1
        self._sustained_n = max(5, sustained_lookback)
        self._sustained_base_thresh = sustained_base_threshold
        self._er_thresh_long = er_threshold_long
        self._er_thresh_short = er_threshold_short
        self._mtf_enabled = mtf_enabled
        self._mtf_er_thresh = mtf_er_threshold
        self._mtf_lookback = mtf_lookback

        # Layer 2
        self._rs_period = rs_period
        self._cusum_drift = cusum_drift
        self._cusum_drift_adaptive = cusum_drift_adaptive
        self._cusum_thresh_up = cusum_threshold_spike_up
        self._cusum_thresh_down = cusum_threshold_spike_down
        self._vol_confirm_ratio = volume_confirm_ratio
        self._vol_ema_period = volume_ema_period
        self._vwpm_window = vwpm_window
        self._vwpm_confirm_ratio = vwpm_confirm_ratio

        # Layer 2: BTC
        self._market_ref = market_ref_symbol
        self._btc_stress_thresh = btc_stress_threshold
        self._btc_stress_factor_min = btc_stress_factor_min
        self._btc_decay_rate = btc_decay_rate

        # Layer 3
        self._spread_lookback = spread_lookback
        self._spread_er_thresh = spread_er_threshold
        self._spread_net_thresh = spread_net_threshold
        self._spread_adf_pvalue = spread_adf_pvalue
        self._spread_kpss_pvalue = spread_kpss_pvalue
        self._spread_use_pp = spread_use_pp
        self._spread_hl_max = spread_half_life_max

        # 跨层
        self._cross_layer = cross_layer_enabled
        self._cross_factor = cross_layer_factor

        # 数据缓冲区
        buf_size = max(hurst_lookback + 5, sustained_lookback + 5,
                       rs_period + 10, volume_ema_period + 5,
                       vwpm_window + 2) + 10
        self._buf_size = buf_size

        # 状态
        self._buffers: dict[str, deque] = {}
        self._last_kline_time: dict[str, datetime] = {}
        self._cusum_state: dict[str, tuple[float, float]] = {}
        self._rs_ema: dict[str, float] = {}
        self._vol_ema: dict[str, float] = {}
        self._baseline_median: dict[str, _RollingMedian] = {}
        self._bocpd_state: dict[str, _BOCPD] = {}
        self._buffers_1h: dict[str, deque] = {}
        self._hour_accum: dict[str, _HourBarAccum] = {}

    # ─────────────── 公开接口 ───────────────

    def update(self, symbol: str, close: float, high: float = 0.0,
               low: float = 0.0, open_: float = 0.0, volume: float = 0.0,
               kline_time: datetime | None = None) -> None:
        """每根新 K 线收盘时调用。"""
        if kline_time is not None:
            if self._last_kline_time.get(symbol) == kline_time:
                return
            self._last_kline_time[symbol] = kline_time

        if symbol not in self._buffers:
            self._buffers[symbol] = deque(maxlen=self._buf_size)
        self._buffers[symbol].append((close, high, low, open_, volume))

        buf = self._buffers[symbol]
        if len(buf) >= 2:
            self._online_update(symbol, buf)

    def check(self, symbol: str, direction: str) -> tuple[bool, str, bool]:
        """
        Layer 0 + Layer 2 + Layer 1 检查(单腿维度)。
        执行顺序:Layer 0(硬)→ Layer 2(硬)→ Layer 1(软)
        """
        if not self._enabled:
            return True, "", False

        buf = self._buffers.get(symbol)
        if buf is None or len(buf) < self._rs_period + 2:
            return True, "", False

        ctx = _LayerContext()

        # ── Layer 0: BOCPD + DFA-2 后备 ──
        ok, reason = self._check_regime(symbol, ctx)
        if not ok:
            return False, reason, False

        # ── Layer 2: 急动检测 ──
        ok, reason = self._check_spike(symbol, direction, ctx)
        if not ok:
            return False, reason, False

        # ── Layer 1: 持续趋势 + MTF ──
        if len(buf) >= self._sustained_n + 2:
            ok, reason = self._check_sustained(symbol, direction, ctx)
            if not ok:
                return False, reason, True

        return True, "", False

    def check_spread(self, z4h_history: list[float], direction: str,
                     max_hold_hours: float = 72.0) -> tuple[bool, str]:
        """
        Layer 3: PP + KPSS + ER + OU 半衰期约束。
        仅在存在软拦截(Layer 1)时调用。
        """
        n = self._spread_lookback
        if len(z4h_history) < n + 1:
            return False, ""

        series = z4h_history[-(n + 1):]

        # ── 第一关:ER 快速筛选 ──
        spread_dir = abs(series[-1] - series[0])
        spread_path = sum(abs(series[i] - series[i - 1]) for i in range(1, len(series)))
        er = spread_dir / spread_path if spread_path > 1e-10 else 0.0
        spread_net = series[-1] - series[0]

        if direction == 'short':
            dir_ok = spread_net <= -self._spread_net_thresh
        else:
            dir_ok = spread_net >= self._spread_net_thresh

        if not (er >= self._spread_er_thresh and dir_ok):
            return False, ""

        # ── 第二关:PP/ADF + KPSS 平稳性检验 ──
        ur_p = None
        kpss_p = None
        ur_test_name = "ER-only"

        if len(series) >= 15:
            # PP 优先(对异方差鲁棒)
            if self._spread_use_pp and _HAS_PP:
                try:
                    import numpy as _np
                    pp_result = _PhillipsPerron(_np.array(series, dtype=float))
                    ur_p = float(pp_result.pvalue)
                    ur_test_name = "PP"
                except Exception:
                    pass

            # ADF 后备
            if ur_p is None and _HAS_ADF:
                try:
                    ur_p = _adfuller(series, maxlag=2, regression='c', autolag=None)[1]
                    ur_test_name = "ADF"
                except Exception:
                    pass

            # KPSS
            if _HAS_KPSS:
                try:
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore")
                        kpss_p = _kpss(series, regression='c', nlags='auto')[1]
                except Exception:
                    pass

        # ── 四象限决策 + 半衰期约束 ──
        if ur_p is not None and kpss_p is not None:
            ur_stationary = ur_p < self._spread_adf_pvalue
            kpss_stationary = kpss_p > self._spread_kpss_pvalue

            if ur_stationary and kpss_stationary:
                # 确认平稳 → 检查半衰期
                hl = _estimate_half_life(series)
                hl_max = min(self._spread_hl_max, max_hold_hours * 0.5)
                if hl is not None and hl > hl_max:
                    return True, (
                        f"平稳但半衰期过长: {ur_test_name}_p={ur_p:.3f}"
                        f" KPSS_p={kpss_p:.3f} HL={hl:.1f}h>{hl_max:.0f}h→维持拦截"
                    )
                hl_str = f" HL={hl:.1f}h" if hl else " HL=N/A"
                return False, (
                    f"{ur_test_name}+KPSS确认平稳({ur_test_name}_p={ur_p:.3f}"
                    f" KPSS_p={kpss_p:.3f}{hl_str})→推翻拦截"
                )

            if not ur_stationary and not kpss_stationary:
                return True, (
                    f"Spread趋势(三重确认): ER={er:.2f}"
                    f" {ur_test_name}_p={ur_p:.3f} KPSS_p={kpss_p:.3f}"
                )

            return True, (
                f"Spread趋势(矛盾/不确定): ER={er:.2f}"
                f" {ur_test_name}_p={ur_p:.3f} KPSS_p={kpss_p:.3f}→保守拦截"
            )

        # 只有单项检验
        if ur_p is not None:
            if ur_p < self._spread_adf_pvalue:
                hl = _estimate_half_life(series)
                hl_max = min(self._spread_hl_max, max_hold_hours * 0.5)
                if hl is not None and hl > hl_max:
                    return True, f"平稳但HL过长: {ur_test_name}_p={ur_p:.3f} HL={hl:.1f}h"
                return False, f"{ur_test_name}判定平稳(p={ur_p:.3f})→推翻拦截(无KPSS)"
            return True, f"Spread趋势({ur_test_name}+ER): ER={er:.2f} p={ur_p:.3f}"

        # 降级:纯 ER
        return True, f"Spread趋势(ER): ER={er:.2f} 净位移={spread_net:+.3f}"

    def ready(self, symbol: str) -> bool:
        buf = self._buffers.get(symbol)
        return buf is not None and len(buf) >= self._rs_period + 2

    # ─────────────── Layer 0: BOCPD + DFA-2 后备 ───────────────

    def _check_regime(self, symbol: str, ctx: _LayerContext) -> tuple[bool, str]:
        """Layer 0: 概率化机制检测。"""
        # BOCPD 路径
        if self._bocpd_enabled:
            bocpd = self._bocpd_state.get(symbol)
            if bocpd and bocpd.ready:
                trend_prob = bocpd.trend_probability
                ctx.bocpd_trend_prob = trend_prob
                if trend_prob > self._bocpd_trend_thresh:
                    return False, (
                        f"Layer0-BOCPD趋势机制: P(trending)={trend_prob:.3f}"
                        f">{self._bocpd_trend_thresh}"
                    )
                return True, ""

        # DFA-2 后备路径
        if not self._hurst_enabled:
            return True, ""

        buf = self._buffers.get(symbol)
        if buf is None or len(buf) < self._hurst_lookback + 1:
            return True, ""

        closes = [d[0] for d in list(buf)[-(self._hurst_lookback + 1):]]
        h, r2 = _hurst_dfa2(closes, r2_threshold=self._hurst_r2_thresh)

        if h is not None:
            ctx.hurst = h
            ctx.hurst_r2 = r2
        if h is None:
            return True, ""

        if h > self._hurst_thresh:
            return False, (
                f"Layer0-DFA2趋势机制: Hurst={h:.3f}>{self._hurst_thresh}"
                f" R²={r2:.3f}(BOCPD未就绪,DFA-2后备)"
            )
        return True, ""

    # ─────────────── Layer 1: 持续趋势 + MTF ───────────────

    def _check_sustained(self, symbol: str, direction: str,
                         ctx: _LayerContext) -> tuple[bool, str]:
        """Layer 1: ER + RS 净位移 + MTF 确认 + 跨层 BOCPD 修正。"""
        buf = self._buffers[symbol]
        n = self._sustained_n
        data = list(buf)[-(n + 1):]
        closes = [d[0] for d in data]

        if len(closes) < n + 1:
            return True, ""

        ref = closes[0]
        if ref <= 0:
            return True, ""

        net_return = (closes[-1] - ref) / ref

        # Kaufman ER
        direction_dist = abs(closes[-1] - ref)
        path_length = sum(abs(closes[i] - closes[i - 1]) for i in range(1, len(closes)))
        er = direction_dist / path_length if path_length > 1e-10 else 0.0

        # 基础非对称 ER 阈值
        er_thresh = self._er_thresh_long if direction == 'long' else self._er_thresh_short

        # v5.0: MTF 修正
        if self._mtf_enabled:
            mtf_er = self._calc_mtf_er(symbol)
            if mtf_er is not None and mtf_er >= self._mtf_er_thresh:
                er_thresh *= 0.80

        # v5.0: 跨层 BOCPD 修正
        if self._cross_layer and ctx.bocpd_trend_prob is not None:
            if ctx.bocpd_trend_prob > 0.5:
                adjustment = self._cross_factor * (ctx.bocpd_trend_prob - 0.5)
                er_thresh *= (1.0 - adjustment)

        if er < er_thresh:
            return True, ""

        # RS 自适应阈值
        rs_var = self._rs_ema.get(symbol, 0.0)
        rs_vol = math.sqrt(max(0.0, rs_var))
        baseline = self._baseline_median.get(symbol)
        baseline_val = baseline.value if (baseline and len(baseline) >= 5) else None

        if rs_vol > 0 and baseline_val and baseline_val > 0:
            adaptive_thresh = self._sustained_base_thresh * (rs_vol / baseline_val)
            adaptive_thresh = max(self._sustained_base_thresh * 0.3,
                                  min(adaptive_thresh, self._sustained_base_thresh * 3.0))
        else:
            adaptive_thresh = self._sustained_base_thresh

        mtf_str = ""
        if self._mtf_enabled:
            mtf_er = self._calc_mtf_er(symbol)
            if mtf_er is not None and mtf_er >= self._mtf_er_thresh:
                mtf_str = f" [MTF-1h-ER={mtf_er:.2f}确认]"

        if direction == 'short' and net_return <= -adaptive_thresh:
            return False, (
                f"Layer1-不追跌: 过去{n}根净跌幅={net_return:.2%}"
                f" 阈值=-{adaptive_thresh:.2%} ER={er:.2f}>={er_thresh:.2f}{mtf_str}"
            )
        if direction == 'long' and net_return >= adaptive_thresh:
            return False, (
                f"Layer1-不追涨: 过去{n}根净涨幅={net_return:.2%}"
                f" 阈值=+{adaptive_thresh:.2%} ER={er:.2f}>={er_thresh:.2f}{mtf_str}"
            )
        return True, ""

    def _calc_mtf_er(self, symbol: str) -> float | None:
        """计算 1h 级别 ER。"""
        buf_1h = self._buffers_1h.get(symbol)
        if buf_1h is None or len(buf_1h) < self._mtf_lookback + 1:
            return None
        data = list(buf_1h)[-(self._mtf_lookback + 1):]
        closes = [d[0] for d in data]
        d_dist = abs(closes[-1] - closes[0])
        p_len = sum(abs(closes[i] - closes[i - 1]) for i in range(1, len(closes)))
        return d_dist / p_len if p_len > 1e-10 else 0.0

    # ─────────────── Layer 2: 急动检测 ───────────────

    def _check_spike(self, symbol: str, direction: str,
                     ctx: _LayerContext) -> tuple[bool, str]:
        """Layer 2: 自适应 CUSUM + BTC 指数因子 + 跨层修正。"""
        s_pos, s_neg = self._cusum_state.get(symbol, (0.0, 0.0))

        effective_thresh_up = self._cusum_thresh_up
        effective_thresh_down = self._cusum_thresh_down

        # BTC 指数衰减因子(v5.0)
        btc_str = ""
        if symbol != self._market_ref:
            btc_s_pos, btc_s_neg = self._cusum_state.get(self._market_ref, (0.0, 0.0))
            btc_stress = max(btc_s_pos, btc_s_neg)
            if btc_stress > self._btc_stress_thresh:
                excess = btc_stress - self._btc_stress_thresh
                stress_factor = max(
                    self._btc_stress_factor_min,
                    math.exp(-self._btc_decay_rate * excess),
                )
                effective_thresh_up *= stress_factor
                effective_thresh_down *= stress_factor
                btc_str = f" BTC压力→阈值×{stress_factor:.2f}"

        # 跨层 BOCPD 修正(v5.0)
        if self._cross_layer and ctx.bocpd_trend_prob is not None:
            if ctx.bocpd_trend_prob > 0.5:
                cusum_adj = 1.0 - 0.2 * (ctx.bocpd_trend_prob - 0.5)
                effective_thresh_up *= cusum_adj
                effective_thresh_down *= cusum_adj

        # 量价确认
        vol_confirmed, vol_ratio, vwpm_val = self._check_volume(symbol)

        rs_vol = math.sqrt(max(0.0, self._rs_ema.get(symbol, 0.0)))
        vol_str = f" 量比={vol_ratio:.1f}" if vol_ratio > 0 else ""
        vwpm_str = f" VWPM={vwpm_val:.4f}" if vwpm_val != 0.0 else ""

        if direction == 'short' and s_pos >= effective_thresh_up and vol_confirmed:
            return False, (
                f"Layer2-暴涨不做空: CUSUM+={s_pos:.2f}>={effective_thresh_up:.2f}"
                f" RS_vol={rs_vol:.4f}{vol_str}{vwpm_str}{btc_str}"
            )
        if direction == 'long' and s_neg >= effective_thresh_down and vol_confirmed:
            return False, (
                f"Layer2-暴跌不做多: CUSUM-={s_neg:.2f}>={effective_thresh_down:.2f}"
                f" RS_vol={rs_vol:.4f}{vol_str}{vwpm_str}{btc_str}"
            )
        return True, ""

    def _check_volume(self, symbol: str) -> tuple[bool, float, float]:
        """VWPM + 量比联合确认(继承 v4.0)。"""
        buf = self._buffers.get(symbol)
        if buf is None or len(buf) < 2:
            return True, 0.0, 0.0

        current_vol = buf[-1][4]
        vol_ema = self._vol_ema.get(symbol, 0.0)

        if vol_ema > 0 and current_vol > 0:
            vol_ratio = current_vol / vol_ema
        else:
            return True, 0.0, 0.0

        vwpm_val = 0.0
        w = min(self._vwpm_window, len(buf) - 1)
        if w >= 1:
            data = list(buf)[-(w + 1):]
            weighted_sum = 0.0
            total_vol = 0.0
            for i in range(1, len(data)):
                prev_c = data[i - 1][0]
                curr_c = data[i][0]
                v = data[i][4]
                if prev_c > 0 and v > 0:
                    ret = (curr_c - prev_c) / prev_c
                    weighted_sum += ret * v
                    total_vol += v
            if total_vol > 0:
                vwpm_val = weighted_sum / total_vol

        rs_vol = math.sqrt(max(0.0, self._rs_ema.get(symbol, 0.0)))
        if rs_vol > 1e-10 and vwpm_val != 0.0:
            vwpm_normalized = abs(vwpm_val) / rs_vol
            confirmed = (vol_ratio >= self._vol_confirm_ratio
                         and vwpm_normalized >= self._vwpm_confirm_ratio)
        else:
            confirmed = vol_ratio >= self._vol_confirm_ratio

        return confirmed, vol_ratio, vwpm_val

    # ─────────────── 在线更新 ───────────────

    def _online_update(self, symbol: str, buf: deque) -> None:
        """O(1) 在线更新所有状态。"""
        curr = buf[-1]
        prev = buf[-2]
        close, high, low, open_, volume = curr
        prev_close = prev[0]

        if prev_close <= 0 or close <= 0:
            return

        # ── RS EMA ──
        rs_val = self._calc_rs_single(high, low, open_, close)
        alpha_rs = 2.0 / (self._rs_period + 1)
        prev_rs = self._rs_ema.get(symbol, rs_val)
        new_rs = alpha_rs * rs_val + (1.0 - alpha_rs) * prev_rs
        self._rs_ema[symbol] = new_rs

        # ── Per-symbol 基准波动率 ──
        rs_vol_now = math.sqrt(max(0.0, new_rs))
        if symbol not in self._baseline_median:
            self._baseline_median[symbol] = _RollingMedian(maxlen=200)
        self._baseline_median[symbol].push(rs_vol_now)

        # ── CUSUM(自适应 drift, v5.0)──
        rs_vol = math.sqrt(max(0.0, new_rs))
        if rs_vol > 1e-10:
            ret = (close - prev_close) / prev_close
            z = ret / rs_vol

            # 自适应 drift
            drift = self._cusum_drift
            if self._cusum_drift_adaptive:
                baseline = self._baseline_median.get(symbol)
                bl_val = baseline.value if (baseline and len(baseline) >= 5) else None
                if bl_val and bl_val > 0:
                    vol_ratio = rs_vol / bl_val
                    drift = self._cusum_drift * max(0.5, min(2.0, vol_ratio))

            s_pos, s_neg = self._cusum_state.get(symbol, (0.0, 0.0))
            s_pos = max(0.0, s_pos + z - drift)
            s_neg = max(0.0, s_neg - z - drift)
            self._cusum_state[symbol] = (s_pos, s_neg)

        # ── Volume EMA ──
        if volume > 0:
            alpha_v = 2.0 / (self._vol_ema_period + 1)
            prev_v = self._vol_ema.get(symbol, volume)
            self._vol_ema[symbol] = alpha_v * volume + (1.0 - alpha_v) * prev_v

        # ── BOCPD 更新(v5.0)──
        if self._bocpd_enabled and prev_close > 0:
            log_ret = math.log(close / prev_close)
            if symbol not in self._bocpd_state:
                self._bocpd_state[symbol] = _BOCPD(
                    hazard_rate=self._bocpd_hazard, max_run=200,
                )
            self._bocpd_state[symbol].update(log_ret)

        # ── 1h 聚合(v5.0 MTF)──
        if self._mtf_enabled:
            if symbol not in self._hour_accum:
                self._hour_accum[symbol] = _HourBarAccum()
            accum = self._hour_accum[symbol]
            o = open_ if open_ > 0 else close
            h = high if high > 0 else close
            l = low if low > 0 else close
            if accum.add(o, h, l, close, volume):
                if symbol not in self._buffers_1h:
                    self._buffers_1h[symbol] = deque(maxlen=30)
                self._buffers_1h[symbol].append(accum.as_tuple())
                accum.reset()

    @staticmethod
    def _calc_rs_single(high: float, low: float, open_: float, close: float) -> float:
        """Rogers-Satchell (1991) 波动率估计器(单根 K 线)。"""
        if high > 0 and low > 0 and open_ > 0 and high >= low:
            try:
                rs = (math.log(high / close) * math.log(high / open_)
                      + math.log(low / close) * math.log(low / open_))
                return max(0.0, rs)
            except (ValueError, ZeroDivisionError):
                pass
        if open_ > 0:
            try:
                return math.log(close / open_) ** 2
            except (ValueError, ZeroDivisionError):
                pass
        return 0.0

3.3 StrategyParams 新增字段(v5.0)

src/trading/config.pyStrategyParams 中新增(v4.0 的 24 个字段 + v5.0 新增 9 个,共 33 个,全部有默认值):

@dataclass(frozen=True)
class StrategyParams:
    # ... 现有字段 ...

    # ── 动量过滤器参数 ──
    momentum_filter_enabled: bool = True

    # Layer 0: BOCPD + DFA-2 后备
    momentum_bocpd_enabled: bool = True                    # v5.0 新增
    momentum_bocpd_hazard_rate: float = 0.01               # v5.0 新增
    momentum_bocpd_trend_threshold: float = 0.75           # v5.0 新增
    momentum_hurst_enabled: bool = True
    momentum_hurst_lookback: int = 60
    momentum_hurst_threshold: float = 0.60
    momentum_hurst_r2_threshold: float = 0.85

    # Layer 1: 持续趋势 + MTF
    momentum_sustained_lookback: int = 30
    momentum_sustained_base_threshold: float = 0.008
    momentum_er_threshold_long: float = 0.60
    momentum_er_threshold_short: float = 0.50
    momentum_mtf_enabled: bool = True                      # v5.0 新增
    momentum_mtf_er_threshold: float = 0.50                # v5.0 新增
    momentum_mtf_lookback: int = 6                         # v5.0 新增

    # Layer 2: 急动检测
    momentum_rs_period: int = 10
    momentum_cusum_drift: float = 0.5
    momentum_cusum_drift_adaptive: bool = True             # v5.0 新增
    momentum_cusum_threshold_spike_up: float = 3.5
    momentum_cusum_threshold_spike_down: float = 2.5
    momentum_volume_confirm_ratio: float = 1.5
    momentum_volume_ema_period: int = 20
    momentum_vwpm_window: int = 5
    momentum_vwpm_confirm_ratio: float = 0.8
    momentum_market_ref_symbol: str = "BTC"
    momentum_btc_stress_threshold: float = 2.0
    momentum_btc_stress_factor_min: float = 0.7
    momentum_btc_decay_rate: float = 0.3                   # v5.0 新增

    # Layer 3: Spread 仲裁
    momentum_spread_lookback: int = 20
    momentum_spread_er_threshold: float = 0.45
    momentum_spread_net_threshold: float = 1.5
    momentum_spread_adf_pvalue: float = 0.05
    momentum_spread_kpss_pvalue: float = 0.05
    momentum_spread_use_pp: bool = True                    # v5.0 新增
    momentum_spread_half_life_max: float = 36.0            # v5.0 新增

    # 跨层
    momentum_cross_layer_enabled: bool = True              # v5.0 新增
    momentum_cross_layer_factor: float = 0.4               # v5.0 新增

3.4 strategy.py 集成改动

与 v4.0 完全一致:SymbolBaseline 新增 z4h_historyprocess_tick 透传 OHLCV,_check_entry 四层检查。唯一变化是 check_spread 新增 max_hold_hours 参数:

# _check_entry 中 Layer 3 仲裁调用(v5.0 传入 max_hold_hours)
spread_trend, spread_reason = self._momentum_filter.check_spread(
    z4h_hist, direction,
    max_hold_hours=params.max_hold_hours,
)

3.5 改动范围(v5.0)

改动文件 改动内容 改动量
src/trading/momentum_filter.py 重写:BOCPD + 自适应 CUSUM + MTF + PP + 半衰期 + 跨层 ~550 行
src/trading/strategy.py check_spread 传入 max_hold_hours ~2 行
src/trading/config.py StrategyParams 新增 9 个字段 ~15 行
src/trading/orchestrator.py 透传 OHLCV(继承 v4.0) ~5 行
src/services/realtime_kline_service_base.py 提取双腿 OHLCV(继承 v4.0) ~20 行

4. 融入当前交易体系的方案

4.1 数据流全景(v5.0)

WebSocket K线推送(OHLCV + volume)
      |
      v
realtime_kline_service_base
      |
      +-- _trigger_strategy_if_ready()
              |
              +-- 提取 alt/base OHLCV + volume
              v
      TradingOrchestrator.process_analysis(
          ..., alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv
      )
              |
              v
      AdaptiveBollingerStrategy.process_tick(
          ..., alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv
      )
              |
              +-- [新K线] bl.z4h_history.append(z4h)
              |
              +-- [新K线] MomentumFilter.update(alt/base, ohlcv, kline_time)
              |           → O(1) 在线更新:
              |             RS EMA / 自适应CUSUM / Volume EMA
              |             Per-symbol基准中位数 / BOCPD(v5.0) / 1h聚合(v5.0)
              |
              +-- _check_entry()
                      |
                      步骤1-4: 冷却期/突破/持仓/z4h
                      步骤4.5: 方向判断
                      步骤4.6: 四层动量过滤(跨层共享 _LayerContext)
                                ├── Layer 0(硬): BOCPD P(trending) / DFA-2后备
                                │    共享 ctx.bocpd_trend_prob → Layer 1/2
                                │
                                ├── Layer 2(硬): 自适应CUSUM + BTC指数 + 跨层修正
                                │
                                ├── Layer 1(软): ER + MTF确认 + 跨层修正
                                │
                                └── Layer 3(仲裁): PP/ADF+KPSS + OU半衰期
                      步骤5: 产生 EntrySignal

4.2 线程安全

与 v4.0 一致。MomentumFilter 不持有锁,由 AdaptiveBollingerStrategy_lock 统一保护。新增的 _BOCPD 状态和 _HourBarAccum 均在锁保护范围内更新。

4.3 日志格式(v5.0)

动量过滤(硬)   | BTC|ETH | alt:Layer0-BOCPD趋势机制: P(trending)=0.82>0.75
动量过滤(硬)   | SOL|BTC | alt:Layer0-DFA2趋势机制: Hurst=0.67>0.60 R²=0.92(BOCPD未就绪,DFA-2后备)
动量过滤(硬)   | SOL|BTC | alt:Layer2-暴涨不做空: CUSUM+=3.82>=3.15 RS_vol=0.0028 量比=2.3 VWPM=0.0012 BTC压力→阈值×0.85
动量过滤(软+Spread) | SOL|BTC | alt:Layer1-不追跌: 过去30根净跌幅=-1.23% ER=0.72>=0.48 [MTF-1h-ER=0.55确认] | 平稳但半衰期过长: PP_p=0.02 KPSS_p=0.12 HL=42.3h>36h→维持拦截
动量仲裁放行   | ETH|BTC | 单腿:alt:Layer1-不追涨: ... | PP+KPSS确认平稳(PP_p=0.01 KPSS_p=0.15 HL=8.2h)→推翻拦截

4.4 配置层集成(v5.0)

# Layer 0: BOCPD + DFA-2 后备
TRADING_MOMENTUM_BOCPD_ENABLED=true                        # v5.0 新增
TRADING_MOMENTUM_BOCPD_HAZARD_RATE=0.01                    # v5.0 新增
TRADING_MOMENTUM_BOCPD_TREND_THRESHOLD=0.75                # v5.0 新增
TRADING_MOMENTUM_HURST_ENABLED=true
TRADING_MOMENTUM_HURST_LOOKBACK=60
TRADING_MOMENTUM_HURST_THRESHOLD=0.60
TRADING_MOMENTUM_HURST_R2_THRESHOLD=0.85

# Layer 1: 持续趋势 + MTF
TRADING_MOMENTUM_SUSTAINED_LOOKBACK=30
TRADING_MOMENTUM_SUSTAINED_BASE_THRESHOLD=0.008
TRADING_MOMENTUM_ER_THRESHOLD_LONG=0.60
TRADING_MOMENTUM_ER_THRESHOLD_SHORT=0.50
TRADING_MOMENTUM_MTF_ENABLED=true                          # v5.0 新增
TRADING_MOMENTUM_MTF_ER_THRESHOLD=0.50                     # v5.0 新增
TRADING_MOMENTUM_MTF_LOOKBACK=6                            # v5.0 新增

# Layer 2: 急动检测
TRADING_MOMENTUM_RS_PERIOD=10
TRADING_MOMENTUM_CUSUM_DRIFT=0.5
TRADING_MOMENTUM_CUSUM_DRIFT_ADAPTIVE=true                 # v5.0 新增
TRADING_MOMENTUM_CUSUM_THRESHOLD_SPIKE_UP=3.5
TRADING_MOMENTUM_CUSUM_THRESHOLD_SPIKE_DOWN=2.5
TRADING_MOMENTUM_VOLUME_CONFIRM_RATIO=1.5
TRADING_MOMENTUM_VWPM_WINDOW=5
TRADING_MOMENTUM_VWPM_CONFIRM_RATIO=0.8
TRADING_MOMENTUM_MARKET_REF_SYMBOL=BTC
TRADING_MOMENTUM_BTC_STRESS_THRESHOLD=2.0
TRADING_MOMENTUM_BTC_STRESS_FACTOR_MIN=0.7
TRADING_MOMENTUM_BTC_DECAY_RATE=0.3                        # v5.0 新增

# Layer 3: Spread 仲裁
TRADING_MOMENTUM_SPREAD_LOOKBACK=20
TRADING_MOMENTUM_SPREAD_ER_THRESHOLD=0.45
TRADING_MOMENTUM_SPREAD_NET_THRESHOLD=1.5
TRADING_MOMENTUM_SPREAD_ADF_PVALUE=0.05
TRADING_MOMENTUM_SPREAD_KPSS_PVALUE=0.05
TRADING_MOMENTUM_SPREAD_USE_PP=true                        # v5.0 新增
TRADING_MOMENTUM_SPREAD_HALF_LIFE_MAX=36.0                 # v5.0 新增

# 跨层
TRADING_MOMENTUM_CROSS_LAYER_ENABLED=true                  # v5.0 新增
TRADING_MOMENTUM_CROSS_LAYER_FACTOR=0.4                    # v5.0 新增

5. 与当前交易风格的契合度分析

5.1 交易风格特征

维度 当前系统特征
策略类型 配对协整均值回归
K 线周期 5 分钟
最大持仓时间 72 小时
入场逻辑 adaptive_z 突破阈值(首次穿越)
出场逻辑 adaptive_z 回归至 entry_adaptive_z × reversion_factor

5.2 v5.0 新增改进的契合度分析

BOCPD 概率化机制检测(最高契合):

均值回归策略的前提是"市场处于均值回归机制"。BOCPD 直接输出 P(均值回归机制失效) 的概率,是对该前提最直接的检验。相比 DFA-2 阈值判断,BOCPD 的概率输出让硬拦截决策更平滑、更可靠——P=0.76P=0.99 的拦截有不同的置信度,而 DFA-2 的 Hurst=0.61Hurst=0.90 触发相同动作。

OU 半衰期约束(最高契合):

对均值回归策略来说,"多快回归"比"是否平稳"更直接相关。半衰期 = 200h 的平稳序列通过 ADF+KPSS,但在 72h 持仓限制内无法获利。v5.0 将 half_life < max_hold_hours/2 作为 Layer 3 放行的额外约束,直接将过滤器与策略的时间框架绑定。

自适应 CUSUM drift(高契合):

加密市场波动率变化剧烈(BTC 日波动率从 1% 到 10%+)。固定 drift 在低波动期过于宽松(漏判)、高波动期过于敏感(误判)。自适应 drift 让 CUSUM 的 ARL 特性在不同波动率环境下保持稳定。

多时间框架确认(高契合):

5min 级别噪音导致 Layer 1 误触发。1h 确认后降低 ER 阈值,本质上是"多时间框架投票"——只有 5m+1h 都认为有趋势时,才更积极拦截。

跨层信息共享(中高契合):

弱趋势信号(BOCPD trend_prob=0.55)单独不足以硬拦截,但可以让 Layer 1/2 更敏感。这种"梯度式谨慎"比"全有或全无"的独立判断更精细。

Phillips-Perron(中契合):

加密市场波动率聚集显著,PP 比 ADF 更可靠。但在 spread_lookback=20 的小样本下,PP 和 ADF 的差异不大。主要价值在理论严谨性。

5.3 综合评分(v5.0 vs v4.0)

维度 评分 vs v4.0 说明
算法前沿性 ★★★★★ +1★ BOCPD (2007) + OU 半衰期 = state-of-the-art
误杀控制 ★★★★★ +0.5★ 概率化决策 + 半衰期约束减少误杀
自适应能力 ★★★★★ +0.5★ 自适应 drift + 跨层动态调节
市场微观结构 ★★★★★ 持平 BTC 指数衰减是微调,OFI 留待 Phase 2
统计严谨性 ★★★★★ +0.5★ BOCPD 贝叶斯框架 + PP 异方差鲁棒 + 半衰期
多尺度覆盖 ★★★★★ +1★ MTF 确认填补 v4.0 单尺度缺陷
层间协同 ★★★★★ +1★ 跨层共享 vs v4.0 独立判断
运行效率 ★★★★☆ 持平 BOCPD O(200) per bar << DFA-2 O(N²)
代码侵入性 ★★★★★ 持平 独立模块,可逐项关闭

6. 与四项开单约束的对应关系

6.1 约束覆盖矩阵(v5.0)

约束 过滤层 触发指标 被阻止方向 检查维度 可仲裁 v5.0 变化
0. 趋势机制 Layer 0 BOCPD P(trending)>T 或 Hurst(DFA-2)>T 双向 alt+base BOCPD 概率化
1. 不追跌 Layer 1 ER≥T_short AND net_ret≤-T_adp short alt+base+spread MTF+跨层修正
2. 不追涨 Layer 1 ER≥T_long AND net_ret≥+T_adp long alt+base+spread MTF+跨层修正
3. 暴涨不做空 Layer 2 CUSUM+≥T_up×SF AND VWPM确认 short alt+base+BTC 自适应drift+BTC指数+跨层
4. 暴跌不做多 Layer 2 CUSUM-≥T_down×SF AND VWPM确认 long alt+base+BTC 自适应drift+BTC指数+跨层

四项约束完全覆盖。v5.0 增强:Layer 0 概率化替代阈值判断,Layer 2 自适应 drift 提升跨波动率环境稳定性,Layer 3 半衰期约束确保放行信号在策略时间框架内可获利。


7. 参数配置指南

7.1 v5.0 新增参数

Layer 0:BOCPD

参数 默认值 范围 说明
bocpd_hazard_rate 0.01 0.005-0.05 变点先验概率(1/期望机制长度)
bocpd_trend_threshold 0.75 0.60-0.90 P(trending) > 此值 → 硬拦截
bocpd_hazard_rate 调优:
  0.005:期望机制长度 200 根 = 16.7h → 对机制切换不敏感
  0.01:期望机制长度 100 根 = 8.3h  → 标准值
  0.02:期望机制长度 50 根 = 4.2h   → 对快速机制切换敏感
  0.05:期望机制长度 20 根 = 1.7h   → 过于敏感,噪音大

bocpd_trend_threshold 调优:
  0.60:激进,轻微趋势信号即拦截(误杀多)
  0.75:标准值,需要较明确的趋势后验才拦截
  0.90:保守,只有非常高置信度才拦截(可能漏判)

Layer 1:多时间框架

参数 默认值 范围 说明
mtf_er_threshold 0.50 0.35-0.65 1h ER ≥ 此值时降低 5m ER 阈值
mtf_lookback 6 4-12 1h 回望根数(6根=6小时)

Layer 2:自适应 drift + BTC 指数

参数 默认值 范围 说明
cusum_drift_adaptive true 自适应 drift 开关
btc_decay_rate 0.3 0.1-0.5 BTC 指数衰减率 λ
btc_decay_rate 调优:
  0.1:温和衰减,BTC 影响范围大但幅度小
  0.3:标准值,excess=1 时 factor=0.74
  0.5:急剧衰减,只有 BTC 极端急动时才显著影响 alt 阈值

Layer 3:PP + 半衰期

参数 默认值 范围 说明
spread_use_pp true Phillips-Perron 检验开关(需 arch 库)
spread_half_life_max 36.0 12.0-48.0 最大允许半衰期(小时)
spread_half_life_max 调优:
  12h:严格,只放行快速回归的 spread → 误杀增加但安全性高
  36h:标准值(= max_hold_hours/2),在持仓期内有足够回归时间
  48h:宽松,允许较慢的回归 → 漏判增加
  建议:设为 max_hold_hours / 2

跨层

参数 默认值 范围 说明
cross_layer_factor 0.4 0.2-0.6 跨层修正强度
cross_layer_factor 调优:
  0.2:微弱跨层影响(各层近乎独立)
  0.4:标准值,trend_prob=0.75 时 ER 阈值降 10%
  0.6:强跨层影响,trend_prob=0.75 时 ER 阈值降 15%

7.2 继承 v4.0 参数

Layer 0(DFA-2 后备)、Layer 1(ER/RS)、Layer 2(CUSUM/VWPM/BTC 基础)、Layer 3(ADF/KPSS/ER)的参数调优指南与 v4.0 完全一致,此处不重复。

7.3 回测验证建议

对比六组回测:
  A 组:无动量过滤(基线)
  B 组:v3.0(RS + DFA-1 + ADF+ER)
  C 组:v4.0(RS + DFA-2+R² + VWPM + BTC + ADF+KPSS+ER)
  D 组:v5.0 完整版
  E 组:v5.0 关闭各子功能(评估各改进项贡献)
    E1: 关闭 BOCPD(仅用 DFA-2 后备)
    E2: 关闭自适应 drift(固定 drift=0.5)
    E3: 关闭 MTF 确认
    E4: 关闭半衰期约束
    E5: 关闭跨层共享
    E6: 关闭 PP(仅用 ADF)
  F 组:v5.0 + Phase 2 OFI(L2 book 集成后)

核心指标:
  - Sharpe Ratio(风险调整收益)
  - 最大连续亏损次数
  - 信号过滤率 / 误杀率
  - Layer 3 仲裁放行率 / 半衰期拒绝率
  - Layer 0 BOCPD vs DFA-2 激活比例
  - 分波动率区间表现(高/中/低波动率环境)

预期 v5.0 vs v4.0 增量收益:
  BOCPD:减少 Layer 0 误拦截 20-30%(概率化 vs 阈值化)
  自适应 drift:高波动期 Layer 2 误报减少 15-25%
  MTF 确认:Layer 1 误触发减少 10-20%(1h 不确认的短期波动)
  半衰期约束:过滤掉 10-15% 的"假放行"(平稳但回归太慢)
  跨层共享:整体拦截精度提升 5-10%(弱趋势信号的梯度传递)
  PP:在波动率聚集期 Layer 3 准确率提升 5-10%

8. 风险与局限性

8.1 已知局限

局限 说明 缓解措施
BOCPD hazard_rate 敏感性 先验变点概率影响机制长度估计 默认 0.01 在多种市况验证;可通过回测优化
BOCPD 预热期 需 20 根 K 线(100 分钟)才就绪 DFA-2 自动后备;启动时 DB 回填
BOCPD 趋势概率的 sigmoid 映射 将后验 z 值映射为 P(trending),sigmoid 中心点 2.0 凭经验 经验表明 z>2 对应 ~95% 置信;可通过回测校准
半衰期估计对小样本敏感 spread_lookback=20 时 OLS 仅 19 个观测值 小样本时半衰期约束自动跳过(返回 None)
PP 需 arch 库 arch 不一定已安装 自动降级为 ADF;arch 是 numpy 的常见扩展
MTF 1h 聚合延迟 1h bar 需要 12 根 5m bar 才完成 前 12 根无 MTF 数据,不影响决策(MTF 只是阈值修正)
自适应 drift 可能过度补偿 极端波动率时 drift 变化大 clamp(0.5, 2.0) 限制缩放范围
跨层因子的最优值未知 cross_layer_factor=0.4 凭经验 影响温和(最多 10% 阈值调整),回测优化
参数总数增加 v5.0 共 33 个参数(v4.0 为 24 个) 9 个新参数全有默认值;可考虑未来引入元参数降维

8.2 不适合的场景

  • 极低流动性资产:RS 波动率失真,BOCPD 因稀疏数据后验不收敛
  • HIP-3 稀疏资产:volume 不可靠时 VWPM 无效,OFI 也无法使用
  • 极短持仓策略:半衰期约束对 max_hold_hours < 4h 的策略过于宽松

8.3 后续迭代方向(v5.0 之后)

优先级 方向 说明
P1 Phase 2: OFI 集成 L2 book 数据管道 → MomentumFilter,替代 VWPM
P1 BOCPD 参数自适应 hazard_rate 根据近期变点频率动态调整
P2 元参数降维 引入 risk_attitude ∈ [0,1] 驱动所有子参数联合缩放
P2 Transfer Entropy BTC 因子 信息论框架度量 BTC→alt 因果信息流,替代 CUSUM stress
P2 Kalman Filter 动态对冲比率 追踪时变协整向量,Kalman gain 异常时 Layer 0 触发
P3 BOCPD sigmoid 校准 用历史数据校准 sigmoid 中心点和斜率
P3 Copula 尾部依赖 建模 BTC-alt 非线性尾部依赖,替代线性 stress factor
P3 Shiryaev-Roberts 对照 SR 程序与 CUSUM 的 minimax 最优性对比验证
P4 HMM 离线训练 用历史数据训练两状态 HMM,作为 BOCPD 的交叉验证

附录 A:算法学术参考

算法 原始论文 核心贡献
BOCPD Adams, R.P. & MacKay, D.J.C. (2007) arXiv:0710.3742 贝叶斯在线变点检测,NIG 共轭先验,概率化机制推断
DFA / DFA-2 Peng et al. (1994); Kantelhardt et al. (2002) 去趋势波动分析,Hurst 指数估计
DFA in Finance Mantegna & Stanley (1995) Nature DFA 引入金融时间序列
Kaufman ER Kaufman, P. (1995) Smarter Trading 方向距离/路径长度衡量趋势效率
CUSUM Page, E.S. (1954) Biometrika 变化检测最优在线算法
CUSUM 最优性 Lorden, G. (1971) Ann. Math. Statist. CUSUM 一阶渐近最优
CUSUM 自适应 drift Hawkins & Olwell (1998) Cumulative Sum Charts CUSUM ARL 依赖 drift 与信号比,建议自适应调整
Rogers-Satchell Rogers, L. & Satchell, S. (1991) Ann. Appl. Prob. 漂移不变 OHLC 波动率
Garman-Klass Garman, M. & Klass, M. (1980) J. Business OHLC 波动率(零漂移假设)
Kyle 模型 Kyle, A.S. (1985) Econometrica 价格+成交量联合信号
OFI Cont, R., Kukanov, A. & Stoikov, S. (2014) J. Financial Econometrics Order Flow Imbalance,盘口微观结构量价指标
ADF 检验 Dickey, D.A. & Fuller, W.A. (1979) JASA 单位根检验
KPSS 检验 Kwiatkowski et al. (1992) J. Econometrics 平稳性检验(ADF 对偶)
Phillips-Perron Phillips, P.C.B. & Perron, P. (1988) Biometrika 非参数异方差鲁棒单位根检验
时间序列动量 Moskowitz, Ooi & Pedersen (2012) J. Financial Economics 资产间动量溢出效应
OU 过程半衰期 Vidyamurthy, G. (2004) Pairs Trading 均值回归半衰期估计,配对交易核心参数
VPIN Easley, Lopez de Prado & O'Hara (2012) Rev. Financial Studies 知情交易概率的成交量同步估计
Bayesian Changepoint Adams & MacKay (2007) arXiv 贝叶斯在线变点检测
Shiryaev-Roberts Pollak & Tartakovsky (2008) Ann. Statist. CUSUM 替代方案
Transfer Entropy Schreiber, T. (2000) Physical Review Letters 信息论因果信息流度量
HMM 机制切换 Hamilton, J.D. (1989) Econometrica 马尔可夫机制切换模型
配对交易实证 Gatev, E. et al. (2006) Rev. Financial Studies 配对交易大规模实证验证

附录 B:v4.0 → v5.0 改进对照

维度 v4.0 v5.0 改进原因
机制检测 DFA-2 点估计 + R² 质检 BOCPD 概率化 + DFA-2 后备 概率输出量化不确定性,短序列更可靠(Adams & MacKay 2007)
机制检测阈值 Hurst > 0.60(二值) P(trending) > 0.75(连续) 连续概率让决策更精细
CUSUM drift 固定 0.5 自适应 ∝ RS vol / baseline 跨波动率环境 ARL 稳定性(Hawkins & Olwell 1998)
BTC 因子衰减 线性 1.0 - 0.1×excess 指数 exp(-0.3×excess) 指数衰减更平滑,有物理依据
Spread 单位根检验 ADF(异方差敏感) PP 优先 + ADF 后备 PP 对波动率聚集鲁棒(Phillips & Perron 1988)
Spread 放行条件 平稳即放行 平稳 + 半衰期 < max_hold/2 半衰期直接关联策略持仓期(Vidyamurthy 2004)
时间框架 仅 5min 5min + 1h MTF 确认 多尺度验证减少噪音误触发
层间关系 完全独立 跨层信息共享 via _LayerContext 弱信号梯度传递,提升整体拦截精度
量价确认 VWPM VWPM(Phase 1)+ OFI 路径(Phase 2) OFI 从盘口微观结构直接度量买卖压力
新增参数 9 个 BOCPD×3, MTF×3, drift×1, BTC×1, PP×1, HL×1, 跨层×2(-hl_max 与 pp 共用 Layer 3)
总参数数 24 33 全部有默认值,可逐项关闭

文档结束

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG