OrderFilledEvent 接入订单追踪系统11
OrderFilledEvent 接入订单追踪系统
问题陈述
_publish_user_events(user 频道)已发布 OrderFilledEvent,但 EventBus 上零订阅者,成交数据完全丢失。
_on_user_fill(userFills 频道)通过旧直连路径绕过 EventBus 直接调用 _accumulate_fill,与事件驱动架构方向相反。
本次变更:以 userFills 为唯一 fill 来源,通过 EventBus 接入 WebSocketOrderManager,同时删除全部旧路径。
架构决策
| 决策 | 结论 |
|---|---|
| fill 来源 | 仅 userFills 频道,user 频道不再处理 fill |
| 切换方式 | 原子切换,旧路径与新路径在同一次变更中完成 |
event_bus 参数 |
WebSocketOrderManager 必选依赖,不提供 None 默认值 |
make_fill_id |
仅用 tid,无 hash 备用路径(实测 30+ fill 均有 tid) |
目标架构
enhanced_ws_manager._cache_latest_data
└── channel == "userFills"
└── for fill in data["fills"]:
└── publish(OrderFilledEvent, metadata={"fill_id": "tid:{tid}"})
EventBus(同步)
└── WebSocketOrderManager._on_order_filled_event()
└── _fill_ids 去重 → _accumulate_fill()
实测数据结构(测试网,2026-02-22)
| 字段 | 值 |
|---|---|
| userFills 顶层 | data = {"isSnapshot": bool, "user": "0x...", "fills": [WsFill, ...]} |
| WsFill.tid | 整数,30+ 条 fill 均存在 |
| WsFill.px / sz | 字符串(如 "0.01804") |
| WsFill.oid | 整数 |
| WsFill.side | "A"(卖)或 "B"(买) |
删除清单
以下代码直接删除,不留残留:
| 文件 | 删除内容 |
|---|---|
websocket_order_manager.py |
_on_user_fill 方法(整体) |
websocket_order_manager.py |
handle_message 中 channel == "userFills" 分支 |
enhanced_ws_manager.py |
_publish_user_events 中的 fill 循环逻辑 |
变更代码
src/events/trading_events.py
def make_fill_id(fill: dict) -> str | None:
"""
返回 fill 去重 key。
tid 缺失时返回 None,调用方负责记录错误并跳过。
"""
tid = fill.get("tid")
if tid is None:
return None
return f"tid:{tid}"
原
_make_fill_id同步重命名(去下划线)并删除 hash 备用路径。
src/utils/websocket/enhanced_ws_manager.py
_publish_user_events:删除 fill 循环,仅保留其他事件类型。
_cache_latest_data:新增 userFills 分支:
elif channel == "userFills":
fills = msg.get("data", {}).get("fills", [])
for fill in fills:
fill_id = make_fill_id(fill)
if fill_id is None:
logger.error("userFills fill 缺少 tid,跳过: %s", fill)
continue
oid = fill.get("oid")
if oid is None:
continue
self._event_bus.publish(OrderFilledEvent(
timestamp=datetime.now(),
source="websocket",
order_id=str(oid),
client_order_id=fill.get("cloid"),
symbol=fill.get("coin", ""),
side=fill.get("side", ""),
filled_qty=float(fill.get("sz", 0)),
filled_price=float(fill.get("px", 0)),
fee=float(fill.get("fee", 0)),
closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
metadata={"fill_id": fill_id},
))
src/trading/executor.py
self._ws_order_manager = WebSocketOrderManager(
executor=self,
event_bus=self._event_bus, # 必选参数
)
src/trading/websocket_order_manager.py
构造函数:event_bus 为必选参数,构造时完成订阅:
def __init__(self, executor, event_bus: EventBus):
self._executor = executor
self._tracking: dict[int, OrderTracking] = {}
self._lock = threading.Lock()
self._http_busy: set[int] = set()
self._event_bus = event_bus
event_bus.subscribe(OrderFilledEvent, self._on_order_filled_event)
新增 handler:
def _on_order_filled_event(self, event: OrderFilledEvent) -> None:
try:
oid = int(event.order_id)
except (ValueError, TypeError):
return
if event.filled_price <= 0 or event.filled_qty <= 0:
return
fill_id = event.metadata.get("fill_id") if event.metadata else None
if not fill_id:
logger.error("OrderFilledEvent 缺少 fill_id,oid=%s,跳过", oid)
return
should_resolve = False
resolve_tracking = None
with self._lock:
tracking = self._tracking.get(oid)
if tracking is None or tracking.status != OrderStatus.PENDING:
return
if fill_id in tracking._fill_ids:
return
tracking._fill_ids.add(fill_id)
self._accumulate_fill(tracking, event.filled_price, event.filled_qty)
tracking.has_fill_price = True
tracking.fill_count += 1
if tracking._ws_status is not None:
should_resolve = True
resolve_tracking = tracking
if should_resolve:
self._resolve(oid, resolve_tracking)
_resolve内部通过 identity check(current is not tracking)保证幂等:第一次调用成功 pop,第二次因current is None直接返回 False。
shutdown():取消订阅先于清理执行:
def shutdown(self):
self._event_bus.unsubscribe(OrderFilledEvent, self._on_order_filled_event)
with self._lock:
pending = [
(oid, t) for oid, t in list(self._tracking.items())
if t.status == OrderStatus.PENDING
]
for oid, tracking in pending:
self._resolve(oid, tracking, OrderStatus.CANCELED)
src/services/realtime_kline_service_base.py
# 变更前
if channel in ("orderUpdates", "userFills"):
mgr.handle_message(msg)
# 变更后
if channel == "orderUpdates":
mgr.handle_message(msg)
缓冲区回放同步修改,userFills 不回放(重连补偿通过现有 HTTP 兜底处理)。
测试矩阵
| 场景 | 验证点 |
|---|---|
| 常规 userFills 单笔成交 | 订单进入 FILLED,avg_price 与 REST 查询误差 < 0.01% |
| 多笔部分成交(同一 oid) | 每笔独立累计,avg_price 滚动正确,fill_count 准确 |
| 同一 fill 重复推送 | _fill_ids 去重,_accumulate_fill 仅调用一次 |
| fill 缺少 tid | 发布端 ERROR 日志,跳过,不崩溃,不写入 EventBus |
| fill 缺少 oid | 发布端跳过,不崩溃 |
| userFills 早于 orderUpdates 到达 | fills 累计后,orderUpdates 触发 _resolve |
| orderUpdates 早于 userFills 到达 | fill 到达时 _ws_status 已设,立即触发 _resolve |
_resolve 并发调用(单元测试) |
identity check 保证幂等,result_event.set() 仅执行一次 |
| userFills 快照(isSnapshot=true)含历史 fill | tracking is None 静默跳过,不影响当前 PENDING 订单 |
| manager shutdown() 与 fill 并发 | unsubscribe 先于清理,handler 不再被调用,无竞态 |
event_bus 未传入 |
构造时即 TypeError,启动失败,不在运行时静默丢数据 |
变更文件汇总
| 文件 | 变更内容 |
|---|---|
src/events/trading_events.py |
_make_fill_id → make_fill_id(去下划线,删 hash 路径) |
src/utils/websocket/enhanced_ws_manager.py |
_publish_user_events 删 fill 循环;_cache_latest_data 新增 userFills 分支 |
src/trading/websocket_order_manager.py |
删 _on_user_fill;删 handle_message userFills 分支;event_bus 改必选;新增 _on_order_filled_event;shutdown() 新增 unsubscribe |
src/trading/executor.py |
创建 WebSocketOrderManager 时传入 event_bus |
src/services/realtime_kline_service_base.py |
on_message 和缓冲回放移除 userFills 路由 |