Celery知识

celery介绍

# celery 的概念:
    * 翻译过来是芹菜
    * 官网:https://docs.celeryq.dev/en/stable/
# 是分布式的异步任务框架:
    分布式:一个任务,拆成多个任务在不同机器上做
    异步任务:后台执行,不阻塞主任务
    框架:集成到项目中
# 作用:
1、异步任务:异步()发邮件,短信,通知
2、延迟任务:延迟几秒 再执行某个任务
                订单提交后,延迟半小时,把订单取消
3、定时任务 :每隔多长事件 执行某个任务,比如定时更新缓存

        eg.买了个会员,会员快到期了会每天给你发短信提醒

celery架构

# django  是一个服务,celery 也是是一个服务,和django没有必然联系
    -命令启动,就能提供服务
# 三个模块:
        1、broker:消息中间件,消息队列,任务中间件
             存储任务(函数):发送短信任务,统计在线人数...
              redis:  使用字符串形式,能把任务表示出来即可

              reabbitmq 存储:  其实就是一个队列,一个个任务任务
        2、worker:任务执行单元,可以启动多个
              从消息队列(broker的redis)取出任务然后去执行程序(进程)
        3、backend:结果存储  Result Stores
              任务执行完成后的结果存储在这里
              redis存储,关系型数据库。。
# 执行流程:
        1、其他程序提交任务(函数),任务序列化后存到celery的broker
                        用redis   0 库
        2、接下来:worker执行,从broker中取任务,然后执行
        3、任务执行完后,把结果存到 bancked中
                        用redis   1库


# 注意:
    celery和其他程序是 独立运行
        1、可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
        2、celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务(django),一个是celery服务

项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

# 例子:

    人(django)是一个独立运行的服务 | 医院(celery)也是一个独立运行的服务
        正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题,
        人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

celery快速使用

# 使用步骤
1、安装:pip install celery
2、写个py文件 demo.py

from celery import Celery
import time
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('app', broker=broker, backend=backend)
# 写任务---》写函数---》必须用 @app.task 装饰---》装饰后,就变成了celery的任务了
@app.task
def hello():
    time.sleep(2)  # 模拟任务延迟
    return 'hello world'
@app.task
def add(a, b):
    time.sleep(3)
    return a + b

3、其他程序中,提交任务

res=add.delay(4,5)
print(res)

4、 启动worker---worker启动可以再靠前
 win运行:
        pip3 install eventlet
        celery -A demo worker -l info -P eventlet

 非win运行:mac linux
        celery -A demo  worker -l info

5、查询结果
---直接取redis中查
---使用代码查询

from demo import app
from celery.result import AsyncResult
id = '17bf03ad-a1e6-49d1-a182-794bd3e96b74'
if __name__ == '__main__':
        a = AsyncResult(id=id, app=app)
        if a.successful():
            result = a.get() # hello world
            print(result)
        elif a.failed():
            print('任务失败')
        elif a.status == 'PENDING':
            print('任务等待中被执行')
        elif a.status == 'RETRY':
            print('任务异常后正在重试')
        elif a.status == 'STARTED':
            print('任务已经开始被执行')

celery包结构

# 包结构 目录如下:
        --celery_task
                    --celery.py
                    --user_task.py
                    --order_task.py
                    --goods_task.py
    
-其他程序中提交任务: add_task_package.py
-其他程序中查询结果:get_result_package.py
###具体步骤

# celery.py  
from celery import Celery
#######1 实例化得到对象 ##########
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('app', broker=broker, backend=backend,include=['celery_task.order_task','celery_task.user_task'])
#######2 写任务 ##########以后各种类型任务,单独写在py文件中
# order_task.py   user_task.py 
# order_task.py
import time
from .celery import app
@app.task
def cancel_order(order_id):
    time.sleep(2)
    return '订单:%s取消成功' % order_id
# user_task.py
import time
from .celery import app
@app.task
def send_sms(phone, code):
    time.sleep(1)
    return '手机号:%s,发送验证码:%s,成功' % (phone, code)
######## 3 其他程序,提交任务 ##############################
from celery_task.user_task import send_sms
res=send_sms.delay('1893424323',8888)
print(res)

##### 4 启动worker########## 在包的一层,执行包,不需要具体到某个py文件了
win运行:pip3 install eventlet
        A celery_demo 包名----因为他会去包下找 celery.py 中得app执行
        celery -A celery_task worker -l info -P eventlet

非win运行:mac linux
        celery -A celery_task  worker -l info

#####5 查询结果#####
# 使用代码,查询结果
from celery_task.celery import app
from celery.result import AsyncResult

id = '46b26c73-62ae-403c-ba62-e469f2f8c69f'
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get() # hello world
        print(result)
    elif a.failed():
        print('任务失败')
    elif a.status == 'PENDING':
        print('任务等待中被执行')
    elif a.status == 'RETRY':
        print('任务异常后正在重试')
    elif a.status == 'STARTED':
        print('任务已经开始被执行')

celery实现异步任务,定时任务,延迟任务

# 异步任务:
        任务名.delay(传参数)

# 延迟任务---延迟多长事件干事

        点右键执行,work运行

# 链接上面任务二的order函数
from celery_task.order_task import cancel_order
from datetime import datetime, timedelta

# atetime.utcnow() 当前utc时间
eta = datetime.utcnow() + timedelta(seconds=15)
res = cancel_order.apply_async(args=['10001',], eta=eta)  # 订单+15s 后执行这个任务
print(res)

# 定时任务--一定要启动beat
     1、 在celery.py 中写

# 链接上面任务二的user_task函数
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 不使用UTC时间
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'send_sms': {
        'task': 'celery_task.user_task.send_sms',  #  执行的任务函数 
        'schedule': timedelta(seconds=3),    # 每隔三秒钟干
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
        'args': ('1896388888', '6666'),     # 传参数,手机号\验证码
    }
}

2、启动worker:celery -A celery_task worker -l info -P eventlet
3、启动beat(每个一段时间,就提交任务):celery -A celery_task beat  -l info

4、等待即可

django中使用celery

# 两种方案:
    -通用方案:自己封装
    -django-celery--》app---》创建出一些表
# 自己封装的通用方案:
1、把封装的包:celery_task 复制到项目中
2、在django中使用celery.py 中必须加入
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
    3 写任务,启动worker
    4 在django的视图类中,异步调用即可

# celery_task/celery.py
from celery import Celery
import os

# 任务里使用django的东西:缓存,表模型。。。必须加入
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')

# 1 实例化得到对象
broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery('app', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'])

# 2 写任务,以后各种类型任务,单独写在py文件中

# 定时任务
app.conf.timezone = 'Asia/Shanghai'    # 时区
app.conf.enable_utc = False    # 不使用UTC

# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
}
# celery_task/user_task.py
import time
from .celery import app
from libs.send_information import common_send_sms

@app.task
def send_sms(phone, code):
    res = common_send_sms(code, mobile=phone)
    if res:
        return '短信发送成功:%s' % phone
    else:
        return '短信发送失败:%s' % phone

# user/views.py
    # 发送短信接口
    @action(methods=['get'], detail=False, url_path='send_information')
    def send_sms(self, request):
        try:
            mobile = request.query_params['mobile']  # 取的手机号放在请求地址栏中
            code = get_code()  # 生成验证码
            cache.set('sms_code_%s' % mobile, code, 61)  # 放在缓存中,以手机号做区
            # 异步发送短信--不管是否成功--如果不成功,用户再发一次即可
            # t = Thread(target=common_send_sms, args=[code, mobile])
            # t.start()  # 启动线程发送短信

            # celery异步发送短信
            send_sms.delay(mobile, code)
            return APIResponse(msg='短信已发送')

        except MultiValueDictKeyError as e:
            raise APIException(detail='手机号必须携带')
        except Exception as e:
            raise APIException(detail=str(e))

双写一致性(缓存问题)

# 轮播图加缓存
    出现问题:banner表中数据变了,缓存不会变
    mysql和redis数据不一致: mysql和redis双写一致性
# 双写一致性的解决方案:
    1、mysql修改---删缓存
    2、mysql修改---改缓存
    3、定时更新---每个5s,更新一次缓存
             先删缓存,在更新mysql
            先改缓存,再更新mysql
# 轮播图的接口---使用定时更新,解决双写一致性问题

# celery_task/home_task.p
import time
from .celery import app
from home.models import Banner
from django.conf import settings
from home.serializer import BannerSerializer
from django.core.cache import cache


@app.task
def update_banner():
    # 1 获取所有轮播图数据
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    # 2 序列化
    ser = BannerSerializer(instance=queryset, many=True)
    # ser.data
    # 2.1 把服务端前缀拼接上
    for item in ser.data:
        # media/banner/banner1.png,
        item['image'] = settings.BACKEND_URL + item['image']
    # 3 放到缓存
    cache.set('banner_list', ser.data)
    return '更新成功'
# celery.py
app = Celery('app', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task','celery_task.home_task'])

app.conf.beat_schedule = {
    'update_banner': {
        'task': 'celery_task.home_task.update_banner',
        # 'schedule': timedelta(seconds=5),
        'schedule': timedelta(minutes=3),
        'args': (),
    },
}

# 启动worker:celery -A celery_task worker -l info -P eventlet

    启动beat:celery -A celery_task beat  -l info
# 以后尽管改 mysql数据,最多3分钟就会更新到最新了

异步秒杀方案

# 秒杀功能:
      并发量要高:承载住很多用户同时操作
                订单表
                扣减库存
            效率要高


# 同步秒杀
    假设秒杀需要10s钟,项目并发量是3,总共5个商品要秒杀
    10s内,只有3个人能进入到系统,并且开始秒杀

# 前端
const routes = [
  {
    path: '/',
    name: 'home',
    component: HomeView
  },
  {
    path: '/seckill',
    name: 'seckill',
    component: SeckillView
  },
]
<template>
  <div>
    <Header></Header>
    <div style="padding: 50px;margin-left: 100px">
      <h1>Go语言课程</h1>
      <img src="http://photo.liuqingzheng.top/2023%2002%2022%2021%2057%2011%20/image-20230222215707795.png"
           height="300px"
           width="300px">
      <br>
      <el-button type="danger" @click="handleSeckill" v-loading.fullscreen.lock="fullscreenLoading">秒杀课程</el-button>
    </div>


    <br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br>


    <Footer></Footer>

  </div>
</template>

<script>
import Header from '@/components/Header'
import Footer from '@/components/Footer'

export default {
  name: "SckillView",
  data() {
    return {
      fullscreenLoading: false,
      task_id: '',
      t: null,
    }
  },
  methods: {
    // ##############同步秒杀##############
    // handleSeckill() {
    //   this.fullscreenLoading = true;
    //   this.$axios({
    //     url: '/user/seckill/seckill/',
    //     method: 'POST',
    //     data: {
    //       course_id: '99'
    //     }
    //   }).then(res => {
    //     this.fullscreenLoading = false;
    //     this.$message.success(res.msg)
    //   }).catch(res => {
    //     this.fullscreenLoading = false;
    //     this.$message.error(res)
    //   })
    // }
    // ##############同步秒杀##############


    // ##############异步秒杀##############
    handleSeckill() {
      this.fullscreenLoading = true;
      this.$axios({
        url: '/user/seckill/seckill/',
        method: 'POST',
        data: {
          course_id: '99'
        }
      }).then(res => {
        // 在排队,转圈的,还需要继续显示
        this.$message.success(res.msg)
        this.task_id = res.task_id
        // 继续发送请求---》查询是否秒杀成功:1 成功 2 没成功  3 秒杀任务还没执行
        // 启动定时任务,没隔1s,向后端发送一次请求
        this.t = setInterval(() => {
          this.$axios({
            url: '/user/seckill/get_result/',
            method: 'get',
            params: {
              task_id: this.task_id
            }
          }).then(res => {
            // 100 成功,success : 1 成功  0 失败  2 还没开始
            if (res.success == '1') {
              // 转圈框不显示
              this.fullscreenLoading = false;
              // 停止定时任务
              clearInterval(this.t)
              this.t = null
              this.$message.success(res.msg)

            } else if (res.success == '0') {
              // 转圈框不显示
              this.fullscreenLoading = false;
              // 停止定时任务
              clearInterval(this.t)
              this.t = null
              this.$message.error(res.msg)
            } else {
              // this.$message.error(res.msg)
              console.log(res.msg)
            }
          })


        }, 1000)


      }).catch(res => {
        this.fullscreenLoading = false;
        this.$message.error(res)
      })
    }
  },
  components: {
    Header, Footer
  }
}
</script>

<style scoped>

</style>
# 后端
# 秒杀功能
import random

from celery_task.order_task import seckill
from celery_task.celery import app
from celery.result import AsyncResult


class SeckillView(ViewSet):
    # 同步操作,性能不高
    # 异步提交任务
    @action(methods=['POST'], detail=False)
    def seckill(self, request, *args, **kwargs):
        course_id = request.data.get('course_id')
        task_id = seckill.delay(course_id)
        return APIResponse(msg='您正在排队', task_id=str(task_id))

    @action(methods=['GET'], detail=False)
    def get_result(self, request, *args, **kwargs):
        task_id = request.query_params.get('task_id')
        a = AsyncResult(id=task_id, app=app)
        if a.successful():
            result = a.get()  # True 和 False
            if result:
                return APIResponse(success='1', msg='秒杀成功')
            else:
                return APIResponse(success='0', msg='秒杀失败')
        elif a.status == 'PENDING':
            print('任务等待中被执行')
            return APIResponse(success='2', msg='任务等待中被执行')
        else:
            return APIResponse(success='3', msg='秒杀任务正在执行')
# 任务
import random
import time


@app.task
def seckill(course_id):
    print('根据课程id:%s,查询课程是否还有剩余,耗时2s' % course_id)
    time.sleep(2)
    res = random.choice([True, False])
    if res:  # 库存够
        print('扣减库存,耗时1s')
        time.sleep(1)
        print('下单,耗时2s')
        time.sleep(2)
        return True
    else:
        return False
# 路由
router.register('seckill', SeckillView, 'seckill')

今日思维导图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/455275.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【AI大模型应用开发】【LangChain系列】9. 实用技巧:大模型的流式输出在 OpenAI 和 LangChain 中的使用

大家好&#xff0c;我是同学小张&#xff0c;日常分享AI知识和实战案例欢迎 点赞 关注 &#x1f44f;&#xff0c;持续学习&#xff0c;持续干货输出。v: jasper_8017 一起交流&#x1f4ac;&#xff0c;一起进步&#x1f4aa;。微信公众号也可搜【同学小张】 &#x1f64f; 本…

六 超级数据查看器 讲解稿 详情1 概述

六 超级数据查看器 讲解稿 详情1 概述 点此此处 以新界面 打开B站 当前视频教程 APP下载地址 百度 下载地址 ​ 讲解稿全文&#xff1a; 大家好&#xff0c;今天我们讲解一下超级数据查看器详情界面。由于内容较多&#xff0c;讲解要分为7集&#xff0c;这是第一集 首…

Python导入类说一说

要在Python中导入一个类&#xff0c;需要使用import关键字。 详细去看下面的代码 1、多例类 class Restaurant:餐馆类def __init__(self,restaurant_name,cuisine_type):#类的属性self.restaurant_name restaurant_nameself.cuisine_type cuisine_type# self.stregth_leve…

如何利用ChatGPT联系英语口语和听写!分享一些Prompt!

参考文章 ChatGPT4升级方法 namecheap购买方法 sora namecheap 支付 首先先看ChatGPT修改英语作文的能力 足以证明ChatGPT的能力 ChatGPT英语练习 口语&#xff1a; 实时交谈纠错发音纠错语句 写作&#xff1a; 写作建议构思文本 模拟考试&#xff1a; 雅思、托福和…

论文阅读——Vision Transformer with Deformable Attention

Vision Transformer with Deformable Attention 多头自注意力公式化为&#xff1a; 第l层transformer模块公式化为&#xff1a; 在Transformer模型中简单地实现DCN是一个non-trivial的问题。在DCN中&#xff0c;特征图上的每个元素都单独学习其偏移&#xff0c;其中HWC特征图上…

BUGKU-WEB never_give_up

题目描述 题目截图如下&#xff1a; 进入场景看看&#xff1a; 解题思路 F12查看请求和响应&#xff0c;查找线索 相关工具 base64解码URL解码Burp Suit抓包 解题步骤 F12查看请求和响应&#xff0c;发现一行注释包含一个文件名称【1p.html】&#xff0c;这应该就是提…

操作系统内功篇:使用说明

本专栏是我阅览大佬小林coding写的电子书《图解系统》的一些总结并参杂一些我个人学习的补充&#xff0c;博客大纲是用的大佬的纲要。 暂时打算更新这么多&#xff0c;在以后的学习的过程中再慢慢更新......... 此文章会实时更新更新进程...........

什么是Ipython

IPython&#xff08;Interactive Python&#xff09;是一个增强版的Python交互式解释器。它在标准Python解释器的基础上添加了许多有用的功能&#xff0c;旨在提高你编程时的效率和体验。IPython的核心特性包括但不限于以下几点&#xff1a; 增强的交互性&#xff1a;IPython提…

18. 查看帖子详情

文章目录 一、建立路由二、开发GetPostDetailHandler三、编写logic四、编写dao层五、编译测试运行 一、建立路由 router/route.go v1.GET("/post/:id", controller.GetPostDetailHandler)二、开发GetPostDetailHandler controller/post.go func GetPostDetailHand…

linux命令深入研究——cat

cat命令&#xff0c;“猫”&#xff0c;可以理解为瞄一眼文件内容&#xff0c;其中可以用重定向符号对文件进行一些修改&#xff0c;如增加&#xff0c;删除文件内容&#xff0c;其命令参数如-n&#xff0c;-s&#xff0c;-b可以输出带有行号的行 如果想要快速删除文件内容&…

Java学习笔记(11)

面向对象进阶 Static 静态变量 所有对象一起共享&#xff0c;就用static修饰 不属于对象&#xff0c;属于类的 可以用 类名.静态变量 “”&#xff1b;赋值 但是 对象.静态变量也可以访问到内容 Static内存图 Student这个类的字节码文件加载到方法区&#xff0c;并在内…

Nacos启动的第一个坑 Request nacos server failed:

前言&#xff1a; 今天&#xff0c;小编启动nacos写微服务的demo,电脑上安装了nacos服务器&#xff0c;管理后台也能正常登录。然后搭建了一个基于springboot的微服务项目&#xff0c;加了依赖、启动类加了注解、配置文件也进行了配置&#xff0c;然后启动项目&#xff0c;启动…

中国城市统计年鉴、中国县域统计年鉴、中国财政统计年鉴、中国税务统计年鉴、中国科技统计年鉴、中国卫生统计年鉴​

统计年鉴是指以统计图表和分析说明为主&#xff0c;通过高度密集的统计数据来全面、系统、连续地记录年度经济、社会等各方面发展情况的大型工具书来获取统计数据资料。 统计年鉴是进行各项经济、社会研究的必要前提。而借助于统计年鉴&#xff0c;则是研究者常用的途径。目前国…

wordpress主题批量修改历史文章标题,文章内容

&#xff1a;​wordpress模板&#xff0c;在我映像中还是比较受欢迎的&#xff0c;至少它该有的插件都是应有尽有&#xff0c;不像帝国cms虽然功能多&#xff0c;但是基本用不上&#xff0c;而且很多会出错。也不像织梦cms漏洞太多&#xff0c;搞的建站期间出现很多其他事情&am…

Linux字符设备驱动开发一

linux字符设备驱动 0 驱动介绍1 字符设备驱动1.1 字符设备相关概念和结构体1.2 实现简单的字符设备模块1.3 创建字符设备1.4 总结 应用程序调用文件系统的API(open、close、read、write) -> 文件系统根据访问的设备类型&#xff0c;调用对应设备的驱动API -> 驱动对硬件进…

『scrapy爬虫』03. 爬取多个页面(详细注释步骤)

目录 1. 分析网页试着拿到多个页面的url2. 抓取250个电影3. start_requests的使用4. 代码规范导库的优化关于重写最终修改后的代码 总结 欢迎关注 『scrapy爬虫』 专栏&#xff0c;持续更新中 欢迎关注 『scrapy爬虫』 专栏&#xff0c;持续更新中 1. 分析网页试着拿到多个页面…

yum安装mysql 数据库tab自动补全

centos7上面没有mysql&#xff0c;它的数据库名字叫做mariadb [rootlocalhost ~]#yum install mariadb-server -y [rootlocalhost ~]#systemctl start mariadb.service [rootlocalhost ~]#systemctl stop firewalld [rootlocalhost ~]#setenforce 0 [rootlocalhost ~]#ss -na…

数字人基础 | 3D手部参数化模型2017-2023

楔子: 2017年年底的泰国曼谷, SIGGRAPH Asia会议上, 来自马普所的 Javier Romero, Dimitrios Tzionas(两人都是 Michael J. Black的学生)发布了事实性的手部参数化模型标准: MANO [1]。 MANO的诞生意味着 Michael J. Black团队在继人体参数化模型 SMPL后, 事实性的将能够表达人…

信息系统项目管理师--沟通管理

IT 项⽬成功有关的最重要的四个因素是&#xff1a;主管层的⽀持、⽤户参与、有经验的项⽬经理和清晰的业务⽬标 项⽬沟通管理是确保及时、正确地产⽣、收集、分发、存储和最终处理项⽬信息所需的过程 项⽬沟通管理由两部分组成&#xff1a;⼀是制定策略&#xff0c;确保沟通对…

GUI编程--PyQt5--QTabWidget

文章目录 组件使用信号样式设置 组件使用 QTabWidget 页签 信号 self._ui Ui_Sub() self._ui.setupUi(right) # 切换tab页 self._ui.tabWidget.currentChanged.connect(self.tab_slot)def tab_slot(self):cur_index self._ui.tabWidget.currentIndex()tab_name self._ui…