kafka系列三:生产与消费实践之旅

在本篇技术博客中,我们将深入探索Apache Kafka 0.10.0.2版本中的消息生产与消费机制。Kafka作为一个分布式消息队列系统,以其高效的吞吐量、低延迟和高可扩展性,在大数据处理和实时数据流处理领域扮演着至关重要的角色。了解如何在这一特定版本中实现消息的高效传输和处理,对于构建健壮的数据管道至关重要。

一、Kafka基础回顾

在深入生产与消费之前,让我们快速回顾一下Kafka的核心概念和架构。Kafka由Brokers、Topics、Partitions、Producers和Consumers组成。每个Broker是一个独立的服务器,负责存储和转发消息;Topic是消息的分类,每个Topic可以分为多个Partitions以实现水平扩展;Producer负责向特定的Topic发送消息;Consumer则从Topic中拉取消息进行处理。

二、Kafka 0.10.0.2生产者详解

2.1 生产者配置与初始化

在Kafka 0.10.0.2版本中,生产者配置变得更为灵活。生产者需要配置bootstrap.servers来指定Kafka集群的地址,acks来控制消息确认策略,以及其他如retriesbatch.sizelinger.ms等参数来优化性能和可靠性。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 配置生产者
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
        props.put("acks", "all"); // 所有副本必须确认接收到消息
        props.put("retries", 0); // 重试次数
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建KafkaProducer实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        for(int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "message-" + i);
            producer.send(record);
        }

        // 关闭生产者
        producer.close();
    }
}

2.2 消息发送与事务支持

Kafka 0.10.0.2引入了对幂等性和事务的支持,这是生产者端的重大改进。幂等性确保了多次发送相同消息至同一Partition时,只会有一次被写入,这对于网络重试场景特别有用。而事务则允许跨多个Partition或Topic的操作具备原子性,这对于需要严格顺序和一致性的场景至关重要。

producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("my-topic", "key", "value"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // 处理异常
} finally {
    producer.close();
}

三、Kafka 0.10.0.2消费者详解

3.1 新一代消费者API

Kafka 0.10.0.2版本中,消费者API经历了重大重构,引入了新的Consumer API,它摒弃了旧API对ZooKeeper的依赖,转而直接与Kafka Brokers通信,提高了容错性和性能。

3.2 消费者配置与组管理

配置消费者时,需指定group.id来定义消费者所属的消费者组,enable.auto.commit控制自动提交偏移量,以及auto.offset.reset来决定当没有初始偏移量或偏移量无效时如何处理。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 配置消费者
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
        props.put("group.id", "test-group"); // 消费者组ID
        props.put("enable.auto.commit", "true"); // 开启自动提交偏移量
        props.put("auto.commit.interval.ms", "1000"); // 自动提交间隔
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 创建KafkaConsumer实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Arrays.asList("my-topic"));

        // 消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 拉取消息
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

3.3 消息消费与偏移量管理

消费者通过poll()方法拉取消息,需关注max.poll.records配置来限制每次调用返回的最大记录数。Kafka支持手动和自动两种偏移量提交模式,手动模式给予开发者更多的控制权,自动模式则简化了使用。​​​​​​​

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    consumer.commitAsync();
}

四、性能优化与最佳实践

4.1 生产者优化

  • 批处理:通过调整batch.sizelinger.ms参数,可以提高消息发送的效率。

  • 压缩:启用消息压缩(如gzip、snappy)可以减少网络传输开销,但需权衡压缩和解压缩的CPU成本。

4.2 消费者优化

  • 合理的分区分配:确保消费者组内的消费者数量与Topic的分区数相匹配,避免资源浪费或负载不均。

  • 偏移量管理:根据业务需求选择合适的偏移量提交策略,确保消息不丢失也不重复消费。

五、总结

在Kafka 0.10.0.2版本中,生产者和消费者的增强功能不仅提高了消息处理的可靠性和效率,也为开发者提供了更多灵活性和控制权。通过深入理解生产消费机制,结合合理的配置和最佳实践,可以构建出高效稳定的数据传输管道。尽管随着时间推移,Kafka有了更先进的版本,但0.10.0.2版本仍被广泛应用于遗留系统和特定场景中,其核心概念和机制的学习对于理解和掌握Kafka的演进路径具有重要意义。

图片

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/609014.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何安全高效地进行分公司文件下发?

确保分公司文件下发过程中的保密性和安全性&#xff0c;是企业信息安全管理的重要组成部分。以下是一些关键步骤和最佳实践&#xff1a; 权限管理&#xff1a;确保只有授权的人员可以访问文件。使用权限管理系统来控制谁可以查看、编辑或下载文件。 加密传输&#xff1a;在文…

前端面试题 | 常考题整理

本文为面试中出现的高频次考题&#xff0c;具体还是要看所有题。 目录 css 1、☆介绍下 BFC 及其应用 3、☆浮动清除 17、☆说几个未知宽高元素水平垂直居中方法 js 9、☆箭头函数与普通函数的区别是什么&#xff1f;构造函数可以使用 new 生成实例&#xff0c;那么箭头…

Spark云计算平台Databricks使用,SQL

创建workspace&#xff1a;Spark云计算平台Databricks使用&#xff0c;创建workspace和Compute计算集群&#xff08;Spark集群&#xff09;-CSDN博客 1 创建schema 选择Calalog&#xff0c;点击Create schema 输入名字&#xff0c;Storage location选择workspace&#xff0c;数…

Meilisearch vs Elasticsearch

2个搜索引擎的比较&#xff08;官方说法&#xff09;&#xff1a;Meilisearch vs Elasticsearch Elasticsearch 做为老牌搜索引擎&#xff0c;功能基本满足&#xff0c;但复杂&#xff0c;重量级&#xff0c;适合大数据量。 MeiliSearch 设计目标针对数据在 500GB 左右的搜索需…

SpringCloudAlibaba:5.1Sentinel的基本使用

概述 简介 Sentinel是阿里开源的项目&#xff0c;提供了流量控制、熔断降级、系统负载保护等多个维度来保障服务之间的稳定性。 官网 https://sentinelguard.io/zh-cn/ Sentinel的历史 2012 年&#xff0c;Sentinel 诞生&#xff0c;主要功能为入口流量控制。 2013-2017 年…

宝塔面板如何删除一个站点

我们一般的网站都是PHPMySQL开发的&#xff0c;所以删除站点&#xff0c;就要先删数据库&#xff0c;再删网站目录 注意&#xff1a;一点要确保无用的再删 删除站点目录

【C++】CentOS环境搭建-编译安装Boost库(附CMAKE编译文件)

【C】环境搭建-编译安装Boost库 Boost库简介Boost库安装通过YUM安装&#xff08;版本较低 V1.53.0&#xff09;通过编译安装&#xff08;官网最新版本1.85.0&#xff09;1.安装相关依赖2.查询官网下载最新安装包并解压3.编译Boost4.安装Boost库到系统路径 Boost库验证 Boost库简…

通义千问2.5正式发布,能力升级,全面赶超GPT4

简介 在人工智能的大潮中&#xff0c;大模型的竞争愈发激烈。今日&#xff0c;阿里云发布了其最新的通义千问2.5大模型&#xff0c;引起了业界的广泛关注。这款模型不仅在性能上全面赶超了GPT-4&#xff0c;还在多个基准测评中取得了优异的成绩&#xff0c;展现了国产AI技术的…

如何把公章盖在电子档文件上?

将公章盖在电子档文件上&#xff0c;尤其是确保其法律效力和安全性&#xff0c;通常涉及以下步骤&#xff1a; 准备工作 获取合法的电子公章&#xff1a;确保你拥有公司或机构正式授权的电子公章图像&#xff0c;且该图像经过了必要的加密或数字签名处理&#xff0c;以确保其…

LPDDR5电路设计的新功能

最近因为需要使用到LPDDR5&#xff0c;快速地浏览了JEDEC标准文档&#xff0c;发现与前几代相比出现了一些新的电路设计功能&#xff0c;总结为如下三点&#xff1a; 1. CK/WCK/RDQS时钟方案&#xff1b; 2. 电源的PDN设计目标&#xff1b; 3. DQ, DMI和RDQS的Rx端DFE均衡技术。…

五一超级课堂---Llama3-Tutorial(Llama 3 超级课堂)---第一节 Llama 3 本地 Web Demo 部署

课程文档&#xff1a; https://github.com/SmartFlowAI/Llama3-Tutorial 课程视频&#xff1a; https://space.bilibili.com/3546636263360696/channel/collectiondetail?sid2892740&spm_id_from333.788.0.0 操作平台&#xff1a; https://studio.intern-ai.org.cn/consol…

大模型入门(六)—— RLHF微调大模型

一、RLHF微调三阶段 参考&#xff1a;https://huggingface.co/blog/rlhf 1&#xff09;使用监督数据微调语言模型&#xff0c;和fine-tuning一致。 2&#xff09;训练奖励模型 奖励模型是输入一个文本序列&#xff0c;模型给出符合人类偏好的奖励数值&#xff0c;这个奖励数值…

亚马逊云科技中国峰会:与你开启云计算与前沿技术的探索之旅

亚马逊云科技中国峰会&#xff1a;与你开启云计算与前沿技术的探索之旅 Hello,我是科技博主Maynor&#xff0c;非常高兴地向你们推荐亚马逊云科技中国峰会&#xff0c;这是一场将于 5 月 29 日至 30 日在上海世博中心举办的科技盛会&#xff0c;如果你对云计算、行业发展新趋势…

IDEA 使用maven编译,控制台出现乱码问题的解决方式

前言 使用idea进行maven项目的编译时&#xff0c;控制台输出中文的时候出现乱码的情况。 通常出现这样的问题&#xff0c;都是因为编码格式不一样导致的。既然是maven出的问题&#xff0c;我们在idea中查找下看可以如何设置文件编码。 第一种方式 在pom.xml文件中&#xff…

Meta FAIR: 深层网络不合理的低效性

这篇文章的标题"The Unreasonable Ineffectiveness of the Deeper Layers"巧妙地呼应了著名物理学家尤金维格纳在1960年发表的一篇论文"数学在自然科学中不合理的有效性"(The Unreasonable Effectiveness of Mathematics in the Natural Sciences)。维格纳…

FPGA+炬力ARM实现VR视频播放器方案

FPGA炬力ARM方案&#xff0c;单个视频源信号&#xff0c;同时驱动两个LCD屏显示&#xff0c;实现3D 沉浸式播放 客户应用&#xff1a;VR视频播放器 主要功能&#xff1a; 1.支持多种格式视频文件播放 2.支持2D/3D 效果实时切换播放 3.支持TF卡/U盘文件播放 4.支持定制化配置…

Linux运维:centos环境变量

前言 在 Linux 运维工作中&#xff0c;管理环境变量是至关重要的一项任务。在 CentOS 环境下&#xff0c;正确配置环境变量可以使系统更加高效和易于管理。 本文将重点讨论 CentOS 环境下的环境变量设置&#xff0c;并就python的环境变量配置方案进行讲解&#xff08;不包含Ano…

AutoModelForCausalLM.from_pretrained 函数调用本地权重报错

文章目录 1、代码报错的位置&#xff08;前情提要&#xff09;finetune_lora.shfintune_clm_lora.py 2、报错截图2.1、huggingfaces上的 meta-llama/Llama-2-7b-chat-hf2.2、服务器上模型文件路径 3、特别注意事项 1、代码报错的位置&#xff08;前情提要&#xff09; 在终端直…

06.命令的组合使用

命令的组合使用 1.查询当前整个系统每个进程的线程数 我们经常遇到这样的问题&#xff0c;比如某台服务器的CPU 使用率飙升&#xff0c;通过top命令查看是某个程序&#xff08;例如java&#xff09;占用的cpu比较大&#xff0c;现在需要查询java各个进程下的线程数情况。可以通…

Reactor Netty HTTP 服务器端-响应式编程-014

🤗 ApiHug {Postman|Swagger|Api...} = 快↑ 准√ 省↓ GitHub - apihug/apihug.com: All abou the Apihug apihug.com: 有爱,有温度,有质量,有信任ApiHug - API design Copilot - IntelliJ IDEs Plugin | Marketplace The Next Generation API Development Platform …