Kafka学习笔记(三)Kafka分区和副本机制、自定义分区、消费者指定分区

文章目录

  • 前言
  • 7 分区和副本机制
    • 7.1 生产者分区写入策略
      • 7.1.1 轮询分区策略
      • 7.1.2 随机分区策略
      • 7.1.3 按key分区分配策略
      • 7.1.4 自定义分区策略
        • 7.1.4.1 实现`Partitioner`接口
        • 7.1.4.2 实现分区逻辑
        • 7.1.4.3 配置使用自定义分区器
        • 7.1.4.4 分区测试
    • 7.2 消费者分区分配策略
      • 7.2.1 RangeAssignor(范围分配策略)
      • 7.2.2 RoundRobinAssignor(轮询分配策略)
      • 7.2.3 StickyAssignor(粘性分配策略)
      • 7.2.4 消费者组的Reblance机制
    • 7.3 副本机制
      • 7.3.1 生产者的`acks`参数
      • 7.3.2 `acks`参数配置为0
      • 7.3.2 `acks`参数配置为1
      • 7.3.3 `acks`参数配置为-1或all
      • 7.3.4 基准测试
    • 7.4 消费指定分区数据

前言

Kafka学习笔记(一)Linux环境基于Zookeeper搭建Kafka集群、Kafka的架构
Kafka学习笔记(二)Kafka基准测试、幂等性和事务、Java编程操作Kafka

7 分区和副本机制

7.1 生产者分区写入策略

生产者写入消息到Topic,Kafka将依据不同的策略将数据分配到不同的分区中,主要有以下策略:

7.1.1 轮询分区策略

默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区。

7.1.2 随机分区策略

每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区,但后续轮询策略表现更佳,所以基本上很少会使用随机策略。

7.1.3 按key分区分配策略

根据key值,通过一定的算法将消费分配到不同分区。按key分配策略,有可能会出现「数据倾斜」,例如某个key包含了大量的数据,因为key值一样,所有所有的数据将都分配到这个分区中,造成该分区的消息数量远大于其他的分区。

7.1.4 自定义分区策略

轮询策略、随机策略都会导致一个问题,生产到Kafka中的数据是乱序存储的。而按key分区可以一定程度上实现数据有序存储(分区内局部有序),但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。

7.1.4.1 实现Partitioner接口

在Java中,自定义分区需要实现org.apache.kafka.clients.producer.Partitioner接口,该接口定义了如下方法:

  • topic:针对特定Topic使用不同的分区规则。
  • keykeyBytes:针对特定key值使用不同的分区规则。
  • valuevalueBytes:针对特定的消息内容使用不同的分区规则。
  • cluster:Cluster对象提供了Topic的分区信息,可以据此动态调整分区策略。
7.1.4.2 实现分区逻辑

重写partition()方法,实现分区逻辑。例如:

/**
 * 自定义分区器
 */
public class MyKafkaPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if(key != null) {
            String keyString = (String) key;
            // key 以 animal 开头时分配到分区 0
            if(keyString.startsWith("animal")) {
                return 0;
            }
            // key 以 food 开头时分配到分区 1
            if(keyString.startsWith("food")) {
                return 1;
            }
        }
        // 默认分配到分区 0
        return 0;
    }
    
    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}
7.1.4.3 配置使用自定义分区器

在Kafka生产者配置中,使用自定义分区器的类名:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());
7.1.4.4 分区测试

向3分区1副本的Topic[topic_3_1]发送key值为animal_rabbit的消息:

执行结果如下:

将key值修改为food_apple,则分配的分区是1:

7.2 消费者分区分配策略

通过消费者组(Consumer Group),Kafka允许多个消费者共同处理某个Topic的消息,但生产者已经将消息写入了Topic的不同分区,因此首先要解决哪个消费者消费哪个分区的数据的问题,即消费者分区分配策略问题。

在Java中,ConsumerPartitionAssignor接口用来定制消费者的分区分配策略,该接口的3个子类实现分别对应3种消费者分区分配策略。

7.2.1 RangeAssignor(范围分配策略)

范围分配策略是Kafka默认的分配策略,它可以确保每个消费者消费的分区数量是均衡的。

注意:范围分配策略是针对每个Topic的。

范围分配策略有两个算法公式:

  • n = 分区数量 / 消费者数量
  • m = 分区数量 % 消费者数量

策略结果是:前m个消费者消费n+1个分区,剩余消费者消费n个分区。如图:

7.2.2 RoundRobinAssignor(轮询分配策略)

轮询分配策略是将消费者组内所有消费者以及消费者所订阅的所有Topic的分区按照字典序排序(Topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区分配给每个消费者。

注意:轮询分配策略不局限于单个Topic。

如上图所示,3个消费者共订阅了2个Topic,共8个分区,将8个分区按照字典序排序后,开始轮询:

  • TopicA p0 → consumer0
  • TopicA p1 → consumer1
  • TopicA p2 → consumer2
  • TopicA p3 → consumer0
  • TopicB p0 → consumer1
  • TopicB p1 → consumer2
  • TopicB p2 → consumer0
  • TopicB p3 → consumer1
  • Topica p0 → consumer2

7.2.3 StickyAssignor(粘性分配策略)

从Kafka 0.11.x版本开始,引入此类分配策略。其主要目的在于使分区分配尽可能均匀,同时在Topic或消费者发送变动需要重新分配时,分区的分配尽可能与上一次分配保持相同。

粘性分配策略主要作用在需要重新分配的情况,而不需要重新分配时和轮询分配策略类似。如图:

如果consumer2崩溃了,此时需要进行重新分配。而粘性分配策略会保留重新分配之前的分配结果,只是将原先consumer2负责的两个分区再均匀分配给consumer0、consumer1。

例如:之前consumer0、consumer1正在消费某几个分区,但由于需要重新分配,导致consumer0、consumer1需要取消处理,之后重新消费之前正在处理的分区,导致不必要的系统开销。而粘性分配策略可以明显减少这样的系统资源浪费。

7.2.4 消费者组的Reblance机制

上面提到了消费者的分区重新分配,其实就是Kafka中的Rebalance机制,称之为再均衡

Reblance机制是Kafka中确保消费者组下所有的consumer如何达成一致,分配订阅的Topic的每个分区的机制。

Rebalance触发的时机有:

  • 1)消费者组中consumer的个数发生变化。例如:有新的consumer加入到消费者组,或者是某个consumer停止了。

  • 2)订阅的Topic个数发生变化。消费者可以订阅多个主题,假设当前消费者组订阅了三个主题,但有一个主题突然被删除了,此时也需要发生再均衡。

  • 3)订阅的Topic分区数发生变化。

当然,Reblance机制的不良影响也挺大的。发生Rebalance时,消费者组下的所有consumer都将停止工作,直到Rebalance完成。

7.3 副本机制

副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以从其他备份上读取,保障数据可用。

7.3.1 生产者的acks参数

生产者配置的acks参数,表示当生产者生产消息时,写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。

例如,在之前的测试代码中有如下配置:

props.put("acks", "all");

7.3.2 acks参数配置为0

acks参数配置为0,生产者不会等到Broker确认,而直接发送下一条数据。因此它的性能最高,但有可能会丢失数据。

7.3.2 acks参数配置为1

acks参数配置为1,生产者会等待leader副本确认接收后,才会发送下一条数据,性能中等。

7.3.3 acks参数配置为-1或all

acks参数配置为1,生产者会等待所有副本同步完成并确认接收后,才会发送下一条数据,性能最低。

7.3.4 基准测试

分别对不同的acks参数进行基准测试,acks参数为0时的命令如下,其余类推:

bin/kafka-producer-perf-test.sh --topic topic_1_1 --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=192.168.245.130:9092,192.168.245.131:9092,192.168.245.132:9092 acks=0

基准测试结果如下:

指标(1分区1副本)ack=0ack=1ack=-1/all
吞吐量18299.132255 records/sec19160.979049 records/sec13137.876761 records/sec
吞吐速率17.45 MB/sec18.27 MB/sec12.53 MB/sec
平均延迟时间1769.71 ms1692.25 ms2473.96 ms
最大延迟时间5490.00 ms4455.00 ms10434.00 ms

由此可见,acks参数为0和1时性能相当,为-1/all时性能大幅下降。

7.4 消费指定分区数据

如上图所示的Kafka消费者代码,只需要指定Topic,就可以直接读取消息,而不需要管理分区、副本、offset等元数据,实现方便。

这是因为,Kafka的偏移量offset是由Zookeeper管理的,消费者会自动根据上一次在Zookeeper中保存的offset去接着获取数据。不同的消费者组,在Zookeeper中保存了不同的offset,这样不同消费者组读取同一个Topic就不会有任何影响。

但以上代码也有缺点,就是不能细化控制分区、副本、offset等,从而无法从指定位置读取数据。

如果想要手动指定消费分区,则不能再使用之前的subscribe()方法订阅主题,而是要用assign()方法:

// 3. 订阅要消费的主题
// 指定消费者从哪个topic中拉取数据
// kafkaConsumer.subscribe(Arrays.asList("my_topic"));

String topic = "topic_3_1";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
TopicPartition partition2 = new TopicPartition(topic, 2);
// 手动指定只消费分区1的数据
kafkaConsumer.assign(Arrays.asList(partition1));

利用自定义分区策略(详见7.1.4节),向Topic[topic_3_1]的分区0、分区1分别写入数据:

但消费者只消费了分区1的数据:

本节完,更多内容请查阅分类专栏:微服务学习笔记

感兴趣的读者还可以查阅我的另外几个专栏:

  • SpringBoot源码解读与原理分析
  • MyBatis3源码深度解析
  • Redis从入门到精通
  • MyBatisPlus详解
  • SpringCloud学习笔记

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/886489.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【华为HCIP实战课程三】动态路由OSPF的NBMA环境建立邻居及排错,网络工程师

一、NBMA环境下的OSPF邻居建立问题 上节我们介绍了NBMA环境下OSPF邻居建立需要手动指定邻居,因为NBMA环境是不支持广播/组播的 上一节AR1的配置: ospf 1 peer 10.1.1.4 //手动指定邻居的接口地址,而不是RID peer 10.1.1.5 area 0.0.0.0 手动指定OSPF邻居后抓包查看OSP…

C语言的内存结构

在电脑中C语言编译器也像其他软件一样占用一块内存空间。 为了更好的利用好这块内存&#xff0c;C语言将他们分为 在C语言中&#xff0c;变量定义的位置不一样&#xff0c;那么在内存中所处的位置也是不一样的。&#xff08;变量在函数内部是存储在栈里&#xff0c;而在函数外部…

SPI通信——FPGA学习笔记14

一、简介 SPI(Serial Periphera Interface&#xff0c;串行外围设备接口)通讯协议&#xff0c;是 Motorola 公司提出的一种同步串行接口技术&#xff0c;是一种高速、全双工、同步通信总线&#xff0c;在芯片中只占用四根管脚用来控制及数据传输&#xff0c;广泛用于 EEPROM、F…

基于STM32的智能家居灯光控制系统设计

引言 本项目将使用STM32微控制器实现一个智能家居灯光控制系统&#xff0c;能够通过按键、遥控器或无线模块远程控制家庭照明。该项目展示了如何结合STM32的外设功能&#xff0c;实现对灯光的智能化控制&#xff0c;提升家居生活的便利性和节能效果。 环境准备 1. 硬件设备 …

C--编译和链接见解

欢迎各位看官&#xff01;如果您觉得这篇文章对您有帮助的话 欢迎您分享给更多人哦 感谢大家的点赞收藏评论 感谢各位看官的支持&#xff01;&#xff01;&#xff01; 一&#xff1a;翻译环境和运行环境 在ANSIIC的任何一种实现中&#xff0c;存在两个不同的环境1&#xff0c;…

戴尔电脑怎么开启vt虚拟化_戴尔电脑新旧机型开启vt虚拟化教程

最近使用戴尔电脑的小伙伴们问我&#xff0c;戴尔电脑怎么开启vt虚拟。大多数可以在Bios中开启vt虚拟化技术&#xff0c;当CPU支持VT-x虚拟化技术&#xff0c;有些电脑会自动开启VT-x虚拟化技术功能。而大部分的电脑则需要在Bios Setup界面中&#xff0c;手动进行设置&#xff…

SpringCloud入门(九)Feign实战应用和性能优化

一、Feign实战应用 Feign的客户端与服务提供者的controller代码非常相似&#xff1a; 有没有一种办法简化这种重复的代码编写呢&#xff1f; 方式一&#xff1a;继承 优点&#xff1a; 简单。实现了代码共享。 缺点&#xff1a;服务提供方、服务消费方紧耦合。参数列表中的注解…

vscode安装及c++配置编译

1、VScode下载 VS Code官网下载地址&#xff1a;Visual Studio Code - Code Editing. Redefined。 2、安装中文插件 搜索chinese&#xff0c;点击install下载安装中文插件。 3、VS Code配置C/C开发环境 3.1、MinGW-w64下载 VS Code是一个高级的编辑器&#xff0c;只能用来写代…

Coggle数据科学 | Kaggle赛题解析:CMI 体育损伤指数预测

本文来源公众号“Coggle数据科学”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;Kaggle赛题解析&#xff1a;CMI 体育损伤指数预测 赛题名称&#xff1a;Child Mind Institute — Problematic Internet Use 赛题类型&#xff1a…

Windows 环境上安装 NASM 和 YASM 教程

NASM 和 YASM NASM NASM&#xff08;Netwide Assembler&#xff09;是一个开源的、可移植的汇编器&#xff0c;它支持多种平台和操作系统。它可以用来编写16位、32位以及64位的代码&#xff0c;并且支持多种输出格式&#xff0c;包括ELF、COFF、OMF、a.out、Mach-O等。NASM使用…

GS-SLAM论文阅读笔记--GEVO

前言 这篇文章看着就让人好奇。众所周知&#xff0c;高斯是一个很不错的建图方法&#xff0c;但是本文的题目居然是只用高斯进行单目VO&#xff0c;咱也不知道这是怎么个流程&#xff0c;看了一下作者来自于MIT&#xff0c;说不定是个不错的工作&#xff0c;那就具体看看吧&am…

LeetCode从入门到超凡(五)深入浅出---位运算

引言 大家好&#xff0c;我是GISer Liu&#x1f601;&#xff0c;一名热爱AI技术的GIS开发者。本系列文章是我跟随DataWhale 2024年9月学习赛的LeetCode学习总结文档&#xff1b;本文主要讲解 位运算算法。&#x1f495;&#x1f495;&#x1f60a; 一、 位运算简介 1.什么是位…

简易CPU设计入门:取指令(三),ip_buf与rd_en的非阻塞赋值

在开篇&#xff0c;还是请大家首先准备好本项目所用的源代码。如果已经下载了&#xff0c;那就不用重复下载了。如果还没有下载&#xff0c;那么&#xff0c;请大家点击下方链接&#xff0c;来了解下载本项目的CPU源代码的方法。 下载本项目代码 准备好了项目源代码以后&…

【重学 MySQL】五十一、更新和删除数据

【重学 MySQL】五十一、更新和删除数据 更新数据删除数据注意事项 在MySQL中&#xff0c;更新和删除数据是数据库管理的基本操作。 更新数据 为了更新&#xff08;修改&#xff09;表中的数据&#xff0c;可使用UPDATE语句。UPDATE语句的基本语法如下&#xff1a; UPDATE ta…

【ADC】噪声(1)噪声分类

概述 本文学习于TI 高精度实验室课程&#xff0c;总结 ADC 的噪声分类&#xff0c;并简要介绍量化噪声和热噪声。 文章目录 概述一、ADC 中的噪声类型二、量化噪声三、热噪声四、量化噪声与热噪声对比 一、ADC 中的噪声类型 ADC 固有噪声由两部分组成&#xff1a;第一部分是量…

【重学 MySQL】四十六、创建表的方式

【重学 MySQL】四十六、创建表的方式 使用CREATE TABLE语句创建表使用CREATE TABLE LIKE语句创建表使用CREATE TABLE AS SELECT语句创建表使用CREATE TABLE SELECT语句创建表并从另一个表中选取数据&#xff08;与CREATE TABLE AS SELECT类似&#xff09;使用CREATE TEMPORARY …

【JVM】垃圾释放方式:标记-清除、复制算法、标记-整理、分代回收

文章目录 1. 标记-清除2. 复制算法4. 标记-整理4. 分代回收 把标记为垃圾的对象的内存空间进行释放。主要有三种释放方式 1. 标记-清除 把标记为垃圾的对象&#xff0c;直接释放掉&#xff08;最朴素的做法&#xff09; 此时就是把标记为垃圾的对象所对应的内存空间直接释放。…

本地化测试对游戏漏洞修复的影响

本地化测试在游戏开发的质量保证过程中起着至关重要的作用&#xff0c;尤其是在修复bug方面。当游戏为全球市场做准备时&#xff0c;它们通常会被翻译和改编成各种语言和文化背景。这种本地化带来了新的挑战&#xff0c;例如潜在的语言错误、文化误解&#xff0c;甚至是不同地区…

k8s中,ingress的实现原理,及其架构。

图片来源&#xff1a;自己画的 图片来源&#xff1a;k8s官网 首先&#xff0c;什么是ingress? 是服务还是控制器&#xff1f; 都不精确 ingress是一个api资源 service和deployment也是api资源。 这几个相互协作&#xff0c;组建成一个对外提供服务的架构。 ingress提供的…

MDM监管锁系统上锁流程

上锁与解锁 上锁设备 完整的上锁流程可参考: https://b23.tv/UvM35sU 上锁需要已经注册了一个普通用户 并使用管理员分配了台数 且有可用的MDM证书和ABM证书(公有和私有的都可以 只要有可用的就可以) 一部用来上锁的手机 链接wifi wifi必须要是2.4g频段 不要使用5gwifi 上锁…