kafka晋升之路-理论+场景

kafka晋升之路

  • 一:故事背景
  • 二:核心概念
    • 2.1 系统架构
    • 2.2 生产者(Producer)
      • 2.2.1 生产者分区
      • 2.2.2 生产者分区策略
    • 2.3 经纪人(Broker)
      • 2.3.1 主题(Topic)
      • 2.3.2 分区(Partition)
      • 2.3.2 消息
    • 2.4 消费者(Consumer)
      • 2.4.1 消费者组(ConsumerGroup)
      • 2.4.2 协调者(Coordinator)
      • 2.4.3 消费者策略
      • 2.4.4 重平衡 Rebalance
  • 三:进阶概念
    • 3.1 消息幂等
    • 3.2 事务
    • 3.3 拦截器
    • 3.4 控制器(Controller)
    • 3.4 日志存储
    • 3.5 常用参数配置
      • 3.5.1 broker.id
      • 3.5.2 port
      • 3.5.3 zookeeper.connect
      • 3.5.4 log.dirs
      • 3.5.5 auto.create.topics.enable
      • 3.5.6 num.partitions
      • 3.5.7 default.replication.factor
      • 3.5.8 log.retention.ms
      • 3.5.9 message.max.bytes
  • 四:常见问题
    • 4.1 消息丢失问题
      • 4.1.1 生产者消息丢失
      • 4.1.2 消费者消息丢失
    • 4.2 重复消费问题
    • 4.3 消息顺序问题
  • 五:总结提升

一:故事背景

一直在使用消息队列中间件,今天进行梳理其对应的知识。本篇文章将会带你系统梳理常用消息中间件kafka。主要侧重于核心知识、应用场景、常见问题。希望读者能够通过本篇文章系统了解,应用kafka。

二:核心概念

Kafka是一种分布式的,基于发布/订阅的消息系统。消息系统的作用,我们就不在这里讲了,大家可以网上自行查阅。接下来我们主要讲一讲kafka的系统架构,整个架构的各个角色。

2.1 系统架构

通常情况下,一个kafka体系架构包括 「多个Producer」、「多个Consumer」、「多个broker」以及「一个Zookeeper集群」。
一张图胜过千言万语,首先让我们开看一下,kafka的基本的架构。
在这里插入图片描述

这里我们列出了kafka的基本架构,整张图宏观看下来分为了四大部分,Zookeeper是用来做集群管理、元数据以及控制器选择的,抛出它以外剩下的三部分分别是:

  • 生产者
  • 队列
  • 消费者
    这三部分其实就是kafka的核心逻辑,生产、暂存、消费。
    接下来我会围绕这张基本架构的图,详细展开讲解kafka架构的各个部分。

2.2 生产者(Producer)

生产者,负责将消息发送到kafka中。生产者是整个流程的起始位置,如果没有生产者,就没有接下来的流程。针对生产者我们只需要考虑以下两个问题。

  1. 谁来生产
  2. 生产出来放在那

谁来进行消息的生产呢?在kafka中没有对谁来进行生产进行限制,也就是说我不用关注这条消息是谁生产的,我们只需要关注,生产出来的数据应该放在哪里,这就引入了生产者分区的概念

2.2.1 生产者分区

Kafka的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
在逻辑上一个Topic就是一个队列、但是实际上一个Topic内分了多个区,每个区都是一个队列。生产者生产的数据也都是放到分区内的。
在这里插入图片描述
实际上的消息是放到分区的某个块上的,读写操作是针对分区的粒度上进行的。这样每个节点下每个分区都能独立的处理各自的读写请求、增加了系统的效率。

2.2.2 生产者分区策略

分区策略指的就是生产者生产出来的消息要放到具体的topic的哪一个分区下的策略,主要有以下三种:

  1. 轮询策略(Round-robin)
    顺序分配,例如一个topic下有3个分区,第一条消息发送到分区0,第二条发送到1,第三条发送到2。第四条又会重新开始。是kafka默认的分区策略
  2. 随机策略
    将消息放到任意一个分区上,一般是先算出topic的总分区数,返回一个小于它的随机数即可,但是可能会造成数据分布不均匀的情况。
  3. 按消息键保存策略
    Kafka允许为每条消息定义消息键,简称为Key。一旦消息被定义了Key,那么你就可以保证同一个Key的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略
    如果没有指定key的话,会走默认的轮询策略。

2.3 经纪人(Broker)

Kafka服务节点,一个或多个Broker组成了一个Kafka集群。每一个broker都是一个单独提供服务的节点,每个broker内可以放置多个topic。

2.3.1 主题(Topic)

Topic是Producer和Consumer订阅的对象,可以为每个业务、每个应用、每类数据创造相应的Topic。我们可以把一个topic看成一个逻辑上的队列,Produce生产消息放入队列,Consumer从队列取出消息进行消费。
在这里插入图片描述

2.3.2 分区(Partition)

kafka支持分区机制,所谓分区指的就是一个Topic内可以有多个分区,虽然这些消息同属一个Topic,但是却有不同的分区,并且只会存在一份。通过分区设置,等于一个Topic内拥有了多个队列,这些Partition可以进行单独支持接收消息,取消息。

2.3.2 消息

在 Kafka 中,“消息”(Message)是指一段数据,它可以是任何形式的信息,例如文本、图像、日志记录等。消息一般包含以下几个关键部分:

  • 主题(Topic): 主题是消息的分类标签。消息会被发布到一个特定的主题中,而消费者可以订阅这些主题来接收消息。主题可以看作是消息的逻辑容器,帮助对消息进行组织和分区。

  • 消息键(Message Key): 每条消息可以有一个可选的键,它用于在发布消息时指定分区。如果消息键被提供,Kafka 会使用哈希算法将所有具有相同键的消息分配到同一个分区中,以确保具有相同键的消息始终位于同一个分区内。

  • 消息值(Message Value): 消息值是实际的数据内容,它可以是任何字节序列,通常是文本或二进制数据。

  • 时间戳(Timestamp): 每条消息可以有一个时间戳,表示消息的创建时间。时间戳可以用于数据处理和分析,以及确保消息的顺序性。

  • 分区(Partition): 主题可以被分成多个分区,每个分区是一个有序、不可变的消息序列。分区可以帮助实现消息的水平扩展,使 Kafka 集群能够处理大量消息。

  • 偏移量(Offset): 每条消息在其所属分区内都有一个唯一的偏移量,用于标识消息在分区中的位置。消费者可以根据偏移量来跟踪已经处理过的消息。

2.4 消费者(Consumer)

消费者Consumer是一种应用程序,它订阅一个或多个 Kafka 主题并从这些主题中拉取消息进行处理
消费者通常的工作流程如下:
在这里插入图片描述
我们可以根据需求,创建多个消费者实例进行并行消费,以实现并行处理。
偏移量(offset)指的是队列中的一个标记变量,其记录了某个消费者组消费到了什么位置,每个消费者组都在其订阅的主题的分区内有对应的偏移量。

2.4.1 消费者组(ConsumerGroup)

  • Consumer Group是Kafka提供的可扩展且具有容错性的消费者机制。一个组内可以有多个消费者实例,他们共享同一个公共的GroupID。
  • 组内的所有消费者一起消费组订阅的主题内的所有分区。每个分区只能由消费者组内一个Consumer实例进行消费。
  • Consumer Group之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。这就是上文所讲每个消费者组都在其订阅的主题的分区内有对应的偏移量

2.4.2 协调者(Coordinator)

  • 协调者,它专门为Consumer Group服务,负责为Group执行Rebalance以及提供位移管理和组成员管理等。
  • Consumer端应用程序在提交位移时,其实是向Coordinator所在的Broker提交位移,同样地,当Consumer应用启动时,也是向Coordinator所在的Broker发送各种请求,然后由Coordinator负责执行消费者组的注册、成员管理记录等元数据管理操作。
  • 所有Broker都有各自的Coordinator组件。

2.4.3 消费者策略

上文我们说到,一个消费者组内有多个消费者实例,这些实例一起消费订阅的主题内的所有分区。那么这些分区应该怎么分呢?一个实例具体应该去消费那个或那几个分区呢?针对这个问题,kafka有3种对应的消费者策略。

  1. Round
    默认的轮循方式、决定消费者分区。假设有7个分区,分区将会如下分配:
    在这里插入图片描述

  2. Range
    对一个消费者组来说决定消费方式是以分区总数除以消费者总数来决定,一般如果不能整除,往往是从头开始将剩余的分区分配开。
    在这里插入图片描述
    一个分区放两个,剩下的轮循。

  3. Sticky
    它是在Range上的一种升华,且前面两个当同组内有新的消费者加入或者旧的消费者退出的时候,会从新开始决定消费者消费方式,但是Sticky,在同组中有新的新的消费者加入或者旧的消费者退出时,不会直接开始新的Range分配,而是保留现有消费者原来的消费策略,将退出的消费者所消费的分区平均分配给现有消费者,新增消费者同理,同其他现存消费者的消费策略中分离。

2.4.4 重平衡 Rebalance

上文说了消费者的分配策略,但是我们的消费者往往都不是静态的,会有消费者上线、下线的情况。这时候就引入了Rebalance机制,Rebalance触发的条件有3个分别是:

  1. 组成员数发生变更。比如有新的Consumer实例加入组或者离开组,或是有Consumer实例崩溃被踢出组。
  2. 订阅主题数发生变更。Consumer Group可以使用正则表达式的方式订阅主题,比如consumer.subscribe(Pattern.compile(“t.*c”))就表明该Group订阅所有以字母t开头、字母c结尾的主题,在Consumer Group的运行过程中,你新创建了一个满足这样条件的主题,那么该Group就会发生Rebalance。
  3. 订阅主题的分区数发生变更。Kafka当前只能允许增加一个主题的分区数,当分区数增加时,就会触发订阅该主题的所有Group开启Rebalance。

三:进阶概念

3.1 消息幂等

在Kafka中,Producer默认不是幂等性的,但我们可以创建幂等性Producer。
设置方法:

props.put(“enable.idempotence”, ture);
或
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIGtrue);

通过设置幂等,kafka会自动对你的消息进行去重,确保重复的消息只会有一条。
一个幂等性Producer能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。它只能实现单会话上的幂等性,不能实现跨会话的幂等性

3.2 事务

事务型Producer能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。
设置方式:
和幂等性Producer一样,开启enable.idempotence = true。
设置Producer端参数transactional. id,最好为其设置一个有意义的名字。

producer.initTransactions();
try {
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();
} catch (KafkaException e) {
            producer.abortTransaction();
}

3.3 拦截器

拦截器一共有两类四种,分别是:

  1. onSend:该方法会在消息发送之前被调用。
  2. onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。
  3. onConsume:该方法在消息返回给Consumer程序之前调用。
  4. onCommit:Consumer在提交位移之后调用该方法。

3.4 控制器(Controller)

在kafka中,控制器会由某个Broker节点担任,每个Broker启动时,都会尝试成为控制器,但是只有第一个会成为控制器,当运行中的控制器宕机或中止的时候,kafka会进行Failover(故障转移),选择新的控制器。
控制器主要有以下几个职责:

  • 主题管理(创建、删除、增加分区)
  • 分区重分配
  • Preferred领导者选举
  • 集群成员管理(新增Broker、Broker主动关闭、Broker宕机)
  • 数据服务

3.4 日志存储

Kafka中的消息是以主题为基本单位进行归类的,每个主题在逻辑上相互独立。
每个主题又可以分为一个或多个分区,在不考虑副本的情况下,一个分区会对应一个日志。
在kafka中引入的日志分段的概念,默认最大是1G,超过1G,日志文件就会分出一个新的段。

3.5 常用参数配置

3.5.1 broker.id

每个 kafka broker 都有一个唯一的标识来表示
这个唯一的标识符即是 broker.id,它的默认值是 0
这个值在 kafka 集群中必须是唯一的,可以任意设定

3.5.2 port

默认为9092端口,可以修改

3.5.3 zookeeper.connect

用来指定zookeeper的连接,其中有如下几个指定参数

  • hostname 是 Zookeeper 服务器的机器名或者 ip 地址。
  • port 是 Zookeeper 客户端的端口号
  • /path 是可选择的 Zookeeper 路径,Kafka 路径是使用了 chroot 环境,如果不指定默认使用跟路径。

3.5.4 log.dirs

Kafka 把所有的消息都保存到磁盘上,存放这些日志片段的目录是通过 log.dirs 来指定的

3.5.5 auto.create.topics.enable

自动创建主题、默认情况下,kafka自动创建主题

3.5.6 num.partitions

num.partitions 参数指定了新创建的主题需要包含多少个分区,该参数的默认值是 1。

3.5.7 default.replication.factor

kafka保存消息的副本数。

3.5.8 log.retention.ms

决定数据可以保留多久,默认是 168 个小时,也就是一周

3.5.9 message.max.bytes

限制单个消息的大小,默认是 1000 000, 也就是 1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误消息。

四:常见问题

4.1 消息丢失问题

4.1.1 生产者消息丢失

问题描述:
Kafka Producer是异步发送消息的,也就是说如果你调用producer.send(msg)这个API,那么它通常会立即返回。实际上消息并没有发送成功,消息有可能因为网络抖动、消息过大、等原因没有实际发送成功。
解决方式:
**不要使用producer.send(msg),而要使用producer.send(msg, callback)。**通过回调确认消息是否真正发送成功

4.1.2 消费者消息丢失

问题描述:
Consumer端丢失数据主要体现在Consumer端要消费的消息不见了。Consumer程序从Kafka获取到消息后开启了多个线程异步处理消息,而Consumer程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于Consumer而言实际上是丢失了。
解决方案:
这里问题的关键存在于kafka的自动提交。如果是多线程异步处理消费消息,Consumer程序不要开启自动提交位移,而是要应用程序手动提交位移线程执行失败了,不提交位移,将这组消息视为失败。
但这里有另外一个问题,**可能一组50条的数据,已经成功处理了20条,但是我们又无法去提交成功了20条,位移20条的信息。**我们可以在每个消费者的处理逻辑中,根据业务去验证拿到的消费是否已经消费过,避免重复消费。

4.2 重复消费问题

问题描述:
上文已经提到一种重复消费问题,还有另外一种场景即:
消费者消费时间过长
max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起离开组的请求,Coordinator 也会开启新一轮 Rebalance。
解决方案:

  1. 提高消费能力,提高单条消息的处理速度;根据实际场景可讲max.poll.interval.ms值设置大一点,避免不必要的rebalance;可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。
  2. 生成消息时,可加入唯一标识符如消息id,在消费端,保存最近的1000条消息id存入到redis或mysql中,消费的消息时通过前置去重。

4.3 消息顺序问题

问题描述
由于kafka的设计中一个Topic可以包含多个Partition,每个parttition内部是有序的,kafka只能保证partition内部有序,所以业务设计到需要有序的情况时,需要在parttition层面上进行控制,已达到有序。
解决方案

  1. 可以设置topic,有且只有一个partition
  2. 根据业务需要,需要顺序的 指定为同一个partition
  3. 根据业务需要,比如同一个订单,使用同一个key,可以保证分配到同一个partition上

上述只能从生产的过程中保证生产消息是有序的,但是如果消费者是通过多线程进行消息消费的,任然可能出现顺序不符问题。如果这种情况我们可以在消费者内部,使用队列维护从kafka获得的消息,然后通过锁的方式,控制消息的取出。

五:总结提升

本文讲解了kafka的基本概念、常见问题、通过此篇文章,相信你对kafka已经有了一定的了解,赶紧实验起来吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/83341.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Charles通过逍遥模拟器抓包APP,亲测可用

1.设置http代理. Proxy-->Proxy settings 2.设置ssl proxy-->ssl proxying settings 3.Charles安装证书 弹出证书安装界面,点击"安装证书" 选择当前用户, 选择: 将所有的证书都放入下列存储: 直接下一步,最后确定提示"导入成功" 4.接着设置Charles安…

怎样通过本地电脑搭建SFTP服务器,并实现公网访问?

本地电脑搭建SFTP服务器,并实现公网访问 文章目录 本地电脑搭建SFTP服务器,并实现公网访问1. 搭建SFTP服务器1.1 下载 freesshd 服务器软件1.3 启动SFTP服务1.4 添加用户1.5 保存所有配置 2. 安装SFTP客户端FileZilla测试2.1 配置一个本地SFTP站点2.2 内…

科技项目验收检测报告获取有哪些注意事项,作用都有哪些?

验收测试报告 软件从研发到结束是一个很长的周期,对于软件想要完成上市或者是交付到用户手中之前我们还需要进行一次全面检测,也就是科技项目验收测试,此测试有着严格的要求,需要第三方软件测评机构来完成,并出具科技…

相关搜索引擎常用搜索语法(Google hacking语法和FOFA语法)

一:Google Hack语法 Google Hacking原指利用Google搜索引擎搜索信息来进行入侵的技术和行为,现指利用各种搜索引擎并使用一些高级的搜索语法来搜索信息。既利用搜索引擎强大的搜索功能,在在浩瀚的互联网中搜索到我们需要的信息。 &#xff0…

代码随想录算法训练营第四十三天|LeetCode 123, LeetCode 188

目录 LeetCode 123.买卖股票的最佳时机III 动态规划五步曲: 1.确定dp[i][j]的含义 2.找出递推公式 3.初始化dp数组 4.确定遍历方向 5.打印dp数组 LeetCode 188.买卖股票的最佳时机IV 动态规划五步曲: 1.确定dp[i][j]的含义 2.找出递推公式 3.初始化dp数…

【仿写tomcat】三、通过socket读取http请求信息

仿写tomcat 建立Socket连接获取连接信息查看HTTP信息 建立Socket连接 这里我们也是创建一个专门管理socket的类 package com.tomcatServer.socket;import java.io.*; import java.net.ServerSocket;/*** 套接字存储** author ez4sterben* date 2023/08/15*/ public class Soc…

如何在控制台查看excel内容

背景 最近发现打开电脑的excel很慢,而且使用到的场景很少,也因为mac自带了预览的功能。但是shigen就是闲不住,想自己搞一个excel预览软件,于是在一番技术选型之后,我决定使用python在控制台显示excel的内容。 具体的需…

交互消息式IMessage扩展开发记录

IMessage扩展简介 iOS10新加入的基于iMessage的应用扩展,可以丰富发送消息的内容。(分享表情、图片、文字、视频、动态消息;一起完成任务或游戏。) 简单的将发送的数据内型分为三种: 1.贴纸Stickers; 2.交…

Python“牵手”淘宝商品详情数据采集方法,淘宝API申请步骤说明

淘宝平台API接口是为开发电商类应用程序而设计的一套完整的、跨浏览器、跨平台的接口规范。 淘宝API接口是指通过编程的方式,让开发者能够通过HTTP协议直接访问淘宝平台的数据,包括商品信息、店铺信息、物流信息等,从而实现淘宝平台的数据开…

NEO-6M GPS模块 +无线透传模块组成短距离数据空中传输

NEO-6M GPS模块 无线透传模块组成短距离数据空中传输 📌相关篇《GY-NEO6MV2 GPS模块测试》 🌿NEO-6M GPS模块 🌿透传模块采用的是GC2400-TC017 ✨数据的一收一发,需要配合一个USB转TTL工具,在电脑端通过串口调试助…

无涯教程-PHP - 静态变量函数

静态变量 静态变量在函数退出时值不会丢失,您只需将关键字STATIC放在变量名称的前面,即可将变量声明为静态变量。 <?phpfunction keep_track() {STATIC $count0;$count;print $count;print "<br />";}keep_track();keep_track();keep_track(); ?> 这…

C++ 网络编程项目fastDFS分布式文件系统(四)-fastCGI项目相关技术以及linux搜狗输入法相关问题。

目录 1. Nginx作为web服务器处理请求 2. http协议复习 Get方式提交数据 Post方式提交数据 3. fastCGI 3.1 CGI 3.2 fastCGI 3.3 fastCGI和spawn-fcgi安装 1. 安装fastCGI 2. 安装spawn-fcgi 3.4 nginx && fastcgi 4其他知识点 1. fastCGI环境变量 - fas…

Nacos配置管理、Feign远程调用、Gateway服务网关

1.Nacos配置管理 1.1.将配置交给Nacos管理的步骤 1.在Nacos中添加配置 Data Id服务名称-环境名称.yaml eg&#xff1a;userservice-dev.yaml 2.引入nacos-config依赖 在user-service服务中&#xff0c;引入nacos-config的客户端依赖 <!--nacos配置管理依赖--> <dep…

【使用Zookeeper当作注册中心】自己定制负载均衡常见策略

自己定制负载均衡常见策略 一、前言随机&#xff08;Random&#xff09;策略的实现轮询&#xff08;Round Robin&#xff09;策略的实现哈希&#xff08;Hash&#xff09;策略 一、前言 大伙肯定知道&#xff0c;在分布式开发中&#xff0c;目前使用较多的注册中心有以下几个&…

iOS代码混淆

文章目录 一、混淆的原理二、实现混淆1. 创建文件2. 将文件拖导入目录中3. 将以下脚本拷贝到刚新建的confuse.sh文件中4. 修改文件权限5. 修改项目配置6. 添加需要混淆的方法名7. 配置PCH文件8. 运行效果 一、混淆的原理 这里使用的混淆的原理是&#xff0c;用一串随机生成的字…

发展全球电商业务,为什么首选Facebook Messenger电子商务?

Facebook Messenger电子商务就是使用Facebook的消息传递渠道Messenger来购买和销售产品或服务。通过将 Messenger与客户服务平台&#xff0c;例如SaleSmartly&#xff08;SS客服&#xff09;集成&#xff0c;企业可以利用渠道强大的消息传递功能为客户提供更加个性化和对话式的…

ComPDFKit PDF SDK for Windows Crack

ComPDFKit PDF SDK for Windows Crack 添加了在创建文本框时调整默认属性的支持。 增加了对调整PDF大小时调整宽度的支持。 添加了对编辑文本时更多快捷方式的支持。 优化了文本输入&#xff0c;并将字体样式与原始文本相匹配。 在内容编辑器模式下复制和粘贴时优化了UI交互。 …

提升研发效能的开发工具

一、前言 随着企业对创新和效率的追求不断升级&#xff0c;研发效能成为了炙手可热的概念。各大公司纷纷成立专门的团队&#xff0c;以提升研发效能为重要目标。本文将从研发人员的视角出发&#xff0c;结合自身的深度思考&#xff0c;探讨研发效能的相关概念&#xff0c;共同寻…

Django的模型

定义模型 from django.db import models class User(models.Model):# 类属性是表示表的字段username models.CharField(max_length50,uniqueTrue)password models.CharField(max_length200)create_time models.DateTimeField(auto_now_addTrue) # auto_now_add新增数据时间…