RabbitMQ手动应答与持久化

1.SleepUtil线程睡眠工具类

package com.hong.utils;

/**
 * @Description: 线程睡眠工具类
 * @Author: hong
 * @Date: 2023-12-16 23:10
 * @Version: 1.0
 **/
public class SleepUtil {
    public static void sleep(int second) {
        try {
            Thread.sleep(1000*second);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

2.消息生产者

package com.hong.rabbitmq3;

import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * @Description: 消息手动应答时不丢失,放回队列重新消费
 * @Author: hong
 * @Date: 2023-12-16 22:33
 * @Version: 1.0
 **/
public class Task3 {

    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入:");
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("消息发送完成------" + message);
        }
    }
}

3.两个消费者

模拟一个处理速度快(Worker3),另一个处理速度慢(Worker4)

3.1.处理时间短

package com.hong.rabbitmq3;

import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Description: 消息手动应答时不丢失,放回队列重新消费
 * @Author: hong
 * @Date: 2023-12-16 23:05
 * @Version: 1.0
 **/
public class Worker3 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        System.out.println("worker3等待接收消息,处理速度快");

        DeliverCallback deliverCallback = (comsumerTag, message) -> {
            SleepUtil.sleep(1);
            System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 第一个参数:消息标识
             * 第二个参数是否批量:true批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");
        //手动应答false
       channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}

3.2.处理时间长

package com.hong.rabbitmq3;

import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Description: 消息手动应答时不丢失, 放回队列重新消费
 * @Author: hong
 * @Date: 2023-12-16 23:05
 * @Version: 1.0
 **/
public class Worker4 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        System.out.println("worker4等待接收消息,处理速度慢");

        DeliverCallback deliverCallback = (comsumerTag, message) -> {
            SleepUtil.sleep(20);
            System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 第一个参数:消息标识
             * 第二个参数是否批量:true批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");
        
        //手动应答false
       channel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);
    }
}

4.结果

启动生产者后启动2个消费者,等消息bb接收到后,发送cc和dd
在这里插入图片描述
在这里插入图片描述
等Worker4接收到消息bb后将其关闭,发现原本该Worker4消费的消息dd并未丢失,重回队列被Worker3消费
在这里插入图片描述

5.持久化

5.1.队列持久化

package com.hong.rabbitmq4;

import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * @Description: 队列持久化
 * @Author: hong
 * @Date: 2023-12-17 22:52
 * @Version: 1.0
 **/
public class Task4 {
    public static final String TASK_QUEUE_NAME = "persist_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        //true持久化
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入:");
        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("消息发送完成------" + message);
        }
    }
}

5.2.消息持久化

package com.hong.rabbitmq4;

import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.util.Scanner;

/**
 * @Description: 队列持久化与消息持久化
 * @Author: hong
 * @Date: 2023-12-17 22:52
 * @Version: 1.0
 **/
public class Task4 {
    public static final String TASK_QUEUE_NAME = "persist_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        //队列持久化   true持久化
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入:");
        while (scanner.hasNext()){
            String message = scanner.next();
            //消息持久化  MessageProperties.PERSISTENT_TEXT_PLAIN
            channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
            System.out.println("消息发送完成------" + message);
        }
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/253091.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HashMap构造函数解析与应用场景

目录 1. HashMap简介 2. HashMap的构造函数 2.1 默认构造函数 2.2 指定初始容量和加载因子的构造函数 3. 构造函数参数的影响 3.1 初始容量的选择 3.2 加载因子的选择 4. 构造函数的应用场景 4.1 默认构造函数的应用场景 4.2 指定初始容量和加载因子的构造函数的应用…

海安行车记录仪avi杀病毒导致文件丢失的恢复案例

海安行车记录仪,听名字就知道是个小小小品牌,而且用的文件格式是比较古老的AVI,这种文件格式是微软设计的,后来并没有普及(不支持4G以上大文件而且结构过于松散)。这个恢复案例比较特殊的地方是不太清楚做过…

教师如何维护学生的自尊心

作为教师,我们不仅要传授知识,更要关心学生的身心健康,特别是他们的自尊心。自尊心是个人自我价值的重要体现,对学生的学习、生活和未来的发展都有深远的影响。因此,维护学生的自尊心是教师的重要责任。 教师要尊重每…

[Verilog] Verilog 操作符与表达式

主页: 元存储博客 文章目录 前言1. 操作符2. 操作数3 表达式总结 前言 1. 操作符 图片来源: https://www.runoob.com/ Verilog语言中使用的操作符包括: 算术操作符:加法()、减法(-)、乘法(*)、除法(/)、取模(%)、自增()、自减(–…

常用网安渗透工具及命令(扫目录、解密爆破、漏洞信息搜索)

目录 dirsearch: dirmap: 输入目标 文件读取 ciphey(很强的一个自动解密工具): john(破解密码): whatweb指纹识别: searchsploit: 例1: 例2: 例3&…

no module named ‘xxx‘

目录结构如下 我想在GCNmodel的model里引入layers的GraphConvolution:from GCNmodel.layers import GraphConvolution,但这样却报错no module named GCNmodel,而且用from layers import GraphConvolution也不行。然后用sys.path.appen(xxx)…

MySQL数据库,表的增量备份与恢复

1. 从物理与逻辑的角度 数据库备份可以分为物理备份和逻辑备份。物理备份是对数据库操作系统的物理文件(如数据 文件,日志文件等)的备份。这种类型的备份适用于在出现问题时需要快速恢复的大型重要数据库。 物理备份又可以分为冷备份&#xf…

Redis Cluster集群搭建 三主三从

Redis包下载 Linux: http://download.redis.io/releases/ Mac or Windows: https://redis.io/download/ 2.下载后解压进入文件夹(本次我的Redis版本是6.2.14版本) /redis/redis-6.2.14 开始安装 make instarll修改配置文件复制redis.conf 6…

Ubuntu 常用命令之 chmod 命令用法介绍

chmod是Linux系统下的一个命令,用于改变文件或目录的权限。它的名称是“change mode”的缩写。在Linux中,文件或目录的权限分为读(r)、写(w)和执行(x)三种,分别对应数字4…

Python redis安装使用教程

一、项目环境 Python 3.8.xredis-5.0.14 二、Redis 安装 下载地址:https://github.com/tporadowski/redis/releases 下载 Redis-x64-xxx.zip压缩包到你要安装的文件夹,解压即可 三、使用redis 打开一个 cmd 窗口,使用 cd 命令切换redis…

(5)shell命令以及Linux的权限

写在前面 本章我们将重点讲解 Linux 权限,这是 Linux 基础部分中非常重要的一部分。内容比较干,我会稍稍正经些去讲解。话不多说,我们直接切入正题。 shell 命令及运行原理 严格意义上说的是一个操作系统,我们称之为 —— &…

MDK编译过程和文件类型

MDK是一款IDE软件,具有,编辑,编译,链接,下载,调试等等的功能。 1.编译器介绍: MDK可以编译C/C文件和汇编文件,MDK只是一款IDE软件,那他内部使用的是什么编译器呢&#x…

【已解决】Java zip解压时候 malformed input off : 4, length : 1

需求:通过页面上传ZIP文件后,对zip文件进行解压。 遇到的错误:在进行zip解压的时候错误如下: 先看报错前的: /*** 解压缩ZIP文件* param zipFile ZIP文件* param destDir 目标路径*/ public static void zipDecompre…

HIVE窗口函数

什么是窗口函数 hive中开窗函数通过over关键字声明;窗口函数,准确地说,函数在窗口中的应用;比如sum函数不仅可在group by后聚合,在可在窗口中应用; hive中groupby算子和开窗over,shuffle的逻辑…

时序数据库选型TimescaleDB

最近要做一个数字车间的物联网项目,数据存储成了首先要解决的问题,整个车间一共104台数控机床,1s钟采集1次数据,360024365*1043,279,744,000 ,一年要产生32亿条记录,这个数据量用常见的关系型数据库肯定是不…

phpMyAdmin的常见安装位置

nginx的日志显示有人一直在尝试访问phpMyAdmin的setup.php,用了各种位置。 其实我只有一个nginx,别的什么也没有。 47.99.136.156 - - [01:44:37 0800] "GET http://abc.com:80/phpMyAdmin/scripts/setup.php HTTP/1.0" 404 162 "-"…

新建vue3项目

三种方法 一. 第一种方式 1、操作步骤: 创建项目目录 vue create 项目名称选择配置方式 ? Please pick a preset: #选择一个配置 Default ([Vue 3] babel, eslint)Default ([Vue 2] babel, eslint)Manually select …

wordpress安装之正式开始安装wordpress

1、拉取wordpress镜像 docker pull wordpress 2、启动容器 启动容器,设置容器名为wordpress2并把80端口映射到宿主机的9988端口 docker run -it --name wordpress2 -p 9988:80 -d wordpress 3、查看容器状态 docker ps 4、安装wordpress博客程序 因为我们前面启…

「斗破年番」小医仙黑皇城遭调戏,五品丹换药材,获取菩提涎消息

Hello,小伙伴们,我是拾荒君。 《斗破苍穹年番》的第75集已经更新了,喜欢这部国漫的小伙伴应该都去观看了吧,拾荒君也是看了看这一集。在这一集中,萧炎成功地帮助吴昊等人摆脱了鹰爪老人的围困,然后便前往了黑皇城。 黑…

【JAVA-Day65】Java内部类深度解析

Java内部类深度解析 《Java内部类深度解析》摘要引言一、理解内部类1. 内部类的基本概念和语法1.1 什么是内部类?1.2 内部类的语法结构1.3 内部类的基本概念 2. 不同类型的内部类详解2.1 成员内部类2.2 静态内部类2.3 局部内部类2.4 匿名内部类 二、内部类与普通类的…