Spring RabbitMQ那些事(1-交换机配置消息发送订阅实操)

这里写目录标题

  • 一、序言
  • 二、配置文件application.yml
  • 三、RabbitMQ交换机和队列配置
    • 1、定义4个队列
    • 2、定义Fanout交换机和队列绑定关系
    • 2、定义Direct交换机和队列绑定关系
    • 3、定义Topic交换机和队列绑定关系
    • 4、定义Header交换机和队列绑定关系
  • 四、RabbitMQ消费者配置
  • 五、RabbitMQ生产者
  • 六、测试用例
    • 1、发送到FanoutExchage
    • 2、发送到DirectExchage
    • 3、发送到TopicExchange
    • 4、发动到HeadersExchage
  • 七、结语

一、序言

在上一节 RabbitMQ中的核心概念和交换机类型 中我们介绍了RabbitMQ中的一些核心概念,尤其是各种交换机的类型,接下来我们将具体讲解各种交换机的配置和消息订阅实操。


二、配置文件application.yml

我们先上应用启动配置文件application.yml,如下:

server:
  port: 8080
spring:
  rabbitmq:
    addresses: localhost:5672
    username: admin
    password: admin
    virtual-host: /
    listener:
      type: simple
      simple:
        acknowledge-mode: auto
        concurrency: 5
        max-concurrency: 20
        prefetch: 5

备注:这里我们指定了RabbitListenerContainerFactory的类型为SimpleRabbitListenerContainerFactory,并且指定消息确认模式为自动确认

三、RabbitMQ交换机和队列配置

Spring官方提供了一套 流式API 来定义队列交换机绑定关系,非常的方便,接下来我们定义4种类型的交换机和相应队列的绑定关系。

1、定义4个队列

/**
 * 定义4个队列
 */
@Configuration
protected static class QueueConfig {

	@Bean
	public Queue queue1() {
		return QueueBuilder.durable("queue-1").build();
	}

	@Bean
	public Queue queue2() {
		return QueueBuilder.durable("queue-2").build();
	}

	@Bean
	public Queue queue3() {
		return QueueBuilder.durable("queue-3").build();
	}

	@Bean
	public Queue queue4() {
		return QueueBuilder.durable("queue-4").build();
	}
}

2、定义Fanout交换机和队列绑定关系

/**
 * 定义Fanout交换机和对应的绑定关系
 */
@Configuration
protected static class FanoutExchangeBindingConfig {

	@Bean
	public FanoutExchange fanoutExchange() {
		return ExchangeBuilder.fanoutExchange("fanout-exchange").build();
	}

	/**
	 * 定义多个Fanout交换机和队列的绑定关系
	 * @param fanoutExchange
	 * @param queue1
	 * @param queue2
	 * @param queue3
	 * @param queue4
	 * @return
	 */
	@Bean
	public Declarables bindQueueToFanoutExchange(FanoutExchange fanoutExchange, Queue queue1, Queue queue2, Queue queue3, Queue queue4) {
		Binding queue1Binding = BindingBuilder.bind(queue1).to(fanoutExchange);
		Binding queue2Binding = BindingBuilder.bind(queue2).to(fanoutExchange);
		Binding queue3Binding = BindingBuilder.bind(queue3).to(fanoutExchange);
		Binding queue4Binding = BindingBuilder.bind(queue4).to(fanoutExchange);
		return new Declarables(queue1Binding, queue2Binding, queue3Binding, queue4Binding);
	}

}

备注:这里我们将4个队列绑定到了名为fanout-exchange的交换机上。

2、定义Direct交换机和队列绑定关系

@Configuration
protected static class DirectExchangeBindingConfig {

	@Bean
	public DirectExchange directExchange() {
		return ExchangeBuilder.directExchange("direct-exchange").build();
	}

	@Bean
	public Binding bindingQueue3ToDirectExchange(DirectExchange directExchange, Queue queue3) {
		return BindingBuilder.bind(queue3).to(directExchange).with("queue3-route-key");
	}
}

备注:这里我们定义了名为direct-exchange的交换机并通过路由keyqueue3-route-keyqueue-3绑定到了该交换机上。


3、定义Topic交换机和队列绑定关系

@Configuration
protected static class TopicExchangeBindingConfig {

	@Bean
	public TopicExchange topicExchange() {
		return ExchangeBuilder.topicExchange("topic-exchange").build();
	}

	@Bean
	public Declarables bindQueueToTopicExchange(TopicExchange topicExchange, Queue queue1, Queue queue2) {
		Binding queue1Binding = BindingBuilder.bind(queue1).to(topicExchange).with("com.order.*");
		Binding queue2Binding = BindingBuilder.bind(queue2).to(topicExchange).with("com.#");
		return new Declarables(queue1Binding, queue2Binding);
	}
}

这里我们定义了名为topic-exchange类型的交换机,该类型交换机支持路由key通配符匹配,*代表一个任意字符,#代表一个或多个任意字符。

备注:

  1. 通过路由keycom.order.*queue-1绑定到了该交换机上。
  2. 通过路由key com.#queue-2也绑定到了该交换机上。

4、定义Header交换机和队列绑定关系

@Configuration
protected static class HeaderExchangeBinding {

	@Bean
	public HeadersExchange headersExchange() {
		return ExchangeBuilder.headersExchange("headers-exchange").build();
	}

	@Bean
	public Binding bindQueueToHeadersExchange(HeadersExchange headersExchange, Queue queue4) {
		return BindingBuilder.bind(queue4).to(headersExchange).where("function").matches("logging");
	}
}

备注:这里我们定义了名为headers-exchange类型的交换机,并通过参数function=loggingqueue-4绑定到了该交换机上。


四、RabbitMQ消费者配置

Spring RabbitMQ中支持注解式监听端点配置,用于异步接收消息,如下:

@Slf4j
@Component
public class RabbitMqConsumer {

	@RabbitListener(queues = "queue-1")
	public void handleMsgFromQueue1(String msg) {
		log.info("Message received from queue-1, message body: {}", msg);
	}

	@RabbitListener(queues = "queue-2")
	public void handleMsgFromQueue2(String msg) {
		log.info("Message received from queue-2, message body: {}", msg);
	}

	@RabbitListener(queues = "queue-3")
	public void handleMsgFromQueue3(String msg) {
		log.info("Message received from queue-3, message body: {}", msg);
	}

	@RabbitListener(queues = "queue-4")
	public void handleMsgFromQueue4(String msg) {
		log.info("Message received from queue-4, message body: {}", msg);
	}
}

备注:这里我们分别定义了4个消费者,分别用来接受4个队列的消息。

五、RabbitMQ生产者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {

	private final RabbitTemplate rabbitTemplate;

	public void sendMsgToFanoutExchange(String body) {
		log.info("开始发送消息到fanout-exchange, 消息体:{}", body);

		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
		rabbitTemplate.send("fanout-exchange", StringUtils.EMPTY, message);
	}

	public void sendMsgToDirectExchange(String body) {
		log.info("开始发送消息到direct-exchange, 消息体:{}", body);

		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
		rabbitTemplate.send("direct-exchange", "queue3-route-key", message);
	}

	public void sendMsgToTopicExchange(String routingKey, String body) {
		log.info("开始发送消息到topic-exchange, 消息体:{}", body);

		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
		rabbitTemplate.send("topic-exchange", routingKey, message);
	}

	public void sendMsgToHeadersExchange(String body) {
		log.info("开始发送消息到headers-exchange, 消息体:{}", body);

		MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setHeader("function", "logging").build();
		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
		rabbitTemplate.send("headers-exchange", StringUtils.EMPTY, message);
	}

}


六、测试用例

这里写了个简单的Controller用来测试,如下:

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {

	private final RabbitMqProducer rabbitMqProducer;

	@RequestMapping("/exchange/fanout")
	public ResponseEntity<String> sendMsgToFanoutExchange(String body) {
		rabbitMqProducer.sendMsgToFanoutExchange(body);
		return ResponseEntity.ok("广播消息发送成功");
	}

	@RequestMapping("/exchange/direct")
	public ResponseEntity<String> sendMsgToDirectExchange(String body) {
		rabbitMqProducer.sendMsgToDirectExchange(body);
		return ResponseEntity.ok("消息发送到Direct交换成功");
	}

	@RequestMapping("/exchange/topic")
	public ResponseEntity<String> sendMsgToTopicExchange(String routingKey, String body) {
		rabbitMqProducer.sendMsgToTopicExchange(routingKey, body);
		return ResponseEntity.ok("消息发送到Topic交换机成功");
	}

	@RequestMapping("/exchange/headers")
	public ResponseEntity<String> sendMsgToHeadersExchange(String body) {
		rabbitMqProducer.sendMsgToHeadersExchange(body);
		return ResponseEntity.ok("消息发送到Headers交换机成功");
	}

}

1、发送到FanoutExchage

直接访问http://localhost:8080/exchange/fanout?body=hello,可以看到该消息广播到了4个队列上。

2023-11-07 17:41:12.959  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到fanout-exchange, 消息体:hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#1-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#0-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#3-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#2-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello

2、发送到DirectExchage

访问http://localhost:8080/exchange/direct?body=hello,可以看到消息通过路由keyqueue3-route-key发送到了queue-3上。

2023-11-07 17:43:26.804  INFO 39460 --- [nio-8080-exec-1] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到direct-exchange, 消息体:hello
2023-11-07 17:43:26.822  INFO 39460 --- [ntContainer#3-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello

3、发送到TopicExchange

访问http://localhost:8080/exchange/topic?body=hello&routingKey=com.order.create,路由key为 com.order.create的消息分别发送到了queue-1queue-2上。

2023-11-07 17:44:45.301  INFO 39460 --- [nio-8080-exec-4] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到topic-exchange, 消息体:hello
2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#1-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#2-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello

4、发动到HeadersExchage

访问http://localhost:8080/exchange/headers?body=hello,消息通过头部信息function=logging发送到了headers-exchange上。

2023-11-07 17:47:21.736  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到headers-exchange, 消息体:hello
2023-11-07 17:47:21.749  INFO 39460 --- [ntContainer#0-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello

七、结语

下一节我们将会介绍通过两种方式借由RabbitMQ实现延迟消息发送和订阅,敬请期待。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/121521.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言面试

数据类型&#xff08;基本内置类型&#xff09; char //字符数据类型 short //短整型 int //整型 long //长整型 long long //更长的整型 float //单精度浮点数 double //双精度浮点数 类型的基本归类 整形家族&#xff1a; …

英伟达发布RAPIDS cuDF框架 pandas在GPU上运行速度快了150倍

11月9日 消息&#xff1a;Nvidia 发布了一款名为 RAPIDS cuDF 的新版本&#xff0c;据称可以将 pandas 运行在 GPU 上&#xff0c;并且性能提升了150倍。pandas 是一款流行的基于 Python 的数据框架库&#xff0c;用于数据处理和分析。它的开源版本由 Wes McKinney 开发和发布&…

RT-Thread提供的网络世界入口 -net组件

作为一款在RTOS领域对网络支持很丰富的RT-Thread&#xff0c;对设备联网功能的支持的工具就是net组件。 位于/rt-thread/components/net路劲下&#xff0c;作为一款基础组件&#xff0c;env与Studio的工程配置项界面的配置项都依赖该目录下的Kconfig。 我们对网络功能的选择&am…

关于卷积神经网络的步幅(stride)

认识步幅&#xff08;stride&#xff09; 卷积核从输入数组的最左上方开始&#xff0c;按从左往右、从上往下的顺序&#xff0c;依次在输入数组上滑动&#xff0c;我们将每次滑动的行数和列数称为步幅。 计算步幅 假设输入的形状n∗n&#xff0c;卷积核的形状为f∗f&#xff0…

css排版—— 一篇优雅的文章(中英文) vs 聊天框的特别排版

文章 <div class"contentBox"><p>这是一篇范文——仅供测试使用</p><p>With the coming of national day, I have a one week holiday. I reallyexpect to it, because it want to have a short trip during these days. Iwill travel to Ji…

机器学习模板代码(期末考试复习)自用存档

机器学习复习代码 利用sklearn实现knn import numpy as np import pandas as pd from sklearn.neighbors import KNeighborsClassifier from sklearn.model_selection import GridSearchCVdef model_selection(x_train, y_train):## 第一个是网格搜索## p是选择查找方式:1是欧…

Vue3 + Vite + Ts + Router搭建项目

1、新建文件夹 从新建的文件夹cmd进入终端 2、安装vite—依据vite创建vue3项目 2.1、运行 npm init vitelatest2.2.1、输入项目名称 2.2.2、选择vue 2.2.3、选择TypeScript语言 3、安装依赖项 3.1、进入刚才创建的文件夹 cd vite-project 3.2、查看镜像 #查看当前源 npm con…

【uniapp+vue3/vue2】ksp-cropper高性能图片裁剪工具,详解

效果图&#xff1a; 1、ksp-cropper是hbuilder插件市场中的一款插件&#xff0c;兼容vue2和vue3 ksp-cropper插件安装地址&#xff0c;直接点击跳转 2、插件用法相对简单 &#xff08;1&#xff09;只要url有值就会显示插件&#xff0c;为空就会隐藏插件 &#xff08;2&#…

自动化测试框架 —— pytest框架入门篇

今天就给大家说一说pytest框架。 今天这篇文章呢&#xff0c;会从以下几个方面来介绍&#xff1a; 1、首先介绍一下pytest框架 2、带大家安装Pytest框架 3、使用pytest框架时需要注意的点 4、pytest的运行方式 5、pytest框架中常用的插件 01、pytest框架介绍 pytest 是 pytho…

国产猫罐头可以长期作为主食吗?我家的优质TOP的猫罐头分享

我最近一直在调查国产猫罐头可以长期作为主食吗&#xff1f;看看我的购物订单&#xff0c;我已经尝试了几十款了。今天&#xff0c;我想和大家分享一些关于国产猫罐头的经验和见解。 近年来&#xff0c;国产宠粮市场取得了突破性的进展&#xff0c;各个猫粮商在配方、营养数据…

win10添加回环网卡步骤

打开命令行输入hdwwiz 添加新硬件向导 结果

Visual Studio 2022 + OpenCV 4.5.2 安装与配置教程

目录 OpenCV的下载与配置Visual Studio 2022的配置新建工程新建文件新建项目属性表环境配置测试先写一个输出将OpenCV的动态链接库添加到项目的 x64 | Debug下测试配置效果 Other OpenCV的下载与配置 参考这个OpenCV的下载与环境变量的配置&#xff1a; Windows10CLionOpenCV4…

CUDA学习笔记7——CUDA内存组织

CUDA内存组织 CUDA设备内存的分类与特征 内存类型物理位置访问权限可见范围生命周期1全局内存芯片外可读写所有线程和主机端由主机分配与释放2常量内存芯片外只读所有线程和主机端由主机分配与释放3纹理和表面内存芯片外一般只读所有线程和主机端由主机分配与释放4寄存器内存…

OpenSSL生成自签名证书

生成之前首先需要明白以下内容&#xff1a; 第三点的验证数字签名解释下&#xff1a;客户端将使用颁发机构的公钥解密得到的原始数据&#xff0c;再将原始数据通过哈希算法计算得到的哈希值&#xff08;此处应该是使用CA证书提供的哈希算法&#xff09;进行比对。如果两者一致&…

2022最新版-李宏毅机器学习深度学习课程-P46 自监督学习Self-supervised Learning(BERT)

一、概述&#xff1a;自监督学习模型与芝麻街 参数量 ELMO&#xff1a;94MBERT&#xff1a;340MGPT-2&#xff1a;1542MMegatron&#xff1a;8BT5&#xff1a;11BTuring NLG&#xff1a;17BGPT-3&#xff1a;175BSwitch Transformer&#xff1a;1.6T 二、Self-supervised Lear…

云计算:无所不能的超级英雄

引言 在这个奇妙的时代&#xff0c;云计算如同一位无所不能的超级英雄&#xff0c;无处不在。从智能家居到无人驾驶&#xff0c;从虚拟现实到人工智能&#xff0c;云计算为我们的生活带来了智能、便捷和有趣。它以其强大的能力和灵活性&#xff0c;令我们的生活变得更加智能化…

库存预占架构升级方案设计-交易库存中心

背景介绍 &#xfeff; 伴随物流行业的迅猛发展&#xff0c;一体化供应链模式的落地&#xff0c;对系统吞吐、系统稳定发出巨大挑战&#xff0c;库存作为供应链的重中之重表现更为明显。近三年数据可以看出&#xff1a; &#xfeff;&#xfeff; 接入商家同比增长37.64%、货…

rviz添加qt插件

一、增加rviz plugin插件 资料&#xff1a;http://admin.guyuehome.com/42336 https://blog.51cto.com/u_13625033/6126970 这部分代码只是将上面两个链接中的代码整合在了一起&#xff0c;整合在一起后可以更好的理解其中的关系 1、创建软件包 catkin_create_pkg rviz_tel…

css呼吸效果实现

实现一个图片有规律的大小变化&#xff0c;呈现呼吸效果&#xff0c;怎么用CSS实现这个呼吸效果呢 一.实现 CSS实现动态效果可以使用动画( animation)来属性实现&#xff0c;放大缩小效果可以用transform: scale来实现&#xff0c;在这基础上有了动画&#xff0c;就可以设置一个…

每日汇评:黄金将测试1935美元的200日移动均线

金价在美联储主席鲍威尔发表讲话之前仍然脆弱&#xff1b; 在市场情绪喜忧参半的情况下&#xff0c;美元与美债收益率走势艰难&#xff1b; 在上升的三角形破裂和看跌的相对强弱指数中&#xff0c;黄金价格看向200日移动均线&#xff1b; 黄金周四早时在略低于1950美元的三周…