springboot使用rabbitmq

使用springboot创建rabbitMQ的链接。

整个项目结构如下:

img

1.maven依赖

<dependency>
    	<groupId>com.rabbitmq</groupId>
    	<artifactId>amqp-client</artifactId>
    	<version>3.4.1</version>
</dependency>

application.yaml的配置如下

spring:
  application:
    name: rabbitMQ
  rabbitmq:
    host: 192.168.142.128  #rabbitmq的主机名
    port: 5672			   #端口,默认5672
    username: itheima      #rabbitmq的账号
    password: 123321	   #密码
server:
  port: 8081               #项目启动端口

2.创建rabbitMQ配置类 – RabbitConfig。

@Configuration
@Slf4j
public class RabbitConfig {

    @Bean("directExchange")
    public DirectExchange directExchange() {
        return new DirectExchange(MQConstant.DIRECT_EXCHANGE);
    }

    @Bean("directQueue")
    public Queue directQueue() {
        return new Queue(MQConstant.DIRECT_QUEUE);
    }

    @Bean("bindingDirectExchange")
    public Binding bindingDirectExchange(@Qualifier("directExchange") DirectExchange directExchange,
                                         @Qualifier("directQueue") Queue directQueue) {
        return BindingBuilder.bind(directQueue).to(directExchange).with(MQConstant.ROUTING_KEY);
    }

}

3.创建RabbitMQ客户端类,主要是用来发送消息用的。

@Component
@Slf4j
public class RabbitMqClient {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(MessageBody messageBody){
        try{
            String uuid = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(uuid);
            rabbitTemplate.convertAndSend(MQConstant.DIRECT_EXCHANGE, MQConstant.ROUTING_KEY, JSON.toJSONString(messageBody),
                    new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            // 消息持久化
                            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                            log.info("message send,{}", message);
                            return message;
                        }
                    },correlationData);
            log.info("message send successful");
        }catch (Exception e){
            log.info("send message error:{}",e);
        }
    }
}

4.创建接收消息类 —RabbitMqServer

@Component
@Slf4j
public class RabbitMqServer {

    @RabbitListener(queues = MQConstant.DIRECT_QUEUE)
    public void receive(String message) {
        try {
            log.info("receive message:{}", message);
            MessageBody messageBody = JSON.parseObject(message, MessageBody.class);
            switch (messageBody.getTopic()) {
                case QueueTopic.USER_LOGIN:
                    User user = JSON.parseObject(messageBody.getData(), User.class);
                    log.info("receive user:{}", user);
                    break;
                default:
                    log.info("no need hanndle message:{},topic:{}", message, messageBody.getTopic());
                    break;
            }
        }catch (Exception e){
            log.error("rabbitmq receive message error:{}", e);
        }
    }
}

5.有了以上准备后就可以开始向mq里面发送消息了,在单元测试编写测试代码。

@SpringBootTest(classes = RabbitMqApplication.class)
class RabbitMqApplicationTests {

	@Autowired
	private RabbitMqClient rabbitMqClient;

	@Test
	void testDirectSend() {
		//数据
		User user = new User();
		user.setId(123);
		user.setName("Lewin-jie2");
		user.setPassword("123");

		MessageBody messageBody = new MessageBody();
		messageBody.setData(JSON.toJSONString(user));

		long time = new Date().getTime();
		messageBody.setSendTime(time);
		//添加主题
		messageBody.setTopic(QueueTopic.USER_LOGIN);
		rabbitMqClient.send(messageBody);
	}

}

6.运行后,可以看到后台的日志,证明我们消息发送已经成功了。

image-20241109151751035

我们打开rabbitmq的控制台(http://你的主机名:15672),可以开到队列里面也收到了消息,但是还没有被消费。

image-20241109152401671

以上出现结果就证明rabbit已经是配置好了。那么我们来了解一下啊rabbitmq

简介:rabbitmq是基于amqp协议,用elang语言开发的一个高级的消息队列,以高性能,高可靠,高吞吐量而被大量应用到应用系统作为第三方消息中间件使用,为应用系统实现应用解耦削峰减流,异步消息

rabbitmq主要构造有,producter,consumer,exchange,queue组成

1.直连交换机(direct_exchange)。

刚刚配置的时候就是演示的producter发消息到直连交换机,然后再发送到queue中的过程。

2.广播交换机(fanout_exchange).

顾名思义,就是绑定该交换机的所有队列都可以收到这个交换机的消息

	@Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(MQConstant.FANOUT_EXCHANGE);
    }

    @Bean("aQueue")
    public Queue aQueue(){
        return new Queue(MQConstant.FANOUT_QUEUE_A);
    }

    @Bean("bQueue")
    public Queue bQueue(){
        return new Queue(MQConstant.FANOUT_QUEUE_B);
    }

    @Bean("cQueue")
    public Queue cQueue(){
        return new Queue(MQConstant.FANOUT_QUEUE_C);
    }

    /**
     * 绑定队列aQueue bQueue cQueue
     */
    @Bean("bindingFanoutExchange1")
    public Binding bindingFanoutExchange1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,
                                         @Qualifier("aQueue") Queue aQueue){
        return BindingBuilder.bind(aQueue).to(fanoutExchange);
    }

    @Bean("bindingFanoutExchange2")
    public Binding bindingFanoutExchange2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,
                                         @Qualifier("bQueue") Queue bQueue){
        return BindingBuilder.bind(bQueue).to(fanoutExchange);
    }

    @Bean("bindingFanoutExchange3")
    public Binding bindingFanoutExchange3(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,
                                         @Qualifier("cQueue") Queue cQueue){
        return BindingBuilder.bind(cQueue).to(fanoutExchange);
    }

编写controller类,再postman上面测试[http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag](http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag)

@Controller
@RequestMapping("/mq")
@Slf4j
public class SendMessageController {
    @Autowired
    private RabbitMqClient rabbitMqClient;

    @PostMapping("/sendFanoutMsg")
    public String sendFanoutMsg(@RequestParam("msg") String msg){
        try {
            MessageBody messageBody = new MessageBody();
            messageBody.setData(msg);
            rabbitMqClient.send1(messageBody);
        }catch (Exception e){
            log.error("sendFanoutMsg error{}", e);
        }
        return "send fanout msg success";
    }
}

结果:控制台收到消息了!!!

image-20241109213739205

3.主题交换机(topic_exchange)

topic_exchange和direct_exchange很像,topic有通配符。direct没有。

image-20241109214721827
  1. china.news 代表只关心中国新闻

  2. china.weather 代表只关心中国天气

  3. japan.news 代表只关心日本的新闻

  4. japan.weather 代表只关心日本的天气

    controller接口

    @PostMapping("/sendTopicMsg")
        public String sendTopicMsg(@RequestParam("msg") String msg,@RequestParam("type") String type){
            try {
                MessageBody messageBody = new MessageBody();
                messageBody.setData(msg);
                rabbitMqClient.send3(messageBody,type);
            }catch (Exception e){
                log.error("sendTopicMsg error{}", e);
            }
            return "send topic msg success";
        }
    

    利用postman测试。

    1.msg: “祖国75岁生日快乐”,type:“china.news”

    image-20241110102452558

    预测:queue1,queue4会接收到消息。

    image-20241110102659758

    2.msg: “日本大量排核废水,导致哥斯拉出现”,type:“japan.news”

    image-20241110102738974

    预测:queue2,queue4会接收到消息。

    image-20241110103638277

    3.msg: “今日日本出现大暴雨,怀疑是哥斯拉来了”,type:“Japan.weather”

    image-20241110103727452

    预测:queue2,queue3会接收到消息

    image-20241110103839382

topic-exchange在代码中如何使用。首先创建交换机,和队列,绑定交换机。

/*============================topic===========================*/
    @Bean("topicExchange")
    public TopicExchange topicExchange() {
        return new TopicExchange(MQConstant.TOPIC_EXCHANGE);
    }

    @Bean("queue1")
    public Queue queue1(){
        return new Queue(MQConstant.QUEUE1);
    }

    @Bean("queue2")
    public Queue queue2(){
        return new Queue(MQConstant.QUEUE2);
    }

    @Bean("queue3")
    public Queue queue3(){
        return new Queue(MQConstant.QUEUE3);
    }

    @Bean("queue4")
    public Queue queue4(){
        return new Queue(MQConstant.QUEUE4);
    }

    @Bean("bingTopicExchange1")
    public Binding bingTopicExchange1(@Qualifier("queue1") Queue queue1,
                                      @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(queue1).to(topicExchange).with(MQConstant.CHINA_);
    }

    @Bean("bingTopicExchange2")
    public Binding bingTopicExchange2(@Qualifier("queue2") Queue queue2,
                                      @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(queue2).to(topicExchange).with(MQConstant.JAPAN_);
    }

    @Bean("bingTopicExchange3")
    public Binding bingTopicExchange3(@Qualifier("queue3") Queue queue3,
                                      @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(queue3).to(topicExchange).with(MQConstant._WEATHER);
    }

    @Bean("bingTopicExchange4")
    public Binding bingTopicExchange4(@Qualifier("queue4") Queue queue4,
                                      @Qualifier("topicExchange") TopicExchange topicExchange) {
        return BindingBuilder.bind(queue4).to(topicExchange).with(MQConstant._NEWS);
    }

消息发送

 public void send3(MessageBody messageBody,String routingKey) {
        try{
            String uuid = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(uuid);
            rabbitTemplate.convertAndSend(MQConstant.TOPIC_EXCHANGE, routingKey , JSON.toJSONString(messageBody),
                    new MessagePostProcessor() {
                        @Override
                        public Message postProcessMessage(Message message) throws AmqpException {
                            // 消息持久化
                            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                            log.info("message send,{}", message);
                            return message;
                        }
                    },correlationData);
            log.info("message send successful");
        }catch (Exception e){
            log.info("send message error:{}",e);
        }
    }

消息接收

@RabbitListener(queues = MQConstant.QUEUE1)
    public void receive4(String message) {
        log.info("topic exchange");
        try {
            log.info("queue1 receive message:{}", message);
            MessageBody messageBody = JSON.parseObject(message, MessageBody.class);
            log.info("receive message:{}", messageBody.getData());
        }catch (Exception e){
            log.error("rabbitmq receive a message error:{}", e);
        }
    }

    @RabbitListener(queues = MQConstant.QUEUE2)
    public void receive5(String message) {
        log.info("topic exchange");
        try {
            log.info("queue2 receive message:{}", message);
            MessageBody messageBody = JSON.parseObject(message, MessageBody.class);
            log.info("receive message:{}", messageBody.getData());
        }catch (Exception e){
            log.error("rabbitmq receive a message error:{}", e);
        }
    }

    @RabbitListener(queues = MQConstant.QUEUE3)
    public void receive6(String message) {
        log.info("topic exchange");
        try {
            log.info("queue3 receive message:{}", message);
            MessageBody messageBody = JSON.parseObject(message, MessageBody.class);
            log.info("receive message:{}", messageBody.getData());
        }catch (Exception e){
            log.error("rabbitmq receive a message error:{}", e);
        }
    }

    @RabbitListener(queues = MQConstant.QUEUE4)
    public void receive7(String message) {
        log.info("topic exchange");
        try {
            log.info("queue4 receive message:{}", message);
            MessageBody messageBody = JSON.parseObject(message, MessageBody.class);
            log.info("receive message:{}", messageBody.getData());
        }catch (Exception e){
            log.error("rabbitmq receive a message error:{}", e);
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/963801.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

安卓(android)订餐菜单【Android移动开发基础案例教程(第2版)黑马程序员】

一、实验目的&#xff08;如果代码有错漏&#xff0c;可查看源码&#xff09; 1.掌握Activity生命周的每个方法。 2.掌握Activity的创建、配置、启动和关闭。 3.掌握Intent和IntentFilter的使用。 4.掌握Activity之间的跳转方式、任务栈和四种启动模式。 5.掌握在Activity中添加…

RabbitMQ快速上手及入门

概念 概念&#xff1a; publisher&#xff1a;生产者&#xff0c;也就是发送消息的一方 consumer&#xff1a;消费者&#xff0c;也就是消费消息的一方 queue&#xff1a;队列&#xff0c;存储消息。生产者投递的消息会暂存在消息队列中&#xff0c;等待消费者处理 exchang…

java命令详解

这里以jdk8为例子&#xff0c;查看默认的垃圾回收器 java -XX:PrintCommandLineFlags -version-XX:UseParallelGC : Parallel Scavenge 和 Parallel Old 组合 -XX:InitialHeapSize268435456 : 初始化堆大小&#xff08;字节&#xff09; -XX:MaxHeapSize4294967296 : 最大堆大…

自主Shell命令行解释器

什么是命令行 我们一直使用的"ls","cd","pwd","mkdir"等命令&#xff0c;都是在命令行上输入的&#xff0c;我们之前对于命令行的理解&#xff1a; 命令行是干啥的&#xff1f;是为我们做命令行解释的。 命令行这个东西实际上是我们…

分析哲学:从 语言解剖到 思想澄清的哲学探险

分析哲学&#xff1a;从 语言解剖 到 思想澄清 的哲学探险 第一节&#xff1a;分析哲学的基本概念与公式解释 【通俗讲解&#xff0c;打比方来讲解&#xff01;】 分析哲学&#xff0c;就像一位 “语言侦探”&#xff0c;专注于 “解剖语言”&#xff0c;揭示我们日常使用的语…

自定义数据集 使用paddlepaddle框架实现逻辑回归

导入必要的库 import numpy as np import paddle import paddle.nn as nn 数据准备&#xff1a; seed1 paddle.seed(seed)# 1.散点输入 定义输入数据 data [[-0.5, 7.7], [1.8, 98.5], [0.9, 57.8], [0.4, 39.2], [-1.4, -15.7], [-1.4, -37.3], [-1.8, -49.1], [1.5, 75.6…

QtCreator在配置Compilers时,有一个叫ABI的选项,那么什么是ABI?

问题提出 QtCreator在配置Compilers时,有一个叫ABI的选项,那么什么是ABI&#xff1f; ABI&#xff08;Application Binary Interface&#xff09;介绍 ABI&#xff08;Application Binary Interface&#xff0c;应用二进制接口&#xff09;是指应用程序与操作系统或其他程序…

[STM32 标准库]EXTI应用场景 功能框图 寄存器

一、EXTI 外部中断在嵌入式系统中有广泛的应用场景&#xff0c;如按钮开关控制&#xff0c;传感器触发&#xff0c;通信接口中断等。其原理都差不多&#xff0c;STM32会对外部中断引脚的边沿进行检测&#xff0c;若检测到相应的边沿会触发中断&#xff0c;在中断中做出相应的处…

Maven jar 包下载失败问题处理

Maven jar 包下载失败问题处理 1.配置好国内的Maven源2.重新下载3. 其他问题 1.配置好国内的Maven源 打开⾃⼰的 Idea 检测 Maven 的配置是否正确&#xff0c;正确的配置如下图所示&#xff1a; 检查项⼀共有两个&#xff1a; 确认右边的两个勾已经选中&#xff0c;如果没有请…

【JavaScript】Web API事件流、事件委托

目录 1.事件流 1.1 事件流和两个阶段说明 1.2 事件捕获 1.3 事件冒泡 1.4 阻止冒泡 1.5 解绑事件 L0 事件解绑 L2 事件解绑 鼠标经过事件的区别 两种注册事件的区别 2.事件委托 案例 tab栏切换改造 3.其他事件 3.1 页面加载事件 3.2 页面滚动事件 3.2 页面滚…

Spring Cloud工程搭建

目录 工程搭建 搭建父子工程 创建父工程 Spring Cloud版本 创建子项目-订单服务 声明项⽬依赖 和 项⽬构建插件 创建子项目-商品服务 声明项⽬依赖 和 项⽬构建插件 工程搭建 因为拆分成了微服务&#xff0c;所以要拆分出多个项目&#xff0c;但是IDEA只能一个窗口有一…

neo4j入门

文章目录 neo4j版本说明部署安装Mac部署docker部署 neo4j web工具使用数据结构图数据库VS关系数据库 neo4j neo4j官网Neo4j是用ava实现的开源NoSQL图数据库。Neo4作为图数据库中的代表产品&#xff0c;已经在众多的行业项目中进行了应用&#xff0c;如&#xff1a;网络管理&am…

selenium记录Spiderbuf例题C03

防止自己遗忘&#xff0c;故作此为记录。 鸢尾花数据集(Iris Dataset) 这道题牵扯到JS动态加载。 步骤&#xff1a; &#xff08;1&#xff09;进入例题&#xff0c;需要找到按钮规律。 flip_xpath: str r"//li/a[onclickgetIrisData({});]" &#xff08;2&…

自定义数据集 使用pytorch框架实现逻辑回归并保存模型,然后保存模型后再加载模型进行预测,对预测结果计算精确度和召回率及F1分数

导入必要的库&#xff1a; import numpy as np import torch import torch.nn as nn import torch.optim as optim from sklearn.metrics import precision_score, recall_score, f1_score 准备数据&#xff1a; class1_points np.array([[1.9, 1.2],[1.5, 2.1],[1.9, 0.5]…

如何运行Composer安装PHP包 安装JWT库

1. 使用Composer Composer是PHP的依赖管理工具&#xff0c;它允许你轻松地安装和管理PHP包。对于JWT&#xff0c;你可以使用firebase/php-jwt这个库&#xff0c;这是由Firebase提供的官方库。 安装Composer&#xff08;如果你还没有安装的话&#xff09;&#xff1a; 访问Co…

《Linux服务与安全管理》| 数据库服务器安装和配置

《Linux服务与安全管理》| 数据库服务器安装和配置 目录 《Linux服务与安全管理》| 数据库服务器安装和配置 任务一&#xff1a; 安装PostgreSQL数据库&#xff0c;设置远程登录&#xff0c;客户端可以成功登录并操作数据库。 任务二&#xff1a; 安装MySQL数据库&#xf…

【贪心算法篇】:“贪心”之旅--算法练习题中的智慧与策略(一)

✨感谢您阅读本篇文章&#xff0c;文章内容是个人学习笔记的整理&#xff0c;如果哪里有误的话还请您指正噢✨ ✨ 个人主页&#xff1a;余辉zmh–CSDN博客 ✨ 文章所属专栏&#xff1a;贪心算法篇–CSDN博客 文章目录 一.贪心算法1.什么是贪心算法2.贪心算法的特点 二.例题1.柠…

DRM系列七:Drm之CREATE_DUMB

本系列文章基于linux 5.15 DRM驱动的显存由GEM&#xff08;Graphics execution management&#xff09;管理。 一、创建流程 创建buf时&#xff0c;user层提供需要buf的width,height以及bpp(bite per pixel)&#xff0c;然后调用drmIoctl(fd, DRM_IOCTL_MODE_CREATE_DUMB, &…

Python从0到100(八十七):CNN网络详细介绍及WISDM数据集模型仿真

前言&#xff1a; 零基础学Python&#xff1a;Python从0到100最新最全教程。 想做这件事情很久了&#xff0c;这次我更新了自己所写过的所有博客&#xff0c;汇集成了Python从0到100&#xff0c;共一百节课&#xff0c;帮助大家一个月时间里从零基础到学习Python基础语法、Pyth…

WPF进阶 | WPF 动画特效揭秘:实现炫酷的界面交互效果

WPF进阶 | WPF 动画特效揭秘&#xff1a;实现炫酷的界面交互效果 前言一、WPF 动画基础概念1.1 什么是 WPF 动画1.2 动画的基本类型1.3 动画的核心元素 二、线性动画详解2.1 DoubleAnimation 的使用2.2 ColorAnimation 实现颜色渐变 三、关键帧动画深入3.1 DoubleAnimationUsin…