Kafka
概述
基于Scala语言,是一个分布式,分区的,多副本的,多订阅者的消息队列系统。
优势
- 可靠性:分布式的,分区,复制和容错的。
- 可扩展性:kafka消息传递系统轻松缩放,无需停机。
- 耐用性:kafka使用分布式提交日志,这意味着消息会尽可能快速的保存在磁盘上,因此它是持久的。
- 性能:kafka对于发布和定于消息都具有高吞吐量。即使存储了许多TB的消息,他也爆出稳定的性能。
- kafka非常快:保证零停机和零数据丢失。
架构
Broker:kafka集群中包含一个或者多个服务实例,这种服务实例被称为Broker。一个 broker 可以容纳多个 topic。
Topic:每条发布到kafka集群的消息都有一个类别,这个类别就叫做Topic
Partition:Partition是一个物理上的概念,每个Topic包含一个或者多个Partition
Producer:负责发布消息到kafka的Broker中。
Consumer:消息消费者,向kafka的broker中读取消息的客户端
Consumer Group:每一个Consumer属于一个特定的Consumer Group(可以为每个Consumer指定 groupName)
kafka的消费者方式
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞。
有序性
如何做到消费的有序性
一个主题(topic)下面有一个分区(partition)即可
为什么topic下多个分区不能保证有序
生产者生产数据到borker的多个分区,每个分区的数据是相对有序的,但整体的数据就无序了。因为消费者在消费的时候是一个个的分区进行消费的,所以不能保证全局有序
分区与消费者组间的关系
由一个或者多个消费者组成,同一个组中的消费者对于同一条消息只消费一次。
某一个主题下的分区数,对于同一消费组下的消费者数来说,应该小于等于该主题下的分区数。
生产者分区策略
- 没有指定分区号、没指定key根据轮询的方式发送到不同的分区
- 没有指定分区号、指定了key,根据
key.hashcode%numPartition
- 指定了分区号,则直接将数据写到指定的分区里面去
- 自定义分区策略
//可根据主题和内容发送
public ProducerRecord(String topic, V value)
//根据主题,key、内容发送
public ProducerRecord(String topic, K key, V value)
//根据主题、分区、key、内容发送
public ProducerRecord(String topic, Integer partition, K key, V value)
//根据主题、分区、时间戳、key,内容发送
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
分区数
分区数并不是越多越好,一般分区数不要超过集群机器数量。分区数越多占用内存越大 (ISR 等),一个节点集中的分区也就越多,当它宕机的时候,对系统的影响也就越大。
分区数一般设置为:3-10 个
副本数
一般设置成 2 个或 3 个,很多企业设置为 2 个。
多少个 Topic
通常情况下,多少个日志类型就多少个 Topic。也有对日志类型进行合并的。
数据查找过程
-
通过offset确定数据保存在哪一个segment里面了,
-
查找对应的segment里面的index文件 。index文件都是key/value对的。key表示数据在log文件里面的顺序是第几条。value记录了这一条数据在全局的标号。如果能够直接找到对应的offset直接去获取对应的数据即可
如果index文件里面没有存储offset,就会查找offset最近的那一个offset,例如查找offset为7的数据找不到,那么就会去查找offset为6对应的数据,找到之后,再取下一条数据就是offset为7的数据
Kafka auto.offset.reset值详解
earliest
- 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
- 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
- topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
latest 这个设置容易丢失消息,假如kafka出现问题,还有数据往topic中写,这个时候重启kafka,这个设置会从最新的offset开始消费,中间出问题的哪些就不管了。
压测
Kafka 官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。 Kafka 压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络 IO)。一般都是网络 IO 达到瓶颈。
机器数量
2 ∗ ( 峰值生产速度 ∗ 副本数 / 100 ) + 1 2*(峰值生产速度*副本数/100)+1 2∗(峰值生产速度∗副本数/100)+1
日志保存时间
7 天
硬盘大小
每天的数据量 ∗ 7 天 / 70 % 每天的数据量*7 天/70\% 每天的数据量∗7天/70%
监控
公司自己开发的监控器;
开源的监控器:KafkaManager、KafkaMonitor、kafkaeagle
ISR、OSR、AR
ISR(In-Sync Replicas)副本同步队列
OSR(Out-of-Sync Replicas)
AR(Assigned Replicas )所有副本
ISR 中包括 Leader 和 Follower。如果 Leader 进程挂掉,会在 ISR 队列中选择一个服务作为新的 Leader。replica.lag.max.messages
(延迟条数)和 replica.lag.time.max.ms
(延迟时间)两个参数决定一台服务是否可以加入 ISR 副本队列,在 0.10 版本移除了 replica.lag.max.messages
参数,防止服务频繁的进去队列。 任意一个维度超过阈值都会把 Follower 剔除出 ISR,存入 OSR(Outof-Sync Replicas) 列表,新加入的 Follower 也会先存放在 OSR 中。
新旧消费者的区别
旧的 Kafka 消费者 API 主要包括:SimpleConsumer(简单消费者) 和 ZookeeperConsumerConnectir(高级消费者)。
-
SimpleConsumer 可以使用它从特定的分区和偏移量开始读取消息。
-
高级消费者和现在新的消费者有点像,有消费者群组,有分区再均衡,不过它使用 ZK 来管理消费者群组,并不具备偏移量和再均衡的可操控性。
现在的消费者同时支持以上两种行为。
分区分配策略
在 Kafka 内部存在两种默认的分区分配策略:Range 和 RoundRobin。当以下事件发生时,Kafka 将会进行一次分区分配:
1)同一个 Consumer Group 内新增消费者
2)消费者离开当前所属的Consumer Group,包括shuts down 或 crashes
3)订阅的主题新增分区
将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)
假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据,而且 C1 的
num.streams = 1
,C2 的num.streams = 2
。
Range strategy
原理:将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。在我们的例子里面,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C2-1。
假设有10个分区,3个消费者线程,10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区,所以最后分区分配的结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C2-1 将消费 7, 8, 9 分区
假如我们有11个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6, 7 分区
C2-1 将消费 8, 9, 10 分区
假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区
C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区
C2-1 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区
可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这就是Range strategy的一个很明显的弊端。
RoundRobin strategy
原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序。
前提条件必须满足:
- 同一个Consumer Group里面的所有消费者的
num.streams
必须相等 - 每个消费者订阅的主题必须相同
所以这里假设前面提到的2个消费者的
num.streams = 2
。最后按照round-robin风格将分区分别分配给不同的消费者线程。
在我们的例子里面,假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:
C1-0 将消费 T1-5, T1-2, T1-6 分区;
C1-1 将消费 T1-3, T1-1, T1-9 分区;
C2-0 将消费 T1-0, T1-4 分区;
C2-1 将消费 T1-8, T1-7 分区。
多个主题的分区分配和单个主题类似。
创建Topic时如何将分区放置到不同的Broker中
副本因子不能大于 Broker 的个数;
第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的;
其他分区的第一个副本放置位置相对于第0个分区依次往后移。
也就是如果我们有5个 Broker,5个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,依次类推;
剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift
决定的,而这个数也是随机产生的;
新建的分区会在哪个目录下
在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录。
这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。
当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。
-
如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目录下创建文件夹用于存放数据。
-
如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?
答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为:Topic名+分区ID
注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!
数据量计算
每天总数据量 100g,每天产生 1 亿条日志, 10000 万/24/60/60=1150 条/每秒钟
平均每秒钟:1150 条
低谷每秒钟:50 条
高峰每秒钟:1150 条*(2-20 倍)=2300 条-23000 条
每条日志大小:0.5k-2k
每秒多少数据量:2.3M-20MB
数据丢失
Ack=0,相当于异步发送,消息发送完毕即 offset 增加,继续生产。
Ack=1,leader 收到 leader replica 对一个消息的接受 ack 才增加 offset,然后继续生产。
Ack=-1,leader 收到所有 replica 对一个消息的接受 ack 才增加 offset,然后继续生产。
数据积压
-
如果是 Kafka 消费能力不足,则可以考虑增加 Topic 的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
-
如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。
幂等性
Producer 的幂等性指的是当发送同一条消息时,数据在 Server 端只会被持久化一次,数据不丟不重。
但是这里的幂等性是有条件的:
-
只能保证 Producer 在单个会话内不丟不重,如果 Producer 出现意外挂掉再重启是 无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不 丢不重)。
-
幂等性不能跨多个 Topic-Partition,只能保证单个 Partition 内的幂等性,当涉及多个Topic-Partition 时,这中间的状态并没有同步。
事务
Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基 础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
Producer 事务
为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer 获得的PID 和Transaction ID 绑定。
这样当 Producer重启后就可以通过正在进行的 Transaction ID 获得原来的 PID。
为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。
Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。
Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
Consumer 事务
对于 Consumer 而言,事务的保证就会相对 较弱,尤其时无法保证 Commit 的信息被精确消费。
这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
数据重复
幂等性+ack-1+事务
Kafka数据重复,可以在下一级:SparkStreaming、redis 或者 hive 中 dwd 层去重。
去重的手段:分组、按照 id 开窗只取第一个值;
消息投递保证
Kafka支持三种消息投递语义:
- At most once 消息可能会丢,但绝不会重复传递
读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once。
- At least one 消息绝不会丢,但可能会重复传递
读完消息先处理再commit消费状态(保存offset)。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了,这就对应于At least once。
- Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户想要的
如果一定要做到Exactly once,就需要协调offset和实际操作的输出。经典的做法是引入两阶段提交,但由于许多输出系统不支持两阶段提交,更为通用的方式是将offset和操作输入存在同一个地方。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)。
Kafka默认保证At least once,并且允许通过设置producer异步提交来实现At most once,而Exactly once要求与目标存储系统协作,Kafka提供的offset可以较为容易地实现这种方式。
高效读写数据
Kafka是分布式消息系统,需要处理海量的消息,Kafka的设计是把所有的消息都写入速度低容量大的硬盘,以此来换取更强的存储能力,但实际上,使用硬盘并没有带来过多的性能损失。kafka主要使用了以下几个方式实现了超高的吞吐率:
- 顺序读写
- 零拷贝
- 文件分段
- 批量发送
- 数据压缩
分区数能增加不能减少
可以使用 bin/kafka-topics.sh
命令对 Kafka 增加 Kafka 的分区数据,但是 Kafka 不支持减少分区数。
Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,还是保留?删除的话,那么这些没消费的消息不就丢了。如果保留这些消息如何放到其他分区里面?追加到其他分区后面的话那么就破坏了 Kafka 单个分区的有序性。如果要保证删除分区数据插入到其他分区保证有序性,那么实现起来逻辑就会非常复杂。
Kafka 缺点
- 由于是批量发送,数据并非真正的实时;
- 仅支持统一分区内消息有序,无法实现全局消息有序;
- 监控不完善,需要安装插件;
- 依赖zookeeper进行元数据管理。
参数优化
Broker 参数配置
(server.properties)
- 网络和 io 操作线程配置优化
# broker 处理消息的最大线程数(默认为 3)
num.network.threads=cpu 核数+1
# broker 处理磁盘 IO 的线程数
num.io.threads=cpu 核数*2
- log 数据文件刷盘策略
# 每当 producer 写入 10000 条消息时,刷数据到磁盘
log.flush.interval.messages=10000
# 每间隔 1 秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000
- 日志保留策略配置
# 保留三天,也可以更短 (log.cleaner.delete.retention.ms)
log.retention.hours=72
- Replica 相关配置
offsets.topic.replication.factor:3
# 这个参数指新创建一个 topic 时,默认的 Replica 数量,Replica 过少会影响数据的可用性,太多则会白白浪费存储资源,一般建议在 2~3 为宜。
Producer 优化
(producer.properties)
buffer.memory:33554432 (32m)
#在 Producer 端用来存放尚未发送出去的 Message 的缓冲区大小。缓冲区满了之后
#可以选择阻塞发送或抛出异常,由 block.on.buffer.full 的配置来决定。
compression.type:none
#默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和
Broker 的存储压力。
Kafka 内存调整
(kafka-server-start.sh)
#默认内存 1 个 G,生产环境尽量不要超过 6 个 G。
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"