SpringBoot 处理 @KafkaListener 消息

消息监听容器

1、KafkaMessageListenerContainer

由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者;

看看其整体代码结构:

可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和stop操作;

ListenerConsumer, 内部真正拉取消息消费的是这个结构,其 实现了Runable接口,简言之,它就是一个后台线程轮训拉取并处理消息(while true死循环拉取消息)。

在doStart方法中会创建ListenerConsumer并交给线程池处理

以上步骤就开启了消息监听过程。

KafkaMessageListenerContainer#doStart
protected void doStart() {
		if (isRunning()) {
			return;
		}
		ContainerProperties containerProperties = getContainerProperties();
		if (!this.consumerFactory.isAutoCommit()) {
			AckMode ackMode = containerProperties.getAckMode();
			if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
				Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
			}
			if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
					&& containerProperties.getAckTime() == 0) {
				containerProperties.setAckTime(5000);
			}
		}

		Object messageListener = containerProperties.getMessageListener();
		Assert.state(messageListener != null, "A MessageListener is required");
		if (containerProperties.getConsumerTaskExecutor() == null) {
			SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
					(getBeanName() == null ? "" : getBeanName()) + "-C-");
			containerProperties.setConsumerTaskExecutor(consumerExecutor);
		}
		Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
		this.listener = (GenericMessageListener<?>) messageListener;
		ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);
		if (this.listener instanceof DelegatingMessageListener) {
			Object delegating = this.listener;
			while (delegating instanceof DelegatingMessageListener) {
				delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();
			}
			listenerType = ListenerUtils.determineListenerType(delegating);
		}
        // 这里创建了监听消费者对象
		this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);
		setRunning(true);
        // 将消费者对象放入到线程池中执行
		this.listenerConsumerFuture = containerProperties
				.getConsumerTaskExecutor()
				.submitListenable(this.listenerConsumer);
	}
KafkaMessageListenerContainer.ListenerConsumer#run
public void run() {
			this.consumerThread = Thread.currentThread();
			if (this.genericListener instanceof ConsumerSeekAware) {
				((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
			}
			if (this.transactionManager != null) {
				ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
			}
			this.count = 0;
			this.last = System.currentTimeMillis();
			if (isRunning() && this.definedPartitions != null) {
				try {
					initPartitionsIfNeeded();
				}
				catch (Exception e) {
					this.logger.error("Failed to set initial offsets", e);
				}
			}
			long lastReceive = System.currentTimeMillis();
			long lastAlertAt = lastReceive;
			while (isRunning()) {
				try {
					if (!this.autoCommit && !this.isRecordAck) {
						processCommits();
					}
					processSeeks();
					if (!this.consumerPaused && isPaused()) {
						this.consumer.pause(this.consumer.assignment());
						this.consumerPaused = true;
						if (this.logger.isDebugEnabled()) {
							this.logger.debug("Paused consumption from: " + this.consumer.paused());
						}
						publishConsumerPausedEvent(this.consumer.assignment());
					}
                    // 拉取信息
					ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
					this.lastPoll = System.currentTimeMillis();
					if (this.consumerPaused && !isPaused()) {
						if (this.logger.isDebugEnabled()) {
							this.logger.debug("Resuming consumption from: " + this.consumer.paused());
						}
						Set<TopicPartition> paused = this.consumer.paused();
						this.consumer.resume(paused);
						this.consumerPaused = false;
						publishConsumerResumedEvent(paused);
					}
					if (records != null && this.logger.isDebugEnabled()) {
						this.logger.debug("Received: " + records.count() + " records");
						if (records.count() > 0 && this.logger.isTraceEnabled()) {
							this.logger.trace(records.partitions().stream()
								.flatMap(p -> records.records(p).stream())
								// map to same format as send metadata toString()
								.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
								.collect(Collectors.toList()));
						}
					}
					if (records != null && records.count() > 0) {
						if (this.containerProperties.getIdleEventInterval() != null) {
							lastReceive = System.currentTimeMillis();
						}
						invokeListener(records);
					}
					else {
						if (this.containerProperties.getIdleEventInterval() != null) {
							long now = System.currentTimeMillis();
							if (now > lastReceive + this.containerProperties.getIdleEventInterval()
									&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
								publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener
										? this.consumer : null, this.consumerPaused);
								lastAlertAt = now;
								if (this.genericListener instanceof ConsumerSeekAware) {
									seekPartitions(getAssignedPartitions(), true);
								}
							}
						}
					}
				}
				catch (WakeupException e) {
					// Ignore, we're stopping
				}
				catch (NoOffsetForPartitionException nofpe) {
					this.fatalError = true;
					ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
					break;
				}
				catch (Exception e) {
					handleConsumerException(e);
				}
			}
			ProducerFactoryUtils.clearConsumerGroupId();
			if (!this.fatalError) {
				if (this.kafkaTxManager == null) {
					commitPendingAcks();
					try {
						this.consumer.unsubscribe();
					}
					catch (WakeupException e) {
						// No-op. Continue process
					}
				}
			}
			else {
				ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");
				KafkaMessageListenerContainer.this.stop();
			}
			this.monitorTask.cancel(true);
			if (!this.taskSchedulerExplicitlySet) {
				((ThreadPoolTaskScheduler) this.taskScheduler).destroy();
			}
			this.consumer.close();
			this.logger.info("Consumer stopped");
		}

2、ConcurrentMessageListenerContainer

并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者。

	protected void doStart() {
		if (!isRunning()) {
			ContainerProperties containerProperties = getContainerProperties();
			TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
			if (topicPartitions != null
					&& this.concurrency > topicPartitions.length) {
				this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
						+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
						+ topicPartitions.length);
				this.concurrency = topicPartitions.length;
			}
			setRunning(true);

            // 创建多个消费者
			for (int i = 0; i < this.concurrency; i++) {
				KafkaMessageListenerContainer<K, V> container;
				if (topicPartitions == null) {
					container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
							containerProperties);
				}
				else {
					container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
							containerProperties, partitionSubset(containerProperties, i));
				}
				String beanName = getBeanName();
				container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
				if (getApplicationEventPublisher() != null) {
					container.setApplicationEventPublisher(getApplicationEventPublisher());
				}
				container.setClientIdSuffix("-" + i);
				container.setAfterRollbackProcessor(getAfterRollbackProcessor());
				container.start();
				this.containers.add(container);
			}
		}
	}

3、@KafkaListener底层监听原理

上面已经介绍了KafkaMessageListenerContainer的作用是拉取并处理消息,但还缺少关键的一步,即 如何将我们的业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来?

那么这个桥梁就是@KafkaListener注解

KafkaListenerAnnotationBeanPostProcessor, 从后缀BeanPostProcessor就可以知道这是Spring IOC初始化bean相关的操作,当然这里也是;此类会扫描带@KafkaListener注解的类或者方法,通过 KafkaListenerContainerFactory工厂创建对应的KafkaMessageListenerContainer,并调用start方法启动监听,也就是这样打通了这条路…

4、Spring Boot 自动加载kafka相关配置

1、KafkaAutoConfiguration
自动生成kafka相关配置,比如当缺少这些bean的时候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默认创建bean实例

2、KafkaAnnotationDrivenConfiguration
主要是针对于spring-kafka提供的注解背后的相关操作,比如 @KafkaListener;

在开启了@EnableKafka注解后,spring会扫描到此配置并创建缺少的bean实例,比如当配置的工厂beanName不是kafkaListenerContainerFactory的时候,就会默认创建一个beanName为kafkaListenerContainerFactory的实例,这也是为什么在springboot中不用定义consumer的相关配置也可以通过@KafkaListener正常的处理消息

5、消息处理

1、单条消息处理

@Configuration
public class KafkaConsumerConfiguration {

  

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaCustomizeContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(2);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);
        // poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);
        return props;
    }

}

这种方式的@KafkaLisener中的参数是单条的。

2、批量处理

@Configuration
@EnableKafka
public class KafkaConfig {
 
    @Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // 增加开启批量处理
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
    return factory;
}
 
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

// 注意:这里接受的是集合类型
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

这种方式的@KafkaLisener中的参数是多条的。

6、线程池相关

如果没有额外给Kafka指定线程池,底层默认用的是SimpleAsyncTaskExecutor类,它不使用线程池,而是为每个任务创建新线程。相当于一个消费者用一个独立的线程来跑。

总结

spring为了将kafka融入其生态,方便在spring大环境下使用kafka,开发了spring-kafa这一模块,本质上是为了帮助开发者更好的以spring的方式使用kafka

@KafkaListener就是这么一个工具,在同一个项目中既可以有单条的消息处理,也可以配置多条的消息处理,稍微改变下配置即可实现,很是方便

当然,@KafkaListener单条或者多条消息处理仍然是spring自行封装处理,与kafka-client客户端的拉取机制无关;比如一次性拉取50条消息,对于单条处理来说就是循环50次处理,而多条消息处理则可以一次性处理50条;本质上来说这套逻辑都是spring处理的,并不是说单条消费就是通过kafka-client一次只拉取一条消息

在使用过程中需要注意spring自动的创建的一些bean实例,当然也可以覆盖其自动创建的实例以满足特定的需求场景。

                        
原文链接:https://blog.csdn.net/yuechuzhixing/article/details/124725713

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/876013.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

前端用html写excel文件直接打开

源码 <html xmlns:o"urn:schemas-microsoft-com:office:office" xmlns:x"urn:schemas-microsoft-com:office:excel" xmlns"http://www.w3.org/TR/REC-html40"> <head><meta charset"UTF-8"><!--[if gte mso 9]&…

C++学习笔记之引用(基础)

C学习笔记之引用 https://www.runoob.com/cplusplus/cpp-references.html 引用变量是一个别名&#xff0c;它是已存在变量的另一个名字 一旦把引用初始化为某个变量&#xff0c;可以使用该引用名称或变量名称来指向变量 1、引用vs指针 引用和指针之间有一些相似&#xff0c;也…

计算机的错误计算(九十三)

摘要 探讨 log(y,x) 即以 x 为底 y 的对数的计算精度问题。 Log(y,x)运算是指 x 为底 y 的对数。 例1. 计算 log(123667.888, 0.999999999999999) . 不妨在Python中计算&#xff0c;则有&#xff1a; 若在 Excel 单元格中计算&#xff0c;则有几乎同样的输出&#xff1a; 然…

C++多态 学习

目录 一、多态的概念 二、多态的实现 三、纯虚函数和多态类 四、多态的原理 一、多态的概念 多态&#xff1a;多态分为编译时多态(静态多态)和运行时多态(动态多态)。编译时多态主要是我们之前学过的函数重载和函数模板&#xff0c;他们在传不同类型的参数就可以调用不同的函…

OpenCV-Python笔记(上)

安装 全局安装 pip install opencv-python项目虚拟环境安装 # 进入项目根路径执行 .venv/bin/pip install opencv-python计算机眼中的图像 一张图片由大小比如&#xff08;100*100&#xff09;决定&#xff0c;说明存在100*100的像素点&#xff0c;每个像素点存在颜色通道&…

ppt文件怎么压缩变小一些?8种压缩PPT文件的方法推荐

ppt文件怎么压缩变小一些&#xff1f;在现代工作环境中&#xff0c;PPT文件常常是我们展示信息和分享想法的主要工具。然而&#xff0c;当这些文件变得庞大时&#xff0c;它们不仅会占用大量的存储空间&#xff0c;还可能导致处理速度变慢&#xff0c;影响整体工作效率。这种情…

4+1视图模型

逻辑视图&#xff08;Logical View&#xff09; 逻辑视图主要关注系统的功能分解&#xff0c;即系统如何被划分为不同的逻辑组件&#xff08;如类、接口、包等&#xff09;&#xff0c;以及这些组件之间的交互关系。它帮助开发者理解系统的业务逻辑和功能结构。 开发视图&…

【人工智能】OpenAI发布GPT-o1模型:推理能力的革命性突破,这将再次刷新编程领域的格局!

在人工智能领域&#xff0c;推理能力的提升一直是研究者们追求的目标。就在两天前&#xff0c;OpenAI正式发布了其首款具有推理能力的大语言模型——o1。这款模型的推出&#xff0c;不仅标志着AI技术的又一次飞跃&#xff0c;也为开发者和用户提供了全新的工具来解决复杂问题。…

【实践】应用访问Redis突然超时怎么处理?

目录标题 问题描述分析过程查看监控数据系统监控指标JVM监控指标Redis监控指标分析应用异常单机异常规律集群异常规律统计超时的key 初步结论验证结论访问Redis链路slowlogRedis单节点info all定位redis节点定位异常keybigkeystcpdump定位大key影响 经验总结 问题描述 某产品线…

【验收交付资料】系统培训方案(doc原件)

1. 培训目的 2. 培训方式 3. 培训内容 4. 培训讲师 5. 培训教材 6. 培训质量保证 软件全套资料部分文档清单&#xff1a; 工作安排任务书&#xff0c;可行性分析报告&#xff0c;立项申请审批表&#xff0c;产品需求规格说明书&#xff0c;需求调研计划&#xff0c;用户需求调查…

Pikachu靶场之XSS

先来点鸡汤&#xff0c;少就是多&#xff0c;慢就是快。 环境搭建 攻击机kali 192.168.146.140 靶机win7 192.168.146.161 下载zip&#xff0c;pikachu - GitCode 把下载好的pikachu-master&#xff0c;拖进win7&#xff0c;用phpstudy打开网站根目录&#xff0c;.....再用…

豆包MarsCode编程助手:产品功能解析与应用场景探索!

随着现代技术的不断进化升级&#xff0c;人工智能正在逐步改变着我们的日常工作方式。特别是对于复杂的项目&#xff0c;代码编写、优化、调试、测试等环节充满挑战。为了简化这些环节、提高开发效率&#xff0c;许多智能编程工具应运而生&#xff0c;豆包MarsCode 编程助手就是…

nodejs基础教程之-异步编程promise/async/generator

1. 异步 所谓"异步"&#xff0c;简单说就是一个任务分成两段&#xff0c;先执行第一段&#xff0c;然后转而执行其他任务&#xff0c;等做好了准备&#xff0c;再回过头执行第二段,比如&#xff0c;有一个任务是读取文件进行处理&#xff0c;异步的执行过程就是下面…

二、Kubernetes中pod的管理及优化

一 kubernetes 中的资源 1.1 资源管理介绍 在kubernetes中&#xff0c;所有的内容都抽象为资源&#xff0c;用户需要通过操作资源来管理kubernetes。 kubernetes的本质上就是一个集群系统&#xff0c;用户可以在集群中部署各种服务 所谓的部署服务&#xff0c;其实就是在kub…

【运维监控】Prometheus+grafana监控zookeeper运行情况

运维监控系列文章入口&#xff1a;【运维监控】系列文章汇总索引 文章目录 一、prometheus二、grafana三、prometheus集成grafana监控zookeeper1、修改zookeeper配置2、修改prometheus配置3、导入grafana模板4、验证 本示例通过zookeeper自带的监控信息暴露出来&#xff0c;然后…

Ceisum(SuperMap iClient3D for Cesium)实现平面裁剪

1&#xff1a;参考API文档&#xff1a;SuperMap iClient3D for Cesium 开发指南 2&#xff1a;官网示例&#xff1a;support.supermap.com.cn:8090/webgl/Cesium/examples/webgl/examples.html#layer 3&#xff1a;SuperMap iServer&#xff1a;欢迎使用 SuperMap iServer 11…

C语言---函数指针基础总结万字(4)

一、 函数 1.函数是一段可以重复执行的代码。 它可以接受不同的参数&#xff0c; 完成对应的操作。 下面的例子就是一个函数 int plus(int n) {return n; }上面的代码声明了一个函数plus()。 2.函数声明的语法有以下几点&#xff0c;需要注意。 返回值类型。 函数声明时&a…

每日奇难怪题(持续更新)

1.以下程序输出结果是() int main() {int a 1, b 2, c 2, t;while (a < b < c) {t a;a b;b t;c--;}printf("%d %d %d", a, b, c); } 解析:a1 b2 c2 a<b 成立 ,等于一个真值1 1<2 执行循环体 t被赋值为1 a被赋值2 b赋值1 c-- c变成1 a<b 不成立…

【油猴脚本】00006 案例 Tampermonkey油猴脚本自定义表格列名称,自定义表格表头,自定义表格的thead里的td

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 【油…

数据结构一:绪论

&#xff08;一&#xff09;数据结构的基本概念 1.相关名词 【1】数据 1.信息的载体&#xff0c;描述客观事物 2.能被输入到计算机中 3.能被计算机程序识别和处理的符号的集合。 【2】数据元素 1.数据的一个“个体” 2.数据的基本单位 3.有时候也被称为元素、结点、顶点…