Celery(分布式任务队列)入门学习笔记

Celery 的简单介绍

用 Celery 官方的介绍:它是一个分布式任务队列; 简单,灵活,可靠的处理大量消息的分布式系统; 它专注于实时处理,并支持任务调度。

Celery 如果使用 RabbitMQ 作为消息系统的话,整个应用体系就是下面这张图

Celery 官方给出的 Hello World, 对于未接触它的人来说根本就不知道是什么

1

2

3

4

5

6

7

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task

def hello():

    return 'hello world'

还是有必要按住上面那张图看 Celery 的组成部分

  1. Celery 自身实现的部分其实是 Producer 和 Consumer. Producer 创建任务,并发送消息到消息队列,我们称这个队列为 Broker。Consumer 从 Broker 中接收消息,完成计算任务,把结果存到 Backend
  2. Broker 就是那个消息队列,可选择的实现有 RabbitMQ, Redis, Amazon SQS
  3. 结果存储(Backend), 可选择 AMQP(像 RabbitMQ 就是它的一个实现), Redis, Memcached, Cassandra, Elasticsearch, MongoDB, CouchDB, DynamoDB, Amazon S3, File system 等等,看来它的定制性很强
  4. 消息和结果的存储还涉及到一个序列化的问题,可选择 pickle(Python 专用), json, yaml, msgpack. 消息可用 zlib, bzip2 进行压缩, 或加密存储
  5. Worker 的并发可采用 prefork(多进程), thread(多线程), Eventlet, gevent, solo(单线程)]

Celery 应用的基础选型

Celery 的 Broker 和 Backend 有非常多的选择组合,RabbitMQ 和 Redis 都是即可作为 Broker 又能用作 Backend。但 Celery 的推荐是用 RabbitMQ 作为 Broker, 小的结果这里选择用 Redis 作为 Backend, 所以这里的选型是

  1. Broker: RabbitMQ
  2. Backend: Redis
  3. 序列化:JSON  -- 方便在学习中查到消息中的数据
准备 Redis

安装 Python 包

在需要运行 Producer 和 Consumer(worker) 的机器上创建一个 Python 虚拟环境,然后安装下面的包

$ pip install celery redis

实践中只需要安装 celery redis 就能运行后面的例子,没有安装 librabbitmq, "celery[librabbitmq]" 也行,安装了这两个库能使用更高效的 librabbitmq C 库。如果安装了 librabbitmq 库,broker='amqp://...'  默认使用 librabbitmq, 找不到 librabbitmq 的话就用 broker='pyamqp://...'

$ pip install librabbitmq
$ pip install "celery[librabbitmq]"

注:中括号中的是安装 Celery 提供的 bundle, 它定义在 setup.py 的 setup 函数中的 extras_require。

Celery 应用实战

我们不用 Celery 的 Hello World 实例,那不能帮助我们理解背后发生了什么。创建一个 tasks.py 文件

1

2

3

4

5

6

7

8

9

10

11

from celery import Celery

app = Celery('celery-demo',

                broker='amqp://celery:your-password@192.168.86.181:5672/',

                backend='redis://192.168.86.181:6379')

@app.task

def add(x, y):

    return x + y

这里配置连接到 brocker 的 / vhost, 如果连接到别的 vhost, 如 celery 的话, url 写成 amqp://celery:your-passoword@192.168.86.181:5672/celery. backend 的 redis 如果要配置密码, 和 db 的话,写成 redis://:password@192.168.86.181:6379/2

暂且不在该脚本中直接执行 add.delay(15, 30), 而是放到 Python 控制台下方便测试

现在进到 Python 控制台

1

2

3

4

5

6

>>> from tasks import add

>>> task = add.delay(15, 30)

>>> task.id

'c3552fa2-502a-450b-933b-19a1da65ba33'

>>> task.status

'PENDING'

由于 Worker 还没有启动,所以得到一个 task_id, 状态是 PENDING。趁这时候看看 Celery 目前做了什么,来查看到 RabbitMQ

7

celery direct

Celery 在 RabbitMQ 中创建了的资源有

  1. 一个 Exchange: celery direct
  2. 两个 binding: 送到默认(空字符串)或 celery exchange 的, routing-key 为 celery 的消息会转发到队列 celery 中
  3. 一个队列 celery

查看队列 celery 中的消息

1

2

3

4

5

6

vagrant@celery:~$ rabbitmqadmin get queue=celery ackmode=ack_requeue_true

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

| routing_key | exchange | message_count |                                       payload                                       | payload_bytes | payload_encoding | redelivered |

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

| celery      |          | 0             | [[15, 30], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}] | 83            | string           | False       |

+-------------+----------+---------------+-------------------------------------------------------------------------------------+---------------+------------------+-------------+

ackmode=ack_requeue_true, 所以消息仍然在队列中, Redis 中什么也还没发生,接下来要

启动 Celery Worker

要用到 celery 命令,不过只要是 Python 的程序,命令行能做的事情总是能用 Python 代码来执行,用 celery --help 可看它的详细说明。

$ celery -A tasks worker -l INFO

tasks 是自己创建的模块文件 tasks.py

这时候显示出一条绿绿的芹菜出来了,所以得用屏幕截图来表现

取出消息并显示任务执行完成,这时候去看 RabbitMQ 的队列 celery 中的消息不见了,启动 Worker 后也会在 RabbitMQ 中创建 queue, 及对应的 binding, exchange。

再回到提交任务的 Python 控制台

1

2

3

4

>>> task.status

'SUCCESS'

>>> task.result

45

一个 Celery 全套服务圆满完成。结果存在了 Redis 中

192.168.86.181:6379> keys *
1) "celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33"
192.168.86.181:6379> TTL celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
(integer) 85840
192.168.86.181:6379> get celery-task-meta-c3552fa2-502a-450b-933b-19a1da65ba33
"{\"status\": \"SUCCESS\", \"result\": 45, \"traceback\": null, \"children\": [], \"date_done\": \"2022-01-17T07:23:48.901999\", \"task_id\": \"c3552fa2-502a-450b-933b-19a1da65ba33\"}"

Redis 中的结果保存时长为 24 小时,失败的任务会记录下异常信息。

关于 Worker 的控制查看帮助 celery worker --help, 比如

  1. -c, --concurrency: 并发数,默认为系统中 CPU 的内核数
  2. -P, --pool [prefork|eventlet|gevent|solo|processes|threads]:  worker 池的实现方式
  3. --max-tasks-per-child INTEGER: worker 执行的最大任务数,达到最大数目后便重启当前 worker
  4. -Q, --queues: 指定处理任务的队列名称,逗号分隔

任务的状态变迁是:PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

Celery 的配置

除了在声明 Celery 对象时可以指定 broker, backend 属性之外,我们可以用 py 配置文件的形式来配置更多的内容,配置文件 celeryconfig.py, 内容是 Configuration and defautls 中列出的项目

比如 celeryconfig.py

1

2

3

4

5

6

7

broker_url = 'amqp://celery:your-password@192.168.86.181:5672/'

result_backend = 'redis://192.168.86.181:6379'

task_serializer = 'json'

result_serializer = 'json'

accept_content = ['json']

timezone = 'America/Chicago'

enable_utc = True

新的格式是用小写的,旧格式用大写,如 BROKER_URL, 但是同一个配置文件中不能混合大小写,同时写 BROKER_URL 和 result_backend 就不行了。

然后在 tasks.py 中加载配置文件

1

2

3

4

from celery import Celery

import celeryconfig

app = Celery('celery-demo')

app.config_from_object(celeryconfig)

Celery 实时监控工具

Flower 是一个基于 Web 的监控 Celery 中任务的工具,安装和启动

$ pip install flower
$ celery -A tasks flower

打开链接 http://localhost:5555

其他剩下的问题,应该就是如何安排 Worker(比如结合 AutoScaling),从 Python 代码中启动 Worker, 怎么做灵活的配置, 调度任务的执行,其他的 backend 选择等等。

其他补充

backend rpc:// 的组合

如果配置中用

1

2

broker_url = 'amqp://celery:password@192.168.86.50:5672/celery'

result_backend = 'rpc://'

amqp 和 rpc:// 的组合,任务和结果都会存在 RabbitMQ 中

1

2

broker_url = 'redis://192.168.86.50'

result_backend = 'rpc://'

redis 和  rpc:// 的组合,任务和结果都保存在 Redis 中

为什么 Celery 推荐使用 RabbitMQ, 一说是它的一开发人员负责开发过 RabbitMQ, 所以即使使用 Redis 时,也会在 Redis 中写入有关 RabbitMQ 概念的数据,如 exchange, routing key 等。

 常见问题

Celery ValueError: not enough values to unpack (expected 3, got 0)的解决方案

先安装eventlet

pip install eventlet

然后,启动worker的时候加一个参数,如下:

celery -A <moduleName> worker -l info -P eventlet

然后就可以正常运行worker执行任务了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/597602.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

GStreamer日志调试笔记

1、查询所有分类 #gst-launch-1.0 --gst-debug-help 2、查询videotestsrc的日志 #gst-launch-1.0 --gst-debug-help | findstr videotestsrc 结果&#xff1a; 3、使用--gst-debug设置相应日志类型的相应等级&#xff0c;越大显示日志越多&#xff0c;排查内存泄露可以设置为9 …

【iOS】多线程

文章目录 前言一、多线程的选择方案二、GCD和NSOperation的比较二、多线程相关概念任务队列 三、死锁情况主队列加同步任务 四、任务队列组合主队列异步并发队列异步 前言 这两天将iOS的多线程的使用都看了一遍&#xff0c;iOS的多线程方案有许多&#xff0c;本篇博客主要总结…

打开深度学习的锁:(0)什么是神经网络?

PS&#xff1a;每每温故必而知新 什么是神经网络&#xff1f; 一、一个单神经元的神经网络二、多个单神经元的神经网络三、到底什么是机器学习&#xff1f;&#xff08;重点&#xff09;1&#xff1a;什么是机器学习的训练&#xff1f;2&#xff1a;什么是模型&#xff1f;权重…

python数据分析所需要的语法基础

Python语言基础——语法基础 前言语法基础变量标识符数据类型输入与输出代码缩进与注释 总结 前言 对于学过C语言的人来说&#xff0c;python其实很简单。学过一种语言&#xff0c;学习另一种语言&#xff0c;很显然的能感觉到&#xff0c;语言大体上都是相通的。当然&#xf…

【docker】常用的把springboot打包为docker镜像的maven插件

Spring Boot Maven Plugin: Spring Boot 自带的 Maven 插件 (spring-boot-maven-plugin) 支持直接生成 Docker 镜像。通过配置&#xff0c;可以在 Maven 构建过程中自动构建 Docker 镜像&#xff0c;而无需单独编写 Dockerfile。这种方法简化了将应用打包为 Docker 镜像的过程。…

武汉理工大学python123实验——流程控制结构

1.百分制成绩转换五分制#1707 n int(input())if n>90:print(A) elif n>80:print(B) elif n>70:print(C) elif n>60:print(D) else:print(E) 2.角古猜想#73963 n eval(input()) if n<0:print(ERROR) elif . in str(n):print(ERROR) else:print(n,end" …

Qt扫盲-Qt D-Bus概述

Qt D-Bus概述 一、概述二、总线三、相关概念1. 消息2. 服务名称3. 对象的路径4. 接口5. 备忘单 四、调试五、使用Qt D-Bus 适配器1. 在 D-Bus 适配器中声明槽函数1. 异步槽2. 只输入槽3. 输入输出槽4. 自动回复5. 延迟回复 一、概述 D-Bus是一种进程间通信(IPC)和远程过程调用…

Java面试题:多线程2

如何停止正在运行的线程 1,使用退出标志,使线程正常退出(run方法中循环对退出标志进行判断) 2,使用stop()方法强行终止(不推荐) 3,调用interrupt()方法中断线程 打断阻塞线程(sleep,wait,join),线程会抛出InterruptedException异常 打断正常的线程,可以根据打断状态来标记…

继承知识及扩展(C++)

1. 继承是什么&#xff1f; 继承是面向对象编程的三大特征之一&#xff0c;也是代码复用的手段之一。之前我们在很多的地方尝试函数的复用&#xff0c;而继承是为了类的复用提供了很好的方式。 &#xff08;1&#xff09;继承的代码怎么写 在一个类后面使用 &#xff1a;继承方…

知识图谱融入RAG模型:LinkedIn重塑智能客服新范式【附LeCun哈佛演讲PPT】

原文&#xff1a;Retrieval-Augmented Generation with Knowledge Graphs for Customer Service Question Answering 一、研究背景与问题 在客服领域,快速准确地匹配用户问题与历史工单,是提供优质回答的关键。传统的检索增强生成(Retrieval-Augmented Generation, RAG)方法虽…

IDEA-控制台日志过滤插件 - Grep Console

IDEA-控制台日志过滤插件 - Grep Console 当idea控制台日志较多时&#xff0c;为了方便查找关键字&#xff0c;使用Grep Console插件&#xff0c;指定控制台中关键字高亮显示 1.安装 2.使用 2.1 高亮显示 控制台中指定颜色高亮显示指定字符 效果: 重启项目后还是会高亮显示 取…

【软考高项】三十三、质量管理

一、管理基础 质量定义 国际标准&#xff1a;反映实体满足主体明确和隐含需求的能力的特性总和。 国家标准&#xff1a;一组固有特性满足要求的程度。固有特性是指在某事或某物中本来就有的&#xff0c;尤其是那种永久的可区分的特征。 ➢ 对产品来说&#xff0c;例如…

缓存菜品操作

一&#xff1a;问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大。 二&#xff1a;实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; 每个分…

k8s保持pod健康

存活探针 Kubemetes 可以通过存活探针 (liveness probe) 检查容器是否还在运行。可以为 pod 中的每个容器单独指定存活探针。如果探测失败&#xff0c;Kubemetes 将定期执行探针并重新启动容器。 Kubemetes 有以下三种探测容器的机制&#xff1a; HTTP GET 探针对容器的 IP 地…

Day61:单调栈 739. 每日温度 496.下一个更大元素 I

739. 每日温度 给定一个整数数组 temperatures &#xff0c;表示每天的温度&#xff0c;返回一个数组 answer &#xff0c;其中 answer[i] 是指对于第 i 天&#xff0c;下一个更高温度出现在几天后。如果气温在这之后都不会升高&#xff0c;请在该位置用 0 来代替。 示例 1: 输…

发表博客之:gemm/threadblock/threadblock_swizzle.h 文件夹讲解,cutlass深入讲解

文章目录 [发表博客之&#xff1a;gemm/threadblock/threadblock_swizzle.h 文件夹讲解&#xff0c;cutlass深入讲解](https://cyj666.blog.csdn.net/article/details/138514145)先来看一下最简单的struct GemmIdentityThreadblockSwizzle结构体 发表博客之&#xff1a;gemm/th…

vue2 webpack-dev-server Unknown promise rejection reason

在vue.config.js中添加如下配置&#xff0c;重启项目即可 module.exports defineConfig({devServer: {client: {overlay: false,},} })参考

探索中位数快速排序算法:高效寻找数据集的中间值

在计算机科学领域&#xff0c;寻找数据集的中位数是一个常见而重要的问题。而快速排序算法作为一种高效的排序算法&#xff0c;可以被巧妙地利用来解决中位数查找的问题。本文将深入探讨中位数快速排序算法的原理、实现方法以及应用场景&#xff0c;带你领略这一寻找中间值的高…

vue 金额组件,输入提示单位:‘千’、‘万’、‘十万’...并用‘,’三个格式化

近期项目中遇到一个需求&#xff0c;金额输入框&#xff0c;输入过程中自动提示‘千’、‘万’、‘十万’、‘百万’......等单位提示&#xff0c;鼠标失去焦点后&#xff0c;并用‘,’三位隔开计数。 例如&#xff1a; 输入&#xff1a;12345.99 失去焦点&#xff1a;12,34…

Vue--》从零开始打造交互体验一流的电商平台(一)

今天开始使用 vue3 ts 搭建一个电商项目平台&#xff0c;因为文章会将项目的每处代码的书写都会讲解到&#xff0c;所以本项目会分成好几篇文章进行讲解&#xff0c;我会在最后一篇文章中会将项目代码开源到我的github上&#xff0c;大家可以自行去进行下载运行&#xff0c;希…