双WebSocket架构设计15
双 WebSocket 架构设计
1. 问题与方案
问题:TRADING_NETWORK=testnet 时订单通过测试网 HTTP 提交,但 WebSocket 推送仍来自主网,导致收不到测试网的 orderUpdates/userFills,持仓/余额与测试网不一致。
方案:拆为两条独立 WebSocket 连接——
| 连接 | URL | source | 订阅 |
|---|---|---|---|
| Market WS | 主网固定 | "market" |
candle, l2Book |
| Trading WS | 随 TRADING_NETWORK |
"trading" |
orderUpdates, userFills, user |
实现原则:
- 老代码重写,不打补丁;实现后单 WS 逻辑零残留
- 两处创建
EnhancedWebSocketManager必须显式传入ws_url与source_name,无默认值 - 事件 source 合法取值仅
"market"和"trading",全局禁止"websocket" TRADING_NETWORK默认"mainnet",防止忘记配置时静默连测试网
2. 架构与数据流
flowchart LR
subgraph config [Config]
WS_MARKET_URL["WS_MARKET_URL (主网固定)"]
WS_TRADING_URL["WS_TRADING_URL (随 TRADING_NETWORK)"]
end
subgraph market [Market WS]
Mgr1["EnhancedWebSocketManager\nsource='market'"]
Mgr1 -->|订阅| Sub1[candle / l2Book]
Mgr1 -->|WS_MARKET_URL| HL_M[Hyperliquid Mainnet]
end
subgraph trading [Trading WS]
Mgr2["EnhancedWebSocketManager\nsource='trading'"]
Mgr2 -->|订阅| Sub2[orderUpdates / userFills / user]
Mgr2 -->|WS_TRADING_URL| HL_T[Hyperliquid Testnet/Mainnet]
end
config --> Mgr1
config --> Mgr2
Mgr1 -->|_cache_latest_data| EB1[EventBus]
EB1 -->|CandleUpdatedEvent / OrderBookUpdatedEvent| Kline[K 线 + L2 处理]
Mgr1 -->|on_message| KlineProc[K 线解析 → kline_buffer → 分析入队]
Mgr2 -->|_cache_latest_data| EB2[EventBus]
EB2 -->|OrderStatusEvent / OrderFilledEvent| OrderMgr[WebSocketOrderManager]
EB2 -->|过滤 source=trading| Exec[Executor]
事件全景
| 频道 | 事件类型 | source | 消费者 | 互斥说明 |
|---|---|---|---|---|
| candle | CandleUpdatedEvent | market | K 线处理 | |
| l2Book | OrderBookUpdatedEvent | market | L2 缓存 | |
| orderUpdates | OrderStatusEvent | trading | WebSocketOrderManager | |
| userFills | OrderFilledEvent | trading | WebSocketOrderManager(oid 去重) | 与下行消费者互斥 |
| user.fills | OrderFilledEvent | trading | Executor(刷新持仓/余额缓存) | 与上行消费者互斥 |
| user.assetPositions | PositionUpdatedEvent | trading | Executor | |
| user.marginSummary | BalanceChangedEvent | trading | Executor | |
| _on_open 首连 | WebSocketConnectedEvent | 各自 | 日志 | |
| _on_open 重连 | WebSocketReconnectedEvent | 各自 | Executor(仅 trading) | |
| _on_close | WebSocketDisconnectedEvent | 各自 | 日志 |
行情数据流(Market WS)
主网 WS → _wrapped_callback()
1. _cache_latest_data() → CandleUpdatedEvent / OrderBookUpdatedEvent (source="market") → EventBus
2. on_message() → K 线解析 → kline_buffer → 分析入队
交易数据流(Trading WS)
交易网 WS → _wrapped_callback() → _cache_latest_data():
- orderUpdates → OrderStatusEvent(source="trading") → EventBus → WebSocketOrderManager
- userFills → OrderFilledEvent(source="trading") → EventBus → WebSocketOrderManager
- user → Position/Balance/Fill Event(source="trading") → EventBus → Executor
Trading WS 不注册 message 回调,仅依赖 _cache_latest_data 发布事件。
两条 WS 独立连接、独立重连。_on_open 中自动重发各自的 subscriptions,无需外部干预。Executor 仅响应 source="trading" 的重连事件触发 verify_pending_orders。
TRADING_NETWORK=mainnet 时两 URL 相同,仍维持两条独立连接——代码路径统一,行情/交易隔离,代价仅多一条长连接。
初始化顺序保证:_init_trading_module()(Executor/WebSocketOrderManager 完成 EventBus 订阅)→ _init_service_threads()(仅创建 manager)→ start()(启动 WS),订阅必然先于首条消息。
3. 实现规格
以方法名与契约为准,行号仅作参考。
3.1 src/config.py
import os, logging
_trading_network = os.getenv("TRADING_NETWORK", "mainnet").lower()
if _trading_network not in ("mainnet", "testnet"):
logging.getLogger(__name__).warning(
"TRADING_NETWORK=%s 非法,回退为 mainnet", os.getenv("TRADING_NETWORK")
)
_trading_network = "mainnet"
TRADING_NETWORK = _trading_network
_WS_URLS = {
"mainnet": "wss://api.hyperliquid.xyz/ws",
"testnet": "wss://api.hyperliquid-testnet.xyz/ws",
}
WS_MARKET_URL = _WS_URLS["mainnet"]
WS_TRADING_URL = _WS_URLS[TRADING_NETWORK]
删除 WS_URL。引用网络标识统一使用 config.TRADING_NETWORK,禁止在 config.py 之外 os.getenv("TRADING_NETWORK")。
3.2 src/utils/websocket/enhanced_ws_manager.py
构造函数:ws_url 与 source_name 为必选参数,位于可选参数之前。source_name 类型标注为 Literal["market", "trading"],构造函数 ValueError 保留作运行时防御。
from typing import Literal
def __init__(
self,
subscriptions: List[Dict],
ws_url: str,
source_name: Literal["market", "trading"],
on_state_change: Optional[Callable] = None,
timeout: int = WS_TIMEOUT,
skip_disconnects: bool = False,
alert_callback: Optional[Callable] = None,
max_retries: int | None = WS_MAX_RETRIES,
alert_threshold: int = WS_ALERT_THRESHOLD,
):
if source_name not in ("market", "trading"):
raise ValueError(
f"source_name 非法: {source_name!r},仅允许 'market' 或 'trading'"
)
self.ws_url = ws_url
self._source_name = source_name
所有 _event_bus.publish(..., source=...) 改为 source=self._source_name。涉及位置:_on_open(两处)、_on_close/_on_error、_publish_candle_event、_publish_orderbook_event、_publish_price_event、_publish_user_events、_publish_order_status_events、_publish_fill_events。
删除对 WS_URL 的 import,其余 WS_TIMEOUT/WS_MAX_RETRIES/WS_ALERT_THRESHOLD 保留。
3.3 src/services/realtime_kline_service_base.py
导入
from src.config import WS_MARKET_URL, WS_TRADING_URL, TRADING_NETWORK
属性(__init__)
self.ws_trading_manager: Optional[EnhancedWebSocketManager] = None
self._trading_ws_thread: Optional[threading.Thread] = None
self.subscriptions = self._build_market_subscriptions()
self.trading_subscriptions = self._build_trading_subscriptions()
_build_market_subscriptions():仅返回 candle + l2Book,不含交易订阅,不访问 _executor/_wallet。
def _build_market_subscriptions(self) -> List[Dict]:
"""构建行情 WS 订阅列表(candle + l2Book)。"""
subscriptions = []
for symbol in self._get_all_symbols():
coin = symbol.split('/')[0]
for interval in ['5m', '1h', '4h']:
subscriptions.append({"type": "candle", "coin": coin, "interval": interval})
subscriptions.append({"type": "l2Book", "coin": coin})
# 基准币种 l2Book 去重
if hasattr(self, 'base_symbol') and self.base_symbol:
base_coin = self.base_symbol.split('/')[0]
if not any(s.get("type") == "l2Book" and s.get("coin") == base_coin
for s in subscriptions):
subscriptions.append({"type": "l2Book", "coin": base_coin})
return subscriptions
_build_trading_subscriptions()
def _build_trading_subscriptions(self) -> List[Dict]:
if not self._trading_orchestrator:
return []
user_address = self._trading_orchestrator.get_trading_ws_user_address()
if not user_address:
return []
return [
{"type": "orderUpdates", "user": user_address},
{"type": "userFills", "user": user_address},
{"type": "user", "user": user_address},
]
_init_service_threads()
Trading WS 创建失败时 fail-fast:trading_subscriptions 非空说明用户明确配置了交易模块,创建失败应直接抛异常而非静默降级为仅行情模式(半残状态运行不如早暴露配置问题)。
_on_trading_state_change 和 _send_trading_alert 不再作为独立方法,改为 _init_service_threads 内联 lambda,消除仅加前缀的包装方法。
def _init_service_threads(self):
# --- 行情 WS(主网固定) ---
self.ws_manager = EnhancedWebSocketManager(
subscriptions=self.subscriptions,
ws_url=WS_MARKET_URL,
source_name="market",
on_state_change=self.on_state_change,
timeout=WS_TIMEOUT,
alert_callback=self._send_system_alert,
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
)
self.ws_manager.add_message_callback(self.on_message)
_set_market_ws_manager(self.ws_manager)
# --- 交易 WS(随配置) ---
if self.trading_subscriptions:
self.ws_trading_manager = EnhancedWebSocketManager(
subscriptions=self.trading_subscriptions,
ws_url=WS_TRADING_URL,
source_name="trading",
on_state_change=lambda state, info=None: self.logger.info(
f"[交易WS] {state}" + (f" | {info}" if info else "")
),
timeout=WS_TIMEOUT,
alert_callback=lambda title, content: self._send_system_alert(
f"[交易WS] {title}", content
),
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
)
# Trading WS 不注册 message 回调
self.logger.info(
f"WS | 行情: {WS_MARKET_URL} | 交易: {WS_TRADING_URL} ({TRADING_NETWORK})"
)
else:
self.logger.info("交易订阅为空,仅启动行情 WS")
on_message():仅处理 K 线(去重 → 解析 → 入队),不含 orderUpdates/userFills/user 分支。
def on_message(self, msg: Dict):
if self._message_dedup.is_duplicate(msg):
return
kline = self._parse_kline(msg)
if not kline:
return
self.kline_buffer.put_nowait(kline)
# 后续仅 K 线入队、分析入队
生命周期
def start(self):
# ... 工作线程启动 ...
if self.ws_trading_manager is not None:
self._trading_ws_thread = threading.Thread(
target=self.ws_trading_manager.start, daemon=True, name="trading-ws"
)
self._trading_ws_thread.start()
self.ws_manager.start() # 阻塞主线程
def stop(self):
if self._trading_orchestrator:
self._trading_orchestrator.stop()
if self.ws_trading_manager is not None:
self.ws_trading_manager.stop()
if self._trading_ws_thread and self._trading_ws_thread.is_alive():
self._trading_ws_thread.join(timeout=5)
if self._trading_ws_thread.is_alive():
self.logger.warning("交易 WS 线程在 5s 内未退出")
self.ws_manager.stop()
动态订阅:add_subscriptions 仅对 self.ws_manager(Market WS)调用,用于运行时新增 candle/l2Book 订阅。不涉及交易 WS(见第 4 节设计取舍说明)。
3.4 src/trading/orchestrator.py
def get_trading_ws_user_address(self) -> Optional[str]:
"""返回交易 WS 订阅所需的 user 地址。"""
if self._executor is None:
return None
return self._executor._wallet.address
RealtimeKlineServiceBase 不得直接访问 _executor 或 _wallet,仅通过此接口获取地址。
3.5 src/trading/executor.py
三个 handler 增加 source 过滤:
def _on_position_updated(self, event: PositionUpdatedEvent):
if event.source != "trading":
return
# ... 原有逻辑 ...
def _on_balance_changed(self, event: BalanceChangedEvent):
if event.source != "trading":
return
# ... 原有逻辑 ...
def _on_websocket_reconnected(self, event: WebSocketReconnectedEvent):
if event.source != "trading":
return
# ... 原有逻辑 ...
关于三处 guard 的说明:三处 if event.source != "trading": return 保留为最简形式(三行独立 guard),不引入装饰器或 EventBus 过滤层等新抽象。其中仅 WebSocketReconnectedEvent 的 guard 是必要的——因为两条 WS 都会发出重连事件,Executor 必须区分来源。PositionUpdatedEvent 和 BalanceChangedEvent 的 guard 是防御性编程——当前架构下只有 Trading WS 发布这两类事件,但 guard 可防止未来新增 source 时引发误处理。
3.6 src/events/base.py
source 字段类型标注为 Literal["market", "trading"],提供编译期约束。构造函数 ValueError 保留作运行时防御(3.2 节)。
from typing import Literal
@dataclass
class Event:
timestamp: datetime
source: Literal["market", "trading"]
priority: EventPriority = field(default=EventPriority.NORMAL)
metadata: Dict[str, Any] = field(default_factory=dict)
event_id: str = field(default="", init=False, repr=False)
3.7 全局 WS 获取重命名
| 旧名 | 新名 |
|---|---|
get_global_ws_manager |
get_market_ws_manager |
_set_global_ws_manager |
_set_market_ws_manager |
_global_ws_manager |
_market_ws_manager |
调用处仅一处:src/trading/executor.py。语义为行情 WS(主网),禁止用于交易逻辑。
4. 设计取舍
本节记录三个有意识的设计决策,避免被误判为遗漏。
4.1 _cache_latest_data 共享类中的"死代码"
EnhancedWebSocketManager._cache_latest_data() 包含 candle/l2Book/orderUpdates/userFills/user 全部频道的处理分支。双 WS 架构下,Market 实例永远不会触发 orderUpdates/userFills/user 分支,Trading 实例永远不会触发 candle/l2Book 分支——每个实例约 50% 的分支是"死代码"。
决策:保留共享实现,不按 source_name 拆分策略。原因:哪些分支被触发由订阅内容(subscriptions)决定,而非代码遗漏。拆分会引入新的条件分支或子类,增加抽象层级但不增加正确性。
4.2 Executor source guard 保留理由
三处 if event.source != "trading": return 是有意保留的最简形式。不引入 @source_filter 装饰器或 EventBus 层过滤——三行 guard 的认知成本低于理解新抽象的成本。详见 3.5 节说明。
4.3 add_subscriptions 仅用于 Market WS
add_subscriptions 方法用于运行时动态新增 candle/l2Book 订阅(如新币种上线),仅对 self.ws_manager(Market WS)调用。代码层面不加 source_name 守卫——该方法在 EnhancedWebSocketManager 中是通用的,限制来自调用方约定而非被调用方检查。交易订阅在初始化时一次性确定,无动态变更需求。
5. 禁止符号表
实现后以下 grep 结果必须为零:
| grep 模式 | 范围 | 说明 |
|---|---|---|
\bWS_URL\b |
src/ | 已由 WS_MARKET_URL / WS_TRADING_URL 替代,\b 词边界排除 WS_MARKET_URL/WS_TRADING_URL |
source=.*websocket |
src/ | source 仅 "market" / "trading" |
get_global_ws_manager|_set_global_ws_manager|_global_ws_manager |
src/ | 已重命名为 market 前缀 |
_executor.*_wallet |
src/services/ | 仅通过 get_trading_ws_user_address() |
assert source_name |
src/ | 使用 ValueError |
os.getenv.*TRADING_NETWORK |
src/(config.py 除外) | 统一用 config.TRADING_NETWORK |
orderUpdates|userFills |
on_message / _build_market_subscriptions 内 |
仅允许出现在 _build_trading_subscriptions 与 _cache_latest_data |
6. 验收测试
| # | 场景 | 预期 |
|---|---|---|
| 1 | TRADING_NETWORK=testnet |
行情 WS 连主网,交易 WS 连测试网 |
| 2 | 测试网下单 | 通过交易 WS 收到 orderUpdates/userFills |
| 3 | 交易 WS 重连 | 自动重订阅 + Executor 触发 verify_pending_orders |
| 4 | 行情 WS 重连 | 自动重订阅 + Executor 不触发订单补查 |
| 5 | TRADING_ENABLED=false |
仅行情 WS,无交易 WS |
| 6 | TRADING_NETWORK=mainnet |
两 WS 连同一地址,功能正常 |
| 7 | TRADING_NETWORK=invalid |
回退 mainnet + warning 日志 |
| 8 | Trading WS trading_subscriptions 非空但构造异常 |
异常上抛,进程启动失败(fail-fast) |
| 9 | 两 WS 同时高频推送 | EventBus 无竞态、无死锁 |
| 10 | 全局 grep 禁止符号表 | 全部为零 |