全系统 Key 配对维度重构:问题分析与实施报告

全系统 Key 配对维度重构:问题分析与实施报告

日期:2026-02-19
涉及文件:6 个核心文件
改动性质:架构修复(非功能新增)


一、问题根因分析

1.1 系统背景

本系统是配对交易(Pairs Trading)系统:每一笔交易由两个标的组成:

  • symbol(alt 腿,如 PURR/USDC:USDC
  • base_symbol(base 腿,如 HYPE/USDC:USDC

系统基于两者的协整关系计算 z-score,当 z-score 偏离均值足够远时入场,回归时退场。

1.2 根本缺陷

改造前,系统所有内部数据结构以单一 symbol: str 作为 key:

# 改造前 — strategy.py
self._baselines: dict[str, SymbolBaseline] = {}
self._positions: dict[str, PositionTracker] = {}
self._exit_pending: set[str] = set()
self._last_trade_time: dict[str, datetime] = {}
# ... 共 9 个字典/集合
# 改造前 — position_manager.py
self._positions: dict[str, PairPosition] = {}
self._opening_symbols: set[str] = set()

1.3 数据混乱的具体场景

当同一 PURR/USDC:USDC 同时与两个不同 base 配对(如 HYPEBTC)时:

操作 旧行为(Bug) 预期行为
_baselines["PURR"] = ... PURR/BTC 的基线覆盖 PURR/HYPE 两个基线独立
_positions["PURR"] = pos PURR/BTC 仓位覆盖 PURR/HYPE 仓位 两个仓位独立
close_position("PURR") 无法确定平哪个配对 明确 close_position("PURR", "HYPE")
get_adaptive_z("PURR") 返回被覆盖的错误 z 按配对返回对应 z

结果

  • 基线污染:z-score 基准被不同配对的数据混合污染,信号逻辑错乱
  • 仓位丢失:后开的仓位覆盖前一个仓位的内存记录,前者永久丢失追踪
  • 止损失效:止损监控遍历 _positions 时只看到最后写入的那个仓位

二、解决方案

引入类型别名 PairKey = tuple[str, str],将所有内部状态的 key 从 str 改为 (symbol, base_symbol) 二元组。

设计原则

  • single 模式base_symbol=""",key 为 (symbol, ""),行为与原来一致
  • pair 模式:key 为 (symbol, base_symbol),两个配对完全隔离
  • 策略参数_params_for(symbol) 保持 symbol-only(参数按币种配置,与 base 无关)

三、改动详情(逐文件)

3.1 src/trading/models.py

添加类型别名,供所有文件 import 复用:

# 新增
PairKey = tuple[str, str]  # (symbol, base_symbol)

3.2 src/trading/strategy.py(最核心改动)

9 个内部字典/集合的 key 类型全部升级

字段 改造前 改造后
_baselines dict[str, SymbolBaseline] dict[PairKey, SymbolBaseline]
_prev_above_threshold dict[str, bool] dict[PairKey, bool]
_positions dict[str, PositionTracker] dict[PairKey, PositionTracker]
_exit_pending set[str] set[PairKey]
_last_trade_time dict[str, datetime] dict[PairKey, datetime]
_last_near_thresh_time dict[str, datetime | None] dict[PairKey, datetime | None]
_last_status_time dict[str, datetime | None] dict[PairKey, datetime | None]
_tick_count dict[str, int] dict[PairKey, int]
_last_adaptive_z dict[str, float] dict[PairKey, float]

所有公共方法增加 base_symbol: str 参数

# 改造前
def process_tick(self, symbol, z4h, timestamp, ..., base_symbol=None)
def on_position_closed(self, symbol, timestamp=None)
def on_exit_failed(self, symbol)
def get_adaptive_z(self, symbol)
def prime_buffer(self, symbol, z4h_values)

# 改造后
def process_tick(self, symbol, base_symbol, z4h, timestamp, ...)
def on_position_closed(self, symbol, base_symbol, timestamp=None)
def on_exit_failed(self, symbol, base_symbol)
def get_adaptive_z(self, symbol, base_symbol)
def prime_buffer(self, symbol, base_symbol, z4h_values)

信号去重 key 扩展

# 改造前
dedup_key = f"{symbol}:{direction}:{timestamp}"

# 改造后
dedup_key = f"{symbol}:{base_symbol}:{direction}:{timestamp}"

cleanup_symbol 改为全量清理 + 新增精确清理

def cleanup_symbol(self, symbol: str):
    """清理该 symbol 下所有配对的状态(用于 symbol 完全退出时)"""
    with self._lock:
        for key in [k for k in self._baselines if k[0] == symbol]:
            for d in (self._baselines, self._positions, self._last_trade_time, ...):
                d.pop(key, None)

def cleanup_pair(self, symbol: str, base_symbol: str):
    """清理特定配对的状态"""
    key = (symbol, base_symbol)
    ...

3.3 src/trading/position_manager.py

内存结构升级

# 改造前
self._positions: dict[str, PairPosition] = {}
self._opening_symbols: set[str] = set()

# 改造后
self._positions: dict[PairKey, PairPosition] = {}
self._opening_pairs: set[PairKey] = set()

新增模块级辅助函数

def _pair_key(pos: PairPosition) -> PairKey:
    return (pos.symbol, pos.base_symbol or "")

close_position 签名升级

# 改造前
def close_position(self, symbol: str, signal=None, reason="signal", force_market=False)

# 改造后
def close_position(self, symbol: str, base_symbol: str, signal=None, reason="signal", force_market=False)

防重复检查

# 改造前(Bug:无法区分同 symbol 不同配对)
if signal.symbol in self._positions or signal.symbol in self._opening_symbols:
    return None

# 改造后
key = (signal.symbol, signal.base_symbol or "")
if key in self._positions or key in self._opening_pairs:
    return None

sync_with_exchange 返回类型升级

# 改造前
-> tuple[list[str], list[str], list[str]]

# 改造后
-> tuple[list[PairKey], list[PairKey], list[PairKey]]

孤儿收纳迭代

# 改造前(dict[str, str],一对多时丢信息)
for alt_symbol, base_symbol in known_pairs.items():

# 改造后(list[PairKey],保留所有配对)
for alt_symbol, base_symbol in known_pairs:

3.4 src/trading/trade_repository.py

get_positions_by_symbols 返回类型

# 改造前(同 symbol 多 base 时互相覆盖)
return {row["symbol"]: row for row in rows}

# 改造后
return {(row["symbol"], row.get("base_symbol") or ""): row for row in rows}

get_known_pair_relations 返回类型

# 改造前(dict 在同 symbol 多 base 时丢失数据)
return {row["symbol"]: row["base_symbol"] for row in rows}

# 改造后
return [(row["symbol"], row["base_symbol"]) for row in rows]

3.5 src/trading/protocols.py

同步接口签名(返回类型标注与实现层保持一致):

def get_positions_by_symbols(
    self, symbols: list[str], network: str = None
) -> dict[PairKey, dict]: ...

def get_known_pair_relations(
    self, network: str = None
) -> list[PairKey]: ...

3.6 src/trading/orchestrator.py

共更新约 15 处调用方:

start() — 历史 z4h 灌入

# 改造前(按 symbol 维度查,混合了不同 base 的 z4h)
sym_rows = db_client.execute_query(
    "SELECT DISTINCT symbol FROM analysis_results WHERE zscore_4h IS NOT NULL"
)
self._strategy.prime_buffer(sym, z4h_values)

# 改造后(按配对维度查,每个配对独立初始化基线)
pair_rows = db_client.execute_query(
    "SELECT DISTINCT symbol, base_symbol FROM analysis_results WHERE zscore_4h IS NOT NULL"
)
# SQL 查询增加 base_symbol 过滤条件
self._strategy.prime_buffer(sym, base_sym, z4h_values)

process_analysis() — 提取 base_symbol 并全链路传递

base_symbol = multi_period_result.get("base_symbol") or ""

entry_signal, exit_signal = self._strategy.process_tick(
    symbol, base_symbol, z4h, timestamp, ...  # base_symbol 现为第 2 个位置参数
)

on_exit_signal() — 配对维度的仓位检查

# 改造前(只按 symbol 检查,可能误判)
if symbol not in {p.symbol for p in self._position_manager.open_positions}:

# 改造后(按配对维度精确匹配)
if (symbol, base_symbol) not in {
    (p.symbol, p.base_symbol or "") for p in self._position_manager.open_positions
}:

_close_with_retry() — 所有 4 处策略/仓位调用

# 统一增加 pos.base_symbol or "" 参数
self._position_manager.close_position(pos.symbol, pos.base_symbol or "", ...)
self._strategy.get_adaptive_z(pos.symbol, pos.base_symbol or "")
self._strategy.on_position_closed(pos.symbol, pos.base_symbol or "")
self._strategy.on_exit_failed(pos.symbol, pos.base_symbol or "")

_position_sync() — 解包 PairKey 元组

# 改造前(返回 list[str])
for symbol in closed_symbols:
    self._strategy.on_position_closed(symbol)

# 改造后(返回 list[PairKey],解包使用)
for sym, base_sym in closed_pairs:
    self._strategy.on_position_closed(sym, base_sym)

四、不变的部分

组件 说明
数据库 schema pair_positions 表本来就有 symbolbase_symbol 两列,无需迁移
PairPosition / PairTradeSignal dataclass 字段不变,本来就有两个字段
_params_for(symbol) 策略参数按币种配置,与 base 无关,保持 symbol-only
黑名单系统 已经是 (symbol, base_symbol) 维度,无需改动
_has_open_position(symbol) 服务层检查"该 symbol 是否有任意持仓",语义为 symbol 维度,无需改
执行层(executor) 只关心下单,不维护 key
WebSocket 接收 / K 线写入 流程不变

五、验证

5.1 语法检查

所有 6 个修改文件全部通过 Python 语法编译检查:

models.py:           OK
strategy.py:         OK
position_manager.py: OK
trade_repository.py: OK
protocols.py:        OK
orchestrator.py:     OK

5.2 关键场景验证点

场景 验证方法
PURR/HYPE 与 PURR/BTC 同时存在 检查 strategy._baselines 中两个 key 独立存在
平 PURR/HYPE 不影响 PURR/BTC close_position("PURR", "HYPE") 后确认 "PURR/BTC" 仓位仍在
历史 z4h 按配对分别灌入 启动日志中 primed 计数 = 实际配对数
重启恢复 recover_positions_from_db 正确以 (symbol, base_symbol) 为 key 恢复内存仓位
孤儿收纳 known_pairs 现为 list[PairKey],多 base 时不再丢失

六、改造前后对比

改造前:
  _positions = {
      "PURR/USDC:USDC": <PURR/BTC 仓位>   ← 覆盖了 PURR/HYPE 仓位!
  }

改造后:
  _positions = {
      ("PURR/USDC:USDC", "HYPE/USDC:USDC"): <PURR/HYPE 仓位>,
      ("PURR/USDC:USDC", "BTC/USDC:USDC"):  <PURR/BTC 仓位>,
  }

两个配对的基线、仓位、止损状态、退场重试标记全部独立,互不干扰。

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG