微服务学习|初识MQ、RabbitMQ快速入门、SpringAMQP

初识MQ

同步通讯和异步通讯

同步通讯是实时性质的,就好像你用手机与朋友打视频电话,但是,别人再想与你视频就不行了,异步通讯不要求实时性,就好像你用手机发短信,好多人都能同时给你发短信,你都可以收到,而且不用及时回复。

同步调用的问题

微服务间基于Feign的调用就属于同步方式,存在一些问题

比如用户调用支付服务时,它需要先后调用订单服务、仓储服务、短信服务等,都调用结束后,支付服务再返回用户相关信息,故这个过程的响应时间实际上就是所有这些相关服务执行之后所用时间之和,这样是非常影响效率的。但是也有优点,时效性较强,可以立即得到结果

同步调用存在的问题

1.如果我们想对支付服务增加一些功能,增加一些别的服务,为了让支付功能调用这个新服务,我们需要改动相关的代码

2.调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和。就如支付服务必须要等订单服务、仓储服务、短信服务都执行后,才能给出响应,很慢。

3.上述支付服务在等待其余服务响应的时候,资源是不释放的,一直在等待,高并发场景下嫉妒浪费系统资源

4.如果其中一个服务出现问题,例如仓储服务出现问题,则整个响应就无法进行了,就整个崩掉了。

异步调用方案

异步调用常见实现就是事件驱动模式

比如用户发起支付服务,此时会将该消息发给Broker,然后就不用管后面了,支付服务直接给用户响应,不需要管订单、仓储、短信这些服务的执行了,这几个服务提前订阅了这个Broker,所以,支付服务消息来了,会从Broker中获取到消息,然后执行自己的逻辑就好。

优势一:服务解耦

当我们对支付服务增加一些新的功能,新服务时,只需要将该服务订阅这个Broker即可,不需要改动支付服务的代码

优势二:性能提升,吞吐量提高

因为是异步调用,支付服务不需要关注其相关的其他服务执行,只需将支付消息发给Broker即可,然后就能直接给用户响应,非常快,后续相关的服务只需要从Broker中收到该消息后执行相关的业务操作就可。

优势三:服务没有强依赖,不担心级联失败问题

其中一个服务发生故障,整个支付服务不会因为这个服务故障了而停掉,支付服务依然会直接给用户响应。

优势四:流量削峰

因为有Broker的缘故,如果并发量比较大,这些消息都会暂存在Broker中,慢慢的让其余各服务去处理,所以能够很好的将流量削峰。

异步通信的缺点

依赖于Broker的可靠性、安全性、吞吐能力

架构复杂了,业务没有明显的流程线,不好追踪管理

什么是MQ?

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。简单来讲,Kafka对并发场景性能更高,但是没有RabbitMQ安全,所以一般情况下就用RabbitMQ就可,中规中矩。

RabbitMQ快速入门

单机部署

我们在Centos7虚拟机中使用Docker来安装。

方式一:在线拉取

方式二:从本地加载

在课前资料已经提供了镜像包

上传到虚拟机中后,使用命令加载镜像即可

安装MQ

执行下面的命令来运行MQ容器

这里第一个15672端口是用来登录RabbitMQ控制台的,第二个5672端口是用来进行异步调用的。这里也要设置RabbitMQ控制台的登录账号itcast密码123321

用宿主机ip+第一个端口15672来访问RabbitMQ控制台,账号itcast密码123321,登录即可

RabbitMQ的结构和概念

RabbitMQ中的几个概念
channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtualhost: 虚拟主机,是对queue、exchange等资源的逻辑分组

常见消息模型

MQ的官方文档中给出了5个MQ的Demo示例,对应了几种不同的用法

HelloWorld案例

官方的Helloworld是基于最基础的消息队列模型来实现的,只包括三个角色:
publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接受并缓存消息
consumer:订阅队列,处理队列中的消息

编写publisher消息发布者代码,新建一个连接工厂,配置本机RabbitMQ的异步调用端口,以及账号密码,然后建立一个连接,connection对象。

连接对象connection生成后,我们可以在RabbitMQ的控制台,看到了这个新生成的连接

然后用这个connection连接对象创建一个通道channel对象

执行完后,控制台也能够看到这个新建的通道

然后用channel对象新建一个名为simple.queue的队列,并给这个队列发消息

可以在控制台看到这个新创建的名为simple.queue的队列,并且队列中已经有了我们刚发送的消息

订阅该队列的consumer编写,前三步与publisher一致

最后一步变成了利用channel将消费者与队列绑定,在handleDelivery()中定义consumer的消费行为

SpringAMQP

什么是SpringAMQP

案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能

流程如下:
1.在父工程中引入spring-amqp的依赖

2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

3.在consumer服务中编写消费逻辑,绑定simple.queue这个队列

步骤1: 引入AMQP依赖

因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中

步骤2: 在publisher中编写测试方法,向simple.queue发送消息

1.在publisher服务中编写application.yml,添加mq连接信息

2.在publisher服务中新建一个测试类,编写测试方法

运行后,可在RabbitMQ控制台看到该队列,以及队列中有消息了

什么是AMQP?

应用间消息通信的一种协议,与语言和平台无关。

SpringAMOP如何发送消息?

引入amqp的starter依赖
配置RabbitMo地址
利用RabbitTemplate的convertAndSend方法

步骤3: 在consumer中编写消费逻辑,监听simple.queue

1.在consumer服务中编写application.yml,添加mq连接信息

2.在consumer服务中新建一个类,编写消费逻辑

SpringAMQP如何接收消息?
引入amqp的starter依赖
配置RabbitMQ地址
定义类,添加@Component注解
类中声明方法,添加@RabbitListener注解,方法参数就是消息

注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能

Work Queue 工作队列

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积,也就是两个消费者同时订阅一个队列,共同处理队列中的消息

案例:模拟WorkQueue,实现一个队列绑定多个消费者

基本思路如下
1.在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

2.在consumer服务中定义两个消息监听者,都监听simple.queue队列
3.消费者1每秒处理50条消息,消费者2每秒处理10条消息

publisher服务中定义测试方法,每20ms发送一个消息,发送50次,也就是1秒向消息队列中发送50条消息

定义两个消息监听者,都监听simple.queue队列,消费者1一秒钟能处理50条消息,消费者2一秒钟能处理5条消息,故理论上这两个消费者能在1秒内处理完publisher发送的所有(50条)消息

但是,运行起来发现,并不是按照能力强的处理的消息多这样来分配的,而是这两个监听者,各分到了一般的消息,消费者1处理偶数号的消息,消费者2处理奇数的消息,这样最终结果就是消费者1很快的处理完了总共消息的一半,25条消息,而消费者2却花了好多秒去处理分给自己25条消息,最终的结果就是这两个消费者不能够在1秒内处理完所有消息。

实际上,它是有一个预取机制造成的这样结果,进入到消息队列中的消息会被提前预取给消费者,默认的是一人一个,这样平均分配的,消费者还没处理完,但是消息都已经全部预取出来给了对应消费者,我们可以配置预取机制,让每个消费者预取消息的数量为1,每次只能获取一消息,处理完成才能获取下一个消息,这样就可以做到能者多劳的结果。

消费预取限制

修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限

配置完后再启动,就发现了消费者1执行的消息要比消费者2执行的消息多了,而不是简单的将消息都平均分配给监听的两个消费者。

Work模型的使用

多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量

发布 ( Publish )、订阅 ( Subscribe )

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。

常见exchange类型包括:
Fanout:广播
Direct: 路由
Topic: 话题

发布订阅-Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue

案例:利用SpringAMQP演示FanoutExchange的使用

实现思路如下:
1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定
2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

3.在publisher中编写测试方法,向itcast.fanout发送消息

步骤1: 在consumer服务声明Exchange、Queue、 Binding

在consumer服务常见一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下

启动后,可以在RabbitMQ控制台看到我们声明的交换机与两个队列完成绑定。

步骤2: 在consumer服务声明两个消费者

在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2

步骤3: 在publisher服务发送消息到FanoutExchange

在publisher服务的SpringAmqpTest类中添加测试方法

项目启动,两个消费者都收到了这个publisher发送的消息

交换机的作用是什么?
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列

发布订阅-DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式 (routes)。每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例:利用SpringAMQP演示DirectExchange的使用

实现思路如下:
1.利用@RabbitListener声明Exchange、Queue、RoutingKey
2.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
3.在publisher中编写测试方法,向itcast.direct发送消息

步骤1: 在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2.
2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

步骤2: 在publisher服务发送消息到DirectExchange

在publisher服务的SpringAmqpTest类中添加测试方法,convertAndSend的第二个参数就是RoutingKey,如果是red,则两个消费者都可收到,是blue,则只有消费者1收到,yellow则只有消费者2收到。

描述下Direct交换机与Fanout交换机的差异?
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机有哪些常见注解?
@Queue
@Exchange

发布订阅-TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割。Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词

案例:利用SpringAMQP演示TopicExchange的使用

实现思路如下:
1.并利用@RabbitListener声明Exchange、Queue、RoutingKey
2.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

3.在publisher中编写测试方法,向itcast.topic发送消息

步骤1: 在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

步骤2: 在publisher服务发送消息到TopicExchange

在publisher服务的SpringAmqpTest类中添加测试方法,如果RoutingKey是china.news,则两个消费者都能够接收到消息,如果为china.weather,则只有消费者1能收到消息,如果是usa.news,则只有消费者2能收到消息

消息转换器

案例:测试发送Object类型消息

说明:在SpringAMQP的发送方法中,接收消息的类型是object,也就是说我们可以发送任意对象类型的消息,SpringAMOP会帮我们序列化为字节后发送

先在consumer中的fanoutConfig中声明一个队列object.queue

在publisher中发送消息以测试

执行完publisher发消息后,RabbitMQ控制台查看object.queue队列中刚收到的消息,看到为乱码,被序列化了

Spring的对消息对象的处理是由org.springframeworkamqp.support.converterMessageConverter来处理的。而默认实现是simpleMessageConverter,基于]DK的ObjectOutputStream完成序列化。

如果要修改只需要定义一个MessaeConverter 类型的Bean即可。推荐用ISON方式序列化,步骤如下:
我们在publisher服务引入依赖

我们在publisher服务声明MessageConverter,在该服务的主启动函数中声明即可。

再次重新执行完publisher发消息后,RabbitMQ控制台查看object.queue队列中刚收到的消息,看到消息已经成为了json格式,而不是乱码了

我们在consumer服务引入Jackson依赖

我们在consumer服务定义MessageConverter,在该服务的主启动函数中声明即可

然后定义一个消费者,监听object.queue队列并消费消息

服务启动后,这个consumer消费者收到了这个json消息。

SpringAMQP中消息的序列化和反序列化是怎么实现的?
利用MessageConverter实现的,默认是JDK的序列化
注意发送方与接收方必须使用相同的MessageConverter

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/184793.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

java springboot测试类虚拟MVC环境 匹配返回值与预期内容是否相同 (JSON数据格式) 版

上文java springboot测试类鉴定虚拟MVC请求 返回内容与预期值是否相同我们讲了测试类中 虚拟MVC发送请求 匹配返回内容是否与预期值相同 但是 让我意外的是 既然没人骂我 因为我们实际开发 返回的基本都是json数据 字符串的接口场景是少数的 我们在java文件目录下创建一个 dom…

U9二次开发之轻量服务项目开发

最近公司要开发一个下载图纸的U9轻量级接口,轻量级接口就是restful api,可以直接通过get、post等方式调用,参数的传送和结果的返回都使用JSON格式,用起来比Webservice接口爽多了。 如果是开发新的接口,我建议都用轻量…

CentOS7磁盘挂载

1 引言 本文主要讲述CentOS7磁盘挂载相关知识点和操作。 2 磁盘挂载 步骤1: 查看机器所挂硬盘及分区情况 fdisk -l查询结果: 由上图可以看到该结果包含:硬盘名称、硬盘大小等信息。 属性解释说明Disk /dev/vda硬盘名称53.7G磁盘大…

vue3中引入svg矢量图

vue3中引入svg矢量图 1、前言2、安装SVG依赖插件3、在vite.config.ts 中配置插件4、main.ts入口文件导入5、使用svg5.1 在src/assets/icons文件夹下引入svg矢量图5.2 在src/components目录下创建一个SvgIcon组件5.3 封装成全局组件,在src文件夹下创建plugin/index.t…

一穿一戴一世界 | 紫光展锐2023智能穿戴沙龙成功举办

11月23日,紫光展锐在深圳成功举办了以“一穿一戴一世界”为主题的2023智能穿戴沙龙。展锐智能穿戴沙龙已举办四届,旨在为行业提供启发性的观点和前瞻性的创新理念。本届沙龙吸引了终端厂商、行业翘楚、生态伙伴等行业各领域超过500人汇聚一堂&#xff0c…

代码随想录算法训练营第四十五天【动态规划part07】 | 70. 爬楼梯 (进阶)、322. 零钱兑换、279.完全平方数

70. 爬楼梯 (进阶) 题目链接: 题目页面 求解思路: 动规五部曲 确定dp数组及其下标含义:爬到有i阶楼梯的楼顶,有dp[i]种方法递推公式:dp[i] dp[i-j];dp数组的初始化:dp[0] 1;确…

echarts笛卡尔坐标系热力图当坐标及数据为小数时

// X坐标轴 const xValue [6,6.5,7,7.5,8,8.5,9,9.5,10]; //Y坐标轴 const yValue [1.5,2,2.5,3,3.5,4,4.5,5,5.5,6]; // 需要展示的值【X坐标,Y坐标,展示的数值】 const data [[6.5,2,4], [7, 2.5, 10]] ; // 坐标轴及数值存在小数时,需要进行转化,否…

图扑数字孪生在智慧校园可视化中的应用

当今,智慧校园发展阶段亟需推动信息可视化建设与发展,将大数据、云计算、可视化等高新技术相融合,为校园师生创造科学智能的学习环境,并实现教学资源最大化和信息服务智能化。帮助学校更好地应用校园可视化技术,提升校…

【医学图像处理】超详细!PET图像批量预处理

目录 一、单个PET图像预处理1、使用[MRIConvert](https://pan.baidu.com/s/1cn3kgeVRir8HvP6HHm0M0Q?pwd5rt5)处理DCM2、MRI和PET数据预处理过程1) 打开matlab命令行输入spm pet,打开SMP12,界面如下2) Realign,只需要…

小程序:用户查找英语单词的意思 ← Python字典

【程序分析】 ● 字典中的条目是没有顺序的。 ● 可以对字典使用如下方法: keys()、values()、 items()、 clear()、 get(key)、 pop(key) 和popitem()【程序代码】 dictionary{"dog":"狗","apple":"苹果","banana&q…

软件测试该如何发展?自我价值诉求?“我“的测试之路...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 其实测试的生态&a…

curl添加https服务

CURL支持的通信协议有FTP、FTPS、HTTP、HTTPS、TFTP、SFTP、Gopher、SCP、Telnet、DICT、FILE、LDAP、LDAPS、IMAP、POP3、SMTP和RTSP。 首选删除系统自带的openssl,因为他只有可执行程序和库,没有头文件。 sudo apt-get remove openssl openssl官网&am…

国产自研数字孪生引擎如何突围?易知微给出了一个答案!

大数据产业创新服务媒体 ——聚焦数据 改变商业 在数字化转型的大潮中,数字孪生引擎以其独特的能力,正迅速成为能源、智慧城市、智能制造及智能政务等多个领域的关键技术。通过创建现实世界的虚拟副本,数字孪生为复杂系统的管理、优化和预测…

每日一题 2824. 统计和小于目标的下标对数目(简单)

简单题&#xff0c;走流程 class Solution:def countPairs(self, nums: List[int], target: int) -> int:ans 0for i in range(len(nums)):for j in range(i 1, len(nums)):if nums[i] nums[j] < target:ans 1return ans

数据治理技术之数据清洗

数据清洗背景 数据质量一般由准确性、完整性、一致性、时效性、可信性以及可解释性等特征来描述&#xff0c;根据 Rahm 等人在 2000 年对数据质量基于单数据源还是多数据源以及问题出在模式层还是实例层的标准进行分类&#xff0c;将数据质量问题分为单数据源模式层问题、单数…

Jetson orin(Ubuntu20.04)不接显示器无法输出VNC图像解决办法以及vnc安装记录

sudo apt install vino 好像Jetpack 5.0中已经自带了。。 配置VNC server: gsettings set org.gnome.Vino prompt-enabled false gsettings set org.gnome.Vino require-encryption false 编辑org.gnome,增加一个“enabled key”的参数&#xff1a; cd /usr/share/glib-2…

数据结构-树

参考&#xff1a;https://www.hello-algo.com/chapter_tree/binary_tree/#711 1. 介绍 树存储不同于数组和链表的地方在于既可以保证数据检索的速度&#xff0c;又可以保证数据插入删除修改的速度&#xff0c;二者兼顾。 二叉树是一种很重要的数据结构&#xff0c;是非线性的…

Linux | 创建 | 删除 | 查看 | 基本命名详解

Linux | 创建 | 删除 | 查看 | 基本命名详解 文章目录 Linux | 创建 | 删除 | 查看 | 基本命名详解前言一、安装Linux1.1 方法一&#xff1a;云服务器方式1.2 方法二&#xff1a;虚拟机方式 二、ls2.2 ll 三、which3.1 ls -ld 四、pwd五、cd5.1 cd .\.5.2 ls -al5.3 重新认识命…

一个令人惊艳的新项目,SVD开源了!

大家好&#xff0c;我是 Jack。 对于 Stable Diffusion&#xff0c;想必我的读者朋友们对此都不陌生。 自 Stability AI 公司发布 SD&#xff08;全称&#xff1a;Stable Diffusion) 以来&#xff0c;受到了很多人的喜爱。 SDXL 效果 随后技术升级&#xff0c;又发布了 SDXL…