【RabbitMQ】RabbitMQ 的概念以及使用RabbitMQ编写生产者消费者代码

目录

1. RabbitMQ 核心概念

1.1生产者和消费者

1.2 Connection和Channel

1.3 Virtual host

1.4 Queue

1.5 Exchange

1.6 RabbitMO工作流程

2. AMQP

3.RabbitMO快速入门

3.1.引入依赖

3.2.编写生产者代码

​3.3.编写消费者代码

4.源码


1. RabbitMQ 核心概念

在安装RabbitMQ和管理界面之后, 访问云服务器ip和相应的端口号, 会展示如下界面:

界面上的导航栏共分6部分,这6部分分别是什么意思呢,我们先看看RabbitMO的工作流程 

RabbitMO是一个消息中间件, 也是一个生产者消费者模型, 它负责接收, 存储并转发消息

消息传递的过程类似邮局:

当你要发送一个邮件时,你把你的邮件放到邮局,邮局接收邮件,并通过邮递员送到收件人的手上.

按照这个逻辑,制片人就类似邮件发件人.Consumer就是收件人,RabbitMQ就类似于邮局

1.1生产者和消费者

  • Producer: 生产者,是RabbitMQ Server的客户端,向RabbitMQ发送消息
  • Consumer: 消费者,也是RabbitMQ Server的客户端,从RabbitMQ接收消息
  • Broker: 其实就是RabbitMO Server,主要是接收和收发消息

生产者(Producer)创建消息, 然后发布到RabbitMQ中, 在实际应用中, 消息通常是一个带有一定业务
逻辑结构的数据, 比如JSON字符串, 消息可以带有一定的标签, RabbitMO会根据标签进行路由, 把消息发送给感兴趣的消费者(Consumer).

消费者连接到RabbitMQ服务器, 就可以消费消息了, 消费的过程中, 标签会被丢掉, 消费者只会收到
消息, 并不知道消息的生产者是谁, 当然消费者也不需要知道.

对于 RabbitMO 来说, 一个 RabbitMO Broker 可以简单地看作一个 RabbitMO 服务节点, 或者
RabbitMO 服务实例, 大多数情况下也可以将一个 RabbitMO Broker 看作一台 RabbitMO 服务器

1.2 Connection和Channel

Connection: 连接. 是客户端和RabbitMO服务器之间的一个TCP连接, 这个连接是建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息.

Channel: 通道, 信道. Channel是在Connection之上的一个抽象层, 在 RabbitMO中, 一个TCP连接可以有多个Channel, 每个Channel都是独立的虚拟连接, 消息的发送和接收都是基于 Channel的.

通道的主要作用是将消息的读写操作复用到同一个TCP连接上,这样可以减少建立和关闭连接的开销提高性能.

1.3 Virtual host

Virtual host: 虚拟主机. 这是一个虚拟概念, 它为消息队列提供了一种逻辑上的隔离机制. 对于
RabbitMQ而言, 一个 BrokerServer 上可以存在多个 Virtual Host. 当多个不同的用户使用同一个
RabbitMQ Server 提供的服务时,可以虚拟划分出多个vhost,每个用户在自己的 vhost 创建
exchange/queue等

类似MySOL的 "database" , 是一个逻辑上的集合. 一个MySQL服务器可以有多个database

1.4 Queue

Queue: 队列, 是RabbitMO的内部对象, 用于存储消息

多个消费者, 可以订阅同一个队列

1.5 Exchange

Exchange: 交换机. message 到达 broker 的第一站, 它负责接收生产者发送的消息, 并根据特定的规则把这些消息路由到一个或多个Queue列中.

Exchange起到了消息路由的作用,它根据类型和规则来确定如何转发接收到的消息

类似于发快递之后, 物流公司怎么处理呢, 根据咱们的地址来分派这个快递到不同的站点, 然后再送到收件人手里, 这个分配的工作, 就是交换机来做的

1.6 RabbitMO工作流程

理解了上面的概念之后, 再来回顾一下这个图, 来看RabbitMO的工作流程

1. Producer 生产了一条消息
2. Producer 连接到RabbitMQBroker,建立一个连接(Connection), 开启一个信道(Channel)
3. Producer 声明一个交换机(Exchange), 路由消息
4. Producer 声明一个队列(Queue), 存放信息
5. Producer 发送消息至 RabbitMO Broker
6. RabbitMQ Broker 接收消息, 并存入相应的队列(Queue)中, 如果未找到相应的队列, 则根据生产者的配置, 选择丢弃或者退回给生产者.

如果我们把RabbitMQ比作一个物流公司,那么它的一些核心概念可以这样理解:
1.Broker就类似整个物流公司的总部,它负责协调和管理所有的物流站点,确保包裹安全、高效      地送达.

2.Virtual Host可以看作是物流公司为不同的客户或业务部门划分的独立运营中心,每个运营中     心都有自己的仓库(Queue),分拣规则(Exchange)和运输路线(Connection和Channel),这样       可以确保不同客户的包裹处理不会相互干扰,同时提供定制化的服务.

3.Exchange就像是站点里的分拣中心,当包裹到达时,分拣中心会根据包裹上的标签来决定这     个包裹应该送往哪个目的地(队列).快递站点可能有不同类型的分拣中心,有的按照具体地址     分拣,有的将包裹复制给多个收件人等.

4.Queue就是快递站点里的一个个仓库,用来临时存放等待派送的包裹,每个仓库都有一个或       多个快递员(消费者)负责从仓库中取出包裹并派送给最终的收件人.
5.Connection就像是快递员与快递站点之间的通信线路.快递员需要通过这个线路来接收派送     任务(消息).
6.Channel 就像是快递员在执行任务时使用的多个并行的通信线路. 这样,快递员可以同时       处理多个包裹, 比如一边派送包裹, 一边接收新的包裹.

2. AMQP

AMQP(Advanced Message Queuing Protocol) 是一种高级消息队列协议, AMQP定义了一套确定的消息交换功能, 包括交换器(Exchange), 队列(Queue)等, 这些组件共同工作, 使得生产者能够将消息发送到交换器, 然后由队列接收并等待消费者接收, AMOP还定义了一个网络协议, 允许客户端应用通过该协议与消息代理和AMOP模型进行交互通信

RabbitMQ是遵从AMQP协议的, 换句话说, RabbitMQ就是AMQP协议的Erlang的实现(当然abbitMQ还支持STOMP2,MOTT2等协议). AMOP的模型结构和RabbitMO的模型结构是一样的.

3.RabbitMO快速入门

步骤

1.引入依赖
2.编写生产者代码
3.编写消费者代码

3.1.引入依赖

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>5.21.0</version>
</dependency>

3.2.编写生产者代码

创建连接:

//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("8.130.35.237");
connectionFactory.setPort(5672); //需要提前开放端口号
connectionFactory.setUsername("study"); //账号
connectionFactory.setPassword("study"); //密码
connectionFactory.setVirtualHost("test"); //虚拟主机
Connection connection = connectionFactory.newConnection();

创建Channel:

//2.开启信道
Channel channel = connection.createChannel();

声明一个队列Queue:

/**
 * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
 *                                                Map<String, Object> arguments)
 * 参数说明:
 * queue:队列名称
 * durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
 * exclusive:是否独占,只能有⼀个消费者监听队列
 * autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
 * arguments 参数
 */
channel.queueDeclare("hello", true, false, false, null);

发送消息:

当一个新的 RabbitMO 节点启动时,它会预声明(declare)几个内置的交换机,内置交换机名称是空
字符串(""). 生产者发送的消息会根据队列名称直接路由到对应的队列.

例如: 如果有一个名为"hello"的队列, 生产者可以直接发送消息到"hello"队列, 而消费者可以从
"hello"队列中接收消息, 而不需要关心交换机的存在, 这种模式非常适合简单的应用场景,其中生产者和消费者之间的通信是一对一的.

//5.发送消息
/**
 * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
 * 参数说明
 * exchange: 交换机名称, 简单模式下, 交换机会使用默认的""
 * routingKey: 内置交换机, routingKey和队列名称保持一致
 * props: 属性配置
 * body: 消息
 */
String msg = "Hello World";
channel.basicPublish("","hello",null,msg.getBytes());
System.out.println(msg + "消息发送成功!");

释放资源:

//显式地关闭Channel是个好习惯, 但这不是必须的, Connection关闭的时候,Channel也会⾃动关闭. 
channel.close();
connection.close();

运行代码, 观察结果:

运行前:

运行之后, 队列中就已经有了hello这个队列的信息

如果在代码中注掉资源释放的代码,在Connections和Channels也可以看到相关信息

Queue也可以配置显示Consumer相关信息


3.3.编写消费者代码

消费者代码和生产者前3步都是一样的, 第4步改为消费当前队列
1.创建连接
2.创建Channel
3.声明一个队列Queue
4.消费消息
5.释放资源

消费当前队列
basicConsume

/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数: 
1. queue: 队列名称 
2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认 
3. callback: 回调对象 
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws 
IOException;

Consumer
用于定义消息消费者的行为.当我们需要从RabbitMQ接收消息时,需要提供一个实现了Consumer
consumer 接口的对象

Defaultconsumer 是 RabbitMQ提供的一个默认消费者,实现了Consumer接口

核心方法:
1. handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body):从队列接收到消息时,会自动调用该方法.

在这个方法中,我们可以定义如何处理接收到的消息,例如打印消息内容,处理业务逻辑或者将消息
存储到数据库等

参数说明如下:
consumerTag: 消费者标签,通常是消费者在订阅队列时指定的.
envelope: 包含消息的封包信息,如队列名称,交换机等
properties: 一些配置信息
body: 消息的具体内容

//4.消费消息
/**
 * basicConsume(String queue, boolean autoAck, Consumer callback)
 * 参数说明
 * queue:要消费的队列名称
 * autoAck:是否自动确认, 消费者收到信息后, 自动和MQ确认
 * callback:接收到消息后执行的逻辑
 */
DefaultConsumer consumer = new DefaultConsumer(channel) {
    //从队列中收到消息, 就会执行方法
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("接收到消息: " + new String(body));
    }
};
channel.basicConsume("hello", true, consumer);

 释放资源:

//等待程序执行完成
Thread.sleep(20000);

//5.释放资源
channel.close();
connection.close();

实际上消费者相当于是一个监听程序,不需要关闭资源

运行代码观察结果:

运行程序,我们刚才发送的消息,就收到了

4.源码

生产者代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ProductDemo {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.130.35.237");
        connectionFactory.setPort(5672); //需要提前开放端口号
        connectionFactory.setUsername("study"); //账号
        connectionFactory.setPassword("study"); //密码
        connectionFactory.setVirtualHost("test"); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2.开启信道
        Channel channel = connection.createChannel();
        //3.声明交换机 使用内置的交换机
        //4.声明队列

        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                                Map<String, Object> arguments)
         * 参数说明:
         * queue:队列名称
         * durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
         * exclusive:是否独占,只能有⼀个消费者监听队列
         * autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
         * arguments 参数
         */
        channel.queueDeclare("hello", true, false, false, null);

        //5.发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 参数说明
         * exchange: 交换机名称, 简单模式下, 交换机会使用默认的""
         * routingKey: 内置交换机, routingKey和队列名称保持一致
         * props: 属性配置
         * body: 消息
         */
        String msg = "Hello World";
        channel.basicPublish("","hello",null,msg.getBytes());
        System.out.println(msg + "消息发送成功!");
        //6.资源释放
        channel.close();
        connection.close();
    }
}

消费者代码:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerDemo {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1.创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("8.130.35.237");
        connectionFactory.setPort(5672); //需要提前开放端口号
        connectionFactory.setUsername("study"); //账号
        connectionFactory.setPassword("study"); //密码
        connectionFactory.setVirtualHost("test"); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2.创建Channel
        Channel channel = connection.createChannel();
        //3.申明队列(可以省略)
//        channel.queueDeclare("hello", true, false, false, null);

        //4.消费消息
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 参数说明
         * queue:要消费的队列名称
         * autoAck:是否自动确认, 消费者收到信息后, 自动和MQ确认
         * callback:接收到消息后执行的逻辑
         */
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            //从队列中收到消息, 就会执行方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息: " + new String(body));
            }
        };
        String s = channel.basicConsume("hello", true, consumer);

        //等待程序执行完成
        Thread.sleep(20000);

        //5.释放资源
        channel.close();
        connection.close();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/883659.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LiveNVR监控流媒体Onvif/RTSP功能-支持电子放大拉框放大直播视频拉框放大录像视频流拉框放大电子放大

LiveNVR监控流媒体Onvif/RTSP功能-支持电子放大拉框放大直播视频拉框放大录像视频流拉框放大电子放大 1、视频广场2、录像回看3、RTSP/HLS/FLV/RTMP拉流Onvif流媒体服务 1、视频广场 视频广场 -》播放 &#xff0c;左键单击可以拉取矩形框&#xff0c;放大选中的范围&#xff…

NLP-transformer学习:(7)evaluate实践

NLP-transformer学习&#xff1a;&#xff08;7&#xff09;evaluate 使用方法 打好基础&#xff0c;为了后面学习走得更远。 本章节是单独的 NLP-transformer学习 章节&#xff0c;主要实践了evaluate。同时&#xff0c;最近将学习代码传到&#xff1a;https://github.com/Mex…

STL之vector篇(下)(手撕底层代码,从零实现vector的常用指令,深度剖析并优化其核心代码)

文章目录 1.基本结构与初始化1.1 空构造函数的实现与测试1.2 带大小和默认值的构造函数1.3 使用迭代器范围初始化的构造函数(建议先看完后面的reserve和push_back)1.4 拷贝构造函数1.5 赋值操作符的实现&#xff08;深拷贝&#xff09;1.6 析构函数1.7 begin 与 end 迭代器 2. …

使用宝塔部署项目在win上

项目部署 注意&#xff1a; 前后端部署项目&#xff0c;需要两个域名&#xff08;二级域名&#xff0c;就是主域名结尾的域名&#xff0c;需要在主域名下添加就可以了&#xff09;&#xff0c;前端一个&#xff0c;后端一个 思路&#xff1a;访问域名就会浏览器会加载前端的代…

如何守护变美神器安全?红外热像仪:放开那根美发棒让我来!

随着智能家电市场的迅速发展&#xff0c;制造商们越来越关注生产过程中效率和质量的提升。如何守护变美神器安全&#xff1f;红外热像仪&#xff1a;放开那根卷发棒让我来&#xff01; 美发棒生产遇到什么困境&#xff1f; 美发棒生产过程中会出现设备加热不均情况&#xff0c…

[数据库实验五] 审计及触发器

一、实验目的与要求&#xff1a; 1.了解MySQL审计功能及实现方式 2.掌握触发器的工作原理、定义及操作方法 二、实验内容&#xff1a; 注&#xff1a; 在同一个触发器内编写多行代码&#xff0c;需要用结构begin ……end 函数current_user()获得当前登录用户名 1.自动保存…

智慧城市主要运营模式分析

(一)运营模式演变 作为新一代信息化技术落地应用的新事物,智慧城市在建设模式方面借鉴了大量工程建设的经验,如平行发包(DBB,Design-Bid-Build)、EPC工程总承包、PPP等模式等,这些模式在不同的发展阶段和条件下发挥了重要作用。 在智慧城市发展模式从政府主导、以建为主、…

linux----进程地址空间

前言 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、空间分布 二、栈和堆的特点 &#xff08;1&#xff09;栈堆相对而生&#xff0c;堆是向上增长的&#xff0c;栈是向下增长的。 验证&#xff1a;堆是向上增长的 这里我们看到申请的堆&#xff…

记一次Windows状态栏不显示问题

文章目录 &#x1fa9f;解决方案☁️单次处理☁️有效处理 &#x1fa9f;现象&#x1fa9f;尝试的操作⭐END&#x1f31f;跋&#x1f31f;交流方式 &#x1fa9f;解决方案 ☁️单次处理 重启explorer.exe 命令行操作 注意&#xff0c;使用命令行操作的时候&#xff0c;出现…

链动 2+1 模式 S2B2C 商城小程序源码:创新价格盈利模式探索

摘要&#xff1a;本文深入探讨了价格盈利模式的两种类型&#xff0c;即价格返利模式和动态定价盈利模式。通过引入链动 21 模式 S2B2C 商城小程序源码&#xff0c;分析其在实现这两种价格盈利模式方面的优势和应用场景&#xff0c;为朋友圈卖货及电商领域的发展提供新的思路和方…

QT菜单之快捷菜单设计

快捷菜单又称为上下文菜单&#xff0c;通常在用鼠标右击的时候弹出。创建快捷菜单的方法和创建菜单栏菜单类似。 效果图&#xff1a; 一、将MainWindow类对象的ContextMenuPolicy属性设置为customContextMenu。 打开mainWindow.ui&#xff0c;在属性视图上找到ContextMenuPoli…

一文掌握python单元测试unittest(二)

接上篇:https://blog.csdn.net/qq_38120851/article/details/141642215 目录 四、参数化测试 1、使用 subTest 2、使用装饰器 3)使用第三方库parameterized 五、跳过测试 1、使用 unittest.skip() 或 unittest.skipIf() 装饰器: 2、使用 setUp() 方法中的断言来跳过整…

EasyCVR智慧公园视频智能管理方案:赋能公园安全管理新高度

随着城市化进程的加速&#xff0c;智慧城市建设已成为提升城市管理效率、增强居民生活质量的重要途径。智慧公园作为智慧城市的重要组成部分&#xff0c;其安全与管理水平直接影响着市民的休闲娱乐体验。EasyCVR智慧公园视频智能管理方案&#xff0c;正是基于这一背景应运而生&…

Android 车载应用开发指南 - CarService 详解(下)

车载应用正在改变人们的出行体验。从导航到娱乐、从安全到信息服务&#xff0c;车载应用的开发已成为汽车智能化发展的重要组成部分。而对于开发者来说&#xff0c;如何将自己的应用程序无缝集成到车载系统中&#xff0c;利用汽车的硬件和服务能力&#xff0c;是一个极具挑战性…

【Docker】01-Docker常见指令

1. Docker Docker会下载镜像&#xff0c;运行的时候&#xff0c;创建一个隔离的环境&#xff0c;称为容器。 docker run -d \ # 创建并运行一个容器&#xff0c;-d表示后台运行 --name mysql \ # 容器名称-p 3307:3306 \ # 端口映射&#xff0c;宿主机端口映射到容器端口-e TZ…

Cilium + ebpf 系列文章-什么是ebpf?(一)

前言&#xff1a; 这篇非常非常干&#xff0c;很有可能读不懂。 这里非常非常推荐&#xff0c;建议使用Cilium官网的lab来辅助学习&#xff01;&#xff01;&#xff01;Resources Library - IsovalentExplore Isovalents Resource Library, your one-stop destination for ins…

linux命令:显示已安装在linux内核的模块的详细信息的工具modinfo详解

目录 一、概述 二、使用方法 1、基本的使用语法 2、常用选项 3、输出字段 4、获取帮助 三、示例 四、实际用途 1、诊断问题 2、模块依赖 3、参数配置 五、其他事项 一、概述 modinfo 是 Linux 系统中的一个工具&#xff0c;用于显示有关已安装内核模块的详细信息。…

中间件:maxwell、canal

文章目录 1、底层原理&#xff1a;基于mysql的bin log日志实现的&#xff1a;把自己伪装成slave2、bin log 日志有三种模式&#xff1a;2.1、statement模式&#xff1a;2.2、row模式&#xff1a;2.3、mixed模式&#xff1a; 3、maxwell只支持 row 模式&#xff1a;4、maxwell介…

MySQL多版本并发控制MVCC实现原理

MVCC MVCC 是多版本并发控制方法&#xff0c;用来解决读和写之间的冲突&#xff0c;比如脏读、不可重复读问题&#xff0c;MVCC主要针对读操作做限制&#xff0c;保证每次读取到的数据都是本次读取之前的已经提交事务所修改的。 概述 当一个事务要对数据库中的数据进行selec…

十七,Spring Boot 整合 MyBatis 的详细步骤(两种方式)

十七&#xff0c;Spring Boot 整合 MyBatis 的详细步骤(两种方式) 文章目录 十七&#xff0c;Spring Boot 整合 MyBatis 的详细步骤(两种方式)1. Spring Boot 配置 MyBatis 的详细步骤2. 最后&#xff1a; MyBatis 的官方文档&#xff1a;https://mybatis.p2hp.com/ 关于 MyBa…