Kafka Java API

1、增加依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

2、三个案例

案例1:生产数据

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Demo1KafkaProducer {
    public static void main(String[] args) {

        Properties properties = new Properties();
        //指定broker列表
        properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");

        //指定key和value的数据格式
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //创建生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        //生产数据
        producer.send(new ProducerRecord<>("words","java"));
        producer.flush();

        //关闭连接
        producer.close();
    }
}

案例2: 文件生产数据到kafka(读取)

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Demo2FileToKaFka {
    public static void main(String[] args) throws Exception{
        Properties properties = new Properties();
        //指定broker列表
        properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");

        //指定key和value的数据格式
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //创建生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        //读取文件
        BufferedReader bs = new BufferedReader(new FileReader("flink/data/student.csv"));
        String line;
        while ((line=bs.readLine())!=null){
            //生产数据    如果指定分区默认为轮循添加数据
            producer.send(new ProducerRecord<>("students",line));
            producer.flush();
        }

        //关闭连接
        bs.close();
        producer.close();
    }
}

创建控制台消费者消费数据 

kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic bigdata

使用hash分区的方式改写该案例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

public class Dem3FileToKafkaWithHash {
    public static void main(String[] args) throws Exception {

        Properties properties = new Properties();
        //指定broker列表
        properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");

        //指定key和value的数据格式
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //创建生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        //读取文件
        FileReader fileReader = new FileReader("flink/data/student.csv");
        BufferedReader bufferedReader = new BufferedReader(fileReader);

        String line;
        while ((line = bufferedReader.readLine()) != null) {

            String clazz = line.split(",")[4];
            //hash分区
            int partition = Math.abs(clazz.hashCode()) % 3;

            //生产数据
            //kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic students_hash_partition
            //指定分区生产数据
            producer.send(new ProducerRecord<>("students_hash_partition", partition, null, line));
            producer.flush();
        }

        //关闭连接
        fileReader.close();
        bufferedReader.close();
        producer.close();
    }
}

案例3:消费kafka中的数据

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Properties;

public class Demo4Consumer {
    public static void main(String[] args) {
        Properties properties = new Properties();

        //指定kafka集群列表
        properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");

        //指定key和value的数据格式
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        /*
         * earliest
         * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
         * latest  默认
         * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产认值生的该分区下的数据
         * none
         * topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
         *
         */

        properties.setProperty("auto.offset.reset", "earliest");
        //指定消费者组,一条数据在一个组内只消费一次
        properties.setProperty("group.id", "java_kafka_group1");

        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        //订阅topic
        ArrayList<String> topics = new ArrayList<>();
        topics.add("hash_students");
        consumer.subscribe(topics);

        //死循环拉取数据,使数据全部拉取完毕
        while (true) {
            //拉取数据  默认只会拉取500条数据
            ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);

            //解析数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                String topic = consumerRecord.topic();         //主题
                long offset = consumerRecord.offset();         //偏移量
                int partition = consumerRecord.partition();    //分区
                String value = consumerRecord.value();         //数据
                long timestamp = consumerRecord.timestamp();   //处理时间
                System.out.println(topic + "\t" + offset + "\t" + partition + "\t" + value + "\t" + timestamp);
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/667161.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot HelloWorld 之 实现注册功能

SpringBoot HelloWorld 之 实现注册功能 一.配置 创建数据库big_event CREATE TABLE user (id int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT ID,username varchar(20) COLLATE utf8_unicode_ci NOT NULL COMMENT 用户名,password varchar(32) COLLATE utf8_unicode_ci …

β-烟酰胺单核苷酸(NMN)功能不断得到验证 市场规模呈增长态势

β-烟酰胺单核苷酸&#xff08;NMN&#xff09;功能不断得到验证 市场规模呈增长态势 β-烟酰胺单核苷酸&#xff08;β-Nicotinamide mononucleotide&#xff0c;NMN&#xff09;是一种生物活性分子&#xff0c;是一种辅酶Ⅰ&#xff08;NAD&#xff09;的前体&#xff0c;也是…

WPF Binding对象

在WinForm中&#xff0c;我们要想对控件赋值&#xff0c;需要在后台代码中拿到控件对象进行操作&#xff0c;这种赋值形式&#xff0c;从根本上是无法实现界面与逻辑分离的。 在WPF中&#xff0c;微软引入了Binding对象&#xff0c;通过Binding&#xff0c;我们可以直接将控件与…

NTFS磁盘格式读写工具:Tuxera NTFS 2021 for Mac

Tuxera NTFS 是一款用于 macOS 系统的 NTFS 文件系统驱动程序。NTFS 是 Windows 系统中常用的文件系统&#xff0c;而 macOS 默认只支持读取 NTFS 格式的磁盘&#xff0c;不能进行写入操作。因此&#xff0c;如果你需要在 macOS 上进行 NTFS 磁盘的写入操作&#xff0c;就需要安…

php使用openssl返回false报错0308010C

本地php使用openssl返回false, 但是在服务器上测试正常openssl_encrypt($jsonStr, DES-ECB, $key, OPENSSL_RAW_DATA, ); 查看错误 openssl_error_string(); error:0308010C:digital envelope routines::unsupported 原因是: 服务器上的openssl是1.1版本, 本地是3.0版本 通…

真北5月小结|物事人心向上

1、跑步 今年的计划是每月跑15小时。五月实际跑了13小时17分。一到五月共跑了74小时43分&#xff0c;所以按平均每月15小时&#xff0c;还欠17分&#xff0c;六月补上。 另外两个跑步的标准是&#xff1a;保持跑步三天可见&#xff0c;最近龙舟雨&#xff0c;对这一条干扰很大&…

iframe内嵌网页自适应缩放 以展示源网页的比例尺寸

需求:这是我最近开发的低代码平台遇到的需求 ,要求将配置好的应用在弹框中预览(将预览网页内嵌入弹框中) 但是内嵌进入后 他会截取一部分(我源网站网页尺寸 是1980x1080 或者 3060X2160等等) 但是我这个dialog弹框只有我自定义的1000多px的宽高 他只会展示我iframe网页的一部分…

k8s之PV、PVC

文章目录 k8s之PV、PVC一、存储卷1、存储卷定义2、存储卷的作用2.1 数据持久化2.2 数据共享2.3 解耦2.4 灵活性 3、存储卷的分类3.1 emptyDir存储卷3.1.1 定义3.1.2 特点3.1.3 用途3.1.4 示例 3.2 hostPath存储卷3.2.1 定义3.2.2 特点3.2.3 用途3.2.4 示例 3.3 NFS存储卷3.3.1 …

BioVendor—sHLA-G ELISA试剂盒

人类白细胞抗原-G (HLA-G)与其他MHC类基因的不同之处在于它的低多态性和产生七种HLA-G蛋白的选择性剪接&#xff0c;这些蛋白的组织分布局限于正常的胎儿和成人组织&#xff0c;这些组织对先天和后天免疫细胞都具有耐受性。可溶性HLA-G是一种免疫抑制分子&#xff0c;诱导活化的…

PyQt5创建与MySQL数据库集成的应用程序

最近&#xff0c;对之前的mysql管理系统进行了更新升级&#xff0c;制作了一版关于车牌的管理系统&#xff01; &#xff08;1&#xff09;实现了对车牌和用户基本信息的增删改查的功能 &#xff01; &#xff08;2&#xff09;加入了对数据库的刷新和状态显示功能 &#xff…

Python3位运算符

前言 本文介绍的是位运算符&#xff0c;位运算可以理解成对二进制数字上的每一个位进行操作的运算&#xff0c;位运算分为 布尔位运算符 和 移位位运算符。 文章目录 前言一、位运算概览1、布尔位运算符1&#xff09;按位与运算符 ( & )2&#xff09;按位或运算符 ( | )3…

一款C#开源、简单、免费的屏幕录制和GIF动画制作神器

前言 今天要给大家推荐一款由C#语言开发且开源的操作简单、免费的屏幕录制和GIF动画制作神器&#xff1a;ScreenToGif 。 工具介绍 ScreenToGif 是一款免费的开源屏幕录制和GIF 制作工具。它可以帮助用户捕捉计算机屏幕上的实时动画&#xff0c;并将其保存为高质量的 GIF 图像…

【鸟哥】Linux笔记-硬件搭配

在Linux这个系统当中&#xff0c;几乎所有的硬件设备文件都在/dev这个目录内。打印机与软盘呢&#xff1f;分别是/dev/lp0, /dev/fd0。 几个常见的设备与其在Linux当中的文件名&#xff1a; 如果你的机器使用的是跟网际网络供应商 &#xff08;ISP&#xff09; 申请使用的云端…

Unity【入门】脚本基础

Unity脚本基础 文章目录 1、脚本基本规则1、创建规则2、MonoBehavior基类3、不继承MonoBehavior的类4、执行的先后顺序5、默认脚本内容 2、生命周期函数1、概念2、生命周期函数有哪些3、生命周期函数支持继承多态 3、Inspector窗口可编辑的变量4、Mono中的重要内容1、重要成员2…

thinkphp6 queue队列的maxTries自定义

前景需求&#xff1a;在我们用队列的时候发现maxtries的个数时255次&#xff0c;这个太影响其他队列任务 我目前使用的thinkphp版本是6.1 第一部定义一个新的类 CustomDataBase&#xff08;我用的mysql数据库存放的队列&#xff09; 重写__make 和createPlainPayload方法 …

第10周 企业认证、分布式事务,分布式锁方案落地

第10周 企业认证、分布式事务&#xff0c;分布式锁方案落地 ********************************************************************************************** 本周我们将对企业入驻认证的流程进行落地&#xff0c;并且结合分布式缓存中间件Redis与Redisson进行相关的技术方…

Easy RoCE:在SONiC交换机上一键启用无损以太网

RDMA&#xff08;远程直接内存访问&#xff09;技术是一种绕过 CPU 或操作系统&#xff0c;在计算机之间直接传输内存数据的技术。它释放了内存带宽和 CPU&#xff0c;使节点之间的通信具有更低的延迟和更高的吞吐量。目前&#xff0c;RDMA 技术已广泛应用于高性能计算、人工智…

web项目规范配置(husky、eslint、lint-staged、commit)

背景&#xff1a; 团队开发为了保证提交代码格式统一&#xff0c;通常在进行代码提交的时候对暂存区代码进行校验&#xff0c;如没有通过eslint(本例使用eslint)校验&#xff0c;则不能提交到远端。 安装依赖 husky 、eslint 、prettier 、lint-staged npm install husky e…

【Uniapp小程序】自定义导航栏uni-nav-bar滚动渐变色

效果图 新建activityScrollTop.js作为mixins export default {data() {return {navBgColor: "rgba(0,0,0,0)", // 初始背景颜色为完全透明navTextColor: "rgba(0,0,0,1)", // 初始文字颜色};},onPageScroll(e) {// 设置背景const newAlpha Math.min((e.s…

手机耳机哪个品牌音质好

在寻找音质出色的手机耳机时&#xff0c;品牌选择显得尤为重要。市场上众多知名品牌提供了各式各样的耳机产品&#xff0c;它们在音质、降噪功能、设计等方面各有千秋。以下是一些在音质上表现优异的手机耳机品牌的分析&#xff1a; 索尼&#xff1a;索尼的耳机以其卓越的降噪技…