golang实现延迟队列(delay queue)

golang实现延迟队列

1 延迟队列:邮件提醒、订单自动取消

延迟队列:处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有达到指定的时间点时才能从队列中取出并执行。
应用场景:

  • 邮件提醒
  • 订单自动取消(超过多少时间未支付,就取消订单)
  • 对超时任务的处理等

由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。

2 实现

2.1 simple简单版:go自带的time包实现

思路:

  1. 定义Task结构体,包含
  • ExecuteTime time.Time
  • Job func()
  1. 定义DelayQueue
  • TaskQueue []Task
  • func AddTask
  • func RemoveTask
  • ExecuteTask

这种方案存在的问题:

Go程序重启时,存储在slice中的延迟处理任务将全部丢失

完整代码:

package main

import (
	"fmt"
	"time"
)

/*
基于go实现延迟队列
*/
type Task struct {
	ExecuteTime time.Time
	Job         func()
}

type DelayQueue struct {
	Tasks []*Task
}

func (d *DelayQueue) AddTask(t *Task) {
	d.Tasks = append(d.Tasks, t)
}

func (d *DelayQueue) RemoveTask() {
	//FIFO: remove the first task to enqueue
	d.Tasks = d.Tasks[1:]
}

func (d *DelayQueue) ExecuteTask() {
	for len(d.Tasks) > 0 {
		//dequeue a task
		currentTask := d.Tasks[0]
		if time.Now().Before(currentTask.ExecuteTime) {
			//if the task execution time is not up, wait
			time.Sleep(currentTask.ExecuteTime.Sub(time.Now()))
		}
		//execute the task
		currentTask.Job()
		//remove task who has been executed
		d.RemoveTask()
	}

}

func main() {
	fmt.Println("start delayQueue")
	delayQueue := &DelayQueue{}
	firstTask := &Task{
		ExecuteTime: time.Now().Add(time.Second * 1),
		Job: func() {
			fmt.Println("executed task 1 after delay")
		},
	}
	delayQueue.AddTask(firstTask)
	secondTask := &Task{
		ExecuteTime: time.Now().Add(time.Second * 7),
		Job: func() {
			fmt.Println("executed task 2 after delay")
		},
	}
	delayQueue.AddTask(secondTask)
	delayQueue.ExecuteTask()
	fmt.Println("all tasks have been done!!!")
}

效果:
在这里插入图片描述

2.2 complex持久版:go+redis

为了防止Go重启后存储到delayQueue的数据丢失,我们可以将任务持久化到redis中。

思路:

  1. 初始化redis连接
  2. 延迟队列采用redis的zset(有序集合)实现

前置准备:

# 安装docker
yum install -y yum-utils
yum-config-manager \
    --add-repo \
    https://download.docker.com/linux/centos/docker-ce.repo
yum install docker
systemctl start docker

# docker搭建redis
mkdir -p /Users/ziyi2/docker-home/redis
docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis

完整代码:

package main

import (
	"fmt"
	"github.com/go-redis/redis"
	log "github.com/ziyifast/log"
	"time"
)

/*
基于redis zset实现延迟队列
*/
var redisdb *redis.Client
var DelayQueueKey = "delay-queue"

func initClient() (err error) {
	redisdb = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // not set password
		DB:       0,  //use default db
	})
	_, err = redisdb.Ping().Result()
	if err != nil {
		log.Errorf("%v", err)
		return err
	}
	return nil
}

func main() {
	err := initClient()
	if err != nil {
		log.Errorf("init redis client err: %v", err)
		return
	}
	addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix())
	addTaskToQueue("task2", time.Now().Add(time.Second*8).Unix())
	//执行队列中的任务
	getAndExecuteTask()
}

// executeTime为unix时间戳,作为zset中的score。允许redis按照task应该执行时间来进行排序
func addTaskToQueue(task string, executeTime int64) {
	err := redisdb.ZAdd(DelayQueueKey, redis.Z{
		Score:  float64(executeTime),
		Member: task,
	}).Err()
	if err != nil {
		panic(err)
	}
}

// 从redis中取一个task并执行
func getAndExecuteTask() {
	for {
		tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{
			Min:    "-inf",
			Max:    fmt.Sprintf("%d", time.Now().Unix()),
			Offset: 0,
			Count:  1,
		}).Result()
		if err != nil {
			time.Sleep(time.Second * 1)
			continue
		}
		//处理任务
		for _, task := range tasks {
			fmt.Println("Execute task: ", task)
			//执行完任务之后用 ZREM 移除该任务
			redisdb.ZRem(DelayQueueKey, task)
		}
		time.Sleep(time.Second * 1)
	}
}

效果:

redis一直从延迟队列中取数据,如果处理完一批则睡眠1s

  • 具体根据大家的业务调整,此处主要介绍思路

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/401486.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux的Ubuntu的APT使用

Linux的Ubuntu的APT使用 apt 介绍 apt 是 Advanced Packaging Tool 的简称,是一款安装包管理工具。在 Ubuntu 下,我们可以使用 apt 命令进行软件包的安装、删除、清理等,类似于 Windows 中的软件管理工具。 Ubuntu 软件操作的相关命令 su…

042-WEB攻防-PHP应用MYSQL架构SQL注入跨库查询文件读写权限操作

042-WEB攻防-PHP应用&MYSQL架构&SQL注入&跨库查询&文件读写&权限操作 #知识点: 1、PHP-MYSQL-SQL注入-常规查询 2、PHP-MYSQL-SQL注入-跨库查询 3、PHP-MYSQL-SQL注入-文件读写 演示案例: ➢PHP-MYSQL-Web组成架构 ➢PHP-MYSQL-SQL…

python自动化接口测试

前几天,同组姐妹说想要对接口那些异常值进行测试,能否有自动化测试的方法。仔细想了一下,工具还挺多,大概分析了一下: 1、soapui:可以对接口参数进行异常值参数化,可以加断言,一般我们会加http…

undo日志详解

一、undo日志介绍 上一节详细的说了redo日志,redo日志的功能就是把增删改操作都记录着,如果断电导致内存中的脏页丢失,可以根据磁盘中的redo日志文件进行恢复。redo日志被设计出来是为了保证数据库的持久性,undo日志设计出来是为…

从故宫修建看「软件物料清单」的重要性 @安全历史01

故宫,这座中国传统文化的重要代表和象征性建筑已屹立近600年,是世界上现存规模最大、保存最为完整的木质结构古建筑之一。 故宫之所以能至今保存完好,除持续保护和修缮外,其使用的木材和砖石等材料也经过了精挑细选,保…

一周学会Django5 Python Web开发-Django5路由重定向

锋哥原创的Python Web开发 Django5视频教程: 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计25条视频,包括:2024版 Django5 Python we…

图片速览 PrintListener: 通过手指摩擦声发现指纹认证漏洞

原有一些方法主要是用字典猜测的方式来解锁的,文章的方法利用了用户滑手机屏幕产生的声音来辅助指纹的生成,且本文所提方案的准确性要更高(文章采样了各种环境、各种情况的数据,详见原文)。 PrintListener的攻击场景广泛且隐蔽。它只需要记录…

小米标准模组+MCU 快速上手开发(一)——之固件下载

小米标准模组+MCU 开发笔记之固件下载 背景技术名词简介● 小米IoT开发者平台● 小米IoT 模组● ESP系列简介问题描述 + 解决方式问题1:固件下载是否有示例,如何下载到硬件板卡中?问题2:固件下载的官方程序是什么?在哪里?该如何使用?问题3:固件下载时,Flash和Ram 有什…

安全这么卷了吗?北京,渗透,4k,不包吃住,非实习

起初某HR找人发了条招聘信息 看到被卷到4k一个月被震惊到了 随后发布了朋友圈,引起来众多讨论 对此网友发表众多评价 越来越卷的工作现象确实是一个普遍存在的问题 另外,也可以考虑和雇主沟通, 寻求更合理的工作安排, 或者…

C#,大规模图(Large Graph)的均匀成本搜索之迪杰斯特拉(Dijkstra)算法与源代码

1 均匀成本搜索 均匀成本搜索是迪杰斯特拉算法的变体。这里,我们不是将所有顶点插入到一个优先级队列中,而是只插入源,然后在需要时一个接一个地插入。在每一步中,我们检查项目是否已经在优先级队列中(使用访问数组)。如果是&…

flink反压

flink反压(backpressure),简单来说就是当接收方的接收速率低于发送方的发送速率,这时如果不做处理就会导致接收方的数据积压越来越多直到内存溢出,所以此时需要一个机制来根据接收方的状态反过来限制发送方的发送速率&…

精英ECS Z97-MACHINE V1.0 BIOS MX25L6406E

官网上的两个BIOS我都无法亮机,这是我保存出来的BIOS,不知道是否能使用五代的处理器 官网:Z97-MACHINE|Motherboard|产品|ECS 精英电脑 国外老哥的看法:ECS Z97-MACHINE Closer Look: The BIO…

手拉手助成长社会融合实践活动之新春送温暖

2月21日上午来自合肥市第四十五中学橡树湾校区七(15)中队、共青团合肥市第六中学2023级36班委员会的40多名同学,带来了龙年的礼物看望陪伴合肥市庐阳区为民社会工作服务中心幸福小院的兄弟姐妹。 大家详细地了解了幸福小院孩子们学习、康复和社会实践及能力提升情况…

CSS列表学习2

之前学习了列表&#xff1b;继续熟悉&#xff1b; <!DOCTYPE html> <html> <head> <meta http-equiv"Content-Type" content"text/html; charsetutf-8"/><title></title><meta charset"utf-8" /><…

通用性技术底座AI大模型与各行业专用性AI小模型搭建(第二篇)

五、小模型架构选择问题 在选择行业专用AI小模型的架构时&#xff0c;需要考虑以下几个关键因素&#xff1a; 1. **任务类型**&#xff1a; - 不同的任务类型&#xff08;如分类、回归、序列生成、图像识别等&#xff09;对应着不同的模型结构。例如&#xff0c;文本分类问…

Ansible cron模块 适用于管理计划任务 设置多个计划任务

目录 选项添加一个计划任务检查是否添加成功删除计划任务检查是否执行成功 选项 其使用的语法跟我们的crontab文件中的语法一致&#xff0c;同时&#xff0c;可以指定以下选项&#xff1a; day #日应该运行的工作( 1-31, , /2, ) hour # 小时 ( 0-23, , /2, ) minute #分钟( 0…

Leetcode 26-30题

删除有序数组中的重复项 给定一个有序数组&#xff0c;要求原地删除重复出现的元素&#xff0c;返回删除后的数组的长度。 这里的原地删除其实可以这样表示&#xff0c;用双指针从前往后扫一遍&#xff0c;遇到新的没出现过的元素就放到前面去&#xff0c;就可以实现删除后的数…

【杭州游戏业:创业热土,政策先行】

在前面的文章中&#xff0c;我们探讨了上海、北京、广州、深圳等城市的游戏产业现状。现在&#xff0c;我们切换视角&#xff0c;来看看另一个游戏创业热土——杭州的发展情况 最近第19届亚运会在杭州举办&#xff0c;本次亚运会上&#xff0c;电子竞技首次获准列为正式比赛项…

了解 Kubernetes

1 Kubernetes概述 1.1 k8s是什么 K8S 的全称为 Kubernetes (K12345678S)&#xff0c;PS&#xff1a;“嘛&#xff0c;写全称也太累了吧&#xff0c;不如整个缩写” 作用&#xff1a; 用于自动部署、扩展和管理“容器化&#xff08;containerized&#xff09;应用程序”的开…

Manacher算法和扩展kmp

Manacher算法 a情况 b情况 具体例子 c情况 总结 代码 #include<iostream> #include<algorithm> #include<string> #include<cmath>using namespace std; const int N 1.1e7 1; char ss[N << 1]; int p[N << 1]; int n; void manacherss…