分布式消息队列:RabbitMQ(1)

目录

一:中间件

二:分布式消息队列 

2.1:是消息队列

2.1.1:消息队列的优势

2.1.1.1:异步处理化

2.1.1.2:削峰填谷

2.2:分布式消息队列

2.2.1:分布式消息队列的优势

2.2.1.1:数据的持久化

2.2.1.2:可扩展性

2.2.1.3:应用解耦

2.2.1.4:发送订阅 

2.2.2:分布式消息队列的应用场景 

三:Rabbitmq

3.1:基本概念

3.2:快速入门 

3.2.1:引入消息队列Java客户端

3.2.2:单消费开发生产者和消费者

 3.2.3:多消费开发生产者和消费者

3.3.3:交换机

3.3.3.1:交换机的类别

a):fanout


一:中间件

连接多个系统,帮助多个系统紧密协作的技术(组件)

二:分布式消息队列 

2.1:是消息队列

概念:存储消息的队列

关键词:存储,消息,队列

存储:存储数据

消息:某种数据结构,比如l字符串,对象,二进制数据,json等

队列:先进先出的数据结构

作用:在不同的系统下,应用之间实现消息的传输,不需要考虑传输应用的编程语言,系统和,框架等等,实现应用解耦的作用。

        eg:可以让Java开发的应用发消息,让php开发的应用收消息。

 针对生产者来说:不需要关心消费者什么时候接受消息,什么时候消费,我只需要把我的工作完成就好了。生产者和消费者之间实现了解耦。

针对上图,同样我们会发现,当小李要别的书籍的时候,小王也可以将别的书籍放到消息队列中。生产者和消费者从某一种程度上实现了解耦合。

2.1.1:消息队列的优势

2.1.1.1:异步处理化

生产者发送消息之后,可以继续去忙别的,消费者什么时候消费都可以,不产生阻塞。

2.1.1.2:削峰填谷

先把用户的请求放到消息队列种,消费者(实际执行操作的应用)可以按照自己的需求,慢慢去取。

举个栗子:

原本:

12点时来了10万个请求,原本情况下,10万个请求都在系统内部立刻处理,很快系统压力过大宕机。

现在:

把10万个请求放到消息队列中,处理系统以自己的恒定速率(比如每秒1个)慢慢执行,稳定处理。

2.2:分布式消息队列

2.2.1:分布式消息队列的优势

分布式消息队列继承于消息队列的优势,并进行了一部分的拓展。

2.2.1.1:数据的持久化

把消息集中存储在硬盘当中,服务器重启就不会丢失。

2.2.1.2:可扩展性

可以根据需求,随时增加(或减少)节点,继续保持稳定的服务。

2.2.1.3:应用解耦

可以连接不同语言(Java,PHP),框架开发的系统,让这些系统读取数据。

示例:

以前的项目:

加了分布式消息队列之后的项目: 

1:一个系统挂了,不影响另一个系统。

2:系统挂了之后并恢复,仍然可以从消息队列中取消息

3:只要发送消息到队列,就可以立即进行返回,不用同步调用所有系统,性能更高

2.2.1.4:发送订阅 

假设情景:当QQ进行了一部分改革之后,其他使用QQ的APP也应该处理
这部分改革。
QQ做了一个情景,要让其他系统知道,比如公告消息。如果QQ一次性给这些应用发消息,所引出的问题如下:
1.每次发通知都要调用很多系统,很麻烦,很可能失败
2.不知道哪个系统需要这些QQ的改革。
解决方案:大的核心系统始终往消息队列发消息,其他的系统都去订阅这个消息队列的消息,用的时候进行取就OK。

2.2.2:分布式消息队列的应用场景 

1:耗时场景。

2:高并发场景。

3:分布式系统的协作。(跨团队,跨业务合作,应用解耦)

4:强稳定的场景(金融业务,持久化,可靠性,削锋填谷)  

三:Rabbitmq

特点:生态好,易学习,易于理解,时效性强,支持不同语言的客户端,扩展性,可用性都很不错。

3.1:基本概念

AMPQ协议:Rabbitmq是遵循AMPQ协议的一种消息中间件。

生产者:发消息到交换机

消费者:收消息的,从某个队列中取消息

交换机(exchange):负责把消息转发到对应的队列

队列(Queue):存储消息的

路由(Rountes):转发,怎么把一个消息从一个地方转发到另一个地方(比如生产者转发到某个队列)

Rabbitmq:端口占用   5672:程序连接的端口 15672:管理界面端口

Rabbitmq的安装:https://blog.csdn.net/qq_25919879/article/details/113055350

管理器页面打不开:http://t.csdnimg.cn/6FqZl

3.2:快速入门 

3.2.1:引入消息队列Java客户端

<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.17.0</version>
 </dependency>

3.2.2:单消费开发生产者和消费者

生产者端代码:

public class SingeProducer {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             //频道相当于客户端(jdbcClient,redisClient),提供了和消队列server建立通信,程序通过channel进行发送消息
             Channel channel = connection.createChannel()) {
            //创建消息队列,第二个参数(durable):是否开启持久化,第三个参数exclusiove:是否允许当前这个创建消息队列的
            //连接操作消息队列 第四个参数:没有人使用队列,是否需要删除
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //发送消息
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者代码:

public class SingeConsumer {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //创建频道,提供通信
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //如何处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

 3.2.3:多消费开发生产者和消费者

场景:一个生产者给队列里面发了一条消息,多个消费者进行消费。适用于多个机器同时去接收并处理任务(每个机器处理任务有限)

队列持久化:

durable:

参数设置为true,服务器队列不丢失

 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

 消息持久化:

 指定MessageProperties.PERSISTENY_TEXT_PLAIN参数

    channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));

生产者端代码:

public class MultiProducer {
    private static final String TASK_QUEUE_NAME = "multi_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            Scanner scanner=new Scanner(System.in);
            while(scanner.hasNext()){
                String message = scanner.nextLine();
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }

        }
    }
}

消费者代码: 

在消费者代码中,如何测验一个消费者只能取一个任务,我们利用for循环来进行解决。

指定确认某条消息:

第一个参数:获取消息的信息

第二个参数:如果是true,把所有的历史消息全都确认了。如果为false,取出当前的消息。

   //第二个参数:是否一次性取所有的消息。如果为true,则要取所有的挤压在消息队列中的消息
   //如果为false,则为一次性取一个消息
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

指定拒绝某条消息

第一个参数:获取消息的信息

第二个参数:如果是true,则代表是否要拒绝所有的历史消息。

第三个参数:如果是false, 则代表失败的任务是否要重新入队。

  channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
public class MultiConsumer {
    private static final String TASK_QUEUE_NAME = "multi_queue";

    public static void main(String[] argv) throws Exception {
        //建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        for (int i = 0; i <= 2; i++) {
            final Channel channel = connection.createChannel();
            int finalI=i;
            //声明队列
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            //控制单个消费者的任务积压数:每个消费者最多处理一个任务,每个消费者智能处理一个任务
            channel.basicQos(1);
            //处理从队列中取的的消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                try {
                    //处理工作
                    System.out.println(" [x] Received '" +"编号:"+finalI+ message + "'");

                    //停20秒模拟一个机器处理工作能力有限
                    Thread.sleep(20000);
                    //第二个参数:是否一次性取所有的消息。如果为true,则要取所有的挤压在消息队列中的消息
                    //如果为false,则为一次性取一个消息
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            //开启消费监听
            channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });

        }

    }

}

3.3.3:交换机

一个生产者给多个队列发消息,一个生产者对多个队列。交换机:转发功能,怎么把消息转发到不同的队列上。

3.3.3.1:交换机的类别
a):fanout

场景:很适用于发布订阅的场景。

特点:消息会被转发到所有绑定到交换机的队列。

生产者代码:当生产者发送消息后,由交换机放到消息队列中,消费者从消息队列中取。

public class FonoutProducer {

        private static final String EXCHANGE_NAME = "1";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                //创建交换机
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                Scanner scanner=new Scanner(System.in);
                while(scanner.hasNext()){
                    String message = scanner.nextLine();
                    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                    System.out.println(" [x] Sent '" + message + "'");
                }

            }
        }

}

消费者代码:

public class FonoutConsumer {


    private static final String EXCHANGE_NAME = "1";
    public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            Channel channel2= connection.createChannel();
            //声明交换机
            //创建队列,随机分配一个队列名称
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String queueName="xiaowang";
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String queueName2="xiaoli";
            channel2.queueDeclare(queueName2,true,false,false,null);
            channel2.queueBind(queueName2,EXCHANGE_NAME,"");

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [小王] Received '" + message + "'");
            };
            DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [小李] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
            channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
        }
    }

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/106937.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【内网穿透】搭建我的世界Java版服务器,公网远程联机

目录 前言 1. 搭建我的世界服务器 1.1 服务器安装java环境 1.2 配置服务端 2. 测试局域网联机 3. 公网远程联机 3.1 安装cpolar内网穿透 3.1.1 windows系统 3.1.2 linux系统&#xff08;支持一键自动安装脚本&#xff09; 3.2 创建隧道映射内网端口 3.3 测试公网远程…

编程实例:眼镜店顾客档案管理系统软件,可以登记顾客信息查询历史记录,视力检查登记查询,配镜销售单开单打印

编程实例&#xff1a;眼镜店顾客档案管理系统软件&#xff0c;可以登记顾客信息查询历史记录&#xff0c;视力检查登记查询&#xff0c;配镜销售单开单打印 编程系统化课程总目录及明细&#xff0c;点击进入了解详情。 https://blog.csdn.net/qq_29129627/article/details/1340…

1-07 React配置postcss-px-to-viewport

React配置postcss-px-to-viewport 移动端适配 安装依赖&#xff1a;在项目根目录下运行以下命令安装所需的依赖包&#xff1a; npm install postcss-px-to-viewport --save-dev配置代码 const path require(path);module.exports {webpack: {alias: {: path.resolve(__di…

轻量封装WebGPU渲染系统示例<6>-混合模式(源码)

当前示例源码github地址: https://github.com/vilyLei/voxwebgpu/blob/version-1.01/src/voxgpu/sample/BlendTest.ts 此示例渲染系统实现的特性: 1. 用户态与系统态隔离。 2. 高频调用与低频调用隔离。 3. 面向用户的易用性封装。 4. 渲染数据和渲染机制分离。 5. 用户…

基于springboot零食商城管理系统

功能如图所示 摘要 这基于Spring Boot的零食商城管理系统提供了强大的购物车和订单管理功能。用户可以在系统中浏览零食产品&#xff0c;并将它们添加到购物车中。购物车可以保存用户的选购商品&#xff0c;允许随时查看已选择的商品和它们的数量。一旦用户满意&#xff0c;他们…

推荐一个高效测试用例工具:XMind2TestCase..

一、背景 软件测试的核心是什么&#xff1f;毫无疑问是测试分析和测试用例设计&#xff0c;也是日常测试投入最多时间的工作内容之一。 然而&#xff0c;传统的测试用例设计过程有很多痛点&#xff1a; 1、使用Excel表格进行测试用例设计&#xff0c;虽然成本低&#xff0c;但…

PHP与mysql数据库交互

PHP与mysql数据库交互 文章目录 PHP与mysql数据库交互方法速查建立与Mysql链接捕获连接错误SQL语句的执行SQL 错误SQL语句执行结果集对象方法速查 案例 方法速查 函数名 作用 mysqli_connect() 与MySQL 数据库建立连接。 mysqli_close() 关闭与MYSQL 数据库建…

终于找到一个很赞的相亲社交软件了,而且还是公众号java+vue

目前&#xff0c;相亲已经成为了时下的热门话题&#xff0c;越来越多的单身男女找不到心仪的另一半&#xff0c;忙碌的工作&#xff0c;空余时间很少。其次离开校园之后&#xff0c;圈子变小&#xff0c;也没有渠道认识到新的朋友&#xff0c;种种情况影响下&#xff0c;单身的…

OpenHarmony docker环境搭建所见的问题和解决

【摘要】OpenHarmony docker环境搭建需要一台安装Ubuntu的虚拟机&#xff0c;并且虚拟机中需要有VScode。 整个搭建流程请参考这篇博客&#xff1a;OpenHarmony docker环境搭建-云社区-华为云 (huaweicloud.com) 上篇博主是用Ubuntu的服务器进行环境搭建的&#xff0c;在使用VS…

2023年第四届MathorCup大数据竞赛(A题)|坑洼道路检测和识别|数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2021年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题。 希望这些想法对大家的做题有一定的启发和借鉴意义。 让我们来…

uniapp 中添加 vconsole

uniapp 中添加 vconsole 一、安装 vconsole npm i vconsole二、使用 vconsole 在项目的 main.js 文件中添加如下内容 // #ifdef H5 // 提交前需要注释 本地调试使用 import * as vconsole from "vconsole"; new vconsole() // 使用 vconsole // #endif三、成功

在Go项目中二次封装Kafka客户端功能

1.摘要 在上一章节中,我利用Docker快速搭建了一个Kafka服务,并测试成功Kafka生产者和消费者功能,本章内容尝试在Go项目中对Kafka服务进行封装调用, 实现从Kafka自动接收消息并消费。 在本文中使用了Kafka的一个高性能开源库Sarama, Sarama是一个遵循MIT许可协议的Apache Kafk…

禁止chrome浏览器更新方式

1、禁用更新服务 WinR调出运行&#xff0c;输入services.msc&#xff0c;进入服务。 在服务中有两个带有Google Update字样&#xff0c;双击打开后禁用&#xff0c;并把恢复选项设置为无操作。 2、删除计划任务 运行taskschd.msc&#xff0c;打开计划任务程序库&#xff0c;在…

FL Studio21水果编曲软件如何切换成官方中文版

FL studio又被国内网友称之为水果音乐制作软件21版本&#xff0c;是Image-Line公司成立23周年而发布的一个版本&#xff0c;FL studio中文版是目前互联网上最优秀的完整的软件音乐制作环境或数字音频工作站&#xff0c;FL Studio包含了编排&#xff0c;录制&#xff0c;编辑&am…

Leetcode刷题详解——点名

1. 题目链接&#xff1a;LCR 173. 点名 2. 题目描述&#xff1a; 某班级 n 位同学的学号为 0 ~ n-1。点名结果记录于升序数组 records。假定仅有一位同学缺席&#xff0c;请返回他的学号。 示例 1: 输入: records [0,1,2,3,5] 输出: 4示例 2: 输入: records [0, 1, 2, 3, 4,…

MATLAB中polyvalm函数用法

目录 语法 说明 示例 特征多项式的矩阵计算 polyvalm函数的功能是矩阵多项式计算。 语法 Y polyvalm(p,X) 说明 Y polyvalm(p,X) 以矩阵方式返回多项式 p 的计算值。此计算方式等同于使用多项式 p 替换矩阵 X。 示例 特征多项式的矩阵计算 求解 4 阶帕斯卡矩阵的特征…

Android-登录注册页面(第三次作业)

第三次作业 - 登录注册页面 题目要求 嵌套布局。使用线性布局的嵌套结构&#xff0c;实现登录注册的页面。&#xff08;例4-3&#xff09; 创建空的Activity 项目结构树如下图所示&#xff1a; 注意&#xff1a;MainActivity.java文件并为有任何操作&#xff0c;主要功能集中…

docker应用部署---Tomcat的部署配置

1. 搜索tomcat镜像 docker search tomcat2. 拉取tomcat镜像 docker pull tomcat3. 创建容器&#xff0c;设置端口映射、目录映射 # 在/root目录下创建tomcat目录用于存储tomcat数据信息 mkdir ~/tomcat cd ~/tomcatdocker run -id --namec_tomcat \ -p 8080:8080 \ -v $PWD:…

项目管理工具ConceptDraw PROJECT mac中文版自定义列功能

ConceptDraw PROJECT Mac是一款专业的项目管理工具&#xff0c;适用于MacOS平台。它提供了成功规划和执行项目所需的完整功能&#xff0c;包括任务和资源管理、报告和变更控制。 这款软件可以与ConceptDraw office集成&#xff0c;利用思维导图和数据可视化的强大功能来改进项目…