文章目录
- 事务消息
- 概念
- 配置
- 惰性队列
- 概念
- 应用场景
- 优先队列
- 概念
- 配置
事务消息
仅在生产者端有效,消费端无效
概念
总结:
- 在生产者端使用事务消息和消费端没有关系
- 在生产者端使用事务消息仅仅是控制事务内的消息是否发送
- 提交事务就把事务内所有消息都发送到交换机
- 回滚事务则事务内任何消息都不会被发送
配置
// 配置
import lombok.Data;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Data
public class RabbitConfig {
@Bean
public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
}
//测试
@SpringBootTest
@Slf4j
public class RabbitMQTest {
public static final String EXCHANGE_NAME = "exchange.tx.dragon";
public static final String ROUTING_KEY = "routing.key.tx.dragon";
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
@Transactional
@Rollback(value = true) // junit 默认回滚事务,所以想提交事务就设置为 false
public void testSendMessageInTx() {
// 1、发送第一条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01~~~)");
// 2、抛出异常
log.info("do bad:" + 10 / 0);
// 3、发送第二条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02~~~)");
}
}
惰性队列
概念
创建队列时,在Durability这里有两个选项可以选择
- Durable:持久化队列,消息会持久化到硬盘上
- Transient:临时队列,不做持久化操作,broker重启后消息会丢失
https://www.rabbitmq.com/docs/lazy-queues
经典队列中的消息会尽可能早地移动到磁盘。 只有当使用者请求这些消息时,它们才会加载到 RAM 中
会从硬盘中读取,只能作为权宜之计
应用场景
优先队列
概念
默认情况:基于队列先进先出的特性,通常来说,先入队的先投递
- 设置优先级之后:优先级高的消息更大几率先投递
- 关键参数:x-max-priority
RabbitMQ允许我们使用一个正整数给消息设定优先级
- 消息的优先级数值取值范围:1~255
- RabbitMQ官网建议在1~5之间设置消息的优先级(优先级越高,占用CPU、
内存等资源越多)
队列在声明时可以指定参数:x-max-priority
- 默认值:0 此时消息即使设置优先级也无效
- 指定一个正整数值:消息的优先级数值不能超过这个值
配置
配置exchange:exchange.test.priority
配置队列:queue.test.priority
x-max-priority
// 生产者:发三个从1开始,优先级越大消费的时候越在前面消费
// 下面是发一条消息的示例
public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";
@Test
public void test06SendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "message test proirity 3", message -> {
// 消息本身的优先级数值
// 切记:不能超过 x-max-priority: 10
message.getMessageProperties().setPriority(3);
return message;
});
}
// 消费者
@Component
@Slf4j
public class MyMessageListener {
public static final String QUEUE_PRIORITY = "queue.test.priority";
@RabbitListener(queues = {QUEUE_PRIORITY})
public void processMessagePriority(String dataString, Message message, Channel channel) throws IOException {
log.info("[priority]" + dataString);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}