数据自愈并发优化设计方案(v2)
数据自愈并发优化设计方案(v2)
快速摘要
| 类别 | 改动项 | 优先级 |
|---|---|---|
| 存量 Bug(必修) | _get_db_now() fallback 返回 naive datetime → TypeError |
P0 |
| 存量 Bug(必修) | batch_insert 裸 INSERT,并发写入无冲突保护 |
P0(需先执行 DDL) |
| 性能优化 | KlineDataFiller 懒加载,消除 49 次废弃 HTTP 握手 |
P1 |
| 性能优化 | ~100 次 DB 往返 → 1 次(批量查询含 db_now) | P1 |
| 性能优化 | 串行修复 → 并发修复(max_workers=4) | P2 |
| 代码质量 | 冗余连续性检查、日志精简、冷却锁保护 | P3 |
部署依赖:
batch_insert ON CONFLICT修复必须在 DDL 约束添加之后上线。建议顺序见第九节。
一、现状分析
当前流程
_run_data_healing(realtime_kline_service_base.py:1807):
for (symbol, base_symbol) in heal_pairs: # ~50+ 配对,串行
DataHealingOrchestrator(...) # → RepairExecutor.__init__
│ # → KlineDataFiller.__init__
│ # → Info(MAINNET_API_URL) HTTP 握手 ~0.85s ← 根因
│ # ← 第 2-50 个握手完成后才被 shared_executor 覆盖,全部浪费
_load_zscore_history() # DB 查询 ×1(0.08-0.56s)
_get_db_now() # SELECT NOW() ×1
_diagnose() # 内存计算(~0ms)
check_continuity() # ← 第一次连续性检查
_final_assessment() # ← 第二次连续性检查(冗余)
瓶颈定量(日志实测,50 对)
| 操作 | 单次耗时 | 次数 | 总计 | 占比 |
|---|---|---|---|---|
KlineDataFiller init(HTTP 握手,49 次废弃) |
~0.85s | 50 | ~43s | 78% |
_load_zscore_history DB 查询 |
~0.15s avg | ~50 | ~7.5s | 14% |
| ONDO/BTC DB 慢查询(特例,见附加优化 4d) | 4.44s | 1 | 4.44s | 8% |
_get_db_now SELECT NOW() |
~0.01s | ~50 | ~0.5s | ~1% |
_final_assessment 冗余连续性检查 |
~0.001s | ~50 | ~0.05s | ~0% |
| 总计 | ~55s |
基础设施
- 连接池:psycopg 3.x
ConnectionPool,min_size=2, max_size=10,支持并发 DB 查询 - 交易所 API:
KlineDataFiller内置 10 分钟冷却机制(threading.Lock()保护) - 超时保护:全局
healing_timeout(300s) - 覆盖索引:
idx_analysis_results_pair_kline_time ON analysis_results (symbol, base_symbol, kline_time DESC, analysis_time DESC) WHERE zscore_4h IS NOT NULL(与批量 SQL 完全对齐)
二、存量 Bug 修复(独立于性能优化,可单独上线)
Bug-1:_get_db_now() fallback 返回 naive datetime
现状:orchestrator.py:447:
except Exception:
return datetime.utcnow() # ← naive datetime(无时区信息)
DB 的 SELECT NOW() 返回 timezone-aware datetime。_check_freshness 中做时间差计算时触发 TypeError。
修复:
# orchestrator.py — _get_db_now()(第 442-447 行)
def _get_db_now(self) -> datetime:
try:
rows = self.db_client.execute_query("SELECT NOW() AS now")
return rows[0]['now']
except Exception:
return datetime.now(timezone.utc) # ✅ 修复:统一 timezone-aware
Bug-2:batch_insert 裸 INSERT,并发写入无冲突保护
前提:必须先添加 UNIQUE 约束(DDL 变更)
当前 analysis_results 主键为 PRIMARY KEY (analysis_time, id),(analysis_time, symbol, base_symbol) 上无唯一约束。
-- 上线前在生产 DB 执行(低峰期,约 10-30 分钟)
CREATE UNIQUE INDEX CONCURRENTLY uq_analysis_results_time_pair
ON analysis_results (analysis_time, symbol, base_symbol);
ALTER TABLE analysis_results
ADD CONSTRAINT uq_analysis_results_time_pair
UNIQUE USING INDEX uq_analysis_results_time_pair;
修复(约束添加后):
# timescaledb.py — AnalysisResultRepository.batch_insert
ON CONFLICT (analysis_time, symbol, base_symbol) DO NOTHING; -- ✅ 并发写入安全
三、优化方案 A:KlineDataFiller 懒加载
根因与解法
49 次废弃 HTTP 握手(~43s,78%)是最大瓶颈。根因在于 KlineDataFiller.__init__ 立即触发握手,而非首次实际 API 调用时触发。
修复:将 _info 改为懒加载,首次通过 info property 访问时初始化:
# kline_data_filler.py
class KlineDataFiller:
def __init__(self, exchange_id: str, kline_repo: KlineRepository):
self._exchange_id = exchange_id
self._kline_repo = kline_repo
self._info: Optional[Info] = None # ← 不在 __init__ 中初始化
self._info_lock = threading.Lock() # ← 双重检查锁,线程安全
@property
def info(self) -> Info:
"""懒加载 Info 实例,首次调用时触发 HTTP 握手(线程安全)。"""
if self._info is None:
with self._info_lock:
if self._info is None: # ← 双重检查
self._info = Info(MAINNET_API_URL, skip_ws=True, timeout=30)
return self._info
将类内部所有
self._info直接访问替换为self.info(通过 property)。
效果
| 场景 | 改动前 | 改动后 |
|---|---|---|
| 50 对全部健康(无修复) | 50 次握手,~43s | 0 次握手 |
| 5 对需修复(共享 executor) | 50 次握手,~43s | 1 次握手(首次修复时触发) |
| 接口改动范围 | — | 仅 kline_data_filler.py 内部,外部接口零改动 |
接口不变的意义
懒加载后,DataHealingOrchestrator.__init__ 不再触发 HTTP 握手,因此:
DataHealingOrchestrator.__init__无需新增executor可选参数heal_and_prepare()无需新增preloaded_diagnosis可选参数- 并发修复路径直接复用已有的
orchestrator.executor = shared_executor模式(见方案 C)
四、优化方案 B:批量 DB 查询
目标
50+ 次 DB 往返 → 1 次(db_now 内嵌在同一查询中)
核心 SQL
-- boundary_days = math.ceil(required_count × interval_minutes / 1440) + LOAD_SAFETY_MARGIN_DAYS
SELECT
NOW() AS db_now, -- 消除独立 SELECT NOW(),同一事务快照
symbol, base_symbol,
kline_time, zscore_4h, analysis_time
FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY symbol, base_symbol
ORDER BY kline_time DESC
) AS rn
FROM (
SELECT DISTINCT ON (symbol, base_symbol, kline_time)
symbol, base_symbol, kline_time, zscore_4h, analysis_time
FROM analysis_results
WHERE (symbol, base_symbol) IN (VALUES (%s,%s), (%s,%s), ...)
AND zscore_4h IS NOT NULL
AND kline_time >= NOW() - INTERVAL '{boundary_days} days'
ORDER BY symbol, base_symbol, kline_time DESC, analysis_time DESC
) deduped
) ranked
WHERE rn <= %s
ORDER BY symbol, base_symbol, kline_time ASC
boundary_days为 Python 计算的整型字面量,不涉及用户输入,字符串插值安全。
失败处理:跳过本轮,不降级
批量查询失败意味着 DB 已出现问题,此时降级执行 100 次串行查询只会加重负担。自愈是后台补偿流程,跳过一轮等下次调度重试更安全。
# realtime_kline_service_base.py — _run_data_healing()
try:
grouped, db_now = batch_load_zscore_history(heal_pairs, required_count, self.db_client)
except Exception as e:
logger.error(f"批量查询失败,跳过本轮自愈: {e}", exc_info=True)
return
Statement Timeout
将现有 DB_STATEMENT_TIMEOUT_MS 默认值从 10s 调整为 15s。批量查询取代了所有单配对查询,统一使用此值即可,无需新增常量。
上线前验证执行计划(必做)
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT NOW() AS db_now, symbol, base_symbol, ...
-- 用真实 10-50 个配对测试
| 执行计划 | 预期耗时 | 处理方式 |
|---|---|---|
Index Scan |
~0.5-1s | 直接上线 ✅ |
Bitmap Index Scan |
~0.5-2s | 可接受 ✅ |
Seq Scan < 2s |
~1-2s | 可接受 ✅ |
Seq Scan ≥ 2s |
>2s | 重写 SQL 或调整索引,不使用 enable_seqscan = off ⚠️ |
SET LOCAL enable_seqscan = off会干扰规划器全局行为并掩盖根因,禁止作为解决方案。
代码改动
orchestrator.py:新增batch_load_zscore_history()静态方法(含 db_now);_get_db_now()fallback 修复realtime_kline_service_base.py:_run_data_healing()调用批量查询,失败logger.error + returnconfig.py:DB_STATEMENT_TIMEOUT_MS默认值调整为 15000
五、优化方案 C:并发修复
前提
批量查询后数据已在内存中。诊断是纯 CPU 操作(~0ms/对),不需要并发。只有不健康配对调用 RepairExecutor.repair()(涉及 DB 写入 + 交易所 API),才进入并发。
线程安全验证
| 组件 | 线程安全方式 |
|---|---|
db_client(ConnectionPool) |
psycopg 3.x 连接池原生线程安全 |
kline_repo / analysis_repo |
每次操作从连接池独立获取连接 |
kline_filler(KlineDataFiller) |
冷却字典由 threading.Lock() 保护;懒加载由双重检查锁保护 |
repair() 方法本身 |
无跨调用共享的可变状态 |
设计:直接属性替换,接口零改动
有了懒加载(方案 A),DataHealingOrchestrator.__init__ 不再触发握手,无需修改其接口。直接复用已有的属性替换模式:
def _run_concurrent_repair(
unhealthy_pairs: list[tuple[str, str]],
required_count: int,
db_client: TimescaleDBClient,
kline_repo: KlineRepository,
) -> dict[tuple[str, str], HealingResult]:
if not unhealthy_pairs:
return {}
# 懒加载:握手在首次修复时触发,而非此处
shared_executor = RepairExecutor(db_client, kline_repo)
results: dict[tuple[str, str], HealingResult] = {}
def _heal_one(pair: tuple[str, str]) -> tuple[tuple[str, str], HealingResult]:
symbol, base_symbol = pair
orchestrator = DataHealingOrchestrator(
db_client=db_client,
kline_repo=kline_repo,
symbol=symbol,
base_symbol=base_symbol,
)
orchestrator.executor = shared_executor # ✅ 直接替换,DataHealingOrchestrator 接口零改动
result = orchestrator.heal_and_prepare(required_count=required_count)
return (symbol, base_symbol), result
max_workers = min(4, len(unhealthy_pairs))
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(_heal_one, pair): pair for pair in unhealthy_pairs}
for future in as_completed(futures):
try:
key, result = future.result()
results[key] = result
except Exception as e:
pair = futures[future]
logger.error(f"并发修复异常 [{pair[0]}/{pair[1]}]: {e}", exc_info=True)
empty_quality = QualityAssessor().assess([], required_count, False, 0)
results[(pair[0], pair[1])] = HealingResult(
status='failed', data=[], quality=empty_quality, iterations_used=0,
)
return results
DataHealingOrchestrator.__init__和heal_and_prepare()签名均未修改。
并发度与连接池
heal_and_prepare() 内部操作(顺序执行):
1. _load_zscore_history() ← 1 个连接
2. _fill_kline_gaps_parallel() ← 最多 2 个连接(ThreadPoolExecutor(max_workers=2))
3. batch_insert() ← 1 个连接
峰值连接数(步骤 2 阶段):
4 workers × 2 = 8 并发连接
8 ≤ ConnectionPool.max_size=10 ✅(留 2 个连接作安全余量)
并发修复中,每个 unhealthy 配对的
heal_and_prepare内部仍执行一次_load_zscore_history(),加载最新数据(不复用批量缓存,防时间差导致数据过时)。
六、附加优化
6a. 健康配对跳过冗余评估
冗余路径:
heal_and_prepare()
├─ _diagnose()
│ └─ check_continuity() ← 第 1 次
└─ _final_assessment()
└─ check_continuity() ← 第 2 次(参数完全相同,纯冗余)
优化:诊断结果为 is_healthy=True 时,直接复用诊断结论构建 HealingResult,跳过 _final_assessment:
# orchestrator.py — heal_and_prepare() 内
if diagnosis.is_healthy:
quality = self.assessor.assess(records, required_count, is_continuous=True, missing_count=0)
return HealingResult(
status=self._determine_status(quality),
data=self._extract_zscore_values(records),
quality=quality,
iterations_used=iteration,
)
6b. 日志精简
优化前:每个健康配对输出 5 行 INFO 日志(50 对 = 250 行)
优化后:健康配对仅 DEBUG 级别,汇总 1 行 INFO:
自愈完成 | healed=3, failed=0, healthy=47 | 耗时: 0.42s
6c. _is_in_cooldown 锁保护(并发修复上线前完成)
原标注为"可选",但并发修复上线后读写竞态实际存在,升级为推荐:
# kline_data_filler.py
def _is_in_cooldown(self, symbol: str, timeframe: str) -> bool:
with self._cooldown_lock: # ✅ 与写操作一致
last_fill = self.fill_cooldown.get((symbol, timeframe), 0)
return time.time() - last_fill < self.COOLDOWN_SECONDS
6d. ONDO/BTC 慢查询根因排查(批量 SQL 上线前完成)
ONDO/BTC 单次查询耗时 4.44s,是其他配对的 30 倍。批量 SQL 走 Index Scan 时此配对会成为全局瓶颈。
-- Step 1:确认数据量
SELECT COUNT(*) FROM analysis_results
WHERE symbol = 'ONDO/USDC:USDC' AND base_symbol = 'BTC/USDC:USDC' AND zscore_4h IS NOT NULL;
-- Step 2:检查执行计划
EXPLAIN (ANALYZE, BUFFERS) SELECT DISTINCT ON (kline_time) ...
-- Step 3:若数据量过大,清理超出保留窗口的历史数据
DELETE FROM analysis_results
WHERE symbol = 'ONDO/USDC:USDC' AND base_symbol = 'BTC/USDC:USDC'
AND kline_time < NOW() - INTERVAL '60 days';
七、重构后流程
_run_data_healing():
│
├── 1. 查询 DB 中有数据的配对 # 已有逻辑,不变
├── 2. 构建 heal_pairs 列表 # 已有逻辑,不变
│
├── 3.【新】批量加载(含 db_now,1 次 DB 往返)
│ batch_load_zscore_history(heal_pairs, required_count, db_client)
│ → grouped: {(sym, base): [records...]}
│ → db_now: datetime(timezone-aware)
│ 失败 → logger.error + return(跳过本轮,等下次调度)
│
├── 4.【新】批量诊断(内存操作,~0ms)
│ for pair in heal_pairs:
│ records = grouped.get(pair, [])
│ orchestrator = DataHealingOrchestrator(...) # 懒加载:无 HTTP 握手,仅对象分配
│ diagnosis = orchestrator._diagnose(records, required_count, db_now)
│ if is_healthy → 跳过 _final_assessment,直接构建结果(见 6a)
│ else → 加入 unhealthy_pairs
│
├── 5.【新】并发修复不健康配对
│ shared_executor = RepairExecutor(...) # 懒加载:握手延迟至首次修复调用
│ ThreadPoolExecutor(max_workers=min(4, N)):
│ for pair in unhealthy_pairs:
│ orchestrator.executor = shared_executor # 直接替换,接口零改动
│ orchestrator.heal_and_prepare(required_count)
│ → 重新加载最新数据(1 次 DB 查询/配对)
│ → 修复 + 评估
│
└── 6. 汇总日志(1 行)
healed=X, failed=Y, healthy=Z | 耗时: T s
八、预期性能对比
| 指标 | 优化前 | 优化后 | 提升 |
|---|---|---|---|
| HTTP 握手次数 | 50 次(49 次废弃) | 0 次(全健康)/ 1 次(有修复) | 消除 |
| HTTP 握手总耗时 | ~43s | 0s / ~0.85s | 消除 |
| DB 查询次数(加载 + NOW) | ~100 次 | 1 次 | 100x |
| DB 查询总耗时 | ~8s | ~0.3s | 25x |
| 全健康场景总耗时 | ~55s | ~0.35s | ~157x |
| 有 5 对需修复 | ~55s | ~2.8s | ~20x |
| 有 15 对需修复 | ~55s | ~7-8s | ~7-8x |
| 日志行数 | ~250 行 | ~10 行 | 精简 |
全健康场景耗时拆解(优化后):
1 次批量 DB 查询(含 db_now): ~0.3s
50 对内存诊断(对象分配 + CPU):~0.05s
HTTP 握手: 0s(无需修复,懒加载不触发)
─────────────────────────────────────────
总计: ~0.35s
有 5 对需修复的场景(优化后,max_workers=4):
1 次批量 DB 查询: ~0.3s
50 对内存诊断: ~0.05s
1 次 HTTP 握手(首次修复时触发): ~0.85s
5 对各自重新加载数据(并发): ~0.15s
5 对并发修复(4+1 批,含 DB 写入): ~1.5s
─────────────────────────────────────────
总计: ~2.8s
九、部署顺序
Step 1:【DB 层】执行 DDL(低峰期,约 10-30 分钟)
CREATE UNIQUE INDEX CONCURRENTLY ...
ALTER TABLE ... ADD CONSTRAINT ...
Step 2:【代码层,独立,可提前】修复 Bug-1
_get_db_now() fallback 改为 datetime.now(timezone.utc)
Step 3:【代码层,依赖 Step 1】上线 batch_insert ON CONFLICT
timescaledb.py 追加 ON CONFLICT ... DO NOTHING
Step 4:【代码层,依赖 Step 3】上线方案 A(KlineDataFiller 懒加载)
kline_data_filler.py:_info 懒加载 + 双重检查锁
同步上线 6c(_is_in_cooldown 锁保护)
Step 5:【代码层,依赖 Step 4】上线方案 B(批量 DB 查询)
上线前:EXPLAIN ANALYZE 验证执行计划(见四)
上线前:完成 6d ONDO/BTC 慢查询排查
Step 6:【代码层,依赖 Step 5】上线方案 C(并发修复)
Step 7:【可选,非阻塞】上线附加优化 6a / 6b
十、改动文件清单
| 文件 | 改动类型 | 说明 |
|---|---|---|
| DDL 迁移脚本 | DDL 变更(Step 1,必须先执行) | UNIQUE 索引 + 约束 |
kline_data_filler.py |
核心改动(Step 4) | _info 懒加载 + 双重检查锁;_is_in_cooldown 锁保护 |
orchestrator.py |
新增方法 + Bug 修复 | 新增 batch_load_zscore_history()(返回 grouped + db_now);_get_db_now() fallback 修复 |
realtime_kline_service_base.py |
重构方法 | _run_data_healing() 改为批量加载 + 诊断 + 并发修复;新增 _run_concurrent_repair() |
timescaledb.py |
Bug 修复(依赖 DDL) | batch_insert 追加 ON CONFLICT ... DO NOTHING |
config.py |
参数调整 | DB_STATEMENT_TIMEOUT_MS 默认值调整为 15000 |
不变的文件:repair_executor.py、continuity_checker.py、quality_assessor.py、__init__.py
与 v1 相比删除的改动:
| 删除项 | 删除原因 |
|---|---|
_batch_load_with_fallback() |
替换为硬失败+跳过;DB 故障时降级执行 100 次串行查询适得其反 |
_fallback_individual_load() |
同上,两条并行代码路径维护成本高于收益 |
_load_zscore_history_single() |
无 fallback 路径则不再需要 |
DataHealingOrchestrator.__init__ executor 参数 |
懒加载后直接属性替换即可,无需污染构造函数接口 |
heal_and_prepare() preloaded_diagnosis 参数 |
跳过 0ms CPU 操作不值得引入接口复杂度 |
BATCH_STATEMENT_TIMEOUT_MS 常量 |
合并到现有 DB_STATEMENT_TIMEOUT_MS(调整默认值即可) |
SET LOCAL enable_seqscan = off |
干扰规划器,掩盖根因,禁止作为解决方案 |
十一、风险控制
| 风险 | 缓解措施 |
|---|---|
| 批量查询失败 | logger.error 告警 + 跳过本轮,等下次调度重试;不静默降级 |
| 批量 SQL 走 Seq Scan 性能差 | 上线前 EXPLAIN (ANALYZE, BUFFERS) 验证;问题则重写 SQL 或调整索引 |
| ONDO/BTC 慢查询拖慢批量 SQL | 批量 SQL 上线前优先排查修复(见 6d) |
| DDL 变更加锁影响生产 | CREATE UNIQUE INDEX CONCURRENTLY 避免锁,低峰期执行 |
| DDL 未完成就上线 ON CONFLICT | 运行时报错;严格按部署顺序操作 |
| DB 连接池耗尽 | max_workers=4,峰值 8 连接 ≤ max_size=10(留 2 个安全余量) |
| 懒加载并发初始化竞态 | 双重检查锁(threading.Lock)保护,CPython GIL 之外也安全 |
_cooldown_lock 并发读竞态 |
6c 与方案 A 同步上线 |
datetime naive/aware 混用 |
Bug-1 修复,统一 datetime.now(timezone.utc) |
| 并发修复异常传播 | 每个 future 独立 try/except,单配对失败不影响其他 |
| 交易所 API 频率限制 | KlineDataFiller 10 分钟冷却机制,4 并发不会重复请求同一 symbol |
| 超时保护 | 全局 healing_timeout(300s) 保持不变 |
| 向后兼容 | DataHealingOrchestrator 和 heal_and_prepare 接口完全不变,原有调用方无需修改 |