使用Java代码操作Kafka(五):Kafka消费 offset API,包含指定 Offset 消费以及指定时间消费

文章目录

    • 1、指定 Offset 消费
    • 2、指定时间消费


1、指定 Offset 消费

auto.offset.reset = earliest | latest | none 默认是 latest
(1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

这个参数的力度太大了,不是从头,就是从尾
kafka提供了seek方法,可以让我们从分区的固定位置开始消费
seek(TopicPartition topicPartition,offset offset)

示例代码:

package com.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;

public class CustomConsumerSeek {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");
        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // 关闭自动提交offset
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 2 订阅一个主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 执行计划
        // 此时的消费计划是空的,因为没有时间生成
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while(assignment.size() == 0){

            // 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环
            assignment = kafkaConsumer.assignment();
        }
        // 获取所有分区的offset =5 以后的数据
        /*for (TopicPartition tp:assignment) {
            kafkaConsumer.seek(tp,5);
        }*/
        // 获取分区0的offset =5 以后的数据
        //kafkaConsumer.seek(new TopicPartition("bigdata",0),5);
        for (TopicPartition tp:assignment) {
            if(tp.partition() == 0){
                kafkaConsumer.seek(tp,5);
            }
        }
        
        while(true){
            //1 秒中向kafka拉取一批数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String> record :records) {
                // 打印一条数据
                System.out.println(record);
                // 可以打印记录中的很多内容,比如 key  value  offset topic 等信息
                System.out.println(record.value());
            }

        }

    }
}

2、指定时间消费

示例代码:

package com.bigdata.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 *  从某个特定的时间开始进行消费
 */
public class Customer05 {

    public static void main(String[] args) {

        // 其实就是map
        Properties properties = new Properties();
        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node01:9092");
        // 字段反序列化   key 和  value

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());


        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testf");

        // 指定分区的分配方案  为轮询策略
        //properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        
        // 指定分区的分配策略为:Sticky(粘性)
        ArrayList<String> startegys = new ArrayList<>();
        startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

        // 创建一个kafka消费者的对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?
        List<String> topics = new ArrayList<>();
        topics.add("five");// list总可以设置多个主题的名称
        kafkaConsumer.subscribe(topics);

        // 因为消费者是不停的消费,所以是while true

        // 指定了获取分区数据的起始位置。
        // 这样写会报错的,因为前期消费需要指定计划,指定计划需要时间
        // 此时的消费计划是空的,因为没有时间生成
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while(assignment.size() == 0){

            // 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环
            assignment = kafkaConsumer.assignment();
        }

        Map<TopicPartition, Long> hashMap = new HashMap<>();
        for (TopicPartition partition:assignment) {
            hashMap.put(partition,System.currentTimeMillis()- 60*60*1000);
        }
        Map<TopicPartition, OffsetAndTimestamp> map = kafkaConsumer.offsetsForTimes(hashMap);

        for (TopicPartition partition:assignment) {

            OffsetAndTimestamp offsetAndTimestamp = map.get(partition);
            kafkaConsumer.seek(partition,offsetAndTimestamp.offset());
        }

        while(true){
            // 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 循环打印每一条数据
            for (ConsumerRecord record:records) {
                // 打印数据中的值
                System.out.println(record.value());
                System.out.println(record.offset());
                // 打印一条数据
                System.out.println(record);
            }

        }


    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/924991.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

windows安装itop

本文介绍 win10 安装 itop 安装WAMP集成环境前 先安装visual c 安装itop前需要安装WAMP集成环境(windowsApacheMysqlPHP) 所需文件百度云盘 通过网盘分享的文件&#xff1a;itop.zip 链接: https://pan.baidu.com/s/1D5HrKdbyEaYBZ8_IebDQxQ 提取码: m9fh 步骤一&#xff1…

【Linux打怪升级记 | 报错02】-bash: 警告:setlocale: LC_TIME: 无法改变区域选项 (zh_CN.UTF-8)

&#x1f5fa;️博客地图 &#x1f4cd;1、报错发现 &#x1f4cd;2、原因分析 &#x1f4cd;3、解决办法 &#x1f4cd;4、测试结果 1、报错发现 装好了CentOS操作系统&#xff0c;使用ssh远程登陆CentOS&#xff0c;出现如下告警信息&#xff1a; bash: 警告:setlocale…

前端-Git

一.基本概念 Git版本控制系统时一个分布式系统&#xff0c;是用来保存工程源代码历史状态的命令行工具 简单来说Git的作用就是版本管理工具。 Git的应用场景&#xff1a;多人开发管理代码&#xff1b;异地开发&#xff0c;版本管理&#xff0c;版本回滚。 Git 的三个区域&a…

原生html+css+ajax+php图片压缩后替换原input=file上传

当前大部分照片尺寸大于5MB&#xff0c;而50MB限制的PHP通常上传4MB左右 于是就需要压缩后上传&#xff0c;上5代码使用后筛选的代码 <?php if ($_SERVER[REQUEST_METHOD] POST) { $uploadDir uploads/ . date(Ymd) . /; if (!is_dir($uploadDir)) { mkdir($uploadDir, …

k8s集群部署metrics-server

1、Metrics Server介绍 Metrics Server 是集群级别的资源利用率数据的聚合器。从 Kubelets收集资源指标&#xff0c;并通过 Metrics API 在 Kubernetes apiserver 中公开它们&#xff0c;以供 Horizontal Pod Autoscaler 和Vertical Pod Autoscaler 使用。 Metrics API 也可以…

yolov5的pt模型转化为rk3588的rknn,并在rk3588上调用api进行前向推理

当使用yolov5进行目标检测且进行边缘计算的场景时&#xff0c;要考虑性价比或者国产化的话&#xff0c;rk3588板子是个不错的选择。 本篇介绍yolov5的pytorch模型转化为rknn的流程&#xff0c;并展示在rk板子上如何调用相关api来使用转好的rknn模型进行前向推理。 pt转rknn流程…

家校通小程序实战教程03学生管理

目录 1 创建数据源2 搭建后台功能3 设置主列字段4 批量导入数据5 设置查询条件6 实现查询和重置总结 我们现在已经搭建了班级管理&#xff0c;并且录入了班级口令。之后就是加入班级的功能了。这里分为老师加入班级和学生家长加入班级。 如果是学生家长的话&#xff0c;在加入之…

题目 3209: 蓝桥杯2024年第十五届省赛真题-好数

一个整数如果按从低位到高位的顺序&#xff0c;奇数位&#xff08;个位、百位、万位 &#xff09;上的数字是奇数&#xff0c;偶数位&#xff08;十位、千位、十万位 &#xff09;上的数字是偶数&#xff0c;我们就称之为“好数”。给定一个正整数 N&#xff0c;请计算从…

ASP.NET Core Web API 控制器

文章目录 一、基类&#xff1a;ControllerBase二、API 控制器类属性三、使用 Get() 方法提供天气预报结果 在深入探讨如何编写自己的 PizzaController 类之前&#xff0c;让我们先看一下 WeatherController 示例中的代码&#xff0c;了解它的工作原理。 在本单元中&#xff0c;…

scrapy豆瓣爬虫增强-批量随机请求头

1.1 豆瓣爬虫增强,中间件随机请求头 1.2 清除原有的中间件,进行中间件测试 1.3 导入全新的中间件 1.4 运行爬虫,这个时候的请求头是固定的 1.5 强化对agent的输出,会舍弃输出cookie,使输出更明了 1.6 转移输出请求头位置 新增输出 造成这样问题的原因是Douban/Douban/settings…

三维地形图计算软件(三)-原基于PYQT5+pyqtgraph旧代码

最先入手设计三维地形图及平基挖填方计算软件时&#xff0c;地形图的显示方案是&#xff1a;三维视图基于pyqtgraph.opengl显示和二维视图基于pyqtgraph的PlotWidget来显示地形地貌&#xff0c;作到一半时就发现&#xff0c;地形点过多时&#xff0c;将会造成系统卡顿(加载时主…

累积局部效应 (ALE) 图分析记录

Git地址&#xff1a;https://github.com/blent-ai/ALEPython/tree/dev 查看源码需要pip install alepython安装&#xff0c;这边查看源码发现就实际就一个py文件而已&#xff0c;我懒得再去安装&#xff0c;故直接下载源码&#xff0c;调用方法也可&#xff1b; # -*- coding:…

浅谈网络 | 应用层之HTTP协议

目录 HTTP 请求的准备HTTP 请求的构建HTTP 请求的发送过程HTTP 返回的构建HTTP 2.0QUIC 协议HTTP 3.0 在讲完传输层之后&#xff0c;我们接下来进入应用层的内容。应用层的协议种类繁多&#xff0c;那从哪里开始讲起呢&#xff1f;不妨从我们最常用、最熟悉的 HTTP 协议 开始。…

qt5.14.2跟vs2022配置

1.qt6要在线安装&#xff0c;安装时间比较长&#xff0c;要求网络要稳定&#xff0c;不适合快速安装 2.使用qt5.14.2离线安装包&#xff0c;安装速度快&#xff0c;可以快速安装。 3.安装完qt.5.14.2后打开QtCreate4.0.1&#xff0c;打开 工具->选项->Kits,发现如下图: 没…

华为海思2025届校招笔试面试经验分享

目前如果秋招还没有offer的同学&#xff0c;可以赶紧投递下面这些公司&#xff0c;都在补招。争取大家年前就把后端offer拿下。如果大家在准备秋招补录取过程中有任何问题&#xff0c;都可以私信小编&#xff0c;免费提供帮助。如果还有部分准备备战春招的同学&#xff0c;也可…

超详细ensp配置VRRP和MSTP协议

一、简介 1、什么是VRRP&#xff1a; &#xff08;1&#xff09;VRRP&#xff08;Virtual Router Redundancy Protocol&#xff09;的概念&#xff1a; VRRP&#xff08;Virtual Router Redundancy Protocol&#xff09;指的是一种实现路由器冗余备份的协议&#xff0c;常用于…

linux基础2

声明&#xff01; 学习视频来自B站up主 泷羽sec 有兴趣的师傅可以关注一下&#xff0c;如涉及侵权马上删除文章&#xff0c;笔记只是方便各位师傅的学习和探讨&#xff0c;文章所提到的网站以及内容&#xff0c;只做学习交流&#xff0c;其他均与本人以及泷羽sec团队无关&#…

SAAS美容美发系统架构解析

随着技术的不断发展&#xff0c;SAAS&#xff08;Software as a Service&#xff0c;软件即服务&#xff09;模式在各个行业的应用逐渐深化&#xff0c;美容美发行业也不例外。传统的美容美发店面通常依赖纸质记录、手动操作和复杂的管理流程&#xff0c;而随着SAAS平台的出现&…

和数集团业务说明会(南京站)顺利举办

2024年11月24日&#xff0c;上海和数集团业务说明会&#xff08;南京站&#xff09;&#xff0c;在南京希尔顿酒店成功举办。 和数集团董事长兼总经理唐毅先生&#xff0c;以其敏锐的行业洞察力和丰富的实践经验&#xff0c;向与会者分享了和数集团在区块链领域的丰厚研究成果和…

微积分复习笔记 Calculus Volume 1 - 6.9 Calculus of the Hyperbolic Functions

6.9 Calculus of the Hyperbolic Functions - Calculus Volume 1 | OpenStax