分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式

文章目录

    • 1. 自动提交消费位移
    • 2. 自动提交消费位移存在的问题?
    • 3. 手动提交消费位移
      • 1. 同步提交消费位移
      • 2. 异步提交消费位移
      • 3. 同步和异步组合提交消费位移
      • 4. 提交特定的消费位移
      • 5. 按分区提交消费位移
    • 4. 消费者查找不到消费位移时怎么办?
    • 5. 如何从特定分区位移处读取消息?
    • 6. 如何优雅地退出轮询循环消费?

1. 自动提交消费位移

最简单的提交方式是让消费者自动提交偏移量,自动提交 offset 的相关参数:

  • enable.auto.commit:是否开启自动提交 offset 功能,默认为 true;
  • auto.commit.interval.ms:自动提交 offset 的时间间隔,默认为5秒;

如果 enable.auto.commit 被设置为true,那么每过5秒,消费者就会自动提交 poll() 返回的最大偏移量,即将拉取到的每个分区中最大的消息位移进行提交。提交时间间隔通过 auto.commit.interval.ms 来设定,默认是5秒。与消费者中的其他处理过程一样,自动提交也是在轮询循环中进行的。消费者会在每次轮询时检查是否该提交偏移量了,如果是,就会提交最后一次轮询返回的偏移量。

① 启动消费者消费程序,并设置为自动提交消费者位移的方式:

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-ni");

        // 显式配置消费者自动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        // 显式配置消费者自动提交位移的事件间隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,4);

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 订阅主题
        consumer.subscribe(Arrays.asList("ni"));

        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.printf("主题 = %s, 分区 = %d, 位移 = %d, " + "消息键 = %s, 消息值 = %s\n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

② 启动生产者程序发送3条消息,消息的内容都为 hello,kafka

③ 查看消费者消费的消息记录:

主题 = ni, 分区 = 0, 位移 = 0, 消息键 = null, 消息值 = hello,kafka
主题 = ni, 分区 = 0, 位移 = 1, 消息键 = null, 消息值 = hello,kafka
主题 = ni, 分区 = 0, 位移 = 2, 消息键 = null, 消息值 = hello,kafka

可以看到,消费者消费分区的最新消息的位移为 offset= 2,即消费者的消息位移为 offset =2;

④ 查看消费者提交的位移:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

[group-ni,ni,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional[0], metadata=, commitTimestamp=1692168114999, expireTimestamp=None)

可以看到,消费者的消息位移为 offset =2,但是消费者的提交位移为 offset =3;

2. 自动提交消费位移存在的问题?

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用,再均衡完成之后,接管分区的消费者将从最后一次提交的偏移量的位置开始读取消息)。可以通过修改提交时间间隔来更频繁地提交偏移量,缩小可能导致重复消息的时间窗口,但无法完全避免。

在使用自动提交时,到了该提交偏移量的时候,轮询方法将提交上一次轮询返回的偏移量,但它并不知道具体哪些消息已经被处理过了。所以,在再次调用poll()之前,要确保上一次poll()返回的所有消息都已经处理完毕(调用close()方法也会自动提交偏移量)。通常情况下这不会有什么问题,但在处理异常或提前退出轮询循环时需要特别小心。

虽然自动提交很方便,但是没有为避免开发者重复处理消息留有余地。

3. 手动提交消费位移

在Kafka中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。

开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false,让应用程序自己决定何时提交偏移量。手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync() 和 commitAsync() 两种类型的方法。

① 同步提交位移是指消费者在提交位移时会阻塞,直到提交完成并收到确认。它会提交 poll() 返回的最新偏移量,提交成功后马上返回,如果由于某些原因提交失败就抛出异常。 commitAsync() 方法有四个不同的重载方法,具体定义如下:

public void commitSync()
public void commitSync(Duration timeout)
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) 
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) 

② 异步提交位移在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。commitAsync方法有三个不同的重载方法,具体定义如下:

public void commitAsync() 
public void commitAsync(OffsetCommitCallback callback) 
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) 

1. 同步提交消费位移

在消费消息的循环中,处理完当前批次的消息后,在轮询更多的消息之前,调用 commitSync() 方法提交当前批次最新的偏移量,这会阻塞当前线程,直到位移提交完成并收到确认。 只要没有发生不可恢复的错误,commitSync() 方法就会一直尝试直至提交成功。如果提交失败,就把异常记录到错误日志里。

public void commitSync()
@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 订阅主题
        consumer.subscribe(Arrays.asList("topic-01"));
        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                // 业务处理拉取的消息
            }
            try{
                // 消费者手动提交消费位移:同步提交方式
                consumer.commitSync();
            }catch (CommitFailedException exception){
                log.error("commit failed....");
            }
        }
    }
}

还可以将消费者程序修改为批量处理+批量提交的方式:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 订阅主题
        consumer.subscribe(Arrays.asList("topic-01"));
        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            int minSize = 200;
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
            for (ConsumerRecord<String, String> record : consumerRecords) {
                buffer.add(record);
            }
            try{
                // 消费者手动提交消费位移:同步提交方式
                if(buffer.size()>minSize){
                    // 批量处理消息
                    // ...
                }
                // 手动提交位移:同步方式
                consumer.commitSync();
            }catch (CommitFailedException exception){
                log.error("commit failed....");
            }
        }
    }
}

上面的示例中将拉取到的消息存入缓存 buffer,等到积累到足够多的时候,也就是大于等于200个的时候,再做相应的批量处理,之后再做批量提交。

commitSync() 方法会根据 poll() 方法拉取的最新位移来进行提交,只要没有发生不可恢复的错误,它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,比如 CommitFailedException、WakeupException、InterruptException、AuthenticationException、AuthorizationException 等,我们可以将其捕获并做针对性的处理。

需要注意的是,同步提交位移时需要确保在处理完消息后再进行提交,因为 commitSync() 将会提交 poll() 返回的最新偏移量,如果你在处理完所有记录之前就调用了 commitSync(),那么一旦应用程序发生崩溃,就会有丢失消息的风险(消息已被提交但未被处理)。如果应用程序在处理记录时发生崩溃,但 commitSync() 还没有被调用,那么从最近批次的开始位置到发生再均衡时的所有消息都将被再次处理——这或许比丢失消息更好,或许更坏。

2. 异步提交消费位移

同步提交有一个缺点,在broker对请求做出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。可以通过降低提交频率来提升吞吐量,但如果发生了再均衡,则会增加潜在的消息重复。这个时候可以使用异步提交API。只管发送请求,无须等待broker做出响应。

public void commitAsync() 
@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 订阅主题
        consumer.subscribe(Arrays.asList("topic-01"));
        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 业务逻辑处理
            }
            // 异步提交消费位移
            consumer.commitAsync();
        }
    }
}

在提交成功或碰到无法恢复的错误之前,commitSync() 会一直重试,但commitAsync()不会,这是commitAsync() 的一个缺点。之所以不进行重试,是因为 commitAsync() 在收到服务器端的响应时,可能已经有一个更大的位移提交成功。假设我们发出一个提交位移2000的请求,这个时候出现了短暂的通信问题,服务器收不到请求,自然也不会做出响应。与此同时,我们处理了另外一批消息,并成功提交了位移3000。如果此时 commitAsync() 重新尝试提交位移2000,则有可能在位移3000之后提交成功。这个时候如果发生再均衡,就会导致消息重复。

之所以提到这个问题并强调提交顺序的重要性,是因为 commitAsync() 也支持回调,回调会在broker返回响应时执行。回调经常被用于记录位移提交错误或生成指标,如果要用它来重试提交位移,那么一定要注意提交顺序。

public void commitAsync(OffsetCommitCallback callback)
@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 订阅主题
        consumer.subscribe(Arrays.asList("topic-01"));
        // 消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 业务逻辑处理
            }
            // 异步提交消费位移
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap, Exception exception) {
                    if(exception!=null){
                        log.info("fail to commit offsets:{}",offsetAndMetadataMap,exception);
                    }
                }
            });
        }
    }
}

异步提交中如何实现重试:我们可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号相对应的值。在遇到位移提交失败需要重试的时候,可以检查所提交的位移和序号的值的大小,如果前者小于后者,则说明有更大的位移已经提交了,不需要再进行本次重试;如果两者相同,则说明可以进行重试提交。

3. 同步和异步组合提交消费位移

一般情况下,偶尔提交失败但不进行重试不会有太大问题,因为如果提交失败是由于临时问题导致的,后续的提交总会成功。如果消费者异常退出,那么这个重复消费的问题就很难避免,因为这种情况下无法及时提交消费位移;但如果这是发生在消费者被关闭或再均衡前的最后一次提交,则要确保提交是成功的,可以在退出或再均衡执行之前使用同步提交的方式做最后的把关。

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll( Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
					// 业务逻辑处理
                }
                // 异步提交位移
                consumer.commitAsync();
            }
        } catch (Exception e) {
            log.error("Unexpected error", e);
        } finally {
            try {
                // 同步提交位移
                consumer.commitSync();
            }finally{
                consumer.close();
            }
        }
    }
}

4. 提交特定的消费位移

对于采用 commitSync() 的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的。但如果想要更频繁地提交位移该怎么办?如果 poll() 返回了一大批数据,那么为了避免可能因再均衡引起的消息重复,想要在批次处理过程中提交位移该怎么办?这个时候不能只是调用 commitSync() 或commitAsync(),因为它们只会提交消息批次里的最后一个位移。

幸运的是,消费者API允许在调用 commitSync() 和 commitAsync() 时传给它们想要提交的分区和位移:

public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

在这里插入图片描述

如图:消费者的提交位移=当前一次poll拉取的分区消息的最大位移offset + 1,这个提交位移就是下次

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        ConcurrentHashMap<TopicPartition,OffsetAndMetadata> offsets = new ConcurrentHashMap<>();
        int count = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 消息所属的主题和分区
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                // 消费者提交的消费位移=当前消费消息的位移+1
                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
                offsets.put(topicPartition, offsetAndMetadata);
                if(count % 1000 == 0){
                    consumer.commitAsync(offsets,null);
                }
                count++;
            }
        }
    }
}

5. 按分区提交消费位移

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 获取拉取的消息包含的所有分区列表
            Set<TopicPartition> partitions = consumerRecords.partitions();
            for (TopicPartition partition : partitions) {
                // 获取当前分区要消费的消息
                List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
                // 获取当前分区消息的最大位移
                long lastConsumerOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // 当前分区的消费位移提交 = 当前分区消息的最大位移 + 1
                Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumerOffset + 1));
                consumer.commitSync(topicPartitionOffsetAndMetadataMap);
            }
        }
    }
}

4. 消费者查找不到消费位移时怎么办?

当一个新的消费组建立的时候,它根本没有可以查找的消费位移。或者消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。当__consumer_offsets 主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移。当 Kafka 中没有初始位移或服务器上不再存在当前位移时,该怎么办?

此时会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费,auto.offset.reset 参数的取值如下:

  • latest(默认值):表示从分区末尾开始消费消息。
  • earliest: 表示消费者会从起始处,也就是0开始消费。
  • none:查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForPartitionException异常。如果能够找到消费位移,那么配置为“none”不会出现任何异常。

如果配置的不是“latest”、“earliest”和“none”,则会报出ConfigException异常。

auto.offset.reset 参数用于指定消费者在启动时,如果找不到消费位移应该从哪里开始消费消息。 如果能够找到消费位移,那么消费者会从该位移处开始消费消息,那么 auto.offset.reset 参数并不会奏效,只有在找不到消费位移时才会生效。如果发生位移越界,即消费位移超出了消息队列中消息的数量或位置范围,那么 auto.offset.reset 参数也会生效。

5. 如何从特定分区位移处读取消息?

如果消费者能够找到消费位移,使用 poll() 可以从各个分区的最新位移处读取消息, 而且提供的 auto.offset.reset 参数也可以在找不到消费位移或位移越界的情况下粗粒度地从开头或末尾开始消费。但是有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek() 方法正好提供了这个功能,让我们得以追前消费或回溯消费。

public void seek(TopicPartition partition, long offset)
public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)

① seek() 方法中的参数 partition 表示分区,而 offset 参数用来指定从分区的哪个位置开始消费。seek() 方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll() 方法的调用过程中实现的。也就是说,在执行 seek() 方法之前需要先执行一次poll()方法,等到分配到分区之后才可以重置消费位置:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        // 执行一次poll() 方法完成分区分配的逻辑
        //  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        Set<TopicPartition> topicPartitions = consumer.assignment();
        for (TopicPartition topicPartition : topicPartitions) {
            consumer.seek(topicPartition,10);
        }

        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
            // ...
        }
    }
}

② 如果 poll() 方法中的参数为0,此方法立刻返回,那么 poll() 方法内部进行分区分配的逻辑就会来不及实施,也就是说,消费者此时并未分配到任何分区,那么 topicPartitions 便是一个空列表。那么这里的 timeout 参数设置为多少合适呢?太短会使分配分区的动作失败,太长又有可能造成一些不必要的等待。我们可以通过 KafkaConsumer的 assignment()方法来判定是否分配到了相应的分区:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        Set<TopicPartition> topicPartitions = consumer.assignment();
        // 此时说明还未完成分区分配
        while (topicPartitions.size()==0){
            consumer.poll(Duration.ofMillis(100));
            topicPartitions = consumer.assignment();
        }
        for (TopicPartition topicPartition : topicPartitions) {
            // 重置每个分区的消费位置为10
            consumer.seek(topicPartition,10);
        }

        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
            // 消费消息
        }
    }
}

③ 如果对未分配到的分区执行seek() 方法,那么会报出 IllegalStateException 的异常。类似在调用subscribe() 方法之后直接调用seek() 方法:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));


        // 未完成分区分配,直接调用seek方法,重置分区1的消费位置为10
        consumer.seek(new TopicPartition("topic-01",1),10);

        while (true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
            // 消费消息
        }
    }
}

报错:

Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition topic-01-1

④ 如果消费组内的消费者在启动的时候能够找到消费位移,那么消费者就会从该位移处开始消费消息。除非发生位移越界,即消费位移超出了消息队列中消息的数量或位置范围,否则 auto.offset.reset 参数并不会奏效,此时如果想指定从开头或末尾开始消费,就需要seek() 方法的帮助了,指定从分区末尾开始消费:

endOffsets() 方法用来获取指定分区的末尾的消息位置, endOffsets 的具体方法定义如下:

public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout)

其中 partitions 参数表示分区集合,而 timeout 参数用来设置等待获取的超时时间。如果没有指定 timeout 参数的值,那么 endOffsets() 方法的等待时间由客户端参数 request.timeout.ms 来设置,默认值为 30000。与 endOffsets 对应的是 beginningOffset() 方法,一个分区的起始位置起初是0,但并不代表每时每刻都为0,因为日志清理的动作会清理旧的数据,所以分区的起始位置会自然而然地增加,beginningOffsets() 方法的具体定义如下:

public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) 
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout)

beginningOffsets() 方法中的参数内容和含义都与 endOffsets() 方法中的一样,配合这两个方法我们就可以从分区的开头或末尾开始消费。其实KafkaConsumer中直接提供了seekToBeginning() 方法和seekToEnd() 方法来实现这两个功能,这两个方法的具体定义如下:

public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)

⑤ 有时候我们并不知道特定的消费位置,却知道一个相关的时间点,比如我们想要消费昨天8点之后的消息,这个需求更符合正常的思维逻辑。此时我们无法直接使用seek() 方法来追溯到相应的位置。KafkaConsumer同样考虑到了这种情况,它提供了一个offsetsForTimes() 方法,通过timestamp来查询与此对应的分区位置:

public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)

offsetsForTimes() 方法的参数 timestampsToSearch 是一个Map类型,key为待查询的分区,而 value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于 OffsetAndTimestamp 中的 offset 和 timestamp字段。下面的示例演示了 offsetsForTimes() 和 seek() 之间的使用方法,首先通过 offsetsForTimes() 方法获取一天之前的消息位置,然后使用 seek() 方法追溯到相应位置开始消费:

@Slf4j
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.65.132.2:9093");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-topic-01");
        // 显式配置消费者手动提交位移
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-01"));

        Map<TopicPartition,Long> timestampToSearch = new HashMap<>();
        Set<TopicPartition> topicPartitionSet = consumer.assignment();
        // 查询的分区以及查询的时间戳
        for (TopicPartition topicPartition : topicPartitionSet) {
            timestampToSearch.put(topicPartition,System.currentTimeMillis()-1*24*3600*1000);
        }

        // 获取时间戳大于等于待查询时间的第一条消息对应的位置和时间戳
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(timestampToSearch);
        for (TopicPartition topicPartition : topicPartitionSet) {
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
            // seek 方法重置消费的位移
            if(offsetAndTimestamp != null){
                consumer.seek(topicPartition,offsetAndTimestamp.offset());
            }
        }
    }
}

⑥ 位移越界也会触发 auto.offset.reset 参数的执行,位移越界是指知道消费位置却无法在实际的分区中查找到,比如原本拉取位置为101(fetch offset 101),但已经越界了(out of range),所以此时会根据 auto.offset.reset 参数的默认值来将拉取位置重置(resetting offset)为100,我们也能知道此时分区中最大的消息 offset 为99。

6. 如何优雅地退出轮询循环消费?

如何优雅地退出轮询循环,如果你确定马上要关闭消费者(即使消费者还在等待一个poll()返回),那么可以在另一个线程中调用consumer.wakeup()。如果轮询循环运行在主线程中,那么可以在ShutdownHook里调用这个方法。需要注意的是,consumer.wakeup() 是消费者唯一一个可以在其他线程中安全调用的方法。调用 consumer.wakeup() 会导致poll()抛出WakeupException,如果调用 consumer.wakeup() 时线程没有在轮询,那么异常将在下一次调用 poll() 时抛出。不一定要处理WakeupException,但在退出线程之前必须调用consumer.close() 。消费者在被关闭时会提交还没有提交的偏移量,并向消费者协调器发送消息,告知自己正在离开群组。协调器会立即触发再均衡,被关闭的消费者所拥有的分区将被重新分配给群组里其他的消费者,不需要等待会话超时。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/84279.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

DSO 系列文章(2)——DSO点帧管理策略

文章目录 1.点所构成的残差Residual的管理1.1.前端残差的状态1.2.后端点的残差的状态1.3.点的某个残差的删除 2.点Point的管理2.1.如何删除点——点Point的删除2.2.边缘化时删除哪些点&#xff1f; 3.帧FrameHessian的管理 DSO代码注释&#xff1a;https://github.com/Cc19245/…

elemenPlus ElMessage 字符串如何换行问题

因为后端返回的数据是一长串&#xff0c;而且带有\r,\n等换行符&#xff0c;但是并没有生效。前端写法&#xff1a; // 抛出错误ElMessage.error(msg);我们知道\r&#xff0c;\n&#xff0c;\r\n 是在不同系统下的换行符的表示&#xff0c;但在JavaScript返回字符串中并没有生效…

ElasticSearch学习2

1、索引的操作 1、创建索引 对ES的操作其实就是发送一个restful请求&#xff0c;kibana中在DevTools中进行ES操作 创建索引时需要注意ES的版本&#xff0c;不同版本的ES创建索引的语句略有差别&#xff0c;会导致失败 如下创建一个名为people的索引&#xff0c;settings&…

SqlServer 快速数据库脚本迁移

文章目录 前言数据库脚本数据库->任务->生成脚本选择数据库对象高级 如何迁移&#xff1a;脚本修改 如何使用新建数据库 前言 做工业的&#xff0c;经常遇到内网的项目&#xff0c;就是数据往本地的数据库传。由于这个问题所以我们需要新建一个数据库。最合适的就是数据…

编写一个俄罗斯方块

编写俄罗斯方块 思路。 1、创建容器数组&#xff0c;方块&#xff0c; 2、下落&#xff0c;左右移动&#xff0c;旋转&#xff0c;判断结束&#xff0c;消除。 定义一个20行10列的数组表示游戏区。初始这个数组里用0填充&#xff0c;1表示有一个方块&#xff0c;2表示该方块固…

Nvidia Jetson 编解码开发(3)解决H265解码报错“PPS id out of range”

1.问题描述 基于之前的开发程序 Nvidia Jetson 编解码开发(2)Jetpack 4.x版本Multimedia API 硬件编码开发--集成encode模块_free-xx的博客-CSDN博客 通过Jetson Xavier NX 硬编码的H265发出后, 上位机断点播放发出来的H265码流, 会报“PPS id out of range” 错误 …

2023年最佳JavaScript框架:React、Vue、Angular和Node.js的比较

文章目录 React&#xff1a;构建用户界面的首选Vue&#xff1a;简单优雅的前端框架Angular&#xff1a;Google支持的全面框架Node.js&#xff1a;服务器端的JavaScript运行环境比较不同框架的优势与劣势React&#xff1a;Vue&#xff1a;Angular&#xff1a;Node.js&#xff1a…

Pytest使用fixture实现token共享

同学们在做pytest接口自动化时&#xff0c;会遇到一个场景就是不同的测试用例需要有一个登录的前置步骤&#xff0c;登录完成后会获取到token&#xff0c;用于之后的代码中。首先我先演示一个常规的做法。 首先在conftest定义一个login的方法&#xff0c;方法返回token pytes…

python 打印一个条形堆积图

背景 今天介绍一个不使用 matplot&#xff0c;通过 DebugInfo模块打印条形堆积图 的方法。 引入模块 pip install DebugInfo打印销售转化数据 下面的代码构建了两个销售团队&#xff0c;团队A 和团队B&#xff1b;两个团队的销售数据构成了公司总的销售成果。以条形堆积图的…

bug记录:微信小程序 给button使用all: initial重置样式

场景&#xff1a;通过uniapp开发微信小程序 &#xff0c;使用uview的u-popup弹窗&#xff0c;里面内嵌了一个原生button标签&#xff0c;因为微信小程序的button是有默认样式的&#xff0c;所以通过all: initial重置样式 。但是整个弹窗的点击事件都会被button上面的点击事件覆…

node没有自动安装npm时,如何手动安装 npm

之前写过一篇使用 nvm 管理 node 版本的文章&#xff0c;node版本管理&#xff08;Windows&#xff09; 有时候&#xff0c;我们使用 nvm 下载 node 时&#xff0c;node 没有自动下载 npm &#xff0c;此时就需要我们自己手动下载 npm 1、下载 npm下载地址&#xff1a;&…

java请求SAP系统,发起soap的xml报文,实体类转换,idea自动生成教程

1、将接口的网页地址&#xff0c;右键保存&#xff0c;然后修改文件后缀为wsdl文件 2、idea全局搜索 wsdl&#xff0c;找到自动转换javabean插件&#xff1a; 3、点击后&#xff0c;选择下载改完后缀的文件(选择)&#xff1a; 4、将无用的class文件删除掉 5、请求sap的地址为…

嵌入式蓝海变红海?其实是大浪淘沙!

嵌入式是当下热门的职业方向之一&#xff0c;吸引了众多求职者的目光。然而&#xff0c;有人担心大家一拥而上&#xff0c;导致嵌入式就业竞争激烈&#xff0c;找工作难度大。其实&#xff0c;嵌入式行业的竞争并非无法逾越的天堑&#xff0c;也远远没有从蓝海变成红海&#xf…

【Redis从头学-7】Redis中的Set数据类型实战场景之用户画像去重、共同关注、专属粉丝

&#x1f9d1;‍&#x1f4bb;作者名称&#xff1a;DaenCode &#x1f3a4;作者简介&#xff1a;啥技术都喜欢捣鼓捣鼓&#xff0c;喜欢分享技术、经验、生活。 &#x1f60e;人生感悟&#xff1a;尝尽人生百味&#xff0c;方知世间冷暖。 &#x1f4d6;所属专栏&#xff1a;Re…

春秋云镜 CVE-2019-2725

春秋云镜 CVE-2019-2725 Weblogic < 10.3.6 ‘wls-wsat’ XMLDecoder 反序列化漏洞 靶标介绍 Oracle Fusion Middleware&#xff08;子组件&#xff1a;Web Services&#xff09;的 Oracle WebLogic Server 组件中的漏洞。受影响的受支持版本为 10.3.6.0.0 和 12.1.3.0.0。…

Python爬虫实战案例——第一例

X卢小说登录(包括验证码处理) 地址&#xff1a;aHR0cHM6Ly91LmZhbG9vLmNvbS9yZWdpc3QvbG9naW4uYXNweA 打开页面直接进行分析 任意输入用户名密码及验证码之后可以看到抓到的包中传输的数据明显需要的是txtPwd进行加密分析。按ctrlshiftf进行搜索。 定位来到源代码中断点进行调…

python 开发环境(PyCharm)搭建指南

Python 的下载并安装 参考&#xff1a;Python基础教程——搭建Python编程环境 下载 Python Python 下载地址&#xff1a;官网 &#xff08;1&#xff09;点击【Downloads】>>>点击【Windows】>>>点击【Python 3.x.x】下载最新版 Python&#xff1b; Pyt…

resource doesn‘t have a corresponding Go package.

resource doesnt have a corresponding Go package. GO这个鬼东西不能直接放src下。 ************ Building Go project: ProjectGoTest ************with GOPATH: D:\Go;D:\eclipse-jee-oxygen-2-win32-x86_64\workspace\ProjectGoTest >> Running: D:\Go\bin\go.exe …

SNAT和DNAT

SNAT和DNAT 一、SNAT策略及应用1.1SNAT策略概述1.2开启SNAT的命令1.2.1 临时打开1.2.2永久打开 1.3SNAT转换1&#xff1a;固定的公网IP地址1.4SNAT转换2&#xff1a;非固定的公网IP地址&#xff08;共享动态IP地址&#xff09;1.5SNAT案例1.5.1实验准备1.5.2配置网关服务器&…

(数字图像处理MATLAB+Python)第十章图像分割-第三,四节:区域分割和基于聚类的图像分割

文章目录 一&#xff1a;区域分割&#xff08;1&#xff09;区域生长A&#xff1a;原理B&#xff1a;示例C&#xff1a;程序 &#xff08;2&#xff09;区域合并A&#xff1a;原理B&#xff1a;示例C&#xff1a;程序 &#xff08;3&#xff09;区域分裂A&#xff1a;原理B&…