双WebSocket架构设计13

双 WebSocket 架构设计文档(K 线主网 + 订单/用户推送随配置)

版本 日期 变更说明
v1.0 2026-02-22 初始版本
v1.1 2026-02-22 评审优化:校验改 ValueError、补充线程安全/重连订阅/容错/背压说明、完善测试清单

1. 目标与约束

背景:原单 WebSocket 连接主网;当 TRADING_NETWORK=testnet 时订单在测试网提交,但推送仍来自主网,导致收不到测试网 orderUpdates/userFills,持仓/余额数据与测试网不一致。本设计拆分为 Market WS(主网固定)+ Trading WS(随配置)解决该问题。

需求

  • K 线与 L2 订单簿:始终连接主网 WebSocket(wss://api.hyperliquid.xyz/ws),与 TRADING_NETWORK 无关。
  • orderUpdates / userFills / user:连接随 TRADING_NETWORK 变化的 WebSocket(testnet 时 wss://api.hyperliquid-testnet.xyz/ws,mainnet 时与主网一致)。

非目标:不改变现有交易 HTTP API、订单状态机与 WebSocketOrderManager 的对外行为。

实现原则(必守)

  • 按重写执行:相关方法按下文「契约与删除清单」重写,不以补丁方式实现;实现后单 WS 与订单同网的任何分支、注释零残留。
  • 不保留兼容路径:不保留「若未传 ws_url 则用全局 URL」「若未传 source_name 则用 websocket」的默认值;两处创建 EnhancedWebSocketManager 时必须显式传入 ws_urlsource_name
  • 事件 source 合法取值仅两种"market""trading"。实现后全局禁止出现 source="websocket"

安全默认值TRADING_NETWORK 默认值为 "mainnet",防止忘记配置时静默连到测试网下单。

WS_TRADING_URL 覆盖:若环境变量已设置 WS_TRADING_URL,则以其为准,不再按 TRADING_NETWORK 推导;未设置时才根据 TRADING_NETWORK 选择主网/测试网 URL。

架构范围:仅采用双 WebSocket(行情 WS + 交易 WS),不考虑单 WS 回滚。若未来需回滚到单 WS,需单独评估分支与数据流恢复方案,本设计不包含回滚步骤。


2. 术语与单 WS 的差异

术语 含义
Market WS 行情 WebSocket,固定连主网,仅订阅 candle、l2Book,source="market"
Trading WS 交易 WebSocket,随 TRADING_NETWORK/WS_TRADING_URL 变化,仅订阅 orderUpdates、userFills、user,source="trading"
source 事件来源标识,合法取值仅 "market""trading"
EventBus 全局单例事件总线(线程安全),订单/用户数据仅经此传递,不经过 WS message 回调

与单 WS 的差异:原架构为单连接同时承载行情与订单;本设计拆为两条连接——行情固定主网、订单随配置,所有事件带 source,订单与用户数据仅经 EventBus 由 Trading WS 发布,Executor/WebSocketOrderManager 仅消费 source="trading" 的事件。


3. 目标架构

以下为唯一实现形态;实现后不存在单 WS 分支、不存在按配置切换单/双 WS 的逻辑。

flowchart LR
  subgraph config [Config]
    WS_MARKET_URL["WS_MARKET_URL (主网固定)"]
    WS_TRADING_URL["WS_TRADING_URL (随 TRADING_NETWORK)"]
  end

  subgraph market [Market WS — 主网]
    Mgr1["EnhancedWebSocketManager\nsource='market'"]
    Mgr1 -->|订阅| Sub1[candle / l2Book]
    Mgr1 -->|WS_MARKET_URL| HL_M[Hyperliquid Mainnet]
  end

  subgraph trading [Trading WS — 随配置]
    Mgr2["EnhancedWebSocketManager\nsource='trading'"]
    Mgr2 -->|订阅| Sub2[orderUpdates / userFills / user]
    Mgr2 -->|WS_TRADING_URL| HL_T[Hyperliquid Testnet/Mainnet]
  end

  config --> Mgr1
  config --> Mgr2

  Mgr1 -->|on_message| Kline[K 线 + L2 处理]
  Mgr1 -->|EventBus source=market| EB[全局 EventBus]
  Mgr2 -->|_cache_latest_data 发布 无 message 回调| EB
  EB -->|OrderStatusEvent/OrderFilledEvent| OrderMgr[WebSocketOrderManager]
  EB -->|过滤 source=trading| Exec[Executor]

核心设计决策

  1. Market WS:仅订阅 candle、l2Book;URL 固定主网(WS_MARKET_URL);source="market"
  2. Trading WS:仅订阅 orderUpdates、userFills、user;URL 为 WS_TRADING_URLsource="trading"
  3. 订单/用户数据流仅通过 EventBus。orderUpdates/userFills 在 Trading WS 的 _cache_latest_data 内发布 OrderStatusEvent/OrderFilledEvent;WebSocketOrderManager 仅通过订阅上述事件接收,存在「原始 WS 消息 → handle_message」的调用。
  4. EventBus:保持全局单例;executor 的 handler 仅处理 source="trading" 的事件。

4. 数据流

4.1 行情(Market WS)

主网 WS → _wrapped_callback()
  1. _cache_latest_data() → CandleUpdatedEvent(source="market")、OrderBookUpdatedEvent(source="market") → EventBus + L2 缓存更新
  2. on_message() → K 线解析 → kline_buffer → 分析入队

get_market_ws_manager() 返回该 manager,L2 缓存基于主网数据,供 executor 的 get_all_mids() 等读取。

4.2 订单与用户(Trading WS)

交易网 WS → _wrapped_callback()
  → _cache_latest_data():
     - orderUpdates → _publish_order_status_events() → OrderStatusEvent(source="trading")   → EventBus → WebSocketOrderManager 订阅
     - userFills   → _publish_fill_events()          → OrderFilledEvent(source="trading")   → EventBus → WebSocketOrderManager 订阅
     - user        → _publish_user_events()          → PositionUpdatedEvent / BalanceChangedEvent(source="trading") → EventBus → Executor 过滤后处理

Trading WS 不注册任何 message 回调(不调用 add_message_callback),仅依赖 _cache_latest_data 发布事件。

EventBus 订阅顺序保证:Executor 与 WebSocketOrderManager 的 _subscribe_events() 均在 __init__ 阶段调用(先于 start()),而 Trading WS daemon 线程在 start() 中才启动并建立网络连接,故订阅必然先于第一条消息到达。此顺序由代码结构强制保证,无需额外缓冲或回放逻辑。若未来调整初始化顺序,需重新评估此假设。

4.3 事件发布全景

触发时机 事件类型 source 消费者
_on_open 首连时 WebSocketConnectedEvent market / trading 日志/监控
_on_open 重连时 WebSocketReconnectedEvent market / trading Executor(仅 trading)
_on_close 断连时 WebSocketDisconnectedEvent market / trading 日志/监控
user 频道 assetPositions PositionUpdatedEvent trading Executor
user 频道 fills OrderFilledEvent trading Executor(刷新持仓/余额缓存)
user 频道 marginSummary BalanceChangedEvent trading Executor
orderUpdates 频道 OrderStatusEvent trading WebSocketOrderManager
userFills 频道 OrderFilledEvent trading WebSocketOrderManager(oid 去重)
candle 频道 CandleUpdatedEvent market K 线处理
l2Book 频道 OrderBookUpdatedEvent market L2 缓存

4.4 重连与自动重订阅

重连订阅机制:EnhancedWebSocketManager 在 _on_open 中检测到重连时,会自动重新发送构造时传入的 subscriptions 列表。对于 Trading WS,这意味着 orderUpdates/userFills/user 订阅(含 user 地址)会被自动重发,无需外部干预。此行为由 EnhancedWebSocketManager 内部保证,两个 WS 实例各自独立重连、独立重订阅。

Trading WS 重连 → _on_open 自动重发 trading subscriptions
  → WebSocketReconnectedEvent(source="trading")
  → Executor._on_websocket_reconnected() 仅当 source=="trading" 时执行 → 触发 verify_pending_orders ✅

Market WS 重连 → _on_open 自动重发 market subscriptions
  → WebSocketReconnectedEvent(source="market")
  → Executor._on_websocket_reconnected() 因 source≠"trading" 直接 return,不触发订单补查 ✅

4.5 半连接降级

状态 系统行为
Market WS 断开 + Trading WS 正常 K 线/L2 缓存过期,executor 仍可下单(HTTP),L2 价格不更新;飞书告警 [行情WS]
Market WS 正常 + Trading WS 断开 K 线正常,订单推送回退 HTTP 兜底;飞书告警 [交易WS];持仓/余额依赖 HTTP
两者都断开 全面降级,所有功能依赖 HTTP;两个 WS 各自独立重连

5. 模块级设计

模块 职责变化
config 提供 WS_MARKET_URL(主网固定)、WS_TRADING_URL(随 TRADING_NETWORK);删除 WS_URL
EnhancedWebSocketManager 构造函数必选参数 ws_url: strsource_name: str(无默认值,置于可选参数之前);构造时以 ValueError 校验 source_name 合法性(不使用 assert,防止 -O 模式跳过);所有事件发布使用 source=self._source_name,合法取值仅 market / trading
TradingOrchestrator 新增 get_trading_ws_user_address(self) -> Optional[str]:有 executor 时返回钱包地址,否则返回 None
RealtimeKlineServiceBase 订阅拆分为行情/交易两套;on_message 仅处理 K 线;Trading WS 不注册 message 回调;「是否启用交易 WS」与 user 地址通过 TradingOrchestrator.get_trading_ws_user_address() 获取,不探测 _executor/_wallet;双 manager 生命周期管理。
Executor 三个事件 handler 入口增加 source=="trading" 过滤,否则直接 return。
get_market_ws_manager get_global_ws_manager 重命名,语义为「行情 WS(主网)」,禁止用于交易逻辑。

6. 文件级实现规格

实现时以方法名与下文契约/删除清单为准;行号仅作参考,若与当前代码不一致则以契约为准。

6.1 src/config.py

import os
import logging

# ---------- 行情 WebSocket(主网固定) ----------
WS_MARKET_URL = "wss://api.hyperliquid.xyz/ws"

# ---------- 交易 WebSocket(随 TRADING_NETWORK) ----------
_trading_network = os.getenv("TRADING_NETWORK", "mainnet").lower()
if _trading_network not in ("mainnet", "testnet"):
    logging.getLogger(__name__).warning(
        "TRADING_NETWORK=%s 非法,回退为 mainnet", os.getenv("TRADING_NETWORK")
    )
    _trading_network = "mainnet"

WS_TRADING_URL = os.getenv("WS_TRADING_URL") or (
    "wss://api.hyperliquid-testnet.xyz/ws"
    if _trading_network == "testnet"
    else "wss://api.hyperliquid.xyz/ws"
)

# 导出实际生效的网络标识,供日志等场景使用(避免各处重复 os.getenv + 校验)
TRADING_NETWORK = _trading_network

配置覆盖:若已设置 WS_TRADING_URL,则以其为准,不再按 TRADING_NETWORK 推导。

删除清单:删除 WS_URL;所有对 WS_URL 的引用改为 WS_MARKET_URL 并删除原变量。日志中引用网络标识时使用 config.TRADING_NETWORK,禁止在 config.py 之外直接 os.getenv("TRADING_NETWORK")(避免绕过校验逻辑导致日志与实际行为不一致)。


6.2 src/utils/websocket/enhanced_ws_manager.py

契约ws_urlsource_name必选参数,无默认值;事件 source 仅使用 self._source_name,合法取值仅 "market""trading";实现后全局禁止出现 source="websocket"

6.2.1 构造函数

def __init__(
    self,
    subscriptions: List[Dict],
    ws_url: str,           # 必选,无默认值;位于可选参数之前避免语法错误
    source_name: str,      # 必选,取值仅 "market" 或 "trading"
    on_state_change: Optional[Callable] = None,
    timeout: int = WS_TIMEOUT,
    skip_disconnects: bool = False,
    alert_callback: Optional[Callable] = None,
    max_retries: int | None = WS_MAX_RETRIES,
    alert_threshold: int = WS_ALERT_THRESHOLD,
):
    if source_name not in ("market", "trading"):
        raise ValueError(
            f"source_name 非法: {source_name!r},仅允许 'market' 或 'trading'"
        )
    self.ws_url = ws_url
    self._source_name = source_name

注意ws_urlsource_name 必须置于所有可选参数之前。Python 不允许无默认值的位置参数出现在有默认值参数之后(会触发 SyntaxError)。调用侧统一使用关键字传参(见 6.3.5),不受顺序影响。

校验选型:使用 ValueError 而非 assert。Python 以 -O 优化模式运行时会静默跳过所有 assert 语句,ValueError 在任何运行模式下均可靠触发。

6.2.2 事件发布

所有 _event_bus.publish(..., source=...) 必须使用 source=self._source_name。实现后执行 grep -r 'source=.*websocket' src 结果应为零。涉及位置包括:_on_open 两处、_on_close/_on_error 一处、_publish_candle_event_publish_orderbook_event_publish_price_event_publish_user_events 内多处、_publish_order_status_events_publish_fill_events

config 导入:删除对 WS_URL 的 import,其余 WS_TIMEOUTWS_MAX_RETRIESWS_ALERT_THRESHOLD 等 WS_* 常量保留(由创建方传入的 ws_url 不再从 config 读取)。

删除清单:实现后全局搜索 source="websocket",确保结果为零。同步修改 src/events/base.py 中 Event 的 source 字段 docstring,仅保留「仅允许 market(行情 WS)或 trading(交易 WS)」,删除 websocket/http/internal 等旧语义。


6.3 src/services/realtime_kline_service_base.py

顶层导入

from src.config import WS_MARKET_URL, WS_TRADING_URL, TRADING_NETWORK

6.3.1 属性初始化(__init__

self.ws_trading_manager: Optional[EnhancedWebSocketManager] = None
self._trading_ws_thread: Optional[threading.Thread] = None

6.3.2 _build_subscriptions()

  • 契约返回 candle + l2Book 的订阅列表。
  • 删除清单:不得包含 orderUpdates、userFills、user 的订阅构建;不得包含对 _executor/_wallet 的访问;不得使用 hasattr(..., '_executor')getattr(..., '_executor', None)

6.3.3 _build_trading_subscriptions()

def _build_trading_subscriptions(self) -> List[Dict]:
    """构建交易 WS 订阅列表(orderUpdates / userFills / user)。无可用地址时返回空列表。"""
    if not self._trading_orchestrator:
        return []
    user_address = self._trading_orchestrator.get_trading_ws_user_address()
    if not user_address:
        return []
    return [
        {"type": "orderUpdates", "user": user_address},
        {"type": "userFills",    "user": user_address},
        {"type": "user",         "user": user_address},
    ]

6.3.4 订阅初始化(__init__

self.subscriptions = self._build_subscriptions()
self.trading_subscriptions = self._build_trading_subscriptions()

6.3.5 _init_service_threads()

创建两个 manager 时均显式传入 ws_urlsource_name

def _init_service_threads(self):
    # 行情 WS(主网固定,source="market")
    self.ws_manager = EnhancedWebSocketManager(
        subscriptions=self.subscriptions,
        on_state_change=self.on_state_change,
        timeout=WS_TIMEOUT,
        alert_callback=self._send_system_alert,
        max_retries=WS_MAX_RETRIES,
        alert_threshold=WS_ALERT_THRESHOLD,
        ws_url=WS_MARKET_URL,
        source_name="market",
    )
    self.ws_manager.add_message_callback(self.on_message)
    _set_market_ws_manager(self.ws_manager)

    if self.trading_subscriptions:
        try:
            self.ws_trading_manager = EnhancedWebSocketManager(
                subscriptions=self.trading_subscriptions,
                on_state_change=self._on_trading_state_change,
                timeout=WS_TIMEOUT,
                alert_callback=self._send_trading_alert,
                max_retries=WS_MAX_RETRIES,
                alert_threshold=WS_ALERT_THRESHOLD,
                ws_url=WS_TRADING_URL,
                source_name="trading",
            )
            # 不注册 message 回调:订单/用户数据仅由 _cache_latest_data 发布到 EventBus
            self.logger.info(
                f"🌐 WS 连接配置 | 行情: {WS_MARKET_URL} (主网固定)"
                f" | 交易: {WS_TRADING_URL} (TRADING_NETWORK={TRADING_NETWORK})"
            )
        except Exception as e:
            self.logger.error(f"交易 WS 创建失败,降级为仅行情模式: {e}")
            self.ws_trading_manager = None
            self._send_system_alert(
                "[交易WS] 创建失败",
                f"Trading WS 初始化异常: {e},订单推送将完全依赖 HTTP 兜底",
            )
    else:
        self.logger.info("交易订阅为空,仅启动行情 WS")

约定:仅当 self.trading_subscriptions 非空时创建并启动交易 WS;不存在空订阅的 trading manager 或占位线程。Trading WS 创建失败时降级为仅行情模式,不阻塞 Market WS 启动。

6.3.6 Trading WS 状态与告警

def _on_trading_state_change(self, state: str, info: Optional[str] = None):
    self.logger.info(f"[交易WS] 状态变化: {state}" + (f" | {info}" if info else ""))

def _send_trading_alert(self, title: str, content: str):
    self._send_system_alert(f"[交易WS] {title}", content)

6.3.7 on_message(msg)

  • 契约处理行情(去重 → K 线解析 → 入队)。
  • 删除清单:不得包含 channel in ("orderUpdates", "userFills", "user") 的判断;不得包含订单缓冲区写入/回放;不得调用 WebSocketOrderManager 或任何「订单原始消息」处理。
def on_message(self, msg: Dict):
    if self._message_dedup.is_duplicate(msg):
        return
    kline = self._parse_kline(msg)
    if not kline:
        return
    self.kline_buffer.put_nowait(kline)
    # ... 后续仅与 K 线入队、分析入队相关,无 orderUpdates/userFills/user 分支

6.3.8 生命周期管理

start():先启动交易 WS 线程(若存在),再阻塞于行情 WS。

def start(self):
    # 原有工作线程启动
    if self.ws_trading_manager is not None:
        self._trading_ws_thread = threading.Thread(
            target=self.ws_trading_manager.start,
            daemon=True,
            name="trading-ws",
        )
        self._trading_ws_thread.start()
    self.ws_manager.start()

stop():先停 orchestrator → 交易 WS(含 join)→ 行情 WS。

def stop(self):
    if self._trading_orchestrator:
        self._trading_orchestrator.stop()
    if self.ws_trading_manager is not None:
        self.ws_trading_manager.stop()
        if self._trading_ws_thread is not None and self._trading_ws_thread.is_alive():
            self._trading_ws_thread.join(timeout=5)
            if self._trading_ws_thread.is_alive():
                self.logger.warning("交易 WS 线程在 5s 内未退出")
    self.ws_manager.stop()
    # ... 其余等待队列清空等

6.3.9 动态订阅(新币种)

add_subscriptions 仅对 self.ws_manager 调用(只加 candle/l2Book),不涉及交易 WS。


6.4 src/trading/orchestrator.py

新增方法,供 RealtimeKlineServiceBase 构建交易 WS 订阅时调用:

def get_trading_ws_user_address(self) -> Optional[str]:
    """返回交易 WS 订阅所需的 user 地址;无 executor 或 wallet 时返回 None。"""
    if self._executor is None:
        return None
    wallet = getattr(self._executor, "_wallet", None)
    if wallet is None:
        return None
    return wallet.address

约束:RealtimeKlineServiceBase 不得直接访问 Orchestrator 的 _executor_wallet,仅通过此接口获取地址。直接访问 self._executor._wallet.address_walletNone 时会抛 AttributeError,故改用 getattr 防御。


6.5 src/trading/executor.py

三个事件 handler 入口增加 source 过滤,仅处理 source="trading"

def _on_position_updated(self, event: PositionUpdatedEvent):
    if event.source != "trading":
        return
    # ... 原有逻辑不变 ...

def _on_balance_changed(self, event: BalanceChangedEvent):
    if event.source != "trading":
        return
    # ... 原有逻辑不变 ...

def _on_websocket_reconnected(self, event: WebSocketReconnectedEvent):
    if event.source != "trading":
        return
    # ... 原有逻辑不变 ...

6.6 全局行情 WS 获取(原 get_global_ws_manager)

get_global_ws_manager / 模块内对全局 manager 的 setter 重命名为 get_market_ws_manager / _set_market_ws_manager。若当前实现为在 _init_service_threads() 内直接赋值(例如 _global_ws_manager = self.ws_manager),需抽成函数 _set_market_ws_manager(manager),并将模块级变量重命名为 _market_ws_manager,在创建行情 WS 后调用 _set_market_ws_manager(self.ws_manager)。所有调用处同步更新:调用处仅一处——src/trading/executor.py(原 get_global_ws_manager())。语义为「行情 WS(主网)」;禁止将返回值用于交易相关逻辑(仅用于 L2 缓存、K 线、get_all_mids() 等行情相关逻辑)。


7. 初始化时序

__init__
  ├── _init_trading_module()             → TradingOrchestrator + Executor,Executor._subscribe_events() 订阅 EventBus
  ├── self.ws_trading_manager = None
  ├── self._trading_ws_thread = None
  ├── _build_subscriptions()            → 仅 candle + l2Book
  ├── _build_trading_subscriptions()    → 仅当 get_trading_ws_user_address() 返回地址时非空
  └── _init_service_threads()
            ├── ws_manager(ws_url=WS_MARKET_URL, source_name="market")
            └── ws_trading_manager(ws_url=WS_TRADING_URL, source_name="trading")  [仅当 trading_subscriptions 非空]

start()
  ├── 工作线程启动
  ├── ws_trading_manager.start()  [daemon 线程,若存在]
  └── ws_manager.start()          [阻塞主线程]

stop()
  ├── _trading_orchestrator.stop()
  ├── ws_trading_manager.stop() + _trading_ws_thread.join(timeout=5)
  └── ws_manager.stop()

显式约束:EventBus 订阅必须先于任一 WS 的 start(),避免首包丢失。当前顺序 _init_trading_module()(Executor/WebSocketOrderManager 在此完成 _subscribe_events())→ _init_service_threads()(仅创建 manager,未 start())→ 用户调用 start() 时再启动 WS,已满足该约束;若未来调整初始化顺序,需重新评估。


8. 边界与注意事项

  • EventBus 线程安全:Trading WS 在 daemon 线程中运行,_cache_latest_data 在该线程内调用 EventBus.publish(),而消费者(Executor、WebSocketOrderManager)在主线程或其他工作线程中处理事件。要求 EventBus 的 publish/subscribe/unsubscribe 操作必须线程安全(内部使用 threading.Lock 保护订阅者列表)。若当前 EventBus 实现未满足此要求,须在本次改动中补齐。
  • EventBus 背压策略:当前 EventBus 采用同步 publish——在发布线程内依次调用所有订阅者的 handler。这意味着消费者的处理延迟会直接阻塞 Trading WS 的 _cache_latest_data。对于当前订单频率(秒级),同步模型可接受。若未来需支持高频场景(毫秒级大量 orderUpdates),应考虑引入异步队列(如 queue.Queue + 消费线程)解耦发布与消费,并设置队列上限防止内存溢出。本次不实现异步化,仅记录此约束。
  • 消息去重on_messageMessageDeduplicator 仅作用于行情。Trading WS 无 message 回调,WebSocketOrderManager 内部有 oid 级去重。
  • 并发与阻塞EnhancedWebSocketManager.start() 为阻塞调用,故交易 WS 必须在独立 daemon 线程启动。stop() 中先停交易 WS 并 join(timeout=5),再停行情 WS,避免竞态。
  • 连接失败与创建失败:Trading WS 连接失败(运行时)与现有重试与飞书告警一致,不阻塞行情 WS;订单依赖 HTTP 兜底。Trading WS 创建失败(初始化时)会降级为仅行情模式并发送飞书告警(见 6.3.5)。交易 WS 断开时,订单状态依赖现有 WebSocketOrderManager 的 HTTP 兜底(早期检查 + 超时后验证),本设计不新增轮询逻辑。
  • mainnet=mainnetTRADING_NETWORK=mainnet 时两 URL 相同,但仍维持两条独立的 WebSocket 连接(各自心跳、各自重连)。这是有意的设计选择:代码路径统一,行情/交易订阅隔离,测试/mainnet 行为一致。代价是多一条长连接,对服务端影响可忽略不计。
  • 监控:market_ws_state 来自 on_state_change;trading_ws_state 来自 _on_trading_state_change(带 [交易WS] 前缀);trading_ws_thread_alive 可在 stop() 的 join 超时处打 warning。两个 WS 共用同一套 WS_ALERT_THRESHOLD(连续断连次数阈值)和 WS_MAX_RETRIES 配置;_send_trading_alert 仅在标题前缀加 [交易WS] 以便在飞书消息中区分来源,告警触发逻辑与行情 WS 一致。
  • 事件基类src/events/base.py 中 Event 的 source 字段 docstring 仅保留「仅允许 market(行情 WS)或 trading(交易 WS)」,删除 websocket/http/internal 等旧语义。
  • userFills 与 user.fills:两者均来自 Trading WS,但订阅者不同、职责不同,不存在重复处理
    • userFills 频道 → _publish_fill_events()OrderFilledEventWebSocketOrderManager 订阅(更新订单状态机,以 oid 为去重键,重复消息安全幂等)。
    • user 频道的 fills 字段 → _publish_user_events()OrderFilledEventExecutor 订阅(刷新持仓/余额缓存)。
    • 两路事件的消费者不同,不会造成同一逻辑的重复执行。WebSocketOrderManager 不订阅 user.fills 路径,Executor 不订阅 userFills 路径。
  • 回滚:本设计不包含回退到单 WS 的步骤;若需回滚,需单独评估分支与数据流恢复方案。

已知限制

  • 动态挂载 Executor 不自动补建交易 WS:交易 WS 仅在 __init___build_trading_subscriptions() 时根据 get_trading_ws_user_address() 决定是否创建。若系统启动时无 Executor(TRADING_ENABLED=false),后续运行中动态挂载 Executor 时,不会自动补建交易 WS。如需支持此场景,需单独设计热加载机制(如监听 Executor 挂载事件 → 动态创建 Trading WS manager + 启动 daemon 线程),本次不实现。

9. 改动量汇总

文件 变更要点
src/config.py 新增 WS_MARKET_URLWS_TRADING_URLTRADING_NETWORK(实际生效值);删除 WS_URL
src/utils/websocket/enhanced_ws_manager.py 构造函数增加必选 ws_urlsource_name;所有事件 source 使用 self._source_name;删除对 WS_URL 的 import,其余 WS_* 常量保留
src/trading/orchestrator.py 新增 get_trading_ws_user_address() -> Optional[str]
src/services/realtime_kline_service_base.py 订阅拆分、_build_trading_subscriptions 仅调 get_trading_ws_user_address()、Trading WS 不注册 message 回调、on_message 仅行情、双 manager 生命周期;抽成 _set_market_ws_manager(manager) 并重命名模块变量 _market_ws_manager
src/trading/executor.py 三处 handler 增加 if event.source != "trading": return
src/events/base.py 将 Event 的 source 字段 docstring 从 websocket/http/internal 改为仅允许 market(行情 WS)或 trading(交易 WS)
全局 get_global_ws_managerget_market_ws_manager,调用处同步修改(调用处仅一处src/trading/executor.py

实现后禁止符号(grep 零结果)

禁止项 说明
WS_URL 已由 WS_MARKET_URL / WS_TRADING_URL 替代,删除后不得残留
get_global_ws_manager_set_global_ws_manager 已重命名为 get_market_ws_manager / _set_market_ws_manager
source="websocket"source='websocket' 事件 source 仅允许 "market" / "trading"
RealtimeKlineServiceBase 中对 _executor_wallet 的访问 仅通过 get_trading_ws_user_address() 获取地址
_on_trading_message、对交易 WS 的 add_message_callback Trading WS 不注册任何 message 回调
orderUpdates/userFills/user 出现在 on_message 或行情处理路径中 仅允许出现在订阅构建与 Trading WS 的 _cache_latest_data
_build_subscriptions() 内出现 orderUpdates/userFills/user 或对 _executor/_wallet 的访问 行情订阅仅 candle + l2Book,地址仅通过 get_trading_ws_user_address() 获取
assert source_name 校验必须使用 ValueError,禁止 assert-O 模式会跳过)
config.py 之外的 os.getenv("TRADING_NETWORK") 统一使用 config.TRADING_NETWORK,避免绕过校验逻辑

10. 测试验证清单

功能测试

与 TRADING_ENABLED 的对应:当 TRADING_ENABLED=false 时,Orchestrator 不创建 Executor,get_trading_ws_user_address() 返回 Nonetrading_subscriptions 为空,故不创建交易 WS(与下表场景 5 对应)。

# 场景 预期
1 TRADING_NETWORK=testnet 行情 WS 连主网,交易 WS 连测试网
2 测试网下单后 能通过交易 WS 收到 orderUpdates/userFills(经 EventBus 到 WebSocketOrderManager)
3 交易 WS 重连 自动重订阅 + Executor 仅当 source=="trading" 时触发 verify_pending_orders
4 行情 WS 重连 自动重订阅 candle/l2Book + Executor 不触发订单补查
5 TRADING_ENABLED=false 仅行情 WS,无交易 WS(无 Executor 故无交易订阅)
6 TRADING_NETWORK=mainnet 两 WS 可连同一地址,功能正常
7 交易 WS 断开期间下单 HTTP 兜底正常,重连后推送恢复
8 WS_TRADING_URL 显式设置 覆盖 TRADING_NETWORK 推导,使用指定 URL
9 TRADING_NETWORK=invalid 回退为 mainnet 并输出 warning 日志
10 Trading WS 构造异常 Market WS 正常启动,飞书告警通知降级

单元测试

# 测试点 验证方式
T1 source="market"WebSocketReconnectedEvent Executor 不触发订单补查
T2 source="trading"PositionUpdatedEvent Executor 缓存正确更新
T3 source="market"PositionUpdatedEvent Executor 不更新
T4 Trading WS 不注册 message 回调 _cache_latest_data 发布事件,无 add_message_callback
T5 事件 source 仅 market/trading source="websocket" 的发布或断言
T6 source_name 传入非法值 ValueError 抛出(非 AssertionError),-O 模式下同样生效
T7 EventBus 多线程并发发布 两个 WS 线程同时 publish 无竞态、无事件丢失
T8 Trading WS 构造函数抛异常 ws_trading_manager 为 None,Market WS 正常启动
T9 TRADING_NETWORK 配置导出值 config.TRADING_NETWORK 与实际生效网络一致(含非法值回退)

压力/并发

# 场景 预期
P1 两 WS 同时高频推送 无串扰,行情/交易各自处理,EventBus 无死锁
P2 stop() 时交易 WS 仍有消息 join(timeout=5) 后流程正常结束
P3 Trading WS 连续快速重连 每次重连自动重订阅,不产生重复订阅

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG