Springboot集成RabbitMq+延时队列

1. 引入jar包

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置yml

2.1 配置生产者yml

 

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: / # 虚拟主机
    username: guest
    password: guest
    publisher-returns: true  #开启发送失败退回
    # simple:同步等待confirm结果,直到超时
    #correlated:异步回调,次你故意ConfirmCallback,MQ返回结果时会回调这个ComfirmCallback
    publisher-confirm-type: correlated

2.2 配置消费者yml

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: / # 虚拟主机
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        delayed-topic-input:
          destination: delayed-topic-demo #将消费者队列绑定到指定交换机
          group: group-1 
          #消费默认分组,消息到达时同一个分组下多个实例情况,只会有一个实例消费这条消息
          consumer:
            delayed-exchange: true #开启延时,生产者和消费者端都需要开启这个配置

 3.生产者生产消息

3.1 direct 直连

把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中

3.1.1 直连队列消息发送

/***直接交换机 **/
    public static final String directExchange = "directExchangeOne";
    public static final String routingKey1 = "directKey1";
    public static final String routingKey2 = "directKey2";
    public static final String directQueue1 = "directQueueOne";
    public static final String directQueue2 = "directQueueTwo";


/**
     * 直接交换机 一个交换机可以绑定一个队列一个消费者,也可以绑定多个队列多个消费者
     * 通过指定路由键directRouting发送给交换机directExchange
     * 交互机directExchange通过指定的路由键把消息msg投递到对应的队列上面去
     * @param map
     */
    public void directToQueue(Map<String, String> map) {
        map.put("direct-路由key:",RabbitConstants.routingKey1);
        rabbitTemplate.convertAndSend(RabbitConstants.directExchange, RabbitConstants.routingKey1, map);
        map.put("direct-路由key:",RabbitConstants.routingKey2);
        rabbitTemplate.convertAndSend(RabbitConstants.directExchange, RabbitConstants.routingKey2, map);
    }

3.1.2 直连队列消息绑定

package rabbit.config;

import config.RabbitConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置类 : 创建我们的直接交换机和队列,以及直接交换机跟队列的绑定关系
 * direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去
 *
 *  */
@Configuration
public class DirectConfig {

    /**
     * Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列
     * @return
     */
    @Bean
    public DirectExchange directExchangeOne(){
        return new DirectExchange(RabbitConstants.directExchange);
    }

    @Bean
    public Queue directQueueOne(){
        return new Queue(RabbitConstants.directQueue1);
    }

    @Bean
    public Queue directQueueTwo(){
        return new Queue(RabbitConstants.directQueue2);
    }

    /**
     * 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息
     * 路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
     * direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去
     * @param directQueueOne
     * @param directExchangeOne
     * @return
     */
    @Bean
    public Binding directBindingOne(Queue directQueueOne, DirectExchange directExchangeOne){
        return BindingBuilder.bind(directQueueOne).to(directExchangeOne).with(RabbitConstants.routingKey1);
    }

    /**
     * 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息
     * 路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
     * direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去
     * @param directQueueTwo
     * @param directExchangeOne
     * @return
     */
    @Bean
    public Binding directBindingTwo(Queue directQueueTwo, DirectExchange directExchangeOne) {
        return BindingBuilder.bind(directQueueTwo).to(directExchangeOne).with(RabbitConstants.routingKey2);

    }

}

3.1.3 直连队列消息接收

@RabbitListener(queues = RabbitConstants.directQueue1)
    @RabbitHandler // 指定对消息的处理
    public void directClientOne(HashMap<String,String> mes){
        System.out.println("直连队列消息1:" + mes);
    }

    /** @RabbitListener(queues = {"directQueue1","directQueue2"}):这样就可以一次消费两条消息 **/
    @RabbitListener(queues = RabbitConstants.directQueue2)
    @RabbitHandler
    public void directClientTwo(HashMap<String,String> mes){
        System.out.println("直连队列消息2: " + mes);
    }

 3.1.4 结果:

3.2 fanout 扇形

把消息发送到所有与它绑定的Queue中,没有路由概念

3.2.1 扇形消息发送

@Autowired
    public RabbitMqProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

/***
     * 扇形交换机
     * 这个交换机没有路由键概念,就算你绑了路由键也是无视的
     * 消息会发送到所有绑定的队列上。
     * @param fanoutMap1
     */
    public void fanoutToQueue(Map<String, String> fanoutMap1) {
        fanoutMap1.put("fanout-交换机:",RabbitConstants.fanoutExchange1);
        rabbitTemplate.convertAndSend(RabbitConstants.fanoutExchange1,null,fanoutMap1);
    }

3.2.2 扇形消息绑定

/**
 * 扇形交换机
 * Fanout:转发消息到所有绑定队列,没有路由key
 * */
@Configuration
public class FanoutConfig {

    /**
     * 不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange1(){
        return new FanoutExchange(RabbitConstants.fanoutExchange1);
    }

    @Bean
    public Queue fanoutQueue1(){
        return new Queue(RabbitConstants.fanoutQueue1);
    }

    @Bean
    public Queue fanoutQueue2(){
        return new Queue(RabbitConstants.fanoutQueue2);
    }

    /** 扇形交换机没有路由key */
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange1){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1);
    }

    /** 扇形交换机没有路由key */
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange1) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange1);

    }

}

3.2.3 扇形消息接收

/** 扇形交换机 */
    public static final String fanoutExchange1 = "fanout_exchange1";
    public static final String fanoutQueue1 = "fanout_queue1";
    public static final String fanoutQueue2 = "fanout_queue2";


@RabbitListener(queues = RabbitConstants.fanoutQueue1)
    @RabbitHandler
    public void fanoutQueue1(HashMap<String,String> fanoutMes){
        System.out.println("扇形队列消息1: " + fanoutMes);
    }

    @RabbitListener(queues = RabbitConstants.fanoutQueue2)
    @RabbitHandler
    public void fanoutQueue2(HashMap<String,String> fanoutMes){
        System.out.println("扇形队列消息2: " + fanoutMes);
    }

3.2.4 扇形--结果

3.3  topic 主题

将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中--多了匹配的概念

3.3.1 主题队列消息发送

@Autowired
    public RabbitMqProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }


/***主题交换机:模糊匹配队列
     * *:星号表示任意一个字符
     * 	#:表示任意一个或者多个字符
     */
    // topic 的 routingKey
    public static final String topicA = "helloTopic.world";
    public static final String topicB = "helloTopic.#";
    public static final String topicAll = "#";

    public static final String topicExchange = "topic_exchange";
    /** 绑定 topicA = "helloTopic.world"*/
    public static final String topicQueue1 = "topic_queue1";
    /** 绑定 topicB="helloTopic.#"*/
    public static final String topicQueue2 = "topic_queue2";
    /** 绑定 #,匹配所有 */
    public static final String topicQueue3 = "topic_queue3";


/**
     * 主题交换机:模糊匹配队列
     * topic.# 可匹配topic topic.add topic.add.add
     * topic.* 可匹配topic.add  topic.delete
     * @param map
     */
    public void topicToQueue(Map<String, String> map) {
        // 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息
        map.put("Topic-路由key:",RabbitConstants.topicA);
        rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicA, map);

        map.put("Topic-路由key:",RabbitConstants.topicB);
        rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicB, map);

        map.put("Topic-路由key:",RabbitConstants.topicAll);
        rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicAll, map);

    }

3.3.2 主题队列消息绑定

/***
 * 按规则转发消息
 */
@Configuration
public class TopicConfig {

    /**
     * Topic Exchange 转发消息主要是根据通配符
     * 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中
     * @return
     */
    @Bean
    public TopicExchange topicExchange1(){
        return new TopicExchange(RabbitConstants.topicExchange);
    }

    @Bean
    public Queue topicQueue1(){
        return new Queue(RabbitConstants.topicQueue1);
    }

    @Bean
    public Queue topicQueue2(){
        return new Queue(RabbitConstants.topicQueue2);
    }

    @Bean
    public Queue topicQueue3(){
        return new Queue(RabbitConstants.topicQueue3);
    }

    /**
     * 消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,
     * Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中
     * @param topicQueue1
     * @param topicExchange1
     * @return
     */
    @Bean
    public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange1){
        return BindingBuilder.bind(topicQueue1).to(topicExchange1).with(RabbitConstants.topicA);
    }

    @Bean
    public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange1){
        return BindingBuilder.bind(topicQueue2).to(topicExchange1).with(RabbitConstants.topicB);
    }

@Bean
    public Binding topicBinding3(Queue topicQueue3, TopicExchange topicExchange1){
        return BindingBuilder.bind(topicQueue3).to(topicExchange1).with(RabbitConstants.topicAll);
    }

}

3.3.3 主题队列消息接收

@RabbitListener(queues = RabbitConstants.topicQueue1)
    @RabbitHandler
    public void topicQueue1(HashMap<String,String> topicMes){
        System.out.println("主题消息队列1: " + topicMes);
    }

    @RabbitListener(queues = RabbitConstants.topicQueue2)
    @RabbitHandler
    public void topicQueue2(HashMap<String,String> topicMes){
        System.out.println("主题消息队列2: " + topicMes);
    }

    @RabbitListener(queues = RabbitConstants.topicQueue3)
    @RabbitHandler
    public void topicQueue3(HashMap<String,String> topicMes){
        System.out.println("主题消息队列匹配所有: " + topicMes);
    }

3.3.4 主题--结果

3.4 Delayed 延时(需要延时插件,参考我另一篇插件安装)

3.4.1 延时队列消息发送

 /** 延迟队列 */
    public static final String DELAYED_EXCHANGE_NAME = "myDelayedExchange";
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";

/**
     * 死信延迟队列
     * @param message
     */
    public void sendDelayedMessage(String message) {
        System.out.println("Send time 开始: " + LocalDateTime.now());
        rabbitTemplate.convertAndSend(RabbitConstants.DELAYED_EXCHANGE_NAME,
                RabbitConstants.DELAYED_ROUTING_KEY,
                message,
                messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setDelay(10000); // 设置消息的延长时间延,单位毫秒
                    return messagePostProcessor;
                });

        System.out.println("Send time 结束: " + LocalDateTime.now() );
    }

3.4.2 延时队列消息绑定

public class DelayedConfig {

    /** 定义一个延迟交换机 **/
    @Bean
    public CustomExchange delayedExchange() {
        /*Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");*/
        return new CustomExchange(RabbitConstants.DELAYED_EXCHANGE_NAME,
                "x-delayed-message", // 消息类型  x-delayed-message
                true, // 是否持久化
                false); // 是否自动删除
    }

    /** 延时队列 **/
    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable(RabbitConstants.DELAYED_QUEUE_NAME)
                .withArgument("x-delayed-type", "direct")
                .build();
    }

    /** 绑定队列到这个延迟交换机 */
    @Bean
    public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(RabbitConstants.DELAYED_ROUTING_KEY).noargs();
    }
}

3.4.3 延时队列消息接收

@RabbitListener(queues = RabbitConstants.DELAYED_QUEUE_NAME)
    public void receiveDelayedMessage(String message,  Channel channel) {
        System.out.println("Received delayed message: " + message);
        log.info("当前时间:{},接收时长信息给延迟队列:{}", LocalTime.now(),message);
        System.out.println("Received time: " + LocalDateTime.now() + "  Received: " + message);
    //    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }

3.4.4 延时--结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/537570.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

UE 模型学习

1. UE中任何模型都是有多个三角面构成的&#xff0c;模型有一个顶点数组&#xff0c;根据右手螺旋定则和三个顶点顺序确定三角面的法线。 注&#xff1a;当三角面的法线方向与相机法线方向相同时&#xff0c;此时看不见三角面&#xff1b;当两法线方向相反&#xff0c;此时才能…

数据结构DAY5--二叉树相关流程

流程有&#xff1a;创建->遍历->得到信息->销毁 创建 根据先序遍历的流程以及对叶子结点的左后驱结点和右后驱结点以#号替代的原则&#xff0c;写出一个数组&#xff0c;并建立一个结构体&#xff0c;包括数据域&#xff0c;结构体类型的左后驱结点和右后驱结点指针…

数字证书在网络安全中的关键作用与日常应用

在当今数字化的时代&#xff0c;网络安全问题日益凸显&#xff0c;保护数据安全和用户隐私成为了人们关注的焦点。数字证书作为一种重要的网络安全技术&#xff0c;其在网络安全中扮演着关键的角色&#xff0c;并且在我们的日常生活中有着广泛的应用。现在给大家介绍简单介绍下…

blender怎么用GPU渲染?blender GPU云渲染推荐

在三维建模和渲染领域&#xff0c;Blender以其强大的功能和免费开源的特点广受好评。GPU渲染作为提升渲染效率的关键技术&#xff0c;越来越受到用户的关注。本文将详细介绍如何在Blender中设置并利用GPU进行渲染&#xff0c;以及探索其云渲染的可能性&#xff0c;助力用户高效…

装机指导。

everything winrar snipaste cmake git tortoisegit tortoisesvn inno setup vs2022 安装的时候注意sdk路径一定要默认&#xff01;&#xff01; 否则你会发现在你的sdk安装路径的根盘符下会多出一个Windows Kits&#xff0c;强迫症接受不了 默认的会跟已有的装在一起…

无法用raven-js,如何直接使用TraceKit标准化错误字符串(一次有趣的探索)

引子&#xff1a;网上三年前&#xff08;2020&#xff09;的文章介绍了一个raven-js 简单说就是把堆栈信息格式化兼容各浏览器&#xff0c;便于查看错误来源。 **but&#xff1a;**到处找了一下raven-js&#xff0c;已经没有官方出处了&#xff0c;只在Sentry的源码仓库里发现…

林江院长赴长沙见证爱尔眼科巩膜镜技术诊疗门诊启动仪式

近日&#xff0c;爱尔眼科“巩膜镜技术诊疗门诊、视觉康复及训练门诊”启动会在湖南长沙顺利举行。旨在通过成立爱尔眼科巩膜镜技术诊疗门诊、视觉康复及训练门诊&#xff0c;为有需要的疑难屈光不正患者提供全新的诊疗途径&#xff0c;为各年龄阶段人群视觉问题提供更全面的个…

[数据结构初阶]二叉树

我们在前两篇博客中主要介绍了堆及其应用&#xff0c;针对的对象堆是完全二叉树&#xff0c;存储方式采用顺序结构存储的方式。 那么好的&#xff0c;这篇博客我们浅谈二叉树的链式存储&#xff0c;针对的对象是二叉树&#xff0c;并不局限于完全二叉树了&#xff01; 我们先来…

PlayerSettings.WebGL.emscriptenArgs设置无效的问题

1&#xff09;PlayerSettings.WebGL.emscriptenArgs设置无效的问题 2&#xff09;多个小资源包合并为大资源包的疑问 3&#xff09;AssetBundle在移动设备上丢失 4&#xff09;Unity云渲染插件RenderStreaming&#xff0c;如何实现多用户分别有独立的操作 这是第381篇UWA技术知…

MySOL之旅--------MySQL数据库基础( 3 )

本篇碎碎念:要相信啊,胜利就在前方,要是因为一点小事就停滞不前,可能你也不适合获取胜利,成功的路上会伴有泥石,但是走到最后,你会发现身上的泥泞皆是荣耀的勋章! 今日份励志文案: 凡是发生皆有利于我 目录 查询(select) 1.全列查询 2.指定列查询 3.查询字段为表达式 ​编…

PVE系统的安装

一.PVE系统的安装 前置准备环境:windows电脑已安装Oracle VM VirtualBox,电脑支持虚拟化,且已经开启,按住ctrl+shift+ESC打开任务管理器查看是否开启,如果被禁用,可进入BIOS开启虚拟化,重启电脑后再进行后续操作。本步骤选用windows10安装VirtualBox,版本为7.0.8。 …

被拒绝的职场空窗期,到底该怎么办?

打工人的心头刺 最近&#xff0c;一则新闻在网上炸开了锅&#xff1a;一位求职者因职场空窗期超过三个月&#xff0c;竟被无情拒绝应聘。消息一出&#xff0c;瞬间引起了广大职场人的共鸣。在这个快节奏的时代&#xff0c;我们似乎被一种无形的力量推着&#xff0c;不敢休息&am…

高性能代码如何编写?

引言&#xff1a; 性能优化一直是一个至关重要的议题。随着应用程序规模的不断增长和用户对性能的不断提升的要求&#xff0c;开发人员需要更加关注如何编写高性能的代码&#xff0c;以确保应用程序能够在各种情况下都能保持稳定和高效。编写高性能代码需要从多个方面入手&…

编译Nginx配置QUIC/HTTP3.0

1. 安装BoringSSL sudo apt update sudo apt install -y build-essential ca-certificates zlib1g-dev libpcre3 \ libpcre3-dev tar unzip libssl-dev wget curl git cmake ninja-build mercurial \ libunwind-dev pkg-configgit clone --depth1 https://github.com/google/b…

耐受强酸碱PFA试剂瓶高纯实验级进口聚四氟乙烯材质取样瓶

PFA取样瓶作为实验室中常备器皿耗材之一&#xff0c;主要用来盛放、储存和运输样品&#xff0c;根据使用条件不同&#xff0c;也可叫特氟龙试剂瓶、样品瓶、储样瓶、广口瓶、进样瓶等。广泛应用于半导体、新材料、多晶硅、硅材、微电子等行业。近年来随着新兴行业的快速发展&am…

软考 — 系统架构设计师 - 嵌入式真题

问题1&#xff1a; 可靠度表示系统在规定条件下&#xff0c;规定的时间内不发生失效的概率。 失效率表示系统运行到此时从未出现失效的情况下&#xff0c;单位时间内系统出现失效的概率 问题 2&#xff1a; 动态冗余又称为主动冗余&#xff0c;通过故障检测&#xff0c;故障定…

麒麟系统(kylin)安装ssh后,无法上传文件

1.赋予文件夹权限 chmod 777 filename 2.修改ssh配置文件 vi /etc/ssh/sshd_config 将Subsystem sftp /xxxxx 改为Subsystem sftp internal-sftp 重启服务 sudo service sshd restart 断开ssh连接&#xff0c;重新连接&#xff0c;即可正常上传文件

2012年认证杯SPSSPRO杯数学建模D题(第二阶段)人机游戏中的数学模型全过程文档及程序

2012年认证杯SPSSPRO杯数学建模 D题 人机游戏中的数学模型 原题再现&#xff1a; 计算机游戏在社会和生活中享有特殊地位。游戏设计者主要考虑易学性、趣味性和界面友好性。趣味性是本质吸引力&#xff0c;使玩游戏者百玩不厌。网络游戏一般考虑如何搭建安全可靠、丰富多彩的…

MindOpt APL向量化建模语法的介绍与应用(2)

前言 在数据科学、工程优化和其他科学计算领域中&#xff0c;向量和矩阵的运算是核心组成部分。MAPL作为一种数学规划语言&#xff0c;为这些领域的专业人员提供了强大的工具&#xff0c;通过向量式和矩阵式变量声明以及丰富的内置数学运算支持&#xff0c;大大简化了数学建模…

数学建模-Matlab中randperm函数及其双重进阶版

1.randperm函数的用法 &#xff08;1&#xff09;这种用法就是参数只有一个数字&#xff0c;代表的含义就是随机排列之后打印输出&#xff1b; 我们举例的数字是4&#xff0c;就会把1到4这4个数字随机打乱之后随机输出&#xff0c;每次运行结果都不一样 所有可能的情况是n的…