解析KafkaConsumer类的神奇之道

欢迎来到我的博客,代码的世界里,每一行都是一个故事


在这里插入图片描述

解析KafkaConsumer类的神奇之道

    • 前言
    • KafkaConsumer双线程设计
      • 主线程(消费线程):
      • 心跳线程:
      • 示例代码:
    • KafkaConsumer线程不安全
      • 线程安全的替代方案:
    • 常用方法

前言

在分布式系统的舞台上,KafkaConsumer类如同消息消费的魔法师,默默地引导着消息的流向。本文将带您进入这个分布式的消费艺术之旅,解析KafkaConsumer类的玄妙之道。让我们一起揭开这个神秘面纱,探索Kafka中KafkaConsumer类的奥秘。

KafkaConsumer双线程设计

对于 Kafka 消费者 (KafkaConsumer) 的双线程设计,一种常见的模式是使用两个线程:主线程和心跳线程。这种设计可以有效提高消费者的稳定性和性能。

主线程(消费线程):

  1. 消费消息: 主线程负责从 Kafka 主题中拉取消息,并进行业务逻辑的处理。

  2. 异步提交位移: 在消费者成功处理消息后,主线程可以异步提交位移(offset)到 Kafka。这可以通过设置 enable.auto.commitfalse,手动控制位移提交的时机,确保消息处理成功后再提交位移。

    consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
  3. 处理业务逻辑: 在主线程中,处理从 Kafka 拉取到的消息,执行具体的业务逻辑。

心跳线程:

  1. 定期发送心跳: 心跳线程负责定期向 Kafka 集群发送心跳请求,以确保消费者仍然处于活动状态。这有助于防止消费者因长时间不活动而被认为失效。

  2. 处理分区再分配: 在消费者组发生分区再分配时,心跳线程可以处理重新分配操作,确保消费者组的协调和平稳进行。

示例代码:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerWithHeartbeat {

    public static void main(String[] args) {
        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your_bootstrap_servers");
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "your_group_id");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
        consumer.subscribe(Collections.singletonList("your_topic"));

        // 创建并启动心跳线程
        HeartbeatThread heartbeatThread = new HeartbeatThread(consumer);
        heartbeatThread.start();

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // 处理消费记录的逻辑

                // 异步提交位移
                consumer.commitAsync();
            }
        } finally {
            // 在主线程关闭时停止心跳线程
            heartbeatThread.shutdown();
            consumer.close();
        }
    }
}

class HeartbeatThread extends Thread {
    private final Consumer<String, String> consumer;
    private volatile boolean running = true;

    public HeartbeatThread(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        while (running) {
            // 发送心跳请求
            consumer.poll(Duration.ofMillis(100));
        }
    }

    public void shutdown() {
        running = false;
        interrupt();
    }
}

在上述示例中,KafkaConsumer 在主线程中进行消息的消费和位移提交,而 HeartbeatThread 负责定期发送心跳请求。注意在程序结束时关闭 HeartbeatThread,以确保线程正确停止。这种设计有助于确保消费者组的稳定和及时的位移提交。

KafkaConsumer线程不安全

KafkaConsumer 是线程不安全的,这意味着在多线程环境下,单个 KafkaConsumer 实例不能同时被多个线程使用,除非进行额外的同步措施。

在 Kafka 中,通常的做法是为每个消费者线程创建一个独立的 KafkaConsumer 实例。这确保了线程之间的独立性,避免了竞争条件和状态混乱。

线程安全的替代方案:

  1. 多个独立的 KafkaConsumer 实例: 为每个消费者线程创建一个独立的 KafkaConsumer 实例。这确保了每个线程有自己的消费状态和位移信息,不会相互干扰。

    KafkaConsumer<String, String> consumerThread1 = new KafkaConsumer<>(consumerProperties);
    KafkaConsumer<String, String> consumerThread2 = new KafkaConsumer<>(consumerProperties);
    
  2. 线程池中的消费者: 如果你使用线程池来管理消费者线程,确保每个线程都有独立的 KafkaConsumer 实例。

    ExecutorService executorService = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 5; i++) {
        executorService.submit(() -> {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
            // 消费逻辑
            consumer.close();
        });
    }
    
  3. 消费者工厂创建实例: 自定义消费者工厂,确保每个工厂创建的消费者实例都是独立的。

    class ConsumerFactory {
        public static KafkaConsumer<String, String> createConsumer() {
            return new KafkaConsumer<>(consumerProperties);
        }
    }
    

    在每个线程中使用 ConsumerFactory.createConsumer() 来获取独立的消费者实例。

总体来说,确保每个消费者线程都有自己的 KafkaConsumer 实例是一种良好的实践,可以避免潜在的线程安全问题。同时,在使用多线程消费时,也要注意处理好位移提交和异常处理,以确保系统的稳定性和一致性。

常用方法

KafkaConsumer 是 Kafka 客户端库中用于消费消息的重要类。以下是一些 KafkaConsumer 中常用的一些重要方法:

  1. subscribe(Collection<String> topics) 订阅一个或多个主题,以开始接收消息。可以通过多次调用 subscribe 来订阅多个主题。

    consumer.subscribe(Arrays.asList("topic1", "topic2"));
    
  2. poll(Duration timeout) 从订阅的主题中拉取消息。该方法会阻塞一段时间或直到拉取到消息,参数 timeout 控制阻塞的最大时长。

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
  3. assign(Collection<TopicPartition> partitions) 手动分配特定的分区给消费者。与 subscribe 不可一起使用,需要手动管理分区的消费。

    consumer.assign(Arrays.asList(new TopicPartition("topic1", 0)));
    
  4. commitSync()commitAsync() 用于手动提交消费者的位移信息。commitSync() 是同步提交,会阻塞直到提交成功或发生错误;commitAsync() 是异步提交,不会阻塞主线程。

    consumer.commitSync();
    // 或
    consumer.commitAsync();
    
  5. seek(TopicPartition partition, long offset) 将消费者定位到特定分区和位移位置。可以在消费者启动后使用该方法。

    consumer.seek(new TopicPartition("topic1", 0), 10);
    
  6. seekToBeginning(Collection<TopicPartition> partitions)seekToEnd(Collection<TopicPartition> partitions) 将消费者定位到分区的开头或末尾。

    consumer.seekToBeginning(Collections.singletonList(new TopicPartition("topic1", 0)));
    // 或
    consumer.seekToEnd(Collections.singletonList(new TopicPartition("topic1", 0)));
    
  7. assignment() 获取当前分配给消费者的分区列表。

    Set<TopicPartition> partitions = consumer.assignment();
    
  8. unsubscribe() 取消订阅,停止消费者消费消息。

    consumer.unsubscribe();
    
  9. close() 关闭消费者,释放资源。在不使用消费者时应调用此方法。

    consumer.close();
    
  10. wakeup():可以在其他线程中安全地调用kafkaConsumer.wakeup()来唤醒Consumer,是线程安全的

这些是 KafkaConsumer 中的一些关键方法,用于管理消费者的订阅、消息拉取、位移提交等操作。根据实际使用场景,适当选择和组合这些方法可以满足不同的需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/461292.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PHP爬虫技术:利用simple_html_dom库分析汽车之家电动车参数

摘要/导言 本文旨在介绍如何利用PHP中的simple_html_dom库结合爬虫代理IP技术来高效采集和分析汽车之家网站的电动车参数。通过实际示例和详细说明&#xff0c;读者将了解如何实现数据分析和爬虫技术的结合应用&#xff0c;从而更好地理解和应用相关技术。 背景/引言 随着电…

非空约束

oracle从入门到总裁:​​​​​​https://blog.csdn.net/weixin_67859959/article/details/135209645 非空约束 所谓的非空约束&#xff0c;指的是表中的某一个字段的内容不允许为空。如果要使用非空约束&#xff0c;只需要在每个列的后面利用“NOT NULL”声明即可 -- 删除数…

Java习题中 哈希表的理论 有效的字母异位词 快乐数 两数之和

关于 哈希表的理论 今天最大的疑惑好像就是map的复杂度怎么算哈哈,一般n个元素map的复杂度就是On哦,不需要想得太复杂了,冲突的空间并不会造成一个量级,改变n前面的常数不会影响空间复杂度哈提醒&#xff01;熟悉好map,set的API哦 关于 有效的字母异位词 为什么遍历第二个字符…

Linux异步通知实验:应用程序对异步通知的处理

一. 简介 前面文章学习了 应用程序对异步通知的处理方法&#xff0c;另一篇文章实现了Linux驱动对异步通知的处理&#xff1a; Linux应用程序对异步通知的处理-CSDN博客 Linux异步通知实验&#xff1a;驱动中异步通知的处理-CSDN博客 本文继续Linux异步通知实验&#xff0c…

想进阿里?先搞懂Spring Bean作用域!

大家好,我是小米!今天我来和大家分享一下 Java 开发中一项非常重要的技术——参数校验。参数校验在我们的代码中起着至关重要的作用,它能够确保我们的应用程序接收到正确的数据,并且保证了系统的安全性和稳定性。在过去,我们可能会通过繁琐的 if-else 来进行参数校验,但是…

AI视频矩阵混剪系统|罐头鱼AI批量混剪定时发送

AI视频矩阵混剪系统&#xff1a;智能创作与发布的完美结合 随着社交媒体平台的快速发展&#xff0c;视频已成为各行业推广和传播的热门方式。然而&#xff0c;对于许多人来说&#xff0c;制作高质量的视频仍然是一项挑战。Q:290615413但现在&#xff0c;有了AI视频矩阵混剪系统…

lftp服务与http服务(包含scp服务)详解

目录 前言: 1.lftp服务 1.1lftp服务的介绍以及应用场景 1.2安装lftp服务 1.2进行配置 1.3实际操作 2.http服务 2.1http服务介绍以及应用场景 2.1安装httpd服务 2.2进行配置 2.3实际操作 3.scp服务 3.1scp服务的介绍以及应用场景 致谢: 前言: 在当今互联网…

xss.haozi.me靶场“0x0B-0x12”通关教程

君衍. 一、0x0B 实体编码绕过二、0x0C script绕过三、0x0D 注释绕过四、0X0E ſ符号绕过五、0x0F 编码解码六、0x10 直接执行七、0x11 闭合绕过八、0x12 闭合绕过 XSS-Labs靶场“1-5”关通关教程 XSS-Labs靶场“6-10”关通关教程 Appcms存储型XSS漏洞复现 XSS-Labs靶场“11-13、…

CSS学习-选择器

一、基本选择器 1. 通配选择器 作用&#xff1a;可以选中所有的 HTML 元素。 语法&#xff1a; * { 属性名: 属性值; }举例&#xff1a; /* 选中所有元素 */ * { color: orange; font-size: 40px; }主要用于&#xff1a;清除样式。 2. 元素选择器 作用&#xff1a;为页面…

57、服务攻防——应用协议RsyncSSHRDP漏洞批扫口令猜解

文章目录 口令猜解——Hydra-FTP&RDP&SSH配置不当——未授权访问—Rsync文件备份协议漏洞——应用软件-FTP&Proftpd搭建 口令猜解——Hydra-FTP&RDP&SSH FTP&#xff1a;文本传输协议&#xff0c;端口21&#xff1b;RDP&#xff1a;windows上远程终端协议…

多媒体会议系统的优势与核心组成

随着科技的发展&#xff0c;多媒体会议系统已经成为现代商务沟通的重要工具。这种集成了多种通信和信息技术的系统&#xff0c;旨在提高会议的效率和参与度&#xff0c;具有诸多优势。本文将对多媒体会议系统进行详细的介绍和分析&#xff0c;并探讨其对现代商务沟通的影响。 …

Python程序设计基础——代码习题

1 __name__属性 import demodef main():if __name__ __main__:print(这个程序被直接运行。)elif __name__demo:print(这个程序作为模块被使用。) main()3.3 编写程序&#xff0c;生成包含1000个0~100之间的随机整数&#xff0c;并统计每个元素出现的次数。 import randomx[r…

springboot基于java的畅销图书推荐系统

摘 要 二十一世纪我们的社会进入了信息时代&#xff0c;信息管理系统的建立&#xff0c;大大提高了人们信息化水平。传统的管理方式对时间、地点的限制太多&#xff0c;而在线管理系统刚好能满足这些需求&#xff0c;在线管理系统突破了传统管理方式的局限性。于是本文针对这一…

Springboot 整合 Elasticsearch(五):使用RestHighLevelClient操作ES ②

&#x1f4c1; 前情提要&#xff1a; Springboot 整合 Elasticsearch&#xff08;三&#xff09;&#xff1a;使用RestHighLevelClient操作ES ① 目录 一、Springboot 整合 Elasticsearch 1、RestHighLevelClient API介绍 1.1、全查询 & 分页 & 排序 1.2、单条件查询…

PFA烧杯透明聚四氟乙烯刻度量杯

PFA烧杯&#xff0c;刻度清晰&#xff0c;耐酸碱&#xff0c;和有机溶剂。

模板进阶:非类型模板参数,特化

一、非类型模板参数 非类型模板参数&#xff0c;就是用一个常量作为 类/函数 的模板参数&#xff0c;在 类/函数 中可以被当成常量使用。 template<class T, size_t N>// N 为一个非类型模板参数 class Stack { public:Stack(){_a new T[N];} protected:T* _a; };int m…

Python环境安装及Selenium引入

Python环境安装 环境下载 Download Python | Python.org 环境安装 需使用管理员身份运行 查看环境是否安装成功 python --version 如果未成功则检查环境变量配置 安装 Selenium 库 pip install selenium Selenium 可以模拟用户在浏览器中的操作&#xff0c;如点击按钮、填写…

Python的异常处理机制之基础代谢

try语句是Python中的异常处理机制。当我们预料到某个代码块可能会引发异常时&#xff0c;可以将这部分代码放在try语句块中。如果try语句块中的代码执行时发生异常&#xff0c;Python会跳出当前的执行流程&#xff0c;并查找是否有对应的异常处理代码。 try语句的基本语法如下…

C++第五弹---类与对象(二)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】 类与对象 1、类对象模型 1.1、如何计算类对象的大小 1.2、类对象的存储方式猜测 1.3、结构体内存对齐规则 2、this指针 2.1、this指针的引出 2.2…

Spring Cloud Alibab 入门搭建,包含Nacos中心,注册服务发现服务,Feign请求,GateWay网关,sentinel限流

源码在最后 一、安装Nacos注册中心 1.1查看Nacos官网&#xff0c;安装Nacos服务&#xff0c;下载源码或者安装包 1.2启动服务&#xff0c;默认端口为8848&#xff0c; 二、创建服务注册&发现 2.1使用脚手架&#xff0c;创建注册服务和发现服务项目&#xff0c;我用的版…