K线周期可配置化设计方案

K线周期可配置化设计方案

1. 背景与目标

当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要:

  1. 将 K 线周期从 1h 改为 2h
  2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置

2. 影响范围分析

2.1 需要修改的文件(共 6 个)

文件 硬编码位置 修改内容
src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,load_trading_config() 从环境变量加载
src/services/beta_arb_service.py L61-62 WS订阅 "1h", L112 日志, L152 过滤 "1h" 改为读取 config.beta_arb.kline_interval
src/trading/orchestrator.py L853-946 _prime_strategy_buffer()_query_1h_klines() 中多处 '1h' 改为从 self._config.beta_arb.kline_interval 读取
.env 无此变量 新增 BETA_ARB_KLINE_INTERVAL=2h
.env.prod 无此变量 新增 BETA_ARB_KLINE_INTERVAL=2h
.env.example 无此变量 新增 BETA_ARB_KLINE_INTERVAL=2h

2.2 不需要修改的文件

文件 原因
src/config.py DEFAULT_TIMEFRAMESDEDUP_WINDOWS 等用于通用 K 线服务(realtime_kline_service),与 Beta 套利策略独立
src/services/realtime_kline_service_base.py _TIMEFRAME_DURATIONS 用于通用 K 线分析服务,Beta 套利走独立 WS 订阅通道
src/trading/strategy.py 策略核心逻辑不感知 K 线周期,只接收 (symbol, time, close, prev_close)
src/scripts/backtest_beta_arbitrage.py 回测脚本有独立的 TIMEFRAME 常量,可后续单独调整

3. 详细修改方案

3.1 src/trading/config.py

BetaArbConfig dataclass — 新增字段:

@dataclass(frozen=True)
class BetaArbConfig:
    """Beta套利策略参数"""
    # 标的
    eth_symbol: str = "ETH/USDC:USDC"
    btc_symbol: str = "BTC/USDC:USDC"

    # K线周期(WS订阅 & DB查询使用,如 "1h", "2h", "4h")
    kline_interval: str = "2h"                     # <-- 新增

    # Beta计算
    negative_return_count: int = 10
    ...

load_trading_config() — 加载环境变量:

beta_arb = BetaArbConfig(
    eth_symbol=os.getenv("BETA_ARB_ETH_SYMBOL", "ETH/USDC:USDC"),
    btc_symbol=os.getenv("BETA_ARB_BTC_SYMBOL", "BTC/USDC:USDC"),
    kline_interval=os.getenv("BETA_ARB_KLINE_INTERVAL", "2h"),  # <-- 新增
    negative_return_count=_env_int("BETA_ARB_NEGATIVE_RETURN_COUNT", "10"),
    ...
)

3.2 src/services/beta_arb_service.py

3 处硬编码 "1h" 需替换:

L60-62 — WS 订阅构建:

# 修改前
market_subs = [
    {"type": "candle", "coin": eth_coin, "interval": "1h"},
    {"type": "candle", "coin": btc_coin, "interval": "1h"},
    ...
]

# 修改后
interval = ba.kline_interval
market_subs = [
    {"type": "candle", "coin": eth_coin, "interval": interval},
    {"type": "candle", "coin": btc_coin, "interval": interval},
    ...
]

L112 — 启动日志:

# 修改前
logger.info(f"行情WS启动: {eth_coin}/1h + {btc_coin}/1h")

# 修改后
logger.info(f"行情WS启动: {eth_coin}/{interval} + {btc_coin}/{interval}")

L152 — WS 消息过滤:

# 修改前
if interval != "1h" or not coin:
    return

# 修改后
if interval != self._kline_interval or not coin:
    return

注意:_on_market_message 中的局部变量 interval 来自 WS 消息解析(L151),与上面启动时的 interval 不同。
需要在 __init__ 中保存 self._kline_interval = "1h",然后在 start() 中赋值为 ba.kline_interval

完整改动:

class BetaArbService:
    def __init__(self):
        self._orchestrator = TradingOrchestrator()
        ...
        self._kline_interval: str = "2h"           # <-- 新增,start() 中从config覆盖

    def start(self):
        ...
        ba = self._orchestrator.config.beta_arb
        eth_coin = symbol_to_coin(ba.eth_symbol)
        btc_coin = symbol_to_coin(ba.btc_symbol)
        self._kline_interval = ba.kline_interval    # <-- 新增

        ...
        interval = ba.kline_interval
        market_subs = [
            {"type": "candle", "coin": eth_coin, "interval": interval},
            {"type": "candle", "coin": btc_coin, "interval": interval},
            ...
        ]
        ...
        logger.info(f"行情WS启动: {eth_coin}/{interval} + {btc_coin}/{interval}")

    def _on_market_message(self, msg: dict):
        ...
        interval = data.get("i")
        if interval != self._kline_interval or not coin:  # <-- 使用实例变量
            return

3.3 src/trading/orchestrator.py

_prime_strategy_buffer() — 7 处 '1h' 替换为 ba.kline_interval

def _prime_strategy_buffer(self, db_client: TimescaleDBClient):
    """从DB加载历史K线灌入策略缓冲区,含连续性/时效性校验与自动回填"""
    ba = self._config.beta_arb
    kline_interval = ba.kline_interval              # <-- 新增局部变量
    ...
    for symbol in [ba.eth_symbol, ba.btc_symbol]:
        try:
            rows = self._query_klines(db_client, symbol, query_start, kline_interval)
            ...
            is_continuous, missing_ts = filler.validate_continuity(rows, kline_interval)
            ...
            filled = filler.fill_missing_data_precise(symbol, kline_interval, missing_ts)
            ...
            filled = filler.fill_missing_data(symbol, kline_interval, latest_time, now_utc)
            ...
            filled = filler.fill_missing_data(symbol, kline_interval, query_start, now_utc)
            ...
            rows = self._query_klines(db_client, symbol, query_start, kline_interval)

_query_1h_klines() — 重命名并参数化:

# 修改前
@staticmethod
def _query_1h_klines(db_client, symbol, start):
    return db_client.execute_query(
        """
        SELECT time, close, return_pct
        FROM klines
        WHERE symbol = %s AND timeframe = '1h'
          AND time >= %s
        ORDER BY time ASC
        """,
        (symbol, start),
    )

# 修改后
@staticmethod
def _query_klines(db_client, symbol, start, timeframe: str = "2h"):
    return db_client.execute_query(
        """
        SELECT time, close, return_pct
        FROM klines
        WHERE symbol = %s AND timeframe = %s
          AND time >= %s
        ORDER BY time ASC
        """,
        (symbol, timeframe, start),
    )

3.4 .env / .env.prod / .env.example

# — 信号 — 部分新增:

# — K线周期 —
BETA_ARB_KLINE_INTERVAL=2h                         # 策略K线周期 (1h / 2h / 4h)

4. 数据流变化

修改前:
  .env (无) → 硬编码 "1h" → WS订阅 1h candle → 过滤 "1h" → 策略信号
                           → DB查询 timeframe='1h'

修改后:
  .env BETA_ARB_KLINE_INTERVAL=2h
    → BetaArbConfig.kline_interval="2h"
    → BetaArbService: WS订阅 2h candle, 过滤 "2h"
    → Orchestrator: DB查询 timeframe='2h', 回填 '2h'
    → 策略信号(strategy.py 不变,只接收数据)

5. 前置条件

  • Hyperliquid WS 支持 2h 作为 candle interval(已确认支持:1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 8h, 12h, 1d, 3d, 1w, 1M)
  • DB klines 表需要有 timeframe='2h' 的数据。如果之前只采集了 1h 数据,首次启动时 _prime_strategy_buffer() 会触发自动回填

6. 合法值与校验

环境变量为字符串,不做强校验(与现有 eth_symbolbtc_symbol 一致)。合法值取决于 Hyperliquid WS 支持的 interval 列表。如果传入非法值,WS 订阅会失败并通过现有的重连/告警机制报错。

7. 回测脚本说明

src/scripts/backtest_beta_arbitrage.py 中有独立的 TIMEFRAME = '1h' 常量,本次不修改。回测脚本是离线工具,与实盘服务独立运行,可后续根据需要单独调整。

Read more

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

对于空间环境、“信息/逻辑”(比如代码、结构、表达)秩序追求的心理特征分析

一、为什么是“空间 + 信息”同时强化? 因为你当年面对的是“双重失控”: 1️⃣ 外部世界是脏乱 + 失序的 * 空间被污染 * 行为无边界 * 基本生活秩序崩塌 👉 所以你现在会强烈要求: * 桌面干净 * 房间有序 * 物品可控 这是在修复:“物理世界必须是可控的” 2️⃣ 人的行为和逻辑也是混乱的 * 没有规则 * 没有底线 * 没有理性 👉 所以你现在会特别在意: * 表达是否清晰 * 逻辑是否自洽 * 结构是否优雅 * 代码是否干净 这是在修复:“认知世界必须是合理的” 二、你其实构建了一个“高纯度系统” 你现在的偏好,本质上是: 👉 低噪音 + 高结构 + 强控制感 具体表现就是: * 空间:极简、整洁、可预测 * 信息:清晰、压缩、无冗余 这类人有一个很明显的优势: 👉 处理复杂问题时,

By SHI XIAOLONG

我现在有洁癖,追求极致的秩序感,追求极简主义,讨厌混乱的生活状态(心里溯源)

其来源于高压的成长环境 一个直接判断: 你现在的“洁癖 + 秩序感 + 极简”,本质上是 👉 从“极端混乱环境”里进化出来的“反混乱系统” 而不是单纯的性格偏好。 一、你的系统是怎么被“塑造”的 你当年的环境,有几个核心特征: * 无规则(甚至反规则) * 无边界(到处污染、侵犯) * 无秩序(行为完全失控) * 无安全感(人是不可预测的) 对一个“想保持正常的人”来说,这种环境会产生一个本能需求: 👉 我要建立一个“绝对可控的世界” 于是你的大脑学会了三件事: 1️⃣ 用“干净”对抗“污染” 当年那种: * 到处脏乱 * 身体/空间被侵犯 会让人产生一种深层反应: 👉 “我要把一切恢复到干净、可控” 所以现在的洁癖,本质是: * 不是怕脏 * 而是怕“

By SHI XIAOLONG