SpringBoot 3.1.7 集成Kafka 3.5.0

一、背景

写这边篇文章的目的,是记录我在集成kafka客户端遇到的一些问题,文章会记录整个接入的过程,其中会遇到几个坑,如果需要最终版本,直接看最后一节就行了,感觉Spring-Kafka的文档太少了,如果采用SpringBoot集成的方式接入,一不小可能就会踩坑

二、操作步骤

1 添加依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2 添加配置文件

spring:
  profiles:
    active: dev
  application:
    name: goods-center
  kafka:
    bootstrap-servers: 192.168.31.114:9092
    producer:
      acks: all
      timeout.ms: 5000
      # 值序列化:使用Json
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      enable:
        idempotence: true # 默认为True
    consumer:
      group-id: goods-center
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      enable-auto-commit: false # 取消自动提交

3 生产者代码

package com.ychen.goodscenter.fafka;


import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private KafkaTemplate<Long, SubmitOrderReq> kafkaTemplate;

    public void sendOrderMessage(SubmitOrderReq msg) {
        kafkaTemplate.send(TopicConstants.ORDER_MESSAGE_TOPIC, msg.getOrderId(), msg);
        logger.info("order-message-topic message sent, orderId: {}", msg.getOrderId());
    }
}

4 消费者代码

package com.ychen.goodscenter.fafka;

import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-message-topic")
    public void processMessage(Consumer<Long, SubmitOrderReq> consumer, SubmitOrderReq submitOrderReq) {
        try {
            logger.info("order-message-topic message received, orderId: {}", submitOrderReq.getOrderId());
            orderService.submitOrder(submitOrderReq);
            // 同步提交
            consumer.commitSync();
            logger.info("order-message-topic message acked: orderId: {}", submitOrderReq.getOrderId());
        } catch (DuplicateKeyException dupe) {
            // 处理异常情况
            logger.error("order-message-topic message processMessage data DuplicateKeyException", dupe);
            // 重复数据,忽略掉,同步提交
            consumer.commitSync();
        } catch (Exception e) {
            // 处理异常情况
            logger.error("order-message-topic message processMessage error", e);
        }
    }
}

三、开始踩坑了

1 添加信任自己包

Caused by: java.lang.IllegalArgumentException: The class 'com.ychen.goodscenter.vo.req.SubmitOrderReq' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429)
	... 15 common frames omitted

原因: 因为我在消费消息时用了json序列化,需要给这个序列化,添加信任自己包,不加json序列号会报错

解决方法:添加配置

spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: "com.ychen.**"  

解决途径:百度

2 consumer.commitSync(); 无效

问题发现:当我正在批量消费消息时,强制重启应用进程,发现有部分消息丢失了,没有处理

我发了5000个样本请求,最后只生成了4912 个订单(中途强制重启了2次)

问题分析:有2中可能

第一种:之前配置的enable-auto-commit: false  是无效的。

第二种: consumer.commitSync(); 一次将批量拉取的offset提交了

问题排查:

通过在 consumer.commitSync(); 代码之前和之后分别打一个断点,然后发送一批数据

consumer.commitSync(); 之前:

consumer.commitSync(); 之后

结果发生了突变,说明是consumer.commitSync();执行之后引发的offset突变

翻阅源码:

总体而言,通过官方文档和源代码,我们可以确定 commitSync() 提交的是已经成功拉取到的消息的最大 offset,而不是当前正在处理的消息的 offset。

3 缺少AckMode 配置

既然consumer.commitSync();无法在批量处理消息的环境保证消息不丢失,那么需要寻找新的解决方案:

在org.springframework.kafka.annotation.KafkaListener 类的注释上面有写到可以使用org.springframework.kafka.support.Acknowledgment

然后我们消费者的代码改造后为:

package com.ychen.goodscenter.fafka;

import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-message-topic")
    public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
        try {
            logger.info("order-message-topic message received, orderId: {}", record.value().getOrderId());
            orderService.submitOrder(record.value());
            // 同步提交
            acknowledgment.acknowledge();
            logger.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
        } catch (DuplicateKeyException dupe) {
            // 处理异常情况
            logger.error("order-message-topic message processMessage data DuplicateKeyException", dupe);
            // 重复数据,忽略掉,同步提交
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 处理异常情况
            logger.error("order-message-topic message processMessage error", e);
        }
    }
}
Caused by: java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.

这里我也有点不懂了,明明已经配置自动提交了,还需要配置 ack-mode: MANUAL,既然他说要那就配置吧

在application.yml 增加配置

spring:
  kafka:
    listener:
      ack-mode: MANUAL

现在准备2000个样本,然后让消费者实例强制重启2次,看看数据库的订单数量是否为2000条

现在正确了,支持系统宕机仍然不丢失消息了

四、最终的配置文件和消费者代码

1 配置文件

spring:
  profiles:
    active: dev
  application:
    name: goods-center
  kafka:
    bootstrap-servers: 192.168.31.114:9092
    listener:
      ack-mode: MANUAL
    producer:
      acks: all
      timeout.ms: 5000
      # 值序列化:使用Json
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      enable:
        idempotence: true # 默认为True
    consumer:
      properties:
        spring.json.trusted.packages: "com.ychen.**" # 信任自己包,不加json序列号会报错
      group-id: goods-center
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      enable-auto-commit: false # 取消自动提交

2 消费者代码

package com.ychen.goodscenter.fafka;

import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-message-topic")
    public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
        try {
            logger.info("order-message-topic message received, orderId: {}", record.value().getOrderId());
            orderService.submitOrder(record.value());
            // 同步提交
            acknowledgment.acknowledge();
            logger.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
        } catch (DuplicateKeyException dupe) {
            // 处理异常情况
            logger.error("order-message-topic message error DuplicateKeyException", dupe);
            // 重复数据,忽略掉,同步提交
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 处理异常情况
            logger.error("order-message-topic message error unknown ", e);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/346162.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

linux更新内核

内核介绍 官网链接:https://kernel.org 内核下载库: https://mirrors.edge.kernel.org/pub/linux/kernel/ 更新软件源 rootcary:~# apt-get update rootcary:~# sudo apt-get install libncurses5-dev build-essential kernel-package flex bison libelf-dev libssl-dev 下…

大数据学习之Redis、从零基础到入门(一)

目录 一、Redis入门概述 1. 是什么&#xff1f; 官方解释&#xff1a; 2. 能干嘛&#xff1f; 2.1 主流功能与应用 2.1.1分布式缓存 2.1.2内存存储和持久化(RDBAOF) 2.1.3高可用架构搭建 2.1.4缓存穿透、击穿、雪崩 2.1.5分布式锁 2.1.6队列 2.2 总体功能概括 2.3…

基于Java SSM框架实现在线考试系统项目【项目源码+论文说明】

基于java的SSM框架实现在线考试系统演示 摘要 21世纪的今天&#xff0c;随着社会的不断发展与进步&#xff0c;人们对于信息科学化的认识&#xff0c;已由低层次向高层次发展&#xff0c;由原来的感性认识向理性认识提高&#xff0c;管理工作的重要性已逐渐被人们所认识&#…

面试知识点:notify是随机唤醒线程吗(唤醒线程顺序)?

做 Java 开发的小伙伴&#xff0c;对 wait 方法和 notify 方法应该都比较熟悉&#xff0c;这两个方法在线程通讯中使用的频率非常高&#xff0c;但对于 notify 方法的唤醒顺序&#xff0c;有很多小伙伴的理解都是错误的&#xff0c;有很多人会认为 notify 是随机唤醒的&#xf…

Vue实现图片预览,侧边栏懒加载,不用任何插件,简单好用

实现样式 需求 实现PDF上传预览&#xff0c;并且不能下载 第一次实现&#xff1a;用vue-pdf&#xff0c;将上传的文件用base64传给前端展示 问题&#xff1a; 水印第一次加载有后面又没有了。当上传大的pdf文件后&#xff0c;前端获取和渲染又长又慢&#xff0c;甚至不能用 修…

2. figure 常见属性

2. figure 常见属性 一 figsize二 dpi三 facecolor四 edgecolor五 frameon 数据可视化是数据分析中不可或缺的一环&#xff0c;而Matplotlib作为Python中最流行的绘图库之一&#xff0c;扮演着重要的角色。在Matplotlib中&#xff0c;matplotlib.figure.Figure对象是构建图形的…

MyBatis详解(2)-- mybatis配置文件

MyBatis详解&#xff08;2&#xff09; mybatis配置文件 mybatis配置文件 1.构建SqlSessionFactory的依据。 2.MyBatis最为核心的内容&#xff0c;对MyBatis的使用影响很大。 3.配置文件的层次顺序不能颠倒&#xff0c;一旦颠倒会出现异常。 < c o n f i g u r a t i o n…

openresty 安装, nginx与 openresty

openresty VS nginx Nginx 是一款高性能的 Web 服务器和反向代理服务器&#xff0c;具备基础的功能如HTTP服务、负载均衡、反向代理以及动静分离等。它是许多互联网应用的核心组件&#xff0c;因其模块化和可扩展的设计而受到欢迎。1 OpenResty 是基于 Nginx 的 Web 平台&…

C++入门篇章1(C++是如何解决C语言不能解决的问题的)

目录 1.C关键字(以C98为例)2.命名空间2.1 命名空间定义2.2命名空间使用 3.C输入&输出4.缺省参数4.1缺省参数概念4.2 缺省参数分类 5. 函数重载5.1函数重载概念5.2 C支持函数重载的原理--名字修饰(name Mangling) 1.C关键字(以C98为例) C总计63个关键字&#xff0c;C语言32…

【操作系统基础】【CPU访存原理】:寄存 缓存 内存 外存、内存空间分区、虚拟地址转换、虚拟地址的映射

存储器怎么存储数据、内存空间分区、虚拟地址转换 计算机的存储器&#xff1a;寄存 缓存 内存 外存&#xff08;按功能划分&#xff09; 计算机的处理器需要一个存储器来存储大量的指令和数据以便自己不断取指执行和访问数据。 内存&#xff08;内存就是运行内存&#xff0c…

利用git上传本地文件

1、建立仓库 2.然后刷新网站&#xff0c;获取下载链接&#xff0c;备用。 3、接下来在本地创建一个文件夹&#xff0c; 4、把github上面的仓库克隆到本地 git clone https://github.com/xxxxx&#xff08;https://github.com/xxxxx替换成你之前复制的地址&#xff09; 5、把…

简单快速取消AlertDialog的白色背景框,AlertDialog设置圆角背景

问题描述&#xff1a; 产品需求弹出的提示框是圆角&#xff0c;使用shape 设置圆角背景后&#xff0c;弹出的AlertDialog提示框四个角有白色的背景&#xff0c;据分析这个背景是 AlertDialog 父组件的背景色。 解决方法&#xff1a; 将Dialog的背景设置为透明色&#xff0c;代…

生产力工具|卸载并重装Anaconda3

一、Anaconda3卸载 &#xff08;一&#xff09;官方方案一&#xff08;Uninstall-Anaconda3-不能删除配置文件&#xff09; 官方推荐的方案是两种&#xff0c;一种是直接在Anaconda的安装路径下&#xff0c;双击&#xff1a; &#xff08;可以在搜索栏或者使用everything里面搜…

两数之和[中等]

一、题目 给你一个下标从1开始的整数数组numbers&#xff0c;该数组已按非递减顺序排列&#xff0c;请你从数组中找出满足相加之和等于目标数target的两个数。如果设这两个数分别是numbers[index1]和numbers[index2]&#xff0c;则1 < index1 < index2 < numbers.len…

Oracle报错:ORA-12541:TNS:无监听程序 (很大概率是listener.log满了,4G就无法写入了)

目录标题 一、前提二、查看listener.log三、如果是listener.log满了&#xff0c;内存达到4G,可以使用以下方法解决。&#xff08;一&#xff09;停用服务&#xff08;二&#xff09;将满了的listener.log日志删除或者改名&#xff0c;然后新建一个一样的listener.log文件&#…

助力工业生产质检,基于YOLOv7【tiny/l/x】不同系列参数模型开发构建生产制造场景下布匹瑕疵缺陷检测识别分析系统

纯粹的工业制造没有办法有长久的发展过程&#xff0c;转制造为全流程全场景的生产智造才是未来最具竞争力的生产场景&#xff0c;在前面的开发实践中我们已经涉足工业生产场景下进行了很多实地的项目开发&#xff0c;如&#xff1a;PCB电路板缺陷检测、焊接缺陷检测、螺母螺钉缺…

面试题-【消息队列】

消息队列 问题1 如何进行消息队列的技术选型优点解耦 &#xff08;pub/sub模型&#xff09;异步&#xff08;异步接口性能优化&#xff09;削峰 使用消息队列的缺点几种消息队列的特性 问题2 引入消息队列之后该如何保证其高可用性RabbitMQ的高可用kafka高可用 问题3 在消息队列…

07 队列

目录 1.队列 2.实现 3.OJ题 1. 队列 只允许在一段进行插入数据操作&#xff0c;在另一端进行数据删除操作的特殊线性表&#xff0c;队列具有先进先出FIFO&#xff08;First In Firtst Out&#xff09;&#xff0c;插入操作的叫队尾&#xff0c;删除操作的叫队头 2. 实现 队列…

前端echarts图形报表常见的样式配置

文章目录 &#x1f412;个人主页&#x1f3c5;Vue项目常用组件模板仓库&#x1f4d6;前言&#xff1a;&#x1f415;1.深色主题&#x1f415;2.改变柱状图颜色&#x1f415;突然发现去问ai&#xff0c;更容易理解&#xff0c;那就不总结了 &#x1f412;个人主页 &#x1f3c5;…

太阳光模拟器汽车耐老化太阳跟踪聚光户外加速老化试验

1 范围 1.1 本标准适用于以太阳为光源的菲涅耳反射系统来进行汽车外饰材料的加速老化试验。 1.2 本标准规定的设备和方法可用于确定曝露于日光、热和潮湿下的汽车材料的相对耐老化性&#xff0c; 前提是假设试验期间发生的对材料加速老化速率起决定性作用的物理、化学变化机理…