Kafka Producer发送消息流程之分区器和数据收集器

文章目录

  • 1. Partitioner分区器
  • 2. 自定义分区器
  • 3. RecordAccumulator数据收集器

1. Partitioner分区器

在这里插入图片描述

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java,中doSend方法,记录了生产者将消息发送的流程,其中有一步就是计算当前消息应该发送往对应Topic哪一个分区,

int partition = partition(record, serializedKey, serializedValue, cluster);

private final Partitioner partitioner;

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        //当record的分区已存在,则直接返回,这对应了创建Record时可以手动传入partition参数
        if (record.partition() != null)
            return record.partition();

        // 如果存在partitioner分区器,则使用Partitioner.partition方法计算分区数据
        if (partitioner != null) {
            int customPartition = partitioner.partition(
                record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
            if (customPartition < 0) {
                throw new IllegalArgumentException(String.format(
                    "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
            }
            return customPartition;
        }


        // 如果没有分区器的情况
        if (serializedKey != null && !partitionerIgnoreKeys) {
            // hash the keyBytes to choose a partition
            return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
        } else {
            return RecordMetadata.UNKNOWN_PARTITION;
        }
    }


// 利用键的哈希值来选择分区
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }

2. 自定义分区器

新建类实现Partitioner接口,key是字符串数字,奇数送到分区0,偶数送到分区1 。

public class MyKafkaPartitioner implements Partitioner {
    @Override
    public int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        // Ensure the key is a non-null string
        if (key == null || !(key instanceof String)) {
            throw new IllegalArgumentException("Key must be a non-null String");
        }

        // Parse the key as an integer
        int keyInt;
        try {
            keyInt = Integer.parseInt((String) key);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Key must be a numeric string", e);
        }

        // Determine the partition based on the key's odd/even nature
        if (keyInt % 2 == 0) {
            return 1; // Even keys go to partition 2
        } else {
            return 0; // Odd keys go to partition 0
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

新建一个存在多分区的Topic。

在这里插入图片描述

public class KafkaProducerPartitionorTest {
    public static void main(String[] args) throws InterruptedException {
        //创建producer
        HashMap<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //指定拦截器
        config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());
        //指定分区器
        config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

        for (int i = 0; i < 10; i++) {
            //创建record
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    "test1",
                    "key"+i,
                    "我是你爹"+i
            );
            //发送record
            producer.send(record);
            Thread.sleep(500);
        }

        //关闭producer
        producer.close();
    }
}

配置好PARTITIONER_CLASS_CONFIG后发送消息。
在这里插入图片描述
在这里插入图片描述

可以分区器成功起作用了。

3. RecordAccumulator数据收集器

通过数据校验后,数据从分区器来到数据收集器

数据收集器的工作机制

  1. 队列缓存RecordAccumulator为每个分区维护一个队列。默认情况下,每个队列的批次大小(buffer size)是16KB,这个大小可以通过配置参数batch.size来调整。

  2. 缓冲区管理

    • 每个分区都有一个或多个批次,每个批次包含多条消息。
    • 当一个批次填满(即达到batch.size),或者达到发送条件(如linger.ms时间窗口,即发送消息前等待的时间)时,批次会被标记为可发送状态,并被传递给Sender线程。
  3. 满批次处理

    • 当某个分区的队列中的某个批次大小超过了16KB(默认值)或满足linger.ms的时间条件,RecordAccumulator会将该批次加入到一个待发送的队列中。
    • Sender线程会从待发送队列中获取这些满批次并将其发送到Kafka集群。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/802657.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【XSS】

文章目录 0x01 简介0x02 XSS Payload用法XSS攻击平台及调试JavaScript 0x03 XSS构造技巧XSS漏洞防御策略 跨站脚本攻击&#xff0c;Cross Site Script。&#xff08;重点在于脚本script&#xff09; 分类 反射型、存储型DOM型 漏洞原理&#xff1a;通过插入script篡改“HTML”…

字节码编程bytebuddy之通过Advice动态修改方法参数值

写在前面 本文看下如何通过bytebuddy的advice切面技术来动态修改方法入参值。 1&#xff1a;程序 首先定义premain&#xff1a; package com.dahuyou.change.method.param;//import net.bytebuddy.agent.builder.AgentBuilder; import net.bytebuddy.agent.builder.AgentBu…

Java web从入门到精通 (第 2版)中文电子版

前言 《Java Web从入门到精通&#xff08;第2版&#xff09;》共分21章&#xff0c;包括Java Web应用开发概述、HTML与CSS网页开发基础、JavaScript脚本语言、搭建开发环境、JavaBean技术、Servlet技术、过滤器和监听器、Hibernate高级应用、Java Web的数据库操作、EL&#xf…

Linux 上 TTY 的起源

注&#xff1a;机翻&#xff0c;未校对。 What is a TTY on Linux? (and How to Use the tty Command) What does the tty command do? It prints the name of the terminal you’re using. TTY stands for “teletypewriter.” What’s the story behind the name of the co…

每日一题,力扣leetcode Hot100之49. 字母异位词分组

该题用哈希表解答&#xff0c;具有统一特征的作为哈希表的键名&#xff0c;然后满足要求的作为值 解法一&#xff1a; 我们将每个字符串进行排序&#xff0c;如果排序后的结果相同&#xff0c;则可以认为是字母异位词&#xff0c;我们将排序后的结果作为哈希表的key&#xff…

智能听诊器:宠物健康监测的革新者

宠物健康护理领域迎来了一项激动人心的技术革新——智能听诊器。这款创新设备以其卓越的精确度和用户友好的操作&#xff0c;为宠物主人提供了一种全新的健康监测方法。 使用智能听诊器时&#xff0c;只需将其放置在宠物身上&#xff0c;它便能立即捕捉到宠物胸腔的微小振动。…

S274多功能可编程RTU在智慧水务远程水质检测系统中的应用案例

钡铼第四代RTU S274作为一款多功能可编程的无线工业物联网数据监测采集控制短信报警终端&#xff0c;为智慧水务领域提供了强大的技术支持和解决方案。 技术概述与特点 钡铼S274基于UCOSII嵌入式实时操作系统&#xff0c;支持多种通信协议包括短信和MQTT&#xff0c;能够接入…

2024嘶吼网络安全产业图谱(高清完整版)

在数字化和智能化浪潮的推动下&#xff0c;网络安全产业正处于一个快速变革的时期。从传统的防御手段和被动的威胁应对&#xff0c;到如今主动预防和智能检测技术的普及&#xff0c;网络安全领域的焦点和需求正不断演进。为了更好的理解当前网络安全产业现状和未来发展方向&…

jeecgboot项目不知道什么原因启动出来8080端口后就不下去了,要等上10多分钟才出来接口地址等正常情况

因为这个项目license问题无法开源&#xff0c;更多技术支持与服务请加入我的知识星球。 1、项目中途不知道什么原因&#xff0c;就出现下面情况 具体如下&#xff1a; 2024-07-15 15:08:15.767 [main] [34mINFO [0;39m [36mliquibase.changelog:30[0;39m - Reading from jeec…

【LeetCode】十七、并查集

文章目录 1、并查集Union Find2、并查集find的优化&#xff1a;路径压缩 Quick find3、并查集union的优化&#xff1a;权重标记 1、并查集Union Find 并查集&#xff0c;一种树形的数据结构&#xff0c;处理不相交的两个集合的合并与查询问题。 【参考&#xff1a;&#x1f4…

优化 Java 数据结构选择与使用,提升程序性能与可维护性

优化 Java 数据结构选择与使用&#xff0c;提升程序性能与可维护性 引言 在软件开发中&#xff0c;数据结构的选择是影响程序性能、内存使用以及代码可维护性的关键因素之一。Java 作为一门广泛使用的编程语言&#xff0c;提供了丰富的内置数据结构&#xff0c;如数组、链表、…

python用selenium网页模拟时xpath无法定位元素解决方法2

有时我们在使用python selenium xpath时&#xff0c;无法定位元素&#xff0c;红字显示no such element。上一篇文章写了1种情况&#xff0c;是包含iframe的&#xff0c;详见https://blog.csdn.net/Sixth5/article/details/140342929。 本篇写第2种情况&#xff0c;就是xpath定…

嵌入式人工智能(9-基于树莓派4B的PWM-LED呼吸灯)

1、PWM简介 (1)、什么是PWM 脉冲宽度调制(PWM)&#xff0c;是英文“Pulse Width Modulation”的缩写&#xff0c;简称脉宽调制&#xff0c;是在具有惯性的系统中利用微处理器的数字输出来对模拟电路进行控制的一种非常有效的技术&#xff0c;广泛应用在从测量、通信到功率控制…

Open3D 最小二乘法拟合点云平面

目录 一、概述 1.1最小二乘法原理 1.2实现步骤 1.3应用场景 二、代码实现 2.1关键函数 2.2完整代码 三、实现效果 3.1原始点云 3.2matplotlib可视化 3.3平面拟合方程 前期试读&#xff0c;后续会将博客加入该专栏&#xff0c;欢迎订阅 Open3D点云算法与点云深度学习…

【内网穿透】打洞笔记

文章目录 前言原理阐述公网sshfrp转发服务 实现前提第一步&#xff1a;第二步第三步第四步 补充第五步&#xff08;希望隧道一直开着&#xff09;sftp传数据&#xff08;嫌云服务器上的网太慢&#xff09; 前言 租了一个云服务器&#xff0c;想用vscode的ssh远程连接&#xff…

Java面试八股之简述redis常见的性能问题和解决方案

简述redis常见的性能问题和解决方案 Redis作为一款高性能的键值存储系统&#xff0c;虽然设计用于提供低延迟的读写操作&#xff0c;但在特定场景下仍可能出现性能瓶颈。下面列出了一些常见的Redis性能问题及其解决方案&#xff1a; 常见性能问题 内存使用过高 Redis是内存…

支付宝低代码搭建电商小程序,无需编程,可视化操作

大家好&#xff0c;我是小悟 在数字化浪潮的推动下&#xff0c;为了更快速、高效地搭建电商小程序&#xff0c;支付宝低代码平台凭借其独特优势&#xff0c;为商家提供了便捷的解决方案。 支付宝低代码平台犹如一座精心打造的智慧工坊&#xff0c;让电商小程序的搭建变得轻而易…

生成树(STP)协议

一、生成树的技术背景 1、交换机单线路上链,存在单点故障,上行线路及设备都不具备冗余性,一旦链路或上行设备发生故障,网络将面临断网。 总结:以下网络不够健壮,不具备冗余性。 2、因此引入如下网络拓扑结构: 上述冗余拓扑能够解决单点故障问题,但同时冗拓扑也带来了…

多级表头固定列问题

父级的width&#xff0c;是需要固定的列的width的总和 参考&#xff1a; el-table 多级表头下对应列的固定