HIP-3集成设计文档

HIP-3 集成设计文档

版本: v1.0
日期: 2026-03-03
适用项目: Trading-in-websocket(全币种协整配对量化交易系统)


目录

  1. 背景与目标
  2. HIP-3 技术概述
  3. 关键约束与设计决策
  4. 整体架构设计
  5. 数据层设计
  6. 服务层设计
  7. 交易执行层设计
  8. 配置层设计
  9. 文件改动清单
  10. 使用流程
  11. 验证方案
  12. 风险与注意事项

1. 背景与目标

1.1 现状

当前系统(RealtimeKlineService)监控 Hyperliquid 主网标准永续合约(约 220 个币种),以 BTC/USDC:USDC 为基准,通过协整分析筛选配对并执行 Adaptive Bollinger Z-Score 策略。

1.2 目标

HIP-3(Builder-Deployed Perpetuals) 市场纳入现有系统:

维度 目标
监控 动态发现所有 HIP-3 DEX 及其代币,注册到 DB,为 backfill 和回测提供数据源
实时分析 cointegrated_pairs 表中已登记的 HIP-3 配对,实时计算 Z-score 并告警
交易 对 HIP-3 币种信号自动下单(隔离保证金),复用现有策略引擎

1.3 用户决策

决策项 选择
HIP-3 币种来源 动态发现所有 DEX(perpDexs API)
配对分析策略 cointegrated_pairs 表中已登记的配对
交易 SDK Hyperliquid 官方 Python SDK(原生支持 HIP-3)
服务架构 扩展现有全币种服务,不新建独立进程

2. HIP-3 技术概述

2.1 什么是 HIP-3

HIP-3(Hyperliquid Improvement Proposal 3)是 Builder-Deployed Perpetuals 协议,允许第三方开发者在 Hyperliquid 基础设施上部署独立的永续期货 DEX。

核心特性

特性 说明
质押要求 部署者需质押 500,000 HYPE
保证金模式 仅支持隔离保证金(isolated-margin only),不支持跨仓
费用结构 标准 perp 的 2 倍,部署者获得所有费用的 50%
资产命名 前缀格式:dex_name:TOKEN,如 xyz:XYZ100
独立性 每个 DEX 有独立资金、订单簿、预言机和参数设置

2.2 资产 ID 计算

标准永续: asset_id = universe 中的顺序 index(0 起)
HIP-3:    asset_id = 110000 + dex_order * 10000 + local_index
  • dex_orderperpDexs API 响应中 DEX 的顺序(从 0 开始,API 返回数组 [null, dex0, dex1, ...]
  • local_index:该 DEX 的 universe 数组中代币的顺序

由 SDK 自动处理,应用层无需手动计算。

2.3 Symbol 格式规范

层级 格式 示例
Hyperliquid API(coin) dex:TOKEN xyz:XYZ100
项目内部 symbol dex:TOKEN/USDC:USDC xyz:XYZ100/USDC:USDC
WS 订阅 coin dex:TOKEN xyz:XYZ100
DB 存储 symbol dex:TOKEN/USDC:USDC xyz:XYZ100/USDC:USDC

兼容性:现有 symbol_to_coin("xyz:XYZ100/USDC:USDC") 返回 "xyz:XYZ100"无需修改

2.4 SDK 原生支持

Hyperliquid Python SDK 已原生支持 HIP-3,关键接口:

# Info 初始化(带 HIP-3 DEX 列表)
info = Info(base_url, skip_ws=True, perp_dexs=["xyz", "abc"])
# 内部自动调用 metaAndAssetCtxs?dex=xxx 填充 coin_to_asset 映射

# Exchange 初始化(带 HIP-3 DEX 列表)
exchange = Exchange(wallet, base_url, perp_dexs=["xyz", "abc"])

# 下单(HIP-3 coin 格式与标准 perp 完全一致)
exchange.market_open("xyz:XYZ100", is_buy=True, sz=1.0, slippage=0.01)
exchange.market_close("xyz:XYZ100", sz=1.0, slippage=0.01)

# 价格查询(带 dex 参数)
info.all_mids(dex="xyz")   # 返回该 DEX 所有 coin 的中间价

# 持仓查询(带 dex 参数)
info.user_state(address, dex="xyz")   # 返回该 DEX 的用户状态

# DEX 发现
info.perp_dexs()
# → [null, {"name": "xyz", "deployer": "0x...", ...}, ...]

2.5 WebSocket 订阅

HIP-3 的 WS K 线订阅格式与标准 perp 完全相同

{
  "method": "subscribe",
  "subscription": {
    "type": "candle",
    "coin": "xyz:XYZ100",
    "interval": "5m"
  }
}

重要限制:每个 IP 最多 1,000 个 WebSocket 订阅


3. 关键约束与设计决策

3.1 WebSocket 订阅上限(最关键约束)

现状分析

标准 perp: ~220 个币种 × 3 周期(5m/1h/4h) = 660 个 candle 订阅
           + 220 个 l2Book 订阅
           = 约 880 个订阅
剩余可用:   1000 - 880 = 120 个订阅

若对所有 HIP-3 币种全量订阅(假设 100 个 HIP-3 coins × 3 周期 = 300 个),将严重超限。

设计决策:分层订阅策略

HIP-3 币种 WS 订阅:仅订阅 cointegrated_pairs 表中已登记的 HIP-3 配对
HIP-3 历史数据收集:通过 backfill 脚本(REST API)收集,不占用 WS 配额

这样只有经过回测验证、已导入配对表的 HIP-3 币种才会占用 WS 订阅资源。

3.2 仅支持隔离保证金

官方文档明确说明:HIP-3 市场当前仅支持隔离保证金,跨仓计划未来支持。

设计决策:在 _sync_hip3_coin_meta() 中将所有 HIP-3 coin 的 forceIsolated 设为 True
现有的 _ensure_leverage() 方法检测到 forceIsolated=True 会自动使用 is_cross=False
无需修改 _ensure_leverage() 代码

3.3 持仓查询差异

标准 perp 持仓通过 user_state(address) 获取;HIP-3 持仓需要 user_state(address, dex="xyz") 按 DEX 单独查询。

设计决策get_positions() 在现有标准查询基础上,遍历 _hip3_dex_names 追加查询并合并结果。

3.4 L2Book WS 缓存 Key 解析 Bug

现有 _get_all_mids_from_ws() 中:

coin = key.split(":")[0]   # 期望拆分 "BTC:l2Book" → "BTC"

HIP-3 coin 名含 :,如 "xyz:XYZ100:l2Book" 被错误解析为 "xyz"

设计决策:修复为:

coin = key[:-len(":l2Book")]   # "xyz:XYZ100:l2Book" → "xyz:XYZ100" ✓

3.5 分析任务中的 Base Symbol 处理

现有逻辑对所有 symbol 都追加 self.base_symbol(BTC)作为兜底分析基准。
HIP-3 coins 只应与 cointegrated_pairs 表中登记的基准配对,不应自动追加 BTC

设计决策:分析 worker 中检测到 HIP-3 symbol 时跳过默认 base 追加。


4. 整体架构设计

4.1 数据流图

┌─────────────────────────────────────────────────────────────────────┐
│                     【历史数据收集层】(离线)                         │
│                                                                     │
│  perpDexs API → 发现所有 HIP-3 DEX                                  │
│      ↓                                                              │
│  metaAndAssetCtxs → 获取各 DEX 的 universe(代币列表)               │
│      ↓                                                              │
│  symbol_metadata 表 ← upsert (market_type='hip3')                  │
│      ↓                                                              │
│  backfill_all_data.py → REST API 拉取历史 K 线 → klines 表          │
│      ↓                                                              │
│  回测脚本(backtest_pairwise_correlation.py 等)                     │
│      ↓                                                              │
│  import_cointegrated_pairs.py → cointegrated_pairs 表               │
└─────────────────────────────────────────────────────────────────────┘
                              ↓(cointegrated_pairs 驱动)
┌─────────────────────────────────────────────────────────────────────┐
│                     【实时监控层】(在线)                            │
│                                                                     │
│  Hyperliquid WebSocket API                                          │
│  ├── 标准 perp candle:所有活跃币种                                  │
│  └── HIP-3 candle:仅 cointegrated_pairs 中的 HIP-3 coins           │
│                    ↓                                                │
│  EnhancedWebSocketManager(假活检测 + 自动重连)                     │
│                    ↓                                                │
│  分析 Worker Pool                                                    │
│  ├── 标准 perp:ALT vs BTC(默认)+ cointegrated_pairs              │
│  └── HIP-3:仅 cointegrated_pairs 中的配对(跳过 BTC 默认)         │
│                    ↓                                                │
│  AdaptiveBollingerStrategy(Z-score 信号)                           │
│                    ↓                                                │
│  TradingOrchestrator → HyperliquidExecutor                          │
│  └── market_open("xyz:XYZ100", ...) → SDK 自动处理 HIP-3           │
└─────────────────────────────────────────────────────────────────────┘

4.2 HIP-3 判断辅助函数

# src/services/realtime_kline_service_base.py(模块级函数)

def _is_hip3_symbol(symbol: str) -> bool:
    """判断是否 HIP-3 市场(coin 部分含 ':')

    Examples:
        _is_hip3_symbol("xyz:XYZ100/USDC:USDC")  → True
        _is_hip3_symbol("BTC/USDC:USDC")          → False
        _is_hip3_symbol("PURR/USDC:USDC")         → False
    """
    coin = symbol.split('/')[0]
    return ':' in coin

5. 数据层设计

5.1 DB Schema 迁移

文件database/init_timescaledb.sql(末尾追加 Migration 节)

-- =====================================================
-- Migration: 新增 HIP-3 市场类型标识
-- 版本: v1.0 (2026-03-03)
-- =====================================================

-- 新增 market_type 列区分标准永续和 HIP-3 市场
ALTER TABLE symbol_metadata
    ADD COLUMN IF NOT EXISTS market_type VARCHAR(10) DEFAULT 'perp';

-- 添加注释
COMMENT ON COLUMN symbol_metadata.market_type IS
    '市场类型: perp=标准永续合约, hip3=HIP-3 Builder部署的永续DEX';

-- 为已有数据设置默认值(确保数据一致性)
UPDATE symbol_metadata SET market_type = 'perp' WHERE market_type IS NULL;

-- 可选索引(按 market_type 过滤时使用)
CREATE INDEX IF NOT EXISTS idx_symbol_metadata_market_type
    ON symbol_metadata (market_type);

\echo '✅ Migration: symbol_metadata.market_type 列已添加'

5.2 SymbolMetadataRepository 修改

文件src/utils/database/timescaledb.py

upsert_symbol() 参数扩展

def upsert_symbol(
    self,
    symbol: str,
    base_asset: str,
    quote_asset: str,
    listing_time: datetime | None = None,
    is_active: bool = True,
    market_type: str = 'perp',        # ← 新增参数
) -> bool:
    """
    注册或更新币种元数据

    Args:
        symbol: 币种符号,如 "BTC/USDC:USDC" 或 "xyz:XYZ100/USDC:USDC"
        base_asset: 基础资产,如 "BTC" 或 "xyz:XYZ100"
        quote_asset: 计价资产,通常为 "USDC"
        listing_time: 上线时间
        is_active: 是否活跃
        market_type: 市场类型,'perp' 或 'hip3'    # ← 新增说明
    """
    try:
        with self.client.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO symbol_metadata
                        (symbol, base_asset, quote_asset, listing_time,
                         is_active, market_type, updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, NOW())
                    ON CONFLICT (symbol)
                    DO UPDATE SET
                        base_asset   = EXCLUDED.base_asset,
                        quote_asset  = EXCLUDED.quote_asset,
                        listing_time = COALESCE(EXCLUDED.listing_time,
                                                symbol_metadata.listing_time),
                        is_active    = EXCLUDED.is_active,
                        market_type  = EXCLUDED.market_type,
                        updated_at   = NOW();
                    """,
                    (symbol, base_asset, quote_asset,
                     listing_time, is_active, market_type)
                )
                return True
    except Exception as e:
        logger.error(f"币种元数据更新失败: {e}")
        return False

get_active_symbols() 参数扩展

def get_active_symbols(
    self,
    market_type: str | None = None,    # ← 新增参数
) -> list[str]:
    """
    获取所有活跃币种列表

    Args:
        market_type: 若指定则过滤市场类型 ('perp' 或 'hip3'),None 返回全部

    Returns:
        活跃币种符号列表,如 ["BTC/USDC:USDC", "xyz:XYZ100/USDC:USDC", ...]
    """
    if market_type:
        results = self.client.execute_query(
            """
            SELECT symbol FROM symbol_metadata
            WHERE is_active = TRUE AND market_type = %s
            ORDER BY symbol;
            """,
            (market_type,)
        )
    else:
        results = self.client.execute_query(
            """
            SELECT symbol FROM symbol_metadata
            WHERE is_active = TRUE
            ORDER BY symbol;
            """
        )
    return [r['symbol'] for r in results] if results else []

6. 服务层设计

6.1 RealtimeKlineService 修改

文件src/services/realtime_kline_service.py

_get_active_symbols() 扩展

def _get_active_symbols(self) -> List[str]:
    """
    获取活跃币种列表(标准 perp + HIP-3)

    Returns:
        所有活跃币种列表,格式如 ["BTC/USDC:USDC", "xyz:XYZ100/USDC:USDC", ...]
    """
    from src.config import HIP3_ENABLED

    # ── 1. 标准 perp 逻辑(现有代码不变)──
    symbols = []
    try:
        active_symbols = self.symbol_repo.get_active_symbols(market_type='perp')
        if active_symbols:
            self.logger.info(f"从数据库加载 {len(active_symbols)} 个活跃标准 perp 币种")
            symbols = active_symbols
        else:
            # 从交易所获取(首次运行)
            self.logger.info("数据库无标准 perp 数据,从交易所获取...")
            meta = retry_call(
                lambda: Info(constants.MAINNET_API_URL, skip_ws=True).meta(),
                description="通用版-交易所元数据",
            )
            for asset_info in meta.get('universe', []):
                name = asset_info.get('name')
                if name:
                    symbol = f"{name}/USDC:USDC"
                    symbols.append(symbol)
                    self.symbol_repo.upsert_symbol(
                        symbol=symbol,
                        base_asset=name,
                        quote_asset='USDC',
                        is_active=True,
                        market_type='perp',     # ← 明确标记为 perp
                    )
    except Exception as e:
        self.logger.error(f"获取标准 perp 币种列表失败: {e}", exc_info=True)
        symbols = ['BTC/USDC:USDC', 'ETH/USDC:USDC']

    # ── 2. HIP-3 发现(新增)──
    if HIP3_ENABLED:
        try:
            hip3_symbols = self._load_hip3_symbols()
            self.logger.info(f"HIP-3 币种发现完成: {len(hip3_symbols)} 个")
            # 注意:这里仅注册到 DB,不全部加入 self.symbols 用于 WS 订阅
            # WS 订阅过滤在 _build_market_subscriptions() 中控制
            symbols.extend(hip3_symbols)
        except Exception as e:
            self.logger.error(f"HIP-3 币种发现失败(不影响标准 perp 运行): {e}")

    return symbols

新增 _load_hip3_symbols() 方法

def _load_hip3_symbols(self) -> List[str]:
    """
    发现所有 HIP-3 DEX 的代币并注册到数据库

    流程:
    1. 调用 perpDexs API 获取所有 DEX 列表
    2. 对每个 DEX 调用 metaAndAssetCtxs 获取代币列表
    3. 构造 symbol 格式并 upsert 到 symbol_metadata(market_type='hip3')
    4. 返回所有 HIP-3 symbol 列表

    Returns:
        HIP-3 symbol 列表,格式如 ["xyz:XYZ100/USDC:USDC", ...]

    注意:
        返回的列表仅用于 DB 注册;WS 订阅过滤由 _build_market_subscriptions()
        按 cointegrated_pairs 表进一步筛选,以避免超过 1000 WS 订阅上限。
    """
    from src.config import HIP3_MAX_DEXES
    from hyperliquid.info import Info
    import hyperliquid.utils.constants as constants

    hip3_symbols = []

    # 获取所有 DEX 列表
    info = retry_call(
        lambda: Info(constants.MAINNET_API_URL, skip_ws=True, timeout=15),
        description="HIP-3-Info初始化",
    )
    all_dexes_raw = retry_call(
        lambda: info.perp_dexs(),
        description="perpDexs列表",
    )

    # all_dexes_raw[0] = null,从 index 1 开始
    valid_dexes = [
        d for d in all_dexes_raw[1:]
        if isinstance(d, dict) and d.get('name')
    ][:HIP3_MAX_DEXES]

    self.logger.info(f"发现 {len(valid_dexes)} 个 HIP-3 DEX(限制 HIP3_MAX_DEXES={HIP3_MAX_DEXES})")

    for dex_info in valid_dexes:
        dex_name = dex_info['name']
        try:
            # 获取该 DEX 的代币列表
            meta = retry_call(
                lambda d=dex_name: info.meta(dex=d),
                description=f"HIP-3-{dex_name}-meta",
                max_attempts=3,
            )
            universe = meta.get('universe', [])

            for asset_info in universe:
                coin_name = asset_info.get('name')
                if not coin_name:
                    continue

                # HIP-3 coin 格式:dex_name:TOKEN(如 xyz:XYZ100)
                # 但 API 返回的可能已经是 dex:TOKEN 格式
                symbol = f"{coin_name}/USDC:USDC"

                hip3_symbols.append(symbol)

                # 注册到数据库
                self.symbol_repo.upsert_symbol(
                    symbol=symbol,
                    base_asset=coin_name,
                    quote_asset='USDC',
                    is_active=True,
                    market_type='hip3',
                )

            self.logger.info(
                f"HIP-3 DEX [{dex_name}]: {len(universe)} 个代币已注册"
            )

        except Exception as e:
            self.logger.warning(
                f"HIP-3 DEX [{dex_name}] 代币加载失败(跳过): {e}"
            )

    return hip3_symbols

6.2 RealtimeKlineServiceBase 修改

文件src/services/realtime_kline_service_base.py

新增模块级辅助函数

# 在文件顶部(import 区域之后,class 定义之前)添加

def _is_hip3_symbol(symbol: str) -> bool:
    """判断是否 HIP-3 市场(coin 部分含 ':')

    HIP-3 coin 格式:dex_name:TOKEN,如 xyz:XYZ100
    项目内部 symbol 格式:xyz:XYZ100/USDC:USDC

    Args:
        symbol: 完整 symbol 字符串,如 "xyz:XYZ100/USDC:USDC"

    Returns:
        True 表示 HIP-3 市场,False 表示标准永续

    Examples:
        >>> _is_hip3_symbol("xyz:XYZ100/USDC:USDC")
        True
        >>> _is_hip3_symbol("BTC/USDC:USDC")
        False
        >>> _is_hip3_symbol("PURR/USDC:USDC")
        False
    """
    coin = symbol.split('/')[0]
    return ':' in coin

_build_market_subscriptions() 扩展

现有方法对所有 symbol 无差别订阅。HIP-3 需要按 cointegrated_pairs 过滤。

def _build_market_subscriptions(self) -> List[Dict]:
    """构建行情 WS 订阅列表(candle + l2Book)

    订阅策略:
    - 标准 perp:所有活跃币种(现有逻辑)
    - HIP-3:仅 cointegrated_pairs 中已登记的 HIP-3 币种
      原因:WS 每 IP 上限 1000 个订阅,需避免超限
    """
    subscriptions = []
    for symbol in self._get_all_symbols():
        coin = symbol.split('/')[0]

        # HIP-3 coins:只订阅已在 cointegrated_pairs 中的
        if _is_hip3_symbol(symbol):
            if not self._is_in_cointegrated_pairs(symbol):
                self.logger.debug(
                    f"跳过 HIP-3 WS 订阅(未在 cointegrated_pairs 中): {symbol}"
                )
                continue

        for interval in ['5m', '1h', '4h']:
            subscriptions.append({
                "type": "candle",
                "coin": coin,
                "interval": interval,
            })
        subscriptions.append({"type": "l2Book", "coin": coin})

    self.logger.info(
        f"WS 订阅构建完成: {len(subscriptions)} 个订阅 "
        f"(上限 1000,当前占用 {len(subscriptions)/1000*100:.1f}%)"
    )
    return subscriptions

def _is_in_cointegrated_pairs(self, symbol: str) -> bool:
    """检查 symbol 是否在 cointegrated_pairs 内存缓存中

    _pair_cache 格式:{alt_symbol: [base_symbol, ...]}
    """
    with self._pair_cache_lock:
        return symbol in self._pair_cache

分析 Worker 循环修改

位置:约 line 950,在 _analysis_worker() 方法内

# 原有代码(对所有 symbol 追加默认 base):
# if symbol != self.base_symbol and self.base_symbol not in base_symbols:
#     base_symbols.append(self.base_symbol)

# 修改后:HIP-3 coins 不追加默认 base,严格依赖 cointegrated_pairs
if symbol != self.base_symbol and self.base_symbol not in base_symbols:
    if not _is_hip3_symbol(symbol):
        # 标准 perp:追加默认 base(BTC),确保即使 cointegrated_pairs
        # 缓存未更新也能检测新产生的协整关系
        base_symbols.append(self.base_symbol)
    # HIP-3:不追加默认 base,只分析已登记的配对
    # 原因:HIP-3 和 BTC 的协整关系需要先经过回测验证

# 当 base_symbols 为空(HIP-3 且 cointegrated_pairs 无记录),跳过分析
if not base_symbols:
    with self.recent_analysis_lock:
        self.recent_analysis[task_key] = current_time
    self.analysis_queue.task_done()
    continue

_monitor_new_symbols() 扩展

def _monitor_new_symbols(self):
    """新币种监控线程

    每小时执行一次:
    1. 检测新上线的标准 perp(现有逻辑)
    2. 检测新上线的 HIP-3 DEX 和代币(新增)
    """
    from src.config import HIP3_ENABLED

    while not self.stop_event.is_set():
        try:
            # ── 现有:标准 perp 新币检测 ──
            # ... (现有代码不变)...

            # ── 新增:HIP-3 新币检测 ──
            if HIP3_ENABLED:
                self._check_new_hip3_symbols()

        except Exception as e:
            self.logger.error(f"新币监控异常: {e}", exc_info=True)

        self.stop_event.wait(3600)  # 每小时检查一次

def _check_new_hip3_symbols(self):
    """检测新上线的 HIP-3 DEX 和代币,更新 DB 注册

    注意:新发现的 HIP-3 coins 不自动订阅 WS,需要用户:
    1. 用 backfill 脚本收历史数据
    2. 回测验证 → import_cointegrated_pairs.py 导入配对
    3. 重启服务后才会订阅 WS
    """
    try:
        new_hip3 = self._load_hip3_symbols()  # 复用已有方法
        if new_hip3:
            with self.symbols_lock:
                current_set = set(self.symbols)
                truly_new = [s for s in new_hip3 if s not in current_set]
                if truly_new:
                    self.symbols.extend(truly_new)
                    self.logger.info(
                        f"发现 {len(truly_new)} 个新 HIP-3 代币,已注册到 DB: "
                        f"{truly_new[:3]}{'...' if len(truly_new) > 3 else ''}"
                    )
    except Exception as e:
        self.logger.warning(f"HIP-3 新币检测失败: {e}")

_get_all_mids_from_ws() Bug 修复

当前 Bugkey.split(":")[0]"xyz:XYZ100:l2Book" 返回 "xyz" 而非 "xyz:XYZ100"

def _get_all_mids_from_ws(self) -> dict[str, float]:
    """从 WebSocket L2Book 缓存读取中间价

    修复:HIP-3 coin 名含 ':',需用 rstrip 而非 split 提取 coin 名
    """
    result = {}
    L2BOOK_SUFFIX = ":l2Book"

    for key, msg in self._market_ws_manager.latest_data.items():
        if not key.endswith(L2BOOK_SUFFIX):
            continue

        # 修复:去掉后缀得到完整 coin 名
        # "BTC:l2Book"        → "BTC"        (标准 perp)✓
        # "xyz:XYZ100:l2Book" → "xyz:XYZ100" (HIP-3)✓
        coin = key[:-len(L2BOOK_SUFFIX)]

        try:
            mid = self._calculate_mid_from_l2_data(msg)
            if mid and mid > 0:
                result[coin] = mid
        except Exception:
            pass

    return result

7. 交易执行层设计

7.1 HyperliquidExecutor 修改

文件src/trading/executor.py

__init__ 新增属性

class HyperliquidExecutor:
    def __init__(self, config: TradingConfig):
        # ... 现有属性 ...

        # HIP-3 相关状态
        self._hip3_dex_names: list[str] = []   # 已加载的 HIP-3 DEX 名称列表

initialize() 扩展

def initialize(self) -> bool:
    """初始化 SDK 连接(支持 HIP-3)"""
    try:
        # ... 现有初始化逻辑(钱包加载、Exchange/Info 创建)...

        # 现有:缓存标准 perp 币种元数据
        self._load_coin_meta()

        # ── 新增:HIP-3 支持初始化 ──
        if self._config.hip3_enabled:
            self._initialize_hip3(base_url)

        # ... 后续现有逻辑不变 ...
        return True

    except Exception as e:
        logger.error(f"交易执行器初始化失败: {e}", exc_info=True)
        return False

def _initialize_hip3(self, base_url: str):
    """初始化 HIP-3 支持

    1. 加载 DEX 名称列表
    2. 重建支持 perp_dexs 的 Exchange 和 Info
    3. 同步 HIP-3 coin 元数据到 _coin_meta
    """
    hip3_dex_names = self._load_hip3_dex_names()
    if not hip3_dex_names:
        logger.warning("未发现任何 HIP-3 DEX,HIP-3 交易将不可用")
        return

    self._hip3_dex_names = hip3_dex_names

    # 重建 Exchange 和 Info(携带 perp_dexs 参数)
    # SDK 会自动调用 metaAndAssetCtxs?dex=xxx 填充 coin_to_asset
    self._exchange = self._create_exchange(
        self._wallet, base_url, perp_dexs=hip3_dex_names
    )
    self._info = self._create_info(base_url, perp_dexs=hip3_dex_names)

    # 将 HIP-3 coin 的 szDecimals 同步到 _coin_meta
    self._sync_hip3_coin_meta()

    logger.info(
        f"HIP-3 交易支持已启用 | "
        f"DEX 数量: {len(hip3_dex_names)} | "
        f"DEX 列表: {hip3_dex_names[:5]}{'...' if len(hip3_dex_names) > 5 else ''}"
    )

_create_exchange() / _create_info() 签名扩展

@staticmethod
@api_retry("Exchange初始化", max_attempts=3, min_wait=2, max_wait=10,
           retryable=is_retryable_api_error)
def _create_exchange(
    wallet,
    base_url: str,
    perp_dexs: list[str] | None = None,    # ← 新增参数
) -> Exchange:
    """带重试的 Exchange 构造,支持 HIP-3 perp_dexs 参数"""
    return Exchange(wallet, base_url, perp_dexs=perp_dexs or None)

@staticmethod
@api_retry("Info初始化", max_attempts=4, min_wait=5, max_wait=30,
           retryable=is_retryable_api_error)
def _create_info(
    base_url: str,
    perp_dexs: list[str] | None = None,    # ← 新增参数
) -> Info:
    """带重试的 Info 构造,支持 HIP-3 perp_dexs 参数"""
    return Info(base_url, skip_ws=True, timeout=10,
                perp_dexs=perp_dexs or None)

新增 _load_hip3_dex_names()

def _load_hip3_dex_names(self) -> list[str]:
    """获取所有 HIP-3 DEX 名称列表

    调用 perpDexs API,跳过第一个 null 元素,返回有效 DEX 名称。

    Returns:
        DEX 名称列表,如 ["xyz", "abc", ...]
    """
    try:
        all_dexes = retry_call(
            lambda: self._info.perp_dexs(),
            description="HIP-3 DEX 列表加载",
            max_attempts=3,
            retryable=is_retryable_readonly,
        )
        # all_dexes[0] 固定为 null,从 index 1 开始
        dex_names = [
            d['name'] for d in all_dexes[1:]
            if isinstance(d, dict) and d.get('name')
        ]
        logger.info(f"HIP-3 DEX 发现: {len(dex_names)} 个 → {dex_names[:5]}")
        return dex_names
    except Exception as e:
        logger.warning(f"HIP-3 DEX 列表加载失败: {e}")
        return []

新增 _sync_hip3_coin_meta()

def _sync_hip3_coin_meta(self):
    """将 HIP-3 coins 的元数据同步到 _coin_meta

    SDK 初始化时已通过 perp_dexs 参数填充了 coin_to_asset 映射,
    这里将 szDecimals 等信息同步到项目自己的 _coin_meta 缓存。

    关键设置:HIP-3 coins 的 forceIsolated=True(仅支持隔离保证金)
    """
    synced = 0
    for coin, asset in self._exchange.info.coin_to_asset.items():
        # HIP-3 资产 ID >= 110000
        if asset >= 110_000 and coin not in self._coin_meta:
            sz_dec = self._exchange.info.asset_to_sz_decimals.get(asset, 8)
            self._coin_meta[coin] = {
                "szDecimals": sz_dec,
                "maxLeverage": 50,      # 默认最高杠杆
                "forceIsolated": True,  # HIP-3 仅支持隔离保证金(官方文档)
            }
            synced += 1

    logger.info(
        f"HIP-3 coin 元数据同步完成: {synced} 个新增 | "
        f"示例: {list(k for k in self._coin_meta if ':' in k)[:3]}"
    )

get_positions() 扩展

def get_positions(self, force_refresh: bool = False) -> list[dict]:
    """获取所有持仓(标准 perp + HIP-3)

    HIP-3 持仓需要按 DEX 单独查询(官方 API 限制:每次只能查一个 DEX)
    """
    # ── 现有标准 perp 逻辑(不变)──
    positions = <现有实现>

    # ── 新增:HIP-3 持仓(按 DEX 遍历)──
    for dex in self._hip3_dex_names:
        try:
            state = retry_call(
                lambda d=dex: self._info.user_state(
                    self._wallet.address, dex=d
                ),
                description=f"HIP-3持仓查询({dex})",
                retryable=is_retryable_readonly,
            )
            hip3_asset_positions = state.get("assetPositions", [])
            hip3_positions = [
                p.get("position", p)
                for p in hip3_asset_positions
                if p
            ]
            if hip3_positions:
                logger.debug(
                    f"HIP-3 持仓 ({dex}): {len(hip3_positions)} 个"
                )
            positions.extend(hip3_positions)
        except Exception as e:
            logger.warning(f"HIP-3 持仓查询失败 (dex={dex}): {e}")

    return positions

_get_all_mids_from_api() 扩展

def _get_all_mids_from_api(self) -> dict[str, float]:
    """获取所有 coin 的中间价(标准 perp + HIP-3)

    HIP-3 mids 需要按 DEX 单独查询(allMids?dex=xxx)
    """
    # ── 标准 perp mids(现有)──
    raw = retry_call(
        lambda: self._info.all_mids(),
        description="全市场价格查询",
        retryable=is_retryable_readonly,
    )
    result = {k: float(v) for k, v in raw.items()}

    # ── HIP-3 mids(新增)──
    for dex in self._hip3_dex_names:
        try:
            hip3_raw = retry_call(
                lambda d=dex: self._info.all_mids(dex=d),
                description=f"HIP-3价格查询({dex})",
                retryable=is_retryable_readonly,
            )
            result.update({k: float(v) for k, v in hip3_raw.items()})
        except Exception as e:
            logger.debug(f"HIP-3 all_mids 查询失败 (dex={dex}): {e}")

    return result

7.2 无需修改的方法

方法 原因
_place_market_order() SDK market_open("xyz:XYZ100", ...) 已原生支持 HIP-3
_place_close_order() SDK market_close("xyz:XYZ100", ...) 已原生支持 HIP-3
_ensure_leverage() 通过 forceIsolated=True 自动选择 is_cross=False
_parse_order_response() API 响应格式与标准 perp 相同
round_size() _coin_meta 中已包含 HIP-3 szDecimals

8. 配置层设计

8.1 src/config.py 新增常量

# ============ HIP-3 配置 ============
HIP3_ENABLED: bool = os.getenv('HIP3_ENABLED', 'false').lower() in ('true', '1', 'yes')
HIP3_MAX_DEXES: int = int(os.getenv('HIP3_MAX_DEXES', '20'))
# 说明:
# HIP3_ENABLED  - 是否启用 HIP-3 市场支持(默认 false,向后兼容)
# HIP3_MAX_DEXES - 最多监控的 HIP-3 DEX 数量(防止 DEX 过多时启动过慢)

8.2 src/trading/config.py 扩展

@dataclass
class TradingConfig:
    # ... 现有字段 ...

    # ── HIP-3 配置 ──
    hip3_enabled: bool = False
    # 说明:是否启用 HIP-3 市场交易。开启后初始化时会加载 HIP-3 DEX
    #       列表并重建 Exchange 对象以支持 HIP-3 下单。
    #       建议与 HIP3_ENABLED(监控侧)保持一致。

load_trading_config() 中对应读取:

hip3_enabled=os.getenv('HIP3_ENABLED', 'false').lower() in ('true', '1', 'yes'),

8.3 .env 配置示例

# HIP-3 功能开关(监控 + 交易均通过此变量控制)
HIP3_ENABLED=true

# 最多监控的 HIP-3 DEX 数量(避免启动过慢,建议 10-30)
HIP3_MAX_DEXES=20

9. 文件改动清单

9.1 新增/修改文件

文件 改动类型 改动摘要
database/init_timescaledb.sql 追加 Migration SQL ALTER TABLE symbol_metadata ADD market_type VARCHAR(10)
src/config.py 新增常量 HIP3_ENABLED, HIP3_MAX_DEXES
src/utils/database/timescaledb.py 参数扩展(2 处) upsert_symbol(market_type=), get_active_symbols(market_type=)
src/services/realtime_kline_service.py 方法扩展 + 新增 _get_active_symbols() 追加 HIP-3 + 新增 _load_hip3_symbols()
src/services/realtime_kline_service_base.py 多处修改 新增 _is_hip3_symbol() / _is_in_cointegrated_pairs() + _build_market_subscriptions() 过滤 + 分析 worker 跳过默认 base + _monitor_new_symbols() 扩展 + _get_all_mids_from_ws() Bug 修复
src/trading/executor.py 多处扩展 initialize() + _initialize_hip3() + _create_exchange/info() + _load_hip3_dex_names() + _sync_hip3_coin_meta() + get_positions() + _get_all_mids_from_api() + _get_all_mids_from_ws() Bug 修复
src/trading/config.py 新增字段 hip3_enabled: bool = False

9.2 无需改动的文件

文件 原因
src/trading/executor.py:_place_market_order() SDK 自动处理 HIP-3
src/trading/executor.py:_place_close_order() SDK 自动处理 HIP-3
src/trading/executor.py:_ensure_leverage() forceIsolated=True 自动生效
src/trading/strategy.py 策略引擎与市场类型无关
src/trading/models.py symbol_to_coin() 兼容 HIP-3 格式
src/trading/orchestrator.py 通过 executor 透明处理
src/trading/position_manager.py 持仓维度为 PairKey,市场类型透明
src/trading/risk_manager.py 风险控制逻辑与市场类型无关
src/utils/analysis/analysis_core.py 协整分析逻辑与 symbol 无关
scripts/import_cointegrated_pairs.py 用户手动导入 HIP-3 配对
scripts/backfill_all_data.py 用于 HIP-3 历史数据收集,现有逻辑可复用

10. 使用流程

10.1 首次上线 HIP-3

# Step 1: 执行 DB 迁移
docker exec -it crypto_timescaledb psql -U postgres -d crypto_data -c "
  ALTER TABLE symbol_metadata ADD COLUMN IF NOT EXISTS market_type VARCHAR(10) DEFAULT 'perp';
  UPDATE symbol_metadata SET market_type = 'perp' WHERE market_type IS NULL;
"

# Step 2: 配置环境变量
echo "HIP3_ENABLED=true" >> .env
echo "HIP3_MAX_DEXES=20" >> .env

# Step 3: 发现 HIP-3 DEX 并注册代币(服务启动时自动执行,也可单独测试)
python -c "
from src.services.realtime_kline_service import RealtimeKlineService
# ... 测试 _load_hip3_symbols()
"

# Step 4: 收集 HIP-3 历史 K 线(用于回测)
python -m src.scripts.backfill_all_data --market-type hip3

# Step 5: 运行回测,筛选 HIP-3 协整配对
python -m src.scripts.backtest_pairwise_correlation --base hip3

# Step 6: 导入验证通过的配对
python -m src.scripts.import_cointegrated_pairs --file hip3_pairs.csv

# Step 7: 启动服务(自动订阅 cointegrated_pairs 中的 HIP-3 WS)
HIP3_ENABLED=true python -m src.services.realtime_kline_service

10.2 日常运维

场景 操作
新 HIP-3 DEX 上线 服务每小时自动检测并注册到 DB,需要手动 backfill + 回测后重启服务才会订阅 WS
添加新 HIP-3 配对 import_cointegrated_pairs.py 导入 → 重启服务
移除 HIP-3 配对 remove_cointegrated_pairs.py 删除 → 重启服务
关闭 HIP-3 HIP3_ENABLED=false → 重启服务

11. 验证方案

11.1 DB 迁移验证

docker exec -it crypto_timescaledb psql -U postgres -d crypto_data -c "
  SELECT column_name, data_type, column_default
  FROM information_schema.columns
  WHERE table_name = 'symbol_metadata'
  AND column_name = 'market_type';
"
# 期望输出: market_type | character varying | perp

11.2 HIP-3 DEX 发现验证

python -c "
from hyperliquid.info import Info
from hyperliquid.utils.constants import MAINNET_API_URL

info = Info(MAINNET_API_URL, skip_ws=True)
dexes = info.perp_dexs()
valid = [d for d in dexes[1:] if isinstance(d, dict) and d.get('name')]
print(f'发现 {len(valid)} 个 HIP-3 DEX')
for d in valid[:5]:
    print(f'  - {d[\"name\"]} (deployer: {d.get(\"deployer\",\"\")[:10]}...)')
"

11.3 SDK HIP-3 能力验证

python -c "
from hyperliquid.info import Info
from hyperliquid.utils.constants import MAINNET_API_URL

# 取第一个 DEX 测试
info_plain = Info(MAINNET_API_URL, skip_ws=True)
dexes = info_plain.perp_dexs()
first_dex = next((d['name'] for d in dexes[1:] if isinstance(d, dict) and d.get('name')), None)

if not first_dex:
    print('无可用 HIP-3 DEX')
else:
    info_hip3 = Info(MAINNET_API_URL, skip_ws=True, perp_dexs=[first_dex])

    # 验证 coin_to_asset 包含 HIP-3 coins
    hip3_coins = {k: v for k, v in info_hip3.coin_to_asset.items() if v >= 110000}
    print(f'DEX [{first_dex}] 加载 {len(hip3_coins)} 个 HIP-3 coins:')
    for coin, asset_id in list(hip3_coins.items())[:3]:
        print(f'  {coin}: asset_id={asset_id}')

    # 验证 all_mids
    mids = info_hip3.all_mids(dex=first_dex)
    print(f'价格数据: {len(mids)} 个 coins, 示例: {dict(list(mids.items())[:2])}')
"

11.4 全量服务启动验证

HIP3_ENABLED=true python -m src.services.realtime_kline_service &
# 观察日志:
# ✓ "HIP-3 支持已启用 | DEX 数量: N"
# ✓ "HIP-3 币种发现完成: N 个"
# ✓ "WS 订阅构建完成: N 个订阅(上限 1000,当前占用 X%)"
# ✗ 不应出现 "WS 订阅数量超限" 警告

11.5 交易执行验证(testnet)

TRADING_NETWORK=testnet HIP3_ENABLED=true \
python -m src.scripts.testnet_open_close_observe
# 观察日志:
# ✓ "_coin_meta 包含 HIP-3 coins(如 xyz:XYZ100)"
# ✓ "forceIsolated=True 触发隔离保证金模式"
# ✓ "HIP-3 DEX 列表加载完成"

12. 风险与注意事项

12.1 WS 订阅配额风险

风险:若 cointegrated_pairs 中导入大量 HIP-3 配对(每个 coin × 3 周期),可能超过 1000 上限。

缓解措施

  • 服务启动时检查并日志输出订阅数量和占用比例
  • 若超过 900 个,发出告警
  • 建议在 cointegrated_pairs 导入时保持 HIP-3 配对总数 < 30 个(90 个订阅)
# _build_market_subscriptions() 中加入检查
total = len(subscriptions)
if total > 900:
    self.logger.warning(
        f"⚠️ WS 订阅数量接近上限: {total}/1000,"
        f"建议减少 HIP-3 cointegrated_pairs 数量"
    )

12.2 隔离保证金资金管理

HIP-3 仅支持隔离保证金,每个仓位的资金独立。

注意事项

  • 每次开仓会实际划拨保证金到该仓位账户
  • 最大持仓数 max_open_pairs 需考虑 HIP-3 仓位的资金占用
  • 建议 HIP-3 仓位使用较小的 base_position_usd

12.3 HIP-3 费用较高

HIP-3 费用是标准 perp 的 2 倍,会影响策略盈利能力。

建议:回测时使用实际费率(约 0.06% taker),而非标准 perp 费率(约 0.03%)。

12.4 流动性风险

HIP-3 市场流动性通常低于主网标准永续,可能导致:

  • 滑点远超预期
  • 限价单难以成交
  • 价格容易被大单影响

建议

  • 优先使用较小仓位
  • 设置更严格的 min_zscore_abs 阈值(提高入场质量)
  • 监控 impactPxs 字段评估流动性深度

12.5 DEX 生命周期风险

HIP-3 DEX 由第三方部署,存在:

  • DEX 突然关闭(需要所有市场先关闭,再等待 30 天)
  • 预言机更新延迟(标记价格可能过期)
  • 部署者参数变更

建议:通过飞书告警监控 HIP-3 仓位的异常(如流动性骤降、价格异常)。

12.6 数据完整性

HIP-3 历史数据通常较短(新市场),协整分析需要 ≥ 358 根 4H K 线(约 60 天)。系统会通过现有的 MIN_4H_DATA_POINTS 检查自动跳过数据不足的配对,无需额外处理。


文档完毕。如需更新,请同步修改对应版本号。

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG