Producer

Producer开发样例

版本说明

新客在这里插入图片描述
户端, 从Kafka 0.9.x 开始, client基于Java语言实现。同时提供C/C++, Python等其他客户端实现。

开发步骤

  1. 配置客户端参数以及创建客户端实例;
  2. 构建待发送消息;
  3. 发送消息;
  4. 关闭生产者实例;

代码示例

public class KafkaProducer {
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.server", "localhost:9092");
        // key.serializer
        // value.serializer
        // client.id xxx
        return props;
    }
    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello, world");
        try {
            producer.send(record);
        } catch(Exception e) {

        }
    }
}

Producer参数配置

配置项目

部分配置项在后续文章中介绍

配置项意义
bootstrap.serversbroker列表(至少2个, client会从中得到所有)
key.serializer序列化key使用
value.serializer序列化value使用
client.id默认为"“,不设置会创建为"producer-1”,"producer-2"等
partitioner.class为消息分配分区使用
interceptor.classes执行消息拦截逻辑

小技巧

基本原则: 将拼写配置转换为代码编译, 借助代码编译器的校验能力来辅助检查

  1. 配置项拼写错误通过引用静态变量解决;
  2. key.serializer填写应该为全限定类名, 容易拼写错误, 可以基于Serializer.class.getName()来解决;
  3. KafkaProducer是ThreadSafe;

消息发送

消息构造

由于使用Kafka发送消息是一个非常频繁的动作, 因此ProduceRecord的构造也非常频繁。构造ProducerRecord对象, 必选属性key,value, 其他均为可选属性。

class ProduceRecord {
    String topic;
    Integer partition;
    Headers headers; // 增加应用相关信息
    K key; // 相同key被发送到同一个partition, 支持消息压缩
    V value;
    Long timestamp; // CreateTime 创建时间; LogAppendTime 追加到日志文件的时间;
}

发送方式

Kafka Producer本身支持3种模式, 同步, 异步和发后即忘, 并且Kafka Producer在实现上做到了三种模式的统一。
Producer#send声明如下:

Future<ReocrdMetadata> send(ProducerRecord<> record);

具体具体使用哪种模式, 取决于我们对返回值Future的处理。

模式实现Future处理
同步发送线程,Future#get获得结果
异步非发送线程单独处理
发后即忘不处理

关于异步模式, 实际中更多基于callback处理, 即调用send(record, callback)方法比较多, 避免应用侧对Future的管理。Producer内部可以保证对callback调用的顺序是分区有序。

class Callback{
    public void onComplete(RecordMetadata meta, Exception e) {}
}

异常处理

发送异常一般由2种, 可重试异常(多由于集群处于一种状态迁移过程中, 比如Leader选举过程, partition rebalance过程)和不可重试异常(不满足硬性约束, 比如RecordTooLarge)。对于可重试异常, 如果配置了retries参数, KafkaProducer内部会自动重试给定次数, 依然不成功则抛出异常。

|发送模式| 结果 | 可靠性 vs 性能 |
|—|—|—|—|
|同步| 成功或异常 | 可靠性最好但牺牲性能 |
|异步| 成功或异常 | 兼顾可靠性和性能 |
|发送即忘| 不确定 | 性能最好牺牲可靠性 |

资源释放

直接通过close()或者close(long timeout, TimeUnit timeUnit)方法完成。后者支持等待一定时间, 建议基于后者来完成, 实际应用中的关闭是个复杂的过程, 也是会受到协作应用影响的过程, 但好在最终由操作系统兜底完成资源释放。底线是避免应用侧产生错误数据, 因此如何关闭是个case by case的选择。

serializer

作用

发送侧: 将待发送的对象转换为byte[], 在网络上传输。
接受侧: 将接收到的byte[]转换为Java对象, 在应用中处理。

常见类型

Byte、Short、Long、Float、Double、String对应的Serializer。当然也可以自己实现Serializer。

约束

发送侧和接收侧应该使用兼容的Serialzer, 否则无法进行消息解码, 因此建议使用通用serilizer。

partitioner

作用

给待发送的消息分配消息分区。如果ProducerRecord中的partition字段已设置, 则Partitioner不起作用, 否则将由Partitioner决定消息分区。

默认与自定义

Kafka默认的Partioner是DefaultPartioner。我们可以基于Partioner接口进行自定义, 自定义Partitioner可以通过partitioner.class来显示指定。

使用案例

大型电商存在多个仓库, 使用仓库名称或者ID作为key, 灵活记录商品/发单信息。

Producer Interceptor

声明与作用

ProducerInterceptor声明

ProducerRecord<K,V> onSend(ProducerRecord<K,V> record);
void onAcknowledgement(RecordMetadata metadata, Exception e);
void close();

KafkaProducer在消息序列化和分区前调用onSend, 在有发送结果后调用onAcknowledgement,该方法提前于Callback执行。
自定义实现后需要在配置项interceptor.classes中声明。
拦截器可以按顺序形成拦截器链, Kafka的拦截器链会从上一个执行成功的上下文继续执行, 如果拦截器出现异常, 可能产生副作用。

使用场景

  1. 类型于Java Web开发中的Filter, 增加一些通用的规则性逻辑, 比如增加统一前缀。

整体流程

在这里插入图片描述

消息发送过程涉及两个关键线程main和sender。Main Thread中, 应用侧完成消息放入RecordAccumulator中。sender则轮询RecordAccumulator, 完成消息发送。

其中RecordAccumulator, 按照partion完成消息合并, 将消息发送单位从逐条发送, 转变为按批发送, 从而提高消息发送效率。

Sender则将每个partion的消息转换为面向每个Node的请求, 毕竟partion是个逻辑概念, node才是物理存在的。

在整个发送过程中, producer需要知道cluster对应的metadata, 例如node/partion对应关系等, 从而及时调整目标Node。这里也涉及metadata更新等问题。这里仅做简要说明, 后续文章中做进一步阐述。

总结

本文介绍了Kafka Producer发送数据中涉及的线程和各自的职责,重点介绍了与应用直接相关的Interceptor, Serializer和Partitioner。希望能帮助你初步认识Kafka, 感谢你的阅读。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/137985.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++标准模板(STL)- 类型支持 (受支持操作,检查类型是否有拥有移动赋值运算符)

类型特性 类型特性定义一个编译时基于模板的结构&#xff0c;以查询或修改类型的属性。 试图特化定义于 <type_traits> 头文件的模板导致未定义行为&#xff0c;除了 std::common_type 可依照其所描述特化。 定义于<type_traits>头文件的模板可以用不完整类型实例…

视频剪辑:掌握视频封面提取与剪辑技巧,提升视频传播效果

在数字媒体时代&#xff0c;视频已成为人们获取信息、娱乐和交流的重要方式。而一个吸引人的视频封面往往能吸引更多的观众点击观看&#xff0c;因此&#xff0c;掌握视频封面提取与剪辑技巧对于提升视频传播效果至关重要。视频封面是视频的“门面”&#xff0c;它不仅展示视频…

Vue.js中的路由(router)和Vue Router的作用?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

牛客刷题记录11.12

继承和组合 二进制数统计 1的个数 和 0 的个数

操作系统 | 虚拟机及linux的安装

​ &#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《操作系统实验室》&#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 目录结构 1.操作系统实验之虚拟机及linux的安装 1.1 实验目的 1.2 实验内容 1.3 实验步骤 …

Linux常用命令——bzip2recover命令

在线Linux命令查询工具 bzip2recover 恢复被破坏的.bz2压缩包中的文件 补充说明 bzip2recover命令可用于恢复被破坏的“.bz2”压缩包中的文件。 bzip2是以区块的方式来压缩文件&#xff0c;每个区块视为独立的单位。因此&#xff0c;当某一区块损坏时&#xff0c;便可利用b…

【框架篇】统一异常处理

✅作者简介&#xff1a;大家好&#xff0c;我是小杨 &#x1f4c3;个人主页&#xff1a;「小杨」的csdn博客 &#x1f433;希望大家多多支持&#x1f970;一起进步呀&#xff01; 1&#xff0c;统一异常处理的介绍 统⼀异常处理使⽤的是 ControllerAdvice ExceptionHandler 来…

vue前端项目配置

目录 背景&#xff1a; 0.参考文档 0.1介绍 | Vue CLI (vuejs.org)&#xff08;官方文档&#xff09; 0.2【vue-cli5 bug】npm run build自动编译两次??? - 掘金 (juejin.cn) 0.3vendor.js文本过大 0.4vue性能优化 0.5启动项目一直是生产环境 0.6process.env&#x…

固定主机1500PLC与两台移动1200PLC之间以太网通讯

本方案搭建的是固定主机1500PLC与两台移动1200PLC之间以太网通讯。 无线通讯网络搭建 首先在固定端主机设备上的西门子S7-1500PLC上搭载一块达泰DTD418MB作为主站。然后在两台移动的西门子S7-1200PLC上分别搭载一块达泰DTD418MB作为从站。由此&#xff0c;便通过DTD418MB搭建…

Pandas教程(非常详细)(第五部分)

接着Pandas教程&#xff08;非常详细&#xff09;&#xff08;第四部分&#xff09;&#xff0c;继续讲述。 二十五、Pandas sample随机抽样 随机抽样&#xff0c;是统计学中常用的一种方法&#xff0c;它可以帮助我们从大量的数据中快速地构建出一组数据分析模型。在 Pandas…

BIO、NIO、AIO之间有什么区别

文章目录 BIO优缺点示例代码 NIO优缺点示例代码 AIO优缺点示例代码 总结 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 BIO、NIO和AIO是Java编程语言中用于处理输入输出&#xff08;IO…

【商城更新】神秘市场通行证上架、齿轮头归来//及下架内容

本周商城将于11月8号更新。本次商城除了神秘市场2023通行证上架之外&#xff0c;还有齿轮头黑货箱也会上架藏匿处。随之小兔奇趣齐聚大礼包、危险玩偶大礼包等饰品下架商城。 上架饰品&#xff1a; ▲神秘市场2023通行证 神秘市场2023通行证基础版 售价&#xff1a;1200G-coi…

租用服务器带宽类型应用

服务器带宽类型多样&#xff0c;以满足不同行业的需求。本文将介绍香港常见的服务器带宽类型及其应用领域。 1. 共享带宽 共享带宽是指多个用户共同使用同一台服务器的带宽资源。这种带宽类型适用于小型企业或个人网站&#xff0c;因为其成本较低。由于多个用户共享带宽资源&…

力扣第516题 最长回文子序列 c++ 动态规划 附Java代码 注释版

题目 516. 最长回文子序列 中等 相关标签 字符串 动态规划 给你一个字符串 s &#xff0c;找出其中最长的回文子序列&#xff0c;并返回该序列的长度。 子序列定义为&#xff1a;不改变剩余字符顺序的情况下&#xff0c;删除某些字符或者不删除任何字符形成的一个序列。…

勘察设计考试公共基础之数学篇

1、数学 向量点积&#xff1a;向量叉积&#xff1a;平面的法向量为n&#xff08;A&#xff0c;B&#xff0c;C&#xff09;&#xff0c;则该平面的点法式方程为&#xff1a; A&#xff08;x-x0&#xff09;B&#xff08;y-y0&#xff09;C&#xff08;z-z0&#xff09;0 两平…

upload-labs关卡8(基于黑名单的点绕过)通关思路

文章目录 前言一、回顾上一关知识点二、靶场第八关通关思路1、看源代码2、点绕过3、验证文件是否成功上传 总结 前言 此文章只用于学习和反思巩固文件上传漏洞知识&#xff0c;禁止用于做非法攻击。注意靶场是可以练习的平台&#xff0c;不能随意去尚未授权的网站做渗透测试&am…

并发事务下,不同隔离级别可能出现的问题

并发事务下&#xff0c;不同隔离级别可能出现的问题 1、事务的 ACID2、并发事务下&#xff0c;不同隔离级别可能出现的问题2.1、脏写2.2、脏读2.3、不可重复读2.4、幻读 3、SQL 中的四种隔离级别 1、事务的 ACID 原子性&#xff08;Atomicity&#xff09;&#xff1a;原子性意味…

贝锐向日葵如何实现无人值守远程控制?

1.适用场景 &#xff08;1&#xff09;远程公司电脑应急办公&#xff08;2&#xff09;远程家里电脑游戏挂机&#xff08;3&#xff09;异地远程传输文件 2.操作步骤 &#xff08;1&#xff09;电脑安装向日葵个人版并登录贝锐账号&#xff08;点击注册&#xff09;&#xf…

Python - GFPGAN + MoviePy 提高人物视频画质

目录 一.引言 二.gif_to_png 三.gfp_gan 四.png_to_gif 五.总结 一.引言 前面我们介绍了 GFP-GAN 提高人脸质量 与 OCR 提取视频台词、字幕&#xff0c;前者可以提高图像质量&#xff0c;后者可以从视频中抽帧&#xff0c;于是博主便想到了将二者进行结合并优化人物 GIF …

Web实验(总)

目录 网站需求&#xff1a; 思路&#xff1a; 实验步骤&#xff1a; 第一步&#xff1a;准备工作 第二步&#xff1a;新建一个存储网页的目录 第三步&#xff1a;修改本地hosts映射 第四步&#xff1a;修改配置文件&#xff0c;建立基于http服务的网站 1)创建用户song和…