RabbitMQ个人理解与基本使用

目录

一. 作用:

二. RabbitMQ的5中队列模式:

1. 简单模式

2. Work模式

3. 发布/订阅模式

4. 路由模式

5. 主题模式

三. 消息持久化:

消息过期时间

ACK应答 

四. 同步接收和异步接收:

应用场景

五. 基本使用 :

引入依赖库:

配置文件RabbitMQConfig: 

创建消息任务类: 

解析:


一. 作用:

        RabbitMQ主要用于消息队列的实现。

二. RabbitMQ的5中队列模式:

1. 简单模式

一个生产者(发送方)对应一个消费者(接收方)

2. Work模式

一个生产者对应多个消费者,但是只能有一个消费者获得消息(排他)

3. 发布/订阅模式

一个消费者将消息首先发送到fanout交换器,交换器绑定到多个队列,然后与之对应的所有消费者都能接收到消息(不排他)

4. 路由模式

生产者将消息发送到direct交换器,交换器按照关键字(Key),把消息路由到某个队列

5. 主题模式

生产者将消息发送到Topic交换器,交换器按照复杂的规则,把消息路由到某个队列

三. 消息持久化:

        消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢?答案就是消息持久化。持久化可以防止在异常情况下丢失数据。除了消息持久化之外,甚至交换器和队列都能持久化。也就是说rabbitmq的消息会被存储在磁盘上,只有当消费收到消息,rabbitmq确认消费者收到消息(Acknowledgments--简称ACK)后才会将消息从队列中删除。  

  • 消息过期时间

        如果消费者一直不接收消息,消息会一直保存在消息队列当中,短期内可能不会有什么影响,但是如果经过长时间的积累后,消息会变得很多很多 ,浪费大量的资源,内存。

        为了应对这种情况,就可以对rabbitmq设置消息的过期时间,在规定时间内消息没有被接收,就会删除掉该消息。

  • ACK应答 

        消费者接收到消息后,为了让RabbitMQ 知道,就需要返回一个ACK应答,告诉RabbitMQ消费者已经收到了消息,如果收到消息后我们需要删除该消息,只需要在ACK应答中加上deliveryTag标志位。

四. 同步接收和异步接收:

        同步接收:指消费者调用方法时,会阻塞来等待消息,直到消息被成功消费或者队列为空。(没有消息等待消息再接着处理)。

        异步接收: 指消费者不会在接收消息时阻塞,而是通过回调函数处理消息。消费者在等待消息的同时不会停下,可以处理其他任务。(当有消息时才来处理消息)。

  • 应用场景

        同步接收 :当消息的处理顺序对业务逻辑非常重要,就使用同步接收,消费者一次只处理一个消息,确保了每条消息的处理顺序。

        异步接收:当处理消息的时间比较长,或者系统的并发量大时,采用异步接收会更好。

RabbitMQ还有一个杀手锏——同时使用异步收发和同步收发。

五. 基本使用 :

引入依赖库:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency> 

配置文件RabbitMQConfig: 

import com.rabbitmq.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Value("${rabbitmq.factoryHost}")
    private String host;

    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(5672);
        return factory;
    }
}

 host配置,我将rabbitMQ放在虚拟机上的,所有ip是虚拟机的地址:

创建消息任务类: 

@Slf4j
@Component
public class MessageTask {
    @Autowired
    private ConnectionFactory factory;
    @Autowired
    private MessageService messageService;

    /*
    * 同步发送消息
    * */
    public void send(String topic, MessageEntity entity) {
        //向MongoDB保存消息数据,返回消息ID
        String id = messageService.insertMessage(entity);
        //向RabbitMQ发送消息
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()){
            //连接到某个topic
            channel.queueDeclare(topic, true, false, false, null);
            HashMap header = new HashMap();
            header.put("messageId",id);
            //创建AMQP协议参与对象,添加附加属性
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build();
            channel.basicPublish("",topic,properties,entity.getMsg().getBytes());
            log.debug("消息发送成功");
        } catch (Exception e){
            log.error(e.getMessage());
            throw new EmosException("向MQ发送消息失败");
        }
    }

    /*
    * 异步发送消息
    * */
    @Async("AsyncTaskExecutor")
    public void sendAsync(String topic, MessageEntity entity) {
        send(topic, entity);
    }

    /*
    * 同步接收消息
    * */
    public int receive(String topic) {
        int i = 0;
        try (//接收消息数据
             Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 从队列中获取消息,不自动确认
            channel.queueDeclare(topic, true, false, false, null);
            //Topic中有多少条数据未知,所以使用死循环接收数据,直到接收不到消息,退出死循环
            while (true) {
                //创建响应接收数据,禁止自动发送Ack应答
                GetResponse response = channel.basicGet(topic, false);
                if (response != null) {
                    AMQP.BasicProperties properties = response.getProps();
                    Map<String, Object> header = properties.getHeaders(); //获取附加属性对象
                    String messageId = header.get("messageId").toString();
                    byte[] body = response.getBody();//获取消息正文
                    String message = new String(body);
                    log.debug("从RabbitMQ接收的消息:" + message);
                    MessageRefEntity entity = new MessageRefEntity();
                    entity.setMessageId(messageId);
                    entity.setReceiverId(Integer.parseInt(topic));
                    entity.setReadFlag(false);
                    entity.setLastFlag(true);
                    messageService.insertRef(entity); //把消息存储在MongoDB中
                    //数据保存到MongoDB后,才发送Ack应答,让Topic删除这条消息
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                    i++;
                } else {
                    break; //接收不到消息,则退出死循环
                }
            }
        } catch (Exception e) {
            log.error("执行异常", e);
        }
        return i;
    }

    /*
    * 异步接收消息
    * */
    @Async
    public int receiveAsync(String topic) {
        return receive(topic);
    }

    /*
    * 同步删除消息
    * */
    public void deleteQueue(String topic) {
        try(//接收消息数据
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()){
            channel.queueDelete(topic);
            log.debug("成功删除消息队列:"+topic);
        } catch (Exception e){
            log.error("删除消息队列失败:",e);
            throw new EmosException("删除消息队列失败");
        }
    }

    /*
    * 异步删除消息
    * */
    @Async
    public void deleteAsync(String topic) {
        deleteQueue(topic);
    }
}

解析:

channel.queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
  • queueName:队列的名称,用于标识消息的存储位置。
  • durable:

        true,表示队列是持久化的。

        false,表示队列是非持久化的。

  • exclusive:

        true:队列仅供当前连接使用,连接断开时队列会自动删除。

        false:队列可供多个连接共享。

  • autoDelete:
    true:当队列不再被任何消费者订阅时,队列会自动删除。
    false:队列即使没有消费者订阅也会一直存在,直到手动删除。

  • arguments:额外的参数,null表示没有额外参数

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 60000); // 设置队列中消息的过期时间为 60 秒(60000 毫秒)

channel.queueDeclare("myQueue", true, false, false, arguments);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/936918.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络工程师常用软件之配置对比软件

老王说网络&#xff1a;网络资源共享汇总 https://docs.qq.com/sheet/DWXZiSGxiaVhxYU1F ☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝☝ 我们经常在项目或者运维中对设备的config进行变更&am…

嵌入式驱动开发详解15(电容触摸屏gt9147)

文章目录 前言电容触摸屏特点MT触摸消息电容触摸屏协议电容屏触摸时序Type A 触摸点信息上报时序Type B 触摸点信息上报时序 多点触摸所使用到的API函数 驱动部分驱动框图设备树节点修改设备树引脚配置设备节点配置 具体驱动开发I2C驱动框架I2C框架内部实现 参考文献 前言 随着…

antdv-<a-button>中属性的使用

UI组件库&#xff08;User Interface Component Library&#xff09;是一种预先构建好的、可重用的用户界面元素集合&#xff0c;旨在帮助开发者更快速、更简便地构建用户界面。这些组件通常包括按钮、表单、导航栏、模态框等&#xff0c;能够提供一致的外观和交互风格&#xf…

win服务器的架设、windows server 2012 R2 系统的下载与安装使用

文章目录 windows server 2012 R2 系统的下载与安装使用1 windows server 2012 的下载2 打开 VMware 虚拟机软件&#xff08;1&#xff09;新建虚拟机&#xff08;2&#xff09;设置虚拟机&#xff08;3&#xff09;打开虚拟机 windows server 2012&#xff08;4&#xff09;进…

【ArcGIS微课1000例】0135:自动生成标识码(长度不变,前面自动加0)

文章目录 一、加载实验数据二、BSM计算方法一、加载实验数据 加载专栏《ArcGIS微课实验1000例(附数据)》配套数据中0135.rar中的建筑物数据,如下图所示: 打开属性表,BSM为数据库中要求的字段:以TD_T 1066-2021《不动产登记数据库标准》为例: 计算出来的BSM如下图: 二、B…

康谋方案 | 多源相机数据采集与算法集成测试方案

目录 一、相机组成 二、多源相机采集与测试方案 三、应用案例分享 四、结语 在智能化技术快速发展当下&#xff0c;图像数据的采集与处理逐渐成为自动驾驶、工业等领域的一项关键技术。高质量的图像数据采集与算法集成测试都是确保系统性能和可靠性的关键。随着技术的不断进…

陪玩系统小程序源码/游戏陪玩APP系统用户端有哪些功能?游戏陪玩小程序APP源码开发

多客陪玩系统-游戏陪玩线下预约上门服务等陪玩圈子陪玩社区系统源码 陪玩系统源码&#xff0c;高质量的陪玩系统源码&#xff0c;游戏陪玩APP源码开发&#xff0c;语音陪玩源码搭建: 线上陪玩活动组局与线下家政服务系统的部署需要综合考虑技术选型、开发流程、部署流程、功能实…

运维实战:K8s 上的 Doris 高可用集群最佳实践

今天我们将深入探讨&#xff1a;&#xff1a;如何在 K8s 集群上部署 Compute storage coupled&#xff08;存算耦合&#xff09; 模式的 Doris 高可用集群&#xff1f; 本文&#xff0c;我将为您提供一份全面的实战指南&#xff0c;逐步引导您完成以下关键任务&#xff1a; 配…

从零用java实现 小红书 springboot vue uniapp (2)主页优化

前言 移动端演示 http://8.146.211.120:8081/#/ 前面的文章我们基本完成了主页的布局 今天我们具体的去进行实现 并且分享我开发时遇到的问题 首先先看效果 java仿小红书主页 实现效果为 1.顶端全屏切换 2.上划加载更多 3.下拉当前页整体刷新 顶端全屏切换我们选择 gui-switch…

动手学深度学习-线性神经网络-7softmax回归的简洁实现

目录 初始化模型参数 重新审视Softmax的实现 优化算法 训练 小结 在 线性回归的实现中&#xff0c; 我们发现通过深度学习框架的高级API能够使实现 线性回归变得更加容易。 同样&#xff0c;通过深度学习框架的高级API也能更方便地实现softmax回归模型。 本节如在上一节…

人工智能原理实验四:智能算法与机器学习

一、实验目的 本实验课程是计算机、智能、物联网等专业学生的一门专业课程&#xff0c;通过实验&#xff0c;帮助学生更好地掌握人工智能相关概念、技术、原理、应用等&#xff1b;通过实验提高学生编写实验报告、总结实验结果的能力&#xff1b;使学生对智能程序、智能算法等…

【新界面】基于卷积神经网络的垃圾分类(Matlab)

基于CNN的垃圾识别与分类GUI【新界面】 有需要可直接联系我&#xff0c;基本都在在线&#xff0c;能秒回&#xff01;可加我看演示视频&#xff0c;不懂可以远程教学 1.此项目设计包括两份完整的源代码&#xff0c;有GUI界面的代码和无GUI界面系统的代码。 &#xff08;以下部…

网站访问的基础-HTTP超文本传输协议

BS架构 浏览器Browser⬅➡服务器Server 浏览器和服务器之间通过 IP 地址进行通信&#xff0c;实现数据的请求和传输。 例如&#xff0c;当用户在浏览器中访问一个网站时&#xff0c;浏览器会根据用户输入的网址&#xff08;通过 DNS 解析得到服务器 IP 地址&#xff09;向服…

【C++】递归填充矩阵的理论解析与实现

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;问题描述&#x1f4af;递归实现&#x1f4af;参数解析函数参数详解填充顺序分析递归终止条件 &#x1f4af;示例解析第一层递归第二层递归第三层递归最终输出 &#x1f4af…

Git 仓库托管教程

git远程仓库 常用的远程仓库-->托管服务&#xff1a;github、码云、gitlab等 github需要魔法上网&#xff0c;速度较慢因为在国外且仅仅支持Git&#xff0c;如果不是Git项目是不支持的&#xff1b;码云--gitee国内的代码托管平台&#xff0c;服务器在国内速度快一些&#…

[创业之路-190]:《华为战略管理法-DSTE实战体系》-2-华为DSTE战略管理体系概要

目录 一、DSTE战略管理体系与BLM的关系 1、DSTE战略管理体系概述 2、BLM模型概述 3、DSTE与BLM的关系 二、重新认识流程 1. 流程就是业务本身&#xff0c;流程是业务过程的可视化&#xff1a; 2. 流程是业务最佳路径的经验教训总结&#xff1a; 3. 流程是战略知识资产、…

多智能体架构 Insight-V:针对长链视觉推理瓶颈

多智能体架构 Insight-V&#xff1a;针对长链视觉推理瓶颈 https://arxiv.org/abs/2411.14432 推理智能体与总结智能体协作完成任务&#xff0c;实现复杂视觉任务中的高效推理与总结。其中写了一小段&#xff0c;用迭代 DPO 算法&#xff0c;在每一轮训练中&#xff0c;模型会…

ASP.NET |日常开发中连接Oracle数据库详解

ASP.NET &#xff5c;日常开发中连接Oracle数据库详解 前言一、安装和配置 Oracle 数据访问组件1.1 安装ODP.NET&#xff08;Oracle Data Provider for.NET&#xff09;&#xff1a;1.2 引用相关程序集&#xff1a; 二、配置连接字符串2.1 连接字符串的基本组成部分&#xff1a…

生成树协议STP工作步骤

第一步&#xff1a;选择根桥 优先级比较&#xff1a;首先比较优先级&#xff0c;优先级值越小的是根桥MAC地址比较&#xff1a;如果优先级相同&#xff0c;则比较MAC地址。MAC地址小的是根桥。 MAC地址比较的时候从左往右&#xff0c;一位一位去比 第二步&#xff1a;所有非根…

Redis是什么?Redis和MongoDB的区别在那里?

Redis介绍 Redis&#xff08;Remote Dictionary Server&#xff09;是一个开源的、基于内存的数据结构存储系统&#xff0c;它可以用作数据库、缓存和消息中间件。以下是关于Redis的详细介绍&#xff1a; 一、数据结构支持 字符串&#xff08;String&#xff09; 这是Redis最…