Kafka 之生产者(Producer)

目录

一. 前言

二. 生产消息

三. 幂等和事务

四. send() 发送消息

五. 原理解析


一. 前言

    Kafka生产者是一个应用程序,它负责向 Kafka 主题发送消息。这些消息可以用于多种目的,如记录用户活动、收集物联网设备数据、保存日志消息或缓存即将写入数据库的数据。

二. 生产消息

    生产者是线程安全的,在线程之间共享单个生产者实例,通常单例比多个实例要快。

    一个简单的例子,使用 Producer 发送一个有序的 key/value(键值对),放到 Java 的 main() 方法里就能直接运行(支持的版本 >= 0.9):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();

    生产者的缓冲空间池保留尚未发送到服务器的消息,后台 I/O 线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会丢失这些消息。

    send() 方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。

    ack 是判别请求是否为完整的条件(就是判断是不是成功发送了)。我们指定了 all 将会阻塞消息,这种设置性能最低,但是是最可靠的。

    retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。

    producer(生产者)缓存每个分区未发送的消息。缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更大的批次。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。

    默认缓冲可立即发送,即便缓冲空间还没有满。但是,如果你想减少请求的数量,可以设置linger.ms 大于0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批次中。这类似于 TCP 的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了 linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的、更有效的请求。

    buffer.memory 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过 max.block.ms设定,之后它将抛出一个 TimeoutException。

    key.serializervalue.serializer 示例,将用户提供的 key 和 value 对象 ProducerRecord 转换成字节,你可以使用附带的 ByteArraySerializaer 或 StringSerializer 处理简单的 String 或 Byte 类型。

三. 幂等和事务

    从 Kafka 0.11 开始,KafkaProducer 又支持两种模式:幂等生产者事务生产者。幂等生产者加强了 Kafka 的交付语义,从至少一次交付到精确一次交付。特别是生产者的重试将不再引入重复。事务性生产者允许应用程序原子地将消息发送到多个分区(和主题)。

    要启用幂等(idempotence),必须将 enable.idempotence 配置设置为 true。如果设置,则retries(重试)配置将默认为 Integer.MAX_VALUE,acks 配置将默认为 all。API 没有变化,所以无需修改现有应用程序即可利用此功能。

    此外,如果 send(ProducerRecord) 即使在无限次重试的情况下也会返回错误(例如消息在发送前在缓冲区中过期),那么建议关闭生产者,并检查最后产生的消息的内容,以确保它不重复。最后,生产者只能保证单个会话内发送的消息的幂等性

    要使用事务生产者和 attendant API,必须设置 transactional.id。如果设置了 transactional.id,幂等性会和幂等所依赖的生产者配置一起自动启用。此外,应该对包含在事务中的 Topic 进行耐久性配置。特别是,replication.factor 应该至少是3,而且这些 Topic 的 min.insync.replicas 应该设置为2。最后,为了实现从端到端的事务性保证,消费者也必须配置为只读取已提交的消息。

    transactional.id 的目的是实现单个生产者实例的多个会话之间的事务恢复。它通常是由分区、有状态的应用程序中的分片标识符派生的。因此,它对于在分区应用程序中运行的每个生产者实例来说应该是唯一的。

    所有新的事务性 API 都是阻塞的,并且会在失败时抛出异常。下面的例子说明了新的 API 是如何使用的。它与上面的例子类似,只是所有100条消息都是一个事务的一部分。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();

    如示例所示,每个生产者只能有一个未完成的事务。在 beginTransaction()commitTransaction() 调用之间发送的所有消息都将是单个事务的一部分。当指定 transactional.id时,生产者发送的所有消息都必须是事务的一部分。

    事务生产者使用异常来传递错误状态。特别是,不需要为 producer.send() 指定回调,也不需要在返回的 Future 上调用 get()。如果任何 producer.send() 或事务性调用在事务过程中遇到不可恢复的错误,就会抛出 KafkaException。

    该客户端可以与 0.10.0 或更高版本的 Broker 进行通信。旧的或较新的 Broker 可能不支持某些客户端功能。例如,事务性 API 需要 0.11.0 或更新版本的 Broker。当调用在运行的 Broker 版本中不可用的 API 时,您将收到 UnsupportedVersionException。

四. send() 发送消息

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)

异步发送一条消息到 Topic,并调用 callback(当发送已确认)。语法说明如下: 

参数:

  • record:发送的记录(消息)。
  • callback:用户提供的 callback,服务器调用这个 callback 来应答结果(null 表示没有callback)。

声明的异常:

  • InterruptException:如果线程阻塞中断。
  • SerializationException:如果 key 或 value 不是给定有效配置的 serializers。
  • TimeoutException:如果获取元数据或消息分配内存花费的时间超过 max.block.ms。
  • KafkaException:Kafka 有关的错误(不属于公共 API 的异常)。

    send() 是异步的,并且一旦消息被保存在等待发送的消息缓存中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。

    发送的结果是一个 RecordMetadata,它指定了消息发送的分区,分配的 offset 和消息的时间戳。如果 Topic 使用的是 CreateTime,则使用用户提供的时间戳或发送的时间(如果用户没有指定指定消息的时间戳)如果 Topic 使用的是 LogAppendTime,则追加消息时,时间戳是 Broker 的本地时间。

    由于 send() 调用是异步的,它将为此消息的 RecordMetadata 返回一个 Future。如果 future 调用 get(),则将阻塞,直到相关请求完成并返回该消息的 metadata,或抛出发送异常。

如果要模拟一个简单的阻塞调用,你可以调用 get() 方法:

byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();

完全无阻塞的话,可以利用回调参数提供的请求完成时将调用的回调通知:

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null)
                           e.printStackTrace();
                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               });

发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1 保证执行在 callback2 之前:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);

注意:callback 一般在生产者的 I/O 线程中执行,所以是相当快的,否则将延迟其他的线程的消息发送。如果你需要执行阻塞或计算昂贵(消耗)的回调,建议在 callback 主体中使用自己的Executor 来并行处理。

五. 原理解析

由上图可以看出:KafkaProducer 有两个基本线程: 

1. 主线程:负责消息创建、拦截器、序列化器、分区器等操作,并将消息追加到消息收集器RecoderAccumulator 中;

  • 消息收集器 RecoderAccumulator 为每个分区都维护了一个 Deque<ProducerBatch> 类型的双端队列。
  • ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响。
  • 由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用。该缓存池只针对特定大小(batch.size 指定)的 ByteBuffer 进行管理,对于消息过大的缓存,不能做到重复利用。
  • 每次追加一条 ProducerRecord 消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个 ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size 建立新的 ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。

2. Sender线程:

  • 该线程从消息收集器获取缓存的消息,将其处理为 <Node, List<ProducerBatch> 的式,Node 表示集群的 Broker 节点。
  • 进一步将 <Node, List<ProducerBatch> 转化为 <Node, Request> 形式,此时才可以向服务端发送数据。
  • 在发送之前,Sender 线程将消息以 Map<NodeId, Deque<Request>> 的形式保存到InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode,即当前 Node 中负载压力最小的一个,以实现消息的尽快发出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/386660.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

66万个 全国行政区划代码表

66万个全国各级行政区划代码表 提供的数据一览 简介 一共有66万个全国各级行政区划&#xff0c;一共有5个级别的行政单位级别 表格头部数据 表格尾部数据 全国行政单位各省份数量统计 数据下载地址 数据整理不易 百度云盘 链接: https://pan.baidu.com/s/1o1C2piYj2wu…

C#,数值计算,矩阵的行列式(Determinant)、伴随矩阵(Adjoint)与逆矩阵(Inverse)的算法与源代码

本文发布矩阵&#xff08;Matrix&#xff09;的一些初级算法。 一、矩阵的行列式&#xff08;Determinant&#xff09; 矩阵行列式是指矩阵的全部元素构成的行列式&#xff0c;设A(a)是数域P上的一个n阶矩阵&#xff0c;则所有A(a)中的元素组成的行列式称为矩阵A的行列式&…

仰暮计划|“​他们艰苦半生,但真的希望祖国安祥,山河无恙”

自述&#xff0c;自赎 我没有在那个年代生活过&#xff0c;我一出生就是盛世中国&#xff0c;看遍了祖国的大好河山。但我没想到&#xff0c;走了这么远的路&#xff0c;吃了这么多的苦的爷爷会一直跟我说“不是国家不好&#xff0c;只是中国的钱拿去还债了&#xff0c;过了那…

Linux释放内存

free -m是Linux上查看内存的指令&#xff0c;其中-m是以兆&#xff08;MB&#xff09;为单位&#xff0c;如果不加则以KB为单位。 如下图表示&#xff0c;&#xff08;total&#xff09;总物理内存是809MB&#xff0c;&#xff08;used&#xff09;已使用167MB&#xff0c;&…

零基础学Python(10)— 序列通用操作

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。本节课就带大家认识下Python语言中常见的序列通用操作&#xff01;~&#x1f308; 目录 &#x1f680;1.索引 &#x1f680;2.切片 &#x1f680;3.序列加法 &#x1f680;4.序列乘法 &#x1f680;5.检查某个元素是…

Python:Pygame游戏编程简述

Python是一种广泛使用的编程语言&#xff0c;它简洁、易懂并且功能强大。在Python的世界中&#xff0c;有许多库和模块可供选择&#xff0c;其中之一就是Pygame。Pygame是一个Python库&#xff0c;用于开发2D游戏&#xff0c;它提供了许多工具和函数&#xff0c;使得游戏开发变…

【2024年毕设系列】如何使用Anaconda和Pycharm

【2024年毕设系列】如何使用Anaconda和Pycharm 视频教程地址&#xff1a;【2024毕设系列】Anaconda和Pycharm如何使用_哔哩哔哩 Hi&#xff0c;各位好久不见&#xff0c;这里是肆十二&#xff0c;首先在这里给大伙拜年了。 诸位过完年之后估计又要开始为了大作业和毕业设计头疼…

移动机器人激光SLAM导航(五):Cartographer SLAM 篇

参考 Cartographer 官方文档Cartographer 从入门到精通 1. Cartographer 安装 1.1 前置条件 推荐在刚装好的 Ubuntu 16.04 或 Ubuntu 18.04 上进行编译ROS 安装&#xff1a;ROS学习1&#xff1a;ROS概述与环境搭建 1.2 依赖库安装 资源下载完解压并执行以下指令 https://pa…

CSRNET图像修复,DNN

CSRNET图像修复 CSRNET图像修复&#xff0c;只需要OPENCV的DNN

【安装指南】图床神器之Picgo下载、安装详细教程

&#x1f33c;一、概述 PicGo是一款开源的图片上传、管理工具&#xff0c;旨在帮助用户快速上传图片到云存储或图床&#xff0c;并提供链接方便在网页或其他应用中使用。它支持各种常见的图床服务商&#xff0c;如GitHub、七牛云、腾讯云等&#xff0c;并提供了简洁易用的界面和…

Vscode 在汇编文件中添加调试断点

Vscode 在汇编文件中添加调试断点 vscode默认不支持汇编文件添加断点, 可以在设置里面打开

软件实例分享,超市便利店进销存管理系统收银软件教程

软件实例分享&#xff0c;超市便利店进销存管理系统收银软件教程 一、前言 以下软件教程以 佳易王超市进销存管理软件V16.0为例说明 软件文件下载可以点击最下方官网卡片——软件下载——试用版软件下载 软件程序导航&#xff0c;系统设置&#xff1a;有管理员账号设置其他账…

前端工程化之:webpack3-5(css module)

目录 一、css module 1.思路 2.实现原理 3.如何应用样式 4.其他操作 &#xff08;1&#xff09;全局类名 &#xff08;2&#xff09;如何控制最终的类名 5.其他注意事项 一、css module 通过命名规范来限制类名太过死板&#xff0c;而 css in js 虽然足够灵活&…

Windows上Miniconda的安装:一步步教你从零开始

&#x1f680;Windows上Miniconda的安装&#xff1a;一步步教你从零开始&#x1f680; &#x1f335;文章目录&#x1f335; &#x1f333;引言&#x1f333;&#x1f333;二、Miniconda简介&#xff1a;开启您的数据科学之旅的得力助手&#xff01; &#x1f333;&#x1f333…

神经网络:卷积神经网络中的BatchNorm

一、BN介绍 1.原理 在机器学习中让输入的数据之间相关性越少越好&#xff0c;最好输入的每个样本都是均值为0方差为1。在输入神经网络之前可以对数据进行处理让数据消除共线性&#xff0c;但是这样的话输入层的激活层看到的是一个分布良好的数据&#xff0c;但是较深的激活层…

Java 集合

一、集合的框架体系&#xff08;重要&#xff0c;背&#xff01;&#xff01;&#xff01;&#xff09; 1.Collection&#xff08;单列集合&#xff09; 2.Map&#xff08;双列集合&#xff09; 二、Collection接口 1.特点 使用了Collection接口的子类 可以存放多个元素&am…

C语言学习day12:for循环

前面学了dowhile循环&#xff0c;今天我们来学习经常用到的for循环&#xff1a; for循环&#xff1a; 例子&#xff1a; int main() {//int i;for (int i 0; i < 10;i) {printf("%d\n",i);};system("pause");return EXIT_SUCCESS; } 解释&#xff…

高中数学:不等式

一、性质 1、同向可加性 2、同向同正可乘 3、正数乘方开方&#xff08;n∈Z&#xff0c;n≥2&#xff09; 常见题型 1、比较大小 分式比较大小&#xff0c;先去分母作差法比较大小带根号的无理数比较大小&#xff0c;直接两边开方因式分解&#xff08;较难&#xff09; 2、…

PhP+vue企业原材料采购系统_cxg0o

伴随着我国社会的发展&#xff0c;人民生活质量日益提高。互联网逐步进入千家万户&#xff0c;改变传统的管理方式&#xff0c;原材料采购系统以互联网为基础&#xff0c;利用php技术&#xff0c;结合vue框架和MySQL数据库开发设计一套原材料采购系统&#xff0c;提高工作效率的…