kafka-重试和死信主题(SpringBoot整合Kafka)

文章目录

  • 1、重试和死信主题
  • 2、死信队列
  • 3、代码演示
    • 3.1、appication.yml
    • 3.2、引入spring-kafka依赖
    • 3.3、创建SpringBoot启动类
    • 3.4、创建生产者发送消息
    • 3.5、创建消费者消费消息

1、重试和死信主题

kafka默认支持重试和死信主题

  1. 重试主题:当消费者消费消息异常时,手动ack如果有异常继续向后消费,为了保证消息尽可能成功消费,可以将消费异常的消息扔到重试主题,再通过重试主题的消费者尝试消费
  2. 死信主题:当一个消息消费多次仍然失败,可以将该消息存到死信主题。避免漏消息,以后可以人工介入手动解决消息消费失败的问题

2、死信队列

在Kafka中,DLT通常指的是 Dead Letter Topic(死信队列)

Dead Letter Topic(DLT)的定义与功能

  1. 背景:原生Kafka是不支持Retry Topic和DLT的,但Spring Kafka在客户端实现了这两个功能。
  2. 功能:当消息在Kafka中被消费时,如果消费逻辑抛出异常,并且重试策略(如默认重试10次后)仍无法成功处理该消息,Spring Kafka会将该消息发送到DLT中。
  3. 自定义处理:可以通过自定义SeekToCurrentErrorHandler来控制消费失败后的处理逻辑,例如添加重试间隔、设置重试次数等。如果在重试后消息仍然消费失败,Spring Kafka会将该消息发送到DLT。

DLT的使用与意义

  1. 错误处理:DLT为Kafka提供了一种机制来处理那些无法被成功消费的消息,从而避免了消息的丢失或阻塞。
  2. 监控与告警:通过对DLT的监控,可以及时发现并处理那些无法被成功消费的消息,从而保障Kafka系统的稳定性和可靠性。
  3. 二次处理:对于发送到DLT的消息,可以进行二次处理,如手动干预、修复数据等,以确保这些消息能够最终得到处理。

总之,在Kafka中,DLT是一个用于处理无法被成功消费的消息的特殊Topic,它提供了一种灵活且可靠的机制来保障Kafka系统的稳定性和可靠性。

3、代码演示

3.1、appication.yml

server:
  port: 8120
# v1
spring:
  Kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    consumer:
      # read-committed读事务已提交的消息 解决脏读问题
      isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
      # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
      # 调用ack方法时才会提交ack给kafka
#      enable-auto-commit: false
      # 消费者提交ack时多长时间批量提交一次
      auto-commit-interval: 1000
      # 消费者第一次消费主题消息时从哪个位置开始
      # earliest:从最早的消息开始消费
      # latest:第一次从LEO位置开始消费
      # none:如果主题分区没有偏移量,则抛出异常
      auto-offset-reset: earliest  #指定Offset消费:earliest | latest | none
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # json反序列化器
#      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
    listener:
      # 手动ack:manual手动ack时 如果有异常会尝试一直消费
#      ack-mode: manual
      # 手动ack:消费有异常时停止
      ack-mode: manual_immediate


3.2、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu</groupId>
    <artifactId>spring-kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-kafka-consumer</name>
    <description>spring-kafka-consumer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3.3、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaConsumerApplication.class, args);
    }

}

3.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.time.LocalDateTime;
@SpringBootTest
public class KafkaProducerApplicationTests {
    @Resource
    KafkaTemplate kafkaTemplate;

    @Test
    void sendRertyMsg(){
        kafkaTemplate.send("retry_topic",0,"","重试机制和死信队列"+"_"+ LocalDateTime.now());
    }
}

在这里插入图片描述

没有指定分区,默认会创建一个分区,即使此时有3个kafka实例,也只会用一个,因为只有一个分区

在这里插入图片描述
在这里插入图片描述

[
  [
    {
      "partition": 0,
      "offset": 0,
      "msg": "重试机制和死信队列_2024-06-07T15:49:37.398841600",
      "timespan": 1717746578357,
      "date": "2024-06-07 07:49:38"
    }
  ]
]

3.5、创建消费者消费消息

给一个消费者配重试主题的时候,死信主题消费者一般不写

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.retry.annotation.Backoff;
@Component
public class MyKafkaListenerAck {
    /**
     * 自动ack可能会导致漏消息
     *      spring-kafka:
     *                     自动ack 如果有异常,会死循环获取消息重新消费
     *                        不能继续向后消费消息,会导致消息积压
     *
     *                    手动ack 配置了手动ack,且ack-mode为manual_immediate时,
     *                        如果消息消费失败,会继续向后消费
     * @param record
     */
    //指定消费者消费异常使用重试+死信主题   必须结合手动ack使用
    @RetryableTopic(
            attempts = "3",
            numPartitions = "3",  //重试主题的分区数量
            backoff =@Backoff(value = 2_000L),  //重试的时间间隔
            autoCreateTopics ="true",
            retryTopicSuffix = "-myRetry",  //指定重试主题创建时的后缀名:使用原主题的名称拼接后缀生成名称 -retry
            dltTopicSuffix = "-myDlt" //指定DLT主题创建时的后缀名:使用原主题的名称拼接后缀生成名称 -dlt
    )
    @KafkaListener(groupId = "my_group1"
    ,topicPartitions = {@TopicPartition(topic = "retry_topic",partitions = {"0","1","2"})})
    public void consumeByRetry(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("consumeByRetry消费者获取到消息:topic = "+ record.topic()
                +",partition:"+record.partition()
                +",offset = "+record.offset()
                +",key = "+record.key()
                +",value = "+record.value());
        int i = 1/0;
        //手动ack
        ack.acknowledge();
    }

   /* //死信队列默认名称在原队列后拼接'-dlt'
    @KafkaListener(groupId = "my_group2"
            ,topicPartitions = {@TopicPartition(topic = "topic_dlt",partitions = {"0","1","2"})})
    public void consumeByDLT(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("consumeByDLT消费者获取到消息:topic = "+ record.topic()
                +",partition:"+record.partition()
                +",offset = "+record.offset()
                +",key = "+record.key()
                +",value = "+record.value());
        //手动ack
        ack.acknowledge();
    }*/

}

此时运行SpringKafkaConsumerApplication,控制台会报错,因为我们手动写了异常 int i=1/0

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

[
  [
    {
      "partition": 0,
      "offset": 0,
      "msg": "重试机制和死信队列_2024-06-07T15:49:37.398841600",
      "timespan": 1717746979414,
      "date": "2024-06-07 07:56:19"
    }
  ]
]

我们发现原本在重试主题中的消息,死信主题也有一份

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/689765.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++基础7:STL六大组件

目录 一、标准容器 1、顺序容器 vector ​编辑 deque list 容器适配器 stack queue prority_queue: 关联容器 有序关联容器set、mutiset、map、mutimap 增删查O(log n) 无序关联容 unordered_set、unordered_mutiset、unordered_map、unordered_mutimap 增删…

C语言实现map数据结构 key—value对应

1.首先43行 createKeyValuePair(char*key ,int value)这个函数就是给一个keyValuePair *pair的指针来通过内存分配将数据key和value存入这个pair指针所对应的内存空间 2.52行freeKeyValuePair这个函数是释放内存空间 3.头文件 struct结构体KeyValuePair就是一个指针一个值 4…

个人博客的未来出路在哪里?

说起个人博客的未来这就是个悲伤的话题,估计不少个人博客站长们都在苦苦的坚持和挣扎着吧,反正明月这两年感受最深刻的就是又有不少个人博客站点停更和 404 了都。自从坚持写博客这近十来年这种情况也都见怪不怪了,但这两年最突出的就是很多站长都是迷茫和悲观。 明月去年在…

Vue CLI 环境变量使用指南

一、简介 Vue CLI 是一个强大的前端工程化工具&#xff0c;它提供了丰富的配置选项&#xff0c;包括环境变量的管理。环境变量允许开发者根据不同的运行环境&#xff08;如开发、测试和生产&#xff09;应用不同的配置&#xff0c;而无需更改代码。本文将详细介绍如何在 Vue C…

AI图书推荐:用ChatGPT来写非虚构类书籍

这本书《用ChatGPT来写非虚构类书籍 》&#xff08;ChatGPT For KDP_ A manual from an experienced self-publisher to nonfiction authors for writing the book you were born to write with ChatGPT prompts mastering&#xff09;是一本专为非虚构类书籍作者编写的指南&am…

英伟达黄仁勋最新主题演讲:“机器人时代“已经到来

6月2日&#xff0c;英伟达联合创始人兼首席执行官黄仁勋在Computex 2024&#xff08;2024台北国际电脑展&#xff09;上发表主题演讲&#xff0c;分享了人工智能时代如何助推全球新产业革命。 黄仁勋表示&#xff0c;机器人时代已经到来&#xff0c;将来所有移动的物体都将实现…

[职场] 美术学就业方向和前景 #经验分享#学习方法

美术学就业方向和前景 2011年国务院学位委员会、教育部颁布了新的《学位授予和人才培养学科目录》&#xff0c;艺术学首次从文学门类中独立出来&#xff0c;成为新的第13个学科门类&#xff0c;即艺术学门类。其中&#xff0c;美术学又是艺术学门类下的五个一级学科之一。但是…

微信小程序开发,引用Vant Weapp UI样式,报错“没有找到可以构建的 NPM 包……”

文章目录 1.安装 Vant Weapp 的步骤2.常见问题 1.安装 Vant Weapp 的步骤 npm 安装 Vant Weapp修改 app.json构建 npm 包引入组件验证 npm 安装 Vant Weapp npm i vant/weapp -S --production修改 app.json 将 app.json 中的 “style”: “v2” 去除&#xff0c;小程序的新…

【数据分享】《中国文化文物与旅游统计年鉴》2022

最近老有同学过来询问《中国旅游年鉴》、《中国文化文物统计年鉴》、《中国文化和旅游统计年鉴》、《中国文化文物与旅游统计年鉴》&#xff0c;这四本年年鉴的关系以及怎么获取这四本年鉴。今天就在这里给大家分享一下这四本年鉴的具体情况。 实际上2018年&#xff0c;为适应…

mac无法读取windows分区怎么办 苹果硬盘怎么读取

对于Mac电脑用户但有Windows系统使用需求的&#xff0c;我们可以通过Boot Camp启动转换助理安装Windows分区这个方案来解决&#xff0c;不过因为两个系统的磁盘格式不同&#xff0c;相应的也会产生一些问题&#xff0c;例如无法正常读取windows分区。下面本文就详细说明mac无法…

阿里云(域名解析) certbot 证书配置

1、安装 certbot ubuntu 系统&#xff1a; sudo apt install certbot 2、申请certbot 域名证书&#xff0c;如申请二级域名aa.example.com 的ssl证书&#xff0c;同时需要让 bb.aa.example.com 也可以使用此证书 1、命令&#xff1a;sudo certbot certonly -d “域名” -d “…

【计算机毕业设计】基于SSM++jsp的在线医疗服务系统【源码+lw+部署文档】

包含论文源码的压缩包较大&#xff0c;请私信或者加我的绿色小软件获取 免责声明&#xff1a;资料部分来源于合法的互联网渠道收集和整理&#xff0c;部分自己学习积累成果&#xff0c;供大家学习参考与交流。收取的费用仅用于收集和整理资料耗费时间的酬劳。 本人尊重原创作者…

【AIGC+CAD】革新建筑、室内设计与建模领域的GenAI产品

一、产品定位 Augrade,一款专为建筑、室内设计和建模行业打造的AI CAD自动化工具。它凭借先进的AI技术,将2D蓝图迅速转化为精确的3D CAD模型,同时提供设计、成本分析的自动化以及全面的文档生成服务。Augrade致力于简化设计流程,确保技术可行性,并促进跨团队、跨工具的协…

怎么把m4a转换成mp3?四种常见的转换方法介绍!

怎么把m4a转换成mp3&#xff1f;在处理m4a音频文件时&#xff0c;我们可能会遇到一系列复杂的问题&#xff0c;首先&#xff0c;考虑到m4a是一种相对较新的音频格式&#xff0c;老旧的设备或软件可能无法准确识别它&#xff0c;这可能导致用户无法在这些设备上播放或编辑m4a文件…

标准发布实施 |《新能源电池工业废水处理技术指南磷酸铁锂电池》

T/ACEF 130&#xff0d;2024《新能源电池工业废水处理技术指南磷酸铁锂电池》欢迎各单位引用执行&#xff01;有课题也可联合立项&#xff01; 发布日期&#xff1a;2024年02月04日 实施日期&#xff1a;2024年03月01日 主要起草人&#xff1a;刘愿军、孙冬、丁炜鹏、何小芬…

django 内置 JSON 字段 使用场景

Django 内置的 JSON 字段&#xff08;JSONField&#xff09;是在 Django 3.1 版本中引入的&#xff0c;用于处理 JSON 格式的数据。JSONField 允许在数据库表中存储和查询 JSON 数据&#xff0c;并且在与 Python 代码交互时自动转换为合适的 Python 数据类型。以下是一些常见的…

SecureCRT[po破] for Mac SSH终端操作工具[解] 安装教程

文章目录 效果一、准备工作二、开始安装1、双击运行软件&#xff0c;将其从左侧拖入右侧文件夹中&#xff0c;等待安装完毕2、 应用程序显示软件图标&#xff0c;表示安装成功 三、输入对应参数1、解决“软件已损坏&#xff0c;无法打开&#xff0c;要移到废纸篓”问题解决步骤…

pdf压缩到指定大小的简单方法

压缩PDF文件是许多人在日常工作和学习中经常需要面对的问题。PDF文件因其跨平台、易阅读的特性而广受欢迎&#xff0c;但有时候文件体积过大&#xff0c;会给传输和存储带来不便。因此&#xff0c;学会如何有效地压缩PDF文件&#xff0c;就显得尤为重要。本文将详细介绍几种常见…

新能源汽车内卷真相

导语&#xff1a;2025年&#xff0c;我国新能源汽车总产能预计可达3661万辆&#xff0c;如此产能如何消化&#xff1f; 文 | 胡安 “这样卷下去不是办法&#xff0c;企业目的是什么&#xff1f;是盈利&#xff0c;为国家作贡献&#xff0c;为社会作贡献。我们应该有大格局&…

04 架构核心技术之分布式消息队列

本课时的主题是分布式消息队列&#xff0c;分布式消息队列的知识结构如下图。 本课时主要介绍以下内容。 同步架构和异步架构的区别。异步架构的主要组成部分&#xff1a;消息生产者、消息消费者、分布式消息队列。异步架构的两种主要模型&#xff1a;点对点模型和发布订阅模型…