微服务技术栈SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式(四):消息队列MQ

文章目录

  • 一、消息队列MQ
  • 二、RabbitMQ
    • 2.1 单机部署
    • 2.2 消息模型
  • 三、SpringAMAP
    • 3.1 简单消息队列
    • 3.2 工作消息队列
    • 3.3 发布-订阅模型:FanoutExchange 广播交换机
    • 3.4 发布-订阅模型:DirectExchange 路由交换机
    • 3.5 发布-订阅模型:TopicExchange 话题交换机
    • 3.6 消息转换器


一、消息队列MQ

同步调用的优点:时效性较强,可以立即得到结果;
同步调用的问题:耦合度高;性能和吞吐能力下降;有额外的资源消耗;有级联失败问题;

异步调用常见实现就是事件驱动模式
异步通信的优点:耦合度低;吞吐量提升;故障隔离;流量削峰;
异步通信的缺点:依赖于Broker的可靠性、安全性、吞吐能力;架构复杂了,业务没有明显的流程线,不好追踪管理;

在这里插入图片描述

二、RabbitMQ

RabbitMQ的官网:https://www.rabbitmq.com/
在这里插入图片描述
在这里插入图片描述

2.1 单机部署

我们在Centos7虚拟机中使用Docker来安装。

  1. 下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

在课前资料已经提供了镜像包:
上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar
  1. 安装MQ

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management
  1. 通过 主机IP:15672 访问RabbitMQ的管理界面
    在这里插入图片描述

2.2 消息模型

在这里插入图片描述

在官网 https://www.rabbitmq.com/ 中,选择文件 -> 入门,可看见案例demo
在这里插入图片描述
下面演示:官网的基本消息队列模型
在这里插入图片描述

publisher

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.101.6");
        factory.setPort(5672);
        factory.setVirtualHost("/");  //虚拟主机
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

consumer

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.101.6");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

在这里插入图片描述

三、SpringAMAP

在这里插入图片描述

3.1 简单消息队列

在这里插入图片描述

流程如下:

  1. 在父工程中引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  1. 在publisher和consumer服务中编写application.yml,添加mq连接信息
spring:
  rabbitmq:
    host: 192.168.150.101 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123321
    virtual-host: /  # 虚拟主机
  1. 在publisher服务中新建一个测试类,编写测试方法,然后运行测试方法发送消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() {
        String queueName = "simple.queue";  // 队列名称
        String message = "hello, spring amqp!"; // 消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}
  1. 在consumer服务中新建一个类,编写消费逻辑,然后启动服务。 定义类,添加@Component注解;类中声明方法,添加@RabbitListener注解,方法参数就时消息。注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
@Component
public class SpringRabbitListener {

     @RabbitListener(queues = "simple.queue")
     public void listenSimpleQueue(String msg) {
         System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
     }
}

3.2 工作消息队列

在这里插入图片描述
在这里插入图片描述

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2WorkQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello, message__";
        for (int i = 1; i <= 50; i++) {
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }
}
  1. 设置两个消费者
@Component
public class SpringRabbitListener {


    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }
  1. 由于消费者存在消息预取机制【即:消费者会将队列中的消息提前取出来,再处理】导致两个消费者处理消息的数量一致【即:一半一半】,因此需要在消费者的application设置prefetch=1如下【用来保证每次处理完一条消息再取消息】,这样消费者1比消费者2处理的消息更多。
spring:
  rabbitmq:
    host: 192.168.150.101 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: itcast
    password: 123321
    virtual-host: /  # 虚拟主机
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成后才能获取下一个消息,该用于解决消息预取机制

3.3 发布-订阅模型:FanoutExchange 广播交换机

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendFanoutExchange() {
        // 交换机名称
        String exchangeName = "itcast.fanout";
        // 消息
        String message = "hello, every one!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
}
  1. 声明一个交换机,两个消息队列,并完成绑定,然后设置两个消费者接收消息。最后测试发现两个消费者可以接收发布者的消息
package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    // 1.声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    // 2.声明第1个队列 注意:此方法名是该队列的唯一ID
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    // 3.绑定队列1到交换机 注意:参数名要与上述定义的方法名保持一致
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }

    // 声明第2个队列 注意:此方法名是该队列的唯一ID
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    // 绑定队列2到交换机 注意:参数名要与上述定义的方法名保持一致
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }
}

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
    }

在这里插入图片描述

3.4 发布-订阅模型:DirectExchange 路由交换机

在这里插入图片描述

在这里插入图片描述

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "itcast.direct";
        // 消息
        String message = "hello, blue!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "blue", message);
    }
}
  1. 设置两个消费者,并且用@RabbitListener注解的方式声明 Binding Queue Exchange Key。当发送者发送key=blue的消息时,只有消费者1收到。
@Component
public class SpringRabbitListener {
    /**
     * 4.发布-订阅模型:Direct 路由
     * 用注解的方式声明 Binding Queue Exchange Key
     */

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))   // type表示哪种交换机
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }
}

在这里插入图片描述

3.5 发布-订阅模型:TopicExchange 话题交换机

在这里插入图片描述
在这里插入图片描述
实现消费者1接收中国的所有消息,消费者2接收所有的新闻

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Test
    public void testSendTopicExchange() {
        // 交换机名称
        String exchangeName = "itcast.topic";
        // 消息
        String message = "今天天气不错,我的心情好极了!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.wearther", message);
    }
}
  1. 设置两个消费者,并且用@RabbitListener注解的方式声明 Binding Queue Exchange Key。当发送者发送key=china.wearther的消息时,只有消费者1收到。
@Component
public class SpringRabbitListener {
    /**
     * 5.发布-订阅模型:topic 路由
     * 用注解的方式声明 Binding Queue Exchange Key
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }
}

3.6 消息转换器

上述传递的都是String类型的,而实际需要传递Object类型的数据,因此我么需要对消息进行转换

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Test
    public void testSendObjectQueue() {
        Map<String, Object> msg = new HashMap<>();
        msg.put("name","小明");
        msg.put("age",12);
        // 发送消息
        rabbitTemplate.convertAndSend("object.queue", msg);
    }
}
  1. 设置两个消费者,并且用@RabbitListener注解的方式声明 Binding Queue Exchange Key。当发送者发送key=china.wearther的消息时,只有消费者1收到。
@Component
public class SpringRabbitListener {
    /**
     * 6.消息转换器
     *
     */
    @RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map<String,Object> msg){
        System.out.println("接收到object.queue的消息:" + msg);
    }
}
  1. 但这样存在一个问题publisher发布的数据被序列化,因此我们需要在publisher和consumer的pom文件(或者父工程的pom文件)中添加依赖,并且在Application中反序列化
        <!--JSON序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/443136.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu平铺左、右、上、下、1/2、1/4窗口(脚本)

前言 之前因为一直在用Ubuntu 18或者Ubuntu 20然后发现装了GNOME插件后&#xff0c;电脑在使用过程中&#xff0c;会时不时的卡死&#xff08;鼠标没问题&#xff0c;键盘输入会有10-20秒的延迟&#xff09;频率基本是一小时一次&#xff0c;因为这种卡顿会很容易打断思路&…

【职言】三年功能测试,一些测试工作的“吐槽”

以下为作者观点&#xff1a; 概述 作为功能测试&#xff0c;我也分享下日常工作中功能测试值得吐槽的问题&#xff0c;由于工作时间不长且未进过大厂&#xff0c;不了解大公司的工作模式和流程&#xff0c;所以自己的方法和理解都是基于中小公司的工作经验总结&#xff0c;应…

Linux 之五:权限管理(文件权限和用户管理)

1. 文件权限 在Linux系统中&#xff0c;文件权限是一个非常基础且重要的安全机制。它决定了用户和用户组对文件或目录的访问控制级别。 每个文件或目录都有一个包含9个字符的权限模式&#xff0c;这些字符分为三组&#xff0c;每组三个字符&#xff0c;分别对应文件所有者的权限…

2024蓝桥杯每日一题(差分)

一、第一题&#xff1a;空调 解题思路&#xff1a;差分 希望P减掉T后就相当于从0到New_P&#xff0c;想到得到New_P只需要对全0数组进行若干次区间加操作&#xff0c;所以只需要对New_P数组进行差分&#xff0c;累加正数和负数&#xff0c;哪个绝对值大答案就是那个。 …

【深度学习笔记】6_8 长短期记忆(LSTM)

注&#xff1a;本文为《动手学深度学习》开源内容&#xff0c;部分标注了个人理解&#xff0c;仅为个人学习记录&#xff0c;无抄袭搬运意图 6.8 长短期记忆&#xff08;LSTM&#xff09; 本节将介绍另一种常用的门控循环神经网络&#xff1a;长短期记忆&#xff08;long shor…

二叉树的先序遍历详解(小白也懂)(附带中序,后序代码)

文章目录 二叉树的先序遍历(分而治之思想)样例图函数的递归调用源代码运行结果 二叉树的先序遍历(分而治之思想) 代码在文章底部。 先序遍历又叫先根遍历&#xff0c;顾名思义&#xff1a;遍历顺序为根&#xff0c;左子树&#xff0c;右子树。 样例图 本文将对以下二叉树进…

关于 JVM

1、请你谈谈你对JVM的理解&#xff1f; JVM由JVM运行时数据区&#xff08;图示中蓝色框包含部分&#xff09;、执行引擎、本地库接口、本地方法库组成。 JVM运行时数据区&#xff0c;分为方法区、堆、虚拟机栈、本地方法栈和程序计数器。 1.方法区 Java 虚拟机规范中定…

YOLOv8-Seg改进:SPPF涨点篇 |引入YOLOv9的SPPELAN

🚀🚀🚀本文改进:SPP创新结合ELAN,来自于YOLOv9,助力YOLOv8,将SPPELAN代替原始的SPPF 🚀🚀🚀YOLOv8-seg创新专栏:http://t.csdnimg.cn/KLSdv 学姐带你学习YOLOv8,从入门到创新,轻轻松松搞定科研; 1)手把手教你如何训练YOLOv8-seg; 2)模型创新,提升分割…

day59 线程

创建线程的第二种方式 实现接口Runnable 重写run方法 创建线程的第三种方式 java.util.concurrent下的Callable重写call()方法 java.util.concurrent.FutureTask 创建线程类对象 获取返回值 线程的四种生命周期 线程的优先级1-10 default为5&#xff0c;优先级越高&#xff0c…

JavaEE进阶(15)Spring原理:Bean的作用域、Bean的生命周期、Spring Boot自动配置(加载Bean、SpringBoot原理分析)

接上次博客&#xff1a;JavaEE进阶&#xff08;14&#xff09;Linux基本使用和程序部署&#xff08;博客系统部署&#xff09;-CSDN博客 目录 关于Bean的作用域 概念 Bean的作用域 Bean的生命周期 源码阅读 Spring Boot自动配置 Spring 加载Bean 问题描述 原因分析 …

JavaEE企业开发新技术

目录 2.1 Class对象基本概念 1、概念 2.2 Class对象的获取方式 2.3基本数据类型的Class对象 1、概念 2.4 反射的基本概念 概念 2.5 Class对象的基本使用-1 2.6 Class对象的基本使用-2 newInstance()和new()区别&#xff1a; 2.1 Class对象基本概念 1、概念 反射的…

面试经典150题——合并两个有序链表

You just work on it. Time will do the rest! 1. 题目描述 2. 题目分析与解析 2.1 思路一 这个题目还是比较简单的&#xff0c;通过分析题目&#xff0c;我们可以知道题目中关键信息为&#xff1a; 所以我们只需要从前向后遍历两个链表&#xff0c;在两个链表不空的情况下&…

白皮书发布|超融合运行 K8s 的场景、功能与优势

目前&#xff0c;不少企业都使用虚拟化/超融合运行 Kubernetes 和容器化应用。一些用户可能会有疑惑&#xff1a;既然 Kubernetes 可以部署在裸金属上&#xff0c;使用虚拟化不是“多此一举”吗&#xff1f; 在电子书《IT 基础架构团队的 Kubernetes 管理&#xff1a;从入门到…

Android中显式Intent和隐式Intent的区别

1、intent的中文名 称是意图&#xff0c;Intent是各个组件之间信息沟通的桥梁&#xff0c; 既能在Activity之间沟通&#xff0c;又能在Activity与Service之间沟通&#xff0c;也能在Activity与Broadcast之间沟通 **intent组成元素的列表说明**2、显式Intent&#xff0c;直接指定…

河北专升本(C语言编程题)

一&#xff1a;基础算法原理 1. 冒泡排序 原理&#xff1a;从左到右&#xff0c;相邻元素进行比较。每次比较一轮&#xff0c;就会找到序列中最大的一个或最小的一个。这个数就会从序列的最右边冒出来。 以从小到大排序为例&#xff0c;第一轮比较后&#xff0c;所有数中最大的…

软考71-上午题-【面向对象技术2-UML】-UML中的图2

一、用例图 上午题&#xff0c;考的少&#xff1b;下午题&#xff0c;考的多。 1-1、用例图的定义 用例图展现了一组用例、参与者以及它们之间的关系。 用例图用于对系统的静态用例图进行建模。 可以用下列两种方式来使用用例图&#xff1a; 1、对系统的语境建模&#xff1b…

自动驾驶革命:解密端到端背后的数据、算力和AI奇迹

作者 |毫末智行数据智能科学家 贺翔 编辑 |祥威 最近&#xff0c;特斯拉FSD V12的发布引发了业界对端到端自动驾驶的热议&#xff0c;业界纷纷猜测FSD V12的强大能力是如何训练出来的。从马斯克的测试视频可以大致归纳一下FSD V12系统的一些核心特征&#xff1a; 训练数据&am…

Redis的集群模式

Redis有三种主要的集群模式&#xff0c;用于在分布式环境中实现高可用性和数据复制。这些集群模式分别是:主从复制(Master-Slave Replication)、哨兵模式(Sentinel)和Redis Cluster模式。 主从模式 主从复制是Redis最简单的集群模式。这个模式主要是为了解决单点故障的问题&a…

探索Cglib:解析动态代理的神奇之处

文章目录 CGLIB介绍CGLIB使用示例CGLIB核心原理分析代理类分析代理方法分析 FastClass机制分析 CGLIB介绍 CGLIB(Code Generation Library)是一个开源项目&#xff01;是一个强大的&#xff0c;高性能&#xff0c;高质量的Code生成类库&#xff0c;它可以在运行期扩展Java类与…

【echarts】xAxis鼠标事件失效问题

项目中用到echarts柱状图&#xff0c;出现x轴标签文字过长重叠问题&#xff0c;在pass掉标签倾斜、换行方案之后最终决定限制文字长度&#xff0c;超出以…占位&#xff0c;鼠标悬浮时显示完整tooltip。 但编写过程中发现xAxis鼠标事件无法触发&#xff0c;只有bar区域是可触发…