Kafka之Consumer原理

1. Kafka消息消费流程

kafka的消费流程,首先是producer生产消息经过处理后放入到Broker服务器中,然后进入到内存中,内存再进行刷盘到磁盘中,kafak提供了两种刷盘策略,同步刷盘(flush.message 一次IO刷盘多少消息)和异步刷盘(flush.ms多长时间刷盘一次)

 随后,消费者首先建立连接到网卡,到Broker服务器中获取消息,Broker调用sendFile函数进行零拷贝,将磁盘中的数据首先由DMA拷贝到内核态,进而零拷贝到网卡中,最后消费端还要维护一个Offset记录一下消息消费的位置,更新消费进度。

2. Offset的维护

Offset由上面我们了解到主要就是消费端消费消息的一个标记值,那么究竟是怎么存储的呢?值又是怎么更新的呢?

2.1 存储Offset信息

Offset是用于记录消费组中消费者的位置

__consumer_offsets(Topic)存储结构

__consumer_offsets中主要存储两种对象: groupMetadata(消费者信息、消费者编号 从这里知道消费者是消费哪个组的)、OffsetAndMetadata(分区以及offset)

2.2 找不到Offset

比如我们新加了一个消费者,这个消费者跟之前的Broker都没有建立连接,那么我们Broker中的Topic的Partion没有Offset记录怎么办?消费者从哪里开始消费?

在消费者中的API提供了一个参数的几种配置

auto.offset.reset=latest (默认值)消费最新的,不去消费历史记录
auto.offset.reset=earliest 消费最早的,可消费历史信息
auto.offset.reset=none 消费者组在服务端找不到 offset ,则报错

 2.3 更新Offset值

消费者的Offset是储存在Broker中的,由消费者上报给Broker

提交Offset的方式分为两种方式

  • enable.auto.commit=true  自动提交
    • auto.commit.interval.ms=5000 (默认值5秒钟)自动提交的频率
  • enable.auto.commit=false 手动提交 
    consumer.commitSync() 手动同步提交
    consumer.commitAsync() 手动异步提交

 Offset若不提交或提交失败,Broker的Offset就不会更新,消息会被重复消费

3. 消费者消费策略

我们从博客MQ之初识kafka-CSDN博客中已经了解到了有一个消费者组的概念,kafka中使用消费者组,主要是为了提升消费效率和吞吐量。同一个Group中的消费者,不能消费想通过的Partition

 从上图中,我们可以看出,消费者组中的消费者数量是不一定的,在topic中分区partition的数量也是不确定的,那么究竟采取怎样的策略去进行消费的呢?

3.1 范围策略

范围策略就是按范围连续分配,如下图所示,假如我们的消费者组订阅的主题中有5个分区,但是消费者只有两个,那么就会对消费者进行一个简单的排序,在前面的就多消费一个

3.2 轮询策略

轮询策略就是消费者排好序后你一个我一个

4. ReBalance分区再平衡机制

分区再平衡本质上是一种协议,用来管理Consumer与Partition的匹配关系

那么何时会发生Rebalance呢?

  • 消费者组的消费者数量发生变化
  • Topic的分区数发生变更

那么ReBlance是谁来执行的呢?执行流程又是怎么样的呢? kafka中提供了一个协调者Coordinator机制进行协调,具体执行流程如下:

  • Broker集群中的各个服务器会先初始化一个GroupCoordinator服务
  • Consumer Group中的每个消费者也会初始化一个consumerCoordinator属性
  • 在集群中找到一个话事人,具体计算公式是 partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount) 计算出分区后,然后这个分区对应的leader所在的broker就是话事人
  • 之后消费者开始到话事人处进行签到,让话事人知道有几个消费者
  • 这个协调者会在消费者中选一个组长出来
  • 这个组长上报一个partition的分配方案
  • 由协调者最后通知各个消费者的这个分区方案

整体流程图如下:

5. 总结

        本文主要讲了消费者消费消息的流程,使用offset保证消息消费的正确性,以及offset的存储,offset如果找不到的话的策略配置,还有offset的更新。随后又介绍了消费者组中的消费者和主题中的分区之间的消费策略,最后当消费者数量发生变化,或者分区增加的情况下,kafka采用分区再平衡机制进行维护,利用Coordinator协调者机制,来维护分区和消费者数量的一个平衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/676292.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PTA字符串删除

已知del_str为字符串str中要删除的子串&#xff0c;请结合所学字符串操作实现在str中删除所有del_str子串&#xff0c;并统计del_str在str中出现的次数。 输入格式: 123dufvdfv123dfljvb 123 输出格式: dufvdfvdfljvb #include<stdio.h> #include<string.h> in…

SSM物流管理系统的设计与实现-计算机毕业设计源码44323

摘 要 科技进步的飞速发展引起人们日常生活的巨大变化&#xff0c;电子信息技术的飞速发展使得电子信息技术的各个领域的应用水平得到普及和应用。信息时代的到来已成为不可阻挡的时尚潮流&#xff0c;人类发展的历史正进入一个新时代。在现实运用中&#xff0c;应用软件的工作…

Java编程常见问题汇总一

系列文章目录 文章目录 系列文章目录前言一、字符串连接误用二、错误的使用StringBuffer三、测试字符串相等性四、数字转换成字符串五、利用不可变对象(Immutable) 前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分…

企业数字化转型的测度难题:基于大语言模型的新方法与新发现

《经济研究》新文章《企业数字化转型的测度难题&#xff1a;基于大语言模型的新方法与新发现》运用机器学习和大语言模型构造一套新的企业数字化转型指标。理论分析和数据交叉验证均表明&#xff0c;构建的指标相对已有方法更准确&#xff1a; 1.第一步&#xff1a;选择“管理…

45.自定义线程池(三)-拒绝策略

拒绝策略采用函数式接口参数传入&#xff0c;策略模式 FunctionalInterface public interface RejectPolicy<T> {void reject(BlockingQueue<T> queue, T task); } package com.xkj.thread.pool;import com.aspose.words.Run; import lombok.extern.slf4j.Slf4j;…

拒绝服务攻击

文章目录 拒绝服务攻击概述拒绝服务攻击简介分布式拒绝服务攻击DDoS与DoS的关系拒绝服务攻击分类 剧毒包型拒绝服务攻击WinNuke攻击泪滴(Teardrop) 攻击Land 攻击Ping of death攻击循环攻击 风暴型拒绝服务攻击风暴型DoS攻击风暴型攻击用的分组直接风暴型DDoSPING风暴攻击(直接…

03_03_初识SpringAOP和应用

一、SpringAOP的初识与原理 1、概述 AOP&#xff1a;面向切面编程OOP&#xff1a;面向对象编程面相切面编程&#xff1a;是基于OOP基础之上的新编程思想&#xff0c;OOP面向的主要是对象是类&#xff0c;而AOP面向的主要对象是切面&#xff0c;它在处理日志、安全管理、事务管…

第二十六章CSS3续~

3.CSS3渐变属性 CSS3渐变(gradients)可以在两个或多个指定的颜色之间显示平稳的过渡。 以前&#xff0c;我们必须使用图像来实现这些效果。但是&#xff0c;通过使用CSS3渐变(gradients)&#xff0c;可以减少下载的事件和宽带的使用。由于渐变(gradient)是由浏览器生成的&…

MyBatis学习(二)--MyBatis获取参数值的两种方式

1、搭建新的module:mybatis_parameter MyBatis获取参数值的两种方式&#xff1a;${}和#{} ${}的本质就是字符串拼接&#xff0c;采用sql拼接&#xff0c;无法防止sql注入 #{}的本质就是占位符赋值 &#xff0c;采用预编译 防止sql注入 不同参数使用案例 2、单个字面量类型…

深度学习-06-手动进行反向传播

深度学习-06-手动进行反向传播 本文是《深度学习入门2-自製框架》 的学习笔记&#xff0c;记录自己学习心得&#xff0c;以及对重点知识的理解。如果内容对你有帮助&#xff0c;请支持正版&#xff0c;去购买正版书籍&#xff0c;支持正版书籍不仅是尊重作者的辛勤劳动&#xf…

O2O : Finetuning Offline World Models in the Real World

CoRL 2023 Oral paper code Intro 算法基于TD-MPC&#xff0c;利用离线数据训练世界模型&#xff0c;然后在线融合基于集成Q的不确定性估计实现Planning。得到的在线数据将联合离线数据共同训练目标策略。 Method TD-MPC TD-MPC由五部分构成: 状态特征提取 z h θ ( s ) …

Amazon Q Developer 实战:从新代码生成到遗留代码优化(下)

简述 本文是使用 Amazon Q Developer 探索如何在 Visual Studio Code 集成编程环境&#xff08;IDE&#xff09;&#xff0c;从新代码生成到遗留代码优化的续集。在上一篇博客《Amazon Q Developer 实战&#xff1a;从新代码生成到遗留代码优化&#xff08;上&#xff09;》中…

java基础篇(1)

JDK是什么?有哪些内容组成?JDK是Java开发工具包 JVM虚拟机: Java程序运行的地方 核心类库: Java已经写好的东西&#xff0c;我们可以直接用开发工具: javac、java、jdb、jhat.. JRE是什么?有哪些内容组成? JRE是Java运行环境 JVM、核心类库、运行工具 JDK&#xff0c;JRE&…

Linux网络编程:传输层协议|UDP|TCP

知识引入&#xff1a; 端口号&#xff1a; 当应用层获得一个传输过来的报文时&#xff0c;这时数据包需要知道&#xff0c;自己应该送往哪一个应用层的服务&#xff0c;这时就引入了“端口号”&#xff0c;通过区分同一台主机不同应用程序的端口号&#xff0c;来保证数据传输…

Java1.8基于BS版 vue+ uniapp+ springboot专业团队自主研发的一套上门家政APP系统成品源码,支持商用(后台端介绍)

Java1.8基于BS版 vue uniapp springboot专业团队自主研发的一套上门家政APP系统成品源码&#xff0c;支持商用&#xff08;后台端介绍&#xff09; 家政服务后台端 家政服务后台端是一个专为家政服务行业设计的管理系统&#xff0c;用于处理业务运营、用户端管理、师傅端调度、…

Spring boot 随笔 1 DatasourceInitializer

0. 为啥感觉升级了 win11 之后&#xff0c;电脑像是刚买回来的&#xff0c;很快 这篇加餐完全是一个意外&#xff1a;时隔两年半&#xff0c;再看 Springboot-quartz-starter 集成实现的时候&#xff0c;不知道为啥我的h2 在应用启动的时候&#xff0c;不能自动创建quartz相关…

FL Studio怎么给钢琴加延音 FL Studio怎么用钢琴做伴奏

在使用钢琴音色进行音乐创作的时候&#xff0c;可以对钢琴进行延音处理&#xff0c;这样处理的音色给人的感觉会更加的饱满丰富&#xff0c;同时&#xff0c;给钢琴加了延音之后&#xff0c;钢琴的声音时值也会相应的变长&#xff0c;听起来更加的柔和。今天就和大家讲一讲&…

STM32使用HAL库UART接收不定长数据-1

使用STM32的HAL库实现UART串口不定长数据的接收 使用STM32的UART接收数据的时候&#xff0c;经常会遇到接收长度不固定的数据&#xff0c;比如一帧数据可能是10个字节&#xff0c;也可能是12个字节。这种数据称为不定长数据。 现有的很多通信协议是不定长的&#xff0c;比如mo…

vue3_组件间通信方式

目录 一、父子通信 1.父传子&#xff08; defineProps&#xff09; 2.父传子&#xff08;useAttrs&#xff09; 3.子传父&#xff08;ref&#xff0c;defineExpose &#xff09; 4.子传父&#xff08;defineEmits&#xff09; 5.子传父&#xff08;v-model&#xff09; …

数据库 mysql 的彻底卸载

MySQL卸载步骤如下&#xff1a; &#xff08;1&#xff09;按 winr 快捷键&#xff0c;在弹出的窗口输入 services.msc&#xff0c;打开服务列表。 &#xff08;2&#xff09;在服务列表中&#xff0c; 找到 mysql 开头的所有服务&#xff0c; 右键停止&#xff0c;终止对应的…