启动数据自愈系统BUG分析2

数据自愈系统严重BUG分析报告

文档版本: 1.0
分析日期: 2026-02-15
严重性等级: 🔴 Critical (P0)
状态: 已确认,待修复


📋 执行摘要 | Executive Summary

BUG描述

Trading-in-websocket项目的数据自愈系统(Data Healing System)在启动时存在严重逻辑缺陷。当交易对的K线数据完全缺失时,自愈流程无法正常工作,导致修复失败,最终可能造成服务拒绝启动或降级启动

核心问题

概念混淆:代码混淆了两个不同的时间点概念:

  • zscore缺失时间点 (missing_times):需要计算zscore的目标时间点
  • K线缺失时间点 (kline_gaps):计算zscore所需的输入K线时间点

当K线完全缺失时,代码只补充了zscore缺失的时间点,而忽视了计算zscore所需的130条K线完整窗口

影响范围

  • 新币种首次接入(K线表为空)
  • 数据迁移后K线数据未同步
  • 数据库故障恢复场景

修复复杂度

- 仅需修改一个方法,新增一个辅助方法


🎯 问题详解 | Problem Details

1. BUG位置

项目 详情
文件 src/utils/data_healing/repair_executor.py
方法 RepairExecutor._find_kline_gaps()
行号 222-267行(具体BUG在第258行)
关键配置 TOTAL_WINDOW = 130(BETA_WINDOW=100 + ZSCORE_WINDOW=30)

2. BUG代码

# repair_executor.py:249-266
for sym in [symbol, base_symbol]:
    klines = self.kline_repo.query_range(
        symbol=sym,
        timeframe=timeframe,
        start_time=start_time,
        end_time=end_time,
        limit=10000
    )
    if not klines:
        # ⚠️ BUG: 当K线完全缺失时,只把missing_times加入缺口列表
        all_gaps.update(missing_times)  # ❌ 错误逻辑
        logger.warning(f"K 线完全缺失: {sym} @ {timeframe} [{start_time} ~ {end_time}]")
    else:
        _, sym_missing = self.kline_filler.validate_continuity(klines, timeframe)
        if sym_missing:
            all_gaps.update(sym_missing)
            logger.debug(f"K 线缺口: {sym} @ {timeframe} | {len(sym_missing)} 个")

return sorted(all_gaps)

3. 问题根源

3.1 时间范围计算

# orchestrator.py:244-246
lookback_minutes = TOTAL_WINDOW * self._interval_minutes(timeframe)
start_time = min(missing_times) - timedelta(minutes=lookback_minutes)
end_time = max(missing_times)

# 数值示例(timeframe='5m')
# TOTAL_WINDOW = 130
# lookback_minutes = 130 * 5 = 650分钟 = 10小时50分钟
# start_time = min(missing_times) - 650分钟
# end_time = max(missing_times)

3.2 窗口依赖关系

计算任意时刻 t 的zscore需要:

  • 输入: 130条K线窗口 [t-650分钟, t)
  • 组成:
    • BETA_WINDOW (100条): 计算稳定的回归参数
    • ZSCORE_WINDOW (30条): 计算敏感的均值回归指标
  • 失败条件: 窗口不足130条

3.3 为什么代码失败

输入场景:
  symbol = "PURR/USDC:USDC"  # 新币种,K线表为空
  missing_times = [t1, t2, t3]  # 只有3个缺失的zscore时间点

当前代码执行流程:
  1. _find_kline_gaps(missing_times=[t1, t2, t3])
     ↓
  2. 计算范围: start_time = t1 - 650分钟, end_time = t3
     ↓
  3. 查询K线: klines = query_range(symbol, start_time, end_time)
     结果: klines = []  (完全没有K线)
     ↓
  4. BUG触发: all_gaps.update(missing_times)
     结果: all_gaps = {t1, t2, t3}  # 只有3个时间点!
     ↓
  5. _fill_kline_gaps(kline_gaps={t1, t2, t3})
     补充K线: 只补充了3条K线
     ↓
  6. _repair_from_klines(missing_times=[t1, t2, t3])
     ↓
  7. 加载K线: alt_klines = query_range(start_time, end_time)
     结果: [t1_kline, t2_kline, t3_kline]  # 只有3条!
     ↓
  8. 计算t1的zscore:
     需要: [t1-650分钟, t1) 的130条K线
     实际: 只有0条K线 (t1是最早的)
     结果: _extract_kline_window返回None ❌
     ↓
  9. 计算t2的zscore:
     需要: [t2-650分钟, t2) 的130条K线
     实际: 只有1条K线 (只有t1)
     结果: _extract_kline_window返回None ❌
     ↓
  10. 计算t3的zscore:
      需要: [t3-650分钟, t3) 的130条K线
      实际: 只有2条K线 (t1, t2)
      结果: _extract_kline_window返回None ❌
      ↓
  11. 修复结果: repaired_count = 0
      原因: 所有zscore计算都失败了

最终后果:
  - 第一轮迭代: 无进展 (repaired_count = 0)
  - 继续迭代: BREAK (logger.warning "无进展,可能在冷却期或数据源不可用")
  - 质量评估: completeness_pct = 3/144 ≈ 2% < 60%
  - 启动状态: 'failed'
  - 服务启动: ❌ 拒绝启动

正确代码应该做什么:
  1. 识别K线完全缺失
  2. 生成完整的时间序列: [t1-650分钟, t1-645分钟, ..., t3]  (~130个)
  3. 补充这130条K线
  4. 重新加载: 成功获得130条K线
  5. 计算t1的zscore: 需要130条 → 有130条 ✅
  6. 计算t2的zscore: 需要130条 → 有130条 ✅
  7. 计算t3的zscore: 需要130条 → 有130条 ✅
  8. 修复结果: repaired_count = 3 ✅
  9. 质量评估: completeness_pct = 144/144 = 100% ✅
  10. 启动状态: 'ready' ✅

📊 完整因果链 | Complete Causal Chain

输入 (Input)

触发条件:
  - 项目启动时,RealtimeKlineService初始化
  - 数据库中某个symbol的zscore数据存在(analysis_results表有记录)
  - 但该symbol的K线数据完全缺失(klines表为空)

具体数据示例:
  symbol: "PURR/USDC:USDC"
  base_symbol: "HYPE/USDC:USDC"
  missing_times: [2026-02-15 10:00, 2026-02-15 10:30, 2026-02-15 11:00]
  klines表状态: 空(没有任何PURR/USDC:USDC的K线数据)
  required_count: 144  # 期望的历史记录数

状态变化 (State Changes)

第1阶段:初始化和检测

状态:
  - RealtimeKlineService启动
  - 查询数据库发现"PURR/USDC:USDC"有zscore数据
  - 触发数据自愈流程

组件创建:
  - DataHealingOrchestrator(symbol="PURR/USDC:USDC", base_symbol="HYPE/USDC:USDC")
  - ContinuityChecker: 检查连续性
  - RepairExecutor: 执行修复
  - QualityAssessor: 评估质量

第2阶段:发现缺失

_load_zscore_history(144):
  查询结果: 从analysis_results表查到部分记录(<144条)

check_continuity():
  检查结果:
    is_continuous: False
    missing_times: [t1, t2, t3]  # 发现缺失时间点
    completeness_pct: 3/144 = 2.1%

流程: 发现不连续,进入修复流程

第3阶段:K线检查 (BUG触发点)

repair(missing_times=[t1, t2, t3], symbol="PURR/USDC:USDC", timeframe='4h'):

  Step 1: _find_kline_gaps()
    计算范围:
      lookback_minutes = 130 * 60 = 7800分钟 (对于4h,interval=240)
      start_time = t1 - 7800分钟 = t1 - 5.4小时
      end_time = t3

    查询K线:
      for sym in ["PURR/USDC:USDC", "HYPE/USDC:USDC"]:
        klines = query_range(sym, '4h', start_time, end_time)

        sym="PURR/USDC:USDC":
          klines = []  # ❌ 完全没有K线
          ↓
          BUG触发: all_gaps.update([t1, t2, t3])  # 只添加了3个

        sym="HYPE/USDC:USDC":
          klines = [...]  # 有数据
          validate_continuity() 返回部分缺口
          all_gaps.update(sym_missing)

    返回: all_gaps = {t1, t2, t3, ...其他缺口}  # 不完整!

BUG结果:
  应该返回: ~130个时间点 (start_time到end_time的完整序列)
  实际返回: 仅3个 + 部分缺口
  后果: 补充的K线远不足130条

第4阶段:K线补充(不充分)

_fill_kline_gaps(kline_gaps={t1, t2, t3, ...}):
  for sym in ["PURR/USDC:USDC", "HYPE/USDC:USDC"]:
    filled = fill_missing_data_precise(symbol=sym, missing_timestamps=gaps)

  结果:
    PURR: 补充了约3条K线
    HYPE: 补充了部分缺口

  问题: 总K线数远不足130条!

第5阶段:修复失败(无法提取窗口)

_repair_from_klines(missing_times=[t1, t2, t3]):

  加载K线:
    alt_klines = query_range(PURR, start_time, end_time)
    base_klines = query_range(HYPE, start_time, end_time)
    结果: alt_klines只有~3-5条,远不足130条

  计算zscore:
    for missing_time in [t1, t2, t3]:
      window = _extract_kline_window(alt_df, missing_time, TOTAL_WINDOW=130)

      缺陷:
        mask = alt_df['time'] < missing_time
        subset = alt_df.loc[mask].tail(130)

        if len(subset) < 130:  # ❌ 这会一直为真
          return None

      结果: 所有zscore计算都返回None

  最终: repaired_count = 0 (所有计算都失败)

第6阶段:修复无进展

修复循环检查:
  if repaired_count == 0:
    logger.warning("⚠️ 第1轮修复无进展,无可用K线数据")
    BREAK

原因分析:
  代码认为: "无进展,可能在冷却期或数据源不可用"
  实际原因: 补充的K线数量不足,无法构建完整窗口 ❌

第7阶段:质量评估和启动决策

质量评估:
  final_records: 仍然只有原始的部分记录
  completeness_pct: 3/144 ≈ 2.1%

启动状态判定:
  if completeness_pct < 60%:
    return 'failed'  # ❌ 拒绝启动

  最终结果: HealingResult(status='failed', quality_grade='F')

服务启动决策:
  if result.status == 'failed':
    logger.error("数据质量不足,无法启动交易服务")
    sys.exit(1)  # ❌ 服务无法启动

调用路径 (Call Path)

程序启动 (main/cli entry point)
  ↓
RealtimeKlineService.__init__()
  [src/services/realtime_kline_service.py]
  ↓
RealtimeKlineServiceBase.__init__()
  [src/services/realtime_kline_service_base.py:1418-1496]
  ↓
_run_data_healing()
  # 查询有zscore数据的symbol
  symbols_with_data = {"PURR/USDC:USDC", "OTHER/USDC:USDC", ...}
  heal_symbols = symbols_with_data ∩ active_symbols - {base_symbol}
  ↓
  FOR symbol IN heal_symbols:
    ↓
    DataHealingOrchestrator(
      db_client, kline_repo,
      symbol="PURR/USDC:USDC",
      base_symbol="HYPE/USDC:USDC",
      repair_timeframe='4h'
    )
    ↓
    heal_and_prepare(required_count=144)
      [src/utils/data_healing/orchestrator.py:126-293]
      ↓
      WHILE iteration < 3:  # 最多3轮迭代
        ↓
        _load_zscore_history(144)
          # 查询analysis_results表
          # 结果: 部分记录(<144条)
        ↓
        ContinuityChecker.check_continuity(records, 144)
          # 检查连续性
          # 结果: is_continuous=False, missing_times=[t1,t2,t3]
        ↓
        RepairExecutor.repair(
          missing_times=[t1, t2, t3],
          symbol="PURR/USDC:USDC",
          base_symbol="HYPE/USDC:USDC",
          timeframe='4h'
        )
          [src/utils/data_healing/repair_executor.py:55-93]
          ↓
          Step 1: _find_kline_gaps(missing_times, symbol, base_symbol, '4h')
            [222-267行] ⚠️ BUG在这里
            ↓
            计算时间范围:
              lookback_minutes = 130 * 60 = 7800分钟
              start_time = t1 - 7800分钟
              end_time = t3
            ↓
            FOR sym IN [symbol, base_symbol]:
              klines = query_range(sym, '4h', start_time, end_time)

              IF not klines:  # K线完全缺失
                ❌ BUG: all_gaps.update(missing_times)  # 只添加[t1,t2,t3]
                # ✅ 应该: all_gaps.update(generate_timeline(start_time, end_time))
              ↓
            RETURN sorted(all_gaps)  # 返回不完整的缺口
          ↓
          Step 2: _fill_kline_gaps(kline_gaps, symbol, base_symbol, '4h')
            [269-294行]
            ↓
            KlineDataFiller.fill_missing_data_precise()
            # 只补充了不完整的K线
          ↓
          Step 3: _repair_from_klines(missing_times, symbol, base_symbol, '4h')
            [95-220行]
            ↓
            alt_klines = query_range(symbol, '4h', start_time, end_time)
            base_klines = query_range(base_symbol, '4h', start_time, end_time)
            # 只能获得补充的部分K线,远不足130条
            ↓
            FOR missing_time IN [t1, t2, t3]:
              ↓
              _extract_kline_window(alt_df, missing_time, TOTAL_WINDOW=130)
                # 需要130条窗口,实际只有2-3条
                # 返回: None ❌
              ↓
              zscore计算失败
            ↓
            repaired_count = 0  # 没有任何记录被修复
        ↓
        IF repaired_count == 0:
          logger.warning("⚠️ 第1轮修复无进展...")
          BREAK  # 退出迭代循环
      ↓
      质量评估:
        completeness = 3/144 ≈ 2.1%
        status = 'failed'  # < 60% 阈值
      ↓
      RETURN HealingResult(status='failed')
    ↓
    IF result.status == 'failed':
      logger.error("数据质量不足,拒绝启动")
      failed += 1
  ↓
  服务启动失败 ❌

出错点 (Bug Point)

具体代码 (repair_executor.py:258)

if not klines:
    all_gaps.update(missing_times)  # ❌ 错误

问题分析

方面 详情
混淆的概念 把"需要修复的zscore时间点"误用为"需要补充的K线时间点"
丢失的信息 计算zscore需要130条K线的窗口依赖关系
结果 补充的K线数量:3条 vs 需要:130条
失败原因 无法构建完整的窗口来计算zscore

根因 (Root Cause)

1. 概念混淆

zscore缺失时间点 (missing_times):
  - 定义: analysis_results表中zscore_4h为NULL的时间点
  - 数量: 通常少量(几个到几十个)
  - 用途: 标记需要修复的目标

K线缺失时间点 (kline_gaps):
  - 定义: klines表中缺失的K线数据的时间点
  - 数量: 可能很多(包含完整窗口)
  - 用途: 标记需要补充的输入数据

关键区别:
  zscore缺失 ≠ K线缺失

  例如:
    - zscore缺失时间点: [t1, t2, t3]  (3个)
    - 但计算这3个的zscore需要: [t1-650分钟, t1), [t2-650分钟, t2), ...
    - 总共需要约: 3 * 130 = 390条K线(有重叠,实际需要~130条)

2. 窗口依赖关系被忽视

计算 missing_time 的zscore流程:

  missing_time (目标时间点)
    ↓
  需要130条K线窗口: [missing_time - 650分钟, missing_time)
    ↓
  窗口组成:
    - BETA_WINDOW (100条): 用于计算稳定的回归参数
      公式: beta = cov(base_returns, alt_returns) / var(base_returns)
      需要足够数据确保参数稳定
    ↓
    - ZSCORE_WINDOW (30条): 用于计算敏感的均值回归指标
      公式: zscore = (price_spread - mean(price_spread)) / std(price_spread)
      需要最近30条数据计算动态的均值和标准差
  ↓
  如果窗口不完整 (< 130条):
    - 无法计算稳定的beta
    - 无法计算准确的zscore
    - 最终: _extract_kline_window返回None, zscore计算失败

3. 问题代码的逻辑缺陷

# 当前的错误理解
def _find_kline_gaps(missing_times, ...):
    if not klines:  # K线完全缺失
        all_gaps.update(missing_times)  # ❌ 错误理解
    # 假设: "我需要修复[t1,t2,t3],那就补充[t1,t2,t3]的K线"
    # 错误: 忽视了修复[t1,t2,t3]需要[t1-650, t3]范围的所有K线

# 正确的理解应该是
def _find_kline_gaps(missing_times, ...):
    # 计算需要的时间范围(包含窗口)
    start_time = min(missing_times) - 650分钟
    end_time = max(missing_times)

    if not klines:  # K线完全缺失
        # 需要补充整个时间范围的所有K线
        all_gaps.update(generate_complete_timeline(start_time, end_time))
    # 假设: "我需要修复[t1,t2,t3],但计算它们需要[t1-650, t3]的完整K线"
    # 正确: 识别真正需要补充的K线时间点

💥 影响范围 | Impact Analysis

影响场景

场景1: 新币种首次接入

触发时机:
  - 新币种(如PURR/USDC:USDC)刚开始交易
  - WebSocket接收到实时5m K线
  - 写入klines表(历史数据为空)
  - 实时分析生成zscore,写入analysis_results表
  - 服务重启或滚动升级

影响:
  - 数据自愈失败
  - 服务拒绝启动或降级启动
  - 新币种交易无法进行
  - 需要人工介入,手动补充数据

场景2: 数据迁移后

触发时机:
  - 数据库从旧系统迁移到新系统
  - K线表数据迁移不完整
  - 但analysis_results表有残留记录

影响:
  - 自愈流程失败
  - 影响多个symbol同时
  - 服务完全无法启动
  - 交易中断,业务损失

场景3: 数据库故障恢复

触发时机:
  - 数据库故障/备份恢复
  - K线表数据丢失
  - analysis_results表恢复但不完整

影响:
  - 自愈无法修复不一致的数据
  - 服务启动失败
  - 恢复时间延长

影响等级

影响维度 严重程度 描述
服务可用性 🔴 Critical 可能导致服务完全无法启动
数据准确性 🟠 High zscore数据不完整,分析结果不可信
交易损失 🟠 High 无法及时交易,错失市场机会
人工成本 🟠 High 需要人工介入恢复,技术支持成本高
系统稳定性 🔴 Critical 启动流程脆弱,易受数据状态影响

实际业务影响

轻度影响 (单个symbol):
  - 该symbol以"降级模式"启动
  - 交易策略运行但数据不可靠
  - 可能造成错误的交易信号

中度影响 (多个symbol):
  - 部分symbol启动失败
  - 服务以"预热模式"运行
  - 一部分交易功能不可用
  - 用户投诉增加

严重影响 (全局):
  - 所有symbol启动失败
  - 服务完全无法工作
  - 需要紧急停止交易
  - 重大业务中断
  - 用户资金安全风险

🔧 修复方案 | Fix Plan

修复策略

添加_generate_complete_timeline()方法,在K线完全缺失时生成完整的时间序列。

修改代码

1. 修改_find_kline_gaps()方法

文件: src/utils/data_healing/repair_executor.py (222-267行)

def _find_kline_gaps(
    self,
    missing_times: List[datetime],
    symbol: str,
    base_symbol: str,
    timeframe: str
) -> List[datetime]:
    """
    检查 repair_timeframe 的 K 线是否有缺口(复用 KlineDataFiller.validate_continuity)

    Args:
        missing_times: 缺失的时间点列表
        symbol: 交易对符号
        base_symbol: 基准符号
        timeframe: 时间周期(repair_timeframe)

    Returns:
        缺失的 K 线时间点列表(已排序)
    """
    if not missing_times:
        return []

    lookback_minutes = TOTAL_WINDOW * self._interval_minutes(timeframe)
    start_time = min(missing_times) - timedelta(minutes=lookback_minutes)
    end_time = max(missing_times)

    all_gaps = set()
    for sym in [symbol, base_symbol]:
        klines = self.kline_repo.query_range(
            symbol=sym,
            timeframe=timeframe,
            start_time=start_time,
            end_time=end_time,
            limit=10000
        )
        if not klines:
            # ✅ 修复: 完全没有 K 线时,生成整个时间范围的所有时间点
            complete_gaps = self._generate_complete_timeline(
                start_time, end_time, timeframe
            )
            all_gaps.update(complete_gaps)
            logger.warning(
                f"K 线完全缺失: {sym} @ {timeframe} | "
                f"需要补充 {len(complete_gaps)} 个时间点 "
                f"[{start_time} ~ {end_time}]"
            )
        else:
            _, sym_missing = self.kline_filler.validate_continuity(klines, timeframe)
            if sym_missing:
                all_gaps.update(sym_missing)
                logger.debug(f"K 线缺口: {sym} @ {timeframe} | {len(sym_missing)} 个")

    return sorted(all_gaps)

2. 新增_generate_complete_timeline()方法

repair_executor.py中添加(在_find_kline_gaps()方法之后):

def _generate_complete_timeline(
    self,
    start_time: datetime,
    end_time: datetime,
    timeframe: str
) -> List[datetime]:
    """
    生成完整的时间序列(用于K线完全缺失的场景)

    当某个symbol的K线数据完全缺失时,需要补充整个时间范围内的所有K线,
    才能提取足够的窗口数据来计算zscore。

    Args:
        start_time: 起始时间(包含)
        end_time: 结束时间(包含)
        timeframe: 时间周期('5m', '1h', '4h'等)

    Returns:
        时间点列表,按升序排列

    Example:
        >>> start = datetime(2026, 2, 15, 10, 0)
        >>> end = datetime(2026, 2, 15, 10, 30)
        >>> timeline = _generate_complete_timeline(start, end, '5m')
        >>> len(timeline)
        7  # [10:00, 10:05, 10:10, 10:15, 10:20, 10:25, 10:30]
    """
    interval_minutes = self._interval_minutes(timeframe)
    interval = timedelta(minutes=interval_minutes)

    timeline = []
    current = start_time

    while current <= end_time:
        timeline.append(current)
        current += interval

    logger.debug(
        f"生成完整时间序列: {len(timeline)} 个时间点 | "
        f"间隔: {interval_minutes}分钟 | "
        f"范围: [{start_time} ~ {end_time}]"
    )

    return timeline

修改影响分析

组件 修改 影响
_find_kline_gaps() 修改K线完全缺失的处理逻辑 返回完整的时间范围
_generate_complete_timeline() 新增辅助方法 生成时间序列
_fill_kline_gaps() 无改动 补充完整的K线
_repair_from_klines() 无改动 成功计算zscore

向后兼容性

完全兼容 - 修改仅影响K线完全缺失的分支,其他逻辑不变


✅ 验证方案 | Testing & Verification

单元测试

# test_data_healing_bug_fix.py

import pytest
from datetime import datetime, timedelta
from src.utils.data_healing.repair_executor import RepairExecutor


class TestFindKlineGapsFix:
    """验证K线完全缺失的修复"""

    @pytest.fixture
    def executor(self, db_client, kline_repo):
        """创建RepairExecutor实例"""
        return RepairExecutor(db_client, kline_repo)

    def test_generate_complete_timeline_5m(self, executor):
        """测试5分钟周期的时间序列生成"""
        start = datetime(2026, 2, 15, 10, 0)
        end = datetime(2026, 2, 15, 10, 30)

        timeline = executor._generate_complete_timeline(start, end, '5m')

        assert len(timeline) == 7  # 10:00, 10:05, 10:10, ..., 10:30
        assert timeline[0] == start
        assert timeline[-1] == end
        assert all(
            (timeline[i+1] - timeline[i]).total_seconds() == 5*60
            for i in range(len(timeline)-1)
        )

    def test_generate_complete_timeline_1h(self, executor):
        """测试1小时周期的时间序列生成"""
        start = datetime(2026, 2, 15, 10, 0)
        end = datetime(2026, 2, 15, 13, 0)

        timeline = executor._generate_complete_timeline(start, end, '1h')

        assert len(timeline) == 4  # 10, 11, 12, 13
        assert timeline[0] == start
        assert timeline[-1] == end

    def test_find_kline_gaps_with_empty_klines(self, executor, mock_kline_repo):
        """测试K线完全缺失时的缺口识别"""
        # 设置mock: query_range返回空列表(K线完全缺失)
        mock_kline_repo.query_range.return_value = []

        missing_times = [
            datetime(2026, 2, 15, 10, 0),
            datetime(2026, 2, 15, 10, 30),
            datetime(2026, 2, 15, 11, 0),
        ]

        gaps = executor._find_kline_gaps(
            missing_times,
            symbol="PURR/USDC:USDC",
            base_symbol="HYPE/USDC:USDC",
            timeframe='5m'
        )

        # 预期: 约130个时间点(TOTAL_WINDOW)
        # start_time = t1 - 650分钟, end_time = t3
        # 区间宽度 ≈ 650分钟 + 60分钟 = 710分钟
        # 时间点数 = 710 / 5 + 1 ≈ 143
        expected_min = 130
        assert len(gaps) >= expected_min, \
            f"缺口数量不足: {len(gaps)} < {expected_min}"

    def test_find_kline_gaps_partial_missing(self, executor, mock_kline_repo):
        """测试K线部分缺失时的正常处理"""
        # 设置mock: 返回部分K线(部分缺失)
        partial_klines = [...]  # 部分K线数据
        mock_kline_repo.query_range.return_value = partial_klines

        missing_times = [
            datetime(2026, 2, 15, 10, 0),
            datetime(2026, 2, 15, 11, 0),
        ]

        gaps = executor._find_kline_gaps(
            missing_times,
            symbol="PURR/USDC:USDC",
            base_symbol="HYPE/USDC:USDC",
            timeframe='5m'
        )

        # 应该调用validate_continuity处理部分缺失
        # 验证返回合理的缺口列表
        assert isinstance(gaps, list)
        assert all(isinstance(g, datetime) for g in gaps)

集成测试

def test_full_healing_with_empty_klines():
    """测试完整的自愈流程(K线完全缺失)"""

    # 准备测试数据
    db_client = create_test_db_client()
    kline_repo = create_test_kline_repo(empty=True)  # 没有K线数据

    # 在analysis_results表中创建部分记录
    setup_partial_analysis_results(
        symbol="PURR/USDC:USDC",
        count=3  # 只有3条记录(缺少141条)
    )

    # 执行自愈
    orchestrator = DataHealingOrchestrator(
        db_client=db_client,
        kline_repo=kline_repo,
        symbol="PURR/USDC:USDC",
        base_symbol="HYPE/USDC:USDC",
        repair_timeframe='4h'
    )

    result = orchestrator.heal_and_prepare(required_count=144)

    # 验证修复成功
    assert result.status == 'ready', f"预期启动状态为'ready',实际为'{result.status}'"
    assert result.quality.completeness_pct >= 95, \
        f"数据完整度应≥95%,实际{result.quality.completeness_pct:.1f}%"
    assert len(result.data) >= 144, \
        f"数据条数应≥144,实际{len(result.data)}"

性能测试

def test_generate_timeline_performance():
    """验证时间序列生成的性能"""
    executor = RepairExecutor(...)

    start = datetime(2026, 1, 1, 0, 0)
    end = datetime(2026, 2, 15, 23, 59)  # 约1.5个月

    # 测试5分钟周期(最多的时间点)
    import time
    t_start = time.time()
    timeline = executor._generate_complete_timeline(start, end, '5m')
    duration = time.time() - t_start

    # 应该在100ms内完成
    assert duration < 0.1, f"性能不达标: {duration:.3f}s > 0.1s"

    # 验证时间点数量
    expected = int((end - start).total_seconds() / 300) + 1
    assert len(timeline) == expected

回归测试清单

  • [ ] 原有的自愈流程不受影响
  • [ ] K线部分缺失的场景仍然正确处理
  • [ ] 质量评估和启动模式判决逻辑不变
  • [ ] 超时保护机制有效
  • [ ] 共享executor的复用逻辑正常
  • [ ] 数据库写入性能符合预期

📈 修复前后对比 | Before & After

修复前(BUG场景)

输入:
  symbol: PURR/USDC:USDC
  missing_times: [t1, t2, t3]
  K线数据: 空

执行流程:
  Step 1: _find_kline_gaps()
    ↓ all_gaps = {t1, t2, t3}  # 只有3个 ❌

  Step 2: _fill_kline_gaps()
    ↓ 补充3条K线 ❌

  Step 3: _repair_from_klines()
    ↓ 计算zscore
      需要: 130条窗口
      有: 0-2条
    ↓ repaired_count = 0 ❌

结果:
  status: 'failed' ❌
  completeness_pct: 2.1% ❌
  启动: 拒绝 ❌

日志:
  ⚠️ 第1轮修复无进展,无可用K线数据
  🏁 自愈结果: failed | F级 (2.1%) | 修复轮次: 1
  ❌ 服务拒绝启动

修复后(正确行为)

输入:
  symbol: PURR/USDC:USDC
  missing_times: [t1, t2, t3]
  K线数据: 空

执行流程:
  Step 1: _find_kline_gaps()
    ↓ all_gaps = {t1-650分钟, ..., t3}  # 约130个 ✅

  Step 2: _fill_kline_gaps()
    ↓ 补充130条K线 ✅

  Step 3: _repair_from_klines()
    ↓ 计算zscore
      需要: 130条窗口
      有: 130条
    ↓ repaired_count = 144 ✅  # 修复了144条记录

结果:
  status: 'ready' ✅
  completeness_pct: 100% ✅
  启动: 正常启动 ✅

日志:
  ✅ 第1轮修复完成: 修复 144 条记录
  ✅ 数据连续且完整(144/144),自愈完成(第1轮)
  🏁 自愈结果: ready | A级 (100.0%) | 修复轮次: 1
  ✅ 服务正常启动

🚀 实施计划 | Implementation Plan

第1步:代码修改

  1. 修改repair_executor.py中的_find_kline_gaps()方法
  2. 添加_generate_complete_timeline()辅助方法

预计时间: 30分钟

第2步:单元测试

  1. 编写时间序列生成的测试
  2. 编写K线完全缺失场景的测试
  3. 测试部分缺失场景的兼容性

预计时间: 1小时

第3步:集成测试

  1. 端到端的自愈流程测试
  2. 验证数据库一致性
  3. 性能基准测试

预计时间: 1小时

第4步:线上验证

  1. 灰度发布到测试环境
  2. 使用新币种进行验证
  3. 监控日志和性能指标

预计时间: 2-4小时

第5步:发布

  1. 全量发布到生产环境
  2. 监控系统健康状态
  3. 准备回滚方案

预计时间: 1小时

总计: 约5-7小时


📝 参考资源 | References

相关代码路径

  • src/utils/data_healing/repair_executor.py - 修复执行器
  • src/utils/data_healing/orchestrator.py - 自愈编排器
  • src/utils/data_healing/continuity_checker.py - 连续性检查器
  • src/services/realtime_kline_service_base.py - 服务基类

配置参数

  • TOTAL_WINDOW = 130 - K线窗口大小
  • BETA_WINDOW = 100 - 回归参数窗口
  • ZSCORE_WINDOW = 30 - zscore计算窗口
  • EXPECTED_INTERVAL_MINUTES = 5 - 预期间隔

相关问题

  • 数据库连接性
  • API速率限制
  • 数据一致性

📌 备注 | Notes

  1. 修复不会改变API接口:所有公开方法的签名保持不变
  2. 向后兼容:修改仅影响K线完全缺失的特殊分支
  3. 可测试性:新增方法_generate_complete_timeline()是纯函数,易于测试
  4. 监控建议:建议添加metrics来监控自愈成功率和修复记录数

📞 联系方式 | Contact

如有问题或反馈,请联系开发团队。

文档维护: Trading-in-websocket项目组
最后更新: 2026-02-15

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG