启动数据自愈系统BUG分析4
数据自愈系统严重 BUG — 第三轮因果链分析
分析日期: 2026-02-15
涉及文件:src/utils/data_healing/orchestrator.py,repair_executor.py,test_basic.py
背景: 第二轮重写修复了原始 3 大缺陷后,新代码引入了 4 个新 BUG
概览
orchestrator.py 头部注释声明"修复原有三大致命缺陷",但重写后的代码引入了新的严重问题。其中 BUG #1 为致命级别,会导致 _load_zscore_history() 的 SQL 查询在运行时直接失败,使自愈系统完全无法工作。
⚠️ CAUTION: BUG #1 导致 SQL 查询异常被静默吞掉,自愈系统认为"数据库中无历史数据",随后执行大量无效修复操作。
BUG #1(致命 🔴):SQL INTERVAL 参数化错误 — 查询永远失败
因果链
| 阶段 | 详情 |
|---|---|
| 输入 | heal_and_prepare(required_count=144) → _load_zscore_history(144) |
| 状态变化 | 计算 needed_hours = (144 × 240) / 60 = 576,生成 time_ranges_hours = [576, 749, 1152] |
| 调用路径 | orchestrator.py:392 → db_client.execute_query(query, params=(self.symbol, hours)) |
| 出错点 | orchestrator.py:388: SQL 中 INTERVAL '%s hours' |
| 根因 | psycopg3 参数占位符 %s 在 SQL 字符串字面量 '...' 内的行为不可预测 |
具体推演
# orchestrator.py 第 380-395 行
query = """
SELECT DISTINCT ON (kline_time)
kline_time, zscore_4h, analysis_time
FROM analysis_results
WHERE symbol = %s
AND zscore_4h IS NOT NULL
AND kline_time >= NOW() - INTERVAL '%s hours'
ORDER BY kline_time DESC, analysis_time DESC
"""
rows = self.db_client.execute_query(query, params=(self.symbol, hours))
psycopg3 (cur.execute(query, params)) 处理 %s 占位符时:
情况 A: psycopg3 不识别引号内的 %s
→ SQL 发送: INTERVAL '%s hours'
→ PostgreSQL 报错: invalid input syntax for type interval: "%s hours"
情况 B: psycopg3 识别并替换 %s
→ 值被 psycopg3 加引号: INTERVAL ''576' hours'
→ PostgreSQL 报错: syntax error at or near "576"
无论哪种情况,查询都会抛出异常。
异常被静默吞掉的后果链
execute_query() 抛出 Exception
↓
orchestrator.py:435 捕获:
except Exception as e:
logger.warning(f"加载历史数据失败 - 未知错误: {e}")
return [] # ← 返回空列表
↓
heal_and_prepare() 收到 records = []
↓
orchestrator.py:311: records 为空 → _determine_repair_targets() 走场景 1
→ _generate_expected_times(144) 生成 144 个时间点
↓
RepairExecutor.repair() 尝试修复这 144 个时间点
→ 但修复需要先加载 K 线 → K 线可能也不足 → 修复失败
↓
repaired_count = 0 → "修复无进展" → break
↓
最终: status = 'failed', 完整度 ≈ 0%
⚠️ 关键:
logger.warning级别的日志很容易被忽略,用户根本不知道 SQL 查询一直在失败。
修复方案
使用 make_interval() PostgreSQL 函数或安全的字符串格式化(hours 是内部计算的整数,不存在注入风险):
# 方案 A: PostgreSQL make_interval 函数(推荐)
query = """
SELECT DISTINCT ON (kline_time)
kline_time, zscore_4h, analysis_time
FROM analysis_results
WHERE symbol = %s
AND zscore_4h IS NOT NULL
AND kline_time >= NOW() - make_interval(hours => %s)
ORDER BY kline_time DESC, analysis_time DESC
"""
rows = self.db_client.execute_query(query, params=(self.symbol, hours))
# 方案 B: f-string(hours 是代码内部计算的整数,安全)
query = f"""
SELECT DISTINCT ON (kline_time)
kline_time, zscore_4h, analysis_time
FROM analysis_results
WHERE symbol = %s
AND zscore_4h IS NOT NULL
AND kline_time >= NOW() - INTERVAL '{hours} hours'
ORDER BY kline_time DESC, analysis_time DESC
"""
rows = self.db_client.execute_query(query, params=(self.symbol,))
BUG #2(严重 🟠):stale 变量引用 — 新鲜度修复目标逻辑混乱
因果链
| 阶段 | 详情 |
|---|---|
| 输入 | heal_and_prepare() 迭代循环中,数据连续充足但不新鲜 |
| 状态变化 | L166: missing_times = self._generate_stale_repair_targets(records) 赋值修复目标 |
| 调用路径 | orchestrator.py:172 → _determine_repair_targets() → L175 条件判断 |
| 出错点 | L175: if 'missing_times' in dir() and missing_times and not repair_targets: |
| 根因 | 'missing_times' in dir() 不是变量存在性检查的正确方式 + 变量名语义冲突 |
具体推演
# orchestrator.py 第 135-176 行(简化)
while iteration < self.max_iterations:
records = self._load_zscore_history(required_count)
if records:
is_continuous, missing_times, completeness = \ # ← L143: missing_times = checker 的结果
self.checker.check_continuity(records, required_count)
if is_continuous and len(records) >= required_count:
freshness_ok, staleness_min = self._check_freshness(records)
if freshness_ok:
break
else:
missing_times = self._generate_stale_repair_targets(records) # ← L166: 覆盖!
if not missing_times:
break
# 确定修复目标
repair_targets = self._determine_repair_targets(records, required_count) # ← L172
# 合并新鲜度修复目标
if 'missing_times' in dir() and missing_times and not repair_targets: # ← L175: BUG
repair_targets = missing_times
三重问题:
-
'missing_times' in dir()不可靠 —dir()在方法内部调用时返回局部作用域 + 外围作用域的所有名称,其行为不确定且与 Python 版本有关。正确方式是使用局部标志变量。 -
变量名
missing_times语义冲突 — L143 的missing_times是连续性检查缺失的时间点,L166 的missing_times是新鲜度修复目标,两者语义完全不同,却复用同一个变量名。 -
重复调用 checker — L172
_determine_repair_targets()内部(L316)会再次调用checker.check_continuity(),重复计算。当数据连续且充足时,_determine_repair_targets()返回[],然后 L175 才用missing_times。逻辑最终可能正确,但非常脆弱。
危险场景
第 1 轮迭代:
records 有数据但不足 → is_continuous=False
missing_times = checker 返回的缺失时间点(L143)
repair_targets = missing_times(L176 不执行,因为 repair_targets 有值)
→ 正常
第 2 轮迭代:
records 连续且充足 → is_continuous=True, freshness_ok=False
missing_times = _generate_stale_repair_targets()(L166 覆盖)
repair_targets = _determine_repair_targets() → [](因为数据连续充足)
L175: 'missing_times' in dir() → True(上轮迭代已存在)
repair_targets = missing_times(stale targets)
→ 勉强正确,但如果 L166 没执行(freshness_ok=True 直接 break),
missing_times 仍是第 1 轮 checker 的结果 → 错误!
修复方案
# 使用显式标志变量
stale_repair_targets = [] # ← 初始化在循环外
while iteration < self.max_iterations:
records = self._load_zscore_history(required_count)
stale_repair_targets = [] # ← 每轮重置
if records:
is_continuous, missing_times, completeness = \
self.checker.check_continuity(records, required_count)
if is_continuous and len(records) >= required_count:
freshness_ok, staleness_min = self._check_freshness(records)
if freshness_ok:
break
else:
stale_repair_targets = self._generate_stale_repair_targets(records)
if not stale_repair_targets:
break
repair_targets = self._determine_repair_targets(records, required_count)
if stale_repair_targets and not repair_targets:
repair_targets = stale_repair_targets
BUG #3(中等 🟡):_extract_kline_window 窗口边界可能偏移一个周期
因果链
| 阶段 | 详情 |
|---|---|
| 输入 | _repair_from_klines(missing_times) 遍历每个 missing_time |
| 状态变化 | 为每个 missing_time 提取 TOTAL_WINDOW=130 条 K 线窗口 |
| 调用路径 | repair_executor.py:142-143 → _extract_kline_window(df, missing_time, 130) |
| 出错点 | repair_executor.py:293: mask = df['time'] < end_time — 严格小于 |
| 根因 | 排除了 missing_time 本身的 K 线,可能导致 zscore 计算偏移 |
具体推演
# repair_executor.py 第 285-300 行
def _extract_kline_window(self, df, end_time, window_size):
mask = df['time'] < end_time # ← 严格小于,排除了 end_time 本身
subset = df.loc[mask].tail(window_size)
if len(subset) < window_size:
return None
return subset.to_dict('records')
对于 missing_time = T:
K 线时间: ..., T-8h, T-4h, T ← T 的 K 线被排除
zscore 计算:
使用 [T-8h, T-4h] 的数据计算 → 比应有结果滞后一个周期
如果业务意图是"基于 T 及之前的数据计算",则应使用 <= T
注意: 这取决于业务语义。如果 zscore 的约定是"使用严格历史数据(不含目标时间点自身)",则
<是正确的。需要与实时分析路径(realtime_kline_service_base.py中的analyze_multi_period())对比确认一致性。
修复方案(需确认业务语义后决定)
# 如果业务需要包含当前时间点:
mask = df['time'] <= end_time
BUG #4(低 🟢):test_basic.py 测试代码与当前实现不兼容
错误清单
| 行号 | 错误 | 影响 |
|---|---|---|
| 28 | ContinuityChecker() 无参构造 — 但当前实现必须传入 interval_minutes |
测试直接 TypeError 崩溃 |
| 180 | assert timeframe_to_minutes('unknown') == 5 — 但当前实现会 raise ValueError |
断言错误 |
错误 1 详细分析
# test_basic.py 第 28 行
checker = ContinuityChecker() # ❌ 缺少必需参数
# 当前 continuity_checker.py 第 20 行
class ContinuityChecker:
def __init__(self, interval_minutes: int, tolerance_ratio: float = TOLERANCE_RATIO):
if interval_minutes <= 0:
raise ValueError(...)
原因: 测试代码是为旧版本(EXPECTED_INTERVAL_MINUTES 硬编码)编写的,重写后 interval_minutes 变为必需参数,但测试未同步更新。
错误 2 详细分析
# test_basic.py 第 180 行
assert timeframe_to_minutes('unknown') == 5 # ❌ 旧行为:fallback 到默认值
# 当前 config.py 第 29-36 行
def timeframe_to_minutes(timeframe: str) -> int:
if timeframe not in TIMEFRAME_MINUTES:
raise ValueError(f"不支持的 timeframe: {timeframe!r}...") # ← 新行为:抛异常
return TIMEFRAME_MINUTES[timeframe]
原因: 重写后 timeframe_to_minutes() 改为严格模式(不识别的 timeframe 直接报错),测试仍期望旧的 fallback 行为。
修复方案
# 修复 test_basic.py
# 第 28 行: 传入正确的 interval_minutes
checker = ContinuityChecker(interval_minutes=5) # 5m 周期
# 第 180 行: 改为测试异常抛出
try:
timeframe_to_minutes('unknown')
assert False, "应该抛出 ValueError"
except ValueError:
pass # 预期行为
BUG 联合效应
程序启动
↓
_run_data_healing()
↓
DataHealingOrchestrator.heal_and_prepare(required_count=144)
↓
_load_zscore_history(144)
├── needed_hours = 576
└── SQL: INTERVAL '%s hours' ← BUG #1 致命
↓
execute_query 抛出异常
↓
except: return [] ← 异常被静默吞掉
↓
records = []
↓
_determine_repair_targets([], 144)
↓
_generate_expected_times(144) ← 生成 144 个时间点
↓
RepairExecutor.repair(144 个目标)
├── _find_kline_gaps() ← 可能成功
├── _fill_kline_gaps() ← 可能成功
└── _repair_from_klines()
├── _extract_kline_window() ← BUG #3(边界偏移)
└── 依赖 K 线数据是否充足
↓
可能部分成功,也可能 repaired_count=0
↓
BUG #2 的变量混乱可能导致
后续迭代的修复目标不正确
↓
最终状态: 'failed' 或 'degraded'
优先级排序
| 优先级 | BUG | 严重性 | 修复复杂度 |
|---|---|---|---|
| P0 | BUG #1: SQL INTERVAL 参数化 | 致命 🔴 | 低(改一行) |
| P1 | BUG #2: stale 变量引用 | 严重 🟠 | 低(重命名变量) |
| P2 | BUG #3: 窗口边界 | 中等 🟡 | 低(需确认语义) |
| P3 | BUG #4: 测试不兼容 | 低 🟢 | 低(更新测试) |