OrderFilledEvent 接入订单追踪系统
OrderFilledEvent 接入订单追踪系统
目标
- 补上“成交事件有发布无消费”的缺口,让订单追踪系统成为 OrderFilledEvent 的消费者。
- 统一成交入口:所有 fill(user 频道
data.fills+ userFills 频道)都经 EventBus 以 OrderFilledEvent 发布,订单追踪只通过订阅事件更新状态,逻辑更清晰、易扩展。
设计要点
- 单一入口:订单追踪的“应用一笔成交”逻辑只在一个地方执行——对 OrderFilledEvent 的订阅 handler(与现有
_on_user_fill逻辑等价)。 - 避免重复:userFills 不再走
handle_message→_on_user_fill,仅通过_cache_latest_data发布 OrderFilledEvent,由 WebSocketOrderManager 订阅后处理;orderUpdates 仍走handle_message→_on_order_update(终态与 grace 逻辑不变)。 - 去重:事件中携带可唯一标识一笔 fill 的 key(如
metadata["fill_id"]),WebSocketOrderManager 继续用现有的_fill_ids做去重。 - 双源:user 频道已有
_publish_user_events发布 OrderFilledEvent;在_cache_latest_data中为 userFills 增加分支,解析后对每条 fill 发布 OrderFilledEvent(字段与现有事件定义一致,fill_id 放 metadata)。
实现步骤
1. 从 userFills 发布 OrderFilledEvent
文件:src/utils/websocket/enhanced_ws_manager.py
- 在
_cache_latest_data()中增加channel == "userFills"分支(与现有"user"并列)。 - 从
msg["data"]取 fill 列表(兼容单条为 list 包装),对每条 fill:- 构造
OrderFilledEvent(order_id=str(oid),symbol=coin,filled_qty=sz,filled_price=px,fee,closed_pnl等,与_publish_user_events中 user.fills 的映射一致)。 - 生成 fill 唯一 key:优先用
fill.get("tid"),否则用oid:px:sz:time的哈希(与websocket_order_manager的_fill_key一致),写入event.metadata["fill_id"]。 self._event_bus.publish(event)。
- 构造
这样 user 与 userFills 两路都会向 EventBus 发布 OrderFilledEvent,且带可去重标识。
2. WebSocketOrderManager 订阅 OrderFilledEvent 并应用成交
文件:src/trading/websocket_order_manager.py
- 依赖 EventBus:构造函数增加参数
event_bus(可选,便于测试)。Executor 创建 manager 时传入:WebSocketOrderManager(executor=self, event_bus=self._event_bus)(见 executor 初始化约 143 行)。 - 订阅:在
WebSocketOrderManager初始化时(或由 Executor 在创建 manager 后调用一次register_event_handlers())调用event_bus.subscribe(OrderFilledEvent, self._on_order_filled_event)。若在 manager 内订阅,需在构造末尾、且保证只订阅一次。 - Handler
_on_order_filled_event(self, event: OrderFilledEvent):oid = _safe_int_oid(event.order_id),无效则 return。fill_id = event.metadata.get("fill_id")或由(order_id, filled_qty, filled_price, timestamp)生成备用 key(与发布端一致)。- 取
fill_px = event.filled_price、fill_sz = event.filled_qty,若 ≤0 则 return。 - 在
_lock内:按 oid 查_tracking;若不存在或非 PENDING 则 return;若fill_id in tracking._fill_ids则 return(去重);否则_fill_ids.add(fill_id),_accumulate_fill(tracking, fill_px, fill_sz),has_fill_price=True,fill_count+=1;若tracking._ws_status is not None则标记需要 resolve。 - 锁外如需要则调用
_resolve(oid, tracking)。
- 逻辑与现有
_on_user_fill一致,仅输入从 raw message 改为 Event,保证行为不变、订单追踪统一由事件驱动。
3. 不再把 userFills 交给 handle_message
文件:src/services/realtime_kline_service_base.py
- 在
on_message中,当前将orderUpdates与userFills都路由到WebSocketOrderManager.handle_message(约 639–661 行)。改为:仅channel == "orderUpdates"时调用mgr.handle_message(msg);channel == "userFills"时不再调用handle_message(不再把 userFills 原始消息交给订单管理器)。 - userFills 仍会经
_wrapped_callback→_cache_latest_data发布 OrderFilledEvent,订单管理器通过订阅接收,避免同一笔 fill 既走 handle_message 又走事件导致重复累计。 - 缓冲区回放逻辑中,若当前仍对缓冲消息按 channel 路由到
handle_message,则同样只对orderUpdates回放,不对userFills回放(或若希望重连后补发历史 fill,可考虑由 EventBus 的发布方在重连后通过 API 补拉并发布事件,而不是回放原始 userFills;本方案可先采用“userFills 不回放”,与“不把 userFills 交给 handle_message”一致)。
4. Executor 传入 EventBus
文件:src/trading/executor.py
- 创建 WebSocketOrderManager 时传入 event_bus:
self._ws_order_manager = WebSocketOrderManager(executor=self, event_bus=self._event_bus)(约 143 行)。若 manager 构造函数签名增加event_bus,此处同步修改。
5. OrderFilledEvent 的 fill 去重标识(可选但推荐)
文件:src/events/trading_events.py
- 在 OrderFilledEvent 的 docstring 中说明:发布方应将唯一 fill 标识放入
metadata["fill_id"],供订阅方去重。事件类本身已有metadata(来自 Event 基类),无需新增字段。 - 发布端(enhanced_ws_manager):在 user 频道的
_publish_user_events里,对每条 fill 也写入metadata["fill_id"](与 userFills 相同规则:tid 或 oid:px:sz:time 哈希),保证 user 与 userFills 两源去重一致。
6. 双 WebSocket / source 约定(若已存在)
若项目已区分行情 WS 与交易 WS(如《双WebSocket架构设计》),则 OrderFilledEvent 的订阅方(含 WebSocketOrderManager)应只处理 source == "trading"(或当前等价来源)。在 _on_order_filled_event 开头可加:若 event.source != getattr(self._executor, '_expected_ws_source', 'websocket') 则 return(具体常量名与架构文档一致即可)。当前若仅单 WS,可先不过滤或与现有 Executor 事件过滤保持一致。
测试与回归
- 单 WS:下单后确认 userFills 到达时订单能正确 FILLED、加权价与手数正确,且无重复累计。
- 若有 user 频道下发的 fills:确认与 userFills 同时存在时不会重复应用(fill_id 去重)。
- orderUpdates 的 filled/canceled/rejected 与 grace 逻辑不变,超时与 HTTP 兜底行为不变。
小结
| 项目 | 说明 |
|---|---|
| 事件来源 | user 频道 data.fills(已有)+ userFills 频道(新增发布) |
| 消费者 | WebSocketOrderManager 订阅 OrderFilledEvent,应用 fill 并去重、累计、解析 |
| 行为 | 与现有一致;入口统一为事件,便于后续加 TradeLogger、RiskManager 等订阅者 |
这样 OrderFilledEvent 不再“有发布无消费”,订单追踪统一由事件驱动,更智能且易扩展。