【SpringCloud学习笔记】RabbitMQ(中)

1. 交换机概述

前面《RabbitMQ上篇》我们使用SpringAMQP来演示如何用Java代码操作RabbitMQ,当时采用的是生产者直接将消息发布给队列,但是实际开发中不建议这么做,更加推荐生产者将消息发布到交换机(exchange),然后由exchange路由到队列,其架构如下所示:

可以看出,在发布-订阅模型中新增一个"交换机"角色,此后各个角色的任务如下:

  • publisher:不再是将message直接转发到queue,而是将message转发给exchange
  • exchange:一方面接收来自publisher生产的消息;另一方面,依据route key以及type将消息路由给绑定的不同的队列
  • queue:与以前一样,暂存消息,供消费者消费,另外还需要同交换机建立绑定关系
  • consumer:与以前一样,订阅queue中的消息,并进行业务处理消费消息

注意:由于我们的exchange不暂存消息,只做消息的路由,因此如果没有queue与exchange绑定或者routing key设置错误,就会导致消息丢失!!!

2. 交换机类型

RabbitMQ提供的交换机类型有如下四种:

  1. Fanout Exchange:扇出交换机,形象来说就是"广播交换机",会将消息路由给所有绑定的queue
  2. Direct Exchange:定向交换机,基于RoutingKey发给订阅的queue
  3. Topic Exchange: 通配符订阅,在Direct的基础上引入通配符
  4. Headers Exchange: 头匹配,基于MQ的消息头匹配,使用场景较少(此处不讲解)

2.1 Fanout Exchange

下面是Fanout Exchange的工作流程图:

特征:Fanout Exchange将消息路由给全部跟它绑定的queue
操作步骤:

  1. 在RabbitMQ控制台中新建两个队列:fanout.queue1、fanout.queue2image.png
  2. 在RabbitMQ控制台中新建一个Fanout类型的Exchange:fanout.exchange

image.png

  1. 将fanout.exchange与fanout.queue1、fanout.queue2分别建立binding关系

image.png

  1. 新建两个方法用于模拟consumer,分别监听fanout.queue1以及fanout.queue2队列
/**
 * 订阅fanout.queue1队列
 * @param msg 消息
 */
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    log.info("listener1 从【fanout.queue1】接收到消息:" + msg);
}

/**
 * 订阅fanout.queue2队列
 * @param msg 消息
 */
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    log.info("listener2 从【fanout.queue2】接收到消息:" + msg);
}
  1. 新建一个测试类方法,模拟将消息发布给fanout.exchange
/**
 * 测试FanoutExchange交换机类型
 */
@Test
public void testFanoutExchange() {
    // 1. 定义exchange名称
    String exchangeName = "fanout.exchange";
    // 2. 定义消息体
    String msg = "震惊!某大学频频被曝出食堂安全问题";
    // 3. 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "", msg);
}
  1. 观察结果

image.png
结果如上图所示:说明fanout.exchange雀氏将消息广播给了所有与之绑定的queue

2.2 Direct Exchange

特点:Direct Exchange要求在与queue建立binding关系的时候定义一个BindingKey,之后publisher生产者携带消息的同时也会指定RoutingKey,只有RoutingKey与BindingKey一致的queue才会被路由消息

工作流程如上图所示,其中queue1与exchange的Binding Key为"blue"以及"red",queue2与exchange的Binding Key为"yellow"以及"red",此时当Routing Key为"blue",Direct Exchange只会将消息路由给queue1
操作步骤:

  1. 在RabbitMQ控制台中新建两个队列:direct.queue1、direct.queue2

image.png

  1. 在RabbitMQ控制台中新建一个Direct类型的Exchange:direct.exchange

image.png

  1. 将direct.exchange与direct.queue1、direct.queue2分别建立binding关系,其中与queue1的binding key为"blue"与"red",与queue2的binding key为"yellow"与"red"

image.png

  1. 新建两个方法用于模拟consumer,分别监听direct.queue1以及direct.queue2队列
/**
 * 订阅direct.queue1队列
 * @param msg 消息
 */
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
    log.info("listener1 从【direct.queue1】接收到消息:" + msg);
}

/**
 * 订阅direct.queue2队列
 * @param msg 消息
 */
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
    log.info("listener2 从【direct.queue2】接收到消息:" + msg);
}
  1. 新建一个测试类方法,模拟将消息发布给direct.exchange,并指定routing key为"blue"
/**
 * 测试DirectExchange交换机类型
 */
@Test
public void testDirectExchange() {
    // 1. 定义交换机名称
    String exchangeName = "direct.exchange";
    // 2. 定义消息体
    String msg = "今日份消息只交给幸运色为blue的哦~";
    // 3. 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
}
  1. 观察结果

image.png
结果符合预期,只有direct.queue1能够接受到消息!

2.3 Topic Exchange

Topic Exchange与Direct Exchange非常类似,都可以依据BindingKey以及RoutingKey的匹配程度进而路由给特定符合条件的queue,但是Topic Exchange定义Binding Key可以为一组词,中间用"."进行分隔,并且支持使用通配符,规则如下:

  • #:匹配0个或者多个词
  • *:匹配1个单词

例如现在queue1的BindingKey为"china.#“,而queue2的BindingKey为”#.news",而RoutingKey为"china.reports",此时可以路由给queue1,但是无法路由给queue2,如果RoutingKey为"china.news"则queue1、queue2均可以被路由
操作步骤:

  1. 在RabbitMQ控制台中新建两个队列:topic.queue1、topic.queue2

image.png

  1. 在RabbitMQ控制台中新建一个Topic类型的Exchange:topic.exchange

image.png

  1. 将topic.exchange与topic.queue1、topic.queue2分别建立binding关系,其中与queue1的binding key为"china.#“,与queue2的binding key为”#.news"

image.png

  1. 新建两个方法用于模拟consumer,分别监听topic.queue1以及topic.queue2队列
/**
 * 订阅topic.queue1队列
 * @param msg 消息
 */
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) {
    log.info("listener1 从【topic.queue1】接收到消息:" + msg);
}

/**
 * 订阅topic.queue2队列
 * @param msg 消息
 */
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) {
    log.info("listener2 从【topic.queue2】接收到消息:" + msg);
}
  1. 新建一个测试类方法,模拟将消息发布给topic.exchange,并指定routing key为"china.news"
/**
 * 测试TopicExchange交换机类型
 */
@Test
public void testTopicExchange() {
    // 1. 定义交换机名称
    String exchangeName = "topic.exchange";
    // 2. 定义消息体
    String msg = "中国新闻报,快来买呀!";
    // 3. 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", msg);
}
  1. 观察结果

image.png
证明通配符生效!

3. 声明队列和交换机

前面我们收发消息的过程是使用Java代码实现的,但是创建Queues以及Exchanges仍然需要我们在RabbitMQ提供的控制台实现,那么如何使用Java代码来创建Queue以及Exchange呢?
SpringAMQP API:

  • 声明队列:使用new Queue("队列名称")创建
  • 声明交换机:使用new FanoutExchange("交换机名称")(以FanoutExchange为例)
  • 声明绑定关系:使用BindingBuilder.bind(队列对象).to(交换机对象)构建

3.1 Fanout声明

步骤:

  1. 编写一个配置类,使用@Configuration 声明
  2. 内部配置Queue、Exchange、Binding,并使用@Bean声明
@Configuration
public class FanoutConfig {

    /**
     * 声明FanoutExchange交换机
     * @return 返回FanoutExchange对象
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("code.fanout.exchange");
    }

    /**
     * 声明FanoutQueue队列
     * @return 返回FanoutQueue队列
     */
    @Bean
    public Queue fanoutQueue() {
        return new Queue("code.fanout.queue");
    }

    /**
     * 声明绑定关系
     * @param fanoutExchange 交换机
     * @param fanoutQueue 队列
     * @return 绑定关系
     */
    @Bean
    public Binding fanoutBinding(FanoutExchange fanoutExchange, Queue fanoutQueue) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }
}

3.2 Direct声明

步骤:

  1. 编写一个配置类,使用@Configuration 声明
  2. 内部配置Queue、Exchange、Binding,并使用@Bean声明
@Configuration
public class DirectConfig {

    /**
     * 声明一个DirectExchange交换机
     * @return 返回一个DirectExchange类型对象
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("code.direct.exchange");
    }

    /**
     * 声明一个Queue队列
     * @return 返回一个Queue类型对象
     */
    @Bean
    public Queue directQueue() {
        return new Queue("code.direct.queue");
    }

    /**
     * 声明一个绑定关系
     * @return 返回Binding对象
     */
    @Bean
    public Binding directBinding(DirectExchange directExchange, Queue directQueue) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("");
    }
}

3.3 基于注解声明

注解声明格式:

@Component
@Slf4j
public class AnnotateRabbitListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("annotate.direct.queue"),
            key = {"blue", "red"},
            exchange = @Exchange(name = "annotate.direct.exchange", type = ExchangeTypes.DIRECT)
    ))
    public void listenAnnotateDirect(String msg) {
        log.info("接收到消息:" + msg);
    }
}

4. 消息转换器

4.1 现象演示

前面我们都是将字符串类型的数据作为消息进行传输,那么如果是对象类型的消息呢,我们尝试发送一个自定义User类型作为消息传输:

/**
 * 自定义User类型
 * @author 米饭好好吃
 */
@Data
@AllArgsConstructor
public class User implements Serializable {
    private String name;
    private Integer age;
}
@Test
public void testSendObject() {
    // 1. 声明队列名称
    String queueName = "work.queue";
    // 2. 定义消息体
    User user = new User("jack", 22);
    // 3. 发送消息
    rabbitTemplate.convertAndSend(queueName, user);
}

从RabbitMQ控制台中查看消息内容如下:
image.png

4.2 追踪源码

image.png
我们发现实际调用了convertMessageIfNecessary(object)方法,我们继续追踪进去:
image.png
该方法判断object是否为Message类型,如果不是就调用getRequiredMessageConverter()获取所需的消息转换器,继续追踪进去:
image.png
image.png
该方法返回了一个SimpleMessageConverter实例对象,因此我们回到上一层,获取到MessageConverter实例后又调用了toMessage方法,我们继续追踪进去观察是如何转换消息的:
image.png
在AbstruectMessageConverter中实现了toMessage方法,而createMessage方法在子类 SimpleMessageConverter重写了该方法:
image.png
可以看出调用了SerialzationUtils.serialize(object)进行了序列化,继续追踪观察到底是如何序列化的:
image.png
可以看出是借助ObjectOutputStream进行序列化的,而这这个是JDK默认的序列化方式,该方式有如下缺点:

  • 序列化过程不够安全,可能存在注入风险
  • 序列化结果可读性较差
  • 序列化结果占用体积较大

因此我们需要重写消息转换器中的序列化机制:

4.3 自定义JSON序列化器

因此JDK原生序列化器有诸多确定,因此我们需要使用自定义的JSON序列化器,此处需要引入jackson-databind相关依赖

<dependency>
  <groupId>com.fasterxml.jackson.dataformat</groupId>
  <artifactId>jackson-dataformat-xml</artifactId>
  <version>2.9.10</version>
</dependency>
/**
 * 消息转换器配置
 * @author 米饭好好吃
 */
@Configuration
public class MessageConvertConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

验证结果:
image.png
在控制台中我们可以发现消息格式就是熟悉的JSON格式了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/718290.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Mac M3 Pro 部署Flink-1.16.3

目录 1、下载安装包 2、解压及配置 3、启动&测试 4、测试FlinkSQL读取hive数据 以上是mac硬件配置 1、下载安装包 官网&#xff1a;Downloads | Apache Flink 网盘&#xff1a; Flink 安装包 https://pan.baidu.com/s/1IN62_T5JUrnYUycYMwsQqQ?pwdgk4e Flink 已…

✅生产问题之Emoji表情如何操作存储,MySQL是否支持

针对 Emoji 表情 MySQL 存储是否支持的问题&#xff0c;结论是&#xff1a; MySQL 中可以存储 emoji 表情&#xff0c;但需要使用 UTF8MB4 字符编码。如果使用 UTF8MB3&#xff0c;存储这些扩展字符会导致解析错误。 课外补充 MySQL 对 Unicode 的支持 Unicode 字符集已成为…

win环境安装Node.js的多种方式

今天我们分享win环境安装Node.js的多种方式&#xff1a; 首先要明白Node.js是一个JavaScript运行环境&#xff0c;它基于Google的V8引擎进行封装&#xff0c;允许JavaScript运行在服务器端。Node.js让JavaScript成为一种与PHP、Python、Perl、Ruby等服务端语言平起平坐的脚本语…

聊聊redis中的字典的实现

写在文章开头 redis作为非关系数据库&#xff0c;其底层采用了字典(也称为映射)保存键值对。本文会基于源码分析的方式带你了解redis中这一常见数据结构的精巧设计&#xff0c;希望对你有帮助。 Hi&#xff0c;我是 sharkChili &#xff0c;是个不断在硬核技术上作死的 java c…

Spring Security——添加验证码

目录 项目总结 新建一个SpringBoot项目 VerifyCode&#xff08;生成验证码的工具类&#xff09; WebSecurityController控制器 VerifyCodeFilter&#xff08;自定义过滤器&#xff09; WebSecurityConfig配置类 login.html登录页面 项目测试 本项目是以上一篇文章的项目…

DC/AC电源模块:提升光伏发电系统的能源利用率

BOSHIDA DC/AC电源模块&#xff1a;提升光伏发电系统的能源利用率 随着环境保护意识的提高和能源需求的增加&#xff0c;光伏发电系统作为一种清洁能源的代表&#xff0c;受到了越来越多的关注。然而&#xff0c;光伏发电系统在实际应用中还存在一些问题&#xff0c;如发电效率…

【UIDynamic-动力学-UICollisionBehavior-碰撞模式-创建边界 Objective-C语言】

一、我们来说这个碰撞模式 1.把之前的代码备份一下,改个名字:“04-碰撞行为-碰撞模式”, 然后,command + R,先跑一下, 我现在,一点击,是这个红色的View、和蓝色的View、在发生碰撞, 我们说,碰撞模式是啥意思, collision里边,有一个叫做collisionMode, UICollis…

Camtasia Studio 2024软件下载附加详细安装教程

amtasia Studio 2024是一款功能强大的屏幕录制和视频编辑软件&#xff0c;由TechSmith公司开发。这款软件不仅能够帮助用户轻松地记录电脑屏幕上的任何操作&#xff0c;还可以将录制的视频进行专业的编辑和制作&#xff0c;最终输出高质量的视频教程、演示文稿、培训课程等。 …

geoserver 如何设置数据目录

在GeoServer中&#xff0c;数据目录是存储配置文件、数据存储、图层、样式等的重要目录。默认情况下&#xff0c;GeoServer的数据目录位于GeoServer安装目录下的data_dir文件夹。但在很多情况下&#xff0c;用户可能希望将数据目录设置在一个自定义位置&#xff0c;以便更好地管…

独立游戏之路:Tap篇 -- Unity 集成 TapTap 广告详细步骤

Unity 集成 TapADN 广告详细步骤 前言一、TapTap 广告介绍二、集成 TapTap 广告的步骤2.1 进入广告后台2.2 创建广告计划2.3 选择广告类型三、代码集成3.1 下载SDK3.2 工程配置3.3 源码分享四、常见问题4.1 有展现量没有预估收益 /eCPM 波动大?4.2 新建正式媒体找不到预约游戏…

公共服务数字化转型的五个路径

数字化技术赋能公共服务&#xff0c;主要以数据为着力点&#xff0c;通过数据驱动优化或重塑公共服务架构。基于用数据决策、用数据服务、用数据创新的现代化的公共服务供给模式&#xff0c;推进“信息数字化业务数字化组织业务化”的全方位公共服务数字化&#xff0c;进而赋能…

深入学习html的步骤

推荐的学习步骤&#xff1a; 1. 深入了解HTML基础标签 列表 HTML提供有序列表(<ol>)和无序列表(<ul>)。 <h2>无序列表</h2> <ul><li>项目一</li><li>项目二</li><li>项目三</li> </ul><h2>…

Vue48-ref属性

一、需求&#xff1a;操作DOM元素 1-1、使用原生的id属性 不太好&#xff01; 1-2、使用 ref属性 原生HTML中&#xff0c;用id属性给元素打标识&#xff0c;vue里面用ref属性。 给哪个元素加了ref属性&#xff0c;vc实例对象就收集哪个元素&#xff01;&#xff01;&#xff0…

【Oracle生产运维】数据库服务器高负载排查处理

说明 在Oracle数据库运维工作中&#xff0c;经常会遇到Oracle数据库服务器平均负载&#xff08;load average&#xff09;突然异常升高&#xff0c;如果放任不管&#xff0c;严重的情况下会出现数据库宕机、服务器重启等重大故障。因此&#xff0c;当发现数据库服务器平均负载…

房间预订系统怎么做

在这个日新月异的时代&#xff0c;旅游已经成为了许多人生活中不可或缺的一部分。然而&#xff0c;在规划一场完美的旅行时&#xff0c;房间预订往往是一个让人头疼的问题。今天&#xff0c;我要为大家揭秘一款颠覆传统的房间预订系统——它不仅仅是一个预订工具&#xff0c;更…

需求虽小但是问题很多,浅谈JavaScript导出excel文件

最近我在进行一些前端小开发&#xff0c;遇到了一个小需求&#xff1a;我想要将数据导出到 Excel 文件&#xff0c;并希望能够封装成一个函数来实现。这个函数需要接收一个二维数组作为参数&#xff0c;数组的第一行是表头。在导出的过程中&#xff0c;要能够确保避免出现中文乱…

使用 calibre 拆分电子书合辑

文章目录 引言下载插件拆书设置封面等元信息 引言 下载电子书合辑后&#xff0c;想拆分为单独成册的文件 https://bookfere.com/post/603.html 教程使用 calibre 的 EpubSplit 插件&#xff0c;这里我跟着实践&#xff0c;记录在此&#xff0c;希望能帮助你。 本文基于 macOS …

【工程2区】毕业神刊 —— 1-2个月录用!非黑!非预警!

【欧亚科睿学术】 电力能源类SCIE ✅ 进展超顺 ✅ 录用率高 ✅ 领域相关均可 【期刊简介】IF&#xff1a;1.0-2.0&#xff0c;JCR2区&#xff0c;中科院4区 【版面类型】正刊&#xff0c;仅少量版面 【终审周期】走期刊部系统&#xff0c;预计3个月左右录用 【检索情况…

计算机图形学入门13:纹理映射常见问题、MipMap

上一章介绍了纹理映射&#xff0c;这一章介绍纹理映射常见的问题。 1.纹理太小 1.1产生原因 例如要渲染一面墙&#xff0c;它的分辨率4K&#xff0c;但与它对应的纹理大小是256x256&#xff0c;这样要怎样&#xff1f;显然纹理会被拉大。当墙面上一个点去查询纹理时&#xff0…

浏览器开发公司Brave 将自己的搜索结果与其 Leo AI 助手集成

Brave Software是一家开发浏览器的公司&#xff0c;其主要产品是Brave浏览器。Brave浏览器基于Chromium项目开发&#xff0c;具有高性能和隐私保护的特点。此外&#xff0c;Brave浏览器还提供了“off record”模式&#xff0c;允许用户在不记录浏览历史的情况下使用浏览器。关于…