双WebSocket架构设计5

双 WebSocket 架构设计文档(K 线主网 + 订单/用户推送随配置)


1. 目标与约束

需求

  • K 线与 L2 订单簿:始终连接主网 WebSocket(wss://api.hyperliquid.xyz/ws),与 TRADING_NETWORK 无关。
  • orderUpdates / userFills / user:连接随 TRADING_NETWORK 变化的 WebSocket(testnet 时 wss://api.hyperliquid-testnet.xyz/ws,mainnet 时与主网一致),与下单/查单网络保持一致。

非目标

  • 不改变现有交易 HTTP API、订单跟踪逻辑、WebSocketOrderManager 行为;仅改变「订单/用户推送」的数据来源连接。

安全默认值

  • TRADING_NETWORK 默认值为 "mainnet",防止忘记配置时静默连到测试网下单。

架构范围:本架构仅采用双 WebSocket(行情 WS + 交易 WS),不考虑单 WS 回滚。

背景:当前为单 WebSocket 连接主网;当 TRADING_NETWORK=testnet 时订单在测试网提交,但订单/用户推送仍来自主网连接,导致收不到测试网 orderUpdates/userFills,持仓/余额数据也与测试网不一致。本设计通过拆分为 Market WS(主网固定)+ Trading WS(随配置)解决该问题。


2. 现状分析

行号说明:本节与第 6 节中的行号为编写时参考;实现时以方法名/逻辑块为准,若与当前代码不一致则以代码为准。

2.1 单 WebSocket 架构(当前)

  • src/config.py:104WS_URL = "wss://api.hyperliquid.xyz/ws",写死主网。
  • src/services/realtime_kline_service_base.py_build_subscriptions()(第 401~459 行)一次性构建:candle + l2Book +(若启用交易)orderUpdates、userFills、user。
  • _init_service_threads()(第 306~314 行):只创建一个 EnhancedWebSocketManager(self.subscriptions, ...),并 add_message_callback(self.on_message)
  • src/utils/websocket/enhanced_ws_manager.py:275self.ws_url = WS_URL,无参数化。

因此:K 线、L2、订单推送共用一个主网连接。当 TRADING_NETWORK=testnet 时,订单在测试网下单,但推送仍来自主网连接,导致:

  • 收不到测试网订单的 orderUpdates/userFills,只能依赖 HTTP 兜底
  • user channel 的持仓/余额是主网数据,与测试网实际账户不一致

2.2 消息处理调用链(实测)

通过阅读源码确认,EnhancedWebSocketManager._wrapped_callback() 的实际调用顺序为:

WebSocket 收到消息
  → _on_message() [解析 JSON]
  → _wrapped_callback()
      1. health_monitor.on_message()           # 健康监控
      2. _cache_latest_data(msg)               # 缓存数据 + 发布 EventBus 事件  ← 先
      3. for cb in message_callbacks: cb(msg)  # 触发外部回调                  ← 后

关键结论:当外部回调(on_message / _on_trading_message)被调用时,_cache_latest_data 已经执行完毕,EventBus 事件已经发布。因此:

  • user channel 消息:EventBus 已发布 PositionUpdatedEvent / OrderFilledEvent / BalanceChangedEvent,外部回调无需重复处理。
  • candle / l2Book 消息:EventBus 已发布对应事件,on_message 负责的是业务逻辑(K 线入队分析),而不是事件发布。

2.3 事件发布全景

EnhancedWebSocketManager 通过 _cache_latest_data()_on_open() 发布以下事件到全局 EventBus:

触发时机 事件类型 关键字段
_on_open 重连时 WebSocketReconnectedEvent downtime_seconds, source
_on_open 首连时 WebSocketConnectedEvent connection_id, source
_on_close 断连时 WebSocketDisconnectedEvent reason, reconnect_scheduled
user 频道 assetPositions PositionUpdatedEvent positions, account_value, margin_summary
user 频道 fills OrderFilledEvent order_id, filled_qty, filled_price, fee
user 频道 marginSummary BalanceChangedEvent available_balance, total_balance, margin_used
candle 频道 CandleUpdatedEvent K 线数据
l2Book 频道 OrderBookUpdatedEvent 订单簿数据

所有事件的 source 字段当前固定为 "websocket"。双 WS 架构下需要区分来源:Market WS 发布的事件 source="market",Trading WS 发布的事件 source="trading"

OrderFilledEvent 当前由 EnhancedWebSocketManager 发布;若未来其他模块订阅该事件,同样须仅处理 source="trading",避免误用行情连接来源。

2.4 缓冲区回放机制(实测)

_order_msg_bufferdeque(maxlen=500),当前(改造前)已有两套回放机制:

  1. 即时回放on_message() 每次被调用时,检查缓冲区并尝试回放(利用任意新消息到达的时机)。改造后此职责转移至 _on_trading_messageon_message 不再涉及订单消息(见第 6.3.8 节)。
  2. 周期回放_periodic_buffer_flush() 独立线程,每 2 秒主动检查一次缓冲区。

双 WS 拆分后,_order_msg_buffer 是 service 级别的共享属性,与哪个 WS 收到消息无关。_buffer_flush_thread 无需复制,仍可服务于交易 WS 的缓冲区。

2.5 userFills 与 user.fills 重叠分析

当前订阅了两个相关频道:

  • userFills channel → on_message()WebSocketOrderManager.handle_message() → 内部订单状态更新
  • user channel(含 fills 字段) → _cache_latest_data()_publish_user_events()OrderFilledEvent(已发布,供未来订阅方使用);Executor 的缓存刷新主要由 PositionUpdatedEvent / BalanceChangedEvent 驱动

两者不重叠

  • userFills 是 WebSocketOrderManager 使用的原始成交流水,用于维护订单状态机。
  • user.fills 是账户级聚合推送,触发 Executor 的持仓/余额缓存刷新。

双 WS 架构下,两者都从 Trading WS 来,处理路径保持不变,无重复处理问题

2.6 EventBus 单例

EventBus线程安全单例src/events/event_bus.py:18-30):

class EventBus:
    _instance: "EventBus | None" = None
    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

影响

  • EnhancedWebSocketManager.__init__ 第 294 行:self._event_bus = EventBus() → 拿到全局单例
  • Executor.__init__ 第 86 行:self._event_bus = EventBus() → 拿到同一个单例
  • 即:所有 WS manager 和 executor 共享同一个 EventBus,不存在"两条总线独立"的问题
  • EventBus.subscribe() 自带同一 handler 去重(第 51 行:if handler in handlers: return),不会因重复调用而注册多次

结论:无需"注册第二个 bus";只需通过事件 source 字段区分来源。executor 的 _subscribe_events() 已在全局单例上订阅,天然能收到两个 WS manager 发布的事件。


3. 目标架构

flowchart LR
  subgraph config [Config]
    WS_URL["WS_URL (主网固定)"]
    WS_TRADING_URL["WS_TRADING_URL (随 TRADING_NETWORK)"]
  end

  subgraph market [Market WS — 主网]
    Mgr1["EnhancedWebSocketManager\nsource='market'"]
    Mgr1 -->|订阅| Sub1[candle / l2Book]
    Mgr1 -->|wss://api.hyperliquid.xyz/ws| HL_M[Hyperliquid Mainnet]
  end

  subgraph trading [Trading WS — 随配置]
    Mgr2["EnhancedWebSocketManager\nsource='trading'"]
    Mgr2 -->|订阅| Sub2[orderUpdates / userFills / user]
    Mgr2 -->|WS_TRADING_URL| HL_T[Hyperliquid Testnet/Mainnet]
  end

  config --> Mgr1
  config --> Mgr2

  Mgr1 -->|on_message| Kline[K 线 + L2 处理]
  Mgr1 -->|EventBus source=market| EB[全局 EventBus]
  Mgr2 -->|_on_trading_message| OrderMgr[WebSocketOrderManager]
  Mgr2 -->|EventBus source=trading| EB
  EB -->|过滤 source=trading| Exec[Executor]

核心设计决策

  1. Market WS:仅订阅 candle、l2Book;URL 固定主网;source="market"
  2. Trading WS:仅订阅 orderUpdates、userFills、user;URL 为 WS_TRADING_URLsource="trading"
  3. EventBus 不改:保持全局单例,用事件 source 字段区分来源;executor 的事件 handler 仅处理 source="trading" 的事件。
  4. 无需 register_trading_ws_event_bus:单例 EventBus 天然共享,无需额外注册。

4. 数据流

4.1 行情(Market WS)

主网 WS → _wrapped_callback()
  1. _cache_latest_data() → CandleUpdatedEvent(source="market")、OrderBookUpdatedEvent(source="market") → EventBus + L2 缓存更新
  2. on_message() → K 线解析 → kline_buffer → 分析入队

职责区分:L2 由 Market WS 的 _cache_latest_data 发布 OrderBookUpdatedEvent 并更新缓存;on_message 仅负责 K 线解析与入队。

get_global_ws_manager() 仍返回该 manager,L2 缓存基于主网数据,供 executor 的 get_all_mids() 等读取。

4.2 订单与用户(Trading WS)

交易网 WS → _wrapped_callback()
  1. _cache_latest_data():
     - orderUpdates/userFills → 无 EventBus 事件(由外部回调处理)
     - user channel → _publish_user_events():
         PositionUpdatedEvent(source="trading") → EventBus → Executor(过滤通过)
         OrderFilledEvent(source="trading")    → EventBus → Executor(过滤通过)
         BalanceChangedEvent(source="trading") → EventBus → Executor(过滤通过)
  2. _on_trading_message():
     - user channel → 已由步骤1处理,此处显式跳过(避免重复)
     - 缓冲区即时回放(_periodic_buffer_flush 每 2 秒兜底)
     - orderUpdates/userFills → WebSocketOrderManager.handle_message()

4.3 重连

Trading WS 重连 → WebSocketReconnectedEvent(source="trading")
  → Executor._on_websocket_reconnected() 检查 source=="trading"
  → 触发 verify_pending_orders 双轮补查 ✅

Market WS 重连 → WebSocketReconnectedEvent(source="market")
  → Executor._on_websocket_reconnected() 检查 source≠"trading"
  → 跳过,不触发订单补查 ✅

4.4 半连接降级

状态 系统行为
Market WS 断开 + Trading WS 正常 K 线/L2 缓存过期,executor 仍可下单(HTTP),L2 价格不更新;飞书告警 [行情WS] 断连
Market WS 正常 + Trading WS 断开 K 线正常,订单推送回退 HTTP 兜底;飞书告警 [交易WS] 断连;持仓/余额依赖 HTTP 查询
两者都断开 全面降级,所有功能依赖 HTTP;两个 WS 各自独立重连

5. 模块级设计

模块 职责变化
config 新增 WS_TRADING_URL(随 TRADING_NETWORK 推导)
EnhancedWebSocketManager 在现有参数末尾新增可选 ws_urlsource_name 参数;source_name 用于所有事件发布
RealtimeKlineServiceBase 拆分订阅;交易 WS 消息单独回调;双 manager 生命周期管理
Executor 三个事件 handler 增加 source 过滤,仅处理 source="trading" 的事件

6. 文件级实现规格

行号说明:本节中的行号为编写时参考;实现时以方法名/逻辑块为准,若与当前代码不一致则以代码为准。

6.1 src/config.py

位置:WebSocket 配置块(第 100~120 行)之后。

# 交易推送 WebSocket 地址(随 TRADING_NETWORK 切换)
_trading_network = os.getenv("TRADING_NETWORK", "mainnet").lower()
if _trading_network not in ("mainnet", "testnet"):
    import logging
    logging.getLogger(__name__).warning(
        "TRADING_NETWORK=%s 非法,回退为 mainnet", os.getenv("TRADING_NETWORK")
    )
    _trading_network = "mainnet"
WS_TRADING_URL = os.getenv("WS_TRADING_URL") or (
    "wss://api.hyperliquid-testnet.xyz/ws"
    if _trading_network == "testnet"
    else "wss://api.hyperliquid.xyz/ws"
)

说明

  • TRADING_NETWORK 默认 "mainnet"(安全默认值,避免忘记配置时静默连测试网下单)。
  • 非法值(非 mainnet/testnet)时打 warning 并回退为 mainnet,保证实现一致。
  • WS_TRADING_URL 支持环境变量直接覆盖(便于特殊部署/调试)。
  • 行数:约 +10。

6.2 src/utils/websocket/enhanced_ws_manager.py

6.2.1 构造函数扩展

位置:__init__ 签名(第 229 行)与 self.ws_url 赋值(第 275 行)。在现有参数列表末尾(含 skip_disconnects 等)新增 ws_urlsource_name

def __init__(
    self,
    subscriptions: List[Dict],
    on_state_change: Optional[Callable] = None,
    timeout: int = WS_TIMEOUT,
    skip_disconnects: bool = False,
    alert_callback: Optional[Callable] = None,
    max_retries: int | None = WS_MAX_RETRIES,
    alert_threshold: int = WS_ALERT_THRESHOLD,
    ws_url: Optional[str] = None,        # ← 新增
    source_name: str = "websocket",      # ← 新增
):
    ...
    self.ws_url = ws_url if ws_url is not None else WS_URL   # ← 修改
    self._source_name = source_name                          # ← 新增
    ...
    logger.info(
        f"✅ EnhancedWebSocketManager 已初始化 | "
        f"source={self._source_name} | url={self.ws_url}"
    )

6.2.2 事件 source 字段参数化

将所有 self._event_bus.publish(...) 中的 source="websocket" 替换为 source=self._source_name

方法 约行号 事件类型
_on_open 638 WebSocketReconnectedEvent
_on_open 646 WebSocketConnectedEvent
_on_close/_on_error ~738 WebSocketDisconnectedEvent
_cache_latest_data ~1346 CandleUpdatedEvent
_cache_latest_data ~1366 OrderBookUpdatedEvent
_cache_latest_data ~1380 PriceUpdatedEvent
_publish_user_events 1401 PositionUpdatedEvent
_publish_user_events 1413 OrderFilledEvent
_publish_user_events 1435 BalanceChangedEvent

共 9 处,均为单行替换:source="websocket"source=self._source_name

行数:约 +4 行新增,9 行修改。


6.3 src/services/realtime_kline_service_base.py

6.3.1 属性初始化(__init__ 第 193 行附近)

__init__总是初始化以下属性,避免后续 hasattr 检查:

# 双 WS 架构(总是初始化,有交易订阅时创建交易 WS 实例)
self.ws_trading_manager: Optional[EnhancedWebSocketManager] = None
self._trading_ws_thread: Optional[threading.Thread] = None

6.3.2 订阅拆分

_build_subscriptions()(第 401 行):删除第 443~457 行的交易订阅逻辑,仅保留 candle + l2Book。

新增 _build_trading_subscriptions() -> List[Dict]

def _build_trading_subscriptions(self) -> List[Dict]:
    """构建交易 WS 订阅列表(orderUpdates / userFills / user)。

    仅当交易模块启用时返回非空列表。
    """
    from src.config import WS_TRADING_URL

    try:
        executor = self._trading_orchestrator._executor
    except AttributeError:
        executor = None

    if executor is None:
        self.logger.info("交易模块未启用,跳过交易 WS 订阅构建")
        return []

    user_address = executor._wallet.address
    subs = [
        {"type": "orderUpdates", "user": user_address},
        {"type": "userFills",    "user": user_address},
        {"type": "user",         "user": user_address},
    ]
    self.logger.info(
        f"✅ 交易 WS 订阅已构建 | url={WS_TRADING_URL} | "
        f"user={user_address[:10]}... | 订阅数={len(subs)}"
    )
    return subs

6.3.3 __init__ 订阅初始化(第 207 行附近)

self.subscriptions = self._build_subscriptions()
self.trading_subscriptions = self._build_trading_subscriptions()

self.logger.info(
    f"订阅数量 | 行情={len(self.subscriptions)} / 交易={len(self.trading_subscriptions)}"
)

6.3.4 _init_service_threads()(第 277 行)

def _init_service_threads(self):
    from src.config import WS_TRADING_URL

    # —— 行情 WS(主网固定,source="market")——
    self.ws_manager = EnhancedWebSocketManager(
        subscriptions=self.subscriptions,
        on_state_change=self.on_state_change,
        timeout=WS_TIMEOUT,
        alert_callback=self._send_system_alert,
        max_retries=WS_MAX_RETRIES,
        alert_threshold=WS_ALERT_THRESHOLD,
        source_name="market",
    )
    self.ws_manager.add_message_callback(self.on_message)
    _set_global_ws_manager(self.ws_manager)  # L2 缓存仍基于行情 WS

    # —— 交易 WS(随配置,source="trading")——
    if self.trading_subscriptions:
        self.ws_trading_manager = EnhancedWebSocketManager(
            subscriptions=self.trading_subscriptions,
            on_state_change=self._on_trading_state_change,
            timeout=WS_TIMEOUT,
            alert_callback=self._send_trading_alert,
            max_retries=WS_MAX_RETRIES,
            alert_threshold=WS_ALERT_THRESHOLD,
            ws_url=WS_TRADING_URL,
            source_name="trading",
        )
        self.ws_trading_manager.add_message_callback(self._on_trading_message)
        self.logger.info(
            f"🌐 WS 连接配置 | 行情: {WS_URL} (主网固定)"
            f" | 交易: {WS_TRADING_URL} (TRADING_NETWORK={os.getenv('TRADING_NETWORK','mainnet')})"
        )
    else:
        self.logger.info("✅ 交易订阅为空(TRADING_ENABLED=false),仅启动行情 WS")

6.3.5 交易 WS 消息回调 _on_trading_message

def _on_trading_message(self, msg: Dict):
    """交易 WS 消息回调。

    注意:调用本方法时,_cache_latest_data() 已执行完毕:
    - user channel 的 EventBus 事件(PositionUpdatedEvent 等)已发布
    - 本方法只需处理需要路由到 WebSocketOrderManager 的消息
    """
    channel = msg.get("channel") or msg.get("type", "")

    # user channel 已由 _cache_latest_data → _publish_user_events 处理,跳过
    if channel == "user":
        return

    # orderUpdates / userFills → WebSocketOrderManager
    if channel in ("orderUpdates", "userFills"):
        # 即时缓冲区回放(_periodic_buffer_flush 每 2 秒兜底,此处为即时优化)
        if self._order_msg_buffer:
            mgr = self._get_ws_order_manager()
            if mgr is not None:
                while True:
                    try:
                        buffered = self._order_msg_buffer.popleft()
                        mgr.handle_message(buffered)
                    except IndexError:
                        break

        mgr = self._get_ws_order_manager()
        if mgr is not None:
            try:
                mgr.handle_message(msg)
            except Exception as e:
                self.logger.error(f"处理交易WS订单消息失败: {e}", exc_info=True)
        else:
            self._order_msg_buffer.append(msg)
            buf_size = len(self._order_msg_buffer)
            if buf_size >= 1000:
                self.logger.error(f"订单消息缓冲区异常膨胀({buf_size}条),订单管理器可能初始化异常")
            elif buf_size >= 100 and buf_size % 100 == 0:
                self.logger.warning(f"订单消息缓冲区较大({buf_size}条),等待 WebSocketOrderManager 就绪")

缓冲区告警与 maxlen:当前 _order_msg_bufferdeque(maxlen=500),故 size 不会超过 500;buf_size >= 1000 为防御性检查(若将来提高 maxlen 可同步将 error 阈值设为 maxlen 或 2×maxlen)。

缓冲区归属:双 WS 架构下,订单相关消息仅经 _on_trading_message 入队与回放;行情 on_message 不再写入 _order_msg_buffer

6.3.6 交易 WS 状态回调 _on_trading_state_change

def _on_trading_state_change(self, state: str, info: Optional[str] = None):
    """交易 WS 连接状态变化回调,复用行情 WS 的 on_state_change 签名。"""
    self.logger.info(f"[交易WS] 状态变化: {state}" + (f" | {info}" if info else ""))
    # 可按需扩展:状态持久化、监控指标更新等

6.3.7 告警区分 _send_trading_alert

def _send_trading_alert(self, title: str, content: str):
    """交易 WS 告警,带来源前缀便于运维区分。"""
    self._send_system_alert(f"[交易WS] {title}", content)

6.3.8 行情 on_message 精简

从现有 on_message()(第 606~673 行)中删除以下内容:

  • 缓冲区回放逻辑(第 623~636 行)—— 已移至 _on_trading_message
  • orderUpdates/userFills 路由逻辑(第 639~660 行)—— 订单消息仅由交易 WS 的 _on_trading_message 处理

行情 on_message 仅处理行情(K 线 + 去重),不再包含任何订单相关分支:

def on_message(self, msg: Dict):
    if self._message_dedup.is_duplicate(msg):
        return

    # 行情 WS 只收到 candle / l2Book,仅做 K 线处理
    kline = self._parse_kline(msg)
    if not kline:
        return
    self.kline_buffer.put_nowait(kline)

6.3.9 生命周期管理

start()(第 2145 行附近)

def start(self):
    ...  # 原有工作线程启动(含 _buffer_flush_thread,服务于共享 _order_msg_buffer)

    # 交易 WS 在独立 daemon 线程启动(ws_manager.start() 是阻塞调用)
    if self.ws_trading_manager is not None:
        self._trading_ws_thread = threading.Thread(
            target=self.ws_trading_manager.start,
            daemon=True,
            name="trading-ws",
        )
        self._trading_ws_thread.start()
        self.logger.info("✅ 交易 WS 已在独立线程启动")

    # 行情 WS 阻塞当前线程(保持原有行为)
    self.ws_manager.start()

stop()(第 2185 行附近)

顺序动机:先停 trading orchestrator,避免关闭 WS 过程中仍有新订单或回调进入已停组件;再停交易 WS 并 join 线程,最后停行情 WS。

def stop(self):
    # 1. 停止交易 orchestrator(避免后续仍有订单/回调进入已停组件)
    if self._trading_orchestrator:
        self._trading_orchestrator.stop()

    # 2. 停止交易 WS(先于行情 WS)
    if self.ws_trading_manager is not None:
        self.ws_trading_manager.stop()
        self.logger.info("✅ 交易 WS stop() 已调用")
        # 等待线程退出(最多 5 秒,避免 daemon 线程被强杀时消息丢失)
        if self._trading_ws_thread is not None and self._trading_ws_thread.is_alive():
            self._trading_ws_thread.join(timeout=5)
            if self._trading_ws_thread.is_alive():
                self.logger.warning("交易 WS 线程在 5s 内未退出,继续停止流程")
            else:
                self.logger.info("✅ 交易 WS 线程已退出")

    # 3. 停止行情 WS(保持原有逻辑)
    self.ws_manager.stop()
    ...

6.3.10 动态订阅(新币种)

第 2068~2073 行:add_subscriptions 仅对 self.ws_manager 调用(只加 candle/l2Book),不涉及交易 WS。无需改动

6.3.11 导入

from src.config import WS_TRADING_URL

行数变化:约 +70~90(新增方法),-20~25(删除旧订单路由),净增约 50~65 行。


6.4 src/trading/executor.py

位置:事件 handler 方法(第 1754~1794 行)。

def _on_position_updated(self, event: PositionUpdatedEvent):
    if event.source != "trading":
        return  # 仅处理交易 WS 的持仓事件
    # ... 原有逻辑不变 ...

def _on_balance_changed(self, event: BalanceChangedEvent):
    if event.source != "trading":
        return  # 仅处理交易 WS 的余额事件
    # ... 原有逻辑不变 ...

def _on_websocket_reconnected(self, event: WebSocketReconnectedEvent):
    if event.source != "trading":
        return  # 仅交易 WS 重连时触发订单补查
    # ... 原有逻辑不变 ...

说明

  • 仅处理 source="trading" 的事件,行情 WS 发布的事件(source="market")一律忽略。
  • 无需新增 register_trading_ws_event_bus 方法:EventBus 是全局单例,executor 已在 _subscribe_events() 中订阅,天然能收到交易 WS 发布的事件。
  • OrderFilledEvent 若被其他模块订阅,同样须仅处理 source="trading",避免误用行情来源数据。
  • 行数:约 +3(每个 handler 加 1 行判断)。

7. 初始化时序

__init__
  ├── [194] _init_trading_module()             → TradingOrchestrator + Executor 初始化
  │        └── Executor._subscribe_events()   → 在全局 EventBus 上订阅 3 类事件
  ├── [+]   self.ws_trading_manager = None     → 总是初始化(防御性)
  ├── [+]   self._trading_ws_thread = None     → 总是初始化(防御性)
  ├── [207] _build_subscriptions()             → 行情订阅 (candle + l2Book)
  ├── [+]   _build_trading_subscriptions()     → 交易订阅(交易启用时非空)
  └── [221] _init_service_threads()
            ├── ws_manager (行情, source="market")
            └── ws_trading_manager (交易, source="trading") ← 有交易订阅时创建

start()
  ├── 工作线程启动(含 _buffer_flush_thread,服务于共享 _order_msg_buffer)
  ├── ws_trading_manager.start() ← daemon 线程(若存在,先启动)
  └── ws_manager.start()         ← 阻塞主线程

stop()
  ├── _trading_orchestrator.stop()
  ├── ws_trading_manager.stop()           ← 先停交易 WS
  ├── _trading_ws_thread.join(timeout=5)  ← 等待线程退出(优雅关闭)
  ├── ws_manager.stop()                   ← 再停行情 WS
  └── 等待队列清空

关键保证

  • _init_trading_module()(第 194 行)在 _init_service_threads()(第 221 行)之前执行,创建 ws_trading_manager_trading_orchestrator._executor 已存在。
  • ws_trading_manager_trading_ws_thread 总在 __init__ 中初始化为 None,无需 hasattr 检查。
  • Executor 的 _subscribe_events() 在 executor 构造时就已执行,早于任何 WS manager 创建,不会遗漏事件。
  • _buffer_flush_thread(周期性缓冲区回放)服务于 service 级别的 self._order_msg_buffer,与哪个 WS 收到消息无关,无需复制。

8. 边界与注意事项

8.1 get_global_ws_manager

始终返回行情 WS(主网),L2 缓存与 executor 的 get_all_mids() 等仍基于主网数据,符合「K 线/L2 始终主网」原则。本方案不暴露 get_trading_ws_manager();若后续需要监控/调试交易 WS,可在 RealtimeKlineServiceBase 上增加只读属性(如 trading_ws_manager)供内部或运维使用。

8.2 消息去重

on_message 中的 MessageDeduplicator 仅作用于行情消息。_on_trading_message 不调用 _message_dedup

  • 交易消息量远小于行情消息,去重收益低。
  • WebSocketOrderManager 内部有 oid 级去重。
  • 若后续需要,可为交易 WS 单独实例化 MessageDeduplicator(传入不同的 namespace key)。

8.3 并发安全

  • self._order_msg_bufferdeque(maxlen=500),Python 中 deque.append()deque.popleft() 在 CPython 中是原子操作,多线程安全。
  • _on_trading_message(交易 WS 线程)和 _buffer_flush_thread(独立线程)都会 popleft(),由于 deque 的原子性不会产生数据竞争,可能发生"竞争消耗"——这是预期行为(两者都在尽力回放)。
  • ws_trading_manager.stop()_trading_ws_thread.join(timeout=5) 确保线程退出后再停行情 WS,避免 _order_msg_buffer 仍在被消费时行情 WS 就关闭。

8.4 连接失败与配置校验

  • Trading WS 连接失败:行为与现有 EnhancedWebSocketManager 一致(重试 + 飞书告警);不阻塞行情 WS,系统以降级模式运行(订单依赖 HTTP 兜底)。
  • 配置校验(可选):在 config 加载后校验 TRADING_NETWORK in ("mainnet", "testnet");若为非法值则打 warning 并回退为 mainnet。也可对 WS_TRADING_URL 做 URL 格式校验,非法时 fail fast 或 fallback 到由 TRADING_NETWORK 推导的默认值。

8.5 start() 阻塞问题

EnhancedWebSocketManager.start() 是阻塞调用,因此:

  • 交易 WS 必须在独立 daemon 线程启动。
  • 行情 WS 保持在主线程阻塞(与当前行为一致)。
  • stop() 时两个 manager 的 stop_event 独立,互不影响。

8.6 mainnet=mainnet 场景

TRADING_NETWORK=mainnet 时,WS_TRADING_URL = WS_URL,两个 manager 连接同一地址,建立两条独立连接。功能正常,逻辑分离清晰,无副作用。从连接数角度为 2 个独立连接,对 Hyperliquid 可接受;若未来需省连接数,可另行考虑单 WS 多路复用,不在本方案范围内。

8.7 事件 source 过滤约定

凡依赖「交易/账户数据」的事件消费者,均应只处理 source="trading" 的事件,包括但不限于:PositionUpdatedEventBalanceChangedEventWebSocketReconnectedEvent(订单补查)、以及若被订阅的 OrderFilledEventWebSocketConnectedEvent/WebSocketDisconnectedEvent(用于交易 WS 状态时)。当前仅 Executor 的 3 个 handler 需在实现中加过滤;新增订阅方时须遵守此约定,避免误用 source="market" 的 user 相关事件。


9. 监控指标

双 WS 架构新增以下日志/指标,便于运维监控:

指标 来源 告警条件
market_ws_state Market WS on_state_change 连接断开 >30s
trading_ws_state Trading WS _on_trading_state_change 连接断开 >10s
order_msg_buffer_size _on_trading_message 入队时或 _periodic_buffer_flush 每 2 秒采样一次 len(_order_msg_buffer),避免重复打点 >100 条
trading_ws_thread_alive stop() 中的 join 结果 join 超时

启动时打印一条综合日志(在 _init_service_threads() 末尾):

🌐 WS 连接配置
  行情 WS: wss://api.hyperliquid.xyz/ws (主网固定)
  交易 WS: wss://api.hyperliquid-testnet.xyz/ws (TRADING_NETWORK=testnet)

两个 manager 通过 source_name_send_trading_alert 前缀区分日志与告警,若需指标系统,可为两个 manager 打不同标签(ws_source=market / ws_source=trading)。


10. 测试验证清单

功能测试

# 场景 预期
1 TRADING_NETWORK=testnet 行情 WS 连主网,交易 WS 连测试网,日志确认两个 URL
2 测试网下单后 能通过交易 WS 收到 orderUpdates/userFills
3 交易 WS 重连 executor 触发 verify_pending_orders 双轮补查
4 行情 WS 重连 executor 触发订单补查
5 TRADING_ENABLED=false 仅一个行情 WS,无交易 WS,无报错
6 TRADING_NETWORK=mainnet 两个 WS 连同一地址,功能正常(逻辑分离)
7 交易 WS 断开期间下单 HTTP 兜底正常,重连后推送恢复

单元测试(关键场景)

# 测试点 验证方式
T1 source="market"WebSocketReconnectedEvent Executor verify_pending_orders 不触发
T2 source="trading"PositionUpdatedEvent Executor 缓存正确更新
T3 source="market"PositionUpdatedEvent Executor 缓存更新
T4 _on_trading_message 收到 user channel 不调用 WebSocketOrderManager,无副作用
T5 缓冲区回放:先缓冲后就绪 就绪后 1~2 个周期(2~4 秒)内缓冲区应被清空;_periodic_buffer_flush 每 2 秒执行一次

压力/并发测试

# 场景 预期
P1 两个 WS 同时高频推送消息 无消息串扰,行情/交易各自独立处理
P2 交易 WS 线程与 _buffer_flush_thread 并发消费 _order_msg_buffer 无数据竞争;同一条消息要么被 _on_trading_message 即时处理,要么留在 buffer 中被周期回放或后续某条消息触发的即时回放处理,因 deque.popleft 原子性每条消息只会被一个消费者取出并交给 WebSocketOrderManager 一次(业务层另有 oid 去重)
P3 stop() 时交易 WS 仍有未处理消息 join(timeout=5) 后消息被合理丢弃或记录

11. 改动量汇总

文件 新增 修改/删除 合计(约)
src/config.py +10 0 10
src/utils/websocket/enhanced_ws_manager.py +4 ~12(9处source替换+3处签名) ~16
src/services/realtime_kline_service_base.py +70~90 -20~25(删除旧订单路由) 50~65
src/trading/executor.py +3 0 3
合计 约 79~94

12. 依赖与兼容性

  • 不新增第三方依赖。

  • Hyperliquid 测试网 WS 地址:wss://api.hyperliquid-testnet.xyz/ws(官方文档)。

  • Event 基类 source: str 字段已存在(src/events/base.py:39),无需修改事件定义。

  • EventBus.subscribe() 自带 handler 去重(第 51 行),不会因多次调用重复注册。

  • .env.example 新增:

    # 交易网络配置(mainnet/testnet)
    # 影响下单/查单 HTTP API 和订单推送 WS 地址
    TRADING_NETWORK=mainnet
    
    # 手动覆盖交易推送 WS 地址(可选,默认由 TRADING_NETWORK 推导)
    # WS_TRADING_URL=wss://api.hyperliquid-testnet.xyz/ws
    

13. 与原方案的差异对照

维度 原方案 当前方案
EventBus 处理 提议 register_trading_ws_event_bus() 注册第二个 bus 利用全局单例 + source 字段过滤,无需额外注册
重连误触发 未处理 Market WS 重连也触发补查的问题 executor handler 过滤 source≠"trading" 的事件
事件来源标识 所有事件 source="websocket" Market WS source="market",Trading WS source="trading"
user channel 处理说明 注释说"由内部处理",调用顺序不明 明确 _cache_latest_data 先于外部回调,_on_trading_message 中显式 return
userFillsuser.fills 未分析两者关系 明确两者不重叠,路径独立
缓冲区回放 每条消息触发,周期线程归属不明 明确两套机制共存合理,_buffer_flush_thread 无需复制
stop() 线程等待 未包含 _trading_ws_thread.join() 新增 join(timeout=5) 优雅关闭
ws_trading_manager 初始化 依赖 hasattr 检查 总在 __init__ 中初始化为 None
TRADING_NETWORK 默认值 "testnet" "mainnet"(安全默认值)
半连接降级 未讨论 第 4.4 节明确三种降级场景
start() 阻塞 未提及 交易 WS 在独立 daemon 线程启动
executor 事件过滤 未区分来源 仅处理 source="trading" 的事件
_on_trading_state_change 提及但未定义 提供完整实现规格
告警区分 简单提及共用或包装 明确 _send_trading_alert 带前缀,第 8 节说明区分方式
行情 on_message 精简 含订单路由逻辑 仅处理 K 线;订单消息仅由交易 WS 处理
动态订阅(新币种) 未明确 第 6.3.10 节明确仅对行情 WS 调用
事件类型覆盖 仅提 3 种 第 2.3 节列出全部 9 种事件及 source 替换
监控指标 第 9 节,4 项关键指标
测试清单 7 个功能场景 7 个功能 + 5 个单元 + 3 个压力场景

14. 后续可考虑

  • 将 TRADING_NETWORK 非法值校验作为必选(当前已推荐实现)。
  • 若交易 WS 消息出现重复处理问题,可为交易 WS 单独实例化 MessageDeduplicator
  • 暴露 RealtimeKlineServiceBase.trading_ws_manager 只读属性,便于监控/调试。
  • 若需省连接数,可另行评估单 WS 多路复用方案(非本架构范围)。

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG