分布式 - 消息队列Kafka:Kafka消费者的分区分配策略

文章目录

      • 1. 环境准备
      • 2. range 范围分区策略介绍
      • 3. round-robin 轮询分区策略
      • 4. sticky 粘性分区策略
      • 5. 自定义分区分配策略

1. 环境准备

创建主题 test 有5个分区,准备 3 个消费者并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

① 创建主题 test,该主题有5个分区,2个副本:

[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 5 --replication-factor 2  --topic test
Created topic test.
[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --describe --topic test
Topic:test      PartitionCount:5        ReplicationFactor:2     Configs:
Topic: test     Partition: 0    Leader: 2       Replicas: 2,0   Isr: 2,0
Topic: test     Partition: 1    Leader: 0       Replicas: 0,1   Isr: 0,1
Topic: test     Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2
Topic: test     Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
Topic: test     Partition: 4    Leader: 0       Replicas: 0,2   Isr: 0,2

② 创建3个消费者CustomConsumer01, CustomConsumer02, CustomConsumer03,消费者组名相同,这样3个消费者属于同一个组:

public class CustomConsumer01 {
    private static final String brokerList = "10.65.132.2:9093";
    private static final String topic = "test";

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
        return properties;
    }
    
    public static void main(String[] args) {
        Properties properties = initConfig();
        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 订阅主题 test
        ArrayList<String> topics = new ArrayList<>();
        topics.add(topic);
        consumer.subscribe(topics);
        // 消费数据
             while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                 System.out.println("分区"+consumerRecord.partition()+"消费数据:"+consumerRecord.value());
            }
        }
    }
} 

③ 创建生产者用来向主题 test 发送消息,随机发送到不同的分区:

public class CustomProducer01 {
    private static final String brokerList = "10.65.132.2:9093";
    private static final String topic = "test";

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) throws InterruptedException {
        // kafka生产者属性配置
        Properties properties = initConfig();
        // kafka生产者发送消息 
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for(int i=0;i<50;i++){
            kafkaProducer.send(new ProducerRecord<>(topic ,"hello,kafka"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if(exception==null){
                        System.out.println("recordMetadata 发往的分区:"+recordMetadata.partition());
                    }else{
                        exception.printStackTrace();
                    }
                }
            });
            Thread.sleep(2);
        }
        kafkaProducer.close();
    }
}

在分区再均衡时消费者组下所有的消费者都会协调在一起共同参与分区分配,这是如何完成的呢?Kafka 新版本 消费者默认提供了3种分配策略,分别是 range 策略、round-robin策略和sticky策略。所谓的分配策略决定了订阅主题的每个分区会被分配给哪个消费者。

① range 策略主要是基于范围的思想。它将单个主题的所有分区按照顺序排列,然后把这些分区划分成固定大小的分区段并依次分配给每个消费者;

② round-robin 策略则会把所有主题的所有分区顺序摆开,然后轮询式地分配给各个消费者。

③ sticky 策略有效地避免了上述两种策略完全无视历史分配方案的缺陷,采用了“有黏性”的策略对所有消费者实例进行分配,可以规避极端情况下的数据倾斜并且在两次rebalance间最大限度地维持了之前的分配方案。

2. range 范围分区策略介绍

RangeAssignor 分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给消费者组内所有的消费者。

针对每一个主题而言,RangeAssignor策略会将订阅这个主题的消费组内的所有消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

在这里插入图片描述

① 消费者组1订阅了主题A,则对主题A内的分区按照序号进行排序,对消费者组1内的消费者按照名称的字典序进行排序。然后用分区的总数除以消费者总数即5/4=1余1,则分区分配结果如图①:前1个消费者消费2个分区,其余消费者消费1个分区。

② 消费者组2订阅了主题B,则对主题B内的分区按照序号进行排序,对消费者组2内的消费者按照名称的字典序进行排序。然后用分区的总数除以消费者总数即5/3=1余2,则分区分配结果如图②:前 2个消费者消费2个分区,其余消费者消费1个分区。

注意:如果只是针对 1 个主题而言,consumer0 消费者多消费1个分区影响不是很大。但是如果有 N 多个主题,那么每个主题,消费者 consumer0 都将多消费 1 个分区,最终 consumer0 消费者会比其他消费者多消费 N 个分区,则有可能出现部分消费者过载的情况。

range 分区分配策略演示:

Kafka提供了消费者客户端参数 partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为 org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。

① 先启动3个消费者,然后启动生产者发送消息,查看每个消费者消费的分区:

consumer02 消费者消费分区0和分区1:

分区0消费数据:hello,kafka
分区1消费数据:hello,kafka
...

consumer03 消费者消费分区2和分区3:

分区2消费数据:hello,kafka
分区3消费数据:hello,kafka
...

consumer01 消费者消费的分区4:

分区4消费数据:hello,kafka
...

② 停止掉 consumer01 消费者,等待45s 以后再次重新发送消息观看结果:

在这里插入图片描述

consumer02 消费者消费的分区:

分区0消费数据:hello,kafka
分区1消费数据:hello,kafka
分区2消费数据:hello,kafka
...

consumer03 消费者消费的分区:

分区3消费数据:hello,kafka
分区4消费数据:hello,kafka
...

consumer01 消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,时间到了 45s 后,判断它真的退出后就会重新按照 range 方式分配分区给消费者。

3. round-robin 轮询分区策略

round-robin 轮询分区策略针对集群中所有主题而言,它的原理是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。

在这里插入图片描述

消费者组1订阅了主题A和主题B,则对主题A和主题B内的所有分区按照序号进行排序,对消费者1内的所有消费者按照名称字典序进行排序,然后通过轮询方式逐个将分区依次分配给每个消费者,则分区分配结果如图所示。

round-robin 轮询分区策略演示:

① 修改消费者 consumer01、consumer02、consumer03的分区分配策略为 RoundRobinAssignor,同时修改消费者组名为 test-group

// 设置分区分配策略为 round-robin
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

② 启动三个消费者,然后启动生产者发送消息,查看分区分配结果:

consumer02 消费者消费分区0和分区3:

分区0消费数据:hello,kafka
分区3消费数据:hello,kafka
...

consumer03 消费者消费分区1和分区4:

分区1消费数据:hello,kafka
分区4消费数据:hello,kafka
...

consumer01 消费者消费的分区2:

分区2消费数据:hello,kafka
...

③ 停止掉 consumer01 消费者,等待45s 以后再次重新发送消息观看结果:

在这里插入图片描述

consumer02 消费者消费的分区:

分区0消费数据:hello,kafka
分区2消费数据:hello,kafka
分区4消费数据:hello,kafka
...

consumer03 消费者消费的分区:

分区1消费数据:hello,kafka
分区3消费数据:hello,kafka
...

consumer01 消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,时间到了 45s 后,判断它真的退出后就会重新按照 round-robin 方式分配分区给消费者。

4. sticky 粘性分区策略

粘性策略是 Kafka 从 0.11.x 版本开始引入,首先会尽量均衡的放置分区到消费者上面,在同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

在这里插入图片描述

① 消费者组1订阅了主题A和主题B,则对主题A和主题B内的所有分区按照序号进行排序,对消费者1内的所有消费者按照名称字典序进行排序,然后通过轮询方式逐个将分区依次分配给每个消费者。则分区分配结果如图①

这样初看上去似乎与采用round-robin 分配策略所分配的结果相同,但事实是否真的如此呢?假设此时消费者consumer1 脱离了消费组,那么消费组就会执行再均衡操作,进而消费分区会重新分配。

② 如果采用round-robin 分配策略,那么此时的分配结果如图②

③ 如果采用round-robin 分配策略,那么此时的分配结果如图③

可以看到分配结果中保留了上一次分配中对消费者 consumer0 和 consumer2 的所有分配结果,并将原来消费者consumer1 的“负担”分配给了剩余的两个消费者 consumer0 和 consumer2 ,最终 consumer0 和 consumer2 的分配还保持了均衡。

如果发生分区重分配,那么对于同一个分区而言,有可能之前的消费者和新指派的消费者不是同一个,之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor 分配策略如同其名称中的“sticky”一样,让分配策略具备一定的“黏性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。

sticky 轮询分区策略演示:

① 修改消费者 consumer01、consumer02、consumer03的分区分配策略为 StickyAssignor,同时修改消费者组名为 test-group-1

// 设置分区分配策略为sticky
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());

② 启动三个消费者,然后启动生产者发送消息:

consumer01 消费者消费分区0和分区3:

分区0消费数据:hello,kafka
分区3消费数据:hello,kafka
...

consumer03 消费者消费分区1和分区4:

分区1消费数据:hello,kafka
分区4消费数据:hello,kafka
...

consumer02 消费者消费的分区2:

分区2消费数据:hello,kafka
...

③ 停止掉 consumer01 消费者,等待45s 以后再次重新发送消息观看结果:

在这里插入图片描述

consumer02 消费者消费的分区:

分区2消费数据:hello,kafka
分区0消费数据:hello,kafka
...

consumer03 消费者消费的分区:

分区1消费数据:hello,kafka
分区4消费数据:hello,kafka
分区3消费数据:hello,kafka
...

consumer01 消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,时间到了 45s 后,判断它真的退出后就会重新按照 sticky 方式分配分区给消费者。

5. 自定义分区分配策略

Kafka提供了消费者客户端参数 partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为 org.apache.kafka.clients.consumer.RangeAssignor,即采用RangeAssignor分配策略。除此之外,Kafka还提供了另外两种分配策略:RoundRobinAssignor 和 StickyAssignor。消费者客户端参数partition.assignment.strategy可以配置多个分配策略,彼此之间以逗号分隔。

public class MyAssignor extends AbstractPartitionAssignor {
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionPerTopic, Map<String, Subscription> subscriptions) {
        Map<String,List<String>> consumersPerTopic =  consumersPerTopic(subscriptions);
        Map<String,List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet()) {
            assignment.put(memberId,new ArrayList<>());
        }   
        //针对每一个主题进行分区分配
        for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<String> consumersForTopic = topicEntry.getValue();
            int consumerSize = consumersForTopic.size();

            Integer numPartitionsForTopic = partitionPerTopic.get(topic);
            if(numPartitionsForTopic == null){
                continue;
            }

            // 当前主题下所有的分区
            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (TopicPartition partition : partitions) {
                int rand = new Random().nextInt(consumerSize);
                String randomConsumer = consumersForTopic.get(rand);
                assignment.get(randomConsumer).add(partition);
            }
        }
        return assignment;

    }

    // 获取每个主题对应的消费者列表
    private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> stringSubscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = stringSubscriptionEntry.getKey();
            for (String topic : stringSubscriptionEntry.getValue().topics()) {
                put(res,topic,consumerId);
            }
        }
        return res;
    }

    @Override
    public String name() {
        return "my-assignor";
    }
}

在消费者中使用自定义的分区分配策略:

public class CustomConsumer01 {
    private static final String brokerList = "10.65.132.2:9093";
    private static final String topic = "test";

    public static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-group-1");

        // 设置分区分配策略为 round-robin
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, MyAssignor.class.getName());
        return properties;
    }
    public static void main(String[] args) {
        Properties properties = initConfig();
        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 订阅主题 test
        ArrayList<String> topics = new ArrayList<>();
        topics.add(topic);
        consumer.subscribe(topics);
        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("分区"+consumerRecord.partition()+"消费数据:"+consumerRecord.value());
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/70417.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

fastadmin 自定义搜索分类和时间范围

1.分类搜索&#xff0c;分类信息获取----php 2.对应html页面&#xff0c;页面底部加搜索提交代码&#xff08;这里需要注意&#xff1a;红框内容&#xff09; 图上代码----方便直接复制使用 <script id"countrySearch" type"text/html"><!--form…

python之matplotlib入门初体验:使用Matplotlib进行简单的图形绘制

目录 绘制简单的折线图1.1 修改标签文字和线条粗细1.2 校正图形1.3 使用内置样式1.4 使用scatter()绘制散点图并设置样式1.5 使用scatter()绘制一系列点1.6 python循环自动计算数据1.7 自定义颜色1.8 使用颜色映射1.9 自动保存图表练习题 绘制简单的折线图 绘制一个简单折线图…

GPT-3.5 人工智能还是人工智障?——西红柿炒钢丝球!!

人工智能还是人工智障&#xff1f;——西红柿炒钢丝球 西红柿炒钢丝球的 基本信息西红柿炒钢丝球的 详细制作方法材料步骤 备注幕后花絮。。。。。。。。。关于GPT-3.5&#xff0c;你的看法&#xff1a; 西红柿炒钢丝球的 基本信息 西红柿炒钢丝球是一道具有悠久历史的传统中式…

springboot汽车租赁后台java出租客户管理jsp源代码mysql

本项目为前几天收费帮学妹做的一个项目&#xff0c;Java EE JSP项目&#xff0c;在工作环境中基本使用不到&#xff0c;但是很多学校把这个当作编程入门的项目来做&#xff0c;故分享出本项目供初学者参考。 一、项目描述 springboot汽车租赁后台 系统有1权限&#xff1a;管理…

阿里巴巴面试题---考察对底层源代码的熟悉程度

题目如图所示: 很多人可能会觉得两个输出都会是false,因为我们都会觉得""比较的是引用类型的地址,虽然放入的值都一样但是重新创造了新对象,地址不一样,所以结果都是false. 然而,当我们运行程序会发现结果都是false. 下面,我们来分析为什么是这样的结果. 我们知道…

OneFlow 中的 Softmax

Softmax 是深度学习模型中的常见算子。PyTorch 的 Softmax 算子直接调用 cuDNN 的接口。而 OneFlow 内部针对输入数据的类别数量&#xff0c;采用3个 kernel 来分别处理&#xff0c;在多数情况下都可以获得比 cuDNN 更优的性能表现。测试结果可见 如何实现一个高效的Softmax CU…

未来混合动力汽车的发展:技术探索与前景展望

随着环境保护意识的增强和对能源消耗的关注&#xff0c;混合动力汽车成为了汽车行业的研发热点。混合动力汽车融合了传统燃油动力和电力动力系统&#xff0c;通过优化能源利用效率&#xff0c;既降低了燃油消耗和排放&#xff0c;又提供了更长的续航里程。本文将探讨混合动力汽…

配置docker,案例复现

配置docker(系统为centos) 1.检查操作系统环境: docker要求CentOS系统的内核版本高于 3.10 &#xff0c;通过 uname -r 命令查看你当前的内核版本是否支持安装docker 2.查看你是否拥有旧的版本&#xff0c;有的话卸载&#xff0c;没有的话直接略过该步骤 sudo yum remove d…

【快应用】list组件属性的运用指导

【关键词】 list、瀑布流、刷新、页面布局 【问题背景】 1、 页面部分内容需要瀑布流格式展示&#xff0c;在使用lsit列表组件设置columns进行多列渲染时&#xff0c;此时在里面加入刷新动画时&#xff0c;动画只占了list组件的一列&#xff0c;并没有完全占据一行宽度&…

“解锁IDEA的潜力:高级Java Maven项目配置指南”

目录 前言&#xff1a;流程目录&#xff1a;1.确保Java和Maven已安装检查Java是否已正确安装并配置环境变量 2.创建一个新的Maven项目导航到要创建项目的目录配置Maven运行以下命令创建一个新的Maven项目 3.配置项目的pom.xml文件打开项目根目录下的pom.xml文件配置Web.xml 4.配…

案例13 Spring MVC参数传递案例

基于Spring MVC实现HttpServletRequest、基本数据类型、Java Bean、数组、List、Map、JSON方式的参数传递。 1. 创建项目 选择Maven快速构建web项目&#xff0c;项目名称为case13-springmvc02。 2. 配置Maven依赖 <?xml version"1.0" encoding"UTF-8&quo…

Vue2(生命周期,列表排序,计算属性和监听器)

目录 前言一&#xff0c;生命周期1.1&#xff0c;生命周期函数简介1.2&#xff0c;Vue的初始化流程1.3,Vue的更新流程1.4&#xff0c; Vue的销毁流程1.5&#xff0c; 回顾生命周期1.,6&#xff0c;代码演示1.6-1&#xff0c;beforeCreate1.6-2&#xff0c;created1.6-3&#xf…

9.2.1Socket(UDP)

一.传输层: 1.UDP:无连接,不可靠,面向数据报,全双工. 2.TCP:有连接,可靠,面向字节流,全双工. 注意:这里的可不可靠是相对的,并且和安不安全无关. 二.UDP数据报套接字编程: 1.socket文件:表示网卡的这类文件. 2.DatagramPacket:表示一个UDP数据报. 三.代码实现: 1.回显服务…

Linux系列:从0到1用Docker部署springboot项目

目录 1.前提条件 2.编写DockerFile镜像文件 3.打包SpringBoot项目 4.通过软件Xftp进行传输&#xff08;*&#xff09; 1.点击“文件-新建”​编辑 5.操作远程主机 1.docker构建 2.容器运行 6.容器的关闭和删除 1.前提条件 Linux、docker、xftp的安装、一台可以访问的远…

自动驾驶——驶向未来的革命性技术

自动驾驶——驶向未来的革命性技术 自动驾驶的组件自动驾驶的优势自动驾驶的应用自动驾驶的未来中国的自动驾驶 自动驾驶是一种技术&#xff0c;它允许车辆在没有人类驾驶员的情况下自主地进行行驶。它利用各种传感器、计算机视觉、人工智能和机器学习算法来感知和理解周围环境…

Vim学习(三)—— Git Repo Gerrit

Git、Gerrit、Repo三者的概念及使用 三者各自作用&#xff1a; git&#xff1a;版本管理库&#xff0c;在git库中没有中心服务器的概念&#xff0c;真正的分布式。 repo&#xff1a;repo就是多个git库的管理工具。如果是多个git库同时管理&#xff0c;可以使用repo。当然使用…

Linux常见面试题,应对面试分享

操作系统基础 1.cpu占⽤率太⾼了怎么办? 排查思路是什么&#xff0c;怎么定位这个问题&#xff0c;处理流程 其他程序: 1.通过top命令按照CPU使⽤率排序找出占⽤资源最⾼的进程 2.lsof查看这个进程在使⽤什么⽂件或者有哪些线程 3.询问开发或者⽼⼤,是什么业务在使⽤这个进程…

期权定价模型系列【2】—期权的希腊字母计算及应用

本篇文章旨在介绍期权常见希腊字母的计算及应用 本专栏更多侧重于理论及文字方面的展示&#xff0c;文章具体的代码可以参考我的另一个专栏【期权量化】。 【期权量化】专栏有同名文章&#xff0c;并且给出了文章的具体代码。 专栏地址&#xff1a; http://t.csdn.cn/Y30Hk…

接口测试自动化:简化测试流程,提升效率

接口测试自动化&#xff1a;简化测试流程&#xff0c;提升效率 什么是接口测试自动化&#xff1f; 接口测试自动化是指使用特定的工具和技术来自动化执行接口测试的过程。通过编写脚本&#xff0c;自动化工具可以模拟用户与软件系统的交互&#xff0c;验证接口的功能和性能。…

启动springboot,出现Unable to start embedded Tomcat

报错信息 org.apache.catalina.core.ContainerBase : A child container failed during startjava.util.concurrent.ExecutionException: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Tomcat].StandardHost[localhost].TomcatEmbedd…