Kafka中的Partition详解与示例代码

在Apache Kafka中,Partition(分区)是一个关键的概念。分区的引入使得Kafka能够处理大规模数据,并提供高性能和可伸缩性。本文将深入探讨Kafka中的Partition,包括分区的作用、创建、配置以及一些实际应用中的示例代码。

Partition的作用

在Kafka中,Topic被分为一个或多个Partition。每个Partition是一个有序且不可变的消息序列,具有自己的唯一标识符(Partition ID)。分区的主要作用有:

  • 水平扩展性:通过将Topic划分为多个Partition,可以将消息分布到多个Broker上,实现水平扩展,提高整体吞吐量。

  • 并行处理:每个Partition可以在不同的消费者上并行处理,提高系统的处理能力。

  • 顺序性:在同一个Partition内,消息的顺序是有序的。这有助于确保一些需要顺序处理的场景,如日志记录。

创建与配置Partition

1 创建Topic时指定Partition数量

可以在创建Topic时指定Partition的数量。以下是一个使用命令行工具创建Topic并指定分区数量的示例:

bin/kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

这将创建一个名为my_topic的Topic,有3个分区。

2 动态调整Partition数量

Kafka支持在运行时动态调整Topic的Partition数量。以下是一个示例:

bin/kafka-topics.sh --alter --topic my_topic --partitions 5 --bootstrap-server localhost:9092

这将把my_topic的分区数增加到5。

3 Partition的配置选项

Partition还有一些配置选项,例如消息的保留时间、清理策略等。以下是一个设置消息保留时间的示例:

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my_topic --alter --add-config retention.ms=86400000

这将设置my_topic的消息保留时间为1天(86400000毫秒)。

生产者与消费者操作分区

1 生产者发送消息到指定Partition

在生产者发送消息时,可以选择将消息发送到特定的Partition。以下是一个Java生产者示例代码:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(properties);

ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", 1, "key", "value");
producer.send(record);
producer.close();

这将消息发送到my_topic的第2个分区(分区编号从0开始)。

2 消费者订阅指定Partition

消费者可以选择订阅特定的Partition,也可以订阅整个Topic。以下是一个Java消费者示例代码:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "my_group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(properties);
TopicPartition partition = new TopicPartition("my_topic", 1);
consumer.assign(Collections.singletonList(partition));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
    }
}

这将使消费者仅订阅my_topic的第2个分区。

实际应用示例

1 数据分流

在实际应用中,数据分流是一种常见的需求,特别是在处理用户活动日志等场景。通过根据用户ID将日志分发到相同的分区,可以轻松实现对用户活动的精细化处理、分析和统计。下面是一个具体的数据分流示例:

创建Topic时指定Partition数量

首先,创建一个Topic,假设名为user_activity_logs,并指定适当的分区数量,例如5个分区:

bin/kafka-topics.sh --create --topic user_activity_logs --partitions 5 --replication-factor 2 --bootstrap-server localhost:9092
生产者发送用户活动日志

在实际场景中,应用程序可能会有不同的用户活动日志,例如登录、点击、购买等。以下是一个简化的Java生产者示例代码,用于向user_activity_logs Topic发送用户活动日志:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class UserActivityProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(properties);

        // 模拟不同用户的活动日志
        for (int i = 1; i <= 10; i++) {
            String userId = "user_" + i;
            String activity = "Login";  // 模拟用户登录活动,实际应用中可根据业务类型发送不同类型的活动日志

            // 根据用户ID计算分区
            int partition = userId.hashCode() % 5;

            ProducerRecord<String, String> record = new ProducerRecord<>("user_activity_logs", partition, userId, activity);
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("Sent record to partition %d with offset %d%n", metadata.partition(), metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }

        producer.close();
    }
}

在这个示例中,模拟了10个用户的登录活动,并通过计算用户ID的哈希值来确定将活动发送到哪个分区。

消费者按用户ID订阅分区

消费者可以根据用户ID订阅相应的分区,以便按用户维度处理活动日志。以下是一个简化的Java消费者示例代码:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class UserActivityConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "user_activity_group");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 订阅指定分区
        TopicPartition partitionToSubscribe = new TopicPartition("user_activity_logs", 0);
        consumer.assign(Collections.singletonList(partitionToSubscribe));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received record with key %s and value %s from partition %d%n",
                        record.key(), record.value(), record.partition());
                // 在这里进行按用户ID处理的业务逻辑
            }
        }
    }
}

在实际应用中,可能会根据业务逻辑动态订阅多个分区,以处理更多用户的活动日志。

2 时间窗口统计

在实际应用中,按照时间窗口对数据进行分区是一种常见的做法,特别是对于具有时间戳的数据。这种分区方式使得可以方便地进行时间窗口内的统计、分析和处理。下面是一个具体的按时间窗口统计的示例:

创建Topic时指定Partition数量

首先,创建一个Topic,假设名为timestamped_data,并指定适当的分区数量,例如5个分区:

bin/kafka-topics.sh --create --topic timestamped_data --partitions 5 --replication-factor 2 --bootstrap-server localhost:9092
生产者发送带有时间戳的数据

在实际场景中,应用程序可能会产生带有时间戳的数据,例如传感器数据、日志等。以下是一个简化的Java生产者示例代码,用于向timestamped_data Topic发送带有时间戳的数据:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class TimestampedDataProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(properties);

        // 模拟带有时间戳的数据
        for (int i = 1; i <= 10; i++) {
            long timestamp = System.currentTimeMillis();
            String data = "Data_" + i;

            // 根据时间戳计算分区
            int partition = (int) (timestamp % 5);

            ProducerRecord<String, String> record = new ProducerRecord<>("timestamped_data", partition, String.valueOf(timestamp), data);
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("Sent record to partition %d with offset %d%n", metadata.partition(), metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }

        producer.close();
    }
}

在这个示例中,模拟了10条带有时间戳的数据,并通过计算数据的时间戳来确定将数据发送到哪个分区。

消费者按时间窗口统计订阅分区

消费者可以根据时间戳订阅相应的分区,以便按时间窗口统计数据。以下是一个简化的Java消费者示例代码:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TimeWindowedDataConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "time_windowed_data_group");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 订阅指定分区
        TopicPartition partitionToSubscribe = new TopicPartition("timestamped_data", 0);
        consumer.assign(Collections.singletonList(partitionToSubscribe));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received record with key %s and value %s from partition %d%n",
                        record.key(), record.value(), record.partition());
                // 在这里进行按时间窗口统计的业务逻辑
            }
        }
    }
}

在实际应用中,你可能会根据业务逻辑动态订阅多个分区,以处理更多时间窗口的数据。

分区的内部工作原理

理解Partition在Kafka中的内部工作原理有助于更深入地使用和优化Kafka。以下是一些关键的工作原理:

1 分区的负载均衡

Kafka的生产者在将消息发送到分区时,使用分区键的哈希函数来决定消息应该被分配到哪个分区。这种哈希算法有助于实现负载均衡,确保消息均匀分布在所有分区上。

2 分区的数据保留和清理

每个分区都维护着自己的消息日志,Kafka支持配置消息的保留时间和大小。当消息达到指定的保留时间或大小时,Kafka会进行清理,删除过期的消息。这有助于控制存储占用和维持系统性能。

3 分区的复制机制

为了提高系统的可用性和容错性,Kafka使用分区的复制机制。每个分区可以有多个副本,分布在不同的Broker上。生产者向主分区发送消息,而主分区负责将消息复制到所有副本。这种设计保证了即使某个Broker发生故障,仍然可以从其他副本中获取数据。

性能调优与监控

1 监控工具

Kafka提供了一些监控工具,例如JConsole、Kafka Manager等,可以用于实时监控Kafka集群的状态。这些工具可以展示各个分区的吞吐量、偏移量、副本状态等信息,有助于及时发现和解决问题。

2 性能调优

性能调优是使用Kafka的关键一环。你可以通过调整Producer和Consumer的参数,以及适时更新Broker的配置来优化系统性能。例如,调整Producer的batch.sizelinger.ms参数以优化消息的批量发送,或者调整Consumer的max.poll.records参数以优化批量处理能力。

3 数据压缩

Kafka支持消息的压缩,可以通过配置Producer的compression.type参数来选择压缩算法。压缩可以降低网络传输成本,提高数据的传输效率。

properties.put("compression.type", "gzip");

总结

本文详细介绍了Kafka中的Partition概念,从创建、配置、内部工作原理到实际应用示例和性能调优等多个方面进行了深入的讨论。希望这些详细的示例代码和解释能够帮助大家更全面地理解和应用Kafka中的Partition。在实际应用中,根据业务需求和系统规模,可以灵活配置Partition以达到最佳性能和可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/221073.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++ day55 判断子序列 不同的子序列

题目1&#xff1a;392 判断子序列 题目链接&#xff1a;判断子序列 对题目的理解 判断字符串s是否为t的子序列 字符串s和字符串t的长度大于等于0&#xff0c;字符串s的长度小于等于字符串t的长度&#xff0c;本题其实和最长公共子序列的那道题很相似&#xff0c;相当于找两…

gitlab高级功能之容器镜像仓库

今天给大家介绍一个gitlab的高级功能 - Container Registry&#xff0c;该功能可以实现docker镜像的仓库功能&#xff0c;将gitlab上的代码仓的代码通过docker构建后并推入到容器仓库中&#xff0c;好处就是无需再额外部署一套docker仓库。 文章目录 1. 参考文档2. Container R…

yolov8添加ca注意力机制

创建文件 coordAtt.py 位置&#xff1a;ultralytics/nn/modules/coordAtt.py ###################### CoordAtt #### start by AI&CV ############################### # https://zhuanlan.zhihu.com/p/655475515 import torch import torch.nn as nn import t…

在Windows11(WSL)中如何迁移Docker

前言&#xff1a; 在Windows 10中Docker是默认安装到WSL中的&#xff0c;而安装到WSL中的任意分发版都是默认放在C盘中的。这样会让我们的C盘资源极度紧张&#xff0c;而且也限制了Docker的镜像数量。 迁移步骤 假设我有一个临时目录“D:\docker”用来存放临时文件&#xff0c;…

【开源】基于Vue和SpringBoot的在线课程教学系统

项目编号&#xff1a; S 014 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S014&#xff0c;文末获取源码。} 项目编号&#xff1a;S014&#xff0c;文末获取源码。 目录 一、摘要1.1 系统介绍1.2 项目录屏 二、研究内容2.1 课程类型管理模块2.2 课程管理模块2…

黑马头条数据管理平台项目总结

今天主要看了该项目的介绍&#xff0c;这个黑马头条数据管理平台项目主要包括登录、用户的权限判断、文章内容列表的筛选和分页、文章的增删查改还有图片和富文本编辑器这几大部分组成&#xff0c;项目配套了素材代码&#xff0c;像资源文件、第三方插件、页面文件夹、工具插件…

【MySQL】表的增删查改

增创建库创建表表插入表更新插入表替换插入查询结果 查全列查找指定列查找查找结果去重where条件查找筛选分页结果 改对查询到的结果进行列值更新 删delete 和 truncate 的区别 增 创建库创建表 create database 库名称;use 进入的库名称;create table 表名称; select * from…

Apollo新版本Beta技术沙龙

有幸参加Apollo开发者社区于12月2日举办的Apollo新版本(8.0)的技术沙龙会&#xff0c;地址在首钢园百度Apollo Park。由于去的比较早&#xff0c;先参观了一下这面的一些产品&#xff0c;还有专门的讲解&#xff0c;主要讲了一下百度无人驾驶的发展历程和历代产品。我对下面几个…

单点登录方案调研与实现

作用 在一个系统登录后&#xff0c;其他系统也能共享该登录状态&#xff0c;无需重新登录。 演进 cookie → session → token →单点登录 Cookie 可以实现浏览器和服务器状态的记录&#xff0c;但Cookie会出现存储体积过大和可以在前后端修改的问题 Session 为了解决Co…

Doris 集成 ElasticSearch

Doris-On-ES将Doris的分布式查询规划能力和ES(Elasticsearch)的全文检索能力相结合,提供更完善的OLAP分析场景解决方案: (1)ES中的多index分布式Join查询 (2)Doris和ES中的表联合查询,更复杂的全文检索过滤 1 原理 (1)创建ES外表后,FE会请求建表指定的主机,获取所有…

Git 应用 -- 多人协作开发场景1

目录 1. 既查看本地仓库的分支&#xff0c;又查看远程仓库的分支&#xff1a; git branch -a &#xff08;但是远程的分支只能查看&#xff0c;不能直接切换到远程的分支上&#xff09; 2. 本地的分支和远程的分支建立连接&#xff1a;git checkout -b [分支名] [要连接远程的…

【模型可解释性系列一】树模型-拿到特征重要度-打印关键因素

接下来一段时间内&#xff0c;会主要介绍下模型可解释性方向的一些常用方法。 模型可解释性&#xff1a;主要用来解释为什么这个样本的特征是这样的时候&#xff0c;模型结果是那样。面向老板汇报工作(尤其是不懂算法的老板)和业务方。 常用的树模型 xgboost、lightgbm这两个…

Ps:文字操作常用快捷键

对文字的设置操作&#xff0c;可在工具选项栏或“字符”面板上进行。但是&#xff0c;如果能记住并使用快捷键&#xff0c;可大大提高工作效率。 设置文字颜色 Color 1、选中几个或全部文字后&#xff0c;除了使用工具选项栏上的“颜色”按钮&#xff0c;还可以使用快捷键 Alt…

Linux系统调试课:PCIe调试手段

文章目录 一、lspci 命令二、pciutils 工具沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本文我们要介绍pcie调试手段。 一、lspci 命令 通过lspci可以查看当前系统挂载了哪些pci设备。 lspci - 列出 PCI 设备 lspci 命令可以列出计算机中所有 PCI 设备的详细信息,…

23、pytest通过skip跳过测试用例

官方实例 # content of test_skip.py import pytest import syspytest.mark.skip(reason"no way of currently testing this") def test_the_unknown():passdef valid_config():return Falsedef test_function():if not valid_config():pytest.skip("unsupport…

Android View.inflate 和 LayoutInflater.from(this).inflate的区别

前言 两个都是布局加载器&#xff0c;而View.inflate是对 LayoutInflater.from(context).inflate的封装&#xff0c;功能相同&#xff0c;案例使用了dataBinding。 View.inflate(context, layoutResId, root) LayoutInflater.from(context).inflate(layoutResId, root, fals…

如何通过navicat连接SQL Server数据库

本文介绍如何通过Navicat 连接SQL Server数据库。如果想了解如何连接Oracle数据库&#xff0c;可以参考下边这篇文章。如何通过Navicat连接Oracle数据库https://sgknight.blog.csdn.net/article/details/132064235 1、新建SQL Server连接配置 打开Navicat软件&#xff0c;点击…

centos7安装Elasticsearch7系列

背景 今天公司项目需要使用Elasticsearch7.17.7。所有网上搜索了一番&#xff0c;查到一个很不错安装方式分享给大家。 Elasticsearch官网发布 从 Elasticsearch 7.x 版本开始&#xff0c;Elasticsearch 发行版包括了自己的 JDK。因此&#xff0c;您不需要单独安装 Java。以…

2023.2版idea安装教程,现在jdk8已经过去式了,不同idea支持的jdk不同。升级jdk后idea也要随之升级

下载idea2023.2版本&#xff0c;下载之前需要删除之前的版本&#xff0c;一定要删除干净&#xff0c;删除程序要勾选那两个delete 下载路径&#xff1a;其他版本 - IntelliJ IDEA (jetbrains.com.cn) 选择2023.2版本 下载后进入安装程序&#xff0c;选择安装目录&#xff0c;然…

去掉参数中第一个“,”

记录一下&#xff0c;前端传参中&#xff0c;传给我参数是“categoryIds: ,1731557494586241026,1731569816263311362,1731569855534579713,1731858335179223042,1731858366821052418” 但是后端&#xff0c;因为我的mybati是in查询&#xff0c;所以因为第一个是“,”。所以会导…