开仓动量过滤器设计方案2

开仓动量过滤器设计方案

版本:v2.0
日期:2026-03-06
适用系统:Hyperliquid 量化交易系统(Adaptive Bollinger Z-Score 配对策略)


目录

  1. 背景与问题定义
  2. 算法选择与学术依据
  3. 算法具体实现
  4. 融入当前交易体系的方案
  5. 与当前交易风格的契合度分析
  6. 与四项开单约束的对应关系
  7. 参数配置指南
  8. 风险与局限性

1. 背景与问题定义

1.1 交易系统现状

当前系统基于 Adaptive Bollinger Z-Score 配对策略,核心逻辑是协整对的 spread 均值回归:

  • adaptive_z 突破 adaptive_threshold 时产生入场信号
  • adaptive_z 回归至 entry_adaptive_z * reversion_factor 时平仓
  • 使用 5 分钟 K 线,最大持仓时间控制在 72 小时以内(max_hold_hours=72.0
  • 策略引擎 _check_entry() 已有冷却期、突破检测、持仓检查、z4h 绝对值过滤四层前置过滤

1.2 待解决的问题

均值回归策略在以下场景存在明显风险:

场景 问题 后果
单边持续上涨 N 根 K 线后 追涨做多(跟趋势入场) 趋势已过度延伸,极易被反转砸穿
单边持续下跌 N 根 K 线后 追跌做空(跟趋势入场) 跌幅充分释放,反弹风险极高
短时间内迅速暴涨 做空对抗强动量 被轧空,止损代价极大
短时间内迅速暴跌 做多接飞刀 继续下杀,止损代价极大

1.3 四项约束目标

约束 1:连续下跌 → 不追跌(不做空)
约束 2:连续上涨 → 不追涨(不做多)
约束 3:迅速暴涨 → 不做空
约束 4:迅速暴跌 → 不做多

1.4 过滤维度

为全面覆盖配对交易的风险,过滤器需要在三个维度同时工作:

维度 检查对象 原因
Alt 腿(单腿) alt symbol 的价格动量 直接持仓标的的风险
Base 腿(单腿) base symbol 的价格动量 配对交易中 base 腿方向相反,同样承担动量风险
Spread(配对层) 两腿价差的趋势性 单腿动量不等于 spread 动量;两腿同涨但 spread 稳定时不应误杀

2. 算法选择与学术依据

2.1 约束 1&2:持续单边行情过滤

候选算法深度对比

算法 核心思路 优点 缺点 1-5min 适用性
连续 K 线计数 计数涨/跌根数 直观 噪音大,交替涨跌频繁,漏判 不适合
纯净位移 (Net Displacement) 只看起点→终点 对噪音免疫 路径盲:V 形反转误杀,震荡漂移漏判 一般
ADX + DMI 趋势强度指标 学术验证充分 滞后较大(14期默认),参数较多 一般
EMA 方向 指数均线斜率 平滑 滞后大,无法区分趋势质量 一般
Kaufman ER + 净位移(联合) 效率比率衡量趋势质量 + 净位移衡量幅度 同时捕捉方向和路径质量,消除 V 形误杀 最适合

选择:Kaufman 效率比率 + 自适应净位移(ER-Weighted Adaptive Net Displacement)

学术背景:

Perry Kaufman 在 Smarter Trading (1995) 中提出 Efficiency Ratio,后被广泛应用于 KAMA(Kaufman Adaptive Moving Average)中。ER 是唯一同时衡量方向性和路径效率的单一指标,复杂度 O(N),无需预热统计分布。

核心公式:

N = sustained_lookback(回望根数,默认 30 根 5min K 线 = 150 分钟)

# Kaufman Efficiency Ratio:方向距离 / 路径总长度
direction_distance = abs(close[-1] - close[-N])
path_length = sum(abs(close[i] - close[i-1]) for i in [-N+1, ..., -1])
ER = direction_distance / path_length    # 范围 [0, 1]

# 净位移(带符号)
net_return = (close[-1] - close[-N]) / close[-N]

# 自适应阈值:用 Garman-Klass 波动率标准化
sustained_threshold = base_threshold * (gk_vol / baseline_vol)

# 联合触发条件(必须同时满足两个条件):
约束1 触发:direction == 'short'
            AND net_return <= -sustained_threshold
            AND ER >= er_threshold(路径是干净的趋势,非震荡漂移)

约束2 触发:direction == 'long'
            AND net_return >= +sustained_threshold
            AND ER >= er_threshold

为什么 ER + 净位移联合判断是最优解:

  1. 消除 V 形反转误杀:市场从 100 跌到 96 再反弹到 99.2 时,净位移 = -0.8%(会触发拦截),但 ER ≈ 0.1(路径高度曲折)→ 联合判断正确放行。因为市场已经反转,不是持续趋势。

  2. 消除震荡漂移漏判:市场在 100±0.3% 范围震荡 30 根但恰好净漂移 +0.8% 时,ER ≈ 0.15 → 联合判断正确放行(这不是真正的趋势)。

  3. 精确捕捉真正的持续趋势:100 → 100.3 → 100.6 → ... → 101.2 这种稳步上涨,ER ≈ 0.85 且净位移 = +1.2% → 联合判断正确拦截。

  4. 自适应阈值sustained_threshold 不再是固定的 0.8%,而是随波动率自动缩放。低波动期(亚盘凌晨)门槛自动降低,高波动期(美盘 CPI/FOMC)门槛自动升高,始终保持相同的统计敏感度。

  5. 复杂度 O(N):与纯净位移完全相同,仅多计算一轮差的绝对值累加,零额外成本。

2.2 约束 3&4:急涨急跌过滤

候选算法深度对比

算法 核心思路 优点 缺点 1-5min 适用性
固定百分比阈值 单根 K 线涨幅 > X% 最简单 波动率变化大时误杀/漏判 不适合
SMA ATR 倍数 单根 vs 简单平均 ATR 自适应 SMA 响应慢,路径效率不如 EMA,只查单根 一般
Z-Score of Returns 滚动分布标准化 自适应 需 50+ 根历史才稳定 一般
CUSUM + Garman-Klass + 量价确认 累积和检测 + 高效波动率 + 成交量 捕捉分散式急动,波动率估计最精确,区分真假突破 略复杂 最适合

选择:三层复合急动检测器(CUSUM + Garman-Klass + Volume Confirmation)

学术背景:

  • CUSUM(累积和控制图):E.S. Page (1954) 提出,是序列变化检测理论中最优的在线检测算法(Lorden 准则下一阶渐近最优)。被广泛用于工业质量控制、金融风控、网络入侵检测。
  • Garman-Klass 波动率:Garman & Klass (1980),利用 OHLC 四个价格估计波动率,理论效率是 close-to-close 的 7.4 倍——意味着同样 10 根 K 线,GK 估计器的精度等价于 74 根 close-to-close 估计器。
  • Volume Confirmation:Kyle (1985) 信息模型表明,知情交易者的活动会同时体现在价格和成交量中。成交量放大是区分"真突破"和"低流动性跳价"的关键信号。

核心公式:

# ═══ 第一层:Garman-Klass 波动率估计(替代 SMA ATR) ═══

GK(i) = 0.5 * ln(H(i)/L(i))^2 - (2*ln(2) - 1) * ln(C(i)/O(i))^2
gk_var = EMA(GK, period=10, alpha=2/(10+1))     # 指数加权,响应灵敏
gk_vol = sqrt(gk_var)                            # Garman-Klass 波动率

# ═══ 第二层:CUSUM 累积和检测(替代单根 K 线检查) ═══

# 标准化收益率
ret(i) = (close(i) - close(i-1)) / close(i-1)
z(i)   = ret(i) / gk_vol          # 用 GK 波动率标准化

# 双侧 CUSUM(在线更新,O(1) 每步)
S_pos(i) = max(0, S_pos(i-1) + z(i) - drift)    # 检测正向急动
S_neg(i) = max(0, S_neg(i-1) - z(i) - drift)    # 检测负向急动

# drift = 0.5(标准值),cusum_threshold = 3.0(灵敏度参数)
spike_up   = S_pos >= cusum_threshold
spike_down = S_neg >= cusum_threshold

# ═══ 第三层:量价确认(可选增强) ═══

volume_ratio = current_volume / EMA(volume, period=20)
volume_confirmed = volume_ratio >= 1.5  # 成交量放大 50% 以上

# ═══ 最终判断 ═══

约束3 触发:direction == 'short'
            AND spike_up == True
            AND volume_confirmed == True   # 量价共振 → 真正的暴涨

约束4 触发:direction == 'long'
            AND spike_down == True
            AND volume_confirmed == True   # 量价共振 → 真正的暴跌

为什么 CUSUM 优于单根 ATR 倍数:

  1. 捕捉分散式急动:市场连续 3 根各涨 0.8%(每根都不超 2.5×ATR),但 CUSUM 会累积 S_pos 到阈值→检测到。单根检测会完全遗漏这种"温水煮青蛙"式急动。

  2. 自动复位:CUSUM 在市场恢复平静时自动衰减(max(0, ...)),无需手动管理检测窗口。

  3. O(1) 在线更新:每根 K 线只需一次加法和比较,比遍历 lookback 窗口更高效。

  4. 理论最优性:Lorden (1971) 证明 CUSUM 在平均检测延迟最小的意义下是最优的序列检测算法。

为什么 Garman-Klass 优于 SMA ATR:

  1. 7.4 倍估计效率:用 OHLC 四个价位,同样 10 根 K 线能达到 74 根 close-to-close 的精度。
  2. 更短有效预热:实际上 5-6 根 K 线就能得到可用的 GK 估计(vs ATR 需要 10+ 根)。
  3. EMA 加权:对近期波动变化响应更快,适合加密市场快速切换波动率状态。

为什么需要量价确认:

  1. 区分真假突破:低流动性币种可能因单笔大单导致价格跳动 2%+,但成交量极低→假信号,不应拦截。
  2. 减少误杀:加入量价确认后,只有"价格急动 + 成交量放大"的真正强动量才会被拦截。
  3. 数据已有:系统 kline 数据中已包含 volumevolume_usd 字段(_parse_kline() 已解析),只需透传至策略层。

2.3 Spread 层面趋势检测(新增维度)

学术背景:

配对交易的核心是 spread(价差)的均值回归,而非单腿价格。Vidyamurthy (2004) Pairs Trading 和 Gatev et al. (2006) 的经典研究都强调:spread 的行为才是配对交易成败的关键。单腿动量不等于 spread 动量——两腿同向涨跌时 spread 可能完全稳定。

本系统的天然优势adaptive_z 本身就是 spread 的标准化度量,可以直接复用 z4h 历史序列计算 spread 层面的趋势性,零额外数据需求

核心公式:

# 复用 z4h 序列作为 spread 代理
# z4h_buffer 已在 SymbolBaseline 中维护,直接可用

z4h_series = [z4h[-N], z4h[-N+1], ..., z4h[-1]]    # 最近 N 根 z4h 值

# Spread 层面的 Kaufman ER
spread_direction = abs(z4h[-1] - z4h[-N])
spread_path = sum(abs(z4h[i] - z4h[i-1]) for i in range(-N+1, ..., -1))
spread_er = spread_direction / spread_path if spread_path > 0 else 0

# Spread 净位移
spread_net = z4h[-1] - z4h[-N]    # z4h 已标准化,无需百分比

# Spread 趋势拦截条件
约束S1:direction == 'short' AND spread_net <= -spread_threshold AND spread_er >= er_threshold
       → spread 持续走负(alt 持续相对弱于 base),不应继续做空 spread
约束S2:direction == 'long'  AND spread_net >= +spread_threshold AND spread_er >= er_threshold
       → spread 持续走正(alt 持续相对强于 base),不应继续做多 spread

核心价值:当两腿同时上涨 2%(单腿过滤器会拦截),但 spread 完全稳定(spread ER ≈ 0)时,spread 过滤器正确判断"spread 无趋势",不拦截→ 避免误杀均值回归信号。三层维度中,spread 过滤作为最终仲裁:

单腿过滤拦截 + Spread 过滤放行 → 最终放行(spread 稳定,单腿同向运动不影响配对)
单腿过滤放行 + Spread 过滤拦截 → 最终拦截(spread 自身呈趋势,均值回归不可靠)

2.4 算法组合架构

三层过滤器按逻辑层次组合:

输入信号(direction, alt_symbol, base_symbol)
    │
    ├──▶ Layer 1: 单腿持续趋势过滤(ER + 自适应净位移)
    │      ├── alt 腿: check(alt_symbol, direction)
    │      └── base 腿: check(base_symbol, opposite_direction)
    │      任一腿被拦截 → 进入 Layer 3 仲裁
    │
    ├──▶ Layer 2: 急动检测(CUSUM + Garman-Klass + 量价确认)
    │      ├── alt 腿: check(alt_symbol, direction)
    │      └── base 腿: check(base_symbol, opposite_direction)
    │      任一腿被拦截 → 直接拒绝(急动无需仲裁,风险不可对冲)
    │
    └──▶ Layer 3: Spread 趋势仲裁(ER + z4h 净位移)
           仅在 Layer 1 拦截时启动
           Spread 无趋势(ER < threshold)→ 推翻 Layer 1 拦截,放行
           Spread 有趋势(ER >= threshold)→ 维持拦截
    │
    ▼
  最终决策:允许 / 拒绝入场

设计理念

  • Layer 1(单腿持续趋势)是软拦截:可以被 Spread 仲裁推翻。因为配对交易中单腿趋势不一定意味着 spread 趋势。
  • Layer 2(急动检测)是硬拦截:不可推翻。因为急动通常伴随极端波动和滑点,即使 spread 方向有利,执行风险也极高。
  • Layer 3(Spread 仲裁)是误杀修正器:只在 Layer 1 拦截时启动,防止两腿同向运动时的误杀。

3. 算法具体实现

3.1 模块位置

src/trading/
  momentum_filter.py    ← 新建(独立模块,不耦合策略逻辑)
  strategy.py           ← 修改 _check_entry(),注入并调用 filter
  config.py             ← 修改 StrategyParams,新增过滤器参数
  orchestrator.py       ← 修改 process_analysis(),透传 OHLCV + volume
src/services/
  realtime_kline_service_base.py  ← 修改 _trigger_strategy_if_ready(),提取双腿 OHLCV + volume

3.2 MomentumFilter 完整代码

# src/trading/momentum_filter.py
"""
开仓动量过滤器 v2.0

三层过滤架构:
  Layer 1: Kaufman ER + 自适应净位移(持续趋势检测,软拦截)
  Layer 2: CUSUM + Garman-Klass + 量价确认(急动检测,硬拦截)
  Layer 3: Spread ER 仲裁(误杀修正,仅在 Layer 1 拦截时启动)

四项开单约束:
  约束1:连续下跌不追跌(不做空)  → ER高 + 净位移 < -threshold
  约束2:连续上涨不追涨(不做多)  → ER高 + 净位移 > +threshold
  约束3:迅速暴涨不做空            → CUSUM正向突破 + 量价确认
  约束4:迅速暴跌不做多            → CUSUM负向突破 + 量价确认

设计原则:
  - 单腿 + Spread 双维度过滤,与配对逻辑解耦
  - 无外部依赖,仅需 OHLCV 序列
  - 数据不足时默认放行(安全降级)
"""

import math
from collections import deque
from datetime import datetime


class MomentumFilter:

    def __init__(
        self,
        # ── 持续趋势过滤参数(Layer 1) ──
        sustained_lookback: int = 30,
        sustained_base_threshold: float = 0.008,
        er_threshold: float = 0.55,
        # ── 急动检测参数(Layer 2) ──
        gk_period: int = 10,
        cusum_drift: float = 0.5,
        cusum_threshold: float = 3.0,
        volume_confirm_ratio: float = 1.5,
        volume_ema_period: int = 20,
        # ── Spread 仲裁参数(Layer 3) ──
        spread_lookback: int = 20,
        spread_er_threshold: float = 0.45,
        spread_net_threshold: float = 1.5,
        # ── 通用 ──
        enabled: bool = True,
    ):
        """
        Args:
            sustained_lookback:        持续趋势回望根数(默认30根5min = 150分钟)
            sustained_base_threshold:  净位移基准阈值(自适应缩放的基准值)
            er_threshold:              Kaufman ER 阈值(>= 此值才视为趋势)
            gk_period:                 Garman-Klass 波动率计算周期
            cusum_drift:               CUSUM 漂移参数(标准值 0.5)
            cusum_threshold:           CUSUM 触发阈值
            volume_confirm_ratio:      成交量确认倍数(当前量 / 均量 >= 此值)
            volume_ema_period:         成交量 EMA 周期
            spread_lookback:           Spread 趋势回望根数
            spread_er_threshold:       Spread ER 阈值
            spread_net_threshold:      Spread 净位移阈值(z4h 标准化单位)
            enabled:                   总开关
        """
        self._enabled = enabled

        # Layer 1
        self._sustained_n = max(5, sustained_lookback)
        self._sustained_base_thresh = sustained_base_threshold
        self._er_thresh = er_threshold

        # Layer 2
        self._gk_period = gk_period
        self._cusum_drift = cusum_drift
        self._cusum_thresh = cusum_threshold
        self._vol_confirm_ratio = volume_confirm_ratio
        self._vol_ema_period = volume_ema_period

        # Layer 3
        self._spread_lookback = spread_lookback
        self._spread_er_thresh = spread_er_threshold
        self._spread_net_thresh = spread_net_threshold

        # 数据缓冲区
        buf_size = max(sustained_lookback, gk_period + 10, volume_ema_period + 5) + 10
        self._buf_size = buf_size
        # symbol → deque of (close, high, low, open, volume)
        self._buffers: dict[str, deque] = {}
        # symbol → 最后一次更新的 kline_time(同一根 K 线不重复追加)
        self._last_kline_time: dict[str, datetime] = {}
        # symbol → CUSUM 状态 (S_pos, S_neg)
        self._cusum_state: dict[str, tuple[float, float]] = {}
        # symbol → GK 波动率 EMA 状态
        self._gk_ema: dict[str, float] = {}
        # symbol → Volume EMA 状态
        self._vol_ema: dict[str, float] = {}
        # GK 基准波动率(所有 symbol 的滚动中位数,用于自适应阈值)
        self._baseline_vol: float = 0.0
        self._baseline_vol_samples: list[float] = []

    # ------------------------------------------------------------------
    # 公开接口
    # ------------------------------------------------------------------

    def update(
        self, symbol: str, close: float,
        high: float = 0.0, low: float = 0.0,
        open_: float = 0.0, volume: float = 0.0,
        kline_time: datetime | None = None,
    ):
        """
        每根新 K 线收盘时调用。
        high/low/open_ 可选;有则 Garman-Klass 更精确,无则降级为 close-to-close。
        volume 可选;有则启用量价确认,无则跳过量确认(仍用 CUSUM 价格检测)。
        kline_time 用于去重:同一 symbol 在多配对中共享,同一根 K 线只追加一次。
        """
        if kline_time is not None:
            if self._last_kline_time.get(symbol) == kline_time:
                return
            self._last_kline_time[symbol] = kline_time

        if symbol not in self._buffers:
            self._buffers[symbol] = deque(maxlen=self._buf_size)
        self._buffers[symbol].append((close, high, low, open_, volume))

        # 在线更新 CUSUM 和 GK EMA
        buf = self._buffers[symbol]
        if len(buf) >= 2:
            self._online_update(symbol, buf)

    def check(self, symbol: str, direction: str) -> tuple[bool, str]:
        """
        Layer 1 + Layer 2 检查(单腿维度)。

        Args:
            symbol:    目标 coin(单腿,非配对)
            direction: 'long' 或 'short'

        Returns:
            (allowed: bool, reason: str)
            allowed=False 表示该方向被阻止,reason 为拒绝原因
        """
        if not self._enabled:
            return True, ""

        buf = self._buffers.get(symbol)
        if buf is None or len(buf) < self._gk_period + 2:
            return True, ""  # 数据不足,安全放行

        # Layer 2(硬拦截):急动检测 — 优先检查,因为急动风险更高
        ok, reason = self._check_spike(symbol, direction)
        if not ok:
            return False, reason

        # Layer 1(软拦截):持续趋势
        if len(buf) >= self._sustained_n + 2:
            ok, reason = self._check_sustained(symbol, direction)
            if not ok:
                return False, f"SOFT|{reason}"  # 标记为软拦截,供仲裁使用

        return True, ""

    def check_spread(self, z4h_history: list[float], direction: str) -> tuple[bool, str]:
        """
        Layer 3: Spread 趋势仲裁。
        仅在 Layer 1 软拦截时调用。

        Args:
            z4h_history: 最近 N 个 z4h 值(按时间正序)
            direction:   配对方向 'long' 或 'short'

        Returns:
            (has_trend: bool, reason: str)
            has_trend=True  → Spread 有趋势 → 维持 Layer 1 拦截
            has_trend=False → Spread 无趋势 → 推翻 Layer 1 拦截,放行
        """
        n = self._spread_lookback
        if len(z4h_history) < n + 1:
            return False, ""  # 数据不足,默认无趋势 → 放行

        series = z4h_history[-(n + 1):]

        # Spread ER
        spread_dir = abs(series[-1] - series[0])
        spread_path = sum(abs(series[i] - series[i - 1]) for i in range(1, len(series)))
        spread_er = spread_dir / spread_path if spread_path > 0 else 0

        # Spread 净位移(z4h 已标准化)
        spread_net = series[-1] - series[0]

        # 判断 Spread 是否有持续趋势
        if spread_er >= self._spread_er_thresh:
            if direction == 'short' and spread_net <= -self._spread_net_thresh:
                return True, (
                    f"Spread趋势确认: z4h净位移={spread_net:+.3f}"
                    f" ER={spread_er:.2f}>={self._spread_er_thresh}"
                )
            if direction == 'long' and spread_net >= self._spread_net_thresh:
                return True, (
                    f"Spread趋势确认: z4h净位移={spread_net:+.3f}"
                    f" ER={spread_er:.2f}>={self._spread_er_thresh}"
                )

        return False, ""

    def ready(self, symbol: str) -> bool:
        """该 symbol 是否已积累足够数据"""
        buf = self._buffers.get(symbol)
        if buf is None:
            return False
        return len(buf) >= self._gk_period + 2

    # ------------------------------------------------------------------
    # Layer 1: 持续趋势检测
    # ------------------------------------------------------------------

    def _check_sustained(self, symbol: str, direction: str) -> tuple[bool, str]:
        """
        Kaufman ER + 自适应净位移。
        ER >= threshold AND |net_return| >= adaptive_threshold → 持续趋势
        """
        buf = self._buffers[symbol]
        n = self._sustained_n
        data = list(buf)
        closes = [d[0] for d in data]

        if len(closes) < n + 1:
            return True, ""

        recent = closes[-(n + 1):]
        ref = recent[0]
        if ref <= 0:
            return True, ""

        # 净位移
        net_return = (recent[-1] - ref) / ref

        # Kaufman Efficiency Ratio
        direction_dist = abs(recent[-1] - ref)
        path_length = sum(abs(recent[i] - recent[i - 1]) for i in range(1, len(recent)))
        er = direction_dist / path_length if path_length > 0 else 0

        # 自适应阈值
        gk_vol = self._gk_ema.get(symbol, 0)
        if gk_vol > 0 and self._baseline_vol > 0:
            adaptive_thresh = self._sustained_base_thresh * (gk_vol / self._baseline_vol)
            # 限制缩放范围:0.3x ~ 3.0x
            adaptive_thresh = max(
                self._sustained_base_thresh * 0.3,
                min(adaptive_thresh, self._sustained_base_thresh * 3.0)
            )
        else:
            adaptive_thresh = self._sustained_base_thresh

        if er < self._er_thresh:
            return True, ""  # 路径不够"趋势化",放行

        if direction == 'short' and net_return <= -adaptive_thresh:
            return False, (
                f"约束1-不追跌: 过去{n}根净跌幅={net_return:.2%}"
                f" 阈值=-{adaptive_thresh:.2%} ER={er:.2f}"
            )
        if direction == 'long' and net_return >= adaptive_thresh:
            return False, (
                f"约束2-不追涨: 过去{n}根净涨幅={net_return:.2%}"
                f" 阈值=+{adaptive_thresh:.2%} ER={er:.2f}"
            )
        return True, ""

    # ------------------------------------------------------------------
    # Layer 2: 急动检测(CUSUM + Garman-Klass + 量价确认)
    # ------------------------------------------------------------------

    def _check_spike(self, symbol: str, direction: str) -> tuple[bool, str]:
        """
        CUSUM 双侧检测 + 量价确认。
        """
        s_pos, s_neg = self._cusum_state.get(symbol, (0.0, 0.0))

        # 量价确认
        vol_ema = self._vol_ema.get(symbol, 0)
        buf = self._buffers[symbol]
        current_vol = buf[-1][4] if buf else 0  # volume 字段

        if vol_ema > 0 and current_vol > 0:
            vol_ratio = current_vol / vol_ema
            vol_confirmed = vol_ratio >= self._vol_confirm_ratio
        else:
            # 无成交量数据时跳过量确认,仅依赖 CUSUM
            vol_confirmed = True
            vol_ratio = 0.0

        gk_vol = self._gk_ema.get(symbol, 0)

        if direction == 'short' and s_pos >= self._cusum_thresh and vol_confirmed:
            vol_str = f" 量比={vol_ratio:.1f}" if vol_ratio > 0 else ""
            return False, (
                f"约束3-暴涨不做空: CUSUM+={s_pos:.2f}"
                f">={self._cusum_thresh} GK_vol={gk_vol:.4f}{vol_str}"
            )

        if direction == 'long' and s_neg >= self._cusum_thresh and vol_confirmed:
            vol_str = f" 量比={vol_ratio:.1f}" if vol_ratio > 0 else ""
            return False, (
                f"约束4-暴跌不做多: CUSUM-={s_neg:.2f}"
                f">={self._cusum_thresh} GK_vol={gk_vol:.4f}{vol_str}"
            )

        return True, ""

    # ------------------------------------------------------------------
    # 在线更新(每根 K 线调用一次)
    # ------------------------------------------------------------------

    def _online_update(self, symbol: str, buf: deque):
        """
        O(1) 在线更新 CUSUM、GK 波动率 EMA、Volume EMA。
        """
        curr = buf[-1]    # (close, high, low, open, volume)
        prev = buf[-2]
        close, high, low, open_, volume = curr
        prev_close = prev[0]

        if prev_close <= 0 or close <= 0:
            return

        # ── Garman-Klass 波动率 ──
        gk_val = self._calc_gk_single(close, high, low, open_, prev_close)
        alpha_gk = 2.0 / (self._gk_period + 1)
        prev_gk = self._gk_ema.get(symbol, gk_val)
        new_gk = alpha_gk * gk_val + (1 - alpha_gk) * prev_gk
        self._gk_ema[symbol] = new_gk

        # 更新基准波动率(所有 symbol 的滚动中位数)
        self._baseline_vol_samples.append(math.sqrt(max(0, new_gk)))
        if len(self._baseline_vol_samples) > 500:
            self._baseline_vol_samples = self._baseline_vol_samples[-500:]
        if len(self._baseline_vol_samples) >= 10:
            sorted_samples = sorted(self._baseline_vol_samples)
            self._baseline_vol = sorted_samples[len(sorted_samples) // 2]

        # ── CUSUM 更新 ──
        gk_vol = math.sqrt(max(0, new_gk))
        if gk_vol > 1e-10:
            ret = (close - prev_close) / prev_close
            z = ret / gk_vol

            s_pos, s_neg = self._cusum_state.get(symbol, (0.0, 0.0))
            s_pos = max(0.0, s_pos + z - self._cusum_drift)
            s_neg = max(0.0, s_neg - z - self._cusum_drift)
            self._cusum_state[symbol] = (s_pos, s_neg)

        # ── Volume EMA 更新 ──
        if volume > 0:
            alpha_vol = 2.0 / (self._vol_ema_period + 1)
            prev_vol_ema = self._vol_ema.get(symbol, volume)
            self._vol_ema[symbol] = alpha_vol * volume + (1 - alpha_vol) * prev_vol_ema

    @staticmethod
    def _calc_gk_single(
        close: float, high: float, low: float, open_: float, prev_close: float
    ) -> float:
        """
        计算单根 K 线的 Garman-Klass 波动率估计量。
        有 OHLC 时使用 GK 公式,仅有 close 时降级为 close-to-close 方差。
        """
        if high > 0 and low > 0 and open_ > 0 and high >= low:
            # Garman-Klass (1980) 估计器
            log_hl = math.log(high / low) if low > 0 else 0
            log_co = math.log(close / open_) if open_ > 0 else 0
            gk = 0.5 * log_hl ** 2 - (2 * math.log(2) - 1) * log_co ** 2
            return max(0.0, gk)
        else:
            # 降级:close-to-close 方差
            if prev_close > 0:
                log_ret = math.log(close / prev_close)
                return log_ret ** 2
            return 0.0

3.3 StrategyParams 新增字段

src/trading/config.pyStrategyParams 中新增:

@dataclass(frozen=True)
class StrategyParams:
    # ... 现有字段 ...

    # ── 动量过滤器参数 ──
    momentum_filter_enabled: bool = True

    # Layer 1: 持续趋势过滤
    momentum_sustained_lookback: int = 30          # 回望根数(30根5min=150分钟)
    momentum_sustained_base_threshold: float = 0.008  # 净位移基准阈值(0.8%)
    momentum_er_threshold: float = 0.55            # Kaufman ER 阈值

    # Layer 2: 急动检测
    momentum_gk_period: int = 10                   # Garman-Klass EMA 周期
    momentum_cusum_drift: float = 0.5              # CUSUM 漂移参数
    momentum_cusum_threshold: float = 3.0          # CUSUM 触发阈值
    momentum_volume_confirm_ratio: float = 1.5     # 量价确认倍数
    momentum_volume_ema_period: int = 20           # 成交量 EMA 周期

    # Layer 3: Spread 仲裁
    momentum_spread_lookback: int = 20             # Spread 趋势回望根数
    momentum_spread_er_threshold: float = 0.45     # Spread ER 阈值
    momentum_spread_net_threshold: float = 1.5     # Spread 净位移阈值(z4h 单位)

3.4 strategy.py 集成改动

AdaptiveBollingerStrategy.__init__() 中初始化:

from src.trading.momentum_filter import MomentumFilter

class AdaptiveBollingerStrategy:
    def __init__(self, config: TradingConfig):
        # ... 现有初始化 ...

        # 动量过滤器(symbol 维度,非配对维度)
        default_p = self._params_for("")
        self._momentum_filter = MomentumFilter(
            sustained_lookback=default_p.momentum_sustained_lookback,
            sustained_base_threshold=default_p.momentum_sustained_base_threshold,
            er_threshold=default_p.momentum_er_threshold,
            gk_period=default_p.momentum_gk_period,
            cusum_drift=default_p.momentum_cusum_drift,
            cusum_threshold=default_p.momentum_cusum_threshold,
            volume_confirm_ratio=default_p.momentum_volume_confirm_ratio,
            volume_ema_period=default_p.momentum_volume_ema_period,
            spread_lookback=default_p.momentum_spread_lookback,
            spread_er_threshold=default_p.momentum_spread_er_threshold,
            spread_net_threshold=default_p.momentum_spread_net_threshold,
            enabled=default_p.momentum_filter_enabled,
        ) if default_p.momentum_filter_enabled else None

process_tick() 中新增 alt_ohlcv / base_ohlcv 参数(含 volume),传入两腿 OHLCV 数据:

def process_tick(
    self,
    symbol: str,
    base_symbol: str,
    z4h: float,
    timestamp: datetime,
    kline_time: datetime | None = None,
    latest_price: float | None = None,
    alt_ohlcv: dict | None = None,     # {'close','high','low','open','volume'}
    base_ohlcv: dict | None = None,    # {'close','high','low','open','volume'}
) -> tuple[EntrySignal | None, ExitSignal | None]:

_process_tick_unlocked() 中,新 K 线时更新动量过滤器(两腿均更新):

    # 新K线时更新动量过滤器(两腿)
    if is_new_candle and self._momentum_filter:
        if alt_ohlcv:
            self._momentum_filter.update(
                symbol=symbol,
                close=alt_ohlcv['close'],
                high=alt_ohlcv.get('high', 0.0),
                low=alt_ohlcv.get('low', 0.0),
                open_=alt_ohlcv.get('open', 0.0),
                volume=alt_ohlcv.get('volume', 0.0),
                kline_time=kline_time,
            )
        if base_ohlcv and base_symbol:
            self._momentum_filter.update(
                symbol=base_symbol,
                close=base_ohlcv['close'],
                high=base_ohlcv.get('high', 0.0),
                low=base_ohlcv.get('low', 0.0),
                open_=base_ohlcv.get('open', 0.0),
                volume=base_ohlcv.get('volume', 0.0),
                kline_time=kline_time,
            )

为什么需要 kline_time 去重? 同一 symbol(如 ETH)可能同时属于多个配对(ETH|BTC、ETH|SOL)。
每个配对的 process_tick 都会被调用,但同一根 K 线只应追加到 buffer 一次。
kline_time 作为去重 key,MomentumFilter.update() 内部自动跳过重复调用。

将原有步骤 5(方向判断)提前到步骤 4 之后统一执行,然后插入步骤 4.5-4.7(三层动量过滤):

def _check_entry(self, key, z4h, adaptive_z, timestamp, current_above, params):
    # ... 步骤 1-4(冷却期、突破检测、持仓检查、z4h 过滤) ...

    # 步骤 4.5:方向判断(从原步骤 5 前移,统一确定方向)
    threshold = params.adaptive_threshold
    if adaptive_z < -threshold:
        direction = 'long'
    elif adaptive_z > threshold:
        direction = 'short'
    else:
        return None

    # 步骤 4.6:动量过滤(三层架构) ── 两腿同时检查
    if self._momentum_filter:
        layer1_blocked = False
        layer1_reason = ""

        # ── Layer 2(硬拦截)+ Layer 1(软拦截)── alt 腿
        if self._momentum_filter.ready(key[0]):
            allowed, reason = self._momentum_filter.check(key[0], direction)
            if not allowed:
                if reason.startswith("SOFT|"):
                    layer1_blocked = True
                    layer1_reason = reason[5:]
                else:
                    # Layer 2 硬拦截,直接拒绝
                    logger.info(f"🚫 动量过滤 | {pair_label} | {reason}")
                    return None

        # ── Layer 2 + Layer 1 ── base 腿
        if key[1] and self._momentum_filter.ready(key[1]):
            opposite = 'short' if direction == 'long' else 'long'
            allowed, reason = self._momentum_filter.check(key[1], opposite)
            if not allowed:
                if reason.startswith("SOFT|"):
                    layer1_blocked = True
                    layer1_reason = reason[5:]
                else:
                    logger.info(f"🚫 动量过滤(对手腿) | {pair_label} | {reason}")
                    return None

        # ── Layer 3: Spread 仲裁(仅在 Layer 1 软拦截时) ──
        if layer1_blocked:
            bl = self._baselines.get(key)
            if bl and hasattr(bl, 'z4h_history') and len(bl.z4h_history) > 0:
                spread_trend, spread_reason = self._momentum_filter.check_spread(
                    list(bl.z4h_history), direction
                )
                if spread_trend:
                    # Spread 也有趋势 → 维持拦截
                    logger.info(
                        f"🚫 动量过滤 | {pair_label} | {layer1_reason}"
                        f" | {spread_reason}"
                    )
                    return None
                else:
                    # Spread 无趋势 → 推翻拦截,放行
                    logger.info(
                        f"✅ 动量仲裁放行 | {pair_label} | 单腿:{layer1_reason}"
                        f" | Spread无趋势,推翻拦截"
                    )
            else:
                # 无 z4h 历史,保守拦截
                logger.info(f"🚫 动量过滤 | {pair_label} | {layer1_reason}")
                return None

    # 步骤 5:产生 EntrySignal(direction 已在步骤 4.5 确定)
    bl = self._baselines.get(key)
    ema_std = f"ema={bl.ema:.4f} std={bl.last_std:.4f}" if bl else "ema=N/A std=N/A"
    abs_az = abs(adaptive_z)
    logger.info(
        f"🔺 入场信号 | {pair_label} {direction} | "
        f"z4h={z4h:+.4f} az={adaptive_z:+.4f} "
        f"({abs_az / threshold * 100:.0f}%thresh) | "
        f"{ema_std}"
    )
    return EntrySignal(direction=direction, z4h=z4h, adaptive_z=adaptive_z)

Layer 1 软拦截 + Layer 3 仲裁机制说明

配对交易中"两腿同涨"是常见场景——如 BTC 涨 2%,ETH 涨 2.5%。
此时 alt 腿(ETH)的 Layer 1 会拦截 long 方向(净涨幅超阈值)。
但 spread(ETH 相对 BTC 的超额收益)可能只变化了 0.5%,完全在正常范围内。
Layer 3 检查 z4h 序列的 ER 和净位移,发现 spread 无持续趋势→推翻 Layer 1 拦截→放行。
这避免了大量误杀,是配对交易动量过滤的核心创新点。


4. 融入当前交易体系的方案

4.1 数据流全景

WebSocket K线推送(OHLCV + volume)
      |
      v
realtime_kline_service_base
      |
      +-- _parse_kline() → kline dict (含 close/high/low/open/volume/volume_usd)
      |                   → kline_buffer → DB 批量写入
      |
      +-- _analyze_and_alert() → _fetch_and_validate_price_data()
      |                          从 DB 查询 alt/base 的 5m/1h/4h K线
      |                          → price_data_cache[('5m','7d')] 含 alt_klines + base_klines
      |
      +-- _trigger_strategy_if_ready()
              |
              +-- 从 price_data_cache 提取最新 alt/base OHLCV + volume
              |     alt_ohlcv = {'close','high','low','open','volume'}  ← alt_klines_5m[0]
              |     base_ohlcv = {'close','high','low','open','volume'} ← base_klines_5m[0]
              |
              v
      TradingOrchestrator.process_analysis(
          ..., alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv    ← 新增透传
      )
              |
              v
      AdaptiveBollingerStrategy.process_tick(
          ..., alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv    ← 新增透传
      )
              |
              +-- [新K线] MomentumFilter.update(alt_symbol, alt_ohlcv, kline_time)
              |           → O(1) 在线更新: CUSUM / GK EMA / Volume EMA
              |           MomentumFilter.update(base_symbol, base_ohlcv, kline_time)
              |           (kline_time 去重:同 symbol 多配对只追加一次)
              |
              +-- _check_exit()  → ExitSignal
              |
              +-- _check_entry()
                      |
                      步骤1: 冷却期
                      步骤2: 突破检测(adaptive_z 首次穿越阈值)
                      步骤3: 持仓检查
                      步骤4: z4h 绝对值过滤
                 [新增]
                      步骤4.5: 方向判断(从原步骤5前移)
                      步骤4.6: 三层动量过滤
                                ├── Layer 2(硬拦截): CUSUM + GK + 量价确认
                                │    ├── alt 腿: check(alt, direction)
                                │    └── base 腿: check(base, opposite)
                                │    任一拦截 → 直接拒绝
                                │
                                ├── Layer 1(软拦截): ER + 自适应净位移
                                │    ├── alt 腿: check(alt, direction)
                                │    └── base 腿: check(base, opposite)
                                │    任一拦截 → 进入 Layer 3 仲裁
                                │
                                └── Layer 3(仲裁): Spread ER + z4h 净位移
                                     spread 有趋势 → 维持拦截
                                     spread 无趋势 → 推翻拦截,放行
                      步骤5: 产生 EntrySignal
              |
              v
      TradingOrchestrator.on_entry_signal()
              |
              v
      RiskManager.pre_trade_check()(现有9项风控)
              |
              v
      HyperliquidExecutor.open_position()

4.2 改动范围

改动文件 改动内容 改动量
src/trading/momentum_filter.py 新建,三层过滤器(含 CUSUM/GK/ER/Volume/Spread) ~250 行
src/trading/strategy.py __init__ 注入,process_tick 新增 OHLCV 参数,_check_entry 三层检查 ~50 行
src/trading/config.py StrategyParams 新增 12 个字段(带默认值) ~15 行
src/trading/orchestrator.py process_analysis 新增 alt_ohlcv/base_ohlcv 参数并透传 ~5 行
src/services/realtime_kline_service_base.py _trigger_strategy_if_ready 提取双腿 OHLCV + volume 并传入 ~20 行

不需要改动:

  • risk_manager.py(过滤在上游完成)
  • executor.py(无感知)
  • models.py(无感知)
  • 数据库 schema(无需持久化过滤器状态——CUSUM/GK 均为 O(1) 在线状态,内存驻留)

4.3 配置层集成

StrategyParams 增加带默认值的字段,支持全局默认 + 按 symbol 覆盖 + 按 pair 覆盖,与现有参数覆盖体系完全一致。

环境变量示例:

# 全局动量过滤器配置
TRADING_MOMENTUM_FILTER_ENABLED=true

# Layer 1: 持续趋势
TRADING_MOMENTUM_SUSTAINED_LOOKBACK=30
TRADING_MOMENTUM_SUSTAINED_BASE_THRESHOLD=0.008
TRADING_MOMENTUM_ER_THRESHOLD=0.55

# Layer 2: 急动检测
TRADING_MOMENTUM_GK_PERIOD=10
TRADING_MOMENTUM_CUSUM_DRIFT=0.5
TRADING_MOMENTUM_CUSUM_THRESHOLD=3.0
TRADING_MOMENTUM_VOLUME_CONFIRM_RATIO=1.5
TRADING_MOMENTUM_VOLUME_EMA_PERIOD=20

# Layer 3: Spread 仲裁
TRADING_MOMENTUM_SPREAD_LOOKBACK=20
TRADING_MOMENTUM_SPREAD_ER_THRESHOLD=0.45
TRADING_MOMENTUM_SPREAD_NET_THRESHOLD=1.5

# 针对高波动 symbol 调参示例
TRADING_STRATEGY_SOL_MOMENTUM_CUSUM_THRESHOLD=3.5
TRADING_STRATEGY_BTC_MOMENTUM_SUSTAINED_BASE_THRESHOLD=0.012

4.4 orchestrator.py 集成改动

process_analysis() 新增 alt_ohlcv / base_ohlcv 参数,透传至 strategy.process_tick()

def process_analysis(
    self,
    symbol: str,
    z4h: float,
    multi_period_result: dict,
    timestamp: datetime,
    latest_alt_price: float = None,
    avg_zscore_4h: float = None,
    kline_time: datetime | None = None,
    l2_snapshot: dict | None = None,
    exit_only: bool = False,
    alt_ohlcv: dict | None = None,     # 新增
    base_ohlcv: dict | None = None,    # 新增
) -> bool:
    # ... 现有逻辑不变 ...

    entry_signal, exit_signal = self._strategy.process_tick(
        symbol, base_symbol, z4h, timestamp,
        kline_time=kline_time,
        latest_price=price_for_log,
        alt_ohlcv=alt_ohlcv,       # 新增透传
        base_ohlcv=base_ohlcv,     # 新增透传
    )

4.5 realtime_kline_service_base.py 集成改动

_trigger_strategy_if_ready() 中从 price_data_cache 提取双腿最新 OHLCV + volume:

def _trigger_strategy_if_ready(self, symbol, timeframe, multi_period_result,
                               price_data_cache, kline_time, start_time,
                               exit_only=False, base_symbol=None):
    # ... 现有逻辑 ...

    # 提取 alt/base 最新 OHLCV + volume(用于动量过滤器)
    alt_ohlcv = None
    base_ohlcv = None
    period_key_5m = ('5m', '7d')
    if period_key_5m in price_data_cache:
        alt_klines_5m = price_data_cache[period_key_5m]['alt_klines']
        if alt_klines_5m:
            k = alt_klines_5m[0]
            latest_alt_price = k.get('close')
            alt_ohlcv = {
                'close': k.get('close', 0.0),
                'high': k.get('high', 0.0),
                'low': k.get('low', 0.0),
                'open': k.get('open', 0.0),
                'volume': k.get('volume', 0.0),
            }
        base_klines_5m = price_data_cache[period_key_5m]['base_klines']
        if base_klines_5m:
            k = base_klines_5m[0]
            base_ohlcv = {
                'close': k.get('close', 0.0),
                'high': k.get('high', 0.0),
                'low': k.get('low', 0.0),
                'open': k.get('open', 0.0),
                'volume': k.get('volume', 0.0),
            }

    # ... 现有 L2 / avg_zscore_4h 逻辑 ...

    acted = self._trading_orchestrator.process_analysis(
        symbol=symbol,
        z4h=current_zscore_4h,
        multi_period_result=multi_period_result,
        timestamp=analysis_now,
        latest_alt_price=latest_alt_price,
        avg_zscore_4h=avg_zscore_4h,
        kline_time=kline_time,
        l2_snapshot=l2_snapshot,
        exit_only=exit_only,
        alt_ohlcv=alt_ohlcv,       # 新增
        base_ohlcv=base_ohlcv,     # 新增
    )

数据来源price_data_cache[('5m','7d')] 中的 alt_klinesbase_klines 均来自 DB 查询
query_range 返回完整 OHLCV + volume),按 time DESC 排序,[0] 即最新一根 5m K 线。
kline 解析时已包含 openvolumevolume_usd 字段(_parse_kline() 第 650-682 行),无额外 DB 开销。

4.6 线程安全

MomentumFilter 不持有锁,由 AdaptiveBollingerStrategy_lock(已存在)统一保护。update()check() 均在 _lock 的持有范围内调用,无竞态风险。kline_time 去重在逻辑层面保证同一根 K 线不被重复追加。CUSUM/GK EMA/Volume EMA 的在线状态(_cusum_state/_gk_ema/_vol_ema)均为简单 dict 赋值,在锁保护下安全。

4.7 日志集成

各层拒绝使用 INFO 级别,格式区分层级便于排查:

🚫 动量过滤 | BTC|ETH | 约束3-暴涨不做空: CUSUM+=3.45>=3.0 GK_vol=0.0034 量比=2.3
🚫 动量过滤(对手腿) | SOL|BTC | 约束4-暴跌不做多: CUSUM-=3.12>=3.0 GK_vol=0.0028 量比=1.8
🚫 动量过滤 | SOL|BTC | 约束1-不追跌: 过去30根净跌幅=-1.23% 阈值=-0.65% ER=0.72 | Spread趋势确认: z4h净位移=-2.1 ER=0.58>=0.45
✅ 动量仲裁放行 | ETH|BTC | 单腿:约束2-不追涨: 过去30根净涨幅=1.05% 阈值=+0.80% ER=0.68 | Spread无趋势,推翻拦截

5. 与当前交易风格的契合度分析

5.1 交易风格特征

维度 当前系统特征
策略类型 配对协整均值回归
K 线周期 5 分钟
最大持仓时间 72 小时(max_hold_hours=72.0
入场逻辑 adaptive_z 突破阈值(非连续信号,要求首次穿越)
出场逻辑 adaptive_z 回归至 entry_adaptive_z * reversion_factor
EMA/STD 参数 ema_span=36, std_window=72
已有风控 止损 3%、移动止损(激活 7%/回调 3%)、最大回撤、每日亏损限额、冷却期 15min、熔断器

5.2 契合度分析

高度契合之处:

  1. Spread 仲裁机制与均值回归哲学完美匹配:均值回归策略关心的是 spread 的行为,而非单腿价格。Layer 3 用 z4h(spread 代理)的 ER 做最终仲裁,确保只有 spread 本身呈趋势时才拦截。两腿同向运动但 spread 稳定时→放行→保留有效信号。这是 v2.0 最核心的改进。

  2. CUSUM 的时间尺度天然适配:CUSUM 是无窗口的在线检测器,通过 drift 参数控制灵敏度而非固定时间窗口。这比"检查最近 N 根"更灵活——它能捕捉"3 根连续小涨"和"1 根暴涨"两种模式。

  3. 自适应阈值消除时段偏差:GK 波动率驱动的自适应阈值让过滤器在亚盘(低波动)和美盘(高波动)保持一致的统计敏感度。对于 5 分钟 K 线的加密市场,波动率在不同时段可差 3-5 倍,自适应是必须的。

  4. O(1) 在线更新零延迟:CUSUM/GK EMA/Volume EMA 均为 O(1) 在线更新,每根 K 线只需常数时间,不影响现有 process_tick 的延迟特征。

  5. 量价确认减少误杀:加密市场中低流动性时段的跳价(价格大幅变动但成交量极小)是常见噪音。量价确认让 Layer 2 只拦截"真正的强动量",保护有效信号。

  6. 独立模块不影响出场:过滤器只影响入场判断,对 _check_exit() 和止损逻辑完全不干预。

  7. symbol 维度 + spread 维度正交:单腿 buffer 在 symbol 维度工作,spread 仲裁复用 z4h 历史,两个维度互不干扰,通过 kline_time 去重避免数据污染。

设计张力(有意为之的权衡):

  • 均值回归策略倾向于在"偏离最大时入场",而 Layer 1 会在此时压制部分信号。Layer 3 仲裁机制缓解了这一矛盾:只有 spread 本身也呈趋势时才真正拦截,单腿趋势不会自动拦截。
  • Layer 2 硬拦截不可仲裁,因为急动场景的执行风险(滑点、对手方缺失)无法通过 spread 稳定性对冲。

5.3 综合评分

维度 评分 v1.1 对比 说明
算法先进性 ★★★★★ +2★ Kaufman ER + CUSUM + GK + 量价确认,均为各领域最优/前沿算法
误杀控制 ★★★★★ +2★ Spread 仲裁机制彻底解决"两腿同向→误杀"问题
自适应能力 ★★★★★ +2★ GK 波动率驱动阈值,消除时段差异
时间尺度匹配 ★★★★★ 持平 CUSUM 无固定窗口,ER 窗口可调
代码侵入性 ★★★★★ 持平 独立模块,改动集中,可独立关闭
运行效率 ★★★★★ +1★ O(1) 在线更新(vs v1.1 的 O(N) check)
参数可控性 ★★★★☆ 持平 全部走环境变量,支持 symbol 级覆盖
策略哲学一致性 ★★★★★ +1★ Spread 仲裁与均值回归哲学高度一致

6. 与四项开单约束的对应关系

6.1 逐项验证

约束 1:连续下跌不追跌

Layer 1 检测:ER >= 0.55 AND net_return <= -adaptive_threshold
被阻止:direction == 'short'
原因:市场路径高效地持续下跌(非震荡漂移),跌幅充分释放,继续做空空间有限且反弹风险高
仲裁:Layer 3 检查 spread 趋势性 → spread 也呈下降趋势则维持拦截,否则放行

示例(5min K 线,N=30,base_threshold=0.8%,GK 自适应后 threshold=0.65%):

  • 过去 150 分钟稳步从 100 跌到 99(ER=0.72, 净跌 1%)→ Layer 1 拦截
    • Spread 同步趋势 → 维持拦截 ✓
    • Spread 稳定(两腿同跌)→ 仲裁放行 ✓(正确!spread 无风险)
  • 过去 150 分钟 V 形反转(100→96→99.2,ER=0.10, 净跌 0.8%)→ ER 不足,放行

约束 2:连续上涨不追涨

Layer 1 检测:ER >= 0.55 AND net_return >= +adaptive_threshold
被阻止:direction == 'long'
仲裁:同约束 1

示例:

  • 过去 150 分钟稳步从 100 涨到 101.2(ER=0.85, 净涨 1.2%)→ 拦截 → 仲裁
  • 过去 150 分钟震荡漂移(ER=0.15, 净涨 0.9%)→ ER 不足,放行

约束 3:迅速暴涨不做空

Layer 2 检测:CUSUM_pos >= 3.0 AND volume_ratio >= 1.5
被阻止:direction == 'short'
原因:CUSUM 累积检测到多方动量爆发 + 成交量放大确认 → 真正的暴涨
不可仲裁(硬拦截)

示例(GK_vol=0.3%,drift=0.5):

  • 连续 3 根各涨 0.8%(标准化 z=2.67 每根)→ CUSUM 累积到 6.5 → 拦截 ✓
    (v1.1 的单根 ATR 检测会遗漏这种分散式急动)
  • 单根暴涨 2.5%(z=8.3)→ CUSUM 一步跳到 7.8 → 拦截 ✓
  • 单根涨 0.3% 但量极小(vol_ratio=0.3)→ CUSUM 低 + 量不足 → 放行 ✓(流动性不足的噪音跳价)

约束 4:迅速暴跌不做多

Layer 2 检测:CUSUM_neg >= 3.0 AND volume_ratio >= 1.5
被阻止:direction == 'long'
不可仲裁(硬拦截)

示例同约束 3(方向相反)。

6.2 约束覆盖矩阵

约束 过滤层 触发指标 被阻止方向 检查维度 可仲裁 正确性
1. 连续下跌不追跌 Layer 1 ER≥T AND net_ret≤-T_adp short alt+base+spread
2. 连续上涨不追涨 Layer 1 ER≥T AND net_ret≥+T_adp long alt+base+spread
3. 迅速暴涨不做空 Layer 2 CUSUM+≥T AND vol≥1.5x short alt+base
4. 迅速暴跌不做多 Layer 2 CUSUM-≥T AND vol≥1.5x long alt+base

四项约束完全覆盖,无遗漏,无逻辑矛盾。Spread 仲裁机制额外解决了 v1.1 中"两腿同向→误杀"的问题。


7. 参数配置指南

7.1 基础参数推荐值

Layer 1:持续趋势过滤

参数 默认值 范围 说明
sustained_lookback 30 10-60 回望根数(30根5min=150分钟)
sustained_base_threshold 0.8% 0.3%-2.0% 净位移基准阈值(GK 自适应缩放)
er_threshold 0.55 0.40-0.75 Kaufman ER 阈值,越高越宽松

Layer 2:急动检测

参数 默认值 范围 说明
gk_period 10 5-20 GK 波动率 EMA 周期
cusum_drift 0.5 0.3-1.0 CUSUM 漂移,越大越不灵敏
cusum_threshold 3.0 2.0-5.0 CUSUM 触发阈值,越大越宽松
volume_confirm_ratio 1.5 1.0-3.0 量价确认倍数
volume_ema_period 20 10-50 成交量 EMA 周期

Layer 3:Spread 仲裁

参数 默认值 范围 说明
spread_lookback 20 10-40 Spread 趋势回望根数
spread_er_threshold 0.45 0.30-0.65 Spread ER 阈值
spread_net_threshold 1.5 0.5-3.0 Spread 净位移阈值(z4h 单位)

7.2 参数调优思路

Layer 1 调优:

er_threshold 调优:
  过低(< 0.40):震荡漂移也被判定为趋势 → 误杀增多
  过高(> 0.75):只拦截极端趋势 → 保护不足
  建议:从 0.55 开始,用历史数据统计 ER 分布的 65th 百分位

sustained_base_threshold 调优:
  该值会被 GK 波动率自适应缩放(0.3x ~ 3.0x)
  低波动期实际阈值 ≈ 0.3% ~ 0.5%
  高波动期实际阈值 ≈ 1.5% ~ 2.4%
  建议:基准值保持 0.8%,通过 GK 自动适配

Layer 2 调优:

cusum_drift 与 cusum_threshold 的关系:
  drift 控制"漂移容忍度":drift=0.5 意味着 ≤0.5σ 的收益率不会累积 CUSUM
  threshold 控制"触发灵敏度":threshold=3.0 意味着需要累积 3 个标准化单位才触发

  快速检测(更灵敏):drift=0.3, threshold=2.0
  标准检测:drift=0.5, threshold=3.0
  保守检测(更宽松):drift=0.8, threshold=4.0

  CUSUM 理论:ARL_0(正常运行长度)≈ exp(2*drift*threshold) / (2*drift^2)
  drift=0.5, threshold=3.0 → ARL_0 ≈ 40(约 40 根 K 线才会一次误报)

volume_confirm_ratio 调优:
  1.0:无量确认效果(等于关闭)
  1.5:标准值,量放大 50% 才确认
  2.0+:非常保守,可能漏过部分真急动

Layer 3 调优:

spread_er_threshold 调优:
  该阈值决定"仲裁的宽容度"
  过低(< 0.30):几乎所有 spread 都被判为有趋势 → 仲裁形同虚设
  过高(> 0.65):几乎总是推翻 Layer 1 → 持续趋势过滤形同虚设
  建议:0.45(偏宽容,给均值回归更多机会)

spread_net_threshold 调优:
  该值的单位是 z4h(标准化后的 spread)
  1.0:较灵敏,z4h 变化 1 个单位即判定趋势
  1.5:标准值
  2.0+:宽容,只有大幅 spread 趋势才维持拦截

7.3 回测验证建议

对比三组回测结果:
  A 组:无动量过滤(当前基线)
  B 组:v1.1 方案(纯净位移 + SMA ATR)
  C 组:v2.0 方案(ER + CUSUM + GK + Spread 仲裁)

核心对比指标:
  - Sharpe Ratio(风险调整收益)
  - 最大连续亏损次数
  - 平均单笔盈亏比(W/L Ratio)
  - 信号过滤率(被拦截信号 / 总信号)
  - 误杀率(被拦截后实际盈利的信号占比 — 通过回看判断)
  - 仲裁放行率(Layer 3 推翻 Layer 1 的比例)
  - 分时段表现差异(亚盘/欧盘/美盘 — 验证自适应阈值效果)

预期结果:
  C 组 vs A 组:过滤率 15-25%,Sharpe 提升 0.2-0.5,最大连亏减少 30%+
  C 组 vs B 组:过滤率相近但误杀率降低 40%+(Spread 仲裁 + ER 联合判断的贡献)

8. 风险与局限性

8.1 已知局限

局限 说明 缓解措施
数据预热期 GK EMA 需 10 根,CUSUM 需稳定 GK 后才有意义(约 15 根≈75 分钟) 启动时从 DB 回填最近 50 根 K 线预填充 buffer
基准波动率冷启动 baseline_vol 需要多个 symbol 的 GK 样本才稳定 前 10 个样本期间使用 base_threshold 不缩放
Spread 仲裁依赖 z4h 历史 需要 SymbolBaseline 中有 z4h_history 字段 若字段不存在则 Layer 1 拦截生效(保守策略)
量价确认依赖 volume 质量 部分低流动性币种或 HIP-3 资产的 volume 数据可能不可靠 volume 为 0 或缺失时自动跳过量确认,仅依赖 CUSUM
CUSUM 复位特性 CUSUM 在急动后需要一段时间衰减回 0,期间可能持续拦截 这是特性而非 bug——急动后的冷却期正是我们需要的

8.2 不适合的场景

  • 极低流动性资产:GK 波动率可能因价格跳空(bid-ask spread 过大)而失真,需配合黑名单
  • HIP-3 资产:若 K 线数据稀疏且 volume 不可靠,Layer 2 量价确认无法生效,仅靠 CUSUM 价格检测

8.3 后续迭代方向

优先级 方向 说明
P1 历史预填充 启动时从 DB 的 klines 表回填近 50 根 K 线,消除预热等待
P2 分时段参数 亚盘/欧盘/美盘使用不同 CUSUM drift(美盘可适当放宽)
P2 CUSUM 自适应 drift drift 随 GK 波动率动态调整,高波动期提高容忍度
P3 多时间框架确认 同时检查 1h K 线的趋势性,避免 5min 级别的噪音触发
P3 Hurst 指数辅助 用 Rescaled Range 法估计 H 值,H>0.6 额外加权 Layer 1 判断

附录 A:算法学术参考

算法 原始论文 核心贡献
Kaufman Efficiency Ratio Kaufman, P. (1995) Smarter Trading 方向距离/路径长度衡量趋势效率
CUSUM Page, E.S. (1954) Biometrika 序列变化检测的最优在线算法
CUSUM 最优性证明 Lorden, G. (1971) Ann. Math. Statist. CUSUM 在平均检测延迟最小意义下一阶渐近最优
Garman-Klass 波动率 Garman, M. & Klass, M. (1980) J. Business OHLC 波动率估计器,效率为 close-to-close 的 7.4 倍
知情交易模型 Kyle, A.S. (1985) Econometrica 价格+成交量联合信号区分知情/噪声交易
配对交易理论 Vidyamurthy, G. (2004) Pairs Trading Spread 均值回归的系统性方法论
配对交易实证 Gatev, E. et al. (2006) Rev. Financial Studies 配对交易策略的大规模实证验证

文档结束

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG