Spring-Kafka 3.0 消费者消费失败处理方案

一、背景

我们作为Kafka在使用Kafka是,必然考虑消息消费失败的重试次数,重试后仍然失败如何处理,要么阻塞,要么丢弃,或者保存

二、设置消费失败重试次数

1 默认重试次数在哪里看

Kafka3.0 版本默认失败重试次数为10次,准确讲应该是1次正常调用+9次重试,这个在这个类可以看到 org.springframework.kafka.listener.SeekUtils

2 如何修改重试次数

据我的实验,spring-kafka3.0版本通过application.yml 配置是行不通的,也没有找到任何一项配置可以改重试次数的(网上很多说的通过配置spring.kafka.consumer.retries 可以配置,我尝试过了,至少3.0版本是不行的,如果有人成功试过可以通过application.yml 配置消费者的消费的重试次数可以留言通知我,谢谢)

经过我不懈努力和尝试,只能通过Java代码配置的方式才可以,并且这种方式相对于application.yml配置更加灵活细致,上代码

    public CommonErrorHandler commonErrorHandler() {
        BackOff backOff = new FixedBackOff(5000L, 3L);
        return  new DefaultErrorHandler(backOff);
    }

然后把这个handler 添加到ConcurrentKafkaListenerContainerFactory中就行了

三、设置消费失败处理方式

1 保存到数据库重试

我们需要在创建DefaultErrorHandler类时加入一个ConsumerAwareRecordRecoverer参数就可以了,这样在重试3次后仍然失败就会保存到数据库中,注意这里save to db成功之后,我认为没有必要执行consumer.commitSync方法,首先这个consumer.commitSync这个方法默认是提交当前批次的最大的offset(可能会导致丢失消息),其次不提交Kafka的消费者仍然回去消费后面的消息,只要后面的消息,消费成功了,那么依然会提交offset,覆盖了这个offset

    public CommonErrorHandler commonErrorHandler() {
        // 创建 FixedBackOff 对象
        BackOff backOff = new FixedBackOff(5000L, 3L);
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
            log.info("save to db " + record.value().toString());
        }, backOff);
        return defaultErrorHandler;
    }

如果你硬要提交也可以试试下面这种,指定提交当前的offset

    public CommonErrorHandler commonErrorHandler() {
        // 创建 FixedBackOff 对象
        BackOff backOff = new FixedBackOff(5000L, 3L);
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
            log.info("save to db " + record.value().toString());
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()));
            consumer.commitSync(offsets);
        }, backOff);
        return defaultErrorHandler;
    }

2 发送到Kafka死信队列

仍然在创建DefaultErrorHandler类时加入一个DeadLetterPublishingRecoverer 类就行了,默认会把消息发到kafkaTemplate 配置的topic名字为your_topic+.DLT

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
   

 public CommonErrorHandler commonErrorHandler() {
        // 创建 FixedBackOff 对象
        BackOff backOff = new FixedBackOff(5000L, 3L);

        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);

        return defaultErrorHandler;
    }
ConsumerRecordRecoverer 接口总共就这2种实现方式

四、整体消费者代码粘贴

1 application.yml

kafka-consumer:
  bootstrapServers: 192.168.31.114:9092
  groupId: goods-center
  #后台的心跳线程必须在30秒之内提交心跳,否则会reBalance
  sessionTimeOut: 30000
  autoOffsetReset: latest
  #取消自动提交,即便如此 spring会帮助我们自动提交
  enableAutoCommit: false
  #自动提交间隔
  autoCommitInterval: 1000
  #拉取的最小字节
  fetchMinSize: 1
  #拉去最小字节的最大等待时间
  fetchMaxWait: 500
  maxPollRecords: 50
  #300秒的提交间隔,如果程序大于300秒提交,会报错
  maxPollInterval: 300000
  #心跳间隔
  heartbeatInterval: 10000
  keyDeserializer: org.apache.kafka.common.serialization.LongDeserializer
  valueDeserializer: org.springframework.kafka.support.serializer.JsonDeserializer

2 KafkaListenerProperties

package com.ychen.goodscenter.fafka;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
//指定配置文件的前缀
@ConfigurationProperties(prefix = "kafka-consumer")
@Getter
@Setter
public class KafkaListenerProperties {
 
    private String groupId;
 
    private String sessionTimeOut;
 
    private String bootstrapServers;
 
    private String autoOffsetReset;
 
    private boolean enableAutoCommit;
 
    private String autoCommitInterval;
 
    private String fetchMinSize;
 
    private String fetchMaxWait;
 
    private String maxPollRecords;
 
    private String maxPollInterval;
 
    private String heartbeatInterval;
 
    private String keyDeserializer;
 
    private String valueDeserializer;
 
}

3 KafkaConsumerConfig

package com.ychen.goodscenter.fafka;

import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.*;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableConfigurationProperties(KafkaListenerProperties.class)
@Slf4j
public class KafkaConsumerConfig {
    @Autowired
    private KafkaListenerProperties kafkaListenerProperties;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 并发数 多个微服务实例会均分
        factory.setConcurrency(2);
//        factory.setBatchListener(true);
        factory.setCommonErrorHandler(commonErrorHandler());

        ContainerProperties containerProperties = factory.getContainerProperties();
        // 是否设置手动提交
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }


    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerConfigs = consumerConfigs();
        log.info("消费者的配置信息:{}", JSONObject.toJSONString(consumerConfigs));
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }


    public CommonErrorHandler commonErrorHandler() {
        // 创建 FixedBackOff 对象
        BackOff backOff = new FixedBackOff(5000L, 3L);

        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff);
//        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> {
//            log.info("save to db " + record.value().toString());
//            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
//            offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()));
//            consumer.commitSync(offsets);
//        }, backOff);
        return defaultErrorHandler;
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // 服务器地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrapServers());
        // 是否自动提交
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaListenerProperties.isEnableAutoCommit());
        // 自动提交间隔
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getAutoCommitInterval());

        //会话时间
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaListenerProperties.getSessionTimeOut());
        //key序列化
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getKeyDeserializer());
        //value序列化
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getValueDeserializer());
        // 心跳时间
        propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getHeartbeatInterval());

        // 分组id
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroupId());
        //消费策略
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaListenerProperties.getAutoOffsetReset());
        // poll记录数
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaListenerProperties.getMaxPollRecords());
        //poll时间
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaListenerProperties.getMaxPollInterval());

        propsMap.put("spring.json.trusted.packages", "com.ychen.**");

        return propsMap;
    }


}

4 MessageListener

package com.ychen.goodscenter.fafka;

import com.ychen.goodscenter.service.OrderService;
import com.ychen.goodscenter.vo.req.SubmitOrderReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {
    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = "order-message-topic", containerFactory = "kafkaListenerContainerFactory")
    public void processMessage(ConsumerRecord<Long, SubmitOrderReq> record, Acknowledgment acknowledgment) {
        log.info("order-message-topic message Listener, Thread ID: " + Thread.currentThread().getId());

        try {
            log.info("order-message-topic message received, orderId: {}", record.value().getOrderId());

            orderService.submitOrder(record.value());
            // 同步提交
            acknowledgment.acknowledge();
            log.info("order-message-topic message acked: orderId: {}", record.value().getOrderId());
        } catch (DuplicateKeyException dupe) {
            // 处理异常情况
            log.error("order-message-topic message error DuplicateKeyException", dupe);
            // 重复数据,忽略掉,同步提交
            acknowledgment.acknowledge();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/348487.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

27.移除元素(力扣LeetCode)

文章目录 27.移除元素&#xff08;力扣LeetCode&#xff09;题目描述方法一&#xff1a;vector成员函数&#xff1a;erase方法二&#xff1a;暴力解法方法三&#xff1a;双指针法 27.移除元素&#xff08;力扣LeetCode&#xff09; 题目描述 给你一个数组 nums 和一个值 val&…

【开源】基于JAVA语言的班级考勤管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 系统基础支持模块2.2 班级学生教师支持模块2.3 考勤签到管理2.4 学生请假管理 三、系统设计3.1 功能设计3.1.1 系统基础支持模块3.1.2 班级学生教师档案模块3.1.3 考勤签到管理模块3.1.4 学生请假管理模块 3.2 数据库设…

python Django入门

1.创建Django项目 方式一:进入到指定要存放项目的目录&#xff0c;执行*django-admin startproject “projectname”* 来创建一个名方式二:使用Pycharm专业版创建Django项目 创建项目后&#xff0c;默认的目录结构: manage.py:是Django用于管理本项目的命令行工具&#xff0c…

前出深入-机器学习

文章目录 一、K近邻算法1.1 先画一个散列图1.2 使用K最近算法建模拟合数据1.3 进行预测1.4 K最近邻算法处理多元分类问题1.5 K最近邻算法用于回归分析1.6 K最近邻算法项目实战-酒的分类1.6.1 对数据进行分析1.6.2 生成训练数据集和测试数据集1.6.3 使用K最近邻算法对数据进行建…

如何在Shopee平台上进行选品 :充分利用渠道获取灵感和数据支持

在Shopee平台上进行选品是一个关键的决策过程&#xff0c;它直接影响到卖家的销售业绩和店铺的发展。为了帮助卖家更好地进行选品&#xff0c;Shopee提供了多种渠道来获取灵感和数据支持。下面将介绍一些主要的选品渠道以及如何利用它们来进行选品。 先给大家推荐一款shopee知…

【DDD】学习笔记-深入分析软件的复杂度

软件复杂度的成因 Eric Evans 的经典著作《领域驱动设计》的副标题为“软件核心复杂性应对之道”&#xff0c;这说明了 Eric 对领域驱动设计的定位就是应对软件开发的复杂度。Eric 甚至认为&#xff1a;“领域驱动设计只有应用在大型项目上才能产生最大的收益”。他通过 Smart…

C#,数据检索算法之线性检索(Linear Search)的源代码

数据检索算法是指从数据集合&#xff08;数组、表、哈希表等&#xff09;中检索指定的数据项。 数据检索算法是所有算法的基础算法之一。 线性&#xff1f;听起来就“高大上”&#xff0c;其实&#xff0c;只不过就是挨个比较呗。 本文发布&#xff08;听起来很正式 &#x…

TarGAN:多模态医学图像转换GAN

TarGAN 核心思想网络结构 核心思想 论文&#xff1a;https://arxiv.org/abs/2105.08993 代码&#xff1a;https://github.com/2165998/TarGAN 解决的问题&#xff1a;传统多模态医学图像转换通常&#xff0c;在生成高质量图像方面存在问题&#xff0c;特别是在关键目标区域或…

C语言中计算结构体的大小

一. 使用sizeof 计算结构体的大小 通常情况下&#xff0c;我们习惯于使用 sizeof 运算符来计算结构体的大小。 例如&#xff0c;下面是一个结构体的定义&#xff1a; struct Student {int id;char name[20];int age;float score; }; 其中&#xff0c;Student是该结构体的类…

IP被封怎么办?如何绕过IP禁令?

相信很多人遇到过IP禁令&#xff1a;比如你在访问社交媒体、搜索引擎或电子商务网站时会被限制访问&#xff0c;又或者你的的账号莫名被封&#xff0c;这些由于网络上的种种限制我们经常会遭遇IP被封的情况&#xff0c;导致无法使用继续进行网络行动。在本文中&#xff0c;我们…

Django笔记(七):JWT认证

首 前后端分离的项目更多使用JWT认证——Json Web Token。本文记录djangorestframework-simplejwt的使用方式。文档 安装 pip install djangorestframework-simplejwt 配置settings.py: INSTALLED_APPS [rest_framework_simplejwt, ]REST_FRAMEWORK {DEFAULT_AUTHENTICA…

ARM 400系列控制器IP简介

1. GIC-400 GIC-400是一个高性能、区域优化的中断控制器&#xff0c;具有高级微控制器总线架构&#xff08;AMBA&#xff09;高级可扩展接口&#xff08;AXI&#xff09;接口。它在片上系统&#xff08;SoC&#xff09;配置中检测、管理和分配中断。你可以对GIC-400进行配置&am…

shell脚本基础之循环语句

目录 一、循环语句的概念 二、for循环语句 1、列表循环 2、列表for循环案例大全 案例一 案例二 案例三 案例四 案例五 案例六 案例七 案例八 3、不带列表循环 4、类似C语言风格的for循环 5、for循环总结 三、while循环语句 1、while循环语句格式 2、while死循…

概念性——数据库简介

前些天发现了一个人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;最重要的屌图甚多&#xff0c;忍不住分享一下给大家。点击跳转到网站。 概念性——数据库简介 介绍 数据对于当今许多应用程序和网站的运行至关重要。对热门视频的评论、多人游戏中分…

精酿啤酒的原料供应链:质量控制的重要性

对于啤酒的品质和口感&#xff0c;原料供应链的质量控制是重要的。特别是对于Fendi Club这样品质的啤酒&#xff0c;其原料供应链的管理更是重中之重。下面&#xff0c;我们将深入探讨Fendi Club啤酒如何对其原料供应链进行质量控制&#xff0c;以确保啤酒的品质和口感。 首先&…

专业144总分410+华南理工大学811信号与系统考研经验华工电子信息与通信

今年专业811信号与系统144&#xff08;二战&#xff0c;感谢信息通信Jenny老师专业课对我的巨大提高&#xff0c;第一年自己复习只考了90&#xff0c;主要栽专业课和数学&#xff09;总分410含泪&#xff08;二战的同学都知道苦&#xff0c;成功来之不易&#xff09;考上华南理…

【IC设计】Vivado单口RAM的使用和时序分析

文章目录 创建单口RAM IPIP Catalog中选择单口RAM IPBasicPort A OptionsOther Options 仿真找到IP例化原语编写Testbench 波形分析 创建单口RAM IP IP Catalog中选择单口RAM IP Basic Port A Options Other Options 仿真 找到IP例化原语 IP Sources-Instantiation Template…

深度了解TCP/IP模型

网络通信是现代社会不可或缺的一部分&#xff0c;而TCP/IP模型作为网络通信的基石&#xff0c;扮演着至关重要的角色。本文将深入探讨TCP/IP模型的概念、结构及其在网络通信中的作用&#xff0c;为读者提供全面的了解。 一.TCP/IP模型简介 TCP/IP模型是一个网络通信协议体系&a…

Android Settings 按住电源按钮

如题&#xff0c;Android 原生 Settings 里有个 按住电源按钮 的选项&#xff0c;可以设置按住电源按钮的操作。 按住电源按钮 两个选项的 UI 是分离的&#xff0c; 电源菜单 代码在 packages/apps/Settings/src/com/android/settings/gestures/LongPressPowerForPowerMen…

使用WebDriver采样器将JMeter与Selenium集成

第一步&#xff1a; 在JMeter中添加Selenium / WebDriver插件 第二步&#xff1a; 创建一条测试计划–添加线程组 添加配置元素 - jpgc - WebDriver Sampler 添加配置元素 - jpgc - Chrome Driver Config 并且添加监听器查看结果树 第三步&#xff1a; 下载 chromedriver…