全系统 Key 配对维度重构:问题分析与实施报告
全系统 Key 配对维度重构:问题分析与实施报告
日期:2026-02-19
涉及文件:6 个核心文件
改动性质:架构修复(非功能新增)
一、问题根因分析
1.1 系统背景
本系统是配对交易(Pairs Trading)系统:每一笔交易由两个标的组成:
symbol(alt 腿,如PURR/USDC:USDC)base_symbol(base 腿,如HYPE/USDC:USDC)
系统基于两者的协整关系计算 z-score,当 z-score 偏离均值足够远时入场,回归时退场。
1.2 根本缺陷
改造前,系统所有内部数据结构以单一 symbol: str 作为 key:
# 改造前 — strategy.py
self._baselines: dict[str, SymbolBaseline] = {}
self._positions: dict[str, PositionTracker] = {}
self._exit_pending: set[str] = set()
self._last_trade_time: dict[str, datetime] = {}
# ... 共 9 个字典/集合
# 改造前 — position_manager.py
self._positions: dict[str, PairPosition] = {}
self._opening_symbols: set[str] = set()
1.3 数据混乱的具体场景
当同一 PURR/USDC:USDC 同时与两个不同 base 配对(如 HYPE 和 BTC)时:
| 操作 | 旧行为(Bug) | 预期行为 |
|---|---|---|
_baselines["PURR"] = ... |
PURR/BTC 的基线覆盖 PURR/HYPE | 两个基线独立 |
_positions["PURR"] = pos |
PURR/BTC 仓位覆盖 PURR/HYPE 仓位 | 两个仓位独立 |
close_position("PURR") |
无法确定平哪个配对 | 明确 close_position("PURR", "HYPE") |
get_adaptive_z("PURR") |
返回被覆盖的错误 z | 按配对返回对应 z |
结果:
- 基线污染:z-score 基准被不同配对的数据混合污染,信号逻辑错乱
- 仓位丢失:后开的仓位覆盖前一个仓位的内存记录,前者永久丢失追踪
- 止损失效:止损监控遍历
_positions时只看到最后写入的那个仓位
二、解决方案
引入类型别名 PairKey = tuple[str, str],将所有内部状态的 key 从 str 改为 (symbol, base_symbol) 二元组。
设计原则:
- single 模式:
base_symbol=""",key 为(symbol, ""),行为与原来一致 - pair 模式:key 为
(symbol, base_symbol),两个配对完全隔离 - 策略参数:
_params_for(symbol)保持 symbol-only(参数按币种配置,与 base 无关)
三、改动详情(逐文件)
3.1 src/trading/models.py
添加类型别名,供所有文件 import 复用:
# 新增
PairKey = tuple[str, str] # (symbol, base_symbol)
3.2 src/trading/strategy.py(最核心改动)
9 个内部字典/集合的 key 类型全部升级:
| 字段 | 改造前 | 改造后 |
|---|---|---|
_baselines |
dict[str, SymbolBaseline] |
dict[PairKey, SymbolBaseline] |
_prev_above_threshold |
dict[str, bool] |
dict[PairKey, bool] |
_positions |
dict[str, PositionTracker] |
dict[PairKey, PositionTracker] |
_exit_pending |
set[str] |
set[PairKey] |
_last_trade_time |
dict[str, datetime] |
dict[PairKey, datetime] |
_last_near_thresh_time |
dict[str, datetime | None] |
dict[PairKey, datetime | None] |
_last_status_time |
dict[str, datetime | None] |
dict[PairKey, datetime | None] |
_tick_count |
dict[str, int] |
dict[PairKey, int] |
_last_adaptive_z |
dict[str, float] |
dict[PairKey, float] |
所有公共方法增加 base_symbol: str 参数:
# 改造前
def process_tick(self, symbol, z4h, timestamp, ..., base_symbol=None)
def on_position_closed(self, symbol, timestamp=None)
def on_exit_failed(self, symbol)
def get_adaptive_z(self, symbol)
def prime_buffer(self, symbol, z4h_values)
# 改造后
def process_tick(self, symbol, base_symbol, z4h, timestamp, ...)
def on_position_closed(self, symbol, base_symbol, timestamp=None)
def on_exit_failed(self, symbol, base_symbol)
def get_adaptive_z(self, symbol, base_symbol)
def prime_buffer(self, symbol, base_symbol, z4h_values)
信号去重 key 扩展:
# 改造前
dedup_key = f"{symbol}:{direction}:{timestamp}"
# 改造后
dedup_key = f"{symbol}:{base_symbol}:{direction}:{timestamp}"
cleanup_symbol 改为全量清理 + 新增精确清理:
def cleanup_symbol(self, symbol: str):
"""清理该 symbol 下所有配对的状态(用于 symbol 完全退出时)"""
with self._lock:
for key in [k for k in self._baselines if k[0] == symbol]:
for d in (self._baselines, self._positions, self._last_trade_time, ...):
d.pop(key, None)
def cleanup_pair(self, symbol: str, base_symbol: str):
"""清理特定配对的状态"""
key = (symbol, base_symbol)
...
3.3 src/trading/position_manager.py
内存结构升级:
# 改造前
self._positions: dict[str, PairPosition] = {}
self._opening_symbols: set[str] = set()
# 改造后
self._positions: dict[PairKey, PairPosition] = {}
self._opening_pairs: set[PairKey] = set()
新增模块级辅助函数:
def _pair_key(pos: PairPosition) -> PairKey:
return (pos.symbol, pos.base_symbol or "")
close_position 签名升级:
# 改造前
def close_position(self, symbol: str, signal=None, reason="signal", force_market=False)
# 改造后
def close_position(self, symbol: str, base_symbol: str, signal=None, reason="signal", force_market=False)
防重复检查:
# 改造前(Bug:无法区分同 symbol 不同配对)
if signal.symbol in self._positions or signal.symbol in self._opening_symbols:
return None
# 改造后
key = (signal.symbol, signal.base_symbol or "")
if key in self._positions or key in self._opening_pairs:
return None
sync_with_exchange 返回类型升级:
# 改造前
-> tuple[list[str], list[str], list[str]]
# 改造后
-> tuple[list[PairKey], list[PairKey], list[PairKey]]
孤儿收纳迭代:
# 改造前(dict[str, str],一对多时丢信息)
for alt_symbol, base_symbol in known_pairs.items():
# 改造后(list[PairKey],保留所有配对)
for alt_symbol, base_symbol in known_pairs:
3.4 src/trading/trade_repository.py
get_positions_by_symbols 返回类型:
# 改造前(同 symbol 多 base 时互相覆盖)
return {row["symbol"]: row for row in rows}
# 改造后
return {(row["symbol"], row.get("base_symbol") or ""): row for row in rows}
get_known_pair_relations 返回类型:
# 改造前(dict 在同 symbol 多 base 时丢失数据)
return {row["symbol"]: row["base_symbol"] for row in rows}
# 改造后
return [(row["symbol"], row["base_symbol"]) for row in rows]
3.5 src/trading/protocols.py
同步接口签名(返回类型标注与实现层保持一致):
def get_positions_by_symbols(
self, symbols: list[str], network: str = None
) -> dict[PairKey, dict]: ...
def get_known_pair_relations(
self, network: str = None
) -> list[PairKey]: ...
3.6 src/trading/orchestrator.py
共更新约 15 处调用方:
start() — 历史 z4h 灌入:
# 改造前(按 symbol 维度查,混合了不同 base 的 z4h)
sym_rows = db_client.execute_query(
"SELECT DISTINCT symbol FROM analysis_results WHERE zscore_4h IS NOT NULL"
)
self._strategy.prime_buffer(sym, z4h_values)
# 改造后(按配对维度查,每个配对独立初始化基线)
pair_rows = db_client.execute_query(
"SELECT DISTINCT symbol, base_symbol FROM analysis_results WHERE zscore_4h IS NOT NULL"
)
# SQL 查询增加 base_symbol 过滤条件
self._strategy.prime_buffer(sym, base_sym, z4h_values)
process_analysis() — 提取 base_symbol 并全链路传递:
base_symbol = multi_period_result.get("base_symbol") or ""
entry_signal, exit_signal = self._strategy.process_tick(
symbol, base_symbol, z4h, timestamp, ... # base_symbol 现为第 2 个位置参数
)
on_exit_signal() — 配对维度的仓位检查:
# 改造前(只按 symbol 检查,可能误判)
if symbol not in {p.symbol for p in self._position_manager.open_positions}:
# 改造后(按配对维度精确匹配)
if (symbol, base_symbol) not in {
(p.symbol, p.base_symbol or "") for p in self._position_manager.open_positions
}:
_close_with_retry() — 所有 4 处策略/仓位调用:
# 统一增加 pos.base_symbol or "" 参数
self._position_manager.close_position(pos.symbol, pos.base_symbol or "", ...)
self._strategy.get_adaptive_z(pos.symbol, pos.base_symbol or "")
self._strategy.on_position_closed(pos.symbol, pos.base_symbol or "")
self._strategy.on_exit_failed(pos.symbol, pos.base_symbol or "")
_position_sync() — 解包 PairKey 元组:
# 改造前(返回 list[str])
for symbol in closed_symbols:
self._strategy.on_position_closed(symbol)
# 改造后(返回 list[PairKey],解包使用)
for sym, base_sym in closed_pairs:
self._strategy.on_position_closed(sym, base_sym)
四、不变的部分
| 组件 | 说明 |
|---|---|
| 数据库 schema | pair_positions 表本来就有 symbol 和 base_symbol 两列,无需迁移 |
PairPosition / PairTradeSignal |
dataclass 字段不变,本来就有两个字段 |
_params_for(symbol) |
策略参数按币种配置,与 base 无关,保持 symbol-only |
| 黑名单系统 | 已经是 (symbol, base_symbol) 维度,无需改动 |
_has_open_position(symbol) |
服务层检查"该 symbol 是否有任意持仓",语义为 symbol 维度,无需改 |
| 执行层(executor) | 只关心下单,不维护 key |
| WebSocket 接收 / K 线写入 | 流程不变 |
五、验证
5.1 语法检查
所有 6 个修改文件全部通过 Python 语法编译检查:
models.py: OK
strategy.py: OK
position_manager.py: OK
trade_repository.py: OK
protocols.py: OK
orchestrator.py: OK
5.2 关键场景验证点
| 场景 | 验证方法 |
|---|---|
| PURR/HYPE 与 PURR/BTC 同时存在 | 检查 strategy._baselines 中两个 key 独立存在 |
| 平 PURR/HYPE 不影响 PURR/BTC | close_position("PURR", "HYPE") 后确认 "PURR/BTC" 仓位仍在 |
| 历史 z4h 按配对分别灌入 | 启动日志中 primed 计数 = 实际配对数 |
| 重启恢复 | recover_positions_from_db 正确以 (symbol, base_symbol) 为 key 恢复内存仓位 |
| 孤儿收纳 | known_pairs 现为 list[PairKey],多 base 时不再丢失 |
六、改造前后对比
改造前:
_positions = {
"PURR/USDC:USDC": <PURR/BTC 仓位> ← 覆盖了 PURR/HYPE 仓位!
}
改造后:
_positions = {
("PURR/USDC:USDC", "HYPE/USDC:USDC"): <PURR/HYPE 仓位>,
("PURR/USDC:USDC", "BTC/USDC:USDC"): <PURR/BTC 仓位>,
}
两个配对的基线、仓位、止损状态、退场重试标记全部独立,互不干扰。