【消息中间件】Rabbitmq的基本要素、生产和消费、发布和订阅

原文作者:我辈李想
版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。


文章目录

  • 前言
  • 一、消息队列的基本要素
    • 1.队列:queue
    • 2.交换机:exchange
    • 3.事件:routing_key
    • 4.任务:task
  • 二、生产消费模式
    • 1.安装pika
    • 2.模拟生产者进程
    • 3.模拟消费者进程
    • 4.ACK消息确认机制
    • 5.类的写法
      • (1)新建MyRabbitMQ.py文件
      • (2)基础RabiitMQ
  • 三、发布订阅模式
  • 四、多消息队列
  • 五、异常处理
    • 1. 死信队列


前言

Rabbitmq消息队列,Windows安装RabbitMQ教程


一、消息队列的基本要素

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

消息队列是一种中间件 ,用于在不同的组件或系统之间传递消息(进程间通讯的一种)。 它提供了一种可靠的机制(AMQP)来存储和传递消息,并确保消息的顺序性和可靠性。消息队列需要存储消息。

1.队列:queue

用于接入消息队列的出入口

2.交换机:exchange

用于存储的一种通道

3.事件:routing_key

用于记录的一种标记

4.任务:task

这里的任务就是处理程序,还可能包含回调函数

注:基于我们使用不同的要素组合,分化出了基础的生产消费模式和发布订阅模式。其中只使用队列和任务的方式划为生产消费模式,4个同时使用的方式划为发布订阅模式。

二、生产消费模式

消息队列处理的是进程间通讯问题,生产者和消费者正是2个进程的程序,代表了不同的组件或系统。
我们使用python来实现相关功能,可以通过pika这个三方库来实现。

1.安装pika

pip install pika -i https://pypi.tuna.tsinghua.edu.cn/simple

2.模拟生产者进程

这里的生产者进程可能是一个后端程序、也可能是一个py文件、也可能知识一条触发命令。

# !/usr/bin/env python
import pika

# ######################### 生产者 #########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))
# 声明一个channel,类似数据库打开一个链接
channel = connection.channel()
# 创建一个队列,队列名称叫做hello
channel.queue_declare(queue='hello')
# 向hello这个队列里发送一个Hello World!   exchange:如果当做一个普通队列,就为空
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

3.模拟消费者进程

消费者

# !/usr/bin/env python
import pika

# ########################## 消费者 ##########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))
channel = connection.channel()

# channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # 取一个就关掉的方法
    channel.stop_consuming()
# 去hello队列里拿数据,一但有数据,就执行callback
channel.basic_consume(callback, queue='hello', no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉
channel.start_consuming()

4.ACK消息确认机制

ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将次消息从队列中删除。

生产者

# !/usr/bin/env python
import pika

# ######################### 生产者 #########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))
# 声明一个channel,类似数据库打开一个链接
channel = connection.channel()
# 创建一个队列,队列名称叫做hello
channel.queue_declare(queue='hello')
# 向hello这个队列里发送一个Hello World!   exchange:如果当做一个普通队列,就为空
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消费者

# !/usr/bin/env python
import pika

# ########################## 消费者 ##########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', credentials=credentials))

channel = connection.channel()


# channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # 取值做确认工作
    ch.basic_ack(delivery_tag=method.deliver_tag)


# 去hello队列里拿数据,一但有数据,就执行callback,
# no_ack=Flask必须在取值时做确认工作,否则值不会被取出
channel.basic_consume(callback, queue='hello', no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉
channel.start_consuming()

5.类的写法

这个类使用 pika 库进行与 RabbitMQ 的通信。当你使用 send_message() 或 receive_message() 、consume_messages方法时,Channel 对象必须是打开的。如果没有连接或者通道没有打开,这些方法将引发 ValueError 异常。

(1)新建MyRabbitMQ.py文件

文件包含rabbitmq的类,类中包含连接到RabbitMQ,并在连接对象上创建一个管道,然后就可以使用send_message()receive_message()方法、consume_messages发送和接收消息,接收消息会调用回调方法。

下面是一个带有消费回调的完整 RabbitMQ 类

import pika
import time

class RabbitMQ:
    def __init__(self, host, port, username, password):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.connection = None
        self.channel = None

    def connect(self, timeout=10):
        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(host=self.host,
                                               port=self.port,
                                               credentials=credentials)
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                self.connection = pika.BlockingConnection(parameters)
                self.channel = self.connection.channel()
                return True
            except pika.exceptions.AMQPConnectionError:
                time.sleep(1)
        return False

    def send_message(self, exchange, routing_key, message):
        try:
            self.channel.basic_publish(exchange=exchange,
                                       routing_key=routing_key,
                                       body=message,
                                       properties=pika.BasicProperties(delivery_mode=2))
        except AttributeError:
            raise ValueError("Channel is not open. Call connect() before send_message().")

    def receive_message(self, queue, auto_ack=False):
        try:
            method_frame, properties, body = self.channel.basic_get(queue=queue, auto_ack=auto_ack)
            if method_frame:
                return body.decode('utf-8')
            else:
                return None
        except AttributeError:
            raise ValueError("Channel is not open. Call connect() before receive_message().")

    def consume_messages(self, queue, callback):
        try:
            self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)
            self.channel.start_consuming()
        except AttributeError:
            raise ValueError("Channel is not open. Call connect() before consume_messages().")
    
    def create_queue(self, name):
        try:
            self.channel.queue_declare(queue=name, durable=True)
        except AttributeError:
            raise ValueError("Channel is not open. Call connect() before create_queue().")

    def bind_queue(self, queue, exchange, routing_key):
        try:
            self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)
        except AttributeError:
            raise ValueError("Channel is not open. Call connect() before bind_queue().")

    def close(self):
        try:
            self.connection.close()
        except AttributeError:
            raise ValueError("Connection is not open. Call connect() before close().")

(2)基础RabiitMQ

基于队列_生产

创建RabiitMQ_生产.py文件,内容如下:

from MyRabbitMQ import RabbitMQ

if __name__ == '__main__':
    print('RabbitMQ生产')
    my_host = '127.0.0.1'
    my_username = 'guest'
    my_password = 'guest'
    my_queue = 'hello'
    my_exchange = 'BBB'
    my_routing_key = 'hello'

    rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)
    if rabbitmq.connect():
        rabbitmq.create_queue(my_queue)
        rabbitmq.send_message('', my_queue, message='开始了')
    else:
        print("Failed to connect to RabbitMQ.")

基于队列_消费

from MyRabbitMQ import RabbitMQ

if __name__ == '__main__':
    print('RabbitMQ消费')
    my_host = '127.0.0.1'
    my_username = 'guest'
    my_password = 'guest'
    my_queue = 'hello'
    my_exchange = 'BBB'
    my_routing_key = 'hello'


    def callback(channel, method, properties, body):
        print("Received message: %s" % body.decode('utf-8'))
        channel.basic_ack(delivery_tag=method.delivery_tag)


    rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)
    if rabbitmq.connect():
        rabbitmq.create_queue(my_queue)
        rabbitmq.consume_messages(my_queue, callback)
    else:
        print("Failed to connect to RabbitMQ.")

在此例中,当一个新的消息从名为 my_queue 的队列中接收时,回调函数 callback 将被调用并打印消息内容。

注意:如果你的回调函数需要执行较复杂的操作(例如长时间运行或使用多线程),则你应该确保它是线程安全的,并且在操作完成后调用 ch.basic_ack,这样 RabbitMQ 就知道消息已经被处理并可以将其从队列中删除。

三、发布订阅模式

发布订阅模式的消费者是queue队列,需要绑定exchange和routing_key,实际使用时可能存在一个队列绑定多个routing_key,或多个queue绑定一个routing_key,所以在我们的消费者处理中,需要判断routing_key事件做必要的区分。

基于exchangs交换机的生产者

from MyRabbitMQ import RabbitMQ

if __name__ == '__main__':
    print('RabbitMQ消费')
    my_host = '127.0.0.1'
    my_username = 'guest'
    my_password = 'guest'
    my_queue = 'hello'
    my_exchange = 'BBB'
    my_routing_key = 'hello'


    rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)
    if rabbitmq.connect():
		rabbitmq.send_message(my_exchange, my_routing_key, message='开始了')
    else:
        print("Failed to connect to RabbitMQ.")

基于exchangs交换机的消费者

from MyRabbitMQ import RabbitMQ

if __name__ == '__main__':
    print('RabbitMQ消费')
    my_host = '127.0.0.1'
    my_username = 'guest'
    my_password = 'guest'
    my_queue = 'hello'
    my_exchange = 'BBB'
    my_routing_key = 'hello'


    def callback(channel, method, properties, body):
        print("Received message: %s" % body.decode('utf-8'))
        channel.basic_ack(delivery_tag=method.delivery_tag)


    rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)
    if rabbitmq.connect():
        rabbitmq.create_queue(my_queue)
        # rabbitmq.send_message(my_exchange, my_routing_key, message='开始了')
        rabbitmq.bind_queue(my_queue, my_exchange, my_routing_key)
        rabbitmq.consume_messages(my_queue, callback)
    else:
        print("Failed to connect to RabbitMQ.")

在这里插入图片描述

四、多消息队列

import pika
import random
from retry import retry
def on_message(channel, method_frame, header_frame, body)
    print(method_frame.delivery_tag)
    print(body)
    print(header_frame)
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)

node1 = pika.URLParameters('amqp://node1')
node2 = pika.URLParameters('amqp://node2')
node3 = pika.URLParameters('amqp://node3')
all_endpoints = [node1, node2, node3]

@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter(1, 3)
def consume():
    random.shuffle(all_endpoints)
    connection = pika.BlockingConnection(all_endpoints)
    channel = connection.channel()
    channel.basic_qos(prefetch_count=1)
    channel.queue_declare('recovery-example', durable=False, auto_delete=True)
    channel.basic_consume('recovery-example', on_message)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
        connection.close()
    except pika.excaptions.ConnectionClosedByBroker:
        continue
consume()

五、异常处理

from pika.exceptions import ChannelClosed
from pika.exceptions import ConnectionClosed

    try:
        mq.start_consuming_message()
    except ConnectionClosed as e:
        mq.clear()
        mq.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)
        mq.start_consuming_message()
    except ChannelClosed as e:
        mq.clear()
        mq.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)
        mq.start_consuming_message()

1. 死信队列

死信队列就是备份队列,rabbitMQ有,kafka还没有

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/252220.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

虚拟机Linux(Centos7)安装Docker

如果没有安装虚拟机的&#xff0c;可以参考这篇VMware虚拟机安装Linux操作系统&#xff08;CentOS7&#xff09; 文章目录 0.安装Docker1.CentOS安装Docker1.1.卸载&#xff08;可选&#xff09;如何看自己的虚拟机上是否安装过docker&#xff1f; 1.2.安装docker1.3.启动docke…

【观测宇宙】

这个网站一眼看清整个宇宙。可观测范围一亿光年。 Cocosmos | 掌上宇宙 作者开发介绍&#xff1a;Cocosmos 序章 | 掌中宇宙&#xff0c;浩瀚星海&#xff0c;一眼万年 (qq.com)

Cell Systems | 深度学习开启蛋白质设计新时代

今天为大家介绍的是来自Bruno Correia团队的一篇综述。深度学习领域的迅速进步对蛋白质设计产生了显著影响。最近&#xff0c;深度学习方法在蛋白质结构预测方面取得了重大突破&#xff0c;使我们能够得到数百万种蛋白质的高质量模型。结合用于生成建模和序列分析的新型架构&am…

【深度强化学习】TRPO、PPO

策略梯度的缺点 步长难以确定&#xff0c;一旦步长选的不好&#xff0c;就导致恶性循环 步长不合适 → 策略变差 → 采集的数据变差 → &#xff08;回报 / 梯度导致的&#xff09;步长不合适 步长不合适 \to 策略变差 \to 采集的数据变差 \to &#xff08;回报/梯度导致的&am…

RabbitMQ 消息持久化

默认情况下&#xff0c;exchange、queue、message 等数据都是存储在内存中的&#xff0c;这意味着如果 RabbitMQ 重启、关闭、宕机时所有的信息都将丢失。 RabbitMQ 提供了持久化来解决这个问题&#xff0c;持久化后&#xff0c;如果 RabbitMQ 发送 重启、关闭、宕机&#xff…

信息安全和网络安全的区别

信息安全与网络安全都属于安全领域&#xff0c;但它们的范围和重点不同。 信息安全主要关注数据的保护&#xff0c;包括对敏感数据进行加密、防止数据丢失或泄露等措施。信息安全通常与数据存储、传输和处理相关。 而网络安全更侧重于保护计算机系统和网络免受攻击、病毒、蠕…

C++类与对象 (上)

目录 前言&#xff1a; 类和对象的理解 类的引入 类的定义与使用方式 访问限定符 类的两种定义方式 成员变量的命名规则 类的作用域 类的实例化 类对象模型 计算类对象的大小 类对象的存储方式 this指针 前言&#xff1a; C语言是面向过程的&#xff0c;关注的是过…

我想开发一款跨平台桌面软件,请告诉我qt、electron、tauri、pyqt、flutter分别适合开发哪些跨平台桌面

不同的跨平台桌面开发工具适用于不同的应用场景和开发者需求。以下是关于 Qt、Electron、Tauri、PyQt、Flutter 的简要说明&#xff0c;以帮助你更好地选择适合你项目的工具&#xff1a; Qt: 适用场景&#xff1a; Qt 是一个强大的 C 框架&#xff0c;适用于开发需要高性能和原…

【LeetCode】数组精选17题——双指针、滑动窗口、前缀和

目录 快慢指针&#xff1a; 1. 移动零&#xff08;简单&#xff09; 2. 复写零&#xff08;简单&#xff09; 对撞指针&#xff1a; 1. 两数之和 II - 输入有序数组&#xff08;中等&#xff09; 2. 三数之和&#xff08;中等&#xff09; 3. 有效三角形的个数&#xff…

python语言中“缩进”说法,python中的缩进规则

本篇文章给大家谈谈python语言中“缩进”说法&#xff0c;以及python中的缩进规则&#xff0c;希望对各位有所帮助&#xff0c;不要忘了收藏本站喔。 缩进是Python的灵魂 Python是一门独特的语言&#xff0c;它的代码块是通过缩进&#xff08;Indentation&#xff09;来标记的&…

QT自带打包问题:无法定位程序输入点?metaobject@qsound

文章目录 无法定位程序输入点?metaobjectqsound……检查系统环境变量的配置&#xff1a;打包无须安装qt的文件 无法定位程序输入点?metaobjectqsound…… 在执行release打包程序后&#xff0c;相应的release文件夹下的exe文件&#xff0c;无法打开 如有错误欢迎指出 检查系…

LCR 181. 字符串中的单词反转

解题思路&#xff1a; class Solution {public String reverseMessage(String message) {message message.trim(); // 删除首尾空格int j message.length() - 1, i j;StringBuilder res new StringBuilder();while (i > 0) {while (i >…

如何批量获取CSDN文章数据并进行持久化

自己去看文章数据的话&#xff0c;比较慢&#xff0c;所以一直想通过程序来批量获取CSDN的文章数据&#xff0c;最近研究了一下&#xff0c;发现还是挺简单的&#xff0c;能够直接通过解析json来获取文章数据&#xff0c;跟大家分享一下。 文章目录 一、步骤1、首先我们到自己的…

JavaScript数组分组groupBy

JavaScript 最近发布了一个方法 Object.groupBy&#xff0c;可以对可迭代对象中的元素进行分组。 语法&#xff1a; Object.groupBy(items, callbackFn)items 被分组的可迭代对象&#xff0c;如 Array。 callbackFn 对可迭代对象中的每个元素执行的函数。 举个例子&#…

结构型设计模式(一):门面模式 组合模式

门面模式 Facade 1、什么是门面模式 门面模式&#xff08;Facade Pattern&#xff09;是一种结构型设计模式&#xff0c;旨在为系统提供一个统一的接口&#xff0c;以便于访问子系统中的一群接口。它通过定义一个高层接口&#xff0c;简化了客户端与子系统之间的交互&#xf…

基于Java SSM框架实现图书店仓库进销存管理系统项目【项目源码+论文说明】

基于java的SSM框架实现图书店仓库进销存管理系统演示 摘要 仓库作为储存货物的核心功能之一&#xff0c;在整个仓储中具有非常重要的作用&#xff0c;是社会物质生产的必要条件。良好的仓库布局环境能够对货物进入下一个环节前的质量起保证作用&#xff0c;能够为货物进入市场…

FPGA设计与实战之时钟及时序简介1

文章目录 一、时钟定义二、基本时序三、总结一、时钟定义 我们目前设计的电路以同步时序电路为主,时钟做为电路工作的基准而显得非常重要。 简单的接口电路比如I2C、SPI等,复杂一点接口比如Ethernet的MII、GMII等接口,它们都有一个或多个时钟信号。 那么什么是时钟信号?它…

PADS9.5 : 原图绘图图纸尺寸下修改

原图绘图图纸尺寸下修改 图页边界线也要修改 如果二者选择不一致&#xff1a; 会出现下图所示情况&#xff1a;

Android hwcomposer服务启动流程

Android hwcomposer服务启动流程 客户端 binder远程调用 服务端 surfaceflinger --binder--> hwcomposer .hal文件编译时生成支持binder进程间远程调用通信的cpp文件 在out/soong/.intermediates/hardware/interfaces/graphics/composer/2.1/ 目录下找…