Kafka相关API开发

(一)引入依赖

        用API直接去操作kafka(读写数据)在实际开发中用的并不多,学习它主要还是为了加深对Kafka功能的理解。kafka的读写操作,实际开发中,是通过各类更上层的组件去实现。而这些组件在读写kafka数据时,用的当然是kafka的java api,比如flink、spark streaming和flume等。

<properties> 
   <kafka.version>2.4.1</kafka.version>
</properties>
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>${kafka.version}</version>       
</dependency>

(二)API 开发——producer 生产者

1.构造一个生产者,可以持续发送大量数据

2.构造一个生产者,有必须设置的参数:

bootstrap.server

key.seralizer

value.seralizer

其他的,可选

3.使用特定接口

kafka的生产者发送用户的业务数据时,必须使用org.apache.kafka.common.serialization.Serializer接口的实现类这一序列化框架来序列化用户的数据。

4.发往指定topic

构造一个Kafka生产者后,并没有固定数据要发往的topic,因此,可以将不同的数据发往不同的topic

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * kafka生产者API代码示例
 */
public class ProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        // 泛型K:要发送的数据中的key
        // 泛型V:要发送的数据中的value
        // 隐含之意:kafka中的message,是Key-Value结果的(可以没有Key)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node141:9092,node142:9092");

        // 因为,kafka底层存储没有类型维护机制,用户所发的所有数据类型,都必须变成 序列化后的byte[]
        // 所以,kafka的producer需要一个针对用户要发送的数据类型的序列化工具类
        // 且这个序列化工具类,需要实现kafka所提供的序列化工具接口:org.apache.kafka.common.serialization.Serializer
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /**
         * 代码中进行客户端参数配置的另一种写法
         */
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node141:9092,node142:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");// 消息发送应答级别

        // 构造一个生产者客户端
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            // 将业务数据封装成客户端所能发送的封装格式
            // 0->abc0
            // 1->abc1

            // TODO:奇数发往abcx,偶数发往abcy
            ProducerRecord<String, String> message = null;
           if (i % 2 == 0) {
                message = new ProducerRecord<>("abcy", "user_id" + i, "doit_edu" + i);
            } else {
                message = new ProducerRecord<>("abcx", "user_id" + i, "doit_edu" + i);
            }
            // 消费时只会打印value的值,key并没有读到
            // 调用客户端去发送
            // 数据的发送动作在producer的底层是异步线程去异步发送的,即调用send方法立即执行完毕,直接走之后的代码,不代表数据发送成功
            producer.send(message);
            Thread.sleep(100);
        }

        // 关闭客户端
//        producer.flush();
        producer.close();
    }
}

5.消费消息

(三)API开发——consumer消费者

kafka消费者的起始消费位置有两种决定机制:

1.手动指定了起始位置,它肯定从指定的位置开始

2.如果没有手动指定起始位置,它去找消费者组之前所记录的偏移量开始

3.如果之前的位置也获取不到,就看参数:auto.offset.reset所指定的重置策略

4.如果指定的offset>原有分区内的最大offset,就自动重置到最大的offset

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * kafka消费者API代码示例
 */
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node141:9092,node142:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // kafka的消费者,默认是从所属组之前所记录的偏移量开始消费,如果找不到之前记录的偏移量,则从如下参数配置的策略来确定消费起始偏移量:
        // 1.earliest:自动重置到每个分区的最前一条消息
        // 2.latest:自动重置到每个分区的最新一条消息
        // 3.none:如果没有为使用者的组找到以前的偏移,则向使用者抛出异常
        // 如果输入除了上述三种之外的,会向使用者抛出异常
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// 如果latest消息找不到,consumer.seek就起作用了

        // 设置消费者所属的组id
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "d30-1");

        // 设置消费者自动提交最新的的消费位移——默认是开启的
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        // 设置自动提交位移的时间间隔——默认是5000ms
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

        // 构造一个消费者客户端
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅主题(可以是多个)
//        consumer.subscribe(Collections.singletonList("abcx"));
        consumer.subscribe(Arrays.asList("abcx","abcy"));
        // 正则订阅主题
//        consumer.subscribe(Pattern.compile ("abc.*" ));

        // 显式指定消费起始偏移量
        /*TopicPartition abcxP0 = new TopicPartition("abcx", 0);
        TopicPartition abcxP1 = new TopicPartition("abcx", 1);
        consumer.seek(abcxP0,10);
        consumer.seek(abcxP1,15);*/

        // 循环往复拉取数据
        boolean condition = true;
        while (condition) {
            // 客户端去拉取数据的时候,如果服务端没有数据响应,会保持连接等待服务端响应
            // poll中传入的超时时长参数,是指等待的最大时长
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
            // Iterator:迭代器
            // Iterable:可迭代的,是迭代器的再封装
            // 实现了Iterable的对象,可以用增强for循环去遍历迭代,也可以从对象上取到iterator,来用iterator.hasNext来迭代
            // Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            // 直接用for循环来迭代本次取到的一批数据
            for (ConsumerRecord<String, String> record : records) {
                // ConsumerRecord中,不光有用户的业务数据,还有Kafka注入的元数据
                String key = record.key();
                String value = record.value();

                // 本条消息所属的topic:拉取的时候可能不止一个topic,所以会有这个方法
                String topic = record.topic();
                // 本条数据所属的分区
                int partition = record.partition();

                // 本条数据的偏移量
                long offset = record.offset();

                //key的长度
                int keySize = record.serializedKeySize();
                //value的长度
                int valueSize = record.serializedValueSize();

                // 当前这条数据所在分区的leader的朝代纪年
                Optional<Integer> leaderEpoch = record.leaderEpoch();

                // kafka的数据底层存储中,不光有用户的业务数据,还有大量元数据
                // timestamp就是其中之一:记录本条数据的时间戳
                // 时间戳有两种类型:一个是CreateTime(这条数据创建的时间——生产者), LogAppendTime(broker往log里面追加的时间)
                TimestampType timestampType = record.timestampType();
                long timestamp = record.timestamp();

                // 数据头:是生产者在写入数据时附加进去的,相当于用户自定义的元数据
                // 在生产者写入消息时,可以自定义元数据,所以record.headers()方法就能够消费到
                // public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
                // 如果生产者写入消息时,没有定义元数据,record.headers()方法就不会消费到
                Headers headers = record.headers();

                //            for (Header header : headers) {
//                String hKey = header.key();
//                byte[] hValue = header.value();
//                String valueString = new String(hValue);
//                System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString);
//            }
                System.out.println(String.format(
                        "key = %s, value = %s,topic = %s , partition = %s, offset = %s," +
                                "leader的纪元 = %s, timestampType = %s ,timestamp = %s," +
                                " key序列化的长度 = %s, value 序列化的长度 = %s",
                        key, value, topic, partition, offset,
                        leaderEpoch.get(), timestampType.name, timestamp,
                        keySize, valueSize));
            }
        }

        // 对数据进行业务逻辑处理

        // 关闭客户端
        // consumer.close();
    }
}

有了上面两个API,先开启消费者,然后开启生产者,消费者控制就会输出消息。

 // 当前这条数据所在分区的leader的朝代纪年
Optional<Integer> leaderEpoch = record.leaderEpoch();

当leader有变化,leaderEpoch.get()的值就会+1,初始值为0

(四)API开发——指定偏移量订阅消息

1.subscribe与assign订阅

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

/**
 * 指定偏移量
 */
public class ConsumerDemo2 {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // 从配置文件中加载
        props.load(ConsumerDemo2.class.getClassLoader().getResourceAsStream("consumer.properties"));
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "doit30-5");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    /*  // subscribe订阅,会参与消费者组的再均衡机制才能真正获得自己要消费的topic及其分区的
        consumer.subscribe(Collections.singletonList("ddd"));
        // 这里无意义地去拉一次数据,主要就是为了确保:分区分配动作已完成
        consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        // 然后再定义到指定的偏移量,开始正式消费
        consumer.seek(new TopicPartition("ddd",0),2);*/

        // 既然要自己指定一个确定的起始消费位置,那通常隐含之意是不需要去参与消费者组自动再均衡机制,该方法比较常用
        // 那么,就不要使用subscribe来订阅主题
        consumer.assign(Arrays.asList(new TopicPartition("ddd", 0)));
        consumer.seek(new TopicPartition("ddd", 0), 4);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                int keySize = record.serializedKeySize();
                int valueSize = record.serializedValueSize();

                System.out.println(String.format(
                        "key = %s, value = %s,topic = %s , partition = %s, offset = %s," +
                                "leader的纪元 = %s, timestampType = %s ,timestamp = %s," +
                                " key序列化的长度 = %s, value 序列化的长度 = %s",
                        record.key(), record.value(), record.topic(), record.partition(), record.offset(),
                        record.leaderEpoch().get(), record.timestampType().name, record.timestamp(),
                        keySize, valueSize));
            }
        }
    }
}

2.subscribe与assign订阅具体区别

  • 通过subscribe()方法订阅主题具有消费者自动再均衡功能:

        在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。

  • assign()方法订阅分区时,是不具备消费者自动均衡的功能的:

        其实这一点从assign()方法参数可以看出端倪,两种类型 subscribe()都有 ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。

3.取消订阅

        如果将subscribe(Collection)或 assign(Collection)集合参数设置为空集合,作用与unsubscribe()方法相同,如下示例中三行代码的效果相同:

consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());

组协调器就是x组写消费位移的leader副本所在的broker。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/904738.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

警务辅助人员管理系统小程序ssm+论文源码调试讲解

2系统关键技术 2.1 微信小程序 微信小程序&#xff0c;简称小程序&#xff0c;英文名Mini Program&#xff0c;是一种全新的连接用户与服务的方式&#xff0c;可以快速访问、快速传播&#xff0c;并具有良好的使用体验。 小程序的主要开发语言是JavaScript&#xff0c;它与普…

微服务设计模式 - 断路器模式 (Circuit Breaker Pattern)

微服务设计模式 - 断路器模式 (Circuit Breaker Pattern) 定义 断路器模式&#xff08;Circuit Breaker Pattern&#xff09;是云计算和微服务架构中的一种保护性设计模式&#xff0c;其目的是避免系统中的调用链出现故障时&#xff0c;导致系统瘫痪。通过断路器模式&#xff…

Yelp 数据集进行用户画像, 使用聚类做推荐

使用 Yelp 数据集进行用户画像&#xff08;User Profiling&#xff09;是一项有趣的任务&#xff0c;可以理解用户的偏好、行为和特征。以下是总结的一个基本的步骤&#xff0c;帮助构建用户画像 pandas 加载数据&#xff1a; import pandas as pd# 加载数据 users pd.read_…

DDRPHY数字IC后端设计实现系列专题之后端设计导入,IO Ring设计

本章详细分析和论述了 LPDDR3 物理层接口模块的布图和布局规划的设计和实 现过程&#xff0c;包括设计环境的建立&#xff0c;布图规划包括模块尺寸的确定&#xff0c;IO 单元、宏单元以及 特殊单元的摆放。由于布图规划中的电源规划环节较为重要&#xff0c; 影响芯片的布线资…

前端路由如何从0开始配置?vue-router 的使用

在 Web 开发中&#xff0c;路由是指根据 URL 的不同部分将请求分发到不同的处理函数或页面的过程。路由是单页应用&#xff08;SPA, Single Page Application&#xff09;和服务器端渲染&#xff08;SSR, Server-Side Rendering&#xff09;应用中的一个重要概念。 在开发中如何…

强化学习的数学原理-06随即近似理论和随机梯度下降

文章目录 Robbins-Monro algorithmStochastic gradient descentBGD、MBGD、 and SGDSummary Robbins-Monro algorithm 迭代式求平均数的算法 S t o c h a s t i c a p p r o x i m a t i o n ( S A ) Stochastic \; approximation \;(SA) Stochasticapproximation(SA)&#xf…

Apache Hive 通过Docker快速入门

QuickStarted 介绍 在伪分布式模式下在 docker 容器内运行 Apache Hive&#xff0c;以便为 Hive 提供以下快速启动/调试/准备测试环境 快速入门 步骤 1&#xff1a;拉取镜像 从 DockerHub 拉取镜像&#xff1a;https://hub.docker.com/r/apache/hive/tags。以下是最新的镜像…

【K8S系列】Kubernetes 中 NodePort 类型的 Service 无法访问的问题【已解决】

在 Kubernetes 中&#xff0c;NodePort 类型的 Service 允许用户通过每个节点的 IP 地址和指定的端口访问应用程序。如果 NodePort 类型的 Service 无法通过节点的 IP 地址和指定端口进行访问&#xff0c;可能会导致用户无法访问应用。本文将详细分析该问题的常见原因及其解决方…

逻辑卷动态扩容与缩容-----

一、创建逻辑卷 需求&#xff1a;创建一个2.5G大小的逻辑卷 思路&#xff1a; 1. 物理的设备 2. 将物理设备做成物理卷 pv 3. 创建卷组并将物理卷加入其中 vg 4. 创建逻辑卷 lv 5. 格式化逻辑卷 mkfs.ext4 6. 挂载使用 mount 步骤&#xff1a; 1. 物理设备【如何来分区】…

开关灯问题(c语言)

样例&#xff1a;10 10 &#xff0c;输出&#xff1a;1&#xff0c;4&#xff0c;9 5 5 &#xff0c;输出&#xff1a;1&#xff0c;4 代码如下 #include<stdio.h> //引入bool值的概念 #include<stdbool.h> int main() {int n 0;//n为灯的数量int m 0;…

扫雷游戏(C语言详解)

扫雷游戏&#xff08;C语言详解&#xff09; 放在最前面的1、前言&#xff08;扫雷游戏的简介&#xff09;2、扫雷游戏的规则&#xff08;简易版&#xff09;3、代码实现&#xff08;3.1&#xff09;提醒一下&#xff1a;( i ) 提醒1&#xff1a;( ii ) 提醒2&#xff1a; &…

在面试了些外包以后,我有了些自己的思考

大家好&#xff0c;我是洋子&#xff0c;最近公司在降本增效&#xff0c;需要把外包从北京迁移到陕西的某新一线城市&#xff0c;其实就是变相裁员&#xff0c;减少外包的成本&#xff0c;裁掉现有的员工&#xff0c;重新招聘新人 在整个测试行业&#xff0c;外包测试的比重是…

论文 | Ignore Previous Prompt: Attack Techniques For Language Models

这篇论文探讨了针对大型语言模型&#xff08;LLM&#xff09;的“提示注入”攻击&#xff0c;并提出了一种名为 PROMPTINJECT 的框架来研究这类攻击。 论文的主要内容包括&#xff1a;1. 提示注入攻击&#xff1a; 论文定义了“提示注入”的概念&#xff0c;即通过在用…

Django-中间件

定义&#xff1a; 编写中间件&#xff1a; 注册中间件&#xff1a; 添加中间件&#xff1a; 1.在项目目录下添加一个文件夹&#xff08;名字随意&#xff09;&#xff0c;然后文件夹下创建.py文件 2.将中间件添加到setting文件中 MIDDLEWARE [django.middleware.security.Se…

MBR20100CT-ASEMI半塑封肖特基二极管MBR20100CT

编辑&#xff1a;ll MBR20100CT-ASEMI半塑封肖特基二极管MBR20100CT 型号&#xff1a;MBR20100CT 品牌&#xff1a;ASEMI 封装&#xff1a;TO-220 安装方式&#xff1a;插件 批号&#xff1a;最新 最大平均正向电流&#xff08;IF&#xff09;&#xff1a;20A 最大循环…

操作数据表

创建表 创建表语法&#xff1a; CREATE TABLE table_name ( field1 datatype [COMMENT 注释内容], field2 datatype [COMMENT 注释内容], field3 datatype ); 注意&#xff1a; 1. 蓝色字体为关键字 2. CREATE TABLE 是创建数据表的固定关键字&#xff0c;表…

一、ARMv8寄存器之通用、状态、特殊寄存器

ARMV8核心寄存器数量是非常大的&#xff0c;为了更好的学习&#xff0c;可以划分为以下几大类&#xff1a; 通用寄存器。这类寄存器主要是用来暂存数据和参与运算。通过load\store指令操作。状态寄存器。AArch64体系结构使用PSTATE寄存器表示当前处理器状态。特殊寄存器。有专门…

WPF+MVVM案例实战(六)- 自定义分页控件实现

文章目录 1、项目准备2、功能实现1、分页控件 DataPager 实现2、分页控件数据模型与查询行为3、数据界面实现 3、运行效果4、源代码获取 1、项目准备 打开项目 Wpf_Examples&#xff0c;新建 PageBarWindow.xaml 界面、PageBarViewModel.cs ,在用户控件库 UserControlLib中创建…

【Docker】构建Linux云桌面环境

目录 一、说明 二、离线安装Docker 1&#xff09;将下载的包上传到服务器上去 2&#xff09;安装docker 3) 启动docker 4&#xff09;配置加速器 三、安装云桌面镜像 四、启动云桌面 方式一&#xff1a;docker命令直接运行 方式二&#xff1a;docker-compose方式 五…

Easysearch 与 LLM 融合打造知识库系统

文章目录 一、LangChain 简介二、RAG 产生的背景及其局限性三、RAG 工作流程四、 Easysearch 结合 LLM 实现 RAG&#xff08;1&#xff09;Easysearch 简介&#xff08;2&#xff09;结合实现RAG 五、 Easysearch 结合 LLM 实现 RAG 的优势&#xff08;1&#xff09;提高检索准…