开仓信号详细告警设计BUG分析2
开仓信号详细告警设计BUG分析报告
分析日期: 2026-02-16
分析范围: 开仓信号详细告警功能模块
代码版本: daf7ec2 (open alert lark content fat 2)
📋 执行摘要
经过代码审查,发现"开仓信号详细告警"功能模块存在 4个严重BUG,其中 BUG #1(告警丢失) 为 Critical级别,会导致用户在开仓后收不到任何通知,必须立即修复。
关键发现
| BUG编号 | 问题描述 | 严重程度 | 影响范围 |
|---|---|---|---|
| #1 | 告警发送失败时不降级 | 🔴 Critical | 所有开仓信号 |
| #2 | 限流配置过于严格 | 🟠 High | 高频交易场景 |
| #3 | 去重逻辑存在缺陷 | 🟡 Medium | 边缘情况 |
| #4 | 异步队列满时静默丢弃 | 🟠 High | 极端高频场景 |
🚨 BUG #1: 告警发送失败时不降级(Critical)
问题位置
src/trading/orchestrator.py:753-794 - _send_entry_alert 方法
问题代码
def _send_entry_alert(
self,
signal: PairTradeSignal,
multi_period_result: dict,
position: PairPosition,
order_result: PairOrderResult,
adaptive_z: float,
):
"""发送开仓告警(详细告警 or 简短通知)"""
available_balance = self._executor.get_available_balance()
if ENABLE_SIGNAL_DETAIL_ALERT:
try:
title, content = format_signal_alert(
signal=signal,
multi_period_result=multi_period_result,
position=position,
order_result=order_result,
leverage=self._config.leverage,
adaptive_z=adaptive_z,
adaptive_threshold=self._config.strategy_adaptive_threshold,
reversion_factor=self._config.strategy_reversion_factor,
available_balance=available_balance,
network_label=self._config.network_label,
)
priority = "high" if abs(adaptive_z) > self._config.strategy_adaptive_threshold * 1.4 else "normal"
self._alert_sender.send( # ❌ 问题1: 没有检查返回值
title=title,
content=content,
pair_name=signal.symbol,
priority=priority,
)
return # ❌ 问题2: 直接返回,不检查发送是否成功
except Exception as e:
logger.error(f"详细告警生成失败,降级为简短通知: {e}")
# 回退到简短通知
self._send_simple_entry_notification(
signal, position, order_result, adaptive_z, available_balance,
)
问题分析
AlertSender.send() 的返回值
AlertSender.send() 方法返回 AlertStatus 枚举:
class AlertStatus(Enum):
SUCCESS = "发送成功"
FAILED = "发送失败"
THROTTLED = "被限流"
DEGRADED = "降级发送"
三种失败场景
场景1: 告警被限流
# alert_sender.py:54-57
if priority != "high" and not self._check_rate_limit(pair_name):
logger.info(f"告警被限流: {pair_name}")
return AlertStatus.THROTTLED # ❌ 返回THROTTLED,但orchestrator未检查
- 触发条件: 5分钟内同一币对发送超过2条告警(默认配置)
- 当前行为: orchestrator 直接
return,不发送简短通知 - 严重后果: 用户完全收不到开仓通知!
场景2: 告警被去重
# alert_sender.py:60-63
content_hash = hash(content)
if self._is_duplicate(content_hash):
logger.info(f"重复告警已去重: {pair_name}")
return AlertStatus.THROTTLED # ❌ 返回THROTTLED,但orchestrator未检查
- 触发条件: 60秒内发送了内容相同的告警
- 当前行为: orchestrator 直接
return,不发送简短通知 - 严重后果: 用户完全收不到开仓通知!
场景3: 飞书API调用失败
# alert_sender.py:66-72
try:
sender_colourful(content=content, title=title)
self._record(pair_name, content_hash)
return AlertStatus.SUCCESS
except Exception as e:
logger.error(f"告警发送失败: {pair_name} | {e}")
return AlertStatus.FAILED # ❌ 返回FAILED,但orchestrator未检查
- 触发条件: 网络异常、飞书webhook失败、队列满等
- 当前行为: orchestrator 直接
return,不发送简短通知 - 严重后果: 用户完全收不到开仓通知!
实际影响评估
测试场景: 3个币对在1分钟内同时触发开仓信号
| 时间 | 币对 | 详细告警 | 简短通知 | 用户收到 |
|---|---|---|---|---|
| 00:00 | BTC | ✅ 成功 | ❌ 未调用 | ✅ 收到 |
| 00:10 | ETH | ✅ 成功 | ❌ 未调用 | ✅ 收到 |
| 00:20 | SOL | ❌ 限流 | ❌ 未调用 | ❌ 未收到 |
修复方案
方案1: 检查返回值并降级(推荐)
def _send_entry_alert(
self,
signal: PairTradeSignal,
multi_period_result: dict,
position: PairPosition,
order_result: PairOrderResult,
adaptive_z: float,
):
"""发送开仓告警(详细告警 or 简短通知)"""
available_balance = self._executor.get_available_balance()
if ENABLE_SIGNAL_DETAIL_ALERT:
try:
title, content = format_signal_alert(
signal=signal,
multi_period_result=multi_period_result,
position=position,
order_result=order_result,
leverage=self._config.leverage,
adaptive_z=adaptive_z,
adaptive_threshold=self._config.strategy_adaptive_threshold,
reversion_factor=self._config.strategy_reversion_factor,
available_balance=available_balance,
network_label=self._config.network_label,
)
priority = "high" if abs(adaptive_z) > self._config.strategy_adaptive_threshold * 1.4 else "normal"
# ✅ 修复: 检查发送状态
status = self._alert_sender.send(
title=title,
content=content,
pair_name=signal.symbol,
priority=priority,
)
# ✅ 修复: 只有成功才返回
if status == AlertStatus.SUCCESS:
return
else:
# ✅ 修复: 被限流/去重/失败时降级
logger.warning(
f"详细告警发送失败({status.value}): {signal.symbol},降级为简短通知"
)
except Exception as e:
logger.error(f"详细告警生成失败,降级为简短通知: {e}")
# 降级到简短通知
self._send_simple_entry_notification(
signal, position, order_result, adaptive_z, available_balance,
)
方案2: 简短通知也使用AlertSender(可选增强)
def _send_simple_entry_notification(
self,
signal: PairTradeSignal,
position: PairPosition,
order_result: PairOrderResult,
adaptive_z: float,
available_balance: float,
):
"""发送简短开仓通知(降级兜底)"""
# ... 构建简短通知内容 ...
# ✅ 增强: 简短通知也使用限流/去重机制
status = self._alert_sender.send(
title=title,
content=content,
pair_name=signal.symbol,
priority="high", # ✅ 简短通知使用high优先级,绕过限流
)
if status != AlertStatus.SUCCESS:
logger.error(f"简短通知也发送失败: {signal.symbol} | {status.value}")
🟠 BUG #2: 限流配置过于严格(High)
问题位置
src/utils/monitoring/alert_sender.py:22-29 - AlertSender.__init__
问题代码
def __init__(
self,
rate_limit_window: int = 300, # ❌ 5分钟窗口过长
rate_limit_max: int = 2, # ❌ 最多2条过于严格
dedup_window: int = 60,
):
# 限流:每个币对在窗口内最多 N 条
self._rate_window = rate_limit_window
self._rate_max = rate_limit_max
self._history: dict[str, deque[float]] = defaultdict(lambda: deque(maxlen=20))
# 去重:相同内容哈希在窗口内去重
self._dedup_window = dedup_window
self._recent_hashes: dict[int, float] = {}
问题分析
当前限流策略
- 窗口大小: 300秒(5分钟)
- 最大告警数: 2条/币对
- 优先级bypass: 仅
priority="high"绕过限流
问题场景
场景1: 多个币对同时开仓
时间线:
00:00 - BTC 开仓 ✅ (1/2)
00:30 - ETH 开仓 ✅ (2/2)
01:00 - SOL 开仓 ❌ 被限流 (3/2)
02:00 - DOGE 开仓 ❌ 被限流 (4/2)
05:01 - AVAX 开仓 ✅ (1/2) - 5分钟后窗口重置
场景2: 同一币对快速开平仓
同一币对 BTC:
00:00 - 开仓告警 ✅ (1/2)
00:30 - 平仓告警 ✅ (2/2)
01:00 - 再次开仓 ❌ 被限流 (3/2)
修复方案
方案1: 调整限流参数(推荐)
# orchestrator.py:66
self._alert_sender = AlertSender(
rate_limit_window=60, # ✅ 缩短到1分钟
rate_limit_max=5, # ✅ 增加到5条
dedup_window=30, # ✅ 缩短去重窗口到30秒
)
理由:
- 1分钟窗口更符合交易频率
- 5条限制可应对多币对同时开仓
- 30秒去重窗口足够防止重复告警
方案2: 区分开仓和平仓限流(可选增强)
class AlertSender:
def __init__(
self,
entry_rate_limit_window: int = 60,
entry_rate_limit_max: int = 5,
exit_rate_limit_window: int = 60,
exit_rate_limit_max: int = 10, # 平仓限制更宽松
dedup_window: int = 30,
):
# ... 分别记录开仓和平仓历史
方案3: 全局限流而非币对限流(可选)
class AlertSender:
def __init__(
self,
global_rate_limit_window: int = 60,
global_rate_limit_max: int = 10, # 全局每分钟10条
per_pair_rate_limit_max: int = 3, # 单币对每分钟3条
):
# ... 同时维护全局和单币对限流
🟡 BUG #3: 去重逻辑存在缺陷(Medium)
问题位置
src/utils/monitoring/alert_sender.py:60-63, 84-93
问题代码
# 发送方法中
content_hash = hash(content) # ❌ 使用Python内置hash
if self._is_duplicate(content_hash):
logger.info(f"重复告警已去重: {pair_name}")
return AlertStatus.THROTTLED
# 去重检查方法
def _is_duplicate(self, content_hash: int) -> bool:
now = time.time()
# 清理过期(惰性清理,不频繁)
if len(self._recent_hashes) > 100:
expired = [k for k, t in self._recent_hashes.items() if now - t > self._dedup_window]
for k in expired:
del self._recent_hashes[k]
return content_hash in self._recent_hashes and now - self._recent_hashes[content_hash] < self._dedup_window
问题分析
问题1: Python内置hash()不稳定
# Python 3.3+ 默认启用hash随机化(PYTHONHASHSEED)
# 不同进程/重启后,相同字符串的hash值会变化
>>> hash("test") # 进程1
-1234567890123456789
>>> hash("test") # 进程2(重启后)
9876543210987654321 # ❌ 不同!
影响:
- 程序重启后,去重失效(但影响不大,因为窗口只有60秒)
- 多进程部署时,各进程去重不同步
问题2: 理论上存在哈希碰撞
虽然Python的hash算法碰撞概率极低,但理论上可能两个不同告警内容产生相同hash值。
问题3: 去重仅基于内容
# 假设两个不同币对,但其他参数完全相同,生成了相同的告警内容
content_BTC = "**方向**: 做多\n**Z-score 4h**: +2.5000\n..."
content_ETH = "**方向**: 做多\n**Z-score 4h**: +2.5000\n..."
# hash(content_BTC) == hash(content_ETH) # 可能相同!
# ❌ ETH的告警会被误判为重复
修复方案
方案1: 使用稳定哈希算法(推荐)
import hashlib
class AlertSender:
def send(self, title: str, content: str, pair_name: str, priority: str = "normal") -> AlertStatus:
# ... 限流检查 ...
# ✅ 修复: 使用MD5/SHA256等稳定哈希
content_hash = self._get_content_hash(content)
if self._is_duplicate(content_hash):
logger.info(f"重复告警已去重: {pair_name}")
return AlertStatus.THROTTLED
# ... 发送逻辑 ...
def _get_content_hash(self, content: str) -> str:
"""使用MD5生成稳定哈希(用于去重,非加密场景)"""
return hashlib.md5(content.encode('utf-8')).hexdigest()
def _is_duplicate(self, content_hash: str) -> bool: # ✅ 改为str类型
now = time.time()
# ... 清理逻辑 ...
return content_hash in self._recent_hashes and now - self._recent_hashes[content_hash] < self._dedup_window
方案2: 结合币对和时间戳去重(可选增强)
def _get_dedup_key(self, pair_name: str, content: str) -> str:
"""生成去重key: pair_name + content_hash"""
content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()[:8]
return f"{pair_name}:{content_hash}"
# 使用
dedup_key = self._get_dedup_key(pair_name, content)
if self._is_duplicate(dedup_key):
# ...
优势:
- 不同币对即使内容相同也不会被去重
- 更精确的去重控制
🟠 BUG #4: 异步队列满时静默丢弃(High)
问题位置
src/utils/monitoring/lark_bot.py:115-119 - sender_colourful
问题代码
def sender_colourful(content, title=''):
"""
异步飞书富文本卡片告警(非阻塞)
消息被放入后台队列,由独立线程发送,不阻塞调用方。
"""
if not lark_bot_id:
logger.warning('lark 告警跳过: LARKBOT_ID 未配置')
return None
_ensure_worker()
url = f'{LARK_WEBHOOK_BASE}{lark_bot_id}'
message = {
# ... 构建消息体 ...
}
headers = {'Content-Type': 'application/json'}
payload = json.dumps(message)
try:
_send_queue.put_nowait((url, headers, payload)) # ❌ 队列满直接抛异常
except queue.Full:
logger.warning('lark 告警队列已满,丢弃本次告警') # ❌ 仅warning级别
return None # ❌ 静默失败,调用方无法感知
return "queued"
问题分析
队列容量
# lark_bot.py:15
_send_queue: queue.Queue = queue.Queue(maxsize=200) # 仅200条
问题场景
场景1: 高频告警场景
假设:
- 每秒产生5条告警(开仓、平仓、止损等)
- 飞书API响应时间2秒/条
- 队列处理速度: 0.5条/秒(2秒/条)
队列增长速度 = 5条/秒 - 0.5条/秒 = 4.5条/秒
队列满时间 = 200条 / 4.5条/秒 ≈ 44秒
场景2: 飞书API故障
如果飞书API临时故障(超时/500错误):
- 队列快速堆积
- 44秒后队列满
- 新告警全部丢弃
- ❌ 调用方无法感知,不会触发降级
当前行为链路
orchestrator._send_entry_alert()
→ alert_sender.send()
→ sender_colourful()
→ _send_queue.put_nowait() ❌ queue.Full 异常
→ return None ❌ 静默失败
→ return AlertStatus.SUCCESS ❌ 错误!实际未发送
→ return ❌ 不降级到简短通知
修复方案
方案1: 队列满时抛出异常(推荐)
def sender_colourful(content, title=''):
"""
异步飞书富文本卡片告警(非阻塞)
Raises:
RuntimeError: 队列满时抛出异常,由调用方处理
"""
if not lark_bot_id:
logger.warning('lark 告警跳过: LARKBOT_ID 未配置')
return None
_ensure_worker()
url = f'{LARK_WEBHOOK_BASE}{lark_bot_id}'
message = {
# ... 构建消息体 ...
}
headers = {'Content-Type': 'application/json'}
payload = json.dumps(message)
try:
_send_queue.put_nowait((url, headers, payload))
except queue.Full:
logger.error('lark 告警队列已满,发送失败') # ✅ 改为error级别
raise RuntimeError("Alert queue full") # ✅ 抛出异常让调用方感知
return "queued"
配合 AlertSender 捕获异常:
class AlertSender:
def send(self, title: str, content: str, pair_name: str, priority: str = "normal") -> AlertStatus:
# ... 限流、去重检查 ...
try:
sender_colourful(content=content, title=title)
self._record(pair_name, content_hash)
return AlertStatus.SUCCESS
except RuntimeError as e: # ✅ 捕获队列满异常
logger.error(f"告警队列满: {pair_name} | {e}")
return AlertStatus.FAILED
except Exception as e:
logger.error(f"告警发送失败: {pair_name} | {e}")
return AlertStatus.FAILED
方案2: 增加队列容量(简单但治标不治本)
# lark_bot.py:15
_send_queue: queue.Queue = queue.Queue(maxsize=1000) # ✅ 增加到1000
优点: 简单直接
缺点: 仅延缓问题,极端场景仍会满
方案3: 使用阻塞put(可选,但会影响性能)
try:
_send_queue.put((url, headers, payload), timeout=1.0) # ✅ 阻塞1秒
except queue.Full:
logger.error('lark 告警队列满(1秒超时),发送失败')
raise RuntimeError("Alert queue full after 1s timeout")
优点: 给队列一些消化时间
缺点: 阻塞可能影响主线程性能
🔧 综合修复建议
修复优先级
| 优先级 | BUG | 工作量 | 风险 |
|---|---|---|---|
| P0 | #1 告警丢失 | 1小时 | 低 |
| P1 | #2 限流过严 | 30分钟 | 低 |
| P1 | #4 队列丢弃 | 1小时 | 中 |
| P2 | #3 去重缺陷 | 1小时 | 低 |
修复步骤
第一阶段: 紧急修复(P0)
- 修复BUG #1: 检查
AlertSender.send()返回值- 文件:
src/trading/orchestrator.py:753-794 - 工作量: 10行代码修改
- 测试: 模拟限流/去重场景验证降级
- 文件:
第二阶段: 高优先级修复(P1)
-
修复BUG #2: 调整限流参数
- 文件:
src/trading/orchestrator.py:66 - 工作量: 3行代码修改
- 测试: 多币对同时开仓测试
- 文件:
-
修复BUG #4: 队列满时抛出异常
- 文件:
src/utils/monitoring/lark_bot.py:115-119 - 文件:
src/utils/monitoring/alert_sender.py:66-72 - 工作量: 10行代码修改
- 测试: 模拟队列满场景
- 文件:
第三阶段: 中优先级优化(P2)
- 修复BUG #3: 改进去重逻辑
- 文件:
src/utils/monitoring/alert_sender.py - 工作量: 20行代码修改
- 测试: 单元测试验证哈希稳定性
- 文件:
回归测试清单
- [ ] 单个币对正常开仓告警
- [ ] 多个币对同时开仓(5个以上)
- [ ] 同一币对5分钟内多次开仓
- [ ] 飞书API临时故障场景
- [ ] 队列满场景(压测)
- [ ] 程序重启后去重功能
- [ ] 高优先级告警绕过限流
- [ ] 降级到简短通知功能
📊 附录A: 代码审查依据
审查范围
src/trading/orchestrator.py(959行)src/utils/monitoring/alert_sender.py(99行)src/utils/monitoring/signal_alert_formatter.py(518行)src/utils/monitoring/lark_bot.py(122行)src/utils/monitoring/alert_level.py(19行)
审查方法
- 静态代码分析
- 控制流路径分析
- 错误处理链路追踪
- 边界条件测试用例设计
Git提交历史
daf7ec2 open alert lark content fat 2 (2026-02-16)
f49b828 open alert lark content fat (2026-02-16)
73f3d46 debug
d938e5a orphan manager
2785889 delete garbagfe
📊 附录B: 影响分析
用户影响
| 场景 | 当前行为 | 用户感知 | 严重程度 |
|---|---|---|---|
| 开仓后5分钟内第3次开仓 | 无告警 | 用户不知道开仓成功 | 🔴 Critical |
| 飞书API故障 | 无告警 | 用户不知道开仓成功 | 🔴 Critical |
| 多币对同时开仓 | 部分无告警 | 遗漏重要交易信号 | 🟠 High |
| 队列满(极端情况) | 无告警 | 完全丢失告警 | 🟠 High |
风险评估
当前系统风险等级: 🔴 High
主要风险:
- 用户错过关键交易信号
- 无法及时止损或调整仓位
- 资金管理风险增加
建议措施:
- 立即修复 BUG #1
- 增加监控告警:限流/去重/队列满事件
- 添加健康检查接口,监控告警系统状态
📝 附录C: 代码示例(完整修复)
修复后的 orchestrator.py
def _send_entry_alert(
self,
signal: PairTradeSignal,
multi_period_result: dict,
position: PairPosition,
order_result: PairOrderResult,
adaptive_z: float,
):
"""发送开仓告警(详细告警 or 简短通知)
降级策略:
1. 尝试发送详细告警
2. 如果失败/限流/去重,降级为简短通知
3. 简短通知使用high优先级绕过限流
"""
available_balance = self._executor.get_available_balance()
if ENABLE_SIGNAL_DETAIL_ALERT:
try:
title, content = format_signal_alert(
signal=signal,
multi_period_result=multi_period_result,
position=position,
order_result=order_result,
leverage=self._config.leverage,
adaptive_z=adaptive_z,
adaptive_threshold=self._config.strategy_adaptive_threshold,
reversion_factor=self._config.strategy_reversion_factor,
available_balance=available_balance,
network_label=self._config.network_label,
)
priority = "high" if abs(adaptive_z) > self._config.strategy_adaptive_threshold * 1.4 else "normal"
# ✅ 修复: 检查发送状态
status = self._alert_sender.send(
title=title,
content=content,
pair_name=signal.symbol,
priority=priority,
)
# ✅ 修复: 只有成功才返回
if status == AlertStatus.SUCCESS:
logger.info(f"详细告警发送成功: {signal.symbol}")
return
else:
# ✅ 修复: 被限流/去重/失败时降级
logger.warning(
f"详细告警发送失败({status.value}): {signal.symbol},降级为简短通知"
)
except Exception as e:
logger.error(f"详细告警生成失败,降级为简短通知: {e}", exc_info=True)
# 降级到简短通知(使用high优先级绕过限流)
self._send_simple_entry_notification_with_priority(
signal, position, order_result, adaptive_z, available_balance,
)
def _send_simple_entry_notification_with_priority(
self,
signal: PairTradeSignal,
position: PairPosition,
order_result: PairOrderResult,
adaptive_z: float,
available_balance: float,
):
"""发送简短开仓通知(降级兜底,使用高优先级)"""
# 构建简短通知内容
leverage = self._config.leverage
leg_a_notional = position.alt_size * position.alt_entry_price
leg_b_notional = position.base_size * position.base_entry_price
total_notional = leg_a_notional + leg_b_notional
used_margin = total_notional / leverage if leverage > 0 else total_notional
leg_a_detail = self._format_leg_detail("Leg A (目标)", order_result.leg_a) if order_result.leg_a else ""
leg_b_detail = self._format_leg_detail("Leg B (基准)", order_result.leg_b) if order_result.leg_b and order_result.leg_b.success else ""
thresh = self._config.strategy_adaptive_threshold
az_pct = abs(adaptive_z) / thresh * 100 if thresh > 0 else 0
title = f"🟢 开仓成功 {self._config.network_label} - {symbol_to_coin(signal.symbol)}"
content = (
f"**方向**: {'做多' if signal.direction == 'long' else '做空'}\n"
f"**模式**: {position.pair_mode}\n"
f"**杠杆**: {leverage}x\n"
f"**信号强度**: {signal.signal_strength}\n"
f"---\n"
f"**Z-score 4h**: {signal.zscore_4h:+.4f}\n"
f"**Adaptive Z**: {adaptive_z:+.4f} ({az_pct:.0f}% of ±{thresh})\n"
f"**回归目标**: {adaptive_z * self._config.strategy_reversion_factor:+.4f}\n"
f"---\n"
f"{leg_a_detail}"
f"{leg_b_detail}"
f"---\n"
f"**名义价值**: ${total_notional:.2f}\n"
f"**占用保证金**: ${used_margin:.2f}\n"
f"**剩余可用**: ${available_balance:.2f}\n"
f"---\n"
f"**仓位ID**: {position.position_id[:8]}...\n"
f"**信号ID**: {signal.signal_id[:8]}...\n"
f"**网络**: {self._config.network_label}"
)
# ✅ 使用high优先级,绕过限流
status = self._alert_sender.send(
title=title,
content=content,
pair_name=signal.symbol,
priority="high", # ✅ 简短通知强制使用high优先级
)
if status != AlertStatus.SUCCESS:
# ✅ 最终兜底:直接调用sender_colourful
logger.error(f"简短通知也发送失败({status.value}): {signal.symbol},尝试直接发送")
try:
sender_colourful(content=content, title=title)
except Exception as e:
logger.error(f"直接发送也失败: {signal.symbol} | {e}", exc_info=True)
修复后的 alert_sender.py
import hashlib
import time
from collections import defaultdict, deque
from src.utils.monitoring.lark_bot import sender_colourful
from src.utils.monitoring.alert_level import AlertStatus
from src.utils.core.logging_config import logger
class AlertSender:
"""告警发送器(限流 + 去重)"""
def __init__(
self,
rate_limit_window: int = 60, # ✅ 修复: 缩短到1分钟
rate_limit_max: int = 5, # ✅ 修复: 增加到5条
dedup_window: int = 30, # ✅ 修复: 缩短到30秒
):
# 限流:每个币对在窗口内最多 N 条
self._rate_window = rate_limit_window
self._rate_max = rate_limit_max
self._history: dict[str, deque[float]] = defaultdict(lambda: deque(maxlen=20))
# 去重:相同内容哈希在窗口内去重
self._dedup_window = dedup_window
self._recent_hashes: dict[str, float] = {} # ✅ 修复: 改为str类型
def send(
self,
title: str,
content: str,
pair_name: str,
priority: str = "normal",
) -> AlertStatus:
"""发送告警(同步,lark_bot 内部异步队列)
Args:
title: 告警标题
content: 告警内容
pair_name: 币对名称(用于限流 key)
priority: 优先级 (normal/high) — high 不限流
Returns:
AlertStatus
"""
# 1. 限流
if priority != "high" and not self._check_rate_limit(pair_name):
logger.info(f"告警被限流: {pair_name}")
return AlertStatus.THROTTLED
# 2. 去重
content_hash = self._get_content_hash(content) # ✅ 修复: 使用稳定哈希
if self._is_duplicate(content_hash):
logger.info(f"重复告警已去重: {pair_name}")
return AlertStatus.THROTTLED
# 3. 发送
try:
sender_colourful(content=content, title=title)
self._record(pair_name, content_hash)
return AlertStatus.SUCCESS
except RuntimeError as e: # ✅ 修复: 捕获队列满异常
logger.error(f"告警队列满: {pair_name} | {e}")
return AlertStatus.FAILED
except Exception as e:
logger.error(f"告警发送失败: {pair_name} | {e}")
return AlertStatus.FAILED
def _get_content_hash(self, content: str) -> str:
"""使用MD5生成稳定哈希(用于去重,非加密场景)"""
return hashlib.md5(content.encode('utf-8')).hexdigest()
def _check_rate_limit(self, pair_name: str) -> bool:
now = time.time()
history = self._history[pair_name]
# 清理过期
while history and now - history[0] > self._rate_window:
history.popleft()
return len(history) < self._rate_max
def _is_duplicate(self, content_hash: str) -> bool: # ✅ 修复: 改为str参数
now = time.time()
# 清理过期(惰性清理,不频繁)
if len(self._recent_hashes) > 100:
expired = [k for k, t in self._recent_hashes.items() if now - t > self._dedup_window]
for k in expired:
del self._recent_hashes[k]
return content_hash in self._recent_hashes and now - self._recent_hashes[content_hash] < self._dedup_window
def _record(self, pair_name: str, content_hash: str): # ✅ 修复: 改为str参数
now = time.time()
self._history[pair_name].append(now)
self._recent_hashes[content_hash] = now
修复后的 lark_bot.py
def sender_colourful(content, title=''):
"""
异步飞书富文本卡片告警(非阻塞)
消息被放入后台队列,由独立线程发送,不阻塞调用方。
Raises:
RuntimeError: 队列满时抛出异常,由调用方处理
https://open.larksuite.com/document/common-capabilities/message-card/message-cards-content/using-markdown-tags
"""
if not lark_bot_id:
logger.warning('lark 告警跳过: LARKBOT_ID 未配置')
return None
_ensure_worker()
url = f'{LARK_WEBHOOK_BASE}{lark_bot_id}'
message = {
"msg_type": "interactive",
"card": {
"config": {
"wide_screen_mode": True
},
"header": {
"title": {
"tag": "plain_text",
"content": title
},
"template": "red"
},
"elements": [{
"tag": "markdown",
"content": content,
}]
}
}
if lark_alert_email:
message["email"] = lark_alert_email
headers = {'Content-Type': 'application/json'}
payload = json.dumps(message)
try:
_send_queue.put_nowait((url, headers, payload))
except queue.Full:
logger.error('lark 告警队列已满,发送失败') # ✅ 修复: 改为error级别
raise RuntimeError("Alert queue full") # ✅ 修复: 抛出异常让调用方感知
return "queued"
结论
当前"开仓信号详细告警"功能模块存在 4个严重BUG,其中 BUG #1(告警丢失) 为 Critical级别,必须立即修复。建议按照本报告提供的修复方案,分三个阶段完成修复:
- 紧急修复(P0):修复告警丢失问题
- 高优先级修复(P1):调整限流参数、修复队列满问题
- 中优先级优化(P2):改进去重逻辑
修复完成后,需要进行完整的回归测试,确保告警系统在各种场景下都能正常工作。
报告生成时间: 2026-02-16
报告作者: Claude Code SuperClaude
代码审查版本: daf7ec2