RocketMQ 中如何实现消息的可靠传递?

引言

作为头部消息队列开源中间件,学习其中的技术方案并且总结可靠性和健壮性,提升我们的架构思维和解决问题的能力 。

在 RocketMQ 中实现消息的可靠传递可以从多个方面入手,涵盖生产者、Broker 以及消费者等不同环节。

 

生产者端

1. 同步发送消息

生产者使用同步发送模式时,会等待 Broker 返回发送结果,确保消息成功发送到 Broker 才会继续后续操作。若发送失败,生产者可以进行重试。

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

public class SyncProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes("UTF-8"));
        try {
            // 同步发送消息
            producer.send(msg);
        } catch (Exception e) {
            // 发送失败,可进行重试等处理
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

2. 重试机制

生产者在发送消息失败时,可配置重试次数。RocketMQ 支持自动重试,当遇到网络抖动、Broker 临时不可用等情况时,会自动尝试重新发送消息。

producer.setRetryTimesWhenSendFailed(3); // 设置发送失败时的重试次数为 3 次

3. 消息幂等性处理

为避免因重试导致消息重复发送,生产者可以为每条消息生成唯一的 ID。Broker 在接收消息时,会根据消息 ID 进行去重处理,确保相同 ID 的消息只被处理一次。

Broker 端

1. 刷盘策略

  • 同步刷盘:当 Broker 收到消息后,会先将消息写入磁盘,再返回响应给生产者。这种策略保证了消息不会因 Broker 异常重启而丢失,但会降低系统的吞吐量。
    flushDiskType = SYNC_FLUSH
  • 异步刷盘:Broker 收到消息后,先将消息写入内存缓冲区,然后立即返回响应给生产者,由专门的线程将消息异步写入磁盘。这种策略性能较高,但在 Broker 异常崩溃时,可能会丢失部分内存中的消息。

    2. 主从复制

    RocketMQ 支持主从复制架构,主 Broker 接收消息后,会将消息同步复制到从 Broker。当主 Broker 出现故障时,可以切换到从 Broker 继续提供服务,保证消息的可用性。

    brokerRole = SYNC_MASTER # 主 Broker 配置为同步主节点
    brokerRole = SLAVE # 从 Broker 配置为从节点

    消费者端

    1. 手动提交消费偏移量

    消费者在处理完消息后,手动向 Broker 提交消费偏移量,确保只有在消息处理成功后才更新消费进度。这样,当消费者出现异常时,可以从上次提交的偏移量处继续消费,避免消息丢失。

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class ManualCommitConsumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe("TopicTest", "*");
    
            // 手动提交消费偏移量
            consumer.setAutoCommit(false);
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        try {
                            // 处理消息
                            System.out.println(new String(msg.getBody()));
                        } catch (Exception e) {
                            // 处理失败,返回重试
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                    }
                    // 手动提交消费偏移量
                    context.setAckIndex(msgs.size() - 1);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
        }
    }

    2. 消费重试机制

    当消费者处理消息失败时,RocketMQ 会自动进行重试。消费者可以根据业务需求,设置重试次数和重试间隔,确保消息能够被成功处理。

    3. 幂等消费

    消费者在处理消息时,要保证消息的幂等性,即多次处理相同的消息不会产生额外的影响。可以通过消息 ID 或业务唯一标识来判断消息是否已经处理过,避免重复处理。

总结

  1. 持久化策略:内存注定是不可靠的,刷盘一定是可靠性首选,但是刷盘导致的IO延时如何优化,是评判中间件性能的关键。
  2. 重试机制:3次重试应该是各个开源框架的默认重试次数。
  3. 集群化策略:单个节点注定不是高可用的最终形态,主从复制多节点可靠是最终态。
  4. 幂等机制:保持消息的重复消费可靠性,幂等键或者其他策略都是可参考的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/961648.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

K8S中高级存储之PV和PVC

高级存储 PV和PVC 由于kubernetes支持的存储系统有很多&#xff0c;要求客户全都掌握&#xff0c;显然不现实。为了能够屏蔽底层存储实现的细节&#xff0c;方便用户使用&#xff0c; kubernetes引入PV和PVC两种资源对象。 PV&#xff08;Persistent Volume&#xff09; PV是…

IVD设备-GB4793.1 安规理解笔记

IVD设备-GB4793.1 安规理解笔记 参考国标文档 GB4793.1接地电阻试验试验通过的标准 耐压试验试验通过的标准 浮地危险带电部分与可触及导电零部件之间耐压试验通过的标准 参考国标文档 GB4793.1 接地电阻试验 图1GB 4793.1-2007标准 附录F.2 ⌘根据F.1要求内容是关保护接地电…

“AI视频智能分析系统:让每一帧视频都充满智慧

嘿&#xff0c;大家好&#xff01;今天咱们来聊聊一个特别厉害的东西——AI视频智能分析系统。想象一下&#xff0c;如果你有一个超级聪明的“视频助手”&#xff0c;它不仅能自动识别视频中的各种元素&#xff0c;还能根据内容生成详细的分析报告&#xff0c;是不是感觉特别酷…

002-基于Halcon的图像几何变换

本节将简要介绍Halcon中有关图像几何变换的基本算子及其应用&#xff0c;主要涉及五种常见的二维几何变换形式&#xff1a;平移、镜像、旋转、错切和放缩。这几种变换可归结为一类更高级更抽象的空间变换类型&#xff0c;即仿射变换&#xff08;Affine transformation&#xff…

七、深入了解SpringBoot的配置文件

一、配置端口号 通过配置文件application.properties配置修改端口号 修改 application.properties 文件 #端口号修改成 9090 server.port9090运行结果&#xff0c;观察日志 二、配置文件格式 Spring Boot 配置⽂件有以下三种&#xff1a; • application.properties • ap…

【Kubernetes】Pod生命周期、初始化容器、主容器

一、Pod生命周期 Pod从创建到终止退出的时间范围称为Pod生命周期。 1、生命周期重要流程 创建基础容器&#xff08;pause container&#xff09;初始化容器&#xff08;init-X Containers&#xff09;主容器&#xff08;container&#xff09;启动后的钩子(post-start)启动探…

网络爬虫学习:应用selenium获取Edge浏览器版本号,自动下载对应版本msedgedriver,确保Edge浏览器顺利打开。

一、前言 我从24年11月份开始学习网络爬虫应用开发&#xff0c;经过2个来月的努力&#xff0c;于1月下旬完成了开发一款网络爬虫软件的学习目标。这里对本次学习及应用开发进行一下回顾总结。 前几天我已经发了一篇日志&#xff08;网络爬虫学习&#xff1a;应用selenium从搜…

python学opencv|读取图像(四十九)原理探究:使用cv2.bitwise()系列函数实现图像按位运算

【0】基础定义 按位与运算&#xff1a;两个等长度二进制数上下对齐&#xff0c;全1取1&#xff0c;其余取0。 按位或运算&#xff1a;两个等长度二进制数上下对齐&#xff0c;有1取1&#xff0c;其余取0。 按位异或运算&#xff1a; 两个等长度二进制数上下对齐&#xff0c;相…

U盘打开提示格式化:深度解析与数据恢复全攻略

在数字化时代&#xff0c;U盘作为便捷的数据存储和传输工具&#xff0c;广泛应用于各个领域。然而&#xff0c;当我们满怀期待地插入U盘&#xff0c;却遭遇“U盘打开提示格式化”的尴尬局面时&#xff0c;那份焦虑与无助感油然而生。本文将全面剖析U盘打开提示格式化的原因、应…

将5分钟安装Thingsboard 脚本升级到 3.9

稍微花了一点时间&#xff0c;将5分钟安装Thingsboard 脚本升级到最新版本 3.9。 [rootlab5 work]# cat one-thingsboard.shell echo "test on RHEL 8.10 " source /work/java/install-java.shell source /work/thingsboard/thingsboard-rpm.shell source /work/po…

【新春不断更】题海拾贝:P1878 舞蹈课

Hello大家好&#xff01;很高兴我们又见面啦&#xff01;给生活添点passion&#xff0c;开始今天的编程之路&#xff01; 我的博客&#xff1a;<但凡. 我的专栏&#xff1a;《编程之路》、《数据结构与算法之美》、《题海拾贝》 欢迎点赞&#xff0c;关注&#xff01; 1、题…

Windows 程序设计6:错误码的查看

文章目录 前言一、说明二、使用GetLastError找到错误的原因三、使用错误码的宏总结 前言 Windows 程序设计6&#xff1a;错误码的查看。 一、说明 有时写的代码单纯看是没有问题的&#xff0c;但是执行起来就会崩溃。因此要养成判断函数执行是否成功的习惯&#xff0c;除非这…

[STM32 - 野火] - - - 固件库学习笔记 - - -十三.高级定时器

一、高级定时器简介 高级定时器的简介在前面一章已经介绍过&#xff0c;可以点击下面链接了解&#xff0c;在这里进行一些补充。 [STM32 - 野火] - - - 固件库学习笔记 - - -十二.基本定时器 1.1 功能简介 1、高级定时器可以向上/向下/两边计数&#xff0c;还独有一个重复计…

安装zsh并美化

0 Zsh 是一种功能强大的 shell&#xff0c;通常用于替代默认的 Bash shell。它为命令行提供了更多的功能&#xff0c;例如自动补全、强大的模式匹配和主题支持等。 Oh My Zsh 是用于管理 Zsh 配置的框架。 powerlevel10k是样式&#xff0c;通过p10k configure脚本可以调节自己…

Hive:复杂数据类型之Map函数

Map函数 是Hive里面的一种复杂数据类型, 用于存储键值对集合。Map中的键和值可以是基础类型或复合类型&#xff0c;这使得Map在处理需要关联存储信息的数据时非常有用。 定义map时,需声明2个属性: key 和 value , map中是 key value 组成一个元素 key-value, key必须为原始类…

WPF基础 | 深入 WPF 事件机制:路由事件与自定义事件处理

WPF基础 | 深入 WPF 事件机制&#xff1a;路由事件与自定义事件处理 一、前言二、WPF 事件基础概念2.1 事件的定义与本质2.2 常见的 WPF 事件类型 三、路由事件3.1 路由事件的概念与原理3.2 路由事件的三个阶段3.3 路由事件的标识与注册3.4 常见的路由事件示例 四、自定义事件处…

Sklearn 中的逻辑回归

逻辑回归的数学模型 基本模型 逻辑回归主要用于处理二分类问题。二分类问题对于模型的输出包含 0 和 1&#xff0c;是一个不连续的值。分类问题的结果一般不能由线性函数求出。这里就需要一个特别的函数来求解&#xff0c;这里引入一个新的函数 Sigmoid 函数&#xff0c;也成…

【Rust自学】14.6. 安装二进制crate

喜欢的话别忘了点赞、收藏加关注哦&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 14.6.1. 从cratea.io安装二进制crate 通过cargo_install命令可以从crates.io安装二进制crate。 这并不是为了替换系统包&#xff0c;它应…

Vue 组件开发:构建高效可复用的前端界面要素

1 引言 在现代 Web 开发中,构建高效且可复用的前端界面要素是提升开发效率和用户体验的关键。Vue.js 作为一种轻量级且功能强大的前端框架,提供了丰富的工具和机制,帮助开发者快速构建高质量的应用程序。通过合理设计和封装 Vue 组件,我们可以实现组件的高效复用,提高开发…

Qt Ribbon使用实例

采用SARibbon创建简单的ribbon界面 实例代码如下所示&#xff1a; 1、头文件&#xff1a; #pragma once #include <SARibbonBar.h> #include "SARibbonMainWindow.h" class QTextEdit; class SAProjectDemo1 : public SARibbonMainWindow { Q_OBJECT pub…