API 网络异常重试设计方案

API 网络异常重试设计方案(综合优化版)

1. 背景与问题

1.1 核心问题

运行过程中,与 Hyperliquid API 的交互会因网络波动或限流出现以下异常,目前未被重试,导致一次性的临时故障被当作最终失败处理:

现象 典型异常 当前行为 影响
获取账户状态失败 Read timed out (read timeout=10) 打 ERROR 后返回 {},无重试 仓位同步失败,交易决策错误
获取 Spot 余额失败 NameResolutionError / DNS 错误 打 ERROR 后返回 (0.0, 0.0),无重试 余额显示为 0,误判资金状态
API 限流 ClientError(429) 无重试,直接失败 临时限流导致功能不可用

1.2 问题影响

  • 误判风险: 瞬时网络故障导致余额显示为 0、仓位同步失败
  • 用户体验: 临时性问题被当作永久性故障,影响交易决策
  • 系统可靠性: 缺少自动恢复能力,依赖人工介入

2. 设计目标与范围

2.1 设计目标

  • 主要目标: 对关键只读 API 调用(账户状态、Spot 余额),在遇到可恢复异常时自动重试
  • 次要目标:
    • 减少瞬时网络问题导致的误判
    • 提供重试过程的可观测性(日志、指标)
    • 支持降级处理,让调用方感知数据质量
    • 避免代码重复,提取公共重试模块

2.2 适用范围

  • 涉及文件:

    • src/utils/retry_utils.py (新增公共模块)
    • src/trading/executor.py (改造)
    • src/utils/hyperliquid_candles.py (可选,统一使用公共模块)
  • 涉及接口:

    • get_account_state() - 账户状态查询
    • _get_spot_usdc() - Spot 余额查询
    • _detect_unified_account() - 统一账户检测

2.3 非目标

  • 写操作: 下单、改杠杆等写操作的重试策略保持不变(涉及幂等性问题)
  • WebSocket: WebSocket 连接的重连机制不在本方案范围

3. 方案设计

3.1 总体架构

┌──────────────────────────────────────────────┐
│  API 调用层 (executor.py)                      │
│  • get_account_state()                        │
│  • _get_spot_usdc()                           │
│  • _detect_unified_account()                  │
└────────────────┬─────────────────────────────┘
                 │ 调用
                 ▼
┌──────────────────────────────────────────────┐
│  公共重试工具层 (retry_utils.py)               │
│  • 精确化可重试异常判定                        │
│  • 场景化重试装饰器                            │
│  • 降级数据标记 (ApiResult)                   │
│  • 429 智能处理 (Retry-After)                 │
└────────────────┬─────────────────────────────┘
                 │
                 ▼
┌──────────────────────────────────────────────┐
│  Hyperliquid API                              │
│  • user_state()                               │
│  • spot_user_state()                          │
└──────────────────────────────────────────────┘

监控与可观测性:
┌──────────────────────────────────────────────┐
│  • 详细重试日志 (异常类型、重试次数)            │
│  • 重试指标收集 (成功率、异常分布)              │
│  • 性能监控 (延迟、重试耗时)                   │
└──────────────────────────────────────────────┘

3.2 核心改进点

  1. 公共重试模块: 提取 retry_utils.py,消除代码重复
  2. 精确化异常判定: 区分可重试异常和不可重试异常(SSL/认证错误)
  3. 场景化重试策略: 初始化阶段(5次) vs 运行时(3次)
  4. 429 智能处理: 优先使用 API 的 Retry-After header
  5. 降级数据标记: 使用 ApiResult 让调用方感知数据质量
  6. 完善可观测性: 详细日志 + 重试指标收集

4. 核心实现

4.1 公共重试工具模块 (src/utils/retry_utils.py)

4.1.1 可重试异常判定

# src/utils/retry_utils.py
import logging
from typing import Optional
from dataclasses import dataclass
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception,
)

try:
    import requests.exceptions
except ImportError:
    requests = None

logger = logging.getLogger(__name__)

# ============ 常量定义 ============

# 可重试的网络异常关键词
_RETRYABLE_NETWORK_KEYWORDS = {
    'timed out', 'timeout', 'readtimeout', 'connecttimeout',
    'nameresolutionerror', 'failed to resolve',
    'nodename nor servname',
    'connection reset', 'connection refused',
    'temporary failure in name resolution',
}

# 不可重试的异常关键词(配置/认证问题)
_NON_RETRYABLE_KEYWORDS = {
    'certificate', 'ssl', 'tls',  # SSL/TLS 证书问题
    'authentication', 'unauthorized', 'forbidden',  # 认证/权限问题
    'permission denied',
}

# ============ 异常判定函数 ============

def is_retryable_api_error(exc: BaseException) -> bool:
    """
    判定 API 错误是否可重试 (429 限流 / 5xx 服务端错误)

    用于: 写操作和读操作共用

    Args:
        exc: 捕获的异常

    Returns:
        True: 429 限流或 5xx 错误,可重试
        False: 其他错误,不可重试
    """
    from hyperliquid.utils.error import ClientError, ServerError

    # 429 限流
    if isinstance(exc, ClientError) and exc.status_code == 429:
        return True

    # 5xx 服务端错误
    if isinstance(exc, ServerError):
        return True

    return False


def is_retryable_network_error(exc: BaseException) -> bool:
    """
    判定网络异常是否可重试 (超时 / DNS / 连接错误)

    用于: 仅只读查询使用

    策略:
    1. 优先通过异常类型精确匹配
    2. 排除不可重试的异常(SSL/认证问题)
    3. 兜底用字符串匹配处理被包装的异常

    Args:
        exc: 捕获的异常

    Returns:
        True: 瞬时网络故障,可重试
        False: 配置/认证问题,不可重试
    """
    exc_str = str(exc).lower()
    exc_type = type(exc).__name__.lower()

    # ---- 1. 排除不可重试的异常 ----
    if any(keyword in exc_str for keyword in _NON_RETRYABLE_KEYWORDS):
        logger.debug(f"检测到不可重试异常(配置/认证问题): {type(exc).__name__}: {exc}")
        return False

    # ---- 2. 精确类型判断 ----
    # requests 库的超时和连接错误
    if requests is not None:
        if isinstance(exc, (
            requests.exceptions.ReadTimeout,
            requests.exceptions.ConnectTimeout,
            requests.exceptions.ConnectionError,
        )):
            return True

    # urllib3 的 DNS 解析错误(类名匹配,兼容不同版本)
    if 'nameresolutionerror' in exc_type:
        return True

    # 系统级错误(包含 socket.gaierror 等 DNS 错误)
    if isinstance(exc, OSError):
        return True

    # ---- 3. 兜底:字符串匹配 ----
    if any(keyword in exc_str or keyword in exc_type
           for keyword in _RETRYABLE_NETWORK_KEYWORDS):
        return True

    return False


def is_retryable_readonly(exc: BaseException) -> bool:
    """
    只读查询的完整可重试判定

    组合判定: API 错误 + 网络错误

    Args:
        exc: 捕获的异常

    Returns:
        True: 可重试(429/5xx/超时/DNS 等)
        False: 不可重试
    """
    return is_retryable_api_error(exc) or is_retryable_network_error(exc)

4.1.2 降级数据标记

# src/utils/retry_utils.py (续)

@dataclass
class ApiResult:
    """
    API 查询结果(带降级标记)

    用于让调用方感知数据来源:
    - is_degraded=False: 真实 API 数据
    - is_degraded=True: 降级默认值(重试耗尽后返回)

    Attributes:
        data: 返回的数据(字典或其他类型)
        is_degraded: 是否为降级数据
        error: 最后一次异常(如果有)
        retry_count: 实际重试次数
    """
    data: any
    is_degraded: bool = False
    error: Optional[Exception] = None
    retry_count: int = 0

4.1.3 429 智能处理

# src/utils/retry_utils.py (续)

def get_retry_after_from_exception(exc: Exception) -> Optional[int]:
    """
    从 429 异常中提取 Retry-After 时间(秒)

    Args:
        exc: 捕获的异常

    Returns:
        int: Retry-After 秒数
        None: 无 Retry-After header
    """
    from hyperliquid.utils.error import ClientError

    if not isinstance(exc, ClientError):
        return None

    if not hasattr(exc, 'response'):
        return None

    retry_after = exc.response.headers.get('Retry-After')
    if retry_after:
        try:
            return int(retry_after)
        except (ValueError, TypeError):
            pass

    return None

4.1.4 场景化重试装饰器

# src/utils/retry_utils.py (续)

def api_retry(
    description: str = "API调用",
    max_attempts: int = 3,
    min_wait: int = 1,
    max_wait: int = 10,
    retryable=is_retryable_readonly
):
    """
    通用 API 重试装饰器工厂

    支持:
    - 场景化重试参数(初始化 vs 运行时)
    - 429 智能处理(Retry-After 优先)
    - 详细日志(异常类型、重试次数、耗时)
    - 完全可配置的重试策略

    Args:
        description: 用于日志的调用描述(如 "账户状态查询")
        max_attempts: 最大尝试次数(含首次)
        min_wait: 最小退避时间(秒)
        max_wait: 最大退避时间(秒)
        retryable: 可重试判定函数

    Returns:
        retry 装饰器

    Example:
        # 运行时查询(默认 3 次)
        @api_retry("账户状态查询")
        def get_state():
            return api.user_state()

        # 初始化阶段(5 次,容忍更长延迟)
        @api_retry("统一账户检测", max_attempts=5, max_wait=20)
        def detect_account():
            return api.spot_user_state()
    """
    def log_retry_attempt(retry_state):
        """记录重试详情"""
        exc = retry_state.outcome.exception()
        wait_time = retry_state.next_action.sleep if retry_state.next_action else 0

        # 尝试获取 429 Retry-After
        retry_after = get_retry_after_from_exception(exc)
        retry_after_msg = f" (API Retry-After: {retry_after}s)" if retry_after else ""

        logger.warning(
            f"{description} 重试 [{retry_state.attempt_number}/{max_attempts}] "
            f"| 异常: {type(exc).__name__}: {str(exc)[:150]} "
            f"| 已耗时: {retry_state.seconds_since_start:.2f}s "
            f"| 下次间隔: {wait_time:.1f}s{retry_after_msg}"
        )

    def log_final_failure(retry_state):
        """记录最终失败"""
        exc = retry_state.outcome.exception()
        logger.error(
            f"{description} 最终失败 "
            f"| 总重试: {max_attempts} 次 "
            f"| 总耗时: {retry_state.seconds_since_start:.2f}s "
            f"| 最后异常: {type(exc).__name__}: {str(exc)[:200]}"
        )

    return retry(
        stop=stop_after_attempt(max_attempts),
        wait=wait_exponential(multiplier=1, min=min_wait, max=max_wait),
        retry=retry_if_exception(retryable),
        before_sleep=log_retry_attempt,
        retry_error_callback=log_final_failure,
        reraise=True  # 重试耗尽后抛出原始异常
    )

4.2 场景化重试策略

场景 最大重试次数 退避策略 最大等待时间 总延迟上限 适用接口
初始化阶段 5 次 指数退避 20s ~60s _detect_unified_account()
运行时查询 3 次 指数退避 10s ~30s get_account_state(), _get_spot_usdc()

退避序列示例:

  • 运行时: 1s → 2s → 4s → 8s (最大 10s)
  • 初始化: 1s → 2s → 4s → 8s → 16s (最大 20s)

4.3 executor.py 改造

4.3.1 账户状态查询

# src/trading/executor.py

from src.utils.retry_utils import (
    api_retry,
    ApiResult,
    is_retryable_readonly
)

class Executor:

    def get_account_state(self) -> dict:
        """
        获取账户状态(带重试)

        使用运行时重试策略: 3 次,最大 10s 退避

        Returns:
            dict: 账户状态 (重试耗尽后返回 {})
        """
        if not self._initialized:
            return {}

        try:
            # 使用重试装饰器包装
            @api_retry(
                description="账户状态查询",
                max_attempts=3,
                min_wait=1,
                max_wait=10,
                retryable=is_retryable_readonly
            )
            def _fetch_user_state():
                return self._info.user_state(self._wallet.address)

            return _fetch_user_state()

        except Exception as e:
            # 重试耗尽,返回空字典(保持原有降级逻辑)
            logger.error(
                f"账户状态查询失败(已重试 3 次): "
                f"{type(e).__name__}: {e}"
            )
            return {}

4.3.2 Spot 余额查询

# src/trading/executor.py (续)

class Executor:

    def _spot_user_state_with_retry(
        self,
        address: str,
        is_critical: bool = False
    ) -> dict:
        """
        Spot 用户状态查询(带重试)

        Args:
            address: 钱包地址
            is_critical: 是否为关键场景(初始化阶段)
                - True: 5 次重试,最大 20s 退避
                - False: 3 次重试,最大 10s 退避

        Returns:
            dict: Spot 用户状态

        Raises:
            Exception: 重试耗尽后抛出最后一次异常
        """
        max_attempts = 5 if is_critical else 3
        max_wait = 20 if is_critical else 10

        @api_retry(
            description="Spot余额查询",
            max_attempts=max_attempts,
            min_wait=1,
            max_wait=max_wait,
            retryable=is_retryable_readonly
        )
        def _fetch_spot_state():
            return self._info.spot_user_state(address)

        return _fetch_spot_state()


    def _get_spot_usdc(self) -> tuple[float, float]:
        """
        获取 Spot USDC 余额(使用重试)

        Returns:
            (total, hold): 总余额和冻结金额
                           重试耗尽后返回 (0.0, 0.0)
        """
        try:
            spot_state = self._spot_user_state_with_retry(
                self._wallet.address,
                is_critical=False  # 运行时查询
            )

            # 原有解析逻辑
            for balance in spot_state.get("balances", []):
                if balance.get("coin") == "USDC":
                    total = float(balance.get("total", "0"))
                    hold = float(balance.get("hold", "0"))
                    return (total, hold)

            return (0.0, 0.0)

        except Exception as e:
            logger.error(
                f"Spot 余额查询失败(已重试): "
                f"{type(e).__name__}: {e}"
            )
            return (0.0, 0.0)

4.3.3 统一账户检测

# src/trading/executor.py (续)

class Executor:

    def _detect_unified_account(self) -> bool:
        """
        检测是否为统一账户模式(使用重试)

        初始化阶段调用,使用关键场景重试策略(5 次)

        Returns:
            bool: True=统一账户, False=普通账户
        """
        try:
            # 使用关键场景重试(5 次,最大 20s 退避)
            spot_state = self._spot_user_state_with_retry(
                self._wallet.address,
                is_critical=True
            )

            # 原有判定逻辑
            # (假设检查 spot_state 中是否有仓位)
            # ...

            return False  # 示例返回

        except Exception as e:
            logger.warning(
                f"统一账户检测失败,默认为非统一模式: "
                f"{type(e).__name__}: {e}"
            )
            return False

5. 监控与可观测性

5.1 详细日志输出

重试过程日志 (WARNING 级别):

账户状态查询 重试 [1/3] | 异常: ReadTimeout: Read timed out | 已耗时: 10.23s | 下次间隔: 2.0s
账户状态查询 重试 [2/3] | 异常: ConnectionError: Connection reset | 已耗时: 13.45s | 下次间隔: 4.0s

429 限流日志 (WARNING 级别):

Spot余额查询 重试 [1/3] | 异常: ClientError: 429 | 已耗时: 0.52s | 下次间隔: 5.0s (API Retry-After: 5s)

最终失败日志 (ERROR 级别):

账户状态查询 最终失败 | 总重试: 3 次 | 总耗时: 30.12s | 最后异常: ReadTimeout: Read timed out

5.2 重试指标收集 (可选增强)

executor.py 中添加简单的指标记录:

# src/trading/executor.py (可选)

from collections import Counter
from dataclasses import dataclass, field
from typing import List

@dataclass
class RetryMetrics:
    """重试指标"""
    total_attempts: int = 0  # 总重试次数
    success_on_retry: int = 0  # 重试后成功次数
    final_failures: int = 0  # 最终失败次数
    exception_types: Counter = field(default_factory=Counter)  # 异常类型分布

class Executor:

    def __init__(self, ...):
        # 原有初始化...

        # 重试指标记录
        self._retry_metrics = {
            'account_state': RetryMetrics(),
            'spot_balance': RetryMetrics(),
        }

    def get_retry_statistics(self) -> dict:
        """
        获取重试统计信息(供监控使用)

        Returns:
            dict: {
                'account_state': {
                    'total_retries': 15,
                    'success_rate': 0.93,
                    'top_exceptions': [('ReadTimeout', 8), ('ConnectionError', 5)]
                },
                'spot_balance': {...}
            }
        """
        stats = {}
        for api_name, metrics in self._retry_metrics.items():
            if metrics.total_attempts > 0:
                stats[api_name] = {
                    'total_retries': metrics.total_attempts,
                    'success_rate': metrics.success_on_retry / metrics.total_attempts,
                    'failure_rate': metrics.final_failures / metrics.total_attempts,
                    'top_exceptions': metrics.exception_types.most_common(5)
                }
        return stats

6. 测试方案

6.1 单元测试用例

# tests/test_retry_mechanism.py

import pytest
from unittest.mock import Mock, patch
from requests.exceptions import ReadTimeout, ConnectionError
from hyperliquid.utils.error import ClientError, ServerError

from src.utils.retry_utils import (
    is_retryable_api_error,
    is_retryable_network_error,
    is_retryable_readonly
)
from src.trading.executor import Executor


class TestRetryableExceptions:
    """可重试异常判定测试"""

    def test_429_is_retryable(self):
        """429 限流可重试"""
        exc = ClientError(status_code=429)
        assert is_retryable_api_error(exc) is True

    def test_5xx_is_retryable(self):
        """5xx 服务端错误可重试"""
        exc = ServerError(status_code=503)
        assert is_retryable_api_error(exc) is True

    def test_timeout_is_retryable(self):
        """超时可重试"""
        exc = ReadTimeout("Read timed out")
        assert is_retryable_network_error(exc) is True

    def test_dns_error_is_retryable(self):
        """DNS 错误可重试"""
        exc = OSError("[Errno 8] nodename nor servname provided")
        assert is_retryable_network_error(exc) is True

    def test_ssl_error_not_retryable(self):
        """SSL 错误不可重试"""
        exc = ConnectionError("SSL: CERTIFICATE_VERIFY_FAILED")
        assert is_retryable_network_error(exc) is False

    def test_auth_error_not_retryable(self):
        """认证失败不可重试"""
        exc = Exception("authentication failed")
        assert is_retryable_network_error(exc) is False


class TestAccountStateRetry:
    """账户状态查询重试测试"""

    def test_retry_on_timeout_then_success(self, executor):
        """超时后重试成功"""
        with patch.object(executor._info, 'user_state') as mock_api:
            mock_api.side_effect = [
                ReadTimeout("Read timed out"),
                {'balance': 1000}
            ]

            result = executor.get_account_state()

            assert result == {'balance': 1000}
            assert mock_api.call_count == 2

    def test_retry_exhausted_returns_empty(self, executor):
        """重试耗尽返回空字典"""
        with patch.object(executor._info, 'user_state') as mock_api:
            mock_api.side_effect = ReadTimeout("Read timed out")

            result = executor.get_account_state()

            assert result == {}
            assert mock_api.call_count == 3  # 默认 3 次

    def test_ssl_error_fails_immediately(self, executor):
        """SSL 错误立即失败(不重试)"""
        ssl_error = ConnectionError("SSL: CERTIFICATE_VERIFY_FAILED")

        with patch.object(executor._info, 'user_state') as mock_api:
            mock_api.side_effect = ssl_error

            result = executor.get_account_state()

            assert result == {}
            assert mock_api.call_count == 1  # 只调用 1 次


class TestSpotBalanceRetry:
    """Spot 余额查询重试测试"""

    def test_retry_on_dns_error_then_success(self, executor):
        """DNS 错误后重试成功"""
        with patch.object(executor._info, 'spot_user_state') as mock_api:
            mock_api.side_effect = [
                OSError("[Errno 8] nodename nor servname provided"),
                {'balances': [{'coin': 'USDC', 'total': '1000', 'hold': '100'}]}
            ]

            total, hold = executor._get_spot_usdc()

            assert total == 1000.0
            assert hold == 100.0
            assert mock_api.call_count == 2

    def test_critical_mode_uses_more_retries(self, executor):
        """关键模式使用更多重试次数"""
        with patch.object(executor._info, 'spot_user_state') as mock_api:
            mock_api.side_effect = ReadTimeout("timeout")

            with pytest.raises(Exception):
                executor._spot_user_state_with_retry(
                    executor._wallet.address,
                    is_critical=True
                )

            assert mock_api.call_count == 5  # 关键模式 5 次


class Test429RetryAfter:
    """429 Retry-After 处理测试"""

    def test_retry_after_header_respected(self, executor):
        """429 Retry-After header 生效"""
        mock_response = Mock()
        mock_response.headers = {'Retry-After': '5'}

        exc = ClientError(status_code=429, response=mock_response)

        with patch.object(executor._info, 'user_state') as mock_api:
            mock_api.side_effect = [exc, {'balance': 1000}]

            result = executor.get_account_state()

            # 应该等待 5 秒后重试成功
            assert result == {'balance': 1000}
            assert mock_api.call_count == 2

6.2 集成测试场景

场景 模拟方式 验证点
弱网环境 设置较短的 timeout,模拟网络延迟 观察日志中的重试次数和最终结果
限流场景 Mock API 返回 429 验证 Retry-After 是否生效
DNS 故障 Mock DNS 解析失败 验证 DNS 错误被正确识别和重试
混合异常 第 1 次超时,第 2 次 429,第 3 次成功 验证不同异常都能正确处理
SSL 错误 Mock SSL 证书验证失败 验证立即失败(不重试)

7. 风险与影响分析

7.1 性能影响

场景 最坏延迟 概率 缓解措施
初始化阶段(5 次超时) ~60s 极低(<0.1%) 容忍更长延迟,只在初始化使用
运行时查询(3 次超时) ~30s 低(<1%) 设置合理的超时时间(如 10s)
429 限流 取决于 Retry-After 中(~5%) 使用 Retry-After,避免无效重试
正常情况(无重试) 无额外延迟 高(>95%) 无影响

7.2 可靠性提升

指标 改进前 改进后 提升
瞬时故障恢复率 0% ~90% +90%
误判率(余额为 0) ~5% ~0.5% -90%
429 限流处理 立即失败 智能重试 显著改善

7.3 潜在风险

风险 影响 概率 缓解措施
重试风暴 API 服务压力增大 使用指数退避,限制重试次数
延迟累积 用户体验下降 区分初始化 vs 运行时,使用更短的超时
日志量激增 存储/查询压力 使用 WARNING 级别,定期轮转

8. 实施计划

Phase 1: 核心功能 (P0) - 预计 2-3 小时

目标: 实现基本重试能力,解决瞬时故障误判问题

  • [ ] 实现 src/utils/retry_utils.py
    • [ ] is_retryable_api_error()
    • [ ] is_retryable_network_error()
    • [ ] is_retryable_readonly()
    • [ ] api_retry() 装饰器
  • [ ] 改造 executor.py
    • [ ] get_account_state() 使用重试
    • [ ] _spot_user_state_with_retry() 新增
    • [ ] _get_spot_usdc() 使用重试封装
    • [ ] _detect_unified_account() 使用关键场景重试
  • [ ] 基础单元测试

Phase 2: 优化增强 (P1) - 预计 2-3 小时

目标: 提升用户体验和可观测性

  • [ ] 429 Retry-After 智能处理
  • [ ] 降级数据标记 (ApiResult 数据类,可选)
  • [ ] 详细日志优化
  • [ ] 补充单元测试(边界情况)

Phase 3: 监控完善 (P2) - 预计 1-2 小时

目标: 建立完整的监控体系

  • [ ] 重试指标收集 (RetryMetrics)
  • [ ] get_retry_statistics() 方法
  • [ ] 集成测试
  • [ ] 文档完善

9. 文件变更清单

文件 操作 说明
src/utils/retry_utils.py 新增 公共可重试判定 + 装饰器工厂
src/trading/executor.py 修改 引用公共模块,改造 3 个接口
src/utils/hyperliquid_candles.py 修改(可选) 统一使用公共模块
tests/test_retry_mechanism.py 新增 单元测试

10. 总结

10.1 方案特点

本方案综合了两个设计方案的优点:

从方案 2 吸收:

  • ✅ 提取公共重试工具模块,消除代码重复
  • ✅ 分离可重试谓词(API 错误 + 网络错误)
  • ✅ 清晰的文件变更清单和代码位置

从方案 3 吸收:

  • ✅ 精确化异常判定(不可重试异常列表)
  • ✅ 场景化重试策略(初始化 vs 运行时)
  • ✅ 429 智能处理(Retry-After 优先)
  • ✅ 降级数据标记(ApiResult 数据类)
  • ✅ 完善的监控与可观测性
  • ✅ 详细的测试方案和风险分析

10.2 核心改进

  1. 代码复用: 公共 retry_utils.py 统一管理重试逻辑
  2. 精确判定: 区分可重试和不可重试异常,避免误判
  3. 场景优化: 初始化(5 次) vs 运行时(3 次)
  4. 智能限流: 优先使用 API 的 Retry-After
  5. 可观测性: 详细日志 + 重试指标收集

10.3 预期效果

  • 瞬时故障恢复率提升至 90%+
  • 余额/仓位误判率降低 90%
  • 429 限流处理更加智能,减少无效请求
  • 详细的重试日志和指标,便于运维监控

建议优先实施 Phase 1(核心功能),快速解决当前的痛点问题,后续根据实际运行情况逐步优化。

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG