kafka之java客户端实战

1. kafka的客户端

        Kafka提供了两套客户端API,HighLevel API和LowLevel API。 HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用。我们的重点是HighLeve API 。

2. 基础客户端的使用

Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.13</artifactId>
   <version>3.4.0</version>
  </dependency>

2.1 如何发消息

        现在, 我们使用Kafka提供的Producer类,如何发送消息。

2.1.1 单项发送消息

代码:

public class MyProducerTest {
    private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 5; i++) {
            //Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            //Part3:发送消息
            //单向发送:不关心服务端的应答。
            producer.send(record);
            System.out.println("message "+i+" sended");
        }
        //消息处理完才停止发送者。
        producer.close();
    }
}

执行结果:

2.1.2 同步发送

代码:

public class MyProducerTest {
    private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 5; i++) {
            //Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            //Part3:发送消息
            //同步发送:获取服务端应答消息前,会阻塞当前线程。
            RecordMetadata recordMetadata = producer.send(record).get();
            String topic = recordMetadata.topic();
            int partition = recordMetadata.partition();
            long offset = recordMetadata.offset();
            String message = recordMetadata.toString();
            System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);

        }
        //消息处理完才停止发送者。
        producer.close();
    }
}

执行结果:

 2.1.2 异步发送 

代码:

public class MyProducerTest {
    private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        // 此处配置的是kafka的端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.roy.kfk.basic.MyInterceptor");
        // 配置key的序列化类
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        // 配置value的序列化类
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);
        CountDownLatch latch = new CountDownLatch(5);
        for(int i = 0; i < 5; i++) {
            //Part2:构建消息
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
            //Part3:发送消息
            //异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(null != e){
                        System.out.println("消息发送失败,"+e.getMessage());
                        e.printStackTrace();
                    }else{
                        String topic = recordMetadata.topic();
                        long offset = recordMetadata.offset();
                        String message = recordMetadata.toString();
                        System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);
                    }
                    latch.countDown();
                }
            });
        }
        //消息处理完才停止发送者。
        latch.await();
        //消息处理完才停止发送者。
        producer.close();
    }
}

执行结果:

2.1.3 总结 

​ 从上述示例中,我们可以总结出,构建Producer分为三个步骤:

  1. 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
  2. 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
  3. 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

2.2 如何消费消息

        接下来可以使用Kafka提供的Consumer类,快速消费消息。

2.2.1 消费消息

代码:

public class MyConsumerTest {
    private static final String BOOTSTRAP_SERVERS = "192.168.31.5:9092,192.168.31.176:9092,192.168.31.232:9092";
    private static final String TOPIC = "disTopic";

    public static void main(String[] args) {
        //PART1:设置发送者相关属性
        Properties props = new Properties();
        //kafka地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        //每个消费者要指定一个group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        //key序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //value序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));

        while (true) {
            //PART2:拉取消息
            // 100毫秒超时时间
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
            //PART3:处理消息
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition = "+record.partition()+"offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());
            }


            //提交offset,消息就不会重复推送。
            consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。
        }
    }
}

2.2.2 总结

​ 整体来说,Consumer同样是分为三个步骤:

  1. 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
  2. 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  3. 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。

3. 从客户端一些属性来认识kafka客户端工作机制

内容更新中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/318260.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

通过shell脚本确定当前平台

shell中的变量OSTYPE存储操作系统的名称&#xff0c;也可以使用uname命令来确认当前所在的平台。 shell中的变量HOSTTYPE存储操作系统的架构。 测试代码如下所示&#xff1a; #! /bin/bashecho "use OSTYPE:" if [[ "$OSTYPE" "linux-gnu&quo…

canvas设置渐变色文字(线性、径向)

查看专栏目录 canvas示例教程100专栏&#xff0c;提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重…

ssm基于JAVA的酒店客房管理系统论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本酒店客房管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息…

JAVA实现循环日期加一天

一、业务背景 现在数据库新增字段需要区分平日(0)和假期(1)的数据&#xff0c;之前有一批去年的数据都没有算过&#xff0c;所以得用日期循环来根据实际的时间来修改对应的数值&#xff0c;废话不多说看具体操作方法。 二、操作方法 // 初始日期 String dateString "20…

谷粒商城项目|es的应用场景及常见问题

es是什么 es多被用于搜索聚合分析引擎 是分布式的可以高性能查询的引擎 es应用场景 为什么不用MYSQL而用es es将数据存在内存中且可以分布式的存储数据 商品上架 商品在es中的保存 1.在es中建立索引 spu sku spu sku保存在一起防止分布查询 为了防止对象数组扁平化&#xff…

NetDevOps:华三交换机通过Netmiko或者Nornir获取接口信息通过TextFSM解析报错问题

python代码&#xff1a;实现功能获取交换机接口信息并通过TextFSM进行解析。 from netmiko import Netmiko import textfsm show_intf_cmd_mapping {hp_comware: display interface, }def ssh_device_2_get_intfs(device_type, host, username, password, port):dev_info {d…

微机原理常考填空总结

hello大家好我是吃个西瓜嘤&#xff0c;这篇节只总结微机原理常考填空题都是干货展示常出现的易错点以及微机原理注意事项。 以下仅代表个人发言 #微机原理 正文开始&#xff1a; 1&#xff0c;区分JZ&#xff0c;JNZ技巧 也就是D70用JZ&#xff1b;D71用JNZ。 JZ;条件ZF1时…

LLM之RAG实战(十四)| 利用LongContextRetriver克服RAG中的中间丢失现象

人类和大型语言模型&#xff08;LLM&#xff09;都有一个共同的行为模式&#xff1a;他们往往擅长处理位于给定内容开头或结尾的信息&#xff0c;而中间的信息往往会被忽视。 来自斯坦福大学、加州大学伯克利分校和Samaya AI的研究人员在论文《Lost in the Middle: How Languag…

安装MySQL

采用ubuntu系统&#xff0c;安装MySQL5.7 安装 下载apt仓库文件 #下载apt仓库的安装包&#xff0c;Ubuntu的安装包是.deb文件 wget https://dev.mysql.com/get/mysql-apt-config_0.8.12-1_all.deb配置apt仓库 #使用dpkg命令安装仓库dpkg -i mysql-apt-config_0.8.12-1_all.…

大模型实战营Day3 作业

基础作业&#xff1a; 复现课程知识库助手搭建过程 (截图) 进阶作业&#xff1a; 选择一个垂直领域&#xff0c;收集该领域的专业资料构建专业知识库&#xff0c;并搭建专业问答助手&#xff0c;并在 OpenXLab 上成功部署&#xff08;截图&#xff0c;并提供应用地址&#xf…

Nacos和Eureka比较、统一配置管理、Nacos热更新、多环境配置共享、Nacos集群搭建步骤

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、Nacos和eureka的对比二、统一配置管理二、Nacos热更新方式一方式二 三、多环境配置共享四、Nacos集群搭建步骤&#xff08;黑马springCloud的p29&#xff0…

简单明了,汽车级LM317系列LM317D2TR4G线性电压稳压器电源设计-参数应用方案分享

低压差线性稳压器&#xff08;LDO&#xff09;&#xff0c;是指一种具有恒定电流输出电压的装置&#xff0c;主要由输入变压器、整流器、输出变压器三部分构成&#xff0c;工业原理为将输入的交流电压经过整流、滤波后得到直流输出电压&#xff0c;再经过控制元件和开关器件将稳…

暄桐写字计划 | 开始布局我们的2024

暄桐是一间传统美学教育教室&#xff0c;创办于2011年&#xff0c;林曦是创办人和授课老师&#xff0c;教授以书法为主的传统文化和技艺&#xff0c;皆在以书法为起点&#xff0c;亲近中国传统之美&#xff0c;以实践和所得&#xff0c;滋养当下生活。      暄桐林曦老师有…

【自译】【精华】MIT麻省理工学院技术双月刊(The Bimonthly MIT Technology Review)2024年1~2月【创新版块概览(一)】

导读&#xff1a; 今年是 《MIT技术评论杂志》 创刊125周年纪念年&#xff08;该杂志自1899年创刊&#xff09;&#xff0c;笔者将2024开年第1期&#xff08;1月~2月号&#xff09;的创新版块&#xff08;Innovation Issue&#xff09;中的重要内容进行梳理&#xff0c;获得近年…

紫光展锐M6780丨画质增强——更炫的视觉体验

智能显示被认为是推动数字化转型和创新的重要技术之一。研究机构数据显示&#xff0c;预计到2035年底&#xff0c;全球智能显示市场规模将达到1368.6亿美元&#xff0c;2023-2035年符合年增长率为36.4%。 随着消费者对高品质视觉体验的需求不断增加&#xff0c;智能手机、平板…

二十几种未授权访问漏洞合集

未授权访问漏洞是一个在企业内部非常常见的问题&#xff0c;这种问题通常都是由于安全配置不当、认证页面存在缺陷&#xff0c;或者压根就没有认证导致的。当某企业对外的服务端口、功能无限制开放&#xff0c;并且对用户的访问没有做任何限制的时候&#xff0c;可能会泄露出某…

JavaScript常用事件详解

一、用于form&#xff08;表单&#xff09;的事件 在网页中经常会遇到一些表单的验证&#xff0c;是通过事件进行处理的&#xff0c;比如用户输入用户名之后&#xff0c;及时显示用户是否被注册 用于form&#xff08;表单&#xff09;的事件 事件名功能 onblur 当元素失…

基于OpenCV的谷物颗粒识别

基于OpenCV的谷物颗粒识别 一、程序整体功能介绍1.1 导入库与函数定义1.2 颜色分割与灰度处理1.3 二值化与轮廓检测1.4 绘制与计数1.5 主程序与结果展示 二、算法原理与实现流程2.1算法原理&#xff08;1&#xff09;颜色分割&#xff08;2&#xff09;灰度处理与二值化&#x…

ubuntu安装node

1 下载 node 官网下载 如果需要其他版本&#xff0c;点击上图的Other Downloads 这里下载的版本是20.11.0 Linux Binaries (x64)&#xff0c;下载下来后是node-v20.11.0-linux-x64.tar.xz这样的格式&#xff0c;直接右键解压得到如下目录&#xff1a; 直接拷贝该文件夹到指定目…

哈希表的实现(1)----除留余数法实现

一&#xff0c;哈希表的介绍 哈希表是一种通过哈希思想实现的一种数据结构。哈希表这种数据结构的特点便是可以通过一个值快速的定位这个值所在的位置实现插入&#xff0c;删除&#xff0c;查找。在这篇博客里面&#xff0c;我们便来实现一个通过除留余数法实现的一个哈希表。 …