数据自愈并发优化设计方案(v2)

数据自愈并发优化设计方案(v2)

快速摘要

类别 改动项 优先级
存量 Bug(必修) _get_db_now() fallback 返回 naive datetime → TypeError P0
存量 Bug(必修) batch_insert 裸 INSERT,并发写入无冲突保护 P0(需先执行 DDL)
性能优化 KlineDataFiller 懒加载,消除 49 次废弃 HTTP 握手 P1
性能优化 ~100 次 DB 往返 → 1 次(批量查询含 db_now) P1
性能优化 串行修复 → 并发修复(max_workers=4) P2
代码质量 冗余连续性检查、日志精简、冷却锁保护 P3

部署依赖batch_insert ON CONFLICT 修复必须在 DDL 约束添加之后上线。建议顺序见第九节


一、现状分析

当前流程

_run_data_healingrealtime_kline_service_base.py:1807):

for (symbol, base_symbol) in heal_pairs:  # ~50+ 配对,串行
    DataHealingOrchestrator(...)           # → RepairExecutor.__init__
    │                                      #   → KlineDataFiller.__init__
    │                                      #   → Info(MAINNET_API_URL) HTTP 握手 ~0.85s  ← 根因
    │                                      #   ← 第 2-50 个握手完成后才被 shared_executor 覆盖,全部浪费
    _load_zscore_history()                # DB 查询 ×1(0.08-0.56s)
    _get_db_now()                         # SELECT NOW() ×1
    _diagnose()                           # 内存计算(~0ms)
    check_continuity()                    # ← 第一次连续性检查
    _final_assessment()                   # ← 第二次连续性检查(冗余)

瓶颈定量(日志实测,50 对)

操作 单次耗时 次数 总计 占比
KlineDataFiller init(HTTP 握手,49 次废弃) ~0.85s 50 ~43s 78%
_load_zscore_history DB 查询 ~0.15s avg ~50 ~7.5s 14%
ONDO/BTC DB 慢查询(特例,见附加优化 4d) 4.44s 1 4.44s 8%
_get_db_now SELECT NOW() ~0.01s ~50 ~0.5s ~1%
_final_assessment 冗余连续性检查 ~0.001s ~50 ~0.05s ~0%
总计 ~55s

基础设施

  • 连接池:psycopg 3.x ConnectionPoolmin_size=2, max_size=10,支持并发 DB 查询
  • 交易所 APIKlineDataFiller 内置 10 分钟冷却机制(threading.Lock() 保护)
  • 超时保护:全局 healing_timeout(300s)
  • 覆盖索引idx_analysis_results_pair_kline_time ON analysis_results (symbol, base_symbol, kline_time DESC, analysis_time DESC) WHERE zscore_4h IS NOT NULL(与批量 SQL 完全对齐)

二、存量 Bug 修复(独立于性能优化,可单独上线)

Bug-1:_get_db_now() fallback 返回 naive datetime

现状orchestrator.py:447

except Exception:
    return datetime.utcnow()   # ← naive datetime(无时区信息)

DB 的 SELECT NOW() 返回 timezone-aware datetime。_check_freshness 中做时间差计算时触发 TypeError

修复

# orchestrator.py — _get_db_now()(第 442-447 行)
def _get_db_now(self) -> datetime:
    try:
        rows = self.db_client.execute_query("SELECT NOW() AS now")
        return rows[0]['now']
    except Exception:
        return datetime.now(timezone.utc)   # ✅ 修复:统一 timezone-aware

Bug-2:batch_insert 裸 INSERT,并发写入无冲突保护

前提:必须先添加 UNIQUE 约束(DDL 变更)

当前 analysis_results 主键为 PRIMARY KEY (analysis_time, id)(analysis_time, symbol, base_symbol) 上无唯一约束。

-- 上线前在生产 DB 执行(低峰期,约 10-30 分钟)
CREATE UNIQUE INDEX CONCURRENTLY uq_analysis_results_time_pair
    ON analysis_results (analysis_time, symbol, base_symbol);

ALTER TABLE analysis_results
    ADD CONSTRAINT uq_analysis_results_time_pair
    UNIQUE USING INDEX uq_analysis_results_time_pair;

修复(约束添加后):

# timescaledb.py — AnalysisResultRepository.batch_insert
ON CONFLICT (analysis_time, symbol, base_symbol) DO NOTHING;  -- ✅ 并发写入安全

三、优化方案 A:KlineDataFiller 懒加载

根因与解法

49 次废弃 HTTP 握手(~43s,78%)是最大瓶颈。根因在于 KlineDataFiller.__init__ 立即触发握手,而非首次实际 API 调用时触发。

修复:将 _info 改为懒加载,首次通过 info property 访问时初始化:

# kline_data_filler.py
class KlineDataFiller:
    def __init__(self, exchange_id: str, kline_repo: KlineRepository):
        self._exchange_id = exchange_id
        self._kline_repo = kline_repo
        self._info: Optional[Info] = None        # ← 不在 __init__ 中初始化
        self._info_lock = threading.Lock()        # ← 双重检查锁,线程安全

    @property
    def info(self) -> Info:
        """懒加载 Info 实例,首次调用时触发 HTTP 握手(线程安全)。"""
        if self._info is None:
            with self._info_lock:
                if self._info is None:            # ← 双重检查
                    self._info = Info(MAINNET_API_URL, skip_ws=True, timeout=30)
        return self._info

将类内部所有 self._info 直接访问替换为 self.info(通过 property)。

效果

场景 改动前 改动后
50 对全部健康(无修复) 50 次握手,~43s 0 次握手
5 对需修复(共享 executor) 50 次握手,~43s 1 次握手(首次修复时触发)
接口改动范围 仅 kline_data_filler.py 内部,外部接口零改动

接口不变的意义

懒加载后,DataHealingOrchestrator.__init__ 不再触发 HTTP 握手,因此:

  • DataHealingOrchestrator.__init__ 无需新增 executor 可选参数
  • heal_and_prepare() 无需新增 preloaded_diagnosis 可选参数
  • 并发修复路径直接复用已有的 orchestrator.executor = shared_executor 模式(见方案 C)

四、优化方案 B:批量 DB 查询

目标

50+ 次 DB 往返 → 1 次db_now 内嵌在同一查询中)

核心 SQL

-- boundary_days = math.ceil(required_count × interval_minutes / 1440) + LOAD_SAFETY_MARGIN_DAYS
SELECT
    NOW() AS db_now,                          -- 消除独立 SELECT NOW(),同一事务快照
    symbol, base_symbol,
    kline_time, zscore_4h, analysis_time
FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY symbol, base_symbol
        ORDER BY kline_time DESC
    ) AS rn
    FROM (
        SELECT DISTINCT ON (symbol, base_symbol, kline_time)
            symbol, base_symbol, kline_time, zscore_4h, analysis_time
        FROM analysis_results
        WHERE (symbol, base_symbol) IN (VALUES (%s,%s), (%s,%s), ...)
          AND zscore_4h IS NOT NULL
          AND kline_time >= NOW() - INTERVAL '{boundary_days} days'
        ORDER BY symbol, base_symbol, kline_time DESC, analysis_time DESC
    ) deduped
) ranked
WHERE rn <= %s
ORDER BY symbol, base_symbol, kline_time ASC

boundary_days 为 Python 计算的整型字面量,不涉及用户输入,字符串插值安全。

失败处理:跳过本轮,不降级

批量查询失败意味着 DB 已出现问题,此时降级执行 100 次串行查询只会加重负担。自愈是后台补偿流程,跳过一轮等下次调度重试更安全。

# realtime_kline_service_base.py — _run_data_healing()
try:
    grouped, db_now = batch_load_zscore_history(heal_pairs, required_count, self.db_client)
except Exception as e:
    logger.error(f"批量查询失败,跳过本轮自愈: {e}", exc_info=True)
    return

Statement Timeout

将现有 DB_STATEMENT_TIMEOUT_MS 默认值从 10s 调整为 15s。批量查询取代了所有单配对查询,统一使用此值即可,无需新增常量。

上线前验证执行计划(必做)

EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT NOW() AS db_now, symbol, base_symbol, ...
-- 用真实 10-50 个配对测试
执行计划 预期耗时 处理方式
Index Scan ~0.5-1s 直接上线 ✅
Bitmap Index Scan ~0.5-2s 可接受 ✅
Seq Scan < 2s ~1-2s 可接受 ✅
Seq Scan ≥ 2s >2s 重写 SQL 或调整索引,不使用 enable_seqscan = off ⚠️

SET LOCAL enable_seqscan = off 会干扰规划器全局行为并掩盖根因,禁止作为解决方案。

代码改动

  1. orchestrator.py:新增 batch_load_zscore_history() 静态方法(含 db_now);_get_db_now() fallback 修复
  2. realtime_kline_service_base.py_run_data_healing() 调用批量查询,失败 logger.error + return
  3. config.pyDB_STATEMENT_TIMEOUT_MS 默认值调整为 15000

五、优化方案 C:并发修复

前提

批量查询后数据已在内存中。诊断是纯 CPU 操作(~0ms/对),不需要并发。只有不健康配对调用 RepairExecutor.repair()(涉及 DB 写入 + 交易所 API),才进入并发。

线程安全验证

组件 线程安全方式
db_clientConnectionPool psycopg 3.x 连接池原生线程安全
kline_repo / analysis_repo 每次操作从连接池独立获取连接
kline_fillerKlineDataFiller 冷却字典由 threading.Lock() 保护;懒加载由双重检查锁保护
repair() 方法本身 无跨调用共享的可变状态

设计:直接属性替换,接口零改动

有了懒加载(方案 A),DataHealingOrchestrator.__init__ 不再触发握手,无需修改其接口。直接复用已有的属性替换模式:

def _run_concurrent_repair(
    unhealthy_pairs: list[tuple[str, str]],
    required_count: int,
    db_client: TimescaleDBClient,
    kline_repo: KlineRepository,
) -> dict[tuple[str, str], HealingResult]:
    if not unhealthy_pairs:
        return {}

    # 懒加载:握手在首次修复时触发,而非此处
    shared_executor = RepairExecutor(db_client, kline_repo)

    results: dict[tuple[str, str], HealingResult] = {}

    def _heal_one(pair: tuple[str, str]) -> tuple[tuple[str, str], HealingResult]:
        symbol, base_symbol = pair
        orchestrator = DataHealingOrchestrator(
            db_client=db_client,
            kline_repo=kline_repo,
            symbol=symbol,
            base_symbol=base_symbol,
        )
        orchestrator.executor = shared_executor   # ✅ 直接替换,DataHealingOrchestrator 接口零改动
        result = orchestrator.heal_and_prepare(required_count=required_count)
        return (symbol, base_symbol), result

    max_workers = min(4, len(unhealthy_pairs))
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(_heal_one, pair): pair for pair in unhealthy_pairs}
        for future in as_completed(futures):
            try:
                key, result = future.result()
                results[key] = result
            except Exception as e:
                pair = futures[future]
                logger.error(f"并发修复异常 [{pair[0]}/{pair[1]}]: {e}", exc_info=True)
                empty_quality = QualityAssessor().assess([], required_count, False, 0)
                results[(pair[0], pair[1])] = HealingResult(
                    status='failed', data=[], quality=empty_quality, iterations_used=0,
                )

    return results

DataHealingOrchestrator.__init__heal_and_prepare() 签名均未修改

并发度与连接池

heal_and_prepare() 内部操作(顺序执行):
  1. _load_zscore_history()       ← 1 个连接
  2. _fill_kline_gaps_parallel()  ← 最多 2 个连接(ThreadPoolExecutor(max_workers=2))
  3. batch_insert()               ← 1 个连接

峰值连接数(步骤 2 阶段):
  4 workers × 2 = 8 并发连接
  8 ≤ ConnectionPool.max_size=10  ✅(留 2 个连接作安全余量)

并发修复中,每个 unhealthy 配对的 heal_and_prepare 内部仍执行一次 _load_zscore_history(),加载最新数据(不复用批量缓存,防时间差导致数据过时)。


六、附加优化

6a. 健康配对跳过冗余评估

冗余路径

heal_and_prepare()
├─ _diagnose()
│   └─ check_continuity()    ← 第 1 次
└─ _final_assessment()
    └─ check_continuity()    ← 第 2 次(参数完全相同,纯冗余)

优化:诊断结果为 is_healthy=True 时,直接复用诊断结论构建 HealingResult,跳过 _final_assessment

# orchestrator.py — heal_and_prepare() 内
if diagnosis.is_healthy:
    quality = self.assessor.assess(records, required_count, is_continuous=True, missing_count=0)
    return HealingResult(
        status=self._determine_status(quality),
        data=self._extract_zscore_values(records),
        quality=quality,
        iterations_used=iteration,
    )

6b. 日志精简

优化前:每个健康配对输出 5 行 INFO 日志(50 对 = 250 行)

优化后:健康配对仅 DEBUG 级别,汇总 1 行 INFO:

自愈完成 | healed=3, failed=0, healthy=47 | 耗时: 0.42s

6c. _is_in_cooldown 锁保护(并发修复上线前完成)

原标注为"可选",但并发修复上线后读写竞态实际存在,升级为推荐:

# kline_data_filler.py
def _is_in_cooldown(self, symbol: str, timeframe: str) -> bool:
    with self._cooldown_lock:                      # ✅ 与写操作一致
        last_fill = self.fill_cooldown.get((symbol, timeframe), 0)
    return time.time() - last_fill < self.COOLDOWN_SECONDS

6d. ONDO/BTC 慢查询根因排查(批量 SQL 上线前完成)

ONDO/BTC 单次查询耗时 4.44s,是其他配对的 30 倍。批量 SQL 走 Index Scan 时此配对会成为全局瓶颈。

-- Step 1:确认数据量
SELECT COUNT(*) FROM analysis_results
WHERE symbol = 'ONDO/USDC:USDC' AND base_symbol = 'BTC/USDC:USDC' AND zscore_4h IS NOT NULL;

-- Step 2:检查执行计划
EXPLAIN (ANALYZE, BUFFERS) SELECT DISTINCT ON (kline_time) ...

-- Step 3:若数据量过大,清理超出保留窗口的历史数据
DELETE FROM analysis_results
WHERE symbol = 'ONDO/USDC:USDC' AND base_symbol = 'BTC/USDC:USDC'
  AND kline_time < NOW() - INTERVAL '60 days';

七、重构后流程

_run_data_healing():
│
├── 1. 查询 DB 中有数据的配对              # 已有逻辑,不变
├── 2. 构建 heal_pairs 列表               # 已有逻辑,不变
│
├── 3.【新】批量加载(含 db_now,1 次 DB 往返)
│      batch_load_zscore_history(heal_pairs, required_count, db_client)
│      → grouped: {(sym, base): [records...]}
│      → db_now: datetime(timezone-aware)
│      失败 → logger.error + return(跳过本轮,等下次调度)
│
├── 4.【新】批量诊断(内存操作,~0ms)
│      for pair in heal_pairs:
│          records = grouped.get(pair, [])
│          orchestrator = DataHealingOrchestrator(...)   # 懒加载:无 HTTP 握手,仅对象分配
│          diagnosis = orchestrator._diagnose(records, required_count, db_now)
│          if is_healthy → 跳过 _final_assessment,直接构建结果(见 6a)
│          else          → 加入 unhealthy_pairs
│
├── 5.【新】并发修复不健康配对
│      shared_executor = RepairExecutor(...)   # 懒加载:握手延迟至首次修复调用
│      ThreadPoolExecutor(max_workers=min(4, N)):
│          for pair in unhealthy_pairs:
│              orchestrator.executor = shared_executor  # 直接替换,接口零改动
│              orchestrator.heal_and_prepare(required_count)
│              → 重新加载最新数据(1 次 DB 查询/配对)
│              → 修复 + 评估
│
└── 6. 汇总日志(1 行)
       healed=X, failed=Y, healthy=Z | 耗时: T s

八、预期性能对比

指标 优化前 优化后 提升
HTTP 握手次数 50 次(49 次废弃) 0 次(全健康)/ 1 次(有修复) 消除
HTTP 握手总耗时 ~43s 0s / ~0.85s 消除
DB 查询次数(加载 + NOW) ~100 次 1 次 100x
DB 查询总耗时 ~8s ~0.3s 25x
全健康场景总耗时 ~55s ~0.35s ~157x
有 5 对需修复 ~55s ~2.8s ~20x
有 15 对需修复 ~55s ~7-8s ~7-8x
日志行数 ~250 行 ~10 行 精简

全健康场景耗时拆解(优化后):

1 次批量 DB 查询(含 db_now):  ~0.3s
50 对内存诊断(对象分配 + CPU):~0.05s
HTTP 握手:                     0s(无需修复,懒加载不触发)
─────────────────────────────────────────
总计:                          ~0.35s

有 5 对需修复的场景(优化后,max_workers=4):

1 次批量 DB 查询:                   ~0.3s
50 对内存诊断:                      ~0.05s
1 次 HTTP 握手(首次修复时触发):    ~0.85s
5 对各自重新加载数据(并发):         ~0.15s
5 对并发修复(4+1 批,含 DB 写入):  ~1.5s
─────────────────────────────────────────
总计:                               ~2.8s

九、部署顺序

Step 1:【DB 层】执行 DDL(低峰期,约 10-30 分钟)
   CREATE UNIQUE INDEX CONCURRENTLY ...
   ALTER TABLE ... ADD CONSTRAINT ...

Step 2:【代码层,独立,可提前】修复 Bug-1
   _get_db_now() fallback 改为 datetime.now(timezone.utc)

Step 3:【代码层,依赖 Step 1】上线 batch_insert ON CONFLICT
   timescaledb.py 追加 ON CONFLICT ... DO NOTHING

Step 4:【代码层,依赖 Step 3】上线方案 A(KlineDataFiller 懒加载)
   kline_data_filler.py:_info 懒加载 + 双重检查锁
   同步上线 6c(_is_in_cooldown 锁保护)

Step 5:【代码层,依赖 Step 4】上线方案 B(批量 DB 查询)
   上线前:EXPLAIN ANALYZE 验证执行计划(见四)
   上线前:完成 6d ONDO/BTC 慢查询排查

Step 6:【代码层,依赖 Step 5】上线方案 C(并发修复)

Step 7:【可选,非阻塞】上线附加优化 6a / 6b

十、改动文件清单

文件 改动类型 说明
DDL 迁移脚本 DDL 变更(Step 1,必须先执行) UNIQUE 索引 + 约束
kline_data_filler.py 核心改动(Step 4) _info 懒加载 + 双重检查锁;_is_in_cooldown 锁保护
orchestrator.py 新增方法 + Bug 修复 新增 batch_load_zscore_history()(返回 grouped + db_now);_get_db_now() fallback 修复
realtime_kline_service_base.py 重构方法 _run_data_healing() 改为批量加载 + 诊断 + 并发修复;新增 _run_concurrent_repair()
timescaledb.py Bug 修复(依赖 DDL) batch_insert 追加 ON CONFLICT ... DO NOTHING
config.py 参数调整 DB_STATEMENT_TIMEOUT_MS 默认值调整为 15000

不变的文件repair_executor.pycontinuity_checker.pyquality_assessor.py__init__.py

与 v1 相比删除的改动

删除项 删除原因
_batch_load_with_fallback() 替换为硬失败+跳过;DB 故障时降级执行 100 次串行查询适得其反
_fallback_individual_load() 同上,两条并行代码路径维护成本高于收益
_load_zscore_history_single() 无 fallback 路径则不再需要
DataHealingOrchestrator.__init__ executor 参数 懒加载后直接属性替换即可,无需污染构造函数接口
heal_and_prepare() preloaded_diagnosis 参数 跳过 0ms CPU 操作不值得引入接口复杂度
BATCH_STATEMENT_TIMEOUT_MS 常量 合并到现有 DB_STATEMENT_TIMEOUT_MS(调整默认值即可)
SET LOCAL enable_seqscan = off 干扰规划器,掩盖根因,禁止作为解决方案

十一、风险控制

风险 缓解措施
批量查询失败 logger.error 告警 + 跳过本轮,等下次调度重试;不静默降级
批量 SQL 走 Seq Scan 性能差 上线前 EXPLAIN (ANALYZE, BUFFERS) 验证;问题则重写 SQL 或调整索引
ONDO/BTC 慢查询拖慢批量 SQL 批量 SQL 上线前优先排查修复(见 6d)
DDL 变更加锁影响生产 CREATE UNIQUE INDEX CONCURRENTLY 避免锁,低峰期执行
DDL 未完成就上线 ON CONFLICT 运行时报错;严格按部署顺序操作
DB 连接池耗尽 max_workers=4,峰值 8 连接 ≤ max_size=10(留 2 个安全余量)
懒加载并发初始化竞态 双重检查锁(threading.Lock)保护,CPython GIL 之外也安全
_cooldown_lock 并发读竞态 6c 与方案 A 同步上线
datetime naive/aware 混用 Bug-1 修复,统一 datetime.now(timezone.utc)
并发修复异常传播 每个 future 独立 try/except,单配对失败不影响其他
交易所 API 频率限制 KlineDataFiller 10 分钟冷却机制,4 并发不会重复请求同一 symbol
超时保护 全局 healing_timeout(300s) 保持不变
向后兼容 DataHealingOrchestratorheal_and_prepare 接口完全不变,原有调用方无需修改

Read more

跑步的技巧(滚动落地)

“滚动落地(rolling contact / rolling foot strike)”不是一种教条式的“脚法”,而是一种 让冲击沿着整只脚、整条后链逐级传递的落地机制。 它的核心不是“你先用哪儿着地”,而是: 你的脚落地之后,冲击是不是像轮子一样滚过去,而不是像锤子一样砸下去。 这就是滚动落地的本质。 一、什么叫“滚动落地”? 你可以把它理解成两种完全不同的落地方式: 1. 砸地(撞击式) 脚像锤子一样拍到地上: * 要么后跟先砸 * 要么前掌先戳 * 冲击集中在一个点 * 一个结构瞬间吃掉大部分载荷 结果就是: * 后跟砸 → 膝盖难受 * 前掌戳 → 前脚掌磨烂 * 都不是长跑友好模式 这叫 撞击式着地(impact strike)。 2. 滚地(滚动式) 脚像轮胎一样“滚”过地面: * 不是某一点硬砸 * 而是外侧中足先轻触 * 再向前滚到前掌 * 最后从大脚趾蹬离

By SHI XIAOLONG

AMI的优越性

世界模型(World Models)的具体例子 如下,我按类型分类,便于理解。每类都附带实际实现、演示效果和应用场景。 1. Yann LeCun / Meta 的 JEPA 系列(最直接对应“世界模型”概念) 这些是 LeCun 主张的非生成式抽象预测世界模型代表。 * I-JEPA(Image JEPA,2023) 输入一张图像,模型把不同区域(context 和 target)编码成抽象表示,然后预测 target 的表示(不在像素级别重建)。 例子:给定一张遮挡了部分物体的图片,模型能预测“被遮挡物体的大致位置和属性”,构建对物体持久性和空间关系的理解。 这是一个“原始世界模型”,能学习物理常识(如物体不会凭空消失)。 * V-JEPA / V-JEPA 2(Video JEPA,

By SHI XIAOLONG

什么是:“世界模型(World Models)”

世界模型(World Models) 是人工智能领域的一个核心概念,尤其在 Yann LeCun 等研究者推动的下一代 AI 架构中占据中心位置。它指的是 AI 系统在内部构建的对现实世界的抽象模拟或内部表示,让机器能够像人类或动物一样“理解”物理世界、预测未来、规划行动。 简单比喻 想象你闭上眼睛也能“看到”房间里的物体会如何移动、碰撞或掉落——这就是你大脑里的世界模型。AI 的世界模型就是类似的“数字孪生”(digital twin)或“内部模拟器”:它不是简单记住数据,而是学习世界的动态、因果关系和物理直觉(如重力、物体持久性、遮挡、因果等)。 为什么需要世界模型? 当前主流的大型语言模型(LLM) 擅长处理文本(统计模式预测),但存在根本局限: * 缺乏对物理世界的真正理解 → 容易“幻觉”、无法可靠规划。 * 样本效率低 → 人类/

By SHI XIAOLONG

K线周期可配置化设计方案

K线周期可配置化设计方案 1. 背景与目标 当前 Beta 套利策略的 K 线周期硬编码为 "1h",分散在多个文件中。需要: 1. 将 K 线周期从 1h 改为 2h 2. 提取为环境变量 BETA_ARB_KLINE_INTERVAL,使其可在 .env 中配置 2. 影响范围分析 2.1 需要修改的文件(共 6 个) 文件 硬编码位置 修改内容 src/trading/config.py BetaArbConfig dataclass 新增 kline_interval 字段,

By SHI XIAOLONG