Kafka入门,分区的分配再平衡(二十)

分区的分配以及再平衡

在这里插入图片描述

1、kafka有四种主流的分区策略:Range,RoundRobin,Sticky,CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Ranage+CooperativeSticky。Kafka可以同事使用多个分区分配策略。

参数描述
heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于session.timeout.ms,也不应该高于session.timeoyt.ms的1/3
session.timeout.msKafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者执行再平衡
max.poll.interval.ms消费者处理消息的最大时长,默认是5分钟,超过该值被移除,消费者执行再平衡
partition.assignment.strategy消费者分区分配策略,默认策略是Range+CooperativeStickt。Kafka可以同事使用多个分区分配策略。可以选择策略包括:Range,RoundRobin,sticky,CooperativeSticky

Range以及再平衡

在这里插入图片描述

Range分区策略原理
Range是对每个topic而言
首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
加入现在有7个分区,3个消费者,排序后的分区将会是0,1,2,3,4,5,6消费者排序完之后将会是C0,C1,C2
通过partitions数/consumer数来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费一个分区。
例如。7/3=2余1,除不尽,那么消费者C0便会多消费者1个分区。8/3=2余2,除不尽,那么C0和C1分别多消费一个。
注意:如果只是针对一个topic而言,C0消费者多一个分区影响不是很大,但是如果有N个topic,那么针对每个topic,消费者C0都将多消费一个分区,topic越多,C0消费的分区会比其他消费者明显多消费N个分区
容易尝试数据倾斜
测试代码

package com.longer.range;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * 测试指定分区(partition)
 */
public class Producer {
    public static void main(String[] args) throws InterruptedException {
        //1、创建kafka生产者得配置对象
        Properties properties=new Properties();
        //2、给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //3、key value 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //4、创建kafka生产者对象
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 500; i++) {
            //指定数据发送到1号分区,key为空(IDEA中,ctrl+p查看参数)
            producer.send(new ProducerRecord<>("two", "longer " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if(e==null){
                        System.out.println(String.format("主题:%s,分区:%s",metadata.topic(),metadata.partition()));
                        return;
                    }
                    e.printStackTrace();
                }

            });
            Thread.sleep(1000);
        }
        //关闭资源
        producer.close();
    }
}

package com.longer.range;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer1 {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

用一个消费者每一秒发送一条信息,三个消费者接收。观察打印情况。再停止其中一个消费者,再观察情况。
(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 3、4 号分区数据。
2 号消费者:消费到 5、6 号分区数据。
0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、1、2、3 号分区数据。
2 号消费者:消费到 4、5、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

RoundRobin 以及再平衡

在这里插入图片描述
RoundRobin针对集群所有Topic而言
RoundRobin沦陷分区策略,是把所有的partition和所有的consumer都列出来,然后按照hashcode而进行排序,最后通过沦陷算法来分配partition给各个消费者。
修改分区策略

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");

测试代码

public class CustomConsumer1 {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //修改分区策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 2、5 号分区数据
2 号消费者:消费到 4、1 号分区数据
0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,
分别由 1 号消费者或者 2 号消费者消费。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、2、4、6 号分区数据
2 号消费者:消费到 1、3、5 号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有”粘性的“,即再执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配变动,可以节省大量的开销。
粘性分区时Kafka从0.11.x版本开始引入这种分配策略,首先会尽量保持原有分配的分区不变化
测试代码

package com.longer.sticky;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer1 {
    public static void main(String[] args) {
        //创建消费者的配置对象
        Properties properties=new Properties();
        //2、给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");
        //配置序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //修改分区策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
        //创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);
        //注册要消费的主题
        ArrayList<String> topics=new ArrayList<>();
        topics.add("two");
        kafkaConsumer.subscribe(topics);
        while (true){
            //设置1s中消费一批数据
            ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));
            //打印消费到的数据
            for(ConsumerRecord<String,String> record:consumerRecords){
                System.out.println(record);
            }
        }
    }
}

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 2、5、3 号分区数据。
2 号消费者:消费到 4、6 号分区数据。
0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别
由 1 号消费者或者 2 号消费者消费。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 2、3、5 号分区数据。
2 号消费者:消费到 0、1、4、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配

总结

range会造成数据倾斜,RoundRobin不会造成,但是分区调整不会考虑最小变动。sticky,尽量少的调整分配变动,可以节省大量的开销。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/36554.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【*1900倍数遍历】CF1627 D

Problem - D - Codeforces 题意&#xff1a; 思路&#xff1a; 在枚举数列子集的gcd时&#xff0c;通常可以枚举倍数 对于这道题要注意&#xff0c;j/i的gcd要为1&#xff0c;这样才能保证i是这个子集的最大公约数 Code&#xff1a; #include <bits/stdc.h>//#define…

决策树(Decision Tree)

文章目录 一、决策树 一、决策树 决策树在机器学习中也是比较常见的一种算法&#xff0c;属于监督学习中的一种。看字面意思应该也比较容易理解&#xff0c;相比其他算法比如支持向量机(SVM)或神经网络&#xff0c;似乎决策树感觉“亲切”许多。 优点&#xff1a;计算复杂度不…

Java 查找二叉树中某一结点的前驱结点以及后继结点

文章目录 前言什么是后继结点什么是前驱结点 代码实现查找某一结点的后继结点思路代码实现图解 查找某一结点的前驱结点思路代码实现图解 测试用例运行结果 结语 前言 给定二叉树结点定义Node结构如下&#xff0c;其中parent结点指向当前Node结点的父结点,根结点指向null&…

使用cuda报错的一次记录(CUDA error: out of memory)

原因&#xff1a; 由于batch_size设置过大导致的&#xff01;&#xff01;&#xff01;

nginx页面优化与防盗链

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 一、nginx页面优化1.版本号1.1 查看版本号1.2 修改版本号1.2.1 修改配置文件1.2.2 修改源码文件&#xff0c;重新编译安装 2.nginx的日志分割3.nginx的页面压缩3.1 …

ENSP模拟器如何设置命令行和描述框的背景颜色及字体

ENSP模拟器如何设置命令行和描述框的背景颜色及字体 选择“菜单 > 工具 > 选项”&#xff0c; 在弹出界面中选择“字体设置”。 单击“字体”后的“选择”设置字体&#xff0c;单击“字体颜色”后的“选择”设置字颜色&#xff0c;单击“背景颜色”后的“选择”设置…

【数据结构】从树到二叉树

目录 ​编辑 一. 前言 二. 树的概念及结构----凉拌海带 2.1 什么是树 2.2 树的基本术语 2.3 树的表示 2.4 树在实际生活中的应用 二. 二叉树的概念及结构----扬州炒饭 2.1 什么是二叉树 2.2 二叉树两种特殊形式 2.3 二叉树的性质 2.4 二叉树的存储结构 三. 链式二叉树基本操…

I/O模型

目录 一、I/O基本概念同步和异步阻塞和非阻塞线程在运行过程中&#xff0c;可能由于以下几种原因进入阻塞状态&#xff1a;可能阻塞套接字的Linux Sockets API调用分为以下四种 二、五种I/O模型阻塞I/O模型非阻塞式I/O模型I/O多路复用模型信号驱动式I/O模型异步I/O模型 三、五种…

On the Properties of Neural Machine Translation: Encoder–DecoderApproaches

摘要 Neural machine translation &#xff1a; 神经机器翻译。 神经机器翻译模型经常包含编码器和解码器&#xff1a;an encoder and a decoder. 编码器&#xff1a; 从一个变长输入序列中提取固定长度的表示。a fixed-length representation. 解码器&#xff1a;从表示中…

C语言图书管理系统

一&#xff0c;开发环境 操作系统&#xff1a;windows10, windows11, linux, mac等。开发工具&#xff1a;Qt, vscode, visual studio等开发语言&#xff1a;c 二&#xff0c;功能需求 1. 图书信息管理&#xff1a; 这个功能的主要任务是保存和管理图书的所有信息。这应该包…

(超级详细)如何在Mac OS上的VScode中配置OpenGL环境并编译

文章目录 安装环境下载GLAD与GLFW一、下载GLAD二、下载GLFW 项目结构配置测试程序与项目的编译测试可执行文件HelloGL 安装环境 机器&#xff1a;macbook air 芯片&#xff1a; M1芯片&#xff08;arm64&#xff09; macOS&#xff1a;macOS Ventura 13.4 VScode version&#…

天台玻璃折叠门可实现室内外空间的无缝连接

天玻璃折叠门是指安装在天台上的可折叠开合的玻璃门&#xff0c;可用于将室外空间与室内空间进行隔离或连接。设计天台玻璃折叠门时需要注意以下几点&#xff1a; 1. 结构稳固性&#xff1a;选择坚固、稳定的材料和结构设计&#xff0c;确保门体在风力和其他外力作用下不易摇晃…

HTTP第18讲——HTTP的缓存控制

诞生背景 由于链路漫长&#xff0c;网络时延不可控&#xff0c;浏览器使用 HTTP 获取资源的成本较高。所以&#xff0c;非常有必要把“来之不易”的数据缓存起来&#xff0c;下次再请求的时候尽可能地复用。这样&#xff0c;就可以避免多次请求 - 应答的通信成本&#xff0c;节…

37.RocketMQ之Broker消息存储源码分析

highlight: arduino-light 消息存储文件 rocketMQ的消息持久化在我们在搭建集群时都特意指定的文件存储路径,进入指定的store目录下就可以看到。 下面介绍各文件含义 CommitLog 存储消息的元数据。produce发出的所有消息都会顺序存入到CommitLog文件当中。 CommitLog由多个文件…

哈达玛矩阵乘法

哈达玛矩阵乘法 作者: 赵晓鹏时间限制: 1S章节: 递归与分治 输入说明 : 见问题描述。 输出说明 : 见问题描述。 输入范例 : 1 4 -6 输出范例 : -2 10 #include <iostream> #include <vector> using namespace std; vector<int>res; void cal(int len…

SpringBoot2+Vue2实战(十六)vue集成视频播放组件

修改文件上传大小限制 servlet:multipart:max-file-size: 100MBmax-request-size: 100MB Video.vue <template><div style"padding: 10px"><el-card><div v-for"item in videos" :key"item.id"style"margin: 10px 0…

8266使用巴法云OTA

为了使用方便把OTA封装一下为以下类 #include "ESP8266HTTPUpdate.h"class OTA { private:ESP8266HTTPUpdate httpUpdate;// using HTTPUpdateStartCB std::function<void()>;void OnStartCB(){Serial.println("开始OTA升级");}// using HTTPUpdat…

OpenCV图像金字塔pyrDown下采样

#include <opencv2/opencv.hpp> #include <opencv2/imgproc/imgproc.hpp>using namespace cv;int main() {// Load the original imageMat srcImage

Jina AI 受邀出席 WAIC 2023「科技无障碍」论坛,与行业专家共话 AI 普惠未来

7 月 6 日&#xff0c;2023 世界人工智能大会&#xff08;WAIC&#xff09;在上海世博中心及世博展览馆开幕&#xff0c;并在浦东张江、徐汇西岸设分会场&#xff0c;同步在闵行等产业集聚区开展同期活动。本届大会由上海市人民政府和国家发改委、工信部、科技部、国家网信办、…

【群智能算法】猎人猎物优化算法 HPO算法【Matlab代码#48】

文章目录 【获取资源请见文章第4节&#xff1a;资源获取】1. 猎人猎物优化算法&#xff08;HPO&#xff09;2. 部分代码展示3. 仿真结果展示4. 资源获取说明 【获取资源请见文章第4节&#xff1a;资源获取】 1. 猎人猎物优化算法&#xff08;HPO&#xff09; 猎人猎物优化算法…