开仓动量过滤器设计方案6精简版
开仓动量过滤器设计方案
版本:v5.1 | 日期:2026-03-06
适用:Hyperliquid 量化交易系统(Adaptive Bollinger Z-Score 配对策略)
1. 问题定义
1.1 四项约束
约束 1:连续下跌 -> 不追跌(不做空)
约束 2:连续上涨 -> 不追涨(不做多)
约束 3:迅速暴涨 -> 不做空
约束 4:迅速暴跌 -> 不做多
1.2 过滤维度
| 维度 | 检查对象 | 原因 |
|---|---|---|
| Alt 腿 | alt symbol 价格动量 | 直接持仓标的风险 |
| Base 腿 | base symbol 价格动量 | 配对反向腿同样承担动量风险 |
| Spread | 两腿价差趋势性 | 两腿同涨但 spread 稳定时不应误杀 |
2. 四层过滤架构
执行顺序按拦截严格性排序(硬拦截优先):
输入信号(direction, alt_symbol, base_symbol)
|
+-->> [第1步] Layer 0(硬拦截): BOCPD 机制检测 + DFA-2 后备
| P(trending) > 0.75 -> 拦截
| 共享 ctx.bocpd_trend_prob -> Layer 2/1 阈值调节
|
+-->> [第2步] Layer 2(硬拦截): Huber-CUSUM + BTC 滞后因果因子
| 任一腿急动触发 -> 拦截
|
+-->> [第3步] Layer 1(软拦截): ER + RS 净位移 + MTF 连续加权
| 任一腿持续趋势 -> 软拦截 -> 交由 Layer 3 仲裁
|
+-->> [第4步] Layer 3(仲裁器): PP/ADF + KPSS + OU 半衰期
Spread 平稳 + 半衰期合理 -> 推翻拦截;否则维持
|
v
最终决策:允许 / 拒绝入场
| 层级 | 执行顺序 | 类型 | 可仲裁 | 触发含义 |
|---|---|---|---|---|
| Layer 0 | 第1步 | 硬拦截 | 否 | 均值回归假设失效 |
| Layer 2 | 第2步 | 硬拦截 | 否 | 急动执行风险极高 |
| Layer 1 | 第3步 | 软拦截 | 是 | 单腿持续趋势,可能误杀 |
| Layer 3 | 第4步 | 仲裁 | -- | Spread 平稳+半衰期合理则放行 |
2.1 Layer 0:BOCPD 机制检测 + DFA-2 后备
BOCPD (Adams & MacKay 2007) 使用 NIG 共轭先验在线检测机制切换,输出 P(trending)。
核心算法:
输入:对数收益率 {r_t},hazard_rate H = 0.01
先验:NIG(mu0=0, kappa0=1, alpha0=3, beta0=0.01)
每时刻 t,对每个运行长度 r = 0..R(R=200):
1. 预测:pi(r_t|r) = Student-t_{2*alpha_r}(mu_r, beta_r*(kappa_r+1)/(alpha_r*kappa_r))
2. 增长:P(r_t=r+1) = P(r_{t-1}=r) * pi * (1-H)
3. 变点:P(r_t=0) = Sum_r P(r_{t-1}=r) * pi * H
4. 更新 NIG:kappa'=kappa+1, mu'=(kappa*mu+r_t)/kappa',
alpha'=alpha+0.5, beta'=beta+kappa*(r_t-mu)^2/(2*kappa')
5. 归一化 + 截断至 R
趋势概率(贝叶斯后验边缘化):
P(trending) = Sum_r P(r_t=r) * P(trending|r)
P(trending|r) = P(|mu_r| > drift_threshold | NIG posterior_r)
mu_r | data ~ Student-t_{2*alpha_r}(mu_hat, beta_r/(alpha_r*kappa_r))
drift_threshold = 0.0002(5min 收益率 0.02%,有经济意义的最小漂移)
决策逻辑:
if BOCPD 就绪(>= 20 根更新):
if trend_prob > 0.75 -> 硬拦截
else -> 放行,trend_prob 共享给 Layer 2/1
else:
降级为 DFA-2 + R-squared 质检(Hurst > 0.60 且 R2 > 0.85 -> 硬拦截)
2.2 Layer 2:Huber-CUSUM 急动检测
ret(i) = (close(i) - close(i-1)) / close(i-1)
# Huber 鲁棒化(减少闪崩/插针单根误触发)
z_raw = ret / rs_vol
z = sign(z_raw) * min(|z_raw|, 5.0)
# 自适应 drift(跟随波动率环境)
adaptive_drift = 0.5 * clamp(rs_vol / baseline_vol, 0.5, 2.0)
S_pos(i) = max(0, S_pos(i-1) + z - adaptive_drift)
S_neg(i) = max(0, S_neg(i-1) - z - adaptive_drift)
# BTC 滞后因果因子(lag=3 根 K 线 = 15 分钟)
if symbol != BTC:
btc_stress = max(btc_stress_history[-3:])
if btc_stress > 2.0:
stress_factor = max(0.7, exp(-0.3 * (btc_stress - 2.0)))
effective_thresh *= stress_factor
# 跨层 BOCPD 修正
if trend_prob > 0.5:
effective_thresh *= 1.0 - 0.2 * (trend_prob - 0.5)
# VWPM + 量比联合确认
volume_confirmed = volume_ratio >= 1.5 AND |VWPM|/rs_vol >= 0.8
# 非对称触发
约束3:direction=='short' AND S_pos >= thresh_up AND volume_confirmed
约束4:direction=='long' AND S_neg >= thresh_down AND volume_confirmed
RS 波动率使用 Rogers-Satchell (1991) 漂移不变 OHLC 估计器。
2.3 Layer 1:ER + RS 净位移 + MTF 连续加权
N = 30(5min K 线)
# Kaufman Efficiency Ratio
ER = |close[-1] - close[-N]| / Sum|close[i] - close[i-1]|
# 非对称 ER 阈值
er_thresh = 0.60(做多)或 0.50(做空)
# MTF 连续加权(1h 级别 ER 修正 5m 阈值)
mtf_er = 1h 级别 ER(6 根 1h bar)
mtf_weight = clamp((mtf_er - 0.30) / (0.70 - 0.30), 0, 1)
er_thresh *= 1.0 - 0.20 * mtf_weight
# 跨层 BOCPD 修正
if trend_prob > 0.5:
er_thresh *= 1.0 - 0.4 * (trend_prob - 0.5)
# RS 自适应净位移阈值
adaptive_thresh = 0.008 * clamp(rs_vol / baseline, 0.3, 3.0)
# 联合触发
约束1:direction=='short' AND net_return <= -adaptive_thresh AND ER >= er_thresh
约束2:direction=='long' AND net_return >= +adaptive_thresh AND ER >= er_thresh
2.4 Layer 3:Spread 仲裁(PP + KPSS + OU 半衰期)
仅在 Layer 1 软拦截时调用。判断 spread 是否平稳且回归速度足够快。
series = z4h_history 最近 41 个值(spread_lookback=40)
# 第一关:ER 快速筛选
if spread_er < 0.45 -> 无趋势,放行
# 第二关:PP/ADF + KPSS 平稳性检验
PP 优先(异方差鲁棒,需 arch 库),ADF 后备
四象限决策矩阵:
ur_stationary + kpss_stationary -> 确认平稳 -> 第三关
双非平稳 -> 确认非平稳 -> 维持拦截
矛盾/不确定 -> 保守维持拦截
# 第三关:OU 半衰期约束(含 Hurwicz 偏差校正)
theta_ols = OLS 回归 DeltaX on X_{t-1} 的斜率
theta_corrected = theta_ols * n / (n - 2.5) # Hurwicz 校正
half_life = -ln(2) / ln(1 + theta_corrected) # 小时
if half_life > min(36.0, max_hold_hours * 0.5):
-> 回归太慢 -> 维持拦截
else:
-> 平稳 + 半衰期合理 -> 推翻拦截
2.5 跨层信息共享
@dataclass
_LayerContext:
bocpd_trend_prob: float | None # Layer 0 -> Layer 2, 1
信息流:Layer 0 -> ctx.bocpd_trend_prob -> Layer 2 调节 CUSUM 阈值
-> Layer 1 调节 ER 阈值
设计原则:
- 修正因子最多 10-15% 阈值调整,不是决定性开关
- 各层可独立关闭/测试
- ctx 仅单次 check() 内有效
3. 完整实现
3.1 改动文件
| 文件 | 改动 | 行数 |
|---|---|---|
src/trading/momentum_filter.py |
重写 | ~600 行 |
src/trading/config.py |
StrategyParams 新增字段 | ~20 行 |
src/trading/strategy.py |
_check_entry 注入过滤器 | ~2 行 |
src/trading/orchestrator.py |
透传 OHLCV | ~5 行 |
src/services/realtime_kline_service_base.py |
提取双腿 OHLCV | ~20 行 |
3.2 momentum_filter.py 完整代码
# src/trading/momentum_filter.py
"""
开仓动量过滤器 v5.1
四层过滤 + 跨层共享:
Layer 0: BOCPD 机制检测 + DFA-2 后备 -> 硬拦截
Layer 2: Huber-CUSUM + BTC 滞后因果因子 -> 硬拦截
Layer 1: ER + RS 净位移 + MTF 连续加权 -> 软拦截
Layer 3: PP/ADF + KPSS + OU 半衰期(Hurwicz) -> 仲裁
"""
import math
import warnings
from collections import deque
from dataclasses import dataclass
from datetime import datetime
try:
from statsmodels.tsa.stattools import adfuller as _adfuller
_HAS_ADF = True
except ImportError:
_HAS_ADF = False
try:
from statsmodels.tsa.stattools import kpss as _kpss
_HAS_KPSS = True
except ImportError:
_HAS_KPSS = False
try:
from arch.unitroot import PhillipsPerron as _PhillipsPerron
_HAS_PP = True
except ImportError:
_HAS_PP = False
# ── 辅助函数 ──
def _logsumexp(vals: list[float]) -> float:
if not vals:
return float('-inf')
m = max(vals)
if m == float('-inf'):
return float('-inf')
return m + math.log(sum(math.exp(v - m) for v in vals))
def _student_t_logpdf(x: float, mu: float, scale_sq: float, df: float) -> float:
if scale_sq <= 1e-30 or df <= 0:
return -50.0
z_sq = (x - mu) ** 2 / (df * scale_sq)
return (math.lgamma((df + 1) / 2)
- math.lgamma(df / 2)
- 0.5 * math.log(df * math.pi * scale_sq)
- (df + 1) / 2 * math.log(1 + z_sq))
def _student_t_cdf(x: float, df: float) -> float:
"""Student-t CDF(连分数近似,df >= 2 精度优于 0.001)。"""
if df <= 0:
return 0.5
t = x
y = t * t
p = y / (df + y)
if df >= 30:
z = t * (1 - 1 / (4 * df)) / math.sqrt(1 + y / (2 * df))
a1, a2, a3 = 0.4361836, -0.1201676, 0.9372980
az = abs(z)
if az > 8:
return 1.0 if z > 0 else 0.0
tt = 1.0 / (1.0 + 0.33267 * az)
phi = 0.5 * math.exp(-az * az / 2) * tt * (a1 + tt * (a2 + tt * a3))
return 1.0 - phi if z > 0 else phi
a = df / 2
b = 0.5
p = max(0.0, min(1.0, p))
if p < 1e-15:
ibeta = 0.0
elif p > 1 - 1e-15:
ibeta = 1.0
else:
log_prefix = (math.lgamma(a + b) - math.lgamma(a) - math.lgamma(b)
+ a * math.log(p) + b * math.log(1 - p))
prefix = math.exp(log_prefix)
f = 1.0
c = 1.0
d = 1.0 - (a + b) * p / (a + 1)
if abs(d) < 1e-30:
d = 1e-30
d = 1.0 / d
f = d
for m in range(1, 100):
num = m * (b - m) * p / ((a + 2 * m - 1) * (a + 2 * m))
d = 1.0 + num * d
if abs(d) < 1e-30: d = 1e-30
c = 1.0 + num / c
if abs(c) < 1e-30: c = 1e-30
d = 1.0 / d
f *= d * c
num = -(a + m) * (a + b + m) * p / ((a + 2 * m) * (a + 2 * m + 1))
d = 1.0 + num * d
if abs(d) < 1e-30: d = 1e-30
c = 1.0 + num / c
if abs(c) < 1e-30: c = 1e-30
d = 1.0 / d
delta = d * c
f *= delta
if abs(delta - 1.0) < 1e-10:
break
ibeta = prefix * f / a
half_ibeta = 0.5 * ibeta
return 1.0 - half_ibeta if t >= 0 else half_ibeta
def _estimate_half_life(series: list[float], bar_minutes: float = 5.0,
hurwicz_correction: bool = True) -> float | None:
"""OU 半衰期估计(OLS + Hurwicz 偏差校正)。"""
n = len(series)
if n < 10:
return None
y = [series[i] - series[i - 1] for i in range(1, n)]
x = series[:-1]
n_obs = len(y)
x_mean = sum(x) / n_obs
y_mean = sum(y) / n_obs
cov_xy = sum((x[i] - x_mean) * (y[i] - y_mean) for i in range(n_obs))
var_x = sum((x[i] - x_mean) ** 2 for i in range(n_obs))
if var_x < 1e-12:
return None
theta = cov_xy / var_x
if hurwicz_correction and n_obs > 3:
theta = theta * n_obs / (n_obs - 2.5)
if theta >= 0 or theta <= -1:
return None
half_life_bars = -math.log(2) / math.log(1 + theta)
half_life_hours = half_life_bars * bar_minutes / 60.0
return half_life_hours if half_life_hours > 0 else None
# ── BOCPD ──
class _BOCPD:
"""Bayesian Online Changepoint Detection (Adams & MacKay 2007)。
NIG 共轭先验,Student-t 预测,log-space,O(R) per update。"""
__slots__ = ('_log_h', '_log_1mh', '_mu0', '_kappa0', '_alpha0', '_beta0',
'_max_run', '_log_probs', '_suff', '_n_updates',
'_drift_threshold')
def __init__(self, hazard_rate: float = 0.01, mu0: float = 0.0,
kappa0: float = 1.0, alpha0: float = 3.0,
beta0: float = 0.01, max_run: int = 200,
drift_threshold: float = 0.0002):
self._log_h = math.log(max(1e-10, hazard_rate))
self._log_1mh = math.log(max(1e-10, 1.0 - hazard_rate))
self._mu0, self._kappa0 = mu0, kappa0
self._alpha0, self._beta0 = alpha0, beta0
self._max_run = max_run
self._drift_threshold = drift_threshold
self._log_probs: list[float] = [0.0]
self._suff: list[tuple[float, float, float, float]] = [
(mu0, kappa0, alpha0, beta0)
]
self._n_updates = 0
def update(self, x: float) -> None:
n = len(self._log_probs)
log_preds = []
for i in range(n):
mu, kappa, alpha, beta = self._suff[i]
if kappa > 0 and alpha > 0 and beta > 0:
scale_sq = beta * (kappa + 1) / (alpha * kappa)
log_preds.append(_student_t_logpdf(x, mu, scale_sq, 2 * alpha))
else:
log_preds.append(-50.0)
log_growth = [
self._log_probs[i] + log_preds[i] + self._log_1mh for i in range(n)
]
log_cp = _logsumexp([
self._log_probs[i] + log_preds[i] + self._log_h for i in range(n)
])
new_log = [log_cp] + log_growth
log_total = _logsumexp(new_log)
new_log = [lp - log_total for lp in new_log]
new_suff: list[tuple[float, float, float, float]] = [
(self._mu0, self._kappa0, self._alpha0, self._beta0)
]
for i in range(n):
mu, kappa, alpha, beta = self._suff[i]
kappa_n = kappa + 1
mu_n = (kappa * mu + x) / kappa_n
alpha_n = alpha + 0.5
beta_n = beta + kappa * (x - mu) ** 2 / (2 * kappa_n)
new_suff.append((mu_n, kappa_n, alpha_n, beta_n))
if len(new_log) > self._max_run:
new_log = new_log[:self._max_run]
new_suff = new_suff[:self._max_run]
log_total = _logsumexp(new_log)
new_log = [lp - log_total for lp in new_log]
self._log_probs = new_log
self._suff = new_suff
self._n_updates += 1
@property
def trend_probability(self) -> float:
"""P(trending) = Sum_r P(r) * P(|mu_r| > delta | NIG posterior_r)。"""
delta = self._drift_threshold
total = 0.0
for i in range(len(self._log_probs)):
prob = math.exp(self._log_probs[i])
if prob < 1e-8:
continue
mu, kappa, alpha, beta = self._suff[i]
if alpha > 0.5 and beta > 1e-30 and kappa > 0:
df = 2 * alpha
scale = math.sqrt(beta / (alpha * kappa))
if scale < 1e-15:
p_trend = 1.0 if abs(mu) > delta else 0.0
else:
t_upper = (delta - mu) / scale
t_lower = (-delta - mu) / scale
p_trend = (1.0 - _student_t_cdf(t_upper, df)
+ _student_t_cdf(t_lower, df))
else:
p_trend = 0.5
total += prob * p_trend
return total
@property
def ready(self) -> bool:
return self._n_updates >= 20
# ── DFA-2 后备 ──
def _hurst_dfa2(closes: list[float], r2_threshold: float = 0.85) -> tuple[float | None, float | None]:
"""DFA-2 Hurst 指数估计。返回 (hurst, r_squared) 或 (None, None)。"""
n = len(closes)
if n < 20:
return None, None
rets = []
for i in range(1, n):
if closes[i] > 0 and closes[i - 1] > 0:
rets.append(math.log(closes[i] / closes[i - 1]))
nr = len(rets)
if nr < 16:
return None, None
mean_r = sum(rets) / nr
profile = []
cum = 0.0
for r in rets:
cum += r - mean_r
profile.append(cum)
scales: list[float] = []
rms_vals: list[float] = []
s = 4
max_s = nr // 4
while s <= max_s:
n_win = nr // s
if n_win < 2:
break
rss_sum = 0.0
rss_cnt = 0
for i in range(n_win):
seg = profile[i * s: (i + 1) * s]
ws = len(seg)
if ws < 3:
if ws >= 2:
x_m = (ws - 1) * 0.5
y_m = sum(seg) / ws
cov = sum((j - x_m) * (seg[j] - y_m) for j in range(ws))
var_x = sum((j - x_m) ** 2 for j in range(ws))
slope = cov / var_x if var_x > 1e-12 else 0.0
intercept = y_m - slope * x_m
rss = sum((seg[j] - (intercept + slope * j)) ** 2 for j in range(ws)) / ws
rss_sum += rss
rss_cnt += 1
continue
s0 = float(ws)
s1 = ws * (ws - 1) / 2.0
s2 = ws * (ws - 1) * (2 * ws - 1) / 6.0
s3 = (ws * (ws - 1) / 2.0) ** 2
s4 = ws * (ws - 1) * (2 * ws - 1) * (3 * ws * ws - 3 * ws - 1) / 30.0
sy0 = sum(seg)
sy1 = sum(j * seg[j] for j in range(ws))
sy2 = sum(j * j * seg[j] for j in range(ws))
det = (s0 * (s2 * s4 - s3 * s3)
- s1 * (s1 * s4 - s3 * s2)
+ s2 * (s1 * s3 - s2 * s2))
if abs(det) < 1e-12:
x_m = (ws - 1) * 0.5
y_m = sum(seg) / ws
cov = sum((j - x_m) * (seg[j] - y_m) for j in range(ws))
var_x = sum((j - x_m) ** 2 for j in range(ws))
slope = cov / var_x if var_x > 1e-12 else 0.0
intercept = y_m - slope * x_m
rss = sum((seg[j] - (intercept + slope * j)) ** 2 for j in range(ws)) / ws
rss_sum += rss
rss_cnt += 1
continue
a = ((sy0 * (s2 * s4 - s3 * s3) - s1 * (sy1 * s4 - s3 * sy2)
+ s2 * (sy1 * s3 - s2 * sy2)) / det)
b = ((s0 * (sy1 * s4 - s3 * sy2) - sy0 * (s1 * s4 - s3 * s2)
+ s2 * (s1 * sy2 - sy1 * s2)) / det)
c = ((s0 * (s2 * sy2 - sy1 * s3) - s1 * (s1 * sy2 - sy1 * s2)
+ sy0 * (s1 * s3 - s2 * s2)) / det)
rss = sum((seg[j] - (a + b * j + c * j * j)) ** 2 for j in range(ws)) / ws
rss_sum += rss
rss_cnt += 1
if rss_cnt >= 2:
f_s = math.sqrt(rss_sum / rss_cnt)
if f_s > 1e-14:
scales.append(math.log(s))
rms_vals.append(math.log(f_s))
s = max(s + 1, int(s * 1.4 + 0.5))
if len(scales) < 3:
return None, None
n_pts = len(scales)
x_m = sum(scales) / n_pts
y_m = sum(rms_vals) / n_pts
num = sum((scales[i] - x_m) * (rms_vals[i] - y_m) for i in range(n_pts))
den = sum((scales[i] - x_m) ** 2 for i in range(n_pts))
if den < 1e-12:
return None, None
hurst = max(0.0, min(1.0, num / den))
slope = num / den
ss_res = sum((rms_vals[i] - (y_m + slope * (scales[i] - x_m))) ** 2 for i in range(n_pts))
ss_tot = sum((rms_vals[i] - y_m) ** 2 for i in range(n_pts))
r_squared = 1.0 - ss_res / ss_tot if ss_tot > 1e-12 else 0.0
if r_squared < r2_threshold:
return None, None
return hurst, r_squared
# ── 辅助类 ──
class _RollingMedian:
__slots__ = ('_data',)
def __init__(self, maxlen: int):
self._data: deque = deque(maxlen=maxlen)
def push(self, val: float) -> None:
self._data.append(val)
@property
def value(self) -> float | None:
if not self._data:
return None
s = sorted(self._data)
n = len(s)
mid = n >> 1
return s[mid] if n & 1 else (s[mid - 1] + s[mid]) * 0.5
def __len__(self) -> int:
return len(self._data)
class _HourBarAccum:
"""5m -> 1h 聚合器。"""
__slots__ = ('count', 'open_', 'high', 'low', 'close', 'volume')
def __init__(self):
self.reset()
def reset(self):
self.count = 0
self.open_ = self.high = self.close = self.volume = 0.0
self.low = float('inf')
def add(self, o: float, h: float, l: float, c: float, v: float) -> bool:
if self.count == 0:
self.open_ = o; self.high = h; self.low = l
else:
self.high = max(self.high, h); self.low = min(self.low, l)
self.close = c; self.volume += v; self.count += 1
return self.count >= 12
def as_tuple(self) -> tuple:
return (self.close, self.high, self.low, self.open_, self.volume)
@dataclass
class _LayerContext:
bocpd_trend_prob: float | None = None
hurst: float | None = None
hurst_r2: float | None = None
# ── MomentumFilter 主类 ──
class MomentumFilter:
"""开仓动量过滤器 v5.1。
公开接口:
update(symbol, close, high, low, open_, volume, kline_time)
check(symbol, direction) -> (allowed, reason, is_soft)
check_spread(z4h_history, direction, max_hold_hours) -> (has_trend, reason)
ready(symbol) -> bool
"""
def __init__(
self,
enabled: bool = True,
# Layer 0
bocpd_enabled: bool = True,
bocpd_hazard_rate: float = 0.01,
bocpd_trend_threshold: float = 0.75,
bocpd_drift_threshold: float = 0.0002,
hurst_enabled: bool = True,
hurst_lookback: int = 60,
hurst_threshold: float = 0.60,
hurst_r2_threshold: float = 0.85,
# Layer 1
sustained_lookback: int = 30,
sustained_base_threshold: float = 0.008,
er_threshold_long: float = 0.60,
er_threshold_short: float = 0.50,
mtf_enabled: bool = True,
mtf_soft_lower: float = 0.30,
mtf_soft_upper: float = 0.70,
mtf_max_reduction: float = 0.20,
mtf_lookback: int = 6,
# Layer 2
rs_period: int = 10,
cusum_drift: float = 0.5,
cusum_drift_adaptive: bool = True,
cusum_z_clip: float = 5.0,
cusum_threshold_spike_up: float = 3.5,
cusum_threshold_spike_down: float = 2.5,
volume_confirm_ratio: float = 1.5,
volume_ema_period: int = 20,
vwpm_window: int = 5,
vwpm_confirm_ratio: float = 0.8,
market_ref_symbol: str = "BTC",
btc_stress_threshold: float = 2.0,
btc_stress_factor_min: float = 0.7,
btc_decay_rate: float = 0.3,
btc_stress_lag: int = 3,
# Layer 3
spread_lookback: int = 40,
spread_er_threshold: float = 0.45,
spread_net_threshold: float = 1.5,
spread_adf_pvalue: float = 0.05,
spread_kpss_pvalue: float = 0.05,
spread_use_pp: bool = True,
spread_half_life_max: float = 36.0,
spread_hurwicz_correction: bool = True,
# 跨层
cross_layer_enabled: bool = True,
cross_layer_factor: float = 0.4,
):
self._enabled = enabled
self._bocpd_enabled = bocpd_enabled
self._bocpd_hazard = bocpd_hazard_rate
self._bocpd_trend_thresh = bocpd_trend_threshold
self._bocpd_drift_thresh = bocpd_drift_threshold
self._hurst_enabled = hurst_enabled
self._hurst_lookback = max(20, hurst_lookback)
self._hurst_thresh = hurst_threshold
self._hurst_r2_thresh = hurst_r2_threshold
self._sustained_n = max(5, sustained_lookback)
self._sustained_base_thresh = sustained_base_threshold
self._er_thresh_long = er_threshold_long
self._er_thresh_short = er_threshold_short
self._mtf_enabled = mtf_enabled
self._mtf_soft_lower = mtf_soft_lower
self._mtf_soft_upper = mtf_soft_upper
self._mtf_max_reduction = mtf_max_reduction
self._mtf_lookback = mtf_lookback
self._rs_period = rs_period
self._cusum_drift = cusum_drift
self._cusum_drift_adaptive = cusum_drift_adaptive
self._cusum_z_clip = cusum_z_clip
self._cusum_thresh_up = cusum_threshold_spike_up
self._cusum_thresh_down = cusum_threshold_spike_down
self._vol_confirm_ratio = volume_confirm_ratio
self._vol_ema_period = volume_ema_period
self._vwpm_window = vwpm_window
self._vwpm_confirm_ratio = vwpm_confirm_ratio
self._market_ref = market_ref_symbol
self._btc_stress_thresh = btc_stress_threshold
self._btc_stress_factor_min = btc_stress_factor_min
self._btc_decay_rate = btc_decay_rate
self._btc_stress_lag = max(1, btc_stress_lag)
self._spread_lookback = spread_lookback
self._spread_er_thresh = spread_er_threshold
self._spread_net_thresh = spread_net_threshold
self._spread_adf_pvalue = spread_adf_pvalue
self._spread_kpss_pvalue = spread_kpss_pvalue
self._spread_use_pp = spread_use_pp
self._spread_hl_max = spread_half_life_max
self._spread_hurwicz = spread_hurwicz_correction
self._cross_layer = cross_layer_enabled
self._cross_factor = cross_layer_factor
buf_size = max(hurst_lookback + 5, sustained_lookback + 5,
rs_period + 10, volume_ema_period + 5,
vwpm_window + 2) + 10
self._buf_size = buf_size
self._buffers: dict[str, deque] = {}
self._last_kline_time: dict[str, datetime] = {}
self._cusum_state: dict[str, tuple[float, float]] = {}
self._rs_ema: dict[str, float] = {}
self._vol_ema: dict[str, float] = {}
self._baseline_median: dict[str, _RollingMedian] = {}
self._bocpd_state: dict[str, _BOCPD] = {}
self._buffers_1h: dict[str, deque] = {}
self._hour_accum: dict[str, _HourBarAccum] = {}
self._btc_stress_history: dict[str, deque] = {}
# ── 公开接口 ──
def update(self, symbol: str, close: float, high: float = 0.0,
low: float = 0.0, open_: float = 0.0, volume: float = 0.0,
kline_time: datetime | None = None) -> None:
if kline_time is not None:
if self._last_kline_time.get(symbol) == kline_time:
return
self._last_kline_time[symbol] = kline_time
if symbol not in self._buffers:
self._buffers[symbol] = deque(maxlen=self._buf_size)
self._buffers[symbol].append((close, high, low, open_, volume))
buf = self._buffers[symbol]
if len(buf) >= 2:
self._online_update(symbol, buf)
def check(self, symbol: str, direction: str) -> tuple[bool, str, bool]:
"""Layer 0 -> Layer 2 -> Layer 1。返回 (allowed, reason, is_soft)。"""
if not self._enabled:
return True, "", False
buf = self._buffers.get(symbol)
if buf is None or len(buf) < self._rs_period + 2:
return True, "", False
ctx = _LayerContext()
ok, reason = self._check_regime(symbol, ctx)
if not ok:
return False, reason, False
ok, reason = self._check_spike(symbol, direction, ctx)
if not ok:
return False, reason, False
if len(buf) >= self._sustained_n + 2:
ok, reason = self._check_sustained(symbol, direction, ctx)
if not ok:
return False, reason, True
return True, "", False
def check_spread(self, z4h_history: list[float], direction: str,
max_hold_hours: float = 72.0) -> tuple[bool, str]:
"""Layer 3 仲裁。仅在 Layer 1 软拦截时调用。"""
n = self._spread_lookback
if len(z4h_history) < n + 1:
return False, ""
series = z4h_history[-(n + 1):]
# 第一关:ER
spread_dir = abs(series[-1] - series[0])
spread_path = sum(abs(series[i] - series[i - 1]) for i in range(1, len(series)))
er = spread_dir / spread_path if spread_path > 1e-10 else 0.0
spread_net = series[-1] - series[0]
dir_ok = (spread_net <= -self._spread_net_thresh if direction == 'short'
else spread_net >= self._spread_net_thresh)
if not (er >= self._spread_er_thresh and dir_ok):
return False, ""
# 第二关:PP/ADF + KPSS
ur_p = kpss_p = None
ur_name = "ER-only"
if len(series) >= 15:
if self._spread_use_pp and _HAS_PP:
try:
import numpy as _np
ur_p = float(_PhillipsPerron(_np.array(series, dtype=float)).pvalue)
ur_name = "PP"
except Exception:
pass
if ur_p is None and _HAS_ADF:
try:
ur_p = _adfuller(series, maxlag=2, regression='c', autolag=None)[1]
ur_name = "ADF"
except Exception:
pass
if _HAS_KPSS:
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
kpss_p = _kpss(series, regression='c', nlags='auto')[1]
except Exception:
pass
# 四象限 + 半衰期
if ur_p is not None and kpss_p is not None:
ur_stat = ur_p < self._spread_adf_pvalue
kpss_stat = kpss_p > self._spread_kpss_pvalue
if ur_stat and kpss_stat:
hl = _estimate_half_life(series, hurwicz_correction=self._spread_hurwicz)
hl_max = min(self._spread_hl_max, max_hold_hours * 0.5)
if hl is not None and hl > hl_max:
return True, (f"平稳但半衰期过长: {ur_name}_p={ur_p:.3f}"
f" KPSS_p={kpss_p:.3f} HL={hl:.1f}h>{hl_max:.0f}h")
hl_s = f" HL={hl:.1f}h" if hl else " HL=N/A"
return False, (f"{ur_name}+KPSS确认平稳({ur_name}_p={ur_p:.3f}"
f" KPSS_p={kpss_p:.3f}{hl_s})->推翻拦截")
if not ur_stat and not kpss_stat:
return True, f"Spread趋势(三重确认): ER={er:.2f} {ur_name}_p={ur_p:.3f} KPSS_p={kpss_p:.3f}"
return True, f"Spread趋势(矛盾): ER={er:.2f} {ur_name}_p={ur_p:.3f} KPSS_p={kpss_p:.3f}->保守拦截"
if ur_p is not None:
if ur_p < self._spread_adf_pvalue:
hl = _estimate_half_life(series, hurwicz_correction=self._spread_hurwicz)
hl_max = min(self._spread_hl_max, max_hold_hours * 0.5)
if hl is not None and hl > hl_max:
return True, f"平稳但HL过长: {ur_name}_p={ur_p:.3f} HL={hl:.1f}h"
return False, f"{ur_name}判定平稳(p={ur_p:.3f})->推翻拦截"
return True, f"Spread趋势({ur_name}+ER): ER={er:.2f} p={ur_p:.3f}"
return True, f"Spread趋势(ER): ER={er:.2f} 净位移={spread_net:+.3f}"
def ready(self, symbol: str) -> bool:
buf = self._buffers.get(symbol)
return buf is not None and len(buf) >= self._rs_period + 2
# ── Layer 0 ──
def _check_regime(self, symbol: str, ctx: _LayerContext) -> tuple[bool, str]:
if self._bocpd_enabled:
bocpd = self._bocpd_state.get(symbol)
if bocpd and bocpd.ready:
trend_prob = bocpd.trend_probability
ctx.bocpd_trend_prob = trend_prob
if trend_prob > self._bocpd_trend_thresh:
return False, (f"Layer0-BOCPD趋势机制: P(trending)={trend_prob:.3f}"
f">{self._bocpd_trend_thresh}")
return True, ""
if not self._hurst_enabled:
return True, ""
buf = self._buffers.get(symbol)
if buf is None or len(buf) < self._hurst_lookback + 1:
return True, ""
closes = [d[0] for d in list(buf)[-(self._hurst_lookback + 1):]]
h, r2 = _hurst_dfa2(closes, r2_threshold=self._hurst_r2_thresh)
if h is not None:
ctx.hurst = h; ctx.hurst_r2 = r2
if h is None:
return True, ""
if h > self._hurst_thresh:
return False, (f"Layer0-DFA2趋势机制: Hurst={h:.3f}>{self._hurst_thresh}"
f" R2={r2:.3f}(BOCPD预热中)")
return True, ""
# ── Layer 1 ──
def _check_sustained(self, symbol: str, direction: str,
ctx: _LayerContext) -> tuple[bool, str]:
buf = self._buffers[symbol]
n = self._sustained_n
data = list(buf)[-(n + 1):]
closes = [d[0] for d in data]
if len(closes) < n + 1:
return True, ""
ref = closes[0]
if ref <= 0:
return True, ""
net_return = (closes[-1] - ref) / ref
direction_dist = abs(closes[-1] - ref)
path_length = sum(abs(closes[i] - closes[i - 1]) for i in range(1, len(closes)))
er = direction_dist / path_length if path_length > 1e-10 else 0.0
er_thresh = self._er_thresh_long if direction == 'long' else self._er_thresh_short
# MTF 连续加权
mtf_er = None
if self._mtf_enabled:
mtf_er = self._calc_mtf_er(symbol)
if mtf_er is not None:
span = self._mtf_soft_upper - self._mtf_soft_lower
if span > 1e-10:
mtf_w = max(0.0, min(1.0, (mtf_er - self._mtf_soft_lower) / span))
else:
mtf_w = 1.0 if mtf_er >= self._mtf_soft_upper else 0.0
er_thresh *= 1.0 - self._mtf_max_reduction * mtf_w
# 跨层 BOCPD 修正
if self._cross_layer and ctx.bocpd_trend_prob is not None:
if ctx.bocpd_trend_prob > 0.5:
er_thresh *= 1.0 - self._cross_factor * (ctx.bocpd_trend_prob - 0.5)
if er < er_thresh:
return True, ""
# RS 自适应阈值
rs_vol = math.sqrt(max(0.0, self._rs_ema.get(symbol, 0.0)))
baseline = self._baseline_median.get(symbol)
bl_val = baseline.value if (baseline and len(baseline) >= 5) else None
if rs_vol > 0 and bl_val and bl_val > 0:
adaptive_thresh = self._sustained_base_thresh * max(0.3, min(3.0, rs_vol / bl_val))
else:
adaptive_thresh = self._sustained_base_thresh
mtf_str = f" [MTF-1h-ER={mtf_er:.2f}]" if mtf_er is not None and mtf_er > self._mtf_soft_lower else ""
if direction == 'short' and net_return <= -adaptive_thresh:
return False, (f"Layer1-不追跌: 净跌幅={net_return:.2%} 阈值=-{adaptive_thresh:.2%}"
f" ER={er:.2f}>={er_thresh:.2f}{mtf_str}")
if direction == 'long' and net_return >= adaptive_thresh:
return False, (f"Layer1-不追涨: 净涨幅={net_return:.2%} 阈值=+{adaptive_thresh:.2%}"
f" ER={er:.2f}>={er_thresh:.2f}{mtf_str}")
return True, ""
def _calc_mtf_er(self, symbol: str) -> float | None:
buf_1h = self._buffers_1h.get(symbol)
if buf_1h is None or len(buf_1h) < self._mtf_lookback + 1:
return None
data = list(buf_1h)[-(self._mtf_lookback + 1):]
closes = [d[0] for d in data]
d_dist = abs(closes[-1] - closes[0])
p_len = sum(abs(closes[i] - closes[i - 1]) for i in range(1, len(closes)))
return d_dist / p_len if p_len > 1e-10 else 0.0
# ── Layer 2 ──
def _check_spike(self, symbol: str, direction: str,
ctx: _LayerContext) -> tuple[bool, str]:
s_pos, s_neg = self._cusum_state.get(symbol, (0.0, 0.0))
eff_up = self._cusum_thresh_up
eff_down = self._cusum_thresh_down
# BTC 滞后因果因子
btc_str = ""
if symbol != self._market_ref:
btc_hist = self._btc_stress_history.get(self._market_ref)
btc_stress = max(list(btc_hist)[-self._btc_stress_lag:]) if btc_hist and len(btc_hist) >= 1 else 0.0
if btc_stress > self._btc_stress_thresh:
excess = btc_stress - self._btc_stress_thresh
sf = max(self._btc_stress_factor_min, math.exp(-self._btc_decay_rate * excess))
eff_up *= sf; eff_down *= sf
btc_str = f" BTC(lag={self._btc_stress_lag})->x{sf:.2f}"
# 跨层修正
if self._cross_layer and ctx.bocpd_trend_prob is not None and ctx.bocpd_trend_prob > 0.5:
adj = 1.0 - 0.2 * (ctx.bocpd_trend_prob - 0.5)
eff_up *= adj; eff_down *= adj
vol_ok, vol_r, vwpm = self._check_volume(symbol)
rs_vol = math.sqrt(max(0.0, self._rs_ema.get(symbol, 0.0)))
info = f" RS={rs_vol:.4f}" + (f" 量比={vol_r:.1f}" if vol_r > 0 else "") + (f" VWPM={vwpm:.4f}" if vwpm else "")
if direction == 'short' and s_pos >= eff_up and vol_ok:
return False, f"Layer2-暴涨不做空: CUSUM+={s_pos:.2f}>={eff_up:.2f}{info}{btc_str}"
if direction == 'long' and s_neg >= eff_down and vol_ok:
return False, f"Layer2-暴跌不做多: CUSUM-={s_neg:.2f}>={eff_down:.2f}{info}{btc_str}"
return True, ""
def _check_volume(self, symbol: str) -> tuple[bool, float, float]:
buf = self._buffers.get(symbol)
if buf is None or len(buf) < 2:
return True, 0.0, 0.0
current_vol = buf[-1][4]
vol_ema = self._vol_ema.get(symbol, 0.0)
if vol_ema <= 0 or current_vol <= 0:
return True, 0.0, 0.0
vol_ratio = current_vol / vol_ema
vwpm_val = 0.0
w = min(self._vwpm_window, len(buf) - 1)
if w >= 1:
data = list(buf)[-(w + 1):]
weighted_sum = total_vol = 0.0
for i in range(1, len(data)):
pc, cc, v = data[i-1][0], data[i][0], data[i][4]
if pc > 0 and v > 0:
weighted_sum += (cc - pc) / pc * v
total_vol += v
if total_vol > 0:
vwpm_val = weighted_sum / total_vol
rs_vol = math.sqrt(max(0.0, self._rs_ema.get(symbol, 0.0)))
if rs_vol > 1e-10 and vwpm_val != 0.0:
confirmed = (vol_ratio >= self._vol_confirm_ratio
and abs(vwpm_val) / rs_vol >= self._vwpm_confirm_ratio)
else:
confirmed = vol_ratio >= self._vol_confirm_ratio
return confirmed, vol_ratio, vwpm_val
# ── 在线更新 ──
def _online_update(self, symbol: str, buf: deque) -> None:
curr = buf[-1]
prev = buf[-2]
close, high, low, open_, volume = curr
prev_close = prev[0]
if prev_close <= 0 or close <= 0:
return
# RS EMA
rs_val = self._calc_rs_single(high, low, open_, close)
alpha_rs = 2.0 / (self._rs_period + 1)
prev_rs = self._rs_ema.get(symbol, rs_val)
new_rs = alpha_rs * rs_val + (1.0 - alpha_rs) * prev_rs
self._rs_ema[symbol] = new_rs
# 基准波动率
rs_vol_now = math.sqrt(max(0.0, new_rs))
if symbol not in self._baseline_median:
self._baseline_median[symbol] = _RollingMedian(maxlen=200)
self._baseline_median[symbol].push(rs_vol_now)
# Huber-CUSUM
rs_vol = math.sqrt(max(0.0, new_rs))
if rs_vol > 1e-10:
z_raw = (close - prev_close) / prev_close / rs_vol
z = math.copysign(min(abs(z_raw), self._cusum_z_clip), z_raw)
drift = self._cusum_drift
if self._cusum_drift_adaptive:
bl = self._baseline_median.get(symbol)
bl_val = bl.value if (bl and len(bl) >= 5) else None
if bl_val and bl_val > 0:
drift = self._cusum_drift * max(0.5, min(2.0, rs_vol / bl_val))
s_pos, s_neg = self._cusum_state.get(symbol, (0.0, 0.0))
self._cusum_state[symbol] = (max(0.0, s_pos + z - drift),
max(0.0, s_neg - z - drift))
# BTC stress 历史
if symbol not in self._btc_stress_history:
self._btc_stress_history[symbol] = deque(maxlen=max(10, self._btc_stress_lag + 2))
sp, sn = self._cusum_state.get(symbol, (0.0, 0.0))
self._btc_stress_history[symbol].append(max(sp, sn))
# Volume EMA
if volume > 0:
alpha_v = 2.0 / (self._vol_ema_period + 1)
self._vol_ema[symbol] = alpha_v * volume + (1.0 - alpha_v) * self._vol_ema.get(symbol, volume)
# BOCPD
if self._bocpd_enabled and prev_close > 0:
log_ret = math.log(close / prev_close)
if symbol not in self._bocpd_state:
self._bocpd_state[symbol] = _BOCPD(
hazard_rate=self._bocpd_hazard, max_run=200,
alpha0=3.0, drift_threshold=self._bocpd_drift_thresh)
self._bocpd_state[symbol].update(log_ret)
# 1h 聚合
if self._mtf_enabled:
if symbol not in self._hour_accum:
self._hour_accum[symbol] = _HourBarAccum()
accum = self._hour_accum[symbol]
o = open_ if open_ > 0 else close
h = high if high > 0 else close
l = low if low > 0 else close
if accum.add(o, h, l, close, volume):
if symbol not in self._buffers_1h:
self._buffers_1h[symbol] = deque(maxlen=30)
self._buffers_1h[symbol].append(accum.as_tuple())
accum.reset()
@staticmethod
def _calc_rs_single(high: float, low: float, open_: float, close: float) -> float:
if high > 0 and low > 0 and open_ > 0 and high >= low:
try:
rs = (math.log(high / close) * math.log(high / open_)
+ math.log(low / close) * math.log(low / open_))
return max(0.0, rs)
except (ValueError, ZeroDivisionError):
pass
if open_ > 0:
try:
return math.log(close / open_) ** 2
except (ValueError, ZeroDivisionError):
pass
return 0.0
3.3 StrategyParams 新增字段
@dataclass(frozen=True)
class StrategyParams:
# ... 现有字段 ...
# 动量过滤器
momentum_filter_enabled: bool = True
# Layer 0
momentum_bocpd_enabled: bool = True
momentum_bocpd_hazard_rate: float = 0.01
momentum_bocpd_trend_threshold: float = 0.75
momentum_bocpd_drift_threshold: float = 0.0002
momentum_hurst_enabled: bool = True
momentum_hurst_lookback: int = 60
momentum_hurst_threshold: float = 0.60
momentum_hurst_r2_threshold: float = 0.85
# Layer 1
momentum_sustained_lookback: int = 30
momentum_sustained_base_threshold: float = 0.008
momentum_er_threshold_long: float = 0.60
momentum_er_threshold_short: float = 0.50
momentum_mtf_enabled: bool = True
momentum_mtf_soft_lower: float = 0.30
momentum_mtf_soft_upper: float = 0.70
momentum_mtf_max_reduction: float = 0.20
momentum_mtf_lookback: int = 6
# Layer 2
momentum_rs_period: int = 10
momentum_cusum_drift: float = 0.5
momentum_cusum_drift_adaptive: bool = True
momentum_cusum_z_clip: float = 5.0
momentum_cusum_threshold_spike_up: float = 3.5
momentum_cusum_threshold_spike_down: float = 2.5
momentum_volume_confirm_ratio: float = 1.5
momentum_volume_ema_period: int = 20
momentum_vwpm_window: int = 5
momentum_vwpm_confirm_ratio: float = 0.8
momentum_market_ref_symbol: str = "BTC"
momentum_btc_stress_threshold: float = 2.0
momentum_btc_stress_factor_min: float = 0.7
momentum_btc_decay_rate: float = 0.3
momentum_btc_stress_lag: int = 3
# Layer 3
momentum_spread_lookback: int = 40
momentum_spread_er_threshold: float = 0.45
momentum_spread_net_threshold: float = 1.5
momentum_spread_adf_pvalue: float = 0.05
momentum_spread_kpss_pvalue: float = 0.05
momentum_spread_use_pp: bool = True
momentum_spread_half_life_max: float = 36.0
momentum_spread_hurwicz_correction: bool = True
# 跨层
momentum_cross_layer_enabled: bool = True
momentum_cross_layer_factor: float = 0.4
3.4 strategy.py 集成
_check_entry 中 Layer 3 仲裁调用:
spread_trend, spread_reason = self._momentum_filter.check_spread(
z4h_hist, direction, max_hold_hours=params.max_hold_hours,
)
4. 数据流
WebSocket K线(OHLCV)
-> realtime_kline_service_base._trigger_strategy_if_ready()
-> 提取 alt/base OHLCV
-> TradingOrchestrator.process_analysis(alt_ohlcv, base_ohlcv)
-> AdaptiveBollingerStrategy.process_tick(alt_ohlcv, base_ohlcv)
-> MomentumFilter.update() O(1) 在线更新
-> _check_entry()
-> 步骤 1-4: 冷却期/突破/持仓/z4h
-> 步骤 4.5: 方向判断
-> 步骤 4.6: 四层动量过滤
-> 步骤 5: EntrySignal
线程安全:MomentumFilter 由 AdaptiveBollingerStrategy._lock 统一保护。
5. 参数配置
5.1 关键参数调优
Layer 0 BOCPD:
| 参数 | 默认 | 范围 | 说明 |
|---|---|---|---|
bocpd_hazard_rate |
0.01 | 0.005-0.05 | 变点先验概率。0.01 = 期望机制长度 100 根(8.3h) |
bocpd_trend_threshold |
0.75 | 0.60-0.90 | P(trending) > 此值 -> 硬拦截 |
bocpd_drift_threshold |
0.0002 | 0.0001-0.0005 | 有经济意义的最小漂移(5min 收益率) |
Layer 1 MTF:
| 参数 | 默认 | 范围 | 说明 |
|---|---|---|---|
mtf_soft_lower |
0.30 | 0.20-0.40 | 低于此值无 MTF 修正 |
mtf_soft_upper |
0.70 | 0.55-0.80 | 高于此值满 MTF 修正 |
mtf_max_reduction |
0.20 | 0.10-0.30 | ER 阈值最大降低比例 |
Layer 2 CUSUM + BTC:
| 参数 | 默认 | 范围 | 说明 |
|---|---|---|---|
cusum_z_clip |
5.0 | 3.0-8.0 | Huber 截断点。越小越鲁棒但可能削弱信号 |
btc_stress_lag |
3 | 1-10 | BTC stress 滞后根数(x5min)。3 = 15 分钟因果延迟 |
btc_decay_rate |
0.3 | 0.1-0.5 | BTC 指数衰减率 |
Layer 3 Spread:
| 参数 | 默认 | 范围 | 说明 |
|---|---|---|---|
spread_lookback |
40 | 20-80 | Spread 回望根数。40 = 200 分钟 |
spread_half_life_max |
36.0 | 12-48 | 最大允许半衰期(h)。建议 max_hold_hours/2 |
5.2 环境变量
TRADING_MOMENTUM_BOCPD_ENABLED=true
TRADING_MOMENTUM_BOCPD_HAZARD_RATE=0.01
TRADING_MOMENTUM_BOCPD_TREND_THRESHOLD=0.75
TRADING_MOMENTUM_BOCPD_DRIFT_THRESHOLD=0.0002
TRADING_MOMENTUM_HURST_ENABLED=true
TRADING_MOMENTUM_HURST_LOOKBACK=60
TRADING_MOMENTUM_HURST_THRESHOLD=0.60
TRADING_MOMENTUM_HURST_R2_THRESHOLD=0.85
TRADING_MOMENTUM_SUSTAINED_LOOKBACK=30
TRADING_MOMENTUM_SUSTAINED_BASE_THRESHOLD=0.008
TRADING_MOMENTUM_ER_THRESHOLD_LONG=0.60
TRADING_MOMENTUM_ER_THRESHOLD_SHORT=0.50
TRADING_MOMENTUM_MTF_ENABLED=true
TRADING_MOMENTUM_MTF_SOFT_LOWER=0.30
TRADING_MOMENTUM_MTF_SOFT_UPPER=0.70
TRADING_MOMENTUM_MTF_MAX_REDUCTION=0.20
TRADING_MOMENTUM_MTF_LOOKBACK=6
TRADING_MOMENTUM_RS_PERIOD=10
TRADING_MOMENTUM_CUSUM_DRIFT=0.5
TRADING_MOMENTUM_CUSUM_DRIFT_ADAPTIVE=true
TRADING_MOMENTUM_CUSUM_Z_CLIP=5.0
TRADING_MOMENTUM_CUSUM_THRESHOLD_SPIKE_UP=3.5
TRADING_MOMENTUM_CUSUM_THRESHOLD_SPIKE_DOWN=2.5
TRADING_MOMENTUM_VOLUME_CONFIRM_RATIO=1.5
TRADING_MOMENTUM_VWPM_WINDOW=5
TRADING_MOMENTUM_VWPM_CONFIRM_RATIO=0.8
TRADING_MOMENTUM_MARKET_REF_SYMBOL=BTC
TRADING_MOMENTUM_BTC_STRESS_THRESHOLD=2.0
TRADING_MOMENTUM_BTC_STRESS_FACTOR_MIN=0.7
TRADING_MOMENTUM_BTC_DECAY_RATE=0.3
TRADING_MOMENTUM_BTC_STRESS_LAG=3
TRADING_MOMENTUM_SPREAD_LOOKBACK=40
TRADING_MOMENTUM_SPREAD_ER_THRESHOLD=0.45
TRADING_MOMENTUM_SPREAD_NET_THRESHOLD=1.5
TRADING_MOMENTUM_SPREAD_ADF_PVALUE=0.05
TRADING_MOMENTUM_SPREAD_KPSS_PVALUE=0.05
TRADING_MOMENTUM_SPREAD_USE_PP=true
TRADING_MOMENTUM_SPREAD_HALF_LIFE_MAX=36.0
TRADING_MOMENTUM_SPREAD_HURWICZ_CORRECTION=true
TRADING_MOMENTUM_CROSS_LAYER_ENABLED=true
TRADING_MOMENTUM_CROSS_LAYER_FACTOR=0.4
6. 风险与局限性
| 局限 | 缓解 |
|---|---|
| BOCPD 预热期 20 根(100min) | DFA-2 自动后备;启动时 DB 回填 |
| Student-t CDF 连分数近似 | 精度 >0.001(df>=2);可用 scipy 验证 |
| 半衰期小样本敏感 | Hurwicz 校正 + spread_lookback=40 缓解 |
| PP 需 arch 库 | 自动降级 ADF |
| BTC lag 最优值未知 | 默认 3 (15min);可通过 Granger 因果检验确定 |
| 38 个参数过拟合风险 | 全有默认值;后续引入 risk_attitude 元参数降维 |
不适用场景: 极低流动性资产、HIP-3 稀疏资产、max_hold_hours < 4h 的极短持仓策略。
后续方向
| 优先级 | 方向 |
|---|---|
| P1 | 元参数降维:risk_attitude in [0,1] 驱动 38 参数联合缩放 |
| P1 | OFI 集成(L2 book -> 替代 VWPM) |
| P1 | BOCPD hazard_rate Beta 后验自适应 |
| P2 | Transfer Entropy BTC 因果因子 |
| P2 | Kalman Filter 动态对冲比率 |
| P2 | 对数似然比跨层统一框架 |
附录:学术参考
| 算法 | 论文 |
|---|---|
| BOCPD | Adams & MacKay (2007) arXiv:0710.3742 |
| NIG 后验 | Murphy (2007) Conjugate Bayesian analysis of the Gaussian distribution |
| DFA-2 | Kantelhardt et al. (2002); Peng et al. (1994) |
| Kaufman ER | Kaufman (1995) Smarter Trading |
| CUSUM | Page (1954) Biometrika; Lorden (1971) Ann. Math. Statist. |
| CUSUM 自适应 drift | Hawkins & Olwell (1998) Cumulative Sum Charts |
| Huber 估计器 | Huber (1964) Ann. Math. Statist. |
| Rogers-Satchell | Rogers & Satchell (1991) Ann. Appl. Prob. |
| OFI | Cont, Kukanov & Stoikov (2014) J. Financial Econometrics |
| Phillips-Perron | Phillips & Perron (1988) Biometrika |
| KPSS | Kwiatkowski et al. (1992) J. Econometrics |
| OU 半衰期 | Vidyamurthy (2004) Pairs Trading |
| Hurwicz 偏差 | Hurwicz (1950); Kendall (1954) Biometrika |
| 半衰期偏差校正 | Krauss (2017) J. Economic Surveys |
| BTC-alt 传导 | Makarov & Schoar (2020) J. Financial Economics |
| 时间序列动量 | Moskowitz, Ooi & Pedersen (2012) J. Financial Economics |