zookeeper + kafka 消息队列
一、消息队列简介
1、什么是消息队列
消息队列(Message Queue)是一种用于跨进程或分布式系统
中传递消息的通信机制。消息队列在异步通信、系统解耦、负载均衡和容错方面具有重要作用。
(1)特性
-
异步通信:发送方将消息发送到队列中后,不需要等待接收方处理完毕即刻返回继续执行。接收方可以在需要时从队列中读取并处理消息。
-
解耦:消息队列在发送方和接收方之间充当中介,允许它们独立运行。这样,即使其中一个部分暂时不可用,系统的整体功能依然可以保持正常。
-
负载均衡:通过消息队列,多个消费者可以分担消息处理的工作量,提高系统的吞吐量。
-
可靠性:消息队列可以确保消息在传递过程中不丢失,即使在系统出现故障时也能保证消息被妥善处理。
-
扩展性:可以根据需要增加消息生产者或消费者,从而轻松扩展系统。
-
缓冲和流量控制:在高并发的场景下,系统的不同部分可能无法以相同的速度处理请求。消息队列可以作为缓冲区,调节生产者和消费者之间的速度差异,防止系统过载或崩溃。
(2)用途
- 任务队列:在需要异步处理任务的情况下,消息队列非常有用。例如,Web 应用在接收到用户请求后将耗时操作(如视频处理、文件转换)放入队列中,立即返回响应,后续处理由工作线程完成。
- 事件通知:在事件驱动系统中,组件之间可以通过消息队列进行通信和通知。例如,当用户在网站上完成购买操作后,消息队列可以通知库存管理系统更新库存。
- 日志收集:可以将系统中产生的日志消息通过消息队列发送到集中式日志处理系统中,以便进行实时分析和监控。
- 数据流处理:用于处理数据流的系统,如实时分析平台,可以通过消息队列来传递和处理数据。
2、常见的消息队列软件
(1)RabbitMQ
特点:
- 基于AMQP(Advanced Message Queuing Protocol)协议,具有高度灵活性。
- 支持多种消息传递协议,包括 STOMP、MQTT 和 HTTP。
- 提供可靠的消息传递机制,包括消息确认、持久化、发布确认等。
- 支持复杂的路由机制,如使用交换机(Exchanges)和绑定(Bindings)来实现消息路由。
- 易于管理和监控,提供了丰富的管理工具和插件。
用途:
- 企业级应用程序集成。
- 任务调度系统。
- 实时消息通知系统。
优点:
- 功能强大,支持多种协议和复杂的消息路由。
- 稳定性高,适用于企业级应用。
- 良好的文档和社区支持。
缺点:
- 对资源要求较高,配置和维护相对复杂。
- 吞吐量相对较低,不适合高吞吐量的实时数据流处理。
(2)Apache Kafka
特点:
- 分布式流处理平台,提供高吞吐量、低延迟的消息传递。
- 支持持久化,所有消息都被持久化到磁盘上,可以通过配置保留时间。
- 基于发布-订阅模型,消费者可以独立地读取和处理消息。
- 支持分区(Partition)和副本(Replica),保证高可用性和容错性。
- 强大的扩展性,能够轻松扩展以处理大规模数据。
用途:
- 大数据实时处理和分析。
- 日志收集和处理。
- 流式数据处理(如物联网数据、金融交易数据)。
优点:
- 高吞吐量,适合处理大量实时数据。
- 低延迟,消息传递和处理速度快。
- 可扩展性强,适合大规模分布式系统。
缺点:
- 学习曲线较陡,配置和管理相对复杂。
- 消费者需要处理消息的顺序性和幂等性,增加了应用程序的复杂性。
(3)ActiveMQ
特点:
- 基于JMS(Java Message Service)规范的开源消息中间件。
- 支持多种消息传递协议,包括 OpenWire、STOMP、MQTT、AMQP、REST、WebSocket 等。
- 提供丰富的特性,如消息持久化、事务、消息优先级等。
- 易于嵌入 Java 应用程序,支持多语言客户端(如 C++、.NET、Python、Perl、PHP)。
- 提供了管理控制台和监控工具,便于管理和监控消息系统。
用途:
- 企业应用集成。
- 任务调度和自动化流程。
- 消息驱动的微服务架构。
优点:
- 功能全面,支持多种协议和语言。
- 与 Java 应用程序集成良好。
- 社区活跃,文档丰富。
缺点:
- 性能相对较低,不适合高吞吐量的场景。
- 在处理大规模分布式系统时,扩展性有限。
(4)Amazon SQS
特点:
- AWS 提供的完全托管的消息队列服务,无需管理服务器和基础设施。
- 提供标准队列和 FIFO(先入先出)队列,满足不同的消息传递需求。
- 支持自动扩展,能够处理任意数量的消息。
- 与 AWS 生态系统无缝集成,易于与其他 AWS 服务(如 Lambda、S3、SNS)集成。
- 提供消息可见性超时、消息延迟、消息批处理等功能。
用途:
- 分布式系统中的异步任务处理。
- 微服务架构中的消息传递。
- 日志和事件驱动的工作流。
优点:
- 托管服务,无需运维,简单易用。
- 高可用性和可靠性,由 AWS 提供保障。
- 可与 AWS 生态系统中的其他服务轻松集成。
缺点:
- 成本可能较高,特别是在高消息吞吐量的场景中。
- 对于非常高性能和定制化需求的场景,灵活性可能不足。
二、kafka
Apache Kafka 是一个分布式流处理平台,最初由LinkedIn开发,并于2011年成为Apache开源项目。Kafka主要用于构建实时数据管道和流处理应用程序,它具有高吞吐量、低延迟、可扩展性和容错性。
1、kafka 架构
Kafka 的架构主要包括以下几个核心组件:
1. Producer(生产者)
生产者是负责发布消息到 Kafka 主题的客户端。生产者可以选择将消息发送到特定的分区,也可以通过键进行分区路由。
2. Consumer(消费者)
消费者是负责从 Kafka 主题中读取消息的客户端。消费者可以订阅一个或多个主题,并以流的方式处理数据。消费者通常会组成一个消费者组(Consumer Group),每个消费者组可以同时读取和处理消息,实现负载均衡。
3. Broker(代理)
Kafka 集群中的每个服务器称为一个代理(Broker)。每个代理负责接收、存储和转发消息。一个 Kafka 集群通常由多个代理组成,以实现高可用性和容错性。
4. Topic(主题)
- 主题是 Kafka 中数据的分类容器。每个主题可以有多个生产者和消费者。主题可以分为多个分区,每个分区内的消息是有序的,但跨分区的消息顺序不保证。
- 在 Kafka 中,一个代理(Broker)可以承载多个主题(Topic)。具体有多少个主题取决于 Kafka 集群的配置和使用场景。
(1)分区内的顺序性
- 在一个分区内,消息是严格有序的,这意味着消息的生产顺序与消费顺序一致。消费者在消费消息时,按照偏移量顺序读取消息。
(2)跨分区的无序性
- 虽然分区内的消息是有序的,但不同分区之间的消息顺序不保证。跨分区的消息无序性是由于分区是独立的并行处理单元,生产者可以同时向多个分区发送消息,导致整体顺序无法保证。
(3)特征
-
多生产者和多消费者:一个主题可以有多个生产者发布消息,也可以有多个消费者订阅和消费消息。这种特性使得 Kafka 可以轻松实现多对多的数据传递。
-
逻辑分组:主题用于将相同类别的数据进行逻辑分组,便于管理和处理。例如,可以创建一个主题来存储网站的访问日志,另一个主题来存储订单信息。
5. Partition(分区)
分区是 Kafka 中并行处理的基本单元。每个分区在磁盘上是一个日志文件,消息以追加的方式写入。分区提供了高吞吐量和并行处理能力。
(1)特性
- 顺序性:在一个分区内,消息是严格有序的。每条消息都有一个唯一的偏移量(Offset),表示消息在分区中的位置。消费者在消费消息时,会按照偏移量顺序逐条读取消息。
- 并行处理:主题的分区允许 Kafka 在多个服务器上并行处理消息。生产者可以将消息分布到不同的分区,消费者也可以从多个分区并行读取消息,从而实现负载均衡和高吞吐量。
- 副本机制:每个分区可以有多个副本(Replica),其中一个为领导者(Leader),其余为追随者(Follower)。所有的读写操作都由领导者处理,追随者从领导者复制数据。副本机制保证了数据的高可用性和容错性。
6. Zookeeper
Zookeeper 负责存储集群的元数据,如代理信息、分区状态等。它还负责选举 Kafka 集群的控制器,管理分区的副本分配和故障恢复。
2、partition的路由规则与默认策略
(1) Partition 的数据路由规则
-
数据路由规则决定了生产者如何将消息发送到 Kafka Topic 的各个 Partition。Kafka 提供了几种常见的 Partitioning 策略,具体如下:
-
Round-robin(轮询):这是最简单的策略,生产者轮流将消息发送到不同的 Partition。如果所有 Partition 都有相似的负载和数据量,这种策略可以实现基本的负载均衡。但是,它不能保证消息的相关性或有序性。
-
Hash-based(基于哈希):生产者使用消息的某个属性(如 key)计算哈希值,然后根据哈希值将消息路由到对应的 Partition。这种策略可以保证具有相同 key 的消息总是被发送到同一个 Partition,从而保证了这些消息的顺序性。
-
Custom Partitioner(自定义分区器):开发者可以根据自己的需求实现自定义的 Partitioner 接口,来控制消息的分区逻辑。这种方式灵活性最高,可以根据业务需求定义非常复杂的分区逻辑。
(2) 默认的 Partitioning 策略
-
在 Kafka 中,默认的 Partitioning 策略是基于消息的 key 进行哈希分区。具体步骤如下:
-
如果消息有 key,则使用 key 进行哈希计算,然后将哈希值与 Topic 的 Partition 数取模,以确定消息发送到哪个 Partition。
-
如果消息没有 key,则使用轮询策略,即将消息依次发送到每个 Partition,实现简单的负载均衡。
3、消费者组详细介绍
(1)介绍
-
消费者组成员:
- 消费者组由多个消费者实例(Consumer Instance)组成。
- 每个消费者实例通常运行在不同的进程或者不同的计算机上。
-
主题分区分配:
- 消费者组的每个实例会订阅一个或多个主题。
- 每个主题被分为多个分区(Partitions),每个分区只能由消费者组中的一个实例进行消费。
- Kafka 通过分区的分配策略将分区均匀分配给消费者组中的各个实例,确保每个分区只被一个消费者实例消费,但一个消费者实例可以消费多个分区。
-
消费者组协调器:
- Kafka 集群中有一个特殊的 Broker 作为消费者组的协调器(Consumer Group Coordinator)。
- 每个消费者组有一个唯一的消费者组 ID,由协调器管理和分配。
(2)消费者组的工作机制
-
消费者协调与分配:
- 当消费者组中的一个新消费者实例加入或离开时,或者分区发生重新分配时,消费者组的协调器负责重新分配分区给消费者实例。
- 分区的分配基于消费者实例的订阅和分配策略(Partition Assignment Strategy),如范围分配、轮询分配等。
-
消息处理并行性:
- 每个分区内的消息是有序的,而不同分区之间的消息顺序不保证。
- 消费者组中的每个消费者实例在处理分区内的消息时是单线程的,但是多个实例可以并行处理不同分区的消息,从而提高整体的消费吞吐量。
-
消费者偏移提交:
- 消费者组中的每个消费者实例会周期性地提交消费偏移(Offset)到 Kafka 集群,以记录其对消息的消费进度。
- Kafka 提供了自动和手动两种提交偏移的方式,确保消息处理的可靠性和一致性。
(3)消费者组的应用场景
-
消费者组在 Kafka 中有多种应用场景,包括但不限于:
-
并行处理:多个消费者实例并行处理同一主题的消息,提高消费吞吐量和效率。
-
水平扩展:通过增加消费者实例,可以水平扩展消费者组的处理能力,适应大规模数据流的需求。
-
容错和高可用性:当一个消费者实例故障或下线时,协调器会重新分配其负责的分区给其他实例,确保消息的连续性和可用性。
4、偏移量详细介绍
(1)偏移量概念
-
偏移量是一个64位的整数,用来唯一标识消费者在一个特定分区中已经消费过的消息位置。每个消费者都会为每个分区维护一个偏移量。偏移量的作用包括:
-
消费位置的记录:偏移量表示消费者已经处理并成功提交的消息位置。消费者会定期地更新偏移量,以记录自己的消费进度。
-
消息处理的顺序性:Kafka 保证每个分区内的消息顺序,消费者通过记录偏移量来确保消息的有序消费,避免重复消费或消息丢失。
-
消费者的恢复:如果消费者实例停止或重启,它可以利用存储的偏移量来恢复消费位置,从上次离开的地方继续消费,而不会丢失消息。
(2)偏移量的管理方式
在 Kafka 中,偏移量的管理可以通过以下几种方式实现:
- 自动提交偏移量:
- 消费者可以选择开启自动提交偏移量的功能,Kafka 将定期(根据配置的间隔)自动将消费者的偏移量提交到 Kafka 集群中。
- 这种方式简单方便,但可能会因为提交频率不合适而导致消息的重复消费或丢失。
- 手动提交偏移量:
- 消费者可以显式地在适当的时候手动提交当前的偏移量。
- 手动提交偏移量可以精确地控制偏移量的提交时机,避免自动提交可能出现的问题,如重复消费或消息丢失。
- 偏移量存储:
- 消费者通常会将偏移量存储在外部系统中,如数据库或文件系统。这样做的好处是,即使消费者实例重新启动或扩展,也能够方便地恢复之前的消费进度。
(3)偏移量的生命周期
偏移量的生命周期包括:
- 消费者组内偏移量管理:
- 每个消费者组内的每个消费者实例会独立地管理自己负责的分区的偏移量。
- 每次消费者处理完消息后,会更新并提交偏移量。这个过程可以是自动的(由 Kafka 管理)或者手动的(由应用程序控制)。
- 偏移量的保留和删除:
- Kafka 默认会保留消费者组的偏移量信息一段时间,以便在消费者重新加入或恢复时使用。
- 可以通过 Kafka 配置来调整偏移量的保留时间(retention period)或存储方式。
- 偏移量的提交:
- 偏移量提交到 Kafka 集群后,会被持久化存储,确保在 Kafka 集群故障或消费者重启时,偏移量信息不会丢失。
5、kafka中的副本同步
在 Apache Kafka 中,副本(Replica)的同步是通过一种基于日志的复制机制来实现的,具体过程如下:
(1) 副本同步基本原理
- 领导者和追随者:
- 每个分区(Partition)都有一个领导者(Leader)和若干个追随者(Follower)。
- 领导者负责处理所有的读写请求,并维护分区的写入顺序。
- 追随者从领导者复制数据,并保持与领导者的数据同步。
(2) 副本同步过程
副本同步的过程主要分为两个阶段:首先是领导者将数据写入本地日志(Leader Log),然后追随者从领导者的日志中复制数据。
(2.1) 领导者写入本地日志(Leader Log)
-
生产者发送消息:
- 当生产者向 Kafka 发送消息时,消息首先写入分区的领导者副本(Leader Replica)的本地日志中。
-
消息确认:
-
消息发送确认类型:
-
Kafka 提供了三种消息发送确认类型,分别是:
- acks=0:生产者发送消息后,不等待服务端的任何确认,消息被认为已发送成功。
- acks=1:生产者发送消息后,等待服务端的领导者副本确认接收到消息后,即认为消息发送成功。
- acks=all(或 acks=-1):生产者发送消息后,等待服务端的所有 ISR(同步副本)确认接收到消息后,即认为消息发送成功。
-
-
消息持久性保证:
-
根据不同的确认类型,Kafka 提供了不同级别的消息持久性保证:
- acks=0:消息可能会丢失,适用于对数据实时性要求高,但可以容忍少量消息丢失的场景。
- acks=1:消息被确认写入领导者副本后即认为成功,数据不会丢失,但可能会存在一定程度的重复发送。
- acks=all:消息被所有 ISR 中的副本确认写入后才认为成功,提供了最高级别的数据持久性保证,但延迟较高。
-
-
生产者确认机制:
- 当生产者发送消息后,可以通过设置
acks
参数来选择确认级别。生产者可以通过配置来平衡消息传递的可靠性和延迟。
- 当生产者发送消息后,可以通过设置
-
消息确认机制的工作流程
- 生产者发送消息到 Kafka 集群的指定主题和分区。
- 根据生产者配置的
acks
参数,生产者可能会等待领导者副本或所有 ISR 中的副本确认消息的接收。 - 如果确认级别设为
acks=all
,生产者会等待所有 ISR 中的副本都确认接收消息,然后才会收到确认。 - 如果确认级别设为
acks=1
,生产者会等待领导者副本确认接收消息,并且不会等待其他 ISR 中的副本确认。
-
(2.2) 追随者从领导者复制数据
-
追随者复制数据:
- 追随者定期从领导者获取数据块(batch)并复制到本地的日志中。
- 追随者使用 Kafka 协议从领导者拉取数据,确保领导者和追随者之间的数据一致性。
-
同步方式:
-
Kafka 支持两种类型的复制同步方式:
-
同步复制
- 在 Kafka 中,同步复制是指追随者副本在接收到数据后必须向领导者发送确认,领导者才会继续处理新的写入请求。
- 这种方式确保了每条消息在所有的 ISR 中的副本都得到了写入确认,保证了数据的可靠性和一致性,但会增加延迟。
-
异步复制
- 异步复制是指追随者副本在接收到数据后不会向领导者发送确认,领导者可以立即继续处理新的写入请求。
- 追随者会在后台异步地复制数据,这样可以降低写入操作的延迟,但可能会造成一段时间内领导者和追随者之间的数据不一致。
-
(3) 同步机制细节
-
数据批次:
- Kafka 使用数据批次(batches)来减少网络开销和提高效率。领导者会将多个消息组合成一个批次,然后一次性发送给追随者。
-
保序性:
- Kafka 保证同一分区的消息在所有的副本之间的顺序一致性。即使跨越多个副本,消息也会按照写入的顺序进行复制和提交。
-
ISR 机制:
- ISR 是指与领导者保持同步的副本集合。只有在 ISR 中的副本才能参与到消息的读写操作中,确保了高可用性和一致性。
(4) 故障处理
-
领导者故障:
- 如果领导者宕机,Kafka 会从 ISR 中选择一个新的领导者。
- 新的领导者会继续从上一个领导者复制未提交的数据,并负责后续的写入操作。
-
追随者故障:
- 如果追随者宕机,领导者会继续向其他追随者发送数据,直到该追随者重新加入 ISR 并恢复复制。
6、ISR 的定义和作用
(1)ISR 的定义:
- ISR 是指与分区的领导者副本保持同步的一组副本。领导者会将消息写入 ISR 中的所有副本,确保数据的一致性和可用性。
- ISR 中的副本在领导者视角下是“同步的”,即它们已经接收并复制了领导者的所有写入操作。
(2)ISR 的作用:
- 确保高可用性:当领导者失效时,Kafka 可以从 ISR 中选择一个新的领导者,而无需等待其他副本(不在 ISR 中的副本)复制完整份数据。
- 提高性能:只有 ISR 中的副本才参与到读写请求的处理中,这样可以减少网络延迟和提高性能。
7、kafka文件存储机制概述
(1)日志分段(Log Segments)
- 概念:
- Kafka 将每个主题的每个分区的消息以日志分段(Log Segments)的形式进行持久化存储。每个分区都有一个或多个日志分段,每个日志分段对应一个日志段文件。这种存储模型包括日志文件(Log Segments)和索引文件(Index Files),其存储在同一个文件夹下的不同文件中。
- 日志分段文件:
- 每个日志分段文件都是一个独立的文件,用于存储一定时间范围内或达到一定大小的消息数据。
- Kafka 使用预先配置的参数(例如
log.segment.bytes
控制默认大小,默认为 1GB)来确定何时创建新的日志分段文件。
- 日志分段文件的特点:
- 每个日志分段文件都有一个唯一的起始偏移量(Offset)和一个范围,记录了该段文件内包含的消息的偏移量范围。
- 新消息会追加到当前活跃的日志分段文件中,当文件大小达到预设阈值时,Kafka 将关闭该文件并创建一个新的日志分段文件。
(2) 索引文件(Index Files)
- 概念:
- 为了快速查找消息,Kafka 使用索引文件(Index Files)来存储消息偏移量和物理位置之间的映射关系。
- 索引文件按照预设大小(默认为 4KB)划分成索引条目,每个条目存储一段消息的偏移量和物理位置。
- 索引文件的作用:
- 当消费者需要读取消息时,它可以通过索引文件快速定位到对应消息的物理位置,而无需扫描整个日志文件。
- 索引文件大大提高了消息的读取效率,特别是在大型分区和高吞吐量的情况下。
(3) 清理策略(Retention Policy)
- 概念
- Kafka 支持基于时间和大小的消息保留策略,通过配置参数来控制消息在日志分段中的保留时长和大小。
- 主要参数包括:
log.retention.hours
:指定消息在分段中保留的时间,默认为 7 天。log.retention.bytes
:指定分段文件的最大大小,默认为 -1(无限制)。
- 清理过程
- 根据配置的保留策略,Kafka 定期扫描日志分段文件,并删除过期或超出大小限制的分段文件。
- 清理策略确保了存储空间的有效利用,同时保证了数据的有效性和可靠性。
(4) 日志压缩(Log Compaction)
- 概念:
- 对于一些需要保留最新状态的数据,Kafka 提供了日志压缩(Log Compaction)功能。
- 日志压缩会定期检查主题的消息,保留每个键的最新消息,而删除过时的或重复的消息,从而节省存储空间。
- 应用场景:
- 日志压缩特别适合用于存储键值对的场景,如状态存储或事件日志,确保最新状态的数据不会因为历史数据的堆积而被覆盖。
(5)具体配置
-
默认情况下,Kafka 的日志文件和索引文件存储在一个或多个目录中,这些目录由
log.dirs
参数指定。 -
log.dirs
参数可以配置为一个或多个目录路径,多个路径之间用逗号分隔。这样做的目的是为了提供数据的冗余备份和提高性能。 -
当 Kafka 创建新的日志分段文件或索引文件时,它会依次选择配置的目录路径之一来存储文件。
-
配置示例
在 Kafka 的配置文件(通常是 server.properties
)中,可以配置 log.dirs
参数,例如:
log.dirs=/path/to/kafka/logs
如果需要配置多个存储路径,可以用逗号分隔:
log.dirs=/path/to/kafka/logs1,/path/to/kafka/logs2
8、LEO(Log End Offset)和 HW(High Watermark)
(1)概念
- LEO(Log End Offset):指的是每个副本(包括 leader 和 follower)当前最大的消息偏移量(offset)。即,当前副本中最新消息的 offset。
- HW(High Watermark):指的是消费者能够见到的最大的 offset,也就是所有副本中最小的 LEO。HW 之前的所有消息被认为是已经提交和可靠的。
(2)处理机制
1. Follower 故障处理
- 故障情况
- 如果一个 follower 发生故障,它会暂时被踢出 ISR(In-Sync Replicas,同步副本集合)。
- 当 follower 恢复后,它会从本地磁盘读取上次的 HW,即从已经被确认的最高偏移量开始。
- follower 将本地 log 文件中高于 HW 的部分截掉,然后从 HW 开始向 leader 进行数据同步。
- 一旦 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader,就可以重新加入 ISR,参与到消息的复制和同步中。
2. Leader 故障处理
- 故障情况
- 如果 leader 发生故障,Kafka 会从 ISR 中选出一个新的 leader。
- 新的 leader 被选出后,其余的 follower 会进行以下操作:
- 首先,它们会将自己本地 log 文件中高于 HW 的部分截掉,保留 HW 之前的数据。
- 然后,它们开始从新的 leader 处获取数据进行同步,确保数据的一致性和可靠性。
9、kafka常用命令
(1)Kafka 根目录结构
Kafka 的安装目录结构通常如下所示:
kafka/
├── bin/ # 包含所有 Kafka 命令行工具的目录
├── config/ # 存放 Kafka 配置文件的目录
├── libs/ # 存放 Kafka 所需的库文件
├── logs/ # 存放 Kafka 日志文件的目录
└── ...
(2) 常用 Kafka 命令及使用示例
(2.1)创建和管理主题(Topics)
-
创建主题:
./bin/kafka-topics.sh --create --topic <topic_name> --partitions <num_partitions> --replication-factor <replication_factor> --bootstrap-server <broker_list> --replication-factor <replication_factor>:指定每个分区的副本数。 --bootstrap-server <broker_list>:指定连接的 Kafka Broker 列表。
-
示例:
./bin/kafka-topics.sh --create --topic myTopic --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
-
说明:在 Kafka 安装目录下的
bin/
目录中执行命令。
-
-
查看主题列表:
./bin/kafka-topics.sh --list --bootstrap-server <broker_list>
- 示例:
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
- 示例:
-
查看主题详细信息:
./bin/kafka-topics.sh --describe --topic <topic_name> --bootstrap-server <broker_list>
- 示例:
./bin/kafka-topics.sh --describe --topic myTopic --bootstrap-server localhost:9092
- 示例:
(2.2)生产者和消费者操作
-
生产者发送消息:
./bin/kafka-console-producer.sh --topic <topic_name> --bootstrap-server <broker_list>
-
示例:
./bin/kafka-console-producer.sh --topic myTopic --bootstrap-server localhost:9092
-
-
消费者消费消息:
./bin/kafka-console-consumer.sh --topic <topic_name> --from-beginning --bootstrap-server <broker_list>
-
示例:
./bin/kafka-console-consumer.sh --topic myTopic --from-beginning --bootstrap-server localhost:9092
-
(2.3 )消费者组管理
-
查看消费者组列表:
./bin/kafka-consumer-groups.sh --list --bootstrap-server <broker_list>
-
示例:
./bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
-
-
查看消费者组消费情况:
./bin/kafka-consumer-groups.sh --describe --group <group_name> --bootstrap-server <broker_list>
-
示例:
./bin/kafka-consumer-groups.sh --describe --group myConsumerGroup --bootstrap-server localhost:9092
-
(2.4) 其他管理和调试命令
-
查看集群信息:
./bin/kafka-topics.sh --describe --topic <topic_name> --bootstrap-server <broker_list>
-
示例:
./bin/kafka-topics.sh --describe --topic myTopic --bootstrap-server localhost:9092
-
-
查看 Broker 日志:
tail -f logs/server.log
-
示例:在 Kafka 根目录下执行,查看当前 Kafka Broker 的日志。
tail -f logs/server.log
-
三、zookeeper
- ZooKeeper(动物园管理员)是一个开源的分布式协调服务,旨在提供一个高度可靠的协作基础,用于分布式应用程序中的数据管理、配置管理、命名服务等任务。
1、zookeeper的工作机制
ZooKeeper 的工作机制主要围绕其设计的分布式一致性服务和协作基础展开,它通过一些关键的设计和算法来实现高可靠性和可扩展性。以下是 ZooKeeper 的工作机制的详细解释。
(1)集群模式
-
ZooKeeper 集群:ZooKeeper 以集群的方式运行,每个集群节点称为 ZooKeeper 服务器。集群中的节点数量通常为奇数,例如 3、5、7 等,以实现多数派选举和容错性。
-
客户端连接:客户端通过连接到任意一个 ZooKeeper 服务器来与整个集群进行交互,这些服务器相互之间通过 TCP/IP 进行通信。
(2)数据模型
-
ZooKeeper 的数据模型:ZooKeeper 提供了类似文件系统的数据模型,称为 ZooKeeper 数据树(ZooKeeper Data Tree)。
-
/ ├── apps │ ├── app1 │ └── app2 ├── config │ ├── global │ │ ├── setting1 │ │ └── setting2 │ └── local │ └── setting3 └── services ├── service1 │ ├── instances │ │ ├── instance1 │ │ └── instance2 │ └── status └── service2
-
根节点
/
是 ZooKeeper 数据树的起始点。apps
、config
和services
是一级子节点。apps
下有app1
和app2
两个子节点。config
下有global
和local
两个子节点,global
下包含setting1
和setting2
。services
下有service1
和service2
,service1
下有instances
和status
两个子节点,instances
下有instance1
和instance2
。
-
-
节点(ZNode):ZooKeeper 数据树中的每个节点称为 ZNode,类似于文件系统中的目录或文件。每个 ZNode 可以存储一小段数据,并且可以关联 ACL(访问控制列表)来限制访问权限。
(3) 原子广播协议(ZAB)
-
ZAB 协议:ZooKeeper 使用 ZAB(ZooKeeper Atomic Broadcast)协议来实现数据的一致性和可靠性。
-
基本概念:
- Leader 选举:ZAB 协议确保集群中只有一个 Leader 负责处理客户端请求和更新。
- 原子广播:当 Leader 接收到客户端的写请求时,它将请求广播给所有的 Follower,并等待大多数 Follower 的确认,然后才将操作应用到自身状态。
- 事务 ID:每个事务都有一个全局唯一的事务 ID,用于标识和排序。
(4)一致性和持久性
-
一致性保证:ZooKeeper 保证了在分布式环境下的数据一致性,所有的更新操作都是原子性的,并且按照客户端的顺序执行。
-
持久性存储:ZooKeeper 将数据存储在磁盘上,并使用写前日志(Write-Ahead Log,WAL)来保证即使在节点故障后也不会丢失数据。
(5)事件监听和通知
-
Watch 机制:ZooKeeper 允许客户端注册 Watcher 监听某个 ZNode 的状态变化。
-
事件通知:一旦 ZNode 的状态发生变化(如数据更新、ZNode 删除等),ZooKeeper 将通知所有注册了 Watcher 的客户端,使得客户端可以及时响应状态变化。
2、应用场景
ZooKeeper 的工作机制使其在以下场景中广泛应用:
-
分布式锁:通过创建临时顺序节点实现分布式锁,确保在分布式环境下的资源竞争问题。
-
选举机制:通过 ZAB 协议实现 Leader 选举,确保分布式系统中只有一个主节点处理请求。
-
配置管理:存储和管理配置信息,各个节点通过 Watcher 实时感知配置的变化。
-
服务注册与发现:将服务节点作为 ZNode 注册到 ZooKeeper 中,客户端通过 Watcher 发现服务的上线和下线状态。
3、特点
分布式协调:
- ZooKeeper 提供了一个分布式环境下的一致性服务,帮助应用程序协调多个节点之间的操作和状态。
数据管理:
- 应用程序可以使用 ZooKeeper 存储和管理关键的元数据、配置信息以及其他动态数据。这些数据可以被多个节点共享和访问。
配置管理:
- ZooKeeper 提供了一个轻量级的分布式配置管理系统,可以集中管理和动态更新应用程序的配置。
命名服务:
- 作为一个命名服务,ZooKeeper 允许应用程序注册和发现服务、节点或者资源的信息,从而简化了分布式系统中的服务发现和通信。
分布式锁:
- ZooKeeper 提供了高效的分布式锁机制,允许多个进程或者线程在分布式环境中安全地竞争资源访问。
事件通知:
- 客户端可以注册监听器以接收关于数据变更和状态变化的通知,这使得应用程序可以实时响应系统状态的变化。
4、zookeeper 选举机制
(1)角色介绍
- Leader:负责处理所有的客户端写请求,并进行数据更新操作。Leader 也负责协调和同步所有的 Follower 节点。
- Follower:复制 Leader 的状态,并处理客户端的读请求。如果 Leader 失效,Follower 可以参与选举新的 Leader。
- Observer:观察者节点,接收并复制 Leader 的状态,但不参与投票和选举过程。用于扩展集群的读取能力,减少对 Leader 的读取压力。
(2)zookeeper 节点状态
-
LOOKING(选举状态):
-
节点正在寻找新的 Leader,即处于选举过程中。
-
发起选举请求,尝试成为新的 Leader。
-
-
FOLLOWING(跟随状态):
-
Follower 节点的状态。
-
节点已经确定当前的 Leader,并跟随 Leader 处理请求。
-
-
LEADING(领导状态):
-
Leader 节点的状态。
-
节点负责处理和协调整个集群的写请求和事务处理。
-
-
OBSERVING(观察状态):
-
Observer 节点的状态。
-
类似于 Follower,但专门用于扩展集群的读取能力。
-
不参与 Leader 的选举过程,仅处理读请求,提升系统的性能和吞吐量。
-
(3)第一次启动选举机制流程
-
启动阶段:
- 当一个新的 ZooKeeper 节点首次启动时,它会进入初始状态,并尝试加入到已经存在的集群中。
- 如果此时集群中还没有 Leader(即第一次启动或者之前的 Leader 已经失效),新节点会启动选举过程。
-
节点状态:
- 新节点将自己设置为 LOOKING 状态,表示它正在寻找新的 Leader。
-
投票:
- 新节点会向集群中的其他节点发送选举请求,请求其他节点投票支持自己成为 Leader。
- 每个节点会为自己投票,并将投票结果广播给整个集群。
-
多数派原则:
- 根据 ZooKeeper 的多数派原则,如果新节点能够获得超过半数节点的投票支持(N/2 + 1),它将成为新的 Leader。
- 这确保了选出的 Leader 能够获得集群的大部分节点的认可和支持。
-
Leader 确定:
- 一旦新节点获得了多数节点的投票支持,它将成为新的 Leader。
- 其他节点将根据新的 Leader 的通知和广播更新自己的状态,标记新的 Leader。
-
系统启动:
- 一旦选举完成,ZooKeeper 系统将正式启动,新的 Leader 节点开始处理客户端的写请求,并协调整个集群的状态。
(4)注意事项
-
节点数和半数原则:
- ZooKeeper 集群中至少需要有奇数个节点,这样才能确保在选举中始终有一半以上的节点支持新的 Leader 的选举。
-
超时机制:
- 在选举过程中,每个节点会设置一个超时时间。如果在超时时间内没有达成半数投票,节点将重新发起新一轮的选举。
-
持久化:
- ZooKeeper 通过持久化存储和写前日志(WAL)来保证选举操作和数据更新的持久性,确保即使在节点故障或重启后,系统状态可以恢复到之前的正确状态。
(5)非第一次选举机制
- 非第一次选举机制是指在已经运行的 ZooKeeper 集群中,当现有的 Leader 出现故障或失效时,集群如何重新选举新的 Leader 的过程。这种选举机制的执行确保了集群在面对节点故障或网络分区等情况时能够快速恢复并保持高可用性。
-
Leader 失效检测:
- 集群中的各个节点(包括 Followers 和 Observers)会定期检测 Leader 节点的健康状态。
- 如果节点发现 Leader 失效(例如无法与 Leader 通信或未能及时响应),则认为当前 Leader 失效。
-
触发选举:
- 一旦有足够多的节点检测到 Leader 失效,它们会自动启动选举过程。
- 触发选举的节点会向集群中广播选举请求,希望其他节点投票支持自己成为新的 Leader。
-
投票过程:
- 每个节点收到选举请求后会为自己投票,并将投票信息广播给集群中的其他节点。
- 节点根据收到的投票数量来判断是否获得了多数派的支持(超过半数节点的投票支持)。
-
选举规则:
-
EPOCH(选举周期)优先:
- 每个服务器都有一个称为EPOCH的选举周期标识。EPOCH较大的服务器将优先成为新的Leader。
- 这确保了如果有多个服务器同时尝试成为Leader,具有最新选举周期的服务器将胜出。
事务ID(ZXID)优先:
- 如果多个服务器的EPOCH相同,ZooKeeper将比较它们最近的事务ID(ZXID)。
- 具有较大ZXID的服务器将优先成为Leader。ZXID表示ZooKeeper事务的全局唯一标识符,通常较大的ZXID表示服务器上存储的最新状态。
服务器ID(SID)优先:
- 如果EPOCH和ZXID都相同,ZooKeeper将比较服务器的ID(SID)。
- 具有较大SID的服务器将成为新的Leader。服务器ID是在集群配置时分配的唯一标识符,较大的SID通常意味着服务器在集群中具有更高的优先级。
-
-
多数派原则:
- 根据 ZooKeeper 的多数派原则,只有获得多数节点的投票支持的节点才能成为新的 Leader。
- 这确保了选出的新 Leader 能够获得集群中大部分节点的认可和支持,从而确保系统的一致性和可用性。
-
新 Leader 确定:
- 一旦某个节点获得了足够的投票支持,它将成为新的 Leader。
- 其他节点将根据新 Leader 的通知和广播更新自己的状态,标记新的 Leader。
-
系统恢复:
- 新的 Leader 被选举出来后,它将接管集群的所有写请求和事务处理,确保集群状态的一致性和可用性。
- 集群恢复正常运行,继续处理客户端的请求和事务操作。
(6)注意事项
- 超时机制:
- 在选举过程中,每个节点会设置一个超时时间。如果在超时时间内没有达成半数投票,节点将重新发起新一轮的选举,确保集群能够迅速恢复和响应。
- 节点状态变更:
- 节点在从 FOLLOWING 或 OBSERVING 状态切换到 LOOKING 状态后,开始参与选举过程。
- 选举完成后,节点可能会切换到 FOLLOWING 或 OBSERVING 状态,根据其在集群中的角色来处理读取请求或观察集群状态。
- 持久化和日志:
- ZooKeeper 通过持久化存储和写前日志(WAL)来保证选举操作和数据更新的持久性,即使在节点故障或重启后,系统状态也能够快速恢复到之前正确的状态。
5、zookeeper常用命令
(1)启动和管理 ZooKeeper 服务
-
启动 ZooKeeper 服务
bin/zkServer.sh start
-
停止 ZooKeeper 服务
bin/zkServer.sh stop
-
重启 ZooKeeper 服务
bin/zkServer.sh restart
-
查看 ZooKeeper 服务状态
bin/zkServer.sh status
(2)使用 ZooKeeper 客户端
-
启动 ZooKeeper 客户端
bin/zkCli.sh
-
连接到特定的 ZooKeeper 服务器
bin/zkCli.sh -server 127.0.0.1:2181
(3)ZooKeeper 客户端命令
在启动了 ZooKeeper 客户端后,可以使用以下命令:
-
创建 znode
create /my-node "some data"
-
获取 znode 数据
get /my-node
-
设置 znode 数据
set /my-node "new data"
-
删除 znode
delete /my-node
-
列出 znode
ls /
-
查看 znode 状态
stat /my-node
(4)高级管理命令
-
设置集群模式 在
conf/zoo.cfg
中配置多台服务器:server.1=server1:2888:3888 server.2=server2:2888:3888 server.3=server3:2888:3888
-
管理 ACL(访问控制列表)
setAcl /my-node world:anyone:r
(5)系统服务管理
将 ZooKeeper 设置为系统服务可以简化管理:
-
创建 Systemd 服务文件 在
/etc/systemd/system
目录下创建zookeeper.service
文件:[Unit] Description=ZooKeeper After=network.target [Service] Type=forking ExecStart=/path/to/zookeeper/bin/zkServer.sh start ExecStop=/path/to/zookeeper/bin/zkServer.sh stop ExecReload=/path/to/zookeeper/bin/zkServer.sh restart User=zookeeper Group=zookeeper Restart=on-failure [Install] WantedBy=multi-user.target
-
启动和启用 ZooKeeper 服务
systemctl daemon-reload systemctl start zookeeper systemctl enable zookeeper
四、zookeeper + kafka集群搭建
1、项目需求
服务器 | 部署 |
---|---|
192.168.20.140 | zookeeper、kafka |
192.168.20.141 | zookeeper、kafka |
192.168.20.142 | zookeeper、kafka |
2、zookeeper集群搭建
(1)关闭防火墙
systemctl stop firewalld
setenforce 0
(2)安装jdk和zookeeper
(3)移动zookeeper到/usr/local下
mv zookeeper /usr/local/zookeeper
(4)配置zookeeper
# 复制模板配置文件
cp zoo_sample.cfg zoo.cfg
#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
tickTime=2000
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
initLimit=10
#Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认 为Follwer死掉,并从服务器列表中删除Follwer
syncLimit=5
#修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataDir=/tmp/zookeeper
#添加, 指定存放日志的目录,目录需要单独创建
dataLogDir=/tmp/logs
#客户端连接端口
clientPort=2181
#添加集群信息
server.1=192.168.20.140:3188:3288
server.2=192.168.20.141:3188:3288
server.3=192.168.20.142:3188:3288
server.A=B:C:D
●A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件
myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面
的配置信息比较从而判断到底是哪个server
●c是这个服务器Follower与集群中的Leader服务器交换信息的端口
●D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就
是用来执行选举时服务器相互通信的端口
如果指定节点不参加选举,在末尾加observer
server.3=192.168.19.102:3188:3288:observer
#在每个节点上创建数据目录和日志目录
mkdir /tmp/zookeeper
mkdir /tmp/logs
#在每个节点的dataDir指定的目录下创建一个myid的文件
echo 1 > /tmp/zookeeper/myid
echo 2 > /tmp/zookeeper/myid
echo 3 > /tmp/zookeeper/myid
(5)将zookeeper加入到系统服务管理
cd /etc/systemd/system
vim zookeeper.service
[Unit]
Description=ZooKeeper
After=network.target
[Service]
Type=forking
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
Restart=on-failure
[Install]
WantedBy=multi-user.target
# 写入后加载配置
systemctl daemon-reload
3、kafka集群搭建
(1)解压kafka压缩包,将其移动到/usr/local目录下
(2)配置kafka的配置文件
cd /kafka/config
# 备份配置文件
cp server.properties{,.bak}
vim server.properties
broker.id=0
#21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置
broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.19.100:9092
broker的IP需区分开来,也可保持默认配置不用修改
num.network.threads=3
#42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
num.io.threads=8
#45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
socket.send.buffer.bytes=102400
#48行,发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
#51行,接收套接字的缓冲区大小
#54行,请求套接字的缓冲区大小
#31行,指定监听的IP和端口,可以修改每个
#60行,kafka运行日志存放的路径,也是数据存放的路径
num.partitions=1
#65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1
log.retention.hours=168
#69行,用来恢复和清理data下数据的线程数量
#103行,segment文件(数据文件)保留的最长时间,单位为小时,默
认为7天,超时将被删除
log.segment.bytes=1073741824
#110行,一个segment文件最大的大小,默认为 1G,超出将新建
一个新的segment文件
zookeeper.connect=192.168.20.140:2181,192.168.20.141:2181,192.168.20.142:2181
#123行,配置连接Zookeeper集群地址
# 如果设备延迟高,可以将zookeeper的连接超时时间改高一些
zookeeper.connection.timeout.ms=30000
(3)配置kafka启动脚本
#修改环境变量
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
#配置 Kafka 启动脚本
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
echo "---------- Kafka 启动 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 状态 ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ]; then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
exit 1
;;
esac
# 添加权限
chmod +x /etc/init.d/kafka
(4)启动kafka,创建topic
service kafka start
# 创建topic
./kafka-topics.sh --create --zookeeper 192.168.20.140:2181,192.168.20.141:2181,192.168.20.142:2181 --replication-factor 2 --partitions 3 --topic test