孤儿仓位设计方案1

孤儿仓位收纳优化:内存缺失时先查数据库

一、目标与背景

目标:将「孤儿仓位」定义为仅在交易所存在、且内存与数据库都不存在的仓位;当「内存无」时先查 DB,有则从 DB 恢复进内存,无再按现逻辑新建收纳。

背景:当前 _detect_and_adopt_orphans 只判断「交易所有且内存无」,未区分「DB 有 / DB 无」,可能导致同一交易所仓位被重复建仓、丢失 DB 中的 entry_z/时间等。


二、现状流程(简要)

  • 入口recover_positions_from_db()sync_with_exchange() 内调用 _detect_and_adopt_orphans(exchange_coins, source),调用方已持 self._lock
  • 当前逻辑
    1. memory_coins = {symbol_to_coin(s) for s in self._positions}
    2. exchange_coins 中每个 coin,若 coin not in memory_coinsszi != 0 → 加入 orphan_entries
    3. 先做 Fix #12 配对(alt+base 方向相反则合并为 pair),再对剩余按 single 新建 PairPositionsave_position
  • 缺失:未在步骤 2 之前/之中区分「DB 有该仓位」的情况,一律当新孤儿处理。

三、优化后整体流程

flowchart TB
  subgraph input [输入]
    EX[exchange_coins]
    MEM[memory_coins]
  end
  subgraph phase1 [阶段1: 内存未覆盖的 coin]
    A[对每个 coin in exchange 且 not in memory]
  end
  subgraph phase2 [阶段2: 先查 DB]
    DB[get_open_positions]
    INDEX[按 symbol 建索引 db_by_symbol]
    CHECK{DB 中存在该 symbol 或 pair?}
  end
  subgraph phase3 [阶段3: 分支处理]
    LOAD[从 DB 行构建 PairPosition 并刷新 size]
    ADD_MEM[写入 self._positions 并更新 memory_coins]
    ORPHAN[加入 orphan_entries]
  end
  subgraph phase4 [阶段4: 现有孤儿逻辑]
    PAIR[Fix12 配对 alt+base]
    SINGLE[剩余按 single 新建收纳]
  end
  EX --> phase1
  MEM --> phase1
  A --> DB
  DB --> INDEX
  INDEX --> CHECK
  CHECK -->|有| LOAD
  CHECK -->|无| ORPHAN
  LOAD --> ADD_MEM
  ADD_MEM --> adopted_symbols
  ORPHAN --> phase4
  PAIR --> adopted_symbols
  SINGLE --> adopted_symbols

四、详细实现规格

4.1 辅助方法:_load_position_from_db_row

目的:从 DB 行 + 交易所快照构造并刷新 PairPosition,供「恢复」与「孤儿检测中 DB 命中」两处复用。

位置src/trading/position_manager.py,放在 recover_positions_from_db_detect_and_adopt_orphans 之间(例如约 722 行后)。

签名

def _load_position_from_db_row(
    self,
    row: dict,
    exchange_coins: dict[str, dict],
) -> PairPosition | None

语义

  • row 的字段构造 PairPosition(与当前 recover_positions_from_db 内 772–791 行一致),包含:
    • position_id, symbol, base_symbol, direction, status, pair_mode
    • alt_side, alt_size, alt_entry_pricebase_side, base_size, base_entry_price
    • entry_zscore_4h, entry_adaptive_z, entry_avg_zscore_4h, entry_signal_strength
    • open_time, entry_signal_id, network
  • exchange_coins 刷新:
    • position.alt_size = abs(float(exchange_coins[coin].get("szi", position.alt_size)))
    • pair_mode == "pair"base_symbol 非空,且 base_coin in exchange_coins,则刷新 position.base_size
  • 在此方法内写 self._positions 或调用 save_position,仅返回实例;调用方负责写入内存与是否落库。
  • symbol_to_coin(row["symbol"]) 不在 exchange_coins 中,返回 None(调用方在调用前应保证 coin 在 exchange 且 szi≠0)。

与恢复流程的对接recover_positions_from_db 中「由 row + exchange_coins 构建并刷新 size」的那段改为调用 position = self._load_position_from_db_row(row, exchange_coins),若返回非 None 则设置 position.peak_pnl_pct 后写入 self._positionsrecovered += 1;若返回 None 则保持当前「幽灵仓位」分支逻辑不变。

4.2 DB 索引结构(在 _detect_and_adopt_orphans 内)

  • 调用一次:db_rows = self._repo.get_open_positions(network=self._config.network.value) or []
  • 建立索引:
    • db_by_symbol: dict[str, dict] = {row["symbol"]: row for row in db_rows}
    • 用于:给定 symbol(如 coin_to_symbol(coin)),快速判断 DB 是否有该 symbol 的开放仓位。
    • 对于 pair 仓位:DB 中一行即代表 alt+base,row["symbol"] 为 alt symbol,row.get("base_symbol") 为 base symbol;用 symbol_to_coin(row["symbol"])symbol_to_coin(row.get("base_symbol","")) 可得到该行覆盖的 coins。
  • 去重:若同一 symbol 有多行,db_by_symbol 保留最后一次即可;或约定只取第一个开放记录(与当前恢复逻辑一致)。

4.3 _detect_and_adopt_orphans 逐步逻辑

前置:调用方已持 self._lockexchange_coins 的 key 为 coin(如 "PURR"),value 为交易所返回的 position 字典(含 szientryPx 等)。

步骤 1 — 内存与 DB 数据

  • memory_coins = {symbol_to_coin(s) for s in self._positions}
  • db_rows = self._repo.get_open_positions(network=self._config.network.value) or []
  • db_by_symbol = {row["symbol"]: row for row in db_rows}
  • adopted_symbols: list[str] = []

步骤 2 — 「内存无」的 coin 先尝试从 DB 加载

  • 对每个 coin, ex_pos in exchange_coins.items()
    • coin in memory_coins:跳过。
    • float(ex_pos.get("szi", 0)) == 0:跳过。
    • symbol = coin_to_symbol(coin)
    • symbol not in db_by_symbol:该 coin 不在此步处理,留给后面孤儿逻辑;继续下一个 coin。
    • symbol in db_by_symbol
      • row = db_by_symbol[symbol]
      • 若该 row 对应 pair 仓位(row.get("base_symbol") 非空),则检查 base 腿是否在交易所:base_coin = symbol_to_coin(row["base_symbol"]),若 base_coin not in exchange_coins 或 base 的 szi 为 0,则从 DB 加载(交给后续孤儿逻辑或配对逻辑处理);若 base 在 exchange 且 szi≠0,则继续。
      • position = self._load_position_from_db_row(row, exchange_coins);若为 None 则跳过。
      • position.peak_pnl_pct = float(row.get("peak_pnl_pct", 0) or 0)
      • self._positions[position.symbol] = position
      • memory_coins.add(coin);若为 pair,则 memory_coins.add(base_coin)
      • adopted_symbols.append(position.symbol)
      • 可选:从 db_by_symbol 中移除该 symbol,避免同一行被另一个 coin 再次加载。

步骤 3 — 构建真正的孤儿集合

  • orphan_entries = {}
  • 对每个 coin, ex_pos in exchange_coins.items()
    • coin in memory_coins:跳过。
    • float(ex_pos.get("szi", 0)) == 0:跳过。
    • orphan_entries[coin] = ex_pos
  • not orphan_entriesreturn adopted_symbols

步骤 4 — 现有 Fix #12 配对 + single 收纳

  • 保持与现有实现一致:先检查 alt+base 配对且方向相反则合并为 pair 并 save_position,再对 orphan_entries 中未配对的按 single 新建并 save_position
  • 所有新建或配对的 symbol 均加入 adopted_symbols
  • 最后 return adopted_symbols

4.4 恢复流程对 _load_position_from_db_row 的用法

recover_positions_from_dbwith self._lock 循环内:

  • 对每个 row,若 symbol in self._positions:跳过。
  • coin = symbol_to_coin(symbol);若 coin not in exchange_coins:按当前逻辑标记幽灵并 update_position_status(CLOSED),跳过。
  • position = self._load_position_from_db_row(row, exchange_coins);若为 None 则跳过。
  • position.peak_pnl_pct = float(row.get("peak_pnl_pct", 0) or 0)
  • self._positions[symbol] = positionrecovered += 1
  • 删除原先内联的 PairPosition 构造与 size 刷新代码块。

五、边界与一致性

  • 同一 DB 行只加载一次:在步骤 2 中每个 symbol 至多命中一次 db_by_symbol,只加载一次并加入 memory_coins,后续步骤 3 不会再把该 coin 放入 orphan_entries
  • pair 行只通过 alt symbol 命中:DB 中 pair 仓位以 alt 的 symbol 为键;处理该 row 时同时把 alt_coin 与 base_coin 加入 memory_coins,避免 base 腿再被当作孤儿。
  • base 腿在 DB 有、交易所无:步骤 2 中若发现 row 为 pair 但 base_coin not in exchange_coins 或 base 的 szi 为 0,则不从 DB 加载该 row,该 coin 会进入 orphan_entries;若仅 alt 在 exchange,则按 single 孤儿收纳,与当前「base 腿消失」的语义兼容。
  • 并发_detect_and_adopt_orphans 仍在调用方已持 self._lock 下执行,DB 查询在锁内,不引入新竞态。

六、日志与告警区分(可选)

  • 从 DB 加载到内存(步骤 2):可打 info,如 从 DB 恢复至内存 ({source}): {symbol} | position_id: {id},便于与「真孤儿」区分;不必再发「孤儿收纳」的告警。
  • 真孤儿(步骤 4 新建/配对):保持现有 logger.warningsender_colourful「孤儿仓位收纳/配对收纳」的文案与告警,可酌情在文案中注明「交易所与系统内存、数据库均无记录,已新建纳入管理」。

七、测试要点

  • DB 有、内存无:构造 DB 中存在某开放仓位(single 或 pair),且交易所 mock 返回对应 coin(s) 有持仓,内存无该 symbol;执行恢复或同步后,应从 DB 加载到 _positions,且不产生新 position_id、不重复 save_position 新建;adopted_symbols 包含该 symbol。
  • DB 无、内存无:交易所有、DB 无该 symbol;应走现有孤儿逻辑,新建 PairPositionsave_position,行为与当前一致。
  • DB 有、交易所无(幽灵):仅恢复流程处理;孤儿检测步骤 2 不会用该 row(因 coin 不在 exchange_coins),逻辑不变。
  • 配对与 single 混合:在步骤 2 只恢复 DB 中存在的 symbol;其余仍按 Fix #12 配对 + single 收纳,确保 adopted_symbols 与预期一致。

八、实现清单

代码

  • [ ] 新增 _load_position_from_db_row(row, exchange_coins),并用于 recover_positions_from_db
  • [ ] 在 _detect_and_adopt_orphans 开头增加 DB 查询与索引、步骤 2(内存无则先查 DB 并加载)、步骤 3 仅对「内存仍无」的 coin 建 orphan_entries

文档

  • [ ] docs/DATA_FLOW.md:情况3 下补充「内存无则先查 DB;DB 有则从 DB 加载,仅当内存与 DB 都没有时才收纳为孤儿」。
  • [ ] docs/README.md 或架构说明:孤儿定义改为「交易所有,但系统内存与数据库都没有记录的仓位」。

九、涉及文件

文件 变更说明
src/trading/position_manager.py 新增 _load_position_from_db_row;重构 recover_positions_from_db 使用该辅助方法;重构 _detect_and_adopt_orphans 增加「先查 DB 再收纳」逻辑
src/trading/trade_repository.py 无变更(继续使用 get_open_positions(network)
docs/DATA_FLOW.md 更新孤儿仓位检测流程描述
docs/README.md 更新孤儿仓位定义(可选)

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG