HIP-3集成设计文档
HIP-3 集成设计文档
版本: v1.0
日期: 2026-03-03
适用项目: Trading-in-websocket(全币种协整配对量化交易系统)
目录
1. 背景与目标
1.1 现状
当前系统(RealtimeKlineService)监控 Hyperliquid 主网标准永续合约(约 220 个币种),以 BTC/USDC:USDC 为基准,通过协整分析筛选配对并执行 Adaptive Bollinger Z-Score 策略。
1.2 目标
将 HIP-3(Builder-Deployed Perpetuals) 市场纳入现有系统:
| 维度 | 目标 |
|---|---|
| 监控 | 动态发现所有 HIP-3 DEX 及其代币,注册到 DB,为 backfill 和回测提供数据源 |
| 实时分析 | 对 cointegrated_pairs 表中已登记的 HIP-3 配对,实时计算 Z-score 并告警 |
| 交易 | 对 HIP-3 币种信号自动下单(隔离保证金),复用现有策略引擎 |
1.3 用户决策
| 决策项 | 选择 |
|---|---|
| HIP-3 币种来源 | 动态发现所有 DEX(perpDexs API) |
| 配对分析策略 | 仅 cointegrated_pairs 表中已登记的配对 |
| 交易 SDK | Hyperliquid 官方 Python SDK(原生支持 HIP-3) |
| 服务架构 | 扩展现有全币种服务,不新建独立进程 |
2. HIP-3 技术概述
2.1 什么是 HIP-3
HIP-3(Hyperliquid Improvement Proposal 3)是 Builder-Deployed Perpetuals 协议,允许第三方开发者在 Hyperliquid 基础设施上部署独立的永续期货 DEX。
核心特性:
| 特性 | 说明 |
|---|---|
| 质押要求 | 部署者需质押 500,000 HYPE |
| 保证金模式 | 仅支持隔离保证金(isolated-margin only),不支持跨仓 |
| 费用结构 | 标准 perp 的 2 倍,部署者获得所有费用的 50% |
| 资产命名 | 前缀格式:dex_name:TOKEN,如 xyz:XYZ100 |
| 独立性 | 每个 DEX 有独立资金、订单簿、预言机和参数设置 |
2.2 资产 ID 计算
标准永续: asset_id = universe 中的顺序 index(0 起)
HIP-3: asset_id = 110000 + dex_order * 10000 + local_index
dex_order:perpDexsAPI 响应中 DEX 的顺序(从 0 开始,API 返回数组[null, dex0, dex1, ...])local_index:该 DEX 的universe数组中代币的顺序
由 SDK 自动处理,应用层无需手动计算。
2.3 Symbol 格式规范
| 层级 | 格式 | 示例 |
|---|---|---|
| Hyperliquid API(coin) | dex:TOKEN |
xyz:XYZ100 |
| 项目内部 symbol | dex:TOKEN/USDC:USDC |
xyz:XYZ100/USDC:USDC |
| WS 订阅 coin | dex:TOKEN |
xyz:XYZ100 |
| DB 存储 symbol | dex:TOKEN/USDC:USDC |
xyz:XYZ100/USDC:USDC |
兼容性:现有 symbol_to_coin("xyz:XYZ100/USDC:USDC") 返回 "xyz:XYZ100",无需修改。
2.4 SDK 原生支持
Hyperliquid Python SDK 已原生支持 HIP-3,关键接口:
# Info 初始化(带 HIP-3 DEX 列表)
info = Info(base_url, skip_ws=True, perp_dexs=["xyz", "abc"])
# 内部自动调用 metaAndAssetCtxs?dex=xxx 填充 coin_to_asset 映射
# Exchange 初始化(带 HIP-3 DEX 列表)
exchange = Exchange(wallet, base_url, perp_dexs=["xyz", "abc"])
# 下单(HIP-3 coin 格式与标准 perp 完全一致)
exchange.market_open("xyz:XYZ100", is_buy=True, sz=1.0, slippage=0.01)
exchange.market_close("xyz:XYZ100", sz=1.0, slippage=0.01)
# 价格查询(带 dex 参数)
info.all_mids(dex="xyz") # 返回该 DEX 所有 coin 的中间价
# 持仓查询(带 dex 参数)
info.user_state(address, dex="xyz") # 返回该 DEX 的用户状态
# DEX 发现
info.perp_dexs()
# → [null, {"name": "xyz", "deployer": "0x...", ...}, ...]
2.5 WebSocket 订阅
HIP-3 的 WS K 线订阅格式与标准 perp 完全相同:
{
"method": "subscribe",
"subscription": {
"type": "candle",
"coin": "xyz:XYZ100",
"interval": "5m"
}
}
重要限制:每个 IP 最多 1,000 个 WebSocket 订阅。
3. 关键约束与设计决策
3.1 WebSocket 订阅上限(最关键约束)
现状分析:
标准 perp: ~220 个币种 × 3 周期(5m/1h/4h) = 660 个 candle 订阅
+ 220 个 l2Book 订阅
= 约 880 个订阅
剩余可用: 1000 - 880 = 120 个订阅
若对所有 HIP-3 币种全量订阅(假设 100 个 HIP-3 coins × 3 周期 = 300 个),将严重超限。
设计决策:分层订阅策略
HIP-3 币种 WS 订阅:仅订阅 cointegrated_pairs 表中已登记的 HIP-3 配对
HIP-3 历史数据收集:通过 backfill 脚本(REST API)收集,不占用 WS 配额
这样只有经过回测验证、已导入配对表的 HIP-3 币种才会占用 WS 订阅资源。
3.2 仅支持隔离保证金
官方文档明确说明:HIP-3 市场当前仅支持隔离保证金,跨仓计划未来支持。
设计决策:在 _sync_hip3_coin_meta() 中将所有 HIP-3 coin 的 forceIsolated 设为 True。
现有的 _ensure_leverage() 方法检测到 forceIsolated=True 会自动使用 is_cross=False,
无需修改 _ensure_leverage() 代码。
3.3 持仓查询差异
标准 perp 持仓通过 user_state(address) 获取;HIP-3 持仓需要 user_state(address, dex="xyz") 按 DEX 单独查询。
设计决策:get_positions() 在现有标准查询基础上,遍历 _hip3_dex_names 追加查询并合并结果。
3.4 L2Book WS 缓存 Key 解析 Bug
现有 _get_all_mids_from_ws() 中:
coin = key.split(":")[0] # 期望拆分 "BTC:l2Book" → "BTC"
HIP-3 coin 名含 :,如 "xyz:XYZ100:l2Book" 被错误解析为 "xyz"。
设计决策:修复为:
coin = key[:-len(":l2Book")] # "xyz:XYZ100:l2Book" → "xyz:XYZ100" ✓
3.5 分析任务中的 Base Symbol 处理
现有逻辑对所有 symbol 都追加 self.base_symbol(BTC)作为兜底分析基准。
HIP-3 coins 只应与 cointegrated_pairs 表中登记的基准配对,不应自动追加 BTC。
设计决策:分析 worker 中检测到 HIP-3 symbol 时跳过默认 base 追加。
4. 整体架构设计
4.1 数据流图
┌─────────────────────────────────────────────────────────────────────┐
│ 【历史数据收集层】(离线) │
│ │
│ perpDexs API → 发现所有 HIP-3 DEX │
│ ↓ │
│ metaAndAssetCtxs → 获取各 DEX 的 universe(代币列表) │
│ ↓ │
│ symbol_metadata 表 ← upsert (market_type='hip3') │
│ ↓ │
│ backfill_all_data.py → REST API 拉取历史 K 线 → klines 表 │
│ ↓ │
│ 回测脚本(backtest_pairwise_correlation.py 等) │
│ ↓ │
│ import_cointegrated_pairs.py → cointegrated_pairs 表 │
└─────────────────────────────────────────────────────────────────────┘
↓(cointegrated_pairs 驱动)
┌─────────────────────────────────────────────────────────────────────┐
│ 【实时监控层】(在线) │
│ │
│ Hyperliquid WebSocket API │
│ ├── 标准 perp candle:所有活跃币种 │
│ └── HIP-3 candle:仅 cointegrated_pairs 中的 HIP-3 coins │
│ ↓ │
│ EnhancedWebSocketManager(假活检测 + 自动重连) │
│ ↓ │
│ 分析 Worker Pool │
│ ├── 标准 perp:ALT vs BTC(默认)+ cointegrated_pairs │
│ └── HIP-3:仅 cointegrated_pairs 中的配对(跳过 BTC 默认) │
│ ↓ │
│ AdaptiveBollingerStrategy(Z-score 信号) │
│ ↓ │
│ TradingOrchestrator → HyperliquidExecutor │
│ └── market_open("xyz:XYZ100", ...) → SDK 自动处理 HIP-3 │
└─────────────────────────────────────────────────────────────────────┘
4.2 HIP-3 判断辅助函数
# src/services/realtime_kline_service_base.py(模块级函数)
def _is_hip3_symbol(symbol: str) -> bool:
"""判断是否 HIP-3 市场(coin 部分含 ':')
Examples:
_is_hip3_symbol("xyz:XYZ100/USDC:USDC") → True
_is_hip3_symbol("BTC/USDC:USDC") → False
_is_hip3_symbol("PURR/USDC:USDC") → False
"""
coin = symbol.split('/')[0]
return ':' in coin
5. 数据层设计
5.1 DB Schema 迁移
文件:database/init_timescaledb.sql(末尾追加 Migration 节)
-- =====================================================
-- Migration: 新增 HIP-3 市场类型标识
-- 版本: v1.0 (2026-03-03)
-- =====================================================
-- 新增 market_type 列区分标准永续和 HIP-3 市场
ALTER TABLE symbol_metadata
ADD COLUMN IF NOT EXISTS market_type VARCHAR(10) DEFAULT 'perp';
-- 添加注释
COMMENT ON COLUMN symbol_metadata.market_type IS
'市场类型: perp=标准永续合约, hip3=HIP-3 Builder部署的永续DEX';
-- 为已有数据设置默认值(确保数据一致性)
UPDATE symbol_metadata SET market_type = 'perp' WHERE market_type IS NULL;
-- 可选索引(按 market_type 过滤时使用)
CREATE INDEX IF NOT EXISTS idx_symbol_metadata_market_type
ON symbol_metadata (market_type);
\echo '✅ Migration: symbol_metadata.market_type 列已添加'
5.2 SymbolMetadataRepository 修改
文件:src/utils/database/timescaledb.py
upsert_symbol() 参数扩展
def upsert_symbol(
self,
symbol: str,
base_asset: str,
quote_asset: str,
listing_time: datetime | None = None,
is_active: bool = True,
market_type: str = 'perp', # ← 新增参数
) -> bool:
"""
注册或更新币种元数据
Args:
symbol: 币种符号,如 "BTC/USDC:USDC" 或 "xyz:XYZ100/USDC:USDC"
base_asset: 基础资产,如 "BTC" 或 "xyz:XYZ100"
quote_asset: 计价资产,通常为 "USDC"
listing_time: 上线时间
is_active: 是否活跃
market_type: 市场类型,'perp' 或 'hip3' # ← 新增说明
"""
try:
with self.client.get_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO symbol_metadata
(symbol, base_asset, quote_asset, listing_time,
is_active, market_type, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (symbol)
DO UPDATE SET
base_asset = EXCLUDED.base_asset,
quote_asset = EXCLUDED.quote_asset,
listing_time = COALESCE(EXCLUDED.listing_time,
symbol_metadata.listing_time),
is_active = EXCLUDED.is_active,
market_type = EXCLUDED.market_type,
updated_at = NOW();
""",
(symbol, base_asset, quote_asset,
listing_time, is_active, market_type)
)
return True
except Exception as e:
logger.error(f"币种元数据更新失败: {e}")
return False
get_active_symbols() 参数扩展
def get_active_symbols(
self,
market_type: str | None = None, # ← 新增参数
) -> list[str]:
"""
获取所有活跃币种列表
Args:
market_type: 若指定则过滤市场类型 ('perp' 或 'hip3'),None 返回全部
Returns:
活跃币种符号列表,如 ["BTC/USDC:USDC", "xyz:XYZ100/USDC:USDC", ...]
"""
if market_type:
results = self.client.execute_query(
"""
SELECT symbol FROM symbol_metadata
WHERE is_active = TRUE AND market_type = %s
ORDER BY symbol;
""",
(market_type,)
)
else:
results = self.client.execute_query(
"""
SELECT symbol FROM symbol_metadata
WHERE is_active = TRUE
ORDER BY symbol;
"""
)
return [r['symbol'] for r in results] if results else []
6. 服务层设计
6.1 RealtimeKlineService 修改
文件:src/services/realtime_kline_service.py
_get_active_symbols() 扩展
def _get_active_symbols(self) -> List[str]:
"""
获取活跃币种列表(标准 perp + HIP-3)
Returns:
所有活跃币种列表,格式如 ["BTC/USDC:USDC", "xyz:XYZ100/USDC:USDC", ...]
"""
from src.config import HIP3_ENABLED
# ── 1. 标准 perp 逻辑(现有代码不变)──
symbols = []
try:
active_symbols = self.symbol_repo.get_active_symbols(market_type='perp')
if active_symbols:
self.logger.info(f"从数据库加载 {len(active_symbols)} 个活跃标准 perp 币种")
symbols = active_symbols
else:
# 从交易所获取(首次运行)
self.logger.info("数据库无标准 perp 数据,从交易所获取...")
meta = retry_call(
lambda: Info(constants.MAINNET_API_URL, skip_ws=True).meta(),
description="通用版-交易所元数据",
)
for asset_info in meta.get('universe', []):
name = asset_info.get('name')
if name:
symbol = f"{name}/USDC:USDC"
symbols.append(symbol)
self.symbol_repo.upsert_symbol(
symbol=symbol,
base_asset=name,
quote_asset='USDC',
is_active=True,
market_type='perp', # ← 明确标记为 perp
)
except Exception as e:
self.logger.error(f"获取标准 perp 币种列表失败: {e}", exc_info=True)
symbols = ['BTC/USDC:USDC', 'ETH/USDC:USDC']
# ── 2. HIP-3 发现(新增)──
if HIP3_ENABLED:
try:
hip3_symbols = self._load_hip3_symbols()
self.logger.info(f"HIP-3 币种发现完成: {len(hip3_symbols)} 个")
# 注意:这里仅注册到 DB,不全部加入 self.symbols 用于 WS 订阅
# WS 订阅过滤在 _build_market_subscriptions() 中控制
symbols.extend(hip3_symbols)
except Exception as e:
self.logger.error(f"HIP-3 币种发现失败(不影响标准 perp 运行): {e}")
return symbols
新增 _load_hip3_symbols() 方法
def _load_hip3_symbols(self) -> List[str]:
"""
发现所有 HIP-3 DEX 的代币并注册到数据库
流程:
1. 调用 perpDexs API 获取所有 DEX 列表
2. 对每个 DEX 调用 metaAndAssetCtxs 获取代币列表
3. 构造 symbol 格式并 upsert 到 symbol_metadata(market_type='hip3')
4. 返回所有 HIP-3 symbol 列表
Returns:
HIP-3 symbol 列表,格式如 ["xyz:XYZ100/USDC:USDC", ...]
注意:
返回的列表仅用于 DB 注册;WS 订阅过滤由 _build_market_subscriptions()
按 cointegrated_pairs 表进一步筛选,以避免超过 1000 WS 订阅上限。
"""
from src.config import HIP3_MAX_DEXES
from hyperliquid.info import Info
import hyperliquid.utils.constants as constants
hip3_symbols = []
# 获取所有 DEX 列表
info = retry_call(
lambda: Info(constants.MAINNET_API_URL, skip_ws=True, timeout=15),
description="HIP-3-Info初始化",
)
all_dexes_raw = retry_call(
lambda: info.perp_dexs(),
description="perpDexs列表",
)
# all_dexes_raw[0] = null,从 index 1 开始
valid_dexes = [
d for d in all_dexes_raw[1:]
if isinstance(d, dict) and d.get('name')
][:HIP3_MAX_DEXES]
self.logger.info(f"发现 {len(valid_dexes)} 个 HIP-3 DEX(限制 HIP3_MAX_DEXES={HIP3_MAX_DEXES})")
for dex_info in valid_dexes:
dex_name = dex_info['name']
try:
# 获取该 DEX 的代币列表
meta = retry_call(
lambda d=dex_name: info.meta(dex=d),
description=f"HIP-3-{dex_name}-meta",
max_attempts=3,
)
universe = meta.get('universe', [])
for asset_info in universe:
coin_name = asset_info.get('name')
if not coin_name:
continue
# HIP-3 coin 格式:dex_name:TOKEN(如 xyz:XYZ100)
# 但 API 返回的可能已经是 dex:TOKEN 格式
symbol = f"{coin_name}/USDC:USDC"
hip3_symbols.append(symbol)
# 注册到数据库
self.symbol_repo.upsert_symbol(
symbol=symbol,
base_asset=coin_name,
quote_asset='USDC',
is_active=True,
market_type='hip3',
)
self.logger.info(
f"HIP-3 DEX [{dex_name}]: {len(universe)} 个代币已注册"
)
except Exception as e:
self.logger.warning(
f"HIP-3 DEX [{dex_name}] 代币加载失败(跳过): {e}"
)
return hip3_symbols
6.2 RealtimeKlineServiceBase 修改
文件:src/services/realtime_kline_service_base.py
新增模块级辅助函数
# 在文件顶部(import 区域之后,class 定义之前)添加
def _is_hip3_symbol(symbol: str) -> bool:
"""判断是否 HIP-3 市场(coin 部分含 ':')
HIP-3 coin 格式:dex_name:TOKEN,如 xyz:XYZ100
项目内部 symbol 格式:xyz:XYZ100/USDC:USDC
Args:
symbol: 完整 symbol 字符串,如 "xyz:XYZ100/USDC:USDC"
Returns:
True 表示 HIP-3 市场,False 表示标准永续
Examples:
>>> _is_hip3_symbol("xyz:XYZ100/USDC:USDC")
True
>>> _is_hip3_symbol("BTC/USDC:USDC")
False
>>> _is_hip3_symbol("PURR/USDC:USDC")
False
"""
coin = symbol.split('/')[0]
return ':' in coin
_build_market_subscriptions() 扩展
现有方法对所有 symbol 无差别订阅。HIP-3 需要按 cointegrated_pairs 过滤。
def _build_market_subscriptions(self) -> List[Dict]:
"""构建行情 WS 订阅列表(candle + l2Book)
订阅策略:
- 标准 perp:所有活跃币种(现有逻辑)
- HIP-3:仅 cointegrated_pairs 中已登记的 HIP-3 币种
原因:WS 每 IP 上限 1000 个订阅,需避免超限
"""
subscriptions = []
for symbol in self._get_all_symbols():
coin = symbol.split('/')[0]
# HIP-3 coins:只订阅已在 cointegrated_pairs 中的
if _is_hip3_symbol(symbol):
if not self._is_in_cointegrated_pairs(symbol):
self.logger.debug(
f"跳过 HIP-3 WS 订阅(未在 cointegrated_pairs 中): {symbol}"
)
continue
for interval in ['5m', '1h', '4h']:
subscriptions.append({
"type": "candle",
"coin": coin,
"interval": interval,
})
subscriptions.append({"type": "l2Book", "coin": coin})
self.logger.info(
f"WS 订阅构建完成: {len(subscriptions)} 个订阅 "
f"(上限 1000,当前占用 {len(subscriptions)/1000*100:.1f}%)"
)
return subscriptions
def _is_in_cointegrated_pairs(self, symbol: str) -> bool:
"""检查 symbol 是否在 cointegrated_pairs 内存缓存中
_pair_cache 格式:{alt_symbol: [base_symbol, ...]}
"""
with self._pair_cache_lock:
return symbol in self._pair_cache
分析 Worker 循环修改
位置:约 line 950,在 _analysis_worker() 方法内
# 原有代码(对所有 symbol 追加默认 base):
# if symbol != self.base_symbol and self.base_symbol not in base_symbols:
# base_symbols.append(self.base_symbol)
# 修改后:HIP-3 coins 不追加默认 base,严格依赖 cointegrated_pairs
if symbol != self.base_symbol and self.base_symbol not in base_symbols:
if not _is_hip3_symbol(symbol):
# 标准 perp:追加默认 base(BTC),确保即使 cointegrated_pairs
# 缓存未更新也能检测新产生的协整关系
base_symbols.append(self.base_symbol)
# HIP-3:不追加默认 base,只分析已登记的配对
# 原因:HIP-3 和 BTC 的协整关系需要先经过回测验证
# 当 base_symbols 为空(HIP-3 且 cointegrated_pairs 无记录),跳过分析
if not base_symbols:
with self.recent_analysis_lock:
self.recent_analysis[task_key] = current_time
self.analysis_queue.task_done()
continue
_monitor_new_symbols() 扩展
def _monitor_new_symbols(self):
"""新币种监控线程
每小时执行一次:
1. 检测新上线的标准 perp(现有逻辑)
2. 检测新上线的 HIP-3 DEX 和代币(新增)
"""
from src.config import HIP3_ENABLED
while not self.stop_event.is_set():
try:
# ── 现有:标准 perp 新币检测 ──
# ... (现有代码不变)...
# ── 新增:HIP-3 新币检测 ──
if HIP3_ENABLED:
self._check_new_hip3_symbols()
except Exception as e:
self.logger.error(f"新币监控异常: {e}", exc_info=True)
self.stop_event.wait(3600) # 每小时检查一次
def _check_new_hip3_symbols(self):
"""检测新上线的 HIP-3 DEX 和代币,更新 DB 注册
注意:新发现的 HIP-3 coins 不自动订阅 WS,需要用户:
1. 用 backfill 脚本收历史数据
2. 回测验证 → import_cointegrated_pairs.py 导入配对
3. 重启服务后才会订阅 WS
"""
try:
new_hip3 = self._load_hip3_symbols() # 复用已有方法
if new_hip3:
with self.symbols_lock:
current_set = set(self.symbols)
truly_new = [s for s in new_hip3 if s not in current_set]
if truly_new:
self.symbols.extend(truly_new)
self.logger.info(
f"发现 {len(truly_new)} 个新 HIP-3 代币,已注册到 DB: "
f"{truly_new[:3]}{'...' if len(truly_new) > 3 else ''}"
)
except Exception as e:
self.logger.warning(f"HIP-3 新币检测失败: {e}")
_get_all_mids_from_ws() Bug 修复
当前 Bug:key.split(":")[0] 对 "xyz:XYZ100:l2Book" 返回 "xyz" 而非 "xyz:XYZ100"。
def _get_all_mids_from_ws(self) -> dict[str, float]:
"""从 WebSocket L2Book 缓存读取中间价
修复:HIP-3 coin 名含 ':',需用 rstrip 而非 split 提取 coin 名
"""
result = {}
L2BOOK_SUFFIX = ":l2Book"
for key, msg in self._market_ws_manager.latest_data.items():
if not key.endswith(L2BOOK_SUFFIX):
continue
# 修复:去掉后缀得到完整 coin 名
# "BTC:l2Book" → "BTC" (标准 perp)✓
# "xyz:XYZ100:l2Book" → "xyz:XYZ100" (HIP-3)✓
coin = key[:-len(L2BOOK_SUFFIX)]
try:
mid = self._calculate_mid_from_l2_data(msg)
if mid and mid > 0:
result[coin] = mid
except Exception:
pass
return result
7. 交易执行层设计
7.1 HyperliquidExecutor 修改
文件:src/trading/executor.py
__init__ 新增属性
class HyperliquidExecutor:
def __init__(self, config: TradingConfig):
# ... 现有属性 ...
# HIP-3 相关状态
self._hip3_dex_names: list[str] = [] # 已加载的 HIP-3 DEX 名称列表
initialize() 扩展
def initialize(self) -> bool:
"""初始化 SDK 连接(支持 HIP-3)"""
try:
# ... 现有初始化逻辑(钱包加载、Exchange/Info 创建)...
# 现有:缓存标准 perp 币种元数据
self._load_coin_meta()
# ── 新增:HIP-3 支持初始化 ──
if self._config.hip3_enabled:
self._initialize_hip3(base_url)
# ... 后续现有逻辑不变 ...
return True
except Exception as e:
logger.error(f"交易执行器初始化失败: {e}", exc_info=True)
return False
def _initialize_hip3(self, base_url: str):
"""初始化 HIP-3 支持
1. 加载 DEX 名称列表
2. 重建支持 perp_dexs 的 Exchange 和 Info
3. 同步 HIP-3 coin 元数据到 _coin_meta
"""
hip3_dex_names = self._load_hip3_dex_names()
if not hip3_dex_names:
logger.warning("未发现任何 HIP-3 DEX,HIP-3 交易将不可用")
return
self._hip3_dex_names = hip3_dex_names
# 重建 Exchange 和 Info(携带 perp_dexs 参数)
# SDK 会自动调用 metaAndAssetCtxs?dex=xxx 填充 coin_to_asset
self._exchange = self._create_exchange(
self._wallet, base_url, perp_dexs=hip3_dex_names
)
self._info = self._create_info(base_url, perp_dexs=hip3_dex_names)
# 将 HIP-3 coin 的 szDecimals 同步到 _coin_meta
self._sync_hip3_coin_meta()
logger.info(
f"HIP-3 交易支持已启用 | "
f"DEX 数量: {len(hip3_dex_names)} | "
f"DEX 列表: {hip3_dex_names[:5]}{'...' if len(hip3_dex_names) > 5 else ''}"
)
_create_exchange() / _create_info() 签名扩展
@staticmethod
@api_retry("Exchange初始化", max_attempts=3, min_wait=2, max_wait=10,
retryable=is_retryable_api_error)
def _create_exchange(
wallet,
base_url: str,
perp_dexs: list[str] | None = None, # ← 新增参数
) -> Exchange:
"""带重试的 Exchange 构造,支持 HIP-3 perp_dexs 参数"""
return Exchange(wallet, base_url, perp_dexs=perp_dexs or None)
@staticmethod
@api_retry("Info初始化", max_attempts=4, min_wait=5, max_wait=30,
retryable=is_retryable_api_error)
def _create_info(
base_url: str,
perp_dexs: list[str] | None = None, # ← 新增参数
) -> Info:
"""带重试的 Info 构造,支持 HIP-3 perp_dexs 参数"""
return Info(base_url, skip_ws=True, timeout=10,
perp_dexs=perp_dexs or None)
新增 _load_hip3_dex_names()
def _load_hip3_dex_names(self) -> list[str]:
"""获取所有 HIP-3 DEX 名称列表
调用 perpDexs API,跳过第一个 null 元素,返回有效 DEX 名称。
Returns:
DEX 名称列表,如 ["xyz", "abc", ...]
"""
try:
all_dexes = retry_call(
lambda: self._info.perp_dexs(),
description="HIP-3 DEX 列表加载",
max_attempts=3,
retryable=is_retryable_readonly,
)
# all_dexes[0] 固定为 null,从 index 1 开始
dex_names = [
d['name'] for d in all_dexes[1:]
if isinstance(d, dict) and d.get('name')
]
logger.info(f"HIP-3 DEX 发现: {len(dex_names)} 个 → {dex_names[:5]}")
return dex_names
except Exception as e:
logger.warning(f"HIP-3 DEX 列表加载失败: {e}")
return []
新增 _sync_hip3_coin_meta()
def _sync_hip3_coin_meta(self):
"""将 HIP-3 coins 的元数据同步到 _coin_meta
SDK 初始化时已通过 perp_dexs 参数填充了 coin_to_asset 映射,
这里将 szDecimals 等信息同步到项目自己的 _coin_meta 缓存。
关键设置:HIP-3 coins 的 forceIsolated=True(仅支持隔离保证金)
"""
synced = 0
for coin, asset in self._exchange.info.coin_to_asset.items():
# HIP-3 资产 ID >= 110000
if asset >= 110_000 and coin not in self._coin_meta:
sz_dec = self._exchange.info.asset_to_sz_decimals.get(asset, 8)
self._coin_meta[coin] = {
"szDecimals": sz_dec,
"maxLeverage": 50, # 默认最高杠杆
"forceIsolated": True, # HIP-3 仅支持隔离保证金(官方文档)
}
synced += 1
logger.info(
f"HIP-3 coin 元数据同步完成: {synced} 个新增 | "
f"示例: {list(k for k in self._coin_meta if ':' in k)[:3]}"
)
get_positions() 扩展
def get_positions(self, force_refresh: bool = False) -> list[dict]:
"""获取所有持仓(标准 perp + HIP-3)
HIP-3 持仓需要按 DEX 单独查询(官方 API 限制:每次只能查一个 DEX)
"""
# ── 现有标准 perp 逻辑(不变)──
positions = <现有实现>
# ── 新增:HIP-3 持仓(按 DEX 遍历)──
for dex in self._hip3_dex_names:
try:
state = retry_call(
lambda d=dex: self._info.user_state(
self._wallet.address, dex=d
),
description=f"HIP-3持仓查询({dex})",
retryable=is_retryable_readonly,
)
hip3_asset_positions = state.get("assetPositions", [])
hip3_positions = [
p.get("position", p)
for p in hip3_asset_positions
if p
]
if hip3_positions:
logger.debug(
f"HIP-3 持仓 ({dex}): {len(hip3_positions)} 个"
)
positions.extend(hip3_positions)
except Exception as e:
logger.warning(f"HIP-3 持仓查询失败 (dex={dex}): {e}")
return positions
_get_all_mids_from_api() 扩展
def _get_all_mids_from_api(self) -> dict[str, float]:
"""获取所有 coin 的中间价(标准 perp + HIP-3)
HIP-3 mids 需要按 DEX 单独查询(allMids?dex=xxx)
"""
# ── 标准 perp mids(现有)──
raw = retry_call(
lambda: self._info.all_mids(),
description="全市场价格查询",
retryable=is_retryable_readonly,
)
result = {k: float(v) for k, v in raw.items()}
# ── HIP-3 mids(新增)──
for dex in self._hip3_dex_names:
try:
hip3_raw = retry_call(
lambda d=dex: self._info.all_mids(dex=d),
description=f"HIP-3价格查询({dex})",
retryable=is_retryable_readonly,
)
result.update({k: float(v) for k, v in hip3_raw.items()})
except Exception as e:
logger.debug(f"HIP-3 all_mids 查询失败 (dex={dex}): {e}")
return result
7.2 无需修改的方法
| 方法 | 原因 |
|---|---|
_place_market_order() |
SDK market_open("xyz:XYZ100", ...) 已原生支持 HIP-3 |
_place_close_order() |
SDK market_close("xyz:XYZ100", ...) 已原生支持 HIP-3 |
_ensure_leverage() |
通过 forceIsolated=True 自动选择 is_cross=False |
_parse_order_response() |
API 响应格式与标准 perp 相同 |
round_size() |
_coin_meta 中已包含 HIP-3 szDecimals |
8. 配置层设计
8.1 src/config.py 新增常量
# ============ HIP-3 配置 ============
HIP3_ENABLED: bool = os.getenv('HIP3_ENABLED', 'false').lower() in ('true', '1', 'yes')
HIP3_MAX_DEXES: int = int(os.getenv('HIP3_MAX_DEXES', '20'))
# 说明:
# HIP3_ENABLED - 是否启用 HIP-3 市场支持(默认 false,向后兼容)
# HIP3_MAX_DEXES - 最多监控的 HIP-3 DEX 数量(防止 DEX 过多时启动过慢)
8.2 src/trading/config.py 扩展
@dataclass
class TradingConfig:
# ... 现有字段 ...
# ── HIP-3 配置 ──
hip3_enabled: bool = False
# 说明:是否启用 HIP-3 市场交易。开启后初始化时会加载 HIP-3 DEX
# 列表并重建 Exchange 对象以支持 HIP-3 下单。
# 建议与 HIP3_ENABLED(监控侧)保持一致。
load_trading_config() 中对应读取:
hip3_enabled=os.getenv('HIP3_ENABLED', 'false').lower() in ('true', '1', 'yes'),
8.3 .env 配置示例
# HIP-3 功能开关(监控 + 交易均通过此变量控制)
HIP3_ENABLED=true
# 最多监控的 HIP-3 DEX 数量(避免启动过慢,建议 10-30)
HIP3_MAX_DEXES=20
9. 文件改动清单
9.1 新增/修改文件
| 文件 | 改动类型 | 改动摘要 |
|---|---|---|
database/init_timescaledb.sql |
追加 Migration SQL | ALTER TABLE symbol_metadata ADD market_type VARCHAR(10) |
src/config.py |
新增常量 | HIP3_ENABLED, HIP3_MAX_DEXES |
src/utils/database/timescaledb.py |
参数扩展(2 处) | upsert_symbol(market_type=), get_active_symbols(market_type=) |
src/services/realtime_kline_service.py |
方法扩展 + 新增 | _get_active_symbols() 追加 HIP-3 + 新增 _load_hip3_symbols() |
src/services/realtime_kline_service_base.py |
多处修改 | 新增 _is_hip3_symbol() / _is_in_cointegrated_pairs() + _build_market_subscriptions() 过滤 + 分析 worker 跳过默认 base + _monitor_new_symbols() 扩展 + _get_all_mids_from_ws() Bug 修复 |
src/trading/executor.py |
多处扩展 | initialize() + _initialize_hip3() + _create_exchange/info() + _load_hip3_dex_names() + _sync_hip3_coin_meta() + get_positions() + _get_all_mids_from_api() + _get_all_mids_from_ws() Bug 修复 |
src/trading/config.py |
新增字段 | hip3_enabled: bool = False |
9.2 无需改动的文件
| 文件 | 原因 |
|---|---|
src/trading/executor.py:_place_market_order() |
SDK 自动处理 HIP-3 |
src/trading/executor.py:_place_close_order() |
SDK 自动处理 HIP-3 |
src/trading/executor.py:_ensure_leverage() |
forceIsolated=True 自动生效 |
src/trading/strategy.py |
策略引擎与市场类型无关 |
src/trading/models.py |
symbol_to_coin() 兼容 HIP-3 格式 |
src/trading/orchestrator.py |
通过 executor 透明处理 |
src/trading/position_manager.py |
持仓维度为 PairKey,市场类型透明 |
src/trading/risk_manager.py |
风险控制逻辑与市场类型无关 |
src/utils/analysis/analysis_core.py |
协整分析逻辑与 symbol 无关 |
scripts/import_cointegrated_pairs.py |
用户手动导入 HIP-3 配对 |
scripts/backfill_all_data.py |
用于 HIP-3 历史数据收集,现有逻辑可复用 |
10. 使用流程
10.1 首次上线 HIP-3
# Step 1: 执行 DB 迁移
docker exec -it crypto_timescaledb psql -U postgres -d crypto_data -c "
ALTER TABLE symbol_metadata ADD COLUMN IF NOT EXISTS market_type VARCHAR(10) DEFAULT 'perp';
UPDATE symbol_metadata SET market_type = 'perp' WHERE market_type IS NULL;
"
# Step 2: 配置环境变量
echo "HIP3_ENABLED=true" >> .env
echo "HIP3_MAX_DEXES=20" >> .env
# Step 3: 发现 HIP-3 DEX 并注册代币(服务启动时自动执行,也可单独测试)
python -c "
from src.services.realtime_kline_service import RealtimeKlineService
# ... 测试 _load_hip3_symbols()
"
# Step 4: 收集 HIP-3 历史 K 线(用于回测)
python -m src.scripts.backfill_all_data --market-type hip3
# Step 5: 运行回测,筛选 HIP-3 协整配对
python -m src.scripts.backtest_pairwise_correlation --base hip3
# Step 6: 导入验证通过的配对
python -m src.scripts.import_cointegrated_pairs --file hip3_pairs.csv
# Step 7: 启动服务(自动订阅 cointegrated_pairs 中的 HIP-3 WS)
HIP3_ENABLED=true python -m src.services.realtime_kline_service
10.2 日常运维
| 场景 | 操作 |
|---|---|
| 新 HIP-3 DEX 上线 | 服务每小时自动检测并注册到 DB,需要手动 backfill + 回测后重启服务才会订阅 WS |
| 添加新 HIP-3 配对 | import_cointegrated_pairs.py 导入 → 重启服务 |
| 移除 HIP-3 配对 | remove_cointegrated_pairs.py 删除 → 重启服务 |
| 关闭 HIP-3 | HIP3_ENABLED=false → 重启服务 |
11. 验证方案
11.1 DB 迁移验证
docker exec -it crypto_timescaledb psql -U postgres -d crypto_data -c "
SELECT column_name, data_type, column_default
FROM information_schema.columns
WHERE table_name = 'symbol_metadata'
AND column_name = 'market_type';
"
# 期望输出: market_type | character varying | perp
11.2 HIP-3 DEX 发现验证
python -c "
from hyperliquid.info import Info
from hyperliquid.utils.constants import MAINNET_API_URL
info = Info(MAINNET_API_URL, skip_ws=True)
dexes = info.perp_dexs()
valid = [d for d in dexes[1:] if isinstance(d, dict) and d.get('name')]
print(f'发现 {len(valid)} 个 HIP-3 DEX')
for d in valid[:5]:
print(f' - {d[\"name\"]} (deployer: {d.get(\"deployer\",\"\")[:10]}...)')
"
11.3 SDK HIP-3 能力验证
python -c "
from hyperliquid.info import Info
from hyperliquid.utils.constants import MAINNET_API_URL
# 取第一个 DEX 测试
info_plain = Info(MAINNET_API_URL, skip_ws=True)
dexes = info_plain.perp_dexs()
first_dex = next((d['name'] for d in dexes[1:] if isinstance(d, dict) and d.get('name')), None)
if not first_dex:
print('无可用 HIP-3 DEX')
else:
info_hip3 = Info(MAINNET_API_URL, skip_ws=True, perp_dexs=[first_dex])
# 验证 coin_to_asset 包含 HIP-3 coins
hip3_coins = {k: v for k, v in info_hip3.coin_to_asset.items() if v >= 110000}
print(f'DEX [{first_dex}] 加载 {len(hip3_coins)} 个 HIP-3 coins:')
for coin, asset_id in list(hip3_coins.items())[:3]:
print(f' {coin}: asset_id={asset_id}')
# 验证 all_mids
mids = info_hip3.all_mids(dex=first_dex)
print(f'价格数据: {len(mids)} 个 coins, 示例: {dict(list(mids.items())[:2])}')
"
11.4 全量服务启动验证
HIP3_ENABLED=true python -m src.services.realtime_kline_service &
# 观察日志:
# ✓ "HIP-3 支持已启用 | DEX 数量: N"
# ✓ "HIP-3 币种发现完成: N 个"
# ✓ "WS 订阅构建完成: N 个订阅(上限 1000,当前占用 X%)"
# ✗ 不应出现 "WS 订阅数量超限" 警告
11.5 交易执行验证(testnet)
TRADING_NETWORK=testnet HIP3_ENABLED=true \
python -m src.scripts.testnet_open_close_observe
# 观察日志:
# ✓ "_coin_meta 包含 HIP-3 coins(如 xyz:XYZ100)"
# ✓ "forceIsolated=True 触发隔离保证金模式"
# ✓ "HIP-3 DEX 列表加载完成"
12. 风险与注意事项
12.1 WS 订阅配额风险
风险:若 cointegrated_pairs 中导入大量 HIP-3 配对(每个 coin × 3 周期),可能超过 1000 上限。
缓解措施:
- 服务启动时检查并日志输出订阅数量和占用比例
- 若超过 900 个,发出告警
- 建议在
cointegrated_pairs导入时保持 HIP-3 配对总数 < 30 个(90 个订阅)
# _build_market_subscriptions() 中加入检查
total = len(subscriptions)
if total > 900:
self.logger.warning(
f"⚠️ WS 订阅数量接近上限: {total}/1000,"
f"建议减少 HIP-3 cointegrated_pairs 数量"
)
12.2 隔离保证金资金管理
HIP-3 仅支持隔离保证金,每个仓位的资金独立。
注意事项:
- 每次开仓会实际划拨保证金到该仓位账户
- 最大持仓数
max_open_pairs需考虑 HIP-3 仓位的资金占用 - 建议 HIP-3 仓位使用较小的
base_position_usd
12.3 HIP-3 费用较高
HIP-3 费用是标准 perp 的 2 倍,会影响策略盈利能力。
建议:回测时使用实际费率(约 0.06% taker),而非标准 perp 费率(约 0.03%)。
12.4 流动性风险
HIP-3 市场流动性通常低于主网标准永续,可能导致:
- 滑点远超预期
- 限价单难以成交
- 价格容易被大单影响
建议:
- 优先使用较小仓位
- 设置更严格的
min_zscore_abs阈值(提高入场质量) - 监控
impactPxs字段评估流动性深度
12.5 DEX 生命周期风险
HIP-3 DEX 由第三方部署,存在:
- DEX 突然关闭(需要所有市场先关闭,再等待 30 天)
- 预言机更新延迟(标记价格可能过期)
- 部署者参数变更
建议:通过飞书告警监控 HIP-3 仓位的异常(如流动性骤降、价格异常)。
12.6 数据完整性
HIP-3 历史数据通常较短(新市场),协整分析需要 ≥ 358 根 4H K 线(约 60 天)。系统会通过现有的 MIN_4H_DATA_POINTS 检查自动跳过数据不足的配对,无需额外处理。
文档完毕。如需更新,请同步修改对应版本号。