OrderFilledEvent 接入订单追踪系统3

OrderFilledEvent 接入订单追踪系统

现状诊断

当前 fill 处理路径

user 频道 data.fills  →  _publish_user_events()  →  OrderFilledEvent  →  🕳️ 无消费者(数据丢失)
userFills 频道        →  handle_message()         →  _on_user_fill()   →  ✅ 订单追踪正常

两条路径并存,结果截然不同:

  • user 频道的 fills_publish_user_events 已发布 OrderFilledEvent(优先级 CRITICAL),但 EventBus 上零订阅者,成交数据完全丢失,不进入订单追踪。
  • userFills 频道:通过 realtime_kline_service_base.on_message 路由至 WebSocketOrderManager.handle_message_on_user_fill,订单追踪正常工作。

根本问题

OrderFilledEvent 是"已搭好发射台但从未点火"的组件——有定义、有发布、有 CRITICAL 优先级,但没有任何消费者。这不是事件多余,而是订阅方缺位。

EventBus 特性确认

经代码核查,EventBus.publish同步阻塞调用,直接遍历 handler 列表执行,不涉及队列或线程池。因此:

  • 消息顺序与 WebSocket 到达顺序完全一致,无时序风险
  • publish(event) 在行为上等价于直接方法调用,无额外开销
  • 使用 EventBus 的目的是解耦(基础设施层不直接持有业务层引用),而非异步

设计目标

  1. 修复 user 频道 fills 丢失:让 WebSocketOrderManager 成为 OrderFilledEvent 的消费者。
  2. 统一成交入口:两路 fill 来源(user 频道 + userFills 频道)均通过 OrderFilledEvent 进入订单追踪,_fill_ids 去重保证不重复累计。
  3. 保持行为不变:订单状态机、加权均价计算、grace 超时、HTTP 兜底逻辑全部不动。
  4. 架构方向正确:基础设施层(enhanced_ws_manager)只发事件,业务层(WebSocketOrderManager)自行订阅,避免跨层耦合;后续 TradeLogger、RiskManager 等新消费者只需 subscribe,无需改动现有代码。

架构变化

变更前

realtime_kline_service_base.on_message
    ├── channel == "orderUpdates"  →  mgr.handle_message()  →  _on_order_update()
    └── channel == "userFills"     →  mgr.handle_message()  →  _on_user_fill()

enhanced_ws_manager._publish_user_events (user 频道 fills)
    └── publish(OrderFilledEvent)  →  【无消费者,数据丢失】

变更后

realtime_kline_service_base.on_message
    ├── channel == "orderUpdates"  →  mgr.handle_message()  →  _on_order_update()  ← 不变
    └── channel == "userFills"     →  【不再路由到 handle_message】

enhanced_ws_manager(两路来源,均发布带 fill_id 的事件)
    ├── _publish_user_events (user 频道 fills)   →  publish(OrderFilledEvent + fill_id)  ─┐
    └── _cache_latest_data  (userFills 频道)     →  publish(OrderFilledEvent + fill_id)  ─┤
                                                                                           ↓
                                                                               EventBus(同步)
                                                                                           ↓
                                                               WebSocketOrderManager._on_order_filled_event()
                                                                   └── _fill_ids 去重 → _accumulate_fill()

实现步骤

执行顺序:步骤一(修复发布端)→ 步骤二(接入消费端)→ 步骤三(移除旧路由)。
步骤一和步骤二完成、验证通过后,再执行步骤三;避免在新路径未验证前就切断旧路径。


步骤一:修复两路发布端,统一写入 fill_id

文件src/utils/websocket/enhanced_ws_manager.py

两处修改保持 fill_id 生成规则完全一致:优先取 fill["tid"],不存在时取 md5(f"{oid}:{px}:{sz}:{time}") ——与 websocket_order_manager._fill_key() 函数的逻辑相同,直接复用或提取为共享函数。

修改点 A_publish_user_events(user 频道,约第 1386 行)

在已有的 OrderFilledEvent 构造中补充 metadata

fill_id = fill.get("tid") or _fill_key(fill)  # 与 _fill_key() 规则一致
event = OrderFilledEvent(
    timestamp=datetime.now(),
    source="websocket",
    order_id=str(fill.get("oid", "")),
    client_order_id=fill.get("cloid"),
    symbol=fill.get("coin", ""),
    side=fill.get("side", ""),
    filled_qty=float(fill.get("sz", 0)),
    filled_price=float(fill.get("px", 0)),
    fee=float(fill.get("fee", 0)),
    closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
    metadata={"fill_id": fill_id},          # ← 新增
)
self._event_bus.publish(event)

修改点 B_cache_latest_data(userFills 频道,新增分支)

在现有 channel == "user" 分支旁新增:

elif channel == "userFills":
    raw = msg.get("data", {})
    fills = raw if isinstance(raw, list) else [raw]
    for fill in fills:
        oid = fill.get("oid")
        if not oid:
            continue
        fill_id = fill.get("tid") or _fill_key(fill)   # 与修改点 A 规则一致
        event = OrderFilledEvent(
            timestamp=datetime.now(),
            source="websocket",
            order_id=str(oid),
            client_order_id=fill.get("cloid"),
            symbol=fill.get("coin", ""),
            side=fill.get("side", ""),
            filled_qty=float(fill.get("sz", 0)),
            filled_price=float(fill.get("px", 0)),
            fee=float(fill.get("fee", 0)),
            closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
            metadata={"fill_id": fill_id},
        )
        self._event_bus.publish(event)

步骤二:WebSocketOrderManager 订阅 OrderFilledEvent

文件 Asrc/trading/executor.py(约第 143 行)

# 变更前
self._ws_order_manager = WebSocketOrderManager(executor=self)

# 变更后
self._ws_order_manager = WebSocketOrderManager(executor=self, event_bus=self._event_bus)

文件 Bsrc/trading/websocket_order_manager.py

构造函数:增加 event_bus 参数,在末尾完成订阅:

def __init__(self, executor, event_bus=None):
    self._executor = executor
    self._tracking: dict[int, OrderTracking] = {}
    self._lock = threading.Lock()
    self._http_busy: set[int] = set()

    self._event_bus = event_bus
    if event_bus is not None:
        event_bus.subscribe(OrderFilledEvent, self._on_order_filled_event)

event_bus=None 仅供单元测试使用;生产路径由 Executor 强制传入,不允许缺省运行。

新增 handler_on_order_filled_event

def _on_order_filled_event(self, event: OrderFilledEvent) -> None:
    oid = _safe_int_oid(event.order_id)
    if oid is None:
        return

    fill_px = event.filled_price
    fill_sz = event.filled_qty
    if fill_px <= 0 or fill_sz <= 0:
        return

    # fill_id:优先取发布端写入的值,不存在时回退到与 _fill_key 等价的备用规则
    fill_id: str = event.metadata.get("fill_id") or (
        f"{oid}:{fill_px}:{fill_sz}:{event.timestamp.timestamp()}"
    )

    should_resolve = False
    resolve_tracking = None

    with self._lock:
        tracking = self._tracking.get(oid)
        if tracking is None or tracking.status != OrderStatus.PENDING:
            return
        if fill_id in tracking._fill_ids:
            logger.debug("跳过重复 fill: oid=%s fill_id=%s", oid, fill_id)
            return

        tracking._fill_ids.add(fill_id)
        self._accumulate_fill(tracking, fill_px, fill_sz)
        tracking.has_fill_price = True
        tracking.fill_count += 1

        if tracking._ws_status is not None:
            should_resolve = True
            resolve_tracking = tracking

    if should_resolve:
        self._resolve(oid, resolve_tracking)

逻辑与现有 _on_user_fill 完全等价,仅将输入从原始 dict 换为 Event 对象。

取消订阅:在 manager 的 close()reset() 方法中调用:

if self._event_bus is not None:
    self._event_bus.unsubscribe(OrderFilledEvent, self._on_order_filled_event)

防止 manager 重建时 handler 残留或多次订阅。


步骤三:移除 userFills 旧路由

前提:步骤一、二已上线并验证订单追踪正常后,再执行此步骤。

文件src/services/realtime_kline_service_base.py(约第 639 行)

# 变更前
if channel in ("orderUpdates", "userFills"):
    mgr.handle_message(msg)

# 变更后
if channel == "orderUpdates":          # userFills 已由 EventBus 接管,不再直接路由
    mgr.handle_message(msg)

缓冲区回放(重连恢复逻辑):同步修改,缓冲消息回放时同样只对 orderUpdates 调用 handle_message,不对 userFills 回放。

说明:移除旧路由后,重连期间的历史 userFills 消息不会被补偿回放。这是有意的取舍:

  • 断线期间的成交补偿应通过 REST API 查询订单状态,而非回放 WebSocket 原始消息(现有 HTTP 兜底逻辑不变)。
  • 若未来需要精确补偿,应在 WebSocketReconnectedEvent 的 handler 中主动拉取 fills,而不是回放原始 userFills。

fill_id 生成规则(发布端与消费端必须一致)

优先级 规则 说明
1 tid:{fill["tid"]} 交易所唯一成交 ID(Hyperliquid WsFill.tid),与现有 _fill_key 格式一致
2 hash:{md5(oid:px:sz:time)} tid 不存在时的确定性备用 key,与现有 _fill_key 一致

重要:存储的 fill_id 必须与 websocket_order_manager._fill_key() 的返回值一致(带 tid:hash: 前缀),否则 user 与 userFills 两路对同一笔成交会产生不同 key,去重失效。发布端应统一调用 _fill_key(fill) 或共享的 _make_fill_id(fill),不要使用裸的 fill.get("tid")

发布端(两处:_publish_user_events_cache_latest_data)与消费端(_on_order_filled_event 的备用 key)的生成规则必须完全一致。建议将该逻辑提取为模块级私有函数 _make_fill_id(fill: dict) -> str,两侧共享调用。


联网文档与实测:响应体结构与 key 统一

官方数据结构(Hyperliquid)

  • 文档WebSocket Subscriptions 中定义:
    • userFillsWsUserFills { user, fills: WsFill[] }
    • userEvents(或部分实现中的 user)→ WsUserEvent 可包含 { "fills": WsFill[] }
  • WsFill 字段(与去重相关):oid, tid(唯一成交 ID), px, sz, time, coin, side, fee, closedPnl, 等。两路 fill 来源均使用同一 WsFill 结构,故可用同一套 fill_id 规则。

实测与 key 协调

  1. 在测试网观察真实响应:运行脚本捕获 WebSocket 消息与 fill 结构:

    TRADING_NETWORK=testnet python scripts/capture_fill_responses_testnet.py
    

    脚本会订阅 orderUpdates / userFills / user,在测试网下一笔小限价单(或手动下单),将收到的消息写入 scripts/out/fill_responses_<ts>.json,并在控制台打印各 channel 下 fill 的字段集合与样本,便于确认 tidtime 等在两路中的存在与格式。

  2. key/id 统一约定

    • 唯一标识:以 fill_id 作为去重键,与现有 OrderTracking._fill_ids 一致。
    • 生成方式:与 _fill_key(fill) 保持一致:有 tid 时用 tid:{tid},无 tid 时用 hash:{md5(oid:px:sz:time)};发布端与消费端共用同一函数或同一规则,保证 user 频道与 userFills 频道对同一笔成交得到相同 fill_id,从而只累计一次。
  3. 实测结果(测试网):运行上述脚本后,userFills 的 fill 对象中可见字段包括:oid, tid, px, sz, time, coin, side, fee, closedPnl, hash, crossed, dir, startPosition, feeToken, twapId。其中 tid 为数字、time 为毫秒时间戳,与官方 WsFill 一致;去重时应优先使用 tid,备用 key 使用 oid:px:sz:time(注意 API 中 px/sz 可能为字符串,需与 _fill_key 一致做类型处理)。


测试矩阵

场景 验证点
仅 userFills 到达(常规路径) 订单正确进入 FILLED,加权均价和手数正确
仅 user 频道 fills 到达 订单正确进入 FILLED,与上一行行为一致
user 频道与 userFills 同时到达同一笔 fill _fill_ids 去重,仅累计一次,加权均价不偏移
userFills 先到,orderUpdates 后到 fills 累计后,orderUpdates 到达时触发 _resolve
orderUpdates 先到(_ws_status 已设),userFills 后到 fill 到达时立即触发 _resolve
fill_id 缺失(metadata 为空) 备用 key 生成,handler 不崩溃,去重仍有效
manager 重建后重新注入 event_bus 旧 handler 已 unsubscribe,新 handler 仅订阅一次
orderUpdates 的 canceled/rejected 行为不变,grace 超时与 HTTP 兜底逻辑不受影响
重连后缓冲 orderUpdates 回放 仅 orderUpdates 回放,userFills 不回放,行为符合预期

变更文件汇总

文件 变更内容
src/utils/websocket/enhanced_ws_manager.py _publish_user_events 补 fill_id;_cache_latest_data 新增 userFills 分支发布事件
src/trading/websocket_order_manager.py 构造函数增加 event_bus 参数;新增 _on_order_filled_event handler;close/reset 时 unsubscribe
src/trading/executor.py 创建 WebSocketOrderManager 时传入 event_bus
src/services/realtime_kline_service_base.py on_message 和缓冲回放中,userFills 不再路由到 handle_message

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG