大数据基础设施搭建 - Kafka(with ZooKeeper)

文章目录

  • 一、简介
  • 二、单机部署
    • 2.1 上传压缩包
    • 2.2 解压压缩包
    • 2.3 修改配置文件
      • (1)配置zookeeper地址
      • (2)修改kafka运行日志(数据)存储路径
    • 2.4 配置环境变量
    • 2.5 启动/关闭
    • 2.6 测试
      • (1)查看当前服务器中的所有topic
      • (2)创建topic等增删改查操作未测试,担心后面升级为集群模式时出问题。
  • 三、集群部署
    • 3.0 清空log.dirs目录并删除zookeeper的kafka节点
    • 3.1 同步到其他机器
      • (1)同步Kafka软件
      • (2)修改其他机器的broker.id
      • (3)配置其他机器的环境变量
    • 3.2 启动/停止集群
    • 3.3 测试
      • (1)查看当前服务器中的所有topic
      • (2)创建topic
      • (3)删除topic
      • (4)发送消息
      • (5)消费消息
      • (6)查看某个Topic的详情
      • (7)修改分区数
  • 四、监控(kafka-eagle单机模式)
    • 4.0 上传并解压kafka-eagle压缩包
    • 4.1 修改Kafka集群配置
      • (1)暴露JMX端口
      • (2)调大Kafka内存
      • (3)分发配置
    • 4.2 配置kafka-eagle
      • 4.2.1 修改配置文件
        • (1)配置zk地址
        • (2)Kafka Offset的存储地址
        • (3)配置MySQL地址
        • (4)其他配置
      • 4.2.2 配置环境变量
    • 4.3 启动
      • 4.3.1 启动Kafka集群
      • 4.3.2 启动kafka-eagle
      • 4.3.3 关闭kafka-eagle
    • 4.4 测试

一、简介

Kafka官网:https://kafka.apache.org/intro
Kafka是Scala开发的,运行依赖JVM,所以安装Kafka前需要先安装JDK。

在这里插入图片描述

二、单机部署

Kafka集群化部署需要分布式协调服务来帮助Kafka实现高可用,分布式协调服务可以使用通用解决方案Zookeeper或Kafka内部实现的KRaft。ZooKeeper充当的角色是帮助提供公平的选举机制选举leader等作用。本例采用的模式是Kafka with ZooKeeper(参考资料丰富)。

2.1 上传压缩包

2.2 解压压缩包

[hadoop@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/

2.3 修改配置文件

[hadoop@hadoop102 config]$ vim server.properties

(1)配置zookeeper地址

zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

(2)修改kafka运行日志(数据)存储路径

log.dirs=/opt/module/kafka_2.11-2.4.1/datas

2.4 配置环境变量

[hadoop@hadoop102 config]$ sudo vim /etc/profile.d/my_env.sh

新增内容:

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka_2.11-2.4.1
export PATH=$PATH:$KAFKA_HOME/bin

使环境变量生效:

[hadoop@hadoop102 config]$ source /etc/profile

2.5 启动/关闭

[hadoop@hadoop102 config]$ cd /opt/module/kafka_2.11-2.4.1/
[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-server-start.sh -daemon config/server.properties
[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-server-stop.sh stop

2.6 测试

(1)查看当前服务器中的所有topic

两种查看方式,一种是连kafka查看,一种是连zookeeper看,topic信息存zookeeper上了????

[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --list
[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list

(2)创建topic等增删改查操作未测试,担心后面升级为集群模式时出问题。

三、集群部署

从Kafka单机模式升级到Kafka集群模式,一定要先清空log.dirs目录,否则其他机器会启动失败。需要清空zookeeper中kafka信息吗?

3.0 清空log.dirs目录并删除zookeeper的kafka节点

[hadoop@hadoop102 kafka_2.11-2.4.1]$ rm -r datas/
# 启动zookeeper客户端
[zk: localhost:2181(CONNECTED) 5] deleteall /kafka

3.1 同步到其他机器

(1)同步Kafka软件

[hadoop@hadoop102 ~]$ mytools_rsync /opt/module/kafka_2.11-2.4.1/

(2)修改其他机器的broker.id

不同机器的brokerid不能相同

[hadoop@hadoop103 config]$ vim server.properties
# 修改内容:broker.id=1
[hadoop@hadoop104 config]$ vim server.properties
# 修改内容:broker.id=2

(3)配置其他机器的环境变量

[hadoop@hadoop103 config]$ sudo vim /etc/profile.d/my_env.sh
[hadoop@hadoop104 config]$ sudo vim /etc/profile.d/my_env.sh

新增内容:

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka_2.11-2.4.1
export PATH=$PATH:$KAFKA_HOME/bin

使环境变量生效:

[hadoop@hadoop103 config]$ source /etc/profile
[hadoop@hadoop104 config]$ source /etc/profile

3.2 启动/停止集群

# 启动
[hadoop@hadoop102 config]$ cd /opt/module/kafka_2.11-2.4.1/
[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-server-start.sh -daemon config/server.properties
[hadoop@hadoop103 config]$ cd /opt/module/kafka_2.11-2.4.1/
[hadoop@hadoop103 kafka_2.11-2.4.1]$ bin/kafka-server-start.sh -daemon config/server.properties
[hadoop@hadoop104 config]$ cd /opt/module/kafka_2.11-2.4.1/
[hadoop@hadoop104 kafka_2.11-2.4.1]$ bin/kafka-server-start.sh -daemon config/server.properties

# 停止
[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-server-stop.sh stop
[hadoop@hadoop103 kafka_2.11-2.4.1]$ bin/kafka-server-stop.sh stop
[hadoop@hadoop104 kafka_2.11-2.4.1]$ bin/kafka-server-stop.sh stop

3.3 测试

(1)查看当前服务器中的所有topic

[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list

(2)创建topic

[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 1 --topic first-topic

选项说明:
–topic 定义topic名
–replication-factor 定义副本数
–partitions 定义分区数

(3)删除topic

[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first-topic

(4)发送消息

[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first-topic

发送内容:

>hello
>hi~
>are you ok?

(5)消费消息

[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first-topic
[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first-topic

消费者组内的消费者数和topic的分区数的关系?

(6)查看某个Topic的详情

[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first-topic

(7)修改分区数

[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first-topic --partitions 3

四、监控(kafka-eagle单机模式)

用于监控Kafka的消息堆积、消息延迟等情况。
注意:需要提前准备好MySQL环境,kafka-eagle会将监控数据保存到MySQL中。

4.0 上传并解压kafka-eagle压缩包

注意:压缩包里面还有一个压缩包,需要解压两次

[hadoop@hadoop102 software]$ cd /opt/software/
[hadoop@hadoop102 software]$ tar -zxvf kafka-eagle-bin-1.4.8.tar.gz
[hadoop@hadoop102 software]$ cd kafka-eagle-bin-1.4.8/
[hadoop@hadoop102 kafka-eagle-bin-1.4.8]$ tar -zxvf kafka-eagle-web-1.4.8-bin.tar.gz -C /opt/module/

4.1 修改Kafka集群配置

先关闭Kafka集群

[hadoop@hadoop102 bin]$ vim kafka-server-start.sh

(1)暴露JMX端口

JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务,实际上,用户能够在任何Java应用程序中使用这些代理和服务实现管理。用人话说,就是对外暴露更多数据,方便某些监控之类的插件来使用

(2)调大Kafka内存

默认初始化内存、运行内存为1G,使用kafka-eagle监控,1G内存不够用。需要增加到2G。

修改内容:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
fi

(3)分发配置

[hadoop@hadoop102 bin]$ mytools_rsync kafka-server-start.sh

4.2 配置kafka-eagle

4.2.1 修改配置文件

[hadoop@hadoop102 ~]$ cd /opt/module/kafka-eagle-web-1.4.8/conf/
[hadoop@hadoop102 conf]$ vim system-config.properties
(1)配置zk地址

为什么要配置zk的地址,因为Kafka的配置信息存储在了zk中。

修改内容:

kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
(2)Kafka Offset的存储地址

kafka-eagle需要监控Kafka的offset,所以需要知道Kafka的offset存储在了哪里,存储位置是在Kafka集群中配置的,Kafka默认将offset存储在了kafka的topic中。

修改内容:

cluster1.kafka.eagle.offset.storage=kafka
(3)配置MySQL地址

修改内容:

kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://mall:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
(4)其他配置
# 是否启动监控图表
kafka.eagle.metrics.charts=true

4.2.2 配置环境变量

[hadoop@hadoop102 conf]$ sudo vim /etc/profile.d/my_env.sh

新增内容:

# kafkaEagle
export KE_HOME=/opt/module/kafka-eagle-web-1.4.8
export PATH=$PATH:$KE_HOME/bin

使环境变量生效:

[hadoop@hadoop102 conf]$ source /etc/profile

4.3 启动

4.3.1 启动Kafka集群

见本文3.2内容

4.3.2 启动kafka-eagle

启动前先放开MySQL所在机器的3306端口号,因为kafka-eagle启动后会进行初始化操作,包括在MySQL中创建ke数据库等。如果不放开初始化数据库会失败!
注意:阿里云安全组内网之间也需要放开对应端口号才能通信。能互相ping同ip,为什么不能连通端口???

[hadoop@hadoop102 conf]$ cd /opt/module/kafka-eagle-web-1.4.8/bin
# 给启动文件执行权限
[hadoop@hadoop102 bin]$ chmod 777 ke.sh
[hadoop@hadoop102 bin]$ cd /opt/module/kafka-eagle-web-1.4.8/
[hadoop@hadoop102 kafka-eagle-web-1.4.8]$ bin/ke.sh start

4.3.3 关闭kafka-eagle

[hadoop@hadoop102 kafka-eagle-web-1.4.8]$ bin/ke.sh stop

4.4 测试

安全组放开8048端口

访问:http://hadoop102:8048/ke
Account:admin
Password:123456

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/171282.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

米诺地尔行业分析:预计2029年将达到14亿美元

米诺地尔市场规模庞大,不仅包括消费品市场和服务行业,还涵盖了创新科技领域。随着经济的发展和市场需求的不断增长,米诺地尔市场的规模将继续扩大,各行各业都将面临更多机遇和挑战。 随着社会经济发展和城市化进程的推进&#xff…

怎么实现在微信公众号秒杀商品的功能呢

实现微信公众号秒杀商品的功能,需要结合微信公众平台和后端开发技术。下面将介绍整个实现过程,包括前期准备、开发流程和后期运营等方面。 一、前期准备 确定秒杀商品:选择适合秒杀的商品,要求数量充足、质量良好,同时…

NOSQL----redis的安装和基础命令

redis是什么 1.redis-------非关系型数据库 redis是非关系数据库的一种,也称为缓存型数据库。 非关系型数据库和关系型数据库 1.关系型数据库 关系型数据库是一个结构化的数据库,记录方式是行和列(列:声明对象,行&am…

Python二级 每周练习题27

如果你感觉有收获,欢迎给我打赏 ———— 以激励我输出更多优质内容 练习一: 用户输入一个半径r,求该半径下的圆的面积s与周长c。要求如下: (1)输出的面积与周长都保留俩位小数; (2&#xff0…

王先生丢手机上热搜!VERTU放大招:推出真人找手机服务

日前,一则关于民警三小时帮助失主寻回三十万天价手机的新闻登上热搜,引发网友对这部三十万手机的好奇与猜测。据了解,该男子丢失的手机疑似为一款名叫VERTU Signature 的奢侈品定制手机,而根据其官网显示,“唐卡定制”…

家庭教育专家:如何创建家庭自主学习环境?

经常听到一些父母这样抱怨:“明明和孩子说好就看20分钟电视,结果到了时间,他死活都不肯关。”“作业还没完成的情况下,孩子还一直抱着手机或者电子产品玩游戏。到了约定时间也不撒手,一直跟你讨价还价。” 其实&#…

图像处理02 matlab中NSCT的使用

06 matlab中NSCT的使用 最近在学习NSCT相关内容,奈何网上资源太少,简单看了些论文找了一些帖子才懂了一点点,在此分享给大家,希望有所帮助。 一.NSCT流程 首先我们先梳理一下NSCT变换的流程,只有清楚流程才更好的理清…

Redis(位图Bitmap和位域Bitfield)

位图: 位图是字符串类型的扩展。 Redis中的位图是一种特殊的数据结构,用于表示一系列位的集合。它可以存储大量的布尔值数据,每个位代表一个布尔值(0或1),并且可以对这些位进行各种位运算操作。位图通常用…

【ARM Trace32(劳特巴赫) 使用介绍 2.3 -- TRACE32 进阶命令之 参数传递介绍】

请阅读【ARM Coresight SoC-400/SoC-600 专栏导读】 文章目录 参数传递命令 ENTRY 参数传递命令 ENTRY ENTRY <parlist>The ENTRY command can be used to Pass parameters to a PRACTICE script or to a subroutineTo return a value from a subroutine 使用示例&am…

C++入门第八篇---STL模板---list的模拟实现

前言&#xff1a; 有了前面的string和vector两个模板的基础&#xff0c;我们接下来就来模拟实现一下list链表模板&#xff0c;我还是要强调的一点是&#xff0c;我们模拟实现模板的目的是熟练的去使用以及去学习一些对于我们本身学习C有用的知识和用法&#xff0c;而不是单纯的…

泛型进阶:通配符

基本概念 对泛型不了解的可以看这篇博客&#xff1a;数据结构前瞻-CSDN博客 一般来说&#xff0c;&#xff1f;在泛型里的使用就是通配符 看看下面的代码 class Message<T> {private T message ;public T getMessage() {return message;}public void setMessage(T m…

Qml使用cpp文件的信号槽

文章目录 一、C文件Demo二、使用步骤1. 初始化C文件和QML文件&#xff0c;并建立信号槽2.在qml中调用 一、C文件Demo Q_INVOKABLE是一个Qt元对象系统中的宏&#xff0c;用于将C函数暴露给QML引擎。具体来说&#xff0c;它使得在QML代码中可以直接调用C类中被标记为Q_INVOKABLE的…

【Sql】sql server还原数据库的时候,提示:因为数据库正在使用,所以无法获得对数据库的独占访问权。

【问题描述】 sql server 还数据库的时候&#xff0c;提示失败。 点击左下角进度位置&#xff0c;可以得到详细信息&#xff1a; 因为数据库正在使用&#xff0c;所以无法获得对数据库的独占访问权。 【解决方法】 针对数据库先后执行下述语句&#xff0c;获得独占访问权后&a…

Python 和 Ruby 谁是最好的Web开发语言?

Python 和 Ruby 都是目前用来开发 websites、web-based apps 和 web services 的流行编程语言之一。 【这个时候又人要说PHP是世界上最好的语言了】 我就不说PHP 最好的方法 VS 以人为本的语言 社区: 稳定与创新 尽管特性和编程哲学是选择一个语言的首要驱动因素&#xff0c…

stack和queue简单实现(容器适配器)

容器适配器 stack介绍stack模拟实现queue 介绍queue模拟实现deque stack介绍 stack模拟实现 以前我们实现stack&#xff0c;需要像list,vector一样手动创建成员函数&#xff0c;成员变量。但是stack作为容器适配器&#xff0c;我们有更简单的方法来实现它。 可以利用模板的强大…

C语言生成dll与lib文件

环境要求 新建一个空白项目&#xff0c;可以是exe的&#xff0c;也可以直接是dll的&#xff0c;也可以是啥都没有的空项目&#xff0c;推荐创建空项目&#xff0c;项目创建好以后进行配置&#xff0c;共两步 第一步&#xff0c;打开项目属性 第二步&#xff0c;设置配置类型…

基础课10——自然语言生成

自然语言生成是让计算机自动或半自动地生成自然语言的文本。这个领域涉及到自然语言处理、语言学、计算机科学等多个领域的知识。 1.简介 自然语言生成系统可以分为基于规则的方法和基于统计的方法两大类。基于规则的方法主要依靠专家知识库和语言学规则来生成文本&#xff0…

java中的抽象

1.当一个类中给出的信息不够全面时&#xff0c;&#xff08;比方说有无法确定的行为&#xff09;&#xff0c;它给出的信息不足以描绘出一个具体的对象&#xff0c;这时我们往往不会实例化该类&#xff0c;这种类就是抽象类。 2. 在Java中&#xff0c;我们通过在类前添加关键字…

Redis篇---第九篇

系列文章目录 文章目录 系列文章目录前言一、如果有大量的 key 需要设置同一时间过期,一般需要注意什么?二、什么情况下可能会导致 Redis 阻塞?三、缓存和数据库谁先更新呢?前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击…

南京--ChatGPT/GPT4 科研实践应用

2023年随着OpenAI开发者大会的召开&#xff0c;最重磅更新当属GPTs&#xff0c;多模态API&#xff0c;未来自定义专属的GPT。微软创始人比尔盖茨称ChatGPT的出现有着重大历史意义&#xff0c;不亚于互联网和个人电脑的问世。360创始人周鸿祎认为未来各行各业如果不能搭上这班车…