python celery使用队列

在celery的配置方法中有个参数叫task_routes,是用来设置不同的任务 消费不同的队列(也就是路由)。

格式如下:

{ ‘task name’: { ‘queue’: ‘queue name’ }}

直接上代码,简单明了,目录格式如下:

在这里插入图片描述

首先是配置文件 config.init.py

import os
import sys
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))


class Config(object):
    """配置文件基类"""

    """ 项目名称 """
    PROJECT_NAME = "crawler_worker"
    """ celery backend存放结果 """
    CELERY_BACKEND_URL = "redis://127.0.0.1:6379/4"
    """ celery broker中间件 """
    CELERY_BROKER_URL = "redis://127.0.0.1:6379/5"

    """ worker 名称 """
    CRAWL_SEND_EMAIL_TASK = "crawl_service.crawl.send_email_task"  # 抓取发送邮件任务
    CRAWL_SEND_MSG_TASK = "crawl_service.crawl.send_msg_task"  # 抓取发送短信任务


settings = Config()

celery应用程序模块配置相关 celery_base.celery_app.py

import os
import sys
import time
import celery
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))

from config import settings


# 实例化celery对象
celery_app = celery.Celery(
    settings.PROJECT_NAME,
    backend=settings.CELERY_BACKEND_URL,
    broker=settings.CELERY_BROKER_URL,
    include=[
        "tasks.crawl_send_email",
        "tasks.crawl_send_msg",
    ],
)

# 任务路由
task_routes = {
    settings.CRAWL_SEND_EMAIL_TASK: {
        "queue": f"{settings.CRAWL_SEND_EMAIL_TASK}_queue"
    },
    settings.CRAWL_SEND_MSG_TASK: {"queue": f"{settings.CRAWL_SEND_MSG_TASK}_queue"},
}
# 任务去重
celery_once = {
    "backend": "celery_once.backends.Redis",
    "settings": {"url": settings.CELERY_BACKEND_URL, "default_timeout": 60 * 60},
}
# 配置文件
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_default_queue="normal",
    timezone="Asia/Shanghai",
    enable_utc=False,
    task_routes=task_routes,
    task_ignore_result=True,
    redis_max_connections=100,
    result_expires=3600,
    ONCE=celery_once,
)

抓取基类 crawl_worker_base.py

from celery_once import QueueOnce


class CrawlBase(QueueOnce):
    """
    抓取worker基类
    """

    name = None
    once = {"graceful": True}
    ignore_result = True

发送邮件任务 crawl_send_email.py

import os
import sys
import time
import celery
from loguru import logger
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))

from config import settings
from celery_base.celery_app import celery_app
from tasks.crawl_worker_base import CrawlBase

"""

执行命令:
celery -A tasks.crawl_send_email worker -l info -Q crawl_service.crawl.send_email_task_queue

"""


class SendEmailClass(CrawlBase):
    name = settings.CRAWL_SEND_EMAIL_TASK

    def __init__(self, *args, **kwargs):
        super(SendEmailClass, self).__init__(*args, **kwargs)

    def run(self, name):
        logger.info("class的方式, 向%s发送邮件..." % name)
        time.sleep(5)
        logger.info("class的方式, 向%s发送邮件完成" % name)
        return f"成功拿到{name}发送的邮件!"


send_email = celery_app.register_task(SendEmailClass())

发送短信 crawl_send_msg.py

import os
import sys
import time
import celery
from loguru import logger
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))
from config import settings
from celery_base.celery_app import celery_app
from tasks.crawl_worker_base import CrawlBase

"""

执行命令:
celery -A tasks.crawl_send_msg worker -l info -Q crawl_service.crawl.send_msg_task_queue

"""


class SendMsgClass(CrawlBase):
    name = settings.CRAWL_SEND_MSG_TASK

    def __init__(self, *args, **kwargs):
        super(SendMsgClass, self).__init__(*args, **kwargs)

    def run(self, name):
        logger.info("class的方式, 向%s发送短信..." % name)
        time.sleep(5)
        logger.info("class的方式, 向%s发送短信完成" % name)
        return f"成功拿到{name}发送的短信!"


send_msg = celery_app.register_task(SendMsgClass())

发送邮件任务-调度器 send_email_scheduler.py

import sys
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))

from config import settings
from celery_base.celery_app import celery_app

if __name__ == "__main__":
    for i in range(100):
        result = celery_app.send_task(
            name=settings.CRAWL_SEND_EMAIL_TASK, args=(f"张三嘿嘿{i}",)
        )
        print(result.id)

发送短信任务-调度器 send_msg_scheduler.py

import os
import sys
import time
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))

from config import settings
from celery_base.celery_app import celery_app

if __name__ == "__main__":
    for i in range(100, 500):
        result = celery_app.send_task(
            name=settings.CRAWL_SEND_MSG_TASK, args=(f"李四哈哈哈{i}",)
        )
        print(result.id)

准备工作已经做好,紧接着分别执行命令:

celery -A tasks.crawl_send_email worker -l info -Q crawl_service.crawl.send_email_task_queue
celery -A tasks.crawl_send_msg worker -l info -Q crawl_service.crawl.send_msg_task_queue

出现👇🏻下面效果就代表celery启动成功:

在这里插入图片描述

最后只要发送任务即可,在redis中就可以看到专门指定的两个队列了。

在这里插入图片描述

看下运行过程中的日志

在这里插入图片描述

一个简单的celery + 队列就实现了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/397929.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Satoshivm一文科普,手把手教你交互(bitget 钱包)

什么是 SatoshiVM? SatoshiVM 是一种去中心化的第 2 层解决方案,创新地将比特币网络的强大安全性和价值稳定性与以太坊虚拟机 (EVM) 的高级可编程性和灵活性相结合。 SatoshiVM 是区块链领域的一个突出功能,支持使用原生 BTC 作为 Gas&#x…

基于JavaWeb开发的智慧医院OA系统[附源码]

基于JavaWeb开发的智慧医院OA系统[附源码] 🍅 作者主页 央顺技术团队 🍅 欢迎点赞 👍 收藏 ⭐留言 📝 🍅 文末获取源码联系方式 📝 🍅 查看下方微信号获取联系方式 承接各种定制系统 &#x1f4…

爬虫入门一

文章目录 一、什么是爬虫?二、爬虫基本流程三、requests模块介绍四、requests模块发送Get请求五、Get请求携带参数六、携带请求头七、发送post请求八、携带cookie方式一:放在请求头中方式二:放在cookie参数中 九、post请求携带参数十、模拟登…

BlackberryQ10 是可以安装 Android 4.3 应用的

BlackberryQ10 是可以安装 Android 4.3 应用的 最近淘了个 Q10 手机,非常稀罕它,拿着手感一流。这么好的东西,就想给它装点东西,但目前所有的应用都已经抛弃这个安卓版本了。 一、开发环境介绍 BlackBerry Q10 的 安卓版本是 4.…

String字符串,FastJson常用操作方法

JSON字符串操作 1、创建配置环境 # 引入测试包testImplementation group: org.springframework.boot, name: spring-boot-starter-test, version: 2.2.6.RELEASE # 创建测试类RunWith(SpringRunner.class)SpringBootTestpublic class JsonTest {Testpublic void test(){Syste…

IO进程线程day4作业

1、使用多进程完成两个文件的拷贝&#xff0c;父进程拷贝前一半&#xff0c;子进程拷贝后一半&#xff0c;父进程回收子进程的资源 代码&#xff1a; #include<myhead.h> int main(int argc, const char *argv[]) {//定义两个文件指针FILE *fp1NULL;FILE *fp2NULL;//创…

open3d 点云体素化

open3d 点云体素化 一、算法原理1.从点云到体素化&#xff08;主要函数&#xff09;2.从网格到体素化&#xff08;主要函数&#xff09; 二、代码三、结果1.从点云到体素化2.从网格到体素化 四、相关数据 一、算法原理 点云和三角网格是非常灵活但不规则的几何类型。体素网格是…

Mybatis | 初识Mybatis

初识Mybatis 目录: 初识Mybatis什么是Mybatis&#xff1f;Hibernate 和 MyBatis的区别&#xff1f;Mybatis的下载和使用Mybatis的工作原理 作者简介 &#xff1a;一只大皮卡丘&#xff0c;计算机专业学生&#xff0c;正在努力学习、努力敲代码中! 让我们一起继续努力学习&#…

微信小程序之会议OA个人中心后台交互

目录 获取用户昵称头像和昵称 小程序登录 登录-小程序 wx.checkSession wx.login wx.request 后台 准备数据表 反向生成工具生成 准备封装前端传过来的数据 小程序服器配置 导入微信小程序SDK application.yml WxProperties WxConfig WxAuthController 登录-小…

目录的共享与访问的实现

给用户机赋予读取文件的权利 创建文件夹&文件 点击属性–>共享–>共享&#xff08;S&#xff09; 点击添加–》给需要赋权的用户赋予相应的权限–>应用–确定 在赋权的用户机里winR–>‘\’‘IP地址&#xff08;主机&#xff09;’

常用的消息中间件RabbitMQ

目录 一、消息中间件 1、简介 2、作用 3、两种模式 1、P2P模式 2、Pub/Sub模式 4、常用中间件介绍与对比 1、Kafka 2、RabbitMQ 3、RocketMQ RabbitMQ和Kafka的区别 二、RabbiMQ集群 RabbiMQ特点 RabbitMQ模式⼤概分为以下三种: 集群中的基本概念&#xff1a; 集…

Open AI Sora的出现,大大改变ai视频内容生成赛道的格局

前言&#xff1a;在人工智能&#xff08;AI&#xff09;技术的迅猛发展下&#xff0c;我们见证了无数令人惊叹的创新&#xff0c;它们正逐渐重塑我们对可能性的认知。在这一浪潮中&#xff0c;OpenAI 的最新力作——Sora模型&#xff0c;以其在视频生成领域的革命性突破&#x…

mysql调优实战

EXPLAIN执行分析 id:值越大越先执行相同时&#xff0c;由上向下执行。 possible_key: 可能走索引的键。 key&#xff1a;真正走索引的键rows:根据表统计信息及索引选用情况&#xff0c;大致估算出找到所需的记录所需要读取的行数&#xff0c;也就是说&#xff0c;用的越少越好 …

动态获取 微信小程序appid / 自定义启动命令

官网&#xff1a;https://uniapp.dcloud.net.cn/collocation/package.html#%E7%94%A8%E6%B3%95 小程序开发完成之后需要一套代码多个小程序使用&#xff0c;每次都需要在manifest.json文件中手动修改&#xff0c;大大增加了开发的复杂度。 官网&#xff1a;https://uniapp.dcl…

市场复盘总结 20240220

仅用于记录当天的市场情况&#xff0c;用于统计交易策略的适用情况&#xff0c;以便程序回测 短线核心&#xff1a;不参与任何级别的调整&#xff0c;采用龙空龙模式 一支股票 10%的时候可以操作&#xff0c; 90%的时间适合空仓等待 二进三&#xff1a; 进级率中 19% 最常用…

Android---Retrofit实现网络请求:Java 版

简介 在 Android 开发中&#xff0c;网络请求是一个极为关键的部分。Retrofit 作为一个强大的网络请求库&#xff0c;能够简化开发流程&#xff0c;提供高效的网络请求能力。 Retrofit 是一个建立在 OkHttp 基础之上的网络请求库&#xff0c;能够将我们定义的 Java 接口转化为…

【Vue3】路由传参的几种方式

路由导航有两种方式&#xff0c;分别是&#xff1a;声明式导航 和 编程式导航 参数分为query参数和params参数两种 声明式导航 query参数 一、路径字符串拼接(不推荐) 1.传参 在路由路径后直接拼接?参数名:参数值 &#xff0c;多组参数间使用&分隔。 <RouterLink …

OpenGL学习——17.模型

前情提要&#xff1a;本文代码源自Github上的学习文档“LearnOpenGL”&#xff0c;我仅在源码的基础上加上中文注释。本文章不以该学习文档做任何商业盈利活动&#xff0c;一切著作权归原作者所有&#xff0c;本文仅供学习交流&#xff0c;如有侵权&#xff0c;请联系我删除。L…

算法——数值算法——牛顿迭代法

目录 牛顿迭代法 一、1021: [编程入门]迭代法求平方根 牛顿迭代法 迭代法&#xff08;Iteration&#xff09;是一种通过反复递推计算来逼近解的方法。而牛顿迭代法&#xff08;Newtons method&#xff09;则是一种特定的迭代法&#xff0c;用于求解方程或函数的根、最小值、最…

【软考中级备考笔记】计算机体系结构

计算机体系结构 2月19日 – 天气&#xff1a;阴转小雪 1. 冯诺依曼计算机体系结构 冯诺依曼将计算机分为了五大部分&#xff0c;分别是&#xff1a; 控制器&#xff1a;主要负责协调指令到执行运算器&#xff1a;负责算数和逻辑运算存储器&#xff1a;负责存储在指令执行过程…