【初始RabbitMQ】工作队列的实现

工作队列

工作队列(又称为任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务

轮训分发消息

我们启动两个工作线程,一个消息发送线程,一个用来接受线程,我们来看看它们两个工作线程是如何工作的

抽取工具类

我们将获取信道这个重复的代码封装为一个类,当时用的时候直接调用

/**
 * 连接工厂创建信道工具类
 */
public class RabbitMqUtils {
    public static Channel getChannel(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("118.31.6.132");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = null;
        try {
            try {
                connection = factory.newConnection();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        Channel channel = null;
        try {
            channel = connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return channel;
    }
}

这里使用try...catch防止之后每次调用都需要抛出异常

生产者代码

/**
 * 生产者 发送大量消息
 */
public class Task01 {
    //队列名称
    public static final String QUEUE_NAME = "hello";
    //发送大量的消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        /*
         *生成一个队列
         * 1.队列名称
         * 2.队列里面的信息是否持久化(磁盘)默认情况时在内存
         * 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许
         * 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除
         * 5.其他参数 延迟消息等
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台接受消息
        Scanner scanner = new Scanner(System.in);
        /**
         * 发送一个消息
         * 1.发送到那个交换机
         * 2.路由的KEY值是哪个 本次是队列的名称
         * 3.其他参数信息
         * 4.发送消息的消息体
         */
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("发送"+message+"完成");
        }
    }
}

消费者代码

/**
 * 这是一个工作线程(消费者)
 */
public class Worker01 {
    //队列名称
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //消息接受
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        };
        //消息接受被取消时
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消息取消消费接口回调");
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者微车才能更改消费的回调
         * 4.消费者取消消费回调
         */
        System.out.println("C2等待接受消息......................");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

启动两个工作线程

在运行之前我们要修改一个选项,这样我们就不需要重复的写消费者2了

启动一个发送线程 

启动程序,用生产者发送四条消息,

结果分析

通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且 是按照有序的一个接收一次消息

 消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

消息应答的方法

  1. Channel.basicAck(用于肯定确认)
    1. RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  2. Channel.basicNack(用于否定确认)
  3. Channel.basicReject(用于否定确认)
    1. 与 Channel.basicNack 相比少一个参数(批量应答参数) 不处理该消息了直接拒绝,可以将其丢弃了

Multiple(批量应答)的解释

手动应答的一个好处就是可以批量应答并且减少网络拥堵

multiple 的 true 和 false 代表不同意思:

true:代表批量应答channel 上未应答的消息,比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答

false:同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息

消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改 为手动应答,消费者在上面代码的基础上增加下面画红色部分代码

睡眠工具类:

public class SleepUtils {
    public static void sleep(int second){
        try {
            Thread.sleep(1000*second);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

生产者:

/*
* 消息再手动应答不丢失、放回消息队列重新消费
 */
public class Task2 {
    //队列名称
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明队列
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        //从控制台中输入信息
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发出消息:"+message);
        }
    }
}

消费者01:

public class Work01 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息短");
        //消息消费的时候如何处置消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:"+message);
            /**
             * 1.消息标记tag
             * 2.是否批量应答未应答的消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        //取消消息的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者微车才能更改消费的回调
         * 4.消费者取消消费回调
         */
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

消费者02:

public class Work02 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息长");
        //消息消费的时候如何处置消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(30);
            System.out.println("接收到消息:"+message);
            /**
             * 1.消息标记tag
             * 2.是否批量应答未应答的消息
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        //取消消息的回调
        CancelCallback cancelCallback = consumerTag->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者信息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
         * 3.消费者微车才能更改消费的回调
         * 4.消费者取消消费回调
         */
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

手动应答效果演示

在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是 由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了, 此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/391453.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

电脑屏幕录制工具 Top10 榜单,免费无水印方法集

随着媒体行业的突飞猛进,不同服务之间对有效屏幕录制的竞争日益激烈。这导致市场上出现了质量参差不齐的屏幕录像机。特别是有些录屏器会自动给你录制的视频加上水印,给需要在公共场合使用的人留下不专业的印象。除此之外,它们甚至不能保护您…

【Google Bard】免费生成图像——功能和使用方法详解

Google Bard 关于Bard 图片生成功能打开Bard通过Bard来生成图片Bard Vs Bing Vs Dall-EBard的生成结果Bing的生成结果Dall-E 的生成结果 总结 关于Bard 图片生成功能 Google在2月1日(当地时间)宣布,其对话型AI“Bard”新增了图像生成功能。 …

Mysql——update更新数据的方式

注:文章参考: MySQL 更新数据 不同条件(批量)更新不同值_update批量更新同一列不同值-CSDN博客文章浏览阅读2w次,点赞20次,收藏70次。一般在更新时会遇到以下场景:1.全部更新;2.根据条件更新字段中的某部分…

MATLAB离线文档安装

MATLAB离线文档安装 来源于最全matlab安装离线文档教程只是对内容进行了精简,同时更方便查找 一、下载离线文档 我上传的2023b离线文档 提供本体属于违规行为,本体下载链接已删除 为方便已安装好软件的朋友想安装离线帮助文档,由于官网下载…

模型 IPO(输入、处理、输出)学习模型

系列文章 分享 模型,了解更多👉 模型_总纲目录。重在提升认知。信息转化与传递。 1 模型 IPO(输入、处理、输出)学习模型的应用 1.1 项目管理知识体系 PMBOK 中的IPO应用 在项目管理领域,PMBOK(Project Management Body of Know…

究极小白如何自己搭建一个自动发卡网站-独角数卡

本人从来没接触过建站,我之前都是在TB上花90叫别人给我搭建的网站,前几天这个TB店倒闭跑路了,而我的发卡网也打不开了,没办法,逼上梁山,自己捣鼓出来了!下面是2023/4/2自己建好的! …

STM32F1 - 系统时钟SysTick

SysTick 1> SysTick硬件框图2> SysTick的时钟源3> 1ms定时_中断方式4> 思考:无符号数 0 - 255 ?相关资料 1> SysTick硬件框图 SysTick属于Cotex-M3,是CPU外设; SysTick: 位宽24bit, 递减计数,自动重装…

《Go 简易速速上手小册》第2章:控制结构与函数(2024 最新版)

文章目录 2.1 条件语句:决策的艺术2.1.1 基础知识讲解2.1.2 重点案例:用户角色权限判断实现用户角色权限判断扩展功能实现代码功能扩展:添加或删除用户 2.1.3 拓展案例 1:成绩等级判断实现成绩等级判断功能实现代码扩展功能&#…

【开源图床】使用Typora+PicGo+Github+CDN搭建个人博客图床

准备工作: 首先电脑得提前完成安装如下: 1. nodejs环境(node ,npm):【安装指南】nodejs下载、安装与配置详细教程 2. Picgo:【安装指南】图床神器之Picgo下载、安装与配置详细教程 3. Typora:【安装指南】markdown神器之Typora下载、安装与无限使用详细教…

canal监听binlog记录业务数据的变更;canalAdmin对instance做web配置

概述 平时在开发中会通过logback打印一些开发日志,有时也会需要记录一些业务日志,简单的就直接用log记录一下,但是系统中需要记录日志的地方越来越多时,不能每个地方都写一套log记录; 由于平常用的大多都是mysql&…

Linux进程间通信(三)-----System V消息队列

消息队列的概念及原理 消息队列实际上就是在系统当中创建了一个队列,队列当中的每个成员都是一个数据块,这些数据块都由类型和信息两部分构成,两个互相通信的进程通过某种方式看到同一个消息队列,这两个进程向对方发数据时&#x…

【C++ QT项目2】——高仿安信可串口调试助手

【C QT项目2】——高仿安信可串口调试助手 1. 项目概述2. 项目UI设计3. 串口通信核心代码开发3.1 QSerialPort介绍及示例3.2 扫描系统串口3.3 数据的收发3.4 定时发送(QT定时器)3.5 HEX显示与发送 4. 串口调试助手功能的优化4.1 串口的实时扫描4.2 获取系…

2024.2.15 模拟实现 RabbitMQ —— 消息持久化

目录 引言 约定存储方式 消息序列化 重点理解 针对 MessageFileManager 单元测试 小结 统一硬盘操作​​​​​​​ 引言 问题: 关于 Message(消息)为啥在硬盘上存储? 回答: 消息操作并不涉及到复杂的增删查改消…

对比@Resource和@Autowired

版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl Resource和Autowired概述 在Java的Spring框架中,Resource和Autowired都是用于实现依赖注入(Dependency Injection, DI)的重要注解。依赖…

【机器学习笔记】11 支持向量机

支 持 向 量 机 ( Support Vector Machine,SVM ) 支 持 向 量 机 是 一 类 按 监 督 学 习 ( supervisedlearning)方式对数据进行二元分类的广义线性分类器(generalized linear classifier),其…

Python教程(26)——Python迭代器和生成器详解

迭代器 Python中的迭代器是一种对象,它可以迭代(遍历)一个可迭代对象(比如列表、元组或字符串)的元素。迭代器用于实现迭代器协议,即包含 __iter__() 方法和 __next__() 方法。 迭代器的工作原理是每次调…

大模型- 检索增强七宗罪

前言 地址:https://arxiv.org/pdf/2401.05856.pdf 标题:Seven Failure Points When Engineering a Retrieval Augmented Generation System 这篇论文介绍了如何设计一个检索增强生成系统(RAG),作者通过对三个不同领域…

人工智能专题:通过AI转变保险(英译中)

今天分享的是人工智能系列深度研究报告:《人工智能专题:通过AI转变保险(英译中)》。 (报告出品方:VIEWPOINT) 在新时代释放数据的力量 在数据和人工智能 ( AI ) 融合的…

Mac配置Python3最简单的方法

此文介绍Mac用Anaconda配置Python3 达成效果 能让你目前只装有Python2的Mac装上Python3,同时拥有很多科学计算库 anaconda介绍 anaconda 是一个python的发行版,包括了python和很多常见的软件库, 和一个包管理器conda。常见的科学计算类的库都包含在里…

Java集合篇之深入解析ArrayList,这六问你答的上来吗?

写在开头 开年第一篇,先祝各位新的一年身体健康,学业有成,事业有成哈,春节期间就是咔咔乱吃,咔咔乱玩,把学习都抛一边子去了,已经9天没有学习了,深深的懊悔,从今天开始&…