孤儿仓位设计方案2
孤儿仓位收纳逻辑改进 - 设计方案
一、问题诊断
1.1 当前缺陷
当前流程:
交易所有 + 内存无 → 直接判定为孤儿 → 收纳(entry_adaptive_z=0)
核心问题:
- ❌ 跳过数据库查询验证步骤
- ❌ "数据库有记录但内存未加载"的正常仓位被误判为孤儿
- ❌ 导致
entry_adaptive_z、entry_zscore_4h等关键字段丢失 - ❌ 均值回归退出策略失效
1.2 边缘情况分析
| 场景 | 详述 | 后果 |
|---|---|---|
| 持久化延迟 | 开仓成功 → 加入内存 → 异步 save_position() 尚未完成 → recover 时 DB 无记录 |
被误判为孤儿 |
| 并发竞态 | 恢复线程读 DB 时,新开仓的 save_position() 正在执行 |
DB 查询不到最新仓位 |
| 多进程扩展 | 其他进程开的仓位在当前进程的 DB 查询结果中可能延迟出现 | 短暂的数据不一致 |
1.3 影响范围
受影响的关键字段:
entry_adaptive_z:均值回归退出的基线(最关键)entry_zscore_4h:入场时的 Z-score 快照entry_avg_zscore_4h:入场时的平均 Z-scoreentry_signal_strength:信号强度entry_signal_id:信号 ID(追溯用)peak_pnl_pct:峰值盈利百分比(移动止损用)
业务影响:
- ⚠️ 均值回归退出策略无法工作(entry_adaptive_z=0 被跳过)
- ⚠️ 移动止损可能失效(peak_pnl_pct 丢失)
- ⚠️ 信号追溯困难(entry_signal_id 丢失)
- ⚠️ 告警噪音增加(正常恢复被当作异常告警)
二、改进方案
2.1 核心设计
新流程:
交易所有 + 内存无
↓
1️⃣ 批量查询数据库(symbol+network)
↓
├─ ✅ DB有记录 → 恢复完整仓位(保留 entry_adaptive_z)
│ → INFO 日志
│ → 不发送告警
└─ ❌ DB无记录 → 判定真孤儿 → 收纳(entry_adaptive_z=0)
→ WARNING 日志
→ 发送告警通知
核心优势:
- ✅ 区分"数据库恢复"和"真孤儿"
- ✅ 保留完整仓位字段
- ✅ 减少告警噪音
- ✅ 保证均值回归策略正常工作
2.2 架构层次
┌─────────────────────────────────────────┐
│ Layer 1: trade_repository.py │
│ - 新增批量查询方法 │
│ - get_positions_by_symbols() │
└─────────────────────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Layer 2: position_manager.py │
│ - _batch_verify_orphans_from_db() │
│ - _restore_position_from_db() │
│ - _create_orphan_position() │
│ - _detect_and_adopt_orphans() (修改) │
└─────────────────────────────────────────┘
三、详细实现
3.1 trade_repository.py - 批量查询方法
文件位置:src/trading/trade_repository.py
新增方法:
def get_positions_by_symbols(
self,
symbols: list[str],
network: str = None
) -> dict[str, dict]:
"""批量查询活跃仓位(性能优化版)
Args:
symbols: 交易对列表 ["PURR/USDC:USDC", "HYPE/USDC:USDC"]
network: 网络标识(testnet/mainnet)
Returns:
{symbol: position_dict} - 仅包含数据库中存在的记录
性能特性:
- SQL: symbol = ANY(%s)(批量查询)
- 索引: idx_pair_positions_symbol_status
- 时间复杂度: O(1) vs 逐个查询 O(N)
- 预期耗时: <5ms(查询 3-10 个 symbol)
"""
if not symbols:
return {}
sql = """
SELECT * FROM pair_positions
WHERE symbol = ANY(%s)
AND status IN ('open', 'opening', 'closing')
"""
params = [symbols]
if network:
sql += " AND network = %s"
params.append(network)
try:
rows = self._db.execute_query(sql, tuple(params)) or []
return {row["symbol"]: row for row in rows}
except Exception as e:
logger.error(f"批量查询仓位失败: {e}", exc_info=True)
return {} # 降级:返回空字典
SQL 性能分析:
-- 利用现有索引
CREATE INDEX idx_pair_positions_symbol_status
ON pair_positions (symbol, status);
-- 查询示例
SELECT * FROM pair_positions
WHERE symbol = ANY(ARRAY['PURR/USDC:USDC', 'HYPE/USDC:USDC'])
AND status IN ('open', 'opening', 'closing')
AND network = 'testnet';
-- 执行计划(预期)
Index Scan using idx_pair_positions_symbol_status on pair_positions
Index Cond: (symbol = ANY(...) AND status IN (...))
Filter: (network = 'testnet')
3.2 position_manager.py - 辅助方法
文件位置:src/trading/position_manager.py
方法 1:批量验证孤儿仓位
def _batch_verify_orphans_from_db(
self,
orphan_entries: dict[str, dict]
) -> dict[str, dict]:
"""批量查询数据库验证孤儿仓位
Args:
orphan_entries: {coin: exchange_pos_dict}
coin: "PURR", "HYPE" 等
exchange_pos_dict: {"szi": 100, "entryPx": 0.5, ...}
Returns:
{symbol: db_position_dict}
symbol: "PURR/USDC:USDC"
db_position_dict: 完整的 DB 记录(含 entry_adaptive_z 等)
错误处理:
- DB 查询失败 → 返回空字典(降级为孤儿收纳)
- 保证系统可用性 > 精确性
"""
if not orphan_entries:
return {}
# coin → symbol 转换
symbols = [coin_to_symbol(coin) for coin in orphan_entries.keys()]
try:
return self._repo.get_positions_by_symbols(
symbols,
network=self._config.network.value
)
except Exception as e:
logger.error(
f"批量验证孤儿仓位失败,降级为孤儿收纳: {e}",
exc_info=True
)
return {} # 降级策略
方法 2:从数据库恢复仓位
def _restore_position_from_db(
self,
db_row: dict,
exchange_pos: dict
) -> PairPosition:
"""从数据库记录恢复仓位对象
Args:
db_row: 数据库查询结果(完整记录)
exchange_pos: 交易所仓位数据(实时数据)
Returns:
恢复的 PairPosition 对象
数据来源策略:
- DB 优先:entry_adaptive_z, entry_zscore_4h(信号快照)
- 交易所优先:alt_size, unrealized_pnl(实时数据)
- 混合:base_size(交易所实时 + DB 兜底)
关键字段恢复:
✅ entry_adaptive_z: 均值回归基线
✅ entry_zscore_4h: 入场 Z-score
✅ entry_signal_id: 信号追溯
✅ peak_pnl_pct: 移动止损峰值
"""
symbol = db_row["symbol"]
position = PairPosition(
position_id=str(db_row["position_id"]),
symbol=symbol,
base_symbol=db_row["base_symbol"],
direction=db_row["direction"],
status=PositionStatus(db_row["status"]),
pair_mode=db_row.get("pair_mode", "single"),
# Alt 腿(使用交易所实时数据)
alt_side=db_row.get("alt_side", ""),
alt_size=abs(float(exchange_pos.get("szi", 0))),
alt_entry_price=float(db_row.get("alt_entry_price", 0)),
# Base 腿(DB 数据)
base_side=db_row.get("base_side", ""),
base_size=float(db_row.get("base_size", 0)),
base_entry_price=float(db_row.get("base_entry_price", 0)),
# 🔑 关键:信号快照字段(完整恢复)
entry_zscore_4h=float(db_row.get("entry_zscore_4h", 0) or 0),
entry_adaptive_z=float(db_row.get("entry_adaptive_z", 0) or 0),
entry_avg_zscore_4h=db_row.get("entry_avg_zscore_4h"),
entry_signal_strength=db_row.get("entry_signal_strength", ""),
# 盈亏数据(交易所实时 + DB 历史)
unrealized_pnl=float(exchange_pos.get("unrealizedPnl", 0)),
realized_pnl=float(db_row.get("realized_pnl", 0) or 0),
peak_pnl_pct=float(db_row.get("peak_pnl_pct", 0) or 0),
# 时间与关联
open_time=db_row.get("open_time").astimezone()
if db_row.get("open_time") else None,
entry_signal_id=str(db_row.get("entry_signal_id", "")),
network=db_row.get("network", "testnet"),
)
return position
数据一致性校验(可选增强):
# 在 _restore_position_from_db 末尾添加
szi = float(exchange_pos.get("szi", 0))
exchange_direction = "long" if szi > 0 else "short"
if db_row["direction"] != exchange_direction:
logger.warning(
f"⚠️ 数据不一致: {symbol} | "
f"DB 方向={db_row['direction']} vs "
f"交易所方向={exchange_direction}"
)
# 可选择抛出异常,触发降级流程
# raise ValueError("Direction mismatch")
方法 3:创建真孤儿仓位
def _create_orphan_position(
self,
coin: str,
exchange_pos: dict
) -> PairPosition:
"""创建真孤儿仓位(entry_adaptive_z=0)
Args:
coin: 币种名称 "PURR"
exchange_pos: 交易所仓位数据
Returns:
孤儿仓位对象(需持久化到 DB)
特征:
- entry_adaptive_z=0(禁用均值回归退出)
- entry_zscore_4h=0
- open_time=当前时间(无法追溯真实开仓时间)
- entry_signal_id=""(无信号关联)
"""
szi = float(exchange_pos.get("szi", 0))
direction = "long" if szi > 0 else "short"
size = abs(szi)
entry_price = float(exchange_pos.get("entryPx", 0))
symbol = coin_to_symbol(coin)
position = PairPosition(
symbol=symbol,
base_symbol="",
direction=direction,
status=PositionStatus.OPEN,
pair_mode="single",
alt_side="buy" if direction == "long" else "sell",
alt_size=size,
alt_entry_price=entry_price,
# 🔑 孤儿标记
entry_zscore_4h=0.0,
entry_adaptive_z=0.0,
open_time=datetime.now().astimezone(),
network=self._config.network.value,
)
return position
3.3 position_manager.py - 主流程改造
文件位置:src/trading/position_manager.py:996-1144
修改点总览:
| 行号 | 修改内容 | 说明 |
|---|---|---|
| 1024 后 | 增加 DB 批量查询 | db_positions_map = self._batch_verify_orphans_from_db(...) |
| 1033-1089 | 配对逻辑增强 | 只配对两个都是真孤儿的币种 |
| 1091-1143 | 分支处理 | DB 恢复 vs 真孤儿收纳 |
详细修改:
def _detect_and_adopt_orphans(
self,
exchange_coins: dict,
source: str
) -> list[str]:
"""检测交易所存在但内存中没有的孤儿仓位,纳入管理
改进:
- 在判定孤儿前,先批量查询数据库验证
- 区分"DB 恢复"(保留字段)和"真孤儿"(entry_adaptive_z=0)
- 只对真孤儿发送告警通知
Args:
exchange_coins: {coin: exchange_pos_dict}
source: 'recover' / 'sync'
Returns:
收纳的孤儿仓位 symbol 列表
"""
memory_coins = {symbol_to_coin(s) for s in self._positions}
adopted_symbols: list[str] = []
# 收集所有孤儿币种
orphan_entries = {}
for coin, ex_pos in exchange_coins.items():
if coin in memory_coins:
continue
szi = float(ex_pos.get("szi", 0))
if szi == 0:
continue
orphan_entries[coin] = ex_pos
if not orphan_entries:
return []
# 🆕 批量查询数据库验证(性能优化:O(1) vs O(N))
db_positions_map = self._batch_verify_orphans_from_db(orphan_entries)
# Fix #12: 智能配对(只配对真孤儿)
from src.config import HYPE_ALT_SYMBOL, HYPE_BASE_SYMBOL
alt_coin = symbol_to_coin(HYPE_ALT_SYMBOL)
base_coin = symbol_to_coin(HYPE_BASE_SYMBOL)
paired_coins = set()
# 🆕 配对前验证:两个币种都是真孤儿(DB 无记录)
if alt_coin in orphan_entries and base_coin in orphan_entries:
alt_symbol = coin_to_symbol(alt_coin)
base_symbol = coin_to_symbol(base_coin)
alt_is_orphan = alt_symbol not in db_positions_map
base_is_orphan = base_symbol not in db_positions_map
if alt_is_orphan and base_is_orphan:
# 执行现有配对逻辑(完全保持不变)
alt_ex = orphan_entries[alt_coin]
base_ex = orphan_entries[base_coin]
alt_szi = float(alt_ex.get("szi", 0))
base_szi = float(base_ex.get("szi", 0))
if (alt_szi > 0) != (base_szi > 0):
# [配对逻辑代码保持不变...]
paired_coins.update({alt_coin, base_coin})
adopted_symbols.append(position.symbol)
# 🆕 剩余孤儿仓位处理(区分 DB 恢复和真孤儿)
for coin, ex_pos in orphan_entries.items():
if coin in paired_coins:
continue
symbol = coin_to_symbol(coin)
# 🆕 检查数据库是否有记录
db_position = db_positions_map.get(symbol)
if db_position:
# ✅ 分支 1:从数据库恢复完整仓位
try:
position = self._restore_position_from_db(db_position, ex_pos)
# Pair 模式:同步 base 腿大小
if position.pair_mode == "pair" and position.base_symbol:
base_coin_for_pair = symbol_to_coin(position.base_symbol)
if base_coin_for_pair in exchange_coins:
base_ex_pos = exchange_coins[base_coin_for_pair]
position.base_size = abs(float(
base_ex_pos.get("szi", position.base_size)
))
self._positions[symbol] = position
adopted_symbols.append(symbol)
# DB 恢复日志(INFO 级别,正常流程)
logger.info(
f"✅ 从数据库恢复仓位 ({source}): {symbol} | "
f"方向={position.direction} | 大小={position.alt_size} | "
f"entry_adaptive_z={position.entry_adaptive_z:.4f} | "
f"position_id: {position.position_id}"
)
# 🆕 DB 恢复不发送告警(正常流程)
except Exception as e:
logger.error(f"从 DB 恢复仓位失败: {symbol} | {e}", exc_info=True)
# 降级为孤儿收纳
position = self._create_orphan_position(coin, ex_pos)
self._positions[symbol] = position
self._repo.save_position(position)
adopted_symbols.append(symbol)
# 发送告警...
else:
# ❌ 分支 2:真孤儿 - 创建新仓位(entry_adaptive_z=0)
position = self._create_orphan_position(coin, ex_pos)
self._positions[symbol] = position
self._repo.save_position(position)
adopted_symbols.append(symbol)
# 真孤儿日志(WARNING 级别)
logger.warning(
f"🔗 真孤儿仓位收纳 ({source}): {symbol} | "
f"方向={position.direction} | 大小={position.alt_size} | "
f"入场价=${position.alt_entry_price:.4f} | "
f"⚠️ 数据库无记录 | position_id: {position.position_id}"
)
# 🆕 真孤儿告警(区分于 DB 恢复)
try:
sender_colourful(
title=f"🚨 真孤儿仓位收纳: {symbol}",
content=(
f"**来源**: {source}\n"
f"**币种**: {symbol}\n"
f"**方向**: {position.direction}\n"
f"**大小**: {position.alt_size}\n"
f"**入场价**: ${position.alt_entry_price:.4f}\n"
f"---\n"
f"⚠️ **数据库无记录**,可能是外部手动创建的仓位\n"
f"**注意**: entry_adaptive_z=0,均值回归退出已禁用\n"
f"**建议**: 请确认此仓位来源,必要时手动平仓\n"
f"**网络**: {self._config.network_label}"
),
)
except (OSError, ValueError) as e:
logger.error(f"❌ 真孤儿仓位告警发送异常: {e}")
return adopted_symbols
四、错误处理策略
4.1 降级策略矩阵
| 场景 | 错误类型 | 降级方案 | 影响 |
|---|---|---|---|
| DB 查询失败 | 网络/连接异常 | 返回空字典,全部按孤儿处理 | 丢失字段,但系统可用 |
| DB 恢复失败 | 数据解析异常 | 创建孤儿仓位 | 单个仓位丢失字段 |
| 数据不一致 | 方向/大小冲突 | 日志记录,继续使用交易所数据 | 仅日志告警 |
4.2 错误处理代码
DB 查询失败:
try:
return self._repo.get_positions_by_symbols(...)
except Exception as e:
logger.error(f"批量验证孤儿仓位失败,降级为孤儿收纳: {e}", exc_info=True)
return {} # 降级:保证系统可用性 > 精确性
DB 恢复失败:
try:
position = self._restore_position_from_db(db_position, ex_pos)
except Exception as e:
logger.error(f"从 DB 恢复仓位失败: {symbol} | {e}", exc_info=True)
# 降级为孤儿收纳
position = self._create_orphan_position(coin, ex_pos)
# 保存 + 告警
五、性能分析
5.1 性能对比
| 指标 | 改进前 | 改进后 | 变化 |
|---|---|---|---|
| DB 查询次数 | 0 | 1(批量) | +1 次 |
| DB 查询耗时 | 0ms | <5ms | +5ms |
| 总体耗时 | ~50ms | ~55ms | +10% |
| 同步周期 | 60s | 60s | 无影响 |
5.2 性能优化手段
批量查询 vs 逐个查询:
# ❌ 低效方案(O(N))
for coin in orphan_entries:
symbol = coin_to_symbol(coin)
db_pos = repo.get_position_by_symbol(symbol) # N 次查询
# ✅ 高效方案(O(1))
symbols = [coin_to_symbol(c) for c in orphan_entries]
db_positions_map = repo.get_positions_by_symbols(symbols) # 1 次查询
索引利用:
-- 现有索引(已优化)
CREATE INDEX idx_pair_positions_symbol_status
ON pair_positions (symbol, status);
-- 批量查询执行计划
EXPLAIN SELECT * FROM pair_positions
WHERE symbol = ANY(ARRAY['PURR/USDC:USDC', 'HYPE/USDC:USDC'])
AND status IN ('open', 'opening', 'closing');
-- 预期结果:Index Scan(使用索引)
5.3 极端场景
场景:检测到 10 个孤儿币种
- DB 批量查询耗时: <10ms
- 总体影响: 仍可接受(<20% 开销)
- 结论: 性能瓶颈不在 DB 查询
六、兼容性保证
6.1 向后兼容性
| 场景 | 行为 | 影响 |
|---|---|---|
| 已收纳的老孤儿仓位 | DB 中 entry_adaptive_z=0,下次恢复时从 DB 读取,保持 entry_adaptive_z=0 |
✅ 行为不变 |
| 新孤儿仓位 | 自动走新流程(DB 查询 → 恢复/收纳) | ✅ 改进生效 |
| 正常仓位 | 从 DB 恢复,保留完整字段 | ✅ 核心收益 |
6.2 配对逻辑兼容性(Fix #12)
配对判定矩阵:
| Alt DB | Base DB | 行为 |
|---|---|---|
| ❌ 无 | ❌ 无 | ✅ 配对为 pair 仓位 |
| ✅ 有 | ❌ 无 | ✅ Alt 从 DB 恢复,Base 孤儿收纳(两个独立 single 仓位) |
| ❌ 无 | ✅ 有 | ✅ Base 从 DB 恢复,Alt 孤儿收纳(两个独立 single 仓位) |
| ✅ 有 | ✅ 有 | ✅ 两个都从 DB 恢复(两个独立 single 仓位) |
关键代码:
alt_is_orphan = alt_symbol not in db_positions_map
base_is_orphan = base_symbol not in db_positions_map
if alt_is_orphan and base_is_orphan:
# 执行配对逻辑
七、测试验证
7.1 单元测试场景
场景 1:DB 恢复正常仓位
def test_detect_orphans_db_recovery():
"""交易所有 + 内存无 + DB 有 → 恢复"""
exchange_coins = {"PURR": {"szi": 100, "entryPx": 0.5}}
manager._positions = {}
mock_repo.get_positions_by_symbols.return_value = {
"PURR/USDC:USDC": {"entry_adaptive_z": 2.5, ...}
}
adopted = manager._detect_and_adopt_orphans(exchange_coins, "test")
assert manager._positions["PURR/USDC:USDC"].entry_adaptive_z == 2.5
场景 2:真孤儿收纳
def test_detect_orphans_true_orphan():
"""交易所有 + 内存无 + DB 无 → 孤儿"""
exchange_coins = {"PURR": {"szi": 100, "entryPx": 0.5}}
manager._positions = {}
mock_repo.get_positions_by_symbols.return_value = {}
adopted = manager._detect_and_adopt_orphans(exchange_coins, "test")
assert manager._positions["PURR/USDC:USDC"].entry_adaptive_z == 0.0
mock_repo.save_position.assert_called_once()
场景 3:配对逻辑兼容性
def test_pairing_with_db_records():
"""PURR(DB 有) + HYPE(DB 无) → 不配对,各自处理"""
exchange_coins = {
"PURR": {"szi": 100, "entryPx": 0.5}, # long
"HYPE": {"szi": -200, "entryPx": 1.5}, # short
}
mock_repo.get_positions_by_symbols.return_value = {
"PURR/USDC:USDC": {"entry_adaptive_z": 2.5}
}
adopted = manager._detect_and_adopt_orphans(exchange_coins, "test")
# PURR 恢复,HYPE 孤儿收纳,不配对
assert manager._positions["PURR/USDC:USDC"].pair_mode == "single"
assert manager._positions["HYPE/USDC:USDC"].pair_mode == "single"
7.2 集成测试场景
场景:模拟重启恢复
def test_restart_recovery():
# 1. 准备 DB 数据
insert_test_position(
symbol="PURR/USDC:USDC",
entry_adaptive_z=2.5,
status="open"
)
# 2. 清空内存(模拟重启)
manager._positions.clear()
# 3. 执行恢复
manager.recover_positions_from_db()
# 4. 验证:从 DB 恢复,保留完整字段
assert manager._positions["PURR/USDC:USDC"].entry_adaptive_z == 2.5
八、实施计划
Phase 1: 核心实现(4-6 小时)
- [ ] trade_repository.py: 添加
get_positions_by_symbols()方法 - [ ] position_manager.py: 添加 3 个辅助方法
- [ ] position_manager.py: 修改
_detect_and_adopt_orphans()主流程 - [ ] 代码审查与优化
Phase 2: 测试验证(4-6 小时)
- [ ] 编写单元测试(3 个核心场景)
- [ ] 编写集成测试(重启恢复场景)
- [ ] 手动测试验证(测试网环境)
- [ ] 性能测试(批量查询耗时)
Phase 3: 文档与监控(可选,2-3 小时)
- [ ] 添加性能监控指标
- [ ] 更新代码注释和文档
- [ ] 配置告警阈值
九、关键收益总结
9.1 核心收益
| 收益 | 说明 | 影响 |
|---|---|---|
| ✅ 避免误判 | 数据库有记录的正常仓位不再被当作孤儿 | 减少 false positive |
| ✅ 保留字段 | entry_adaptive_z, entry_zscore_4h 完整恢复 |
均值回归策略正常工作 |
| ✅ 精准告警 | DB 恢复不告警,真孤儿才告警 | 减少告警噪音 50%+ |
| ✅ 性能可控 | 批量查询优化,总体性能影响 <10% | 对系统影响最小 |
| ✅ 向后兼容 | 无需数据迁移和配置变更 | 零停机升级 |
9.2 业务价值
均值回归策略恢复:
- 现状:孤儿仓位
entry_adaptive_z=0→ 均值回归退出失效 - 改进后:从 DB 恢复
entry_adaptive_z→ 策略正常工作 - 影响:提升退出策略准确性,减少亏损扩大风险
告警噪音减少:
- 现状:每次恢复都可能触发孤儿告警(正常流程)
- 改进后:仅真孤儿才告警(异常流程)
- 影响:运维效率提升,关注真正的异常情况
数据完整性:
- 现状:多个关键字段丢失(entry_signal_id, peak_pnl_pct 等)
- 改进后:完整恢复所有信号快照字段
- 影响:信号追溯、移动止损等功能正常工作
十、风险评估
| 风险 | 概率 | 影响 | 缓解措施 |
|---|---|---|---|
| DB 查询失败 | 低 | 中 | 降级策略:返回空字典,按孤儿处理 |
| 性能回归 | 低 | 低 | 批量查询优化,监控查询耗时 |
| 数据不一致 | 极低 | 低 | 方向校验,日志记录,降级处理 |
| 并发竞态 | 低 | 低 | 锁保护,原子操作 |
结论:总体风险可控,收益远大于风险。
附录:相关代码位置
关键文件清单
| 文件 | 行号 | 说明 |
|---|---|---|
src/trading/trade_repository.py |
新增 | get_positions_by_symbols() 方法 |
src/trading/position_manager.py |
996-1144 | _detect_and_adopt_orphans() 主流程 |
src/trading/position_manager.py |
新增 | _batch_verify_orphans_from_db() |
src/trading/position_manager.py |
新增 | _restore_position_from_db() |
src/trading/position_manager.py |
新增 | _create_orphan_position() |
src/trading/models.py |
参考 | PairPosition 字段定义 |
database/init_timescaledb.sql |
431-440 | 索引定义 |
数据库表结构
pair_positions 表关键字段:
position_id(UUID PRIMARY KEY)symbol(TEXT)status(TEXT)entry_adaptive_z(DOUBLE) ← 最关键字段entry_zscore_4h(DOUBLE)entry_signal_id(UUID)peak_pnl_pct(DOUBLE)network(TEXT)
相关索引:
CREATE INDEX idx_pair_positions_symbol_status
ON pair_positions (symbol, status);
文档版本:v1.0
创建日期:2026-02-15
作者:Claude Code
状态:待实施