中间件-------RabbitMQ

同步和异步

异步调用

MQ

MQ优势:①服务解耦   ②异步调用   ③流量削峰

结构

消息模型

 RabbitMQ入门案例,实现消息发送和消息接收

生产者:
public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.136.132");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}
消费者:
public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.136.132");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

SpringAMQP

引入依赖

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

普通队列 

 第一步:publisher服务配置文件,发消息

spring:
  rabbitmq:
    host: 192.168.136.132
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {

    //获取RabbitTemplateAPI
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test(){
        String queueName = "simple.queue";
        String message = "hello SpringAMQP";

        //使用API传入队列名和消息即可直接发送
        rabbitTemplate.convertAndSend(queueName,message);
    }

}

第二步:Consumer服务配置信息监听消息

spring:
  rabbitmq:
    host: 192.168.136.132
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

//定义一个监听类去监听消息
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void ListenSimpleQueue(String msg){
        System.out.println("msg = " + msg);
    }
}


Work Queue队列

多个消费者绑定到同一个队列,可以通过prefetch来控制消费者消息预取的数量

第一步: 生产者发送消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {

    //获取RabbitTemplateAPI
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test01() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello SpringAMQP--";

        for (int i = 0; i < 50; i++) {
            //使用API传入队列名和消息即可直接发送
            rabbitTemplate.convertAndSend(queueName,message+i);
            Thread.sleep(20);
        }

    }

}

第二步:消费者设置多个监听消息

@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void ListenWorkQueue(String msg) throws InterruptedException {
        System.out.println("消费者一接收到消息---- = " + msg + LocalDateTime.now());
        Thread.sleep(20);
    }

    @RabbitListener(queues = "simple.queue")
    public void ListenWorkQueue01(String msg) throws InterruptedException {
        System.out.println("消费者二接收到消息---- = " + msg + LocalDateTime.now());
        Thread.sleep(200);
    }
}

 第三步:消费者可通过prehtch设置消息预取数量

spring:
  rabbitmq:
    host: 192.168.136.132
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /
    listener:
      simple:
        prefetch: 1


发布-订阅模型

Fanout广播交换机 --->多个队列收到交换机的消息

第一步:Consumer声明交换机,队列并进行绑定。
@Configuration
public class FanoutConfig {

    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    //声明队列1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    //绑定队列1到交换机上
    @Bean
    public Binding fanoutBanding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }


    //声明队列2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    //绑定队列2到交换机上
    @Bean
    public Binding fanoutBanding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }
}
 第二步:Consumer进行监听消息
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void ListenSimpleQueue1(String msg){
        System.out.println("消费者接收到fanout.queue1的消息 = " + msg);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void ListenSimpleQueue2(String msg){
        System.out.println("消费者接收到fanout.queue2的消息 = " + msg);
    }
}
第三步:Publisher向交换机发送消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {

    //获取RabbitTemplateAPI
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testExchange() {
        //声明交换机名称
        String exchangeName = "itcast.fanout";
        //消息
        String message = "Hello Everyone";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }
}

Direct路由交换机 --->将消息发给指定key的队列

第一步:在Listener中声明队列,交换机以及key
@Component
public class SpringRabbitListener {
    //声明队列1,交换机以及队列1的bindingKey
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "derict.queue1"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void ListenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息 = " + msg);
    }

    //声明队列2,交换机以及队列2的bindingKey
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "derict.queue2"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void ListenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息 = " + msg);
    }
}
第二步:向指定key的队列发送消息
    @Test
    public void testDirect() {
        //声明交换机名称
        String exchangeName = "itcast.direct";

        //消息
        String message = "Hello Blue!!";

        //发送消息,指定交换机,队列以及要发送的key
        rabbitTemplate.convertAndSend(exchangeName,"red",message);
    }

Topic主题交换机 ---->key必须是多个单词列表,统一主题,支持通配符

 第一步:在Listener中声明队列,交换机以及通配符key
@Component
public class SpringRabbitListener {

    //声明队列2的交换机,队列以及通配符key
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void ListenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息 = " + msg);
    }
    //声明队列2的交换机,队列以及通配符key
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void ListenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息 = " + msg);
    }

}
第二步:向主题通配符发送消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {

    //获取RabbitTemplateAPI
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testTopic() {
        //声明交换机名称
        String exchangeName = "itcast.topic";

        //消息
        String message = "Hello China!!";

        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
    }
}

消息转换器

 RabbitMQ发的消息体都是Object类型,所有还可以发送对象数据。而且默认的消息转换器是MessageConverter实现的,当使用的是Map数据类型时,就会序列化成很多字节,所以推荐使用JSON的序列化和反序列化,直接修改默认的MessageConverter的类型

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
@Bean
    public MessageConverter messageConverter(){
        return  new Jackson2JsonMessageConverter();
    }

对于RabbitMQ高级部分:死信队列,延迟队列,发布确认,幂等性,优先,惰性队列等有时间再学

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/646621.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Text2SQL 论文】SeaD:使用 Schema-aware 去噪训练的 end2end 的 Text2SQL

论文&#xff1a;SeaD: End-to-end Text-to-SQL Generation with Schema-aware Denoising ⭐⭐ NAACL 2022, arXiv:2105.07911 本论文提出 SeaD 模型&#xff0c;使用 schema-aware 的去噪方法来训练一个 end2end、seq2seq 的 Transformer 模型来实现 Text2SQL。 一、论文速读…

[vue error] vue3中使用同名简写报错 ‘v-bind‘ directives require an attribute value

错误详情 错误信息 ‘v-bind’ directives require an attribute value.eslintvue/valid-v-bind 错误原因 默认情况下&#xff0c;ESLint 将同名缩写视为错误。此外&#xff0c;Volar 扩展可能需要更新以支持 Vue 3.4 中的新语法。 解决方案 更新 Volar 扩展 安装或更新 …

Springboot集成GRPC

Springboot集成GRPC 一、springboot版本二、GRPC的pom依赖2.1 服务端2.2 客户端3.构建依赖 三、配置文件服务端客户端 四、 demo4.1 编写proto文件4.2 生成文件4.3 服务端重写方法4.4 客户端调用该方法 五、测试 一、springboot版本 <groupId>org.springframework.boot&l…

YOLOv8_pose预测流程-原理解析[关键点检测理论篇]

YOLOv8_seg的网络结构图在博客YOLOv8网络结构介绍_CSDN博客已经更新了,由网络结构图可以看到相对于目标检测网络,实例分割网络只是在Head层不相同,如下图所示,在每个特征层中增加了KeyPoint分支(浅绿色),通过两个卷积组和一个Conv卷积得到得到通道数为51的特征图,51表示…

自动驾驶技术现状与需求分析

随着科技的不断进步和智能化浪潮的席卷&#xff0c;自动驾驶技术已成为当今交通领域的热点话题。本文旨在深入探讨自动驾驶技术的当前发展状况&#xff0c;并对其未来的市场需求进行细致分析。首先&#xff0c;我们将回顾自动驾驶技术的起源、发展历程以及当前的技术水平&#…

信息学奥赛初赛天天练-12-数论-整除问题

更多资源请关注纽扣编程微信公众号 整除的性质 1 整除性 若 &#x1d44e; 和 &#x1d44f; 都为整数&#xff0c; &#x1d44e; 整除 &#x1d44f; 是指 &#x1d44f; 是 &#x1d44e; 的倍数&#xff0c;&#x1d44e; 是 &#x1d44f; 的约数&#xff08;或者叫 因…

贪心题目总结

1. 最长递增子序列 我们来看一下我们的贪心策略体现在哪里&#xff1f;&#xff1f;&#xff1f; 我们来总结一下&#xff1a; 我们在考虑最长递增子序列的长度的时候&#xff0c;其实并不关心这个序列长什么样子,我们只是关心最后一个元素是谁。这样新来一个元素之后&#xf…

C++编程揭秘:虚表机制与ABI兼容性的实例剖析

前言&#xff1a; 假设你的应用程序引用的一个库某天更新了&#xff0c;虽然 API 和调用方式基本没变&#xff0c;但你需要重新编译你的应用程序才能使用这个库&#xff0c;那么一般说这个库是源码兼容&#xff08;Source compatible&#xff09;&#xff1b;反之&#xff0c;如…

CAN总线简介

1. CAN总线概述 1.1 CAN定义与历史背景 CAN&#xff0c;全称为Controller Area Network&#xff0c;是一种基于消息广播的串行通信协议。它最初由德国Bosch公司在1983年为汽车行业开发&#xff0c;目的是实现汽车内部电子控制单元&#xff08;ECUs&#xff09;之间的可靠通信。…

批量漏洞挖掘思路小结

漏洞挖掘是指对应用程序中未知漏洞的探索&#xff0c;通过综合应用各种技术和工具&#xff0c;尽可能地找出其中的潜在漏洞。一般情况下漏洞挖掘针对单一的应用系统&#xff0c;通过端口扫描、目录扫描、文件扫描等方式对其安全性进行评估&#xff0c;而本文主要针对Nday和1day…

软考结束。有什么要说的

1. 竟然是机试&#xff0c;出乎我意料。是 考试机构觉得笔试成本高了么。这次的考试是机试&#xff0c;相比以往有所不一样。感言是不是以后都会在固定地点考试也说不准。 2. 遇到年轻人。 这次旁边的一个女同学第一次参加&#xff0c;还像我询问了一些关于软考的事。我是有…

【设计模式】JAVA Design Patterns——Command(事务模式)

&#x1f50d;目的 将请求封装为对象&#xff0c;从而使你可以将具有不同请求的客户端参数化&#xff0c;队列或记录请求&#xff0c;并且支持可撤销操作。 &#x1f50d;解释 真实世界例子 有一个巫师在地精上施放咒语。咒语在地精上一一执行。第一个咒语使地精缩小&#xff0…

从零实现Llama3中文版

1.前言 一个月前&#xff0c;Meta 发布了开源大模型 llama3 系列&#xff0c;在多个关键基准测试中优于业界 SOTA 模型&#xff0c;并在代码生成任务上全面领先。 此后&#xff0c;开发者们便开始了本地部署和实现&#xff0c;比如 llama3 的中文实现、llama3 的纯 NumPy 实现…

06中间件RTOS/CP

Autosar CP 操作系统详解-CSDN博客 1. 什么是RTOS &#xff1f; RTOS&#xff0c;英文全称是 Real-time Operation System&#xff0c;中文就是 实时操作系统&#xff0c;又称及时操作系统。 实时操作系统&#xff0c;是指当外界事件或数据产生时&#xff0c;能够接受并以足…

【HMGD】STM32/GD32 CAN通信

各种通信协议速度分析 协议最高速度(btis/s)I2C400KCAN1MCAN-FD5M48510MSPI36M CAN协议图和通信帧 CubeMX CAN配置说明 CAN通信波特率 APB1频率 / 分频系数 /&#xff08;BS1 BS2 同步通信段&#xff09;* 1000 ​ 42 / 1 / (111) * 1000 ​ 14,000 KHz ​ 1400000…

【Java面试】二、Redis篇(中)

文章目录 1、Redis持久化1.1 RDB1.2 AOF1.3 RDB与AOF的对比 2、数据过期策略&#xff08;删除策略&#xff09;2.1 惰性删除2.2 定期删除 3、数据淘汰策略4、主从复制4.1 主从全量同步4.2 增量同步 5、哨兵模式5.1 服务状态监控5.2 哨兵选主规则5.3 哨兵模式下&#xff0c;Redi…

Android ListView鼠标模式下ListView回滚问题

概述 在 Android 应用程序中&#xff0c;ListView 是一种常用的控件&#xff0c;用于显示可滚动列表数据。然而&#xff0c;当在鼠标操作模式下使用 ListView 时&#xff0c;可能会遇到一个问题&#xff1a;点击列表项时&#xff0c;列表会回滚到指定位置&#xff0c;这可能会导…

c语言IO

前言 老是忘记c语言IO操作&#xff0c;故写个文章记录一下 打开文件 fopen FILE *fopen(const char *path, const char *mode);mode 返回值 如果文件成功打开&#xff0c;fopen 返回一个指向 FILE 结构的指针。如果文件打开失败&#xff08;例如&#xff0c;因为文件不存…

CMS Full GC流程以及调优配置

个人博客 CMS Full GC流程以及调优配置 | iwts’s blog CMS CMS 收集器是以实现最短 STW 时间为目标的收集器&#xff0c;所以对于偏业务的后台开发而言&#xff0c;基本上都无脑选CMS了。 多线程收集器&#xff0c;工作在老年代&#xff0c;采用标记清除算法。比较特殊&am…

leetcode-顺时针旋转矩阵-111

题目要求 思路 1.假设现在有一个矩阵 123 456 789 2.我们可以根据19这个对角线将数据进行交换&#xff0c;得到矩阵 147 258 369 3.然后将矩阵每一行的数据再翻转&#xff0c;得到矩阵 741 852 963 代码实现 class Solution { public:vector<vector<int> > rot…