孤儿仓位设计方案2

孤儿仓位收纳逻辑改进 - 设计方案

一、问题诊断

1.1 当前缺陷

当前流程

交易所有 + 内存无 → 直接判定为孤儿 → 收纳(entry_adaptive_z=0)

核心问题

  • ❌ 跳过数据库查询验证步骤
  • ❌ "数据库有记录但内存未加载"的正常仓位被误判为孤儿
  • ❌ 导致 entry_adaptive_zentry_zscore_4h 等关键字段丢失
  • ❌ 均值回归退出策略失效

1.2 边缘情况分析

场景 详述 后果
持久化延迟 开仓成功 → 加入内存 → 异步 save_position() 尚未完成 → recover 时 DB 无记录 被误判为孤儿
并发竞态 恢复线程读 DB 时,新开仓的 save_position() 正在执行 DB 查询不到最新仓位
多进程扩展 其他进程开的仓位在当前进程的 DB 查询结果中可能延迟出现 短暂的数据不一致

1.3 影响范围

受影响的关键字段

  • entry_adaptive_z:均值回归退出的基线(最关键
  • entry_zscore_4h:入场时的 Z-score 快照
  • entry_avg_zscore_4h:入场时的平均 Z-score
  • entry_signal_strength:信号强度
  • entry_signal_id:信号 ID(追溯用)
  • peak_pnl_pct:峰值盈利百分比(移动止损用)

业务影响

  • ⚠️ 均值回归退出策略无法工作(entry_adaptive_z=0 被跳过)
  • ⚠️ 移动止损可能失效(peak_pnl_pct 丢失)
  • ⚠️ 信号追溯困难(entry_signal_id 丢失)
  • ⚠️ 告警噪音增加(正常恢复被当作异常告警)

二、改进方案

2.1 核心设计

新流程

交易所有 + 内存无
  ↓
1️⃣ 批量查询数据库(symbol+network)
  ↓
  ├─ ✅ DB有记录 → 恢复完整仓位(保留 entry_adaptive_z)
  │                → INFO 日志
  │                → 不发送告警
  └─ ❌ DB无记录 → 判定真孤儿 → 收纳(entry_adaptive_z=0)
                   → WARNING 日志
                   → 发送告警通知

核心优势

  • ✅ 区分"数据库恢复"和"真孤儿"
  • ✅ 保留完整仓位字段
  • ✅ 减少告警噪音
  • ✅ 保证均值回归策略正常工作

2.2 架构层次

┌─────────────────────────────────────────┐
│  Layer 1: trade_repository.py          │
│  - 新增批量查询方法                      │
│  - get_positions_by_symbols()           │
└─────────────────────────────────────────┘
              ↓
┌─────────────────────────────────────────┐
│  Layer 2: position_manager.py           │
│  - _batch_verify_orphans_from_db()      │
│  - _restore_position_from_db()          │
│  - _create_orphan_position()            │
│  - _detect_and_adopt_orphans() (修改)   │
└─────────────────────────────────────────┘

三、详细实现

3.1 trade_repository.py - 批量查询方法

文件位置src/trading/trade_repository.py

新增方法

def get_positions_by_symbols(
    self,
    symbols: list[str],
    network: str = None
) -> dict[str, dict]:
    """批量查询活跃仓位(性能优化版)

    Args:
        symbols: 交易对列表 ["PURR/USDC:USDC", "HYPE/USDC:USDC"]
        network: 网络标识(testnet/mainnet)

    Returns:
        {symbol: position_dict} - 仅包含数据库中存在的记录

    性能特性:
        - SQL: symbol = ANY(%s)(批量查询)
        - 索引: idx_pair_positions_symbol_status
        - 时间复杂度: O(1) vs 逐个查询 O(N)
        - 预期耗时: <5ms(查询 3-10 个 symbol)
    """
    if not symbols:
        return {}

    sql = """
        SELECT * FROM pair_positions
        WHERE symbol = ANY(%s)
          AND status IN ('open', 'opening', 'closing')
    """
    params = [symbols]

    if network:
        sql += " AND network = %s"
        params.append(network)

    try:
        rows = self._db.execute_query(sql, tuple(params)) or []
        return {row["symbol"]: row for row in rows}
    except Exception as e:
        logger.error(f"批量查询仓位失败: {e}", exc_info=True)
        return {}  # 降级:返回空字典

SQL 性能分析

-- 利用现有索引
CREATE INDEX idx_pair_positions_symbol_status
  ON pair_positions (symbol, status);

-- 查询示例
SELECT * FROM pair_positions
WHERE symbol = ANY(ARRAY['PURR/USDC:USDC', 'HYPE/USDC:USDC'])
  AND status IN ('open', 'opening', 'closing')
  AND network = 'testnet';

-- 执行计划(预期)
Index Scan using idx_pair_positions_symbol_status on pair_positions
  Index Cond: (symbol = ANY(...) AND status IN (...))
  Filter: (network = 'testnet')

3.2 position_manager.py - 辅助方法

文件位置src/trading/position_manager.py

方法 1:批量验证孤儿仓位

def _batch_verify_orphans_from_db(
    self,
    orphan_entries: dict[str, dict]
) -> dict[str, dict]:
    """批量查询数据库验证孤儿仓位

    Args:
        orphan_entries: {coin: exchange_pos_dict}
            coin: "PURR", "HYPE" 等
            exchange_pos_dict: {"szi": 100, "entryPx": 0.5, ...}

    Returns:
        {symbol: db_position_dict}
            symbol: "PURR/USDC:USDC"
            db_position_dict: 完整的 DB 记录(含 entry_adaptive_z 等)

    错误处理:
        - DB 查询失败 → 返回空字典(降级为孤儿收纳)
        - 保证系统可用性 > 精确性
    """
    if not orphan_entries:
        return {}

    # coin → symbol 转换
    symbols = [coin_to_symbol(coin) for coin in orphan_entries.keys()]

    try:
        return self._repo.get_positions_by_symbols(
            symbols,
            network=self._config.network.value
        )
    except Exception as e:
        logger.error(
            f"批量验证孤儿仓位失败,降级为孤儿收纳: {e}",
            exc_info=True
        )
        return {}  # 降级策略

方法 2:从数据库恢复仓位

def _restore_position_from_db(
    self,
    db_row: dict,
    exchange_pos: dict
) -> PairPosition:
    """从数据库记录恢复仓位对象

    Args:
        db_row: 数据库查询结果(完整记录)
        exchange_pos: 交易所仓位数据(实时数据)

    Returns:
        恢复的 PairPosition 对象

    数据来源策略:
        - DB 优先:entry_adaptive_z, entry_zscore_4h(信号快照)
        - 交易所优先:alt_size, unrealized_pnl(实时数据)
        - 混合:base_size(交易所实时 + DB 兜底)

    关键字段恢复:
        ✅ entry_adaptive_z: 均值回归基线
        ✅ entry_zscore_4h: 入场 Z-score
        ✅ entry_signal_id: 信号追溯
        ✅ peak_pnl_pct: 移动止损峰值
    """
    symbol = db_row["symbol"]

    position = PairPosition(
        position_id=str(db_row["position_id"]),
        symbol=symbol,
        base_symbol=db_row["base_symbol"],
        direction=db_row["direction"],
        status=PositionStatus(db_row["status"]),
        pair_mode=db_row.get("pair_mode", "single"),

        # Alt 腿(使用交易所实时数据)
        alt_side=db_row.get("alt_side", ""),
        alt_size=abs(float(exchange_pos.get("szi", 0))),
        alt_entry_price=float(db_row.get("alt_entry_price", 0)),

        # Base 腿(DB 数据)
        base_side=db_row.get("base_side", ""),
        base_size=float(db_row.get("base_size", 0)),
        base_entry_price=float(db_row.get("base_entry_price", 0)),

        # 🔑 关键:信号快照字段(完整恢复)
        entry_zscore_4h=float(db_row.get("entry_zscore_4h", 0) or 0),
        entry_adaptive_z=float(db_row.get("entry_adaptive_z", 0) or 0),
        entry_avg_zscore_4h=db_row.get("entry_avg_zscore_4h"),
        entry_signal_strength=db_row.get("entry_signal_strength", ""),

        # 盈亏数据(交易所实时 + DB 历史)
        unrealized_pnl=float(exchange_pos.get("unrealizedPnl", 0)),
        realized_pnl=float(db_row.get("realized_pnl", 0) or 0),
        peak_pnl_pct=float(db_row.get("peak_pnl_pct", 0) or 0),

        # 时间与关联
        open_time=db_row.get("open_time").astimezone()
                  if db_row.get("open_time") else None,
        entry_signal_id=str(db_row.get("entry_signal_id", "")),
        network=db_row.get("network", "testnet"),
    )

    return position

数据一致性校验(可选增强)

# 在 _restore_position_from_db 末尾添加
szi = float(exchange_pos.get("szi", 0))
exchange_direction = "long" if szi > 0 else "short"

if db_row["direction"] != exchange_direction:
    logger.warning(
        f"⚠️ 数据不一致: {symbol} | "
        f"DB 方向={db_row['direction']} vs "
        f"交易所方向={exchange_direction}"
    )
    # 可选择抛出异常,触发降级流程
    # raise ValueError("Direction mismatch")

方法 3:创建真孤儿仓位

def _create_orphan_position(
    self,
    coin: str,
    exchange_pos: dict
) -> PairPosition:
    """创建真孤儿仓位(entry_adaptive_z=0)

    Args:
        coin: 币种名称 "PURR"
        exchange_pos: 交易所仓位数据

    Returns:
        孤儿仓位对象(需持久化到 DB)

    特征:
        - entry_adaptive_z=0(禁用均值回归退出)
        - entry_zscore_4h=0
        - open_time=当前时间(无法追溯真实开仓时间)
        - entry_signal_id=""(无信号关联)
    """
    szi = float(exchange_pos.get("szi", 0))
    direction = "long" if szi > 0 else "short"
    size = abs(szi)
    entry_price = float(exchange_pos.get("entryPx", 0))
    symbol = coin_to_symbol(coin)

    position = PairPosition(
        symbol=symbol,
        base_symbol="",
        direction=direction,
        status=PositionStatus.OPEN,
        pair_mode="single",
        alt_side="buy" if direction == "long" else "sell",
        alt_size=size,
        alt_entry_price=entry_price,

        # 🔑 孤儿标记
        entry_zscore_4h=0.0,
        entry_adaptive_z=0.0,

        open_time=datetime.now().astimezone(),
        network=self._config.network.value,
    )

    return position

3.3 position_manager.py - 主流程改造

文件位置src/trading/position_manager.py:996-1144

修改点总览

行号 修改内容 说明
1024 后 增加 DB 批量查询 db_positions_map = self._batch_verify_orphans_from_db(...)
1033-1089 配对逻辑增强 只配对两个都是真孤儿的币种
1091-1143 分支处理 DB 恢复 vs 真孤儿收纳

详细修改

def _detect_and_adopt_orphans(
    self,
    exchange_coins: dict,
    source: str
) -> list[str]:
    """检测交易所存在但内存中没有的孤儿仓位,纳入管理

    改进:
        - 在判定孤儿前,先批量查询数据库验证
        - 区分"DB 恢复"(保留字段)和"真孤儿"(entry_adaptive_z=0)
        - 只对真孤儿发送告警通知

    Args:
        exchange_coins: {coin: exchange_pos_dict}
        source: 'recover' / 'sync'

    Returns:
        收纳的孤儿仓位 symbol 列表
    """
    memory_coins = {symbol_to_coin(s) for s in self._positions}
    adopted_symbols: list[str] = []

    # 收集所有孤儿币种
    orphan_entries = {}
    for coin, ex_pos in exchange_coins.items():
        if coin in memory_coins:
            continue
        szi = float(ex_pos.get("szi", 0))
        if szi == 0:
            continue
        orphan_entries[coin] = ex_pos

    if not orphan_entries:
        return []

    # 🆕 批量查询数据库验证(性能优化:O(1) vs O(N))
    db_positions_map = self._batch_verify_orphans_from_db(orphan_entries)

    # Fix #12: 智能配对(只配对真孤儿)
    from src.config import HYPE_ALT_SYMBOL, HYPE_BASE_SYMBOL
    alt_coin = symbol_to_coin(HYPE_ALT_SYMBOL)
    base_coin = symbol_to_coin(HYPE_BASE_SYMBOL)
    paired_coins = set()

    # 🆕 配对前验证:两个币种都是真孤儿(DB 无记录)
    if alt_coin in orphan_entries and base_coin in orphan_entries:
        alt_symbol = coin_to_symbol(alt_coin)
        base_symbol = coin_to_symbol(base_coin)
        alt_is_orphan = alt_symbol not in db_positions_map
        base_is_orphan = base_symbol not in db_positions_map

        if alt_is_orphan and base_is_orphan:
            # 执行现有配对逻辑(完全保持不变)
            alt_ex = orphan_entries[alt_coin]
            base_ex = orphan_entries[base_coin]
            alt_szi = float(alt_ex.get("szi", 0))
            base_szi = float(base_ex.get("szi", 0))

            if (alt_szi > 0) != (base_szi > 0):
                # [配对逻辑代码保持不变...]
                paired_coins.update({alt_coin, base_coin})
                adopted_symbols.append(position.symbol)

    # 🆕 剩余孤儿仓位处理(区分 DB 恢复和真孤儿)
    for coin, ex_pos in orphan_entries.items():
        if coin in paired_coins:
            continue

        symbol = coin_to_symbol(coin)

        # 🆕 检查数据库是否有记录
        db_position = db_positions_map.get(symbol)

        if db_position:
            # ✅ 分支 1:从数据库恢复完整仓位
            try:
                position = self._restore_position_from_db(db_position, ex_pos)

                # Pair 模式:同步 base 腿大小
                if position.pair_mode == "pair" and position.base_symbol:
                    base_coin_for_pair = symbol_to_coin(position.base_symbol)
                    if base_coin_for_pair in exchange_coins:
                        base_ex_pos = exchange_coins[base_coin_for_pair]
                        position.base_size = abs(float(
                            base_ex_pos.get("szi", position.base_size)
                        ))

                self._positions[symbol] = position
                adopted_symbols.append(symbol)

                # DB 恢复日志(INFO 级别,正常流程)
                logger.info(
                    f"✅ 从数据库恢复仓位 ({source}): {symbol} | "
                    f"方向={position.direction} | 大小={position.alt_size} | "
                    f"entry_adaptive_z={position.entry_adaptive_z:.4f} | "
                    f"position_id: {position.position_id}"
                )
                # 🆕 DB 恢复不发送告警(正常流程)

            except Exception as e:
                logger.error(f"从 DB 恢复仓位失败: {symbol} | {e}", exc_info=True)
                # 降级为孤儿收纳
                position = self._create_orphan_position(coin, ex_pos)
                self._positions[symbol] = position
                self._repo.save_position(position)
                adopted_symbols.append(symbol)
                # 发送告警...
        else:
            # ❌ 分支 2:真孤儿 - 创建新仓位(entry_adaptive_z=0)
            position = self._create_orphan_position(coin, ex_pos)
            self._positions[symbol] = position
            self._repo.save_position(position)
            adopted_symbols.append(symbol)

            # 真孤儿日志(WARNING 级别)
            logger.warning(
                f"🔗 真孤儿仓位收纳 ({source}): {symbol} | "
                f"方向={position.direction} | 大小={position.alt_size} | "
                f"入场价=${position.alt_entry_price:.4f} | "
                f"⚠️ 数据库无记录 | position_id: {position.position_id}"
            )

            # 🆕 真孤儿告警(区分于 DB 恢复)
            try:
                sender_colourful(
                    title=f"🚨 真孤儿仓位收纳: {symbol}",
                    content=(
                        f"**来源**: {source}\n"
                        f"**币种**: {symbol}\n"
                        f"**方向**: {position.direction}\n"
                        f"**大小**: {position.alt_size}\n"
                        f"**入场价**: ${position.alt_entry_price:.4f}\n"
                        f"---\n"
                        f"⚠️ **数据库无记录**,可能是外部手动创建的仓位\n"
                        f"**注意**: entry_adaptive_z=0,均值回归退出已禁用\n"
                        f"**建议**: 请确认此仓位来源,必要时手动平仓\n"
                        f"**网络**: {self._config.network_label}"
                    ),
                )
            except (OSError, ValueError) as e:
                logger.error(f"❌ 真孤儿仓位告警发送异常: {e}")

    return adopted_symbols

四、错误处理策略

4.1 降级策略矩阵

场景 错误类型 降级方案 影响
DB 查询失败 网络/连接异常 返回空字典,全部按孤儿处理 丢失字段,但系统可用
DB 恢复失败 数据解析异常 创建孤儿仓位 单个仓位丢失字段
数据不一致 方向/大小冲突 日志记录,继续使用交易所数据 仅日志告警

4.2 错误处理代码

DB 查询失败

try:
    return self._repo.get_positions_by_symbols(...)
except Exception as e:
    logger.error(f"批量验证孤儿仓位失败,降级为孤儿收纳: {e}", exc_info=True)
    return {}  # 降级:保证系统可用性 > 精确性

DB 恢复失败

try:
    position = self._restore_position_from_db(db_position, ex_pos)
except Exception as e:
    logger.error(f"从 DB 恢复仓位失败: {symbol} | {e}", exc_info=True)
    # 降级为孤儿收纳
    position = self._create_orphan_position(coin, ex_pos)
    # 保存 + 告警

五、性能分析

5.1 性能对比

指标 改进前 改进后 变化
DB 查询次数 0 1(批量) +1 次
DB 查询耗时 0ms <5ms +5ms
总体耗时 ~50ms ~55ms +10%
同步周期 60s 60s 无影响

5.2 性能优化手段

批量查询 vs 逐个查询

# ❌ 低效方案(O(N))
for coin in orphan_entries:
    symbol = coin_to_symbol(coin)
    db_pos = repo.get_position_by_symbol(symbol)  # N 次查询

# ✅ 高效方案(O(1))
symbols = [coin_to_symbol(c) for c in orphan_entries]
db_positions_map = repo.get_positions_by_symbols(symbols)  # 1 次查询

索引利用

-- 现有索引(已优化)
CREATE INDEX idx_pair_positions_symbol_status
  ON pair_positions (symbol, status);

-- 批量查询执行计划
EXPLAIN SELECT * FROM pair_positions
WHERE symbol = ANY(ARRAY['PURR/USDC:USDC', 'HYPE/USDC:USDC'])
  AND status IN ('open', 'opening', 'closing');

-- 预期结果:Index Scan(使用索引)

5.3 极端场景

场景:检测到 10 个孤儿币种

  • DB 批量查询耗时: <10ms
  • 总体影响: 仍可接受(<20% 开销)
  • 结论: 性能瓶颈不在 DB 查询

六、兼容性保证

6.1 向后兼容性

场景 行为 影响
已收纳的老孤儿仓位 DB 中 entry_adaptive_z=0,下次恢复时从 DB 读取,保持 entry_adaptive_z=0 ✅ 行为不变
新孤儿仓位 自动走新流程(DB 查询 → 恢复/收纳) ✅ 改进生效
正常仓位 从 DB 恢复,保留完整字段 ✅ 核心收益

6.2 配对逻辑兼容性(Fix #12)

配对判定矩阵

Alt DB Base DB 行为
❌ 无 ❌ 无 ✅ 配对为 pair 仓位
✅ 有 ❌ 无 ✅ Alt 从 DB 恢复,Base 孤儿收纳(两个独立 single 仓位)
❌ 无 ✅ 有 ✅ Base 从 DB 恢复,Alt 孤儿收纳(两个独立 single 仓位)
✅ 有 ✅ 有 ✅ 两个都从 DB 恢复(两个独立 single 仓位)

关键代码

alt_is_orphan = alt_symbol not in db_positions_map
base_is_orphan = base_symbol not in db_positions_map

if alt_is_orphan and base_is_orphan:
    # 执行配对逻辑

七、测试验证

7.1 单元测试场景

场景 1:DB 恢复正常仓位

def test_detect_orphans_db_recovery():
    """交易所有 + 内存无 + DB 有 → 恢复"""
    exchange_coins = {"PURR": {"szi": 100, "entryPx": 0.5}}
    manager._positions = {}
    mock_repo.get_positions_by_symbols.return_value = {
        "PURR/USDC:USDC": {"entry_adaptive_z": 2.5, ...}
    }

    adopted = manager._detect_and_adopt_orphans(exchange_coins, "test")

    assert manager._positions["PURR/USDC:USDC"].entry_adaptive_z == 2.5

场景 2:真孤儿收纳

def test_detect_orphans_true_orphan():
    """交易所有 + 内存无 + DB 无 → 孤儿"""
    exchange_coins = {"PURR": {"szi": 100, "entryPx": 0.5}}
    manager._positions = {}
    mock_repo.get_positions_by_symbols.return_value = {}

    adopted = manager._detect_and_adopt_orphans(exchange_coins, "test")

    assert manager._positions["PURR/USDC:USDC"].entry_adaptive_z == 0.0
    mock_repo.save_position.assert_called_once()

场景 3:配对逻辑兼容性

def test_pairing_with_db_records():
    """PURR(DB 有) + HYPE(DB 无) → 不配对,各自处理"""
    exchange_coins = {
        "PURR": {"szi": 100, "entryPx": 0.5},   # long
        "HYPE": {"szi": -200, "entryPx": 1.5},  # short
    }
    mock_repo.get_positions_by_symbols.return_value = {
        "PURR/USDC:USDC": {"entry_adaptive_z": 2.5}
    }

    adopted = manager._detect_and_adopt_orphans(exchange_coins, "test")

    # PURR 恢复,HYPE 孤儿收纳,不配对
    assert manager._positions["PURR/USDC:USDC"].pair_mode == "single"
    assert manager._positions["HYPE/USDC:USDC"].pair_mode == "single"

7.2 集成测试场景

场景:模拟重启恢复

def test_restart_recovery():
    # 1. 准备 DB 数据
    insert_test_position(
        symbol="PURR/USDC:USDC",
        entry_adaptive_z=2.5,
        status="open"
    )

    # 2. 清空内存(模拟重启)
    manager._positions.clear()

    # 3. 执行恢复
    manager.recover_positions_from_db()

    # 4. 验证:从 DB 恢复,保留完整字段
    assert manager._positions["PURR/USDC:USDC"].entry_adaptive_z == 2.5

八、实施计划

Phase 1: 核心实现(4-6 小时)

  • [ ] trade_repository.py: 添加 get_positions_by_symbols() 方法
  • [ ] position_manager.py: 添加 3 个辅助方法
  • [ ] position_manager.py: 修改 _detect_and_adopt_orphans() 主流程
  • [ ] 代码审查与优化

Phase 2: 测试验证(4-6 小时)

  • [ ] 编写单元测试(3 个核心场景)
  • [ ] 编写集成测试(重启恢复场景)
  • [ ] 手动测试验证(测试网环境)
  • [ ] 性能测试(批量查询耗时)

Phase 3: 文档与监控(可选,2-3 小时)

  • [ ] 添加性能监控指标
  • [ ] 更新代码注释和文档
  • [ ] 配置告警阈值

九、关键收益总结

9.1 核心收益

收益 说明 影响
✅ 避免误判 数据库有记录的正常仓位不再被当作孤儿 减少 false positive
✅ 保留字段 entry_adaptive_z, entry_zscore_4h 完整恢复 均值回归策略正常工作
✅ 精准告警 DB 恢复不告警,真孤儿才告警 减少告警噪音 50%+
✅ 性能可控 批量查询优化,总体性能影响 <10% 对系统影响最小
✅ 向后兼容 无需数据迁移和配置变更 零停机升级

9.2 业务价值

均值回归策略恢复

  • 现状:孤儿仓位 entry_adaptive_z=0 → 均值回归退出失效
  • 改进后:从 DB 恢复 entry_adaptive_z → 策略正常工作
  • 影响:提升退出策略准确性,减少亏损扩大风险

告警噪音减少

  • 现状:每次恢复都可能触发孤儿告警(正常流程)
  • 改进后:仅真孤儿才告警(异常流程)
  • 影响:运维效率提升,关注真正的异常情况

数据完整性

  • 现状:多个关键字段丢失(entry_signal_id, peak_pnl_pct 等)
  • 改进后:完整恢复所有信号快照字段
  • 影响:信号追溯、移动止损等功能正常工作

十、风险评估

风险 概率 影响 缓解措施
DB 查询失败 降级策略:返回空字典,按孤儿处理
性能回归 批量查询优化,监控查询耗时
数据不一致 极低 方向校验,日志记录,降级处理
并发竞态 锁保护,原子操作

结论:总体风险可控,收益远大于风险。


附录:相关代码位置

关键文件清单

文件 行号 说明
src/trading/trade_repository.py 新增 get_positions_by_symbols() 方法
src/trading/position_manager.py 996-1144 _detect_and_adopt_orphans() 主流程
src/trading/position_manager.py 新增 _batch_verify_orphans_from_db()
src/trading/position_manager.py 新增 _restore_position_from_db()
src/trading/position_manager.py 新增 _create_orphan_position()
src/trading/models.py 参考 PairPosition 字段定义
database/init_timescaledb.sql 431-440 索引定义

数据库表结构

pair_positions 表关键字段

  • position_id (UUID PRIMARY KEY)
  • symbol (TEXT)
  • status (TEXT)
  • entry_adaptive_z (DOUBLE) ← 最关键字段
  • entry_zscore_4h (DOUBLE)
  • entry_signal_id (UUID)
  • peak_pnl_pct (DOUBLE)
  • network (TEXT)

相关索引

CREATE INDEX idx_pair_positions_symbol_status
  ON pair_positions (symbol, status);

文档版本:v1.0
创建日期:2026-02-15
作者:Claude Code
状态:待实施

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG