Celery + redis 异步分布式任务队列安装测试

Celery 异步分布式任务队列

Celery 5.4.0 官方文档

环境:3台 centos7.9 普通用户

redisSchedulerworker
dp951
dp96111
dp971

文章目录

  • Celery 异步分布式任务队列
    • 1、Celery 介绍
    • 2、安装部署
      • 2.1 安装消息中间件(broker)
      • 2.2 安装Celery
    • 3、功能测试
      • 3.1 创建任务
          • 3.1.1 在dp96 机器上创建应用文件 tasks.py
          • 3.1.2 分发应用到dp95、dp97 机器相同位置
      • 3.2 启动worker服务
          • 3.2.1 在dp95、dp96、dp97三台机器分别启动celery worker服务
          • 3.2.2 查看redis服务,borker 已存在队列,backend 无任何结果
      • 3.3 执行分布式异步任务
      • 3.4 查看异步任务结果

1、Celery 介绍

Celery是一个简单、灵活、可靠的分布式系统,基于python开发,可以处理大量的消息,同时提供维护这样一个系统所需的工具。

它是一个专注于实时处理的任务队列,同时也支持任务调度

使用场景

  • 定时任务:定时爬虫、算法模型定时输出
  • 异步任务:I/O密集型任务,消息推送、邮件发送、ai客服
  • 分布式调度:airflow + celery 大数据ETL调度

优点:使用后再说

架构:AMQP(Advanced Message Queuing Protocol)高级消息队列协议

2、安装部署

2.1 安装消息中间件(broker)

Celery 需要一个中间件来进行接收和发送消息,通常以独立的服务形式出现,成为 消息中间人(Broker)。官方推荐RabbitMQ和Redis。

RabbitMQ:

  • 优点:支持AMQP(高级消息队列协议),可靠性高,支持消息持久化,有丰富的功能特性(如消息确认、重试、超时、死信队列等)。
  • 缺点:学习曲线较陡峭,配置复杂,性能可能较低。

Redis:

  • 优点:配置简单,性能高,可以用作消息队列用于简单的场景。
  • 缺点:不支持AMQP,不适合重载的消息队列处理,不支持消息的持久化和异步确认。

本次测试安装 redis http://download.redis.io/releases/

# redis 安装
# 解压
[dp96]$ tar -zxvf redis-7.2.4.tar.gz
[dp96]$ cd redis-7.2.4
# 编译
[dp96]$ make
# 安装
[dp96]$ make PREFIX= ~/redis install

# 修改配置文件
[dp96]$ vim ./redis.conf
'''
bind * -::*           # 绑定主机地址
protected-mode no     # 保护模式设置为 no,允许外网连接
port 6379             # 监听端口
timeout 0             # 当客户端闲置多长时间后关闭连接,如果指定为 0,表示关闭该功能
daemonize yes         # yes表示启用守护进程,默认是no即不以守护进程方式运行
loglevel notice       # 日志级别
logfile ./redis-server.log  # 指定 Redis 服务器的日志文件路径
dir ./                      # Redis 服务器的工作目录,即数据库文件的存放路径
pidfile /tmp/redis_6379.pid # 进程文件
'''

# 启动redis
[dp96]$ cd ~/redis/bin
[dp96]$ ./redis-server ~/opt/redis/redis-7.2.4/redis.conf

# 连接验证
[dp96]$ ./redis-cli
127.0.0.1:6379> CONFIG GET *

在这里插入图片描述

# redis 常用操作
# Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)

# 切换数据库0-15
127.0.0.1:6379> select 1 
127.0.0.1:6379> scan 0 /keys *       # 查看0号数据库所有的key

# 字符串类型
127.0.0.1:6379> set a 'test' 
127.0.0.1:6379> get a           # "test"
127.0.0.1:6379> del a

# 哈希类型,适合存储对象
127.0.0.1:6379> hmset a field1 'Hello' field2 'world' 
127.0.0.1:6379> hget a            # (error) ERR wrong number of arguments for 'hget' command        
127.0.0.1:6379> hget a field2     # "world"
127.0.0.1:6379> del a             # 重复key 会报错

# 列表
127.0.0.1:6379> lpush a redis
127.0.0.1:6379> lpush a rabbitmq
127.0.0.1:6379> lpush a 1
127.0.0.1:6379> lrange a 0 10
'''
1) "1"
2) "rabbitmq"
3) "redis"
'''

# 集合
127.0.0.1:6379> sadd a 1
127.0.0.1:6379> sadd a 1
127.0.0.1:6379> sadd a 2
127.0.0.1:6379> smembers a
'''
1) "1"
2) "2"
'''

# 有序集合 zadd key score member 
127.0.0.1:6379> zadd a 1 张三
127.0.0.1:6379> zadd a 1 李四
127.0.0.1:6379> zadd a 2 王五
127.0.0.1:6379> zrangebyscore a 0 1   # 选去0-1分的集合元素

'''
1) "\xe5\xbc\xa0\xe4\xb8\x89"
2) "\xe5\xe6\x9d\x8e\xe5\x9b\x9b"
'''

2.2 安装Celery

# 安装celery
(airflow)[dp96]$ pip install celery -i https://pypi.tuna.tsinghua.edu.cn/simple

(airflow)[dp95]$ pip install celery
(airflow)[dp97]$ pip install celery

# 安装 redis Celery 远程连接redis服务时使用
(airflow)[dp96]$ pip install redis
(airflow)[dp95]$ pip install redis
(airflow)[dp97]$ pip install redis

3、功能测试

3.1 创建任务

3.1.1 在dp96 机器上创建应用文件 tasks.py
# 创建 tasks.py 
from celery import Celery
import time

app = Celery('tasks', broker='redis://10.18.18.96:6379/0',backend='redis://10.18.18.96:6379/1') # broker 任务队列;backend 结果存储数据库

@app.task
def time_sleep(n):
    time.sleep(n) 
    return f'延时{n}s函数'
3.1.2 分发应用到dp95、dp97 机器相同位置
(airflow)[dp96 ~/celery]$ scp -r tasks.py dp95:~/celery
(airflow)[dp96 ~/celery]$ scp -r tasks.py dp97:~/celery

3.2 启动worker服务

3.2.1 在dp95、dp96、dp97三台机器分别启动celery worker服务
# 以下是Celery命令入口点的选项解释:
-A, --app APPLICATION: 指定Celery应用的模块名或路径。
-b, --broker TEXT: 指定消息代理的地址,例如RabbitMQ或Redis。
--result-backend TEXT: 指定任务结果的后端存储地址。
--loader TEXT: 指定Celery加载器的类型。
--config TEXT: 指定配置文件的路径。
--workdir PATH: 指定Celery工作目录。
-C, --no-color: 禁用彩色输出。
-q, --quiet: 静默模式,减少输出。
--version: 显示Celery版本号。
--skip-checks: 跳过配置检查。
# tasks 是我们任务所在的文件名,workdir tasks.py任务文件所在目录 worker 表示启动的是 worker 程序
(airflow)[dp96 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/  worker  --loglevel=info

(airflow)[dp95 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/  worker  --loglevel=info
(airflow)[dp97 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/  worker  --loglevel=info

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.2.2 查看redis服务,borker 已存在队列,backend 无任何结果

在这里插入图片描述
在这里插入图片描述

3.3 执行分布式异步任务

3.3.1 在dp96另一个shell界面,在task.py文件同目录下,创建test.py文件发布任务

from tasks import time_sleep
import time

def test():
    start = time.time()
    res = []
    for i in range(10):
        res_ = time_sleep.delay(5)  # 发布任务到broker (redis),正常同步任务执行时间10*5=50s
    stop = time.time()
    print(f'运行时间:{stop-start}')

在这里插入图片描述

3.4 查看异步任务结果

dp95:
在这里插入图片描述

dp96:

在这里插入图片描述

dp97:

在这里插入图片描述

如图所示,3台机器分别从队列中取出了3、4、3个睡眠5秒的任务,并在各自机器上并行(异步)运行,3台机器累计运行任务共花费5s

redis 后台结果数据库:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/597150.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

mac 本地使用docker 运行es,kibana

1.下载 m芯片一些版本不支持.踩过坑.翻看官网才知道只有部分镜像支持m芯片 https://hub.docker.com/添加链接描述 docker pull elasticsearch:7.17.21 docker pull kibana:7.17.21镜像已经下载下来了 2.创建文件映射-挂载 /Users/lin/dev/dockerMsg 其中lin是自己的用户名…

【数据结构/C语言】单链表的实现

目录 一、单链表的基本概念 单链表的简介 单链表的特点 二、预备知识 三、单链表的基本结构 四、单链表的基本操作 1.链表打印 2.申请节点 3.头插 4.尾插 5.头删 6.尾删 7.查找节点 8.指定位置之前插入 9.指定位置之后插入 10.删除给定节点 11.删除给定节点之…

90、动态规划-最长的有效括号

思路: 找出有效括号并且是最长的有效括号 dp[i]表示以i结尾的括号最长是多少 然后从1开始 因为从0位置不管是左括号还是右括号都是无法形成一个完成的括号。所以dp[0]0; 当i1时候,判断括号是否是)如果不是那么无法结尾&#x…

cmake进阶:变量的作用域说明一(从函数作用域方面)

一. 简介 如同 C 语言一样,在 cmake 中,变量也有作用域的概念,本文我们就来聊一聊关于 cmake 中变量作用域的问题。 接下来从三个方面进行介绍:函数作用域、目录作用域以及全局作用域。 二. 函数作用域 我把这个作用域叫做函数…

pycharm安装pandas包

import pandas时提示未安装pandas,点击下图红框选项,进行pandas安装 pycharm底部会有安装中的提示 pycharm底部提示红框的内容,说明安装成功 这个时候就可以看到import pandas不再报错了

LeetCode 611. 有效三角形的个数

原题链接:611. 有效三角形的个数 - 力扣(LeetCode) 题目说,给定一个包含非负整数的数组 num,返回其中可以组成三角形三条边的三元组个数。 示例: nums [4, 2, 3, 4]; 有效组合如下:…

NIO和NIO.2对比

Java NIO (New Input/Output) 是从Java 1.4版本开始引入的一个新的I/O API,用于替代原来的BIO(Blocking I/O)API。NIO提供了更加灵活和高效的网络通信方式,特别适合于高吞吐量的网络编程。NIO的主要特点是非阻塞模式,它…

数据结构(C):玩转顺序表

🍺0.前言 🎷1.线性表 🎸2.顺序表 📀动态顺序表的实现 💿初始化 💿检查容量是否满了,进行扩容 💿插入:头插和尾插 💿删除:头删和尾删 &…

Python实现2048游戏

提供学习或者毕业设计使用,功能基本都有,不能和市场上正式游戏相提比论,请理性对待! 在这篇博客中,我们将使用 Python 和 Pygame 库来编写经典的 2048 游戏。2048 是一个益智类游戏,通过在 4x4 网格上滑动方块并合并它们来创建一个新的数字,直到获得数字 2048 或者无法继…

bfs之走迷宫

文章目录 走迷宫广度优先遍历代码Java代码打印路径 走迷宫 给定一个 nm 的二维整数数组,用来表示一个迷宫,数组中只包含 0或 1,其中 0表示可以走的路,1表示不可通过的墙壁。 最初,有一个人位于左上角 (1,1) 处&#…

leetcode-岛屿数量-99

题目要求 思路 1.使用广度优先遍历,将数组中所有为1的元素遍历一遍,遍历过程中使用递归,讲该元素的上下左右四个方向的元素值也置为0 2.统计一共执行过多少次,次数就是岛屿数量 代码实现 class Solution { public:int solve(vec…

mac电脑如何安装python及环境搭建

(1)进入官网:Download Python | Python.org,根据自己电脑选择python (2)这里我选择的是mac,点击:macos,选择最近版本并点击进入 (3)选择mac版本: (4)点击就可以进入下载: (5)下载好之…

网站防御XSS攻击的有效策略与实施步骤

随着互联网应用的普及与发展,网站安全已成为众多企业关注的焦点,而XSS(Cross-Site Scripting)攻击作为最常见的Web安全漏洞之一,对用户数据安全构成严重威胁。本文将详细介绍网站如何有效防御XSS攻击,并提供…

Spring JdbcTemplate使用临时表+事务会话管理实现数据新增、查询及自动清除功能

需求描述: 由于某些情况下当查询过滤参数过大时,执行sql由于参数过大而报错,此时 需要使用临时表的方式,即 当参数超过某个阀值(如 1000,可调整)新增一张临时表,将原表 与 该临时表进…

2024精武杯部分复现

首先是计算机部分,这里是题目 做完才发现其实很多东西在火眼里面已经有更快的捷径了,但是自己当时没发现,还去傻傻的开虚拟机去找,说到底,还是对取证软件的理解不够,也不怎么会用。不废话了,直接…

怎么给word文件名批量替换部分文字?word设置批量替换文字教程

批量替换Word文件名中的几个字,对于经常处理大量文件的人来说,是一项非常实用的技能。以下是一个详细的步骤指南,帮助你快速完成这项任务。 首先,你需要准备一个可以批量重命名文件的工具。市面上有很多这样的工具可供选择&#x…

人工智能的发展将如何重塑网络安全

微信搜索关注公众号网络研究观,获取更多信息。 人们很容易认为人工智能 (AI) 真正出现是在 2019 年,当时 OpenAI 推出了 ChatGPT 的前身 GPT-2。 但现实却有些不同。人工智能的基础可以追溯到 1950 年,当时数学家艾伦图灵发表了题为“计算机…

密码学《图解密码技术》 记录学习 第十四章

目录 十四章 14.1 本章学习的内容 14.2 什么是 SSL/TLS 14.2.1 Alice 在 Bob 书店买书 14.2.2 客户端与服务器 14.2.3 АSSL/TLS 承载HTTP 14.2.4 SSL/TLS的工作 14.2.5 SSL/TLS也可以保护其他的协议 14.2.6 密码套件 14.2.7 SSL 与 TLS 的区别 14.3 使用 SSL/TLS 进…

产业观察:电机驱动成为人形机器人的动力核心

前不久,波士顿动力发布一则“再见,液压Atlas”视频,宣告其著名的液压驱动双足人形机器人Atlas正式退役。这则视频引起全球所有Atlas粉丝的高度关注。然而紧接着,波士顿动力便又推出了全部由电机驱动的新一代Atlas机器人&#xff0…

【Git】【MacOS】Github从创建与生成SSH公钥

创建账号 这一步不过多赘述,根据自己的邮箱新创建一个账号 配置SSH公钥 本人是macOS系统,首先从终端输入 cd ~/.ssh进入.ssh目录,然后通过 ls查看有没有一个叫做id_rsa.pub的文件 本人之前生成过SSH公钥,如果没有的话,通过 ssh-keygen -t…