RabbitMQ工作模式2 整合springboot 和MQ高级特性

RabbitMQ工作模式

1.路由模式

创建交换机 , 连接队列 (生产者)

public class MyTestExDirect {
    @Test
    public void bbb() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接mq
        connectionFactory.setUsername("账号");
        connectionFactory.setPassword("密码");
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(端口号);
        connectionFactory.setVirtualHost("/aaa");
        //建立连接
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //创建交换机
        channel.exchangeDeclare("ex_direct", BuiltinExchangeType.DIRECT,false);
        //创建队列
        /**
         * String queue, 队列的名称
         * boolean durable, 持久化
         * boolean exclusive, 是否独占
         * boolean autoDelete,  受否自动删除
         * Map<String, Object> arguments  参数
         */
        channel.queueDeclare("mydirect1",false,false,false,null);
        channel.queueDeclare("mydirect2",false,false,false,null);
        //绑定交换机和队列   设置routingkey
        channel.queueBind("mydirect1","ex_direct","error");
        channel.queueBind("mydirect2","ex_direct","test");
        channel.queueBind("mydirect2","ex_direct","test2");
        //交换机     routingkey     根据routingkey在队列上发布消息
        channel.basicPublish("ex_direct","error",null,"路由模式测试".getBytes());
    }
}

启动测试

交换机创建成功

队列创建成功 , 与交换机连接成功

通过routingkey "error" 将消息发送到 mydirect1

创建消费者

public class ConsumerAppDirect
{
    public static void main( String[] args ) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接mq
        connectionFactory.setUsername("账号"); 
        connectionFactory.setPassword("密码"); 
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(端口号); 
        connectionFactory.setVirtualHost("/aaa");
        //建立连接
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("mq:-----aaa"+s);
            }
        };
        channel.basicConsume("mydirect1",true,consumer);
    }
}

开启监控

2.Topics 主题模式

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词 

*:匹配不多不少恰好1个词   test.* test.insert

举例:

item.#:能够匹配item.insert.abc 或者 item.insert

item.*:只能匹配item.insert 

创建交换机和生产者

public class MyTestExTopics {
    @Test
    public void ccc() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接mq
        connectionFactory.setUsername("账号");
        connectionFactory.setPassword("密码"); 
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(端口号);
        connectionFactory.setVirtualHost("/aaa");
        //建立连接
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //创建交换机
        channel.exchangeDeclare("ex_topics", BuiltinExchangeType.TOPIC,false);
        //创建队列
        /**
         * String queue, 队列的名称
         * boolean durable, 持久化
         * boolean exclusive, 是否独占
         * boolean autoDelete,  受否自动删除
         * Map<String, Object> arguments  参数
         */
        channel.queueDeclare("mytopics1",false,false,false,null);
        channel.queueDeclare("mytopics2",false,false,false,null);
        //绑定交换机和队列   设置routingkey
        channel.queueBind("mytopics1","ex_topics","test.#");
        channel.queueBind("mytopics2","ex_topics","*.aaa");
        channel.queueBind("mytopics2","ex_topics","test.*");
        //交换机     此处的routingkey应该是具体的值     根据routingkey在队列上发布消息
        channel.basicPublish("ex_topics","test.aaa",null,"TOPIC模式测试".getBytes());


    }
}

测试

发布消息成功

消费者监听参考路由模式 , 只需要修改队列就行

SpringBoot整合RabbitMQ

1.搭建项目

添加依赖

<!--2. rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

添加配置文件

2.创建工作模式(主题模式)

1)创建交换机和队列

package com.example.config;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class TopicMqConfig {
    @Value("${mq.exchange.name}")
    private String EXCHANGENAME;
    @Value("${mq.queue.name}")
    private String QUEUENAME1;
    @Value("${mq.queue.name}")
    private String QUEUENAME2;
    //创建交换机
    @Bean("ex1")
    public Exchange getExchange(){
        Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();
        return exchange;
    }
    //创建队列
    @Bean("queue1")
    public Queue getQueue1(){
        Queue queue1 = QueueBuilder.nonDurable(QUEUENAME1).build();
        return queue1;
    }
    @Bean("queue2")
    public Queue getQueue2(){
        Queue queue2 = QueueBuilder.nonDurable(QUEUENAME1).build();
        return queue2;
    }
    //绑定交换机和队列
    @Bean("binding1")
    public Binding bindingQueueToExchange1(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){
        Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("*.*").noargs();
        return binding1;
    }
    @Bean("binding2")
    public Binding bindingQueueToExchange2(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){
        Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("test.*").noargs();
        return binding2;
    }
}

2)创建生产者

测试

3)创建消费者

创建配置文件

创建测试类 监听队列
package com.example.message;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerMessage {
    @RabbitListener(queues = "test_queue2")
    public void xxx(Message message){
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
    }
}

测试

MQ高级特性,消息的可靠性传递

1.确认模式

开启确认模式 修改配置

创建测试类

@SpringBootTest
public class MqTtst {
    @Value("${mq.exchange.name}")
    private String EXCHANGENAME;
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Test
    void sendMsg(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if (b){
                    System.out.println("发送消息成功");
                }else {
                    System.out.println("发送消息失败,原因:"+s);
                }
            }
        });
        rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");
    }
}

启动测试

2.消息回退

当交换机接收到消息 , 但队列收不到消息时 , 使用回退

修改配置

测试

@Test
void sendMsgReturn(){
    //  消息回退
    rabbitTemplate.setMandatory(true);//
    rabbitTemplate.setReturnsCallback(returnedMessage -> System.out.println("消息回退,回退的消息是:"+new String(returnedMessage.getMessage().getBody())));
    rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");
}

3.Consumer Ack

三种确认方式

 自动确认:acknowledge="none" 。不管处理成功与否,业务处理异常也不管

(当消费者意担接收到消息之后,消费者就会给broker一个回执,证明已经接收到消息 了,不管消息到底是否成功)

手动确认:acknowledge="manual" 。可以解决业务异常的情况

(收到消息之后不会立马确认收到消息,当业务处理没有问题的时候手动的调用代码的方 式来进行处理,如果业务失败了,就可以进行额外的操作)

根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

1)自动确认

2)手动确认

修改配置 开启手动签收

3)创建测试

@Component
public class ShouDingQianShouMeaasge implements ChannelAwareMessageListener {
    @Override
    @RabbitListener(queues = "test_queue2")
    public void onMessage(Message message, Channel channel) throws Exception {
      Thread.sleep(2000);
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(1/0);
            channel.basicAck(deliveryTag,true);
        }catch (Exception e){
            System.out.println("拒绝签收");
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

启动测试

有异常拒绝签收

无异常签收成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/192232.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ZYNQ PL 中断请求

1 中断概念 中断学习 2 ZYNQ 中断框图 上图为 zynq 中断分布框图。可以看到部分 PL 到 PS 部分的中断&#xff0c;经过中断控制分配器&#xff08;ICD&#xff09;&#xff0c; 同时进入 CPU1 和 CPU0。查询下面表格&#xff0c;可以看到 PL 到 PS 部分一共有 20 个中断可以使…

qt国际化多语言

vs + qt 方法 一 (1)生成.pro文件 如果报错: cannot find any qt projects to export 则执行如下: 然后重新生成 pro文件。 (2)生成ts文件 (方法1)在项目文件(xxx.pro) 文件添加: TRANSLATIONS += en.ts zh_CN.ts 然后打开cmd命令,进入项目目录,执行 l…

C++中的new、operator new与placement new

new operator new operator是我们常用的new。 new 和 delete 是用来在 堆上申请和释放空间的 &#xff0c;是 C 定义的 关键字&#xff0c;和 sizeof 一样。 实际 new / delete 和 malloc / free 最大的区别是&#xff0c;前者对于 自定义类型 除了可以开辟空间&#xff0c;…

C语言基础篇5:指针(二)

接上篇&#xff1a;C语言基础篇5&#xff1a;指针(一) 4 指针作为函数参数 4.1 指针变量作为函数的参数 指针型变量可以作为函数的参数&#xff0c;使用指针作为函数的参数是将函数的参数声明为一个指针&#xff0c;前面提到当数组作为函数的实参时&#xff0c;值传递数组的地址…

其利天下技术总监冯建武受邀出席“2023年电子工程师大会”并作主题演讲

2023年11月23日&#xff0c;由华秋电子发烧友主办的“2023年电子工程师大会暨第三届社区年度颁奖活动”在深圳新一代产业园成功举行。本次年度颁奖活动邀请了高校教授、企业高管、行业专家、资深电子工程师等共300多人出席。聚焦“电机驱动技术”、“开源硬件”、“OpenHarmony…

C#学习-8课时

P10 输入输出程序编写 相同类型的可以直接相加&#xff1b; cwtabtabconsole.Writeline(); using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace C_8_1 { class Program { s…

【腾讯云云上实验室】用向量数据库—实践相亲社交应用

快速入口 &#x1f449;向量数据库_大模型知识库_向量数据存储_向量数据检索- 腾讯云 (tencent.com) 文章目录 前言1. 向量数据库概念及原理1.1 向量数据库概念1.2 向量数据库核心原理1.3 向量数据库优缺点1.4 向量数据库与传统数据库的区别 2. 腾讯云向量数据库的基本特性及优…

黑马程序员索引学习笔记

文章目录 索引的分类从索引字段特性从物理存储从数据结构组成索引的字段个数 InnoDB主键索的Btree高度为多高呢?explain执行计划最左匹配原则索引失效情况SQL提示覆盖索引、回表查询前缀索引索引设计原则 索引的分类 从索引字段特性 主键索引、唯一索引、常规索引、全文索引…

RNN 网络结构及训练过程简介

本文通过整理李宏毅老师的机器学习教程的内容&#xff0c;简要介绍 RNN&#xff08;recurrent neural network&#xff09;的网络结构及训练过程。 RNN 网络结构, 李宏毅 RNN RNN 的特点在于存储功能&#xff0c;即可以记忆前面时刻的信息。 最简单的 RNN 结构如下&#xf…

切换服务器上自己用户目录下的 conda 环境和一个外部的 Conda 环境

如果我们有自己的 Miniconda 安装和一个外部的 Conda 环境&#xff08;比如一个全局安装的 Anaconda&#xff09;&#xff0c;我们可以通过修改 shell 环境来切换使用它们。这通常涉及到更改 PATH 环境变量&#xff0c;以便指向你想要使用的 Conda 安装的可执行文件&#xff1a…

CSS新手入门笔记整理:CSS基本选择器

id属性 id属性具有唯一性&#xff0c;也就是说&#xff0c;在一个页面中相同的id只能出现一次。在不同的页面中&#xff0c;可以出现两个id相同的元素。 语法 <div id"text"> ...... </div> class属性 class&#xff0c;顾名思义&#xff0c;就是“类…

C语言每日一题(37)两数相加

力扣网 2 两数相加 题目描述 给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外&a…

【Linux】第二十一站:文件(一)

文章目录 一、共识原理二、C系列文件接口三、从C过渡到系统&#xff1a;文件系统调用四、访问文件的本质 一、共识原理 文件 内容 属性 文件分为打开的文件 和 没打开的文件 打开的文件&#xff1a;是谁打开的&#xff1f;是进程&#xff01;----所以研究打开的文件本质是研…

Redis之C语言底层数据结构笔记

目录 动态字符串SDS Dict ZipList QuickList ​ SkipList 动态字符串SDS Dict ZipList QuickList SkipList

SAP创建ODATA服务-Structure

SAP创建ODATA服务-Structure 1、创建数据字典 进入se11创建透明表ZRICO_USR,并创建对应字段 2、创建OData service 首先创建Gateway service project&#xff0c;事务码&#xff1a;SEGW&#xff0c;点击Create Project 按钮 Gateway service Project分四个部分&#xff1a…

ubuntu20.04安装tensorRT流程梳理

目标&#xff1a;先跑demo&#xff0c;再学习源码 step1, 提前准备好CUDA环境 安装CUDA&#xff0c;cuDNN 注意&#xff0c;CUDA&#xff0c;cuDNN需要去官网下载.run和tar文件安装&#xff0c;否则在下面step4 make命令会报找不到cuda等的错误&#xff0c;具体安装教程网上…

COMP2121 Discrete Mathematics

COMP2121 Discrete Mathematics 需要可WeChat: zh6-86

Unity 打印每次代码保存耗时

unity每次编辑代码的时候&#xff0c;都需要保存&#xff0c;unity右下角的小圆圈总是转个不停&#xff0c;那么每次编辑代码后&#xff0c;unity到底需要多久时间呢&#xff0c;下面就有代码可以获取 保存时间。 using UnityEngine; using UnityEditor; using UnityEditor.Com…

开源万能DIY预约小程序源码系统+自由DIY,海量模板任选择,附带完整的搭建教程

在移动互联网时代&#xff0c;用户对于预约服务的便捷性和高效性需求日益增长。为了满足这一需求&#xff0c;我们凭借多年的技术积累和经验&#xff0c;开发出了这款开源万能DIY预约小程序源码系统。该系统的推出旨在帮助开发者快速构建功能丰富、符合用户需求的预约小程序&am…