孤儿仓位管理重构实施计划
孤儿仓位管理重构实施计划
Context(背景)
问题诊断
当前系统存在孤儿仓位判定逻辑缺陷:
现有流程:交易所有 + 内存无 → 直接判定为孤儿 → 创建新 PairPosition
核心问题:
- ❌ 跳过数据库查询验证步骤
- ❌ "数据库有记录但内存未加载"的正常仓位被误判为孤儿
- ❌ 导致重复创建
position_id、丢失entry_adaptive_z等关键字段 - ❌ 均值回归退出策略失效
触发场景:
- 内存被异常清理(OOM 后部分恢复)
recover_positions_from_db未完整执行(DB 连接中断)sync_with_exchange先于恢复流程运行(线程竞争)
修复目标
实施综合设计文档 docs/DESIGN_ORPHAN_POSITION_FINAL.md 中的优化方案:
新定义:孤儿仓位 = 交易所有 + 内存无 + 数据库也无
核心改进:
- 批量查询优化:使用 SQL
ANY(%s)批量查询,O(1) vs O(N) - 辅助方法复用:
_load_position_from_db_row供恢复和孤儿检测共用 - 智能配对:只配对两个都是真孤儿的币种
- 告警区分:DB 恢复 → INFO 日志不告警,真孤儿 → WARNING 日志 + 飞书告警
- 完全重构:彻底删除旧代码,无死代码残留
Implementation Plan(实施方案)
1. TradeRepository 层改造
文件:src/trading/trade_repository.py
新增方法:get_positions_by_symbols
def get_positions_by_symbols(
self,
symbols: list[str],
network: str = None
) -> dict[str, dict]:
"""批量查询活跃仓位(性能优化版)
Args:
symbols: 交易对列表 ["PURR/USDC:USDC", "HYPE/USDC:USDC"]
network: 网络标识(testnet/mainnet)
Returns:
{symbol: position_dict} - 仅包含数据库中存在的记录
性能特性:
- SQL: symbol = ANY(%s)(批量查询)
- 索引: idx_pair_positions_symbol_status
- 时间复杂度: O(1) vs 逐个查询 O(N)
- 预期耗时: <5ms(查询 3-10 个 symbol)
"""
if not symbols:
return {}
sql = """
SELECT * FROM pair_positions
WHERE symbol = ANY(%s)
AND status IN ('open', 'opening', 'closing')
"""
params = [symbols]
if network:
sql += " AND network = %s"
params.append(network)
try:
rows = self._db.execute_query(sql, tuple(params)) or []
return {row["symbol"]: row for row in rows}
except Exception as e:
logger.error(f"批量查询仓位失败: {e}", exc_info=True)
return {} # 降级:返回空字典
接口同步:src/trading/protocols.py
在 TradeRepositoryProtocol 类中添加方法签名(约 L68 后):
def get_positions_by_symbols(
self, symbols: list[str], network: str = None
) -> dict[str, dict]: ...
2. PositionManager 辅助方法
文件:src/trading/position_manager.py
新增方法:_load_position_from_db_row(约 L722 后,recover_positions_from_db 之前)
def _load_position_from_db_row(
self,
row: dict,
exchange_coins: dict[str, dict],
) -> PairPosition | None:
"""从 DB 行 + 交易所快照构造并刷新 PairPosition
复用场景:
1. recover_positions_from_db() - 启动恢复
2. _detect_and_adopt_orphans() - 孤儿检测中 DB 命中
Args:
row: 数据库查询结果行
exchange_coins: {coin: exchange_pos_dict}
Returns:
刷新后的 PairPosition 对象,失败返回 None
数据来源策略:
- DB 优先: entry_adaptive_z, entry_zscore_4h(信号快照)
- 交易所优先: alt_size, unrealized_pnl(实时数据)
- 混合: base_size(交易所实时 + DB 兜底)
"""
try:
symbol = row["symbol"]
coin = symbol_to_coin(symbol)
# 验证交易所是否有该 coin
if coin not in exchange_coins:
return None
ex_pos = exchange_coins[coin]
szi = float(ex_pos.get("szi", 0))
if szi == 0:
return None
position = PairPosition(
position_id=str(row["position_id"]),
symbol=symbol,
base_symbol=row["base_symbol"],
direction=row["direction"],
status=PositionStatus(row["status"]),
pair_mode=row.get("pair_mode", "single"),
# Alt 腿(使用交易所实时数据)
alt_side=row.get("alt_side", ""),
alt_size=abs(szi),
alt_entry_price=float(row.get("alt_entry_price", 0)),
# Base 腿(DB 数据)
base_side=row.get("base_side", ""),
base_size=float(row.get("base_size", 0)),
base_entry_price=float(row.get("base_entry_price", 0)),
# 🔑 关键:信号快照字段(完整恢复)
entry_zscore_4h=float(row.get("entry_zscore_4h", 0) or 0),
entry_adaptive_z=float(row.get("entry_adaptive_z", 0) or 0),
entry_avg_zscore_4h=row.get("entry_avg_zscore_4h"),
entry_signal_strength=row.get("entry_signal_strength", ""),
# 盈亏数据(交易所实时 + DB 历史)
unrealized_pnl=float(ex_pos.get("unrealizedPnl", 0)),
realized_pnl=float(row.get("realized_pnl", 0) or 0),
peak_pnl_pct=float(row.get("peak_pnl_pct", 0) or 0),
# 时间与关联
open_time=(
row.get("open_time").astimezone()
if row.get("open_time") else None
),
entry_signal_id=str(row.get("entry_signal_id", "")),
network=row.get("network", "testnet"),
)
# Pair 模式:刷新 base 腿实时大小
if position.pair_mode == "pair" and position.base_symbol:
base_coin = symbol_to_coin(position.base_symbol)
if base_coin in exchange_coins:
base_ex = exchange_coins[base_coin]
base_szi = float(base_ex.get("szi", 0))
if base_szi != 0:
position.base_size = abs(base_szi)
return position
except Exception as e:
logger.error(
f"从 DB 行加载仓位失败: {row.get('symbol', '?')} | {e}",
exc_info=True,
)
return None
3. _detect_and_adopt_orphans 完全重写
文件:src/trading/position_manager.py
删除:L996-L1144(148 行旧代码)
重写:
def _detect_and_adopt_orphans(
self,
exchange_coins: dict,
source: str
) -> list[str]:
"""检测交易所存在但内存中没有的孤儿仓位,纳入管理
改进:
- 在判定孤儿前,先批量查询数据库验证
- 区分"DB 恢复"(保留字段)和"真孤儿"(entry_adaptive_z=0)
- 只对真孤儿发送告警通知
- 智能配对:只配对两个都是真孤儿的币种
Args:
exchange_coins: {coin: exchange_pos_dict}
source: 'recover' / 'sync'
Returns:
收纳的孤儿仓位 symbol 列表
"""
memory_coins = {symbol_to_coin(s) for s in self._positions}
adopted_symbols: list[str] = []
# Step 1: 收集所有内存未覆盖的 coin
orphan_candidates = {}
for coin, ex_pos in exchange_coins.items():
if coin in memory_coins:
continue
szi = float(ex_pos.get("szi", 0))
if szi == 0:
continue
orphan_candidates[coin] = ex_pos
if not orphan_candidates:
return []
# Step 2: 批量查询数据库验证(性能优化:O(1) vs O(N))
symbols = [coin_to_symbol(coin) for coin in orphan_candidates.keys()]
db_positions_map = self._repo.get_positions_by_symbols(
symbols, network=self._config.network.value
)
# Step 3: 从 DB 恢复已存在的仓位
for coin, ex_pos in list(orphan_candidates.items()):
symbol = coin_to_symbol(coin)
db_row = db_positions_map.get(symbol)
if db_row:
# ✅ DB 有记录 → 恢复到内存
try:
position = self._load_position_from_db_row(db_row, exchange_coins)
if position:
self._positions[symbol] = position
adopted_symbols.append(symbol)
# 从候选列表移除
orphan_candidates.pop(coin)
# DB 恢复日志(INFO 级别,正常流程)
logger.info(
f"✅ 从数据库恢复仓位 ({source}): {symbol} | "
f"方向={position.direction} | 大小={position.alt_size} | "
f"entry_adaptive_z={position.entry_adaptive_z:.4f} | "
f"position_id: {position.position_id}"
)
# DB 恢复不发送告警(正常流程)
# Pair 模式:同步标记 base_coin 已处理
if position.pair_mode == "pair" and position.base_symbol:
base_coin = symbol_to_coin(position.base_symbol)
memory_coins.add(coin)
memory_coins.add(base_coin)
orphan_candidates.pop(base_coin, None)
except Exception as e:
logger.error(
f"从 DB 恢复仓位失败: {symbol} | {e}",
exc_info=True
)
# 降级:保留在 orphan_candidates 中,按孤儿处理
# Step 4: 处理真孤儿(DB 无记录)
orphan_entries = orphan_candidates # 剩余的都是真孤儿
if not orphan_entries:
return adopted_symbols
# Fix #12: 智能配对(只配对两个都是真孤儿的币种)
from src.config import HYPE_ALT_SYMBOL, HYPE_BASE_SYMBOL
alt_coin = symbol_to_coin(HYPE_ALT_SYMBOL)
base_coin = symbol_to_coin(HYPE_BASE_SYMBOL)
paired_coins = set()
# 配对前验证:两个币种都是真孤儿(DB 无记录)
if alt_coin in orphan_entries and base_coin in orphan_entries:
alt_ex = orphan_entries[alt_coin]
base_ex = orphan_entries[base_coin]
alt_szi = float(alt_ex.get("szi", 0))
base_szi = float(base_ex.get("szi", 0))
# 方向相反才配对
if (alt_szi > 0) != (base_szi > 0):
direction = "long" if alt_szi > 0 else "short"
alt_size = abs(alt_szi)
base_size = abs(base_szi)
alt_entry_price = float(alt_ex.get("entryPx", 0))
base_entry_price = float(base_ex.get("entryPx", 0))
position = PairPosition(
symbol=HYPE_ALT_SYMBOL,
base_symbol=HYPE_BASE_SYMBOL,
direction=direction,
status=PositionStatus.OPEN,
pair_mode="pair",
alt_side="buy" if direction == "long" else "sell",
alt_size=alt_size,
alt_entry_price=alt_entry_price,
base_side="sell" if direction == "long" else "buy",
base_size=base_size,
base_entry_price=base_entry_price,
# 孤儿标记
entry_zscore_4h=0.0,
entry_adaptive_z=0.0,
open_time=datetime.now().astimezone(),
network=self._config.network.value,
)
self._positions[position.symbol] = position
self._repo.save_position(position)
paired_coins.update({alt_coin, base_coin})
adopted_symbols.append(position.symbol)
logger.warning(
f"🔗 孤儿配对收纳 ({source}): {position.symbol} + {position.base_symbol} | "
f"pair_mode=pair | 方向={direction} | "
f"alt_size={alt_size} | base_size={base_size}"
)
try:
sender_colourful(
title=f"🔗 孤儿配对收纳: {position.symbol}",
content=(
f"**来源**: {source}\n"
f"**Alt 腿**: {position.symbol} ({position.direction})\n"
f"**Base 腿**: {position.base_symbol}\n"
f"**大小**: {alt_size} / {base_size}\n"
f"---\n"
f"⚠️ **数据库无记录**,可能是外部手动创建\n"
f"**注意**: entry_adaptive_z=0,均值回归退出已禁用\n"
f"**网络**: {self._config.network_label}"
),
)
except (OSError, ValueError) as e:
logger.error(f"❌ 孤儿配对告警发送异常: {e}")
# Step 5: 剩余 single 孤儿收纳
for coin, ex_pos in orphan_entries.items():
if coin in paired_coins:
continue
szi = float(ex_pos.get("szi", 0))
direction = "long" if szi > 0 else "short"
size = abs(szi)
entry_price = float(ex_pos.get("entryPx", 0))
symbol = coin_to_symbol(coin)
position = PairPosition(
symbol=symbol,
base_symbol="",
direction=direction,
status=PositionStatus.OPEN,
pair_mode="single",
alt_side="buy" if direction == "long" else "sell",
alt_size=size,
alt_entry_price=entry_price,
# 孤儿标记
entry_zscore_4h=0.0,
entry_adaptive_z=0.0,
open_time=datetime.now().astimezone(),
network=self._config.network.value,
)
self._positions[symbol] = position
self._repo.save_position(position)
adopted_symbols.append(symbol)
# 真孤儿日志(WARNING 级别)
logger.warning(
f"🔗 真孤儿仓位收纳 ({source}): {symbol} | "
f"方向={direction} | 大小={size} | "
f"入场价=${entry_price:.4f} | "
f"⚠️ 数据库无记录 | position_id: {position.position_id}"
)
# 真孤儿告警(区分于 DB 恢复)
try:
sender_colourful(
title=f"🚨 真孤儿仓位收纳: {symbol}",
content=(
f"**来源**: {source}\n"
f"**币种**: {symbol}\n"
f"**方向**: {direction}\n"
f"**大小**: {size}\n"
f"**入场价**: ${entry_price:.4f}\n"
f"---\n"
f"⚠️ **数据库无记录**,可能是外部手动创建的仓位\n"
f"**注意**: entry_adaptive_z=0,均值回归退出已禁用\n"
f"**建议**: 请确认此仓位来源,必要时手动平仓\n"
f"**网络**: {self._config.network_label}"
),
)
except (OSError, ValueError) as e:
logger.error(f"❌ 真孤儿仓位告警发送异常: {e}")
return adopted_symbols
4. recover_positions_from_db 重构
文件:src/trading/position_manager.py
删除:L772-L792(内联 PairPosition 构建代码)
替换为:
# 原 L772-L792 删除,替换为:
position = self._load_position_from_db_row(row, exchange_coins)
if not position:
# 幽灵仓位:DB 有但交易所无
logger.warning(f"幽灵仓位:DB 有但交易所无: {symbol}")
self._repo.update_position_status(
str(row["position_id"]), PositionStatus.CLOSED
)
continue
# 原 L806-L807 保持不变(peak_pnl_pct 恢复)
position.peak_pnl_pct = float(row.get("peak_pnl_pct", 0) or 0)
self._positions[symbol] = position
recovered += 1
Critical Files(关键文件)
| 文件路径 | 修改类型 | 说明 |
|---|---|---|
src/trading/trade_repository.py |
新增方法 | 添加 get_positions_by_symbols 批量查询方法 |
src/trading/protocols.py |
接口同步 | 在 TradeRepositoryProtocol 添加方法签名 |
src/trading/position_manager.py |
新增+重写 | 1) 新增 _load_position_from_db_row 辅助方法2) 完全重写 _detect_and_adopt_orphans (L996-L1144)3) 重构 recover_positions_from_db (删除 L772-L792) |
Verification(验证方案)
1. 单元测试
创建 tests/test_orphan_position_refactored.py:
def test_db_exists_position():
"""DB 已存在仓位不应被判定为孤儿"""
# 断言:PURR 不在 adopted 列表中
# 断言:self._positions 中存在 PURR 记录
# 断言:entry_adaptive_z 正确恢复
def test_true_orphan_pair():
"""双孤儿智能配对"""
# 断言:创建 pair_mode="pair" 仓位
# 断言:发送飞书告警
def test_true_orphan_single():
"""单币孤儿收纳"""
# 断言:创建 pair_mode="single" 仓位
def test_db_query_failure():
"""DB 查询失败降级"""
# 断言:返回 {},所有候选按孤儿处理
def test_batch_query_performance():
"""批量查询性能"""
# 断言:SQL 执行 1 次(ANY 语法)
2. 集成测试
| 场景 | 验证点 | 预期结果 |
|---|---|---|
| 正常启动 | 无孤儿 | 日志无 WARNING |
| DB 恢复 | 交易所有 + DB 有 | INFO 日志,无告警 |
| 真孤儿配对 | 交易所有 + DB 无(双币) | WARNING 日志 + 飞书告警 |
| 真孤儿单币 | 交易所有 + DB 无(单币) | WARNING 日志 + 飞书告警 |
3. 性能验证
- 批量查询 10 个 symbol:<5ms
- 批量查询 100 个 symbol:<10ms
4. 代码审查检查清单
- [ ] 确认 L996-L1144(148 行)已完全删除
- [ ] 确认 L772-L792(21 行)已完全删除
- [ ] 确认无注释掉的旧代码
- [ ] 确认
_load_position_from_db_row在两处被调用 - [ ] 确认所有辅助方法被实际使用
Implementation Steps(实施步骤)
Phase 1: TradeRepository 层
- 在
src/trading/trade_repository.py添加get_positions_by_symbols方法 - 在
src/trading/protocols.py同步接口定义 - 编写单元测试验证批量查询
Phase 2: PositionManager 层
- 在
src/trading/position_manager.py约 L722 后添加_load_position_from_db_row方法 - 完全删除 L996-L1144(
_detect_and_adopt_orphans旧实现) - 在原位置重写
_detect_and_adopt_orphans方法 - 修改
recover_positions_from_db中的 L772-L792,改为调用辅助方法
Phase 3: 测试验证
- 运行单元测试,确保覆盖率 >90%
- 手动测试:正常启动、DB 恢复、真孤儿场景
- 性能测试:批量查询耗时验证
Phase 4: 代码审查
- 检查无死代码残留
- 检查所有辅助方法被调用
- 检查日志级别正确(INFO vs WARNING)
Expected Outcomes(预期结果)
功能改进
- ✅ 避免误判:DB 已存在仓位不再被当作孤儿
- ✅ 保留字段:
entry_adaptive_z完整恢复 → 均值回归策略正常工作 - ✅ 精准告警:DB 恢复不告警,真孤儿才告警 → 告警噪音减少 50%+
性能优化
- ✅ 批量查询 <5ms(10 个 symbol)
- ✅ 总体性能影响 <10%
代码质量
- ✅ 删除 169 行旧代码(148 + 21)
- ✅ 无死代码、无冗余逻辑
- ✅ 辅助方法复用,职责清晰
向后兼容
- ✅ 无需数据迁移
- ✅ 保持 Fix #12 配对逻辑
- ✅ 保持告警格式和模式