RabbitMQ工作模式-发布订阅模式

Publish/Subscribe(发布订阅模式)

官方文档: https://www.rabbitmq.com/tutorials/tutorial-three-python.html

使用fanout类型类型的交换器,routingKey忽略。每个消费者定义生成一个队列关绑定到同一个Exchange,每个消费者都可以消费完整的消息。

消息广播给所有订阅该消息的消费者。

在RabbitMQ中,生产者不是将消息直接发送给消息消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。

生产者将消息发送给交换器。交换器非常简单,从生成者接收消息,将消息推送给消息队列。交换器必须清楚的知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器的类型。

在这里插入图片描述

发布订阅使用fanout的交换器,创建交换器,名称为test

channel.exchangeDeclare("test","fanout");

fanout交换器很简单,从名称就可以看出来(用风扇吹出去),将所有的收到的消息发给它的知道的所有队列。

存在一个默认的交换器。

此样例使用的是临时队列,即消费都实现将自动创建此队列,当消费都退出后,此队列也将自动删除。

队列名称如

amq.gen-gjKBgQ9PSmoj2YQGMOdPfA

样例代码

消费者1的代码:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class OneConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();

    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 声明的临时队列,名称由rabbitMQ自动生成
    String queueName = channel.queueDeclare().getQueue();
    System.out.println("临时队列的名称:" + queueName);

    // 定义交换机
    channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);

    // 消息队列与交换机的绑定
    channel.queueBind(queueName, "ex.testfan", "");

    channel.basicConsume(
        queueName,
        new DeliverCallback() {
          @Override
          public void handle(String consumerTag, Delivery message) throws IOException {
            System.out.println(
                "one 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
          }
        },
        new CancelCallback() {
          @Override
          public void handle(String consumerTag) throws IOException {}
        });
  }
}

消费者2

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class TwoConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();

    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 生成的临时队列
    String queueName = channel.queueDeclare().getQueue();
    System.out.println("临时队列的名称:" + queueName);

    // 定义交换机
    channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);

    // 消息队列与交换机的绑定
    channel.queueBind(queueName, "ex.testfan", "");

    channel.basicConsume(
        queueName,
        new DeliverCallback() {
          @Override
          public void handle(String consumerTag, Delivery message) throws IOException {
            System.out.println(
                "two 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
          }
        },
        new CancelCallback() {
          @Override
          public void handle(String consumerTag) throws IOException {}
        });
  }
}

消费者3

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ThirdConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();

    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 生成的临时队列
    String queueName = channel.queueDeclare().getQueue();
    System.out.println("临时队列的名称:" + queueName);

    // 定义交换机
    channel.exchangeDeclare("ex.testfan", BuiltinExchangeType.FANOUT, true, false, null);

    // 消息队列与交换机的绑定
    channel.queueBind(queueName, "ex.testfan", "");

    channel.basicConsume(
        queueName,
        new DeliverCallback() {
          @Override
          public void handle(String consumerTag, Delivery message) throws IOException {
            System.out.println(
                "third 获取到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
          }
        },
        new CancelCallback() {
          @Override
          public void handle(String consumerTag) throws IOException {}
        });
  }
}

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Product {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://root:123456@node1:5672/%2f");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    try {
      // 声明fanout类型交换机
      channel.exchangeDeclare("ex.testfan", "fanout", true, false, false, null);

      for (int i = 0; i < 20; i++) {
        channel.basicPublish(
            "ex.testfan",
            // 路由key
            "",
            // 属性
            null,
            // 信息
            ("hello world fan " + i).getBytes(StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    } finally {
      channel.close();
      connection.close();
    }
  }
}

观察下队列的绑定的情况:

在未启动消费都队列之前:

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]# 

在未启动消费者之前,只有看到几个默认的生产者。绑定的队列为空。

启动三个消费者:

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ ex.testfan         │ fanout  │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬────────────────────────────────┬──────────────────┬────────────────────────────────┬───────────┐
│ source_name │ source_kind │ destination_name               │ destination_kind │ routing_key                    │ arguments │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│             │ exchange    │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue            │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│             │ exchange    │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue            │ amq.gen-UG67rAw03FGbBupHX6o18g │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│             │ exchange    │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue            │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan  │ exchange    │ amq.gen-HnQLeaOB1YOEJXXfXP5_Mg │ queue            │                                │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan  │ exchange    │ amq.gen-UG67rAw03FGbBupHX6o18g │ queue            │                                │           │
├─────────────┼─────────────┼────────────────────────────────┼──────────────────┼────────────────────────────────┼───────────┤
│ ex.testfan  │ exchange    │ amq.gen-VbV63vwAn0IBzC7n6I--vQ │ queue            │                                │           │
└─────────────┴─────────────┴────────────────────────────────┴──────────────────┴────────────────────────────────┴───────────┘
[root@nullnull-os ~]# 

当启动生产者后,可以发现已经产生了3个默认的交换机及队列的绑定关系。以及手动绑定的3个队列的关系。

启动生产者,查看消费情况:

消费者1

临时队列的名称:amq.gen-VbV63vwAn0IBzC7n6I--vQ
one 获取到的消息:hello world fan 0
one 获取到的消息:hello world fan 1
one 获取到的消息:hello world fan 2
one 获取到的消息:hello world fan 3
one 获取到的消息:hello world fan 4
one 获取到的消息:hello world fan 5
one 获取到的消息:hello world fan 6
one 获取到的消息:hello world fan 7
one 获取到的消息:hello world fan 8
one 获取到的消息:hello world fan 9
one 获取到的消息:hello world fan 10
one 获取到的消息:hello world fan 11
one 获取到的消息:hello world fan 12
one 获取到的消息:hello world fan 13
one 获取到的消息:hello world fan 14
one 获取到的消息:hello world fan 15
one 获取到的消息:hello world fan 16
one 获取到的消息:hello world fan 17
one 获取到的消息:hello world fan 18
one 获取到的消息:hello world fan 19

消费者2:

临时队列的名称:amq.gen-KadV2OsCRLb84p2k_ijuww
two 获取到的消息:hello world fan 0
two 获取到的消息:hello world fan 1
two 获取到的消息:hello world fan 2
two 获取到的消息:hello world fan 3
two 获取到的消息:hello world fan 4
two 获取到的消息:hello world fan 5
two 获取到的消息:hello world fan 6
two 获取到的消息:hello world fan 7
two 获取到的消息:hello world fan 8
two 获取到的消息:hello world fan 9
two 获取到的消息:hello world fan 10
two 获取到的消息:hello world fan 11
two 获取到的消息:hello world fan 12
two 获取到的消息:hello world fan 13
two 获取到的消息:hello world fan 14
two 获取到的消息:hello world fan 15
two 获取到的消息:hello world fan 16
two 获取到的消息:hello world fan 17
two 获取到的消息:hello world fan 18
two 获取到的消息:hello world fan 19

消息者3:

临时队列的名称:amq.gen-TcqXVnoS2mjOpfCw1o1CZw
third 获取到的消息:hello world fan 0
third 获取到的消息:hello world fan 1
third 获取到的消息:hello world fan 2
third 获取到的消息:hello world fan 3
third 获取到的消息:hello world fan 4
third 获取到的消息:hello world fan 5
third 获取到的消息:hello world fan 6
third 获取到的消息:hello world fan 7
third 获取到的消息:hello world fan 8
third 获取到的消息:hello world fan 9
third 获取到的消息:hello world fan 10
third 获取到的消息:hello world fan 11
third 获取到的消息:hello world fan 12
third 获取到的消息:hello world fan 13
third 获取到的消息:hello world fan 14
third 获取到的消息:hello world fan 15
third 获取到的消息:hello world fan 16
third 获取到的消息:hello world fan 17
third 获取到的消息:hello world fan 18
third 获取到的消息:hello world fan 19

再停止几个消费者查看队列信息

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ ex.testfan         │ fanout  │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
[root@nullnull-os ~]# 

可以看到,当客户端退出之后,临时队列也就消失了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/101886.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

某人事系统架构搭建设计记录

首发博客地址 https://blog.zysicyj.top/ 先大致列一下基础情况 架构必须是微服务 场景上涉及大量查询操作&#xff0c;分析操作 存在临时大量写入的场景 并发并不高 对高可用要求较高&#xff0c;不能挂掉 对安全要求高 要能过等保测试等三方测试 使用人数并不多&#xff0c;十…

SpringBoot-学习笔记(基础)

文章目录 1. 概念1.1 SpringBoot快速入门1.2 SpringBoot和Spring对比1.3 pom文件坐标介绍1.4 引导类1.5 修改配置1.6 读取配置1.6.1 读取配置信息1.6.2 读取配置信息并创建类进行封装 1.7 整合第三方技术1.7.1 整合JUnit1.7.1 整合Mybatis1.7.1 整合Mybatis-Plus1.7.1 整合Drui…

SpringCloudAlibaba Gateway(三)-整合Sentinel功能路由维度、API维度进行流控

Gateway整合Sentinel ​ 前面使用过Sentinel组件对服务提供者、服务消费者进行流控、限流等操作。除此之外&#xff0c;Sentinel还支持对Gateway、Zuul等主流网关进行限流。 ​ 自sentinel1.6.0版开始&#xff0c;Sentinel提供了Gateway的适配模块&#xff0c;能针对路由(rou…

ARM编程模型-寄存器组

Cortex A系列ARM处理器共有40个32位寄存器,其中33个为通用寄存器,7个为状态寄存器。usr模式和sys模式共用同一组寄存器。 通用寄存器包括R0~R15,可以分为3类: 未分组寄存器R0~R7分组寄存器R8~R14、R13(SP) 、R14(LR)程序计数器PC(R15)、R8_fiq-R12_fir为快中断独有 在不同模…

Go的数据结构-hashmap

开放寻址法和拉链法 runtime.hamp bucket的数据结构 bucket的指针指向这里 map初始化&#xff1a;make 和字面量 make初始化 新建一个hamp结尾体&#xff0c;计算大B&#xff0c;创建一个桶数组 字面量初始化 map的并发解决 sync.map

leetcode622-设计循环队列

本题重点&#xff1a; 1. 选择合适的数据结构 2. 针对选择的数据结构判断“空”和“满” 这两点是不分先后次序的&#xff0c;在思考时应该被综合起来。事实上&#xff0c;无论我们选择链表还是数组&#xff0c;最终都能实现题中描述的“循环队列”的功能&#xff0c;只不过…

HTTP协议概述

HTTP 协议定义 HTTP协议&#xff0c;直译为超文本传输协议&#xff0c;是一种用于分布式、协作、超媒体的信息系统的应用协议。HTTP协议是万维网数据通信的基础。HTTP协议在客户端-服务器计算模型中充当请求-响应协议。客户端向服务器提交HTTP请求消息。服务器提供HTML文件和其…

在springboot项目中显示Services面板的方法

文章目录 前言方法一&#xff1a;Alt8快捷键方法二&#xff1a;使用Component标签总结 前言 在一个springboot项目中&#xff0c;通过开启Services面板&#xff0c;可以快速的启动、配置、管理多个子项目。 方法一&#xff1a;Alt8快捷键 1、在idea界面输入Alt8&#xff0c;在…

IP网络广播系统有哪些优点

IP网络广播系统有哪些优点 IP网络广播系统有哪些优点&#xff1f; IP网络广播系统是基于 TCP/IP 协议的公共广播系统&#xff0c;采用 IP 局域网或 广域网作为数据传输平台&#xff0c;扩展了公共广播系统的应用范围。随着局域网络和 网络的发展 , 使网络广播的普及变为可能 …

从零开始探索C语言(四)----循环

文章目录 1. C 循环1.1 while 循环1.2 for 循环1.3 do...1.4 嵌套循环 2. 循环控制语句2.1 break 语句2.2 continue 语句2.3 goto 语句 1. C 循环 有的时候&#xff0c;我们可能需要多次执行同一块代码。一般情况下&#xff0c;语句是按顺序执行的&#xff1a;函数中的第一个语…

ELK安装、部署、调试(五)filebeat的安装与配置

1.介绍 logstash 也可以收集日志&#xff0c;但是数据量大时太消耗系统新能。而filebeat是轻量级的&#xff0c;占用系统资源极少。 Filebeat 由两个主要组件组成&#xff1a;harvester 和 prospector。 采集器 harvester 的主要职责是读取单个文件的内容。读取每个文件&…

软件测试Day5|软件测试理论03

白盒测试方法 针对程序的代码进行测试&#xff0c;代码覆盖率高&#xff1b;缺点&#xff1a;覆盖所有代码路径大、业务功能可能覆盖不全、测试开销大 静态方法&#xff1a;1&#xff09;桌面检查&#xff08;一个人检查&#xff09;&#xff1b;2&#xff09;代码审查&#…

链表OJ练习(2)

一、分割链表 题目介绍&#xff1a; 思路&#xff1a;创建两个链表&#xff0c;ghead尾插大于x的节点&#xff0c;lhead尾插小于x的节点。先遍历链表。最后将ghead尾插到lhead后面&#xff0c;将大小链表链接。 我们需要在创建两个链表指针&#xff0c;指向两个链表的头节点&…

WebAssembly 在云原生中的实践指南

1 WebAssembly 介绍 WebAssembly&#xff08;Wasm&#xff09;是一种通用字节码技术&#xff0c;它可以将其他编程语言&#xff08;如 Go、Rust、C/C 等&#xff09;的程序代码编译为可在浏览器环境直接执行的字节码程序。 WebAssembly 的初衷之一是解决 JavaScript 的性能问…

Nginx基础+高级(2022版):待更新

1. 文章说明 说明&#xff1a;目前讲的是第一部分nginx核心技术篇&#xff0c;后需篇章会以第一部分为核心技术篇为基础来展开深度讲解&#xff0c;详情关注后续课程的发布。 2. 介绍和准备环境 2.1 介绍 Nginx (engine x) 是一个高性能的HTTP和反向代理web服务器&#xf…

【QT】使用qml的QtWebEngine遇到的一些问题总结

在使用qt官方的一些QML的QtWebEngine相关的例程的时候&#xff0c;有时在运行会报如下错误&#xff1a; WebEngineContext used before QtWebEngine::initialize() or OpenGL context creation failed 这个问题在main函数里面最前面加上&#xff1a; QCoreApplication::setAttr…

元素居中的方法总结

目录 垂直居中 行内元素垂直居中 单行文本垂直居中 1.line-height: 200px; 多行文本垂直居中 1.tablevertical-align:middle 块级元素垂直居中 1.display: flex;align-items: center; 2.使用position top margin-top 水平居中 行内元素水平居中 1.text-align:cente…

CUDA小白 - NPP(3) 图像处理 Color and Sampling Conversion

cuda小白 原始API链接 NPP GPU架构近些年也有不少的变化&#xff0c;具体的可以参考别的博主的介绍&#xff0c;都比较详细。还有一些cuda中的专有名词的含义&#xff0c;可以参考《详解CUDA的Context、Stream、Warp、SM、SP、Kernel、Block、Grid》 常见的NppStatus&#xf…

说说TIME_WAIT和CLOSE_WAIT区别

分析&回答 TCP协议规定&#xff0c;对于已经建立的连接&#xff0c;网络双方要进行四次握手才能成功断开连接&#xff0c;如果缺少了其中某个步骤&#xff0c;将会使连接处于假死状态&#xff0c;连接本身占用的资源不会被释放。网络服务器程序要同时管理大量连接&#xf…

【ES6】—类与继承

一、 定义类 class People {constructor (name, age) {this.name namethis.age age}showName () {console.log(this.name)} } let p1 new People(xiaoxiao, 30) console.log(p1) // People {name: xiaoxiao, age: 30}小节&#xff1a; 使用class关键字声明类使用construc…