SpringBoot RabbitMQ收发消息、配置及原理

今天分析SpringBoot通过自动配置集成RabbitMQ的原理以及使用。

AMQP概念

RabbitMQ是基于AMQP协议的message broker,所以我们首先要对AMQP做一个简单的了解。

AMQP (Advanced Message Queuing Protocol) is a messaging protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.

AMQP是Advanced Message Queuing Protocol的缩写,是一个消息收发客户端与消息中间件都要遵守的消息通讯协议。

AMQP消息模型如下图:
在这里插入图片描述
消息有发布者发送给Exchange,Exchange通过绑定路由规则路由该消息到消息队列中,之后消息中间件投递该消息给指定的消费者、或者由消费者按需拉取/消费消息。

消息丢失问题:网络原因、或者消费者消费逻辑的原因均可能导致消息发送(接收)失败,AMQP通过消息确认(message acknowledgements)机制来处理消息发送失败:消息送达消费者后,消费者需发送确认消息通知broker。启用消息确认机制后,broker必须在收到确认之后才能从队列中移除消息,确保消息不丢失。

消息无法送达问题:消息无法送达消费者的情况下,该消息会根据参数设置被broker返回给消息发送者、直接丢弃、或者丢入“死信”队列。

Exchange Type

AMQP消息发送给Exchange,Exchange接收到消息后路由给0个或多个队列,路由规则依赖于Exchange Type。

AMQP包含如下Exange Type:

  1. Direct exchange:路由匹配后直接投递
  2. Fanout exchange:消息投递给绑定到当前Exchange的所有队列(不需要匹配)。
  3. Topic exchange:根据订阅主题投递消息。
  4. Headers exchange:路由规则定义在消息头信息中,根据头信息匹配投递。

队列

队列是AMQP中存储消息的地方,队列具有如下属性:

  1. Name:队列名称。每一个队列都有名称,不指定名称则broker会自动生成一个名称。"amq."开头的队列名称是AMQP的保留名称,不允许应用层使用。
  2. Durable (the queue will survive a broker restart):存活周期。队列创建的时候可以被声明为持久队列或临时队列,持久队列的元数据存储在硬盘、而临时队列存储在内存。
  3. Exclusive (used by only one connection and the queue will be deleted when that connection closes):独占队列,连接关闭后队列删除。
  4. Auto-delete (queue that has had at least one consumer is deleted when last consumer unsubscribes):队列必须绑定至少一个消费者,没有消费者绑定到给队列则队列自动删除。
  5. Arguments (optional; used by plugins and broker-specific features such as message TTL, queue length limit, etc):其他消息参数。

队列使用之前必须首先声明(通过声明指定消息队列参数),声明队列等同于创建队列,声明队列时如果队列已经存在,则:声明的参数与已存在队列参数相同,对当前已存在队列没有任何影响,如果声明的参数与已存在队列参数不同,抛出异常。

绑定

绑定指的是Exchange路由消息到队列的规则。队列首先必须绑定到Exchange之后,Exchange才能发送消息到该队列。routing key是绑定的可选属性,特定Exchange(Direct exchange)发送消息到队列时会根据routing key进行匹配。

消费者

消费者通过以下两种方式从队列中消费消息:

  1. Push方式:消息主动投递给消费者(推荐方式)。
  2. Poll方式:消费者轮询获取(性能原因,不推荐)。

((Consumer) Delivery Acknowledgements)投递确认

由于网络原因,或者消费者应用在消息消费过程中发生错误导致消息消费失败------均可导致消息丢失。

为了尽可能避免消息丢失,AMQP提供了两种消息确认机制:

  1. 自动确认模式:broker发送消息之后确认。
  2. 显式确认:消费者反馈确认消息。

显式确认模式下,由消费者决定确认消息的反馈时机:收到消息后立即反馈、或者消息持久化之后反馈、或者消息完全被消费(消费业务处理完成)之后反馈。

如果消费者一直不给broker发送确认信息,broker会重新投递消息给其他消费者、如果当前没有其他消费者注册到当前队列的话,broker会一直保存该消息直到有新的消费者注册上来之后发送。

拒信

消费者可给broker返回拒绝消息指令,以告知broker消费者处理消息失败。根据消费者的指令,broker可以丢弃该消息、或者重新进入队列等待再次消费。进入队列等待再次消费的情况下,消费者需注意避免造成该消息的循环投递。

Prefetching Messages

多个消费者共享同一队列的场景下,broker可以控制每一消费者最多可以并发处理的消息数量(消费者返回消息确认前brokenr最多投递到该消费者的消息数量)。这种机制其实是一种简单的load balance,一个消费者如果长时间不返回消息确认,极有可能说明该消费者的消费能力出现问题。

Connections

与数据库连接类似,AMQP客户端(生产者或者消费者)通过connections连接broker、并通过connection收发消息。

Channels

多数场景下,客户端都需要创建多个连接以便并发处理消息,但是同时创建多个TCP连接在性能和网络管理上都很不划算,因此,AMQP通过channels多路复用TCP连接:一个连接可以包含多个Channels,所有的客户端操作都依赖于Channels执行,每一个Channels都独立运行互不影响。

RabbitMQ

RabbitMQ is the most widely deployed open source message broker.

RabbitMQ是基于AMQP协议的、Erlang开发的被广泛部署的开源消息队列。

有关RabbitMQ的详细特性本文不展开讨论。

RabbitMQ的安装

RabbitMQ基于Erlang,所以安装RabbitMQ之前,你必须已经安装好了Erlang。不同版本的RabbitMQ对Erlang的版本也有不同的要求,详情参见https://www.rabbitmq.com/which-erlang.html。

根据下载安装Erlang/OTP 25.3.2。

之后从RabbitMQ官网(https://www.rabbitmq.com/install-windows.html)下载安装RabbitMQ(rabbitmq-server-3.12.12.exe)。

SpringBoot项目配置RabbitMQ

RabbitMQ is a lightweight, reliable, scalable, and portable message broker based on the AMQP protocol. Spring uses RabbitMQ to communicate through the AMQP protocol.

Spring使用RabbitMQ作为AMQP协议下的消息队列broker,SpringBoot提供了对RabbitMQ的基于autoConfiguration的支持。

pom文件引入RabbitMQ

	<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

yml文件配置

application.yml文件中配置RabbitMQ:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    #确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true
    #消费方消息确认
    listener:
      simple:
        #确认方式:手动确认
        acknowledge-mode: manual
        # 拒信不重新入队列
        default-requeue-rejected: false
        retry:
          enabled: true #监听重试是否可用
          max-attempts: 5 #最大重试次数,默认为3
    	  retryTime: 120000 # 重试间隔时间(毫秒)

发送消息

创建一个RabbitMQ消息发送服务并注入到IoC容器中:

@Component
public class RabbitMQService {
    private final AmqpAdmin amqpAdmin;

    private final AmqpTemplate amqpTemplate;

    public RabbitMQService(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }
    public void sendMessage(String message){
        amqpTemplate.convertAndSend(message);
    }
}

消息接收

使用@RabbitListener注解接收来自于RabbitMQ的消息:

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        log.info("recieve message from rabbitMQ:"+content);
    }

需要特别注意:

By default, if retries are disabled and the listener throws an exception, the delivery is retried indefinitely. You can modify this behavior in two ways: Set the defaultRequeueRejected property to false so that zero re-deliveries are attempted or throw an AmqpRejectAndDontRequeueException to signal the message should be rejected. The latter is the mechanism used when retries are enabled and the maximum number of delivery attempts is reached.

默认的重试被关闭的情况下,如果消息监听器抛出异常、发送方将会无限重试。我们可以通过如下两种方式避免这种情况:设置defaultRequeueRejected参数为false,因此失败消息不会重新进入队列。或者抛出AmqpRejectAndDontRequeueException异常表明消息应当被丢弃。第二种机制也是重试机制打开后、重试次数达到上限后采用的机制。

RabbitMQ涉及的内容比较多,详情后补。

春节愉快,开工大吉!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/390495.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

476. Number Complement(数字的补数)

问题描述 对整数的二进制表示取反&#xff08;0 变 1 &#xff0c;1 变 0&#xff09;后&#xff0c;再转换为十进制表示&#xff0c;可以得到这个整数的补数。 例如&#xff0c;整数 5 的二进制表示是 “101” &#xff0c;取反后得到 “010” &#xff0c;再转回十进制表示…

JavaSE-03笔记【继承~super】

文章目录 1. 继承1.1 继承概述&#xff08;理解&#xff09;1.2 如何继承&#xff08;掌握&#xff09;1.2.1 继承的语法格式1.2.2 具体举例 1.3 继承的相关特性&#xff08;掌握&#xff09;1.4 对继承自Object类的方法的测试&#xff08;理解&#xff09;1.5 难点解惑1.5.1 掌…

07-k8s中secret资源02-玩转secret

一、回顾secret资源的简单实用 第一步&#xff1a;将想要的数据信息【key&#xff1a;value】中的value值&#xff0c;使用base64编码后&#xff0c;写入secret资源清单中&#xff1b; 第二步&#xff1a;创建secret资源&#xff1b; 第三步&#xff1a;pod资源引用secret资源&…

【Linux网络编程六】服务器守护进程化Daemon

【Linux网络编程六】服务器守护进程化Daemon 一.背景知识&#xff1a;前台与后台二.相关操作三.Linux的进程间关系四.自成会话五.守护进程四步骤六.服务器守护进程化 一.背景知识&#xff1a;前台与后台 核心知识就是一个用户在启动Linux时&#xff0c;都会给一个session会话&a…

最小生成树(Kruskal算法及相关例题)

1.Kruskal算法概念以及基本思路 &#xff08;1&#xff09;概念&#xff1a; 克鲁斯卡尔算法是求连通网的最小生成树的另一种方法。它的时间复杂度为O&#xff08;ElogE&#xff09;(E是图G的边的总数)&#xff0c;适合于求边稀疏的网的最小生成树 。 其基本思想是&#xff…

JDBC访问数据库

目录 加载Driver驱动配置驱动地址 获取数据库连接创建会话-SQL命令发送器通过Statement发送SQL命令并得到结果处理结果关闭数据库资源测试 加载Driver驱动 1.在模块JDBC中新建一个lib目录文件 2. 将mysql-connector-j-8.2.0包粘贴至lib目录中。 配置驱动地址 // 加载…

Nvidia 携手 RTX 推出的本地运行 AI 聊天机器人

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

人工智能学习与实训笔记(三):神经网络之目标检测问题

目录 五、目标检测问题 5.1 目标检测基础概念 5.1.1 边界框&#xff08;bounding box&#xff09; 5.1.2 锚框&#xff08;Anchor box&#xff09; 5.1.3 交并比 5.2 单阶段目标检测模型YOLOv3 5.2.1 YOLOv3模型设计思想 5.2.2 YOLOv3模型训练过程 5.2.3 如何建立输出…

uni-app 经验分享,从入门到离职(二)—— tabBar 底部导航栏实战基础篇

文章目录 &#x1f4cb;前言⏬关于专栏 &#x1f3af;关于小程序 tabbar 的一些知识&#x1f3af;创建一个基本的 tabBar&#x1f4dd;最后 &#x1f4cb;前言 这篇文章的内容主题是关于小程序的 tabBar 底部导航栏的入门使用和实战技巧。通过上一篇文章的基础&#xff0c;我们…

【sgCreateTableColumn】自定义小工具:敏捷开发→自动化生成表格列html代码(表格列生成工具)[基于el-table-column]

源码 <template><!-- 前往https://blog.csdn.net/qq_37860634/article/details/136126479 查看使用说明 --><div :class"$options.name"><div class"sg-head">表格列生成工具</div><div class"sg-container"…

C++,stl,常用排序算法,常用拷贝和替换算法

目录 1.常用排序算法 sort random_shuffle merge reverse 2.常用拷贝和替换算法 copy replace replace_if swap 1.常用排序算法 sort 默认从小到大排序 #include<bits/stdc.h> using namespace std;int main() {vector<int> v;v.push_back(1);v.push_ba…

RabbitMQ如何保证可靠

0. RabbitMQ不可靠原因 消息从生产者到消费者的每一步都可能导致消息丢失&#xff1a; 发送消息时丢失&#xff1a; 生产者发送消息时连接MQ失败生产者发送消息到达MQ后未找到Exchange生产者发送消息到达MQ的Exchange后&#xff0c;未找到合适的Queue消息到达MQ后&#xff0c;…

idea里微服务依赖在maven能install但不能启动

场景&#xff1a;多个微服务相互依赖&#xff0c;install都没问题&#xff0c;jar包都是正常的&#xff0c;就连jar都能启动&#xff0c;为什么在idea里面项目就是不能启动呢&#xff0c;我是懵逼的 所以解决办法就是&#xff1a; 在设置的编译器里面虚拟机选项添加 -Djps.tr…

第五节 zookeeper集群与分布式锁_2

1.分布式锁概述 1.1 什么是分布式锁 1&#xff09;要介绍分布式锁&#xff0c;首先要提到与分布式锁相对应的是线程锁。 线程锁&#xff1a;主要用来给方法、代码块加锁。当某个方法或代码使用锁&#xff0c;在同一时刻仅有一个线程执行该方法或该代码段。 线程锁只在同一J…

LEETCODE 164. 破解闯关密码

class Solution { public:string crackPassword(vector<int>& password) {vector<string> password_str;for(int i0;i<password.size();i){password_str.push_back(to_string(password[i]));}//希尔排序int gappassword.size()/2;while(gap>0){for(int i…

命令执行讲解和函数

命令执行漏洞简介 命令执行漏洞产生原因 应用未对用户输入做严格得检查过滤&#xff0c;导致用户输入得参数被当成命令来执行 命令执行漏洞的危害 1.继承Web服务程序的权限去执行系统命会或读写文件 2.反弹shell&#xff0c;获得目标服务器的权限 3.进一步内网渗透 远程代…

python----输入输出算数运算

1.格式化输出 如果我们直接打印输出&#xff0c;就是输出变量的值&#xff0c;例如&#xff1a; 如果我们想打印a10就需要格式化字符串&#xff0c;就是使用f进行格式化&#xff0c;如图所示&#xff1b; 2.控制台输入 input执行的时候&#xff0c;就会等待用户进行输入&…

Qlik Sense : 条形图

条形图 “条形图适合比较多个值。维度轴显示所比较的类别条目&#xff0c;度量轴显示每个类别条目的值。” Qlik Sense中的条形图是一种数据可视化工具&#xff0c;用于展示不同类别或维度之间的比较。它通过水平或垂直的条形表示数据&#xff0c;并根据数值的大小进行排序。…

RK3568平台开发系列讲解(存储篇)文件描述符相关系统调用实现

🚀返回专栏总目录 文章目录 一、open 系统调用二、close 系统调用沉淀、分享、成长,让自己和他人都能有所收获!😄 一、open 系统调用 open()系统调用会分配新的文件句柄(file description),用来维护与打开文件相关的元信息(如偏移量、路径、操作方法等),并会给进程…

微信小程序框架阐述

目录 一、框架 响应的数据绑定 页面管理 基础组件 丰富的 API 二、逻辑层 App Service 小程序的生命周期 注册页面 使用 Page 构造器注册页面 在页面中使用 behaviors 使用 Component 构造器构造页面 页面的生命周期 页面路由 页面栈 路由方式 注意事项 模块化…