【Kafka】常用操作

1、基本概念

在这里插入图片描述

1. 消息: Kafka是一个分布式流处理平台,它通过消息进行数据的传输和存储。消息是Kafka中的基本单元,可以包含任意类型的数据。

2. 生产者(Producer): 生产者负责向Kafka主题发送消息。它将消息发布到指定的主题,可以按照自定义的逻辑生成消息,并决定消息发送的频率和顺序。

3. 消费者(Consumer): 消费者从Kafka主题订阅并接收消息。它可以以不同的方式消费消息,如批量拉取、实时流式处理或订阅特定的消息主题。

4. 主题(Topic): 主题是Kafka中消息的分类标签,用于组织消息。每个主题可以有多个生产者和多个消费者。主题通常与特定的业务领域或数据类型相关联。

5. 分区(Partition): 主题可以被分割成多个分区,每个分区都是一个有序且持久化的消息队列。分区允许Kafka对消息进行水平扩展,并提供了并行处理和负载均衡的能力。

6. 偏移量(Offset): 偏移量是消息在分区中的唯一标识符,用于表示消息在分区内的顺序位置。消费者可以跟踪偏移量来记录已经读取的消息,以便实现精确的消费位置控制。

7. 消费者组(Consumer Group): 消费者组是一组具有相同逻辑的消费者,它们共同消费一个或多个主题中的消息。消费者组允许Kafka进行水平扩展和负载均衡,在该组内的每个消费者负责处理不同的分区。

8. 副本(Replication): Kafka使用副本机制来提供数据冗余和高可用性。每个分区都可以配置多个副本,这些副本保持分区数据的一致性,并可以替代主副本以提供故障恢复功能。

2、安装部署

参考:
https://juejin.cn/post/7158663198411849741

https://www.cnblogs.com/linjiqin/p/13196347.html

3、常用命令

配置文件解析:cat server.properties

#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600 #kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

启动/关闭 kafka:

cd /usr/local/kafka/kafka_2.12-3.5.0/bin/

bin/kafka-server-start.sh config/server.properties
bin/kafka-server-stop.sh stop

验证kafka是否可以使用,仍在bin目录下

运行kafka生产者发送消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic sun

运行kafka消费者接收消息

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sun --from-beginning

4、常用操作API

创建生产者并发送消息

from kafka import KafkaProducer
import time
# 创建生产者
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# 发送单条消息
producer.send('my_topic', b'Hello, Kafka!')

# Kafka的发送实际上是异步的
# 生产者在发送消息之后并不会等待确认消息是否已经成功到达Kafka broker
# 而是立即继续执行下一行代码或退出程序
# 在生产者发送完消息后,给消费者足够的时间来连接到Kafka broker并订阅主题

# 等待消费者订阅主题
time.sleep(2)  # 延迟2秒钟,给消费者足够的时间连接到Kafka并订阅主题

# 发送多条消息
messages = [b'Message 1', b'Message 2', b'Message 3']
for message in messages:
    producer.send('my_topic', message)
time.sleep(2)  # 延迟2秒钟,给消费者足够的时间连接到Kafka并订阅主题

创建消费者并订阅主题并消费消息

from kafka import KafkaConsumer

# 创建消费者
consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')

# 消费消息
for message in consumer:
    print(message.value.decode())

指定消费者组和自动提交偏移量

from kafka import KafkaConsumer

# 创建消费者,并指定消费者组和自动提交偏移量
consumer = KafkaConsumer('my_topic', group_id='my_consumer_group',
                         bootstrap_servers='localhost:9092',
                         enable_auto_commit=True)

# 消费消息
for message in consumer:
    print(message.value.decode())

指定消费者组和自动提交偏移量

为什么需要指定消费者组呢?

在Kafka中,消费者组是一组消费者的逻辑名称,它们共同协作来消费一个或多个主题中的消息。通过将消费者组绑定到特定主题上,Kafka能够提供高可用性、负载均衡和容错能力。

指定消费者组有以下几个原因:

  1. 负载均衡: 当多个消费者以相同的消费者组订阅同一个主题时,Kafka会自动分配分区给每个消费者,从而实现负载均衡。每个消费者只处理被分配的分区,这样可以确保所有分区被均匀地消费。
  2. 容错能力: 如果有消费者发生故障或离线,指定消费者组可以确保其他消费者接管该消费者组失去的分区,从而实现容错能力。这意味着即使某些消费者不可用,消息仍然可以被处理。
  3. 消费者协作: 消费者组允许多个消费者协同工作,以实现更高的消费并行度。每个消费者可以独立地处理其分配的分区,并且可以扩展系统的整体处理能力。

需要注意的是,如果您没有为消费者指定消费者组,则它将成为一个独立的消费者。这种情况下,每个消费者将独立地消费所有分区中的消息,而不会共享负载或具备容错能力。

因此,在大多数情况下,为了实现负载均衡、容错和提高处理能力,您应该指定消费者组,尤其是在需要同时处理大量消息或要求高可用性的场景中。如果您只需要简单地消费主题中的消息,而不关注这些特性,那么可以选择不指定消费者组。

手动提交偏移量

from kafka import KafkaConsumer

# 创建消费者,并禁用自动提交偏移量
consumer = KafkaConsumer('my_topic', group_id='my_consumer_group',
                         bootstrap_servers='localhost:9092',
                         enable_auto_commit=False)

# 消费消息并手动提交偏移量
for message in consumer:
    print(message.value.decode())
    consumer.commit()

自动提交偏移量和手动提交偏移量有什么区别呢?

自动提交偏移量(Auto Commit Offset)和手动提交偏移量(Manual Commit Offset)是两种不同的消费者偏移量管理方式。

自动提交偏移量:

  • 在自动提交模式下,消费者会定期自动将已消费的消息偏移量提交给Kafka。
  • 消费者无需显式调用提交偏移量的方法,Kafka会在后台自动处理。
  • 自动提交偏移量可以简化代码,减少了手动提交的复杂性。
  • 然而,自动提交偏移量可能会导致一些问题。例如,如果消费者在处理消息之前发生故障,那么已经消费但尚未提交的偏移量将丢失,造成消息重复或丢失。

手动提交偏移量:

  • 在手动提交模式下,消费者需要显式地调用提交偏移量的方法,将已消费的消息偏移量提交给Kafka。
  • 手动提交偏移量提供了更好的控制能力,可以确保消息的准确处理和可靠提交。
  • 消费者可以在适当的时机调用commit()方法来提交偏移量。通常,在成功处理消息后再进行提交是一个常见的模式。
  • 手动提交偏移量需要额外的代码来管理和处理偏移量的提交,但它提供了更高的灵活性和可靠性。

选择使用自动提交偏移量还是手动提交偏移量取决于具体的使用场景和需求。如果您的应用程序对消息处理的准确性和可靠性要求较高,或者需要更精细的控制以避免重复消费或消息丢失,那么手动提交偏移量可能更适合。否则,自动提交偏移量可以提供一种简化的方式来管理偏移量,尤其在简单的消费者应用中很常见。

手动提交偏移量与自动提交偏移量在性能方面可能存在一些差异,但这取决于具体的使用情况和配置。

性能方面的考虑:

  1. 提交频率: 自动提交偏移量会定期提交偏移量到Kafka服务器,默认情况下是每隔一段时间提交一次。相比之下,手动提交偏移量可以根据应用程序的需求选择何时提交,可以控制提交的频率。如果手动提交偏移量过于频繁,可能会影响性能。
  2. 网络延迟: 手动提交偏移量需要与Kafka服务器进行通信来提交偏移量。如果手动提交偏移量的操作导致频繁的网络调用,而且网络延迟较高,可能会对性能产生一定的影响。
  3. 消息处理时间: 如果消息处理时间很长,手动提交偏移量可能会在处理消息之前进行提交,以保证消息处理的可靠性。然而,这样也会增加提交偏移量的开销,可能降低整体性能。

需要注意的是,性能差异通常是微小的,并且在大多数情况下不会成为主要限制因素。如果性能是一个关键问题,可以根据实际情况进行测试和优化。

此外,可以通过调整参数来改善性能,例如增加自动提交的间隔时间、批量提交偏移量等。使用合适的配置和优化技术可以平衡性能和可靠性之间的权衡。

总而言之,手动提交偏移量可能会稍微影响性能,但仍然取决于具体的使用情况和配置。对于大多数应用程序而言,差异通常是可以接受的,并且可以根据实际需求进行调整和优化。

查看当前有哪些topic

from kafka import KafkaAdminClient

# 创建AdminClient连接到Kafka集群
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')

# 获取主题列表
topic_list = admin_client.list_topics()

# 打印主题列表
print(topic_list)

# ['my_topic', 'sun', '__consumer_offsets']
# __consumer_offsets是Kafka中的一个系统内置主题
# 这个特殊的主题用于存储消费者组的偏移量(offsets)
# 以跟踪消费者在每个分区中读取消息的位置
# __consumer_offsets主题的目的是为了支持Kafka的消费者组功能
# 当消费者组启用自动提交偏移量时,Kafka会将消费者组的偏移量信息存储在__consumer_offsets主题中
# 以便能够在重平衡、故障恢复等情况下为消费者提供正确的偏移量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/45992.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

web自动化测试进阶篇05 ——— 界面交互场景测试

😏作者简介:博主是一位测试管理者,同时也是一名对外企业兼职讲师。 📡主页地址:【Austin_zhai】 🙆目的与景愿:旨在于能帮助更多的测试行业人员提升软硬技能,分享行业相关最新信息。…

stm32 IIC通信

文章目录 IIC 通信一、硬件电路二、IIC时序基本单元三、IIC时序1.指定地址写2.当前地址读3.指定地址读 IIC 通信 IIC总线是一种通用数据总线,有两根通信线(SCL(串行时钟总线),SDA(串行数据总线))。 特点:同…

【SpringCloud Alibaba】(二)微服务环境搭建

1. 项目流程搭建 整个项目主要分为 用户微服务、商品微服务和订单微服务,整个过程模拟的是用户下单扣减库存的操作。这里,为了简化整个流程,将商品的库存信息保存到了商品数据表,同时,使用商品微服务来扣减库存。小伙…

Rust vs Go:常用语法对比(十一)

题目来自 Rust Vs Go: Which Language Is Better For Developing High-Performance Applications?[1] 202. Sum of squares Calculate the sum of squares s of data, an array of floating point values. 计算平方和 package mainimport ( "math")func main() { da…

android数据的储存、文件的储存、SharedPreferences储存、SQLite的基本用法

一、文件的储存 1、将数据储存到文件中 Context类中提供了openfileOutput()方法,用来获取一个文件流,这个方法接收两个参数,第一个参数是文件名,在文件创建的时候使用的就是这个名称,注意这里指定的文件名不可以包含…

React AntDesign写一个导出数据的提示语 上面有跳转的路径,或者点击知道了,关闭该弹层

效果如下: 代码如下: ForwardDataCenterModal(_blank);export const ForwardDataCenterModal (target?: string) > {let contentBefore React.createElement(span, null, 数据正在处理中,请稍后前往);let contentAfter React.creat…

JAVA基础-集合(List与Map)

目录 引言 一,Collection集合 1.1,List接口 1.1.1,ArrayList 1.1.1.1,ArrayList的add()添加方法 1.1.1.2,ArrayList的remove()删除方法 1.1.1.3,ArrayList的contai…

网络超时导致namenode被kill的定位

交换机升级导致部分网络通信超时, 集群的namenode主从切换后,主namenode进程被杀死。 网络问题导致namenode与zk间的连接超时触发了hadoop集群的防脑裂机制而主动kill掉了超时的namenode进程。 日志分析发现zk和namenode之间的网络连接超时: 超时触发了namenode切换,并将超时…

游戏引擎UE如何革新影视行业?创意云全面支持UE云渲染

虚幻引擎UE(Unreal Engine)作为一款“殿堂级”的游戏引擎,占据了全球80%的商用游戏引擎市场,但如果仅仅将其当做游戏开发的工具,显然是低估了它的能力。比如迪士尼出品的电视剧《曼达洛人》、电影《狮子王》等等都使用…

白话机器学习笔记(三)评估已建立的模型

模型评估 在进行回归和分类时,为了进行预测,我们定义了函数 f θ ( x ) f_\theta(x) fθ​(x),然后根据训练数据求出了函数的参数 θ \theta θ。 如何预测函数 f θ ( x ) f_\theta(x) fθ​(x)的精度?看它能否很好的拟合训练数…

【Django学习】(十五)API接口文档平台_项目流程分析_日志器_认证_授权

一、API接口文档平台 使用API接口文档不经可以很好的的维护接口数据,还给测试人员的接口测试工作带来了便利; 我们可以在全局配置文件中添加路由路径生成接口文档 1、使用docs接口文档维护接口 1.1在全局配置文件里指定用于支持coreapi的Schema # 指…

Linux の shell 流程控制

条件控制 # if then 如果else 没有语句 可以省略 if condition then#语句 fi# if then 。。。 else 。。。 fi if condition then#语句 else#语句 fi# if condition then#语句 elif condition2 then#语句 else#语句 fiif [ $a -gt $b ] thenecho "a > b&quo…

骆驼祥子思维导图

《骆驼祥子》简单介绍 《骆驼祥子》小说,以20世纪20年代的旧北京为背景。祥子所处的时代是北洋军阀统治的时代。今天我们就用ProcessOn 思维导图 来给大家解析这本名著。所有文章中的思维导图都可以到ProcessOn 模板社区获得。 1936年,老舍的一位山东大…

[vulnhub]DC2

文章目录 [vulnhub]DC2信息收集flag1flag2cewlwpscan flag3什么是rbash? flag4flag5git提权 总结 [vulnhub]DC2 信息收集 扫ip,有两种方式:arp、nmap nmap -sP 192.168.56.0/24 -T4arp-scan -l192.168.56.137 扫端口: nmap -…

Mendix 创客访谈录|综合业务展示大屏应用开发

本期创客 刘书智 西门子工业领域专家 我在西门子工厂自动化工程有限公司工作。一直从事SCADA产品的技术支持工作,已经过去17个年头了。赶上数字化发展的浪潮,不断学习各种IT技术,践行 IT与OT融合,希望借助自己的IT知识助力OT的发…

力扣1114.按序打印-----题目解析

题目描述 解析: class Foo {public int a 0;public Foo() {}public void first(Runnable printFirst) throws InterruptedException {// printFirst.run() outputs "first". Do not change or remove this line.printFirst.run();a;}public void second…

vue全局状态管理工具 Pinia 的使用

先了解一下关于Pinia的一些故事,面试把这些讲给面试官挺加分的,同时这是我持续学习下去的动力 1.为什么叫Pinia? 官网解释是西班牙语中的 pineapple,即“菠萝”,菠萝花是一组各自独立的花朵,它们结合在一起…

【C语言】函数----详解

🍁 博客主页:江池俊的博客 💫收录专栏:C语言——探索高效编程的基石 💻 其他专栏:数据结构探索 🏩代码仓库:江池俊的代码仓库 🎪 社区:C/C之家社区(欢迎大家加入与我一起…

安装VMware

D:\VMware\VMware Workstation\ 输入许可证

Sentinel 规则持久化到 Nacos

一、Sentinel规则管理模式🍉 Sentinel的控制台规则管理有三种模式: 原始模式🥝 原始模式:控制台配置的规则直接推送到Sentinel客户端,也就是我们的应用。然后保存在内存中,服务重启则丢失 pull模式&#…