RabbitMQ基本介绍及简单上手

(一)什么是MQ

  MQ(message queue)本质上是队列,满足先入先出,只不过队列中存放的内容是消息而已,那什么是消息呢? 消息可以是字符串,json也可以是一些复杂对象

 我们应用场景通常是再分布式系统之间

    系统通信方式

一般我们的系统调用方式有两种

一种是同步通信,也就是a--->b,直接调用对方服务,数据从a出发直接到达b

另一种叫异步通信,也就是a----->容器---->b,数据从一段出发,先进入一个容器进行临时存储,当满足条件后,由容器发给另一端,这个容器的实现就是MQ

(二)MQ的作用

MQ主要的工作是接收并发送消息,在不同场景下可以有不同的作用

这里说一些比较常见的作用

1.异步解耦

  在业务流程中,有一些操作时很耗时的,并且可能阻塞其他任务,但是这些操作并不要求立即执行,我们就可以借助MQ来把这些操作异步化

2.流量削峰

 我们在一些特殊时间可能会有访问量突增的情况,那我们应用一般来说是不能立即全部处理的,所以我们可以使用MQ把访问放到消息队列中,然后依次的处理这些请求

3.消息分发

  当多个系统要对同一个数据做出响应时,我们可以使用MQ来进行消息分发,比如支付成功后,支付系统就可以向MQ发送消息,然后其他系统来订阅这个消息,就不需要轮询数据库了

4.延迟通知

 在一些定时要进行处理的一些场景中,我们可以使用MQ的延迟消息功能,就比如在电商平台,用户下单后一定时间未支付,我们可以把限定的时间放到队列中,时间到了自动取消订单

(三)RabbitMQ的核心概念

我们这里用RabbitMQ来学习消息队列

AMQP

首先我们来看一下这个,AMQP是一种高级消息队列,定义了一套确定的消息交换功能,包含交换机和队列如何映射等,这些组件共同工作,使消息能够正常被接收和发送,AMQP还定义了一个网络协议,允许客户端和AMQP模型进行通信

我们RabbitMQ就是实现了AMQP协议的,RabbitMQ就是AMQP协议的Erlang的实现

所以他们两者的结构模型是一样的

首先我们来看一下RabbitMQ的工作流程

RabbitMQ是⼀个消息中间件,也是⼀个⽣产者消费者模型.它负责接收,存储并转发消息.

首先我们先来了解一下图中出现的一些概念

1.Producer和Consumer

Producer:生产者,是MQ的客户端,用来向RabbitMQ发送消息的

Consumer:消费者,也是MQ的客户端,是用来从RabbitMQ中取消息的

Brokeer就是RabbitMQ,用来接收和转发消息的

我们也可以简化下上面的图,只关注这三者

2.Connection和Channel

Connection:连接,是客户端和RabbitMQ服务器建立的一个TCP连接,这个连接时建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息

Channel:信道,每个建立的connection都可以由很多个信道,每个信道是一个独立的虚拟连接,消息的发送和接收都是通过channel的

注:信道的作用是为了把所有的读写操作都给复用到同一个TCP连接上,这个可以减少建立和关闭连接的开销,提高性能

3.Virtual host

Virtual host:虚拟机,这个是消息队列提供的一种逻辑上的隔离机制(本质上没有隔离),对于Rabbit一个Broker可以有多个虚拟机,当多个不同的⽤⼾使⽤同⼀个 RabbitMQ Server提供的服务时,可以虚拟划分出多个vhost,每个⽤⼾在⾃⼰的vhost创建exchange/queue等

4.queue

queue:队列,是RabbitMQ中的内部对象,是存储消息的地方

多个消费者可以订阅同一个队列

5.exchange

exchange:交换机 ,是消息到达broker中的第一站(因为虚拟机是逻辑上的本质上是不存在的),它负责接收生产者传来的消息,并根据routingkey(bingingkey)来把这些消息路由到一个或者多个queue中,exchange起到了消息路由的作用

6.RabbitMQ的工作流程

1.生产者生产消息

2.生产者与RabbitMQ建立一个连接并且开启一个信道

3.生产者声明一个交换机

4.生产者声明一个队列

5.生产者发送消息到Broker

6.Broker接收消息,并存入到相列中,如果没有相应的队列,就根据生产者的配置来进行一些处理

(四)RabbitMQ入门程序

  首先我们要使用RabbitMQ我们要在服务器上进行安装,那安装就先不说了,我们可以在官方文档或者b站找到很多资料

1.引入依赖

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.20.0</version>
        </dependency>

生产者代码:

public class Produce {
    public static void main(String[] args) throws IOException, TimeoutException {
          //建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("62.234.46.219");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wd");
        connectionFactory.setPassword("w85315678");
        connectionFactory.setVirtualHost("bit");
        Connection connection = connectionFactory.newConnection();
        //开启信道
        Channel channel = connection.createChannel();
        //声明交换机(这里使用默认交换机)
        //声明队列
        /**
         * var1 队列名称
         * var2 是否持久化
         * var3 是否独占(这个队列只有一个消费者)
         * var4 是否自动删除
         * argument 一些高级参数
         * AMQP.Queue.DeclareOk queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;
         */
        channel.queueDeclare("hello",true,false,false,null);
        //发送消息
        /**
         * var1 交换机名称
         * var2 交换机通过什么映射到queue中(内置交换机routingkey与队列名称保持一致)
         * var3 属性配置
         * body 消息
         * void basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4)
         */
        String s1="hello wd";
        channel.basicPublish("","hello",null,s1.getBytes());
        //释放信道
        channel.close();
        //释放连接
        connection.close();
    }
}

消费者代码:

public class consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("62.234.46.219");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wd");
        connectionFactory.setPassword("w85315678");
        connectionFactory.setVirtualHost("bit");
        Connection connection = connectionFactory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();
        //声明队列(如果队列没有才创建,有就不创建)
        channel.queueDeclare("hello",true,false,false,null);
        //消费消息
        /**
         * String basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue 队列名称
         * autoAck是否自动确认
         * callback接收消息后要执行的逻辑
         */
        DefaultConsumer consumer=new DefaultConsumer(channel){
            //从队列中获取到消息就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到: "+new String(body));
            }
        };
        channel.basicConsume("hello",true,consumer);
        //释放资源
        channel.close();
        connection.close();
    }
}

消费者代码中有一个类叫Consumer

也就是我们channel.basicConsume()中的最后一个参数

这个参数是来定义消费者行为的,当我们需要从MQ中接收消息时,我们要提供一个实现了Consumer接口的对象(这里我使用的是内部类)

DefaultConsumer是RabbitMQ提供的一个默认消费者,实现了Consumer接口

核心方法就是我们上面重写的方法

那上面我们没有说参数都是什么意思,这里来说一下

consumerTag:消费者标签,通常是消费者在订阅队列时指定的

envelope:有一些封包信息,如队列名称,交换机等

properties:一些配置信息

body:消息的具体内容

(五)可能出现的错误

1.

我们上面销毁的过程中,一定要先销毁channel再销毁connection或者直接销毁connection不销毁channel

如果先销毁connection再销毁channel就会报错

2.

我们在消费者代码中,如果不自己声明队列,且MQ中没有这个队列时,我们就会报错

3.

如果端口或者ip错误,也会报错

4.账号密码不正确也会报错 

5.用户对虚拟机没有操作权限会报错

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/951009.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

sys.dm_exec_connections:查询与 SQL Server 实例建立的连接有关的信息以及每个连接的详细信息(客户端ip)

文章目录 引言I 基于dm_exec_connections查询客户端ip权限物理联接时间范围dm_exec_connections表see also: 监视SQL Server 内存使用量资源信号灯 DMV sys.dm_exec_query_resource_semaphores( 确定查询执行内存的等待)引言 查询历史数据库客户端ip应用场景: 安全分析缺乏…

vscode如何离线安装插件

在没有网络的时候,如果要安装插件,就会麻烦一些,需要通过离线安装的方式进行。下面记录如何在vscode离线安装插件。 一、下载离线插件 在一台能联网的电脑中,下载好离线插件,拷贝到无法联网的电脑上。等待安装。 vscode插件商店地址:https://marketplace.visualstudio.co…

基于ADAS 与关键点特征金字塔网络融合的3D LiDAR目标检测原理与算法实现

一、概述 3D LiDAR目标检测是一种在三维空间中识别和定位感兴趣目标的技术。在自动驾驶系统和先进的空间分析中&#xff0c;目标检测方法的不断演进至关重要。3D LiDAR目标检测作为一种变革性的技术&#xff0c;在环境感知方面提供了前所未有的准确性和深度信息. 在这里&…

【玩转全栈】----Django连接MySQL

阅前先赞&#xff0c;养好习惯&#xff01; 目录 1、ORM框架介绍 选择建议 2、安装mysqlclient 3、创建数据库 4、修改settings&#xff0c;连接数据库 5、对数据库进行操作 创建表 删除表 添加数据 删除数据 修改&#xff08;更新&#xff09;数据&#xff1a; 获取数据 1、OR…

基于Android的疫苗预约系统

博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业多年&#xff0c;熟悉各种主流语言&#xff0c;精通java、python、php、爬虫、web开发&#xff0c;已经做了多年的设计程序开发&#xff0c;开发过上千套设计程序&#xff0c;没有什么华丽的语言&#xff0c;只有实…

基于 Apache Commons Pool 实现的 gRPC 连接池管理类 GrpcChannelPool 性能分析与优化

基于 Apache Commons Pool 实现的 gRPC 连接池管理类 GrpcChannelPool 性能分析与优化 1. 输出关键信息的代码示例 日志记录方法 使用以下代码记录连接池的关键信息&#xff0c;帮助分析连接池的状态和性能瓶颈&#xff1a; import org.apache.commons.pool2.impl.GenericO…

矩阵碰一碰发视频的视频剪辑功能源码搭建,支持OEM

在短视频创作与传播领域&#xff0c;矩阵碰一碰发视频结合视频剪辑功能&#xff0c;为用户带来了高效且富有创意的内容产出方式。这一功能允许用户通过碰一碰 NFC 设备触发视频分享&#xff0c;并在分享前对视频进行个性化剪辑。以下将详细阐述该功能的源码搭建过程。 一、技术…

CClinkIEfield Basic转Modbus TCP网关模块连接三菱FX5U PLC

捷米特JM-CCLKIE-TCP是自主研发的一款CCLINK IE FB从站功能的通讯网关。该产品主要功能是将各种 MODBUS-TCP 设备接入到 CCLINK IE FB网络中。 捷米特JM-CCLKIE-TCP网关连接到CCLINK IE FB总线中做为从站使用&#xff0c;连接到 MODBUS-TCP 总线中做为主站或从站使用。 为了打破…

农产品智慧物流系统

本文结尾处获取源码。 本文结尾处获取源码。 本文结尾处获取源码。 一、相关技术 后端&#xff1a;Java、JavaWeb / Springboot。前端&#xff1a;Vue、HTML / CSS / Javascript 等。数据库&#xff1a;MySQL 二、相关软件&#xff08;列出的软件其一均可运行&#xff09; I…

设计模式-结构型-桥接模式

1. 什么是桥接模式&#xff1f; 桥接模式&#xff08;Bridge Pattern&#xff09; 是一种结构型设计模式&#xff0c;它旨在将抽象部分与实现部分分离&#xff0c;使它们可以独立变化。通过这种方式&#xff0c;系统可以在抽象和实现两方面进行扩展&#xff0c;而无需相互影响…

后台管理系统引导功能的实现

引导是软件中经常见到的一个功能&#xff0c;无论是在后台项目还是前台或者是移动端项目中。 那么对于引导页而言&#xff0c;它是如何实现的呢&#xff1f;通常情况下引导页是通过 聚焦 的方式&#xff0c;高亮一块视图&#xff0c;然后通过文字解释的形式来告知用户该功能的作…

现场展示deepseek VS openAI o1模型大对比

DeepSeek-V3 模型的发布在 AI 领域引起了广泛关注。作为一款拥有 6850 亿参数的混合专家&#xff08;MoE&#xff09;语言模型&#xff0c;DeepSeek-V3 在多个基准测试中表现出色&#xff0c;甚至超越了一些闭源模型。其在 Aider 代码能力排行榜上的正确率达到 48.4%&#xff0…

Golang的并发编程框架比较

# Golang的并发编程框架比较 中的并发编程 在现代软件开发中&#xff0c;处理高并发的能力愈发重要。Golang作为一门支持并发编程的编程语言&#xff0c;提供了丰富的并发编程框架和工具&#xff0c;使得开发者能够更轻松地处理并发任务。本文将介绍Golang中几种常用的并发编程…

SSL,TLS协议分析

写在前面 工作中总是会接触到https协议&#xff0c;也知道其使用了ssl&#xff0c;tls协议。但对其细节并不是十分的清楚。所以&#xff0c;就希望通过这篇文章让自己和读者朋友们都能对这方面知识有更清晰的理解。 1&#xff1a;tls/ssl协议的工作原理 1.1&#xff1a;设计的…

网络安全-XSS跨站脚本攻击(基础篇)

漏洞扫描的原理 1.跨站脚本攻击介绍 xss跨站脚本攻击&#xff1a; xSS 全称&#xff08;Cross site Scripting &#xff09;跨站脚本攻击&#xff0c;是最常见的Web应用程序安全漏洞之一&#xff0c;位于OWASP top 10 2013/2017年度分别为第三名和第七名&#xff0c;XSS是指攻…

SpringBoot之核心配置

学习目标&#xff1a; 1.熟悉Spring Boot全局配置文件的使用 2.掌握Spring Boot配置文件属性值注入 3.熟悉Spring Boot自定义配置 4.掌握Profile多环境配置 5.了解随机值设置以及参数间引用 1.全局配置文件 Spring Boot使用 application.properties 或者application.yaml 的文…

【Word_笔记】Word的修订模式内容改为颜色标记

需求如下&#xff1a;请把修改后的部分直接在原文标出来&#xff0c;不要采用修订模式 步骤1&#xff1a;打开需要转换的word后&#xff0c;同时按住alt和F11 进入&#xff08;Microsoft Visual Basic for Appliations&#xff09; 步骤2&#xff1a;插入 ---- 模块 步骤3&…

【C++】字符数|组与字符串的深度解析

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;一、字符数组的基本概念1. 什么是字符数组&#xff1f;2. C语言风格字符串的特点 &#x1f4af;二、字符数组的初始化1. 字符串直接赋值2. 按字符逐个赋值数据对比示例 &am…

计算机网络——网络层—IP数据报与分片

一、IP 数据报的格式 • 一个 IP 数据报由首部和数据两部分组成。 • 首部的前一部分是固定长度&#xff0c;共 20 字节&#xff0c;是所有 IP 数据报必须具有的。 • 在首部的固定部分的后面是一些可选字段&#xff0c;其长度是可变的。 IP 数据报首部的固定部分中的各字段 版…

【Python学习(八)——异常处理】

Python学习&#xff08;八&#xff09;——异常处理 本文介绍了异常处理的知识&#xff0c;仅作为本人学习时记录&#xff0c;感兴趣的初学者可以一起看看&#xff0c;欢迎评论区讨论&#xff0c;一起加油鸭~~~ 心中默念&#xff1a;Python 简单好学&#xff01;&#xff01;&…