1.交换机Exchange
RabbitMQ消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们放到多个队列中还是说应该丢弃它们,这就的由交换机的类型来决定。
2.常见的交换机类型
- 直接(direct)
- 主题(topic)
- 标题(headers)
- 扇出(fanout)
3. 默认交换机(无名交换机)
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
上述代码中的第1个参数就是交换机名称,空字符串(“”)表示默认交换机(无名交换机)
3.绑定bindings
绑定bindings是交换机exchange与队列queue之间的桥梁
上图交换机X与队列Q1通过RoutingKey绑定,与队列Q2通过RoutingKey绑定
4.Fanout
Fanout扇出(广播/发布订阅),将接收到的所有消息广播到所有队列,类似于大喇叭,消息群发
一个消息生产者发消息给交换机,交换机通过相同的routingKey绑定2个队列,2个消费者收到同样的消息
4.1.消息生产者
package com.hong.rabbitmq6;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
/**
* @Description: 扇出模式消息发送者
* @Author: hong
* @Date: 2024-01-13 21:24
* @Version: 1.0
**/
public class EmitLog {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
Scanner scanner = new Scanner(System.in);
System.out.println("请输入:");
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送完成------" + message);
}
}
}
4.2.消费者1
package com.hong.rabbitmq6;
import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
/**
* @Description: 广播模式消息接收者1
* @Author: hong
* @Date: 2024-01-13 20:49
* @Version: 1.0
**/
public class Receive1 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
/*
*声明交换机
*第1个参数:交换机名称
*第2个参数:交换机类型
*/
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明临时队列 当消费者断开与队列的连接,队列自动删除
String queueName = channel.queueDeclare().getQueue();
/*
* 绑定队列与交换机
* 第1个参数:队列名称
* 第2个参数:交换机名称
* 第3个参数:routingKey
*/
channel.queueBind(queueName,EXCHANGE_NAME,"");
DeliverCallback deliverCallback = (comsumerTag, message) -> {
System.out.println("Receive1接收到的消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = var -> {
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
4.3.消费者2
package com.hong.rabbitmq6;
import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
/**
* @Description: 广播模式消息接收者2
* @Author: hong
* @Date: 2024-01-13 20:49
* @Version: 1.0
**/
public class Receive2 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
/*
*声明交换机
*第1个参数:交换机名称
*第2个参数:交换机类型
*/
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明临时队列 当消费者断开与队列的连接,队列自动删除
String queueName = channel.queueDeclare().getQueue();
/*
* 绑定队列与交换机
* 第1个参数:队列名称
* 第2个参数:交换机名称
* 第3个参数:routingKey
*/
channel.queueBind(queueName,EXCHANGE_NAME,"");
DeliverCallback deliverCallback = (comsumerTag, message) -> {
System.out.println("Receive2接收到的消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = var -> {
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}