OrderFilledEvent 接入订单追踪系统12

双 WS 全 EventBus 重构

问题陈述

orderUpdatesuserFills 通过 realtime_kline_service_base.on_message()mgr.handle_message() 直连调用 WebSocketOrderManager,而 user 频道的 fills/positions/balance 已走 EventBus。这导致:

  1. 半事件驱动 — 同一消费者(WebSocketOrderManager)两条管道:EventBus + 直连
  2. 冗余基础设施realtime_kline_service_base 维护消息缓冲区(_order_msg_buffer)+ flush 线程 + 回放逻辑,仅服务于直连路径
  3. 协议耦合handle_message / _on_order_update / _on_user_fill 直接解析 Hyperliquid 原始 WS 消息结构
  4. user 频道 fill 浪费_publish_user_events 发布 OrderFilledEvent 但 EventBus 上零订阅者

本次变更:orderUpdates 和 userFills 全部通过 EventBus 投递到 WebSocketOrderManager,彻底删除直连路径、消息缓冲和 user 频道 fill 发布。


架构决策

决策 结论 理由
路由策略 orderUpdates + userFills 统一走 EventBus 消除半事件驱动架构
fill 来源 userFills 频道 user 频道 fill 是冗余副本
切换方式 原子切换,旧路径全删 不留兼容代码
event_bus 参数 WebSocketOrderManager 必选依赖 构造时即 TypeError,不在运行时静默丢数据
make_fill_id 仅用 tid,无 hash 备用路径 实测 30+ fill 均有 tid
isSnapshot 发布端过滤,不进 EventBus 避免无效事件消耗
消息缓冲 删除 EventBus 同步投递 + HTTP 兜底足够覆盖

目标架构

enhanced_ws_manager._cache_latest_data
    ├── channel == "orderUpdates"
    │       └── _publish_order_status_events(data)
    │               └── for item in data:
    │                       └── publish(OrderStatusEvent)
    │
    └── channel == "userFills"
            └── _publish_fill_events(data)
                    └── isSnapshot? → 跳过
                        for fill in fills:
                            └── publish(OrderFilledEvent, metadata={"fill_id": "tid:{tid}"})

EventBus(同步)
    ├── WebSocketOrderManager._on_order_status_event()
    │       └── 状态识别 → _ws_status → grace timer / _resolve()
    │
    └── WebSocketOrderManager._on_order_filled_event()
            └── _fill_ids 去重 → _accumulate_fill() → _resolve()

实测数据结构(测试网,2026-02-22)

orderUpdates

{"channel": "orderUpdates", "data": [{"order": {"oid": 12345, "avgPx": "0.018", "limitPx": "0.019", "totalSz": "100"}, "status": "filled"}]}

userFills

{"channel": "userFills", "data": {"isSnapshot": false, "user": "0x...", "fills": [{"oid": 12345, "tid": 67890, "px": "0.01804", "sz": "50", "side": "B", "coin": "PURR", "fee": "0.009", "cloid": null}]}}
字段
WsFill.tid 整数,30+ 条 fill 均存在
WsFill.px / sz 字符串
WsFill.oid 整数
WsFill.side "A"(卖)或 "B"(买)

删除清单

以下代码直接删除,不留残留:

文件 删除内容
trading_events.py import hashlib_make_fill_id 中的 hash 路径
enhanced_ws_manager.py _publish_user_events 中的 fill 循环(L1422-1455);旧 _make_fill_id import
websocket_order_manager.py handle_message 方法整体;_on_order_update 方法整体;_on_user_fill 方法整体;import _make_fill_id
realtime_kline_service_base.py _order_msg_buffer 初始化;_buffer_flush_thread 创建+启动;_periodic_buffer_flush 方法;_get_ws_order_manager 方法;on_message 中全部订单路由+缓冲回放逻辑

变更代码

src/events/trading_events.py

_make_fill_idmake_fill_id,删 hash 路径:

def make_fill_id(fill: dict) -> str | None:
    """返回 fill 去重 key。tid 缺失返回 None,调用方跳过。"""
    tid = fill.get("tid")
    if tid is None:
        return None
    return f"tid:{tid}"

新增 OrderStatusEvent

@dataclass(kw_only=True)
class OrderStatusEvent(Event):
    """订单状态更新事件

    触发时机:WebSocket orderUpdates 频道推送
    订阅者:WebSocketOrderManager(订单追踪状态机)

    字段映射(Hyperliquid → 本事件):
    - order.oid                    → order_id (str)
    - item.status / order.status   → status (lowercase)
    - avgPx > limitPx > px 级联    → fallback_price
    - totalSz > origSz 级联        → fallback_size
    """
    order_id: str = ""
    status: str = ""            # "filled", "canceled", "rejected", etc.
    fallback_price: float = 0.0
    fallback_size: float = 0.0

    def __post_init__(self):
        super().__post_init__()
        self.priority = EventPriority.CRITICAL

OrderFilledEvent docstring 更新:触发来源改为 userFills 频道


src/utils/websocket/enhanced_ws_manager.py

新增模块级 helper

def _first_pos(*values) -> float:
    """取 values 中第一个正数 float,全部无效返回 0.0"""
    for v in values:
        try:
            f = float(v)
            if f > 0:
                return f
        except (TypeError, ValueError):
            continue
    return 0.0

_cache_latest_data 新增两个分支(在 elif channel == "user" 之后):

elif channel == "orderUpdates":
    self._publish_order_status_events(msg.get("data"))

elif channel == "userFills":
    self._publish_fill_events(msg.get("data", {}))

新增 _publish_order_status_events

def _publish_order_status_events(self, raw_data) -> None:
    """将 orderUpdates 原始数据拆分为逐条 OrderStatusEvent"""
    if not raw_data:
        return
    items = raw_data if isinstance(raw_data, list) else [raw_data]
    for item in items:
        if not isinstance(item, dict):
            continue
        order = item.get("order")
        if not isinstance(order, dict):
            continue
        oid = order.get("oid")
        if oid is None:
            continue
        status_str = (
            item.get("status") or order.get("status") or ""
        ).lower()
        if not status_str:
            continue
        self._event_bus.publish(OrderStatusEvent(
            timestamp=datetime.now(),
            source="websocket",
            order_id=str(oid),
            status=status_str,
            fallback_price=_first_pos(
                order.get("avgPx"), order.get("limitPx"), order.get("px")
            ),
            fallback_size=_first_pos(
                order.get("totalSz"), order.get("origSz")
            ),
        ))

新增 _publish_fill_events

def _publish_fill_events(self, data) -> None:
    """将 userFills 数据拆分为逐笔 OrderFilledEvent"""
    if isinstance(data, dict):
        if data.get("isSnapshot"):
            return
        fills = data.get("fills", [])
    elif isinstance(data, list):
        fills = data
    else:
        return

    for fill in fills:
        if not isinstance(fill, dict):
            continue
        fill_id = make_fill_id(fill)
        if fill_id is None:
            logger.error("userFills fill 缺少 tid,跳过: %s", fill)
            continue
        oid = fill.get("oid")
        if oid is None:
            continue
        self._event_bus.publish(OrderFilledEvent(
            timestamp=datetime.now(),
            source="websocket",
            order_id=str(oid),
            client_order_id=fill.get("cloid"),
            symbol=fill.get("coin", ""),
            side=fill.get("side", ""),
            filled_qty=float(fill.get("sz", 0)),
            filled_price=float(fill.get("px", 0)),
            fee=float(fill.get("fee", 0)),
            closed_pnl=(
                float(fill.get("closedPnl", 0))
                if "closedPnl" in fill else None
            ),
            metadata={"fill_id": fill_id},
        ))

_publish_user_events:删除 fill 循环(# ── 2. 订单成交事件 整个 if 块),仅保留持仓和余额事件。


src/trading/executor.py

self._ws_order_manager = WebSocketOrderManager(
    executor=self,
    event_bus=self._event_bus,
)

src/trading/websocket_order_manager.py

import 变更

# 新增
from src.events.event_bus import EventBus
from src.events.trading_events import OrderFilledEvent, OrderStatusEvent

# 删除
from src.events.trading_events import _make_fill_id

构造函数event_bus 为必选参数,构造时订阅两个事件:

def __init__(self, executor, event_bus: EventBus):
    self._executor = executor
    self._tracking: dict[int, OrderTracking] = {}
    self._lock = threading.Lock()
    self._http_busy: set[int] = set()
    self._event_bus = event_bus
    event_bus.subscribe(OrderStatusEvent, self._on_order_status_event)
    event_bus.subscribe(OrderFilledEvent, self._on_order_filled_event)

新增 _on_order_status_event(替代旧 _on_order_update,从 typed event 获取字段,不再解析原始 WS 消息):

def _on_order_status_event(self, event: OrderStatusEvent) -> None:
    oid = _safe_int_oid(event.order_id)
    if oid is None:
        return

    status_str = event.status
    should_resolve = False
    resolve_tracking = None

    with self._lock:
        tracking = self._tracking.get(oid)
        if not tracking or tracking.status != OrderStatus.PENDING:
            return

        if status_str == "filled":
            tracking._ws_status = OrderStatus.FILLED
            tracking._fallback_px = event.fallback_price
            tracking._fallback_sz = event.fallback_size
            logger.info(
                f"orderUpdate FILLED: {tracking.coin} oid={oid} "
                f"fallback_px={tracking._fallback_px} "
                f"fallback_sz={tracking._fallback_sz} "
                f"has_fill={'Y' if tracking.has_fill_price else 'N'}"
            )
            if tracking.has_fill_price:
                should_resolve = True
                resolve_tracking = tracking
            else:
                if tracking._grace_timer is None:
                    timer = threading.Timer(
                        self._FILL_GRACE_SEC, self._resolve,
                        [oid, tracking]
                    )
                    timer.daemon = True
                    tracking._grace_timer = timer
                    timer.start()

        elif status_str in ("canceled", "margincanceled"):
            logger.info(f"orderUpdate CANCELED: {tracking.coin} oid={oid}")
            tracking._ws_status = OrderStatus.CANCELED
            should_resolve = True
            resolve_tracking = tracking

        elif "rejected" in status_str:
            if status_str != "rejected":
                logger.info(
                    f"orderUpdate 拒绝变体: {tracking.coin} oid={oid} status={status_str!r}"
                )
            tracking._ws_status = OrderStatus.REJECTED
            should_resolve = True
            resolve_tracking = tracking

    if should_resolve:
        self._resolve(oid, resolve_tracking)

新增 _on_order_filled_event(替代旧 _on_user_fill):

def _on_order_filled_event(self, event: OrderFilledEvent) -> None:
    oid = _safe_int_oid(event.order_id)
    if oid is None:
        return

    if event.filled_price <= 0 or event.filled_qty <= 0:
        return

    fill_id = event.metadata.get("fill_id") if event.metadata else None
    if not fill_id:
        return

    should_resolve = False
    resolve_tracking = None

    with self._lock:
        tracking = self._tracking.get(oid)
        if not tracking or tracking.status != OrderStatus.PENDING:
            return
        if fill_id in tracking._fill_ids:
            return

        tracking._fill_ids.add(fill_id)
        self._accumulate_fill(tracking, event.filled_price, event.filled_qty)
        tracking.has_fill_price = True
        tracking.fill_count += 1
        logger.info(
            f"userFill 累计: {tracking.coin} oid={oid} "
            f"本笔 px={event.filled_price} sz={event.filled_qty} | "
            f"累计 avg_px={tracking.avg_price:.6g} "
            f"total_sz={tracking.filled_size:.6g} "
            f"fills={tracking.fill_count} fid={fill_id}"
        )
        if tracking._ws_status is not None:
            should_resolve = True
            resolve_tracking = tracking

    if should_resolve:
        self._resolve(oid, resolve_tracking)

_resolve 内部通过 identity check(current is not tracking)保证幂等。

shutdown():取消订阅先于清理:

def shutdown(self):
    self._event_bus.unsubscribe(OrderStatusEvent, self._on_order_status_event)
    self._event_bus.unsubscribe(OrderFilledEvent, self._on_order_filled_event)
    with self._lock:
        pending = [
            (oid, t) for oid, t in list(self._tracking.items())
            if t.status == OrderStatus.PENDING
        ]
    for oid, tracking in pending:
        self._resolve(oid, tracking, OrderStatus.CANCELED)

src/services/realtime_kline_service_base.py

删除全部订单相关代码

位置 删除内容
__init__ self._order_msg_buffer: deque = deque(maxlen=500)
_setup_ws_manager _buffer_flush_thread 创建
start _buffer_flush_thread.start()
on_message 缓冲区回放块 + 订单消息路由块(整个 if channel in ("orderUpdates", "userFills") 及其 buffer 逻辑)
方法级 _periodic_buffer_flush 整体删除
方法级 _get_ws_order_manager 整体删除

on_message 不再有任何订单相关代码,仅保留 K 线处理。


消息缓冲删除理由

旧缓冲区存在的原因:WebSocketOrderManager 可能在 WS 消息到达时尚未初始化。

全 EventBus 方案下无需缓冲:

  1. 启动时序WebSocketOrderManagerexecutor.__init__ 中创建并订阅 EventBus,先于 WS 连接建立。启动前无订单被 track,即使事件丢失也无影响。
  2. 重连场景WebSocketOrderManager 生命周期内 EventBus 订阅始终有效,WS 重连不影响事件投递。verify_pending_orders() 双轮 HTTP 补查覆盖断连窗口。
  3. EventBus 同步投递 — 发布即消费,无异步丢失风险。

测试矩阵

场景 验证点
常规 orderUpdates filled OrderStatusEvent 发布 → _on_order_status_event 收到 → _ws_status=FILLED
常规 userFills 单笔成交 OrderFilledEvent 发布 → 订单进入 FILLED,avg_price 与 REST 查询误差 < 0.01%
多笔部分成交(同一 oid) 每笔独立累计,avg_price 滚动正确,fill_count 准确
同一 fill 重复推送 _fill_ids 去重,_accumulate_fill 仅调用一次
fill 缺少 tid 发布端 ERROR 日志,跳过,不进 EventBus
fill 缺少 oid 发布端跳过
userFills 早于 orderUpdates 到达 fills 累计后,OrderStatusEvent 触发 _resolve
orderUpdates 早于 userFills 到达 _ws_status 已设 + grace timer,fill 到达立即 _resolve
_resolve 并发调用 identity check 保证幂等,result_event.set() 仅一次
userFills 快照(isSnapshot=true) 发布端直接跳过,不进 EventBus
orderUpdates canceled / rejected 立即 _resolve,无需等待 fill
manager shutdown() 与事件并发 unsubscribe 先于清理,handler 不再被调用
event_bus 未传入 构造时即 TypeError
WS 重连 verify_pending_orders 双轮 HTTP 补查正常工作

变更文件汇总

文件 变更内容
src/events/trading_events.py 新增 OrderStatusEvent_make_fill_idmake_fill_id(去下划线,删 hash 路径,返回 str | None);更新 OrderFilledEvent docstring
src/utils/websocket/enhanced_ws_manager.py 新增 _first_pos helper;_cache_latest_data 新增 orderUpdates + userFills 分支;新增 _publish_order_status_events + _publish_fill_events_publish_user_events 删 fill 循环;import 更新
src/trading/websocket_order_manager.py handle_message / _on_order_update / _on_user_fillevent_bus 改必选;新增 _on_order_status_event + _on_order_filled_eventshutdown() 新增 unsubscribe;import 更新
src/trading/executor.py 创建 WebSocketOrderManager 时传入 event_bus
src/services/realtime_kline_service_base.py 删除 _order_msg_buffer / _buffer_flush_thread / _periodic_buffer_flush / _get_ws_order_manager / on_message 全部订单路由

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG