K线数据持久化优化方案

一、数据库选型推荐

基于您的需求分析:

推荐方案:TimescaleDB(PostgreSQL时序扩展)

选择理由:

  • 时序数据专用:专为K线这类时间序列数据优化,查询性能比普通PostgreSQL提升10-100倍
  • SQL兼容:完全兼容PostgreSQL,学习成本低,生态成熟
  • 实时写入优化:支持高并发插入,适合您的实时增量写入需求
  • 时间范围查询极快:内置时间分区(hypertable),按时间查询接近O(1)
  • 云服务支持:Timescale Cloud 或 AWS RDS for PostgreSQL + TimescaleDB 扩展
  • 数据压缩:自动压缩历史数据,节省50-90%存储空间
  • Python生态完善:psycopg2/asyncpg 驱动成熟稳定

数据规模估算(您的场景):

  • 50个币种 × 3个时间周期(5m/1h/4h)× 180天
  • 5分钟K线:288条/天 × 180天 × 50币种 = 259万条
  • 1小时K线:24条/天 × 180天 × 50币种 = 21.6万条
  • 4小时K线:6条/天 × 180天 × 50币种 = 5.4万条
  • 总计约286万条记录,压缩后约500MB-1GB存储

备选方案对比

| 数据库 | 优势 | 劣势 | 适用场景 |

|--------|------|------|----------|

| InfluxDB | 时序数据库专家,写入性能最强 | InfluxQL学习成本,生态较小 | 纯时序数据,追求极致性能 |

| ClickHouse | 分析查询极快,压缩比高 | 复杂部署,不适合频繁更新 | 大规模数据分析(亿级) |

| Redis (TimeSeries模块) | 内存速度,实时性极强 | 成本高,不适合大规模历史 | 实时K线推送(配合主库) |

| MongoDB | 灵活Schema,部署简单 | 时序查询性能一般 | 原型开发,需求未定 |

二、架构设计

2.1 数据库表结构设计

-- 创建 TimescaleDB 扩展
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- K线数据主表(使用 hypertable 自动分区)
CREATE TABLE klines (
    time TIMESTAMPTZ NOT NULL,           -- 时间戳(主键一部分)
    symbol VARCHAR(50) NOT NULL,          -- 交易对(如 BTC/USDC:USDC)
    timeframe VARCHAR(10) NOT NULL,       -- 时间周期(5m/1h/4h)
    open DOUBLE PRECISION NOT NULL,       -- 开盘价
    high DOUBLE PRECISION NOT NULL,       -- 最高价
    low DOUBLE PRECISION NOT NULL,        -- 最低价
    close DOUBLE PRECISION NOT NULL,      -- 收盘价
    volume DOUBLE PRECISION NOT NULL,     -- 成交量
    volume_usd DOUBLE PRECISION,          -- 成交额(USD)
    return DOUBLE PRECISION,              -- 收益率(计算字段)
    created_at TIMESTAMPTZ DEFAULT NOW(), -- 数据写入时间
    PRIMARY KEY (time, symbol, timeframe)
);

-- 转换为 TimescaleDB hypertable(自动时间分区)
SELECT create_hypertable('klines', 'time', 
    chunk_time_interval => INTERVAL '7 days',  -- 每7天一个分区
    if_not_exists => TRUE
);

-- 创建复合索引(加速常见查询)
CREATE INDEX idx_symbol_timeframe_time ON klines (symbol, timeframe, time DESC);
CREATE INDEX idx_timeframe_time ON klines (timeframe, time DESC);

-- 启用自动压缩(历史数据压缩,节省存储)
ALTER TABLE klines SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'symbol,timeframe',
    timescaledb.compress_orderby = 'time DESC'
);

-- 自动压缩策略:7天以前的数据自动压缩
SELECT add_compression_policy('klines', INTERVAL '7 days');

-- 数据保留策略(可选):自动删除6个月前的数据
SELECT add_retention_policy('klines', INTERVAL '180 days');

2.2 缓存策略(三层缓存架构)

graph LR
    API[交易所API] --> L1[L1: 内存缓存dict]
    L1 --> L2[L2: Redis可选]
    L2 --> L3[L3: TimescaleDB]
    L3 --> Analysis[分析计算]

缓存层设计:

  1. L1 内存缓存(已有 base_df_cachealt_df_cache):保留当前运行周期的数据
  2. L2 Redis缓存(可选):缓存最近1小时的实时K线,TTL=1h
  3. L3 TimescaleDB:持久化全量历史数据

2.3 数据流程图

flowchart TD
    Start[开始分析] --> CheckCache{检查内存缓存}
    CheckCache -->|命中| UseCache[使用缓存数据]
    CheckCache -->|未命中| CheckDB{检查数据库}
    CheckDB -->|存在且新鲜| LoadDB[从数据库加载]
    CheckDB -->|不存在或过时| FetchAPI[调用API获取]
    FetchAPI --> SaveDB[保存到数据库]
    SaveDB --> UpdateCache[更新内存缓存]
    LoadDB --> UpdateCache
    UpdateCache --> Analysis[执行分析]
    UseCache --> Analysis
    Analysis --> End[结束]

三、代码实现方案

3.1 创建数据库操作模块

新建文件:utils/timescaledb.py

核心功能:

  • 数据库连接管理(使用连接池)
  • K线数据批量插入(使用 COPY 或批量 INSERT)
  • 按时间范围查询K线数据
  • 数据去重(使用 ON CONFLICT DO NOTHING

关键方法:

class TimescaleDBClient:
    def __init__(self, connection_string)
    def save_klines(self, df, symbol, timeframe) -> int
    def load_klines(self, symbol, timeframe, period) -> pd.DataFrame
    def get_latest_timestamp(self, symbol, timeframe) -> datetime
    def batch_insert(self, klines_batch) -> int

3.2 修改 multi_coins3.py

需要修改的核心方法:

  1. __init__ (153-189行):
    • 添加 TimescaleDBClient 实例化
    • 添加配置项:ENABLE_DB_CACHEDB_CONNECTION_STRING
  2. download_ccxt_data (256-303行):
    • 调用前先查询数据库:db.load_klines(symbol, timeframe, period)
    • 如果数据库有数据且足够新鲜,直接返回
    • 如果需要增量更新,只获取缺失的时间段
    • API获取后,调用 db.save_klines() 保存
  3. _get_base_data (761-785行):
    • 先查内存缓存 → 再查数据库 → 最后调用API
  4. _get_alt_data (787-827行):
    • 同样的三层查询策略

3.3 增量更新策略

def smart_download(self, symbol, period, timeframe):
    """智能下载:优先使用数据库,仅增量获取缺失数据"""
    # 1. 查询数据库中最新的时间戳
    latest_ts = self.db.get_latest_timestamp(symbol, timeframe)
    
    # 2. 计算需要的时间范围
    target_bars = self._period_to_bars(period, timeframe)
    need_since = now - target_bars * bar_interval
    
    # 3. 判断数据库数据是否足够
    if latest_ts and latest_ts >= need_since:
        # 数据库有足够的历史数据
        df = self.db.load_klines(symbol, timeframe, period)
        
        # 检查是否需要增量更新(最新数据是否过时)
        if now - latest_ts > bar_interval * 2:
            # 增量获取最新数据
            new_data = self.exchange.fetch_ohlcv(symbol, timeframe, since=latest_ts)
            df = pd.concat([df, new_data]).drop_duplicates()
            self.db.save_klines(df, symbol, timeframe)
        
        return df
    else:
        # 数据库无数据或不足,全量获取
        df = self.download_ccxt_data(symbol, period, timeframe)
        self.db.save_klines(df, symbol, timeframe)
        return df

四、性能优化措施

4.1 批量写入优化

  • 使用 PostgreSQL COPY 命令(比 INSERT 快10-100倍)
  • 批量大小:500-1000条/批次
  • 异步写入:使用线程池或 asyncpg

4.2 查询优化

  • 利用 TimescaleDB 的 time_bucket() 函数聚合查询
  • 使用 EXPLAIN ANALYZE 分析慢查询
  • 适当添加物化视图(如每日统计)

4.3 连接池配置

from psycopg2 import pool

connection_pool = pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    dsn=connection_string
)

五、云端部署建议

方案1:Timescale Cloud(推荐)

  • 优势:免运维,自动备份,按量付费
  • 成本:约 $50-100/月(您的数据规模)
  • 配置:2核4GB即可满足需求

方案2:AWS RDS PostgreSQL + TimescaleDB

  • 优势:与现有AWS服务集成,灵活配置
  • 成本:约 $30-80/月(使用 db.t3.medium)

方案3:自建(Docker Compose)

  • 优势:成本低,完全控制
  • 劣势:需要自行备份和运维

六、迁移路径

  1. 第一阶段(1-2天)
    • 搭建 TimescaleDB 测试环境(Docker本地测试)
    • 实现 TimescaleDBClient 基础功能
    • 单元测试数据读写
  2. 第二阶段(2-3天)
    • 修改 multi_coins3.py 集成数据库
    • 实现缓存穿透逻辑
    • 批量导入历史数据
  3. 第三阶段(1天)
    • 云端部署数据库
    • 性能测试和优化
    • 监控和告警配置

七、监控指标

  • 数据库存储空间使用率
  • 查询响应时间(P95、P99)
  • API调用次数减少率(目标:减少70%+)
  • 缓存命中率(目标:>80%)

预期效果:

  • ⚡ API调用减少70%+,避免限流
  • 🚀 数据加载速度提升5-10倍
  • 💾 支持离线分析和回测
  • 📊 便于数据可视化和监控

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG