Kafka Producer发送消息流程之消息异步发送和同步发送

文章目录

  • 1. 异步发送
  • 2. 同步发送

在这里插入图片描述

1. 异步发送

Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。

public class KafkaProducerCallbackTest {
    public static void main(String[] args) throws InterruptedException {
        //创建producer
        HashMap<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

        for (int i = 0; i < 10; i++) {
            //创建record
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    "test2",
                    ""+i,
                    "我是你爹"+i
            );
            //发送record
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("回调信息:消息发送成功");
                }
            });
            System.out.println("发送数据");
        }

        //关闭producer
        producer.close();
    }
}

Main线程中,对于多条数据,下一条消息的发送并不等待上一条消息的确认,而是继续发送。

2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
2024-07-17 21:43:46.569 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。

2. 同步发送

消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。

按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据收集器,到Sender,最后还要等待回调确认,才可以开始下一条消息的流转。

public class KafkaProducerCallbackTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //创建producer
        HashMap<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

        for (int i = 0; i < 10; i++) {
            //创建record
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    "test2",
                    ""+i,
                    "我是你爹"+i
            );
            //发送record
            Future<RecordMetadata> send = producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("回调信息:消息发送成功");
                }
            });
            System.out.println("发送数据");
            send.get();
        }

        //关闭producer
        producer.close();
    }
}
2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
2024-07-17 21:49:19.823 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/803593.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

集成excel工具:自定义导入回调监听器、自定义类型转换器、web中的读、捕获文件格式转换错误ExcelDataConvertException

文章目录 I 封装导入导出1.1 定义工具类1.2 自定义读回调监听器: 回调业务层处理导入数据1.3 定义文件导入上下文1.4 定义回调协议II 自定义转换器2.1 自定义枚举转换器2.2 日期转换器2.3 时间、日期、月份之间的互转2.4 LongConverterIII web中的读3.1 使用默认回调监听器3.2…

【Android14 ShellTransitions】(七)Transition就绪

Transition.onTransactionReady的内容比较长&#xff0c;我们挑重点的部分逐段分析&#xff08;跳过的地方并非不重要&#xff0c;而是我柿子挑软的捏&#xff09;。 1 窗口绘制状态的流转以及显示SurfaceControl 注意我们这里的SurfaceControl特指的是WindowSurfaceControll…

标准IO(Ubuntu)

标准IO 什么是标准IO 头文件stdio.h 用的最多的两个输入输出函数printf和scanf 针对终端的输入输出函数 数据输入到特定的文件需要的条件 需要知道指定文件的文件名和位置 输入到文件中的内容 指定文件 fopen函数 FILE *fopen(const char *pathname, const char *mode) 功能…

pnpm install安装失败

ERR_PNPM_META_FETCH_FAIL GET https://registry.npmjs.org/commitlint%2Fcli: request to https://registry.npmjs.org/commitlint%2Fcli failed, reason: connect ETIMEDOUT 2606:4700::6810:123:443 1. 检查网络连接 确保你的网络连接正常并且没有被防火墙或代理服务器阻止…

Linux shell编程学习笔记64:vmstat命令 获取进程、内存、虚拟内存、IO、cpu等信息

0 前言 在系统安全检查中&#xff0c;通常要收集进程、内存、IO等信息。Linux提供了功能众多的命令来获取这些信息。今天我们先研究vmstat命令。 1.vmstat命令的功能、用法、选项说明和注意事项 1.1 vmstat命令的功能 vmstat是 Virtual Meomory Statistics&#xff08;虚拟内…

【Java数据结构】初始线性表之一:链表

为什么要有链表 上一节我们描述了顺序表&#xff1a;【Java数据结构】初识线性表之一&#xff1a;顺序表-CSDN博客 并且进行了简单模拟实现。通过源码知道&#xff0c;ArrayList底层使用数组来存储元素。 由于其底层是一段连续空间&#xff0c;当在ArrayList任意位置插入或者…

NGFW和防火墙的区别?

NGFW&#xff08;Next Generation Firewall&#xff0c;下一代防火墙&#xff09;和FW&#xff08;Firewall&#xff0c;防火墙&#xff09;在网络安全领域都扮演着重要角色&#xff0c;但它们在功能、性能和应用场景上存在显著的区别。以下是NGFW和FW之间的主要区别&#xff1…

数据库作业九

1、安装redis&#xff0c;启动客户端、验证。 2、string类型数据的命令操作&#xff1a; &#xff08;1&#xff09; 设置键值&#xff1a; &#xff08;2&#xff09; 读取键值&#xff1a; ​ &#xff08;3&#xff09; 数值类型自增1&#xff1a; ​ &#xff08;4&am…

NFT项目的第三方功能及接口

NFT项目涉及到第三方功能及接口的情况非常常见&#xff0c;因为NFT项目本身的功能通常是有限的&#xff0c;需要通过与第三方功能和接口的整合来实现更丰富的功能和更好的用户体验。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 以下…

【LeetCode】十六、并查集

文章目录 1、并查集Union Find2、并查集find的优化&#xff1a;路径压缩 Quick find3、并查集union的优化&#xff1a;权重标记4、leetcode200&#xff1a;岛屿数量解法一&#xff1a;DFS 1、并查集Union Find 并查集&#xff0c;一种树形的数据结构&#xff0c;处理不相交的两…

如何在excel表中实现单元格满足条件时整行变色?

可以试试使用条件格式&#xff1a; 一、条件格式 所谓“自动变色”就要使用条件格式。 先简单模拟数据如下&#xff0c; 按 B列数字为偶数 为条件&#xff0c;整行标记为蓝色背景色。 可以这样设置&#xff1a; 先选中1:10行数据&#xff0c;在这里要确定一下名称栏里显示…

DNS查询过程

DNS&#xff08;域名系统&#xff0c;Domain Name System&#xff09;是一个用于将域名和IP地址相互映射的系统。当你在浏览器中输入一个网址时&#xff0c;浏览器会通过DNS查询过程来找到对应的IP地址&#xff0c;以便能够连接到目标服务器。其查询过程一般通过以下步骤&#…

Apple Vision Pro 和其商业未来

机器人、人工智能相关领域 news/events &#xff08;专栏目录&#xff09; 本文目录 一、Vision Pro 生态系统二、Apple Vision Pro 的营销用例 随着苹果公司备受期待的进军可穿戴计算领域&#xff0c;新款 Apple Vision Pro 承载着巨大的期望。 苹果公司推出的 Vision Pro 售…

MySQL数据库慢查询日志、SQL分析、数据库诊断

1 数据库调优维度 业务需求&#xff1a;勇敢地对不合理的需求说不系统架构&#xff1a;做架构设计的时候&#xff0c;应充分考虑业务的实际情况&#xff0c;考虑好数据库的各种选择(读写分离?高可用?实例个数?分库分表?用什么数据库?)SQL及索引&#xff1a;根据需求编写良…

Qt窗口程序整理汇总

到今日为止&#xff0c;通过一个个案例的实验&#xff0c;逐步熟悉了 Qt6下 窗体界面开发的&#xff0c;将走过的路&#xff0c;再次汇总整理。 Qt Splash样式的登录窗https://blog.csdn.net/castlooo/article/details/140462768 Qt实现MDI应用程序https://blog.csdn.net/cast…

数据库课设---学生宿舍管理系统(sql server+C#)

1.引言 1.1 内容及要求 设计内容&#xff1a;设计学生宿舍管理系统。 设计要求&#xff1a; &#xff08;1&#xff09;数据库应用系统开发的需求分析&#xff0c;写出比较完善系统功能。 &#xff08;2&#xff09;数据库概念模型设计、逻辑模型设计以及物理模型设计。 …

【数学建模】——多领域资源优化中的创新应用-六大经典问题解答

目录 题目1&#xff1a;截取条材 题目 1.1问题描述 1.2 数学模型 1.3 求解 1.4 解答 题目2&#xff1a;商店进货销售计划 题目 2.1 问题描述 2.2 数学模型 2.3 求解 2.4 解答 题目3&#xff1a;货船装载问题 题目 3.1问题重述 3.2 数学模型 3.3 求解 3.4 解…

JS爬虫实战之极验四代

极验四代滑块验证码 一、目标网站说明二、流程步骤1. 逆向步骤一般分为&#xff1a;2. 接口确认1- 确认流程2- 获取verify的参数3- 构建requests验证verify的参数4- 锁定secode参数的作用 ok&#xff0c;让我们去获取verify接口中的响应&#xff01;&#xff01;&#xff01; 3…

el-table表格操作列错行处理

解决方法&#xff1a; <style>::v-deep .el-table th.el-table__cell > .cell {white-space: nowrap !important;} </style>

【C++航海王:追寻罗杰的编程之路】智能指针

目录 1 -> 为什么需要智能指针&#xff1f; 2 -> 内存泄漏 2.1 ->什么是内存泄漏&#xff0c;以及内存泄漏的危害 2.2 -> 内存泄漏分类 2.3 -> 如何避免内存泄漏 3 -> 智能指针的使用及原理 3.1 -> RAII 3.2 -> 智能指针的原理 3.3 -> std…