三、Kafka生产者

目录

    • 3.1 生产者消息发送流程
      • 3.1.1 发送原理
    • 3.2 异步发送 API
    • 3.3 同步发送数据
    • 3.4 生产者分区
      • 3.4.1 kafka分区的好处
      • 3.4.2 生产者发送消息的分区策略
      • 3.4.3 自定义分区器
    • 3.5 生产者如何提高吞吐量
    • 3.6 数据可靠性

3.1 生产者消息发送流程

3.1.1 发送原理

3.2 异步发送 API

3.3 同步发送数据

3.4 生产者分区

3.4.1 kafka分区的好处

  • 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

3.4.2 生产者发送消息的分区策略

在这里插入图片描述

3.4.3 自定义分区器

1、需求:
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区

2、定义类实现 Partitioner 接口,重写 partition()方法。

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 获取数据 atguigu  hello
        String msgValues = value.toString();

        int partition;

        if (msgValues.contains("atguigu")){
            partition = 0;
        }else {
            partition = 1;
        }

        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

3、使用分区器的方法,在生产者的配置中添加分区器参数

public class CustomProducerCallbackPartitions {

    public static void main(String[] args) throws InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 关联自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null) {
                        System.out.println("主题: " + metadata.topic() + " 分区: " + metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

在这里插入图片描述



3.5 生产者如何提高吞吐量

  • 分批次发送消息
  • 对生产者消息采用压缩

四个重要参数:
在这里插入图片描述

public class CustomProducerParameters {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");

        // 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        // 批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");


        // 1 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

3.6 数据可靠性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/83136.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uniapp 顶部头部样式

<u-navbartitle"商城":safeAreaInsetTop"true"><view slot"left"><image src"/static/logo.png" mode"" class"u-w-50 u-h-50"></image></view></u-navbar>

深入了解 Java 中 Files 类的常用方法及抽象类的 final 修饰

文章目录 Files 类常用方法抽象类的 final 修饰 &#x1f389;欢迎来到Java学习路线专栏~深入了解 Java 中 Files 类的常用方法及抽象类的 final 修饰 ☆* o(≧▽≦)o *☆嗨~我是IT陈寒&#x1f379;✨博客主页&#xff1a;IT陈寒的博客&#x1f388;该系列文章专栏&#xff1a…

JetBrains IDE远程开发功能可供GitHub用户使用

JetBrains与GitHub去年已达成合作&#xff0c;提供GitHub Codespaces 与 JetBrains Gateway 之间的集成。 GitHub Codespaces允许用户创建安全、可配置、专属的云端开发环境&#xff0c;此集成意味着您可以通过JetBrains Gateway使用在 GitHub Codespaces 中运行喜欢的IDE进行…

数字化赋能高质量施工,成企业创新转型新方向

建筑行业是一个需要投入大量资金、能源消耗大、风险高且劳动力密集的行业&#xff0c;传统施工管理方式存在着“无法实时控制进度、无法实时控制质量、材料浪费、常需返工、安全事件频发”等问题。 为了自身的转型升级&#xff0c;也为了响应国家战略规划落地对建筑行业提出的要…

Java源码分析(一)Integer

当你掌握Java语言到了一定的阶段&#xff0c;或者说已经对Java的常用类和API都使用的行云流水。你会不会有一些思考&#xff1f;比如&#xff0c;这个类是如何设计的&#xff1f;这个方法是怎么实现的&#xff1f;接下来的一系列文章&#xff0c;我们一起学习下Java的一些常见类…

day-27 代码随想录算法训练营(19)part03

78.子集 画图分析&#xff1a; 思路&#xff1a;横向遍历&#xff0c;每次遍历的时候都进行一次添加&#xff0c;然后进行纵向递归&#xff0c;递归完之后进行回溯。 注意&#xff1a;空集也是子集。 90.子集|| 分析&#xff1a;和上题一样&#xff0c;区别在于有重复数字 …

docker导出、导入镜像、提交

导出镜像到本地&#xff0c;然后可以通过压缩包的方式传输。 导出&#xff1a;docker image save 镜像名:版本号 > /home/quxiao/javatest.tgz 导入&#xff1a;docker image load -i /home/quxiao/javatest.tgz 删除镜像就得先删除容器&#xff0c;当你每运行一次镜像&…

【【STM32-SPI通信协议】】

STM32-SPI通信协议 STM32-SPI通信协议 •SPI&#xff08;Serial Peripheral Interface&#xff09;是由Motorola公司开发的一种通用数据总线 •四根通信线&#xff1a;SCK&#xff08;Serial Clock&#xff09;、MOSI&#xff08;Master Output Slave Input&#xff09;、MISO…

深度学习最强奠基作ResNet《Deep Residual Learning for Image Recognition》论文解读(上篇)

1、摘要 1.1 第一段 作者说深度神经网络是非常难以训练的&#xff0c;我们使用了一个残差学习框架的网络来使得训练非常深的网络比之前容易得很多。 把层作为一个残差学习函数相对于层输入的一个方法&#xff0c;而不是说跟之前一样的学习unreferenced functions 作者提供了…

项目实战 — 博客系统③ {功能实现}

目录 一、编写注册功能 &#x1f345; 1、使用ajax构造请求&#xff08;前端&#xff09; &#x1f345; 2、统一处理 &#x1f384; 统一对象处理 &#x1f384; 保底统一返回处理 &#x1f384; 统一异常处理 &#x1f345; 3、处理请求 二、编写登录功能 &#x1f345; …

Leetcode-每日一题【剑指 Offer 33. 二叉搜索树的后序遍历序列】

题目 输入一个整数数组&#xff0c;判断该数组是不是某二叉搜索树的后序遍历结果。如果是则返回 true&#xff0c;否则返回 false。假设输入的数组的任意两个数字都互不相同。 参考以下这颗二叉搜索树&#xff1a; 5 / \ 2 6 / \ 1 3 示例 1&#xff1a; 输入: […

Vue--BM记事本

效果如下&#xff1a; 用到了如下的技术&#xff1a; 1.列表渲染&#xff1a;v-for key的设置 2.删除功能&#xff1a;v-on调用参数 fliter过滤 覆盖修改原数组 3.添加功能&#xff1a;v-model绑定&#xff0c;unshift修改原数组添加 html文件如下&#xff1a; <!DOCTYPE …

(排序) 剑指 Offer 21. 调整数组顺序使奇数位于偶数前面 ——【Leetcode每日一题】

❓剑指 Offer 21. 调整数组顺序使奇数位于偶数前面 难度&#xff1a;简单 输入一个整数数组&#xff0c;实现一个函数来调整该数组中数字的顺序&#xff0c;使得所有奇数在数组的前半部分&#xff0c;所有偶数在数组的后半部分。 示例&#xff1a; 输入&#xff1a;nums [1…

3D医学教学虚拟仿真系统:身临其境感受人体结构和功能

3D医学教学虚拟仿真系统是一种基于虚拟现实技术的教学工具&#xff0c;它可以帮助学生更好地理解和掌握医学知识。这种课件通常包括人体解剖学、生理学、病理学等方面的教学内容&#xff0c;通过三维立体的图像和动画展示&#xff0c;让学生更加直观地了解人体结构和功能。 与传…

CentOS系统环境搭建(十六)——es7安装ik分词器(纯命令行安装)

centos系统环境搭建专栏&#x1f517;点击跳转 关于Elasticsearch的安装请看CentOS系统环境搭建&#xff08;十二&#xff09;——CentOS7安装Elasticsearch。 es7安装ik分词器&#xff08;纯命令行安装&#xff09; 1.找版本 我的Elasticsearch是7.17.6的&#xff0c;下载ik…

Mac安装opencv后无法导入cv2的解决方法

前提条件&#xff1a;以下两个插件安装成功 pip install opencv-python pip install --user opencv-contrib-python 注&#xff1a;直接用pip install opencv-contrib-python如果报错&#xff0c;就加上“–user" 第一步&#xff1a; 设置–添加python解释器 第二步&am…

C++笔记之条件变量(Condition Variable)与cv.wait 和 cv.wait_for的使用

C笔记之条件变量&#xff08;Condition Variable&#xff09;与cv.wait 和 cv.wait_for的使用 参考博客&#xff1a;C笔记之各种sleep方法总结 code review! 文章目录 C笔记之条件变量&#xff08;Condition Variable&#xff09;与cv.wait 和 cv.wait_for的使用1.条件变量&…

小程序swiper一个轮播显示一个半内容且实现无缝滚动

效果图&#xff1a; wxml&#xff08;无缝滚动&#xff1a;circular"true"&#xff09;&#xff1a; <!--components/tool_version/tool_version.wxml--> <view class"tool-version"><swiper class"tool-version-swiper" circul…

五款拿来就能用的炫酷表白代码

「作者主页」&#xff1a;士别三日wyx 「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;小白零基础《Python入门到精通》 五款炫酷表白代码 1、无限弹窗表白2、做我女朋友好吗&#xff0c;不同意就关机3、…

前端框架Vue

Vue 学习路线 学习HTML、CSS和JavaScript基础知识&#xff1a;Vue是基于JavaScript的框架&#xff0c;所以首先需要掌握HTML、CSS和JavaScript的基础知识&#xff0c;包括DOM操作、事件处理、变量和函数等。 学习Vue的基本概念&#xff1a;了解Vue的核心概念&#xff0c;如Vu…