MQ,RabbitMQ,SpringAMQP的原理与实操

MQ

同步通信

image-20240202103233412

image-20240202105021949

image-20240202105123170

异步通信

image-20240202111930461

事件驱动优势:

  • 服务解耦

  • 性能提升,吞吐量提高

    image-20240202141023030

  • 服务没有强依赖,不担心级联失败问题

    image-20240202141137606

  • 流量消峰

    image-20240202141435355

​ 小结: 大多情况对时效性要求较高,所有大多数时间用同步。而如果不需要对方的结果,且吞吐量,并发量较高则需要使用异步通信

image-20240202141921703

MQ常见框架

MQ(MessageQueue),消息队列,字面来看就是存放消息的队列,也就是事件驱动架构中的Broker

消息:就是事件,比如支付成功了这个事件,在MQ中就是一个消息

image-20240202144211395

RabbitMQ,RocketMQ 适合处理业务(若需要优化定制则选Rocket,因为用Java写的)

Kafka 适合处理日志(海量数据且对数据安全性要求不高的场景),ActiveMQ用的较少

RabbitMQ

RabbitMQ概述与安装

RabbitMQ是基于Erlang语言(面向并发的语言,天生为分布式系统而设计的)开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/

参考课前资料(链接:https://pan.baidu.com/s/1JuVKKFpUXg8TFxa_FoV3Gg
提取码:1468) 来安装RabbitMQ

image-20240202144811905

之后在浏览器输入:http://192.168.83.130:15672/ 进入RabbitMQ管理页面,按docker run中设置的账号密码进行登录

结果如下

image-20240204101726227

mq整体架构

image-20240204103735587

小结

image-20240204103835121

常见消息模型

image-20240204105108372

HelloWorld 案例

image-20240204105310538

动手实践

案例: 完成官方Demo中的hello world案例(链接:https://pan.baidu.com/s/1JuVKKFpUXg8TFxa_FoV3Gg
提取码:1468)

image-20240204105416259

打开项目,将ip调成自己的rabbitmq使用虚拟机(或电脑)的ip,再运行一次PublisherTest中的 testSendMessage() 方法

发送一条消息。再运行ConsumerTest 中main方法来接收消息。

image-20240204112803673

小结

image-20240204135103572

SpringAMQP

AMOP(Advanced Message Queuing Protocol)高级消息队列协议,大大简化消息发送和接收的代码量,且与语言无关

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

image-20240204145954201

image-20240204140927498

AMQP依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>    
	<groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在配置文件中添加mq连接信息

spring:
  rabbitmq:
   	host: 192.168.83.130 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机 
    username: itcast # 用户名
    password: 123321 # 密码

Basic Queue 简单队列模型

案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能

流程如下:

1.在父工程中引入spring-amqp的依赖,以及在publisher服务中编写配置

2.在publisher服务中利用RabbitTemplate的convertAndSend方法,发送消息到simple.queue这个队列

image-20240204145734357

SpringAMQP发送消息步骤:引入依赖和设置配置---->利用RabbitTemplate的convertAndSend方法

3.在consumer中编写代码,接收消息

image-20240204151638720

SpringAMQP接收消息步骤:引入依赖和设置配置—》定义类,添加Component注解,类中声明方法添加@RabbitListener注解

Work Queue 工作队列模型

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积

比如队列 一秒来50条消息 一个消费者一秒处理40条消息,那么需要两个消费者才能使得队列中消息被处理不丢失

image-20240204153355750

案例:实现一个队列绑定多个消费者

image-20240204153947098

问题:rabbitMQ消息预取,会将50条消息平均分给消费者1和消费者2,但消费者2处理速度慢,因此在1s内处理不完publisher发过来的50条消息

解决方案:让能者多劳,设置preFetch,控制预取消息的上限

image-20240204160513742

小结image-20240204161439493

发布、订阅模型-Fanout

image-20240204161952605

注意:exchange负责消息路由,而不是存储(queue负责存储),路由失败则消息丢失

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue(广播)

案例:利用SpringAMQP演示FanoutExchange的使用

image-20240204163804072

step1 在consumer服务中声明Exchange、Queue、Binding(绑定关系)

image-20240204163828992

image-20240204164305716

step2 在consumer服务声明两个消费者

在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
	System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2") 
public void listenFanoutQueue2(String msg) {
	System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

step3 在publisher服务发送消息到FanoutExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testFanoutExchange() {
	// 队列名称  
	String exchangeName = "itcast.fanout"; 
	// 消息
	String message = "hello, everyone!";
	// 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息		
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

小结

image-20240205092228233

发布、订阅模型-Direct

image-20240205092356181

案例:利用SpringAMQP演示DirectExchange的使用

image-20240205092544599

步骤一 在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,

2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
        key = {"red","blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1........接收到路由消息:【" + msg + "】" + LocalTime.now());
}


@RabbitListener(bindings = @QueueBinding(
       value = @Queue(name = "direct.queue2"),
       exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
        key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2........接收到消路由息:【" + msg + "】" + LocalTime.now());
}

步骤二 在publisher服务发送消息到DirectExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testDirectExchange() {
    //交换机名字
    String exchangeName = "itcast.direct";
    //消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    //发送消息,参数依次为:交换机名称,RoutingKey,消息
    rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

从blue->yellow->red 运行三次,得到结果如下

image-20240205104021565

小结

image-20240205104321850

发布、订阅模型-Topic

image-20240205104559605

案例 利用SpringAMQP演示TopicExchange的使用

image-20240205104825731

步骤一:在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,

2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消费者1........接收到路由消息:【" + msg + "】" + LocalTime.now());
}


@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消费者2........接收到消路由息:【" + msg + "】" + LocalTime.now());
}

步骤二:在publisher服务发送消息到TopicExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testTopicExchange() {
    //交换机名字
    String exchangeName = "itcast.topic";
    //消息
    String message = "喜报!孙悟空大战哥斯拉,胜!";
    //发送消息,参数依次为:交换机名称,RoutingKey,消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

小结

image-20240205105655795

消息转化器

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:

​ 在publisher服务引入依赖

<dependency>   
	<groupId>com.fasterxml.jackson.core</groupId>   
	<artifactId>jackson-databind</artifactId>
</dependency>

​ 在publisher服务声明MessageConverter。(原本应该放到配置类中,但启动类也是配置类,所以可以放启动类中)

@Bean
public MessageConverter jsonMessageConverter(){
	return new Jackson2JsonMessageConverter(); 
}

image-20240205111950238

案例 测试发送Object类型消息

image-20240205111336946

结果如下(没有更改JDK序列化方式)

image-20240205111231469

使用json序列化器之后

image-20240205111303797

consumer接收消息过程

step1:加jackson依赖,依赖上面已经放父工程中,就不用做了

step2: 将pulisher中相同的MessageConverter放入consumer 启动类中(发送方与接收方必须相同)

@Bean
public MessageConverter jsonMessageConverter(){
	return new Jackson2JsonMessageConverter(); 
}

step3: 定义一个消费者,监听object.queue队列并消费消息

 @RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg){
    System.out.println("消费者........接收到对象消息:【" + msg + "】" + LocalTime.now());
}

image-20240205135854654

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/372428.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Zookeeper相关面试准备问题

Zookeeper介绍 Zookeeper从设计模式角度来理解&#xff0c;是一个基于观察者模式设计的分布式服务管理框架&#xff0c;它负责存储和管理大家都关心的数据&#xff0c;然后接受观察者的注册&#xff0c;一旦这些数据的状态发生了变化&#xff0c;Zookeeper就负责通知已经在Zoo…

SpringAOP+SpringBoot事务管理

项目搭建SpringAOPSpringBoot中管理事务AOP案例实战-日志记录日志系统 一、项目搭建 第一步&#xff1a;构建项目 第二步&#xff1a;导入依赖 第三步&#xff1a;配置信息 自动配置&#xff08;项目自动生成的启动类&#xff09; /*** 启动类&#xff1a;申明当前类是一个…

模拟被观察物体的位置和方向

开发环境&#xff1a; Windows 11 家庭中文版Microsoft Visual Studio Community 2019VTK-9.3.0.rc0vtk-example demo解决问题&#xff1a;模拟被观察物体的位置和方向&#xff0c;以帮助用户理解相机在观察特定对象时的位置和朝向。vtkCameraOrientationWidget 模拟的是被观察…

Redis核心技术与实战【学习笔记】 - 17.Redis 缓存异常:缓存雪崩、击穿、穿透

概述 Redis 的缓存异常问题&#xff0c;除了数据不一致问题外&#xff0c;还会面临其他三个问题&#xff0c;分别是缓存雪崩、缓存击穿、缓存穿透。这三个问题&#xff0c;一旦发生&#xff0c;会导致大量的请求积压到数据库。若并发量很大&#xff0c;就会导致数据库宕机或故…

九州金榜|如何做好家庭教育

孩子的家庭教育是每个家庭都要做的&#xff0c;也是每个家长面临的事情&#xff0c;同样不同的家庭教育教育出来的孩子性格也各不相同&#xff0c;有时候家长看别别人家的孩子品学兼优非常羡慕&#xff0c;很多家长会把问题归结到孩子身上&#xff0c;其实有没有想过是家庭教育…

C++之函数重载,默认参数,bool类型,inline函数,异常安全

函数重载 在实际开发中&#xff0c;有时候需要实现几个功能类似的函数&#xff0c;只是细节有所不同。如交换两个变量的值&#xff0c;但这两种变量可以有多种类型&#xff0c;short, int, float等。在C语言中&#xff0c;必须要设计出不同名的函数&#xff0c;其原型类似于&am…

【新书推荐】6.2节 段寄存器

在16位汇编语言的源程序中&#xff0c;我们将源程序按照不同的功能和作用划分为若干个逻辑段&#xff0c;如数据段用来存储数据&#xff0c;代码段用来存储代码&#xff0c;堆栈段用来保存临时数据&#xff0c;附加段用来拷贝数据。我们可以把汇编语言的源程序抽象地理解为数据…

TCP与UDP:传输层协议的差异与选择

在计算机网络中&#xff0c;传输控制协议&#xff08;TCP&#xff09;和用户数据报协议&#xff08;UDP&#xff09;是两种常用的传输层协议。然而&#xff0c;随着互联网的快速发展&#xff0c;传统的TCP和UDP在某些场景下存在一些限制。为了解决这些问题&#xff0c;出现了新…

如何使用VS Code编写小游戏并实现公网游玩本地游戏【内网穿透】

文章目录 前言1. 编写MENJA小游戏2. 安装cpolar内网穿透3. 配置MENJA小游戏公网访问地址4. 实现公网访问MENJA小游戏5. 固定MENJA小游戏公网地址 前言 本篇教程&#xff0c;我们将通过VS Code实现远程开发MENJA小游戏&#xff0c;并通过cpolar内网穿透发布到公网&#xff0c;分…

No matching client found for package name ‘com.unity3d.player‘

2024年2月5日更新 必须使用Unity方式接入Unity项目&#xff01;一句话解决所有问题。&#xff08;真的别玩Android方式&#xff09; 大致这问题出现原因是我在Unity采用了Android方式接入Firebase&#xff0c;而Android接入实际上和Unity接入方式有配置上的不一样&#xff0c;我…

爬虫工作量由小到大的思维转变---<第四十五章 Scrapyd 关于gerapy遇到问题>

前言: 本章主要是解决一些gerapy遇到的问题,会持续更新这篇! 正文: 问题1: 1400 - build.py - gerapy.server.core.build - 78 - build - error occurred (1, [E:\\项目文件名\\venv\\Scripts\\python.exe, setup.py, clean, -a, bdist_uberegg, -d, C:\\Users\\Administrat…

链表经典算法(+OJ刷题)

文章目录 前言一、移除链表元素二、链表的中间节点三.反转链表四.合并两个有序链表五.分割链表六.环形链表的约瑟夫问题总结 创作不易&#xff0c;点赞收藏一下呗&#xff01;&#xff01;&#xff01; 前言 在上一节&#xff0c;我们介绍了单链表的增&#xff0c;删&#xff…

机器学习基础、数学统计学概念、模型基础技术名词及相关代码个人举例

1.机器学习基础 &#xff08;1&#xff09;机器学习概述 机器学习是一种人工智能&#xff08;AI&#xff09;的分支&#xff0c;通过使用统计学和计算机科学的技术&#xff0c;使计算机能够从数据中学习并自动改进性能&#xff0c;而无需进行明确的编程。它涉及构建和训练机器…

用Python实现MD5加密

用Python实现MD5加密 用Python实现MD5加密时用到的是hashlib模块&#xff0c;可以通过hashlib标准库使用 多种Hash算法&#xff0c;如SHA1 、SHA224 、SHA256 、SHA384 、SHA512和MD5算法 等。下面是通过调用hashlib模块对字符串进行MD5加密的简单实例&#xff1a; from hash…

[UI5 常用控件] 06.Splitter,ResponsiveSplitter

文章目录 前言1. Splitter1.1 属性 2. ResponsiveSplitter 前言 本章节记录常用控件Splitter,ResponsiveSplitter。主要功能是分割画面布局。 其路径分别是&#xff1a; sap.ui.layout.Splittersap.ui.layout.ResponsiveSplitter 1. Splitter 1.1 属性 orientation &#x…

DBeaver连接达梦数据库

1、下载驱动文件 可官网下载Hibernate 框架 | 达梦技术文档 (dameng.com) 1. 打开DBeaver软件&#xff0c;点击“数据库”&#xff0c;选择“驱动管理器” 2. 点击“新建”进行达人大金仓驱动管理器配置。 3、创建驱动-设置&#xff1a;驱动名称、类名、url 驱动名称&#…

(2017|ICLR,EBGAN,AE 鉴别器,正则化)基于能量的 GAN

Energy-based Generative Adversarial Network 公和众和号&#xff1a;EDPJ&#xff08;进 Q 交流群&#xff1a;922230617 或加 VX&#xff1a;CV_EDPJ 进 V 交流群&#xff09; 目录 0. 摘要 2. EBGAN 模型 2.1 目标函数 2.2 解决方案的最优解 2.3 使用自动编码器 2.…

Android Button background 失效

问题 Android Button background 失效 详细问题 笔者开发Android项目&#xff0c;期望按照 android:background中所要求的颜色展示。 实际显示按照Android 默认颜色展示 解决方案 将xml的Button 组件修改为<android.widget.Button> 即将代码 <Buttonandroid:l…

RCS-YOLO复现

复现结果–Precision&#xff1a;0.941&#xff0c;Recall&#xff1a;0.945&#xff0c;AP 50 _{50} 50​&#xff1a;0.941&#xff0c;AP 50 : 95 _{50:95} 50:95​&#xff1a;0.693&#xff0c;误差在5个点内&#xff0c;可以接受 感想 第5篇完全复现的论文

Facebook与全球文化:多元化视角下的社交体验

在数字时代的今天&#xff0c;Facebook如一座横跨全球的桥梁&#xff0c;将人们从世界各地连接在一起。这个社交媒体平台已经不仅仅是一个在线社交的工具&#xff0c;更是一个全球化时代的文化交汇点。本文将深入研究Facebook在全球文化中的作用&#xff0c;以及它如何在多元文…