文章目录
- 简介
- 实现原理
- 实现逻辑
简介
RocketMQ事务消息
RocketMQ在4.3.0版中支持分布式事务消息,这里RocketMQ的事务消息是采用2PC(两段式协议) +补偿机制(消息回查)的分布式事务功能。提供消息发送与业务落库的一致性。
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致。
RocketMQ的事务消息实现方式主要包括以下几个步骤:
1 生产者发送half消息到Broker。half消息在消费者看来是不可见的,这样可以避免消费者消费到事务未提交的数据,类似于数据库的隔离级别读已提交级别,避免脏读。
2.生产者创建订单,根据创建订单成功与否,向Broker发送commit或rollback。
3.生产者还可以提供Broker回调接口,当Broker发现一段时间half消息没有收到任何操作命令,会主动调用此接口来查询订单是否创建成功。
4.一旦half消息commit了,消费者库存系统就会来消费,如果消费成功,则消息销毁,分布式事务成功结束。
5.如果消费失败,则根据重试策略进行重试,最后还失败则进入死信队列,等待进一步处理。
实现原理
RocketMQ的事务消息实现原理基于两阶段提交协议(Two-Phase Commit Protocol),具体流程如下:
1.发送准备消息:当一个事务消息需要发送时,生产者会发送一个准备消息,该消息包含了实际的业务数据和事务的标识符。
2.执行本地事务:接收到准备消息后,生产者会执行一个本地事务,如果本地事务执行成功,则返回COMMIT状态,否则返回ROLLBACK状态。
3.提交或回滚本地事务:当生产者返回COMMIT状态时,代表本地事务已经执行成功,此时,RocketMQ会将消息标记为可提交状态,并发送一个commit消息给消费者;当生产者返回ROLLBACK状态时,代表本地事务执行失败,此时,RocketMQ会将消息标记为可回滚状态,并发送一个rollback消息给消费者。
4. 消费者处理消息:消费者在接收到commit或rollback消息后,会根据消息的状态来执行相应的操作。
5.提交或回滚事务:如果所有的消费者都接收到了commit消息,则代表该事务消息已经提交,此时生产者会提交本地事务,否则,如果有任何一个消费者接收到了rollback消息,则代表该事务消息已经回滚,生产者会回滚本地事务。
事务消息的实现原理基于两阶段提交协议,这是一种经典的分布式事务协议,可以保证数据的一致性和可靠性。RocketMQ通过实现TransactionListener接口来支持事务消息,同时也提供了许多配置选项和工具类来方便用户进行使用和扩展。
实现逻辑
RocketMQ的事务消息通过TransactionListener接口来实现。下面是事务消息的基本实现步骤:
1.实现事务监听器:首先,你需要实现RocketMQ提供的TransactionListener接口,该接口包括两个方法:executeLocalTransaction和checkLocalTransaction。
○ executeLocalTransaction方法用于执行本地事务,当发送事务消息时,RocketMQ会调用此方法来执行本地事务。在该方法内部,你需要执行实际的业务逻辑,并根据执行结果返回事务状态,可以是提交、回滚或是未知状态。
○ checkLocalTransaction方法用于检查本地事务状态,当RocketMQ没有收到事务消息的确认或者取消时,会调用此方法来检查本地事务的状态,然后对消息进行处理。
2.发送事务消息:在发送事务消息时,你需要指定事务监听器,并在executeLocalTransaction方法中执行实际的业务逻辑,然后根据业务逻辑的执行结果返回事务状态。
3.事务状态检查:RocketMQ会定期调用checkLocalTransaction方法来检查本地事务的状态,然后对消息进行处理,例如提交或者回滚。
4.事务消息处理:在消息消费端,你需要根据消息的实际状态来执行相应的处理逻辑。
下面是一个简单的示例代码,演示了如何使用RocketMQ的事务消息:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("transaction_producer_group");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
TransactionListener transactionListener = new MyTransactionListener();
producer.setTransactionListener(transactionListener);
// 启动生产者
producer.start();
try {
// 创建事务消息
Message msg = new Message("TopicTest", "TagA", "KEY1", "Hello, RocketMQ".getBytes());
// 发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
class MyTransactionListener implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务,例如数据库操作等
// 返回本地事务的状态,可以是COMMIT、ROLLBACK或UNKNOW
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态,返回本地事务的状态,例如COMMIT、ROLLBACK或UNKNOW
}
}
上述代码展示了如何创建一个事务消息生产者,并实现一个简单的事务监听器。在实际应用中,你需要根据业务需求和具体场景来实现更复杂的事务逻辑。