redis + celery

首先,部署Redis数据库:

先下载包:

wget http://download.redis.io/releases/redis-5.0.7.tar.gz

解压redis包:

tar -xvf redis-5.0.7.tar.gz

编译:

make 

sudo make install   (这样没有指定安装目录)

 

# 注意,redis默认安装路径:/usr/local/bin,这样其实挺好的,并不需要折腾,其实准确

# 的来说,当执行完两个make之后,就会在redis包下的src目录下生成所有必要文件,同

# 时,将一些可执行文件扔一份到 /usr/local/bin 当然,如果不想将这些可执行二进制文件

# 扔到 /usr/local/bin,可以自行指定位置,安下面命令执行即可

 

sudo make PREFIX=/usr/local/redis install  (指定redis的安装目录)

 安装完成后长这样:

c813fb02320940b59969d98bc26316d4.png

将redis配置文件复制到bin目录下(先新建文件夹然后再将redis配置文件coyp进去)

我们要将配置文件复制一份,我们以后就是用这个配置文件来启动。

cd /usr/local/bin

sudo mkdir redis_config

# 回到安装redis目录,因为redis.conf文件在这里

c160d40048b34cf4abdebf52dbae7cc2.png

sudo cp redis.conf /usr/local/bin/redis_config

 

接下来修改配置文件:

vi /usr/local/bin/redis_config/redis.conf

这里有几个地方需要注意:

第一个地方,绑定地址,允许访问的地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问,生产环境不要设置为0.0.0.0

0c773ee90a4e4414a8c7595978345929.png

 

第二个地方,设置守护,守护进程,修改为yes后即可后台运行

e879e96c7b3d4521888df5b8a355bb7e.png

 

第三个地方,设置密码,设置后访问Redis必须输入密码。这里注意,redis没有用户名一说,只有服务地址和密码,密码还可以不给,不像postgres等数据库,需要严格的身份验证。

57012d8d626849f59bff844220fa727b.png

其他的,基本不太动...

# 监听的端口
port 6379
# 工作目录,默认是当前目录,也就是运行redis-server时的命令,日志、持久化等文件会保存在这个目录
dir .
# 数据库数量,设置为1,代表只使用1个库,默认有16个库,编号0~15
databases 1
# 设置redis能够使用的最大内存
maxmemory 512mb
# 日志文件,默认为空,不记录日志,可以指定日志文件名
logfile "redis.log"

接下来,启动redis:

cd /usr/local/bin

redis-server redis_config/redis.conf

1c4c0444649444f29996dd394d1afcf4.png

启动客户端:

cd /usr/local/bin

redis-cli -h 127.0.0.1 -p 6379

设置Redis开机自启动

首先,新建一个系统服务文件:

vi /etc/systemd/system/redis.service

文件内容为:

[Unit]
Description=redis-server
After=network.target

[Service]
Type=forking
ExecStart=/usr/local/bin/redis-server /usr/local/bin/redis_config/redis.conf
PrivateTmp=true

[Install]
WantedBy=multi-user.target

这里其他的没啥,注意这个参数 ExecStart,填对就行。

systemctl daemon-reload 

现在,我们可以用下面这组命令来操作redis了:

# 启动
systemctl start redis
# 停止
systemctl stop redis
# 重启
systemctl restart redis
# 查看状态
systemctl status redis

执行下面的命令,可以让redis开机自启:

systemctl enable redis

 

好了,接下来,开始扯 celery 

Celery 是一款简单灵活可靠的分布式任务执行框架,支持大量任务的并发执行。

Celery 采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行。

8e6f2b706cac40efba4eb17a07be3efa.png

  • 提交任务给 Broker 队列
  • 如果是异步任务,Worker 会立即从队列中取出任务并执行,执行结果保存在 Backend 中
  • 如果是定时任务,任务由 Celery Beat 进程周期性地将任务发往 Broker 队列,Worker 实时监视消息队列获取队列中的任务执行

应用场景

  • 长时间任务的异步执行, 如上传大文件
  • 实时任务执行,支持集群部署,如支持高并发的机器学习推理
  • 定时任务执行,如定时发送邮件

节点总结:

到这里,我先记录一些理解。

首先,celery它是一个典型的生产者消费者模型。也就是说,这个模型里,可以没有生产者,但是必须得有消费者。

其次,这里先了解2个命令:

celery -A tasks worker --loglevel=info --pool=solo

celery -A proj.period_task beat -l info

这里面出现了一个 worker, 一个 beat。worker 就是消费者的意思,beat 是指周期任务。

2个命令很像,但是意思完全不一样。celery beat -A ...  是说,周期的向队列里放入任务。而celery worker -A ... 是说,一旦队列里有任务,就立刻去执行任务。所以,beat 就属于生产者,而 worker 属于消费者。如果没有 worker 那么任务只会堆积,没人处理。因此,使用celery 一定得启动 worker。

第三,selery跟我们django服务里面自定义的app一样,它本身也是一个app。

安装
本文使用 Redis 作为 Broker 即消息队列

pip install celery
pip install redis

需要持久化任务的话,Broker 使用 RabbitMQ 并设置持久化队列。
官方建议生产环境首选 RabbitMQ ,突然停止或断电 Redis 可能会数据丢失。

 

Celery 的开发主要有四个步骤:

  1. 实例化 Celery
  2. 定义任务
  3. 启动任务 Worker
  4. 调用任务

先看一个简单的celery实现:

from celery import Celery

# broker 是用于存储任务的队列,backend 是用于存储任务执行结果的队列
test_celery = Celery('test', broker='redis://127.0.0.1:6379/0', 
                     backend='redis://127.0.0.1:6379/1')
# 也可以以这种方式,导入任务模块,当然,这里作为最简单的例子,是不需要的
app.conf.imports = ['tasks']

@test_celery.task
def my_add(a, b):
    return a + b

这样,一个最简单的最基本的 celery 就已经完成了。现在已经构建了一个 celery 的异步任务。但是光有任务是没有用的,首先得有消费者,就是我之前写的小结里记录的,celery 必须得有worker,然后,在搞一个生产者,将任务放到队列里,然后,自然会有worker去执行任务,代码也就会被执行了。

启动任务 Worker

celery -A my_celery worker -l info -c 4

这里千万注意,worker 的位置:

e388c599cf724c2b8e6f20806d1e9305.png

新版本,worker不让写前面了。

连接成功后,长这样子...

316260c61e4348c98e532c3c69f2af0c.png

到这里,celery 的消费者就搞定了,然后是生产者...其实生产者无非就2种,一次性的,循环性的。但是其本质又都是一样的,就是给 task 到任务队列完事。一次性的就是只触发一次将 task 压入队列,周期性的就是间隔的将 task 压入队列。

现在构建生产者,最简单的,就是这样,直接弄一个,然后,执行这个文件即可。

from my_celery import my_add

my_add.delay(1, 2)

看下结果:

81e1b6bea5174a0ea122a6df0df6ab82.png

这是刚起的 celery 的 worker 的执行结果...

6bff08848da64ef6b8e3fd9b70963651.png

换一种写法:

from my_celery import my_add

my_add.delay(1, 2)

# 使用签名模式,得到的是一个新的 task, 这种 task 可以跨越进程被调用
new_task = my_add.s(1, 2)
new_task.delay()

结果就成了:

40f31ad18b5d490ebe4a4719ac3c23b0.png

这是两种写法,先不做,讨论,一会在说。

就现在为止,基本上,我们就已经可以使用selery做事情了。尤其是在django服务中,完全可以搞一个 url 配合视图函数做任务触发,就可以利用celery做异步任务。

上面是个简单的实现,通常情况,都是写出配置文件来用的,会显得规范一些。目录结构通常是这样的。构建一个celery app的文件夹,让它和 manage.py 在同一级目录。

bbb4cedffc494485b4cc37f5e9e7ad4e.png

所有的 task 都可以放到 tasks.py 中,celery 的实例化对象可以放到 __init__.py 中,相关的配置可以放到 config.py 中。

__init__.py

from celery import Celery
from celery.schedules import crontab


test_celery = Celery('test')
# 加载配置文件
test_celery.config_from_object('my_celery.config')


# 添加周期任务,在没有调度的时候,周期任务是不会执行的,只有通过周期调度命令启动的时候
# 它们才会被执行
test_celery.conf.beat_schedule = {
    'test001': {
        'task': 'my_celery.tasks.my_add',
        # 每周一07:30执行my_add任务
        'schedule': crontab(minute='30', hour='7', day_of_week='1'),
        'args': (1, 3)
    },
    'test002': {
        'task': 'my_celery.tasks.my_add',
        # 每分钟执行一次 my_add 任务
        'schedule': crontab(minute='*/1'),
        'args': (1, 3)
    },
}

config.py 

注意:这里一定得记着导入模块,因为如果这里没写导入任务模块,那么就会导致任务模块里的任务统统没被注册,那就无法使用。

BROKER_URL = 'redis://127.0.0.1:6379/0'  # Broker,中间件,进行消息传输,使用Redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'  # Backend,结果后端,使用Redis
CELERY_RESULT_SERIALIZER = 'json'  # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任务过期时间
CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区配置
CELERY_IMPORTS = (  # 导入的任务模块
    'my_celery.tasks',
)

tasks.py

from my_celery import test_celery


@test_celery.task
def my_add(a, b):
    return a + b

test.py

from my_celery.tasks import my_add

my_add.delay(1, 2)

# 定时任务,延时3秒执行
my_add.apply_async((3, 4), countdown=3)

new_task = my_add.s(5, 6)
new_task.delay()

普通的调用和之前的简单模式一样,这次看下周期调用:

celery -A my_celery beat -l info

这里有第二种写法,其意义在于,周期模式是需要记录时间的,因此,可以指定一个地方让其记录时间。 

celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

结果:

每隔1分钟将task压入队列

1fd5234c5e0c4fa09c1bfa7c9d4a5cf9.png

每隔1分钟,worker就能获取到task,并执行它 

b059de3b3dd84afa94d229ac44538a1b.png

到这里,基本的celery,就搞定了,可以使用了....

另外的一些花哨的用法,记录下:

调用任务

常规任务

  • delay():直接调用任务,是 apply_async() 的封装
  • apply_async():通过发送异步消息调用任务,可指定倒计时 countdown ,执行时间 eta ,过期时间 expires 等参数
  • signature():创建签名,可传递任务签名给别的进程使用,或作为其他函数的参数
  • s():创建签名的快捷方式
from my_celery.tasks import my_add

result = my_add.delay(1, 2)  # 直接调用
print(result.get())

result = my_add.apply_async((1, 2), countdown=2)  # 2s后执行
print(result.get())

t1 = my_add.signature((1, 2), countdown=2)  # 签名Signatures,可传递任务签名给别的进程使用,或作为其他函数的参数
result = t1.delay()
print(result.get())

t1 = my_add.s(1, 2).set(countdown=2)  # 创建签名的快捷方式
result = t1.delay()
print(result.get())

组合任务

  • group():组合,接受一个可并行调用的任务列表
  • chain():串联,将签名连接在一起,一个接一个调用(前一个签名的结果作为下一个签名的第一个参数)
  • chord():和弦,类似 group() 但包含回调,在所有任务执行完后再调用任务
  • map():将参数列表应用于该任务
  • starmap():将复合参数列表应用于该任务
  • chunks():将一个很长的参数列表分块成若干部分

任务状态跟踪

这种情况,需要对 __init__.py 做出一定的修改,添加一些内容即可

from celery import Celery, Task
from celery.schedules import crontab
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)  # 日志


test_celery = Celery('test')
test_celery.config_from_object('my_celery.config')


test_celery.conf.beat_schedule = {
    'test001': {
        'task': 'my_celery.tasks.my_add',
        'schedule': crontab(minute='30', hour='7', day_of_week='1'),
        'args': (1, 3)
    },
    'test002': {
        'task': 'my_celery.tasks.my_add',
        'schedule': crontab(minute='*/1'),
        'args': (1, 3)
    },
}


class TaskMonitor(Task):
    def on_success(self, retval, task_id, args, kwargs):
        """success时回调"""
        logger.info('task id:{} , arg:{} , successful !'.format(task_id, args))

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """retry时回调"""
        logger.info('task id:{} , arg:{} , retry !  einfo: {}'.format(task_id, args, exc))

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """failure时回调"""
        logger.info('task id: {0!r} failed: {1!r}'.format(task_id, exc))





然后,再修改下 tasks.py

from my_celery import test_celery, TaskMonitor


@test_celery.task
def my_add(a, b):
    return a + b


@test_celery.task(base=TaskMonitor)
def my_add_1(a, b):
    return a + b

命令行参数:

关于 celery 命令行的启动等参数,都在这了

参数含义全称
-A指定模块 
-l日志level–loglevel
-c进程数–concurrency
-Q指定队列–queue
-B周期性任务–beat
-P池的实现–pool

 

搭建redis:

Redis基础——1、Linux下安装Redis(超详细)_linux安装redis_原首的博客-CSDN博客

使用redis:

https://www.cnblogs.com/fuminer/p/17254164.html

celery的使用:

Python定时任务库Celery——分布式任务队列_python celery_XerCis的博客-CSDN博客

 

其他参考,总之,看到的帖子,都有错误之处,往往不能让我通达,故写此贴:

https://www.cnblogs.com/clark1990/p/17174251.html

Periodic Tasks — Celery 5.3.5 documentation

https://docs.celeryq.dev/en/stable/reference/cli.html

Python定时任务库Celery——分布式任务队列_python 使用分布式消息系统celery实现定时任务 自动执行python 脚本_XerCis的博客-CSDN博客

Python-Celery定时任务、延时任务、周期任务、crontab表达式及清除任务的基本使用与踩坑 - 知乎

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/160417.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

H5ke11--3介绍本地,会话存储

代码顺序: 1.设置input,捕获input如果有多个用属性选择符例如 input[typefile]点击事件.向我们的本地存储设置键值对 2.在点击事件外面设置本地存储表示初始化的值.点击上面的事件才能修改我们想修改的值 会话(session)浏览a数据可以写到本地硬盘,关闭页面数据就没了 本地(…

【docker】Docker网络与iptables

Docker能为我们提供很强大和灵活的网络能力,很大程度上要归功于与iptables的结合。在使用时,你可能没有太关注到 iptables在其中产生的作用,这是因为Docker已经帮我们自动的完成了相关的配置。 iptables在Docker中的应用主要是用于网络流量控…

数据结构与算法面试题——C++

自己在秋招过程中遇到的数据结构与算法方面的面试题 数据结构 vector vector是⼀种序列式容器,与array唯⼀差别就是对于空间运⽤的灵活性 array占⽤的是静态空间,⼀旦配置了就不可以改变⼤⼩,如果遇到空间不⾜的情况还要⾃⾏创建更⼤的空间…

AlphaControls控件TsDBCombobox出错:访问违规

日常使用AlphaControls控件TsDBCombobox,作为数据变化数据的控件。通常正常使用,一日 发现,出现以下错误: 控件访问违规的源代码,出错代码: function TacMainWnd.CallPrevWndProc(const Handle: hwnd; co…

UnoCss(原子化css引擎) 让你的开发更轻松愉快

什么是原子化CSS,UnoCSS又是什么,对此有疑问的推荐看下antfu的这篇文章——重新构想原子化 CSS (antfu.me) 相信看完这篇文章的你也会跟我一样热衷于UnoCSS. 介绍 今天介绍一个CSS开发利器 UnoCSS , 是一个具有高性能且极具灵活性的即时原子化 CSS 引擎…

若依启动步骤

1.创建数据库 2.启动redis 3.改后端的数据库连接配置 4.配置redis redis的地址:cmd中ipconfig命令查看 6.启动后端:如下 7.启动前端ruoyi-ui中 先运行npm install,再npm run dev。项目就启动成功了。 用户名:admin 密码&#x…

二十九、W5100S/W5500+RP2040树莓派Pico<Web socket Server>

文章目录 1 前言2 简介2 .1 什么是WebSocket协议?2.2 WebSocket协议工作原理2.3 WebSocket协议优点2.4 WebSocket应用场景 3 WIZnet以太网芯片4 WebSocket示例概述以及使用4.1 流程图4.2 准备工作核心4.3 连接方式4.4 主要代码概述4.5 结果演示 5 注意事项6 相关链接…

Leetcode 剑指 Offer II 053. 二叉搜索树中的中序后继

题目难度: 中等 原题链接 今天继续更新 Leetcode 的剑指 Offer(专项突击版)系列, 大家在公众号 算法精选 里回复 剑指offer2 就能看到该系列当前连载的所有文章了, 记得关注哦~ 题目描述 给定一棵二叉搜索树和其中的一个节点 p ,找到该节点在…

【精选】HTML5最全知识点集合

HTML5简介与基础骨架 HTML5介绍 HTML5是用来描述网页的一种语言&#xff0c;被称为超文本标记语言。用HTML5编写的文件&#xff0c;后缀以.html结尾 HTML是一种标记语言&#xff0c;标记语言是一套标记标签。标签是由尖括号包围的关键字&#xff0c;例如&#xff1a;<html…

基于单片机智能液位水位监测控制系统设计

**单片机设计介绍&#xff0c; 基于单片机智能液位水位监测控制系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机的智能液位水位监测控制系统可以用来检测和控制液位的高低&#xff0c;并可以用于水泵的控制和自…

戴姆勒——从豪华私家车到无人驾驶飞机

戴姆勒(DaimlerAG)是梅赛德斯-奔驰和精灵(Smart)汽车的德国母公司。自1926年其前身公司合并为戴姆勒-奔驰公司以来&#xff0c;戴姆勒在生产豪华和消费型汽车、卡车和公共汽车方面有着悠久的历史。 如今&#xff0c;除了以其精密设计的汽车闻名外&#xff0c;该公司还在设计、…

⑩④【MySQL】什么是视图?怎么用?视图的检查选项? 视图的作用?[VIEW]

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ 视图VIEW ⑩④详解MySQL视图1. 视图的基本使用…

NPDP 02组合管理

NPDP 产品经理认证知识体系指南解读&#xff0c;02组合管理 第二章 组合管理 公司战略或经营战略以及创新战略&#xff0c;为竞争性创新投资之间的权衡决策提供了整体方向和框架。在发展和持续性维护一个组织的产品组合时&#xff0c;总要面对一系列彼此竞争资源和投资的项目。…

11.5MyBatis(进阶)

一.${}和#{} 1.$是直接替换,#是预处理(使用占位符,替换成?).前者不安全(SQL注入), 后者安全. 2.$的使用场景: 如果传递的值是sql的关键字,只能使用$,不能使用#(asc,desc). 二.SQL注入 注意: 如果使用${}进行传参,一定要是可以穷举的,并且要进行安全性验证(例如排序,只能传a…

【漏洞复现】​金和OA存在任意文件读取漏洞

漏洞描述 金和OA协同办公管理系统C6软件(简称金和OA),本着简单、适用、高效的原则,贴合企事业单位的实际需求,实行通用化、标准化、智能化、人性化的产品设计,充分体现企事业单位规范管理、提高办公效率的核心思想,为用户提供一整套标准的办公自动化解决方案,以帮助企…

Flume学习笔记(3)—— Flume 自定义组件

前置知识&#xff1a; Flume学习笔记&#xff08;1&#xff09;—— Flume入门-CSDN博客 Flume学习笔记&#xff08;2&#xff09;—— Flume进阶-CSDN博客 Flume 自定义组件 自定义 Interceptor 需求分析&#xff1a;使用 Flume 采集服务器本地日志&#xff0c;需要按照日志…

浅析AcrelEMS-CIA机场智慧能源管平台解决方案-安科瑞 蒋静

1 概述 机场智慧能源管平台解决方案对机场范围内变电站内的高低压配电设备 、 发电机、变压器 、UPS、EPS 、广场照明 、 室内照明 、通风及排水等机电设备进行实时分布式监控和集中管理 , 实现无人值守 , 确保高速公路安全畅通 , 提高 自动化管理水平 , 降低机电设备的运行维…

计算机视觉:驾驶员疲劳检测

目录 前言 关键点讲解 代码详解 结果展示 改进方向&#xff08;打哈欠检测疲劳方法&#xff09; 改进方向&#xff08;点头检测疲劳&#xff09; GUI界面设计展示 前言 上次博客我们讲到了如何定位人脸&#xff0c;并且在人脸上进行关键点定位。其中包括5点定位和68点定…

骨传导能保护听力吗?骨传导耳机是智商税吗?

先说答案&#xff0c;骨传导耳机是可以保护听力的&#xff01;并且骨传导耳机也不是智商税&#xff01;甚至在某些场景下&#xff0c;骨传导耳机比其他耳机更适合。 为什么说骨传导耳机会保护听力呢&#xff1f;因为骨传导耳机跟入耳式耳机的传递声音方式是不一样的&#xff0c…

PACS医学影像信息化数字平台源码

PACS系统对医院影像科意义重大&#xff0c;将业务量巨大的影像检验流程依托于信息化技术&#xff0c;对于进行信息化建设的医院而言&#xff0c;是十分必要的。 PACS系统源码&#xff0c;集成三维影像后处理功能&#xff0c;包括三维多平面重建、三维容积重建、三维表面重建、三…