RabbitMq的使用

最近处理访客记录所以,来学习下rabbitMQ。之前同事已经写好了,这里只需要进行消费,后续会逐渐完善。

0.介绍

0.1交换机(Exchanges)

rabbitmq中生产者发送的消息都是发送到交换机,再由交换机推入队列。所以生产者不知道队列去了哪里,就靠Exchage来控制,交换机总共有以下几种类型。

0.1.1广播模式(fanout)

扇出所有消息进入队列,类似广播。

0.1.2直接交换(direct)

绑定相关的routerKey分发到不同的队列,简单说就是direct交换机接收了消息后,根据关键词分发队列。

0.1.2主题模式(topic)

direct路由比较单一,所以提升了routerKey的能力,在关键词标记下加上了通配符。
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词

1.公共配置类

spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
/**
 * 类描述:RabbitMQ公共配置类
 *
 * @ClassName RabbitMQConfig
 * @Author ward
 * @Date 2023-08-18 10:28
 */
public class RabbitMQConfig {
    /**
     * RabbitMQ的队列主题名称
     */
    public static final String RABBITMQ_TOPIC = "rabbitmqTopic";

    /**
     * RabbitMQ的DIRECT交换机名称
     */
    public static final String RABBITMQ_DIRECT_EXCHANGE = "rabbitmqDirectExchange";

    /**
     * RabbitMQ的Direct交换机和队列绑定的匹配键 DirectRouting
     */
    public static final String RABBITMQ_DIRECT_ROUTING = "rabbitmqDirectRouting";
}

2.消费消息的两种方式

把记录塞进队列里的时候,只是完成了第一步,那你肯定要对他进行消费。分为两种推模式和拉模式:推模式就是生产者发布消息时,主动推送给消费者;拉模式则是消费者发送请求后才会发送。

2.1


3.监听队列的两种方式

一种是@RabbitListener注解的方式,一种是实现springboot:ChannelAwareMessageListener接口的方式

3.1@RabbitListener

如果demoData想不转换成String直接推,得在这个数据流实现序列化。

innerRabbitTemplate.convertAndSend(InnerMQConfig.TOPIC_EXCHANGE, msgKey, JSONObject.toJSONString(demoData));
@Component
public class DemoRabbitMQListener {
      //定义方法进行信息的监听(queues表示队列名称)
      @RabbitListener(queues = "demo_queue")
      @RabbitHandler
      public void demoQueue(Message message){
          System.out.println("message:"+message.getBody());
      }
}

3.2实现ChannelAwareMessageListener接口

听前辈说直接实现这个接口,就不用管底层是谁的消息队列了,因为是基于Springboot,后续我会逐步求证,做需求只能先用着。这个实现起来有点麻烦,我总结了以下顺序:

3.2.1.创建连接工厂(ConnectionFactory——MQ连接工厂 )

publisherConfirms:消息发送到exchange,返回成功或者失败。
publishReturns:消息从exchange到queue,发送成功或者失败。
后续在DemoRabbitTemplate会演示回调

    @Bean(name = "DemoConnectionFactory")
    @Primary
    public ConnectionFactory connectionFactory() {
        //创建连接
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // 主机地址
        connectionFactory.setHost(host);
        // 连接端口;默认为 5672
        connectionFactory.setPort(port);
        // 连接用户名;默认为guest
        connectionFactory.setUsername(username);
        // 连接密码;默认为guest
        connectionFactory.setPassword(password);
        // 虚拟主机名称;默认为 /
        connectionFactory.setVirtualHost(virtualHost);
        // 开启消息发送至RabbitMQ 的回调
        connectionFactory.setPublisherConfirms(true);
        // 开启消息发送至队列失败的回调
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

3.2.2.初始化组件(rabbitAdmin ——对MQ进行初始化的Spring组件)

    @Bean(name = "DemoRabbitAdmin")
    @Primary
    public RabbitAdmin rabbitAdmin(@Qualifier("DemoConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

3.2.3.创建交换器(exchange)

这里提供了两种等价的方式,喜欢哪种就用哪种。
durable:是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
autoDelete:是否自动删除,如果没有与之绑定的Queue,直接删除
internal:是否内置的,如果为true,只能通过Exchange到Exchange
arguments:结构化参数
在这里插入图片描述

看了源码之后发现默认只有名字的时候,其实持久化是开的的,自动删除默认就是关闭的。在这里插入图片描述

    /*创建交换器*/
    @Bean(DEMO_EXCHANGE)
    public TopicExchange exchange() {
        return new TopicExchange(DEMO_EXCHANGE, true, false);
    }
    /*创建交换器*/
    @Bean(DEMO_EXCHANGE)
    public Exchange exchange() {
        return ExchangeBuilder.topicExchange(DEMO_EXCHANGE).durable(true).build();
    }

3.2.4.创建队列(queue)

创建队列主要掌握这几个参数:
name: 队列名称。
durable: 队列是否持久化。 队列默认是存放到内存中的,rabbitmq重启则丢失,若想重启之后还存在则队列要持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库。
exclusive:是否排他的队列。有两个作用:连接关闭时该队列自动删除;该队列只允许一个消费者访问。
autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除。
arguments: 队列中的消息什么时候会自动被删除 (设置死信交换器和死信队列等设置)
在这里插入图片描述

    /*创建*/
    @Bean(QUEUE_NAME)
    public Queue QUEUE_DEMO() {
        return new Queue(QUEUE_NAME, true, false, false);
    }

3.2.5.绑定队列到交换机(binding)

    //绑定队列到交换机
    @Bean
    public Binding BINGING_EXCHANGE_QUEUE(@Qualifier(QUEUE_NAME) Queue queue,
                                           @Qualifier(DEMO_EXCHANGE) Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
    }

3.2.6.创建监听容器(SimpleMessageListenerContainer)

    //创建监听容器
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            @Qualifier("DemoConnectionFactory") ConnectionFactory connectionFactory,
            DemoRabbitMQListener demoRabbitMQListener,
            @Qualifier(QUEUE_NAME) Queue queue
    ) throws AmqpException {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        //消费者个数
        listenerContainer.setConcurrentConsumers(listenerSize);
        listenerContainer.setQueues(queue);
        listenerContainer.setExposeListenerChannel(true);
        //设置接收方式,AUTO-自动接收,MANUAL-手动接收,NULL-不接收
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //监听处理类(自己消费端写的类)
        listenerContainer.setMessageListener(demoRabbitMQListener);
        return listenerContainer;
    }

3.2.7.创建操作类(RabbitTemplate)

setConfirmCallback的消息回调是在生产者端要把参数丢进去的。

    @Bean(name = "DemoRabbitTemplate")
    @Primary//多个实现类使用该注解
    public RabbitTemplate rabbitTemplate(@Qualifier("DemoConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //触发setReturnCallback回调必须设置mandatory=true,否则Exchange没有找到Queue就会丢弃掉消息,而不会触发回调
        rabbitTemplate.setMandatory(true);
        //设置连接工厂
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //消息是否成功发送到Exchange回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 确认消息送到交换机(Exchange)回调
             * @param correlationData
             * @param ack
             * @param cause
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("确认消息送到交换机(Exchange)结果:");
                log.info("相关数据:{}", correlationData);
                boolean ret = false;
                if (ack) {
                    log.info("消息发送到交换机成功, 消息 = {}", correlationData.getId());
                    //下面可自定义业务逻辑处理,如入库保存信息等

                } else {
                    log.error("消息发送到交换机失败! 消息: {}}; 错误原因:cause: {}", correlationData.getId(), cause);
                    //下面可自定义业务逻辑处理,如入库保存信息等

                }
            }
        });
        //消息是否从Exchange路由到Queue
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 失败回调:只有消息没有投递给指定的队列
             * @param message  投递失败的消息详细信息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 当时这个消息发给那个交换机
             * @param routingKey 当时这个消息用那个路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //获取消息id
                String messageId = message.getMessageProperties().getMessageId();
                // 内容
                String result = null;
                try {
                    result = new String(message.getBody(), "UTF-8");
                } catch (Exception e) {
                    log.error("消息发送失败", e);
                }
                log.error("消息发送失败, 消息ID = {}; 消息内容 = {}", messageId, result);
                //下面可自定义业务逻辑处理,如入库保存信息等
            }
        });
        return rabbitTemplate;
    }

3.2.8.监听消费(RabbitMQListener)

这个类要注意用@Service或者@Compet注解让他交给IOC

@Service
@Slf4j
public class DemoRabbitMQListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        log.info("message:{}", message.getBody());
        //todo: 接下来就是各自的业务逻辑,就是消费环节
    }
}

3.子标题

正文

在这里插入代码片

4.子标题

正文

在这里插入代码片

5.子标题

正文

在这里插入代码片

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/86174.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

百度云BOS云存储的图片如何在访问时,同时进行格式转换、缩放等处理

前言 之前做了一个图片格式转换和压缩的服务,结果太占内存。后来查到在访问图片链接时,支持进行图片压缩和格式转换,本来想着先格式转换、压缩图片再上传到BOS,现在变成了上传后,访问时进行压缩和格式转换。想了想&am…

无人机航管应答机 ping200XR

产品概述 ping200XR是一个完整的系统,旨在满足航管应答器和自动相关监视广播(ADS-B)的要求,在管制空域操作无人航空系统(UAS)。该系统完全可配置为模式A,模式C,模式S转发器和扩展ADS-B发射机的任何组合。ping200XR包括一个精度超…

AutoDev 1.1.3 登场,个性化 AI 辅助:私有化大模型、自主设计 prompt、定义独特规则...

在过去的半个月里,我们为开源辅助编程工具 AutoDev 添加了更强大的自定义能力,现在你可以: 使用自己部署的开源大模型自己配置 Intellij IDEA 中的行为自定义开发过程中的规范 当然了,如果您自身拥有开发能力的话,建议…

MinIO线上扩容实战

硬件投入肯定是随着业务的增长而增长,这就要求中间件平台必须提供水平伸缩机制,MinIO对象存储服务也不例外,本文就详细介绍MinIO的扩容。 Minio支持通过增加新的Server Pool来扩容老的集群。每个Server Pool都是一个相对独立的故障域&#x…

uniapp微信小程序点击右上角菜单分享功能权限配置

个人项目地址: SubTopH前端开发个人站 (自己开发的前端功能和UI组件,一些有趣的小功能,感兴趣的伙伴可以访问,欢迎提出更好的想法,私信沟通,网站属于静态页面) SubTopH前端开发个人站…

IPv4,IPv6,TCP,路由

主要回顾一下TCP/IP的传输过程,在这个过程中,做了什么事情 ip : 网际协议,IP协议能让世界上任意两台计算机之间进行通信。 IP协议的三大功能: 寻址和路由传递服务:不可靠(尽最大努力交付传输数据包&…

Kali 分析和管理网络

查看网络 ifconfig 命令 ┌──(root㉿kali)-[~] # eth0:有线网卡 └─# ifconfig eth0: flags4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500inet 192.168.56.128 netmask 255.255.255.0 broadcast 192.168.56.255inet6 fe80::20c:29ff:feb3:7991 prefixlen 64 …

C++信息学奥赛1131:基因相关性

这段代码的功能是比较两个字符串的相似度&#xff0c;并根据给定的阈值判断是否相似。 解析注释后的代码如下&#xff1a; #include <iostream> #include <string> using namespace std;int main() {double bf; // 定义双精度浮点数变量bf&#xff0c;用于存储阈…

LED驱动型IC芯片的原理介绍

一、LED驱动器是什么 LED驱动器&#xff08;LED Driver&#xff09;&#xff0c;是指驱动LED发光或LED模块组件正常工作的电源调整电子器件。由于LED PN结的导通特性决定&#xff0c;它能适应的电源电压和电流变动范围十分狭窄&#xff0c;稍许偏离就可能无法点亮LED或者发光效…

10种最流行的3D模型文件格式及转换方法

3D 文件格式用于存储有关 3D 模型的信息。 你可能听说过一些最流行的格式&#xff0c;包括 STL、OBJ、FBX 和 DAE。 它们广泛应用于从视频游戏动画到工业增材制造的各种应用中。 在本文中&#xff0c;我们将考虑为什么有这么多不同的格式&#xff0c;探讨 3D 文件格式存储的四…

Code Lab - 2

pip install torch-scatter -f https://pytorch-geometric.com/whl/torch-1.10.2cu102.html pip install torch-sparse -f https://pytorch-geometric.com/whl/torch-1.10.2cu102.html pip install torch-geometric pip install ogb 1. PyG Datasets PyG有两个类&#xff0c;用…

学Python静不下来,看了一堆资料还是很迷茫是为什么

一、前言 最近发现&#xff0c;身边很多的小伙伴学Python都会遇到一个问题&#xff0c;就是资料也看了很多&#xff0c;也花了很多时间去学习但还是很迷茫&#xff0c;时间长了又发现之前学的知识点很多都忘了&#xff0c;都萌生出了想半路放弃的想法。 让我们看看蚂蚁金服的大…

IDEA:Error running,Command line is too long. 解决方法

报错如下&#xff1a; Error running SendSmsUtil. Command line is too long. Shorten the command line via JAR manifest or via a classpath file and rerun.原因是启动命令过长。 解决方法&#xff1a; 1、打开Edit Configurations 2、点击Modify options设置&#x…

关于android studio 几个简单的问题说明

自信是成功的第一步。——爱迪生 1. android studio 如何运行不同项目是否要更换不同的sdk 和 gradle 2.编译Gradle总是错误为什么 3.如何清理android studio 的缓存 4. 关于android Studio中的build 下面的rebuild project

Kafka基本使用

查看Kafka的进程是否在运行 #命令行终端中运行如下命令 ps -ef | grep kafkafind / -iname kafka-server-start.shcd /usr/local/kafka/bin/#启动kafka ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.propertiesKafka默认使用9092端口提供服务&#xf…

使用opencv-python在图片上显示中文

测试图像如下&#xff1a; 核心代码如下&#xff1a; import cv2 import numpy as np from PIL import Image, ImageDraw, ImageFontdef cv2ImgAddText(img, text, left, top, textColor(0, 255, 0), textSize20):if (isinstance(img, np.ndarray)): #判断是否OpenCV图片类型…

javaScript:七夕特辑-对象的认识与应用(包含日期对象及相关案例)

目录 一.前言 二.认识对象 在js中声明对象的方法 1.直接使用{}声明对象 2.使用构造函数创建对象 获取属性的值 执行对象方法 解释 三.对象的应用 代码 效果图 ​编辑 四.日期对象 1.Date 日期对象 2. getFullYear() 获取当前年份 3.getMonth() 获取当前日期对象…

记一次由于整型参数错误导致的任意文件上传

当时误打误撞发现的&#xff0c;觉得挺奇葩的&#xff0c;记录下 一个正常的图片上传的点&#xff0c;文件类型白名单 但是比较巧的是当时刚对上面的id进行过注入测试&#xff0c;有一些遗留的测试 payload 没删&#xff0c;然后在测试上传的时候就发现.php的后缀可以上传了&a…

初识 JVM 01

JVM JRE JDK的关系 JVM 的内存机构 程序计数器 java指令的执行流程&#xff1a; 1 右侧的java源代码编译为左侧的java字节码&#xff08;右侧第一个方块对应左侧第一个方块&#xff09; 2 字节码 经过解释器 变为机器码 3 机器码就可以被cpu来执行 程序计数器的作用就…

C语言<自定义类型>结构体、枚举、联合

✨Blog&#xff1a;&#x1f970;不会敲代码的小张:)&#x1f970; &#x1f251;推荐专栏&#xff1a;C语言&#x1f92a;、Cpp&#x1f636;‍&#x1f32b;️、数据结构初阶&#x1f480; &#x1f4bd;座右铭&#xff1a;“記住&#xff0c;每一天都是一個新的開始&#x1…