OrderFilledEvent 接入订单追踪系统8

OrderFilledEvent 接入订单追踪系统

现状诊断

当前 fill 处理路径

user 频道 data.fills  →  _publish_user_events()  →  OrderFilledEvent  →  🕳️ 无消费者(数据丢失)
userFills 频道        →  handle_message()         →  _on_user_fill()   →  直接累计(旧直连路径)

两条路径状态不一致

  • user 频道的 fills_publish_user_events 已发布 OrderFilledEvent(优先级 CRITICAL,metadata 含正确 fill_id),但 EventBus 上零订阅者,成交数据完全丢失,不进入订单追踪。
  • userFills 频道_on_user_fill 已正确解析 data["fills"] 嵌套结构(含向后兼容),通过 _make_fill_id 去重后直接调用 _accumulate_fill走的是旧直连路径而非 EventBus

根本问题

唯一遗留问题OrderFilledEvent 是"已搭好发射台但从未点火"的组件——有定义、有发布、有 CRITICAL 优先级,但没有任何消费者。这不是事件多余,而是订阅方缺位

两路来源中,userFills 频道目前能正常工作(通过旧直连路径),user 频道的成交数据完全丢失。

已完成的修复

项目 状态 说明
_on_user_fill 解析路径 ✅ 已修复 正确取 data["fills"],含 isSnapshot 处理和旧格式兼容
_make_fill_id 共享函数 ✅ 已完成 提取至 src/events/trading_events.py 模块级,_fill_key 已删除
_publish_user_events 发布端 ✅ 已完成 调用 _make_fill_idmetadata={"fill_id": fill_id} 已写入

EventBus 特性确认

经代码核查,EventBus.publish同步阻塞调用,直接遍历 handler 列表执行,不涉及队列或线程池。因此:

  • 消息顺序与 WebSocket 到达顺序完全一致,无时序风险
  • publish(event) 在行为上等价于直接方法调用,无额外开销
  • 使用 EventBus 的目的是解耦(基础设施层不直接持有业务层引用),而非异步
  • 同步调用意味着 WebSocket 消息处理线程会阻塞至 handler 执行完毕;正常情况下延迟极低(微秒级),但需注意避免在 handler 内执行耗时操作

设计目标

  1. 修复 user 频道 fills 丢失:让 WebSocketOrderManager 成为 OrderFilledEvent 的消费者。
  2. 统一成交入口:两路 fill 来源(user 频道 + userFills 频道)均通过 OrderFilledEvent 进入订单追踪,_fill_ids 去重保证不重复累计。
  3. 保持行为不变:订单状态机、加权均价计算、grace 超时、HTTP 兜底逻辑全部不动。
  4. 架构方向正确:基础设施层(enhanced_ws_manager)只发事件,业务层(WebSocketOrderManager)自行订阅,避免跨层耦合;后续 TradeLogger、RiskManager 等新消费者只需 subscribe,无需改动现有代码。

架构变化

变更前

realtime_kline_service_base.on_message
    ├── channel == "orderUpdates"  →  mgr.handle_message()  →  _on_order_update()
    └── channel == "userFills"     →  mgr.handle_message()  →  _on_user_fill()

enhanced_ws_manager._publish_user_events (user 频道 fills)
    └── publish(OrderFilledEvent)  →  【无消费者,数据丢失】

变更后

realtime_kline_service_base.on_message
    ├── channel == "orderUpdates"  →  mgr.handle_message()  →  _on_order_update()  ← 不变
    └── channel == "userFills"     →  【不再路由到 handle_message】

enhanced_ws_manager(两路来源,均通过 _make_fill_id 发布带前缀 fill_id 的事件)
    ├── _publish_user_events (user 频道 fills)   →  publish(OrderFilledEvent + fill_id)  ─┐
    └── _cache_latest_data  (userFills 频道)     →  publish(OrderFilledEvent + fill_id)  ─┤
                                                                                           ↓
                                                                               EventBus(同步)
                                                                                           ↓
                                                               WebSocketOrderManager._on_order_filled_event()
                                                                   └── _fill_ids 去重 → _accumulate_fill()

实现步骤

执行顺序:前提(提取共享函数)→ 步骤一(修复发布端)→ 步骤二(接入消费端)→ 步骤三(移除旧路由)。
步骤一和步骤二完成、验证通过(见步骤三前提标准)后,再执行步骤三;避免在新路径未验证前就切断旧路径。


✅ 前提:提取共享函数 _make_fill_id(已完成)

这是本次变更的基础。发布端两处与消费端必须调用同一函数,否则同一笔成交在两路产生不同 key,去重失效。

当前状态:已完成。_make_fill_id 现为 src/events/trading_events.py 的模块级函数,原 websocket_order_manager._fill_key 已删除,两处调用均已迁移至 _make_fill_id

函数定义(位于 src/events/trading_events.py 顶部):

import hashlib

def _make_fill_id(fill: dict) -> str:
    """
    生成 fill 的唯一去重 key。
    - 有 tid(含 tid=0):返回 "tid:{tid}"
    - 无 tid(即 None):返回 "hash:{md5(oid:px:sz:time)}"

    注意:
    - 使用 `tid is not None` 判断,确保 tid=0 视为有效值。
    - hash 路径各字段直接使用 fill.get(key)(无缺省值),缺失时值为 None,
      f-string 产生 "None:...",两路来源必须完全一致才能正确去重。
    - px/sz 使用原始字符串,不做 float 转换。
    """
    tid = fill.get("tid")
    if tid is not None:
        return f"tid:{tid}"
    raw = f"{fill.get('oid')}:{fill.get('px')}:{fill.get('sz')}:{fill.get('time')}"
    return f"hash:{hashlib.md5(raw.encode()).hexdigest()}"

两侧 import 方式:

from src.events.trading_events import _make_fill_id

步骤一:修复两路发布端,统一写入 fill_id

事件定义说明OrderFilledEvent 继承自 Event,已有 metadata 字段,无需改 OrderFilledEvent 类本身

文件src/utils/websocket/enhanced_ws_manager.py

✅ 修改点 A:_publish_user_events(user 频道)—— 已完成

当前代码已正确调用 _make_fill_id 并写入 metadata

fill_id = _make_fill_id(fill)
event = OrderFilledEvent(
    timestamp=datetime.now(),
    source="websocket",
    order_id=str(fill.get("oid", "")),
    client_order_id=fill.get("cloid"),
    symbol=fill.get("coin", ""),
    side=fill.get("side", ""),
    filled_qty=float(fill.get("sz", 0)),
    filled_price=float(fill.get("px", 0)),
    fee=float(fill.get("fee", 0)),
    closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
    metadata={"fill_id": fill_id},
)
self._event_bus.publish(event)

⬜ 修改点 B:_cache_latest_data(userFills 频道)—— 待完成

在现有 channel == "user" 分支旁新增 elif channel == "userFills" 分支。

注意数据结构(测试网实测验证):Hyperliquid userFills 推送的顶层 data 为 WsUserFills { isSnapshot?, user, fills: WsFill[] },fills 数组在 data["fills"] 下,不是 data 本身。快照消息含 isSnapshot: true,实时成交消息同样使用此结构(isSnapshot 缺省或为 false)。两种情况均需取 data["fills"]

elif channel == "userFills":
    raw = msg.get("data", {})
    # data 结构为 WsUserFills { user, fills: WsFill[] },取 fills 字段
    fills = raw.get("fills", []) if isinstance(raw, dict) else raw
    for fill in fills:
        oid = fill.get("oid")
        if oid is None:
            continue
        fill_id = _make_fill_id(fill)
        event = OrderFilledEvent(
            timestamp=datetime.now(),
            source="websocket",
            order_id=str(oid),
            client_order_id=fill.get("cloid"),
            symbol=fill.get("coin", ""),
            side=fill.get("side", ""),
            filled_qty=float(fill.get("sz", 0)),
            filled_price=float(fill.get("px", 0)),
            fee=float(fill.get("fee", 0)),
            closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
            metadata={"fill_id": fill_id},
        )
        self._event_bus.publish(event)

快照消息处理说明:userFills 首次连接时会推送一条快照(isSnapshot: true),包含该账户的历史成交数据。

  • 已结束订单(不在 _tracking 中,或 status ≠ PENDING):消费端 _on_order_filled_eventtracking is None / tracking.status != PENDING 分支静默跳过,无副作用
  • 重连时仍在 PENDING 的订单:快照中的历史 fill 会被正常累计并触发 _resolve,这是预期行为——补偿重连期间遗漏的成交数据。

两种情况均无需区分处理,现有 _tracking.get(oid) 防护已覆盖全部场景。


⬜ 步骤二:WebSocketOrderManager 订阅 OrderFilledEvent(待完成)

文件 Asrc/trading/executor.py(约第 143 行)

# 变更前
self._ws_order_manager = WebSocketOrderManager(executor=self)

# 变更后
self._ws_order_manager = WebSocketOrderManager(executor=self, event_bus=self._event_bus)

文件 Bsrc/trading/websocket_order_manager.py

构造函数:增加 event_bus 参数,在末尾完成订阅。event_bus=None 仅供单元测试使用;生产路径由 Executor 强制传入,缺省时打印 WARNING,便于排查问题:

def __init__(self, executor, event_bus=None):
    self._executor = executor
    self._tracking: dict[int, OrderTracking] = {}
    self._lock = threading.Lock()
    self._http_busy: set[int] = set()

    self._event_bus = event_bus
    if event_bus is not None:
        event_bus.subscribe(OrderFilledEvent, self._on_order_filled_event)
    else:
        logger.warning(
            "WebSocketOrderManager: event_bus 未注入,user 频道 fills 将丢失,"
            "仅测试环境允许此配置"
        )

新增 handler_on_order_filled_event

发布端已保证 metadata["fill_id"] 必然存在且格式正确。消费端直接读取,缺失时记录 WARNING 并跳过(不使用可能产生不一致 key 的备用路径):

def _on_order_filled_event(self, event: OrderFilledEvent) -> None:
    oid = _safe_int_oid(event.order_id)
    if oid is None:
        return

    fill_px = event.filled_price
    fill_sz = event.filled_qty
    if fill_px <= 0 or fill_sz <= 0:
        return

    # fill_id 由发布端通过 _make_fill_id 写入,此处强依赖
    fill_id: str = event.metadata.get("fill_id") if event.metadata else None
    if not fill_id:
        logger.warning(
            "OrderFilledEvent 缺少 fill_id,发布端未正确调用 _make_fill_id,"
            "oid=%s,跳过此事件以避免去重 key 不一致", oid
        )
        return

    should_resolve = False
    resolve_tracking = None

    with self._lock:
        tracking = self._tracking.get(oid)
        if tracking is None or tracking.status != OrderStatus.PENDING:
            return
        if fill_id in tracking._fill_ids:
            logger.debug("跳过重复 fill: oid=%s fill_id=%s", oid, fill_id)
            return

        tracking._fill_ids.add(fill_id)
        self._accumulate_fill(tracking, fill_px, fill_sz)
        tracking.has_fill_price = True
        tracking.fill_count += 1
        logger.info(
            "EventBus fill 累计: %s oid=%s 本笔 px=%s sz=%s | "
            "累计 avg_px=%.6g total_sz=%.6g fills=%s fill_id=%s",
            event.symbol, oid, fill_px, fill_sz,
            tracking.avg_price, tracking.filled_size,
            tracking.fill_count, fill_id,
        )

        if tracking._ws_status is not None:
            should_resolve = True
            resolve_tracking = tracking

    # _resolve 在锁外调用。两路 fill 事件(user 频道与 userFills 频道)可能并发进入此处:
    # - 两次 _resolve 调用持有相同的 resolve_tracking 引用
    # - _resolve 内部通过 identity check(current is not tracking)保证幂等:
    #   第一次调用成功 pop _tracking[oid],第二次因 current is None 直接返回 False
    # - 这是安全的幂等行为,两次并发最多只有一次成功解析
    if should_resolve:
        self._resolve(oid, resolve_tracking)

为何不保留备用 key:消费端自行构造备用 key 时,event.timestampdatetime.now() 生成)与原始 fill["time"](交易所毫秒时间戳)在值和精度上均不同,无法与发布端的 _make_fill_id 对齐。正确做法是在发布端保证 fill_id 必填,而非在消费端兜底。

取消订阅:在 manager 的 close()reset() 方法中调用:

if self._event_bus is not None:
    self._event_bus.unsubscribe(OrderFilledEvent, self._on_order_filled_event)

防止 manager 重建时 handler 残留或多次订阅。


⬜ 步骤三:移除 userFills 旧路由(待完成,依赖步骤二验证通过)

前提(量化验证标准):在测试网环境完成以下验证后,再执行此步骤:

  1. 下 10 笔以上限价单,追踪成功率 100%(PENDING → FILLED 状态正确流转)
  2. 手动触发 user 频道与 userFills 频道同时到达同一笔 fill,加权均价与 REST 查询结果误差 < 0.01%
  3. 查看日志,无 "跳过重复 fill" 以外的 WARNING/ERROR
  4. 以上验证在主网灰度(单账户小仓位)复测通过

文件src/services/realtime_kline_service_base.py(约第 639 行)

# 变更前
if channel in ("orderUpdates", "userFills"):
    mgr.handle_message(msg)

# 变更后
if channel == "orderUpdates":          # userFills 已由 EventBus 接管,不再直接路由
    mgr.handle_message(msg)

缓冲区回放(重连恢复逻辑):同步修改,缓冲消息回放时同样只对 orderUpdates 调用 handle_message,不对 userFills 回放。

取舍说明:移除旧路由后,重连期间的历史 userFills 消息不会被补偿回放。这是有意的取舍:

  • 断线期间的成交补偿应通过 REST API 查询订单状态,而非回放 WebSocket 原始消息(现有 HTTP 兜底逻辑不变)。
  • 若未来需要精确补偿,应在 WebSocketReconnectedEvent 的 handler 中主动拉取 fills,而不是回放原始 userFills。

步骤三验证通过后,必须删除以下死代码(步骤三完成后这两处将永远不被调用):

  1. websocket_order_manager.py 中的 _on_user_fill 方法(整个方法体)
  2. handle_message 中对 channel == "userFills" 的分支

保留死代码会误导后续维护者以为该路径仍然生效,必须在步骤三验收时一并清理。


fill_id 生成规则(发布端与消费端必须一致)

优先级 规则 说明
1 tid:{fill["tid"]} 交易所唯一成交 ID(Hyperliquid WsFill.tid),带 tid: 前缀
2 hash:{md5(oid:px:sz:time)} tid 不存在时的确定性备用 key,带 hash: 前缀

关键约束

  • 发布端与消费端统一调用 _make_fill_id(fill)(定义于 src/events/trading_events.py),禁止使用裸字段读取方式——后者在 tid 存在时可能返回裸值(如 12345),缺少 tid: 前缀,导致去重失效。
  • tid=0 是有效值:使用 if tid is not None 判断(而非 if tid),确保 tid=0 的 fill 返回 "tid:0"。若用 if tid:tid=0 被误判为无 tid,走 hash 路径,产生不同 key,去重失效。
  • hash 路径缺省值:各字段使用 fill.get(key)(无缺省值),字段缺失时值为 None,f-string 产生 "None:..."。若使用 fill.get(key, "") 则缺失字段产生 ":...",两路 hash 不同,去重失效。
  • pxsz 使用原始字符串(不做 float 转换)。
  • 消费端强依赖 metadata["fill_id"],不自行构造备用 key(原因:event.timestampfill["time"] 在值和精度上不同,无法对齐)。

联网文档与实测:响应体结构与 key 统一

官方数据结构(Hyperliquid)

  • 文档WebSocket Subscriptions 中定义:
    • userFillsWsUserFills { user, fills: WsFill[] }fills 在嵌套字段中,不是 data 本身
    • userEvents(或部分实现中的 user)→ WsUserEvent 可包含 { "fills": WsFill[] }
  • WsFill 字段(与去重相关):oid, tid(唯一成交 ID), px, sz, time, coin, side, fee, closedPnl 等。两路 fill 来源均使用同一 WsFill 结构,故可用同一套 fill_id 规则。

实测与 key 协调

  1. 在测试网观察真实响应:运行脚本捕获 WebSocket 消息与 fill 结构:

    TRADING_NETWORK=testnet python scripts/capture_fill_responses_testnet.py
    

    脚本会订阅 orderUpdates / userFills / user,在测试网下一笔小限价单(或手动下单),将收到的消息写入 scripts/out/fill_responses_<ts>.json,并在控制台打印各 channel 下 fill 的字段集合与样本,便于确认 tidtime 等在两路中的存在与格式。

  2. key/id 统一约定

    • 唯一标识:以 fill_id 作为去重键,与现有 OrderTracking._fill_ids 一致。
    • 生成方式:统一调用 _make_fill_id(fill):有 tid 时用 tid:{tid},无 tid 时用 hash:{md5(oid:px:sz:time)};发布端与消费端共用同一函数,保证 user 频道与 userFills 频道对同一笔成交得到相同 fill_id,从而只累计一次。
  3. 实测结果(两次测试网捕获,数据完全一致)

    项目 结论
    实测时间 2026-02-22 12:27 / 2026-02-22 12:45(两次)
    userFills 顶层结构 data = {"isSnapshot": bool, "user": "0x...", "fills": [WsFill, ...]}
    fills 解析方式 必须取 data["fills"],不可直接用 data
    WsFill 字段 coin, px(str), sz(str), side(str), time(int ms), startPosition, dir, closedPnl, hash, oid(int), crossed, fee(str), tid(int), feeToken, twapId
    tid 存在性 30+ 条 fill 均有 tid(整数),hash 备用路径极少触发
    px / sz 类型 字符串(如 "0.01804", "14288.0"),_make_fill_id 直接使用原始字符串,不做 float 转换
    oid 类型 整数(如 48913699327),OrderFilledEvent 存为 str(oid)
    side 值 "A"(Ask/卖出)或 "B"(Bid/买入),直接透传,无需转换
    orderUpdates 结构 data = [{"order": {...}, "status": "...", "statusTimestamp": int}],直接数组

    关键验证raw.get("fills", []) if isinstance(raw, dict) else raw 解析方式与实测数据结构完全匹配,可正确提取 fills 数组。


测试矩阵

场景 验证点
仅 userFills 到达(常规路径) 订单正确进入 FILLED,加权均价和手数正确
仅 user 频道 fills 到达 订单正确进入 FILLED,与上一行行为一致
user 频道与 userFills 同时到达同一笔 fill(串行) _fill_ids 去重,仅累计一次,加权均价不偏移
user 频道与 userFills 并发到达(多线程) _lock 保护有效,不出现重复累计,加权均价正确
userFills 先到,orderUpdates 后到 fills 累计后,orderUpdates 到达时触发 _resolve
orderUpdates 先到(_ws_status 已设),userFills 后到 fill 到达时立即触发 _resolve
同一 oid 多次部分成交(连续多笔 userFills) 每笔 fill 独立累计,加权均价滚动正确,最终 avg_price 与 REST 查询误差 < 0.01%
metadata 为空或缺少 fill_id handler 记录 WARNING 并跳过,不崩溃,不产生错误 key
_make_fill_id 对同一 fill 两路结果一致性验证 user 频道与 userFills 频道对同一笔成交生成完全相同的 fill_id
manager 重建后重新注入 event_bus 旧 handler 已 unsubscribe,新 handler 仅订阅一次,无残留
manager close() 与 event 处理并发 unsubscribe 后 handler 不再被调用,无竞态崩溃
orderUpdates 的 canceled/rejected 行为不变,grace 超时与 HTTP 兜底逻辑不受影响
重连后缓冲 orderUpdates 回放 仅 orderUpdates 回放,userFills 不回放,行为符合预期
event_bus=None(测试模式) 构造函数输出 WARNING,不崩溃
tid=0 的 fill 两路均出现 _make_fill_id 返回 "tid:0"is not None 判断),去重正确生效,不走 hash 路径
fill 缺少 oidpx 等字段 hash 路径使用 fill.get(key)(无缺省值),缺失字段均为 None,f-string 产生相同结果,两路 hash 完全一致
userFills 快照(isSnapshot=true)中含不在追踪的历史 fill _on_order_filled_event 静默跳过(tracking is None),无副作用,不影响当前 PENDING 订单

变更文件汇总

文件 变更内容 状态
src/events/trading_events.py 新增模块级函数 _make_fill_id(fill)OrderFilledEvent 类本身不变 ✅ 已完成
src/utils/websocket/enhanced_ws_manager.py import _make_fill_id(替换 @staticmethod);_publish_user_events 调用 _make_fill_id 并含 metadata={"fill_id": fill_id}_cache_latest_data 新增 userFills 分支,修正 data 解析路径(data["fills"] 修改点 A ✅ / 修改点 B ⬜
src/trading/websocket_order_manager.py import _make_fill_id_fill_key 已删除);构造函数增加 event_bus 参数;新增 _on_order_filled_event handler;close/reset 时 unsubscribe;步骤三验证通过后必须删除 _on_user_fill 方法及 handle_message 中的 userFills 分支 import/删除 ✅ / 订阅 ⬜
src/trading/executor.py 创建 WebSocketOrderManager 时传入 event_bus ⬜ 待完成
src/services/realtime_kline_service_base.py on_message 和缓冲回放中,userFills 不再路由到 handle_message ⬜ 步骤三

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG