Kafka(二)Producer第一篇

一,Client开发


生产逻辑需要具备以下几个 步骤:
(1)配置生产者客户端参数及创建相应的生产者实例。
(2)构建待发送的消息。
(3)发送消息。
(4)关闭生产者实例。
public class Producer {
    private static final String BROKER_LIST = "localhost:9092";
    private static final String TOPIC = "TOPIC-A";
    public static Properties initConfig() {
        Properties properties = new Properties();
        // 以下3个必须
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
       
        // 客户端ID
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "eris-kafka-producer");
        // 重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 成功收到消息分区副本数,默认为1,即leader副本收到就返回成功。注意是字符串!
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        // 生产者客户端能发送的消息的最大值,单位为B,默认1M
        properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,  1048576);
        // Producer等待请求响应的最长时间,单位ms,默认值为30000
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,  30000);
        // 生产者发送ProducerBatch之前等待更多ProducerRecord加入的时间。默认为0,ProducerBatch被填满时发出
        properties.put(ProducerConfig.LINGER_MS_CONFIG,  0);
        return properties;
    }

    public static ProducerRecord<String, String> initMessage() {
        return new ProducerRecord<>(TOPIC, "hi eris");
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord record = initMessage();
        kafkaProducer.send(record);
        kafkaProducer.close();
    }
}
注意:KafkaProducer,它是线程安全的,我们可以在多线程的环境中复用它。
消费者客户端KafkaConsumer是非线程安全的。
1,发送消息的构造
消息对象是ProducerRecord,业务信息是value字段。
  • key可以让消息再二次归类,同一个key的消息会被划分到同一个分区中;
  • topic属性和value属性是必填项,其余属性是选填项;
2,主要Producer参数
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG并非需要所有的broker地址,生产者会从给定的broker里查找到其他broker的信息,一般至少要2个。
3, 消息的发送
3-1,消息发送的3种模式:发后即忘、同步及异步
A,发后即忘:
kafkaProducer.send(record);
B,同步:
kafkaProducer.send(record).get();
C,异步:
  • 不推荐手动用future实现业务异步,诸多消息对应的Future对象的处理会引起逻辑的混乱。
  • 对于同一个分区,如果消息record1于record2之前先发送,那么KafkaProducer可以保证对应的callback1在callback2之前调用,也就是说 ,回调函数的调用也可以保证分区有序。
3-2,KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。
对于可重试异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常;不可重试异常会直接抛出异常。
3-3,close方法会回收资源,并且阻塞等待之前所有的发送请求完成后再关闭 KafkaProducer,可带timeout。
3-4,消息在通过send发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)才能被真正地发往 broker。
4,序列化
生产者需要用序列化器(Serializer) 把对象转换成字节数组才能通过网络发送给Kafka。
如果 Kafka 客户端提供的几种序列化器都无法满足需求,则可以选择使用如Avro、JSON、Thrift、ProtoBuf和Protostuff等通用的序列化工具来实现,或者使用 自定义的序列化器来实现
5,分区器
如果ProducerRecord中指定了partition字段,那么就不需要分区器的作用。
反之若没有指定partition字段,那么就需要依赖分区器, 根据key这个字段来计算partition的值。
  • 如果key不为null,计算得到的分区会是所有分区中的任意一个;
  • 如果key为null,计算得到的分区号仅为可用分区中的任意一个;
一旦主题中增加了分区,那么就难以保证key与分区之间的映射关系了。
可以自定义分区器。
6,拦截器
自定义拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口,接口中包含3个方法:
  • onSend:将消息序列化和计算分区之前调用;
  • onAcknowledgement:消息被应答(ack)之前或消息发送失败时调用,优先于用户设定的 Callback之前执行;
  • interceptor.classes配置拦截器链(逗号分隔),拦截链会按照参数配置的顺序一一执行。如果某个拦截器执行失败,下一个拦截器会接 着上一个执行成功的拦截器继续执行。

二,重要参数


1.acks
用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。
  • ack=1(默认),leader写入(落盘)即成功;
  • ack=0,发送后就认为成功,不需要等待任何服务端的响应;
  • ack=-1或者ack=“all”,需要ISR中的所有副本都成功写入;
2,retries
生产者重试的次数,默认值为0,即不重试。
保证消息顺序
max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器响应之前可以发送多少个消息(即最多缓存的请求数)。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
如果把 retries 设为非零整数,同时把 max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了
一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把 retries 设为 0。可以把 max.in.flight.requests.per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给 broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。
3,linger.ms
这个参数用来指定生产者发送ProducerBatch之前等待更多ProducerRecord加入ProducerBatch的时间,默认值为0,即生产者会在 ProducerBatch 被填满时发送出去。
其他:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/783191.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

字节码编程javassist之打印方法耗时和入参

写在前面 本文看下如何实现打印方法耗时和入参。 1&#xff1a;程序 需要增强的类&#xff1a; public class ApiTest1 {public Integer strToInt(String str01, String str02) {return Integer.parseInt(str01);}}插桩类 package com.dahuyou.javassist.huohuo.aa;import…

基于 V7 FPGA 的4X 100G 光纤加速卡,可应用于基于服务器的光纤通道数据采集、数据传输等场景

4个100G QSFP28 光纤通道PCIE x16 主机接口&#xff0c;支持xdma&#xff0c;支持SG DMA光纤通道支持Aurora等协议标准&#xff0c;最高支持25Gbps/lane2组独立的DDR4 SDRAM 缓存&#xff0c;工作时钟频率1200MHz多路数字离散IO接口高性能时钟管理单元 功能框图 一款基于PCIE总…

easyexcel使用小结-未完待续

官网&#xff1a;https://easyexcel.opensource.alibaba.com/docs/current/ <dependency><groupId>com.alibaba</groupId><artifactId>easyexcel</artifactId><version>4.0.1</version></dependency>一、读 1.1简单读 Getter…

Vue 与 OpenAI 接口交互实战:发送请求的全流程解析(一)

前言 本文讲解使用vue去搭建一个项目&#xff0c;然后向OpenAI发送请求&#xff0c;并获取数据 文章分为两篇书写&#xff0c;本篇文章侧重于书写API的封装与调用&#xff0c;第二篇文章侧重于页面逻辑的处理 接下来就让我们开始吧! 调用OpenAI的本质是什么&#xff1f; 本…

基于AD8232的心电图套件的测试

基于AD8232的心电图套件的测试 1、测试设备2、电源的选择3、 用于测试心电图套件的模拟心电图电路基本4017B的电路基于multisim的电路仿真基于STM32F103RCT6 参考测试数据 1、测试设备 1、AD8232心电模块 2、手持示波器 3、心电信号模拟发生器 4、NI multisim 14.3 5、实物待补…

关于振动盘正反料下料逻辑编写

写在前文 借鉴某个程序的逻辑套路写的 1.就是第一个料是正方向&#xff0c;第二个料是反方向。 (* 基础逻辑应该都差不多&#xff0c;这个是一个振动盘&#xff0c;振动盘的末端是一个上下对射的感应器&#xff0c;这个感应器的作用是对射感应到物料的到位信号&#xff0c;末端…

java LogUtil输出日志打日志的class文件内具体方法和行号

最近琢磨怎么把日志打的更清晰&#xff0c;方便查找问题&#xff0c;又不需要在每个class内都创建Logger对象&#xff1b;利用堆栈的方向顺序拿到日志的class问题。看效果&#xff0c;直接上代码。 1、demo test 2、输出效果 3、完整的LogUtil文件 import org.jetbrains.anno…

导入项目,JAVA文件是咖啡杯图标

问题 从图中可以看到&#xff0c;JAVA文件是咖啡杯图标 原因 项目没有识别为MAVEN项目 解决办法 进入pom.xml文件&#xff0c;右键点击Add as Maven Project即可

详解Linux的shell脚本基础指令

一、shell简介 是Linux系统的用户界面&#xff0c;它提供用户与内核的一种交互方式。它接收用户输入的命令&#xff0c;并把它送入内核去执行&#xff0c;是一个命令解释器。 脚本&#xff1a;本质是一个文件&#xff0c;文件里面存放的是 特定格式的指令&#xff0c;系统可以…

CC4利用链分析

我的Github主页Java反序列化学习同步更新&#xff0c;有简单的利用链图 分析版本 Commons Collections 4.0 JDK 8u65 环境配置参考JAVA安全初探(三):CC1链全分析 分析过程 在Commons Collections 4.0中&#xff0c;TransformingComparator类变为可序列化类&#xff0c;增…

myeclipse开发ssm框架项目图书管理系统 mysql数据库web计算机毕业设计项目

摘 要 随着计算机的广泛应用&#xff0c;其逐步成为现代化的标志。图书馆的信息量也会越来越大&#xff0c;因此需要对图书信息、借书信息、还书信息等进行管理&#xff0c;及时了解各个环节中信息的变更&#xff0c;要对因此而产生的单据进行及时的处理&#xff0c;为了提高高…

吴恩达老师推荐的大模型分析网站 Artificial Analysis

是吴恩达老师推荐的一个提供各大模型的质量、输出速度、价格对比等多维度分析的网站。 比起 LMSYS Chatbot Arena 或者 HF 上的其他竞技场&#xff0c;它们更注重的输出的质量而这个网站其实更利于我们选择一个合适的大模型 API&#xff0c;强烈安利&#xff01;

千帆大模型平台升级十大能力,企业级 RAG 全面升级

7 月 5 日&#xff0c;2024 世界人工智能大会&#xff08;WAIC&#xff09;期间&#xff0c;百度智能云大模型助力新质生产力发展论坛在在上海世博展览馆举办。会上&#xff0c;百度智能云宣布文心大模型 4.0 Turbo&#xff08;ERNIE 4.0 Turbo&#xff09;面向企业客户全面开放…

人工智能在三级淋巴结:肿瘤浸润淋巴细胞领域的系统研究进展|顶刊速递·24-07-08

小罗碎碎念 本期文献主题&#xff1a;人工智能在三级淋巴结/肿瘤浸润淋巴细胞领域的系统分析 关于三级淋巴结和肿瘤浸润淋巴细胞的文献&#xff0c;会是接下来的分析重点&#xff0c;期间也会穿插临床文献&项目复现的推文。 另外再说点科研道路上的题外话&#xff0c;也算是…

GitLab CI/CD实现项目自动化部署

1 GitLab CI/CD介绍 GitLab CI/CD 是 GitLab 中集成的一套用于软件开发的持续集成&#xff08;Continuous Integration&#xff09;、持续交付&#xff08;Continuous Delivery&#xff09;和持续部署&#xff08;Continuous Deployment&#xff09;工具。这套系统允许开发团队…

一手洞悉泰国slot线上游戏投放本土网盟CPI计费广告优势

一手洞悉泰国slot线上游戏投放本土网盟CPI计费广告优势 ​在泰国这个拥有独特文化背景和审美观念的国家&#xff0c;Slots游戏以其丰富的玩法和刺激的体验迅速赢得了玩家们的喜爱。然而&#xff0c;要在竞争激烈的市场中脱颖而出&#xff0c;有效的推广策略显得尤为重要。本土…

JVM专题之垃圾收集器

JVM参数 3.1.1 标准参数 -version -help -server -cp 3.1.2 -X参数 非标准参数,也就是在JDK各个版本中可能会变动 ``` -Xint 解释执行 -Xcomp 第一次使用就编译成本地代码 -Xmixed 混合模式,JVM自己来决定 3.1.3 -XX参数 > 使用得最多的参数类型 > > 非…

Docassemble interview 未授权任意文件读取漏洞复现(CVE-2024-27292)

0x01 产品简介 Docassemble是一款强大的开源工具,主要用于自动化生成和定制复杂文档,特别是在法律文档处理领域表现出色。由Jonathan Pyle个人开发者开发,是一个免费的开源专家系统,用于指导访谈和文档组装。Docassemble基于Python编写,充分利用了Python的灵活性和广泛的…

【论文阅读】-- Visual Traffic Jam Analysis Based on Trajectory Data

基于轨迹数据的可视化交通拥堵分析 摘要1 引言2 相关工作2.1 交通事件检测2.2 交通可视化2.3 传播图可视化 3 概述3.1 设计要求3.2 输入数据说明3.3 交通拥堵数据模型3.4 工作流程 4 预处理4.1 路网处理4.2 GPS数据清理4.3 地图匹配4.4 道路速度计算4.5 交通拥堵检测4.6 传播图…

Spring Cloud: OpenFeign 超时重试机制

超时重试是一种用于网络通信的常用策略&#xff0c;目的是在请求未能在规定时间内获得响应或响应超时的情况下&#xff0c;重新发送请求。具体来说&#xff0c;当发起请求后&#xff0c;如果在设定的时间内未能收到预期的响应&#xff0c;就会启动超时重试机制&#xff0c;重新…