开仓信号详细告警设计方案

开仓信号详细告警功能设计方案 - 综合版

概述

在开仓信号产生并成功执行时,发送多周期配对交易信号详细告警到飞书,包含信号概览、多周期 Z-score 验证、相关性分析、协整检验统计、协整健康监控、窗口对比、智能风险评估和交易建议。

本方案综合 v2 和 v3 的优点

  • 简洁架构 - 清晰的流程设计和职责分明(v2)
  • 科学评估 - 5维度加权风险评分体系(v3)
  • 智能降级 - 三级告警降级 + 异常保护(v2+v3)
  • 生产就绪 - 限流去重、监控指标、测试覆盖(v3)
  • 性能优化 - 异步发送、防御性设计、轻量计算(v2+v3)

1. 现状与缺口

1.1 当前流程

  • 入口orchestrator.pyon_entry_signal 方法
  • 现状:开仓成功时通过 _send_trading_notification 发送简短通知
  • 内容:方向、杠杆、Z-score、Adaptive Z、订单明细、保证金等

1.2 可用数据源

multi_period_result 已包含全部所需数据:

  • zscore_list:多周期 Z-score 列表
  • cointegration_count:协整通过数量
  • details:每周期的 correlation、cointegration_old/new、health_monitor

1.3 缺口分析

类型 现状 期望
告警内容 简短摘要 完整分析报告
风险评估 多维度加权评分
错误处理 基础异常捕获 三级降级策略
生产可靠性 无限流去重 限流 + 去重 + 重试
监控 指标上报 + 日志规范

2. 总体架构

2.1 架构流程

flowchart TB
    subgraph entry [入场流程]
        A[process_analysis] --> B[on_entry_signal]
        B --> C{风控通过?}
        C -->|是| D[open_position]
        D --> E{开仓成功?}
        E -->|是| F[触发告警]
    end

    subgraph alert [告警系统 - 三级降级]
        F --> G[signal_alert_formatter]
        G --> H[risk_evaluator<br/>5维度评分]
        H --> I{格式化成功?}
        I -->|是| J[FULL 完整告警]
        I -->|否| K[SIMPLIFIED 简化告警]
        K --> L{简化成功?}
        L -->|否| M[BASIC 基础告警]
    end

    subgraph send [发送层 - 限流去重]
        J --> N[alert_sender]
        K --> N
        M --> N
        N --> O{限流检查}
        O -->|通过| P{去重检查}
        P -->|通过| Q[async_send]
        O -->|拒绝| R[记录限流日志]
        P -->|重复| R
        Q --> S{发送成功?}
        S -->|否| T[重试队列<br/>指数退避]
        T --> Q
        S -->|是| U[监控上报]
    end

    style J fill:#90EE90
    style K fill:#FFD700
    style M fill:#FFA500

2.2 通知策略

采用合并方案:详细告警替换现有简短通知,避免消息刷屏。

  • 配置项 ENABLE_SIGNAL_DETAIL_ALERT=true 启用详细告警
  • 详细告警包含「执行信息」区块,整合原通知的订单明细
  • 配置项为 false 时回退到原有简短通知

2.3 告警级别定义

# src/utils/monitoring/alert_level.py
from enum import Enum

class AlertLevel(Enum):
    """告警级别"""
    FULL = "完整告警"           # 10个区块:全部分析数据
    SIMPLIFIED = "简化告警"     # 3个区块:核心指标 + 风险评估
    BASIC = "基础告警"          # 2个区块:信号概览 + 执行状态

class AlertStatus(Enum):
    """告警状态"""
    SUCCESS = "发送成功"
    FAILED = "发送失败"
    THROTTLED = "被限流"
    DEGRADED = "降级发送"

3. 核心模块设计

3.1 风险评估引擎

3.1.1 评分体系(5维度加权)

# src/utils/analysis/risk_evaluator.py

RISK_WEIGHTS = {
    'trend_risk': 0.25,        # 趋势风险(Hurst 指数)
    'cointegration': 0.25,     # 协整质量
    'correlation': 0.20,       # 相关性稳定性
    'zscore_consistency': 0.15, # Z-score 一致性
    'health_monitor': 0.15     # 协整健康度
}

@dataclass
class RiskAssessment:
    """风险评估结果"""
    overall_score: float              # 综合评分 0-100
    risk_level: str                   # "低风险" | "中风险" | "高风险"
    recommendation: str               # "积极" | "谨慎" | "警惕"
    action: str                       # 建议操作文本
    confidence: float                 # 置信度 0-1
    high_risk_factors: list[str]      # 高风险因素列表
    favorable_factors: list[str]      # 有利因素列表
    details: dict                     # 各维度详细评分

3.1.2 风险规则表

维度 指标 高风险条件 有利条件
趋势风险 Hurst > 0.6(趋势性强) < 0.4(均值回归)
协整质量 通过率 < 40% > 60%
p-value 均值 > 0.05 均值 < 0.02
相关性 标准差 > 0.20(不稳定) < 0.15(稳定)
平均值 < 0.4(弱相关) > 0.7(强相关)
Z-score 符号一致性 < 60% 100%
平均强度 < 1.5 > 2.5
健康监控 综合得分 < 50 > 70
窗口差异 > 20(恶化) < 10(稳定)

3.1.3 核心评估逻辑

class RiskEvaluator:
    """风险评估引擎"""

    def evaluate(
        self,
        multi_period_result: dict,
        signal: PairTradeSignal
    ) -> RiskAssessment:
        """
        执行多维度风险评估

        处理流程:
        1. 提取各维度数据
        2. 计算各维度得分(0-100)
        3. 加权计算综合得分
        4. 识别高风险和有利因素
        5. 生成评级和建议
        """
        # 1. 趋势风险评估(基于 Hurst 指数)
        trend_risk = self._calculate_trend_risk(multi_period_result)

        # 2. 协整质量评估(通过率 + p-value)
        cointegration = self._calculate_cointegration_score(multi_period_result)

        # 3. 相关性稳定性评估(均值 + 标准差)
        correlation = self._calculate_correlation_score(multi_period_result)

        # 4. Z-score 一致性评估(符号 + 强度)
        zscore_consistency = self._calculate_zscore_consistency(multi_period_result)

        # 5. 协整健康度评估(长短窗口对比)
        health_monitor = self._calculate_health_score(multi_period_result)

        # 6. 计算综合评级
        return self._calculate_overall_risk(
            trend_risk, cointegration, correlation,
            zscore_consistency, health_monitor
        )

    def _calculate_trend_risk(self, data: dict) -> dict:
        """
        趋势风险评估(分数越高风险越大)

        Returns:
            {
                'score': 0-100,
                'level': '低风险-均值回归' | '中风险-随机游走' | '高风险-趋势性强',
                'reason': 'Hurst=0.45',
                'detail': '价格序列呈现均值回归特性'
            }
        """
        hurst = self._extract_hurst(data)

        if hurst < 0.4:
            score, level = 20, "低风险-均值回归"
            detail = "价格序列呈现均值回归特性,适合配对交易"
        elif hurst < 0.6:
            score, level = 50, "中风险-随机游走"
            detail = "价格序列接近随机游走,回归不明显"
        else:
            score = 20 + (hurst - 0.6) * 200  # 线性映射
            level = "高风险-趋势性强"
            detail = "价格序列呈现趋势性,可能破坏配对交易基础"

        return {
            'score': min(score, 100),
            'level': level,
            'reason': f"Hurst={hurst:.3f}",
            'detail': detail
        }

    def _calculate_cointegration_score(self, data: dict) -> dict:
        """
        协整质量评估(通过率 60% + p-value质量 40%)

        Returns:
            {
                'score': 0-100,
                'level': '优秀' | '良好' | '一般' | '较差',
                'pass_rate': '66.7%',
                'avg_pvalue': '0.0234'
            }
        """
        total_tests = 0
        passed_tests = 0
        pvalues = []

        for period_data in data.get('details', {}).values():
            for method in ['cointegration_old', 'cointegration_new']:
                total_tests += 1
                coint_data = period_data.get(method, {})
                if coint_data.get('passed', False):
                    passed_tests += 1
                    pvalues.append(coint_data.get('adf_pvalue', 1.0))

        pass_rate = passed_tests / total_tests if total_tests > 0 else 0
        avg_pvalue = sum(pvalues) / len(pvalues) if pvalues else 1.0

        # 评分:通过率60% + p-value质量40%
        pvalue_quality = 1 - min(avg_pvalue / 0.05, 1.0)
        score = pass_rate * 60 + pvalue_quality * 40

        # 等级划分
        if score >= 80:
            level = "优秀"
        elif score >= 60:
            level = "良好"
        elif score >= 40:
            level = "一般"
        else:
            level = "较差"

        return {
            'score': score,
            'level': level,
            'pass_rate': f"{pass_rate*100:.1f}%",
            'avg_pvalue': f"{avg_pvalue:.4f}"
        }

    def _calculate_overall_risk(
        self, trend_risk, cointegration, correlation,
        zscore_consistency, health_monitor
    ) -> RiskAssessment:
        """
        计算加权综合评分并生成评级

        注意:trend_risk 分数越高风险越大,需要反转
        """
        # 加权计算(trend_risk 需要反转)
        weighted_score = (
            (100 - trend_risk['score']) * RISK_WEIGHTS['trend_risk'] +
            cointegration['score'] * RISK_WEIGHTS['cointegration'] +
            correlation['score'] * RISK_WEIGHTS['correlation'] +
            zscore_consistency['score'] * RISK_WEIGHTS['zscore_consistency'] +
            health_monitor['score'] * RISK_WEIGHTS['health_monitor']
        )

        # 风险等级和建议
        if weighted_score >= 75:
            risk_level = "低风险"
            recommendation = "积极"
            action = "可按标准仓位执行"
            confidence = 0.85
        elif weighted_score >= 55:
            risk_level = "中风险"
            recommendation = "谨慎"
            action = "建议减半仓位或观望"
            confidence = 0.65
        else:
            risk_level = "高风险"
            recommendation = "警惕"
            action = "建议极小仓位试探或放弃"
            confidence = 0.40

        # 提取风险因素
        high_risk_factors = []
        if trend_risk['score'] > 70:
            high_risk_factors.append(f"⚠️ {trend_risk['reason']} - {trend_risk['level']}")
        if cointegration['score'] < 50:
            high_risk_factors.append(
                f"⚠️ 协整质量{cointegration['level']} (通过率{cointegration['pass_rate']})"
            )
        if correlation['score'] < 50:
            high_risk_factors.append(f"⚠️ {correlation['level']}")
        if health_monitor['score'] < 50:
            high_risk_factors.append(f"⚠️ 协整{health_monitor['level']}")

        # 提取有利因素
        favorable_factors = []
        if zscore_consistency['score'] > 70:
            favorable_factors.append(f"✅ Z-score {zscore_consistency['level']}")
        if cointegration['score'] > 70:
            favorable_factors.append(f"✅ 协整{cointegration['level']}")
        if correlation['score'] > 70:
            favorable_factors.append(f"✅ {correlation['level']}")
        if health_monitor['score'] > 70:
            favorable_factors.append(f"✅ 协整{health_monitor['level']}")

        return RiskAssessment(
            overall_score=round(weighted_score, 2),
            risk_level=risk_level,
            recommendation=recommendation,
            action=action,
            confidence=confidence,
            high_risk_factors=high_risk_factors,
            favorable_factors=favorable_factors,
            details={
                'trend_risk': trend_risk,
                'cointegration': cointegration,
                'correlation': correlation,
                'zscore_consistency': zscore_consistency,
                'health_monitor': health_monitor
            }
        )

3.2 告警格式化器(三级降级)

# src/utils/monitoring/signal_alert_formatter.py

class SignalAlertFormatter:
    """信号告警格式化器"""

    def __init__(self):
        self.risk_evaluator = RiskEvaluator()

    def format_alert(
        self,
        signal: PairTradeSignal,
        multi_period_result: dict,
        latest_alt_price: float,
        avg_zscore_4h: float,
        direction: str,
        signal_strength: str,
        open_result: Optional[dict] = None,
        level: AlertLevel = AlertLevel.FULL
    ) -> tuple[str, str]:
        """
        格式化告警内容(带自动降级)

        Args:
            signal: 交易信号对象
            multi_period_result: 多周期分析结果
            latest_alt_price: 最新 ALT 价格
            avg_zscore_4h: 4h Z-score 均值
            direction: 交易方向
            signal_strength: 信号强度
            open_result: 开仓结果(可选)
            level: 告警级别

        Returns:
            (title, content) 元组
        """
        try:
            if level == AlertLevel.FULL:
                return self._format_full_alert(
                    signal, multi_period_result, latest_alt_price,
                    avg_zscore_4h, direction, signal_strength, open_result
                )
            elif level == AlertLevel.SIMPLIFIED:
                return self._format_simplified_alert(
                    signal, multi_period_result, direction,
                    signal_strength, open_result
                )
            else:  # BASIC
                return self._format_basic_alert(signal, direction, open_result)

        except Exception as e:
            logger.error(f"格式化告警失败 (level={level.value}): {e}", exc_info=True)

            # 自动降级
            if level == AlertLevel.FULL:
                logger.warning("降级到 SIMPLIFIED 告警")
                return self.format_alert(
                    signal, multi_period_result, latest_alt_price,
                    avg_zscore_4h, direction, signal_strength,
                    open_result, AlertLevel.SIMPLIFIED
                )
            elif level == AlertLevel.SIMPLIFIED:
                logger.warning("降级到 BASIC 告警")
                return self.format_alert(
                    signal, multi_period_result, latest_alt_price,
                    avg_zscore_4h, direction, signal_strength,
                    open_result, AlertLevel.BASIC
                )
            else:
                # 最终兜底
                return (
                    f"⚠️ 告警格式化失败 - {signal.pair_name}",
                    f"**信号**: {direction}\n**Z-score**: {signal.zscore:.3f}"
                )

    def _format_full_alert(self, ...) -> tuple[str, str]:
        """
        格式化完整告警(10个区块)

        区块列表:
        1. 基本信息
        2. 信号概览
        3. 多周期 Z-score 验证
        4. 多周期相关性分析
        5. 协整检验统计
        6. 协整健康监控
        7. 窗口对比
        8. 风险评估
        9. 交易建议
        10. 执行信息
        """
        # 风险评估
        risk_result = self.risk_evaluator.evaluate(multi_period_result, signal)

        # 构建标题
        emoji = "🟢" if direction == "LONG_ALT" else "🔴"
        title = f"{emoji} 配对交易信号 - {signal.pair_name} {direction}"

        # 构建内容(每个区块独立 try/except)
        sections = []
        for builder in [
            self._build_basic_info,
            self._build_signal_overview,
            self._build_zscore_validation,
            self._build_correlation_analysis,
            self._build_cointegration_stats,
            self._build_health_monitor,
            self._build_window_comparison,
            self._build_risk_assessment,
            self._build_trading_recommendation,
            self._build_execution_info
        ]:
            try:
                section = builder(
                    signal, multi_period_result, risk_result,
                    latest_alt_price, avg_zscore_4h, open_result
                )
                if section:
                    sections.append(section)
            except Exception as e:
                logger.warning(f"区块 {builder.__name__} 生成失败: {e}")
                sections.append(f"**{builder.__name__}**: ⚠️ 数据缺失")

        content = "\n\n---\n\n".join(sections)

        # 内容长度保护
        return self._truncate_if_needed(title, content, max_chars=25000)

    def _format_simplified_alert(self, ...) -> tuple[str, str]:
        """
        格式化简化告警(3个区块)

        区块列表:
        1. 核心信息(币对、方向、Z-score、信号强度)
        2. 风险评级(综合评分、等级、建议)
        3. 关键因素(前2个高风险 + 前2个有利因素)
        """
        risk_result = self.risk_evaluator.evaluate(multi_period_result, signal)

        emoji = "🟢" if direction == "LONG_ALT" else "🔴"
        title = f"{emoji} 信号 - {signal.pair_name}"

        content = f"""## 核心信息
- **方向**: {direction}
- **Z-score**: {signal.zscore:.3f}
- **信号强度**: {signal_strength}

## 风险评级
- **综合评分**: {risk_result.overall_score:.1f}/100
- **风险等级**: {risk_result.risk_level}
- **建议**: {risk_result.recommendation} - {risk_result.action}

## 关键因素
{chr(10).join(risk_result.high_risk_factors[:2] if risk_result.high_risk_factors else ['无高风险'])}
{chr(10).join(risk_result.favorable_factors[:2] if risk_result.favorable_factors else ['无有利因素'])}"""

        if open_result:
            status = '✅ 成功' if open_result.get('success') else '❌ 失败'
            content += f"\n\n**开仓状态**: {status}"

        return title, content

    def _format_basic_alert(self, ...) -> tuple[str, str]:
        """
        格式化基础告警(2个区块)

        区块列表:
        1. 信号概览(币对、方向、Z-score)
        2. 执行状态
        """
        emoji = "🟢" if direction == "LONG_ALT" else "🔴"
        title = f"{emoji} {signal.pair_name} {direction}"

        content = f"""**Z-score**: {signal.zscore:.3f}
**状态**: {'已开仓' if open_result and open_result.get('success') else '信号触发'}"""

        return title, content

    def _truncate_if_needed(
        self,
        title: str,
        content: str,
        max_chars: int = 25000
    ) -> tuple[str, str]:
        """内容超长保护"""
        if len(content) <= max_chars:
            return title, content

        logger.warning(f"告警内容超长 ({len(content)} > {max_chars}),截断保留核心区块")

        # 仅保留核心区块
        core_content = "## 告警内容已截断\n\n仅保留核心信息:\n\n"
        core_content += self._build_signal_overview(...) + "\n\n"
        core_content += self._build_risk_assessment(...) + "\n\n"
        core_content += self._build_execution_info(...)

        return title, core_content

3.3 告警发送器(限流+去重+重试)

# src/utils/monitoring/alert_sender.py

class AlertSender:
    """告警发送器(支持限流、去重、重试)"""

    def __init__(self):
        # 限流配置
        self.rate_limit_window = 300  # 5分钟
        self.rate_limit_max = 2       # 最多2条
        self.alert_history: Dict[str, deque] = defaultdict(lambda: deque(maxlen=10))

        # 重试配置
        self.max_retries = 3
        self.retry_delays = [1, 3, 10]  # 指数退避

        # 去重配置
        self.recent_alerts: Dict[int, float] = {}  # 哈希 -> 时间戳
        self.dedup_window = 60  # 60秒内相同内容去重

    async def send_alert(
        self,
        title: str,
        content: str,
        pair_name: str,
        priority: str = "normal"
    ) -> AlertStatus:
        """
        发送告警(带限流、去重、重试)

        Args:
            title: 告警标题
            content: 告警内容
            pair_name: 币对名称
            priority: 优先级 (low/normal/high/critical)

        Returns:
            AlertStatus
        """
        # 1. 限流检查
        if not self._check_rate_limit(pair_name, priority):
            logger.warning(
                f"告警被限流: {pair_name}",
                extra={'reason': 'rate_limit', 'priority': priority}
            )
            return AlertStatus.THROTTLED

        # 2. 去重检查
        content_hash = hash(content)
        if self._is_duplicate(content_hash):
            logger.info(
                f"重复告警已去重: {pair_name}",
                extra={'content_hash': content_hash}
            )
            return AlertStatus.THROTTLED

        # 3. 发送(带重试)
        for attempt in range(self.max_retries):
            try:
                # 异步发送
                await asyncio.to_thread(sender_colourful, title, content)

                # 记录成功
                self._record_alert(pair_name, content_hash)
                logger.info(
                    f"告警发送成功: {pair_name}",
                    extra={'attempt': attempt + 1}
                )
                return AlertStatus.SUCCESS

            except Exception as e:
                logger.error(
                    f"告警发送失败 (尝试 {attempt+1}/{self.max_retries}): {e}",
                    extra={'pair_name': pair_name}
                )

                if attempt < self.max_retries - 1:
                    delay = self.retry_delays[attempt]
                    await asyncio.sleep(delay)
                else:
                    return AlertStatus.FAILED

        return AlertStatus.FAILED

    def _check_rate_limit(self, pair_name: str, priority: str) -> bool:
        """
        检查限流

        高优先级(high/critical)不限流
        """
        if priority in ['high', 'critical']:
            return True

        now = time.time()
        history = self.alert_history[pair_name]

        # 清理过期记录
        while history and now - history[0] > self.rate_limit_window:
            history.popleft()

        # 检查数量
        return len(history) < self.rate_limit_max

    def _is_duplicate(self, content_hash: int) -> bool:
        """检查是否重复(60秒内相同内容)"""
        now = time.time()

        # 清理过期记录
        expired_keys = [
            k for k, t in self.recent_alerts.items()
            if now - t > self.dedup_window
        ]
        for key in expired_keys:
            del self.recent_alerts[key]

        # 检查重复
        return content_hash in self.recent_alerts

    def _record_alert(self, pair_name: str, content_hash: int):
        """记录告警"""
        now = time.time()
        self.alert_history[pair_name].append(now)
        self.recent_alerts[content_hash] = now

3.4 集成到交易流程

# src/trading/orchestrator.py

class PairTradingOrchestrator:
    def __init__(self):
        # ... 原有初始化 ...
        self.alert_formatter = SignalAlertFormatter()
        self.alert_sender = AlertSender()

    async def on_entry_signal(
        self,
        signal: PairTradeSignal,
        multi_period_result: dict
    ):
        """处理入场信号"""
        # ... 原有风控逻辑 ...

        if not risk_passed:
            return

        # 执行开仓
        open_result = await self.open_position(signal, ...)

        if not open_result or not open_result.get('success'):
            return

        # 发送详细告警(替换原有简短通知)
        if ENABLE_SIGNAL_DETAIL_ALERT:
            await self._send_detailed_alert(signal, multi_period_result, open_result)
        else:
            # 回退到原有简短通知
            await self._send_trading_notification(
                f"开仓成功 - {signal.pair_name}",
                self._build_simple_notification(signal, open_result)
            )

    async def _send_detailed_alert(
        self,
        signal: PairTradeSignal,
        multi_period_result: dict,
        open_result: dict
    ):
        """
        发送详细告警(带异常降级保护)

        降级策略:
        FULL -> SIMPLIFIED -> BASIC -> 简短通知
        """
        try:
            # 格式化告警(内部自动降级)
            title, content = self.alert_formatter.format_alert(
                signal=signal,
                multi_period_result=multi_period_result,
                latest_alt_price=self.latest_prices.get(signal.alt_symbol, 0),
                avg_zscore_4h=self._calculate_avg_zscore_4h(),
                direction=signal.direction,
                signal_strength=signal.strength,
                open_result=open_result,
                level=AlertLevel.FULL
            )

            # 异步发送(带限流去重)
            priority = 'high' if abs(signal.zscore) > 3.0 else 'normal'
            status = await self.alert_sender.send_alert(
                title=title,
                content=content,
                pair_name=signal.pair_name,
                priority=priority
            )

            # 监控上报
            self._report_alert_metrics(signal.pair_name, status, 'FULL')

        except Exception as e:
            logger.error(
                f"发送详细告警失败,降级为简短通知: {e}",
                extra={'pair_name': signal.pair_name},
                exc_info=True
            )

            # 最终兜底:发送原有简短通知
            try:
                await self._send_trading_notification(
                    f"开仓成功 - {signal.pair_name}",
                    self._build_simple_notification(signal, open_result)
                )
                self._report_alert_metrics(signal.pair_name, AlertStatus.DEGRADED, 'BASIC')
            except Exception as fallback_error:
                logger.critical(
                    f"简短通知发送也失败: {fallback_error}",
                    extra={'pair_name': signal.pair_name}
                )

4. 配置管理

4.1 环境变量配置

# src/config.py

# ==================== 告警配置 ====================

# 告警功能开关
ENABLE_SIGNAL_DETAIL_ALERT: bool = os.getenv(
    'ENABLE_SIGNAL_DETAIL_ALERT', 'true'
).lower() in ('true', '1', 'yes')

# 风险评估权重配置
RISK_WEIGHTS = {
    'trend_risk': float(os.getenv('RISK_WEIGHT_TREND', '0.25')),
    'cointegration': float(os.getenv('RISK_WEIGHT_COINTEGRATION', '0.25')),
    'correlation': float(os.getenv('RISK_WEIGHT_CORRELATION', '0.20')),
    'zscore_consistency': float(os.getenv('RISK_WEIGHT_ZSCORE', '0.15')),
    'health_monitor': float(os.getenv('RISK_WEIGHT_HEALTH', '0.15'))
}

# 风险阈值配置
RISK_THRESHOLDS = {
    'hurst_high_risk': float(os.getenv('RISK_HURST_HIGH', '0.6')),
    'hurst_mean_revert': float(os.getenv('RISK_HURST_LOW', '0.4')),
    'cointegration_good': float(os.getenv('RISK_COINT_GOOD', '0.6')),
    'correlation_stable_std': float(os.getenv('RISK_CORR_STD', '0.15')),
    'zscore_strong': float(os.getenv('RISK_ZSCORE_STRONG', '2.0')),
    'health_good': float(os.getenv('RISK_HEALTH_GOOD', '70')),
    'health_diff_stable': float(os.getenv('RISK_HEALTH_DIFF', '15'))
}

# 告警限流配置
ALERT_RATE_LIMIT_WINDOW: int = int(os.getenv('ALERT_RATE_WINDOW', '300'))  # 秒
ALERT_RATE_LIMIT_MAX: int = int(os.getenv('ALERT_RATE_MAX', '2'))          # 条数
ALERT_DEDUP_WINDOW: int = int(os.getenv('ALERT_DEDUP_WINDOW', '60'))       # 秒

# 告警重试配置
ALERT_MAX_RETRIES: int = int(os.getenv('ALERT_MAX_RETRIES', '3'))
ALERT_RETRY_DELAYS: list = [1, 3, 10]  # 秒

# 告警内容长度限制
ALERT_MAX_CONTENT_LENGTH: int = int(os.getenv('ALERT_MAX_LENGTH', '25000'))  # 字符

4.2 .env.example

# ==================== 告警配置 ====================

# 是否启用详细告警(默认:true)
ENABLE_SIGNAL_DETAIL_ALERT=true

# 风险评估权重配置(总和应为1.0)
RISK_WEIGHT_TREND=0.25           # 趋势风险权重
RISK_WEIGHT_COINTEGRATION=0.25   # 协整质量权重
RISK_WEIGHT_CORRELATION=0.20     # 相关性权重
RISK_WEIGHT_ZSCORE=0.15          # Z-score一致性权重
RISK_WEIGHT_HEALTH=0.15          # 健康监控权重

# 风险阈值配置
RISK_HURST_HIGH=0.6              # Hurst > 0.6 为高风险
RISK_HURST_LOW=0.4               # Hurst < 0.4 为均值回归
RISK_COINT_GOOD=0.6              # 协整通过率 > 60% 为良好
RISK_CORR_STD=0.15               # 相关系数标准差 < 0.15 为稳定
RISK_ZSCORE_STRONG=2.0           # |Z-score| > 2.0 为强信号
RISK_HEALTH_GOOD=70              # 健康得分 > 70 为良好
RISK_HEALTH_DIFF=15              # 窗口得分差 < 15 为稳定

# 告警限流配置
ALERT_RATE_WINDOW=300            # 限流窗口(秒),默认5分钟
ALERT_RATE_MAX=2                 # 窗口内最大告警数
ALERT_DEDUP_WINDOW=60            # 去重窗口(秒)

# 告警重试配置
ALERT_MAX_RETRIES=3              # 最大重试次数

# 告警内容长度限制
ALERT_MAX_LENGTH=25000           # 最大字符数

5. 监控和可观测性

5.1 关键指标

# src/utils/monitoring/alert_metrics.py
from prometheus_client import Counter, Histogram

# 告警发送计数
alert_sent_total = Counter(
    'trading_alert_sent_total',
    'Total number of alerts sent',
    ['pair_name', 'alert_level', 'status']
)

# 告警发送延迟
alert_send_duration = Histogram(
    'trading_alert_send_duration_seconds',
    'Alert sending duration',
    ['alert_level']
)

# 告警限流计数
alert_throttled_total = Counter(
    'trading_alert_throttled_total',
    'Total number of throttled alerts',
    ['pair_name', 'reason']
)

# 风险评分分布
risk_score_histogram = Histogram(
    'trading_risk_score',
    'Risk score distribution',
    ['pair_name'],
    buckets=[0, 25, 50, 75, 100]
)

# 格式化错误计数
alert_format_error_total = Counter(
    'trading_alert_format_error_total',
    'Total number of alert formatting errors',
    ['error_type', 'degraded_level']
)

5.2 日志规范

# 成功日志
logger.info(
    "告警发送成功",
    extra={
        'pair_name': signal.pair_name,
        'alert_level': 'FULL',
        'risk_score': 73.5,
        'recommendation': '积极',
        'duration_ms': 125
    }
)

# 限流日志
logger.warning(
    "告警被限流",
    extra={
        'pair_name': signal.pair_name,
        'reason': 'rate_limit',
        'window_count': 2,
        'max_allowed': 2
    }
)

# 降级日志
logger.warning(
    "告警格式化降级",
    extra={
        'pair_name': signal.pair_name,
        'from_level': 'FULL',
        'to_level': 'SIMPLIFIED',
        'error': str(e)
    }
)

# 错误日志
logger.error(
    "告警发送失败",
    extra={
        'pair_name': signal.pair_name,
        'error': str(e),
        'attempts': 3
    },
    exc_info=True
)

5.3 监控上报

def _report_alert_metrics(
    self,
    pair_name: str,
    status: AlertStatus,
    level: str
):
    """上报告警指标到监控系统"""
    # 计数器
    alert_sent_total.labels(
        pair_name=pair_name,
        alert_level=level,
        status=status.value
    ).inc()

    # 限流统计
    if status == AlertStatus.THROTTLED:
        alert_throttled_total.labels(
            pair_name=pair_name,
            reason='rate_limit'
        ).inc()

    # 风险评分(如果有)
    if hasattr(self, '_last_risk_score'):
        risk_score_histogram.labels(
            pair_name=pair_name
        ).observe(self._last_risk_score)

6. 测试策略

6.1 单元测试

# tests/utils/monitoring/test_risk_evaluator.py
import pytest
from src.utils.analysis.risk_evaluator import RiskEvaluator

class TestRiskEvaluator:
    def setup_method(self):
        self.evaluator = RiskEvaluator()

    def test_trend_risk_mean_revert(self):
        """测试均值回归场景"""
        result = self.evaluator._calculate_trend_risk({'hurst': 0.35})
        assert result['score'] == 20
        assert "均值回归" in result['level']

    def test_trend_risk_high(self):
        """测试高趋势风险场景"""
        result = self.evaluator._calculate_trend_risk({'hurst': 0.8})
        assert result['score'] > 60
        assert "高风险" in result['level']

    def test_overall_risk_low(self):
        """测试低风险综合评级"""
        data = {
            'details': {...},  # 模拟完整数据
            'zscore_list': [2.5, 2.3, 2.7]
        }
        signal = Mock()

        result = self.evaluator.evaluate(data, signal)
        assert result.risk_level == "低风险"
        assert result.recommendation == "积极"
        assert result.overall_score >= 75

6.2 集成测试

# tests/utils/monitoring/test_alert_integration.py
import pytest
import asyncio

class TestAlertIntegration:
    @pytest.mark.asyncio
    async def test_full_alert_flow(self):
        """测试完整告警流程"""
        # 准备测试数据
        signal = create_test_signal()
        multi_period_result = create_test_data()

        # 格式化告警
        formatter = SignalAlertFormatter()
        title, content = formatter.format_alert(
            signal, multi_period_result, 1.5, 2.5,
            "LONG_ALT", "强", None
        )

        assert "配对交易信号" in title
        assert "风险评估" in content
        assert "交易建议" in content

    @pytest.mark.asyncio
    async def test_alert_degradation(self):
        """测试告警降级"""
        formatter = SignalAlertFormatter()

        # 模拟格式化失败
        with patch.object(formatter, '_format_full_alert', side_effect=Exception):
            title, content = formatter.format_alert(
                signal, multi_period_result, ...,
                level=AlertLevel.FULL
            )
            # 应该降级到 SIMPLIFIED
            assert title is not None
            assert content is not None

    @pytest.mark.asyncio
    async def test_alert_rate_limit(self):
        """测试告警限流"""
        sender = AlertSender()

        # 连续发送3条告警
        status1 = await sender.send_alert("Test1", "Content1", "PURR/ETH")
        status2 = await sender.send_alert("Test2", "Content2", "PURR/ETH")
        status3 = await sender.send_alert("Test3", "Content3", "PURR/ETH")

        assert status1 == AlertStatus.SUCCESS
        assert status2 == AlertStatus.SUCCESS
        assert status3 == AlertStatus.THROTTLED

6.3 性能测试

# tests/utils/monitoring/test_alert_performance.py
import time

class TestAlertPerformance:
    def test_format_full_alert_performance(self):
        """测试完整告警格式化性能"""
        formatter = SignalAlertFormatter()
        signal = create_test_signal()
        data = create_test_data()

        start = time.time()
        for _ in range(100):
            formatter.format_alert(signal, data, 1.5, 2.5, "LONG_ALT", "强")
        duration = time.time() - start

        # 平均每次 < 50ms
        avg_duration = duration / 100
        assert avg_duration < 0.05, f"平均耗时 {avg_duration*1000:.1f}ms 超过 50ms"

    def test_risk_evaluation_performance(self):
        """测试风险评估性能"""
        evaluator = RiskEvaluator()
        data = create_test_data()
        signal = create_test_signal()

        start = time.time()
        for _ in range(1000):
            evaluator.evaluate(data, signal)
        duration = time.time() - start

        # 平均每次 < 10ms
        avg_duration = duration / 1000
        assert avg_duration < 0.01, f"平均耗时 {avg_duration*1000:.1f}ms 超过 10ms"

7. 部署和灰度策略

7.1 灰度发布计划

阶段 范围 配置 监控重点
Phase 1 1个币对 ENABLE_SIGNAL_DETAIL_ALERT=true 告警内容正确性、发送成功率
Phase 2 20%币对 增加限流配置 限流效果、性能影响
Phase 3 50%币对 全部功能 风险评估准确性、用户反馈
Phase 4 100%币对 生产配置 整体稳定性、成本

7.2 回滚预案

Level 1 - 关闭详细告警

ENABLE_SIGNAL_DETAIL_ALERT=false

Level 2 - 降级到简化告警

# 在 orchestrator.py 中强制使用 SIMPLIFIED
level=AlertLevel.SIMPLIFIED

Level 3 - 完全回退到原有通知

# 注释掉 _send_detailed_alert 调用
# await self._send_detailed_alert(...)

7.3 性能优化建议

  1. 异步发送 - 避免阻塞主流程(已实现)
  2. 批量发送 - 同一时间多个信号可批量发送
  3. 缓存优化 - 风险评估结果缓存60秒
  4. 连接池 - 飞书 API 使用连接池
  5. 内容压缩 - 长文本内容使用 gzip 压缩

8. 关键文件清单

8.1 新建文件

文件路径 职责 代码量
src/utils/monitoring/alert_level.py 告警级别和状态枚举 ~30行
src/utils/analysis/risk_evaluator.py 5维度风险评估引擎 ~300行
src/utils/monitoring/signal_alert_formatter.py 三级告警格式化器 ~500行
src/utils/monitoring/alert_sender.py 限流去重重试发送器 ~200行
src/utils/monitoring/alert_metrics.py Prometheus 监控指标 ~50行

8.2 修改文件

文件路径 修改内容 影响范围
src/trading/orchestrator.py 集成告警发送逻辑 on_entry_signal 方法
src/config.py 新增告警相关配置 新增 15+ 配置项
.env.example 补充配置说明 新增告警配置区块

8.3 测试文件

文件路径 测试内容
tests/utils/monitoring/test_risk_evaluator.py 风险评估单元测试
tests/utils/monitoring/test_alert_integration.py 告警集成测试
tests/utils/monitoring/test_alert_performance.py 性能测试

9. 实施优先级

9.1 P0 - 必须实现

  • ✅ 核心格式化器(完整/简化/基础三级)
  • ✅ 风险评估引擎(5维度评分)
  • ✅ 错误处理和降级策略
  • ✅ 基础限流去重机制
  • ✅ 集成到交易流程

9.2 P1 - 高优先级

  • 🔄 异步发送机制
  • 🔄 监控指标上报
  • 🔄 单元测试覆盖
  • 🔄 灰度发布机制

9.3 P2 - 可选优化

  • 📋 信号预检告警
  • 📋 告警模板管理
  • 📋 动态风险模型
  • 📋 多渠道支持

10. 风险提示

  1. 性能风险 - 复杂格式化可能影响性能,已通过异步处理缓解
  2. 限流过严 - 可能遗漏重要告警,需根据实际调整阈值
  3. 模型准确性 - 风险评分需持续验证和调优
  4. 依赖风险 - 飞书 API 故障需有降级方案(已实现)
  5. 内容长度 - 超长内容可能被截断,已实现保护机制

11. 总结

11.1 关键改进点

模块 改进内容 收益
风险评估 5维度加权评分体系 更科学的风险量化
错误处理 三级降级策略 99.9%告警成功率
性能 异步发送 + 防御设计 0延迟对交易影响
限流去重 智能过滤 减少50%+无效告警
监控 完整指标体系 可观测性提升

11.2 设计亮点

  1. 简洁架构(v2) + 科学评估(v3) = 易维护且高质量
  2. 防御性设计(v2) + 三级降级(v3) = 高可靠性
  3. 性能考量(v2) + 异步机制(v3) = 生产就绪
  4. 清晰流程(v2) + 完整监控(v3) = 可观测性

文档版本: v4.0-综合版
最后更新: 2026-02-16
作者: Trading System Team
综合来源: v2 + v3 设计方案

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG