目录
1. 简单模式
2. Work Queues(作队列)
3. Publish/Subscribe(发布/订阅)
4. Routing(路由模式)
5. Topics(通配符模式)
6. RPC(RPC通信)
7. Publisher Confirms(发布确认)
7.1Publishing Messages Individually(单独确认)
7.2 Publishing Messages in Batches(批量确认)
7.3 Handling Publisher Confirms Asynchronously(异步确认)
上一篇文章中, 我们简单介绍了RabbitMQ 7种工作模式:
【RabbitMQ】RabbitMQ 的七种工作模式介绍-CSDN博客
在这篇文章中, 将会对这7种工作模式进行代码演示
这篇文章代码中用到的常量:
public class Constants {
public static final String HOST = "8.130.35.237";
public static final Integer PORT = 5672;
public static final String USER_NAME = "study";
public static final String PASSWORD = "study";
public static final String VIRTUAL_HOST = "test";
//工作队列模式
public static final String WORK_QUEUE = "work.queue";
//发布订阅模式
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
//路由模式
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
//通配符模式
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
//rpc模式
public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
//publisher confirms
public static final String PUBLISH_CONFIRMS_QUEUE1 = "publish.confirms.queue1";
public static final String PUBLISH_CONFIRMS_QUEUE2 = "publish.confirms.queue2";
public static final String PUBLISH_CONFIRMS_QUEUE3 = "publish.confirms.queue3";
}
咱们在前面学习了简单模式的写法, 接下来学习另外几种工作模式的写法
1. 简单模式
在第一篇文章中的入门程序就是简单模式. 此处就省略啦~~
第一篇文章的地址:【RabbitMQ】RabbitMQ 的概念以及使用RabbitMQ编写生产者消费者代码-CSDN博客
2. Work Queues(作队列)
简单模式的增强版, 和简单模式的区别就是: 简单模式有一个消费者, 工作队列模式支持多个消费者接收消息, 消费者之间是竞争关系, 每个消息只能被一个消费者接收
编写生产者代码
工作队列模式和简单模式区别是有多个消费者, 所以生产者消费者代码差异不大
相比简单模式, 生产者的代码基本一样, 为了能看到多个消费者竞争的关系, 我们一次发送10条消息
我们把发送消息的地方, 改为一次发送10条消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME); //账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.声明队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
* Map<String, Object> arguments)
* 参数说明:
* queue:队列名称
* durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
* exclusive:是否独占,只能有⼀个消费者监听队列
* autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
* arguments 参数
*/
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
//4.发送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 参数说明
* exchange: 交换机名称, 简单模式下, 交换机会使用默认的""
* routingKey: 内置交换机, routingKey和队列名称保持一致
* props: 属性配置
* body: 消息
*/
for (int i = 0; i < 10; i++) {
String msg = "Hello work queue... " + i;
channel.basicPublish("",Constants.WORK_QUEUE,null,msg.getBytes());
System.out.println(msg + "消息发送成功!");
}
//6.资源释放
channel.close();
connection.close();
}
}
编写消费者代码
消费者代码和简单模式一样, 只是复制两份. 两个消费者代码可以是一样的
Consumer1:
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME); //账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.声明队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
* Map<String, Object> arguments)
* 参数说明:
* queue:队列名称
* durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
* exclusive:是否独占,只能有⼀个消费者监听队列
* autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
* arguments 参数
*/
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
//4.消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
//5.释放资源
// channel.close();
// connection.close();
}
}
Consumer2:
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME); //账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.声明队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
* Map<String, Object> arguments)
* 参数说明:
* queue:队列名称
* durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
* exclusive:是否独占,只能有⼀个消费者监听队列
* autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
* arguments 参数
*/
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
//4.消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer2 收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
//5.释放资源
// channel.close();
// connection.close();
}
}
运行程序,观察结果
先启动两个消费者运行,再启动生产者
如果先启动生产者,在启动消费者,由于消息较少,处理较快,那么第一个启动的消费者就会瞬间把10条消息消费掉,所以我们先启动两个消费者,再启动生产者
1. 启动2个消费者
2. 启动生产者
可以看到两个消费者都打印了消费信息
可以看到管理界面上显示两个消费者
3. Publish/Subscribe(发布/订阅)
在发布/订阅模型中,多了一个Exchange角色
Exchange 常见有三种类型,分别代表不同的路由规则
a) Fanout: 广播,将消息交给所有绑定到交换机的队列 (Publish/Subscribe模式)
b) Direct: 定向,把消息交给符合指定routing key的队列 (Routing模式)
c)Topic: 通配符,把消息交给符合routing pattern(路由模式)的队列 (Topics模式)
也就分别对应不同的工作模式
我们来看看Publish/Subscribe 模式
步骤:
1.引入依赖
2.编写生产者代码
3.编写消费者代码
编写生产者代码
和前面两个的区别是:
需要创建交换机,并且绑定队列和交换机
创建交换机
声明两个队列
后面验证是否两个队列都能收到消息
绑定队列和交换机
完整代码:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME); //账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.声明交换机
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
//4.申明队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
* Map<String, Object> arguments)
* 参数说明:
* queue:队列名称
* durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
* exclusive:是否独占,只能有⼀个消费者监听队列
* autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
* arguments 参数
*/
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
//5.交换机和队列绑定
channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
//6.发送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 参数说明
* exchange: 交换机名称, 简单模式下, 交换机会使用默认的""
* routingKey: 内置交换机, routingKey和队列名称保持一致
* props: 属性配置
* body: 消息
*/
for (int i = 0; i < 10; i++) {
String msg = "Hello fanout queue... " + i;
channel.basicPublish(Constants.FANOUT_EXCHANGE,"",null,msg.getBytes());
System.out.println(msg + "消息发送成功!");
}
//6.资源释放
channel.close();
connection.close();
}
}
编写消费者代
交换机和队列的绑定关系及声明已经在生产方写完,所以消费者不需要再写了
去掉声明队列的代码就可以了
1.创建Channel
2.接收消息,并处理
完整代码
消费者1
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME); //账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.申明队列
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
//4.消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer2 收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);
}
}
消费者2
把队列名称改一下就可以了.此处省略
运行程序,观察结果
1.运行生产者
a) 可以看到两个队列分别有了一条消息
b) Exchange多了队列绑定关系
2.运行消费者
4. Routing(路由模式)
队列和交换机的绑定, 不能是任意的绑定了, 而是要指定一个BindingKey(RoutingKey的一种)
消息的发送方在向Exchange发送消息时, 也需要指定消息的RoutingKey
Exchange也不再把消息交给每一个绑定的key, 而是根据消息的RoutingKey进行判断,只有队列绑定时的BindingKey和发送消息的RoutingKey完全一致, 才会接收到消息
编写生产者代码
和发布订阅模式的区别是: 交换机类型不同, 绑定队列的BindingKey不同
创建交换机, 定义交换机类型为BuiltinExchangeType.DIRECT
声明队列
绑定交换机和队列
完整代码:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME); //账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.声明交换机
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
//4.申明队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
* Map<String, Object> arguments)
* 参数说明:
* queue:队列名称
* durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
* exclusive:是否独占,只能有⼀个消费者监听队列
* autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
* arguments 参数
*/
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
//5.交换机和队列绑定
channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");
//6.发送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 参数说明
* exchange: 交换机名称, 简单模式下, 交换机会使用默认的""
* routingKey: 内置交换机, routingKey和队列名称保持一致
* props: 属性配置
* body: 消息
*/
String msg = "hello direct, my routingKey is a ...";
channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, msg.getBytes());
String msg_b = "hello direct, my routingKey is b ...";
channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, msg_b.getBytes());
String msg_c = "hello direct, my routingKey is c ...";
channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, msg_c.getBytes());
System.out.println("消息发送成功!");
//6.资源释放
channel.close();
connection.close();
}
}
编写消费者代码
Routing模式的消费者代码和Publish/Subscribe 代码一样,同样复制出来两份
消费者1:DirectRabbitmqConsumer1
消费者2: DirectRabbitmgConsumer2
修改消费的队列名称就可以
完整代码:
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME); //账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.申明队列
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
//4.消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer2 收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
}
}
运行程序, 观察结果:
5. Topics(通配符模式)
Topics 和Routing模式的区别是:
1. topics 模式使用的交换机类型为topic(Routing模式使用的交换机类型为direct)
2. topic 类型的交换机在匹配规则上进行了扩展, Binding Key支持通配符匹配(direct类型的交换机路由规则是BindingKey和RoutingKey完全匹配).
编写生产者代码
和路由模式,发布订阅模式的区别是: 交换机类型不同,绑定队列的RoutingKey不同
创建交换机
定义交换机类型为BuiltinExchangeType.TOPIC
声明队列
绑定交换机和队列
完整代码
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME); //账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.声明交换机
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
//4.申明队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
* Map<String, Object> arguments)
* 参数说明:
* queue:队列名称
* durable:可持久化 true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。
* exclusive:是否独占,只能有⼀个消费者监听队列
* autoDelete:是否⾃动删除, 当没有Consumer时, ⾃动删除掉
* arguments 参数
*/
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
//5.交换机和队列绑定
channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
//6.发送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 参数说明
* exchange: 交换机名称, 简单模式下, 交换机会使用默认的""
* routingKey: 内置交换机, routingKey和队列名称保持一致
* props: 属性配置
* body: 消息
*/
String msg = "hello direct, my routingKey is sae.a.fa ...";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "sae.a.fa", null, msg.getBytes()); //转发到Q1
String msg_b = "hello direct, my routingKey is ef.a.b ...";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.b", null, msg_b.getBytes());//转发到Q1,Q2
String msg_c = "hello direct, my routingKey is c.ef.d ...";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ef.d", null, msg_c.getBytes());//转发到Q2
System.out.println("消息发送成功!");
//6.资源释放
channel.close();
connection.close();
}
}
编写消费者代码
Routing模式的消费者代码和Routing模式代码一样,修改消费的队列名称即可
同样复制出来两份
消费者1:TopicRabbitmqConsumerl
消费者2: TopicRabbitmqConsumer2
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME); //账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.申明队列
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
//4.消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer2 收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);
}
}
代码运行结果:
6. RPC(RPC通信)
RPC(Remote Procedure Cal),即远程过程调用.它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术.类似于Http远程调用.
RabbitMQ实现RPC通信的过程,大概是通过两个队列实现一个可回调的过程
大概流程如下:
1.客户端发送消息到一个指定的队列, 并在消息属性中设置 replyTo 字段, 这个字段指定了一个回调队列, 服务端处理后, 会把响应结果发送到这个队列.
2.服务端接收到请求后, 处理请求并发送响应消息到 replyTo 指定的回调队列
3.客户端在回调队列上等待响应消息. 一旦收到响应,客户端会检查消息的correlationld属性,以确
保它是所期望的响应.
编写客户端代码
客户端代码主要流程如下:
1.声明两个队列, 包含回调队列 replyQueueName, 声明本次请求的唯一标志 corrld
2.将 replyQueueName 和 corrld 配置到要发送的消息队列中
3.使用阻塞队列来阻塞当前进程, 监听回调队列中的消息, 把请求放到阻塞队列中
4.阻塞队列有消息后, 主线程被唤醒,打印返回内容
申明队列:
使用内置交换机发送消息:
//3.发送请求
String msg = "hello rpc...";
//设置请求的唯一标识
String correlationID = UUID.randomUUID().toString();
//设置请求的相关属性
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.correlationId(correlationID)
.replyTo(Constants.RPC_RESPONSE_QUEUE)
.build();
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
使用阻塞队列, 来存储回调结果:
//4.接收响应
//使用阻塞队列, 来存储响应
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String respMsg = new String(body);
System.out.println("接收到回调信息: " + respMsg);
if(correlationID.equals(properties.getCorrelationId())) {
//如果 correlationID 校验一致
response.offer(respMsg);
}
}
};
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
获取回调结果:
String result = response.take();
System.out.println("[RPC Client 响应结果]: " + result);
完整代码:
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
* rpc 客户端
* 1.发送请求
* 2.接收响应
*/
public class RpcClient {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false,null);
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false,null);
//3.发送请求
String msg = "hello rpc...";
//设置请求的唯一标识
String correlationID = UUID.randomUUID().toString();
//设置请求的相关属性
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.correlationId(correlationID)
.replyTo(Constants.RPC_RESPONSE_QUEUE)
.build();
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
//4.接收响应
//使用阻塞队列, 来存储响应
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String respMsg = new String(body);
System.out.println("接收到回调信息: " + respMsg);
if(correlationID.equals(properties.getCorrelationId())) {
//如果 correlationID 校验一致
response.offer(respMsg);
}
}
};
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
String result = response.take();
System.out.println("[RPC Client 响应结果]: " + result);
}
}
编写服务端代码
服务端代码主要流程如下:
1.接收消息
2.根据消息内容进行响应处理,把应答结果返回到回调队列中
设置同时最多只能获取一个消息
如果不设置 basicQos, RabbitMQ 会使用默认的 OoS 设置, 其 prefetchcount 默认值为0. 当
prefetchCount为0 时,RabbitMO 会根据内部实现和当前的网络状况等因素,可能会同时发送多条
消息给消费者. 这意味着在默认情况下,消费者可能会同时接收到多条消息, 但具体数量不是严格保
证的,可能会有所波动
在RPC模式下,通常期望的是一对一的消息处理,即一个请求对应一个响应,消费者在处理完一个消息并确认之后,才会接收到下一条消息.
接收消息, 并做出相应的处理
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String request = new String(body);
System.out.println("接收到请求: " + request);
String response = "针对request:" + request + " , 响应成功";
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
RabbitMQ 消息确定机制
在RabbitMO中,basicConsume方法的autoAck参数用于指定消费者是否应该自动向消息队列确认
消息自动确认(autoAck=true): 消息队列在将消息发送给消费者后, 会立即从内存中删除该消息. 这意味着, 如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
手动确认(autoAck=false): 消息队列在将消息发送给消费者后,需要消费者显式地调用basicAck
方法来确认消息. 手动确认提供了更高的可靠性, 确保消息不会被意外丢失, 适用于消息处理重要且需要确保每个消息都被正确处理的场景.
完整代码:
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RPC Server
* 1.接收请求
* 2.发送响应
*/
public class RpcServer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.接收请求
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String request = new String(body);
System.out.println("接收到请求: " + request);
String response = "针对request:" + request + " , 响应成功";
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
}
}
运行结果:
7. Publisher Confirms(发布确认)
作为消息中间件,都会面临消息丢失的问题.
消息丢失大概分为三种情况:
生产者问题. 因为应用程序故障, 网络抖动等各种原因, 生产者没有成功向broker发送消息
消息中间件自身问题, 生产者成功发送给了Broker,但是Broker没有把消息保存好,导致消息丢失
消费者问题. Broker 发送消息到消费者, 消费者在消费消息时, 因为没有处理好, 导致broker将消费
失败的消息从队列中删除了
RabbitMO也对上述问题给出了相应的解决方案,问题2可以通过持久化机制.问题3可以采用消息应答机制.
针对问题1,可以采用发布确认(Publisher Confirms)机制实现
发布确认 属于RabbitMQ的七大工作模式之一
生产者将信道设置成 confirm(确认) 模式, 一旦信道进入confirm模式, 所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始), 一旦消息被投递到所有匹配的队列之后, RabbitMO就会发送一个确认给生产者(包含消息的唯一ID), 这就使得生产者知道消息已经正确到达目的队列了, 如果消息和队列是可持久化的, 那么确认消息会在将消息写入磁盘之后发出. broker回传给生产者的确认消息中 deliveryTag 包含了确认消息的序号, 此外 broker也可以设置 channel.basicAck 方法中的 multiple参数, 表示到这个序号之前的所有消息都已经得到了处理.
发送方确认机制最大的好处在于它是异步的,生产者可以同时发布消息和等待信道返回确认消息
1.当消息最终得到确认之后, 生产者可以通过回调方法来处理该确认消息.
2.如果 RabbitMO 因为自身内部错误导致消息丢失, 就会发送一条nack(Basic.Nack)命令, 生产者同样可以在回调方法中处理该nack命令
使用发送确认机制, 必须要信道设置成 confirm(确认) 模式
发布确认是 AMOP 0.9.1协议的扩展, 默认情况下它不会被启用. 生产者通过channel.confirmSelect() 将信道设置为confirm模式.
发布确认有3种策略, 接下来我们来学习这三种策略
7.1Publishing Messages Individually(单独确认)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
public class PublisherConfirms {
private static final Integer MESSAGE_COUNT = 200;
static Connection createConnection() throws Exception {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME); //账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
return connection;
}
public static void main(String[] args) throws Exception{
//Publishing Messages Individually(单独确认)
publishingMessagesIndividually();
//Publishing Messages in Batches(批量确认)
publishingMessagesInBatches();
//Handling Publisher Confirms Asynchronously(异步确认)
handlingPublisherConfirmsAsynchronously();
}
private static void handlingPublisherConfirmsAsynchronously() {
}
private static void publishingMessagesInBatches() {
}
/**
* 单独确认
*/
private static void publishingMessagesIndividually() throws Exception{
try (Connection connection = createConnection()){
//1.开启信道
Channel channel = connection.createChannel();
//2.设置信道为confirm模式
channel.confirmSelect();
//3.申明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE1, true, false, false, null);
//4.发送消息, 并等待确认
long start = System.currentTimeMillis();
for(int i = 0; i<MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms " + i;
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE1, null, msg.getBytes());
//等待确认
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
}
}
}
代码运行结果:
可以发现,发送200条消息,耗时很长
观察上面代码, 会发现这种策略是每发送一条消息后就调用channel.waitForConfirmsOrDie方法, 之后等待服务端的确认, 这实际上是一种串行同步等待的方式, 尤其对于持久化的消息来说, 需要等待消息确认存储在磁盘之后才会返回(调用Linux内核的fsync方法).
但是发布确认机制是支持异步的. 可以一边发送消息, 一边等待消息确认.
由此进行了改进, 接下来看另外两种策略:
Publishing Messages in Batches(批量确认): 每发送一批消息后, 调用channel.waitForConfirms方
法, 等待服务器的确认返回.
Handling Publisher Confirms Asynchronously(异步确认): 提供一个回调方法, 服务端确认了一条
或者多条消息后客户端会回这个方法进行处理
7.2 Publishing Messages in Batches(批量确认)
核心代码:
private static void publishingMessagesInBatches() throws Exception{
try (Connection connection = createConnection()){
//1.开启信道
Channel channel = connection.createChannel();
//2.设置信道为confirm模式
channel.confirmSelect();
//3.申明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE2, true, false, false, null);
//4.发送消息, 并进行确认
long start = System.currentTimeMillis();
int batchSize = 100;
int outstandingMessageCount = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms " + i;
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE2, null, msg.getBytes());
outstandingMessageCount++;
if(outstandingMessageCount == batchSize) {
//等待确认
channel.waitForConfirmsOrDie(5000);
outstandingMessageCount = 0;
}
}
if(outstandingMessageCount > 0) {
//等待确认
channel.waitForConfirmsOrDie(5000);
outstandingMessageCount = 0;
}
long end = System.currentTimeMillis();
System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
}
代码运行结果:
可以观察到, 性能提高了很多
相比于单独确认策略, 批量确认极大地提升了confirm的效率, 缺点是出现Basic,Nack或者超时时, 我们不清楚具体哪条消息出了问题.客户端需要将这一批次的消息全部重发, 这会带来明显的重复消息数量.
当消息经常丢失时, 批量确认的性能应该是不升反降的.
7.3 Handling Publisher Confirms Asynchronously(异步确认)
异步confirm方法的编程实现最为复杂. Channel接口提供了一个方法addConfirmListener. 这个方法
可以添加ConfirmListener回调接口.
ConfirmListener接口中包含两个方法:handleAck(long deliveryTag,booleanmultiple) 和 handleNack(long deliveryTag,boolean multiple), 分别对应处理RabbitMO发送给生产者的ack和nack.
deliveryTag 表示发送消息的序号. multiple 表示是否批量确认
我们需要为每一个Channel维护一个已发送消息的序号集合, 当收到RabbitMO的 confirm 回调时,从集合中删除对应的消息. 当Channel开启confirm模式后, channel上发送消息都会附带一个从1开始递增的deliveryTag序号. 我们可以使用SortedSet 的有序性来维护这个已发消息的集合
1. 当收到ack时,从序列中删除该消息的序号,如果为批量确认消息,表示小于等于当前序号deliveryTag的消息都收到了,则清除对应集合
2. 当收到nack时,处理逻辑类似, 不过需要结合具体的业务情况, 进行消息重发等操作
/**
* 异步确认
*/
private static void handlingPublisherConfirmsAsynchronously() throws Exception{
try (Connection connection = createConnection()){
//1.开启信道
Channel channel = connection.createChannel();
//2.设置信道为confirm模式
channel.confirmSelect();
//3.申明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE3, true, false, false, null);
//4.监听confirm
//有序集合,元素按照⾃然顺序进⾏排序,存储未confirm消息序号
SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//multiple 批量
//confirmSet.headSet(n)⽅法返回当前集合中⼩于n的集合
if(multiple) {
//批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰
//这批序号的消息都已经被ack了
confirmSeqNo.headSet(deliveryTag+1).clear();
}else {
//单条确认:将当前的deliveryTag从集合中移除
confirmSeqNo.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if(multiple) {
confirmSeqNo.headSet(deliveryTag+1).clear();
}else {
confirmSeqNo.remove(deliveryTag);
}
//业务要根据实际场景进行处理, 比如重发, 此处省略
}
});
//5.发送消息, 并等待确认
long start = System.currentTimeMillis();
for(int i = 0; i<MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms " + i;
//得到下次发送消息的序号, 从1开始
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE1, null, msg.getBytes());
confirmSeqNo.add(seqNo);
}
while (!confirmSeqNo.isEmpty()) {
Thread.sleep(10);
}
long end = System.currentTimeMillis();
System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
}
}
三种策略对比, 完整代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class PublisherConfirms {
private static final Integer MESSAGE_COUNT = 200;
static Connection createConnection() throws Exception {
//1.建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
connectionFactory.setUsername(Constants.USER_NAME); //账号
connectionFactory.setPassword(Constants.PASSWORD); //密码
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
Connection connection = connectionFactory.newConnection();
return connection;
}
public static void main(String[] args) throws Exception{
//Publishing Messages Individually(单独确认)
publishingMessagesIndividually();
//Publishing Messages in Batches(批量确认)
publishingMessagesInBatches();
//Handling Publisher Confirms Asynchronously(异步确认)
handlingPublisherConfirmsAsynchronously();
}
/**
* 异步确认
*/
private static void handlingPublisherConfirmsAsynchronously() throws Exception{
try (Connection connection = createConnection()){
//1.开启信道
Channel channel = connection.createChannel();
//2.设置信道为confirm模式
channel.confirmSelect();
//3.申明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE3, true, false, false, null);
//4.监听confirm
//有序集合,元素按照⾃然顺序进⾏排序,存储未confirm消息序号
SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//multiple 批量
//confirmSet.headSet(n)⽅法返回当前集合中⼩于n的集合
if(multiple) {
//批量确认:将集合中⼩于等于当前序号deliveryTag元素的集合清除,表⽰
//这批序号的消息都已经被ack了
confirmSeqNo.headSet(deliveryTag+1).clear();
}else {
//单条确认:将当前的deliveryTag从集合中移除
confirmSeqNo.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if(multiple) {
confirmSeqNo.headSet(deliveryTag+1).clear();
}else {
confirmSeqNo.remove(deliveryTag);
}
//业务要根据实际场景进行处理, 比如重发, 此处省略
}
});
//5.发送消息, 并等待确认
long start = System.currentTimeMillis();
for(int i = 0; i<MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms " + i;
//得到下次发送消息的序号, 从1开始
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE1, null, msg.getBytes());
confirmSeqNo.add(seqNo);
}
while (!confirmSeqNo.isEmpty()) {
Thread.sleep(10);
}
long end = System.currentTimeMillis();
System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
}
}
private static void publishingMessagesInBatches() throws Exception{
try (Connection connection = createConnection()){
//1.开启信道
Channel channel = connection.createChannel();
//2.设置信道为confirm模式
channel.confirmSelect();
//3.申明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE2, true, false, false, null);
//4.发送消息, 并进行确认
long start = System.currentTimeMillis();
int batchSize = 100;
int outstandingMessageCount = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms " + i;
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE2, null, msg.getBytes());
outstandingMessageCount++;
if(outstandingMessageCount == batchSize) {
//等待确认
channel.waitForConfirmsOrDie(5000);
outstandingMessageCount = 0;
}
}
if(outstandingMessageCount > 0) {
//等待确认
channel.waitForConfirmsOrDie(5000);
outstandingMessageCount = 0;
}
long end = System.currentTimeMillis();
System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
}
}
/**
* 单独确认
*/
private static void publishingMessagesIndividually() throws Exception{
try (Connection connection = createConnection()){
//1.开启信道
Channel channel = connection.createChannel();
//2.设置信道为confirm模式
channel.confirmSelect();
//3.申明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE1, true, false, false, null);
//4.发送消息, 并等待确认
long start = System.currentTimeMillis();
for(int i = 0; i<MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms " + i;
channel.basicPublish("", Constants.PUBLISH_CONFIRMS_QUEUE1, null, msg.getBytes());
//等待确认
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
}
}
}