目录
一、基本使用
二、使用注意事项
1. 生产者重连机制 - 保证mq服务是通的
2. 生产者确认机制 - 回调机制
3. MQ的可靠性
4. Lazy Queue模式
5. 消费者确认机制
一、基本使用
部署完RabbitMQ有两种使用方式:
- 网页客户端
- Java代码
MQ组成部分:
- 虚拟主机 -> 进行数据隔离的,好比mysql中的不同数据库
- 交换机 -> 进行路由转发消息
- fanout:最普通 相当于广播
- topic:可依key绑定不同队列,可使用.分割key,并支持模糊匹配
- direct:可依key绑定不同队列,不支持模糊匹配
- 队列 -> 接受交换机转发过来的消息
Java中使用(注解开发):
1.引依赖和配置属性 - 版本选个差不多的 我是继承的springboot的
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.189.189
port: 5672
virtual-host: hmallVir
username: hmall
password: 123321
2.使用RabbitTemplate发消息
@SpringBootTest
public class TestSendMs {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void testSend1(){
String queueName = "hmallQueue1";
String ms = "hello";
rabbitTemplate.convertAndSend(queueName, ms);
}
}
3.使用RabbitListerner接受消息
@Component
public class MyListeners {
@RabbitListener(queues = "hmallQueue1")
public void listenerMs1(String ms) throws InterruptedException {
System.out.println("消费者1接收到消息 -> " + ms);
Thread.sleep(20);
}
}
4.声明队列和交换机的方式二(简单常用)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "hmallQueue1"),
exchange = @Exchange(name = "hmall", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到消息 -> " + msg);
}
轮询接受消息问题:
队列读取消息时使用轮询机制,每个队列都读取相同的消息数量,这样不好,我们要针对队列处理消息的能力,需要在配置文件设置属性fetch
spring:
rabbitmq:
host: 192.168.189.189
port: 5672
virtual-host: hmallVir
username: hmall
password: 123321
listener:
simple:
prefetch: 1 # 每次处理完消息再获取新的消息 性能越好处理消息越多
扩展消息转换器:
默认的消息转换器是直接将对象序列化为Byte[],即读不懂又不安全还占内存
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
@Bean
public MessageConverter jacksonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
二、使用注意事项
1. 生产者重连机制 - 保证mq服务是通的
连接重试,注意这里是阻塞式的,意味着连接失败会一直重试其他业务不会执行,所以建议禁用此模式。
spring:
rabbitmq:
host: 192.168.189.189
port: 5672
virtual-host: hmallVir
username: hmall
password: 123321
connection-timeout: 1s # 连接失败重试
template:
retry:
enabled: true # 开启重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待市场倍数
max-attempts: 3 # 最大重试次数
2. 生产者确认机制 - 回调机制
SpringAMQP生产者确认机制的返回值情况?
- ack 消息成功投递到mq
- 交换机收到了 没有路由转发到队列 ack + return路由异常
- 交换机收到并路由成功 只ack
- nack 消息丢失 投递失败
3. MQ的可靠性
服务一旦挂了消息就都没有了,还有就是内存如果满了,会触发阻塞式的强制持久化操作,这会导致这段时间处理消息的能力为0
- 设置消息的持久化属性:需要将
delivery_mode
属性设置为2。这个设置会将消息标记为持久化,确保消息被写入磁盘而不是仅保存在内存中。 - 声明持久化的队列:在RabbitMQ中声明队列时,需要将
durable
属性设置为true。这样,队列的元数据以及队列中的消息都会被持久化到磁盘上。需要注意的是,即使队列被设置为持久化,也不能保证内部所存储的消息不会丢失,因为RabbitMQ默认在消息被消费后立即删除它。要确保消息不被删除,需要在消费时进行相应的设置。 - 声明持久化的交换机:与队列类似,交换机也可以被设置为持久化。在声明交换机时,将
durable
属性设置为true,就可以将交换机标记为持久化。如果交换机不设置持久化,那么在RabbitMQ服务重启之后,相关的交换机元数据会丢失,但已发送的消息不会丢失,只是不能再将新的消息发送到这个交换机中。
4. Lazy Queue模式
- 降低内存压力:在高负载情况下,大量消息堆积可能导致内存压力剧增。使用Lazy Queue模式,消息直接写入磁盘,可以有效降低内存使用,避免内存溢出等问题。
- 支持大量消息存储:由于消息存储在磁盘上,Lazy Queue模式可以支持数百万条消息的存储,满足大规模消息处理的需求。
- 提高系统稳定性:通过将消息持久化到磁盘,Lazy Queue模式增强了消息的可靠性,即使RabbitMQ服务器发生故障或重启,消息也不会丢失,保证了系统的稳定性和数据的完整性。
5. 消费者确认机制
这里就是模拟事务嘛,利用AOP思想