双WebSocket架构设计12
双 WebSocket 架构设计文档(K 线主网 + 订单/用户推送随配置)
1. 目标与约束
背景:原单 WebSocket 连接主网;当 TRADING_NETWORK=testnet 时订单在测试网提交,但推送仍来自主网,导致收不到测试网 orderUpdates/userFills,持仓/余额数据与测试网不一致。本设计拆分为 Market WS(主网固定)+ Trading WS(随配置)解决该问题。
需求
- K 线与 L2 订单簿:始终连接主网 WebSocket(
wss://api.hyperliquid.xyz/ws),与TRADING_NETWORK无关。 - orderUpdates / userFills / user:连接随
TRADING_NETWORK变化的 WebSocket(testnet 时wss://api.hyperliquid-testnet.xyz/ws,mainnet 时与主网一致)。
非目标:不改变现有交易 HTTP API、订单状态机与 WebSocketOrderManager 的对外行为。
实现原则(必守)
- 按重写执行:相关方法按下文「契约与删除清单」重写,不以补丁方式实现;实现后单 WS 与订单同网的任何分支、注释零残留。
- 不保留兼容路径:不保留「若未传
ws_url则用全局 URL」「若未传source_name则用websocket」的默认值;两处创建 EnhancedWebSocketManager 时必须显式传入ws_url与source_name。 - 事件 source 合法取值仅两种:
"market"、"trading"。实现后全局禁止出现source="websocket"。
安全默认值:TRADING_NETWORK 默认值为 "mainnet",防止忘记配置时静默连到测试网下单。
WS_TRADING_URL 覆盖:若环境变量已设置 WS_TRADING_URL,则以其为准,不再按 TRADING_NETWORK 推导;未设置时才根据 TRADING_NETWORK 选择主网/测试网 URL。
架构范围:仅采用双 WebSocket(行情 WS + 交易 WS),不考虑单 WS 回滚。若未来需回滚到单 WS,需单独评估分支与数据流恢复方案,本设计不包含回滚步骤。
2. 术语与单 WS 的差异
| 术语 | 含义 |
|---|---|
| Market WS | 行情 WebSocket,固定连主网,仅订阅 candle、l2Book,source="market" |
| Trading WS | 交易 WebSocket,随 TRADING_NETWORK/WS_TRADING_URL 变化,仅订阅 orderUpdates、userFills、user,source="trading" |
| source | 事件来源标识,合法取值仅 "market"、"trading" |
| EventBus | 全局单例事件总线,订单/用户数据仅经此传递,不经过 WS message 回调 |
与单 WS 的差异:原架构为单连接同时承载行情与订单;本设计拆为两条连接——行情固定主网、订单随配置,所有事件带 source,订单与用户数据仅经 EventBus 由 Trading WS 发布,Executor/WebSocketOrderManager 仅消费 source="trading" 的事件。
3. 目标架构
以下为唯一实现形态;实现后不存在单 WS 分支、不存在按配置切换单/双 WS 的逻辑。
flowchart LR
subgraph config [Config]
WS_MARKET_URL["WS_MARKET_URL (主网固定)"]
WS_TRADING_URL["WS_TRADING_URL (随 TRADING_NETWORK)"]
end
subgraph market [Market WS — 主网]
Mgr1["EnhancedWebSocketManager\nsource='market'"]
Mgr1 -->|订阅| Sub1[candle / l2Book]
Mgr1 -->|WS_MARKET_URL| HL_M[Hyperliquid Mainnet]
end
subgraph trading [Trading WS — 随配置]
Mgr2["EnhancedWebSocketManager\nsource='trading'"]
Mgr2 -->|订阅| Sub2[orderUpdates / userFills / user]
Mgr2 -->|WS_TRADING_URL| HL_T[Hyperliquid Testnet/Mainnet]
end
config --> Mgr1
config --> Mgr2
Mgr1 -->|on_message| Kline[K 线 + L2 处理]
Mgr1 -->|EventBus source=market| EB[全局 EventBus]
Mgr2 -->|_cache_latest_data 发布 无 message 回调| EB
EB -->|OrderStatusEvent/OrderFilledEvent| OrderMgr[WebSocketOrderManager]
EB -->|过滤 source=trading| Exec[Executor]
核心设计决策:
- Market WS:仅订阅 candle、l2Book;URL 固定主网(
WS_MARKET_URL);source="market"。 - Trading WS:仅订阅 orderUpdates、userFills、user;URL 为
WS_TRADING_URL;source="trading"。 - 订单/用户数据流:仅通过 EventBus。orderUpdates/userFills 在 Trading WS 的
_cache_latest_data内发布 OrderStatusEvent/OrderFilledEvent;WebSocketOrderManager 仅通过订阅上述事件接收,不存在「原始 WS 消息 → handle_message」的调用。 - EventBus:保持全局单例;executor 的 handler 仅处理
source="trading"的事件。
4. 数据流
4.1 行情(Market WS)
主网 WS → _wrapped_callback()
1. _cache_latest_data() → CandleUpdatedEvent(source="market")、OrderBookUpdatedEvent(source="market") → EventBus + L2 缓存更新
2. on_message() → K 线解析 → kline_buffer → 分析入队
get_market_ws_manager() 返回该 manager,L2 缓存基于主网数据,供 executor 的 get_all_mids() 等读取。
4.2 订单与用户(Trading WS)
交易网 WS → _wrapped_callback()
→ _cache_latest_data():
- orderUpdates → _publish_order_status_events() → OrderStatusEvent(source="trading") → EventBus → WebSocketOrderManager 订阅
- userFills → _publish_fill_events() → OrderFilledEvent(source="trading") → EventBus → WebSocketOrderManager 订阅
- user → _publish_user_events() → PositionUpdatedEvent / BalanceChangedEvent(source="trading") → EventBus → Executor 过滤后处理
Trading WS 不注册任何 message 回调(不调用 add_message_callback),仅依赖 _cache_latest_data 发布事件。
EventBus 订阅顺序保证:Executor 与 WebSocketOrderManager 的 _subscribe_events() 均在 __init__ 阶段调用(先于 start()),而 Trading WS daemon 线程在 start() 中才启动并建立网络连接,故订阅必然先于第一条消息到达。此顺序由代码结构强制保证,无需额外缓冲或回放逻辑。若未来调整初始化顺序,需重新评估此假设。
4.3 事件发布全景
| 触发时机 | 事件类型 | source |
|---|---|---|
_on_open 首连时 |
WebSocketConnectedEvent |
market / trading |
_on_open 重连时 |
WebSocketReconnectedEvent |
market / trading |
_on_close 断连时 |
WebSocketDisconnectedEvent |
market / trading |
user 频道 assetPositions |
PositionUpdatedEvent |
trading |
user 频道 fills |
OrderFilledEvent |
trading |
user 频道 marginSummary |
BalanceChangedEvent |
trading |
| orderUpdates 频道 | OrderStatusEvent |
trading |
| userFills 频道 | OrderFilledEvent |
trading |
| candle 频道 | CandleUpdatedEvent |
market |
| l2Book 频道 | OrderBookUpdatedEvent |
market |
4.4 重连
Trading WS 重连 → WebSocketReconnectedEvent(source="trading")
→ Executor._on_websocket_reconnected() 仅当 source=="trading" 时执行 → 触发 verify_pending_orders ✅
Market WS 重连 → WebSocketReconnectedEvent(source="market")
→ Executor._on_websocket_reconnected() 因 source≠"trading" 直接 return,不触发订单补查 ✅
4.5 半连接降级
| 状态 | 系统行为 |
|---|---|
| Market WS 断开 + Trading WS 正常 | K 线/L2 缓存过期,executor 仍可下单(HTTP),L2 价格不更新;飞书告警 [行情WS] |
| Market WS 正常 + Trading WS 断开 | K 线正常,订单推送回退 HTTP 兜底;飞书告警 [交易WS];持仓/余额依赖 HTTP |
| 两者都断开 | 全面降级,所有功能依赖 HTTP;两个 WS 各自独立重连 |
5. 模块级设计
| 模块 | 职责变化 |
|---|---|
| config | 提供 WS_MARKET_URL(主网固定)、WS_TRADING_URL(随 TRADING_NETWORK);删除 WS_URL。 |
| EnhancedWebSocketManager | 构造函数必选参数 ws_url: str、source_name: str(无默认值,置于可选参数之前);构造时 assert source_name in ("market", "trading") 运行时校验;所有事件发布使用 source=self._source_name,合法取值仅 market / trading。 |
| TradingOrchestrator | 新增 get_trading_ws_user_address(self) -> Optional[str]:有 executor 时返回钱包地址,否则返回 None。 |
| RealtimeKlineServiceBase | 订阅拆分为行情/交易两套;on_message 仅处理 K 线;Trading WS 不注册 message 回调;「是否启用交易 WS」与 user 地址仅通过 TradingOrchestrator.get_trading_ws_user_address() 获取,不探测 _executor/_wallet;双 manager 生命周期管理。 |
| Executor | 三个事件 handler 入口增加 source=="trading" 过滤,否则直接 return。 |
| get_market_ws_manager | 原 get_global_ws_manager 重命名,语义为「行情 WS(主网)」,禁止用于交易逻辑。 |
6. 文件级实现规格
实现时以方法名与下文契约/删除清单为准;行号仅作参考,若与当前代码不一致则以契约为准。
6.1 src/config.py
import os
import logging
# ---------- 行情 WebSocket(主网固定) ----------
WS_MARKET_URL = "wss://api.hyperliquid.xyz/ws"
# ---------- 交易 WebSocket(随 TRADING_NETWORK) ----------
_trading_network = os.getenv("TRADING_NETWORK", "mainnet").lower()
if _trading_network not in ("mainnet", "testnet"):
logging.getLogger(__name__).warning(
"TRADING_NETWORK=%s 非法,回退为 mainnet", os.getenv("TRADING_NETWORK")
)
_trading_network = "mainnet"
WS_TRADING_URL = os.getenv("WS_TRADING_URL") or (
"wss://api.hyperliquid-testnet.xyz/ws"
if _trading_network == "testnet"
else "wss://api.hyperliquid.xyz/ws"
)
配置覆盖:若已设置 WS_TRADING_URL,则以其为准,不再按 TRADING_NETWORK 推导。
删除清单:删除 WS_URL;所有对 WS_URL 的引用改为 WS_MARKET_URL 并删除原变量。
6.2 src/utils/websocket/enhanced_ws_manager.py
契约:ws_url 与 source_name 为必选参数,无默认值;事件 source 仅使用 self._source_name,合法取值仅 "market"、"trading";实现后全局禁止出现 source="websocket"。
6.2.1 构造函数
def __init__(
self,
subscriptions: List[Dict],
ws_url: str, # 必选,无默认值;位于可选参数之前避免语法错误
source_name: str, # 必选,取值仅 "market" 或 "trading"
on_state_change: Optional[Callable] = None,
timeout: int = WS_TIMEOUT,
skip_disconnects: bool = False,
alert_callback: Optional[Callable] = None,
max_retries: int | None = WS_MAX_RETRIES,
alert_threshold: int = WS_ALERT_THRESHOLD,
):
assert source_name in ("market", "trading"), (
f"source_name 非法: {source_name!r},仅允许 'market' 或 'trading'"
)
self.ws_url = ws_url
self._source_name = source_name
注意:
ws_url与source_name必须置于所有可选参数之前。Python 不允许无默认值的位置参数出现在有默认值参数之后(会触发SyntaxError)。调用侧统一使用关键字传参(见 6.3.5),不受顺序影响。
6.2.2 事件发布
所有 _event_bus.publish(..., source=...) 必须使用 source=self._source_name。实现后执行 grep -r 'source=.*websocket' src 结果应为零。涉及位置包括:_on_open 两处、_on_close/_on_error 一处、_publish_candle_event、_publish_orderbook_event、_publish_price_event、_publish_user_events 内多处、_publish_order_status_events、_publish_fill_events。
config 导入:删除对 WS_URL 的 import,其余 WS_TIMEOUT、WS_MAX_RETRIES、WS_ALERT_THRESHOLD 等 WS_* 常量保留(由创建方传入的 ws_url 不再从 config 读取)。
删除清单:实现后全局搜索 source="websocket",确保结果为零。同步修改 src/events/base.py 中 Event 的 source 字段 docstring,仅保留「仅允许 market(行情 WS)或 trading(交易 WS)」,删除 websocket/http/internal 等旧语义。
6.3 src/services/realtime_kline_service_base.py
顶层导入
from src.config import WS_MARKET_URL, WS_TRADING_URL
6.3.1 属性初始化(__init__)
self.ws_trading_manager: Optional[EnhancedWebSocketManager] = None
self._trading_ws_thread: Optional[threading.Thread] = None
6.3.2 _build_subscriptions()
- 契约:仅返回 candle + l2Book 的订阅列表。
- 删除清单:不得包含 orderUpdates、userFills、user 的订阅构建;不得包含对
_executor/_wallet的访问;不得使用hasattr(..., '_executor')或getattr(..., '_executor', None)。
6.3.3 _build_trading_subscriptions()
def _build_trading_subscriptions(self) -> List[Dict]:
"""构建交易 WS 订阅列表(orderUpdates / userFills / user)。无可用地址时返回空列表。"""
if not self._trading_orchestrator:
return []
user_address = self._trading_orchestrator.get_trading_ws_user_address()
if not user_address:
return []
return [
{"type": "orderUpdates", "user": user_address},
{"type": "userFills", "user": user_address},
{"type": "user", "user": user_address},
]
6.3.4 订阅初始化(__init__)
self.subscriptions = self._build_subscriptions()
self.trading_subscriptions = self._build_trading_subscriptions()
6.3.5 _init_service_threads()
创建两个 manager 时均显式传入 ws_url 与 source_name:
def _init_service_threads(self):
# 行情 WS(主网固定,source="market")
self.ws_manager = EnhancedWebSocketManager(
subscriptions=self.subscriptions,
on_state_change=self.on_state_change,
timeout=WS_TIMEOUT,
alert_callback=self._send_system_alert,
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
ws_url=WS_MARKET_URL,
source_name="market",
)
self.ws_manager.add_message_callback(self.on_message)
_set_market_ws_manager(self.ws_manager)
if self.trading_subscriptions:
self.ws_trading_manager = EnhancedWebSocketManager(
subscriptions=self.trading_subscriptions,
on_state_change=self._on_trading_state_change,
timeout=WS_TIMEOUT,
alert_callback=self._send_trading_alert,
max_retries=WS_MAX_RETRIES,
alert_threshold=WS_ALERT_THRESHOLD,
ws_url=WS_TRADING_URL,
source_name="trading",
)
# 不注册 message 回调:订单/用户数据仅由 _cache_latest_data 发布到 EventBus
self.logger.info(
f"🌐 WS 连接配置 | 行情: {WS_MARKET_URL} (主网固定)"
f" | 交易: {WS_TRADING_URL} (TRADING_NETWORK={os.getenv('TRADING_NETWORK','mainnet')})"
)
else:
self.logger.info("交易订阅为空,仅启动行情 WS")
约定:仅当 self.trading_subscriptions 非空时创建并启动交易 WS;不存在空订阅的 trading manager 或占位线程。交易 WS 仅在启动时根据当前 get_trading_ws_user_address() 决定是否创建;运行中若后续才挂载 Executor,不自动补建交易 WS(若需支持需单独设计)。
6.3.6 Trading WS 状态与告警
def _on_trading_state_change(self, state: str, info: Optional[str] = None):
self.logger.info(f"[交易WS] 状态变化: {state}" + (f" | {info}" if info else ""))
def _send_trading_alert(self, title: str, content: str):
self._send_system_alert(f"[交易WS] {title}", content)
6.3.7 on_message(msg)
- 契约:仅处理行情(去重 → K 线解析 → 入队)。
- 删除清单:不得包含
channel in ("orderUpdates", "userFills", "user")的判断;不得包含订单缓冲区写入/回放;不得调用 WebSocketOrderManager 或任何「订单原始消息」处理。
def on_message(self, msg: Dict):
if self._message_dedup.is_duplicate(msg):
return
kline = self._parse_kline(msg)
if not kline:
return
self.kline_buffer.put_nowait(kline)
# ... 后续仅与 K 线入队、分析入队相关,无 orderUpdates/userFills/user 分支
6.3.8 生命周期管理
start():先启动交易 WS 线程(若存在),再阻塞于行情 WS。
def start(self):
# 原有工作线程启动
if self.ws_trading_manager is not None:
self._trading_ws_thread = threading.Thread(
target=self.ws_trading_manager.start,
daemon=True,
name="trading-ws",
)
self._trading_ws_thread.start()
self.ws_manager.start()
stop():先停 orchestrator → 交易 WS(含 join)→ 行情 WS。
def stop(self):
if self._trading_orchestrator:
self._trading_orchestrator.stop()
if self.ws_trading_manager is not None:
self.ws_trading_manager.stop()
if self._trading_ws_thread is not None and self._trading_ws_thread.is_alive():
self._trading_ws_thread.join(timeout=5)
if self._trading_ws_thread.is_alive():
self.logger.warning("交易 WS 线程在 5s 内未退出")
self.ws_manager.stop()
# ... 其余等待队列清空等
6.3.9 动态订阅(新币种)
add_subscriptions 仅对 self.ws_manager 调用(只加 candle/l2Book),不涉及交易 WS。
6.4 src/trading/orchestrator.py
新增方法,供 RealtimeKlineServiceBase 构建交易 WS 订阅时调用:
def get_trading_ws_user_address(self) -> Optional[str]:
"""返回交易 WS 订阅所需的 user 地址;无 executor 或 wallet 时返回 None。"""
if self._executor is None:
return None
wallet = getattr(self._executor, "_wallet", None)
if wallet is None:
return None
return wallet.address
约束:RealtimeKlineServiceBase 不得直接访问 Orchestrator 的 _executor 或 _wallet,仅通过此接口获取地址。直接访问 self._executor._wallet.address 在 _wallet 为 None 时会抛 AttributeError,故改用 getattr 防御。
6.5 src/trading/executor.py
三个事件 handler 入口增加 source 过滤,仅处理 source="trading":
def _on_position_updated(self, event: PositionUpdatedEvent):
if event.source != "trading":
return
# ... 原有逻辑不变 ...
def _on_balance_changed(self, event: BalanceChangedEvent):
if event.source != "trading":
return
# ... 原有逻辑不变 ...
def _on_websocket_reconnected(self, event: WebSocketReconnectedEvent):
if event.source != "trading":
return
# ... 原有逻辑不变 ...
6.6 全局行情 WS 获取(原 get_global_ws_manager)
将 get_global_ws_manager / 模块内对全局 manager 的 setter 重命名为 get_market_ws_manager / _set_market_ws_manager。若当前实现为在 _init_service_threads() 内直接赋值(例如 _global_ws_manager = self.ws_manager),需抽成函数 _set_market_ws_manager(manager),并将模块级变量重命名为 _market_ws_manager,在创建行情 WS 后调用 _set_market_ws_manager(self.ws_manager)。所有调用处同步更新:调用处仅一处——src/trading/executor.py(原 get_global_ws_manager())。语义为「行情 WS(主网)」;禁止将返回值用于交易相关逻辑(仅用于 L2 缓存、K 线、get_all_mids() 等行情相关逻辑)。
7. 初始化时序
__init__
├── _init_trading_module() → TradingOrchestrator + Executor,Executor._subscribe_events() 订阅 EventBus
├── self.ws_trading_manager = None
├── self._trading_ws_thread = None
├── _build_subscriptions() → 仅 candle + l2Book
├── _build_trading_subscriptions() → 仅当 get_trading_ws_user_address() 返回地址时非空
└── _init_service_threads()
├── ws_manager(ws_url=WS_MARKET_URL, source_name="market")
└── ws_trading_manager(ws_url=WS_TRADING_URL, source_name="trading") [仅当 trading_subscriptions 非空]
start()
├── 工作线程启动
├── ws_trading_manager.start() [daemon 线程,若存在]
└── ws_manager.start() [阻塞主线程]
stop()
├── _trading_orchestrator.stop()
├── ws_trading_manager.stop() + _trading_ws_thread.join(timeout=5)
└── ws_manager.stop()
显式约束:EventBus 订阅必须先于任一 WS 的 start(),避免首包丢失。当前顺序 _init_trading_module()(Executor/WebSocketOrderManager 在此完成 _subscribe_events())→ _init_service_threads()(仅创建 manager,未 start())→ 用户调用 start() 时再启动 WS,已满足该约束;若未来调整初始化顺序,需重新评估。
8. 边界与注意事项
- 消息去重:
on_message的MessageDeduplicator仅作用于行情。Trading WS 无 message 回调,WebSocketOrderManager 内部有 oid 级去重。 - 并发与阻塞:
EnhancedWebSocketManager.start()为阻塞调用,故交易 WS 必须在独立 daemon 线程启动。stop()中先停交易 WS 并join(timeout=5),再停行情 WS,避免竞态。 - 连接失败:Trading WS 连接失败与现有重试与飞书告警一致,不阻塞行情 WS;订单依赖 HTTP 兜底。交易 WS 断开时,订单状态依赖现有 WebSocketOrderManager 的 HTTP 兜底(早期检查 + 超时后验证),本设计不新增轮询逻辑。
- mainnet=mainnet:
TRADING_NETWORK=mainnet时两 URL 相同,但仍维持两条独立的 WebSocket 连接(各自心跳、各自重连)。这是有意的设计选择:代码路径统一,行情/交易订阅隔离,测试/mainnet 行为一致。代价是多一条长连接,对服务端影响可忽略不计。 - 监控:market_ws_state 来自
on_state_change;trading_ws_state 来自_on_trading_state_change(带[交易WS]前缀);trading_ws_thread_alive 可在stop()的 join 超时处打 warning。两个 WS 共用同一套WS_ALERT_THRESHOLD(连续断连次数阈值)和WS_MAX_RETRIES配置;_send_trading_alert仅在标题前缀加[交易WS]以便在飞书消息中区分来源,告警触发逻辑与行情 WS 一致。 - 事件基类:
src/events/base.py中 Event 的source字段 docstring 仅保留「仅允许market(行情 WS)或trading(交易 WS)」,删除websocket/http/internal等旧语义。 - userFills 与 user.fills:两者均来自 Trading WS,但订阅者不同、职责不同,不存在重复处理。
userFills频道 →_publish_fill_events()→OrderFilledEvent→ WebSocketOrderManager 订阅(更新订单状态机,以oid为去重键,重复消息安全幂等)。user频道的fills字段 →_publish_user_events()→OrderFilledEvent→ Executor 订阅(刷新持仓/余额缓存)。- 两路事件的消费者不同,不会造成同一逻辑的重复执行。WebSocketOrderManager 不订阅
user.fills路径,Executor 不订阅userFills路径。
- 回滚:本设计不包含回退到单 WS 的步骤;若需回滚,需单独评估分支与数据流恢复方案。
9. 改动量汇总
| 文件 | 变更要点 |
|---|---|
src/config.py |
新增 WS_MARKET_URL、WS_TRADING_URL;删除 WS_URL |
src/utils/websocket/enhanced_ws_manager.py |
构造函数增加必选 ws_url、source_name;所有事件 source 使用 self._source_name;删除对 WS_URL 的 import,其余 WS_* 常量保留 |
src/trading/orchestrator.py |
新增 get_trading_ws_user_address() -> Optional[str] |
src/services/realtime_kline_service_base.py |
订阅拆分、_build_trading_subscriptions 仅调 get_trading_ws_user_address()、Trading WS 不注册 message 回调、on_message 仅行情、双 manager 生命周期;抽成 _set_market_ws_manager(manager) 并重命名模块变量 _market_ws_manager |
src/trading/executor.py |
三处 handler 增加 if event.source != "trading": return |
src/events/base.py |
将 Event 的 source 字段 docstring 从 websocket/http/internal 改为仅允许 market(行情 WS)或 trading(交易 WS) |
| 全局 | get_global_ws_manager → get_market_ws_manager,调用处同步修改(调用处仅一处:src/trading/executor.py) |
实现后禁止符号(grep 零结果)
| 禁止项 | 说明 |
|---|---|
WS_URL |
已由 WS_MARKET_URL / WS_TRADING_URL 替代,删除后不得残留 |
get_global_ws_manager、_set_global_ws_manager |
已重命名为 get_market_ws_manager / _set_market_ws_manager |
source="websocket"、source='websocket' |
事件 source 仅允许 "market" / "trading" |
RealtimeKlineServiceBase 中对 _executor、_wallet 的访问 |
仅通过 get_trading_ws_user_address() 获取地址 |
_on_trading_message、对交易 WS 的 add_message_callback |
Trading WS 不注册任何 message 回调 |
orderUpdates/userFills/user 出现在 on_message 或行情处理路径中 |
仅允许出现在订阅构建与 Trading WS 的 _cache_latest_data 中 |
_build_subscriptions() 内出现 orderUpdates/userFills/user 或对 _executor/_wallet 的访问 |
行情订阅仅 candle + l2Book,地址仅通过 get_trading_ws_user_address() 获取 |
10. 测试验证清单
功能测试
与 TRADING_ENABLED 的对应:当 TRADING_ENABLED=false 时,Orchestrator 不创建 Executor,get_trading_ws_user_address() 返回 None,trading_subscriptions 为空,故不创建交易 WS(与下表场景 5 对应)。
| # | 场景 | 预期 |
|---|---|---|
| 1 | TRADING_NETWORK=testnet |
行情 WS 连主网,交易 WS 连测试网 |
| 2 | 测试网下单后 | 能通过交易 WS 收到 orderUpdates/userFills(经 EventBus 到 WebSocketOrderManager) |
| 3 | 交易 WS 重连 | Executor 仅当 source=="trading" 时触发 verify_pending_orders |
| 4 | 行情 WS 重连 | Executor 不触发订单补查 |
| 5 | TRADING_ENABLED=false |
仅行情 WS,无交易 WS(无 Executor 故无交易订阅) |
| 6 | TRADING_NETWORK=mainnet |
两 WS 可连同一地址,功能正常 |
| 7 | 交易 WS 断开期间下单 | HTTP 兜底正常,重连后推送恢复 |
单元测试
| # | 测试点 | 验证方式 |
|---|---|---|
| T1 | source="market" 的 WebSocketReconnectedEvent |
Executor 不触发订单补查 |
| T2 | source="trading" 的 PositionUpdatedEvent |
Executor 缓存正确更新 |
| T3 | source="market" 的 PositionUpdatedEvent |
Executor 不更新 |
| T4 | Trading WS 不注册 message 回调 | 仅 _cache_latest_data 发布事件,无 add_message_callback |
| T5 | 事件 source 仅 market/trading |
无 source="websocket" 的发布或断言 |
压力/并发
| # | 场景 | 预期 |
|---|---|---|
| P1 | 两 WS 同时高频推送 | 无串扰,行情/交易各自处理 |
| P2 | stop() 时交易 WS 仍有消息 |
join(timeout=5) 后流程正常结束 |