RocketMQ 分布式事务消息实战指南:确保数据一致性的关键设计

在这里插入图片描述

🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者
📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
🌲文章所在专栏:RocketMQ
🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
💬 向我询问任何您想要的东西,ID:vnjohn
🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
😄 代词: vnjohn
⚡ 有趣的事实:音乐、跑步、电影、游戏

目录

  • 前言
  • 业务设计流程
  • 业务设计源码
    • 基础 SQL 脚本
    • 基础依赖
    • 基础配置
    • 基础依赖代码库
    • 模块代码
      • 订单服务
      • 库存服务
      • 生成订单
  • RocketMQ 事务消息
    • 订单服务生产者
      • 事务生产者实例
      • 事务生产者监听器
      • 事务消息实体
      • 事务消息投递
    • 库存服务消费者
    • Tips
  • 总结

前言

在上一篇文章:保护数据完整性:探索 RocketMQ 分布式事务消息的力量 详细分析了「事务消息设计方面及源码相关层面」讲解,事务半消息的发送及提交、事务消息的补偿过程

在 RocketMQ 中由于网络故障原因或业务应用程序异常宕机导致事务消息未及时的完成处理,提供了事务消息补偿机制>检查本地事务执行状态的方法,为整个流程二阶段提交完成了不可忽视的异常消息补偿机制。

接下来,会通过以下两个链路中的第一条链路进行实战演练,确保在 RocketMQ 事务消息处理过程中,这两者的事务状态能够确保一致完成.

1、创建订单完成、预扣减库存
2、订单支付完成、实扣减库存

业务设计流程

在这里插入图片描述

1、事务生产者发送半消息到 Broker 服务端的 Half Topic 中,实际发送半消息的 Topic 是真实的 Topic,在这里会被替换为「RMQ_SYS_TRANS_HALF_TOPIC」存储到日志文件中

2、在 Broker 服务端将半消息存储到日志文件以后,若发送半消息的结果是成功的,那么就会执行「订单服务客户端」本地事务方法「executeLocalTransaction」

3、会同步等待本地事务方法执行的结果,再根据执行结果、消息体投递消息类型「EndTransactionRequestHeader」给到 Broker 服务端进行处理,该请求由 Broker 服务端「EndTransactionProcessor」进行处理.

4、在 EndTransactionProcessor 中会根据本地事务处理的结果,进行判别

1、若本地事务执行成功,在 Broker 服务端会将半消息对应的 Topic 调整为真实的 Topic 消息进行存储到日志文件中,随即在库存服务的消费者才能消费到这条消息,从而再对库存进行扣减,同时标记好半消息,确保在定时检查事务消息时不会再次被扫描到进行处理
2、若本地事务执行失败,在 Broker 服务端会将半消息标记为「已处理」不会让定时触发的事务消息检查机制进行扫描到

当然,定时任务的数据处理,不能确保它有时间的误差性,所以说执行成功或执行失败的事务消息,会在补偿机制进行再一次的校验

业务设计源码

基础 SQL 脚本

CREATE TABLE `order` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
  `order_no` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '订单编号',
  `amount` bigint DEFAULT NULL COMMENT '订单金额',
  `sku_id` bigint DEFAULT NULL COMMENT '商品skuId',
  `user_id` bigint DEFAULT NULL COMMENT '用户id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

CREATE TABLE `stock` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
  `sku_id` bigint DEFAULT NULL COMMENT '商品skuId',
  `lock_stock` int DEFAULT NULL COMMENT '锁定库存',
  `stock` int DEFAULT NULL COMMENT '真实库存',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

基础依赖

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <spring-boot.version>2.6.7</spring-boot.version>
    <jackson.version>2.11.0</jackson.version>
    <mysql.version>8.0.17</mysql.version>
    <alibaba-druid.version>1.2.8</alibaba-druid.version>
    <mybatis-plus.version>3.4.2</mybatis-plus.version>
</properties>
<dependencies>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.vnjohn</groupId>
        <artifactId>blog-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

基础配置

订单服务、库存服务在实际的生产过程中,会各自有各自的库,在本地环境中演练中采用一个库进行模拟.

订单服务,作为事务消息生产者

server:
  port: 8088

spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/rocketmq_transaction_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
    username: root
    password: 12345678
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      filters: stat
      maxActive: 30
      initialSize: 1
      maxWait: 10000
      # 保持连接活跃
      keep-alive: true
      minIdle: 1
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: select 'x'
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxOpenPreparedStatements: 20

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 开启sql日志
  type-aliases-package: org.vnjohn.*.*.model # 注意:对应实体类的路径

rocketmq:
  transaction:
    producer: order_transaction
  bootstraps:
  namesrv-addr: 172.16.249.10:9876;172.16.249.11:9876;172.16.249.12:9876

库存服务,作为事务消息消费者

server:
  port: 8085

spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/rocketmq_transaction_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
    username: root
    password: 12345678
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      filters: stat
      maxActive: 30
      initialSize: 1
      maxWait: 10000
      # 保持连接活跃
      keep-alive: true
      minIdle: 1
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: select 'x'
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxOpenPreparedStatements: 20

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 开启sql日志
  type-aliases-package: org.vnjohn.*.*.model # 注意:对应实体类的路径

rocketmq:
  bootstraps:
  namesrv-addr: 172.16.249.10:9876;172.16.249.11:9876;172.16.249.12:9876
  consumer-group: order_transaction_group
  order:
    create:
      topic: order_transaction
      tag: withholding_stock

基础依赖代码库

/**
 * Spring Context 工具类
 *
 * @author vnjohn
 */
@Component
public class SpringContextUtils implements ApplicationContextAware {
    public final static String SPRING_CONTEXT_UTILS_COMPONENT = "springContextUtils";
    public static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextUtils.applicationContext = applicationContext;
    }

    /**
     * 获取HttpServletRequest
     */
    public static HttpServletRequest getHttpServletRequest() {
        return ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
    }

    public static HttpServletResponse getHttpServletResponse() {
        return ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();
    }

    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> c) {
        return applicationContext.getBean(c);
    }

    public static <T> Map<String, T> getBeanOfType(Class<T> c) {
        return applicationContext.getBeansOfType(c);
    }

    public static <T> T getBean(String name, Class<T> requiredType) {
        return applicationContext.getBean(name, requiredType);
    }

    public static boolean containsBean(String name) {
        return applicationContext.containsBean(name);
    }

    public static boolean isSingleton(String name) {
        return applicationContext.isSingleton(name);
    }

    public static Class<? extends Object> getType(String name) {
        return applicationContext.getType(name);
    }

    /**
     * 获取当前环境
     */
    public static String getActiveProfile() {
        return applicationContext.getEnvironment().getActiveProfiles()[0];
    }
}

模块代码

订单服务

订单 DO 实体

@Data
@TableName("`order`")
@EqualsAndHashCode(callSuper = false)
public class OrderDO {
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 订单编号
     */
    private String orderNo;

    /**
     * 订单金额
     */
    private Long amount;

    /**
     * 商品id
     */
    private Long skuId;

    /**
     * 用户id
     */
    private Long userId;

}

订单数据库映射层

/**
 * @author vnjohn
 * @since 2023/11/15
 */
public interface OrderMapper extends BaseMapper<OrderDO> {
}

订单仓储层

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Component
public class OrderRepository {
    @Resource
    private OrderMapper orderMapper;

    public Order queryByOrderNo(String orderNo) {
        OrderDO orderDO = orderMapper.selectOne(new QueryWrapper<OrderDO>()
                .lambda().eq(OrderDO::getOrderNo, orderNo));
        return BeanUtils.copy(orderDO, Order.class);
    }
}

库存服务

库存 DO 实体

/**
 * @author vnjohn
 * @since 2023/11/15
 */
@Data
@TableName("`stock`")
@EqualsAndHashCode(callSuper = false)
public class StockDO {
    @TableId(type = IdType.AUTO)
    private Long id;

    private Long skuId;

    private Integer lockStock;

    private Integer stock;

}

库存数据库映射层

/**
 * @author vnjohn
 * @since 2023/11/15
 */
public interface StockMapper extends BaseMapper<StockDO> {
}

库存仓储层

/**
 * @author vnjohn
 * @since 2023/11/15
 */
@Slf4j
@Component
public class StockRepository {
    @Resource
    private StockMapper stockMapper;

    public void preDecreaseStock(String orderNo, Long skuId) {
        // 订单号与 SkuId 可用于做日志记录,这里默认给它数量记为 1
        log.info("订单号:{}", orderNo);
        StockDO stockDO = stockMapper.selectOne(new QueryWrapper<StockDO>()
                .lambda().eq(StockDO::getSkuId, skuId));
        if (null == stockDO) {
            return;
        }
        int currentLockStock = stockDO.getLockStock() + 1;
        stockDO.setLockStock(currentLockStock);
        // 此处最好采用乐观锁+ CAS 方式进行更新
        stockMapper.updateById(stockDO);
    }
}

生成订单

订单领域实体

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Data
public class Order {
    /**
     * id
     */
    private Long id;

    /**
     * 订单编号
     */
    private String orderNo;

    /**
     * 订单金额
     */
    private Long amount;

    /**
     * 商品id
     */
    private Long skuId;

    /**
     * 用户id
     */
    private Long userId;

}

创建订单实体

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Data
public class CreateOrder {
    /**
     * 订单编号
     */
    private String orderNo;

    /**
     * 订单金额
     */
    private Long amount;

    /**
     * 商品id
     */
    private Long skuId;

    /**
     * 用户id
     */
    private Long userId;

}

创建订单领域执行器

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Component
public class OrderCreateHandler {
    @Resource
    private OrderMapper orderMapper;

    public Boolean handle(CreateOrder order) {
        // 在这里模拟订单异常或创建系统
        // 1、订单创建逻辑,涉及到表结构及数据会比较多,这里不做多阐述
        String orderNo = order.getOrderNo();
        return orderMapper.insert(BeanUtils.copy(order, OrderDO.class)) > 0;
    }

}

RocketMQ 事务消息

订单服务生产者

首先要在订单服务能够投递事务消息,应该先实例化一个事务生产者

事务生产者实例

/**
 * 订单服务专门用来提供事务消息的生产者
 *
 * @author vnjohn
 * @since 2023/11/2
 */
@Slf4j
@Component
public class OrderTransactionProducer {
    @Value("${rocketmq.transaction.producer}")
    private String transactionProducerName;

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;

    private static TransactionMQProducer PRODUCER;

    /**
     * 获取事务生产者实例对象
     *
     * @return 事务生产者实例
     */
    public static TransactionMQProducer getInstance() {
        return PRODUCER;
    }

    /**
     * 若未定义线程,检测事务半消息的线程默认只有一个,当同时出现多条事务半消息需要检测时,就退化为队列的方式进行入队,要进行排队处理,从而降低了并发、并行数
     *
     * <p>
     *  public TransactionMQProducer(String namespace, String producerGroup, RPCHook rpcHook) {
     *          super(namespace, producerGroup, rpcHook);
     *          this.checkThreadPoolMinSize = 1;
     *          this.checkThreadPoolMaxSize = 1;
     *          this.checkRequestHoldMax = 2000;
     *  }
     * </p>
     */
    @PostConstruct
    public void initTransactionProducer() {
        try {
            PRODUCER = new TransactionMQProducer(transactionProducerName);
            PRODUCER.setNamesrvAddr(namesrvAddr);
            PRODUCER.setTransactionListener(new OrderTransactionListener());
            // 自定义线程池处理,用于追踪消息投递时的日志,能够追踪到具体的投放线程,所必要参数,不设置采用的是默认的线程池
//          producer.setExecutorService();
            PRODUCER.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            log.error("创建事务生产者异常,", e);
        }
    }
}

事务生产者监听器

与事务生产者必须绑定好的一个关键>监听器,用这个监听器来判别如何做事务消息的后续处理

/**
 * 订单服务「本地事务消息-半消息」处理
 *
 * @author vnjohn
 * @since 2023/11/2
 */
@Slf4j
public class OrderTransactionListener implements TransactionListener {
    /**
     * unknow 消息最大重试次数设置为 3,当超过 3 次以后进行默认成功且记录到本地日志表中
     */
    private static Integer MAX_RETRY_TIME = 3;

    /**
     * 用于存储本地事务执行的结果:事务id->订单id
     */
    private ConcurrentHashMap<String, String> TRANSACTION_ORDER_MAP = new ConcurrentHashMap<>();

    /**
     * 用于存储本地事务检查次数的结果:事务id->check 次数
     * 源码中会默认检查为 15 次,一次的时间间隔为 6s,由于那个属于全局的配置
     * 在这里可自定义适配次数,时间还是按照默认的配置来进行处理
     */
    private ConcurrentHashMap<String, Integer> UNKNOW_TRANSACTION_CHECK_MAP = new ConcurrentHashMap<>();


    /**
     * 执行本地事务的处理逻辑
     *
     * @param message 待发送的消息内容
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 当前执行的事务 id
        String transactionId = message.getTransactionId();
        CreateOrder createOrder = (CreateOrder) o;
        TRANSACTION_ORDER_MAP.put(transactionId, createOrder.getOrderNo());
        // TODO 捕获创建订单后的执行结果
        try {
            // 伪代码创建订单
            OrderRepository orderRepository = SpringContextUtils.getBean(OrderRepository.class);
            Order order = orderRepository.queryByOrderNo(createOrder.getOrderNo());
            // 订单已存在,不再做处理
            if (Objects.nonNull(order)) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            OrderCreateHandler orderCreateHandler = SpringContextUtils.getBean(OrderCreateHandler.class);
            Boolean handleResult = orderCreateHandler.handle(createOrder);
            if (handleResult) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        } catch (Exception orderException) {
            // 消息进行回滚,在消费者那一侧是无法观察此消息的
            log.error("创建订单出现异常,", orderException);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // 由于网络问题,导致非业务异常,可能是订单已经写入数据库成功了没有及时地去处理事务消息状态
        // 在这里需要去进行 check 检查本地事务是否执行有误
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 事务 producer 会从 broker 获取到未处理的事务消息列表,进行依次处理
     * 检查本地事务状态,当出现「事务消息生产者」宕机时,该方法仍然会对未处理的事务消息进行检测
     * 对于 unknow 消息,会 1s 进行一次定期处理,该参数可调整
     *
     * @param messageExt 消息扩展类信息
     * @return 本地事务执行状态
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 检查本地事务表是否存在订单号
        String transactionId = messageExt.getTransactionId();
        log.info("checkLocalTransaction transactionId:{}", transactionId);
        String orderNo = TRANSACTION_ORDER_MAP.get(transactionId);
        // unknow 消息进行重试三次,超出后不再做处理,不然 unknow 消息会一直在控制台打印进行处理,视为无效的工作
        // 增加补偿机制,用于处理 unknow 重试的消息,当重试的消息是提交或回滚状态,则调用相关的方法进行处理
        Integer checkCount = UNKNOW_TRANSACTION_CHECK_MAP.get(transactionId);
        if (Objects.isNull(checkCount) || checkCount < MAX_RETRY_TIME) {
            checkCount = Objects.isNull(checkCount) ? 1 : ++checkCount;
            log.info("transactionId-{},check 检查次数:{}", transactionId, checkCount);
            UNKNOW_TRANSACTION_CHECK_MAP.put(transactionId, checkCount);
            return LocalTransactionState.UNKNOW;
        }
        // 检查次数超出预定阈值,可记录到日志
        if (checkCount.equals(MAX_RETRY_TIME)) {
            log.info("transactionId-{},check 检查次数超出", transactionId);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // 反查订单数据当前订单是否已经创建
        OrderRepository orderRepository = SpringContextUtils.getBean(OrderRepository.class);
        Order order = orderRepository.queryByOrderNo(orderNo);
        if (Objects.nonNull(order)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

}

事务消息实体

/**
 * 订单库存消息体
 *
 * @author vnjohn
 * @since 2023/11/2
 */
@Data
public class OrderWithStockMessage {
    /**
     * 订单编号
     */
    private String orderNo;

    /**
     * 商品id
     */
    private Long skuId;

    /**
     * 数量
     */
    private Integer quantity;

}

事务消息投递

/**
 * @author vnjohn
 * @since 2023/11/2
 */
@Slf4j
@Component
public class OrderUnifiedCommandHandler {

    public void handler(String orderNo) {
        // 1、订单数据幂等校验成功
        // 2、调用生产者发送事务半消息
        // 假设接收到订单支付已完成的标识
        OrderWithStockMessage stockMessageBody = new OrderWithStockMessage();
        stockMessageBody.setSkuId(1001L);
        stockMessageBody.setQuantity(2);
        stockMessageBody.setOrderNo(orderNo);
        Message stockMessage = new Message(
                "order_transaction",
                "withholding_stock",
                UUID.randomUUID().toString(),
                JsonUtils.objToJsonStr(stockMessageBody).getBytes()
        );
        CreateOrder order = new CreateOrder();
        order.setSkuId(1001L);
        order.setAmount(100L);
        order.setUserId(888L);
        order.setOrderNo(orderNo);

        // putUserProperty 该方法,通过网络传递给 consumer 一些用户自定义参数,可以用来校验做其他的业务逻辑处理
        // stockMessage.putUserProperty("action", );

        // 发送的是半消息
        // Message msg, Object arg
        // 第一个参数:本地事务处理成功以后,需要进行发送的消息体内容
        // 第二个参数:作为本地事务用于检测或执行本地事务时的对象体
        try {
            TransactionSendResult transactionSendResult = OrderTransactionProducer.getInstance().sendMessageInTransaction(stockMessage, order);
            log.info("事务执行结果:{}", JsonUtils.objToJsonStr(transactionSendResult.getLocalTransactionState()));
        } catch (MQClientException e) {
            log.error("订单预生成,事务半消息 send fail,", e);
        }
    }
}

库存服务消费者

/**
 * 订单消费者,消费来自订单服务投递的消息
 *
 * @author vnjohn
 * @since 2023/11/2
 */
@Slf4j
@Component
public class CreateOrderPreStockConsumer {
    @Value("${rocketmq.consumer-group}")
    private String consumerGroup;

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;

    @Value("${rocketmq.order.create.topic}")
    private String orderCreateTopic;

    @Value("${rocketmq.order.create.tag}")
    private String orderCreateTag;

    @PostConstruct
    public void initCreateOrderConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        try {
            consumer.subscribe(orderCreateTopic, orderCreateTag);
            consumer.registerMessageListener(new CreateOrderStockMessageListener());
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }


    public static class CreateOrderStockMessageListener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            MessageExt messageExt = list.get(0);
            String msgId = messageExt.getMsgId();
            byte[] body = messageExt.getBody();
            String bodyValue = new String(body);
            log.info("当前订单创建成功,预扣减库存信息:{}", bodyValue);
            String orderNo = JSON.parseObject(bodyValue).getString("orderNo");
            Long skuId = JSON.parseObject(bodyValue).getLong("skuId");
            StockRepository stockRepository = SpringContextUtils.getBean(StockRepository.class);
            stockRepository.preDecreaseStock(orderNo, skuId);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

Tips

采用两阶段提交的事务消息,先提交一个半消息,然后执行本地事务,再发送一个 commit 的半消息;若这个 commit 半消息失败了,MQ 是基于第一个半消息不断的反查本地事务执行状态来进行后续流程的推进的,这样只有当本地事务提交成功,最终 MQ 消息也会发送成功,若本地事务 rollback,那么 MQ 消息不会再进行发送,会标记这条半消息的状态为「已处理」从而保证了两者之间的一致性

执行本地事务方法+本地事务定时检查,结合起来来保证事务消息执行的一致性

总结

该篇博文主要是通过实际的业务代码来进行 RocketMQ 事务消息实战,上一篇博文从 RocketMQ 事务消息的整体设计以及相关的源码的讲解,这篇通过订单生成、库存预扣减的简单例子来对事务消息的这块流程进行细粒化的业务设计,事务消息生产者的本地事务消息与补偿事务消息结合起来保证订单创建成功以后,库存才进行预扣减,希望这篇简单的 RocketMQ 事务消息实战博文能够帮助到您理解事务消息的实际应用,期待三连支持!

🌟🌟🌟愿你我都能够在寒冬中相互取暖,互相成长,只有不断积累、沉淀自己,后面有机会自然能破冰而行!

博文放在 微信体系 专栏里,欢迎订阅,会持续更新!

如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

推荐专栏:Spring、MySQL,订阅一波不再迷路

大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/153246.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2023.11.16 hivesql高阶函数之json

目录 1.数据准备 2.操作 -- 方式1: 逐个(字段)处理, get_json_object UDF函数 最大弊端是一次只能解析提取一个字段 -- 方式2: 逐条处理. json_tuple 这是一个UDTF函数 可以一次解析提取多个字段 -- 方式3: 在建表时候, 直接处理json, row format SerDe 能处理Json的SerDe类…

申请国外博士后如何写好推荐信

在国外博士后申请过程中&#xff0c;推荐信是至关重要的一环。一封优秀的推荐信不仅能够突显申请者的学术实力和专业背景&#xff0c;更能够展示其在研究领域的潜力和个人素质。以下是知识人网小编整理的一些写好推荐信的建议&#xff0c;希望对申请者有所帮助。 1.选择合适的推…

gd32 USB HOST 接口

接口 CPU引脚 复用 DM PB14 USBHS_DM AF12 DP PB15 USBHS_DP AF12

提高APP安全性的必备加固手段——深度解析代码混淆技术

​ APP 加固方式 iOSAPP 加固是优化 APK 安全性的一种方法&#xff0c;常见的加固方式有混淆代码、加壳、数据加密、动态加载等。下面介绍一下 iOSAPP 加固的具体实现方式。 混淆代码&#xff1a; 使用 ProGuard 工具可以对代码进行混淆&#xff0c;使得反编译出来的代码很难…

【机器学习9】前馈神经网络

深度前馈网络是一类网络模型的统称&#xff0c;主要包括多层感知机、 自编码器、限制玻尔兹曼机&#xff0c; 以及卷积神经网络等。 1 激活函数 激活函数及对应导函数图其它Sigmoid 导数 在z很大或很小时都会趋近于0&#xff0c; 造成梯度消失的现象Tanh 其导数在z很大或很小…

德迅云安全和您聊聊关于DDOS高防ip的一些方面

德迅DDoS防护服务是以省骨干网的DDoS防护网络为基础&#xff0c;结合德迅自研的DDoS攻击检测和智能防护体系&#xff0c;向您提供可管理的DDoS防护服务&#xff0c;自动快速的缓解网络攻击对业务造成的延迟增加&#xff0c;访问受限&#xff0c;业务中断等影响&#xff0c;从而…

智能运维监控告警6大优势

随着云计算和互联网的高速发展&#xff0c;大量应用需要横跨不同网络终端&#xff0c;并广泛接入第三方服务(如支付、登录、导航等)&#xff0c;IT系统架构越来越复杂。 快速迭代的产品需求和良好的用户体验&#xff0c;需要IT运维管理者时刻保障核心业务稳定可用&#xff0c;…

.NET开源全面方便的第三方登录组件集合 - MrHuo.OAuth

前言 我相信做开发的同学应该都对接过各种各样的第三方平台的登录授权&#xff0c;来获取用户信息&#xff08;如&#xff1a;微信登录、支付宝登录、QQ登录、GitHub登录等等&#xff09;。今天给大家推荐一个.NET开源好用的、全面的、方便第三方登录组件集合框架&#xff1a;M…

新版软考高项试题分析精选(四)

请点击↑关注、收藏&#xff0c;本博客免费为你获取精彩知识分享&#xff01;有惊喜哟&#xff01;&#xff01; 1、一般而言&#xff0c;大型软件系统中实现数据压缩功能&#xff0c;工作在OSI参考模型的&#xff08; &#xff09;。 A.应用层 B.表示层 C.会话层 D.网络层…

Java 集合框架,泛型,包装类

文章目录 集合框架泛型Java 中的泛型裸类型&#xff08;了解&#xff09;原理泛型的上界泛型方法通配符 包装类ArrayList构造常见操作 LinkedListStackQueuePriorityQueueMapMap.Entry<K, V>Map 常用方法 Set常用方法 集合框架 Vector 一个古老的集合类&#xff0c;实现了…

±15kV ESD 保护、3V-5.5V 供电、真 RS-232 收发器MS2232/MS2232T

产品简述 MS2232/MS2232T 芯片是集成电荷泵&#xff0c;具有 15kV ESD 保护的 RS-232 收发器&#xff0c;包括两路接收器、两路发送器。 芯片满足 TIA/EIA-232 标准&#xff0c;为异步通信控制器和串口连 接器提供通信接口。 芯片采用 3V-5.5V 供电&#xff0c;电荷泵仅用…

快速生成力扣链表题的链表,实现快速调试

关于力扣链表题需要本地调试创建链表的情况 我们在练习链表题&#xff0c;力扣官方需要会员&#xff0c;我们又不想开会员&#xff0c;想在本地调试给你们提供的代码 声明&#xff1a;本人也是参考的别人的代码&#xff0c;给你们提供不同语言生成链表 参考链接&#xff1a; 参…

最终方案(乱)

为什么要在mos管上并一个快恢复二极管 因为电机成感性&#xff0c;为了在关断期间给它提供一个续流回路

Linux 爱好者线下沙龙:成都场圆满结束 下一场西子湖畔相见 | LLUG·第五站

导读&#xff1a;第四站 LLUG成都场已于10 月 29 日在武侯区菁蓉汇成功举办。LLUG 第五站将于11 月 25 日走进美丽的西子湖畔&#xff0c;在这个冬日&#xff0c;LLUG 与你在杭州线下相见。 10 月 29 日&#xff0c;LLUG 成都场成功在武侯区菁蓉汇举办。 LLUG成都站由 Linux 中…

Redhat8.3上部署Lustre文件系统

Lustre文件系统 Lustre架构是用于集群的存储架构。Lustre架构的核心组件是Lustre文件系统&#xff0c;它在Linux操作系统上得到支持&#xff0c;并提供了一个符合POSIX *标准的UNIX文件系统接口。 Lustre存储架构用于许多不同类型的集群。它以支持世界上许多最大的拥有数万个…

2023数维杯国际赛数学建模竞赛选题建议及D题思路讲解

大家好呀&#xff0c;2023年第九届数维杯国际大学生数学建模挑战赛今天早上开赛啦&#xff0c;在这里先带来初步的选题建议及思路。 目前团队正在写B题和D题完整论文&#xff0c;后续还会持续更新哈&#xff0c;大家三连关注一下防止迷路。 注意&#xff0c;本文只是比较简略…

clip4clip:an empirical study of clip for end to end video clip retrieval

广告深度学习计算&#xff1a;阿里妈妈智能创意服务优化使用CPU/GPU分离的多进程架构&#xff0c;加速阿里妈妈智能创意服务。https://mp.weixin.qq.com/s/_pjhXrUZVzFRtiwG2LhnkwCLIP4Clip: CLIP 再下一城&#xff0c;利用CLIP实现视频检索 - 知乎前言&#xff1a; OpenAI 的论…

ubuntu 20.04安装 Anaconda教程

在安装Anaconda之前需要先安装ros(防止跟conda冲突&#xff0c;先装ros)。提前安装好cuda 和cudnn。 本博客参考&#xff1a;ubuntu20.04配置ros noetic和cuda&#xff0c;cudnn&#xff0c;anaconda&#xff0c;pytorch深度学习的环境 安装完conda后&#xff0c;输入: pyth…

CCRC认证是什么?

什么是CCRC认证&#xff1f; 信息安全服务资质&#xff0c;是信息安全服务机构提供安全服务的一种资格&#xff0c;包括法律地位、资源状况、管理水平、技术能力等方面的要求。 信息安全服务资质&#xff08;CCRC&#xff09;是依据国家法律法规、国家标准、行业标准和技术规范…

快手怎么涨粉最快?10个实用方法让你迅速积累粉丝

先来看实操成果&#xff0c;↑↑需要的同学可看我名字↖↖↖↖↖&#xff0c;或评论888无偿分享 各位知友们&#xff0c;大家好&#xff01;今天我来分享一些在快手涨粉的实用方法&#xff0c;让你迅速积累粉丝。如果你还没有注册快手账号&#xff0c;那么现在就赶紧去下载注册…