Kafka生产者如何提高吞吐量?

1、batch.size:批次大小,默认16k

2、linger.ms:等待时间,修改为5-100ms

3、compression.type:压缩snappy

4、 RecordAccumulator:缓冲区大小,修改为64m

 测试代码:

package com.bigdata.producter;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 *   测试自定义分区器的使用
 */
public class CustomProducer07 {

    public static void main(String[] args) {

        // Properties 它是map的一种
        Properties properties = new Properties();
        // 设置连接kafka集群的ip和端口
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        /**
         *  此处是提高效率的代码
         */
        // batch.size:批次大小,默认 16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 18000);
        // linger.ms:等待时间,默认 0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");



        // 创建了一个消息生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 调用这个里面的send方法
        // ctrl + p
        /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first","告诉你个秘密");
        kafkaProducer.send(producerRecord);*/
        for (int i = 0; i < 5; i++) {
            // 发送消息的时候,指定key值,但是没有分区号,会根据 hash(key) % 3 = [0,1,2]
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first","c","告诉你个找bigdata的好办法:"+i);
            // 回调-- 调用之前先商量好,回扣多少。
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    // 获取很多信息,exception == null 说明成功,不为null,说明失败
                    if(exception == null){
                        System.out.println("消息发送成功");
                        System.out.println(metadata.partition());// 三个分区,我什么每次都是2分区,粘性分区
                        System.out.println(metadata.offset());// 13 14 15 16 17
                        System.out.println(metadata.topic());
                    }else{
                        System.out.println("消息发送失败,失败原因:"+exception.getMessage());
                    }

                }
            });
        }

        kafkaProducer.close();
    }
}

测试:

①在 bigdata01 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic first 

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/911964.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【论文笔记】SparseRadNet: Sparse Perception Neural Network on Subsampled Radar Data

原文链接&#xff1a;https://arxiv.org/abs/2406.10600 简介&#xff1a;本文引入自适应子采样方法和定制网络&#xff0c;利用稀疏性模式发掘雷达信号中的全局和局部依赖性。本文的子采样模块选择 RD谱中在下游任务贡献最大 像素 的子集。为提高子采样数据的特征提取&#xf…

如何制定公司软件测试策略

在当今快速发展的软件行业&#xff0c;制定一套有效的软件测试策略至关重要。尤其是在互联网产品的背景下&#xff0c;测试策略不仅需要高效&#xff0c;还要具备灵活性&#xff0c;以便快速响应市场变化。本文将对比传统测试策略与互联网产品测试策略&#xff0c;并提供制定公…

【JWT】Asp.Net Core中JWT刷新Token解决方案

Asp.Net Core中JWT刷新Token解决方案 前言方案一:当我们操作某个需要token作为请求头的接口时,返回的数据错误error.response.status === 401,说明我们的token已经过期了。方案二:实现用户无感知的刷新token值,我们希望当响应返回的数据是401身份过期时,响应阻拦器自动帮我…

机器学习—Softmax

Softmax回归算法是Logistic回归的推广&#xff0c;它是一种针对多类分类上下文的二进制分类算法。 当y可以接受两个可能的输出值时&#xff0c;Logistic回归就适用了&#xff0c;不是零就是一&#xff0c;它计算这个输出的方式是首先计算zw*xb&#xff0c;然后计算ag(z)&#…

docker镜像文件导出导入

1. 导出容器&#xff08;包含内部服务&#xff09;为镜像文件&#xff08;docker commit方法&#xff09; 原理&#xff1a;docker commit命令允许你将一个容器的当前状态保存为一个新的镜像。这个新镜像将包含容器内所有的文件系统更改&#xff0c;包括安装的软件、配置文件等…

【韩老师零基础30天学会Java 】02笔记

sublime Text中本身没有GBK编码&#xff0c;需要安装 要在sublime Text中设置编码为GBK&#xff0c;请按照以下步骤操作 打开Sublime Text编辑器,点击菜单栏中的“Preferences”(首选项)选项&#xff0c;找打Package Control选项。点击Package Control&#xff0c;随后搜索Inst…

如何设置 TORCH_CUDA_ARCH_LIST 环境变量以优化 PyTorch 性能

引言 在深度学习领域&#xff0c;PyTorch 是一个广泛使用的框架&#xff0c;它允许开发者高效地构建和训练模型。为了充分利用你的 GPU 硬件&#xff0c;正确设置 TORCH_CUDA_ARCH_LIST 环境变量至关重要。这个变量告诉 PyTorch 在构建过程中应该针对哪些 CUDA 架构版本进行优…

Matlab 基于声学超表面的深亚波长厚度完美吸收体

传统吸声器的结构厚度与工作波长相当&#xff0c;这在低频范围的实际应用中造成了很大的障碍。我们提出了一种基于超表面的完美吸收器&#xff0c;能够在极低频区域实现声波的全吸收。该超表面具有深亚波长厚度&#xff0c;特征尺寸为k223&#xff0c;由穿孔板和卷曲共面气室组…

【HarmonyOS】not supported when useNormalizedOHMUrl is not true.

【HarmonyOS】 not supported when useNormalizedOHMUrl is not true. 问题背景&#xff1a; 集成三方库编译时&#xff0c;IDE提示报错信息如下&#xff1a; hvigor ERROR: Bytecode HARs: [cashier_alipay/cashiersdk] not supported when useNormalizedOHMUrl is not true…

用接地气的例子趣谈 WWDC 24 全新的 Swift Testing 入门(三)

概述 从 WWDC 24 开始&#xff0c;苹果推出了全新的测试机制&#xff1a;Swift Testing。利用它我们可以大幅度简化之前“老态龙钟”的 XCTest 编码范式&#xff0c;并且使得单元测试更加灵动自由&#xff0c;更符合 Swift 语言的优雅品味。 在这里我们会和大家一起初涉并领略…

嘉吉携百余款产品与解决方案再度亮相进博会

第七届中国国际进口博览会&#xff08;下称“进博会”&#xff09;于11月5日至10日在上海国家会展中心举办。嘉吉连续第七年参与进博会&#xff0c;并以“新质绿动&#xff0c;共赢未来”为参展主题&#xff0c;携超过120款创新产品与解决方案&#xff0c;共赴进博之约。 今年嘉…

LLMs之MemFree:MemFree的简介、安装和使用方法、案例应用之详细攻略

LLMs之MemFree&#xff1a;MemFree的简介、安装和使用方法、案例应用之详细攻略 目录 MemFree的简介 1、MemFree的价值 2、MemFree 配备了强大的功能&#xff0c;可满足各种搜索和生产力需求 3、MemFree AI UI生成器功能 MemFree 安装和使用方法 1. 前端安装 2. 向量服务…

LeetCode:103. 二叉树的锯齿形层序遍历

目录 题目描述: 代码: 这个与二叉树的层序遍历有点类似 题目描述: 给你二叉树的根节点 root &#xff0c;返回其节点值的 锯齿形层序遍历 。&#xff08;即先从左往右&#xff0c;再从右往左进行下一层遍历&#xff0c;以此类推&#xff0c;层与层之间交替进行&#xff09;…

算法

1 差分练习 1 模板题 代码实现&#xff1a; import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner sc new Scanner(System.in);int n sc.nextInt();int m sc.nextInt();int num sc.nextInt();long[][] arr new long[n 2][m 2…

新能源行业必会基础知识-----电力现货市场理论篇-----电力现货市场价格机制-----电力现货市场价格机制

新能源行业必会基础知识-----电力现货市场理论篇-----主目录-----持续更新https://blog.csdn.net/grd_java/article/details/143364261 这本书是2023年出版的&#xff0c;是当下了解国内电力市场最好的途径了。还是推荐大家买来这本书进行阅读观看&#xff0c;最好作为随身携带…

51单片机教程(五)- LED灯闪烁

1 项目分析 让输入/输出口的P1.0或P1.0~P1.7连接的LED灯闪烁。 2 技术准备 1、C语言知识点 1 运算符 1 算术运算符 #include <stdio.h>int main(){// 算术运算符int a 13;int b 6;printf("%d\n", ab); printf("%d\n", a-b); printf("%…

【go从零单排】error错误处理及封装

&#x1f308;Don’t worry , just coding! 内耗与overthinking只会削弱你的精力&#xff0c;虚度你的光阴&#xff0c;每天迈出一小步&#xff0c;回头时发现已经走了很远。 &#x1f4d7;概念 在 Go 语言中&#xff0c;error 是一个内置的接口类型&#xff0c;用于表示错误情…

【人工智能】阿里云PAI平台DSW实例一键安装Python脚本

阿里云的DSW实例自带的镜像很少而且并不好用&#xff0c;所以我在这里写三个一键编译安装Python3.8&#xff0c;Python3.9&#xff0c;Python3.10的Shell脚本。 安装Python3.8 wget https://www.smallbamboo.cn/install_python38.sh && chmod x install_python38.sh …

【TS】九天学会TS语法——1.TypeScript 是什么

&#x1f4af; 欢迎光临清清ww的博客小天地&#x1f4af; &#x1f525; 个人主页:【清清ww】&#x1f525; &#x1f4da; 系列专栏:vue3 | TypeScript &#x1f4da; &#x1f31f; 学习本无底&#xff0c;前进莫徬徨。&#x1f31f; 开始学习 目录 一、TypeScript 简介 1.Ja…

comfyUI官方笔记整理

官网教程笔记ComfyUI_examples 1.ComfyUI基础教程ComfyUI Basic Tutorial VN 从网上下载下来的模型ckpt和safetensors包含有3个不同的模型&#xff0c;CLIP&#xff0c;主模型还有VAEclip模型会和CLIPTextEncode进行连接&#xff0c;将文本编码为模型可以理解的向量形式。在S…