Kafka 消费端消费重试和死信队列

系列文章目录


文章目录

  • 系列文章目录
  • 前言


前言

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。
在这里插入图片描述


Spring-Kafka 提供消费重试的机制。当消息消费失败的时候,Spring-Kafka 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 重新消费消息 。

默认情况下,Spring-Kafka 达到配置的重试次数时,【每条消息的失败重试时间,由配置的时间隔决定】Consumer 如果依然消费失败 ,那么该消息就会进入到死信队列。

Spring-Kafka 封装了消费重试和死信队列, 将正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

我们在应用中可以对死信队列中的消息进行监控重发,来使得消费者实例再次进行消费,消费端需要做幂等性的处理。
在这里插入图片描述
引入POM依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.6</version>
</dependency>

YML配置文件

这里Kafka也安装到了本地,Kafka安装Windows版:http://www.javacui.com/tool/667.html 。

# Web配置
server:
  servlet:
    context-path: /
  port: 1088
# kafka 配置
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      enable-auto-commit: true # 是否自动提交offset
      auto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # 表示接受反序列化任意的类,也可限定包路径
      properties:
        spring:
          json:
            trusted:
              packages: '*'
    producer:
      retries: 0 # 重试次数
      # 0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据;
      # 1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;(数据要求快,重要性不高时用)
      # -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,数据一般不会丢失,延迟时间长但是可靠性高。(数据重要时用)
      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1,0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    listener:
      missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错

配置ErrorHandler,用于定制重试次数和间隔时间

package com.example.springboot.config.kafka;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
 
/**
 * Spring-Kafka 通过实现自定义的 SeekToCurrentErrorHandler ,当 Consumer 消费消息异常的时候,进行拦截处理:
 * 重试小于最大次数时,重新投递该消息给 Consumer
 * 重试到达最大次数时,如果Consumer 还是消费失败时,该消息就会发送到死信队列。 死信队列的 命名规则为: 原有 Topic + .DLT 后缀 = 其死信队列的 Topic
 */
 
@Configuration
public class KafkaConfiguration {
   
    private Logger logger = LoggerFactory.getLogger(getClass());
 
    @Bean
    @Primary
    public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
   
        logger.warn("kafkaErrorHandler begin to Handle");
        // <1> 创建 DeadLetterPublishingRecoverer 对象
        ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        // <2> 创建 FixedBackOff 对象   设置重试间隔 10秒 次数为 1 次
        // 创建 DeadLetterPublishingRecoverer 对象,它负责实现,在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。
        // 注意,正常发送 1 次,重试 1 次,等于一共 2 次
        BackOff backOff = new FixedBackOff(10 * 1000L, 1L);
        // <3> 创建 SeekToCurrentErrorHandler 对象
        return new SeekToCurrentErrorHandler(recoverer, backOff);
    }
}

编写生产者代码

package com.example.springboot.controller;
 
import cn.hutool.core.date.DateUtil;
import com.example.springboot.model.Blog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
import java.util.Date;
 
@RestController
@RequestMapping("/test/kafka")
public class MessSendController {
   
    @Autowired
    private KafkaTemplate kafkaTemplate;
    private static final String messTopic = "test";
 
    @RequestMapping("/send")
    public String sendMess() {
   
        Blog blog = Blog.builder().id(1).name("测试").isDel(false).birthday(new Date()).build();
        kafkaTemplate.send(messTopic, blog);
        System.out.println("客户端 消息发送完成");
        return DateUtil.now();
    }
}

这里请求路径:http://localhost:1088/test/kafka/send 时,会发送一条消息到队列。

编写消费端代码

package com.example.springboot.listener;
 
import com.alibaba.fastjson.JSON;
import com.example.springboot.model.Blog;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class KafkaMessListener {
   
    private static final String messTopic = "test";
 
    @KafkaListener(id="KafkaMessLis

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/533538.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【QT】pro文件里添加又删除LIBS不影响运行的原因

我发现个问题啊&#xff0c;如果运行项目&#xff0c;发现报错&#xff0c;缺少某dll&#xff0c;接着你在pro文件里加上win32:LIBS -lOpengl32&#xff08;举个例子&#xff09;&#xff0c;接着可以运行了&#xff0c;接着把这行删掉&#xff0c;再运行&#xff0c;仍然可以…

中介者模式【行为模式C++】

1.简介 中介者模式是一种行为设计模式&#xff0c; 能让你减少对象之间混乱无序的依赖关系。 该模式会限制对象之间的直接交互&#xff0c; 迫使它们通过一个中介者对象进行合作。 亦称&#xff1a; 调解人、控制器、Intermediary、Controller、Mediator 2.示例 中介者模式在…

计算机组成原理(IO,输入输出)

1、“821.2016T1(1)”&#xff0c;表示821真题&#xff0c;2016年的题&#xff0c;T1是 选择题/填空题/大题 的第一题&#xff0c;其他类似标记也是相通 2、个人小白总结自用&#xff0c;不一定适用于其他人&#xff0c;请自行甄别 3、有任何疑问&#xff0c;欢迎私信探讨&…

uniapp开发h5端使用video播放mp4格式视频黑屏,但有音频播放解决方案

mp4格式视频有一些谷歌播放视频黑屏&#xff0c;搜狗浏览器可以正常播放 可能和视频的编码格式有关&#xff0c;谷歌只支持h.264编码格式的视频播放 将mp4编码格式修改为h.264即可 转换方法&#xff1a; 如果是自己手动上传文件可以手动转换 如果是后端接口调取的地址就需…

【leetcode面试经典150题】36. 旋转图像(C++)

【leetcode面试经典150题】专栏系列将为准备暑期实习生以及秋招的同学们提高在面试时的经典面试算法题的思路和想法。本专栏将以一题多解和精简算法思路为主&#xff0c;题解使用C语言。&#xff08;若有使用其他语言的同学也可了解题解思路&#xff0c;本质上语法内容一致&…

【Ansible自动化运维】Ansible入门基础信息【安装配置、常用命令与模块】

介绍安装配置注意事项yum安装验证安装配置host配置主机清单配置主控端被控端 常用模块命令组成command模块shell模块copy模块script模块 日志信息最后 介绍 Ansible 是一个开源 IT 自动化引擎&#xff0c;可自动执行供应、配置管理、应用程序部署、编排和许多其他 IT 流程。它可…

MySQL分库分表的方式有哪些

目录 一、为什么要分库分表 二、什么是分库分表 三、分库分表的几种方式 1.垂直拆分 2. 水平拆分 四、分库分表带来的问题 五、分库分表技术如何选型 一、为什么要分库分表 如果一个网站业务快速发展&#xff0c;那这个网站流量也会增加&#xff0c;数据的压力也会随之而…

护眼台灯哪个牌子好?护眼台灯十大排名,看看业内人怎么选!

眼镜已成为许多人日常生活中不可或缺的一部分&#xff0c;但戴眼镜并不总是方便的。现在许多家长也越来越关注孩子的视力问题&#xff0c;有些学生的书桌上已经放置上了护眼台灯。这种台灯提供柔和的光线&#xff0c;有助于改善照明环境&#xff0c;保护眼睛健康。然而&#xf…

使用jQuery实现购物界面的动态效果

实现功能&#xff1a;&#xff08;购物车以表格的格式展示&#xff09; 1 全选框和复选框之间的联动关系&#xff1a; 点击全选&#xff0c;所有复选框checked状态为true 点击复选框&#xff0c;全选框状态实时更新 2 点击删除按钮&#xff0c;删除对应的行 3 点击删除所选…

VUE3的有关知识

学习vue3的原因 在vue2当中的组件的实例,都是data一块,computed一块,当我们去找某一变量相关的则十分麻烦,vue3是组合式API,vue2是选项式, vue3的优点: 1)组合式更易维护 2)更快的速度 3)更小的体积 4)更好的响应式proxy 使用vue3相关脚手架创建项目 步骤: 1)node -v node版…

【CVE-2010-2883】进行钓鱼攻击的研究

最近作业中研究APT攻击&#xff0c;了解到2011年前后披露的LURID-APT&#xff0c;其中敌手利用了各种版本的文件查看器的漏洞实现攻击。CVE-2010-2883就是其中被利用的一个adobe reader的漏洞。特此复现&#xff0c;更好的研究和防范APT攻击。 本文仅仅是对相关漏洞利用的学习…

PyCharm Pro 2024:卓越的Python编辑开发工具,适用于Mac与Windows平台

PyCharm Pro 2024是一款专为Python开发者设计的强大编辑开发工具&#xff0c;无论是Mac还是Windows用户&#xff0c;都能从中受益良多。该软件凭借其出色的性能、丰富的功能和卓越的用户体验&#xff0c;成为Python编程界的翘楚。 作为一款高效的Python编辑器&#xff0c;PyCh…

02-结构化程式与自定义函数

视频教程&#xff1a;b站视频【MATLAB教程_台大郭彦甫&#xff08;14课&#xff09;原视频补档】https://www.bilibili.com/video/BV1GJ41137UH/?share_sourcecopy_web&vd_sourc*ed6b9f96888e9c85118cb40c164875dfc 官网教程&#xff1a; MATLAB 快速入门 - MathWorks 中…

React 使用 three.js 加载 gltf 3D模型 | three.js 入门

系列文章 示例项目(gitcode)&#xff1a;https://gitcode.com/qq_41456316/simple-react-three-demo 文章目录 系列文章前言一、three.js是什么&#xff1f;二、使用 React 和 three.js 加载 glTF 3D 模型的步骤步骤 1&#xff1a;创建 React 应用步骤 2&#xff1a;安装 thre…

使用QT 开发不规则窗体

使用QT 开发不规则窗体 不规则窗体贴图法的不规则窗体创建UI模板创建一个父类创建业务窗体main函数直接调用user_dialog创建QSS文件 完整的QT工程 不规则窗体 QT中开发不规则窗体有两种方法&#xff1a;&#xff08;1&#xff09;第一种方法&#xff0c;使用QWidget::setMask函…

川土微高性能模拟芯片系列产品介绍和应用

一、公司简介 上海川土微电子有限公司是一家成立于2016年的专注于高端模拟芯片研发设计与销售的高科技公司&#xff0c;产品涵盖隔离与接口、驱动与电源、高性能模拟三大产品线以及μMiC战略产品&#xff08; micro-Module in Chip&#xff09;。目前产品已广泛应用于工业控制…

怎么在SU草图大师里做地形模型?

​​​​​​  1.导入地形数据&#xff1a;首先&#xff0c;找到你想要模拟的地形数据。常见的文件格式是DEM(数字高程模型)文件&#xff0c;它包含地表高程数据。在SketchUp中&#xff0c;通过菜单栏选择“文件” -> “导入” -> “DEM”&#xff0c;然后选择你的DEM文…

基于springboot实现桂林旅游景点导游平台管理系统【项目源码+论文说明】

基于springboot实现桂林旅游景点导游平台管理系统演示 摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了桂林旅游景点导游平台的开发全过程。通过分析桂林旅游景点导游平台管理的不足&#xff0c;创建了一个计算…

【数据结构(四)】链表经典练习题

❣博主主页: 33的博客❣ ▶️文章专栏分类:数据结构◀️ &#x1f69a;我的代码仓库: 33的代码仓库&#x1f69a; &#x1faf5;&#x1faf5;&#x1faf5;关注我带你学更多数据结构知识 目录 1.前言2.删除值为key的所有结点3.反转链表4.返回中间结点5.输出倒数第k个结点6.链表…

2024-04-10 Linux gzip 和 gunzip 命令,gzip 压缩的文件通常比原始文件小得多。

一、gzip 是 Linux 系统中用于压缩文件的命令&#xff0c;它通常用于将单个文件压缩成 .gz 格式的文件。gzip 压缩的文件通常比原始文件小得多&#xff0c;因此它在节省磁盘空间和减少文件传输时间方面非常有用。 gzip 命令的基本语法如下&#xff1a; gzip [选项] [文件]复制…