Kafka入门-分区及压缩

一、生产者消息分区

Kafka的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。

分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。 

分区策略

所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka为我们提供了默认的分区策略,同时它也支持自定义分区策略。 如果要自定义分区策略,需要显式地配置生产者端的参数partitioner.class。这个参数该怎么设定呢?方法很简单,在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简单,只定义了两个方法:partition()和close(),通常你只需要实现最重要的partition方法。我们来看看这个方法的方法签名:

int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前Kafka集群共有多少主题、多少Broker等)。只要实现类定义好了partition方法,同时设置partitioner.class参数为实现类的FullQualified Name,那么生产者程序就会按照代码逻辑对消息进行分区。

比较常见的分区策略:

1. 轮询策略

未指定partitioner.class参数,默认使用

2. 随机策略

要实现随机策略版的partition方法,很简单,只需要两行代码即可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

 先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。

3. 按消息键保序策略 

Kafka允许为每条消息定义消息键,简称为Key。这个Key的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务ID等;一旦消息被定义了Key,那么就可以保证同一个Key的所有消息都进入到相同的分区里面,实现这个策略的partition方法同样简单,只需要下面两行代码即可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

4. 基于地理位置的分区策略

这种策略一般只针对那些大规模的Kafka集群,特别是跨城市、跨国家甚至是跨大洲的集群。

根据Broker所在的IP地址实现定制化的分区策略。比如下面这段代码:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

 二、Kafka的压缩

Kafka的消息层次都分为两层:消息集合(messageset)以及消息(message)。一个消息集合中包含若干条日志项(record item),而日志项才是真正封装消息的地方。Kafka底层的消息日志由一系列消息集合日志项组成。Kafka通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。

在Kafka中,压缩可能发生在两个地方:生产者端和Broker端

生产者端

生产者程序中配置compression.type参数即表示启用指定类型的压缩算法。比如下面这段程序代码展示了如何构建一个开启GZIP的Producer对象:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启GZIP压缩
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);

这样Producer启动后生产的每个消息集合都是经GZIP压缩过的,故而能很好地节省网络传输带宽以 及Kafka Broker端的磁盘占用。 

Broker端

1. Broker端指定了和Producer端不同的压缩算法。Broker端也有一个参数叫compression.type,默认值是producer,可以设置不同压缩算法。

2. Broker端发生了消息格式转换。为了兼容老版本的格式,Broker端 会对新版本消息执行向老版本格式的转换。这个过程中会涉及消息的解压缩和重新压缩。

解压缩

解压缩发生在消费者程序中,也就是说Producer发送压缩消息到Broker后,Broker照单全收并原样保存起来。当Consumer程序请求这部分消息时,由Consumer自行解压缩还原成之前的消息。

Consumer怎么知道这些消息是用何种压缩算法压缩的呢?其实答案就在消息中。Kafka会将启用了哪种压缩算法封装进消息集合中,这样当Consumer读取到消息集合时,它自然就知道 这些消息使用的是哪种压缩算法。

Producer端压缩、Broker端保持、Consumer端解压缩

对于Kafka而言压缩算法对比:

        在吞吐量方面:LZ4 > Snappy > zstd和GZIP;

        而在压缩比方面,zstd > LZ4 > GZIP > Snappy。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/756825.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

spring和springboot的关系是什么?

大家好&#xff0c;我是网创有方的站长&#xff0c;今天给大家分享下spring和springboot的关系是什么&#xff1f; Spring和Spring Boot之间的关系可以归纳为以下几个方面&#xff1a; 技术基础和核心特性&#xff1a; Spring&#xff1a;是一个广泛应用的开源Java框架&#…

我们后端程序员不是操作MyBatis的CRUD Boy

大家好&#xff0c;我是南哥。 一个对Java程序员进阶成长颇有研究的人&#xff0c;今天我们接着新的一篇Java进阶指南。 为啥都戏称后端是CRUD Boy&#xff1f;难道就因为天天怼着数据库CRUD吗&#xff1f;要我说&#xff0c;是这个岗位的位置要的就是你CRUD&#xff0c;你不…

JeecgBoot中如何对敏感信息进行脱敏处理?

数据脱敏即将一些敏感信息通过加密、格式化等方式处理&#xff0c;展示给用户一个新的或是格式化后的信息&#xff0c;避免了敏感信息的暴露。 一、接口脱敏注解 针对接口数据实现脱敏加密&#xff0c;只加密&#xff0c;一般此方案用于数据加密展示。 1.1 注解介绍 注解作用域…

Qt信号槽的坑

1、重载的信号&#xff08;以QSpinBox为例&#xff09; 像是点击按钮之类的信号槽很好连接&#xff0c;这是因为它的信号没有重载&#xff0c;如果像SpinBox那样有重载信号的话&#xff08;Qt5.12的见下图&#xff0c;不过Qt5.15LTS开始就不再重载而是换信号名了&#xff09;&…

Linux常用命令大全(超详细!!!)

文章目录 1.Linux是什么1.1 关于Linux我们主要学习什么1.1 学习Linux常见命令的前置知识 2. Linux常见命令2.1 ls命令2.2 cd命令2.3 pwd命令2.4 touch命令2.5 cat命令2.6 echo命令2.7 vim命令2.8 mkdir 命令2.9 rm命令2.10 cp命令2.11 mv命令2.12 grep命令2.13 ps命令2.14 nets…

<Python><ffmpeg>基于python使用PyQt5构建GUI实例:音频格式转换程序(MP3/aac/wma/flac)(优化版2)

前言 本文是基于python语言使用pyqt5来构建的GUI,功能是使用ffmpeg来对音频文件进行格式转换,如mp3、aac、wma、flac等音乐格式。 UI示例: 环境配置 系统:windows 平台:visual studio code 语言:python 库:pyqt5、ffmpeg 概述 本文是建立在之前的博文的基础上的优化版…

(笔记)Error: qemu-virgl: Failed to download resource “qemu-virgl--test-image“解决方法

错误&#xff1a; > Downloading https://www.ibiblio.org/pub/micro/pc-stuff/freedos/files/distributions/1.2/FD12FLOPPY.zip curl: (22) The requested URL returned error: 404Error: qemu-virgl: Failed to download resource "qemu-virgl--test-image" D…

Grafana-11.0.0 在线部署教程

Grafana-11.0.0 在线部署教程 环境&#xff1a; 操作系统&#xff1a; ubuntugrafana版本&#xff1a; 11.0.0 &#xff08;建议不要按照最新版&#xff09;grafana要求的系统配置不高&#xff0c;建议直接部署在监控服务器上&#xff0c;比如zabbix服务器、prometheus服务器…

文华财经通达信同花顺期货通盘立方博易大师主图指标公式源码

买线:EMA(C,2); 卖线:EMA(SLOPE(C,21)*20C,42); BU:CROSS(买线,卖线); SEL:CROSS(卖线,买线); STICKLINE1(买线>卖线,LOW,MIN(O,C),0.1,1),COLORRED; STICKLINE1(买线>卖线,MAX(O,C),HIGH,0.1,1),COLORRED; STICKLINE(买线>卖线,CLOSE,OPEN,8,1),COLORRED; STI…

页面开发感想

页面开发 1、 前端预览 2、一些思路 2.1、首页自定义element-plus的走马灯 :deep(.el-carousel__arrow){border-radius: 0%;height: 10vh; }需要使用:deep(标签)才能修改样式 或者 ::v-deep 标签 2.2、整体设计思路 <template><div class"card" style&…

跟《经济学人》学英文:2024年6月22日这期 India’s electronics industry is surging

India’s electronics industry is surging Foreign and domestic firms are investing in local manufacturing surge:激增&#xff1b;急剧上升&#xff1b; 原文&#xff1a; To witness India’s growing role as a manufacturing hub, dodge Bangalore’s notorious t…

maven安装jar和pom到本地仓库

举例子我们要将 elastic-job-spring-boot-starter安装到本地的maven仓库&#xff0c;如下&#xff1a; <dependency><groupId>com.github.yinjihuan</groupId><artifactId>elastic-job-spring-boot-starter</artifactId><version>1.0.5&l…

使用腾讯云服务器从0搭建个人网站,超简单图文教程

使用腾讯云服务器搭建网站全流程&#xff0c;包括轻量应用服务器和云服务器CVM建站教程&#xff0c;轻量可以使用应用镜像一键建站&#xff0c;云服务器CVM可以通过安装宝塔面板的方式来搭建网站&#xff0c;腾讯云服务器网txyfwq.com整理使用腾讯云服务器建站教程&#xff0c;…

Java的IO体系

目录 1、Java的IO体系2、IO的常用方法3、Java中为什么要分为字节流和字符流4、File和RandomAccessFile5、Java对象的序列化和反序列化6、缓冲流7、Java 的IO流中涉及哪些设计模式 1、Java的IO体系 IO 即为 input 输入 和 output输出 Java的IO体系主要分为字节流和字符流两大类…

【51单片机】串口通信(发送与接收)

文章目录 前言串口通信简介串口通信的原理串口通信的作用串口编程的一些概念仿真图如何使用串口初始化串口串口模式波特率配置 发送与接收发送接收 示例代码 总结 前言 在嵌入式系统的开发中&#xff0c;串口通信是一种常见且重要的通信方式。它以其简单、稳定的特性在各种应用…

多阶段分层构建容器化Spring Boot应用程序

上一节中&#xff0c;容器化spring boot应用程序-CSDN博客我们介绍了基于简单的Dockerfile对spring boot进行容器化的过程&#xff0c;本讲将介绍如何基于Dockerfile进行多阶段的分层构建过程&#xff0c;希望对大家有所帮助。 Spring Boot从版本2.3.0开始支持分层构建容器化的…

4.x86游戏实战-人物状态标志位

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 本次游戏没法给 内容参考于&#xff1a;微尘网络安全 上一个内容&#xff1a;3.x86游戏实战-寄存器 人物状态标志位&#xff1a; 什么叫人物状态标志位&…

PAE:从潮流报告中提炼有效产品属性

本文将介绍PAE&#xff0c;一种用于包含 PDF格式的文本和图像的产品属性提取算法。目前大部分的方法侧重于从标题或产品描述中提取属性&#xff0c;或利用现有产品图像中的视觉信息。与之前的工作相比&#xff0c;PAE从潮流趋势报告的PDF文件中提取属性&#xff0c;提取的属性包…

Django 自定义标签

1&#xff0c;简单标签 1.1 添加自定义标签函数 Test/app5/templatetags/mytags.py from django import template register template.Library() register.simple_tag() def show_title(value, n):if len(value) > n:return f{value[:n]}...else:return value 1.2 添加视…

day02-Spark集群及参数

一、Spark运行环境变量问题(了解) 1-pycharm远程开发运行时&#xff0c;执行的是服务器的代码 2-通过本地传递指令到远程服务器运行代码时&#xff0c;会加载对应环境变量数据&#xff0c;加载环境变量文件是用户目录下的.bashrc文件 在/etc/bashrc 1-1 在代码中添加 使用os模块…