OrderFilledEvent 接入订单追踪系统12
双 WS 全 EventBus 重构
问题陈述
orderUpdates 和 userFills 通过 realtime_kline_service_base.on_message() → mgr.handle_message() 直连调用 WebSocketOrderManager,而 user 频道的 fills/positions/balance 已走 EventBus。这导致:
- 半事件驱动 — 同一消费者(
WebSocketOrderManager)两条管道:EventBus + 直连 - 冗余基础设施 —
realtime_kline_service_base维护消息缓冲区(_order_msg_buffer)+ flush 线程 + 回放逻辑,仅服务于直连路径 - 协议耦合 —
handle_message/_on_order_update/_on_user_fill直接解析 Hyperliquid 原始 WS 消息结构 - user 频道 fill 浪费 —
_publish_user_events发布OrderFilledEvent但 EventBus 上零订阅者
本次变更:orderUpdates 和 userFills 全部通过 EventBus 投递到 WebSocketOrderManager,彻底删除直连路径、消息缓冲和 user 频道 fill 发布。
架构决策
| 决策 | 结论 | 理由 |
|---|---|---|
| 路由策略 | orderUpdates + userFills 统一走 EventBus | 消除半事件驱动架构 |
| fill 来源 | 仅 userFills 频道 |
user 频道 fill 是冗余副本 |
| 切换方式 | 原子切换,旧路径全删 | 不留兼容代码 |
event_bus 参数 |
WebSocketOrderManager 必选依赖 |
构造时即 TypeError,不在运行时静默丢数据 |
make_fill_id |
仅用 tid,无 hash 备用路径 |
实测 30+ fill 均有 tid |
| isSnapshot | 发布端过滤,不进 EventBus | 避免无效事件消耗 |
| 消息缓冲 | 删除 | EventBus 同步投递 + HTTP 兜底足够覆盖 |
目标架构
enhanced_ws_manager._cache_latest_data
├── channel == "orderUpdates"
│ └── _publish_order_status_events(data)
│ └── for item in data:
│ └── publish(OrderStatusEvent)
│
└── channel == "userFills"
└── _publish_fill_events(data)
└── isSnapshot? → 跳过
for fill in fills:
└── publish(OrderFilledEvent, metadata={"fill_id": "tid:{tid}"})
EventBus(同步)
├── WebSocketOrderManager._on_order_status_event()
│ └── 状态识别 → _ws_status → grace timer / _resolve()
│
└── WebSocketOrderManager._on_order_filled_event()
└── _fill_ids 去重 → _accumulate_fill() → _resolve()
实测数据结构(测试网,2026-02-22)
orderUpdates:
{"channel": "orderUpdates", "data": [{"order": {"oid": 12345, "avgPx": "0.018", "limitPx": "0.019", "totalSz": "100"}, "status": "filled"}]}
userFills:
{"channel": "userFills", "data": {"isSnapshot": false, "user": "0x...", "fills": [{"oid": 12345, "tid": 67890, "px": "0.01804", "sz": "50", "side": "B", "coin": "PURR", "fee": "0.009", "cloid": null}]}}
| 字段 | 值 |
|---|---|
| WsFill.tid | 整数,30+ 条 fill 均存在 |
| WsFill.px / sz | 字符串 |
| WsFill.oid | 整数 |
| WsFill.side | "A"(卖)或 "B"(买) |
删除清单
以下代码直接删除,不留残留:
| 文件 | 删除内容 |
|---|---|
trading_events.py |
import hashlib;_make_fill_id 中的 hash 路径 |
enhanced_ws_manager.py |
_publish_user_events 中的 fill 循环(L1422-1455);旧 _make_fill_id import |
websocket_order_manager.py |
handle_message 方法整体;_on_order_update 方法整体;_on_user_fill 方法整体;import _make_fill_id |
realtime_kline_service_base.py |
_order_msg_buffer 初始化;_buffer_flush_thread 创建+启动;_periodic_buffer_flush 方法;_get_ws_order_manager 方法;on_message 中全部订单路由+缓冲回放逻辑 |
变更代码
src/events/trading_events.py
_make_fill_id → make_fill_id,删 hash 路径:
def make_fill_id(fill: dict) -> str | None:
"""返回 fill 去重 key。tid 缺失返回 None,调用方跳过。"""
tid = fill.get("tid")
if tid is None:
return None
return f"tid:{tid}"
新增 OrderStatusEvent:
@dataclass(kw_only=True)
class OrderStatusEvent(Event):
"""订单状态更新事件
触发时机:WebSocket orderUpdates 频道推送
订阅者:WebSocketOrderManager(订单追踪状态机)
字段映射(Hyperliquid → 本事件):
- order.oid → order_id (str)
- item.status / order.status → status (lowercase)
- avgPx > limitPx > px 级联 → fallback_price
- totalSz > origSz 级联 → fallback_size
"""
order_id: str = ""
status: str = "" # "filled", "canceled", "rejected", etc.
fallback_price: float = 0.0
fallback_size: float = 0.0
def __post_init__(self):
super().__post_init__()
self.priority = EventPriority.CRITICAL
OrderFilledEvent docstring 更新:触发来源改为 userFills 频道。
src/utils/websocket/enhanced_ws_manager.py
新增模块级 helper:
def _first_pos(*values) -> float:
"""取 values 中第一个正数 float,全部无效返回 0.0"""
for v in values:
try:
f = float(v)
if f > 0:
return f
except (TypeError, ValueError):
continue
return 0.0
_cache_latest_data 新增两个分支(在 elif channel == "user" 之后):
elif channel == "orderUpdates":
self._publish_order_status_events(msg.get("data"))
elif channel == "userFills":
self._publish_fill_events(msg.get("data", {}))
新增 _publish_order_status_events:
def _publish_order_status_events(self, raw_data) -> None:
"""将 orderUpdates 原始数据拆分为逐条 OrderStatusEvent"""
if not raw_data:
return
items = raw_data if isinstance(raw_data, list) else [raw_data]
for item in items:
if not isinstance(item, dict):
continue
order = item.get("order")
if not isinstance(order, dict):
continue
oid = order.get("oid")
if oid is None:
continue
status_str = (
item.get("status") or order.get("status") or ""
).lower()
if not status_str:
continue
self._event_bus.publish(OrderStatusEvent(
timestamp=datetime.now(),
source="websocket",
order_id=str(oid),
status=status_str,
fallback_price=_first_pos(
order.get("avgPx"), order.get("limitPx"), order.get("px")
),
fallback_size=_first_pos(
order.get("totalSz"), order.get("origSz")
),
))
新增 _publish_fill_events:
def _publish_fill_events(self, data) -> None:
"""将 userFills 数据拆分为逐笔 OrderFilledEvent"""
if isinstance(data, dict):
if data.get("isSnapshot"):
return
fills = data.get("fills", [])
elif isinstance(data, list):
fills = data
else:
return
for fill in fills:
if not isinstance(fill, dict):
continue
fill_id = make_fill_id(fill)
if fill_id is None:
logger.error("userFills fill 缺少 tid,跳过: %s", fill)
continue
oid = fill.get("oid")
if oid is None:
continue
self._event_bus.publish(OrderFilledEvent(
timestamp=datetime.now(),
source="websocket",
order_id=str(oid),
client_order_id=fill.get("cloid"),
symbol=fill.get("coin", ""),
side=fill.get("side", ""),
filled_qty=float(fill.get("sz", 0)),
filled_price=float(fill.get("px", 0)),
fee=float(fill.get("fee", 0)),
closed_pnl=(
float(fill.get("closedPnl", 0))
if "closedPnl" in fill else None
),
metadata={"fill_id": fill_id},
))
_publish_user_events:删除 fill 循环(# ── 2. 订单成交事件 整个 if 块),仅保留持仓和余额事件。
src/trading/executor.py
self._ws_order_manager = WebSocketOrderManager(
executor=self,
event_bus=self._event_bus,
)
src/trading/websocket_order_manager.py
import 变更:
# 新增
from src.events.event_bus import EventBus
from src.events.trading_events import OrderFilledEvent, OrderStatusEvent
# 删除
from src.events.trading_events import _make_fill_id
构造函数:event_bus 为必选参数,构造时订阅两个事件:
def __init__(self, executor, event_bus: EventBus):
self._executor = executor
self._tracking: dict[int, OrderTracking] = {}
self._lock = threading.Lock()
self._http_busy: set[int] = set()
self._event_bus = event_bus
event_bus.subscribe(OrderStatusEvent, self._on_order_status_event)
event_bus.subscribe(OrderFilledEvent, self._on_order_filled_event)
新增 _on_order_status_event(替代旧 _on_order_update,从 typed event 获取字段,不再解析原始 WS 消息):
def _on_order_status_event(self, event: OrderStatusEvent) -> None:
oid = _safe_int_oid(event.order_id)
if oid is None:
return
status_str = event.status
should_resolve = False
resolve_tracking = None
with self._lock:
tracking = self._tracking.get(oid)
if not tracking or tracking.status != OrderStatus.PENDING:
return
if status_str == "filled":
tracking._ws_status = OrderStatus.FILLED
tracking._fallback_px = event.fallback_price
tracking._fallback_sz = event.fallback_size
logger.info(
f"orderUpdate FILLED: {tracking.coin} oid={oid} "
f"fallback_px={tracking._fallback_px} "
f"fallback_sz={tracking._fallback_sz} "
f"has_fill={'Y' if tracking.has_fill_price else 'N'}"
)
if tracking.has_fill_price:
should_resolve = True
resolve_tracking = tracking
else:
if tracking._grace_timer is None:
timer = threading.Timer(
self._FILL_GRACE_SEC, self._resolve,
[oid, tracking]
)
timer.daemon = True
tracking._grace_timer = timer
timer.start()
elif status_str in ("canceled", "margincanceled"):
logger.info(f"orderUpdate CANCELED: {tracking.coin} oid={oid}")
tracking._ws_status = OrderStatus.CANCELED
should_resolve = True
resolve_tracking = tracking
elif "rejected" in status_str:
if status_str != "rejected":
logger.info(
f"orderUpdate 拒绝变体: {tracking.coin} oid={oid} status={status_str!r}"
)
tracking._ws_status = OrderStatus.REJECTED
should_resolve = True
resolve_tracking = tracking
if should_resolve:
self._resolve(oid, resolve_tracking)
新增 _on_order_filled_event(替代旧 _on_user_fill):
def _on_order_filled_event(self, event: OrderFilledEvent) -> None:
oid = _safe_int_oid(event.order_id)
if oid is None:
return
if event.filled_price <= 0 or event.filled_qty <= 0:
return
fill_id = event.metadata.get("fill_id") if event.metadata else None
if not fill_id:
return
should_resolve = False
resolve_tracking = None
with self._lock:
tracking = self._tracking.get(oid)
if not tracking or tracking.status != OrderStatus.PENDING:
return
if fill_id in tracking._fill_ids:
return
tracking._fill_ids.add(fill_id)
self._accumulate_fill(tracking, event.filled_price, event.filled_qty)
tracking.has_fill_price = True
tracking.fill_count += 1
logger.info(
f"userFill 累计: {tracking.coin} oid={oid} "
f"本笔 px={event.filled_price} sz={event.filled_qty} | "
f"累计 avg_px={tracking.avg_price:.6g} "
f"total_sz={tracking.filled_size:.6g} "
f"fills={tracking.fill_count} fid={fill_id}"
)
if tracking._ws_status is not None:
should_resolve = True
resolve_tracking = tracking
if should_resolve:
self._resolve(oid, resolve_tracking)
_resolve内部通过 identity check(current is not tracking)保证幂等。
shutdown():取消订阅先于清理:
def shutdown(self):
self._event_bus.unsubscribe(OrderStatusEvent, self._on_order_status_event)
self._event_bus.unsubscribe(OrderFilledEvent, self._on_order_filled_event)
with self._lock:
pending = [
(oid, t) for oid, t in list(self._tracking.items())
if t.status == OrderStatus.PENDING
]
for oid, tracking in pending:
self._resolve(oid, tracking, OrderStatus.CANCELED)
src/services/realtime_kline_service_base.py
删除全部订单相关代码:
| 位置 | 删除内容 |
|---|---|
__init__ |
self._order_msg_buffer: deque = deque(maxlen=500) |
_setup_ws_manager |
_buffer_flush_thread 创建 |
start |
_buffer_flush_thread.start() |
on_message |
缓冲区回放块 + 订单消息路由块(整个 if channel in ("orderUpdates", "userFills") 及其 buffer 逻辑) |
| 方法级 | _periodic_buffer_flush 整体删除 |
| 方法级 | _get_ws_order_manager 整体删除 |
on_message 不再有任何订单相关代码,仅保留 K 线处理。
消息缓冲删除理由
旧缓冲区存在的原因:WebSocketOrderManager 可能在 WS 消息到达时尚未初始化。
全 EventBus 方案下无需缓冲:
- 启动时序 —
WebSocketOrderManager在executor.__init__中创建并订阅 EventBus,先于 WS 连接建立。启动前无订单被 track,即使事件丢失也无影响。 - 重连场景 —
WebSocketOrderManager生命周期内 EventBus 订阅始终有效,WS 重连不影响事件投递。verify_pending_orders()双轮 HTTP 补查覆盖断连窗口。 - EventBus 同步投递 — 发布即消费,无异步丢失风险。
测试矩阵
| 场景 | 验证点 |
|---|---|
| 常规 orderUpdates filled | OrderStatusEvent 发布 → _on_order_status_event 收到 → _ws_status=FILLED |
| 常规 userFills 单笔成交 | OrderFilledEvent 发布 → 订单进入 FILLED,avg_price 与 REST 查询误差 < 0.01% |
| 多笔部分成交(同一 oid) | 每笔独立累计,avg_price 滚动正确,fill_count 准确 |
| 同一 fill 重复推送 | _fill_ids 去重,_accumulate_fill 仅调用一次 |
| fill 缺少 tid | 发布端 ERROR 日志,跳过,不进 EventBus |
| fill 缺少 oid | 发布端跳过 |
| userFills 早于 orderUpdates 到达 | fills 累计后,OrderStatusEvent 触发 _resolve |
| orderUpdates 早于 userFills 到达 | _ws_status 已设 + grace timer,fill 到达立即 _resolve |
_resolve 并发调用 |
identity check 保证幂等,result_event.set() 仅一次 |
| userFills 快照(isSnapshot=true) | 发布端直接跳过,不进 EventBus |
| orderUpdates canceled / rejected | 立即 _resolve,无需等待 fill |
| manager shutdown() 与事件并发 | unsubscribe 先于清理,handler 不再被调用 |
event_bus 未传入 |
构造时即 TypeError |
| WS 重连 | verify_pending_orders 双轮 HTTP 补查正常工作 |
变更文件汇总
| 文件 | 变更内容 |
|---|---|
src/events/trading_events.py |
新增 OrderStatusEvent;_make_fill_id → make_fill_id(去下划线,删 hash 路径,返回 str | None);更新 OrderFilledEvent docstring |
src/utils/websocket/enhanced_ws_manager.py |
新增 _first_pos helper;_cache_latest_data 新增 orderUpdates + userFills 分支;新增 _publish_order_status_events + _publish_fill_events;_publish_user_events 删 fill 循环;import 更新 |
src/trading/websocket_order_manager.py |
删 handle_message / _on_order_update / _on_user_fill;event_bus 改必选;新增 _on_order_status_event + _on_order_filled_event;shutdown() 新增 unsubscribe;import 更新 |
src/trading/executor.py |
创建 WebSocketOrderManager 时传入 event_bus |
src/services/realtime_kline_service_base.py |
删除 _order_msg_buffer / _buffer_flush_thread / _periodic_buffer_flush / _get_ws_order_manager / on_message 全部订单路由 |