OrderFilledEvent 接入订单追踪系统6
OrderFilledEvent 接入订单追踪系统
现状诊断
当前 fill 处理路径
user 频道 data.fills → _publish_user_events() → OrderFilledEvent → 🕳️ 无消费者(数据丢失)
userFills 频道 → handle_message() → _on_user_fill() → 🕳️ 解析路径错误(数据丢失)
两条路径均静默失效:
- user 频道的 fills:
_publish_user_events已发布OrderFilledEvent(优先级 CRITICAL),但 EventBus 上零订阅者,成交数据完全丢失,不进入订单追踪。 - userFills 频道:通过
realtime_kline_service_base.on_message路由至WebSocketOrderManager.handle_message→_on_user_fill,但解析路径错误,所有 fills 被静默跳过(见下方说明)。
根本问题
问题一:OrderFilledEvent 是"已搭好发射台但从未点火"的组件——有定义、有发布、有 CRITICAL 优先级,但没有任何消费者。这不是事件多余,而是订阅方缺位。
问题二(测试网实测发现):_on_user_fill 的解析路径与 Hyperliquid 实际推送结构不符。
Hyperliquid userFills 的实际 data 结构(两次测试网捕获一致):
{
"channel": "userFills",
"data": {
"isSnapshot": true,
"user": "0x...",
"fills": [ { "oid": ..., "tid": ..., "px": "...", ... } ]
}
}
而现有 _on_user_fill 代码:
raw = message.get("data") # → {"isSnapshot": true, "user": "...", "fills": [...]}(dict)
fills = raw if isinstance(raw, list) else [raw] # → [raw],把整个 dict 当作一个 fill
for fill_data in fills:
oid = _safe_int_oid(fill_data.get("oid")) # → None(dict 无 "oid" key)
if oid is None:
continue # → 所有 fills 均被跳过
data 是嵌套 dict(非列表),代码错误地将整个 data 对象包装为单个 fill,导致 oid 始终为 None,所有 fills 静默跳过,订单追踪从未真正工作。
EventBus 特性确认
经代码核查,EventBus.publish 为同步阻塞调用,直接遍历 handler 列表执行,不涉及队列或线程池。因此:
- 消息顺序与 WebSocket 到达顺序完全一致,无时序风险
publish(event)在行为上等价于直接方法调用,无额外开销- 使用 EventBus 的目的是解耦(基础设施层不直接持有业务层引用),而非异步
设计目标
- 修复 user 频道 fills 丢失:让
WebSocketOrderManager成为OrderFilledEvent的消费者。 - 统一成交入口:两路 fill 来源(user 频道 + userFills 频道)均通过
OrderFilledEvent进入订单追踪,_fill_ids去重保证不重复累计。 - 保持行为不变:订单状态机、加权均价计算、grace 超时、HTTP 兜底逻辑全部不动。
- 架构方向正确:基础设施层(enhanced_ws_manager)只发事件,业务层(WebSocketOrderManager)自行订阅,避免跨层耦合;后续 TradeLogger、RiskManager 等新消费者只需
subscribe,无需改动现有代码。
架构变化
变更前
realtime_kline_service_base.on_message
├── channel == "orderUpdates" → mgr.handle_message() → _on_order_update()
└── channel == "userFills" → mgr.handle_message() → _on_user_fill()
enhanced_ws_manager._publish_user_events (user 频道 fills)
└── publish(OrderFilledEvent) → 【无消费者,数据丢失】
变更后
realtime_kline_service_base.on_message
├── channel == "orderUpdates" → mgr.handle_message() → _on_order_update() ← 不变
└── channel == "userFills" → 【不再路由到 handle_message】
enhanced_ws_manager(两路来源,均通过 _make_fill_id 发布带前缀 fill_id 的事件)
├── _publish_user_events (user 频道 fills) → publish(OrderFilledEvent + fill_id) ─┐
└── _cache_latest_data (userFills 频道) → publish(OrderFilledEvent + fill_id) ─┤
↓
EventBus(同步)
↓
WebSocketOrderManager._on_order_filled_event()
└── _fill_ids 去重 → _accumulate_fill()
实现步骤
执行顺序:前提(提取共享函数)→ 步骤一(修复发布端)→ 步骤二(接入消费端)→ 步骤三(移除旧路由)。
步骤一和步骤二完成、验证通过(见步骤三前提标准)后,再执行步骤三;避免在新路径未验证前就切断旧路径。
前提:提取共享函数 _make_fill_id
这是本次变更的基础。发布端两处与消费端必须调用同一函数,否则同一笔成交在两路产生不同 key,去重失效。
在 src/utils/websocket/enhanced_ws_manager.py 顶部(模块级)新增:
import hashlib
def _make_fill_id(fill: dict) -> str:
"""
生成 fill 的唯一去重 key,规则与 websocket_order_manager._fill_key() 完全一致。
- 有 tid:返回 "tid:{tid}"
- 无 tid:返回 "hash:{md5(oid:px:sz:time)}"
注意:px/sz 直接使用原始字符串,不做类型转换,与 _fill_key 保持一致。
"""
tid = fill.get("tid")
if tid:
return f"tid:{tid}"
oid = fill.get("oid", "")
px = fill.get("px", "")
sz = fill.get("sz", "")
t = fill.get("time", "")
return f"hash:{hashlib.md5(f'{oid}:{px}:{sz}:{t}'.encode()).hexdigest()}"
同时将 websocket_order_manager._fill_key() 迁移为调用此共享函数(或直接内联相同逻辑),确保两侧行为完全一致。
步骤一:修复两路发布端,统一写入 fill_id
事件定义说明:
OrderFilledEvent继承自Event,已有metadata字段,无需改事件定义(避免误改src/events/trading_events.py)。发布端仅需在构造事件时写入metadata={"fill_id": fill_id}即可。
文件:src/utils/websocket/enhanced_ws_manager.py
两处修改均调用 _make_fill_id(fill),保证 fill_id 带有 tid: 或 hash: 前缀,与 _fill_key() 返回值格式完全一致。
修改点 A:_publish_user_events(user 频道,约第 1386 行)
在已有的 OrderFilledEvent 构造中,将裸字段读取替换为 _make_fill_id 调用,并补充 metadata:
fill_id = _make_fill_id(fill) # ← 使用共享函数,保证带前缀
event = OrderFilledEvent(
timestamp=datetime.now(),
source="websocket",
order_id=str(fill.get("oid", "")),
client_order_id=fill.get("cloid"),
symbol=fill.get("coin", ""),
side=fill.get("side", ""),
filled_qty=float(fill.get("sz", 0)),
filled_price=float(fill.get("px", 0)),
fee=float(fill.get("fee", 0)),
closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
metadata={"fill_id": fill_id}, # ← 新增,消费端依赖此字段去重
)
self._event_bus.publish(event)
修改点 B:_cache_latest_data(userFills 频道,新增分支)
在现有 channel == "user" 分支旁新增。
注意数据结构(测试网实测验证):Hyperliquid
userFills推送的顶层 data 为WsUserFills { isSnapshot?, user, fills: WsFill[] },fills 数组在data["fills"]下,不是 data 本身。快照消息含isSnapshot: true,实时成交消息同样使用此结构(isSnapshot缺省或为 false)。两种情况均需取data["fills"]。
elif channel == "userFills":
raw = msg.get("data", {})
# data 结构为 WsUserFills { user, fills: WsFill[] },取 fills 字段
fills = raw.get("fills", []) if isinstance(raw, dict) else raw
for fill in fills:
oid = fill.get("oid")
if not oid:
continue
fill_id = _make_fill_id(fill) # ← 与修改点 A 完全一致
event = OrderFilledEvent(
timestamp=datetime.now(),
source="websocket",
order_id=str(oid),
client_order_id=fill.get("cloid"),
symbol=fill.get("coin", ""),
side=fill.get("side", ""),
filled_qty=float(fill.get("sz", 0)),
filled_price=float(fill.get("px", 0)),
fee=float(fill.get("fee", 0)),
closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
metadata={"fill_id": fill_id},
)
self._event_bus.publish(event)
步骤二:WebSocketOrderManager 订阅 OrderFilledEvent
文件 A:src/trading/executor.py(约第 143 行)
# 变更前
self._ws_order_manager = WebSocketOrderManager(executor=self)
# 变更后
self._ws_order_manager = WebSocketOrderManager(executor=self, event_bus=self._event_bus)
文件 B:src/trading/websocket_order_manager.py
构造函数:增加 event_bus 参数,在末尾完成订阅。event_bus=None 仅供单元测试使用;生产路径由 Executor 强制传入,缺省时打印 WARNING,便于排查问题:
def __init__(self, executor, event_bus=None):
self._executor = executor
self._tracking: dict[int, OrderTracking] = {}
self._lock = threading.Lock()
self._http_busy: set[int] = set()
self._event_bus = event_bus
if event_bus is not None:
event_bus.subscribe(OrderFilledEvent, self._on_order_filled_event)
else:
logger.warning(
"WebSocketOrderManager: event_bus 未注入,user 频道 fills 将丢失,"
"仅测试环境允许此配置"
)
新增 handler:_on_order_filled_event
发布端已保证 metadata["fill_id"] 必然存在且格式正确。消费端直接读取,缺失时记录 WARNING 并跳过(不使用可能产生不一致 key 的备用路径):
def _on_order_filled_event(self, event: OrderFilledEvent) -> None:
oid = _safe_int_oid(event.order_id)
if oid is None:
return
fill_px = event.filled_price
fill_sz = event.filled_qty
if fill_px <= 0 or fill_sz <= 0:
return
# fill_id 由发布端通过 _make_fill_id 写入,此处强依赖
fill_id: str = event.metadata.get("fill_id") if event.metadata else None
if not fill_id:
logger.warning(
"OrderFilledEvent 缺少 fill_id,发布端未正确调用 _make_fill_id,"
"oid=%s,跳过此事件以避免去重 key 不一致", oid
)
return
should_resolve = False
resolve_tracking = None
with self._lock:
tracking = self._tracking.get(oid)
if tracking is None or tracking.status != OrderStatus.PENDING:
return
if fill_id in tracking._fill_ids:
logger.debug("跳过重复 fill: oid=%s fill_id=%s", oid, fill_id)
return
tracking._fill_ids.add(fill_id)
self._accumulate_fill(tracking, fill_px, fill_sz)
tracking.has_fill_price = True
tracking.fill_count += 1
if tracking._ws_status is not None:
should_resolve = True
resolve_tracking = tracking
# _resolve 在锁外调用,resolve_tracking 是锁内取得的对象引用,此时 status 不会回退
if should_resolve:
self._resolve(oid, resolve_tracking)
为何不保留备用 key:消费端自行构造备用 key 时,
event.timestamp(datetime.now()生成)与原始fill["time"](交易所毫秒时间戳)在值和精度上均不同,无法与发布端的_make_fill_id对齐。正确做法是在发布端保证 fill_id 必填,而非在消费端兜底。
取消订阅:在 manager 的 close() 或 reset() 方法中调用:
if self._event_bus is not None:
self._event_bus.unsubscribe(OrderFilledEvent, self._on_order_filled_event)
防止 manager 重建时 handler 残留或多次订阅。
步骤三:移除 userFills 旧路由
前提(量化验证标准):在测试网环境完成以下验证后,再执行此步骤:
- 下 10 笔以上限价单,追踪成功率 100%(PENDING → FILLED 状态正确流转)
- 手动触发 user 频道与 userFills 频道同时到达同一笔 fill,加权均价与 REST 查询结果误差 < 0.01%
- 查看日志,无 "跳过重复 fill" 以外的 WARNING/ERROR
- 以上验证在主网灰度(单账户小仓位)复测通过
文件:src/services/realtime_kline_service_base.py(约第 639 行)
# 变更前
if channel in ("orderUpdates", "userFills"):
mgr.handle_message(msg)
# 变更后
if channel == "orderUpdates": # userFills 已由 EventBus 接管,不再直接路由
mgr.handle_message(msg)
缓冲区回放(重连恢复逻辑):同步修改,缓冲消息回放时同样只对 orderUpdates 调用 handle_message,不对 userFills 回放。
取舍说明:移除旧路由后,重连期间的历史 userFills 消息不会被补偿回放。这是有意的取舍:
- 断线期间的成交补偿应通过 REST API 查询订单状态,而非回放 WebSocket 原始消息(现有 HTTP 兜底逻辑不变)。
- 若未来需要精确补偿,应在
WebSocketReconnectedEvent的 handler 中主动拉取 fills,而不是回放原始 userFills。
fill_id 生成规则(发布端与消费端必须一致)
| 优先级 | 规则 | 说明 |
|---|---|---|
| 1 | tid:{fill["tid"]} |
交易所唯一成交 ID(Hyperliquid WsFill.tid),带 tid: 前缀 |
| 2 | hash:{md5(oid:px:sz:time)} |
tid 不存在时的确定性备用 key,带 hash: 前缀 |
关键约束:
- 发布端必须调用
_make_fill_id(fill),禁止使用fill.get("tid") or _fill_key(fill)等裸字段读取方式——后者在 tid 存在时返回裸值(如12345),缺少tid:前缀,与_fill_key返回的"tid:12345"不同,导致去重失效。 px、sz使用原始字符串(不做 float 转换),与_fill_key保持一致。- 消费端强依赖
metadata["fill_id"],不自行构造备用 key(原因:event.timestamp与fill["time"]在值和精度上不同,无法对齐)。
联网文档与实测:响应体结构与 key 统一
官方数据结构(Hyperliquid)
- 文档:WebSocket Subscriptions 中定义:
userFills→WsUserFills { user, fills: WsFill[] }(fills 在嵌套字段中,不是 data 本身)userEvents(或部分实现中的user)→WsUserEvent可包含{ "fills": WsFill[] }
- WsFill 字段(与去重相关):
oid,tid(唯一成交 ID),px,sz,time,coin,side,fee,closedPnl等。两路 fill 来源均使用同一 WsFill 结构,故可用同一套 fill_id 规则。
实测与 key 协调
-
在测试网观察真实响应:运行脚本捕获 WebSocket 消息与 fill 结构:
TRADING_NETWORK=testnet python scripts/capture_fill_responses_testnet.py脚本会订阅
orderUpdates/userFills/user,在测试网下一笔小限价单(或手动下单),将收到的消息写入scripts/out/fill_responses_<ts>.json,并在控制台打印各 channel 下 fill 的字段集合与样本,便于确认tid、time等在两路中的存在与格式。 -
key/id 统一约定:
- 唯一标识:以 fill_id 作为去重键,与现有
OrderTracking._fill_ids一致。 - 生成方式:统一调用
_make_fill_id(fill):有tid时用tid:{tid},无tid时用hash:{md5(oid:px:sz:time)};发布端与消费端共用同一函数,保证 user 频道与 userFills 频道对同一笔成交得到相同 fill_id,从而只累计一次。
- 唯一标识:以 fill_id 作为去重键,与现有
-
实测结果(两次测试网捕获,数据完全一致):
项目 结论 实测时间 2026-02-22 12:27 / 2026-02-22 12:45(两次) userFills 顶层结构 data = {"isSnapshot": bool, "user": "0x...", "fills": [WsFill, ...]}fills 解析方式 必须取 data["fills"],不可直接用dataWsFill 字段 coin,px(str),sz(str),side(str),time(int ms),startPosition,dir,closedPnl,hash,oid(int),crossed,fee(str),tid(int),feeToken,twapIdtid 存在性 30+ 条 fill 均有 tid(整数),hash 备用路径极少触发px / sz 类型 字符串(如 "0.01804","14288.0"),_make_fill_id直接使用原始字符串,不做 float 转换oid 类型 整数(如 48913699327),OrderFilledEvent 存为str(oid)side 值 "A"(Ask/卖出)或"B"(Bid/买入),直接透传,无需转换orderUpdates 结构 data = [{"order": {...}, "status": "...", "statusTimestamp": int}],直接数组关键验证:设计文档提出的
raw.get("fills", []) if isinstance(raw, dict) else raw解析方式与实测数据结构完全匹配,可正确提取 fills 数组。
测试矩阵
| 场景 | 验证点 |
|---|---|
| 仅 userFills 到达(常规路径) | 订单正确进入 FILLED,加权均价和手数正确 |
| 仅 user 频道 fills 到达 | 订单正确进入 FILLED,与上一行行为一致 |
| user 频道与 userFills 同时到达同一笔 fill(串行) | _fill_ids 去重,仅累计一次,加权均价不偏移 |
| user 频道与 userFills 并发到达(多线程) | _lock 保护有效,不出现重复累计,加权均价正确 |
| userFills 先到,orderUpdates 后到 | fills 累计后,orderUpdates 到达时触发 _resolve |
orderUpdates 先到(_ws_status 已设),userFills 后到 |
fill 到达时立即触发 _resolve |
| metadata 为空或缺少 fill_id | handler 记录 WARNING 并跳过,不崩溃,不产生错误 key |
_make_fill_id 对同一 fill 两路结果一致性验证 |
user 频道与 userFills 频道对同一笔成交生成完全相同的 fill_id |
| manager 重建后重新注入 event_bus | 旧 handler 已 unsubscribe,新 handler 仅订阅一次,无残留 |
| manager close() 与 event 处理并发 | unsubscribe 后 handler 不再被调用,无竞态崩溃 |
| orderUpdates 的 canceled/rejected | 行为不变,grace 超时与 HTTP 兜底逻辑不受影响 |
| 重连后缓冲 orderUpdates 回放 | 仅 orderUpdates 回放,userFills 不回放,行为符合预期 |
| event_bus=None(测试模式) | 构造函数输出 WARNING,不崩溃,旧路径(_on_user_fill)仍可独立测试 |
变更文件汇总
| 文件 | 变更内容 |
|---|---|
src/utils/websocket/enhanced_ws_manager.py |
新增模块级 _make_fill_id(fill);_publish_user_events 调用 _make_fill_id 并补 fill_id;_cache_latest_data 新增 userFills 分支,修正 data 解析路径(data["fills"]),调用 _make_fill_id 发布事件 |
src/trading/websocket_order_manager.py |
构造函数增加 event_bus 参数,缺省时打印 WARNING;新增 _on_order_filled_event handler,强依赖 metadata["fill_id"];迁移 _fill_key 逻辑与 _make_fill_id 对齐;close/reset 时 unsubscribe;步骤三后可移除已失效的 _on_user_fill |
src/trading/executor.py |
创建 WebSocketOrderManager 时传入 event_bus |
src/services/realtime_kline_service_base.py |
on_message 和缓冲回放中,userFills 不再路由到 handle_message(步骤三) |