springboot kafka在kafka server AUTH变动后consumer自动销毁

前言

笔者使用了kafka用来传输数据,笔者在今年10月写了文章,怎么使用配置化实现kafka的装载:springboot kafka多数据源,通过配置动态加载发送者和消费者-CSDN博客

不过在实际运行中,kafka broker是加密的,本来也没啥,但是突然的一天笔者在监控发现消费者掉线了,发送者居然还是正常的,见鬼的事情就是这么朴实的发生了,而且日志有Authentication/Authorization Exception and no authExceptionRetryInterval set

这样的错误,还有

Closing the Kafka consumer
Kafka consumer has been closed

这样的日志,非常诡异,后面查询kafka集群才知道,kafka集群在某个时间被改了AUTH配置,然后又改回来了,神奇的操作。

现象

实际上看到日志是懵的,毕竟kafka是对方提供的,AUTH用户名和密码也是对方给的,而且发送者也出现AUTH失败,但是发送者一直重试,然后因为kafka集群的AUTH改回来了,重试成功了;唯独消费者AUTH失败后,关闭了。

源码分析

从日志搜索Authentication/Authorization Exception and no authExceptionRetryInterval set

发现日志出现在

org.springframework.kafka.listener.KafkaMessageListenerContainer

刚好在消息监听器容器的内部类

ListenerConsumer

看源码定义,是一个定时线程池的任务定义

private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback

分析怎么去消费的KafkaMessageListenerContainer的doStart方法

既然明晰了,那么分析问题的来源, run方法

		public void run() {
			ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
			publishConsumerStartingEvent(); //事件通知,spring事件
			this.consumerThread = Thread.currentThread();
			setupSeeks();
			KafkaUtils.setConsumerGroupId(this.consumerGroupId);
			this.count = 0;
			this.last = System.currentTimeMillis();
			initAssignedPartitions(); // 初始分配partitions
			publishConsumerStartedEvent(); // 消费者启动事件
			Throwable exitThrowable = null;
			this.lastReceive = System.currentTimeMillis();
			while (isRunning()) { //状态在上面的截图代码已经更新为运行状态
				try {
					pollAndInvoke(); // 队列拉取,拉取过程会出现各种异常
				}
				catch (NoOffsetForPartitionException nofpe) { //这个需要注意,但是这个是不可配置的
					this.fatalError = true;
					ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");
					exitThrowable = nofpe;
					break;
				}
				catch (AuthenticationException | AuthorizationException ae) { //这个是授权失败,就是来源于kafka broker的auth鉴权
					if (this.authExceptionRetryInterval == null) { //时间配置,默认居然是null
						ListenerConsumer.this.logger.error(ae,
								"Authentication/Authorization Exception and no authExceptionRetryInterval set");
						this.fatalError = true;
						exitThrowable = ae;
						break; //循环结束
					}
					else { // 一段时间后重试
						ListenerConsumer.this.logger.error(ae,
								"Authentication/Authorization Exception, retrying in "
										+ this.authExceptionRetryInterval.toMillis() + " ms");
						// We can't pause/resume here, as KafkaConsumer doesn't take pausing
						// into account when committing, hence risk of being flooded with
						// GroupAuthorizationExceptions.
						// see: https://github.com/spring-projects/spring-kafka/pull/1337
						sleepFor(this.authExceptionRetryInterval);
					}
				}
				catch (FencedInstanceIdException fie) {
					this.fatalError = true;
					ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
							+ "' has been fenced");
					exitThrowable = fie;
					break;
				}
				catch (StopAfterFenceException e) {
					this.logger.error(e, "Stopping container due to fencing");
					stop(false);
					exitThrowable = e;
				}
				catch (Error e) { // NOSONAR - rethrown
					this.logger.error(e, "Stopping container due to an Error");
					this.fatalError = true;
					wrapUp(e);
					throw e;
				}
				catch (Exception e) {
					handleConsumerException(e);
				}
				finally {
					clearThreadState();
				}
			}
            //上面异常后,这里的处理
			wrapUp(exitThrowable);
		}

注意NoOffsetForPartitionException和AuthenticationException | AuthorizationException,其中能配置重试的是AuthenticationException | AuthorizationException异常,就是授权失败可以通过配置过一段时间抢救一下,一般而言,kafka首次授权失败基本上就不太可能成功了,但是这个只能控制consumer销毁,producer还在重试,所以出现了消费者销毁了,发送者在kafka集群auth还原后成功恢复。看看wrapUp方法

既然知道了原理,那么解决办法是配置

authExceptionRetryInterval

解决方法

配置authExceptionRetryInterval也是不容易,分析取值

private final Duration authExceptionRetryInterval =
       this.containerProperties.getAuthExceptionRetryInterval();

从 containerProperties来的,实际上是继承org.springframework.kafka.listener.ConsumerProperties

在spring-kafka 2.8版本还对属性命名重构了,毕竟以前的命名字母太多了,😁

不过要修改这个值也不容易,kafkaproperties并没有提供这个参数,而且创建消费者容器工厂时

org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory

那么只能在

ConcurrentMessageListenerContainer

创建成功后,从这个里面读取配置,设置默认值。这就可以使用前面埋点的spring事件了,通过事件拿到Consumer,不就可以修改配置了

使用

publishConsumerStartedEvent();

的事件最合适,执行循环前最后一个事件,而且这里的this,就是消费者容器org.springframework.kafka.listener.KafkaMessageListenerContainer对象,就是我们需要的

编写代码如下:

@Component
public class KafkaConsumerStartedListener implements ApplicationListener<ConsumerStartedEvent> {
    @Override
    public void onApplicationEvent(ConsumerStartedEvent event) {
        KafkaMessageListenerContainer<?, ?> container = event.getSource(KafkaMessageListenerContainer.class);
        container.getContainerProperties().setAuthExceptionRetryInterval(Duration.of(30, ChronoUnit.SECONDS));
    }
}

就这样就解决了问题,哈哈哈贼简单。 

笔者在使用API时发现了还有一个API是授权失败后重启,是布尔变量,默认值是false,即不重试。那么这个是哪里触发的呢,在我们看到的消费者销毁事件里面,实际上控制重试有多种办法,一个是循环去kafka broker拉取,一个是重启kafka消费者容器。

 

这个在上面已经说明了,简单截个图再说明一下

设置的地方还是上面的代码里面,同理也可以修改事件为

ConsumerStoppedEvent

可以更精准,当然就用刚刚的代码加一行设置也是可以的。

总结

kafka在发送者和消费者是区分开的,发送者如果连接kafka broker失败后可以一直重试直到成功,但是消费者确有各种各样的逻辑,可以精准控制,比如消费者重启的配置

restartAfterAuthExceptions

可以控制消费者在停止时重启,如果仅仅是授权失败,而且不需要反复重启(消耗资源),那么可以通过

authExceptionRetryInterval

配置时间周期的方式实现,但是kafka并没有给我们配置的入口,但是kafka在消费者启动消费的过程埋了很多spring事件钩子,通过这些钩子可以操作,估计spring-kafka也不希望我们去修改,毕竟消费者启动失败了或授权失败了,消费者自动销毁是符合正常逻辑的。如果不使用kafka自己提供的事件,可以在启动完成通过

org.springframework.kafka.config.KafkaListenerEndpointRegistry

拿到所有消费者容器,来批量设置属性,毕竟spring-kafka也是通过这个端点注册器注册MessageListenerContainer的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/927196.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ansible使用说明

将安装包拷贝到主控端主机 在主控端主机安装ansible&#xff0c;sh setup.sh 确认安装成功后&#xff0c;编辑hosts文件&#xff08;按步骤逐个添加主机组&#xff0c;不要一开始全部配置好&#xff09; [site-init]下的主机列表为被控制的主机&#xff08;按照当前ai建模方案…

5G学习笔记之PRACH

即使是阴天&#xff0c;也要记得出门晒太阳哦 目录 1. 概述 2. PRACH Preamble 3. PRACH Preamble 类型 3.1 长前导码 3.2 短前导码 3.3 前导码格式与小区覆盖 4. PRACH时频资源 4.1 小区所有可用PRACH资源 4.2 SSB和RACH的关系 4.3 PRACH时频资源配置 1. 概述 随机接入…

单点登录深入详解之技术方案总结

技术方案之CAS认证 概述 CAS 是耶鲁大学的开源项目&#xff0c;宗旨是为 web 应用系统提供一种可靠的单点登录解决方案。 CAS 从安全性角度来考虑设计&#xff0c;用户在 CAS 输入用户名和密码之后通过ticket进行认证&#xff0c;能够有效防止密码泄露。 CAS 广泛使用于传统应…

不开流也可以知道文件大小(File类)file.length():long

但是文件的toString是这个东西&#xff0c;所以当你把一个文件对象转json&#xff0c;大概率只有paXXXXX” 这一个key&#xff0c;想要自动转成输出其他的文件大小或者文件名什么的&#xff0c;就自己封装file类&#xff0c;封装fiel的方法

数据结构 (16)特殊矩阵的压缩存储

前言 特殊矩阵的压缩存储是数据结构中的一个重要概念&#xff0c;它旨在通过找出特殊矩阵中值相同的矩阵元素的分布规律&#xff0c;把那些呈现规律性分布的、值相同的多个矩阵元素压缩存储到一个存储空间中&#xff0c;从而节省存储空间。 一、特殊矩阵的定义 特殊矩阵是指具有…

试题转excel;试题整理工具;试卷转excel;word转excel

一、问题描述 我父亲是一名教师&#xff0c;偶尔会需要将试卷转excel&#xff0c;方便管理处理一些特别重要的题目 于是&#xff0c;就抽空写一个专门将试题转excel的工具&#xff0c;便于各位教师从业者和教育行业的朋友更好的整理试题&#xff0c;减少一点重复枯燥的工作 …

CSP/信奥赛C++语法基础刷题训练(36):洛谷P11229:[CSP-J 2024] 小木棍

CSP/信奥赛C语法基础刷题训练&#xff08;36&#xff09;&#xff1a;洛谷P11229&#xff1a;[CSP-J 2024] 小木棍 题目描述 小 S 喜欢收集小木棍。在收集了 n n n 根长度相等的小木棍之后&#xff0c;他闲来无事&#xff0c;便用它们拼起了数字。用小木棍拼每种数字的方法如…

【NLP 4、数学基础】

此去经年&#xff0c;应是良辰美景虚设 —— 24.11.28 一、线性代数 1.标量和向量 ① 标量 Scalar 一个标量就是一个单独的数 ② 向量 Vector 一个向量是一列数 可以把向量看作空间中的点&#xff0c;每个元素是不同坐标轴上的坐标 向量中有几个数&#xff0c;就叫作几维…

IOC控制反转DI依赖注入(Java EE 学习笔记06)

1 IoC 控制反转 控制反转&#xff08;Inversion of Control&#xff0c;缩写为IoC&#xff09;是面向对象编程中的一个设计原则&#xff0c;用来降低程序代码之间的耦合度。在传统面向对象编程中&#xff0c;获取对象的方式是用new关键字主动创建一个对象&#xff0c;也就是说…

68 mysql 的 临键锁

前言 我们这里来说的就是 我们在 mysql 这边常见的 一种锁, 行临键锁 虽然 在平时我们用到的不是很多, 我们这里 主要是 讲一下 它的主要的触发的场景 行临键锁 等价于 行锁 间隙锁, 行间隙锁是一个 左开右开的区间, 行临键锁 是一个左开右闭的区间 但是 它 和 行锁的差异…

(数据结构与算法)如何提高学习算法的效率?面试算法重点有哪些?面试需要哪些能力?

面试官眼中的求职者 通过对你算法的考察&#xff01;&#xff01;&#xff01;&#xff01; 缩进太多&#xff01;&#xff01;一般不要超过三层&#xff01;&#xff01;&#xff01;缩进越少&#xff0c;bug越少&#xff1b;逻辑比较复杂&#xff0c;把这些包装成为函数&…

设计模式-适配器模式-注册器模式

设计模式-适配器模式-注册器模式 适配器模式 如果开发一个搜索中台&#xff0c;需要适配或接入不同的数据源&#xff0c;可能提供的方法参数和平台调用的方法参数不一致&#xff0c;可以使用适配器模式 适配器模式通过封装对象将复杂的转换过程隐藏于幕后。 被封装的对象甚至…

SpringBoot实战(三十二)集成 ofdrw,实现 PDF 和 OFD 的转换、SM2 签署OFD

目录 一、OFD 简介1.1 什么是 OFD&#xff1f;1.2 什么是 版式文档&#xff1f;1.3 为什么要用 OFD 而不是PDF&#xff1f; 二、ofdrw 简介2.1 定义2.2 Maven 依赖2.3 ofdrw 的 13 个模块 三、PDF/文本/图片 转 OFD&#xff08;ofdrw-conterver&#xff09;3.1 介绍&#xff1a…

Opencv+ROS实现摄像头读取处理画面信息

一、工具 ubuntu18.04 ROSopencv2 编译器&#xff1a;Visual Studio Code 二、原理 图像信息 ROS数据形式&#xff1a;sensor_msgs::Image OpenCV数据形式&#xff1a;cv:Mat 通过cv_bridge()函数进行ROS向opencv转换 cv_bridge是在ROS图像消息和OpenCV图像之间进行转…

【MySQL — 数据库基础】MySQL的安装与配置 & 数据库简单介绍

数据库基础 本节目标 掌握关系型数据库&#xff0c;数据库的作用掌握在Windows和Linux系统下安装MySQL数据库了解客户端工具的基本使用和SQL分类了解MySQL架构和存储引擎 1. 数据库的安装与配置 1.1 确认MYSQL版本 处理无法在 cmd 中使用 mysql 命令的情况&a…

shell编程基础笔记

目录 echo改字体颜色和字体背景颜色 bash基本功能&#xff1a; 运行方式&#xff1a;推荐使用第二种方法 变量类型 字符串处理&#xff1a; 条件判断&#xff1a;&#xff08;使用echo $?来判断条件结果&#xff0c;0为true&#xff0c;1为false&#xff09; 条件语句&a…

maxun爬虫工具docker搭建

思路来源开源无代码网络数据提取平台Maxun 先把代码克隆到本地&#xff08;只有第一次需要&#xff09; git clone https://github.com/getmaxun/maxun.git 转到maxun目录 cd maxun 启动容器 docker-compose --env-file .env up -d 成功启动六个容器 网址 http://local…

redis揭秘-redis01-redis单例与集群安装总结

文章目录 【README】【1】安装单机【1.1】安装环境【1.2】安装步骤 【2】redis集群主从模式配置【2.1】集群架构【2.2】redis集群主从模式搭建步骤【2.3】redis集群主从模式的问题&#xff08;单点故障问题&#xff09; 【3】redis集群哨兵模式配置【3.1】集群架构【3.2】redis…

构建高可用系统设计OpenStack、Docker、Mesos和Kubernetes(简称K8s)

如果构建高可用、高并发、高效运维的大型系统 大型系统架构设计包括业务层设计、服务层设计、基础架层设计、存储层设计、网络层协同设计来完成。 一、业务层 根据主要业务范畴的分类和特征提取&#xff0c;抽象出独立的业务系统&#xff0c;分别统计系统的用户角色群体、访…

mrRobot解题过程

一、靶场环境需要桥接网络 不建议使用校园网&#xff0c;因为使用校园网进行主机探索的时候会出现数不完的主机。 arp-scan -l若是流量少只能用校园网&#xff0c;便在这里看靶机ip 二、端口扫描 我习惯用fscan了&#xff08;需要自己安装&#xff09;&#xff0c;你们用nma…