Kafa分区策略实现

引言

Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中,合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。

轮询策略(默认)

  • 轮询策略是 Kafka 默认的分区策略(当消息没有指定键时)。生产者会按照顺序依次将消息发送到各个分区中,确保每个分区都能均匀地接收到消息,从而实现负载均衡。简单高效,能使各个分区的消息量相对均衡,充分利用每个分区的存储和处理能力。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    public class RoundRobinProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

    随机策略

  • 随机策略会随机地将消息分配到一个分区中。这种策略在某些情况下可以实现一定程度的负载均衡,但由于是随机分配,可能会导致分区之间的消息分布不够均匀。可以通过自定义分区器来实现随机策略。
  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    
    public class RandomPartitioner implements Partitioner {
        private final Random random = new Random();
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            return random.nextInt(partitions.size());
        }
    
        @Override
        public void close() {}
    
        @Override
        public void configure(Map<String, ?> configs) {}
    }
    
    // 使用随机分区器的生产者示例
    public class RandomProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("partitioner.class", "RandomPartitioner");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

    按键哈希策略

  • 当消息指定了键时,Kafka 会根据键的哈希值将消息分配到特定的分区中。相同键的消息会被分配到同一个分区,这有助于保证具有相同业务逻辑的消息顺序性。可以保证消息的局部有序性,例如在处理用户相关的消息时,将同一个用户的消息发送到同一个分区,方便后续的处理和分析。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    
    public class KeyBasedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "user-" + (i % 2), "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

    自定义分区策略(实现接口)

  • 当上述默认策略无法满足业务需求时,可以自定义分区策略。通过实现org.apache.kafka.clients.producer.Partitioner接口,重写partition方法来实现自定义的分区逻辑。例如,根据消息的某些特定字段(如时间、地理位置等)来进行分区,以满足特定的业务需求。

  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    
    public class CustomPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            // 自定义分区逻辑,这里简单示例根据消息值的长度分区
            String message = (String) value;
            return message.length() % partitions.size();
        }
    
        @Override
        public void close() {}
    
        @Override
        public void configure(Map<String, ?> configs) {}
    }
    
    // 使用自定义分区器的生产者示例
    public class CustomProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("partitioner.class", "CustomPartitioner");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);
                producer.send(record);
            }
            producer.close();
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/961883.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Hive:窗口函数(1)

窗口函数 窗口函数OVER()用于定义一个窗口&#xff0c;该窗口指定了函数应用的数据范围 对窗口数据进行分区 partition by 必须和over () 一起使用, distribute by经常和sort by 一起使用,可以不和over() 一起使用.DISTRIBUTE BY决定了数据如何分布到不同的Reducer上&#xf…

多目标优化策略之一:非支配排序

多目标优化策略中的非支配排序是一种关键的技术,它主要用于解决多目标优化问题中解的选择和排序问题,确定解集中的非支配解(也称为Pareto解)。 关于什么是多目标优化问题,可以查看我的文章:改进候鸟优化算法之五:基于多目标优化的候鸟优化算法(MBO-MO)-CSDN博客 多目…

神经网络和深度学习

应用 类型 为什么近几年飞速发展 数据增长&#xff0c;算力增长&#xff0c;算法革新 逻辑回归 向量化 浅层神经网络(Shallow neural network) 单条训练数据前向传播计算表达式 batch训练数据前向传播计算表达式 反向传播计算表达式 参数随机初始化 不能全部设为0 原因是同一…

生成模型:扩散模型(DDPM, DDIM, 条件生成)

扩散模型的理论较为复杂&#xff0c;论文公式与开源代码都难以理解。现有的教程大多侧重推导公式。为此&#xff0c;本文通过精简代码&#xff08;约300行&#xff09;&#xff0c;从代码运行角度讲解扩散模型。 本文包括扩散模型的3项技术复现&#xff1a; 1.DDPM (Denoising…

浅谈网络 | 容器网络之Flannel

目录 云原生网络架构深度解构&#xff1a;Flannel的设计哲学与实现机制Flannel架构解析&#xff1a;三层核心设计原则UDP模式&#xff08;用户态隧道&#xff09;VXLAN模式&#xff08;内核态隧道&#xff09;Host-GW模式&#xff08;直连路由&#xff09; 生产环境架构选型与调…

新鲜速递:DeepSeek-R1开源大模型本地部署实战—Ollama + MaxKB 搭建RAG检索增强生成应用

在AI技术快速发展的今天&#xff0c;开源大模型的本地化部署正在成为开发者们的热门实践方向。最火的莫过于吊打OpenAI过亿成本的纯国产DeepSeek开源大模型&#xff0c;就在刚刚&#xff0c;凭一己之力让英伟达大跌18%&#xff0c;纳斯达克大跌3.7%&#xff0c;足足是给中国AI产…

用BGP的路由聚合功能聚合大陆路由,效果显著不?

正文共&#xff1a;666 字 11 图&#xff0c;预估阅读时间&#xff1a;1 分钟 之前我们统计过中国境内的IP地址和路由信息&#xff08;你知道中国大陆一共有多少IPv4地址吗&#xff1f;&#xff09;&#xff0c;不过数量比较多&#xff0c;有8000多条。截止到2021年底&#xff…

AI DeepSeek-R1 Windos 10 环境搭建

1、安装&#xff1a; 下载 Python |Python.org CUDA Drivers for MAC Archive | NVIDIA pip 和virtualenv Download Ollama on Windows 如下图 2、下载模型 deepseek-r1 ollama run deepseek-r1 或者可以ollama run deepseek-r1:8b 或 3、安装一个可视化对话Chatbox 下载 …

SOME/IP--协议英文原文讲解4

前言 SOME/IP协议越来越多的用于汽车电子行业中&#xff0c;关于协议详细完全的中文资料却没有&#xff0c;所以我将结合工作经验并对照英文原版协议做一系列的文章。基本分三大块&#xff1a; 1. SOME/IP协议讲解 2. SOME/IP-SD协议讲解 3. python/C举例调试讲解 4.1.3 End…

工作总结:git篇

文章目录 前言基础Gerrit1.克隆2.新建本地分支和checkout3.添加到暂存区新增文件到暂存区修改已经添加到暂存区的文件取消添加到暂存区的文件 4.提交到本地仓库在不重复提交的情况下&#xff0c;修改本次提交 5.提交到远程仓库6.评审其他辅助命令 前言 目前也算是工作一段时间…

SpringCloud系列教程:微服务的未来(十七)监听Nacos配置变更、更新路由、实现动态路由

前言 在微服务架构中&#xff0c;API 网关是各个服务之间的入口点&#xff0c;承担着路由、负载均衡、安全认证等重要功能。为了实现动态的路由配置管理&#xff0c;通常需要通过中心化的配置管理系统来实现灵活的路由更新&#xff0c;而无需重启网关服务。Nacos 作为一个开源…

WireShark4.4.2浏览器网络调试指南:数据统计(八)

概述 Wireshark 是一款功能强大的开源网络协议分析软件&#xff0c;被广泛应用于网络调试和数据分析。随着互联网的发展&#xff0c;以及网络安全问题日益严峻&#xff0c;了解如何使用 Wireshark进行浏览器网络调试显得尤为重要。最新的 Wireshark4.4.2 提供了更加强大的功能…

【2025年数学建模美赛E题】(农业生态系统)完整解析+模型代码+论文

生态共生与数值模拟&#xff1a;生态系统模型的物种种群动态研究 摘要1Introduction1.1Problem Background1.2Restatement of the Problem1.3Our Work 2 Assumptions and Justifications3 Notations4 模型的建立与求解4.1 农业生态系统模型的建立与求解4.1.1 模型建立4.1.2求解…

Eureka 服务注册和服务发现的使用

1. 父子工程的搭建 首先创建一个 Maven 项目&#xff0c;删除 src &#xff0c;只保留 pom.xml 然后来进行 pom.xml 的相关配置 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xs…

【PowerShell专栏】实现Terminal工具的安装

微软已经发布了Windows Terminal的版本,提供各个命令操作工具的集成环境,在Windows Terminal中,我们可以集中实现多页签的各个命令方式,相比传统的命令执行分离,着实方便了不少。图为Terminal 界面: 如何实现Windows Terminal的安装呢?有好几种方式可以实现Windows Term…

从0到1:C++ 开启游戏开发奇幻之旅(二)

目录 游戏开发核心组件设计 游戏循环 游戏对象管理 碰撞检测 人工智能&#xff08;AI&#xff09; 与物理引擎 人工智能 物理引擎 性能优化技巧 内存管理优化 多线程处理 实战案例&#xff1a;开发一个简单的 2D 射击游戏 项目结构设计 代码实现 总结与展望 游戏…

九大服务构建高效 AIOps 平台,全面解决GenAI落地挑战

最近,DevOps运动的联合创始人Patrick Debois分享了他对AI平台与软件研发关系的深刻见解,让我们一起来探讨这个话题。 在AI的落地过程中,我们面临着两个主要难题: 引入AI编码工具后的开发者角色转变:随着像GitHub Copilot这样的AI工具的普及,工程师的角色正在发生深刻变革…

LangChain概述

文章目录 为什么需要LangChainLLM应用开发的最后1公里LangChain的2个关键词LangChain的3个场景LangChain的6大模块 为什么需要LangChain 首先想象一个开发者在构建一个LLM应用时的常见场景。当你开始构建一个新项目时&#xff0c;你可能会遇到许多API接口、数据格式和工具。对于…

【浏览器 - Mac实时调试iOS手机浏览器页面】

最近开发个项目&#xff0c;需要在 Mac 电脑上调试 iOS 手机设备上的 Chrome 浏览器&#xff0c;并查看Chrome网页上的 console 信息&#xff0c;本来以为要安装一些插件&#xff0c;没想到直接使用Mac上的Safari 直接可以调试&#xff0c;再此记录下&#xff0c;分享给需要的伙…

【Rust自学】15.4. Drop trait:告别手动清理,释放即安全

喜欢的话别忘了点赞、收藏加关注哦&#xff08;加关注即可阅读全文&#xff09;&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 15.4.1. Drop trait的意义 类型如果实现了Drop trait&#xff0c;就可以让程序员自定义当值…