订单跟踪系统BUG37

订单追踪系统 Bug 修复设计文档

背景

对两份 Bug 报告进行代码核实后,确认以下情况:

报告误判(不需要修)

  • B1 双 EventBus:EventBus 是单例(__new__ + 双重检查锁),EventBus() 永远返回同一实例,不存在隔离问题
  • B3 Grace Timer 竞态:_resolve 在 lock 内执行 pop(oid),第二次调用时 identity check 失败直接 return,天然幂等
  • B8 TOCTOU:_check_order_after_cancel 函数不存在,_close_limit_leg_timeout 降级路径已有 fill_px > 0 判断

真实存在、本次全部修复

编号 优先级 位置 问题
B2 P0 enhanced_ws_manager.py add_subscriptions + remove_subscriptionssubscriptions_lock 内调用 ws.send(),锁内阻塞网络 IO,死锁路径成立
B4 P1 websocket_order_manager.py _resolve 无论 px 是否为 0 都无条件 has_fill_price = True,导致 0.0 价格传入下游,屏蔽 backfill
B7 P2 websocket_order_manager.py track_order 替换旧 tracking 时未继承 _fill_ids,重连后历史 fill 重发被重复累计
Bug3 P2 websocket_order_manager.py verify_pending_orders 过滤条件含 _ws_status is None,已进入宽限期的订单被排除在补查之外
B6 P3 websocket_order_manager.py threading.Timer.cancel() 只设标志位不终止线程,高频场景下累积大量 sleep-waiting 线程
B5 次要 websocket_order_manager.py _monitor_order 的 except 块直接调用 _resolve,若 _resolve 自身抛异常,result_event 永不释放
Bug2 防御 websocket_order_manager.py _on_order_filled_event 未防空 fill_id,当前路径不触发但属潜在风险

修改文件范围

  • src/utils/websocket/enhanced_ws_manager.py(重写 add_subscriptionsremove_subscriptions
  • src/trading/websocket_order_manager.py(全文件重写,彻底剔除 threading.Timer

一、enhanced_ws_manager.py — 修复 B2

问题根因

两个方法在 with self.subscriptions_lock: 内部直接调用 ws.send()

# 当前(有问题)
with self.subscriptions_lock:          # 持有锁 L
    ...
    self.ws.send(json.dumps(msg))      # 阻塞网络 IO 在锁内

死锁路径:WS 消息线程收到消息 → 发布事件 → Executor 处理 → 调用 add_subscriptions
→ 尝试获取锁 L → 永远等待(锁已被发送线程持有,发送线程在等 WS 消息线程处理完)。

新设计:两步走,IO 移至锁外

原则:锁内只做内存操作(读写 dict/list),锁外做网络 IO。

add_subscriptions 新实现

def add_subscriptions(self, new_subscriptions: list[dict]) -> bool:
    if not new_subscriptions:
        return True
    try:
        # 第一步:仅内存操作(在锁内),收集需要发送的消息
        pending_sends: list[tuple] = []   # (sub_key, subscription, msg_json)
        with self.subscriptions_lock:
            for subscription in new_subscriptions:
                sub_key = (
                    subscription.get('type'),
                    subscription.get('coin'),
                    subscription.get('interval'),
                )
                if sub_key in self.active_subscriptions:
                    continue
                self.subscriptions.append(subscription)
                if self.state == ConnectionState.CONNECTED and self._is_connected():
                    if self.ws and self.ws.keep_running:
                        msg_json = json.dumps({"method": "subscribe", "subscription": subscription})
                        pending_sends.append((sub_key, subscription, msg_json))
                    # else: 连接不稳定,仅加列表,重连时自动订阅

        # 第二步:网络 IO 在锁外执行
        failed = False
        for sub_key, subscription, msg_json in pending_sends:
            try:
                self.ws.send(msg_json)
                with self.subscriptions_lock:
                    self.active_subscriptions[sub_key] = subscription
            except Exception as e:
                logger.error(f"订阅发送失败: {subscription} | {e}")
                with self.subscriptions_lock:
                    try:
                        self.subscriptions.remove(subscription)
                    except ValueError:
                        pass
                failed = True

        return not failed
    except Exception as e:
        logger.error(f"add_subscriptions 异常: {e}", exc_info=True)
        return False

remove_subscriptions 新实现

def remove_subscriptions(self, subscriptions_to_remove: list[dict]) -> bool:
    if not subscriptions_to_remove:
        return True
    try:
        # 第一步:内存操作在锁内,同时收集待发送的取消订阅消息
        pending_unsubs: list[str] = []   # msg_json list
        with self.subscriptions_lock:
            for subscription in subscriptions_to_remove:
                sub_key = (
                    subscription.get('type'),
                    subscription.get('coin'),
                    subscription.get('interval'),
                )
                if sub_key not in self.active_subscriptions:
                    continue
                # 先从内存移除(原子完成,锁内)
                self.active_subscriptions.pop(sub_key, None)
                self._remove_from_subscriptions_list(sub_key)
                if self._is_connected():
                    msg_json = json.dumps({"method": "unsubscribe", "subscription": subscription})
                    pending_unsubs.append(msg_json)

        # 第二步:网络 IO 在锁外
        for msg_json in pending_unsubs:
            try:
                self.ws.send(msg_json)
            except Exception as e:
                logger.error(f"取消订阅发送失败: {e}")

        return True
    except Exception as e:
        logger.error(f"remove_subscriptions 异常: {e}", exc_info=True)
        return False

_remove_from_subscriptions_list(sub_key) 是提取出的私有辅助方法,封装原有按 key 查找并移除的逻辑,在锁内调用。


二、websocket_order_manager.py — 全文件重写

全文件重写的核心目标:彻底剔除 threading.Timer,所有 bug 一次性修净,无残留死代码。

2.1 OrderTracking dataclass 变更

字段 操作 说明
_grace_timer: threading.Timer | None 删除 彻底去除 Timer
_grace_done: threading.Event 新增 宽限期取消信号,set() 即取消
_grace_started: bool 新增 防止同一 tracking 重复启动宽限线程
@dataclass
class OrderTracking:
    coin: str
    timeout_seconds: int
    status: OrderStatus = OrderStatus.PENDING
    avg_price: float = 0.0
    filled_size: float = 0.0
    has_fill_price: bool = False
    fill_count: int = 0
    result_event: ThreadEvent = field(default_factory=ThreadEvent)
    # 内部状态
    _ws_status: OrderStatus | None = field(default=None,                    repr=False)
    _grace_done: threading.Event   = field(default_factory=threading.Event, repr=False)
    _grace_started: bool           = field(default=False,                   repr=False)
    _fill_ids: set                 = field(default_factory=set,             repr=False)

2.2 track_order — 修复 B7(继承 _fill_ids

def track_order(self, oid: int, coin: str, timeout_seconds: int = 600) -> OrderTracking:
    oid = int(oid)
    tracking = OrderTracking(coin=coin, timeout_seconds=timeout_seconds)
    with self._lock:
        old = self._tracking.get(oid)
        if old:
            logger.warning(
                f"重复追踪 oid={oid},旧追踪被新追踪替代"
                f"(旧状态: {old.status.value},ws终态: {old._ws_status})"
            )
            tracking._fill_ids = old._fill_ids.copy()   # B7 修复:继承去重集合
            old._grace_done.set()                        # 取消旧宽限等待(立即唤醒线程)
            old.status = OrderStatus.CANCELED
            old.result_event.set()
        self._tracking[oid] = tracking
    logger.info(f"注册订单追踪: {coin} oid={oid} timeout={timeout_seconds}s")
    threading.Thread(
        target=self._monitor_order, args=(oid, tracking), daemon=True
    ).start()
    return tracking

2.3 _resolve — 修复 B4(has_fill_price 仅在价格有效时置 True)

def _resolve(self, oid: int, tracking: OrderTracking,
             status: OrderStatus | None = None,
             px: float = 0.0, sz: float = 0.0) -> bool:
    with self._lock:
        if self._tracking.get(oid) is not tracking:
            return False
        if tracking.status != OrderStatus.PENDING:
            return False
        final_status = status if status is not None else tracking._ws_status
        if final_status is None:
            return False

        tracking._grace_done.set()      # 取消宽限等待(替代 timer.cancel())
        tracking.status = final_status

        # B4 修复:仅当 px > 0 时才标记 has_fill_price = True
        # px == 0 时保持 has_fill_price=False,让下游 _backfill_order_price 处理
        if final_status == OrderStatus.FILLED and not tracking.has_fill_price:
            if px > 0:
                tracking.avg_price = px
                if sz > 0:
                    tracking.filled_size = sz
                tracking.has_fill_price = True

        self._tracking.pop(oid, None)

    tracking.result_event.set()
    self._log_result(tracking, oid)
    return True

2.4 _on_order_status_event + 新增 _grace_wait — 修复 B6(替换 Timer)

def _on_order_status_event(self, event: OrderStatusEvent) -> None:
    oid = _safe_int_oid(event.order_id)
    if oid is None:
        return

    status_str = event.status
    should_resolve = False
    resolve_tracking = None

    with self._lock:
        tracking = self._tracking.get(oid)
        if not tracking or tracking.status != OrderStatus.PENDING:
            return

        if status_str == "filled":
            tracking._ws_status = OrderStatus.FILLED
            logger.info(
                f"orderUpdate FILLED: {tracking.coin} oid={oid} "
                f"fallback_px={event.price} has_fill={'Y' if tracking.has_fill_price else 'N'}"
            )
            if tracking.has_fill_price:
                should_resolve = True
                resolve_tracking = tracking
            elif not tracking._grace_started:
                tracking._grace_started = True
                threading.Thread(
                    target=self._grace_wait,
                    args=(oid, tracking, event.price, event.size),
                    daemon=True,
                ).start()

        elif status_str in ("canceled", "margincanceled"):
            logger.info(f"orderUpdate CANCELED: {tracking.coin} oid={oid}")
            tracking._ws_status = OrderStatus.CANCELED
            should_resolve = True
            resolve_tracking = tracking

        elif "rejected" in status_str:
            if status_str != "rejected":
                logger.info(
                    f"orderUpdate 拒绝变体: {tracking.coin} oid={oid} status={status_str!r}"
                )
            tracking._ws_status = OrderStatus.REJECTED
            should_resolve = True
            resolve_tracking = tracking

    if should_resolve:
        self._resolve(oid, resolve_tracking)


def _grace_wait(self, oid: int, tracking: OrderTracking,
                fallback_px: float, fallback_sz: float) -> None:
    """宽限期等待:等待 userFills 到达;超时则用 orderUpdates 的 limitPx 作兜底。

    Event.wait(timeout) 返回 True = 被 set()(_resolve 已调用,正常退出)
                        返回 False = 超时(userFills 未到,使用兜底价结算)
    """
    timed_out = not tracking._grace_done.wait(timeout=self._FILL_GRACE_SEC)
    if timed_out:
        self._resolve(oid, tracking, None, fallback_px, fallback_sz)
    # 被 set() 时:_resolve 已由其他路径调用,此线程直接退出,无副作用

Timer vs Event 对比

threading.Timer threading.Event 方案
取消方式 cancel() 仅设标志位,线程继续 sleep set() 立即唤醒线程,线程退出
线程残留 cancel 后线程 sleep 直到超时 无残留,立即退出
内存 Timer 对象 + 线程持有引用 仅 Event 对象(极小)

2.5 _on_order_filled_event — 修复 Bug2(空 fill_id 防御)

def _on_order_filled_event(self, event: OrderFilledEvent) -> None:
    oid = _safe_int_oid(event.order_id)
    if oid is None:
        return
    if not event.fill_id:                                    # Bug2 防御
        logger.warning(f"fill_id 为空,跳过重复累计: oid={oid}")
        return
    if event.filled_price <= 0 or event.filled_qty <= 0:
        return

    should_resolve = False
    resolve_tracking = None

    with self._lock:
        tracking = self._tracking.get(oid)
        if not tracking or tracking.status != OrderStatus.PENDING:
            return
        if event.fill_id in tracking._fill_ids:
            return

        tracking._fill_ids.add(event.fill_id)
        self._accumulate_fill(tracking, event.filled_price, event.filled_qty)
        tracking.has_fill_price = True
        tracking.fill_count += 1
        logger.info(
            f"userFill 累计: {tracking.coin} oid={oid} "
            f"本笔 px={event.filled_price} sz={event.filled_qty} | "
            f"累计 avg_px={tracking.avg_price:.6g} total_sz={tracking.filled_size:.6g} "
            f"fills={tracking.fill_count} fid={event.fill_id}"
        )
        if tracking._ws_status is not None:
            should_resolve = True
            resolve_tracking = tracking

    if should_resolve:
        self._resolve(oid, resolve_tracking)

2.6 verify_pending_orders — 修复 Bug3(补查范围扩展)

def verify_pending_orders(self):
    with self._lock:
        pending = [
            (oid, t) for oid, t in self._tracking.items()
            if t.status == OrderStatus.PENDING      # 移除 _ws_status is None 限制
        ]                                           # 宽限期内(_ws_status=FILLED)的订单也纳入补查
    if not pending:
        return

    logger.info(f"重连补查 {len(pending)} 个未完成订单")
    retry_list = []

    for i, (oid, tracking) in enumerate(pending):
        if i > 0:
            time.sleep(1)
        result = self._http_check(oid)
        if result in ("error", "busy"):
            logger.warning(f"重连补查未完成: oid={oid} result={result},稍后重试")
            retry_list.append((oid, tracking))

    if not retry_list:
        return

    time.sleep(3)
    for oid, tracking in retry_list:
        result = self._http_check(oid)
        if result in ("error", "busy"):
            logger.error(f"重连补查二次重试仍失败,强制超时: oid={oid} result={result}")
            self._resolve(oid, tracking, OrderStatus.TIMEOUT)

原因:重连后宽限期内订单(已收 orderUpdates 未收 userFills)的 fill 事件可能已随断连丢失,必须 HTTP 补查拿到真实 avgPx。HTTP 成功结算后,宽限线程 _grace_done.wait() 超时但 _resolve 会因 identity check 失败直接返回,无双重结算。

2.7 _monitor_order — 修复 B5(嵌套保护,result_event 一定释放)

def _monitor_order(self, oid: int, tracking: OrderTracking):
    try:
        self._monitor_order_inner(oid, tracking)
    except Exception as e:
        logger.error(f"监控线程异常: {tracking.coin} oid={oid} | {e}", exc_info=True)
        try:
            self._resolve(oid, tracking, OrderStatus.TIMEOUT)
        except Exception as resolve_err:
            logger.critical(
                f"_resolve 失败,强制释放 result_event: {tracking.coin} oid={oid} | {resolve_err}"
            )
            tracking.result_event.set()     # 最后兜底,防止 wait_for_order 永久阻塞

三、不修改的文件

文件 原因
realtime_kline_service_base.py B1 不存在(EventBus 单例),无需注入 event_bus
executor.py B1 不存在,_event_bus 已是同一单例,无需改动

四、改动汇总

src/utils/websocket/enhanced_ws_manager.py

方法 改动
add_subscriptions 重写:锁内仅内存操作,锁外执行 ws.send(),锁内更新 active_subscriptions
remove_subscriptions 重写:同样模式,锁内收集待发消息,锁外 ws.send()
_remove_from_subscriptions_list 新增私有辅助方法,消除 remove_subscriptions 内重复的按 key 查找逻辑

src/trading/websocket_order_manager.py

位置 改动
OrderTracking _grace_timer,加 _grace_done: threading.Event + _grace_started: bool
track_order _fill_ids 继承;timer.cancel()_grace_done.set()
_resolve timer.cancel()_grace_done.set()has_fill_price=True 仅在 px > 0 时执行
_on_order_status_event threading.Timer 创建;改为调用 _grace_wait 守护线程
_grace_wait(新增) 替代 Timer 回调;Event.wait(timeout) 实现可即时取消的宽限等待
_on_order_filled_event 头部加空 fill_id 防御
verify_pending_orders and t._ws_status is None 条件
_monitor_order _resolve 调用外加嵌套 try/except + result_event.set() 兜底

五、验证方法

Bug 验证方式
B2 锁内IO add_subscriptions 调用期间并发推送 WS 消息,确认无死锁;日志中 subscriptions_lock 持有时间不含网络等待
B4 Fill Price 0 Mock query_order_status 返回 avgPx=0,调用 _resolve(status=FILLED, px=0);断言 has_fill_price=False,下游 _backfill_order_price 被调用
B7 _fill_ids 继承 旧 tracking 已记录 fill_id="tid:1";调用 track_order 新建 tracking 后,再次推送 fill_id="tid:1" 的 fill 事件;断言 tracking.filled_size 不变
Bug3 补查范围 tracking 处于 _ws_status=FILLED 宽限期;调用 verify_pending_orders;断言该 oid 被纳入补查
B6 Timer 泄漏 高频触发宽限期并立即 cancel(_grace_done.set());用 threading.active_count() 确认线程数不持续增长
B5 监控异常 _resolve 内注入异常;确认 result_event.set() 被调用,wait_for_order() 正常返回
Bug2 空 fill_id 发布 fill_id=""OrderFilledEvent;确认日志有警告,filled_size 不变

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG