参考
https://zhuanlan.zhihu.com/p/600380258
https://xie.infoq.cn/article/aaa353c9df6641eb1b09e6f36
https://www.luozhiyun.com/archives/458
前言
在许多业务场景中,我们需要使用定时器来执行一些定期任务或操作。以下是一些常见的使用场景:
-
订单管理
- 当订单一直处于未支付状态时,需要及时关闭订单并退还库存
- 对于处于退款状态的订单,需要定期检查是否已经退款成功
-
用户激活
- 对于新创建的店铺,如果在N天内没有上传商品,系统需要发送激活短信提醒
-
数据统计
- 需要定期统计并更新各种业务指标和报表数据
-
缓存刷新
- 需要定期刷新或更新缓存数据,以确保数据的及时性和准确性
-
消息队列消费
- 需要定期从消息队列中消费并处理消息
-
系统维护
- 需要定期执行一些系统维护任务,如日志清理、数据备份等
这些场景中,如果使用传统的扫表方式,每个业务都需要维护自己的扫表逻辑,会导致大量重复代码。使用定时器,可以将这部分逻辑抽象出来,作为一个公共的组件,提高代码的可维护性和复用性。定时器可以根据配置的时间间隔,周期性地执行指定的任务,从而解决上述问题。
核心思路
主动轮询
一句话总结:实现定时器最简单粗暴的方式:轮询 + 触发.
(1)注册定时器:解析并将一系列定时任务平铺直叙地展开,每笔定时任务明确展示执行时间这一指标
(2)节点自轮询:每间隔一个微小的时间范围,对定时任务列表进行全量查询
(3)过滤&触发:以 执行时间小于等于当前时刻 作为过滤条件,摘出满足执行条件的定时任务进行执行.
这个乞丐版定时器虽然已经实现了,但是它还有很多可以优化的地方,比如每次查询都需要承担 O(N) 的线性时间复杂度。显然,我们还有很多地方可以进一步改进。
存储结构优化:有序表
一句话总结:基于有序表提升查询效率
针对无序表结构,虽然插入记录可以达到 O(1) 的时间复杂度,但每次查询都需要 O(N) 的线性时间复杂度,这显然是一个短板。解决这个问题的一个优化方向是通过重新设计存储结构,将时间复杂度均摊到每一笔操作当中。
例如使用红黑树(RBTree) 或者跳表(Skip List) 这样的数据结构,能以将插入记录的时间复杂度由 O(1) ->O(logN)为代价,换取查询时间复杂度由O(N) -> O(logN) 的优化.
在 xTimer 的实现中,存储介质选型上选择使用 Redis ZSet,以定时任务执行时间为 Score 进行有序结构的搭建,当定时任务数量达到一定量级时,ZSet 底层基于跳表作为有序表的实现. 一些更细致的实现流程如下:
(1)以 Redis ZSet 作为存储介质;
(2)每次添加定时任务时,执行 ZAdd 动作,以执行时间的时间戳作为排序的键(Score) 进行有序结构的搭建;
(3)每次查询定时任务时,执行 ZRangeByScore 动作,以当前时刻的时间戳加上一个微小偏移量作 score 的左右边界;
经过优化,将木桶的短板从 O(N) 提升到 O(logN),这是时间复杂度模型的优化。接下来,可以通过数据分治的方式对查询任务数量 N 进行优化;
下面给出了简单的代码
package main
import (
"fmt"
"github.com/go-redis/redis/v8"
"context"
"time"
)
var ctx = context.Background()
type RedisTimer struct {
client *redis.Client
}
func NewRedisTimer() *RedisTimer {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})
return &RedisTimer{client}
}
func (r *RedisTimer) AddTask(taskID string, executionTime int64) error {
_, err := r.client.ZAdd(ctx, "tasks", &redis.Z{Score: float64(executionTime), Member: taskID}).Result()
return err
}
func (r *RedisTimer) QueryTasks() ([]string, error) {
currentTime := time.Now().Unix()
minScore := fmt.Sprintf("%d", currentTime - 1) // 设置一个微小的偏移量,确保查询到当前时间之后的任务
maxScore := "+inf"
result, err := r.client.ZRangeByScore(ctx, "tasks", &redis.ZRangeBy{
Min: minScore,
Max: maxScore,
}).Result()
if err != nil {
return nil, err
}
return result, nil
}
func main() {
redisTimer := NewRedisTimer()
// 添加定时任务
err := redisTimer.AddTask("task1", time.Now().Unix() + 10) // 10秒后执行
if err != nil {
fmt.Println("Error adding task1:", err)
}
err = redisTimer.AddTask("task2", time.Now().Unix() + 20) // 20秒后执行
if err != nil {
fmt.Println("Error adding task2:", err)
}
// 查询定时任务
tasks, err := redisTimer.QueryTasks()
if err != nil {
fmt.Println("Error querying tasks:", err)
} else {
fmt.Println("当前定时任务:", tasks)
}
}
存储结构优化:纵向分治
一句话总结:通过时间范围分片,减少查询涉及的任务数量。
每次我们进行查询时,我们最关心的是即将要执行的任务,也就是那些在接下来的一段时间内即将发生的。但是,数据中可能会包含一些计划在稍后执行的任务,它们在当前的查询中并不是我们的主要焦点,只是在增加数据量而已。为**了更清楚地处理这个问题,我们把整个时间分成了许多小片段,就像把一根线剪成了很多小段一样。这样,我们可以更容易地识别出当前时间段内需要关注的任务,而那些在未来时间段才会执行的任务则暂时不被考虑。**这种方法让我们更好地管理任务,避免了不必要的数据干扰,确保我们能够专注于当下最重要的事情。
在 xTimer
的实现中,选用 1
分钟作为分片的时间范围,更细致的实现流程如下:
(1)插入每笔定时任务时,根据执行时间推算出所属的分钟级时间范围表达式,例如:2022-09-17-11:00:03 -> 2022-09-17-11:00:00_2022-09-17-11:01:00
。
(2)以分钟级时间范围表达式为 key
,将定时任务任务插入到不同 ZSet
中,组成一系列相互隔离的有序表结构。
(3)每一次查询过程中,同样根据当前时刻推算出对应分钟级时间范围表达式,并以此为 key
查找到对应的有序表进行 ZRange
查询。
至此,每一次查询的任务量级就从全量数据 N
进一步减小到分钟级数据 N'
,毋庸置疑 N' << N
。
存储结构优化:横向分治
一句话总结:通过定时任务分桶,提高并发度.
前面的小节,我们都基于单核的视角出发,对查询和存储模型进行优化。然而我们的主题是生产环境中 golang 分布式定时器的实现方案,因此必然是集群模式,且单个节点也可以基于 goroutine 实现高并发。
为了避免引起多协程介入导致对临界资源的竞态问题,xTimer
在实现上以分片作为最小的资源粒度,每一个分片对应的任务集只会由一个goroutine
负责作轮询,因此相应的要求是,需要将时间分片拆解为更细的粒度,即在横向上额外增加一个分桶的维度,从而保证每个时间范围内能有对应于分桶数量的goroutine
并发参加工作。
更细致的流程如下:
(1)插入定时任务时,首先根据执行时间,确定其从属的时间范围;
(2)其次,根据定时任务的唯一标识 id
,结合服务对最大桶数的设置参数,随机将定时任务划分到一个桶中;
(3)以时间范围和桶号组装形成一个新的 key
,形成一个二维分片,实现对定时任务有序表的隔离;
(4)后续流程与 横向分治小节相同.
至此,分布式定时器核心方案的轮廓已经呈现,本质上是一个主动轮询的模型,过程中辅以有序表结合分时分桶的方式进行轮询效率的优化。