OrderFilledEvent 接入订单追踪系统11

OrderFilledEvent 接入订单追踪系统

问题陈述

_publish_user_events(user 频道)已发布 OrderFilledEvent,但 EventBus 上零订阅者,成交数据完全丢失。
_on_user_fill(userFills 频道)通过旧直连路径绕过 EventBus 直接调用 _accumulate_fill,与事件驱动架构方向相反。

本次变更:以 userFills 为唯一 fill 来源,通过 EventBus 接入 WebSocketOrderManager,同时删除全部旧路径。


架构决策

决策 结论
fill 来源 userFills 频道,user 频道不再处理 fill
切换方式 原子切换,旧路径与新路径在同一次变更中完成
event_bus 参数 WebSocketOrderManager 必选依赖,不提供 None 默认值
make_fill_id 仅用 tid,无 hash 备用路径(实测 30+ fill 均有 tid)

目标架构

enhanced_ws_manager._cache_latest_data
    └── channel == "userFills"
            └── for fill in data["fills"]:
                    └── publish(OrderFilledEvent, metadata={"fill_id": "tid:{tid}"})

EventBus(同步)
    └── WebSocketOrderManager._on_order_filled_event()
            └── _fill_ids 去重 → _accumulate_fill()

实测数据结构(测试网,2026-02-22)

字段
userFills 顶层 data = {"isSnapshot": bool, "user": "0x...", "fills": [WsFill, ...]}
WsFill.tid 整数,30+ 条 fill 均存在
WsFill.px / sz 字符串(如 "0.01804"
WsFill.oid 整数
WsFill.side "A"(卖)或 "B"(买)

删除清单

以下代码直接删除,不留残留:

文件 删除内容
websocket_order_manager.py _on_user_fill 方法(整体)
websocket_order_manager.py handle_messagechannel == "userFills" 分支
enhanced_ws_manager.py _publish_user_events 中的 fill 循环逻辑

变更代码

src/events/trading_events.py

def make_fill_id(fill: dict) -> str | None:
    """
    返回 fill 去重 key。
    tid 缺失时返回 None,调用方负责记录错误并跳过。
    """
    tid = fill.get("tid")
    if tid is None:
        return None
    return f"tid:{tid}"

_make_fill_id 同步重命名(去下划线)并删除 hash 备用路径。


src/utils/websocket/enhanced_ws_manager.py

_publish_user_events:删除 fill 循环,仅保留其他事件类型。

_cache_latest_data:新增 userFills 分支:

elif channel == "userFills":
    fills = msg.get("data", {}).get("fills", [])
    for fill in fills:
        fill_id = make_fill_id(fill)
        if fill_id is None:
            logger.error("userFills fill 缺少 tid,跳过: %s", fill)
            continue
        oid = fill.get("oid")
        if oid is None:
            continue
        self._event_bus.publish(OrderFilledEvent(
            timestamp=datetime.now(),
            source="websocket",
            order_id=str(oid),
            client_order_id=fill.get("cloid"),
            symbol=fill.get("coin", ""),
            side=fill.get("side", ""),
            filled_qty=float(fill.get("sz", 0)),
            filled_price=float(fill.get("px", 0)),
            fee=float(fill.get("fee", 0)),
            closed_pnl=float(fill.get("closedPnl", 0)) if "closedPnl" in fill else None,
            metadata={"fill_id": fill_id},
        ))

src/trading/executor.py

self._ws_order_manager = WebSocketOrderManager(
    executor=self,
    event_bus=self._event_bus,   # 必选参数
)

src/trading/websocket_order_manager.py

构造函数event_bus 为必选参数,构造时完成订阅:

def __init__(self, executor, event_bus: EventBus):
    self._executor = executor
    self._tracking: dict[int, OrderTracking] = {}
    self._lock = threading.Lock()
    self._http_busy: set[int] = set()
    self._event_bus = event_bus
    event_bus.subscribe(OrderFilledEvent, self._on_order_filled_event)

新增 handler

def _on_order_filled_event(self, event: OrderFilledEvent) -> None:
    try:
        oid = int(event.order_id)
    except (ValueError, TypeError):
        return

    if event.filled_price <= 0 or event.filled_qty <= 0:
        return

    fill_id = event.metadata.get("fill_id") if event.metadata else None
    if not fill_id:
        logger.error("OrderFilledEvent 缺少 fill_id,oid=%s,跳过", oid)
        return

    should_resolve = False
    resolve_tracking = None

    with self._lock:
        tracking = self._tracking.get(oid)
        if tracking is None or tracking.status != OrderStatus.PENDING:
            return
        if fill_id in tracking._fill_ids:
            return

        tracking._fill_ids.add(fill_id)
        self._accumulate_fill(tracking, event.filled_price, event.filled_qty)
        tracking.has_fill_price = True
        tracking.fill_count += 1

        if tracking._ws_status is not None:
            should_resolve = True
            resolve_tracking = tracking

    if should_resolve:
        self._resolve(oid, resolve_tracking)

_resolve 内部通过 identity check(current is not tracking)保证幂等:第一次调用成功 pop,第二次因 current is None 直接返回 False。

shutdown():取消订阅先于清理执行:

def shutdown(self):
    self._event_bus.unsubscribe(OrderFilledEvent, self._on_order_filled_event)
    with self._lock:
        pending = [
            (oid, t) for oid, t in list(self._tracking.items())
            if t.status == OrderStatus.PENDING
        ]
    for oid, tracking in pending:
        self._resolve(oid, tracking, OrderStatus.CANCELED)

src/services/realtime_kline_service_base.py

# 变更前
if channel in ("orderUpdates", "userFills"):
    mgr.handle_message(msg)

# 变更后
if channel == "orderUpdates":
    mgr.handle_message(msg)

缓冲区回放同步修改,userFills 不回放(重连补偿通过现有 HTTP 兜底处理)。


测试矩阵

场景 验证点
常规 userFills 单笔成交 订单进入 FILLED,avg_price 与 REST 查询误差 < 0.01%
多笔部分成交(同一 oid) 每笔独立累计,avg_price 滚动正确,fill_count 准确
同一 fill 重复推送 _fill_ids 去重,_accumulate_fill 仅调用一次
fill 缺少 tid 发布端 ERROR 日志,跳过,不崩溃,不写入 EventBus
fill 缺少 oid 发布端跳过,不崩溃
userFills 早于 orderUpdates 到达 fills 累计后,orderUpdates 触发 _resolve
orderUpdates 早于 userFills 到达 fill 到达时 _ws_status 已设,立即触发 _resolve
_resolve 并发调用(单元测试) identity check 保证幂等,result_event.set() 仅执行一次
userFills 快照(isSnapshot=true)含历史 fill tracking is None 静默跳过,不影响当前 PENDING 订单
manager shutdown() 与 fill 并发 unsubscribe 先于清理,handler 不再被调用,无竞态
event_bus 未传入 构造时即 TypeError,启动失败,不在运行时静默丢数据

变更文件汇总

文件 变更内容
src/events/trading_events.py _make_fill_idmake_fill_id(去下划线,删 hash 路径)
src/utils/websocket/enhanced_ws_manager.py _publish_user_events 删 fill 循环;_cache_latest_data 新增 userFills 分支
src/trading/websocket_order_manager.py _on_user_fill;删 handle_message userFills 分支;event_bus 改必选;新增 _on_order_filled_eventshutdown() 新增 unsubscribe
src/trading/executor.py 创建 WebSocketOrderManager 时传入 event_bus
src/services/realtime_kline_service_base.py on_message 和缓冲回放移除 userFills 路由

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG