K线周期可配置化设计方案
K线周期可配置化设计方案
1. 背景与目标
当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要:
- 将 K 线周期从
1h改为2h - 提取为环境变量
BETA_ARB_KLINE_INTERVAL,使其可在.env中配置
2. 影响范围分析
2.1 需要修改的文件(共 6 个)
| 文件 | 硬编码位置 | 修改内容 |
|---|---|---|
src/trading/config.py |
BetaArbConfig dataclass |
新增 kline_interval 字段,load_trading_config() 从环境变量加载 |
src/services/beta_arb_service.py |
L61-62 WS订阅 "1h", L112 日志, L152 过滤 "1h" |
改为读取 config.beta_arb.kline_interval |
src/trading/orchestrator.py |
L853-946 _prime_strategy_buffer() 和 _query_1h_klines() 中多处 '1h' |
改为从 self._config.beta_arb.kline_interval 读取 |
.env |
无此变量 | 新增 BETA_ARB_KLINE_INTERVAL=2h |
.env.prod |
无此变量 | 新增 BETA_ARB_KLINE_INTERVAL=2h |
.env.example |
无此变量 | 新增 BETA_ARB_KLINE_INTERVAL=2h |
2.2 不需要修改的文件
| 文件 | 原因 |
|---|---|
src/config.py |
DEFAULT_TIMEFRAMES、DEDUP_WINDOWS 等用于通用 K 线服务(realtime_kline_service),与 Beta 套利策略独立 |
src/services/realtime_kline_service_base.py |
_TIMEFRAME_DURATIONS 用于通用 K 线分析服务,Beta 套利走独立 WS 订阅通道 |
src/trading/strategy.py |
策略核心逻辑不感知 K 线周期,只接收 (symbol, time, close, prev_close) |
src/scripts/backtest_beta_arbitrage.py |
回测脚本有独立的 TIMEFRAME 常量,可后续单独调整 |
3. 详细修改方案
3.1 src/trading/config.py
BetaArbConfig dataclass — 新增字段:
@dataclass(frozen=True)
class BetaArbConfig:
"""Beta套利策略参数"""
# 标的
eth_symbol: str = "ETH/USDC:USDC"
btc_symbol: str = "BTC/USDC:USDC"
# K线周期(WS订阅 & DB查询使用,如 "1h", "2h", "4h")
kline_interval: str = "2h" # <-- 新增
# Beta计算
negative_return_count: int = 10
...
load_trading_config() — 加载环境变量:
beta_arb = BetaArbConfig(
eth_symbol=os.getenv("BETA_ARB_ETH_SYMBOL", "ETH/USDC:USDC"),
btc_symbol=os.getenv("BETA_ARB_BTC_SYMBOL", "BTC/USDC:USDC"),
kline_interval=os.getenv("BETA_ARB_KLINE_INTERVAL", "2h"), # <-- 新增
negative_return_count=_env_int("BETA_ARB_NEGATIVE_RETURN_COUNT", "10"),
...
)
3.2 src/services/beta_arb_service.py
3 处硬编码 "1h" 需替换:
L60-62 — WS 订阅构建:
# 修改前
market_subs = [
{"type": "candle", "coin": eth_coin, "interval": "1h"},
{"type": "candle", "coin": btc_coin, "interval": "1h"},
...
]
# 修改后
interval = ba.kline_interval
market_subs = [
{"type": "candle", "coin": eth_coin, "interval": interval},
{"type": "candle", "coin": btc_coin, "interval": interval},
...
]
L112 — 启动日志:
# 修改前
logger.info(f"行情WS启动: {eth_coin}/1h + {btc_coin}/1h")
# 修改后
logger.info(f"行情WS启动: {eth_coin}/{interval} + {btc_coin}/{interval}")
L152 — WS 消息过滤:
# 修改前
if interval != "1h" or not coin:
return
# 修改后
if interval != self._kline_interval or not coin:
return
注意:
_on_market_message中的局部变量interval来自 WS 消息解析(L151),与上面启动时的interval不同。
需要在__init__中保存self._kline_interval = "1h",然后在start()中赋值为ba.kline_interval。
完整改动:
class BetaArbService:
def __init__(self):
self._orchestrator = TradingOrchestrator()
...
self._kline_interval: str = "2h" # <-- 新增,start() 中从config覆盖
def start(self):
...
ba = self._orchestrator.config.beta_arb
eth_coin = symbol_to_coin(ba.eth_symbol)
btc_coin = symbol_to_coin(ba.btc_symbol)
self._kline_interval = ba.kline_interval # <-- 新增
...
interval = ba.kline_interval
market_subs = [
{"type": "candle", "coin": eth_coin, "interval": interval},
{"type": "candle", "coin": btc_coin, "interval": interval},
...
]
...
logger.info(f"行情WS启动: {eth_coin}/{interval} + {btc_coin}/{interval}")
def _on_market_message(self, msg: dict):
...
interval = data.get("i")
if interval != self._kline_interval or not coin: # <-- 使用实例变量
return
3.3 src/trading/orchestrator.py
_prime_strategy_buffer() — 7 处 '1h' 替换为 ba.kline_interval:
def _prime_strategy_buffer(self, db_client: TimescaleDBClient):
"""从DB加载历史K线灌入策略缓冲区,含连续性/时效性校验与自动回填"""
ba = self._config.beta_arb
kline_interval = ba.kline_interval # <-- 新增局部变量
...
for symbol in [ba.eth_symbol, ba.btc_symbol]:
try:
rows = self._query_klines(db_client, symbol, query_start, kline_interval)
...
is_continuous, missing_ts = filler.validate_continuity(rows, kline_interval)
...
filled = filler.fill_missing_data_precise(symbol, kline_interval, missing_ts)
...
filled = filler.fill_missing_data(symbol, kline_interval, latest_time, now_utc)
...
filled = filler.fill_missing_data(symbol, kline_interval, query_start, now_utc)
...
rows = self._query_klines(db_client, symbol, query_start, kline_interval)
_query_1h_klines() — 重命名并参数化:
# 修改前
@staticmethod
def _query_1h_klines(db_client, symbol, start):
return db_client.execute_query(
"""
SELECT time, close, return_pct
FROM klines
WHERE symbol = %s AND timeframe = '1h'
AND time >= %s
ORDER BY time ASC
""",
(symbol, start),
)
# 修改后
@staticmethod
def _query_klines(db_client, symbol, start, timeframe: str = "2h"):
return db_client.execute_query(
"""
SELECT time, close, return_pct
FROM klines
WHERE symbol = %s AND timeframe = %s
AND time >= %s
ORDER BY time ASC
""",
(symbol, timeframe, start),
)
3.4 .env / .env.prod / .env.example
在 # — 信号 — 部分新增:
# — K线周期 —
BETA_ARB_KLINE_INTERVAL=2h # 策略K线周期 (1h / 2h / 4h)
4. 数据流变化
修改前:
.env (无) → 硬编码 "1h" → WS订阅 1h candle → 过滤 "1h" → 策略信号
→ DB查询 timeframe='1h'
修改后:
.env BETA_ARB_KLINE_INTERVAL=2h
→ BetaArbConfig.kline_interval="2h"
→ BetaArbService: WS订阅 2h candle, 过滤 "2h"
→ Orchestrator: DB查询 timeframe='2h', 回填 '2h'
→ 策略信号(strategy.py 不变,只接收数据)
5. 前置条件
- Hyperliquid WS 支持
2h作为 candle interval(已确认支持:1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 8h, 12h, 1d, 3d, 1w, 1M) - DB
klines表需要有timeframe='2h'的数据。如果之前只采集了1h数据,首次启动时_prime_strategy_buffer()会触发自动回填
6. 合法值与校验
环境变量为字符串,不做强校验(与现有 eth_symbol、btc_symbol 一致)。合法值取决于 Hyperliquid WS 支持的 interval 列表。如果传入非法值,WS 订阅会失败并通过现有的重连/告警机制报错。
7. 回测脚本说明
src/scripts/backtest_beta_arbitrage.py 中有独立的 TIMEFRAME = '1h' 常量,本次不修改。回测脚本是离线工具,与实盘服务独立运行,可后续根据需要单独调整。