启动数据自愈系统BUG分析2
数据自愈系统严重BUG分析报告
文档版本: 1.0
分析日期: 2026-02-15
严重性等级: 🔴 Critical (P0)
状态: 已确认,待修复
📋 执行摘要 | Executive Summary
BUG描述
Trading-in-websocket项目的数据自愈系统(Data Healing System)在启动时存在严重逻辑缺陷。当交易对的K线数据完全缺失时,自愈流程无法正常工作,导致修复失败,最终可能造成服务拒绝启动或降级启动。
核心问题
概念混淆:代码混淆了两个不同的时间点概念:
- zscore缺失时间点 (missing_times):需要计算zscore的目标时间点
- K线缺失时间点 (kline_gaps):计算zscore所需的输入K线时间点
当K线完全缺失时,代码只补充了zscore缺失的时间点,而忽视了计算zscore所需的130条K线完整窗口。
影响范围
- 新币种首次接入(K线表为空)
- 数据迁移后K线数据未同步
- 数据库故障恢复场景
修复复杂度
低 - 仅需修改一个方法,新增一个辅助方法
🎯 问题详解 | Problem Details
1. BUG位置
| 项目 | 详情 |
|---|---|
| 文件 | src/utils/data_healing/repair_executor.py |
| 方法 | RepairExecutor._find_kline_gaps() |
| 行号 | 222-267行(具体BUG在第258行) |
| 关键配置 | TOTAL_WINDOW = 130(BETA_WINDOW=100 + ZSCORE_WINDOW=30) |
2. BUG代码
# repair_executor.py:249-266
for sym in [symbol, base_symbol]:
klines = self.kline_repo.query_range(
symbol=sym,
timeframe=timeframe,
start_time=start_time,
end_time=end_time,
limit=10000
)
if not klines:
# ⚠️ BUG: 当K线完全缺失时,只把missing_times加入缺口列表
all_gaps.update(missing_times) # ❌ 错误逻辑
logger.warning(f"K 线完全缺失: {sym} @ {timeframe} [{start_time} ~ {end_time}]")
else:
_, sym_missing = self.kline_filler.validate_continuity(klines, timeframe)
if sym_missing:
all_gaps.update(sym_missing)
logger.debug(f"K 线缺口: {sym} @ {timeframe} | {len(sym_missing)} 个")
return sorted(all_gaps)
3. 问题根源
3.1 时间范围计算
# orchestrator.py:244-246
lookback_minutes = TOTAL_WINDOW * self._interval_minutes(timeframe)
start_time = min(missing_times) - timedelta(minutes=lookback_minutes)
end_time = max(missing_times)
# 数值示例(timeframe='5m')
# TOTAL_WINDOW = 130
# lookback_minutes = 130 * 5 = 650分钟 = 10小时50分钟
# start_time = min(missing_times) - 650分钟
# end_time = max(missing_times)
3.2 窗口依赖关系
计算任意时刻 t 的zscore需要:
- 输入: 130条K线窗口
[t-650分钟, t) - 组成:
- BETA_WINDOW (100条): 计算稳定的回归参数
- ZSCORE_WINDOW (30条): 计算敏感的均值回归指标
- 失败条件: 窗口不足130条
3.3 为什么代码失败
输入场景:
symbol = "PURR/USDC:USDC" # 新币种,K线表为空
missing_times = [t1, t2, t3] # 只有3个缺失的zscore时间点
当前代码执行流程:
1. _find_kline_gaps(missing_times=[t1, t2, t3])
↓
2. 计算范围: start_time = t1 - 650分钟, end_time = t3
↓
3. 查询K线: klines = query_range(symbol, start_time, end_time)
结果: klines = [] (完全没有K线)
↓
4. BUG触发: all_gaps.update(missing_times)
结果: all_gaps = {t1, t2, t3} # 只有3个时间点!
↓
5. _fill_kline_gaps(kline_gaps={t1, t2, t3})
补充K线: 只补充了3条K线
↓
6. _repair_from_klines(missing_times=[t1, t2, t3])
↓
7. 加载K线: alt_klines = query_range(start_time, end_time)
结果: [t1_kline, t2_kline, t3_kline] # 只有3条!
↓
8. 计算t1的zscore:
需要: [t1-650分钟, t1) 的130条K线
实际: 只有0条K线 (t1是最早的)
结果: _extract_kline_window返回None ❌
↓
9. 计算t2的zscore:
需要: [t2-650分钟, t2) 的130条K线
实际: 只有1条K线 (只有t1)
结果: _extract_kline_window返回None ❌
↓
10. 计算t3的zscore:
需要: [t3-650分钟, t3) 的130条K线
实际: 只有2条K线 (t1, t2)
结果: _extract_kline_window返回None ❌
↓
11. 修复结果: repaired_count = 0
原因: 所有zscore计算都失败了
最终后果:
- 第一轮迭代: 无进展 (repaired_count = 0)
- 继续迭代: BREAK (logger.warning "无进展,可能在冷却期或数据源不可用")
- 质量评估: completeness_pct = 3/144 ≈ 2% < 60%
- 启动状态: 'failed'
- 服务启动: ❌ 拒绝启动
正确代码应该做什么:
1. 识别K线完全缺失
2. 生成完整的时间序列: [t1-650分钟, t1-645分钟, ..., t3] (~130个)
3. 补充这130条K线
4. 重新加载: 成功获得130条K线
5. 计算t1的zscore: 需要130条 → 有130条 ✅
6. 计算t2的zscore: 需要130条 → 有130条 ✅
7. 计算t3的zscore: 需要130条 → 有130条 ✅
8. 修复结果: repaired_count = 3 ✅
9. 质量评估: completeness_pct = 144/144 = 100% ✅
10. 启动状态: 'ready' ✅
📊 完整因果链 | Complete Causal Chain
输入 (Input)
触发条件:
- 项目启动时,RealtimeKlineService初始化
- 数据库中某个symbol的zscore数据存在(analysis_results表有记录)
- 但该symbol的K线数据完全缺失(klines表为空)
具体数据示例:
symbol: "PURR/USDC:USDC"
base_symbol: "HYPE/USDC:USDC"
missing_times: [2026-02-15 10:00, 2026-02-15 10:30, 2026-02-15 11:00]
klines表状态: 空(没有任何PURR/USDC:USDC的K线数据)
required_count: 144 # 期望的历史记录数
状态变化 (State Changes)
第1阶段:初始化和检测
状态:
- RealtimeKlineService启动
- 查询数据库发现"PURR/USDC:USDC"有zscore数据
- 触发数据自愈流程
组件创建:
- DataHealingOrchestrator(symbol="PURR/USDC:USDC", base_symbol="HYPE/USDC:USDC")
- ContinuityChecker: 检查连续性
- RepairExecutor: 执行修复
- QualityAssessor: 评估质量
第2阶段:发现缺失
_load_zscore_history(144):
查询结果: 从analysis_results表查到部分记录(<144条)
check_continuity():
检查结果:
is_continuous: False
missing_times: [t1, t2, t3] # 发现缺失时间点
completeness_pct: 3/144 = 2.1%
流程: 发现不连续,进入修复流程
第3阶段:K线检查 (BUG触发点)
repair(missing_times=[t1, t2, t3], symbol="PURR/USDC:USDC", timeframe='4h'):
Step 1: _find_kline_gaps()
计算范围:
lookback_minutes = 130 * 60 = 7800分钟 (对于4h,interval=240)
start_time = t1 - 7800分钟 = t1 - 5.4小时
end_time = t3
查询K线:
for sym in ["PURR/USDC:USDC", "HYPE/USDC:USDC"]:
klines = query_range(sym, '4h', start_time, end_time)
sym="PURR/USDC:USDC":
klines = [] # ❌ 完全没有K线
↓
BUG触发: all_gaps.update([t1, t2, t3]) # 只添加了3个
sym="HYPE/USDC:USDC":
klines = [...] # 有数据
validate_continuity() 返回部分缺口
all_gaps.update(sym_missing)
返回: all_gaps = {t1, t2, t3, ...其他缺口} # 不完整!
BUG结果:
应该返回: ~130个时间点 (start_time到end_time的完整序列)
实际返回: 仅3个 + 部分缺口
后果: 补充的K线远不足130条
第4阶段:K线补充(不充分)
_fill_kline_gaps(kline_gaps={t1, t2, t3, ...}):
for sym in ["PURR/USDC:USDC", "HYPE/USDC:USDC"]:
filled = fill_missing_data_precise(symbol=sym, missing_timestamps=gaps)
结果:
PURR: 补充了约3条K线
HYPE: 补充了部分缺口
问题: 总K线数远不足130条!
第5阶段:修复失败(无法提取窗口)
_repair_from_klines(missing_times=[t1, t2, t3]):
加载K线:
alt_klines = query_range(PURR, start_time, end_time)
base_klines = query_range(HYPE, start_time, end_time)
结果: alt_klines只有~3-5条,远不足130条
计算zscore:
for missing_time in [t1, t2, t3]:
window = _extract_kline_window(alt_df, missing_time, TOTAL_WINDOW=130)
缺陷:
mask = alt_df['time'] < missing_time
subset = alt_df.loc[mask].tail(130)
if len(subset) < 130: # ❌ 这会一直为真
return None
结果: 所有zscore计算都返回None
最终: repaired_count = 0 (所有计算都失败)
第6阶段:修复无进展
修复循环检查:
if repaired_count == 0:
logger.warning("⚠️ 第1轮修复无进展,无可用K线数据")
BREAK
原因分析:
代码认为: "无进展,可能在冷却期或数据源不可用"
实际原因: 补充的K线数量不足,无法构建完整窗口 ❌
第7阶段:质量评估和启动决策
质量评估:
final_records: 仍然只有原始的部分记录
completeness_pct: 3/144 ≈ 2.1%
启动状态判定:
if completeness_pct < 60%:
return 'failed' # ❌ 拒绝启动
最终结果: HealingResult(status='failed', quality_grade='F')
服务启动决策:
if result.status == 'failed':
logger.error("数据质量不足,无法启动交易服务")
sys.exit(1) # ❌ 服务无法启动
调用路径 (Call Path)
程序启动 (main/cli entry point)
↓
RealtimeKlineService.__init__()
[src/services/realtime_kline_service.py]
↓
RealtimeKlineServiceBase.__init__()
[src/services/realtime_kline_service_base.py:1418-1496]
↓
_run_data_healing()
# 查询有zscore数据的symbol
symbols_with_data = {"PURR/USDC:USDC", "OTHER/USDC:USDC", ...}
heal_symbols = symbols_with_data ∩ active_symbols - {base_symbol}
↓
FOR symbol IN heal_symbols:
↓
DataHealingOrchestrator(
db_client, kline_repo,
symbol="PURR/USDC:USDC",
base_symbol="HYPE/USDC:USDC",
repair_timeframe='4h'
)
↓
heal_and_prepare(required_count=144)
[src/utils/data_healing/orchestrator.py:126-293]
↓
WHILE iteration < 3: # 最多3轮迭代
↓
_load_zscore_history(144)
# 查询analysis_results表
# 结果: 部分记录(<144条)
↓
ContinuityChecker.check_continuity(records, 144)
# 检查连续性
# 结果: is_continuous=False, missing_times=[t1,t2,t3]
↓
RepairExecutor.repair(
missing_times=[t1, t2, t3],
symbol="PURR/USDC:USDC",
base_symbol="HYPE/USDC:USDC",
timeframe='4h'
)
[src/utils/data_healing/repair_executor.py:55-93]
↓
Step 1: _find_kline_gaps(missing_times, symbol, base_symbol, '4h')
[222-267行] ⚠️ BUG在这里
↓
计算时间范围:
lookback_minutes = 130 * 60 = 7800分钟
start_time = t1 - 7800分钟
end_time = t3
↓
FOR sym IN [symbol, base_symbol]:
klines = query_range(sym, '4h', start_time, end_time)
IF not klines: # K线完全缺失
❌ BUG: all_gaps.update(missing_times) # 只添加[t1,t2,t3]
# ✅ 应该: all_gaps.update(generate_timeline(start_time, end_time))
↓
RETURN sorted(all_gaps) # 返回不完整的缺口
↓
Step 2: _fill_kline_gaps(kline_gaps, symbol, base_symbol, '4h')
[269-294行]
↓
KlineDataFiller.fill_missing_data_precise()
# 只补充了不完整的K线
↓
Step 3: _repair_from_klines(missing_times, symbol, base_symbol, '4h')
[95-220行]
↓
alt_klines = query_range(symbol, '4h', start_time, end_time)
base_klines = query_range(base_symbol, '4h', start_time, end_time)
# 只能获得补充的部分K线,远不足130条
↓
FOR missing_time IN [t1, t2, t3]:
↓
_extract_kline_window(alt_df, missing_time, TOTAL_WINDOW=130)
# 需要130条窗口,实际只有2-3条
# 返回: None ❌
↓
zscore计算失败
↓
repaired_count = 0 # 没有任何记录被修复
↓
IF repaired_count == 0:
logger.warning("⚠️ 第1轮修复无进展...")
BREAK # 退出迭代循环
↓
质量评估:
completeness = 3/144 ≈ 2.1%
status = 'failed' # < 60% 阈值
↓
RETURN HealingResult(status='failed')
↓
IF result.status == 'failed':
logger.error("数据质量不足,拒绝启动")
failed += 1
↓
服务启动失败 ❌
出错点 (Bug Point)
具体代码 (repair_executor.py:258)
if not klines:
all_gaps.update(missing_times) # ❌ 错误
问题分析
| 方面 | 详情 |
|---|---|
| 混淆的概念 | 把"需要修复的zscore时间点"误用为"需要补充的K线时间点" |
| 丢失的信息 | 计算zscore需要130条K线的窗口依赖关系 |
| 结果 | 补充的K线数量:3条 vs 需要:130条 |
| 失败原因 | 无法构建完整的窗口来计算zscore |
根因 (Root Cause)
1. 概念混淆
zscore缺失时间点 (missing_times):
- 定义: analysis_results表中zscore_4h为NULL的时间点
- 数量: 通常少量(几个到几十个)
- 用途: 标记需要修复的目标
K线缺失时间点 (kline_gaps):
- 定义: klines表中缺失的K线数据的时间点
- 数量: 可能很多(包含完整窗口)
- 用途: 标记需要补充的输入数据
关键区别:
zscore缺失 ≠ K线缺失
例如:
- zscore缺失时间点: [t1, t2, t3] (3个)
- 但计算这3个的zscore需要: [t1-650分钟, t1), [t2-650分钟, t2), ...
- 总共需要约: 3 * 130 = 390条K线(有重叠,实际需要~130条)
2. 窗口依赖关系被忽视
计算 missing_time 的zscore流程:
missing_time (目标时间点)
↓
需要130条K线窗口: [missing_time - 650分钟, missing_time)
↓
窗口组成:
- BETA_WINDOW (100条): 用于计算稳定的回归参数
公式: beta = cov(base_returns, alt_returns) / var(base_returns)
需要足够数据确保参数稳定
↓
- ZSCORE_WINDOW (30条): 用于计算敏感的均值回归指标
公式: zscore = (price_spread - mean(price_spread)) / std(price_spread)
需要最近30条数据计算动态的均值和标准差
↓
如果窗口不完整 (< 130条):
- 无法计算稳定的beta
- 无法计算准确的zscore
- 最终: _extract_kline_window返回None, zscore计算失败
3. 问题代码的逻辑缺陷
# 当前的错误理解
def _find_kline_gaps(missing_times, ...):
if not klines: # K线完全缺失
all_gaps.update(missing_times) # ❌ 错误理解
# 假设: "我需要修复[t1,t2,t3],那就补充[t1,t2,t3]的K线"
# 错误: 忽视了修复[t1,t2,t3]需要[t1-650, t3]范围的所有K线
# 正确的理解应该是
def _find_kline_gaps(missing_times, ...):
# 计算需要的时间范围(包含窗口)
start_time = min(missing_times) - 650分钟
end_time = max(missing_times)
if not klines: # K线完全缺失
# 需要补充整个时间范围的所有K线
all_gaps.update(generate_complete_timeline(start_time, end_time))
# 假设: "我需要修复[t1,t2,t3],但计算它们需要[t1-650, t3]的完整K线"
# 正确: 识别真正需要补充的K线时间点
💥 影响范围 | Impact Analysis
影响场景
场景1: 新币种首次接入
触发时机:
- 新币种(如PURR/USDC:USDC)刚开始交易
- WebSocket接收到实时5m K线
- 写入klines表(历史数据为空)
- 实时分析生成zscore,写入analysis_results表
- 服务重启或滚动升级
影响:
- 数据自愈失败
- 服务拒绝启动或降级启动
- 新币种交易无法进行
- 需要人工介入,手动补充数据
场景2: 数据迁移后
触发时机:
- 数据库从旧系统迁移到新系统
- K线表数据迁移不完整
- 但analysis_results表有残留记录
影响:
- 自愈流程失败
- 影响多个symbol同时
- 服务完全无法启动
- 交易中断,业务损失
场景3: 数据库故障恢复
触发时机:
- 数据库故障/备份恢复
- K线表数据丢失
- analysis_results表恢复但不完整
影响:
- 自愈无法修复不一致的数据
- 服务启动失败
- 恢复时间延长
影响等级
| 影响维度 | 严重程度 | 描述 |
|---|---|---|
| 服务可用性 | 🔴 Critical | 可能导致服务完全无法启动 |
| 数据准确性 | 🟠 High | zscore数据不完整,分析结果不可信 |
| 交易损失 | 🟠 High | 无法及时交易,错失市场机会 |
| 人工成本 | 🟠 High | 需要人工介入恢复,技术支持成本高 |
| 系统稳定性 | 🔴 Critical | 启动流程脆弱,易受数据状态影响 |
实际业务影响
轻度影响 (单个symbol):
- 该symbol以"降级模式"启动
- 交易策略运行但数据不可靠
- 可能造成错误的交易信号
中度影响 (多个symbol):
- 部分symbol启动失败
- 服务以"预热模式"运行
- 一部分交易功能不可用
- 用户投诉增加
严重影响 (全局):
- 所有symbol启动失败
- 服务完全无法工作
- 需要紧急停止交易
- 重大业务中断
- 用户资金安全风险
🔧 修复方案 | Fix Plan
修复策略
添加_generate_complete_timeline()方法,在K线完全缺失时生成完整的时间序列。
修改代码
1. 修改_find_kline_gaps()方法
文件: src/utils/data_healing/repair_executor.py (222-267行)
def _find_kline_gaps(
self,
missing_times: List[datetime],
symbol: str,
base_symbol: str,
timeframe: str
) -> List[datetime]:
"""
检查 repair_timeframe 的 K 线是否有缺口(复用 KlineDataFiller.validate_continuity)
Args:
missing_times: 缺失的时间点列表
symbol: 交易对符号
base_symbol: 基准符号
timeframe: 时间周期(repair_timeframe)
Returns:
缺失的 K 线时间点列表(已排序)
"""
if not missing_times:
return []
lookback_minutes = TOTAL_WINDOW * self._interval_minutes(timeframe)
start_time = min(missing_times) - timedelta(minutes=lookback_minutes)
end_time = max(missing_times)
all_gaps = set()
for sym in [symbol, base_symbol]:
klines = self.kline_repo.query_range(
symbol=sym,
timeframe=timeframe,
start_time=start_time,
end_time=end_time,
limit=10000
)
if not klines:
# ✅ 修复: 完全没有 K 线时,生成整个时间范围的所有时间点
complete_gaps = self._generate_complete_timeline(
start_time, end_time, timeframe
)
all_gaps.update(complete_gaps)
logger.warning(
f"K 线完全缺失: {sym} @ {timeframe} | "
f"需要补充 {len(complete_gaps)} 个时间点 "
f"[{start_time} ~ {end_time}]"
)
else:
_, sym_missing = self.kline_filler.validate_continuity(klines, timeframe)
if sym_missing:
all_gaps.update(sym_missing)
logger.debug(f"K 线缺口: {sym} @ {timeframe} | {len(sym_missing)} 个")
return sorted(all_gaps)
2. 新增_generate_complete_timeline()方法
在repair_executor.py中添加(在_find_kline_gaps()方法之后):
def _generate_complete_timeline(
self,
start_time: datetime,
end_time: datetime,
timeframe: str
) -> List[datetime]:
"""
生成完整的时间序列(用于K线完全缺失的场景)
当某个symbol的K线数据完全缺失时,需要补充整个时间范围内的所有K线,
才能提取足够的窗口数据来计算zscore。
Args:
start_time: 起始时间(包含)
end_time: 结束时间(包含)
timeframe: 时间周期('5m', '1h', '4h'等)
Returns:
时间点列表,按升序排列
Example:
>>> start = datetime(2026, 2, 15, 10, 0)
>>> end = datetime(2026, 2, 15, 10, 30)
>>> timeline = _generate_complete_timeline(start, end, '5m')
>>> len(timeline)
7 # [10:00, 10:05, 10:10, 10:15, 10:20, 10:25, 10:30]
"""
interval_minutes = self._interval_minutes(timeframe)
interval = timedelta(minutes=interval_minutes)
timeline = []
current = start_time
while current <= end_time:
timeline.append(current)
current += interval
logger.debug(
f"生成完整时间序列: {len(timeline)} 个时间点 | "
f"间隔: {interval_minutes}分钟 | "
f"范围: [{start_time} ~ {end_time}]"
)
return timeline
修改影响分析
| 组件 | 修改 | 影响 |
|---|---|---|
_find_kline_gaps() |
修改K线完全缺失的处理逻辑 | 返回完整的时间范围 |
_generate_complete_timeline() |
新增辅助方法 | 生成时间序列 |
_fill_kline_gaps() |
无改动 | 补充完整的K线 |
_repair_from_klines() |
无改动 | 成功计算zscore |
向后兼容性
✅ 完全兼容 - 修改仅影响K线完全缺失的分支,其他逻辑不变
✅ 验证方案 | Testing & Verification
单元测试
# test_data_healing_bug_fix.py
import pytest
from datetime import datetime, timedelta
from src.utils.data_healing.repair_executor import RepairExecutor
class TestFindKlineGapsFix:
"""验证K线完全缺失的修复"""
@pytest.fixture
def executor(self, db_client, kline_repo):
"""创建RepairExecutor实例"""
return RepairExecutor(db_client, kline_repo)
def test_generate_complete_timeline_5m(self, executor):
"""测试5分钟周期的时间序列生成"""
start = datetime(2026, 2, 15, 10, 0)
end = datetime(2026, 2, 15, 10, 30)
timeline = executor._generate_complete_timeline(start, end, '5m')
assert len(timeline) == 7 # 10:00, 10:05, 10:10, ..., 10:30
assert timeline[0] == start
assert timeline[-1] == end
assert all(
(timeline[i+1] - timeline[i]).total_seconds() == 5*60
for i in range(len(timeline)-1)
)
def test_generate_complete_timeline_1h(self, executor):
"""测试1小时周期的时间序列生成"""
start = datetime(2026, 2, 15, 10, 0)
end = datetime(2026, 2, 15, 13, 0)
timeline = executor._generate_complete_timeline(start, end, '1h')
assert len(timeline) == 4 # 10, 11, 12, 13
assert timeline[0] == start
assert timeline[-1] == end
def test_find_kline_gaps_with_empty_klines(self, executor, mock_kline_repo):
"""测试K线完全缺失时的缺口识别"""
# 设置mock: query_range返回空列表(K线完全缺失)
mock_kline_repo.query_range.return_value = []
missing_times = [
datetime(2026, 2, 15, 10, 0),
datetime(2026, 2, 15, 10, 30),
datetime(2026, 2, 15, 11, 0),
]
gaps = executor._find_kline_gaps(
missing_times,
symbol="PURR/USDC:USDC",
base_symbol="HYPE/USDC:USDC",
timeframe='5m'
)
# 预期: 约130个时间点(TOTAL_WINDOW)
# start_time = t1 - 650分钟, end_time = t3
# 区间宽度 ≈ 650分钟 + 60分钟 = 710分钟
# 时间点数 = 710 / 5 + 1 ≈ 143
expected_min = 130
assert len(gaps) >= expected_min, \
f"缺口数量不足: {len(gaps)} < {expected_min}"
def test_find_kline_gaps_partial_missing(self, executor, mock_kline_repo):
"""测试K线部分缺失时的正常处理"""
# 设置mock: 返回部分K线(部分缺失)
partial_klines = [...] # 部分K线数据
mock_kline_repo.query_range.return_value = partial_klines
missing_times = [
datetime(2026, 2, 15, 10, 0),
datetime(2026, 2, 15, 11, 0),
]
gaps = executor._find_kline_gaps(
missing_times,
symbol="PURR/USDC:USDC",
base_symbol="HYPE/USDC:USDC",
timeframe='5m'
)
# 应该调用validate_continuity处理部分缺失
# 验证返回合理的缺口列表
assert isinstance(gaps, list)
assert all(isinstance(g, datetime) for g in gaps)
集成测试
def test_full_healing_with_empty_klines():
"""测试完整的自愈流程(K线完全缺失)"""
# 准备测试数据
db_client = create_test_db_client()
kline_repo = create_test_kline_repo(empty=True) # 没有K线数据
# 在analysis_results表中创建部分记录
setup_partial_analysis_results(
symbol="PURR/USDC:USDC",
count=3 # 只有3条记录(缺少141条)
)
# 执行自愈
orchestrator = DataHealingOrchestrator(
db_client=db_client,
kline_repo=kline_repo,
symbol="PURR/USDC:USDC",
base_symbol="HYPE/USDC:USDC",
repair_timeframe='4h'
)
result = orchestrator.heal_and_prepare(required_count=144)
# 验证修复成功
assert result.status == 'ready', f"预期启动状态为'ready',实际为'{result.status}'"
assert result.quality.completeness_pct >= 95, \
f"数据完整度应≥95%,实际{result.quality.completeness_pct:.1f}%"
assert len(result.data) >= 144, \
f"数据条数应≥144,实际{len(result.data)}"
性能测试
def test_generate_timeline_performance():
"""验证时间序列生成的性能"""
executor = RepairExecutor(...)
start = datetime(2026, 1, 1, 0, 0)
end = datetime(2026, 2, 15, 23, 59) # 约1.5个月
# 测试5分钟周期(最多的时间点)
import time
t_start = time.time()
timeline = executor._generate_complete_timeline(start, end, '5m')
duration = time.time() - t_start
# 应该在100ms内完成
assert duration < 0.1, f"性能不达标: {duration:.3f}s > 0.1s"
# 验证时间点数量
expected = int((end - start).total_seconds() / 300) + 1
assert len(timeline) == expected
回归测试清单
- [ ] 原有的自愈流程不受影响
- [ ] K线部分缺失的场景仍然正确处理
- [ ] 质量评估和启动模式判决逻辑不变
- [ ] 超时保护机制有效
- [ ] 共享executor的复用逻辑正常
- [ ] 数据库写入性能符合预期
📈 修复前后对比 | Before & After
修复前(BUG场景)
输入:
symbol: PURR/USDC:USDC
missing_times: [t1, t2, t3]
K线数据: 空
执行流程:
Step 1: _find_kline_gaps()
↓ all_gaps = {t1, t2, t3} # 只有3个 ❌
Step 2: _fill_kline_gaps()
↓ 补充3条K线 ❌
Step 3: _repair_from_klines()
↓ 计算zscore
需要: 130条窗口
有: 0-2条
↓ repaired_count = 0 ❌
结果:
status: 'failed' ❌
completeness_pct: 2.1% ❌
启动: 拒绝 ❌
日志:
⚠️ 第1轮修复无进展,无可用K线数据
🏁 自愈结果: failed | F级 (2.1%) | 修复轮次: 1
❌ 服务拒绝启动
修复后(正确行为)
输入:
symbol: PURR/USDC:USDC
missing_times: [t1, t2, t3]
K线数据: 空
执行流程:
Step 1: _find_kline_gaps()
↓ all_gaps = {t1-650分钟, ..., t3} # 约130个 ✅
Step 2: _fill_kline_gaps()
↓ 补充130条K线 ✅
Step 3: _repair_from_klines()
↓ 计算zscore
需要: 130条窗口
有: 130条
↓ repaired_count = 144 ✅ # 修复了144条记录
结果:
status: 'ready' ✅
completeness_pct: 100% ✅
启动: 正常启动 ✅
日志:
✅ 第1轮修复完成: 修复 144 条记录
✅ 数据连续且完整(144/144),自愈完成(第1轮)
🏁 自愈结果: ready | A级 (100.0%) | 修复轮次: 1
✅ 服务正常启动
🚀 实施计划 | Implementation Plan
第1步:代码修改
- 修改
repair_executor.py中的_find_kline_gaps()方法 - 添加
_generate_complete_timeline()辅助方法
预计时间: 30分钟
第2步:单元测试
- 编写时间序列生成的测试
- 编写K线完全缺失场景的测试
- 测试部分缺失场景的兼容性
预计时间: 1小时
第3步:集成测试
- 端到端的自愈流程测试
- 验证数据库一致性
- 性能基准测试
预计时间: 1小时
第4步:线上验证
- 灰度发布到测试环境
- 使用新币种进行验证
- 监控日志和性能指标
预计时间: 2-4小时
第5步:发布
- 全量发布到生产环境
- 监控系统健康状态
- 准备回滚方案
预计时间: 1小时
总计: 约5-7小时
📝 参考资源 | References
相关代码路径
src/utils/data_healing/repair_executor.py- 修复执行器src/utils/data_healing/orchestrator.py- 自愈编排器src/utils/data_healing/continuity_checker.py- 连续性检查器src/services/realtime_kline_service_base.py- 服务基类
配置参数
TOTAL_WINDOW = 130- K线窗口大小BETA_WINDOW = 100- 回归参数窗口ZSCORE_WINDOW = 30- zscore计算窗口EXPECTED_INTERVAL_MINUTES = 5- 预期间隔
相关问题
- 数据库连接性
- API速率限制
- 数据一致性
📌 备注 | Notes
- 修复不会改变API接口:所有公开方法的签名保持不变
- 向后兼容:修改仅影响K线完全缺失的特殊分支
- 可测试性:新增方法
_generate_complete_timeline()是纯函数,易于测试 - 监控建议:建议添加metrics来监控自愈成功率和修复记录数
📞 联系方式 | Contact
如有问题或反馈,请联系开发团队。
文档维护: Trading-in-websocket项目组
最后更新: 2026-02-15