OrderFilledEvent 接入订单追踪系统10
OrderFilledEvent 接入订单追踪系统
现状诊断
当前 fill 处理路径
user 频道 data.fills → _publish_user_events() → OrderFilledEvent → 🕳️ 无消费者(数据丢失)
userFills 频道 → handle_message() → _on_user_fill() → 直接累计(旧直连路径)
两条路径状态不一致:
- user 频道的 fills:
_publish_user_events已发布OrderFilledEvent(优先级 CRITICAL,metadata 含正确 fill_id),但 EventBus 上零订阅者,成交数据完全丢失,不进入订单追踪。 - userFills 频道:
_on_user_fill已正确解析data["fills"]嵌套结构(含向后兼容),通过_make_fill_id去重后直接调用_accumulate_fill,走的是旧直连路径而非 EventBus。
根本问题
唯一遗留问题:OrderFilledEvent 是"已搭好发射台但从未点火"的组件——有定义、有发布、有 CRITICAL 优先级,但没有任何消费者。这不是事件多余,而是订阅方缺位。
两路来源中,userFills 频道目前能正常工作(通过旧直连路径),user 频道的成交数据完全丢失。
已完成的修复
| 项目 | 状态 | 说明 |
|---|---|---|
_on_user_fill 解析路径 |
✅ 已修复 | 正确取 data["fills"],含 isSnapshot 处理和旧格式兼容 |
_make_fill_id 共享函数 |
✅ 已完成 | 提取至 src/events/trading_events.py 模块级,_fill_key 已删除 |
_publish_user_events 发布端 |
✅ 已完成 | 调用 _make_fill_id,metadata={"fill_id": fill_id} 已写入 |
EventBus 特性确认
经代码核查,EventBus.publish 为同步阻塞调用,直接遍历 handler 列表执行,不涉及队列或线程池。因此:
- 消息顺序与 WebSocket 到达顺序完全一致,无时序风险
publish(event)在行为上等价于直接方法调用,无额外开销- 使用 EventBus 的目的是解耦(基础设施层不直接持有业务层引用),而非异步
- 同步调用意味着 WebSocket 消息处理线程会阻塞至 handler 执行完毕;正常情况下延迟极低(微秒级),但需注意避免在 handler 内执行耗时操作
EventBus 异常隔离:publish 内部为每个 handler 独立捕获异常(try/except per handler),单个 handler 抛出异常不会中断其他 handler 的执行,也不会崩溃 WebSocket 消息处理线程。日志记录异常后继续执行。
EventBus 订阅去重:subscribe() 检测 handler 是否已存在,若已存在则跳过(不重复注册)。manager 重建时重新注入 event_bus 并调用 subscribe,是幂等安全的。
设计目标
- 修复 user 频道 fills 丢失:让
WebSocketOrderManager成为OrderFilledEvent的消费者。 - 统一成交入口:两路 fill 来源(user 频道 + userFills 频道)均通过
OrderFilledEvent进入订单追踪,_fill_ids去重保证不重复累计。 - 保持行为不变:订单状态机、加权均价计算、grace 超时、HTTP 兜底逻辑全部不动。
- 架构方向正确:基础设施层(enhanced_ws_manager)只发事件,业务层(WebSocketOrderManager)自行订阅,避免跨层耦合;后续 TradeLogger、RiskManager 等新消费者只需
subscribe,无需改动现有代码。
架构变化
变更前
realtime_kline_service_base.on_message
├── channel == "orderUpdates" → mgr.handle_message() → _on_order_update()
└── channel == "userFills" → mgr.handle_message() → _on_user_fill()
enhanced_ws_manager._publish_user_events (user 频道 fills)
└── publish(OrderFilledEvent) → 【无消费者,数据丢失】
变更后
realtime_kline_service_base.on_message
├── channel == "orderUpdates" → mgr.handle_message() → _on_order_update() ← 不变
└── channel == "userFills" → 【不再路由到 handle_message】
enhanced_ws_manager(两路来源,均通过 _make_fill_id 发布带前缀 fill_id 的事件)
├── _publish_user_events (user 频道 fills) → publish(OrderFilledEvent + fill_id) ─┐
└── _cache_latest_data (userFills 频道) → publish(OrderFilledEvent + fill_id) ─┤
↓
EventBus(同步)
↓
WebSocketOrderManager._on_order_filled_event()
└── _fill_ids 去重 → _accumulate_fill()
过渡期安全性分析
本节解答关键问题:步骤一B(新增 userFills 发布端)与步骤二(新增消费端订阅)完成后、步骤三(移除旧路由)执行前,是否存在重复累计风险?
过渡期数据流
userFills 消息
├── path A: realtime_kline_service_base → handle_message() → _on_user_fill()
│ └── 写入 tracking._fill_ids,调用 _accumulate_fill()
└── path B: enhanced_ws_manager._cache_latest_data → OrderFilledEvent
└── _on_order_filled_event()
└── 检查 fill_id in tracking._fill_ids → 已存在则跳过
为何过渡期安全
经代码核查(websocket_order_manager.py:471),_on_user_fill 使用与 _on_order_filled_event 完全相同的 tracking._fill_ids 集合去重:
# _on_user_fill(旧路径,已确认代码)
fid = _make_fill_id(fill_data)
with self._lock:
if fid in tracking._fill_ids: # ← 同一 set
continue
tracking._fill_ids.add(fid) # ← 同一 set
self._accumulate_fill(tracking, fill_px, fill_sz)
# _on_order_filled_event(新路径)
fill_id = event.metadata.get("fill_id")
with self._lock:
if fill_id in tracking._fill_ids: # ← 同一 set
return
tracking._fill_ids.add(fill_id) # ← 同一 set
self._accumulate_fill(tracking, fill_px, fill_sz)
两路均持有 self._lock,操作串行化。无论哪路先执行:
- 先执行者:将 fill_id 写入
_fill_ids,调用_accumulate_fill - 后执行者:发现 fill_id 已在
_fill_ids中,直接跳过
_make_fill_id 对同一笔 fill 在两路产生完全相同的 fill_id(共用函数,相同输入),去重保证严格有效,过渡期不会发生重复累计。
步骤一B 与步骤二的部署原子性
步骤一B(发布端)和步骤二(消费端)可以分开部署:
- 仅完成步骤一B(发布但无消费者):EventBus 上仍零订阅者,新发布的 OrderFilledEvent 被静默丢弃,行为与当前完全一致,不引入任何副作用。
- 仅完成步骤二(消费端就绪但发布端未改):userFills 频道无新事件发布,
_on_order_filled_event不会被调用,旧_on_user_fill路径照常工作。
两步均为增量变更,互不影响,无需原子化部署。步骤三(移除旧路由)才是真正的切换节点,执行前必须完成量化验证。
实现步骤
执行顺序:前提(提取共享函数)→ 步骤一(修复发布端)→ 步骤二(接入消费端)→ 步骤三(移除旧路由)。
步骤一和步骤二完成、验证通过(见步骤三前提标准)后,再执行步骤三;避免在新路径未验证前就切断旧路径。
✅ 前提:提取共享函数 _make_fill_id(已完成)
这是本次变更的基础。发布端两处与消费端必须调用同一函数,否则同一笔成交在两路产生不同 key,去重失效。
当前状态:已完成。_make_fill_id 现为 src/events/trading_events.py 的模块级函数,原 websocket_order_manager._fill_key 已删除,两处调用均已迁移至 _make_fill_id。
函数定义(位于 src/events/trading_events.py:19):
import hashlib
def _make_fill_id(fill: dict) -> str:
"""
生成 fill 的唯一去重 key。
- 有 tid(含 tid=0):返回 "tid:{tid}"
- 无 tid(即 None):返回 "hash:{md5(oid:px:sz:time)}"
注意:
- 使用 `tid is not None` 判断,确保 tid=0 视为有效值。
- hash 路径各字段直接使用 fill.get(key)(无缺省值),缺失时值为 None,
f-string 产生 "None:...",两路来源必须完全一致才能正确去重。
- px/sz 使用原始字符串,不做 float 转换。
"""
tid = fill.get("tid")
if tid is not None:
return f"tid:{tid}"
raw = f"{fill.get('oid')}:{fill.get('px')}:{fill.get('sz')}:{fill.get('time')}"
return f"hash:{hashlib.md5(raw.encode()).hexdigest()}"
命名约定说明:_make_fill_id 以下划线开头,表示"非对外 API"而非"模块私有"——它需要跨模块共享(发布端与消费端均 import),但不应被外部调用方直接使用。当前以下划线命名是已建立的惯例;若未来提升为正式 API,可重命名为 make_fill_id 并迁移至 src/utils/ 下的共享模块。
两侧 import 方式:
from src.events.trading_events import _make_fill_id
步骤一:修复两路发布端,统一写入 fill_id
事件定义说明:
OrderFilledEvent继承自Event,已有metadata字段,无需改OrderFilledEvent类本身。
文件:src/utils/websocket/enhanced_ws_manager.py
✅ 修改点 A:_publish_user_events(user 频道)—— 已完成
当前代码已正确调用 _make_fill_id 并写入 metadata:
fill_id = _make_fill_id(fill)
event = OrderFilledEvent(
timestamp=datetime.now(),
source="websocket",
order_id=str(fill.get("oid", "")),
client_order_id=fill.get("cloid"),
symbol=fill.get("coin", ""),
side=fill.get("side", ""),
filled_qty=float(fill.get("sz", 0)),
filled_price=float(fill.get("px", 0)),
fee=float(fill.get("fee", 0)),
closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
metadata={"fill_id": fill_id},
)
self._event_bus.publish(event)
⬜ 修改点 B:_cache_latest_data(userFills 频道)—— 待完成
在现有 channel == "user" 分支旁新增 elif channel == "userFills" 分支。
设计取舍说明:
_cache_latest_data原意是"缓存最新数据",在此处同时承担了发布 OrderFilledEvent 的职责,与函数名存在语义偏差。选择在此处处理而非新建独立方法的原因:
_cache_latest_data是唯一接收所有 channel 消息的路由入口,结构上与_publish_user_events对称- 现有其他 channel(candle、l2Book、allMids)也在
_cache_latest_data中同时完成缓存与事件发布,userFills 保持一致- 后续优化方向(TODO):将"缓存"与"事件发布"拆分为独立职责,重命名为
_process_message或提取_publish_userFills_event子方法,不在本次变更范围内。
注意数据结构(测试网实测验证):Hyperliquid
userFills推送的顶层 data 为WsUserFills { isSnapshot?, user, fills: WsFill[] },fills 数组在data["fills"]下,不是 data 本身。快照消息含isSnapshot: true,实时成交消息同样使用此结构(isSnapshot缺省或为 false)。两种情况均需取data["fills"]。
elif channel == "userFills":
raw = msg.get("data", {})
# data 结构为 WsUserFills { user, fills: WsFill[] },取 fills 字段
fills = raw.get("fills", []) if isinstance(raw, dict) else raw
for fill in fills:
oid = fill.get("oid")
if oid is None:
continue
fill_id = _make_fill_id(fill)
event = OrderFilledEvent(
timestamp=datetime.now(),
source="websocket",
order_id=str(oid),
client_order_id=fill.get("cloid"),
symbol=fill.get("coin", ""),
side=fill.get("side", ""),
filled_qty=float(fill.get("sz", 0)),
filled_price=float(fill.get("px", 0)),
fee=float(fill.get("fee", 0)),
closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
metadata={"fill_id": fill_id},
)
self._event_bus.publish(event)
快照消息处理说明:userFills 首次连接时会推送一条快照(
isSnapshot: true),包含该账户的历史成交数据。
- 已结束订单(不在
_tracking中,或 status ≠ PENDING):消费端_on_order_filled_event中tracking is None/tracking.status != PENDING分支静默跳过,无副作用。- 重连时仍在 PENDING 的订单:快照中的历史 fill 会被正常累计并触发
_resolve,这是预期行为——补偿重连期间遗漏的成交数据。两种情况均无需区分处理,现有
_tracking.get(oid)防护已覆盖全部场景。
_cache_latest_data异常隔离:_cache_latest_data末尾有except Exception as e: logger.warning(...)捕获,EventBus 发布端抛出的异常不会逃逸至 WebSocket 消息处理线程。EventBus 消费端(_on_order_filled_event)的异常则由EventBus.publish内部的 per-handler try/except 捕获,同样不会影响其他订阅者或消息线程。
⬜ 步骤二:WebSocketOrderManager 订阅 OrderFilledEvent(待完成)
文件 A:src/trading/executor.py(约第 143 行)
# 变更前
self._ws_order_manager = WebSocketOrderManager(executor=self)
# 变更后
self._ws_order_manager = WebSocketOrderManager(executor=self, event_bus=self._event_bus)
文件 B:src/trading/websocket_order_manager.py
构造函数:增加 event_bus 参数,在末尾完成订阅。event_bus=None 仅供单元测试使用;生产路径由 Executor 强制传入,缺省时打印 WARNING,便于排查问题:
def __init__(self, executor, event_bus=None):
self._executor = executor
self._tracking: dict[int, OrderTracking] = {}
self._lock = threading.Lock()
self._http_busy: set[int] = set()
self._event_bus = event_bus
if event_bus is not None:
event_bus.subscribe(OrderFilledEvent, self._on_order_filled_event)
else:
logger.warning(
"WebSocketOrderManager: event_bus 未注入,user 频道 fills 将丢失,"
"仅测试环境允许此配置"
)
辅助函数说明:_safe_int_oid 定义于 src/trading/websocket_order_manager.py:65,为模块级函数,将任意类型的 oid 安全转换为 int:
def _safe_int_oid(raw) -> int | None:
"""将 oid 安全转换为 int,失败返回 None"""
if raw is None:
return None
try:
return int(raw)
except (ValueError, TypeError):
return None
OrderFilledEvent.order_id 存为 str(oid),_safe_int_oid 负责将其还原为 int,与 _tracking 字典的 int key 对齐。
新增 handler:_on_order_filled_event
发布端已保证 metadata["fill_id"] 必然存在且格式正确。消费端直接读取,缺失时记录 WARNING 并跳过(不使用可能产生不一致 key 的备用路径):
def _on_order_filled_event(self, event: OrderFilledEvent) -> None:
oid = _safe_int_oid(event.order_id)
if oid is None:
return
fill_px = event.filled_price
fill_sz = event.filled_qty
if fill_px <= 0 or fill_sz <= 0:
return
# fill_id 由发布端通过 _make_fill_id 写入,此处强依赖
fill_id: str = event.metadata.get("fill_id") if event.metadata else None
if not fill_id:
logger.warning(
"OrderFilledEvent 缺少 fill_id,发布端未正确调用 _make_fill_id,"
"oid=%s,跳过此事件以避免去重 key 不一致", oid
)
return
should_resolve = False
resolve_tracking = None
with self._lock:
tracking = self._tracking.get(oid)
if tracking is None or tracking.status != OrderStatus.PENDING:
return
if fill_id in tracking._fill_ids:
logger.debug("跳过重复 fill: oid=%s fill_id=%s", oid, fill_id)
return
tracking._fill_ids.add(fill_id)
self._accumulate_fill(tracking, fill_px, fill_sz)
tracking.has_fill_price = True
tracking.fill_count += 1
logger.debug(
"EventBus fill 累计: %s oid=%s 本笔 px=%s sz=%s | "
"累计 avg_px=%.6g total_sz=%.6g fills=%s fill_id=%s",
event.symbol, oid, fill_px, fill_sz,
tracking.avg_price, tracking.filled_size,
tracking.fill_count, fill_id,
)
if tracking._ws_status is not None:
should_resolve = True
resolve_tracking = tracking
# _resolve 在锁外调用。两路 fill 事件(user 频道与 userFills 频道)可能并发进入此处:
# - 两次 _resolve 调用持有相同的 resolve_tracking 引用
# - _resolve 内部通过 identity check(current is not tracking)保证幂等:
# 第一次调用成功 pop _tracking[oid],第二次因 current is None 直接返回 False
# - 这是安全的幂等行为,两次并发最多只有一次成功解析
if should_resolve:
self._resolve(oid, resolve_tracking)
日志级别说明:
_on_order_filled_event使用logger.debug而非logger.info。原旧路径_on_user_fill使用logger.info(步骤三后删除),新路径采用debug以减少生产环境日志量。若需在生产环境追踪成交明细,建议通过专用的 TradeLogger 消费者(后续接入)而非调整此日志级别。
为何不保留备用 key:消费端自行构造备用 key 时,
event.timestamp(datetime.now()生成)与原始fill["time"](交易所毫秒时间戳)在值和精度上均不同,无法与发布端的_make_fill_id对齐。正确做法是在发布端保证 fill_id 必填,而非在消费端兜底。
_resolve 幂等性机制(websocket_order_manager.py:234):
两路并发 _resolve 调用的安全性由以下代码保证:
def _resolve(self, oid: int, tracking: OrderTracking, ...) -> bool:
with self._lock:
# identity check:确认 tracking 对象未被 track_order() 替换
current = self._tracking.get(oid)
if current is not tracking: # ← 幂等核心
return False # 第二次调用:pop 后 current 为 None,直接返回
if tracking.status != OrderStatus.PENDING:
return False
# ... 设置终态 ...
self._tracking.pop(oid, None) # ← 第一次调用成功 pop
tracking.result_event.set()
return True
第一次 _resolve 调用持锁成功 pop,第二次调用时 current is None,None is not tracking 为 True,立即返回 False。两次并发调用最多一次成功,result_event.set() 只执行一次。
取消订阅:在 shutdown() 方法中调用(websocket_order_manager.py:212):
def shutdown(self):
"""强制解析所有未完成订单为 CANCELED,解除 wait_for_order 阻塞"""
# 取消订阅,防止 shutdown 后残留 handler 被调用
if self._event_bus is not None:
self._event_bus.unsubscribe(OrderFilledEvent, self._on_order_filled_event)
with self._lock:
pending = [
(oid, t) for oid, t in list(self._tracking.items())
if t.status == OrderStatus.PENDING
]
if not pending:
return
logger.info(f"shutdown: 强制解析 {len(pending)} 个未完成订单")
for oid, tracking in pending:
self._resolve(oid, tracking, OrderStatus.CANCELED)
unsubscribe时序说明:unsubscribe先于清理_tracking执行,确保 shutdown 期间不会有新的 fill 事件进入已开始清理的 manager。EventBus.unsubscribe持_subscriber_lock,与publish互斥,线程安全。
⬜ 步骤三:移除 userFills 旧路由(待完成,依赖步骤二验证通过)
前提(量化验证标准):在测试网环境完成以下验证后,再执行此步骤:
- 下 10 笔以上限价单,追踪成功率 100%(PENDING → FILLED 状态正确流转)
- 手动触发 user 频道与 userFills 频道同时到达同一笔 fill,加权均价与 REST 查询结果误差 < 0.01%
- 查看日志,无 "跳过重复 fill" 以外的 WARNING/ERROR
- 单元测试层面并发验证:使用
threading.Thread模拟两路 fill 事件并发注入同一 oid,断言:
_accumulate_fill仅调用一次(通过 mock 计数)tracking.avg_price与预期值完全一致tracking.fill_count == 1- 此测试不依赖测试网,可在 CI 中自动运行
- 以上验证在主网灰度(单账户小仓位)复测通过
文件:src/services/realtime_kline_service_base.py(约第 639 行)
# 变更前
if channel in ("orderUpdates", "userFills"):
mgr.handle_message(msg)
# 变更后
if channel == "orderUpdates": # userFills 已由 EventBus 接管,不再直接路由
mgr.handle_message(msg)
缓冲区回放(重连恢复逻辑):同步修改,缓冲消息回放时同样只对 orderUpdates 调用 handle_message,不对 userFills 回放。
取舍说明:移除旧路由后,重连期间的历史 userFills 消息不会被补偿回放。这是有意的取舍:
- 断线期间的成交补偿应通过 REST API 查询订单状态,而非回放 WebSocket 原始消息(现有 HTTP 兜底逻辑不变)。
- 若未来需要精确补偿,应在
WebSocketReconnectedEvent的 handler 中主动拉取 fills,而不是回放原始 userFills。
步骤三验证通过后,必须删除以下死代码(步骤三完成后这两处将永远不被调用):
websocket_order_manager.py中的_on_user_fill方法(整个方法体)handle_message中对channel == "userFills"的分支
保留死代码会误导后续维护者以为该路径仍然生效,必须在步骤三验收时一并清理。
fill_id 生成规则(发布端与消费端必须一致)
| 优先级 | 规则 | 说明 |
|---|---|---|
| 1 | tid:{fill["tid"]} |
交易所唯一成交 ID(Hyperliquid WsFill.tid),带 tid: 前缀 |
| 2 | hash:{md5(oid:px:sz:time)} |
tid 不存在时的确定性备用 key,带 hash: 前缀 |
关键约束:
- 发布端与消费端统一调用
_make_fill_id(fill)(定义于src/events/trading_events.py:19),禁止使用裸字段读取方式——后者在 tid 存在时可能返回裸值(如12345),缺少tid:前缀,导致去重失效。 tid=0是有效值:使用if tid is not None判断(而非if tid),确保tid=0的 fill 返回"tid:0"。若用if tid:则tid=0被误判为无 tid,走 hash 路径,产生不同 key,去重失效。- hash 路径缺省值:各字段使用
fill.get(key)(无缺省值),字段缺失时值为None,f-string 产生"None:..."。若使用fill.get(key, "")则缺失字段产生":...",两路 hash 不同,去重失效。 px、sz使用原始字符串(不做 float 转换)。- 消费端强依赖
metadata["fill_id"],不自行构造备用 key(原因:event.timestamp与fill["time"]在值和精度上不同,无法对齐)。
联网文档与实测:响应体结构与 key 统一
官方数据结构(Hyperliquid)
- 文档:WebSocket Subscriptions 中定义:
userFills→WsUserFills { user, fills: WsFill[] }(fills 在嵌套字段中,不是 data 本身)userEvents(或部分实现中的user)→WsUserEvent可包含{ "fills": WsFill[] }
- WsFill 字段(与去重相关):
oid,tid(唯一成交 ID),px,sz,time,coin,side,fee,closedPnl等。两路 fill 来源均使用同一 WsFill 结构,故可用同一套 fill_id 规则。
实测与 key 协调
-
在测试网观察真实响应:运行脚本捕获 WebSocket 消息与 fill 结构:
TRADING_NETWORK=testnet python scripts/capture_fill_responses_testnet.py脚本会订阅
orderUpdates/userFills/user,在测试网下一笔小限价单(或手动下单),将收到的消息写入scripts/out/fill_responses_<ts>.json,并在控制台打印各 channel 下 fill 的字段集合与样本,便于确认tid、time等在两路中的存在与格式。 -
key/id 统一约定:
- 唯一标识:以 fill_id 作为去重键,与现有
OrderTracking._fill_ids一致。 - 生成方式:统一调用
_make_fill_id(fill):有tid时用tid:{tid},无tid时用hash:{md5(oid:px:sz:time)};发布端与消费端共用同一函数,保证 user 频道与 userFills 频道对同一笔成交得到相同 fill_id,从而只累计一次。
- 唯一标识:以 fill_id 作为去重键,与现有
-
实测结果(两次测试网捕获,数据完全一致):
项目 结论 实测时间 2026-02-22 12:27 / 2026-02-22 12:45(两次) userFills 顶层结构 data = {"isSnapshot": bool, "user": "0x...", "fills": [WsFill, ...]}fills 解析方式 必须取 data["fills"],不可直接用dataWsFill 字段 coin,px(str),sz(str),side(str),time(int ms),startPosition,dir,closedPnl,hash,oid(int),crossed,fee(str),tid(int),feeToken,twapIdtid 存在性 30+ 条 fill 均有 tid(整数),hash 备用路径极少触发px / sz 类型 字符串(如 "0.01804","14288.0"),_make_fill_id直接使用原始字符串,不做 float 转换oid 类型 整数(如 48913699327),OrderFilledEvent 存为str(oid)side 值 "A"(Ask/卖出)或"B"(Bid/买入),直接透传,无需转换orderUpdates 结构 data = [{"order": {...}, "status": "...", "statusTimestamp": int}],直接数组关键验证:
raw.get("fills", []) if isinstance(raw, dict) else raw解析方式与实测数据结构完全匹配,可正确提取 fills 数组。
测试矩阵
| 场景 | 验证点 |
|---|---|
| 仅 userFills 到达(常规路径) | 订单正确进入 FILLED,加权均价和手数正确 |
| 仅 user 频道 fills 到达 | 订单正确进入 FILLED,与上一行行为一致 |
| user 频道与 userFills 同时到达同一笔 fill(串行) | _fill_ids 去重,仅累计一次,加权均价不偏移 |
| user 频道与 userFills 并发到达(多线程单元测试) | _lock 保护有效,不出现重复累计,_accumulate_fill mock 计数=1,fill_count==1 |
| userFills 先到,orderUpdates 后到 | fills 累计后,orderUpdates 到达时触发 _resolve |
orderUpdates 先到(_ws_status 已设),userFills 后到 |
fill 到达时立即触发 _resolve |
| 同一 oid 多次部分成交(连续多笔 userFills) | 每笔 fill 独立累计,加权均价滚动正确,最终 avg_price 与 REST 查询误差 < 0.01% |
| metadata 为空或缺少 fill_id | handler 记录 WARNING 并跳过,不崩溃,不产生错误 key |
_make_fill_id 对同一 fill 两路结果一致性验证 |
user 频道与 userFills 频道对同一笔成交生成完全相同的 fill_id |
| manager 重建后重新注入 event_bus | 旧 handler 已 unsubscribe(shutdown 调用),新 handler 仅订阅一次(EventBus 自动去重),无残留 |
| manager shutdown() 与 event 处理并发 | unsubscribe 先于 _tracking 清理执行,shutdown 后 handler 不再被调用,无竞态崩溃 |
_resolve 并发两次调用(identity check) |
仅一次成功(result_event.set() 调用一次),第二次返回 False,tracking.status 最终态唯一 |
_on_order_filled_event 内 _accumulate_fill 抛出异常 |
EventBus per-handler try/except 捕获,异常隔离,不影响 WebSocket 消息线程,异常信息写入日志 |
| orderUpdates 的 canceled/rejected | 行为不变,grace 超时与 HTTP 兜底逻辑不受影响 |
| 重连后缓冲 orderUpdates 回放 | 仅 orderUpdates 回放,userFills 不回放,行为符合预期 |
| event_bus=None(测试模式) | 构造函数输出 WARNING,不崩溃 |
tid=0 的 fill 两路均出现 |
_make_fill_id 返回 "tid:0"(is not None 判断),去重正确生效,不走 hash 路径 |
fill 缺少 oid 或 px 等字段 |
hash 路径使用 fill.get(key)(无缺省值),缺失字段均为 None,f-string 产生相同结果,两路 hash 完全一致 |
| userFills 快照(isSnapshot=true)中含不在追踪的历史 fill | _on_order_filled_event 静默跳过(tracking is None),无副作用,不影响当前 PENDING 订单 |
_safe_int_oid 接收非整数字符串 |
返回 None,handler 提前退出,不崩溃 |
| 过渡期(步骤一B+步骤二完成,步骤三未执行)userFills 双路 | path A(_on_user_fill)与 path B(_on_order_filled_event)通过共享 tracking._fill_ids 去重,仅累计一次 |
| EventBus subscribe 重复调用同一 handler | EventBus 自动跳过重复注册,len(handlers)==1 |
变更文件汇总
| 文件 | 变更内容 | 状态 |
|---|---|---|
src/events/trading_events.py |
新增模块级函数 _make_fill_id(fill);OrderFilledEvent 类本身不变 |
✅ 已完成 |
src/utils/websocket/enhanced_ws_manager.py |
import _make_fill_id(替换 @staticmethod);_publish_user_events 调用 _make_fill_id 并含 metadata={"fill_id": fill_id};_cache_latest_data 新增 userFills 分支,修正 data 解析路径(data["fills"]) |
修改点 A ✅ / 修改点 B ⬜ |
src/trading/websocket_order_manager.py |
import _make_fill_id(_fill_key 已删除);构造函数增加 event_bus 参数;新增 _on_order_filled_event handler;shutdown() 时 unsubscribe;步骤三验证通过后必须删除 _on_user_fill 方法及 handle_message 中的 userFills 分支 |
import/删除 ✅ / 订阅 ⬜ |
src/trading/executor.py |
创建 WebSocketOrderManager 时传入 event_bus |
⬜ 待完成 |
src/services/realtime_kline_service_base.py |
on_message 和缓冲回放中,userFills 不再路由到 handle_message |
⬜ 步骤三 |