从零构建分布式异步任务队列:在 Shopee 的实战经验
这篇文章的价值:每个达到一定规模的后端系统都需要异步任务——延迟执行、定时调度、失败重试。开源方案(Celery、Bull)往往太重或不够灵活。这篇文章分享在 Shopee 从零构建一个分布式异步任务队列的完整经验:存储模型、调度策略、失败重试、优先级队列,以及在生产环境中踩过的坑。
为什么每家公司都在造异步任务轮子
几乎每个达到一定规模的后端系统都会遇到同一个问题:有些事情不能在用户请求里同步做完。
用户下单后,你需要在 30 分钟后检查是否付款、没付就取消。用户注册后,你需要异步发一封欢迎邮件。每天凌晨,你需要跑一次对账。每小时,你需要同步一次外部数据。这些"稍后再做"的事情,就是异步任务。
问题在于,"稍后再做"听起来简单,做起来一点都不简单:
- 可靠性:服务重启了,任务还在吗?网络抖动了,任务会丢吗?
- 延迟控制:"30 分钟后执行"——谁来计时?精度多少?
- 并发控制:10 个实例同时运行,怎么保证同一个任务不会被执行两次?
- 失败处理:任务执行失败了,重试几次?重试间隔多久?重试到什么时候放弃?
- 可观测性:现在有多少任务在排队?平均延迟多少?哪些任务失败了?
这就是为什么 Celery、Sidekiq、Bull、Temporal 这些项目存在——它们解决的都是同一个问题。但在大公司里,通用方案往往不够用:你需要和内部 RPC 框架集成、需要适配公司的部署平台、需要和已有的监控体系打通。所以几乎每家有一定规模的公司都会自己造一个。
Shopee 的情况
2019 年我加入 Shopee 的平台团队,负责面向业务开发团队的中间件服务。当时各业务线都有自己的"异步任务"方案——有人用 crontab + 脚本,有人写个死循环轮询数据库,有人直接在业务代码里起 goroutine 然后祈祷它不会丢。
问题显而易见:没有统一的任务管理、没有监控、没有重试、任务丢了没人知道。更要命的是,每个团队都在重复踩同样的坑——重试没做幂等导致重复扣款,crontab 机器挂了没人发现导致对账断了一周。平台团队决定从零构建一套分布式异步任务队列系统,提供标准化的异步任务能力。
这篇文章记录这个系统从设计到落地的完整过程。三年间它从 0 做到 10+ 业务接入、QPS 从 2K 增长到 25K+、双十一峰值 50K。
需求分析
和各业务团队聊下来,异步任务的需求大致分三类:
| 类型 | 场景举例 | 延迟要求 |
|---|---|---|
| 短延迟任务 | 订单超时取消、优惠券过期、消息推送 | 秒级~分钟级 |
| 长延迟任务 | T+1 对账、周报生成、数据归档 | 小时级~天级 |
| Cron 定时任务 | 每日凌晨跑批、每小时数据同步、心跳检测 | 按 cron 表达式 |
除了功能需求,还有几个硬性约束:
- 至少一次投递:任务可以重复执行(业务做幂等),但绝对不能丢
- 多语言支持:业务团队用 Go 和 Python,必须提供两种 SDK
- 和内部 RPC 平台集成:任务触发通过公司内部的 RPC 框架调用业务服务,不能引入额外的网络协议
- 可观测性:每个任务的状态、耗时、重试次数都要能查到
系统架构
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Order Svc │ │ Payment Svc │ │ Report Svc │ 业务服务
│ (Go SDK) │ │ (Go SDK) │ │ (Python SDK) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└──────────────────┼───────────────────┘
│ Submit (RPC)
▼
┌───────────────────────────┐
│ API Server │
│ ┌─────────┐ ┌─────────┐ │
│ │ 参数校验 │ │延迟路由 │ │
│ └─────────┘ └─────────┘ │
└─────┬─────────────┬───────┘
│ │
delay ≤ 10m │ │ delay > 10m / Cron
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Kafka │ │ MySQL │
│ │ │ │
│ topic-order │ │ async_tasks │ 持久化层
│ topic-pay │ │ cron_registry│
│ topic-report │ │ │
└──────┬───────┘ └──────┬───────┘
│ │
▼ ▼
┌─────────────────────────────────────────┐
│ Dispatcher (N 实例) │
│ │
│ ┌────────────┐ ┌───────────────────┐ │
│ │ Kafka │ │ MySQL Poller │ │
│ │ Consumer │ │ (秒级扫描) │ │
│ └─────┬──────┘ └────────┬──────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────────────────────┐ │
│ │ ETA Priority Queue (Heap) │ │ 调度层
│ │ 按触发时间排序,到期即 dispatch │ │
│ └───────────────┬──────────────────┘ │
│ │ │
└──────────────────┼───────────────────────┘
│ Dispatch (RPC)
▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Order Svc │ │ Payment Svc │ │ Report Svc │ 执行器
│ (Executor) │ │ (Executor) │ │ (Executor) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└────────────────┼─────────────────┘
│ 结果回调 (RPC)
▼
┌─────────────────────────────────────────┐
│ Prometheus + Grafana │ 可观测性
│ 延迟 · 吞吐 · 成功率 · 队列深度 · 重试 │
└─────────────────────────────────────────┘
为什么是 Kafka + MySQL,而不是纯 Kafka 或纯 Redis
这是设计阶段讨论最多的决策。
纯 Kafka 方案的问题:Kafka 不支持任意延迟投递。虽然可以用多个 topic 模拟不同延迟级别(1s/5s/30s/1min/...),但粒度太粗,且 topic 数量爆炸。对于"3 天后触发"这种长延迟,Kafka 完全不合适——消息堆积在 broker 上,浪费资源且 rebalance 时有风险。
纯 Redis 方案(Sorted Set + ZRANGEBYSCORE):延迟任务用 score 存触发时间,定时扫描到期任务。问题是 Redis 的持久化不够可靠——RDB 有时间窗口,AOF 在高 QPS 下性能下降。对于"至少一次投递"的硬性要求,Redis 作为唯一存储太冒险。
最终方案:Kafka 处理即时和短延迟任务(≤ 10 分钟),MySQL 处理长延迟和 Cron 任务。
- Kafka:吞吐量高,天然支持消费者组和 partition 级并行。短延迟任务消费后放入内存中的 ETA 优先队列(最小堆),到期后触发执行。
- MySQL:ACID 保证任务不丢。长延迟任务写入时带上触发时间,Dispatcher 每秒扫描到期任务。Cron 任务由 Scheduler 提前计算下一次触发时间写入。
短延迟任务:ETA Priority Queue
短延迟任务从 Kafka 消费后,不是直接 sleep 等待,而是放入一个基于最小堆的优先队列,按 ETA(Expected Time of Arrival,预期触发时间)排序:
// ETA Priority Queue —— 基于 container/heap
type Task struct {
ID string
Type string
Payload string
ETA time.Time // 预期触发时间
Priority int // 业务优先级(用于同 ETA 的 tiebreak)
index int // heap 内部索引
}
type ETAQueue []*Task
func (q ETAQueue) Len() int { return len(q) }
func (q ETAQueue) Less(i, j int) bool {
if q[i].ETA.Equal(q[j].ETA) {
return q[i].Priority > q[j].Priority // 同 ETA,高优先级先出
}
return q[i].ETA.Before(q[j].ETA)
}
func (q ETAQueue) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
q[i].index = i
q[j].index = j
}
func (q *ETAQueue) Push(x any) {
t := x.(*Task)
t.index = len(*q)
*q = append(*q, t)
}
func (q *ETAQueue) Pop() any {
old := *q
n := len(old)
t := old[n-1]
old[n-1] = nil
t.index = -1
*q = old[:n-1]
return t
}
Dispatcher 的调度循环:
func (d *Dispatcher) Run() {
pq := &ETAQueue{}
heap.Init(pq)
for {
// 1. 从 Kafka 消费新任务,推入堆
for _, task := range d.consumeBatch() {
heap.Push(pq, task)
}
// 2. 弹出所有已到期的任务
now := time.Now()
for pq.Len() > 0 && (*pq)[0].ETA.Before(now) {
task := heap.Pop(pq).(*Task)
go d.dispatch(task) // 异步执行
}
// 3. 计算下次唤醒时间:堆顶的 ETA 或兜底 100ms
var sleepUntil time.Duration
if pq.Len() > 0 {
sleepUntil = time.Until((*pq)[0].ETA)
if sleepUntil < 0 {
sleepUntil = 0
}
} else {
sleepUntil = 100 * time.Millisecond
}
time.Sleep(sleepUntil)
}
}
为什么选堆而不是 Timer Wheel:
- 语义直观:每个任务有一个 ETA,堆顶永远是最近要触发的任务。代码逻辑清晰,不需要理解 cascade、tick 这些概念。
- 精度灵活:Timer Wheel 的精度受限于 tick 粒度(比如 100ms 一格),堆没有这个限制——ETA 精确到纳秒。
- 天然支持优先级:
Less函数里可以同时比较 ETA 和业务优先级。同一秒内到期的任务,高优先级的先执行。 - O(log n) 够用:堆的插入和弹出是 O(log n),对于我们的场景(堆内通常几千到几万个任务),性能完全不是瓶颈。Timer Wheel 的 O(1) 优势只在百万级任务时才体现。
- Go 标准库自带:
container/heap已经提供了完整的接口,不需要引入额外依赖。
长延迟任务:MySQL Poller
长延迟任务存在 MySQL 里,核心表结构:
CREATE TABLE async_tasks (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
task_type VARCHAR(128) NOT NULL,
payload TEXT NOT NULL,
trigger_at DATETIME NOT NULL, -- 触发时间
status TINYINT DEFAULT 0, -- 0=pending, 1=dispatched, 2=success, 3=failed
retry_count INT DEFAULT 0,
max_retries INT DEFAULT 3,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_trigger (status, trigger_at)
);
Dispatcher 的 poller 每秒执行:
SELECT id, task_type, payload
FROM async_tasks
WHERE status = 0 AND trigger_at <= NOW()
ORDER BY trigger_at
LIMIT 1000
FOR UPDATE SKIP LOCKED;
FOR UPDATE SKIP LOCKED 是关键——多个 Dispatcher 实例并行扫描时,已经被其他实例锁住的行会被跳过,避免了分布式锁的复杂性。这是 MySQL 8.0 引入的特性,完美适合这个场景。
扫描到的任务立即更新状态为 dispatched,然后通过 RPC 调用业务服务的执行器。如果 RPC 失败或超时,任务状态回退为 pending,retry_count 加一,trigger_at 设为当前时间加上退避延迟(指数退避 + 抖动)。
Cron 定时任务
Cron 任务的实现分两层:
- Cron Scheduler:解析 cron 表达式,计算下一次触发时间,写入 MySQL 的
async_tasks表。本质上是把 Cron 任务转化为长延迟任务。 - Cron Registry:存储 cron 规则的元信息(cron 表达式、关联的 task_type、负责人等)。
// Cron Scheduler 核心循环
func (s *CronScheduler) Run() {
for {
rules := s.loadActiveRules() // 从 cron_registry 加载
for _, rule := range rules {
next := rule.NextTriggerTime()
if !s.taskExistsForTime(rule.TaskType, next) {
s.createTask(rule.TaskType, rule.Payload, next)
}
}
time.Sleep(10 * time.Second) // 每 10 秒检查一次
}
}
这个设计的好处是 Cron Scheduler 是无状态的——它只负责"确保下一次触发的任务存在于 MySQL 中"。即使 Scheduler 挂了重启,也不会丢任务,因为它是幂等的(检查是否已存在再创建)。
SDK 设计
Go SDK 和 Python SDK 是业务团队直接使用的接口。设计原则:尽可能简单,隐藏底层复杂性。
// Go SDK - 提交任务
client := asyncd.NewClient(asyncd.WithServiceName("order-service"))
// 即时任务
err := client.Submit(ctx, &asyncd.Task{
Type: "cancel_expired_order",
Payload: `{"order_id": 12345}`,
})
// 延迟任务
err := client.SubmitDelayed(ctx, &asyncd.Task{
Type: "send_review_reminder",
Payload: `{"user_id": 67890}`,
}, 3 * 24 * time.Hour) // 3 天后
// 注册执行器
client.RegisterExecutor("cancel_expired_order", func(ctx context.Context, payload string) error {
var req CancelOrderRequest
json.Unmarshal([]byte(payload), &req)
return orderService.Cancel(ctx, req.OrderID)
})
SDK 内部做了几件事:
- 序列化:payload 统一用 JSON 字符串,避免跨语言的序列化兼容问题
- RPC 集成:通过公司内部 RPC 框架注册服务,Dispatcher 调用执行器时走标准 RPC 链路(服务发现、负载均衡、超时控制都由框架处理)
- 幂等提示:SDK 文档和 code review 时反复强调——执行器必须是幂等的,因为"至少一次投递"意味着同一个任务可能被执行多次
Kubernetes Operator
系统部署在 Kubernetes 上。一开始用标准的 Deployment + ConfigMap,但运维痛点很快暴露:
- 新业务接入需要手动创建 Kafka topic、更新路由配置、调整 consumer 数量
- Dispatcher 的 partition 分配和 MySQL poller 的并发度需要根据 QPS 动态调整
- Cron 规则的变更需要重启 Scheduler
于是我们开发了一个 Custom Operator,引入了 CRD:
apiVersion: asyncd.example.io/v1
kind: AsyncJobConfig
metadata:
name: order-cancel-job
spec:
taskType: cancel_expired_order
executor:
service: order-service
method: CancelExpiredOrder
timeout: 30s
retry:
maxRetries: 3
backoff: exponential
kafka:
topic: asyncd-order-cancel
partitions: 6
consumerGroup: asyncd-order-cancel-cg
Operator 监听 CRD 的变更,自动完成:
- 创建/更新 Kafka topic 和 consumer group
- 更新 Dispatcher 的路由表(通过 ConfigMap 热更新)
- 根据 QPS 指标自动调整 consumer 副本数(HPA 联动)
这大幅降低了业务接入的门槛——从"提工单找平台团队配置"变成"提交一个 YAML"。
监控与告警
每个环节都暴露了 Prometheus metrics:
// 任务提交
asyncd_task_submitted_total{task_type, source_service}
asyncd_task_submit_latency_seconds{task_type}
// 任务调度
asyncd_task_dispatched_total{task_type}
asyncd_task_dispatch_delay_seconds{task_type} // 实际触发时间 - 预期触发时间
// 任务执行
asyncd_task_executed_total{task_type, status} // status: success/failed/timeout
asyncd_task_execution_duration_seconds{task_type}
asyncd_task_retry_total{task_type}
// 队列健康
asyncd_kafka_consumer_lag{topic, partition}
asyncd_mysql_pending_tasks{task_type}
asyncd_mysql_poll_duration_seconds
最关键的告警规则:
- dispatch_delay > 5s:任务调度延迟超过 5 秒,说明消费能力不足或 MySQL poller 卡住了
- kafka_consumer_lag 持续增长:消费者跟不上生产者,需要扩容
- mysql_pending_tasks 持续增长:到期任务堆积,poller 可能有问题
- task retry 率 > 10%:下游服务可能出了问题
双十一:从 25K 到 50K QPS
2021 年双十一是这个系统最大的考验。日常 QPS 在 25K 左右,预估峰值需要支撑 50K+。
提前做了几件事:
- Kafka partition 扩容:核心 topic 从 12 partition 扩到 24,consumer 相应扩容
- MySQL 读写分离:poller 查询走从库,状态更新走主库,避免扫描查询影响写入
- 批量提交优化:SDK 增加了
SubmitBatch接口,单次 RPC 提交多个任务,减少网络 round trip - 预热:双十一前一天,提前创建好所有 Cron 任务的下一次触发记录,避免零点的写入尖峰
实际峰值达到了约 48K QPS,系统扛住了,没有任务丢失。核心指标:
| 指标 | 日常 | 双十一峰值 |
|---|---|---|
| QPS | ~25K | ~48K |
| P99 调度延迟 | 180ms | 520ms |
| 任务丢失 | 0 | 0 |
| 重试率 | 0.3% | 1.2% |
踩过的坑
1. MySQL 的 datetime 精度
MySQL 的 DATETIME 默认精度是秒。当多个任务的 trigger_at 在同一秒内,ORDER BY trigger_at 的顺序不确定。对于秒级延迟任务,这导致了任务被乱序执行。解决方案:改用 DATETIME(3)(毫秒精度),并在 ORDER BY 中加上 id 作为 tiebreaker。
2. Kafka rebalance 风暴
早期 consumer 没有设置合理的 session.timeout.ms 和 heartbeat.interval.ms,业务执行器偶尔的慢请求导致 consumer 被认为挂了,触发 rebalance,rebalance 期间所有 consumer 停止消费,导致延迟飙升。最终调整为 session.timeout=30s、heartbeat.interval=10s、max.poll.interval=5min,同时执行器调用改为异步——消费消息后立即 commit,执行结果通过回调上报。
3. Poller 的惊群效应
多个 Dispatcher 实例同时扫描 MySQL,虽然 FOR UPDATE SKIP LOCKED 避免了重复执行,但大量的锁竞争导致 MySQL CPU 飙高。解决方案:每个 Dispatcher 实例负责特定的 task_type 范围(通过一致性哈希分配),避免所有实例扫描同一批数据。
4. 时区问题
Shopee 业务覆盖东南亚多个国家,各国时区不同。Cron 表达式 0 0 * * *(每天零点)到底是哪个零点?最终方案:所有内部时间统一用 UTC 存储和计算,Cron 规则必须显式指定时区 CRON_TZ=Asia/Singapore 0 0 * * *。SDK 会校验 cron 表达式中是否包含时区信息,没有的话直接拒绝。
5. 幂等性的推广困难
技术上最简单但推广最难的问题。"至少一次投递"要求业务执行器必须幂等,但很多业务开发者不理解或者不重视。上线后出过几次"重复扣款"、"重复发券"的事故,都是因为执行器没做幂等。最终在 SDK 层面强制要求传入一个 idempotency_key,并提供了一个基于 Redis 的去重 helper,从工具层面降低了出错概率。
回顾与反思
这个系统从 2019 年中开始设计,到 2020 年初第一个业务接入,到 2021 年双十一扛住 50K QPS。三年间的几个感悟:
- 存储选型决定了系统的天花板:Kafka + MySQL 的组合虽然不性感,但胜在成熟可靠。在中间件系统里,"不丢数据"比"低延迟"重要得多。Kafka 的持久化和 at-least-once 语义是这个系统的基石,任何替代方案都必须先过这一关。
- SDK 的易用性决定了系统的推广速度:再好的系统,如果 SDK 用起来很麻烦,业务团队就不会用。花在 SDK 设计上的时间永远不亏。
- Kubernetes Operator 是中间件团队的杠杆:把运维操作编码为 CRD + Controller,业务团队自助接入,平台团队不再是瓶颈。这个模式后来被其他中间件(配置中心、消息队列)复用。
- 监控不是事后补的,是第一天就要有的:系统上线第一天就有完整的 Prometheus + Grafana dashboard。早期很多问题都是通过 metrics 提前发现的,而不是等业务报障。
- 幂等性是一个社会问题,不是技术问题:你可以提供工具,但最终需要业务开发者理解并重视。文档、培训、code review、事故复盘,缺一不可。
如果重新做一次
最大的教训是:先把用例建模清楚,再做技术选型。
我们当时把"异步任务"当作一个统一的概念来设计系统,但实际上不同的任务在关键属性上差异巨大。如果重新来过,我会先花足够的时间和业务团队一起梳理每种任务的核心属性:
| 属性 | 可能的取值 | 影响的设计决策 |
|---|---|---|
| 延迟类型 | 即时 / 短延迟 / 长延迟 / Cron | 存储选型(Kafka vs MySQL)、调度策略 |
| 优先级 | 高(订单取消)/ 中 / 低(报表) | 队列隔离策略、资源分配 |
| 唯一性 | 允许重复 / 同一时刻唯一 / 全局唯一 | 去重机制、幂等要求 |
| 投递语义 | 至少一次 / 至多一次 / 恰好一次 | 确认机制、重试策略、消息持久化 |
| 执行时长 | 毫秒级 / 秒级 / 分钟级+ | 超时设置、心跳检测、资源隔离 |
| 失败处理 | 重试 / 丢弃 / 人工介入 | 重试策略、死信队列、告警规则 |
| 依赖关系 | 独立 / 有前置依赖 / DAG | 任务编排、状态机 |
这些属性的不同组合,意味着完全不同的设计 trade-off。举几个例子:
- 订单超时取消(短延迟 + 高优先级 + 全局唯一 + 至少一次):需要独立的高优先级队列,去重机制必须可靠,消息绝对不能丢——Kafka 是正确选择,Redis 不行。
- 统计报表生成(Cron + 低优先级 + 允许重复 + 至多一次):跑两次也没关系,但不能阻塞高优先级任务——需要队列隔离,而不是和订单任务共享。
- 数据导出(即时 + 执行时间长 + 有前置依赖):需要心跳检测防止被误判为超时,需要 DAG 编排而不是手动串联。
我们当时犯的错误,就是把这些差异巨大的任务塞进同一套模型里。结果就是:高优先级任务被低优先级任务阻塞(没有队列隔离)、需要唯一性的任务被重复执行(没有去重机制)、需要编排的任务靠业务方手动串联(没有 DAG 支持)。
其他具体改进:
- 引入死信队列:重试耗尽的任务应该进入死信队列而不是标记为 failed 就完了。当时缺少这个机制,导致一些失败任务被遗忘。
- 按优先级隔离队列:高优先级任务使用独立的 Kafka topic 和 consumer group,避免被低优先级任务的堆积拖慢。这个后来补上了,但如果一开始就在用例建模阶段识别出来,实现会干净得多。
- 任务唯一性约束:提供声明式的唯一性配置(如
unique_key = "order:{order_id}"),由系统层面保证同一个 key 的任务不会重复执行,而不是完全依赖业务方做幂等。 - DAG 式任务编排:业务经常需要"任务 A 完成后触发任务 B"。当时是业务自己在执行器里手动提交下一个任务,很容易出错。应该提供声明式的 DAG 编排能力。
总结一句话:技术选型是为用例服务的,不是反过来。先花时间把任务的属性矩阵建清楚,每种组合对应什么 trade-off 想明白,再动手写代码。这比上来就纠结"用 Kafka 还是用 Redis"有价值得多。