双WebSocket架构设计9
双 WebSocket 架构设计文档(K 线主网 + 订单/用户推送随配置)
1. 目标与约束
需求
- K 线与 L2 订单簿:始终连接主网 WebSocket(
wss://api.hyperliquid.xyz/ws),与TRADING_NETWORK无关。 - orderUpdates / userFills / user:连接随
TRADING_NETWORK变化的 WebSocket(testnet 时wss://api.hyperliquid-testnet.xyz/ws,mainnet 时与主网一致),与下单/查单网络保持一致。
非目标
- 不改变现有交易 HTTP API、订单状态机与 WebSocketOrderManager 的对外行为;仅改变「订单/用户推送」的数据来源:从单一主网 WS 改为行情 WS(主网)+ 交易 WS(随配置)。
实现原则(必守)
- 按重写执行:相关方法按下文「契约与删除清单」重写,不以「在第 X 行插入/删除」的补丁方式实现;实现后单 WS 与订单同网的任何分支、注释零残留。
- 不保留兼容路径:不保留「若未传
ws_url则用全局 URL」「若未传source_name则用websocket」的默认值;两处创建 EnhancedWebSocketManager 时必须显式传入ws_url与source_name。 - 事件 source 合法取值仅两种:
"market"、"trading"。实现后全局禁止再出现source="websocket"。
安全默认值
TRADING_NETWORK默认值为"mainnet",防止忘记配置时静默连到测试网下单。
架构范围:本架构仅采用双 WebSocket(行情 WS + 交易 WS),不考虑单 WS 回滚。
背景:当前为单 WebSocket 连接主网;当 TRADING_NETWORK=testnet 时订单在测试网提交,但订单/用户推送仍来自主网连接,导致收不到测试网 orderUpdates/userFills,持仓/余额数据也与测试网不一致。本设计通过拆分为 Market WS(主网固定)+ Trading WS(随配置)解决该问题。
2. 现状分析
2.1 单 WebSocket 架构(当前)
src/config.py:WS_URL写死主网,唯一 WS 地址。_build_subscriptions()一次性构建:candle + l2Book +(若启用交易)orderUpdates、userFills、user。_init_service_threads():只创建一个EnhancedWebSocketManager,绑定单一回调。EnhancedWebSocketManager:self.ws_url = WS_URL,无参数化。
因此:K 线、L2、订单推送共用一个主网连接。当 TRADING_NETWORK=testnet 时,订单在测试网下单,但推送仍来自主网连接,导致收不到测试网 orderUpdates/userFills,user 频道持仓/余额与测试网不一致。
2.2 消息处理调用链
EnhancedWebSocketManager._wrapped_callback() 的调用顺序:
WebSocket 收到消息
→ _on_message() [解析 JSON]
→ _wrapped_callback()
1. health_monitor.on_message() # 健康监控
2. _cache_latest_data(msg) # 缓存 + 发布 EventBus 事件 ← 先
3. for cb in message_callbacks: cb(msg) # 触发外部回调 ← 后
关键结论:外部回调被调用时,_cache_latest_data 已执行完毕。因此:
- orderUpdates / userFills:在
_cache_latest_data内已调用_publish_order_status_events/_publish_fill_events,事件已发布到 EventBus;WebSocketOrderManager 通过订阅 OrderStatusEvent / OrderFilledEvent 接收,不存在「原始消息 → WebSocketOrderManager.handle_message」的路径(当前代码中无handle_message方法)。 - user channel:
_cache_latest_data已调用_publish_user_events(),PositionUpdatedEvent / BalanceChangedEvent 等已发布;Trading WS 不注册任何 message 回调,仅依赖_cache_latest_data发布事件即可。
2.3 事件发布全景
EnhancedWebSocketManager 通过 _cache_latest_data() 和 _on_open() 发布以下事件到全局 EventBus:
| 触发时机 | 事件类型 | 关键字段 |
|---|---|---|
_on_open 重连时 |
WebSocketReconnectedEvent |
downtime_seconds, source |
_on_open 首连时 |
WebSocketConnectedEvent |
connection_id, source |
_on_close 断连时 |
WebSocketDisconnectedEvent |
reason, reconnect_scheduled |
user 频道 assetPositions |
PositionUpdatedEvent |
positions, account_value, margin_summary |
user 频道 fills |
OrderFilledEvent |
order_id, filled_qty, filled_price, fee |
user 频道 marginSummary |
BalanceChangedEvent |
available_balance, total_balance, margin_used |
| orderUpdates 频道 | (内部)_publish_order_status_events → OrderStatusEvent |
供 WebSocketOrderManager 订阅 |
| userFills 频道 | (内部)_publish_fill_events → OrderFilledEvent |
供 WebSocketOrderManager 订阅 |
| candle 频道 | CandleUpdatedEvent |
K 线数据 |
| l2Book 频道 | OrderBookUpdatedEvent |
订单簿数据 |
双 WS 架构下 source 合法取值仅 "market" 与 "trading":Market WS 发布 source="market",Trading WS 发布 source="trading"。
2.4 EventBus 单例
EventBus 是线程安全单例。所有 WS manager 与 executor 共享同一 EventBus,无需「注册第二个 bus」。通过事件 source 区分来源;executor 仅处理 source="trading" 的事件。
2.5 userFills 与 user.fills 职责分工
- userFills:Trading WS 的
_cache_latest_data→_publish_fill_events→ OrderFilledEvent → WebSocketOrderManager 订阅,用于订单状态机与成交价。 - user.fills:同一连接下
_publish_user_events发布 OrderFilledEvent,供 Executor 等刷新持仓/余额缓存。
两者均来自 Trading WS,路径独立,不重叠。
实现后约定:第 2 节仅作背景说明;实现后代码中不得保留与单 WS、source=websocket 或「订单与行情同源」相关的分支或注释。
3. 目标架构
以下目标架构为唯一实现形态;实现后不存在单 WS 分支、不存在按配置切换单/双 WS 的逻辑。
flowchart LR
subgraph config [Config]
WS_MARKET_URL["WS_MARKET_URL (主网固定)"]
WS_TRADING_URL["WS_TRADING_URL (随 TRADING_NETWORK)"]
end
subgraph market [Market WS — 主网]
Mgr1["EnhancedWebSocketManager\nsource='market'"]
Mgr1 -->|订阅| Sub1[candle / l2Book]
Mgr1 -->|WS_MARKET_URL| HL_M[Hyperliquid Mainnet]
end
subgraph trading [Trading WS — 随配置]
Mgr2["EnhancedWebSocketManager\nsource='trading'"]
Mgr2 -->|订阅| Sub2[orderUpdates / userFills / user]
Mgr2 -->|WS_TRADING_URL| HL_T[Hyperliquid Testnet/Mainnet]
end
config --> Mgr1
config --> Mgr2
Mgr1 -->|on_message| Kline[K 线 + L2 处理]
Mgr1 -->|EventBus source=market| EB[全局 EventBus]
Mgr2 -->|_cache_latest_data 发布 无 message 回调| EB
EB -->|OrderStatusEvent/OrderFilledEvent| OrderMgr[WebSocketOrderManager]
EB -->|过滤 source=trading| Exec[Executor]
核心设计决策:
- Market WS:仅订阅 candle、l2Book;URL 固定主网(
WS_MARKET_URL);source="market"。 - Trading WS:仅订阅 orderUpdates、userFills、user;URL 为
WS_TRADING_URL;source="trading"。 - 订单/用户数据流:仅通过 EventBus。orderUpdates/userFills 在 Trading WS 的
_cache_latest_data内发布 OrderStatusEvent/OrderFilledEvent;WebSocketOrderManager 仅通过订阅上述事件接收,不存在「原始 WS 消息 → handle_message」的调用。 - EventBus:保持全局单例;executor 的 handler 仅处理
source="trading"的事件。
4. 数据流
4.1 行情(Market WS)
主网 WS → _wrapped_callback()
1. _cache_latest_data() → CandleUpdatedEvent(source="market")、OrderBookUpdatedEvent(source="market") → EventBus + L2 缓存更新
2. on_message() → K 线解析 → kline_buffer → 分析入队
get_market_ws_manager() 返回该 manager(见 8.1),L2 缓存基于主网数据,供 executor 的 get_all_mids() 等读取。
4.2 订单与用户(Trading WS)
交易网 WS → _wrapped_callback()
→ _cache_latest_data():
- orderUpdates → _publish_order_status_events() → OrderStatusEvent(source="trading") → EventBus → WebSocketOrderManager 订阅
- userFills → _publish_fill_events() → OrderFilledEvent(source="trading") → EventBus → WebSocketOrderManager 订阅
- user → _publish_user_events() → PositionUpdatedEvent / BalanceChangedEvent 等(source="trading") → EventBus → Executor 过滤后处理
Trading WS 不注册任何 message 回调(不调用 add_message_callback),仅依赖 _cache_latest_data 发布事件。不引入订单消息缓冲区与 handle_message;若初始化顺序保证 Executor/WebSocketOrderManager 在交易 WS 收包前已订阅 EventBus,无需缓冲与回放逻辑。
4.3 重连
Trading WS 重连 → WebSocketReconnectedEvent(source="trading")
→ Executor._on_websocket_reconnected() 仅当 source=="trading" 时执行
→ 触发 verify_pending_orders 双轮补查 ✅
Market WS 重连 → WebSocketReconnectedEvent(source="market")
→ Executor._on_websocket_reconnected() 因 source≠"trading" 直接 return,不触发订单补查 ✅
4.4 半连接降级
| 状态 | 系统行为 |
|---|---|
| Market WS 断开 + Trading WS 正常 | K 线/L2 缓存过期,executor 仍可下单(HTTP),L2 价格不更新;飞书告警 [行情WS] 断连 |
| Market WS 正常 + Trading WS 断开 | K 线正常,订单推送回退 HTTP 兜底;飞书告警 [交易WS] 断连;持仓/余额依赖 HTTP 查询 |
| 两者都断开 | 全面降级,所有功能依赖 HTTP;两个 WS 各自独立重连 |
5. 模块级设计
| 模块 | 职责变化 |
|---|---|
| config | 提供 WS_MARKET_URL(主网固定)、WS_TRADING_URL(随 TRADING_NETWORK);不再保留「唯一 WS 地址」的单一变量语义。 |
| EnhancedWebSocketManager | 构造函数必选参数 ws_url: str、source_name: str(无默认值);所有事件发布使用 source=self._source_name,合法取值仅 market / trading。 |
| TradingOrchestrator | 提供 get_trading_ws_user_address(self) -> Optional[str]:有 executor 时返回钱包地址(用于交易 WS 订阅),否则返回 None。RealtimeKlineServiceBase 不得访问 Orchestrator 的 _executor 或 _wallet,仅通过此接口获取地址。 |
| RealtimeKlineServiceBase | 订阅拆分为行情/交易两套;行情 on_message 仅处理 K 线;交易 WS 不注册任何 message 回调,仅依赖 _cache_latest_data 发布事件;「是否启用交易 WS」与交易 WS 的 user 地址仅通过 TradingOrchestrator.get_trading_ws_user_address() 获取,不探测 _executor/_wallet;双 manager 生命周期管理。仅当 trading_subscriptions 非空时创建并启动交易 WS,不存在空订阅的 trading manager。 |
| Executor | 三个事件 handler 入口增加 source=="trading" 过滤,否则直接 return。 |
| get_market_ws_manager | 原 get_global_ws_manager 重命名为 get_market_ws_manager,语义为「行情 WS」,禁止用于交易逻辑。 |
6. 文件级实现规格
实现时以方法名与下文契约/删除清单为准;行号仅作参考,若与当前代码不一致则以契约为准。
6.1 src/config.py
契约:提供两个明确语义的 URL 变量,不保留「一个 URL 同时用于行情+交易」的单一变量。
import os
import logging
# ---------- 行情 WebSocket(主网固定) ----------
WS_MARKET_URL = "wss://api.hyperliquid.xyz/ws"
# ---------- 交易 WebSocket(随 TRADING_NETWORK) ----------
_trading_network = os.getenv("TRADING_NETWORK", "mainnet").lower()
if _trading_network not in ("mainnet", "testnet"):
logging.getLogger(__name__).warning(
"TRADING_NETWORK=%s 非法,回退为 mainnet", os.getenv("TRADING_NETWORK")
)
_trading_network = "mainnet"
WS_TRADING_URL = os.getenv("WS_TRADING_URL") or (
"wss://api.hyperliquid-testnet.xyz/ws"
if _trading_network == "testnet"
else "wss://api.hyperliquid.xyz/ws"
)
说明:若现有代码中仍有对 WS_URL 的引用,实现时统一改为 WS_MARKET_URL 并删除 WS_URL,确保无交易路径使用行情 URL 变量。
6.2 src/utils/websocket/enhanced_ws_manager.py
契约:ws_url 与 source_name 为必选参数,无默认值;事件 source 仅使用 self._source_name,合法取值仅 "market"、"trading";实现后全局禁止出现 source="websocket"。
6.2.1 构造函数
def __init__(
self,
subscriptions: List[Dict],
on_state_change: Optional[Callable] = None,
timeout: int = WS_TIMEOUT,
skip_disconnects: bool = False,
alert_callback: Optional[Callable] = None,
max_retries: int | None = WS_MAX_RETRIES,
alert_threshold: int = WS_ALERT_THRESHOLD,
ws_url: str, # 必选,无默认值
source_name: str, # 必选,取值仅 "market" 或 "trading"
):
...
self.ws_url = ws_url
self._source_name = source_name
6.2.2 事件发布
所有 _event_bus.publish(...) 中的 source= 使用 source=self._source_name(共约 9 处:_on_open 两处、_on_close/_on_error 一处、_publish_candle_event、_publish_orderbook_event、_publish_price_event、_publish_user_events 内多处、_publish_order_status_events、_publish_fill_events)。实现后全局搜索 "websocket",确保事件与测试中不再使用该字面量。另须同步修改 src/events/base.py 中 Event 的 source 字段 docstring(仅允许 market/trading),见 8.5 与 13。
6.3 src/services/realtime_kline_service_base.py
契约与删除清单(实现后必须满足)
-
_build_subscriptions()- 契约:仅返回 candle + l2Book 的订阅列表。
- 删除清单:实现后不得包含 orderUpdates、userFills、user 的订阅构建;不得包含对
_executor/_wallet的访问或「若启用交易则追加」等分支;不得使用hasattr(..., '_executor')或getattr(..., '_executor', None)。
-
on_message(msg)- 契约:仅处理行情:去重 → K 线解析 → 入队;不根据 channel 做订单相关分支。
- 删除清单:实现后不得包含
channel in ("orderUpdates", "userFills", "user")的判断;不得包含订单缓冲区写入/回放;不得调用 WebSocketOrderManager 或任何「订单原始消息」处理。
-
交易 WS:不注册任何 message 回调(不实现
_on_trading_message,不调用add_message_callback);订单/用户数据仅由_cache_latest_data发布到 EventBus。
顶层导入
from src.config import WS_MARKET_URL, WS_TRADING_URL
6.3.1 属性初始化(__init__)
self.ws_trading_manager: Optional[EnhancedWebSocketManager] = None
self._trading_ws_thread: Optional[threading.Thread] = None
6.3.2 订阅拆分与「交易 WS 是否启用」
契约:RealtimeKlineServiceBase 不得访问 TradingOrchestrator 的 _executor 或 _wallet;「是否启用交易 WS」与 user 地址仅通过 TradingOrchestrator.get_trading_ws_user_address() 获取(见 5 模块级设计)。
_build_subscriptions() 仅保留 candle + l2Book(无任何交易订阅逻辑)。
TradingOrchestrator 需提供(在 src/trading/orchestrator.py 实现):
def get_trading_ws_user_address(self) -> Optional[str]:
"""返回交易 WS 订阅所需的 user 地址;无 executor 时返回 None。"""
if self._executor is None:
return None
return self._executor._wallet.address
RealtimeKlineServiceBase 中:
def _build_trading_subscriptions(self) -> List[Dict]:
"""构建交易 WS 订阅列表(orderUpdates / userFills / user)。仅当 Orchestrator 返回地址时返回非空。"""
if not self._trading_orchestrator:
self.logger.info("交易模块未启用,跳过交易 WS 订阅构建")
return []
user_address = self._trading_orchestrator.get_trading_ws_user_address()
if not user_address:
self.logger.info("交易模块未启用,跳过交易 WS 订阅构建")
return []
return [
{"type": "orderUpdates", "user": user_address},
{"type": "userFills", "user": user_address},
{"type": "user", "user": user_address},
]
(可选)若仍需布尔判断,可保留一行方法:def _is_trading_ws_enabled(self) -> bool: return bool(self._trading_orchestrator and self._trading_orchestrator.get_trading_ws_user_address()),且不得内含对 _executor/_wallet 的访问。
6.3.3 订阅初始化(__init__)
self.subscriptions = self._build_subscriptions()
self.trading_subscriptions = self._build_trading_subscriptions()
6.3.4 _init_service_threads()
创建两个 manager 时均显式传入 ws_url 与 source_name:
def _init_service_threads(self):
# 行情 WS(主网固定,source="market")
self.ws_manager = EnhancedWebSocketManager(
subscriptions=self.subscriptions,
on_state_change=self.on_state_change,
timeout=WS_TIMEOUT,
alert_callback=self._send_system_alert,
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
ws_url=WS_MARKET_URL,
source_name="market",
)
self.ws_manager.add_message_callback(self.on_message)
_set_market_ws_manager(self.ws_manager) # 见 8.1:原 get_global_ws_manager 改为 get_market_ws_manager
if self.trading_subscriptions:
self.ws_trading_manager = EnhancedWebSocketManager(
subscriptions=self.trading_subscriptions,
on_state_change=self._on_trading_state_change,
timeout=WS_TIMEOUT,
alert_callback=self._send_trading_alert,
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
ws_url=WS_TRADING_URL,
source_name="trading",
)
# 不注册 message 回调:订单/用户数据仅由 _cache_latest_data 发布到 EventBus
self.logger.info(
f"🌐 WS 连接配置 | 行情: {WS_MARKET_URL} (主网固定)"
f" | 交易: {WS_TRADING_URL} (TRADING_NETWORK={os.getenv('TRADING_NETWORK','mainnet')})"
)
else:
self.logger.info("交易订阅为空,仅启动行情 WS")
约定:仅当 self.trading_subscriptions 非空时创建并启动交易 WS;不存在「空订阅的 trading manager」或占位线程。
6.3.5 交易 WS 状态与告警(Trading WS 不注册 message 回调)
def _on_trading_state_change(self, state: str, info: Optional[str] = None):
self.logger.info(f"[交易WS] 状态变化: {state}" + (f" | {info}" if info else ""))
def _send_trading_alert(self, title: str, content: str):
self._send_system_alert(f"[交易WS] {title}", content)
6.3.6 行情 on_message
按契约重写为仅处理行情(去重 + K 线解析 + 入队),无任何订单相关分支:
def on_message(self, msg: Dict):
if self._message_dedup.is_duplicate(msg):
return
kline = self._parse_kline(msg)
if not kline:
return
self.kline_buffer.put_nowait(kline)
# ... 后续仅与 K 线入队、分析入队相关,无 orderUpdates/userFills/user 分支
6.3.7 生命周期管理
start():先启动交易 WS 线程(若存在),再阻塞于行情 WS。
def start(self):
# 原有工作线程启动
if self.ws_trading_manager is not None:
self._trading_ws_thread = threading.Thread(
target=self.ws_trading_manager.start,
daemon=True,
name="trading-ws",
)
self._trading_ws_thread.start()
self.ws_manager.start()
stop():先停 orchestrator → 交易 WS(含 join)→ 行情 WS。
def stop(self):
if self._trading_orchestrator:
self._trading_orchestrator.stop()
if self.ws_trading_manager is not None:
self.ws_trading_manager.stop()
if self._trading_ws_thread is not None and self._trading_ws_thread.is_alive():
self._trading_ws_thread.join(timeout=5)
if self._trading_ws_thread.is_alive():
self.logger.warning("交易 WS 线程在 5s 内未退出")
self.ws_manager.stop()
# ... 其余等待队列清空等
6.3.8 动态订阅(新币种)
add_subscriptions 仅对 self.ws_manager 调用(只加 candle/l2Book),不涉及交易 WS。
6.4 src/trading/executor.py
三个事件 handler 入口增加 source 过滤,仅处理 source="trading":
def _on_position_updated(self, event: PositionUpdatedEvent):
if event.source != "trading":
return
# ... 原有逻辑不变 ...
def _on_balance_changed(self, event: BalanceChangedEvent):
if event.source != "trading":
return
# ... 原有逻辑不变 ...
def _on_websocket_reconnected(self, event: WebSocketReconnectedEvent):
if event.source != "trading":
return
# ... 原有逻辑不变 ...
6.5 全局行情 WS 获取(原 get_global_ws_manager)
将 get_global_ws_manager 重命名为 get_market_ws_manager,语义为「获取行情 WS(主网)」。所有调用处(如 executor 中获取 L2/mids)改为调用 get_market_ws_manager();实现后禁止将「行情 manager」用于交易相关逻辑。
7. 初始化时序
__init__
├── _init_trading_module() → TradingOrchestrator + Executor,Executor._subscribe_events() 订阅 EventBus
├── self.ws_trading_manager = None
├── self._trading_ws_thread = None
├── _build_subscriptions() → 仅 candle + l2Book
├── _build_trading_subscriptions() → 仅当 _is_trading_ws_enabled() 为 True 时 orderUpdates/userFills/user
└── _init_service_threads()
├── ws_manager(ws_url=WS_MARKET_URL, source_name="market")
└── ws_trading_manager(ws_url=WS_TRADING_URL, source_name="trading") [仅当 trading_subscriptions 非空]
start()
├── 工作线程启动
├── ws_trading_manager.start() [daemon 线程,若存在]
└── ws_manager.start() [阻塞主线程]
stop()
├── _trading_orchestrator.stop()
├── ws_trading_manager.stop() + _trading_ws_thread.join(timeout=5)
└── ws_manager.stop()
8. 边界与注意事项
8.1 get_market_ws_manager(原 get_global_ws_manager)
- 实现时将原
get_global_ws_manager/_set_global_ws_manager重命名为get_market_ws_manager/_set_market_ws_manager。 - 始终返回行情 WS(主网);L2 缓存与 executor 的
get_all_mids()等基于此。禁止将返回值用于交易相关逻辑。
8.2 消息去重
on_message 的 MessageDeduplicator 仅作用于行情。Trading WS 无 message 回调,交易消息量小,且 WebSocketOrderManager 内部有 oid 级去重。
8.3 并发与阻塞
EnhancedWebSocketManager.start()为阻塞调用,故交易 WS 必须在独立 daemon 线程启动。stop()中先停交易 WS 并join(timeout=5),再停行情 WS,避免竞态。
8.4 连接失败与 mainnet=mainnet
- Trading WS 连接失败:与现有重试与飞书告警一致,不阻塞行情 WS;订单依赖 HTTP 兜底。
TRADING_NETWORK=mainnet时两 URL 可相同,两条独立连接,逻辑分离清晰。
8.5 事件 source 过滤约定
- 事件
source合法取值仅:"market"、"trading"。实现后全局搜索"websocket",确保事件发布与测试中不再使用。 - 凡依赖「交易/账户数据」的消费者(如 Executor、未来订阅 OrderFilledEvent 的模块),均须只处理
source="trading"的事件。 - 必改项:实现时须修改
src/events/base.py中 Event 的source字段 docstring,仅保留合法取值说明(例如:「事件来源,仅允许market(行情 WS)或trading(交易 WS)」),删除websocket/http/internal等旧语义。自检见 13。
9. 监控指标
| 指标 | 来源 | 说明 |
|---|---|---|
| market_ws_state | Market WS on_state_change |
连接断开 >30s 可告警 |
| trading_ws_state | Trading WS 状态回调(带 [交易WS] 前缀) |
连接断开 >10s 可告警 |
| trading_ws_thread_alive | stop() 中 join 结果 |
join 超时可打 warning |
启动时打印一条综合日志(在 _init_service_threads() 末尾):行情 WS URL、交易 WS URL、TRADING_NETWORK。
10. 测试验证清单
功能测试
| # | 场景 | 预期 |
|---|---|---|
| 1 | TRADING_NETWORK=testnet | 行情 WS 连主网,交易 WS 连测试网 |
| 2 | 测试网下单后 | 能通过交易 WS 收到 orderUpdates/userFills(经 EventBus 到 WebSocketOrderManager) |
| 3 | 交易 WS 重连 | Executor 仅当 source=="trading" 时触发 verify_pending_orders |
| 4 | 行情 WS 重连 | Executor 不触发订单补查 |
| 5 | TRADING_ENABLED=false | 仅行情 WS,无交易 WS |
| 6 | TRADING_NETWORK=mainnet | 两 WS 可连同一地址,功能正常 |
| 7 | 交易 WS 断开期间下单 | HTTP 兜底正常,重连后推送恢复 |
单元测试
| # | 测试点 | 验证方式 |
|---|---|---|
| T1 | source="market" 的 WebSocketReconnectedEvent | Executor 不触发订单补查 |
| T2 | source="trading" 的 PositionUpdatedEvent | Executor 缓存正确更新 |
| T3 | source="market" 的 PositionUpdatedEvent | Executor 不更新 |
| T4 | Trading WS 不注册 message 回调 | 仅 _cache_latest_data 发布事件,无 add_message_callback |
| T5 | 事件 source 仅 market/trading | 无 source="websocket" 的发布或断言 |
压力/并发
| # | 场景 | 预期 |
|---|---|---|
| P1 | 两 WS 同时高频推送 | 无串扰,行情/交易各自处理 |
| P2 | stop() 时交易 WS 仍有消息 | join(timeout=5) 后流程正常结束 |
11. 改动量汇总
| 文件 | 变更要点 |
|---|---|
| src/config.py | 新增 WS_MARKET_URL、WS_TRADING_URL;删除或替换 WS_URL 为 WS_MARKET_URL |
| src/utils/websocket/enhanced_ws_manager.py | 构造函数增加必选 ws_url、source_name;所有事件 source 使用 self._source_name |
| src/trading/orchestrator.py | 新增 get_trading_ws_user_address() -> Optional[str],供 RealtimeKlineServiceBase 构建交易 WS 订阅;K-line 层不得访问 _executor/_wallet |
| src/services/realtime_kline_service_base.py | 订阅拆分、_build_trading_subscriptions 仅调 get_trading_ws_user_address()、Trading WS 不注册 message 回调、on_message 仅行情、双 manager 生命周期、_set_market_ws_manager |
| src/trading/executor.py | 三处 handler 增加 if event.source != "trading": return |
| src/events/base.py | Event.source 的 docstring 改为仅允许 market/trading(见 8.5) |
| 全局 | get_global_ws_manager → get_market_ws_manager,调用处同步修改 |
12. 依赖与兼容性
- 不新增第三方依赖。
- Hyperliquid 测试网 WS:
wss://api.hyperliquid-testnet.xyz/ws。 - Event 基类已有
source: str字段,无需改事件定义。 .env.example增加 TRADING_NETWORK、可选 WS_TRADING_URL 说明。
13. 实现后自检清单(防死代码/冗余)
13.0 实现后全局禁止符号(grep 零结果)
实现完成后,生产代码中以下符号/字面量 grep 结果为零(仅允许出现在文档、测试预期或注释中的历史说明等明确例外位置):
| 禁止项 | 说明 |
|---|---|
WS_URL |
已由 WS_MARKET_URL / WS_TRADING_URL 替代,删除或替换后不得残留 |
get_global_ws_manager、_set_global_ws_manager |
已重命名为 get_market_ws_manager / _set_market_ws_manager |
source="websocket"、source='websocket' |
事件 source 仅允许 "market" / "trading" |
RealtimeKlineServiceBase 中对 _executor、_wallet 的访问 |
仅通过 TradingOrchestrator.get_trading_ws_user_address() 获取地址 |
实现与 Code Review 时可按上表逐项 grep 核对。
实现完成后,建议逐项确认:
- [ ] Config:无「单一 URL 同时用于行情+交易」的变量;WS_MARKET_URL / WS_TRADING_URL 语义清晰;所有 WS_URL 引用已改为 WS_MARKET_URL(或已删除 WS_URL)。
- [ ] EnhancedWebSocketManager:构造函数
ws_url、source_name为必选,无默认值;所有事件发布使用source=self._source_name;全局搜索无source="websocket"。 - [ ] TradingOrchestrator:已实现
get_trading_ws_user_address() -> Optional[str];RealtimeKlineServiceBase 中无对_executor/_wallet的访问。 - [ ] RealtimeKlineServiceBase:
_build_subscriptions()无 orderUpdates/userFills/user 及对 _executor/_wallet 的访问;_build_trading_subscriptions()仅调用get_trading_ws_user_address();on_message无订单相关 channel 判断与 buffer 逻辑;不存在对WebSocketOrderManager.handle_message的调用。Trading WS 不注册 message 回调(不存在_on_trading_message,不对交易 manager 调用add_message_callback)。仅当trading_subscriptions非空时创建交易 WS,无空订阅的 trading manager。 - [ ] 订单数据流:仅通过 EventBus(OrderStatusEvent / OrderFilledEvent,source="trading");WebSocketOrderManager 仅通过订阅接收。
- [ ] Executor:三个事件 handler 仅处理
source="trading",无「兼容旧 source」分支。 - [ ] 命名:
get_global_ws_manager已重命名为get_market_ws_manager,调用处已更新。 - [ ] 事件基类:
src/events/base.py中 Event 的source字段 docstring 与类型注解(若有)仅出现market/trading,无websocket/http/internal等旧语义。 - [ ] 13.0 禁止符号:上表所列禁止项在生产代码中 grep 零结果;实现中不存在
_on_trading_message方法或对交易 WS 的add_message_callback调用。 - [ ] 全局搜索:
orderUpdates、userFills、user仅出现在订阅构建、Trading WS 的 _cache_latest_data、以及测试/文档中;无残留的订单路由或 buffer 回放逻辑在行情 on_message 中。