双WebSocket架构设计6
双 WebSocket 架构设计文档(K 线主网 + 订单/用户推送随配置)
1. 目标与约束
需求
- K 线与 L2 订单簿:始终连接主网 WebSocket(
wss://api.hyperliquid.xyz/ws),与TRADING_NETWORK无关。 - orderUpdates / userFills / user:连接随
TRADING_NETWORK变化的 WebSocket(testnet 时wss://api.hyperliquid-testnet.xyz/ws,mainnet 时与主网一致),与下单/查单网络保持一致。
非目标
- 不改变现有交易 HTTP API、订单跟踪逻辑、WebSocketOrderManager 行为;仅改变「订单/用户推送」的数据来源连接。
安全默认值
TRADING_NETWORK默认值为"mainnet",防止忘记配置时静默连到测试网下单。
架构范围:本架构仅采用双 WebSocket(行情 WS + 交易 WS),不考虑单 WS 回滚。
背景:当前为单 WebSocket 连接主网;当 TRADING_NETWORK=testnet 时订单在测试网提交,但订单/用户推送仍来自主网连接,导致收不到测试网 orderUpdates/userFills,持仓/余额数据也与测试网不一致。本设计通过拆分为 Market WS(主网固定)+ Trading WS(随配置)解决该问题。
2. 现状分析
2.1 单 WebSocket 架构(当前)
src/config.py:WS_URL = "wss://api.hyperliquid.xyz/ws",写死主网。_build_subscriptions()一次性构建:candle + l2Book +(若启用交易)orderUpdates、userFills、user。_init_service_threads():只创建一个EnhancedWebSocketManager,绑定单一回调。EnhancedWebSocketManager:self.ws_url = WS_URL,无参数化。
因此:K 线、L2、订单推送共用一个主网连接。当 TRADING_NETWORK=testnet 时,订单在测试网下单,但推送仍来自主网连接,导致:
- 收不到测试网订单的 orderUpdates/userFills,只能依赖 HTTP 兜底
userchannel 的持仓/余额是主网数据,与测试网实际账户不一致
2.2 消息处理调用链
EnhancedWebSocketManager._wrapped_callback() 的调用顺序:
WebSocket 收到消息
→ _on_message() [解析 JSON]
→ _wrapped_callback()
1. health_monitor.on_message() # 健康监控
2. _cache_latest_data(msg) # 缓存数据 + 发布 EventBus 事件 ← 先
3. for cb in message_callbacks: cb(msg) # 触发外部回调 ← 后
关键结论:当外部回调(on_message / _on_trading_message)被调用时,_cache_latest_data 已经执行完毕,EventBus 事件已经发布。因此 user channel 的 PositionUpdatedEvent / OrderFilledEvent / BalanceChangedEvent 已发布,_on_trading_message 无需重复处理 user channel。
2.3 事件发布全景
EnhancedWebSocketManager 通过 _cache_latest_data() 和 _on_open() 发布以下事件到全局 EventBus:
| 触发时机 | 事件类型 | 关键字段 |
|---|---|---|
_on_open 重连时 |
WebSocketReconnectedEvent |
downtime_seconds, source |
_on_open 首连时 |
WebSocketConnectedEvent |
connection_id, source |
_on_close 断连时 |
WebSocketDisconnectedEvent |
reason, reconnect_scheduled |
user 频道 assetPositions |
PositionUpdatedEvent |
positions, account_value, margin_summary |
user 频道 fills |
OrderFilledEvent |
order_id, filled_qty, filled_price, fee |
user 频道 marginSummary |
BalanceChangedEvent |
available_balance, total_balance, margin_used |
| candle 频道 | CandleUpdatedEvent |
K 线数据 |
| l2Book 频道 | OrderBookUpdatedEvent |
订单簿数据 |
双 WS 架构下 source 字段区分来源:Market WS 发布的事件 source="market",Trading WS 发布的事件 source="trading"。
2.4 EventBus 单例
EventBus 是线程安全单例。所有 WS manager 和 executor 共享同一个 EventBus,无需"注册第二个 bus"。只需通过事件 source 字段区分来源,executor 的 _subscribe_events() 天然能收到两个 WS manager 发布的事件。
2.5 userFills 与 user.fills 职责分工
userFillschannel →_on_trading_message→WebSocketOrderManager.handle_message()→ 维护订单状态机userchannel(含fills字段)→_cache_latest_data()→_publish_user_events()→OrderFilledEvent(供 Executor 的持仓/余额缓存刷新)
两者不重叠,均从 Trading WS 来,处理路径独立。
3. 目标架构
flowchart LR
subgraph config [Config]
WS_URL["WS_URL (主网固定)"]
WS_TRADING_URL["WS_TRADING_URL (随 TRADING_NETWORK)"]
end
subgraph market [Market WS — 主网]
Mgr1["EnhancedWebSocketManager\nsource='market'"]
Mgr1 -->|订阅| Sub1[candle / l2Book]
Mgr1 -->|wss://api.hyperliquid.xyz/ws| HL_M[Hyperliquid Mainnet]
end
subgraph trading [Trading WS — 随配置]
Mgr2["EnhancedWebSocketManager\nsource='trading'"]
Mgr2 -->|订阅| Sub2[orderUpdates / userFills / user]
Mgr2 -->|WS_TRADING_URL| HL_T[Hyperliquid Testnet/Mainnet]
end
config --> Mgr1
config --> Mgr2
Mgr1 -->|on_message| Kline[K 线 + L2 处理]
Mgr1 -->|EventBus source=market| EB[全局 EventBus]
Mgr2 -->|_on_trading_message| OrderMgr[WebSocketOrderManager]
Mgr2 -->|EventBus source=trading| EB
EB -->|过滤 source=trading| Exec[Executor]
核心设计决策:
- Market WS:仅订阅 candle、l2Book;URL 固定主网;
source="market"。 - Trading WS:仅订阅 orderUpdates、userFills、user;URL 为
WS_TRADING_URL;source="trading"。 - EventBus 不改:保持全局单例,用事件
source字段区分来源;executor 的事件 handler 仅处理source="trading"的事件。
4. 数据流
4.1 行情(Market WS)
主网 WS → _wrapped_callback()
1. _cache_latest_data() → CandleUpdatedEvent(source="market")、OrderBookUpdatedEvent(source="market") → EventBus + L2 缓存更新
2. on_message() → K 线解析 → kline_buffer → 分析入队
get_global_ws_manager() 返回该 manager,L2 缓存基于主网数据,供 executor 的 get_all_mids() 等读取。
4.2 订单与用户(Trading WS)
交易网 WS → _wrapped_callback()
1. _cache_latest_data():
- orderUpdates/userFills → 无 EventBus 事件(由外部回调处理)
- user channel → _publish_user_events():
PositionUpdatedEvent(source="trading") → EventBus → Executor(过滤通过)
OrderFilledEvent(source="trading") → EventBus → Executor(过滤通过)
BalanceChangedEvent(source="trading") → EventBus → Executor(过滤通过)
2. _on_trading_message():
- user channel → 已由步骤1处理,显式跳过
- orderUpdates/userFills → WebSocketOrderManager.handle_message()
- 无管理器时 → 入队 _order_msg_buffer(由 _periodic_buffer_flush 每 2 秒清空)
4.3 重连
Trading WS 重连 → WebSocketReconnectedEvent(source="trading")
→ Executor._on_websocket_reconnected() 检查 source=="trading"
→ 触发 verify_pending_orders 双轮补查 ✅
Market WS 重连 → WebSocketReconnectedEvent(source="market")
→ Executor._on_websocket_reconnected() 检查 source≠"trading"
→ 跳过,不触发订单补查 ✅
4.4 半连接降级
| 状态 | 系统行为 |
|---|---|
| Market WS 断开 + Trading WS 正常 | K 线/L2 缓存过期,executor 仍可下单(HTTP),L2 价格不更新;飞书告警 [行情WS] 断连 |
| Market WS 正常 + Trading WS 断开 | K 线正常,订单推送回退 HTTP 兜底;飞书告警 [交易WS] 断连;持仓/余额依赖 HTTP 查询 |
| 两者都断开 | 全面降级,所有功能依赖 HTTP;两个 WS 各自独立重连 |
5. 模块级设计
| 模块 | 职责变化 |
|---|---|
| config | 新增 WS_TRADING_URL(随 TRADING_NETWORK 推导) |
| EnhancedWebSocketManager | 新增可选 ws_url、source_name 参数;source_name 用于所有事件发布 |
| RealtimeKlineServiceBase | 拆分订阅;交易 WS 消息单独回调;双 manager 生命周期管理 |
| Executor | 三个事件 handler 增加 source 过滤,仅处理 source="trading" 的事件 |
6. 文件级实现规格
行号说明:本节行号为编写时参考,实现时以方法名/逻辑块为准。
6.1 src/config.py
from src.config import WS_URL, WS_TRADING_URL # 模块顶层统一导入
# 交易推送 WebSocket 地址(随 TRADING_NETWORK 切换)
_trading_network = os.getenv("TRADING_NETWORK", "mainnet").lower()
if _trading_network not in ("mainnet", "testnet"):
logging.getLogger(__name__).warning(
"TRADING_NETWORK=%s 非法,回退为 mainnet", os.getenv("TRADING_NETWORK")
)
_trading_network = "mainnet"
WS_TRADING_URL = os.getenv("WS_TRADING_URL") or (
"wss://api.hyperliquid-testnet.xyz/ws"
if _trading_network == "testnet"
else "wss://api.hyperliquid.xyz/ws"
)
说明:
TRADING_NETWORK默认"mainnet"(安全默认值)。- 非法值打 warning 并回退 mainnet。
WS_TRADING_URL支持环境变量直接覆盖。- 行数:约 +10。
6.2 src/utils/websocket/enhanced_ws_manager.py
6.2.1 构造函数扩展
在现有参数列表末尾新增 ws_url、source_name:
def __init__(
self,
subscriptions: List[Dict],
on_state_change: Optional[Callable] = None,
timeout: int = WS_TIMEOUT,
skip_disconnects: bool = False,
alert_callback: Optional[Callable] = None,
max_retries: int | None = WS_MAX_RETRIES,
alert_threshold: int = WS_ALERT_THRESHOLD,
ws_url: Optional[str] = None, # ← 新增
source_name: str = "websocket", # ← 新增
):
...
self.ws_url = ws_url if ws_url is not None else WS_URL # ← 修改
self._source_name = source_name # ← 新增
6.2.2 事件 source 字段参数化
将所有 self._event_bus.publish(...) 中的 source="websocket" 替换为 source=self._source_name:
| 方法 | 事件类型 |
|---|---|
_on_open |
WebSocketReconnectedEvent、WebSocketConnectedEvent |
_on_close/_on_error |
WebSocketDisconnectedEvent |
_cache_latest_data |
CandleUpdatedEvent、OrderBookUpdatedEvent、PriceUpdatedEvent |
_publish_user_events |
PositionUpdatedEvent、OrderFilledEvent、BalanceChangedEvent |
共 9 处单行替换:source="websocket" → source=self._source_name。
行数:约 +4 新增,9 行修改。
6.3 src/services/realtime_kline_service_base.py
顶层导入
from src.config import WS_URL, WS_TRADING_URL
6.3.1 属性初始化(__init__)
self.ws_trading_manager: Optional[EnhancedWebSocketManager] = None
self._trading_ws_thread: Optional[threading.Thread] = None
6.3.2 订阅拆分
_build_subscriptions() 删除交易订阅逻辑,仅保留 candle + l2Book。
新增 _build_trading_subscriptions():
def _build_trading_subscriptions(self) -> List[Dict]:
"""构建交易 WS 订阅列表(orderUpdates / userFills / user)。"""
executor = getattr(self._trading_orchestrator, '_executor', None) \
if self._trading_orchestrator else None
if executor is None:
self.logger.info("交易模块未启用,跳过交易 WS 订阅构建")
return []
user_address = executor._wallet.address
return [
{"type": "orderUpdates", "user": user_address},
{"type": "userFills", "user": user_address},
{"type": "user", "user": user_address},
]
6.3.3 订阅初始化(__init__)
self.subscriptions = self._build_subscriptions()
self.trading_subscriptions = self._build_trading_subscriptions()
6.3.4 _init_service_threads()
def _init_service_threads(self):
# —— 行情 WS(主网固定,source="market")——
self.ws_manager = EnhancedWebSocketManager(
subscriptions=self.subscriptions,
on_state_change=self.on_state_change,
timeout=WS_TIMEOUT,
alert_callback=self._send_system_alert,
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
source_name="market",
)
self.ws_manager.add_message_callback(self.on_message)
_set_global_ws_manager(self.ws_manager)
# —— 交易 WS(随配置,source="trading")——
if self.trading_subscriptions:
self.ws_trading_manager = EnhancedWebSocketManager(
subscriptions=self.trading_subscriptions,
on_state_change=self.on_state_change,
timeout=WS_TIMEOUT,
alert_callback=lambda t, c: self._send_system_alert(f"[交易WS] {t}", c),
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
ws_url=WS_TRADING_URL,
source_name="trading",
)
self.ws_trading_manager.add_message_callback(self._on_trading_message)
self.logger.info(
f"🌐 WS 连接配置 | 行情: {WS_URL} (主网固定)"
f" | 交易: {WS_TRADING_URL} (TRADING_NETWORK={os.getenv('TRADING_NETWORK','mainnet')})"
)
else:
self.logger.info("交易订阅为空(TRADING_ENABLED=false),仅启动行情 WS")
6.3.5 交易 WS 消息回调 _on_trading_message
def _on_trading_message(self, msg: Dict):
"""交易 WS 消息回调。
调用此方法时 _cache_latest_data() 已执行完毕:
- user channel 的 EventBus 事件已发布,此处直接跳过。
- orderUpdates/userFills 路由给 WebSocketOrderManager 或入队缓冲区。
- 缓冲区由 _periodic_buffer_flush 每 2 秒统一清空。
"""
channel = msg.get("channel") or msg.get("type", "")
if channel == "user":
return # 已由 _cache_latest_data → _publish_user_events 处理
if channel in ("orderUpdates", "userFills"):
mgr = self._get_ws_order_manager()
if mgr is not None:
try:
mgr.handle_message(msg)
except Exception as e:
self.logger.error(f"处理交易WS订单消息失败: {e}", exc_info=True)
else:
self._order_msg_buffer.append(msg)
6.3.6 行情 on_message 精简
从现有 on_message() 中删除:
- 缓冲区回放逻辑(已移至
_periodic_buffer_flush) - orderUpdates/userFills 路由逻辑(订单消息仅由
_on_trading_message处理)
def on_message(self, msg: Dict):
if self._message_dedup.is_duplicate(msg):
return
kline = self._parse_kline(msg)
if not kline:
return
self.kline_buffer.put_nowait(kline)
6.3.7 生命周期管理
start():
def start(self):
... # 原有工作线程启动(含 _buffer_flush_thread)
if self.ws_trading_manager is not None:
self._trading_ws_thread = threading.Thread(
target=self.ws_trading_manager.start,
daemon=True,
name="trading-ws",
)
self._trading_ws_thread.start()
self.ws_manager.start() # 阻塞主线程
stop()(顺序:先停 orchestrator → 交易 WS → 行情 WS):
def stop(self):
if self._trading_orchestrator:
self._trading_orchestrator.stop()
if self.ws_trading_manager is not None:
self.ws_trading_manager.stop()
if self._trading_ws_thread is not None and self._trading_ws_thread.is_alive():
self._trading_ws_thread.join(timeout=5)
if self._trading_ws_thread.is_alive():
self.logger.warning("交易 WS 线程在 5s 内未退出")
self.ws_manager.stop()
...
6.3.8 动态订阅(新币种)
add_subscriptions 仅对 self.ws_manager 调用(只加 candle/l2Book),无需改动。
行数变化:约 +55~70 新增,-20~25 删除,净增约 30~45 行。
6.4 src/trading/executor.py
def _on_position_updated(self, event: PositionUpdatedEvent):
if event.source != "trading":
return
# ... 原有逻辑不变 ...
def _on_balance_changed(self, event: BalanceChangedEvent):
if event.source != "trading":
return
# ... 原有逻辑不变 ...
def _on_websocket_reconnected(self, event: WebSocketReconnectedEvent):
if event.source != "trading":
return
# ... 原有逻辑不变 ...
行数:约 +3(每个 handler 加 1 行判断)。
7. 初始化时序
__init__
├── _init_trading_module() → TradingOrchestrator + Executor 初始化
│ └── Executor._subscribe_events() → 在全局 EventBus 上订阅 3 类事件
├── self.ws_trading_manager = None → 防御性初始化
├── self._trading_ws_thread = None → 防御性初始化
├── _build_subscriptions() → 行情订阅 (candle + l2Book)
├── _build_trading_subscriptions() → 交易订阅(交易启用时非空)
└── _init_service_threads()
├── ws_manager (行情, source="market")
└── ws_trading_manager (交易, source="trading") ← 有交易订阅时创建
start()
├── 工作线程启动(含 _buffer_flush_thread)
├── ws_trading_manager.start() ← daemon 线程(若存在,先启动)
└── ws_manager.start() ← 阻塞主线程
stop()
├── _trading_orchestrator.stop()
├── ws_trading_manager.stop() ← 先停交易 WS
├── _trading_ws_thread.join(timeout=5)
├── ws_manager.stop() ← 再停行情 WS
└── 等待队列清空
关键保证:
_init_trading_module()在_init_service_threads()之前执行,创建ws_trading_manager时_executor已存在。ws_trading_manager和_trading_ws_thread总在__init__中初始化为None,无需hasattr检查。_buffer_flush_thread服务于 service 级别的self._order_msg_buffer,与哪个 WS 收到消息无关,无需复制。
8. 边界与注意事项
8.1 get_global_ws_manager
始终返回行情 WS(主网),L2 缓存与 executor 的 get_all_mids() 等仍基于主网数据。
8.2 消息去重
on_message 中的 MessageDeduplicator 仅作用于行情消息。_on_trading_message 无需去重:交易消息量远小于行情,且 WebSocketOrderManager 内部有 oid 级去重。
8.3 并发安全
self._order_msg_buffer是deque(maxlen=500),CPython 中deque.append()和deque.popleft()是原子操作,多线程安全。ws_trading_manager.stop()后_trading_ws_thread.join(timeout=5)确保线程退出后再停行情 WS。
8.4 连接失败
Trading WS 连接失败行为与现有 EnhancedWebSocketManager 一致(重试 + 飞书告警 [交易WS]),不阻塞行情 WS,系统以降级模式运行(订单依赖 HTTP 兜底)。
8.5 start() 阻塞
EnhancedWebSocketManager.start() 是阻塞调用,因此交易 WS 必须在独立 daemon 线程启动,行情 WS 保持阻塞主线程(与当前行为一致)。
8.6 mainnet=mainnet 场景
TRADING_NETWORK=mainnet 时,WS_TRADING_URL = WS_URL,两个 manager 建立两条独立连接,功能正常,逻辑分离清晰。
8.7 事件 source 过滤约定
凡依赖「交易/账户数据」的事件消费者,均须只处理 source="trading" 的事件,包括:PositionUpdatedEvent、BalanceChangedEvent、WebSocketReconnectedEvent、OrderFilledEvent、WebSocketConnectedEvent/WebSocketDisconnectedEvent(用于交易 WS 状态时)。新增订阅方时须遵守此约定。
9. 监控指标
| 指标 | 来源 | 告警条件 |
|---|---|---|
market_ws_state |
Market WS on_state_change |
连接断开 >30s |
trading_ws_state |
Trading WS on_state_change(带 [交易WS] 前缀区分日志) |
连接断开 >10s |
order_msg_buffer_size |
_periodic_buffer_flush 每 2 秒采样 len(_order_msg_buffer) |
>100 条 |
trading_ws_thread_alive |
stop() 中的 join 结果 |
join 超时 |
启动时打印综合日志(在 _init_service_threads() 末尾):
🌐 WS 连接配置
行情 WS: wss://api.hyperliquid.xyz/ws (主网固定)
交易 WS: wss://api.hyperliquid-testnet.xyz/ws (TRADING_NETWORK=testnet)
10. 测试验证清单
功能测试
| # | 场景 | 预期 |
|---|---|---|
| 1 | TRADING_NETWORK=testnet |
行情 WS 连主网,交易 WS 连测试网,日志确认两个 URL |
| 2 | 测试网下单后 | 能通过交易 WS 收到 orderUpdates/userFills |
| 3 | 交易 WS 重连 | executor 触发 verify_pending_orders 双轮补查 |
| 4 | 行情 WS 重连 | executor 不触发订单补查 |
| 5 | TRADING_ENABLED=false |
仅一个行情 WS,无交易 WS,无报错 |
| 6 | TRADING_NETWORK=mainnet |
两个 WS 连同一地址,功能正常(逻辑分离) |
| 7 | 交易 WS 断开期间下单 | HTTP 兜底正常,重连后推送恢复 |
单元测试
| # | 测试点 | 验证方式 |
|---|---|---|
| T1 | source="market" 的 WebSocketReconnectedEvent |
Executor verify_pending_orders 不触发 |
| T2 | source="trading" 的 PositionUpdatedEvent |
Executor 缓存正确更新 |
| T3 | source="market" 的 PositionUpdatedEvent |
Executor 缓存不更新 |
| T4 | _on_trading_message 收到 user channel |
不调用 WebSocketOrderManager,无副作用 |
| T5 | 缓冲区回放 | 管理器就绪后 2 秒内缓冲区应被清空 |
压力/并发测试
| # | 场景 | 预期 |
|---|---|---|
| P1 | 两个 WS 同时高频推送消息 | 无消息串扰,行情/交易各自独立处理 |
| P2 | _periodic_buffer_flush 消费 _order_msg_buffer |
deque.popleft 原子性保证每条消息只被处理一次(业务层另有 oid 去重) |
| P3 | stop() 时交易 WS 仍有未处理消息 |
join(timeout=5) 后消息被合理丢弃或记录 |
11. 改动量汇总
| 文件 | 新增 | 修改/删除 | 合计(约) |
|---|---|---|---|
| src/config.py | +10 | 0 | 10 |
| src/utils/websocket/enhanced_ws_manager.py | +4 | ~12(9 处 source 替换 + 3 处签名) | ~16 |
| src/services/realtime_kline_service_base.py | +55~70 | -20~25(删除旧订单路由) | 30~45 |
| src/trading/executor.py | +3 | 0 | 3 |
| 合计 | — | — | 约 59~74 |
12. 依赖与兼容性
-
不新增第三方依赖。
-
Hyperliquid 测试网 WS 地址:
wss://api.hyperliquid-testnet.xyz/ws(官方文档)。 -
Event 基类
source: str字段已存在(src/events/base.py),无需修改事件定义。 -
.env.example新增:# 交易网络配置(mainnet/testnet) # 影响下单/查单 HTTP API 和订单推送 WS 地址 TRADING_NETWORK=mainnet # 手动覆盖交易推送 WS 地址(可选,默认由 TRADING_NETWORK 推导) # WS_TRADING_URL=wss://api.hyperliquid-testnet.xyz/ws