数据链路按照Symbol严格匹配改造
策略引擎全链路 Per-Symbol 隔离 — 最终设计文档
1. 问题总结
策略引擎 (strategy.py) 的所有可变状态均为全局单值,在多 symbol 场景下产生三类严重问题:
| # | 问题 | 根因 | 影响 |
|---|---|---|---|
| 1 | 持有 ALT 时阻止 PURR 入场 | _position: PositionTracker | None 单槽位,有任意持仓即禁止所有 symbol 入场 |
错过交易机会 |
| 2 | 多 symbol 的 z4h 混入同一缓冲区 | _std_window / _ema / _welford_* 全局共享,不区分 symbol |
adaptive_z 计算结果被交叉污染,信号失真 |
| 3 | 多 symbol 交替调用时产生假突破 | _prev_above_threshold 全局单值,PURR 设 True → ALT 重置为 False → PURR 误触发 |
重复/虚假入场信号 |
[!IMPORTANT]
前两份设计文档(V1、V2)仅修复问题 1(仓位跟踪),且对_prev_above_threshold和_last_trade_time存在分歧。
本方案统一解决全部三个问题,将数据流涉及的所有环节按 symbol 一一匹配。
2. 设计原则
- 全链路 Per-Symbol:缓冲区、统计量、EMA、持仓、冷却期、突破状态,全部按 symbol 独立,杜绝任何交叉污染
- 状态内聚:引入
SymbolState容器,将单个 symbol 的全部可变状态聚合到一个对象,替代 10+ 个独立 dict - 延迟初始化:首次遇到新 symbol 时自动创建
SymbolState,无需预注册 - 最小接口变更:仅修改
strategy.py(主体)+orchestrator.py(3 处接口适配),下游模块不动 - 向前兼容:策略层支持多 symbol 同时持仓;入场仅当「当前 symbol 已在持仓」时跳过
3. 与前两份文档的决策对比
| 状态变量 | 文档 V1 | 文档 V2 | 本方案(V3) | 决策理由 |
|---|---|---|---|---|
_position |
dict[str, PositionTracker] |
dict[str, PositionTracker] |
SymbolState.position | 两份文档一致,本方案内聚到 SymbolState |
_exit_pending |
set[str] 或 dict[str, bool] |
_exit_pending_symbols: set[str] |
SymbolState.exit_pending: bool | 每个 SymbolState 自带 bool,语义更清晰 |
_last_trade_time |
全局(推荐方案 A) | _last_trade_times: dict[str, datetime] |
SymbolState.last_trade_time | 不同 symbol 应独立冷却,A 对 B 的交易不应阻止 B 入场 |
_prev_above_threshold |
dict[str, bool] |
保持全局(理由:缓冲区共享) | SymbolState.prev_above_threshold | 缓冲区已改为 per-symbol,突破状态理应同步隔离;且全局状态下多 symbol 交替会产生假突破 |
_std_window / EMA / Welford |
不涉及 | 不涉及 | SymbolState 内含 | 核心改进:消除缓冲区交叉污染,使 adaptive_z 对每个 symbol 独立精确 |
_last_kline_time |
不涉及 | 不涉及 | SymbolState.last_kline_time | K 线门控(防止同根 K 线重复更新基线)应按 symbol 独立 |
on_entry_rejected |
per-symbol 重置 | 不改(全局) | per-symbol 重置 | 与 _prev_above_threshold per-symbol 一致 |
4. 数据流与受影响节点全景
graph LR
subgraph "数据流入"
S["service_base<br>_trigger_strategy_if_ready"]
S -->|"symbol + z4h"| O["orchestrator<br>process_analysis"]
end
subgraph "策略引擎 strategy.py 🔴 修改范围"
O -->|1| PT["process_tick(symbol)"]
PT -->|2| CE["_check_entry(symbol, state)"]
PT -->|3| CX["_check_exit(symbol, state)"]
SS["SymbolState 容器<br>✅ per-symbol 隔离"]
CE -.->|"读"| SS
CX -.->|"读"| SS
PT -.->|"读/写"| SS
SS --- BUF["std_window + ema<br>+ welford_*"]
SS --- POS["position + exit_pending<br>+ last_trade_time"]
SS --- BRK["prev_above_threshold<br>+ last_kline_time"]
end
subgraph "回调 (orchestrator → strategy)"
CB1["on_position_opened(symbol)"] -.->|"写"| SS
CB2["on_position_closed(symbol)"] -.->|"写"| SS
CB3["on_exit_failed(symbol)"] -.->|"写"| SS
CB4["on_entry_rejected(symbol)"] -.->|"写"| SS
CB5["sync_position(symbol)"] -.->|"写"| SS
end
subgraph "下游 (无需修改) ✅"
O2["orchestrator 调用链"]
O3["position_manager"]
O4["risk_manager"]
O5["executor"]
end
style SS fill:#51cf66,color:#fff
style BUF fill:#51cf66,color:#fff
style POS fill:#51cf66,color:#fff
style BRK fill:#51cf66,color:#fff
5. 核心数据结构设计
5.1 新增 SymbolState dataclass
@dataclass
class SymbolState:
"""单个 symbol 的完整策略状态
所有与特定交易对相关的可变状态均聚合于此,
确保多 symbol 之间完全隔离,不存在交叉污染。
"""
# ── 缓冲区与统计量(Welford 在线算法)──
std_window: deque # z4h 滚动窗口 (maxlen=ema_span)
welford_mean: float = 0.0 # Welford 在线均值
welford_m2: float = 0.0 # Welford 偏差平方和
welford_updates: int = 0 # 增量更新计数(触发周期性重算)
ema: float | None = None # EMA 状态
last_kline_time: datetime | None = None # 上一根 K 线时间(基线更新门控)
# ── 持仓跟踪 ──
position: PositionTracker | None = None # 该 symbol 的持仓(无持仓时 None)
exit_pending: bool = False # 退场失败待重试标志
last_trade_time: datetime | None = None # 上次交易时间(冷却期)
# ── 突破检测 ──
prev_above_threshold: bool = False # 上一 tick 的 |adaptive_z| > threshold
# ── 诊断 ──
last_std: float = 0.0 # 最近一次计算的 std(日志用)
5.2 策略类持仓结构变更
class AdaptiveBollingerStrategy:
def __init__(self, config: TradingConfig):
# ── 全局不可变配置(所有 symbol 共享)──
self._ema_span = config.strategy_ema_span
self._alpha = 2.0 / (self._ema_span + 1)
self._threshold = config.strategy_adaptive_threshold
self._min_zscore_abs = config.strategy_min_zscore_abs
self._reversion_factor = config.strategy_reversion_factor
self._cooldown_minutes = config.strategy_cooldown_minutes
self._min_std = 0.01
# ── Per-Symbol 状态容器 ──
self._symbol_states: dict[str, SymbolState] = {}
# ── 全局辅助(非 symbol 相关)──
self._lock = threading.Lock()
self._signal_history: OrderedDict[str, datetime] = OrderedDict()
self._signal_cooldown = 60
self._tick_count: int = 0
self._last_status_time: datetime | None = None
self._last_near_thresh_time: datetime | None = None
5.3 延迟初始化辅助方法
def _get_or_create_state(self, symbol: str) -> SymbolState:
"""获取或创建 symbol 的策略状态(延迟初始化)"""
state = self._symbol_states.get(symbol)
if state is None:
state = SymbolState(std_window=deque(maxlen=self._ema_span))
self._symbol_states[symbol] = state
return state
6. 策略层(strategy.py)逐项修改规格
6.1 __init__ 重构
| 移除的全局状态 | 迁移去向 |
|---|---|
self._std_window: deque |
SymbolState.std_window |
self._welford_mean: float |
SymbolState.welford_mean |
self._welford_m2: float |
SymbolState.welford_m2 |
self._welford_updates: int |
SymbolState.welford_updates |
self._ema: float | None |
SymbolState.ema |
self._last_kline_time |
SymbolState.last_kline_time |
self._prev_above_threshold: bool |
SymbolState.prev_above_threshold |
self._position: PositionTracker | None |
SymbolState.position |
self._exit_pending: bool |
SymbolState.exit_pending |
self._last_trade_time: datetime | None |
SymbolState.last_trade_time |
self._last_std: float |
SymbolState.last_std |
保留为全局的:_ema_span, _alpha, _threshold, _min_zscore_abs, _reversion_factor, _cooldown_minutes, _min_std, _lock, _signal_history, _signal_cooldown, _tick_count, _last_status_time, _last_near_thresh_time, _WELFORD_RECALC_INTERVAL
6.2 公共接口变更
prime_buffer — 增加 symbol 参数
- def prime_buffer(self, z4h_values: list):
+ def prime_buffer(self, symbol: str, z4h_values: list):
内部实现改为对指定 symbol 的 state 操作:
def prime_buffer(self, symbol: str, z4h_values: list):
if not z4h_values:
logger.warning("⚠️ 缓冲区灌入: 无历史 z4h 数据")
return
with self._lock:
state = self._get_or_create_state(symbol)
self._prime_buffer_unlocked(state, z4h_values)
_prime_buffer_unlocked 签名变更:
- def _prime_buffer_unlocked(self, z4h_values: list):
+ def _prime_buffer_unlocked(self, state: SymbolState, z4h_values: list):
内部所有 self._ema → state.ema,self._std_window → state.std_window,self._welford_* → state.welford_*,self._prev_above_threshold → state.prev_above_threshold。
is_ready → is_symbol_ready(symbol)
- @property
- def is_ready(self) -> bool:
- min_required = max(10, self._ema_span // 4)
- return len(self._std_window) >= min_required
+ def is_symbol_ready(self, symbol: str) -> bool:
+ """指定 symbol 的缓冲区是否有足够数据计算 adaptive_z"""
+ state = self._symbol_states.get(symbol)
+ if state is None:
+ return False
+ min_required = max(10, self._ema_span // 4)
+ return len(state.std_window) >= min_required
内部 _process_tick_unlocked 中的 self.is_ready 调用改为直接检查 len(state.std_window) >= min_required。
current_adaptive_z → get_adaptive_z(symbol)
- @property
- def current_adaptive_z(self) -> float | None:
- with self._lock:
- return self._compute_adaptive_z()
+ def get_adaptive_z(self, symbol: str) -> float | None:
+ """指定 symbol 的当前 adaptive_z 快照(线程安全)"""
+ with self._lock:
+ state = self._symbol_states.get(symbol)
+ if state is None:
+ return None
+ return self._compute_adaptive_z(state)
6.3 内部计算方法重构
_compute_adaptive_z — 接受 SymbolState 参数
- def _compute_adaptive_z(self, realtime_z4h: float | None = None) -> float | None:
+ def _compute_adaptive_z(self, state: SymbolState, realtime_z4h: float | None = None) -> float | None:
内部替换:
self._ema→state.emaself._std_window→state.std_windowself._welford_m2→state.welford_m2self._last_std→state.last_stdself._min_std保持self._min_std(全局配置)
_recalculate_welford_from_window — 接受 SymbolState 参数
- def _recalculate_welford_from_window(self):
+ def _recalculate_welford_from_window(self, state: SymbolState):
内部替换:
self._std_window→state.std_windowself._welford_mean→state.welford_meanself._welford_m2→state.welford_m2
6.4 _process_tick_unlocked 重构
方法签名不变,内部通过 state 对象操作:
def _process_tick_unlocked(self, symbol, z4h, timestamp, kline_time=None, latest_price=None):
state = self._get_or_create_state(symbol)
# ── 判断是否为新 K 线 ──
is_new_candle = False
if kline_time is not None:
if state.last_kline_time is None or kline_time != state.last_kline_time:
is_new_candle = True
state.last_kline_time = kline_time
else:
is_new_candle = True
# ── 基线更新(仅新 K 线时执行)──
if is_new_candle:
n = len(state.std_window)
if n == self._ema_span:
old = state.std_window[0]
old_mean = state.welford_mean
state.welford_mean = old_mean + (z4h - old) / n
state.welford_m2 += (z4h - old) * ((z4h - state.welford_mean) + (old - old_mean))
if state.welford_m2 < 0:
state.welford_m2 = 0.0
else:
new_n = n + 1
delta = z4h - state.welford_mean
state.welford_mean += delta / new_n
delta2 = z4h - state.welford_mean
state.welford_m2 += delta * delta2
state.std_window.append(z4h)
state.welford_updates += 1
if state.welford_updates >= self._WELFORD_RECALC_INTERVAL:
self._recalculate_welford_from_window(state)
state.welford_updates = 0
if state.ema is None:
state.ema = z4h
else:
state.ema = self._alpha * z4h + (1 - self._alpha) * state.ema
# ── 就绪检查 ──
min_required = max(10, self._ema_span // 4)
if len(state.std_window) < min_required:
... # 未就绪,return (None, None)
# ── 计算 adaptive_z ──
adaptive_z = self._compute_adaptive_z(state, realtime_z4h=z4h)
...
# ── 突破检测(使用 state.prev_above_threshold)──
current_above = (adaptive_z < -self._threshold or adaptive_z > self._threshold)
# ── 状态摘要日志 ──
if state.position:
pos_base = f"{state.position.symbol} {state.position.direction}"
...
# 可选:增加总持仓数
# total_pos = len([s for s in self._symbol_states.values() if s.position])
# ── 退场/入场检查 ──
exit_signal = self._check_exit(symbol, z4h, adaptive_z, state)
if exit_signal is not None:
state.prev_above_threshold = current_above
return None, exit_signal
entry_signal = self._check_entry(symbol, z4h, adaptive_z, timestamp, state)
...
state.prev_above_threshold = current_above
return entry_signal, None
6.5 _check_entry 重构
def _check_entry(
- self, symbol: str, z4h: float, adaptive_z: float, timestamp: datetime,
+ self, symbol: str, z4h: float, adaptive_z: float, timestamp: datetime, state: SymbolState,
) -> EntrySignal | None:
关键变更:
入场持仓检查 — 仅该 symbol 有持仓时跳过(核心 BUG 修复):
- if self._position is not None:
- logger.info(f"⏭️ 入场跳过(已有持仓) | {symbol} | "
- f"当前持仓={self._position.symbol} {self._position.direction} | "
- f"az={adaptive_z:+.4f}")
- return None
+ if state.position is not None:
+ logger.info(f"⏭️ 入场跳过(已有持仓) | {symbol} | "
+ f"当前持仓={state.position.direction} | "
+ f"az={adaptive_z:+.4f}")
+ return None
突破检测 — 使用 per-symbol 状态:
- breakout = current_above and not self._prev_above_threshold
+ breakout = current_above and not state.prev_above_threshold
冷却期检查 — 使用 per-symbol 冷却时间:
- if self._last_trade_time is not None:
- elapsed = (timestamp - self._last_trade_time).total_seconds()
+ if state.last_trade_time is not None:
+ elapsed = (timestamp - state.last_trade_time).total_seconds()
6.6 _check_exit 重构
def _check_exit(
- self, symbol: str, z4h: float, adaptive_z: float
+ self, symbol: str, z4h: float, adaptive_z: float, state: SymbolState,
) -> ExitSignal | None:
关键变更:
- if self._position is None:
- return None
- if self._position.symbol != symbol:
+ if state.position is None:
return None
- direction = self._position.direction
+ direction = state.position.direction
删除
self._position.symbol != symbol检查,因为 state 本身就是该 symbol 的,dict key 查找已保证 symbol 匹配。
退场重试:
- if self._exit_pending:
+ if state.exit_pending:
所有 self._position.entry_* / self._position.entry_time → state.position.entry_* / state.position.entry_time。
6.7 回调方法重构
sync_position(symbol, ...)
with self._lock:
- self._position = PositionTracker(
+ state = self._get_or_create_state(symbol)
+ state.position = PositionTracker(
symbol=symbol, direction=direction,
entry_z4h=entry_z4h, entry_adaptive_z=entry_adaptive_z,
entry_time=entry_time or datetime.now().astimezone(),
)
on_position_opened(symbol, ...)
with self._lock:
- self._position = PositionTracker(
+ state = self._get_or_create_state(symbol)
+ state.position = PositionTracker(
symbol=symbol, direction=direction,
entry_z4h=entry_z4h, entry_adaptive_z=adaptive_z,
)
- self._last_trade_time = timestamp or datetime.now().astimezone()
+ state.last_trade_time = timestamp or datetime.now().astimezone()
on_position_closed(symbol, ...)
with self._lock:
- if self._position and self._position.symbol == symbol:
- self._last_trade_time = timestamp or datetime.now().astimezone()
- self._position = None
- self._exit_pending = False
+ state = self._symbol_states.get(symbol)
+ if state is not None and state.position is not None:
+ state.last_trade_time = timestamp or datetime.now().astimezone()
+ state.position = None
+ state.exit_pending = False
logger.info(f"🧹 持仓已清除 | {symbol}")
on_exit_failed(symbol)
with self._lock:
- if self._position and self._position.symbol == symbol:
- self._exit_pending = True
+ state = self._symbol_states.get(symbol)
+ if state is not None and state.position is not None:
+ state.exit_pending = True
on_entry_rejected(symbol)
with self._lock:
- self._prev_above_threshold = False
+ state = self._symbol_states.get(symbol)
+ if state is not None:
+ state.prev_above_threshold = False
7. 编排层(orchestrator.py)接口适配
编排层已按 symbol 调用策略所有方法,仅需适配 3 处接口签名变更:
7.1 prime_buffer 调用(2 处)
正常路径(约 L180):
- self._strategy.prime_buffer(healing_result.data)
+ self._strategy.prime_buffer(HYPE_ALT_SYMBOL, healing_result.data)
降级路径(约 L715):
- self._strategy.prime_buffer(z4h_values)
+ self._strategy.prime_buffer(HYPE_ALT_SYMBOL, z4h_values)
7.2 is_ready 引用(1 处)
启动日志(约 L217):
- f"策略: Adaptive Bollinger (is_ready={self._strategy.is_ready})"
+ f"策略: Adaptive Bollinger (is_ready={self._strategy.is_symbol_ready(HYPE_ALT_SYMBOL)})"
7.3 current_adaptive_z 引用(2 处)
止损通知(约 L739):
- exit_az = self._strategy.current_adaptive_z if self._strategy else None
+ exit_az = self._strategy.get_adaptive_z(symbol) if self._strategy else None
symbol来自_close_with_retry方法参数,已可用。
孤儿仓位恢复(约 L814):
- az = self._strategy.current_adaptive_z
+ az = self._strategy.get_adaptive_z(pos.symbol)
7.4 其余调用(无需改动)
| 方法 | 调用方式 | 状态 |
|---|---|---|
process_tick(symbol, ...) |
已按 symbol 调用 | ✅ 无需改 |
on_position_opened(symbol, ...) |
已按 symbol 调用 | ✅ 无需改 |
on_position_closed(symbol, ...) |
已按 symbol 调用 | ✅ 无需改 |
on_exit_failed(symbol) |
已按 symbol 调用 | ✅ 无需改 |
on_entry_rejected(symbol) |
已按 symbol 调用 | ✅ 无需改 |
sync_position(symbol, ...) |
已按 symbol 循环调用 | ✅ 无需改 |
8. 不修改的模块
| 模块 | 理由 |
|---|---|
position_manager.py |
已用 dict[str, PairPosition] 按 symbol 管理多仓位 ✅ |
risk_manager.py |
已用 len(open_positions) vs max_open_pairs 检查 ✅ |
realtime_kline_service_base.py |
已按 symbol 透传至 orchestrator,策略接口签名不影响此层 ✅ |
executor.py |
按交易所/币种获取与下单 ✅ |
trade_repository.py |
仓位记录按 symbol/position_id 区分 ✅ |
9. 修改文件汇总
src/trading/strategy.py — 主要修改
| 修改项 | 约影响行 | 类型 |
|---|---|---|
新增 SymbolState dataclass |
+20 行 | 新增 |
__init__ 移除全局状态,新增 _symbol_states |
~30 行 | 重构 |
新增 _get_or_create_state() |
+6 行 | 新增 |
prime_buffer + _prime_buffer_unlocked 增加 symbol/state 参数 |
~70 行 | 重构 |
is_ready → is_symbol_ready(symbol) |
~6 行 | 接口变更 |
current_adaptive_z → get_adaptive_z(symbol) |
~6 行 | 接口变更 |
_compute_adaptive_z(state, ...) |
~25 行 | 重构 |
_recalculate_welford_from_window(state) |
~10 行 | 重构 |
_process_tick_unlocked 全量 state 化 |
~155 行 | 重构 |
_check_entry state 化 |
~60 行 | 重构 |
_check_exit state 化 |
~65 行 | 重构 |
| 5 个回调方法 state 化 | ~60 行 | 重构 |
src/trading/orchestrator.py — 接口适配
| 修改项 | 约行号 | 类型 |
|---|---|---|
prime_buffer 增加 symbol 参数 |
L180, L715 | 调用适配 |
is_ready → is_symbol_ready() |
L217 | 调用适配 |
current_adaptive_z → get_adaptive_z(symbol) |
L739, L814 | 调用适配 |
10. 实施步骤
Step 1 ── 新增 SymbolState dataclass(strategy.py)
Step 2 ── 重构 __init__:移除全局状态 → _symbol_states dict
Step 3 ── 新增 _get_or_create_state(symbol) 辅助方法
Step 4 ── 重构内部方法:_compute_adaptive_z(state)、_recalculate_welford_from_window(state)
Step 5 ── 重构 prime_buffer(symbol, z4h_values) + _prime_buffer_unlocked(state, z4h_values)
Step 6 ── 重构 is_symbol_ready(symbol) + get_adaptive_z(symbol) 公共接口
Step 7 ── 重构 _process_tick_unlocked(全量 state 化)
Step 8 ── 重构 _check_entry / _check_exit(接受 state 参数)
Step 9 ── 重构 5 个回调方法(通过 state 操作)
Step 10 ── 适配 orchestrator.py 的 5 处调用
11. 测试与回归要点
单 symbol 回归
- 仅 PURR 时:开仓 → 平仓 → 再开仓,行为与修改前一致
- 缓冲区灌入(prime_buffer):传入 symbol 后 SymbolState 正确初始化
多 symbol 新增场景
- 先开 ALT,再对 PURR 产生入场信号:应允许 PURR 入场(策略不因「已有 ALT」跳过)
- PURR 入场后,
_symbol_states中应同时存在 ALT 与 PURR 两个独立 SymbolState - 两个 symbol 的
std_window/ema/welford_*完全独立,互不影响
同 symbol 重复入场
- 已持有 PURR 时,再次收到 PURR 入场信号:应被跳过并打日志
退场隔离
- 仅对对应 symbol 产生退场信号并清除该 symbol 的
state.position/state.exit_pending - 不影响其他 symbol 的 SymbolState
突破检测隔离
- PURR 突破(
prev_above_threshold=True)不影响 ALT 的突破检测状态 - 多 symbol 交替调用 process_tick 时不产生假突破
冷却期隔离
- PURR 平仓后的冷却期不阻止 ALT 入场
恢复与 sync
- 重启后
sync_position循环调用,多笔持仓(ALT + PURR)应各自写入独立的 SymbolState - prime_buffer 按 symbol 灌入,不交叉
12. Verification Plan
# 1. 语法检查
python -c "from src.trading.strategy import AdaptiveBollingerStrategy"
# 2. 旧全局引用审计(strategy.py 中不应残留以下引用)
grep -n "self\._position[^s_]" src/trading/strategy.py # _position 单值
grep -n "self\._ema[^_]" src/trading/strategy.py # _ema 全局
grep -n "self\._std_window" src/trading/strategy.py # _std_window 全局
grep -n "self\._exit_pending[^_]" src/trading/strategy.py # _exit_pending 单值
grep -n "self\._last_trade_time[^s]" src/trading/strategy.py # _last_trade_time 单值
grep -n "self\._prev_above_threshold" src/trading/strategy.py # _prev_above 全局
grep -n "self\._welford_mean[^s]" src/trading/strategy.py # welford 全局
grep -n "self\._welford_m2[^s]" src/trading/strategy.py # welford 全局
grep -n "self\._last_kline_time" src/trading/strategy.py # kline_time 全局
# 3. 接口适配审计(orchestrator.py 中旧接口应全部替换)
grep -n "prime_buffer\|is_ready\|current_adaptive_z" src/trading/orchestrator.py
# 4. 日志验证(部署后)
# - 不同 symbol 入场不再被其他 symbol 拦截
# - 缓冲区数据不交叉(各 symbol 的 ema/std 独立变化)
# - 冷却期按 symbol 独立生效