SpringBoot 整合 RabbitMQ (四十一)

二八佳人体似酥,腰间仗剑斩愚夫。虽然不见人头落,暗里教君骨髓枯。

上一章简单介绍了SpringBoot 实现 Web 版本控制 (四十),如果没有看过,请观看上一章

关于消息中间件 RabbitMQ, 可以看老蝴蝶之前的文章: https://blog.csdn.net/yjltx1234csdn/category_12130444.html

创建一个 普通的 Spring Boot Web 项目

整合 RabbitMQ

pom.xml 添加依赖


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.11.RELEASE</version>
    </parent>
        

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--引入 amqp 即rabbitmq 的依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
        </dependency>

    </dependencies>
        

application.yml 配置文件配置

#配置服务器端相应消息
server:
  port: 8088
  servlet:
    context-path: /Server
#配置rabbitmq的相关信息
spring:
  rabbitmq:
    host: 127.0.0.1  # 配置主机
    port: 5672  # 配置端口
    virtual-host: yjl   # 配置虚拟主机
    username: guest   # 配置用户名
    password: guest  # 配置密码
    connection-timeout: 15000
    # 配置回调
    publisher-confirm-type: correlated
#配置rabbit 队列,交换器,路由等相关信息
rabbit:
  fanout:
    exchange: fanout_logs
    queue1: debug_console
    queue2: debug_file
    queue3: debug_db
  direct:
    exchange: direct_logs
    queue1: debug_console
    queue2: debug_file
    queue3: debug_db
  topic:
    exchange: topic_logs
    queue1: topic_log_console
    queue2: topic_log_file
  ttl:
    x_exchange: x
    queue_a: QA
    queue_b: QB
    queue_c: QC
    y_dead_exchange: y
    y_dead_queue_d: QD
    delayed_exchange: delayed_exchange2
    delayed_queue: delayed.queue
    delayed_routing_key: delayed_routing
  confirm:
    # 确认
    exchange: confirm_exchange_1
    queue: confirm_queue
    routing-key: key1
    backup_exchange: backup_exchange
    backup_queue: backup_queue
    warn_queue: warn_queue    

项目结构

项目结构如下:

image.png

SendMessageService 为 生产者发送消息的接口服务。

RecieveMessageService 为 消费者接收到消息后,进行的业务操作流程。

SendMessageController 为生产者创建消息的 Controller 入口。

创建队列

手动在 RabbitMQ 上创建一个队列 debug_console, 如果不存在的话。

image.png

简单的生产者发送消息

    @Resource
    private SendMessageService sendMessageService;

    @RequestMapping("/queue")
    public String queue() {
        Integer randNum = (int) (Math.random() * 1000 + 1);
        sendMessageService.sendQueue(randNum);
        return "存储到队列中的数据是:" + randNum;

    }

    @RequestMapping("/work")
    public String work() {
        sendMessageService.sendWork();
        return "批量生成循环数字";
    }

往队列发送消息, 使用 RabbitTemplate rabbitTemplate (与 RedisTemplate, JdbcTemplate 形似)

SendMessageServiceImpl.java

@Service
public class SendMessageServiceImpl implements SendMessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbit.direct.queue1}")
    private String queueName;


    // 最普通的.
    @Override
    public void sendQueue(Integer randNum) {
        // 只发送一条消息
        rabbitTemplate.convertAndSend(queueName, String.valueOf(randNum));
    }


    @Override
    public void sendWork() {
        for (int i = 0; i < 10; i++) {
            // 发送多条消息
            rabbitTemplate.convertAndSend(queueName, "第" + i + "条消息,消息内容是:" + i);
        }
    }
}

队列消息消费

ReceiveMessageServiceImpl.java

    @Override
    public void handlerMessage(String message) {
        log.info(">>>> 获取到消息 {},开始进行业务处理",message);
        // 接下来,就是具体的业务去处理这些消息了.
    }
@Component
@Slf4j
public class DirectMqConsumer {

    @Resource
    private ReceiveMessageService receiveMessageService;


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.direct.queue1}"),
                    exchange = @Exchange(type = "direct", name = "${rabbit.direct.exchange}"),
                    key = {"debug", "info", "warn", "error"}
            )
    })
    public void fanoutQueueConsumerConsole(String message) {
        log.info("控制台打印输出:" + message);
        receiveMessageService.handlerMessage("控制台打印输出 direct:" + message);
    }
}

验证

访问网址: http://localhost:8088/Server/send/queue

image.png

访问网址: http://localhost:8088/Server/send/work

image.png

普通的消息异步处理是完成了。 但重要的,应该是 Fanout, Direct 和 Topic 的主题处理.

Fanout 交换机消息配置

创建交换机,队列并绑定 FanoutConfig

package com.yjl.amqp.config.fanout;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Fanout 形式的 生产
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
public class FanoutConfig {

    @Value("${rabbit.fanout.queue1}")
    private String queue1;
    @Value("${rabbit.fanout.queue2}")
    private String queue2;

    @Value("${rabbit.fanout.exchange}")
    private String exchange;


    // 构建队列 Bean 和 Exchange Bean
    @Bean(value = "fanout_queue1")
    public Queue queue1() {
        return new Queue(queue1);
    }

    @Bean(value = "fanout_queue2")
    public Queue queue2() {
        return new Queue(queue2);
    }

    @Bean(value = "fanout_exchange")
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(exchange);
    }


    //进行绑定
    @Bean
    Binding bindingFanoutExchange1(@Qualifier("fanout_queue1") Queue queue,
                                   @Qualifier("fanout_exchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    Binding bindingFanoutExchange2(@Qualifier("fanout_queue2") Queue queue,
                                   @Qualifier("fanout_exchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

}

监听队列 FanoutMqConsumer

也可以使用 RabbitListener 进行绑定

package com.yjl.amqp.config.fanout;

import com.yjl.amqp.service.ReceiveMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * fanout 的消费
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
@Slf4j
public class FanoutMqConsumer {

    @Resource
    private ReceiveMessageService receiveMessageService;


    @RabbitListener(queues = {"${rabbit.fanout.queue1}", "${rabbit.fanout.queue2}"})
    public void fanoutQueueConsumer1An2(String message) {
        log.info("队列 fanout:" + message);
        receiveMessageService.handlerMessage("第一个消费者和第二个消费者获取消息 fanout:" + message);
    }

    // 也可以通过 RabbitListener 进行配置
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.fanout.queue3}"),
                    exchange = @Exchange(type = "fanout", name = "${rabbit.fanout.exchange}"),
                    key = {}
            )
    })
    public void fanoutQueueConsumer3(String message) {
        log.info("第三个消费者获取消息 fanout:" + message);
        receiveMessageService.handlerMessage("第三个消费者获取消息 fanout:" + message);
    }
}

发送消息

SendMessageController.java

    @RequestMapping("/fanout")
    public String fanout() {
        sendMessageService.fanout();
        return "fanout生成消息";
    }

SendMessageServiceImpl.java

    @Override
    public void fanout() {
        for (int i = 1; i <= 5; i++) {
            rabbitTemplate.convertAndSend(fanoutExchange, "", "fanout 发送消息:" + i);
        }
    }

验证

输入网址: http://localhost:8088/Server/send/fanout

image.png

Direct 交换机消息配置

通过注解绑定和消费队列消息 DirectMqConsumer

ackage com.yjl.amqp.config.direct;

import com.yjl.amqp.service.ReceiveMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 用途描述
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
@Slf4j
public class DirectMqConsumer {

    @Resource
    private ReceiveMessageService receiveMessageService;


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.direct.queue1}"),
                    exchange = @Exchange(type = "direct", name = "${rabbit.direct.exchange}"),
                    key = {"debug", "info", "warn", "error"}
            )
    })
    public void fanoutQueueConsumerConsole(String message) {
        log.info("控制台打印输出:" + message);
        receiveMessageService.handlerMessage("控制台打印输出 direct:" + message);
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.direct.queue2}"),
                    exchange = @Exchange(type = "direct", name = "${rabbit.direct.exchange}"),
                    key = {"info", "warn", "error"}
            )
    })
    public void fanoutQueueConsumerFile(String message) {
        log.info("文件 打印输出:" + message);
        receiveMessageService.handlerMessage("文件打印输出 direct:" + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.direct.queue3}"),
                    exchange = @Exchange(type = "direct", name = "${rabbit.direct.exchange}"),
                    key = {"warn", "error"}
            )
    })
    public void fanoutQueueConsumerDb(String message) {
        log.info("Db 打印输出:" + message);
        receiveMessageService.handlerMessage("DB 打印输出 direct:" + message);
    }

}

发送消息

SendMessageController.java

  @RequestMapping("/direct")
    public String direct() {
        sendMessageService.direct();
        return "direct 生成消息";
    }

SendMessageServiceImpl.java

     @Override
    public void direct() {
        rabbitTemplate.convertAndSend(directExchange, "debug", "debug 消息");
        rabbitTemplate.convertAndSend(directExchange, "info", "info 消息");
        rabbitTemplate.convertAndSend(directExchange, "warn", "warn 消息");
        rabbitTemplate.convertAndSend(directExchange, "error", "error 消息");
    }

验证

输入网址: http://localhost:8088/Server/send/direct

image.png

Topic 交换机消息配置

创建交换机,队列并绑定 TopicConfig

package com.yjl.amqp.config.topic;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Topic 形式的 生产
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
public class TopicConfig {

    @Value("${rabbit.topic.queue1}")
    private String queue1;

    @Value("${rabbit.topic.exchange}")
    private String exchange;


    // 构建队列 Bean 和 Exchange Bean
    @Bean(value = "topic_queue1")
    public Queue queue1() {
        return new Queue(queue1);
    }

    @Bean(value = "topic_exchange")
    TopicExchange topicExchange() {
        return new TopicExchange(exchange);
    }


    //进行绑定
    @Bean
    Binding bindingTopicExchange(@Qualifier("topic_queue1") Queue queue,
                                 @Qualifier("topic_exchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange)
                .with("*.orange.*");
    }

}

监听队列 TopicMqConsumer

package com.yjl.amqp.config.topic;

import com.yjl.amqp.service.ReceiveMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * topic 的队列配置
 *
 * @author yuejianli
 * @date 2022-11-22
 */
@Component
@Slf4j
public class TopicMqConsumer {

    @Resource
    private ReceiveMessageService receiveMessageService;

    @RabbitListener(queues = {"${rabbit.topic.queue1}"})
    public void fanoutQueueConsumer1An2(String message) {
        log.info("队列 topic:" + message);
        receiveMessageService.handlerMessage("console topic:" + message);
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue("${rabbit.topic.queue2}"),
                    exchange = @Exchange(type = "topic", name = "${rabbit.topic.exchange}"),
                    key = {"lazy.#", "*.*.rabbit"}
            )
    })
    public void fanoutQueueConsumerConsole(String message) {
        log.info("file topic:" + message);
        receiveMessageService.handlerMessage("file topic:" + message);
    }
}

发送消息

SendMessageController.java

   @RequestMapping("/topic")
    public String topic() {
        sendMessageService.topic();
        return "topic 生成消息";
    }

SendMessageServiceImpl.java

  @Override
    public void topic() {

        Map<String, String> messageMap = new HashMap<>();

        messageMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
        messageMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");

        messageMap.put("quick.orange.fox", "被队列 Q1 接收到");
        messageMap.put("lazy.brown.fox", "被队列 Q2 接收到");
        messageMap.put("info", "一个 info 消息3 ");

        messageMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        messageMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        messageMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");

        messageMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");

        messageMap.forEach((routingKey, message) -> {
            try {

                rabbitTemplate.convertAndSend(topicExchange, routingKey,
                        message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

验证

输入网址: http://localhost:8088/Server/send/topic

image.png

这是 RabbitMQ 异步处理消息的常见用法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/12420.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

小红书热词速看 | 古茗有何营销动作?

【导语】 据古茗官方数据&#xff0c;新系列推出当日即售空&#xff0c;单店最高出杯420杯&#xff0c;最快24小时内卖断货&#xff1b;上架3天&#xff0c;销量突破100万杯&#xff1b;10天&#xff0c;就售出了343万杯&#xff0c;其中2款牛油果奶昔用掉了40万斤牛油果&…

【Java基础】迷宫问题的Java代码实现

迷宫问题通常是指在给定的迷宫中&#xff0c;找到从起点到终点的路径的问题。迷宫通常由障碍物和自由空间组成&#xff0c;其中障碍物是不可穿越的区域&#xff0c;自由空间可以穿越。解决迷宫问题的方法有很多种&#xff0c;本文使用递归算法来解决迷宫问题。 一、使用递归算法…

vLive带你走进虚拟直播世界

虚拟直播是什么&#xff1f; 虚拟直播是基于5G实时渲染技术&#xff0c;在绿幕环境下拍摄画面&#xff0c;通过实时抠像、渲染与合成&#xff0c;再推流到直播平台的一种直播技术。尽管这种技术早已被影视工业所采用&#xff0c;但在全民化进程中却是困难重重&#xff0c;面临…

【状态估计】基于增强数值稳定性的无迹卡尔曼滤波多机电力系统动态状态估计(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

TeeChart Pro ActiveX 2023.3.20 Crack

TeeChart Pro ActiveX 图表组件库提供数百种 2D 和 3D 图形样式、56 种数学和统计函数供您选择&#xff0c;以及无限数量的轴和 14 个工具箱组件。图表控件可以有效地用于创建多任务仪表板。 插件的多功能性 ActiveX 图表控件作为服务器端库中的 Web 图表、脚本化 ASP 图表或桌…

Docker应用部署

文章目录 Docker 应用部署一、部署MySQL二、部署Tomcat三、部署Nginx四、部署Redis Docker 应用部署 一、部署MySQL 搜索mysql镜像 docker search mysql拉取mysql镜像 docker pull mysql:5.6创建容器&#xff0c;设置端口映射、目录映射 # 在/root目录下创建mysql目录用于…

缩短客户响应时间的 5 种方法

在当今竞争激烈的世界中&#xff0c;客户服务就是确保卓越的客户体验。这意味着顶级品牌必须竞争为客户提供最好的客户服务&#xff0c;而且提供最快的响应时间。 改善客户服务响应时间的 5种方法 1.使用正确的客户服务软件 客户服务软件是您可以为提高客户服务而进行的最佳…

【JVM】常量池

常量池&#xff08;Runtime Constant Poo&#xff09; 常量池Java中可以分为三种&#xff1a;字符串常量池&#xff08;堆&#xff09;、Class文件常量池、运行时常量池&#xff08;堆&#xff09;。 1.字符串常量池&#xff08;String Pool&#xff09; 为了提升性能和减少…

C++ 23 实用工具(二)绑定工具

C 23 实用工具&#xff08;二&#xff09;绑定工具 Adaptors for Functions std::bind、std::bind_front、std::bind_back和std::function这四个函数非常适合一起使用。 其中&#xff0c;std::bind、std::bind_front和std::bind_back可以让您即时创建新的函数对象&#xff0c…

protobuf编码格式解析

示例 假如定义一个如下的protobuf类型 message Person {required string user_name 1;optional int64 favorite_number 2;repeated string interests 3; }将其赋值为: user_name : "Martin" favorite_number : 1337 interests:"daydrea…

(PCB系列三)AD六层板布线经验累积

目录 1、布局&#xff1a; 2、创建电源类PWR 3、高速部分可以加屏蔽罩&#xff0c; 4、EMMC和NANDFLASH采取兼容放置&#xff08;创建联合&#xff09; 5、HDMI设计 6、就近原则摆放 7、AV端口 8、模拟信号&#xff08;1字型或L型走线&#xff09; 9、WIFI模块 10、局…

【精华】表格结构识别模型研究进展

表格结构识别模型研究进展 合合信息&#xff1a;表格识别与内容提炼技术理解及研发趋势 OCR之表格结构识别综述 表格识别技术综述 用于表检测和结构识别的深度学习&#xff1a;综述 &#xff08;1&#xff09;PP-Structure 速度提升11倍&#xff0c;一键PDF转Word PP-St…

【软考备战·希赛网每日一练】2023年4月12日

文章目录 一、今日成绩二、错题总结第一题 三、知识查缺 题目及解析来源&#xff1a;2023年04月12日软件设计师每日一练 一、今日成绩 二、错题总结 第一题 解析&#xff1a; 依据题目画出PERT图如下&#xff1a; 关键路径长度&#xff08;从起点到终点的路径中最长的一条&…

递归算法_字符串反转_20230412

递归算法-字符串反转 前言 递归算法对解决重复的子问题非常有效&#xff0c;字符串反转也可以用递归算法加以解决&#xff0c;递归算法设计的关键是建立子问题和原问题之间的相关性&#xff0c;同时需要确立递归退出的条件&#xff1b;如果递归退出的条件无法确定&#xff0c…

说过的话就一定要办到 - redo日志

一、什么是redo日志&#xff1f; 如果我们只在内存的 Buffer Pool 中修改了页面&#xff0c;假设在事务提交后突然发生了某个故障&#xff0c;导致内存中的数据都失效了&#xff0c;那么这个已经提交了的事务对数据库中所做的更改也就跟着丢失了&#xff0c;这会导致事务会失去…

4.基于多目标粒子群算法冷热电联供综合能源系统运行优化

4.基于多目标粒子群算法冷热电联供综合能源系统运行优化《文章复现》 相关资源代码&#xff1a;基于多目标粒子群算法冷热电联供综合能源系统运行优化 基于多目标算法的冷热电联供型综合能源系统运行优化 考虑用户舒适度的冷热电多能互补综合能源系统优化调度 仿真平台:matl…

Segment Anything论文翻译,SAM模型,SAM论文,SAM论文翻译;一个用于图像分割的新任务、模型和数据集;SA-1B数据集

【论文翻译】- Segment Anything / Model / SAM论文 论文链接&#xff1a; https://arxiv.org/pdf/2304.02643.pdfhttps://ai.facebook.com/research/publications/segment-anything/ 代码连接&#xff1a;https://github.com/facebookresearch/segment-anything 论文翻译&…

性能测试,python 内存分析工具 -memray

Memray是一个由彭博社开发的、开源内存剖析器&#xff1b;开源一个多月&#xff0c;已经收获了超8.4k的star&#xff0c;是名副其实的明星项目。今天我们就给大家来推荐这款python内存分析神器。 Memray可以跟踪python代码、本机扩展模块和python解释器本身中内存分配&#xf…

VR全景展示,VR全景平台,助理全景展示新模式

引言&#xff1a; VR全景展示是一种新型的展示方式&#xff0c;它利用虚拟现实技术和全景拍摄技术&#xff0c;使参观者可以身临其境地进入虚拟展览空间。这种展示方式不仅能够提供更加沉浸式的参观体验&#xff0c;还可以解决传统展览所面临的时间和地域限制等问题。 VR全景展…

测试工具之JMH详解

文章目录 1 JMH1.1 引言1.2 简介1.3 DEMO演示1.3.1 测试项目构建1.3.2 编写性能测试1.3.3 执行测试1.3.4 报告结果 1.4 注解介绍1.4.1 BenchmarkMode1.4.2 Warmup1.4.3 Measurement1.4.4 Threads1.4.5 Fork1.4.6 OutputTimeUnit1.4.7 Benchmark1.4.8 Param1.4.9 Setup1.4.10 Te…