Kafka 消费者全面解析:原理、消费者 API 与Offset 位移

Kafka:分布式消息系统的核心原理与安装部署-CSDN博客

自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客

Kafka 生产者全面解析:从基础原理到高级实践-CSDN博客

Kafka 生产者优化与数据处理经验-CSDN博客

Kafka 工作流程解析:从 Broker 工作原理、节点的服役、退役、副本的生成到数据存储与读写优化-CSDN博客

Kafka 消费者全面解析:原理、消费者 API 与Offset 位移-CSDN博客

Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍-CSDN博客

Kafka 数据倾斜:原因、影响与解决方案-CSDN博客

Kafka 核心要点解析_kafka mirrok-CSDN博客

Kafka 核心问题深度解析:全面理解分布式消息队列的关键要点_kafka队列日志-CSDN博客

目录

一、Kafka 消费方式

二、Kafka 消费者工作流程

(一)消费者总体工作流程

(二)消费者组原理

(一)消费者组初始化流程

(二)消费计划制定

(三)消费者组详细消费流程

(三)消费者重要参数

三、消费者 API

(一)独立消费者案例(订阅主题)

需求

实现步骤

测试

(二)独立消费者案例(订阅分区)

 需求

实现步骤

测试

(三)消费者组案例

  需求

实现步骤

四、Offset 位移(重要)

(一)消费 offset 案例

(二)自动提交 offset

(三)手动提交 offset

同步提交 offset

异步提交 offset

(四)指定 Offset 消费【重要】

(五)指定时间消费

(六)漏消费和重复消费

五、总结


        在大数据处理领域,Apache Kafka 作为一款高性能的分布式消息队列系统,其消费者组件起着至关重要的作用。它负责从 Kafka 集群中读取消息,并进行相应的业务处理。本文将深入探讨 Kafka 消费者的各个方面,包括消费方式、工作流程、API 用法、位移管理以及生产经验中的消费者事务等,旨在帮助读者全面理解和掌握 Kafka 消费者的相关知识与技术要点。

一、Kafka 消费方式

        Kafka 提供了两种主要的消费方式:拉取(pull)模式和推送(push)模式。在拉取模式下,消费者主动向 Kafka 集群拉取消息,这种方式给予消费者更多的控制权,可以根据自身的处理能力和需求来决定何时获取消息,避免了消费者被消息洪流淹没的情况。而推送模式则是由 Kafka 集群主动将消息推送给消费者,但这种方式可能会导致消费者在处理能力不足时出现消息积压等问题。在实际应用中,Kafka 主要采用拉取模式,消费者通过定期调用 poll 方法从集群中获取消息。

二、Kafka 消费者工作流程

(一)消费者总体工作流程

        一个消费者组中的多个消费者协同工作,它们共同消费 Kafka 主题中的分区数据。需要注意的是,组内的多个消费者不会消费同一个分区的数据,这样可以确保数据不会被重复消费。每个消费者负责消费特定分区的数据,从而实现了数据的并行处理,提高了消费效率。

(二)消费者组原理

        消费者组(Consumer Group,CG)由多个消费者组成,其形成的关键条件是所有消费者具有相同的 groupid

  • 消费者组内每个消费者负责消费不同分区的数据,并且一个分区只能由一个组内消费者消费。这种分配方式保证了数据在消费者组内的均衡分布和高效处理。
  • 不同的消费者组之间互不影响,它们可以独立地消费相同主题的分区数据,从而实现了不同业务逻辑对同一数据的并行处理。

(一)消费者组初始化流程

(二)消费计划制定

        生产者将数据发送到各个分区后,每个 broker 节点都有一个协调器(coordinator)。消费者组在进行分区消费时,首先会根据 groupId 对 50 取模,确定对应的分区节点。例如,如果结果是 1 分区,那么 1 分区的协调器将成为本次消费者组的主导协调器。消费者会向该协调器进行注册,协调器从中随机选择一个消费者作为本次消费的 Leader。然后,将本次消费的具体情况发送给 Leader,由 Leader 制定消费计划,即确定哪个消费者消费哪个分区。最后,Leader 将消费计划发送给协调器,协调器再将计划群发给各个消费者,消费者按照计划进行消费。

(三)消费者组详细消费流程

(三)消费者重要参数

三、消费者 API

(一)独立消费者案例(订阅主题)

需求

        创建一个独立消费者,消费 first 主题中的数据。在消费者 API 代码中必须配置消费者组 id,若在命令行启动消费者时不填写消费者组 id,则会被自动填写随机的消费者组 id。

实现步骤

        创建包名与代码编写:首先创建包名 com.bigdata.kafka.consumer,然后编写代码。在代码中,通过 Properties 对象设置 Kafka 连接信息、字段反序列化方式以及消费者组 id 等参数。创建 KafkaConsumer 对象后,使用 subscribe 方法订阅 first 主题。在无限循环中,通过 poll 方法每隔一秒钟从 Kafka 集群中拉取数据,并对拉取到的数据进行循环打印。

package com.bigdata.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 *  编写代码消费kafka中的数据
 */
public class Customer01 {

    public static void main(String[] args) {

        // 其实就是map
        Properties properties = new Properties();
        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // 创建一个kafka消费者的对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?
        List<String> topics = new ArrayList<>();
        topics.add("first");// list总可以设置多个主题的名称
        kafkaConsumer.subscribe(topics);

        // 因为消费者是不停的消费,所以是while true
        while(true){
            // 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 循环打印每一条数据
            for (ConsumerRecord record:records) {

                // 打印数据中的值
                System.out.println(record.value());
                // 打印一条数据
                System.out.println(record);
            }
        }


    }
}

测试

(1)在 IDEA 中执行消费者程序

(2)在 Kafka 集群控制台,创建 Kafka 生产者,并输入数据。

bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic first

(3)在IDEA控制台查看。

(二)独立消费者案例(订阅分区)

 需求

创建一个独立消费者,消费 first 主题 0 号分区的数据

实现步骤

       代码编写:同样先创建 Properties 对象设置相关参数,创建 KafkaConsumer 对象。不同的是,通过 TopicPartition 对象指定要消费的分区,使用 assign 方法将该分区分配给消费者。然后在循环中拉取数据并打印,此时打印信息中会显示当前消费的分区信息以及消息值等。

package com.bigdata.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 *  编写代码消费kafka中的数据  消费者只消费某个固定分区的数据
 */
public class Customer02 {

    public static void main(String[] args) {

        // 其实就是map
        Properties properties = new Properties();
        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        // 字段反序列化   key 和  value
        /**
         *  如何将自力传输到天安门看升国旗
         *   1、先将自己序列化   原子
         *   2、管道(网线)
         *   3、再进行反序列化 (自力的NDA)  活泼可爱的自力
         *   结论是:只要是一个对象,它想保存或者想传输,必须序列化
         *          传输过去之后,进行反序列化。
         *    比如:java  hadoop
         */
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "abc");

        // 创建一个kafka消费者的对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?

        List<TopicPartition> partitions = new ArrayList<>();
        partitions.add(new TopicPartition("first",0));
        // 指定某个分区进行消费
        kafkaConsumer.assign(partitions);

        // 因为消费者是不停的消费,所以是while true
        while(true){
            // 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 循环打印每一条数据
            for (ConsumerRecord record:records) {
                 // 打印数据中的值
                System.out.println("目前消费的分区是"+record.partition()+",value的值为:"+record.value());

                // 打印一条数据
                System.out.println(record);
            }
        }


    }
}

测试

(1)在 IDEA 中执行消费者程序。

(2)在 IDEA 中执行生产者程序 CustomProducerCallback()在控制台观察生成几个 0 号分区的数据。

(3)在 IDEA 控制台,观察接收到的数据,只能消费到 0 号分区数据表示正确。

(三)消费者组案例

  需求

        测试同一个主题的分区数据只能由一个消费者组中的消费者消费一个分区的数据,不能同时消费多个分区的数据。

实现步骤

(1)运行CustomConsumer ,通过idea,将这个类运行三次

或者使用如下设置:

2023版本:

老版本:

配置完成,点击后面的运行,就可以同一个类,运行三次了。运行三次就是三个消费者。

(2)启动代码中的生产者发送50条数据,在 IDEA 控制台即可看到两个消费者在消费不同 分区的数据(如果只发生到一个分区,可以在发送时增加延迟代码 Thread.sleep(2);)。

四、Offset 位移(重要)

位移概念

        Offset 是记录消费到哪里的一个值,它详细记录了哪个主题、哪个分区以及哪个位置的消息已经被消费。这对于保证数据的完整性和准确性以及在系统故障或重启后能够继续正确消费数据至关重要。

Offset 的默认维护位置

        从 Kafka 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,即 __consumer_offsets。在这个主题中,数据采用 key 和 value 的方式存储,其中 key 是 group.id + topic + 分区号,value 就是当前 offset 的值。Kafka 会每隔一段时间对这个 topic 进行 compact(压缩)操作,使得每个 group.id + topic + 分区号 只保留最新数据。而在 Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中。

(一)消费 offset 案例

思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。

(1)在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false

默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

如果不修改是无法查看offset的值的,因为这些都是加密数据。

修改完,记得同步给其他的节点

重新启动zk和kafka.

zk.sh start

kf.sh start

(2)采用命令行方式,创建一个新的 topic。

bin/kafka-topics.sh --bootstrap-server hadoop11:9092 --create --topic bigdata --partitions 2 --replication-factor 2

(3)启动生产者往 bigdata 生产数据。

bin/kafka-console-producer.sh --topic  bigdata --bootstrap-server hadoop11:9092

(4)启动消费者消费 bigdata 数据。

bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic five --group suibian

注意:指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)

假如出现消费不到数据的情况,将分区去掉或者组名称修改一下,起个别的名字

(5)查看消费者消费主题__consumer_offsets。

bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server bigdata01:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --property print.key=true

(二)自动提交 offset

        Kafka 提供了自动提交 offset 的功能,相关参数包括 enable.auto.commit(是否开启自动提交 offset 功能,默认是 true)和 auto.commit.interval.ms(自动提交 offset 的时间间隔,默认是 5s)。在代码中,通过设置 Properties 对象的相应参数来启用自动提交 offset,并设置提交时间间隔。消费者正常订阅主题并拉取数据,在拉取数据的循环中,无需手动处理 offset 提交,Kafka 会按照设定的时间间隔自动提交 offset。

package com.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerAutoOffset {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");
        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // 是否自动提交 offset  通过这个字段设置
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        // 提交 offset 的时间周期 1000ms,默认 5s
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);

        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 消费者订阅主题,主题有数据就会拉取数据
        // 指定消费的主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        // 一个消费者可以订阅多个主题
        kafkaConsumer.subscribe(topics);
        while(true){
            //1 秒中向kafka拉取一批数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String> record :records) {
                // 打印一条数据
                System.out.println(record);
                // 可以打印记录中的很多内容,比如 key  value  offset topic 等信息
                System.out.println(record.value());
            }

        }

    }
}

(三)手动提交 offset

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。

手动提交offset的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

• commitSync (同步提交):必须等待offset提交完毕,再去消费下一批数据。

• commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

 

同步提交 offset

        由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交 offset 的示例。

package com.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerByHandSync {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");
        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // 是否自动提交 offset  通过这个字段设置
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);


        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 消费者订阅主题,主题有数据就会拉取数据
        // 指定消费的主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        // 一个消费者可以订阅多个主题
        kafkaConsumer.subscribe(topics);
        while(true){
            //1 秒中向kafka拉取一批数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String> record :records) {
                // 打印一条数据
                System.out.println(record);
                // 可以打印记录中的很多内容,比如 key  value  offset topic 等信息
                System.out.println(record.value());
            }
            // 同步提交 offset
            kafkaConsumer.commitSync();

        }

    }
}

异步提交 offset

        虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

以下为异步提交 offset 的示例:

记得将自动提交给关了
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
            // 同步提交 offset
            //kafkaConsumer.commitSync();
            // 异步提交
            kafkaConsumer.commitAsync();

(四)指定 Offset 消费【重要】

        Kafka 提供了 auto.offset.reset 参数,其默认值为 latest,还可以设置为 earliest 或 none。当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如数据已被删除)

earliest 会自动将偏移量重置为最早的偏移量(等同于 --from-beginning

latest 则自动将偏移量重置为最新偏移量

none 如果未找到消费者组的先前偏移量,则向消费者抛出异常。

        此外,Kafka 还提供了 seek 方法,可以让我们从分区的固定位置开始消费。其入参为 seek(TopicPartition topicPartition, offset offset),通过 TopicPartition 对象指定主题和分区,结合 offset 就可以精确定位到某个主题、某个分区的某个 leader 副本的 active 日志文件的某个位置进行消费。在代码中,先订阅主题,然后通过 assignment 方法获取分区方案,当分区方案确定后,可以根据需求使用 seek 方法设置特定分区的 offset 进行消费。

package com.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;

public class CustomConsumerSeek {

    public static void main(String[] args) {

        Properties properties = new Properties();
        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092");
        // 字段反序列化   key 和  value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 2 订阅一个主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 执行计划
        // 此时的消费计划是空的,因为没有时间生成
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while(assignment.size() == 0){

            // 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环
            assignment = kafkaConsumer.assignment();
        }
        // 获取所有分区的offset =5 以后的数据
        /*for (TopicPartition tp:assignment) {
            kafkaConsumer.seek(tp,5);
        }*/
        // 获取分区0的offset =5 以后的数据
        //kafkaConsumer.seek(new TopicPartition("bigdata",0),5);
        for (TopicPartition tp:assignment) {
            if(tp.partition() == 0){
                kafkaConsumer.seek(tp,5);
            }
        }
        
        while(true){
            //1 秒中向kafka拉取一批数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String,String> record :records) {
                // 打印一条数据
                System.out.println(record);
                // 可以打印记录中的很多内容,比如 key  value  offset topic 等信息
                System.out.println(record.value());
            }

        }

    }
}

注意:每次执行完,需要修改消费者组名;

假如 kafka 崩了,重启之后,想继续消费,怎么做?

1、确定要消费的主题是哪几个

2、使用命令或者其他的组件查看 __consumer_offset 主题下的偏移量信息,找到我们关心的主题再崩溃之前消费到了哪里。

3、使用 java 代码,里面有一个非常重要的方法 seek,指定需要消费的主题,分区以及偏移量,就可以继续消费了。

(五)指定时间消费

        在生产环境中,有时会遇到需要按照特定时间消费数据的情况,例如要求按照时间消费前一天的数据。首先在代码中设置消费者的相关参数,如连接信息、反序列化方式、消费者组 id 等,并订阅特定主题。然后通过 assignment 方法获取分区方案,在分区方案确定后,根据当前时间和指定的时间间隔计算出每个分区对应的时间戳,并将其封装到 Map<TopicPartition, Long> 中。接着使用 offsetsForTimes 方法获取每个分区在指定时间戳对应的 offset,最后通过 seek 方法将消费者的消费位置设置为对应的 offset,从而实现按照时间消费数据的功能。

package com.bigdata.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 *  从某个特定的时间开始进行消费
 */
public class Customer05 {

    public static void main(String[] args) {

        // 其实就是map
        Properties properties = new Properties();
        // 连接kafka
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        // 字段反序列化   key 和  value

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());


        // 配置消费者组(组名任意起名) 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testf");

        // 指定分区的分配方案  为轮询策略
        //properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

        ArrayList<String> startegys = new ArrayList<>();
        startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

        // 创建一个kafka消费者的对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?
        List<String> topics = new ArrayList<>();
        topics.add("five");// list总可以设置多个主题的名称
        kafkaConsumer.subscribe(topics);

        // 因为消费者是不停的消费,所以是while true

        // 指定了获取分区数据的起始位置。
        // 这样写会报错的,因为前期消费需要指定计划,指定计划需要时间
        // 此时的消费计划是空的,因为没有时间生成
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while(assignment.size() == 0){

            // 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环
            assignment = kafkaConsumer.assignment();
        }

        Map<TopicPartition, Long> hashMap = new HashMap<>();
        for (TopicPartition partition:assignment) {
            hashMap.put(partition,System.currentTimeMillis()- 60*60*1000);
        }
        Map<TopicPartition, OffsetAndTimestamp> map = kafkaConsumer.offsetsForTimes(hashMap);

        for (TopicPartition partition:assignment) {

            OffsetAndTimestamp offsetAndTimestamp = map.get(partition);
            kafkaConsumer.seek(partition,offsetAndTimestamp.offset());
        }

        while(true){
            // 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 循环打印每一条数据
            for (ConsumerRecord record:records) {
                // 打印数据中的值
                System.out.println(record.value());
                System.out.println(record.offset());
                // 打印一条数据
                System.out.println(record);
            }

        }


    }
}

(六)漏消费和重复消费

重复消费:已经消费了数据,但是 offset 没提交。

漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

五、总结

        本文对 Kafka 消费者进行了全面而深入的剖析,从消费方式、工作流程、API 用法到位移管理以及生产经验中的消费者事务等方面都进行了详细阐述。通过理解和掌握这些知识,开发者能够更好地利用 Kafka 消费者进行高效、准确的数据处理,构建可靠的大数据处理系统。在实际应用中,需要根据业务需求和场景特点合理选择消费方式、设置消费者参数以及处理 offset 相关操作,同时要关注漏消费和重复消费等问题,并通过消费者事务等技术手段来保障数据的完整性和一致性。希望本文能够为读者在 Kafka 消费者的学习和实践中提供有益的参考和指导,助力读者在大数据领域的技术探索和项目

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/920598.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AUTOSAR - 接口

Application Port Interface&#xff0c;Service Port Interface&#xff0c;除了IS-SERVICE字段外&#xff0c;其余都相同。 ClientServer 支持IsService <CLIENT-SERVER-INTERFACE UUID"523b6eb5-6814-4b10-893e-de3aa9b68b90"><SHORT-NAME>app_cs_1&…

Android Gradle自定义任务在打包任务执行完成后执行cmd命令

背景 在每次打包之后需要做某事&#xff0c;例如每次打包后我都会安装某个目录下的一个apk。这个apk是通过一堆shell命令过滤得到一个apk的地址&#xff0c;然后把执行的几个shell命令何必成一个alias指令&#xff0c;在打包后只需要执行alias指令实现功能。当然也可以直接写在…

2023AE软件、Adobe After Effects安装步骤分享教程

2023AE软件是一款由Adobe公司开发的视频编辑软件&#xff0c;也被称为Adobe After Effects。它在广告、电影、电视和网络视频等领域广泛应用&#xff0c;用于制作动态图形、特效、合成和其他视觉效果。该软件支持多种视频和音频文件格式&#xff0c;具有丰富的插件和预设&#…

Prometheus结合K8s(二)使用

上一篇介绍了如何搭建 Prometheus结合K8s&#xff08;一&#xff09;搭建-CSDN博客&#xff0c;这章介绍使用 页面访问 kubectl get svc -n prom 看promeheus和granfana的端口访问页面 Prometheus 点击status—target&#xff0c;可以看到metrics的数据来源&#xff0c;即各…

单片机学习笔记 1. 点亮一个LED灯

把基础的东西都过一下&#xff0c;用来学习记录一下。 目录 1、Keil工程 2、Keil实现代码 3、烧录程序 0、实现的功能 点亮一个LED灯 1、Keil工程 打开Keil&#xff0c;Project----New uVision Project&#xff0c;工程文件命名----OK 选择单片机类型AT89C52&#xff0c;和…

Ubuntu安装sublime Tex

Ubuntu安装sublime Text步骤_ubuntu sublime-CSDN博客 Sublime Text 3 - Sublime Text sudo dpkg -i sublime-text_build_3211_amd64.deb

基于FPGA(现场可编程门阵列)的SD NAND图片显示系统是一个复杂的项目,它涉及硬件设计、FPGA编程、SD卡接口、NAND闪存控制以及图像显示等多个方面

文章目录 0、前言 1、目标 2、图片的预处理 3、SD NAND的预处理 4、FPGA实现 4.1、详细设计 4.2、仿真 4.3、实验结果 前言 在上一篇文章《基于FPGA的SD卡的数据读写实现&#xff08;SD NAND FLASH&#xff09;》中&#xff0c;我们了解到了SD NAND Flash的相关知识&am…

Ubuntu24.04LTS设置root用户可远程登录

Ubuntu24.04LTS设置root用户可远程登录 文章目录 Ubuntu24.04LTS设置root用户可远程登录1. 设置root密码2. 设置root用户可远程登录1. 查看ssh服务是否安装2. 安装ssh服务3. 再次查看ssh服务是否安装4. 配置ssh文件5. 重启ssh服务6. root远程登录 1. 设置root密码 Ubuntu安装后…

VM虚拟机装MAC后无法联网,如何解决?

✨在vm虚拟机上&#xff0c;给虚拟机MacOS设置网络适配器。选择NAT模式用于共享主机的IP地址 ✨在MacOS设置中设置网络 以太网 使用DHCP ✨回到本地电脑上&#xff0c;打开 服务&#xff0c;找到VMware DHCP和VMware NAT&#xff0c;把这两个服务打开&#xff0c;专一般问题就…

python中的base64使用小笑话

在使用base64的时候将本地的图片转换为base64 代码如下&#xff0c;代码绝对正确 import base64 def image_to_data_uri(image_path):with open(image_path, rb) as image_file:image_data base64.b64encode(image_file.read()).decode(utf-8)file_extension image_path.sp…

bert的模型训练和使用情绪识别

bert我的理解从它的名字可以看出来&#xff0c;它是一种双向的编码器解码器结构&#xff0c;也就是说它非常的高效&#xff0c;非常的高级&#xff0c;在它非常小的麻雀腹内居然五脏俱全。所以按道理来说&#xff0c;它应该是比gpt模型更加全面。所以我们有学习它的必要。 安装…

一分钟学习数据安全——IAM系统的数据访问控制模型

数据访问控制是信息系统中至关重要的安全功能&#xff0c;通过对用户访问权限的管理&#xff0c;保护系统中的敏感数据和资源&#xff0c;防止未经授权的访问、篡改和滥用。 数据访问控制发生在身份鉴别之后&#xff0c;解决主体如何安全访问数据的问题。 实施数据访问控制一…

STARTS:一种用于自动脑电/脑磁(E/MEG)源成像的自适应时空框架|文献速递-基于深度学习的病灶分割与数据超分辨率

Title 题目 STARTS: A Self-adapted Spatio-temporal Framework for Automatic E/MEG SourceImaging STARTS&#xff1a;一种用于自动脑电/脑磁(E/MEG)源成像的自适应时空框架 01 文献速递介绍 电生理源成像&#xff08;Electrophysiological Source Imaging&#xff0c;E…

AI Large Language Model

AI 的 Large Language model LLM , 大语言模型&#xff1a; 是AI的模型&#xff0c;专门设计用来处理自然语言相关任务。它们通过深度学习和庞大的训练数据集&#xff0c;在理解和生成自然语言文本方面表现出色。常见的 LLM 包括 OpenAI 的 GPT 系列、Google 的 PaLM 和 Meta…

Spyglass:更改默认编辑器

相关阅读 Spyglasshttps://blog.csdn.net/weixin_45791458/category_12828934.html?spm1001.2014.3001.5482 Spyglass默认使用的是Vim(Small Version)作为其文本编辑器&#xff0c;如果希望使用其他文本编辑器&#xff08;比如gedit、nano、VS Code、Sublime Text&#xff09…

网络安全之接入控制

身份鉴别 ​ 定义:验证主题真实身份与其所声称的身份是否符合的过程&#xff0c;主体可以是用户、进程、主机。同时也可实现防重放&#xff0c;防假冒。 ​ 分类:单向鉴别、双向鉴别、三向鉴别。 ​ 主题身份标识信息:密钥、用户名和口令、证书和私钥 Internet接入控制过程 …

【网站推荐】the top trending open-source startups, every quarter

每季度最热门的开源初创公司 我们根据 GitHub 存储库自 2020 年以来的明星增长情况发布热门开源项目&#xff0c;并将其称为 Runa 开源初创公司 (ROSS) 指数。 una Capital actively invests in open-source startups (like Nginx and MariaDB) and considers an active deve…

java学习记录11

异常 在java中提供了处理异常的机制&#xff0c;能够帮助我们避免程序崩溃。 Throwable可以用来表示任何可以作为异常抛出的类&#xff0c;分为两种&#xff1a; Error和Exception。其中Error用来表示JVM无法处理的错误。程序被强制终止。 Exception又分为两种&#xff1a; 受…

IDEA如何导入项目,包括从git仓库(github)导入项目

前言 大家好&#xff0c;我是小徐啊。自从使用了IDEA开发Java应用后&#xff0c;我再也不想使用eclipse了。IDEA的好处真的太多了。今天小徐就来介绍下IDEA的入门知识&#xff0c;也就是如何导入一个项目。 IDEA如何导入项目 首先&#xff0c;打开IDEA&#xff0c;点击上方的…

GitLab|数据迁移

注意&#xff1a;新服务器GitLab版本需和旧版本一致 在旧服务器执行命令进行数据备份 gitlab-rake gitlab:backup:create 备份数据存储在 /var/opt/gitlab/backups/ 将备份数据传输到新服务器的/var/opt/gitlab/backups/下&#xff0c;并修改文件权限&#xff08;下载前和上传…