目录
1. 使用消息队列的目的(优点与缺点)
2. 常见各消息队列对比
3. kafka介绍
3.1 kafka简介
3.2 kafka特点
3.3 kafka系统架构
3.4 设置数据可靠性
3.4.1 Topic 分区副本
3.4.2 消息确认机制
4. 常见问题(面试题)
4.1 Kafka如何避免消息重复消费?
原因分析
解决方案
4.2 Kafka能保证数据顺序性?
4.3 Kafka消息积压的原因和处理的方法
积压原因
解决方法
4.4 kafka高性能设计
4.5 kafka高可用机制
4.5.1 follower分类
4.5.2 leader宕机,新的leader选举原则
4.6 死信队列与重试队列
(1)死信队列
(2)重试队列
(3)重试队列和延时队列的区别
1. 使用消息队列的目的(优点与缺点)
优点:
- 解耦;
- 异步;
- 流量削峰。
缺点:
- 可用性降低(mq使用集群部署一定程度上可以避免该问题发生)
- 复杂性提高
- 存在数据一致性问题
2. 常见各消息队列对比
消息队列 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
性能 | 单机6000并发 | 单机12000并发 | 单机10万并发 | 单机100万并发 |
持久化 | 支持(但性能会下降) | 支持(但性能会下降) | 支持 | 支持 |
多语言支持 | 支持 | 支持 | 仅支持Java | 支持 |
其它特点 | 缺乏大规模的运用 目前不推荐使用 | 优点:阿里大规模运用,性能比较好 缺点:只能java | 优点:性能最好,支持大数据 缺点:运维难度大;对带宽有一定要求 |
kafka速度快的原因:
- 基于顺序IO,顺序IO可以减少在机器硬盘上磁头反复移动的时间。
- 使用零拷贝技术,kafka程序使用一个叫做sendfile()的系统调用,告诉操作系统直接将数据从操作系统的缓存中复制到网卡缓存区。(该操作是通过DMA(直接内存访问)完成的,该过程CPU不参与),而传统的数据传输方式要4次拷贝过程。
3. kafka介绍
3.1 kafka简介
Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据实时处理领域。
Kafka 是最初由 Linkedin 公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于 Zookeeper 协调的分布式消息中间件系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景。
3.2 kafka特点
(1)同时为发布和订阅提供高吞吐量
Kafka 的设计目标是以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对TB 级以上数据也能保证常数时间的访问性能。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条消息的传输。
(2)消息持久化
将消息持久化到磁盘,因此可用于批量消费,例如 ETL 以及实时应用程序。通过将数据持久化到硬盘以及 replication 防止数据丢失。
(3)分布式
支持 Server 间的消息分区及分布式消费,同时保证每个 partition 内的消息顺序传输。这样易于向外扩展,所有的producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。
(4)消费消息采用 pull 模式
消息被处理的状态是在 consumer 端维护,而不是由 server 端维护,broker 无状态,consumer 自己保存 offset。(各名词详见3.3)
(5)支持 online 和 offline 的场景
同时支持离线数据处理和实时数据处理。
3.3 kafka系统架构
(1)Broker
一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
(2)Topic(主题)
可以理解为一个队列,生产者和消费者面向的都是一个 topic,类似于数据库的表名。物理上不同 topic 的消息分开存储。
(3)Partition(分区,实现数据分片)
为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分割为一个或多个 partition,每个 partition 是一个有序的队列。Kafka 只保证 partition 内的记录是有序的,而不保证 topic 中不同 partition 的顺序。
每个 topic 至少有一个 partition,当生产者产生数据的时候,会根据分配策略选择分区,然后将消息追加到指定的分区的队列末尾。
Partation 数据路由规则:
- 指定了 patition,则直接使用;
- 未指定 patition 但指定 key(相当于消息中某个属性),通过对 key 的 value 进行 hash 取模,选出一个 patition;
- patition 和 key 都未指定,使用轮询选出一个 patition。
注意:
- 每条消息都会有一个自增的编号,用于标识消息的偏移量,标识顺序从 0 开始。
- 每个 partition 中的数据使用多个segment 文件存储。
- 如果 topic 有多个 partition,消费数据时就不能保证数据的顺序。严格保证消息的消费顺序的场景下(例如商品秒杀、 抢红包),需要将 partition 数目设为 1。
broker、topic、partition三者的关系:
- broker 存储 topic 的数据。如果某 topic 有 N 个 partition,集群有 N 个 broker,那么每个 broker 存储该 topic 的一个 partition。
- 如果某 topic 有 N 个 partition,集群有 (N+M) 个 broker,那么其中有 N 个 broker 存储 topic 的一个 partition, 剩下的 M 个 broker 不存储该 topic 的 partition 数据。
- 如果某 topic 有 N 个 partition,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。
分区的原因:
- 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
- 可以提高并发,因为可以以Partition为单位读写了。
(4)Replication(副本)
副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
(5)Leader
每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 partition。
- ISR(in-sync-replica,Leader 维护的一个和 Leader 保持同步的 Follower 集合) 同步保存数据,性能不高,一般至少会设置1个ISR列表,其他的尽量设置为普通副本。保证高可用的前提下,数据的时效性。
- 选举时优先从ISR中选定,因为这个列表中的follower的数据是与leader同步的。
- 如果ISR列表中的follower都不行了,就只能从其他的普通follower中选取。
(6)Follower
Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。Follower 只负责备份,不负责数据的读写。
如果 Leader 故障,则从 Follower 中选举出一个新的 Leader。
当 Follower 挂掉、卡住或者同步太慢,Leader 会把这个 Follower 从 ISR 列表中删除,重新创建一个 Follower。
说明:
- ISR副本:in-sync-replica,需要同步复制保存的follower副本。标名为ISR的副本,leader在同步数据时,以同步请求方式。同步请求的数据更具有完整性。
- 普通副本:普通副本,leader在同步数据时,以异步请求方式。异步请求无法保证完整性。
- 如果leader宕机了,肯定是ISR中的副本更加接近leader中的数据。
(7)Producer
生产者即数据的发布者,该角色将消息 push 发布到 Kafka 的 topic 中。
broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中。
生产者发送的消息,存储到一个 partition 中,生产者也可以指定数据存储的 partition。
(8)Consumer
消费者可以从 broker 中 pull 拉取数据。消费者可以消费多个 topic 中的数据。
(9)Consumer Group(CG)
消费者组,由多个 consumer 组成。
所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。可为每个消费者指定组名,若不指定组名则属于默认的组。
将多个消费者集中到一起去处理某一个 Topic 的数据,可以更快的提高数据的消费能力。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,防止数据被重复读取。
消费者组之间互不影响。
消费者数量和topic分区数量的关系如下图,存在三种情况:
- 消费者数量小于分区数量: 每个消费者至少消费一个分区,部分消费者消费多个分区
- 消费者数量等于分区数量:每个消费者消费一个分区
- 消费者数量多于分区数量:部分消费者消费一个分区,多余的消费者不消费消息
(10)offset 偏移量
可以唯一的标识一条消息。
偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息(即消费位置)。
消息被消费之后,并不被马上删除,这样多个业务就可以重复使用 Kafka 的消息。
某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。
消息最终还是会被删除的,默认生命周期为 1 周(7*24小时)。
(11)Zookeeper
Kafka 通过 Zookeeper 来存储集群的 meta 信息。
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中;从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为 __consumer_offsets。
也就是说,zookeeper的作用就是,生产者push数据到kafka集群,就必须要找到kafka集群的节点在哪里,这些都是通过zookeeper去寻找的。消费者消费哪一条数据,也需要zookeeper的支持,从zookeeper获得offset,offset记录上一次消费的数据消费到哪里,这样就可以接着下一条数据进行消费。
3.4 设置数据可靠性
3.4.1 Topic 分区副本
在 Kafka 0.8.0 之前,Kafka 是没有副本的概念的,那时候人们只会用 Kafka 存储一些不重要的数据,因为没有副本,数据很可能会丢失。但是随着业务的发展,支持副本的功能越来越强烈,所以为了保证数据的可靠性,Kafka 从 0.8.0 版本开始引入了分区副本(详情请参见 KAFKA-50)。也就是说每个分区可以人为的配置几个副本(比如创建主题的时候指定 replication-factor,也可以在 Broker 级别进行配置 default.replication.factor),一般会设置为3。
Kafka 可以保证单个分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用)。在众多的分区副本里面有一个副本是 Leader,其余的副本是 follower,所有的读写操作都是经过 Leader 进行的,同时 follower 会定期地去 leader 上的复制数据。当 Leader 挂了的时候,其中一个 follower 会重新成为新的 Leader。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。
Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。
3.4.2 消息确认机制
前面讲过 Kafka 主题对应了多个分区,每个分区下面又对应了多个副本。
为了让用户设置数据可靠性, Kafka 在 Producer 里面提供了消息确认机制。也就是说我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。
可以在定义 Producer 时通过 acks 参数指定(在 0.8.2.X 版本之前是通过 request.required.acks 参数设置的,详见 KAFKA-3043)。这个参数支持以下三种值:
- acks = 0:意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 Kafka 。在这种情况下还是有可能发生错误,比如发送的对象无法被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式, 一定会丢失一些消息。
- acks = 1:意味若 Leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 Leader 选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 Leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入 Leader,但在消息被复制到 follower 副本之前 Leader发生崩溃。
- acks = all(这个和 request.required.acks = -1 含义一样):意味着 Leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。
根据实际的应用场景,我们设置不同的 acks,以此保证数据的可靠性。
另外,Producer 发送消息还可以选择同步(默认,通过 producer.type=sync 配置) 或者异步(producer.type=async)模式。如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将 producer.type 设置为 sync。
综上所述,为了保证数据的可靠性,我们最少需要配置一下几个参数:
- producer 级别:acks=all(或者 request.required.acks=-1),同时发送模式为同步 producer.type=sync
- topic 级别:设置 replication.factor>=3(副本的个数,不能大于集群中broker的个数),并且 min.insync.replicas>=2(ISR的个数);
- broker 级别:关闭不完全的 Leader 选举,即 unclean.leader.election.enable=false;(该字段的默认配置为false,默认情况下leader不能从非ISR(in-sync replicas)的副本列表里选择;因为在非ISR副本列表里选择leader,很有可能会导致部分数据丢失,kafka的可用性就会降低。)
4. 常见问题(面试题)
4.1 Kafka如何避免消息重复消费?
原因分析
导致Kafka消息重复消费有以下两个原因:
(1)第1个原因是 Kafka消费端重复提交导致消息重复消费。
如图所示,在Broker上存储的消息,都有一个Offset标记,用来记录消费者消费消息的位置。Kafka的消费者是通过offSet标记来维护当 前已经消费的数据,每消费一批数据,Broker就会更新offSet的值,避免重复消费。(一台 kafka 服务器就是一个 broker)
而默认情况下,消息消费完以后,会自动提交Offset的值,避免重复消费。
但是Kafka消费端的自动提交,会有一个默认的5秒间隔,也就是说在5秒之后的下一次向Broker拉取消息的时候才提交上一批消费的offset。
所以在消费者消费的过程中,如果遇到应用程序被强制kill掉或者宕机的情况,可能会导致Offset没有及时提交,从而产生重复提交的问题。
(2)第2个原因是 Kafka服务端的Partition再均衡机制导致消息重复消费。
如图所示,在Kafka中有一个Partition Balance机制,就是把多个Partition均衡的分配给多个消费者。消费端会从分配到的Partition里面去消费消息,如果消费者在默认的5分钟内没有处理完这一批消息,就会触发Kafka的Rebalance机制,从而导致offset自动提交失败。而Rebalance之后,消费者还是会从之前没提交的offset位置开始消费,从而导致消息重复消费。
这个5分钟的超时时间默认是由Kafka consumer的
session.timeout.ms
和max.poll.interval.ms
参数共同决定的。如果消息处理时间超过这个间隔,consumer可能会被认为已经死亡,并且分配给它的分区将会重新分配给其他consumer。
解决方案
基于这样的背景下,解决重复消费消息问题的方法有几个:
(1)提高消费端的处理性能避免触发ReBalance,比如可以用多线程的方式来处理消息,缩短单个消息消费的时长。或者还可以调整消息处理的超时时间,也还可以减少一次性从Broker上拉取数据的条数。
(2)使用ConsumerRebalanceListener,再均衡监听器,它可以用来设定发生再均衡动作前后的一些准备或者收尾工作。
(3)生产者发送消息的时候带上一个全局唯一的id,消费者拿到消息后,先根据这个id去redis(或mysql中)查一下,之前没有消费过就处理,并且写入这个id到redis(或mysql中),如果消费过了,则不处理。
4.2 Kafka能保证数据顺序性?
不能,因为一个topic的数据存储在不同的分区上,每个分区都有一个按照顺序存储的偏移量,如果消费者关联了多个分区无法保证顺序性。
如果要保证顺序两种做法:
- 发送消息时指定分区号(或者将需要将 partition 数目设为 1);
- 发送消息按照相同业务设置相同的key,也就是通过hash后key的值相同,分区也就相同。
4.3 Kafka消息积压的原因和处理的方法
积压原因
- 上游数据激增(生产侧原因):由于业务系统,访问量徒增,如热点事件,热门活动等,导致了大量的数据涌入业务系统,有可能导致消息积压
- consumer程序挂掉(消费侧原因):由于下游consumer程序故障也会导致大量消息未消费,从而造成消息积压。
- kafka数据倾斜问题: producer 写入数据时候设置的key 发生数据倾斜,导致过度数据写入少量partition。(也可能是不同消费者的消费速度不一样,导致部分partition数据倾斜)
解决方法
- 如果发现是数据倾斜问题,可以在producer测加盐,缓解倾斜问题(使用Kafka Producer消息时,可以为消息指定key,但是要求key要均匀,否则会出现Kafka分区间数据不均衡。 所以根据业务,合理修改Producer处的key设置规则,解决数据倾斜问题。)。
- 部分场景下,历史数据是没有意义的,比如:股价价格,天气数据,可以重置consumer的offset,直接从latest 消费。
- Kafka分区数是Kafka并行度调优的最小单元,如果Kafka分区数设置的太少,会影响Kafka Consumer消费的吞吐量。 如果数据量很大,Kafka消费能力不足,则可以考虑增加Topic的Partition的个数,同时提升消费者组的消费者数量。
4.4 kafka高性能设计
- 批量发送消息:Producer生成消息发送到Broker,涉及到大量的网络传输,如果一次网络传输只发送一条消息,会带来严重的网络消耗。为了解决这个问题,Kafka采用批量发送的方式,通过将多条消息按照分区进行分组,然后每次发送一个消息集合,从而大大减少了网络传输的 overhead。
- 批量压缩:对比压缩单条消息,同时对多条消息进行压缩,能大幅减少数据量,从而更大程度提高网络传输率。
- 分区分段+索引:Kafka通过索引文件提高对磁盘上消息的查询效率。
- 顺序读:顺序IO相对于随机IO,减少了大量的磁盘寻址过程,提高了数据的查询效率。
- 零拷贝:磁盘文件-内容空间缓存区-网卡接口-消费者进程,避免用户态和内核态两次拷贝,避免cpu上下文切换。
零拷贝是指在数据传输过程中,避免了数据的多次拷贝,从而提高了数据传输的效率。
在传统的IO模型中,数据从磁盘中读取到内核缓冲区,然后再从内核缓冲区拷贝到用户缓冲区,最后再从用户缓冲区拷贝到应用程序中。
而在零拷贝模型中,数据可以直接从内核缓冲区拷贝到应用程序中,避免了数据的多次拷贝,提高了数据传输的效率。零拷贝技术可以通过mmap和sendfile等系统调用实现。
4.5 kafka高可用机制
集群+复制机制(leader+follower----- ISR + 普通)
(1)集群机制
- Kafka集群由多个broker组成,每个broker就是Kafka实例。
- 假如某个broker宕机,在集群中其他的broker依然可以对外提供服务,这个就是集群能够保证高可用性。
(2)分区备份机制
- 分区提供了副本机制,一个分区可能存在多个副本,并且这些副本分别存储在不同的broker中。
- leader副本对外提供读写数据,同一个分区下的所有副本存储的内容是一样的。由leader负责把数据同步给其他follower。
- 当某个分区的leader所在的broker宕机了,就会从其他的broker的follower中选举一个成为新的leader,继续对外服务。这样保证了系统的容错性和高可用性。
4.5.1 follower分类
- ISR副本:in-sync-replica,需要同步复制保存的follower副本。标名为ISR的副本,leader在同步数据时,以同步请求方式。同步请求的数据更具有完整性。
- 普通副本:普通副本,leader在同步数据时,以异步请求方式。异步请求无法保证完整性。
- 如果leader宕机了,肯定是ISR中的副本更加接近leader中的数据。
4.5.2 leader宕机,新的leader选举原则
- ISR配置:在broker配置中,可以设定topic副本数量和ISR最少个数。根据实际情况设置。
- ISR同步保存数据,性能不高,一般至少会设置1个ISR列表,其他的尽量设置为普通副本。保证高可用的前提下,数据的时效性。
- 选举时优先从ISR中选定,因为这个列表中的follower的数据是与leader同步的。
- 如果ISR列表中的follower都不行了,就只能从其他的普通follower中选取。
4.6 死信队列与重试队列
(1)死信队列
由于某些原因消息无法被正确地投递,为了确保消息不会被无故地丢弃,一般将其置于一个特殊角色的队列,这个队列一般称为死信队列。
后续分析程序可以通过消费这个死信队列中的内容来分析当时遇到的异常情况,进而可以改善和优化系统。
理解死信队列,关键是要理解死信。死信可以看作消费者不能处理收到的消息,也可以看作消费者不想处理收到的消息,还可以看作不符合处理要求的消息。
- 比如消息内包含的消息内容无法被消费者解析,为了确保消息的可靠性而不被随意丢弃,故将其投递到死信队列中,这里的死信就可以看作消费者不能处理的消息。
- 再比如超过既定的重试次数之后将消息投入死信队列,这里就可以将死信看作不符合处理要求的消息。
(2)重试队列
重试队列指消费端消费消息失败时,为了防止消息无故丢失而重新将消息回滚到broker中。
重试队列一般分成多个重试等级,每个重试等级一般也会设置重新投递延时,重试次数越多投递延时就越大。
举例:消息第一次消费失败入重试队列Q1,Q1的重新投递延时为5s,5s过后重新投递该消息。如果消息再次消费失败则入重试队列Q2,Q2的重新投递延时为10s,10s过后再次投递该消息,以此类推,重试越多次重新投递的时间就越久,为此还需要设置一个上限,超过投递次数就进入死信队列。
(3)重试队列和延时队列的区别
重试队列与延时队列有相同的地方,都需要设置延时级别。
区别是:延时队列动作由内部触发,重试队列动作由外部消费端触发。
延时队列作用一次,而重试队列的作用范围会向后传递。