【RabbitMQ】消息分发、事务

消息分发

概念

RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经消费并且已经确认了该消息。这种方式是不大合理的。试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。

在工作模式一文中,书写RPC模式的代码时,已经写了一行代码channel.basicQos(1),来限制当前信道上的消费者所能保持的最大未确认消息的数量是1。所以,我们只需要使用此方法来限制每一个消费者的消息数量就可以避免上述情况发生。

比如,消费端调用了channel.basicQos(5),RabbitMQ就会为该消费者计数,发送一条消息计数加一,消费一条消息计数减一。当到达了设定的上限之后,RabbitMQ就不会再向该消费者发送消息了,知道消费者确认了某条消息之后,才会继续发送。

当channel.basicQos(int prefetchCount)中的形参个数为0时,表示的是没有上限。

应用场景

  1. 限流
  2. 非公平分发(负载均衡)

限流

在学习消息分发之前,当消息到达队列之后,如果有对应的消费者存在,那么队列就会一股脑把所有消息全部发送过去,从而造成瞬间压力,进而可能造成服务宕机,产生严重的影响。因此我们就要进行限流,限制消费者接收消息的数量。

限流通过设置prefetchCount参数,同时也必须要设置消息应答方式为手动应答。

spring:
  rabbitmq:
    host: 43.138.108.125
    port: 5672
    username: admin
    password: admin
    virtual-host: mq-springboot-test
    listener:
      simple:
        acknowledge-mode: manual # 消息确认机制为手动确认
        prefetch: 5 # 最多拉取5条消息
@Configuration
public class QosConfig {

    @Bean("qosQueue")
    public Queue qosQueue() {
        return QueueBuilder.durable(Constants.QOS_QUEUE).build();
    }

    @Bean("qosExchange")
    public Exchange qosExchange() {
        return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();
    }

    @Bean("qosQueueBind")
    public Binding qosQueueBind(@Qualifier("qosQueue") Queue queue, @Qualifier("qosExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();
    }

}
@RestController
@RequestMapping("/qos")
public class QosController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping
    public void qosQueue() {
        for (int i = 0; i < 10; i++) {
            this.rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "hello qos " + i);
            System.out.println("第" + i + "次发送消息成功!");
        }
    }

}
@Configuration
public class QosListener {

    @RabbitListener(queues = Constants.QOS_QUEUE)
    public void qosListener(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收的消息为:" + msg);
        // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

 

启动程序之后,可以看到出现如上结果,明显看到,我们发送了10条信息,但是由于限流的原因,当消费者接收了5条消息之后,并且没有去应答,因此程序就不再继续接收消息,而是等待这5条消息应答之后,才会去继续接收消息。

负载均衡

在有两个消费者的情况下,一个消费者处理任务非常快,一个消费者处理任务非常慢,就会造成一个消费者会一直很忙,而另一个消费者会很闲。这是因为RabbitMQ只是在消息进入队列时进行分派消息,他不考虑消费者未确认消息的数量。我们可以使用prefetch=1的方式来进行设置,告诉RabbitMQ一次只给一个消费者一条消息。在消费者处理并确认该消息之前,都不向其发送新的消息。这样做就可以使得有消息时,所有消费者都处理忙碌的状态。

实现负载均衡功能的代码和实现限流的代码类似,只需要将配置文件中的prefetch修改为1即可。

事务

RabbitMQ也实现了事务机制,允许开发者确保消息的接收和发送是原子性的,要么全部成功,要把全部失败。

@Component
public class RabbitTemplateConfig {

    @Bean("transactionRabbitTemplate")
    public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setChannelTransacted(true); // 开启事务
        return rabbitTemplate;
    }

}
@Configuration
public class TransactionConfig {

    @Bean("transactionQueue")
    public Queue transactionQueue() {
        return QueueBuilder.durable(Constants.TRANSACTION_QUEUE).build();
    }

    @Bean("transactionExchange")
    public Exchange transactionExchange() {
        return ExchangeBuilder.directExchange(Constants.TRANSACTION_EXCHANGE).durable(true).build();
    }

    @Bean("transactionQueueBind")
    public Binding transactionQueueBind(@Qualifier("transactionQueue") Queue queue,
                                       @Qualifier("transactionExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("transaction").noargs();
    }

}
@RestController
@RequestMapping("/transaction")
public class TransactionController {

    @Resource(name = "transactionRabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    @Transactional
    @RequestMapping
    public void transactionQueue() {
        System.out.println("发送成功");
         this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");
         int i = 1 / 0;
        this.rabbitTemplate.convertAndSend(Constants.TRANSACTION_EXCHANGE, "transaction", "hello transaction");
    }

}

RabbitMQ和Redis中的事务相对来说,都是比较简单的,并不和MySQL,包含那么多的性质。因此,在对事务的介绍中,并没有大幅度进行介绍。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/881684.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深度学习01-概述

深度学习是机器学习的一个子集。机器学习是实现人工智能的一种途径&#xff0c;而深度学习则是通过多层神经网络模拟人类大脑的方式进行学习和知识提取。 深度学习的关键特点&#xff1a; 1. 自动提取特征&#xff1a;与传统的机器学习方法不同&#xff0c;深度学习不需要手动…

【数据库】常用数据库简介

目录 &#x1f354; 常用的关系型数据库 &#x1f354; Mysql简介 &#x1f354; SQL 简介 SQL语句的分类 SQL 写法 SQL 常用的数据类型 &#x1f354; DDL语句 对数据库的操作 对数据表的操作 &#x1f354; DML语句 插入数据 insert into 修改数据 update 删除数…

python实现多个pdf文件合并

打印发票时&#xff0c;需要将pdf合并成一个&#xff0c;单页两张打印。网上一些pdf合并逐渐收费&#xff0c;这玩意儿都能收费&#xff1f;自己写一个脚本使用。 实现代码&#xff1a; 输入pdf文件夹路径data_dir&#xff0c;统计目录下的“合并后的PDF”文件夹下&#xff0c;…

linux重要文件

/etc/sysconfig/network-scripts/ifcfg-eth1 网卡重启 /etc/init.d/network restart ifup ethname & ifdown ethname /etc/resolv.conf 设置Linux本地的客户端DNS的配置文件 linux客户端DNS可以在网卡配置文件(/etc/sysconfig/network/ifcfg-eth0 DNS2)里配置 也可以在/et…

Java_Day04学习

类继承实例 package com.dx.test03; public class extendsTest {public static void main(String args[]) {// 实例化一个Cat对象&#xff0c;设置属性name和age&#xff0c;调用voice()和eat()方法&#xff0c;再打印出名字和年龄信息/********* begin *********/Cat cat ne…

Pandas -----------------------基础知识(一)

目录 Series对象 属性和方法 布尔值列表获取Series对象中部分数据 运算 DateFrame对象 常用属性 常见方法 运算 总结 Series对象 是DataFrame的列对象或者行对象 生成Series对象生成索引使用元组创建Series对象使用字典创建Series对象 通过Pandas创建对象 自定义索引 …

面试官问:你最自豪的成就是什么?

当面试官问你最自豪的成就是什么&#xff0c;我们首先分析面试官为什么这么问&#xff0c;他想通过这问题得到什么信息&#xff1f; 你最自豪的成就是什么&#xff1f; 其实反应了一个人的职业驱动力&#xff0c;比如我们常说的&#xff1a;上进心&#xff0c;主动积极性&…

【机器学习-监督学习】朴素贝叶斯

【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈Python机器学习 ⌋ ⌋ ⌋ 机器学习是一门人工智能的分支学科&#xff0c;通过算法和模型让计算机从数据中学习&#xff0c;进行模型训练和优化&#xff0c;做出预测、分类和决策支持。Python成为机器学习的首选语言&#xff0c;…

【小沐学GIS】基于Openstreetmap创建Sionna RT场景(Python)

文章目录 1、简介1.1 blender 2、下载和安装2.1 Python2.2 jupyter 3、运行结语 1、简介 1.1 blender https://www.blender.org/ Blender 是一款免费开源的3D创作套件。 使用 Blender&#xff0c;您可以创建3D可视化效果&#xff0c;例如静态图像、3D动画、VFX&#xff08;…

【UE5】将2D切片图渲染为体积纹理,最终实现使用RT实时绘制体积纹理【第一篇-原理】

如果想直接制作&#xff0c;请看【第二篇】内容 这次做一个这样的东西&#xff0c;通过在2DRT上实时绘制&#xff0c;生成动态的体积纹理&#xff0c;也就是可以runtime的VDB 设想的文章流程: 对原理进行学习制作体积渲染制作实时绘制 第一篇&#xff08;本篇&#xff09;是对“…

【Rust练习】16.模式

文章题目来自&#xff1a;https://practice-zh.course.rs/pattern-match/patterns.html 1 &#x1f31f;&#x1f31f; 使用 | 可以匹配多个值, 而使用 … 可以匹配一个闭区间的数值序列 fn main() {} fn match_number(n: i32) {match n {// 匹配一个单独的值1 > println!(…

【赵渝强老师】K8s中的Deployment控制器

K8s的Deployment将Pod部署成无状态的应用程序&#xff0c;它只关心Pod的数量、Pod更新方式、使用的镜像和资源限制等。由于是无状态的管理方式&#xff0c;因此Deployment中没有角色和顺序的概念&#xff0c;换句话说&#xff1a;Deployment中没有状态。   通过使用Deploymen…

【远程调用PythonAPI-flask】

文章目录 前言一、Pycharm创建flask项目1.创建虚拟环境2.创建flask项目 二、远程调用PythonAPI——SpringBoot项目集成1.修改PyCharm的host配置2.防火墙设置3.SpringBoot远程调用PythonAPI 前言 解决Pycharm运行Flask指定ip、端口更改无效的问题 首先先创建一个新的flask项目&…

C语言 | Leetcode C语言题解之第415题字符串相加

题目&#xff1a; 题解&#xff1a; char* addStrings(char* num1, char* num2) {int i strlen(num1) - 1, j strlen(num2) - 1, add 0;char* ans (char*)malloc(sizeof(char) * (fmax(i, j) 3));int len 0;while (i > 0 || j > 0 || add ! 0) {int x i > 0 ?…

Games101学习 - 着色

本文主要讲述Games101中的着色部分。 文中将使用UE的UTexture2D接口&#xff0c;若不了解可以看这篇&#xff1a; https://blog.csdn.net/grayrail/article/details/142165442 1.面积比计算三角形坐标 通过三角形面积比可以得到三角形的坐标alpha、beta、gamma从而进行插值&a…

ChatGPT 4o 使用指南 (9月更新)

首先基础知识还是要介绍得~ 一、模型知识&#xff1a; GPT-4o&#xff1a;最新的版本模型&#xff0c;支持视觉等多模态&#xff0c;OpenAI 文档中已经更新了 GPT-4o 的介绍&#xff1a;128k 上下文&#xff0c;训练截止 2023 年 10 月&#xff08;作为对比&#xff0c;GPT-4…

【Linux笔记】虚拟机内Linux内容复制到宿主机的Window文件夹(文件)中

一、共享文件夹 I、Windows宿主机上创建一个文件夹 目录&#xff1a;D:\Centos_iso\shared_files II、在VMware中设置共享文件夹 1、打开VMware Workstation 2、选择需要设置的Linux虚拟机&#xff0c;点击“编辑虚拟机设置”。 3、在“选项”标签页中&#xff0c;选择“共…

Vue学习记录之三(ref全家桶)

ref、reactive是在 setup() 声明组件内部状态用的&#xff0c; 这些变量通常都要 return 出去&#xff0c;除了供 < template > 或渲染函数渲染视图&#xff0c;也可以作为 props 或 emit 参数 在组件间传递。它们的值变更可触发页面渲染。 ref &#xff1a;是一个函数&…

前端组件库

vant2现在的地址 Vant 2 - Mobile UI Components built on Vue

【学习笔记】手写Tomcat 四

目录 一、Read 方法返回 -1 的问题 二、JDBC 优化 1. 创建配置文件 2. 创建工具类 3. 简化 JDBC 的步骤 三、修改密码 优化返回数据 创建修改密码的页面 注意 测试 四、优化响应动态资源 1. 创建 LoginServlet 类 2. 把登录功能的代码放到 LoginServlet 类 3. 创…