【Kafka】Kafka Producer 分区-05

【Kafka】Kafka Producer 分区-05

  • 1. 分区的好处
  • 2. 分区策略
    • 2.1 默认的分区器 DefaultPartitioner
  • 3. 自定义分区器

1. 分区的好处

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

在这里插入图片描述

see 2.4.1http://t.csdnimg.cn/m1O9u

2. 分区策略

2.1 默认的分区器 DefaultPartitioner

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {
 … …
}

策略如下:

  1. 如果记录中指定了分区,使用指定的分区。
    这意味着生产者在发送消息时明确指定了一个分区号,Kafka将直接使用这个分区。
  2. 如果没有指定分区但存在键,根据键的哈希值选择分区。
    如果消息中没有明确指定分区,但是提供了一个键,Kafka会使用键的哈希值来计算应该使用哪个分区。这可以保证具有相同键的消息被发送到相同的分区,从而保证消息的有序性。
  3. 如果既没有指定分区也没有键,选择一个“sticky partition”(粘性分区),在批次满时更换分区。
    如果消息既没有指定分区,也没有键,Kafka将使用“sticky partition”策略。这种策略会在消息批次满时更换分区,以便于提高效率和性能。

这个注释是对DefaultPartitioner类的说明,该类实现了Partitioner接口,用于定义消息分区的策略。DefaultPartitioner类的主要功能就是根据上述规则决定消息发送到哪个分区。以下是DefaultPartitioner类的示例实现结构:

public class DefaultPartitioner implements Partitioner {
    // 初始化方法
    @Override
    public void configure(Map<String, ?> configs) {
        // 配置代码
    }

    // 计算分区的方法
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 分区计算逻辑
    }

    // 清理资源的方法
    @Override
    public void close() {
        // 资源清理代码
    }
}

该实现中包括了三个主要的方法:

  • configure(Map<String, ?> configs):用于初始化和配置分区器。
  • partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster):用于根据给定的主题、键和值来计算应该使用哪个分区。
  • close():用于在分区器关闭时清理资源。

在这里插入图片描述

案例一: 将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
public class CustomProducerCallbackPartitions {
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

		// 2. 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102
:9092 ");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
            // 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)
            kafkaProducer.send(new ProducerRecord<>("first",
                    1, "", "atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception e) {
                    if (e == null) {
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition()
                        );
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

案例二: 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。

public class CustomProducerCallback {
    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
            // 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0
            kafkaProducer.send(new ProducerRecord<>("first",
                    "a", "atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception e) {
                    if (e == null) {
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition()
                        );
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

3. 自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器

  1. 需求
    例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区。

  2. 实现步骤
    定义类实现 Partitioner 接口
    重写 partition()方法

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * 1. 实现接口 Partitioner
 * 2. 实现 3 个方法:partition,close,configure
 * 3. 编写 partition 方法,返回分区号
 */
public class MyPartitioner implements Partitioner {
    /**
     * 返回信息对应的分区
     *
     * @param topic 主题
     * @param key 消息的 key
     * @param keyBytes 消息的 key 序列化后的字节数组
     * @param value 消息的 value
     * @param valueBytes 消息的 value 序列化后的字节数组
     * @param cluster 集群元数据可以查看分区信息
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[]
            keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取消息
        String msgValue = value.toString();
        // 创建 partition
        int partition;
        // 判断消息是否包含 atguigu
        if (msgValue.contains("atguigu")) {
            partition = 0;
        } else {
            partition = 1;
        }
        // 返回分区号
        return partition;
    }

    // 关闭资源
    @Override
    public void close() {
    }

    // 配置方法
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

使用分区器的方法,在生产者的配置中添加分区器参数。

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CustomProducerCallbackPartitions {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // 添加自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atgui
                gu.kafka.producer.MyPartitioner");
                KafkaProducer < String, String > kafkaProducer = new
                        KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {

            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception e) {
                    if (e == null) {
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition()
                        );
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

测试
①在 hadoop102 上开启 Kafka 消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 控制台观察回调信息

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/711300.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《幻影大师:透视缠中说禅的虚像与真相》

而且他从不犯错&#xff0c;至少在他的叙述中是这样&#xff0c;所有的文章和言论都被粉饰得完美无瑕&#xff0c;即便有误&#xff0c;他也绝不公开承认&#xff0c;更别提什么真诚的道歉和改正了。那些对他推崇备至的人&#xff0c;多是盲目追随&#xff0c;将他神化为无所不…

YOLOv8可视化界面PYQT5

yolov8&#xff0c;可视化界面pyqt。支持图片检测&#xff0c;视频检测&#xff0c;摄像头检测等&#xff0c;实时显示检测画面。支持自定义数据集&#xff0c;计数&#xff0c;fps展示……,即插即用&#xff0c;无需更改太多代码

记一次全设备通杀未授权RCE的挖掘经历

想来上一次挖洞还在一年前的大一下&#xff0c;然后就一直在忙活写论文&#xff0c;感觉挺枯燥的&#xff08;可能是自己不太适合弄学术吧QAQ&#xff09;&#xff0c;所以年初1~2月的时候&#xff0c;有空的时候就又会挖一挖国内外各大知名厂商的设备&#xff0c;拿了几份思科…

---String类---

在c语言中要使用字符串&#xff0c;只能通过字符指针或者字符数组&#xff0c;然后再通过函数进行各种操作&#xff0c;这种将变量和变量方法分开的方式显然不符合面向对象的编程&#xff0c;所以java中添加了String这个类 String类构造 而对于string有很多的方法 字符串长度…

UWB技术定位系统源码,智慧工厂人员定位系统,独特的射频处理,配合先进的位置算法

UWB技术定位系统源码&#xff0c;高精度人员定位系统源码&#xff0c;智慧工厂人员定位系统源码&#xff0c;室内定位系统源码 本套系统运用UWB定位技术&#xff0c;开发的高精度人员定位系统&#xff0c;通过独特的射频处理&#xff0c;配合先进的位置算法&#xff0c;可以有…

结构体对齐,与 触发 segment fault 为什么是 1024*132 ,而不是1024*128

1, 简单的小示例代码 按理说 malloc 的size 是 1024*128&#xff0c;这里却需要 1024*132才能及时触发 segmentation fault #include <stdlib.h> #include <stdio.h> #define SIZE 1024*131int main() {char *p 0;p malloc(SIZE);p[SIZE -1] a;free(p);printf(…

【Mongodb-02】springboot整合mongodb(详解)

springBoot整和mongodb 一&#xff0c;springboot整合mongodb1&#xff0c;依赖加入2&#xff0c;yml文件配置3&#xff0c;_class 字段过滤(可选)4&#xff0c;实体类定义5&#xff0c;索引创建6&#xff0c;数据插入6.1&#xff0c;insert方式6.2&#xff0c;使用save的方式实…

Elixir学习笔记——输入输出和文件系统

本章介绍输入/输出机制、文件系统相关任务以及相关模块&#xff08;如 IO、File 和 Path&#xff09;。IO 系统提供了一个很好的机会来阐明 Elixir 和 Erlang VM 的一些思维模式和新奇思想。 输入输出模块 输入输出模块是 Elixir 中读写标准输入/输出 (:stdio)、标准错误 (:s…

Linux 终端窗口设置为透明

Linux 终端窗口设置为透明 打开终端 右键鼠标 选择Profile Preferences 点击Background 选择 Transparent background 拖动滑条调整透明度 完成。

【机器学习】集成学习方法:Bagging与Boosting的应用与优势

&#x1f525; 个人主页&#xff1a;空白诗 文章目录 引言一、集成学习的定义二、Bagging方法1. 随机森林&#xff08;Random Forest&#xff09;2. 其他Bagging方法 二、Boosting方法1. 梯度提升树&#xff08;Gradient Boosting Machine, GBM&#xff09;解释GBM的基本原理和…

笔记本开机原理

从按下开机键开始&#xff0c;机器是如何开到OS的呢&#xff1f;今天这篇文章和大家极少EC-BIOS-OS的整个开机流程。首先大家要对笔记本的基本架构有所了解&#xff0c;基本架构如下图所示&#xff08;主要组成部分为大写黑体内容&#xff09;。 一、按下PowerButton按钮&#…

手把手带你搞定用户权限控制 | 纯干货

在实际的软件项目开发过程中&#xff0c;用户权限控制可以说是所有运营系统中必不可少的一个重点功能&#xff0c;根据业务的复杂度&#xff0c;设计的时候可深可浅&#xff0c;但无论怎么变化&#xff0c;设计的思路基本都是围绕着用户、角色、菜单这三个部分展开。 如何设计…

Matlab的Simulink系统仿真(simulink调用m函数)

这几天要用Simulink做一个小东西&#xff0c;所以在网上现学现卖&#xff0c;加油&#xff01; 起初的入门是看这篇文章MATLAB 之 Simulink 操作基础和系统仿真模型的建立_matlab仿真模型搭建-CSDN博客 写的很不错 后面我想在simulink中调用m文件 在 Simulink 中调用 MATLA…

Git 基础操作(一)

Git 基础操作 配置Git 安装完Git后&#xff0c;首先要做的事情是设置你的 用户名 和 e-mail 地址。这样在你向仓库提交代码的时候&#xff0c;就知道是谁提交的&#xff0c;以及提交人的联系方式。 配置用户名和邮箱 使用git config [--global] user.name "你的名字&qu…

失眠焦虑?这些维生素或许能帮你找回好眠!

&#x1f4a4; 失眠、焦虑&#xff0c;是现代生活中不少人都可能遇到的问题。长期的失眠与焦虑&#xff0c;不仅影响身体健康&#xff0c;更会对精神状态造成不小的冲击。其实&#xff0c;除了调整作息和放松心情&#xff0c;适当的维生素补充也可能有助于改善这些症状。 &…

SpringCloud-远程调用OpenFeign-基本使用

目录 1 直接使用RestTemplate发起Http请求 1.1 将RestTemplate注册为SpringBean 1.2 在service实现类中注入RestTemplate 1.3 使用注入的RestTemplate 传入参数后发起http请求 2 引入Nacos后使用RestTemplate发起Http请求 2.1 基础知识 2.2 Nacos的使用 2.2.1 引入nac…

Hvv--知攻善防应急响应靶机--Linux2

HW–应急响应靶机–Linux2 所有靶机均来自 知攻善防实验室 靶机整理&#xff1a; 夸克网盘&#xff1a;https://pan.quark.cn/s/4b6dffd0c51a#/list/share百度云盘&#xff1a;https://pan.baidu.com/s/1NnrS5asrS1Pw6LUbexewuA?pwdtxmy 官方WP&#xff1a;https://mp.weixin.…

Asp.Net Core 读取配置接口 IOptions、IOptionsMonitor以及IOptionsSnapshot

&#x1f340;简介 Options是.net Core Web api框架自带的功能&#xff0c;Options模式通过定义强类型的类来表示相关配置设置的集合&#xff0c;使得配置管理更为结构化和类型安全。 IOptions、IOptionsMonitor和IOptionsSnapshot是用于处理配置的依赖注入接口。这些接口允许…

Jenkins三种构建类型

目录 传送门前言一、概念二、前置处理&#xff08;必做&#xff09;1、赋予777权限2、让jenkins用户拥有root用户的kill权限3、要运行jar包端口号需要大于1024 三、自由风格软件项目&#xff08;FreeStyle Project&#xff09;&#xff08;推荐&#xff09;三、Maven项目&#…

【YashanDB知识库】PHP使用OCI接口使用数据库绑定参数功能异常

【问题分类】驱动使用 【关键字】OCI、驱动使用、PHP 【问题描述】 PHP使用OCI8连接yashan数据库&#xff0c;使用绑定参数获取数据时&#xff0c;出现报错 如果使用PDO_OCI接口连接数据库&#xff0c;未弹出异常&#xff0c;但是无法正确获取数据 【问题原因分析】 开启O…