Kafka性能测试初探

相信大家对Kafka不会陌生,但首先还是要简单介绍一下。

Kafka是一种高性能的分布式消息系统,由LinkedIn公司开发,用于处理海量的实时数据流。它采用了发布/订阅模式,可以将数据流分发到多个消费者端,同时提供了高可靠性、高吞吐量和低延迟的特性。

Kafka的应用场景非常广泛,例如日志收集、事件流处理、实时监控等。在这些场景中,Kafka可以提供高可靠性和低延迟的数据传输,确保数据的稳定性和实时性。与此同时,Kafka还提供了丰富的API和管理工具,使得用户可以方便地配置和管理Kafka集群。

很多高性能方案都会用到Kafka,今天我来分享如何使用Kafka Client API进行Kafka生产者和消费者压测。

依赖

我用了Gradle创建的项目,依赖配置如下:

compile group: ‘org.apache.kafka’, name: ‘kafka-clients’, version: ‘3.4.0’

kafka服务端

我本地用了Kafka最新版本:kafka_2.12-3.4.0,这个版本可以不依赖zookeeper,非常方便,用来本地功能验证和测试我是十分推荐的。基本做到了开箱即用。

具体的流程可以自行搜索。

生产者压测Demo

在创建生产者时,会有不少的参数需要配置,这里建议使用默认的。或者使用待测试参数组合。下面是我自己的配置,常用的参数我都列了出来。具体参数含义,可以自行搜索,这方面资料还是很多的,下面直接进入压测用例环节。

package com.funtest.kafka


import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import com.funtester.utils.StringUtil
import groovy.util.logging.Log4j2
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer

@Log4j2
class Produce extends SourceCode {

    static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); //所有分区副本都收到确认信息,才能确认写入
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384")
        properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name);
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        def topic = "testkafka"
        def test = {
            producer.send(new ProducerRecord<>(topic, StringUtil.getString(10)))
        }
        new FunQpsConcurrent(test,"Kafka测试").start()

        producer.close();
    }

}

这里用到了动态QPS模型,最后的close()也可以不使用,毕竟main方法的代码结束了就真的结束了。

消费者

呼应生产者,消费者也有一堆需要配置的参数。这里先按下不表,有兴趣的可以自行学习。

Kafka消费者有两种订阅消息的方式,分别是订阅模式和分配模式。

订阅模式是指消费者订阅一个或多个主题,然后自动分配分区进行消费。这种模式下,Kafka会自动管理消费者与分区之间的关系,当有新的消费者加入或者退出消费组时,Kafka会自动重新分配分区,保证每个消费者都能够获取到消息。

而分配模式则是由消费者主动向Kafka请求分配指定的分区进行消费。这种模式下,消费者需要手动管理分区与消费者之间的关系,需要注意的是,当有新的消费者加入或者退出消费组时,需要手动重新分配分区。

订阅模式相对于分配模式来说更加简单易用,但是分配模式可以更加灵活地控制消费者与分区之间的关系。所以我选择了订阅模式。

package com.funtest.kafka

import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer

import java.time.Duration

class Cunsumer extends SourceCode {

    static void main(String[] args) {
        KafkaConsumer<String, String> consumer;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FunTester32");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");
        properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"10000");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
                , "earliest");
        consumer = new KafkaConsumer<>(properties);

        String topic = "testkafka";
//        TopicPartition topicPartition = new TopicPartition(topic, 0);
//        List<TopicPartition> topics = Arrays.asList(topicPartition);
//        consumer.assign(topics);
//        consumer.seekToEnd(topics);
//        long current = consumer.position(topicPartition);
//        consumer.seek(topicPartition, current - 10);//手动设置偏移量
        consumer.subscribe([topic])//订阅模式,不能与assign混用
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            sleep(1.0)
        }

        def test = {
            consumer.poll(Duration.ofMillis(1000));
        }
        new FunQpsConcurrent(test,"Kafka消费").start()
        consumer.close()

    }
}

由于本地机器原因,需要在服务器上启动一个Kafka服务,用来测试不同参数组合情况下Kafka的性能表现。后续有机会再来分享。

最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/165585.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uniapp 微信小程序登录 新手专用 引入即可

预览 第一步导入插件 在引入的页面的登录按钮下拷贝一下代码 <template><view class"content"><button type"primary" click"login">微信登录</button></view><TC-WXlogin :wxloginwxlogin /> </templ…

阿坤老师的独特瓷器(Java详解)

一、题目描述 示例&#xff1a; 输入&#xff1a; 5 3 4 5 6 2 5 3 7 6 5 输出&#xff1a; 3 二、题解 思路分析&#xff1a; 题目要求我们计算出“独特瓷器”的个数&#xff0c;而“独特瓷器”是指对于瓷器A&#xff0c;没有另一个瓷器B&#xff0c;直径和高度都大于A。则…

Idea远程debug

Idea远程debug 一、方法 1.1、启动时加参数 java -agentlib:jdwptransportdt_socket,servery,suspendn,address5005 -jar remote-debug-0.0.1-SNAPSHOT.jar1.2、运行时加参数 在没有显式-agentlib:jdwp参数的情况下进行远程调试&#xff1a; 首先&#xff0c;确保您的Sprin…

三极管-开关电路-电路电子-嵌入式开发-物联网开发-电子元件

一、概述 本文我们主要讲解电子电路中十分重要的元件--三极管。三极管常常被用来当作开关或作为放大电流的作用&#xff0c;下面我们将主要围绕着其作为开关电路的使用来介绍三极管。 二、分类 学习三极管前&#xff0c;我们必须认识三极管的三级&#xff0c;包含箭头的一端为发…

Visual Studio Code 从英文界面切换中文

1、先安装中文的插件&#xff0c;直接安装。 2、点击右下角的 change language restart&#xff0c; 让软件重启即可以完成了。

416. 分割等和子集问题(动态规划)

题目 题解 class Solution:def canPartition(self, nums: List[int]) -> bool:# badcaseif not nums:return True# 不能被2整除if sum(nums) % 2 ! 0:return False# 状态定义&#xff1a;dp[i][j]表示当背包容量为j&#xff0c;用前i个物品是否正好可以将背包填满&#xff…

设计模式-行为型模式-模板方法模式

一、什么是模板模式 模板方法模式&#xff08;Template Method Pattern&#xff09;是一种行为型设计模式&#xff0c;它定义了一个算法骨架&#xff0c;允许子类在不改变算法整体结构的情况下重新定义算法的某些步骤。 主要组成部分&#xff1a; 1、模板方法&#xff08;Templ…

k8s的高可用集群搭建,详细过程实战版

kubernetes高可用集群的搭建 前面介绍过了k8s单master节点的安装部署 今天介绍一下k8s高可用集群搭建 环境准备&#xff1a; vip &#xff1a;192.168.121.99 keeplive master01&#xff1a;192.168.121.153 centos7 master02&#xff1a;192.168.121.154 centos7 master03&a…

SpringBoot静态资源配置

项目中 SSM中配置 第一种&#xff1a;配置文件中 <mvc:resources mapping"/js/**" location"/js/"/> <mvc:resources mapping"/css/**" location"/css/"/> <mvc:resources mapping"/html/**" location&q…

【最新Tomcat】IntelliJ IDEA通用配置Tomcat教程(超详细)

前言 IntelliJ IDEA是一个强大的集成开发环境&#xff0c;能够大大简化Java应用程序的开发和部署过程。而Tomcat作为一个流行的Java Web服务器&#xff0c;其与IntelliJ IDEA的整合能够提供便捷的开发环境&#xff0c;让开发人员更专注于代码的创作与优化。 在配置IntelliJ IDE…

单片机课程设计——基于C51电子密码锁(源代码)

本设计是基于AT89C51单片机的电子密码锁设计&#xff0c;实现电子密码锁的基本功能。我们这里实现的是硬件仿真&#xff0c;关于软件仿真可以参考其他人的文章。 单片机课程设计--基于C51电子密码锁 效果展示 我们先来看效果展示&#xff0c;公主王子请看视频&#xff1a; 课…

【LeetCode】二叉树OJ

目录 一、根据二叉树创建字符串 二、二叉树的层序遍历 三、二叉树的层序遍历 II 四、二叉树的最近公共祖先 五、二叉搜索树与双向链表 六、从前序与中序遍历序列构造二叉树 七、从中序与后序遍历序列构造二叉树 一、根据二叉树创建字符串 606. 根据二叉树创建字符串 - …

2024全网最新最全的Pytest接口自动化测试框架教程

pytest编写的规则&#xff1a; 1、测试文件以test_开头&#xff08;以_test结尾也可以&#xff09; 2、测试类以Test开头&#xff0c;并且不能带有__init__方法 3、测试函数以test_开头 4、断言必须使用assert pytest.main([-s,-v]) &#xff1a;用来执行测试用例 -s 打印prin…

计算机毕业设计 基于SpringBoot的健身房管理系统的设计与实现 Java实战项目 附源码+文档+视频讲解目录

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

linux系统环境下mysql安装和基本命令学习

此篇文章为蓝桥云课--MySQL的学习记录 块引用部分为自己的实验部分&#xff0c;其余部分是课程自带的知识&#xff0c;链接如下&#xff1a; MySQL 基础课程_MySQL - 蓝桥云课 本课程为 SQL 基本语法及 MySQL 基本操作的实验&#xff0c;理论内容较少&#xff0c;动手实践多&am…

033、微调

之——高级炼丹术 目录 之——高级炼丹术 杂谈 正文 1.标注数据集是很贵的 2.微调的思想 3.尝试 小结 杂谈 微调&#xff08;Fine-tuning&#xff09;是深度学习中的一种常见策略&#xff0c;它通常用于预训练模型在特定任务上的性能提升。微调的过程涉及在一个已经在大…

7.22 SpringBoot项目实战【收藏 和 取消收藏】

文章目录 前言一、编写控制器二、编写服务层三、Postman测试最后前言 本系统还支持 收藏图书,就是对心仪的书加一下收藏,大家都懂,这是一个很常见的功能。 那么我们来看看怎么来做,先分析一下:【一个人】对【一本书】只需【收藏一次】,但可以【收藏N本】不同的书,收藏…

2024年csdn最新最全面的fiddler教程【1】

Fiddler简介 Fiddler是比较好用的web代理调试工具之一&#xff0c;它能记录并检查所有客户端与服务端的HTTP/HTTPS请求&#xff0c;能够设置断点&#xff0c;篡改及伪造Request/Response的数据&#xff0c;修改hosts&#xff0c;限制网速&#xff0c;http请求性能统计&#xff…

基于springboot实现校园在线拍卖系统项目【项目源码】计算机毕业设计

基于springboot实现校园在线拍卖系统演示 Javar技术 JavaScript是一种网络脚本语言&#xff0c;广泛运用于web应用开发&#xff0c;可以用来添加网页的格式动态效果&#xff0c;该语言不用进行预编译就直接运行&#xff0c;可以直接嵌入HTML语言中&#xff0c;写成js语言&…

高效管理文件:如何通过文件数量归类提高工作效率

在日常生活和工作中&#xff0c;需要处理大量的文件和资料。然而&#xff0c;如果这些文件没有得到妥善的管理&#xff0c;就会使得我们花费大量的时间和精力去寻找和整理它们。对于大量文件&#xff0c;按照数量归类可以使得文件管理更加有序和规范。根据文件的数量建立相应的…