Kafka中的Topic

在Kafka中,Topic是消息的逻辑容器,用于组织和分类消息。本文将深入探讨Kafka Topic的各个方面,包括创建、配置、生产者和消费者,以及一些实际应用中的示例代码。

1. 介绍

在Kafka中,Topic是消息的逻辑通道,生产者将消息发布到Topic,而消费者从Topic订阅消息。每个Topic可以有多个分区(Partitions),每个分区可以在不同的服务器上,以实现横向扩展。

2. 创建和配置Topic

2.1 创建Topic

使用Kafka提供的命令行工具(kafka-topics.sh)或Kafka的API来创建Topic。下面是一个使用命令行工具创建Topic的示例:

bin/kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

这将创建一个名为my_topic的Topic,有3个分区,复制因子为2。

2.2 配置Topic

Kafka的Topic有各种配置选项,可以通过修改Topic的属性来满足不同的需求。例如,可以设置消息保留时间、清理策略等。以下是一个配置Topic属性的示例:

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my_topic --alter --add-config max.message.bytes=1048576

这将修改my_topic的配置,将最大消息字节数设置为1 MB。

3. 生产者和消费者

3.1 生产者

生产者负责将消息发布到Topic。使用Kafka的Producer API,可以轻松地创建一个生产者。以下是一个简单的Java示例代码:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(properties);

producer.send(new ProducerRecord<>("my_topic", "key1", "value1"));
producer.close();

3.2 消费者

消费者从Topic中读取消息。Kafka的Consumer API提供了强大而灵活的方式来实现消费者。

以下是一个简单的Java示例代码:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "my_group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("my_topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
    }
}

4. 实际应用示例

4.1 实时日志处理

在实时日志处理的场景中,Kafka的Topic可以按照日志类型进行划分,每个Topic代表一种日志类型。这样的设计可以使得系统更具可维护性、可扩展性,并且允许不同类型的日志通过独立的消费者进行处理。以下是一个更详细的示例代码,展示如何在实时日志处理中使用Kafka Topic:

4.1.1 创建日志类型Topic

首先,为不同的日志类型创建各自的Topic。以错误日志和访问日志为例:

# 创建错误日志Topic
bin/kafka-topics.sh --create --topic error_logs --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

# 创建访问日志Topic
bin/kafka-topics.sh --create --topic access_logs --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
4.1.2 生产者发布日志消息

在应用中,生成错误日志和访问日志的代码可能如下:

// 错误日志生产者
Producer<String, String> errorLogProducer = new KafkaProducer<>(errorLogProperties);
errorLogProducer.send(new ProducerRecord<>("error_logs", "Error message"));

// 访问日志生产者
Producer<String, String> accessLogProducer = new KafkaProducer<>(accessLogProperties);
accessLogProducer.send(new ProducerRecord<>("access_logs", "Access log message"));
4.1.3 消费者实时处理日志

创建独立的消费者来处理错误日志和访问日志:

// 错误日志消费者
Consumer<String, String> errorLogConsumer = new KafkaConsumer<>(errorLogProperties);
errorLogConsumer.subscribe(Collections.singletonList("error_logs"));

while (true) {
    ConsumerRecords<String, String> records = errorLogConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理错误日志
        System.out.printf("Error Log - Offset = %d, Value = %s%n", record.offset(), record.value());
    }
}

// 访问日志消费者
Consumer<String, String> accessLogConsumer = new KafkaConsumer<>(accessLogProperties);
accessLogConsumer.subscribe(Collections.singletonList("access_logs"));

while (true) {
    ConsumerRecords<String, String> records = accessLogConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理访问日志
        System.out.printf("Access Log - Offset = %d, Value = %s%n", record.offset(), record.value());
    }
}
4.1.4 实时监控和分析

消费者可以通过实时处理日志来进行监控和分析。例如,可以使用流处理框架(如Kafka Streams)对日志进行聚合、过滤或转换。以下是一个简化的示例:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> errorLogsStream = builder.stream("error_logs");
KStream<String, String> accessLogsStream = builder.stream("access_logs");

// 在这里进行实时处理,如聚合、过滤等

// 通过输出Topic将处理结果发送到下游系统
errorLogsStream.to("processed_error_logs");
accessLogsStream.to("processed_access_logs");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

通过这种设计,可以根据实际需要扩展不同类型的日志处理,同时确保系统具有高度的灵活性和可扩展性。在实际应用中,可能需要更详细的配置和处理逻辑,以满足具体的监控和分析需求。

4.2 事件溯源

在事件驱动的架构中,事件溯源是一种强大的方式,通过创建一个专门的Kafka Topic来记录每个业务事件的发生,以便随时追踪和回溯整个系统的状态。以下是一个基于Kafka的事件溯源的详细示例代码:

4.2.1 创建事件Topic

首先,为每个关键的业务事件创建一个专用的Kafka Topic,例如order_createdorder_shipped等:

# 创建订单创建事件Topic
bin/kafka-topics.sh --create --topic order_created --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

# 创建订单发货事件Topic
bin/kafka-topics.sh --create --topic order_shipped --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
4.2.2 发布业务事件

在应用中,当业务事件发生时,将事件发布到相应的Topic。以下是一个订单创建事件和订单发货事件的示例:

// 订单创建事件生产者
Producer<String, String> orderCreatedProducer = new KafkaProducer<>(orderCreatedProperties);
orderCreatedProducer.send(new ProducerRecord<>("order_created", "order_id", "Order created - Order ID: 123"));

// 订单发货事件生产者
Producer<String, String> orderShippedProducer = new KafkaProducer<>(orderShippedProperties);
orderShippedProducer.send(new ProducerRecord<>("order_shipped", "order_id", "Order shipped - Order ID: 123"));
4.2.3 事件溯源消费者

为了实现事件溯源,我们需要一个专用的消费者来订阅所有的事件Topic,并将事件记录到一个持久化存储中(如数据库、日志文件等):

// 事件溯源消费者
Consumer<String, String> eventTraceConsumer = new KafkaConsumer<>(eventTraceProperties);
eventTraceConsumer.subscribe(Arrays.asList("order_created", "order_shipped"));

while (true) {
    ConsumerRecords<String, String> records = eventTraceConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理事件,可以将事件记录到数据库或日志文件中
        System.out.printf("Event Trace - Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
        // 持久化处理逻辑
    }
}
4.2.4 事件回溯和分析

通过上述设置,可以在任何时候回溯系统中的每个事件,了解事件的发生时间、顺序和内容。通过将事件存储到持久化存储中,可以建立一个事件溯源系统,支持系统状态的分析、回滚和审计。

还可以使用流处理来实时分析事件,例如计算每个订单的处理时间、统计每个事件类型的发生频率等。以下是一个简单的流处理示例:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> eventStream = builder.stream(Arrays.asList("order_created", "order_shipped"));

// 在这里进行实时处理,如计算处理时间、统计频率等

// 通过输出Topic将处理结果发送到下游系统
eventStream.to("processed_events");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

通过这种方式,可以在事件溯源系统中实现强大的监控、分析和管理功能,提高系统的可观察性和可维护性。

5. 消息处理语义

Kafka支持不同的消息处理语义,包括最多一次、最少一次和正好一次。这些语义由消费者的配置决定,可以根据应用的要求进行选择。以下是一个使用最多一次语义的消费者示例代码:

properties.put("enable.auto.commit", "false"); // 禁用自动提交偏移量
properties.put("auto.offset.reset", "earliest"); // 设置偏移量重置策略为最早

Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("my_topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 处理消息
            System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitSync(); // 手动提交偏移量
    }
} finally {
    consumer.close();
}

6. 安全性和权限控制

Kafka提供了安全性特性,包括SSL加密、SASL认证等。在生产环境中,确保适当的安全性设置是至关重要的。

以下是一个使用SSL连接的生产者示例:

properties.put("security.protocol", "SSL");
properties.put("ssl.truststore.location", "/path/to/truststore");
properties.put("ssl.truststore.password", "truststore_password");

Producer<String, String> producer = new KafkaProducer<>(properties);

7. 故障容忍和可伸缩性

7.1 多节点分布和分区

在Kafka中,分布式的设计允许数据分布在多个节点上,这提供了高度的可伸缩性。每个Topic可以分成多个分区,而这些分区可以分布在不同的服务器上。这种分布式设计使得Kafka可以轻松地处理大规模数据,并实现水平扩展。

7.1.1 增加分区数

要增加Topic的分区数,可以使用以下命令:

bin/kafka-topics.sh --alter --topic my_topic --partitions 5 --bootstrap-server localhost:9092

这将把my_topic的分区数增加到5,从而提高系统的吞吐量和可伸缩性。

7.2 复制因子

Kafka通过数据的复制来实现容错性。每个分区可以有多个副本,这些副本分布在不同的节点上。在节点发生故障时,其他副本可以继续提供服务。

7.2.1 增加复制因子

要增加Topic的复制因子,可以使用以下命令:

bin/kafka-topics.sh --alter --topic my_topic --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092

这将把my_topic的复制因子增加到3,确保每个分区有3个副本。增加复制因子提高了系统的容错性,因为每个分区都有多个副本,即使一个节点发生故障,其他节点上的副本仍然可用。

7.3 节点故障处理

Kafka能够处理节点故障,确保系统的可用性。当一个节点发生故障时,Kafka会自动将该节点上的分区重新分配到其他可用节点上,以保持分区的复制因子。

7.3.1 节点故障模拟

为了模拟节点故障,你可以通过停止一个Kafka broker进程来模拟。Kafka会自动感知到该节点的故障,并进行分区的重新分配。

# 停止一个Kafka broker进程
bin/kafka-server-stop.sh config/server-1.properties

7.4 性能调优

在实际应用中,通过监控系统的性能指标,你可以调整Kafka的配置以满足不同的性能需求。例如,调整日志刷写频率、调整内存和磁盘的配置等,都可以对系统的性能产生影响。

总结

Kafka的Topic是构建实时流数据处理系统的核心组件之一。通过深入了解Topic的创建、配置、生产者和消费者,以及实际应用中的示例代码,可以更好地理解和应用Kafka。在实际项目中,根据具体需求和场景进行灵活配置,以确保系统的可靠性、性能和安全性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/222067.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

c++函数模板STL详解

函数模板 函数模板语法 所谓函数模板&#xff0c;实际上是建立一个通用函数&#xff0c;其函数类型和形参类型不具体指定&#xff0c;用一个虚拟的类型来代表。这个通用函数就称为函数模板。 凡是函数体相同的函数都可以用这个模板来代替&#xff0c;不必定义多个函数&#xf…

全面解决Error: Uncaught SyntaxError: Invalid Unicode escape sequence

是因为.js文件中的路径转义&#xff08;\&#xff09;错误&#xff0c;可能是windows内的相对路径放到linux中有问题 直接看图&#xff1a; (上面是修改后的&#xff0c;下面的则是原来的) 解决方式&#xff1a; 先在报错浏览器按f12打开调试&#xff0c;选择console窗口查看…

【数据结构与算法篇】八种排序 (C++实现)

多种排序算法的Cpp实现 一. 排序的概念及其运用排序的概念 二. 一图速览常见排序三. 排序的C实现1> 直接插入排序2> 希尔排序希尔排序代码实现(希尔所实现)希尔排序代码实现(优化版) 3> 选择排序选择排序的代码实现(同时选出最大和最小的元素) 4> 堆排序堆排序的代…

俄罗斯方块小游戏开发

代码图&#xff1a; import pygame, randompygame.init()# 游戏界面参数 width 300 height 600 surface pygame.display.set_mode((width, height))# 颜色定义 black (0, 0, 0) white (255, 255, 255) red (200, 0, 0) green (0, 200, 0) blue (0, 0, 200)# 俄罗斯方块…

QT 中 多线程(备查)

基础 一个线程处理窗口事件&#xff0c;其他线程进行逻辑运算 在QT中使用多线程&#xff0c;需要额外注意的&#xff1a; 1&#xff09;默认的线程在Qt中称之为窗口线程&#xff0c;也叫主线程&#xff0c;负责窗口事件处理或者窗口控件数据的更新 2&#xff09;子线程负责后台…

ORA-12560:TNS:协议适配器错误 ORA-12518:TNS:监听程序无法分发客户机连接

ORA-12560:TNS:协议适配器错误的解决方法 造成ORA-12560:TNS:协议适配器错误的问题的原因有三个&#xff1a; 1.监听服务没有起起来。windows平台如下操作&#xff1a;开始一程序一管理工具一服务&#xff0c;打开服务面板&#xff0c;启动oraclehome92TNS listener服务。 2.…

搭建React项目,基于Vite+React+TS+ESLint+Prettier+Husky+Commitlint

基于ViteReactTSESLintPrettierHuskyCommitlint搭建React项目 node: 20.10.0 一、创建项目 安装包管理器pnpm npm i pnpm -g基于Vite创建项目 pnpm create vitelatest web-gis-react --template react-ts进入项目目录安装依赖 $ cd web-gis-react $ pnpm i启动项目 $ pnpm…

CentOS7 部署PostgreSQL

参考文档&#xff1a;https://www.postgresql.org/download/linux/redhat/ 1. 配置yum源 yum install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm2. 安装PostgreSQL13 yum install -y postgresql13-server3…

【MATLAB源码-第95期】基于matlab的协作通信中(AF模式)中继选择算法对比。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 1. 最大最小中继选择 (Max-Min Relay Selection)&#xff1a;这种算法选择能够提供最大最小信号强度的中继。它首先计算所有可用中继的信号强度&#xff0c;然后选择那些在最差信道条件下仍能保持最高信号强度的中继。其目的…

【Git】ssh: connect to host github.com port 22: Connection refused

错误展示&#xff1a; 错误原因&#xff1a;22端口被拒绝访问 解决办法 在~/.ssh/config文件&#xff08;有就直接编辑&#xff0c;没有就创建&#xff09;里添加以下内容&#xff0c;这样ssh连接GitHub的时候就会使用443端口。 Host github.comHostname ssh.github.comPort…

【Linux】Linux基础

文章目录 学习目标操作系统不同应用领域的主流操作系统虚拟机 Linux系统的发展史Linux内核版和发行版 Linux系统下的文件和目录结构单用户操作系统vs多用户操作系统Windows和Linux文件系统区别 Linux终端命令格式终端命令格式查阅命令帮助信息 常用命令显示文件和目录切换工作目…

【Delphi】一个函数实现ios,android震动功能 Vibrate(包括3D Touch 中 Peek 震动等)

一、前言 我们在开发移动端APP的时候&#xff0c;有时可能需要APP能够提供震动功能&#xff0c;以便提醒操作者&#xff0c;特别是ios提供的3D Touch触感功能&#xff0c;操作者操作时会有触感震动&#xff0c;给操作者的感觉很友好。那么&#xff0c;在Delphi的移动端FMX开发中…

亚信安慧AntDB受邀分享核心业务系统全域数据库替换实践

近日&#xff0c;亚信安慧AntDB数据库凭借丰富的核心业务系统升级替换能力和经验&#xff0c;受邀参与IT168组织的第三期“国产软硬件升级替换之路”的直播沙龙。 亚信安慧AntDB数据库相关负责人发表《基于AntDB的CRM全域数据库替换实践》的精彩演讲&#xff0c;通过通信行业率…

cocos creator [Window] Cannot read property ‘dump‘ of null

写脚本的时候&#xff0c;出现了如下的问题&#xff0c; [Window] Cannot read property dump of null 原因&#xff1a;在下图中&#xff0c;方式一是正常的&#xff0c;而方式二则会爆出此错误&#xff0c;所以需要初始化&#xff0c;给它赋值

如何提高Pycharm的使用体验?

汉化 文件---设置---插件---chinese---安装---重启ide 代码补全 tabnine 文件---设置---插件---tabnine---安装---重启ide 重启ide后生效&#xff0c;补全效果如下 自定义背景 文件---设置---外观---背景图像---选择图片---调整透明度保存即可 设置头部声明 英文版…

Python 网络爬虫(四):初识网络爬虫

《Python入门核心技术》专栏总目录・点这里 文章目录 什么是爬虫爬虫的工作原理应用场景反爬虫合法和道德问题Robots 协议练习爬虫的一些网站总结 大家好&#xff0c;我是水滴~~ 在当今数字化时代&#xff0c;互联网上充斥着大量的数据和信息&#xff0c;而我们常常需要从这个…

python笔记:dtaidistance

1 介绍 用于DTW的库纯Python实现和更快的C语言实现 2 DTW举例 2.1 绘制warping 路径 from dtaidistance import dtw from dtaidistance import dtw_visualisation as dtwvis import numpy as np import matplotlib.pyplot as plts1 np.array([0., 0, 1, 2, 1, 0, 1, 0, 0…

android如何优雅的编写OpenGl的shader代码

通常在android里编写openGl代码的方式是创建一个类&#xff0c;类里面用硬编码的形式引入两个shader&#xff0c;如下图&#xff1a; 这里把glsl语言通过string字符串的形式定义在类里&#xff0c;虽然便于管理&#xff0c;但是不利于阅读和编写 那么有没有比较优雅的解决方案…

详解Python 迭代器介绍及作用

文章目录 迭代器&#xff1a;初探什么是迭代器&#xff1f;通过迭代器进行迭代迭代器 for 循环的工作构建自定义迭代器Python 无限迭代器Python 迭代器的好处总结关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包…

Uncle Maker: (Time)Stamping Out The Competition in Ethereum

目录 笔记后续的研究方向摘要引言贡献攻击的简要概述 Uncle Maker: (Time)Stamping Out The Competition in Ethereum CCS 2023 笔记 本文对以太坊 1 的共识机制进行了攻击&#xff0c;该机制允许矿工获得比诚实同行更高的挖矿奖励。这种名为“Uncle Maker”的攻击操纵区块时间…