Kafka-消费者-KafkaConsumer分析

与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。

为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。

这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。

例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单、快速。

还可以使用两个线程池实现“生产者—消费者”模式,解耦消息消费和消息处理的逻辑。

其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,而另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。

下面开始对KafkaConsumer的分析。

KafkaConsumer实现了Consumer接口,Consumer接口中定义了KafkaConsumer对外的API,其核心方法可以分为下面六类。

  • subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。
  • assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥。
  • commit*()方法:提交消费者已经消费完成的offset。
  • seek*()方法:指定消费者起始消费的位置。
  • poll()方法:负责从服务端获取消息。
  • pause()、resume()方法:暂停/继续Consumer,暂停后poll方法会返回空。

了解了Consumer接口定义的功能之后,我们下面就来分析KafkaConsumer的具体实现。首先,我们需要了解KafkaConsumer中重要的字段,如图所示。

在这里插入图片描述

  • PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
  • clientld:Consumer的唯一标示。
  • coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,可以将其理解成Consumer与服务端GroupCoordinator通信的门面。
  • keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
  • fetcher:负责从服务端获取消息。
  • interceptors:Consumerlnterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。
  • client:负责消费者与Kafka服务端的网络通信。
  • subscriptions:维护了消费者的消费状态。
  • metadata:记录了整个Kafka集群的元信息。
  • currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已。

在后面的分析过程中,我们会逐个分析KafkaConsumer依赖的组件的功能和实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/330110.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ChatGPT 商业提示词攻略书

原文:ChatGPT Business Prompt Playbook 译者:飞龙 协议:CC BY-NC-SA 4.0 一、书系介绍 人工智能发展迅速。非常迅速。 所以我希望你做两件事: (1) 在 Twitter 上关注我:iamkylebalmer (2) 订阅我的免费电子邮件通…

爬虫-10-selenium自动化(2)

#内容:执行js,标签页切换,动作链,元素等待

给科研人的 ML 开源发布工具包

什么是开源发布工具包? 恭喜你的论文成功发表,这是一个巨大的成就!你的研究成果将为学界做出贡献。 其实除了发表论文之外,你还可以通过发布研究的其他部分,如代码、数据集、模型等,来增加研究的可见度和采…

葡萄酒术语“干”是什么意思呢?

一个初学品酒的人常常会感到力不从心,有如此多的术语,如甜、干、单宁、酒体等等,很容易让人迷失。嗯,就像情人眼里出西施一样,“好酒”因人而异。虽然品尝各种不同的葡萄酒是了解你喜欢什么的最好方法,但我…

leetcode热题100.路径总和 III

Problem: 437. 路径总和 III 文章目录 题目思路1复杂度1Code1思路2复杂度2Code2 题目 给定一个二叉树的根节点 root ,和一个整数 targetSum ,求该二叉树里节点值之和等于 targetSum 的 路径 的数目。 路径 不需要从根节点开始,也不需要在叶…

统计学-R语言-5.3

文章目录 前言分位数统计量的标准误总结 前言 本篇文章即为概率与分布的最后一篇文章。 分位数 分位数函数是累积分布函数的反函数。 p-分位数是具有这样性质的一个值:小于或等于它的概率为p。 根据定义,中位数即50%分位数。 分位数通常用于置信区间的…

如何解决分支机构无法连入总部采购管理系统的难题

案例背景: 某企业业务规模不断壮大,内部采购流程越发复杂,供应商资质情况各异难以管理,为提高内部采购效率和采购品质,优化供应链管理,确保采购环节公正透明可溯,该企业集中化部署了采购管理系…

Microsoft Word 删除空行

Microsoft Word 删除空行 1. 删除空行1.1. 替换1.2. 段落标记 References 1. 删除空行 1.1. 替换 1.2. 段落标记 特殊格式 -> 段落标记 References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/

<软考高项备考>《论文专题 - 73 风险管理(5)》

5 过程4-实施定量风险分析 5.1 问题 4W1H过程做什么是就已识别的单个项目风险和不确定性的其他来源对整体项目目标的影响进行定量分析的过程。作用:1、量化整体项目风险最大可能性;2、提供额外的定量风险信息,以支持风险应对规划。为什么做了解风险对项目整体目标…

K8S对外服务ingress

Sevice作用体现在两个方面 集群内部 不断跟踪pod的变化,更新endpoint中的pod对象,基于pod的ip地址不断发现的一种服务发现机制 集群外部 类似负载均衡器,把流量(ip端口),不涉及转发url(http ht…

Golang 搭建 WebSocket 应用(三) - 实现一个消息推送中心

有了前两篇的铺垫,相信大家已经对 Golang 中 WebSocket 的使用有一定的了解了, 今天我们以一个更加真实的例子来学习如何在 Golang 中使用 WebSocket。 需求背景 在实际的项目中,往往有一些任务耗时比较长,然后我们会把这些任务…

【创作活动】ChatGPT 和文心一言哪个更好用?

文章目录 文心一言优点缺点 ChatGPT优点缺点 Java编码能力比较对人工智能的看法 ChatGPT是由OpenAI开发的交互式AI大模型, 文心一言是由百度研发的知识增强大语言模型,本文从Java开发的角度对比一下哪个更好用(本文仅用于投稿CSDN创造活动&am…

2024年阿里云服务器4核8G配置活动价格多少钱?

阿里云服务器4核8g配置云服务器u1价格是955.58元一年,4核8G配置还可以选择ECS计算型c7实例、计算型c8i实例、计算平衡增强型c6e、ECS经济型e实例、AMD计算型c8a等机型等ECS实例规格,规格不同性能不同,价格也不同,阿里云服务器网al…

缺失msvcr120.dll怎么办,msvcr120.dll一键修复教程

在计算机系统运行过程中,遇到“msvcr120.dll丢失”的情况时常困扰着众多用户。这一现象不仅会导致部分软件无法正常启动或运行,还可能引发一系列连锁反应,影响到用户的日常操作体验。msvcr120.dll作为Microsoft Visual C Redistributable Pac…

使用easyexcel 导出多级表头demo

先看效果&#xff1a; 1、引入maven依赖 <!--EasyExcel --> <dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>3.2.1</version> </dependency> 2、实体类 package com.…

全球光伏知名企业-晶科能源联合泛微采知连,建立文控管理平台

晶科能源股份有限公司&#xff08;简称“晶科能源”&#xff09;是一家全球知名、极具创新力的太阳能科技企业。 &#xff08;图片素材来自晶科能源官网&#xff09; 公司战略性布局光伏产业链核心环节&#xff0c;聚焦光伏产品一体化研发制造和清洁能源整体解决方案提供&…

安全牧场,保障优质奶源 追溯羊奶品质

安全牧场&#xff0c;保障优质奶源 追溯羊奶品质 近年来&#xff0c;人们对食品安全和健康越来越关注&#xff0c;而安全牧场的兴起正能够满足人们对优质奶源的需求。安全牧场以严格的品质监控和科学的管理&#xff0c;为消费者提供可追溯的高品质羊奶产品。本文小编羊大师将为…

白山云基于StarRocks数据库构建湖仓一体数仓的实践

背景 随着每天万亿级别的业务数据流向数据湖&#xff0c;数据湖的弊端也逐渐凸显出来&#xff0c;例如&#xff1a; 数据入湖时效性差&#xff1a;数据湖主要依赖于离线批量计算&#xff0c;通常不支持实时数据更新&#xff0c;因此无法保证数据的强一致性&#xff0c;造成数…

openssl3.2 - 官方demo学习 - test - certs

文章目录 openssl3.2 - 官方demo学习 - test - certs概述笔记.sh的执行语句打印的方法要修改的实际函数END openssl3.2 - 官方demo学习 - test - certs 概述 官方demos目录有证书操作的例子 已经做了笔记 openssl3.2 - 官方demo学习 - certs 但是这个demos/certs目录的脚本,…