孤儿仓位设计方案4

孤儿仓位管理优化 — 综合设计方案

目标: 区分"DB已存在但内存遗漏"和"真正的孤儿仓位",避免重复创建、保留关键字段、减少告警噪音


一、问题诊断

1.1 当前缺陷

现有判定逻辑:

# 当前实现: position_manager.py L1011-L1022
memory_coins = {symbol_to_coin(s) for s in self._positions}   # ← 仅查内存
orphan_entries = {}
for coin, ex_pos in exchange_coins.items():
    if coin in memory_coins:       # ← 内存命中 → 跳过
        continue
    szi = float(ex_pos.get("szi", 0))
    if szi == 0:
        continue
    orphan_entries[coin] = ex_pos   # ← 内存未命中 → 直接标记为孤儿 ❌

判定链路: 交易所有 → 内存无 → 直接认定为孤儿 → 创建新 PairPositionsave_position() 写入 DB

核心问题:

  • ❌ 跳过数据库查询验证步骤
  • ❌ "数据库有记录但内存未加载"的正常仓位被误判为孤儿
  • ❌ 导致 entry_adaptive_zentry_zscore_4h 等关键字段丢失
  • ❌ 均值回归退出策略失效
  • ❌ 可能产生重复的 position_id 记录

1.2 缺陷场景

sequenceDiagram
    participant Exchange as 交易所
    participant Memory as 内存 (_positions)
    participant DB as 数据库 (pair_positions)
    participant System as 系统

    Note over Exchange, DB: 场景: 仓位在 DB 有记录,但内存被清理

    System->>Exchange: 查询实际仓位
    Exchange-->>System: coin=PURR, szi=100

    System->>Memory: coin in memory_coins?
    Memory-->>System: ❌ 不存在

    Note over System: ⚠️ 当前: 直接创建孤儿仓位
    System->>DB: save_position(新 position_id)
    Note over DB: 💥 DB 中已有该 coin 的活跃仓位<br/>产生重复记录!

触发条件 (任一即可):

# 场景 发生时机
1 内存被异常清理 OOM 后部分恢复、热重载
2 recover_positions_from_db 未完整执行 启动阶段 DB 连接中断
3 sync_with_exchange 先于恢复流程运行 线程竞争
4 多实例部署同网络 实例 A 开仓 → 实例 B 检测到孤儿

1.3 业务影响

受影响的关键字段:

  • entry_adaptive_z: 均值回归退出的基线 (最关键)
  • entry_zscore_4h: 入场时的 Z-score 快照
  • entry_avg_zscore_4h: 入场时的平均 Z-score
  • entry_signal_strength: 信号强度
  • entry_signal_id: 信号 ID (追溯用)
  • peak_pnl_pct: 峰值盈利百分比 (移动止损用)

业务后果:

  • ⚠️ 均值回归退出策略无法工作 (entry_adaptive_z=0 被跳过)
  • ⚠️ 移动止损可能失效 (peak_pnl_pct 丢失)
  • ⚠️ 信号追溯困难 (entry_signal_id 丢失)
  • ⚠️ 告警噪音增加 (正常恢复被当作异常告警)
  • ⚠️ DB 中同一 coin 出现两条活跃仓位记录
  • ⚠️ PnL 统计重复计算

二、解决方案

2.1 核心设计原则

新定义: 孤儿仓位 = 仅在交易所存在,且内存与数据库都不存在 的仓位

修正后的判定链路:

flowchart TD
    A["交易所存在仓位<br/>(coin in exchange_coins)"] --> B{"内存中有记录?<br/>(coin in memory_coins)"}
    B -->|✅ 有| C["跳过 (非孤儿)"]
    B -->|❌ 无| D{"🆕 批量查询数据库<br/>(get_positions_by_symbols)"}
    D -->|✅ DB 有| E["从 DB 恢复到内存<br/>(_load_position_from_db_row)"]
    D -->|❌ DB 无| F["确认为真孤儿"]
    F --> G["智能配对 (Fix #12)"]
    G -->|配对成功| H["创建 pair 仓位"]
    G -->|无法配对| I["创建 single 孤儿仓位"]

    style D fill:#ffd700,stroke:#333,color:#000
    style E fill:#90ee90,stroke:#333,color:#000
    style F fill:#ffcccc,stroke:#333,color:#000

2.2 架构层次

┌─────────────────────────────────────────┐
│  Layer 1: trade_repository.py          │
│  - 新增批量查询方法                      │
│  - get_positions_by_symbols()           │
│  (性能优化: O(1) vs O(N))               │
└─────────────────────────────────────────┘
              ↓
┌─────────────────────────────────────────┐
│  Layer 2: position_manager.py           │
│  - _load_position_from_db_row()         │
│    (复用: 恢复 + 孤儿检测)               │
│  - _detect_and_adopt_orphans() (重构)   │
│    (批量验证 + 智能配对 + 告警区分)      │
└─────────────────────────────────────────┘

三、详细实现

3.1 TradeRepository — 批量查询方法

文件: src/trading/trade_repository.py

新增方法:

def get_positions_by_symbols(
    self,
    symbols: list[str],
    network: str = None
) -> dict[str, dict]:
    """批量查询活跃仓位 (性能优化版)

    Args:
        symbols: 交易对列表 ["PURR/USDC:USDC", "HYPE/USDC:USDC"]
        network: 网络标识 (testnet/mainnet)

    Returns:
        {symbol: position_dict} - 仅包含数据库中存在的记录

    性能特性:
        - SQL: symbol = ANY(%s) (批量查询)
        - 索引: idx_pair_positions_symbol_status
        - 时间复杂度: O(1) vs 逐个查询 O(N)
        - 预期耗时: <5ms (查询 3-10 个 symbol)
    """
    if not symbols:
        return {}

    sql = """
        SELECT * FROM pair_positions
        WHERE symbol = ANY(%s)
          AND status IN ('open', 'opening', 'closing')
    """
    params = [symbols]

    if network:
        sql += " AND network = %s"
        params.append(network)

    try:
        rows = self._db.execute_query(sql, tuple(params)) or []
        # 构建索引: symbol → db_row (去重保留最后一条)
        return {row["symbol"]: row for row in rows}
    except Exception as e:
        logger.error(f"批量查询仓位失败: {e}", exc_info=True)
        return {}  # 降级: 返回空字典,后续按孤儿处理

SQL 性能分析:

-- 利用现有索引
CREATE INDEX idx_pair_positions_symbol_status
  ON pair_positions (symbol, status);

-- 查询示例
SELECT * FROM pair_positions
WHERE symbol = ANY(ARRAY['PURR/USDC:USDC', 'HYPE/USDC:USDC'])
  AND status IN ('open', 'opening', 'closing')
  AND network = 'testnet';

-- 执行计划 (预期)
Index Scan using idx_pair_positions_symbol_status on pair_positions
  Index Cond: (symbol = ANY(...) AND status IN (...))
  Filter: (network = 'testnet')

性能对比:

方法 查询次数 耗时 (10个symbol)
逐个查询 N次 ~30ms
批量查询 1次 <5ms

3.2 TradeRepositoryProtocol — 接口同步

文件: src/trading/protocols.py

 class TradeRepositoryProtocol(Protocol):
     def save_signal(...) -> None: ...
     def save_position(self, pos: PairPosition) -> None: ...
     def update_position_status(...) -> None: ...
     def get_open_positions(self, network: str = None) -> list[dict]: ...
+    def get_positions_by_symbols(
+        self, symbols: list[str], network: str = None
+    ) -> dict[str, dict]: ...
     def save_order(...) -> None: ...
     def update_daily_stats(self, **kwargs) -> None: ...

3.3 PositionManager — 辅助方法

文件: src/trading/position_manager.py

方法 1: 从 DB 行加载仓位 (复用)

def _load_position_from_db_row(
    self,
    row: dict,
    exchange_coins: dict[str, dict],
) -> PairPosition | None:
    """从 DB 行 + 交易所快照构造并刷新 PairPosition

    复用场景:
        1. recover_positions_from_db() - 启动恢复
        2. _detect_and_adopt_orphans() - 孤儿检测中 DB 命中

    Args:
        row: 数据库查询结果行
        exchange_coins: {coin: exchange_pos_dict}

    Returns:
        刷新后的 PairPosition 对象,失败返回 None

    数据来源策略:
        - DB 优先: entry_adaptive_z, entry_zscore_4h (信号快照)
        - 交易所优先: alt_size, unrealized_pnl (实时数据)
        - 混合: base_size (交易所实时 + DB 兜底)
    """
    try:
        symbol = row["symbol"]
        coin = symbol_to_coin(symbol)

        # 验证交易所是否有该 coin
        if coin not in exchange_coins:
            return None

        ex_pos = exchange_coins[coin]
        szi = float(ex_pos.get("szi", 0))
        if szi == 0:
            return None

        position = PairPosition(
            position_id=str(row["position_id"]),
            symbol=symbol,
            base_symbol=row["base_symbol"],
            direction=row["direction"],
            status=PositionStatus(row["status"]),
            pair_mode=row.get("pair_mode", "single"),

            # Alt 腿 (使用交易所实时数据)
            alt_side=row.get("alt_side", ""),
            alt_size=abs(szi),
            alt_entry_price=float(row.get("alt_entry_price", 0)),

            # Base 腿 (DB 数据)
            base_side=row.get("base_side", ""),
            base_size=float(row.get("base_size", 0)),
            base_entry_price=float(row.get("base_entry_price", 0)),

            # 🔑 关键: 信号快照字段 (完整恢复)
            entry_zscore_4h=float(row.get("entry_zscore_4h", 0) or 0),
            entry_adaptive_z=float(row.get("entry_adaptive_z", 0) or 0),
            entry_avg_zscore_4h=row.get("entry_avg_zscore_4h"),
            entry_signal_strength=row.get("entry_signal_strength", ""),

            # 盈亏数据 (交易所实时 + DB 历史)
            unrealized_pnl=float(ex_pos.get("unrealizedPnl", 0)),
            realized_pnl=float(row.get("realized_pnl", 0) or 0),
            peak_pnl_pct=float(row.get("peak_pnl_pct", 0) or 0),

            # 时间与关联
            open_time=(
                row.get("open_time").astimezone()
                if row.get("open_time") else None
            ),
            entry_signal_id=str(row.get("entry_signal_id", "")),
            network=row.get("network", "testnet"),
        )

        # Pair 模式: 刷新 base 腿实时大小
        if position.pair_mode == "pair" and position.base_symbol:
            base_coin = symbol_to_coin(position.base_symbol)
            if base_coin in exchange_coins:
                base_ex = exchange_coins[base_coin]
                base_szi = float(base_ex.get("szi", 0))
                if base_szi != 0:
                    position.base_size = abs(base_szi)

        return position

    except Exception as e:
        logger.error(
            f"从 DB 行加载仓位失败: {row.get('symbol', '?')} | {e}",
            exc_info=True,
        )
        return None

3.4 PositionManager — 主流程重构

文件: src/trading/position_manager.py (L996-L1144)

完整实现:

def _detect_and_adopt_orphans(
    self,
    exchange_coins: dict,
    source: str
) -> list[str]:
    """检测交易所存在但内存中没有的孤儿仓位,纳入管理

    改进:
        - 在判定孤儿前,先批量查询数据库验证
        - 区分"DB 恢复"(保留字段) 和 "真孤儿"(entry_adaptive_z=0)
        - 只对真孤儿发送告警通知
        - 智能配对: 只配对两个都是真孤儿的币种

    Args:
        exchange_coins: {coin: exchange_pos_dict}
        source: 'recover' / 'sync'

    Returns:
        收纳的孤儿仓位 symbol 列表
    """
    memory_coins = {symbol_to_coin(s) for s in self._positions}
    adopted_symbols: list[str] = []

    # Step 1: 收集所有内存未覆盖的 coin
    orphan_candidates = {}
    for coin, ex_pos in exchange_coins.items():
        if coin in memory_coins:
            continue
        szi = float(ex_pos.get("szi", 0))
        if szi == 0:
            continue
        orphan_candidates[coin] = ex_pos

    if not orphan_candidates:
        return []

    # Step 2: 🆕 批量查询数据库验证 (性能优化: O(1) vs O(N))
    symbols = [coin_to_symbol(coin) for coin in orphan_candidates.keys()]
    db_positions_map = self._repo.get_positions_by_symbols(
        symbols, network=self._config.network.value
    )

    # Step 3: 从 DB 恢复已存在的仓位
    for coin, ex_pos in list(orphan_candidates.items()):
        symbol = coin_to_symbol(coin)
        db_row = db_positions_map.get(symbol)

        if db_row:
            # ✅ DB 有记录 → 恢复到内存
            try:
                position = self._load_position_from_db_row(db_row, exchange_coins)
                if position:
                    self._positions[symbol] = position
                    adopted_symbols.append(symbol)
                    # 从候选列表移除
                    orphan_candidates.pop(coin)

                    # DB 恢复日志 (INFO 级别, 正常流程)
                    logger.info(
                        f"✅ 从数据库恢复仓位 ({source}): {symbol} | "
                        f"方向={position.direction} | 大小={position.alt_size} | "
                        f"entry_adaptive_z={position.entry_adaptive_z:.4f} | "
                        f"position_id: {position.position_id}"
                    )
                    # 🆕 DB 恢复不发送告警 (正常流程)

                    # Pair 模式: 同步标记 base_coin 已处理
                    if position.pair_mode == "pair" and position.base_symbol:
                        base_coin = symbol_to_coin(position.base_symbol)
                        memory_coins.add(coin)
                        memory_coins.add(base_coin)
                        orphan_candidates.pop(base_coin, None)

            except Exception as e:
                logger.error(
                    f"从 DB 恢复仓位失败: {symbol} | {e}",
                    exc_info=True
                )
                # 降级: 保留在 orphan_candidates 中,按孤儿处理

    # Step 4: 处理真孤儿 (DB 无记录)
    orphan_entries = orphan_candidates  # 剩余的都是真孤儿

    if not orphan_entries:
        return adopted_symbols

    # Fix #12: 智能配对 (只配对两个都是真孤儿的币种)
    from src.config import HYPE_ALT_SYMBOL, HYPE_BASE_SYMBOL
    alt_coin = symbol_to_coin(HYPE_ALT_SYMBOL)
    base_coin = symbol_to_coin(HYPE_BASE_SYMBOL)
    paired_coins = set()

    # 🆕 配对前验证: 两个币种都是真孤儿 (DB 无记录)
    if alt_coin in orphan_entries and base_coin in orphan_entries:
        alt_ex = orphan_entries[alt_coin]
        base_ex = orphan_entries[base_coin]
        alt_szi = float(alt_ex.get("szi", 0))
        base_szi = float(base_ex.get("szi", 0))

        # 方向相反才配对
        if (alt_szi > 0) != (base_szi > 0):
            direction = "long" if alt_szi > 0 else "short"
            alt_size = abs(alt_szi)
            base_size = abs(base_szi)
            alt_entry_price = float(alt_ex.get("entryPx", 0))
            base_entry_price = float(base_ex.get("entryPx", 0))

            position = PairPosition(
                symbol=HYPE_ALT_SYMBOL,
                base_symbol=HYPE_BASE_SYMBOL,
                direction=direction,
                status=PositionStatus.OPEN,
                pair_mode="pair",
                alt_side="buy" if direction == "long" else "sell",
                alt_size=alt_size,
                alt_entry_price=alt_entry_price,
                base_side="sell" if direction == "long" else "buy",
                base_size=base_size,
                base_entry_price=base_entry_price,
                # 🔑 孤儿标记
                entry_zscore_4h=0.0,
                entry_adaptive_z=0.0,
                open_time=datetime.now().astimezone(),
                network=self._config.network.value,
            )

            self._positions[position.symbol] = position
            self._repo.save_position(position)
            paired_coins.update({alt_coin, base_coin})
            adopted_symbols.append(position.symbol)

            logger.warning(
                f"🔗 孤儿配对收纳 ({source}): {position.symbol} + {position.base_symbol} | "
                f"pair_mode=pair | 方向={direction} | "
                f"alt_size={alt_size} | base_size={base_size}"
            )

            try:
                sender_colourful(
                    title=f"🔗 孤儿配对收纳: {position.symbol}",
                    content=(
                        f"**来源**: {source}\n"
                        f"**Alt 腿**: {position.symbol} ({position.direction})\n"
                        f"**Base 腿**: {position.base_symbol}\n"
                        f"**大小**: {alt_size} / {base_size}\n"
                        f"---\n"
                        f"⚠️ **数据库无记录**, 可能是外部手动创建\n"
                        f"**注意**: entry_adaptive_z=0, 均值回归退出已禁用\n"
                        f"**网络**: {self._config.network_label}"
                    ),
                )
            except (OSError, ValueError) as e:
                logger.error(f"❌ 孤儿配对告警发送异常: {e}")

    # Step 5: 剩余 single 孤儿收纳
    for coin, ex_pos in orphan_entries.items():
        if coin in paired_coins:
            continue

        szi = float(ex_pos.get("szi", 0))
        direction = "long" if szi > 0 else "short"
        size = abs(szi)
        entry_price = float(ex_pos.get("entryPx", 0))
        symbol = coin_to_symbol(coin)

        position = PairPosition(
            symbol=symbol,
            base_symbol="",
            direction=direction,
            status=PositionStatus.OPEN,
            pair_mode="single",
            alt_side="buy" if direction == "long" else "sell",
            alt_size=size,
            alt_entry_price=entry_price,
            # 🔑 孤儿标记
            entry_zscore_4h=0.0,
            entry_adaptive_z=0.0,
            open_time=datetime.now().astimezone(),
            network=self._config.network.value,
        )

        self._positions[symbol] = position
        self._repo.save_position(position)
        adopted_symbols.append(symbol)

        # 真孤儿日志 (WARNING 级别)
        logger.warning(
            f"🔗 真孤儿仓位收纳 ({source}): {symbol} | "
            f"方向={direction} | 大小={size} | "
            f"入场价=${entry_price:.4f} | "
            f"⚠️ 数据库无记录 | position_id: {position.position_id}"
        )

        # 🆕 真孤儿告警 (区分于 DB 恢复)
        try:
            sender_colourful(
                title=f"🚨 真孤儿仓位收纳: {symbol}",
                content=(
                    f"**来源**: {source}\n"
                    f"**币种**: {symbol}\n"
                    f"**方向**: {direction}\n"
                    f"**大小**: {size}\n"
                    f"**入场价**: ${entry_price:.4f}\n"
                    f"---\n"
                    f"⚠️ **数据库无记录**, 可能是外部手动创建的仓位\n"
                    f"**注意**: entry_adaptive_z=0, 均值回归退出已禁用\n"
                    f"**建议**: 请确认此仓位来源, 必要时手动平仓\n"
                    f"**网络**: {self._config.network_label}"
                ),
            )
        except (OSError, ValueError) as e:
            logger.error(f"❌ 真孤儿仓位告警发送异常: {e}")

    return adopted_symbols

3.5 PositionManager — 恢复流程对接

文件: src/trading/position_manager.py (L760-L810)

重构 recover_positions_from_db:

 def recover_positions_from_db(self) -> int:
     """从数据库恢复活跃仓位到内存"""
     recovered = 0
     db_rows = self._repo.get_open_positions(network=self._config.network.value)
     if not db_rows:
         logger.info("无活跃仓位需要恢复")
         return 0

     exchange_coins = self._executor.get_positions(force_refresh=True)

     with self._lock:
         for row in db_rows:
             symbol = row["symbol"]
             if symbol in self._positions:
                 continue

-            # 旧实现: 内联构建 PairPosition
-            coin = symbol_to_coin(symbol)
-            if coin not in exchange_coins:
-                logger.warning(f"幽灵仓位: DB 有但交易所无: {symbol}")
-                self._repo.update_position_status(
-                    str(row["position_id"]), PositionStatus.CLOSED
-                )
-                continue
-
-            position = PairPosition(...)
-            position.alt_size = abs(float(exchange_coins[coin].get("szi", ...)))
-            # ... 更多字段赋值

+            # 🆕 复用 _load_position_from_db_row
+            position = self._load_position_from_db_row(row, exchange_coins)
+            if not position:
+                # 幽灵仓位: DB 有但交易所无
+                logger.warning(f"幽灵仓位: DB 有但交易所无: {symbol}")
+                self._repo.update_position_status(
+                    str(row["position_id"]), PositionStatus.CLOSED
+                )
+                continue

             self._positions[symbol] = position
             recovered += 1

         logger.info(f"✅ 从数据库恢复 {recovered} 个活跃仓位")
         return recovered

四、性能分析

4.1 性能对比

指标 改进前 改进后 变化
DB 查询次数 0 1 (批量) +1 次
DB 查询耗时 0ms <5ms +5ms
总体耗时 ~50ms ~55ms +10%
同步周期 60s 60s 无影响

4.2 极端场景测试

场景: 检测到 10 个孤儿候选币种

  • 批量查询耗时: <10ms
  • 总体影响: 仍可接受 (<20% 开销)
  • 结论: 性能瓶颈不在 DB 查询

4.3 性能优化手段

批量查询 vs 逐个查询:

# ❌ 低效方案 (O(N))
for coin in orphan_candidates:
    symbol = coin_to_symbol(coin)
    db_pos = repo.get_position_by_symbol(symbol)  # N 次查询

# ✅ 高效方案 (O(1))
symbols = [coin_to_symbol(c) for c in orphan_candidates]
db_positions_map = repo.get_positions_by_symbols(symbols)  # 1 次查询

五、错误处理与降级策略

5.1 降级策略矩阵

场景 错误类型 降级方案 影响
DB 批量查询失败 网络/连接异常 返回空字典, 全部按孤儿处理 丢失字段, 但系统可用
DB 行加载失败 数据解析异常 创建孤儿仓位 单个仓位丢失字段
数据不一致 方向/大小冲突 日志记录, 继续使用交易所数据 仅日志告警

5.2 错误处理代码示例

DB 查询失败:

try:
    rows = self._db.execute_query(sql, tuple(params)) or []
    return {row["symbol"]: row for row in rows}
except Exception as e:
    logger.error(f"批量查询仓位失败: {e}", exc_info=True)
    return {}  # 降级: 保证系统可用性 > 精确性

DB 恢复失败:

try:
    position = self._load_position_from_db_row(db_row, exchange_coins)
except Exception as e:
    logger.error(f"从 DB 恢复仓位失败: {symbol} | {e}", exc_info=True)
    # 降级为孤儿收纳
    orphan_candidates.pop(coin, None)  # 不移除, 后续按孤儿处理

六、兼容性保证

6.1 向后兼容性

场景 行为 影响
已收纳的老孤儿仓位 DB 中 entry_adaptive_z=0, 下次恢复时从 DB 读取, 保持 entry_adaptive_z=0 ✅ 行为不变
新孤儿仓位 自动走新流程 (DB 查询 → 恢复/收纳) ✅ 改进生效
正常仓位 从 DB 恢复, 保留完整字段 ✅ 核心收益

6.2 配对逻辑兼容性 (Fix #12)

配对判定矩阵:

Alt DB Base DB 行为
❌ 无 ❌ 无 ✅ 配对为 pair 仓位
✅ 有 ❌ 无 ✅ Alt 从 DB 恢复, Base 孤儿收纳 (两个独立 single 仓位)
❌ 无 ✅ 有 ✅ Base 从 DB 恢复, Alt 孤儿收纳 (两个独立 single 仓位)
✅ 有 ✅ 有 ✅ 两个都从 DB 恢复 (两个独立 single 仓位)

七、测试验证

7.1 单元测试场景

测试文件: tests/test_orphan_position_final.py

用例 场景 预期
test_orphan_db_recovery 内存无 + DB 有活跃仓位 从 DB 恢复, 不创建新仓位
test_orphan_true_orphan 内存无 + DB 无 创建新 PairPosition (entry_adaptive_z=0)
test_orphan_memory_hit 内存有 不查 DB, 直接跳过
test_orphan_pairing_both_orphans PURR (DB 无) + HYPE (DB 无), 方向相反 配对为 pair 仓位
test_orphan_pairing_one_in_db PURR (DB 有) + HYPE (DB 无) 不配对, 各自处理
test_batch_query_performance 批量查询 10 个 symbol 耗时 <10ms
test_db_query_failure DB 查询抛异常 降级为孤儿收纳

7.2 集成测试场景

场景 1: 模拟重启恢复

def test_restart_recovery():
    # 1. 准备 DB 数据
    insert_test_position(
        symbol="PURR/USDC:USDC",
        entry_adaptive_z=2.5,
        status="open"
    )

    # 2. 清空内存 (模拟重启)
    manager._positions.clear()

    # 3. 执行恢复
    manager.recover_positions_from_db()

    # 4. 验证: 从 DB 恢复, 保留完整字段
    assert manager._positions["PURR/USDC:USDC"].entry_adaptive_z == 2.5

场景 2: 模拟孤儿检测

def test_sync_orphan_detection():
    # 1. 交易所有仓位, 内存无, DB 无
    exchange_coins = {"PURR": {"szi": 100, "entryPx": 0.5}}
    manager._positions.clear()

    # 2. 执行同步
    manager.sync_with_exchange()

    # 3. 验证: 创建孤儿仓位
    assert "PURR/USDC:USDC" in manager._positions
    assert manager._positions["PURR/USDC:USDC"].entry_adaptive_z == 0.0

八、实施计划

Phase 1: 核心实现 (4-6 小时)

  • [ ] trade_repository.py: 添加 get_positions_by_symbols() 方法
  • [ ] protocols.py: 同步 TradeRepositoryProtocol 接口
  • [ ] position_manager.py: 添加 _load_position_from_db_row() 辅助方法
  • [ ] position_manager.py: 重构 _detect_and_adopt_orphans() 主流程
  • [ ] position_manager.py: 重构 recover_positions_from_db() 使用辅助方法
  • [ ] 代码审查与优化

Phase 2: 测试验证 (4-6 小时)

  • [ ] 编写单元测试 (7 个核心场景)
  • [ ] 编写集成测试 (重启恢复 + 孤儿检测)
  • [ ] 手动测试验证 (测试网环境)
  • [ ] 性能测试 (批量查询耗时)

Phase 3: 文档与监控 (2-3 小时)

  • [ ] 更新 docs/DATA_FLOW.md (孤儿检测流程)
  • [ ] 更新代码注释和文档
  • [ ] 添加性能监控指标
  • [ ] 配置告警阈值

九、关键收益总结

9.1 核心收益

收益 说明 影响
✅ 避免误判 数据库有记录的正常仓位不再被当作孤儿 减少 false positive
✅ 保留字段 entry_adaptive_z, entry_zscore_4h 完整恢复 均值回归策略正常工作
✅ 精准告警 DB 恢复不告警, 真孤儿才告警 减少告警噪音 50%+
✅ 性能可控 批量查询优化, 总体性能影响 <10% 对系统影响最小
✅ 向后兼容 无需数据迁移和配置变更 零停机升级
✅ 智能配对 只配对真孤儿, 避免混淆 逻辑更清晰

9.2 业务价值

均值回归策略恢复:

  • 现状: 孤儿仓位 entry_adaptive_z=0 → 均值回归退出失效
  • 改进后: 从 DB 恢复 entry_adaptive_z → 策略正常工作
  • 影响: 提升退出策略准确性, 减少亏损扩大风险

告警噪音减少:

  • 现状: 每次恢复都可能触发孤儿告警 (正常流程)
  • 改进后: 仅真孤儿才告警 (异常流程)
  • 影响: 运维效率提升, 关注真正的异常情况

数据完整性:

  • 现状: 多个关键字段丢失 (entry_signal_id, peak_pnl_pct 等)
  • 改进后: 完整恢复所有信号快照字段
  • 影响: 信号追溯、移动止损等功能正常工作

十、涉及文件清单

文件 变更说明
src/trading/trade_repository.py 新增 get_positions_by_symbols() 批量查询方法
src/trading/protocols.py 同步 TradeRepositoryProtocol 接口定义
src/trading/position_manager.py 新增 _load_position_from_db_row() 辅助方法;
重构 _detect_and_adopt_orphans();
重构 recover_positions_from_db()
docs/DATA_FLOW.md 更新孤儿仓位检测流程描述
tests/test_orphan_position_final.py 新增单元测试和集成测试

附录: 关键流程时序图

A.1 sync_with_exchange 完整流程

sequenceDiagram
    participant Orch as Orchestrator<br/>(60s 定时)
    participant PM as PositionManager
    participant Exec as Executor
    participant DB as TradeRepository
    participant Mem as _positions

    Orch->>PM: sync_with_exchange()
    PM->>Exec: get_positions(force_refresh=True)
    Exec-->>PM: exchange_coins

    loop 每个 exchange coin
        PM->>Mem: coin in memory_coins?
        alt 内存有
            PM->>PM: 同步 size/pnl
        else 内存无
            Note over PM: 🆕 批量查询 DB
            PM->>DB: get_positions_by_symbols([symbols])
            DB-->>PM: {symbol: db_row}
            alt DB 有
                PM->>PM: _load_position_from_db_row()
                PM->>Mem: _positions[symbol] = position
                Note over PM: ✅ 从 DB 恢复, 非孤儿
            else DB 也无
                Note over PM: 确认为真孤儿
                PM->>PM: 创建新 PairPosition
                PM->>DB: save_position()
                PM->>Mem: _positions[symbol] = position
            end
        end
    end

文档版本: v1.0 Final
创建日期: 2026-02-15
状态: 待实施
综合来源: DESIGN_ORPHAN_POSITION_1.md + DESIGN_ORPHAN_POSITION_2.md + DESIGN_ORPHAN_POSITION_3.md

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG