RocketMQ面试题:进阶部分

🧑 博主简介:CSDN博客专家历代文学网(PC端可以访问:https://literature.sinhy.com/#/?__c=1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,精通Java编程高并发设计Springboot和微服务,熟悉LinuxESXI虚拟化以及云原生Docker和K8s,热衷于探索科技的边界,并将理论知识转化为实际应用。保持对新技术的好奇心,乐于分享所学,希望通过我的实践经历和见解,启发他人的创新思维。在这里,我希望能与志同道合的朋友交流探讨,共同进步,一起在技术的世界里不断学习成长。
技术合作请加本人wx(注明来自csdn):foreast_sea

在这里插入图片描述


在这里插入图片描述

RocketMQ面试题:进阶部分

1. 如何保证消息的可用性/可靠性/不丢失呢?

消息可能在哪些阶段丢失呢?可能会在这三个阶段发生丢失:生产阶段、存储阶段、消费阶段。

所以要从这三个阶段考虑:

在这里插入图片描述

1.1 生产

在生产阶段,主要通过请求确认机制,来保证消息的可靠传递

  • 1、同步发送的时候,要注意处理响应结果和异常。如果返回响应 OK,表示消息成功发送到了 Broker,如果响应失败,或者发生其它异常,都应该重试。
  • 2、异步发送的时候,应该在回调方法里检查,如果发送失败或者异常,都应该进行重试。
  • 3、如果发生超时的情况,也可以通过查询日志的 API,来检查是否在 Broker 存储成功。
1.2 存储

存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步。

  • 1、消息只要持久化到 CommitLog(日志文件)中,即使 Broker 宕机,未消费的消息也能重新恢复再消费。
  • 2、Broker 的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在 pagecache 中(内存中),但是同步刷盘更可靠,它是 Producer 发送消息后等数据持久化到磁盘之后再返回响应给 Producer。

在这里插入图片描述

  • 3、Broker 通过主从模式来保证高可用,Broker 支持 Master 和 Slave 同步复制、Master 和 Slave 异步复制模式,生产者的消息都是发送给 Master,但是消费既可以从 Master 消费,也可以从 Slave 消费。同步复制模式可以保证即使 Master 宕机,消息肯定在 Slave 中有备份,保证了消息不会丢失。
1.3 消费

从 Consumer 角度分析,如何保证消息被成功消费?

  • Consumer 保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。

2. 如何处理消息重复的问题呢?

RocketMQ 可以保证消息一定投递,且不丢失,但无法保证消息不重复消费。

因此,需要在业务端做好消息的幂等性处理,或者做消息去重。

在这里插入图片描述

幂等性是指一个操作可以执行多次而不会产生副作用,即无论执行多少次,结果都是相同的。可以在业务逻辑中加入检查逻辑,确保同一消息多次消费不会产生副作用。

例如,在支付场景下,消费者消费扣款的消息,对一笔订单执行扣款操作,金额为100元。

如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果要保证只扣款一次,金额为100元。如果扣款操作是符合要求的,那么就可以认为整个消费过程实现了消息幂等。

消息去重,是指在消费者消费消息之前,先检查一下是否已经消费过这条消息,如果消费过了,就不再消费。

业务端可以通过一个专门的表来记录已经消费过的消息 ID,每次消费消息之前,先查询一下这个表,如果已经存在,就不再消费。

public void processMessage(String messageId, String message) {
    if (!isMessageProcessed(messageId)) {
        // 处理消息
        markMessageAsProcessed(messageId);
    }
}

private boolean isMessageProcessed(String messageId) {
    // 查询去重表,检查消息ID是否存在
}

private void markMessageAsProcessed(String messageId) {
    // 将消息ID插入去重表
}
2.1 如何保证消息的幂等性?

在这里插入图片描述

首先,消息必须携带业务唯一标识,可以通过雪花算法生成全局唯一 ID。

Message msg = new Message(TOPIC /* Topic */,
             TAG /* Tag */,
               ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
             );
             
message.setKey("ORDERID_100"); // 订单编号
SendResult sendResult = producer.send(message);      

其次,在消费者接收到消息后,判断 Redis 中是否存在该业务主键的标志位,若存在标志位,则认为消费成功,否则执行业务逻辑,执行完成后,在缓存中添加标志位。

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
        for (MessageExt messageExt : msgs) {
           String bizKey = messageExt.getKeys(); // 唯一业务主键
           //1. 判断是否存在标志
           if(redisTemplate.hasKey(RedisKeyConstants.WAITING_SEND_LOCK + bizKey)) {
         			continue;
       		 }
         	 //2. 执行业务逻辑
           //TODO do business
           //3. 设置标志位
           redisTemplate.opsForValue().set(RedisKeyConstants.WAITING_SEND_LOCK + bizKey, "1", 72, TimeUnit.HOURS);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

然后,利用数据库的唯一索引来防止业务的重复插入。

CREATE TABLE `t_order` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `order_id` varchar(64) NOT NULL COMMENT '订单编号',
  `order_name` varchar(64) NOT NULL COMMENT '订单名称',
  PRIMARY KEY (`id`),
  UNIQUE KEY `order_id` (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';

最后,在数据库表中使用版本号,通过乐观锁机制来保证幂等性。每次更新操作时检查版本号是否一致,只有一致时才执行更新并递增版本号。如果版本号不一致,则说明操作已被执行过,拒绝重复操作。

public void updateRecordWithOptimisticLock(int id, String newValue, int expectedVersion) {
    int updatedRows = jdbcTemplate.update(
        "UPDATE records SET value = ?, version = version + 1 WHERE id = ? AND version = ?",
        newValue, id, expectedVersion
    );
    if (updatedRows == 0) {
        throw new OptimisticLockingFailureException("Record has been modified by another transaction");
    }
}

或者悲观锁机制,通过数据库的锁机制来保证幂等性。

public void updateRecordWithPessimisticLock(int id) {
    jdbcTemplate.queryForObject("SELECT * FROM records WHERE id = ? FOR UPDATE", id);
    jdbcTemplate.update("UPDATE records SET value = ? WHERE id = ?", "newValue", id);
}
2.2 雪花算法了解吗?

雪花算法是由 Twitter 开发的一种分布式唯一 ID 生成算法。

在这里插入图片描述

雪花算法以 64 bit 来存储组成 ID 的4 个部分:

  1. 最高位占1 bit,始终为 0,表示正数。
  2. 中位占 41 bit,值为毫秒级时间戳;
  3. 中下位占 10 bit,机器 ID(包括数据中心 ID 和机器 ID),可以支持 1024 个节点。
  4. 末位占 12 bit,值为当前毫秒内生成的不同的自增序列,值的上限为 4096;

目前雪花算法的实现比较多,可以直接使用 Hutool 工具类库中的 IdUtil.getSnowflake() 方法来获取雪花 ID。

long id = IdUtil.getSnowflakeNextId();
  1. Java 面试指南(付费)收录的京东同学 4 云实习面试原题:如何处理消息重复消费的问题?如何保证幂等性?雪花算法了解吗?

3. 怎么处理消息积压?

发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:

在这里插入图片描述

  • 消费者扩容:如果当前 Topic 的 Message Queue 的数量大于消费者数量,就可以对消费者进行扩容,增加消费者,来提高消费能力,尽快把积压的消息消费玩。
  • 消息迁移 Queue 扩容:如果当前 Topic 的 Message Queue 的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容 Message Queue。可以新建一个临时的 Topic,临时的 Topic 多设置一些 Message Queue,然后先用一些消费者把消费的数据丢到临时的 Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的 Topic 里的数据,消费完了之后,恢复原状。

在这里插入图片描述

4. 顺序消息如何实现?

RocketMQ 实现顺序消息的关键在于保证消息生产和消费过程中严格的顺序控制,即确保同一业务的消息按顺序发送到同一个队列中,并由同一个消费者线程按顺序消费。

在这里插入图片描述

4.1 局部顺序消息如何实现?

局部顺序消息保证在某个逻辑分区或业务逻辑下的消息顺序,例如同一个订单或用户的消息按顺序消费,而不同订单或用户之间的顺序不做保证。

在这里插入图片描述

4.2 全局顺序消息如何实现?

全局顺序消息保证消息在整个系统范围内的严格顺序,即消息按照生产的顺序被消费。

可以将所有消息发送到一个单独的队列中,确保所有消息按生产顺序发送和消费。

在这里插入图片描述

5. 如何实现消息过滤?

有两种方案:

  • 一种是在 Broker 端按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。
  • 另一种是在 Consumer 端过滤,比如按照消息设置的 tag 去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。

一般采用 Cosumer 端过滤,如果希望提高吞吐量,可以采用 Broker 过滤。

对消息的过滤有三种方式:

在这里插入图片描述

  • 根据 Tag 过滤:这是最常见的一种,用起来高效简单
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
  • SQL 表达式过滤:SQL 表达式过滤更加灵活
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();

  • Filter Server 方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤

6. 延时消息了解吗?

电商的订单超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息,1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

RocketMQ 是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:

// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
    Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
    // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
    message.setDelayTimeLevel(3);
    // 发送消息
    producer.send(message);
}

但是目前 RocketMQ 支持的延时级别是有限的:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
6.1 RocketMQ 怎么实现延时消息的?

简单,八个字:临时存储+定时任务

Broker 收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX)的相应时间段的 Message Queue 中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标 Topic 的队列中,然后消费者就可以正常消费这些消息。

在这里插入图片描述

7. 怎么实现分布式消息事务的?半消息?

半消息:是指暂时还不能被 Consumer 消费的消息,Producer 成功发送到 Broker 端的消息,但是此消息被标记为 “暂不可投递” 状态,只有等 Producer 端执行完本地事务后经过二次确认了之后,Consumer 才能消费此条消息。

依赖半消息,可以实现分布式消息事务,其中的关键在于二次确认以及消息回查:

在这里插入图片描述

  • 1、Producer 向 broker 发送半消息
  • 2、Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 “不可投递” 状态,Consumer 消费不了。
  • 3、Producer 端执行本地事务。
  • 4、正常情况本地事务执行完成,Producer 向 Broker 发送 Commit/Rollback,如果是 Commit,Broker 端将半消息标记为正常消息,Consumer 可以消费,如果是 Rollback,Broker 丢弃此消息。
  • 5、异常情况,Broker 端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到 Producer 端查询半消息的执行情况。
  • 6、Producer 端查询本地事务的状态
  • 7、根据事务的状态提交 commit/rollback 到 broker 端。(5,6,7 是消息回查)
  • 8、消费者段消费到消息之后,执行本地事务。

8. 死信队列知道吗?

死信队列用于存储那些无法被正常处理的消息,这些消息被称为死信(Dead Letter)。

在这里插入图片描述

产生死信的原因是,消费者在处理消息时发生异常,且达到了最大重试次数。当消费失败的原因排查并解决后,可以重发这些死信消息,让消费者重新消费;如果暂时无法处理,为避免到期后死信消息被删除,可以先将死信消息导出并进行保存。

9. 如何保证 RocketMQ 的高可用?

NameServer 因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。

在这里插入图片描述

RocketMQ 的高可用主要是在体现在 Broker 的读和写的高可用,Broker 的高可用是通过集群主从实现的。

在这里插入图片描述

Broker 可以配置两种角色:Master 和 Slave,Master 角色的 Broker 支持读和写,Slave 角色的 Broker 只支持读,Master 会向 Slave 同步消息。

也就是说 Producer 只能向 Master 角色的 Broker 写入消息,Cosumer 可以从 Master 和 Slave 角色的 Broker 读取消息。

Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。

如何达到发送端写的高可用性呢?在创建 Topic 的时候,把 Topic 的多个 Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId 机器组成 Broker 组),这样当 Broker 组的 Master 不可用后,其他组 Master 仍然可用, Producer 仍然可以发送消息 RocketMQ 目前还不支持把 Slave 自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/964199.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Deepseek-R1 和 OpenAI o1 这样的推理模型普遍存在“思考不足”的问题

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

结构性多余到结构性消失的现象和案例

在碎片化的现象和案例中提取关联性的信息。 也就是废墟之上如何重生的问题。 碎片化无处不在&#xff0c;普通人无人可以幸免。 当AI能力越来越强大&#xff0c;如下这些都在变为现实。 生产力 98%的人是过剩劳动力 人在大规模地被废弃 当人是生产力主体的时候&#xff0c;如…

(脚本学习)BUU18 [CISCN2019 华北赛区 Day2 Web1]Hack World1

自用 题目 考虑是不是布尔盲注&#xff0c;如何测试&#xff1a;用"1^1^11 1^0^10&#xff0c;就像是真真真等于真&#xff0c;真假真等于假"这个测试 SQL布尔盲注脚本1 import requestsurl "http://8e4a9bf2-c055-4680-91fd-5b969ebc209e.node5.buuoj.cn…

【C++】P1957 口算练习题

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;题目描述输入格式&#xff1a;输出格式&#xff1a; &#x1f4af;我的做法代码实现&#xff1a; &#x1f4af;老师的做法代码实现&#xff1a; &#x1f4af;对比分析&am…

【Linux系统】信号:再谈OS与内核区、信号捕捉、重入函数与 volatile

再谈操作系统与内核区 1、浅谈虚拟机和操作系统映射于地址空间的作用 我们调用任何函数&#xff08;无论是库函数还是系统调用&#xff09;&#xff0c;都是在各自进程的地址空间中执行的。无论操作系统如何切换进程&#xff0c;它都能确保访问同一个操作系统实例。换句话说&am…

LabVIEW双光子成像系统:自主创新,精准成像,赋能科研

双光子成像系统&#xff1a;自主创新&#xff0c;精准成像&#xff0c;赋能科研 第一部分&#xff1a;概述 双光子成像利用两个低能量光子同时激发荧光分子&#xff0c;具有深层穿透、高分辨率、低光损伤等优势。它能实现活体深层组织的成像&#xff0c;支持实时动态观察&…

「全网最细 + 实战源码案例」设计模式——策略模式

核心思想 享元模式&#xff08;Flyweight Pattern&#xff09;是一种行为型设计模式&#xff0c;用于定义一系列算法或策略&#xff0c;将它们封装成独立的类&#xff0c;并使它们可以相互替换&#xff0c;而不影响客户端的代码&#xff0c;提高代码的可维护性和扩展性。 结构…

安全策略实验

安全策略实验 1.拓扑图 2.需求分析 需求&#xff1a; 1.VLAN 2属于办公区&#xff0c;VLAN 3属于生产区 2.办公区PC在工作日时间&#xff08;周一至周五&#xff0c;早8到晚6&#xff09;可以正常访问OA server其他时间不允许 3.办公区PC可以在任意时刻访问Web Server 4.生产…

一文了解边缘计算

什么是边缘计算&#xff1f; 我们可以通过一个最简单的例子来理解它&#xff0c;它就像一个司令员&#xff0c;身在离炮火最近的前线&#xff0c;汇集现场所有的实时信息&#xff0c;经过分析并做出决策&#xff0c;及时果断而不拖延。 1.什么是边缘计算&#xff1f; 边缘计算…

对象的实例化、内存布局与访问定位

一、创建对象的方式 二、创建对象的步骤: 一、判断对象对应的类是否加载、链接、初始化: 虚拟机遇到一条new指令&#xff0c;首先去检查这个指令的参数能否在Metaspace的常量池中定位到一个类的符号引用&#xff0c;并且检查这个符号引用代表的类是否已经被加载、解析和初始化…

Altium Designer绘制原理图时画斜线的方法

第一步&#xff1a;检查设置是否正确 打开preferences->PCB Editor ->Interactive Routing->Interactive Routing Options->Restrict TO 90/45去掉勾选项&#xff0c;点击OK即可。如下图所示&#xff1a; 然后在划线时&#xff0c;按下shift空格就能够切换划线…

【R语言】环境空间

一、环境空间的特点 环境空间是一种特殊类型的变量&#xff0c;它可以像其它变量一样被分配和操作&#xff0c;还可以以参数的形式传递给函数。 R语言中环境空间具有如下3个特点&#xff1a; 1、对象名称唯一性 此特点指的是在不同的环境空间中可以有同名的变量出现&#x…

NeuralCF 模型:神经网络协同过滤模型

实验和完整代码 完整代码实现和jupyter运行&#xff1a;https://github.com/Myolive-Lin/RecSys--deep-learning-recommendation-system/tree/main 引言 NeuralCF 模型由新加坡国立大学研究人员于 2017 年提出&#xff0c;其核心思想在于将传统协同过滤方法与深度学习技术相结…

【ChatGPT:开启人工智能新纪元】

一、ChatGPT 是什么 最近,ChatGPT 可是火得一塌糊涂,不管是在科技圈、媒体界,还是咱们普通人的日常聊天里,都能听到它的大名。好多人都在讨论,这 ChatGPT 到底是个啥 “神器”,能让大家这么着迷?今天咱就好好唠唠。 ChatGPT,全称是 Chat Generative Pre-trained Trans…

c++ 定点 new 及其汇编解释

&#xff08;1&#xff09; 代码距离&#xff1a; #include <new> // 需要包含这个头文件 #include <iostream>int main() {char buffer[sizeof(int)]; // 分配一个足够大的字符数组作为内存池int* p new(&buffer) int(42); // 使用 placement new…

Linux系统之whereis命令的基本使用

Linux系统之whereis命令的基本使用 一、whereis命令介绍二、whereis命令的使用帮助2.1 whereis命令的帮助信息2.2 whereis命令帮助解释 三、whereis命令的基本使用3.1 查找命令的位置3.2 仅查找二进制文件3.3 仅查找手册页3.4 输出实际使用的查找路径3.5 指定自定义搜索路径 四…

CH340G上传程序到ESP8266-01(S)模块

文章目录 概要ESP8266模块外形尺寸模块原理图模块引脚功能 CH340G模块外形及其引脚模块引脚功能USB TO TTL引脚 程序上传接线Arduino IDE 安装ESP8266开发板Arduino IDE 开发板上传失败上传成功 正常工作 概要 使用USB TO TTL&#xff08;CH340G&#xff09;将Arduino将程序上传…

整形的存储形式和浮点型在计算机中的存储形式

在计算机科学的底层世界里&#xff0c;数据存储是基石般的存在。不同数据类型&#xff0c;如整形与浮点型&#xff0c;其存储方式犹如独特的密码&#xff0c;隐藏着计算机高效运行的秘密。理解它们&#xff0c;是深入掌握编程与计算机原理的关键。 一、整形的存储形式 原码、反…

无人机PX4飞控 | PX4源码添加自定义uORB消息并保存到日志

PX4源码添加自定义uORB消息并保存到日志 0 前言 PX4的内部通信机制主要依赖于uORB&#xff08;Micro Object Request Broker&#xff09;&#xff0c;这是一种跨进程的通信机制&#xff0c;一种轻量级的中间件&#xff0c;用于在PX4飞控系统的各个模块之间进行高效的数据交换…

QMK启用摇杆和鼠标按键功能

虽然选择了触摸屏&#xff0c;我仍选择为机械键盘嵌入摇杆模块&#xff0c;这本质上是对"操作连续性"的执着。   值得深思的是&#xff0c;本次开发过程中借助DeepSeek的代码生成与逻辑推理&#xff0c;其展现的能力已然颠覆传统编程范式&#xff0c;需求描述可自动…