Kafka-代码示例

一、构建开发环境

File > New > Project

选择一个最简单的模板

项目和坐标命名

配置maven路径

添加maven依赖

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.2.1</version>
    </dependency>
</dependencies>

 加载刚刚添加的依赖

此时发现项目还没有包目录,如果遇到这种情况,点击新建目录就会自动提示了

二、创建一个新的topic

kafka-topics --create --topic kafka-study --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2
#查看topic详情
kafka-topics --describe --zookeeper cdh1:2181 --topic kafka-study
#查看 topic 指定分区 offset
kafka-run-class kafka.tools.GetOffsetShell --topic kafka-study --time -1 --broker-list cdh1:9092

三、编写生产者

kafka源码中有生产者和消费者的示例,我们简单修改下就直接用了

package org.example.kafkaStudy;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

public class KafkaProducerDemo {

    public static void main(String[] args) {
        try{
            //topic名称
            String topicName = "kafka-study";
            //broker列表
            String bootstrapServers = "cdh1:9092,cdh2:9092,cdh3:9092";
            //向topic打多少数据
            int numRecords = 10000;
            //是否异步推送数据
            boolean isAsync = true;
            int key = 0;
            int sentRecords = 0;
            //创建生产者
            KafkaProducer<Integer, String> producer = createKafkaProducer(bootstrapServers,-1,null,false);
            //判断是否达到生产要求
            while (sentRecords < numRecords) {
                if (isAsync) {
                    //异步推送
                    asyncSend(producer,topicName, key, "test" + key,sentRecords);
                } else {
                    //同步推送
                    syncSend(producer,topicName, key, "test" + key,sentRecords);
                }
                key++;
                sentRecords++;
            }
            producer.close();
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }

    private static RecordMetadata syncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords)
            throws ExecutionException, InterruptedException {
        try {
            // 发送记录,然后调用get,这会阻止等待来自broker的ack
            RecordMetadata metadata = producer.send(new ProducerRecord<>(topicName, key, value)).get();
            Utils.maybePrintRecord(sentRecords, key, value, metadata);
            return metadata;
        } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
                | OutOfOrderSequenceException | SerializationException e) {
            Utils.printErr(e.getMessage());
        } catch (KafkaException e) {
            Utils.printErr(e.getMessage());
        }
        return null;
    }

    private static void asyncSend(KafkaProducer<Integer, String> producer,String topicName, int key, String value,int sentRecords) {
        //异步发送记录,设置一个回调以通知结果。
        //请注意,即使使用linger.ms=0设置了一个batch.size 当缓冲区内存已满或元数据不可用时,发送操作仍将被阻止
        producer.send(new ProducerRecord<>(topicName, key, value), new ProducerCallback(key, value,sentRecords));
    }

    private static KafkaProducer<Integer, String> createKafkaProducer(String bootstrapServers ,
              int transactionTimeoutMs,String transactionalId,boolean enableIdempotency) {
        Properties props = new Properties();
        // 生产者连接到broker需要引导服务器配置
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/port
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
        // 设置序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        if (transactionTimeoutMs > 0) {
            // 事务协调器主动中止正在进行的事务之前的最长时间
            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
        }
        if (transactionalId != null) {
            // 事务id必须是静态且唯一的,它用于在流程重启过程中标识相同的生产者实例
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        }
        // 在分区级别启用重复保护
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
        return new KafkaProducer<>(props);
    }

    static class ProducerCallback implements Callback {
        private final int key;
        private final int sentRecords;
        private final String value;

        public ProducerCallback(int key, String value,int sentRecords) {
            this.key = key;
            this.sentRecords = sentRecords;
            this.value = value;
        }

        /**
         * 用户可以实现一种回调方法,以提供请求完成的异步处理。当发送到服务器的记录得到确认时,将调用此方法。当回调中的异常不为null时,
         * 元数据将包含除topicPartition之外的所有字段的特殊-1值,该值将有效。
         *
         * @param metadata 发送的记录的元数据(即分区和偏移量)。如果发生错误,将返回除topicPartition之外的所有字段的值为-1的空元数据。
         * @param exception 处理此记录时引发的异常。如果没有发生错误,则为空。
         */
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                Utils.printErr(exception.getMessage());
                if (!(exception instanceof RetriableException)) {
                    // 我们无法从这些异常中恢复过来
                }
            } else {
                Utils.maybePrintRecord(sentRecords, key, value, metadata);
            }
        }
    }
}

四、编写消费者

package org.example.kafkaStudy;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
import static java.util.Collections.singleton;

public class KafkaConsumerDemo {

    public static void main(String[] args) {
        //topic名称
        String topicName = "kafka-study";
        //组名称
        String groupName = "my-group-1";
        //broker列表
        String bootstrapServers =  "cdh1:9092,cdh2:9092,cdh3:9092";
        //向topci打多少数据
        int numRecords = 10000;
        int remainingRecords = 10000;
        // 消费来自 topic = kafka-study 的数据
        KafkaConsumer<Integer, String> consumer = createKafkaConsumer(bootstrapServers,groupName,false);
        //订阅主题列表以获取动态分配的分区此类实现了我们在此处传递的再平衡侦听器,以接收此类事件的通知
        consumer.subscribe(singleton(topicName));
        Utils.printOut("Subscribed to %s", topicName);
        while (remainingRecords > 0) {
            try {
                // 如果需要,轮询会更新分区分配并调用配置的重新平衡侦听器,然后尝试使用上次提交的偏移量或auto.offset.reset按顺序获取记录。
                // 如果有记录或超时返回空记录集,则重置策略会立即返回。下一次轮询必须在session.timeout.ms中调用,以避免组重新平衡
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<Integer, String> record : records) {
                    Utils.maybePrintRecord(numRecords, record);
                }
                remainingRecords -= records.count();
            } catch (AuthorizationException | UnsupportedVersionException
                    e) {
                // 我们无法从这些异常中恢复过来
                Utils.printErr(e.getMessage());
            } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
                // 在没有auto.reset.policy的情况下,找不到偏移量或偏移量无效
                Utils.printOut("Invalid or no offset found, using latest");
                consumer.seekToEnd(e.partitions());
                consumer.commitSync();
            } catch (KafkaException e) {
                // 记录异常并尝试继续
                Utils.printErr(e.getMessage());
            }
        }
        consumer.close();
        Utils.printOut("Fetched %d records", numRecords - remainingRecords);
    }

    private static KafkaConsumer<Integer, String> createKafkaConsumer(String bootstrapServers,
                     String groupId , boolean readCommitted) {
        Properties props = new Properties();
        // 消费者连接到broker需要引导服务器配置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // 不需要客户端id,但通过允许在服务器端请求日志中包含逻辑应用程序名称来跟踪请求的来源,而不仅仅是ip/port
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
        // 当我们使用订阅(topic)进行组管理时,需要消费者groupId
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //设置静态成员资格以提高可用性(例如滚动重启)
//        instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
        //启用EOS时禁用自动提交,因为偏移量与事务一起提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");
        //读取数据用到的反序列化器,需要和生产者对应
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        if (readCommitted) {
            // 跳过正在进行和已中止的事务
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        }
        // 在偏移无效或没有偏移的情况下设置重置偏移策略
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(props);
    }
}

五、运行程序

生产者日志打印

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(0, test0), partition(kafka-study-0), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(1, test1), partition(kafka-study-0), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(5, test5), partition(kafka-study-0), offset(2)

......

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9940, test9940), partition(kafka-study-0), offset(4979)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9960, test9960), partition(kafka-study-0), offset(4987)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9970, test9970), partition(kafka-study-0), offset(4991)

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(2, test2), partition(kafka-study-1), offset(0)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(3, test3), partition(kafka-study-1), offset(1)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(4, test4), partition(kafka-study-1), offset(2)

.......

kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9950, test9950), partition(kafka-study-1), offset(4966)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9980, test9980), partition(kafka-study-1), offset(4986)
kafka-producer-network-thread | client-abf9115c-bc27-47cf-bddb-4a20d6e103ec - Sample: record(9990, test9990), partition(kafka-study-1), offset(4991)

我们再次用命令看下每个分区的offset

消费者日志打印

main - Subscribed to kafka-study
main - Sample: record(0, test0), partition(kafka-study-0), offset(0)
main - Sample: record(1000, test1000), partition(kafka-study-0), offset(506)
main - Sample: record(2000, test2000), partition(kafka-study-0), offset(1020)
main - Sample: record(3000, test3000), partition(kafka-study-0), offset(1554)
main - Sample: record(7000, test7000), partition(kafka-study-0), offset(3550)
main - Sample: record(4000, test4000), partition(kafka-study-1), offset(1929)
main - Sample: record(5000, test5000), partition(kafka-study-1), offset(2422)
main - Sample: record(6000, test6000), partition(kafka-study-1), offset(2932)
main - Sample: record(8000, test8000), partition(kafka-study-1), offset(3963)
main - Sample: record(9000, test9000), partition(kafka-study-1), offset(4467)
main - Fetched 10000 records

六、问题说明

从日志中我们可以看到,在异步生产和消费时offset并不是逐个递增上去的,这是为什么呢?

在前面博客中我们提到,生产者在异步的情况下会启用批处理,即:Kafka生产者将尝试在内存中积累数据,并在单个请求中发送更大的批处理。批处理可以配置为积累不超过固定数量的消息,并且等待时间不超过一些固定的延迟限制(例如64k或10毫秒)。这允许积累更多的消息来发送,并且在服务器上几乎没有更大的I/O操作。这种缓冲是可配置的,并提供了一种机制来权衡少量额外的延迟以获得更好的吞吐量。当然如果你选择的是同步推送或者异步中单条消息特别大会导致批处理优化使用不到。

消费者也是从brokers一批一批的拉取数据来消费的

我们也可以看下broker的日志中数据的索引情况

kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.log | head -10

kafka-run-class kafka.tools.DumpLogSegments --files /var/local/kafka/data/kafka-study-0/00000000000000000000.index | head -10

从这里我们可以看到,生产者是一批一批往broker推送的,broker以更大的批次往磁盘写,从而降低推送的频次,也降低与磁盘交互的频次。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/901247.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

学习笔记——动态路由——OSPF(距离矢量协议)OSPF路由类型

OSPF路由类型 在OSPF中&#xff0c;路由类型指的是不同种类的路由&#xff0c;用于描述网络中不同的路由信息及其传输方式。 1、Intra Area路由(区域内路由) Intra Area路由(区域内路由/本地路由/内部路由)是OSPF协议中的一种路由类型&#xff0c;用于描述在同一个OSPF区域内…

【论文阅读】ESRGAN

学习资料 论文题目&#xff1a;增强型超分辨率生成对抗网络&#xff08;ESRGAN: Enhanced Super-Resolution Generative Adversarial Networks&#xff09;论文地址&#xff1a;[1809.00219] ESRGAN&#xff1a;增强型超分辨率生成对抗网络代码&#xff1a;xinntao / ESRGAN&am…

牛客周赛 Round 64(博弈论、思维、构造、LCA、换根DP)

文章目录 牛客周赛 Round 64(博弈论、思维、构造、LCA、换根DP)A. 小红的对错判断B. 小红的幂表达C. 小红的前缀询问D. 小红和小紫的博弈游戏&#xff08;博弈论&#xff09;E. 小红的字符串重排&#xff08;思维、构造&#xff09;F&G. 小红的树上路径查询&#xff08;LCA…

LabVIEW共享变量通信故障

问题概述&#xff1a; 在LabVIEW项目中&#xff0c;使用IO服务器创建共享变量&#xff0c;并通过LabVIEW作为从站进行数据通信。通讯在最初运行时正常&#xff0c;但在经过一段时间或几个小时后&#xff0c;VI前面板出现错误输出&#xff0c;导致数据传输失败。虽然“分布式系统…

equals方法重写--自写Person类

1.Object类的equals方法&#xff08;源码&#xff09; public boolean equals(Object obj) {return (this obj);//判断如果比较的两个对象是同一个对象&#xff0c;则返回true} 2.String类重写Object类的equals方法&#xff08;源码&#xff09; public boolean equals(Obje…

Git的初次使用

一、下载git 找淘宝的镜像去下载比较快 点击这里 二、配置git 1.打开git命令框 2.设置配置 git config --global user.name "你的用名"git config --global user.email "你的邮箱qq.com" 3.制作本地仓库 新建一个文件夹即可&#xff0c;然后在文件夹…

网络一些相关术语

目录 网络一些相关术语 转发平面效率 可扩展性 控制平面 网络拓扑 服务质量&#xff08;QoS&#xff09; 网络协议 网络带宽 网络拥塞 网络安全 网络冗余 网络切片 网络延迟 网络地址转换&#xff08;NAT&#xff09; 虚拟专用网络&#xff08;VPN&#xff09; …

尚硅谷-react教程-求和案例-优化2-Provider组件的使用-笔记

在这篇文章的基础上&#xff0c;https://blog.csdn.net/weixin_41987016/article/details/143257435?spm1001.2014.3001.5501 继续优化&#xff0c; 借助Provider批量的给整个应用里面的所有的容器组件的添加store 原来的,src/index.js import React from "react&quo…

从0开始深度学习(17)——数值稳定性和模型初始化

在每次训练之前&#xff0c;都会对模型的参数进行初始化&#xff0c;初始化方案的选择在神经网络学习中起着举足轻重的作用&#xff0c; 它对保持数值稳定性至关重要。 我们选择哪个函数以及如何初始化参数可以决定优化算法收敛的速度有多快。 糟糕选择可能会导致我们在训练时遇…

云电脑的真实使用体验

最近这几年&#xff0c;关于云电脑的宣传越来越多。 小枣君之前曾经给大家介绍过云电脑&#xff08;链接&#xff09;。简单来说&#xff0c;它属于云计算的一个应用。通过在云端虚拟出一些虚拟电脑&#xff0c;然后让用户可以远程使用&#xff08;仍然需要借助本地电脑&#x…

jupyter notebook改变默认启动路径

安装好Anaconda 3以后,就可以使用Jupyter notebook了,但是我们打开Jupyter notebook后,发现界面是一个默认的目录,这个目录在哪里?如果想把自己写的程序文件保存在自己新建的一个文件夹里,修改默认目录到自建的文件夹下,该如何做呢! 先看一下Jupyter notebook的默认界…

【ubuntu18.04】ubuntu18.04升级cmake-3.29.8及还原系统自带cmake操作说明

参考链接 cmake升级、更新&#xff08;ubuntu18.04&#xff09;-CSDN博客 升级cmake操作说明 下载链接 Download CMake 下载版本 下载软件包 cmake-3.30.3-linux-x86_64.tar.gz 拷贝软件包到虚拟机 cp /var/run/vmblock-fuse/blockdir/jrY8KS/cmake-3.29.8-linux-x86_64…

【华为路由】OSPF多区域配置

网络拓扑 设备接口地址 设备 端口 IP地址 RTA Loopback 0 1.1.1.1/32 G0/0/0 10.1.1.1/24 RTB Loopback 0 2.2.2.2/32 G0/0/0 10.1.1.2/24 G0/0/1 10.1.2.1/24 RTC Loopback 0 3.3.3.3/32 G0/0/0 10.1.2.2/24 G0/0/1 10.1.3.1/24 RTD Loopback 0 4.4.4…

大模型Transformer笔记:KV缓存

1 MHA&#xff08;Multi-Head Attention&#xff09; 最经典的多头注意力 等价于多个独立的单头注意力的拼接 对于LLM来说&#xff0c;一般都是自回归地一个一个token的输出&#xff0c;也就相当于只有Transformer的decoder input在变化&#xff0c;之前作为prompt部分的是不变…

java智能物流管理系统源码(springboot)

项目简介 智能物流管理系统实现了以下功能&#xff1a; 智能物流管理系统的主要使用者分为管理员&#xff0c;顾客&#xff0c;员工&#xff0c;店主。功能有个人中心&#xff0c;顾客管理&#xff0c;员工管理&#xff0c;店主管理&#xff0c;门店信息管理&#xff0c;门店…

【制造业&电子产品】电脑电子元件检测系统源码&数据集全套:改进yolo11-TADDH

改进yolo11-SCConv等200全套创新点大全&#xff1a;电脑电子元件检测系统源码&#xff06;数据集全套 1.图片效果展示 项目来源 人工智能促进会 2024.10.24 注意&#xff1a;由于项目一直在更新迭代&#xff0c;上面“1.图片效果展示”和“2.视频效果展示”展示的系统图片或者…

蓝桥杯题目理解

1. 一维差分 1.1. 小蓝的操作 1.1.1. 题目解析&#xff1a; 这道题提到了对于“区间”进行操作&#xff0c;而差分数列就是对于区间进行操作的好方法。 观察差分数列&#xff1a; 给定数列&#xff1a;1 3 5 2 7 1 差分数列&#xff1a;1 2 2 -3 5 6 题目要求把原数组全部…

基于Springboot+Vue的食品商城系统 (含源码数据库)

1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: SpringBoot自带 apache tomcat 主要技术: Java,Springboot,mybatis,mysql,vue 2.视频演示地址 3.功能 这个系…

duilib的应用 在双屏异分辨率的显示器上 运行显示不出来

背景&#xff1a;win11&#xff0c;duilib应用&#xff0c;双显示器&#xff0c;两台分辨率相同&#xff0c;分别设置不同的缩放以后&#xff0c;应用运行以后&#xff0c;程序闪一下消失或者程序还在&#xff0c;但是UI显示不出来。 原因 窗口风格设置不合理&#xff0c;所以…

记录贴 为VScode配置C语言环境

大致步骤参考这位博主的过程&#xff1a;如何在 VS Code 中编写、运行C语言程序 教程_visual studio code怎么写c语言-CSDN博客 第一步&#xff1a;安装VScode。 第二步&#xff1a;安装两个插件&#xff1a;C/C Extension Pack和code runner。&#xff08;后面我发现&#x…