Kafka 快速入门(一)

1.1安装部署

1.1.1 集群规划

bigdata01bigdata02bigdata03
zookeeperzookeeperzookeeper
kafkakafkakafka

1.1.2 集群部署

官方下载地址:http://kafka.apache.org/downloads.html

检查三台虚拟机的zk是否启动:zkServer.sh start 默认启动方式

1)解压安装包

kafka中 2.12 是scala语言的版本,3.0.0是kafka版本

将第一台虚拟机bigdata01作为主机,在bigdata01上完成一系列操作:

tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/installs/ 
#在压缩包路径下解压缩kafka, -C 后面路径为解压路径

2)修改解压后的文件名称

 mv kafka_2.12-3.0.0/ kafka3

 #版本号解读:2.12 是scala版本,3.0.0是kafka版本

3)进入到/opt/installs/kafka3 目录,修改配置文件

cd config/ # 切换到kafkaa的config文件夹下
vi server.properties #进入server.properties 修改配置

修改红色部分:

#broker 的全局唯一编号,不能重复,只能是数字。

broker.id=0

#处理网络请求的线程数量

num.network.threads=3

#用来处理磁盘 IO 的线程数量

num.io.threads=8

#发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

#接收套接字的缓冲区大小

socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小

socket.request.max.bytes=104857600

#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以

配置多个磁盘路径,路径与路径之间可以用","分隔

log.dirs=/opt/installs/kafka3/datas

#topic 在当前 broker 上的分区个数

num.partitions=1

#用来恢复和清理 data 下数据的线程数量

num.recovery.threads.per.data.dir=1

# 每个 topic 创建时的副本数,默认时 1 个副本

offsets.topic.replication.factor=1

#segment 文件保留的最长时间,超时将被删除

log.retention.hours=168

#每个 segment 文件的大小,默认最大 1G

log.segment.bytes=1073741824

# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期

log.retention.check.interval.ms=300000

#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)

zookeeper.connect=bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka

# /kafka的意思是:在zk中创建一个文件夹叫做kafka

4)分发安装包

可以使用scp

scp /opt/installs/kafka3 root@hadoop12:/opt/installs

或者使用一下脚本命令:xsync.sh kafka3/

脚本:xsync.sh(可选择存放在/usr/local/bin或者sbin中)

脚本代码:

#!/bin/bash
#1 获取输入参数个数,如果没有参数,直接退出
pcount=$#
if [ $pcount -lt 1 ]
then
    echo No Enough Arguement!
    exit;
fi

#2. 遍历集群所有机器
for host in bigdata02 bigdata03
do
    echo ====================    $host    ====================
    #3. 递归遍历所有目录
    for file in $@
    do
        #4 判断文件是否存在
        if [ -e $file ]
        then
            #5. 获取全路径
            pdir=$(cd -P $(dirname $file); pwd)
            echo pdir=$pdir
            
            #6. 获取当前文件的名称
            fname=$(basename $file)
            echo fname=$fname
            
            #7. 通过ssh执行命令:在$host主机上递归创建文件夹(如果存在该文件夹)
            ssh $host "source /etc/profile;mkdir -p $pdir"
            
            #8. 远程同步文件至$host主机的$USER用户的$pdir文件夹下
            rsync -av $pdir/$fname $USER@$host:$pdir
        else
            echo $file Does Not Exists!
        fi
    done
done

5)修改bigdata02和bigdata03的broker.id

分别在 bigdata02 和 bigdata03上修改配置文件/opt/installs/kafka/config/server.properties 中的 broker.id=1、broker.id=2

注:broker.id 不得重复,整个集群中唯一。

6)配置环境变量

(1)修改bigdata01的环境变量,增加如下内容:

#KAFKA_HOME 

export KAFKA_HOME=/opt/installs/kafka3
export PATH=$PATH:$KAFKA_HOME/bin

分发一下:
xsync.sh /etc/profile

(2)刷新一下环境变量

 xcall.sh source /etc/profile 

7)启动集群

(1)先启动 Zookeeper 集群,然后启动 Kafka。 【如果启动过就跳过】

xcall.sh zkServer.sh start

(2)依次在 bigdata01、bigdata02、bigdata03 节点上启动 Kafka。

先进入到kafka3 这个文件夹中,在三台服务器上分别运行启动命令:
bin/kafka-server-start.sh -daemon config/server.properties

关闭集群 :

bin/kafka-server-stop.sh #每台服务器都要运行

1.1.3 集群kafka启停脚本

1)在/usr/local/sbin 目录下创建文件 kf.sh 脚本文件

vim kf.sh

2) 编写脚本

#! /bin/bash
case $1 in
"start"){
 for i in bigdata01 bigdata02 bigdata03
 do
 echo " --------启动 $i Kafka-------"
 ssh $i "source /etc/profile; /opt/installs/kafka3/bin/kafka-server-start.sh -daemon /opt/installs/kafka3/config/server.properties"
 done
};;
"stop"){
 for i in bigdata01 bigdata02 bigdata03
 do
 echo " --------停止 $i Kafka-------"
 ssh $i "source /etc/profile; /opt/installs/kafka3/bin/kafka-server-stop.sh"
 done
};;
esac

3)添加权限

chmod u+x kf.sh

4)使用脚本

kf.sh start #启动集群kafka
kf.sh stop #停止集群kafka

注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper 集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止, Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。

1.2 Kafka 命令行操作

1.2.1 主题命令行操作

1)查看操作主题命令参数:  bin/kafka-topics.sh

2)查看当前服务器中的所有 topic

(如果提示文件不存在可能是环境变量未配置,详细见1.1.2中的  6))

kafka-topics.sh --bootstrap-server bigdata01:9092 --list 

3)创建 first topic

kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic first

选项说明:

--topic 定义 topic 名

--replication-factor 定义副本数

--partitions 定义分区数

4)查看 first 主题的详情

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic first 

5)修改分区数

注意:分区数只能增加,不能减少

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 3

6)再次查看 first 主题的详情

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic first 

7)删除 topic(慎行)

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --delete --topic first

1.2.2 生产者命令行操作

1)查看操作生产者命令参数

bin/kafka-console-producer.sh

2)发送消息

bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic first

>hello world

>bigdata bigdata

1.2.3 消费者命令行操作

1)查看操作消费者命令参数

bin/kafka-console-consumer.sh

2)消费消息

(1)消费 first 主题中的数据。

kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic first 

(2)把主题中所有的数据都读取出来(包括历史数据)

kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --from-beginning --topic first

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/914779.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

零件图纸的技术要求及标注

1零件的技术要求 零件在加工、检验时的各项技术要求,通常是指表面粗糙度、尺寸公差、形状和位置公差,材料的热处理及表面处理等。 2尺寸公差与配合 1、零件的互换性&定义、作用 在按规定要求大量制造的零件或部件中,任取一个&#xff0…

Python 的 Pygame 库,编写简单的 Flappy Bird 游戏

Pygame 是一个用 Python 编写的开源游戏开发框架,专门用于编写 2D 游戏。它提供了丰富的工具和功能,使得开发者能够快速实现游戏中的图形渲染、声音播放、输入处理和动画效果等功能。Pygame 非常适合初学者和想要快速创建游戏原型的开发者。 Pygame 的主…

【缓存策略】你知道 Cache Aside(缓存旁路)这个缓存策略吗

👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO 专家博主 ⛪️ 个人社区&#x…

2024版最新kali linux 新手教程(非常详细)零基础入门到精通,收藏这篇就够了

您是否有兴趣使用 Kali Linux,但不知道从哪里开始?您来对地方了。 Kali Linux 是一个强大的渗透测试和道德黑客工具,提供许多工具和资源。 本 Kali Linux 教程将向您展示如何下载和安装它、解释桌面并强调您应该知道的关键领域。接下来&…

Android JNI 技术入门指南

引言 在Android开发中,Java是一种主要的编程语言,然而,对于一些性能要求较高的场景(如音视频处理、图像处理、计算密集型任务等),我们可能需要使用到C或C等语言来编写底层的高效代码。为了实现Java代码与C…

国标GB28181视频平台EasyCVR私有化部署视频平台对接监控录像机NVR时,录像机“资源不足”是什么原因?

EasyCVR视频融合云平台,是TSINGSEE青犀视频“云边端”架构体系中的“云平台”系列之一,是一款针对大中型项目设计的跨区域、网络化、视频监控综合管理系统平台,通过接入视频监控设备及视频平台,实现视频数据的集中汇聚、融合管理、…

HarmonyOS NEXT:模块化项目 ——修改应用图标+启动页等

涉及官方文档 应用配置文件应用/组件级配置图标资源规范 涉及到app.json5配置文件和module.json5配置文件 1、 icon和label的校验。 IDE从5.0.3.800版本开始,不再对module.json5中的icon和label做强制校验,因此module.json5与app.json5只需要选择其一…

产品经理晋级-Axure中继器+动态面板制作美观表格

步骤如下: 将你的表格(制作好的表格复制) 在工作页面中,添加动态面板,并把刚才复制的表格添加进来

java 面向对象高级

1.final关键字 class Demo{public static void main(String[] args) {final int[] anew int[]{1,2,3};// anew int[]{4,5,6}; 报错a[0]5;//可以,解释了final修饰引用性变量,变量存储的地址不能被改变,但地址所指向的对象的内容可以改变} }什…

计算机网络:运输层 —— 运输层端口号

文章目录 运输层端口号的分类端口号与应用程序的关联应用举例发送方的复用和接收方的分用 运输层端口号的分类 端口号只具有本地意义,即端口号只是为了标识本计算机网络协议栈应用层中的各应用进程。在因特网中不同计算机中的相同端口号是没有关系的,即…

echarts引入自定义字体不起作用问题记录

echarts引入自定义字体不起作用问题记录 1、问题描述 初始化界面字体不作用,当界面更新后字体样式正常显示 2、原因描述 这通常是由于字体文件加载延迟导致的。ECharts 在初始化时可能还没有加载完字体文件,因此无法正确应用字体样式 3、解决方案 …

AscendC从入门到精通系列(一)初步感知AscendC

1 什么是AscendC Ascend C是CANN针对算子开发场景推出的编程语言,原生支持C和C标准规范,兼具开发效率和运行性能。基于Ascend C编写的算子程序,通过编译器编译和运行时调度,运行在昇腾AI处理器上。使用Ascend C,开发者…

JavaScript——函数、事件与BOM对象

一、系统函数(JS中预置的函数) JS的预置函数在遇到非数字字符时会停止解析 parseInt 转整型 parseFloat 转浮点型 isNaN !isNaN("10") 检测是否纯数字 eval 把字符串转成算式并计算 1.parseInt(string, radix); 语法: string&#x…

Python学习从0到1 day28 Python 高阶技巧 ⑤ 多线程

若事与愿违,请相信,上天自有安排,允许一切如其所是 —— 24.11.12 一、进程、线程 现代操作系统比如Mac OS X,UNIX,Linux,Windows等,都是支持“多任务”的操作系统。 进程 进程:就…

OpenCV视觉分析之目标跟踪(11)计算两个图像之间的最佳变换矩阵函数findTransformECC的使用

操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 根据 ECC 标准 78找到两幅图像之间的几何变换(warp)。 该函数根据 ECC 标准 ([78]) 估计最优变换(warpMatri…

彻底解决单片机BootLoader升级程序失败问题

文章目录 1、引言2、MicroBoot:优雅的解决升级问题问题1:bootloader 在跳转到app前没有清理干净存在的痕迹问题2: 需要 APP 传递信息给 Bootloader问题3: APP单独运行没有问题,通过Bootloader跳转到APP运行莫名死机问题…

v-html 富文本中图片使用element-ui image-viewer组件实现预览,并且阻止滚动条

效果 导入组件 import ElImageViewer from "element-ui/packages/image/src/image-viewer"; components:{ ElImageViewer },模板使用组件 <el-image-viewerv-if"isShowPics":on-close"closeViewer":url-list"srcList"/>定义两…

山寨一个Catch2的SECTION

Catch2 是一个 C 单元测试库&#xff0c;吹嘘自己比 NUnit 和 xUnit 还要高明&#xff0c; 支持在 TEST_CASE() 中的多个 SECTION&#xff0c; 意思是说 SECTION 外头的代码相当于setup 和 teardown&#xff0c;section 内部则被认为是实际的 test case&#xff0c; 这种写法可…

深入剖析【C++继承】:单一继承与多重继承的策略与实践,解锁代码复用和多态的编程精髓,迈向高级C++编程之旅

​​​​​​​ &#x1f31f;个人主页&#xff1a;落叶 &#x1f31f;当前专栏: C专栏 目录 继承的概念及定义 继承的概念 继承定义 定义格式 继承基类成员访问⽅式的变化 继承类模板 基类和派⽣类间的转换 继承中的作⽤域 隐藏规则 成员函数的隐藏 考察继承【作⽤…

36.Redis核心设计原理

本文针对前面的讲解做一次总结 1.Redis基本特性 1.非关系型的键值对数据库&#xff0c;可以根据键以O(1)的时间复杂度取出或插入关联值 2.Redis的数据是存在内存中的 3.键值对中键的类型可以是字符串&#xff0c;整型&#xff0c;浮点型等&#xff0c;且键是唯一的 4.键值对中…