一、消息传递语义
现在我们对生产者和消费者的工作方式有了一些了解,让我们讨论一下Kafka在生产者和消费者之间提供的语义保证。
1、最多发送一次:会造成数据丢失
2、至少发送一次:会造成数据重复消费
3、只发送一次:我们想要的效果
Kafka默认采用2,但允许用户通过在处理一批消息之前禁用生产者的重试并在消费者中提交偏移量来实现1
想要达到第3点需要满足两点:
1、生产者发布消息的持久性保证
2、消费者使用消息的循序性保证
期间可能出现的现象有:
1、生产者和消费者可能挂掉
2、多个消费者消费情况
3、生产者写入磁盘的数据可能会丢失
下面我们分别从生产者和消费者来进行分析
生产者
生产者发布消息时,有消息被“提交”到日志中的概念。一旦发布的消息被提交,且有一个副本分区的broker处于活动状态,消息就不会丢失。
如果生产者尝试发布消息并遇到网络错误,它无法确定该错误是在消息提交之前还是之后发生的。
在0.11.0.0之前:Kafka提供的时至少发送一次语义,因此会造成消息重复发布
从0.11.0.0开始:Kafka生产者还支持幂等传递选项,该选项保证重新发送不会导致日志中的重复条目。为了实现这一点,代理为每个生产者分配一个ID,并使用生产者与每条消息一起发送的序列号对消息进行重复删除。生产者支持使用类似事务的语义学将消息发送到多个主题分区的能力:即要么所有消息都成功写入,要么没有消息。
生产者为我们提供了一个配置项:ack
我们可以根据自己的场景做取舍,你是想要更高的性能还是更好的稳定性?
acks=0:则生产者根本不会等待服务器的任何确认。记录将立即添加到套接字缓冲区并被视为已发送。在这种情况下,无法保证服务器已收到记录,重试配置将不会生效(因为客户端通常不会知道任何故障)。每条记录的偏移量将始终设置为-1。
acks=1:这意味着leader将把记录写入其本地日志,但不会等待所有followers的完全确认。在这种情况下,如果leader在确认记录后立即失败,但在followers复制记录之前失败,那么记录就会丢失。
acks=all这意味着leader将等待整套同步副本确认记录。这保证了只要至少有一个同步副本仍然有效,记录就不会丢失。这是最有力的保证。这相当于acks=-1设置。
消费者
所有副本都有完全相同的日志,具有相同的偏移量。消费者自己维护整个偏移量,如果消费者从未崩溃,它可以将这个位置存储在内存中,但是如果消费者失败了,我们希望这个topic分区被另一个消费者接管,新的消费者将需要选择一个适当的位置开始处理。此时会有如下几种情况:
1、消费者逻辑是先处理消息,再更新偏移量,在更新偏移量前,消息处理成功后挂掉,下一个消费者接替消费时造成重复消费。对应“至少一次”语义
2、消费者逻辑是先更新偏移量,再处理消息。在处理消息前,更新偏移量后挂掉,下一个消费者接替消费时造成消息丢失。对应“最多一次”语义
那么kafka如何保证“仅一次”语义呢?想想生产者向多个topi打数据时用到的事务功能(0.11.0.0后提供),我们也可以把更新偏移量和处理消息放在一个事务中来保证这个语义
二、副本
Kafka可以为每个topic设置副本数,当集群中的服务器发生故障时可以自动转移到这些副本来保证高可用。
副本面向的是topic的分区,在非故障条件下,Kafka中的每个分区都有一个 leader 和零个或多个 followers 。生产者是面向 leader 进行写入的,读取可以转到分区的 leader 或者 followers 。
通常,分区比borker要多,leader在 borker集群中均匀分布,leader和followers的日志保持一直(数据量级和顺序都一致),且有相同的偏移量。当然,在任何时间点,leader肯定比followers多一些还未来得及同步的消息。
followers 拉取 leader 中的消息的原理和 消费者的消费原理类似。
要处理自动故障转移,就需要有一个角色来明确指定各个节点的状态,这个角色就是 controller (它通过ZooKeeper 来选举和协调),大数据架构中大多数都是采用的主从架构,kafka也不例外,每一个broker都需要与 controller 保持心跳来更新元数据,且每一个broker 必须命令 自己节点的 follower 去 同步 leader 的消息(如果controller 没有在 broker.session.timeout.ms 之前收到broker的心跳,就认为该broker脱机,如果集群使用了Zookeeper,那么评判broker是否投机就调整为:broker是否在zookeeper.session.timeout.ms时间内向Zookeeper发送心跳)。
leader 会跟踪 followers 中的副本,这被称为 ISR ,
1、如果所属副本分区的broker处于脱机状态就会被踢出 ISR 列表,
2、如果 followers 中的消息落后 leader 太远(无法在replica.lag.time.max.ms内追上leader的末尾即为滞后副本),也会被踢出 ISR 列表。
在这里要说明的是,Kafka只尝试处理节点突然停止工作然后恢复它。至于所谓的“拜占庭”故障并不会处理。
我们现在可以更精确地定义,当ISR中该分区的所有副本都将消息应用到其日志时,该消息被视为已提交。只有已提交的消息才会提供给消费者。这就导致消费者看到的消息一定都是成功持久化的。另外topci可以设置同步副本“最小数量”(min.insync.replicas),一般我们会设置为2。
ISR vs 过半通过
kafka分区的核心是 副本, 而副本是分布式数据系统中最级别的一个术语,来实现副本有很多种方法。
最简单的,leader始终处于活跃状态,followers 只需要 从 leader 同步数据和顺序即可,当然如果leader如果一直活跃,followers的存在也就失去了意义。
当 leader 挂掉后,我们要做的是从剩下的 followers 中选择一个新的 leader 。此时肯定要选择一个和leader 数据最接近的一个follower。这里我们来对比下Kafka的ISR机制和分布式中常用的过半通过机制
过半通过
优点:延迟仅取决于最快的服务器,如果副本数为3,则延迟由较快的follower而不是较慢的follower决定
缺点:容忍挂掉的或分区写入失败的节点变少,比如3个副本最多只允许1个节点挂掉或同步过慢,5个副本最多只允许2个节点挂掉或同步过慢,7个副本最多允许有3个节点挂掉或同步过慢
ISR
一般我们把min.insync.replicas设置成2,那么只要ISR列表中有2台以上活跃的节点就可以正常允许,也就是如果有7个副本最多允许5个节点挂掉或同步过慢。而如果OSR中的节点重新跟上同步后会再次加入ISR,当leader挂掉后从ISR中找一台节点升级成leader即可。
Zookeeper为什么使用过半通过?因为它涉及的磁盘读写并不是很大,Kafka为什么采用ISR?因为它涉及到了大量数据的持久化,采用SIR可以换取更多的高可用和吞吐量。
极端情况处理
如果所有分区的broker都挂掉,Kafka该如何应对?
如果发生了这种极端现象,首先Kafka对于这个topic就无法工作了,有两中恢复策略
1、等待ISR中的副本复活并选择此副本作为领导者(希望它仍然拥有所有数据)
2、选择第一个复活的副本(不一定在ISR中)作为领导者复活。
这需要在可用性和一致性间做tradeoff,
如果选择方式1且没有ISR中的节点恢复,kafka会保持不可用,如果这些副本被销毁或数据丢失,那么我们就会永久关闭。
如果选择方式2,任意一个节点恢复,kafka就将其置成leader,即使它不能保证拥有所有提交的消息。
0.11.0.0以后默认情况下,Kafka选择第1个策略,并倾向于等待一致的副本。可以通过unclean.leader.election.enable来修改成2策略。
可用性和持久性保证
生产者生产数据时,kafka提供了acks的配置来提升灵活性(0、1、-1或all)。
例如,如果一个主题配置只有两个副本并且一个失败(即,只剩下一个同步副本),则指定acks=all的写入将成功。但是,这些 如果剩余的副本也失败,写入可能会丢失。这样持久性就得不到保证,kafka提供了如下的解决方案:
1、如果所有副本都不可用,那么分区将一直不可用,直到最近的领导者再次可用。
2、指定最小ISR大小——只有当ISR的大小高于某个最小值时,分区才会接受写入,以防止仅写入单个副本的消息丢失,该副本随后变得不可用。此设置仅在生产者使用acks=all并保证消息将被至少这么多同步副本确认时才会生效。此设置提供了一致性和可用性之间的权衡。最小ISR大小的更高设置保证了更好的一致性,因为消息保证会被写入更多副本,从而降低了丢失的可能性。但是,它降低了可用性,因为如果同步副本的数量低于最小阈值,分区将无法进行写入。
副本管理
上面关于副本日志的讨论实际上只涵盖了一个日志,即一个topic分区。然而,Kafka集群将管理成百上千个这样的分区。我们尝试平衡分区 以循环方式在集群中,以避免将大容量主题的所有分区聚集在少数节点上。同样,我们尝试平衡领导力,以便每个节点都是比例的领导者 其分区的份额。
优化leader选举过程也很重要,因为这是不可用的关键窗口。leader选举的天真实现最终会为所有分区运行选举 在节点发生故障时对托管的节点进行分区。Kafka集群有一个称为“controller”的特殊角色 负责管理broker的注册。如果controller检测到broker的失败,它负责选举ISR的剩余成员之一担任新的领导者。 结果是我们能够将许多所需的领导层变更通知批处理在一起,这使得选举过程对大量人来说更便宜、更快 的分区。如果controller本身失败,则将t通过zookeeper选举新的controller
三、日志压缩
我们知道,Kafka中的旧的日志数据会在固定的时间段后,或者日志数据达到某i一个大小后,就会被清理。这些数据适用于时间事件数据,例如:每条记录独立的日志记录。而还有一类数据:这类数据是对key做控制的数据,例如对数据库表的更改。如下是一个示例:
假设我们有一个包含用户电子邮件地址的topic;每次用户更新他们的电子邮件地址时,我们都会使用他们的用户ID作为key向该topic发送一条消息。现在假设我们在一段时间内为id为123的用户发送以下消息,每条消息对应于电子邮件地址的更改(其他id的消息被省略):
123 => bill@microsoft.com
.
.
.
123 => bill@gatesfoundation.org
.
.
.
123 => bill@gmail.com
日志压缩确保Kafka将始终至少保留单个topic的单个分区的数据日志中每个消息key的最后已知value,例如:123 => bill@gmail.com
日志压缩是一种提供细粒度的每条记录保留的机制,而不是粗粒度的基于时间的保留。这个想法是有选择地删除我们使用相同key进行更新的记录。这样,日志就可以保证每个key至少有最后一个状态。
Kafka很灵活,可以为每个topic设置此保留策略,因此一个集群可以有一些topic,其中保留是通过大小或时间强制执行的,而其他topic的保留是通过压缩强制执行的。
压缩原理
下面是一张官网的图片:显示了Kafka日志的逻辑结构以及每条消息的偏移量。
日志的头部与传统的Kafka日志相同。它具有密集的顺序偏移量,并保留所有消息。日志压缩添加了一个处理日志尾部的选项。上图显示了一个带有压缩尾部的日志。
请注意,日志尾部的消息保留了第一次写入时分配的原始偏移量——这永远不会改变。还要注意,即使具有该偏移量的消息已被压缩,所有偏移量在日志中仍然保持有效位置;在这种情况下,该位置与日志中出现的下一个最高偏移量没有区别。例如,在上图中,偏移量36、37和38都是等效位置,任何这些偏移量的读取开始都会返回以38开头的消息集。
压缩也允许删除。带有key且value为空的消息将被视为从日志中删除。这样的记录有时被称为墓碑。此删除标记将导致删除带有该key的任何先前消息(以及带有该key的任何新消息),但删除标记的特殊之处在于,它们自己将在一段时间后从日志中清除以释放空间。不再保留删除的时间点在上图中标记为“删除保留点”。
压缩是通过定期重新复制日志段在后台完成的。清理不会阻塞读取,并且可以限制使用不超过可配置数量的I/O吞吐量,以避免影响生产者和消费者。压缩日志段的实际过程如下所示:
1、任何停留在日志头部的消费者都将看到写入的每条消息;这些消息将具有顺序偏移。 topic的min.compaction.lag.ms可用于 保证消息写入后必须经过的最小时间长度才能被压缩。它提供了每条消息在(未压缩的)头部中保留多长时间的下限。 topic的max.compaction.lag.ms可用于保证写入消息和消息符合压缩条件之间的最大延迟。
2、始终保持消息的排序。压缩永远不会重新排序消息,只需删除key相同的旧数据
3、消息的偏移量永远不会改变。它是日志中位置的永久标识符
4、从日志开始进行的任何消费者都将至少按照写入顺序看到所有记录的最终状态。此外,将看到已删除记录的所有删除标记,前提是 消费者在少于主题delete.retention.ms设置的时间段内到达日志的头部(默认为24小时)。换句话说:由于删除标记的删除发生 与读取同时,如果延迟超过delete.retention.ms,使用者可能会错过删除标记
压缩细节
日志压缩由日志清理器处理,这是一个后台线程池,用于重新复制日志段文件,删除其键出现在日志头部的记录。每个压缩器线程的工作方式如下:
1、它选择日志头与日志尾比率最高的日志
2、它为日志头部的每个key创建最后偏移量的简洁摘要
3、它从头到尾重新复制日志,删除日志中稍后出现的key。新的干净段立即交换到日志中,因此所需的额外磁盘空间只是一个额外的日志段(不是日志的完整副本)。
4、日志头的摘要本质上只是一个空间紧凑的哈希表。它每个条目正好使用24个字节。因此,使用8GB的更清洁缓冲区,一次更清洁的迭代可以清理大约366GB的日志头(假设1k消息)
配置生效
默认情况下启用日志清理器。这将启动清理线程池。 要在特定主题上启用日志清理,请添加特定于日志的属性
log.cleanup.policy=compact
log.cleanup.policy属性是在broker的server.properties文件中定义的;它会影响集群中所有没有配置覆盖的topic,如本文所述。日志清理器可以配置为保留日志中最少量的未压缩“头部”。这可以通过设置压缩时间延迟来启用。
log.cleaner.min.compaction.lag.ms
这可用于防止更新于最低消息年龄的消息受到压缩。如果未设置,则除最后一个段(即当前段)外,所有日志段都有资格进行压缩 被写入。即使活动段的所有消息都早于最小压缩时间延迟,活动段也不会被压缩。 日志清理器可以配置为确保最大延迟,之后日志的未压缩“头部”有资格进行日志压缩。
log.cleaner.max.compaction.lag.ms
这可用于防止生产率低的日志在无限的持续时间内不符合压缩条件。如果未设置,则不会压缩不超过min.清洁.脏.比率的日志。 请注意,此压缩期限不是硬保证,因为它仍然取决于日志清理器线程的可用性和实际压缩时间。 您需要监控uncleanable-partitions-count、max-clee-time-secs和max-compaction-delay-secs指标。
四、资源限制
Kafka集群能够对请求强制执行资源限制,以控制客户端使用的broker资源。Kafka代理可以为共享配额的每组客户端强制执行两种类型的客户端资源限制:
1、网络带宽配额定义字节率阈值(自0.9以来)
2、请求速率配额将CPU利用率阈值定义为网络和I/O线程的百分比(自0.11以来)
为什么需要资源限制
生产者和消费者有可能产生/消耗非常大量的数据或以非常高的速率发送请求,从而垄断broker资源,导致网络饱和,通常DOS其他客户端和broker本身。拥有资源限制的能力可以防止这些问题,在大型多租户集群中尤为重要。
客户端分组
Kafka客户端的身份是用户主体,它代表安全集群中经过身份验证的用户。在支持未经身份验证的客户端的集群中,用户主体是由broker使用可配置的PrincipalBuilder选择的一组未经身份认证的用户。客户端id是客户端应用程序选择的具有有意义名称的客户端的逻辑分组。元组(用户,客户端id)定义了一个共享用户主体和客户端id的安全客户端逻辑组。可以直接对这个元组进行资源限制,如果配置了资源限制为10MB/秒,则会在该用户或者客户端id下的所有生产者之间共享,
配置
可以为(user, client-id)、user和client-id组在多个地方配置资源限制。这些配置会覆盖写入/config/users下的Zookeeper,client-id配额覆盖写入/config/clients下。 所有broker都会读取这些覆盖并立即生效。这使我们无需滚动重启整个集群即可更改资源限制。配置的优先顺序为:
- /config/users/<user>/clients/<client-id>
- /config/users/<user>/clients/<default>
- /config/users/<user>
- /config/users/<default>/clients/<client-id>
- /config/users/<default>/clients/<default>
- /config/users/<default>
- /config/clients/<client-id>
- /config/clients/<default>
网络带宽限制
网络带宽配额被定义为共享配额的每组客户端的字节率阈值。默认情况下,集群broker就已经对每个唯一的客户端组进行了限制:每组客户端每个broker最多可以发布/获取X字节/秒。
请求速率限制
请求速率配额定义为客户端在配额窗口内可以利用每个代理的请求处理程序I/O线程和网络线程的时间百分比。n%的配额表示一个线程的n%,因此配额超出了((num.io.threads+num.network.threads)*100)%的总容量。在被限制之前,每组客户端可以在配额窗口中的所有I/O和网络线程上使用高达n%的总百分比。由于为I/O和网络线程分配的线程数量通常基于代理主机上可用的内核数量,因此请求率配额表示共享配额的每组客户端可能使用的CPU的总百分比。
具体执行
默认情况下,每个唯一的客户端组都会收到集群配置的固定配额。此配额是按每个broker定义的。每个客户都可以在每个broker受到限制之前使用这个配额。我们决定,为每broker定义这些配额比为每个客户端设置固定的集群带宽要好得多,因为这需要一种在所有broker之间共享客户端配额使用的机制。这可能比配额实施本身更难做到!
当broker检测到配额违规时,它会如何反应?
在我们的解决方案中,broker首先计算使违规客户端低于其配额所需的延迟量,并立即返回包含延迟的响应。在获取请求的情况下,响应将不包含任何数据。然后,broker会静音到客户端的通道,不再处理来自客户端的请求,直到延迟结束。在收到非零延迟持续时间的响应后,Kafka客户端还将在延迟期间避免向代理发送进一步的请求。因此,来自受限客户端的请求在双方都被有效阻止。即使使用不尊重broker延迟响应的旧客户端实现,broker通过mute其Soket通道施加的背压仍然可以处理行为不佳的客户端的节流。那些向受限通道发送进一步请求的客户端只有在延迟结束后才会收到响应。
字节率和线程利用率在多个小窗口(例如30个窗口,每个窗口1秒)内进行测量,以便快速检测和纠正配额违规。通常,具有较大的测量窗口(例如10个窗口,每个窗口30秒)会导致大量流量爆发,随后出现长时间延迟,这对用户体验来说并不好。