启动数据自愈系统BUG分析4

数据自愈系统严重 BUG — 第三轮因果链分析

分析日期: 2026-02-15
涉及文件: src/utils/data_healing/orchestrator.py, repair_executor.py, test_basic.py
背景: 第二轮重写修复了原始 3 大缺陷后,新代码引入了 4 个新 BUG


概览

orchestrator.py 头部注释声明"修复原有三大致命缺陷",但重写后的代码引入了新的严重问题。其中 BUG #1 为致命级别,会导致 _load_zscore_history() 的 SQL 查询在运行时直接失败,使自愈系统完全无法工作。

⚠️ CAUTION: BUG #1 导致 SQL 查询异常被静默吞掉,自愈系统认为"数据库中无历史数据",随后执行大量无效修复操作。


BUG #1(致命 🔴):SQL INTERVAL 参数化错误 — 查询永远失败

因果链

阶段 详情
输入 heal_and_prepare(required_count=144)_load_zscore_history(144)
状态变化 计算 needed_hours = (144 × 240) / 60 = 576,生成 time_ranges_hours = [576, 749, 1152]
调用路径 orchestrator.py:392db_client.execute_query(query, params=(self.symbol, hours))
出错点 orchestrator.py:388: SQL 中 INTERVAL '%s hours'
根因 psycopg3 参数占位符 %s 在 SQL 字符串字面量 '...' 内的行为不可预测

具体推演

# orchestrator.py 第 380-395 行
query = """
    SELECT DISTINCT ON (kline_time)
        kline_time, zscore_4h, analysis_time
    FROM analysis_results
    WHERE symbol = %s
      AND zscore_4h IS NOT NULL
      AND kline_time >= NOW() - INTERVAL '%s hours'
    ORDER BY kline_time DESC, analysis_time DESC
"""
rows = self.db_client.execute_query(query, params=(self.symbol, hours))

psycopg3 (cur.execute(query, params)) 处理 %s 占位符时:

情况 A: psycopg3 不识别引号内的 %s
  → SQL 发送: INTERVAL '%s hours'
  → PostgreSQL 报错: invalid input syntax for type interval: "%s hours"

情况 B: psycopg3 识别并替换 %s
  → 值被 psycopg3 加引号: INTERVAL ''576' hours'
  → PostgreSQL 报错: syntax error at or near "576"

无论哪种情况,查询都会抛出异常。

异常被静默吞掉的后果链

execute_query() 抛出 Exception
  ↓
orchestrator.py:435 捕获:
  except Exception as e:
      logger.warning(f"加载历史数据失败 - 未知错误: {e}")
      return []       # ← 返回空列表
  ↓
heal_and_prepare() 收到 records = []
  ↓
orchestrator.py:311: records 为空 → _determine_repair_targets() 走场景 1
  → _generate_expected_times(144) 生成 144 个时间点
  ↓
RepairExecutor.repair() 尝试修复这 144 个时间点
  → 但修复需要先加载 K 线 → K 线可能也不足 → 修复失败
  ↓
repaired_count = 0 → "修复无进展" → break
  ↓
最终: status = 'failed', 完整度 ≈ 0%

⚠️ 关键: logger.warning 级别的日志很容易被忽略,用户根本不知道 SQL 查询一直在失败。

修复方案

使用 make_interval() PostgreSQL 函数或安全的字符串格式化(hours 是内部计算的整数,不存在注入风险):

# 方案 A: PostgreSQL make_interval 函数(推荐)
query = """
    SELECT DISTINCT ON (kline_time)
        kline_time, zscore_4h, analysis_time
    FROM analysis_results
    WHERE symbol = %s
      AND zscore_4h IS NOT NULL
      AND kline_time >= NOW() - make_interval(hours => %s)
    ORDER BY kline_time DESC, analysis_time DESC
"""
rows = self.db_client.execute_query(query, params=(self.symbol, hours))

# 方案 B: f-string(hours 是代码内部计算的整数,安全)
query = f"""
    SELECT DISTINCT ON (kline_time)
        kline_time, zscore_4h, analysis_time
    FROM analysis_results
    WHERE symbol = %s
      AND zscore_4h IS NOT NULL
      AND kline_time >= NOW() - INTERVAL '{hours} hours'
    ORDER BY kline_time DESC, analysis_time DESC
"""
rows = self.db_client.execute_query(query, params=(self.symbol,))

BUG #2(严重 🟠):stale 变量引用 — 新鲜度修复目标逻辑混乱

因果链

阶段 详情
输入 heal_and_prepare() 迭代循环中,数据连续充足但不新鲜
状态变化 L166: missing_times = self._generate_stale_repair_targets(records) 赋值修复目标
调用路径 orchestrator.py:172_determine_repair_targets() → L175 条件判断
出错点 L175: if 'missing_times' in dir() and missing_times and not repair_targets:
根因 'missing_times' in dir() 不是变量存在性检查的正确方式 + 变量名语义冲突

具体推演

# orchestrator.py 第 135-176 行(简化)
while iteration < self.max_iterations:
    records = self._load_zscore_history(required_count)
    if records:
        is_continuous, missing_times, completeness = \      # ← L143: missing_times = checker 的结果
            self.checker.check_continuity(records, required_count)

        if is_continuous and len(records) >= required_count:
            freshness_ok, staleness_min = self._check_freshness(records)
            if freshness_ok:
                break
            else:
                missing_times = self._generate_stale_repair_targets(records)  # ← L166: 覆盖!
                if not missing_times:
                    break

    # 确定修复目标
    repair_targets = self._determine_repair_targets(records, required_count)  # ← L172

    # 合并新鲜度修复目标
    if 'missing_times' in dir() and missing_times and not repair_targets:     # ← L175: BUG
        repair_targets = missing_times

三重问题:

  1. 'missing_times' in dir() 不可靠dir() 在方法内部调用时返回局部作用域 + 外围作用域的所有名称,其行为不确定且与 Python 版本有关。正确方式是使用局部标志变量。

  2. 变量名 missing_times 语义冲突 — L143 的 missing_times 是连续性检查缺失的时间点,L166 的 missing_times 是新鲜度修复目标,两者语义完全不同,却复用同一个变量名。

  3. 重复调用 checker — L172 _determine_repair_targets() 内部(L316)会再次调用 checker.check_continuity(),重复计算。当数据连续且充足时,_determine_repair_targets() 返回 [],然后 L175 才用 missing_times逻辑最终可能正确,但非常脆弱。

危险场景

第 1 轮迭代:
  records 有数据但不足 → is_continuous=False
  missing_times = checker 返回的缺失时间点(L143)
  repair_targets = missing_times(L176 不执行,因为 repair_targets 有值)
  → 正常

第 2 轮迭代:
  records 连续且充足 → is_continuous=True, freshness_ok=False
  missing_times = _generate_stale_repair_targets()(L166 覆盖)
  repair_targets = _determine_repair_targets() → [](因为数据连续充足)
  L175: 'missing_times' in dir() → True(上轮迭代已存在)
  repair_targets = missing_times(stale targets)
  → 勉强正确,但如果 L166 没执行(freshness_ok=True 直接 break),
    missing_times 仍是第 1 轮 checker 的结果 → 错误!

修复方案

# 使用显式标志变量
stale_repair_targets = []       # ← 初始化在循环外

while iteration < self.max_iterations:
    records = self._load_zscore_history(required_count)
    stale_repair_targets = []   # ← 每轮重置

    if records:
        is_continuous, missing_times, completeness = \
            self.checker.check_continuity(records, required_count)

        if is_continuous and len(records) >= required_count:
            freshness_ok, staleness_min = self._check_freshness(records)
            if freshness_ok:
                break
            else:
                stale_repair_targets = self._generate_stale_repair_targets(records)
                if not stale_repair_targets:
                    break

    repair_targets = self._determine_repair_targets(records, required_count)

    if stale_repair_targets and not repair_targets:
        repair_targets = stale_repair_targets

BUG #3(中等 🟡):_extract_kline_window 窗口边界可能偏移一个周期

因果链

阶段 详情
输入 _repair_from_klines(missing_times) 遍历每个 missing_time
状态变化 为每个 missing_time 提取 TOTAL_WINDOW=130 条 K 线窗口
调用路径 repair_executor.py:142-143_extract_kline_window(df, missing_time, 130)
出错点 repair_executor.py:293: mask = df['time'] < end_time — 严格小于
根因 排除了 missing_time 本身的 K 线,可能导致 zscore 计算偏移

具体推演

# repair_executor.py 第 285-300 行
def _extract_kline_window(self, df, end_time, window_size):
    mask = df['time'] < end_time    # ← 严格小于,排除了 end_time 本身
    subset = df.loc[mask].tail(window_size)
    if len(subset) < window_size:
        return None
    return subset.to_dict('records')
对于 missing_time = T:
  K 线时间: ..., T-8h, T-4h, T  ← T 的 K 线被排除

zscore 计算:
  使用 [T-8h, T-4h] 的数据计算 → 比应有结果滞后一个周期
  如果业务意图是"基于 T 及之前的数据计算",则应使用 <= T

注意: 这取决于业务语义。如果 zscore 的约定是"使用严格历史数据(不含目标时间点自身)",则 < 是正确的。需要与实时分析路径(realtime_kline_service_base.py 中的 analyze_multi_period())对比确认一致性。

修复方案(需确认业务语义后决定)

# 如果业务需要包含当前时间点:
mask = df['time'] <= end_time

BUG #4(低 🟢):test_basic.py 测试代码与当前实现不兼容

错误清单

行号 错误 影响
28 ContinuityChecker() 无参构造 — 但当前实现必须传入 interval_minutes 测试直接 TypeError 崩溃
180 assert timeframe_to_minutes('unknown') == 5 — 但当前实现会 raise ValueError 断言错误

错误 1 详细分析

# test_basic.py 第 28 行
checker = ContinuityChecker()   # ❌ 缺少必需参数

# 当前 continuity_checker.py 第 20 行
class ContinuityChecker:
    def __init__(self, interval_minutes: int, tolerance_ratio: float = TOLERANCE_RATIO):
        if interval_minutes <= 0:
            raise ValueError(...)

原因: 测试代码是为旧版本(EXPECTED_INTERVAL_MINUTES 硬编码)编写的,重写后 interval_minutes 变为必需参数,但测试未同步更新。

错误 2 详细分析

# test_basic.py 第 180 行
assert timeframe_to_minutes('unknown') == 5   # ❌ 旧行为:fallback 到默认值

# 当前 config.py 第 29-36 行
def timeframe_to_minutes(timeframe: str) -> int:
    if timeframe not in TIMEFRAME_MINUTES:
        raise ValueError(f"不支持的 timeframe: {timeframe!r}...")   # ← 新行为:抛异常
    return TIMEFRAME_MINUTES[timeframe]

原因: 重写后 timeframe_to_minutes() 改为严格模式(不识别的 timeframe 直接报错),测试仍期望旧的 fallback 行为。

修复方案

# 修复 test_basic.py

# 第 28 行: 传入正确的 interval_minutes
checker = ContinuityChecker(interval_minutes=5)   # 5m 周期

# 第 180 行: 改为测试异常抛出
try:
    timeframe_to_minutes('unknown')
    assert False, "应该抛出 ValueError"
except ValueError:
    pass  # 预期行为

BUG 联合效应

程序启动
  ↓
_run_data_healing()
  ↓
DataHealingOrchestrator.heal_and_prepare(required_count=144)
  ↓
_load_zscore_history(144)
  ├── needed_hours = 576
  └── SQL: INTERVAL '%s hours'                ← BUG #1 致命
        ↓
      execute_query 抛出异常
        ↓
      except: return []                        ← 异常被静默吞掉
        ↓
      records = []
        ↓
      _determine_repair_targets([], 144)
        ↓
      _generate_expected_times(144)            ← 生成 144 个时间点
        ↓
      RepairExecutor.repair(144 个目标)
        ├── _find_kline_gaps()                 ← 可能成功
        ├── _fill_kline_gaps()                 ← 可能成功
        └── _repair_from_klines()
              ├── _extract_kline_window()      ← BUG #3(边界偏移)
              └── 依赖 K 线数据是否充足
        ↓
      可能部分成功,也可能 repaired_count=0
        ↓
      BUG #2 的变量混乱可能导致
      后续迭代的修复目标不正确
        ↓
      最终状态: 'failed' 或 'degraded'

优先级排序

优先级 BUG 严重性 修复复杂度
P0 BUG #1: SQL INTERVAL 参数化 致命 🔴 低(改一行)
P1 BUG #2: stale 变量引用 严重 🟠 低(重命名变量)
P2 BUG #3: 窗口边界 中等 🟡 低(需确认语义)
P3 BUG #4: 测试不兼容 低 🟢 低(更新测试)

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG