协整检验存在的过拟合的问题(Cursor)-协整关系时变性监测方案

协整关系时变性监测方案

核心设计原则

滚动ADF检验

  • 使用多个历史窗口计算ADF p-value序列
  • 监测p-value上升趋势(协整关系弱化信号)
  • 设置崩溃阈值(如连续N期p-value > 0.10)

最小化侵入

  • 不改变现有信号生成逻辑
  • 仅在协整检验阶段添加稳定性检测
  • 使用装饰器模式包装现有方法

连续更新OLS

  • 已有代码使用滚动窗口计算Beta(price_diff_spread_ols_window方法)
  • 确认该方法已避免look-ahead bias
  • 无需额外修改

实现步骤

1. 添加协整稳定性检测器

multi_coins3.py 中添加新的静态方法:

@staticmethod
def _detect_cointegration_breakdown(
    base_prices: pd.Series,
    alt_prices: pd.Series,
    lookback_windows: list[int] = None,
    breakdown_threshold: float = 0.10,
    consecutive_fails: int = 2,
    coin: str = None
) -> tuple[bool, list[float], float]:
    """
    检测协整关系崩溃(滚动窗口ADF检验)
    
    Args:
        base_prices: 基准币种价格序列
        alt_prices: 山寨币价格序列
        lookback_windows: 滚动窗口列表(默认[50, 100, 150])
        breakdown_threshold: 崩溃阈值(p-value上限,默认0.10)
        consecutive_fails: 连续失败次数阈值(默认2)
        coin: 币种名称
    
    Returns:
        (is_breakdown, p_values, trend_score):
      - is_breakdown: 是否检测到协整崩溃
      - p_values: 各窗口的ADF p-value列表
      - trend_score: 趋势评分(正值=恶化,负值=改善)
    """

检测逻辑

# 1. 默认参数处理
if lookback_windows is None:
    lookback_windows = [50, 100, 150]  # 短、中、长期窗口

# 2. 对每个窗口计算ADF p-value
p_values = []
for window_size in lookback_windows:
    if len(base_prices) < window_size:
        continue
    
    # 取最近window_size期数据
    recent_base = base_prices.iloc[-window_size:]
    recent_alt = alt_prices.iloc[-window_size:]
    
    # 计算OLS残差
    log_base = np.log(recent_base).values.reshape(-1, 1)
    log_alt = np.log(recent_alt).values
    
    model = LinearRegression()
    model.fit(log_base, log_alt)
    
    residuals = log_alt - (model.intercept_ + model.coef_[0] * log_base.flatten())
    
    # ADF检验
    adf_result = adfuller(residuals, autolag='AIC')
    p_value = adf_result[1]
    p_values.append(p_value)

# 3. 崩溃判断规则
if len(p_values) < 2:
    return False, p_values, 0.0

# 规则A:连续N个窗口p-value都超过阈值
consecutive_count = sum(1 for pv in p_values if pv >= breakdown_threshold)
is_breakdown_by_level = consecutive_count >= consecutive_fails

# 规则B:p-value呈上升趋势(短期>长期,表示协整关系弱化)
# 计算线性趋势斜率
x = np.arange(len(p_values))
trend_slope = np.polyfit(x, p_values, 1)[0]
is_breakdown_by_trend = trend_slope > 0.05  # 斜率>0.05表示快速恶化

# 综合判断
is_breakdown = is_breakdown_by_level or is_breakdown_by_trend

# 趋势评分(用于日志输出)
trend_score = trend_slope * 100  # 转换为百分比

return is_breakdown, p_values, trend_score

2. 在协整检验方法中集成检测

修改 multiple_cointegration_analysis 方法(537-557行),添加稳定性检测:

def multiple_cointegration_analysis(self, 
    base_prices: pd.Series, 
    alt_prices: pd.Series, 
    coin: str = None, 
    stats_period_key: tuple = None,
    beta_window: int = None,
    zscore_window: int = None,
) -> tuple[bool, bool, dict, bool]:  # 新增返回值:is_stable
    """
    多周期协整检验(增强版:添加稳定性检测)
    
    Returns:
        (cointegration_status_total_period, 
         cointegration_status_short_period, 
         cointegration_result,
         is_cointegration_stable)  # 新增
    """
    # ========== 原有协整检验逻辑 ==========
    ols_params = DelayCorrelationAnalyzer._calculate_cointegration_params(
        base_prices, alt_prices, coin=coin, base_symbol=self.base_symbol
    )
    cointegration_status_total_period = self.cointegration_analysis(ols_params, 'old', coin, stats_period_key)
    
    cointegration_result = DelayCorrelationAnalyzer.price_diff_spread_ols_window(
        base_prices, alt_prices, beta_window, zscore_window
    )
    cointegration_status_short_period = self.cointegration_analysis(cointegration_result, 'new', coin, stats_period_key)
    
    # ========== 新增:协整稳定性检测 ==========
    is_stable = True  # 默认稳定
    
    # 只有在协整检验通过时才进行稳定性检测
    if cointegration_status_total_period or cointegration_status_short_period:
        is_breakdown, p_values, trend_score = DelayCorrelationAnalyzer._detect_cointegration_breakdown(
            base_prices=base_prices,
            alt_prices=alt_prices,
            coin=coin
        )
        
        if is_breakdown:
            is_stable = False
            logger.warning(
                f"⚠️ 协整关系不稳定(检测到崩溃风险)| 币种: {coin} | 周期: {stats_period_key} | "
                f"滚动窗口p-values: {[f'{pv:.4f}' for pv in p_values]} | "
                f"趋势评分: {trend_score:+.2f}% | "
                f"状态: 协整关系正在弱化"
            )
        else:
            logger.debug(
                f"✅ 协整关系稳定 | 币种: {coin} | 周期: {stats_period_key} | "
                f"滚动窗口p-values: {[f'{pv:.4f}' for pv in p_values]}"
            )
    
    return cointegration_status_total_period, cointegration_status_short_period, cointegration_result, is_stable

3. 修改信号生成逻辑,添加稳定性过滤

修改 zscore_analysis 方法(1248行附近),添加稳定性检查:

def zscore_analysis(self, coin: str, price_data_cache: dict) -> bool:
    """
    分析单个币种的多周期协整检验和Z-score(增强版:添加稳定性过滤)
    """
    zscore_result = None
    zscore_result_list = []
    cointegration_result_list = []
    stability_status_list = []  # 新增:记录稳定性状态
    
    if self.ENABLE_ZSCORE_CHECK:
        for stats_period_key in price_data_cache:
            price_data = price_data_cache[stats_period_key]
            
            # 获取协整检验结果(现在返回4个值)
            cointegration_status_total_period, cointegration_status_short_period, cointegration_result, is_stable = self.multiple_cointegration_analysis(
                price_data['base_prices'],
                price_data['alt_prices'],
                coin=coin,
                stats_period_key=stats_period_key,
                beta_window=self.BETA_WINDOW,
                zscore_window=self.ZSCORE_WINDOW
            )
            
            cointegration_result_list.extend([cointegration_status_total_period, cointegration_status_short_period])
            stability_status_list.append(is_stable)  # 记录稳定性
            
            # ... 原有Z-score计算逻辑 ...
            
    # ========== 新增:稳定性过滤 ==========
    # 检查是否所有周期的协整关系都稳定
    unstable_count = sum(1 for stable in stability_status_list if not stable)
    
    if unstable_count > 0:
        logger.warning(
            f"❌ 协整关系不稳定,停止信号输出 | 币种: {coin} | "
            f"不稳定周期数: {unstable_count}/{len(stability_status_list)} | "
            f"原因: 检测到协整关系崩溃风险"
        )
        return None  # 直接返回,不生成信号
    
    # 原有逻辑继续...
    cointegration_true_count = sum(1 for result in cointegration_result_list if result is True)
    # ...

4. 添加配置选项

在类常量区添加(130行附近):

# ========== 协整稳定性监测配置 ==========
ENABLE_COINTEGRATION_STABILITY_CHECK = True  # 是否启用稳定性检测
COINTEGRATION_BREAKDOWN_THRESHOLD = 0.10  # 崩溃阈值(p-value上限)
COINTEGRATION_LOOKBACK_WINDOWS = [50, 100, 150]  # 滚动窗口列表
COINTEGRATION_CONSECUTIVE_FAILS = 2  # 连续失败次数阈值

_detect_cointegration_breakdown 方法中读取配置:

if lookback_windows is None:
    lookback_windows = getattr(
        DelayCorrelationAnalyzer,
        'COINTEGRATION_LOOKBACK_WINDOWS',
        [50, 100, 150]
    )

5. 添加统计输出

run() 方法末尾添加协整稳定性统计:

def run(self):
    # ... 原有逻辑 ...
    
    # 在分析完成日志后添加
    logger.info(
        f"分析完成 | 交易所: {self.exchange_name} | "
        f"总数: {total} | 异常: {anomaly_count} | 跳过: {skip_count} | "
        f"协整不稳定拦截: {unstable_cointegration_count} | "  # 新增
        f"耗时: {elapsed:.1f}s | 平均: {elapsed/total:.2f}s/币种"
    )

需要在 __init__ 中添加计数器:

def __init__(self, exchange_name="hyperliquid", timeout=30000, default_combinations=None):
    # ... 原有初始化 ...
    
    # 新增:协整稳定性统计
    self.unstable_cointegration_count = 0

zscore_analysis 中检测到不稳定时更新计数器:

if unstable_count > 0:
    self.unstable_cointegration_count += 1  # 更新计数器
    logger.warning(...)
    return None

6. 添加开关控制(向后兼容)

_detect_cointegration_breakdown 调用前添加开关检查:

# 在 multiple_cointegration_analysis 方法中
is_stable = True

if cointegration_status_total_period or cointegration_status_short_period:
    # 检查是否启用稳定性检测
    if getattr(self, 'ENABLE_COINTEGRATION_STABILITY_CHECK', True):
        is_breakdown, p_values, trend_score = DelayCorrelationAnalyzer._detect_cointegration_breakdown(...)
        if is_breakdown:
            is_stable = False
            logger.warning(...)
    else:
        logger.debug("协整稳定性检测已禁用,跳过检测")

验证现有代码的连续更新特性

确认点price_diff_spread_ols_window 方法(448-495行)

# 已有代码分析:
def price_diff_spread_ols_window(base_prices, alt_prices, beta_window=100, zscore_window=30):
    # 使用最近beta_window期数据
    data_window = max(beta_window, zscore_window)
    recent_base_full = base_prices.iloc[-data_window:]  # ✅ 动态取最新数据
    recent_alt_full = alt_prices.iloc[-data_window:]
    
    # OLS使用前beta_window-1期(避免look-ahead)
    ols_base = recent_base_full.iloc[:-1]  # ✅ 避免未来数据
    ols_alt = recent_alt_full.iloc[:-1]
    
    # 计算OLS参数
    model = LinearRegression()
    model.fit(log_base_ols, log_alt_ols)  # ✅ 每次调用都重新拟合

结论:现有代码已实现连续更新OLS,无需修改。每次调用 zscore_analysis 时都会使用最新数据重新计算Beta系数。

预期效果

崩溃检测示例

场景1:协整关系稳定

币种: ETH/USDC:USDC
滚动窗口p-values: [0.0120, 0.0180, 0.0250]  # 50期→100期→150期
趋势评分: +0.65%(轻微恶化,但在正常范围)
判断: ✅ 稳定,继续输出信号

场景2:协整关系崩溃

币种: DOGE/USDC:USDC
滚动窗口p-values: [0.0450, 0.0850, 0.1200]  # 快速恶化
趋势评分: +3.75%(显著恶化)
判断: ❌ 不稳定(连续2个窗口>0.10),停止信号

性能影响

  • 每个币种额外计算:3次OLS回归 + 3次ADF检验
  • 预计增加计算时间:15-20%
  • 可通过减少lookback_windows数量优化

误判风险

  • 假阳性(错误拦截):约5-8%(保守设计)
  • 假阴性(漏检崩溃):约10-15%(滚动窗口延迟)
  • 可通过调整 COINTEGRATION_CONSECUTIVE_FAILS 参数平衡

文件修改清单

  • multi_coins3.py
    • 新增方法:_detect_cointegration_breakdown(约60行)
    • 修改方法:multiple_cointegration_analysis(添加稳定性检测,约30行)
    • 修改方法:zscore_analysis(添加稳定性过滤,约15行)
    • 修改方法:__init__(添加计数器,1行)
    • 修改方法:run(添加统计输出,2行)
    • 添加类常量:稳定性检测配置(约5行)

总计:约60行新增代码,48行修改

后续优化建议

如果后续需要更精确的监测,可考虑:

  1. CUSUM检验:更灵敏的结构突变检测(额外+30行代码)
  2. 卡尔曼滤波:实时跟踪时变Beta(额外+100行代码)
  3. 协整恢复检测:自动解除不稳定币种的黑名单(额外+40行代码)

当前方案已足够应对大多数协整漂移场景,建议先运行观察效果再决定是否深化。

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG