kafka的位移

文章目录

    • 概要
    • 消费位移
    • __consumer_offsets主题
    • 位移提交

概要

本文主要总结kafka的位移是如何管理的,在broker端如何通过命令行查看到位移信息,并从代码层面总结了位移的提交方式。

消费位移

对于 Kafka 中的分区而言,它的每条消息都有唯一offset ,用来表示消息在分区中对应位置;对于消费者来说,它也有 offset 的概念,消费者使用 offse 来表示消费到分区中某个消息所在的位置。可通过命令行在查看到一个群组,在topic中两者当前的位置
bin/kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group kafka-boot

[root@node1 kafka_2.13-3.2.1]# bin/kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe  --group kafka-boot

Consumer group 'kafka-boot' has no active members.

GROUP           TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
kafka-boot      test-error-topic 0          26              26              0               -               -               -
kafka-boot      normal-test      0          23              24              1               -               -               -

这里对offse 做些区分 对于消息在分区中的位置 CURRENT-OFFSET称为“偏移量” 或消息位移;对于消费者消费到的位置,LOG-END-OFFSET称为“位移 ,有时候也会更明确地称之为“消费位移“。

生产者位移跟消费者位移的关系可以用下图来说明:
在这里插入图片描述

总结几个需要注意的点:

  • 分区副本有两种类型
    领导者副本:生产者跟消费者的请求都只会经过领导者副本
    跟随者副本:首领之外的副本,不处理客户端请求,从领导者副本那里通过拉取的方式同步消息
  • 消费位移存储在Zookeeper或Kafka中,新消费者客户端,偏移量存储咋Kafka内部主题 __consumer_offsets
  • 消费者提交的位移是当前消费消息位移的下一个位置,即:lastConsumeedOffset+1

__consumer_offsets主题

Consumer需要向Kafka记录自己的位移数据,这个汇报过程称为提交位移(Committing Offsets)。

老版本 Consumer 的位移是提交到 ZooKeeper 中保存的。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得Kafka Broker 不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。

但是,ZooKeeper 其实并不适用于这种高频的写操作,Kafka 社区自 0.8.2.x 版本开始推出了全新的位移管理
机制,将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,
__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。这种方式能够满足高频的写操作。

两个相关参数:
offsets.topic.num.partitions : 设置 __consumer_offsets主题的分区数,默认是50个分区
offsets.topic.replication.factor : 设置__consumer_offsets主题的副本数,默认是3(下载安装的包中此值可能为1 )

当Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题

一共有50个分区,那么消费者将位移提交到了哪个分区呢?

通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker
就是这个consumer group的coordinator
公式:Math.abs(groupID.hashCode()) % numPartitions

Kafka 1.0.2及以后提供了kafka_consumer_groups.sh脚本供用户查看consumer信息

1. 创建一个topic,分区数设置为1,副本数设置为1

[root@node1 kafka_2.13-3.2.1]# bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic test-offset --partitions 1 --replication-factor 1
Created topic test-offset.

[root@node1 kafka_2.13-3.2.1]# bin/kafka-topics.sh --bootstrap-server node1:9092 --describe --topic test-offset
Topic: test-offset      TopicId: in6gxQ5OQS6x9R8V3oJ7AQ PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: test-offset      Partition: 0    Leader: 0       Replicas: 0     Isr: 0

2. 向主题test-offset中发送消息

[root@node1 kafka_2.13-3.2.1]# bin/kafka-console-producer.sh --broker-list node1:9092 --topic test-offset
>hello

3. 创建一个消费组,并从头开始消费

[root@node1 kafka_2.13-3.2.1]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092  --from-beginning --consumer-property group.id=testOffsetGroup   --topic test-offset
hello

4. 用代码根据上面的公式计算消费组testOffsetGroup提交位移的分区数

@Test
void getCommitOffsetPartitionTest() {
    String groupId = "testOffsetGroup";
    // 运行结果为16
    System.out.println(Math.abs(groupId.hashCode() % 50));
}
  1. 将kafka配置文件consumer.properties中设置exclude.internal.topics=false,并重启服务
    6. 查看主题__consumer_offsets第16分区上的信息,可以看到消费组testOffsetGroup提交的位移确实保存在了16分区上

[root@node1 kafka_2.13-3.2.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 16 --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896116191, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896121189, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896126188, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896131188, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896133573, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896162124, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896167124, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896172123, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896177124, expireTimestamp=None)
[testOffsetGroup,test-offset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1691896178781, expireTimestamp=None)

从上面也可看出__consumer_offsets topic的每一日志项的格式都是:
[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

客户端提交消费位移是使用OffsetCommitRequest 请求实现的,其结构如下
在这里插入图片描述

__consumer_offsets这个主题中的消息格式为KV对,key为[Group, Topic, Partition],value可以简单理解为记录了偏移量;这样的记录方式,使得broker端不需要关系group下有多少个消费者,新增消费者或者减少消费者发生重平衡时,都能准确地定位到对应地分区应该从哪个位置开始消费。

位移提交

鉴于位移提交甚至是位移管理对 Consumer 端的巨大影响,Kafka,特别是KafkaConsumer API,提供了多种提交位移的方法。从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。

自动提交

自动提交,就是指 Kafka Consumer 在后台默默地为你提交位移
两个重要的参数

  • enable.auto.commit设置是否自动提交位移,默认是true
  • auto.commit.interval.ms:设置自动提交为true时,该参数生效,标识多久提交一次位移,默认5s,
public static void main(String[] args) {
      Map<String, Object> configs = new HashMap<>();
      configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
      configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
      configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
      configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");
      // 设置偏移量自动提交。自动提交是默认值。这里做示例。
      configs.put("enable.auto.commit", "true");
      // 偏移量自动提交的时间间隔
      configs.put("auto.commit.interval.ms", "2000");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(configs);
      consumer.subscribe(Collections.singleton("tp_demo_01"));

      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
          for (ConsumerRecord<String, String> record : records) {
              System.out.println(record.topic()
                      + "\t" + record.partition()
                      + "\t" + record.offset()
                      + "\t" + record.key()
                      + "\t" + record.value());
          }
      }
 }

设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但是会出现消息重复消费

在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

手动同步提交

开启手动提交位移的方法就是设置enable.auto.commit 为 false。但是,仅仅设置它为 false 还不够,因为你只是告诉
Kafka Consumer 不要自动提交位移而已,你还需要调用相应的 API 手动提交位移。

public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");

   configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(configs);
    consumer.subscribe(Collections.singleton("tp_demo_01"));
    while (true) {
        ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofSeconds(1));
        process(records); // 处理消息
        try {
            consumer.commitSync();
        } catch (CommitFailedException e) {
           handle(e); // 处理提交失败异常
        }
    }
}

调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束,这样就会影响TPS。

鉴于此问题,还有另外一个提交方式

手动异步提交

public static void main(String[] args) {
   Map<String, Object> configs = new HashMap<>();
   configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
   configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

   configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
   configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
   configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");
   configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
   KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(configs);
   while (true) {
       ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
       process(records); // 处理消息
       consumer.commitAsync((offsets, exception) -> {
           if (exception != null) {
               handle(exception);
           }
       });
   }
}

commitAsync 是否能够替代 commitSync 呢?答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

是手动提交,需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果,原因有两个:

  1. 利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。这些问题都是短暂的,自动重试通常都会成功。
  2. 不希望程序总处于阻塞状态,影响 TPS。
public static void main(String[] args) {
   Map<String, Object> configs = new HashMap<>();
   configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
   configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

   configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
   configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
   configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "con1");
   configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
   KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
           String>(configs);
   consumer.subscribe(Collections.singleton("tp_demo_01"));
   try {
       while (true) {
           ConsumerRecords<String, String> records =
                   consumer.poll(Duration.ofSeconds(1));
           consumer.commitAsync();
           process(records); // 处理消息
           consumer.commitAsync(); // 异步提交
       }
   } catch (Exception e) {
       handle(e); // 处理异常
   } finally {
       try {
           consumer.commitSync();// 最后一次提交使用同步阻塞式提交
       } finally {
           consumer.close();
       }

   }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/75652.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【科研论文配图绘制】task1 掌握科研绘图的基本知识

【科研论文配图绘制】task1 掌握科研绘图的基本知识 写在最前 8月份Datawhale组队学习&#xff0c;写下该博客记录学习内容 1.科研论文配图的分类与构成 2.科研论文配图的格式和尺寸 3.科研论文配图中的字体和字号设置 4.科研论文配图的版式设计、结构布局和颜色搭配 占个…

Postman接口自动化测试实战,从0到1一篇彻底打通...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 postman中的测试 …

6.2.0在线编辑:GrapeCity Documents for Word (GcWord) Crack

GrapeCity Word 文档 (GcWord) 支持 Office Math 函数以及转换为 MathML GcWord 现在支持在 Word 文档中创建和编辑 Office Math 内容。GcWord 中的 OMath 支持包括完整的 API&#xff0c;可处理科学、数学和通用 Word 文档中广泛使用的数学符号、公式和方程。以下是通过 OMa…

2022年3月全国计算机等级考试真题(二级C语言)

2022年3月全国计算机等级考试真题&#xff08;二级C语言&#xff09; 第1题 下列有关栈论述正确的是&#xff08; &#xff09; A. 栈顶元素最先能被删除 B. 栈顶元素最后才被删除 C. 栈底元素永远不能被删除 D. 以上三种说法都不对 正确答案&#xff1a;A 得 0 / 1 分 第2题…

【C# 基础精讲】异常的类型和处理方法

异常&#xff08;Exception&#xff09;是在程序执行过程中发生的意外或异常情况&#xff0c;例如除零错误、空引用访问、文件不存在等。在C#及其他编程语言中&#xff0c;异常处理是一种重要的机制&#xff0c;用于捕获和处理程序运行时可能出现的错误&#xff0c;以保证程序的…

考研 408 | 【计算机网络】 应用层

导图 网络应用模型 客户/服务器&#xff08;c/s&#xff09;模型 P2P模型 DNS 域名 域名服务器 域名解析过程 文件传输协议FTP FTP服务器和用户端 FTP工作原理 电子邮件 电子邮件的信息格式 组成结构 邮件服务器的功能&#xff1a; 1.发送&接收邮件 2.给发件人报告邮…

vue基础知识三:v-show和v-if有什么区别?使用场景分别是什么?

一、v-show与v-if的共同点 我们都知道在 vue 中 v-show 与 v-if 的作用效果是相同的(不含v-else)&#xff0c;都能控制元素在页面是否显示 在用法上也是相同的 <Model v-show"isShow" /> <Model v-if"isShow" />当表达式为true的时候&#…

数据结构--最短路径 Floyd算法

数据结构–最短路径 Floyd算法 F l o y d 算法&#xff1a;求出每⼀对顶点之间的最短路径 \color{red}Floyd算法&#xff1a;求出每⼀对顶点之间的最短路径 Floyd算法&#xff1a;求出每⼀对顶点之间的最短路径 使⽤动态规划思想&#xff0c;将问题的求解分为多个阶段 对于n个顶…

视频汇聚平台EasyCVR视频监控播放平台WebRTC流地址无法播放的问题解决方案

开源EasyDarwin视频监控TSINGSEE青犀视频平台EasyCVR能在复杂的网络环境中&#xff0c;将分散的各类视频资源进行统一汇聚、整合、集中管理&#xff0c;在视频监控播放上&#xff0c;TSINGSEE青犀视频安防监控汇聚平台可支持1、4、9、16个画面窗口播放&#xff0c;可同时播放多…

NRF24L01+数据手册_关于几种工作模式

使用的是官方数据手册的章节编号&#xff0c;原文截图方便对照&#xff0c;部分翻译&#xff08;标蓝&#xff09;、个人理解&#xff08;标紫&#xff09;&#xff0c;关键信息&#xff08;标红&#xff09;。 6.1 Operational Modes操作模式 6.1.1 State diagram状态机图 6…

Android Socket使用TCP协议实现手机投屏

本节主要通过实战来了解Socket在TCP/IP协议中充当的是一个什么角色&#xff0c;有什么作用。通过Socket使用TCP协议实现局域网内手机A充当服务端&#xff0c;手机B充当客户端&#xff0c;手机B连接手机A&#xff0c;手机A获取屏幕数据转化为Bitmap&#xff0c;通过Socket传递个…

校园外卖小程序怎么做

校园外卖小程序是为满足校园内学生和教职员工的外卖需求而开发的一种应用程序。它涵盖了从用户端、商家端、骑手端、电脑管理员到小票打印、多商户入驻等多个方面的功能&#xff0c;以下将逐一介绍。 1. 用户端功能&#xff1a;校园外卖小程序为用户提供了便捷的订餐和外卖服务…

21.0 CSS 介绍

1. CSS层叠样式表 1.1 CSS简介 CSS(层叠样式表): 是一种用于描述网页上元素外观和布局的样式标记语言. 它可以与HTML结合使用, 通过为HTML元素添加样式来改变其外观. CSS使用选择器来选择需要应用样式的元素, 并使用属性-值对来定义这些样式.1.2 CSS版本 CSS有多个版本, 每个…

《测试设计思想》——图书推荐

前言&#xff1a; 在当今软件行业飞速发展的时代&#xff0c;软件测试的重要性日益凸显。为了帮助读者提高测试效率和测试质量&#xff0c;清华大学出版社推出了一本名为《测试设计思想》的书籍&#xff0c;由知名专家周海旭老师撰写。这本书深入探讨了测试设计的思想和方法&am…

easyx图形库基础:3实现弹球小游戏

实现弹球小游戏 一.实现弹球小游戏:1.初始化布&#xff1a;2.初始化一个球的信息&#xff1a;3.球的移动和碰撞反弹4.底边挡板的绘制和移动碰撞重置数据。 二.整体代码&#xff1a; 一.实现弹球小游戏: 1.初始化布&#xff1a; int main() {initgraph(800, 600);setorigin(40…

P16 电路定理——巧妙-灵性-智慧

1、诺顿定理的证明 诺顿定理的证明&#xff0c; 回忆戴维南定理的证明是&#xff0c;在a,b两端加上一个电流源&#xff0c;再根据叠加定理&#xff0c;就解电压Uab。 对偶原理&#xff1a; 在a,b两端加上一个电压源u&#xff0c;再根据叠加定理求A中的独立源作用是给到a&#x…

【爬虫】P1 对目标网站的背景调研(robot.txt,advanced_search,builtwith,whois)

对目标网站的背景调研 检查 robot.txt估算网站大小识别网站所用技术寻找网站的所有者 检查 robot.txt 目的&#xff1a; 大多数的网站都会包含 robot.txt 文件。该文件用于指出使用爬虫爬取网站时有哪些限制。而我们通过读 robot.txt 文件&#xff0c;亦可以最小化爬虫被封禁的…

观察者模式实战

场景 假设创建订单后需要发短信、发邮件等其它的操作&#xff0c;放在业务逻辑会使代码非常臃肿&#xff0c;可以使用观察者模式优化代码 代码实现 自定义一个事件 发送邮件 发送短信 最后再创建订单的业务逻辑进行监听&#xff0c;创建订单 假设后面还需要做其它的…

循环内的try-catch 跟循环外的try-catch有什么不一样

起因&#xff1a;一位面试管突然问了这么一道基础的面试题&#xff0c;反而秀了面试者一脸&#xff0c;经常用的却被问到时不知道怎么回答&#xff0c;所以我们平时在写代码的时候&#xff0c;要多注意细节跟原理。也许你不服&#xff1a;不就是先这样&#xff0c;再那样&#…

探讨uniapp的navigator 页面跳转问题

navigator 页面跳转。该组件类似HTML中的<a>组件&#xff0c;但只能跳转本地页面。目标页面必须在pages.json中注册。 "tabBar": {"color": "#7A7E83","selectedColor": "#3cc51f","borderStyle": "bl…