kafka进阶_3.消费消息

文章目录

  • 一、消费消息概览
    • 1.1、基本代码
    • 1.2、消费过程
  • 二、消费者组
    • 2.1、push & pull
    • 2.2、消费者组
  • 三、调度器Coordinator
  • 四、消费者分配策略
  • 五、偏移量offset
    • 5.1、起始偏移量
    • 5.2、指定偏移量消费
    • 5.3、偏移量提交
      • 5.3.1、自动提交
      • 5.3.2、手动提交
    • 5.4、偏移量的保存
  • 六、消费者事务

如果想了解kafka基础架构、生产者架构和kafka存储消息可以参考 kafka基础、 Kafka进阶_1.生产消息和 kafka进阶_2.存储消息。

一、消费消息概览

1.1、基本代码

public class KafkaConsumerTest {
    public static void main(String[] args) {
        // TODO 创建消费者配置参数集合
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // TODO 通过配置,创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
        // TODO 订阅主题
        consumer.subscribe(Collections.singletonList("test"));
        // TODO 消费数据
        final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
        // TODO 遍历数据
        for (ConsumerRecord<String, String> record : poll) {
            System.out.println( record.value() );
        }
        // TODO 关闭消费者
        consumer.close();
    }
}

主要的属性配置:

参数名参数作用类型推荐值
bootstrap.servers集群地址必须
key.deserializer对数据Key进行反序列化的类完整名称必须Kafka提供的字符串反序列化类:StringSerializer
value.deserializer对数据Value进行反序列化的类完整名称必须Kafka提供的字符串反序列化类:ValueSerializer
group.id消费者组ID,用于标识完整的消费场景,一个组中可以包含多个不同的消费者对象

1.2、消费过程

  消费者消费数据时,一般情况下,只是设定了订阅的主题名称,那是如何消费到数据的呢。我们这里说一下服务端拉取数据的基本流程:

在这里插入图片描述

  1. 服务端获取到用户拉取数据的请求:Kafka消费客户端会向Broker发送拉取数据的请求FetchRequest,服务端Broker获取到请求后根据请求标记FETCH交给应用处理接口KafkaApis进行处理。
  2. 通过副本管理器拉取数据:副本管理器需要确定当前拉取数据的分区,然后进行数据的读取操作。
  3. 判定首选副本:2.4版本前,数据读写的分区都是Leader分区,从2.4版本后,kafka支持Follower副本进行读取。主要原因就是跨机房或者说跨数据中心的场景,为了节约流量资源,可以从当前机房或数据中心的副本中获取数据。这个副本称之为首选副本。
  4. 拉取分区数据:Kafka的底层读取数据是采用日志段LogSegment对象进行操作的。
  5. 零拷贝:为了提高数据读取效率,Kafka的底层采用nio提供的FileChannel零拷贝技术,直接从操作系统内核中进行数据传输,提高数据拉取的效率。

二、消费者组

  从数据处理的角度来讲,消费者和生产者的处理逻辑都相对比较简单。Producer生产者的基本数据处理逻辑就是向Kafka发送数据,并获取Kafka的数据接收确认响应。

在这里插入图片描述

而消费者的基本数据处理逻辑就是向Kafka请求数据,并获取Kafka返回的数据。

在这里插入图片描述
  逻辑确实很简单,但是Kafka为了能够构建高吞吐,高可靠性,高并发的分布式消息传输系统,所以在很多细节上进行了扩展和改善:比如生产者可以指定分区,可以异步和同步发送数据,可以进行幂等性操作和事务处理。对应的,消费者功能和处理细节也进行了扩展和改善。

2.1、push & pull

  Kafka的主题如果就一个分区的话,那么在硬件配置相同的情况下,消费者Consumer消费主题数据的方式没有什么太大的差别。

在这里插入图片描述
  不过,Kafka为了能够构建高吞吐,高可靠性,高并发的分布式消息传输系统,它的主题是允许多个分区的,那么就会发现不同的消费数据的方式区别还是很大的。

1、如果数据由Kafka进行推送(push),那么多个分区的数据同时推送给消费者进行处理,明显一个消费者的消费能力是有限的,那么消费者无法快速处理数据,就会导致数据的积压,从而导致网络、存储等资源造成极大的压力,影响吞吐量和数据传输效率。

在这里插入图片描述
2.如果kafka的分区数据在内部可以存储的时间更长一些,再由消费者根据自己的消费能力向kafka申请(拉取)数据,那么整个数据处理的通道就会更顺畅一些。Kafka的Consumer就采用的这种拉取数据的方式。

在这里插入图片描述

2.2、消费者组

  消费者可以根据自身的消费能力主动拉取Kafka的数据,但是毕竟自身的消费能力有限,如果主题分区的数据过多,那么消费的时间就会很长。对于kafka来讲,数据就需要长时间的进行存储,那么对Kafka集群资源的压力就非常大。如果希望提高消费者的消费能力,并且减少kafka集群的存储资源压力。所以有必要对消费者进行横向伸缩,从而提高消息消费速率。

在这里插入图片描述
  不过这么做有一个问题,就是每一个消费者是独立,那么一个消费者就不能消费主题中的全部数据,简单来讲,就是对于某一个消费者个体来讲,主题中的部分数据是没有消费到的,也就会认为数据丢了,这个该如何解决呢?那如果我们将这多个消费者当成一个整体,是不是就可以了呢?这就是所谓的消费者组 Consumer Group。在kafka中,每个消费者都对应一个消费组,消费者可以是一个线程,一个进程,一个服务实例,如果kafka想要消费消息,那么需要指定消费哪个topic的消息以及自己的消费组id(groupId)。

在这里插入图片描述

三、调度器Coordinator

  消费者想要拉取数据,首先必须要加入到一个组中,成为消费组中的一员,同样道理,如果消费者出现了问题,也应该从消费者组中剥离。而这种加入组和退出组的处理,都应该由专门的管理组件进行处理,这个组件在kafka中,我们称之为消费者组调度器(Group Coordinator)。Group Coordinator是Broker上的一个组件,用于管理和调度消费者组的成员、状态、分区分配、偏移量等信息。每个Broker都有一个Group Coordinator对象,负责管理多个消费者组,但每个消费者组只有一个Group Coordinator。

在这里插入图片描述

四、消费者分配策略

五、偏移量offset

  偏移量offset是消费者消费数据的一个非常重要的属性。默认情况下,消费者如果不指定消费主题数据的偏移量,那么消费者启动消费时,无论当前主题之前存储了多少历史数据,消费者只能从连接成功后当前主题最新的数据偏移位置读取,而无法读取之前的任何数据,如果想要获取之前的数据,就需要设定配置参数或指定数据偏移量。

5.1、起始偏移量

在消费者的配置中,我们可以增加偏移量相关参数auto.offset.reset,用于从最开始获取主题数据:

public class KafkaConsumerTest {
    public static void main(String[] args) {

        // TODO 创建消费者配置参数集合
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        paramMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // TODO 通过配置,创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
        // TODO 订阅主题
        consumer.subscribe(Arrays.asList("test"));

        while ( true ) {
            // TODO 消费数据
            final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));

            // TODO 遍历数据
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println( record );
            }
        }
    }
}

auto.offset.reset有三种取值:

1、earliest
对于同一个消费者组,从头开始消费。就是说如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费(未提交偏移量的场合)。

在这里插入图片描述
2、latest
对于同一个消费者组,消费者只能消费到连接topic后,新产生的数据(未提交偏移量的场合)。

在这里插入图片描述
3、none:生产环境不使用。

5.2、指定偏移量消费

除了上一节所讲的从最开始的偏移量或最后的偏移量读取数据以外,Kafka还支持从指定偏移量的位置开始消费数据:

public class KafkaConsumerOffsetTest {
    public static void main(String[] args) {

        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> c = new KafkaConsumer<String, String>(paramMap);

        // TODO 订阅主题
        c.subscribe(Collections.singletonList("test"));
        // TODO 拉取数据,获取基本集群信息
        c.poll(Duration.ofMillis(100));
        // TODO 根据集群的基本信息配置需要消费的主题及偏移量
        final Set<TopicPartition> assignment = c.assignment();
        for (TopicPartition topicPartition : assignment) {
            if ( topicPartition.topic().equals("test") ) {
                c.seek(topicPartition, 0);
            }
        }
        // TODO 拉取数据
        while (true) {
            final ConsumerRecords<String, String> poll = c.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println( record.value() );
            }
        }
    }
}

5.3、偏移量提交

  生产环境中,消费者可能因为某些原因或故障重新启动消费,那么如果不知道之前消费数据的位置,重启后再消费,就可能重复消费(earliest)或漏消费(latest)。所以Kafka提供了保存消费者偏移量的功能,而这个功能需要由消费者进行提交操作。这样消费者重启后就可以根据之前提交的偏移量进行消费了。

注意:一旦消费者提交了偏移量,那么kafka会优先使用提交的偏移量进行消费。此时,auto.offset.reset参数是不起作用的。

5.3.1、自动提交

  所谓的自动提交就是消费者消费完数据后,无需告知kafka当前消费数据的偏移量,而是由消费者客户端API周期性地将消费的偏移量提交到Kafka中。这个周期默认为5000ms,可以通过配置进行修改。

public class KafkaConsumerCommitAutoTest {
    public static void main(String[] args) {

        // TODO 创建消费者配置参数集合
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // TODO 启用自动提交消费偏移量,默认取值为true
        paramMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // TODO 设置自动提交offset的时间周期为1000ms,默认5000ms
        paramMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // TODO 通过配置,创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
        // TODO 订阅主题
        consumer.subscribe(Arrays.asList("test"));

        while ( true ) {
            // TODO 消费数据
            final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));

            // TODO 遍历数据
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println( record );
            }
        }
    }
}

5.3.2、手动提交

  基于时间周期的偏移量提交,是我们无法控制的,一旦参数设置的不合理,或单位时间内数据量消费的很多,却没有来及的自动提交,那么数据就会重复消费。所以Kafka也支持消费偏移量的手动提交,也就是说当消费者消费完数据后,自行通过API进行提交。不过为了考虑效率和安全,kafka同时提供了异步提交和同步提交两种方式供我们选择。注意:需要禁用自动提交ENABLE_AUTO_COMMIT_CONFIG=false,才能开启手动提交:

1、异步提交

  向Kafka发送偏移量offset提交请求后,就可以直接消费下一批数据,因为无需等待kafka的提交确认,所以无法知道当前的偏移量一定提交成功,安全性比较低,但相对,消费性能会提高

public class KafkaConsumerCommitASyncTest {
    public static void main(String[] args) {

        // TODO 创建消费者配置参数集合
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // TODO 禁用自动提交消费偏移量,默认取值为true
        paramMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        paramMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        paramMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // TODO 通过配置,创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(paramMap);
        // TODO 订阅主题
        consumer.subscribe(Arrays.asList("test"));
        while ( true ) {
            // TODO 消费数据
            final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
            // TODO 遍历处理数据
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println( record );
            }
            // TODO 异步提交偏移量
            //     此处需要注意,需要在拉取数据完成处理后再提交
            //     否则提前提交了,但数据处理失败,下一次消费数据就拉取不到了
            consumer.commitAsync();
        }
    }
}

2、同步提交

  必须等待Kafka完成offset提交请求的响应后,才可以消费下一批数据,一旦提交失败,会进行重试处理,尽可能保证偏移量提交成功,但是依然可能因为意外情况导致提交请求失败。此种方式消费效率比较低,但是安全性高。

5.4、偏移量的保存

  由于消费者在消费消息的时候可能会由于各种原因而断开消费,当重新启动消费者时我们需要让它接着上次消费的位置offset继续消费,因此消费者需要实时的记录自己以及消费的位置。0.90版本之前,这个信息是记录在zookeeper内的,在0.90之后的版本,offset保存在_consumer_offsets这个topic内。

  每个consumer会定期将自己消费分区的offset提交给_consumer_offsets,提交过去的时候,key是consumerGroupId+topic+分区号

在这里插入图片描述

六、消费者事务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/922256.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(笔记,自己可见_1)简单了解ZYNQ

1、zynq首先是一个片上操作系统&#xff08;Soc&#xff09;&#xff0c;结合了arm&#xff08;PS&#xff09;和fpga&#xff08;PL&#xff09;两部分组成 Zynq系统主要由两部分组成&#xff1a;PS&#xff08;Processing System&#xff09;和PL&#xff08;Programmable L…

c语言的qsort函数理解与使用

介绍&#xff1a;qsort 函数是 C 标准库中用于排序的快速排序算法函数。它的用法非常灵活&#xff0c;可以对任意类型的元素进行排序&#xff0c;只要提供了比较函数即可。 qsort 函数原型及参数解释&#xff1a; void qsort ( void* base, //指向要排序的数组的首元素…

【淘汰9成NLP面试者的高频面题】LSTM中的tanh和sigmoid分别用在什么地方?为什么?

博客主页&#xff1a; [青松] 本文专栏: NLP 大模型百面百过 【淘汰9成NLP面试者的高频面题】LSTM中的tanh和sigmoid分别用在什么地方&#xff1f;为什么&#xff1f; 重要性&#xff1a;★★★ &#x1f4af; 本题主要考察面试者对以下问题的理解&#xff1a; ① 数据特征和模…

JWT加解密应用方案设计与实现

为什么要用令牌技术&#xff1f; 这个问题其实问的就是Cookice、Session、Token(令牌)之间的区别了。 首先&#xff0c;存放的位置做一下比较&#xff0c;Cookice小饼干存放在客户端的浏览器当中&#xff0c;Session会话存放在服务器线程当中(本质上还是需要利用Cookice实现)…

数据集-目标检测系列- 安全背心 检测数据集 safety_vests >> DataBall

数据集-目标检测系列- 安全背心 检测数据集 safety DataBall 助力快速掌握数据集的信息和使用方式&#xff0c;会员享有 百种数据集&#xff0c;持续增加中。 贵在坚持&#xff01; 数据样例项目地址&#xff1a; * 相关项目 1&#xff09;数据集可视化项目&#xff1a;gi…

C语言菜鸟入门·关键字·int的用法

目录 1. int关键字 1.1 取值范围 1.2 符号类型 1.3 运算 1.3.1 加法运算() 1.3.2 减法运算(-) 1.3.3 乘法运算(*) 1.3.4 除法运算(/) 1.3.5 取余运算(%) 1.3.6 自增()与自减(--) 1.3.7 位运算 2. 更多关键字 1. int关键字 int 是一个关键字&#xff0…

unity中:超低入门级显卡、集显(功耗30W以下)运行unity URP管线输出的webgl程序有那些地方可以大幅优化帧率

删除Global Volume&#xff1a; 删除Global Volume是一项简单且高效的优化措施。实测表明&#xff0c;这一改动可以显著提升帧率&#xff0c;甚至能够将原本无法流畅运行的场景变得可用。 更改前的效果&#xff1a; 更改后的效果&#xff1a; 优化阴影和材质&#xff1a; …

Vue + Websocket播放PCM(base64转ArrayBuffer、 字符串转ArrayBuffer)

文章目录 引言I 音视频处理相关概念和APIII 案例:基于开源库 pcm-player方式播放借助MediaSource和Audio对象播放音频流。基于原生api AudioContext 播放操作III 格式转换js字符串转ArrayBufferbase64 转 ArrayBufferIV 解决pcm-player分片播放问题引言 需求: 基于webscoket传…

【JavaEE进阶】SpringBoot 快速上⼿

了解Maven,并配置国内源 使⽤SpringBoot创建⼀个项⽬, 输出HelloWorld 一、Maven 1.什么是Maven 官⽅对于Maven的描述: Apache Maven is a software project management and comprehension tool. Based on the concept of a project object model (POM), Maven can man…

QT QFormLayout控件 全面详解

本系列文章全面的介绍了QT中的57种控件的使用方法以及示例&#xff0c;包括 Button(PushButton、toolButton、radioButton、checkBox、commandLinkButton、buttonBox)、Layouts(verticalLayout、horizontalLayout、gridLayout、formLayout)、Spacers(verticalSpacer、horizonta…

PCA算法所体现的核心数学思维

一、PCA算法的基本思想 PCA算法的核心思想是通过线性变换&#xff0c;将数据从原始的高维空间投影到低维空间&#xff0c;同时尽可能保留数据的主要变异性。这种变换是通过找到一组新的坐标轴&#xff08;即主成分&#xff09;来实现的&#xff0c;这些坐标轴是原始数据空间的…

如何解决pdf.js跨域从url动态加载pdf文档

摘要 当我们想用PDF.js从URL加载文档时&#xff0c;将会因遇到跨域问题而中断&#xff0c;且是因为会触发了PDF.js和浏览器的双重CORS block&#xff0c;这篇文章将会介绍&#xff1a;①如何禁用pdf.js的跨域&#xff1f;②如何绕过浏览器的CORS加载URL文件&#xff1f;②如何使…

C语言数据结构——详细讲解 双链表

从单链表到双链表&#xff1a;数据结构的演进与优化 前言一、单链表回顾二、单链表的局限性三、什么是双链表四、双链表的优势1.双向遍历2.不带头双链表的用途3.带头双链表的用途 五、双链表的操作双链表的插入操作&#xff08;一&#xff09;双链表的尾插操作&#xff08;二&a…

Java小白成长记(创作笔记二)

目录 序言 思维导图 续 用户登录/注册 数据表 实体层 持久层 服务层 认证与授权 整合springsecurity controller注册测试 controller登录测试 跨域解决 方法 Java小白成长记&#xff08;创作笔记一&#xff09; Java小白成长记&#xff08;创作笔记二&#xff09;…

案例研究|阿特斯的JumpServer分布式部署和多组织管理实践

苏州阿特斯阳光电力科技有限公司&#xff08;以下简称为阿特斯&#xff09;是一家集太阳能光伏组件制造和为全球客户提供太阳能应用产品研发、设计、制造、销售的专业公司。 阿特斯集团总部位于加拿大&#xff0c;中国区总部位于江苏省苏州市。通过全球战略和多元化的市场布局…

20241123-四元数高阶奇异值分解-(1)

四元数高阶奇异值分解及其在彩色图像处理中的应用-(1) &#x1f4d4; 声明 &#x1f1e8;&#x1f1f3; : 1️⃣ &#x1f4c3; 原文网址链接: 四元数高阶奇异值分解及其在彩色图像处理中的应用 - ScienceDirect &#x1f517; Quaternion … image processing (arxiv.org) ​ …

游戏引擎学习第20天

视频参考:https://www.bilibili.com/video/BV1VkBCYmExt 解释 off-by-one 错误 从演讲者的视角&#xff1a;对代码问题的剖析与修复过程 问题的起因 演讲者提到&#xff0c;他可能无意中在代码中造成了一个错误&#xff0c;这与“调试时间标记索引”有关。他发现了一个逻辑问题…

python开发之Linux

文章目录 1. 基础2. 进阶链接压缩/解压缩 文件权限用户远程操作编辑文件软件安装 1. 基础 # 查看当前目录下文件 ls# 查看当前目录 pwd# 清除界面内容 clear# 切换目录 cd# 创建目录 mkdir# 创建文件 touch 文件 vi 文件# 强制删除 rm -rf # 复制文件 cp 复制文件 复制文件路径…

Docker2:docker快速入门(部署MySQL)

欢迎来到“雪碧聊技术”CSDN博客&#xff01; 在这里&#xff0c;您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者&#xff0c;还是具有一定经验的开发者&#xff0c;相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导&#xff0c;我将…

oracle的静态注册和动态注册

oracle的静态注册和动态注册 静态注册&#xff1a; 静态注册 : 指将实例的相关信息手动告知 listener 侦 听 器 &#xff0c; 可以使用netmgr,netca,oem 以及直接 vi listener.ora 文件来实现静态注册&#xff0c;在动态注册不稳定时使用&#xff0c;特点是&#xff1a;稳定&…