Java的RocketMQ使用

在 Spring Boot 中,RocketMQ 和 Kafka 都是常用的消息中间件,它们的使用方法有一些相似之处,也有各自的特点。

一、RocketMQ 在 Spring Boot 中的使用

  1. 引入依赖

    • 在项目的pom.xml文件中添加 RocketMQ 的依赖。
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>
    
  2. 配置 RocketMQ

    • application.propertiesapplication.yml文件中配置 RocketMQ 的相关参数,如 namesrvAddr(NameServer 地址)等。
    rocketmq.name-server=127.0.0.1:9876
    
  3. 生产者

    • 创建一个生产者类,使用@Resource注入RocketMQTemplate
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class RocketMQProducer {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        public void sendMessage(String topic, String message) {
            rocketMQTemplate.convertAndSend(topic, message);
        }
    }
    
  4. 消费者

    • 创建一个消费者类,使用@RocketMQMessageListener注解指定监听的主题和消费组。
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            // 处理接收到的消息
            System.out.println("Received message: " + message);
        }
    }
    

二、Kafka 在 Spring Boot 中的使用

  1. 引入依赖

    • pom.xml文件中添加 Kafka 的依赖。
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.8.12</version>
    </dependency>
    
  2. 配置 Kafka

    • application.propertiesapplication.yml文件中配置 Kafka 的相关参数,如 bootstrapServers(Kafka 服务器地址)等。
    spring.kafka.bootstrap-servers=127.0.0.1:9092
    
  3. 生产者

    • 创建一个生产者类,使用@Resource注入KafkaTemplate
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaProducer {
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void sendMessage(String topic, String message) {
            kafkaTemplate.send(topic, message);
        }
    }
    
  4. 消费者

    • 创建一个消费者类,使用@KafkaListener注解指定监听的主题和消费组。
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaConsumer {
        @KafkaListener(topics = "your_topic", groupId = "your_consumer_group")
        public void onMessage(String message) {
            // 处理接收到的消息
            System.out.println("Received message: " + message);
        }
    }
    

总的来说,RocketMQ 和 Kafka 在 Spring Boot 中的使用都比较方便,具体选择哪种消息中间件可以根据项目的实际需求来决定。RocketMQ 在一些场景下可能具有高吞吐量、低延迟等优势,而 Kafka 则在大规模分布式系统中被广泛应用,具有高可靠性和可扩展性。

二、如何保证消息队列顺序性

1、发送端保证顺序性

  1. 合理设计业务

    • 确保具有顺序性要求的消息被发送到同一个主题(Topic)的同一个队列(Queue)中。比如,将同一类业务的消息按照特定规则进行分类,使得它们都进入相同的队列。
    • 一个业务场景的消息尽量由一个发送端来发送消息,避免多个发送端发送可能导致的乱序。
  2. 使用同步发送

    • 在发送消息时,使用同步发送方式send(Message msg, long timeout),确保消息成功发送后再进行下一个消息的发送。这样可以避免异步发送可能导致的消息乱序情况。

2、消费端保证顺序性

  1. 单线程消费

    • 消费者在消费消息时,采用单线程的方式进行消费。这样可以确保同一队列中的消息按照发送的顺序被依次处理。
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            // 处理接收到的消息
            System.out.println("Received message: " + message);
        }
    }
    

    在实际应用中,可以将消费逻辑放在一个单独的方法中,然后在这个方法中进行顺序处理,确保消息的顺序性。

  2. 避免并发处理

    • 确保在消费消息的过程中,不会出现并发处理的情况。比如,不要在消费消息的同时启动其他异步任务或者多线程处理,以免破坏消息的顺序性。

3、设置队列数量

  1. 控制队列数量
    • 如果业务对消息顺序性要求非常严格,可以考虑减少主题下的队列数量。通常情况下,一个主题可以包含多个队列,消息会被随机分发到不同的队列中。如果队列数量较少,那么消息更有可能被发送到同一个队列中,从而更容易保证顺序性。

通过以上方法,可以在一定程度上保证 RocketMQ 消息的顺序性。但需要注意的是,保证消息顺序性可能会牺牲一定的性能和吞吐量,因此需要根据实际业务需求进行权衡和选择。

一、如何确保消息队列的可靠性

1、发送端

  1. 同步发送与确认

    • 使用同步发送方式send(Message msg, long timeout),该方法会等待消息发送成功的确认,确保消息被正确地发送到 Broker。如果发送失败或超时,可以进行重试或其他错误处理操作。
    try {
        SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
        System.out.println("Message sent successfully: " + sendResult);
    } catch (Exception e) {
        System.out.println("Failed to send message: " + e.getMessage());
        // 进行重试或其他错误处理
    }
    
  2. 事务消息

    • 对于一些需要保证事务一致性的场景,可以使用 RocketMQ 的事务消息机制。发送事务消息分为两个阶段,首先发送半事务消息,然后执行本地事务,根据本地事务的结果决定提交或回滚事务消息。
    @Service
    public class TransactionProducer {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        public void sendTransactionMessage() {
            TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("transactionTopic", new Message<>("transactionMessage"), null);
            System.out.println("Transaction message sent: " + result);
        }
    }
    

2、Broker 端

  1. 持久化存储

    • RocketMQ 支持消息的持久化存储,可以将消息存储在磁盘上,以防止消息丢失。通过配置broker.conf文件中的flushDiskType参数,可以选择同步刷盘或异步刷盘方式。同步刷盘可以保证消息在写入磁盘后才返回成功响应,但会影响性能;异步刷盘可以提高性能,但在系统故障时可能会丢失部分未刷盘的消息。
  2. 高可用部署

    • 部署多主多从的 RocketMQ 集群,当主节点出现故障时,从节点可以自动切换为主节点,保证消息服务的可用性。同时,可以配置主从同步方式,确保消息在主从节点之间的可靠同步。

3、消费端

  1. 消费确认

    • 消费者在成功处理消息后,需要向 Broker 发送消费确认。可以通过设置consumeModeCONSUME_PASSIVELY(被动消费模式),并在处理完消息后手动调用acknowledge()方法进行确认。如果消费失败,可以选择重试或者将消息发送到死信队列进行后续处理。
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            try {
                // 处理消息
                System.out.println("Received message: " + message);
                // 确认消费成功
                getRocketMQListenerContainer().acknowledge();
            } catch (Exception e) {
                System.out.println("Failed to process message: " + e.getMessage());
                // 可以选择重试或者发送到死信队列
            }
        }
    }
    
  2. 重试机制

    • 配置消费者的重试次数和重试时间间隔,当消费失败时,RocketMQ 会自动进行重试。可以在application.propertiesapplication.yml中配置rocketmq.retry.timesrocketmq.retry.interval参数来控制重试策略。

通过以上措施,可以在不同阶段保证 RocketMQ 消息的可靠性,确保消息在生产、存储和消费过程中不会丢失或出现错误。

三、保证消息处理的幂等性
在 RocketMQ 中,可以通过以下几种方式来保证消息处理的幂等性:

1、业务层面设计

  1. 使用唯一标识

    • 在业务中为每条消息生成一个唯一的标识,比如使用业务流水号、订单号等作为消息的唯一标识。在消费消息时,先根据这个唯一标识判断该消息是否已经被处理过。如果已经处理过,则直接忽略该消息。
    • 例如在电商系统中,订单创建的消息可以使用订单号作为唯一标识。消费者在处理消息时,先查询数据库中是否存在该订单号对应的处理记录,如果存在则说明该消息已经被处理过,不再重复处理。
    @Service
    public class OrderProcessingService {
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        public void processOrderMessage(String orderId) {
            boolean isProcessed = isOrderProcessed(orderId);
            if (isProcessed) {
                return;
            }
            // 处理订单逻辑
            System.out.println("Processing order: " + orderId);
            markOrderAsProcessed(orderId);
        }
    
        private boolean isOrderProcessed(String orderId) {
            int count = jdbcTemplate.queryForObject(
                    "SELECT COUNT(*) FROM processed_orders WHERE order_id =?",
                    Integer.class, orderId);
            return count > 0;
        }
    
        private void markOrderAsProcessed(String orderId) {
            jdbcTemplate.update(
                    "INSERT INTO processed_orders (order_id) VALUES (?)",
                    orderId);
        }
    }
    
  2. 利用数据库约束

    • 可以在数据库中使用唯一索引、主键约束等方式来保证业务数据的唯一性。在处理消息时,如果违反了这些约束,则说明该消息已经被处理过,不再重复处理。
    • 比如在用户注册的场景中,可以在数据库的用户表中使用用户名或邮箱作为唯一索引。当消费用户注册的消息时,尝试插入用户数据,如果插入失败(因为违反唯一索引约束),则说明该用户已经注册过,不再重复处理。
    @Service
    public class UserRegistrationService {
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        public void registerUser(String username, String password) {
            try {
                jdbcTemplate.update(
                        "INSERT INTO users (username, password) VALUES (?,?)",
                        username, password);
            } catch (DataIntegrityViolationException e) {
                // 处理插入失败的情况,可能是用户已存在
                System.out.println("User already exists: " + username);
            }
        }
    }
    

2、技术层面实现

  1. 分布式锁
    • 可以使用分布式锁来保证同一时间只有一个消费者实例在处理特定的消息。在处理消息之前,先获取分布式锁,如果获取成功则处理消息,处理完成后释放锁。如果获取锁失败,则说明该消息正在被其他实例处理,当前实例可以选择等待或者直接忽略该消息。
    • 可以使用 Redis 或 Zookeeper 等实现分布式锁。以 Redis 为例,可以使用 SETNX 命令来实现分布式锁。
    @Service
    public class MessageProcessingService {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public void processMessage(String messageId) {
            String lockKey = "message_lock_" + messageId;
            boolean locked = tryLock(lockKey);
            if (!locked) {
                return;
            }
            try {
                boolean isProcessed = isMessageProcessed(messageId);
                if (isProcessed) {
                    return;
                }
                // 处理消息逻辑
                System.out.println("Processing message: " + messageId);
                markMessageAsProcessed(messageId);
            } finally {
                releaseLock(lockKey);
            }
        }
    
        private boolean tryLock(String key) {
            return redisTemplate.opsForValue().setIfAbsent(key, "locked", Duration.ofSeconds(30));
        }
    
        private void releaseLock(String key) {
            redisTemplate.delete(key);
        }
    
        private boolean isMessageProcessed(String messageId) {
            // 判断消息是否已处理的逻辑
            return false;
        }
    
        private void markMessageAsProcessed(String messageId) {
            // 标记消息已处理的逻辑
        }
    }
    

通过以上方法,可以有效地保证 RocketMQ 消息处理的幂等性,避免因重复消费消息而导致的业务数据不一致问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/897280.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue中实现css布局

在vue中通过flex布局实现css的s型结构 通过数组截取循环布局&#xff0c;奇数行从左到右&#xff0c;在偶数行从右到左实现s型结构 主要内容分为三部分 中间内容部分 数据格式 items: [{nodeList: [1, 2, 3, 4, 5, 6]},{nodeList: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}, {nod…

数据结构与算法:贪心与相关力扣题455.分发饼干、376.摆动序列、53.最大子数组和(贪心+动态规划dp)、122.买卖股票的最佳时机Ⅱ

贪心算法 贪心策略在实现时&#xff0c;经常使用到的技巧&#xff1a; 根据某标准建立一个比较器来排序 根据某标准建立一个比较器来组成堆 举例题目&#xff1a;会议室安排 一些项目要占用一个会议室宣讲&#xff0c;会议室不能同时容纳两个项目的宣讲。 给你每一个项目开始…

Go 1.19.4 命令调用、日志、包管理、反射-Day 17

1. 系统命令调用 所谓的命令调用&#xff0c;就是通过os&#xff0c;找到系统中编译好的可执行文件&#xff0c;然后加载到内存中&#xff0c;变成进程。 1.1 exec.LookPath&#xff08;寻找命令&#xff09; 作用&#xff1a; exec.LookPath 函数用于在系统的环境变量中搜索可…

2024年中国工业大模型行业发展研究报告|附43页PDF文件下载

工业大模型伴随着大模型技术的发展&#xff0c;逐渐渗透至工业&#xff0c;处于萌芽阶段。 就大模型的本质而言&#xff0c;是由一系列参数化的数学函数组成的计算系统&#xff0c;且是一个概率模型&#xff0c;其工作机制是基于概率和统计推动进行的&#xff0c;而非真正的理解…

C++ 进阶:类相关特性的深入探讨

⭐在对C 中类的6个默认成员函数有了初步了解之后&#xff0c;现在我们进行对类相关特性的深入探讨&#xff01; &#x1f525;&#x1f525;&#x1f525;【C】类的默认成员函数&#xff1a;深入剖析与应用&#xff08;上&#xff09; 【C】类的默认成员函数&#xff1a;深入剖…

SpringBoot使用RestTemplate实现发送HTTP请求

Java 实现发送 HTTP 请求&#xff0c;系列文章&#xff1a; 《Java使用原生HttpURLConnection实现发送HTTP请求》 《Java使用HttpClient5实现发送HTTP请求》 《SpringBoot使用RestTemplate实现发送HTTP请求》 1、RestTemplate 的介绍 RestTemplate 是 Spring 框架提供的一个用…

【java】抽象类和接口(了解,进阶,到全部掌握)

各位看官早安午安晚安呀 如果您觉得这篇文章对您有帮助的话 欢迎您一键三连&#xff0c;小编尽全力做到更好 欢迎您分享给更多人哦 大家好我们今天来学习Java面向对象的的抽象类和接口&#xff0c;我们大家庭已经来啦~ 一&#xff1a;抽象类 1.1:抽象类概念 在面向对象的概念中…

12寸FAB厂试产到量产实现无纸化要素之软硬件

在12寸先进封装半导体车间从试产到量产的过程中&#xff0c;实现生产过程无纸化&#xff0c;需要综合考虑软硬件的配置。以下是一些关键的规划建议&#xff1a; 1、生产文档管理系统&#xff08;PDM&#xff09;&#xff1a; 采用基于SOLIDWORKS PDM开发的无纸化方案&#xf…

uniapp 整合 OpenLayers - 加载Geojson数据(在线、离线)

Geojson数据是矢量数据&#xff0c;主要是点、线、面数据集合 Geojson数据获取&#xff1a;DataV.GeoAtlas地理小工具系列 实现代码如下&#xff1a; <template><!-- 监听变量 operation 的变化&#xff0c;operation 发生改变时&#xff0c;调用 openlayers 模块的…

Java面试场景题(1)---如何使用redis记录上亿用户连续登陆天数

感谢uu们的观看&#xff0c;话不多说开始~ 对于这个问题&#xff0c;我们需要先来了解一下~ 海量数据都可以用bitmap来存储&#xff0c;因为占得内存小&#xff0c;速度也很快 我大概计算了一下~ 完全够&#xff1a;String类型512M 1byte 8个bit位 8个状态 512M1024byt…

数据库性能测试报告总结模板

1计划概述 目的&#xff1a;找出系统潜在的性能缺陷 目标&#xff1a;从安全&#xff0c;可靠&#xff0c;稳定的角度出发&#xff0c;找出性能缺陷&#xff0c;并且找出系统最佳承受并发用户数&#xff0c;以及并发用户数下长时间运行的负载情况&#xff0c;如要并发100用户&a…

CTFHUB技能树之SQL——字符型注入

开启靶场&#xff0c;打开链接&#xff1a; 直接指明是SQL字符型注入&#xff0c;但还是来判断一下 &#xff08;1&#xff09;检查是否存在注入点 1 and 11# 返回正确 1 and 12# 返回错误 说明存在SQL字符型注入 &#xff08;2&#xff09;猜字段数 1 order by 2# 1 order…

颠覆编程!通义灵码、包阅AI、CodeGeeX三大AI助手解锁无限潜力!

随着科技的疾速前行&#xff0c;人工智能&#xff08;AI&#xff09;辅助编程工具已跃然成为软件开发领域及编程爱好者群体中不可或缺的得力助手。这些融入了尖端智能化算法的工具&#xff0c;不仅深刻改变了编程工作的面貌&#xff0c;通过自动化和优化手段显著提升了编程效率…

GJS-WCP

不懂的就问&#xff0c;但我也是二把手......哭死 web GJS-ezssti 很常规的ssti模板注入&#xff0c;只过滤了"/","flag"。 过滤了/,flag 可以利用bash的特性绕过&#xff0c;如字符串截取&#xff0c;环境变量等等。payload1: {{url_for.__globals__[…

柔性数组的使用

//柔性数组的使用 #include<stdio.h> #include<stdlib.h> #include<errno.h> struct s {int i;int a[]; }; int main() {struct s* ps (struct s*)malloc(sizeof(struct s) 20 * sizeof(int));if (ps NULL){perror("malloc");return 1;}//使用这…

react18中在列表项中如何使用useRef来获取每项的dom对象

在react中获取dom节点都知道用ref&#xff0c;但是在一个列表循环中&#xff0c;这样做是行不通的&#xff0c;需要做进一步的数据处理。 实现效果 需求&#xff1a;点击每张图片&#xff0c;当前图片出现在可视区域。 代码实现 .box{border: 1px solid #000;list-style: …

Math类、System类、Runtime类、Object类、Objects类、BigInteger类、BigDecimal类

课程目标 能够熟练使用Math类中的常见方法 能够熟练使用System类中的常见方法 能够理解Object类的常见方法作用 能够熟练使用Objects类的常见方法 能够熟练使用BigInteger类的常见方法 能够熟练使用BigDecimal类的常见方法 1 Math类 1.1 概述 tips&#xff1a;了解内容…

Java | Leetcode Java题解之第493题翻转对

题目&#xff1a; 题解&#xff1a; class Solution {public int reversePairs(int[] nums) {Set<Long> allNumbers new TreeSet<Long>();for (int x : nums) {allNumbers.add((long) x);allNumbers.add((long) x * 2);}// 利用哈希表进行离散化Map<Long, Int…

【JAVA】第三张_Eclipse下载、安装、汉化

简介 Eclipse是一种流行的集成开发环境&#xff08;IDE&#xff09;&#xff0c;可用于开发各种编程语言&#xff0c;包括Java、C、Python等。它最初由IBM公司开发&#xff0c;后来被Eclipse Foundation接手并成为一个开源项目。 Eclipse提供了一个功能强大的开发平台&#x…

AI 编译器学习笔记之四 -- cann接口使用

1、安装昇腾依赖 # CANN发布件地址 https://cmc.rnd.huawei.com/cmcversion/index/releaseView?deltaId10274626629404288&isSelectSoftware&url_datarun Ascend-cann-toolkit_8.0.T15_linux-aarch64.run Ascend-cann-nnal_8.0.T15_linux-aarch64.run Ascend-cann-ker…