数据自愈模块提速

数据自愈并发优化设计方案

一、现状分析

当前流程

_run_data_healingrealtime_kline_service_base.py:1807):

for (symbol, base_symbol) in heal_pairs:  # ~50+ 配对,串行
    DataHealingOrchestrator(...)           # → RepairExecutor → KlineDataFiller
    │                                      #   Info(MAINNET_API_URL) HTTP 握手 ~0.85s
    │                                      #   第 2-50 个立刻被 shared_executor 覆盖,全部浪费!
    _load_zscore_history()                # DB 查询 ×1(0.08-0.56s)
    _get_db_now()                         # SELECT NOW() ×1
    _diagnose()                           # 内存计算(~0ms)
    check_continuity()                    # ← 第一次连续性检查
    _final_assessment()                   # ← 第二次连续性检查(冗余)
    QualityAssessor.assess()              # 质量评估

瓶颈定量(日志实测,50 对)

每对的耗时结构(以 IO/BTC 对为例):

KlineDataFiller 初始化完成  ← Info(MAINNET_API_URL) HTTP 握手 ~0.85s  ← 49 次是废弃的!
自愈总耗时: 0.14s           ← DB 查询 + 诊断 + 评估
─────────────────────────────────────────────────────────────────────
每对实际耗时: ~1.0s
操作 单次耗时 次数 总计 占比
KlineDataFiller init(HTTP 握手,49次废弃) ~0.85s 50 ~43s 78%
_load_zscore_history DB 查询 ~0.15s avg ~50 ~7.5s 14%
ONDO/BTC DB 慢查询(特例) 4.44s 1 4.44s 8%
_get_db_now SELECT NOW() ~0.01s ~50 ~0.5s ~1%
_final_assessment 冗余连续性检查 ~0.001s ~50 ~0.05s ~0%
总计 ~55s

根因DataHealingOrchestrator.__init__ 无条件调用 RepairExecutor.__init__
KlineDataFiller.__init__Info(MAINNET_API_URL, skip_ws=True, timeout=30)
发起一次 HTTP 握手拉取资产元数据(kline_data_filler.py:82)。
服务层代码虽然通过 shared_executor 复用了第一个实例,但第 2-50 个 KlineDataFiller
已经完成 HTTP 握手后才被覆盖,浪费约 43 秒

基础设施

  • 连接池:psycopg 3.x ConnectionPoolmin_size=2, max_size=10,支持并发 DB 查询
  • 交易所 APIKlineDataFiller 内置 10 分钟冷却机制
  • 超时保护:全局 healing_timeout(300s),单配对 HEALING_PER_PAIR_TIMEOUT=60s

二、优化方案 A:批量 DB 查询

目标

50+ 次 DB 往返 → 1 次

核心 SQL

将原来的单配对查询:

-- 原始:每个配对执行一次
SELECT kline_time, zscore_4h, analysis_time
FROM (
    SELECT DISTINCT ON (kline_time)
        kline_time, zscore_4h, analysis_time
    FROM analysis_results
    WHERE symbol = %s AND base_symbol = %s
      AND zscore_4h IS NOT NULL
      AND kline_time >= NOW() - INTERVAL 'X days'
    ORDER BY kline_time DESC, analysis_time DESC
    LIMIT %s
) sub
ORDER BY kline_time ASC

改为批量查询:

-- 优化:所有配对一次查询
SELECT symbol, base_symbol, kline_time, zscore_4h, analysis_time
FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY symbol, base_symbol
        ORDER BY kline_time DESC
    ) AS rn
    FROM (
        SELECT DISTINCT ON (symbol, base_symbol, kline_time)
            symbol, base_symbol, kline_time, zscore_4h, analysis_time
        FROM analysis_results
        WHERE (symbol, base_symbol) IN (VALUES (%s,%s), (%s,%s), ...)
          AND zscore_4h IS NOT NULL
          AND kline_time >= NOW() - INTERVAL '{boundary_days} days'
        ORDER BY symbol, base_symbol, kline_time DESC, analysis_time DESC
    ) deduped
) ranked
WHERE rn <= %s
ORDER BY symbol, base_symbol, kline_time ASC

SQL 逻辑说明

  1. 内层 DISTINCT ON:每个 (symbol, base_symbol, kline_time) 只保留最新 analysis_time 的记录
  2. ROW_NUMBER() OVER (PARTITION BY ... ORDER BY kline_time DESC):给每个配对的记录按时间倒序编号
  3. WHERE rn <= required_count:每个配对只取最近 N 条
  4. 外层 ORDER BY:按时间正序返回

内存分组

from collections import defaultdict

grouped: dict[tuple[str, str], list[dict]] = defaultdict(list)
for row in rows:
    grouped[(row['symbol'], row['base_symbol'])].append(row)

代码改动

  1. orchestrator.py — 新增 batch_load_zscore_history() 静态方法
  2. realtime_kline_service_base.py_run_data_healing() 调用批量加载替代循环内单独加载

预期收益

DB 往返从 ~50 次降到 1 次,数据加载阶段 ~7.5s → ~0.3s


三、优化方案 B:并发修复

前提

批量查询后,数据已在内存中。诊断是纯 CPU 操作(~0ms/对),不需要并发。只有不健康的配对需要调用 RepairExecutor.repair()(涉及 DB 写入 + 交易所 API),才需要并发。

并发约束

约束 说明
DB 连接池 max_size=10 并发 DB 操作不能超过 10
KlineDataFiller 冷却机制 10 分钟冷却期,同 symbol 不会重复请求
交易所 API 频率限制 需控制并发请求数
RepairExecutor._fill_kline_gaps_parallel 内部已用 ThreadPoolExecutor(max_workers=2)

设计

# 不健康的配对才需要修复
unhealthy_pairs = [(sym, base, records, diagnosis) for ...]

if unhealthy_pairs:
    with ThreadPoolExecutor(max_workers=min(5, len(unhealthy_pairs))) as pool:
        futures = {
            pool.submit(_heal_single_pair, pair): pair
            for pair in unhealthy_pairs
        }
        for future in as_completed(futures):
            result = future.result()
            ...

并发度选择

max_workers=5(保守值):

  • DB 池 max_size=10,每个 repair 最多占 2 连接(双 symbol 并行 K 线补充)
  • 5 × 2 = 10,刚好打满连接池
  • 不会与主服务其他 DB 操作竞争过多

代码改动

  1. realtime_kline_service_base.py_run_data_healing() 修复阶段改为并发
  2. orchestrator.pyheal_and_prepare() 支持接收预加载数据,避免重复查询

四、附加优化

4a. db_now 共享

现状:每个配对调用 _get_db_now()SELECT NOW()(50+ 次)

优化:在批量查询中一并获取 NOW(),传给所有配对

# 批量查询的同一事务中获取 NOW()
cur.execute("SELECT NOW() AS db_now")
db_now = cur.fetchone()['db_now']

4b. 健康配对跳过冗余评估

现状_final_assessment() 总是重新调用 check_continuity(),即使诊断已判定健康

优化:诊断结果为 is_healthy=True 时,直接从诊断结果构建 HealingResult,跳过二次检查

# 在 heal_and_prepare() 中
if diagnosis.is_healthy:
    # 直接构建结果,跳过 _final_assessment 的重复 check_continuity
    quality = self.assessor.assess(records, required_count, True, 0)
    return HealingResult(
        status=self._determine_status(quality),
        data=self._extract_zscore_values(records),
        quality=quality,
        iterations_used=iteration,
    )

4c. 日志精简

现状:每个健康配对输出 5 行日志

  1. "连续性检查: 数据连续"(诊断阶段)
  2. "数据健康(第1轮)"(heal_and_prepare)
  3. "连续性检查: 数据连续"(_final_assessment 冗余)
  4. "质量评估: A级"(_final_assessment)
  5. "自愈结果: ready"(heal_and_prepare)

优化:健康配对只输出 1 行摘要日志,DEBUG 级别保留详细日志


五、重构后的流程

_run_data_healing():
│
├── 1. 查询 DB 中有数据的配对              # 已有逻辑,不变
├── 2. 构建 heal_pairs 列表               # 已有逻辑,不变
│
├── 3.【新】批量加载 + db_now              # 1 次 DB 查询,替代 50+ 次
│      batch_load_zscore_history()
│      → grouped: {(sym, base): [records...]}
│      → db_now: datetime
│
├── 4.【新】批量诊断(内存操作,瞬间)
│      for pair in heal_pairs:
│          records = grouped[pair]
│          diagnosis = diagnose(records, required_count, db_now)
│          if healthy → 直接记录结果
│          else → 加入 unhealthy_pairs
│
├── 5.【新】并发修复不健康配对
│      ThreadPoolExecutor(max_workers=5):
│          for pair in unhealthy_pairs:
│              RepairExecutor.repair(...)
│              → 重新加载 + 评估
│
└── 6. 汇总日志
       healed=X, failed=Y, healthy=Z (skipped)

六、预期性能对比

新架构在批量诊断阶段不再创建 DataHealingOrchestrator 对象,KlineDataFiller 只在真正
需要修复时才初始化(1 次),因此同时消除了 DB 查询和 HTTP 握手两大瓶颈。

指标 优化前 优化后 提升
KlineDataFiller init 次数(HTTP 握手) 50 次(49 次废弃) 0-1 次 ~50x
KlineDataFiller init 总耗时 ~43s ~0s(健康)/ ~1s(有修复) 消除
DB 查询次数 ~100(加载50 + NOW50) 2(批量加载1 + NOW1) 50x
DB 查询总耗时 ~7.5s ~0.3s 25x
全健康场景总耗时 ~55s ~0.4s ~140x
有 5 对需修复 ~55s ~2s ~28x
有 15 对需修复 ~55s ~5s ~11x
日志行数 ~250 行 ~10 行 精简

全健康场景耗时拆解(优化后):

1 次批量 DB 查询:       ~0.3s
50 对内存诊断(纯计算): ~0.05s
KlineDataFiller init:  0s(无需修复,不创建)
─────────────────────────────
总计:                  ~0.35s

七、改动文件清单

文件 改动类型 说明
orchestrator.py 新增方法 + 修改 添加 batch_load_zscore_history()heal_and_prepare() 支持接收预加载数据
realtime_kline_service_base.py 重构方法 _run_data_healing() 改为批量加载 + 并发修复

不变的文件repair_executor.pycontinuity_checker.pyquality_assessor.pyconfig.py__init__.py


八、风险控制

风险 缓解措施
DB 连接池耗尽 并发修复 max_workers=5,远小于连接池 max_size=10
交易所 API 限流 KlineDataFiller 已有 10 分钟冷却机制,不受影响
大批量 SQL 性能 50 个 VALUES 对 PostgreSQL 不是问题,有索引支持
向后兼容 heal_and_prepare() 保留原有签名,新增可选参数 preloaded_recordsdb_now
超时保护 全局 healing_timeout(300s) 保持不变
并发异常传播 每个 future 独立 try/except,单个配对失败不影响其他配对

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG