量化交易系统数据流和交易流程
数据流与交易流程详解
本文档补充
README.md,详细描述系统运行时的数据流转和交易执行流程。
一、数据流总览
Hyperliquid WS API Hyperliquid REST API
│ │
│ K线/L2/订单更新 │ 历史K线补回
▼ ▼
EnhancedWebSocketManager ─────────── KlineDataFiller
│ │
│ on_message回调 │ 补回数据
▼ ▼
RealtimeKlineServiceBase DataHealingOrchestrator
│ │
├─→ 消息去重 (MessageDeduplicator) │
├─→ 解析K线数据 │
├─→ 入批量写入队列 ──→ TimescaleDB │
├─→ 入分析队列 │
│ │
▼ ▼
分析工作线程 (N个) zscore_4h 历史值
│
├─→ analyze_multi_period() ← 从 DB 读取历史K线
│ ├─→ 相关性分析
│ ├─→ 双窗口协整检验
│ ├─→ 多周期 Z-Score 计算
│ └─→ 信号方向 + 强度判定
│
├─→ 写入 analysis_results 表
│
└─→ TradingOrchestrator.process_analysis()
│
▼
交易执行流程(见下文)
二、交易执行流程
2.1 入场流程
process_analysis(symbol, z4h, multi_period_result, ...)
│
▼
Strategy.process_tick(symbol, z4h, timestamp, ...)
│
├─ 更新 EMA + Welford ──→ 计算 adaptive_z
│
├─ 入场判断:
│ ├ adaptive_z 从低突破到高? (突破检测)
│ ├ 有冷却期? → 跳过
│ └ 已有持仓? → 跳过
│
├─ 返回 EntrySignal(direction, z4h, adaptive_z)
│
▼
Orchestrator.on_entry_signal(symbol, multi_period_result, direction, ...)
│
├─→ 构建 PairTradeSignal
│
▼
PositionManager.open_position(signal, adaptive_z)
│
├─→ 防重复检查 (TOCTOU)
├─→ 获取 alt/base 最新价格
├─→ RiskManager.pre_trade_check() ← 9项风控审查
├─→ RiskManager.calculate_position_size()
│ └─ 基础仓位 × 信号强度缩放 × 余额限制 → (alt_size, base_size)
│
├─→ Executor.open_pair_position(signal)
│ ├─ [限价模式]
│ │ ├ Leg A: _place_limit_order(alt_coin) → WS等待成交
│ │ ├ Leg B 预检查: 余额充足? L2流动性?
│ │ └ Leg B: _place_limit_order(base_coin) → WS等待成交
│ ├─ [市价模式]
│ │ ├ Leg A: market_open(alt_coin, slippage)
│ │ └ Leg B: market_open(base_coin, slippage)
│ └ 返回 PairOrderResult
│
├─→ 创建 PairPosition → 写入 DB
├─→ TradeRepository.save_signal(action_taken='opened')
├─→ Strategy.on_position_opened()
└─→ 飞书通知
2.2 退场流程
process_analysis() → Strategy.process_tick()
│
├─ 退场判断:
│ ├ adaptive_z 回归到入场值 × reversion_factor?
│ ├ exit_pending (上次平仓失败)?
│ └ 方向验证
│
├─ 返回 ExitSignal(reason, z4h, adaptive_z, direction)
│
▼
Orchestrator.on_exit_signal(reversion_info)
│
▼
PositionManager.close_position(symbol, reason='signal')
│
├─→ 查找活跃仓位
├─→ 状态锁定 (CLOSING)
│
├─→ Executor.close_pair_position(position)
│ ├ Leg A: 反向下单 (reduce_only=True)
│ └ Leg B: 反向下单 (reduce_only=True)
│ └ 失败腿 → 采纳残留仓位管理
│
├─→ 计算已实现盈亏
├─→ TradeRepository.update_position_status(CLOSED)
├─→ RiskManager.update_daily_pnl()
├─→ Strategy.on_position_closed()
└─→ 飞书通知(含PnL/持仓时长/订单详情)
2.3 止损流程
止损监控线程 (_stop_loss_monitor) — 周期: 30秒
│
▼
遍历所有活跃仓位
│
├─→ PositionManager.update_position_prices()
│
├─→ RiskManager.update_trailing_peak(position)
│
├─→ 检查固定止损: check_stop_loss(position)
│ └ PnL% < -stop_loss_pct? → 平仓
│
├─→ 检查移动止损: check_trailing_stop(position)
│ └ 盈利达到激活阈值 + 从峰值回撤超过回调阈值? → 平仓
│
├─→ 检查持仓超时: check_max_hold_duration(position)
│ └ 持仓时长 > max_hold_hours? → 平仓
│
└─→ 触发平仓: _close_with_retry(pos, reason)
└ 最多重试 N 次(指数退避)
三、仓位同步流程
仓位同步线程 (_position_sync) — 周期: 60秒
│
▼
PositionManager.sync_with_exchange()
│
├─→ Executor 查询交易所实际仓位
│
├─→ 对比内存仓位 vs 交易所仓位
│
├─ 情况1: 交易所仓位消失(幽灵仓位)
│ └ 内存清理 → DB 标记关闭
│
├─ 情况2: base 腿消失(Leg B 异常)
│ └ 尝试平仓 alt 腿 → 失败则告警
│
└─ 情况3: 交易所有孤儿仓位
└ _detect_and_adopt_orphans()
├ alt+base 方向相反? → 合并为 pair 仓位
└ 单独存在? → 纳入管理(受止损保护)
四、数据自愈流程
TradingOrchestrator.start()
│
▼
DataHealingOrchestrator.heal_and_prepare(required_count=358)
│
├─ Phase 1: _load_zscore_history()
│ └ SQL查询 analysis_results 表
│
├─ Phase 2: _diagnose(records, required_count)
│ ├ ContinuityChecker → gap_targets (缺口)
│ ├ _check_freshness() → stale_targets (过期)
│ └ 数量检查 → shortfall_targets (不足)
│
├─ Phase 3: 修复循环 (最多 MAX_ITERATIONS 次)
│ ├ _merge_repair_targets(diagnosis) → 合并去重排序
│ ├ RepairExecutor.repair(targets) → REST API 回填
│ ├ 重新加载 + 重新诊断
│ └ 健康? → 退出循环
│
└─ Phase 4: _final_assessment()
├ QualityAssessor.assess(records) → QualityReport
└ 返回 HealingResult(status, data, quality, ...)
├ status='ready' → 正常启动
├ status='degraded' → 降级启动
└ status='critical' → 使用降级回退方案
五、WebSocket 连接生命周期
EnhancedWebSocketManager
│
├─ 初始化: subscriptions + callbacks
│
├─ 连接: _connect()
│ ├ websocket.WebSocketApp(url, ...)
│ ├ 发送订阅列表(批量,含延迟防限流)
│ └ 状态: DISCONNECTED → CONNECTING → CONNECTED
│
├─ 运行时:
│ ├ on_message() → 解析 → 回调分发
│ │ ├ candle → K线处理
│ │ ├ l2Book → L2缓存更新 + OrderBookUpdatedEvent
│ │ ├ orderUpdates → WebSocketOrderManager
│ │ └ userFills → 成交信息补充
│ │
│ ├ HealthMonitor.on_message() → 更新心跳时间
│ │ └ 定期检查: is_alive() → 超时则触发重连
│ │
│ └ Ping 线程: 每 N 秒发送心跳防止会话过期
│
├─ 断开: on_close()
│ ├ 状态: CONNECTED → RECONNECTING
│ ├ ReconnectionManager.get_delay() → 指数退避
│ └ 重新连接 → 重新订阅
│
└─ 关闭: stop()
└ 状态: → DISCONNECTED