MQ - RabbitMQ、SpringAMQP --学习笔记

什么是MQ?

MQ 是消息队列(Message Queue)的缩写,它是一种应用程序间异步通信的技术。消息队列允许应用程序或服务间通过发送消息来交换数据,而不是直接调用对方,从而实现解耦、异步处理和负载均衡等目的。

简单来说,消息队列就像是一个邮局,应用程序就像是寄信人和收信人。一个应用程序(寄信人)发送消息(信件)到消息队列(邮局),另一个应用程序(收信人)从队列中取出消息进行处理。这个过程可以是同步的,也可以是完全异步的,意味着发送者和接收者不必同时在线,他们通过消息队列中转消息。

消息队列的主要优势包括:

  1. 解耦:发送者和接收者只依赖于消息队列,而不是直接依赖于对方,降低系统间的耦合度。
  2. 异步处理:发送者可以快速发送消息并继续处理其他任务,无需等待接收者的响应。
  3. 负载均衡:通过调整消费者的数量来处理不同的负载。
  4. 缓冲:在高负载情况下,消息队列可以作为缓冲,防止系统因请求过多而崩溃。
  5. 可扩展性:系统组件可以独立地扩展,提高系统整体的扩展性和弹性。

消息队列广泛应用于微服务架构、分布式系统、大数据处理流程等领域,常见的消息队列实现包括 Kafka、RabbitMQ、ActiveMQ 等。

RabbitMQ

一、安装

以下是基于docker的安装步骤

使用下面的命令即可:

docker run \
 -e RABBITMQ_DEFAULT_USER=admin \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network networkname # 网络名\
 -d \
 rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口

  • 5672:RabbitMQ的消息发送处理接口

安装完成后,我们访问 http://虚拟机ip:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在命令中已经指定了。

 二、基本构造

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方

  • consumer:消费者,也就是消费消息的一方

  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理,可通过控制台的Queue选项卡进行管理

  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列,可通过控制台的Exchange选项卡进行管理

  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue,通过控制台的Admin选项卡进行管理

SpringAMQP

一、概述

我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

二、使用

1、引入依赖

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

 2、修改配置

spring:
  rabbitmq:
    host: 192.168.88.130 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: admin # 用户名
    password: 123456 # 密码

3、发送消息示例:

    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }

4、接收消息示例:


@Component
public class SpringRabbitListener {
        // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

WorkQueues:任务模型。简单来说就是多个消费者绑定到一个队列,共同消费队列中的消息

5、交换机类型

在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机

  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。

  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

 Fanout交换机

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。

在广播模式下,消息发送流程是这样的:

  • 创建一个名为 hmall.fanout的交换机,类型是Fanout

  • 创建两个队列fanout.queue1fanout.queue2,绑定到交换机hmall.fanout

  • consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

  • 在publisher中编写测试方法,向test.fanout发送消息

说白了,就是发送给Fanout交换机的消息,Fanout交换机会将消息转发给所有绑定它的队列(广播)

发送示例:

@Test
public void testFanoutExchange() {
    // 交换机名称
    String exchangeName = "test.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

接收示例:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

  1. 声明一个名为hmall.direct的交换机

  2. 声明队列direct.queue1,绑定hmall.directbindingKeybludred

  3. 声明队列direct.queue2,绑定hmall.directbindingKeyyellowred

  4. consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

  5. 在publisher中编写测试方法,向hmall.direct发送消息

说白了,就是在绑定队列与交换机时添加了一个RoutingKey(路由key),相当于令牌, 利用这个RoutingKey做身份验证,将消息发送给需要的消费者

发送示例:

@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "test.direct";
    // 消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

接收示例:

@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu

图示:

假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news 代表有中国的新闻消息;

  • china.weather 代表中国的天气消息;

  • japan.news 则代表日本新闻

  • japan.weather 代表日本的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:

    • china.news

    • china.weather

  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:

    • china.news

    • japan.news

 至于收发消息,与DIrect交换机一致

声明队列和交换机

可以在控制台通过图形化界面手动声明(很简单不赘述)

也可以直接在程序中声明,SpringAMQP提供了一个Queue类,用来创建队列:

示例:

    /**
     * 声明交换机
     * @return Direct类型交换机    
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();    //FanoutExchange   topicExchange
    }

    /**
     * 声明队列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }


    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }

用注解声明:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

配置JSON转换器

默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

首先在publisherconsumer两个服务中都引入依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

消息转换器中添加的messageId可以便于我们将来做幂等性判断。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/753205.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

无需高配置 怎么获得超流畅的VR体验?

传统VR眼镜在使用中存在一些显著不足&#xff0c;而实时渲染技术又是如何解决的&#xff1f;接下来与大家共同探讨遇到的问题以及实时渲染在VR眼镜中的实际应用。 1、高配置要求 目前主流VR一体机的眼镜需要较高配置才能运行普通VR内容&#xff0c;且受限于VR眼镜的算力限制&…

工作纪实51-手撸AB实验分流策略

前几天写了一篇关于哈希算法的文章&#xff0c;起源就是在构思AB实验平台的时候&#xff0c;用到了哈希&#xff0c;所以对其做了深入的了解 AB实验平台是一般互联网做策略、样式实验会用到的一个系统&#xff0c;一般开启某个实验之后&#xff0c;需要对线上流量进行分流&…

太速科技-FMC144 -八路 250MSPS 14bit AD FMC子卡

FMC144 -八路 250MSPS 14bit AD FMC子卡 一、板卡概述   FMC144是一款具有8通道模数转换器&#xff08;ADC&#xff09;的FMC卡&#xff0c;具有14bit分辨率&#xff0c;最大采样速率达250Msps。时钟配置芯片为AD9516-1&#xff0c;可由板载10MHz时钟提供参考&#xff0c;也可…

[游戏开发][UE5]引擎学习记录

C Log和蓝图Log C Log 方法 UE_Log(参数1&#xff0c;参数2&#xff0c;参数3) //举例: UE_LOG(LogTemp, Error, TEXT("Log Info: %s"),"Test Log"); 三个参数的作用 参数1&#xff1a;输出窗口归类使用&#xff0c;你写什么它就显示什么 参数2&#x…

node.js安装

下载地址 https://nodejs.org/en/download 安装教程

Stable Diffusion初体验——提示词指南

前言 Stable Diffusion是一种深度学习模型&#xff0c;它能够根据提示词生成高质量的图像。在Stable Diffusion模型中&#xff0c;提示词起着至关重要的作用&#xff0c;因为它们为模型提供了关于所需输出的指导。本文将探讨Stable Diffusion关于提示词的原理&#xff0c;包括…

k8s集群node节点加入失败

出现这种情况&#xff1a; [preflight] FYI: You can look at this config file with kubectl -n kube-system get cm kubeadm-config -o yaml [kubelet-start] Writing kubelet configuration to file "/var/lib/kubelet/config.yaml" [kubelet-start] Writing kub…

计算机网络——数据链路层(数据链路层概述及基本问题)

链路、数据链路和帧的概念 数据链路层在物理层提供服务的基础上向网络层提供服务&#xff0c;其主要作用是加强物理层传输原始比特流的功能&#xff0c;将物理层提供的可能出错的物理连接改造为逻辑上无差错的数据链路&#xff0c;使之对网络层表现为一条无差错的链路。 链路(…

sheng的学习笔记-AI-K均值算法

ai目录&#xff1a;sheng的学习笔记-AI目录-CSDN博客 需要学习前置知识&#xff1a;聚类&#xff0c;可参考 sheng的学习笔记-聚类(Clustering)-CSDN博客 目录 什么是k均值算法 流程 伪代码 数据集 伪代码 代码解释 划分示意图 优化目标 随机初始化 选择聚类数…

酣客的“FFC模式”|白酒商业模式|分润制度顶层架构设计

酣客公社摒弃传统商业模式&#xff0c;提出“心联网”及“FFC模式”的商业模式。 坐标&#xff1a;厦门&#xff0c;我是肖琳 深耕社交新零售行业10年&#xff0c;主要提供新零售系统工具及顶层商业模式设计、全案策划运营陪跑等。 今天和大家分享“酣客”的营销模式&#xff…

Parallels Toolbox for mac(pd工具箱) 6.0.2激活版

Parallels Toolbox 是由 Parallels 公司开发的一款实用工具集合软件&#xff0c;它主要面向使用 Parallels Desktop 的用户&#xff0c;提供了许多方便用户在 macOS 和 Windows 之间进行切换和管理的工具。Parallels Desktop 是一款流行的虚拟化软件&#xff0c;允许用户在 mac…

【24医学顶刊】GANDALF:主动学习 + 图注意力变换器 + 变分自编码器,改善多标签图像分类

GANDALF&#xff1a;主动学习 图注意力变换器 变分自编码器&#xff0c;改善多标签图像分类 提出背景子解法1&#xff1a;多标签信息样本的选择子解法2&#xff1a;生成信息丰富且非冗余的合成样本 例子&#xff1a;胸部X射线图像分析传统方法的操作和局限GaNDLF方法的优势 工…

理解ABP的领域驱动设计

大家好&#xff0c;我是张飞洪&#xff0c;感谢您的阅读&#xff0c;我会不定期和你分享学习心得&#xff0c;希望我的文章能成为你成长路上的垫脚石&#xff0c;让我们一起精进。 关于玩转ABP框架相关的文章&#xff0c;之前在博客园陆续写了《ABP vNext系列文章和视频》&…

电路仿真王者之争:SmartEDA如何领跑业界,打破传统仿真软件格局?

在电子设计领域&#xff0c;电路仿真软件一直扮演着至关重要的角色。它们为工程师们提供了一个虚拟的实验室&#xff0c;可以在不耗费大量实际资源的情况下&#xff0c;进行电路设计、优化和测试。在众多电路仿真软件中&#xff0c;SmartEDA以其独特的优势&#xff0c;逐渐崭露…

嵌入式开发十九:SysTick—系统定时器

在前面实验中我们使用到的延时都是通过SysTick进行延时的。 我们知道&#xff0c;延时有两种方式&#xff1a;软件延时&#xff0c;即CPU 循环等待产生的&#xff0c;这个延时是不精确的。第二种就是滴答定时器延时&#xff0c;本篇博客就来介绍 STM32F4 内部 SysTick 系统定时…

浅谈API生态建设:API安全策略的6项原则

API作为连接系统与应用的桥梁&#xff0c;在助力实现高效业务流程的同时&#xff0c;也不可避免出现资产管理困难、敏感数据泄漏风险骤增等安全问题。前段时间&#xff0c;安全公司Fastly公布了一项重磅调查报告&#xff0c;报告中显示95%的企业在过去1年中遭遇过API安全问题。…

AXI接口简介

AXI接口&#xff0c;全称为Advanced eXtensible Interface&#xff0c;是ARM公司推出的一种高性能、低成本、可扩展的高速总线接口。AXI接口是ARM公司提出的AMBA&#xff08;Advanced Microcontroller Bus Architecture&#xff09;高级微控制器总线架构的一部分。2003年发布了…

简易电阻、电容和电感测量仪-FPGA

通过VHDL语言编写程序用于设计电阻、电容和电感测量仪&#xff0c;通过使用试验箱进行验证是否设计正确&#xff0c;资料获取到咸&#x1f41f;&#xff1a;xy591215295250 \\\或者联系wechat 号&#xff1a;comprehensivable 设计并制作--台数字显示的电阻、电容和电感参数测试…

07-border布局的另一个用处

07-border布局的另一个用处 实现如下的布局: 分析: 1.USERNAME和PASSWORD使用form 2.PASSWORD的文本框使用NewMultiLineEntry 布局1 USERNAME和PASSWORD作为一个form整体&#xff0c;使用border布局&#xff0c;form设置为top&#xff0c;文本框设置为center参数。 packa…

Postman 接口测试 安装使用教程

1 下载官网:https://www.postman.com/downloads/ 2 方便下载,特提供百度云网盘: 链接&#xff1a;Postman 3 windows10 安装&#xff0c;点击安装包 #自动安装&#xff0c;并打开 4 举例&#xff0c;比如豆瓣&#xff0c;get 查询时间&#xff0c;图片登 5 举例&#xff0…