Kafka 消费者 API 指南:深入探讨消费者的实现与最佳实践

Kafka 消费者 API 是连接应用程序与 Kafka 集群之间的关键接口,用于从 Kafka 主题中拉取消息并进行处理。本篇文章将深入探讨 Kafka 消费者 API 的核心概念、用法,以及一些最佳实践,帮助你构建高效、可靠的消息消费系统。

1. Kafka 消费者 API 概览

Kafka 消费者 API 允许应用程序从 Kafka 集群中的指定主题订阅消息,并以流式的方式进行消费。消费者 API 提供了丰富的配置选项和强大的消息处理功能,使得开发者能够根据实际需求进行高度定制。

1.1 引入依赖

首先,确保项目中引入了 Kafka 相关的依赖,例如 Maven 中的:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version> <!-- 替换为你的 Kafka 版本 -->
</dependency>

1.2 创建消费者实例

使用 Kafka 消费者 API 首先需要创建一个消费者实例。以下是一个简单的示例:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyKafkaConsumer {

    public static void main(String[] args) {
        // 配置消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 创建消费者实例
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("my-topic"));

        // 拉取消息并处理
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // 处理消息逻辑
            records.forEach(record -> {
                System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
            });
        }
    }
}

2. 消息的订阅与拉取

2.1 订阅主题

使用 subscribe 方法订阅一个或多个主题,使消费者能够从这些主题中拉取消息。

consumer.subscribe(Collections.singletonList("my-topic"));

2.2 拉取消息

通过 poll 方法拉取消息,该方法返回一个包含消费记录的 ConsumerRecords 对象。

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

3. 消费者组与分区分配

3.1 消费者组

Kafka 消费者可以组成一个消费者组,共同消费一个主题。消费者组能够实现负载均衡和故障恢复。

props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");

3.2 分区分配

消费者组内的每个消费者会被分配一个或多个分区,以实现消息的并行处理。

consumer.subscribe(Collections.singletonList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

4. 消息处理与提交

4.1 处理消息

通过遍历 ConsumerRecords 对象,可以处理拉取到的每条消息。

records.forEach(record -> {
    System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
});

4.2 手动提交偏移量

消费者可以选择手动提交偏移量,确保消息被成功处理。

consumer.commitSync();

5. 消费者的配置选项

Kafka 消费者 API 同样提供了众多配置选项,根据实际需求进行灵活定制。以下是一些常用的配置选项:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 更多配置项...

6. 消费者的事务支持

Kafka 消费者 API 也支持事务,确保消息的一致性。以下是事务的基本用法:

consumer.subscribe(Collections.singletonList("my-topic"));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        consumer.beginTransaction();
        records.forEach(record -> {
            // 处理消息逻辑
            System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
        });
        consumer.commitTransaction();
    }
} finally {
    consumer.close();
}

7. 性能调优和最佳实践

7.1 提高并行性

通过增加消费者实例的数量,可以提高消息的并行处理能力。

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5分钟

7.2 手动管理偏移量

在某些场景下,手动管理偏移量能够更精细地控制消息的处理逻辑。

consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    records.forEach(record -> {
        // 处理消息逻辑
        System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
    });
    consumer.commitAsync();
}

总结

通过本文的介绍,对 Kafka 消费者 API 有了更深入的了解。从创建消费者实例、消息的订阅与拉取,再到消费者组与分区分配、消息处理与提交,这些都是构建高效、可靠 Kafka 消费者系统的核心知识点。在实际应用中,根据业务需求和性能期望,结合消费者 API 的灵活配置,可以更好地发挥 Kafka 在消息消费领域的优势。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/221565.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[组合数学]LeetCode:2954:统计感冒序列的数目

作者推荐 [二分查找]LeetCode2040:两个有序数组的第 K 小乘积 题目 给你一个整数 n 和一个下标从 0 开始的整数数组 sick &#xff0c;数组按 升序 排序。 有 n 位小朋友站成一排&#xff0c;按顺序编号为 0 到 n - 1 。数组 sick 包含一开始得了感冒的小朋友的位置。如果位…

BUU UPLOAD COURSE 1

传一个cmd.php木马文件 访问一下这个图片地址 发现什么都没有&#xff0c;在hackbar里面连接一下我们的木马 然后看到了一些目录 然后直接查看flag就出来了 这里也可以用蚁剑去连接 直接访问地址&#xff0c;拿着地址去连接就行了。

FTP协议(PORT和PASV模式)

目录 FTP协议基本概念 PORT主动模式工作流程 PORT工作过程 PORT工作报文 PASV被动模式工作流程 PASV工作过程 PASV工作报文 FTP协议基本概念 FTP文件传输协议&#xff0c;用于在互联网上进行文件传输&#xff0c;基于C/S架构 FTP的连接模式 FTP采用双TCP连接方式 控制连…

七大经典高效学习方法

金字塔学习模型 金字塔学习是美国学习专家爱德加戴尔1946年提出的。 他将学习分为主动学习和被动学习两种类型&#xff0c;用数字形象地呈现了采用不同学习方式&#xff0c;学习者在两周后还能记住的内容有多少。 被动学习&#xff1a;通过听讲、阅读、视听、演示这些活动&a…

Word使用相关——(待完善)

1.word 怎样删除分节符 2.word 怎样删除目录中的分节符 欢迎使用Markdown编辑器 你好&#xff01; 这是你第一次使用 Markdown编辑器 所展示的欢迎页。如果你想学习如何使用Markdown编辑器, 可以仔细阅读这篇文章&#xff0c;了解一下Markdown的基本语法知识。 新的改变 我…

基于ssm vue的社区互助平台源码和论文

摘 要 随着社区互助规模的不断扩大&#xff0c;社区互助数量的急剧增加&#xff0c;有关社区互助的各种信息量也在不断成倍增长。面对庞大的信息量&#xff0c;就需要有社区互助管理来提高社区互助管理工作的效率。通过这样的系统&#xff0c;我们可以做到信息的规范管理和快速…

企业级SQL开发:如何审核发布到生产环境的SQL性能

自从上世纪 70 年代数据库开始普及以来&#xff0c;DBA 们就不停地遭遇各种各样的数据库管理难题&#xff0c;其中最为显著的&#xff0c;可能就是日常的开发任务中&#xff0c;研发人员们对于核心库进行变更带来的一系列风险。由于针对数据库的数据变更是一项非常常见的任务&a…

nodejs微信小程序+python+PHP本科生优秀作业交流网站的设计与实现-计算机毕业设计推荐

通过软件的需求分析已经获得了系统的基本功能需求&#xff0c;根据需求&#xff0c;将本科生优秀作业交流网站功能模块主要分为管理员模块。管理员添加系统首页、个人中心、用户管理、作业分类管理、作业分享管理、论坛交流、投诉举报、系统管理等操作。 随着信息化社会的形成…

微软发布Orca2,“调教式”教会小规模大语言模型如何推理!

我们都知道在大多数情况下&#xff0c;语言模型的体量和其推理能力之间存在着正相关的关系&#xff1a;模型越大&#xff0c;其处理复杂任务的能力往往越强。 然而&#xff0c;这并不意味着小型模型就永远无法展现出色的推理性能。最近&#xff0c;奶茶发现了微软的Orca2公开了…

iOS不越狱自动挂机

自动挂机在电脑上或者安卓手机上都相对容易&#xff0c;而在不越狱的iOS设备上还是有点难度的。 此方法不是我原创&#xff0c;详情见&#xff1a; 【苹果党福音&#xff0c;ios也能用的挂机脚本】 https://www.bilibili.com/video/BV1sv4y1P7TL/?share_sourcecopy_web&v…

多模块项目打包lib成aar

首先 我们要理解原理lib和app的gradle配置区别 plugins { id com.android.application } plugins { id "com.android.library" } assembleDebug&#xff08;assembleRelease&#xff09;两者分别生成是apk 和aar 对于app来说有包名、有版本号而library没有。 接…

【页面】表格展示

展示 Dom <template><div class"srch-result-container"><!--左侧--><div class"left"><div v-for"(item,index) in muneList" :key"index" :class"(muneIndexitem.mm)?active:"click"pa…

在线学习平台-需求分析(Java)

需求分析 研发集管理员、教务、教师、学生四种权限一体的中后台教务服务管理系统。其中管理员能够开设账号与角色分配&#xff0c;控制系统权限&#xff1b;教务能够进行班级管理、学员管理&#xff1b;教师能够进行课程与教学资源发布、作业发布与批改&#xff1b;学生能够观…

m1源码编译xgboost的动态链接库dylib

1、下载源码 git clone --recursive https://github.com/dmlc/xgboost cd xgboost拉取源码时候&#xff0c;一定要加"--recursive"这个命令。把它的字模块也要拉取下来&#xff0c;才能编译成功 2、安装c依赖 必要的依赖项(不然后续编译时报错)&#xff0c;包括CM…

无到无限,回顾亚马逊云关系型数据库 15 年的发展历程

又是一年一度 AWS re:Invent&#xff0c;这次关系型数据库最重磅的发布是 Amazon Aurora Limitless Database (无限数据库)。在 AWS 高级副总裁 Peter DeSantis 的 Keynote 里&#xff0c;也用了将近一半的篇幅回顾了 AWS 关系型数据库的发展历程。 2009 - RDS 把 MySQL, Post…

每日3道PWN(第二天)

ciscn_2019_n_1 参考&#xff1a; [BUUCTF-pwn]——ciscn_2019_n_1-CSDN博客 [BUUCTF]PWN5——ciscn_2019_n_1_ciscn_2019_n_4-CSDN博客 BUUCTF—ciscn_2019_n_1 1-CSDN博客 checksec一下 64位栈溢出 按f5查看main函数&#xff0c;双击可疑函数 发现含有命令执行的且发现fl…

uniapp横向滚动示例

目录 插件市场案例最后 插件市场 地址 案例 地址 最后 感觉文章好的话记得点个心心和关注和收藏&#xff0c;有错的地方麻烦指正一下&#xff0c;如果需要转载,请标明出处&#xff0c;多谢&#xff01;&#xff01;&#xff01;

统信UOS_麒麟KYLINOS上安装特定版本python

原文链接&#xff1a;统信UOS/麒麟KYLINOS上安装python特定版本 hello&#xff0c;大家好啊&#xff01;Python作为一种广泛使用的编程语言&#xff0c;其版本多样性给开发者带来了既便利又挑战的情况。不同的项目可能需要不同版本的Python&#xff0c;而在统信UOS/麒麟KYLINOS…

数据结构 | 查漏补缺之DFS、BFS、二次探测再散列法、完全二叉树、深度计算

目录 DFS&BFS 哈希表-二次探测再散列法 完全二叉树&深度计算 排序 快速排序-挖坑法 插入、选择、冒泡、区别 DFS&BFS 哈希表-二次探测再散列法 完全二叉树&深度计算 排序 快速排序-挖坑法 插入、选择、冒泡、区别 插入从第一个元素开始&#xff0c…

文件上传漏洞(带实例)

漏洞介绍&#xff1a; 现代互联网的Web应用程序中&#xff0c;上传文件是一种常见的功能&#xff0c;因为它有助于提高业务效率&#xff0c;如企业的OA系统&#xff0c;允许用户上传图片&#xff0c;视频&#xff0c;头像和许多其他类型的文件。然而向用户提供的功能越多&#…