一:kafka架构介绍
1. Brokers
kafka集群包括一个或者多个服务器,服务器的节点叫做broker。
2. Topic
- 类似于数据库中的table。
- 物理上不通的topic会分开存储。一个topic的消息会存储在多个broker上。但是在读取的时候,只要选择好topic,不需要管数据在何处。
- 创建流程。
1.controller在ZooKeeper的/brokers/topics节点上注册watcher,当topic被创建,则 controller会通过watch得到该topic的partition/replica分配。
2.controller从/brokers/ids读取当前所有可用的broker列表,对于set_p中的每一个 partition:
2.1从分配给该partition的所有replica(称为AR)中任选一个可用的broker作为新的 leader,并将AR设置为新的ISR
2.2将新的leader和ISR写
入/brokers/topics/[topic]/partitions/[partition]/state
3.controller通过RPC向相关的broker发送LeaderAndISRRequest。 - 删除流程
1.controller在zooKeeper的/brokers/topics节点上注册watcher,当topic被删除,则 controller会通过watch得到该topic的partition/replica分配。
2.若delete.topic.enable=false,结束;否则controller注册在/admin/delete_topics上 的watch被fire,controller通过回调向对应的broker发送StopReplicaRequest。
3. Partition
- Topic的数据分割程伟一个或者多个partition。
- 每个topic至少有一个partition,当生产者产生数据的时候,根据分区分配策略,选择分区,然后将消息最佳到指定分区的末尾。
## Partation数据路由规则
1. 指定了 patition,则直接使用;
2. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
3. patition 和 key 都未指定,使用轮询选出一个 patition。
- 每一条消息都会有自增编号。
标识顺序
○ 用于标识消息的偏移量
○ 每个Partition都有自己独立的编号
- partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。 如果topic有多个partition,消费数据时就不能保证数据的顺序。严格保证消息的消费顺序的场景 下,需要将partition数目设为1。
- 数据存储
每个partition中的数据使用多个segment文件存储。每个segment由.index,.log,timeindex(后续增加进来的)
.log文件结构
包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、
magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等
字段,可以确定一条消息的大小,即读取到哪里截止。
> .index
offset: 7211 position: 448934 CreateTime: 1587632825139 isvalid: true payloadsize: 29 magic: 1 compresscodec: NONE crc: 995429819 payload: 阳光小区,11,1587632825139
offset: 7212 position: 448997 CreateTime: 1587632825139 isvalid: true payloadsize: 28 magic: 1 compresscodec: NONE crc: 2299568067 payload: 单身小区,5,1587632825139
offset: 7213 position: 449059 CreateTime: 1587632825139 isvalid: true payloadsize: 29 magic: 1 compresscodec: NONE crc: 2772987037 payload: 花花小区,12,1587632825139
offset: 7214 position: 449122 CreateTime: 1587632825139 isvalid: true payloadsize: 28 magic: 1 compresscodec: NONE crc: 2369864650 payload: 阳光小区,6,1587632825139
offset: 7215 position: 449184 CreateTime: 1587632825139 isvalid: true payloadsize: 28 magic: 1 compresscodec: NONE crc: 820724779 payload: 单身小区,4,1587632825139
.index文件
offset: 1269114 position: 79002134
offset: 1269231 position: 79009410
offset: 1269316 position: 79014708
offset: 1269456 position: 79023419
offset: 1269715 position: 79039540
offset: 1269838 position: 79047192
offset: 1269933 position: 79053095
offset: 1270083 position: 79062430
如果我们想要读取offset=368776的message(如图),步骤如下:
(1)查找segment file
00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。
当offset=368776时定位到00000000000000368769.index|log
(2)通过segment file查找message
通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。
replication
-
数据会存放到topic的partation中,但是有可能分区会损坏
-
我们需要对分区的数据进行备份(备份多少取决于你对数据的重视程度)
我们将分区的分为Leader(1)和Follower(N)
Leader负责写入和读取数据
Follower只负责备份
保证了数据的一致性 -
备份数设置为N,表示主+备=N(参考HDFS)
Leader
每个partition有多个副本,其中有且仅有一个作为Leader , Leader是当前负责数据的读写的 partition。
- producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
- producer 将消息发送给该 leader
- leader 将消息写入本地 log
- followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
- leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
Follower
- Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower, Follower与Leader保持数据同步。
- 如果Leader失效,则从Follower中选举出一个新的Leader。
- 当Follower挂掉、卡住或者同步太慢, leader会把这个follower从“in sync replicas”(ISR)列表中 删除,重新创建一个Follower。
producer
- 生产者即数据的发布者,该角色将消息发布到Kafka的topic中。
- broker接收到生产者发送的消息后, broker将该消息追加到当前用于追加数据的segment文件中。 生产者发送的消息,存储到一个partition中。
- 生产者也可以指定数据存储的partition。
consumer
- 消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
- kafka 提供了两套 consumer API:
- The high-level Consumer API
- The SimpleConsumer API
high-level consumer API 提供了一个从 kafka 消费数据的高层抽象,而 SimpleConsumer API 则 需要开发人员更多地关注细节。
Consumer Group
- 每个Consumer属于一个特定的Consumer Group (可为每个Consumer指定group name,若不 指定group name则属于默认的group)。
- 将多个消费者集中到一起去处理某一个Topic的数据,可以更快的提高数据的消费能力
- 整个消费者组共享一组偏移量(防止数据被重复读取),因为一个Topic有多个分区
offset偏移量
- 可以唯一的标识一条消息
- 偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息
- 消息被消费之后,并不被马上删除,这样多个业务就可以重复使用kafka的消息
- 我们某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制
- 消息最终还是会被删除的,默认生命周期为1周(7*24小时)
Zookeeper
kafka 通过 zookeeper 来存储集群的 meta 信息。