SpringBoot3整合RabbitMQ之四_发布订阅模型中的fanout模型

SpringBoot3整合RabbitMQ之四_发布订阅模型中的fanout模型

文章目录

  • SpringBoot3整合RabbitMQ之四_发布订阅模型中的fanout模型
  • 3. 发布/订阅模型之fanout模型
    • 1. 说明
    • 1. 消息发布者
      • 1. 创建工作队列的配置类
      • 2. 发布消费Controller
    • 2. 消息消费者One
    • 3. 消息消费者Two
    • 4. 消息消费者Three
    • 5. 输出结果

3. 发布/订阅模型之fanout模型

在这里插入图片描述

1. 说明

RabbitMQ广播(Fnaout)模型,也称为发布/订阅模型,是一种消息传递模式,用于将消息发送给多个消费者。在广播模型中,生产者发送消息到一个交换机(Exchange),而交换机将消息广播给所有绑定到它上面的队列(Queue),每个队列都有一个消费者监听并处理消息。

下面是 RabbitMQ 广播模型的基本原理和步骤:

  1. 交换机(Exchange): 生产者将消息发送到交换机。交换机负责将消息路由到一个或多个与之绑定的队列。
  2. 队列(Queue): 每个消费者都有一个独立的队列。队列存储交换机发送的消息,并将其提供给消费者。
  3. 绑定(Binding): 将队列与交换机进行绑定,指定一个或多个交换机将消息发送到该队列。
  4. 消费者(Consumer): 消费者监听队列,并处理收到的消息。

在广播模型中,有两种类型的交换机可供选择:

  • Fanout Exchange(扇出交换机): 扇出交换机会将消息广播到绑定到它上面的所有队列,无视消息的路由键(Routing Key)。
  • Headers Exchange(头交换机): 头交换机根据消息的 header 信息来进行匹配,并将消息发送到与 header 匹配的队列。

下面是使用 RabbitMQ 广播模型的基本步骤:

  1. 创建交换机,指定交换机类型为 Fanout Exchange。
  2. 创建队列,并将其绑定到交换机上。
  3. 生产者发送消息到交换机。
  4. 消费者监听队列,接收并处理消息。

使用 RabbitMQ 广播模型可以实现消息的一对多传递,适用于需要向多个消费者发送相同消息的场景,比如日志系统、通知系统等。

1. 消息发布者

1. 创建工作队列的配置类

package com.happy.msg.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * <p>
 *
 * @Description: 工作队列模型_创建名称为 work_queue 的队列 <br>
 * </p>
 * @Datetime: 2024/3/27 18:18
 * @Author: Yuan · JinSheng <br>
 * @Since 2024/3/27 18:18
 */
@Configuration
public class WorkQueueConfig {
    @Bean
    Queue workQueue() {
        return QueueBuilder.durable("work_queue").build();
    }
}

2. 发布消费Controller

package com.happy.msg.publisher;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * <p>
 *
 * @Description: 生产消息的控制器 <br>
 * </p>
 * @Datetime: 2024/3/27 10:53
 * @Author: Yuan · JinSheng <br>
 * @Since 2024/3/27 10:53
 */
@RestController
@RequestMapping("/work")
public class WorkQueuePublisherController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public String sentMessage() {
        for (int i = 1; i <=10 ; i++) {
            rabbitTemplate.convertAndSend("work_queue", "work_queue队列第["+i+"]条消息,hello,rabbitmq"+i);
        }
        return "发送成功";
    }
}

2. 消息消费者One

package com.happy.msg.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
/**
 * <p>
 * @Description: 工作队列模型_消息消费者一 <br>
 * </p>
 * @Datetime: 2024/3/28 20:28
 * @Author: Yuan · JinSheng <br>
 * @Since 2024/3/28 20:28
 */
@Slf4j
@Component
public class WorkQueueConsumerOne {

    /***
     * @param message 消息
     * @Description: 监听work_queue队列中的消息,当客户端启动后,work_queue队列中的所有的消息都被此消费者消费并打印
     * @Author: Yuan · JinSheng
     */

    @RabbitListener(queues = "work_queue")
    public void msg(Message message){
        byte[] messageBody = message.getBody();
        String msg = new String(messageBody);
        log.info("WorkQueueConsumerOne接收到work_queue队列中的消息==={},===接收时间==={}",msg, LocalDateTime.now());
    }
}

  1. 输出结果
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[1]条消息,hello,rabbitmq1,===接收时间===2024-03-29T10:34:34.468074100
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[3]条消息,hello,rabbitmq3,===接收时间===2024-03-29T10:34:34.469081600
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[5]条消息,hello,rabbitmq5,===接收时间===2024-03-29T10:34:34.469976
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[7]条消息,hello,rabbitmq7,===接收时间===2024-03-29T10:34:34.469976
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[9]条消息,hello,rabbitmq9,===接收时间===2024-03-29T10:34:34.470811800

3. 消息消费者Two

package com.happy.msg.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
/**
 * <p>
 * @Description: 工作队列模型_消息消费者二<br>
 * </p>
 * @Datetime: 2024/3/28 20:30
 * @Author: Yuan · JinSheng <br>
 * @Since 2024/3/28 20:30
 */
@Slf4j
@Component
public class WorkQueueConsumerTwo {

    /***
     * @param message 消息
     * @Description: 监听work_queue队列中的消息,当客户端启动后,work_queue队列中的所有的消息都被此消费者消费并打印
     * @Author: Yuan · JinSheng
     */

    @RabbitListener(queues = "work_queue")
    public void msg(Message message){
        byte[] messageBody = message.getBody();
        String msg = new String(messageBody);
        log.info("WorkQueueConsumerTwo接收到work_queue队列中的消息==={},===接收时间==={}",msg, LocalDateTime.now());
    }
}

4. 消息消费者Three

package com.happy.msg.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
/**
 * <p>
 * @Description: 工作队列模型_消息消费者三<br>
 * </p>
 * @Datetime: 2024/3/28 20:30
 * @Author: Yuan · JinSheng <br>
 * @Since 2024/3/28 20:30
 */
@Slf4j
@Component
public class WorkQueueConsumerTwo {

    /***
     * @param message 消息
     * @Description: 监听work_queue队列中的消息,当客户端启动后,work_queue队列中的所有的消息都被此消费者消费并打印
     * @Author: Yuan · JinSheng
     */

    @RabbitListener(queues = "work_queue")
    public void msg(Message message){
        byte[] messageBody = message.getBody();
        String msg = new String(messageBody);
        //log.info("WorkQueueConsumerTwo接收到work_queue队列中的消息==={},===接收时间==={}",msg, LocalDateTime.now());
        System.out.println("WorkQueueConsumerTwo接收到work_queue队列中的消息==="+msg+",===接收时间==="+LocalDateTime.now());
    }
}

5. 输出结果

可看到消息被三个消费者按相等数量消费,总共10条消息,消费者1消费了4条,其他两个消费者消费了3条消息

WorkQueueConsumerTwo接收到work_queue队列中的消息===work_queue队列第[3]条消息,hello,rabbitmq3,===接收时间===2024-03-29T10:39:32.942546200
WorkQueueConsumerThree接收到work_queue队列中的消息===work_queue队列第[2]条消息,hello,rabbitmq2,===接收时间===2024-03-29T10:39:32.942546200
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[1]条消息,hello,rabbitmq1,===接收时间===2024-03-29T10:39:32.942037700
WorkQueueConsumerTwo接收到work_queue队列中的消息===work_queue队列第[6]条消息,hello,rabbitmq6,===接收时间===2024-03-29T10:39:32.943806300
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[4]条消息,hello,rabbitmq4,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerTwo接收到work_queue队列中的消息===work_queue队列第[9]条消息,hello,rabbitmq9,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerThree接收到work_queue队列中的消息===work_queue队列第[5]条消息,hello,rabbitmq5,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[7]条消息,hello,rabbitmq7,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerOne接收到work_queue队列中的消息===work_queue队列第[10]条消息,hello,rabbitmq10,===接收时间===2024-03-29T10:39:32.944336900
WorkQueueConsumerThree接收到work_queue队列中的消息===work_queue队列第[8]条消息,hello,rabbitmq8,===接收时间===2024-03-29T10:39:32.944336900

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/521744.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

实际项目中如何使用Git做分支管理

前言 Git是一种强大的分布式版本控制系统&#xff0c;在实际项目开发中使用Git进行分支管理是非常常见的做法&#xff0c;因为它可以帮助团队高效的协作和管理项目的不同版本&#xff0c;今天我们来讲讲在实际项目中最常用的Git分支管理策略Git Flow。 常见的Git分支管理策略…

IDEA2024.1版本震撼来袭,手把手教你激活!

前言 作为一个Java程序猿&#xff0c;必不可少的一款开发IDE神器&#xff1a;IntelliJ IDEA&#xff0c;简称“IDEA”。就在前天&#xff08;2024.4.4&#xff09;终于推出了心心念念的2024.1版本。 IntelliJ IDEA 2024.1 引入了一系列令人期待的升级&#xff0c;可以帮助您简…

Nuxt 3 项目中配置 Tailwind CSS

官方文档&#xff1a;https://www.tailwindcss.cn/docs/guides/nuxtjs#standard 安装 Tailwind CSS 及其相关依赖 执行如下命令&#xff0c;在 Nuxt 项目中安装 Tailwind CSS 及其相关依赖 npm install -D tailwindcss postcss autoprefixerpnpm install -D tailwindcss post…

深度剖析扫雷游戏的各个知识点(1)

哈喽&#xff0c;小伙伴&#xff0c;大家好&#xff0c;今天我来水一篇文章。害&#xff0c;也不算真的水吧&#xff0c;这次带大家深度剖析初次写扫雷游戏程序时还未接触到的知识点。废话不多说&#xff0c;直接进入正题 不知小伙伴们是否还记得当时我说过扫雷游戏我们是以多个…

AIGC实战——ProGAN(Progressive Growing Generative Adversarial Network)

AIGC实战——ProGAN 0. 前言1. ProGAN2. 渐进式训练3. 其他技术3.1 小批标准差3.2 均等学习率3.3 逐像素归一化 4. 图像生成小结系列链接 0. 前言 我们已经学习了使用生成对抗网络 (Generative Adversarial Network, GAN) 解决各种图像生成任务。GAN 的模型架构和训练过程具有…

2023 年网络安全热点技术发展态势

文章目录 前言一、人工智能信息技术迎来井喷式发展期二、零信任网络安全架构即将投入实际部署三、美国全面推动军政业务向云环境迁移四、专用太空软硬件与独立卫星网络并行发展五、量子信息技术与网络安全领域加速融合前言 在 2023 年取得进展的信息技术不在少数。从网络安全的…

从300亿分子中筛出6款,结构新且易合成,斯坦福抗生素设计AI模型登Nature子刊

ChatGPT狂飙160天&#xff0c;世界已经不是之前的样子。 新建了免费的人工智能中文站https://ai.weoknow.com 新建了收费的人工智能中文站https://ai.hzytsoft.cn/ 更多资源欢迎关注 全球每年有近 500 万人死于抗生素耐药性&#xff0c;因此迫切需要新的方法来对抗耐药菌株。 …

HTML5.Canvas简介

1. Canvas.getContext getContext(“2d”)是Canvas元素的方法&#xff0c;用于获取一个用于绘制2D图形的绘图上下文对象。在给定的代码中&#xff0c;首先通过getElementById方法获取id为"myCanvas"的Canvas元素&#xff0c;然后使用getContext(“2d”)方法获取该Ca…

音视频开发之旅(83)- 腾讯音乐开源高质量唇形同步模型--MuseTalk

目录 1.效果展示 2.原理学习 3.流程分析 4.资料 一、效果展示 -- &#xff08;推理素材来源于网络&#xff0c;如有侵权&#xff0c;联系立删&#xff01;&#xff09; 唱歌效果&#xff08;歌曲有suno生成&#xff09; 用于推理的视频素材来源于网络&#xff0c;如有侵权&…

Java中常见的排序算法

常见算法可以分为两大类&#xff1a;   非线性时间比较类排序&#xff1a;通过比较来决定元素间的相对次序&#xff0c;由于其时间复杂度不能突破O(nlogn)&#xff0c;因此称为非线性时间比较类排序。   线性时间非比较类排序&#xff1a;不通过比较来决定元素间的相对次序…

Windows应急响应

1.排查隐藏账号 查看注册表 找到攻击者用户目录文件 排查用户异常 eventvwr.msc 分析用户登录日志 排查可疑端口 排查可疑进程 检查启动项、计划任务和服务 查看系统补丁信息 安装火绒&#xff0c;在安全工具里有火绒剑 计划任务 使用D盾对主机进行检测&#xff0c;发现隐藏账户…

python自定义库的打包和安装

要将自定义库安装到python的三方包地址site-packages中&#xff0c;除了可以直接的复制之外&#xff0c;更为合理科学的方法是通过build和install的方式进行。因为直接复制仅仅作为一种临时的简单的方法&#xff0c;而且只能针对源码进行&#xff0c;也不好进行科学管理&#x…

AJAX —— 学习(三)(完结)

目录 一、jQuery 中的 AJAX &#xff08;一&#xff09;get 方法 1.语法介绍 2.结果实现 &#xff08;二&#xff09;post 方法 1.语法介绍 2.结果实现 &#xff08;三&#xff09;通用型的 AJAX 方法 1.语法介绍 2.结果实现 二、AJAX 工具库 axios &#xff08…

简历复印--原型模式

1.1 夸张的简历 简历的打印。"对编程来说&#xff0c;简单的复制粘贴极有可能造成重复代码的灾难。我所说的意思你根本还没听懂。那就以刚才的例子&#xff0c;我出个需求你写写看&#xff0c;要求有一个简历类&#xff0c;必须要有姓名&#xff0c;可以设置性别和年龄&am…

第十四届蓝桥杯省赛大学C组(C/C++)填充

原题链接&#xff1a;填充 有一个长度为 n 的 01 串&#xff0c;其中有一些位置标记为 ?&#xff0c;这些位置上可以任意填充 0 或者 1&#xff0c;请问如何填充这些位置使得这个 01 串中出现互不重叠的 0 和 1 子串最多&#xff0c;输出子串个数。 输入格式 输入一行包含一…

SQLite 4.9的 OS 接口或“VFS”(十三)

返回&#xff1a;SQLite—系列文章目录 上一篇:SQLite字节码引擎&#xff08;十二&#xff09; 下一篇:SQLite 4.9的虚拟表机制(十四) 1. 引言 本文介绍了 SQLite OS 可移植性层或“VFS” - 模块位于 SQLite 实现堆栈底部 提供跨操作系统的可移植性。 VFS是Virtual File…

5560.树的直径

蛮不错的一道题目&#xff0c;你要利用树的性质分析出&#xff0c;你只需要维护上一次的树的直径的两个端点就好了 #include<iostream>using namespace std; using ll long long; using pii pair<int,int>; const int N 6e510; const int inf 0x3f3f3f3f; cons…

算法:树形dp(树状dp)

文章目录 一、树形DP的概念1.基本概念2.解题步骤3.树形DP数据结构 二、典型例题1.LeetCode&#xff1a;337. 打家劫舍 III1.1、定义状态转移方程1.2、参考代码 2.ACWing&#xff1a;285. 没有上司的舞会1.1、定义状态转移方程1.2、拓扑排序参考代码1.3、dfs后序遍历参考代码 一…

MySQL复制拓扑4

文章目录 主要内容一.启用GUID并配置循环复制1.其中&#xff0c;UUID用来唯一标识每一个服务器&#xff0c;事务的编号记录了在该服务器上执行的事务的顺序。使用SELECT server_uuid\G命令可以查看服务器的UUID&#xff0c;sever1的UUID值显示如下&#xff1a;代码如下&#xf…

Vue3_2024_7天【回顾上篇watch常见的后两种场景】

随笔&#xff1a;这年头工作不好找咯&#xff0c;大家有学历提升的赶快了&#xff0c;还有外出人多注意身体&#xff0c;没错我在深圳这边阳了&#xff0c;真的绝啊&#xff0c;最尴尬的还给朋友传染了&#xff01;&#xff01;&#xff01; 之前三种的监听情况&#xff0c;监听…