RabbitMQ讲解与整合

RabbitMq安装

类型概念

租户
RabbitMQ 中有一个概念叫做多租户,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。
每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突
交换机
在这里插入图片描述

交换机
属性意义意义
type类型direct默认的直接交换机
根据交换机下队列绑定的routingKey直接匹配
fanout扇形交换机
简单来说就是发布订阅
队列直接绑定在交换机下,统一发布消息
headers头部交换机,通过message header头部信息进行比对
可以根据定义全匹配、部分匹配等规则
topic主题交换机
通过绑定routingKey进行模糊匹配
Durability耐用
(持久化)
durable持久化,数据存放于硬盘
transient瞬态,数据存放于内存
Auto delete自动删除Yes没有绑定队列时自动删除,针对的是曾经有过但后来没有的事物
No不自动删除
Internal内部使用Yes该路由绑定的队列不会被用户消费
No不自动删除

队列

在这里插入图片描述

队列
属性意义意义
type类型Default for virtual host租户配置的默认选项,下列三种其一
默认Classic无需设置
Classic传统的队列类型
数据存储在单个节点上
不具备quorum队列的高可用性和数据保护特性
ps:单机时使用
Quorum高可用性队列
数据会被复制到多个节点
提供更好的数据可靠性和持久性
ps:部署多节点时使用
Stream特殊类型的队列
用于支持事件流处理(event streaming)
具有类似于Kafka的流式处理特性
ps:听说不成熟,暂时用不上
Durability耐用
(持久化)
durable持久化,数据存放于硬盘
transient瞬态,数据存放于内存

参数:

显示参数实际参数作用
Auto expirex-expires设置队列的过期时间,单位为毫秒。当队列在指定时间内未被使用,将会被自动删除
Message TTLx-message-ttl设置队列中消息的过期时间(Time-To-Live),单位为毫秒。消息在队列中存放的时间超过设定的过期时间后会被自动删除
Overflow behaviourx-overflow设置队列溢出行为,可选值为 drop-head(删除最旧的消息)或 reject-publish(拒绝发布新消息)
Single active consumerx-single-active-consumer配置队列是否只允许单个消费者消费消息。当设置了x-single-active-consumer参数时,表示队列只允许有一个消费者活跃地消费消息,其他消费者将被阻塞,直到当前的消费者停止消费或断开连接
Dead letter exchangex-dead-letter-exchange设置队列中的死信消息转发到的交换机名称。当消息成为死信时,将会被转发到指定的交换机
Dead letter routing keyx-dead-letter-routing-key设置死信消息转发时的路由键。死信消息将通过指定的路由键转发到目标交换机
Max lengthx-max-length设置队列的最大长度,即队列中消息的最大数量。当队列中消息数量达到设定的最大长度后,新消息将无法入队
Max length bytesx-max-length-bytes设置队列消息的最大总字节数。当队列中消息的总字节数达到设定的最大值后,新消息将无法入队
Leader locatorx-queue-leader-locator配置队列的领导者(Leader)定位器,集群中使用

SpringBoot整合

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.6.3</version>
</dependency>

配置数据源

spring: 
  rabbitmq:
    addresses: xxx.xxx.xx.xx:5672
    username: admin
    password: xxxxxx
    virtual-host: /

配置交换机和队列

@Component
public class RabbitMqConfig {
	// 定义交换机名称
    public static final String FANOUT_EXCHANGE = "fanout.test";
    @Bean(name = FANOUT_EXCHANGE)
    public FanoutExchange fanoutExchange() {
    	// 交换机类型按需创建,这里用的是Fanout,发布订阅,绑定在该交换机下的队列都会收到消息
    	// 参数2:是否持久化
    	// 参数3:是否自动删除
        return new FanoutExchange(FANOUT_EXCHANGE, true, false);
    }
	
	//  定义队列
    public static final String FANOUT_QUEUE1 = "queue1";
    @Bean(name = FANOUT_QUEUE1)
    public Queue fanoutQueue1() {
    	// 后三个不写也行,这是默认值
    	// 参数2:是否持久化数据到磁盘(防止意外关闭数据丢失)
    	// 参数3:是否具有排他性
    	// 参数4:队列不再使用时是否自动删除
        return new Queue(FANOUT_QUEUE1, true, false, false);
    }
	public static final String FANOUT_QUEUE2 = "queue2";
    @Bean(name = FANOUT_QUEUE2)
    public Queue fanoutQueue2() {
        return new Queue(FANOUT_QUEUE2, true, false, false);
    }
		
    @Bean
    public Binding bindingSimpleQueue1(@Qualifier(FANOUT_QUEUE1) Queue fanoutQueue1,
                                       @Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {
        // 将交换机和队列绑定
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    @Bean
    public Binding bindingSimpleQueue2(@Qualifier(FANOUT_QUEUE2) Queue fanoutQueue2,
                                       @Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {
        // 将交换机和队列绑定
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

测试发一条消息到队列

@SpringBootTest(classes = TemplateApplication.class)
public class RabbitMQTest {

    @Autowired
    RabbitMessagingTemplate rabbitMessagingTemplate;

    @Test
    public void testSent(){
		//指定交换机->指定队列(因为创建的交换机是FanoutExchange,所以绑定该交换机的队列都会收到一条消息)
        rabbitMessagingTemplate.convertAndSend("fanout.test","发送数据到FanoutExchange");
    	// 如果创建队列不绑定交换机和路由键,那么实际上会有默认的交换机和路由键,均为空,直接将消息发送给队列,队列名则和路由键保持一致,仍然可以成功发送消息。
    }
}

测试接收队列消息

写个监听类接收消息:

@Component
public class RabbitMqListenter {
    @RabbitListener(queues = {RabbitMqConfig.FANOUT_QUEUE1,RabbitMqConfig.FANOUT_QUEUE2})
    public void reciveLogAll(String msg) throws Exception {
        System.out.println("消费到数据:" + msg);
    }
}

-------------基础的使用到这里就结束了-------------

拓展事项

rabbitMqPusher

自己封装一个更加方便使用的发送工具,可有可无,其中可以使用RabbitMessagingTemplate和RabbitTemplate,RabbitMessagingTemplate和RabbitTemplate都是Spring AMQP提供的用于与RabbitMQ进行交互的工具类如果只是简单使用,那么RabbitMessagingTemplate就够用了,如果需要更精细的控制,可以选择使用RabbitTemplate

,但它们在使用方式和功能上有一些不同点:

RabbitMessagingTemplate:

RabbitMessagingTemplate是MessagingTemplate的子类,用于在Spring应用程序中发送和接收消息。
它提供了一种更高级别的抽象,使得在Spring框架中更容易使用消息发送和接收的功能。
可以直接与Spring的消息通道(MessageChannel)集成,方便进行消息的发送和接收。

RabbitTemplate:

RabbitTemplate是Spring AMQP提供的用于与RabbitMQ进行交互的核心类,提供了丰富的方法来发送和接收消息。
它是一个强大而灵活的工具,可以直接与RabbitMQ的交互进行细粒度的控制。
可以设置消息的属性、监听发送确认、接收确认等功能,更加灵活地处理消息发送和接收的细节。

package com.template.rabbitmq.producer.impl;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class RabbitMqPusher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     * @param quene   队列名称 或 交换机名称
     * @param message 消息内容
     */
    public void send(String quene, String message) {
        rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).build());
        log.info("发送消息---> quene:{} ---> message:{}", message, quene);
    }

    /**
     * 直接发送消息到队列
     * 超过有效期丢弃,如果队列没有声明x-message-ttl属性则无效
     *
     * @param quene      队列名称
     * @param message    消息内容
     * @param expiration 有效期(毫秒)
     */
    public void send(String quene, String message, Integer expiration) {
        rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());
        log.info("发送消息---> quene:{} ---> message:{} ---> expiration:{}", quene, message, expiration);
    }


    /**
     * 发送消息
     * 超过有效期丢弃,如果队列没有声明x-message-ttl属性则无效
     *
     * @param exchange   交换机名称
     * @param routingKey 路由键
     * @param message    消息内容
     * @param expiration 有效期(毫秒)
     */
    public void send(String exchange, String routingKey, String message, Integer expiration) {
        rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());
        log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{} ---> expiration:{}", exchange, routingKey, message, expiration);
    }

    /**
     * 发送消息
     *
     * @param exchange   交换机名称
     * @param routingKey 路由键
     * @param message    消息内容
     */
    public void send(String exchange, String routingKey, String message) {
        rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).build());
        log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{}", exchange, routingKey, message);
    }


}

在RabbitMQ中,如果队列没有设置过期时间(即没有声明x-message-ttl属性),那么即使在发送消息时设置了消息的过期时间也会失效。消息的过期时间只有在队列设置了过期时间的情况下才会生效。
实测以上列代码的方式直接对消息设置有效期是生效的。

死信队列

和普通队列一样,只不过是对其他队列进行配置,将过期的消息路由到死信队列中。
创建死信交换机和死信路由

	// 配置交换机的文件中继续增加配置
	public static final String DIRECT_GP_DEAD_LETTER_EXCHANGE = "DIRECT_GP_DEAD_LETTER_EXCHANGE";
    public static final String DIRECT_GP_DEAD_LETTER_QUEUE = "DIRECT_GP_DEAD_LETTER_QUEUE";
    @Bean(DIRECT_GP_DEAD_LETTER_EXCHANGE)
    public DirectExchangedirectDeadLetterExchange() {
        return new DirectExchange(DIRECT_GP_DEAD_LETTER_EXCHANGE, true, false, new HashMap<>());
    }
    @Bean(DIRECT_GP_DEAD_LETTER_QUEUE)
    public Queue directDeadLetterQueue() {
        return new Queue(DIRECT_GP_DEAD_LETTER_QUEUE, true, false, false, new HashMap<>());
    }

设置队列消息有效期并绑定死信队列

	@Bean(name = DIRECT_QUEUE1)
    public Queue directQueue1() {
        HashMap<String, Object> headers = new HashMap<>();
        // 配置消息有效期,消息发送到队列10秒后如果未被消费者消费,则过期
        headers.put("x-message-ttl",10000);
        // 配置超期交换机,消息过期后会发送到此交换机
        headers.put("x-dead-letter-exchange",DIRECT_GP_DEAD_LETTER_EXCHANGE);
        // 配置超期routingKey,消息过期后转移消息时指定的routingKey
        headers.put("x-dead-letter-routing-key",DIRECT_GP_DEAD_LETTER_QUEUE);
        // 如果只配置了有效期,未配置交换机和routingKey,则消息会被直接丢弃
        return new Queue(DIRECT_QUEUE1, true, false, false,headers);
    }

配置完成后,尝试向DIRECT_QUEUE1发送一条消息,不启动消费者,10秒后消息会自动转移到死信队列中,可在可视化管理界面进行验证。

延时队列
延时队列场景举例:

预定一个会议室,两个小时后开始,要求提前十分钟通知参会人员进行开会。
如果不使用延时队列,那么就需要不断轮询,查看是否到达需要通知的时间,进行消息通知。

延时队列的实现方式:

死信队列+消息有效期
预定时间到提前十分钟通知中间有110分钟,那么创建一条通知消息,设置有效期110分钟丢入队列,不用消费者去监听,等待消息过期后路由到指定的死信队列,再去消费死信队列中的消息即可。
所以延时队列实际上是一种实现方案,而不是一种特定的队列类型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/415438.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

顶会ICLR2024论文Time-LLM:基于大语言模型的时间序列预测

文青松 松鼠AI首席科学家、AI研究院负责人 美国佐治亚理工学院(Georgia Tech)电子与计算机工程博士&#xff0c;人工智能、决策智能和信号处理方向专家&#xff0c;在松鼠AI、阿里、Marvell等公司超10年的技术和管理经验&#xff0c;近100篇文章发表在人工智能相关的顶会与顶刊…

RabbitMQ-消息队列:Federation Exchange、Federation Queue、Shovel

25、Federation Exchange 1、使用它的原因 ​ (broker 北京)&#xff0c;(broker 深圳)彼此之间相距甚远&#xff0c;网络延迟是一个不得不面对的问题。有一个在北京 的业务(Client 北京) 需要连接(broker 北京)&#xff0c;向其中的交换器 exchangeA 发送消息&#xff0c;此…

3G 蜂窝移动通信

4 3G 蜂窝移动通信 第三代 (3G) 蜂窝移动通信系统 -1996 年正式标准名称&#xff1a;IMT-2000。 -工作在 2000 MHz 频段&#xff0c;数据率可达 2000 kbit/s&#xff08;固定站&#xff09;和 384 kbit/s&#xff08;移动站&#xff09;。 -包括中国通信标准化协会 CCSA (C…

Flutter自定义输入框同时出现多种字体颜色

Flutter自定义输入框同时出现多种字体颜色 效果展示基本逻辑代码示例 效果展示 输入框内效果 基本逻辑 主要通过重写TextEditingController中的buildTextSpan方法实现&#xff0c;通过在buildTextSpan中将内容手动切割&#xff08;本人通过正则表达式将#这些话题分割开来&a…

win32com打开带密码excel

简单来说给excel上加密常见的方法有两种 方法一&#xff1a; 直接修改文件属性 这种方法对应的解法是 excel DispatchEx("Excel.Application") # 启动excel excel.Visible visible # 可视化 excel.DisplayAlerts displayalerts # 是否显示警告 wb excel.Wo…

基于springboot实现旅游路线规划系统项目【项目源码+论文说明】

基于springboot实现旅游路线规划系统演示 摘要 随着互联网的飞速发展以及旅游产业的逐渐升温&#xff0c;越来越多人通过互联网获取更多的旅游信息&#xff0c;包括参考旅游文纪等内容。通过参考旅游博主推荐的旅游景点和规划线路&#xff0c;参考计划着自己的旅行&#xff0c…

Git教程-Git的基本使用

Git是一个强大的分布式版本控制系统&#xff0c;它不仅用于跟踪代码的变化&#xff0c;还能够协调多个开发者之间的工作。在软件开发过程中&#xff0c;Git被广泛应用于协作开发、版本管理和代码追踪等方面。以下是一个详细的Git教程&#xff0c;我们将深入探讨Git的基本概念和…

golang 泛型详解

目录 概念 ~int vs .int 常见的用途和错误 结论&#xff1a; 概念 Go 在1.18 中添加了泛型&#xff0c;这样Go 就可以在定义时不定义类型&#xff0c;而是在使用时进行类型的定义&#xff0c;并且还可以在编译期间对参数类型进行校验。Go 目前只支持泛型方法&#xff0c;还…

css通过calc动态计算宽度

max-width: calc(100% - 40px) .m-mj-status-drawing-info-data{ display: inline-block; margin: 10px; min-width: 200px; padding: 10px;border-radius: 10px; background: #ddd;max-width: calc(100% - 40px);word-wrap: break-word;white-space: pre-line;}我开发的chatg…

python+mysql咖啡店推荐系统django+vue

(1).研究的基本内容 系统的角色分为&#xff1a; 1.管理员 2.会员 3.非会员 角色不同&#xff0c;权限也不相同 技术栈 后端&#xff1a;python 前端&#xff1a;vue.jselementui 框架&#xff1a;django/flask Python版本&#xff1a;python3.7 数据库&#xff1a;mysql5.7…

c#/ .net8 香橙派orange pi +SSD1306 oled显示屏 显示中文+英文 实例

本文使用香橙派orangepi pi 3ltsSSD1306 oled显示屏作为例子&#xff0c;其它型号的也是一样使用的 在nuget包中安装 Sang.IoT.SSD1306; 以下两个二选一 SkiaSharp;//在window下运行装这个 SkiaSharp.NativeAssets.Linux.NoDependencies;//在linux下运行一定要装这个 在c# .ne…

李宏毅机器学习入门笔记——第六节

对抗生成式网络&#xff08;GAN&#xff09; 输入一个问题输出不同的答案出来 GAN里面有生成器和鉴别器 不断对抗生成&#xff0c;进行两者的网络 算法步骤 这里输出的结果可以是分类的&#xff0c;也可以是回归的。 两者训练过程&#xff0c;是固定生成器&#xff0c;训练…

主流开发环境都有哪些?主流开发语言都有什么?

目录 一、简介&#xff1a; 二、主流开发环境&#xff1a; 三、主流开发语言&#xff1a; 四、结论&#xff1a; 一、简介&#xff1a; 在现代软件开发领域&#xff0c;选择适合自己需求的开发环境和开发语言至关重要。本文将介绍目前主流的开发环境和开发语言&#xff0c;…

深度学习--神经网络基础

神经网络 人工神经网络&#xff08; Artificial Neural Network &#xff0c; 简写为 ANN &#xff09;也简称为神经网络&#xff08; NN &#xff09;&#xff0c;是一种模仿生物神经网络结构和 功能的计算模型 。人脑可以看做是一个生物神经网络&#xff0c;由众多的 神经元…

国际黄金价格是什么?和黄金价格有何区别?

黄金是世界上最珍贵的贵金属之一&#xff0c;其价值被无数人所垂涎。而国际黄金价格作为市场上的参考指标&#xff0c;直接影响着黄金交易的买卖。那么国际黄金价格到底是什么&#xff0c;与黄金价格又有何区别呢&#xff1f;本文将为您详细解答。 国际黄金价格是指以美元计量的…

部署PhotoMaker通过堆叠 ID 嵌入自定义逼真的人物照片

PhotoMaker只需要一张人脸照片就可以生成不同风格的人物照片&#xff0c;可以快速出图&#xff0c;无需额外的LoRA培训。 安装环境 python 3.10gitVisual Studio 2022 安装依赖库 git clone https://github.com/bmaltais/PhotoMaker.git cd PhotoMaker python -m venv venv…

idea如何建立一个springboot项目

1.打开File -New-Project 2.填写相关信息&#xff0c;Name:### Type:Maven Croup、Artifact、java 版本 注&#xff1a;此时&#xff0c;第一次打开可能会报错&#xff0c;说版本不匹配。注意下方的两个红框&#xff0c;将Server URL的地址改为“https://start.aliyun.com ”…

C#理论 —— 基础语法、数据类型、变量、常量、运算符、三大结构

文章目录 1. 基础语法1.1 标识符命名规则1.2 C# 关键字1.3 C#注释 2. 数据类型2.1 值类型&#xff08;Value types&#xff09;2.2 引用类型&#xff08;Reference types&#xff09;2.2.1 对象&#xff08;Object&#xff09;类型3.2.2 动态&#xff08;Dynamic&#xff09;类…

Vue 环境安装以及项目创建

环境安装 nodejs 安装 下载地址&#xff1a;https://nodejs.org/dist/v18.16.1/ 根据系统类型选择对应安装包&#xff0c;选择安装路径那个后一直下一步即可安装完成。 配置npm 代理镜像,设置为淘宝的镜像地址&#xff08;后面按照依赖可以加速下载安装包&#xff09; npm c…

Java介绍

计算机语言历史 1、软件的分类 软件从架构上分类&#xff1a; C/S(Client/Server)&#xff1a;基于客户端和服务器 B/S(Browser/Server)&#xff1a;基于浏览器和服务器 如何区分&#xff1a;如果使用时要安装则为C/S架构的&#xff0c;如果使用时用浏览器打开则为B/S架构 由于…