RabbitMQ中的各模式及其用法
- 工作队列模式
- 一、生产者代码
- 1、封装工具类
- 2、编写代码
- 3、发送消息效果
- 二、消费者代码
- 1、编写代码
- 2、运行效果
- 发布订阅模式
- 一、生产者代码
- 二、消费者代码
- 1、消费者1号
- 2、消费者2号
- 三、运行效果
- 四、小结
- 路由模式
- 一、生产者代码
- 二、消费者代码
- 1、消费者1号
- 2、消费者2号
- 三、运行结果
- 1、绑定关系
- 2、消费消息
- 主题模式
- 一、生产者代码
- 二、消费者代码
- 1、消费者1号
- 2、消费者2号
- 三、运行效果
- 总结
工作队列模式
一、生产者代码
新建一个module,在module下创建属于自己的包,并且创建一个名为“work”的子包,以及工具类包“util”。结构如图所示:
在pom文件中添加图中所示依赖:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
</dependencies>
此时准备工作基本完成。
1、封装工具类
修改rabbitMQ地址,替换为自己的。
package com.xxx.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @ClassName: ConnectionUtil
* @Package: com.xxx.rabbitmq.util
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class ConnectionUtil {
public static final String HOST_ADDRESS = "192.168.xxx.xxx";
public static Connection getConnection() throws Exception {
// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost(HOST_ADDRESS);
// 端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
public static void main(String[] args) throws Exception {
Connection con = ConnectionUtil.getConnection();
// amqp://guest@192.168.xxx.xxx:5672/
System.out.println(con);
con.close();
}
}
2、编写代码
新建生产者类Producer:
package com.xxx.rabbitmq.work;
import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @ClassName: Producer
* @Package: com.xxx.rabbitmq.work
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class Producer {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 1; i <= 10; i++) {
String body = i+"hello rabbitmq~~~";
channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
}
channel.close();
connection.close();
}
}
3、发送消息效果
二、消费者代码
1、编写代码
创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。
Consumer1:
package com.xxx.rabbitmq.work;
import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName: Consumer1
* @Package: com.xxx.rabbitmq.work
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class Consumer1 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
Consumer2:
package com.xxx.rabbitmq.work;
import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName: Consumer2
* @Package: com.xxx.rabbitmq.work
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class Consumer2 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer2 body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
** 注意:**
运行的时候先启动两个消费端程序,然后再启动生产者端程序。
如果已经运行过生产者程序,则手动把work_queue队列删掉。
2、运行效果
最终两个消费端程序竞争结果如下:
这样就完成了工作队列模式的演示。
发布订阅模式
一、生产者代码
还是在上面的module内,新建一个名为fanout的子包,在包内创建Producer类:
package com.xxx.rabbitmq.fanout;
import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @ClassName: Producer
* @Package: com.xxx.rabbitmq.fanout
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class Producer {
public static void main(String[] args) throws Exception {
// 1、获取连接
Connection connection = ConnectionUtil.getConnection();
// 2、创建频道
Channel channel = connection.createChannel();
// 参数1. exchange:交换机名称
// 参数2. type:交换机类型
// DIRECT("direct"):定向
// FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC("topic"):通配符的方式
// HEADERS("headers"):参数匹配
// 参数3. durable:是否持久化
// 参数4. autoDelete:自动删除
// 参数5. internal:内部使用。一般false
// 参数6. arguments:其它参数
String exchangeName = "test_fanout";
// 3、创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
// 4、创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 5、绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout,routingKey设置为""
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
// 6、发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());
// 7、释放资源
channel.close();
connection.close();
}
}
二、消费者代码
1、消费者1号
package com.xxx.rabbitmq.fanout;
import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName: Consumer1
* @Package: com.xxx.rabbitmq.fanout
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
channel.queueDeclare(queue1Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
2、消费者2号
package com.xxx.rabbitmq.fanout;
import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName: Consumer2
* @Package: com.xxx.rabbitmq.fanout
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue2Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
三、运行效果
先启动消费者,然后再运行生产者程序发送消息:
四、小结
交换机和队列的绑定关系如下图所示:
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:
- 工作队列模式本质上是绑定默认交换机
- 发布订阅模式绑定指定交换机
- 监听同一个队列的消费端程序彼此之间是竞争关系
- 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息
路由模式
一、生产者代码
新建子包routing,并新建Producer类:
package com.xxx.rabbitmq.routing;
import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @ClassName: Producer
* @Package: com.xxx.rabbitmq.routing
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_direct";
// 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
// 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
// 声明(创建)队列
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 队列绑定交换机
// 队列1绑定error
channel.queueBind(queue1Name,exchangeName,"error");
// 队列2绑定info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
String message = "日志信息:张三调用了delete方法.错误了,日志级别error";
// 发送消息
channel.basicPublish(exchangeName,"error",null,message.getBytes());
System.out.println(message);
// 释放资源
channel.close();
connection.close();
}
}
二、消费者代码
1、消费者1号
package com.xxx.rabbitmq.routing;
import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName: Consumer1
* @Package: com.xxx.rabbitmq.routing
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
channel.queueDeclare(queue1Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("Consumer1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
2、消费者2号
package com.xxx.rabbitmq.routing;
import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName: Consumer2
* @Package: com.xxx.rabbitmq.routing
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue2Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("Consumer2 将日志信息存储到数据库.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
三、运行结果
1、绑定关系
2、消费消息
主题模式
一、生产者代码
新建子包topic,新建生产者类Producer:
package com.xxx.rabbitmq.topic;
import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @ClassName: Producer
* @Package: com.xxx.rabbitmq.topic
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
// routing key 常用格式:系统的名称.日志的级别。
// 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");
// 分别发送消息到队列:order.info、goods.info、goods.error
String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());
body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
channel.close();
connection.close();
}
}
二、消费者代码
1、消费者1号
消费者1监听队列1:
package com.xxx.rabbitmq.topic;
import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName: Consumer1
* @Package: com.xxx.rabbitmq.topic
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue1";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
2、消费者2号
消费者2监听队列2:
package com.xxx.rabbitmq.topic;
import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName: Consumer2
* @Package: com.xxx.rabbitmq.topic
* @Author:
* @CreateDate:
* @Version: V1.0.0
* @Description:
*/
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue2";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
三、运行效果
队列1:
队列2:
至此,就完成了RabbitMQ各模式的使用演示。
总结
在选择使用什么模式时,需要对应业务需求,结合需求选择合适的模式。