Beta 体制自适应过滤器设计方案(精简版)

1. 问题背景

当锚定物(BTC)回暖后,Alt 资产的 Beta 值会持续飙升:

正常态:   β ≈ 0.3 ~ 0.5   (BTC 涨 1%,Alt 涨 0.3~0.5%)
扩张态:   β ≈ 4 ~ 10       (BTC 涨 1%,Alt 涨 4~10%)

这不是价格的普通上涨,而是价差的持续扩大:协整关系发生了阶段性破裂。

当前系统弱点

| 组件 | 问题 | 影响 |
|------|------|------|
| analysis_core.py | BETA_WINDOW=100 固定窗口 OLS | β 估计滞后 ≈17 天 |
| strategy.py | adaptive_threshold=3.0 固定 | β 飙升导致持续突破阈值 |
| momentum_filter.py | 只检测单腿价格趋势 | 不检测 spread/beta 体制切换 |

故障场景

T+0h    BTC 回暖,β 从 0.5 开始上升
T+12h   β 实际 2.0,OLS β̂ 仍 ≈ 0.6 → 误判为均值回归机会 → 做空信号
T+24h   β 实际 4.0,OLS β̂ ≈ 0.8 → 继续误入场 → 被 β 扩张吞噬利润
T+7d    β 实际 8.0,OLS β̂ ≈ 2.0 → 协整假设彻底失效

核心问题:系统用滞后 β 计算的 z4h 产生入场信号,而该信号实际反映的是 β 正在变化,并非 spread 正在回归。
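下面用一个极简数值实验示意这一滞后效应(窗口与参数仅为示意,非实盘配置):让真实 β 在第 200 根 4h K线后线性飙升,固定 100 窗口的滚动 OLS 估计会明显落后于当期真实值。

```python
import numpy as np

rng = np.random.default_rng(0)
n, window = 300, 100                  # 300 根 4h K线;BETA_WINDOW=100
t_axis = np.arange(n)
# 第 200 根之前 β=0.5,之后每根 K线线性增加 0.05
true_beta = np.where(t_axis < 200, 0.5, 0.5 + (t_axis - 200) * 0.05)
r_btc = rng.normal(0.0, 0.02, n)
r_alt = true_beta * r_btc + rng.normal(0.0, 0.01, n)

# 固定窗口滚动 OLS:beta_ols = cov(r_btc, r_alt) / var(r_btc)
for t in (220, 260, 299):
    x, y = r_btc[t - window:t], r_alt[t - window:t]
    beta_ols = float(np.cov(x, y)[0, 1] / np.var(x, ddof=1))
    print(f"t={t}  真实β={true_beta[t]:.2f}  滚动OLS β̂={beta_ols:.2f}")
```

运行后可以看到滚动 OLS β̂ 始终低于当期真实 β,且差距随飙升持续扩大,对应上文故障场景中的误判链条。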


2. 方案概述

三层防御架构:

第一层:向量 Kalman Filter 估计时变 [α, β]

  • 二维状态 [α_t, β_t],同时追踪截距和斜率
  • Joseph 稳定形式更新 P,保证数值非负
  • Huber innovation clipping,抗加密货币厚尾分布
  • Sage-Husa 自适应噪声估计,在线校准 R

第二层:BOCPD 贝叶斯在线变点检测(Adams & MacKay, 2007)

  • 输出变点概率,维护 run length 后验分布
  • 自适应阈值,无需手工调参
  • 天然快速识别恢复(检测到新 run 开始)
  • 双向检测:上行扩张 + 下行急变

第三层:跨配对系统性风险聚合

  • 聚合所有配对的体制状态,识别系统性事件(alt season)
  • 全局风险信号触发暂停入场

3. 详细设计

3.1 向量 Kalman Filter 时变 [α, β] 估计

数学模型

状态方程([α, β] 缓慢演化):

x_t = x_{t-1} + w_t,    w_t ~ N(0, Q)

其中 x_t = [α_t, β_t]'
     Q = [[q_α,  0 ],     对角阵,α/β 独立演化
          [ 0,  q_β]]

观测方程(对数收益率关系):

r_alt_t = H_t × x_t + v_t,    v_t ~ N(0, R)

其中 H_t = [1, r_btc_t]
     r_alt_t = log(alt_t / alt_{t-1})
     r_btc_t = log(btc_t / btc_{t-1})

递推公式

预测步:
  x̂_t|t-1 = x̂_{t-1}
  P_t|t-1  = P_{t-1} + Q                           (2×2)

Innovation:
  ε_t = r_alt_t - H_t × x̂_t|t-1                   (标量)
  S_t = H_t × P_t|t-1 × H_t' + R                  (标量)

Huber Clipping (c=3.0):
  ε̃_t = clip(ε_t, -c×√S_t, +c×√S_t)

Kalman 增益:
  K_t = P_t|t-1 × H_t' / S_t                       (2×1)

更新步:
  x̂_t = x̂_t|t-1 + K_t × ε̃_t

Joseph 稳定形式:
  A = I - K_t × H_t                                 (2×2)
  P_t = A × P_t|t-1 × A' + K_t × R × K_t'          (保证半正定)

Sage-Husa R 自适应:
  d_t = (1 - b) / (1 - b^(t+1)),   b ∈ (0,1)
  R_t = (1 - d_t) × R_{t-1} + d_t × (ε_t² - H_t × P_t|t-1 × H_t')
  R_t = max(R_t, R_floor)
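以下最小数值检查示意 Joseph 形式为何被选用:按上面的递推随机迭代一千步后,P 仍保持对称、特征值非负(仅为验证性草图,Q/R 取与默认参数一致的量级):

```python
import numpy as np

# 验证性草图:Joseph 稳定形式下 P 的对称性与半正定性
rng = np.random.default_rng(1)
P = np.diag([0.1, 1.0])            # P0 = diag(0.1, 1.0)
Q = np.diag([1e-5, 1e-4])          # diag(q_alpha, q_beta)
R = 1e-2
for _ in range(1000):
    H = np.array([1.0, rng.normal(0.0, 0.02)])   # H_t = [1, r_btc_t]
    P_pred = P + Q
    S = float(H @ P_pred @ H) + R
    K = P_pred @ H / S
    A = np.eye(2) - np.outer(K, H)
    P = A @ P_pred @ A.T + R * np.outer(K, K)    # Joseph 稳定形式
print(np.allclose(P, P.T), np.linalg.eigvalsh(P).min() >= 0.0)
```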

关键输出信号

| 信号 | 正常范围 | β 飙升时 | 用途 |
|------|---------|---------|------|
| beta (x̂[1]) | ≈ OLS β | 快速追踪真实 β | 更准的 β 估计 |
| alpha (x̂[0]) | ≈ OLS α | 吸收独立漂移 | 避免 α 变化污染 β 信号 |
| P_beta (P[1,1]) | 小(~1e-3) | 增大 | 不确定性度量 |
| ε/√S | ~ N(0,1) | 持续 > 2σ | BOCPD 输入 |

参数

| 参数 | 符号 | 默认值 | 含义 | 调优 |
|------|------|--------|------|------|
| β 过程噪声 | q_β | 1e-4 | β 每 4h 变化方差先验 | ↑快追踪 ↓稳定 |
| α 过程噪声 | q_α | 1e-5 | α 每 4h 变化方差先验(比 β 慢) | 通常不调 |
| R 遗忘因子 | b | 0.95 | Sage-Husa 遗忘因子 | ↑平滑 ↓快适应 |
| R 下限 | R_floor | 1e-8 | 防止 R 退化为零 | — |
| Innovation 截断 | c | 3.0 | Huber 截断(σ 倍数) | ↓鲁棒 ↑灵敏 |
| 初始 [α, β] | — | OLS 估计 | 用现有 OLS 初始化 | — |
| 初始 P | P₀ | diag(0.1, 1.0) | 初始不确定性 | — |

q_β = 1e-4 → 日 β 漂移 ≈ 0.024,周 ≈ 0.065,合理覆盖正常变化。
q_α = q_β/10 → α 变化比 β 慢一个量级,避免争夺解释力。
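上述量级可以直接验证:随机游走 k 步后的标准差为 √(k·q_β),4h K线每日 6 根、每周 42 根:

```python
import math

q_beta = 1e-4                          # β 每根 4h K线的过程噪声方差
bars_per_day, bars_per_week = 6, 42    # 4h K线:每日 6 根,每周 42 根
daily_drift = math.sqrt(bars_per_day * q_beta)
weekly_drift = math.sqrt(bars_per_week * q_beta)
print(f"日漂移σ≈{daily_drift:.3f}  周漂移σ≈{weekly_drift:.3f}")   # ≈0.024 / 0.065
```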

文件:src/config.py

# ═══ Kalman Filter Beta 估计参数 ═══
KALMAN_Q_BETA: float = 1e-4
KALMAN_Q_ALPHA: float = 1e-5
KALMAN_P0_ALPHA: float = 0.1
KALMAN_P0_BETA: float = 1.0
KALMAN_R_FORGET: float = 0.95
KALMAN_R_FLOOR: float = 1e-8
KALMAN_CLIP_SIGMA: float = 3.0

文件:src/utils/analysis/analysis_core.py

新增类:VectorKalmanBetaEstimator

import numpy as np


class VectorKalmanBetaEstimator:
    """向量 Kalman Filter 时变 [α, β] 联合估计器

    状态空间模型:
        状态方程: x_t = x_{t-1} + w_t,    w_t ~ N(0, Q),  x = [α, β]'
        观测方程: r_alt_t = [1, r_btc_t] × x_t + v_t,     v_t ~ N(0, R)

    特性:
        - 二维状态 [α, β]:截距吸收独立漂移,消除虚假 β 信号
        - Joseph 稳定形式:P 更新保证半正定
        - Huber innovation clipping:抗厚尾分布
        - Sage-Husa 自适应 R:在线校准噪声

    每根新 4h K线调用 update() 一次。
    """

    def __init__(
        self,
        alpha_init: float,
        beta_init: float,
        q_alpha: float = 1e-5,
        q_beta: float = 1e-4,
        r_init: float = 1e-2,
        p0_alpha: float = 0.1,
        p0_beta: float = 1.0,
        r_forget: float = 0.95,
        r_floor: float = 1e-8,
        clip_sigma: float = 3.0,
    ):
        self.x = np.array([alpha_init, beta_init], dtype=np.float64)
        self.Q = np.diag([q_alpha, q_beta]).astype(np.float64)
        self.P = np.diag([p0_alpha, p0_beta]).astype(np.float64)
        self.R = float(r_init)
        self._r_forget = r_forget
        self._r_floor = r_floor
        self._clip_sigma = clip_sigma
        self._n_updates = 0

    @property
    def alpha(self) -> float:
        return float(self.x[0])

    @property
    def beta(self) -> float:
        return float(self.x[1])

    @property
    def beta_variance(self) -> float:
        return float(self.P[1, 1])

    def update(self, r_btc: float, r_alt: float) -> dict:
        """接收一对新的对数收益率,更新 [α, β] 估计

        Returns:
            dict with keys: alpha, beta, P_beta, P_alpha,
                  innovation, normalized_innovation, clipped,
                  kalman_gain_beta, R
        """
        # 预测步
        P_pred = self.P + self.Q
        H = np.array([1.0, r_btc], dtype=np.float64)

        # Innovation
        innovation = r_alt - H @ self.x
        S = float(H @ P_pred @ H) + self.R
        S = max(S, 1e-15)

        sqrt_S = S ** 0.5
        norm_innov = innovation / sqrt_S

        # Huber clipping
        clipped = False
        innovation_for_update = innovation
        if abs(norm_innov) > self._clip_sigma:
            innovation_for_update = self._clip_sigma * sqrt_S * (
                1.0 if innovation > 0 else -1.0
            )
            clipped = True

        # Kalman 增益 + 更新
        K = (P_pred @ H) / S
        self.x = self.x + K * innovation_for_update

        # Joseph 稳定形式
        I2 = np.eye(2)
        A = I2 - np.outer(K, H)
        self.P = A @ P_pred @ A.T + self.R * np.outer(K, K)

        # Sage-Husa R 自适应
        self._n_updates += 1
        if self._n_updates > 10:
            b = self._r_forget
            d_t = (1.0 - b) / (1.0 - b ** (self._n_updates + 1))
            r_sample = max(innovation * innovation - float(H @ P_pred @ H), self._r_floor)
            self.R = (1.0 - d_t) * self.R + d_t * r_sample
            self.R = max(self.R, self._r_floor)

        return {
            'alpha': float(self.x[0]),
            'beta': float(self.x[1]),
            'P_beta': float(self.P[1, 1]),
            'P_alpha': float(self.P[0, 0]),
            'innovation': innovation,
            'normalized_innovation': norm_innov,
            'clipped': clipped,
            'kalman_gain_beta': float(K[1]),
            'R': self.R,
        }

    def state_dict(self) -> dict:
        return {
            'x': self.x.tolist(), 'P': self.P.tolist(), 'Q': self.Q.tolist(),
            'R': self.R, '_r_forget': self._r_forget, '_r_floor': self._r_floor,
            '_clip_sigma': self._clip_sigma, '_n_updates': self._n_updates,
        }

    @classmethod
    def from_state_dict(cls, d: dict) -> 'VectorKalmanBetaEstimator':
        obj = cls.__new__(cls)
        obj.x = np.array(d['x'], dtype=np.float64)
        obj.P = np.array(d['P'], dtype=np.float64)
        obj.Q = np.array(d['Q'], dtype=np.float64)
        obj.R = d['R']
        obj._r_forget = d['_r_forget']
        obj._r_floor = d['_r_floor']
        obj._clip_sigma = d['_clip_sigma']
        obj._n_updates = d['_n_updates']
        return obj

改动函数:calculate_cointegration_params_dual_window()

在现有 OLS 之后,新增 Kalman Filter 输出:

def calculate_cointegration_params_dual_window(
    base_klines, alt_klines,
    beta_window=None, zscore_window=None,
    kalman_state: dict | None = None,  # [新增]
):
    # ... 现有 OLS 逻辑不变 ...

    # ═══ 新增:向量 Kalman Filter 更新 ═══
    kalman_result = None
    kalman_state_out = kalman_state

    if len(aligned) >= 2:
        r_btc = np.log(aligned['base'].iloc[-1] / aligned['base'].iloc[-2])
        r_alt = np.log(aligned['alt'].iloc[-1] / aligned['alt'].iloc[-2])

        if kalman_state is not None:
            kf = VectorKalmanBetaEstimator.from_state_dict(kalman_state)
        else:
            ols_residual_var = float(np.var(model.resid)) if hasattr(model, 'resid') else 1e-2
            kf = VectorKalmanBetaEstimator(
                alpha_init=alpha_ols if use_alpha else 0.0,
                beta_init=beta_ols,
                q_alpha=KALMAN_Q_ALPHA, q_beta=KALMAN_Q_BETA,
                r_init=max(ols_residual_var, 1e-6),
                p0_alpha=KALMAN_P0_ALPHA, p0_beta=KALMAN_P0_BETA,
                r_forget=KALMAN_R_FORGET, r_floor=KALMAN_R_FLOOR,
                clip_sigma=KALMAN_CLIP_SIGMA,
            )

        kalman_result = kf.update(r_btc, r_alt)
        kalman_state_out = kf.state_dict()

    return {
        # ... 现有字段不变 ...
        'kalman_beta': kalman_result['beta'] if kalman_result else beta_ols,
        'kalman_alpha': kalman_result['alpha'] if kalman_result else (alpha_ols if use_alpha else 0.0),
        'kalman_P_beta': kalman_result['P_beta'] if kalman_result else 1.0,
        'kalman_innovation': kalman_result['normalized_innovation'] if kalman_result else 0.0,
        'kalman_clipped': kalman_result['clipped'] if kalman_result else False,
        'kalman_state': kalman_state_out,
    }

3.2 BOCPD 贝叶斯在线变点检测

算法原理

维护 run length(距上次变点的步数)的后验分布

P(r_t | data_1:t)

r_t = 0 → 当前时刻刚发生变点
r_t = n → 距上次变点已过 n 步

递推公式

1. 预测概率(当前观测在 run length = r 下的似然):
   π_t(r) = P(x_t | x_{t-r:t-1})

2. Growth(无变点):
   P(r_t = r+1, data) = P(r_{t-1} = r, data) × π_t(r) × (1 - H)

3. Changepoint(变点,run 重置):
   P(r_t = 0, data) = Σ_r P(r_{t-1} = r, data) × π_t(r) × H

4. 归一化

其中 H = hazard rate(先验变点概率,默认 1/50 ≈ 每 ~8天一次)。
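常数 hazard 对应 run length 的几何先验,期望 run 长度即 1/H,可据此换算出上文"约 8 天"的来源:

```python
hazard = 1 / 50                     # 每根 4h K线的先验变点概率 H
expected_run_bars = 1 / hazard      # 几何先验的期望 run 长度(根 K线)
expected_run_days = expected_run_bars * 4 / 24
print(round(expected_run_bars), round(expected_run_days, 1))   # → 50 8.3
```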

观测模型:监控 Kalman normalized_innovation 序列,采用正态-逆 Gamma(Normal-Inverse-Gamma)共轭先验,预测分布为 Student-t。

充分统计量(每个 run 维护):
  κ_r = κ₀ + n_r
  μ̂_r = (κ₀ × μ₀ + Σ x_i) / κ_r
  α_r = α₀ + n_r / 2
  β_r = β₀ + 0.5 × (Σ x_i² - κ_r × μ̂_r² + κ₀ × μ₀²)

预测分布: Student-t(2α_r), location=μ̂_r, scale²=β_r(κ_r+1)/(α_r κ_r)
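上述充分统计量的顺序更新与 Student-t 预测分布,可用一个独立小例子验证(先验超参取与下文检测器相同的默认值 μ₀=0, κ₀=1, α₀=1, β₀=0.5):

```python
import math

# NIG 先验超参(与 _BOCPDDetector 默认一致)
mu0, kappa0, alpha0, beta0 = 0.0, 1.0, 1.0, 0.5

def nig_update(stats, x):
    """单个观测的顺序更新:(κ, μ, α, β) → (κ+1, …)"""
    kappa, mu, alpha, beta = stats
    beta_new = beta + kappa * (x - mu) ** 2 / (2.0 * (kappa + 1.0))
    return (kappa + 1.0, (kappa * mu + x) / (kappa + 1.0),
            alpha + 0.5, beta_new)

def predictive_t(stats):
    """返回 Student-t 预测分布的 (df, loc, scale)"""
    kappa, mu, alpha, beta = stats
    return 2.0 * alpha, mu, math.sqrt(beta * (kappa + 1.0) / (alpha * kappa))

stats = (kappa0, mu0, alpha0, beta0)
for x in [0.1, -0.3, 0.2]:          # 三个 normalized_innovation 观测
    stats = nig_update(stats, x)
print(predictive_t(stats))           # 3 个观测后 df = 2α = 5.0
```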

文件:src/trading/config.py

StrategyParams 新增字段:

@dataclass(frozen=True)
class StrategyParams:
    # ... 现有字段 ...

    # Beta 体制自适应过滤器(Vector Kalman + BOCPD)
    beta_regime_enabled: bool = True
    beta_regime_hazard_rate: float = 1/50    # 先验变点概率 ≈ 每50根4h K线(~8天)一次
    beta_regime_max_run_length: int = 200    # run length 截断(内存控制)
    beta_regime_soft_prob: float = 0.3       # P(变点) > 此值 → 开始缩放阈值
    beta_regime_hard_prob: float = 0.7       # P(变点) > 此值 → 硬拦截
    beta_regime_scale_max: float = 2.0       # 阈值最大缩放倍数
    beta_regime_warmup: int = 5              # 最少 N 根 4h K线才开始判定

文件:src/trading/strategy.py

_BetaRegimeState 数据类

@dataclass
class _BetaRegimeState:
    """Beta 体制检测结果"""
    regime: str               # 'stable' | 'expanding'
    changepoint_prob: float   # 变点概率 P(r_t < warmup)
    expected_run_length: float
    kalman_P_beta: float
    threshold_scale: float    # 阈值缩放因子 (>=1.0)
    hard_block: bool
    reason: str

_BOCPDDetector 类

class _BOCPDDetector:
    """Bayesian Online Changepoint Detection (Adams & MacKay, 2007)

    监控 Kalman normalized_innovation 的分布变化,
    通过 run length 后验分布检测 β 的结构性变点。

    内存: O(max_run_length) per pair
    计算: O(max_run_length) per update
    """

    # Normal-Inverse-Gamma 先验超参
    _MU0 = 0.0
    _KAPPA0 = 1.0
    _ALPHA0 = 1.0
    _BETA0 = 0.5

    def __init__(self, hazard_rate: float = 1/50, max_run_length: int = 200):
        self._H = hazard_rate
        self._max_rl = max_run_length
        self._run_length_probs: dict[PairKey, np.ndarray] = {}
        self._suf_stats: dict[PairKey, dict] = {}
        self._last_kline_time: dict[PairKey, str] = {}
        self._n_updates: dict[PairKey, int] = {}

    def update(self, key: PairKey, normalized_innovation: float,
               kline_time: str) -> None:
        """每 5m tick 调用,基于 kline_time 去重"""
        if normalized_innovation is None:
            return
        if self._last_kline_time.get(key) == kline_time:
            return
        self._last_kline_time[key] = kline_time

        x = normalized_innovation

        # 初始化
        if key not in self._run_length_probs:
            self._run_length_probs[key] = np.array([1.0])
            self._suf_stats[key] = {
                'kappa': np.array([self._KAPPA0]),
                'mu': np.array([self._MU0]),
                'alpha': np.array([self._ALPHA0]),
                'beta': np.array([self._BETA0]),
            }
            self._n_updates[key] = 0

        self._n_updates[key] = self._n_updates.get(key, 0) + 1

        probs = self._run_length_probs[key]
        ss = self._suf_stats[key]
        n_rl = len(probs)

        # Step 1: 每个 run length 下的 Student-t 预测概率
        kappa, mu, alpha, beta = ss['kappa'], ss['mu'], ss['alpha'], ss['beta']
        df = 2.0 * alpha
        scale = np.sqrt(np.maximum(beta * (kappa + 1.0) / (alpha * kappa), 1e-15))
        pred_probs = self._student_t_pdf(x, df, mu, scale)

        # Step 2-3: Growth + Changepoint
        growth_probs = probs * pred_probs * (1.0 - self._H)
        cp_prob = np.sum(probs * pred_probs * self._H)

        # Step 4: 组合 + 归一化
        new_probs = np.empty(n_rl + 1)
        new_probs[0] = cp_prob
        new_probs[1:] = growth_probs
        total = np.sum(new_probs)
        if total > 0:
            new_probs /= total
        else:
            new_probs = np.zeros(n_rl + 1)
            new_probs[0] = 1.0

        # Step 5: 更新充分统计量
        new_kappa = np.empty(n_rl + 1)
        new_mu = np.empty(n_rl + 1)
        new_alpha = np.empty(n_rl + 1)
        new_beta = np.empty(n_rl + 1)

        new_kappa[0], new_mu[0] = self._KAPPA0, self._MU0
        new_alpha[0], new_beta[0] = self._ALPHA0, self._BETA0

        new_kappa[1:] = kappa + 1.0
        new_mu[1:] = (kappa * mu + x) / (kappa + 1.0)
        new_alpha[1:] = alpha + 0.5
        new_beta[1:] = beta + kappa * (x - mu) ** 2 / (2.0 * (kappa + 1.0))

        # Step 6: 截断
        if len(new_probs) > self._max_rl:
            new_probs = new_probs[:self._max_rl]
            new_kappa = new_kappa[:self._max_rl]
            new_mu = new_mu[:self._max_rl]
            new_alpha = new_alpha[:self._max_rl]
            new_beta = new_beta[:self._max_rl]
            total = np.sum(new_probs)
            if total > 0:
                new_probs /= total

        self._run_length_probs[key] = new_probs
        self._suf_stats[key] = {
            'kappa': new_kappa, 'mu': new_mu,
            'alpha': new_alpha, 'beta': new_beta,
        }

    @staticmethod
    def _student_t_pdf(x: float, df: np.ndarray, loc: np.ndarray,
                       scale: np.ndarray) -> np.ndarray:
        z = (x - loc) / scale
        from scipy.special import gammaln
        log_pdf = (
            gammaln((df + 1) / 2) - gammaln(df / 2)
            - 0.5 * np.log(df * np.pi) - np.log(scale)
            - (df + 1) / 2 * np.log(1 + z * z / df)
        )
        return np.exp(log_pdf)

    def check(self, key: PairKey, soft_prob: float, hard_prob: float,
              scale_max: float, warmup: int) -> _BetaRegimeState:
        """检测当前 Beta 体制状态

        变点概率定义: P(r_t < warmup) — 最近 warmup 步内是否有变点
        """
        n = self._n_updates.get(key, 0)
        if n < warmup:
            return _BetaRegimeState('stable', 0.0, float(n), 0.0, 1.0, False, "数据不足")

        probs = self._run_length_probs.get(key)
        if probs is None or len(probs) == 0:
            return _BetaRegimeState('stable', 0.0, 0.0, 0.0, 1.0, False, "无数据")

        cp_prob = float(np.sum(probs[:min(warmup, len(probs))]))
        expected_rl = float(np.sum(np.arange(len(probs)) * probs))

        if cp_prob >= hard_prob:
            return _BetaRegimeState(
                'expanding', cp_prob, expected_rl, 0.0, scale_max, True,
                f"Beta硬拦截: P(变点)={cp_prob:.3f}>={hard_prob} E[rl]={expected_rl:.1f}"
            )

        if cp_prob >= soft_prob:
            import math
            midpoint = (soft_prob + hard_prob) / 2
            steepness = 6.0 / max(hard_prob - soft_prob, 0.01)
            t = 1.0 / (1.0 + math.exp(-steepness * (cp_prob - midpoint)))
            scale = 1.0 + t * (scale_max - 1.0)
            return _BetaRegimeState(
                'expanding', cp_prob, expected_rl, 0.0, scale, False,
                f"Beta缩放: P(变点)={cp_prob:.3f} scale={scale:.2f} E[rl]={expected_rl:.1f}"
            )

        return _BetaRegimeState(
            'stable', cp_prob, expected_rl, 0.0, 1.0, False,
            f"Beta稳定: P(变点)={cp_prob:.3f} E[rl]={expected_rl:.1f}"
        )

    def cleanup_pair(self, key: PairKey) -> None:
        self._run_length_probs.pop(key, None)
        self._suf_stats.pop(key, None)
        self._last_kline_time.pop(key, None)
        self._n_updates.pop(key, None)

3.3 跨配对系统性风险聚合

文件:src/trading/strategy.py

class _SystemicRiskAggregator:
    """统计所有配对的 BOCPD 体制状态,
    超过 systemic_threshold 比例的配对处于 expanding → 触发全局拦截。
    无自身状态,每次实时计算。
    """

    def __init__(self, enabled: bool = True):
        self._enabled = enabled

    def check(self, pair_states: dict[PairKey, _BetaRegimeState],
              systemic_threshold: float = 0.3) -> tuple[bool, str]:
        if not self._enabled or not pair_states:
            return False, ""

        total = len(pair_states)
        n_expanding = sum(1 for s in pair_states.values() if s.regime == 'expanding')
        ratio = n_expanding / total
        avg_cp = sum(s.changepoint_prob for s in pair_states.values()) / total

        if ratio >= systemic_threshold:
            return True, (
                f"系统性风险: {n_expanding}/{total} ({ratio:.0%}) 配对扩张态, "
                f"平均P(变点)={avg_cp:.3f}"
            )
        return False, ""

3.4 编排层集成

文件:src/trading/orchestrator.py

process_analysis() 从 multi_period_result 提取 Kalman 输出,传给 strategy.process_tick():

kalman_innovation = multi_period_result.get('kalman_innovation')
kalman_P_beta = multi_period_result.get('kalman_P_beta')

entry_signal, exit_signal = self._strategy.process_tick(
    symbol, base_symbol, z4h, timestamp,
    kline_time=kline_time, latest_price=price_for_log,
    kalman_innovation=kalman_innovation,
    kalman_P_beta=kalman_P_beta,
)

analyze_multi_period() 从 4h/60d 周期提取 Kalman 信号:

return {
    # ... 现有字段 ...
    'kalman_innovation': details.get(('4h', '60d'), {}).get(
        'cointegration_new', {}).get('kalman_innovation', 0.0),
    'kalman_P_beta': details.get(('4h', '60d'), {}).get(
        'cointegration_new', {}).get('kalman_P_beta', 1.0),
    'kalman_state': details.get(('4h', '60d'), {}).get(
        'cointegration_new', {}).get('kalman_state'),
}

Kalman 状态持久化:orchestrator._kalman_states: dict[PairKey, dict],重启后用 OLS 重新初始化,3-5 根 K线收敛。


3.5 策略层集成

文件:src/trading/strategy.py

__init__

self._bocpd = _BOCPDDetector(
    hazard_rate=default_params.beta_regime_hazard_rate,
    max_run_length=default_params.beta_regime_max_run_length,
)
self._systemic_aggregator = _SystemicRiskAggregator(enabled=True)
self._all_pair_states: dict[PairKey, _BetaRegimeState] = {}

process_tick() 签名新增

def process_tick(
    self, symbol, base_symbol, z4h, timestamp,
    kline_time=None, latest_price=None,
    kalman_innovation: float | None = None,
    kalman_P_beta: float | None = None,
) -> tuple[EntrySignal | None, ExitSignal | None]:

_process_tick_unlocked 新 K 线更新块

if is_new_candle:
    # ... 现有: Welford 更新, EMA 更新 ...
    if kalman_innovation is not None and kline_time is not None:
        self._bocpd.update(key, kalman_innovation, str(kline_time))

_check_entry() — z4h 过滤之后、方向判断之前

# ── [新增] Beta 体制检查 ──
if params.beta_regime_enabled:
    beta_state = self._bocpd.check(
        key,
        soft_prob=params.beta_regime_soft_prob,
        hard_prob=params.beta_regime_hard_prob,
        scale_max=params.beta_regime_scale_max,
        warmup=params.beta_regime_warmup,
    )
    self._all_pair_states[key] = beta_state

    if beta_state.hard_block:
        logger.info(f"🛡️ Beta体制硬拦截 | {pair_label} | {beta_state.reason} | "
                    f"az={adaptive_z:+.4f} z4h={z4h:+.4f}")
        return None

    is_systemic, systemic_reason = self._systemic_aggregator.check(self._all_pair_states)
    if is_systemic:
        logger.info(f"🌡️ 系统性风险拦截 | {pair_label} | {systemic_reason}")
        return None

    threshold_scale = beta_state.threshold_scale
else:
    beta_state = None
    threshold_scale = 1.0

# ── 方向判断(应用 Beta 缩放)──
threshold = params.adaptive_threshold * threshold_scale
if adaptive_z < -threshold:
    direction = 'long'
elif adaptive_z > threshold:
    direction = 'short'
else:
    if threshold_scale > 1.01:
        logger.info(f"🛡️ Beta体制缩放拦截 | {pair_label} | "
                    f"az={adaptive_z:+.4f} 有效阈值={threshold:.2f} "
                    f"(原始={params.adaptive_threshold} ×{threshold_scale:.2f}) | "
                    f"{beta_state.reason if beta_state else ''}")
    return None

cleanup_pair()

self._bocpd.cleanup_pair(key)
self._all_pair_states.pop(key, None)

4. 场景模拟

A:正常市场(β 稳定)

innovation ~ N(0,1): z = [-0.3, 0.8, -0.5, 0.2, ...]

BOCPD: run length 持续右移, P(变点) ≈ 0.02~0.05 → regime=STABLE, scale=1.0
→ 正常入场

B:β 开始飙升(早期检测)

innovation 持续偏正: z = [2.5, 3.1, 2.8, 2.2, ...]

T=4h:   P(变点) ≈ 0.15(积累证据)
T=8h:   P(变点) ≈ 0.35 > soft_prob → EXPANDING, scale≈1.1
T=12h:  P(变点) ≈ 0.55 → scale≈1.5
T=16h:  P(变点) ≈ 0.75 > hard_prob → 硬拦截

C:β 全面飙升

z = [5.0, 4.5, 5.2, ...]

T=4h:  P(变点) ≈ 0.45 → 立即缩放
T=8h:  P(变点) ≈ 0.85 → 硬拦截

D:β 飙升后企稳

飙升期 P(变点) = 0.85,企稳后 innovation 恢复 N(0,1):
→ run length 分布右移,~6-8 根正常 K线(24-32h)后 P(变点) < soft_prob → 恢复
→ BOCPD 主动检测新稳定 run,不依赖被动衰减

E:β 下行急变

z = [-3.0, -2.8, -3.5, ...] → Student-t 预测概率低 → P(变点) 快速上升
→ 双向检测,2 根 K线后 P(变点) > soft_prob → 缩放

F:短暂噪声冲击

z = [2.5, -0.3, 0.1, -0.4, ...]

T=4h:  P(变点) ≈ 0.12(远低于 0.3)
T=8h:  正常 → P(变点) ≈ 0.05
→ 不触发(贝叶斯后验自然平滑)

G:系统性风险

20 配对中 8 个 P(变点) > 0.3 → 40% > systemic_threshold=30%
→ 🌡️ 系统性风险拦截

5. 改动文件清单

| 文件 | 改动 |
|------|------|
| src/config.py | +7 常量(KALMAN_Q_BETA 等) |
| src/utils/analysis/analysis_core.py | +VectorKalmanBetaEstimator 类;增强 calculate_cointegration_params_dual_window(), analyze_pair_advanced(), analyze_multi_period() |
| src/trading/config.py | StrategyParams +7 字段;增强 get_strategy_params(), _build_strategy_params(), load_trading_config() |
| src/trading/orchestrator.py | +_kalman_states 字典;增强 process_analysis() 传递 Kalman 输出 |
| src/trading/strategy.py | +_BetaRegimeState, _BOCPDDetector, _SystemicRiskAggregator;增强 process_tick(), _check_entry(), cleanup_pair() |

新增依赖:scipy.special.gammaln(scipy 已在项目依赖中)

不改动:momentum_filter.py, position_manager.py, executor.py, models.py


6. 日志

| 时机 | 级别 | 格式 |
|------|------|------|
| 初始化 | INFO | 🔬 Beta体制跟踪器初始化 \| BOCPD hazard=0.020 soft=0.30 hard=0.70 |
| 硬拦截 | INFO | 🛡️ Beta体制硬拦截 \| PURR\|HYPE \| P(变点)=0.850>=0.70 E[rl]=1.2 |
| 缩放拦截 | INFO | 🛡️ Beta体制缩放拦截 \| PURR\|HYPE \| az=-4.50 有效阈值=4.00 (3.0×1.33) |
| 系统性风险 | INFO | 🌡️ 系统性风险: 8/20 (40%) 配对扩张态 |
| 分析层 | DEBUG | Kalman更新 \| α̂=0.002 β̂=1.20 P_β=0.005 innov=2.85 |

7. 风险与边界条件

| 风险 | 严重度 | 缓解 |
|------|--------|------|
| 已持仓风险暴露 | | 本次仅拦截入场;退场保护作为 P0 后续 |
| Kalman Q 不适配 | | R 自适应 + Huber clipping + 回测调优 |
| 重启丢失 Kalman 状态 | | OLS 重新初始化,3-5 根 K线收敛;后续持久化到 DB |
| BOCPD 冷启动 | | warmup=5(20h)内保持 stable 默认 |
| 低波动期 r_btc≈0 | | H=[1,0] 时只更新 α,符合物理意义 |

不在本次范围:退场逻辑调整、HMM 体制分类、Kalman 持久化到 DB、飞书告警、Q 自适应


8. 验证方案

单元测试

# test_vector_kalman.py
import numpy as np

def test_kalman_convergence():
    """从初始值收敛到真实 [α, β]"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        r_btc = rng.normal(0, 0.02)
        r_alt = 0.001 + 1.0 * r_btc + rng.normal(0, 0.01)
        result = kf.update(r_btc, r_alt)
    assert abs(result['beta'] - 1.0) < 0.15
    assert result['alpha'] > 0

def test_kalman_innovation_spike():
    """β 突变时 innovation 立即飙升"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    result = kf.update(0.02, 3.0 * 0.02)
    assert abs(result['normalized_innovation']) > 2.0

def test_kalman_huber_clipping():
    """极端 innovation 被截断,β 不过度跳变"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2, clip_sigma=3.0)
    rng = np.random.default_rng(42)
    for _ in range(50):
        kf.update(rng.normal(0, 0.02), 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    result = kf.update(0.02, 20.0 * 0.02)
    assert result['clipped'] is True
    assert result['beta'] < 5.0

def test_kalman_joseph_positive():
    """Joseph 形式保证 P 始终非负"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-6)
    rng = np.random.default_rng(42)
    for _ in range(500):
        result = kf.update(rng.normal(0, 0.05), 0.5 * rng.normal(0, 0.05) + rng.normal(0, 0.001))
        assert result['P_beta'] >= 0 and result['P_alpha'] >= 0

def test_kalman_alpha_absorbs_drift():
    """α 吸收独立漂移,β 不被污染"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2)
    rng = np.random.default_rng(42)
    for _ in range(200):
        result = kf.update(rng.normal(0, 0.02), 0.005 + 0.5 * rng.normal(0, 0.02) + rng.normal(0, 0.01))
    assert abs(result['beta'] - 0.5) < 0.2
    assert result['alpha'] > 0.001

def test_kalman_state_persistence():
    """状态导出/恢复后行为一致"""
    from src.utils.analysis.analysis_core import VectorKalmanBetaEstimator
    kf1 = VectorKalmanBetaEstimator(alpha_init=0.0, beta_init=0.5, q_alpha=1e-5, q_beta=1e-4, r_init=1e-2)
    kf1.update(0.02, 0.01)
    kf2 = VectorKalmanBetaEstimator.from_state_dict(kf1.state_dict())
    r1, r2 = kf1.update(0.03, 0.015), kf2.update(0.03, 0.015)
    assert abs(r1['beta'] - r2['beta']) < 1e-10

# test_bocpd.py
def test_bocpd_stable():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    for i in range(20):
        d.update(key, rng.normal(0, 1), f"2024-01-01T{i*4:02d}:00:00")
    s = d.check(key, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'stable' and s.changepoint_prob < 0.3

def test_bocpd_detects_shift():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    for i in range(10):
        d.update(key, rng.normal(0, 1), f"2024-01-01T{i*4:02d}:00:00")
    for i in range(10, 18):
        d.update(key, rng.normal(3.0, 1), f"2024-01-02T{(i-10)*4:02d}:00:00")
    s = d.check(key, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.regime == 'expanding' and s.changepoint_prob > 0.3

def test_bocpd_hard_block():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    for i in range(10):
        d.update(key, rng.normal(0, 1), f"2024-01-01T{i*4:02d}:00:00")
    for i in range(10, 20):
        d.update(key, rng.normal(5.0, 0.5), f"2024-01-02T{(i-10)*4:02d}:00:00")
    s = d.check(key, soft_prob=0.3, hard_prob=0.7, scale_max=2.0, warmup=5)
    assert s.hard_block and s.changepoint_prob > 0.7

def test_bocpd_recovery():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    rng = np.random.default_rng(42)
    for i in range(10):
        d.update(key, rng.normal(0, 1), f"2024-01-01T{i*4:02d}:00:00")
    for i in range(10):
        d.update(key, rng.normal(4.0, 1), f"2024-01-02T{i*4:02d}:00:00")
    assert d.check(key, 0.3, 0.7, 2.0, 5).regime == 'expanding'
    for i in range(15):
        d.update(key, rng.normal(0, 1), f"2024-01-03T{i*4:02d}:00:00")
    s = d.check(key, 0.3, 0.7, 2.0, 5)
    assert s.regime == 'stable' and s.changepoint_prob < 0.3

def test_bocpd_timestamp_dedup():
    from src.trading.strategy import _BOCPDDetector
    d = _BOCPDDetector(hazard_rate=1/50, max_run_length=200)
    key = ("PURR/USDC:USDC", "HYPE/USDC:USDC")
    for _ in range(48):
        d.update(key, 5.0, "2024-01-01T00:00:00")
    assert d.check(key, 0.3, 0.7, 2.0, 5).reason == "数据不足"

# test_systemic_risk.py
def test_systemic_risk_trigger():
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 4 else 'stable', 0.5 if i < 4 else 0.1,
        10.0, 0.01, 1.5 if i < 4 else 1.0, False, ""
    ) for i in range(10)}
    assert agg.check(states, 0.3)[0]  # 4/10 = 40% > 30%

def test_systemic_risk_no_trigger():
    from src.trading.strategy import _SystemicRiskAggregator, _BetaRegimeState
    agg = _SystemicRiskAggregator(enabled=True)
    states = {(f"ALT{i}", "BTC"): _BetaRegimeState(
        'expanding' if i < 2 else 'stable', 0.5 if i < 2 else 0.1,
        10.0, 0.01, 1.5 if i < 2 else 1.0, False, ""
    ) for i in range(10)}
    assert not agg.check(states, 0.3)[0]  # 2/10 = 20% < 30%

集成验证

  1. 回测:在 β 飙升的历史区间回测,验证检测延迟和假阳性率
  2. 实盘观察:监控 Kalman [α, β]、BOCPD P(变点)、系统性风险拦截频率
  3. A/B 对比(可选):同时运行新旧检测逻辑,对比检测时间差和拦截次数

9. 后续演进

| 优先级 | 方向 | 预期收益 |
|--------|------|----------|
| P0 | 退场保护(β 飙升时收紧止损) | 减少已持仓亏损 |
| P1 | Kalman 状态持久化到 DB | 消除 20h 冷启动窗口 |
| P1 | Innovation-based Q 自适应 | 不同币对自动最优 Q |
| P2 | HMM 体制分类 | 更精细的多体制缩放控制 |
| P2 | 系统性风险分级(警告/严重/紧急) | 更精细的全局风控 |
| P3 | 飞书告警 | 体制切换时人工监控 |
