OrderFilledEvent 接入订单追踪系统5

OrderFilledEvent 接入订单追踪系统

现状诊断

当前 fill 处理路径

user 频道 data.fills  →  _publish_user_events()  →  OrderFilledEvent  →  🕳️ 无消费者(数据丢失)
userFills 频道        →  handle_message()         →  _on_user_fill()   →  🕳️ 解析路径错误(数据丢失)

两条路径均静默失效

  • user 频道的 fills_publish_user_events 已发布 OrderFilledEvent(优先级 CRITICAL),但 EventBus 上零订阅者,成交数据完全丢失,不进入订单追踪。
  • userFills 频道:通过 realtime_kline_service_base.on_message 路由至 WebSocketOrderManager.handle_message_on_user_fill,但解析路径错误,所有 fills 被静默跳过(见下方说明)。

根本问题

问题一OrderFilledEvent 是"已搭好发射台但从未点火"的组件——有定义、有发布、有 CRITICAL 优先级,但没有任何消费者。这不是事件多余,而是订阅方缺位。

问题二(测试网实测发现)_on_user_fill 的解析路径与 Hyperliquid 实际推送结构不符。

Hyperliquid userFills 的实际 data 结构(两次测试网捕获一致):

{
  "channel": "userFills",
  "data": {
    "isSnapshot": true,
    "user": "0x...",
    "fills": [ { "oid": ..., "tid": ..., "px": "...", ... } ]
  }
}

而现有 _on_user_fill 代码:

raw = message.get("data")          # → {"isSnapshot": true, "user": "...", "fills": [...]}(dict)
fills = raw if isinstance(raw, list) else [raw]  # → [raw],把整个 dict 当作一个 fill
for fill_data in fills:
    oid = _safe_int_oid(fill_data.get("oid"))   # → None(dict 无 "oid" key)
    if oid is None:
        continue                    # → 所有 fills 均被跳过

data 是嵌套 dict(非列表),代码错误地将整个 data 对象包装为单个 fill,导致 oid 始终为 None,所有 fills 静默跳过,订单追踪从未真正工作

EventBus 特性确认

经代码核查,EventBus.publish同步阻塞调用,直接遍历 handler 列表执行,不涉及队列或线程池。因此:

  • 消息顺序与 WebSocket 到达顺序完全一致,无时序风险
  • publish(event) 在行为上等价于直接方法调用,无额外开销
  • 使用 EventBus 的目的是解耦(基础设施层不直接持有业务层引用),而非异步

设计目标

  1. 修复 user 频道 fills 丢失:让 WebSocketOrderManager 成为 OrderFilledEvent 的消费者。
  2. 统一成交入口:两路 fill 来源(user 频道 + userFills 频道)均通过 OrderFilledEvent 进入订单追踪,_fill_ids 去重保证不重复累计。
  3. 保持行为不变:订单状态机、加权均价计算、grace 超时、HTTP 兜底逻辑全部不动。
  4. 架构方向正确:基础设施层(enhanced_ws_manager)只发事件,业务层(WebSocketOrderManager)自行订阅,避免跨层耦合;后续 TradeLogger、RiskManager 等新消费者只需 subscribe,无需改动现有代码。

架构变化

变更前

realtime_kline_service_base.on_message
    ├── channel == "orderUpdates"  →  mgr.handle_message()  →  _on_order_update()
    └── channel == "userFills"     →  mgr.handle_message()  →  _on_user_fill()

enhanced_ws_manager._publish_user_events (user 频道 fills)
    └── publish(OrderFilledEvent)  →  【无消费者,数据丢失】

变更后

realtime_kline_service_base.on_message
    ├── channel == "orderUpdates"  →  mgr.handle_message()  →  _on_order_update()  ← 不变
    └── channel == "userFills"     →  【不再路由到 handle_message】

enhanced_ws_manager(两路来源,均通过 _make_fill_id 发布带前缀 fill_id 的事件)
    ├── _publish_user_events (user 频道 fills)   →  publish(OrderFilledEvent + fill_id)  ─┐
    └── _cache_latest_data  (userFills 频道)     →  publish(OrderFilledEvent + fill_id)  ─┤
                                                                                           ↓
                                                                               EventBus(同步)
                                                                                           ↓
                                                               WebSocketOrderManager._on_order_filled_event()
                                                                   └── _fill_ids 去重 → _accumulate_fill()

实现步骤

执行顺序:前提(提取共享函数)→ 步骤一(修复发布端)→ 步骤二(接入消费端)→ 步骤三(移除旧路由)。
步骤一和步骤二完成、验证通过(见步骤三前提标准)后,再执行步骤三;避免在新路径未验证前就切断旧路径。


前提:提取共享函数 _make_fill_id

这是本次变更的基础。发布端两处与消费端必须调用同一函数,否则同一笔成交在两路产生不同 key,去重失效。

src/utils/websocket/enhanced_ws_manager.py 顶部(模块级)新增:

import hashlib

def _make_fill_id(fill: dict) -> str:
    """
    生成 fill 的唯一去重 key,规则与 websocket_order_manager._fill_key() 完全一致。
    - 有 tid:返回 "tid:{tid}"
    - 无 tid:返回 "hash:{md5(oid:px:sz:time)}"
    注意:px/sz 直接使用原始字符串,不做类型转换,与 _fill_key 保持一致。
    """
    tid = fill.get("tid")
    if tid:
        return f"tid:{tid}"
    oid = fill.get("oid", "")
    px = fill.get("px", "")
    sz = fill.get("sz", "")
    t = fill.get("time", "")
    return f"hash:{hashlib.md5(f'{oid}:{px}:{sz}:{t}'.encode()).hexdigest()}"

同时将 websocket_order_manager._fill_key() 迁移为调用此共享函数(或直接内联相同逻辑),确保两侧行为完全一致。


步骤一:修复两路发布端,统一写入 fill_id

文件src/utils/websocket/enhanced_ws_manager.py

两处修改均调用 _make_fill_id(fill),保证 fill_id 带有 tid:hash: 前缀,与 _fill_key() 返回值格式完全一致。

修改点 A_publish_user_events(user 频道,约第 1386 行)

在已有的 OrderFilledEvent 构造中,将裸字段读取替换为 _make_fill_id 调用,并补充 metadata

fill_id = _make_fill_id(fill)          # ← 使用共享函数,保证带前缀
event = OrderFilledEvent(
    timestamp=datetime.now(),
    source="websocket",
    order_id=str(fill.get("oid", "")),
    client_order_id=fill.get("cloid"),
    symbol=fill.get("coin", ""),
    side=fill.get("side", ""),
    filled_qty=float(fill.get("sz", 0)),
    filled_price=float(fill.get("px", 0)),
    fee=float(fill.get("fee", 0)),
    closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
    metadata={"fill_id": fill_id},     # ← 新增,消费端依赖此字段去重
)
self._event_bus.publish(event)

修改点 B_cache_latest_data(userFills 频道,新增分支)

在现有 channel == "user" 分支旁新增。

注意数据结构(测试网实测验证):Hyperliquid userFills 推送的顶层 data 为 WsUserFills { isSnapshot?, user, fills: WsFill[] },fills 数组在 data["fills"] 下,不是 data 本身。快照消息含 isSnapshot: true,实时成交消息同样使用此结构(isSnapshot 缺省或为 false)。两种情况均需取 data["fills"]

elif channel == "userFills":
    raw = msg.get("data", {})
    # data 结构为 WsUserFills { user, fills: WsFill[] },取 fills 字段
    fills = raw.get("fills", []) if isinstance(raw, dict) else raw
    for fill in fills:
        oid = fill.get("oid")
        if not oid:
            continue
        fill_id = _make_fill_id(fill)   # ← 与修改点 A 完全一致
        event = OrderFilledEvent(
            timestamp=datetime.now(),
            source="websocket",
            order_id=str(oid),
            client_order_id=fill.get("cloid"),
            symbol=fill.get("coin", ""),
            side=fill.get("side", ""),
            filled_qty=float(fill.get("sz", 0)),
            filled_price=float(fill.get("px", 0)),
            fee=float(fill.get("fee", 0)),
            closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
            metadata={"fill_id": fill_id},
        )
        self._event_bus.publish(event)

步骤二:WebSocketOrderManager 订阅 OrderFilledEvent

文件 Asrc/trading/executor.py(约第 143 行)

# 变更前
self._ws_order_manager = WebSocketOrderManager(executor=self)

# 变更后
self._ws_order_manager = WebSocketOrderManager(executor=self, event_bus=self._event_bus)

文件 Bsrc/trading/websocket_order_manager.py

构造函数:增加 event_bus 参数,在末尾完成订阅。event_bus=None 仅供单元测试使用;生产路径由 Executor 强制传入,缺省时打印 WARNING,便于排查问题:

def __init__(self, executor, event_bus=None):
    self._executor = executor
    self._tracking: dict[int, OrderTracking] = {}
    self._lock = threading.Lock()
    self._http_busy: set[int] = set()

    self._event_bus = event_bus
    if event_bus is not None:
        event_bus.subscribe(OrderFilledEvent, self._on_order_filled_event)
    else:
        logger.warning(
            "WebSocketOrderManager: event_bus 未注入,user 频道 fills 将丢失,"
            "仅测试环境允许此配置"
        )

新增 handler_on_order_filled_event

发布端已保证 metadata["fill_id"] 必然存在且格式正确。消费端直接读取,缺失时记录 WARNING 并跳过(不使用可能产生不一致 key 的备用路径):

def _on_order_filled_event(self, event: OrderFilledEvent) -> None:
    oid = _safe_int_oid(event.order_id)
    if oid is None:
        return

    fill_px = event.filled_price
    fill_sz = event.filled_qty
    if fill_px <= 0 or fill_sz <= 0:
        return

    # fill_id 由发布端通过 _make_fill_id 写入,此处强依赖
    fill_id: str = event.metadata.get("fill_id") if event.metadata else None
    if not fill_id:
        logger.warning(
            "OrderFilledEvent 缺少 fill_id,发布端未正确调用 _make_fill_id,"
            "oid=%s,跳过此事件以避免去重 key 不一致", oid
        )
        return

    should_resolve = False
    resolve_tracking = None

    with self._lock:
        tracking = self._tracking.get(oid)
        if tracking is None or tracking.status != OrderStatus.PENDING:
            return
        if fill_id in tracking._fill_ids:
            logger.debug("跳过重复 fill: oid=%s fill_id=%s", oid, fill_id)
            return

        tracking._fill_ids.add(fill_id)
        self._accumulate_fill(tracking, fill_px, fill_sz)
        tracking.has_fill_price = True
        tracking.fill_count += 1

        if tracking._ws_status is not None:
            should_resolve = True
            resolve_tracking = tracking

    # _resolve 在锁外调用,resolve_tracking 是锁内取得的对象引用,此时 status 不会回退
    if should_resolve:
        self._resolve(oid, resolve_tracking)

为何不保留备用 key:消费端自行构造备用 key 时,event.timestampdatetime.now() 生成)与原始 fill["time"](交易所毫秒时间戳)在值和精度上均不同,无法与发布端的 _make_fill_id 对齐。正确做法是在发布端保证 fill_id 必填,而非在消费端兜底。

取消订阅:在 manager 的 close()reset() 方法中调用:

if self._event_bus is not None:
    self._event_bus.unsubscribe(OrderFilledEvent, self._on_order_filled_event)

防止 manager 重建时 handler 残留或多次订阅。


步骤三:移除 userFills 旧路由

前提(量化验证标准):在测试网环境完成以下验证后,再执行此步骤:

  1. 下 10 笔以上限价单,追踪成功率 100%(PENDING → FILLED 状态正确流转)
  2. 手动触发 user 频道与 userFills 频道同时到达同一笔 fill,加权均价与 REST 查询结果误差 < 0.01%
  3. 查看日志,无 "跳过重复 fill" 以外的 WARNING/ERROR
  4. 以上验证在主网灰度(单账户小仓位)复测通过

文件src/services/realtime_kline_service_base.py(约第 639 行)

# 变更前
if channel in ("orderUpdates", "userFills"):
    mgr.handle_message(msg)

# 变更后
if channel == "orderUpdates":          # userFills 已由 EventBus 接管,不再直接路由
    mgr.handle_message(msg)

缓冲区回放(重连恢复逻辑):同步修改,缓冲消息回放时同样只对 orderUpdates 调用 handle_message,不对 userFills 回放。

取舍说明:移除旧路由后,重连期间的历史 userFills 消息不会被补偿回放。这是有意的取舍:

  • 断线期间的成交补偿应通过 REST API 查询订单状态,而非回放 WebSocket 原始消息(现有 HTTP 兜底逻辑不变)。
  • 若未来需要精确补偿,应在 WebSocketReconnectedEvent 的 handler 中主动拉取 fills,而不是回放原始 userFills。

fill_id 生成规则(发布端与消费端必须一致)

优先级 规则 说明
1 tid:{fill["tid"]} 交易所唯一成交 ID(Hyperliquid WsFill.tid),带 tid: 前缀
2 hash:{md5(oid:px:sz:time)} tid 不存在时的确定性备用 key,带 hash: 前缀

关键约束

  • 发布端必须调用 _make_fill_id(fill),禁止使用 fill.get("tid") or _fill_key(fill) 等裸字段读取方式——后者在 tid 存在时返回裸值(如 12345),缺少 tid: 前缀,与 _fill_key 返回的 "tid:12345" 不同,导致去重失效。
  • pxsz 使用原始字符串(不做 float 转换),与 _fill_key 保持一致。
  • 消费端强依赖 metadata["fill_id"],不自行构造备用 key(原因:event.timestampfill["time"] 在值和精度上不同,无法对齐)。

联网文档与实测:响应体结构与 key 统一

官方数据结构(Hyperliquid)

  • 文档WebSocket Subscriptions 中定义:
    • userFillsWsUserFills { user, fills: WsFill[] }fills 在嵌套字段中,不是 data 本身
    • userEvents(或部分实现中的 user)→ WsUserEvent 可包含 { "fills": WsFill[] }
  • WsFill 字段(与去重相关):oid, tid(唯一成交 ID), px, sz, time, coin, side, fee, closedPnl 等。两路 fill 来源均使用同一 WsFill 结构,故可用同一套 fill_id 规则。

实测与 key 协调

  1. 在测试网观察真实响应:运行脚本捕获 WebSocket 消息与 fill 结构:

    TRADING_NETWORK=testnet python scripts/capture_fill_responses_testnet.py
    

    脚本会订阅 orderUpdates / userFills / user,在测试网下一笔小限价单(或手动下单),将收到的消息写入 scripts/out/fill_responses_<ts>.json,并在控制台打印各 channel 下 fill 的字段集合与样本,便于确认 tidtime 等在两路中的存在与格式。

  2. key/id 统一约定

    • 唯一标识:以 fill_id 作为去重键,与现有 OrderTracking._fill_ids 一致。
    • 生成方式:统一调用 _make_fill_id(fill):有 tid 时用 tid:{tid},无 tid 时用 hash:{md5(oid:px:sz:time)};发布端与消费端共用同一函数,保证 user 频道与 userFills 频道对同一笔成交得到相同 fill_id,从而只累计一次。
  3. 实测结果(两次测试网捕获,数据完全一致)

    项目 结论
    实测时间 2026-02-22 12:27 / 2026-02-22 12:45(两次)
    userFills 顶层结构 data = {"isSnapshot": bool, "user": "0x...", "fills": [WsFill, ...]}
    fills 解析方式 必须取 data["fills"],不可直接用 data
    WsFill 字段 coin, px(str), sz(str), side(str), time(int ms), startPosition, dir, closedPnl, hash, oid(int), crossed, fee(str), tid(int), feeToken, twapId
    tid 存在性 30+ 条 fill 均有 tid(整数),hash 备用路径极少触发
    px / sz 类型 字符串(如 "0.01804", "14288.0"),_make_fill_id 直接使用原始字符串,不做 float 转换
    oid 类型 整数(如 48913699327),OrderFilledEvent 存为 str(oid)
    side 值 "A"(Ask/卖出)或 "B"(Bid/买入),直接透传,无需转换
    orderUpdates 结构 data = [{"order": {...}, "status": "...", "statusTimestamp": int}],直接数组

    关键验证:设计文档提出的 raw.get("fills", []) if isinstance(raw, dict) else raw 解析方式与实测数据结构完全匹配,可正确提取 fills 数组。


测试矩阵

场景 验证点
仅 userFills 到达(常规路径) 订单正确进入 FILLED,加权均价和手数正确
仅 user 频道 fills 到达 订单正确进入 FILLED,与上一行行为一致
user 频道与 userFills 同时到达同一笔 fill(串行) _fill_ids 去重,仅累计一次,加权均价不偏移
user 频道与 userFills 并发到达(多线程) _lock 保护有效,不出现重复累计,加权均价正确
userFills 先到,orderUpdates 后到 fills 累计后,orderUpdates 到达时触发 _resolve
orderUpdates 先到(_ws_status 已设),userFills 后到 fill 到达时立即触发 _resolve
metadata 为空或缺少 fill_id handler 记录 WARNING 并跳过,不崩溃,不产生错误 key
_make_fill_id 对同一 fill 两路结果一致性验证 user 频道与 userFills 频道对同一笔成交生成完全相同的 fill_id
manager 重建后重新注入 event_bus 旧 handler 已 unsubscribe,新 handler 仅订阅一次,无残留
manager close() 与 event 处理并发 unsubscribe 后 handler 不再被调用,无竞态崩溃
orderUpdates 的 canceled/rejected 行为不变,grace 超时与 HTTP 兜底逻辑不受影响
重连后缓冲 orderUpdates 回放 仅 orderUpdates 回放,userFills 不回放,行为符合预期
event_bus=None(测试模式) 构造函数输出 WARNING,不崩溃,旧路径(_on_user_fill)仍可独立测试

变更文件汇总

文件 变更内容
src/utils/websocket/enhanced_ws_manager.py 新增模块级 _make_fill_id(fill)_publish_user_events 调用 _make_fill_id 并补 fill_id;_cache_latest_data 新增 userFills 分支,修正 data 解析路径(data["fills"]),调用 _make_fill_id 发布事件
src/trading/websocket_order_manager.py 构造函数增加 event_bus 参数,缺省时打印 WARNING;新增 _on_order_filled_event handler,强依赖 metadata["fill_id"];迁移 _fill_key 逻辑与 _make_fill_id 对齐;close/reset 时 unsubscribe;步骤三后可移除已失效的 _on_user_fill
src/trading/executor.py 创建 WebSocketOrderManager 时传入 event_bus
src/services/realtime_kline_service_base.py on_message 和缓冲回放中,userFills 不再路由到 handle_message(步骤三)

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG