双WebSocket架构设计20
双 WebSocket 架构设计
1. 问题与方案
问题:TRADING_NETWORK=testnet 时订单通过测试网 HTTP 提交,但 WebSocket 推送仍来自主网,导致收不到测试网的 orderUpdates/userFills,持仓/余额与测试网不一致。
方案:拆为两条独立 WebSocket 连接——
| 连接 | URL | source | 订阅 |
|---|---|---|---|
| Market WS | 主网固定 | "market" |
candle, l2Book |
| Trading WS | 随 TRADING_NETWORK |
"trading" |
orderUpdates, userFills, user |
实现原则:
- 老代码彻底删除,不打补丁;实现后单 WS 逻辑零残留
- 两处创建
EnhancedWebSocketManager必须显式传入ws_url与source_name,无默认值 - WS 层 source 合法取值仅
"market"和"trading",全局禁止"websocket" TRADING_NETWORK默认"mainnet",防止忘记配置时静默连测试网- 无全局可变状态:依赖通过构造函数或 setter 注入,不通过模块级全局变量传递
- 命名对称:行情侧统一
market_前缀,交易侧统一trading_前缀,禁止无前缀的遗留名称
2. 架构与数据流
flowchart LR
subgraph config [Config]
WS_MARKET_URL["WS_MARKET_URL (主网固定)"]
WS_TRADING_URL["WS_TRADING_URL (随 TRADING_NETWORK)"]
end
subgraph market ["Market WS · source=market"]
Mgr1[EnhancedWebSocketManager]
Sub1[candle / l2Book]
end
subgraph trading ["Trading WS · source=trading"]
Mgr2[EnhancedWebSocketManager]
Sub2[orderUpdates / userFills / user]
end
WS_MARKET_URL --> Mgr1
WS_TRADING_URL --> Mgr2
Mgr1 --- Sub1
Mgr2 --- Sub2
Mgr1 -->|"on_message 回调"| Kline["K 线解析 → buffer → DB → 分析"]
Mgr1 -->|"TTLCache (l2Book)"| Exec[Executor]
EB[EventBus]
Mgr2 -->|"_cache_latest_data"| EB
EB -->|"OrderStatusEvent\nOrderFilledEvent"| WsOM[WebSocketOrderManager]
EB -->|"PositionUpdatedEvent\nBalanceChangedEvent"| Exec
Mgr1 -.->|"set_market_ws_manager()"| Orch[Orchestrator] -.-> Exec
事件表
| 频道 | 事件类型 | source | 消费者 | 幂等保证 |
|---|---|---|---|---|
| orderUpdates | OrderStatusEvent | trading | WebSocketOrderManager | oid 匹配,忽略未跟踪 oid |
| userFills | OrderFilledEvent | trading | WebSocketOrderManager | tid 去重,忽略未跟踪 oid |
| user.fills | OrderFilledEvent | trading | Executor | 触发缓存刷新,天然幂等 |
| user.assetPositions | PositionUpdatedEvent | trading | Executor | 全量覆盖,天然幂等 |
| user.marginSummary | BalanceChangedEvent | trading | Executor | 全量覆盖,天然幂等 |
| _on_open 首连 | WebSocketConnectedEvent | market / trading | 日志 | — |
| _on_open 重连 | WebSocketReconnectedEvent | market / trading | Executor(仅 trading 触发补查) | — |
| _on_close | WebSocketDisconnectedEvent | market / trading | 日志 | — |
去重契约:
userFills与user.fills均发布OrderFilledEvent,EventBus 广播模型下两个消费者都会收到两份。WebSocketOrderManager 通过 tid 去重 + oid 匹配过滤,仅处理已跟踪订单的 fill;Executor 将 fill 事件作为缓存刷新触发器,重复触发等价于一次刷新。两个消费者均天然幂等,无需额外过滤机制。
两条 WS 独立连接、独立重连。_on_open 中自动重发各自的 subscriptions,无需外部干预。
TRADING_NETWORK=mainnet 时两 URL 相同,仍维持两条独立连接——代码路径统一,行情/交易隔离,代价仅多一条长连接。
初始化顺序:_init_trading_module()(Executor/WebSocketOrderManager 完成 EventBus 订阅)→ _init_service_threads()(创建 manager + 注入行情 WS 引用)→ start()(启动 WS)。_init_service_threads() 入口检查交易启用时 orchestrator 必须已初始化(RuntimeError),start() 检查 self.ws_market_manager is not None(RuntimeError)。双重 fail-fast,杜绝初始化顺序错误导致的静默降级。
3. 实现规格
以方法名与契约为准,行号仅作参考。
3.1 src/config.py
import os, logging
_trading_network = os.getenv("TRADING_NETWORK", "mainnet").lower()
if _trading_network not in ("mainnet", "testnet"):
logging.getLogger(__name__).warning(
"TRADING_NETWORK=%s 非法,回退为 mainnet", os.getenv("TRADING_NETWORK")
)
_trading_network = "mainnet"
TRADING_NETWORK = _trading_network
_WS_URLS = {
"mainnet": "wss://api.hyperliquid.xyz/ws",
"testnet": "wss://api.hyperliquid-testnet.xyz/ws",
}
WS_MARKET_URL = _WS_URLS["mainnet"]
WS_TRADING_URL = _WS_URLS[TRADING_NETWORK]
# 重连后二次验证间隔(秒),覆盖 Hyperliquid 最终一致性延迟
WS_RECONNECT_VERIFY_DELAY = 5
删除 WS_URL。引用网络标识统一使用 config.TRADING_NETWORK,禁止在 config.py 之外 os.getenv("TRADING_NETWORK")。
src/trading/config.py 中已有的 network_str = os.getenv("TRADING_NETWORK", "testnet") 必须迁移:删除该处定义,改为 from src.config import TRADING_NETWORK。注意旧处默认值为 "testnet",与本设计的 "mainnet" 默认值冲突——以 src/config.py 为唯一真相源。
3.2 src/utils/websocket/enhanced_ws_manager.py
构造函数:ws_url 与 source_name 为必选参数,位于可选参数之前。source_name 类型标注为 Literal["market", "trading"],构造函数 ValueError 作运行时防御。
from typing import Literal
def __init__(
self,
subscriptions: List[Dict],
ws_url: str,
source_name: Literal["market", "trading"],
on_state_change: Optional[Callable] = None,
timeout: int = WS_TIMEOUT,
alert_callback: Optional[Callable] = None,
max_retries: int | None = WS_MAX_RETRIES,
alert_threshold: int = WS_ALERT_THRESHOLD,
):
if source_name not in ("market", "trading"):
raise ValueError(
f"source_name 非法: {source_name!r},仅允许 'market' 或 'trading'"
)
self.ws_url = ws_url
self._source_name = source_name
删除对 WS_URL 的 import 和 self.ws_url = WS_URL 赋值,其余 WS_TIMEOUT/WS_MAX_RETRIES/WS_ALERT_THRESHOLD 保留。
删除死事件发布:_publish_candle_event、_publish_orderbook_event、_publish_price_event 三个方法整体删除。在 _cache_latest_data() 的 candle/l2Book 分支中移除对前两个方法的调用,仅保留 TTLCache 写入;allMids 分支整体删除(未订阅,永远不会触发)。对应的事件类 CandleUpdatedEvent、OrderBookUpdatedEvent、PriceUpdatedEvent 从 trading_events.py 一并删除,对应 import 同步清理(零发布 + 零消费 = 死代码)。
source 统一:所有剩余的 _event_bus.publish(..., source=...) 改为 source=self._source_name。涉及位置:_on_open(两处)、_on_close/_on_error、_publish_user_events、_publish_order_status_events、_publish_fill_events。
3.3 src/services/realtime_kline_service_base.py
导入
from src.config import (
WS_MARKET_URL, WS_TRADING_URL, WS_RECONNECT_VERIFY_DELAY, TRADING_NETWORK,
)
删除 from src.config import WS_URL(如存在)。
属性(__init__)
self.ws_market_manager: Optional[EnhancedWebSocketManager] = None
self.ws_trading_manager: Optional[EnhancedWebSocketManager] = None
self._trading_ws_thread: Optional[threading.Thread] = None
self.market_subscriptions = self._build_market_subscriptions()
self.trading_subscriptions = self._build_trading_subscriptions()
_build_market_subscriptions():仅返回 candle + l2Book,不含交易订阅,不访问 _executor/_wallet。
def _build_market_subscriptions(self) -> List[Dict]:
"""构建行情 WS 订阅列表(candle + l2Book)。"""
subscriptions = []
for symbol in self._get_all_symbols():
coin = symbol.split('/')[0]
for interval in ['5m', '1h', '4h']:
subscriptions.append({"type": "candle", "coin": coin, "interval": interval})
subscriptions.append({"type": "l2Book", "coin": coin})
return subscriptions
_get_all_symbols()须返回包含base_symbol在内的完整币种列表(见 D7),_build_market_subscriptions不做特殊分支。
_build_trading_subscriptions():区分"未启用"(返回空列表)与"已启用但配置错误"(抛异常 fail-fast)。
def _build_trading_subscriptions(self) -> List[Dict]:
"""构建交易 WS 订阅列表(orderUpdates + userFills + user)。"""
if not self._trading_orchestrator:
return []
user_address = self._trading_orchestrator.get_trading_ws_user_address()
if not user_address:
raise ValueError(
"交易模块已启用但无法获取 user address,检查钱包配置"
)
return [
{"type": "orderUpdates", "user": user_address},
{"type": "userFills", "user": user_address},
{"type": "user", "user": user_address},
]
_init_service_threads():入口 fail-fast 检查 + 行情 WS 引用通过 orchestrator 注入 executor,不经过模块级全局变量。
def _init_service_threads(self):
# --- fail-fast:交易已启用但 orchestrator 未初始化 = 调用顺序错误 ---
if self._trading_enabled and self._trading_orchestrator is None:
raise RuntimeError(
"交易已启用但 orchestrator 未初始化,必须先调用 _init_trading_module()"
)
# --- 行情 WS(主网固定) ---
self.ws_market_manager = EnhancedWebSocketManager(
subscriptions=self.market_subscriptions,
ws_url=WS_MARKET_URL,
source_name="market",
on_state_change=self.on_state_change,
timeout=WS_TIMEOUT,
alert_callback=self._send_system_alert,
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
)
self.ws_market_manager.add_message_callback(self.on_message)
# 注入行情 WS 引用
if self._trading_orchestrator:
self._trading_orchestrator.set_market_ws_manager(self.ws_market_manager)
# --- 交易 WS(随配置) ---
if self.trading_subscriptions:
self.ws_trading_manager = EnhancedWebSocketManager(
subscriptions=self.trading_subscriptions,
ws_url=WS_TRADING_URL,
source_name="trading",
on_state_change=lambda state, info=None: self.logger.info(
f"[交易WS] {state}" + (f" | {info}" if info else "")
),
timeout=WS_TIMEOUT,
alert_callback=lambda title, content: self._send_system_alert(
f"[交易WS] {title}", content
),
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
)
self.logger.info(
f"WS | 行情: {WS_MARKET_URL} | 交易: {WS_TRADING_URL} ({TRADING_NETWORK})"
)
else:
self.logger.info("交易订阅为空,仅启动行情 WS")
on_message():仅处理 K 线(去重 → 解析 → 入队),不含 orderUpdates/userFills/user 分支。
def on_message(self, msg: Dict):
if self._message_dedup.is_duplicate(msg):
return
kline = self._parse_kline(msg)
if not kline:
return
self.kline_buffer.put_nowait(kline)
生命周期
def start(self):
if self.ws_market_manager is None:
raise RuntimeError("必须先调用 _init_service_threads()")
# ... 工作线程启动 ...
if self.ws_trading_manager is not None:
self._trading_ws_thread = threading.Thread(
target=self.ws_trading_manager.start, daemon=True, name="trading-ws"
)
self._trading_ws_thread.start()
self.ws_market_manager.start() # 阻塞主线程
def stop(self):
if self._trading_orchestrator:
self._trading_orchestrator.stop()
if self.ws_trading_manager is not None:
self.ws_trading_manager.stop()
if self._trading_ws_thread and self._trading_ws_thread.is_alive():
self._trading_ws_thread.join(timeout=5)
if self._trading_ws_thread.is_alive():
self.logger.warning("交易 WS 线程在 5s 内未退出")
self.ws_market_manager.stop()
动态订阅:add_subscriptions 仅对 self.ws_market_manager(Market WS)调用,用于运行时新增 candle/l2Book 订阅(如新币种上线)。交易订阅在初始化时一次性确定,无动态变更需求。
3.4 src/trading/orchestrator.py
新增两个接口方法。Orchestrator 是 Service 层与 Executor 层之间的唯一桥梁——这两个方法维护该边界。
def get_trading_ws_user_address(self) -> Optional[str]:
"""返回交易 WS 订阅所需的 user 地址。"""
if self._executor is None:
return None
return self._executor._wallet.address
def set_market_ws_manager(self, manager: "EnhancedWebSocketManager"):
"""注入行情 WS 引用,供 Executor 读取缓存行情数据。"""
if self._executor is not None:
self._executor.set_market_ws_manager(manager)
3.5 src/trading/executor.py
行情 WS 引用注入(替代全局 get_global_ws_manager()):
def set_market_ws_manager(self, manager: "EnhancedWebSocketManager"):
"""注入行情 WS 引用。由 Orchestrator 在 _init_service_threads 阶段调用。"""
self._market_ws_manager = manager
删除 from ... import get_global_ws_manager,所有原先调用 get_global_ws_manager() 的地方改为 self._market_ws_manager。
重连 handler——两条 WS 都会发出 WebSocketReconnectedEvent,Executor 只响应 trading 来源:
def _on_websocket_reconnected(self, event: WebSocketReconnectedEvent):
if event.source != "trading":
return
self.logger.warning(
f"交易WS重连 | 断连={event.downtime_seconds:.1f}s | 清空缓存+补查订单"
)
with self._cache_lock:
self._cached_positions = []
self._cached_account_value = 0.0
self._cached_available_balance = 0.0
self._position_cache_ts = 0.0
self._balance_cache_ts = 0.0
if self._ws_order_manager:
def _reconnect_verify():
try:
self._ws_order_manager.verify_pending_orders()
time.sleep(WS_RECONNECT_VERIFY_DELAY)
self._ws_order_manager.verify_pending_orders()
except Exception:
self.logger.exception("重连后订单验证失败")
threading.Thread(
target=_reconnect_verify, daemon=True, name="trading-reconnect-verify"
).start()
PositionUpdatedEvent / BalanceChangedEvent handler 不加 source guard(见 D4)。
3.6 src/events/base.py
Event.source 保持 str 类型不变(见 D5)。
@dataclass
class Event:
timestamp: datetime
source: str
priority: EventPriority = field(default=EventPriority.NORMAL)
metadata: Dict[str, Any] = field(default_factory=dict)
event_id: str = field(default="", init=False, repr=False)
4. 删除与验收清单
每行同时标注"删什么"和"怎么验",一张表零重复。
| 文件 | 删除项 | 替代方案 | 验收 grep(结果必须为零) |
|---|---|---|---|
| src/config.py | WS_URL = "wss://..." |
WS_MARKET_URL / WS_TRADING_URL |
\bWS_URL\b in src/ |
| src/trading/config.py | network_str = os.getenv("TRADING_NETWORK", ...) 及相关分支 |
from src.config import TRADING_NETWORK |
os.getenv.*TRADING_NETWORK in src/(config.py 除外) |
| enhanced_ws_manager.py | from src.config import WS_URL |
构造参数 ws_url |
\bWS_URL\b in src/ |
| enhanced_ws_manager.py | self.ws_url = WS_URL 硬编码 |
构造参数 ws_url |
同上 |
| enhanced_ws_manager.py | 所有 source="websocket"(约 9 处) |
source=self._source_name |
source=.*websocket in src/ |
| enhanced_ws_manager.py | _publish_candle_event() 方法 |
删除(零消费者) | _publish_candle_event in src/ |
| enhanced_ws_manager.py | _publish_orderbook_event() 方法 |
删除(零消费者) | _publish_orderbook_event in src/ |
| enhanced_ws_manager.py | _publish_price_event() 方法 |
删除(零订阅 + 零消费者) | _publish_price_event in src/ |
| enhanced_ws_manager.py | _cache_latest_data 中调用上述三方法的语句 |
仅保留 TTLCache 写入 | 同上 |
| enhanced_ws_manager.py | _cache_latest_data 中 allMids 分支 |
删除(未订阅) | allMids in enhanced_ws_manager.py |
| enhanced_ws_manager.py | from ... import CandleUpdatedEvent, OrderBookUpdatedEvent, PriceUpdatedEvent |
删除 import | CandleUpdatedEvent|OrderBookUpdatedEvent|PriceUpdatedEvent in src/ |
| trading_events.py | CandleUpdatedEvent 类定义 |
删除(零发布 + 零消费) | 同上 |
| trading_events.py | OrderBookUpdatedEvent 类定义 |
删除(零发布 + 零消费) | 同上 |
| trading_events.py | PriceUpdatedEvent 类定义 |
删除(零发布 + 零消费) | 同上 |
| realtime_kline_service_base.py | _global_ws_manager 模块级变量 |
依赖注入 | _global_ws_manager in src/ |
| realtime_kline_service_base.py | _global_ws_manager_lock 线程锁 |
随全局变量一并删除 | 同上 |
| realtime_kline_service_base.py | get_global_ws_manager() 函数 |
executor.set_market_ws_manager() |
get_global_ws_manager in src/ |
| realtime_kline_service_base.py | _build_subscriptions() 方法 |
_build_market_subscriptions() + _build_trading_subscriptions() |
_build_subscriptions[^_] in src/services/ |
| realtime_kline_service_base.py | self.subscriptions(无前缀) |
self.market_subscriptions |
self\.subscriptions\b in src/services/ |
| realtime_kline_service_base.py | self.ws_manager(无前缀) |
self.ws_market_manager |
self\.ws_manager\b in src/services/ |
| realtime_kline_service_base.py | _on_trading_state_change() 方法 |
内联 lambda | _on_trading_state_change in src/ |
| realtime_kline_service_base.py | _send_trading_alert() 方法 |
内联 lambda | _send_trading_alert in src/ |
| realtime_kline_service_base.py | _build_subscriptions 中 _executor._wallet.address 直接访问 |
get_trading_ws_user_address() |
_executor.*_wallet in src/services/ |
| executor.py | from ... import get_global_ws_manager |
self._market_ws_manager |
get_global_ws_manager in src/ |
| executor.py | 所有 get_global_ws_manager() 调用 |
self._market_ws_manager |
同上 |
| executor.py | _get_ws_manager() 方法 |
self._market_ws_manager |
_get_ws_manager in src/ |
补充校验(非删除项,但必须确认归零):
| grep 模式 | 范围 | 说明 |
|---|---|---|
assert source_name |
src/ | 应使用 ValueError,不用 assert |
orderUpdates|userFills |
on_message / _build_market_subscriptions 内 |
仅允许出现在 _build_trading_subscriptions 与 _cache_latest_data |
_get_ws_manager |
src/ | 方法已删除,应零引用 |
allMids |
enhanced_ws_manager.py | 分支已删除,应零引用 |
self\.ws_manager\b |
src/services/ | 应全部替换为 ws_market_manager 或 ws_trading_manager |
self\.subscriptions\b |
src/services/ | 应全部替换为 market_subscriptions 或 trading_subscriptions |
5. 设计决策记录
正文只写"做什么","为什么不做别的"集中在此。
D1:_cache_latest_data() 共享实现不拆分
_cache_latest_data() 包含 candle/l2Book/orderUpdates/userFills/user 全部频道的 if/elif 分支,双 WS 架构下每个实例约 50% 的分支不会被触发。保留共享实现——哪些分支被触发由 subscriptions(服务器推什么)决定,未命中的分支不执行、不影响正确性。拆分方案(子类/策略模式/handler map)均引入新抽象层,增加间接性但不增加正确性。
D2:删除 _publish_candle_event、_publish_orderbook_event、_publish_price_event 及对应事件类
代码审计确认 CandleUpdatedEvent、OrderBookUpdatedEvent、PriceUpdatedEvent 在全代码库零消费者——发布后无任何 handler 订阅。candle 数据的实际消费路径是 on_message → _parse_kline → kline_buffer → DB;l2Book 数据的实际消费路径是 _cache_latest_data → TTLCache → Executor 直接读取;allMids 未出现在任何订阅列表中,分支永远不会触发。发布方法、事件类定义、对应 import 一并删除,彻底消除死代码。
D3:TRADING_NETWORK=mainnet 时仍维持两条连接
代码路径统一,行情/交易隔离,代价仅多一条长连接。避免"同网络时合并为单连接"的条件分支。
D4:PositionUpdatedEvent / BalanceChangedEvent 不加 source guard
当前架构下只有 Trading WS 发布这两类事件,guard 会保护一个不可能的场景。如果未来架构变化导致 Market WS 也发布这些事件,届时再加 guard。
D5:Event.source 保持 str 不收窄
事件系统中存在非 WS 来源的事件(如 "http"、"internal"),将基类收窄为 Literal["market", "trading"] 会破坏全局事件系统。source 合法值约束仅在 EnhancedWebSocketManager.__init__ 施加。
D6:交易 WS 的 on_state_change / alert_callback 内联为 lambda
这两个回调体只有一行日志/告警转发,不值得独立为命名方法。内联在构造调用处,读者一眼可见完整行为。
D7:base_symbol 合并进 _get_all_symbols()
原 _build_market_subscriptions 中有 base_symbol 的额外检查分支——配对交易场景下 base_symbol 可能不在 _get_all_symbols() 返回值中。将合并逻辑下沉至 _get_all_symbols(),使其返回包含 base_symbol 在内的完整去重列表。_build_market_subscriptions 变为纯循环,零条件分支。
D8:重连验证——常量化 + 可观测化
交易 WS 重连后必须:①清空 position/balance 缓存(断连期间推送中断,缓存可能过期);②双重 verify_pending_orders()(间隔 WS_RECONNECT_VERIFY_DELAY 秒,防止最终一致性延迟导致首次查询遗漏状态变化)。验证线程命名为 trading-reconnect-verify,异常被捕获并记录日志——重连是低频事件,但未捕获的异常会导致线程静默死亡,丢失第二次验证。
D9:属性命名对称化
self.subscriptions → self.market_subscriptions,self.ws_manager → self.ws_market_manager。老名称源于单 WS 时代,在双 WS 架构下语义模糊(ws_manager 是哪个?)。既然原则是"老代码彻底删除",命名必须反映新架构。改名是纯机械操作,grep 验收确保零残留。
6. 验收测试
| # | 场景 | 预期 |
|---|---|---|
| 1 | TRADING_NETWORK=testnet |
行情 WS 连主网,交易 WS 连测试网 |
| 2 | 测试网下单 | 通过交易 WS 收到 orderUpdates/userFills |
| 3 | 交易 WS 重连 | 自动重订阅 + Executor 触发 verify_pending_orders |
| 4 | 行情 WS 重连 | 自动重订阅 + Executor 不触发订单补查 |
| 5 | TRADING_ENABLED=false |
仅行情 WS,无交易 WS |
| 6 | TRADING_NETWORK=mainnet |
两 WS 连同一地址,功能正常 |
| 7 | TRADING_NETWORK=invalid |
回退 mainnet + warning 日志 |
| 8 | 交易模块已启用但钱包地址为空 | _build_trading_subscriptions 抛 ValueError,进程启动失败 |
| 9 | Trading WS 构造异常 | 异常上抛,进程启动失败(fail-fast) |
| 10 | 交易已启用但 _init_trading_module() 未调用 |
_init_service_threads 抛 RuntimeError |
| 11 | 两 WS 同时高频推送 5 分钟 | 事件被消费者正确处理,无丢失、无异常日志 |
| 12 | 全局 grep 第 4 节验收列 | 全部为零 |