Spring Boot 应用Kafka讲解和案例示范

Kafka 是一款高吞吐量、低延迟的分布式消息系统。本文将详细介绍如何在 Spring Boot 项目中使用 Kafka 进行消息接收与消费,并结合幂等和重试机制,确保消息消费的可靠性和系统的扩展性。我们将以电商交易系统为案例进行深入解析。

1. 系统架构概览

在电商系统中,Kafka 常用于订单状态变更、库存变化等事件的异步处理。

+----------------+     Kafka      +----------------+
| 订单服务       |  ---> Produce ---> | 消费服务       |
| (Order Service)|     Topic      | (Consumer Service)|
+----------------+                +----------------+
        |                                  |
     MySQL                               MySQL

主要流程:

  1. 订单服务:接收用户订单请求后,异步将订单信息发送到 Kafka。
  2. 消费服务:从 Kafka 中消费订单信息,更新库存、生成发货信息等操作。
  3. 数据库:使用 MySQL 存储订单和库存数据,并通过 MyBatis 实现持久化操作。

2. Kafka 的基础介绍

Kafka 是一种基于发布-订阅模式的消息系统,支持高吞吐、分区与复制等机制,具备容错和可扩展的特点。它的主要组成部分有:

  • Producer(生产者):向 Kafka 的 Topic 发送消息。
  • Consumer(消费者):从 Kafka 的 Topic 读取消息。
  • Broker(代理):Kafka 的服务器集群。
  • Topic(主题):消息的分类单位。
  • Partition(分区):用于分布式处理消息。

3. 项目环境搭建

3.1 Maven 依赖

在 Spring Boot 项目中,我们通过 spring-kafka 提供对 Kafka 的集成。还需要引入 MyBatis 和 MySQL 相关依赖。

<dependencies>
    <!-- Spring Boot Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- MySQL Driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>

    <!-- MyBatis -->
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.2.0</version>
    </dependency>

    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Lombok (可选,用于简化代码) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
3.2 数据库表结构设计

为实现电商系统的消息接收与消费,以下是两个主要数据库表:订单表和消费记录表。

  • 订单表(orders:存储订单的基础信息。
  • 消费记录表(message_consume_record:记录消费过的消息,用于幂等校验。
CREATE TABLE orders (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    order_no VARCHAR(64) NOT NULL,
    user_id BIGINT NOT NULL,
    total_price DECIMAL(10, 2) NOT NULL,
    status INT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE message_consume_record (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    message_key VARCHAR(64) NOT NULL UNIQUE,
    consumed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

4. Kafka 消息生产与接收实现

4.1 生产者配置

在 Spring Boot 中,我们可以通过 KafkaTemplate 发送消息。首先,在 application.yml 中配置 Kafka 的基础信息:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 3
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

bootstrap-servers: 这是 Kafka 服务的地址,Kafka 集群通常由多个 Broker 组成,每个 Broker 提供消息的存储与转发功能。这里指定了本地的 Kafka 服务器(localhost:9092),如果有多个 Broker,可以用逗号分隔(例如:localhost:9092,localhost:9093)。

retries: 当消息发送失败时,生产者将重试发送的次数。这里配置了 3 次重试。这在网络不稳定或 Kafka 节点暂时不可用时非常有用,可以有效提高消息发送成功率。

key-serializer: 生产者发送的消息可以有一个键值对。key-serializer 用于将消息的键序列化为字节数组。这里使用了 StringSerializer,表示消息的键是字符串形式,序列化为字节后发送。

value-serializer: 类似于键,value-serializer 用于将消息的值序列化为字节数组。配置 StringSerializer 表示消息内容是字符串。

4.2 消息生产示例
@Service
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendOrderMessage(String orderId) {
        kafkaTemplate.send("order-topic", orderId);
    }
}

OrderService 中,用户提交订单后,可以将订单 ID 发送至 Kafka:

@Service
public class OrderService {

    @Autowired
    private OrderProducer orderProducer;

    public void createOrder(OrderDTO order) {
        // 保存订单逻辑...
        orderProducer.sendOrderMessage(order.getOrderId());
    }
}

5. 消息消费实现

5.1 消费者配置

在消费者中,我们需要定义 @KafkaListener 注解监听 Kafka 主题,并从中接收消息。

spring:
  kafka:
    consumer:
      group-id: order-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

group-id: 消费者组 ID。Kafka 允许多个消费者组监听同一个 Topic,每个消费者组可以独立消费消息。此处配置 order-group,意味着该消费者属于订单消费逻辑的消费者组。

auto-offset-reset: 指定消费者在没有初始偏移量(offset)或当前偏移量无效的情况下,从哪里开始读取消息。earliest 表示从最早的可用消息开始消费,这对于新启动的消费者非常有用,能够确保读取历史数据。

key-deserializer: 将接收到的消息键从字节数组反序列化为 Java 对象。这里配置 StringDeserializer,表示键是字符串。

value-deserializer: 类似于键的反序列化,value-deserializer 用于将消息内容反序列化为 Java 对象。配置 StringDeserializer,表示消息内容是字符串。

5.2 消息消费示例
@Service
public class OrderConsumer {

    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-topic", groupId = "order-group")
    public void consumeOrder(String orderId) {
        orderService.processOrder(orderId);
    }
}

OrderService 中,处理接收到的订单消息:

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Transactional
    public void processOrder(String orderId) {
        // 根据订单 ID 更新订单状态、库存等操作
        Order order = orderMapper.findById(orderId);
        // 更新订单逻辑...
    }
}

6. 幂等性保证

Kafka 的消息消费可能会因为网络问题或其他故障导致重复消费,因此在消费消息时需要考虑幂等性。我们可以通过在数据库中存储每个消息的唯一标识来实现幂等。

6.1 幂等校验实现

在消费消息时,首先检查该消息是否已经被消费过:

@Service
public class OrderConsumer {

    @Autowired
    private MessageConsumeRecordMapper consumeRecordMapper;

    @KafkaListener(topics = "order-topic", groupId = "order-group")
    public void consumeOrder(String orderId) {
        if (consumeRecordMapper.existsByMessageKey(orderId)) {
            // 如果已经处理过该消息,直接返回
            return;
        }

        // 处理订单
        orderService.processOrder(orderId);

        // 记录已处理消息
        consumeRecordMapper.insertConsumeRecord(orderId);
    }
}

MessageConsumeRecordMapper 接口用于操作消费记录表:

@Mapper
public interface MessageConsumeRecordMapper {

    boolean existsByMessageKey(String messageKey);

    void insertConsumeRecord(String messageKey);
}

通过这种方式,我们确保了每条消息只被消费一次,避免重复处理订单数据。


7. 重试机制实现

为了保证消息的可靠消费,可能会需要对消费失败的消息进行重试。Kafka 提供了自动重试机制,但在多次重试失败后,仍然可能需要手动处理。因此,我们可以通过将消费失败的消息保存至数据库,并定期进行重试的方式,实现可靠的消息处理。

7.1 消费失败记录表设计
CREATE TABLE failed_message (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    message_key VARCHAR(64) NOT NULL,
    payload TEXT NOT NULL,
    failed_reason TEXT,
    retry_count INT DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
7.2 重试机制实现

在消费消息失败时,将消息记录到失败表中,并定期进行重试。

@Service
public class OrderConsumer {

    @Autowired
    private FailedMessageMapper failedMessageMapper;

    @KafkaListener(topics = "order-topic", groupId = "order-group")
    public void consumeOrder(String orderId) {
        try {
            orderService.processOrder(orderId);
        } catch (Exception e) {
            failedMessageMapper.insertFailedMessage(orderId, e.getMessage());
        }
     }
}     

通过定时任务或手动触发,定期查询失败的消息并重新消费:

@Service
public class FailedMessageRetryService {

    @Autowired
    private FailedMessageMapper failedMessageMapper;

    @Scheduled(fixedDelay = 60000)  // 每分钟重试一次
    public void retryFailedMessages() {
        List<FailedMessage> failedMessages = failedMessageMapper.findAll();
        for (FailedMessage message : failedMessages) {
            try {
                orderService.processOrder(message.getPayload());
                failedMessageMapper.deleteById(message.getId());
            } catch (Exception e) {
                failedMessageMapper.incrementRetryCount(message.getId());
            }
        }
    }
}

8. 扩展性设计

为了使系统具备良好的扩展性,我们需要考虑以下几个方面:

8.1 支持多种消息格式

除了支持 Kafka 消息,我们可以通过设计合理的接口结构,扩展系统支持其他消息队列或 HTTP 请求的接入。例如,通过创建统一的 MessageConsumer 接口,任何类型的消息都可以实现消费逻辑。

public interface MessageConsumer {
    void consume(String payload);
}

@Service
public class KafkaOrderConsumer implements MessageConsumer {
    @Override
    public void consume(String payload) {
        // Kafka 消息消费逻辑
    }
}

通过这种设计,可以轻松添加新的消息类型或处理逻辑,而不需要修改现有代码。

8.2 动态配置

为了增强系统的灵活性,系统可以支持通过数据库或配置文件动态调整消息消费逻辑。例如,可以在配置文件中定义不同业务的消费逻辑:

message-consumers:
  order:
    type: kafka
    topic: order-topic
  user:
    type: http
    url: http://example.com/user/message

通过读取这些配置,系统可以动态选择不同的消费逻辑,从而增强扩展性。


9. 性能优化

9.1 异步消费

为了提高消费速度,可以将消息的处理逻辑放入线程池中异步执行,从而避免阻塞 Kafka 消费的主线程。

@Async
public void processOrderAsync(String orderId) {
    orderService.processOrder(orderId);
}
9.2 批量消费

Kafka 支持批量消费消息,这样可以减少 Kafka 客户端与 Broker 之间的交互次数,提升性能。在 Spring Boot 中,可以通过配置 max.poll.records 参数控制每次批量消费的消息数量。

spring:
  kafka:
    consumer:
      max-poll-records: 500
9.3 分区与并行消费

通过为 Kafka 的 Topic 配置多个分区,并为消费者组中的消费者分配不同的分区,可以实现并行消费,从而提升系统的消费能力。

spring:
  kafka:
    consumer:
      concurrency: 3

10. Kafka 防止 MQ 队列堆积太多导致内存溢出问题

在实际的生产环境中,当消费速度低于消息的生产速度时,Kafka 消费者端的消息队列可能会出现堆积。如果消息堆积时间过长,会导致 Kafka 中的分区文件过大,甚至在消费者端可能造成内存溢出。因此,我们需要在架构设计中考虑如何有效防止消息堆积的问题。

以下是一些常见的应对策略:

10.1 提高消费速度

当 Kafka 的消费速度低于生产速度时,最直接的应对措施就是提升消费的速度:

  1. 并行消费:通过配置 Kafka 消费者的 concurrency 参数来增加消费者实例的数量。Kafka 使用分区来进行负载均衡,分区的数量决定了并发消费的能力。因此,增加分区数可以提升消费者的并发处理能力。
spring:
  kafka:
    consumer:
      concurrency: 3  # 配置多个消费者进行并行处理
  1. 批量消费:通过 max.poll.records 参数配置每次拉取的消息数量。增加批量消费可以减少 Kafka 消费者与 Broker 之间的交互,从而提升性能。
spring:
  kafka:
    consumer:
      max-poll-records: 500  # 每次批量拉取 500 条消息
10.2 优化消息处理逻辑

在消费端,消息的处理速度是决定 Kafka 消费效率的关键。因此,需要对消费逻辑进行优化:

  1. 异步处理:在消息处理完成后再返回响应,可能导致整个消费过程变慢。可以通过使用异步任务处理消息内容,从而避免阻塞 Kafka 消费的主线程。可以结合 Spring 的 @Async 注解实现异步处理。
@Async
public void processOrderAsync(String orderId) {
    // 异步处理订单消息
    orderService.processOrder(orderId);
}
  1. 缩短消息处理时间:简化业务逻辑,避免冗长的处理流程。使用缓存等方式减少对数据库的频繁访问,降低 I/O 操作带来的性能开销。
10.3 调整 Kafka 生产者端的速率

生产者端的消息发送速率直接影响消息的堆积情况。当消费端无法跟上生产端的速度时,适当限制生产者的消息发送速率是一个有效的策略:

  1. 限流机制:在生产者端通过限流策略,控制每秒钟向 Kafka 发送的消息数量,确保消费者有足够的时间处理消息。例如,可以使用 RateLimiter 实现限流。
RateLimiter rateLimiter = RateLimiter.create(1000); // 每秒最多发送 1000 条消息

public void sendMessage(String topic, String message) {
    rateLimiter.acquire();  // 获取许可
    kafkaTemplate.send(topic, message);
}
  1. 分布式限流:如果消息的生产端部署在多个节点上,可以使用 Redis 等工具实现分布式限流。
10.4 设置合适的消费位移提交策略

Kafka 消费者有两种提交消费位移的方式:自动提交和手动提交。默认情况下,Kafka 会每隔一段时间自动提交消费位移。如果消费端发生异常,未能处理的消息在下次重新拉取时会再次被消费。为了避免消息重复消费,我们可以将消费位移的提交改为手动提交,确保消息处理完后再提交位移。

spring:
  kafka:
    consumer:
      enable-auto-commit: false  # 关闭自动提交位移

手动提交消费位移:

try {
    // 消费处理消息
    processMessage(record);
    // 手动提交位移
    acknowledgment.acknowledge();
} catch (Exception e) {
    // 处理异常
}
10.5 配置 Kafka 消息保留策略

如果消息堆积严重,可以通过 Kafka 的 retention.ms 参数设置消息的存储时间,确保超过存储时间的消息自动删除,防止 Kafka 分区文件无限制增长。

log.retention.ms=604800000  # 配置 Kafka 日志文件的保留时间,单位为毫秒,这里设置为 7 天

此外,可以通过配置 log.retention.bytes 来限制 Kafka 每个分区的日志文件大小,确保超出大小限制后自动删除最早的消息。

log.retention.bytes=1073741824  # 配置 Kafka 分区日志文件的最大大小,单位为字节,这里设置为 1 GB
10.6 使用 Kafka 消息压缩

对于大数据量的消息,可以启用 Kafka 消息压缩功能,减少消息的占用空间,从而提升生产和消费的效率。Kafka 支持多种压缩算法,包括 GZIP、LZ4 和 SNAPPY。

spring:
  kafka:
    producer:
      compression-type: gzip  # 启用 GZIP 压缩

压缩不仅可以减少网络传输的数据量,还可以降低 Kafka Broker 和消费端的存储压力,从而减少消息堆积的可能性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/884238.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

robomimic应用教程(二)——策略运行与评估

得到训练好的pth后&#xff0c;下一并将其进行部署及效果评估 可以在jupyter notebook中进行此操作&#xff0c;文件为robomimic文件夹中的examples/notebooks/run_policy.ipynb 本文采用pycharm调试 该脚本用于在环境中评估策略&#xff0c;主要包括从model zoo下载checkpoi…

【web开发】Spring Boot 快速搭建Web项目(三)

Date: 2024.08.31 18:01:20 author: lijianzhan 简述&#xff1a;根据上篇原文Spring Boot 快速搭建Web项目&#xff08;二&#xff09;&#xff0c;由于已经搭建好项目初始的框架&#xff0c;以及自动创建了一个启动类文件&#xff08;TestWebApplication.java&#xff09; …

【Python】Daphne:Django 异步服务的桥梁

Daphne 是 Django Channels 项目的一部分&#xff0c;专门用于为 Django 提供支持 HTTP、WebSocket、HTTP2 和 ASGI 协议的异步服务器。Daphne 是一个开源的 Python 异步服务器&#xff0c;它可以帮助开发者运行异步应用程序&#xff0c;并且非常适合与 Django Channels 一起使…

电子电路的基础知识

电子电路是现代电子技术的基础&#xff0c;由电子元件&#xff08;如电阻、电容、电感、二极管、晶体管等&#xff09;和无线电元件通过一定方式连接而成的电路系统。 以下是对电子电路的详细概述&#xff1a; 一、定义与分类 定义&#xff1a;电子电路是指由电子器件和有关无…

解压视频素材下载网站推荐

在制作抖音小说推文或其他短视频时&#xff0c;找到合适的解压视频素材非常重要。以下是几个推荐的网站&#xff0c;可以帮助你轻松下载高质量的解压视频素材&#xff1a; 蛙学网 蛙学网是国内顶尖的短视频素材网站&#xff0c;提供大量4K高清无水印的解压视频素材&#xff0c;…

【记录】Excel|不允许的操作:合并或隐藏单元格出现的问题列表及解决方案

人话说在前&#xff1a;这篇的内容是2022年5月写的&#xff0c;当时碰到了要批量处理数据的情况&#xff0c;但是又不知道数据为啥一直报错报错报错&#xff0c;说不允许我操作&#xff0c;最终发现是因为存在隐藏的列或行&#xff0c;于是就很无语地写了博客&#xff0c;但内容…

如何使用Flux+lora进行AI模型文字生成图片

目录 概要 前期准备 部署安装与运行 1. 部署ComfyUI 本篇的模型部署是在ComfyUI的基础上进行&#xff0c;如果没有部署过ComfyUI&#xff0c;请按照下面流程先进行部署&#xff0c;如已安装请跳过该步&#xff1a; &#xff08;1&#xff09;使用命令克隆 ComfyUI &…

【友元补充】【动态链接补充】

友元 友元的目的是让一个函数或者类&#xff0c;访问另一个类中的私有成员。 友元的关键字friend是一个修饰符。 友元分为友元类和友元函数 1.全局函数作友元 2.类作友元 3.类的一个成员函数作友元 好处&#xff1a;可以通过友元在类外访问类内的私有和受保护类型的成员 坏处…

Python画笔案例-068 绘制漂亮米

1、绘制漂亮米 通过 python 的turtle 库绘制 漂亮米,如下图: 2、实现代码 绘制 漂亮米,以下为实现代码: """漂亮米.py注意亮度为0.5的时候最鲜艳本程序需要coloradd模块支持,安装方法:pip install coloradd程序运行需要很长时间,请耐心等待。可以把窗口最小…

智能Ai语音机器人的应用价值有哪些?

随着时间的推移&#xff0c;人工智能的发展越来越成熟&#xff0c;智能时代也离人们越来越近&#xff0c;近几年人工智能越来越火爆&#xff0c;人工智能的应用已经开始渗透到各行各业&#xff0c;与生活交融&#xff0c;成为人们无法拒绝&#xff0c;无法失去的一个重要存在。…

医疗大数据安全与隐私保护:数据分类分级的基石作用

医疗行业在数字化转型中迅猛发展&#xff0c;医疗大数据作为核心驱动力&#xff0c;深刻改变医疗服务的模式与效率。它不仅促进医疗信息的流通与共享&#xff0c;推动个性化、精准化的医疗服务新生态。同时&#xff0c;也在提升医疗服务质量、优化医疗资源配置等方面展现巨大潜…

Spring Ioc底层原理代码详细解释

文章目录 概要根据需求编写XML文件&#xff0c;配置需要创建的bean编写程序读取XML文件&#xff0c;获取bean相关信息&#xff0c;类&#xff0c;属性&#xff0c;id前提知识点Dom4j根据第二步获取到的信息&#xff0c;结合反射机制动态创建对象&#xff0c;同时完成属性赋值将…

蓝桥杯【物联网】零基础到国奖之路:十二. TIM

蓝桥杯【物联网】零基础到国奖之路:十二. TIM 第一节 理论知识第二节 cubemx配置 第一节 理论知识 STM32L071xx器件包括4个通用定时器、1个低功耗定时器&#xff08;LPTIM&#xff09;、2个基本定时器、2个看门狗定时器和SysTick定时器。 通用定时器&#xff08;TIM2、TIM3、…

Spring Cloud Alibaba-(6)Spring Cloud Gateway【网关】

Spring Cloud Alibaba-&#xff08;1&#xff09;搭建项目环境 Spring Cloud Alibaba-&#xff08;2&#xff09;Nacos【服务注册与发现、配置管理】 Spring Cloud Alibaba-&#xff08;3&#xff09;OpenFeign【服务调用】 Spring Cloud Alibaba-&#xff08;4&#xff09;Sen…

数据分析工具julius ai如何使用

什么是julius ai Julius AI 是一款强大的ai数据分析工具。用户可以使用excel、数据库、文本文件等多种格式的数据&#xff0c;Julius AI 会自动分析这些数据并提供详细的解释和可视化图表。官网显示它目前已经有三十万用户。它也支持手机版。 虽然openai也支持生成图表&#xf…

asp.net core grpc快速入门

环境 .net 8 vs2022 创建 gRPC 服务器 一定要勾选Https 安装Nuget包 <PackageReference Include"Google.Protobuf" Version"3.28.2" /> <PackageReference Include"Grpc.AspNetCore" Version"2.66.0" /> <PackageR…

项目实战:k8s部署考试系统

一、新建nfs服务器&#xff08;192.168.1.44&#xff09; 1.基础配置&#xff08;IP地址防火墙等&#xff09; 2.配置时间同步 [rootlocalhost ~]# yum -y install ntpdate.x86_64 [rootlocalhost ~]# ntpdate time2.aliyun.com 27 Sep 10:28:08 ntpdate[1634]: adjust tim…

MySql在更新操作时引入“两阶段提交”的必要性

日志模块有两个redo log和binlog&#xff0c;redo log 是引擎层的日志&#xff08;负责存储相关的事&#xff09;&#xff0c;binlog是在Server层&#xff0c;主要做MySQL共嗯那个层面的事情。redo log就像一个缓冲区&#xff0c;可以让当更新操作的时候先放redo log中&#xf…

node.js npm 安装和安装create-next-app -windowsserver12

1、官网下载windows版本NODE.JS https://nodejs.org/dist/v20.17.0/node-v20.17.0-x64.msi 2、安装后增加两个文件夹目录node_global、node_cache npm config set prefix "C:\Program Files\nodejs\node_global" npm config set prefix "C:\Program Files\nod…

基于SpringBoot的新冠检测信息管理系统的设计与实现

文未可获取一份本项目的java源码和数据库参考。 国内外在该方向的研究现状及分析 新型冠状病毒肺炎疫情发生以来&#xff0c;中国政府采取积极的防控策略和措施&#xff0c;经过两个多月的不懈努力&#xff0c;有效控制了新发病例的増长&#xff0c;本地传播已经趋于完全控制…