学习【RabbitMQ入门】这一篇就够了

目录

  • 1. RabbitMQ入门
    • 1-1. 同步调用
    • 1-2. 异步调用
    • 1-3. MQ技术选型
    • 1-4. RabbitMQ介绍
      • 消息模式
    • 1-5. SpringAMQP
      • Basic Queue
      • Work Queue
      • Fanout Exchange
      • Direct Exchange
      • Topic Exchange
      • 消息转换器

在这里插入图片描述

1. RabbitMQ入门

1-1. 同步调用

优势:

  • 时效性强,等待到结果后才返回。

问题:

  • 扩展性差
  • 性能下降
  • 级联失败问题

1-2. 异步调用

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

异调用的优势是什么?

  • 耦合度低,拓展性强
  • 异步调用,无需等待,性能好
  • 故障隔离,下游服务故障不影响上游业务
  • 缓存消息,流量削峰填谷

异步调用的问题是什么?

  • 不能立即得到调用结果,时效性差
  • 不确定下游业务执行是否成功
  • 业务安全依赖于Broker(消息代理)的可靠性

1-3. MQ技术选型

MQ(MessageQueue), 中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。

在这里插入图片描述

1-4. RabbitMQ介绍

RabbitMQ的整体架构及核心概念:

  • virtual-host:虚拟主机,起到数据隔离的作用
  • publisher:消息发送者
  • consumer:消息的消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由消息,不能存储消息

在这里插入图片描述

消息模式

消息队列的消息分为二类传输模型:点对点模型发布 /订阅模型

在这里插入图片描述

RabbitMQ在此基础上进行细化,提供了5种消息模型:

  • 1、2(点对点模型)
  • 3、4、5(发布/订阅模型)

在这里插入图片描述

1-5. SpringAMQP

AMQP:是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP:基于AMQP协议定义的一套API,提供了模板来发送和接收消息,模板底层是基于RabbitMQ封装。

Basic Queue

在这里插入图片描述

生产者

① 在application.yml中添加MQ配置

spring:
  rabbitmq:
    host: 192.168.136.131 # 主机名
    port: 5672 # 端口
    virtual-host: itheima # 虚拟主机
    username: root # 用户名
    password: root # 密码 

② 注入RabbitTemplate ,并利用RabbitTemplate实现消息发送

@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //发送简单消息
    @Test
    public void testSimpleQueue() {
        //参数一: 队列名称(次队列需要是提前创建好的)   参数二: 消息内容
        rabbitTemplate.convertAndSend("p2p", "hello,spring amqp!");
    }
}

消费者

① 在application.yml中添加MQ配置

spring:
  rabbitmq:
    host: 192.168.136.131 # 主机名
    port: 5672 # 端口
    virtual-host: itheima # 虚拟主机
    username: root # 用户名
    password: root # 密码 

② 创建com.itheima.listener.SpringRabbitListener

@Component
public class SpringRabbitListener {
    //简单类型消息
    @RabbitListener(queues = "p2p")//声明监听的队列名称
    public void listenSimpleQueueMessage(String msg) {
        System.out.println("消费者接收到消息:【" + msg + "】");
    }
}

Work Queue

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work模型,多个消费者共同处理消息处理,速度就能大大提高了。

在这里插入图片描述

生产者

这次我们循环发送,模拟大量消息堆积现象。

在publisher服务中的SpringAmqpTest类中添加一个测试方法:

 //批量发送消息
 @Test
 public void testWorkQueue() {
     for (int i = 1; i <= 50; i++) {
         rabbitTemplate.convertAndSend("p2p", "message_" + i);
     }
 }

消费者

要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

 //使用下面两个方法来接收p2p队列中的消息
 @RabbitListener(queues = "p2p")
 public void listenWorkQueue1(String msg) throws InterruptedException {
     System.out.println("消费者1接收到消息:【" + msg + "】");
     Thread.sleep(20);
 }

 @RabbitListener(queues = "p2p")
 public void listenWorkQueue2(String msg) throws InterruptedException {
     System.err.println("消费者2........接收到消息:【" + msg + "】");
     Thread.sleep(200);
 }

先启动ConsumerApplication后(消费者),再执行publisher服务中刚刚编写的发送测试方法testWorkQueue(提供者)。

可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。

在这里插入图片描述

能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 消费者一次处理一条消息,处理完毕后再从MQ中获取

在这里插入图片描述

小结

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量
  • 缺点:如果有多个模块监听,都需要这条消息,但是Work模型的消息只会被一个模块的消费者处理

Fanout Exchange

Fanout,在MQ中可以理解为广播模式。

在这里插入图片描述

声明队列和交换机

我们习惯在消费者一方来创建交换机和队列

@Configuration
public class FanoutConfiguration {

    // 声明交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange");
    }

    // 声明第1个队列
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }

    // 队列1绑定交换机
    @Bean
    public Binding bindingQueue1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    // 声明第2个队列
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    // 队列2绑定交换机
    @Bean
    public Binding bindingQueue2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

消费者

//使用下面两个方法来测试Fanout类型的消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

生产者

// 发送fanout消息
@Test
public void testFanoutExchange() throws Exception {
    //参数一: 交换机名称  参数二:暂时没用 参数三: 消息内容
    rabbitTemplate.convertAndSend("fanout.exchange", "", "hello,everyone!");
}

小结

交换机的作用是什么?

  • 接收publisher发送的消息
  • FanoutExchange的会将消息路由到每个绑定的队列
  • 不能缓存消息,路由失败,消息丢失

Direct Exchange

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange

在这里插入图片描述

注解声明队列和交换机(消费者)

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

//使用下面两个方法来测试Direct类型的消息
// direct监听器1
@RabbitListener(bindings =@QueueBinding(  // 完成队列绑定交换机
        value = @Queue("direct.queue1"), // 创建队列
        exchange = @Exchange(value = "direct.exchange",type = ExchangeTypes.DIRECT), // 创建交换机并绑定队列
        key ={"vip","base"} // 指定routing key
))
public void listenDirectQueue1(String msg){
    System.out.println("direct消费者1接收消息:【" + msg + "】");
}

// direct监听器2
@RabbitListener(bindings =@QueueBinding(
        value = @Queue("direct.queue2"),
        exchange = @Exchange(value = "direct.exchange",type = ExchangeTypes.DIRECT),
        key ={"base"}
))
public void listenDirectQueue2(String msg){
    System.err.println("direct消费者2接收消息:【" + msg + "】");
}

生产者

// 发送direct消息
@Test
public void testSendDirect() throws Exception {
    //参数一: 交换机名称  参数二:routingKey 参数三: 消息内容
    rabbitTemplate.convertAndSend("direct.exchange", "vip", "hello,everyone!");
}

小结

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列

Topic Exchange

Topic类型的类型Exchange与Direct相比,可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: china.news

通配符规则:

#:代指0个或多个单词

*:代指一个单词

在这里插入图片描述

注解声明队列和交换机(消费者)

//使用下面两个方法来测试Topic类型的消息
 @RabbitListener(bindings = @QueueBinding(
         value = @Queue("topic.queue1"),
         exchange = @Exchange(value = "topic.exchange", type = ExchangeTypes.TOPIC),
         key = "china.#"
 ))
 public void listenTopicQueue1(String msg) {
     System.out.println("消费者1接收到Topic消息:【" + msg + "】");
 }

 @RabbitListener(bindings = @QueueBinding(
         value = @Queue("topic.queue2"),
         exchange = @Exchange(value = "topic.exchange", type = ExchangeTypes.TOPIC),
         key = "#.news"
 ))
 public void listenTopicQueue2(String msg) {
     System.out.println("消费者2接收到Topic消息:【" + msg + "】");
 }

生产者

// 发送topic消息
@Test
public void testTopicExchange() throws Exception {
    //参数一: 交换机名称  参数二:routingKey 参数三: 消息内容
    rabbitTemplate.convertAndSend("topic.exchange", "china.news", "喜报!孙悟空大战孙行者,胜!!!");
}

小结

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey可以是多个单词,以 . 分割
  • Topic交换机与队列绑定时的bindingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

消息转换器

在SpringAMQP的发送方法中,Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 可读性差

配置JSON转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小可读性更高,因此可以使用JSON方式来做序列化和反序列化。

① 在父工程中引入依赖

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.10</version>
</dependency>

② 在publisher和consumer两个服务启动类中添加json转换器

@Bean
public MessageConverter jsonMessageConverter(){
	return new Jackson2JsonMessageConverter();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/518528.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

当前2024阿里云服务器哪个地域价格比较优惠,哪个地域便宜?

目前2024年阿里云服务器地域对比哪个价格更优惠&#xff1f;华北6&#xff08;乌兰察布&#xff09;、华北3&#xff08;张家口&#xff09;、华北1&#xff08;青岛&#xff09;和华南2&#xff08;河源&#xff09;地域更便宜&#xff0c;云服务器吧yunfuwuqiba.com整理阿里云…

【量子计算机为什么能吊打通用计算机】浅谈

Quntum Computer 一、量子计算机导入 这是一双手&#xff0c;这是大自然送给你最神奇的礼物&#xff0c;你用它来写字、吃饭、打游戏&#xff0c;除此之外&#xff0c;它还有一个妙不可言的功能&#xff0c;计算。是的&#xff0c;手是你人生中的第一个计算器&#xff0c;到小…

Azure service tag 导致的Exchange online 无法发送邮件的问题

最近碰到一个比较有趣的客户问题。 这个客户一直在使用Exchange online 与自己在Azure Vnet 里面的exchange server交换邮件。 客户的网络架构如下图所示。 客户说之前从exchange online往外发邮件一直是好的,但是最近两周开始只有百分之3左右的邮件可以发出去,其他的都pen…

java中大型医院HIS系统源码 Angular+Nginx+SpringBoot云HIS运维平台源码

java中大型医院HIS系统源码 AngularNginxSpringBoot云HIS运维平台源码 云HIS系统是一款满足基层医院各类业务需要的健康云产品。该产品能帮助基层医院完成日常各类业务&#xff0c;提供病患预约挂号支持、病患问诊、电子病历、开药发药、会员管理、统计查询、医生工作站和护士工…

不连续页分配器

不连续页分配器 在设备长时间运行后&#xff0c;内存碎片话&#xff0c;连续的物理页比较稀缺&#xff1b;伙伴分配器和slab块分配器&#xff0c;分配的内存物理上是连续的&#xff1b;在这种情况下&#xff0c;如果需要分配长度超过一页的内存块&#xff0c;可以使用不连续页…

LeetCode-17. 电话号码的字母组合【哈希表 字符串 回溯】

LeetCode-17. 电话号码的字母组合【哈希表 字符串 回溯】 题目描述&#xff1a;解题思路一&#xff1a;回溯三部曲&#xff1a;解题思路二&#xff1a;回溯解题思路三&#xff1a; 题目描述&#xff1a; 给定一个仅包含数字 2-9 的字符串&#xff0c;返回所有它能表示的字母组…

【JavaSE】反射

Java代码的生命周期 Java代码在计算机中经历的阶段&#xff1a;Source源代码阶段、Class类对象阶段、RunTime运行时阶段。 Source源代码阶段: 这个阶段是由程序员编写生成源代码,再由Javac编译器生成class文件。 Class类对象阶段&#xff1a;由类加载器将class文件加载到JVM内…

【保姆级教程】如何订阅OnlyFans?如何在OnlyFans上面支付?OnlyFans虚拟卡订阅教程

1. 引言 什么是OnlyFans&#xff1a;OnlyFans是一种内容订阅服务&#xff0c;成立于2016年&#xff0c;允许内容创作者从用户那里获得资金&#xff0c;用户需要支付订阅费用才能查看他们的内容。它在多个领域受到欢迎&#xff0c;包括音乐、健身、摄影&#xff0c;以及成人内容…

C语言之指针(4)使用并模拟实现qsort

冒泡排序有局限性&#xff0c;实现时间长而且只能进行整型数据的排序&#xff0c;接下来介绍模拟实现qsort来方便实现各种数据的排序。 函数基本形式&#xff1a; 可以看到该函数有四个参数&#xff0c;第四个参数是一个函数指针&#xff0c;这个指针指向的函数第一个参数和第…

数据分析——数据规范化

数据规范化是数据分析中的一个重要步骤&#xff0c;其目的在于确保数据的一致性和可比性&#xff0c;提高数据质量和分析结果的准确性。以下是一些数据规范化的常见方法和技术&#xff1a; 数据清洗&#xff1a;此步骤主要清除数据中的重复项、空格、格式错误等&#xff0c;确…

【Oracle】oracle、mysql、sql server三者区别

欢迎来到《小5讲堂》&#xff0c;大家好&#xff0c;我是全栈小5。 这是《Oracle》系列文章&#xff0c;每篇文章将以博主理解的角度展开讲解&#xff0c; 特别是针对知识点的概念进行叙说&#xff0c;大部分文章将会对这些概念进行实际例子验证&#xff0c;以此达到加深对知识…

Waifu2x:使用深度卷积神经网络的动漫风格艺术的图像超分辨率

Github网址&#xff1a;nagadomi/waifu2x&#xff1a;动漫风格艺术的图像超分辨率 (github.com) 该项目主要讲述的是如何利用预训练的深度学习模型来达到无损扩大收缩和去噪&#xff0c;对于一般训练图像的小伙伴应该很清晰图像经常要通过resize操作固定大小&#xff0c;然后c…

操作系统① —— 进程管理

1. 进程、线程、协程 进程&#xff1a; 是系统进行资源分配的基本单位私有地址空间&#xff0c;私有栈、堆上下⽂切换需要切换虚拟地址空间 线程&#xff1a; 是资源调度的基本单位公有同⼀地址空间&#xff0c;公有堆、私有栈上下⽂切换只需要切换少量寄存器 进程和线程的对比…

Oracle APEX 23.2版本 使用应用程序工作副本进行协作开发

现状描述&#xff1a; 当前APEX协作开发都是在同一应用程序下进行的&#xff0c;这样做有可能因同一时间对同一数据进行操作造成锁表或其他问题&#xff0c;Oracle APEX23.2版本迭代后新增了部分功能&#xff0c;可以创建应用程序的工作副本来修复错误、添加功能&#xff0c;然…

趣学前端 | 综合一波CSS选择器的用法

背景 最近睡前习惯翻会书&#xff0c;重温了《HTML5与CSS 3权威指南》。这本书&#xff0c;分上下两册&#xff0c;之前读完了上册&#xff0c;下册基本没翻过。为了对得起花过的每一分钱&#xff0c;决定拾起来近期读一读。 CSS 选择器 在CSS3中&#xff0c;提倡使用选择器…

大模型生成RAG评估数据集并计算hit_rate 和 mrr

文章目录 背景简介代码实现公开参考资料 背景 最近在做RAG评估的实验&#xff0c;需要一个RAG问答对的评估数据集。在网上没有找到好用的&#xff0c;于是便打算自己构建一个数据集。 简介 本文使用大模型自动生成RAG 问答数据集。使用BM25关键词作为检索器&#xff0c;然后…

AI图片智能选区抠像解决方案

高质量的图片处理往往依赖于繁琐的手动操作&#xff0c;耗费大量时间与精力。美摄科技推出了一款革命性的AI图片智能选区抠像解决方案&#xff0c;旨在帮助企业轻松实现图片的高效处理&#xff0c;提升内容创作效率与质量。 美摄科技的AI图片智能选区抠像解决方案&#xff0c;…

An Aspect-Based Engine

GPU Pro 译&#xff1a; By 王钰涵 2024 4.14 10.1 Introduction&#xff08;简介&#xff09; 引擎的定义在整个行业中有所不同。在最基本的层面上&#xff0c;该术语描述了一个代码库&#xff0c;它在多个项目中提供共同的功能。其目的是分享开发这些功能所需的资源成本…

知网参考文献引用格式转latex中BibTex-Python操作

处理思路 参考 处理步骤&#xff1a; &#xff08;单条处理&#xff1a;&#xff09; 1、选知网NoteExpress格式的2-7行复制信息 2、新建一个文本文件&#xff0c;命名为cite.txt&#xff0c;把知网所复制信息粘贴进来 &#xff08;txt文件保存编码ANSI可行&#xff09; 3、…

GD32F470_TTP224 4路 电容式 触摸开关 数字触摸传感器模块移植

2.8 TTP224触摸传感器 该模块是一个基于触摸检测IC(TTP223B)的电容式点动型触摸开关模块。常态下&#xff0c;模块输出低电平&#xff0c;模式为低功耗模式&#xff1b;当用手指触摸相应位置时&#xff0c;模块会输出高电平&#xff0c;模式切换为快速模式;当持续12秒没有触摸时…