RabbitMQ中的Work Queues模式

在现代分布式系统中,消息队列(Message Queue)是实现异步通信和解耦系统的关键组件之一。RabbitMQ 是一个广泛使用的开源消息代理软件,支持多种消息传递模式。其中,Work Queues(工作队列)模式是一种常见的模式,用于在多个消费者之间分配任务,从而实现负载均衡和提高系统的处理能力。下面将详细介绍 RabbitMQ 中的 Work Queues 模式。

1. 什么是 Work Queues 模式?

Work Queues 模式(也称为任务队列模式)是一种消息传递模式,用于在多个消费者之间分配任务。在这种模式下,生产者将任务(消息)发送到队列中,多个消费者从队列中获取任务并进行处理。每个任务只会被一个消费者处理,从而实现负载均衡。

在这里插入图片描述

Work Queues 模式的主要优点包括:

  1. 负载均衡:多个消费者可以并行处理任务,从而提高系统的处理能力。
  2. 解耦:生产者和消费者之间通过队列进行通信,彼此之间不需要直接交互。
  3. 异步处理:任务可以异步处理,生产者不需要等待任务完成即可继续执行其他操作。

2. Work Queues 模式的工作原理

2.1 生产者(Producer)

生产者负责将任务(消息)发送到队列中。生产者不需要知道有多少消费者会处理这些任务,只需要将任务发送到队列即可。

2.2 队列(Queue)

队列是消息的缓冲区,用于存储生产者发送的任务。队列可以有多个消费者,但每个任务只会被一个消费者处理。

2.3 消费者(Consumer)

消费者从队列中获取任务并进行处理。多个消费者可以并行处理任务,从而实现负载均衡。消费者可以是独立的进程、线程或服务。

2.4 消息确认(Message Acknowledgment)

为了确保任务能够可靠地处理,RabbitMQ 提供了消息确认机制。消费者在处理完任务后,需要向 RabbitMQ 发送确认消息,告知任务已经处理完成。如果消费者在处理任务时崩溃,RabbitMQ 会将未确认的任务重新分配给其他消费者。

2.5 公平分发(Fair Dispatch)

默认情况下,RabbitMQ 会按顺序将任务分发给消费者。然而,如果某些消费者处理任务的速度较慢,可能会导致任务堆积。为了避免这种情况,可以使用 basicQos 方法设置预取计数(prefetch count),限制每个消费者一次可以获取的任务数量,从而实现更公平的分发。

3. 环境准备

在开始之前,确保你已经安装了以下环境:

  • Java 开发环境(JDK 8 或更高版本)
  • RabbitMQ 服务器(已启动并运行)
  • Maven(用于管理依赖)

3.1 添加依赖

首先,在你的项目中添加 RabbitMQ 客户端库的依赖。如果你使用的是 Maven,可以在 pom.xml 中添加以下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

4. 代码案例

4.1 生产者代码

生产者负责将任务发送到队列。以下是一个简单的生产者代码示例:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private static final String QUEUE_NAME = "work_queues";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.138");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            //4.将消息发送到队列
            for (int i = 1; i <= 10; i++) {
                String message = "Task to be processed, " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}

代码解释:

  • ConnectionFactory: 用于创建与 RabbitMQ 服务器的连接。
  • Connection: 表示与 RabbitMQ 服务器的物理连接。
  • Channel: 表示与 RabbitMQ 服务器的逻辑连接,用于发送和接收消息。
  • queueDeclare: 声明一个队列,true 表示队列是持久的。
  • basicPublish: 将消息发布到队列,使用默认交换机(空字符串)。

4.2 消费者代码

消费者负责从队列中获取任务并处理。以下是一个简单的消费者代码示例:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    private static final String QUEUE_NAME = "work_queues";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.138");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test");
        factory.setPassword("test");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1); // 每次只处理一个消息

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");

            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                //手动ack,因为不同的机器处理速度不一样,因此不同的机器会在不同时间应答,这样机器就可以根据实际能力处理了
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };

        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

代码解释:

  • basicQos(1): 设置每次只处理一个消息,确保任务的公平分配。
  • DeliverCallback: 定义消息处理逻辑,doWork 方法模拟任务处理过程。
  • basicAck: 确认消息处理完成,从队列中移除消息。

5. 运行示例

  1. 启动 RabbitMQ 服务器:确保 RabbitMQ 服务器已启动并运行。
  2. 运行多个消费者:启动多个消费者实例,确保它们连接到同一个队列。
  3. 运行生产者:启动生产者实例,发送任务到队列。

示例输出:

  • 生产者输出

     [x] Sent 'Task to be processed, 1'
     [x] Sent 'Task to be processed, 2'
     [x] Sent 'Task to be processed, 3'
     [x] Sent 'Task to be processed, 4'
     [x] Sent 'Task to be processed, 5'
     [x] Sent 'Task to be processed, 6'
     [x] Sent 'Task to be processed, 7'
     [x] Sent 'Task to be processed, 8'
     [x] Sent 'Task to be processed, 9'
     [x] Sent 'Task to be processed, 10'
    
  • 消费者1输出

     [*] Waiting for messages. To exit press CTRL+C
     [x] Received 'Task to be processed, 1'
     [x] Done
     [x] Received 'Task to be processed, 4'
     [x] Done
     [x] Received 'Task to be processed, 6'
     [x] Done
     [x] Received 'Task to be processed, 8'
     [x] Done
     [x] Received 'Task to be processed, 10'
     [x] Done
    
  • 消费者2输出

     [*] Waiting for messages. To exit press CTRL+C
     [x] Received 'Task to be processed, 2'
     [x] Done
     [x] Received 'Task to be processed, 3'
     [x] Done
     [x] Received 'Task to be processed, 5'
     [x] Done
     [x] Received 'Task to be processed, 7'
     [x] Done
     [x] Received 'Task to be processed, 9'
     [x] Done
    

在这里插入图片描述

6. 总结

本文详细介绍了如何在 RabbitMQ 中实现 Work Queues 模式,包括生产者、默认交换机、队列和多个消费者的设计与实现。通过使用 RabbitMQ 的 Java 客户端库,我们可以轻松地实现任务的分配和处理。Work Queues 模式非常适合需要将任务分配给多个消费者处理的场景,如任务调度、日志处理等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/937933.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据结构(二叉树)

前言&#xff1a; 在数据结构那片浩瀚无垠、仿若神秘宇宙的天地里&#xff0c;二叉树宛如一颗散发着独特光辉、极为耀眼的星辰。它就像一位技艺精湛的建筑师&#xff0c;运用独特的二叉分支结构&#xff0c;精心构建起层次分明、秩序井然的数据组织 “大厦”。根节点仿若大厦的…

React简单入门 - [Next.js项目] - 页面跳转、AntD组件、二级目录等

须知 1Next.js 官网(英文)Next.js by Vercel - The React Framework2Next.js 文档(中文)简介 | Next.js 中文文档3React官网(中文)https://react.docschina.org/learn4Ant Design组件总览组件总览 - Ant Design5tailwindcss类名大全 官网英Justify Content - TailwindCS…

【MySQL数据库】Ubuntu下的mysql

目录 1&#xff0c;安装mysql数据库 2&#xff0c;mysql默认安装路径 3&#xff0c;my.cnf配置文件 4&#xff0c;mysql运用的相关指令及说明 5&#xff0c;数据库、表的备份和恢复 mysql是一套给我们提供数据存取的&#xff0c;更加有利于管理数据的服务的网络程序。下面…

elasticsearch设置密码访问

1 用户认证介绍 默认ES是没有设置用户认证访问的&#xff0c;所以每次访问时&#xff0c;直接调相关API就能查询和写入数据。现在做一个认证&#xff0c;只有通过认证的用户才能访问和操作ES。 2 开启加密设置 1.生成证书文件 /usr/share/elasticsearch/bin/elasticsearch-…

深入理解 CSS 文本换行: overflow-wrap 和 word-break

前言 正常情况下&#xff0c;在固定宽度的盒子中的中文会自动换行。但是&#xff0c;当遇到非常长的英文单词或者很长的 URL 时&#xff0c;文本可能就不会自动换行&#xff0c;而会溢出所在容器。幸运的是&#xff0c;CSS 为我们提供了一些和文本换行相关的属性&#xff1b;今…

Java 垃圾回收机制详解

1 垃圾回收的概念 垃圾回收&#xff08;Garbage Collection&#xff0c;GC&#xff09;是自动管理内存的一种机制&#xff0c;用于释放不再使用的对象所占用的内存空间&#xff0c;防止内存溢出。垃圾回收器通过识别和回收那些已经死亡或长时间未使用的对象&#xff0c;来优化…

RabbitMQ全局流量控制

RabbitMQ全局流量控制 流控机制流控是对什么进行控制&#xff1f;rabbitmq进程邮箱流控机制是什么&#xff1f; 流控原理流控原理流程 流控状态显示流控对象流控机制对象主要进程各进程状态情形分析 性能提升提升队列性能方式 当消息积压时&#xff0c;消息会进入到队列深处&am…

Jetpack Compose赋能:以速破局,高效打造非凡应用

Android Compose 是谷歌推出的一种现代化 UI 框架&#xff0c;基于 Kotlin 编程语言&#xff0c;旨在简化和加速 Android 应用开发。它以声明式编程为核心&#xff0c;与传统的 View 系统相比&#xff0c;Compose 提供了更直观、更简洁的开发体验。以下是对 Android Compose 的…

40 基于单片机的温湿度检测判断系统

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于STC89C52单片机&#xff0c;采用dht11温湿度传感器检测温湿度&#xff0c; 通过lcd1602显示屏各个参数&#xff0c;四个按键分别可以增加温湿度的阈值&#xff0c; 如果超过阈值&#xff0c;则…

【cocos creator】按照行列数创建格子布局

调用 this.creatLayout(5, 5, this.boxNode, this.rootNode) //限制数量 this.creatLayout(5, 5, this.boxNode, this.rootNode, cc.v3(0, 0), 10, 10, 23) /*** 创建格子布局* param xCount 列数量* param yCount 行数量* param prefab 预制体* param root 根节点* param root…

211-基于FMC的1路1.5G ADC 1路 2.5G DAC子卡

一、板卡概述 FMC-1AD-1DA-1SYNC是我司自主研发的一款1路1G AD采集、1路2.5G DA回放的FMC、1路AD同步信号子卡。板卡采用标准FMC子卡架构&#xff0c;可方便地与其他FMC板卡实现高速互联&#xff0c;可广泛用于高频模拟信号采集等领域。 二、功能介绍 2.1 原理框图 2.2 硬件…

前端性能优化(理念篇)

前端性能优化&#xff08;理念篇&#xff09; 前言 其实前端性能优化&#xff0c;按照我的理解&#xff0c;首先你公司的硬件条件跟其它资源跟的上&#xff0c;比如服务器资源&#xff0c;宽带怎么样&#xff0c;还有后端接口响应如何&#xff0c;这些资源都具备后&#xff0…

Y3编辑器文档4:触发器1(界面及使用简介、变量作用域、入门案例)

文章目录 一、触发器简介1.1 触发器界面1.2 ECA语句编辑及快捷键1.3 参数设置1.4 变量设置1.5 实体触发器1.6 触发器复用 二、触发器的多层结构2.1 子触发器&#xff08;在游戏内对新的事件进行注册&#xff09;2.2 触发器变量作用域 三、入门案例3.1 使用触发器实现瞬间移动3.…

3D相框案例讲解(详细)

前言 通过现阶段的学习&#xff0c;我们已经掌握了HTML&#xff0c;CSS和JS部分的相关知识点&#xff0c;现在让我们通过一篇案例&#xff0c;来巩固我们近期所学的知识点。 详细视频讲解戳这里 任务一 了解目标案例样式 1.1了解案例 3D相框 1.2 分析案例 首先我们看到一个…

网络安全漏洞挖掘之漏洞SSRF

SSRF简介 SSRF(Server-Side Request Forgery:服务器端请求伪造是一种由攻击者构造形成由服务端发起请求的一个安全漏洞。一般情况下&#xff0c;SSRF攻击的目标是从外网无法访问的内部系统。&#xff08;正是因为它是由服务端发起的&#xff0c;所以它能够请求到与它相连而与外…

简单的Java小项目

学生选课系统 在控制台输入输出信息&#xff1a; 在eclipse上面的超级简单文件结构&#xff1a; Main.java package experiment_4;import java.util.*; import java.io.*;public class Main {public static List<Course> courseList new ArrayList<>();publi…

VPN模式

拓扑结构 实验图&#xff1a; 路由器router 配置 DHCP配置 需要右键激活 路由器项配置网关 dns项配置ip DNS服务配置 正向区域 选择不允许动态更新 反向区域 创建主机 正向 验证是否创建成功 反向查找区域 输入网段 使用默认名称---不允许动态更新 KALI机的验证 web服务…

IS-IS协议

IS-IS协议介绍 IS-IS&#xff08;Intermediate System to Intermediate System&#xff09;协议是一种链路状态的内部网关协议&#xff08;IGP&#xff09;&#xff0c;用于在同一个自治系统&#xff08;Autonomous System, AS&#xff09;内部的路由器之间交换路由信息。IS-I…

Git-基础操作命令

目录 Git基础操作命令 case *查看提交日志 log 版本回退 get add . Git基础操作命令 我们创建并且初始化这个仓库以后&#xff0c;我们就要在里面进行操作。 Git 对于文件的增删改查存在几个状态&#xff0c;这些修改状态会随着我们执行Git的命令而发生变化。 untracked、…