双WebSocket架构设计10
双 WebSocket 架构设计文档(K 线主网 + 订单/用户推送随配置)
1. 目标与约束
背景:原单 WebSocket 连接主网;当 TRADING_NETWORK=testnet 时订单在测试网提交,但推送仍来自主网,导致收不到测试网 orderUpdates/userFills,持仓/余额数据与测试网不一致。本设计拆分为 Market WS(主网固定)+ Trading WS(随配置)解决该问题。
需求
- K 线与 L2 订单簿:始终连接主网 WebSocket(
wss://api.hyperliquid.xyz/ws),与TRADING_NETWORK无关。 - orderUpdates / userFills / user:连接随
TRADING_NETWORK变化的 WebSocket(testnet 时wss://api.hyperliquid-testnet.xyz/ws,mainnet 时与主网一致)。
非目标:不改变现有交易 HTTP API、订单状态机与 WebSocketOrderManager 的对外行为。
实现原则(必守)
- 按重写执行:相关方法按下文「契约与删除清单」重写,不以补丁方式实现;实现后单 WS 与订单同网的任何分支、注释零残留。
- 不保留兼容路径:不保留「若未传
ws_url则用全局 URL」「若未传source_name则用websocket」的默认值;两处创建 EnhancedWebSocketManager 时必须显式传入ws_url与source_name。 - 事件 source 合法取值仅两种:
"market"、"trading"。实现后全局禁止出现source="websocket"。
安全默认值:TRADING_NETWORK 默认值为 "mainnet",防止忘记配置时静默连到测试网下单。
架构范围:仅采用双 WebSocket(行情 WS + 交易 WS),不考虑单 WS 回滚。
2. 目标架构
以下为唯一实现形态;实现后不存在单 WS 分支、不存在按配置切换单/双 WS 的逻辑。
flowchart LR
subgraph config [Config]
WS_MARKET_URL["WS_MARKET_URL (主网固定)"]
WS_TRADING_URL["WS_TRADING_URL (随 TRADING_NETWORK)"]
end
subgraph market [Market WS — 主网]
Mgr1["EnhancedWebSocketManager\nsource='market'"]
Mgr1 -->|订阅| Sub1[candle / l2Book]
Mgr1 -->|WS_MARKET_URL| HL_M[Hyperliquid Mainnet]
end
subgraph trading [Trading WS — 随配置]
Mgr2["EnhancedWebSocketManager\nsource='trading'"]
Mgr2 -->|订阅| Sub2[orderUpdates / userFills / user]
Mgr2 -->|WS_TRADING_URL| HL_T[Hyperliquid Testnet/Mainnet]
end
config --> Mgr1
config --> Mgr2
Mgr1 -->|on_message| Kline[K 线 + L2 处理]
Mgr1 -->|EventBus source=market| EB[全局 EventBus]
Mgr2 -->|_cache_latest_data 发布 无 message 回调| EB
EB -->|OrderStatusEvent/OrderFilledEvent| OrderMgr[WebSocketOrderManager]
EB -->|过滤 source=trading| Exec[Executor]
核心设计决策:
- Market WS:仅订阅 candle、l2Book;URL 固定主网(
WS_MARKET_URL);source="market"。 - Trading WS:仅订阅 orderUpdates、userFills、user;URL 为
WS_TRADING_URL;source="trading"。 - 订单/用户数据流:仅通过 EventBus。orderUpdates/userFills 在 Trading WS 的
_cache_latest_data内发布 OrderStatusEvent/OrderFilledEvent;WebSocketOrderManager 仅通过订阅上述事件接收,不存在「原始 WS 消息 → handle_message」的调用。 - EventBus:保持全局单例;executor 的 handler 仅处理
source="trading"的事件。
3. 数据流
3.1 行情(Market WS)
主网 WS → _wrapped_callback()
1. _cache_latest_data() → CandleUpdatedEvent(source="market")、OrderBookUpdatedEvent(source="market") → EventBus + L2 缓存更新
2. on_message() → K 线解析 → kline_buffer → 分析入队
get_market_ws_manager() 返回该 manager,L2 缓存基于主网数据,供 executor 的 get_all_mids() 等读取。
3.2 订单与用户(Trading WS)
交易网 WS → _wrapped_callback()
→ _cache_latest_data():
- orderUpdates → _publish_order_status_events() → OrderStatusEvent(source="trading") → EventBus → WebSocketOrderManager 订阅
- userFills → _publish_fill_events() → OrderFilledEvent(source="trading") → EventBus → WebSocketOrderManager 订阅
- user → _publish_user_events() → PositionUpdatedEvent / BalanceChangedEvent(source="trading") → EventBus → Executor 过滤后处理
Trading WS 不注册任何 message 回调(不调用 add_message_callback),仅依赖 _cache_latest_data 发布事件。若初始化顺序保证 Executor/WebSocketOrderManager 在交易 WS 收包前已订阅 EventBus,无需缓冲与回放逻辑。
3.3 事件发布全景
| 触发时机 | 事件类型 | source |
|---|---|---|
_on_open 首连时 |
WebSocketConnectedEvent |
market / trading |
_on_open 重连时 |
WebSocketReconnectedEvent |
market / trading |
_on_close 断连时 |
WebSocketDisconnectedEvent |
market / trading |
user 频道 assetPositions |
PositionUpdatedEvent |
trading |
user 频道 fills |
OrderFilledEvent |
trading |
user 频道 marginSummary |
BalanceChangedEvent |
trading |
| orderUpdates 频道 | OrderStatusEvent |
trading |
| userFills 频道 | OrderFilledEvent |
trading |
| candle 频道 | CandleUpdatedEvent |
market |
| l2Book 频道 | OrderBookUpdatedEvent |
market |
3.4 重连
Trading WS 重连 → WebSocketReconnectedEvent(source="trading")
→ Executor._on_websocket_reconnected() 仅当 source=="trading" 时执行 → 触发 verify_pending_orders ✅
Market WS 重连 → WebSocketReconnectedEvent(source="market")
→ Executor._on_websocket_reconnected() 因 source≠"trading" 直接 return,不触发订单补查 ✅
3.5 半连接降级
| 状态 | 系统行为 |
|---|---|
| Market WS 断开 + Trading WS 正常 | K 线/L2 缓存过期,executor 仍可下单(HTTP),L2 价格不更新;飞书告警 [行情WS] |
| Market WS 正常 + Trading WS 断开 | K 线正常,订单推送回退 HTTP 兜底;飞书告警 [交易WS];持仓/余额依赖 HTTP |
| 两者都断开 | 全面降级,所有功能依赖 HTTP;两个 WS 各自独立重连 |
4. 模块级设计
| 模块 | 职责变化 |
|---|---|
| config | 提供 WS_MARKET_URL(主网固定)、WS_TRADING_URL(随 TRADING_NETWORK);删除 WS_URL。 |
| EnhancedWebSocketManager | 构造函数必选参数 ws_url: str、source_name: str(无默认值);所有事件发布使用 source=self._source_name,合法取值仅 market / trading。 |
| TradingOrchestrator | 新增 get_trading_ws_user_address(self) -> Optional[str]:有 executor 时返回钱包地址,否则返回 None。 |
| RealtimeKlineServiceBase | 订阅拆分为行情/交易两套;on_message 仅处理 K 线;Trading WS 不注册 message 回调;「是否启用交易 WS」与 user 地址仅通过 TradingOrchestrator.get_trading_ws_user_address() 获取,不探测 _executor/_wallet;双 manager 生命周期管理。 |
| Executor | 三个事件 handler 入口增加 source=="trading" 过滤,否则直接 return。 |
| get_market_ws_manager | 原 get_global_ws_manager 重命名,语义为「行情 WS(主网)」,禁止用于交易逻辑。 |
5. 文件级实现规格
实现时以方法名与下文契约/删除清单为准;行号仅作参考,若与当前代码不一致则以契约为准。
5.1 src/config.py
import os
import logging
# ---------- 行情 WebSocket(主网固定) ----------
WS_MARKET_URL = "wss://api.hyperliquid.xyz/ws"
# ---------- 交易 WebSocket(随 TRADING_NETWORK) ----------
_trading_network = os.getenv("TRADING_NETWORK", "mainnet").lower()
if _trading_network not in ("mainnet", "testnet"):
logging.getLogger(__name__).warning(
"TRADING_NETWORK=%s 非法,回退为 mainnet", os.getenv("TRADING_NETWORK")
)
_trading_network = "mainnet"
WS_TRADING_URL = os.getenv("WS_TRADING_URL") or (
"wss://api.hyperliquid-testnet.xyz/ws"
if _trading_network == "testnet"
else "wss://api.hyperliquid.xyz/ws"
)
删除清单:删除 WS_URL;所有对 WS_URL 的引用改为 WS_MARKET_URL 并删除原变量。
5.2 src/utils/websocket/enhanced_ws_manager.py
契约:ws_url 与 source_name 为必选参数,无默认值;事件 source 仅使用 self._source_name,合法取值仅 "market"、"trading";实现后全局禁止出现 source="websocket"。
5.2.1 构造函数
def __init__(
self,
subscriptions: List[Dict],
on_state_change: Optional[Callable] = None,
timeout: int = WS_TIMEOUT,
skip_disconnects: bool = False,
alert_callback: Optional[Callable] = None,
max_retries: int | None = WS_MAX_RETRIES,
alert_threshold: int = WS_ALERT_THRESHOLD,
ws_url: str, # 必选,无默认值
source_name: str, # 必选,取值仅 "market" 或 "trading"
):
...
self.ws_url = ws_url
self._source_name = source_name
5.2.2 事件发布
所有 _event_bus.publish(...) 中的 source= 使用 source=self._source_name(共约 9 处:_on_open 两处、_on_close/_on_error 一处、_publish_candle_event、_publish_orderbook_event、_publish_price_event、_publish_user_events 内多处、_publish_order_status_events、_publish_fill_events)。
删除清单:实现后全局搜索 source="websocket",确保结果为零。同步修改 src/events/base.py 中 Event 的 source 字段 docstring,仅保留「仅允许 market(行情 WS)或 trading(交易 WS)」,删除 websocket/http/internal 等旧语义。
5.3 src/services/realtime_kline_service_base.py
顶层导入
from src.config import WS_MARKET_URL, WS_TRADING_URL
5.3.1 属性初始化(__init__)
self.ws_trading_manager: Optional[EnhancedWebSocketManager] = None
self._trading_ws_thread: Optional[threading.Thread] = None
5.3.2 _build_subscriptions()
- 契约:仅返回 candle + l2Book 的订阅列表。
- 删除清单:不得包含 orderUpdates、userFills、user 的订阅构建;不得包含对
_executor/_wallet的访问;不得使用hasattr(..., '_executor')或getattr(..., '_executor', None)。
5.3.3 _build_trading_subscriptions()
def _build_trading_subscriptions(self) -> List[Dict]:
"""构建交易 WS 订阅列表(orderUpdates / userFills / user)。无可用地址时返回空列表。"""
if not self._trading_orchestrator:
return []
user_address = self._trading_orchestrator.get_trading_ws_user_address()
if not user_address:
return []
return [
{"type": "orderUpdates", "user": user_address},
{"type": "userFills", "user": user_address},
{"type": "user", "user": user_address},
]
5.3.4 订阅初始化(__init__)
self.subscriptions = self._build_subscriptions()
self.trading_subscriptions = self._build_trading_subscriptions()
5.3.5 _init_service_threads()
创建两个 manager 时均显式传入 ws_url 与 source_name:
def _init_service_threads(self):
# 行情 WS(主网固定,source="market")
self.ws_manager = EnhancedWebSocketManager(
subscriptions=self.subscriptions,
on_state_change=self.on_state_change,
timeout=WS_TIMEOUT,
alert_callback=self._send_system_alert,
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
ws_url=WS_MARKET_URL,
source_name="market",
)
self.ws_manager.add_message_callback(self.on_message)
_set_market_ws_manager(self.ws_manager)
if self.trading_subscriptions:
self.ws_trading_manager = EnhancedWebSocketManager(
subscriptions=self.trading_subscriptions,
on_state_change=self._on_trading_state_change,
timeout=WS_TIMEOUT,
alert_callback=self._send_trading_alert,
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
ws_url=WS_TRADING_URL,
source_name="trading",
)
# 不注册 message 回调:订单/用户数据仅由 _cache_latest_data 发布到 EventBus
self.logger.info(
f"🌐 WS 连接配置 | 行情: {WS_MARKET_URL} (主网固定)"
f" | 交易: {WS_TRADING_URL} (TRADING_NETWORK={os.getenv('TRADING_NETWORK','mainnet')})"
)
else:
self.logger.info("交易订阅为空,仅启动行情 WS")
约定:仅当 self.trading_subscriptions 非空时创建并启动交易 WS;不存在空订阅的 trading manager 或占位线程。
5.3.6 Trading WS 状态与告警
def _on_trading_state_change(self, state: str, info: Optional[str] = None):
self.logger.info(f"[交易WS] 状态变化: {state}" + (f" | {info}" if info else ""))
def _send_trading_alert(self, title: str, content: str):
self._send_system_alert(f"[交易WS] {title}", content)
5.3.7 on_message(msg)
- 契约:仅处理行情(去重 → K 线解析 → 入队)。
- 删除清单:不得包含
channel in ("orderUpdates", "userFills", "user")的判断;不得包含订单缓冲区写入/回放;不得调用 WebSocketOrderManager 或任何「订单原始消息」处理。
def on_message(self, msg: Dict):
if self._message_dedup.is_duplicate(msg):
return
kline = self._parse_kline(msg)
if not kline:
return
self.kline_buffer.put_nowait(kline)
# ... 后续仅与 K 线入队、分析入队相关,无 orderUpdates/userFills/user 分支
5.3.8 生命周期管理
start():先启动交易 WS 线程(若存在),再阻塞于行情 WS。
def start(self):
# 原有工作线程启动
if self.ws_trading_manager is not None:
self._trading_ws_thread = threading.Thread(
target=self.ws_trading_manager.start,
daemon=True,
name="trading-ws",
)
self._trading_ws_thread.start()
self.ws_manager.start()
stop():先停 orchestrator → 交易 WS(含 join)→ 行情 WS。
def stop(self):
if self._trading_orchestrator:
self._trading_orchestrator.stop()
if self.ws_trading_manager is not None:
self.ws_trading_manager.stop()
if self._trading_ws_thread is not None and self._trading_ws_thread.is_alive():
self._trading_ws_thread.join(timeout=5)
if self._trading_ws_thread.is_alive():
self.logger.warning("交易 WS 线程在 5s 内未退出")
self.ws_manager.stop()
# ... 其余等待队列清空等
5.3.9 动态订阅(新币种)
add_subscriptions 仅对 self.ws_manager 调用(只加 candle/l2Book),不涉及交易 WS。
5.4 src/trading/orchestrator.py
新增方法,供 RealtimeKlineServiceBase 构建交易 WS 订阅时调用:
def get_trading_ws_user_address(self) -> Optional[str]:
"""返回交易 WS 订阅所需的 user 地址;无 executor 时返回 None。"""
if self._executor is None:
return None
return self._executor._wallet.address
约束:RealtimeKlineServiceBase 不得直接访问 Orchestrator 的 _executor 或 _wallet,仅通过此接口获取地址。
5.5 src/trading/executor.py
三个事件 handler 入口增加 source 过滤,仅处理 source="trading":
def _on_position_updated(self, event: PositionUpdatedEvent):
if event.source != "trading":
return
# ... 原有逻辑不变 ...
def _on_balance_changed(self, event: BalanceChangedEvent):
if event.source != "trading":
return
# ... 原有逻辑不变 ...
def _on_websocket_reconnected(self, event: WebSocketReconnectedEvent):
if event.source != "trading":
return
# ... 原有逻辑不变 ...
5.6 全局行情 WS 获取(原 get_global_ws_manager)
将 get_global_ws_manager / _set_global_ws_manager 重命名为 get_market_ws_manager / _set_market_ws_manager,所有调用处同步更新。语义为「行情 WS(主网)」;禁止将返回值用于交易相关逻辑。
6. 初始化时序
__init__
├── _init_trading_module() → TradingOrchestrator + Executor,Executor._subscribe_events() 订阅 EventBus
├── self.ws_trading_manager = None
├── self._trading_ws_thread = None
├── _build_subscriptions() → 仅 candle + l2Book
├── _build_trading_subscriptions() → 仅当 get_trading_ws_user_address() 返回地址时非空
└── _init_service_threads()
├── ws_manager(ws_url=WS_MARKET_URL, source_name="market")
└── ws_trading_manager(ws_url=WS_TRADING_URL, source_name="trading") [仅当 trading_subscriptions 非空]
start()
├── 工作线程启动
├── ws_trading_manager.start() [daemon 线程,若存在]
└── ws_manager.start() [阻塞主线程]
stop()
├── _trading_orchestrator.stop()
├── ws_trading_manager.stop() + _trading_ws_thread.join(timeout=5)
└── ws_manager.stop()
7. 边界与注意事项
- 消息去重:
on_message的MessageDeduplicator仅作用于行情。Trading WS 无 message 回调,WebSocketOrderManager 内部有 oid 级去重。 - 并发与阻塞:
EnhancedWebSocketManager.start()为阻塞调用,故交易 WS 必须在独立 daemon 线程启动。stop()中先停交易 WS 并join(timeout=5),再停行情 WS,避免竞态。 - 连接失败:Trading WS 连接失败与现有重试与飞书告警一致,不阻塞行情 WS;订单依赖 HTTP 兜底。
- mainnet=mainnet:
TRADING_NETWORK=mainnet时两 URL 可相同,两条独立连接,逻辑分离清晰。 - 监控:market_ws_state 来自
on_state_change;trading_ws_state 来自_on_trading_state_change(带[交易WS]前缀);trading_ws_thread_alive 可在stop()的 join 超时处打 warning。 - 事件基类:
src/events/base.py中 Event 的source字段 docstring 仅保留「仅允许market(行情 WS)或trading(交易 WS)」,删除websocket/http/internal等旧语义。 - userFills 与 user.fills:两者均来自 Trading WS,路径独立。userFills → WebSocketOrderManager(订单状态机);user.fills → Executor(持仓/余额缓存刷新)。
8. 改动量汇总
| 文件 | 变更要点 |
|---|---|
src/config.py |
新增 WS_MARKET_URL、WS_TRADING_URL;删除 WS_URL |
src/utils/websocket/enhanced_ws_manager.py |
构造函数增加必选 ws_url、source_name;所有事件 source 使用 self._source_name |
src/trading/orchestrator.py |
新增 get_trading_ws_user_address() -> Optional[str] |
src/services/realtime_kline_service_base.py |
订阅拆分、_build_trading_subscriptions 仅调 get_trading_ws_user_address()、Trading WS 不注册 message 回调、on_message 仅行情、双 manager 生命周期、_set_market_ws_manager |
src/trading/executor.py |
三处 handler 增加 if event.source != "trading": return |
src/events/base.py |
source 字段 docstring 仅允许 market/trading |
| 全局 | get_global_ws_manager → get_market_ws_manager,调用处同步修改 |
实现后禁止符号(grep 零结果)
| 禁止项 | 说明 |
|---|---|
WS_URL |
已由 WS_MARKET_URL / WS_TRADING_URL 替代,删除后不得残留 |
get_global_ws_manager、_set_global_ws_manager |
已重命名为 get_market_ws_manager / _set_market_ws_manager |
source="websocket"、source='websocket' |
事件 source 仅允许 "market" / "trading" |
RealtimeKlineServiceBase 中对 _executor、_wallet 的访问 |
仅通过 get_trading_ws_user_address() 获取地址 |
_on_trading_message、对交易 WS 的 add_message_callback |
Trading WS 不注册任何 message 回调 |
orderUpdates/userFills/user 出现在 on_message 或行情处理路径中 |
仅允许出现在订阅构建与 Trading WS 的 _cache_latest_data 中 |
9. 测试验证清单
功能测试
| # | 场景 | 预期 |
|---|---|---|
| 1 | TRADING_NETWORK=testnet |
行情 WS 连主网,交易 WS 连测试网 |
| 2 | 测试网下单后 | 能通过交易 WS 收到 orderUpdates/userFills(经 EventBus 到 WebSocketOrderManager) |
| 3 | 交易 WS 重连 | Executor 仅当 source=="trading" 时触发 verify_pending_orders |
| 4 | 行情 WS 重连 | Executor 不触发订单补查 |
| 5 | TRADING_ENABLED=false |
仅行情 WS,无交易 WS |
| 6 | TRADING_NETWORK=mainnet |
两 WS 可连同一地址,功能正常 |
| 7 | 交易 WS 断开期间下单 | HTTP 兜底正常,重连后推送恢复 |
单元测试
| # | 测试点 | 验证方式 |
|---|---|---|
| T1 | source="market" 的 WebSocketReconnectedEvent |
Executor 不触发订单补查 |
| T2 | source="trading" 的 PositionUpdatedEvent |
Executor 缓存正确更新 |
| T3 | source="market" 的 PositionUpdatedEvent |
Executor 不更新 |
| T4 | Trading WS 不注册 message 回调 | 仅 _cache_latest_data 发布事件,无 add_message_callback |
| T5 | 事件 source 仅 market/trading |
无 source="websocket" 的发布或断言 |
压力/并发
| # | 场景 | 预期 |
|---|---|---|
| P1 | 两 WS 同时高频推送 | 无串扰,行情/交易各自处理 |
| P2 | stop() 时交易 WS 仍有消息 |
join(timeout=5) 后流程正常结束 |