kafka-消费者-消费异常处理(SpringBoot整合Kafka)

文章目录

  • 1、消费异常处理
    • 1.1、application.yml配置
    • 1.2、注册异常处理器
    • 1.3、消费者使用异常处理器
    • 1.4、创建生产者发送消息
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:
      • 1.8.1、第一次启动SpringKafkaConsumerApplication
      • 1.8.n、第n次启动SpringKafkaConsumerApplication

1、消费异常处理

1.1、application.yml配置

server:
  port: 8120

# v1
spring:
  Kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    consumer:
      # read-committed读事务已提交的消息 解决脏读问题
      isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
      # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
      # 调用ack方法时才会提交ack给kafka
#      enable-auto-commit: false
      # 消费者提交ack时多长时间批量提交一次
      auto-commit-interval: 1000
      # 消费者第一次消费主题消息时从哪个位置开始
      # earliest:从最早的消息开始消费
      # latest:第一次从LEO位置开始消费
      # none:如果主题分区没有偏移量,则抛出异常
      auto-offset-reset: earliest  #指定Offset消费:earliest | latest | none
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 手动ack:manual手动ack时 如果有异常会尝试一直消费
#      ack-mode: manual
      # 手动ack:消费有异常时停止
      ack-mode: manual_immediate

在这里插入图片描述

1.2、注册异常处理器

package com.atguigu.spring.kafka.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
@Configuration
public class MyKafkaConfig {
    @Bean
    public NewTopic springTestPartitionTopic() {
        return TopicBuilder.name("my_topic1") //主题名称
                .partitions(3) //分区数量
                .replicas(3) //副本数量
                .build();
    }

    //方法名就是注入到容器中对象的名称
    @Bean
    public ConsumerAwareListenerErrorHandler myErrorHandler() {
        //创建异常处理器:消费者异常时 且注册使用当前异常处理器 会生效
        return new ConsumerAwareListenerErrorHandler() {

            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
                System.out.println("出现异常,消息内容:value = " + message.getPayload());
                System.out.println("header = "+message.getHeaders());
                System.out.println("异常信息:" + e.getMessage());
                System.out.println("=================");
                return null;
            }
        };
    }
}

1.3、消费者使用异常处理器

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListenerAck {
    /**
     * 自动ack可能会导致漏消息
     *      spring-kafka:
     *                     自动ack 如果有异常,会死循环获取消息重新消费
     *                        不能继续向后消费消息,会导致消息积压
     *
     *                    手动ack 配置了手动ack,且ack-mode为manual_immediate时,
     *                        如果消息消费失败,会继续向后消费
     * @param record
     */
    @KafkaListener(
            topicPartitions = {
                    @TopicPartition(topic = "my_topic1",partitions = {"0"})}
            , groupId = "my_group1",errorHandler = "myErrorHandler")
    public void onMessage1(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        System.out.println("my_group1消费者获取分区0的消息:topic = "+ record.topic()
                +",partition:"+record.partition()
                +",offset = "+record.offset()
                +",key = "+record.key()
                +",value = "+record.value());
        int i = 1/0;
        // 手动ack:手动确认消息已经消费 broker 会提交消费者偏移量
        acknowledgment.acknowledge();
    }
}

在这里插入图片描述

1.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;

import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class SpringKafkaConsumerApplicationTests {
    @Resource
    KafkaTemplate kafkaTemplate;

    @Test
    void contextLoads() {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("my_topic1",i%3,"", "指定ack-mode: manual_immediate消费"+i);
        }
    }

}

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaConsumerApplication.class, args);
    }

}

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      
    <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug -->
    <logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu</groupId>
    <artifactId>spring-kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-kafka-consumer</name>
    <description>spring-kafka-consumer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.8、消费者控制台:

1.8.1、第一次启动SpringKafkaConsumerApplication

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)

my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 指定ack-mode: manual_immediate消费0
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1717672170845, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费0)
header = {id=91c96206-2381-5fa1-b391-52627056762f, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 指定ack-mode: manual_immediate消费3
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费3)
header = {id=8755d954-615c-37ff-67f0-85521e090b03, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定ack-mode: manual_immediate消费6
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费6)
header = {id=68082673-07d2-6f69-3935-c7034a7caf81, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定ack-mode: manual_immediate消费9
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费9)
header = {id=bcbda053-9536-df91-d937-2688b5d4c6ea, timestamp=1717672488754}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================

1.8.n、第n次启动SpringKafkaConsumerApplication

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)

my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 指定ack-mode: manual_immediate消费0
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1717672170845, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费0)
header = {id=6bc9edb9-ad1c-e00e-9b27-c0c540248091, timestamp=1717672854508}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 指定ack-mode: manual_immediate消费3
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费3)
header = {id=14e6951b-b25f-10ca-702c-a1699d25645b, timestamp=1717672854508}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定ack-mode: manual_immediate消费6
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费6)
header = {id=26e85ef8-2502-fd29-8d5b-091ec0362900, timestamp=1717672854509}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定ack-mode: manual_immediate消费9
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费9)
header = {id=25c6b6c8-d02d-2091-7c6f-ebaea6c52d1d, timestamp=1717672854509}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================

此时如果不关闭SpringKafkaConsumerApplication,生产者继续发送消息,消费者只会往后消费,不会从头再次消费

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/684929.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【案例分享】明道数云为阿联酋迪拜公司Eastman BLDG打造外贸管理系统

内容概要 本文介绍了Eastman公司与明道数云软件的合作&#xff0c;通过数字化解决方案提升了Eastman在贸易管理中的效率。Eastman公司位于阿联酋迪拜&#xff0c;周边城市有门店&#xff0c;人数大概在30&#xff0c;是一家主营瓷砖和石材类产品的贸易公司&#xff0c;面临着各…

C#,JavaScript实现浮点数格式化自动保留合适的小数位数

目标 由于浮点数有漂移问题&#xff0c;转成字符串时 3.6 有可能得到 3.6000000000001&#xff0c;总之很长的一串&#xff0c;通常需要截取&#xff0c;但按照固定长度截取不一定能使用各种情况&#xff0c;如果能根据数值大小保留有效位数就好了。 C#实现 我们可以在基础库里…

linux实验报告

实验一&#xff1a;Linux操作系统的安装与配置 实验目的&#xff1a; 1.掌握虚拟机技术&#xff1b; 2.掌握Linux的安装步骤&#xff1b; 3.掌握安装过程中的基本配置要求。 4.掌握正确启动Linux的方法&#xff1b; 5.掌握正确退出Linux的方法&#xff1b; 6.熟悉已安装…

【Python】把xmind转换为指定格式txt文本

人工智能训练通常需要使用文本格式&#xff0c;xmind作为一种常规格式不好进行解析&#xff0c;那如何把xmind转换为txt格式呢&#xff1f; 软件信息 python python -v Python 3.9.13 (tags/v3.9.13:6de2ca5, May 17 2022, 16:36:42) [MSC v.1929 64 bit (AMD64)] on win32…

C#知识|通过ADO.NET实现应用程序对数据库的查询操作。

哈喽,你好啊,我是雷工! 前边学习了通过ADO.NET实现C#应用程序对数据库的增、删、改操作。 接下来接着学习查询操作,以下为学习笔记。 查询返回有两种类型,一种是单行单列的单一结果,一种是结果集,首先了解查询结果是单行单列结果的写法。 01 查询返回单一结果 以前方的…

Python编程学习第一篇——Python零基础快速入门(五)—元组(Tuple)操作

Python元组是一种不可变的有序集合&#xff0c;可以存储多个不同类型的数据。元组使用小括号来表示&#xff0c;其中的元素用逗号分隔开。与列表不同&#xff0c;元组的元素不能被修改、删除或添加。它的一些常规操作包括元组的创建、访问、添加、修改、删除、运算等等&#xf…

编译原理-程序设计语言的设计

变量 存储单元及它的名称由变量的概念来代替; 可以代表一个或一组单元,可以修改。 绑定 绑定:一个实体(或对象)与其某种属性建立起某种联系的 过程,称为绑定。 静态绑定:凡是在编译时能确定的属性,称为静态属性; 若绑定在编译时完成,运行时不改变,称为静态…

武汉科技大学,计算机考研全面改考408,24计算机专硕复试线仅298分!武汉科技大学计算机考研考情分析!

武汉科技大学&#xff08;Wuhan University of Science and Technology&#xff09;简称“武科大”&#xff0c;坐落于湖北省武汉市&#xff0c;是湖北省人民政府、教育部和六家国家特大型企业共建高校&#xff0c;是湖北省“双一流”建设重点高校&#xff0c;入选国家“中西部…

高考志愿填报自动更新数据小程序开源版开发

高考志愿填报自动更新数据小程序开源版开发 版本一&#xff1a;java开源域名授权&#xff08;自动更新数据&#xff09;&#xff1b;5999 自研RSCH算法&#xff0c;专业和测评综合推算 微信一键登录注册&#xff0c;免注册获取微信头像、昵称 支持院校和专业优先&#xff0c;…

28 - 只出现一次的最大数字(高频 SQL 50 题基础版)

28 - 只出现一次的最大数字 select (selectnumfromMyNumbers group bynum havingcount(num)1order by num desc limit 1) as num;

Doris insert into 插入语句执行成功,且select查询成功,返回结果不报错,但查不到该插入数据

问题&#xff1a;Doris insert into 正常执行成功&#xff0c;select 查询也执行成功&#xff0c;但查不到该写入数据 原因&#xff1a;由于有其他 insert commit 事务待提交且该任务处于锁的状态&#xff0c;导致不断在回滚&#xff0c;进而造成其他的insert into 语句也执行成…

广西容县“四力”并举提质效,正宇软件助力智慧人大建设

近日&#xff0c;广西容县在提升人大代表履职质效方面取得显著成效&#xff0c;通过汇聚“四力”——活力、动力、合力、能力&#xff0c;成功打造了一个高效、务实的人大代表履职环境。这一成果不仅为当地人大工作树立了新的标杆&#xff0c;也为广西乃至全国的人大代表履职模…

24-unittest简介

一、unittest简介 unittest是Python中常用的单元测试框架&#xff0c;与Java中的Junit单元测试框架类似。 二、示例程序 1&#xff09;导入unittest模块 import unittest 2&#xff09;使用help()函数查看源码中的示例程序 help(unittest) Simple usage:import unittestc…

数组和指针的联系(C语言)

数组和指针是两种不同的数据类型&#xff0c;数组是一种构造类型&#xff0c;用于存储一组相同类型的变量&#xff1b;而指针是一种特殊类型&#xff0c;专门用来存放数据的地址。数组名除了sizeof(数组名)和&数组名表示整个数组外&#xff0c;其他情况下都表示的是首元素的…

Visual Studio Code 开发esp8266流程2Arduino 配置 nodemcu

http://arduino.esp8266.com/stable/package_esp8266com_index.json http://arduino.esp8266.com/stable/package_esp8266com_index.json Arduino: Library Manager Arduino: Board

零拷贝技术

背景 磁盘可以说是计算机系统重最慢的硬件之一&#xff0c;读写速度相对内存10以上&#xff0c;所以针对优化磁盘的技术非常的多&#xff0c;比如&#xff1a;零拷贝、直接I/O、异步I/O等等&#xff0c;这些优化的目的就是为了提高系统的吞吐量&#xff0c;另外操作系统内核中的…

【Unity学习笔记】反射

文章目录 前言反射通过反射获取类型 Unity中的反射用反射在Unity中动态加载 前言 在我平时做项目的时候&#xff0c;由于我们做的项目都是很简单的&#xff0c;所以不怎么接触反射机制。最早了解反射机制是关于Invoke的时候&#xff0c;知道可以通过方法名来直接进行Invoke调用…

数字人动作解决方案,塑造逼真动作

在品牌形象塑造、市场推广及客户服务等领域&#xff0c;企业正面临着前所未有的挑战和机遇。为满足企业的需求&#xff0c;美摄科技凭借其在人工智能和计算机视觉领域的深厚积累&#xff0c;推出了面向企业的数字人动作解决方案&#xff0c;助力企业轻松打造逼真、灵活的虚拟形…

apple开发者账户证书删除与下载

1.打开并登陆: Sign In - Apple 选择证书 证书管理页面 证书详情 删除或者下载证书 下载证书 删除证书

【实战】kafka3.X kraft模式集群搭建

文章目录 前言kafka2.0与3.x对比准备工作JDK安装kafka安装服务器增加hosts 修改Kraft协议配置文件格式化存储目录 启动集群停止集群测试Kafka集群创建topic查看topic列表查看消息详情生产消息消费消息查看消费者组查看消费者组列表 前言 相信很多同学都用过Kafka2.0吧&#xf…