【RabbitMQ】

一、概念

MQ(消息队列):是指在消息传送过程中保存消息的容器,用于分布式系统之间的通信
生产者:是发送消息的用户应用程序。
队列:是存储消息的缓冲区。
消费者:是接收消息的用户应用程序。

1、优劣势

优势

● 应用解耦:使用MQ使得应用间耦合度降低,提高系统容错性和可维护性
● 异步提速:系统将消息发给MQ之后,就可返回用户信息,后续操作异步执行
● 消峰填谷:并发量高时,将消息全部放到MQ中,限制消费的速度为固定并发量,这样就消掉了高峰期的并发量,这就是消峰;但是因为消息积压,在高峰期过后的一段时间内,消费速度也仍旧保持固定并发量,直到消费完积压的消息,这就是填谷

劣势

● 系统可用性降低:一旦MQ宕机
● 系统复杂度提高:如何保证消息没有重复被消费,消息丢失了怎么办,如何保证消息的顺序等等
● 一致性问题:A系统同时给B、C、D发送消息,BC成功,D失败,如何保证数据一致性

2、工作模式

  1. 简单工作模式:一个生产者对应一个消费者
    在这里插入图片描述

  2. 工作队列模式(Work Queues):一个生产者对应多个消费者,多个消费者之间属于竞争关系,当任务比较重时,可以提高处理速度
    在这里插入图片描述

  3. 订阅模式(Publish/Subscribe):一个生产者对应多个消费者,多个消费者之间不是竞争关系,在这种模式中引入交换机的概念,交换机类型为fanout
    交换机:接收生产者的消息,并将消息推送给队列;交换机必须知道要如何处理他接收到的消息,类型如下:
    ○ direct:定向,将消息交给符合指定routing key的队列
    ○ topic:通配符,将消息交给符合制定routing pattern的队列
    ○ headers:参数匹配
    ○ fanout:广播,将收到的所有消息广播到它知道的所有队列。发送消息时不需要指定routing key
    在这里插入图片描述

  4. 路由模式(Routing):需要设计交换机类型为 direct,交换机和队列进行绑定,并指定通配符方式的routing key,当发送消息到交换机时,交换机会根据routing key将消息发送给队列
    ● 队列与交换机的绑定不再是随意绑定,而是指定要routing key;
    ● 发送方发送消息时,需要指定routing key;
    ● 只有队列的routing key与消息的routing key一致,才能接收到消息
    在这里插入图片描述

  5. 通配符模式(Topics):需要设计交换机类型为Topics,交换机和队列进行绑定,并指定通配符方式的routing key,当发送消息到交换机时,交换机会根据routing key将消息发送给队列
    ● *(星号)只能代替一个词。
    ● # (hash) 可以替代零个或多个单词。
    在这里插入图片描述

二、SpringBoot 整合RabbitMQ

1、引入依赖

implementation 'org.springframework.boot:spring-boot-starter-amqp'

2、配置文件

spring:
  rabbitmq:
    host: 192.168.252.206
    port: 5672
    username: admin
    password: admin

3、配置类

@Configuration
public class RabbitConfig {
    public static String EXCHANGE_NAME = "test_exchange";
    public static String QUEUE_NAME = "test_queue";

    /**
     * 1、交换机
     *
     * @return
     */
    @Bean(name = "testExchange")
    public Exchange exchanger() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    /**
     * 2、队列
     *
     * @return
     */
    @Bean(name = "testQueue")
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 3、绑定交换机和队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding binding(@Qualifier(value = "testExchange") Exchange exchange, @Qualifier(value = "testQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}

4、生产者发送消息

@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
    }
}

5、消费者接收消息

@Component
public class RabbitMqListener {
    
    @RabbitListener(queues = "test_queue")
    public void testListener(Message message) {
        System.out.println(message);
    }
}

结果如下:

(Body:'hello world!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test.kk, deliveryTag=1, consumerTag=amq.ctag-nYz1fLxW0ezTLEcO3W1rVw, consumerQueue=test_queue])

三、特性

1、消息的可靠投递

RabbitMQ为我们提供了两种控制消息可靠性的模式

confirm 确认模式

1、开启确认模式
spring:
rabbitmq:
publisher-confirm-type: correlated

  • NONE 禁用发布确认模式,是默认值
  • CORRELATED 发布消息成功到交换器后会触发回调方法
  • SIMPLE 经测试有两种效果:
    • 其一效果和 CORRELATED 值一样会触发回调方法,
    • 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

2、设置ConfirmCallback

@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息接收成功!");
            } else {
                System.out.println("消息接收失败!" + cause);
            }
        });
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
    }
}

return 退回模式

当消息发送给Exchange时,Exchange路由到Queue失败,才会执行RetureCallback
1、开启回退模式

spring:
  rabbitmq:
  	publisher-returns: true

2、设置RetureCallback
3、设置Exchange消息处理模式

  • 如果消息没有路由到Queue,则丢弃消息
  • 如果消息没有路由到Queue,将消息返回给发送方
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        // 设置交换机失败处理模式,true:返回消息给发送方;默认为false,即丢弃消息
        rabbitTemplate.setMandatory(true);
        // 设置RetureCallback
        rabbitTemplate.setReturnsCallback((returned) -> {
            System.out.println("return 执行了");
            System.out.println(returned);
        });
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test1.kk", "hello world!");
    }
}

2、Consumer ack

1、设置模式

spring:
  rabbitmq:
  	listener:
      direct:
        acknowledge-mode: manual
  • none:自动确认
  • manual:手动确认

2、设置监听器

  • 如果在消费端没有出现异常,就调用basicAck()方法签收消息
  • 如果在消费端出现异常,就调用basicNack()方法拒绝消息,让mq重新发送
@Component
public class AckListener implements ChannelAwareMessageListener {
    @RabbitListener(queues = "test_queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("处理业务逻辑");
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, true, true);
        }
    }
}

3、消费端限流

1、这是Consumer ack的模式为手动确认

spring:
  rabbitmq:
  	listener:
      direct:
        acknowledge-mode: manual
		prefetch: 2
  • none:自动确认
  • manual:手动确认
  • prefetch:表示消费端每次从mq中拉取多少条消息,直到手动确认消费完,才会拉取下一条消息

2、设置监听器

@Component
public class AckListener implements ChannelAwareMessageListener {

    @RabbitListener(queues = "test_queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(message.getBody().toString());
            System.out.println("处理业务逻辑");
            //channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, true, true);
        }
    }
}

4、生产者

class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        for (int i = 0; i < 3; i++) {
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
        }
    }
}

4、TTL

TTL全程 time to live,也就是存活时间/过期时间;当消息到达存活时间后,还没有被消费,会被清除;RabbitMQ可以对消息设置存活时间也可以对队列设置存活时间
对队列统一设置:是对 x-message-ttl 参数设置
对消息单独设置:是对 expiration 参数设置
如果两者都设置了,以时间短的为准

设置队列的存活时间

1、配置队列,将ttl设置为10秒

@Configuration
public class RabbitConfig {
    public static String EXCHANGE_NAME = "test_exchange";
    public static String QUEUE_NAME = "test_queue";

    /**
     * 1、交换机
     *
     * @return
     */
    @Bean(name = "testExchange")
    public Exchange exchanger() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    /**
     * 2、队列
     *
     * @return
     */
    @Bean(name = "testQueue")
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME).ttl(10000).build();
    }

    /**
     * 3、绑定交换机和队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding binding(@Qualifier(value = "testExchange") Exchange exchange, @Qualifier(value = "testQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}

2、发送mq

@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        for (int i = 0; i < 3; i++) {
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
        }
    }
}

3、去RabbitMQ界面查看,会发现该队列的ready在10秒之后会置0
在这里插入图片描述
在这里插入图片描述

设置消息的存活时间

1、生产者发送MQ:只需在发送消息时加上messagePostProcessor即可

@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!",messagePostProcessor);
    }
}

5、死信队列

消息成为死信的条件:

  • 消息队列长度达到限制
  • 消费者拒收消息:basicNack/basicReject,并且不把消息放回原目标队列:requeue=false
  • 原队列存在消息过期设置,消息达到过期时间未被消费

队列绑定死信交换机

  • x-dead-letter-exchange
  • x-dead-letter-routing-key

1、配置

spring:
  rabbitmq:
    host: 192.168.252.206
    port: 5672
    username: admin
    password: admin

2、配置类

@Configuration
public class RabbitConfig {
    public static String EXCHANGE_NAME = "test_exchange";
    public static String QUEUE_NAME = "test_queue";

    public static String DEAD_EXCHANGE_NAME = "dead_test_exchange";
    public static String DEAD_QUEUE_NAME = "dead_test_queue";

    /**
     * 1、死信交换机
     *
     * @return
     */
    @Bean(name = "deadTestExchange")
    public Exchange deadExchanger() {
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).build();
    }

    /**
     * 2、死信队列
     *
     * @return
     */
    @Bean(name = "deadTestQueue")
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
    }

    /**
     * 3、绑定死信交换机和死信队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding bindingDead(@Qualifier(value = "deadTestExchange") Exchange exchange,
                               @Qualifier(value = "deadTestQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.kk.#").noargs();
    }

    /**
     * 1、交换机
     *
     * @return
     */
    @Bean(name = "testExchange")
    public Exchange exchanger() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    /**
     * 2、队列
     *
     * @return
     */
    @Bean(name = "testQueue")
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME)
                .ttl(10000)
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("test.kk")
                .maxLength(3)
                .build();
    }

    /**
     * 3、绑定交换机和队列
     *
     * @param exchange
     * @param queue
     * @return
     */
    @Bean
    public Binding binding(@Qualifier(value = "testExchange") Exchange exchange, @Qualifier(value = "testQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();
    }
}

3、测试类

  • 超时情况
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
    }
}
  • 超出长度
@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        for (int i = 0; i < 5; i++) {
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
        }
    }
}
  • 消费端拒收

生产端

@SpringBootTest
class RabbitConfigTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "test.kk", "hello world!");
    }
}

消费端

@Component
public class DeadListener implements ChannelAwareMessageListener {

    @RabbitListener(queues = "test_queue")
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(new String(message.getBody()));
            System.out.println("处理业务逻辑");
            int i = 3 / 0;
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, true, false);
        }
    }
}

6、延迟队列

延迟队列:消息进入消费端之后,不会立马被消费,会在指定时间达到后,才会消费。RabbitMQ通过TTL和死信队列实现延迟队列

  • 只需设置队列或者消息过期时间,当消息过期后即可进入死信队列
  • 消费端监听队列要监听死信队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/34500.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(嵌入式)STM32G061C8T6、STM32G061C6T6、STM32G061C8U6 64MHz 64KB/32KB 闪存(MCU)

STM32G0 32位微控制器 (MCU) 适合用于消费、工业和家电领域的应用&#xff0c;并可随时用于物联网 (IoT) 解决方案。这些微控制器具有很高的集成度&#xff0c;基于高性能ARM Cortex-M0 32位RISC内核&#xff0c;工作频率高达64MHz。该器件包含内存保护单元 (MPU)、高速嵌入式内…

mysql 视图

视图&#xff0c;是虚拟存在的表&#xff0c;视图中的数据在数据库中实际不存在&#xff0c;视图只保存查询SQL的逻辑&#xff0c;不保存查询结果 建表sql DROP TABLE IF EXISTS w_dict; CREATE TABLE w_dict (id int(0) NOT NULL AUTO_INCREMENT,label varchar(255) CHARACT…

go开源项目slgserver源码分析

个人博客地址: https://cxx001.gitee.io 前言 项目开源地址&#xff1a;https://github.com/llr104/slgserver 比较适合作为go语言入门学习项目或轻量级游戏项目&#xff0c;整体的项目结构和编码质量还是可以的。不过距离商业项目还是差点意思&#xff0c;如服务负载、容灾这…

弗迪科技携手纷享销客共建CRM系统,数智化升级加速“灯塔工厂”征程

当前&#xff0c;全球新一轮科技革命正和产业升级融合发展&#xff0c;数字化技术成为各行各业升级发展的重要推动力。 自2018年起&#xff0c;世界经济论坛与麦肯锡咨询公司发起“灯塔工厂”项目&#xff0c;全球严选制造业数字化转型典范作为“数字化制造”和“全球化4.0”的…

360测试开发技术面试题目

目录 一、java方面 二、Linux方面 三、数据库方面 四、性能测试方面 五、HTTP协议方面 六、其他 总结&#xff1a; 最近面试了360测试开发的职位&#xff0c;将面试题整理出来分享~ 一、java方面 1、java重载和重写的区别 重载overloading 多个方法、相同的名字&#x…

Nginx负载均衡、虚拟主机

目录 常用的6种负载均衡算法 轮询算法(round robin)默认 权重(weight) 响应时间(fair) 连接数(least_conn) IP_hash url_hash(第三方) 开发优选&#xff1a;一致性哈希 安装步骤&#xff1a; 虚拟主机 常用的6种负载均衡算法 轮询算法(round robin)默认 轮询方式&a…

C#扩展——Visual Studio 代码提示/智能提示字体大小更改方法.

声明&#xff1a;本文为个人笔记&#xff0c;用于学习研究使用非商用&#xff0c;内容为个人研究及综合整理所得&#xff0c;若有违规&#xff0c;请联系&#xff0c;违规必改。 C#扩展——Visual Studio 代码提示/智能提示字体大小更改方法. 文章目录 C#扩展——Visual Studio…

stm32或gd32移植libcanard实现UAVCAN协议

一、源码下载 1、git下载 点击我下载 2、csdn下载 自己上传的点击下载 二、源码移植 我自己是使用rt-thread操作系统移植的。但是不局限与操作系统&#xff0c;裸机也可以。 1、首先将源码加入到工程 2、分别实现一个内存的分配与释放函数&#xff0c;他是一个指针函数&…

基于预测帧的视频异常检测经典论文

16年上海科技的论文&#xff0c;上海科技做这个方向的系大佬多多的。 摘要 受基于稀疏编码的异常检测能力的激励&#xff0c;我们提出了一种时间相干稀疏编码(TSC)&#xff0c;其中我们强制用相似的重构系数对相似的相邻帧进行编码。然后&#xff0c;我们用一种特殊类型的层叠…

【开源工具】使用Whisper提取视频、语音的字幕

这里写目录标题 一、语音转字幕操作步骤1、下载安装包Assets\WhisperDesktop.zip[^2]2、加载模型2.1 下载模型2.1.1 进入Hugging Face[^3]的仓库2.1.2 选择需要下载的模型2.1.3 配置模型路径 3、语音转字幕4、实时语言转录功能 二、相关简介[^1]特点开发人员指南构建说明其他注…

英国 Tortoise Media发布2023年全球AI指数排名;美团宣布完成收购光年之外

&#x1f989; AI新闻 &#x1f680; 美团宣布完成收购光年之外&#xff0c;加强人工智能竞争力 摘要&#xff1a;美团在公告中宣布于2023年6月29日盘后收购光年之外的全部权益&#xff0c;以加强其在快速增长的人工智能行业中的竞争力。光年之外是中国领先的通用人工智能创新…

SpringBoot整合RabbitMQ实现消息延迟队列(含源码)

环境依赖 SpringBoot 3.1.0 JDK 17 前期准备 安装MQ: liunxdockerrabbitmq安装延迟队列插件 实例 实现延迟队列的一种方式是在 RabbitMQ 中使用消息延迟插件&#xff0c;这个插件可以让你在消息发送时设置一个延迟时间&#xff0c;超过这个时间后消息才会被消费者接收到…

【JVM内存模型】—— 每天一点小知识

&#x1f4a7; J V M 内存模型 \color{#FF1493}{JVM内存模型} JVM内存模型&#x1f4a7; &#x1f337; 仰望天空&#xff0c;妳我亦是行人.✨ &#x1f984; 个人主页——微风撞见云的博客&#x1f390; &#x1f433; 《数据结构与算法》专栏的文章图文并茂&#x…

智谱AI-算法实习生(知识图谱方向)实习面试记录

岗位描述 没错和我的经历可以说是match得不能再match了&#xff0c;但是还是挂了hh。 面试内容 给我面试的是唐杰老师的博士生&#xff0c;方向是社交网络数据挖掘&#xff0c;知识图谱。不cue名了&#xff0c;态度很友好的 &#xff0c;很赞。 date&#xff1a;6.28 Q1 自…

【Spark】介绍,部署与快速入门

文章目录 介绍核心模块Spark CoreSpark SQLSpark StreamingSpark MLlibSpark GraphX 部署命令行Web UI提交应用Local 模式Standalone配置文件添加 JAVA_HOME 环境变量和集群对应的 master 节点启动集群配置历史服务添加日志存储路径添加日志配置webui 配置高可用 Yarn模式配置文…

使用npm install -g @vue/cli 命令安装最新的脚手架与Vue版本不匹配的问题

使用npm install -g vue/cli 命令安装最新的脚手架 创建项目时不要选择Vue版本&#xff0c;让它默认选择&#xff08;默认选择 Vue2&#xff09;否则会出现 vue版本和脚手架版本vue-cli 不兼容的问题&#xff08;怪哉&#xff09; 脚手架兼容vue2 不兼容vue3 &#xff1f; 不理…

2023 年 10 大前端发展趋势

新技术的出现和老技术的淘汰让前端开发者们需要不断地学习和更新知识。特别是在经济不好的情况下&#xff0c;是否掌握新的技术很大程度决定着你是否被淘汰。 虽然应用程序试图将网站替代&#xff0c;但前端 Web 开发业务仍在快速变化和增长&#xff0c;前端开发人员的功能并没…

配置Jenkins slave agent(通过jnlp)方式连接

上一章&#xff0c;使用ssh的方式添加了两个agent&#xff0c;并都成功完成了构建任务&#xff0c;这一章使用jnlp的方式配置agent&#xff0c;jnlp方式配置agent有个好处&#xff0c;就是agent是主动去找到Master请求连接的&#xff0c;master->agent的通道可以配置一个age…

Leetcode-每日一题【234.回文链表】

题目 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,2,1]输出&#xff1a;true 示例 2&#xff1a; 输入&#xff1a;head…

要从HTML中提取img标签的src属性(图片链接),可以使用正则表达式方式。

1. 定义提取src属性的正则表达式: const srcRegex /<img\s(?:[^>]*?\s)?src\s*\s*(["])((?:[^\1"]|\\\1|.)*?)\1/g 这个正则会匹配类似<img src"http://example.com/1.jpg">中的src属性和括号中的连接。2. 调用字符串的matchAll()方法…