交易模块API请求添加重试
API 重试机制修复计划
项目: Trading-in-websocket
创建时间: 2026-02-17
修复范围: 13个缺少重试机制的 API 调用
预计工作量: 3-5 个工作日
一、执行摘要
1.1 问题概述
通过全项目扫描发现,13个关键 API 调用缺少重试机制,其中 8 个为高风险项(涉及交易执行、订单管理),5 个为中风险项(涉及账户和数据查询)。
当前状况:
- ✅ 已有 7 处正确使用重试机制
- ❌ 13 处缺少重试,影响系统鲁棒性
- 🔴 8 个高风险项可能导致交易失败或资金风险
1.2 修复目标
- 可靠性提升: 交易操作在网络抖动/API限流时自动重试
- 一致性保证: 所有外部 API 调用使用统一的重试策略
- 可观测性: 完善的重试日志,便于问题排查
- 零业务中断: 向后兼容,不影响现有功能
1.3 修复策略
核心策略: 渐进式修复 + 统一封装
Phase 1: 架构优化(创建统一重试封装层)
↓
Phase 2: 高风险修复(交易核心操作)
↓
Phase 3: 中风险修复(查询和账户操作)
↓
Phase 4: 验证与监控(测试和上线观察)
二、技术方案设计
2.1 架构设计:统一重试封装层
2.1.1 设计原则
- 单一职责: 每个封装方法只负责一个 API 调用
- 参数化配置: 根据操作类型使用不同的重试参数
- 向后兼容: 保持现有 API 签名不变
- 可测试性: 便于单元测试和集成测试
2.1.2 文件结构
src/trading/
├── executor.py # 现有执行器(需修改调用)
├── executor_retry_wrapper.py # 新增:重试封装混入类
└── retry_config.py # 新增:重试参数配置
src/services/
└── realtime_kline_service_base.py # 已修复:新币种监控
src/utils/
└── retry_utils.py # 已有:通用重试工具
2.1.3 重试参数配置策略
| 操作类型 | max_attempts | min_wait | max_wait | 理由 |
|---|---|---|---|---|
| 交易执行 | 3 | 1s | 5s | 快速失败,避免市场变化 |
| 订单查询 | 4 | 2s | 10s | 允许更长重试窗口 |
| 账户管理 | 4 | 3s | 15s | 非紧急操作,可容忍延迟 |
| 数据查询 | 5 | 2s | 15s | 数据补充允许多次重试 |
2.2 代码实现方案
2.2.1 创建 src/trading/retry_config.py
"""
Executor 重试参数配置
定义不同操作类型的重试策略
"""
from dataclasses import dataclass
@dataclass
class RetryConfig:
"""重试配置参数"""
description: str
max_attempts: int
min_wait: int
max_wait: int
# 交易执行类操作(快速失败)
TRADING_RETRY_CONFIG = RetryConfig(
description="交易操作",
max_attempts=3,
min_wait=1,
max_wait=5,
)
# 订单查询类操作
ORDER_QUERY_RETRY_CONFIG = RetryConfig(
description="订单查询",
max_attempts=4,
min_wait=2,
max_wait=10,
)
# 账户管理类操作
ACCOUNT_RETRY_CONFIG = RetryConfig(
description="账户管理",
max_attempts=4,
min_wait=3,
max_wait=15,
)
# 数据查询类操作
DATA_QUERY_RETRY_CONFIG = RetryConfig(
description="数据查询",
max_attempts=5,
min_wait=2,
max_wait=15,
)
2.2.2 创建 src/trading/executor_retry_wrapper.py
"""
Executor 交易操作重试封装混入类
将所有 exchange/info API 调用封装为带重试的方法
"""
import logging
from typing import Dict, Any, Optional
from src.utils.retry_utils import api_retry, is_retryable_readonly
from src.trading.retry_config import (
TRADING_RETRY_CONFIG,
ORDER_QUERY_RETRY_CONFIG,
ACCOUNT_RETRY_CONFIG,
DATA_QUERY_RETRY_CONFIG,
)
logger = logging.getLogger(__name__)
class ExecutorRetryMixin:
"""
交易执行器重试混入类
提供所有外部 API 调用的重试封装方法
使用方式: class Executor(ExecutorRetryMixin, ...):
"""
# ============================================================
# 交易执行类(高风险,快速失败)
# ============================================================
@api_retry(
TRADING_RETRY_CONFIG.description + "-限价单下单",
max_attempts=TRADING_RETRY_CONFIG.max_attempts,
min_wait=TRADING_RETRY_CONFIG.min_wait,
max_wait=TRADING_RETRY_CONFIG.max_wait,
retryable=is_retryable_readonly,
)
def _order_with_retry(
self,
name: str,
is_buy: bool,
sz: float,
limit_px: float,
order_type: Dict[str, Any],
reduce_only: bool = False,
):
"""限价单下单(带重试)"""
return self._exchange.order(
name=name,
is_buy=is_buy,
sz=sz,
limit_px=limit_px,
order_type=order_type,
reduce_only=reduce_only,
)
@api_retry(
TRADING_RETRY_CONFIG.description + "-市价开仓",
max_attempts=TRADING_RETRY_CONFIG.max_attempts,
min_wait=TRADING_RETRY_CONFIG.min_wait,
max_wait=TRADING_RETRY_CONFIG.max_wait,
retryable=is_retryable_readonly,
)
def _market_open_with_retry(self, coin: str, is_buy: bool, size: float):
"""市价开仓(带重试)"""
return self._exchange.market_open(
coin,
is_buy,
size,
slippage=self._config.slippage,
)
@api_retry(
TRADING_RETRY_CONFIG.description + "-市价平仓",
max_attempts=TRADING_RETRY_CONFIG.max_attempts,
min_wait=TRADING_RETRY_CONFIG.min_wait,
max_wait=TRADING_RETRY_CONFIG.max_wait,
retryable=is_retryable_readonly,
)
def _market_close_with_retry(self, coin: str, size: float):
"""市价平仓(带重试)"""
return self._exchange.market_close(
coin,
sz=size,
slippage=self._config.slippage,
)
@api_retry(
TRADING_RETRY_CONFIG.description + "-撤单",
max_attempts=TRADING_RETRY_CONFIG.max_attempts,
min_wait=TRADING_RETRY_CONFIG.min_wait,
max_wait=TRADING_RETRY_CONFIG.max_wait,
retryable=is_retryable_readonly,
)
def _cancel_order_with_retry(self, coin: str, oid: int):
"""撤单操作(带重试)"""
return self._exchange.cancel(coin, oid)
# ============================================================
# 订单查询类
# ============================================================
@api_retry(
ORDER_QUERY_RETRY_CONFIG.description + "-订单簿查询",
max_attempts=ORDER_QUERY_RETRY_CONFIG.max_attempts,
min_wait=ORDER_QUERY_RETRY_CONFIG.min_wait,
max_wait=ORDER_QUERY_RETRY_CONFIG.max_wait,
retryable=is_retryable_readonly,
)
def _l2_snapshot_with_retry(self, coin: str):
"""查询订单簿快照(带重试)"""
return self._info.l2_snapshot(coin)
@api_retry(
ORDER_QUERY_RETRY_CONFIG.description + "-订单状态查询",
max_attempts=ORDER_QUERY_RETRY_CONFIG.max_attempts,
min_wait=ORDER_QUERY_RETRY_CONFIG.min_wait,
max_wait=ORDER_QUERY_RETRY_CONFIG.max_wait,
retryable=is_retryable_readonly,
)
def _query_order_by_oid_with_retry(self, address: str, oid: int):
"""查询订单状态(带重试)"""
return self._info.query_order_by_oid(address, oid)
@api_retry(
ORDER_QUERY_RETRY_CONFIG.description + "-全市场价格",
max_attempts=ORDER_QUERY_RETRY_CONFIG.max_attempts,
min_wait=ORDER_QUERY_RETRY_CONFIG.min_wait,
max_wait=ORDER_QUERY_RETRY_CONFIG.max_wait,
retryable=is_retryable_readonly,
)
def _all_mids_with_retry(self):
"""获取全市场中间价(带重试)"""
return self._info.all_mids()
# ============================================================
# 账户管理类
# ============================================================
@api_retry(
ACCOUNT_RETRY_CONFIG.description + "-杠杆设置",
max_attempts=ACCOUNT_RETRY_CONFIG.max_attempts,
min_wait=ACCOUNT_RETRY_CONFIG.min_wait,
max_wait=ACCOUNT_RETRY_CONFIG.max_wait,
retryable=is_retryable_readonly,
)
def _update_leverage_with_retry(self, leverage: int, coin: str, is_cross: bool):
"""设置杠杆(带重试)"""
return self._exchange.update_leverage(leverage, coin, is_cross=is_cross)
@api_retry(
ACCOUNT_RETRY_CONFIG.description + "-账户抽象状态查询",
max_attempts=ACCOUNT_RETRY_CONFIG.max_attempts,
min_wait=ACCOUNT_RETRY_CONFIG.min_wait,
max_wait=ACCOUNT_RETRY_CONFIG.max_wait,
retryable=is_retryable_readonly,
)
def _query_user_abstraction_state_with_retry(self, address: str):
"""查询账户抽象状态(带重试)"""
return self._info.query_user_abstraction_state(address)
@api_retry(
ACCOUNT_RETRY_CONFIG.description + "-账户模式设置",
max_attempts=ACCOUNT_RETRY_CONFIG.max_attempts,
min_wait=ACCOUNT_RETRY_CONFIG.min_wait,
max_wait=ACCOUNT_RETRY_CONFIG.max_wait,
retryable=is_retryable_readonly,
)
def _user_set_abstraction_with_retry(self, address: str, target_abstraction: bool):
"""设置账户模式(带重试)"""
return self._exchange.user_set_abstraction(address, target_abstraction)
三、分阶段修复计划
Phase 1: 架构优化(第1天)
任务清单
- [x] ✅ 已完成:创建
docs/RETRY_MECHANISM.md - [x] ✅ 已完成:修复
realtime_kline_service_base.py:1637 - [ ] 创建
src/trading/retry_config.py - [ ] 创建
src/trading/executor_retry_wrapper.py - [ ] 在
executor.py中添加ExecutorRetryMixin继承
实施步骤
步骤 1.1: 创建重试配置模块
# 创建文件
touch src/trading/retry_config.py
# 添加内容(参见 2.2.1)
步骤 1.2: 创建重试封装混入类
# 创建文件
touch src/trading/executor_retry_wrapper.py
# 添加内容(参见 2.2.2)
步骤 1.3: 修改 executor.py 继承关系
# 在 executor.py 顶部添加 import
from src.trading.executor_retry_wrapper import ExecutorRetryMixin
# 修改类定义
class Executor(ExecutorRetryMixin): # 添加混入类继承
def __init__(self, ...):
...
验证步骤:
# 语法检查
python -m py_compile src/trading/retry_config.py
python -m py_compile src/trading/executor_retry_wrapper.py
python -m py_compile src/trading/executor.py
# 启动测试(确保不影响现有功能)
python -m pytest tests/ -k test_executor -v
Phase 2: 高风险修复(第2-3天)
2.1 市价交易修复(Priority 1)
修复项:
- Line 1245:
exchange.market_open() - Line 1297:
exchange.market_close()
修改点 1: executor.py:1245
# 修改前:
order_result = self._exchange.market_open(
coin,
is_buy,
size,
slippage=self._config.slippage,
)
# 修改后:
order_result = self._market_open_with_retry(coin, is_buy, size)
修改点 2: executor.py:1297
# 修改前:
order_result = self._exchange.market_close(
coin,
sz=size,
slippage=self._config.slippage,
)
# 修改后:
order_result = self._market_close_with_retry(coin, size)
测试验证:
# 在 tests/ 中添加测试
def test_market_open_with_retry():
"""测试市价开仓重试机制"""
# 模拟 429 错误
# 验证重试行为
pass
def test_market_close_with_retry():
"""测试市价平仓重试机制"""
pass
2.2 限价单交易修复(Priority 1)
修复项:
- Line 487:
exchange.order() - Line 545:
exchange.cancel()
修改点 1: executor.py:487
# 修改前:
order_result = self._exchange.order(
name=coin,
is_buy=is_buy,
sz=size,
limit_px=price,
order_type={"limit": {"tif": "Gtc"}},
reduce_only=reduce_only,
)
# 修改后:
order_result = self._order_with_retry(
name=coin,
is_buy=is_buy,
sz=size,
limit_px=price,
order_type={"limit": {"tif": "Gtc"}},
reduce_only=reduce_only,
)
修改点 2: executor.py:545
# 修改前:
resp = self._exchange.cancel(coin, oid)
# 修改后:
resp = self._cancel_order_with_retry(coin, oid)
2.3 价格查询修复(Priority 1)
修复项:
- Line 231:
info.l2_snapshot() - Line 1763:
info.all_mids()
修改点 1: executor.py:231
# 修改前:
snapshot = self._info.l2_snapshot(coin)
# 修改后:
snapshot = self._l2_snapshot_with_retry(coin)
修改点 2: executor.py:1763
# 修改前:
raw = self._info.all_mids()
# 修改后:
raw = self._all_mids_with_retry()
2.4 杠杆设置修复(Priority 2)
修复项:
- Line 1352:
exchange.update_leverage()(cross) - Line 1361:
exchange.update_leverage()(isolated)
修改点 1: executor.py:1352
# 修改前:
resp = self._exchange.update_leverage(leverage, coin, is_cross=is_cross)
# 修改后:
resp = self._update_leverage_with_retry(leverage, coin, is_cross=is_cross)
修改点 2: executor.py:1361
# 修改前:
resp = self._exchange.update_leverage(leverage, coin, is_cross=False)
# 修改后:
resp = self._update_leverage_with_retry(leverage, coin, is_cross=False)
2.5 订单查询修复(Priority 2)
修复项:
- Line 748:
info.query_order_by_oid()
修改点: executor.py:748
# 修改前:
status_resp = self._info.query_order_by_oid(
self._wallet.address, order_result.order_id
)
# 修改后:
status_resp = self._query_order_by_oid_with_retry(
self._wallet.address, order_result.order_id
)
Phase 3: 中风险修复(第4天)
3.1 账户管理修复
修复项:
- Line 1408:
info.query_user_abstraction_state() - Line 1452:
exchange.user_set_abstraction() - Line 1512:
info.user_state()
修改点 1: executor.py:1408
# 修改前:
return self._info.query_user_abstraction_state(self._wallet.address)
# 修改后:
return self._query_user_abstraction_state_with_retry(self._wallet.address)
修改点 2: executor.py:1452
# 修改前:
resp = self._exchange.user_set_abstraction(self._wallet.address, target_abstraction)
# 修改后:
resp = self._user_set_abstraction_with_retry(self._wallet.address, target_abstraction)
修改点 3: executor.py:1512
此处建议复用已有的 @api_retry 装饰器方法(如果存在),或添加新的封装方法。
3.2 数据查询修复
修复项:
- Line 123:
info.meta()(realtime_kline_service.py) - Line 122:
info.candles_snapshot()(hyperliquid_candles.py)
修改点 1: realtime_kline_service.py:123
创建类似 _fetch_exchange_meta_for_new_symbols() 的静态方法:
@staticmethod
@api_retry(
"通用版-交易所元数据",
max_attempts=4,
min_wait=5,
max_wait=30,
retryable=is_retryable_readonly,
)
def _fetch_exchange_meta() -> dict:
"""获取交易所 meta(含重试),用于初始化"""
from hyperliquid.info import Info
import hyperliquid.utils.constants as constants
info = Info(constants.MAINNET_API_URL, skip_ws=True, timeout=10)
return info.meta()
修改点 2: hyperliquid_candles.py:122
已有 fetch_candles_with_retry() 装饰器版本,确保使用该版本。
Phase 4: 验证与监控(第5天)
4.1 单元测试
创建 tests/test_executor_retry.py:
"""
测试 Executor 重试机制
"""
import pytest
from unittest.mock import Mock, patch
from hyperliquid.utils.error import ClientError, ServerError
from src.trading.executor import Executor
from src.trading.executor_retry_wrapper import ExecutorRetryMixin
class TestExecutorRetry:
"""测试 Executor 重试封装"""
def test_market_open_retry_on_429(self, mock_executor):
"""测试市价开仓在 429 时重试"""
# 模拟前2次失败,第3次成功
mock_executor._exchange.market_open.side_effect = [
ClientError("Rate limited", status_code=429),
ClientError("Rate limited", status_code=429),
{"status": "ok", "order_id": 12345},
]
result = mock_executor._market_open_with_retry("BTC", True, 0.1)
assert result["status"] == "ok"
assert mock_executor._exchange.market_open.call_count == 3
def test_order_retry_on_network_error(self, mock_executor):
"""测试限价单下单在网络错误时重试"""
from requests.exceptions import ConnectionError
mock_executor._exchange.order.side_effect = [
ConnectionError("Network error"),
{"status": "ok", "order_id": 12345},
]
result = mock_executor._order_with_retry(
name="BTC",
is_buy=True,
sz=0.1,
limit_px=50000,
order_type={"limit": {"tif": "Gtc"}},
)
assert result["status"] == "ok"
assert mock_executor._exchange.order.call_count == 2
def test_cancel_retry_exhausted(self, mock_executor):
"""测试撤单重试耗尽后抛出异常"""
mock_executor._exchange.cancel.side_effect = ServerError("Server error", status_code=500)
with pytest.raises(ServerError):
mock_executor._cancel_order_with_retry("BTC", 12345)
assert mock_executor._exchange.cancel.call_count == 3 # max_attempts
@pytest.fixture
def mock_executor():
"""创建模拟的 Executor 实例"""
executor = Mock(spec=Executor)
executor._exchange = Mock()
executor._info = Mock()
executor._wallet = Mock()
executor._config = Mock(slippage=0.05)
# 绑定重试方法
for method in dir(ExecutorRetryMixin):
if method.startswith('_') and method.endswith('_with_retry'):
setattr(executor, method, getattr(ExecutorRetryMixin, method).__get__(executor))
return executor
4.2 集成测试
在测试环境运行完整流程:
# 1. 启动服务
python src/main.py
# 2. 观察日志(应该看不到重试日志,说明正常)
tail -f logs/executor.log | grep "重试"
# 3. 模拟网络故障(可选)
# 使用 tc 命令模拟网络延迟/丢包
4.3 监控指标
添加监控指标跟踪重试情况:
# 在 executor.py 中添加
self.retry_metrics = {
"market_open_retries": 0,
"market_close_retries": 0,
"order_retries": 0,
"cancel_retries": 0,
}
# 在重试日志中增加计数
# 定期上报到监控系统
4.4 上线观察
观察周期: 上线后至少观察 3-7 天
关键指标:
- 重试成功率(成功重试次数 / 总重试次数)
- 重试频率(每小时重试次数)
- 重试耗时分布
- 最终失败次数
告警规则:
alerts:
- name: "高频重试告警"
condition: retry_count_per_hour > 10
severity: warning
- name: "重试失败告警"
condition: final_failure_count > 0
severity: error
四、风险评估与应对
4.1 潜在风险
| 风险 | 影响 | 概率 | 应对措施 |
|---|---|---|---|
| 重试逻辑错误导致交易重复 | 严重 | 低 | 充分测试,确保幂等性 |
| 重试延迟影响交易时机 | 中等 | 中 | 使用快速失败策略(3次/5秒内) |
| 重试日志过多影响性能 | 低 | 低 | 合理设置日志级别 |
| 改动引入 Bug | 中等 | 低 | 充分测试 + 灰度发布 |
4.2 回滚方案
快速回滚步骤:
# 1. 备份当前代码
git stash
# 2. 回滚到修改前版本
git revert <commit-hash>
# 3. 重启服务
systemctl restart trading-service
渐进式回滚:
如果某个模块出现问题,可以只回滚该模块:
# 临时禁用重试,直接调用原始方法
def _market_open_with_retry(self, coin, is_buy, size):
# 临时绕过重试
return self._exchange.market_open(coin, is_buy, size, slippage=self._config.slippage)
4.3 灰度发布策略
建议: 分批次上线
第1批: 10% 流量 → 观察24小时
↓
第2批: 50% 流量 → 观察24小时
↓
第3批: 100% 流量 → 观察72小时
五、实施检查清单
5.1 开发阶段
- [ ] 创建
retry_config.py - [ ] 创建
executor_retry_wrapper.py - [ ] 修改
executor.py继承 - [ ] 修改 8 个高风险调用点
- [ ] 修改 5 个中风险调用点
- [ ] 代码审查(Code Review)
5.2 测试阶段
- [ ] 单元测试通过
- [ ] 集成测试通过
- [ ] 性能测试通过(重试不影响性能)
- [ ] 模拟故障测试(429/5xx/网络错误)
5.3 上线阶段
- [ ] 备份当前代码
- [ ] 灰度发布(10% → 50% → 100%)
- [ ] 监控指标正常
- [ ] 日志无异常
- [ ] 运行 3-7 天无问题
5.4 文档阶段
- [ ] 更新
docs/RETRY_MECHANISM.md - [ ] 创建本修复计划文档
- [ ] 更新
CHANGELOG.md - [ ] 编写操作手册(如何查看重试日志)
六、时间与资源规划
6.1 工作量估算
| 阶段 | 工作量 | 人员 | 备注 |
|---|---|---|---|
| Phase 1: 架构优化 | 4 小时 | 1 人 | 创建封装层 |
| Phase 2: 高风险修复 | 8 小时 | 1 人 | 8 个修复点 |
| Phase 3: 中风险修复 | 4 小时 | 1 人 | 5 个修复点 |
| Phase 4: 测试验证 | 8 小时 | 1-2 人 | 单元测试 + 集成测试 |
| 总计 | 3 个工作日 | 1-2 人 | - |
6.2 时间表
第1天:
- 上午: Phase 1(架构优化)
- 下午: Phase 2.1-2.2(市价交易 + 限价单)
第2天:
- 上午: Phase 2.3-2.5(价格查询 + 杠杆 + 订单查询)
- 下午: Phase 3(中风险修复)
第3天:
- 上午: 单元测试 + 代码审查
- 下午: 集成测试 + 模拟故障测试
第4-5天:
- 灰度发布(10% → 50%)
第6-12天:
- 全量发布 + 观察期
七、成功标准
7.1 技术指标
- ✅ 所有 13 个 API 调用已添加重试机制
- ✅ 单元测试覆盖率 ≥ 80%
- ✅ 重试成功率 ≥ 95%(首次失败后重试成功)
- ✅ 重试引入的额外延迟 < 5 秒(P95)
- ✅ 无因重试导致的交易重复或资金损失
7.2 业务指标
- ✅ 交易执行成功率提升(减少因瞬时故障导致的失败)
- ✅ 系统可用性提升(429/5xx 不再导致服务异常)
- ✅ 运维告警减少(网络抖动不再触发告警)
7.3 观察期验证
上线后 7 天内:
- [ ] 无 P0/P1 级别故障
- [ ] 重试日志正常(有少量重试但最终成功)
- [ ] 无用户投诉或交易异常
- [ ] 性能指标正常(延迟、吞吐量)
八、附录
A. 参考文档
- docs/RETRY_MECHANISM.md - 重试机制设计文档
- src/utils/retry_utils.py - 重试工具实现
B. 相关 Issue
- 已修复:新币种监控缺少重试(realtime_kline_service_base.py:1637)
- 待修复:executor.py 中 13 个 API 调用
C. 修改文件清单
新增文件:
src/trading/retry_config.pysrc/trading/executor_retry_wrapper.pytests/test_executor_retry.pydocs/API_RETRY_FIX_PLAN.md(本文档)
修改文件:
src/trading/executor.py(13 处调用点 + 继承关系)src/services/realtime_kline_service.py(1 处)src/utils/hyperliquid_candles.py(确认已使用重试)docs/RETRY_MECHANISM.md(更新完成状态)
D. 联系人
- 技术负责人: [待填写]
- 测试负责人: [待填写]
- 上线负责人: [待填写]
文档版本: v1.0
最后更新: 2026-02-17
状态: 待执行