Kafka生产者消息发送流程原理及源码分析

Kafka是一个分布式流处理平台,它能够以极高的吞吐量处理数据。在Kafka中,生产者负责将消息发送到Kafka集群,而消费者则负责从Kafka集群中读取消息。本文将探讨Kafka生产者消息发送流程的细节,包括消息的序列化、分区分配、记录提交等关键步骤。

先看一个生产者发送消息的代码样例

public class MyProducer1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Map<String, Object> configs = new HashMap<>();
        // 指定初始连接用到的broker地址
        configs.put("bootstrap.servers", "node164:9092");
        // 指定key的序列化类
        configs.put("key.serializer", IntegerSerializer.class);
        // 指定value的序列化类
        configs.put("value.serializer", StringSerializer.class);
        //borker集群消息持久化控制
        configs.put("acks", "all");
        //重试次数
        configs.put("reties", "3");
        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
        
        // 用于设置用户自定义的消息头字段
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));

        ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                "test_topic",
                0,
                0,
                "hello world 0",
                headers
        );

        // 消息异步确认
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("消息的主题:" + metadata.topic());
                    System.out.println("消息的分区号:" + metadata.partition());
                    System.out.println("消息的偏移量:" + metadata.offset());
                } else {
                    System.out.println("异常消息:" + exception.getMessage());
                }
            }
        });

        // 关闭生产者
        producer.close();
    }
}

通过跟踪producer.send源码可知生产者发送消息的大体流程如下图,RecordAccumulator的消息发送到brokers实际上由Sender线程处理,下图暂时忽略,先看producer主线程处理的一些细节。

  • KafkaProducer构造函数根据客户端参数初始化拦截器、序列化器、分区器,创建Sender守护线程。
  • 调用send函数发送消息时,其内部使用异步消息发送,消息经过拦截器、序列化器、分区器后缓存到缓冲区。
  • 批次发送的条件为:缓冲区数据⼤⼩达到batch.size或者linger.ms达到上限。
  • 缓冲区消息发送到指定分区,落盘到broker。如果发送失败,客户端将根据设置的重试参数进行重试,如果超过了重试次数,抛出异常。
  • 发送成功,返回RecordMetadata元数据到客户端。如果是同步调用将阻塞等待元数据返回,如果是异步调用将通过Callback接口进行回调返回元数据

生产者拦截器

KafkaProducer调用send方法后,如果有设置拦截器,会先经过拦截器,默认是不会经过任何拦截器的,除非客户端配置了拦截器(interceptor.classes参数),send函数如下

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

可见,拦截器列表会被首先执行,而拦截器的初始化则是在KafkaProducer的 构造函数中,部分源码如下

List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

可见,拦截器是通过客户端配置的ProducerConfig.INTERCEPTOR_CLASSES_CONFIG来初始化的,拦截器必须实现ProducerInterceptor接口。

public interface ProducerInterceptor<K, V> extends Configurable {

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);


    public void onAcknowledgement(RecordMetadata metadata, Exception exception);


    public void close();
}

拦截器接口共有三个接口,第一个onSend接口把ProducerRecord直接传了进来,我们可以在实现接口时,对原消息进行统一处理,比如添加一些业务相关的头部信息等。onAcknowledgement接口则可以在确认消息发送成功后做一些操作,最后close接口则可以在拦截器关闭时清理一些资源。

如需要自定义拦截器则直接实现ProducerInterceptor接口,实现相关方法,在客户端进行配置即可,客户端配置示例:

 // 如果有多个拦截器,则设置为多个拦截器类的全限定类名,中间用逗号隔开
  configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.xxx.CustomInterceptorOne,com.xxx.CustomInterceptorTwo");

生产者序列化器

拦截器处理完后,将进入到doSend方法,在发送消息前,首先会根据客户端配置的序列化器对key和value进行序列化。

序列化接口如下:

public interface Serializer<T> extends Closeable {

    /**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */
    void configure(Map<String, ?> configs, boolean isKey);

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */
    byte[] serialize(String topic, T data);

    /**
     * Close this serializer.
     *
     * This method must be idempotent as it may be called multiple times.
     */
    @Override
    void close();
}

在Kafka中,消息可以是任何类型的数据,如字符串、JSON、二进制数据等。为了将这些数据存储到Kafka集群中,Kafka需要对它们进行序列化。Kafka提供了多种序列化器,如StringSerializer、JsonSerializer等。生产者可以根据自己的需求选择合适的序列化器来序列化消息。如果默认提供的序列化器仍未满足需求,实现上面的Serializer接口,然后在客户端配置自己的序列化器即可。通过接口可以看出,序列化器最终将key和value序列化成字节数组。

doSend方法使用序列化器的部分源码:

byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/700898.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【QT】记录一次QT程序发布exe过程

记录一次QT程序发布exe过程 使用windeploy与enigma发布独立的QT程序第一步 QT编译输出 **release** 版本第二步 QT 自带 windepoyqt 补全链接库第三步 enigma virtual box压缩打包为单一exe最后【2024-06-07 17】- 【补充】 贴一个自己用的bat脚本【**QtDeploy2exe.bat**】半自…

python数据分析--- ch3-5 python数字类型、算术运算符及流程控制语句

python数据分析--- ch3-5 python数字类型、算术运算符及流程控制语句 1.Ch3--数字类型的数据1.1 Python中的数据类型1.1.1整数类型(int)1.1.2 浮点类型(float)1.1.3复数类型(complex)1.1.4 布尔类型(bool) 1.2 数字类型的相互转换1.2.1 隐式类型的转换1.2.2 显式类型的转换 2. …

Python编程基础5

邮件编程 SMTP&#xff08;Simple Mail Transfer Protocol&#xff09;简单邮件传输协议&#xff0c;使用TCP协议25端口&#xff0c;它是一组用于由源地址到目的地址传送邮件的规则&#xff0c;由它来控制信件的中转方式。python的smtplib提供了一种很方便的途径发送电子邮件。…

惠州惠城:可燃气体报警器定期校准检测,安全更放心

在惠州惠城这片繁华的土地上&#xff0c;工业发展日新月异&#xff0c;安全问题愈发受到重视。其中&#xff0c;可燃气体报警器作为预防火灾和爆炸事故的重要设备&#xff0c;正在越来越多的场所得到应用。 今天&#xff0c;佰德就来探讨一下可燃气体报警器在惠州惠城的重要性…

实测 WordPress 最佳优化方案:WP Super Cache+Memcached+CDN

说起 WordPress 优化加速来可以说是个经久不衰的话题了&#xff0c;包括明月自己都撰写发表了不少相关的文章。基本上到现在为止明月的 WordPress 优化方案已经固定成型了&#xff0c;那就是 WP Super CacheMemcachedCDN 的方案&#xff0c;因为这个方案可以做到免费、稳定、安…

计算机网络知识CIDR(无类别域区间路由)

目录 介绍 基本信息 优点与关联 如何计算判定范围&#xff08;你应该是来看这个的&#xff0c;前面是水字数的&#xff09; 省流版 介绍 无类别域间路由&#xff08;Classless Inter-Domain Routing、CIDR&#xff09;是一个用于给用户分配IP地址以及在互联网上有效地路由…

STM32项目分享:智能蓝牙手环

目录 一、前言 二、项目简介 1.功能详解 2.主要器件 三、原理图设计 四、PCB硬件设计 1.PCB图 2.PCB板打样焊接图 五、程序设计 六、实验效果 七、资料内容 项目分享 一、前言 项目成品图片&#xff1a; 哔哩哔哩视频链接&#xff1a; https://www.bilibili.c…

PCA与LDA

共同点 降维方法&#xff1a; PCA和LDA都是数据降维的方式&#xff0c;它们都能通过某种变换将原始高维数据投影到低维空间。 数学原理&#xff1a; 两者在降维过程中都使用了矩阵特征分解的思想&#xff0c;通过对数据的协方差矩阵或类间、类内散度矩阵进行特征分解&#xff…

鸿蒙低代码开发的局限性

在版本是DevEco Studio 3.1.1 Release&#xff0c;SDK是3.1.0(API9) 的基础上。 1、低代码插件没有WebView组件。 2、低代码插件没有空白的自定义组件&#xff0c;当前提供的所谓自定义组件&#xff0c;只能用列表中提供的组件来拼接新的组件。 3、使用ets代码自定义的组件&…

bugku--web---baby lfi

1、题目描述 2、页面提示使用language参数 3、构造url:/?languagefr。页面有回显 4、这里提示包含关键的文件 5、构造url:/?language/etc/passwd 6、flag shellmates{10CA1_F11e_1Nc1US10n_m4y_r3ve4l_in7Er3st1nG_iNf0Rm4t1on}

单北斗定位手持终端的优势

在追求精准与效率的现代生活中&#xff0c;单北斗定位手持终端以其独特优势&#xff0c;为众多行业领域带来了前所未有的便利与安全保障。凭借北斗卫星导航系统的高精度定位能力&#xff0c;这款单北斗定位手持终端不仅能在复杂环境中实现厘米级别的位置锁定&#xff0c;还具备…

五分钟“手撕”二叉树

代码放开头&#xff0c;供大家查阅。 但是对于树来说&#xff0c;更重要的是理解树的概念&#xff0c;树的概念很多&#xff0c;题却是千篇一律&#xff0c;这篇博客详细的讲解了概念&#xff0c;看完必有很大的收获。 目录 一、实现代码 二、什么是树 三、树的重要概念 四…

探索未来教育的智慧教学整体架构:数字化时代的教育革命

在数字化时代&#xff0c;教育领域也正在经历着前所未有的变革和创新。智慧教学整体架构作为教育技术与教学实践的完美结合&#xff0c;正在成为推动教育革命的关键力量。本文将深入探讨智慧教学整体架构的概念、核心组成部分以及其在未来教育中的应用前景。 **智慧教学整体架构…

C++设计模式---装饰器模式

1、介绍 装饰器&#xff08;Decorator&#xff09;模式是一种结构型设计模式&#xff0c;它允许你动态地给一个对象添加额外的职责。 装饰器模式主要用于扩展对象的功能&#xff0c;而又不改变其原有结构。在C中&#xff0c;装饰器模式主要应用于那些需要为对象动态添加功能或改…

IO流的转换流

目录 ​编辑 转换流 利用转换流按照指定字符编码读取 利用转换流按照指定字符编码写出 将本地文件中的GBK文件。转成UTF-8 练习 序列化流 反序列化流 /对象操作输入流 序列化流/反序列化流的细节 练习 转换流 是字符流和字节流之间的桥梁 字符转换输入流:InputS…

QWidget 属性——windowTitle·windowIcon·qrc

&#x1f40c;博主主页&#xff1a;&#x1f40c;​倔强的大蜗牛&#x1f40c;​ &#x1f4da;专栏分类&#xff1a;QT ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 文章目录 一、windowTitle二、windowIcon三、qrc 一、windowTitle windowTitle 是一个通常用于表示窗口标题…

金融科技企业如何提高服务效率

一、引言 在金融科技&#xff08;FinTech&#xff09;领域&#xff0c;服务效率是企业竞争力的重要组成部分。随着科技的不断进步和市场竞争的加剧&#xff0c;金融科技企业必须不断提升服务效率&#xff0c;以满足客户需求&#xff0c;赢得市场份额。本文将从多个角度详细阐述…

AB测试学习(附有相关代码)

目录 一、基本概念1. 定义2. 作用3. 原理 二、实验基本原则三、实验步骤四、实验步骤详解1. 确定实验目的2. 确定实验变量3. 实验指标设计3.1 实验指标类型&#xff08;按作用区分&#xff09;3.1.1 核心指标3.1.2 驱动指标&#xff08;跟踪指标&#xff09;3.1.3 护栏指标 3.2…

pycharm爬取BOSS直聘岗位信息

编译器&#xff1a;Pycharm 效果展示如图 简单原理描述&#xff1a;模拟人工动作爬取页面信息&#xff0c;运行脚本后代码自动打开浏览器获取相关信息&#xff0c;模拟人工进行页面跳转并自动抓取页面信息记录到表格中。 深入原理描述&#xff1a;页面翻转的时候会调用接口&am…

Pixi.js学习 (四)鼠标跟随、元素组合与图片位控

目录 一、鼠标移动跟随 1.1 获取鼠标坐标 1.2 鼠标跟随 二、锚点、元素组合 2.1 锚点 2.2 元素组合 三、图片图层 四、实战 例题一&#xff1a;完成合金弹头人物交互 例题二&#xff1a;反恐重击瞄准和弹痕 例题一代码&#xff1a; 例题二代码&#xff1a; 总结 前言 为了提高作…