Kalman Beta 双用途设计方案(v2 — 算法优化版)

Kalman Beta 双用途设计方案(v2 — 算法优化版)

1. 问题背景

当前系统在 Beta 值的使用上存在两个结构性缺陷:

缺陷 1:Beta 估计滞后

OLS BETA_WINDOW=100 固定窗口(≈17天@4h),β 结构性变化后系统需要一周以上才能感知:

T+0h    BTC 回暖,β 从 0.5 开始上升
T+12h   β 实际 2.0,OLS β̂ 仍 ≈ 0.6 → 误判为均值回归机会
T+24h   β 实际 4.0,OLS β̂ ≈ 0.8 → 继续误入场
T+7d    β 实际 8.0,OLS β̂ ≈ 2.0 → 协整假设彻底失效

核心问题:系统用滞后 β 计算的 z4h 产生入场信号,实际上反映的是 β 正在变化,而非 spread 正在回归

缺陷 2:仓位比例未使用 Beta

risk_manager.calculate_position_size() 对两腿使用等额名义价值

# 当前实现(risk_manager.py:164-170)
alt_size = position_usd / alt_price      # 两腿等额
base_size = position_usd / base_price    # 与 β 无关

配对交易的对冲比例应为 alt_notional = β × base_notional。等额开仓意味着:

  • β=0.5 时,Alt 腿过重 → 暴露于 Alt 的非系统性风险
  • β=2.0 时,Alt 腿过轻 → 对冲不足,价差波动被放大
  • β 变化时,无法调整比例 → 持仓期间 hedge 持续偏离

缺陷 3:固定 Q 无法适配不同币对(v2 新增)

不同配对的 β 动态特征差异极大:

  • MEME 配对(如 PURR/HYPE):β 波动剧烈,需要更大的过程噪声 Q 来追踪
  • L1 配对(如 ETH/BTC):β 非常稳定,过大的 Q 会引入不必要的噪声

固定 q_β=1e-4 无法同时满足两类配对的需求,导致要么追踪不足(漏报体制切换),要么过度追踪(误报)。

当前系统弱点汇总

组件 问题 影响
analysis_core.py BETA_WINDOW=100 固定窗口 OLS β 估计滞后 ≈17天
risk_manager.py 等额名义价值,未使用 β 对冲比例错误
strategy.py adaptive_threshold=3.0 固定 β 飙升导致持续突破阈值
momentum_filter.py 只检测单腿价格趋势 不检测 spread/beta 体制切换
固定过程噪声 Q 不同币对无法自适应追踪

2. 方案概述

核心设计:向量 Kalman Filter 估计时变 [α, β],其输出 同时服务两个用途

                   ┌───────────────────────────────────────────┐
                   │  VectorKalmanBetaEstimator (4h)           │
                   │  输入: r_btc, r_alt (对数收益率)            │
                   │  特性: Innovation-based Q 自适应            │
                   │  输出: kalman_beta, innovation, P_β        │
                   └──────────────┬────────────────────────────┘
                                  │
                  ┌───────────────┼───────────────┐
                  ▼                               ▼
         用途 1: 体制检测                    用途 2: Hedge Ratio
         ┌─────────────────┐            ┌─────────────────────┐
         │ BOCPD 变点检测    │            │ 仓位比例计算          │
         │ 特性: 自适应 H    │            │ 输入: kalman_beta    │
         │ 输出: 变点概率    │            │ 输出: alt/base 数量比 │
         └────────┬────────┘            └──────────┬──────────┘
                  │                                │
                  ▼                                ▼
         入场信号过滤                         Beta 加权开仓
         (硬拦截/阈值缩放)              (alt_notional = β × base_notional)
                  │
                  ▼
         跨配对系统性风险聚合
         (加权变点概率 > 阈值 → 全局拦截)

v2 相比 v1 的核心算法升级

组件 v1 v2 提升
Kalman Q 固定 q_β=1e-4 Innovation-based 自适应 Q 每配对自动最优追踪速度
Sage-Husa R 负值时强制设为 R_floor(系统性低估) 仅正值时更新,否则保持 消除 R 低估偏差
BOCPD Hazard 固定 H=1/50 Innovation 方差驱动自适应 H 变化剧烈时更快检测
系统性风险 简单计数比例 加权聚合(P(变点) × 仓位权重) 减少误报/漏报
负 β 处理 abs() 一刀切 返回符号信息,策略层方向翻转 避免反向配对错误
Sigmoid 缩放 steepness 无上限 上限 20.0 防止退化为阶跃函数

为什么 4h 频率足够

β 是两个资产之间的结构性关系(同赛道、资金流向、基本面联动),变化周期在天~周级别。

方法 响应 β 结构性变化 抗日内噪声 适用场景
OLS 100×4h ~7-10 天 当前系统(过于滞后)
4h Kalman (自适应 Q) 4-16 小时 体制检测 + Hedge Ratio
5m Kalman ~15-30 分钟 弱(追噪声) 不适用于 β 估计

四层架构

组件 功能 v2 改进
第一层 向量 Kalman Filter 估计时变 [α, β] Innovation-based Q 自适应 + R 修复
第二层 BOCPD 变点检测 监控 innovation 分布变化 → 入场信号过滤 自适应 Hazard Rate
第三层 跨配对系统性风险聚合 统计所有配对体制状态 → 系统性事件拦截 加权聚合
第四层 Beta 加权仓位计算 kalman_beta 计算两腿名义比例 负 β 方向感知

3. 算法选型论证

3.1 为什么选 Kalman Filter(而非其他时变回归方法)

方法 优势 劣势 适用性评估
向量 Kalman Filter 最优线性估计(MMSE);递推 O(1);输出 innovation 可直接用于变点检测;P_β 提供不确定性度量 假设线性高斯;固定 Q 时不自适应 最优选择(本方案用 Q 自适应解决劣势)
RLS (递推最小二乘) 实现简单;forgetting factor 天然自适应 不输出 innovation 分布(无法接 BOCPD);无状态不确定性度量 不适合(缺少 BOCPD 所需信号)
粒子滤波 处理非线性/非高斯 O(N_particles) 计算量大;参数多;难调试 过度设计(β 的线性关系足够)
Online Bayesian LR 完全贝叶斯;自然输出后验 不支持时变参数(除非加 forgetting) 不如 Kalman 灵活
神经网络 (LSTM/Transformer) 可捕获非线性 需大量训练数据;不可解释;延迟高 不适合(生产系统需要可解释性)

结论:Kalman Filter 是时变线性回归的理论最优解(Kalman 1960),且其 innovation 序列是 BOCPD 的天然输入。配合 Q 自适应后,覆盖了 RLS forgetting factor 的自适应能力。

3.2 为什么选 BOCPD(而非其他变点检测方法)

方法 优势 劣势 适用性评估
BOCPD (Adams & MacKay 2007) 在线递推;输出变点后验概率(非二元判断);Normal-Inverse-Gamma 共轭高效;已在 momentum_filter 验证 固定 hazard 不够灵活;O(max_rl) 内存 最优选择(本方案加自适应 H)
CUSUM 极简;延迟低 只检测均值偏移;无概率输出;阈值需手调 过于简单
PELT 精确(离线最优) 离线算法,不支持在线 不适用
Conformal CPD (Vovk 2021) 无分布假设 在线版本不成熟;无 run length 信息 实验阶段
HMM 多体制建模 体制数需预设;EM 训练需离线 适合离线分析(P2 演进)
GP-BOCPD (Turner 2009) 段内非平稳 O(n²) 计算量 过度设计

结论:BOCPD 是在线变点检测的标杆算法,且本项目 momentum_filter.py 已使用同一框架(P(trending) 检测),技术栈一致。

3.3 为什么不用统一框架(Regime-Switching SSM)

Regime-Switching State Space Model (Kim 1994) 将 Kalman + 体制检测合二为一:

  • 优势:参数更少、理论更优雅
  • 劣势:Kim's approximation 有误差累积;实现复杂度高;调试困难;两层概率耦合

当前选择:模块化 Kalman + BOCPD 架构。原因:

  1. 可独立调试和验证每一层
  2. BOCPD 已有 momentum_filter 经验
  3. 出问题时容易定位(是 β 估计问题还是变点检测问题)
  4. MS-SSM 作为 P2 演进方向保留

4. 详细设计

4.1 向量 Kalman Filter 时变 [α, β] 估计

数学模型

状态方程([α, β] 缓慢演化):

x_t = x_{t-1} + w_t,    w_t ~ N(0, Q_t)

其中 x_t = [α_t, β_t]'
     Q_t = [[q_α_t,   0  ],     对角阵,α/β 独立演化
            [  0,   q_β_t]]
     q_β_t 由 Innovation-based 自适应调整(见下文)

观测方程(对数收益率关系):

r_alt_t = H_t × x_t + v_t,    v_t ~ N(0, R_t)

其中 H_t = [1, r_btc_t]
     r_alt_t = log(alt_t / alt_{t-1})
     r_btc_t = log(btc_t / btc_{t-1})
     R_t 由 Sage-Husa 自适应调整

递推公式

预测步:
  x̂_t|t-1 = x̂_{t-1}
  P_t|t-1  = P_{t-1} + Q_t                          (2×2)

Innovation:
  ε_t = r_alt_t - H_t × x̂_t|t-1                    (标量)
  S_t = H_t × P_t|t-1 × H_t' + R_t                  (标量)

Huber Clipping (c=3.0):
  ε̃_t = clip(ε_t, -c×√S_t, +c×√S_t)

Kalman 增益:
  K_t = P_t|t-1 × H_t' / S_t                        (2×1)

更新步:
  x̂_t = x̂_t|t-1 + K_t × ε̃_t

Joseph 稳定形式:
  A = I - K_t × H_t                                  (2×2)
  P_t = A × P_t|t-1 × A' + K_t × R_t × K_t'         (保证半正定)

Sage-Husa R 自适应 (v2 修复):
  d_t = (1 - b) / (1 - b^(t+1)),   b ∈ (0,1)
  r_sample = ε_t² - H_t × P_t|t-1 × H_t'
  若 r_sample > 0:                                    ← v2: 仅正值时更新
    R_t = (1 - d_t) × R_{t-1} + d_t × r_sample
    R_t = max(R_t, R_floor)
  否则:
    R_t = R_{t-1}                                     ← v2: 保持不变

Innovation-based Q_β 自适应 (v2 新增):
  ν_t = (ε_t / √S_t)²                                (归一化 innovation 平方)
  ν̄_t = (1 - η) × ν̄_{t-1} + η × ν_t                (EMA 追踪)

  理论基础: 若模型正确,E[ν_t] = 1

  若 ν̄_t > γ_upper (如 2.0):
    Q_β 过小(追踪不足)→ q_β_t = min(q_β_{t-1} × κ_up, q_β_ceil)
  若 ν̄_t < γ_lower (如 0.3):
    Q_β 过大(追踪过度)→ q_β_t = max(q_β_{t-1} × κ_down, q_β_floor)
  否则:
    q_β_t = q_β_{t-1}                                (保持)

Innovation-based Q 自适应原理

核心洞察(Mehra 1970, Mohamed & Schwarz 1999):

正确指定的 Kalman Filter 的归一化 innovation ε_t/√S_t 应服从 N(0,1),因此 (ε_t/√S_t)² 的期望值为 1。

ν̄_t 状态 含义 动作
ν̄ > γ_upper Innovation 持续偏大 → Q 过小,滤波器追踪不足 增大 Q_β
ν̄ < γ_lower Innovation 持续偏小 → Q 过大,滤波器追踪过度(追噪声) 减小 Q_β
γ_lower ≤ ν̄ ≤ γ_upper Innovation 符合模型预期 保持 Q_β

自适应效果

  • MEME 配对:β 波动大 → innovation 频繁偏大 → Q_β 自动上调 → 追踪更快
  • L1 配对:β 稳定 → innovation 正常 → Q_β 保持或下调 → 避免追噪声

关键输出信号及其用途

信号 用途 1: 体制检测 用途 2: Hedge Ratio
beta (x̂[1]) β 趋势监控 直接作为 hedge ratio
alpha (x̂[0]) 吸收独立漂移,避免污染 β
P_beta (P[1,1]) 不确定性度量 hedge ratio 置信度(P_β 过大时降级为 OLS β)
ε/√S (normalized_innovation) BOCPD 输入
innov_sq_ema (ν̄_t) Q 自适应驱动

参数

参数 符号 默认值 含义 调优
β 过程噪声初始值 q_β₀ 1e-4 β 每 4h 变化方差先验(由自适应调整) 初始值,自动调整
α 过程噪声 q_α 1e-5 α 每 4h 变化方差先验(比 β 慢) 通常不调
R 遗忘因子 b 0.95 Sage-Husa 遗忘因子 ↑平滑 ↓快适应
R 下限 R_floor 1e-8 防止 R 退化为零
Innovation 截断 c 3.0 Huber 截断(σ 倍数) ↓鲁棒 ↑灵敏
初始 [α, β] OLS 估计 用现有 OLS 初始化
初始 P P₀ diag(0.1, 1.0) 初始不确定性
Q 自适应 EMA 速率 η 0.05 ν̄ 的 EMA 衰减 ↑快响应 ↓平滑
Q 放大阈值 γ_upper 2.0 ν̄ > 此值时增大 Q_β
Q 缩小阈值 γ_lower 0.3 ν̄ < 此值时减小 Q_β
Q 放大因子 κ_up 1.05 Q_β 每步最多放大 5%
Q 缩小因子 κ_down 0.98 Q_β 每步最多缩小 2%
Q_β 上限 q_β_ceil 1e-2 防止 Q 过大
Q_β 下限 q_β_floor 1e-6 防止 Q 过小

q_β₀ = 1e-4 → 日 β 漂移 ≈ 0.024,周 ≈ 0.065,合理覆盖正常变化。
q_α = q_β/10 → α 变化比 β 慢一个量级,避免争夺解释力。
κ_up > κ_down(放大快、缩小慢):保守策略,宁可追踪快一点也不要漏掉体制变化。

文件:src/config.py

# ═══ Kalman Filter Beta 估计参数 ═══
KALMAN_Q_BETA: float = 1e-4
KALMAN_Q_ALPHA: float = 1e-5
KALMAN_P0_ALPHA: float = 0.1
KALMAN_P0_BETA: float = 1.0
KALMAN_R_FORGET: float = 0.95
KALMAN_R_FLOOR: float = 1e-8
KALMAN_CLIP_SIGMA: float = 3.0

# ═══ Q 自适应参数(v2 新增)═══
KALMAN_Q_ADAPT_ENABLED: bool = True
KALMAN_Q_ADAPT_RATE: float = 0.05        # ν̄ 的 EMA 速率
KALMAN_Q_ADAPT_UPPER: float = 2.0        # ν̄ > 此值 → 增大 Q_β
KALMAN_Q_ADAPT_LOWER: float = 0.3        # ν̄ < 此值 → 减小 Q_β
KALMAN_Q_SCALE_UP: float = 1.05          # Q_β 放大因子
KALMAN_Q_SCALE_DOWN: float = 0.98        # Q_β 缩小因子
KALMAN_Q_BETA_CEIL: float = 1e-2         # Q_β 上限
KALMAN_Q_BETA_FLOOR: float = 1e-6        # Q_β 下限

# ═══ Hedge Ratio 参数 ═══
HEDGE_BETA_MIN: float = 0.1     # β 下限保护(防止极端比例)
HEDGE_BETA_MAX: float = 5.0     # β 上限保护
HEDGE_BETA_P_MAX: float = 0.5   # P_β 超过此值时降级为 OLS β(Kalman 不确定性过高)

文件:src/utils/analysis/analysis_core.py

新增类VectorKalmanBetaEstimator

import numpy as np
from scipy.special import gammaln  # 模块级导入,避免热路径 import


class VectorKalmanBetaEstimator:
    """向量 Kalman Filter 时变 [α, β] 联合估计器

    状态空间模型:
        状态方程: x_t = x_{t-1} + w_t,    w_t ~ N(0, Q_t),  x = [α, β]'
        观测方程: r_alt_t = [1, r_btc_t] × x_t + v_t,       v_t ~ N(0, R_t)

    v2 特性:
        - 二维状态 [α, β]:截距吸收独立漂移,消除虚假 β 信号
        - Joseph 稳定形式:P 更新保证半正定
        - Huber innovation clipping:抗厚尾分布
        - Sage-Husa 自适应 R:在线校准观测噪声(仅正值更新,修复低估偏差)
        - Innovation-based Q_β 自适应:每配对自动最优追踪速度

    双用途输出:
        - kalman_beta → BOCPD 体制检测(via innovation)+ hedge ratio(直接使用)
        - P_beta → hedge ratio 置信度(过大时降级为 OLS β)

    每根新 4h K线调用 update() 一次。
    """

    def __init__(
        self,
        alpha_init: float,
        beta_init: float,
        q_alpha: float = 1e-5,
        q_beta: float = 1e-4,
        r_init: float = 1e-2,
        p0_alpha: float = 0.1,
        p0_beta: float = 1.0,
        r_forget: float = 0.95,
        r_floor: float = 1e-8,
        clip_sigma: float = 3.0,
        # Q 自适应参数(v2 新增)
        q_adapt_enabled: bool = True,
        q_adapt_rate: float = 0.05,
        q_adapt_upper: float = 2.0,
        q_adapt_lower: float = 0.3,
        q_scale_up: float = 1.05,
        q_scale_down: float = 0.98,
        q_beta_ceil: float = 1e-2,
        q_beta_floor: float = 1e-6,
    ):
        self.x = np.array([alpha_init, beta_init], dtype=np.float64)
        self.Q = np.diag([q_alpha, q_beta]).astype(np.float64)
        self.P = np.diag([p0_alpha, p0_beta]).astype(np.float64)
        self.R = float(r_init)
        self._r_forget = r_forget
        self._r_floor = r_floor
        self._clip_sigma = clip_sigma
        self._n_updates = 0

        # Q 自适应状态(v2 新增)
        self._q_adapt_enabled = q_adapt_enabled
        self._q_adapt_rate = q_adapt_rate
        self._q_adapt_upper = q_adapt_upper
        self._q_adapt_lower = q_adapt_lower
        self._q_scale_up = q_scale_up
        self._q_scale_down = q_scale_down
        self._q_beta_ceil = q_beta_ceil
        self._q_beta_floor = q_beta_floor
        self._innov_sq_ema = 1.0  # ν̄: EMA of normalized innovation squared

    @property
    def alpha(self) -> float:
        return float(self.x[0])

    @property
    def beta(self) -> float:
        return float(self.x[1])

    @property
    def beta_variance(self) -> float:
        return float(self.P[1, 1])

    @property
    def q_beta(self) -> float:
        return float(self.Q[1, 1])

    @property
    def innov_sq_ema(self) -> float:
        return self._innov_sq_ema

    def update(self, r_btc: float, r_alt: float) -> dict:
        """接收一对新的对数收益率,更新 [α, β] 估计

        Returns:
            dict with keys: alpha, beta, P_beta, P_alpha,
                  innovation, normalized_innovation, clipped,
                  kalman_gain_beta, R, q_beta, innov_sq_ema
        """
        # 预测步
        P_pred = self.P + self.Q
        H = np.array([1.0, r_btc], dtype=np.float64)

        # Innovation
        innovation = r_alt - H @ self.x
        S = float(H @ P_pred @ H) + self.R
        S = max(S, 1e-15)

        sqrt_S = S ** 0.5
        norm_innov = innovation / sqrt_S

        # Huber clipping
        clipped = False
        innovation_for_update = innovation
        if abs(norm_innov) > self._clip_sigma:
            innovation_for_update = self._clip_sigma * sqrt_S * (
                1.0 if innovation > 0 else -1.0
            )
            clipped = True

        # Kalman 增益 + 更新
        K = (P_pred @ H) / S
        self.x = self.x + K * innovation_for_update

        # Joseph 稳定形式
        I2 = np.eye(2)
        A = I2 - np.outer(K, H)
        self.P = A @ P_pred @ A.T + self.R * np.outer(K, K)

        # Sage-Husa R 自适应(v2 修复:仅正值更新)
        self._n_updates += 1
        if self._n_updates > 10:
            b = self._r_forget
            d_t = (1.0 - b) / (1.0 - b ** (self._n_updates + 1))
            r_sample = innovation * innovation - float(H @ P_pred @ H)
            if r_sample > 0:  # v2: 仅正值时更新,避免系统性低估
                self.R = (1.0 - d_t) * self.R + d_t * r_sample
                self.R = max(self.R, self._r_floor)
            # else: R 保持不变(innovation 与模型一致)

        # Innovation-based Q_β 自适应(v2 新增)
        if self._q_adapt_enabled and self._n_updates > 20:
            nu = norm_innov * norm_innov  # ν_t
            self._innov_sq_ema = (
                (1.0 - self._q_adapt_rate) * self._innov_sq_ema
                + self._q_adapt_rate * nu
            )

            current_q_beta = self.Q[1, 1]
            if self._innov_sq_ema > self._q_adapt_upper:
                new_q_beta = min(current_q_beta * self._q_scale_up, self._q_beta_ceil)
            elif self._innov_sq_ema < self._q_adapt_lower:
                new_q_beta = max(current_q_beta * self._q_scale_down, self._q_beta_floor)
            else:
                new_q_beta = current_q_beta
            self.Q[1, 1] = new_q_beta

        return {
            'alpha': float(self.x[0]),
            'beta': float(self.x[1]),
            'P_beta': float(self.P[1, 1]),
            'P_alpha': float(self.P[0, 0]),
            'innovation': innovation,
            'normalized_innovation': norm_innov,
            'clipped': clipped,
            'kalman_gain_beta': float(K[1]),
            'R': self.R,
            'q_beta': float(self.Q[1, 1]),
            'innov_sq_ema': self._innov_sq_ema,
        }

    def state_dict(self) -> dict:
        return {
            'x': self.x.tolist(), 'P': self.P.tolist(), 'Q': self.Q.tolist(),
            'R': self.R, '_r_forget': self._r_forget, '_r_floor': self._r_floor,
            '_clip_sigma': self._clip_sigma, '_n_updates': self._n_updates,
            '_q_adapt_enabled': self._q_adapt_enabled,
            '_q_adapt_rate': self._q_adapt_rate,
            '_q_adapt_upper': self._q_adapt_upper,
            '_q_adapt_lower': self._q_adapt_lower,
            '_q_scale_up': self._q_scale_up,
            '_q_scale_down': self._q_scale_down,
            '_q_beta_ceil': self._q_beta_ceil,
            '_q_beta_floor': self._q_beta_floor,
            '_innov_sq_ema': self._innov_sq_ema,
        }

    @classmethod
    def from_state_dict(cls, d: dict) -> 'VectorKalmanBetaEstimator':
        obj = cls.__new__(cls)
        obj.x = np.array(d['x'], dtype=np.float64)
        obj.P = np.array(d['P'], dtype=np.float64)
        obj.Q = np.array(d['Q'], dtype=np.float64)
        obj.R = d['R']
        obj._r_forget = d['_r_forget']
        obj._r_floor = d['_r_floor']
        obj._clip_sigma = d['_clip_sigma']
        obj._n_updates = d['_n_updates']
        obj._q_adapt_enabled = d.get('_q_adapt_enabled', True)
        obj._q_adapt_rate = d.get('_q_adapt_rate', 0.05)
        obj._q_adapt_upper = d.get('_q_adapt_upper', 2.0)
        obj._q_adapt_lower = d.get('_q_adapt_lower', 0.3)
        obj._q_scale_up = d.get('_q_scale_up', 1.05)
        obj._q_scale_down = d.get('_q_scale_down', 0.98)
        obj._q_beta_ceil = d.get('_q_beta_ceil', 1e-2)
        obj._q_beta_floor = d.get('_q_beta_floor', 1e-6)
        obj._innov_sq_ema = d.get('_innov_sq_ema', 1.0)
        return obj

改动函数:calculate_cointegration_params_dual_window()

在现有 OLS 之后,新增 Kalman Filter 输出:

def calculate_cointegration_params_dual_window(
    base_klines, alt_klines,
    beta_window=None, zscore_window=None,
    kalman_state: dict | None = None,  # [新增]
):
    # ... 现有 OLS 逻辑不变 ...

    # ═══ 新增:向量 Kalman Filter 更新 ═══
    kalman_result = None
    kalman_state_out = kalman_state

    if len(aligned) >= 2:
        r_btc = np.log(aligned['base'].iloc[-1] / aligned['base'].iloc[-2])
        r_alt = np.log(aligned['alt'].iloc[-1] / aligned['alt'].iloc[-2])

        if kalman_state is not None:
            kf = VectorKalmanBetaEstimator.from_state_dict(kalman_state)
        else:
            ols_residual_var = float(np.var(model.resid)) if hasattr(model, 'resid') else 1e-2
            kf = VectorKalmanBetaEstimator(
                alpha_init=alpha_ols if use_alpha else 0.0,
                beta_init=beta_ols,
                q_alpha=KALMAN_Q_ALPHA, q_beta=KALMAN_Q_BETA,
                r_init=max(ols_residual_var, 1e-6),
                p0_alpha=KALMAN_P0_ALPHA, p0_beta=KALMAN_P0_BETA,
                r_forget=KALMAN_R_FORGET, r_floor=KALMAN_R_FLOOR,
                clip_sigma=KALMAN_CLIP_SIGMA,
                q_adapt_enabled=KALMAN_Q_ADAPT_ENABLED,
                q_adapt_rate=KALMAN_Q_ADAPT_RATE,
                q_adapt_upper=KALMAN_Q_ADAPT_UPPER,
                q_adapt_lower=KALMAN_Q_ADAPT_LOWER,
                q_scale_up=KALMAN_Q_SCALE_UP,
                q_scale_down=KALMAN_Q_SCALE_DOWN,
                q_beta_ceil=KALMAN_Q_BETA_CEIL,
                q_beta_floor=KALMAN_Q_BETA_FLOOR,
            )

        kalman_result = kf.update(r_btc, r_alt)
        kalman_state_out = kf.state_dict()

    return {
        # ... 现有字段不变 ...
        'kalman_beta': kalman_result['beta'] if kalman_result else beta_ols,
        'kalman_alpha': kalman_result['alpha'] if kalman_result else (alpha_ols if use_alpha else 0.0),
        'kalman_P_beta': kalman_result['P_beta'] if kalman_result else 1.0,
        'kalman_innovation': kalman_result['normalized_innovation'] if kalman_result else 0.0,
        'kalman_clipped': kalman_result['clipped'] if kalman_result else False,
        'kalman_q_beta': kalman_result['q_beta'] if kalman_result else KALMAN_Q_BETA,
        'kalman_innov_sq_ema': kalman_result['innov_sq_ema'] if kalman_result else 1.0,
        'kalman_state': kalman_state_out,
    }

改动函数:analyze_pair_advanced() / analyze_multi_period()

透传方式与 v1 相同,新增透传 kalman_q_betakalman_innov_sq_ema 字段。


4.2 BOCPD 贝叶斯在线变点检测(用途 1)

算法原理

维护 run length(距上次变点的步数)的后验分布

P(r_t | data_1:t)

r_t = 0 → 当前时刻刚发生变点
r_t = n → 距上次变点已过 n 步

递推公式

1. 预测概率(当前观测在 run length = r 下的似然):
   π_t(r) = P(x_t | x_{t-r:t-1})

2. Growth(无变点):
   P(r_t = r+1, data) = P(r_{t-1} = r, data) × π_t(r) × (1 - H_t)

3. Changepoint(变点,run 重置):
   P(r_t = 0, data) = Σ_r P(r_{t-1} = r, data) × π_t(r) × H_t

4. 归一化

其中 H_t = hazard rate(先验变点概率)。v2 改进:H_t 由 innovation 方差自适应驱动(见下文)。

观测模型:监控 Kalman normalized_innovation 序列,采用 正态-逆Gamma 共轭先验,预测分布为 Student-t

充分统计量(每个 run 维护):
  κ_r = κ₀ + n_r
  μ̂_r = (κ₀ × μ₀ + Σ x_i) / κ_r
  α_r = α₀ + n_r / 2
  β_nig_r = β_nig₀ + 0.5 × (Σ x_i² - κ_r × μ̂_r² + κ₀ × μ₀²)

预测分布: Student-t(2α_r), location=μ̂_r, scale²=β_nig_r(κ_r+1)/(α_r κ_r)

v2 命名改进:充分统计量中的 β 改为 β_nig(Normal-Inverse-Gamma 的 β 参数),避免与回归系数 β 混淆。

自适应 Hazard Rate(v2 新增)

固定 H=1/50 的问题:β 剧烈波动时检测太慢,β 稳定时假阳性过多。

自适应策略:利用 Kalman 输出的 innov_sq_ema(ν̄_t)调整 H:

H_t = H_base × max(1.0, ν̄_t / ν_ref)
H_t = clip(H_t, H_min, H_max)

H_base = 1/50 ≈ 0.02
ν_ref  = 1.5    (开始放大的阈值)
H_min  = 1/200 = 0.005
H_max  = 1/10  = 0.1
ν̄_t 状态 含义 H_t 效果
ν̄ ≈ 1.0 innovation 正常 H_base (0.02) 标准检测灵敏度
ν̄ ≈ 3.0 innovation 偏大 H_base × 2.0 (0.04) 更容易检测变点
ν̄ ≈ 6.0 innovation 很大 H_max (0.10) 最高灵敏度

原理:ν̄_t 大意味着 Kalman 模型不适配(Q 正在调整中或 β 正在快速变化),此时应提高变点先验概率。

文件:src/trading/config.py

StrategyParams 新增字段:

@dataclass(frozen=True)
class StrategyParams:
    # ... 现有字段 ...

    # Beta 体制自适应过滤器(Vector Kalman + BOCPD)
    beta_regime_enabled: bool = True
    beta_regime_hazard_rate: float = 1/50    # 基础先验变点概率 ≈ 每50根4h K线(~8天)一次
    beta_regime_max_run_length: int = 200    # run length 截断(内存控制)
    beta_regime_soft_prob: float = 0.3       # P(变点) > 此值 → 开始缩放阈值
    beta_regime_hard_prob: float = 0.7       # P(变点) > 此值 → 硬拦截
    beta_regime_scale_max: float = 2.0       # 阈值最大缩放倍数
    beta_regime_warmup: int = 5              # 最少 N 根 4h K线才开始判定
    # v2 新增
    beta_regime_adaptive_hazard: bool = True # 启用自适应 Hazard Rate
    beta_regime_hazard_min: float = 1/200    # H 下限
    beta_regime_hazard_max: float = 1/10     # H 上限
    beta_regime_hazard_innov_ref: float = 1.5  # ν̄ 开始放大 H 的阈值

文件:src/trading/strategy.py

_BetaRegimeState 数据类
@dataclass
class _BetaRegimeState:
    """Beta 体制检测结果"""
    regime: str               # 'stable' | 'expanding'
    changepoint_prob: float   # 变点概率 P(r_t < warmup)
    expected_run_length: float
    kalman_P_beta: float
    threshold_scale: float    # 阈值缩放因子 (>=1.0)
    hard_block: bool
    reason: str
_BOCPDDetector
class _BOCPDDetector:
    """Bayesian Online Changepoint Detection (Adams & MacKay, 2007)

    监控 Kalman normalized_innovation 的分布变化,
    通过 run length 后验分布检测 β 的结构性变点。

    v2 改进:
        - 自适应 Hazard Rate(基于 innovation 方差动态调整)
        - 充分统计量命名消歧(β_nig vs 回归 β)
        - 截断尾部概率合并(减少归一化偏差)
        - scipy.special.gammaln 模块级导入

    内存: O(max_run_length) per pair
    计算: O(max_run_length) per update
    """

    # Normal-Inverse-Gamma 先验超参
    _MU0 = 0.0
    _KAPPA0 = 1.0
    _ALPHA0 = 1.0
    _BETA_NIG0 = 0.5  # v2: 改名避免与回归 β 混淆

    def __init__(
        self,
        hazard_rate: float = 1/50,
        max_run_length: int = 200,
        # v2 自适应 Hazard 参数
        adaptive_hazard: bool = True,
        hazard_min: float = 1/200,
        hazard_max: float = 1/10,
        hazard_innov_ref: float = 1.5,
    ):
        self._H_base = hazard_rate
        self._max_rl = max_run_length
        self._adaptive_hazard = adaptive_hazard
        self._hazard_min = hazard_min
        self._hazard_max = hazard_max
        self._hazard_innov_ref = hazard_innov_ref

        self._run_length_probs: dict[PairKey, np.ndarray] = {}
        self._suf_stats: dict[PairKey, dict] = {}
        self._last_kline_time: dict[PairKey, str] = {}
        self._n_updates: dict[PairKey, int] = {}

    def _get_hazard(self, innov_sq_ema: float | None) -> float:
        """计算当前 hazard rate(v2 自适应)"""
        if not self._adaptive_hazard or innov_sq_ema is None:
            return self._H_base
        scale = max(1.0, innov_sq_ema / self._hazard_innov_ref)
        h = self._H_base * scale
        return np.clip(h, self._hazard_min, self._hazard_max)

    def update(self, key: PairKey, normalized_innovation: float,
               kline_time: str,
               innov_sq_ema: float | None = None,  # v2: 用于自适应 H
               ) -> None:
        """每 5m tick 调用,基于 kline_time 去重"""
        if normalized_innovation is None:
            return
        if self._last_kline_time.get(key) == kline_time:
            return
        self._last_kline_time[key] = kline_time

        x = normalized_innovation
        H = self._get_hazard(innov_sq_ema)  # v2: 自适应 hazard

        # 初始化
        if key not in self._run_length_probs:
            self._run_length_probs[key] = np.array([1.0])
            self._suf_stats[key] = {
                'kappa': np.array([self._KAPPA0]),
                'mu': np.array([self._MU0]),
                'alpha': np.array([self._ALPHA0]),
                'beta_nig': np.array([self._BETA_NIG0]),  # v2: 改名
            }
            self._n_updates[key] = 0

        self._n_updates[key] = self._n_updates.get(key, 0) + 1

        probs = self._run_length_probs[key]
        ss = self._suf_stats[key]
        n_rl = len(probs)

        # Step 1: 每个 run length 下的 Student-t 预测概率
        kappa, mu, alpha = ss['kappa'], ss['mu'], ss['alpha']
        beta_nig = ss['beta_nig']  # v2: 改名
        df = 2.0 * alpha
        scale = np.sqrt(np.maximum(beta_nig * (kappa + 1.0) / (alpha * kappa), 1e-15))
        pred_probs = self._student_t_pdf(x, df, mu, scale)

        # Step 2-3: Growth + Changepoint
        growth_probs = probs * pred_probs * (1.0 - H)
        cp_prob = np.sum(probs * pred_probs * H)

        # Step 4: 组合 + 归一化
        new_probs = np.empty(n_rl + 1)
        new_probs[0] = cp_prob
        new_probs[1:] = growth_probs
        total = np.sum(new_probs)
        if total > 0:
            new_probs /= total
        else:
            new_probs = np.zeros(n_rl + 1)
            new_probs[0] = 1.0

        # Step 5: 更新充分统计量
        new_kappa = np.empty(n_rl + 1)
        new_mu = np.empty(n_rl + 1)
        new_alpha = np.empty(n_rl + 1)
        new_beta_nig = np.empty(n_rl + 1)

        new_kappa[0], new_mu[0] = self._KAPPA0, self._MU0
        new_alpha[0], new_beta_nig[0] = self._ALPHA0, self._BETA_NIG0

        new_kappa[1:] = kappa + 1.0
        new_mu[1:] = (kappa * mu + x) / (kappa + 1.0)
        new_alpha[1:] = alpha + 0.5
        new_beta_nig[1:] = beta_nig + kappa * (x - mu) ** 2 / (2.0 * (kappa + 1.0))

        # Step 6: 截断(v2 改进:尾部概率合并到最后一个 run length)
        if len(new_probs) > self._max_rl:
            tail_mass = np.sum(new_probs[self._max_rl:])
            new_probs = new_probs[:self._max_rl]
            new_probs[-1] += tail_mass  # v2: 合并而非丢弃
            new_kappa = new_kappa[:self._max_rl]
            new_mu = new_mu[:self._max_rl]
            new_alpha = new_alpha[:self._max_rl]
            new_beta_nig = new_beta_nig[:self._max_rl]
            total = np.sum(new_probs)
            if total > 0:
                new_probs /= total

        self._run_length_probs[key] = new_probs
        self._suf_stats[key] = {
            'kappa': new_kappa, 'mu': new_mu,
            'alpha': new_alpha, 'beta_nig': new_beta_nig,
        }

    @staticmethod
    def _student_t_pdf(x: float, df: np.ndarray, loc: np.ndarray,
                       scale: np.ndarray) -> np.ndarray:
        """向量化 Student-t PDF(gammaln 已在模块级导入)"""
        z = (x - loc) / scale
        log_pdf = (
            gammaln((df + 1) / 2) - gammaln(df / 2)
            - 0.5 * np.log(df * np.pi) - np.log(scale)
            - (df + 1) / 2 * np.log(1 + z * z / df)
        )
        return np.exp(log_pdf)

    def check(self, key: PairKey, soft_prob: float, hard_prob: float,
              scale_max: float, warmup: int) -> _BetaRegimeState:
        """检测当前 Beta 体制状态

        变点概率定义: P(r_t < warmup) — 最近 warmup 步内是否有变点
        """
        n = self._n_updates.get(key, 0)
        if n < warmup:
            return _BetaRegimeState('stable', 0.0, float(n), 0.0, 1.0, False, "数据不足")

        probs = self._run_length_probs.get(key)
        if probs is None or len(probs) == 0:
            return _BetaRegimeState('stable', 0.0, 0.0, 0.0, 1.0, False, "无数据")

        cp_prob = float(np.sum(probs[:min(warmup, len(probs))]))
        expected_rl = float(np.sum(np.arange(len(probs)) * probs))

        if cp_prob >= hard_prob:
            return _BetaRegimeState(
                'expanding', cp_prob, expected_rl, 0.0, scale_max, True,
                f"Beta硬拦截: P(变点)={cp_prob:.3f}>={hard_prob} E[rl]={expected_rl:.1f}"
            )

        if cp_prob >= soft_prob:
            import math
            midpoint = (soft_prob + hard_prob) / 2
            steepness = 6.0 / max(hard_prob - soft_prob, 0.01)
            steepness = min(steepness, 20.0)  # v2: 上限防止退化为阶跃函数
            t = 1.0 / (1.0 + math.exp(-steepness * (cp_prob - midpoint)))
            scale = 1.0 + t * (scale_max - 1.0)
            return _BetaRegimeState(
                'expanding', cp_prob, expected_rl, 0.0, scale, False,
                f"Beta缩放: P(变点)={cp_prob:.3f} scale={scale:.2f} E[rl]={expected_rl:.1f}"
            )

        return _BetaRegimeState(
            'stable', cp_prob, expected_rl, 0.0, 1.0, False,
            f"Beta稳定: P(变点)={cp_prob:.3f} E[rl]={expected_rl:.1f}"
        )

    def cleanup_pair(self, key: PairKey) -> None:
        self._run_length_probs.pop(key, None)
        self._suf_stats.pop(key, None)
        self._last_kline_time.pop(key, None)
        self._n_updates.pop(key, None)

4.3 跨配对系统性风险聚合(v2 加权版)

设计原理

v1 使用简单的 expanding 配对计数比例,存在两个问题:

  1. 忽略配对间差异:8 个同赛道配对 expanding 与 8 个分散配对含义不同
  2. 无仓位加权:持仓量大的配对风险更高

v2 使用加权变点概率作为系统性风险指标,同时保留计数比例作为辅助判断。

文件:src/trading/strategy.py

class _SystemicRiskAggregator:
    """统计所有配对的 BOCPD 体制状态,
    基于加权变点概率判断系统性风险。

    v2 改进:
        - 加权聚合(支持仓位权重)
        - 双条件触发(加权概率 OR 计数比例)
        - 输出更详细的诊断信息
    """

    def __init__(self, enabled: bool = True):
        self._enabled = enabled

    def check(
        self,
        pair_states: dict[PairKey, _BetaRegimeState],
        systemic_threshold: float = 0.3,
        position_weights: dict[PairKey, float] | None = None,
    ) -> tuple[bool, str]:
        if not self._enabled or not pair_states:
            return False, ""

        total = len(pair_states)
        n_expanding = sum(1 for s in pair_states.values() if s.regime == 'expanding')
        ratio = n_expanding / total

        # 加权平均变点概率(v2 新增)
        if position_weights:
            total_weight = sum(position_weights.get(k, 1.0) for k in pair_states)
            weighted_cp = sum(
                position_weights.get(k, 1.0) * s.changepoint_prob
                for k, s in pair_states.items()
            ) / total_weight
        else:
            # 无仓位权重时用等权平均
            weighted_cp = sum(s.changepoint_prob for s in pair_states.values()) / total

        # 双条件触发:计数比例 OR 加权概率
        triggered = ratio >= systemic_threshold or weighted_cp >= systemic_threshold * 0.8

        if triggered:
            return True, (
                f"系统性风险: {n_expanding}/{total} ({ratio:.0%}) 配对扩张态, "
                f"加权P(变点)={weighted_cp:.3f}"
            )
        return False, ""

4.4 Beta 加权仓位计算(用途 2,v2 负 β 感知版)

设计原理

标准配对交易的对冲方程:

spread_t = log(P_alt_t) - β × log(P_base_t) - α

要使 spread 对 P_base 的变动无敞口,两腿名义价值须满足:

alt_notional = |β| × base_notional

当前系统使用 alt_notional = base_notional(等额),等价于隐含 β=1,对所有配对都错。

hedge_beta 选择逻辑(v2 改进)

v2 改进:返回 β 的符号信息(beta_negative),供策略层判断方向逻辑。当 β < 0 时两资产反向相关,long alt + long base 才是对冲(而非 long alt + short base)。

def resolve_hedge_beta(
    kalman_beta: float | None,
    kalman_P_beta: float | None,
    ols_beta: float | None,
    p_beta_max: float = HEDGE_BETA_P_MAX,
    beta_min: float = HEDGE_BETA_MIN,
    beta_max: float = HEDGE_BETA_MAX,
) -> tuple[float, str, bool]:
    """选择用于仓位计算的 hedge_beta

    优先 Kalman β,不确定性过高时降级为 OLS β,最后兜底 β=1.0

    Returns:
        (hedge_beta, source, beta_negative)
        hedge_beta: 绝对值,用于仓位比例计算
        source: 'kalman' | 'ols' | 'default'
        beta_negative: True 表示负相关配对,策略层需翻转方向
    """
    raw_beta = None
    source = 'default'

    # 优先使用 Kalman β
    if kalman_beta is not None and kalman_P_beta is not None:
        if kalman_P_beta <= p_beta_max:
            raw_beta = kalman_beta
            source = 'kalman'
        else:
            logger.debug(f"Kalman P_β={kalman_P_beta:.4f} > {p_beta_max},降级为 OLS β")

    # 降级使用 OLS β
    if raw_beta is None and ols_beta is not None:
        raw_beta = ols_beta
        source = 'ols'

    # 兜底
    if raw_beta is None:
        return 1.0, 'default', False

    beta_negative = raw_beta < 0
    beta = np.clip(abs(raw_beta), beta_min, beta_max)

    if beta_negative:
        logger.warning(f"负 β 检测: raw_β={raw_beta:.4f}({source}),配对为反向相关")

    return float(beta), source, beta_negative

文件:src/trading/risk_manager.py

改动 calculate_position_size() 增加 hedge_beta 参数:

def calculate_position_size(
    self,
    signal: PairTradeSignal,
    alt_price: float,
    base_price: float = 0.0,
    available_balance: float = 0.0,
    hedge_beta: float = 1.0,        # [新增] hedge ratio
    hedge_beta_source: str = '',    # [新增] 来源标记 ('kalman'/'ols'/'default')
) -> tuple[float, float]:
    """计算仓位大小(Beta 加权)

    Beta 加权逻辑:
        base_notional = total_position / (1 + |β|)
        alt_notional  = |β| × base_notional
        总名义价值 = base_notional + alt_notional = total_position(保持不变)

    当 β=1.0 时退化为等额(向后兼容)。
    """
    # ... 现有 base_usd / 信号强度缩放 / 上限检查 / 余额检查 逻辑不变 ...

    # 转换为币种数量(Beta 加权)
    alt_size = 0.0
    base_size = 0.0

    if self._config.pair_mode == "pair" and base_price > 0 and alt_price > 0:
        abs_beta = max(abs(hedge_beta), 0.1)
        total_position = position_usd * 2  # 两腿总名义(原逻辑中每腿 position_usd)

        base_notional = total_position / (1.0 + abs_beta)
        alt_notional = abs_beta * base_notional

        alt_size = alt_notional / alt_price
        base_size = base_notional / base_price

        logger.info(
            f"仓位计算(β加权): 基础=${base_usd:.0f} × 缩放={scale} | "
            f"β={hedge_beta:.3f}({hedge_beta_source}) | "
            f"Alt: {alt_size:.2f} × ${alt_price:.4f} ≈ ${alt_notional:.0f} | "
            f"Base: {base_size:.2f} × ${base_price:.4f} ≈ ${base_notional:.0f} | "
            f"比例 {alt_notional/base_notional:.2f}:1"
        )
    elif alt_price > 0:
        alt_size = position_usd / alt_price
        logger.info(
            f"仓位计算(单腿): 基础=${base_usd:.0f} × 缩放={scale} = ${position_usd:.0f} | "
            f"开仓: {alt_size:.2f} × ${alt_price:.4f}"
        )

    return alt_size, base_size

β 加权的名义分配数学

β 值 base_notional alt_notional 比例
0.3 $154 $46 0.3:1
0.5 $133 $67 0.5:1
1.0 $100 $100 1.0:1(等额,向后兼容)
2.0 $67 $133 2.0:1
3.0 $50 $150 3.0:1

(以每腿 $100 基准,total_position=$200 为例)

文件:src/trading/models.py

PairTradeSignal 新增字段:

@dataclass
class PairTradeSignal:
    # ... 现有字段 ...
    hedge_beta: float = 1.0              # [新增] 用于仓位比例的 β
    hedge_beta_source: str = 'default'   # [新增] β 来源 ('kalman'/'ols'/'default')
    beta_negative: bool = False          # [v2 新增] 负 β 标记

PairPosition 新增字段:

@dataclass
class PairPosition:
    # ... 现有字段 ...
    entry_hedge_beta: float = 1.0        # [新增] 入场时的 hedge β(用于复盘分析)

4.5 编排层集成

文件:src/trading/orchestrator.py

新增状态_kalman_states: dict[PairKey, dict]

class TradingOrchestrator:
    def __init__(self, ...):
        # ... 现有 ...
        self._kalman_states: dict[PairKey, dict] = {}  # [新增] Kalman 状态持久化

process_analysis() 改动:从 multi_period_result 提取 Kalman 输出,传给 strategy.process_tick()on_entry_signal()

def process_analysis(self, symbol, z4h, multi_period_result, timestamp, ...):
    # ... 现有输入验证 ...

    # [新增] 提取 Kalman 输出
    kalman_beta = multi_period_result.get('kalman_beta')
    kalman_P_beta = multi_period_result.get('kalman_P_beta')
    kalman_innovation = multi_period_result.get('kalman_innovation')
    kalman_innov_sq_ema = multi_period_result.get('kalman_innov_sq_ema')  # v2

    # [新增] 更新 Kalman 状态缓存
    kalman_state_new = multi_period_result.get('kalman_state')
    if kalman_state_new is not None:
        self._kalman_states[(symbol, base_symbol)] = kalman_state_new

    entry_signal, exit_signal = self._strategy.process_tick(
        symbol, base_symbol, z4h, timestamp,
        kline_time=kline_time, latest_price=price_for_log,
        kalman_innovation=kalman_innovation,        # [新增] 用途 1
        kalman_P_beta=kalman_P_beta,                # [新增] 用途 1
        kalman_innov_sq_ema=kalman_innov_sq_ema,    # [v2] 用于自适应 H
        alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv,
    )

    # 处理入场信号
    if entry_signal is not None:
        # [新增] 计算 hedge_beta(用途 2)
        ols_beta = multi_period_result.get('details', {}).get(
            ('4h', '60d'), {}).get('cointegration_new', {}).get('beta')
        hedge_beta, hedge_source, beta_negative = resolve_hedge_beta(
            kalman_beta, kalman_P_beta, ols_beta
        )

        # v2: 负 β 时记录警告(方向逻辑调整为 P2 后续)
        if beta_negative:
            logger.warning(
                f"负 β 配对 {symbol}|{base_symbol}: β={kalman_beta:.3f}, "
                f"当前方向逻辑可能需要翻转,建议人工复核"
            )

        self.on_entry_signal(
            symbol, multi_period_result,
            latest_alt_price=price_for_log,
            direction=entry_signal.direction,
            adaptive_z=entry_signal.adaptive_z,
            hedge_beta=hedge_beta,              # [新增]
            hedge_beta_source=hedge_source,     # [新增]
            beta_negative=beta_negative,        # [v2]
        )

on_entry_signal() 改动:传递 hedge_beta 到信号:

def on_entry_signal(
    self, symbol, multi_period_result,
    latest_alt_price=None, avg_zscore_4h=None,
    direction="", signal_strength="medium", adaptive_z=0.0,
    l2_snapshot=None,
    hedge_beta: float = 1.0,            # [新增]
    hedge_beta_source: str = 'default', # [新增]
    beta_negative: bool = False,        # [v2]
) -> bool:
    # ...
    signal = PairTradeSignal(
        # ... 现有字段 ...
        hedge_beta=hedge_beta,                # [新增]
        hedge_beta_source=hedge_beta_source,  # [新增]
        beta_negative=beta_negative,          # [v2]
    )
    # ...

文件:src/trading/position_manager.py

_open_position_inner() 改动:传递 hedge_beta 到 risk_manager:

def _open_position_inner(self, signal: PairTradeSignal, adaptive_z: float = 0.0):
    # ... 现有价格获取 ...

    alt_size, base_size = self._risk_manager.calculate_position_size(
        signal, alt_price, base_price, available_balance,
        hedge_beta=signal.hedge_beta,              # [新增]
        hedge_beta_source=signal.hedge_beta_source,  # [新增]
    )

    position = PairPosition(
        # ... 现有字段 ...
        entry_hedge_beta=signal.hedge_beta,  # [新增] 记录入场 β
    )

4.6 策略层集成(用途 1)

文件:src/trading/strategy.py

__init__

self._bocpd = _BOCPDDetector(
    hazard_rate=default_params.beta_regime_hazard_rate,
    max_run_length=default_params.beta_regime_max_run_length,
    adaptive_hazard=default_params.beta_regime_adaptive_hazard,   # v2
    hazard_min=default_params.beta_regime_hazard_min,             # v2
    hazard_max=default_params.beta_regime_hazard_max,             # v2
    hazard_innov_ref=default_params.beta_regime_hazard_innov_ref, # v2
)
self._systemic_aggregator = _SystemicRiskAggregator(enabled=True)
self._all_pair_states: dict[PairKey, _BetaRegimeState] = {}

process_tick() 签名新增

def process_tick(
    self, symbol, base_symbol, z4h, timestamp,
    kline_time=None, latest_price=None,
    kalman_innovation: float | None = None,      # [新增] 用途 1
    kalman_P_beta: float | None = None,          # [新增] 用途 1
    kalman_innov_sq_ema: float | None = None,    # [v2] 用于自适应 H
    alt_ohlcv=None, base_ohlcv=None,
) -> tuple[EntrySignal | None, ExitSignal | None]:

_process_tick_unlocked 新 K 线更新块

if is_new_candle:
    # ... 现有: Welford 更新, EMA 更新 ...
    if kalman_innovation is not None and kline_time is not None:
        self._bocpd.update(
            key, kalman_innovation, str(kline_time),
            innov_sq_ema=kalman_innov_sq_ema,  # v2: 传递给自适应 H
        )

_check_entry() — z4h 过滤之后、方向判断之前

# ── [新增] Beta 体制检查 ──
if params.beta_regime_enabled:
    beta_state = self._bocpd.check(
        key,
        soft_prob=params.beta_regime_soft_prob,
        hard_prob=params.beta_regime_hard_prob,
        scale_max=params.beta_regime_scale_max,
        warmup=params.beta_regime_warmup,
    )
    self._all_pair_states[key] = beta_state

    if beta_state.hard_block:
        logger.info(f"Beta体制硬拦截 | {pair_label} | {beta_state.reason} | "
                    f"az={adaptive_z:+.4f} z4h={z4h:+.4f}")
        return None

    is_systemic, systemic_reason = self._systemic_aggregator.check(self._all_pair_states)
    if is_systemic:
        logger.info(f"系统性风险拦截 | {pair_label} | {systemic_reason}")
        return None

    threshold_scale = beta_state.threshold_scale
else:
    beta_state = None
    threshold_scale = 1.0

# ── 方向判断(应用 Beta 缩放)──
threshold = params.adaptive_threshold * threshold_scale
if adaptive_z < -threshold:
    direction = 'long'
elif adaptive_z > threshold:
    direction = 'short'
else:
    if threshold_scale > 1.01:
        logger.info(f"Beta体制缩放拦截 | {pair_label} | "
                    f"az={adaptive_z:+.4f} 有效阈值={threshold:.2f} "
                    f"(原始={params.adaptive_threshold} ×{threshold_scale:.2f}) | "
                    f"{beta_state.reason if beta_state else ''}")
    return None

cleanup_pair()

self._bocpd.cleanup_pair(key)
self._all_pair_states.pop(key, None)

5. 完整数据流

1. WebSocket 4h K线闭合
   ↓
2. realtime_kline_service_base 触发分析
   ↓
3. analyze_multi_period()
   ├─ calculate_cointegration_params_dual_window(kalman_state=缓存)
   │   ├─ OLS 回归 → β_ols, spread, adf_pvalue(现有)
   │   └─ [新增] Kalman update → kalman_beta, innovation, P_β, kalman_state
   │       └─ [v2] Q_β 自适应 → q_beta 自动调整, innov_sq_ema 输出
   ├─ 健康监控 → Gate2(现有)
   └─ 输出: multi_period_result(含 kalman_* 字段)
   ↓
4. orchestrator.process_analysis()
   ├─ 缓存 kalman_state → self._kalman_states[pair_key]
   ├─ 传递 kalman_innovation, kalman_P_beta, kalman_innov_sq_ema → strategy.process_tick()
   │   ├─ [用途 1] BOCPD.update(innovation, innov_sq_ema) → 体制检测
   │   │   └─ [v2] 自适应 H: innov_sq_ema 驱动 hazard rate 调整
   │   └─ [用途 1] _check_entry() → 硬拦截 / 阈值缩放 / 系统性风险
   ├─ 若产生 EntrySignal:
   │   ├─ [用途 2] resolve_hedge_beta(kalman_beta, P_β, ols_beta) → (hedge_beta, source, beta_negative)
   │   └─ on_entry_signal(hedge_beta=...) → PairTradeSignal
   ↓
5. position_manager.open_position(signal)
   ├─ risk_manager.calculate_position_size(hedge_beta=signal.hedge_beta)
   │   ├─ base_notional = total / (1 + |β|)
   │   └─ alt_notional  = |β| × base_notional
   ├─ executor.market_open(alt_size, base_size)
   └─ PairPosition(entry_hedge_beta=β)
   ↓
6. 持久化
   ├─ pair_positions: 含 entry_hedge_beta
   └─ trading_signals: 含 hedge_beta, hedge_beta_source, beta_negative

6. 场景模拟

A:正常市场(β 稳定) — 两个用途均正常

innovation ~ N(0,1): z = [-0.3, 0.8, -0.5, 0.2, ...]
kalman_beta ≈ 0.45, P_β < 0.01
innov_sq_ema ≈ 0.9 → Q_β 保持不变

用途 1: BOCPD P(变点) ≈ 0.02~0.05 → regime=STABLE, scale=1.0 → 正常入场
        H_t = H_base (0.02)(自适应 H 不放大)
用途 2: hedge_beta=0.45(kalman) → alt_notional = 0.45 × base_notional
        比等额更少的 Alt 暴露,正确反映 β<1 的弱相关性

B:β 开始飙升 — Q 自适应 + 自适应 H 联合响应

T=0h:   kalman_beta ≈ 0.5, innov_sq_ema ≈ 1.0 → 正常
T=4h:   innovation 开始偏大, innov_sq_ema → 1.8
        Q_β 自适应: innov_sq_ema < γ_upper(2.0),Q_β 暂不调整
T=8h:   innovation 持续偏大, innov_sq_ema → 2.5
        Q_β 自适应: innov_sq_ema > γ_upper → Q_β *= 1.05(开始加速追踪)
        自适应 H: innov_sq_ema/1.5 = 1.67 → H_t = 0.02 × 1.67 = 0.033
T=12h:  kalman_beta ≈ 1.2, innovation 持续偏正
        用途 1: P(变点) ≈ 0.40 > soft_prob(0.3) → EXPANDING, scale≈1.15
        用途 2: hedge_beta=1.2 → Alt 腿名义增大(如果入场的话)
T=16h:  P(变点) ≈ 0.80 > hard_prob(0.7) → 硬拦截
        用途 2: 不会执行(入场被拦截)

对比 v1(固定 Q + 固定 H):

  • v1 的 Q 过小时 kalman_beta 追踪慢 → innovation 持续偏大更久 → 拦截延迟
  • v2 的 Q 自适应加速追踪 + H 自适应提高灵敏度 → 更早检测到变点

C:β 飙升后企稳 — 恢复后使用新 β

飙升期 P(变点) = 0.85 → 硬拦截
企稳后 innovation 恢复 N(0,1):
  innov_sq_ema → 1.0, Q_β 自适应开始缩小 Q_β
  H_t 恢复到 H_base (0.02)

→ ~6-8 根正常 K线(24-32h)后 P(变点) < soft_prob → 恢复

恢复后:
  用途 1: regime=STABLE → 允许入场
  用途 2: kalman_beta 已追踪到新 β(如 1.8),开仓自动使用新比例
          不像 OLS 还在用旧 β≈0.6

D:β≈0.3 的弱相关配对 — hedge ratio 纠偏最显著

OLS β ≈ 0.31,Kalman β ≈ 0.29

等额开仓(当前系统):
  Alt $100 + Base $100
  Alt 的非系统性风险暴露 = $100 - 0.3×$100 = $70(多余)

β 加权开仓:
  base_notional = $200 / 1.3 ≈ $154
  alt_notional  = 0.3 × $154 ≈ $46
  → Alt 仅 $46,正确对冲 Base $154 的 0.3 倍

E:Kalman 冷启动 — 降级策略

系统重启,kalman_state 丢失:
  T=0:  用 OLS β 初始化 Kalman,P₀ = diag(0.1, 1.0)
        P_β = 1.0 > HEDGE_BETA_P_MAX(0.5)
        → resolve_hedge_beta 降级为 OLS β

  T=3-5 根 K线(12-20h):
        Kalman 收敛,P_β < 0.1
        → resolve_hedge_beta 切回 Kalman β
        Q_β 自适应也完成校准

F:系统性风险(v2 加权版)

20 配对中 8 个 P(变点) > 0.3 → 40% > systemic_threshold=30%
加权P(变点) = 0.35 > systemic_threshold × 0.8 = 0.24 → 也触发
→ 系统性风险拦截 → 所有配对暂停入场
→ hedge_beta 仍在后台更新,恢复后使用最新值

G:MEME vs L1 配对 Q 自适应对比(v2 新增场景)

MEME 配对 (PURR/HYPE):
  β 波动大 → innovation 频繁偏大 → innov_sq_ema ≈ 3.0
  → Q_β 自动从 1e-4 上调到 ~5e-4
  → Kalman 追踪更快:8-12h 响应 β 变化
  → 自适应 H = 0.02 × 2.0 = 0.04:变点检测更灵敏

L1 配对 (ETH/BTC):
  β 极稳定 → innovation 正常 → innov_sq_ema ≈ 0.5
  → Q_β 自动从 1e-4 下调到 ~3e-5
  → Kalman 更平滑:不追日内噪声
  → 自适应 H 保持 0.02:不过度检测

7. 改动文件清单

文件 改动 影响范围
src/config.py +7 Kalman 常量 + 8 Q 自适应常量(v2) + 3 Hedge Ratio 常量 配置层
src/utils/analysis/analysis_core.py +VectorKalmanBetaEstimator 类(含 Q 自适应 v2);增强 calculate_cointegration_params_dual_window(), analyze_pair_advanced(), analyze_multi_period() 分析层
src/trading/config.py StrategyParams +7 字段 + 3 自适应 H 字段(v2);增强 get_strategy_params(), _build_strategy_params(), load_trading_config() 配置层
src/trading/models.py PairTradeSignal +3 字段 (hedge_beta, hedge_beta_source, beta_negative);PairPosition +1 字段 (entry_hedge_beta) 数据模型
src/trading/orchestrator.py +_kalman_states 字典;+resolve_hedge_beta() 函数(v2 返回 beta_negative);增强 process_analysis() 提取 Kalman 输出 + 计算 hedge_beta;增强 on_entry_signal() 传递 hedge_beta 编排层
src/trading/strategy.py +_BetaRegimeState, _BOCPDDetector(v2 自适应 H + 命名消歧 + 截断改进), _SystemicRiskAggregator(v2 加权);增强 process_tick(), _check_entry(), cleanup_pair() 策略层
src/trading/risk_manager.py 增强 calculate_position_size() 支持 β 加权 风控层
src/trading/position_manager.py 增强 _open_position_inner() 传递 hedge_beta 仓位管理

新增依赖scipy.special.gammaln(scipy 已在项目中,模块级导入)

不改动momentum_filter.py, executor.py


8. DB 改动

pair_positions 表新增列

ALTER TABLE pair_positions ADD COLUMN entry_hedge_beta DOUBLE PRECISION DEFAULT 1.0;

trading_signals 表新增列

ALTER TABLE trading_signals ADD COLUMN hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta_source VARCHAR(10) DEFAULT 'default';
ALTER TABLE trading_signals ADD COLUMN beta_negative BOOLEAN DEFAULT FALSE;  -- v2 新增

9. 日志

时机 级别 格式
初始化 INFO Beta体制跟踪器初始化 | BOCPD hazard=0.020 adaptive=True soft=0.30 hard=0.70
Kalman 更新 DEBUG Kalman更新 | PURR|HYPE | α̂=0.002 β̂=1.20 P_β=0.005 innov=2.85 q_β=1.2e-4 ν̄=1.8
Q 自适应调整 DEBUG Kalman Q自适应 | PURR|HYPE | ν̄=2.50>γ_upper(2.0) q_β: 1.0e-4→1.05e-4
hedge_beta 选择 DEBUG hedge_beta=0.45(kalman) P_β=0.003 | OLS_β=0.48 | negative=False
hedge_beta 降级 DEBUG Kalman P_β=0.62 > 0.50,降级为 OLS β=0.48
负 β 警告 WARNING 负β检测: PURR|HYPE raw_β=-0.30(kalman),配对为反向相关
β 加权仓位 INFO 仓位计算(β加权) | β=0.45(kalman) | Alt ≈$62 | Base ≈$138 | 比例 0.45:1
硬拦截 INFO Beta体制硬拦截 | PURR|HYPE | P(变点)=0.850>=0.70 E[rl]=1.2
缩放拦截 INFO Beta体制缩放拦截 | PURR|HYPE | az=-4.50 有效阈值=4.00 (3.0×1.33)
系统性风险 INFO 系统性风险: 8/20 (40%) 配对扩张态, 加权P(变点)=0.350
自适应 H DEBUG 自适应H | PURR|HYPE | ν̄=3.0 H: 0.020→0.040

10. 风险与边界条件

风险 严重度 缓解
已持仓 hedge ratio 偏离 本次仅在入场时计算 β 比例;持仓期间 rebalance 作为 P0 后续
β 符号翻转(kalman_beta < 0) v2: resolve_hedge_beta 返回 beta_negative 标记 + 警告日志;方向翻转逻辑作为 P1 后续
Q 自适应振荡 κ_up/κ_down 不对称(放大快缩小慢)+ Q_β 上下限保护 + EMA 平滑
自适应 H 过于灵敏 H_max=0.1 上限保护;innov_ref=1.5 设定合理启动点
Kalman Q 初始值不适配 Q 自适应会在 20 步(~3.3天)内校准到合理范围
重启丢失 Kalman 状态 OLS 重新初始化,3-5 根 K线收敛;P_β 过大时自动降级为 OLS β
BOCPD 冷启动 warmup=5(20h)内保持 stable 默认
低波动期 r_btc≈0 H=[1,0] 只更新 α,符合物理意义;hedge_beta 保持上一值
β 加权导致极端仓位 HEDGE_BETA_MIN=0.1, HEDGE_BETA_MAX=5.0 上下限保护
Sage-Husa R 偏差 v2 修复:仅正值更新,消除系统性低估
BOCPD 截断偏差 v2 改进:尾部概率合并到最后一个 run length,而非丢弃

不在本次范围:退场逻辑调整、持仓期间 hedge rebalance、HMM 体制分类、Kalman 持久化到 DB、飞书告警、负 β 方向翻转逻辑


11. 验证方案

单元测试

# test_vector_kalman.py
import numpy as np


def test_kalman_convergence():
    """从初始值收敛到真实 [α, β]"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        r_btc = rng.normal(0, 0.02)
        r_alt = 0.001 + 1.0 * r_btc + rng.normal(0, 0.01)
        result = kf.update(r_btc, r_alt)
    assert abs(result['beta'] - 1.0) < 0.15
    assert result['alpha'] > 0


def test_kalman_innovation_spike():
    """β 突变时 innovation 立即飙升"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    result = kf.update(0.02, 3.0 * 0.02)
    assert abs(result['normalized_innovation']) > 2.0


def test_kalman_huber_clipping():
    """极端 innovation 被截断,β 不过度跳变"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2, clip_sigma=3.0)
    rng = np.random.default_rng(42)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    result = kf.update(0.02, 20.0 * 0.02)
    assert result['clipped'] is True
    assert result['beta'] < 5.0


def test_kalman_joseph_positive():
    """Joseph 形式保证 P 始终非负"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-6)
    rng = np.random.default_rng(42)
    for _ in range(500):
        result = kf.update(rng.normal(0, 0.05), 0.5 * rng.normal(0, 0.05) + rng.normal(0, 0.001))
        assert result['P_beta'] >= 0 and result['P_alpha'] >= 0


def test_kalman_alpha_absorbs_drift():
    """α 吸收独立漂移,β 不被污染"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        result = kf.update(rng.normal(0, 0.02), 0.005 + 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    assert abs(result['beta'] - 0.5) < 0.2
    assert result['alpha'] > 0.001


def test_kalman_state_persistence():
    """状态导出/恢复后行为一致"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf1 = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2)
    kf1.update(0.02, 0.01)
    kf2 = VectorKalmanBetaEstimator.from_state_dict(kf1.state_dict())
    r1, r2 = kf1.update(0.03, 0.015), kf2.update(0.03, 0.015)
    assert abs(r1['beta'] - r2['beta']) < 1e-10


def test_kalman_q_adapt_increase():
    """v2: innovation 持续偏大时 Q_β 自动增大"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(
        alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2,
        q_adapt_enabled=True, q_adapt_upper=2.0,
    )
    rng = np.random.default_rng(42)
    # 先稳定
    for _ in range(30):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    q_before = kf.q_beta
    # 制造大 innovation(β 突变)
    for _ in range(30):
        kf.update(rng.normal(0, 0.02), 2.0 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    q_after = kf.q_beta
    assert q_after > q_before, f"Q_β should increase: {q_before} → {q_after}"


def test_kalman_q_adapt_decrease():
    """v2: innovation 持续偏小时 Q_β 自动减小"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(
        alpha_init=0.0, beta_init=0.5, q_beta=1e-3, r_init=1e-2,
        q_adapt_enabled=True, q_adapt_lower=0.3,
    )
    rng = np.random.default_rng(42)
    # 先稳定,用大 Q
    for _ in range(30):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    q_before = kf.q_beta
    # 长期稳定(innovation 偏小因为 Q 过大)
    for _ in range(100):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    q_after = kf.q_beta
    assert q_after < q_before, f"Q_β should decrease: {q_before} → {q_after}"


def test_kalman_q_adapt_bounds():
    """v2: Q_β 不超出上下限"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(
        alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=1e-2,
        q_adapt_enabled=True, q_beta_ceil=1e-2, q_beta_floor=1e-6,
    )
    rng = np.random.default_rng(42)
    for _ in range(500):
        kf.update(rng.normal(0, 0.02), 5.0 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    assert kf.q_beta <= 1e-2
    assert kf.q_beta >= 1e-6


def test_kalman_sage_husa_no_negative_bias():
    """v2: Sage-Husa R 不会被系统性低估"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    true_R = 1e-3
    kf = VectorKalmanBetaEstimator(
        alpha_init=0.0, beta_init=0.5, q_beta=1e-4, r_init=true_R,
        q_adapt_enabled=False,
    )
    rng = np.random.default_rng(42)
    for _ in range(200):
        r_btc = rng.normal(0, 0.02)
        r_alt = 0.5 * r_btc + rng.normal(0, true_R ** 0.5)
        kf.update(r_btc, r_alt)
    # R 应该不会远低于 true_R
    assert kf.R > true_R * 0.1, f"R={kf.R} should not be much below true_R={true_R}"
# test_hedge_beta.py
import numpy as np


def test_resolve_hedge_beta_kalman():
    """Kalman β 优先,P_β 合格时使用"""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source, negative = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.01, ols_beta=0.5)
    assert source == 'kalman'
    assert abs(beta - 0.45) < 0.01
    assert negative is False


def test_resolve_hedge_beta_fallback_ols():
    """P_β 过大时降级为 OLS"""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source, negative = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.8, ols_beta=0.5)
    assert source == 'ols'
    assert abs(beta - 0.5) < 0.01


def test_resolve_hedge_beta_fallback_default():
    """无 β 时兜底 1.0"""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source, negative = resolve_hedge_beta(kalman_beta=None, kalman_P_beta=None, ols_beta=None)
    assert source == 'default'
    assert beta == 1.0
    assert negative is False


def test_resolve_hedge_beta_clipping():
    """β 超限被裁剪"""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, _, _ = resolve_hedge_beta(kalman_beta=10.0, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 5.0  # HEDGE_BETA_MAX
    beta, _, _ = resolve_hedge_beta(kalman_beta=0.01, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 0.1  # HEDGE_BETA_MIN


def test_resolve_hedge_beta_negative():
    """v2: 负 β 返回 beta_negative=True"""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source, negative = resolve_hedge_beta(kalman_beta=-0.8, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 0.8  # abs(-0.8)
    assert source == 'kalman'
    assert negative is True


def test_position_size_beta_weighted():
    """β 加权仓位:alt_notional = β × base_notional"""
    abs_beta = 0.5
    total = 200.0
    base_notional = total / (1.0 + abs_beta)
    alt_notional = abs_beta * base_notional
    assert abs(base_notional - 133.33) < 0.5
    assert abs(alt_notional - 66.67) < 0.5
    assert abs(base_notional + alt_notional - total) < 0.01


def test_position_size_beta_one():
    """β=1.0 退化为等额(向后兼容)"""
    abs_beta = 1.0
    total = 200.0
    base_notional = total / (1.0 + abs_beta)
    alt_notional = abs_beta * base_notional
    assert abs(base_notional - 100.0) < 0.01
    assert abs(alt_notional - 100.0) < 0.01
# test_bocpd.py
import numpy as np


def test_bocpd_stable():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200, adaptive_hazard=False)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    for i in range(20):
        d.update(key, rng.normal(0, 1), f"2024-01-01T{i*4:02d}:00:00")
    s = d.check(key, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'stable' and s.changepoint_prob < 0.3


def test_bocpd_detects_shift():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200, adaptive_hazard=False)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    for i in range(10):
        d.update(key, rng.normal(0, 1), f"2024-01-01T{i*4:02d}:00:00")
    for i in range(10, 18):
        d.update(key, rng.normal(3.0, 1), f"2024-01-02T{(i-10)*4:02d}:00:00")
    s = d.check(key, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'expanding' and s.changepoint_prob > 0.3


def test_bocpd_hard_block():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200, adaptive_hazard=False)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    for i in range(10):
        d.update(key, rng.normal(0, 1), f"2024-01-01T{i*4:02d}:00:00")
    for i in range(10, 20):
        d.update(key, rng.normal(5.0, 0.5), f"2024-01-02T{(i-10)*4:02d}:00:00")
    s = d.check(key, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.hard_block and s.changepoint_prob > 0.7


def test_bocpd_recovery():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200, adaptive_hazard=False)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    for i in range(10):
        d.update(key, rng.normal(0, 1), f"2024-01-01T{i*4:02d}:00:00")
    for i in range(10):
        d.update(key, rng.normal(4.0, 1), f"2024-01-02T{i*4:02d}:00:00")
    assert d.check(key, 0.3, 0.7, 2.0, 5).regime == 'expanding'
    for i in range(15):
        d.update(key, rng.normal(0, 1), f"2024-01-03T{i*4:02d}:00:00")
    s = d.check(key, 0.3, 0.7, 2.0, 5)
    assert s.regime == 'stable' and s.changepoint_prob < 0.3


def test_bocpd_timestamp_dedup():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    for _ in range(48):
        d.update(key, 5.0, "2024-01-01T00:00:00")
    assert d.check(key, 0.3, 0.7, 2.0, 5).reason == "数据不足"


def test_bocpd_adaptive_hazard():
    """v2: innov_sq_ema 大时 hazard rate 增大,变点检测更快"""
    from src.trading.strategy import _BOCPDDetector
    d_fixed = _BOCPDDetector(hazard_rate=1/50, max_run_length=200, adaptive_hazard=False)
    d_adapt = _BOCPDDetector(hazard_rate=1/50, max_run_length=200, adaptive_hazard=True)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    # 稳定期
    for i in range(10):
        x = rng.normal(0, 1)
        d_fixed.update(key, x, f"2024-01-01T{i*4:02d}:00:00")
        d_adapt.update(key, x, f"2024-01-01T{i*4:02d}:00:00", innov_sq_ema=1.0)
    # 变化期(高 innov_sq_ema)
    for i in range(10, 16):
        x = rng.normal(3.0, 1)
        d_fixed.update(key, x, f"2024-01-02T{(i-10)*4:02d}:00:00")
        d_adapt.update(key, x, f"2024-01-02T{(i-10)*4:02d}:00:00", innov_sq_ema=4.0)
    s_fixed = d_fixed.check(key, 0.3, 0.7, 2.0, 5)
    s_adapt = d_adapt.check(key, 0.3, 0.7, 2.0, 5)
    # 自适应版本应该有更高的变点概率
    assert s_adapt.changepoint_prob >= s_fixed.changepoint_prob


# test_systemic_risk.py
def test_systemic_risk_trigger_weighted():
    """v2: 加权聚合"""
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 4 else 'stable', 0.5 if i < 4 else 0.1,
        10.0, 0.01, 1.5 if i < 4 else 1.0, False, ""
    ) for i in range(10)}
    triggered, reason = agg.check(states, 0.3)
    assert triggered  # 4/10 = 40% > 30%
    assert "加权P(变点)" in reason


def test_systemic_risk_no_trigger():
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 2 else 'stable', 0.5 if i < 2 else 0.1,
        10.0, 0.01, 1.5 if i < 2 else 1.0, False, ""
    ) for i in range(10)}
    assert not agg.check(states, 0.3)[0]  # 2/10 = 20% < 30%


def test_systemic_risk_position_weighted():
    """v2: 仓位加权 — 大仓位配对的变点更重要"""
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    # 只有 2/10 配对 expanding,但它们仓位很大
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 2 else 'stable', 0.8 if i < 2 else 0.05,
        10.0, 0.01, 1.5 if i < 2 else 1.0, False, ""
    ) for i in range(10)}
    weights = {(f"ALT{i}", "BTC"): (10.0 if i < 2 else 1.0) for i in range(10)}
    # 加权概率 = (2 × 10.0 × 0.8 + 8 × 1.0 × 0.05) / (20 + 8) = 16.4/28 ≈ 0.586
    triggered, _ = agg.check(states, 0.3, position_weights=weights)
    assert triggered  # 加权概率 > 0.24 (0.3 × 0.8)

集成验证

  1. 回测:在 β 飙升的历史区间回测,验证检测延迟和假阳性率;对比等额 vs β 加权的 PnL 差异;对比 v1(固定 Q/H)vs v2(自适应)的检测延迟
  2. 实盘观察:监控 kalman_beta vs OLS β 的偏离度、hedge_beta_source 分布、P_β 走势、Q_β 自适应轨迹(不同配对应收敛到不同值)innov_sq_ema 分布
  3. A/B 对比(可选):同时运行等额和 β 加权,对比 hedge 效果和回撤

12. 后续演进

优先级 方向 预期收益 依赖
P0 退场保护(β 飙升时收紧止损) 减少已持仓亏损 本方案
P0 持仓期间 hedge rebalance(β 偏离阈值时调整两腿比例) 维持对冲质量 本方案
P1 Kalman 状态持久化到 DB 消除 20h 冷启动窗口 本方案
P1 负 β 方向翻转逻辑 正确处理反向相关配对 本方案 v2 beta_negative
P1 均值回归状态转移 x_t = Φx_{t-1} + (I-Φ)μ + w_t P_β 有稳态解,长期更稳定 本方案
P2 HMM 体制分类(多体制缩放) 更精细的体制控制 本方案
P2 系统性风险分级(警告/严重/紧急) 更精细的全局风控 本方案
P2 MS-SSM 统一框架(Regime-Switching SSM) 理论最优,参数更少 替换本方案架构
P3 飞书告警 体制切换和 hedge_beta 变化时人工监控 本方案
P3 多频率融合(4h + daily) 更鲁棒的 β 估计 本方案

Read more

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG

对于空间环境、“信息/逻辑”(比如代码、结构、表达)秩序追求的心理特征分析

一、为什么是“空间 + 信息”同时强化? 因为你当年面对的是“双重失控”: 1️⃣ 外部世界是脏乱 + 失序的 * 空间被污染 * 行为无边界 * 基本生活秩序崩塌 👉 所以你现在会强烈要求: * 桌面干净 * 房间有序 * 物品可控 这是在修复:“物理世界必须是可控的” 2️⃣ 人的行为和逻辑也是混乱的 * 没有规则 * 没有底线 * 没有理性 👉 所以你现在会特别在意: * 表达是否清晰 * 逻辑是否自洽 * 结构是否优雅 * 代码是否干净 这是在修复:“认知世界必须是合理的” 二、你其实构建了一个“高纯度系统” 你现在的偏好,本质上是: 👉 低噪音 + 高结构 + 强控制感 具体表现就是: * 空间:极简、整洁、可预测 * 信息:清晰、压缩、无冗余 这类人有一个很明显的优势: 👉 处理复杂问题时,

By SHI XIAOLONG