RabbitMQ、RabbitMQ发布/订阅模式

1.RabbiMQ

RabbitMQ是一个消息中间件
MQ的基本结构
在这里插入图片描述

1.1RabitMQ安装

参考:Docker安装
Docker中部署RabbitMQ

2.入门案例

2.1.publisher实现

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

2.2.consumer实现

代码实现:

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

2.3.总结

基本消息队列的消息发送流程

  1. 建立connection

  2. 创建channel

  3. 利用channel声明队列

  4. 利用channel向队列发送消息

基本消息队列的消息接收流程

  1. 建立connection

  2. 创建channel

  3. 利用channel声明队列

  4. 定义consumer的消费行为handleDelivery()

  5. 利用channel将消费者与队列绑定

3.发布/订阅

发布订阅的模型如图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oJCfKz9C-1683447126021)(assets/image-20210717165309625.png)]

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

3.1用bean实例实现Fanout类型的发布/订阅模型

导入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.1.1在consumer中创建一个类,声明队列和交换机:

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("huang.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

3.1.2.消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testFanoutExchange() {
    // 队列名称
    String exchangeName = "huang.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

3.1.3.消息接收

在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

3.2.基于注解实现Direct类型的发布/订阅模式

3.1.1.consumer注解来声明队列和交换机

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

3.1.2.消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testSendDirectExchange() {
    // 交换机名称
    String exchangeName = "itcast.direct";
    // 消息
    String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/18618.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

算法记录 | Day53 动态规划

1143.最长公共子序列 思路&#xff1a; 本题和动态规划&#xff1a;718. 最长重复子数组 (opens new window)区别在于这里不要求是连续的了&#xff0c;但要有相对顺序&#xff0c;即&#xff1a;“ace” 是 “abcde” 的子序列&#xff0c;但 “aec” 不是 “abcde” 的子序…

力扣sql中等篇练习(十六)

力扣sql中等篇练习(十六) 1 不同性别每日分数统计 1.1 题目内容 1.1.1 基本题目信息 1.1.2 示例输入输出 a 示例输入 b 示例输出 1.2 示例sql语句 # 分数是往后累加的 SELECT s2.gender,s2.day,sum(s1.score_points) total FROM Scores s1 CROSS JOIN Scores s2 ON s2.gen…

细谈抽象类

目录 抽象类 1.抽象类是被abstract修饰的类 2.抽象类中的抽象方法 3.抽象类中可以有和普通类一样的成员变量和成员方法 4.抽象类不能被实例化 5.那么抽象类不能被实例化要它有何用&#xff1f;&#xff1f;&#xff1f; 6.注意&#xff1a; 抽象类 如果一个类中没有包含足…

Web3.0介绍与产业赛道(去中心化,金融与数字资产,应用与存储,区块链技术)

文章目录 1、web3.0时代——区块链技术2、产业赛道&#xff1a;去中心化金融与数字资产3、产业赛道&#xff1a;去中心化应用与存储4、区块链&#xff1a;基础设施与区块链安全和隐私 1、web3.0时代——区块链技术 Web3.0是什么 Web3.0是指下一代互联网技术&#xff0c;它将在…

测试2:基础

目录 1.软件测试的生命周期 2.描述BUG 3.定义bug的级别 1.Blocker(崩溃) 2.Critical(严重) 3、Major&#xff08;一般&#xff09;&#xff1a; 4、Minor&#xff08;次要&#xff09;&#xff1a; 4.BUG的生命周期 1.软件测试的生命周期 需求分析,测试计划,测试设计,测…

【上进小菜猪】使用Ambari提高Hadoop集群管理和开发效率:提高大数据应用部署和管理效率的利器

&#x1f4ec;&#x1f4ec;我是上进小菜猪&#xff0c;沈工大软件工程专业&#xff0c;爱好敲代码&#xff0c;持续输出干货&#xff0c;欢迎关注。 介绍 Hadoop是一种开源的分布式处理框架&#xff0c;用于在一组低成本硬件的集群上存储和处理大规模数据集。Ambari是一种基…

【分布式搜索引擎03】

分布式搜索引擎03 11.9.数据聚合11.9.1.聚合的种类11.9.2.DSL实现聚合11.9.2.1.Bucket聚合语法11.9.2.2.聚合结果排序11.9.2.3.限定聚合范围11.9.2.4.Metric聚合语法11.9.2.5.小结 11.9.3.RestAPI实现聚合11.9.3.1.API语法11.9.3.2.业务需求11.9.3.3.业务实现 11.10.自动补全&a…

大学毕业设计使用python制作

前言&#xff1a;相信看到这篇文章的小伙伴都或多或少有一些编程基础&#xff0c;懂得一些linux的基本命令了吧&#xff0c;本篇文章将带领大家服务器如何部署一个使用django框架开发的一个网站进行云服务器端的部署。 文章使用到的的工具 Python&#xff1a;一种编程语言&…

JRebel插件热部署快速入门教程

文章目录 引入插件安装插件激活打开激活窗口激活插件 插件使用设置项目热更新热更新说明演示热更新 引入 Jrebel能够非常方便的帮助我们进行项目的热更新&#xff0c;尤其是前端也嵌在后端工程中的单体项目&#xff0c;热更新能减少一半的开发时间&#xff0c;这里我们演示一下…

TAPD使用规范

目录 https://www.bilibili.com/?spm_id_from333.788.0.0我该如何理解这段网址&#xff1f; ?spm_id_from333.788.0.0&#xff1a;表示查询字符串&#xff0c;用于向服务器传递额外的参数信息。在这个例子中&#xff0c;该查询字符串可能用于追踪网站访问来源或统计数据分析…

EndNote X9 导入知网文献 插入引用文献 方法

文章目录 1 EndNote X9 导入知网文献2 EndNote X9 插入参考文献常见问题总结3 EndNote X9 快速上手教程&#xff08;毕业论文参考文献管理器&#xff09; 1 EndNote X9 导入知网文献 下载知网参考文献引用&#xff1a; ①下载 引用&#xff1b; ②格式为 EndNote&#xff1b; 知…

JavaScript - 进阶+高级(笔记)

前言 给孩子点点关注吧&#xff01;&#x1f62d; 本篇文章主要记录以下几部分&#xff1a; 进阶&#xff1a; 作用域&#xff1b;函数进阶&#xff08;函数提升、函数参数、箭头函数&#xff09;&#xff1b;解构赋值&#xff1b;对象进阶&#xff08;构造函数、实例成员、静…

freeRTOS中使用看门狗的一点思考

关于看门狗想必各位嵌入式软件开发的朋友应该都不会陌生的。在嵌入式软件开发中&#xff0c;看门狗常被用于监测cpu的程序是否正常在运行&#xff0c;如果cpu程序运行异常会由看门狗在达到设定的阈值时触发复位&#xff0c;从而让整个cpu复位重新开始运行。 看门狗的本质是一个…

【C++初阶】类和对象(下)

一.再谈构造函数 构造函数其实分为&#xff1a; 1.函数体赋值 2.初始化列表 之前所讲到的构造函数其实都是函数体赋值&#xff0c;那么本篇文章将会具体讲述初始化列表。 初始化列表 语法 以一个冒号开始&#xff0c;接着是一个以逗号分隔的数据成员列表&#xff0c;每个"…

Kali Linux 操作系统安装详细步骤——基于 VMware 虚拟机

1. Kali 操作系统简介 Kali Linux 是一个基于 Debian 的 Linux 发行版&#xff0c;旨在进行高级渗透测试和安全审计。Kali Linux 包含数百种工具&#xff0c;适用于各种信息安全任务&#xff0c;如渗透测试&#xff0c;安全研究&#xff0c;计算机取证和逆向工程。Kali Linux 由…

【论文】SimCLS:一个简单的框架 摘要总结的对比学习(1)

SimCLS:摘要总结的对比学习(1&#xff09; 写在最前面模型框架 摘要1 简介 写在最前面 SimCLS: A Simple Framework for Contrastive Learning of Abstractive Summarization&#xff08;2021ACL会议&#xff09; https://arxiv.org/abs/2106.01890 论文&#xff1a;https://…

GIT常用操作

GIT基本使用保姆级教程 1、本地安装GIT 1.1、安装 GIT安装包获取&#xff1a;https://git-scm.com/ 具体安装流程自行百度或自行摸索 1.2、配置信息 安装完成后运行git程序&#xff0c;大打开git bash界面&#xff0c;然后输入以下命令&#xff0c;设置全局用户名与全局邮…

软件设计师笔记--数据结构

文章目录 前言学习资料数据结构大 O 表示法时间复杂度线性结构和线性表线性表的顺序存储线性表的链式存储栈的顺序存储栈的链式存储队列的顺序存储与循环队列 串KMP 数组矩阵树二叉树二叉树的顺序存储结构二叉树的链式存储结构二叉树的遍历平衡二叉树二叉排序树最优二叉树(哈夫…

ZZS-7系列分闸、合闸、电源监视综合控制装置ZZS-7/1 ac220v

ZZS-7系列分闸、合闸、电源监视综合控制装置 系列型号&#xff1a; ZZS-7/1分闸、合闸、电源监视综合控制装置 ZZS-7/11分闸、合闸、电源监视综合控制装置 ZZS-7/12分闸、合闸、电源监视综合控制装置 ZZS-7/13分闸、合闸、电源监视综合控制装置 ZZS-7/14分闸、合闸、电源…

分享111个Java源码,总有一款适合您

分享111个Java源码&#xff0c;总有一款适合您 源码下载链接&#xff1a;https://pan.baidu.com/s/1fycjYHA7y6r-IH8H7v5XKA?pwdag8l 提取码&#xff1a;ag8l ​ Druid v1.2.15 OpenJDK Java开发环境 v21.5 Diboot轻代码开发平台 v2.8.0 blockj 基础区块链&#xff08;联…