多币种进程卡死问题分析

BUG8: 全局锁导致系统运行后期卡死

发现时间: 2026-02-15
严重程度: Critical
状态: 待修复
影响: 运行数小时后分析队列堆满(80%+),所有 30 个 worker 停止消费,系统无分析输出


完整因果链

输入(新币上线) → 状态变化(订阅 + 队列入队) → 调用路径(worker → 数据补充 → API循环)
→ 出错点(全局锁串行化 30 个 worker) → 根因(Lock 内 sleep + 无总超时)

第 1 阶段:输入 — 新币种上线

_monitor_new_symbols 线程每小时扫描交易所 meta(realtime_kline_service_base.py:1607-1670):

# 发现新币种(如 AZTEC)
new_symbols = exchange_symbols - current_symbols

# 注册并动态订阅 WebSocket
self.symbols.append(symbol)
for interval in ['5m', '1h', '4h']:
    new_subscriptions.append({"type": "candle", "coin": coin, "interval": interval})

状态变化

属性 变化
self.symbols +AZTEC/USDC:USDC
WebSocket 订阅 +3 个 candle 订阅(5m, 1h, 4h)
数据库 AZTEC 在 DB 中无任何历史 K 线数据

第 2 阶段:状态变化 — WebSocket 消息涌入

AZTEC 的 5m K 线推送到达后,on_messagerealtime_kline_service_base.py:573-651)处理:

# L627: 只有 5m 周期触发分析
if kline['timeframe'] != '5m':
    return

# L638-639: 放入分析队列
analysis_task = {'symbol': 'AZTEC/USDC:USDC', 'timeframe': '5m', ...}
self.analysis_queue.put_nowait(analysis_task)

⚠️ 不只是 AZTEC — 所有已有币种的 5m K 线同样在持续入队
555 个 (symbol, timeframe) 组合持续积压到 analysis_queue(容量 30000)。


第 3 阶段:调用路径 — 分析 Worker 执行链

30 个 _analysis_worker 线程(realtime_kline_service_base.py:843-944)从队列消费:

task = self.analysis_queue.get(timeout=1.0)                    # 取任务
self._batch_upsert_with_retry([kline_data], ...)               # 阻塞点①: DB写入
self._analyze_and_alert(symbol, timeframe, kline_time)         # 执行分析

分析编排

_analyze_and_alert(L1288)→ _fetch_and_validate_price_data(L946):

price_data_cache = self._fetch_and_validate_price_data(symbol, timeframe)

数据校验与补充循环

_fetch_and_validate_price_data3 个周期 (5m/7d, 1h/30d, 4h/60d) 串行处理:

for tf, window in {'5m': 7天, '1h': 30天, '4h': 60天}.items():
    base_klines = self.kline_repo.query_range(...)     # 阻塞点②: DB查询
    alt_klines = self.kline_repo.query_range(...)      # 阻塞点③: DB查询

    # AZTEC 无历史数据 → need_refill = True
    if need_refill:
        self.data_filler.fill_missing_data(BTC, tf, ...)      # 阻塞核心④
        self.data_filler.fill_missing_data(AZTEC, tf, ...)     # 阻塞核心⑤

⚠️ 新币每个周期都缺数据,单次分析最多触发 6 次 fill_missing_data(3 周期 × base + alt)。

数据补充 → API 分页拉取

fill_missing_datakline_data_filler.py:229-268)→ fetch_candles_range_with_retryhyperliquid_candles.py:167-215):

all_rows = fetch_candles_range_with_retry(
    self._info, symbol, timeframe,
    since_ms, until_ms,
    api_limit=1500,           # 每页最多 1500 条
    request_interval=2.5,     # 页间等待 2.5 秒
)
# 分页循环
while current_since < until_ms:
    rows = fetch_candles(...)        # ← 持有全局锁!
    time.sleep(request_interval)     # 页间 2.5 秒等待

第 4 阶段:出错点 — 全局锁串行化

核心阻塞:fetch_candles() 全局互斥锁

hyperliquid_candles.py:29, 107-112

_candles_lock = threading.Lock()       # ← 模块级全局锁,单例

def fetch_candles(info, symbol, interval, start_time_ms, end_time_ms):
    min_interval = 2.5  # KLINE_FILLER_API_INTERVAL

    with _candles_lock:                                # ← 独占锁
        now = time.monotonic()
        elapsed = now - _last_candles_request_time
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)         # ← 锁内 sleep 最多 2.5s
        _last_candles_request_time = time.monotonic()

    # 锁释放后才发 HTTP 请求(30s 超时)
    raw = info.candles_snapshot(...)

锁争用时序

Worker-0:  [acquire ✅ → sleep(2.5s) → release] → HTTP(30s) → [acquire ⏳ 排29位...]
Worker-1:  [acquire ⏳ 等Worker-0释放...] → [acquire ✅ → sleep(2.5s) → release] → HTTP...
Worker-2:  [acquire ⏳ ...]
...
Worker-29: [acquire ⏳ 等 75s...]

定量估算

场景 计算 耗时
锁争用一轮 30 workers × 2.5s 限流间隔 75 秒
AZTEC 1h 补充 30 天 720 条 ÷ 1500/页 = 1 页 + 锁排队 ≈ 75 秒
AZTEC 5m 补充 7 天 2016 条 ÷ 1500/页 = 2 页 × 75 秒/轮 ≈ 150 秒
AZTEC 4h 补充 60 天 360 条 ÷ 1500/页 = 1 页 + 锁排队 ≈ 75 秒
多个新币同时补充 N × 3 周期 × 上述时间 几十分钟

第 5 阶段:根因总结

日志证据

12:31:28 - 开始补充K线数据 | AZTEC/USDC:USDC @ 1h | 时间范围: 30天
           ← 此后 68 分钟无任何分析完成日志 →
13:39:17 - 分析队列: 24073/30000 (80.2%) | 去重字典: 入队555 分析0

分析0 表示没有任何一个 worker 完成过一次分析

三个缺陷叠加

# 缺陷 位置 影响
1 全局 threading.Lock() hyperliquid_candles.py:29 30 个 worker 完全串行化为单线程吞吐
2 Lock 内 time.sleep(2.5s) hyperliquid_candles.py:111 每次持锁时间 = 限流等待时间,无法并行
3 fill_missing_data 无总超时 kline_data_filler.py:229 分页循环可运行任意长时间,无退出机制

一句话根因

fetch_candles() 的全局 threading.Lock() 将 30 个 worker 串行化为单 worker 吞吐。当新币触发无时间上限的 K 线补充时,每个 API 请求独占锁 2.5 秒,30 个 worker 排队一轮需要 75 秒,导致分析队列消费速度远低于入队速度,最终堆满卡死。


修复方案

1. 全局锁 → 信号量并发控制

- _candles_lock = threading.Lock()
+ _candles_semaphore = threading.Semaphore(3)  # 允许 3 个并发请求

  def fetch_candles(...):
-     with _candles_lock:
-         now = time.monotonic()
-         elapsed = now - _last_candles_request_time
-         if elapsed < min_interval:
-             time.sleep(min_interval - elapsed)
-         _last_candles_request_time = time.monotonic()
+     with _candles_semaphore:
+         raw = info.candles_snapshot(...)

2. 数据补充增加总超时

  def fill_missing_data(self, symbol, timeframe, start_time, end_time):
+     max_duration = KLINE_FILLER_MAX_DURATION_SECONDS  # 30秒
      all_rows = fetch_candles_range_with_retry(
-         ..., request_interval=2.5
+         ..., request_interval=2.5, max_duration=max_duration
      )

3. 补充失败降级处理

  # _fetch_and_validate_price_data 中
  if need_refill:
      alt_filled = self.data_filler.fill_missing_data(...)
+     # 补充后仍不足 → 快速失败,不反复阻塞 worker
+     if len(alt_klines) < 100:
+         self.logger.warning(f"数据补充后仍不足,跳过: {symbol} @ {tf}")
+         return None

4. 新增配置参数

参数 说明
KLINE_FILLER_MAX_DURATION_SECONDS 30 单次补充最大时长
KLINE_FILLER_MAX_CONCURRENT_REQUESTS 3 最大并发 API 请求

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG