协整检验通过性抖动导致的订单跟丢了的BUG 2
完整因果链:Gate1/Gate2 阻塞 Adaptive Z 回归平仓
Bug ID: GATE-EXIT-BLOCK-001
严重级别: 🚨 Critical
发现日期: 2026-02-16
状态: 已识别,待修复
执行摘要
现象:开仓成功后,若后续K线周期Gate1或Gate2未通过,该仓位无法通过Adaptive Z回归触发平仓。
影响:平仓仅能依赖止损、移动止损、持仓超时;无法在均值回归时主动平仓,违背策略设计初衷。
根因:退场检查与入场共用"分析结果存在"前置条件,Gate误伤了仅需zscore的退场路径。
1. 输入(Input / Trigger)
1.1 初始状态(T0)
| 维度 | 状态 |
|---|---|
| 持仓状态 | ETH/USDC:USDC 已成功开仓 |
| PositionManager | _positions 中存在该 symbol |
| Strategy | _positions[symbol] 有 PositionTracker(含 entry_adaptive_z) |
| Gate状态 | T0时刻 Gate1+Gate2 ✅ 通过 |
1.2 触发事件(T1)
WebSocket 推送新的 5m K线
↓
RealtimeKlineServiceBase.on_message()
↓
_analyze_and_alert(symbol='ETH/USDC:USDC', timeframe='5m', kline_time=T1)
1.3 输入差异
| 时刻 | Gate1 协整通过数 | Gate2 健康监控 | 是否有持仓 |
|---|---|---|---|
| T0 | 4/6 (≥3) ✅ | HEALTHY ✅ | 否 → 开仓成功 |
| T1 | 2/6 (<3) ❌ | UNHEALTHY ❌ | 是 → 应检查退场 |
关键矛盾:T1时刻系统应该检查退场条件,但因Gate不通过被错误地提前终止。
2. 状态变化(State Transition)
2.1 数据获取阶段
# realtime_kline_service_base.py:1307-1314
price_data_cache = self._fetch_and_validate_price_data(symbol, timeframe)
if price_data_cache is None or len(price_data_cache) < 3:
return # ✅ 正常:数据不足
状态:price_data_cache 包含 ('5m','7d'), ('1h','30d'), ('4h','60d') 三个周期的数据。
2.2 相关性过滤阶段
# realtime_kline_service_base.py:1121-1138
def _run_correlation_and_analysis(self, symbol: str, price_data_cache: Dict) -> Optional[Dict]:
# 相关系数前置过滤
corr_4h_60d_pre = calculate_correlation(...)
if corr_4h_60d_pre <= TARGET_CORR_THRESHOLD:
return None # ✅ 合理:相关性过低
状态:假设相关性通过,继续执行。
2.3 Gate检查阶段(⚠️ 问题核心)
# realtime_kline_service_base.py:1146-1151
multi_period_result = analyze_multi_period(
price_data_cache=price_data_cache,
base_symbol=self.base_symbol,
target_symbol=symbol,
...
)
内部状态变化:
Gate 1: 协整数量检查
# analysis_core.py:825-831
if cointegration_count < cointegration_threshold:
logger.info(
f"❌ {log_prefix}信号未通过 [Gate1:协整不足] | "
f"协整={cointegration_count}/6(需≥{cointegration_threshold})"
)
return None # ❌ 问题点1:直接返回None
状态转换:
- T1时刻:
cointegration_count = 2<threshold = 3 - 结果:
analyze_multi_period()返回 None
Gate 2: 健康监控检查
# analysis_core.py:837-844
health_monitor = details[HEALTH_MONITOR_PERIOD].get('health_monitor')
if health_monitor:
if not health_monitor.get('passed'):
logger.info(
f"❌ {log_prefix}信号未通过 [Gate2:健康监控] | "
f"4H/60D短期健康=UNHEALTHY"
)
return None # ❌ 问题点2:直接返回None
状态转换:
- T1时刻:
health_monitor['passed'] = False - 结果:
analyze_multi_period()返回 None
2.4 状态汇总
[ T0 ] symbol 无持仓 + Gate通过 → multi_period_result = {...} → 开仓成功
↓
[ T1 ] symbol 有持仓 + Gate不通过 → multi_period_result = None → ???
预期:T1应继续检查退场条件(仅需zscore,不需Gate)
实际:T1被当作"无效分析",直接return
3. 调用路径(Call Path)
3.1 正常路径(Gate通过时)
_analyze_and_alert(symbol, timeframe, kline_time)
│
├─ _fetch_and_validate_price_data(symbol, timeframe)
│ └─ price_data_cache = {...} ✅
│
├─ _run_correlation_and_analysis(symbol, price_data_cache)
│ │
│ ├─ 相关性过滤 ✅
│ │
│ └─ analyze_multi_period(...)
│ │
│ ├─ Gate1: cointegration_count >= 3 ✅
│ ├─ Gate2: health_monitor.passed = True ✅
│ │
│ └─ return {
│ 'zscore_list': [z5m, z1h, z4h],
│ 'cointegration_count': 4,
│ 'details': {...}
│ }
│
├─ multi_period_result is not None ✅
│
├─ _trigger_strategy_if_ready(...) ← 策略引擎被调用
│ │
│ └─ orchestrator.process_analysis(...)
│ │
│ └─ strategy.process_tick(...)
│ │
│ ├─ _check_entry(...) → EntrySignal | None
│ └─ _check_exit(...) → ExitSignal | None ✅ 退场检查
│
└─ _build_and_buffer_analysis_record(...)
3.2 故障路径(Gate不通过时)
_analyze_and_alert(symbol, timeframe, kline_time)
│
├─ _fetch_and_validate_price_data(symbol, timeframe)
│ └─ price_data_cache = {...} ✅
│
├─ _run_correlation_and_analysis(symbol, price_data_cache)
│ │
│ ├─ 相关性过滤 ✅
│ │
│ └─ analyze_multi_period(...)
│ │
│ ├─ Gate1: cointegration_count < 3 ❌
│ │ └─ return None
│ │
│ 或
│ │
│ └─ Gate2: health_monitor.passed = False ❌
│ └─ return None
│
├─ multi_period_result is None ❌
│
└─ if multi_period_result is None:
return ← ⚠️ 出错点:提前终止
[ 未执行 ] _trigger_strategy_if_ready(...)
[ 未执行 ] orchestrator.process_analysis(...)
[ 未执行 ] strategy.process_tick(...)
[ 未执行 ] _check_exit(...) ← ❌ 退场检查被跳过
3.3 代码位置映射
| 阶段 | 文件 | 行号 | 函数 | 状态 |
|---|---|---|---|---|
| 触发 | realtime_kline_service_base.py |
1287 | _analyze_and_alert |
入口 |
| 数据获取 | 同上 | 1307 | _fetch_and_validate_price_data |
✅ |
| Gate检查 | 同上 | 1316 | _run_correlation_and_analysis |
❌ 返回None |
| Gate1 | analysis_core.py |
825 | analyze_multi_period |
❌ 协整不足 |
| Gate2 | analysis_core.py |
837 | analyze_multi_period |
❌ 健康欠佳 |
| 出错点 | realtime_kline_service_base.py |
1317-1318 | _analyze_and_alert |
❌ 提前return |
| 策略引擎 | 同上 | 1321 | _trigger_strategy_if_ready |
❌ 未执行 |
| 编排器 | orchestrator.py |
242 | process_analysis |
❌ 未执行 |
| 策略核心 | strategy.py |
247 | process_tick |
❌ 未执行 |
| 退场检查 | strategy.py |
558 | _check_exit |
❌ 未执行 |
4. 出错点(Failure Point)
4.1 主要出错点
位置:src/services/realtime_kline_service_base.py:1316-1318
multi_period_result = self._run_correlation_and_analysis(symbol, price_data_cache)
if multi_period_result is None:
return # ⚠️ 出错点:不区分有无持仓
问题:
- 无条件提前return:只要
multi_period_result is None,无论该symbol是否有持仓,都直接终止 - 混淆两种场景:
- 场景A(合理):无持仓 + Gate不通过 → return(避免无效开仓)
- 场景B(错误):有持仓 + Gate不通过 → return(阻断退场检查)
4.2 次要出错点
位置:src/utils/analysis/analysis_core.py:825-831 和 837-844
# Gate1
if cointegration_count < cointegration_threshold:
return None # ⚠️ 硬编码返回None,无法区分"入场过滤"与"退场需求"
# Gate2
if not health_monitor.get('passed'):
return None # ⚠️ 同上
问题:
- Gate设计为"全局开关",无法针对"有持仓"场景提供"仅返回zscore"的轻量级结果
4.3 出错逻辑对比
| 场景 | 持仓状态 | Gate状态 | 当前行为 | 正确行为 |
|---|---|---|---|---|
| 1 | 无持仓 | 通过 | ✅ 调用策略引擎 → 检查入场 | ✅ 正确 |
| 2 | 无持仓 | 不通过 | ✅ return(不调用策略引擎) | ✅ 正确 |
| 3 | 有持仓 | 通过 | ✅ 调用策略引擎 → 检查退场 | ✅ 正确 |
| 4 | 有持仓 | 不通过 | ❌ return(不调用策略引擎) | ❌ 应调用策略引擎检查退场 |
场景4是Bug的核心:有持仓时,即使Gate不通过,也应调用策略引擎进行退场检查。
5. 根因(Root Cause)
5.1 设计层面根因
根因1: 退场检查与入场检查共用前置条件
问题描述:
- 入场检查:需要完整的多周期分析结果(协整、健康监控、zscore)
- 退场检查:仅需要当前的
z4h和adaptive_z(由策略内部EMA/STD计算)
当前实现:
# 只有 multi_period_result 非None(即Gate通过)时才进入策略引擎
if multi_period_result is None:
return # 入场和退场被绑在一起
正确设计:
# 应区分两种路径
if multi_period_result is None:
if has_open_position(symbol):
# 跳过Gate,获取轻量级结果(仅zscore)
lightweight_result = analyze_for_exit_only(...)
trigger_strategy_exit_only(...)
return
根因2: Gate误伤退场路径
Gate的本意:
- Gate1 协整检查:过滤入场质量,确保开仓时配对关系稳定
- Gate2 健康监控:过滤入场时机,避免在短期恶化期开仓
退场逻辑的实际需求:
# src/trading/strategy.py:558-631 _check_exit()
def _check_exit(self, symbol: str, z4h: float, adaptive_z: float, ...):
tracker = self._positions.get(symbol) # ← 只需持仓信息
direction = tracker.direction
entry_adaptive_z = tracker.entry_adaptive_z
# 均值回归判断(与Gate无关)
if direction == 'long':
if adaptive_z >= entry_adaptive_z * reversion_factor:
return ExitSignal(reason='reversion', ...)
elif direction == 'short':
if adaptive_z <= entry_adaptive_z * reversion_factor:
return ExitSignal(reason='reversion', ...)
依赖分析:
- ✅ 需要:
z4h,adaptive_z(策略内部计算) - ✅ 需要:持仓信息(
entry_adaptive_z,direction) - ❌ 不需要:Gate1 协整通过数
- ❌ 不需要:Gate2 健康监控
结论:Gate设计为"全局前置条件",但退场检查不依赖Gate,导致误伤。
5.2 实现层面根因
根因3: 缺少"持仓状态感知"
位置:realtime_kline_service_base.py:1316-1318
multi_period_result = self._run_correlation_and_analysis(symbol, price_data_cache)
if multi_period_result is None:
return # ← 缺少对 position_manager.open_positions 的检查
修复方向:
if multi_period_result is None:
# 检查是否有持仓
if (hasattr(self, '_trading_orchestrator')
and self._trading_orchestrator is not None
and symbol in {p.symbol for p in position_manager.open_positions}):
# 有持仓:跳过Gate重新获取分析结果(仅zscore)
multi_period_result = self._run_correlation_and_analysis(
symbol, price_data_cache, skip_gates=True
)
if multi_period_result:
# 仅触发退场检查
self._trigger_strategy_if_ready(..., exit_only=True)
return
5.3 因果关系图
graph TD
A[设计缺陷: 入场与退场共用前置条件] --> B[Gate成为全局开关]
B --> C[Gate不通过时 analyze_multi_period 返回None]
C --> D[_analyze_and_alert 提前return]
D --> E[策略引擎不被调用]
E --> F[_check_exit 永远不执行]
F --> G[Adaptive Z回归平仓被阻塞]
H[实现缺陷: 缺少持仓状态感知] --> D
I[Gate误伤: 退场不依赖Gate但被Gate阻塞] --> C
style A fill:#ff6b6b
style H fill:#ff6b6b
style I fill:#ff6b6b
style G fill:#ffd93d
6. 影响分析(Impact Assessment)
6.1 直接影响
| 维度 | 影响 | 严重程度 |
|---|---|---|
| Adaptive Z回归平仓 | ❌ 完全阻塞 | 🚨 Critical |
| 固定止损 | ✅ 正常工作(独立线程) | ✅ None |
| 移动止损 | ✅ 正常工作(独立线程) | ✅ None |
| 持仓超时 | ✅ 正常工作(独立线程) | ✅ None |
6.2 业务影响
场景1:Gate长期不通过
T0: 开仓 ETH/USDC:USDC @ adaptive_z = -2.5 (做多)
T1: adaptive_z = -1.5 (回归50%) + Gate不通过 → ❌ 无法平仓
T2: adaptive_z = -0.8 (回归70%) + Gate不通过 → ❌ 无法平仓
T3: adaptive_z = +0.5 (完全反转) + Gate不通过 → ❌ 无法平仓
T4: 价格反向亏损 → 触发止损才平仓
损失:错过最佳退出点(T1-T2),被迫止损退出(T4)。
场景2:Gate间歇性通过
T0: 开仓 @ adaptive_z = -2.5
T1: adaptive_z = -1.5 + Gate不通过 → ❌ 错过退出
T2: adaptive_z = -1.8 + Gate通过 → ✅ 但未达退出条件(回归不足)
T3: adaptive_z = -1.2 + Gate不通过 → ❌ 再次错过
T4: adaptive_z = -2.8 + Gate通过 → ❌ 偏离更严重,无法退出
结果:退出时机完全依赖Gate状态,而非策略逻辑。
6.3 止损兜底机制
独立线程:orchestrator.py:_stop_loss_monitor()
# 第588-634行
for pos in self._position_manager.open_positions:
# 检查1: 固定止损
if self._risk_manager.check_stop_loss(pos):
close_reason = f"止损触发 (>{sp.stop_loss_pct * 100:.1f}%)"
positions_to_close.append((pos, close_reason))
# 检查2: 移动止损
elif self._risk_manager.check_trailing_stop(pos):
close_reason = "移动止损触发"
positions_to_close.append((pos, close_reason))
# 检查3: 持仓超时
elif (datetime.now(timezone.utc) - pos.open_time).total_seconds() > sp.max_hold_duration_seconds:
close_reason = f"持仓超时 (>{sp.max_hold_duration_seconds/3600:.1f}h)"
positions_to_close.append((pos, close_reason))
特点:
- ✅ 不依赖K线分析
- ✅ 不依赖Gate
- ✅ 60秒轮询一次
- ❌ 无法实现"均值回归主动平仓"(策略核心逻辑)
7. 修复方案(Fix Proposal)
7.1 核心思路
目标:有持仓时,即使Gate不通过,仍能执行退场检查(仅退场,不入场)。
原则:
- 最小改动:不影响现有入场逻辑
- 向后兼容:不破坏无持仓时的Gate过滤
- 职责分离:入场依赖Gate,退场不依赖Gate
7.2 具体实现
Step 1: analyze_multi_period 支持跳过Gate
文件:src/utils/analysis/analysis_core.py
def analyze_multi_period(
price_data_cache: Dict[Tuple[str, str], Dict],
base_symbol: str,
target_symbol: str,
beta_window: int,
zscore_window: int,
cointegration_threshold: int = COINTEGRATION_THRESHOLD,
skip_gates: bool = False, # ← 新增参数
) -> Optional[Dict]:
"""多周期协整验证 + Z-score计算
Args:
skip_gates: True时跳过Gate1/Gate2检查,直接返回zscore(用于退场检查)
"""
# ... 前面的计算逻辑保持不变 ...
# 验证1: 协整通过数量检查
if not skip_gates: # ← 修改点1
if cointegration_count < cointegration_threshold:
logger.info(f"❌ [Gate1:协整不足] ...")
return None
# 验证2: 健康监控检查
health_monitor = details[HEALTH_MONITOR_PERIOD].get('health_monitor')
if health_monitor:
if not skip_gates and not health_monitor.get('passed'): # ← 修改点2
logger.info(f"❌ [Gate2:健康监控] ...")
return None
# ... 后续逻辑保持不变,正常返回结果 ...
Step 2: _run_correlation_and_analysis 传递参数
文件:src/services/realtime_kline_service_base.py
def _run_correlation_and_analysis(
self, symbol: str, price_data_cache: Dict, skip_gates: bool = False # ← 新增参数
) -> Optional[Dict]:
"""相关系数过滤 + 多周期分析"""
# ... 相关性过滤逻辑保持不变 ...
# 调用多周期验证(传递 skip_gates)
multi_period_result = analyze_multi_period(
price_data_cache=price_data_cache,
base_symbol=self.base_symbol,
target_symbol=symbol,
beta_window=BETA_WINDOW,
zscore_window=ZSCORE_WINDOW,
skip_gates=skip_gates, # ← 修改点
)
return multi_period_result
Step 3: _analyze_and_alert 添加持仓检查
文件:src/services/realtime_kline_service_base.py
def _analyze_and_alert(self, symbol: str, timeframe: str, kline_time: Optional[datetime] = None):
"""实时多周期分析 + 飞书告警(编排入口)"""
# ... 前面逻辑保持不变 ...
multi_period_result = self._run_correlation_and_analysis(symbol, price_data_cache)
if multi_period_result is None:
# ========== 新增逻辑开始 ==========
# 检查该symbol是否有持仓
has_position = False
if (hasattr(self, '_trading_orchestrator')
and self._trading_orchestrator is not None
and hasattr(self._trading_orchestrator, '_position_manager')):
position_manager = self._trading_orchestrator._position_manager
has_position = symbol in {p.symbol for p in position_manager.open_positions}
if has_position:
# 有持仓:跳过Gate重新获取分析结果(仅zscore用于退场)
self.logger.info(
f"🔄 Gate未通过但有持仓,跳过Gate重新分析: {symbol}"
)
multi_period_result = self._run_correlation_and_analysis(
symbol, price_data_cache, skip_gates=True
)
if multi_period_result is not None:
# 仅触发退场检查(不处理入场信号)
strategy_acted = self._trigger_strategy_if_ready(
symbol, timeframe, multi_period_result, price_data_cache,
kline_time, start_time, exit_only=True # ← 新增参数
)
self._build_and_buffer_analysis_record(
symbol, timeframe, multi_period_result, kline_time,
strategy_acted=strategy_acted
)
# ========== 新增逻辑结束 ==========
return # 原有return保持
# 正常路径(Gate通过)保持不变
strategy_acted = self._trigger_strategy_if_ready(...)
self._build_and_buffer_analysis_record(...)
Step 4: _trigger_strategy_if_ready 支持仅退场
文件:src/services/realtime_kline_service_base.py
def _trigger_strategy_if_ready(
self, symbol: str, timeframe: str, multi_period_result: Dict,
price_data_cache: Dict, kline_time: Optional[datetime], start_time: float,
exit_only: bool = False # ← 新增参数
) -> bool:
"""触发策略引擎
Args:
exit_only: True时仅处理退场信号,不处理入场信号
"""
# ... 前面逻辑保持不变 ...
if (self._trading_orchestrator is not None
and self._trading_orchestrator.strategy is not None):
acted = False
try:
acted = self._trading_orchestrator.process_analysis(
symbol=symbol,
z4h=current_zscore_4h,
multi_period_result=multi_period_result,
timestamp=analysis_now,
latest_alt_price=latest_alt_price,
avg_zscore_4h=avg_zscore_4h,
kline_time=kline_time,
l2_snapshot=l2_snapshot,
exit_only=exit_only, # ← 新增参数
)
except Exception as e:
self.logger.error(f"策略引擎异常: {e}", exc_info=True)
return acted
return False
Step 5: orchestrator.process_analysis 支持仅退场
文件:src/trading/orchestrator.py
def process_analysis(
self,
symbol: str,
z4h: float,
multi_period_result: dict,
timestamp: datetime,
latest_alt_price: float = None,
avg_zscore_4h: float = None,
kline_time: datetime | None = None,
l2_snapshot: dict | None = None,
exit_only: bool = False, # ← 新增参数
) -> bool:
"""策略引擎处理入口
Args:
exit_only: True时仅处理退场信号,不处理入场信号
"""
# ... 前面验证逻辑保持不变 ...
# 调用策略获取信号
entry_signal, exit_signal = self._strategy.process_tick(...)
# 处理退场信号(保持不变)
if exit_signal:
return self.on_exit_signal(exit_signal, latest_alt_price, l2_snapshot)
# 处理入场信号(新增条件判断)
if entry_signal and not exit_only: # ← 修改点:仅退场模式时跳过入场
return self.on_entry_signal(...)
return False
7.3 修复效果验证
场景1:无持仓 + Gate不通过(原有行为)
T1: symbol 无持仓 + Gate不通过
→ multi_period_result = None
→ has_position = False
→ return(不调用策略引擎)✅ 正确
场景2:无持仓 + Gate通过(原有行为)
T1: symbol 无持仓 + Gate通过
→ multi_period_result = {...}
→ 调用策略引擎(exit_only=False)
→ 检查入场信号 ✅ 正确
场景3:有持仓 + Gate通过(原有行为)
T1: symbol 有持仓 + Gate通过
→ multi_period_result = {...}
→ 调用策略引擎(exit_only=False)
→ 检查退场信号(优先)或入场信号 ✅ 正确
场景4:有持仓 + Gate不通过(修复后)
T1: symbol 有持仓 + Gate不通过
→ multi_period_result = None(第一次)
→ has_position = True
→ 重新调用 _run_correlation_and_analysis(skip_gates=True)
→ multi_period_result = {'zscore_list': [...], ...}(第二次,跳过Gate)
→ 调用策略引擎(exit_only=True)
→ 检查退场信号 ✅ 修复成功
→ 不处理入场信号 ✅ 避免误开仓
8. 风险评估(Risk Assessment)
8.1 修复前风险
| 风险 | 概率 | 影响 | 综合评级 |
|---|---|---|---|
| 错过最佳退出点 | 高(Gate间歇性不通过) | 中等(依赖止损) | 🔴 High |
| 被迫止损退出 | 中(Gate长期不通过) | 高(违背策略逻辑) | 🔴 High |
| 持仓时间过长 | 高(无法均值回归) | 中等(风险暴露增加) | 🟡 Medium |
| 策略失效 | 低(仅影响退场路径) | 高(核心逻辑受阻) | 🔴 High |
综合评级:🚨 Critical
8.2 修复后风险
| 风险 | 概率 | 影响 | 综合评级 |
|---|---|---|---|
| 跳过Gate导致误平仓 | 低(仅用zscore判断) | 低(策略内置阈值) | 🟢 Low |
| 性能影响(重复分析) | 低(仅有持仓时) | 低(单次分析<50ms) | 🟢 Low |
| 代码复杂度增加 | 中(5处修改) | 低(逻辑清晰) | 🟢 Low |
综合评级:🟢 Low
9. 测试计划(Test Plan)
9.1 单元测试
# tests/test_gate_exit_fix.py
def test_no_position_gate_fail():
"""无持仓 + Gate不通过 → 不调用策略引擎"""
service = create_mock_service()
service._run_correlation_and_analysis = Mock(return_value=None)
service._trigger_strategy_if_ready = Mock()
service._analyze_and_alert('ETH/USDC:USDC', '5m')
service._trigger_strategy_if_ready.assert_not_called() # ✅
def test_has_position_gate_fail():
"""有持仓 + Gate不通过 → 跳过Gate重新分析并触发退场检查"""
service = create_mock_service_with_position('ETH/USDC:USDC')
# Mock第一次调用返回None(Gate不通过)
# Mock第二次调用返回zscore(skip_gates=True)
service._run_correlation_and_analysis = Mock(
side_effect=[None, {'zscore_list': [-1.5, -1.2, -1.0]}]
)
service._trigger_strategy_if_ready = Mock()
service._analyze_and_alert('ETH/USDC:USDC', '5m')
# 验证:第二次调用时skip_gates=True
assert service._run_correlation_and_analysis.call_count == 2
assert service._run_correlation_and_analysis.call_args_list[1][1]['skip_gates'] == True
# 验证:触发策略引擎时exit_only=True
service._trigger_strategy_if_ready.assert_called_once()
assert service._trigger_strategy_if_ready.call_args[1]['exit_only'] == True
9.2 集成测试
# tests/integration/test_adaptive_z_exit.py
def test_adaptive_z_exit_with_gate_failure():
"""完整流程:有持仓 + Gate不通过 + Adaptive Z回归 → 成功平仓"""
# 1. 开仓(Gate通过)
orchestrator = create_orchestrator()
orchestrator.process_analysis(
symbol='ETH/USDC:USDC',
z4h=-2.5,
multi_period_result={
'zscore_list': [-2.5, -2.3, -2.5],
'cointegration_count': 4, # Gate1通过
'details': {('4h', '60d'): {'health_monitor': {'passed': True}}} # Gate2通过
},
...
)
# 验证开仓成功
assert orchestrator._position_manager.open_count == 1
pos = orchestrator._position_manager.open_positions[0]
assert pos.entry_adaptive_z == -2.5
# 2. 模拟Gate不通过(协整下降)
service = create_service(orchestrator)
service._run_correlation_and_analysis = Mock(
side_effect=[
None, # 第一次:Gate不通过
{'zscore_list': [-1.2, -1.0, -1.2], 'cointegration_count': 2} # 第二次:skip_gates
]
)
# 3. 触发分析(adaptive_z回归50%)
service._analyze_and_alert('ETH/USDC:USDC', '5m')
# 4. 验证平仓成功
assert orchestrator._position_manager.open_count == 0 # ✅ 成功平仓
9.3 回归测试
- [ ] 无持仓时Gate过滤仍正常工作
- [ ] 有持仓时Gate通过仍正常检查退场
- [ ] 止损/移动止损/超时平仓不受影响
- [ ] 性能无明显下降(<5%)
10. 参考资料(References)
10.1 相关文件
| 文件 | 关键函数 | 行号 |
|---|---|---|
src/services/realtime_kline_service_base.py |
_analyze_and_alert |
1287-1326 |
src/services/realtime_kline_service_base.py |
_run_correlation_and_analysis |
1121-1151 |
src/utils/analysis/analysis_core.py |
analyze_multi_period |
825-844 |
src/trading/orchestrator.py |
process_analysis |
242-282 |
src/trading/strategy.py |
process_tick |
247-272 |
src/trading/strategy.py |
_check_exit |
558-631 |
10.2 相关文档
docs/bug-analysis-gate-blocks-adaptive-z-exit.md- 初始bug分析docs/bug-analysis-gate1-2-block-exit.md- 简化因果链分析
附录A:数据流图(Data Flow Diagram)
sequenceDiagram
participant WS as WebSocket
participant SA as _analyze_and_alert
participant RC as _run_correlation_and_analysis
participant AM as analyze_multi_period
participant TS as _trigger_strategy_if_ready
participant ORC as orchestrator.process_analysis
participant STR as strategy.process_tick
participant EXIT as strategy._check_exit
participant PM as PositionManager
WS->>SA: on_message(symbol, kline)
SA->>RC: _run_correlation_and_analysis(symbol, cache)
RC->>AM: analyze_multi_period(cache)
alt Gate1/Gate2 不通过
AM-->>RC: return None
RC-->>SA: return None
SA->>PM: check has_position(symbol)?
alt 有持仓(修复后)
PM-->>SA: True
SA->>RC: _run_correlation_and_analysis(skip_gates=True)
RC->>AM: analyze_multi_period(skip_gates=True)
AM-->>RC: return {zscore_list}
RC-->>SA: return {zscore_list}
SA->>TS: _trigger_strategy_if_ready(exit_only=True)
TS->>ORC: process_analysis(exit_only=True)
ORC->>STR: process_tick()
STR->>EXIT: _check_exit(adaptive_z)
EXIT-->>STR: ExitSignal
STR-->>ORC: (None, ExitSignal)
ORC->>PM: close_position(symbol)
else 无持仓
PM-->>SA: False
SA-->>WS: return(跳过)
end
else Gate1/Gate2 通过
AM-->>RC: return {full_result}
RC-->>SA: return {full_result}
SA->>TS: _trigger_strategy_if_ready(exit_only=False)
TS->>ORC: process_analysis(exit_only=False)
ORC->>STR: process_tick()
STR->>EXIT: _check_exit()
EXIT-->>STR: ExitSignal | None
STR-->>ORC: (EntrySignal | None, ExitSignal | None)
end
文档版本: v1.0
最后更新: 2026-02-17
作者: Claude Code
状态: ✅ 完成因果链分析,待实施修复