rabbitmq-常见七种消息队列-控制台界面管理-python-实现简单访问

文章目录

    • 1.消息的基本概念
      • 1.1.生产者和消费者
      • 1.2.消息队列(Queue)
      • 1.3.交换机(Exchange)
      • 1.4.消息确认
    • 2.七种队列模式
      • 2.1.简单模式(Hello World)
      • 2.2.工作队列模式(Work queues)
      • 2.3.发布订阅模式(Publish/Subscribe)
      • 2.4.路由模式(Routing)
      • 2.5.主题模式(Topics)
      • 2.6.远程过程调用(RPC)
      • 2.7.发布者确认(Publisher Confirms)
    • 3.rabbitmq控制台管理
      • 3.1.三种基本队列
        • 3.1.2.Classic经典队列-单机环境队列
        • 3.1.3.Quorum仲裁队列-高可用队列
        • 3.1.3.Stream队列-大规模队列
      • 3.2.生产者和消费者
      • 3.3.交换机(exchange)
    • 4.Python访问rabbitmq
      • 4.1.生产者提交消息
      • 4.2.消费者执行消息
    • 5.总结

1.消息的基本概念

1.1.生产者和消费者

生产者(Producer)
消息的创建者。
负责创建和推送数据到消息服务器。

消费者(Consumer)
消息的接收方。
负责接收消息和处理数据。

1.2.消息队列(Queue)

消息队列是RabbitMQ的内部对象,用于存储生产者的消息直到发送给消费者,它是消费者接收消息的地方。

消息队列的重要属性:

持久性
broker重启前都有效。
自动删除
在所有消费者停止使用之后自动删除。
惰性
没有主动声明队列,调用会导致异常。
排他性
一旦启用,声明它的消费者才能使用。

1.3.交换机(Exchange)

交换机用于接收,分配消息。

  1. 生产者要先指定一个routing key,然后将消息发送到交换机。
  2. routing key需要与exchange type和binding key联合使用才能最终生效。
  3. 交换机将消息路由到一个或多个队列中,或丢弃。

交换机包含4中类型: direct, topic, fanout, headers。

direct(直连交换机)
具有路由功能的交换机,绑定到此交换机的时候需要指定一个routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列。先匹配,再投送。Direct Exchange是RabbitMQ的默认交换机模式。这是最简单的模式。它根据routing key全文匹配去寻找队列。

1.4.消息确认

消息确认是指当一个消息从队列中投递给消费者(consumer)后,消费者会通知一下消息代理(broker)。消息确认可以自动,也可以由处理消息的开发者手动执行。当启用消息确认后,消息代理需要收到来自消费者的确认回执后,才完全将消息从队列中删除。

2.七种队列模式

2.1.简单模式(Hello World)

在这里插入图片描述

做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B。
单生产者,单消费者,单队列。

应用场景:

将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人。

2.2.工作队列模式(Work queues)

在这里插入图片描述

在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者。适用于资源密集型任务, 单个消费者处理不过来,需要多个消费者进行处理的场景。单生产者,多消费者,单队列。

应用场景:

一个订单的处理需要10s,有多个订单可以同时放到消息队列, 然后让多个消费者同时并行处理,而不是单个消费者的串行消费。

2.3.发布订阅模式(Publish/Subscribe)

在这里插入图片描述
一次向许多消费者发送消息,将消息将广播到所有的消费者。单生产者,多消费者,多队列。

应用场景:

更新商品库存后需要通知多个缓存和多个数据库。

结构如下:

一个fanout类型交换机扇出两个消息队列,分别为缓存消息队列、数据库消息队列
一个缓存消息队列对应着多个缓存消费者
一个数据库消息队列对应着多个数据库消费者

2.4.路由模式(Routing)

在这里插入图片描述
根据Routing Key有选择地接收消息。多消费者,选择性多队列,每个队列通过routing key全文匹配。发送消息到交换机并且要指定路由键(Routing key) 。消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息。

应用场景:

在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12 promote,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息。

2.5.主题模式(Topics)

在这里插入图片描述
主题交换机方式接收消息,将routing key和模式进行匹配。多消费者,选择性多队列,每个队列通过模式匹配。队列需要绑定在一个模式上。#匹配一个词或多个词,*只匹配一个词。

应用场景:

iphone促销活动可以接收主题为多种iPhone的消息,如iphone12、iphone13等。

2.6.远程过程调用(RPC)

在这里插入图片描述

在远程计算机上运行功能并等待结果。

应用场景:

需要等待接口返回数据,如订单支付。

2.7.发布者确认(Publisher Confirms)

与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。

应用场景:

对于消息可靠性要求较高,比如钱包扣款。

3.rabbitmq控制台管理

打开浏览器。访问 http://127.0.0.1:15672
出现管理页面:
账号:guest
密码:guest

登录后,您将看到 RabbitMQ 的控制台界面。该界面将显示以下几个主要部分:

Overview:概览页面提供了关于 RabbitMQ 节点、队列和交换机等的统计信息。
Connections:连接页面提供了有关当前客户端连接的详细信息。
Channels:通道页面提供了有关当前打开通道的详细信息。
Exchanges:交换机页面提供了有关 RabbitMQ 服务器上已声明的交换机的详细信息。
Queues:队列页面提供了有关 RabbitMQ 服务器上已声明的队列的详细信息。
Admin:管理页面提供了一些高级管理功能,例如添加用户、设置权限和定义策略等。

通过 RabbitMQ 控制台,您可以执行许多操作,例如创建和删除队列、交换机和绑定、查看队列和交换机的详细信息、监视连接和通道等。控制台还提供了一些高级管理功能,例如添加用户、设置权限和定义策略等。

3.1.三种基本队列

队列是具有两个主要操作的顺序数据结构:入队和出队。RabbitMQ中的队列是FIFO(先进先出)。一些队列因为一些特性,即消费者的优先级和重新排队,会影响消费者消费的顺序。
在这里插入图片描述

3.1.2.Classic经典队列-单机环境队列

这个是RabbitMQ最经典的队列类型。在单机环境中,拥有比较高的消息可靠性。
在这里插入图片描述我们在创建队列的时候,根据上图可以看到,经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。

Durability:是否持久化,可选项为持久化(Durable)和 临时(Transient)。Durable相对消息安全性更高。但是同时需要有更多的IO操作,所以生产和消费消息的性能,相比Transient会比较低。
Auto delete:是否自动删除,如果选择是,则消息会被其中一个消费者消费之后,队列会自动销毁,其他消费者也会断开连接。一般不会自动删除。

3.1.3.Quorum仲裁队列-高可用队列

仲裁队列,是RabbitMQ从3.8.0版本引入的新的队列类型,整个3.8.X版本,也是针对仲裁队列进行完善和优化。Quorum相比Classic在分布式环境下对消息的可靠性保障更高。官方文档中表示,未来会使用Quorum代替Classic。

在这里插入图片描述
Quorum是基于Raft一致性协议实现的一种新型的分布式消息队列,他实现了持久化,多备份的FIFO队列,主要就是针对RabbitMQ的镜像模式设计的。简单理解就是Quorum队列中的消息需要有集群中多半节点同意确认后,才会写入到队列中。这种队列类似于RocketMQ当中的DLedger集群。这种方式可以保证消息在集群内部不会丢失。同时,Quorum是以牺牲很多高级队列特性为代价,来进一步保证消息在分布式环境下的高可靠。从整体功能上来说,Quorum队列是在Classic经典队列的基础上做减法,因此对于RabbitMQ的长期使用者而言,其实是会影响使用体验的。他与普通队列的区别:

特性ClassicQuorum
非持久化队列(Non-durable queues支持不支持
独占队列(Exclusivity支持不支持
每条消息的持久化(Per message persistence每条消息总是
会员变更(Membership changes自动手动
消息TTLMessage TTL支持支持(3.10版本开始)
队列TTLQueue TTL支持支持
队列长度限制(Queue length limits支持支持
懒加载(Lazy behaviour支持始终
消息优先级(Message priority支持不支持
消费者优先级(Consumer priority支持支持
死信交换(Dead letter exchanges支持支持
毒消息处理(Poison message handling不支持支持
全局QosGlobal QoS Prefetch支持不支持

Exclusivity表示独占队列,即表示队列只能由声明该队列的Connection连接来进行使用,包括队列创建、删除、收发消息等,并且独占队列会在声明该队列的Connection断开后自动删除。其中有个特例就是这个Poison Message。所谓毒消息是指消息一直不能被消费者正常消费(可能是由于消费者失败或者消费逻辑有问题等),就会导致消息不断的重新入队,这样这些消息就成为了毒消息。这些读消息应该有保障机制进行标记并及时删除。
Quorum队列会持续跟踪消息的失败投递尝试次数,并记录在x-delivery-count这样一个头部参数中。然后,就可以通过设置 Delivery limit参数来定制一个毒消息的删除策略。当消息的重复投递次数超过了Delivery limit参数阈值时,RabbitMQ就会删除这些毒消息。当然,如果配置了死信队列的话,就会进入对应的死信队列。
Quorum队列更适合于 队列长期存在,并且对容错、数据安全方面的要求比低延迟、不持久等高级队列更能要求更严格的场景。例如 电商系统的订单,引入MQ后,处理速度可以慢一点,但是订单不能丢失。

也对应以下一些不适合使用的场景:

一些临时使用的队列:比如transient临时队列,exclusive独占队列,或者经常会修改和删除的队列。
对消息低延迟要求高: 一致性算法会影响消息的延迟。
对数据安全性要求不高:Quorum队列需要消费者手动通知或者生产者手动确认。
队列消息积压严重 : 如果队列中的消息很大,或者积压的消息很多,就不要使用Quorum队列。Quorum队列当前会将所有消息始终保存在内存中,直到达到内存使用极限。

3.1.3.Stream队列-大规模队列

Stream队列是RabbitMQ自3.9.0版本开始引入的一种新的数据队列类型,也是目前官方最为推荐的队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。
在这里插入图片描述

Stream队列的核心是以append-only只添加的日志来记录消息,整体来说,就是消息将以append-only的方式持久化到日志文件中,然后通过调整每个消费者的消费进度offset,来实现消息的多次分发。这种队列提供了RabbitMQ已有的其他队列类型不太好实现的四个特点:

大规模分发(large fan-outs)

当想要向多个订阅者发送相同的消息时,以往的队列类型必须为每个消费者绑定一个专用的队列。如果消费者的数量很大,这就会导致性能低下。而Stream队列允许任意数量的消费者使用同一个队列的消息,从而消除绑定多个队列的需求。

消息回溯(Replay/Time-travelling)

RabbitMQ已有的这些队列类型,在消费者处理完消息后,消息都会从队列中删除,因此,无法重新读取已经消费过的消息。而Stream队列允许用户在日志的任何一个连接点开始重新读取数据。

高吞吐性能(Throughput Performance)

Stream队列的设计以性能为主要目标,对消息传递吞吐量的提升非常明显。

大日志(Large logs)

RabbitMQ一直以来有一个让人诟病的地方,就是当队列中积累的消息过多时,性能下降会非常明显。但是Stream队列的设计目标就是以最小的内存开销高效地存储大量的数据。

3.2.生产者和消费者

生产者是消息的提供方,消费者是消息的执行方,代表的是行为的不同,rabbitmq通过登录来验证不同的用户,不同的行为来代表不同人的特征。

3.3.交换机(exchange)

在RabbitMQ中,所有的producer都不会直接把message发送到queue中,甚至producer都不知道message在发出后有没有发送到queue中,事实上,producer只能将message发送给exchange,由exchange来决定发送到哪个queue中。

exchange的一端用来从producer中接收message,另一端用来发送message到queue,exchange的类型规定了怎么处理接收到的message,发布订阅模式使用到的exchange类型为 fanout ,这种exchange类型非常简单,就是将接收到的message广播给已知的(即绑定到此exchange的)所有consumer。

当然,如果不想使用特定的exchange,可以使用 exchange=‘’ 表示使用默认的exchange,默认的exchange会将消息发送到 routing_key 指定的queue,可以参考工作(任务)队列模式和Hello world模式。

4.Python访问rabbitmq

在python语言环境下,使用pika库来访问访问操作rabbitmq中间件。

pip install pika

在这里插入图片描述

4.1.生产者提交消息

# coding=utf-8
# producer
import pika

# 指定远程 rabbitmq 的用户名密码并创建凭证
credentials = pika.PlainCredentials(username="guest", password="guest")

# 1. 创建 connect 连接
connect = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials))

# 2. 在 connect 上创建一个 channel
channel = connect.channel()

# 3. 在 channel 上声明交换器 exchange
channel.exchange_declare(exchange='hello', exchange_type='direct', passive=False, durable=True, auto_delete=False)

# 4. 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue='hello')

# 5. 通过键 'world' 将队列和交换器绑定
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')

# 6. 创建纯文本消息
msg_props = pika.BasicProperties()
msg_props.content_type = 'text/plain'

# 7. 将消息发送到 RabbitMQ
message = 'quit'
channel.basic_publish(exchange='hello',  routing_key='world',  properties=msg_props, body=message)

# 8. 关闭通道
channel.close()

# 9. 当生产者发送完消息后,可选择关闭连接
connect.close()

4.2.消费者执行消息

#!/usr/bin/env python
# coding=utf-8

# consumer

import pika

# 指定远程 rabbitmq 的用户名密码并创建凭证
credentials = pika.PlainCredentials(username="guest", password="guest")

# 1. 创建 connect 连接
connect = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials))

# 2. 在 connect 上创建一个 channel
channel = connect.channel()

# 3. 在 channel 上声明交换器 exchange
channel.exchange_declare(exchange='hello', exchange_type='direct', passive=False, durable=True, auto_delete=False)

# 4. 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue='hello')

# 5. 通过键 'world' 将队列和交换器绑定
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')

# 6. 定义一个回调函数,用来接收生产者发送的消息
'''

在 Python3 中,bytes 和 str 的互相转换方式是
str.encode('utf-8')
bytes.decode('utf-8')

'''
def callback(channel, method, properties, body):
    # 消息确认
    channel.basic_ack(delivery_tag=method.delivery_tag)

    if body.decode('utf-8') == "quit":
        # 停止消费,并退出
        channel.basic_cancel(consumer_tag='hello-consumer')
        channel.close()
        connect.close()
    else:
        print("msg is {}".format(body))
        # print("msg type {}".format(type(body)))
        # print("msg eval after type {}".format(type(eval(body))))


# 7. 消费者消费
channel.basic_consume('hello', callback,  auto_ack=False)

# 8. 开始循环取消息
channel.start_consuming()

5.总结

通过使用rabbitmq技术,可以实现生产者和消费者模式,并实现两者的解耦,生产者负责通过交换机将数据存入队列,而消费者从队列中取数据,并执行相应的消息。可以用在服务器复杂耗时任务的并行计算中使用,与常用的web服务器(如apache等)解耦,提高服务器计算资源的利用效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/254253.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

jmeter之HTTP代理服务器脚本

前言:没有接口文档的话,可用http代理服务器录制脚本 1.设置客户端的代理(本机的代理) 计算机右键属性->搜索代理服务器设置 代理输入jmeter所在电脑的ip和8888端口。 2.创建http代理服务器 测试计划->添加->非测试元件…

店面销售技巧培训之如何提升店面销售技巧

如何提升店面销售技巧 店面销售是零售业中非常重要的环节,而销售技巧则是决定销售成功与否的关键因素。提升店面销售技巧,不仅可以提高销售业绩,还可以增强客户满意度,提升品牌形象。本文将介绍一些提升店面销售技巧的方法&#…

vuepress-----25、右侧目录

# 25、vuepress 右侧目录 https://github.com/xuek9900/vuepress-plugin-right-anchor vuepress-plugin-right-anchor English |中文 在用 Vuepress 2.x 编写的文档页面右侧添加 锚点导航栏 # 版本 2.x.x -> Vuepress 2.x -> npm next -> master 分支0…

基于Django框架实现的图像相似性搜索网页应用项目源码+数据库,上传图片到网站,基于预训练的 VGG16 模型提取图像特征

项目描述 一个基于Django框架实现的图像相似性搜索网页应用。用户可以通过上传图片到网站,然后该项目会基于预训练的 VGG16 模型提取图像特征,并利用已有图库中的图像特征与上传图片的特征进行比较,计算相似度并呈现给用户。 项目运行效果截…

设计模式——责任链模式(行为模式)

引言 责任链模式是一种行为设计模式, 允许你将请求沿着处理者链进行发送。 收到请求后, 每个处理者均可对请求进行处理, 或将其传递给链上的下个处理者。 问题 假如你正在开发一个在线订购系统。 你希望对系统访问进行限制, 只允…

【网络安全】-Linux操作系统—CentOS安装、配置

文章目录 准备工作下载CentOS创建启动盘确保硬件兼容 安装CentOS启动安装程序分区硬盘网络和主机名设置开始安装完成安装 初次登录和配置更新系统安装额外的软件仓库安装网络工具配置防火墙设置SELinux安装文本编辑器配置SSH服务 总结 CentOS是一个基于Red Hat Enterprise Linu…

基于html5的演唱会购票系统的设计与实现论文

基于html5的演唱会购票系统的设计与实现 摘要 随着信息互联网购物的飞速发展,一般企业都去创建属于自己的电商平台以及购物管理系统。本文介绍了基于html5的演唱会购票系统的设计与实现的开发全过程。通过分析企业对于基于html5的演唱会购票系统的设计与实现的需求…

13.仿简道云公式函数实战-逻辑函数-NOT

1. NOT函数 NOT 函数可用于对其参数的逻辑求反,当逻辑为 true 时,返回结果 false;当逻辑为 false 时,返回结果 true。 2. 函数用法 NOT(logical) 3. 函数示例 1)NOT(A),表示如果 A 为 true 时&#xf…

惯性导航基础知识学习----01惯性器件相关

🌈武汉大学惯性导航课程合集是入门惯导的精品课程~ 作为导航路上的鼠鼠我,要开始学习惯性导航了~ 需要达到的要求是大致了解惯导的原理等~ 后期会陆续更新惯导相关的知识和笔记等~ 🐬 本blog为 武汉大学惯性导航课程 的记录~ 感谢团队提供的开…

提升广东省水泥行业效率的MES解决方案

广东省水泥行业作为我国重要的基础建设材料生产领域之一,提高其生产效率和降低能源消耗具有重要意义。而随着技术的发展,我国的MES系统技术发展,通过引入MES系统成为了实现降本增效目标的必要且有效的手段。MES是一种基于计算机技术的管理工具…

argparse --- 命令行选项、参数和子命令解析器详解

一、argparse简介 argparse模块提供了非常方便的命令行参数解析功能,能够大大简化命令行程序的开发。 现在的大型项目中都会运用argparse来管理项目中涉及的参数,在使用命令行时更好地定义模型参数。 argparse定义四个步骤: 导入argparse包…

条款5:了解c++默默编写并调用了哪些函数

如果你不自己声明,编译器会替你声明(编译器版本的)拷贝构造函数、拷贝赋值运算符和析构函数。此外,如果你没有声明任何构造函数,编译器会为你声明一个默认构造函数。 class Empty{};本质上和写成下面这样是一样的: c…

LLM之Agent(六)| 使用AutoGen、LangChian、RAG以及函数调用构建超级对话系统

本文我们将尝试AutoGen集成函数调用功能。函数调用最早出现在Open AI API中,它允许用户调用外部API来增强系统的整体功能和效率。例如,在对话过程中根据需要调用天气API。 函数调用和Agent有各种组合,在这里我们将通过函数调用调用RAG检索增强…

SpringBoot 3.2.0 版本 mysql 依赖下载错误

最近想尝试一下最新的 SpringBoot 项目,于是将自己的开源项目进行了一些升级。 JDK 版本从 JDK8 升级至 JDK17。SpringBoot 版本从 SpringBoot 2.7.3 升级到 SpringBoot 3.2.0 其中 JDK 的升级比较顺利,毕竟 JDK 的旧版本兼容性一直非常好。 但是在升级…

红酒为何会变蓝?这是什么原因?

有的朋友发现红酒酒渍当天没有清洗,第二天会变蓝。于是非常好奇,明明红色的葡萄酒,怎么会变蓝呢? 葡萄酒不但含有酒精,还含有丰富的天然色素花青素。就像我们平时在家煮紫薯粥,加一点小苏打明明紫红色的粥就…

C++计算(a+b)*(c-b)的值 2023年9月c++一级 电子学会中小学生软件编程C++等级考试一级真题答案解析

目录 C计算(ab)*(c-b)的值 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 C计算(ab)*(c-b)的值 2023年9月 C编程等级考试一级编程题 一、题目要求 1、编程实现 给定3个整数a、b、c,计算表达…

爬楼梯算法

计算跳到n阶的跳法总数 package com.zxj.algorithm.动态规划;import lombok.extern.slf4j.Slf4j;import java.util.Arrays;/*** 递归函数 f(n): 计算跳到n阶的跳法总数。* <p>* f(0) 0,* f(1) 1,* f(2) 2,* f(n) f(n-1) f(n-2)*/ Slf4j public class 爬楼梯 {/*** …

【MATLAB】数据拟合第12期-基于高斯核回归的拟合算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 基于高斯核回归的拟合算法是一种处理回归问题的机器学习方法。以下是该算法的简单介绍&#xff1a; 核心思想&#xff1a;高斯核回归的核心思想是利用高斯核函数对数据点进行非线性映射&a…

SpringBoot已经禁掉了循环依赖!

还在问循环依赖嘛&#xff1f;SpringBoot已经禁掉了循环依赖&#xff01; 首发2023-12-18 11:26yuan人生 如果现在面试时还有人问你循环依赖&#xff0c;你就这样怼他&#xff1a;循环依赖是一种代码质量低下的表现&#xff0c;springboot2.6之后的版本已经默认禁用了。 Spr…

【MySQL】数据库基础入门 安装MySQL

目录 介绍&#xff1a; 安装MySQL: 设置 root 账号密码 2.配置环境变量 2.找到 Path 系统变量, 点击 "编辑" 介绍&#xff1a; MySQL是一个开源的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;它是一种用于管理和存储数据的软件。 安装MySQL: …