Order Tracking System BUG37
Order Tracking System: Bug Fix Design Document
Background
After checking both bug reports against the code, we confirmed the following:
Misdiagnosed in the reports (no fix needed)
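- B1, two EventBus instances: `EventBus` is a singleton (`__new__` plus double-checked locking), so `EventBus()` always returns the same instance and there is no isolation problem.
- B3, grace timer race: `_resolve` performs `pop(oid)` while holding the lock, so on a second call the identity check fails and it returns immediately; the method is idempotent by construction.
- B8, TOCTOU: the function `_check_order_after_cancel` does not exist, and the fallback path in `_close_limit_leg_timeout` already checks `fill_px > 0`.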
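The B1 conclusion depends on `EventBus` being a process-wide singleton. Below is a minimal sketch of the `__new__` plus double-checked locking pattern the analysis refers to, assuming the real class follows this shape. The `_subscribers` attribute and the `subscribe`/`publish` methods are hypothetical placeholders, not the project's actual API.

```python
import threading


class EventBus:
    """Hypothetical minimal EventBus: only the singleton mechanics matter here."""

    _instance = None
    _instance_lock = threading.Lock()

    def __new__(cls):
        # First check without the lock: the fast path once the instance exists.
        if cls._instance is None:
            with cls._instance_lock:
                # Second check under the lock: another thread may have created
                # the instance while this one was waiting for the lock.
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._subscribers = {}
                    cls._instance = instance
        return cls._instance

    def subscribe(self, event_type, handler):
        self._subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event_type, payload):
        for handler in list(self._subscribers.get(event_type, [])):
            handler(payload)
```

Because no `__init__` is defined, repeated `EventBus()` calls never reset `_subscribers`. A handler registered through one reference is therefore visible through every other reference.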
Confirmed, all fixed in this change
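| ID | Priority | Location | Issue |
|---|---|---|---|
| B2 | P0 | enhanced_ws_manager.py | `add_subscriptions` and `remove_subscriptions` call `ws.send()` while holding `subscriptions_lock`; blocking network IO under the lock opens a real deadlock path |
| B4 | P1 | websocket_order_manager.py | `_resolve` sets `has_fill_price = True` unconditionally, even when `px` is 0, so a 0.0 price flows downstream and masks the backfill |
| B7 | P2 | websocket_order_manager.py | `track_order` does not carry over `_fill_ids` when it replaces an old tracking, so historical fills replayed after a reconnect are accumulated again |
| Bug3 | P2 | websocket_order_manager.py | The filter in `verify_pending_orders` includes `_ws_status is None`, which excludes orders already in the grace period from the re-check |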
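| B6 | P3 | websocket_order_manager.py | The grace period uses `threading.Timer`; the report says `cancel()` only sets a flag without stopping the thread, so sleeping threads pile up under high-frequency use. (In CPython, `cancel()` sets the Event that `Timer.run()` waits on, so a cancelled timer thread exits promptly; the rewrite mainly turns cancellation into a single per-tracking Event.) |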
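| B5 | Minor | websocket_order_manager.py | The `except` block in `_monitor_order` calls `_resolve` directly; if `_resolve` itself raises, `result_event` is never released |
| Bug2 | Defensive | websocket_order_manager.py | `_on_order_filled_event` does not guard against an empty `fill_id`; current code paths do not trigger it, but it is a latent risk |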
Files in scope
- `src/utils/websocket/enhanced_ws_manager.py` (rewrite `add_subscriptions` and `remove_subscriptions`)
- `src/trading/websocket_order_manager.py` (full-file rewrite that removes `threading.Timer` entirely)
I. enhanced_ws_manager.py: fix for B2
Root cause
Both methods call `ws.send()` directly inside `with self.subscriptions_lock:`:
```python
# Current (buggy)
with self.subscriptions_lock:  # holds lock L
    ...
    self.ws.send(json.dumps(msg))  # blocking network IO inside the lock
```
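Deadlock path: the WS message thread receives a message → publishes an event → the Executor handles it → calls `add_subscriptions` → tries to acquire lock L → waits forever. Lock L is held by the sending thread, and the sending thread is itself waiting for the WS message thread to finish.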
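The cycle can be reproduced with two threads and one lock. The sketch below is a hypothetical model, not the real WebSocket code. `progress` stands in for `ws.send()` needing the message thread to make progress. `message_thread` stands in for the handler chain that ends in `add_subscriptions`. Timeouts are added only so the demo cannot hang; the real path has none.

```python
import threading


def simulate(io_inside_lock: bool, timeout: float = 1.0) -> bool:
    """Return True if the 'send' completes, False if it would deadlock."""
    lock = threading.Lock()          # plays the role of subscriptions_lock
    progress = threading.Event()     # set once the message thread gets through

    def message_thread() -> None:
        # Executor handler -> add_subscriptions -> must take the lock first
        if lock.acquire(timeout=timeout):
            try:
                progress.set()
            finally:
                lock.release()

    worker = threading.Thread(target=message_thread)
    if io_inside_lock:
        with lock:
            worker.start()
            ok = progress.wait(timeout)  # "blocking IO" while still holding the lock
    else:
        with lock:
            pass                         # phase 1: in-memory bookkeeping only
        worker.start()
        ok = progress.wait(timeout)      # phase 2: "blocking IO" after releasing the lock
    worker.join()
    return ok
```

With IO inside the lock, the send can never complete before its timeout, because the thread it waits on cannot get the lock. Moving the IO outside the lock breaks the cycle.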
New design: two phases, with IO moved outside the lock
Principle: inside the lock, do only in-memory work (reading and writing dicts and lists). Do network IO outside the lock.
New add_subscriptions implementation
```python
def add_subscriptions(self, new_subscriptions: list[dict]) -> bool:
    if not new_subscriptions:
        return True
    try:
        # Phase 1: in-memory work only (under the lock); collect the messages to send
        pending_sends: list[tuple[tuple, dict, str]] = []  # (sub_key, subscription, msg_json)
        ws = None
        with self.subscriptions_lock:
            for subscription in new_subscriptions:
                sub_key = (
                    subscription.get('type'),
                    subscription.get('coin'),
                    subscription.get('interval'),
                )
                if sub_key in self.active_subscriptions:
                    continue
                self.subscriptions.append(subscription)
                if self.state == ConnectionState.CONNECTED and self._is_connected():
                    if self.ws and self.ws.keep_running:
                        ws = self.ws  # snapshot the socket under the lock; a reconnect may swap self.ws
                        msg_json = json.dumps({"method": "subscribe", "subscription": subscription})
                        pending_sends.append((sub_key, subscription, msg_json))
                # else: connection not stable; keep it in the list only, it is subscribed on reconnect

        # Phase 2: network IO outside the lock
        failed = False
        for sub_key, subscription, msg_json in pending_sends:
            try:
                ws.send(msg_json)
                with self.subscriptions_lock:
                    self.active_subscriptions[sub_key] = subscription
            except Exception as e:
                logger.error(f"Subscribe send failed: {subscription} | {e}")
                with self.subscriptions_lock:
                    try:
                        self.subscriptions.remove(subscription)
                    except ValueError:
                        pass
                failed = True
        return not failed
    except Exception as e:
        logger.error(f"add_subscriptions error: {e}", exc_info=True)
        return False
```
New remove_subscriptions implementation
```python
def remove_subscriptions(self, subscriptions_to_remove: list[dict]) -> bool:
    if not subscriptions_to_remove:
        return True
    try:
        # Phase 1: in-memory work under the lock; also collect the unsubscribe messages to send
        pending_unsubs: list[str] = []  # list of msg_json
        ws = None
        with self.subscriptions_lock:
            for subscription in subscriptions_to_remove:
                sub_key = (
                    subscription.get('type'),
                    subscription.get('coin'),
                    subscription.get('interval'),
                )
                # Remove from memory first (atomic, under the lock). Entries only queued
                # for reconnect (not yet active) are dropped from the list as well,
                # so a reconnect does not resubscribe them.
                was_active = self.active_subscriptions.pop(sub_key, None) is not None
                self._remove_from_subscriptions_list(sub_key)
                if was_active and self.ws and self._is_connected():
                    ws = self.ws  # snapshot the socket under the lock
                    msg_json = json.dumps({"method": "unsubscribe", "subscription": subscription})
                    pending_unsubs.append(msg_json)
        # Phase 2: network IO outside the lock
        for msg_json in pending_unsubs:
            try:
                ws.send(msg_json)
            except Exception as e:
                logger.error(f"Unsubscribe send failed: {e}")
        return True
    except Exception as e:
        logger.error(f"remove_subscriptions error: {e}", exc_info=True)
        return False
```
`_remove_from_subscriptions_list(sub_key)` is a newly extracted private helper that wraps the existing find-by-key-and-remove logic. It must be called while holding the lock.
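The body of `_remove_from_subscriptions_list` is not shown in this document. The following is one plausible sketch under stated assumptions: `subscriptions` is a plain list of dicts, keys use the same `(type, coin, interval)` triple as above, and every matching entry is removed rather than only the first. `SubscriptionStore` is a hypothetical stand-in holding only the fields the helper touches.

```python
import threading


def _sub_key(subscription: dict) -> tuple:
    # Same key shape as add_subscriptions / remove_subscriptions
    return (
        subscription.get('type'),
        subscription.get('coin'),
        subscription.get('interval'),
    )


class SubscriptionStore:
    """Hypothetical stand-in: only the fields the helper touches."""

    def __init__(self) -> None:
        self.subscriptions_lock = threading.RLock()
        self.subscriptions: list[dict] = []
        self.active_subscriptions: dict[tuple, dict] = {}

    def _remove_from_subscriptions_list(self, sub_key: tuple) -> int:
        """Drop every entry in self.subscriptions whose key equals sub_key.

        The caller must already hold subscriptions_lock. Rebuilding the list
        in place also clears duplicates and avoids mutating while iterating.
        Returns the number of entries removed.
        """
        before = len(self.subscriptions)
        self.subscriptions[:] = [s for s in self.subscriptions if _sub_key(s) != sub_key]
        return before - len(self.subscriptions)
```

Slice assignment keeps the same list object. Any other reference to `self.subscriptions`, such as a reconnect routine holding it, therefore sees the update.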
II. websocket_order_manager.py: full-file rewrite
The core goal of the full rewrite is to remove `threading.Timer` completely and fix every bug in one pass, leaving no dead code behind.
2.1 Changes to the OrderTracking dataclass
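| Field | Action | Notes |
|---|---|---|
| `_grace_timer: threading.Timer \| None` | Removed | Timer eliminated entirely |
| `_grace_done: threading.Event` | Added | Grace-period cancellation signal; `set()` cancels |
| `_grace_started: bool` | Added | Prevents the same tracking from starting a second grace thread |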
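```python
import threading
from dataclasses import dataclass, field
from threading import Event as ThreadEvent  # assumed: the module's existing alias for threading.Event


@dataclass
class OrderTracking:
    coin: str
    timeout_seconds: int
    status: OrderStatus = OrderStatus.PENDING
    avg_price: float = 0.0
    filled_size: float = 0.0
    has_fill_price: bool = False
    fill_count: int = 0
    result_event: ThreadEvent = field(default_factory=ThreadEvent)
    # Internal state
    _ws_status: OrderStatus | None = field(default=None, repr=False)
    _grace_done: threading.Event = field(default_factory=threading.Event, repr=False)  # set() cancels the grace wait
    _grace_started: bool = field(default=False, repr=False)  # at most one grace thread per tracking
    _fill_ids: set[str] = field(default_factory=set, repr=False)  # fill_ids already accumulated
```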
2.2 track_order: fix for B7 (inherit _fill_ids)
```python
def track_order(self, oid: int, coin: str, timeout_seconds: int = 600) -> OrderTracking:
    oid = int(oid)
    tracking = OrderTracking(coin=coin, timeout_seconds=timeout_seconds)
    with self._lock:
        old = self._tracking.get(oid)
        if old is not None:
            logger.warning(
                f"Duplicate tracking for oid={oid}; old tracking replaced by the new one "
                f"(old status: {old.status.value}, ws final status: {old._ws_status})"
            )
            tracking._fill_ids = old._fill_ids.copy()  # B7 fix: inherit the dedupe set
            old._grace_done.set()  # cancel the old grace wait (wakes its thread immediately)
            old.status = OrderStatus.CANCELED
            old.result_event.set()
        self._tracking[oid] = tracking
    logger.info(f"Registered order tracking: {coin} oid={oid} timeout={timeout_seconds}s")
    threading.Thread(
        target=self._monitor_order, args=(oid, tracking), daemon=True
    ).start()
    return tracking
```
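The effect of the B7 fix can be modelled without the manager. This is a hypothetical reduced model: `MiniTracking` keeps only the two fields B7 touches, `apply_fill` applies the same dedupe rule as `_on_order_filled_event`, and `replace_tracking` mimics `track_order` with and without copying `_fill_ids`.

```python
from dataclasses import dataclass, field


@dataclass
class MiniTracking:
    """Hypothetical reduced OrderTracking: only the fields B7 touches."""
    filled_size: float = 0.0
    _fill_ids: set[str] = field(default_factory=set)


def apply_fill(t: MiniTracking, fill_id: str, qty: float) -> bool:
    # Same rule as _on_order_filled_event: skip empty or already-seen fill_ids
    if not fill_id or fill_id in t._fill_ids:
        return False
    t._fill_ids.add(fill_id)
    t.filled_size += qty
    return True


def replace_tracking(old: MiniTracking, inherit: bool) -> MiniTracking:
    new = MiniTracking()
    if inherit:
        new._fill_ids = old._fill_ids.copy()  # the B7 fix
    return new
```

With the dedupe set inherited, a replayed `tid:1` is ignored by the new tracking. Without it, the same fill is counted again. In this design the new tracking does not inherit `filled_size` or `avg_price`, so a fill seen only by the old tracking is skipped rather than re-counted.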
2.3 _resolve: fix for B4 (set has_fill_price to True only when the price is valid)
```python
def _resolve(self, oid: int, tracking: OrderTracking,
             status: OrderStatus | None = None,
             px: float = 0.0, sz: float = 0.0) -> bool:
    with self._lock:
        # Identity check: a tracking that was already resolved or replaced is a no-op
        if self._tracking.get(oid) is not tracking:
            return False
        if tracking.status != OrderStatus.PENDING:
            return False
        final_status = status if status is not None else tracking._ws_status
        if final_status is None:
            return False
        tracking._grace_done.set()  # cancel the grace wait (replaces timer.cancel())
        tracking.status = final_status
        # B4 fix: set has_fill_price = True only when px > 0.
        # With px == 0, has_fill_price stays False so downstream _backfill_order_price handles it.
        if final_status == OrderStatus.FILLED and not tracking.has_fill_price:
            if px > 0:
                tracking.avg_price = px
                tracking.has_fill_price = True
            if sz > 0:
                tracking.filled_size = sz
        self._tracking.pop(oid, None)
        tracking.result_event.set()
    self._log_result(tracking, oid)
    return True
```
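Both properties of `_resolve` can be checked in isolation: the B4 price gate, and the identity-check idempotency cited for B3. `MiniManager`, `Tracking` and the trimmed `OrderStatus` enum below are hypothetical stand-ins. The reduced `_resolve` keeps only the lock, identity check, price gate and pop.

```python
import threading
from dataclasses import dataclass, field
from enum import Enum


class OrderStatus(Enum):
    PENDING = "pending"
    FILLED = "filled"


@dataclass
class Tracking:
    status: OrderStatus = OrderStatus.PENDING
    avg_price: float = 0.0
    filled_size: float = 0.0
    has_fill_price: bool = False
    result_event: threading.Event = field(default_factory=threading.Event)


class MiniManager:
    """Hypothetical reduced manager: only the B4 gate and the identity check."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._tracking: dict[int, Tracking] = {}

    def _resolve(self, oid: int, tracking: Tracking, status: OrderStatus,
                 px: float = 0.0, sz: float = 0.0) -> bool:
        with self._lock:
            if self._tracking.get(oid) is not tracking:
                return False  # already resolved or replaced: second call is a no-op
            if tracking.status != OrderStatus.PENDING:
                return False
            tracking.status = status
            if status == OrderStatus.FILLED and not tracking.has_fill_price:
                if px > 0:  # B4: only a positive price counts as a real fill price
                    tracking.avg_price = px
                    tracking.has_fill_price = True
                if sz > 0:
                    tracking.filled_size = sz
            self._tracking.pop(oid, None)
            tracking.result_event.set()
        return True
```

A fill resolved with `px=0` leaves `has_fill_price` False, so the backfill still runs. Any later `_resolve` for the same tracking returns False without touching its state.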
2.4 _on_order_status_event and the new _grace_wait: fix for B6 (replacing Timer)
```python
def _on_order_status_event(self, event: OrderStatusEvent) -> None:
    oid = _safe_int_oid(event.order_id)
    if oid is None:
        return
    status_str = event.status
    should_resolve = False
    resolve_tracking = None
    with self._lock:
        tracking = self._tracking.get(oid)
        if not tracking or tracking.status != OrderStatus.PENDING:
            return
        if status_str == "filled":
            tracking._ws_status = OrderStatus.FILLED
            logger.info(
                f"orderUpdate FILLED: {tracking.coin} oid={oid} "
                f"fallback_px={event.price} has_fill={'Y' if tracking.has_fill_price else 'N'}"
            )
            if tracking.has_fill_price:
                should_resolve = True
                resolve_tracking = tracking
            elif not tracking._grace_started:
                # No userFill yet: start exactly one grace thread per tracking
                tracking._grace_started = True
                threading.Thread(
                    target=self._grace_wait,
                    args=(oid, tracking, event.price, event.size),
                    daemon=True,
                ).start()
        elif status_str in ("canceled", "margincanceled"):
            logger.info(f"orderUpdate CANCELED: {tracking.coin} oid={oid}")
            tracking._ws_status = OrderStatus.CANCELED
            should_resolve = True
            resolve_tracking = tracking
        elif "rejected" in status_str:
            if status_str != "rejected":
                logger.info(
                    f"orderUpdate rejection variant: {tracking.coin} oid={oid} status={status_str!r}"
                )
            tracking._ws_status = OrderStatus.REJECTED
            should_resolve = True
            resolve_tracking = tracking
    # _resolve takes self._lock itself, so it is called after the lock is released
    if should_resolve:
        self._resolve(oid, resolve_tracking)


def _grace_wait(self, oid: int, tracking: OrderTracking,
                fallback_px: float, fallback_sz: float) -> None:
    """Grace-period wait: wait for userFills to arrive; on timeout, settle using the
    limitPx from orderUpdates as the fallback price.

    Event.wait(timeout) returns True  -> set() was called (_resolve already ran, or
                                         track_order replaced this tracking); exit normally
    returns False -> timed out (userFills never arrived); settle at the fallback price
    """
    timed_out = not tracking._grace_done.wait(timeout=self._FILL_GRACE_SEC)
    if timed_out:
        self._resolve(oid, tracking, None, fallback_px, fallback_sz)
    # Woken by set(): another path already handled this tracking; exit with no side effects
```
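The two outcomes described in the `_grace_wait` docstring can be observed directly. `grace_wait` and `run` below are hypothetical helpers that mirror the method's control flow with a plain `threading.Event`. `Event.wait(timeout)` returns True when the Event is set and False when the timeout expires.

```python
import threading
import time
from typing import Callable, Optional


def grace_wait(done: threading.Event, grace_sec: float,
               on_timeout: Callable[[], None]) -> bool:
    """Same control flow as _grace_wait (hypothetical free function).

    Returns True if woken by done.set(); returns False if the grace period
    expired, in which case on_timeout() (the fallback settlement) has run.
    """
    woken = done.wait(timeout=grace_sec)
    if not woken:
        on_timeout()
    return woken


def run(cancel_after: Optional[float], grace_sec: float):
    done = threading.Event()
    fallback_calls: list[bool] = []
    out: dict[str, bool] = {}

    def worker() -> None:
        out["woken"] = grace_wait(done, grace_sec, lambda: fallback_calls.append(True))

    start = time.monotonic()
    t = threading.Thread(target=worker, daemon=True)
    t.start()
    if cancel_after is not None:
        time.sleep(cancel_after)
        done.set()  # what _resolve and track_order do via _grace_done.set()
    t.join()
    return out["woken"], bool(fallback_calls), time.monotonic() - start
```

Cancelling after 50 ms ends a 5-second grace wait almost immediately, with no fallback settlement. Letting a short grace period expire takes the fallback path.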
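Timer vs. Event comparison

| | threading.Timer | threading.Event approach |
|---|---|---|
| Cancellation | `cancel()` sets the Timer's internal Event; in CPython, `Timer.run()` waits on that Event, so the thread wakes and exits without firing | `set()` wakes `_grace_wait` immediately and the thread exits |
| Leftover threads | One thread per started timer; none remain after `cancel()` (CPython) | One thread per started grace period; none remain after `set()` |
| Memory | Timer object (a Thread subclass with its own internal Event) | Thread object plus the `_grace_done` Event stored on the tracking |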
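The cancellation behaviour of `threading.Timer` can be checked directly. In CPython's `Lib/threading.py`, `Timer.run()` waits on an internal Event for `interval` seconds, and `cancel()` sets that Event. This is a CPython implementation detail, so the snippet is a quick empirical check rather than a documented guarantee.

```python
import threading
import time

calls: list[str] = []
timer = threading.Timer(30.0, lambda: calls.append("fired"))  # 30 s delay, never meant to fire
timer.start()
time.sleep(0.05)          # let the timer thread enter its wait

start = time.monotonic()
timer.cancel()            # sets the internal Event that Timer.run() is waiting on
timer.join(timeout=5.0)   # returns as soon as the timer thread exits
elapsed = time.monotonic() - start

print(f"alive={timer.is_alive()} elapsed={elapsed:.3f}s calls={calls}")
```

If the timer thread kept sleeping after `cancel()`, `join(timeout=5.0)` would return with the thread still alive. On CPython it exits well within that window, and the callback never runs.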
2.5 _on_order_filled_event: fix for Bug2 (guard against an empty fill_id)
```python
def _on_order_filled_event(self, event: OrderFilledEvent) -> None:
    oid = _safe_int_oid(event.order_id)
    if oid is None:
        return
    if not event.fill_id:  # Bug2 guard: without a fill_id the fill cannot be deduplicated
        logger.warning(f"Empty fill_id, skipping to avoid double counting: oid={oid}")
        return
    if event.filled_price <= 0 or event.filled_qty <= 0:
        return
    should_resolve = False
    resolve_tracking = None
    with self._lock:
        tracking = self._tracking.get(oid)
        if not tracking or tracking.status != OrderStatus.PENDING:
            return
        if event.fill_id in tracking._fill_ids:
            return
        tracking._fill_ids.add(event.fill_id)
        self._accumulate_fill(tracking, event.filled_price, event.filled_qty)
        tracking.has_fill_price = True
        tracking.fill_count += 1
        logger.info(
            f"userFill accumulated: {tracking.coin} oid={oid} "
            f"this fill px={event.filled_price} sz={event.filled_qty} | "
            f"cumulative avg_px={tracking.avg_price:.6g} total_sz={tracking.filled_size:.6g} "
            f"fills={tracking.fill_count} fid={event.fill_id}"
        )
        # orderUpdates already delivered a final status: settle now
        if tracking._ws_status is not None:
            should_resolve = True
            resolve_tracking = tracking
    if should_resolve:
        self._resolve(oid, resolve_tracking)
```
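`_accumulate_fill` is called above but not shown. Given the log line (`avg_px`, `total_sz`), a volume-weighted running average is the likely behaviour. The sketch below assumes that; `FillState` is a hypothetical subset of `OrderTracking`.

```python
from dataclasses import dataclass


@dataclass
class FillState:
    """Hypothetical subset of OrderTracking used by the accumulator."""
    avg_price: float = 0.0
    filled_size: float = 0.0


def accumulate_fill(state: FillState, px: float, sz: float) -> None:
    """Volume-weighted running average (assumed behaviour of _accumulate_fill).

    new_avg = (old_avg * old_size + px * sz) / (old_size + sz)
    The caller already rejects px <= 0 or sz <= 0, so the denominator is > 0.
    """
    total = state.filled_size + sz
    state.avg_price = (state.avg_price * state.filled_size + px * sz) / total
    state.filled_size = total
```

For example, a fill of 1 at 100.0 followed by a fill of 2 at 103.0 gives a size of 3 and an average of (100 + 206) / 3 = 102.0.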
2.6 verify_pending_orders: fix for Bug3 (wider re-check scope)
```python
def verify_pending_orders(self):
    with self._lock:
        # Bug3 fix: the `_ws_status is None` restriction is gone, so orders already
        # in the grace period (_ws_status=FILLED) are re-checked as well
        pending = [
            (oid, t) for oid, t in self._tracking.items()
            if t.status == OrderStatus.PENDING
        ]
    if not pending:
        return
    logger.info(f"Reconnect re-check: {len(pending)} unfinished orders")
    retry_list = []
    for i, (oid, tracking) in enumerate(pending):
        if i > 0:
            time.sleep(1)  # space out the HTTP queries
        result = self._http_check(oid)
        if result in ("error", "busy"):
            logger.warning(f"Reconnect re-check incomplete: oid={oid} result={result}, will retry")
            retry_list.append((oid, tracking))
    if not retry_list:
        return
    time.sleep(3)
    for oid, tracking in retry_list:
        result = self._http_check(oid)
        if result in ("error", "busy"):
            logger.error(f"Reconnect re-check failed again on retry, forcing timeout: oid={oid} result={result}")
            self._resolve(oid, tracking, OrderStatus.TIMEOUT)
```
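Rationale: for an order in the grace period at reconnect time (orderUpdates received, userFills not yet received), the fill event may have been lost during the disconnect. An HTTP re-check is therefore required to obtain the real avgPx. Once the HTTP path settles the order through `_resolve`, `_resolve` calls `_grace_done.set()`. The grace thread's `_grace_done.wait()` then returns immediately and the thread exits. Even if the grace thread did call `_resolve` afterwards, the identity check would fail and the call would return at once, so the order is never settled twice.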
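The two-pass retry policy in `verify_pending_orders` can be tested without a network by injecting the check, the forced timeout and the sleep. `verify_with_retry` is a hypothetical extraction of that policy, assuming `check(oid)` returns `"error"` or `"busy"` on failure and any other string on success, as `_http_check` appears to do.

```python
from typing import Callable, Iterable


def verify_with_retry(
    oids: Iterable[int],
    check: Callable[[int], str],
    force_timeout: Callable[[int], None],
    sleep: Callable[[float], None],
    gap: float = 1.0,
    backoff: float = 3.0,
) -> list[int]:
    """Same shape as verify_pending_orders; returns the oids that were force-timed-out."""
    retry: list[int] = []
    for i, oid in enumerate(oids):
        if i > 0:
            sleep(gap)               # space out first-pass queries
        if check(oid) in ("error", "busy"):
            retry.append(oid)
    if not retry:
        return []
    sleep(backoff)                   # one pause before the second pass
    forced: list[int] = []
    for oid in retry:
        if check(oid) in ("error", "busy"):
            force_timeout(oid)       # real code: self._resolve(oid, tracking, OrderStatus.TIMEOUT)
            forced.append(oid)
    return forced
```

Passing a recording function as `sleep` makes the spacing observable in a test, and no real time passes.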
2.7 _monitor_order: fix for B5 (nested guard, so result_event is always released)
```python
def _monitor_order(self, oid: int, tracking: OrderTracking):
    try:
        self._monitor_order_inner(oid, tracking)
    except Exception as e:
        logger.error(f"Monitor thread error: {tracking.coin} oid={oid} | {e}", exc_info=True)
        try:
            self._resolve(oid, tracking, OrderStatus.TIMEOUT)
        except Exception as resolve_err:
            logger.critical(
                f"_resolve failed, force-releasing result_event: {tracking.coin} oid={oid} | {resolve_err}"
            )
            tracking.result_event.set()  # last resort: keeps wait_for_order from blocking forever
```
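The guarantee that matters for B5 is that waiters are released even when both the monitor body and `_resolve` raise. The sketch mirrors the guard structure with hypothetical callables: `inner` stands for `_monitor_order_inner`, and `resolve` for the `_resolve(..., OrderStatus.TIMEOUT)` call.

```python
import logging
import threading
from typing import Callable

logger = logging.getLogger("monitor_sketch")


def monitor(inner: Callable[[], None], resolve: Callable[[], None],
            result_event: threading.Event) -> None:
    """Same guard structure as _monitor_order (hypothetical callables)."""
    try:
        inner()
    except Exception:
        logger.error("monitor thread failed", exc_info=True)
        try:
            resolve()
        except Exception:
            logger.critical("resolve failed, releasing result_event", exc_info=True)
            result_event.set()  # last resort so waiters never hang
```

If `resolve` succeeds, it is responsible for setting the event itself, as `_resolve` does. The fallback `set()` only runs when `resolve` raises.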
III. Files not modified
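| File | Reason |
|---|---|
| realtime_kline_service_base.py | B1 does not exist (EventBus is a singleton), so there is no need to inject event_bus |
| executor.py | B1 does not exist; `_event_bus` is already the same singleton, so no change is needed |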
IV. Summary of changes
src/utils/websocket/enhanced_ws_manager.py
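| Method | Change |
|---|---|
| add_subscriptions | Rewritten: only in-memory work under the lock, `ws.send()` outside it, and `active_subscriptions` updated under the lock after each send |
| remove_subscriptions | Rewritten with the same pattern: collect pending messages under the lock, `ws.send()` outside it |
| _remove_from_subscriptions_list | New private helper that removes the duplicated find-by-key logic from `remove_subscriptions` |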
src/trading/websocket_order_manager.py
| Location | Change |
|---|---|
| OrderTracking | Removed `_grace_timer`; added `_grace_done: threading.Event` and `_grace_started: bool` |
| track_order | Inherits `_fill_ids`; `timer.cancel()` → `_grace_done.set()` |
| _resolve | `timer.cancel()` → `_grace_done.set()`; `has_fill_price = True` only when `px > 0` |
| _on_order_status_event | No longer creates a `threading.Timer`; starts a daemon thread running `_grace_wait` instead |
| _grace_wait (new) | Replaces the Timer callback; `Event.wait(timeout)` gives a grace wait that can be cancelled immediately |
| _on_order_filled_event | Guard against an empty `fill_id` at the top |
| verify_pending_orders | Dropped the `and t._ws_status is None` condition |
| _monitor_order | Nested try/except around the `_resolve` call, with `result_event.set()` as the last resort |
V. Verification
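| Bug | How to verify |
|---|---|
| B2 IO under lock | Push WS messages concurrently while `add_subscriptions` runs and confirm there is no deadlock; logs should show that time spent holding `subscriptions_lock` excludes network waits |
| B4 fill price 0 | Mock `query_order_status` to return avgPx=0 and call `_resolve(status=FILLED, px=0)`; assert `has_fill_price` is False and that downstream `_backfill_order_price` is called |
| B7 _fill_ids inheritance | The old tracking has already recorded `fill_id="tid:1"`; after `track_order` creates the new tracking, push another fill event with `fill_id="tid:1"`; assert `tracking.filled_size` is unchanged |
| Bug3 re-check scope | Put a tracking into the grace period with `_ws_status=FILLED`; call `verify_pending_orders`; assert that the oid is included in the re-check |
| B6 Timer leak | Trigger grace periods at high frequency and cancel each immediately (`_grace_done.set()`); use `threading.active_count()` to confirm the thread count does not keep growing |
| B5 monitor exception | Inject an exception into `_resolve`; confirm `result_event.set()` is called and `wait_for_order()` returns normally |
| Bug2 empty fill_id | Publish an `OrderFilledEvent` with `fill_id=""`; confirm a warning is logged and `filled_size` is unchanged |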