Celery 异步分布式任务队列
Celery 5.4.0 官方文档
环境:3台 centos7.9 普通用户
redis | Scheduler | worker | |
---|---|---|---|
dp95 | 1 | ||
dp96 | 1 | 1 | 1 |
dp97 | 1 |
文章目录
- Celery 异步分布式任务队列
- 1、Celery 介绍
- 2、安装部署
- 2.1 安装消息中间件(broker)
- 2.2 安装Celery
- 3、功能测试
- 3.1 创建任务
- 3.1.1 在dp96 机器上创建应用文件 tasks.py
- 3.1.2 分发应用到dp95、dp97 机器相同位置
- 3.2 启动worker服务
- 3.2.1 在dp95、dp96、dp97三台机器分别启动celery worker服务
- 3.2.2 查看redis服务,borker 已存在队列,backend 无任何结果
- 3.3 执行分布式异步任务
- 3.4 查看异步任务结果
1、Celery 介绍
Celery是一个简单、灵活、可靠的分布式系统,基于python开发,可以处理大量的消息,同时提供维护这样一个系统所需的工具。
它是一个专注于实时处理的任务队列,同时也支持任务调度。
使用场景:
- 定时任务:定时爬虫、算法模型定时输出
- 异步任务:I/O密集型任务,消息推送、邮件发送、ai客服
- 分布式调度:airflow + celery 大数据ETL调度
优点:使用后再说
架构:AMQP(Advanced Message Queuing Protocol)高级消息队列协议
2、安装部署
2.1 安装消息中间件(broker)
Celery 需要一个中间件来进行接收和发送消息,通常以独立的服务形式出现,成为 消息中间人(Broker)。官方推荐RabbitMQ和Redis。
RabbitMQ:
- 优点:支持AMQP(高级消息队列协议),可靠性高,支持消息持久化,有丰富的功能特性(如消息确认、重试、超时、死信队列等)。
- 缺点:学习曲线较陡峭,配置复杂,性能可能较低。
Redis:
- 优点:配置简单,性能高,可以用作消息队列用于简单的场景。
- 缺点:不支持AMQP,不适合重载的消息队列处理,不支持消息的持久化和异步确认。
本次测试安装 redis http://download.redis.io/releases/
# redis 安装
# 解压
[dp96]$ tar -zxvf redis-7.2.4.tar.gz
[dp96]$ cd redis-7.2.4
# 编译
[dp96]$ make
# 安装
[dp96]$ make PREFIX= ~/redis install
# 修改配置文件
[dp96]$ vim ./redis.conf
'''
bind * -::* # 绑定主机地址
protected-mode no # 保护模式设置为 no,允许外网连接
port 6379 # 监听端口
timeout 0 # 当客户端闲置多长时间后关闭连接,如果指定为 0,表示关闭该功能
daemonize yes # yes表示启用守护进程,默认是no即不以守护进程方式运行
loglevel notice # 日志级别
logfile ./redis-server.log # 指定 Redis 服务器的日志文件路径
dir ./ # Redis 服务器的工作目录,即数据库文件的存放路径
pidfile /tmp/redis_6379.pid # 进程文件
'''
# 启动redis
[dp96]$ cd ~/redis/bin
[dp96]$ ./redis-server ~/opt/redis/redis-7.2.4/redis.conf
# 连接验证
[dp96]$ ./redis-cli
127.0.0.1:6379> CONFIG GET *
# redis 常用操作
# Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)
# 切换数据库0-15
127.0.0.1:6379> select 1
127.0.0.1:6379> scan 0 /keys * # 查看0号数据库所有的key
# 字符串类型
127.0.0.1:6379> set a 'test'
127.0.0.1:6379> get a # "test"
127.0.0.1:6379> del a
# 哈希类型,适合存储对象
127.0.0.1:6379> hmset a field1 'Hello' field2 'world'
127.0.0.1:6379> hget a # (error) ERR wrong number of arguments for 'hget' command
127.0.0.1:6379> hget a field2 # "world"
127.0.0.1:6379> del a # 重复key 会报错
# 列表
127.0.0.1:6379> lpush a redis
127.0.0.1:6379> lpush a rabbitmq
127.0.0.1:6379> lpush a 1
127.0.0.1:6379> lrange a 0 10
'''
1) "1"
2) "rabbitmq"
3) "redis"
'''
# 集合
127.0.0.1:6379> sadd a 1
127.0.0.1:6379> sadd a 1
127.0.0.1:6379> sadd a 2
127.0.0.1:6379> smembers a
'''
1) "1"
2) "2"
'''
# 有序集合 zadd key score member
127.0.0.1:6379> zadd a 1 张三
127.0.0.1:6379> zadd a 1 李四
127.0.0.1:6379> zadd a 2 王五
127.0.0.1:6379> zrangebyscore a 0 1 # 选去0-1分的集合元素
'''
1) "\xe5\xbc\xa0\xe4\xb8\x89"
2) "\xe5\xe6\x9d\x8e\xe5\x9b\x9b"
'''
2.2 安装Celery
# 安装celery
(airflow)[dp96]$ pip install celery -i https://pypi.tuna.tsinghua.edu.cn/simple
(airflow)[dp95]$ pip install celery
(airflow)[dp97]$ pip install celery
# 安装 redis Celery 远程连接redis服务时使用
(airflow)[dp96]$ pip install redis
(airflow)[dp95]$ pip install redis
(airflow)[dp97]$ pip install redis
3、功能测试
3.1 创建任务
3.1.1 在dp96 机器上创建应用文件 tasks.py
# 创建 tasks.py
from celery import Celery
import time
app = Celery('tasks', broker='redis://10.18.18.96:6379/0',backend='redis://10.18.18.96:6379/1') # broker 任务队列;backend 结果存储数据库
@app.task
def time_sleep(n):
time.sleep(n)
return f'延时{n}s函数'
3.1.2 分发应用到dp95、dp97 机器相同位置
(airflow)[dp96 ~/celery]$ scp -r tasks.py dp95:~/celery
(airflow)[dp96 ~/celery]$ scp -r tasks.py dp97:~/celery
3.2 启动worker服务
3.2.1 在dp95、dp96、dp97三台机器分别启动celery worker服务
# 以下是Celery命令入口点的选项解释:
-A, --app APPLICATION: 指定Celery应用的模块名或路径。
-b, --broker TEXT: 指定消息代理的地址,例如RabbitMQ或Redis。
--result-backend TEXT: 指定任务结果的后端存储地址。
--loader TEXT: 指定Celery加载器的类型。
--config TEXT: 指定配置文件的路径。
--workdir PATH: 指定Celery工作目录。
-C, --no-color: 禁用彩色输出。
-q, --quiet: 静默模式,减少输出。
--version: 显示Celery版本号。
--skip-checks: 跳过配置检查。
# tasks 是我们任务所在的文件名,workdir tasks.py任务文件所在目录 worker 表示启动的是 worker 程序
(airflow)[dp96 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/ worker --loglevel=info
(airflow)[dp95 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/ worker --loglevel=info
(airflow)[dp97 ~/celery]$ ..envs/airflow/bin/celery -A tasks --workdir ~/celery/ worker --loglevel=info
3.2.2 查看redis服务,borker 已存在队列,backend 无任何结果
3.3 执行分布式异步任务
3.3.1 在dp96另一个shell界面,在task.py文件同目录下,创建test.py文件发布任务
from tasks import time_sleep
import time
def test():
start = time.time()
res = []
for i in range(10):
res_ = time_sleep.delay(5) # 发布任务到broker (redis),正常同步任务执行时间10*5=50s
stop = time.time()
print(f'运行时间:{stop-start}')
3.4 查看异步任务结果
dp95:
dp96:
dp97:
如图所示,3台机器分别从队列中取出了3、4、3个睡眠5秒的任务,并在各自机器上并行(异步)运行,3台机器累计运行任务共花费5s
redis 后台结果数据库: