量化交易系统数据流和交易流程

数据流与交易流程详解

本文档补充 README.md,详细描述系统运行时的数据流转和交易执行流程。


一、数据流总览

Hyperliquid WS API                    Hyperliquid REST API
      │                                       │
      │ K线/L2/订单更新                        │ 历史K线补回
      ▼                                       ▼
EnhancedWebSocketManager ─────────── KlineDataFiller
      │                                       │
      │ on_message回调                         │ 补回数据
      ▼                                       ▼
RealtimeKlineServiceBase                DataHealingOrchestrator
      │                                       │
      ├─→ 消息去重 (MessageDeduplicator)       │
      ├─→ 解析K线数据                           │
      ├─→ 入批量写入队列 ──→ TimescaleDB        │
      ├─→ 入分析队列                            │
      │                                        │
      ▼                                        ▼
  分析工作线程 (N个)                         zscore_4h 历史值
      │
      ├─→ analyze_multi_period()  ← 从 DB 读取历史K线
      │     ├─→ 相关性分析
      │     ├─→ 双窗口协整检验
      │     ├─→ 多周期 Z-Score 计算
      │     └─→ 信号方向 + 强度判定
      │
      ├─→ 写入 analysis_results 表
      │
      └─→ TradingOrchestrator.process_analysis()
                │
                ▼
          交易执行流程(见下文)

二、交易执行流程

2.1 入场流程

process_analysis(symbol, z4h, multi_period_result, ...)
      │
      ▼
Strategy.process_tick(symbol, z4h, timestamp, ...)
      │
      ├─ 更新 EMA + Welford ──→ 计算 adaptive_z
      │
      ├─ 入场判断:
      │   ├ adaptive_z 从低突破到高? (突破检测)
      │   ├ 有冷却期? → 跳过
      │   └ 已有持仓? → 跳过
      │
      ├─ 返回 EntrySignal(direction, z4h, adaptive_z)
      │
      ▼
Orchestrator.on_entry_signal(symbol, multi_period_result, direction, ...)
      │
      ├─→ 构建 PairTradeSignal
      │
      ▼
PositionManager.open_position(signal, adaptive_z)
      │
      ├─→ 防重复检查 (TOCTOU)
      ├─→ 获取 alt/base 最新价格
      ├─→ RiskManager.pre_trade_check()  ← 9项风控审查
      ├─→ RiskManager.calculate_position_size()
      │     └─ 基础仓位 × 信号强度缩放 × 余额限制 → (alt_size, base_size)
      │
      ├─→ Executor.open_pair_position(signal)
      │     ├─ [限价模式]
      │     │   ├ Leg A: _place_limit_order(alt_coin) → WS等待成交
      │     │   ├ Leg B 预检查: 余额充足? L2流动性?
      │     │   └ Leg B: _place_limit_order(base_coin) → WS等待成交
      │     ├─ [市价模式]
      │     │   ├ Leg A: market_open(alt_coin, slippage)
      │     │   └ Leg B: market_open(base_coin, slippage)
      │     └ 返回 PairOrderResult
      │
      ├─→ 创建 PairPosition → 写入 DB
      ├─→ TradeRepository.save_signal(action_taken='opened')
      ├─→ Strategy.on_position_opened()
      └─→ 飞书通知

2.2 退场流程

process_analysis() → Strategy.process_tick()
      │
      ├─ 退场判断:
      │   ├ adaptive_z 回归到入场值 × reversion_factor?
      │   ├ exit_pending (上次平仓失败)?
      │   └ 方向验证
      │
      ├─ 返回 ExitSignal(reason, z4h, adaptive_z, direction)
      │
      ▼
Orchestrator.on_exit_signal(reversion_info)
      │
      ▼
PositionManager.close_position(symbol, reason='signal')
      │
      ├─→ 查找活跃仓位
      ├─→ 状态锁定 (CLOSING)
      │
      ├─→ Executor.close_pair_position(position)
      │     ├ Leg A: 反向下单 (reduce_only=True)
      │     └ Leg B: 反向下单 (reduce_only=True)
      │     └ 失败腿 → 采纳残留仓位管理
      │
      ├─→ 计算已实现盈亏
      ├─→ TradeRepository.update_position_status(CLOSED)
      ├─→ RiskManager.update_daily_pnl()
      ├─→ Strategy.on_position_closed()
      └─→ 飞书通知(含PnL/持仓时长/订单详情)

2.3 止损流程

止损监控线程 (_stop_loss_monitor) — 周期: 30秒
      │
      ▼
  遍历所有活跃仓位
      │
      ├─→ PositionManager.update_position_prices()
      │
      ├─→ RiskManager.update_trailing_peak(position)
      │
      ├─→ 检查固定止损: check_stop_loss(position)
      │     └ PnL% < -stop_loss_pct? → 平仓
      │
      ├─→ 检查移动止损: check_trailing_stop(position)
      │     └ 盈利达到激活阈值 + 从峰值回撤超过回调阈值? → 平仓
      │
      ├─→ 检查持仓超时: check_max_hold_duration(position)
      │     └ 持仓时长 > max_hold_hours? → 平仓
      │
      └─→ 触发平仓: _close_with_retry(pos, reason)
            └ 最多重试 N 次(指数退避)

三、仓位同步流程

仓位同步线程 (_position_sync) — 周期: 60秒
      │
      ▼
PositionManager.sync_with_exchange()
      │
      ├─→ Executor 查询交易所实际仓位
      │
      ├─→ 对比内存仓位 vs 交易所仓位
      │
      ├─ 情况1: 交易所仓位消失(幽灵仓位)
      │   └ 内存清理 → DB 标记关闭
      │
      ├─ 情况2: base 腿消失(Leg B 异常)
      │   └ 尝试平仓 alt 腿 → 失败则告警
      │
      └─ 情况3: 交易所有孤儿仓位
          └ _detect_and_adopt_orphans()
              ├ alt+base 方向相反? → 合并为 pair 仓位
              └ 单独存在? → 纳入管理(受止损保护)

四、数据自愈流程

TradingOrchestrator.start()
      │
      ▼
DataHealingOrchestrator.heal_and_prepare(required_count=358)
      │
      ├─ Phase 1: _load_zscore_history()
      │   └ SQL查询 analysis_results 表
      │
      ├─ Phase 2: _diagnose(records, required_count)
      │   ├ ContinuityChecker → gap_targets (缺口)
      │   ├ _check_freshness() → stale_targets (过期)
      │   └ 数量检查 → shortfall_targets (不足)
      │
      ├─ Phase 3: 修复循环 (最多 MAX_ITERATIONS 次)
      │   ├ _merge_repair_targets(diagnosis) → 合并去重排序
      │   ├ RepairExecutor.repair(targets) → REST API 回填
      │   ├ 重新加载 + 重新诊断
      │   └ 健康? → 退出循环
      │
      └─ Phase 4: _final_assessment()
          ├ QualityAssessor.assess(records) → QualityReport
          └ 返回 HealingResult(status, data, quality, ...)
                ├ status='ready' → 正常启动
                ├ status='degraded' → 降级启动
                └ status='critical' → 使用降级回退方案

五、WebSocket 连接生命周期

EnhancedWebSocketManager
      │
      ├─ 初始化: subscriptions + callbacks
      │
      ├─ 连接: _connect()
      │   ├ websocket.WebSocketApp(url, ...)
      │   ├ 发送订阅列表(批量,含延迟防限流)
      │   └ 状态: DISCONNECTED → CONNECTING → CONNECTED
      │
      ├─ 运行时:
      │   ├ on_message() → 解析 → 回调分发
      │   │   ├ candle → K线处理
      │   │   ├ l2Book → L2缓存更新 + OrderBookUpdatedEvent
      │   │   ├ orderUpdates → WebSocketOrderManager
      │   │   └ userFills → 成交信息补充
      │   │
      │   ├ HealthMonitor.on_message() → 更新心跳时间
      │   │   └ 定期检查: is_alive() → 超时则触发重连
      │   │
      │   └ Ping 线程: 每 N 秒发送心跳防止会话过期
      │
      ├─ 断开: on_close()
      │   ├ 状态: CONNECTED → RECONNECTING
      │   ├ ReconnectionManager.get_delay() → 指数退避
      │   └ 重新连接 → 重新订阅
      │
      └─ 关闭: stop()
          └ 状态: → DISCONNECTED

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG