(一)什么是MQ
MQ(message queue)本质上是队列,满足先入先出,只不过队列中存放的内容是消息而已,那什么是消息呢? 消息可以是字符串,json也可以是一些复杂对象
我们应用场景通常是再分布式系统之间
系统通信方式
一般我们的系统调用方式有两种
一种是同步通信,也就是a--->b,直接调用对方服务,数据从a出发直接到达b
另一种叫异步通信,也就是a----->容器---->b,数据从一段出发,先进入一个容器进行临时存储,当满足条件后,由容器发给另一端,这个容器的实现就是MQ
(二)MQ的作用
MQ主要的工作是接收并发送消息,在不同场景下可以有不同的作用
这里说一些比较常见的作用
1.异步解耦
在业务流程中,有一些操作时很耗时的,并且可能阻塞其他任务,但是这些操作并不要求立即执行,我们就可以借助MQ来把这些操作异步化
2.流量削峰
我们在一些特殊时间可能会有访问量突增的情况,那我们应用一般来说是不能立即全部处理的,所以我们可以使用MQ把访问放到消息队列中,然后依次的处理这些请求
3.消息分发
当多个系统要对同一个数据做出响应时,我们可以使用MQ来进行消息分发,比如支付成功后,支付系统就可以向MQ发送消息,然后其他系统来订阅这个消息,就不需要轮询数据库了
4.延迟通知
在一些定时要进行处理的一些场景中,我们可以使用MQ的延迟消息功能,就比如在电商平台,用户下单后一定时间未支付,我们可以把限定的时间放到队列中,时间到了自动取消订单
(三)RabbitMQ的核心概念
我们这里用RabbitMQ来学习消息队列
AMQP
首先我们来看一下这个,AMQP是一种高级消息队列,定义了一套确定的消息交换功能,包含交换机和队列如何映射等,这些组件共同工作,使消息能够正常被接收和发送,AMQP还定义了一个网络协议,允许客户端和AMQP模型进行通信
我们RabbitMQ就是实现了AMQP协议的,RabbitMQ就是AMQP协议的Erlang的实现
所以他们两者的结构模型是一样的
首先我们来看一下RabbitMQ的工作流程
RabbitMQ是⼀个消息中间件,也是⼀个⽣产者消费者模型.它负责接收,存储并转发消息.
首先我们先来了解一下图中出现的一些概念
1.Producer和Consumer
Producer:生产者,是MQ的客户端,用来向RabbitMQ发送消息的
Consumer:消费者,也是MQ的客户端,是用来从RabbitMQ中取消息的
Brokeer就是RabbitMQ,用来接收和转发消息的
我们也可以简化下上面的图,只关注这三者
2.Connection和Channel
Connection:连接,是客户端和RabbitMQ服务器建立的一个TCP连接,这个连接时建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息
Channel:信道,每个建立的connection都可以由很多个信道,每个信道是一个独立的虚拟连接,消息的发送和接收都是通过channel的
注:信道的作用是为了把所有的读写操作都给复用到同一个TCP连接上,这个可以减少建立和关闭连接的开销,提高性能
3.Virtual host
Virtual host:虚拟机,这个是消息队列提供的一种逻辑上的隔离机制(本质上没有隔离),对于Rabbit一个Broker可以有多个虚拟机,当多个不同的⽤⼾使⽤同⼀个 RabbitMQ Server提供的服务时,可以虚拟划分出多个vhost,每个⽤⼾在⾃⼰的vhost创建exchange/queue等
4.queue
queue:队列,是RabbitMQ中的内部对象,是存储消息的地方
多个消费者可以订阅同一个队列
5.exchange
exchange:交换机 ,是消息到达broker中的第一站(因为虚拟机是逻辑上的本质上是不存在的),它负责接收生产者传来的消息,并根据routingkey(bingingkey)来把这些消息路由到一个或者多个queue中,exchange起到了消息路由的作用
6.RabbitMQ的工作流程
1.生产者生产消息
2.生产者与RabbitMQ建立一个连接并且开启一个信道
3.生产者声明一个交换机
4.生产者声明一个队列
5.生产者发送消息到Broker
6.Broker接收消息,并存入到相列中,如果没有相应的队列,就根据生产者的配置来进行一些处理
(四)RabbitMQ入门程序
首先我们要使用RabbitMQ我们要在服务器上进行安装,那安装就先不说了,我们可以在官方文档或者b站找到很多资料
1.引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
生产者代码:
public class Produce {
public static void main(String[] args) throws IOException, TimeoutException {
//建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("62.234.46.219");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wd");
connectionFactory.setPassword("w85315678");
connectionFactory.setVirtualHost("bit");
Connection connection = connectionFactory.newConnection();
//开启信道
Channel channel = connection.createChannel();
//声明交换机(这里使用默认交换机)
//声明队列
/**
* var1 队列名称
* var2 是否持久化
* var3 是否独占(这个队列只有一个消费者)
* var4 是否自动删除
* argument 一些高级参数
* AMQP.Queue.DeclareOk queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;
*/
channel.queueDeclare("hello",true,false,false,null);
//发送消息
/**
* var1 交换机名称
* var2 交换机通过什么映射到queue中(内置交换机routingkey与队列名称保持一致)
* var3 属性配置
* body 消息
* void basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4)
*/
String s1="hello wd";
channel.basicPublish("","hello",null,s1.getBytes());
//释放信道
channel.close();
//释放连接
connection.close();
}
}
消费者代码:
public class consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("62.234.46.219");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wd");
connectionFactory.setPassword("w85315678");
connectionFactory.setVirtualHost("bit");
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//声明队列(如果队列没有才创建,有就不创建)
channel.queueDeclare("hello",true,false,false,null);
//消费消息
/**
* String basicConsume(String queue, boolean autoAck, Consumer callback)
* queue 队列名称
* autoAck是否自动确认
* callback接收消息后要执行的逻辑
*/
DefaultConsumer consumer=new DefaultConsumer(channel){
//从队列中获取到消息就会执行的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到: "+new String(body));
}
};
channel.basicConsume("hello",true,consumer);
//释放资源
channel.close();
connection.close();
}
}
消费者代码中有一个类叫Consumer
也就是我们channel.basicConsume()中的最后一个参数
这个参数是来定义消费者行为的,当我们需要从MQ中接收消息时,我们要提供一个实现了Consumer接口的对象(这里我使用的是内部类)
DefaultConsumer是RabbitMQ提供的一个默认消费者,实现了Consumer接口
核心方法就是我们上面重写的方法
那上面我们没有说参数都是什么意思,这里来说一下
consumerTag:消费者标签,通常是消费者在订阅队列时指定的
envelope:有一些封包信息,如队列名称,交换机等
properties:一些配置信息
body:消息的具体内容
(五)可能出现的错误
1.
我们上面销毁的过程中,一定要先销毁channel再销毁connection或者直接销毁connection不销毁channel
如果先销毁connection再销毁channel就会报错
2.
我们在消费者代码中,如果不自己声明队列,且MQ中没有这个队列时,我们就会报错
3.
如果端口或者ip错误,也会报错
4.账号密码不正确也会报错
5.用户对虚拟机没有操作权限会报错