多币种进程卡死问题分析
BUG8: 全局锁导致系统运行后期卡死
发现时间: 2026-02-15
严重程度: Critical
状态: 待修复
影响: 运行数小时后分析队列堆满(80%+),所有 30 个 worker 停止消费,系统无分析输出
完整因果链
输入(新币上线) → 状态变化(订阅 + 队列入队) → 调用路径(worker → 数据补充 → API循环)
→ 出错点(全局锁串行化 30 个 worker) → 根因(Lock 内 sleep + 无总超时)
第 1 阶段:输入 — 新币种上线
_monitor_new_symbols 线程每小时扫描交易所 meta(realtime_kline_service_base.py:1607-1670):
# 发现新币种(如 AZTEC)
new_symbols = exchange_symbols - current_symbols
# 注册并动态订阅 WebSocket
self.symbols.append(symbol)
for interval in ['5m', '1h', '4h']:
new_subscriptions.append({"type": "candle", "coin": coin, "interval": interval})
状态变化
| 属性 | 变化 |
|---|---|
self.symbols |
+AZTEC/USDC:USDC |
| WebSocket 订阅 | +3 个 candle 订阅(5m, 1h, 4h) |
| 数据库 | AZTEC 在 DB 中无任何历史 K 线数据 |
第 2 阶段:状态变化 — WebSocket 消息涌入
AZTEC 的 5m K 线推送到达后,on_message(realtime_kline_service_base.py:573-651)处理:
# L627: 只有 5m 周期触发分析
if kline['timeframe'] != '5m':
return
# L638-639: 放入分析队列
analysis_task = {'symbol': 'AZTEC/USDC:USDC', 'timeframe': '5m', ...}
self.analysis_queue.put_nowait(analysis_task)
⚠️ 不只是 AZTEC — 所有已有币种的 5m K 线同样在持续入队。
555 个 (symbol, timeframe) 组合持续积压到 analysis_queue(容量 30000)。
第 3 阶段:调用路径 — 分析 Worker 执行链
30 个 _analysis_worker 线程(realtime_kline_service_base.py:843-944)从队列消费:
task = self.analysis_queue.get(timeout=1.0) # 取任务
self._batch_upsert_with_retry([kline_data], ...) # 阻塞点①: DB写入
self._analyze_and_alert(symbol, timeframe, kline_time) # 执行分析
分析编排
_analyze_and_alert(L1288)→ _fetch_and_validate_price_data(L946):
price_data_cache = self._fetch_and_validate_price_data(symbol, timeframe)
数据校验与补充循环
_fetch_and_validate_price_data 对 3 个周期 (5m/7d, 1h/30d, 4h/60d) 串行处理:
for tf, window in {'5m': 7天, '1h': 30天, '4h': 60天}.items():
base_klines = self.kline_repo.query_range(...) # 阻塞点②: DB查询
alt_klines = self.kline_repo.query_range(...) # 阻塞点③: DB查询
# AZTEC 无历史数据 → need_refill = True
if need_refill:
self.data_filler.fill_missing_data(BTC, tf, ...) # 阻塞核心④
self.data_filler.fill_missing_data(AZTEC, tf, ...) # 阻塞核心⑤
⚠️ 新币每个周期都缺数据,单次分析最多触发 6 次
fill_missing_data(3 周期 × base + alt)。
数据补充 → API 分页拉取
fill_missing_data(kline_data_filler.py:229-268)→ fetch_candles_range_with_retry(hyperliquid_candles.py:167-215):
all_rows = fetch_candles_range_with_retry(
self._info, symbol, timeframe,
since_ms, until_ms,
api_limit=1500, # 每页最多 1500 条
request_interval=2.5, # 页间等待 2.5 秒
)
# 分页循环
while current_since < until_ms:
rows = fetch_candles(...) # ← 持有全局锁!
time.sleep(request_interval) # 页间 2.5 秒等待
第 4 阶段:出错点 — 全局锁串行化
核心阻塞:fetch_candles() 全局互斥锁
hyperliquid_candles.py:29, 107-112:
_candles_lock = threading.Lock() # ← 模块级全局锁,单例
def fetch_candles(info, symbol, interval, start_time_ms, end_time_ms):
min_interval = 2.5 # KLINE_FILLER_API_INTERVAL
with _candles_lock: # ← 独占锁
now = time.monotonic()
elapsed = now - _last_candles_request_time
if elapsed < min_interval:
time.sleep(min_interval - elapsed) # ← 锁内 sleep 最多 2.5s
_last_candles_request_time = time.monotonic()
# 锁释放后才发 HTTP 请求(30s 超时)
raw = info.candles_snapshot(...)
锁争用时序
Worker-0: [acquire ✅ → sleep(2.5s) → release] → HTTP(30s) → [acquire ⏳ 排29位...]
Worker-1: [acquire ⏳ 等Worker-0释放...] → [acquire ✅ → sleep(2.5s) → release] → HTTP...
Worker-2: [acquire ⏳ ...]
...
Worker-29: [acquire ⏳ 等 75s...]
定量估算
| 场景 | 计算 | 耗时 |
|---|---|---|
| 锁争用一轮 | 30 workers × 2.5s 限流间隔 | 75 秒 |
| AZTEC 1h 补充 30 天 | 720 条 ÷ 1500/页 = 1 页 + 锁排队 | ≈ 75 秒 |
| AZTEC 5m 补充 7 天 | 2016 条 ÷ 1500/页 = 2 页 × 75 秒/轮 | ≈ 150 秒 |
| AZTEC 4h 补充 60 天 | 360 条 ÷ 1500/页 = 1 页 + 锁排队 | ≈ 75 秒 |
| 多个新币同时补充 | N × 3 周期 × 上述时间 | 几十分钟 |
第 5 阶段:根因总结
日志证据
12:31:28 - 开始补充K线数据 | AZTEC/USDC:USDC @ 1h | 时间范围: 30天
← 此后 68 分钟无任何分析完成日志 →
13:39:17 - 分析队列: 24073/30000 (80.2%) | 去重字典: 入队555 分析0
分析0 表示没有任何一个 worker 完成过一次分析。
三个缺陷叠加
| # | 缺陷 | 位置 | 影响 |
|---|---|---|---|
| 1 | 全局 threading.Lock() |
hyperliquid_candles.py:29 |
30 个 worker 完全串行化为单线程吞吐 |
| 2 | Lock 内 time.sleep(2.5s) |
hyperliquid_candles.py:111 |
每次持锁时间 = 限流等待时间,无法并行 |
| 3 | fill_missing_data 无总超时 |
kline_data_filler.py:229 |
分页循环可运行任意长时间,无退出机制 |
一句话根因
fetch_candles()的全局threading.Lock()将 30 个 worker 串行化为单 worker 吞吐。当新币触发无时间上限的 K 线补充时,每个 API 请求独占锁 2.5 秒,30 个 worker 排队一轮需要 75 秒,导致分析队列消费速度远低于入队速度,最终堆满卡死。
修复方案
1. 全局锁 → 信号量并发控制
- _candles_lock = threading.Lock()
+ _candles_semaphore = threading.Semaphore(3) # 允许 3 个并发请求
def fetch_candles(...):
- with _candles_lock:
- now = time.monotonic()
- elapsed = now - _last_candles_request_time
- if elapsed < min_interval:
- time.sleep(min_interval - elapsed)
- _last_candles_request_time = time.monotonic()
+ with _candles_semaphore:
+ raw = info.candles_snapshot(...)
2. 数据补充增加总超时
def fill_missing_data(self, symbol, timeframe, start_time, end_time):
+ max_duration = KLINE_FILLER_MAX_DURATION_SECONDS # 30秒
all_rows = fetch_candles_range_with_retry(
- ..., request_interval=2.5
+ ..., request_interval=2.5, max_duration=max_duration
)
3. 补充失败降级处理
# _fetch_and_validate_price_data 中
if need_refill:
alt_filled = self.data_filler.fill_missing_data(...)
+ # 补充后仍不足 → 快速失败,不反复阻塞 worker
+ if len(alt_klines) < 100:
+ self.logger.warning(f"数据补充后仍不足,跳过: {symbol} @ {tf}")
+ return None
4. 新增配置参数
| 参数 | 值 | 说明 |
|---|---|---|
KLINE_FILLER_MAX_DURATION_SECONDS |
30 | 单次补充最大时长 |
KLINE_FILLER_MAX_CONCURRENT_REQUESTS |
3 | 最大并发 API 请求 |