Kafka3.0.0版本——生产者如何提高吞吐量

目录

    • 一、生产者提高吞吐量参数设置
    • 二、产者提高吞吐量代码示例

一、生产者提高吞吐量参数设置

  • batch.size:设置批次大小,默认16k
  • linger.ms:设置等待时间,修改为5-100ms
  • buffer.memory:设置缓冲区大小, 默认 32M
  • compression.type:设置压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd

二、产者提高吞吐量代码示例

  • 代码示例

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    /**
     * 生产者提高吞吐量
     * 1、设置批次大小,batch.size 默认 16K
     * 2、设置等待时间,linger.ms 默认 0
     * 3、设置缓冲区大小,buffer.memory 默认 32M
     * 4、设置压缩, compression.type 默认 none,可配置值 gzip、snappy、lz4 和 zstd
     * */
    public class CustomProducerParameters {
    
        public static void main(String[] args) throws InterruptedException {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            /**
             * 4、提高吞吐量
             * */
            //设置缓冲区大小
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
            //设置批次大小
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
            //设置等待时间 linger.ms
            properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            //设置压缩
            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
    
    
            //5、创建生产者
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //6、调用 send 方法,发送消息
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            //7、关闭资源
            kafkaProducer.close();
        }
    }
    
    
  • 在三台服务器上开启 Kafka 消费者

    [root@localhost kafka-3.0.0]#  bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.29:9092 --topic news
    
  • 在 IDEA 中执行代码,观察 三台服务器控制台中是否接收到消息,如下图:

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/57972.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

信号槽中的函数重载

信号槽中的函数重载 QT4的方式QT5的方式函数指针重载函数QT5信号函数重载解决方案 总结 QT4的方式 Qt4中声明槽函数必须要使用 slots 关键字, 不能省略。 信号函数&#xff1a; 槽函数&#xff1a; mainwondow: cpp文件&#xff1a; #include "mainwindow.h"…

快速部署外卖系统:利用现代工具简化开发流程

在竞争激烈的外卖市场中&#xff0c;快速部署高效稳定的外卖系统是餐饮企业成功的关键之一。本文将介绍如何利用现代工具简化外卖系统的开发流程&#xff0c;并附带代码示例&#xff0c;帮助开发者快速搭建功能完备、用户友好的外卖平台。 1. 简介 在外卖业务快速增长的背景…

使用express搭建后端服务

目录 1 创建工程目录2 初始化3 安装express依赖4 启动服务5 访问服务总结 上一篇我们利用TDesign搭建了前端服务&#xff0c;现在的开发讲究一个前后端分离&#xff0c;后端的话需要单独搭建服务。后端服务的技术栈还挺多&#xff0c;有java、php、python、nodejs等。在众多的技…

稍微深度踩坑haystack + whoosh + jieba

说到django的全文检索&#xff0c;网上基本推荐的都是 haystack whoosh jieba 的方案。 由于我的需求对搜索时间敏感度较低&#xff0c;但是要求不能有数据的错漏。 但是没有调试的情况下&#xff0c;搜索质量真的很差&#xff0c;搞得我都想直接用Like搜索数据库算了。 但是…

排序八卦炉之冒泡、快排

文章目录 1.冒泡排序1.1代码实现1.2复杂度 2.快速排序2.1人物及思想介绍【源于百度】2.2hoare【霍尔】版本1.初识代码2.代码分析3.思其因果 3.相关博客 1.冒泡排序 1.1代码实现 //插入排序 O(N)~O(N^2) //冒泡排序 O(N)~O(N^2) //当数据有序 二者均为O(N) //当数据接近有序或…

什么是 webpack?

Webpack 介绍 什么是 webpack&#xff1f; :::tip 官方描述 webpack 是一个用于现代 JavaScript 应用程序的静态模块打包工具。当 webpack 处理应用程序时&#xff0c;它会在内部从一个或多个入口点构建一个 依赖图(dependency graph)&#xff0c;然后将你项目中所需的每一个…

第四章 数据库安全性

问题的提出 &#xff08;1&#xff09;数据库的一大特点是数据可以共享 &#xff08;2&#xff09;数据共享必然带来数据库的安全性问题 &#xff08;3&#xff09;数据库系统中的数据共享不能是无条件的共享 这就引发了数据库安全性问题 1.数据库安全性概述 数据库的安全性…

MySQL日志——查询日志

1.查询日志 show variables like %general%;修改mysql的配置文件 /etc/my.cnf文件&#xff0c;添加如下内容&#xff1a; #该选项用来开启查询日志&#xff0c;可选值&#xff1a;0或者1&#xff1b;0代表关闭&#xff0c;1代表开启 general_log1 #设置日志的文件名&#xff0…

C# Blazor 学习笔记(8):row/col布局开发

文章目录 前言相关文章代码row和col组件B_rowB_col结构 使用 前言 可能是我用的element ui和 uView这种第三方组件用的太多了。我上来就希望能使用这些组件。但是目前Blazor目前的生态其实并不完善&#xff0c;所以很多组件要我们自己写。 我们对组件的要求是 我们在组件化一共…

纯粹即刻,畅享音乐搜索的轻松体验

纯粹即刻&#xff0c;畅享音乐搜索的轻松体验 在当今快节奏的生活中&#xff0c;我们常常渴望一种简单而便捷的方式来探索和享受音乐。现在&#xff0c;你可以纯粹即刻地畅享音乐搜索的轻松体验。无论你是寻找热门歌曲还是探索不同风格的音乐&#xff0c;这款应用将为你带来随…

地址空间细致入微+深入了解页表

目录 地址空间保存了什么&#xff1f; 页表到底是怎么存储的 我们都知道&#xff0c;我们进程看到的空间其实是虚拟内存&#xff0c;真正的内存是需要页表的映射才能找到真正的物理内存&#xff0c;那么我我们有两个问题的引出那么进程地址空间是保存了什么呢&#xff1f;页表…

Maven项目解决cannot resolve plugin maven-deploy-plugin:2.7

导入maven项目后&#xff0c;编辑的时候提示一些插件加载失败&#xff01;大概率是你的网络有问题&#xff0c;插件下载失败。 如下图&#xff1a;&#xff08;网络突然好了&#xff0c;我想截图但是没有复现&#xff0c;用网上找到的截图代替&#xff0c;明白意思就行&#x…

一起学算法(选择排序篇)

距离上次更新已经很久了&#xff0c;以前都是非常认真的写笔记进行知识分享&#xff0c;但是带来的情况并不是很好&#xff0c;一度认为发博客是没有意义的&#xff0c;但是这几天想了很多&#xff0c;已经失去了当时写博客的初心了&#xff0c;但是我觉得应该做点有意义的事&a…

xlrd与xlwt操作Excel文件详解

Python操作Excel的模块有很多&#xff0c;并且各有优劣&#xff0c;不同模块支持的操作和文件类型也有不同。下面是各个模块的支持情况&#xff1a; .xls.xlsx获取文件内容写入数据修改文件内容保存样式调整插入图片xlrd√√√xlwt√√√√√xlutils√√√√xlwings√√√√√…

13-5_Qt 5.9 C++开发指南_基于信号量的线程同步_Semaphore

文章目录 1. 信号量的原理2. 双缓冲区数据采集和读取线程类设计3. QThreadDAQ和QThreadShow 的使用4. 源码4.1 可视化UI设计框架4.2 qmythread.h4.3 qmythread.cpp4.4 dialog.h4.5 dialog.cpp 1. 信号量的原理 信号量(Semaphore)是另一种限制对共享资源进行访问的线程同步机制…

【MMCV】mmpretrain/mmclassification概览、环境安装与验证

概览 MMPretrain 是一个全新升级的预训练开源算法框架,旨在提供各种强大的预训练主干网络, 并支持了不同的预训练策略。MMPretrain 源自著名的开源项目 MMClassification 和 MMSelfSup,并开发了许多令人兴奋的新功能。 目前,预训练阶段对于视觉识别至关重要,凭借丰富而强…

基于linux下的高并发服务器开发(第四章)- 多线程实现并发服务器

>>了解文件描述符 文件描述符分为两类&#xff0c;一类是用于监听的&#xff0c;一类是用于通信的&#xff0c;在服务器端既有监听的&#xff0c;又有通信的。而且在服务器端只有一个用于监听的文件描述符&#xff0c;用于通信的文件描述符是有n个。和多少个客户端建立了…

【Spring Boot】请求参数传json对象,后端采用(map)CRUD案例(101)

请求参数传json对象&#xff0c;后端采用&#xff08;map&#xff09;接收的前提条件&#xff1a; 1.Spring Boot 的Controller接受参数采用&#xff1a;RequestBody 2.需要一个Json工具类&#xff0c;将json数据转成Map&#xff1b; 工具类&#xff1a;Json转Map import com…

Linux部署jar包,隐藏命令行参数

Linux部署jar包&#xff0c;隐藏命令行参数 一、背景需求二、查阅资料三、实现隐藏库3.1、测试test.c3.2、设置隐藏库3.3、验证 四、应用jar启动命令五、直接应用结果 最新项目安全检测&#xff0c;发现配置文件中数据库密码&#xff0c;redis密码仍处理明文状态 于是整理了一篇…

linux快速安装tomcat

linux快速安装tomcat 前提安装好jdk 下载Tomcat安装包 wget https://dlcdn.apache.org/tomcat/tomcat-10/v10.0.27/bin/apache-tomcat-10.0.27.tar.gz如果出现颁发的证书已经过期的错误提示,用下面命令 wget --no-check-certificate https://dlcdn.apache.org/tomcat/tomcat-1…