双WebSocket架构设计2
双 WebSocket 架构设计文档(K 线主网 + 订单/用户推送随配置)
文档版本:3.0 | 最后更新:2026-02-22
3.0 变更:架构仅保留双 WebSocket,移除单 WS 回滚(DUAL_WS_ENABLED、_build_trading_subscriptions_legacy、_handle_trading_channel 等);Executor 直接按source="trading"过滤;同步修正 2.2 OrderFilledEvent 表述、缓冲区 1000 条 error 告警、config 行号与构造函数参数说明。
1. 目标与约束
需求
- K 线与 L2 订单簿:始终连接主网 WebSocket(
wss://api.hyperliquid.xyz/ws),与TRADING_NETWORK无关。 - orderUpdates / userFills / user:连接随
TRADING_NETWORK变化的 WebSocket(testnet 时wss://api.hyperliquid-testnet.xyz/ws,mainnet 时与主网一致),与下单/查单网络保持一致。
非目标
- 不改变现有交易 HTTP API、订单跟踪逻辑、WebSocketOrderManager 行为;仅改变「订单/用户推送」的数据来源连接。
安全默认值
TRADING_NETWORK默认值为"mainnet",防止忘记配置时静默连到测试网下单。
架构范围:本架构仅采用双 WebSocket(行情 WS + 交易 WS),不考虑单 WS 回滚。
背景:当前为单 WebSocket 连接主网;当 TRADING_NETWORK=testnet 时订单在测试网提交,但订单/用户推送仍来自主网连接,导致收不到测试网 orderUpdates/userFills,持仓/余额数据也与测试网不一致。本设计通过拆分为 Market WS(主网固定)+ Trading WS(随配置)解决该问题。
2. 现状分析
2.1 单 WebSocket 架构(当前)
src/config.py:104:WS_URL = "wss://api.hyperliquid.xyz/ws",写死主网。src/services/realtime_kline_service_base.py的_build_subscriptions()(第 401~459 行)一次性构建:candle + l2Book +(若启用交易)orderUpdates、userFills、user。_init_service_threads()(第 306~314 行):只创建一个EnhancedWebSocketManager(self.subscriptions, ...),并add_message_callback(self.on_message)。src/utils/websocket/enhanced_ws_manager.py:275:self.ws_url = WS_URL,无参数化。
因此:K 线、L2、订单推送共用一个主网连接。当 TRADING_NETWORK=testnet 时,订单在测试网下单,但推送仍来自主网连接,导致:
- 收不到测试网订单的 orderUpdates/userFills,只能依赖 HTTP 兜底
userchannel 的持仓/余额是主网数据,与测试网实际账户不一致
2.2 消息处理调用链(实测)
通过阅读源码确认,EnhancedWebSocketManager._wrapped_callback() 的实际调用顺序为:
WebSocket 收到消息
→ _on_message() [解析 JSON]
→ _wrapped_callback()
1. health_monitor.on_message() # 健康监控
2. _cache_latest_data(msg) # 缓存数据 + 发布 EventBus 事件 ← 先
3. for cb in message_callbacks: cb(msg) # 触发外部回调 ← 后
关键结论:当外部回调(on_message / _on_trading_message)被调用时,_cache_latest_data 已经执行完毕,EventBus 事件已经发布。因此:
userchannel 消息:EventBus 已发布PositionUpdatedEvent/OrderFilledEvent/BalanceChangedEvent,外部回调无需重复处理。candle/l2Book消息:EventBus 已发布对应事件,on_message负责的是业务逻辑(K 线入队分析),而不是事件发布。
2.3 事件发布全景
EnhancedWebSocketManager 通过 _cache_latest_data() 和 _on_open() 发布以下事件到全局 EventBus:
| 触发时机 | 事件类型 | 关键字段 |
|---|---|---|
_on_open 重连时 |
WebSocketReconnectedEvent |
downtime_seconds, source |
_on_open 首连时 |
WebSocketConnectedEvent |
connection_id, source |
_on_close 断连时 |
WebSocketDisconnectedEvent |
reason, reconnect_scheduled |
user 频道 assetPositions |
PositionUpdatedEvent |
positions, account_value, margin_summary |
user 频道 fills |
OrderFilledEvent |
order_id, filled_qty, filled_price, fee |
user 频道 marginSummary |
BalanceChangedEvent |
available_balance, total_balance, margin_used |
| candle 频道 | CandleUpdatedEvent |
K 线数据 |
| l2Book 频道 | OrderBookUpdatedEvent |
订单簿数据 |
所有事件的 source 字段当前固定为 "websocket"。双 WS 架构下需要区分来源:Market WS 发布的事件 source="market",Trading WS 发布的事件 source="trading"。
OrderFilledEvent 当前由 EnhancedWebSocketManager 发布;若未来其他模块订阅该事件,同样须仅处理 source="trading",避免误用行情连接来源。
2.4 缓冲区回放机制(实测)
_order_msg_buffer 是 deque(maxlen=500),已有两套回放机制:
- 即时回放:
on_message()每次被调用时,检查缓冲区并尝试回放(利用任意新消息到达的时机)。 - 周期回放:
_periodic_buffer_flush()独立线程,每 2 秒主动检查一次缓冲区。
双 WS 拆分后,_order_msg_buffer 是 service 级别的共享属性,与哪个 WS 收到消息无关。_buffer_flush_thread 无需复制,仍可服务于交易 WS 的缓冲区。
2.5 userFills 与 user.fills 重叠分析
当前订阅了两个相关频道:
userFillschannel →on_message()→WebSocketOrderManager.handle_message()→ 内部订单状态更新userchannel(含fills字段) →_cache_latest_data()→_publish_user_events()→OrderFilledEvent(已发布,供未来订阅方使用);Executor 的缓存刷新主要由PositionUpdatedEvent/BalanceChangedEvent驱动
两者不重叠:
userFills是 WebSocketOrderManager 使用的原始成交流水,用于维护订单状态机。user.fills是账户级聚合推送,触发 Executor 的持仓/余额缓存刷新。
双 WS 架构下,两者都从 Trading WS 来,处理路径保持不变,无重复处理问题。
2.6 EventBus 单例
EventBus 是线程安全单例(src/events/event_bus.py:18-30):
class EventBus:
_instance: "EventBus | None" = None
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
影响:
EnhancedWebSocketManager.__init__第 294 行:self._event_bus = EventBus()→ 拿到全局单例Executor.__init__第 86 行:self._event_bus = EventBus()→ 拿到同一个单例- 即:所有 WS manager 和 executor 共享同一个 EventBus,不存在"两条总线独立"的问题
EventBus.subscribe()自带同一 handler 去重(第 51 行:if handler in handlers: return),不会因重复调用而注册多次
结论:无需"注册第二个 bus";只需通过事件 source 字段区分来源。executor 的 _subscribe_events() 已在全局单例上订阅,天然能收到两个 WS manager 发布的事件。
3. 目标架构
flowchart LR
subgraph config [Config]
WS_URL["WS_URL (主网固定)"]
WS_TRADING_URL["WS_TRADING_URL (随 TRADING_NETWORK)"]
end
subgraph market [Market WS — 主网]
Mgr1["EnhancedWebSocketManager\nsource='market'"]
Mgr1 -->|订阅| Sub1[candle / l2Book]
Mgr1 -->|wss://api.hyperliquid.xyz/ws| HL_M[Hyperliquid Mainnet]
end
subgraph trading [Trading WS — 随配置]
Mgr2["EnhancedWebSocketManager\nsource='trading'"]
Mgr2 -->|订阅| Sub2[orderUpdates / userFills / user]
Mgr2 -->|WS_TRADING_URL| HL_T[Hyperliquid Testnet/Mainnet]
end
config --> Mgr1
config --> Mgr2
Mgr1 -->|on_message| Kline[K 线 + L2 处理]
Mgr1 -->|EventBus source=market| EB[全局 EventBus]
Mgr2 -->|_on_trading_message| OrderMgr[WebSocketOrderManager]
Mgr2 -->|EventBus source=trading| EB
EB -->|过滤 source=trading| Exec[Executor]
核心设计决策:
- Market WS:仅订阅 candle、l2Book;URL 固定主网;
source="market"。 - Trading WS:仅订阅 orderUpdates、userFills、user;URL 为
WS_TRADING_URL;source="trading"。 - EventBus 不改:保持全局单例,用事件
source字段区分来源;executor 的事件 handler 仅处理source="trading"的事件。 - 无需
register_trading_ws_event_bus:单例 EventBus 天然共享,无需额外注册。
4. 数据流
4.1 行情(Market WS)
主网 WS → _wrapped_callback()
1. _cache_latest_data() → CandleUpdatedEvent(source="market") → EventBus
2. on_message() → K 线解析 → kline_buffer → 分析入队
get_global_ws_manager() 仍返回该 manager,L2 缓存基于主网数据,供 executor 的 get_all_mids() 等读取。
4.2 订单与用户(Trading WS)
交易网 WS → _wrapped_callback()
1. _cache_latest_data():
- orderUpdates/userFills → 无 EventBus 事件(由外部回调处理)
- user channel → _publish_user_events():
PositionUpdatedEvent(source="trading") → EventBus → Executor(过滤通过)
OrderFilledEvent(source="trading") → EventBus → Executor(过滤通过)
BalanceChangedEvent(source="trading") → EventBus → Executor(过滤通过)
2. _on_trading_message():
- user channel → 已由步骤1处理,此处显式跳过(避免重复)
- 缓冲区即时回放(_periodic_buffer_flush 每 2 秒兜底)
- orderUpdates/userFills → WebSocketOrderManager.handle_message()
4.3 重连
Trading WS 重连 → WebSocketReconnectedEvent(source="trading")
→ Executor._on_websocket_reconnected() 检查 source=="trading"
→ 触发 verify_pending_orders 双轮补查 ✅
Market WS 重连 → WebSocketReconnectedEvent(source="market")
→ Executor._on_websocket_reconnected() 检查 source≠"trading"
→ 跳过,不触发订单补查 ✅
4.4 半连接降级
| 状态 | 系统行为 |
|---|---|
| Market WS 断开 + Trading WS 正常 | K 线/L2 缓存过期,executor 仍可下单(HTTP),L2 价格不更新;飞书告警 [行情WS] 断连 |
| Market WS 正常 + Trading WS 断开 | K 线正常,订单推送回退 HTTP 兜底;飞书告警 [交易WS] 断连;持仓/余额依赖 HTTP 查询 |
| 两者都断开 | 全面降级,所有功能依赖 HTTP;两个 WS 各自独立重连 |
5. 模块级设计
| 模块 | 职责变化 |
|---|---|
| config | 新增 WS_TRADING_URL(随 TRADING_NETWORK 推导) |
| EnhancedWebSocketManager | 在现有参数末尾新增可选 ws_url、source_name 参数;source_name 用于所有事件发布 |
| RealtimeKlineServiceBase | 拆分订阅;交易 WS 消息单独回调;双 manager 生命周期管理 |
| Executor | 三个事件 handler 增加 source 过滤,仅处理 source="trading" 的事件 |
6. 文件级实现规格
6.1 src/config.py
位置:WebSocket 配置块(第 100~120 行)之后。
# 交易推送 WebSocket 地址(随 TRADING_NETWORK 切换)
_trading_network = os.getenv("TRADING_NETWORK", "mainnet").lower()
WS_TRADING_URL = os.getenv("WS_TRADING_URL") or (
"wss://api.hyperliquid-testnet.xyz/ws"
if _trading_network == "testnet"
else "wss://api.hyperliquid.xyz/ws"
)
说明:
TRADING_NETWORK默认"mainnet"(安全默认值,避免忘记配置时静默连测试网下单)。WS_TRADING_URL支持环境变量直接覆盖(便于特殊部署/调试)。- 可选:在 config 加载后校验
TRADING_NETWORK in ("mainnet", "testnet");若为非法值则打 warning 并回退为 mainnet。 - 行数:约 +6。
6.2 src/utils/websocket/enhanced_ws_manager.py
6.2.1 构造函数扩展
位置:__init__ 签名(第 229 行)与 self.ws_url 赋值(第 275 行)。在现有参数列表末尾(含 skip_disconnects 等)新增 ws_url、source_name:
def __init__(
self,
subscriptions: List[Dict],
on_state_change: Optional[Callable] = None,
timeout: int = WS_TIMEOUT,
skip_disconnects: bool = False,
alert_callback: Optional[Callable] = None,
max_retries: int | None = WS_MAX_RETRIES,
alert_threshold: int = WS_ALERT_THRESHOLD,
ws_url: Optional[str] = None, # ← 新增
source_name: str = "websocket", # ← 新增
):
...
self.ws_url = ws_url if ws_url is not None else WS_URL # ← 修改
self._source_name = source_name # ← 新增
...
logger.info(
f"✅ EnhancedWebSocketManager 已初始化 | "
f"source={self._source_name} | url={self.ws_url}"
)
6.2.2 事件 source 字段参数化
将所有 self._event_bus.publish(...) 中的 source="websocket" 替换为 source=self._source_name:
| 方法 | 约行号 | 事件类型 |
|---|---|---|
_on_open |
638 | WebSocketReconnectedEvent |
_on_open |
646 | WebSocketConnectedEvent |
_on_close/_on_error |
~738 | WebSocketDisconnectedEvent |
_cache_latest_data |
~1346 | CandleUpdatedEvent |
_cache_latest_data |
~1366 | OrderBookUpdatedEvent |
_cache_latest_data |
~1380 | PriceUpdatedEvent |
_publish_user_events |
1401 | PositionUpdatedEvent |
_publish_user_events |
1413 | OrderFilledEvent |
_publish_user_events |
1435 | BalanceChangedEvent |
共 9 处,均为单行替换:source="websocket" → source=self._source_name。
行数:约 +4 行新增,9 行修改。
6.3 src/services/realtime_kline_service_base.py
6.3.1 属性初始化(__init__ 第 193 行附近)
在 __init__ 中总是初始化以下属性,避免后续 hasattr 检查:
# 双 WS 架构(总是初始化,有交易订阅时创建交易 WS 实例)
self.ws_trading_manager: Optional[EnhancedWebSocketManager] = None
self._trading_ws_thread: Optional[threading.Thread] = None
6.3.2 订阅拆分
_build_subscriptions()(第 401 行):删除第 443~457 行的交易订阅逻辑,仅保留 candle + l2Book。
新增 _build_trading_subscriptions() -> List[Dict]:
def _build_trading_subscriptions(self) -> List[Dict]:
"""构建交易 WS 订阅列表(orderUpdates / userFills / user)。
仅当交易模块启用时返回非空列表。
"""
from src.config import WS_TRADING_URL
try:
executor = self._trading_orchestrator._executor
except AttributeError:
executor = None
if executor is None:
self.logger.info("交易模块未启用,跳过交易 WS 订阅构建")
return []
user_address = executor._wallet.address
subs = [
{"type": "orderUpdates", "user": user_address},
{"type": "userFills", "user": user_address},
{"type": "user", "user": user_address},
]
self.logger.info(
f"✅ 交易 WS 订阅已构建 | url={WS_TRADING_URL} | "
f"user={user_address[:10]}... | 订阅数={len(subs)}"
)
return subs
6.3.3 __init__ 订阅初始化(第 207 行附近)
self.subscriptions = self._build_subscriptions()
self.trading_subscriptions = self._build_trading_subscriptions()
self.logger.info(
f"订阅数量 | 行情={len(self.subscriptions)} / 交易={len(self.trading_subscriptions)}"
)
6.3.4 _init_service_threads()(第 277 行)
def _init_service_threads(self):
from src.config import WS_TRADING_URL
# —— 行情 WS(主网固定,source="market")——
self.ws_manager = EnhancedWebSocketManager(
subscriptions=self.subscriptions,
on_state_change=self.on_state_change,
timeout=WS_TIMEOUT,
alert_callback=self._send_system_alert,
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
source_name="market",
)
self.ws_manager.add_message_callback(self.on_message)
_set_global_ws_manager(self.ws_manager) # L2 缓存仍基于行情 WS
# —— 交易 WS(随配置,source="trading")——
if self.trading_subscriptions:
self.ws_trading_manager = EnhancedWebSocketManager(
subscriptions=self.trading_subscriptions,
on_state_change=self._on_trading_state_change,
timeout=WS_TIMEOUT,
alert_callback=self._send_trading_alert,
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
ws_url=WS_TRADING_URL,
source_name="trading",
)
self.ws_trading_manager.add_message_callback(self._on_trading_message)
self.logger.info(
f"🌐 WS 连接配置 | 行情: {WS_URL} (主网固定)"
f" | 交易: {WS_TRADING_URL} (TRADING_NETWORK={os.getenv('TRADING_NETWORK','mainnet')})"
)
else:
self.logger.info("✅ 交易订阅为空(TRADING_ENABLED=false),仅启动行情 WS")
6.3.5 交易 WS 消息回调 _on_trading_message
def _on_trading_message(self, msg: Dict):
"""交易 WS 消息回调。
注意:调用本方法时,_cache_latest_data() 已执行完毕:
- user channel 的 EventBus 事件(PositionUpdatedEvent 等)已发布
- 本方法只需处理需要路由到 WebSocketOrderManager 的消息
"""
channel = msg.get("channel") or msg.get("type", "")
# user channel 已由 _cache_latest_data → _publish_user_events 处理,跳过
if channel == "user":
return
# orderUpdates / userFills → WebSocketOrderManager
if channel in ("orderUpdates", "userFills"):
# 即时缓冲区回放(_periodic_buffer_flush 每 2 秒兜底,此处为即时优化)
if self._order_msg_buffer:
mgr = self._get_ws_order_manager()
if mgr is not None:
while True:
try:
buffered = self._order_msg_buffer.popleft()
mgr.handle_message(buffered)
except IndexError:
break
mgr = self._get_ws_order_manager()
if mgr is not None:
try:
mgr.handle_message(msg)
except Exception as e:
self.logger.error(f"处理交易WS订单消息失败: {e}", exc_info=True)
else:
self._order_msg_buffer.append(msg)
buf_size = len(self._order_msg_buffer)
if buf_size >= 1000:
self.logger.error(f"订单消息缓冲区异常膨胀({buf_size}条),订单管理器可能初始化异常")
elif buf_size >= 100 and buf_size % 100 == 0:
self.logger.warning(f"订单消息缓冲区较大({buf_size}条),等待 WebSocketOrderManager 就绪")
缓冲区归属:双 WS 架构下,订单相关消息仅经 _on_trading_message 入队与回放;行情 on_message 不再写入 _order_msg_buffer。
6.3.6 交易 WS 状态回调 _on_trading_state_change
def _on_trading_state_change(self, state: str, info: Optional[str] = None):
"""交易 WS 连接状态变化回调,复用行情 WS 的 on_state_change 签名。"""
self.logger.info(f"[交易WS] 状态变化: {state}" + (f" | {info}" if info else ""))
# 可按需扩展:状态持久化、监控指标更新等
6.3.7 告警区分 _send_trading_alert
def _send_trading_alert(self, title: str, content: str):
"""交易 WS 告警,带来源前缀便于运维区分。"""
self._send_system_alert(f"[交易WS] {title}", content)
6.3.8 行情 on_message 精简
从现有 on_message()(第 606~673 行)中删除以下内容:
- 缓冲区回放逻辑(第 623~636 行)—— 已移至
_on_trading_message - orderUpdates/userFills 路由逻辑(第 639~660 行)—— 订单消息仅由交易 WS 的
_on_trading_message处理
行情 on_message 仅处理行情(K 线 + 去重),不再包含任何订单相关分支:
def on_message(self, msg: Dict):
if self._message_dedup.is_duplicate(msg):
return
# 行情 WS 只收到 candle / l2Book,仅做 K 线处理
kline = self._parse_kline(msg)
if not kline:
return
self.kline_buffer.put_nowait(kline)
6.3.9 生命周期管理
start()(第 2145 行附近):
def start(self):
... # 原有工作线程启动(含 _buffer_flush_thread,服务于共享 _order_msg_buffer)
# 交易 WS 在独立 daemon 线程启动(ws_manager.start() 是阻塞调用)
if self.ws_trading_manager is not None:
self._trading_ws_thread = threading.Thread(
target=self.ws_trading_manager.start,
daemon=True,
name="trading-ws",
)
self._trading_ws_thread.start()
self.logger.info("✅ 交易 WS 已在独立线程启动")
# 行情 WS 阻塞当前线程(保持原有行为)
self.ws_manager.start()
stop()(第 2185 行附近):
def stop(self):
# 1. 停止交易 orchestrator
if self._trading_orchestrator:
self._trading_orchestrator.stop()
# 2. 停止交易 WS(先于行情 WS)
if self.ws_trading_manager is not None:
self.ws_trading_manager.stop()
self.logger.info("✅ 交易 WS stop() 已调用")
# 等待线程退出(最多 5 秒,避免 daemon 线程被强杀时消息丢失)
if self._trading_ws_thread is not None and self._trading_ws_thread.is_alive():
self._trading_ws_thread.join(timeout=5)
if self._trading_ws_thread.is_alive():
self.logger.warning("交易 WS 线程在 5s 内未退出,继续停止流程")
else:
self.logger.info("✅ 交易 WS 线程已退出")
# 3. 停止行情 WS(保持原有逻辑)
self.ws_manager.stop()
...
6.3.10 动态订阅(新币种)
第 2068~2073 行:add_subscriptions 仅对 self.ws_manager 调用(只加 candle/l2Book),不涉及交易 WS。无需改动。
6.3.11 导入
from src.config import WS_TRADING_URL
行数变化:约 +70~90(新增方法),-20~25(删除旧订单路由),净增约 50~65 行。
6.4 src/trading/executor.py
位置:事件 handler 方法(第 1754~1794 行)。
def _on_position_updated(self, event: PositionUpdatedEvent):
if event.source != "trading":
return # 仅处理交易 WS 的持仓事件
# ... 原有逻辑不变 ...
def _on_balance_changed(self, event: BalanceChangedEvent):
if event.source != "trading":
return # 仅处理交易 WS 的余额事件
# ... 原有逻辑不变 ...
def _on_websocket_reconnected(self, event: WebSocketReconnectedEvent):
if event.source != "trading":
return # 仅交易 WS 重连时触发订单补查
# ... 原有逻辑不变 ...
说明:
- 仅处理
source="trading"的事件,行情 WS 发布的事件(source="market")一律忽略。 - 无需新增
register_trading_ws_event_bus方法:EventBus 是全局单例,executor 已在_subscribe_events()中订阅,天然能收到交易 WS 发布的事件。 OrderFilledEvent若被其他模块订阅,同样须仅处理source="trading",避免误用行情来源数据。- 行数:约 +3(每个 handler 加 1 行判断)。
7. 初始化时序
__init__
├── [194] _init_trading_module() → TradingOrchestrator + Executor 初始化
│ └── Executor._subscribe_events() → 在全局 EventBus 上订阅 3 类事件
├── [+] self.ws_trading_manager = None → 总是初始化(防御性)
├── [+] self._trading_ws_thread = None → 总是初始化(防御性)
├── [207] _build_subscriptions() → 行情订阅 (candle + l2Book)
├── [+] _build_trading_subscriptions() → 交易订阅(交易启用时非空)
└── [221] _init_service_threads()
├── ws_manager (行情, source="market")
└── ws_trading_manager (交易, source="trading") ← 有交易订阅时创建
start()
├── 工作线程启动(含 _buffer_flush_thread,服务于共享 _order_msg_buffer)
├── ws_trading_manager.start() ← daemon 线程(若存在,先启动)
└── ws_manager.start() ← 阻塞主线程
stop()
├── _trading_orchestrator.stop()
├── ws_trading_manager.stop() ← 先停交易 WS
├── _trading_ws_thread.join(timeout=5) ← 等待线程退出(优雅关闭)
├── ws_manager.stop() ← 再停行情 WS
└── 等待队列清空
关键保证:
_init_trading_module()(第 194 行)在_init_service_threads()(第 221 行)之前执行,创建ws_trading_manager时_trading_orchestrator._executor已存在。ws_trading_manager和_trading_ws_thread总在__init__中初始化为None,无需hasattr检查。- Executor 的
_subscribe_events()在 executor 构造时就已执行,早于任何 WS manager 创建,不会遗漏事件。 _buffer_flush_thread(周期性缓冲区回放)服务于 service 级别的self._order_msg_buffer,与哪个 WS 收到消息无关,无需复制。
8. 边界与注意事项
8.1 get_global_ws_manager
始终返回行情 WS(主网),L2 缓存与 executor 的 get_all_mids() 等仍基于主网数据,符合「K 线/L2 始终主网」原则。
8.2 消息去重
on_message 中的 MessageDeduplicator 仅作用于行情消息。_on_trading_message 不调用 _message_dedup:
- 交易消息量远小于行情消息,去重收益低。
WebSocketOrderManager内部有 oid 级去重。- 若后续需要,可为交易 WS 单独实例化
MessageDeduplicator(传入不同的 namespace key)。
8.3 并发安全
self._order_msg_buffer是deque(maxlen=500),Python 中deque.append()和deque.popleft()在 CPython 中是原子操作,多线程安全。_on_trading_message(交易 WS 线程)和_buffer_flush_thread(独立线程)都会popleft(),由于deque的原子性不会产生数据竞争,可能发生"竞争消耗"——这是预期行为(两者都在尽力回放)。ws_trading_manager.stop()后_trading_ws_thread.join(timeout=5)确保线程退出后再停行情 WS,避免_order_msg_buffer仍在被消费时行情 WS 就关闭。
8.4 连接失败与配置校验
- Trading WS 连接失败:行为与现有 EnhancedWebSocketManager 一致(重试 + 飞书告警);不阻塞行情 WS,系统以降级模式运行(订单依赖 HTTP 兜底)。
- 配置校验(可选):在 config 加载后校验
TRADING_NETWORK in ("mainnet", "testnet");若为非法值则打 warning 并回退为 mainnet。也可对WS_TRADING_URL做 URL 格式校验,非法时 fail fast 或 fallback 到由 TRADING_NETWORK 推导的默认值。
8.5 start() 阻塞问题
EnhancedWebSocketManager.start() 是阻塞调用,因此:
- 交易 WS 必须在独立 daemon 线程启动。
- 行情 WS 保持在主线程阻塞(与当前行为一致)。
stop()时两个 manager 的stop_event独立,互不影响。
8.6 mainnet=mainnet 场景
TRADING_NETWORK=mainnet 时,WS_TRADING_URL = WS_URL,两个 manager 连接同一地址,建立两条独立连接。功能正常,逻辑分离清晰,无副作用。
9. 监控指标
双 WS 架构新增以下日志/指标,便于运维监控:
| 指标 | 来源 | 告警条件 |
|---|---|---|
market_ws_state |
Market WS on_state_change |
连接断开 >30s |
trading_ws_state |
Trading WS _on_trading_state_change |
连接断开 >10s |
order_msg_buffer_size |
_on_trading_message 中的 len(_order_msg_buffer) |
>100 条 |
trading_ws_thread_alive |
stop() 中的 join 结果 |
join 超时 |
启动时打印一条综合日志(在 _init_service_threads() 末尾):
🌐 WS 连接配置
行情 WS: wss://api.hyperliquid.xyz/ws (主网固定)
交易 WS: wss://api.hyperliquid-testnet.xyz/ws (TRADING_NETWORK=testnet)
两个 manager 通过 source_name 和 _send_trading_alert 前缀区分日志与告警,若需指标系统,可为两个 manager 打不同标签(ws_source=market / ws_source=trading)。
10. 测试验证清单
功能测试
| # | 场景 | 预期 |
|---|---|---|
| 1 | TRADING_NETWORK=testnet |
行情 WS 连主网,交易 WS 连测试网,日志确认两个 URL |
| 2 | 测试网下单后 | 能通过交易 WS 收到 orderUpdates/userFills |
| 3 | 交易 WS 重连 | executor 触发 verify_pending_orders 双轮补查 |
| 4 | 行情 WS 重连 | executor 不触发订单补查 |
| 5 | TRADING_ENABLED=false |
仅一个行情 WS,无交易 WS,无报错 |
| 6 | TRADING_NETWORK=mainnet |
两个 WS 连同一地址,功能正常(逻辑分离) |
| 7 | 交易 WS 断开期间下单 | HTTP 兜底正常,重连后推送恢复 |
单元测试(关键场景)
| # | 测试点 | 验证方式 |
|---|---|---|
| T1 | source="market" 的 WebSocketReconnectedEvent |
Executor verify_pending_orders 不触发 |
| T2 | source="trading" 的 PositionUpdatedEvent |
Executor 缓存正确更新 |
| T3 | source="market" 的 PositionUpdatedEvent |
Executor 缓存不更新 |
| T4 | _on_trading_message 收到 user channel |
不调用 WebSocketOrderManager,无副作用 |
| T5 | 缓冲区回放:先缓冲后就绪 | _periodic_buffer_flush 在 2 秒内清空缓冲区 |
压力/并发测试
| # | 场景 | 预期 |
|---|---|---|
| P1 | 两个 WS 同时高频推送消息 | 无消息串扰,行情/交易各自独立处理 |
| P2 | 交易 WS 线程与 _buffer_flush_thread 并发消费 _order_msg_buffer |
无数据竞争,消息被且仅被处理一次(WebSocketOrderManager 内部 oid 去重保障) |
| P3 | stop() 时交易 WS 仍有未处理消息 |
join(timeout=5) 后消息被合理丢弃或记录 |
11. 改动量汇总
| 文件 | 新增 | 修改/删除 | 合计(约) |
|---|---|---|---|
| src/config.py | +6 | 0 | 6 |
| src/utils/websocket/enhanced_ws_manager.py | +4 | ~12(9处source替换+3处签名) | ~16 |
| src/services/realtime_kline_service_base.py | +70~90 | -20~25(删除旧订单路由) | 50~65 |
| src/trading/executor.py | +3 | 0 | 3 |
| 合计 | — | — | 约 75~90 |
12. 依赖与兼容性
-
不新增第三方依赖。
-
Hyperliquid 测试网 WS 地址:
wss://api.hyperliquid-testnet.xyz/ws(官方文档)。 -
Event 基类
source: str字段已存在(src/events/base.py:39),无需修改事件定义。 -
EventBus.subscribe()自带 handler 去重(第 51 行),不会因多次调用重复注册。 -
.env.example新增:# 交易网络配置(mainnet/testnet) # 影响下单/查单 HTTP API 和订单推送 WS 地址 TRADING_NETWORK=mainnet # 手动覆盖交易推送 WS 地址(可选,默认由 TRADING_NETWORK 推导) # WS_TRADING_URL=wss://api.hyperliquid-testnet.xyz/ws
13. 与原方案的差异对照
| 维度 | 原方案 | 当前方案 |
|---|---|---|
| EventBus 处理 | 提议 register_trading_ws_event_bus() 注册第二个 bus |
利用全局单例 + source 字段过滤,无需额外注册 |
| 重连误触发 | 未处理 Market WS 重连也触发补查的问题 | executor handler 过滤 source≠"trading" 的事件 |
| 事件来源标识 | 所有事件 source="websocket" |
Market WS source="market",Trading WS source="trading" |
user channel 处理说明 |
注释说"由内部处理",调用顺序不明 | 明确 _cache_latest_data 先于外部回调,_on_trading_message 中显式 return |
userFills 与 user.fills |
未分析两者关系 | 明确两者不重叠,路径独立 |
| 缓冲区回放 | 每条消息触发,周期线程归属不明 | 明确两套机制共存合理,_buffer_flush_thread 无需复制 |
stop() 线程等待 |
未包含 _trading_ws_thread.join() |
新增 join(timeout=5) 优雅关闭 |
ws_trading_manager 初始化 |
依赖 hasattr 检查 |
总在 __init__ 中初始化为 None |
TRADING_NETWORK 默认值 |
"testnet" |
"mainnet"(安全默认值) |
| 半连接降级 | 未讨论 | 第 4.4 节明确三种降级场景 |
start() 阻塞 |
未提及 | 交易 WS 在独立 daemon 线程启动 |
| executor 事件过滤 | 未区分来源 | 仅处理 source="trading" 的事件 |
_on_trading_state_change |
提及但未定义 | 提供完整实现规格 |
| 告警区分 | 简单提及共用或包装 | 明确 _send_trading_alert 带前缀,第 8 节说明区分方式 |
行情 on_message 精简 |
含订单路由逻辑 | 仅处理 K 线;订单消息仅由交易 WS 处理 |
| 动态订阅(新币种) | 未明确 | 第 6.3.10 节明确仅对行情 WS 调用 |
| 事件类型覆盖 | 仅提 3 种 | 第 2.3 节列出全部 9 种事件及 source 替换 |
| 监控指标 | 无 | 第 9 节,4 项关键指标 |
| 测试清单 | 7 个功能场景 | 7 个功能 + 5 个单元 + 3 个压力场景 |