Kafka分区策略实现

引言

Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中,合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。

轮询策略(默认)

  • 轮询策略是 Kafka 默认的分区策略(当消息没有指定键时)。生产者会按照顺序依次将消息发送到各个分区中,确保每个分区都能均匀地接收到消息,从而实现负载均衡。简单高效,能使各个分区的消息量相对均衡,充分利用每个分区的存储和处理能力。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    public class RoundRobinProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

    随机策略

  • 随机策略会随机地将消息分配到一个分区中。这种策略在某些情况下可以实现一定程度的负载均衡,但由于是随机分配,可能会导致分区之间的消息分布不够均匀。可以通过自定义分区器来实现随机策略。
  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    
    public class RandomPartitioner implements Partitioner {
        private final Random random = new Random();
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            return random.nextInt(partitions.size());
        }
    
        @Override
        public void close() {}
    
        @Override
        public void configure(Map<String, ?> configs) {}
    }
    
    // 使用随机分区器的生产者示例
    public class RandomProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("partitioner.class", "RandomPartitioner");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

    按键哈希策略

  • 当消息指定了键时,Kafka 会根据键的哈希值将消息分配到特定的分区中。相同键的消息会被分配到同一个分区,这有助于保证具有相同业务逻辑的消息顺序性。可以保证消息的局部有序性,例如在处理用户相关的消息时,将同一个用户的消息发送到同一个分区,方便后续的处理和分析。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    public class KeyBasedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "user-" + (i % 2), "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

    自定义分区策略(实现接口)

  • 当上述默认策略无法满足业务需求时,可以自定义分区策略。通过实现org.apache.kafka.clients.producer.Partitioner接口,重写partition方法来实现自定义的分区逻辑。例如,根据消息的某些特定字段(如时间、地理位置等)来进行分区,以满足特定的业务需求。

  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    
    public class CustomPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            // 自定义分区逻辑,这里简单示例根据消息值的长度分区
            String message = (String) value;
            return message.length() % partitions.size();
        }
    
        @Override
        public void close() {}
    
        @Override
        public void configure(Map<String, ?> configs) {}
    }
    
    // 使用自定义分区器的生产者示例
    public class CustomProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("partitioner.class", "CustomPartitioner");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/962968.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络工程师 (11)软件生命周期与开发模型

一、软件生命周期 前言 软件生命周期&#xff0c;也称为软件开发周期或软件开发生命周期&#xff0c;是指从软件项目的启动到软件不再被使用为止的整个期间。这个过程可以细分为多个阶段&#xff0c;每个阶段都有其特定的目标、任务和产出物。 1. 问题定义与需求分析 问题定义…

深度学习练手小例子——cifar10数据集分类问题

CIFAR-10 是一个经典的计算机视觉数据集&#xff0c;广泛用于图像分类任务。它包含 10 个类别的 60,000 张彩色图像&#xff0c;每张图像的大小是 32x32 像素。数据集被分为 50,000 张训练图像和 10,000 张测试图像。每个类别包含 6,000 张图像&#xff0c;具体类别包括&#x…

力扣257. 二叉树的所有路径(遍历思想解决)

Problem: 257. 二叉树的所有路径 文章目录 题目描述思路复杂度Code 题目描述 思路 遍历思想(利用二叉树的先序遍历) 利用先序遍历的思想&#xff0c;我门用一个List变量path记录当前先序遍历的节点&#xff0c;当遍历到根节点时&#xff0c;将其添加到另一个List变量res中&…

力扣第149场双周赛

文章目录 题目总览题目详解找到字符串中合法的相邻数字重新安排会议得到最多空余时间I 第149场双周赛 题目总览 找到字符串中合法的相邻数字 重新安排会议得到最多空余时间I 重新安排会议得到最多空余时间II 变成好标题的最少代价 题目详解 找到字符串中合法的相邻数字 思…

算法题(54):插入区间

审题&#xff1a; 需要我们把newinterval的区间与interval的区间合并起来&#xff0c;并返回合并后的二维数组地址 思路&#xff1a; 方法一&#xff1a;排序合并区间 我们可以先把newinterval插入到interval中&#xff0c;进行排序然后复用合并区间的代码 方法二&#xff1a;模…

网工_HDLC协议

2025.01.25&#xff1a;网工老姜学习笔记 第9节 HDLC协议 9.1 HDLC高级数据链路控制9.2 HDLC帧格式&#xff08;*控制字段&#xff09;9.2.1 信息帧&#xff08;承载用户数据&#xff0c;0开头&#xff09;9.2.2 监督帧&#xff08;帮助信息可靠传输&#xff0c;10开头&#xf…

[免费]微信小程序智能商城系统(uniapp+Springboot后端+vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序智能商城系统(uniappSpringboot后端vue管理端)&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序智能商城系统(uniappSpringboot后端vue管理端) Java毕业设计_哔哩哔哩_bilibili 项目介绍…

nth_element函数——C++快速选择函数

目录 1. 函数原型 2. 功能描述 3. 算法原理 4. 时间复杂度 5. 空间复杂度 6. 使用示例 8. 注意事项 9. 自定义比较函数 11. 总结 nth_element 是 C 标准库中提供的一个算法&#xff0c;位于 <algorithm> 头文件中&#xff0c;用于部分排序序列。它的主要功能是将…

CF 581A.Vasya the Hipster(Java实现)

题目分析 红色袜子数量a&#xff0c;蓝色袜子数量b&#xff0c;题目是个潮哥儿&#xff0c;首先选择两种袜子混搭&#xff0c;搭不出来就纯色 思路分析 混搭数量取决于最小数量&#xff0c;剩余的纯色数量取决于哪个还有剩余且数量要/2 代码 import java.util.*;public class…

C基础寒假练习(6)

一、终端输入行数&#xff0c;打印倒金字塔 #include <stdio.h> int main() {int rows;printf("请输入倒金字塔的行数: ");scanf("%d", &rows);for (int i rows; i > 0; i--) {// 打印空格for (int j 0; j < rows - i; j) {printf(&qu…

Python在线编辑器

from flask import Flask, render_template, request, jsonify import sys from io import StringIO import contextlib import subprocess import importlib import threading import time import ast import reapp Flask(__name__)RESTRICTED_PACKAGES {tkinter: 抱歉&…

ASP.NET Core 中间件

目录 一、常见的内置中间件 二、自定义中间件 三、中间件的执行顺序 四、其他自动逸中间件案例 1. 身份验证中间件 2、跨域中间件&#xff08;CORS&#xff09; ASP.NET Core 中&#xff0c;中间件&#xff08;Middleware&#xff09;是处理 HTTP 请求和响应的组件链。你…

LevelDB 源码阅读:写入键值的工程实现和优化细节

读、写键值是 KV 数据库中最重要的两个操作&#xff0c;LevelDB 中提供了一个 Put 接口&#xff0c;用于写入键值对。使用方法很简单&#xff1a; leveldb::Status status leveldb::DB::Open(options, "./db", &db); status db->Put(leveldb::WriteOptions…

2007-2019年各省科学技术支出数据

2007-2019年各省科学技术支出数据 1、时间&#xff1a;2007-2019年 2、来源&#xff1a;国家统计局、统计年鉴 3、指标&#xff1a;行政区划代码、地区名称、年份、科学技术支出 4、范围&#xff1a;31省 5、指标解释&#xff1a;科学技术支出是指为促进科学研究、技术开发…

2025年1月22日(网络编程 udp)

系统信息&#xff1a; ubuntu 16.04LTS Raspberry Pi Zero 2W 系统版本&#xff1a; 2024-10-22-raspios-bullseye-armhf Python 版本&#xff1a;Python 3.9.2 已安装 pip3 支持拍摄 1080p 30 (1092*1080), 720p 60 (1280*720), 60/90 (640*480) 已安装 vim 已安装 git 学习…

如何对系统调用进行扩展?

扩展系统调用是操作系统开发中的一个重要任务。系统调用是用户程序与操作系统内核之间的接口,允许用户程序执行内核级操作(如文件操作、进程管理、内存管理等)。扩展系统调用通常包括以下几个步骤: 一、定义新系统调用 扩展系统调用首先需要定义新的系统调用的功能。系统…

当卷积神经网络遇上AI编译器:TVM自动调优深度解析

从铜线到指令&#xff1a;硬件如何"消化"卷积 在深度学习的世界里&#xff0c;卷积层就像人体中的毛细血管——数量庞大且至关重要。但鲜有人知&#xff0c;一个简单的3x3卷积在CPU上的执行路径&#xff0c;堪比北京地铁线路图般复杂。 卷积的数学本质 对于输入张…

深度学习的应用

目录 一、机器视觉 1.1 应用场景 1.2 常见的计算机视觉任务 1.2.1 图像分类 1.2.2 目标检测 1.2.3 图像分割 二、自然语言处理 三、推荐系统 3.1 常用的推荐系统算法实现方案 四、图像分类实验补充 4.1 CIFAR-100 数据集实验 实验代码 4.2 CIFAR-10 实验代码 深…

Flutter常用Widget小部件

小部件Widget是一个类&#xff0c;按照继承方式&#xff0c;分为无状态的StatelessWidget和有状态的StatefulWidget。 这里先创建一个简单的无状态的Text小部件。 Text文本Widget 文件&#xff1a;lib/app/app.dart。 import package:flutter/material.dart;class App exte…

mysqldump+-binlog增量备份

注意&#xff1a;二进制文件删除必须使用help purge 不可用rm -f 会崩 一、概念 增量备份&#xff1a;仅备份上次备份以后变化的数据 差异备份&#xff1a;仅备份上次完全备份以后变化的数据 完全备份&#xff1a;顾名思义&#xff0c;将数据完全备份 其中&#xff0c;…