孤儿仓位设计方案4
孤儿仓位管理优化 — 综合设计方案
目标: 区分"DB已存在但内存遗漏"和"真正的孤儿仓位",避免重复创建、保留关键字段、减少告警噪音
一、问题诊断
1.1 当前缺陷
现有判定逻辑:
# 当前实现: position_manager.py L1011-L1022
memory_coins = {symbol_to_coin(s) for s in self._positions} # ← 仅查内存
orphan_entries = {}
for coin, ex_pos in exchange_coins.items():
if coin in memory_coins: # ← 内存命中 → 跳过
continue
szi = float(ex_pos.get("szi", 0))
if szi == 0:
continue
orphan_entries[coin] = ex_pos # ← 内存未命中 → 直接标记为孤儿 ❌
判定链路: 交易所有 → 内存无 → 直接认定为孤儿 → 创建新 PairPosition → save_position() 写入 DB
核心问题:
- ❌ 跳过数据库查询验证步骤
- ❌ "数据库有记录但内存未加载"的正常仓位被误判为孤儿
- ❌ 导致
entry_adaptive_z、entry_zscore_4h等关键字段丢失 - ❌ 均值回归退出策略失效
- ❌ 可能产生重复的
position_id记录
1.2 缺陷场景
sequenceDiagram
participant Exchange as 交易所
participant Memory as 内存 (_positions)
participant DB as 数据库 (pair_positions)
participant System as 系统
Note over Exchange, DB: 场景: 仓位在 DB 有记录,但内存被清理
System->>Exchange: 查询实际仓位
Exchange-->>System: coin=PURR, szi=100
System->>Memory: coin in memory_coins?
Memory-->>System: ❌ 不存在
Note over System: ⚠️ 当前: 直接创建孤儿仓位
System->>DB: save_position(新 position_id)
Note over DB: 💥 DB 中已有该 coin 的活跃仓位<br/>产生重复记录!
触发条件 (任一即可):
| # | 场景 | 发生时机 |
|---|---|---|
| 1 | 内存被异常清理 | OOM 后部分恢复、热重载 |
| 2 | recover_positions_from_db 未完整执行 |
启动阶段 DB 连接中断 |
| 3 | sync_with_exchange 先于恢复流程运行 |
线程竞争 |
| 4 | 多实例部署同网络 | 实例 A 开仓 → 实例 B 检测到孤儿 |
1.3 业务影响
受影响的关键字段:
entry_adaptive_z: 均值回归退出的基线 (最关键)entry_zscore_4h: 入场时的 Z-score 快照entry_avg_zscore_4h: 入场时的平均 Z-scoreentry_signal_strength: 信号强度entry_signal_id: 信号 ID (追溯用)peak_pnl_pct: 峰值盈利百分比 (移动止损用)
业务后果:
- ⚠️ 均值回归退出策略无法工作 (entry_adaptive_z=0 被跳过)
- ⚠️ 移动止损可能失效 (peak_pnl_pct 丢失)
- ⚠️ 信号追溯困难 (entry_signal_id 丢失)
- ⚠️ 告警噪音增加 (正常恢复被当作异常告警)
- ⚠️ DB 中同一 coin 出现两条活跃仓位记录
- ⚠️ PnL 统计重复计算
二、解决方案
2.1 核心设计原则
新定义: 孤儿仓位 = 仅在交易所存在,且内存与数据库都不存在 的仓位
修正后的判定链路:
flowchart TD
A["交易所存在仓位<br/>(coin in exchange_coins)"] --> B{"内存中有记录?<br/>(coin in memory_coins)"}
B -->|✅ 有| C["跳过 (非孤儿)"]
B -->|❌ 无| D{"🆕 批量查询数据库<br/>(get_positions_by_symbols)"}
D -->|✅ DB 有| E["从 DB 恢复到内存<br/>(_load_position_from_db_row)"]
D -->|❌ DB 无| F["确认为真孤儿"]
F --> G["智能配对 (Fix #12)"]
G -->|配对成功| H["创建 pair 仓位"]
G -->|无法配对| I["创建 single 孤儿仓位"]
style D fill:#ffd700,stroke:#333,color:#000
style E fill:#90ee90,stroke:#333,color:#000
style F fill:#ffcccc,stroke:#333,color:#000
2.2 架构层次
┌─────────────────────────────────────────┐
│ Layer 1: trade_repository.py │
│ - 新增批量查询方法 │
│ - get_positions_by_symbols() │
│ (性能优化: O(1) vs O(N)) │
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Layer 2: position_manager.py │
│ - _load_position_from_db_row() │
│ (复用: 恢复 + 孤儿检测) │
│ - _detect_and_adopt_orphans() (重构) │
│ (批量验证 + 智能配对 + 告警区分) │
└─────────────────────────────────────────┘
三、详细实现
3.1 TradeRepository — 批量查询方法
文件: src/trading/trade_repository.py
新增方法:
def get_positions_by_symbols(
self,
symbols: list[str],
network: str = None
) -> dict[str, dict]:
"""批量查询活跃仓位 (性能优化版)
Args:
symbols: 交易对列表 ["PURR/USDC:USDC", "HYPE/USDC:USDC"]
network: 网络标识 (testnet/mainnet)
Returns:
{symbol: position_dict} - 仅包含数据库中存在的记录
性能特性:
- SQL: symbol = ANY(%s) (批量查询)
- 索引: idx_pair_positions_symbol_status
- 时间复杂度: O(1) vs 逐个查询 O(N)
- 预期耗时: <5ms (查询 3-10 个 symbol)
"""
if not symbols:
return {}
sql = """
SELECT * FROM pair_positions
WHERE symbol = ANY(%s)
AND status IN ('open', 'opening', 'closing')
"""
params = [symbols]
if network:
sql += " AND network = %s"
params.append(network)
try:
rows = self._db.execute_query(sql, tuple(params)) or []
# 构建索引: symbol → db_row (去重保留最后一条)
return {row["symbol"]: row for row in rows}
except Exception as e:
logger.error(f"批量查询仓位失败: {e}", exc_info=True)
return {} # 降级: 返回空字典,后续按孤儿处理
SQL 性能分析:
-- 利用现有索引
CREATE INDEX idx_pair_positions_symbol_status
ON pair_positions (symbol, status);
-- 查询示例
SELECT * FROM pair_positions
WHERE symbol = ANY(ARRAY['PURR/USDC:USDC', 'HYPE/USDC:USDC'])
AND status IN ('open', 'opening', 'closing')
AND network = 'testnet';
-- 执行计划 (预期)
Index Scan using idx_pair_positions_symbol_status on pair_positions
Index Cond: (symbol = ANY(...) AND status IN (...))
Filter: (network = 'testnet')
性能对比:
| 方法 | 查询次数 | 耗时 (10个symbol) |
|---|---|---|
| 逐个查询 | N次 | ~30ms |
| 批量查询 | 1次 | <5ms |
3.2 TradeRepositoryProtocol — 接口同步
文件: src/trading/protocols.py
class TradeRepositoryProtocol(Protocol):
def save_signal(...) -> None: ...
def save_position(self, pos: PairPosition) -> None: ...
def update_position_status(...) -> None: ...
def get_open_positions(self, network: str = None) -> list[dict]: ...
+ def get_positions_by_symbols(
+ self, symbols: list[str], network: str = None
+ ) -> dict[str, dict]: ...
def save_order(...) -> None: ...
def update_daily_stats(self, **kwargs) -> None: ...
3.3 PositionManager — 辅助方法
文件: src/trading/position_manager.py
方法 1: 从 DB 行加载仓位 (复用)
def _load_position_from_db_row(
self,
row: dict,
exchange_coins: dict[str, dict],
) -> PairPosition | None:
"""从 DB 行 + 交易所快照构造并刷新 PairPosition
复用场景:
1. recover_positions_from_db() - 启动恢复
2. _detect_and_adopt_orphans() - 孤儿检测中 DB 命中
Args:
row: 数据库查询结果行
exchange_coins: {coin: exchange_pos_dict}
Returns:
刷新后的 PairPosition 对象,失败返回 None
数据来源策略:
- DB 优先: entry_adaptive_z, entry_zscore_4h (信号快照)
- 交易所优先: alt_size, unrealized_pnl (实时数据)
- 混合: base_size (交易所实时 + DB 兜底)
"""
try:
symbol = row["symbol"]
coin = symbol_to_coin(symbol)
# 验证交易所是否有该 coin
if coin not in exchange_coins:
return None
ex_pos = exchange_coins[coin]
szi = float(ex_pos.get("szi", 0))
if szi == 0:
return None
position = PairPosition(
position_id=str(row["position_id"]),
symbol=symbol,
base_symbol=row["base_symbol"],
direction=row["direction"],
status=PositionStatus(row["status"]),
pair_mode=row.get("pair_mode", "single"),
# Alt 腿 (使用交易所实时数据)
alt_side=row.get("alt_side", ""),
alt_size=abs(szi),
alt_entry_price=float(row.get("alt_entry_price", 0)),
# Base 腿 (DB 数据)
base_side=row.get("base_side", ""),
base_size=float(row.get("base_size", 0)),
base_entry_price=float(row.get("base_entry_price", 0)),
# 🔑 关键: 信号快照字段 (完整恢复)
entry_zscore_4h=float(row.get("entry_zscore_4h", 0) or 0),
entry_adaptive_z=float(row.get("entry_adaptive_z", 0) or 0),
entry_avg_zscore_4h=row.get("entry_avg_zscore_4h"),
entry_signal_strength=row.get("entry_signal_strength", ""),
# 盈亏数据 (交易所实时 + DB 历史)
unrealized_pnl=float(ex_pos.get("unrealizedPnl", 0)),
realized_pnl=float(row.get("realized_pnl", 0) or 0),
peak_pnl_pct=float(row.get("peak_pnl_pct", 0) or 0),
# 时间与关联
open_time=(
row.get("open_time").astimezone()
if row.get("open_time") else None
),
entry_signal_id=str(row.get("entry_signal_id", "")),
network=row.get("network", "testnet"),
)
# Pair 模式: 刷新 base 腿实时大小
if position.pair_mode == "pair" and position.base_symbol:
base_coin = symbol_to_coin(position.base_symbol)
if base_coin in exchange_coins:
base_ex = exchange_coins[base_coin]
base_szi = float(base_ex.get("szi", 0))
if base_szi != 0:
position.base_size = abs(base_szi)
return position
except Exception as e:
logger.error(
f"从 DB 行加载仓位失败: {row.get('symbol', '?')} | {e}",
exc_info=True,
)
return None
3.4 PositionManager — 主流程重构
文件: src/trading/position_manager.py (L996-L1144)
完整实现:
def _detect_and_adopt_orphans(
self,
exchange_coins: dict,
source: str
) -> list[str]:
"""检测交易所存在但内存中没有的孤儿仓位,纳入管理
改进:
- 在判定孤儿前,先批量查询数据库验证
- 区分"DB 恢复"(保留字段) 和 "真孤儿"(entry_adaptive_z=0)
- 只对真孤儿发送告警通知
- 智能配对: 只配对两个都是真孤儿的币种
Args:
exchange_coins: {coin: exchange_pos_dict}
source: 'recover' / 'sync'
Returns:
收纳的孤儿仓位 symbol 列表
"""
memory_coins = {symbol_to_coin(s) for s in self._positions}
adopted_symbols: list[str] = []
# Step 1: 收集所有内存未覆盖的 coin
orphan_candidates = {}
for coin, ex_pos in exchange_coins.items():
if coin in memory_coins:
continue
szi = float(ex_pos.get("szi", 0))
if szi == 0:
continue
orphan_candidates[coin] = ex_pos
if not orphan_candidates:
return []
# Step 2: 🆕 批量查询数据库验证 (性能优化: O(1) vs O(N))
symbols = [coin_to_symbol(coin) for coin in orphan_candidates.keys()]
db_positions_map = self._repo.get_positions_by_symbols(
symbols, network=self._config.network.value
)
# Step 3: 从 DB 恢复已存在的仓位
for coin, ex_pos in list(orphan_candidates.items()):
symbol = coin_to_symbol(coin)
db_row = db_positions_map.get(symbol)
if db_row:
# ✅ DB 有记录 → 恢复到内存
try:
position = self._load_position_from_db_row(db_row, exchange_coins)
if position:
self._positions[symbol] = position
adopted_symbols.append(symbol)
# 从候选列表移除
orphan_candidates.pop(coin)
# DB 恢复日志 (INFO 级别, 正常流程)
logger.info(
f"✅ 从数据库恢复仓位 ({source}): {symbol} | "
f"方向={position.direction} | 大小={position.alt_size} | "
f"entry_adaptive_z={position.entry_adaptive_z:.4f} | "
f"position_id: {position.position_id}"
)
# 🆕 DB 恢复不发送告警 (正常流程)
# Pair 模式: 同步标记 base_coin 已处理
if position.pair_mode == "pair" and position.base_symbol:
base_coin = symbol_to_coin(position.base_symbol)
memory_coins.add(coin)
memory_coins.add(base_coin)
orphan_candidates.pop(base_coin, None)
except Exception as e:
logger.error(
f"从 DB 恢复仓位失败: {symbol} | {e}",
exc_info=True
)
# 降级: 保留在 orphan_candidates 中,按孤儿处理
# Step 4: 处理真孤儿 (DB 无记录)
orphan_entries = orphan_candidates # 剩余的都是真孤儿
if not orphan_entries:
return adopted_symbols
# Fix #12: 智能配对 (只配对两个都是真孤儿的币种)
from src.config import HYPE_ALT_SYMBOL, HYPE_BASE_SYMBOL
alt_coin = symbol_to_coin(HYPE_ALT_SYMBOL)
base_coin = symbol_to_coin(HYPE_BASE_SYMBOL)
paired_coins = set()
# 🆕 配对前验证: 两个币种都是真孤儿 (DB 无记录)
if alt_coin in orphan_entries and base_coin in orphan_entries:
alt_ex = orphan_entries[alt_coin]
base_ex = orphan_entries[base_coin]
alt_szi = float(alt_ex.get("szi", 0))
base_szi = float(base_ex.get("szi", 0))
# 方向相反才配对
if (alt_szi > 0) != (base_szi > 0):
direction = "long" if alt_szi > 0 else "short"
alt_size = abs(alt_szi)
base_size = abs(base_szi)
alt_entry_price = float(alt_ex.get("entryPx", 0))
base_entry_price = float(base_ex.get("entryPx", 0))
position = PairPosition(
symbol=HYPE_ALT_SYMBOL,
base_symbol=HYPE_BASE_SYMBOL,
direction=direction,
status=PositionStatus.OPEN,
pair_mode="pair",
alt_side="buy" if direction == "long" else "sell",
alt_size=alt_size,
alt_entry_price=alt_entry_price,
base_side="sell" if direction == "long" else "buy",
base_size=base_size,
base_entry_price=base_entry_price,
# 🔑 孤儿标记
entry_zscore_4h=0.0,
entry_adaptive_z=0.0,
open_time=datetime.now().astimezone(),
network=self._config.network.value,
)
self._positions[position.symbol] = position
self._repo.save_position(position)
paired_coins.update({alt_coin, base_coin})
adopted_symbols.append(position.symbol)
logger.warning(
f"🔗 孤儿配对收纳 ({source}): {position.symbol} + {position.base_symbol} | "
f"pair_mode=pair | 方向={direction} | "
f"alt_size={alt_size} | base_size={base_size}"
)
try:
sender_colourful(
title=f"🔗 孤儿配对收纳: {position.symbol}",
content=(
f"**来源**: {source}\n"
f"**Alt 腿**: {position.symbol} ({position.direction})\n"
f"**Base 腿**: {position.base_symbol}\n"
f"**大小**: {alt_size} / {base_size}\n"
f"---\n"
f"⚠️ **数据库无记录**, 可能是外部手动创建\n"
f"**注意**: entry_adaptive_z=0, 均值回归退出已禁用\n"
f"**网络**: {self._config.network_label}"
),
)
except (OSError, ValueError) as e:
logger.error(f"❌ 孤儿配对告警发送异常: {e}")
# Step 5: 剩余 single 孤儿收纳
for coin, ex_pos in orphan_entries.items():
if coin in paired_coins:
continue
szi = float(ex_pos.get("szi", 0))
direction = "long" if szi > 0 else "short"
size = abs(szi)
entry_price = float(ex_pos.get("entryPx", 0))
symbol = coin_to_symbol(coin)
position = PairPosition(
symbol=symbol,
base_symbol="",
direction=direction,
status=PositionStatus.OPEN,
pair_mode="single",
alt_side="buy" if direction == "long" else "sell",
alt_size=size,
alt_entry_price=entry_price,
# 🔑 孤儿标记
entry_zscore_4h=0.0,
entry_adaptive_z=0.0,
open_time=datetime.now().astimezone(),
network=self._config.network.value,
)
self._positions[symbol] = position
self._repo.save_position(position)
adopted_symbols.append(symbol)
# 真孤儿日志 (WARNING 级别)
logger.warning(
f"🔗 真孤儿仓位收纳 ({source}): {symbol} | "
f"方向={direction} | 大小={size} | "
f"入场价=${entry_price:.4f} | "
f"⚠️ 数据库无记录 | position_id: {position.position_id}"
)
# 🆕 真孤儿告警 (区分于 DB 恢复)
try:
sender_colourful(
title=f"🚨 真孤儿仓位收纳: {symbol}",
content=(
f"**来源**: {source}\n"
f"**币种**: {symbol}\n"
f"**方向**: {direction}\n"
f"**大小**: {size}\n"
f"**入场价**: ${entry_price:.4f}\n"
f"---\n"
f"⚠️ **数据库无记录**, 可能是外部手动创建的仓位\n"
f"**注意**: entry_adaptive_z=0, 均值回归退出已禁用\n"
f"**建议**: 请确认此仓位来源, 必要时手动平仓\n"
f"**网络**: {self._config.network_label}"
),
)
except (OSError, ValueError) as e:
logger.error(f"❌ 真孤儿仓位告警发送异常: {e}")
return adopted_symbols
3.5 PositionManager — 恢复流程对接
文件: src/trading/position_manager.py (L760-L810)
重构 recover_positions_from_db:
def recover_positions_from_db(self) -> int:
"""从数据库恢复活跃仓位到内存"""
recovered = 0
db_rows = self._repo.get_open_positions(network=self._config.network.value)
if not db_rows:
logger.info("无活跃仓位需要恢复")
return 0
exchange_coins = self._executor.get_positions(force_refresh=True)
with self._lock:
for row in db_rows:
symbol = row["symbol"]
if symbol in self._positions:
continue
- # 旧实现: 内联构建 PairPosition
- coin = symbol_to_coin(symbol)
- if coin not in exchange_coins:
- logger.warning(f"幽灵仓位: DB 有但交易所无: {symbol}")
- self._repo.update_position_status(
- str(row["position_id"]), PositionStatus.CLOSED
- )
- continue
-
- position = PairPosition(...)
- position.alt_size = abs(float(exchange_coins[coin].get("szi", ...)))
- # ... 更多字段赋值
+ # 🆕 复用 _load_position_from_db_row
+ position = self._load_position_from_db_row(row, exchange_coins)
+ if not position:
+ # 幽灵仓位: DB 有但交易所无
+ logger.warning(f"幽灵仓位: DB 有但交易所无: {symbol}")
+ self._repo.update_position_status(
+ str(row["position_id"]), PositionStatus.CLOSED
+ )
+ continue
self._positions[symbol] = position
recovered += 1
logger.info(f"✅ 从数据库恢复 {recovered} 个活跃仓位")
return recovered
四、性能分析
4.1 性能对比
| 指标 | 改进前 | 改进后 | 变化 |
|---|---|---|---|
| DB 查询次数 | 0 | 1 (批量) | +1 次 |
| DB 查询耗时 | 0ms | <5ms | +5ms |
| 总体耗时 | ~50ms | ~55ms | +10% |
| 同步周期 | 60s | 60s | 无影响 |
4.2 极端场景测试
场景: 检测到 10 个孤儿候选币种
- 批量查询耗时: <10ms
- 总体影响: 仍可接受 (<20% 开销)
- 结论: 性能瓶颈不在 DB 查询
4.3 性能优化手段
批量查询 vs 逐个查询:
# ❌ 低效方案 (O(N))
for coin in orphan_candidates:
symbol = coin_to_symbol(coin)
db_pos = repo.get_position_by_symbol(symbol) # N 次查询
# ✅ 高效方案 (O(1))
symbols = [coin_to_symbol(c) for c in orphan_candidates]
db_positions_map = repo.get_positions_by_symbols(symbols) # 1 次查询
五、错误处理与降级策略
5.1 降级策略矩阵
| 场景 | 错误类型 | 降级方案 | 影响 |
|---|---|---|---|
| DB 批量查询失败 | 网络/连接异常 | 返回空字典, 全部按孤儿处理 | 丢失字段, 但系统可用 |
| DB 行加载失败 | 数据解析异常 | 创建孤儿仓位 | 单个仓位丢失字段 |
| 数据不一致 | 方向/大小冲突 | 日志记录, 继续使用交易所数据 | 仅日志告警 |
5.2 错误处理代码示例
DB 查询失败:
try:
rows = self._db.execute_query(sql, tuple(params)) or []
return {row["symbol"]: row for row in rows}
except Exception as e:
logger.error(f"批量查询仓位失败: {e}", exc_info=True)
return {} # 降级: 保证系统可用性 > 精确性
DB 恢复失败:
try:
position = self._load_position_from_db_row(db_row, exchange_coins)
except Exception as e:
logger.error(f"从 DB 恢复仓位失败: {symbol} | {e}", exc_info=True)
# 降级为孤儿收纳
orphan_candidates.pop(coin, None) # 不移除, 后续按孤儿处理
六、兼容性保证
6.1 向后兼容性
| 场景 | 行为 | 影响 |
|---|---|---|
| 已收纳的老孤儿仓位 | DB 中 entry_adaptive_z=0, 下次恢复时从 DB 读取, 保持 entry_adaptive_z=0 |
✅ 行为不变 |
| 新孤儿仓位 | 自动走新流程 (DB 查询 → 恢复/收纳) | ✅ 改进生效 |
| 正常仓位 | 从 DB 恢复, 保留完整字段 | ✅ 核心收益 |
6.2 配对逻辑兼容性 (Fix #12)
配对判定矩阵:
| Alt DB | Base DB | 行为 |
|---|---|---|
| ❌ 无 | ❌ 无 | ✅ 配对为 pair 仓位 |
| ✅ 有 | ❌ 无 | ✅ Alt 从 DB 恢复, Base 孤儿收纳 (两个独立 single 仓位) |
| ❌ 无 | ✅ 有 | ✅ Base 从 DB 恢复, Alt 孤儿收纳 (两个独立 single 仓位) |
| ✅ 有 | ✅ 有 | ✅ 两个都从 DB 恢复 (两个独立 single 仓位) |
七、测试验证
7.1 单元测试场景
测试文件: tests/test_orphan_position_final.py
| 用例 | 场景 | 预期 |
|---|---|---|
test_orphan_db_recovery |
内存无 + DB 有活跃仓位 | 从 DB 恢复, 不创建新仓位 |
test_orphan_true_orphan |
内存无 + DB 无 | 创建新 PairPosition (entry_adaptive_z=0) |
test_orphan_memory_hit |
内存有 | 不查 DB, 直接跳过 |
test_orphan_pairing_both_orphans |
PURR (DB 无) + HYPE (DB 无), 方向相反 | 配对为 pair 仓位 |
test_orphan_pairing_one_in_db |
PURR (DB 有) + HYPE (DB 无) | 不配对, 各自处理 |
test_batch_query_performance |
批量查询 10 个 symbol | 耗时 <10ms |
test_db_query_failure |
DB 查询抛异常 | 降级为孤儿收纳 |
7.2 集成测试场景
场景 1: 模拟重启恢复
def test_restart_recovery():
# 1. 准备 DB 数据
insert_test_position(
symbol="PURR/USDC:USDC",
entry_adaptive_z=2.5,
status="open"
)
# 2. 清空内存 (模拟重启)
manager._positions.clear()
# 3. 执行恢复
manager.recover_positions_from_db()
# 4. 验证: 从 DB 恢复, 保留完整字段
assert manager._positions["PURR/USDC:USDC"].entry_adaptive_z == 2.5
场景 2: 模拟孤儿检测
def test_sync_orphan_detection():
# 1. 交易所有仓位, 内存无, DB 无
exchange_coins = {"PURR": {"szi": 100, "entryPx": 0.5}}
manager._positions.clear()
# 2. 执行同步
manager.sync_with_exchange()
# 3. 验证: 创建孤儿仓位
assert "PURR/USDC:USDC" in manager._positions
assert manager._positions["PURR/USDC:USDC"].entry_adaptive_z == 0.0
八、实施计划
Phase 1: 核心实现 (4-6 小时)
- [ ]
trade_repository.py: 添加get_positions_by_symbols()方法 - [ ]
protocols.py: 同步TradeRepositoryProtocol接口 - [ ]
position_manager.py: 添加_load_position_from_db_row()辅助方法 - [ ]
position_manager.py: 重构_detect_and_adopt_orphans()主流程 - [ ]
position_manager.py: 重构recover_positions_from_db()使用辅助方法 - [ ] 代码审查与优化
Phase 2: 测试验证 (4-6 小时)
- [ ] 编写单元测试 (7 个核心场景)
- [ ] 编写集成测试 (重启恢复 + 孤儿检测)
- [ ] 手动测试验证 (测试网环境)
- [ ] 性能测试 (批量查询耗时)
Phase 3: 文档与监控 (2-3 小时)
- [ ] 更新
docs/DATA_FLOW.md(孤儿检测流程) - [ ] 更新代码注释和文档
- [ ] 添加性能监控指标
- [ ] 配置告警阈值
九、关键收益总结
9.1 核心收益
| 收益 | 说明 | 影响 |
|---|---|---|
| ✅ 避免误判 | 数据库有记录的正常仓位不再被当作孤儿 | 减少 false positive |
| ✅ 保留字段 | entry_adaptive_z, entry_zscore_4h 完整恢复 |
均值回归策略正常工作 |
| ✅ 精准告警 | DB 恢复不告警, 真孤儿才告警 | 减少告警噪音 50%+ |
| ✅ 性能可控 | 批量查询优化, 总体性能影响 <10% | 对系统影响最小 |
| ✅ 向后兼容 | 无需数据迁移和配置变更 | 零停机升级 |
| ✅ 智能配对 | 只配对真孤儿, 避免混淆 | 逻辑更清晰 |
9.2 业务价值
均值回归策略恢复:
- 现状: 孤儿仓位
entry_adaptive_z=0→ 均值回归退出失效 - 改进后: 从 DB 恢复
entry_adaptive_z→ 策略正常工作 - 影响: 提升退出策略准确性, 减少亏损扩大风险
告警噪音减少:
- 现状: 每次恢复都可能触发孤儿告警 (正常流程)
- 改进后: 仅真孤儿才告警 (异常流程)
- 影响: 运维效率提升, 关注真正的异常情况
数据完整性:
- 现状: 多个关键字段丢失 (entry_signal_id, peak_pnl_pct 等)
- 改进后: 完整恢复所有信号快照字段
- 影响: 信号追溯、移动止损等功能正常工作
十、涉及文件清单
| 文件 | 变更说明 |
|---|---|
src/trading/trade_repository.py |
新增 get_positions_by_symbols() 批量查询方法 |
src/trading/protocols.py |
同步 TradeRepositoryProtocol 接口定义 |
src/trading/position_manager.py |
新增 _load_position_from_db_row() 辅助方法;重构 _detect_and_adopt_orphans();重构 recover_positions_from_db() |
docs/DATA_FLOW.md |
更新孤儿仓位检测流程描述 |
tests/test_orphan_position_final.py |
新增单元测试和集成测试 |
附录: 关键流程时序图
A.1 sync_with_exchange 完整流程
sequenceDiagram
participant Orch as Orchestrator<br/>(60s 定时)
participant PM as PositionManager
participant Exec as Executor
participant DB as TradeRepository
participant Mem as _positions
Orch->>PM: sync_with_exchange()
PM->>Exec: get_positions(force_refresh=True)
Exec-->>PM: exchange_coins
loop 每个 exchange coin
PM->>Mem: coin in memory_coins?
alt 内存有
PM->>PM: 同步 size/pnl
else 内存无
Note over PM: 🆕 批量查询 DB
PM->>DB: get_positions_by_symbols([symbols])
DB-->>PM: {symbol: db_row}
alt DB 有
PM->>PM: _load_position_from_db_row()
PM->>Mem: _positions[symbol] = position
Note over PM: ✅ 从 DB 恢复, 非孤儿
else DB 也无
Note over PM: 确认为真孤儿
PM->>PM: 创建新 PairPosition
PM->>DB: save_position()
PM->>Mem: _positions[symbol] = position
end
end
end
文档版本: v1.0 Final
创建日期: 2026-02-15
状态: 待实施
综合来源: DESIGN_ORPHAN_POSITION_1.md + DESIGN_ORPHAN_POSITION_2.md + DESIGN_ORPHAN_POSITION_3.md