RabbitMQ介绍与使用

RabbitMQ官网

RabbitMQ 介绍

RabbitMQ 是一个开源的消息代理和队列服务器,基于 AMQP(高级消息队列协议)标准,使用 Erlang 编程语言构建。它是消息队列(MQ)的一种,广泛应用于分布式系统中,用于实现应用程序之间的异步消息传递。RabbitMQ 具有高可靠性、易扩展、高可用和功能丰富的特点,支持多种编程语言客户端,如 Java、Python、Ruby、C# 等。

RabbitMQ 的核心概念

  • Producer(生产者):消息的生产者,负责将消息发送到 RabbitMQ 中的 Exchange。
  • Consumer(消费者):消息的消费者,负责从队列中获取并处理消息。
  • Connection:生产者/消费者和 Broker 之间的 TCP 连接。
  • Channel:在 Connection 内部建立的逻辑连接,用于减少操作系统建立 TCP 连接的开销。
  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
  • Virtual Host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中。
  • Exchange:消息到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到队列中去。常用的类型有 direct、topic 和 fanout。
  • Queue:消息最终被送到这里等待消费者取走。

RabbitMq的交换机类型

1. Direct Exchange

  • 描述:Direct 交换机是最简单的交换机类型。它根据消息的 routing key 将消息路由到一个特定的队列。如果队列的 binding key 与消息的 routing key 完全匹配,则消息会被路由到该队列。
  • 特点
    • 一对一匹配:消息的 routing key 必须与队列的 binding key 完全相同。
    • 简单直接:适用于一对一的消息传递场景。
  • 示例
    • 生产者发送消息时指定 routing key 为 info
    • 队列 A 绑定到交换机时,binding key 也为 info
    • 消息将被路由到队列 A。

2. Topic Exchange

  • 描述:Topic 交换机允许更复杂的路由模式。消息的 routing key 和队列的 binding key 可以包含通配符,从而实现更灵活的路由规则。
  • 特点
    • 模式匹配:支持通配符 *(匹配一个单词)和 #(匹配多个单词)。
    • 灵活多变:适用于多对多的消息传递场景,可以实现复杂的路由逻辑。
  • 示例
    • 生产者发送消息时指定 routing key 为 user.info
    • 队列 A 绑定到交换机时,binding key 为 user.*
    • 队列 B 绑定到交换机时,binding key 为 *.info
    • 消息将被路由到队列 A 和队列 B。

3. Fanout Exchange

  • 描述:Fanout 交换机是最简单的广播交换机。它不关心消息的 routing key,将消息广播到所有绑定到该交换机的队列。
  • 特点
    • 广播消息:消息会被发送到所有绑定的队列,无论队列的 binding key 是什么。
    • 简单高效:适用于需要将消息广播到多个消费者的情况。
  • 示例
    • 生产者发送消息时,不指定 routing key。
    • 队列 A、队列 B 和队列 C 都绑定到该交换机。
    • 消息将被路由到队列 A、队列 B 和队列 C。

RabbitMQ 的主要特点

  • 可靠性:使用消息确认机制,确保消息的可靠传递。生产者在发送消息后会收到一个确认,消费者在处理完消息后会发送一个确认。如果消息发送或处理失败,RabbitMQ 会重新发送消息,直到确认为止。
  • 灵活性:支持多种消息传递模式,包括点对点、发布/订阅和消息路由等。
  • 可扩展性:可以通过添加更多的节点来实现水平扩展,以处理更大的消息负载。它还支持集群和镜像队列,提供高可用性和负载均衡。
  • 多语言支持:提供了多种编程语言的客户端库,包括 Java、Python、Ruby、C# 等。

MQ选型对比

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
可用性高,基于主从架构实现高可用高,基于主从架构实现高可用非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
单机吞吐量万级万级十万级十万级以上
消息延迟微秒级毫秒级毫秒级毫秒级以内
消息可靠性较高,基本不丢较低,有丢大概率经过参数优化配置,可以做到 0 丢失经过参数优化配置,可以做到 0 丢失

RabbitMQ安装

环境:Centos7.9,基于docker安装

1.使用docker run命令创建容器并安装mq

docker run \
 -e RABBITMQ_DEFAULT_USER=mqadmin \
 -e RABBITMQ_DEFAULT_PASS=mqadmin \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network mq-net\
 -d \
 rabbitmq:3.8-management

2.开放端口,或关闭防火墙(如果访问不了)

方法1:开放端口

#1.开放mq端口
firewall-cmd --zone=public --add-port=15672/tcp --add-port=5672/tcp --permanent
#2.重新加载防火墙配置
firewall-cmd --reload

方法2:临时关闭防火墙

systemctl stop firewalld

3.访问RabbitMQ控制台并登录

账号密码就是创建mq容器时指定的RABBITMQ_DEFAULT_USER和RABBITMQ_DEFAULT_PASS

访问IP地址:主机ip:15672

 

RabbitMQ控制台使用

1.收发消息

1.1创建消息队列

 1.2创建一个交换机

 1.3讲交换机与队列绑定

 1.4发送消息

1.5查看消息

2.数据隔离

当我们只部署了一个mq的话,当多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。

实现步骤:

1.在我们的用户管理创建一个新用户

可以看到我们的用户创建成功,但是刚创建的用户是没有虚拟主机的

2.登录新创建的用户,配置虚拟主机

我们可以通过右上角选择自己的虚拟主机

 可以看到,在我们选择我们当前用户的虚拟机主机之后,就看不到我们之前用/创建的队列了

SpringAMQP使用

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

Spring AMQPSpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

1.创建一个Maven的mqDemo项目

2.创建两个子模块publisher(消息的发送者)、consumer(消息的消费者)

3.在父模块的pom.xml中导入以下配置:

    <groupId>cn.mq.demo</groupId>
    <artifactId>mq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
    <packaging>pom</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>

4.在子模块pom.xml中分别导入以下配置:

publisher

    <parent>
        <artifactId>mq-demo</artifactId>
        <groupId>cn.mq.demo</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>publisher</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

consumer

    <parent>
        <artifactId>mq-demo</artifactId>
        <groupId>cn.mq.demo</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>consumer</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

5.在两个子模块的application.yaml中加入以下配置:

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.181.32 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /test # 虚拟主机
    username: testuser # 用户名
    password: testuser # 密码

6.创建交换机、队列,并监听消息

方式1基于配置类创建交换机、队列并绑定:

在consumer下创建一个configuration类

@Configuration
public class FanoutConfiguration {
  @Bean
  public FanoutExchange fanoutExchange() {
    //创建交换机
    return new FanoutExchange("test.fanout");
  }

  @Bean
  public Queue fanoutQueue1() {
    //创建队列
    return new Queue("fanout.queue1");
  }

  @Bean
  public Queue fanoutQueue2() {
    //创建队列
    return new Queue("fanout.queue2");
  }

  @Bean
  public Binding binDingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
    //将队列与交换机进行绑定
    return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
  }

  @Bean
  public Binding binDingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
    //将队列与交换机进行绑定
    return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
  }
}

然后创建一个监听类,监听消息

@Slf4j
@Component
public class SpringRabbitListener {

  @RabbitListener(queues = "fanout.queue1")//监听的队列
  public void listenerFanoutQueue1(String message) throws InterruptedException {
    System.out.println("消费者1接收到test.fanout消息:" + message + "," + LocalTime.now());
  }
  @RabbitListener(queues = "fanout.queue2")//监听的队列
  public void listenerFanoutQueue2(String message) throws InterruptedException {
    System.out.println("消费者2接收到test.fanout消息:" + message + "," + LocalTime.now());
  }
}

方式2基于注解创建交换机、队列并绑定:

@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "fanout.quque1"),
    exchange = @Exchange(name = "test.fanout", type = ExchangeTypes.FANOUT)
  ))
  public void listenerFanoutQueue1(String message) throws InterruptedException {
    System.err.println("消费者1接收到test.fanout消息:" + message + "," + LocalTime.now());
  }

  @RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "fanout.queue1"),
    exchange = @Exchange(name = "test.fanout", type = ExchangeTypes.FANOUT)
  ))
  public void listenerFanoutQueue2(String message) throws InterruptedException {
    System.err.println("消费者2接收到test.fanout消息:" + message + "," + LocalTime.now());
  }
}

 启动ConsumerApplication类,查看rabbitmq控制台查看是否已经创建交换机和队列成功,并且正确绑定(上面的方式实现一种即可)

 6.发送消息

在publisher创建测试类发送消息

@Slf4j
@SpringBootTest
class SpringAmqpTest {
  @Autowired
  RabbitTemplate rabbitTemplate;

  @Test
  public void testFanoutQueue() {
    String exchangeName = "test.fanout";//交换机名称
    String message = "hello,everyone!";//发送的消息
    rabbitTemplate.convertAndSend(exchangeName, "", message);
  }
}

8.测试

1.运行ConsumerApplication启动类,保持运行状态

2.运行testFanoutQueue测试类的方法

3.查看控制台输出

正确接收到消息! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/951930.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

自然语言处理之jieba分词和TF-IDF分析

jieba分词和TF-IDF分析 目录 jieba分词和TF-IDF分析1 jieba1.1 简介1.2 终端下载1.3 基本语法 2 TF-IDF分析2.1 什么是语料库2.2 TF2.3 IDF2.4 TF-IDF2.5 函数导入2.6 方法 3 实际测试3.1 问题解析3.2 代码测试 1 jieba 1.1 简介 结巴分词&#xff08;Jieba&#xff09;是一个…

rust学习——环境搭建

rust安装&#xff1a;https://kaisery.github.io/trpl-zh-cn/ch01-01-installation.html 1、vscode装插件&#xff1a; toml语法支持 依赖管理 rust语法支持 2、创建demo 3、查看目录 4、执行文件的几种方式&#xff1a; rust安装&#xff1a;https://www.rust-lang.org/z…

Opencv查找、绘制轮廓、圆形矩形轮廓和近似轮廓

查找、绘制轮廓、圆形矩形轮廓和近似轮廓 目录 查找、绘制轮廓、圆形矩形轮廓和近似轮廓1 轮廓查找和绘制1.1 轮廓查找1.1.1 函数和参数1.1.2 返回值 1.2 轮廓绘制1.2.1 函数和参数 1.3 步骤1.4 实际测试绘制轮廓 2 绘制近似轮廓2.1 函数和参数2.2 查找特定轮廓2.3 近似轮廓测试…

K8s Pod OOMKilled,监控却显示内存资源并未打满

1. 问题现象 pod一直重启&#xff0c;通过grafana查看&#xff0c;发现内存使用率并没有100%。 2. 排查过程 2.1 describe查看pod最新一次的状态 可以明显看到&#xff0c;最近一次的重启就是因为内存不足导致的。 2.2 describe 查看node节点状态 找到原因了&#xff0c;原来…

Knowledge Editing through Chain-of-Thought

题目 通过思路链进行知识编辑 论文地址&#xff1a;https://arxiv.org/abs/2412.17727 摘要 大型语言模型 (LLM) 在广泛的自然语言处理 (NLP) 任务中表现出卓越的能力。然而&#xff0c;由于频繁重新训练的成本很高&#xff0c;让这些模型与不断发展的世界知识保持同步仍然是一…

C语言Day14(c程序设计小红书+pta)

目录 &#xff08;一&#xff09;求总天数pta 题目说明&#xff1a; 代码实现&#xff1a; 程序分析&#xff1a; &#xff08;二&#xff09;十进制整数转换成R进制数pta 题目说明&#xff1a; 代码实现&#xff1a; 程序分析&#xff1a; &#xff08;三&#xff09;…

G1原理—3.G1是如何提升垃圾回收效率

大纲 1.G1为了提升GC的效率设计了哪些核心机制 2.G1中的记忆集是什么 3.G1中的位图和卡表 4.记忆集和卡表有什么关系 5.RSet记忆集是怎么更新的 6.DCQ机制的底层原理是怎样的 7.DCQS机制及GC线程对DCQ的处理 提升G1垃圾回收器GC效率的黑科技 G1设计了一套TLAB机制 快速…

算法(二)——一维差分、等差数列差分

文章目录 一维差分、等差数列差分一维差分例题&#xff1a;航班预订统计 等差数列差分例题&#xff1a;三步必杀例题&#xff1a;Lycanthropy 一维差分、等差数列差分 一维差分 差分解决的是 区间修改&#xff08;更新&#xff09;问题&#xff0c;特别是多次区间修改问题&…

nexus搭建maven私服

说到maven私服每个公司都有&#xff0c;比如我上一篇文章介绍的自定义日志starter&#xff0c;就可以上传到maven私服供大家使用&#xff0c;每次更新只需deploy一下就行&#xff0c;以下就是本人搭建私服的步骤 使用docker安装nexus #拉取镜像 docker pull sonatype/nexus3:…

StarRocks Awards 2024 年度贡献人物

在过去一年&#xff0c;StarRocks 在 Lakehouse 与 AI 等关键领域取得了显著进步&#xff0c;其卓越的产品功能极大地简化和提升了数据分析的效率&#xff0c;使得"One Data&#xff0c;All Analytics" 的愿景变得更加触手可及。 虽然实现这一目标的道路充满挑战且漫…

安卓app抓包总结(精)

前言 这里简单记录一下相关抓包工具证书的安装 burp证书安装 安装证书到移动设备(安卓7以后必须上传到设备系统根证书上) 导出证书 openssl x509 -inform DER -in cacert.der -out cacert.pem 转换格式 openssl x509 -inform PEM -subject_hash_old -in cacert.pem …

Nginx代理同域名前后端分离项目的完整步骤

前后端分离项目&#xff0c;前后端共用一个域名。通过域名后的 url 前缀来区别前后端项目。 以 vue php 项目为例。直接上 server 模块的 nginx 配置。 server{ listen 80; #listen [::]:80 default_server ipv6onlyon; server_name demo.com;#二配置项目域名 index index.ht…

嵌入式C语言:二维数组

目录 一、二维数组的定义 二、内存布局 2.1. 内存布局特点 2.2. 内存布局示例 2.2.1. 数组元素地址 2.2.2. 内存布局图&#xff08;简化表示&#xff09; 2.3. 初始化对内存布局的影响 三、访问二维数组元素 3.1. 常规下标访问方式 3.2. 通过指针访问 3.2.1. 指向数…

Google发布图像生成新工具Whisk:无需复杂提示词,使用图像和人工智能将想法可视化并重新混合

Whisk 是 Google Labs 的一项新实验&#xff0c;可使用图像进行快速而有趣的创作过程。Whisk不会生成带有长篇详细文本提示的图像&#xff0c;而是使用图像进行提示。只需拖入图像&#xff0c;即可开始创建。 whisk总结如下&#xff1a; Whisk 是 Google 实验室最新的生成图像实…

Linux下jar包脚本工具

Jar包启动、停止、重启 文章目录 Jar包启动、停止、重启1、 新建脚本2、启动服务脚本代码3、停止服务脚本代码4、重启服务脚本5、赋予脚本可执行权利 1、 新建脚本 vim 脚本明字.sh2、启动服务脚本代码 #!/bin/bash# 设置Java应用的名称&#xff08;可根据实际情况修改&#…

相机和激光雷达的外参标定 - 无标定板版本

1. 实现的效果 通过本软件实现求解相机和LiDAR的外参&#xff0c;即2个传感器之间的三维平移[x, y, z]和三维旋转[roll, pitch, yaw]。完成标定后&#xff0c;可将点云投影到图像&#xff0c;效果图如下&#xff1a; 本软件的优势&#xff1a;&#xff08;1&#xff09;无需特…

Github出现复杂问题 无法合并 分支冲突太多 如何复原

目录 问题再现 解决思路 当然我所指的是在 main 分支开一个新的分支 删除本地文件夹 重新克隆 开一个新分支 切换分支 下载远程分支 文件覆盖 合并到主分支 ​​​​​​​问题再现 太复杂了 无法更改 编译器现状 全部崩溃了 无法更改 即使创建一个新的分支也无济于…

uniapp使用chooseLocation安卓篇

本文章全部以高德地图为例 代码 <view class"bottom"><button click"choose">定位</button> </view> choose() {uni.chooseLocation({success: function(res) {console.log(位置名称&#xff1a; res.name);console.log(详细地…

设计模式-结构型-组合模式

1. 什么是组合模式&#xff1f; 组合模式&#xff08;Composite Pattern&#xff09; 是一种结构型设计模式&#xff0c;它允许将对象组合成树形结构来表示“部分-整体”的层次结构。组合模式使得客户端对单个对象和组合对象的使用具有一致性。换句话说&#xff0c;组合模式允…

AI多模态论文解读:LLaVA-CoT:让视觉语言模型逐步推理

本文作者&#xff1a;AIGCmagic社区 猫先生 一、简 介 LLaVA-CoT引入了四个不同的阶段&#xff08;摘要、标题、推理和结论&#xff09;&#xff0c;使模型能够独立进行系统化的多阶段推理&#xff0c;显著提高了在推理密集型任务上的准确性。 编译了LLaVA-CoT-100k数据集&am…