OrderFilledEvent 接入订单追踪系统

OrderFilledEvent 接入订单追踪系统

目标

  • 补上“成交事件有发布无消费”的缺口,让订单追踪系统成为 OrderFilledEvent 的消费者。
  • 统一成交入口:所有 fill(user 频道 data.fills + userFills 频道)都经 EventBus 以 OrderFilledEvent 发布,订单追踪只通过订阅事件更新状态,逻辑更清晰、易扩展。

设计要点

  • 单一入口:订单追踪的“应用一笔成交”逻辑只在一个地方执行——对 OrderFilledEvent 的订阅 handler(与现有 _on_user_fill 逻辑等价)。
  • 避免重复:userFills 不再走 handle_message_on_user_fill,仅通过 _cache_latest_data 发布 OrderFilledEvent,由 WebSocketOrderManager 订阅后处理;orderUpdates 仍走 handle_message_on_order_update(终态与 grace 逻辑不变)。
  • 去重:事件中携带可唯一标识一笔 fill 的 key(如 metadata["fill_id"]),WebSocketOrderManager 继续用现有的 _fill_ids 做去重。
  • 双源:user 频道已有 _publish_user_events 发布 OrderFilledEvent;在 _cache_latest_data 中为 userFills 增加分支,解析后对每条 fill 发布 OrderFilledEvent(字段与现有事件定义一致,fill_id 放 metadata)。

实现步骤

1. 从 userFills 发布 OrderFilledEvent

文件src/utils/websocket/enhanced_ws_manager.py

  • _cache_latest_data() 中增加 channel == "userFills" 分支(与现有 "user" 并列)。
  • msg["data"] 取 fill 列表(兼容单条为 list 包装),对每条 fill:
    • 构造 OrderFilledEventorder_id=str(oid),symbol=coin,filled_qty=sz,filled_price=px,feeclosed_pnl 等,与 _publish_user_events 中 user.fills 的映射一致)。
    • 生成 fill 唯一 key:优先用 fill.get("tid"),否则用 oid:px:sz:time 的哈希(与 websocket_order_manager_fill_key 一致),写入 event.metadata["fill_id"]
    • self._event_bus.publish(event)

这样 user 与 userFills 两路都会向 EventBus 发布 OrderFilledEvent,且带可去重标识。

2. WebSocketOrderManager 订阅 OrderFilledEvent 并应用成交

文件src/trading/websocket_order_manager.py

  • 依赖 EventBus:构造函数增加参数 event_bus(可选,便于测试)。Executor 创建 manager 时传入:WebSocketOrderManager(executor=self, event_bus=self._event_bus)(见 executor 初始化约 143 行)。
  • 订阅:在 WebSocketOrderManager 初始化时(或由 Executor 在创建 manager 后调用一次 register_event_handlers())调用 event_bus.subscribe(OrderFilledEvent, self._on_order_filled_event)。若在 manager 内订阅,需在构造末尾、且保证只订阅一次。
  • Handler _on_order_filled_event(self, event: OrderFilledEvent)
    • oid = _safe_int_oid(event.order_id),无效则 return。
    • fill_id = event.metadata.get("fill_id") 或由 (order_id, filled_qty, filled_price, timestamp) 生成备用 key(与发布端一致)。
    • fill_px = event.filled_pricefill_sz = event.filled_qty,若 ≤0 则 return。
    • _lock 内:按 oid 查 _tracking;若不存在或非 PENDING 则 return;若 fill_id in tracking._fill_ids 则 return(去重);否则 _fill_ids.add(fill_id)_accumulate_fill(tracking, fill_px, fill_sz)has_fill_price=Truefill_count+=1;若 tracking._ws_status is not None 则标记需要 resolve。
    • 锁外如需要则调用 _resolve(oid, tracking)
  • 逻辑与现有 _on_user_fill 一致,仅输入从 raw message 改为 Event,保证行为不变、订单追踪统一由事件驱动。

3. 不再把 userFills 交给 handle_message

文件src/services/realtime_kline_service_base.py

  • on_message 中,当前将 orderUpdatesuserFills 都路由到 WebSocketOrderManager.handle_message(约 639–661 行)。改为: channel == "orderUpdates" 时调用 mgr.handle_message(msg)channel == "userFills"不再调用 handle_message(不再把 userFills 原始消息交给订单管理器)。
  • userFills 仍会经 _wrapped_callback_cache_latest_data 发布 OrderFilledEvent,订单管理器通过订阅接收,避免同一笔 fill 既走 handle_message 又走事件导致重复累计。
  • 缓冲区回放逻辑中,若当前仍对缓冲消息按 channel 路由到 handle_message,则同样只对 orderUpdates 回放,不对 userFills 回放(或若希望重连后补发历史 fill,可考虑由 EventBus 的发布方在重连后通过 API 补拉并发布事件,而不是回放原始 userFills;本方案可先采用“userFills 不回放”,与“不把 userFills 交给 handle_message”一致)。

4. Executor 传入 EventBus

文件src/trading/executor.py

  • 创建 WebSocketOrderManager 时传入 event_bus:self._ws_order_manager = WebSocketOrderManager(executor=self, event_bus=self._event_bus)(约 143 行)。若 manager 构造函数签名增加 event_bus,此处同步修改。

5. OrderFilledEvent 的 fill 去重标识(可选但推荐)

文件src/events/trading_events.py

  • 在 OrderFilledEvent 的 docstring 中说明:发布方应将唯一 fill 标识放入 metadata["fill_id"],供订阅方去重。事件类本身已有 metadata(来自 Event 基类),无需新增字段。
  • 发布端(enhanced_ws_manager):在 user 频道的 _publish_user_events 里,对每条 fill 也写入 metadata["fill_id"](与 userFills 相同规则:tid 或 oid:px:sz:time 哈希),保证 user 与 userFills 两源去重一致。

6. 双 WebSocket / source 约定(若已存在)

若项目已区分行情 WS 与交易 WS(如《双WebSocket架构设计》),则 OrderFilledEvent 的订阅方(含 WebSocketOrderManager)应只处理 source == "trading"(或当前等价来源)。在 _on_order_filled_event 开头可加:若 event.source != getattr(self._executor, '_expected_ws_source', 'websocket') 则 return(具体常量名与架构文档一致即可)。当前若仅单 WS,可先不过滤或与现有 Executor 事件过滤保持一致。

测试与回归

  • 单 WS:下单后确认 userFills 到达时订单能正确 FILLED、加权价与手数正确,且无重复累计。
  • 若有 user 频道下发的 fills:确认与 userFills 同时存在时不会重复应用(fill_id 去重)。
  • orderUpdates 的 filled/canceled/rejected 与 grace 逻辑不变,超时与 HTTP 兜底行为不变。

小结

项目 说明
事件来源 user 频道 data.fills(已有)+ userFills 频道(新增发布)
消费者 WebSocketOrderManager 订阅 OrderFilledEvent,应用 fill 并去重、累计、解析
行为 与现有一致;入口统一为事件,便于后续加 TradeLogger、RiskManager 等订阅者

这样 OrderFilledEvent 不再“有发布无消费”,订单追踪统一由事件驱动,更智能且易扩展。

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG