Beta Regime-Adaptive Filter Design (Condensed Version) (β is used to set the leg ratio when opening pair trades)

Kalman Beta Dual-Purpose Design

1. Background

The current system has two structural defects in how it uses β:

Defect 1: Lagging β estimate

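OLS uses a fixed window BETA_WINDOW=100 (≈17 days of 4h bars), so after a structural change in β the system needs more than a week to notice it:

T+0h    BTC recovers; β starts rising from 0.5
T+12h   true β 2.0, OLS β̂ still ≈ 0.6 → misread as a mean-reversion opportunity
T+24h   true β 4.0, OLS β̂ ≈ 0.8 → keeps entering by mistake
T+7d    true β 8.0, OLS β̂ ≈ 2.0 → the cointegration assumption has completely broken down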

Core problem: z4h, computed with the lagging β, generates entry signals that actually reflect β changing, not the spread reverting.

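Defect 2: Position ratio ignores β

risk_manager.calculate_position_size() uses the same notional value for both legs:

# Current implementation (risk_manager.py:164-170)
alt_size = position_usd / alt_price      # equal notional on both legs
base_size = position_usd / base_price    # independent of β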

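For a pair trade the hedge ratio should be base_notional = β × alt_notional, where β is the sensitivity of Alt returns to Base returns (r_alt ≈ α + β × r_base). Opening equal notionals means:

  • β=0.5: the Base leg is too heavy (over-hedged) → the position carries a net Base exposure of half its notional, opposite in sign to the Alt leg
  • β=2.0: the Base leg is too light (under-hedged) → a residual Base exposure equal to the full notional remains, amplifying spread volatility
  • When β changes, the ratio cannot be adjusted → the hedge keeps drifting while the position is held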

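Summary of current weaknesses

Component | Problem | Impact
analysis_core.py | Fixed-window OLS, BETA_WINDOW=100 | β estimate lags (the window spans ≈17 days)
risk_manager.py | Equal notional, β unused | Wrong hedge ratio
strategy.py | Fixed adaptive_threshold=3.0 | A β surge keeps breaching the threshold
momentum_filter.py | Only detects single-leg price trends | Does not detect spread/β regime switches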

2. Solution overview

Core design: a vector Kalman filter estimates time-varying [α, β], and its output serves two purposes at once.

                   ┌──────────────────────────────────────────┐
                   │  VectorKalmanBetaEstimator (4h)          │
                   │  input:  r_btc, r_alt (log returns)      │
                   │  output: kalman_beta, innovation, P_β    │
                   └─────────────────────┬────────────────────┘
                                         │
                   ┌─────────────────────┼─────────────────────┐
                   ▼                                           ▼
     Purpose 1: regime detection                 Purpose 2: hedge ratio
     ┌───────────────────────────┐               ┌───────────────────────────┐
     │ BOCPD changepoint detector│               │ Position ratio sizing     │
     │ input:  innovation        │               │ input:  kalman_beta       │
     │ output: changepoint prob  │               │ output: alt/base ratio    │
     └─────────────┬─────────────┘               └─────────────┬─────────────┘
                   │                                           │
                   ▼                                           ▼
     Entry-signal filtering                      β-weighted opening
     (hard block / threshold scaling)            (base_notional = β × alt_notional)

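Why a 4h frequency is sufficient

β is a structural relationship between two assets (same sector, shared capital flows, linked fundamentals), and it changes on a timescale of days to weeks.

Method | Response to a structural β change | Robustness to intraday noise | Use case
OLS 100×4h | ~7-10 days | - | Current system (too laggy)
4h Kalman (q_β=1e-4) | 8-16 hours | - | Regime detection + hedge ratio
5m Kalman | ~15-30 minutes | Weak (chases noise) | Not suitable for β estimation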

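For the 4h Kalman, q_β=1e-4 is the per-bar variance of the β random walk, so the prior 1σ drift is √q_β ≈ 0.01 per bar and grows with the square root of the number of bars: ≈0.024 per day (6 bars) and ≈0.065 per week (42 bars). This is enough to follow genuine structural change without being pulled off course by intraday noise.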
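The drift figures follow directly from the random-walk state equation: variances add across steps, so the 1σ drift over n bars is √(n·q_β). A quick check, assuming 6 bars per day and 42 bars per week at the 4h frequency:

```python
import math


def beta_drift_std(q_beta: float, n_bars: int) -> float:
    """1-sigma prior drift of β after n_bars random-walk steps of variance q_beta each."""
    return math.sqrt(q_beta * n_bars)


Q_BETA = 1e-4
per_bar = beta_drift_std(Q_BETA, 1)    # one 4h bar
per_day = beta_drift_std(Q_BETA, 6)    # 6 x 4h = 24h
per_week = beta_drift_std(Q_BETA, 42)  # 42 x 4h = 7 days
print(f"bar={per_bar:.4f} day={per_day:.4f} week={per_week:.4f}")
# prints: bar=0.0100 day=0.0245 week=0.0648
```

Inverting the same relation, a target weekly 1σ drift d corresponds to q_β = d² / 42.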

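Four-layer architecture

Layer | Component | Function
Layer 1 | Vector Kalman filter | Estimates time-varying [α, β]; outputs kalman_beta and normalized_innovation
Layer 2 | BOCPD changepoint detection | Monitors changes in the innovation distribution; outputs a changepoint probability → entry-signal filtering
Layer 3 | Cross-pair systemic risk aggregation | Tallies the regime state of all pairs to identify systemic events
Layer 4 | β-weighted position sizing | Uses kalman_beta to set the notional ratio of the two legs, correcting the equal-notional hedge bias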

3. Detailed design

3.1 Vector Kalman filter for time-varying [α, β]

Mathematical model

State equation ([α, β] evolve slowly):

x_t = x_{t-1} + w_t,    w_t ~ N(0, Q)

where x_t = [α_t, β_t]'
     Q = [[q_α,  0 ],     diagonal: α and β evolve independently
          [ 0,  q_β]]

Observation equation (log-return relationship):

r_alt_t = H_t × x_t + v_t,    v_t ~ N(0, R)

where H_t = [1, r_btc_t]
     r_alt_t = log(alt_t / alt_{t-1})
     r_btc_t = log(btc_t / btc_{t-1})

Recursion

Predict:
  x̂_t|t-1 = x̂_{t-1}
  P_t|t-1  = P_{t-1} + Q                           (2×2)

Innovation:
  ε_t = r_alt_t - H_t × x̂_t|t-1                   (scalar)
  S_t = H_t × P_t|t-1 × H_t' + R                  (scalar)

Huber clipping (c=3.0):
  ε̃_t = clip(ε_t, -c×√S_t, +c×√S_t)

Kalman gain:
  K_t = P_t|t-1 × H_t' / S_t                       (2×1)

Update:
  x̂_t = x̂_t|t-1 + K_t × ε̃_t

Joseph stabilized form:
  A = I - K_t × H_t                                 (2×2)
  P_t = A × P_t|t-1 × A' + K_t × R × K_t'          (keeps P positive semi-definite)

Sage-Husa adaptive R (uses the unclipped ε_t):
  d_t = (1 - b) / (1 - b^(t+1)),   b ∈ (0,1)
  R_t = (1 - d_t) × R_{t-1} + d_t × (ε_t² - H_t × P_t|t-1 × H_t')
  R_t = max(R_t, R_floor)
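The Sage-Husa weight d_t starts at 1 (the first sample replaces R entirely) and decays toward 1 - b, after which the R update behaves like an exponential moving average with forgetting factor b. A small check using b = 0.95 from the parameter table; note that the class below only starts adapting R once its update counter exceeds 10, so the first weight it actually applies is d_11 ≈ 0.109:

```python
def sage_husa_weight(t: int, b: float = 0.95) -> float:
    """Weight d_t given to the newest R sample at step t (t = 0, 1, 2, ...)."""
    return (1.0 - b) / (1.0 - b ** (t + 1))


weights = [sage_husa_weight(t) for t in range(200)]
print(weights[0], round(weights[11], 4), round(weights[199], 4))
# prints: 1.0 0.1088 0.05
```

A larger b keeps the asymptotic weight 1 - b smaller, which is why the table lists "↑ smoother, ↓ faster adaptation" for this parameter.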

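Key output signals and their uses

Signal | Purpose 1: regime detection | Purpose 2: hedge ratio
beta (x̂[1]) | β trend monitoring | Used directly as the hedge ratio
alpha (x̂[0]) | Absorbs independent drift so it does not contaminate β | -
P_beta (P[1,1]) | Uncertainty measure | Hedge-ratio confidence (fall back to OLS β when P_β is too large)
ε/√S (normalized_innovation) | BOCPD input | -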

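Parameters

Parameter | Symbol | Default | Meaning | Tuning
β process noise | q_β | 1e-4 | Prior variance of the β change per 4h bar | ↑ faster tracking, ↓ more stable
α process noise | q_α | 1e-5 | Prior variance of the α change per 4h bar (slower than β) | Usually left alone
R forgetting factor | b | 0.95 | Sage-Husa forgetting factor | ↑ smoother, ↓ faster adaptation
R floor | R_floor | 1e-8 | Keeps R from collapsing to zero | -
Innovation clip | c | 3.0 | Huber clip threshold (multiples of σ) | ↓ more robust, ↑ more sensitive
Initial [α, β] | - | β from OLS, α = 0 | β is initialized from the existing OLS; α is a per-bar return drift, so it starts at 0 rather than at the price-level OLS intercept | -
Initial P | P₀ | diag(0.1, 1.0) | Initial uncertainty | -

q_β = 1e-4 → daily β drift ≈ 0.024 and weekly ≈ 0.065 (1σ), which reasonably covers normal variation.
q_α = q_β/10 → α changes an order of magnitude more slowly than β, so the two do not compete to explain the same variation.

File: src/config.py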

# ═══ Kalman filter β-estimation parameters ═══
KALMAN_Q_BETA: float = 1e-4
KALMAN_Q_ALPHA: float = 1e-5
KALMAN_P0_ALPHA: float = 0.1
KALMAN_P0_BETA: float = 1.0
KALMAN_R_FORGET: float = 0.95
KALMAN_R_FLOOR: float = 1e-8
KALMAN_CLIP_SIGMA: float = 3.0

# ═══ Hedge-ratio parameters ═══
HEDGE_BETA_MIN: float = 0.1     # lower bound on β (guards against extreme ratios)
HEDGE_BETA_MAX: float = 5.0     # upper bound on β
HEDGE_BETA_P_MAX: float = 0.5   # fall back to OLS β when P_β exceeds this (Kalman too uncertain)

File: src/utils/analysis/analysis_core.py

New class: VectorKalmanBetaEstimator

import numpy as np


class VectorKalmanBetaEstimator:
    """向量 Kalman Filter 时变 [α, β] 联合估计器

    状态空间模型:
        状态方程: x_t = x_{t-1} + w_t,    w_t ~ N(0, Q),  x = [α, β]'
        观测方程: r_alt_t = [1, r_btc_t] × x_t + v_t,     v_t ~ N(0, R)

    特性:
        - 二维状态 [α, β]:截距吸收独立漂移,消除虚假 β 信号
        - Joseph 稳定形式:P 更新保证半正定
        - Huber innovation clipping:抗厚尾分布
        - Sage-Husa 自适应 R:在线校准噪声

    双用途输出:
        - kalman_beta → BOCPD 体制检测(via innovation)+ hedge ratio(直接使用)
        - P_beta → hedge ratio 置信度(过大时降级为 OLS β)

    每根新 4h K线调用 update() 一次。
    """

    def __init__(
        self,
        alpha_init: float,
        beta_init: float,
        q_alpha: float = 1e-5,
        q_beta: float = 1e-4,
        r_init: float = 1e-2,
        p0_alpha: float = 0.1,
        p0_beta: float = 1.0,
        r_forget: float = 0.95,
        r_floor: float = 1e-8,
        clip_sigma: float = 3.0,
    ):
        self.x = np.array([alpha_init, beta_init], dtype=np.float64)
        self.Q = np.diag([q_alpha, q_beta]).astype(np.float64)
        self.P = np.diag([p0_alpha, p0_beta]).astype(np.float64)
        self.R = float(r_init)
        self._r_forget = r_forget
        self._r_floor = r_floor
        self._clip_sigma = clip_sigma
        self._n_updates = 0

    @property
    def alpha(self) -> float:
        return float(self.x[0])

    @property
    def beta(self) -> float:
        return float(self.x[1])

    @property
    def beta_variance(self) -> float:
        return float(self.P[1, 1])

    def update(self, r_btc: float, r_alt: float) -> dict:
        """接收一对新的对数收益率,更新 [α, β] 估计

        Returns:
            dict with keys: alpha, beta, P_beta, P_alpha,
                  innovation, normalized_innovation, clipped,
                  kalman_gain_beta, R
        """
        # 预测步
        P_pred = self.P + self.Q
        H = np.array([1.0, r_btc], dtype=np.float64)

        # Innovation
        innovation = r_alt - H @ self.x
        S = float(H @ P_pred @ H) + self.R
        S = max(S, 1e-15)

        sqrt_S = S ** 0.5
        norm_innov = innovation / sqrt_S

        # Huber clipping
        clipped = False
        innovation_for_update = innovation
        if abs(norm_innov) > self._clip_sigma:
            innovation_for_update = self._clip_sigma * sqrt_S * (
                1.0 if innovation > 0 else -1.0
            )
            clipped = True

        # Kalman gain + state update (the update uses the clipped innovation)
        K = (P_pred @ H) / S
        self.x = self.x + K * innovation_for_update

        # Joseph stabilized form: P = A P⁻ A' + R K K'
        I2 = np.eye(2)
        A = I2 - np.outer(K, H)
        self.P = A @ P_pred @ A.T + self.R * np.outer(K, K)

        # Sage-Husa adaptive R (only after the first 10 updates; uses the unclipped innovation)
        self._n_updates += 1
        if self._n_updates > 10:
            b = self._r_forget
            d_t = (1.0 - b) / (1.0 - b ** (self._n_updates + 1))
            r_sample = max(innovation * innovation - float(H @ P_pred @ H), self._r_floor)
            self.R = (1.0 - d_t) * self.R + d_t * r_sample
            self.R = max(self.R, self._r_floor)

        return {
            'alpha': float(self.x[0]),
            'beta': float(self.x[1]),
            'P_beta': float(self.P[1, 1]),
            'P_alpha': float(self.P[0, 0]),
            'innovation': innovation,
            'normalized_innovation': norm_innov,
            'clipped': clipped,
            'kalman_gain_beta': float(K[1]),
            'R': self.R,
        }

    def state_dict(self) -> dict:
        return {
            'x': self.x.tolist(), 'P': self.P.tolist(), 'Q': self.Q.tolist(),
            'R': self.R, '_r_forget': self._r_forget, '_r_floor': self._r_floor,
            '_clip_sigma': self._clip_sigma, '_n_updates': self._n_updates,
        }

    @classmethod
    def from_state_dict(cls, d: dict) -> 'VectorKalmanBetaEstimator':
        obj = cls.__new__(cls)
        obj.x = np.array(d['x'], dtype=np.float64)
        obj.P = np.array(d['P'], dtype=np.float64)
        obj.Q = np.array(d['Q'], dtype=np.float64)
        obj.R = d['R']
        obj._r_forget = d['_r_forget']
        obj._r_floor = d['_r_floor']
        obj._clip_sigma = d['_clip_sigma']
        obj._n_updates = d['_n_updates']
        return obj
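Because VectorKalmanBetaEstimator lives inside analysis_core.py, the sketch below re-implements only its core predict/update (no Huber clipping, no Sage-Husa, R fixed and known) so it runs standalone. It feeds synthetic 4h log returns in which the true β steps from 0.5 to 2.0 and records kalman_beta and P_β. The volatilities (BTC 1.5% per bar, idiosyncratic noise 1%) are illustrative assumptions, not measured values; substituting a pair's real numbers shows how many bars the estimate needs to follow a β change for a chosen q_β.

```python
import numpy as np


def simulate_beta_step(beta_before=0.5, beta_after=2.0, n_each=300,
                       btc_vol=0.015, noise_vol=0.01,
                       q_alpha=1e-5, q_beta=1e-4, p0=(0.1, 1.0), seed=0):
    """Plain 2-state [α, β] random-walk Kalman filter on synthetic 4h log returns.

    Illustrative assumptions: r_btc ~ N(0, btc_vol²),
    r_alt = β_true · r_btc + N(0, noise_vol²), the true β steps from beta_before
    to beta_after after n_each bars, and R = noise_vol² is known (no Sage-Husa).
    """
    rng = np.random.default_rng(seed)
    n = 2 * n_each
    beta_true = np.where(np.arange(n) < n_each, beta_before, beta_after)
    r_btc = rng.normal(0.0, btc_vol, n)
    r_alt = beta_true * r_btc + rng.normal(0.0, noise_vol, n)

    x = np.array([0.0, beta_before])          # α = 0, β at its pre-step value
    P = np.diag(p0).astype(float)
    Q = np.diag([q_alpha, q_beta])
    R = noise_vol ** 2
    betas, p_betas = [], []
    for rb, ra in zip(r_btc, r_alt):
        P_pred = P + Q                        # predict (random walk)
        H = np.array([1.0, rb])
        S = H @ P_pred @ H + R                # innovation variance (scalar)
        K = P_pred @ H / S                    # Kalman gain, shape (2,)
        x = x + K * (ra - H @ x)              # update with the raw innovation
        A = np.eye(2) - np.outer(K, H)
        P = A @ P_pred @ A.T + R * np.outer(K, K)   # Joseph form
        betas.append(x[1])
        p_betas.append(P[1, 1])
    return np.array(betas), np.array(p_betas)


betas, p_betas = simulate_beta_step()
print(f"P_beta after 50 bars: {p_betas[49]:.4f}")
print(f"beta_hat at bar 299: {betas[299]:.2f}, 12 bars after the step: {betas[311]:.2f}, "
      f"at the end: {betas[-1]:.2f}")
```

With these assumed inputs the steady-state P_β is roughly √(q_β·R)/σ_btc ≈ 0.007, so each bar closes only about √q_β·σ_btc/√R ≈ 1.5% of the gap between kalman_beta and the true β. Raising q_β, or lowering R relative to σ_btc², speeds tracking at the cost of a noisier β.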

Modified function: calculate_cointegration_params_dual_window()

After the existing OLS, add the Kalman filter outputs:

def calculate_cointegration_params_dual_window(
    base_klines, alt_klines,
    beta_window=None, zscore_window=None,
    kalman_state: dict | None = None,  # [new]
):
    # ... existing OLS logic unchanged ...

    # ═══ New: vector Kalman filter update ═══
    kalman_result = None
    kalman_state_out = kalman_state

    if len(aligned) >= 2:
        r_btc = np.log(aligned['base'].iloc[-1] / aligned['base'].iloc[-2])
        r_alt = np.log(aligned['alt'].iloc[-1] / aligned['alt'].iloc[-2])

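        if kalman_state is not None:
            kf = VectorKalmanBetaEstimator.from_state_dict(kalman_state)
        else:
            # Cold start. The OLS above is fitted on price levels, so alpha_ols is a
            # level intercept and model.resid is a spread residual: neither is on the
            # scale of the return equation. Start α (per-bar return drift) at 0 and
            # seed R from the log-return residuals under the OLS β.
            r_btc_hist = np.diff(np.log(aligned['base'].to_numpy(dtype=float)))
            r_alt_hist = np.diff(np.log(aligned['alt'].to_numpy(dtype=float)))
            ret_resid_var = float(np.var(r_alt_hist - beta_ols * r_btc_hist))
            kf = VectorKalmanBetaEstimator(
                alpha_init=0.0,
                beta_init=beta_ols,
                q_alpha=KALMAN_Q_ALPHA, q_beta=KALMAN_Q_BETA,
                r_init=max(ret_resid_var, 1e-6),
                p0_alpha=KALMAN_P0_ALPHA, p0_beta=KALMAN_P0_BETA,
                r_forget=KALMAN_R_FORGET, r_floor=KALMAN_R_FLOOR,
                clip_sigma=KALMAN_CLIP_SIGMA,
            )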

        kalman_result = kf.update(r_btc, r_alt)
        kalman_state_out = kf.state_dict()

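    return {
        # ... existing fields unchanged ...
        'kalman_beta': kalman_result['beta'] if kalman_result else beta_ols,
        # α is a return intercept, so the fallback is 0.0 (not the price-level alpha_ols)
        'kalman_alpha': kalman_result['alpha'] if kalman_result else 0.0,
        # 1.0 > HEDGE_BETA_P_MAX, so hedge sizing falls back to OLS β when no update ran
        'kalman_P_beta': kalman_result['P_beta'] if kalman_result else 1.0,
        # None (not 0.0) when no update ran, so BOCPD skips the bar instead of
        # treating a fabricated 0.0 as a real observation
        'kalman_innovation': kalman_result['normalized_innovation'] if kalman_result else None,
        'kalman_clipped': kalman_result['clipped'] if kalman_result else False,
        'kalman_state': kalman_state_out,
    }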
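calculate_cointegration_params_dual_window() applies one Kalman step to the last two aligned closes every time it runs, whereas _BOCPDDetector.update() skips repeated kline_time values. If the analysis can run more than once per 4h bar, the same return would be folded into the filter twice. A minimal guard, assuming a hypothetical 'last_bar_time' key stored next to the state_dict() fields (from_state_dict() reads only the keys it knows, so an extra key does not disturb it) and a stand-in step function in place of the real estimator:

```python
from typing import Callable, Optional


def kalman_update_once_per_bar(
    state: Optional[dict],
    bar_time: str,
    step: Callable[[Optional[dict]], dict],
) -> tuple[dict, bool]:
    """Apply one filter step per bar; return (state, updated).

    `state` is the cached Kalman state dict (None on cold start). `step` stands in
    for "rebuild the estimator, call update(), return state_dict()".
    'last_bar_time' is a hypothetical key recording the bar already in the state.
    """
    if state is not None and state.get('last_bar_time') == bar_time:
        return state, False                   # this bar's return was already applied
    new_state = dict(step(state))             # copy so the cached dict is not mutated
    new_state['last_bar_time'] = bar_time
    return new_state, True


# Usage with a counting stand-in for the real estimator (hypothetical)
def fake_step(s: Optional[dict]) -> dict:
    return {'n_updates': (s or {}).get('n_updates', 0) + 1}


s, _ = kalman_update_once_per_bar(None, '2024-01-01T00:00', fake_step)
s, repeated = kalman_update_once_per_bar(s, '2024-01-01T00:00', fake_step)  # same bar again
s, advanced = kalman_update_once_per_bar(s, '2024-01-01T04:00', fake_step)  # next 4h bar
print(s['n_updates'], repeated, advanced)
# prints: 2 False True
```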

Modified function: analyze_pair_advanced()

Pass kalman_state through and extract the Kalman outputs:

def analyze_pair_advanced(
    base_klines, alt_klines,
    beta_window=None, zscore_window=None,
    enable_health_monitor=True,
    stats_period_key=None,
    kalman_state: dict | None = None,  # [new]
) -> Dict:
    # ...

    # 3. New method: dual-window OLS cointegration analysis + Kalman
    coint_new = calculate_cointegration_params_dual_window(
        base_klines, alt_klines, beta_window, zscore_window,
        kalman_state=kalman_state,  # [new] pass through
    )
    if coint_new:
        result['cointegration_new'] = {
            # ... existing fields ...
            'kalman_beta': coint_new.get('kalman_beta'),              # [new]
            'kalman_P_beta': coint_new.get('kalman_P_beta'),          # [new]
            'kalman_innovation': coint_new.get('kalman_innovation'),  # [new]
            'kalman_state': coint_new.get('kalman_state'),            # [new]
        }
        }

    # ...

Modified function: analyze_multi_period()

Pass the Kalman outputs through in the return value:

def analyze_multi_period(...) -> Optional[Dict]:
    # ...

    # Kalman outputs come from the ('4h', '60d') analysis
    coint_4h = details.get(('4h', '60d'), {}).get('cointegration_new', {})
    return {
        # ... existing fields ...
        'kalman_beta': coint_4h.get('kalman_beta'),
        'kalman_P_beta': coint_4h.get('kalman_P_beta', 1.0),
        'kalman_innovation': coint_4h.get('kalman_innovation'),  # None → BOCPD skips the bar
        'kalman_state': coint_4h.get('kalman_state'),
    }

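3.2 BOCPD: Bayesian online changepoint detection (Purpose 1)

Principle

Maintain the posterior distribution over the run length (the number of steps since the last changepoint):

P(r_t | data_1:t)

r_t = 0 → a changepoint has just occurred
r_t = n → n steps have passed since the last changepoint

Recursion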

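1. Predictive probability (likelihood of the current observation given run length r):
   π_t(r) = P(x_t | x_{t-r:t-1})

2. Growth (no changepoint):
   P(r_t = r+1, data) = P(r_{t-1} = r, data) × π_t(r) × (1 - H)

3. Changepoint (the run resets):
   P(r_t = 0, data) = Σ_r P(r_{t-1} = r, data) × π_t(r) × H

4. Normalize

where H is the hazard rate, the prior probability of a changepoint at each step (default 1/50, i.e. about once every 50 4h bars ≈ 8 days).

Observation model: the detector monitors the Kalman normalized_innovation series with a Normal-Inverse-Gamma conjugate prior, so the predictive distribution is Student-t.

Sufficient statistics (kept per run; n_r is the number of observations in run r, and α_r, β_r here are Normal-Inverse-Gamma parameters, unrelated to the regression α, β):
  κ_r = κ₀ + n_r
  μ̂_r = (κ₀ × μ₀ + Σ x_i) / κ_r
  α_r = α₀ + n_r / 2
  β_r = β₀ + 0.5 × (Σ x_i² - κ_r × μ̂_r² + κ₀ × μ₀²)

Predictive distribution: Student-t(2α_r), location=μ̂_r, scale²=β_r(κ_r+1)/(α_r κ_r)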
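The per-run statistics can be updated recursively, which is what _BOCPDDetector does in its Step 5 (κ += 1, μ ← (κμ + x)/(κ+1), α += ½, β += κ(x - μ)²/(2(κ+1))). The check below confirms on random data that this recursion reproduces the batch formulas above; the prior values are copied from the detector's class constants.

```python
import numpy as np

MU0, KAPPA0, ALPHA0, BETA0 = 0.0, 1.0, 1.0, 0.5   # same prior as _BOCPDDetector


def nig_batch(xs: np.ndarray) -> tuple[float, float, float, float]:
    """Posterior (κ, μ̂, α, β) from the closed-form sufficient-statistic formulas."""
    n = len(xs)
    kappa = KAPPA0 + n
    mu = (KAPPA0 * MU0 + xs.sum()) / kappa
    alpha = ALPHA0 + n / 2
    beta = BETA0 + 0.5 * ((xs ** 2).sum() - kappa * mu ** 2 + KAPPA0 * MU0 ** 2)
    return kappa, mu, alpha, beta


def nig_recursive(xs: np.ndarray) -> tuple[float, float, float, float]:
    """Same posterior built one observation at a time (the detector's Step 5)."""
    kappa, mu, alpha, beta = KAPPA0, MU0, ALPHA0, BETA0
    for x in xs:
        beta += kappa * (x - mu) ** 2 / (2.0 * (kappa + 1.0))  # uses the old κ and μ
        mu = (kappa * mu + x) / (kappa + 1.0)
        kappa += 1.0
        alpha += 0.5
    return kappa, mu, alpha, beta


def predictive_scale(kappa: float, alpha: float, beta: float) -> float:
    """Scale of the Student-t(2α) predictive: sqrt(β(κ+1)/(ακ))."""
    return float(np.sqrt(beta * (kappa + 1.0) / (alpha * kappa)))


xs = np.random.default_rng(1).normal(0.3, 1.2, size=40)
batch, rec = nig_batch(xs), nig_recursive(xs)
print(np.allclose(batch, rec))  # prints: True
k, m, a, b = batch
print(f"df={2 * a:.0f} loc={m:.3f} scale={predictive_scale(k, a, b):.3f}")
```

The recursive form is what keeps the detector at O(max_run_length) per update: no run has to revisit its past observations.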

File: src/trading/config.py

New fields in StrategyParams:

@dataclass(frozen=True)
class StrategyParams:
    # ... existing fields ...

    # β regime-adaptive filter (vector Kalman + BOCPD)
    beta_regime_enabled: bool = True
    beta_regime_hazard_rate: float = 1/50    # prior changepoint probability ≈ once per 50 4h bars (~8 days)
    beta_regime_max_run_length: int = 200    # run-length truncation (bounds memory)
    beta_regime_soft_prob: float = 0.3       # P(changepoint) >= this → start scaling the threshold
    beta_regime_hard_prob: float = 0.7       # P(changepoint) >= this → hard block
    beta_regime_scale_max: float = 2.0       # maximum threshold scaling factor
    beta_regime_warmup: int = 5              # minimum number of 4h bars before any judgment

File: src/trading/strategy.py

_BetaRegimeState dataclass
@dataclass
class _BetaRegimeState:
    """Result of β regime detection"""
    regime: str               # 'stable' | 'expanding'
    changepoint_prob: float   # changepoint probability P(r_t < warmup)
    expected_run_length: float
    kalman_P_beta: float
    threshold_scale: float    # threshold scaling factor (>= 1.0)
    hard_block: bool
    reason: str
_BOCPDDetector
class _BOCPDDetector:
    """Bayesian Online Changepoint Detection (Adams & MacKay, 2007)

    监控 Kalman normalized_innovation 的分布变化,
    通过 run length 后验分布检测 β 的结构性变点。

    内存: O(max_run_length) per pair
    计算: O(max_run_length) per update
    """

    # Normal-Inverse-Gamma 先验超参
    _MU0 = 0.0
    _KAPPA0 = 1.0
    _ALPHA0 = 1.0
    _BETA0 = 0.5

    def __init__(self, hazard_rate: float = 1/50, max_run_length: int = 200):
        self._H = hazard_rate
        self._max_rl = max_run_length
        self._run_length_probs: dict[PairKey, np.ndarray] = {}
        self._suf_stats: dict[PairKey, dict] = {}
        self._last_kline_time: dict[PairKey, str] = {}
        self._n_updates: dict[PairKey, int] = {}

    def update(self, key: PairKey, normalized_innovation: float,
               kline_time: str) -> None:
        """每 5m tick 调用,基于 kline_time 去重"""
        if normalized_innovation is None:
            return
        if self._last_kline_time.get(key) == kline_time:
            return
        self._last_kline_time[key] = kline_time

        x = normalized_innovation

        # Initialize state for a new pair
        if key not in self._run_length_probs:
            self._run_length_probs[key] = np.array([1.0])
            self._suf_stats[key] = {
                'kappa': np.array([self._KAPPA0]),
                'mu': np.array([self._MU0]),
                'alpha': np.array([self._ALPHA0]),
                'beta': np.array([self._BETA0]),
            }
            self._n_updates[key] = 0

        self._n_updates[key] = self._n_updates.get(key, 0) + 1

        probs = self._run_length_probs[key]
        ss = self._suf_stats[key]
        n_rl = len(probs)

        # Step 1: Student-t predictive probability under each run length
        kappa, mu, alpha, beta = ss['kappa'], ss['mu'], ss['alpha'], ss['beta']
        df = 2.0 * alpha
        scale = np.sqrt(np.maximum(beta * (kappa + 1.0) / (alpha * kappa), 1e-15))
        pred_probs = self._student_t_pdf(x, df, mu, scale)

        # Step 2-3: Growth + Changepoint
        growth_probs = probs * pred_probs * (1.0 - self._H)
        cp_prob = np.sum(probs * pred_probs * self._H)

        # Step 4: combine + normalize (if all mass underflows, restart at a certain changepoint)
        new_probs = np.empty(n_rl + 1)
        new_probs[0] = cp_prob
        new_probs[1:] = growth_probs
        total = np.sum(new_probs)
        if total > 0:
            new_probs /= total
        else:
            new_probs = np.zeros(n_rl + 1)
            new_probs[0] = 1.0

        # Step 5: update sufficient statistics (index 0 = fresh run at the prior)
        new_kappa = np.empty(n_rl + 1)
        new_mu = np.empty(n_rl + 1)
        new_alpha = np.empty(n_rl + 1)
        new_beta = np.empty(n_rl + 1)

        new_kappa[0], new_mu[0] = self._KAPPA0, self._MU0
        new_alpha[0], new_beta[0] = self._ALPHA0, self._BETA0

        new_kappa[1:] = kappa + 1.0
        new_mu[1:] = (kappa * mu + x) / (kappa + 1.0)
        new_alpha[1:] = alpha + 0.5
        new_beta[1:] = beta + kappa * (x - mu) ** 2 / (2.0 * (kappa + 1.0))

        # Step 6: truncate to max_run_length and renormalize
        if len(new_probs) > self._max_rl:
            new_probs = new_probs[:self._max_rl]
            new_kappa = new_kappa[:self._max_rl]
            new_mu = new_mu[:self._max_rl]
            new_alpha = new_alpha[:self._max_rl]
            new_beta = new_beta[:self._max_rl]
            total = np.sum(new_probs)
            if total > 0:
                new_probs /= total

        self._run_length_probs[key] = new_probs
        self._suf_stats[key] = {
            'kappa': new_kappa, 'mu': new_mu,
            'alpha': new_alpha, 'beta': new_beta,
        }

    @staticmethod
    def _student_t_pdf(x: float, df: np.ndarray, loc: np.ndarray,
                       scale: np.ndarray) -> np.ndarray:
        z = (x - loc) / scale
        from scipy.special import gammaln
        log_pdf = (
            gammaln((df + 1) / 2) - gammaln(df / 2)
            - 0.5 * np.log(df * np.pi) - np.log(scale)
            - (df + 1) / 2 * np.log(1 + z * z / df)
        )
        return np.exp(log_pdf)

    def check(self, key: PairKey, soft_prob: float, hard_prob: float,
              scale_max: float, warmup: int) -> _BetaRegimeState:
        """检测当前 Beta 体制状态

        变点概率定义: P(r_t < warmup) — 最近 warmup 步内是否有变点
        """
        n = self._n_updates.get(key, 0)
        if n < warmup:
            return _BetaRegimeState('stable', 0.0, float(n), 0.0, 1.0, False, "数据不足")

        probs = self._run_length_probs.get(key)
        if probs is None or len(probs) == 0:
            return _BetaRegimeState('stable', 0.0, 0.0, 0.0, 1.0, False, "no data")

        cp_prob = float(np.sum(probs[:min(warmup, len(probs))]))
        expected_rl = float(np.sum(np.arange(len(probs)) * probs))

        if cp_prob >= hard_prob:
            return _BetaRegimeState(
                'expanding', cp_prob, expected_rl, 0.0, scale_max, True,
                f"Beta硬拦截: P(变点)={cp_prob:.3f}>={hard_prob} E[rl]={expected_rl:.1f}"
            )

        if cp_prob >= soft_prob:
            import math
            midpoint = (soft_prob + hard_prob) / 2
            steepness = 6.0 / max(hard_prob - soft_prob, 0.01)
            t = 1.0 / (1.0 + math.exp(-steepness * (cp_prob - midpoint)))
            scale = 1.0 + t * (scale_max - 1.0)
            return _BetaRegimeState(
                'expanding', cp_prob, expected_rl, 0.0, scale, False,
                f"Beta缩放: P(变点)={cp_prob:.3f} scale={scale:.2f} E[rl]={expected_rl:.1f}"
            )

        return _BetaRegimeState(
            'stable', cp_prob, expected_rl, 0.0, 1.0, False,
            f"Beta稳定: P(变点)={cp_prob:.3f} E[rl]={expected_rl:.1f}"
        )

    def cleanup_pair(self, key: PairKey) -> None:
        self._run_length_probs.pop(key, None)
        self._suf_stats.pop(key, None)
        self._last_kline_time.pop(key, None)
        self._n_updates.pop(key, None)
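In the soft zone, check() maps P(changepoint) to a threshold scale with a logistic curve centered halfway between soft_prob and hard_prob. Evaluating that same formula standalone with the StrategyParams defaults (soft 0.3, hard 0.7, scale_max 2.0) shows the scale running from about 1.05 at the soft edge to about 1.95 just below the hard block, and ≈1.10 at P=0.35 as in scenario B. The helper name beta_threshold_scale is illustrative only.

```python
import math
from typing import Optional


def beta_threshold_scale(cp_prob: float, soft_prob: float = 0.3, hard_prob: float = 0.7,
                         scale_max: float = 2.0) -> Optional[float]:
    """Threshold scale as computed in _BOCPDDetector.check(); None means a hard block."""
    if cp_prob >= hard_prob:
        return None                          # hard block: no entry at any threshold
    if cp_prob < soft_prob:
        return 1.0                           # stable: threshold unchanged
    midpoint = (soft_prob + hard_prob) / 2
    steepness = 6.0 / max(hard_prob - soft_prob, 0.01)
    t = 1.0 / (1.0 + math.exp(-steepness * (cp_prob - midpoint)))
    return 1.0 + t * (scale_max - 1.0)


for p in (0.1, 0.3, 0.35, 0.5, 0.69, 0.7):
    s = beta_threshold_scale(p)
    print(p, "hard block" if s is None else round(s, 3))
```

Because the logistic never quite reaches scale_max, the effective threshold jumps from ≈1.95× straight to a hard block at hard_prob rather than saturating smoothly.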

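3.3 Cross-pair systemic risk aggregation

File: src/trading/strategy.py

class _SystemicRiskAggregator:
    """Tallies the BOCPD regime state of all pairs; when the fraction of
    pairs in 'expanding' reaches systemic_threshold, a global block is triggered.
    Holds no state of its own and recomputes on every call.
    """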

    def __init__(self, enabled: bool = True):
        self._enabled = enabled

    def check(self, pair_states: dict[PairKey, _BetaRegimeState],
              systemic_threshold: float = 0.3) -> tuple[bool, str]:
        if not self._enabled or not pair_states:
            return False, ""

        total = len(pair_states)
        n_expanding = sum(1 for s in pair_states.values() if s.regime == 'expanding')
        ratio = n_expanding / total
        avg_cp = sum(s.changepoint_prob for s in pair_states.values()) / total

        if ratio >= systemic_threshold:
            return True, (
                f"系统性风险: {n_expanding}/{total} ({ratio:.0%}) 配对扩张态, "
                f"平均P(变点)={avg_cp:.3f}"
            )
        return False, ""

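3.4 β-weighted position sizing (Purpose 2)

Design rationale

Standard pair-trading hedge equation:

spread_t = log(P_alt_t) - β × log(P_base_t) - α

For the position to have no exposure to moves in P_base, the two legs' notionals must satisfy:

base_notional = |β| × alt_notional

Reasoning: a long-spread position (long Alt, short Base) earns ≈ alt_notional × r_alt - base_notional × r_base. Since r_alt ≈ α + β × r_base + ε, its r_base term is (β × alt_notional - base_notional) × r_base, which vanishes only when base_notional = β × alt_notional.

The current system uses alt_notional = base_notional (equal notional), which implicitly assumes β=1 and is wrong for every pair whose β differs from 1.

hedge_beta selection logic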

def resolve_hedge_beta(
    kalman_beta: float | None,
    kalman_P_beta: float | None,
    ols_beta: float | None,
    p_beta_max: float = HEDGE_BETA_P_MAX,
    beta_min: float = HEDGE_BETA_MIN,
    beta_max: float = HEDGE_BETA_MAX,
) -> tuple[float, str]:
    """选择用于仓位计算的 hedge_beta

    优先 Kalman β,不确定性过高时降级为 OLS β,最后兜底 β=1.0

    Returns:
        (hedge_beta, source) — source: 'kalman' | 'ols' | 'default'
    """
    # 优先使用 Kalman β
    if kalman_beta is not None and kalman_P_beta is not None:
        if kalman_P_beta <= p_beta_max:
            beta = np.clip(abs(kalman_beta), beta_min, beta_max)
            return float(beta), 'kalman'
        # P_β 过大:Kalman 不确定性高(冷启动或体制切换中),降级
        logger.debug(f"Kalman P_β={kalman_P_beta:.4f} > {p_beta_max},降级为 OLS β")

    # 降级使用 OLS β
    if ols_beta is not None:
        beta = np.clip(abs(ols_beta), beta_min, beta_max)
        return float(beta), 'ols'

    # 兜底
    return 1.0, 'default'

File: src/trading/risk_manager.py

Modify calculate_position_size() to take a hedge_beta parameter:

def calculate_position_size(
    self,
    signal: PairTradeSignal,
    alt_price: float,
    base_price: float = 0.0,
    available_balance: float = 0.0,
    hedge_beta: float = 1.0,        # [new] hedge ratio
    hedge_beta_source: str = '',    # [new] source tag ('kalman'/'ols'/'default')
) -> tuple[float, float]:
    """Compute position size (β-weighted).

    β-weighting:
        alt_notional  = total_position / (1 + |β|)
        base_notional = |β| × alt_notional
        total notional = alt_notional + base_notional = total_position (unchanged)

    With β=1.0 this reduces to equal notional (backward compatible).
    """
    # ... existing base_usd / signal-strength scaling / cap check / balance check logic unchanged ...

    # Convert to coin quantities (β-weighted)
    alt_size = 0.0
    base_size = 0.0

    if self._config.pair_mode == "pair" and base_price > 0 and alt_price > 0:
        abs_beta = max(abs(hedge_beta), 0.1)
        total_position = position_usd * 2  # total notional of both legs (the old logic used position_usd per leg)

        # base_notional = |β| × alt_notional cancels the Alt leg's exposure to Base
        alt_notional = total_position / (1.0 + abs_beta)
        base_notional = abs_beta * alt_notional

        alt_size = alt_notional / alt_price
        base_size = base_notional / base_price

        logger.info(
            f"Position size (β-weighted): base_usd=${base_usd:.0f} × scale={scale} | "
            f"β={hedge_beta:.3f}({hedge_beta_source}) | "
            f"Alt: {alt_size:.2f} × ${alt_price:.4f} ≈ ${alt_notional:.0f} | "
            f"Base: {base_size:.2f} × ${base_price:.4f} ≈ ${base_notional:.0f} | "
            f"Base:Alt {base_notional/alt_notional:.2f}:1"
        )
    elif alt_price > 0:
        alt_size = position_usd / alt_price
        logger.info(
            f"仓位计算(单腿): 基础=${base_usd:.0f} × 缩放={scale} = ${position_usd:.0f} | "
            f"开仓: {alt_size:.2f} × ${alt_price:.4f}"
        )

    return alt_size, base_size

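Notional allocation under β-weighting

β | alt_notional | base_notional | Base:Alt ratio
0.3 | $154 | $46 | 0.3:1
0.5 | $133 | $67 | 0.5:1
1.0 | $100 | $100 | 1.0:1 (equal, backward compatible)
2.0 | $67 | $133 | 2.0:1
3.0 | $50 | $150 | 3.0:1

(Example: a $100-per-leg baseline, i.e. total_position=$200)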
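The table can be reproduced, and the hedge checked, in a few lines. For a long-spread position (long Alt, short Base) the net Base exposure is β·alt_notional - base_notional: zero under β-weighting, and (β - 1)·$100 under the equal $100/$100 split. A sketch assuming total_position = $200 as in the table; split_notional and net_base_exposure are illustrative helpers, not part of risk_manager.

```python
def split_notional(total_position: float, beta: float,
                   beta_floor: float = 0.1) -> tuple[float, float]:
    """β-weighted split: alt = total / (1 + |β|), base = |β| × alt."""
    abs_beta = max(abs(beta), beta_floor)
    alt_notional = total_position / (1.0 + abs_beta)
    base_notional = abs_beta * alt_notional
    return alt_notional, base_notional


def net_base_exposure(beta: float, alt_notional: float, base_notional: float) -> float:
    """Base exposure of long Alt / short Base when r_alt ≈ α + β·r_base."""
    return beta * alt_notional - base_notional


for beta in (0.3, 0.5, 1.0, 2.0, 3.0):
    alt, base = split_notional(200.0, beta)
    print(f"β={beta}: alt=${alt:.0f} base=${base:.0f} | "
          f"net β-weighted={net_base_exposure(beta, alt, base):+.1f} | "
          f"net equal $100/$100={net_base_exposure(beta, 100.0, 100.0):+.1f}")
```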

File: src/trading/models.py

New fields in PairTradeSignal:

@dataclass
class PairTradeSignal:
    # ... existing fields ...
    hedge_beta: float = 1.0              # [new] β used for the position ratio
    hedge_beta_source: str = 'default'   # [new] β source ('kalman'/'ols'/'default')

New field in PairPosition:

@dataclass
class PairPosition:
    # ... existing fields ...
    entry_hedge_beta: float = 1.0        # [new] hedge β at entry (for post-trade analysis)

3.5 Orchestration-layer integration

File: src/trading/orchestrator.py

New state: _kalman_states: dict[PairKey, dict]

class TradingOrchestrator:
    def __init__(self, ...):
        # ... existing ...
        self._kalman_states: dict[PairKey, dict] = {}  # [new] persisted Kalman state per pair

Changes to process_analysis(): extract the Kalman outputs from multi_period_result and pass them to strategy.process_tick() and on_entry_signal().

def process_analysis(self, symbol, z4h, multi_period_result, timestamp, ...):
    # ... existing input validation ...

    # [new] Extract the Kalman outputs
    kalman_beta = multi_period_result.get('kalman_beta')
    kalman_P_beta = multi_period_result.get('kalman_P_beta')
    kalman_innovation = multi_period_result.get('kalman_innovation')

    # [new] Update the cached Kalman state
    kalman_state_new = multi_period_result.get('kalman_state')
    if kalman_state_new is not None:
        self._kalman_states[(symbol, base_symbol)] = kalman_state_new

    entry_signal, exit_signal = self._strategy.process_tick(
        symbol, base_symbol, z4h, timestamp,
        kline_time=kline_time, latest_price=price_for_log,
        kalman_innovation=kalman_innovation,  # [new] Purpose 1
        kalman_P_beta=kalman_P_beta,          # [new] Purpose 1
        alt_ohlcv=alt_ohlcv, base_ohlcv=base_ohlcv,
    )

    # Handle the entry signal
    if entry_signal is not None:
        # [new] Compute hedge_beta (Purpose 2)
        ols_beta = multi_period_result.get('details', {}).get(
            ('4h', '60d'), {}).get('cointegration_new', {}).get('beta')
        hedge_beta, hedge_source = resolve_hedge_beta(
            kalman_beta, kalman_P_beta, ols_beta
        )

        self.on_entry_signal(
            symbol, multi_period_result,
            latest_alt_price=price_for_log,
            direction=entry_signal.direction,
            adaptive_z=entry_signal.adaptive_z,
            hedge_beta=hedge_beta,           # [new]
            hedge_beta_source=hedge_source,  # [new]
        )

Changes to on_entry_signal(): pass hedge_beta into the signal:

def on_entry_signal(
    self, symbol, multi_period_result,
    latest_alt_price=None, avg_zscore_4h=None,
    direction="", signal_strength="medium", adaptive_z=0.0,
    l2_snapshot=None,
    hedge_beta: float = 1.0,            # [new]
    hedge_beta_source: str = 'default', # [new]
) -> bool:
    # ...
    signal = PairTradeSignal(
        # ... existing fields ...
        hedge_beta=hedge_beta,               # [new]
        hedge_beta_source=hedge_beta_source,  # [new]
    )
    # ...

File: src/trading/position_manager.py

Changes to _open_position_inner(): pass hedge_beta to risk_manager:

def _open_position_inner(self, signal: PairTradeSignal, adaptive_z: float = 0.0):
    # ... existing price fetching ...

    alt_size, base_size = self._risk_manager.calculate_position_size(
        signal, alt_price, base_price, available_balance,
        hedge_beta=signal.hedge_beta,              # [new]
        hedge_beta_source=signal.hedge_beta_source,  # [new]
    )

    position = PairPosition(
        # ... existing fields ...
        entry_hedge_beta=signal.hedge_beta,  # [new] record the β at entry
    )

3.6 Strategy-layer integration (Purpose 1)

File: src/trading/strategy.py

__init__

self._bocpd = _BOCPDDetector(
    hazard_rate=default_params.beta_regime_hazard_rate,
    max_run_length=default_params.beta_regime_max_run_length,
)
self._systemic_aggregator = _SystemicRiskAggregator(enabled=True)
self._all_pair_states: dict[PairKey, _BetaRegimeState] = {}

New parameters in the process_tick() signature

def process_tick(
    self, symbol, base_symbol, z4h, timestamp,
    kline_time=None, latest_price=None,
    kalman_innovation: float | None = None,  # [new] Purpose 1
    kalman_P_beta: float | None = None,      # [new] Purpose 1
    alt_ohlcv=None, base_ohlcv=None,
) -> tuple[EntrySignal | None, ExitSignal | None]:

New-bar update block in _process_tick_unlocked

if is_new_candle:
    # ... existing: Welford update, EMA update ...
    if kalman_innovation is not None and kline_time is not None:
        self._bocpd.update(key, kalman_innovation, str(kline_time))

_check_entry(): after the z4h filter, before the direction decision

# ── [new] β regime check ──
if params.beta_regime_enabled:
    beta_state = self._bocpd.check(
        key,
        soft_prob=params.beta_regime_soft_prob,
        hard_prob=params.beta_regime_hard_prob,
        scale_max=params.beta_regime_scale_max,
        warmup=params.beta_regime_warmup,
    )
    self._all_pair_states[key] = beta_state

    if beta_state.hard_block:
        logger.info(f"Beta体制硬拦截 | {pair_label} | {beta_state.reason} | "
                    f"az={adaptive_z:+.4f} z4h={z4h:+.4f}")
        return None

    is_systemic, systemic_reason = self._systemic_aggregator.check(self._all_pair_states)
    if is_systemic:
        logger.info(f"系统性风险拦截 | {pair_label} | {systemic_reason}")
        return None

    threshold_scale = beta_state.threshold_scale
else:
    beta_state = None
    threshold_scale = 1.0

# ── Direction decision (with β scaling applied) ──
threshold = params.adaptive_threshold * threshold_scale
if adaptive_z < -threshold:
    direction = 'long'
elif adaptive_z > threshold:
    direction = 'short'
else:
    if threshold_scale > 1.01:
        logger.info(f"Beta体制缩放拦截 | {pair_label} | "
                    f"az={adaptive_z:+.4f} 有效阈值={threshold:.2f} "
                    f"(原始={params.adaptive_threshold} ×{threshold_scale:.2f}) | "
                    f"{beta_state.reason if beta_state else ''}")
    return None

cleanup_pair()

self._bocpd.cleanup_pair(key)
self._all_pair_states.pop(key, None)

4. End-to-end data flow

1. WebSocket 4h bar closes
   ↓
2. realtime_kline_service_base triggers the analysis
   ↓
3. analyze_multi_period()
   ├─ calculate_cointegration_params_dual_window(kalman_state=cached)
   │   ├─ OLS regression → β_ols, spread, adf_pvalue (existing)
   │   └─ [new] Kalman update → kalman_beta, innovation, P_β, kalman_state
   ├─ Health monitor → Gate2 (existing)
   └─ Output: multi_period_result (with kalman_* fields)
   ↓
4. orchestrator.process_analysis()
   ├─ Cache kalman_state → self._kalman_states[pair_key]
   ├─ Pass kalman_innovation, kalman_P_beta → strategy.process_tick()
   │   ├─ [Purpose 1] BOCPD.update(innovation) → regime detection
   │   └─ [Purpose 1] _check_entry() → hard block / threshold scaling
   ├─ If an EntrySignal is produced:
   │   ├─ [Purpose 2] resolve_hedge_beta(kalman_beta, P_β, ols_beta) → hedge_beta
   │   └─ on_entry_signal(hedge_beta=...) → PairTradeSignal
   ↓
5. position_manager.open_position(signal)
   ├─ risk_manager.calculate_position_size(hedge_beta=signal.hedge_beta)
   │   ├─ alt_notional  = total / (1 + |β|)
   │   └─ base_notional = |β| × alt_notional
   ├─ executor.market_open(alt_size, base_size)
   └─ PairPosition(entry_hedge_beta=β)
   ↓
6. Persistence
   ├─ pair_positions: includes entry_hedge_beta
   └─ trading_signals: includes hedge_beta, hedge_beta_source

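5. Scenario walkthroughs

A: Normal market (stable β): both purposes work normally

innovation ~ N(0,1): z = [-0.3, 0.8, -0.5, 0.2, ...]
kalman_beta ≈ 0.45, P_β < 0.01

Purpose 1: BOCPD P(changepoint) ≈ 0.02-0.05 → regime='stable', scale=1.0 → normal entries
Purpose 2: hedge_beta=0.45 (kalman) → base_notional = 0.45 × alt_notional
        less Base than with equal notional, matching the fact that Alt moves only 0.45× as much as Base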

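B: β starts surging: Purpose 1 blocks, Purpose 2 adjusts automatically

T=0h:   kalman_beta ≈ 0.5  → normal
T=12h:  kalman_beta ≈ 1.2, innovation persistently positive
        Purpose 1: P(changepoint) ≈ 0.35 > soft_prob → 'expanding', scale≈1.1
        Purpose 2: hedge_beta=1.2 → the Base leg would be 1.2× the Alt leg's notional (if an entry happened)
T=16h:  P(changepoint) ≈ 0.75 > hard_prob → hard block
        Purpose 2: not executed (entry is blocked)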

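C: β stabilizes after the surge: the new β is used after recovery

During the surge P(changepoint) = 0.85 → hard block
Once β stabilizes, innovations return to N(0,1):
→ after ~6-8 normal bars (24-32h), P(changepoint) < soft_prob → recovered

After recovery:
  Purpose 1: regime='stable' → entries allowed
  Purpose 2: kalman_beta has already tracked the new β (e.g. 1.8), so new positions use the new ratio automatically,
          whereas OLS would still be using the old β≈0.6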

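D: Weakly coupled pair with β≈0.3: the hedge-ratio correction matters most

OLS β ≈ 0.31, Kalman β ≈ 0.29

Equal notional (current system):
  Alt $100 + Base $100
  The Alt leg carries only 0.3 × $100 = $30 of Base exposure, so the $100 Base leg over-hedges by $70

β-weighted:
  alt_notional  = $200 / 1.3 ≈ $154
  base_notional = 0.3 × $154 ≈ $46
  → Base is only $46, offsetting the 0.3× Base exposure of the $154 Alt leg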

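E: Kalman cold start: fallback strategy

After a restart the kalman_state is lost:
  T=0:  initialize the Kalman filter from the OLS β, P₀ = diag(0.1, 1.0)
        P_β = 1.0 > HEDGE_BETA_P_MAX (0.5)
        → resolve_hedge_beta falls back to OLS β

  T=3-5 bars (12-20h):
        the Kalman filter converges until P_β ≤ HEDGE_BETA_P_MAX (0.5)
        → resolve_hedge_beta switches back to Kalman β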
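How long the fallback lasts can be estimated from the filter's own β-variance recursion. Ignoring α and the α/β cross term, each bar adds roughly r_btc²/R to the information 1/P_β, so 1/P_β ≈ 1/P₀ + Σ r_btc²/R (minus a small q_β effect). The sketch iterates that scalar recursion with r_btc² replaced by its expectation σ_btc²; the σ_btc and R values are placeholders to be replaced with a pair's own numbers, and bars_until is an illustrative helper.

```python
def bars_until(p_target: float, sigma_btc: float, r_noise: float,
               p0: float = 1.0, q_beta: float = 1e-4, max_bars: int = 10_000) -> int:
    """Bars until the scalar β variance falls to p_target (-1 if never within max_bars).

    Scalar approximation of the covariance recursion (α and the α/β cross
    term ignored, r_btc² replaced by its expectation σ_btc²):
        P⁻ = P + q_β,   P = P⁻ / (1 + P⁻ · σ_btc² / R)
    """
    info = sigma_btc ** 2 / r_noise           # expected r_btc² / R per bar
    p = p0
    for n in range(1, max_bars + 1):
        p_pred = p + q_beta
        p = p_pred / (1.0 + p_pred * info)
        if p <= p_target:
            return n
    return -1


# Illustrative inputs: BTC moves 1.5% per 4h bar, residual R = 9e-4 (3% per bar)
print(bars_until(0.5, 0.015, 9e-4), bars_until(0.1, 0.015, 9e-4))
```

Under these assumed inputs P_β crosses HEDGE_BETA_P_MAX = 0.5 after about 5 bars but needs a few dozen bars to reach 0.1. The count scales with R/σ_btc², so an R seeded from a residual on a different scale (for example the price-level spread residual) can stretch the fallback well beyond 3-5 bars.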

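F: Systemic risk

8 of 20 pairs have P(changepoint) ≥ 0.3 → 40% ≥ systemic_threshold = 30%
→ systemic risk block → entries paused for all pairs
→ hedge_beta keeps updating in the background, and the latest value is used after recovery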

6. Changed Files

| File | Change | Affected layer |
|---|---|---|
| src/config.py | +7 Kalman constants, +3 hedge-ratio constants | Config |
| src/utils/analysis/analysis_core.py | New VectorKalmanBetaEstimator class; extend calculate_cointegration_params_dual_window(), analyze_pair_advanced(), analyze_multi_period() | Analysis |
| src/trading/config.py | StrategyParams +7 fields; extend get_strategy_params(), _build_strategy_params(), load_trading_config() | Config |
| src/trading/models.py | PairTradeSignal +2 fields (hedge_beta, hedge_beta_source); PairPosition +1 field (entry_hedge_beta) | Data model |
| src/trading/orchestrator.py | New _kalman_states dict; extend process_analysis() to extract Kalman outputs and compute hedge_beta; extend on_entry_signal() to pass hedge_beta through | Orchestration |
| src/trading/strategy.py | New _BetaRegimeState, _BOCPDDetector, _SystemicRiskAggregator; extend process_tick(), _check_entry(), cleanup_pair() | Strategy |
| src/trading/risk_manager.py | Extend calculate_position_size() to support β weighting | Risk management |
| src/trading/position_manager.py | Extend _open_position_inner() to pass hedge_beta through | Position management |

New dependency: scipy.special.gammaln (scipy is already a project dependency)

Unchanged: momentum_filter.py, executor.py
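The gammaln dependency fits a BOCPD whose posterior predictive is a Student-t, which is what a Normal-Gamma prior on the innovation mean and variance produces. Below is a minimal sketch of that model using the Adams & MacKay run-length recursion; it is not the project's _BOCPDDetector, and the prior hyperparameters, the truncation and the definition of "changepoint probability" are assumptions. math.lgamma, vectorized with numpy, stands in for scipy.special.gammaln so the sketch runs without scipy. One point worth noting: with a constant hazard H the posterior P(r_t = 0) equals H at every step (up to truncation), so a usable changepoint probability has to aggregate mass over short run lengths; here it is P(r_t ≤ short_window).

```python
import math

import numpy as np

_lgamma = np.vectorize(math.lgamma, otypes=[float])  # stand-in for scipy.special.gammaln


class BOCPDSketch:
    """Bayesian online changepoint detection for a univariate stream
    (e.g. normalized Kalman innovations), Gaussian segments with a Normal-Gamma prior."""

    def __init__(self, hazard_rate=1 / 50, max_run_length=200,
                 mu0=0.0, kappa0=1.0, alpha0=1.0, beta0=1.0, short_window=5):
        self.log_h = math.log(hazard_rate)
        self.log_1mh = math.log1p(-hazard_rate)
        self.max_rl = max_run_length
        self.prior = (mu0, kappa0, alpha0, beta0)
        self.short_window = short_window
        # log P(r_t | x_1..t); before any data the run length is 0 with certainty.
        self.log_r = np.zeros(1)
        self.mu, self.kappa = np.array([mu0]), np.array([kappa0])
        self.alpha, self.beta = np.array([alpha0]), np.array([beta0])

    def _log_pred(self, x):
        """Student-t log density of x under each run length's posterior predictive."""
        nu = 2.0 * self.alpha
        scale2 = self.beta * (self.kappa + 1.0) / (self.alpha * self.kappa)
        return (_lgamma((nu + 1.0) / 2.0) - _lgamma(nu / 2.0)
                - 0.5 * np.log(nu * math.pi * scale2)
                - (nu + 1.0) / 2.0 * np.log1p((x - self.mu) ** 2 / (nu * scale2)))

    def update(self, x):
        log_joint = self.log_r + self._log_pred(x)
        # Growth: every run extends by one; changepoint: all mass collapses to r=0.
        log_new = np.concatenate(([np.logaddexp.reduce(log_joint) + self.log_h],
                                  log_joint + self.log_1mh))
        mu0, kappa0, alpha0, beta0 = self.prior
        # Conjugate Normal-Gamma update for runs that absorb x; r=0 restarts from the prior.
        mu = np.concatenate(([mu0], (self.kappa * self.mu + x) / (self.kappa + 1.0)))
        beta = np.concatenate(([beta0], self.beta + self.kappa * (x - self.mu) ** 2
                               / (2.0 * (self.kappa + 1.0))))
        kappa = np.concatenate(([kappa0], self.kappa + 1.0))
        alpha = np.concatenate(([alpha0], self.alpha + 0.5))
        n = self.max_rl + 1  # truncate the run-length support, then renormalize
        self.log_r = log_new[:n] - np.logaddexp.reduce(log_new[:n])
        self.mu, self.kappa, self.alpha, self.beta = mu[:n], kappa[:n], alpha[:n], beta[:n]
        return self.changepoint_prob()

    def changepoint_prob(self):
        """Assumed definition: posterior mass on run lengths <= short_window."""
        return float(np.exp(self.log_r[: self.short_window + 1]).sum())
```

Because every run length is at most t, this quantity is 1 by construction until more than short_window bars have been seen, which is the kind of artifact the warmup period in section 9 is meant to mask.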


7. DB Changes

New column on pair_positions

ALTER TABLE pair_positions ADD COLUMN entry_hedge_beta DOUBLE PRECISION DEFAULT 1.0;

New columns on trading_signals

ALTER TABLE trading_signals ADD COLUMN hedge_beta DOUBLE PRECISION DEFAULT 1.0;
ALTER TABLE trading_signals ADD COLUMN hedge_beta_source VARCHAR(10) DEFAULT 'default';

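8. Logging

Each entry: event (level): message format.

  • Initialization (INFO): Beta regime tracker initialized | BOCPD hazard=0.020 soft=0.30 hard=0.70
  • Kalman update (DEBUG): Kalman update | PURR|HYPE | α̂=0.002 β̂=1.20 P_β=0.005 innov=2.85
  • hedge_beta selection (DEBUG): hedge_beta=0.45(kalman) P_β=0.003 | OLS_β=0.48
  • hedge_beta fallback (DEBUG): Kalman P_β=0.62 > 0.50, falling back to OLS β=0.48
  • β-weighted sizing (INFO): Position sizing (β-weighted) | β=0.45(kalman) | Alt ≈$62 | Base ≈$138 | ratio 0.45:1
  • Hard block (INFO): Beta regime hard block | PURR|HYPE | P(changepoint)=0.850>=0.70 E[rl]=1.2
  • Scaled block (INFO): Beta regime scaled block | PURR|HYPE | az=-3.50 effective threshold=4.00 (3.0×1.33)
  • Systemic risk (INFO): Systemic risk: 8/20 (40%) pairs in expanding state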

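9. Risks and Edge Cases

| Risk | Severity | Mitigation |
|---|---|---|
| Hedge-ratio drift on open positions |  | This iteration computes the β ratio only at entry; in-position rebalance is a P0 follow-up |
| β sign flip (kalman_beta < 0) |  | resolve_hedge_beta takes the absolute value, with floor HEDGE_BETA_MIN=0.1 |
| Kalman Q mis-specified |  | Adaptive R + Huber clipping + backtest tuning |
| Kalman state lost on restart |  | Re-initialize from OLS, converging within 3-5 bars; automatically fall back to OLS β while P_β is too large |
| BOCPD cold start |  | Default to stable during warmup=5 (20h) |
| Low-volatility period, r_btc≈0 |  | H≈[1,0], so the update goes almost entirely into α (β moves only through the α-β cross-covariance in P), which is the physically sensible outcome; hedge_beta stays close to its previous value |
| Extreme positions from β weighting |  | Clamp with HEDGE_BETA_MIN=0.1 and HEDGE_BETA_MAX=5.0 |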
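A minimal sketch of the two-state filter behind VectorKalmanBetaEstimator, showing the pieces the risk table relies on: observation vector H = [1, r_btc], Huber-style innovation clipping at clip_sigma, and the Joseph-form covariance update. It is not the project's class: R is held fixed here (the design's adaptive R is omitted), r_init is assumed to be a variance, and the class name `KalmanBetaSketch` is hypothetical. The returned keys mirror those used in the unit tests.

```python
import numpy as np


class KalmanBetaSketch:
    """Random-walk state x = [alpha, beta]; observation r_alt = alpha + beta * r_btc + v."""

    def __init__(self, alpha_init=0.0, beta_init=1.0, q_alpha=1e-5, q_beta=1e-4,
                 r_init=1e-4, clip_sigma=3.0, p0=(0.1, 1.0)):
        self.x = np.array([alpha_init, beta_init], dtype=float)
        self.P = np.diag(np.asarray(p0, dtype=float))  # scenario E uses P0 = diag(0.1, 1.0)
        self.Q = np.diag([q_alpha, q_beta])             # per-bar random-walk variance
        self.R = float(r_init)                          # observation-noise variance, fixed here
        self.clip_sigma = clip_sigma

    def update(self, r_btc, r_alt):
        P = self.P + self.Q                             # predict: mean unchanged, covariance grows
        H = np.array([1.0, r_btc])
        y = r_alt - H @ self.x                          # raw innovation
        S = float(H @ P @ H) + self.R                   # innovation variance
        z = float(y / np.sqrt(S))                       # normalized innovation
        clipped = abs(z) > self.clip_sigma
        if clipped:                                     # Huber-style: cap |y| at clip_sigma * sqrt(S)
            y = np.sign(y) * self.clip_sigma * np.sqrt(S)
        K = P @ H / S                                   # Kalman gain, shape (2,)
        self.x = self.x + K * y
        I_KH = np.eye(2) - np.outer(K, H)
        # Joseph form keeps P positive semi-definite even with rounding error.
        self.P = I_KH @ P @ I_KH.T + self.R * np.outer(K, K)
        return {"alpha": float(self.x[0]), "beta": float(self.x[1]),
                "P_alpha": float(self.P[0, 0]), "P_beta": float(self.P[1, 1]),
                "innovation": float(y),                 # value used in the update (after clipping)
                "normalized_innovation": z, "clipped": bool(clipped)}
```

When r_btc≈0 the gain on β is K_β = P_αβ / S, so β still moves slightly through the α-β cross-covariance even though H=[1, 0].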

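Out of scope for this iteration: exit-logic changes, in-position hedge rebalance, HMM regime classification, persisting Kalman state to the DB, Feishu alerts, adaptive Q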


10. Validation Plan

Unit tests

# test_vector_kalman.py
import numpy as np

def test_kalman_convergence():
    """从初始值收敛到真实 [α, β]"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        r_btc = rng.normal(0, 0.02)
        r_alt = 0.001 + 1.0 * r_btc + rng.normal(0, 0.01)
        result = kf.update(r_btc, r_alt)
    assert abs(result['beta'] - 1.0) < 0.15
    assert result['alpha'] > 0

def test_kalman_innovation_spike():
    """β 突变时 innovation 立即飙升"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    result = kf.update(0.02, 3.0 * 0.02)
    assert abs(result['normalized_innovation']) > 2.0

def test_kalman_huber_clipping():
    """极端 innovation 被截断,β 不过度跳变"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2, clip_sigma=3.0)
    rng = np.random.default_rng(42)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    result = kf.update(0.02, 20.0 * 0.02)
    assert result['clipped'] is True
    assert result['beta'] < 5.0

def test_kalman_joseph_positive():
    """Joseph 形式保证 P 始终非负"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-6)
    rng = np.random.default_rng(42)
    for _ in range(500):
        result = kf.update(rng.normal(0, 0.05), 0.5 * rng.normal(0, 0.05) + rng.normal(0, 0.001))
        assert result['P_beta'] >= 0 and result['P_alpha'] >= 0

def test_kalman_alpha_absorbs_drift():
    """α 吸收独立漂移,β 不被污染"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        result = kf.update(rng.normal(0, 0.02), 0.005 + 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    assert abs(result['beta'] - 0.5) < 0.2
    assert result['alpha'] > 0.001

def test_kalman_state_persistence():
    """状态导出/恢复后行为一致"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf1 = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2)
    kf1.update(0.02, 0.01)
    kf2 = VectorKalmanBetaEstimator.from_state_dict(kf1.state_dict())
    r1, r2 = kf1.update(0.03, 0.015), kf2.update(0.03, 0.015)
    assert abs(r1['beta'] - r2['beta']) < 1e-10
# test_hedge_beta.py
import numpy as np

def test_resolve_hedge_beta_kalman():
    """Kalman β 优先,P_β 合格时使用"""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.01, ols_beta=0.5)
    assert source == 'kalman'
    assert abs(beta - 0.45) < 0.01

def test_resolve_hedge_beta_fallback_ols():
    """P_β 过大时降级为 OLS"""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source = resolve_hedge_beta(kalman_beta=0.45, kalman_P_beta=0.8, ols_beta=0.5)
    assert source == 'ols'
    assert abs(beta - 0.5) < 0.01

def test_resolve_hedge_beta_fallback_default():
    """无 β 时兜底 1.0"""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, source = resolve_hedge_beta(kalman_beta=None, kalman_P_beta=None, ols_beta=None)
    assert source == 'default'
    assert beta == 1.0

def test_resolve_hedge_beta_clipping():
    """β 超限被裁剪"""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, _ = resolve_hedge_beta(kalman_beta=10.0, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 5.0  # HEDGE_BETA_MAX
    beta, _ = resolve_hedge_beta(kalman_beta=0.01, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 0.1  # HEDGE_BETA_MIN

def test_position_size_beta_weighted():
    """β 加权仓位:alt_notional = β × base_notional"""
    # β=0.5, total=200 → base=133.3, alt=66.7
    abs_beta = 0.5
    total = 200.0
    base_notional = total / (1.0 + abs_beta)
    alt_notional = abs_beta * base_notional
    assert abs(base_notional - 133.33) < 0.5
    assert abs(alt_notional - 66.67) < 0.5
    assert abs(base_notional + alt_notional - total) < 0.01

def test_position_size_beta_one():
    """β=1.0 退化为等额(向后兼容)"""
    abs_beta = 1.0
    total = 200.0
    base_notional = total / (1.0 + abs_beta)
    alt_notional = abs_beta * base_notional
    assert abs(base_notional - 100.0) < 0.01
    assert abs(alt_notional - 100.0) < 0.01

def test_position_size_negative_beta():
    """负 β 取绝对值"""
    from src.trading.orchestrator import resolve_hedge_beta
    beta, _ = resolve_hedge_beta(kalman_beta=-0.8, kalman_P_beta=0.01, ols_beta=0.5)
    assert beta == 0.8  # abs(-0.8)

# test_bocpd.py
from datetime import datetime, timedelta

import numpy as np

KEY = ("PURR/USDC:USDC", "HYPE/USDC:USDC")

def _ts(i):
    """ISO timestamp of the i-th 4h bar from 2024-01-01 (hours always stay within 00-23)."""
    return (datetime(2024, 1, 1) + timedelta(hours=4 * i)).isoformat()

def test_bocpd_stable():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    rng = np.random.default_rng(42)
    for i in range(20):
        d.update(KEY, rng.normal(0, 1), _ts(i))
    s = d.check(KEY, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'stable' and s.changepoint_prob < 0.3

def test_bocpd_detects_shift():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    rng = np.random.default_rng(42)
    for i in range(10):
        d.update(KEY, rng.normal(0, 1), _ts(i))
    for i in range(10, 18):
        d.update(KEY, rng.normal(3.0, 1), _ts(i))
    s = d.check(KEY, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'expanding' and s.changepoint_prob > 0.3

def test_bocpd_hard_block():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    rng = np.random.default_rng(42)
    for i in range(10):
        d.update(KEY, rng.normal(0, 1), _ts(i))
    for i in range(10, 20):
        d.update(KEY, rng.normal(5.0, 0.5), _ts(i))
    s = d.check(KEY, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.hard_block and s.changepoint_prob > 0.7

def test_bocpd_recovery():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    rng = np.random.default_rng(42)
    for i in range(10):
        d.update(KEY, rng.normal(0, 1), _ts(i))
    for i in range(10, 20):
        d.update(KEY, rng.normal(4.0, 1), _ts(i))
    assert d.check(KEY, 0.3, 0.7, 2.0, 5).regime == 'expanding'
    for i in range(20, 35):
        d.update(KEY, rng.normal(0, 1), _ts(i))
    s = d.check(KEY, 0.3, 0.7, 2.0, 5)
    assert s.regime == 'stable' and s.changepoint_prob < 0.3

def test_bocpd_timestamp_dedup():
    """Repeated updates for the same bar timestamp are ignored."""
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    for _ in range(48):
        d.update(KEY, 5.0, _ts(0))
    # "数据不足" means "insufficient data"; it must match the reason string the detector returns
    assert d.check(KEY, 0.3, 0.7, 2.0, 5).reason == "数据不足"

# test_systemic_risk.py
def test_systemic_risk_trigger():
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 4 else 'stable', 0.5 if i < 4 else 0.1,
        10.0, 0.01, 1.5 if i < 4 else 1.0, False, ""
    ) for i in range(10)}
    assert agg.check(states, 0.3)[0]  # 4/10 = 40% > 30%

def test_systemic_risk_no_trigger():
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 2 else 'stable', 0.5 if i < 2 else 0.1,
        10.0, 0.01, 1.5 if i < 2 else 1.0, False, ""
    ) for i in range(10)}
    assert not agg.check(states, 0.3)[0]  # 2/10 = 20% < 30%

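Integration validation

  1. Backtest: replay historical windows in which β surged to measure detection latency and false-positive rate; compare PnL of equal-notional vs β-weighted sizing
  2. Live observation: monitor the deviation between kalman_beta and OLS β, the distribution of hedge_beta_source, and the P_β trend
  3. A/B comparison (optional): run equal-notional and β-weighted sizing in parallel and compare hedge effectiveness and drawdowns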

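11. Roadmap

| Priority | Direction | Expected benefit |
|---|---|---|
| P0 | Exit protection (tighten stop-loss when β surges) | Fewer losses on open positions |
| P0 | In-position hedge rebalance (adjust the leg ratio when β drifts beyond a threshold) | Maintain hedge quality |
| P1 | Persist Kalman state to the DB | Eliminate the 20h cold-start window |
| P1 | Innovation-based adaptive Q | Per-pair optimal Q without manual tuning |
| P2 | HMM regime classification | Finer multi-regime scaling control |
| P2 | Tiered systemic risk (warning/severe/critical) | Finer global risk control |
| P3 | Feishu alerts | Human monitoring of regime switches and hedge_beta changes |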
