开仓信号详细告警设计BUG分析2

开仓信号详细告警设计BUG分析报告

分析日期: 2026-02-16
分析范围: 开仓信号详细告警功能模块
代码版本: daf7ec2 (open alert lark content fat 2)


📋 执行摘要

经过代码审查,发现"开仓信号详细告警"功能模块存在 4个严重BUG,其中 BUG #1(告警丢失)Critical级别,会导致用户在开仓后收不到任何通知,必须立即修复。

关键发现

BUG编号 问题描述 严重程度 影响范围
#1 告警发送失败时不降级 🔴 Critical 所有开仓信号
#2 限流配置过于严格 🟠 High 高频交易场景
#3 去重逻辑存在缺陷 🟡 Medium 边缘情况
#4 异步队列满时静默丢弃 🟠 High 极端高频场景

🚨 BUG #1: 告警发送失败时不降级(Critical)

问题位置

src/trading/orchestrator.py:753-794 - _send_entry_alert 方法

问题代码

def _send_entry_alert(
    self,
    signal: PairTradeSignal,
    multi_period_result: dict,
    position: PairPosition,
    order_result: PairOrderResult,
    adaptive_z: float,
):
    """发送开仓告警(详细告警 or 简短通知)"""
    available_balance = self._executor.get_available_balance()

    if ENABLE_SIGNAL_DETAIL_ALERT:
        try:
            title, content = format_signal_alert(
                signal=signal,
                multi_period_result=multi_period_result,
                position=position,
                order_result=order_result,
                leverage=self._config.leverage,
                adaptive_z=adaptive_z,
                adaptive_threshold=self._config.strategy_adaptive_threshold,
                reversion_factor=self._config.strategy_reversion_factor,
                available_balance=available_balance,
                network_label=self._config.network_label,
            )

            priority = "high" if abs(adaptive_z) > self._config.strategy_adaptive_threshold * 1.4 else "normal"
            self._alert_sender.send(  # ❌ 问题1: 没有检查返回值
                title=title,
                content=content,
                pair_name=signal.symbol,
                priority=priority,
            )
            return  # ❌ 问题2: 直接返回,不检查发送是否成功
        except Exception as e:
            logger.error(f"详细告警生成失败,降级为简短通知: {e}")

    # 回退到简短通知
    self._send_simple_entry_notification(
        signal, position, order_result, adaptive_z, available_balance,
    )

问题分析

AlertSender.send() 的返回值

AlertSender.send() 方法返回 AlertStatus 枚举:

class AlertStatus(Enum):
    SUCCESS = "发送成功"
    FAILED = "发送失败"
    THROTTLED = "被限流"
    DEGRADED = "降级发送"

三种失败场景

场景1: 告警被限流

# alert_sender.py:54-57
if priority != "high" and not self._check_rate_limit(pair_name):
    logger.info(f"告警被限流: {pair_name}")
    return AlertStatus.THROTTLED  # ❌ 返回THROTTLED,但orchestrator未检查
  • 触发条件: 5分钟内同一币对发送超过2条告警(默认配置)
  • 当前行为: orchestrator 直接 return,不发送简短通知
  • 严重后果: 用户完全收不到开仓通知!

场景2: 告警被去重

# alert_sender.py:60-63
content_hash = hash(content)
if self._is_duplicate(content_hash):
    logger.info(f"重复告警已去重: {pair_name}")
    return AlertStatus.THROTTLED  # ❌ 返回THROTTLED,但orchestrator未检查
  • 触发条件: 60秒内发送了内容相同的告警
  • 当前行为: orchestrator 直接 return,不发送简短通知
  • 严重后果: 用户完全收不到开仓通知!

场景3: 飞书API调用失败

# alert_sender.py:66-72
try:
    sender_colourful(content=content, title=title)
    self._record(pair_name, content_hash)
    return AlertStatus.SUCCESS
except Exception as e:
    logger.error(f"告警发送失败: {pair_name} | {e}")
    return AlertStatus.FAILED  # ❌ 返回FAILED,但orchestrator未检查
  • 触发条件: 网络异常、飞书webhook失败、队列满等
  • 当前行为: orchestrator 直接 return,不发送简短通知
  • 严重后果: 用户完全收不到开仓通知!

实际影响评估

测试场景: 3个币对在1分钟内同时触发开仓信号

时间 币对 详细告警 简短通知 用户收到
00:00 BTC ✅ 成功 ❌ 未调用 ✅ 收到
00:10 ETH ✅ 成功 ❌ 未调用 ✅ 收到
00:20 SOL ❌ 限流 ❌ 未调用 未收到

修复方案

方案1: 检查返回值并降级(推荐)

def _send_entry_alert(
    self,
    signal: PairTradeSignal,
    multi_period_result: dict,
    position: PairPosition,
    order_result: PairOrderResult,
    adaptive_z: float,
):
    """发送开仓告警(详细告警 or 简短通知)"""
    available_balance = self._executor.get_available_balance()

    if ENABLE_SIGNAL_DETAIL_ALERT:
        try:
            title, content = format_signal_alert(
                signal=signal,
                multi_period_result=multi_period_result,
                position=position,
                order_result=order_result,
                leverage=self._config.leverage,
                adaptive_z=adaptive_z,
                adaptive_threshold=self._config.strategy_adaptive_threshold,
                reversion_factor=self._config.strategy_reversion_factor,
                available_balance=available_balance,
                network_label=self._config.network_label,
            )

            priority = "high" if abs(adaptive_z) > self._config.strategy_adaptive_threshold * 1.4 else "normal"

            # ✅ 修复: 检查发送状态
            status = self._alert_sender.send(
                title=title,
                content=content,
                pair_name=signal.symbol,
                priority=priority,
            )

            # ✅ 修复: 只有成功才返回
            if status == AlertStatus.SUCCESS:
                return
            else:
                # ✅ 修复: 被限流/去重/失败时降级
                logger.warning(
                    f"详细告警发送失败({status.value}): {signal.symbol},降级为简短通知"
                )
        except Exception as e:
            logger.error(f"详细告警生成失败,降级为简短通知: {e}")

    # 降级到简短通知
    self._send_simple_entry_notification(
        signal, position, order_result, adaptive_z, available_balance,
    )

方案2: 简短通知也使用AlertSender(可选增强)

def _send_simple_entry_notification(
    self,
    signal: PairTradeSignal,
    position: PairPosition,
    order_result: PairOrderResult,
    adaptive_z: float,
    available_balance: float,
):
    """发送简短开仓通知(降级兜底)"""
    # ... 构建简短通知内容 ...

    # ✅ 增强: 简短通知也使用限流/去重机制
    status = self._alert_sender.send(
        title=title,
        content=content,
        pair_name=signal.symbol,
        priority="high",  # ✅ 简短通知使用high优先级,绕过限流
    )

    if status != AlertStatus.SUCCESS:
        logger.error(f"简短通知也发送失败: {signal.symbol} | {status.value}")

🟠 BUG #2: 限流配置过于严格(High)

问题位置

src/utils/monitoring/alert_sender.py:22-29 - AlertSender.__init__

问题代码

def __init__(
    self,
    rate_limit_window: int = 300,  # ❌ 5分钟窗口过长
    rate_limit_max: int = 2,        # ❌ 最多2条过于严格
    dedup_window: int = 60,
):
    # 限流:每个币对在窗口内最多 N 条
    self._rate_window = rate_limit_window
    self._rate_max = rate_limit_max
    self._history: dict[str, deque[float]] = defaultdict(lambda: deque(maxlen=20))

    # 去重:相同内容哈希在窗口内去重
    self._dedup_window = dedup_window
    self._recent_hashes: dict[int, float] = {}

问题分析

当前限流策略

  • 窗口大小: 300秒(5分钟)
  • 最大告警数: 2条/币对
  • 优先级bypass: 仅 priority="high" 绕过限流

问题场景

场景1: 多个币对同时开仓

时间线:
00:00 - BTC 开仓 ✅ (1/2)
00:30 - ETH 开仓 ✅ (2/2)
01:00 - SOL 开仓 ❌ 被限流 (3/2)
02:00 - DOGE 开仓 ❌ 被限流 (4/2)
05:01 - AVAX 开仓 ✅ (1/2) - 5分钟后窗口重置

场景2: 同一币对快速开平仓

同一币对 BTC:
00:00 - 开仓告警 ✅ (1/2)
00:30 - 平仓告警 ✅ (2/2)
01:00 - 再次开仓 ❌ 被限流 (3/2)

修复方案

方案1: 调整限流参数(推荐)

# orchestrator.py:66
self._alert_sender = AlertSender(
    rate_limit_window=60,   # ✅ 缩短到1分钟
    rate_limit_max=5,       # ✅ 增加到5条
    dedup_window=30,        # ✅ 缩短去重窗口到30秒
)

理由:

  • 1分钟窗口更符合交易频率
  • 5条限制可应对多币对同时开仓
  • 30秒去重窗口足够防止重复告警

方案2: 区分开仓和平仓限流(可选增强)

class AlertSender:
    def __init__(
        self,
        entry_rate_limit_window: int = 60,
        entry_rate_limit_max: int = 5,
        exit_rate_limit_window: int = 60,
        exit_rate_limit_max: int = 10,  # 平仓限制更宽松
        dedup_window: int = 30,
    ):
        # ... 分别记录开仓和平仓历史

方案3: 全局限流而非币对限流(可选)

class AlertSender:
    def __init__(
        self,
        global_rate_limit_window: int = 60,
        global_rate_limit_max: int = 10,  # 全局每分钟10条
        per_pair_rate_limit_max: int = 3,  # 单币对每分钟3条
    ):
        # ... 同时维护全局和单币对限流

🟡 BUG #3: 去重逻辑存在缺陷(Medium)

问题位置

src/utils/monitoring/alert_sender.py:60-63, 84-93

问题代码

# 发送方法中
content_hash = hash(content)  # ❌ 使用Python内置hash
if self._is_duplicate(content_hash):
    logger.info(f"重复告警已去重: {pair_name}")
    return AlertStatus.THROTTLED

# 去重检查方法
def _is_duplicate(self, content_hash: int) -> bool:
    now = time.time()

    # 清理过期(惰性清理,不频繁)
    if len(self._recent_hashes) > 100:
        expired = [k for k, t in self._recent_hashes.items() if now - t > self._dedup_window]
        for k in expired:
            del self._recent_hashes[k]

    return content_hash in self._recent_hashes and now - self._recent_hashes[content_hash] < self._dedup_window

问题分析

问题1: Python内置hash()不稳定

# Python 3.3+ 默认启用hash随机化(PYTHONHASHSEED)
# 不同进程/重启后,相同字符串的hash值会变化

>>> hash("test")  # 进程1
-1234567890123456789

>>> hash("test")  # 进程2(重启后)
9876543210987654321  # ❌ 不同!

影响:

  • 程序重启后,去重失效(但影响不大,因为窗口只有60秒)
  • 多进程部署时,各进程去重不同步

问题2: 理论上存在哈希碰撞

虽然Python的hash算法碰撞概率极低,但理论上可能两个不同告警内容产生相同hash值。

问题3: 去重仅基于内容

# 假设两个不同币对,但其他参数完全相同,生成了相同的告警内容
content_BTC = "**方向**: 做多\n**Z-score 4h**: +2.5000\n..."
content_ETH = "**方向**: 做多\n**Z-score 4h**: +2.5000\n..."

# hash(content_BTC) == hash(content_ETH)  # 可能相同!
# ❌ ETH的告警会被误判为重复

修复方案

方案1: 使用稳定哈希算法(推荐)

import hashlib

class AlertSender:
    def send(self, title: str, content: str, pair_name: str, priority: str = "normal") -> AlertStatus:
        # ... 限流检查 ...

        # ✅ 修复: 使用MD5/SHA256等稳定哈希
        content_hash = self._get_content_hash(content)
        if self._is_duplicate(content_hash):
            logger.info(f"重复告警已去重: {pair_name}")
            return AlertStatus.THROTTLED

        # ... 发送逻辑 ...

    def _get_content_hash(self, content: str) -> str:
        """使用MD5生成稳定哈希(用于去重,非加密场景)"""
        return hashlib.md5(content.encode('utf-8')).hexdigest()

    def _is_duplicate(self, content_hash: str) -> bool:  # ✅ 改为str类型
        now = time.time()

        # ... 清理逻辑 ...

        return content_hash in self._recent_hashes and now - self._recent_hashes[content_hash] < self._dedup_window

方案2: 结合币对和时间戳去重(可选增强)

def _get_dedup_key(self, pair_name: str, content: str) -> str:
    """生成去重key: pair_name + content_hash"""
    content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()[:8]
    return f"{pair_name}:{content_hash}"

# 使用
dedup_key = self._get_dedup_key(pair_name, content)
if self._is_duplicate(dedup_key):
    # ...

优势:

  • 不同币对即使内容相同也不会被去重
  • 更精确的去重控制

🟠 BUG #4: 异步队列满时静默丢弃(High)

问题位置

src/utils/monitoring/lark_bot.py:115-119 - sender_colourful

问题代码

def sender_colourful(content, title=''):
    """
    异步飞书富文本卡片告警(非阻塞)

    消息被放入后台队列,由独立线程发送,不阻塞调用方。
    """
    if not lark_bot_id:
        logger.warning('lark 告警跳过: LARKBOT_ID 未配置')
        return None

    _ensure_worker()

    url = f'{LARK_WEBHOOK_BASE}{lark_bot_id}'
    message = {
        # ... 构建消息体 ...
    }

    headers = {'Content-Type': 'application/json'}
    payload = json.dumps(message)

    try:
        _send_queue.put_nowait((url, headers, payload))  # ❌ 队列满直接抛异常
    except queue.Full:
        logger.warning('lark 告警队列已满,丢弃本次告警')  # ❌ 仅warning级别
        return None  # ❌ 静默失败,调用方无法感知

    return "queued"

问题分析

队列容量

# lark_bot.py:15
_send_queue: queue.Queue = queue.Queue(maxsize=200)  # 仅200条

问题场景

场景1: 高频告警场景

假设:
- 每秒产生5条告警(开仓、平仓、止损等)
- 飞书API响应时间2秒/条
- 队列处理速度: 0.5条/秒(2秒/条)

队列增长速度 = 5条/秒 - 0.5条/秒 = 4.5条/秒
队列满时间 = 200条 / 4.5条/秒 ≈ 44秒

场景2: 飞书API故障

如果飞书API临时故障(超时/500错误):
- 队列快速堆积
- 44秒后队列满
- 新告警全部丢弃
- ❌ 调用方无法感知,不会触发降级

当前行为链路

orchestrator._send_entry_alert()
  → alert_sender.send()
    → sender_colourful()
      → _send_queue.put_nowait()  ❌ queue.Full 异常
      → return None  ❌ 静默失败
    → return AlertStatus.SUCCESS  ❌ 错误!实际未发送
  → return  ❌ 不降级到简短通知

修复方案

方案1: 队列满时抛出异常(推荐)

def sender_colourful(content, title=''):
    """
    异步飞书富文本卡片告警(非阻塞)

    Raises:
        RuntimeError: 队列满时抛出异常,由调用方处理
    """
    if not lark_bot_id:
        logger.warning('lark 告警跳过: LARKBOT_ID 未配置')
        return None

    _ensure_worker()

    url = f'{LARK_WEBHOOK_BASE}{lark_bot_id}'
    message = {
        # ... 构建消息体 ...
    }

    headers = {'Content-Type': 'application/json'}
    payload = json.dumps(message)

    try:
        _send_queue.put_nowait((url, headers, payload))
    except queue.Full:
        logger.error('lark 告警队列已满,发送失败')  # ✅ 改为error级别
        raise RuntimeError("Alert queue full")  # ✅ 抛出异常让调用方感知

    return "queued"

配合 AlertSender 捕获异常:

class AlertSender:
    def send(self, title: str, content: str, pair_name: str, priority: str = "normal") -> AlertStatus:
        # ... 限流、去重检查 ...

        try:
            sender_colourful(content=content, title=title)
            self._record(pair_name, content_hash)
            return AlertStatus.SUCCESS
        except RuntimeError as e:  # ✅ 捕获队列满异常
            logger.error(f"告警队列满: {pair_name} | {e}")
            return AlertStatus.FAILED
        except Exception as e:
            logger.error(f"告警发送失败: {pair_name} | {e}")
            return AlertStatus.FAILED

方案2: 增加队列容量(简单但治标不治本)

# lark_bot.py:15
_send_queue: queue.Queue = queue.Queue(maxsize=1000)  # ✅ 增加到1000

优点: 简单直接
缺点: 仅延缓问题,极端场景仍会满

方案3: 使用阻塞put(可选,但会影响性能)

try:
    _send_queue.put((url, headers, payload), timeout=1.0)  # ✅ 阻塞1秒
except queue.Full:
    logger.error('lark 告警队列满(1秒超时),发送失败')
    raise RuntimeError("Alert queue full after 1s timeout")

优点: 给队列一些消化时间
缺点: 阻塞可能影响主线程性能


🔧 综合修复建议

修复优先级

优先级 BUG 工作量 风险
P0 #1 告警丢失 1小时
P1 #2 限流过严 30分钟
P1 #4 队列丢弃 1小时
P2 #3 去重缺陷 1小时

修复步骤

第一阶段: 紧急修复(P0)

  1. 修复BUG #1: 检查 AlertSender.send() 返回值
    • 文件: src/trading/orchestrator.py:753-794
    • 工作量: 10行代码修改
    • 测试: 模拟限流/去重场景验证降级

第二阶段: 高优先级修复(P1)

  1. 修复BUG #2: 调整限流参数

    • 文件: src/trading/orchestrator.py:66
    • 工作量: 3行代码修改
    • 测试: 多币对同时开仓测试
  2. 修复BUG #4: 队列满时抛出异常

    • 文件: src/utils/monitoring/lark_bot.py:115-119
    • 文件: src/utils/monitoring/alert_sender.py:66-72
    • 工作量: 10行代码修改
    • 测试: 模拟队列满场景

第三阶段: 中优先级优化(P2)

  1. 修复BUG #3: 改进去重逻辑
    • 文件: src/utils/monitoring/alert_sender.py
    • 工作量: 20行代码修改
    • 测试: 单元测试验证哈希稳定性

回归测试清单

  • [ ] 单个币对正常开仓告警
  • [ ] 多个币对同时开仓(5个以上)
  • [ ] 同一币对5分钟内多次开仓
  • [ ] 飞书API临时故障场景
  • [ ] 队列满场景(压测)
  • [ ] 程序重启后去重功能
  • [ ] 高优先级告警绕过限流
  • [ ] 降级到简短通知功能

📊 附录A: 代码审查依据

审查范围

  • src/trading/orchestrator.py (959行)
  • src/utils/monitoring/alert_sender.py (99行)
  • src/utils/monitoring/signal_alert_formatter.py (518行)
  • src/utils/monitoring/lark_bot.py (122行)
  • src/utils/monitoring/alert_level.py (19行)

审查方法

  • 静态代码分析
  • 控制流路径分析
  • 错误处理链路追踪
  • 边界条件测试用例设计

Git提交历史

daf7ec2 open alert lark content fat 2 (2026-02-16)
f49b828 open alert lark content fat (2026-02-16)
73f3d46 debug
d938e5a orphan manager
2785889 delete garbagfe

📊 附录B: 影响分析

用户影响

场景 当前行为 用户感知 严重程度
开仓后5分钟内第3次开仓 无告警 用户不知道开仓成功 🔴 Critical
飞书API故障 无告警 用户不知道开仓成功 🔴 Critical
多币对同时开仓 部分无告警 遗漏重要交易信号 🟠 High
队列满(极端情况) 无告警 完全丢失告警 🟠 High

风险评估

当前系统风险等级: 🔴 High

主要风险:

  1. 用户错过关键交易信号
  2. 无法及时止损或调整仓位
  3. 资金管理风险增加

建议措施:

  1. 立即修复 BUG #1
  2. 增加监控告警:限流/去重/队列满事件
  3. 添加健康检查接口,监控告警系统状态

📝 附录C: 代码示例(完整修复)

修复后的 orchestrator.py

def _send_entry_alert(
    self,
    signal: PairTradeSignal,
    multi_period_result: dict,
    position: PairPosition,
    order_result: PairOrderResult,
    adaptive_z: float,
):
    """发送开仓告警(详细告警 or 简短通知)

    降级策略:
    1. 尝试发送详细告警
    2. 如果失败/限流/去重,降级为简短通知
    3. 简短通知使用high优先级绕过限流
    """
    available_balance = self._executor.get_available_balance()

    if ENABLE_SIGNAL_DETAIL_ALERT:
        try:
            title, content = format_signal_alert(
                signal=signal,
                multi_period_result=multi_period_result,
                position=position,
                order_result=order_result,
                leverage=self._config.leverage,
                adaptive_z=adaptive_z,
                adaptive_threshold=self._config.strategy_adaptive_threshold,
                reversion_factor=self._config.strategy_reversion_factor,
                available_balance=available_balance,
                network_label=self._config.network_label,
            )

            priority = "high" if abs(adaptive_z) > self._config.strategy_adaptive_threshold * 1.4 else "normal"

            # ✅ 修复: 检查发送状态
            status = self._alert_sender.send(
                title=title,
                content=content,
                pair_name=signal.symbol,
                priority=priority,
            )

            # ✅ 修复: 只有成功才返回
            if status == AlertStatus.SUCCESS:
                logger.info(f"详细告警发送成功: {signal.symbol}")
                return
            else:
                # ✅ 修复: 被限流/去重/失败时降级
                logger.warning(
                    f"详细告警发送失败({status.value}): {signal.symbol},降级为简短通知"
                )
        except Exception as e:
            logger.error(f"详细告警生成失败,降级为简短通知: {e}", exc_info=True)

    # 降级到简短通知(使用high优先级绕过限流)
    self._send_simple_entry_notification_with_priority(
        signal, position, order_result, adaptive_z, available_balance,
    )

def _send_simple_entry_notification_with_priority(
    self,
    signal: PairTradeSignal,
    position: PairPosition,
    order_result: PairOrderResult,
    adaptive_z: float,
    available_balance: float,
):
    """发送简短开仓通知(降级兜底,使用高优先级)"""
    # 构建简短通知内容
    leverage = self._config.leverage
    leg_a_notional = position.alt_size * position.alt_entry_price
    leg_b_notional = position.base_size * position.base_entry_price
    total_notional = leg_a_notional + leg_b_notional
    used_margin = total_notional / leverage if leverage > 0 else total_notional

    leg_a_detail = self._format_leg_detail("Leg A (目标)", order_result.leg_a) if order_result.leg_a else ""
    leg_b_detail = self._format_leg_detail("Leg B (基准)", order_result.leg_b) if order_result.leg_b and order_result.leg_b.success else ""

    thresh = self._config.strategy_adaptive_threshold
    az_pct = abs(adaptive_z) / thresh * 100 if thresh > 0 else 0

    title = f"🟢 开仓成功 {self._config.network_label} - {symbol_to_coin(signal.symbol)}"
    content = (
        f"**方向**: {'做多' if signal.direction == 'long' else '做空'}\n"
        f"**模式**: {position.pair_mode}\n"
        f"**杠杆**: {leverage}x\n"
        f"**信号强度**: {signal.signal_strength}\n"
        f"---\n"
        f"**Z-score 4h**: {signal.zscore_4h:+.4f}\n"
        f"**Adaptive Z**: {adaptive_z:+.4f} ({az_pct:.0f}% of ±{thresh})\n"
        f"**回归目标**: {adaptive_z * self._config.strategy_reversion_factor:+.4f}\n"
        f"---\n"
        f"{leg_a_detail}"
        f"{leg_b_detail}"
        f"---\n"
        f"**名义价值**: ${total_notional:.2f}\n"
        f"**占用保证金**: ${used_margin:.2f}\n"
        f"**剩余可用**: ${available_balance:.2f}\n"
        f"---\n"
        f"**仓位ID**: {position.position_id[:8]}...\n"
        f"**信号ID**: {signal.signal_id[:8]}...\n"
        f"**网络**: {self._config.network_label}"
    )

    # ✅ 使用high优先级,绕过限流
    status = self._alert_sender.send(
        title=title,
        content=content,
        pair_name=signal.symbol,
        priority="high",  # ✅ 简短通知强制使用high优先级
    )

    if status != AlertStatus.SUCCESS:
        # ✅ 最终兜底:直接调用sender_colourful
        logger.error(f"简短通知也发送失败({status.value}): {signal.symbol},尝试直接发送")
        try:
            sender_colourful(content=content, title=title)
        except Exception as e:
            logger.error(f"直接发送也失败: {signal.symbol} | {e}", exc_info=True)

修复后的 alert_sender.py

import hashlib
import time
from collections import defaultdict, deque

from src.utils.monitoring.lark_bot import sender_colourful
from src.utils.monitoring.alert_level import AlertStatus
from src.utils.core.logging_config import logger


class AlertSender:
    """告警发送器(限流 + 去重)"""

    def __init__(
        self,
        rate_limit_window: int = 60,   # ✅ 修复: 缩短到1分钟
        rate_limit_max: int = 5,       # ✅ 修复: 增加到5条
        dedup_window: int = 30,        # ✅ 修复: 缩短到30秒
    ):
        # 限流:每个币对在窗口内最多 N 条
        self._rate_window = rate_limit_window
        self._rate_max = rate_limit_max
        self._history: dict[str, deque[float]] = defaultdict(lambda: deque(maxlen=20))

        # 去重:相同内容哈希在窗口内去重
        self._dedup_window = dedup_window
        self._recent_hashes: dict[str, float] = {}  # ✅ 修复: 改为str类型

    def send(
        self,
        title: str,
        content: str,
        pair_name: str,
        priority: str = "normal",
    ) -> AlertStatus:
        """发送告警(同步,lark_bot 内部异步队列)

        Args:
            title: 告警标题
            content: 告警内容
            pair_name: 币对名称(用于限流 key)
            priority: 优先级 (normal/high) — high 不限流

        Returns:
            AlertStatus
        """
        # 1. 限流
        if priority != "high" and not self._check_rate_limit(pair_name):
            logger.info(f"告警被限流: {pair_name}")
            return AlertStatus.THROTTLED

        # 2. 去重
        content_hash = self._get_content_hash(content)  # ✅ 修复: 使用稳定哈希
        if self._is_duplicate(content_hash):
            logger.info(f"重复告警已去重: {pair_name}")
            return AlertStatus.THROTTLED

        # 3. 发送
        try:
            sender_colourful(content=content, title=title)
            self._record(pair_name, content_hash)
            return AlertStatus.SUCCESS
        except RuntimeError as e:  # ✅ 修复: 捕获队列满异常
            logger.error(f"告警队列满: {pair_name} | {e}")
            return AlertStatus.FAILED
        except Exception as e:
            logger.error(f"告警发送失败: {pair_name} | {e}")
            return AlertStatus.FAILED

    def _get_content_hash(self, content: str) -> str:
        """使用MD5生成稳定哈希(用于去重,非加密场景)"""
        return hashlib.md5(content.encode('utf-8')).hexdigest()

    def _check_rate_limit(self, pair_name: str) -> bool:
        now = time.time()
        history = self._history[pair_name]

        # 清理过期
        while history and now - history[0] > self._rate_window:
            history.popleft()

        return len(history) < self._rate_max

    def _is_duplicate(self, content_hash: str) -> bool:  # ✅ 修复: 改为str参数
        now = time.time()

        # 清理过期(惰性清理,不频繁)
        if len(self._recent_hashes) > 100:
            expired = [k for k, t in self._recent_hashes.items() if now - t > self._dedup_window]
            for k in expired:
                del self._recent_hashes[k]

        return content_hash in self._recent_hashes and now - self._recent_hashes[content_hash] < self._dedup_window

    def _record(self, pair_name: str, content_hash: str):  # ✅ 修复: 改为str参数
        now = time.time()
        self._history[pair_name].append(now)
        self._recent_hashes[content_hash] = now

修复后的 lark_bot.py

def sender_colourful(content, title=''):
    """
    异步飞书富文本卡片告警(非阻塞)

    消息被放入后台队列,由独立线程发送,不阻塞调用方。

    Raises:
        RuntimeError: 队列满时抛出异常,由调用方处理

    https://open.larksuite.com/document/common-capabilities/message-card/message-cards-content/using-markdown-tags
    """
    if not lark_bot_id:
        logger.warning('lark 告警跳过: LARKBOT_ID 未配置')
        return None

    _ensure_worker()

    url = f'{LARK_WEBHOOK_BASE}{lark_bot_id}'
    message = {
        "msg_type": "interactive",
        "card": {
            "config": {
                "wide_screen_mode": True
            },
            "header": {
                "title": {
                    "tag": "plain_text",
                    "content": title
                },
                "template": "red"
            },
            "elements": [{
                "tag": "markdown",
                "content": content,
            }]
        }
    }

    if lark_alert_email:
        message["email"] = lark_alert_email

    headers = {'Content-Type': 'application/json'}
    payload = json.dumps(message)

    try:
        _send_queue.put_nowait((url, headers, payload))
    except queue.Full:
        logger.error('lark 告警队列已满,发送失败')  # ✅ 修复: 改为error级别
        raise RuntimeError("Alert queue full")  # ✅ 修复: 抛出异常让调用方感知

    return "queued"

结论

当前"开仓信号详细告警"功能模块存在 4个严重BUG,其中 BUG #1(告警丢失)Critical级别,必须立即修复。建议按照本报告提供的修复方案,分三个阶段完成修复:

  1. 紧急修复(P0):修复告警丢失问题
  2. 高优先级修复(P1):调整限流参数、修复队列满问题
  3. 中优先级优化(P2):改进去重逻辑

修复完成后,需要进行完整的回归测试,确保告警系统在各种场景下都能正常工作。


报告生成时间: 2026-02-16
报告作者: Claude Code SuperClaude
代码审查版本: daf7ec2

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG