双 WebSocket 架构设计文档(K 线主网 + 订单/用户推送随配置)
双 WebSocket 架构设计文档(K 线主网 + 订单/用户推送随配置)
1. 目标与约束
需求
- K 线与 L2 订单簿:始终连接主网 WebSocket(
wss://api.hyperliquid.xyz/ws),与TRADING_NETWORK无关。 - orderUpdates / userFills / user:连接随
TRADING_NETWORK变化的 WebSocket(testnet 时wss://api.hyperliquid-testnet.xyz/ws,mainnet 时与主网一致),与下单/查单网络一致。
非目标
- 不改变现有交易 HTTP API、订单跟踪逻辑、WebSocketOrderManager 行为;仅改变「订单/用户推送」的数据来源连接。
2. 现状简述
当前为单 WebSocket:
src/config.py第 104 行:WS_URL = "wss://api.hyperliquid.xyz/ws",写死主网。src/services/realtime_kline_service_base.py的_build_subscriptions()(401~458 行)一次性构建:candle + l2Book +(若启用交易)orderUpdates、userFills、user。src/services/realtime_kline_service_base.py第 306~314 行:只创建一个EnhancedWebSocketManager(self.subscriptions, ...),并add_message_callback(self.on_message)。src/utils/websocket/enhanced_ws_manager.py第 274 行:self.ws_url = WS_URL,无参数化。
因此:K 线、L2、订单推送共用一个主网连接。当 TRADING_NETWORK=testnet 时,订单在测试网下单,但推送仍来自主网连接,导致收不到测试网订单的 orderUpdates/userFills,只能依赖 HTTP 兜底。
Executor 侧:src/trading/executor.py 第 86 行自建 EventBus(),第 1708~1712 行订阅 PositionUpdatedEvent、BalanceChangedEvent、WebSocketReconnectedEvent。目前没有任何地方将「全局 WS 管理器」的 _event_bus 与 executor 的订阅绑定(两条总线独立),因此持仓/余额的 WS 驱动更新在当前实现下可能未生效;本次设计让「交易 WS」的 event_bus 显式注册到 executor,保证订单与用户推送来自同一网络。
3. 目标架构
flowchart LR
subgraph config [Config]
WS_URL[WS_URL mainnet]
WS_TRADING_URL[WS_TRADING_URL from TRADING_NETWORK]
end
subgraph market [Market WS - Mainnet]
Mgr1[EnhancedWebSocketManager]
Mgr1 -->|subscribe| Sub1[candle l2Book]
Mgr1 -->|wss://api.hyperliquid.xyz/ws| HL_M[Hyperliquid Mainnet]
end
subgraph trading [Trading WS - Config]
Mgr2[EnhancedWebSocketManager]
Mgr2 -->|subscribe| Sub2[orderUpdates userFills user]
Mgr2 -->|WS_TRADING_URL| HL_T[Hyperliquid Testnet/Mainnet]
end
config --> Mgr1
config --> Mgr2
Mgr1 -->|on_message| Kline[K-line + L2 处理]
Mgr2 -->|_on_trading_message| OrderMgr[WebSocketOrderManager]
Mgr2 -->|_event_bus| Exec[Executor 持仓/余额/重连]
- Market WS:仅订阅 candle、l2Book;URL 固定主网;消息由现有
on_message处理(K 线写入、分析入队、L2 缓存)。 - Trading WS:仅订阅 orderUpdates、userFills、user;URL 为
WS_TRADING_URL;消息经_on_trading_message转交 WebSocketOrderManager;user在 EnhancedWebSocketManager 内部通过_cache_latest_data->_publish_user_events发布到该连接自己的_event_bus,Executor 订阅该 bus。
4. 数据流
行情(主网)
- 主网 WS 收到 candle / l2Book ->
on_message-> 解析 K 线 / 更新 L2 缓存 / 入队分析;get_global_ws_manager()仍返回该 manager,供 executor 的 L2 价格等使用。
订单与用户(随配置)
- 交易网 WS 收到 orderUpdates/userFills ->
_on_trading_message-> buffer 回放 +WebSocketOrderManager.handle_message(msg)。 - 交易网 WS 收到 user -> 在 EnhancedWebSocketManager 的
_wrapped_callback中经_cache_latest_data发布 PositionUpdatedEvent/BalanceChangedEvent 到该 manager 的_event_bus;Executor 通过register_trading_ws_event_bus(ws_trading_manager._event_bus)订阅,从而更新持仓/余额缓存并响应重连补查。
重连
- 仅「交易 WS」重连时发布
WebSocketReconnectedEvent,Executor 的_on_websocket_reconnected触发,执行verify_pending_orders双轮补查。行情 WS 重连不触发订单补查。
5. 模块级设计
| 模块 | 职责 |
|---|---|
| config | 保留 WS_URL 为主网;新增 WS_TRADING_URL,由 TRADING_NETWORK 推导。 |
| EnhancedWebSocketManager | 支持可选 ws_url 构造参数;不传则用 WS_URL(保持向后兼容)。 |
| RealtimeKlineServiceBase | 拆分为「行情订阅」与「交易订阅」;创建并持有两个 manager;交易 WS 消息单独回调;start/stop 双连接;将交易 WS 的 event_bus 注册到 executor。 |
| Executor | 新增 register_trading_ws_event_bus(event_bus),在该 bus 上订阅 PositionUpdated、BalanceChanged、WebSocketReconnected。 |
6. 文件级实现规格
6.1 src/config.py
- 位置:WebSocket 配置块(约 100~105 行之后)。
- 改动:
- 保持
WS_URL = "wss://api.hyperliquid.xyz/ws"不变。 - 新增:读取
TRADING_NETWORK(默认"testnet"),据此设置WS_TRADING_URL:testnet->"wss://api.hyperliquid-testnet.xyz/ws"- 否则 ->
"wss://api.hyperliquid.xyz/ws"
- 可选:支持通过环境变量
WS_TRADING_URL显式覆盖(便于调试或特殊部署)。
- 保持
- 行数:约 +5~7。
6.2 src/utils/websocket/enhanced_ws_manager.py
- 位置:
__init__签名与self.ws_url赋值(229~238 行、274 行)。 - 改动:
- 在
__init__中增加可选参数:ws_url: Optional[str] = None。 - 将
self.ws_url = WS_URL改为:self.ws_url = ws_url if ws_url is not None else WS_URL。 - 文档字符串中说明:不传
ws_url时使用 config 中的主网WS_URL。
- 在
- 行数:约 2~3 行修改。
6.3 src/services/realtime_kline_service_base.py
6.3.1 订阅拆分
_build_subscriptions()(401~458 行)- 仅保留 candle + l2Book 的构建逻辑;删除第 443~457 行中「若启用交易则 extend orderUpdates、userFills、user」的整块逻辑。
- 返回值仅包含行情订阅。
- 新增
_build_trading_subscriptions() -> List[Dict]- 条件与当前一致:存在
_trading_orchestrator且其_executor可用。 - 成功时返回
[{"type": "orderUpdates", "user": user_address}, {"type": "userFills", "user": user_address}, {"type": "user", "user": user_address}];否则返回[]。 - 打日志说明「已添加交易 WebSocket 订阅(随 TRADING_NETWORK)」。
- 条件与当前一致:存在
6.3.2 初始化与双 manager 创建
- 订阅与属性(约 206~208 行)
self.subscriptions = self._build_subscriptions()保持不变(仅行情)。- 新增:
self.trading_subscriptions = self._build_trading_subscriptions();若需在日志中区分,可打印行情订阅数与交易订阅数。
_init_service_threads()(277~325 行)- 创建行情 WS:
self.ws_manager = EnhancedWebSocketManager(subscriptions=self.subscriptions, ...),不传ws_url;add_message_callback(self.on_message)。保持_global_ws_manager = self.ws_manager,行为与现有一致。 - 若
self.trading_subscriptions非空:- 创建
self.ws_trading_manager = EnhancedWebSocketManager(subscriptions=self.trading_subscriptions, ..., ws_url=WS_TRADING_URL)(需从 config 导入WS_TRADING_URL)。 add_message_callback(self._on_trading_message)。- 调用
self._trading_orchestrator._executor.register_trading_ws_event_bus(self.ws_trading_manager._event_bus),使持仓/余额/重连事件来自交易网。
- 创建
- 若
self.trading_subscriptions为空,不创建ws_trading_manager,不调用register_trading_ws_event_bus。
- 创建行情 WS:
6.3.3 交易 WS 消息回调
- 新增
_on_trading_message(self, msg: Dict)- 复用现有订单消息处理逻辑:先做与
on_message中一致的「订单消息 buffer 回放」(若有未消费的缓冲则依次mgr.handle_message(buffered))。 channel = msg.get("channel") or msg.get("type", "");若channel in ("orderUpdates", "userFills"),则调用_get_ws_order_manager()得到 mgr,非空则mgr.handle_message(msg),否则将 msg 放入_order_msg_buffer(与现有 639~660 行逻辑一致)。- 不在此方法中处理
channel == "user":由 EnhancedWebSocketManager 内部_cache_latest_data->_publish_user_events发布到该连接自己的_event_bus,Executor 已订阅该 bus。
- 复用现有订单消息处理逻辑:先做与
6.3.4 生命周期
- start()(约 2176 行)
- 先
self.ws_manager.start();若存在self.ws_trading_manager,再self.ws_trading_manager.start()。
- 先
- stop()(约 2205~2206 行)
- 若存在
self.ws_trading_manager,先self.ws_trading_manager.stop(),再self.ws_manager.stop();否则仅self.ws_manager.stop()。
- 若存在
6.3.5 动态订阅(新币种)
src/services/realtime_kline_service_base.py第 2068~2072 行:add_subscriptions仅对self.ws_manager调用(只加 candle/l2Book),不涉及交易 WS。无需改逻辑。
6.3.6 导入
- 在文件顶部或使用处增加对
WS_TRADING_URL的导入(与现有WS_TIMEOUT、WS_ALERT_THRESHOLD等同一来源)。
行数:约 +55~85(含新增方法与修改)。
6.4 src/trading/executor.py
- 位置:在事件订阅相关区域(如 1704 行附近)新增方法。
- 改动:
- 新增方法:
register_trading_ws_event_bus(self, event_bus),内部对该event_bus执行与_subscribe_events相同的三类订阅:PositionUpdatedEvent ->_on_position_updated,BalanceChangedEvent ->_on_balance_changed,WebSocketReconnectedEvent ->_on_websocket_reconnected。 - 可选:在方法内打一条 debug 日志,标明「已注册交易 WS 的 event_bus」。
- 新增方法:
- 说明:保留现有
_subscribe_events()对self._event_bus的订阅无妨(当前无其他发布源);若希望语义更清晰,可仅在存在「已注册的 trading event_bus」时依赖该 bus,不在本设计内强制修改现有订阅逻辑。 - 行数:约 +8~12。
7. 边界与注意事项
- 启动顺序:交易模块(含 executor)在
_init_trading_module中先于_init_service_threads初始化,因此创建ws_trading_manager时_trading_orchestrator._executor已存在;仅在trading_subscriptions非空时创建交易 WS 并注册 event_bus,避免未启用交易时访问 executor。 - get_global_ws_manager:始终返回行情 WS(主网),L2 缓存与 executor 的
get_all_mids()等仍基于主网数据,符合「K 线/L2 始终主网」。 - 去重:订单消息去重若依赖
MessageDeduplicator,需确认其作用域;若当前按连接维度去重,则两个连接各自去重即可;若全局去重且 key 含 channel,则 orderUpdates/userFills 来自不同连接,一般不会误判重复。 - 飞书/告警:两个 manager 可共用同一
alert_callback(如_send_system_alert);若需区分告警来源,可在创建 trading manager 时传入不同 callback 或带 source 的包装。 - 测试:建议验证 (1) TRADING_NETWORK=testnet 时仅交易 WS 连测试网、行情 WS 连主网;(2) 测试网下单后能收到 orderUpdates/userFills;(3) 交易 WS 重连后 executor 执行订单补查;(4) TRADING_ENABLED=false 时仅一个行情 WS,无交易 WS。
8. 改动量汇总
| 文件 | 新增 | 修改/删除 | 合计(约) |
|---|---|---|---|
| src/config.py | 5~7 | 0 | 5~7 |
| src/utils/websocket/enhanced_ws_manager.py | 0 | 2~3 | 2~3 |
| src/services/realtime_kline_service_base.py | 55~75 | 15~25 | 70~95 |
| src/trading/executor.py | 8~12 | 0 | 8~12 |
| 合计 | — | — | 约 85~117 |
9. 依赖与兼容性
- 不新增第三方依赖。
- Hyperliquid 测试网 WS 地址采用公开文档:
wss://api.hyperliquid-testnet.xyz/ws。 .env.example可选:在 WebSocket/交易相关说明中增加一句「订单与用户推送 WebSocket 随 TRADING_NETWORK 切换;K 线始终主网」。