ZooKeeper分布式服务与Kafka消息队列+ELKF整合方案

 前言

ZooKeeper 是一个分布式的、开放源码的分布式应用程序协调服务,提供配置维护、命名服务、分布式同步、组服务等功能;

Kafka 是一个开源的分布式流处理平台,它被设计用来处理实时数据流,包括发布和订阅消息系统、日志收集以及作为事件流数据平台;

在 Kafka 集群中,ZooKeeper 用于协调和管理 Kafka broker 的状态、集群的配置信息以及其他关键元数据。结合使用时,ZooKeeper 提供了 Kafka 所需的集群管理和协调功能,使得 Kafka 集群更加稳定可靠,同时也简化了 Kafka 集群的管理和运维。

目录

一、ZooKeeper 服务

1. 简介

2. 工作机制

3. 特点

4. Zookeeper 数据结构

5. 应用场景

6. 选举机制

6.1 第一次启动选举机制

6.2 非第一次启动选举机制

6.2.1 Leader 选举两种情况

6.2.2 Leader 选举流程时状态

二、部署 Zookeeper 集群

1. 环境准备

2. 下载 zookeeper 安装包

3. 安装 zookeeper 

4. 创建数据目录和日志目录

5. 创建 myid 文件 

6. 配置 Zookeeper 启动脚本

7. 分别启动 Zookeeper 

三、Kafka 消息队列

1. 概述

2. 使用消息队列的好处 

2.1 解耦

2.2 可恢复性

2.3 缓冲

2.4 灵活性及峰值处理能力

2.5 异步通信

3. 消息队列的两种模式

3.1 点对点模式

3.2 发布/订阅模式

4. Kafka 的特性

5. Kafka 系统架构

5.1 架构介绍

5.2 Partation 数据路由规则

5.3 分区原因

四、部署 Kafka 集群

1. 环境准备

2. 下载 Kafka 安装包

3. 安装 Kafka

4. 配置 Zookeeper 启动脚本

5. 分别启动 Kafka

6. Kafka 命令行操作

7. 创建主题一般故障 

五、Kafka+ELKF

1. 修改 filebeat 的主配置文件 

2. 在 filebeat 节点安装 apache

3. 新建一个 Logstash kafka 配置文件

4. 查看所有的索引

5. 登录 Kibana 添加索引 


一、ZooKeeper 服务

1. 简介

Zookeeper 是一个开源的分布式的,为分布式框架提供协调服务的 Apache 项目。

2. 工作机制

是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。也就是说 Zookeeper = 文件系统 + 通知机制。

3. 特点

① 一个领导者,多个跟随着组成的集群

② 集群中只要有半数以上存活,就可以正常服务,适合安装奇数台服务器,最少3台

③ 全局数据一致,每个 server 保存一份相同的数据副本,client 无论连接到哪个 server,数据都是一致的

④ 更新请求顺序执行,来自同一个 client 的更新请求按其发送顺序依次执行,即先进先出

⑤ 数据更新原子性,一次数据更新要么成功,要么失败

⑥ 实时性,在一定时间范围内,client 能读到最新数据

4. Zookeeper 数据结构

ZooKeeper 数据模型的结构与 Linux 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储1MB的数据,每个 ZNode 都可以通过其路径唯一标识。 

当 Kafka 与 ZooKeeper 集成时,Kafka 会使用 ZooKeeper 来存储其元数据和配置信息。如果用户与 Kafka 断开连接,Kafka 可能会丢失对 ZooKeeper 中特定路径下数据的访问权限或连接状态。当用户再次连接时,Kafka 可以利用先前定义的路径来重新建立与 ZooKeeper 的连接,并恢复先前的状态,以便继续正常运行。

5. 应用场景

① 统一命名服务:

  • 在分布式环境中,应用/服务需要统一的命名以便于识别。例如,将IP地址转换为易记的域名

② 统一配置管理:

  • 配置同步:在分布式环境下,要求集群中所有节点的配置信息是一致的,如 Kafka 集群。通过ZooKeeper,配置信息可以被写入一个 Znode,并且各个客户端服务器可以监听这个 Znode。任何对 Znode 中数据的修改都会被 ZooKeeper 通知到各个客户端服务器
  • 快速同步:修改后的配置信息能够快速同步到各个节点上

③ 统一集群管理:

  • 实时状态监控:在分布式环境中,实时了解每个节点的状态是必要的,因为可以根据节点状态做出调整。ZooKeeper 可以实现实时监控节点状态变化,将节点信息写入 ZooKeeper 上的一个 ZNode,并监听这个 ZNode 以获取实时状态变化

④ 服务器动态上下线:

  • ZooKeeper 可以让客户端实时感知服务器的上下线变化

⑤ 软负载均衡:

  • 在 ZooKeeper 中记录每台服务器的访问数,从而让访问数最少的服务器去处理最新的客户端请求 

6. 选举机制

在 ZooKeeper 中,选举是通过投票机制来实现的。每个服务器都有权投票给自己和其他服务器,并且一旦某个服务器获得超过半数的选票,它就会成为新的 Leader。

6.1 第一次启动选举机制

服务器启动票数myid状态角色

server1

server1:1

1

looking

server2

server1:0

server2:2

2

looking

server3

server1:0

server2:0

server2:3

3

server1:follower

server2:follower

server3:leader

server4

server1:0

server2:0

server3:3

server4:1

4

server1:follower

server2:follower

server3:leader

server4:follower

server5

server1:0

server2:0

server3:3

server4:1

server5:1

5

server1:follower

server2:follower

server3:leader

server4:follower

server5:follower

① server1 启动,发起一次选举。投自己一票。此时服务器1为一票,不够半数以上(3票),选举无法完成,server1 状态保持为 looking;

② server2 启动,再发起一次选举。server1、server2 各投自己一票并交换选票信息:此时server1 发现 server2 的 myid 比自己目前投票推举的(server1)大,更改选票为推举 server2。此时server1 票数0票,server2 票数2票,没有半数以上结果,选举无法完成,server1、server2 状态保持 looking;

③ server3 启动,发起一次选举。此时server1、server2 都会更改选票为 server3。此次投票结果:server1 为0票,server2 为0票,server3 为3票。此时server3 的票数已经超过半数,server3当选 leader。server1、server2 更改状态为 follower,server3 更改状态为 leader;

④ server4 启动,发起一次选举。此时server1,2,3已经不是 looking 状态,不会更改选票信息。交换选票信息结果:server3 为3票,server4 为1票。此时server4 服从多数,更改选票信息为server3,并更改状态为 follower;

⑤ server5 启动,同 server4 一样为 follower。 

6.2 非第一次启动选举机制

6.2.1 Leader 选举两种情况

当 ZooKeeper 集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:

① 服务器初始化启动

② 服务器运行期间无法和 Leader 保持连接

6.2.2 Leader 选举流程时状态

当一台机器进入 Leader 选举流程时,当前集群也可能会处于以下两种状态:

① 集群中本来就已经存在一个 Leader

  • 仅需要和 Leader 机器建立连接,并进行状态同步即可

② 集群中确实不存在 Leader 

假设 ZooKeeper 由5台服务器组成,SID(唯一标识符,通常也称为 myid)分别为1、2、3、4、5,ZXID(事务ID)分别为8、8、8、7、7,并且此时 SID 为3的服务器是Leader。此时,3和5服务器出现故障,因此开始进行 Leader 选举。

在进行 Leader 选举时,按照以下规则进行:

  • 如果EPOCH值大的服务器直接胜出
  • 如果EPOCH相同,比较事务ID(ZXID),事务ID大的胜出
  • 如果EPOCH和事务ID都相同,则比较服务器ID(SID),服务器ID大的胜出

在ZooKeeper中,Epoch(时代)是用来标识一轮Leader选举过程的编号。每当进行一次新的Leader选举时,Epoch会递增,用于区分不同的选举轮次。由于没有 Leader 时同一轮投票过程中的逻辑时钟值是相同的,所以不存在 EPOCH 最大值;同时也不存在 ZXID 最大值。因此,该示例最终 server4 成为新的 Leader。

二、部署 Zookeeper 集群

1. 环境准备

服务器ip

节点名称myid/角色软件版本jdk版本操作系统
192.168.190.104zk-kfk011/followzookeeper-3.5.7openjdk version "1.8.0_131"centos7.4
192.168.190.105zk-kfk022/followzookeeper-3.5.7openjdk version "1.8.0_131"centos7.4
192.168.190.106zk-kfk033/leaderzookeeper-3.5.7openjdk version "1.8.0_131"centos7.4
systemctl stop firewalld.service 
setenforce 0
[root@localhost ~]# java -version
openjdk version "1.8.0_131"        # 自带java环境openjdk

# 编辑域名解析,制作映射,加快传输速度
echo 192.168.190.104 zk-kfk01 >> /etc/hosts
echo 192.168.190.105 zk-kfk02 >> /etc/hosts
echo 192.168.190.106 zk-kfk03 >> /etc/hosts

2. 下载 zookeeper 安装包

三台节点一起配置:

[root@localhost ~]# cd /opt/
[root@localhost opt]# wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
[root@localhost opt]# ls
apache-zookeeper-3.5.7-bin.tar.gz

3. 安装 zookeeper 

三台节点一起配置:

[root@localhost opt]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
[root@localhost opt]# mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
[root@localhost opt]# cd /usr/local/zookeeper-3.5.7/conf/
[root@localhost conf]# ls
configuration.xsl  log4j.properties  zoo_sample.cfg 
[root@localhost conf]# cp zoo_sample.cfg zoo.cfg
[root@localhost conf]# vim zoo.cfg    # 修改配置文件
  2 tickTime=2000                     # 通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
  5 initLimit=10                      # Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
  8 syncLimit=5                       # Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
 12 dataDir=/usr/local/zookeeper-3.5.7/data    # 修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
 13 dataLogDir=/usr/local/zookeeper-3.5.7/logs # 添加,指定存放日志的目录,目录需要单独创建
 15 clientPort=2181                   # 客户端连:接端口
 30 server.1=192.168.190.104:3188:3288
 31 server.2=192.168.190.105:3188:3288
 32 server.3=192.168.190.106:3188:3288
server.A=B:C:D
# A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
# B是这个服务器的地址。
# C是这个服务器Follower与集群中的Leader服务器交换信息的端口。
# D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

4. 创建数据目录和日志目录

三台节点一起配置:

[root@localhost ~]# mkdir /usr/local/zookeeper-3.5.7/data
[root@localhost ~]# mkdir /usr/local/zookeeper-3.5.7/logs

5. 创建 myid 文件 

在每个节点的 dataDir 指定的目录下创建一个 myid 的文件,dataDir 是用于存储 ZooKeeper 服务器数据的目录。

[root@zk-kfk01 ~]# echo 1 > /usr/local/zookeeper-3.5.7/data/myid
[root@zk-kfk02 ~]# echo 2 > /usr/local/zookeeper-3.5.7/data/myid
[root@zk-kfk03 ~]# echo 3 > /usr/local/zookeeper-3.5.7/data/myid

6. 配置 Zookeeper 启动脚本

 三台节点一起配置:

vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90     # 在运行级别2、3、4、5下,在启动优先级顺序为20,在关闭顺序为90。
#description:Zookeeper Service Control Script
ZK_HOME='/usr/local/zookeeper-3.5.7'              # 定义了 ZooKeeper 的安装路径
case $1 in                                        # 开始一个 case 语句,根据传入的参数进行不同的操作
start)                                            # 如果参数是 "start",则执行 ZooKeeper 启动命令
	echo "---------- zookeeper 启动 ------------" 
	$ZK_HOME/bin/zkServer.sh start
;;
stop)                                             # 如果参数是 "stop",则执行 ZooKeeper 停止命令
	echo "---------- zookeeper 停止 ------------"
	$ZK_HOME/bin/zkServer.sh stop
;; 
restart)                                          # 如果参数是 "restart",则执行 ZooKeeper 重启命令。
	echo "---------- zookeeper 重启 ------------"
	$ZK_HOME/bin/zkServer.sh restart
;;
status)                                           # 如果参数是 "status",则执行 ZooKeeper 查看状态命令
	echo "---------- zookeeper 状态 ------------"
	$ZK_HOME/bin/zkServer.sh status
;;
*)                                                # 如果参数不匹配以上任何情况,则显示使用说明
    echo "Usage: $0 {start|stop|restart|status}"
esac
# 管理ZooKeeper服务,通过传入不同的参数来启动、停止、重启和查看状态

7. 分别启动 Zookeeper 

[root@zk-kfk01 ~]# chmod +x /etc/init.d/zookeeper
[root@zk-kfk01 ~]# chkconfig --add zookeeper
# 将 "zookeeper" 服务添加到系统的服务管理列表中,并且配置它在系统启动时自动运行
# 前提创建一个名为 "zookeeper" 的服务脚本(通常是放在 /etc/init.d/ 目录下)

依次启动,不要一起启动:
[root@zk-kfk01 ~]# service zookeeper start
[root@zk-kfk02 ~]# service zookeeper start
[root@zk-kfk03 ~]# service zookeeper start
---------- zookeeper 启动 ------------
/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

[root@zk-kfk01 ~]# service zookeeper status  # 查看当前状态
---------- zookeeper 状态 ------------
/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

三、Kafka 消息队列

1. 概述

Kafka 即消息队列,简称 MQ。在高并发环境下,当同时处理大量请求访问服务器,往往请求会发生阻塞。如大量访问数据库,导致锁表引发雪崩效应;使用消息队列,可以异步处理请求:流量消峰、应用解耦从而缓解压力。当 Kafka 组合集群每秒可以处理几十万条数据请求,大大增加了抗高并发能力。

当前比较常见的 MQ 中间件有 ActiveMQ(淘汰)、RabbitMQ(用的多)、RocketMQ(企业常用)、Kafka(大数据分析、大量数据收集) 等。

2. 使用消息队列的好处 

2.1 解耦

通过消息队列来实现系统内部各个组件或模块之间的解耦合,即两边遵守约定,则允许通信。

假设有一个电子商务平台,其中订单系统、库存系统和物流系统需要相互通信。通过消息队列解耦,它们可以实现以下功能:

  • 订单下单:当顾客下单时,订单系统将订单信息发布到消息队列中,而不需要直接调用库存系统或物流系统的接口
  • 库存更新:库存系统订阅了订单系统发布的消息队列,一旦接收到订单信息,它会检查库存并更新库存状态
  • 物流处理:同时,物流系统也订阅了订单系统发布的消息队列,一旦接收到订单信息,它会开始安排物流配送

这种架构下,各个系统之间并不直接依赖于彼此的接口,而是通过消息队列进行松耦合的通信。

2.2 可恢复性

通过消息队列,即使某个处理消息的组件出现故障,系统仍然可以保持稳定运行。消息会被安全地存储在队列中,等待故障组件重新上线后进行处理,从而提高了整个系统的可靠性。

2.3 缓冲

消息队列可以作为缓冲层,帮助控制系统中数据流的速度。

2.4 灵活性及峰值处理能力

消息队列可以帮助系统应对突发的访问量增加,确保关键组件能够应对压力而不至于崩溃。这提供了系统在面对异常情况下的弹性和稳定性。

2.5 异步通信

允许用户将消息放入队列,而不需要立即处理;可以提高系统的响应速度和整体效率,同时也降低了对实时处理的需求。

3. 消息队列的两种模式

3.1 点对点模式

即一对一,消息收到后消息清除。在点对点模式中,消息生产者将消息发送到队列中,然后消息消费者从队列中主动拉取并处理消息。一旦消息被某个消费者处理完毕,它就会从队列中清除,因此同一条消息只能被一个消费者处理。这种模式适用于需要确保每条消息只被处理一次的场景。

3.2 发布/订阅模式

即一对多,又叫观察者模式,消费者消费数据之后不会清除消息。观察者的作用是接收消息队列中特定主题或频道上的消息,并通知消费者或其他观察者。在发布/订阅模式中,消息生产者将消息发布到特定的主题(topic)中,而多个消息消费者可以订阅这个主题并独立地消费消息。与点对点模式不同,发布到主题的消息会被所有订阅者接收和消费。这种模式适用于需要将消息广播给多个订阅者的场景,例如直播。

4. Kafka 的特性

① 高吞吐量、低延迟

  • Kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。每个 topic 可以分多个 Partition,Consumer Group 对 Partition 进行消费操作,提高负载均衡能力和消费能力

② 可扩展性

  • kafka 集群支持热扩展

③ 持久性、可靠性

  • 消息被持久化到本地磁盘,并且支持数据备份冗余

④ 容错性

  • 允许集群中节点失败(多副本情况下,若副本数量为 n,则允许 n-1 个节点失败)

⑤ 高并发

  • 支持数千个客户端同时读写

5. Kafka 系统架构

5.1 架构介绍

① Broker 服务器

Broker 服务器是 Kafka 中的服务器单元,每个 Broker 可以理解为一个 Kafka 实例,负责存储和管理消息。一台 Kafka 服务器就是一个 Broker,一个集群由多个 Broker 组成,一个 Broker 可以容纳多个 Topic。

② Topic 主题

主题是消息的逻辑容器,类似于消息队列中的队列。生产者发布消息到主题,消费者从主题订阅消息。每个主题可以划分为多个分区,以便实现数据的分布和负载均衡。

③ Partition 分区

为了实现扩展性和高吞吐量,一个主题可以分割为多个分区,每个分区是一个有序的队列。分区允许主题的数据分布到多个 broker 上,并且提供了并行处理消息的能力。Kafka 保证每个分区内的消息顺序,但不保证不同分区之间的消息顺序。或者说 Kafka 只保证 Partition 内的记录是有序的,而不保证 Topic 中不同 Partition 的顺序。

每个 Topic 至少有一个 Partition,当生产者产生数据的时候,会根据分配策略选择分区,然后将消息追加到指定的分区的队列末尾。

5.2 Partation 数据路由规则

① 指定了 Patition:如果消息的生产者明确指定了分区(Partition),则消息将被直接发送到该分区中;

② 未指定 Patition 但指定 Key(相当于消息中某个属性):如果消息的生产者没有指定分区,但指定了一个键(Key),则根据该键的值进行哈希运算,并取模以确定消息应该发送到的分区;

③ Patition 和 Key 都未指定:如果消息的发送者既没有指定分区,也没有指定键,则系统将根据轮询的方式从可用的分区中选取一个分区。

关于分区的一些补充信息:

  • 消息偏移量编号: 每条消息都有一个自增的编号,用于标识消息的偏移量,从0开始标识顺序。
  • 分区中的数据存储方式: 每个分区使用多个 Segment 文件来存储数据。
  • 严格保证消息消费顺序的情况下: 如果有严格保证消息消费顺序的需求(比如商品秒杀、抢红包等),可以将分区数目设置为1,这样可以保证消息在消费时的顺序性。

 关于 Broker 存储数据的情况:

  • Broker 存储 Topic 数据: 如果某个 Topic 有 N 个分区,那么集群中的每个 Broker 存储该 Topic 的一个分区。
  • Broker 数量与 Topic 分区数的关系: 如果某个 Topic 有 N 个分区,而集群中有 N+M 个 Broker,那么其中的 N 个 Broker 分别存储该 Topic 的一个分区,而剩下的 M 个 Broker 不存储该 Topic 的分区数据。
  • Broker 数量少于 Topic 分区数的情况: 如果某个 Topic 有 N 个分区,但集群中的 Broker 数量少于 N 个,那么一个 Broker 可能会存储该 Topic 的一个或多个分区。在实际生产环境中,应尽量避免这种情况,因为这可能导致 Kafka 集群数据不均衡。

5.3 分区原因

  • 方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 Topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
  • 以 Partition 为单位读写,Kafka 使用分区来方便集群扩展和提高并发性,

① Replica 副本

为了防止节点故障导致数据丢失,Kafka 提供了副本机制。每个分区都有若干个副本,其中一个是 Leader,负责读写,其他是 Follower,负责备份。

② Leader 领导者

每个 Partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 Partition。

③ Follower 跟随者

Follower 跟随 Leader,负责备份数据。如果 Leader 故障,会从 Follower 中选举出新的 Leader。

④ producer 生产者

生产者即数据的发布者,将消息发布到 Kafka 的 topic 中,可以指定数据存储的分区。Broker 接收到生产者发送的消息后,Broker 将该消息追加到当前用于追加数据的 Segment(通过 Segment.io 收集的数据所存储的文件)文件中。

⑤ Consumer 消费者

从 Kafka 中拉取数据进行消费,可以消费多个 topic。

⑥ Consumer Group(CG)消费者组

多个消费者组成一个消费者组,每个组内的消费者负责消费不同分区的数据,防止数据被重复读取;将多个消费者集中到一起去处理某一个 Topic 的数据,可以更快的提高数据的消费能力。

⑦ offset 偏移量

唯一标识一条消息的位置信息,消费者通过偏移量确定下次读取消息的位置。消息最终还是会被删除的,默认生命周期为 1 周(7*24小时)。

⑧ Zookeeper 

Kafka 使用 Zookeeper 存储集群的元信息,例如 broker 的地址和分区的状态,同时用于管理消费者组的偏移量。Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中;从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为 __consumer_offsets。

总之,zookeeper 的作用就是,生产者 push 数据到 kafka 集群,就必须要找到 kafka 集群的节点在哪里,这些都是通过 zookeeper 去寻找的。消费者消费哪一条数据,也需要 zookeeper 的支持,从 zookeepe r获得 offset,offset 记录上一次消费的数据消费到哪里,这样就可以接着下一条数据进行消费。

四、部署 Kafka 集群

1. 环境准备

服务器ip

节点名称myid/角色软件版本jdk版本操作系统
192.168.190.104zk-kfk011/follow

zookeeper-3.5.7

kafka_2.13-2.7.1

openjdk version "1.8.0_131"centos7.4
192.168.190.105zk-kfk022/follow

zookeeper-3.5.7

kafka_2.13-2.7.1

openjdk version "1.8.0_131"centos7.4
192.168.190.106zk-kfk033/leader

zookeeper-3.5.7

kafka_2.13-2.7.1

openjdk version "1.8.0_131"centos7.4

2. 下载 Kafka 安装包

三台节点一起操作:

官方下载地址:http://kafka.apache.org/downloads.html
[root@zk-kfk01 opt]# wget https://archive.apache.org/dist/kafka/2.7.1/kafka_2.13-2.7.1.tgz
[root@zk-kfk01 opt]# ls
kafka_2.13-2.7.1.tgz

3. 安装 Kafka

三台节点一起操作:注意 broker.id 和 listeners 地址

[root@zk-kfk01 opt]# tar zxvf kafka_2.13-2.7.1.tgz
[root@zk-kfk01 opt]# mv kafka_2.13-2.7.1 /usr/local/kafka
修改配置文件:
[root@zk-kfk01 opt]# cd /usr/local/kafka/config/
[root@zk-kfk01 config]# cp server.properties{,.bak}  # 备份配置文件
[root@zk-kfk01 config]# vim server.properties
 21 broker.id=0(1,2)      # broker的全局唯一编号,每个broker不能重复,另外两台broker.id=1、broker.id=2
 31 listeners=PLAINTEXT://192.168.190.103:9092 (104,105) 
# 指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
 42 num.network.threads=3  # broker处理网络请求的线程数量,一般情况下不需要去修改
 45 num.io.threads=8       # 用来处理磁盘IO的线程数量,数值应该大于硬盘数
 48 socket.send.buffer.bytes=102400 # 发送套接字的缓冲区大小
 51 socket.receive.buffer.bytes=102400 # 接收套接字的缓冲区大小
 54 socket.request.max.bytes=104857600 # 请求套接字的缓冲区大小
 60 log.dirs=/usr/local/kafka/logs     # kafka运行日志存放的路径,也是数据存放的路径
 65 num.partitions=1       # topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
 69 num.recovery.threads.per.data.dir=1 # 用来恢复和清理data下数据的线程数量
103 log.retention.hours=168 # segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
110 log.segment.bytes=1073741824 # 一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
123 zookeeper.connect=192.168.190.104:2181,192.168.190.105:2181,192.168.190.106:2181  
# 配置连接Zookeeper集群地址

修改环境变量:
[root@zk-kfk01 ~]# vim /etc/profile
export KAFKA_HOME=/usr/local/kafka  # 将一个名为 KAFKA_HOME 的环境变量设置为 /usr/local/kafka,这表示 Kafka 的安装路径
export PATH=$PATH:$KAFKA_HOME/bin   # 将 Kafka 的 bin 目录添加到系统的 PATH 环境变量中
[root@zk-kfk01 ~]# source /etc/profile # 重新加载 /etc/profile 文件中设置的环境变量

4. 配置 Zookeeper 启动脚本

三台节点一起操作:

[root@zk-kfk01 ~]# vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'               # 定义了 Kafka 的安装路径
case $1 in                                  # 开始一个 case 语句,根据传入的参数进行不同的操作
start)                                      # 如果参数是 "start",则执行 Kafka 启动命令
	echo "---------- Kafka 启动 ------------"
	${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)                                       # 如果参数是 "stop",则执行 Kafka 停止命令
	echo "---------- Kafka 停止 ------------"
	${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)                                    # 如果参数是 "restart",则先执行 Kafka 停止命令再执行 Kafka 启动命令
	$0 stop 
	$0 start
;;
status)                                     # 如果参数是 "status",则检查 Kafka 进程是否在运行,并输出相应的状态信息
	echo "---------- Kafka 状态 ------------"
	count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
	if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
;;
*)                                          # 如果参数不匹配以上任何情况,则显示使用说明
    echo "Usage: $0 {start|stop|restart|status}"
esac
# 管理 Kafka 服务,可以通过传入不同的参数来启动、停止、重启和查看状态

5. 分别启动 Kafka

[root@zk-kfk02 ~]# chmod +x /etc/init.d/kafka
[root@zk-kfk02 ~]# chkconfig --add kafka
依次启动 kafaka:
[root@zk-kfk01 ~]# service kafka start   # 启动 Kafka
[root@zk-kfk02 ~]# service kafka start
[root@zk-kfk03 ~]# service kafka start
---------- Kafka 启动 ------------
[root@zk-kfk01 ~]# service kafka status  # 查看状态
---------- Kafka 状态 ------------
kafka is running

6. Kafka 命令行操作

① 创建一个名为 "test" 的 Kafka 主题(topic):任意一台机器即可

[root@zk-kfk01 ~]# kafka-topics.sh --create --zookeeper 192.168.190.104:2181,192.168.190.105:2181,192.168.190.106:2181 --replication-factor 2 --partitions 3 --topic test
# kafka-topics.sh: 这是 Kafka 提供的一个脚本工具,用于管理 Kafka 中的主题。
# --create: 这是告诉 kafka-topics.sh 要创建一个新的主题。
# --zookeeper 192.168.190.104:2181,192.168.190.105:2181,192.168.190.106:2181: 这是指定 ZooKeeper 的连接信息,用于管理 Kafka 集群。在这里,指定了三个 ZooKeeper 实例的地址和端口号。
# --replication-factor 2: 这表示设置每个分区的副本数量为 2。副本是为了数据的冗余和容错性而存在的。
# --partitions 3: 这表示要创建 3 个分区。
# --topic test: 这表示要创建一个名为 "test" 的主题。
# 在 Kafka 集群中创建一个名为 "test" 的主题,该主题有 3 个分区,并且每个分区有 2 个副本。

② 查看当前服务器中的所有 topic

[root@zk-kfk01 config]# kafka-topics.sh --list --zookeeper 192.168.190.104:2181,192.168.190.105:2181,192.168.190.106:2181
test

③ 查看 test topic 的详情

[root@zk-kfk01 config]# kafka-topics.sh --describe --zookeeper 192.168.190.104:2181,192.168.190.105:2181,192.168.190.106:2181 test
Topic: test	PartitionCount: 3	ReplicationFactor: 2	Configs:  
# 表示主题 "test" 共有 3 个分区。每个分区的复制因子为 2,即每个分区都有两个副本
	Topic: test	Partition: 0	Leader: 0	Replicas: 0,2	Isr: 0,2
	Topic: test	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1,0
	Topic: test	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1
# Partition: x 指明了每个分区的编号。
# Leader: x 显示了每个分区的 leader 副本所在的 broker 编号。
# Replicas: x,y 展示了每个分区的所有副本所在的 broker 编号。
# Isr: x,y 描述了每个分区的“在同步副本集”(In-Sync Replica)中的副本编号

④ 发布消息

[root@zk-kfk01 config]# kafka-console-producer.sh --broker-list 192.168.190.104:9092,192.168.190.105:9092,192.168.190.106:9092  --topic test
>123456

⑤ 消费消息

[root@zk-kfk02 config]# kafka-console-consumer.sh --bootstrap-server 192.168.190.104:9092,192.168.190.105:9092,192.168.190.106:9092 --topic test --from-beginning
123456

[root@zk-kfk03 config]# kafka-console-consumer.sh --bootstrap-server 192.168.190.104:9092,192.168.190.105:9092,192.168.190.106:9092 --topic test --from-beginning
123456
# --from-beginning:会把主题中以往所有的数据都读取出来

⑥ 修改分区数

修改主题 "test" 的分区数量为 6:
kafka-topics.sh --zookeeper 192.168.190.104:2181,192.168.190.105:2181,192.168.190.106:2181 --alter --topic test --partitions 6

⑦ 删除 topic

kafka-topics.sh --delete --zookeeper 192.168.190.104:2181,192.168.190.105.21:2181,192.168.190.106:2181 --topic test

7. 创建主题一般故障 

故障示例:

[root@zk-kfk01 config]# kafka-topics.sh --create --zookeeper 192.168.190.104:2181,192.168.190.105:2181,192.168.190.106:2181 --replication-factor 2 --partitions 3 --topic test
Error while executing topic command : Replication factor: 2 larger than available brokers: 0.
[2024-04-13 20:17:55,154] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 0.
# 指定的副本因子大于可用的 broker 数量。这可能是由于 ZooKeeper 中注册的 broker 数量与实际运行的 broker 数量不匹配导致的。

排查过程:

①  确保 broker 数量足够

② 查看 server.properties 配置

broker.id         
listeners=PLAINTEXT://ip:9092
确保编号唯一;监听地址是否正确

③ 查看端口是否正常

telenet 主机名 2181

④ 关闭 kafka,再挨个启动

service kafka stop
service kafka start

五、Kafka+ELKF

ELKF 部署请参考:ELK、ELKF企业级日志分析系统介绍-CSDN博客

环境配置:

服务器ip

节点名称myid/角色软件版本jdk版本操作系统
192.168.190.100node1elasticsearch-5.5.0、kibana-5.5.1openjdk version "1.8.0_131"centos7.4
192.168.190.101node2elasticsearch-5.5.0openjdk version "1.8.0_131"centos7.4
192.168.190.102logstashapache、logstash-5.5.1centos7.4
192.168.190.103filebeatfilebeat、apachecentos7.4
192.168.190.104zk-kfk011/followzookeeper-3.5.7openjdk version "1.8.0_131"centos7.4
192.168.190.105zk-kfk022/followzookeeper-3.5.7openjdk version "1.8.0_131"centos7.4
192.168.190.106zk-kfk033/leaderzookeeper-3.5.7openjdk version "1.8.0_131"centos7.4

数据流向:

1. 修改 filebeat 的主配置文件 

[root@filebeat ~]# cd /usr/local/filebeat
[root@filebeat filebeat]# vim filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /var/log/httpd/access_log
  tags: ["access"]
  
- type: log
  enabled: true
  paths:
    - /var/log/httpd/error_log
  tags: ["error"]
……
注释 Logstash output 相关配置,避免冲突
#----------------------------- kafka output --------------------------------
output.kafka:
  enabled: true
  hosts: ["192.168.190.104:9092","192.168.190.105:9092","192.168.190.106:9092"]
# 指定 Kafka 集群配置
  topic: "httpd"  # 指定 Kafka 的 topic
[root@filebeat filebeat]# vim filebeat.yml # 启动 filebeat

2. 在 filebeat 节点安装 apache

[root@filebeat filebeat]# yum install -y httpd
[root@filebeat filebeat]# systemctl start httpd.service

3. 新建一个 Logstash kafka 配置文件

[root@logstash ~]# cd /etc/logstash/conf.d/
[root@logstash conf.d]# vim kafka.conf
input {
    kafka {
        bootstrap_servers => "192.168.190.104:9092,192.168.190.105:9092,192.168.190.106:9092"  
# kafka集群地址
        topics  => "httpd"             # 拉取的kafka的指定topic
        type => "httpd_kafka"          # 指定 type 字段
        codec => "json"                # 解析json格式的日志数据
		auto_offset_reset => "latest"  # 拉取最近数据,earliest为从头开始拉取
		decorate_events => true        # 传递给elasticsearch的数据额外增加kafka的属性数据
    }
}

output {
  if "access" in [tags] {
    elasticsearch {
      hosts => ["192.168.190.100:9200"]
      index => "httpd_access-%{+YYYY.MM.dd}"
    }
  }
  
  if "error" in [tags] {
    elasticsearch {
      hosts => ["192.168.190.100:9200"]
      index => "httpd_error-%{+YYYY.MM.dd}"
    }
  }
  
  stdout { codec => rubydebug }
}
[root@logstash conf.d]# logstash -f kafka.conf
# 启动 logstash;或者 ./filebeat -e -c filebeat.yml &

网页端访问apache:
http://192.168.190.102/

4. 查看所有的索引

生产黑屏操作es时查看所有的索引:

[root@node1 ~]# curl -X GET "localhost:9200/_cat/indices?v"
health status index                    uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   httpd_error-2024.04.13   zHoWjUNPR2uggajGdK13_g   5   1         36            0    716.6kb        358.3kb
green  open   system-2024.04.10        A8E0O-f9Q4OsN5SsoqkXDw   5   1       7132            0      4.4mb          2.2mb
green  open   index-demo1              3voNtLzAQIqDJn3Ip2ODKA   5   1          1            0      9.3kb          4.6kb
green  open   logstash-2024.04.10      1KY7jcRlSfGHhtklVpIchg   5   1          2            0       18kb            9kb
green  open   httpd_access-2024.04.13  0EK1CjoZRCmOboLgtVhesQ   5   1         60            0    260.3kb        113.9kb
green  open   apache_error-2024.04.10  un_LRAJzTSmL9aq6DUL5zg   5   1         13            0    124.5kb         62.2kb
green  open   filebeat-2024.04.11      rX5Vy-73Q1aSUKBI3LGe0Q   5   1       6031            0      3.9mb          1.9mb
green  open   .kibana                  ev0JtIH6SyOpycSjI2TGQQ   1   1          6            1     69.7kb         34.8kb
green  open   apache_access-2024.04.10 lCL7_nw3QDmGr2YmbYUOuQ   5   1         12            0    113.8kb         56.9kb

es-head web 界面查看:

5. 登录 Kibana 添加索引 

浏览器访问 http://192.168.190.100:5601,添加索引“httpd*”,查看图表信息及日志信息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/541413.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

napi系列学习进阶篇——NAPI生命周期

什么是NAPI的生命周期 我们都知道,程序的生命周期是指程序从启动,运行到最后的结束的整个过程。生命周期的管理自然是指控制程序的启动,调用以及结束的方法。 而NAPI中的生命周期又是怎样的呢?如下图所示: 从图上我们…

WordPress 图片压缩插件:Compress JPEG PNG images 使用方法

插件介绍 Compress JPEG & PNG images是一款非常好用的图片压缩插件:,非常值得大家安装使用;特别是图片类型网站。其实我们很多服务器磁盘空间是不在乎多那么几十 MB 大小的,但是压缩了图片能提升网站速度,节省宽带&#xff…

入门:多层感知器Multiple-Layer Perceiver, MLP

本文将简单介绍多层感知器(MLP)的基本概念、原理和应用。MLP是一种前馈人工神经网络,由多层节点组成,每层节点通过权重和偏置与下一层节点相连。MLP在许多领域都有广泛的应用,如分类、回归、自然语言处理等。 本文将分…

软考数据库---2.SQL语言

主要记忆:表、索引、视图操作语句;数据操作;通配符、转义符;授权;存储过程;触发器 这部分等等整理一下: “”" 1、 数据定义语言。 SQL DDL提供定义关系模式和视图、 删除关系和视图、 修改关系模式的…

基于ssm的大学生租房平台的设计与实现(java源码+文档)

风定落花生,歌声逐流水,大家好我是风歌,混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的大学生租房平台。项目源码以及部署相关请联系风歌,文末附上联系信息 。 项目简介: 大学生租房平台的设计与实现的主…

Python数据分析可视化之Pandas的使用

一、项目介绍 数据获取与存储:能够使用Python财经数据接口包tushare下载股票交易数据,并将数据保存到CSV文件或MySQL数据库中。数据处理:能够用Pandas从CSV文件、Excel文件以及MySQL数据库中读取数据。能够使用Pandas对数据进行简单处理和深…

某狗网歌曲接口逆向之加密算法刨析

逆向网址 aHR0cHM6Ly93d3cua3Vnb3UuY29t 逆向链接 aHR0cHM6Ly93d3cua3Vnb3UuY29tL21peHNvbmcvN2dxcGVzNjguaHRtbA 逆向接口 aHR0cHM6Ly93d3dhcGkua3Vnb3UuY29tL3BsYXkvc29uZ2luZm8 逆向过程 请求方式:GET 逆向参数 signature:1898d8f157837fadc9751fdacf1398f9 …

天猫精灵要会员,不能听歌,还能用来干什么呢?榨干它的剩余价值

目录 起因:以听歌为主要功能的设备,却不能听歌了 1.蓝牙音箱 2.控制智能家电 3.万能遥控器,需要一个外接设备 4.倒计时/提醒,闹钟提醒,整点提醒(这功能有人不喜欢,闲吵,还不能关…

职场证件照:不只是一张照片那么简单,这些细节请注意

随着毕业季的到来,许多应届生已经开始在各自的岗位上实习,准备迎接转正的挑战。在这个过程中,一张得体的职场证件照将成为你职业生涯中的一张重要名片。在职场中,证件照的应用场景多种多样,从窗口岗位的公示到工作牌上…

Pytorch Windows EOFError: Ran out of input when num_workers>0

关于深度学习的一些学习框架,我使用过pytorch,caffe,caffe2,openchatkit,oneflow等,最近我将长达几十万字的报错手册重新进行了整理,制作出一个新的专栏,主要记录这几种常见的开发框架在安装和使用过程中常见的报错,以及我是如何解决掉的,以此来帮助更多的深度学习开…

紫光展锐T610平台_4G安卓核心板方案定制开发

紫光展锐T610核心板配备Android 11操作系统,采用12nm制程工艺。该处理器CPU由2颗基于Cortex-A75架构的大核心和6颗基于Cortex-A55架构的小核心组成,最高主频为1.8GHz。GPU采用的是614.4MHz的Mali G52,可以流畅播放2400*1080分辨率视频&#x…

maven之pom中的build标签

1、build标签分类 1.1、全局配置&#xff08;project build&#xff09; 针对整个项目的所有情况都有效。 <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation"htt…

知道智源开源最强语义向量模型BGE是什么吗?

Embedding模型作为大语言模型&#xff08;Large Language Model&#xff0c;LLM&#xff09;的一个重要辅助&#xff0c;是很多LLM应用必不可少的部分。但是&#xff0c;现实中开源的Emebdding模型却很少。北京智源人工智能研究院&#xff08;BAAI&#xff09;开源了BGE系列Emb…

Proxmox VE qm 方式恢复虚拟机

前言 使用qm 恢复Proxmox VE虚拟机&#xff0c;高效便捷。 登录Proxmox VE shell 执行恢复操作 假设备份好的文件在其它主机存储&#xff0c;我们可以下载到Proxmox VE本地目录下&#xff0c;如何执行虚拟化恢复操作--storage local-lvm&#xff08;恢复后存储到的位置&…

【linux】TCP编程{三次握手/四次挥手/API注意点/代码}

文章目录 1.API介绍1.1wc -l dirName1.2inet_pton1.3inet_aton1.4inet_ntop 2.三次握手与四次挥手1.三次握手2.四次挥手3.应用程序和TCP协议层如何交互总结 3.TCP 和 UDP 对比1.宏观2.详细 4.地址转换函数inet_ntoa 5.TCP编程代码Makefiletcp_client.cctcp_server.cctcp_server…

TypeScript学习--day1

一、介绍 TypeScript是JS的超集&#xff0c;为JS添加了类型支持。 1.1 为什么添加类型支持 JS代码的错误大部分是类型错误&#xff0c;增加改Bug时间&#xff0c;影响开发效率。 静态类型&#xff1a;编译期做类型检查 动态类型&#xff1a;执行期做类型检查 TS--静态类型编…

前端保留两位小数

一、保留两位小数&#xff08;四舍五入&#xff09; 解决方案&#xff1a;使用 toFixed(x) 方法可以对小数进行指定位数保留&#xff0c;其中x是要保留的位数用法&#xff1a;num.toFixed(x)&#xff0c;其中num为需要操作的数据&#xff0c;x为要保留的位数示例&#xff1a;1…

力扣LeetCode138. 复制带随机指针的链表 两种解法(C语言实现)

目录 题目链接 题目分析 题目定位&#xff1a; 解题思路 解题思路1&#xff08;粗暴但是复杂度高&#xff09; 解题思路2&#xff08;巧妙并且复杂度低&#xff09; 题目链接 138. 复制带随机指针的链表https://leetcode-cn.com/problems/copy-list-with-random-pointer/ …

双写一致性问题

双写一致性问题&#xff1a;同一份数据&#xff0c;需要写数据库、写缓存。数据库中的数据和缓存中的数据要一致 解决办法&#xff1a;延迟双删 当我们要进行更新操作时&#xff0c;先删除缓存&#xff0c;再更新数据库&#xff0c;延迟几百ms再删除一次redis的缓存数据。 示…

2023年蓝桥杯——日期统计

目录 题目链接&#xff1a;1.日期统计 - 蓝桥云课 (lanqiao.cn) 题目描述 思路 代码思路 定义数据结构&#xff1a; 处理每一个月&#xff1a; 检查日期序列在num100中是否存在&#xff1a; 计数匹配的日期数&#xff1a; 输出结果&#xff1a; 代码实现 总结 题目链…