Entry Momentum Filter Design (Scheme 6, Condensed Edition)

Version: v5.1 | Date: 2026-03-06
Applies to: Hyperliquid quantitative trading system (Adaptive Bollinger Z-Score pairs strategy)


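1. Problem Definition

1.1 Four Constraints

Constraint 1: sustained decline -> do not chase the drop (no short entry)
Constraint 2: sustained rally -> do not chase the rise (no long entry)
Constraint 3: sudden spike up -> no short entry
Constraint 4: sudden crash -> no long entry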

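1.2 Filter Dimensions

Dimension | Checked object              | Rationale
Alt leg   | alt symbol price momentum   | risk of the instrument held directly
Base leg  | base symbol price momentum  | the opposite leg of the pair carries momentum risk too
Spread    | trendiness of the leg spread | when both legs rise together but the spread is stable, the entry should not be rejected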

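2. Four-Layer Filter Architecture

Layers run in order of blocking strictness (hard blocks first):

Input signal (direction, alt_symbol, base_symbol)
    |
    +-->> [Step 1] Layer 0 (hard block): BOCPD regime detection + DFA-2 fallback
    |      P(trending) > 0.75 -> block
    |      shares ctx.bocpd_trend_prob -> threshold adjustment in Layer 2/1
    |
    +-->> [Step 2] Layer 2 (hard block): Huber-CUSUM + lagged BTC causal factor
    |      spike on either leg -> block
    |
    +-->> [Step 3] Layer 1 (soft block): ER + RS net displacement + continuous MTF weighting
    |      sustained trend on either leg -> soft block -> handed to Layer 3 for arbitration
    |
    +-->> [Step 4] Layer 3 (arbiter): PP/ADF + KPSS + OU half-life
    |      spread stationary + reasonable half-life -> overturn the block; otherwise keep it
    |
    v
  Final decision: allow / reject entry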
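
Layer   | Order  | Type       | Arbitrable | Meaning when triggered
Layer 0 | Step 1 | hard block | no         | the mean-reversion assumption has broken down
Layer 2 | Step 2 | hard block | no         | spike; execution risk is very high
Layer 1 | Step 3 | soft block | yes        | sustained trend on a single leg; may be a false positive
Layer 3 | Step 4 | arbiter    | --         | passes the entry if the spread is stationary with a reasonable half-life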
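
The control flow in the table can be sketched as a small driver, shown here as an illustration rather than the project's actual `_check_entry` code. The names `decide_entry`, `check_leg` and `check_spread` are hypothetical; the tuple shapes mirror the filter's `check` and `check_spread` interfaces. The sketch assumes the base leg trades in the opposite direction to the alt leg, which this document does not state explicitly.

```python
from typing import Callable

# (allowed, reason, is_soft), mirroring MomentumFilter.check
LegCheck = Callable[[str, str], tuple[bool, str, bool]]
# (has_trend, reason), mirroring MomentumFilter.check_spread
SpreadCheck = Callable[[], tuple[bool, str]]


def decide_entry(direction: str, alt_symbol: str, base_symbol: str,
                 check_leg: LegCheck, check_spread: SpreadCheck) -> tuple[bool, str]:
    """Return (allowed, reason) for one pair-entry signal."""
    # Assumption: the base leg is traded opposite to the alt leg.
    opposite = "short" if direction == "long" else "long"
    soft_reasons: list[str] = []
    for symbol, leg_direction in ((alt_symbol, direction), (base_symbol, opposite)):
        allowed, reason, is_soft = check_leg(symbol, leg_direction)
        if allowed:
            continue
        if not is_soft:
            return False, reason        # Layer 0 / Layer 2: final, no arbitration
        soft_reasons.append(reason)     # Layer 1: defer to Layer 3
    if not soft_reasons:
        return True, ""
    has_trend, spread_reason = check_spread()
    if has_trend:
        return False, "; ".join(soft_reasons)   # arbiter keeps the soft block
    return True, spread_reason                  # arbiter overturns the soft block
```

A hard block on either leg returns immediately, so the arbiter is consulted only when every rejection was soft.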

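2.1 Layer 0: BOCPD Regime Detection + DFA-2 Fallback

BOCPD (Adams & MacKay 2007) detects regime switches online using a Normal-Inverse-Gamma (NIG) conjugate prior and outputs P(trending).

Core algorithm: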

Input: log returns {x_t}, hazard_rate H = 0.01
Prior: NIG(mu0=0, kappa0=1, alpha0=3, beta0=0.01)
(x_t is the observed log return; r_t is the run length at time t.)

At each time t, for every run length r = 0..R (R=200):
  1. Predict:     pi_r = Student-t_{2*alpha_r}(x_t; mu_r, beta_r*(kappa_r+1)/(alpha_r*kappa_r))
  2. Growth:      P(r_t=r+1) = P(r_{t-1}=r) * pi_r * (1-H)
  3. Changepoint: P(r_t=0)   = Sum_r P(r_{t-1}=r) * pi_r * H
  4. NIG update:  kappa'=kappa+1, mu'=(kappa*mu+x_t)/kappa',
                  alpha'=alpha+0.5, beta'=beta+kappa*(x_t-mu)^2/(2*kappa')
  5. Normalize and truncate to R
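
As a small worked illustration of steps 1 and 4, the following sketch applies the NIG update to a single run-length hypothesis and derives the Student-t predictive parameters. The function names are hypothetical and the sketch leaves out the run-length bookkeeping of steps 2, 3 and 5.

```python
def nig_update(mu: float, kappa: float, alpha: float, beta: float,
               x: float) -> tuple[float, float, float, float]:
    """Fold one observation x into the NIG sufficient statistics (step 4)."""
    kappa_n = kappa + 1.0
    mu_n = (kappa * mu + x) / kappa_n
    alpha_n = alpha + 0.5
    beta_n = beta + kappa * (x - mu) ** 2 / (2.0 * kappa_n)
    return mu_n, kappa_n, alpha_n, beta_n


def predictive_params(mu: float, kappa: float, alpha: float,
                      beta: float) -> tuple[float, float, float]:
    """Degrees of freedom, location and squared scale of the predictive (step 1)."""
    return 2.0 * alpha, mu, beta * (kappa + 1.0) / (alpha * kappa)


# Start from the prior and absorb three illustrative 5min log returns.
state = (0.0, 1.0, 3.0, 0.01)
for log_return in (0.001, 0.002, 0.0015):
    state = nig_update(*state, log_return)
df, loc, scale_sq = predictive_params(*state)
```

Each observation adds 1 to kappa and 0.5 to alpha, so the predictive degrees of freedom grow by one per bar within a run.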

Trend probability (marginalized over the run-length posterior):

P(trending) = Sum_r P(r_t=r) * P(trending|r)

P(trending|r) = P(|mu_r| > drift_threshold | NIG posterior_r)

  mu_r | data ~ Student-t_{2*alpha_r}(mu_hat, beta_r/(alpha_r*kappa_r))
  drift_threshold = 0.0002 (a 0.02% return per 5min bar, the smallest drift considered economically meaningful)
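
The marginalization above can be checked numerically. This sketch integrates the Student-t posterior of mu_r with Simpson's rule; that is a stand-in chosen for self-containment, not the closed-form CDF used in the implementation. All function names and the sample posteriors are hypothetical, and the quadrature becomes inaccurate when the posterior scale is much smaller than the step size.

```python
import math


def t_pdf(x: float, loc: float, scale: float, df: float) -> float:
    """Density of a location-scale Student-t distribution."""
    z = (x - loc) / scale
    log_norm = (math.lgamma((df + 1) / 2) - math.lgamma(df / 2)
                - 0.5 * math.log(df * math.pi))
    return math.exp(log_norm - (df + 1) / 2 * math.log1p(z * z / df)) / scale


def p_trending_given_run(mu_hat: float, kappa: float, alpha: float, beta: float,
                         delta: float = 0.0002, steps: int = 400) -> float:
    """P(|mu_r| > delta) under the posterior of mu_r for one run length."""
    df = 2.0 * alpha
    scale = math.sqrt(beta / (alpha * kappa))
    h = 2.0 * delta / steps             # steps must be even for Simpson's rule
    total = t_pdf(-delta, mu_hat, scale, df) + t_pdf(delta, mu_hat, scale, df)
    for i in range(1, steps):
        weight = 4.0 if i % 2 else 2.0
        total += weight * t_pdf(-delta + i * h, mu_hat, scale, df)
    inside = total * h / 3.0            # posterior mass inside [-delta, +delta]
    return min(1.0, max(0.0, 1.0 - inside))


def trend_probability(run_probs, posteriors, delta: float = 0.0002) -> float:
    """Sum_r P(r_t = r) * P(trending | r)."""
    return sum(p * p_trending_given_run(*post, delta=delta)
               for p, post in zip(run_probs, posteriors))


flat = (0.0, 50.0, 28.0, 0.0005)        # (mu_hat, kappa, alpha, beta), no drift
drifting = (0.002, 50.0, 28.0, 0.0005)  # posterior mean well above delta
p_total = trend_probability([0.25, 0.75], [flat, drifting])
```

Even the zero-mean posterior assigns substantial mass outside the drift band when its scale exceeds delta, which is why the hard-block threshold sits well above 0.5.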

Decision logic:

if BOCPD is ready (>= 20 updates):
    if trend_prob > 0.75 -> hard block
    else -> pass; trend_prob is shared with Layer 2/1
else:
    fall back to DFA-2 with an R-squared quality check (Hurst > 0.60 and R2 > 0.85 -> hard block)

2.2 Layer 2: Huber-CUSUM Spike Detection

ret(i) = (close(i) - close(i-1)) / close(i-1)

# Huber robustification (fewer false triggers from a single flash-crash or wick bar)
z_raw = ret / rs_vol
z = sign(z_raw) * min(|z_raw|, 5.0)

# Adaptive drift (tracks the volatility environment)
adaptive_drift = 0.5 * clamp(rs_vol / baseline_vol, 0.5, 2.0)

S_pos(i) = max(0, S_pos(i-1) + z - adaptive_drift)
S_neg(i) = max(0, S_neg(i-1) - z - adaptive_drift)

# Lagged BTC causal factor (lag = 3 bars = 15 minutes)
if symbol != BTC:
    btc_stress = max(btc_stress_history[-3:])
    if btc_stress > 2.0:
        stress_factor = max(0.7, exp(-0.3 * (btc_stress - 2.0)))
        effective_thresh *= stress_factor

# Cross-layer BOCPD adjustment
if trend_prob > 0.5:
    effective_thresh *= 1.0 - 0.2 * (trend_prob - 0.5)

# Joint VWPM + volume-ratio confirmation
volume_confirmed = volume_ratio >= 1.5 AND |VWPM|/rs_vol >= 0.8

# Asymmetric triggers (thresh_up / thresh_down are the effective thresholds after the adjustments above; defaults 3.5 / 2.5)
Constraint 3: direction=='short' AND S_pos >= thresh_up AND volume_confirmed
Constraint 4: direction=='long'  AND S_neg >= thresh_down AND volume_confirmed

RS volatility uses the Rogers-Satchell (1991) drift-independent OHLC estimator.
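
The recursion above can be exercised bar by bar with a short sketch. It is an illustration under assumptions, not the filter's `_online_update` code: the function names are hypothetical, and `rs_vol` is taken to be the square root of a smoothed per-bar Rogers-Satchell variance.

```python
import math


def rogers_satchell_var(o: float, h: float, l: float, c: float) -> float:
    """Per-bar Rogers-Satchell variance from OHLC prices."""
    return math.log(h / c) * math.log(h / o) + math.log(l / c) * math.log(l / o)


def cusum_step(s_pos: float, s_neg: float, ret: float, rs_vol: float,
               baseline_vol: float, base_drift: float = 0.5,
               z_clip: float = 5.0) -> tuple[float, float]:
    """One Huber-clipped, drift-adaptive CUSUM update; returns (S_pos, S_neg)."""
    z_raw = ret / rs_vol
    z = math.copysign(min(abs(z_raw), z_clip), z_raw)    # Huber clipping
    ratio = min(2.0, max(0.5, rs_vol / baseline_vol))    # clamp to [0.5, 2.0]
    drift = base_drift * ratio
    return max(0.0, s_pos + z - drift), max(0.0, s_neg - z - drift)


def btc_stress_factor(btc_stress: float, threshold: float = 2.0,
                      decay: float = 0.3, floor: float = 0.7) -> float:
    """Multiplier applied to the CUSUM thresholds when BTC is under stress."""
    if btc_stress <= threshold:
        return 1.0
    return max(floor, math.exp(-decay * (btc_stress - threshold)))


# A single +5% bar at 0.5% volatility is a 10-sigma move, clipped to 5.
s_pos, s_neg = cusum_step(0.0, 0.0, ret=0.05, rs_vol=0.005, baseline_vol=0.005)
```

Because of the clipping, one extreme bar contributes at most 5.0 minus the drift to the accumulator. With the default floor of 0.7, the BTC factor stops shrinking the thresholds once `btc_stress` exceeds roughly 3.19.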

2.3 Layer 1: ER + RS Net Displacement + Continuous MTF Weighting

N = 30 (5min bars)

# Kaufman Efficiency Ratio over the last N bar-to-bar moves (N+1 closes)
ER = |close[-1] - close[-N-1]| / Sum|close[i] - close[i-1]|

# Asymmetric ER thresholds
er_thresh = 0.60 (long) or 0.50 (short)

# Continuous MTF weighting (the 1h ER adjusts the 5m threshold)
mtf_er = ER on the 1h timeframe (6 one-hour bars)
mtf_weight = clamp((mtf_er - 0.30) / (0.70 - 0.30), 0, 1)
er_thresh *= 1.0 - 0.20 * mtf_weight

# Cross-layer BOCPD adjustment
if trend_prob > 0.5:
    er_thresh *= 1.0 - 0.4 * (trend_prob - 0.5)

# RS-adaptive net displacement threshold
adaptive_thresh = 0.008 * clamp(rs_vol / baseline, 0.3, 3.0)

# Joint triggers
Constraint 1: direction=='short' AND net_return <= -adaptive_thresh AND ER >= er_thresh
Constraint 2: direction=='long'  AND net_return >= +adaptive_thresh AND ER >= er_thresh

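2.4 Layer 3: Spread Arbitration (PP + KPSS + OU Half-Life)

Invoked only after a Layer 1 soft block. It decides whether the spread is stationary and reverts quickly enough.

series = the latest 41 values of z4h_history (spread_lookback=40)

# Gate 1: quick ER screen
if spread_er < 0.45 -> no trend, pass
(the implementation also passes when the net spread move over the window, measured in the blocked direction, is smaller than spread_net_threshold = 1.5)

# Gate 2: PP/ADF + KPSS stationarity tests
PP preferred (robust to heteroskedasticity, requires the arch library), ADF as fallback

Four-quadrant decision matrix:
  ur_stationary + kpss_stationary -> stationarity confirmed -> Gate 3
  both non-stationary -> non-stationarity confirmed -> keep the block
  conflicting / inconclusive -> keep the block (conservative)

# Gate 3: OU half-life constraint (with Hurwicz bias correction)
theta_ols = slope of the OLS regression of DeltaX on X_{t-1}
theta_corrected = theta_ols * n / (n - 2.5)     # Hurwicz correction
half_life = -ln(2) / ln(1 + theta_corrected)     # in bars; multiply by bar_minutes/60 for hours
if half_life > min(36.0, max_hold_hours * 0.5):
    -> reverts too slowly -> keep the block
else:
    -> stationary + reasonable half-life -> overturn the block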

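2.5 Cross-Layer Information Sharing

@dataclass
class _LayerContext:
    bocpd_trend_prob: float | None   # Layer 0 -> Layer 2, 1

Information flow: Layer 0 -> ctx.bocpd_trend_prob -> Layer 2 adjusts the CUSUM thresholds
                                                  -> Layer 1 adjusts the ER threshold
Design principles:
  - Correction factors only nudge thresholds (design target: at most 10-15%); they are never a decisive switch. With the default 0.75 hard-block cutoff, the formulas in 2.2 and 2.3 give at most 5% (Layer 2) and 10% (Layer 1).
  - Each layer can be disabled and tested independently
  - ctx is valid only within a single check() call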

3. Full Implementation

3.1 Changed Files

File                                        | Change                           | Lines
src/trading/momentum_filter.py              | rewritten                        | ~600 lines
src/trading/config.py                       | new StrategyParams fields        | ~20 lines
src/trading/strategy.py                     | inject the filter in _check_entry | ~2 lines
src/trading/orchestrator.py                 | pass OHLCV through               | ~5 lines
src/services/realtime_kline_service_base.py | extract OHLCV for both legs      | ~20 lines

3.2 Full Code of momentum_filter.py

# src/trading/momentum_filter.py
"""
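Entry momentum filter v5.1

Four filter layers with cross-layer sharing:
  Layer 0: BOCPD regime detection + DFA-2 fallback            -> hard block
  Layer 2: Huber-CUSUM + lagged BTC causal factor             -> hard block
  Layer 1: ER + RS net displacement + continuous MTF weighting -> soft block
  Layer 3: PP/ADF + KPSS + OU half-life (Hurwicz)             -> arbiter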
"""

import math
import warnings
from collections import deque
from dataclasses import dataclass
from datetime import datetime

try:
    from statsmodels.tsa.stattools import adfuller as _adfuller
    _HAS_ADF = True
except ImportError:
    _HAS_ADF = False

try:
    from statsmodels.tsa.stattools import kpss as _kpss
    _HAS_KPSS = True
except ImportError:
    _HAS_KPSS = False

try:
    from arch.unitroot import PhillipsPerron as _PhillipsPerron
    _HAS_PP = True
except ImportError:
    _HAS_PP = False


# ── Helper functions ──

def _logsumexp(vals: list[float]) -> float:
    if not vals:
        return float('-inf')
    m = max(vals)
    if m == float('-inf'):
        return float('-inf')
    return m + math.log(sum(math.exp(v - m) for v in vals))


def _student_t_logpdf(x: float, mu: float, scale_sq: float, df: float) -> float:
    if scale_sq <= 1e-30 or df <= 0:
        return -50.0
    z_sq = (x - mu) ** 2 / (df * scale_sq)
    return (math.lgamma((df + 1) / 2)
            - math.lgamma(df / 2)
            - 0.5 * math.log(df * math.pi * scale_sq)
            - (df + 1) / 2 * math.log(1 + z_sq))


def _student_t_cdf(x: float, df: float) -> float:
    """Student-t CDF: normal approximation for df >= 30, otherwise the
    regularized incomplete beta function via a continued fraction
    (target accuracy better than 0.001 for df >= 2)."""
    if df <= 0:
        return 0.5
    t = x
    y = t * t

    if df >= 30:
        z = t * (1 - 1 / (4 * df)) / math.sqrt(1 + y / (2 * df))
        a1, a2, a3 = 0.4361836, -0.1201676, 0.9372980
        az = abs(z)
        if az > 8:
            return 1.0 if z > 0 else 0.0
        tt = 1.0 / (1.0 + 0.33267 * az)
        # Upper tail Q(|z|) = pdf(|z|) * polynomial. The normal pdf carries
        # the factor 1/sqrt(2*pi), which makes Q(0) = 0.5.
        tail = (math.exp(-az * az / 2) / math.sqrt(2 * math.pi)
                * tt * (a1 + tt * (a2 + tt * a3)))
        return 1.0 - tail if z > 0 else tail

    def _betacf(a: float, b: float, p: float) -> float:
        """Continued fraction of the incomplete beta function (modified Lentz)."""
        c = 1.0
        d = 1.0 - (a + b) * p / (a + 1)
        if abs(d) < 1e-30:
            d = 1e-30
        d = 1.0 / d
        f = d
        for m in range(1, 200):
            even = m * (b - m) * p / ((a + 2 * m - 1) * (a + 2 * m))
            odd = -(a + m) * (a + b + m) * p / ((a + 2 * m) * (a + 2 * m + 1))
            for num in (even, odd):
                d = 1.0 + num * d
                if abs(d) < 1e-30:
                    d = 1e-30
                c = 1.0 + num / c
                if abs(c) < 1e-30:
                    c = 1e-30
                d = 1.0 / d
                delta = d * c
                f *= delta
            if abs(delta - 1.0) < 1e-10:
                break
        return f

    # Two-sided tail: P(|T| > |t|) = I_w(df/2, 1/2) with w = df / (df + t^2).
    a = df / 2
    b = 0.5
    w = df / (df + y)
    p = y / (df + y)  # p = 1 - w, computed directly to avoid cancellation
    if p < 1e-15:
        ibeta = 1.0
    elif w < 1e-15:
        ibeta = 0.0
    else:
        log_prefix = (math.lgamma(a + b) - math.lgamma(a) - math.lgamma(b)
                      + a * math.log(w) + b * math.log(p))
        prefix = math.exp(log_prefix)
        # Use the symmetry I_w(a, b) = 1 - I_p(b, a) where it converges faster.
        if w < (a + 1) / (a + b + 2):
            ibeta = prefix * _betacf(a, b, w) / a
        else:
            ibeta = 1.0 - prefix * _betacf(b, a, p) / b

    half_ibeta = 0.5 * ibeta
    return 1.0 - half_ibeta if t >= 0 else half_ibeta


def _estimate_half_life(series: list[float], bar_minutes: float = 5.0,
                        hurwicz_correction: bool = True) -> float | None:
    """OU half-life estimate in hours (OLS with Hurwicz bias correction)."""
    n = len(series)
    if n < 10:
        return None
    y = [series[i] - series[i - 1] for i in range(1, n)]
    x = series[:-1]
    n_obs = len(y)
    x_mean = sum(x) / n_obs
    y_mean = sum(y) / n_obs
    cov_xy = sum((x[i] - x_mean) * (y[i] - y_mean) for i in range(n_obs))
    var_x = sum((x[i] - x_mean) ** 2 for i in range(n_obs))
    if var_x < 1e-12:
        return None
    theta = cov_xy / var_x
    if hurwicz_correction and n_obs > 3:
        theta = theta * n_obs / (n_obs - 2.5)
    if theta >= 0 or theta <= -1:
        return None
    half_life_bars = -math.log(2) / math.log(1 + theta)
    half_life_hours = half_life_bars * bar_minutes / 60.0
    return half_life_hours if half_life_hours > 0 else None


# ── BOCPD ──

class _BOCPD:
    """Bayesian Online Changepoint Detection (Adams & MacKay 2007).
    NIG conjugate prior, Student-t predictive, log-space, O(R) per update."""

    __slots__ = ('_log_h', '_log_1mh', '_mu0', '_kappa0', '_alpha0', '_beta0',
                 '_max_run', '_log_probs', '_suff', '_n_updates',
                 '_drift_threshold')

    def __init__(self, hazard_rate: float = 0.01, mu0: float = 0.0,
                 kappa0: float = 1.0, alpha0: float = 3.0,
                 beta0: float = 0.01, max_run: int = 200,
                 drift_threshold: float = 0.0002):
        self._log_h = math.log(max(1e-10, hazard_rate))
        self._log_1mh = math.log(max(1e-10, 1.0 - hazard_rate))
        self._mu0, self._kappa0 = mu0, kappa0
        self._alpha0, self._beta0 = alpha0, beta0
        self._max_run = max_run
        self._drift_threshold = drift_threshold
        self._log_probs: list[float] = [0.0]
        self._suff: list[tuple[float, float, float, float]] = [
            (mu0, kappa0, alpha0, beta0)
        ]
        self._n_updates = 0

    def update(self, x: float) -> None:
        n = len(self._log_probs)
        log_preds = []
        for i in range(n):
            mu, kappa, alpha, beta = self._suff[i]
            if kappa > 0 and alpha > 0 and beta > 0:
                scale_sq = beta * (kappa + 1) / (alpha * kappa)
                log_preds.append(_student_t_logpdf(x, mu, scale_sq, 2 * alpha))
            else:
                log_preds.append(-50.0)
        log_growth = [
            self._log_probs[i] + log_preds[i] + self._log_1mh for i in range(n)
        ]
        log_cp = _logsumexp([
            self._log_probs[i] + log_preds[i] + self._log_h for i in range(n)
        ])
        new_log = [log_cp] + log_growth
        log_total = _logsumexp(new_log)
        new_log = [lp - log_total for lp in new_log]
        new_suff: list[tuple[float, float, float, float]] = [
            (self._mu0, self._kappa0, self._alpha0, self._beta0)
        ]
        for i in range(n):
            mu, kappa, alpha, beta = self._suff[i]
            kappa_n = kappa + 1
            mu_n = (kappa * mu + x) / kappa_n
            alpha_n = alpha + 0.5
            beta_n = beta + kappa * (x - mu) ** 2 / (2 * kappa_n)
            new_suff.append((mu_n, kappa_n, alpha_n, beta_n))
        if len(new_log) > self._max_run:
            new_log = new_log[:self._max_run]
            new_suff = new_suff[:self._max_run]
            log_total = _logsumexp(new_log)
            new_log = [lp - log_total for lp in new_log]
        self._log_probs = new_log
        self._suff = new_suff
        self._n_updates += 1

    @property
    def trend_probability(self) -> float:
        """P(trending) = Sum_r P(r) * P(|mu_r| > delta | NIG posterior_r)."""
        delta = self._drift_threshold
        total = 0.0
        for i in range(len(self._log_probs)):
            prob = math.exp(self._log_probs[i])
            if prob < 1e-8:
                continue
            mu, kappa, alpha, beta = self._suff[i]
            if alpha > 0.5 and beta > 1e-30 and kappa > 0:
                df = 2 * alpha
                scale = math.sqrt(beta / (alpha * kappa))
                if scale < 1e-15:
                    p_trend = 1.0 if abs(mu) > delta else 0.0
                else:
                    t_upper = (delta - mu) / scale
                    t_lower = (-delta - mu) / scale
                    p_trend = (1.0 - _student_t_cdf(t_upper, df)
                               + _student_t_cdf(t_lower, df))
            else:
                p_trend = 0.5
            total += prob * p_trend
        return total

    @property
    def ready(self) -> bool:
        return self._n_updates >= 20


# ── DFA-2 fallback ──

def _hurst_dfa2(closes: list[float], r2_threshold: float = 0.85) -> tuple[float | None, float | None]:
    """DFA-2 Hurst exponent estimate. Returns (hurst, r_squared) or (None, None)."""
    n = len(closes)
    if n < 20:
        return None, None
    rets = []
    for i in range(1, n):
        if closes[i] > 0 and closes[i - 1] > 0:
            rets.append(math.log(closes[i] / closes[i - 1]))
    nr = len(rets)
    if nr < 16:
        return None, None
    mean_r = sum(rets) / nr
    profile = []
    cum = 0.0
    for r in rets:
        cum += r - mean_r
        profile.append(cum)
    scales: list[float] = []
    rms_vals: list[float] = []
    s = 4
    max_s = nr // 4
    while s <= max_s:
        n_win = nr // s
        if n_win < 2:
            break
        rss_sum = 0.0
        rss_cnt = 0
        for i in range(n_win):
            seg = profile[i * s: (i + 1) * s]
            ws = len(seg)
            if ws < 3:
                if ws >= 2:
                    x_m = (ws - 1) * 0.5
                    y_m = sum(seg) / ws
                    cov = sum((j - x_m) * (seg[j] - y_m) for j in range(ws))
                    var_x = sum((j - x_m) ** 2 for j in range(ws))
                    slope = cov / var_x if var_x > 1e-12 else 0.0
                    intercept = y_m - slope * x_m
                    rss = sum((seg[j] - (intercept + slope * j)) ** 2 for j in range(ws)) / ws
                    rss_sum += rss
                    rss_cnt += 1
                continue
            s0 = float(ws)
            s1 = ws * (ws - 1) / 2.0
            s2 = ws * (ws - 1) * (2 * ws - 1) / 6.0
            s3 = (ws * (ws - 1) / 2.0) ** 2
            s4 = ws * (ws - 1) * (2 * ws - 1) * (3 * ws * ws - 3 * ws - 1) / 30.0
            sy0 = sum(seg)
            sy1 = sum(j * seg[j] for j in range(ws))
            sy2 = sum(j * j * seg[j] for j in range(ws))
            det = (s0 * (s2 * s4 - s3 * s3)
                   - s1 * (s1 * s4 - s3 * s2)
                   + s2 * (s1 * s3 - s2 * s2))
            if abs(det) < 1e-12:
                x_m = (ws - 1) * 0.5
                y_m = sum(seg) / ws
                cov = sum((j - x_m) * (seg[j] - y_m) for j in range(ws))
                var_x = sum((j - x_m) ** 2 for j in range(ws))
                slope = cov / var_x if var_x > 1e-12 else 0.0
                intercept = y_m - slope * x_m
                rss = sum((seg[j] - (intercept + slope * j)) ** 2 for j in range(ws)) / ws
                rss_sum += rss
                rss_cnt += 1
                continue
            a = ((sy0 * (s2 * s4 - s3 * s3) - s1 * (sy1 * s4 - s3 * sy2)
                  + s2 * (sy1 * s3 - s2 * sy2)) / det)
            b = ((s0 * (sy1 * s4 - s3 * sy2) - sy0 * (s1 * s4 - s3 * s2)
                  + s2 * (s1 * sy2 - sy1 * s2)) / det)
            c = ((s0 * (s2 * sy2 - sy1 * s3) - s1 * (s1 * sy2 - sy1 * s2)
                  + sy0 * (s1 * s3 - s2 * s2)) / det)
            rss = sum((seg[j] - (a + b * j + c * j * j)) ** 2 for j in range(ws)) / ws
            rss_sum += rss
            rss_cnt += 1
        if rss_cnt >= 2:
            f_s = math.sqrt(rss_sum / rss_cnt)
            if f_s > 1e-14:
                scales.append(math.log(s))
                rms_vals.append(math.log(f_s))
        s = max(s + 1, int(s * 1.4 + 0.5))
    if len(scales) < 3:
        return None, None
    n_pts = len(scales)
    x_m = sum(scales) / n_pts
    y_m = sum(rms_vals) / n_pts
    num = sum((scales[i] - x_m) * (rms_vals[i] - y_m) for i in range(n_pts))
    den = sum((scales[i] - x_m) ** 2 for i in range(n_pts))
    if den < 1e-12:
        return None, None
    hurst = max(0.0, min(1.0, num / den))
    slope = num / den
    ss_res = sum((rms_vals[i] - (y_m + slope * (scales[i] - x_m))) ** 2 for i in range(n_pts))
    ss_tot = sum((rms_vals[i] - y_m) ** 2 for i in range(n_pts))
    r_squared = 1.0 - ss_res / ss_tot if ss_tot > 1e-12 else 0.0
    if r_squared < r2_threshold:
        return None, None
    return hurst, r_squared


# ── Helper classes ──

class _RollingMedian:
    __slots__ = ('_data',)
    def __init__(self, maxlen: int):
        self._data: deque = deque(maxlen=maxlen)
    def push(self, val: float) -> None:
        self._data.append(val)
    @property
    def value(self) -> float | None:
        if not self._data:
            return None
        s = sorted(self._data)
        n = len(s)
        mid = n >> 1
        return s[mid] if n & 1 else (s[mid - 1] + s[mid]) * 0.5
    def __len__(self) -> int:
        return len(self._data)


class _HourBarAccum:
    """Aggregates 5m bars into 1h bars."""
    __slots__ = ('count', 'open_', 'high', 'low', 'close', 'volume')
    def __init__(self):
        self.reset()
    def reset(self):
        self.count = 0
        self.open_ = self.high = self.close = self.volume = 0.0
        self.low = float('inf')
    def add(self, o: float, h: float, l: float, c: float, v: float) -> bool:
        if self.count == 0:
            self.open_ = o; self.high = h; self.low = l
        else:
            self.high = max(self.high, h); self.low = min(self.low, l)
        self.close = c; self.volume += v; self.count += 1
        return self.count >= 12
    def as_tuple(self) -> tuple:
        return (self.close, self.high, self.low, self.open_, self.volume)


@dataclass
class _LayerContext:
    bocpd_trend_prob: float | None = None
    hurst: float | None = None
    hurst_r2: float | None = None


# ── MomentumFilter main class ──

class MomentumFilter:
    """Entry momentum filter v5.1.

    Public interface:
      update(symbol, close, high, low, open_, volume, kline_time)
      check(symbol, direction) -> (allowed, reason, is_soft)
      check_spread(z4h_history, direction, max_hold_hours) -> (has_trend, reason)
      ready(symbol) -> bool
    """

    def __init__(
        self,
        enabled: bool = True,
        # Layer 0
        bocpd_enabled: bool = True,
        bocpd_hazard_rate: float = 0.01,
        bocpd_trend_threshold: float = 0.75,
        bocpd_drift_threshold: float = 0.0002,
        hurst_enabled: bool = True,
        hurst_lookback: int = 60,
        hurst_threshold: float = 0.60,
        hurst_r2_threshold: float = 0.85,
        # Layer 1
        sustained_lookback: int = 30,
        sustained_base_threshold: float = 0.008,
        er_threshold_long: float = 0.60,
        er_threshold_short: float = 0.50,
        mtf_enabled: bool = True,
        mtf_soft_lower: float = 0.30,
        mtf_soft_upper: float = 0.70,
        mtf_max_reduction: float = 0.20,
        mtf_lookback: int = 6,
        # Layer 2
        rs_period: int = 10,
        cusum_drift: float = 0.5,
        cusum_drift_adaptive: bool = True,
        cusum_z_clip: float = 5.0,
        cusum_threshold_spike_up: float = 3.5,
        cusum_threshold_spike_down: float = 2.5,
        volume_confirm_ratio: float = 1.5,
        volume_ema_period: int = 20,
        vwpm_window: int = 5,
        vwpm_confirm_ratio: float = 0.8,
        market_ref_symbol: str = "BTC",
        btc_stress_threshold: float = 2.0,
        btc_stress_factor_min: float = 0.7,
        btc_decay_rate: float = 0.3,
        btc_stress_lag: int = 3,
        # Layer 3
        spread_lookback: int = 40,
        spread_er_threshold: float = 0.45,
        spread_net_threshold: float = 1.5,
        spread_adf_pvalue: float = 0.05,
        spread_kpss_pvalue: float = 0.05,
        spread_use_pp: bool = True,
        spread_half_life_max: float = 36.0,
        spread_hurwicz_correction: bool = True,
        # Cross-layer
        cross_layer_enabled: bool = True,
        cross_layer_factor: float = 0.4,
    ):
        self._enabled = enabled

        self._bocpd_enabled = bocpd_enabled
        self._bocpd_hazard = bocpd_hazard_rate
        self._bocpd_trend_thresh = bocpd_trend_threshold
        self._bocpd_drift_thresh = bocpd_drift_threshold
        self._hurst_enabled = hurst_enabled
        self._hurst_lookback = max(20, hurst_lookback)
        self._hurst_thresh = hurst_threshold
        self._hurst_r2_thresh = hurst_r2_threshold

        self._sustained_n = max(5, sustained_lookback)
        self._sustained_base_thresh = sustained_base_threshold
        self._er_thresh_long = er_threshold_long
        self._er_thresh_short = er_threshold_short
        self._mtf_enabled = mtf_enabled
        self._mtf_soft_lower = mtf_soft_lower
        self._mtf_soft_upper = mtf_soft_upper
        self._mtf_max_reduction = mtf_max_reduction
        self._mtf_lookback = mtf_lookback

        self._rs_period = rs_period
        self._cusum_drift = cusum_drift
        self._cusum_drift_adaptive = cusum_drift_adaptive
        self._cusum_z_clip = cusum_z_clip
        self._cusum_thresh_up = cusum_threshold_spike_up
        self._cusum_thresh_down = cusum_threshold_spike_down
        self._vol_confirm_ratio = volume_confirm_ratio
        self._vol_ema_period = volume_ema_period
        self._vwpm_window = vwpm_window
        self._vwpm_confirm_ratio = vwpm_confirm_ratio
        self._market_ref = market_ref_symbol
        self._btc_stress_thresh = btc_stress_threshold
        self._btc_stress_factor_min = btc_stress_factor_min
        self._btc_decay_rate = btc_decay_rate
        self._btc_stress_lag = max(1, btc_stress_lag)

        self._spread_lookback = spread_lookback
        self._spread_er_thresh = spread_er_threshold
        self._spread_net_thresh = spread_net_threshold
        self._spread_adf_pvalue = spread_adf_pvalue
        self._spread_kpss_pvalue = spread_kpss_pvalue
        self._spread_use_pp = spread_use_pp
        self._spread_hl_max = spread_half_life_max
        self._spread_hurwicz = spread_hurwicz_correction

        self._cross_layer = cross_layer_enabled
        self._cross_factor = cross_layer_factor

        buf_size = max(hurst_lookback + 5, sustained_lookback + 5,
                       rs_period + 10, volume_ema_period + 5,
                       vwpm_window + 2) + 10
        self._buf_size = buf_size
        self._buffers: dict[str, deque] = {}
        self._last_kline_time: dict[str, datetime] = {}
        self._cusum_state: dict[str, tuple[float, float]] = {}
        self._rs_ema: dict[str, float] = {}
        self._vol_ema: dict[str, float] = {}
        self._baseline_median: dict[str, _RollingMedian] = {}
        self._bocpd_state: dict[str, _BOCPD] = {}
        self._buffers_1h: dict[str, deque] = {}
        self._hour_accum: dict[str, _HourBarAccum] = {}
        self._btc_stress_history: dict[str, deque] = {}

    # ── Public interface ──

    def update(self, symbol: str, close: float, high: float = 0.0,
               low: float = 0.0, open_: float = 0.0, volume: float = 0.0,
               kline_time: datetime | None = None) -> None:
        if kline_time is not None:
            if self._last_kline_time.get(symbol) == kline_time:
                return
            self._last_kline_time[symbol] = kline_time
        if symbol not in self._buffers:
            self._buffers[symbol] = deque(maxlen=self._buf_size)
        self._buffers[symbol].append((close, high, low, open_, volume))
        buf = self._buffers[symbol]
        if len(buf) >= 2:
            self._online_update(symbol, buf)

    def check(self, symbol: str, direction: str) -> tuple[bool, str, bool]:
        """Runs Layer 0 -> Layer 2 -> Layer 1. Returns (allowed, reason, is_soft)."""
        if not self._enabled:
            return True, "", False
        buf = self._buffers.get(symbol)
        if buf is None or len(buf) < self._rs_period + 2:
            return True, "", False

        ctx = _LayerContext()

        ok, reason = self._check_regime(symbol, ctx)
        if not ok:
            return False, reason, False

        ok, reason = self._check_spike(symbol, direction, ctx)
        if not ok:
            return False, reason, False

        if len(buf) >= self._sustained_n + 2:
            ok, reason = self._check_sustained(symbol, direction, ctx)
            if not ok:
                return False, reason, True

        return True, "", False

    def check_spread(self, z4h_history: list[float], direction: str,
                     max_hold_hours: float = 72.0) -> tuple[bool, str]:
        """Layer 3 arbitration. Called only after a Layer 1 soft block."""
        n = self._spread_lookback
        if len(z4h_history) < n + 1:
            return False, ""
        series = z4h_history[-(n + 1):]

        # Gate 1: ER
        spread_dir = abs(series[-1] - series[0])
        spread_path = sum(abs(series[i] - series[i - 1]) for i in range(1, len(series)))
        er = spread_dir / spread_path if spread_path > 1e-10 else 0.0
        spread_net = series[-1] - series[0]
        dir_ok = (spread_net <= -self._spread_net_thresh if direction == 'short'
                  else spread_net >= self._spread_net_thresh)
        if not (er >= self._spread_er_thresh and dir_ok):
            return False, ""

        # Gate 2: PP/ADF + KPSS
        ur_p = kpss_p = None
        ur_name = "ER-only"
        if len(series) >= 15:
            if self._spread_use_pp and _HAS_PP:
                try:
                    import numpy as _np
                    ur_p = float(_PhillipsPerron(_np.array(series, dtype=float)).pvalue)
                    ur_name = "PP"
                except Exception:
                    pass
            if ur_p is None and _HAS_ADF:
                try:
                    ur_p = _adfuller(series, maxlag=2, regression='c', autolag=None)[1]
                    ur_name = "ADF"
                except Exception:
                    pass
            if _HAS_KPSS:
                try:
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore")
                        kpss_p = _kpss(series, regression='c', nlags='auto')[1]
                except Exception:
                    pass

        # Four quadrants + half-life
        if ur_p is not None and kpss_p is not None:
            ur_stat = ur_p < self._spread_adf_pvalue
            kpss_stat = kpss_p > self._spread_kpss_pvalue
            if ur_stat and kpss_stat:
                hl = _estimate_half_life(series, hurwicz_correction=self._spread_hurwicz)
                hl_max = min(self._spread_hl_max, max_hold_hours * 0.5)
                if hl is not None and hl > hl_max:
                    return True, (f"stationary but half-life too long: {ur_name}_p={ur_p:.3f}"
                                  f" KPSS_p={kpss_p:.3f} HL={hl:.1f}h>{hl_max:.0f}h")
                hl_s = f" HL={hl:.1f}h" if hl else " HL=N/A"
                return False, (f"{ur_name}+KPSS confirm stationarity ({ur_name}_p={ur_p:.3f}"
                               f" KPSS_p={kpss_p:.3f}{hl_s}) -> block overturned")
            if not ur_stat and not kpss_stat:
                return True, f"Spread trend (triple confirmation): ER={er:.2f} {ur_name}_p={ur_p:.3f} KPSS_p={kpss_p:.3f}"
            return True, f"Spread trend (tests disagree): ER={er:.2f} {ur_name}_p={ur_p:.3f} KPSS_p={kpss_p:.3f} -> conservative block"

        if ur_p is not None:
            if ur_p < self._spread_adf_pvalue:
                hl = _estimate_half_life(series, hurwicz_correction=self._spread_hurwicz)
                hl_max = min(self._spread_hl_max, max_hold_hours * 0.5)
                if hl is not None and hl > hl_max:
                    return True, f"stationary but HL too long: {ur_name}_p={ur_p:.3f} HL={hl:.1f}h"
                return False, f"{ur_name} indicates stationarity (p={ur_p:.3f}) -> block overturned"
            return True, f"Spread trend ({ur_name}+ER): ER={er:.2f} p={ur_p:.3f}"

        return True, f"Spread trend (ER): ER={er:.2f} net move={spread_net:+.3f}"

    def ready(self, symbol: str) -> bool:
        buf = self._buffers.get(symbol)
        return buf is not None and len(buf) >= self._rs_period + 2

    # ── Layer 0 ──

    def _check_regime(self, symbol: str, ctx: _LayerContext) -> tuple[bool, str]:
        if self._bocpd_enabled:
            bocpd = self._bocpd_state.get(symbol)
            if bocpd and bocpd.ready:
                trend_prob = bocpd.trend_probability
                ctx.bocpd_trend_prob = trend_prob
                if trend_prob > self._bocpd_trend_thresh:
                    return False, (f"Layer0-BOCPD trending regime: P(trending)={trend_prob:.3f}"
                                   f">{self._bocpd_trend_thresh}")
                return True, ""
        if not self._hurst_enabled:
            return True, ""
        buf = self._buffers.get(symbol)
        if buf is None or len(buf) < self._hurst_lookback + 1:
            return True, ""
        closes = [d[0] for d in list(buf)[-(self._hurst_lookback + 1):]]
        h, r2 = _hurst_dfa2(closes, r2_threshold=self._hurst_r2_thresh)
        if h is not None:
            ctx.hurst = h; ctx.hurst_r2 = r2
        if h is None:
            return True, ""
        if h > self._hurst_thresh:
            return False, (f"Layer0-DFA2 trending regime: Hurst={h:.3f}>{self._hurst_thresh}"
                           f" R2={r2:.3f} (BOCPD warming up)")
        return True, ""

    # ── Layer 1 ──

    def _check_sustained(self, symbol: str, direction: str,
                         ctx: _LayerContext) -> tuple[bool, str]:
        buf = self._buffers[symbol]
        n = self._sustained_n
        data = list(buf)[-(n + 1):]
        closes = [d[0] for d in data]
        if len(closes) < n + 1:
            return True, ""
        ref = closes[0]
        if ref <= 0:
            return True, ""
        net_return = (closes[-1] - ref) / ref
        direction_dist = abs(closes[-1] - ref)
        path_length = sum(abs(closes[i] - closes[i - 1]) for i in range(1, len(closes)))
        er = direction_dist / path_length if path_length > 1e-10 else 0.0
        er_thresh = self._er_thresh_long if direction == 'long' else self._er_thresh_short

        # Continuous MTF weighting
        mtf_er = None
        if self._mtf_enabled:
            mtf_er = self._calc_mtf_er(symbol)
            if mtf_er is not None:
                span = self._mtf_soft_upper - self._mtf_soft_lower
                if span > 1e-10:
                    mtf_w = max(0.0, min(1.0, (mtf_er - self._mtf_soft_lower) / span))
                else:
                    mtf_w = 1.0 if mtf_er >= self._mtf_soft_upper else 0.0
                er_thresh *= 1.0 - self._mtf_max_reduction * mtf_w

        # Cross-layer BOCPD adjustment
        if self._cross_layer and ctx.bocpd_trend_prob is not None:
            if ctx.bocpd_trend_prob > 0.5:
                er_thresh *= 1.0 - self._cross_factor * (ctx.bocpd_trend_prob - 0.5)

        if er < er_thresh:
            return True, ""

        # RS-adaptive threshold
        rs_vol = math.sqrt(max(0.0, self._rs_ema.get(symbol, 0.0)))
        baseline = self._baseline_median.get(symbol)
        bl_val = baseline.value if (baseline and len(baseline) >= 5) else None
        if rs_vol > 0 and bl_val and bl_val > 0:
            adaptive_thresh = self._sustained_base_thresh * max(0.3, min(3.0, rs_vol / bl_val))
        else:
            adaptive_thresh = self._sustained_base_thresh

        mtf_str = f" [MTF-1h-ER={mtf_er:.2f}]" if mtf_er is not None and mtf_er > self._mtf_soft_lower else ""

        if direction == 'short' and net_return <= -adaptive_thresh:
            return False, (f"Layer1-no chasing the drop: net decline={net_return:.2%} threshold=-{adaptive_thresh:.2%}"
                           f" ER={er:.2f}>={er_thresh:.2f}{mtf_str}")
        if direction == 'long' and net_return >= adaptive_thresh:
            return False, (f"Layer1-no chasing the rally: net gain={net_return:.2%} threshold=+{adaptive_thresh:.2%}"
                           f" ER={er:.2f}>={er_thresh:.2f}{mtf_str}")
        return True, ""

    def _calc_mtf_er(self, symbol: str) -> float | None:
        buf_1h = self._buffers_1h.get(symbol)
        if buf_1h is None or len(buf_1h) < self._mtf_lookback + 1:
            return None
        data = list(buf_1h)[-(self._mtf_lookback + 1):]
        closes = [d[0] for d in data]
        d_dist = abs(closes[-1] - closes[0])
        p_len = sum(abs(closes[i] - closes[i - 1]) for i in range(1, len(closes)))
        return d_dist / p_len if p_len > 1e-10 else 0.0

    # ── Layer 2 ──

    def _check_spike(self, symbol: str, direction: str,
                     ctx: _LayerContext) -> tuple[bool, str]:
        s_pos, s_neg = self._cusum_state.get(symbol, (0.0, 0.0))
        eff_up = self._cusum_thresh_up
        eff_down = self._cusum_thresh_down

        # BTC lagged causal factor: recent BTC stress lowers both CUSUM thresholds
        btc_str = ""
        if symbol != self._market_ref:
            btc_hist = self._btc_stress_history.get(self._market_ref)
            # max(1, ...) guards lag == 0, where [-0:] would select the whole history
            lag = max(1, self._btc_stress_lag)
            btc_stress = max(list(btc_hist)[-lag:]) if btc_hist else 0.0
            if btc_stress > self._btc_stress_thresh:
                excess = btc_stress - self._btc_stress_thresh
                sf = max(self._btc_stress_factor_min, math.exp(-self._btc_decay_rate * excess))
                eff_up *= sf
                eff_down *= sf
                btc_str = f" BTC(lag={self._btc_stress_lag})->x{sf:.2f}"

        # Cross-layer adjustment: a high BOCPD trend probability lowers both thresholds
        if self._cross_layer and ctx.bocpd_trend_prob is not None and ctx.bocpd_trend_prob > 0.5:
            adj = 1.0 - 0.2 * (ctx.bocpd_trend_prob - 0.5)
            eff_up *= adj
            eff_down *= adj

        vol_ok, vol_r, vwpm = self._check_volume(symbol)
        rs_vol = math.sqrt(max(0.0, self._rs_ema.get(symbol, 0.0)))
        info = f" RS={rs_vol:.4f}" + (f" vol_ratio={vol_r:.1f}" if vol_r > 0 else "") + (f" VWPM={vwpm:.4f}" if vwpm else "")

        if direction == 'short' and s_pos >= eff_up and vol_ok:
            return False, f"Layer2-no-short-after-spike-up: CUSUM+={s_pos:.2f}>={eff_up:.2f}{info}{btc_str}"
        if direction == 'long' and s_neg >= eff_down and vol_ok:
            return False, f"Layer2-no-long-after-spike-down: CUSUM-={s_neg:.2f}>={eff_down:.2f}{info}{btc_str}"
        return True, ""

    def _check_volume(self, symbol: str) -> tuple[bool, float, float]:
        buf = self._buffers.get(symbol)
        if buf is None or len(buf) < 2:
            return True, 0.0, 0.0
        current_vol = buf[-1][4]
        vol_ema = self._vol_ema.get(symbol, 0.0)
        if vol_ema <= 0 or current_vol <= 0:
            return True, 0.0, 0.0
        vol_ratio = current_vol / vol_ema
        vwpm_val = 0.0
        w = min(self._vwpm_window, len(buf) - 1)
        if w >= 1:
            data = list(buf)[-(w + 1):]
            weighted_sum = total_vol = 0.0
            for i in range(1, len(data)):
                pc, cc, v = data[i-1][0], data[i][0], data[i][4]
                if pc > 0 and v > 0:
                    weighted_sum += (cc - pc) / pc * v
                    total_vol += v
            if total_vol > 0:
                vwpm_val = weighted_sum / total_vol
        rs_vol = math.sqrt(max(0.0, self._rs_ema.get(symbol, 0.0)))
        if rs_vol > 1e-10 and vwpm_val != 0.0:
            confirmed = (vol_ratio >= self._vol_confirm_ratio
                         and abs(vwpm_val) / rs_vol >= self._vwpm_confirm_ratio)
        else:
            confirmed = vol_ratio >= self._vol_confirm_ratio
        return confirmed, vol_ratio, vwpm_val

    # ── Online update ──

    def _online_update(self, symbol: str, buf: deque) -> None:
        curr = buf[-1]
        prev = buf[-2]
        close, high, low, open_, volume = curr
        prev_close = prev[0]
        if prev_close <= 0 or close <= 0:
            return

        # RS EMA
        rs_val = self._calc_rs_single(high, low, open_, close)
        alpha_rs = 2.0 / (self._rs_period + 1)
        prev_rs = self._rs_ema.get(symbol, rs_val)
        new_rs = alpha_rs * rs_val + (1.0 - alpha_rs) * prev_rs
        self._rs_ema[symbol] = new_rs

        # Baseline volatility: rolling median of RS volatility
        rs_vol = math.sqrt(max(0.0, new_rs))
        if symbol not in self._baseline_median:
            self._baseline_median[symbol] = _RollingMedian(maxlen=200)
        self._baseline_median[symbol].push(rs_vol)

        # Huber-CUSUM: standardize the return by RS volatility, then clip at +/- z_clip
        if rs_vol > 1e-10:
            z_raw = (close - prev_close) / prev_close / rs_vol
            z = math.copysign(min(abs(z_raw), self._cusum_z_clip), z_raw)
            drift = self._cusum_drift
            if self._cusum_drift_adaptive:
                bl = self._baseline_median.get(symbol)
                bl_val = bl.value if (bl and len(bl) >= 5) else None
                if bl_val and bl_val > 0:
                    drift = self._cusum_drift * max(0.5, min(2.0, rs_vol / bl_val))
            s_pos, s_neg = self._cusum_state.get(symbol, (0.0, 0.0))
            self._cusum_state[symbol] = (max(0.0, s_pos + z - drift),
                                         max(0.0, s_neg - z - drift))

        # BTC stress history
        if symbol not in self._btc_stress_history:
            self._btc_stress_history[symbol] = deque(maxlen=max(10, self._btc_stress_lag + 2))
        sp, sn = self._cusum_state.get(symbol, (0.0, 0.0))
        self._btc_stress_history[symbol].append(max(sp, sn))

        # Volume EMA
        if volume > 0:
            alpha_v = 2.0 / (self._vol_ema_period + 1)
            self._vol_ema[symbol] = alpha_v * volume + (1.0 - alpha_v) * self._vol_ema.get(symbol, volume)

        # BOCPD
        if self._bocpd_enabled and prev_close > 0:
            log_ret = math.log(close / prev_close)
            if symbol not in self._bocpd_state:
                self._bocpd_state[symbol] = _BOCPD(
                    hazard_rate=self._bocpd_hazard, max_run=200,
                    alpha0=3.0, drift_threshold=self._bocpd_drift_thresh)
            self._bocpd_state[symbol].update(log_ret)

        # 1h aggregation
        if self._mtf_enabled:
            if symbol not in self._hour_accum:
                self._hour_accum[symbol] = _HourBarAccum()
            accum = self._hour_accum[symbol]
            # Fall back to close when an OHLC field is missing (non-positive)
            o = open_ if open_ > 0 else close
            h = high if high > 0 else close
            lo = low if low > 0 else close
            if accum.add(o, h, lo, close, volume):
                if symbol not in self._buffers_1h:
                    self._buffers_1h[symbol] = deque(maxlen=30)
                self._buffers_1h[symbol].append(accum.as_tuple())
                accum.reset()

    @staticmethod
    def _calc_rs_single(high: float, low: float, open_: float, close: float) -> float:
        if high > 0 and low > 0 and open_ > 0 and high >= low:
            try:
                rs = (math.log(high / close) * math.log(high / open_)
                      + math.log(low / close) * math.log(low / open_))
                return max(0.0, rs)
            except (ValueError, ZeroDivisionError):
                pass
        if open_ > 0:
            try:
                return math.log(close / open_) ** 2
            except (ValueError, ZeroDivisionError):
                pass
        return 0.0
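
The Huber-CUSUM recursion inside _online_update can be tried in isolation. The sketch below restates it as a standalone function under stated assumptions: the name huber_cusum and the sample z-scores are hypothetical, the drift is held fixed (no adaptive scaling), and the inputs are taken to be returns already standardized by RS volatility.

```python
def huber_cusum(z_scores, drift=0.5, z_clip=5.0):
    """Two-sided CUSUM over standardized returns with Huber-style clipping.

    Hypothetical helper for illustration; drift and z_clip mirror the
    document's defaults (momentum_cusum_drift, momentum_cusum_z_clip).
    """
    s_pos = s_neg = 0.0
    path = []
    for z_raw in z_scores:
        # Clip so that a single outlier bar cannot dominate the cumulative sum.
        z = max(-z_clip, min(z_clip, z_raw))
        # Each side accumulates evidence beyond the drift allowance, floored at 0.
        s_pos = max(0.0, s_pos + z - drift)
        s_neg = max(0.0, s_neg - z - drift)
        path.append((s_pos, s_neg))
    return path


# Made-up z-scores: quiet bars, then an accelerating move up with one outlier.
path = huber_cusum([0.2, -0.1, 1.5, 2.0, 9.0])
spike_up_threshold = 3.5  # default momentum_cusum_threshold_spike_up
first_hit = next((i for i, (sp, _) in enumerate(path) if sp >= spike_up_threshold), None)
```

With these inputs the upward sum stays below 3.5 until the last bar, where the clipped outlier pushes it over the threshold, so first_hit is the final index.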

3.3 New StrategyParams fields

@dataclass(frozen=True)
class StrategyParams:
    # ... existing fields ...

    # Momentum filter
    momentum_filter_enabled: bool = True
    # Layer 0
    momentum_bocpd_enabled: bool = True
    momentum_bocpd_hazard_rate: float = 0.01
    momentum_bocpd_trend_threshold: float = 0.75
    momentum_bocpd_drift_threshold: float = 0.0002
    momentum_hurst_enabled: bool = True
    momentum_hurst_lookback: int = 60
    momentum_hurst_threshold: float = 0.60
    momentum_hurst_r2_threshold: float = 0.85
    # Layer 1
    momentum_sustained_lookback: int = 30
    momentum_sustained_base_threshold: float = 0.008
    momentum_er_threshold_long: float = 0.60
    momentum_er_threshold_short: float = 0.50
    momentum_mtf_enabled: bool = True
    momentum_mtf_soft_lower: float = 0.30
    momentum_mtf_soft_upper: float = 0.70
    momentum_mtf_max_reduction: float = 0.20
    momentum_mtf_lookback: int = 6
    # Layer 2
    momentum_rs_period: int = 10
    momentum_cusum_drift: float = 0.5
    momentum_cusum_drift_adaptive: bool = True
    momentum_cusum_z_clip: float = 5.0
    momentum_cusum_threshold_spike_up: float = 3.5
    momentum_cusum_threshold_spike_down: float = 2.5
    momentum_volume_confirm_ratio: float = 1.5
    momentum_volume_ema_period: int = 20
    momentum_vwpm_window: int = 5
    momentum_vwpm_confirm_ratio: float = 0.8
    momentum_market_ref_symbol: str = "BTC"
    momentum_btc_stress_threshold: float = 2.0
    momentum_btc_stress_factor_min: float = 0.7
    momentum_btc_decay_rate: float = 0.3
    momentum_btc_stress_lag: int = 3
    # Layer 3
    momentum_spread_lookback: int = 40
    momentum_spread_er_threshold: float = 0.45
    momentum_spread_net_threshold: float = 1.5
    momentum_spread_adf_pvalue: float = 0.05
    momentum_spread_kpss_pvalue: float = 0.05
    momentum_spread_use_pp: bool = True
    momentum_spread_half_life_max: float = 36.0
    momentum_spread_hurwicz_correction: bool = True
    # Cross-layer
    momentum_cross_layer_enabled: bool = True
    momentum_cross_layer_factor: float = 0.4

3.4 strategy.py integration

The Layer 3 arbitration call inside _check_entry:

spread_trend, spread_reason = self._momentum_filter.check_spread(
    z4h_hist, direction, max_hold_hours=params.max_hold_hours,
)

4. Data flow

WebSocket candles (OHLCV)
  -> realtime_kline_service_base._trigger_strategy_if_ready()
    -> extract alt/base OHLCV
    -> TradingOrchestrator.process_analysis(alt_ohlcv, base_ohlcv)
      -> AdaptiveBollingerStrategy.process_tick(alt_ohlcv, base_ohlcv)
        -> MomentumFilter.update()   O(1) online update
        -> _check_entry()
          -> Steps 1-4: cooldown / breakout / position / z4h
          -> Step 4.5: direction decision
          -> Step 4.6: four-layer momentum filter
          -> Step 5: EntrySignal

Thread safety: all access to MomentumFilter is guarded by AdaptiveBollingerStrategy._lock.
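
A minimal sketch of that locking arrangement, assuming the strategy takes one lock around every call into the filter. Strategy and CountingFilter are hypothetical stand-ins for AdaptiveBollingerStrategy and MomentumFilter, and the update signature is invented for illustration.

```python
import threading


class CountingFilter:
    """Hypothetical stand-in for MomentumFilter: plain mutable state, no lock."""

    def __init__(self):
        self.updates = 0

    def update(self, bar):
        self.updates += 1


class Strategy:
    """Hypothetical stand-in for AdaptiveBollingerStrategy."""

    def __init__(self):
        self._lock = threading.Lock()
        self._momentum_filter = CountingFilter()

    def process_tick(self, bar):
        # The strategy-level lock serializes every touch of the filter.
        with self._lock:
            self._momentum_filter.update(bar)


def worker(strategy, n):
    for i in range(n):
        strategy.process_tick(i)


strategy = Strategy()
threads = [threading.Thread(target=worker, args=(strategy, 1000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Keeping the lock in the caller means the filter's update and any later entry check can share one critical section.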


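5. Parameter configuration

5.1 Tuning key parameters

Layer 0 BOCPD:

Parameter | Default | Range | Description
bocpd_hazard_rate | 0.01 | 0.005-0.05 | Prior probability of a change point per bar. 0.01 = expected regime length of 100 bars (8.3 h)
bocpd_trend_threshold | 0.75 | 0.60-0.90 | P(trending) above this value -> hard block
bocpd_drift_threshold | 0.0002 | 0.0001-0.0005 | Smallest economically meaningful drift (5-min return)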
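
The link between bocpd_hazard_rate and regime length follows from the constant-hazard prior: the run length is geometric with mean 1/H bars. The helper below is a hypothetical illustration that assumes 5-minute bars and converts the ends of the suggested range into hours.

```python
def expected_run_length(hazard_rate, bar_minutes=5):
    """Mean run length implied by a constant hazard rate, in bars and hours.

    Hypothetical helper; bar_minutes=5 is an assumption taken from the
    document's 5-minute bar convention.
    """
    bars = 1.0 / hazard_rate
    return bars, bars * bar_minutes / 60.0


# Default and both ends of the suggested tuning range.
table = {h: expected_run_length(h) for h in (0.005, 0.01, 0.05)}
for h, (bars, hours) in table.items():
    print(f"H={h}: {bars:.0f} bars, {hours:.1f} h")
```

A lower hazard rate therefore makes BOCPD expect longer regimes and react more slowly to a change point.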

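Layer 1 MTF:

Parameter | Default | Range | Description
mtf_soft_lower | 0.30 | 0.20-0.40 | No MTF adjustment below this value
mtf_soft_upper | 0.70 | 0.55-0.80 | Full MTF adjustment above this value
mtf_max_reduction | 0.20 | 0.10-0.30 | Maximum fractional reduction of the ER threshold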
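
The three MTF parameters combine into a linear ramp: the weight rises from 0 at mtf_soft_lower to 1 at mtf_soft_upper, and the ER threshold is cut by up to mtf_max_reduction. The standalone function below mirrors the weighting in the Layer 1 check; its name and the sample inputs are hypothetical.

```python
def mtf_adjusted_er_threshold(er_thresh, mtf_er,
                              soft_lower=0.30, soft_upper=0.70,
                              max_reduction=0.20):
    """Scale an ER threshold down as the 1h efficiency ratio strengthens.

    Hypothetical helper for illustration; defaults follow the table above.
    """
    span = soft_upper - soft_lower
    if span > 1e-10:
        # Linear ramp clamped to [0, 1].
        weight = max(0.0, min(1.0, (mtf_er - soft_lower) / span))
    else:
        # Degenerate range: fall back to a hard step at soft_upper.
        weight = 1.0 if mtf_er >= soft_upper else 0.0
    return er_thresh * (1.0 - max_reduction * weight)


base = 0.60  # default momentum_er_threshold_long
adjusted = {er: mtf_adjusted_er_threshold(base, er) for er in (0.20, 0.50, 0.90)}
```

For a base threshold of 0.60, a 1h ER of 0.20 leaves it unchanged, 0.50 gives roughly 0.54, and 0.90 gives roughly 0.48.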

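Layer 2 CUSUM + BTC:

Parameter | Default | Range | Description
cusum_z_clip | 5.0 | 3.0-8.0 | Huber clipping point. Smaller values are more robust but may weaken the signal
btc_stress_lag | 3 | 1-10 | BTC stress lag in bars (x 5 min). 3 = 15-minute causal delay
btc_decay_rate | 0.3 | 0.1-0.5 | Exponential decay rate of the BTC factor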
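
To see how btc_decay_rate interacts with the stress threshold and the floor, the scale factor applied to the CUSUM thresholds can be computed on its own. The function name is hypothetical; the formula and defaults follow the Layer 2 code and StrategyParams.

```python
import math


def btc_stress_scale(btc_stress, thresh=2.0, decay_rate=0.3, factor_min=0.7):
    """Multiplier applied to both CUSUM thresholds for a given BTC stress level.

    Hypothetical helper for illustration. Returns 1.0 when BTC stress is at
    or below the threshold, then decays exponentially down to factor_min.
    """
    if btc_stress <= thresh:
        return 1.0
    excess = btc_stress - thresh
    return max(factor_min, math.exp(-decay_rate * excess))


scales = {s: btc_stress_scale(s) for s in (1.0, 3.0, 4.0)}
# Excess stress at which the floor is reached: exp(-rate * x) == factor_min.
saturation_excess = -math.log(0.7) / 0.3
```

With the defaults the floor of 0.7 is reached at an excess of about 1.19, so any BTC stress above roughly 3.19 applies the maximum tightening.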

Layer 3 Spread:

Parameter | Default | Range | Description
spread_lookback | 40 | 20-80 | Spread lookback in bars. 40 = 200 minutes
spread_half_life_max | 36.0 | 12-48 | Maximum allowed half-life (h). Suggested: max_hold_hours/2
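
One common way to obtain the half-life that spread_half_life_max is compared against is an AR(1) fit of the spread. The sketch below is that textbook estimator, not necessarily the one used by check_spread: the function name is hypothetical, the fit is plain least squares with no Hurwicz correction, and the conversion to hours assumes 5-minute bars.

```python
import math


def ou_half_life(series):
    """Half-life in bars from the fit x[t+1] - x[t] = a + b * x[t] + e.

    Hypothetical helper; returns None when the fit shows no mean reversion.
    """
    x = series[:-1]
    dx = [series[i + 1] - series[i] for i in range(len(series) - 1)]
    n = len(x)
    if n < 3:
        return None
    mean_x = sum(x) / n
    mean_dx = sum(dx) / n
    sxx = sum((v - mean_x) ** 2 for v in x)
    if sxx <= 1e-12:
        return None
    b = sum((xv - mean_x) * (dv - mean_dx) for xv, dv in zip(x, dx)) / sxx
    phi = 1.0 + b  # implied AR(1) coefficient
    if not 0.0 < phi < 1.0:
        return None  # unit root, explosive, or oscillating: half-life undefined
    return -math.log(2.0) / math.log(phi)


# Synthetic spread decaying by 10% per bar, so phi is 0.9 by construction.
spread = [0.9 ** t for t in range(40)]
hl_bars = ou_half_life(spread)
hl_hours = hl_bars * 5 / 60.0          # assumes 5-minute bars
within_limit = hl_hours <= 36.0        # default spread_half_life_max
trending = ou_half_life([0.0, 1.0, 2.0, 3.0, 4.0])
```

With only 40 observations the estimate of phi is biased, which is the small-sample problem the Hurwicz correction flag is meant to address.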

5.2 Environment variables

TRADING_MOMENTUM_BOCPD_ENABLED=true
TRADING_MOMENTUM_BOCPD_HAZARD_RATE=0.01
TRADING_MOMENTUM_BOCPD_TREND_THRESHOLD=0.75
TRADING_MOMENTUM_BOCPD_DRIFT_THRESHOLD=0.0002
TRADING_MOMENTUM_HURST_ENABLED=true
TRADING_MOMENTUM_HURST_LOOKBACK=60
TRADING_MOMENTUM_HURST_THRESHOLD=0.60
TRADING_MOMENTUM_HURST_R2_THRESHOLD=0.85
TRADING_MOMENTUM_SUSTAINED_LOOKBACK=30
TRADING_MOMENTUM_SUSTAINED_BASE_THRESHOLD=0.008
TRADING_MOMENTUM_ER_THRESHOLD_LONG=0.60
TRADING_MOMENTUM_ER_THRESHOLD_SHORT=0.50
TRADING_MOMENTUM_MTF_ENABLED=true
TRADING_MOMENTUM_MTF_SOFT_LOWER=0.30
TRADING_MOMENTUM_MTF_SOFT_UPPER=0.70
TRADING_MOMENTUM_MTF_MAX_REDUCTION=0.20
TRADING_MOMENTUM_MTF_LOOKBACK=6
TRADING_MOMENTUM_RS_PERIOD=10
TRADING_MOMENTUM_CUSUM_DRIFT=0.5
TRADING_MOMENTUM_CUSUM_DRIFT_ADAPTIVE=true
TRADING_MOMENTUM_CUSUM_Z_CLIP=5.0
TRADING_MOMENTUM_CUSUM_THRESHOLD_SPIKE_UP=3.5
TRADING_MOMENTUM_CUSUM_THRESHOLD_SPIKE_DOWN=2.5
TRADING_MOMENTUM_VOLUME_CONFIRM_RATIO=1.5
TRADING_MOMENTUM_VWPM_WINDOW=5
TRADING_MOMENTUM_VWPM_CONFIRM_RATIO=0.8
TRADING_MOMENTUM_MARKET_REF_SYMBOL=BTC
TRADING_MOMENTUM_BTC_STRESS_THRESHOLD=2.0
TRADING_MOMENTUM_BTC_STRESS_FACTOR_MIN=0.7
TRADING_MOMENTUM_BTC_DECAY_RATE=0.3
TRADING_MOMENTUM_BTC_STRESS_LAG=3
TRADING_MOMENTUM_SPREAD_LOOKBACK=40
TRADING_MOMENTUM_SPREAD_ER_THRESHOLD=0.45
TRADING_MOMENTUM_SPREAD_NET_THRESHOLD=1.5
TRADING_MOMENTUM_SPREAD_ADF_PVALUE=0.05
TRADING_MOMENTUM_SPREAD_KPSS_PVALUE=0.05
TRADING_MOMENTUM_SPREAD_USE_PP=true
TRADING_MOMENTUM_SPREAD_HALF_LIFE_MAX=36.0
TRADING_MOMENTUM_SPREAD_HURWICZ_CORRECTION=true
TRADING_MOMENTUM_CROSS_LAYER_ENABLED=true
TRADING_MOMENTUM_CROSS_LAYER_FACTOR=0.4
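
Each variable name is the corresponding StrategyParams field in upper case with a TRADING_ prefix. A loader could rely on that convention as sketched below; MomentumParams is a hypothetical four-field subset, load_from_env is not part of the documented code, and the accepted boolean spellings are an assumption.

```python
import os
from dataclasses import dataclass, fields, replace


@dataclass(frozen=True)
class MomentumParams:
    """Hypothetical subset of StrategyParams, for illustration only."""
    momentum_bocpd_enabled: bool = True
    momentum_bocpd_hazard_rate: float = 0.01
    momentum_mtf_lookback: int = 6
    momentum_market_ref_symbol: str = "BTC"


def load_from_env(defaults, environ=None, prefix="TRADING_"):
    """Return a copy of `defaults` with fields overridden from the environment."""
    environ = os.environ if environ is None else environ
    overrides = {}
    for f in fields(defaults):
        raw = environ.get(prefix + f.name.upper())
        if raw is None:
            continue
        current = getattr(defaults, f.name)
        # bool must be tested before int because bool is a subclass of int.
        if isinstance(current, bool):
            overrides[f.name] = raw.strip().lower() in ("1", "true", "yes", "on")
        elif isinstance(current, int):
            overrides[f.name] = int(raw)
        elif isinstance(current, float):
            overrides[f.name] = float(raw)
        else:
            overrides[f.name] = raw
    return replace(defaults, **overrides)


params = load_from_env(MomentumParams(), environ={
    "TRADING_MOMENTUM_BOCPD_ENABLED": "false",
    "TRADING_MOMENTUM_BOCPD_HAZARD_RATE": "0.02",
    "TRADING_MOMENTUM_MTF_LOOKBACK": "8",
})
```

Deriving the type from each default keeps the parser aligned with the dataclass, and unset variables fall back to the declared defaults.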

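6. Risks and limitations

Limitation | Mitigation
BOCPD warm-up of 20 bars (100 min) | Automatic DFA-2 fallback; backfill from the DB at startup
Student-t CDF uses a continued-fraction approximation | Accurate to about 0.001 (df >= 2); can be cross-checked against scipy
Half-life is sensitive to small samples | Mitigated by the Hurwicz correction and spread_lookback=40
PP test requires the arch library | Falls back to ADF automatically
Optimal BTC lag is unknown | Default 3 (15 min); can be determined with a Granger causality test
Overfitting risk from 38 parameters | All have defaults; a risk_attitude meta-parameter will later reduce the dimensionality

Not suitable for: extremely illiquid assets, sparsely traded HIP-3 assets, and very short-horizon strategies with max_hold_hours < 4h.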

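Future directions

Priority | Direction
P1 | Meta-parameter dimensionality reduction: risk_attitude in [0,1] jointly scales the 38 parameters
P1 | OFI integration (L2 book -> replaces VWPM)
P1 | Adaptive BOCPD hazard_rate via a Beta posterior
P2 | Transfer-entropy BTC causal factor
P2 | Kalman-filter dynamic hedge ratio
P2 | Unified cross-layer framework based on log-likelihood ratios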

Appendix: academic references

Algorithm | Paper
BOCPD | Adams & MacKay (2007) arXiv:0710.3742
NIG posterior | Murphy (2007) Conjugate Bayesian analysis of the Gaussian distribution
DFA-2 | Kantelhardt et al. (2002); Peng et al. (1994)
Kaufman ER | Kaufman (1995) Smarter Trading
CUSUM | Page (1954) Biometrika; Lorden (1971) Ann. Math. Statist.
CUSUM adaptive drift | Hawkins & Olwell (1998) Cumulative Sum Charts
Huber estimator | Huber (1964) Ann. Math. Statist.
Rogers-Satchell | Rogers & Satchell (1991) Ann. Appl. Prob.
OFI | Cont, Kukanov & Stoikov (2014) J. Financial Econometrics
Phillips-Perron | Phillips & Perron (1988) Biometrika
KPSS | Kwiatkowski et al. (1992) J. Econometrics
OU half-life | Vidyamurthy (2004) Pairs Trading
Hurwicz bias | Hurwicz (1950); Kendall (1954) Biometrika
Half-life bias correction | Krauss (2017) J. Economic Surveys
BTC-alt transmission | Makarov & Schoar (2020) J. Financial Economics
Time-series momentum | Moskowitz, Ooi & Pedersen (2012) J. Financial Economics
