服务异步通讯---RabbitMQ实用篇

目录

一、初识MQ

一、同步调用

1、同步通讯和异步通讯

2、同步调用的问题

3.同步调用总结

 二、异步调用

1、优势: ​编辑

2、异步总结 

二、什么是MQ 

一、RabbitMQ快速入门

1、RabbitMQ的结构和概念

 2、常见消息模型

 2.1、基础消息队列模型

 2.2、总结

二、SpringAMQP 

一、利用SpringAMQP实现HelloWorld中的基础消息队列功能 

1.流程步骤

2、springAMQP 总结 

二、Work Queue 工作队列

​ 1、模拟WorkQueue,实现一个队列绑定多个消费者

​ 2.Work模型作用

三、发布( Publish )、订阅( Subscribe ) 

​ 1、发布订阅-Fanout Exchange

1.1、 利用SpringAMQP演示FanoutExchange的使用​编辑 

​1.2、交换机的作用是什么?

2、发布订阅-DirectExchange 

2.1、利用SpringAMQP演示DirectExchange的使用 

 2.2、描述下Direct交换机与Fanout交换机的差异?

3、发布订阅-TopicExchange 

3.1、利用SpringAMQP演示TopicExchange的使用 

​3.2、描述下Direct交换机与Topic交换机的差异?

四、消息转换器

1.测试发送Object类型消息 

2.消息转换器 

2.1、publisher服务

2.2 、consumer服务

2.3、SpringAMQP中消息的序列化和反序列化是怎么实现的?


一、初识MQ

一、同步调用

1、同步通讯和异步通讯

2、同步调用的问题

微服务间基于Feign的调用就属于同步方式,存在一些问题 

同步调用存在的问题 

3.同步调用总结

同步调用的优点: 时效性较强,可以立即得到结果

同步调用的问题:

耦合度高

性能和吞吐能力下降

有额外的资源消耗

有级联失败问题 

 二、异步调用

异步调用常见实现就是事件驱动模式 

1、优势: 

 

 

2、异步总结 

异步通信的优点:

耦合度低

吞吐量提升

故障隔离

流量削峰

异步通信的缺点:

依赖于Broker的可靠性、安全性、吞吐能力

架构复杂了,业务没有明显的流程线,不好追踪管理 

二、什么是MQ 

 MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

一、RabbitMQ快速入门

 RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/

1、RabbitMQ的结构和概念

 

 2、常见消息模型

 2.1、基础消息队列模型

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

publisher:消息发布者,将消息发送到队列queue

queue:消息队列,负责接受并缓存消息

consumer:订阅队列,处理队列中的消息 

 

publisher:

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

consumer:

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}
 2.2、总结

二、SpringAMQP 

 SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

 

一、利用SpringAMQP实现HelloWorld中的基础消息队列功能 

 流程如下:

在父工程中引入spring-amqp的依赖

在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

在consumer服务中编写消费逻辑,绑定simple.queue这个队列

1.流程步骤

步骤1:引入AMQP依赖 

 步骤2:在publisher中编写测试方法,向simple.queue发送消息

 

步骤3:在consumer中编写消费逻辑,监听simple.queue 

2、springAMQP 总结 

什么是AMQP?

应用间消息通信的一种协议,与语言和平台无关。

SpringAMQP如何发送消息?

引入amqp的starter依赖

配置RabbitMQ地址

利用RabbitTemplate的convertAndSend方法

SpringAMQP如何接收消息?

引入amqp的starter依赖

配置RabbitMQ地址

定义类,添加@Component注解 类中声明方法,添加@RabbitListener注解,方法参数就是消息

注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能

二、Work Queue 工作队列

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积 

 1、模拟WorkQueue,实现一个队列绑定多个消费者

 基本思路如下:

在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

在consumer服务中定义两个消息监听者,都监听simple.queue队列 消费者1每秒处理50条消息,消费者2每秒处理10条消息

 步骤1:生产者循环发送消息到simple.queue

 

步骤2:编写两个消费者,都监听simple.queue 

 

测试发现:消费条数,都还是差不多一样多。

原因:消费预取   没开始消费之前,2个消费者就已经将消息提前分配了。

 2.Work模型作用

Work模型的使用: 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理

通过设置prefetch来控制消费者预取的消息数量。

三、发布( Publish )、订阅( Subscribe ) 

 发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。 常见exchange类型包括:

Fanout:广播

Direct:路由

Topic:话题

 1、发布订阅-Fanout Exchange



1.1、 利用SpringAMQP演示FanoutExchange的使用 

步骤1:在consumer服务声明Exchange、Queue、Binding 

SpringAMQP提供了声明交换机、队列、绑定关系的API,例如: 

 

 步骤2:在consumer服务声明两个消费者

 步骤3:在publisher服务发送消息到FanoutExchange

 1.2、交换机的作用是什么?

接收publisher发送的消息

将消息按照规则路由到与之绑定的队列

不能缓存消息,路由失败,消息丢失

FanoutExchange的会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

FanoutExchange

Queue

Binding

2、发布订阅-DirectExchange 

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

1、每一个Queue都与Exchange设置一个BindingKey

2、发布者发送消息时,指定消息的RoutingKey

3、Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

 

2.1、利用SpringAMQP演示DirectExchange的使用 

 步骤1:在consumer服务声明Exchange、Queue

 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2, 并利用@RabbitListener声明Exchange、Queue、RoutingKey

 

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }

步骤2:在publisher服务发送消息到DirectExchange

 @Test
    public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "itcast.direct";
        // 消息
        String message = "hello, red!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }
 2.2、描述下Direct交换机与Fanout交换机的差异?

Fanout交换机将消息路由给每一个与之绑定的队列

Direct交换机根据RoutingKey判断路由给哪个队列

如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机有哪些常见注解?

@Queue

@Exchange

3、发布订阅-TopicExchange 

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。 Queue与Exchange指定BindingKey时可以使用通配符:

 #:代指0个或多个单词

*:代指一个单词

 

3.1、利用SpringAMQP演示TopicExchange的使用 

 

步骤1:在consumer服务声明Exchange、Queue 

在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2, 并利用@RabbitListener声明Exchange、Queue、RoutingKey

 

步骤2:在publisher服务发送消息到TopicExchange 

在publisher服务的SpringAmqpTest类中添加测试方法: 

 3.2、描述下Direct交换机与Topic交换机的差异?

Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割

Topic交换机与队列绑定时的bindingKey可以指定通配符

#:代表0个或多个词

*:代表1个词

四、消息转换器

1.测试发送Object类型消息 

 

2.消息转换器 
2.1、publisher服务

 

2.2 、consumer服务

 

2.3、SpringAMQP中消息的序列化和反序列化是怎么实现的?

利用MessageConverter实现的,默认是JDK的序列化 注意发送方与接收方必须使用相同的MessageConverter 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/287971.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

高效管理版本控制,Cornerstone 4 for Mac助您成为SVN专家

在软件开发和团队合作中,版本控制是一个至关重要的环节。为了帮助开发者更加高效地管理和控制代码版本,Cornerstone 4 for Mac应运而生。作为一款功能强大的SVN(Subversion)管理工具,Cornerstone 4 for Mac为Mac用户提…

IO进程线程 day2

1.使用fread、fwrite完成两个文件的拷贝 #include <stdio.h> #include <string.h> #include <stdlib.h> int main(int argc, const char *argv[]) {//判断输入是否合法if(argc>3){printf("输入不合法\n");return -1;}//定义两个文件指针&#…

低代码搭建,助力批发零售行业解决方案的快速实现

引言 随着技术的快速发展&#xff0c;低代码技术作为一种高效的业务解决方案&#xff0c;正日益在批发零售行业中展现其巨大的应用潜力。其所带来的快速搭建、灵活性和成本效益&#xff0c;对于现代批发零售业务的管理和发展具有重要意义。 本文旨在探讨低代码技术在批发零售…

努力打工的你存到钱了?2024新蓝海创业项目/适合普通人创业项目

为什么有钱人那么有钱&#xff1f;是他们够努力吗&#xff1f;有一位网友回答是这样回答的&#xff1a; “从小到大我所接触到的一切成功&#xff0c;他的基础都是努力&#xff0c;甚至于奉承时的吃得苦上苦方为人上人。但是那天我的三观出现了认知错误&#xff0c;光靠努力赚…

【番外】【Airsim in Windows WSL2-Ubuntu20.04-ROS】环境配置大全

【番外】【Airsim in Windows & WSL2-Ubuntu20.04-ROS】环境配置大全 【前言&#xff08;可省略不看&#xff09;】1.在windows上面部署好UE4AirSim联合仿真环境2.在windows上面部署wsl2系统以及在wsl2上面部署ubuntu系统3.安装好ubuntu系统之后&#xff0c;目前只能在命令…

手敲MyBatis(十四章)-解析含标签的动态SQL语句

1.前言 这一章主要的就是要解析动态标签里的Sql语句&#xff0c;然后进行条件语句的拼接&#xff0c;动态标签实现了trim和if标签&#xff0c;所以Sql节点就要加上TrimSqlNode和ifSqlNode&#xff0c;我们最终要获取Sql源&#xff0c;动态Sql语句需要一些处理&#xff0c;所以…

AI原生应用开发“三板斧”亮相WAVE SUMMIT+2023

面对AI应用创新的风口跃跃欲试&#xff0c;满脑子idea&#xff0c;却苦于缺乏技术背景&#xff0c;不得不望而却步&#xff0c;这曾是许多开发者的苦恼&#xff0c;如今正在成为过去。 12月28日&#xff0c;WAVE SUMMIT深度学习开发者大会2023在北京举办。百度AI技术生态总经理…

文件监控软件丨文件权限管理工具

文件已经成为企业最重要的资产之一。然而&#xff0c;文件的安全性和完整性经常受到威胁&#xff0c;如恶意软件感染、人为误操作、内部泄密等。 为了确保文件的安全&#xff0c;文件监控软件应运而生。本文将深入探讨文件监控软件的概念、功能、应用场景和未来发展等方面。 文…

Grafana UI 入门使用

最近项目上需要使用Grafana来做chart&#xff0c;因为server不是我在搭建&#xff0c;所以就不介绍怎么搭建grafana server&#xff0c;而是谈下怎么在UI上具体操作使用了。 DOCs 首先呢&#xff0c;贴一下官网doc的连接&#xff0c;方便查询 Grafana open source documenta…

【数据库原理】(6)关系数据库的关系操作集合

基本关系操作 关系数据操作的对象都是关系,其操作结果仍为关系,即集合式操作。关系数据库的操作可以分为两大类&#xff1a;数据查询和数据更新。这些操作都是基于数学理论&#xff0c;特别是集合理论。下面是对这些基本操作的解释和如何用不同的关系数据语言来表达这些操作的…

Objects are not valid as a React child (found: object with keys {name}).

在jsx中可以嵌套表达式&#xff0c;将表达式作为内容的一部分&#xff0c;但是要注意&#xff0c;普通对象不能作为子元素&#xff1b;但是数组&#xff0c;react元素对象是可以的 如下&#xff1a;不能将stu这个对象作为子元素放 function App() {const myCal imgStyleconst…

OSG-纹理映射(二)

2.6 Mipmap纹理映射 在一个动态的场景中&#xff0c;当一个纹理对象迅速远离视点时&#xff0c;纹理图像必须随着被投影的图像一起缩小。为了实现这种效果&#xff0c;可以通过对纹理图像进行过滤&#xff0c;适当对它进行缩小&#xff0c;以使它映射到物体的表面时不会产生抖动…

Android 串口协议

前言 本协议是 Android 应用端与主控板之间的通信协议&#xff0c;是串行通信协议。 协议要求同一时间只能有两个通讯端点在相互通讯&#xff0c;采用小端传输数据。 硬件层基于RS485协议&#xff0c;采取半双工&#xff0c;一主多从的通讯模式。Android定义为主机&#xff0c…

DataGear 4.7.0 发布,数据可视化分析平台

DataGear 4.7.0 发布&#xff0c;严重漏洞和BUG修复&#xff0c;具体更新内容如下&#xff1a; 新增&#xff1a;HTTP数据集新增【编码请求地址】支持&#xff0c;可用于解决请求地址中文乱码问题&#xff1b;新增&#xff1a;新增数据源密码加密存储支持&#xff08;开启需设…

怎么有效利用HTTPS协议

HTTPS的发展史可以追溯到早期的互联网时代&#xff0c;当时HTTP协议被广泛使用&#xff0c;但由于通信过程是明文的&#xff0c;导致用户的敏感信息容易被截取和窃取。为了解决这个问题&#xff0c;HTTPS协议应运而生。 HTTPS是在HTTP协议的基础上加入了传输层安全协议&#x…

深挖小白必会指针笔试题<一>

目录 引言 关键解决办法&#xff1a; 学会画图确定指向关系 例题一&#xff1a; 画图分析&#xff1a; 例题二&#xff1a; 画图分析&#xff1a; 例题三&#xff1a; 注&#xff1a;%x是按十六进制打印 画图分析&#xff1a; 例题四&#xff1a; 画图分析&…

基于Java+SpringMvc+Vue求职招聘系统详细设计实现

基于JavaSpringMvcVue求职招聘系统详细设计实现 &#x1f345; 作者主页 专业程序开发 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 文末获取源码联系方式 &#x1f4dd; 文章目录 基于JavaSpringMvcVue求职招聘系统详细设计实现一、前言介…

众和策略:今日,有“天地板”,也有“地天板”

今日早盘&#xff0c;A股持续坚持弱势震动&#xff0c;两市成交有进一步萎缩的趋势。 盘面上&#xff0c;煤炭、传媒娱乐、旅行、房地产等板块相对活泼&#xff0c;混合实际、PEEK材料、苹果概念、华为汽车等板块跌幅居前。 个股方面&#xff0c;神马电力连续5日涨停&#xf…

react useEffect 内存泄漏

componentWillUnmount() {this.setState (state, callback) > {return;};// 清除reactionthis.reaction();}useEffect 使用AbortController useEffect(() > { let abortController new AbortController(); // your async action is here return () > { abortCo…

TCP/IP的网络层(即IP层)之IP地址和网络掩码,在视频监控系统中的配置和应用

在给客户讲解我们的AS-V1000视频监控平台的时候&#xff0c;有的客户经常会配置错误IP地址的掩码和网关&#xff0c;导致出现一些网路问题。而在视频监控系统中&#xff0c;IP地址和子网掩码是用于标识网络中设备的重要标识符。IP地址被用来唯一地标识一个网络设备&#xff0c;…