从零构建分布式异步任务队列:在 Shopee 的实战经验

Clock Watch with Astronomical Dial, ca. 1605-10, Metropolitan Museum of Art
Jan Jansen Bockeltz,《天文日晷怀表》, c. 1605-10 — 精密的齿轮机构驱动时间计量,如同调度系统驱动任务的准时执行
这篇文章的价值:每个达到一定规模的后端系统都需要异步任务——延迟执行、定时调度、失败重试。开源方案(Celery、Bull)往往太重或不够灵活。这篇文章分享在 Shopee 从零构建一个分布式异步任务队列的完整经验:存储模型、调度策略、失败重试、优先级队列,以及在生产环境中踩过的坑。

为什么每家公司都在造异步任务轮子

几乎每个达到一定规模的后端系统都会遇到同一个问题:有些事情不能在用户请求里同步做完。

用户下单后,你需要在 30 分钟后检查是否付款、没付就取消。用户注册后,你需要异步发一封欢迎邮件。每天凌晨,你需要跑一次对账。每小时,你需要同步一次外部数据。这些"稍后再做"的事情,就是异步任务。

问题在于,"稍后再做"听起来简单,做起来一点都不简单:

这就是为什么 Celery、Sidekiq、Bull、Temporal 这些项目存在——它们解决的都是同一个问题。但在大公司里,通用方案往往不够用:你需要和内部 RPC 框架集成、需要适配公司的部署平台、需要和已有的监控体系打通。所以几乎每家有一定规模的公司都会自己造一个。

Shopee 的情况

2019 年我加入 Shopee 的平台团队,负责面向业务开发团队的中间件服务。当时各业务线都有自己的"异步任务"方案——有人用 crontab + 脚本,有人写个死循环轮询数据库,有人直接在业务代码里起 goroutine 然后祈祷它不会丢。

问题显而易见:没有统一的任务管理、没有监控、没有重试、任务丢了没人知道。更要命的是,每个团队都在重复踩同样的坑——重试没做幂等导致重复扣款,crontab 机器挂了没人发现导致对账断了一周。平台团队决定从零构建一套分布式异步任务队列系统,提供标准化的异步任务能力。

这篇文章记录这个系统从设计到落地的完整过程。三年间它从 0 做到 10+ 业务接入、QPS 从 2K 增长到 25K+、双十一峰值 50K。

需求分析

和各业务团队聊下来,异步任务的需求大致分三类:

类型 场景举例 延迟要求
短延迟任务 订单超时取消、优惠券过期、消息推送 秒级~分钟级
长延迟任务 T+1 对账、周报生成、数据归档 小时级~天级
Cron 定时任务 每日凌晨跑批、每小时数据同步、心跳检测 按 cron 表达式

除了功能需求,还有几个硬性约束:

系统架构


  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐
  │  Order Svc   │   │  Payment Svc │   │  Report Svc  │    业务服务
  │  (Go SDK)    │   │  (Go SDK)    │   │ (Python SDK) │
  └──────┬───────┘   └──────┬───────┘   └──────┬───────┘
         │                  │                   │
         └──────────────────┼───────────────────┘
                            │ Submit (RPC)
                            ▼
               ┌───────────────────────────┐
               │        API Server         │
               │  ┌─────────┐ ┌─────────┐ │
               │  │ 参数校验 │ │延迟路由  │ │
               │  └─────────┘ └─────────┘ │
               └─────┬─────────────┬───────┘
                     │             │
        delay ≤ 10m  │             │  delay > 10m / Cron
                     ▼             ▼
            ┌──────────────┐ ┌──────────────┐
            │    Kafka     │ │    MySQL     │
            │              │ │              │
            │ topic-order  │ │ async_tasks  │    持久化层
            │ topic-pay    │ │ cron_registry│
            │ topic-report │ │              │
            └──────┬───────┘ └──────┬───────┘
                   │                │
                   ▼                ▼
            ┌─────────────────────────────────────────┐
            │            Dispatcher (N 实例)            │
            │                                          │
            │  ┌────────────┐  ┌───────────────────┐  │
            │  │   Kafka    │  │   MySQL Poller     │  │
            │  │  Consumer  │  │   (秒级扫描)        │  │
            │  └─────┬──────┘  └────────┬──────────┘  │
            │        │                  │              │
            │        ▼                  ▼              │
            │  ┌──────────────────────────────────┐   │
            │  │    ETA Priority Queue (Heap)      │   │    调度层
            │  │    按触发时间排序,到期即 dispatch   │   │
            │  └───────────────┬──────────────────┘   │
            │                  │                       │
            └──────────────────┼───────────────────────┘
                               │ Dispatch (RPC)
                               ▼
            ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
            │  Order Svc   │ │ Payment Svc  │ │  Report Svc  │    执行器
            │  (Executor)  │ │  (Executor)  │ │  (Executor)  │
            └──────┬───────┘ └──────┬───────┘ └──────┬───────┘
                   │                │                 │
                   └────────────────┼─────────────────┘
                                    │ 结果回调 (RPC)
                                    ▼
            ┌─────────────────────────────────────────┐
            │           Prometheus + Grafana           │    可观测性
            │  延迟 · 吞吐 · 成功率 · 队列深度 · 重试   │
            └─────────────────────────────────────────┘

为什么是 Kafka + MySQL,而不是纯 Kafka 或纯 Redis

这是设计阶段讨论最多的决策。

纯 Kafka 方案的问题:Kafka 不支持任意延迟投递。虽然可以用多个 topic 模拟不同延迟级别(1s/5s/30s/1min/...),但粒度太粗,且 topic 数量爆炸。对于"3 天后触发"这种长延迟,Kafka 完全不合适——消息堆积在 broker 上,浪费资源且 rebalance 时有风险。

纯 Redis 方案(Sorted Set + ZRANGEBYSCORE):延迟任务用 score 存触发时间,定时扫描到期任务。问题是 Redis 的持久化不够可靠——RDB 有时间窗口,AOF 在高 QPS 下性能下降。对于"至少一次投递"的硬性要求,Redis 作为唯一存储太冒险。

最终方案:Kafka 处理即时和短延迟任务(≤ 10 分钟),MySQL 处理长延迟和 Cron 任务。

短延迟任务:ETA Priority Queue

短延迟任务从 Kafka 消费后,不是直接 sleep 等待,而是放入一个基于最小堆的优先队列,按 ETA(Expected Time of Arrival,预期触发时间)排序:

// ETA Priority Queue —— 基于 container/heap
type Task struct {
    ID        string
    Type      string
    Payload   string
    ETA       time.Time  // 预期触发时间
    Priority  int        // 业务优先级(用于同 ETA 的 tiebreak)
    index     int        // heap 内部索引
}

type ETAQueue []*Task

func (q ETAQueue) Len() int { return len(q) }
func (q ETAQueue) Less(i, j int) bool {
    if q[i].ETA.Equal(q[j].ETA) {
        return q[i].Priority > q[j].Priority  // 同 ETA,高优先级先出
    }
    return q[i].ETA.Before(q[j].ETA)
}
func (q ETAQueue) Swap(i, j int) {
    q[i], q[j] = q[j], q[i]
    q[i].index = i
    q[j].index = j
}
func (q *ETAQueue) Push(x any) {
    t := x.(*Task)
    t.index = len(*q)
    *q = append(*q, t)
}
func (q *ETAQueue) Pop() any {
    old := *q
    n := len(old)
    t := old[n-1]
    old[n-1] = nil
    t.index = -1
    *q = old[:n-1]
    return t
}

Dispatcher 的调度循环:

func (d *Dispatcher) Run() {
    pq := &ETAQueue{}
    heap.Init(pq)

    for {
        // 1. 从 Kafka 消费新任务,推入堆
        for _, task := range d.consumeBatch() {
            heap.Push(pq, task)
        }

        // 2. 弹出所有已到期的任务
        now := time.Now()
        for pq.Len() > 0 && (*pq)[0].ETA.Before(now) {
            task := heap.Pop(pq).(*Task)
            go d.dispatch(task)  // 异步执行
        }

        // 3. 计算下次唤醒时间:堆顶的 ETA 或兜底 100ms
        var sleepUntil time.Duration
        if pq.Len() > 0 {
            sleepUntil = time.Until((*pq)[0].ETA)
            if sleepUntil < 0 {
                sleepUntil = 0
            }
        } else {
            sleepUntil = 100 * time.Millisecond
        }
        time.Sleep(sleepUntil)
    }
}

为什么选堆而不是 Timer Wheel:

长延迟任务:MySQL Poller

长延迟任务存在 MySQL 里,核心表结构:

CREATE TABLE async_tasks (
    id          BIGINT PRIMARY KEY AUTO_INCREMENT,
    task_type   VARCHAR(128) NOT NULL,
    payload     TEXT NOT NULL,
    trigger_at  DATETIME NOT NULL,    -- 触发时间
    status      TINYINT DEFAULT 0,    -- 0=pending, 1=dispatched, 2=success, 3=failed
    retry_count INT DEFAULT 0,
    max_retries INT DEFAULT 3,
    created_at  DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at  DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_trigger (status, trigger_at)
);

Dispatcher 的 poller 每秒执行:

SELECT id, task_type, payload
FROM async_tasks
WHERE status = 0 AND trigger_at <= NOW()
ORDER BY trigger_at
LIMIT 1000
FOR UPDATE SKIP LOCKED;

FOR UPDATE SKIP LOCKED 是关键——多个 Dispatcher 实例并行扫描时,已经被其他实例锁住的行会被跳过,避免了分布式锁的复杂性。这是 MySQL 8.0 引入的特性,完美适合这个场景。

扫描到的任务立即更新状态为 dispatched,然后通过 RPC 调用业务服务的执行器。如果 RPC 失败或超时,任务状态回退为 pendingretry_count 加一,trigger_at 设为当前时间加上退避延迟(指数退避 + 抖动)。

Cron 定时任务

Cron 任务的实现分两层:

  1. Cron Scheduler:解析 cron 表达式,计算下一次触发时间,写入 MySQL 的 async_tasks 表。本质上是把 Cron 任务转化为长延迟任务。
  2. Cron Registry:存储 cron 规则的元信息(cron 表达式、关联的 task_type、负责人等)。
// Cron Scheduler 核心循环
func (s *CronScheduler) Run() {
    for {
        rules := s.loadActiveRules()  // 从 cron_registry 加载
        for _, rule := range rules {
            next := rule.NextTriggerTime()
            if !s.taskExistsForTime(rule.TaskType, next) {
                s.createTask(rule.TaskType, rule.Payload, next)
            }
        }
        time.Sleep(10 * time.Second)  // 每 10 秒检查一次
    }
}

这个设计的好处是 Cron Scheduler 是无状态的——它只负责"确保下一次触发的任务存在于 MySQL 中"。即使 Scheduler 挂了重启,也不会丢任务,因为它是幂等的(检查是否已存在再创建)。

SDK 设计

Go SDK 和 Python SDK 是业务团队直接使用的接口。设计原则:尽可能简单,隐藏底层复杂性。

// Go SDK - 提交任务
client := asyncd.NewClient(asyncd.WithServiceName("order-service"))

// 即时任务
err := client.Submit(ctx, &asyncd.Task{
    Type:    "cancel_expired_order",
    Payload: `{"order_id": 12345}`,
})

// 延迟任务
err := client.SubmitDelayed(ctx, &asyncd.Task{
    Type:    "send_review_reminder",
    Payload: `{"user_id": 67890}`,
}, 3 * 24 * time.Hour)  // 3 天后

// 注册执行器
client.RegisterExecutor("cancel_expired_order", func(ctx context.Context, payload string) error {
    var req CancelOrderRequest
    json.Unmarshal([]byte(payload), &req)
    return orderService.Cancel(ctx, req.OrderID)
})

SDK 内部做了几件事:

Kubernetes Operator

系统部署在 Kubernetes 上。一开始用标准的 Deployment + ConfigMap,但运维痛点很快暴露:

于是我们开发了一个 Custom Operator,引入了 CRD:

apiVersion: asyncd.example.io/v1
kind: AsyncJobConfig
metadata:
  name: order-cancel-job
spec:
  taskType: cancel_expired_order
  executor:
    service: order-service
    method: CancelExpiredOrder
    timeout: 30s
  retry:
    maxRetries: 3
    backoff: exponential
  kafka:
    topic: asyncd-order-cancel
    partitions: 6
    consumerGroup: asyncd-order-cancel-cg

Operator 监听 CRD 的变更,自动完成:

这大幅降低了业务接入的门槛——从"提工单找平台团队配置"变成"提交一个 YAML"。

监控与告警

每个环节都暴露了 Prometheus metrics:

// 任务提交
asyncd_task_submitted_total{task_type, source_service}
asyncd_task_submit_latency_seconds{task_type}

// 任务调度
asyncd_task_dispatched_total{task_type}
asyncd_task_dispatch_delay_seconds{task_type}  // 实际触发时间 - 预期触发时间

// 任务执行
asyncd_task_executed_total{task_type, status}   // status: success/failed/timeout
asyncd_task_execution_duration_seconds{task_type}
asyncd_task_retry_total{task_type}

// 队列健康
asyncd_kafka_consumer_lag{topic, partition}
asyncd_mysql_pending_tasks{task_type}
asyncd_mysql_poll_duration_seconds

最关键的告警规则:

双十一:从 25K 到 50K QPS

2021 年双十一是这个系统最大的考验。日常 QPS 在 25K 左右,预估峰值需要支撑 50K+。

提前做了几件事:

实际峰值达到了约 48K QPS,系统扛住了,没有任务丢失。核心指标:

指标 日常 双十一峰值
QPS ~25K ~48K
P99 调度延迟 180ms 520ms
任务丢失 0 0
重试率 0.3% 1.2%

踩过的坑

1. MySQL 的 datetime 精度

MySQL 的 DATETIME 默认精度是秒。当多个任务的 trigger_at 在同一秒内,ORDER BY trigger_at 的顺序不确定。对于秒级延迟任务,这导致了任务被乱序执行。解决方案:改用 DATETIME(3)(毫秒精度),并在 ORDER BY 中加上 id 作为 tiebreaker。

2. Kafka rebalance 风暴

早期 consumer 没有设置合理的 session.timeout.msheartbeat.interval.ms,业务执行器偶尔的慢请求导致 consumer 被认为挂了,触发 rebalance,rebalance 期间所有 consumer 停止消费,导致延迟飙升。最终调整为 session.timeout=30sheartbeat.interval=10smax.poll.interval=5min,同时执行器调用改为异步——消费消息后立即 commit,执行结果通过回调上报。

3. Poller 的惊群效应

多个 Dispatcher 实例同时扫描 MySQL,虽然 FOR UPDATE SKIP LOCKED 避免了重复执行,但大量的锁竞争导致 MySQL CPU 飙高。解决方案:每个 Dispatcher 实例负责特定的 task_type 范围(通过一致性哈希分配),避免所有实例扫描同一批数据。

4. 时区问题

Shopee 业务覆盖东南亚多个国家,各国时区不同。Cron 表达式 0 0 * * *(每天零点)到底是哪个零点?最终方案:所有内部时间统一用 UTC 存储和计算,Cron 规则必须显式指定时区 CRON_TZ=Asia/Singapore 0 0 * * *。SDK 会校验 cron 表达式中是否包含时区信息,没有的话直接拒绝。

5. 幂等性的推广困难

技术上最简单但推广最难的问题。"至少一次投递"要求业务执行器必须幂等,但很多业务开发者不理解或者不重视。上线后出过几次"重复扣款"、"重复发券"的事故,都是因为执行器没做幂等。最终在 SDK 层面强制要求传入一个 idempotency_key,并提供了一个基于 Redis 的去重 helper,从工具层面降低了出错概率。

回顾与反思

这个系统从 2019 年中开始设计,到 2020 年初第一个业务接入,到 2021 年双十一扛住 50K QPS。三年间的几个感悟:

如果重新做一次

最大的教训是:先把用例建模清楚,再做技术选型

我们当时把"异步任务"当作一个统一的概念来设计系统,但实际上不同的任务在关键属性上差异巨大。如果重新来过,我会先花足够的时间和业务团队一起梳理每种任务的核心属性:

属性 可能的取值 影响的设计决策
延迟类型 即时 / 短延迟 / 长延迟 / Cron 存储选型(Kafka vs MySQL)、调度策略
优先级 高(订单取消)/ 中 / 低(报表) 队列隔离策略、资源分配
唯一性 允许重复 / 同一时刻唯一 / 全局唯一 去重机制、幂等要求
投递语义 至少一次 / 至多一次 / 恰好一次 确认机制、重试策略、消息持久化
执行时长 毫秒级 / 秒级 / 分钟级+ 超时设置、心跳检测、资源隔离
失败处理 重试 / 丢弃 / 人工介入 重试策略、死信队列、告警规则
依赖关系 独立 / 有前置依赖 / DAG 任务编排、状态机

这些属性的不同组合,意味着完全不同的设计 trade-off。举几个例子:

我们当时犯的错误,就是把这些差异巨大的任务塞进同一套模型里。结果就是:高优先级任务被低优先级任务阻塞(没有队列隔离)、需要唯一性的任务被重复执行(没有去重机制)、需要编排的任务靠业务方手动串联(没有 DAG 支持)。

其他具体改进:

总结一句话:技术选型是为用例服务的,不是反过来。先花时间把任务的属性矩阵建清楚,每种组合对应什么 trade-off 想明白,再动手写代码。这比上来就纠结"用 Kafka 还是用 Redis"有价值得多。