OrderFilledEvent 接入订单追踪系统2
OrderFilledEvent 接入订单追踪系统
现状诊断
当前 fill 处理路径
user 频道 data.fills → _publish_user_events() → OrderFilledEvent → 🕳️ 无消费者(数据丢失)
userFills 频道 → handle_message() → _on_user_fill() → ✅ 订单追踪正常
两条路径并存,结果截然不同:
- user 频道的 fills:
_publish_user_events已发布OrderFilledEvent(优先级 CRITICAL),但 EventBus 上零订阅者,成交数据完全丢失,不进入订单追踪。 - userFills 频道:通过
realtime_kline_service_base.on_message路由至WebSocketOrderManager.handle_message→_on_user_fill,订单追踪正常工作。
根本问题
OrderFilledEvent 是"已搭好发射台但从未点火"的组件——有定义、有发布、有 CRITICAL 优先级,但没有任何消费者。这不是事件多余,而是订阅方缺位。
EventBus 特性确认
经代码核查,EventBus.publish 为同步阻塞调用,直接遍历 handler 列表执行,不涉及队列或线程池。因此:
- 消息顺序与 WebSocket 到达顺序完全一致,无时序风险
publish(event)在行为上等价于直接方法调用,无额外开销- 使用 EventBus 的目的是解耦(基础设施层不直接持有业务层引用),而非异步
设计目标
- 修复 user 频道 fills 丢失:让
WebSocketOrderManager成为OrderFilledEvent的消费者。 - 统一成交入口:两路 fill 来源(user 频道 + userFills 频道)均通过
OrderFilledEvent进入订单追踪,_fill_ids去重保证不重复累计。 - 保持行为不变:订单状态机、加权均价计算、grace 超时、HTTP 兜底逻辑全部不动。
- 架构方向正确:基础设施层(enhanced_ws_manager)只发事件,业务层(WebSocketOrderManager)自行订阅,避免跨层耦合;后续 TradeLogger、RiskManager 等新消费者只需
subscribe,无需改动现有代码。
架构变化
变更前
realtime_kline_service_base.on_message
├── channel == "orderUpdates" → mgr.handle_message() → _on_order_update()
└── channel == "userFills" → mgr.handle_message() → _on_user_fill()
enhanced_ws_manager._publish_user_events (user 频道 fills)
└── publish(OrderFilledEvent) → 【无消费者,数据丢失】
变更后
realtime_kline_service_base.on_message
├── channel == "orderUpdates" → mgr.handle_message() → _on_order_update() ← 不变
└── channel == "userFills" → 【不再路由到 handle_message】
enhanced_ws_manager(两路来源,均发布带 fill_id 的事件)
├── _publish_user_events (user 频道 fills) → publish(OrderFilledEvent + fill_id) ─┐
└── _cache_latest_data (userFills 频道) → publish(OrderFilledEvent + fill_id) ─┤
↓
EventBus(同步)
↓
WebSocketOrderManager._on_order_filled_event()
└── _fill_ids 去重 → _accumulate_fill()
实现步骤
执行顺序:步骤一(修复发布端)→ 步骤二(接入消费端)→ 步骤三(移除旧路由)。
步骤一和步骤二完成、验证通过后,再执行步骤三;避免在新路径未验证前就切断旧路径。
步骤一:修复两路发布端,统一写入 fill_id
文件:src/utils/websocket/enhanced_ws_manager.py
两处修改保持 fill_id 生成规则完全一致:优先取 fill["tid"],不存在时取 md5(f"{oid}:{px}:{sz}:{time}") ——与 websocket_order_manager._fill_key() 函数的逻辑相同,直接复用或提取为共享函数。
修改点 A:_publish_user_events(user 频道,约第 1386 行)
在已有的 OrderFilledEvent 构造中补充 metadata:
fill_id = fill.get("tid") or _fill_key(fill) # 与 _fill_key() 规则一致
event = OrderFilledEvent(
timestamp=datetime.now(),
source="websocket",
order_id=str(fill.get("oid", "")),
client_order_id=fill.get("cloid"),
symbol=fill.get("coin", ""),
side=fill.get("side", ""),
filled_qty=float(fill.get("sz", 0)),
filled_price=float(fill.get("px", 0)),
fee=float(fill.get("fee", 0)),
closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
metadata={"fill_id": fill_id}, # ← 新增
)
self._event_bus.publish(event)
修改点 B:_cache_latest_data(userFills 频道,新增分支)
在现有 channel == "user" 分支旁新增:
elif channel == "userFills":
raw = msg.get("data", {})
fills = raw if isinstance(raw, list) else [raw]
for fill in fills:
oid = fill.get("oid")
if not oid:
continue
fill_id = fill.get("tid") or _fill_key(fill) # 与修改点 A 规则一致
event = OrderFilledEvent(
timestamp=datetime.now(),
source="websocket",
order_id=str(oid),
client_order_id=fill.get("cloid"),
symbol=fill.get("coin", ""),
side=fill.get("side", ""),
filled_qty=float(fill.get("sz", 0)),
filled_price=float(fill.get("px", 0)),
fee=float(fill.get("fee", 0)),
closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
metadata={"fill_id": fill_id},
)
self._event_bus.publish(event)
步骤二:WebSocketOrderManager 订阅 OrderFilledEvent
文件 A:src/trading/executor.py(约第 143 行)
# 变更前
self._ws_order_manager = WebSocketOrderManager(executor=self)
# 变更后
self._ws_order_manager = WebSocketOrderManager(executor=self, event_bus=self._event_bus)
文件 B:src/trading/websocket_order_manager.py
构造函数:增加 event_bus 参数,在末尾完成订阅:
def __init__(self, executor, event_bus=None):
self._executor = executor
self._tracking: dict[int, OrderTracking] = {}
self._lock = threading.Lock()
self._http_busy: set[int] = set()
self._event_bus = event_bus
if event_bus is not None:
event_bus.subscribe(OrderFilledEvent, self._on_order_filled_event)
event_bus=None 仅供单元测试使用;生产路径由 Executor 强制传入,不允许缺省运行。
新增 handler:_on_order_filled_event
def _on_order_filled_event(self, event: OrderFilledEvent) -> None:
oid = _safe_int_oid(event.order_id)
if oid is None:
return
fill_px = event.filled_price
fill_sz = event.filled_qty
if fill_px <= 0 or fill_sz <= 0:
return
# fill_id:优先取发布端写入的值,不存在时回退到与 _fill_key 等价的备用规则
fill_id: str = event.metadata.get("fill_id") or (
f"{oid}:{fill_px}:{fill_sz}:{event.timestamp.timestamp()}"
)
should_resolve = False
resolve_tracking = None
with self._lock:
tracking = self._tracking.get(oid)
if tracking is None or tracking.status != OrderStatus.PENDING:
return
if fill_id in tracking._fill_ids:
logger.debug("跳过重复 fill: oid=%s fill_id=%s", oid, fill_id)
return
tracking._fill_ids.add(fill_id)
self._accumulate_fill(tracking, fill_px, fill_sz)
tracking.has_fill_price = True
tracking.fill_count += 1
if tracking._ws_status is not None:
should_resolve = True
resolve_tracking = tracking
if should_resolve:
self._resolve(oid, resolve_tracking)
逻辑与现有 _on_user_fill 完全等价,仅将输入从原始 dict 换为 Event 对象。
取消订阅:在 manager 的 close() 或 reset() 方法中调用:
if self._event_bus is not None:
self._event_bus.unsubscribe(OrderFilledEvent, self._on_order_filled_event)
防止 manager 重建时 handler 残留或多次订阅。
步骤三:移除 userFills 旧路由
前提:步骤一、二已上线并验证订单追踪正常后,再执行此步骤。
文件:src/services/realtime_kline_service_base.py(约第 639 行)
# 变更前
if channel in ("orderUpdates", "userFills"):
mgr.handle_message(msg)
# 变更后
if channel == "orderUpdates": # userFills 已由 EventBus 接管,不再直接路由
mgr.handle_message(msg)
缓冲区回放(重连恢复逻辑):同步修改,缓冲消息回放时同样只对 orderUpdates 调用 handle_message,不对 userFills 回放。
说明:移除旧路由后,重连期间的历史 userFills 消息不会被补偿回放。这是有意的取舍:
- 断线期间的成交补偿应通过 REST API 查询订单状态,而非回放 WebSocket 原始消息(现有 HTTP 兜底逻辑不变)。
- 若未来需要精确补偿,应在
WebSocketReconnectedEvent的 handler 中主动拉取 fills,而不是回放原始 userFills。
fill_id 生成规则(发布端与消费端必须一致)
| 优先级 | 规则 | 说明 |
|---|---|---|
| 1 | tid:{fill["tid"]} |
交易所唯一成交 ID(Hyperliquid WsFill.tid),与现有 _fill_key 格式一致 |
| 2 | hash:{md5(oid:px:sz:time)} |
tid 不存在时的确定性备用 key,与现有 _fill_key 一致 |
重要:存储的 fill_id 必须与 websocket_order_manager._fill_key() 的返回值一致(带 tid: 或 hash: 前缀),否则 user 与 userFills 两路对同一笔成交会产生不同 key,去重失效。发布端应统一调用 _fill_key(fill) 或共享的 _make_fill_id(fill),不要使用裸的 fill.get("tid")。
发布端(两处:_publish_user_events 和 _cache_latest_data)与消费端(_on_order_filled_event 的备用 key)的生成规则必须完全一致。建议将该逻辑提取为模块级私有函数 _make_fill_id(fill: dict) -> str,两侧共享调用。
联网文档与实测:响应体结构与 key 统一
官方数据结构(Hyperliquid)
- 文档:WebSocket Subscriptions 中定义:
userFills→WsUserFills { user, fills: WsFill[] }userEvents(或部分实现中的user)→WsUserEvent可包含{ "fills": WsFill[] }
- WsFill 字段(与去重相关):
oid,tid(唯一成交 ID),px,sz,time,coin,side,fee,closedPnl, 等。两路 fill 来源均使用同一 WsFill 结构,故可用同一套 fill_id 规则。
实测与 key 协调
-
在测试网观察真实响应:运行脚本捕获 WebSocket 消息与 fill 结构:
TRADING_NETWORK=testnet python scripts/capture_fill_responses_testnet.py脚本会订阅
orderUpdates/userFills/user,在测试网下一笔小限价单(或手动下单),将收到的消息写入scripts/out/fill_responses_<ts>.json,并在控制台打印各 channel 下 fill 的字段集合与样本,便于确认tid、time等在两路中的存在与格式。 -
key/id 统一约定:
- 唯一标识:以 fill_id 作为去重键,与现有
OrderTracking._fill_ids一致。 - 生成方式:与
_fill_key(fill)保持一致:有tid时用tid:{tid},无tid时用hash:{md5(oid:px:sz:time)};发布端与消费端共用同一函数或同一规则,保证 user 频道与 userFills 频道对同一笔成交得到相同 fill_id,从而只累计一次。
- 唯一标识:以 fill_id 作为去重键,与现有
测试矩阵
| 场景 | 验证点 |
|---|---|
| 仅 userFills 到达(常规路径) | 订单正确进入 FILLED,加权均价和手数正确 |
| 仅 user 频道 fills 到达 | 订单正确进入 FILLED,与上一行行为一致 |
| user 频道与 userFills 同时到达同一笔 fill | _fill_ids 去重,仅累计一次,加权均价不偏移 |
| userFills 先到,orderUpdates 后到 | fills 累计后,orderUpdates 到达时触发 _resolve |
orderUpdates 先到(_ws_status 已设),userFills 后到 |
fill 到达时立即触发 _resolve |
| fill_id 缺失(metadata 为空) | 备用 key 生成,handler 不崩溃,去重仍有效 |
| manager 重建后重新注入 event_bus | 旧 handler 已 unsubscribe,新 handler 仅订阅一次 |
| orderUpdates 的 canceled/rejected | 行为不变,grace 超时与 HTTP 兜底逻辑不受影响 |
| 重连后缓冲 orderUpdates 回放 | 仅 orderUpdates 回放,userFills 不回放,行为符合预期 |
变更文件汇总
| 文件 | 变更内容 |
|---|---|
src/utils/websocket/enhanced_ws_manager.py |
_publish_user_events 补 fill_id;_cache_latest_data 新增 userFills 分支发布事件 |
src/trading/websocket_order_manager.py |
构造函数增加 event_bus 参数;新增 _on_order_filled_event handler;close/reset 时 unsubscribe |
src/trading/executor.py |
创建 WebSocketOrderManager 时传入 event_bus |
src/services/realtime_kline_service_base.py |
on_message 和缓冲回放中,userFills 不再路由到 handle_message |