工作中常用的RabbitMQ实践

目录

1.前置知识

准备工作

2.导入依赖

3.生产者

4.消费者

5.验证

验证Direct

验证Fanout

验证Topic



1.前置知识

rabbitmq有五种工作模式;按照有无交换机分为两大类

无交换机的:简单队列(一对一,单生产单消费)、工作队列(工作队列有轮训分发和公平分发两种模式)

有交换机:发布-订阅、路由模式、主题模式

准备工作

安装rabbitmq,并成功启动

2.导入依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.生产者

生产端项目结构:
 

逻辑:生产者只对交换机进行生产,至于队列绑定等放在消费端进行执行

BusinessConfig

定义了三个不同类型的交换机

direct类型:(当生产者往该交换机发送消息时,他必须指定固定的routingkey,当routingkey值为空,他也会匹配routingkey为空的队列)

fanout类型:(当生产者往该交换机发送消息时,他所绑定的队列都会收到消息,routingkey即使写了也会忽略,一般为空字符串)

Topic类型:(当生产者往该交换机发送消息时,他并不像direct指定固定的routingkey,可以进行模糊匹配,当该routingkey为空时,他会匹配routingkey为空的队列)

package com.zsp.quartz.queue;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;


/**
 * @Author: ZhangSP
 * @Date: 2023/12/7  14:05
 */
public class BusinessConfig {
    // 声明direct交换机
    public static final String EXCHANGE_DIRECT= "exchange_direct_inform";

    // 声明fanout交换机
    public static final String EXCHANGE_FANOUT= "exchange_fanout_inform";

    // 声明topic交换机
    public static final String EXCHANGE_TOPIC= "exchange_topic_inform";
}

TestProducer

生产消息

package com.zsp.quartz.queue;

import com.alibaba.fastjson.JSON;
import com.zsp.quartz.entity.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class TestProducer {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void Producer_topics_springbootTest() {

        //使用rabbitTemplate发送消息
        String message = "";
        User user = new User();
        user.setName("张三");
        user.setEmail("anjduahsd");
        message = JSON.toJSONString(user);

        // direct
        rabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_DIRECT,"",message);

        // fanout
        rabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_FANOUT,"",message);

        // topic
        rabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_TOPIC,"",message);
    }
}

4.消费者

消费者目录结构:

BusinessConfig内容解析:

①定义交换机类型

②配置交换机与队列的绑定关系

③通过容器工厂声明队列

package com.zsp.consumer.queue;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * @Author: ZhangSP
 * @Date: 2023/12/7  14:05
 */
@Slf4j
@Configuration
public class BusinessConfig {
    // 声明direct
    public static final String EXCHANGE_DIRECT= "exchange_direct_inform";
    public static final String QUEUE_DIRECT_EMAIL = "queue_direct_inform_email";
    public static final String QUEUE_DIRECT_SMS = "queue_direct_inform_sms";
    public void BindDirectEmail(Channel channel) {
        try {
            channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);
            channel.queueDeclare(QUEUE_DIRECT_EMAIL, true, false, false, null);
            channel.queueBind(QUEUE_DIRECT_EMAIL, EXCHANGE_DIRECT, "");
        } catch (Exception e) {
            log.error("声明Direct->email队列时失败", e);
        }
    }
    public void BindDirectSms(Channel channel) {
        try {
            channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);
            channel.queueDeclare(QUEUE_DIRECT_SMS, true, false, false, null);
            channel.queueBind(QUEUE_DIRECT_SMS, EXCHANGE_DIRECT, "123");
        } catch (Exception e) {
            log.error("声明Direct->sms失败", e);
        }
    }
    // 声明fanout
    public static final String EXCHANGE_FANOUT= "exchange_fanout_inform";
    public static final String QUEUE_FANOUT_EMAIL = "queue_fanout_inform_email";
    public static final String QUEUE_FANOUT_SMS = "queue_fanout_inform_sms";
    public void BindFanoutEmail(Channel channel) {
        try {
            channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);
            channel.queueDeclare(QUEUE_FANOUT_EMAIL, true, false, false, null);
            channel.queueBind(QUEUE_FANOUT_EMAIL, EXCHANGE_FANOUT, "");
        } catch (Exception e) {
            log.error("声明Fanout->email队列时失败", e);
        }
    }
    public void BindFanoutSms(Channel channel) {
        try {
            channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);
            channel.queueDeclare(QUEUE_FANOUT_SMS, true, false, false, null);
            channel.queueBind(QUEUE_FANOUT_SMS, EXCHANGE_FANOUT,"");
        } catch (Exception e) {
            log.error("声明Fanout->sms失败", e);
        }
    }

    // 声明topic
    public static final String EXCHANGE_TOPIC= "exchange_topic_inform";
    public static final String QUEUE_TOPIC_EMAIL = "queue_topic_inform_email";
    public static final String QUEUE_TOPIC_SMS = "queue_topic_inform_sms";
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";

    public void BindTopicEmail(Channel channel) {
        try {
            channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);
            channel.queueDeclare(QUEUE_TOPIC_EMAIL, true, false, false, null);
            channel.queueBind(QUEUE_TOPIC_EMAIL, EXCHANGE_TOPIC, ROUTINGKEY_EMAIL);
        } catch (Exception e) {
            log.error("声明Topic->email队列时失败", e);
        }
    }
    public void BindTopicSms(Channel channel) {
        try {
            channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);
            channel.queueDeclare(QUEUE_TOPIC_SMS, true, false, false, null);
            channel.queueBind(QUEUE_TOPIC_SMS, EXCHANGE_TOPIC,"");
        } catch (Exception e) {
            log.error("声明Topic->sms失败", e);
        }
    }



    // 声明队列
    @Autowired
    @Qualifier(value = "zspConnectionFactory")
    private ConnectionFactory connectionFactory;
    @PostConstruct
    public void shengmingQueue() {
        try {
            Connection connection = connectionFactory.createConnection();
            Channel channel = connection.createChannel(false);
            BindDirectEmail(channel);
            BindDirectSms(channel);
            BindFanoutEmail(channel);
            BindFanoutSms(channel);
            BindTopicEmail(channel);
            BindTopicSms(channel);
        } catch (Exception e) {
            log.error("业务实例声明绑定队列报错:",e);
        }
    }
}

RabbitFactory内容解析:

①创建自定义连接工厂

②通过@Qualifier准确注入连接工厂,创建个性化容器工厂

package com.zsp.consumer.queue;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitFactory {

    @Bean("zspConnectionFactory")
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // 设置RabbitMQ的连接信息,如主机名、端口号、用户名和密码等
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");
        return connectionFactory;
    }

    @Bean("rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("zspConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(5);
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}

ReceiveHandler内容解析:

监听绑定的队列消息

package com.zsp.consumer.queue;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ReceiveHandler {
    //监听自定义的Direct队列
    @RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_SMS, containerFactory = "rabbitListenerContainerFactory")
    public void directSMS(String msg, Message message, Channel channel) {
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Direct队列->sms队列" + jsonObject);
    }

    @RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_EMAIL, containerFactory = "rabbitListenerContainerFactory")
    public void directEmail(String msg, Message message, Channel channel) {
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Direct队列->email队列" + jsonObject);
    }

    //监听自定义的Fanout队列
    @RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_SMS, containerFactory = "rabbitListenerContainerFactory")
    public void FanoutSMS(String msg, Message message, Channel channel) {
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Fanout队列->sms队列" + jsonObject);
    }

    @RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_EMAIL, containerFactory = "rabbitListenerContainerFactory")
    public void FanoutEmail(String msg, Message message, Channel channel) {
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Fanout队列->email队列" + jsonObject);
    }

    //监听自定义的Topic队列
    @RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_SMS, containerFactory = "rabbitListenerContainerFactory")
    public void TopicSMS(String msg, Message message, Channel channel) {
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Topic队列->sms队列" + jsonObject);
    }

    @RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_EMAIL, containerFactory = "rabbitListenerContainerFactory")
    public void TopicEmail(String msg, Message message, Channel channel) {
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Topic队列->email队列" + jsonObject);
    }
}

5.验证

先启动消费者端,然后执行TestProducer

验证Direct

1.向routingkey为空的队列发消息

我们在消费者端配置了routingkey为空的队列,叫做 QUEUE_DIRECT_EMAIL

因此会打印出下面这条记录

2.向routingkey为123的队列发消息

我们在消费者端配置了routingkey为123的队列,叫做 QUEUE_DIRECT_SMS

因此会打出下面这条记录

验证Fanout

谁跟我绑定了,我都发

验证Topic

模糊匹配routingkey

匹配sms队列

会把下面这个打印出来

需要注意的是如果我们没有自定义容器工厂的话,这个containerFactory可以不写
简单理解就是实例,也就是rabbitmq服务地址是在哪里,实例包括了域名、端口、账号、密码等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/235849.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

内网穿透的应用-如何结合Cpolar内网穿透工具实现在IDEA中远程访问家里或者公司的数据库

文章目录 1. 本地连接测试2. Windows安装Cpolar3. 配置Mysql公网地址4. IDEA远程连接Mysql小结 5. 固定连接公网地址6. 固定地址连接测试 IDEA作为Java开发最主力的工具&#xff0c;在开发过程中需要经常用到数据库&#xff0c;如Mysql数据库&#xff0c;但是在IDEA中只能连接本…

VS Code使用教程

链接远程服务器 https://blog.csdn.net/zhaxun/article/details/120568402 免密登陆服务器 1生成客户机&#xff08;个人PC&#xff09;密令 ssh-keygen -t rsa生成的文件在主目录的.ssh文件当中。 查看密令并复制到linux系统当中 cat id_rsa.pub 2复制到服务器中 echo …

用C语言了解文件那些下 ‘流‘ 事

本篇会加入个人的所谓‘鱼式疯言’❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言,而是理解过并总结出来通俗易懂的大白话,我会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的&#xff0c;可能说的不是那么严谨.但小编初心是能让更多人能接受我们这个概念 前言 &#…

AMEYA360:大唐恩智浦荣获 2023芯向亦庄 “汽车芯片50强”

2023年11月28日&#xff0c;由北京市科学技术委员会和北京市经济和信息化局指导、北京经济技术开发区管理委员会主办、盖世汽车协办的“芯向亦庄”汽车芯片大赛在北京亦庄成功闭幕。 在本次大赛中 大唐恩智浦的 电池管理芯片DNB1168 (应用于新能源汽车BMS系统) 凭卓越的性能及高…

PDF控件Spire.PDF for .NET【转换】演示:将 PDF 转换为 Excel

PDF是一种通用的文件格式&#xff0c;但它很难编辑。如果您想修改和计算PDF数据&#xff0c;将PDF转换为Excel将是一个理想的解决方案。在本文中&#xff0c;您将了解如何使用Spire.PDF for .NET在 C# 和 VB.NET 中将 PDF 转换为 Excel。 Spire.Doc 是一款专门对 Word 文档进行…

医疗设备智慧管理助力医院提质增效,阿基米德amp;健康界实践分享

近日&#xff0c;苏州阿基米德网络科技有限公司与医疗领域头部级媒体健康界&#xff0c;联合举办“数智为擎 提质增效——医学装备智慧管理创新发展论坛”的直播活动。 直播现场&#xff0c;来自上海交通大学医学院附属同仁医院、中华医学会航海医学分会、苏州阿基米德的专家们…

vscode报错:建立连接:XHR failed

文章目录 问题解决方案 问题 Windows端ssh远程连接Linux端&#xff0c;Windows端vscode报错&#xff1a;“…XHR failed.” 解决方案 参考&#xff1a;解决 Windows 端 VS Code “无法与 “…“ 建立连接&#xff1a;XHR failed.” 问题 亲测有效。 总结&#xff1a; linux…

如何在小米路由器4A千兆版刷入OpenWRT并通过内网穿透工具实现公网远程访问

文章目录 前言1. 安装Python和需要的库2. 使用 OpenWRTInvasion 破解路由器3. 备份当前分区并刷入新的Breed4. 安装cpolar内网穿透4.1 注册账号4.2 下载cpolar客户端4.3 登录cpolar web ui管理界面4.4 创建公网地址 5. 固定公网地址访问 前言 OpenWRT是一个高度模块化、高度自…

代码随想录二刷 |二叉树 |144.二叉树的前序遍历

代码随想录二刷 &#xff5c;二叉树 &#xff5c;144.二叉树的前序遍历 题目描述解题思路代码实现递归法迭代法 题目描述 144.二叉树的前序遍历 给你二叉树的根节点 root &#xff0c;返回它节点值的 前序 遍历。 示例 1&#xff1a; 输入&#xff1a;root [1,null,2,3] 输…

使用Sourcetrail解析C项目

阅读源码的工具很多&#xff0c;今天给大家推荐一款别具一格的源码阅读神器。 它就是 Sourcetrail&#xff0c;一个免费开源、跨平台的可视化源码探索项目 使用

Python+Appium自动化测试之元素等待方法与重新封装元素定位方法

在appium自动化测试脚本运行的过程中&#xff0c;因为网络不稳定、测试机或模拟器卡顿等原因&#xff0c;有时候会出现页面元素加载超时元素定位失败的情况&#xff0c;但实际这又不是bug&#xff0c;只是元素加载较慢&#xff0c;这个时候我们就会使用元素等待的方法来避免这种…

前端知识(十三)——JavaScript监听按键,禁止F12,禁止右键,禁止保存网页【Ctrl+s】等操作

禁止右键 document.oncontextmenu new Function("event.returnValuefalse;") //禁用右键禁止按键 // 监听按键 document.onkeydown function () {// f12if (window.event && window.event.keyCode 123) {alert("F12被禁用");event.keyCode 0…

设计模式-门面模式(Facade)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、定义二、结构 前言 在组件构建过程中&#xff0c;某些接口之间直接依赖会带来很多问题&#xff0c;甚至无法直接实现。采用一层间接接口&#xff0c;来隔离…

低代码开发:属于“美味膳食”还是“垃圾食品”

目录 引言低代码是什么&#xff1f;低代码的优点使用挑战未来展望最后 引言 随着数字化转型的迅猛发展&#xff0c;低代码开发平台逐渐成为了企业和开发者的关注焦点&#xff0c;尤其是前两年低代码的迅速火爆&#xff0c;来势汹汹&#xff0c;号称要让大部分程序员下岗的功能…

导入pgsql中的保存的html数据到hive时,换行符无法被repalce

数据如图所示&#xff1a; 当我使用replace函数 \r\n 、\r 、 \n替换时。无论如何都无法替换 最终发现可以使用chr(ASCII码) 可以匹配到&#xff0c;坑我好久。 replace(replace(replace(replace(replace(bid_html_con, chr(9),),chr(10),),chr(13),),chr(160),),chr(32),)

EDA 数字时钟

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、数字时钟是什么&#xff1f;二、EDA里面数码管的显示1.元件模型2.参考程序3. 实验仿真波形4.实验现象5. 仿真问题 三、显示时钟1. 时钟电路模块2.参考程序3…

社交媒体图像识别与情感分析

社交媒体图像识别与情感分析是当前人工智能领域的一个研究热点。通过对社交媒体上大量的图像和文本数据进行深度学习和情感分析&#xff0c;可以提取出图像中的情感信息&#xff0c;从而为社交媒体用户提供更加个性化和精准的内容推荐和服务。 在社交媒体图像识别方面&#xff…

祝贺 年citation突破100

有好多年&#xff0c;写不出1篇论文了&#xff0c;也没有思路&#xff0c;觉得做的内容非常浅&#xff0c;一直忙于实际应用项目&#xff0c;偏技术突破。 2018出国访学后&#xff0c;重新看文献&#xff0c;整理思路&#xff0c;拓展思维&#xff0c;逐步写了几篇论文&#x…

Selenium自动化(上)

Selenium 安装 环境准备 第一种方式 Python 自带的 pip 工具安装。 pip install selenium4.12.0安装完成后&#xff0c;查看安装的 Selenium 版本号。 pip show selenium第二种方式 安装 Selenium 的前提是拥有 Python 开发环境&#xff08;推荐使用 PyCharm&#xff09;。…

外贸企业邮箱推荐:高抵达率的邮件解决方案

外贸邮箱用哪个企业邮箱邮件抵达率高&#xff1f; 在邮件到达率方面&#xff0c;Zoho Mail企业邮箱具有以下优势&#xff1a; 高效率的反垃圾邮件功能&#xff1a;Zoho Mail配备了前沿的反垃圾邮件过滤技术&#xff0c;能准确识别和拦截垃圾邮件&#xff0c;保证重要邮件能按时…