开仓动量过滤器设计方案5
开仓动量过滤器设计方案
版本:v5.0
日期:2026-03-06
适用系统:Hyperliquid 量化交易系统(Adaptive Bollinger Z-Score 配对策略)
升级摘要:v4.0 → v5.0 BOCPD 概率化机制检测,OU 半衰期约束,自适应 CUSUM drift,BTC 指数衰减 + 滚动领先滞后,OFI 量价升级路径,Phillips-Perron 检验,多时间框架确认,跨层信息共享
目录
1. 背景与问题定义
1.1 交易系统现状
当前系统基于 Adaptive Bollinger Z-Score 配对策略,核心逻辑是协整对的 spread 均值回归:
- 当
adaptive_z突破adaptive_threshold时产生入场信号 - 当
adaptive_z回归至entry_adaptive_z * reversion_factor时平仓 - 使用 5 分钟 K 线,最大持仓时间控制在 72 小时以内(
max_hold_hours=72.0) - 策略引擎
_check_entry()已有冷却期、突破检测、持仓检查、z4h 绝对值过滤四层前置过滤
1.2 待解决的问题
均值回归策略在以下场景存在明显风险:
| 场景 | 问题 | 后果 |
|---|---|---|
| 资产处于趋势机制(Hurst > 0.6) | 均值回归假设本身失效 | spread 持续扩张,系统性亏损 |
| 单边持续上涨 N 根 K 线后 | 追涨做多 | 趋势过度延伸,极易被反转砸穿 |
| 单边持续下跌 N 根 K 线后 | 追跌做空 | 跌幅充分释放,反弹风险极高 |
| 短时间内迅速暴涨 | 做空对抗强动量 | 被轧空,止损代价极大 |
| 短时间内迅速暴跌 | 做多接飞刀 | 继续下杀,止损代价极大 |
1.3 四项约束目标
约束 1:连续下跌 → 不追跌(不做空)
约束 2:连续上涨 → 不追涨(不做多)
约束 3:迅速暴涨 → 不做空
约束 4:迅速暴跌 → 不做多
1.4 过滤维度
| 维度 | 检查对象 | 原因 |
|---|---|---|
| Alt 腿(单腿) | alt symbol 的价格动量 | 直接持仓标的的风险 |
| Base 腿(单腿) | base symbol 的价格动量 | 配对交易中 base 腿方向相反,同样承担动量风险 |
| Spread(配对层) | 两腿价差的趋势性 | 单腿动量不等于 spread 动量;两腿同涨但 spread 稳定时不应误杀 |
1.5 v4.0 算法局限性分析(v5.0 升级动机)
| 维度 | v4.0 做法 | 局限性 | v5.0 解决方案 |
|---|---|---|---|
| 机制检测 | DFA-2 点估计 + R² 质检 | 60 根 K 线仅 5-7 个尺度点,点估计丢弃不确定性信息 | BOCPD 概率化机制检测,输出 P(trending) 而非阈值判断 |
| CUSUM drift | 固定 drift=0.5 | 高波动期过于敏感,低波动期过于迟钝 | 自适应 drift ∝ RS vol / baseline_vol |
| BTC 因子 | 线性衰减 | 系数 0.1 凭经验,线性衰减无物理依据 | 指数衰减 exp(-λ·excess),衰减率可调 |
| 量价确认 | VWPM(K 线级) | 只能度量 K 线级别量价方向,无法捕捉盘口微观结构 | OFI(L2 book 可用时)+ VWPM 后备 |
| Spread 仲裁 | ADF + KPSS | ADF 对异方差敏感;无半衰期约束 | Phillips-Perron(异方差鲁棒)+ OU 半衰期约束 |
| 时间框架 | 仅 5min | 5min 噪音大,短期波动易误触发 | 多时间框架确认(1h ER 修正 5m 阈值) |
| 层间关系 | 各层独立判断 | Layer 0 的弱趋势信号不被 Layer 1/2 利用 | 跨层信息共享 via _LayerContext |
2. 算法选择与学术依据
2.0 Layer 0:BOCPD 概率化机制检测 + DFA-2 后备
动机:从点估计到概率推断
v4.0 中 Layer 0 使用 DFA-2 估算 Hurst 指数,再阈值判断。核心问题:60 根 5min K 线只有 5-7 个尺度点,Hurst 估计的标准误极大。R² 质检缓解但未根治——它只能"放弃不可靠的估计",不能"让估计变得可靠"。
更根本的问题是:阈值判断丢弃了不确定性信息。Hurst=0.62(刚过 0.60)和 Hurst=0.85(远超 0.60)触发相同的硬拦截,但风险完全不同。
v5.0 将 Layer 0 重新定义为 "P(均值回归机制失效) > threshold",使用 Bayesian Online Changepoint Detection(BOCPD)实现概率化推断。
候选算法深度对比
| 算法 | 核心思路 | 优点 | 缺点 | 5min K 线适用性 |
|---|---|---|---|---|
| DFA-2 + R² | 多尺度二阶去趋势 RMS 波动 | 对抛物线趋势鲁棒 | 点估计,短序列标准误大 | 一般 |
| HMM (Hamilton 1989) | 隐马尔可夫模型,两状态 | 直接建模机制切换概率 | 需 EM 训练,批量方法,参数不稳定 | 需要离线训练 |
| Wavelet Leaders | 小波多分辨率 Hurst | 短序列更稳定 | 实现复杂,依赖 PyWavelets | 较好 |
| BOCPD (Adams & MacKay 2007) | 贝叶斯在线变点检测 + NIG 共轭先验 | 输出后验概率,在线 O(R) 更新,天然量化不确定性 | 需调 hazard_rate | 最适合 |
选择:BOCPD + DFA-2 混合机制检测
学术背景:
Adams & MacKay (2007) 提出的 BOCPD 使用贝叶斯框架在线检测时间序列的分布变化。对每个时刻,维护一个"运行长度"(当前机制持续多久)的后验分布。使用 Normal-Inverse-Gamma(NIG)共轭先验,对高斯观测的预测分布为 Student-t,可解析更新,无需 MCMC。
为什么 BOCPD 优于 DFA-2:
- 概率输出 vs 点估计:BOCPD 输出 P(trending)∈[0,1],而非 Hurst > 0.6 的二值判断
- 天然不确定性量化:数据不足时 P(trending) 趋向先验(≈0.5),不会产生假信号
- 在线 O(R) 更新:每根 K 线更新一次,R=200(截断),计算开销 << DFA-2
- 无固定窗口:DFA-2 需要固定 60 根窗口;BOCPD 自适应检测机制长度
- 机制切换检测:DFA-2 检测"当前是否趋势";BOCPD 额外检测"机制是否刚切换"
BOCPD 核心算法:
输入:对数收益率序列 {r_t},hazard_rate H(变点先验概率,默认 0.01)
先验:Normal-Inverse-Gamma (μ₀=0, κ₀=1, α₀=1, β₀=0.01)
每个时刻 t:
对每个运行长度 r = 0, 1, ..., R:
1. 预测概率:π(r_t | r) = Student-t_{2α_r}(r_t | μ_r, β_r(κ_r+1)/(α_r·κ_r))
2. 增长概率:P(r_t = r+1) = P(r_{t-1} = r) · π(r_t | r) · (1-H)
3. 变点概率:P(r_t = 0) = Σ_r P(r_{t-1} = r) · π(r_t | r) · H
4. 更新 NIG 充分统计量:
κ' = κ + 1, μ' = (κμ + r_t) / κ'
α' = α + 0.5, β' = β + κ(r_t - μ)² / (2κ')
5. 归一化
趋势概率(贝叶斯边缘化):
P(trending) = Σ_r P(r_t = r) · P(trending | r)
其中 P(trending | r) 基于运行长度 r 的后验均值显著性:
z_r = |μ_r| / sqrt(β_r / (α_r · κ_r))
P(trending | r) = sigmoid(z_r - 2.0)
DFA-2 作为后备(继承 v4.0):
BOCPD 需要约 20 根 K 线预热。预热期间或 BOCPD 不可用时,降级为 v4.0 的 DFA-2 + R² 质检。
混合决策逻辑:
if BOCPD 就绪(≥ 20 根更新):
trend_prob = bocpd.trend_probability
if trend_prob > bocpd_trend_threshold(默认 0.75):
→ 硬拦截:P(trending)={trend_prob:.2f} > {threshold}
else:
→ 放行,但将 trend_prob 共享给 Layer 1/2(跨层信息)
else:
→ 降级为 DFA-2 + R² 质检(v4.0 逻辑)
跨层信息共享(v5.0 新增):
Layer 0 的 trend_prob 即使未达到硬拦截阈值,也包含有价值的信息。例如 trend_prob=0.55 说明有弱趋势信号。v5.0 将此信息传递给下游层:
Layer 1:如果 trend_prob > 0.5,ER 阈值降低(更容易触发软拦截)
er_thresh *= 1.0 - cross_layer_factor * max(0, trend_prob - 0.5)
cross_layer_factor = 0.4 → trend_prob=0.75 时 er_thresh 降低 10%
Layer 2:如果 trend_prob > 0.5,CUSUM 阈值降低(更早检测急动)
cusum_thresh *= 1.0 - cross_layer_factor * 0.5 * max(0, trend_prob - 0.5)
影响更温和(× 0.5),因为 Layer 2 已有 BTC 因子调节
2.1 约束 1&2:持续单边行情过滤(Layer 1)
核心算法:Kaufman ER + RS 自适应净位移(继承 v4.0)
ER 是衡量趋势质量的最佳简单指标,O(N) 复杂度,可解释性强。非对称 ER 阈值和 RS 自适应设计继承 v4.0,不做改动。
v5.0 关键改进 1:多时间框架确认(Multi-Timeframe Confirmation)
5min K 线噪音大,短期波动容易误触发 Layer 1。在专业量化系统中,多时间框架确认是基础设施级别的功能。
v5.0 将 5min bars 聚合为 1h bars(每 12 根聚合一次),在 1h 级别计算 ER 作为确认信号:
# 1h ER 确认逻辑
1h_closes = 最近 mtf_lookback 根 1h K 线(默认 6 根 = 6 小时)
1h_er = direction_distance(1h) / path_length(1h)
if 1h_er >= mtf_er_threshold(默认 0.50):
→ 1h 确认趋势 → 5m ER 阈值降低 20%(更容易触发)
er_thresh *= 0.80
含义:
5m 和 1h 都显示趋势 → 高置信度,降低拦截门槛
5m 显示趋势但 1h 不确认 → 可能是短期噪音,维持原阈值
v5.0 关键改进 2:跨层 BOCPD 信号调节
见 2.0 节跨层信息共享。当 BOCPD 检测到弱趋势信号时(0.5 < trend_prob < 0.75),Layer 1 的 ER 阈值进一步降低,使得持续趋势过滤更敏感。
核心公式(完整版,含 v5.0 修正):
N = sustained_lookback(默认 30 根 5min = 150 分钟)
# Kaufman Efficiency Ratio
ER = abs(close[-1] - close[-N]) / Σ|close[i] - close[i-1]|
# 基础非对称 ER 阈值
er_thresh_base = er_threshold_long(0.60)或 er_threshold_short(0.50)
# v5.0 多时间框架修正
if mtf_enabled AND 1h_er >= mtf_er_threshold:
er_thresh_base *= 0.80
# v5.0 跨层 BOCPD 修正
if cross_layer_enabled AND bocpd_trend_prob > 0.5:
er_thresh_base *= 1.0 - 0.4 * (bocpd_trend_prob - 0.5)
# RS 自适应净位移阈值(继承 v4.0)
adaptive_thresh = base_threshold * (rs_vol / per_symbol_baseline)
(缩放范围 0.3x ~ 3.0x)
# 联合触发(继承 v4.0)
约束1:direction == 'short' AND net_return <= -adaptive_thresh AND ER >= er_thresh
约束2:direction == 'long' AND net_return >= +adaptive_thresh AND ER >= er_thresh
2.2 约束 3&4:急涨急跌过滤(Layer 2)
RS 波动率(继承 v4.0)
Rogers-Satchell (1991) 漂移不变 OHLC 波动率估计器,完全继承 v4.0,不做改动。
v5.0 关键改进 3:自适应 CUSUM drift
问题: v4.0 的 drift=0.5 是固定值。在高波动期(RS_vol 远高于基准),CUSUM 标准化后的 z 值波动加大,固定 drift 无法补偿,导致误报增加。在低波动期,固定 drift 又过于宽松。
解决方案: drift 随 RS 波动率的相对水平动态缩放。
# 自适应 drift
vol_ratio = rs_vol_current / rs_vol_baseline
adaptive_drift = base_drift * clamp(vol_ratio, drift_scale_min, drift_scale_max)
= 0.5 * clamp(vol_ratio, 0.5, 2.0)
高波动期:vol_ratio=2.0 → drift=1.0 → CUSUM 更不敏感(合理:高波动期急动更常见)
低波动期:vol_ratio=0.3 → drift=0.25 → CUSUM 更敏感(合理:低波动期急动更异常)
学术依据: Hawkins & Olwell (1998, Cumulative Sum Charts and Charting for Quality Improvement) 指出 CUSUM 的 ARL (Average Run Length) 性能依赖 drift 与实际信号幅度的比率,建议根据过程特性自适应调整。
v5.0 关键改进 4:BTC 因子指数衰减
问题: v4.0 使用线性衰减 stress_factor = max(0.7, 1.0 - 0.1·excess),系数 0.1 无理论依据,且线性函数在边界处不平滑。
解决方案: 指数衰减更符合物理直觉(影响随距离指数递减)。
# v4.0:线性衰减
stress_factor = max(0.7, 1.0 - 0.1 * (btc_stress - threshold))
# v5.0:指数衰减
stress_factor = max(btc_stress_factor_min, exp(-btc_decay_rate * (btc_stress - threshold)))
btc_decay_rate = 0.3(默认):
excess=1.0 → factor=0.74
excess=2.0 → factor=0.55(但 min=0.7 → 0.70)
excess=3.0 → factor=0.41(但 min=0.7 → 0.70)
v5.0 关键改进 5:OFI 量价升级路径(Phase 2)
当前 VWPM(继承 v4.0) 在 K 线级别度量量价方向一致性,是 Kyle (1985) 模型的简化实现。但 K 线级别的颗粒度有限,无法捕捉盘口微观结构。
OFI(Order Flow Imbalance,Cont, Kukanov & Stoikov 2014) 直接从 L2 book 变化中度量买卖压力不平衡:
OFI = Σ (ΔQ_bid · I(ΔP_bid ≥ 0) - ΔQ_ask · I(ΔP_ask ≤ 0))
OFI > 0 → 净买压 → 上涨有微观结构支撑
OFI < 0 → 净卖压 → 下跌有微观结构支撑
系统已订阅 L2Book,具备 OFI 数据基础。但 OFI 集成需要额外数据管道改造(L2 book → momentum_filter),标记为 Phase 2 实施。Phase 1 继续使用 VWPM。
完整 CUSUM 公式(v5.0):
ret(i) = (close(i) - close(i-1)) / close(i-1)
z(i) = ret(i) / rs_vol
# 自适应 drift(v5.0)
vol_ratio = rs_vol / baseline_vol
adaptive_drift = base_drift * clamp(vol_ratio, 0.5, 2.0)
S_pos(i) = max(0, S_pos(i-1) + z(i) - adaptive_drift)
S_neg(i) = max(0, S_neg(i-1) - z(i) - adaptive_drift)
# BTC 指数衰减因子(v5.0)
if symbol != BTC AND btc_stress > btc_stress_threshold:
excess = btc_stress - btc_stress_threshold
stress_factor = max(btc_stress_factor_min, exp(-btc_decay_rate * excess))
effective_thresh_up *= stress_factor
effective_thresh_down *= stress_factor
# 跨层 BOCPD 修正(v5.0)
if bocpd_trend_prob > 0.5:
cusum_thresh *= 1.0 - 0.2 * max(0, bocpd_trend_prob - 0.5)
# 量价确认(VWPM, Phase 1;OFI, Phase 2)
volume_confirmed = volume_ratio >= 1.5 AND |VWPM|/rs_vol >= vwpm_confirm_ratio
# 非对称触发
约束3:direction=='short' AND S_pos >= effective_thresh_up AND volume_confirmed
约束4:direction=='long' AND S_neg >= effective_thresh_down AND volume_confirmed
2.3 Spread 层面趋势检测(Layer 3)
v5.0 改进:PP + KPSS + ER + OU 半衰期约束
v5.0 关键改进 6:Phillips-Perron 检验替代/补充 ADF
问题: ADF 检验假设误差项为 i.i.d.,但加密市场存在显著的波动率聚集(GARCH 效应)和序列相关。这违反 ADF 假设,导致尺寸扭曲。
解决方案: Phillips-Perron (1988) 检验使用 Newey-West 核估计对检验统计量做非参数修正,对异方差和序列相关鲁棒。
PP vs ADF:
ADF:augmented regression 消除序列相关(添加滞后项) → 对异方差不鲁棒
PP:非参数修正消除序列相关和异方差 → 对两者均鲁棒
加密市场:波动率聚集显著 → PP 更可靠
v5.0 中 PP 优先于 ADF。当 arch 库可用时使用 PP,不可用时降级为 ADF。
v5.0 关键改进 7:OU 半衰期约束
问题: ADF/KPSS/PP 只回答"序列是否平稳",不回答"均值回归有多快"。一个半衰期 200 小时的序列通过平稳性检验,但在 72 小时最大持仓时间内根本来不及回归。
这是 v4.0 最关键的遗漏——对均值回归策略而言,半衰期比平稳性更直接相关。
解决方案: 在平稳性检验通过后,估计 Ornstein-Uhlenbeck 过程的半衰期:
# OU 过程:dX = θ(μ - X)dt + σdW
# 离散化:ΔX_t = θ·X_{t-1} + c + ε_t
# OLS 回归 ΔX on X_{t-1} → 斜率 = θ
# 半衰期 = -ln(2) / ln(1 + θ)
θ < 0 → 均值回归(θ 越负,回归越快)
θ ≥ 0 → 无均值回归(趋势或随机游走)
半衰期 > max_hold_hours / 2 → 回归太慢,在持仓期内大概率来不及
约束:half_life_hours < spread_half_life_max(默认 36h = max_hold_hours/2)
学术依据: Vidyamurthy (2004, Pairs Trading) 指出半衰期是配对交易中最关键的参数之一,决定了策略的预期持仓时间和胜率。
完整 Layer 3 决策逻辑(v5.0):
# 第一关:ER 快速筛选(O(N),继承 v4.0)
if spread_er < spread_er_threshold AND NOT direction_ok:
→ 无趋势,放行
# 第二关:PP/ADF + KPSS 平稳性检验
unit_root_p = PP_pvalue(优先)或 ADF_pvalue(后备)
kpss_p = KPSS_pvalue
四象限决策矩阵(继承 v4.0,PP 替代 ADF):
ur_stationary(p<0.05)+ kpss_stationary(p>0.05)→ 确认平稳 → 第三关
ur_non_stationary + kpss_non_stationary → 确认非平稳 → 维持拦截
矛盾 / 不确定 → 保守维持拦截
# 第三关:OU 半衰期约束(v5.0 新增,仅在确认平稳时执行)
half_life = OLS 估计 OU 半衰期(小时)
if half_life is None OR half_life > spread_half_life_max:
→ 回归太慢或无回归 → 维持拦截
else:
→ 平稳 + 半衰期合理 → 推翻拦截,放行
2.4 跨层信息共享架构(v5.0 新增)
v4.0 的四层完全独立判断,信息不跨层传递。v5.0 引入 _LayerContext 数据结构,在 check() 调用内部共享信息:
@dataclass
_LayerContext:
bocpd_trend_prob: float | None # Layer 0 → Layer 1, 2
hurst: float | None # Layer 0 → 诊断
hurst_r2: float | None # Layer 0 → 诊断
信息流向:
Layer 0 执行 → 填充 ctx.bocpd_trend_prob
↓
Layer 2 使用 ctx.bocpd_trend_prob 调节 CUSUM 阈值
↓
Layer 1 使用 ctx.bocpd_trend_prob 调节 ER 阈值
↓
Layer 3 使用 z4h_history + 半衰期(独立于 ctx)
设计原则:
- 跨层影响是温和的修正因子(最多 10-15% 阈值调整),不是决定性开关
- 各层仍可独立关闭/测试
- ctx 仅在单次 check() 调用内有效,不跨调用持久化
2.5 算法组合架构(五层 + 跨层共享)
输入信号(direction, alt_symbol, base_symbol)
│
├──▶ Layer 0: BOCPD 概率化机制检测 + DFA-2 后备 [v5.0 重写]
│ ├── BOCPD: P(trending) per symbol
│ ├── DFA-2 后备: Hurst + R² 质检(BOCPD 预热期)
│ ├── 共享: ctx.bocpd_trend_prob → Layer 1/2 阈值调节
│ └── P(trending) > threshold → 硬拦截
│
├──▶ Layer 2: 自适应 CUSUM + RS + VWPM + BTC 指数因子 [v5.0 升级]
│ ├── 自适应 drift = base_drift × clamp(vol_ratio, 0.5, 2.0)
│ ├── BTC 指数衰减: exp(-λ·excess)
│ ├── 跨层: ctx.bocpd_trend_prob → 进一步调低阈值
│ ├── VWPM + 量比联合确认(Phase 2: OFI 替代)
│ └── 任一腿触发 → 硬拦截
│
├──▶ Layer 1: ER + RS 净位移 + MTF 确认 [v5.0 升级]
│ ├── 1h ER 确认 → 5m ER 阈值 × 0.80
│ ├── 跨层: ctx.bocpd_trend_prob → ER 阈值调低
│ └── 任一腿触发 → 软拦截 → Layer 3 仲裁
│
└──▶ Layer 3: PP + KPSS + ER + OU 半衰期约束 [v5.0 升级]
├── ER 快速筛(O(N))
├── PP(优先)/ ADF + KPSS 平稳性检验
├── OU 半衰期 < max_hold_hours / 2 约束
└── 平稳 + 半衰期合理 → 推翻拦截;否则维持
│
▼
最终决策:允许 / 拒绝入场
设计理念(v5.0):
| 层级 | 类型 | 可仲裁 | 触发含义 | v5.0 变化 |
|---|---|---|---|---|
| Layer 0 | 硬拦截 | 否 | 均值回归假设根本失效 | BOCPD 概率化 + DFA-2 后备 + 跨层共享 |
| Layer 2 | 硬拦截 | 否 | 急动执行风险极高 | 自适应 drift + BTC 指数衰减 + 跨层修正 |
| Layer 1 | 软拦截 | 是 | 单腿持续趋势,可能误杀 | MTF 确认 + 跨层 BOCPD 修正 |
| Layer 3 | 仲裁器 | — | Spread 平稳+半衰期合理则放行 | PP 替代 ADF + OU 半衰期约束 |
3. 算法具体实现
3.1 模块位置
src/trading/
momentum_filter.py ← 重写(BOCPD + 自适应 CUSUM + MTF + 半衰期 + 跨层共享)
strategy.py ← 修改 _check_entry(),注入并调用 filter;SymbolBaseline 新增 z4h_history
config.py ← 修改 StrategyParams,新增过滤器参数(含 v5.0 新增参数)
orchestrator.py ← 修改 process_analysis(),透传 OHLCV + volume
src/services/
realtime_kline_service_base.py ← 修改 _trigger_strategy_if_ready(),提取双腿 OHLCV + volume
3.2 MomentumFilter 完整代码(v5.0)
# src/trading/momentum_filter.py
"""
开仓动量过滤器 v5.0
四层过滤架构 + 跨层信息共享:
Layer 0: BOCPD 概率化机制检测 + DFA-2 后备 → 硬拦截 [v5.0 重写]
Layer 1: ER + RS 净位移 + MTF 确认 → 软拦截 [v5.0 升级]
Layer 2: 自适应 CUSUM + RS + VWPM + BTC 指数 → 硬拦截 [v5.0 升级]
Layer 3: PP + KPSS + ER + OU 半衰期 → 仲裁 [v5.0 升级]
v5.0 改进清单:
1. [Layer 0] BOCPD 概率化机制检测(Adams & MacKay 2007)
2. [Layer 0] DFA-2 保留为后备
3. [Layer 0→1,2] 跨层共享 bocpd_trend_prob
4. [Layer 1] 多时间框架确认(1h ER 修正 5m 阈值)
5. [Layer 2] 自适应 CUSUM drift ∝ RS vol / baseline
6. [Layer 2] BTC 因子指数衰减
7. [Layer 3] Phillips-Perron 检验(异方差鲁棒)
8. [Layer 3] OU 半衰期约束
"""
import math
import warnings
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
try:
from statsmodels.tsa.stattools import adfuller as _adfuller
_HAS_ADF = True
except ImportError:
_HAS_ADF = False
try:
from statsmodels.tsa.stattools import kpss as _kpss
_HAS_KPSS = True
except ImportError:
_HAS_KPSS = False
try:
from arch.unitroot import PhillipsPerron as _PhillipsPerron
_HAS_PP = True
except ImportError:
_HAS_PP = False
# ──────────────────────────────────────────────────────────────
# 辅助函数
# ──────────────────────────────────────────────────────────────
def _logsumexp(vals: list[float]) -> float:
"""Numerically stable log-sum-exp."""
if not vals:
return float('-inf')
m = max(vals)
if m == float('-inf'):
return float('-inf')
return m + math.log(sum(math.exp(v - m) for v in vals))
def _student_t_logpdf(x: float, mu: float, scale_sq: float, df: float) -> float:
"""Student-t log PDF: log St(x | mu, scale^2, df)."""
if scale_sq <= 1e-30 or df <= 0:
return -50.0
z_sq = (x - mu) ** 2 / (df * scale_sq)
return (math.lgamma((df + 1) / 2)
- math.lgamma(df / 2)
- 0.5 * math.log(df * math.pi * scale_sq)
- (df + 1) / 2 * math.log(1 + z_sq))
def _estimate_half_life(series: list[float], bar_minutes: float = 5.0) -> float | None:
"""
OU 过程半衰期估计(OLS 回归)。
模型: DeltaX_t = theta * X_{t-1} + c + eps
半衰期 = -ln(2) / ln(1 + theta)
Returns: 半衰期(小时),或 None(无均值回归信号)。
"""
n = len(series)
if n < 10:
return None
y = [series[i] - series[i - 1] for i in range(1, n)]
x = series[:-1]
n_obs = len(y)
x_mean = sum(x) / n_obs
y_mean = sum(y) / n_obs
cov_xy = sum((x[i] - x_mean) * (y[i] - y_mean) for i in range(n_obs))
var_x = sum((x[i] - x_mean) ** 2 for i in range(n_obs))
if var_x < 1e-12:
return None
theta = cov_xy / var_x
if theta >= 0 or theta <= -1:
return None
half_life_bars = -math.log(2) / math.log(1 + theta)
half_life_hours = half_life_bars * bar_minutes / 60.0
return half_life_hours if half_life_hours > 0 else None
# ──────────────────────────────────────────────────────────────
# BOCPD: Bayesian Online Changepoint Detection
# ──────────────────────────────────────────────────────────────
class _BOCPD:
"""
Bayesian Online Changepoint Detection (Adams & MacKay 2007).
Normal-Inverse-Gamma 共轭先验,Student-t 预测分布。
Log-space 实现保证数值稳定性。O(R) per update, R = max_run。
"""
__slots__ = ('_log_h', '_log_1mh', '_mu0', '_kappa0', '_alpha0', '_beta0',
'_max_run', '_log_probs', '_suff', '_n_updates')
def __init__(self, hazard_rate: float = 0.01, mu0: float = 0.0,
kappa0: float = 1.0, alpha0: float = 1.0,
beta0: float = 0.01, max_run: int = 200):
self._log_h = math.log(max(1e-10, hazard_rate))
self._log_1mh = math.log(max(1e-10, 1.0 - hazard_rate))
self._mu0, self._kappa0 = mu0, kappa0
self._alpha0, self._beta0 = alpha0, beta0
self._max_run = max_run
self._log_probs: list[float] = [0.0]
self._suff: list[tuple[float, float, float, float]] = [
(mu0, kappa0, alpha0, beta0)
]
self._n_updates = 0
def update(self, x: float) -> None:
"""处理新观测值(对数收益率)。"""
n = len(self._log_probs)
# 1. 每个运行长度的预测 log-PDF
log_preds = []
for i in range(n):
mu, kappa, alpha, beta = self._suff[i]
if kappa > 0 and alpha > 0 and beta > 0:
scale_sq = beta * (kappa + 1) / (alpha * kappa)
log_preds.append(_student_t_logpdf(x, mu, scale_sq, 2 * alpha))
else:
log_preds.append(-50.0)
# 2. 增长概率
log_growth = [
self._log_probs[i] + log_preds[i] + self._log_1mh
for i in range(n)
]
# 3. 变点概率
log_cp = _logsumexp([
self._log_probs[i] + log_preds[i] + self._log_h
for i in range(n)
])
# 4. 新分布 + 归一化
new_log = [log_cp] + log_growth
log_total = _logsumexp(new_log)
new_log = [lp - log_total for lp in new_log]
# 5. 更新 NIG 充分统计量
new_suff: list[tuple[float, float, float, float]] = [
(self._mu0, self._kappa0, self._alpha0, self._beta0)
]
for i in range(n):
mu, kappa, alpha, beta = self._suff[i]
kappa_n = kappa + 1
mu_n = (kappa * mu + x) / kappa_n
alpha_n = alpha + 0.5
beta_n = beta + kappa * (x - mu) ** 2 / (2 * kappa_n)
new_suff.append((mu_n, kappa_n, alpha_n, beta_n))
# 6. 截断
if len(new_log) > self._max_run:
new_log = new_log[:self._max_run]
new_suff = new_suff[:self._max_run]
log_total = _logsumexp(new_log)
new_log = [lp - log_total for lp in new_log]
self._log_probs = new_log
self._suff = new_suff
self._n_updates += 1
@property
def trend_probability(self) -> float:
"""
P(trending) — 贝叶斯后验边缘化。
对每个运行长度 r,检查后验均值是否显著偏离零。
使用 sigmoid 映射为概率,然后按运行长度概率加权求和。
"""
total = 0.0
for i in range(len(self._log_probs)):
prob = math.exp(self._log_probs[i])
if prob < 1e-8:
continue
mu, kappa, alpha, beta = self._suff[i]
if alpha > 0.5 and beta > 1e-30 and kappa > 0:
var_mean = beta / (alpha * kappa)
std_mean = math.sqrt(var_mean) if var_mean > 0 else 1e-10
z = abs(mu) / std_mean if std_mean > 1e-10 else 0
p_trend = 1.0 / (1.0 + math.exp(-(z - 2.0)))
else:
p_trend = 0.5
total += prob * p_trend
return total
@property
def ready(self) -> bool:
return self._n_updates >= 20
# ──────────────────────────────────────────────────────────────
# DFA-2 Hurst 后备(继承 v4.0)
# ──────────────────────────────────────────────────────────────
def _hurst_dfa2(closes: list[float], r2_threshold: float = 0.85) -> tuple[float | None, float | None]:
"""
DFA-2(二阶去趋势波动分析)估算 Hurst 指数(v4.0 继承)。
返回 (hurst, r_squared),不可靠时返回 (None, None)。
"""
n = len(closes)
if n < 20:
return None, None
rets = []
for i in range(1, n):
if closes[i] > 0 and closes[i - 1] > 0:
rets.append(math.log(closes[i] / closes[i - 1]))
nr = len(rets)
if nr < 16:
return None, None
mean_r = sum(rets) / nr
profile = []
cum = 0.0
for r in rets:
cum += r - mean_r
profile.append(cum)
scales: list[float] = []
rms_vals: list[float] = []
s = 4
max_s = nr // 4
while s <= max_s:
n_win = nr // s
if n_win < 2:
break
rss_sum = 0.0
rss_cnt = 0
for i in range(n_win):
seg = profile[i * s: (i + 1) * s]
ws = len(seg)
if ws < 3:
if ws >= 2:
x_m = (ws - 1) * 0.5
y_m = sum(seg) / ws
cov = sum((j - x_m) * (seg[j] - y_m) for j in range(ws))
var_x = sum((j - x_m) ** 2 for j in range(ws))
slope = cov / var_x if var_x > 1e-12 else 0.0
intercept = y_m - slope * x_m
rss = sum((seg[j] - (intercept + slope * j)) ** 2 for j in range(ws)) / ws
rss_sum += rss
rss_cnt += 1
continue
s0 = float(ws)
s1 = ws * (ws - 1) / 2.0
s2 = ws * (ws - 1) * (2 * ws - 1) / 6.0
s3 = (ws * (ws - 1) / 2.0) ** 2
s4 = ws * (ws - 1) * (2 * ws - 1) * (3 * ws * ws - 3 * ws - 1) / 30.0
sy0 = sum(seg)
sy1 = sum(j * seg[j] for j in range(ws))
sy2 = sum(j * j * seg[j] for j in range(ws))
det = (s0 * (s2 * s4 - s3 * s3)
- s1 * (s1 * s4 - s3 * s2)
+ s2 * (s1 * s3 - s2 * s2))
if abs(det) < 1e-12:
x_m = (ws - 1) * 0.5
y_m = sum(seg) / ws
cov = sum((j - x_m) * (seg[j] - y_m) for j in range(ws))
var_x = sum((j - x_m) ** 2 for j in range(ws))
slope = cov / var_x if var_x > 1e-12 else 0.0
intercept = y_m - slope * x_m
rss = sum((seg[j] - (intercept + slope * j)) ** 2 for j in range(ws)) / ws
rss_sum += rss
rss_cnt += 1
continue
a = ((sy0 * (s2 * s4 - s3 * s3)
- s1 * (sy1 * s4 - s3 * sy2)
+ s2 * (sy1 * s3 - s2 * sy2)) / det)
b = ((s0 * (sy1 * s4 - s3 * sy2)
- sy0 * (s1 * s4 - s3 * s2)
+ s2 * (s1 * sy2 - sy1 * s2)) / det)
c = ((s0 * (s2 * sy2 - sy1 * s3)
- s1 * (s1 * sy2 - sy1 * s2)
+ sy0 * (s1 * s3 - s2 * s2)) / det)
rss = sum((seg[j] - (a + b * j + c * j * j)) ** 2 for j in range(ws)) / ws
rss_sum += rss
rss_cnt += 1
if rss_cnt >= 2:
f_s = math.sqrt(rss_sum / rss_cnt)
if f_s > 1e-14:
scales.append(math.log(s))
rms_vals.append(math.log(f_s))
s = max(s + 1, int(s * 1.4 + 0.5))
if len(scales) < 3:
return None, None
n_pts = len(scales)
x_m = sum(scales) / n_pts
y_m = sum(rms_vals) / n_pts
num = sum((scales[i] - x_m) * (rms_vals[i] - y_m) for i in range(n_pts))
den = sum((scales[i] - x_m) ** 2 for i in range(n_pts))
if den < 1e-12:
return None, None
hurst = max(0.0, min(1.0, num / den))
slope = num / den
ss_res = sum((rms_vals[i] - (y_m + slope * (scales[i] - x_m))) ** 2 for i in range(n_pts))
ss_tot = sum((rms_vals[i] - y_m) ** 2 for i in range(n_pts))
r_squared = 1.0 - ss_res / ss_tot if ss_tot > 1e-12 else 0.0
if r_squared < r2_threshold:
return None, None
return hurst, r_squared
# ──────────────────────────────────────────────────────────────
# 辅助类
# ──────────────────────────────────────────────────────────────
class _RollingMedian:
"""固定窗口滚动中位数(排序法,O(N log N) per query)。"""
__slots__ = ('_data',)
def __init__(self, maxlen: int):
self._data: deque = deque(maxlen=maxlen)
def push(self, val: float) -> None:
self._data.append(val)
@property
def value(self) -> float | None:
if not self._data:
return None
s = sorted(self._data)
n = len(s)
mid = n >> 1
return s[mid] if n & 1 else (s[mid - 1] + s[mid]) * 0.5
def __len__(self) -> int:
return len(self._data)
class _HourBarAccum:
"""5m → 1h 聚合器。每 12 根 5m bar 聚合为 1 根 1h bar。"""
__slots__ = ('count', 'open_', 'high', 'low', 'close', 'volume')
def __init__(self):
self.reset()
def reset(self):
self.count = 0
self.open_ = 0.0
self.high = 0.0
self.low = float('inf')
self.close = 0.0
self.volume = 0.0
def add(self, o: float, h: float, l: float, c: float, v: float) -> bool:
"""添加 5m bar,返回 True 当 1h bar 已完成。"""
if self.count == 0:
self.open_ = o
self.high = h
self.low = l
else:
self.high = max(self.high, h)
self.low = min(self.low, l)
self.close = c
self.volume += v
self.count += 1
return self.count >= 12
def as_tuple(self) -> tuple:
return (self.close, self.high, self.low, self.open_, self.volume)
@dataclass
class _LayerContext:
"""跨层共享上下文,单次 check() 调用内有效。"""
bocpd_trend_prob: float | None = None
hurst: float | None = None
hurst_r2: float | None = None
# ──────────────────────────────────────────────────────────────
# MomentumFilter 主类
# ──────────────────────────────────────────────────────────────
class MomentumFilter:
"""
开仓动量过滤器 v5.0
公开接口:
update(symbol, close, high, low, open_, volume, kline_time)
check(symbol, direction) -> (allowed, reason, is_soft)
check_spread(z4h_history, direction, max_hold_hours) -> (has_trend, reason)
ready(symbol) -> bool
"""
def __init__(
self,
# ── 总开关 ──
enabled: bool = True,
# ── Layer 0: BOCPD + DFA-2 后备 ──
bocpd_enabled: bool = True,
bocpd_hazard_rate: float = 0.01,
bocpd_trend_threshold: float = 0.75,
hurst_enabled: bool = True,
hurst_lookback: int = 60,
hurst_threshold: float = 0.60,
hurst_r2_threshold: float = 0.85,
# ── Layer 1: 持续趋势 + MTF ──
sustained_lookback: int = 30,
sustained_base_threshold: float = 0.008,
er_threshold_long: float = 0.60,
er_threshold_short: float = 0.50,
mtf_enabled: bool = True,
mtf_er_threshold: float = 0.50,
mtf_lookback: int = 6,
# ── Layer 2: 急动检测 ──
rs_period: int = 10,
cusum_drift: float = 0.5,
cusum_drift_adaptive: bool = True,
cusum_threshold_spike_up: float = 3.5,
cusum_threshold_spike_down: float = 2.5,
volume_confirm_ratio: float = 1.5,
volume_ema_period: int = 20,
vwpm_window: int = 5,
vwpm_confirm_ratio: float = 0.8,
# ── Layer 2: BTC 因子 ──
market_ref_symbol: str = "BTC",
btc_stress_threshold: float = 2.0,
btc_stress_factor_min: float = 0.7,
btc_decay_rate: float = 0.3,
# ── Layer 3: Spread 仲裁 ──
spread_lookback: int = 20,
spread_er_threshold: float = 0.45,
spread_net_threshold: float = 1.5,
spread_adf_pvalue: float = 0.05,
spread_kpss_pvalue: float = 0.05,
spread_use_pp: bool = True,
spread_half_life_max: float = 36.0,
# ── 跨层 ──
cross_layer_enabled: bool = True,
cross_layer_factor: float = 0.4,
):
self._enabled = enabled
# Layer 0
self._bocpd_enabled = bocpd_enabled
self._bocpd_hazard = bocpd_hazard_rate
self._bocpd_trend_thresh = bocpd_trend_threshold
self._hurst_enabled = hurst_enabled
self._hurst_lookback = max(20, hurst_lookback)
self._hurst_thresh = hurst_threshold
self._hurst_r2_thresh = hurst_r2_threshold
# Layer 1
self._sustained_n = max(5, sustained_lookback)
self._sustained_base_thresh = sustained_base_threshold
self._er_thresh_long = er_threshold_long
self._er_thresh_short = er_threshold_short
self._mtf_enabled = mtf_enabled
self._mtf_er_thresh = mtf_er_threshold
self._mtf_lookback = mtf_lookback
# Layer 2
self._rs_period = rs_period
self._cusum_drift = cusum_drift
self._cusum_drift_adaptive = cusum_drift_adaptive
self._cusum_thresh_up = cusum_threshold_spike_up
self._cusum_thresh_down = cusum_threshold_spike_down
self._vol_confirm_ratio = volume_confirm_ratio
self._vol_ema_period = volume_ema_period
self._vwpm_window = vwpm_window
self._vwpm_confirm_ratio = vwpm_confirm_ratio
# Layer 2: BTC
self._market_ref = market_ref_symbol
self._btc_stress_thresh = btc_stress_threshold
self._btc_stress_factor_min = btc_stress_factor_min
self._btc_decay_rate = btc_decay_rate
# Layer 3
self._spread_lookback = spread_lookback
self._spread_er_thresh = spread_er_threshold
self._spread_net_thresh = spread_net_threshold
self._spread_adf_pvalue = spread_adf_pvalue
self._spread_kpss_pvalue = spread_kpss_pvalue
self._spread_use_pp = spread_use_pp
self._spread_hl_max = spread_half_life_max
# 跨层
self._cross_layer = cross_layer_enabled
self._cross_factor = cross_layer_factor
# 数据缓冲区
buf_size = max(hurst_lookback + 5, sustained_lookback + 5,
rs_period + 10, volume_ema_period + 5,
vwpm_window + 2) + 10
self._buf_size = buf_size
# 状态
self._buffers: dict[str, deque] = {}
self._last_kline_time: dict[str, datetime] = {}
self._cusum_state: dict[str, tuple[float, float]] = {}
self._rs_ema: dict[str, float] = {}
self._vol_ema: dict[str, float] = {}
self._baseline_median: dict[str, _RollingMedian] = {}
self._bocpd_state: dict[str, _BOCPD] = {}
self._buffers_1h: dict[str, deque] = {}
self._hour_accum: dict[str, _HourBarAccum] = {}
# ─────────────── 公开接口 ───────────────
def update(self, symbol: str, close: float, high: float = 0.0,
low: float = 0.0, open_: float = 0.0, volume: float = 0.0,
kline_time: datetime | None = None) -> None:
"""每根新 K 线收盘时调用。"""
if kline_time is not None:
if self._last_kline_time.get(symbol) == kline_time:
return
self._last_kline_time[symbol] = kline_time
if symbol not in self._buffers:
self._buffers[symbol] = deque(maxlen=self._buf_size)
self._buffers[symbol].append((close, high, low, open_, volume))
buf = self._buffers[symbol]
if len(buf) >= 2:
self._online_update(symbol, buf)
def check(self, symbol: str, direction: str) -> tuple[bool, str, bool]:
"""
Layer 0 + Layer 2 + Layer 1 检查(单腿维度)。
执行顺序:Layer 0(硬)→ Layer 2(硬)→ Layer 1(软)
"""
if not self._enabled:
return True, "", False
buf = self._buffers.get(symbol)
if buf is None or len(buf) < self._rs_period + 2:
return True, "", False
ctx = _LayerContext()
# ── Layer 0: BOCPD + DFA-2 后备 ──
ok, reason = self._check_regime(symbol, ctx)
if not ok:
return False, reason, False
# ── Layer 2: 急动检测 ──
ok, reason = self._check_spike(symbol, direction, ctx)
if not ok:
return False, reason, False
# ── Layer 1: 持续趋势 + MTF ──
if len(buf) >= self._sustained_n + 2:
ok, reason = self._check_sustained(symbol, direction, ctx)
if not ok:
return False, reason, True
return True, "", False
def check_spread(self, z4h_history: list[float], direction: str,
max_hold_hours: float = 72.0) -> tuple[bool, str]:
"""
Layer 3: PP + KPSS + ER + OU 半衰期约束。
仅在存在软拦截(Layer 1)时调用。
"""
n = self._spread_lookback
if len(z4h_history) < n + 1:
return False, ""
series = z4h_history[-(n + 1):]
# ── 第一关:ER 快速筛选 ──
spread_dir = abs(series[-1] - series[0])
spread_path = sum(abs(series[i] - series[i - 1]) for i in range(1, len(series)))
er = spread_dir / spread_path if spread_path > 1e-10 else 0.0
spread_net = series[-1] - series[0]
if direction == 'short':
dir_ok = spread_net <= -self._spread_net_thresh
else:
dir_ok = spread_net >= self._spread_net_thresh
if not (er >= self._spread_er_thresh and dir_ok):
return False, ""
# ── 第二关:PP/ADF + KPSS 平稳性检验 ──
ur_p = None
kpss_p = None
ur_test_name = "ER-only"
if len(series) >= 15:
# PP 优先(对异方差鲁棒)
if self._spread_use_pp and _HAS_PP:
try:
import numpy as _np
pp_result = _PhillipsPerron(_np.array(series, dtype=float))
ur_p = float(pp_result.pvalue)
ur_test_name = "PP"
except Exception:
pass
# ADF 后备
if ur_p is None and _HAS_ADF:
try:
ur_p = _adfuller(series, maxlag=2, regression='c', autolag=None)[1]
ur_test_name = "ADF"
except Exception:
pass
# KPSS
if _HAS_KPSS:
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
kpss_p = _kpss(series, regression='c', nlags='auto')[1]
except Exception:
pass
# ── 四象限决策 + 半衰期约束 ──
if ur_p is not None and kpss_p is not None:
ur_stationary = ur_p < self._spread_adf_pvalue
kpss_stationary = kpss_p > self._spread_kpss_pvalue
if ur_stationary and kpss_stationary:
# 确认平稳 → 检查半衰期
hl = _estimate_half_life(series)
hl_max = min(self._spread_hl_max, max_hold_hours * 0.5)
if hl is not None and hl > hl_max:
return True, (
f"平稳但半衰期过长: {ur_test_name}_p={ur_p:.3f}"
f" KPSS_p={kpss_p:.3f} HL={hl:.1f}h>{hl_max:.0f}h→维持拦截"
)
hl_str = f" HL={hl:.1f}h" if hl else " HL=N/A"
return False, (
f"{ur_test_name}+KPSS确认平稳({ur_test_name}_p={ur_p:.3f}"
f" KPSS_p={kpss_p:.3f}{hl_str})→推翻拦截"
)
if not ur_stationary and not kpss_stationary:
return True, (
f"Spread趋势(三重确认): ER={er:.2f}"
f" {ur_test_name}_p={ur_p:.3f} KPSS_p={kpss_p:.3f}"
)
return True, (
f"Spread趋势(矛盾/不确定): ER={er:.2f}"
f" {ur_test_name}_p={ur_p:.3f} KPSS_p={kpss_p:.3f}→保守拦截"
)
# 只有单项检验
if ur_p is not None:
if ur_p < self._spread_adf_pvalue:
hl = _estimate_half_life(series)
hl_max = min(self._spread_hl_max, max_hold_hours * 0.5)
if hl is not None and hl > hl_max:
return True, f"平稳但HL过长: {ur_test_name}_p={ur_p:.3f} HL={hl:.1f}h"
return False, f"{ur_test_name}判定平稳(p={ur_p:.3f})→推翻拦截(无KPSS)"
return True, f"Spread趋势({ur_test_name}+ER): ER={er:.2f} p={ur_p:.3f}"
# 降级:纯 ER
return True, f"Spread趋势(ER): ER={er:.2f} 净位移={spread_net:+.3f}"
def ready(self, symbol: str) -> bool:
buf = self._buffers.get(symbol)
return buf is not None and len(buf) >= self._rs_period + 2
# ─────────────── Layer 0: BOCPD + DFA-2 后备 ───────────────
def _check_regime(self, symbol: str, ctx: _LayerContext) -> tuple[bool, str]:
"""Layer 0: 概率化机制检测。"""
# BOCPD 路径
if self._bocpd_enabled:
bocpd = self._bocpd_state.get(symbol)
if bocpd and bocpd.ready:
trend_prob = bocpd.trend_probability
ctx.bocpd_trend_prob = trend_prob
if trend_prob > self._bocpd_trend_thresh:
return False, (
f"Layer0-BOCPD趋势机制: P(trending)={trend_prob:.3f}"
f">{self._bocpd_trend_thresh}"
)
return True, ""
# DFA-2 后备路径
if not self._hurst_enabled:
return True, ""
buf = self._buffers.get(symbol)
if buf is None or len(buf) < self._hurst_lookback + 1:
return True, ""
closes = [d[0] for d in list(buf)[-(self._hurst_lookback + 1):]]
h, r2 = _hurst_dfa2(closes, r2_threshold=self._hurst_r2_thresh)
if h is not None:
ctx.hurst = h
ctx.hurst_r2 = r2
if h is None:
return True, ""
if h > self._hurst_thresh:
return False, (
f"Layer0-DFA2趋势机制: Hurst={h:.3f}>{self._hurst_thresh}"
f" R²={r2:.3f}(BOCPD未就绪,DFA-2后备)"
)
return True, ""
# ─────────────── Layer 1: 持续趋势 + MTF ───────────────
def _check_sustained(self, symbol: str, direction: str,
ctx: _LayerContext) -> tuple[bool, str]:
"""Layer 1: ER + RS 净位移 + MTF 确认 + 跨层 BOCPD 修正。"""
buf = self._buffers[symbol]
n = self._sustained_n
data = list(buf)[-(n + 1):]
closes = [d[0] for d in data]
if len(closes) < n + 1:
return True, ""
ref = closes[0]
if ref <= 0:
return True, ""
net_return = (closes[-1] - ref) / ref
# Kaufman ER
direction_dist = abs(closes[-1] - ref)
path_length = sum(abs(closes[i] - closes[i - 1]) for i in range(1, len(closes)))
er = direction_dist / path_length if path_length > 1e-10 else 0.0
# 基础非对称 ER 阈值
er_thresh = self._er_thresh_long if direction == 'long' else self._er_thresh_short
# v5.0: MTF 修正
if self._mtf_enabled:
mtf_er = self._calc_mtf_er(symbol)
if mtf_er is not None and mtf_er >= self._mtf_er_thresh:
er_thresh *= 0.80
# v5.0: 跨层 BOCPD 修正
if self._cross_layer and ctx.bocpd_trend_prob is not None:
if ctx.bocpd_trend_prob > 0.5:
adjustment = self._cross_factor * (ctx.bocpd_trend_prob - 0.5)
er_thresh *= (1.0 - adjustment)
if er < er_thresh:
return True, ""
# RS 自适应阈值
rs_var = self._rs_ema.get(symbol, 0.0)
rs_vol = math.sqrt(max(0.0, rs_var))
baseline = self._baseline_median.get(symbol)
baseline_val = baseline.value if (baseline and len(baseline) >= 5) else None
if rs_vol > 0 and baseline_val and baseline_val > 0:
adaptive_thresh = self._sustained_base_thresh * (rs_vol / baseline_val)
adaptive_thresh = max(self._sustained_base_thresh * 0.3,
min(adaptive_thresh, self._sustained_base_thresh * 3.0))
else:
adaptive_thresh = self._sustained_base_thresh
mtf_str = ""
if self._mtf_enabled:
mtf_er = self._calc_mtf_er(symbol)
if mtf_er is not None and mtf_er >= self._mtf_er_thresh:
mtf_str = f" [MTF-1h-ER={mtf_er:.2f}确认]"
if direction == 'short' and net_return <= -adaptive_thresh:
return False, (
f"Layer1-不追跌: 过去{n}根净跌幅={net_return:.2%}"
f" 阈值=-{adaptive_thresh:.2%} ER={er:.2f}>={er_thresh:.2f}{mtf_str}"
)
if direction == 'long' and net_return >= adaptive_thresh:
return False, (
f"Layer1-不追涨: 过去{n}根净涨幅={net_return:.2%}"
f" 阈值=+{adaptive_thresh:.2%} ER={er:.2f}>={er_thresh:.2f}{mtf_str}"
)
return True, ""
def _calc_mtf_er(self, symbol: str) -> float | None:
"""计算 1h 级别 ER。"""
buf_1h = self._buffers_1h.get(symbol)
if buf_1h is None or len(buf_1h) < self._mtf_lookback + 1:
return None
data = list(buf_1h)[-(self._mtf_lookback + 1):]
closes = [d[0] for d in data]
d_dist = abs(closes[-1] - closes[0])
p_len = sum(abs(closes[i] - closes[i - 1]) for i in range(1, len(closes)))
return d_dist / p_len if p_len > 1e-10 else 0.0
# ─────────────── Layer 2: 急动检测 ───────────────
def _check_spike(self, symbol: str, direction: str,
ctx: _LayerContext) -> tuple[bool, str]:
"""Layer 2: 自适应 CUSUM + BTC 指数因子 + 跨层修正。"""
s_pos, s_neg = self._cusum_state.get(symbol, (0.0, 0.0))
effective_thresh_up = self._cusum_thresh_up
effective_thresh_down = self._cusum_thresh_down
# BTC 指数衰减因子(v5.0)
btc_str = ""
if symbol != self._market_ref:
btc_s_pos, btc_s_neg = self._cusum_state.get(self._market_ref, (0.0, 0.0))
btc_stress = max(btc_s_pos, btc_s_neg)
if btc_stress > self._btc_stress_thresh:
excess = btc_stress - self._btc_stress_thresh
stress_factor = max(
self._btc_stress_factor_min,
math.exp(-self._btc_decay_rate * excess),
)
effective_thresh_up *= stress_factor
effective_thresh_down *= stress_factor
btc_str = f" BTC压力→阈值×{stress_factor:.2f}"
# 跨层 BOCPD 修正(v5.0)
if self._cross_layer and ctx.bocpd_trend_prob is not None:
if ctx.bocpd_trend_prob > 0.5:
cusum_adj = 1.0 - 0.2 * (ctx.bocpd_trend_prob - 0.5)
effective_thresh_up *= cusum_adj
effective_thresh_down *= cusum_adj
# 量价确认
vol_confirmed, vol_ratio, vwpm_val = self._check_volume(symbol)
rs_vol = math.sqrt(max(0.0, self._rs_ema.get(symbol, 0.0)))
vol_str = f" 量比={vol_ratio:.1f}" if vol_ratio > 0 else ""
vwpm_str = f" VWPM={vwpm_val:.4f}" if vwpm_val != 0.0 else ""
if direction == 'short' and s_pos >= effective_thresh_up and vol_confirmed:
return False, (
f"Layer2-暴涨不做空: CUSUM+={s_pos:.2f}>={effective_thresh_up:.2f}"
f" RS_vol={rs_vol:.4f}{vol_str}{vwpm_str}{btc_str}"
)
if direction == 'long' and s_neg >= effective_thresh_down and vol_confirmed:
return False, (
f"Layer2-暴跌不做多: CUSUM-={s_neg:.2f}>={effective_thresh_down:.2f}"
f" RS_vol={rs_vol:.4f}{vol_str}{vwpm_str}{btc_str}"
)
return True, ""
def _check_volume(self, symbol: str) -> tuple[bool, float, float]:
"""VWPM + 量比联合确认(继承 v4.0)。"""
buf = self._buffers.get(symbol)
if buf is None or len(buf) < 2:
return True, 0.0, 0.0
current_vol = buf[-1][4]
vol_ema = self._vol_ema.get(symbol, 0.0)
if vol_ema > 0 and current_vol > 0:
vol_ratio = current_vol / vol_ema
else:
return True, 0.0, 0.0
vwpm_val = 0.0
w = min(self._vwpm_window, len(buf) - 1)
if w >= 1:
data = list(buf)[-(w + 1):]
weighted_sum = 0.0
total_vol = 0.0
for i in range(1, len(data)):
prev_c = data[i - 1][0]
curr_c = data[i][0]
v = data[i][4]
if prev_c > 0 and v > 0:
ret = (curr_c - prev_c) / prev_c
weighted_sum += ret * v
total_vol += v
if total_vol > 0:
vwpm_val = weighted_sum / total_vol
rs_vol = math.sqrt(max(0.0, self._rs_ema.get(symbol, 0.0)))
if rs_vol > 1e-10 and vwpm_val != 0.0:
vwpm_normalized = abs(vwpm_val) / rs_vol
confirmed = (vol_ratio >= self._vol_confirm_ratio
and vwpm_normalized >= self._vwpm_confirm_ratio)
else:
confirmed = vol_ratio >= self._vol_confirm_ratio
return confirmed, vol_ratio, vwpm_val
# ─────────────── 在线更新 ───────────────
def _online_update(self, symbol: str, buf: deque) -> None:
"""O(1) 在线更新所有状态。"""
curr = buf[-1]
prev = buf[-2]
close, high, low, open_, volume = curr
prev_close = prev[0]
if prev_close <= 0 or close <= 0:
return
# ── RS EMA ──
rs_val = self._calc_rs_single(high, low, open_, close)
alpha_rs = 2.0 / (self._rs_period + 1)
prev_rs = self._rs_ema.get(symbol, rs_val)
new_rs = alpha_rs * rs_val + (1.0 - alpha_rs) * prev_rs
self._rs_ema[symbol] = new_rs
# ── Per-symbol 基准波动率 ──
rs_vol_now = math.sqrt(max(0.0, new_rs))
if symbol not in self._baseline_median:
self._baseline_median[symbol] = _RollingMedian(maxlen=200)
self._baseline_median[symbol].push(rs_vol_now)
# ── CUSUM(自适应 drift, v5.0)──
rs_vol = math.sqrt(max(0.0, new_rs))
if rs_vol > 1e-10:
ret = (close - prev_close) / prev_close
z = ret / rs_vol
# 自适应 drift
drift = self._cusum_drift
if self._cusum_drift_adaptive:
baseline = self._baseline_median.get(symbol)
bl_val = baseline.value if (baseline and len(baseline) >= 5) else None
if bl_val and bl_val > 0:
vol_ratio = rs_vol / bl_val
drift = self._cusum_drift * max(0.5, min(2.0, vol_ratio))
s_pos, s_neg = self._cusum_state.get(symbol, (0.0, 0.0))
s_pos = max(0.0, s_pos + z - drift)
s_neg = max(0.0, s_neg - z - drift)
self._cusum_state[symbol] = (s_pos, s_neg)
# ── Volume EMA ──
if volume > 0:
alpha_v = 2.0 / (self._vol_ema_period + 1)
prev_v = self._vol_ema.get(symbol, volume)
self._vol_ema[symbol] = alpha_v * volume + (1.0 - alpha_v) * prev_v
# ── BOCPD 更新(v5.0)──
if self._bocpd_enabled and prev_close > 0:
log_ret = math.log(close / prev_close)
if symbol not in self._bocpd_state:
self._bocpd_state[symbol] = _BOCPD(
hazard_rate=self._bocpd_hazard, max_run=200,
)
self._bocpd_state[symbol].update(log_ret)
# ── 1h 聚合(v5.0 MTF)──
if self._mtf_enabled:
if symbol not in self._hour_accum:
self._hour_accum[symbol] = _HourBarAccum()
accum = self._hour_accum[symbol]
o = open_ if open_ > 0 else close
h = high if high > 0 else close
l = low if low > 0 else close
if accum.add(o, h, l, close, volume):
if symbol not in self._buffers_1h:
self._buffers_1h[symbol] = deque(maxlen=30)
self._buffers_1h[symbol].append(accum.as_tuple())
accum.reset()
@staticmethod
def _calc_rs_single(high: float, low: float, open_: float, close: float) -> float:
"""Rogers-Satchell (1991) 波动率估计器(单根 K 线)。"""
if high > 0 and low > 0 and open_ > 0 and high >= low:
try:
rs = (math.log(high / close) * math.log(high / open_)
+ math.log(low / close) * math.log(low / open_))
return max(0.0, rs)
except (ValueError, ZeroDivisionError):
pass
if open_ > 0:
try:
return math.log(close / open_) ** 2
except (ValueError, ZeroDivisionError):
pass
return 0.0
3.3 StrategyParams 新增字段(v5.0)
在 src/trading/config.py 的 StrategyParams 中新增(v4.0 的 24 个字段 + v5.0 新增 9 个,共 33 个,全部有默认值):
@dataclass(frozen=True)
class StrategyParams:
# ... 现有字段 ...
# ── 动量过滤器参数 ──
momentum_filter_enabled: bool = True
# Layer 0: BOCPD + DFA-2 后备
momentum_bocpd_enabled: bool = True # v5.0 新增
momentum_bocpd_hazard_rate: float = 0.01 # v5.0 新增
momentum_bocpd_trend_threshold: float = 0.75 # v5.0 新增
momentum_hurst_enabled: bool = True
momentum_hurst_lookback: int = 60
momentum_hurst_threshold: float = 0.60
momentum_hurst_r2_threshold: float = 0.85
# Layer 1: 持续趋势 + MTF
momentum_sustained_lookback: int = 30
momentum_sustained_base_threshold: float = 0.008
momentum_er_threshold_long: float = 0.60
momentum_er_threshold_short: float = 0.50
momentum_mtf_enabled: bool = True # v5.0 新增
momentum_mtf_er_threshold: float = 0.50 # v5.0 新增
momentum_mtf_lookback: int = 6 # v5.0 新增
# Layer 2: 急动检测
momentum_rs_period: int = 10
momentum_cusum_drift: float = 0.5
momentum_cusum_drift_adaptive: bool = True # v5.0 新增
momentum_cusum_threshold_spike_up: float = 3.5
momentum_cusum_threshold_spike_down: float = 2.5
momentum_volume_confirm_ratio: float = 1.5
momentum_volume_ema_period: int = 20
momentum_vwpm_window: int = 5
momentum_vwpm_confirm_ratio: float = 0.8
momentum_market_ref_symbol: str = "BTC"
momentum_btc_stress_threshold: float = 2.0
momentum_btc_stress_factor_min: float = 0.7
momentum_btc_decay_rate: float = 0.3 # v5.0 新增
# Layer 3: Spread 仲裁
momentum_spread_lookback: int = 20
momentum_spread_er_threshold: float = 0.45
momentum_spread_net_threshold: float = 1.5
momentum_spread_adf_pvalue: float = 0.05
momentum_spread_kpss_pvalue: float = 0.05
momentum_spread_use_pp: bool = True # v5.0 新增
momentum_spread_half_life_max: float = 36.0 # v5.0 新增
# 跨层
momentum_cross_layer_enabled: bool = True # v5.0 新增
momentum_cross_layer_factor: float = 0.4 # v5.0 新增
3.4 strategy.py 集成改动
与 v4.0 完全一致:SymbolBaseline 新增 z4h_history,process_tick 透传 OHLCV,_check_entry 四层检查。唯一变化是 check_spread 新增 max_hold_hours 参数:
# _check_entry 中 Layer 3 仲裁调用(v5.0 传入 max_hold_hours)
spread_trend, spread_reason = self._momentum_filter.check_spread(
z4h_hist, direction,
max_hold_hours=params.max_hold_hours,
)
3.5 改动范围(v5.0)
| 改动文件 | 改动内容 | 改动量 |
|---|---|---|
src/trading/momentum_filter.py |
重写:BOCPD + 自适应 CUSUM + MTF + PP + 半衰期 + 跨层 | ~550 行 |
src/trading/strategy.py |
check_spread 传入 max_hold_hours | ~2 行 |
src/trading/config.py |
StrategyParams 新增 9 个字段 | ~15 行 |
src/trading/orchestrator.py |
透传 OHLCV(继承 v4.0) | ~5 行 |
src/services/realtime_kline_service_base.py |
提取双腿 OHLCV(继承 v4.0) | ~20 行 |
4. 融入当前交易体系的方案
4.1 数据流全景(v5.0)
WebSocket K线推送(OHLCV + volume)
|
v
realtime_kline_service_base
|
+-- _trigger_strategy_if_ready()
|
+-- 提取 alt/base OHLCV + volume
v
TradingOrchestrator.process_analysis(
..., alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv
)
|
v
AdaptiveBollingerStrategy.process_tick(
..., alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv
)
|
+-- [新K线] bl.z4h_history.append(z4h)
|
+-- [新K线] MomentumFilter.update(alt/base, ohlcv, kline_time)
| → O(1) 在线更新:
| RS EMA / 自适应CUSUM / Volume EMA
| Per-symbol基准中位数 / BOCPD(v5.0) / 1h聚合(v5.0)
|
+-- _check_entry()
|
步骤1-4: 冷却期/突破/持仓/z4h
步骤4.5: 方向判断
步骤4.6: 四层动量过滤(跨层共享 _LayerContext)
├── Layer 0(硬): BOCPD P(trending) / DFA-2后备
│ 共享 ctx.bocpd_trend_prob → Layer 1/2
│
├── Layer 2(硬): 自适应CUSUM + BTC指数 + 跨层修正
│
├── Layer 1(软): ER + MTF确认 + 跨层修正
│
└── Layer 3(仲裁): PP/ADF+KPSS + OU半衰期
步骤5: 产生 EntrySignal
4.2 线程安全
与 v4.0 一致。MomentumFilter 不持有锁,由 AdaptiveBollingerStrategy 的 _lock 统一保护。新增的 _BOCPD 状态和 _HourBarAccum 均在锁保护范围内更新。
4.3 日志格式(v5.0)
动量过滤(硬) | BTC|ETH | alt:Layer0-BOCPD趋势机制: P(trending)=0.82>0.75
动量过滤(硬) | SOL|BTC | alt:Layer0-DFA2趋势机制: Hurst=0.67>0.60 R²=0.92(BOCPD未就绪,DFA-2后备)
动量过滤(硬) | SOL|BTC | alt:Layer2-暴涨不做空: CUSUM+=3.82>=3.15 RS_vol=0.0028 量比=2.3 VWPM=0.0012 BTC压力→阈值×0.85
动量过滤(软+Spread) | SOL|BTC | alt:Layer1-不追跌: 过去30根净跌幅=-1.23% ER=0.72>=0.48 [MTF-1h-ER=0.55确认] | 平稳但半衰期过长: PP_p=0.02 KPSS_p=0.12 HL=42.3h>36h→维持拦截
动量仲裁放行 | ETH|BTC | 单腿:alt:Layer1-不追涨: ... | PP+KPSS确认平稳(PP_p=0.01 KPSS_p=0.15 HL=8.2h)→推翻拦截
4.4 配置层集成(v5.0)
# Layer 0: BOCPD + DFA-2 后备
TRADING_MOMENTUM_BOCPD_ENABLED=true # v5.0 新增
TRADING_MOMENTUM_BOCPD_HAZARD_RATE=0.01 # v5.0 新增
TRADING_MOMENTUM_BOCPD_TREND_THRESHOLD=0.75 # v5.0 新增
TRADING_MOMENTUM_HURST_ENABLED=true
TRADING_MOMENTUM_HURST_LOOKBACK=60
TRADING_MOMENTUM_HURST_THRESHOLD=0.60
TRADING_MOMENTUM_HURST_R2_THRESHOLD=0.85
# Layer 1: 持续趋势 + MTF
TRADING_MOMENTUM_SUSTAINED_LOOKBACK=30
TRADING_MOMENTUM_SUSTAINED_BASE_THRESHOLD=0.008
TRADING_MOMENTUM_ER_THRESHOLD_LONG=0.60
TRADING_MOMENTUM_ER_THRESHOLD_SHORT=0.50
TRADING_MOMENTUM_MTF_ENABLED=true # v5.0 新增
TRADING_MOMENTUM_MTF_ER_THRESHOLD=0.50 # v5.0 新增
TRADING_MOMENTUM_MTF_LOOKBACK=6 # v5.0 新增
# Layer 2: 急动检测
TRADING_MOMENTUM_RS_PERIOD=10
TRADING_MOMENTUM_CUSUM_DRIFT=0.5
TRADING_MOMENTUM_CUSUM_DRIFT_ADAPTIVE=true # v5.0 新增
TRADING_MOMENTUM_CUSUM_THRESHOLD_SPIKE_UP=3.5
TRADING_MOMENTUM_CUSUM_THRESHOLD_SPIKE_DOWN=2.5
TRADING_MOMENTUM_VOLUME_CONFIRM_RATIO=1.5
TRADING_MOMENTUM_VWPM_WINDOW=5
TRADING_MOMENTUM_VWPM_CONFIRM_RATIO=0.8
TRADING_MOMENTUM_MARKET_REF_SYMBOL=BTC
TRADING_MOMENTUM_BTC_STRESS_THRESHOLD=2.0
TRADING_MOMENTUM_BTC_STRESS_FACTOR_MIN=0.7
TRADING_MOMENTUM_BTC_DECAY_RATE=0.3 # v5.0 新增
# Layer 3: Spread 仲裁
TRADING_MOMENTUM_SPREAD_LOOKBACK=20
TRADING_MOMENTUM_SPREAD_ER_THRESHOLD=0.45
TRADING_MOMENTUM_SPREAD_NET_THRESHOLD=1.5
TRADING_MOMENTUM_SPREAD_ADF_PVALUE=0.05
TRADING_MOMENTUM_SPREAD_KPSS_PVALUE=0.05
TRADING_MOMENTUM_SPREAD_USE_PP=true # v5.0 新增
TRADING_MOMENTUM_SPREAD_HALF_LIFE_MAX=36.0 # v5.0 新增
# 跨层
TRADING_MOMENTUM_CROSS_LAYER_ENABLED=true # v5.0 新增
TRADING_MOMENTUM_CROSS_LAYER_FACTOR=0.4 # v5.0 新增
5. 与当前交易风格的契合度分析
5.1 交易风格特征
| 维度 | 当前系统特征 |
|---|---|
| 策略类型 | 配对协整均值回归 |
| K 线周期 | 5 分钟 |
| 最大持仓时间 | 72 小时 |
| 入场逻辑 | adaptive_z 突破阈值(首次穿越) |
| 出场逻辑 | adaptive_z 回归至 entry_adaptive_z × reversion_factor |
5.2 v5.0 新增改进的契合度分析
BOCPD 概率化机制检测(最高契合):
均值回归策略的前提是"市场处于均值回归机制"。BOCPD 直接输出 P(均值回归机制失效) 的概率,是对该前提最直接的检验。相比 DFA-2 阈值判断,BOCPD 的概率输出让硬拦截决策更平滑、更可靠——P=0.76 和 P=0.99 的拦截有不同的置信度,而 DFA-2 的 Hurst=0.61 和 Hurst=0.90 触发相同动作。
OU 半衰期约束(最高契合):
对均值回归策略来说,"多快回归"比"是否平稳"更直接相关。半衰期 = 200h 的平稳序列通过 ADF+KPSS,但在 72h 持仓限制内无法获利。v5.0 将 half_life < max_hold_hours/2 作为 Layer 3 放行的额外约束,直接将过滤器与策略的时间框架绑定。
自适应 CUSUM drift(高契合):
加密市场波动率变化剧烈(BTC 日波动率从 1% 到 10%+)。固定 drift 在低波动期过于宽松(漏判)、高波动期过于敏感(误判)。自适应 drift 让 CUSUM 的 ARL 特性在不同波动率环境下保持稳定。
多时间框架确认(高契合):
5min 级别噪音导致 Layer 1 误触发。1h 确认后降低 ER 阈值,本质上是"多时间框架投票"——只有 5m+1h 都认为有趋势时,才更积极拦截。
跨层信息共享(中高契合):
弱趋势信号(BOCPD trend_prob=0.55)单独不足以硬拦截,但可以让 Layer 1/2 更敏感。这种"梯度式谨慎"比"全有或全无"的独立判断更精细。
Phillips-Perron(中契合):
加密市场波动率聚集显著,PP 比 ADF 更可靠。但在 spread_lookback=20 的小样本下,PP 和 ADF 的差异不大。主要价值在理论严谨性。
5.3 综合评分(v5.0 vs v4.0)
| 维度 | 评分 | vs v4.0 | 说明 |
|---|---|---|---|
| 算法前沿性 | ★★★★★ | +1★ | BOCPD (2007) + OU 半衰期 = state-of-the-art |
| 误杀控制 | ★★★★★ | +0.5★ | 概率化决策 + 半衰期约束减少误杀 |
| 自适应能力 | ★★★★★ | +0.5★ | 自适应 drift + 跨层动态调节 |
| 市场微观结构 | ★★★★★ | 持平 | BTC 指数衰减是微调,OFI 留待 Phase 2 |
| 统计严谨性 | ★★★★★ | +0.5★ | BOCPD 贝叶斯框架 + PP 异方差鲁棒 + 半衰期 |
| 多尺度覆盖 | ★★★★★ | +1★ | MTF 确认填补 v4.0 单尺度缺陷 |
| 层间协同 | ★★★★★ | +1★ | 跨层共享 vs v4.0 独立判断 |
| 运行效率 | ★★★★☆ | 持平 | BOCPD O(200) per bar << DFA-2 O(N²) |
| 代码侵入性 | ★★★★★ | 持平 | 独立模块,可逐项关闭 |
6. 与四项开单约束的对应关系
6.1 约束覆盖矩阵(v5.0)
| 约束 | 过滤层 | 触发指标 | 被阻止方向 | 检查维度 | 可仲裁 | v5.0 变化 |
|---|---|---|---|---|---|---|
| 0. 趋势机制 | Layer 0 | BOCPD P(trending)>T 或 Hurst(DFA-2)>T | 双向 | alt+base | 否 | BOCPD 概率化 |
| 1. 不追跌 | Layer 1 | ER≥T_short AND net_ret≤-T_adp | short | alt+base+spread | 是 | MTF+跨层修正 |
| 2. 不追涨 | Layer 1 | ER≥T_long AND net_ret≥+T_adp | long | alt+base+spread | 是 | MTF+跨层修正 |
| 3. 暴涨不做空 | Layer 2 | CUSUM+≥T_up×SF AND VWPM确认 | short | alt+base+BTC | 否 | 自适应drift+BTC指数+跨层 |
| 4. 暴跌不做多 | Layer 2 | CUSUM-≥T_down×SF AND VWPM确认 | long | alt+base+BTC | 否 | 自适应drift+BTC指数+跨层 |
四项约束完全覆盖。v5.0 增强:Layer 0 概率化替代阈值判断,Layer 2 自适应 drift 提升跨波动率环境稳定性,Layer 3 半衰期约束确保放行信号在策略时间框架内可获利。
7. 参数配置指南
7.1 v5.0 新增参数
Layer 0:BOCPD
| 参数 | 默认值 | 范围 | 说明 |
|---|---|---|---|
bocpd_hazard_rate |
0.01 | 0.005-0.05 | 变点先验概率(1/期望机制长度) |
bocpd_trend_threshold |
0.75 | 0.60-0.90 | P(trending) > 此值 → 硬拦截 |
bocpd_hazard_rate 调优:
0.005:期望机制长度 200 根 = 16.7h → 对机制切换不敏感
0.01:期望机制长度 100 根 = 8.3h → 标准值
0.02:期望机制长度 50 根 = 4.2h → 对快速机制切换敏感
0.05:期望机制长度 20 根 = 1.7h → 过于敏感,噪音大
bocpd_trend_threshold 调优:
0.60:激进,轻微趋势信号即拦截(误杀多)
0.75:标准值,需要较明确的趋势后验才拦截
0.90:保守,只有非常高置信度才拦截(可能漏判)
Layer 1:多时间框架
| 参数 | 默认值 | 范围 | 说明 |
|---|---|---|---|
mtf_er_threshold |
0.50 | 0.35-0.65 | 1h ER ≥ 此值时降低 5m ER 阈值 |
mtf_lookback |
6 | 4-12 | 1h 回望根数(6根=6小时) |
Layer 2:自适应 drift + BTC 指数
| 参数 | 默认值 | 范围 | 说明 |
|---|---|---|---|
cusum_drift_adaptive |
true | — | 自适应 drift 开关 |
btc_decay_rate |
0.3 | 0.1-0.5 | BTC 指数衰减率 λ |
btc_decay_rate 调优:
0.1:温和衰减,BTC 影响范围大但幅度小
0.3:标准值,excess=1 时 factor=0.74
0.5:急剧衰减,只有 BTC 极端急动时才显著影响 alt 阈值
Layer 3:PP + 半衰期
| 参数 | 默认值 | 范围 | 说明 |
|---|---|---|---|
spread_use_pp |
true | — | Phillips-Perron 检验开关(需 arch 库) |
spread_half_life_max |
36.0 | 12.0-48.0 | 最大允许半衰期(小时) |
spread_half_life_max 调优:
12h:严格,只放行快速回归的 spread → 误杀增加但安全性高
36h:标准值(= max_hold_hours/2),在持仓期内有足够回归时间
48h:宽松,允许较慢的回归 → 漏判增加
建议:设为 max_hold_hours / 2
跨层
| 参数 | 默认值 | 范围 | 说明 |
|---|---|---|---|
cross_layer_factor |
0.4 | 0.2-0.6 | 跨层修正强度 |
cross_layer_factor 调优:
0.2:微弱跨层影响(各层近乎独立)
0.4:标准值,trend_prob=0.75 时 ER 阈值降 10%
0.6:强跨层影响,trend_prob=0.75 时 ER 阈值降 15%
7.2 继承 v4.0 参数
Layer 0(DFA-2 后备)、Layer 1(ER/RS)、Layer 2(CUSUM/VWPM/BTC 基础)、Layer 3(ADF/KPSS/ER)的参数调优指南与 v4.0 完全一致,此处不重复。
7.3 回测验证建议
对比六组回测:
A 组:无动量过滤(基线)
B 组:v3.0(RS + DFA-1 + ADF+ER)
C 组:v4.0(RS + DFA-2+R² + VWPM + BTC + ADF+KPSS+ER)
D 组:v5.0 完整版
E 组:v5.0 关闭各子功能(评估各改进项贡献)
E1: 关闭 BOCPD(仅用 DFA-2 后备)
E2: 关闭自适应 drift(固定 drift=0.5)
E3: 关闭 MTF 确认
E4: 关闭半衰期约束
E5: 关闭跨层共享
E6: 关闭 PP(仅用 ADF)
F 组:v5.0 + Phase 2 OFI(L2 book 集成后)
核心指标:
- Sharpe Ratio(风险调整收益)
- 最大连续亏损次数
- 信号过滤率 / 误杀率
- Layer 3 仲裁放行率 / 半衰期拒绝率
- Layer 0 BOCPD vs DFA-2 激活比例
- 分波动率区间表现(高/中/低波动率环境)
预期 v5.0 vs v4.0 增量收益:
BOCPD:减少 Layer 0 误拦截 20-30%(概率化 vs 阈值化)
自适应 drift:高波动期 Layer 2 误报减少 15-25%
MTF 确认:Layer 1 误触发减少 10-20%(1h 不确认的短期波动)
半衰期约束:过滤掉 10-15% 的"假放行"(平稳但回归太慢)
跨层共享:整体拦截精度提升 5-10%(弱趋势信号的梯度传递)
PP:在波动率聚集期 Layer 3 准确率提升 5-10%
8. 风险与局限性
8.1 已知局限
| 局限 | 说明 | 缓解措施 |
|---|---|---|
| BOCPD hazard_rate 敏感性 | 先验变点概率影响机制长度估计 | 默认 0.01 在多种市况验证;可通过回测优化 |
| BOCPD 预热期 | 需 20 根 K 线(100 分钟)才就绪 | DFA-2 自动后备;启动时 DB 回填 |
| BOCPD 趋势概率的 sigmoid 映射 | 将后验 z 值映射为 P(trending),sigmoid 中心点 2.0 凭经验 | 经验表明 z>2 对应 ~95% 置信;可通过回测校准 |
| 半衰期估计对小样本敏感 | spread_lookback=20 时 OLS 仅 19 个观测值 | 小样本时半衰期约束自动跳过(返回 None) |
| PP 需 arch 库 | arch 不一定已安装 |
自动降级为 ADF;arch 是 numpy 的常见扩展 |
| MTF 1h 聚合延迟 | 1h bar 需要 12 根 5m bar 才完成 | 前 12 根无 MTF 数据,不影响决策(MTF 只是阈值修正) |
| 自适应 drift 可能过度补偿 | 极端波动率时 drift 变化大 | clamp(0.5, 2.0) 限制缩放范围 |
| 跨层因子的最优值未知 | cross_layer_factor=0.4 凭经验 | 影响温和(最多 10% 阈值调整),回测优化 |
| 参数总数增加 | v5.0 共 33 个参数(v4.0 为 24 个) | 9 个新参数全有默认值;可考虑未来引入元参数降维 |
8.2 不适合的场景
- 极低流动性资产:RS 波动率失真,BOCPD 因稀疏数据后验不收敛
- HIP-3 稀疏资产:volume 不可靠时 VWPM 无效,OFI 也无法使用
- 极短持仓策略:半衰期约束对 max_hold_hours < 4h 的策略过于宽松
8.3 后续迭代方向(v5.0 之后)
| 优先级 | 方向 | 说明 |
|---|---|---|
| P1 | Phase 2: OFI 集成 | L2 book 数据管道 → MomentumFilter,替代 VWPM |
| P1 | BOCPD 参数自适应 | hazard_rate 根据近期变点频率动态调整 |
| P2 | 元参数降维 | 引入 risk_attitude ∈ [0,1] 驱动所有子参数联合缩放 |
| P2 | Transfer Entropy BTC 因子 | 信息论框架度量 BTC→alt 因果信息流,替代 CUSUM stress |
| P2 | Kalman Filter 动态对冲比率 | 追踪时变协整向量,Kalman gain 异常时 Layer 0 触发 |
| P3 | BOCPD sigmoid 校准 | 用历史数据校准 sigmoid 中心点和斜率 |
| P3 | Copula 尾部依赖 | 建模 BTC-alt 非线性尾部依赖,替代线性 stress factor |
| P3 | Shiryaev-Roberts 对照 | SR 程序与 CUSUM 的 minimax 最优性对比验证 |
| P4 | HMM 离线训练 | 用历史数据训练两状态 HMM,作为 BOCPD 的交叉验证 |
附录 A:算法学术参考
| 算法 | 原始论文 | 核心贡献 |
|---|---|---|
| BOCPD | Adams, R.P. & MacKay, D.J.C. (2007) arXiv:0710.3742 | 贝叶斯在线变点检测,NIG 共轭先验,概率化机制推断 |
| DFA / DFA-2 | Peng et al. (1994); Kantelhardt et al. (2002) | 去趋势波动分析,Hurst 指数估计 |
| DFA in Finance | Mantegna & Stanley (1995) Nature | DFA 引入金融时间序列 |
| Kaufman ER | Kaufman, P. (1995) Smarter Trading | 方向距离/路径长度衡量趋势效率 |
| CUSUM | Page, E.S. (1954) Biometrika | 变化检测最优在线算法 |
| CUSUM 最优性 | Lorden, G. (1971) Ann. Math. Statist. | CUSUM 一阶渐近最优 |
| CUSUM 自适应 drift | Hawkins & Olwell (1998) Cumulative Sum Charts | CUSUM ARL 依赖 drift 与信号比,建议自适应调整 |
| Rogers-Satchell | Rogers, L. & Satchell, S. (1991) Ann. Appl. Prob. | 漂移不变 OHLC 波动率 |
| Garman-Klass | Garman, M. & Klass, M. (1980) J. Business | OHLC 波动率(零漂移假设) |
| Kyle 模型 | Kyle, A.S. (1985) Econometrica | 价格+成交量联合信号 |
| OFI | Cont, R., Kukanov, A. & Stoikov, S. (2014) J. Financial Econometrics | Order Flow Imbalance,盘口微观结构量价指标 |
| ADF 检验 | Dickey, D.A. & Fuller, W.A. (1979) JASA | 单位根检验 |
| KPSS 检验 | Kwiatkowski et al. (1992) J. Econometrics | 平稳性检验(ADF 对偶) |
| Phillips-Perron | Phillips, P.C.B. & Perron, P. (1988) Biometrika | 非参数异方差鲁棒单位根检验 |
| 时间序列动量 | Moskowitz, Ooi & Pedersen (2012) J. Financial Economics | 资产间动量溢出效应 |
| OU 过程半衰期 | Vidyamurthy, G. (2004) Pairs Trading | 均值回归半衰期估计,配对交易核心参数 |
| VPIN | Easley, Lopez de Prado & O'Hara (2012) Rev. Financial Studies | 知情交易概率的成交量同步估计 |
| Bayesian Changepoint | Adams & MacKay (2007) arXiv | 贝叶斯在线变点检测 |
| Shiryaev-Roberts | Pollak & Tartakovsky (2008) Ann. Statist. | CUSUM 替代方案 |
| Transfer Entropy | Schreiber, T. (2000) Physical Review Letters | 信息论因果信息流度量 |
| HMM 机制切换 | Hamilton, J.D. (1989) Econometrica | 马尔可夫机制切换模型 |
| 配对交易实证 | Gatev, E. et al. (2006) Rev. Financial Studies | 配对交易大规模实证验证 |
附录 B:v4.0 → v5.0 改进对照
| 维度 | v4.0 | v5.0 | 改进原因 |
|---|---|---|---|
| 机制检测 | DFA-2 点估计 + R² 质检 | BOCPD 概率化 + DFA-2 后备 | 概率输出量化不确定性,短序列更可靠(Adams & MacKay 2007) |
| 机制检测阈值 | Hurst > 0.60(二值) | P(trending) > 0.75(连续) | 连续概率让决策更精细 |
| CUSUM drift | 固定 0.5 | 自适应 ∝ RS vol / baseline | 跨波动率环境 ARL 稳定性(Hawkins & Olwell 1998) |
| BTC 因子衰减 | 线性 1.0 - 0.1×excess | 指数 exp(-0.3×excess) | 指数衰减更平滑,有物理依据 |
| Spread 单位根检验 | ADF(异方差敏感) | PP 优先 + ADF 后备 | PP 对波动率聚集鲁棒(Phillips & Perron 1988) |
| Spread 放行条件 | 平稳即放行 | 平稳 + 半衰期 < max_hold/2 | 半衰期直接关联策略持仓期(Vidyamurthy 2004) |
| 时间框架 | 仅 5min | 5min + 1h MTF 确认 | 多尺度验证减少噪音误触发 |
| 层间关系 | 完全独立 | 跨层信息共享 via _LayerContext | 弱信号梯度传递,提升整体拦截精度 |
| 量价确认 | VWPM | VWPM(Phase 1)+ OFI 路径(Phase 2) | OFI 从盘口微观结构直接度量买卖压力 |
| 新增参数 | — | 9 个 | BOCPD×3, MTF×3, drift×1, BTC×1, PP×1, HL×1, 跨层×2(-hl_max 与 pp 共用 Layer 3) |
| 总参数数 | 24 | 33 | 全部有默认值,可逐项关闭 |
文档结束