【快速解决】kafka崩了,重启之后,想继续消费,怎么做?

目录

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

1、一个问题:

 2、查看消费者消费主题__consumer_offsets

3、一个重要前提:消费时要提交offset

二、指定 Offset 消费


假如遇到kafka崩了,你重启kafka之后,想要继续消费,应该怎么办?

  1. 首先确定要消费的主题是哪几个
  2. 其次使用命令或者其他的组件查看 __consumer_offset 主题下的偏移量信息,找到我们关心的主题在崩溃之前消费到了哪里。
  3. 最后使用 java 代码,里面有一个非常重要的方法 seek,指定需要消费的主题,分区以及偏移量,就可以继续消费了。

下面是解决这个问题的具体步骤!!! 

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

        因为__consumer_offset 主题下记录了主题的偏移量信息,所以提交offset之后,消费__consumer_offset 主题便可查看所有主题的偏移量信息

1、一个问题:

__consumer_offset 主题下的数据是不能查看的,怎么解决?

解决方案:

在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false。然后分发一下

分发:(这里我使用了脚本)

xsync.sh /opt/installs/kafka3/config/consumer.properties

注意:修改之前要先关闭kafka和zookeeper,修改完毕后再开启!

说明:默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。如果不修改是无法查看offset的值的,因为这些都是加密数据。

 2、查看消费者消费主题__consumer_offsets

kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server bigdata01:9092 --consumer.config /opt/installs/kafka3/config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" 

查询结果如下:

图中:1是消费者组名;2是Topic主题名;3是分区;4是偏移量,说明该主题崩溃前消费到了这里

此时便查询到了偏移量信息!!!

3、一个重要前提:消费时要提交offset

能查询到偏移量的前提是消费时要自动提交 offset(默认开启)或者手动提交 offset

一般不用管,因为自动提交会默认开启!!!

二、指定 Offset 消费

 kafka提供了seek方法,可以让我们从分区的固定位置开始消费。

seek (TopicPartition Partition,offset offset):指定分区和偏移量

package com.bigdata._03offsetTest;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class _03CustomConsumerSeek {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //连接kafka setProperty和put都行
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test5");

        // 是否自动提交 offset  通过这个字段设置
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?
        List<String> topics = new ArrayList<>();
        topics.add("bigdata");// list可以设置多个主题的名称
        kafkaConsumer.subscribe(topics);

        // 执行计划
        // 此时的消费计划是空的,因为没有时间生成
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while(assignment.size() == 0){

            // 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环
            assignment = kafkaConsumer.assignment();
        }
        
        // 获取分区0的offset =100 以后的数据
       kafkaConsumer.seek(new TopicPartition("bigdata",0),100);
        // 因为消费者是不停的消费,所以是while true
        while(true){
            // 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 循环打印每一条数据
            for (ConsumerRecord record:records) {
                // 打印一条数据
                System.out.println(record);
                // 打印数据中的值
                System.out.println(record.value());
            }
        }
    }
}

执行这个java代码就可以从精确的指定位置继续消费了!!!

结果如下:

从上图可以看出,确实是从指定的主题、分区、偏移量开始消费的! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/915062.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

matlab建模入门指导

本文以水池中鸡蛋温度随时间的变化为切入点&#xff0c;对其进行数学建模并进行MATLAB求解&#xff0c;以更为通俗地进行数学建模问题入门指导。 一、问题简述 一个煮熟的鸡蛋有98摄氏度&#xff0c;将它放在18摄氏度的水池中&#xff0c;五分钟后鸡蛋的温度为38摄氏度&#x…

51单片机应用开发(进阶)---定时器应用(电子时钟)

实现目标 1、巩固定时器的配置流程&#xff1b; 2、掌握按键、数码管与定时器配合使用&#xff1b; 3、功能1&#xff1a;&#xff08;1&#xff09;简单显示时间。显示格式&#xff1a;88-88-88&#xff08;时-分-秒&#xff09; 4、功能2&#xff1a;&#xff08;1&#…

【外包】软件行业的原始形态,项目外包与独立开发者

【外包】互联网软件行业的原始形态&#xff0c;项目外包与独立开发者 本科期间写的一些东西&#xff0c;最近整理东西看到了&#xff0c;大致整理一下放出来&#xff0c;部分内容来自其他文章&#xff0c;均已引用。 文章目录 1、互联网软件行业的原始形态2、项目订单&#xff…

Python酷库之旅-第三方库Pandas(208)

目录 一、用法精讲 971、pandas.MultiIndex.set_levels方法 971-1、语法 971-2、参数 971-3、功能 971-4、返回值 971-5、说明 971-6、用法 971-6-1、数据准备 971-6-2、代码示例 971-6-3、结果输出 972、pandas.MultiIndex.from_arrays类方法 972-1、语法 972-2…

基于ConvNeXt的矿石种类识别

项目源码获取方式见文章末尾&#xff01; 600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【基于CNN-RNN的影像报告生成】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模型实现…

C++入门基础知识140—【关于C++ 类构造函数 析构函数】

成长路上不孤单&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a; 【14后&#x1f60a;///C爱好者&#x1f60a;///持续分享所学&#x1f60a;///如有需要欢迎收藏转发///&#x1f60a;】 今日分享关于C 类构造函数 & 析构函数的相关内容…

【Linux】-学习笔记03

第十一章-管理Linux软件包和进程 1.源码下载安装软件 1.1概念 源码文件&#xff1a;程序编写者使用C或C等语言编写的原始代码文本文件 源码文件使用.tar.gz或.tar.bz2打包成压缩文件 1.2特点 源码包可移植性好&#xff0c;与待安装软件的工作环境依赖性不大 由于有编译过程…

鸿蒙HarmonyOS(ArkUI基础篇大合集!)

文章目录 ArkUI&#xff08;方舟UI框架&#xff09;1.简介2.基本概念3.概述4.布局1.概述2.通用布局属性&#x1f388;1.盒子属性2.背景属性3.定位属性4.通用属性&#x1f388; 3.线性布局4.弹性布局(Flex)5.层叠布局(Stack) 5.组件1.使用文本1.文本显示(Text/Span)2.文本输入 (…

Prompt 工程

Prompt 工程 1. Prompt 工程简介 “预训练-提示预测”范式是近年来自然语言处理&#xff08;NLP&#xff09;领域的一个重要趋势&#xff0c;它与传统的“预训练-微调-预测”范式相比&#xff0c;提供了一种更为灵活和高效的模型应用方式。 Prompt工程是指在预训练的大型语言…

十天入门javaScript第四天(Promises对象异步 )(睡眠函数) (json)

Promise 是一个 JavaScript 的内置对象&#xff0c;它代表了一个异步操作的最终完成&#xff08;或失败&#xff09;及其结果值。Promise 对象是异步编程的一种解决方案&#xff0c;它可以使异步操作以更简洁、更易于管理的方式进行。 Promise 对象有三个状态&#xff1a; Pen…

【C#设计模式(8)——过滤器模式(Adapter Pattern)】

前言 滤液器模式可以很方便地实现对一个列表中的元素进行过滤的功能&#xff0c;能方便地修改滤器的现实&#xff0c;符合开闭原则。 代码 //过滤接口public interface IFilter{List<RefuseSorting> Filter(List<RefuseSorting> refuseList);}//垃圾分类public cla…

开源共建 | 长安链开发常见问题及规避

长安链开源社区鼓励社区成员参与社区共建&#xff0c;参与形式包括不限于代码贡献、文章撰写、社区答疑等。腾讯云区块链王燕飞在参与长安链测试工作过程中&#xff0c;深入细致地总结了长安链实际开发应用中的常见问题及其有效的规避方法&#xff0c;相关内容多次解答社区成员…

华为云创建ECS前台展示规格类型选项是怎么做到的?

前台展示很多规格可选&#xff0c;怎么做到的&#xff1f;先了解规格其实都是管理员在后台service_OM创建好规格 1.规格 1.1设置自定义标签打通规格和主机组还能体验调度功能 引申&#xff1a;AZ可用分区&#xff08;为了做容灾&#xff09; 为什么在界面可以让我√az0.dc0,…

Linux网络——自定义协议与序列化

一、协议 协议是一种 " 约定 ". socket api 的接口 , 在读写数据时 , 都是按 " 字符串 " 的方式来发送接收的。如 果我们要传输一些 " 结构化的数据 "&#xff0c;依然可以通过协议。 其实&#xff0c;协议就是双方约定好的结构化的数据。…

《TCP/IP网络编程》学习笔记 | Chapter 6:基于UDP的服务器端/客户端

《TCP/IP网络编程》学习笔记 | Chapter 6&#xff1a;基于UDP的服务器端/客户端 《TCP/IP网络编程》学习笔记 | Chapter 6&#xff1a;基于UDP的服务器端/客户端理解UDPUDP套接字的特点UDP内部工作原理UDP的高效使用 实现基于UDP的服务器端/客户端UDP中的服务器端和客户端没有连…

Linux也有百度云喔~

一、写在前面 经常有粉丝向我抱怨&#xff0c;为什么每次发放资料都用百度云&#xff0c;自己下载了一遍之后还得再上传一遍服务器才能分析。其实大家大可不必这么周转&#xff0c;百度云也有Linux的发行版本&#xff0c;利用python包bypy来管理/传输百度云盘资源也很方便(别问…

从0开始机器学习--Day23--支持向量机

经过前面的学习&#xff0c;我们已经知道在解决问题时&#xff0c;重要的不仅仅是要在算法A或算法B中选择更优的&#xff0c;而是考虑怎么选择用于学习算法的特征和正则化参数&#xff0c;相比神经网络和逻辑回归&#xff0c;支持向量机在这两个方面做得更好。 优化目标(Optimi…

JavaScript 中实例化生成对象的相关探讨

JavaScript 中实例化生成对象的相关探讨 在 JavaScript 世界中&#xff0c;对象的实例化是一个关键且基础的概念。当我们使用构造函数创建对象时&#xff0c;会引发一系列关于对象之间联系、原型链以及相关概念的思考。 让我们通过一段代码来深入探讨这些问题&#xff1a; f…

MatSci-LLM ——潜力和挑战以及大规模语言模型在材料科学中的应用

概述 大规模语言模型的出现正在从根本上改变技术开发和研究的方式。大规模语言模型不仅对自然语言处理领域产生了重大影响&#xff0c;而且对许多相关领域也产生了重大影响&#xff0c;例如从文本生成图像的计算机视觉&#xff08;Zhang 等人&#xff0c;2023 年&#xff09;。…

【C++】C++11特性(上)

✨✨欢迎大家来到Celia的博客✨✨ &#x1f389;&#x1f389;创作不易&#xff0c;请点赞关注&#xff0c;多多支持哦&#x1f389;&#x1f389; 所属专栏&#xff1a;C 个人主页&#xff1a;Celias blog~ 目录 一、列表初始化 二、std::initializer_list 三、右值引用和移…