孤儿仓位管理重构实施计划

孤儿仓位管理重构实施计划

Context(背景)

问题诊断

当前系统存在孤儿仓位判定逻辑缺陷:

现有流程:交易所有 + 内存无 → 直接判定为孤儿 → 创建新 PairPosition

核心问题

  • ❌ 跳过数据库查询验证步骤
  • ❌ "数据库有记录但内存未加载"的正常仓位被误判为孤儿
  • ❌ 导致重复创建 position_id、丢失 entry_adaptive_z 等关键字段
  • ❌ 均值回归退出策略失效

触发场景

  • 内存被异常清理(OOM 后部分恢复)
  • recover_positions_from_db 未完整执行(DB 连接中断)
  • sync_with_exchange 先于恢复流程运行(线程竞争)

修复目标

实施综合设计文档 docs/DESIGN_ORPHAN_POSITION_FINAL.md 中的优化方案:

新定义:孤儿仓位 = 交易所有 + 内存无 + 数据库也无

核心改进

  1. 批量查询优化:使用 SQL ANY(%s) 批量查询,O(1) vs O(N)
  2. 辅助方法复用_load_position_from_db_row 供恢复和孤儿检测共用
  3. 智能配对:只配对两个都是真孤儿的币种
  4. 告警区分:DB 恢复 → INFO 日志不告警,真孤儿 → WARNING 日志 + 飞书告警
  5. 完全重构:彻底删除旧代码,无死代码残留

Implementation Plan(实施方案)

1. TradeRepository 层改造

文件src/trading/trade_repository.py

新增方法get_positions_by_symbols

def get_positions_by_symbols(
    self,
    symbols: list[str],
    network: str = None
) -> dict[str, dict]:
    """批量查询活跃仓位(性能优化版)

    Args:
        symbols: 交易对列表 ["PURR/USDC:USDC", "HYPE/USDC:USDC"]
        network: 网络标识(testnet/mainnet)

    Returns:
        {symbol: position_dict} - 仅包含数据库中存在的记录

    性能特性:
        - SQL: symbol = ANY(%s)(批量查询)
        - 索引: idx_pair_positions_symbol_status
        - 时间复杂度: O(1) vs 逐个查询 O(N)
        - 预期耗时: <5ms(查询 3-10 个 symbol)
    """
    if not symbols:
        return {}

    sql = """
        SELECT * FROM pair_positions
        WHERE symbol = ANY(%s)
          AND status IN ('open', 'opening', 'closing')
    """
    params = [symbols]

    if network:
        sql += " AND network = %s"
        params.append(network)

    try:
        rows = self._db.execute_query(sql, tuple(params)) or []
        return {row["symbol"]: row for row in rows}
    except Exception as e:
        logger.error(f"批量查询仓位失败: {e}", exc_info=True)
        return {}  # 降级:返回空字典

接口同步src/trading/protocols.py

TradeRepositoryProtocol 类中添加方法签名(约 L68 后):

def get_positions_by_symbols(
    self, symbols: list[str], network: str = None
) -> dict[str, dict]: ...

2. PositionManager 辅助方法

文件src/trading/position_manager.py

新增方法_load_position_from_db_row(约 L722 后,recover_positions_from_db 之前)

def _load_position_from_db_row(
    self,
    row: dict,
    exchange_coins: dict[str, dict],
) -> PairPosition | None:
    """从 DB 行 + 交易所快照构造并刷新 PairPosition

    复用场景:
        1. recover_positions_from_db() - 启动恢复
        2. _detect_and_adopt_orphans() - 孤儿检测中 DB 命中

    Args:
        row: 数据库查询结果行
        exchange_coins: {coin: exchange_pos_dict}

    Returns:
        刷新后的 PairPosition 对象,失败返回 None

    数据来源策略:
        - DB 优先: entry_adaptive_z, entry_zscore_4h(信号快照)
        - 交易所优先: alt_size, unrealized_pnl(实时数据)
        - 混合: base_size(交易所实时 + DB 兜底)
    """
    try:
        symbol = row["symbol"]
        coin = symbol_to_coin(symbol)

        # 验证交易所是否有该 coin
        if coin not in exchange_coins:
            return None

        ex_pos = exchange_coins[coin]
        szi = float(ex_pos.get("szi", 0))
        if szi == 0:
            return None

        position = PairPosition(
            position_id=str(row["position_id"]),
            symbol=symbol,
            base_symbol=row["base_symbol"],
            direction=row["direction"],
            status=PositionStatus(row["status"]),
            pair_mode=row.get("pair_mode", "single"),

            # Alt 腿(使用交易所实时数据)
            alt_side=row.get("alt_side", ""),
            alt_size=abs(szi),
            alt_entry_price=float(row.get("alt_entry_price", 0)),

            # Base 腿(DB 数据)
            base_side=row.get("base_side", ""),
            base_size=float(row.get("base_size", 0)),
            base_entry_price=float(row.get("base_entry_price", 0)),

            # 🔑 关键:信号快照字段(完整恢复)
            entry_zscore_4h=float(row.get("entry_zscore_4h", 0) or 0),
            entry_adaptive_z=float(row.get("entry_adaptive_z", 0) or 0),
            entry_avg_zscore_4h=row.get("entry_avg_zscore_4h"),
            entry_signal_strength=row.get("entry_signal_strength", ""),

            # 盈亏数据(交易所实时 + DB 历史)
            unrealized_pnl=float(ex_pos.get("unrealizedPnl", 0)),
            realized_pnl=float(row.get("realized_pnl", 0) or 0),
            peak_pnl_pct=float(row.get("peak_pnl_pct", 0) or 0),

            # 时间与关联
            open_time=(
                row.get("open_time").astimezone()
                if row.get("open_time") else None
            ),
            entry_signal_id=str(row.get("entry_signal_id", "")),
            network=row.get("network", "testnet"),
        )

        # Pair 模式:刷新 base 腿实时大小
        if position.pair_mode == "pair" and position.base_symbol:
            base_coin = symbol_to_coin(position.base_symbol)
            if base_coin in exchange_coins:
                base_ex = exchange_coins[base_coin]
                base_szi = float(base_ex.get("szi", 0))
                if base_szi != 0:
                    position.base_size = abs(base_szi)

        return position

    except Exception as e:
        logger.error(
            f"从 DB 行加载仓位失败: {row.get('symbol', '?')} | {e}",
            exc_info=True,
        )
        return None

3. _detect_and_adopt_orphans 完全重写

文件src/trading/position_manager.py

删除:L996-L1144(148 行旧代码)

重写

def _detect_and_adopt_orphans(
    self,
    exchange_coins: dict,
    source: str
) -> list[str]:
    """检测交易所存在但内存中没有的孤儿仓位,纳入管理

    改进:
        - 在判定孤儿前,先批量查询数据库验证
        - 区分"DB 恢复"(保留字段)和"真孤儿"(entry_adaptive_z=0)
        - 只对真孤儿发送告警通知
        - 智能配对:只配对两个都是真孤儿的币种

    Args:
        exchange_coins: {coin: exchange_pos_dict}
        source: 'recover' / 'sync'

    Returns:
        收纳的孤儿仓位 symbol 列表
    """
    memory_coins = {symbol_to_coin(s) for s in self._positions}
    adopted_symbols: list[str] = []

    # Step 1: 收集所有内存未覆盖的 coin
    orphan_candidates = {}
    for coin, ex_pos in exchange_coins.items():
        if coin in memory_coins:
            continue
        szi = float(ex_pos.get("szi", 0))
        if szi == 0:
            continue
        orphan_candidates[coin] = ex_pos

    if not orphan_candidates:
        return []

    # Step 2: 批量查询数据库验证(性能优化:O(1) vs O(N))
    symbols = [coin_to_symbol(coin) for coin in orphan_candidates.keys()]
    db_positions_map = self._repo.get_positions_by_symbols(
        symbols, network=self._config.network.value
    )

    # Step 3: 从 DB 恢复已存在的仓位
    for coin, ex_pos in list(orphan_candidates.items()):
        symbol = coin_to_symbol(coin)
        db_row = db_positions_map.get(symbol)

        if db_row:
            # ✅ DB 有记录 → 恢复到内存
            try:
                position = self._load_position_from_db_row(db_row, exchange_coins)
                if position:
                    self._positions[symbol] = position
                    adopted_symbols.append(symbol)
                    # 从候选列表移除
                    orphan_candidates.pop(coin)

                    # DB 恢复日志(INFO 级别,正常流程)
                    logger.info(
                        f"✅ 从数据库恢复仓位 ({source}): {symbol} | "
                        f"方向={position.direction} | 大小={position.alt_size} | "
                        f"entry_adaptive_z={position.entry_adaptive_z:.4f} | "
                        f"position_id: {position.position_id}"
                    )
                    # DB 恢复不发送告警(正常流程)

                    # Pair 模式:同步标记 base_coin 已处理
                    if position.pair_mode == "pair" and position.base_symbol:
                        base_coin = symbol_to_coin(position.base_symbol)
                        memory_coins.add(coin)
                        memory_coins.add(base_coin)
                        orphan_candidates.pop(base_coin, None)

            except Exception as e:
                logger.error(
                    f"从 DB 恢复仓位失败: {symbol} | {e}",
                    exc_info=True
                )
                # 降级:保留在 orphan_candidates 中,按孤儿处理

    # Step 4: 处理真孤儿(DB 无记录)
    orphan_entries = orphan_candidates  # 剩余的都是真孤儿

    if not orphan_entries:
        return adopted_symbols

    # Fix #12: 智能配对(只配对两个都是真孤儿的币种)
    from src.config import HYPE_ALT_SYMBOL, HYPE_BASE_SYMBOL
    alt_coin = symbol_to_coin(HYPE_ALT_SYMBOL)
    base_coin = symbol_to_coin(HYPE_BASE_SYMBOL)
    paired_coins = set()

    # 配对前验证:两个币种都是真孤儿(DB 无记录)
    if alt_coin in orphan_entries and base_coin in orphan_entries:
        alt_ex = orphan_entries[alt_coin]
        base_ex = orphan_entries[base_coin]
        alt_szi = float(alt_ex.get("szi", 0))
        base_szi = float(base_ex.get("szi", 0))

        # 方向相反才配对
        if (alt_szi > 0) != (base_szi > 0):
            direction = "long" if alt_szi > 0 else "short"
            alt_size = abs(alt_szi)
            base_size = abs(base_szi)
            alt_entry_price = float(alt_ex.get("entryPx", 0))
            base_entry_price = float(base_ex.get("entryPx", 0))

            position = PairPosition(
                symbol=HYPE_ALT_SYMBOL,
                base_symbol=HYPE_BASE_SYMBOL,
                direction=direction,
                status=PositionStatus.OPEN,
                pair_mode="pair",
                alt_side="buy" if direction == "long" else "sell",
                alt_size=alt_size,
                alt_entry_price=alt_entry_price,
                base_side="sell" if direction == "long" else "buy",
                base_size=base_size,
                base_entry_price=base_entry_price,
                # 孤儿标记
                entry_zscore_4h=0.0,
                entry_adaptive_z=0.0,
                open_time=datetime.now().astimezone(),
                network=self._config.network.value,
            )

            self._positions[position.symbol] = position
            self._repo.save_position(position)
            paired_coins.update({alt_coin, base_coin})
            adopted_symbols.append(position.symbol)

            logger.warning(
                f"🔗 孤儿配对收纳 ({source}): {position.symbol} + {position.base_symbol} | "
                f"pair_mode=pair | 方向={direction} | "
                f"alt_size={alt_size} | base_size={base_size}"
            )

            try:
                sender_colourful(
                    title=f"🔗 孤儿配对收纳: {position.symbol}",
                    content=(
                        f"**来源**: {source}\n"
                        f"**Alt 腿**: {position.symbol} ({position.direction})\n"
                        f"**Base 腿**: {position.base_symbol}\n"
                        f"**大小**: {alt_size} / {base_size}\n"
                        f"---\n"
                        f"⚠️ **数据库无记录**,可能是外部手动创建\n"
                        f"**注意**: entry_adaptive_z=0,均值回归退出已禁用\n"
                        f"**网络**: {self._config.network_label}"
                    ),
                )
            except (OSError, ValueError) as e:
                logger.error(f"❌ 孤儿配对告警发送异常: {e}")

    # Step 5: 剩余 single 孤儿收纳
    for coin, ex_pos in orphan_entries.items():
        if coin in paired_coins:
            continue

        szi = float(ex_pos.get("szi", 0))
        direction = "long" if szi > 0 else "short"
        size = abs(szi)
        entry_price = float(ex_pos.get("entryPx", 0))
        symbol = coin_to_symbol(coin)

        position = PairPosition(
            symbol=symbol,
            base_symbol="",
            direction=direction,
            status=PositionStatus.OPEN,
            pair_mode="single",
            alt_side="buy" if direction == "long" else "sell",
            alt_size=size,
            alt_entry_price=entry_price,
            # 孤儿标记
            entry_zscore_4h=0.0,
            entry_adaptive_z=0.0,
            open_time=datetime.now().astimezone(),
            network=self._config.network.value,
        )

        self._positions[symbol] = position
        self._repo.save_position(position)
        adopted_symbols.append(symbol)

        # 真孤儿日志(WARNING 级别)
        logger.warning(
            f"🔗 真孤儿仓位收纳 ({source}): {symbol} | "
            f"方向={direction} | 大小={size} | "
            f"入场价=${entry_price:.4f} | "
            f"⚠️ 数据库无记录 | position_id: {position.position_id}"
        )

        # 真孤儿告警(区分于 DB 恢复)
        try:
            sender_colourful(
                title=f"🚨 真孤儿仓位收纳: {symbol}",
                content=(
                    f"**来源**: {source}\n"
                    f"**币种**: {symbol}\n"
                    f"**方向**: {direction}\n"
                    f"**大小**: {size}\n"
                    f"**入场价**: ${entry_price:.4f}\n"
                    f"---\n"
                    f"⚠️ **数据库无记录**,可能是外部手动创建的仓位\n"
                    f"**注意**: entry_adaptive_z=0,均值回归退出已禁用\n"
                    f"**建议**: 请确认此仓位来源,必要时手动平仓\n"
                    f"**网络**: {self._config.network_label}"
                ),
            )
        except (OSError, ValueError) as e:
            logger.error(f"❌ 真孤儿仓位告警发送异常: {e}")

    return adopted_symbols

4. recover_positions_from_db 重构

文件src/trading/position_manager.py

删除:L772-L792(内联 PairPosition 构建代码)

替换为

# 原 L772-L792 删除,替换为:
position = self._load_position_from_db_row(row, exchange_coins)
if not position:
    # 幽灵仓位:DB 有但交易所无
    logger.warning(f"幽灵仓位:DB 有但交易所无: {symbol}")
    self._repo.update_position_status(
        str(row["position_id"]), PositionStatus.CLOSED
    )
    continue

# 原 L806-L807 保持不变(peak_pnl_pct 恢复)
position.peak_pnl_pct = float(row.get("peak_pnl_pct", 0) or 0)

self._positions[symbol] = position
recovered += 1

Critical Files(关键文件)

文件路径 修改类型 说明
src/trading/trade_repository.py 新增方法 添加 get_positions_by_symbols 批量查询方法
src/trading/protocols.py 接口同步 TradeRepositoryProtocol 添加方法签名
src/trading/position_manager.py 新增+重写 1) 新增 _load_position_from_db_row 辅助方法
2) 完全重写 _detect_and_adopt_orphans (L996-L1144)
3) 重构 recover_positions_from_db (删除 L772-L792)

Verification(验证方案)

1. 单元测试

创建 tests/test_orphan_position_refactored.py

def test_db_exists_position():
    """DB 已存在仓位不应被判定为孤儿"""
    # 断言:PURR 不在 adopted 列表中
    # 断言:self._positions 中存在 PURR 记录
    # 断言:entry_adaptive_z 正确恢复

def test_true_orphan_pair():
    """双孤儿智能配对"""
    # 断言:创建 pair_mode="pair" 仓位
    # 断言:发送飞书告警

def test_true_orphan_single():
    """单币孤儿收纳"""
    # 断言:创建 pair_mode="single" 仓位

def test_db_query_failure():
    """DB 查询失败降级"""
    # 断言:返回 {},所有候选按孤儿处理

def test_batch_query_performance():
    """批量查询性能"""
    # 断言:SQL 执行 1 次(ANY 语法)

2. 集成测试

场景 验证点 预期结果
正常启动 无孤儿 日志无 WARNING
DB 恢复 交易所有 + DB 有 INFO 日志,无告警
真孤儿配对 交易所有 + DB 无(双币) WARNING 日志 + 飞书告警
真孤儿单币 交易所有 + DB 无(单币) WARNING 日志 + 飞书告警

3. 性能验证

  • 批量查询 10 个 symbol:<5ms
  • 批量查询 100 个 symbol:<10ms

4. 代码审查检查清单

  • [ ] 确认 L996-L1144(148 行)已完全删除
  • [ ] 确认 L772-L792(21 行)已完全删除
  • [ ] 确认无注释掉的旧代码
  • [ ] 确认 _load_position_from_db_row 在两处被调用
  • [ ] 确认所有辅助方法被实际使用

Implementation Steps(实施步骤)

Phase 1: TradeRepository 层

  1. src/trading/trade_repository.py 添加 get_positions_by_symbols 方法
  2. src/trading/protocols.py 同步接口定义
  3. 编写单元测试验证批量查询

Phase 2: PositionManager 层

  1. src/trading/position_manager.py 约 L722 后添加 _load_position_from_db_row 方法
  2. 完全删除 L996-L1144(_detect_and_adopt_orphans 旧实现)
  3. 在原位置重写 _detect_and_adopt_orphans 方法
  4. 修改 recover_positions_from_db 中的 L772-L792,改为调用辅助方法

Phase 3: 测试验证

  1. 运行单元测试,确保覆盖率 >90%
  2. 手动测试:正常启动、DB 恢复、真孤儿场景
  3. 性能测试:批量查询耗时验证

Phase 4: 代码审查

  1. 检查无死代码残留
  2. 检查所有辅助方法被调用
  3. 检查日志级别正确(INFO vs WARNING)

Expected Outcomes(预期结果)

功能改进

  • ✅ 避免误判:DB 已存在仓位不再被当作孤儿
  • ✅ 保留字段:entry_adaptive_z 完整恢复 → 均值回归策略正常工作
  • ✅ 精准告警:DB 恢复不告警,真孤儿才告警 → 告警噪音减少 50%+

性能优化

  • ✅ 批量查询 <5ms(10 个 symbol)
  • ✅ 总体性能影响 <10%

代码质量

  • ✅ 删除 169 行旧代码(148 + 21)
  • ✅ 无死代码、无冗余逻辑
  • ✅ 辅助方法复用,职责清晰

向后兼容

  • ✅ 无需数据迁移
  • ✅ 保持 Fix #12 配对逻辑
  • ✅ 保持告警格式和模式

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG