RabbitMQ基础编程模型及详细使用

目录

RabbitMQ基础编程模型

引入依赖

创建连接,获取Channel

 声明Exchange-可选

声明queue

声明Exchange与Queue的绑定关系-可选

Producer根据应用场景发送消息到queue

Consumer消费消息

Consumer主要有两种消费方式

1、被动消费模式

2、主动消费模式

完成以后关闭连接,释放资源


RabbitMQ基础编程模型

       在rabbitmq中各种消费模式都对应比较统一的编程模型。

引入依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>
创建连接,获取Channel
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
factory.setPort(HOST_PORT);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VIRTUAL_HOST);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

​       一般情况下,一个客户端里只创建一个Channel就可以了,因为一个Channel只要不关闭,是可以一直复用的。但是如果你想要创建多个Channel,要注意一下Channel冲突的问题。

       在创建channel时,可以在createChannel方法中传入一个分配的int参数channelNumber。这个ChannelNumber就会作为Channel的唯一标识。而RabbitMQ防止ChannelNumber重复的方式是:如果对应的Channel没有创建过,就会创建一个新的Channel。但是如果ChannelNumber已经创建过一个Channel了,这时就会返回一个null。


 声明Exchange-可选
channel.exchangeDeclare(String exchange, String type, boolean durable
, boolean autoDelete,Map<String, Object> arguments) throws IOException;

 以上方法的参数解释:

1. exchange(String)
       交换机的名称,用于标识消息应该被发送到哪个交换机。在发布消息时,生产者将消息发送到交换机,然后由交换机路由到一个或多个队列。
2. type(String)
交换机的类型,指定交换机的消息分发规则。常见的类型有:
        direct:直连交换机,根据消息的路由键将消息发送到与之绑定的队列。
        topic:主题交换机,使用通配符匹配规则进行消息路由。
        fanout:广播交换机,将消息发送到与之绑定的所有队列。
        headers:头部交换机,根据消息的头部信息进行路由。
3. durable(boolean)
       表示交换机是否是持久化的。如果设置为 true,交换机将在服务器重启后仍然存在,持久化到磁盘上。如果设置为 false,交换机将在服务器重启后被删除。
4. autoDelete(boolean)
       表示交换机是否是自动删除的。如果设置为 true,交换机将在至少一个队列或交换机与其绑定之后没有消费者时自动删除。如果设置为 false,交换机将一直存在,直到被显式删除。
5. arguments(Map<String, Object>)
       用于设置交换机的其他参数,以键值对的形式传入。例如,可以通过该参数设置交换机的一些额外属性或配置。

​       Exchange在消息收发过程中是一个可选的步骤,如果要使用就需要先进行声明。在声明Exchange时需要注意,如果Broker上没有对应的Exchange,那么RabbitMQ会自动创建一个新的交换机。但是如果Broker上已经有了这个Exchange,那么你声明时的这些参数需要与Broker上的保持一致。如果不一致就会报错。

 对比控制台参数,我们可以发现非常相似。


声明queue
channel.queueDeclare(String queue, boolean durable
, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);

以上方法的参数解释:

1.queue(String)
       队列的名称,用于标识消息将被发送到哪个队列。在消息发送时,生产者可以将消息发送到队列中,然后由队列分发给消费者。
2.durable(boolean)
       表示队列是否是持久化的。如果设置为 true,队列将在服务器重启后仍然存在,持久化到磁盘上。如果设置为 false,队列将在服务器重启后被删除。
3.exclusive(boolean)
       表示队列是否是排他的(exclusive)。如果设置为 true,则只有声明这个队列的连接(connection)可以使用它,一旦连接关闭,队列将被删除。排他队列通常用于一次性使用的临时队列。
4.autoDelete(boolean)
       表示队列是否是自动删除的。如果设置为 true,队列将在至少一个消费者连接后没有再被使用时自动删除。如果设置为 false,队列将一直存在,直到被显式删除。
5.arguments(Map<String, Object>)
       用于设置队列的其他参数,以键值对的形式传入。例如,可以通过该参数设置队列的最大长度、最大优先级等额外属性或配置。

       这是应用开发过程中必须要声明的一个组件。与Exchange一样,如果你声明的Queue在Broker上不存在,RabbitMQ会创建一个新的队列。但是如果Broker上已经有了这个队列,那么声明的属性必须和Broker上的队列保持一致,否则也会报错。

        声明Queue时,同样大部分的参数是可以从管理平台看到的。比如Durability,AutoDelete以及后面的arguments参数可以传哪些参数,都可以从页面上看到。

       Queue与Exchange不同的是, 队列类型并没有在API中体现。这是因为不同类型之间的Queue差距是很大的,无法用统一的方式来描述不同类型的队列。比如对于Quorum和Stream类型,根本就没有Durability和AutoDelete属性,他们的消息默认就是会持久化的。后面的属性参数也会有很大的区别。

​       唯一有点不同的是队列的Type属性。在客户端API中,目前并没有一个单独的字段来表示队列的类型。只能通过后面的arguments参数来区分不同的队列。如果要声明一个Quorum队列,则只需要在后面的arguments中传入一个参数,x-queue-type,参数值设定为quorum

Map<String,Object> params = new HashMap<>();
params.put("x-queue-type","quorum");
//声明Quorum队列的方式就是添加一个x-queue-type参数,指定为quorum。默认是classic
channel.queueDeclare(QUEUE_NAME, true, false, false, params);

注意:对于Quorum类型,durable参数就必须是true了,设置成false的话,会报错。同样,exclusive参数必须设置为false

​ 如果要声明一个Stream队列,则 x-queue-type参数要设置为 stream

        Map<String,Object> params = new HashMap<>();
        params.put("x-queue-type","stream");
        params.put("x-max-length-bytes", 20_000_000_000L); // maximum stream size: 20 GB
        params.put("x-stream-max-segment-size-bytes", 100_000_000); // size of segment files: 100 MB
        channel.queueDeclare(QUEUE_NAME, true, false, false, params);

注意:durable参数必须是true,exclusive必须是false。x-max-length-bytes 表示日志文件的最大字节数。x-stream-max-segment-size-bytes 每一个日志文件的最大大小。这两个是可选参数,通常为了防止stream日志无限制累计,都会配合stream队列一起声明。


声明Exchange与Queue的绑定关系-可选
channel.queueBind(String queue, String exchange, String routingKey) throws IOException;

以上方法参数解释

1. queue(String)
       队列的名称。这是需要绑定到交换机的队列的名字。该队列应该已经通过 queueDeclare 方法声明过了,或者是一个已经存在的队列。
2. exchange(String)
       交换机的名称。这是消息将要发送到的交换机的名字。该交换机应该已经通过 exchangeDeclare 方法声明过了,或者是一个已经存在的交换机。
3. routingKey(String)
       路由键。它是一个标记,用于确定消息如何路由到队列。对于不同类型的交换机,路由键的作用略有不同:
       对于直连交换机(Direct Exchange),消息会被发送到路由键完全匹配的队列。
       对于主题交换机(Topic Exchange),路由键可以包含通配符,用于匹配多个队列。
       对于广播交换机(Fanout Exchange),路由键通常被忽略,因为消息会发送到所有绑定的队列。
       对于头部交换机(Headers Exchange),路由键通常不起作用,路由决策是基于消息头部的键值对。

      如果我们声明了Exchange和Queue,那么就还需要声明Exchange与Queue的绑定关系Binding。有了这些Binding,Exchange才可以知道Producer发送过来的消息将要分发到哪些Queue上。这些Binding涉及到消息的不同分发逻辑,与Exchange和Queue一样,如果Broker上没有建立绑定关系,那么RabbitMQ会按照客户端的声明,创建这些绑定关系。但是如果声明的Binding存在了,那么就需要与Broker上的保持一致。​ 另外,在声明Binding时,还可以传入两个参数, routingKey和props。这两个参数都是跟Exchange的消息分发逻辑有关。


Producer根据应用场景发送消息到queue
channel.basicPublish(String exchange, String routingKey
, BasicProperties props,message.getBytes()) ;

以上方法参数解释

1. exchange(String)
       交换机的名称。指定要将消息发送到哪个交换机。消息将被发送到该交换机,然后根据交换机的类型和绑定规则路由到一个或多个队列。
2. routingKey(String)
       路由键。用于指定消息在交换机和队列之间的路由规则。根据不同的交换机类型,路由键的作用也有所不同。例如,对于直连交换机(Direct Exchange),消息将被发送到与路由键完全匹配的队列。
3. props(BasicProperties)
       消息的基本属性。这是一个 BasicProperties 对象,用于设置消息的一些基本属性,如消息的持久性、优先级、过期时间等。通常可以使用 MessageProperties 工具类来创建 BasicProperties 对象。
4. message.getBytes()
       消息的内容,以字节数组的形式表示。这是要发送的实际消息内容。

       这其中Exchange如果不需要,传个空字符串就行了。routingKey跟Exchange的消息分发逻辑有关。关于props参数,可以传入以下消息相关的属性。

​ props的这些配置项,可以用RabbitMQ中提供的一个Builder对象来构建。

  AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); 
  //对应页面上的Properties部分,传入一些预定的参数值。
  builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());
  builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());
  AMQP.BasicProperties prop = builder.build();

       在发送消息时要注意一下消息的持久化问题。MessageProperties.PERSISTENT_TEXT_PLAIN是RabbitMQ提供的持久化消息的默认配置。而RabbitMQ中消息是否持久化不光取决于消息,还取决于Queue。通常为了保证消息安全,会将Queue和消息同时声明为持久化。


Consumer消费消息

​       定义消费者,消费消息进行处理,并向RabbitMQ进行消息确认。确认了之后就表明这个消息已经消费完了,否则RabbitMQ还会继续发起重试。

Consumer主要有两种消费方式
1、被动消费模式

       Consumer等待rabbitMQ 服务器将message推送过来再消费。一般是启一个一直挂起的线程来等待。     

channel.basicConsume(String queue, boolean autoAck, Consumer callback);

以上方法参数解释

1.1 queue(String)
      要消费的队列的名称。指定消费者将从哪个队列接收消息。该队列应该已经存在,可以通过 queueDeclare 方法进行声明。
1.2 autoAck(boolean)
      是否自动确认消息。如果设置为 true,表示一旦消费者收到消息,就自动确认(acknowledge)消息。这意味着消息一旦被发送到消费者,就会从队列中删除。如果设置为 false,则需要手动调用 channel.basicAck 来确认消息。
1.3 callback(Consumer)
       一个实现了 Consumer 接口的对象,用于处理接收到的消息。Consumer 接口中定义了用于处理消息的方法,主要是 handleDelivery 方法,该方法会在收到消息时被调用。

public interface Consumer {
    void handleConsumeOk(String consumerTag);
    void handleCancelOk(String consumerTag);
    void handleCancel(String consumerTag);
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
    void handleRecoverOk(String consumerTag);
    void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;
}

       在 handleDelivery 方法中,可以编写具体的业务逻辑来处理接收到的消息。Envelope 包含了有关消息的元数据,properties 包含了消息的属性,body 包含了消息的内容。

2、主动消费模式

Comsumer主动到rabbitMQ服务器上去拉取messge进行消费。

GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck);

        autoAck为true则表示消息被Consumer消费成功后,后续就无法再消费了。而如果autoAck设置为false,就需要在处理过程中手动去调用channel的basicAck方法进行应答。如果不应答的话,这个消息同样会继续被Consumer重复处理。所以这里要注意,如果消费者一直不对消息进行应答,那么消息就会不断的发起重试,这就会不断的消耗系统资源,最终造成服务宕机。但是也要注意,如果autoAck设置成了true,那么在回调函数中就不能再手动进行ack。重复的ack会造成Consumer无法正常消费更多的消息。


完成以后关闭连接,释放资源
channel.close(); 
conection.clouse();

        用完之后主动释放资源。如果不主动释放的话,大部分情况下,过一段时间RabbitMQ也会将这些资源释放掉,但是这就需要额外消耗系统资源。(如果是消费者被动消费则不需要关闭,因为需要一直等待消息推送过来)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/355050.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

sqli-labs闯关

目录 1.安装靶场2.了解几个sql常用知识2.1联合查询union用法2.2MySQL中的通配符&#xff1a;2.3常用函数2.4数据分组 3.mysql中重要的数据库和表4.开始闯关4.1 Less-14.1.1 首先进行一次常规的注入4.1.2 深入解析 1.安装靶场 1.首先推荐使用github下载靶场源码 https://githu…

内网安全:PTH PTK PTT

目录 实验所用网络拓朴图 网络环境说明​​​​​​​ LM认证 NTLM认证 NTLM Hash Kerberos认证 TGT票据 服务票据 Windows系统密码存储 域控制器 - 用户登录 域用户 本地用户 域用户和本地管理员 用户登录 Mimikatz抓取密码来源 域内一台主机上可以得到非本地用…

js实现贪吃蛇

文章目录 实现方法_11实现效果2 实现步骤2.1 移动场地2.2 游戏难度2.3 造蛇和食物2.4 蛇的移动2.5 产生食物的随机位置 3 全部代码 实现方法_21 实现效果2实现想法2.1 蛇的存储 实现方法_1 1实现效果 2 实现步骤 html部分忽略&#xff0c;布局写的太辣眼了 2.1 移动场地 用的表…

遥感的CCDC连续变化监测的qgis插件

简介 今天我逛GitHub的时候&#xff0c;看到一个比较有意思的插件:CCD-Plugin&#xff0c;记录一下。 CCD-Plugin是一个qgis插件&#xff0c;它使用 Google Earth Engine 获取 Landsat 或 Sentinel2 数据集&#xff0c;并运行连续变化检测 (CCDC) 算法来分析给定点的多年时间序…

【C++】一题掌握空指针

今天看见一道面试题&#xff0c;比较有意思&#xff0c;这一分享出来&#xff1a; 1.下面程序能编译通过吗&#xff1f; 2.下面程序会崩溃吗&#xff1f;在哪里崩溃 class A {public:void PrintA(){cout<<_a<<endl;}void Show(){cout<<"Show()"&…

(自用)learnOpenGL学习总结-高级OpenGL-模板测试

模板测试 模板测试简单来说就是一个mask&#xff0c;根据你的mask来保留或者丢弃片段。 那么可以用来显示什么功能呢&#xff1f;剪切&#xff0c;镂空、透明度等操作。 和深度缓冲的关系是&#xff1a; 先片段着色器&#xff0c;然后进入深度测试&#xff0c;最后加入模板测…

Linux第37步_解决“Boot interface 6 not supported”之问题

在使用USB OTG将“自己移植的固件”烧写到eMMC中时&#xff0c;串口会输出“Boot interface 6 not supported”&#xff0c;发现很多人踩坑&#xff0c;我也一样。 见下图&#xff1a; 解决办法&#xff1a; 1、打开终端 输入“ls回车”&#xff0c;列出当前目录下所有的文件…

Centos7 升级Docker 至最新版本

卸载旧版本的Docker yum remove docker docker-client docker-client-latest docker-common docker-latest docker-latest-logrotate docker-logrotate docker-engine 安装需要的软件包 yum install -y yum-utils device-mapper-persistent-data lvm2 添加Docker的yum源 #yu…

防范[myers@airmail.cc].mkp攻击:解密[myers@airmail.cc].mkp勒索病毒的方法

引言&#xff1a; 随着科技的迅猛发展&#xff0c;网络安全问题日益突出&#xff0c;而勒索病毒也成为当前互联网威胁中的一大焦点。其中&#xff0c;[datastorecyberfear.com].mkp [hendersoncock.li].mkp [hudsonLcock.li].mkp[myersairmail.cc].mkp勒索病毒以其强大的加密能…

QT学习日记 | 初始QT

目录 一、创建QT文件 二、目录结构讲解 1、.pro文件 2、源文件与头文件 3、编译运行 4、界面文件 三、梦开始的地方&#xff08;Hello World&#xff01;&#xff09; 1、代码方式 2、拖拽方式 四、Qt中的“容器” 五、Qt的对象树机制 1、对象树的引入 2、对象树…

Java 的文件类的学习总结

目录 一、File 的创建 二、File 类的常用方法 一、File 的创建 二、File 类的常用方法

开始学习第二十五天(番外)

今天分享一下写的小游戏啦 头文件game.h #include<stdio.h> #include<time.h> #include<stdlib.h> #define H 3 #define L 3 void InitBoard(char Board[H][L], int h, int l); void DisplayBoard(char Board[H][L], int h, int l); void playermove(cha…

【开源】基于JAVA+Vue+SpringBoot的智慧家政系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 查询家政服务4.2 新增单条服务订单4.3 新增留言反馈4.4 小程序登录4.5 小程序数据展示 五、免责说明 一、摘要 1.1 项目介绍 基于微信小程序JAVAVueSpringBootMySQL的智慧家政系统&#xff0…

OpenCV-29 自适应阈值二值化

一、引入 在前面的部分我们使用的是全局阈值&#xff0c;整幅图像采用同一个数作为阈值。当时这种方法并不适应于所有情况。尤其是当同一幅图像上的不同部分具有不同的亮度时。这种情况下我们需要采用自适应阈值。此时的阈值时根据图像上的每一个小区域计算与其对应的阈值。因此…

Less-1(sqlmap自动注入攻击)--sqli

环境准备 打开火狐浏览器&#xff0c;进入sqli第一关的页面 工具准备 sqlmap 参数解释 -u URL 指定目标URL进行注入测试。--dataDATA指定POST请求的数据进行注入测试--cookieCOOKIE指定用于身份验证的cookie进行注入测试-p PARAMETER指定要测试的参数--levelLEVEL设置测试的深…

微信小程序开发 逐级选择地区

1.需求 微信小程序开发,逐级选择地区&#xff08;市、区县、街道、社区、网格&#xff09;&#xff0c;选择每一级然后展示下一级数据。 微信小程序逐级选择 2. 完整代码 2.1. 选择界面 2.1.1. selectArea.wxml <text bindtap"selectGrid">{{gridName}}</…

Java技术栈 —— Hadoop入门(二)

Java技术栈 —— Hadoop入门&#xff08;二&#xff09; 一、用MapReduce对统计单词个数1.1 项目流程1.2 可能遇到的问题1.3 代码勘误1.4 总结 一、用MapReduce对统计单词个数 1.1 项目流程 (1) 上传jar包。 (2) 上传words.txt文件。 (3) 用hadoop执行jar包的代码&#xff0c;…

go语言基础之泛型

1.泛型 泛型是一种独立于所使用的特定类型的编写代码的方法。使用泛型可以编写出适用于一组类型中的任何一种的函数和类型。 1.1 为什么需要泛型 func reverse(s []int) []int {l : len(s)r : make([]int, l)for i, e : range s {r[l-i-1] e}return r }fmt.Println(reverse…

ACL、VLAN、NAT笔记

一、ACL ---访问控制列表 1.ACL的作用 1&#xff0c;访问控制&#xff1a;在路由器流量流入或流出的接口上&#xff0c;匹配流量&#xff0c;然后 执行设定好的动作。 ---- permit 允许 , deny 拒绝 2&#xff0c;抓取感兴趣流&#xff1a;ACL可以和其他服务结合使用。ACL只…

Mac安装nvm,安装多个不同版本node,指定node版本

一.安装nvm brew install nvm二。配置文件 touch ~/.zshrc echo export NVM_DIR~/.nvm >> ~/.zshrc echo source $(brew --prefix nvm)/nvm.sh >> ~/.zshrc三.查看安装版本 nvm -vnvm常用命令如下&#xff1a;nvm ls &#xff1a;列出所有已安装的 node 版本nvm…