孤儿仓位设计方案1
孤儿仓位收纳优化:内存缺失时先查数据库
一、目标与背景
目标:将「孤儿仓位」定义为仅在交易所存在、且内存与数据库都不存在的仓位;当「内存无」时先查 DB,有则从 DB 恢复进内存,无再按现逻辑新建收纳。
背景:当前 _detect_and_adopt_orphans 只判断「交易所有且内存无」,未区分「DB 有 / DB 无」,可能导致同一交易所仓位被重复建仓、丢失 DB 中的 entry_z/时间等。
二、现状流程(简要)
- 入口:
recover_positions_from_db()、sync_with_exchange()内调用_detect_and_adopt_orphans(exchange_coins, source),调用方已持self._lock。 - 当前逻辑:
memory_coins = {symbol_to_coin(s) for s in self._positions}。- 对
exchange_coins中每个 coin,若coin not in memory_coins且szi != 0→ 加入orphan_entries。 - 先做 Fix #12 配对(alt+base 方向相反则合并为 pair),再对剩余按 single 新建
PairPosition并save_position。
- 缺失:未在步骤 2 之前/之中区分「DB 有该仓位」的情况,一律当新孤儿处理。
三、优化后整体流程
flowchart TB
subgraph input [输入]
EX[exchange_coins]
MEM[memory_coins]
end
subgraph phase1 [阶段1: 内存未覆盖的 coin]
A[对每个 coin in exchange 且 not in memory]
end
subgraph phase2 [阶段2: 先查 DB]
DB[get_open_positions]
INDEX[按 symbol 建索引 db_by_symbol]
CHECK{DB 中存在该 symbol 或 pair?}
end
subgraph phase3 [阶段3: 分支处理]
LOAD[从 DB 行构建 PairPosition 并刷新 size]
ADD_MEM[写入 self._positions 并更新 memory_coins]
ORPHAN[加入 orphan_entries]
end
subgraph phase4 [阶段4: 现有孤儿逻辑]
PAIR[Fix12 配对 alt+base]
SINGLE[剩余按 single 新建收纳]
end
EX --> phase1
MEM --> phase1
A --> DB
DB --> INDEX
INDEX --> CHECK
CHECK -->|有| LOAD
CHECK -->|无| ORPHAN
LOAD --> ADD_MEM
ADD_MEM --> adopted_symbols
ORPHAN --> phase4
PAIR --> adopted_symbols
SINGLE --> adopted_symbols
四、详细实现规格
4.1 辅助方法:_load_position_from_db_row
目的:从 DB 行 + 交易所快照构造并刷新 PairPosition,供「恢复」与「孤儿检测中 DB 命中」两处复用。
位置:src/trading/position_manager.py,放在 recover_positions_from_db 与 _detect_and_adopt_orphans 之间(例如约 722 行后)。
签名:
def _load_position_from_db_row(
self,
row: dict,
exchange_coins: dict[str, dict],
) -> PairPosition | None
语义:
- 用
row的字段构造PairPosition(与当前recover_positions_from_db内 772–791 行一致),包含:position_id,symbol,base_symbol,direction,status,pair_modealt_side,alt_size,alt_entry_price;base_side,base_size,base_entry_priceentry_zscore_4h,entry_adaptive_z,entry_avg_zscore_4h,entry_signal_strengthopen_time,entry_signal_id,network
- 用
exchange_coins刷新:position.alt_size = abs(float(exchange_coins[coin].get("szi", position.alt_size)))- 若
pair_mode == "pair"且base_symbol非空,且base_coin in exchange_coins,则刷新position.base_size。
- 不在此方法内写
self._positions或调用save_position,仅返回实例;调用方负责写入内存与是否落库。 - 若
symbol_to_coin(row["symbol"])不在exchange_coins中,返回None(调用方在调用前应保证 coin 在 exchange 且 szi≠0)。
与恢复流程的对接:recover_positions_from_db 中「由 row + exchange_coins 构建并刷新 size」的那段改为调用 position = self._load_position_from_db_row(row, exchange_coins),若返回非 None 则设置 position.peak_pnl_pct 后写入 self._positions、recovered += 1;若返回 None 则保持当前「幽灵仓位」分支逻辑不变。
4.2 DB 索引结构(在 _detect_and_adopt_orphans 内)
- 调用一次:
db_rows = self._repo.get_open_positions(network=self._config.network.value) or []。 - 建立索引:
db_by_symbol: dict[str, dict] = {row["symbol"]: row for row in db_rows}- 用于:给定 symbol(如
coin_to_symbol(coin)),快速判断 DB 是否有该 symbol 的开放仓位。 - 对于 pair 仓位:DB 中一行即代表 alt+base,
row["symbol"]为 alt symbol,row.get("base_symbol")为 base symbol;用symbol_to_coin(row["symbol"])、symbol_to_coin(row.get("base_symbol",""))可得到该行覆盖的 coins。
- 去重:若同一 symbol 有多行,
db_by_symbol保留最后一次即可;或约定只取第一个开放记录(与当前恢复逻辑一致)。
4.3 _detect_and_adopt_orphans 逐步逻辑
前置:调用方已持 self._lock;exchange_coins 的 key 为 coin(如 "PURR"),value 为交易所返回的 position 字典(含 szi、entryPx 等)。
步骤 1 — 内存与 DB 数据
memory_coins = {symbol_to_coin(s) for s in self._positions}db_rows = self._repo.get_open_positions(network=self._config.network.value) or []db_by_symbol = {row["symbol"]: row for row in db_rows}adopted_symbols: list[str] = []
步骤 2 — 「内存无」的 coin 先尝试从 DB 加载
- 对每个
coin, ex_pos in exchange_coins.items():- 若
coin in memory_coins:跳过。 - 若
float(ex_pos.get("szi", 0)) == 0:跳过。 symbol = coin_to_symbol(coin)。- 若
symbol not in db_by_symbol:该 coin 不在此步处理,留给后面孤儿逻辑;继续下一个 coin。 - 若
symbol in db_by_symbol:row = db_by_symbol[symbol]。- 若该 row 对应 pair 仓位(
row.get("base_symbol")非空),则检查 base 腿是否在交易所:base_coin = symbol_to_coin(row["base_symbol"]),若base_coin not in exchange_coins或 base 的 szi 为 0,则不从 DB 加载(交给后续孤儿逻辑或配对逻辑处理);若 base 在 exchange 且 szi≠0,则继续。 position = self._load_position_from_db_row(row, exchange_coins);若为None则跳过。position.peak_pnl_pct = float(row.get("peak_pnl_pct", 0) or 0)。self._positions[position.symbol] = position。memory_coins.add(coin);若为 pair,则memory_coins.add(base_coin)。adopted_symbols.append(position.symbol)。- 可选:从
db_by_symbol中移除该symbol,避免同一行被另一个 coin 再次加载。
- 若
步骤 3 — 构建真正的孤儿集合
orphan_entries = {}- 对每个
coin, ex_pos in exchange_coins.items():- 若
coin in memory_coins:跳过。 - 若
float(ex_pos.get("szi", 0)) == 0:跳过。 orphan_entries[coin] = ex_pos
- 若
- 若
not orphan_entries:return adopted_symbols。
步骤 4 — 现有 Fix #12 配对 + single 收纳
- 保持与现有实现一致:先检查 alt+base 配对且方向相反则合并为 pair 并
save_position,再对orphan_entries中未配对的按 single 新建并save_position。 - 所有新建或配对的 symbol 均加入
adopted_symbols。 - 最后
return adopted_symbols。
4.4 恢复流程对 _load_position_from_db_row 的用法
在 recover_positions_from_db 的 with self._lock 循环内:
- 对每个
row,若symbol in self._positions:跳过。 coin = symbol_to_coin(symbol);若coin not in exchange_coins:按当前逻辑标记幽灵并update_position_status(CLOSED),跳过。position = self._load_position_from_db_row(row, exchange_coins);若为None则跳过。position.peak_pnl_pct = float(row.get("peak_pnl_pct", 0) or 0)。self._positions[symbol] = position;recovered += 1。- 删除原先内联的 PairPosition 构造与 size 刷新代码块。
五、边界与一致性
- 同一 DB 行只加载一次:在步骤 2 中每个
symbol至多命中一次db_by_symbol,只加载一次并加入memory_coins,后续步骤 3 不会再把该 coin 放入orphan_entries。 - pair 行只通过 alt symbol 命中:DB 中 pair 仓位以 alt 的 symbol 为键;处理该 row 时同时把 alt_coin 与 base_coin 加入
memory_coins,避免 base 腿再被当作孤儿。 - base 腿在 DB 有、交易所无:步骤 2 中若发现 row 为 pair 但
base_coin not in exchange_coins或 base 的 szi 为 0,则不从 DB 加载该 row,该 coin 会进入orphan_entries;若仅 alt 在 exchange,则按 single 孤儿收纳,与当前「base 腿消失」的语义兼容。 - 并发:
_detect_and_adopt_orphans仍在调用方已持self._lock下执行,DB 查询在锁内,不引入新竞态。
六、日志与告警区分(可选)
- 从 DB 加载到内存(步骤 2):可打 info,如
从 DB 恢复至内存 ({source}): {symbol} | position_id: {id},便于与「真孤儿」区分;不必再发「孤儿收纳」的告警。 - 真孤儿(步骤 4 新建/配对):保持现有
logger.warning与sender_colourful「孤儿仓位收纳/配对收纳」的文案与告警,可酌情在文案中注明「交易所与系统内存、数据库均无记录,已新建纳入管理」。
七、测试要点
- DB 有、内存无:构造 DB 中存在某开放仓位(single 或 pair),且交易所 mock 返回对应 coin(s) 有持仓,内存无该 symbol;执行恢复或同步后,应从 DB 加载到
_positions,且不产生新position_id、不重复save_position新建;adopted_symbols包含该 symbol。 - DB 无、内存无:交易所有、DB 无该 symbol;应走现有孤儿逻辑,新建
PairPosition并save_position,行为与当前一致。 - DB 有、交易所无(幽灵):仅恢复流程处理;孤儿检测步骤 2 不会用该 row(因 coin 不在
exchange_coins),逻辑不变。 - 配对与 single 混合:在步骤 2 只恢复 DB 中存在的 symbol;其余仍按 Fix #12 配对 + single 收纳,确保 adopted_symbols 与预期一致。
八、实现清单
代码
- [ ] 新增
_load_position_from_db_row(row, exchange_coins),并用于recover_positions_from_db。 - [ ] 在
_detect_and_adopt_orphans开头增加 DB 查询与索引、步骤 2(内存无则先查 DB 并加载)、步骤 3 仅对「内存仍无」的 coin 建orphan_entries。
文档
- [ ]
docs/DATA_FLOW.md:情况3 下补充「内存无则先查 DB;DB 有则从 DB 加载,仅当内存与 DB 都没有时才收纳为孤儿」。 - [ ]
docs/README.md或架构说明:孤儿定义改为「交易所有,但系统内存与数据库都没有记录的仓位」。
九、涉及文件
| 文件 | 变更说明 |
|---|---|
src/trading/position_manager.py |
新增 _load_position_from_db_row;重构 recover_positions_from_db 使用该辅助方法;重构 _detect_and_adopt_orphans 增加「先查 DB 再收纳」逻辑 |
src/trading/trade_repository.py |
无变更(继续使用 get_open_positions(network)) |
docs/DATA_FLOW.md |
更新孤儿仓位检测流程描述 |
docs/README.md |
更新孤儿仓位定义(可选) |