双WebSocket架构设计18
双 WebSocket 架构设计
1. 问题与方案
问题:TRADING_NETWORK=testnet 时订单通过测试网 HTTP 提交,但 WebSocket 推送仍来自主网,导致收不到测试网的 orderUpdates/userFills,持仓/余额与测试网不一致。
方案:拆为两条独立 WebSocket 连接——
| 连接 | URL | source | 订阅 |
|---|---|---|---|
| Market WS | 主网固定 | "market" |
candle, l2Book |
| Trading WS | 随 TRADING_NETWORK |
"trading" |
orderUpdates, userFills, user |
实现原则:
- 老代码彻底删除,不打补丁;实现后单 WS 逻辑零残留
- 两处创建
EnhancedWebSocketManager必须显式传入ws_url与source_name,无默认值 - WS 层 source 合法取值仅
"market"和"trading",全局禁止"websocket" TRADING_NETWORK默认"mainnet",防止忘记配置时静默连测试网- 无全局可变状态:依赖通过构造函数或 setter 注入,不通过模块级全局变量传递
2. 架构与数据流
flowchart LR
subgraph config [Config]
WS_MARKET_URL["WS_MARKET_URL (主网固定)"]
WS_TRADING_URL["WS_TRADING_URL (随 TRADING_NETWORK)"]
end
subgraph market ["Market WS · source=market"]
Mgr1[EnhancedWebSocketManager]
Sub1[candle / l2Book]
end
subgraph trading ["Trading WS · source=trading"]
Mgr2[EnhancedWebSocketManager]
Sub2[orderUpdates / userFills / user]
end
WS_MARKET_URL --> Mgr1
WS_TRADING_URL --> Mgr2
Mgr1 --- Sub1
Mgr2 --- Sub2
Mgr1 -->|"on_message 回调"| Kline["K 线解析 → buffer → DB → 分析"]
Mgr1 -->|"TTLCache (l2Book)"| Exec[Executor]
EB[EventBus]
Mgr2 -->|"_cache_latest_data"| EB
EB -->|"OrderStatusEvent\nOrderFilledEvent"| WsOM[WebSocketOrderManager]
EB -->|"PositionUpdatedEvent\nBalanceChangedEvent"| Exec
Mgr1 -.->|"set_market_ws_manager()"| Orch[Orchestrator] -.-> Exec
数据流
Market WS(行情):每条消息触发两个独立机制,职责不重叠——
主网 WS → _wrapped_callback()
├─ _cache_latest_data() → TTLCache 写入(candle/l2Book 原始数据缓存,供 Executor 读取 l2Book)
└─ on_message() 回调 → _parse_kline() → kline_buffer → batch_writer → DB → 分析入队
TTLCache 是原始数据缓存(Executor 通过 _market_ws_manager.latest_data 读取 l2Book),on_message 是 K 线解析管线。二者输入相同但输出不同、消费者不同,不构成重复路径。
Trading WS(交易):不注册 on_message 回调,仅依赖 _cache_latest_data 发布事件。
交易网 WS → _wrapped_callback() → _cache_latest_data():
orderUpdates → OrderStatusEvent(source="trading") → EventBus → WebSocketOrderManager
userFills → OrderFilledEvent(source="trading") → EventBus → WebSocketOrderManager
user → Position/Balance/Fill Event(source="trading") → EventBus → Executor
事件表
仅列出有实际消费者的事件。
| 频道 | 事件类型 | source | 消费者 |
|---|---|---|---|
| orderUpdates | OrderStatusEvent | trading | WebSocketOrderManager |
| userFills | OrderFilledEvent | trading | WebSocketOrderManager(oid 去重) |
| user.fills | OrderFilledEvent | trading | Executor(刷新持仓/余额缓存) |
| user.assetPositions | PositionUpdatedEvent | trading | Executor |
| user.marginSummary | BalanceChangedEvent | trading | Executor |
| _on_open 首连 | WebSocketConnectedEvent | market / trading | 日志 |
| _on_open 重连 | WebSocketReconnectedEvent | market / trading | Executor(仅 trading 触发补查) |
| _on_close | WebSocketDisconnectedEvent | market / trading | 日志 |
两条 WS 独立连接、独立重连。_on_open 中自动重发各自的 subscriptions,无需外部干预。
TRADING_NETWORK=mainnet 时两 URL 相同,仍维持两条独立连接——代码路径统一,行情/交易隔离,代价仅多一条长连接。
初始化顺序:_init_trading_module()(Executor/WebSocketOrderManager 完成 EventBus 订阅)→ _init_service_threads()(创建 manager + 注入行情 WS 引用)→ start()(启动 WS)。start() 入口断言 self.ws_manager is not None,强制保证顺序。
3. 实现规格
以方法名与契约为准,行号仅作参考。
3.1 src/config.py
import os, logging
_trading_network = os.getenv("TRADING_NETWORK", "mainnet").lower()
if _trading_network not in ("mainnet", "testnet"):
logging.getLogger(__name__).warning(
"TRADING_NETWORK=%s 非法,回退为 mainnet", os.getenv("TRADING_NETWORK")
)
_trading_network = "mainnet"
TRADING_NETWORK = _trading_network
_WS_URLS = {
"mainnet": "wss://api.hyperliquid.xyz/ws",
"testnet": "wss://api.hyperliquid-testnet.xyz/ws",
}
WS_MARKET_URL = _WS_URLS["mainnet"]
WS_TRADING_URL = _WS_URLS[TRADING_NETWORK]
删除 WS_URL。引用网络标识统一使用 config.TRADING_NETWORK,禁止在 config.py 之外 os.getenv("TRADING_NETWORK")。
src/trading/config.py 中已有的 network_str = os.getenv("TRADING_NETWORK", "testnet") 必须迁移:删除该处定义,改为 from src.config import TRADING_NETWORK。注意旧处默认值为 "testnet",与本设计的 "mainnet" 默认值冲突——以 src/config.py 为唯一真相源。
3.2 src/utils/websocket/enhanced_ws_manager.py
构造函数:ws_url 与 source_name 为必选参数,位于可选参数之前。source_name 类型标注为 Literal["market", "trading"],构造函数 ValueError 作运行时防御。
from typing import Literal
def __init__(
self,
subscriptions: List[Dict],
ws_url: str,
source_name: Literal["market", "trading"],
on_state_change: Optional[Callable] = None,
timeout: int = WS_TIMEOUT,
skip_disconnects: bool = False,
alert_callback: Optional[Callable] = None,
max_retries: int | None = WS_MAX_RETRIES,
alert_threshold: int = WS_ALERT_THRESHOLD,
):
if source_name not in ("market", "trading"):
raise ValueError(
f"source_name 非法: {source_name!r},仅允许 'market' 或 'trading'"
)
self.ws_url = ws_url
self._source_name = source_name
删除对 WS_URL 的 import 和 self.ws_url = WS_URL 赋值,其余 WS_TIMEOUT/WS_MAX_RETRIES/WS_ALERT_THRESHOLD 保留。
删除死事件发布:_publish_candle_event 和 _publish_orderbook_event 方法整体删除。在 _cache_latest_data() 的 candle/l2Book 分支中移除对这两个方法的调用,仅保留 TTLCache 写入。
source 统一:所有剩余的 _event_bus.publish(..., source=...) 改为 source=self._source_name。涉及位置:_on_open(两处)、_on_close/_on_error、_publish_price_event、_publish_user_events、_publish_order_status_events、_publish_fill_events。
3.3 src/services/realtime_kline_service_base.py
导入
from src.config import WS_MARKET_URL, WS_TRADING_URL, TRADING_NETWORK
删除 from src.config import WS_URL(如存在)。
属性(__init__)
self.ws_trading_manager: Optional[EnhancedWebSocketManager] = None
self._trading_ws_thread: Optional[threading.Thread] = None
self.subscriptions = self._build_market_subscriptions()
self.trading_subscriptions = self._build_trading_subscriptions()
_build_market_subscriptions():仅返回 candle + l2Book,不含交易订阅,不访问 _executor/_wallet。
def _build_market_subscriptions(self) -> List[Dict]:
"""构建行情 WS 订阅列表(candle + l2Book)。"""
subscriptions = []
for symbol in self._get_all_symbols():
coin = symbol.split('/')[0]
for interval in ['5m', '1h', '4h']:
subscriptions.append({"type": "candle", "coin": coin, "interval": interval})
subscriptions.append({"type": "l2Book", "coin": coin})
if self.base_symbol:
base_coin = self.base_symbol.split('/')[0]
if not any(s.get("type") == "l2Book" and s.get("coin") == base_coin
for s in subscriptions):
subscriptions.append({"type": "l2Book", "coin": base_coin})
return subscriptions
base_symbol可能不在_get_all_symbols()返回值中(配对交易场景下 base 与 symbols 独立配置),因此需要额外检查。base_symbol是__init__中声明的属性,值为None时if self.base_symbol:已足够,不使用hasattr。
_build_trading_subscriptions():区分"未启用"(返回空列表)与"已启用但配置错误"(抛异常 fail-fast)。
def _build_trading_subscriptions(self) -> List[Dict]:
"""构建交易 WS 订阅列表(orderUpdates + userFills + user)。"""
if not self._trading_orchestrator:
return []
user_address = self._trading_orchestrator.get_trading_ws_user_address()
if not user_address:
raise ValueError(
"交易模块已启用但无法获取 user address,检查钱包配置"
)
return [
{"type": "orderUpdates", "user": user_address},
{"type": "userFills", "user": user_address},
{"type": "user", "user": user_address},
]
_init_service_threads():行情 WS 引用通过 orchestrator 注入 executor,不经过模块级全局变量。
def _init_service_threads(self):
# --- 行情 WS(主网固定) ---
self.ws_manager = EnhancedWebSocketManager(
subscriptions=self.subscriptions,
ws_url=WS_MARKET_URL,
source_name="market",
on_state_change=self.on_state_change,
timeout=WS_TIMEOUT,
alert_callback=self._send_system_alert,
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
)
self.ws_manager.add_message_callback(self.on_message)
# 注入行情 WS 引用
if self._trading_orchestrator:
self._trading_orchestrator.set_market_ws_manager(self.ws_manager)
# --- 交易 WS(随配置) ---
if self.trading_subscriptions:
self.ws_trading_manager = EnhancedWebSocketManager(
subscriptions=self.trading_subscriptions,
ws_url=WS_TRADING_URL,
source_name="trading",
on_state_change=lambda state, info=None: self.logger.info(
f"[交易WS] {state}" + (f" | {info}" if info else "")
),
timeout=WS_TIMEOUT,
alert_callback=lambda title, content: self._send_system_alert(
f"[交易WS] {title}", content
),
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
)
# Trading WS 不注册 message 回调
self.logger.info(
f"WS | 行情: {WS_MARKET_URL} | 交易: {WS_TRADING_URL} ({TRADING_NETWORK})"
)
else:
self.logger.info("交易订阅为空,仅启动行情 WS")
on_message():仅处理 K 线(去重 → 解析 → 入队),不含 orderUpdates/userFills/user 分支。
def on_message(self, msg: Dict):
if self._message_dedup.is_duplicate(msg):
return
kline = self._parse_kline(msg)
if not kline:
return
self.kline_buffer.put_nowait(kline)
生命周期
def start(self):
assert self.ws_manager is not None, "必须先调用 _init_service_threads()"
# ... 工作线程启动 ...
if self.ws_trading_manager is not None:
self._trading_ws_thread = threading.Thread(
target=self.ws_trading_manager.start, daemon=True, name="trading-ws"
)
self._trading_ws_thread.start()
self.ws_manager.start() # 阻塞主线程
def stop(self):
if self._trading_orchestrator:
self._trading_orchestrator.stop()
if self.ws_trading_manager is not None:
self.ws_trading_manager.stop()
if self._trading_ws_thread and self._trading_ws_thread.is_alive():
self._trading_ws_thread.join(timeout=5)
if self._trading_ws_thread.is_alive():
self.logger.warning("交易 WS 线程在 5s 内未退出")
self.ws_manager.stop()
动态订阅:add_subscriptions 仅对 self.ws_manager(Market WS)调用,用于运行时新增 candle/l2Book 订阅(如新币种上线)。交易订阅在初始化时一次性确定,无动态变更需求。
3.4 src/trading/orchestrator.py
新增两个接口方法。Orchestrator 本身承担信号处理、止损监控、仓位同步等编排职责,是 Service 层与 Executor 层之间的唯一桥梁——这两个方法维护该边界,非凭空增加间接层。
def get_trading_ws_user_address(self) -> Optional[str]:
"""返回交易 WS 订阅所需的 user 地址。"""
if self._executor is None:
return None
return self._executor._wallet.address
def set_market_ws_manager(self, manager: "EnhancedWebSocketManager"):
"""注入行情 WS 引用,供 Executor 读取缓存行情数据。"""
if self._executor is not None:
self._executor.set_market_ws_manager(manager)
3.5 src/trading/executor.py
行情 WS 引用注入(替代全局 get_global_ws_manager()):
def set_market_ws_manager(self, manager: "EnhancedWebSocketManager"):
"""注入行情 WS 引用。由 Orchestrator 在 _init_service_threads 阶段调用。"""
self._market_ws_manager = manager
删除 from ... import get_global_ws_manager,所有原先调用 get_global_ws_manager() 的地方改为 self._market_ws_manager。
重连 handler 增加 source 过滤——两条 WS 都会发出 WebSocketReconnectedEvent,Executor 只响应 trading 来源:
def _on_websocket_reconnected(self, event: WebSocketReconnectedEvent):
if event.source != "trading":
return
self.logger.warning(
f"交易WS重连 | 断连={event.downtime_seconds:.1f}s"
)
self._verify_pending_orders()
PositionUpdatedEvent / BalanceChangedEvent handler 不加 source guard(见 D4)。
3.6 src/events/base.py
Event.source 保持 str 类型不变(见 D5)。
@dataclass
class Event:
timestamp: datetime
source: str
priority: EventPriority = field(default=EventPriority.NORMAL)
metadata: Dict[str, Any] = field(default_factory=dict)
event_id: str = field(default="", init=False, repr=False)
4. 删除与验收清单
每行同时标注"删什么"和"怎么验",一张表零重复。
| 文件 | 删除项 | 替代方案 | 验收 grep(结果必须为零) |
|---|---|---|---|
| src/config.py | WS_URL = "wss://..." |
WS_MARKET_URL / WS_TRADING_URL |
\bWS_URL\b in src/ |
| src/trading/config.py | network_str = os.getenv("TRADING_NETWORK", ...) 及相关分支 |
from src.config import TRADING_NETWORK |
os.getenv.*TRADING_NETWORK in src/(config.py 除外) |
| enhanced_ws_manager.py | from src.config import WS_URL |
构造参数 ws_url |
\bWS_URL\b in src/ |
| enhanced_ws_manager.py | self.ws_url = WS_URL 硬编码 |
构造参数 ws_url |
同上 |
| enhanced_ws_manager.py | 所有 source="websocket"(约 9 处) |
source=self._source_name |
source=.*websocket in src/ |
| enhanced_ws_manager.py | _publish_candle_event() 方法 |
删除(零消费者) | _publish_candle_event in src/ |
| enhanced_ws_manager.py | _publish_orderbook_event() 方法 |
删除(零消费者) | _publish_orderbook_event in src/ |
| enhanced_ws_manager.py | _cache_latest_data 中调用上述两方法的语句 |
仅保留 TTLCache 写入 | 同上 |
| realtime_kline_service_base.py | _global_ws_manager 模块级变量 |
依赖注入 | _global_ws_manager in src/ |
| realtime_kline_service_base.py | _global_ws_manager_lock 线程锁 |
随全局变量一并删除 | 同上 |
| realtime_kline_service_base.py | get_global_ws_manager() 函数 |
executor.set_market_ws_manager() |
get_global_ws_manager in src/ |
| realtime_kline_service_base.py | _build_subscriptions() 方法 |
_build_market_subscriptions() + _build_trading_subscriptions() |
_build_subscriptions[^_] in src/services/ |
| realtime_kline_service_base.py | _on_trading_state_change() 方法 |
内联 lambda | _on_trading_state_change in src/ |
| realtime_kline_service_base.py | _send_trading_alert() 方法 |
内联 lambda | _send_trading_alert in src/ |
| realtime_kline_service_base.py | _build_subscriptions 中 _executor._wallet.address 直接访问 |
get_trading_ws_user_address() |
_executor.*_wallet in src/services/ |
| executor.py | from ... import get_global_ws_manager |
self._market_ws_manager |
get_global_ws_manager in src/ |
| executor.py | 所有 get_global_ws_manager() 调用 |
self._market_ws_manager |
同上 |
补充校验(非删除项,但必须确认归零):
| grep 模式 | 范围 | 说明 |
|---|---|---|
assert source_name |
src/ | 应使用 ValueError,不用 assert |
orderUpdates|userFills |
on_message / _build_market_subscriptions 内 |
仅允许出现在 _build_trading_subscriptions 与 _cache_latest_data |
5. 设计决策记录
正文只写"做什么","为什么不做别的"集中在此。
D1:_cache_latest_data() 共享实现不拆分
_cache_latest_data() 包含 candle/l2Book/orderUpdates/userFills/user 全部频道的 if/elif 分支,双 WS 架构下每个实例约 50% 的分支不会被触发。保留共享实现——哪些分支被触发由 subscriptions(服务器推什么)决定,未命中的分支不执行、不影响正确性。拆分方案(子类/策略模式/handler map)均引入新抽象层,增加间接性但不增加正确性。
D2:删除 _publish_candle_event 和 _publish_orderbook_event
代码审计确认 CandleUpdatedEvent 和 OrderBookUpdatedEvent 在全代码库零消费者——发布后无任何 handler 订阅。candle 数据的实际消费路径是 on_message → _parse_kline → kline_buffer → DB;l2Book 数据的实际消费路径是 _cache_latest_data → TTLCache → Executor 直接读取。保留零消费者的事件发布是纯开销,删除后 Market WS 数据流从"双通道"简化为"单通道"。
D3:TRADING_NETWORK=mainnet 时仍维持两条连接
代码路径统一,行情/交易隔离,代价仅多一条长连接。避免"同网络时合并为单连接"的条件分支。
D4:PositionUpdatedEvent / BalanceChangedEvent 不加 source guard
当前架构下只有 Trading WS 发布这两类事件,guard 会保护一个不可能的场景。如果未来架构变化导致 Market WS 也发布这些事件,届时再加 guard。
D5:Event.source 保持 str 不收窄
事件系统中存在非 WS 来源的事件(如 "http"、"internal"),将基类收窄为 Literal["market", "trading"] 会破坏全局事件系统。source 合法值约束仅在 EnhancedWebSocketManager.__init__ 施加。
D6:交易 WS 的 on_state_change / alert_callback 内联为 lambda
这两个回调体只有一行日志/告警转发,不值得独立为命名方法。内联在构造调用处,读者一眼可见完整行为。
6. 验收测试
| # | 场景 | 预期 |
|---|---|---|
| 1 | TRADING_NETWORK=testnet |
行情 WS 连主网,交易 WS 连测试网 |
| 2 | 测试网下单 | 通过交易 WS 收到 orderUpdates/userFills |
| 3 | 交易 WS 重连 | 自动重订阅 + Executor 触发 _verify_pending_orders |
| 4 | 行情 WS 重连 | 自动重订阅 + Executor 不触发订单补查 |
| 5 | TRADING_ENABLED=false |
仅行情 WS,无交易 WS |
| 6 | TRADING_NETWORK=mainnet |
两 WS 连同一地址,功能正常 |
| 7 | TRADING_NETWORK=invalid |
回退 mainnet + warning 日志 |
| 8 | 交易模块已启用但钱包地址为空 | _build_trading_subscriptions 抛 ValueError,进程启动失败 |
| 9 | Trading WS 构造异常 | 异常上抛,进程启动失败(fail-fast) |
| 10 | 两 WS 同时高频推送 5 分钟 | 事件被消费者正确处理,无丢失、无异常日志 |
| 11 | 全局 grep 第 4 节验收列 | 全部为零 |