【Rabbitmq篇】高级特性----事务,消息分发

目录

事务

消息分发 

应用场景

 1. 限流

2.负载均衡 

事务

RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制.SpringAMQP也提供了对事务相关的操作.RabbitMQ事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败.

何为原子性(面试重点)?

例如: 当A向B转账1000元,会经历俩个步骤

1.A 向 B 转账 1000元 A的账号将会减去1000元

2.B将会收到1000元 B的账号将会增加1000元

可是,如果遇到极端情况,当A向B转账1000元时,A-1000元已完成,这个时候系统出现故障,导致A-1000 但是B却没有接收到 那么1000元将无缘无故丢失了 ,肯定不会允许这种事情发生,不然谁还敢转账。

此时就是将1操作和2操作绑定在一起,要么同时完成,要么一个都不执行

当出现1执行失败的时候,将1操作进行“回滚”,回到原来的状态,就当一切都没发生过

接下来实现rabbitmq的事务

声明队列:

    //事务
    public static final String TRANS_QUEUE = "trans_queue";

    @Bean("transQueue")
    public Queue transQueue() {
        return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
    }

 配置事务管理器:

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
    @Bean("transRabbitTemple")
    public RabbitTemplate transRabbitTemple(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //开启事务
        rabbitTemplate.setChannelTransacted(true);
        return  rabbitTemplate;
    }

生产者代码编写:

    @RequestMapping("/trans")
    public String trans() {
        System.out.println("trans test...");
        transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 1...");
        int num = 5/0;
        transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 2...");
        return "消息发送成功";
    }

测试:

1)不带 @Transactional 带异常的发送 看看会发生什么?

 

此时只有发送的第一条消息,紧接着发生了异常导致第二条消息未发送成功  

 

2) 带 @Transactional 带异常的发送 看看会发生什么? 

    @Transactional
    @RequestMapping("/trans")
    public String trans() {
        System.out.println("trans test...");
        transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 1...");
        int num = 5/0;
        transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 2...");
        return "消息发送成功";
    }

 

此时发生异常 本来发送了一条消息 但有异常,进行了回滚,当做没发生

也证明了我们事务的可靠性 

 3)带 @Transactional 不带异常的发送 看看会发生什么?

    @Transactional
    @RequestMapping("/trans")
    public String trans() {
        System.out.println("trans test...");
        transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 1...");
//        int num = 5/0;
        transRabbitTemplate1.convertAndSend(Constants.TRANS_EXCHANGE, "trans", "trans test 2...");
        return "消息发送成功";
    }

 此结果一切正常


消息分发 

RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者.每条消息只会发送给订阅列表里的⼀个消费者.这种方式⾮常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可.


默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经消费并已经确认了消息.这种方式是不太合理的,试想⼀下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降.

这样A都做完了10个任务,B还在写第一个任务,这样将会大大影响效率,从而导致整个的效率下降

如何处理呢我们可以使用前面章节讲到的channel.basicQos(intprefetchCount)方法,来限制当前信道上的消费者所能保持的最大未确认消息的数量

比如:消费端调用了channelbasicQos(1),

此时A接收1条信息,并且消费1条 B同时也接收1条信息 但是它效率比较慢 所有它还在消费 而A处理完1条消息又接着处理第二条消息,属于多劳多得,并不会因为B影响整体的效率

应用场景

 1. 限流

如下使用场景:
订单系统每秒最多处理5000请求,正常情况下,订单系统可以正常满足需求
但是在秒杀时间点,请求瞬间增多,每秒1万个请求,如果这些请求全部通过MQ发送到订单系统,无疑会把订单系统压垮.

RabbitMQ提供了限流机制,可以控制消费端⼀次只拉取N个请求
通过设置prefetchCount参数,同时也必须要设置消息应答方式为手动应答
prefetchCount:控制消费者从队列中预取(prefetch)消息的数量,以此来实现流控制和负载均衡.

1) 配置prefetch参数,设置应答方式为手动应答 

 2) 配置交换机,队列

package com.bite.extensions.config;

import com.bite.extensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QosConfig {

    @Bean("qosQueue")
    public Queue qosQueue() {
        return QueueBuilder.durable(Constants.QOS_QUEUE).build();
    }
    @Bean("qosExchange")
    public DirectExchange qosExchange() {
        return  ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();
    }
    @Bean("qosBinding")
    public Binding qosBinding(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("qos");
    }
}

3) 生产者

    @RequestMapping("/qos")
    public String qos() {
        System.out.println("qos test...");
        for (int i = 0; i < 15; i++) {
            rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test i..."+i);
        }
        return "消息发送成功";
    }

4)消费者

package com.bite.extensions.listener;

import com.bite.extensions.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class QosListener {
    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void handleMessage(Message message, Channel channel) throws Exception {
        //消费者逻辑
        long deliverTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.printf("[qos.queue]接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
/*            //业务逻辑处理
            System.out.println("业务逻辑处理!");
            //肯定确认
            channel.basicAck(deliverTag,false);*/
        } catch (Exception e) {
            //否定确认
            channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列
        }
    }
}

 5)测试1 未设置肯定确认情况

此时将会只接收到5条,并且会阻塞住,达到一个限流的状态

测试2

把 prefetch: 5 注掉 再观看结果

此时将会一次性把队列的消息全部发送,并且全部消费

2.负载均衡 

如下图,在有两个消费者的情况下,⼀个消费者处理任务非常快,另⼀个非常慢,就会造成⼀个消费者会⼀直很忙,而另⼀个消费者很闲.这是因为RabbitMQ只是在消息进入队列时分派消息.它不考虑消费者未确认消息的数量.

我们可以使用设置prefetch=1的⽅式,告诉RabbitMQ⼀次只给⼀个消费者⼀条消息,也就是说,在处理并确认前⼀条消息之前,不要向该消费者发送新消息.相反,它会将它分派给下⼀个不忙的消费者. 

消费者: 

package com.bite.extensions.listener;

import com.bite.extensions.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class QosListener {
    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void handleMessage(Message message, Channel channel) throws Exception {
        //消费者逻辑
        long deliverTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.printf("第一个消费者 接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
            Thread.sleep(3000);
            channel.basicAck(deliverTag,false);
        } catch (Exception e) {
            //否定确认
            channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列
        }
    }
    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void handleMessage2(Message message, Channel channel) throws Exception {
        //消费者逻辑
        long deliverTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.printf("第二个消费者 接收到信息: %s, deliveryTag: %d\n",new String(message.getBody(),"UTF-8"),deliverTag);
            Thread.sleep(1000);
            channel.basicAck(deliverTag,false);
        } catch (Exception e) {
            //否定确认
            channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列
        }
    }
}

 

 结果:

这里可以看出每个消费者以不同的速度完成某项任务 以防止一个消费者未完成等很久的情况


结语: 写博客不仅仅是为了分享学习经历,同时这也有利于我巩固知识点,总结该知识点,由于作者水平有限,对文章有任何问题的还请指出,接受大家的批评,让我改进。同时也希望读者们不吝啬你们的点赞+收藏+关注,你们的鼓励是我创作的最大动力!  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/924702.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

优先算法 —— 双指针系列 - 有效三角形的个数

1. 有效三角形的个数 题目链接&#xff1a; 611. 有效三角形的个数 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/valid-triangle-number/description/ 2. 题目解析 以示例1为例&#xff1a; 3. 优化 我们都知道&#xff0c;判断三角形的方法就是两边相…

【H2O2|全栈】Node.js(2)

目录 前言 开篇语 准备工作 npm 概念 常见指令 项目中的包 创建项目 启动项目 服务器搭建 express 基本步骤 搭建应用 创建路由 监听端口 启动服务器 面试相关 结束语 前言 开篇语 本系列博客分享Node.js的相关知识点&#xff0c;本章讲解npm与服务器的简单…

Android 13 Aosp 默认允许应用动态权限

图库 frameworks/base/services/core/java/com/android/server/pm/permission/DefaultPermissionGrantPolicy.java 修改 public void grantDefaultPermissions(int userId) {DelayingPackageManagerCache pm new DelayingPackageManagerCache();grantPermissionsToSysCompon…

【NLP高频面题 - LLM架构篇】LLM对Transformer都有哪些优化?

【NLP高频面题 - LLM架构篇】LLM对Transformer都有哪些优化&#xff1f; ⚠︎ 重要性&#xff1a;★★★ &#x1f4af; NLP Github 项目&#xff1a; NLP 项目实践&#xff1a;fasterai/nlp-project-practice 介绍&#xff1a;该仓库围绕着 NLP 任务模型的设计、训练、优化、…

DAY139权限提升-Linux系统权限提升篇Vulnhub辅助项目SUID权限SUDO指令版本漏洞

Linux提权 1、内核溢出提权 2、suid、sudo、nfs、path、ld_preload、cron、lxd、capability、rbash等 3、数据库类型提权 Linux&#xff1a; 系统用户&#xff1a;UID(0-999) 普通用户&#xff1a;UID(1000-*) root用户&#xff1a;UID为0&#xff0c;拥有系统的完全控制…

notepad++文件github下载

1、github下载网址&#xff1a;Releases notepad-plus-plus/notepad-plus-plus GitHub 2、找到操作系统支持的软件&#xff1a; 3、CSDN下载链接&#xff1a;https://download.csdn.net/download/u013083576/90046203

无人机应用板卡详解!

一、核心技术 无人机板卡的核心技术主要包括但不限于以下几种&#xff1a; 通信技术&#xff1a;无人机板卡通常集成了各种通信技术&#xff0c;如无线电通信、卫星通信等&#xff0c;以实现远程控制和数据传输。这些技术确保了无人机能够在复杂环境中保持稳定的通信连接。 …

分布式链路追踪系统

系统现状及需要解决的问题 系统异常无法接收告警 系统总会有这样或者那样的问题&#xff0c;同样的现象可能是不同的系统问题引起的&#xff0c;解决这些问题是研发的基本职责之一。 但是解决问题的前提是发现问题&#xff0c;系统告警就是我们发现感知问题的重要的手段&…

qt音频实战

一、Qt音频基础知识 1、QT multimedia 2、QMediaPlayer类&#xff1a;媒体播放器&#xff0c;主要用于播放歌曲、网络收音机等功能。 3、QMediaPlaylist类&#xff1a;专用于播放媒体内容的列表。 二、界面设计 三、代码 #include "mainwindow.h" #include "…

【Linux】剧幕中的灵魂更迭:探索Shell下的程序替换

&#x1f3ac; 个人主页&#xff1a;谁在夜里看海. &#x1f4d6; 个人专栏&#xff1a;《C系列》《Linux系列》《算法系列》 ⛰️ 一念既出&#xff0c;万山无阻 目录 &#x1f4d6;一、进程程序替换 1.替换的演示 ❓替换与执行流 ❓程序替换≠进程替换 2.替换的原理 …

DIY-Tomcat项目 part 1 实现和测试Request以及Response

实现Request package Webserver.src.connector;import java.io.IOException; import java.io.InputStream;/* GET /index.html HTTP/1.1Host: localhost:8888Connection: keep-aliveCache-Control: max-age0Upgrade-Insecure-Requests: 1User-Agent: Mozilla/5.0 */public cla…

使用IDEA编写测试用例,复杂度校验

最近我们公司要求开发人员必须写测试用例&#xff0c;组织了TDD培训&#xff0c;测试驱动开发&#xff0c;同时衡量代码的圈复杂度&#xff0c;我记录下初次使用的过程。 编写测试用例&#xff0c;查看用例覆盖度 1、要编写测试用例&#xff0c;并看下测试用例的覆盖度&#…

Linux——用户级缓存区及模拟实现fopen、fweite、fclose

linux基础io重定向-CSDN博客 文章目录 目录 文章目录 什么是缓冲区 为什么要有缓冲区 二、编写自己的fopen、fwrite、fclose 1.引入函数 2、引入FILE 3.模拟封装 1、fopen 2、fwrite 3、fclose 4、fflush 总结 前言 用快递站讲述缓冲区 收件区&#xff08;类比输…

python学opencv|读取图像

【1】引言 前序学习了使用matplotlib模块进行画图&#xff0c;今天开始我们逐步尝试探索使用opencv来处理图片。 【2】学习资源 官网的学习链接如下&#xff1a; OpenCV: Getting Started with Images 不过读起来是英文版&#xff0c;可能略有难度&#xff0c;所以另推荐一…

多模态大模型打造沉浸式社交体验,Soul App创始人张璐团队海外首秀GITEX GLOBAL

2024年10月14日至18日,全球科技盛会GITEX GLOBAL在迪拜举办,各大科技企业汇聚一堂,展示前沿技术。在这次大会上,中国社交平台Soul App首次亮相国际大型展会,展示了由Soul App创始人张璐团队研发的多模态AI交互方案,吸引了海外来宾的目光。作为国内较早将AI引入社交关系的社交平…

Android 实现悬浮球的功能

Android 实现悬浮球的功能 在 Android 中&#xff0c;实现悬浮球可以通过以下方式实现&#xff0c;常见的方法是使用 WindowManager 创建一个悬浮窗口。以下是具体的实现步骤&#xff1a; 1. 配置权限 在 AndroidManifest.xml 中添加悬浮窗权限&#xff1a; <uses-permis…

Python与Amazon DynamoDB:构建高效爬虫数据存储解决方案

欢迎访问个人博客地址&#xff1a;https://blog.jiumoz.top/archives/pythonyu-amazon-dynamodb-gou-jian-gao-xiao-pa-chong-shu-ju-cun-chu-jie-jue-fang-an 1. 引言 1.1. 爬虫与NoSQL Python爬虫是一种通过模拟浏览器行为&#xff0c;从互联网上自动抓取数据的工具。它利…

Git 进程占用报错-解决方案

背景 大仓库&#xff0c;由于开发者分支较多&#xff0c;我们在使用 git pull 或 git push 等命令时&#xff08;与远端仓库交互的命令&#xff09;&#xff0c;不知之前配置了什么&#xff0c;我的电脑会必现以下报错&#xff08;有非常长一大串报错-不同分支的git进程占用报…

STM32完全学习——使用标准库完成PWM输出

一、TIM2初始化 我这里使用的是STM32F407ZGT6这个芯片&#xff0c;我这里使用的是定时器TIM2来完成PWM输出&#xff0c;由于这里没有使用中断&#xff0c;因此不需要初始化NVIC&#xff0c;下面先来进行定时器的相关初始化 TIM_TimeBaseInitTypeDef TIM_TimeBaseInitStruct;R…

【爬虫框架:feapder,管理系统 feaplat】

github&#xff1a;https://github.com/Boris-code/feapder 爬虫管理系统 feaplat&#xff1a;http://feapder.com/#/feapder_platform/feaplat 爬虫在线工具库 &#xff1a;http://www.spidertools.cn &#xff1a;https://www.kgtools.cn/1、feapder 简介 对于学习 Python…