Linux平台Kafka高可用集群部署全攻略

🐇明明跟你说过:个人主页

🏅个人专栏:《大数据前沿:技术与应用并进》🏅

🔖行路有良友,便是天堂🔖

目录

一、引言

1、Kafka简介

2、Kafka核心优势

二、环境准备

1、服务器

2、服务器环境初始化

三、安装zookeeper

1、上传tar包

2、编辑配置文件

3、创建数据目录

4、安装JAVA

5、启动zookeeper

四、Kafka集群搭建

1、上传tar包

2、编辑配置文件

3、创建数据目录

4、启动kafka

5、查看端口状态

五、测试 

1、创建Topic

2、生产消息 

3、消费消息 


一、引言

1、Kafka简介

Apache Kafka 是一个开源的分布式流处理平台,它最初由 LinkedIn 开发,后来成为 Apache 软件基金会的顶级项目。Kafka 设计用于处理实时数据流,提供了一种高效、可扩展、持久化的方式来进行数据发布和订阅。它通常被描述为一种分布式发布-订阅消息队列,但它实际上超越了传统消息队列的概念。

2、Kafka核心优势

1. 高吞吐量:

  • Kafka 能够处理海量数据,支持每秒数十万条消息的读写操作,即使在大规模部署中也能保持高性能。
  • 通过高效的文件系统设计和内存管理机制,Kafka 能够在处理大量数据的同时保持低延迟。


2. 持久性和可靠性:

  • Kafka 将数据存储在磁盘上,并支持数据复制(replication),确保即使在节点故障的情况下也能保证数据的可靠性和持久性。
  • 数据以追加的方式写入日志文件,减少了磁盘的随机写操作,提高了写入速度和数据完整性。


3. 可扩展性:

  • Kafka 具有良好的水平扩展能力,可以通过增加更多的节点来提升系统的处理能力和存储容量。
  • 分布式架构使得 Kafka 能够轻松地在多台服务器上部署,并且能够动态扩展和收缩集群大小。


4. 灵活的发布-订阅模型:

  • Kafka 支持发布-订阅模式,允许多个消费者订阅同一个主题,并且消费者可以独立消费消息。
  • 消费者可以控制自己的消费进度,不会影响其他消费者的状态,实现了消息消费的解耦。

二、环境准备

1、服务器

准备3台或者5台Linux服务器,用来组建高可用集群,这里使用3台Centos 7.9来进行搭建,大家也可以使用其他的Linux发行版本

配置如下:

2、服务器环境初始化

3台机器都要执行

关闭Selinux

vi /etc/selinux/config

#修改成如下
SELINUX=disabled

之后重启服务器

reboot

关闭并禁用防火墙

[root@kafka1 ~]# systemctl stop firewalld && systemctl disable firewalld

修改 /etc/hosts

vi /etc/hosts

# 添加以下内容
192.168.40.100  kafka1
192.168.40.101  kafka2
192.168.40.102  kafka3

修改镜像源 

curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo

三、安装zookeeper

为什么安装Kafka时,要先安装zookeeper:

ZooKeeper 是一个分布式的协调服务,它为分布式应用程序提供了一套完整的协调服务功能,包括命名服务、配置管理、集群管理和同步等。Kafka 利用 ZooKeeper 来管理其集群中的多个组件,确保系统的稳定性和一致性。

1、上传tar包

apache-zookeeper-3.8.0-bin.tar.gz

tar包可以去官网进行下载 Apache ZooKeepericon-default.png?t=O83Ahttps://zookeeper.apache.org/releases.html#download

解压tar包至 /opt 下

tar zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /opt

2、编辑配置文件

vim /opt/apache-zookeeper-3.8.0-bin/conf/zoo.cfg

# 输入如下内容
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/zkData
dataLogDir=/opt/zookeeper/zkLog
clientPort=2181
server.1=kafka1:2188:3888
server.2=kafka2:2188:3888
server.3=kafka3:2188:3888
4lw.commands.whitelist=*

tickTime=2000:

  • tickTime 定义了 ZooKeeper 服务器之间的心跳间隔时间(毫秒)。它是 ZooKeeper 中最基本的单位时间。默认值通常是 2000 毫秒(即 2 秒)。


initLimit=10:

  • initLimit 定义了初始同步阶段的最大超时时间(心跳次数)。这意味着在初始同步阶段,跟随者(follower)必须在 initLimit * tickTime 毫秒内完成与领导者(leader)的同步。例如,这里设置为 20 秒(10 * 2000 毫秒)。


syncLimit=5:

  • syncLimit 定义了在领导者和跟随者之间发送消息的最大超时时间(心跳次数)。这意味着在同步阶段,跟随者必须在 syncLimit * tickTime 毫秒内响应领导者的请求。例如,这里设置为 10 秒(5 * 2000 毫秒)。


dataDir=/opt/zookeeper/zkData:

  • dataDir 指定 ZooKeeper 服务器用来存储快照(snapshot)的目录。


dataLogDir=/opt/zookeeper/zkLog:

  • dataLogDir 指定 ZooKeeper 服务器用来存储事务日志(transaction logs)的目录。这是从 ZooKeeper 3.4.6 开始引入的一个配置项,使得日志和数据可以分开存储。


clientPort=2181:

  • clientPort 指定客户端连接到 ZooKeeper 服务器的端口,默认为 2181。


server.1=ka1:2188:3888:

  • server.N 表示第 N 台服务器的信息,格式为 hostname:peerPort:leaderPort。peerPort 是服务器之间通信的端口,leaderPort 是选举领导者时使用的端口。


4lw.commands.whitelist=*:

  • 4lw.commands.whitelist 指定客户端可以执行的命令白名单。* 表示允许所有命令。 

3、创建数据目录

mkdir -p /opt/zookeeper/zkData 
mkdir -p /opt/zookeeper/zkLog

创建集群ID文件

在3台机器上分别执行

 

[root@kafka1 bin]# echo 1 >  /opt/zookeeper/zkData/myid
[root@kafka2 bin]# echo 2 >  /opt/zookeeper/zkData/myid
[root@kafka3 bin]# echo 3 >  /opt/zookeeper/zkData/myid

4、安装JAVA

yum install -y java-1.8.0-openjdk-devel

5、启动zookeeper

cd /opt/apache-zookeeper-3.8.0-bin/bin/ 
./zkServer.sh start ../conf/zoo.cfg &

查看状态

[root@kafka1 bin]# ps -aux | grep zook

四、Kafka集群搭建

1、上传tar包

资源包大家可以到官网下载

https://kafka.apache.org/

解压至指定目录

[root@kafka1 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/
[root@kafka2 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/
[root@kafka3 ~]# tar zxvf kafka_2.13-3.1.0.tgz -C /opt/

2、编辑配置文件

在kafka1上执行

[root@kafka1 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties

#输入如下内容
broker.id=0
listeners=PLAINTEXT://kafka1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs
num.partitions=5
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

 在kafka2上执行

[root@kafka2 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties

#输入如下内容
broker.id=1
listeners=PLAINTEXT://kafka2:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs
num.partitions=5
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

 在kafka3上执行

[root@kafka3 config]# vim /opt/kafka_2.13-3.1.0/config/server.properties

#输入如下内容
broker.id=2
listeners=PLAINTEXT://kafka3:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-logs
num.partitions=5
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka
zookeeper.connection.timeout.ms=18000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

 

broker.id=0

  • 这是 Kafka broker 的唯一标识符。每个 broker 必须有唯一的 ID。这里的值为 0,意味着这是一个集群中的单个 broker 或者是第一个 broker。

listeners=PLAINTEXT://kafka1:9092

  • 定义了 broker 监听的网络接口和端口。此处使用 PLAINTEXT 协议,意味着没有加密。kafka1:9092 表示监听名为 kafka1 的主机上的 9092 端口。

num.network.threads=3

  • 指定了用于网络请求处理的线程数。网络请求包括接收来自生产者的消息、发送消息给消费者等操作。这里设置为 3 个线程。

num.io.threads=8

  • 指定了用于处理 I/O 请求的线程数。I/O 请求包括磁盘上的读写操作。这里设置为 8 个线程。

socket.send.buffer.bytes=102400

  • 设置了发送套接字的缓冲区大小(单位:字节)。此配置影响网络数据包的发送速度。此处设置为 102400 字节。

socket.receive.buffer.bytes=102400

  • 设置了接收套接字的缓冲区大小(单位:字节)。此配置影响网络数据包的接收速度。此处设置为 102400 字节。

socket.request.max.bytes=104857600

  • 定义了从客户端接收的最大请求大小(单位:字节)。这有助于防止因过大请求而导致的内存溢出。此处设置为 104857600 字节,即约 100MB。

log.dirs=/opt/kafka-logs

  • 指定了日志文件存储的位置。日志文件包含了 Kafka topic 的数据。这里设置的日志目录为 /opt/kafka-logs。

num.partitions=5

  • 指定了默认主题分区的数量。分区越多,通常意味着更高的并发度。这里设置的主题默认分区数为 5。

default.replication.factor=2

  • 指定了创建新主题时的默认复制因子。复制因子决定了每个分区的副本数量。这里设置的复制因子为 2,意味着每个分区有 2 份副本。

num.recovery.threads.per.data.dir=1

  • 指定了用于恢复日志段的线程数。每个数据目录可以有不同的线程数。这里设置为 1 个线程。

offsets.topic.replication.factor=1

  • 指定了 _consumer_offsets 主题的复制因子。此主题用于存储消费者的偏移量信息。这里设置的复制因子为 1,意味着只有一个副本。

transaction.state.log.replication.factor=1

  • 指定了 _transactions 主题的复制因子。此主题用于记录事务状态。这里设置的复制因子为 1,意味着只有一个副本。

transaction.state.log.min.isr=1

  • 指定了 _transactions 主题的最小 ISR(In-Sync Replicas)数量。ISR 是与 leader 同步的副本集合。这里设置的最小 ISR 数量为 1。

log.retention.hours=168

  • 指定了日志数据保留的时间长度(单位:小时)。这里设置的日志保留时间为 168 小时,即 7 天。

log.segment.bytes=1073741824

  • 指定了日志段的最大大小(单位:字节)。一旦达到这个大小,Kafka 就会创建一个新的日志段。这里设置的日志段大小为 1073741824 字节,即 1GB。

log.retention.check.interval.ms=300000

  • 指定了检查日志清理的间隔时间(单位:毫秒)。这里设置的检查间隔为 300000 毫秒,即 5 分钟。

zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181/kafka

  • 指定了 ZooKeeper 服务器列表。ZooKeeper 用于协调 Kafka 集群。这里设置的 ZooKeeper 服务器为 kafka1:2181, kafka2:2181, kafka3:2181,路径为 /kafka。

zookeeper.connection.timeout.ms=18000

  • 指定了 Kafka 与 ZooKeeper 之间的连接超时时间(单位:毫秒)。这里设置的超时时间为 18000 毫秒,即 18 秒。请注意,zookeeper.connection.timeout.ms 在配置中出现了两次,应该是误写,只需要保留一次即可。

group.initial.rebalance.delay.ms=0

  • 指定了消费者组初始重新平衡的延迟时间(单位:毫秒)。这里设置的延迟时间为 0,即立即开始重新平衡。

3、创建数据目录

在3台机器上分别执行

mkdir /opt/kafka-logs

4、启动kafka

[root@kafka1 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[root@kafka1 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@kafka2 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[root@kafka2 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@kafka3 bin]# cd /opt/kafka_2.13-3.1.0/bin/
[root@kafka3 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

5、查看端口状态

[root@kafka1 bin]# netstat -antupl

五、测试 

1、创建Topic

前面我们已经将kafka集群搭建起来了,接下来创建一个Topic进行写入测试,如果不清楚Topic是什么,可以翻看作者之前的文章。

在kafka1上执行

[root@kafka1 bin]# cd /opt/kafka_2.13-3.1.0/bin
[root@kafka1 bin]# ./kafka-topics.sh --bootstrap-server=192.168.40.100:9092 --topic test --create --partitions=3 --replication-factor=2
  • --bootstrap-server:指定 Kafka broker 的地址和端口号。这里的 192.168.40.100:9092 指定了 broker 的 IP 地址为 192.168.40.100,端口号为 9092。
  • --topic:指定要操作的主题名称。在这个例子中,主题名为 test。
  • --create:告诉 Kafka 创建一个新主题。如果主题已经存在,这条命令将会失败,除非你配置了允许创建已存在的主题。
  • --partitions:指定主题的分区数。分区数决定了主题能够并行处理消息的能力。在这个例子中,主题 test 将会有 3 个分区。
  • --replication-factor:指定主题的复制因子。复制因子决定了每个分区的副本数量,这对于数据的冗余和可靠性非常重要。在这个例子中,主题 test 的每个分区将会有 2 个副本。

查看Topic

[root@kafka1 bin]# ./kafka-topics.sh --bootstrap-server=192.168.40.100:9092 --list

 

2、生产消息 

[root@kafka1 bin]# ./kafka-console-producer.sh --bootstrap-server 192.168.40.100:9092 --topic test

向我们刚刚创建的test Topic写入几条消息

3、消费消息 

[root@kafka1 bin]# ./kafka-console-consumer.sh --bootstrap-server=192.168.40.100:9092 --topic test --from-beginning

如果能看到之前生产的消息,则证明集群搭建成功

 💕💕💕每一次的分享都是一次成长的旅程,感谢您的陪伴和关注。希望这些关于大数据的文章能陪伴您走过技术的一段旅程,共同见证成长和进步!😺😺😺

🧨🧨🧨让我们一起在技术的海洋中探索前行,共同书写美好的未来!!!   

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/888968.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【学习笔记】SquareLine Studio安装教程(LVGL官方工具)

一.简介与导航: SquareLine Studio是由LVGL官方开发的一款UI设计工具,采用图形化进行界面UI设计,轻易上手。 SquareLine Studio官方网址:https://squareline.io/SquareLine Studio官方文档:https://docs.squareline.io…

上传本地项目到GitHub远程仓库(极简洁操作版)

第一步:在GitHub创建一个空的仓库 第二步:将仓库克隆(下载)到本地 第三步:将你要上传的所有文件放到这个克隆的仓库文件夹中 第四步:通过git add .将待上传文件添加到暂存区 此时,可以通过git …

css3-----2D转换、动画

2D 转换(transform) 转换(transform)是CSS3中具有颠覆性的特征之一,可以实现元素的位移、旋转、缩放等效果 移动:translate旋转:rotate缩放:scale 二维坐标系 2D 转换之移动 trans…

SpringBoot项目打成jar包,在其他项目中引用

1、首先新建一个SpringBoot工程 记得要将Gradle换成Maven 2、新建一个要引用的方法 3、打包的时候要注意: ① 不能使用springboot项目自带的打包插件进行打包,下面是自带的: ②要换成传统项目的maven打包,如下图: 依…

《网络安全自学教程》- Nmap使用及扫描原理分析

《网络安全自学教程》 Nmap(Network Mapper)是一款免费的开源网络扫描器,向目标主机发送特定的数据包,根据返回的流量特征,分析主机信息。主要功能有:「端口扫描」、「主机探测」、「服务识别」和「系统识别…

(接口测试)接口测试理论 http理论 接口测试流程 接口文档解析

一.接口测试理论 1.接口和接口测试 服务器为客户端开了一个验证接口(接口本质:函数方法)客户端向服务器传送的消息可以相当于函数的参数,接口是用来让客户端传递数据的 接口:相当于开了一个通道 当服务器要给客户端响…

笔记整理—linux进程部分(6)进程间通信、alarm和pause

两个进程间通信可能是任何两个进程间的通信(IPC)。同一个进程是在同一块地址空间中的,在不同的函数与文件以变量进程传递,也可通过形参传递。2个不同进程处于不同的地址空间,要互相通信有难度(内存隔离的原…

Awaken Likho恶意组织利用高级网络工具对俄罗斯政府发起“猛攻”

近日,俄罗斯政府机构和工业实体遭遇了一场名为“ Awaken Likho ”的网络活动攻击活动。 卡巴斯基表示,攻击者现在更倾向于使用合法MeshCentral平台的代理,而不是他们之前用来获得系统远程访问权限的UltraVNC模块。这家俄罗斯网络安全公司详细…

Golang | Leetcode Golang题解之第457题环形数组是否存在循环

题目: 题解: func circularArrayLoop(nums []int) bool {n : len(nums)next : func(cur int) int {return ((curnums[cur])%n n) % n // 保证返回值在 [0,n) 中}for i, num : range nums {if num 0 {continue}slow, fast : i, next(i)// 判断非零且方…

docker简述

1.安装dockers,配置docker软件仓库 安装,可能需要开代理,这里我提前使用了下好的包安装 启动docker systemctl enable --now docker查看是否安装成功 2.简单命令 拉取镜像,也可以提前下载使用以下命令上传 docker load -i imag…

大数据毕业设计选题推荐-B站热门视频数据分析-Python数据可视化-Hive-Hadoop-Spark

✨作者主页:IT研究室✨ 个人简介:曾从事计算机专业培训教学,擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Python…

使用C语言获取iostat中的await值的方法和方案

使用C语言获取iostat中的await值的方法和方案 1. 准备工作2. 调用iostat命令并获取输出3. 解析iostat输出4. 完整实现和错误处理5. 注意事项在Linux系统中,iostat命令是sysstat软件包的一部分,用于监控系统的CPU、网卡、tty设备、磁盘、CD-ROM等设备的活动情况和负载信息。其…

鸿蒙OS投票机制

(基于openharmony5.0) 投票机制 param get | grep ohos.boot.time 图 投票机制参数图 只有当所有的投票完成,开机动画才会退出,整理需要投票的系统应用(三方应用不参与投票)如下图所示: 以进程foundation为例&…

基于Kafka2.1解读Producer原理

文章目录 前言一、Kafka Producer是什么?二、主要组件1.Kafka Producer1.1 partitioner1.2 keySerializer1.3 valueSerializer1.4 accumulator1.5 sender 2.Sender2.1 acks2.2 clientinFlightBatches 3. Selector3.1 nioSelector3.2 channels 4. 全局总览 总结 前言…

Arduino UNO R3自学笔记20 之 Arduino如何测定电机速度?

注意:学习和写作过程中,部分资料搜集于互联网,如有侵权请联系删除。 前言:在学习了Arduino的相关基础知识后,现在做个综合应用,给旋转的电机测速。 1.实验目的 测定旋转电机的转速。 2.实验器材-编码器 …

【hot100-java】二叉树的最近公共祖先

二叉树篇 我觉得是比两个节点的深度,取min(一种情况) DFS解题。 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode(int x) { val x; }* }*/ clas…

力扣题11~15

题11(中等): 思路: 这种题目第一眼就是双循环,但是肯定不行滴,o(n^2)这种肯定超时,很难接受。 所以要另辟蹊径,我们先用俩指针(标志位)在最左端和最右端&am…

基于SpringBoot智能垃圾分类系统【附源码】

基于SpringBoot智能垃圾分类系统 效果如下: 系统首页界面 用户注册界面 垃圾站点页面 商品兑换页面 管理员登录界面 垃圾投放界面 物业登录界面 物业功能界图 研究背景 随着城市化进程的加速,生活垃圾的产量急剧增加,传统的垃圾分类方式已…

【C++】二叉搜索树+变身 = AVL树

🚀个人主页:小羊 🚀所属专栏:C 很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~ 目录 前言一、AVL树二、AVL树的实现2.1 平衡因子2.2 旋转处理2.2.1 左单旋:插入新节点后单纯的右边高2.2.2 …

光路科技TSN交换机:驱动自动驾驶技术革新,保障高精度实时数据传输

自动驾驶技术正快速演进,对实时数据处理能力的需求激增。光路科技推出的TSN(时间敏感网络)交换机,在比亚迪最新车型中的成功应用,显著推动了这一领域的技术进步。 自动驾驶技术面临的挑战 自动驾驶系统需整合来自雷达…