Spring Kafka—— KafkaListenerEndpointRegistry 隐式注册分析

由于我想在项目中实现基于 Spring kafka 动态连接 Kafka 服务,指定监听 Topic 并控制消费程序的启动和停止这样一个功能,所以就大概的了解了一下 Spring Kafka 的几个重要的类的概念,内容如下:

  1. ConsumerFactory
    • 作用:负责创建 Kafka 消费者的实例。ConsumerFactory 是一个简单的工厂类,用于封装消费者的配置(如bootstrap servers, key deserializer, value deserializer等)并生成Consumer实例。
    • 用法:通常在Spring配置类中定义,并通过依赖注入提供给KafkaListenerContainerFactory
  2. ConcurrentKafkaListenerContainerFactory
    • 作用:这个工厂类用于创建 ConcurrentMessageListenerContainer 实例,该容器管理多个Kafka MessageListenerContainer来提供并发消息消费。
    • 特点:可以设置并发消费的数量,即同时运行的MessageListenerContainer的数量。
      支持消息过滤、错误处理和事务管理。
    • 用法:在Spring配置类中定义,并设置其ConsumerFactory和其他相关配置。然后,可以通过@KafkaListener注解直接使用,Spring会自动使用这个工厂来创建监听器。
  3. KafkaListenerEndpointRegistry
    • 作用:这是一个管理类,用于管理应用中所有由@KafkaListener注解创建的消息监听器容器。
    • 特点:提供了启动和停止监听器的方法,可以在运行时控制监听器。
      可以用来查询当前所有注册的监听器的状态。
    • 用法:通常自动配置,可以通过自动注入到任何Spring管理的Bean中,用于运行时管理监听器。
  4. KafkaTemplate
    • 作用:这是一个高级抽象,用于生产消息到Kafka主题。
    • 特点:提供同步和异步发送消息的方法。
      支持事务消息发送。
    • 用法:定义在Spring配置类中,注入生产者工厂ProducerFactory,并用于应用中的消息发送。
  5. @KafkaListener
    作用:注解用于标记方法以作为Kafka消息的监听器,这些方法会自动被Spring容器管理,并在有新消息时触发。
    特点:
    可以指定主题、分区和消费组。
    支持并发消费。
    用法:放在组件的方法上,方法参数可以灵活地映射消息的key、value、headers等。

从上面的内容可以看到,KafkaListenerEndpointRegistry 这个类是管理消息监听容器的,并提供了启动和停止监听器的方法,于是我就想创建这个类来完成我的需求功能。当我直接写如下内容时:

@Component
public class KafkaConfig {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @PostConstruct
    public void init() {
        System.out.println(registry);
    }
}

IDEA提示了 Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found. 但是我启动 SpringBoot 项目却没有报错 :
在这里插入图片描述
我在我的项目中是没有加 @EnableKafka 这样的注解的,代码如下:

@SpringBootApplication
public class SpringKafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaExampleApplication.class, args);
    }
}

引入的依赖:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

于是我就比较好奇,项目启动的时候是在什么地方声明了 KafkaListenerEndpointRegistry 这个 bean 的。

KafkaListenerEndpointRegistry 隐式注册分析

SpringBoot 对于 kafka 有如下的自动配置:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

	private final KafkaProperties properties;

	private final RecordMessageConverter recordMessageConverter;

	private final RecordFilterStrategy<Object, Object> recordFilterStrategy;

	private final BatchMessageConverter batchMessageConverter;

	private final KafkaTemplate<Object, Object> kafkaTemplate;

	private final KafkaAwareTransactionManager<Object, Object> transactionManager;

	private final ConsumerAwareRebalanceListener rebalanceListener;

	private final CommonErrorHandler commonErrorHandler;

	private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;

	private final RecordInterceptor<Object, Object> recordInterceptor;

	KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
			ObjectProvider<RecordMessageConverter> recordMessageConverter,
			ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
			ObjectProvider<BatchMessageConverter> batchMessageConverter,
			ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
			ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
			ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
			ObjectProvider<CommonErrorHandler> commonErrorHandler,
			ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
			ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
		this.properties = properties;
		this.recordMessageConverter = recordMessageConverter.getIfUnique();
		this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
		this.batchMessageConverter = batchMessageConverter
			.getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter));
		this.kafkaTemplate = kafkaTemplate.getIfUnique();
		this.transactionManager = kafkaTransactionManager.getIfUnique();
		this.rebalanceListener = rebalanceListener.getIfUnique();
		this.commonErrorHandler = commonErrorHandler.getIfUnique();
		this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
		this.recordInterceptor = recordInterceptor.getIfUnique();
	}

	@Bean
	@ConditionalOnMissingBean
	ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
		ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
		configurer.setKafkaProperties(this.properties);
		configurer.setBatchMessageConverter(this.batchMessageConverter);
		configurer.setRecordMessageConverter(this.recordMessageConverter);
		configurer.setRecordFilterStrategy(this.recordFilterStrategy);
		configurer.setReplyTemplate(this.kafkaTemplate);
		configurer.setTransactionManager(this.transactionManager);
		configurer.setRebalanceListener(this.rebalanceListener);
		configurer.setCommonErrorHandler(this.commonErrorHandler);
		configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
		configurer.setRecordInterceptor(this.recordInterceptor);
		return configurer;
	}

	@Bean
	@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
	ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
			ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
			ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
		ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
		configurer.configure(factory, kafkaConsumerFactory
			.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
		return factory;
	}

	@Configuration(proxyBeanMethods = false)
	@EnableKafka
	@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	static class EnableKafkaConfiguration {

	}
}

可以看到这个配置类里面有一个静态的内部类 EnableKafkaConfiguration 该类上声明了 @EnableKafka 注解,也就是说内部静态类EnableKafkaConfiguration使用了@EnableKafka注解,并且通过@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)确保如果Spring上下文中缺少相应的Bean,则自动激活@EnableKafka功能。这意味着,即便你没有在你的应用配置中显式添加@EnableKafka,这个内部类也可以根据条件自动注册所需的Bean,从而启用Kafka的支持。

@EnableKafka 定义如下:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}

这个注解的使用导致了KafkaListenerConfigurationSelector的激活,其源码如下:

@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {

	@Override
	public String[] selectImports(AnnotationMetadata importingClassMetadata) {
		return new String[] { KafkaBootstrapConfiguration.class.getName() };
	}

}

上面的代码中 DeferredImportSelector是Spring框架中一个特殊的接口,它继承自ImportSelector。它主要用于处理配置类的导入,允许更细致地控制配置类的加载顺序。这个接口特别适用于那些依赖于由Spring容器中其他Bean或配置动态决定的配置。
KafkaListenerConfigurationSelector 这个类实现了DeferredImportSelector并通过selectImports方法返回了一个配置类名称的数组。这个方法指定了当Spring处理到这个选择器时,它应该导入KafkaBootstrapConfiguration类。

KafkaBootstrapConfiguration 内容如下:

public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {

	@Override
	public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
		if (!registry.containsBeanDefinition(
				KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {

			registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
					new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
		}

		if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
			registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
					new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
		}
	}

}

KafkaBootstrapConfiguration 是一个实现了ImportBeanDefinitionRegistrar接口的类,主要用于程序化地注册Bean定义到Spring的ApplicationContext中。通过实现ImportBeanDefinitionRegistrar接口,这个类可以在Spring的配置阶段动态地添加Bean定义。

在这个特定的实现中,KafkaBootstrapConfiguration检查特定的Kafka相关Bean(如KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME和KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)是否已经注册。如果这些Bean尚未注册,它会使用RootBeanDefinition手动注册这些Bean到Spring容器中。

RootBeanDefinition 的功能

RootBeanDefinition是Spring框架中用于定义Bean的一个核心类。它是BeanDefinition接口的一个直接实现,提供了一种配置Spring管理的Bean的方式,包括Bean的类类型、生命周期回调、依赖信息等。

  • Bean配置的详细定义:RootBeanDefinition允许开发者详细定义Bean的创建细节,如构造函数参数、属性值、初始化方法、销毁方法等。
  • 高级功能:它还支持更复杂的配置,如懒加载、自动装配模式、作用域和其他高级特性。
  • 程序化Bean注册:通过使用RootBeanDefinition,开发者可以在运行时动态地注册Bean,这对于条件配置或需要响应不同配置环境的高级用途尤为重要。

KafkaBootstrapConfiguration类中,使用RootBeanDefinition来创建和注册KafkaListenerAnnotationBeanPostProcessorKafkaListenerEndpointRegistry类的实例,这些是设置和管理Kafka消息监听器所必需的。

之后在AbstractBeanFactory会根据 beanName 获取到了 RootBeanDefinition 如下图所示:
在这里插入图片描述
然后在如下所示的位置:
在这里插入图片描述
程序创建了 beanName 为 org.springframework.kafka.config.internalKafkaListenerEndpointRegistry 的实例,具体创建实例的位置如下:
在这里插入图片描述
从调试中可以看到此处实例化了 KafkaListenerEndpointRegistry
所以当我们 springboot 项目引入了

 <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
 </dependency>

依赖后,即使我们不显示的声明 @EnableKafka 程序也会进行初始化相应的配置。

总结

当Spring Boot项目中引入Spring Kafka依赖后,即使我们没有显式声明@EnableKafka,系统仍会自动进行相应的配置。因此,在项目中尝试注入KafkaListenerEndpointRegistry时,尽管IDE可能会提示“Could not autowire. No beans of ‘KafkaListenerEndpointRegistry’ type found.”,项目依然能够正常启动。这是因为KafkaListenerEndpointRegistry在Spring Kafka的自动配置过程中已被隐式注册。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/565749.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用JavaScript及HTML、CSS完成秒表计时器

案例要求 1.界面为一个显示计时面板和三个按钮分别为:开始&#xff0c;暂停&#xff0c;重置 2.点击开始&#xff0c;面板开始计时&#xff0c; 3.点击暂停&#xff0c;面板停止 4.点击重置&#xff0c;计时面板重新为0 案例源码 <!DOCTYPE html> <html lang"…

sqlplus / as sysdba登陆失败,(ORA-01017)

周一上班检查alert log&#xff0c;看到某个库报出大量的错误 提示无法连接到ASM实例&#xff0c;这是某知名MES厂商DBA创建的11G RAC刚刚​转交到我手上的&#xff0c;这又是给我挖了什么坑&#xff1f; 报错为ORA-01017​用户名密码不对&#xff1f;​what&#xff1f; 登陆o…

负载均衡的原理及算法

一、定义 负载均衡&#xff08;Load Balancing&#xff09;是一种计算机网络和服务器管理技术&#xff0c;旨在分配网络流量、请求或工作负载到多个服务器或资源&#xff0c;以确保这些服务器能够高效、均匀地处理负载&#xff0c;并且能够提供更高的性能、可用性和可扩展性。…

OpenCV-复数矩阵点乘ComplexMatrixDotMultiplication

作者&#xff1a;翟天保Steven 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 需求说明 一般用到FFT&#xff0c;就涉及到复数的计算&#xff0c;为了便于调用&#xff0c;我自行封装了一个简单的复数矩阵点乘…

服务器被CC攻击怎么办

遇到CC攻击时&#xff0c;可采取以下措施&#xff1a;限制IP访问频率、启用防DDoS服务、配置Web应用防火墙、增加服务器带宽、使用负载均衡分散请求压力。 处理服务器遭遇CC攻击的方法如下&#xff1a; 1. 确认攻击 你需要确认服务器是否真的遭受了CC攻击&#xff0c;这可以…

Day10-Java进阶-泛型数据结构(树)TreeSet 集合

1. 泛型 1.1 泛型介绍 package com.itheima.generics;import java.util.ArrayList; import java.util.Iterator;public class GenericsDemo1 {/*泛型介绍 : JDK5引入的, 可以在编译阶段约束操作的数据类型, 并进行检查注意 : 泛型默认的类型是Object, 且只能接引用数据类型泛型…

【STM32+HAL+Proteus】系列学习教程3---GPIO输出模式(LED流水灯、LED跑马灯)

实现目标 1、掌握GPIO 输出模式控制 2、学会STM32CubeMX软件配置GPIO 3、具体目标&#xff1a;1、开发板4个LED实现流水灯&#xff1b;2、开发板4个LED实现跑马灯灯。 一、STM32 GPIO 概述 1、GPIO定义 GPIO&#xff08;General-purpose input/output&#xff09;是通用输入…

牛客NC238 加起来和为目标值的组合【中等 DFS C++、Java、Go、PHP】

题目 题目链接&#xff1a; https://www.nowcoder.com/practice/172e6420abf84c11840ed6b36a48f8cd 思路 本题是组合问题&#xff0c;相同元素不同排列仍然看作一个结果。 穷经所有的可能子集&#xff0c;若和等于target&#xff0c;加入最终结果集合。 给nums排序是为了方便…

Ai-WB2 系列模组SDK接入亚马逊云

文章目录 前言一、准备二、亚马逊云物模型建立1. 注册亚马逊账号&#xff0c;登录AWS IoT控制台&#xff0c;[注册地址](https://aws.amazon.com/cn/)2. 创建好之后点击登录3. 创建物品以及下载证书 三、连接亚马逊云demo获取以及配置1. 下载源码2. 按照顺序执行下面指令3. 修改…

IDEA 2021.3.3最新激活破解教程(可激活至2099年,亲测有效)

1、ja-netfilter-all Windows 系统&#xff0c;点击运行 install-current-user.vbs 脚本&#xff0c;为当前用户安装破解补丁 截图是window环境下的激活方式 运行此补丁大约花费几分钟&#xff0c;点击 确定&#xff0c; 等待 Done 完成提示框出现&#xff0c;到这里&#xf…

HarmonyOS ArkUI实战开发-页面跳转(Router、Ability)

页面跳转可以分为页面内跳转和页面间跳转&#xff0c;页面内跳转是指所跳转的页面在同一个 Ability 内部&#xff0c;它们之间的跳转可以使用 Router 或者 Navigator 的方式&#xff1b;页面间跳转是指所跳转的页面属与不同的 Ability &#xff0c;这种跳转需要借助 featureAbi…

51单片机数字温度报警器_DS18B20可调上下限(仿真+程序+原理图)

数字温度报警器 1 **主要功能&#xff1a;*****\*资料下载链接&#xff08;可点击&#xff09;&#xff1a;\**** 2 **仿真图&#xff1a;**3 **原理图&#xff1a;**4 **设计报告&#xff1a;**5 **程序设计&#xff1a;**主函数外部中断函数DS18B20驱动 6 讲解视频7 **资料清…

完美运营版商城/拼团/团购/秒杀/积分/砍价/实物商品/虚拟商品等全功能商城

源码下载地址&#xff1a;完美运营版商城.zip 后台可以自由拖曳修改前端UI页面 还支持虚拟商品自动发货等功能 挺不错的一套源码 前端UNIAPP 后端PHP 一键部署版本

Linux 终止进程命令—sudo kill -9 <进程号>

一、查找占用端口的进程&#xff1a;使用以下命令找到占用了该端口的进程&#xff1a; sudo lsof -i :<端口号> 该命令将显示占用该端口的进程的详细信息。 二、结束占用端口的进程&#xff1a;根据上一步得到的进程信息&#xff0c;使用以下命令结束该进程&#xff1a…

CSS-vminvmax单位

vmin 和 vmax 单位 vmin 是相对于视口宽度和高度中较小值进行计算&#xff0c;它的值为视口宽度和高度中的较小值的百分比。 例如&#xff0c;如果视口宽度为 800px&#xff0c;高度为 1000px&#xff0c;那么 1vmin 等于 8px&#xff08;800px 的 1%&#xff09;。 vmax 是…

【Linux】权限(shell运行原理、概念,Linux权限)

&#x1f308;个人主页&#xff1a;秦jh__https://blog.csdn.net/qinjh_?spm1010.2135.3001.5343&#x1f525; 系列专栏&#xff1a;https://blog.csdn.net/qinjh_/category_12625432.html 目录 shell命令以及运行原理 创建和删除用户 创建新普通用户 删除用户 Linux权…

【bug】使用mmsegmentaion遇到的问题

利用mmsegmentaion跑自定义数据集时的bug处理&#xff08;使用bisenetV2&#xff09; 1. ValueError: val_dataloader, val_cfg, and val_evaluator should be either all None or not None, but got val_dataloader{batch_size: 1, num_workers: 4}, val_cfg{type: ValLoop}, …

Elasticsearch单机部署(Linux)

1. 准备环境 本文中Elasticsearch版本为7.12.0&#xff0c;JDK版本为1.8.0&#xff0c;Linux环境部署。 扩展&#xff1a; &#xff08;1&#xff09;查看Elasticsearch对应的常用的jdk版本如下&#xff1a;&#xff08;详情可看官网的支持一览表&#xff09; Elasticsearch a…

CTF网络安全大赛详情

网络安全已成为现代社会的一个关键挑战&#xff0c;随着互联网技术的飞速发展&#xff0c;从个人隐私保护到国家安全&#xff0c;网络安全的重要性日益突显。为了应对这一挑战&#xff0c;CTF&#xff08;Capture The Flag&#xff0c;中文&#xff1a;夺旗赛&#xff09;应运而…

03-JAVA设计模式-命令模式

命令模式 什么是命令模式 命令模式&#xff08;Command Pattern&#xff09;是一种行为设计模式&#xff0c;它将请求封装为对象&#xff0c;从而使你可用不同的请求把客户端与请求的处理者解耦,也称动作模式或事物模式。 在命令模式中&#xff0c;命令对象封装了接收者对象…