基于hadoop下的Kafka分布式安装

简介

        Kafka是一种分布式流处理平台,它具有高吞吐量、可扩展性、可靠性、实时性和灵活性等优点。它能够支持每秒数百万条消息的传输,并且可以通过增加节点来增加吞吐量和存储容量。Kafka通过将数据复制到多个节点来实现数据冗余和高可用性,即使某个节点故障,也可以保证数据不会丢失。它能够快速地处理和传输数据,支持实时数据的处理和分析。此外,Kafka可以与各种不同的数据处理和分析工具集成,包括流处理、批处理、数据挖掘等等。

        Kafka的优点包括高吞吐量、可扩展性、可靠性、实时性和灵活性。它能够处理大量的数据,支持高并发的场景,并且可以水平扩展以支持更大的负载。此外,Kafka还提供了消息持久化的能力,可以保证数据的可靠性和不丢失。

        Kafka的缺点包括消息可能会重复消费影响数据精确度、数据达不到真正的实时以及只能支持统一分区内消息有序等。此外,Kafka不支持事务和消息的持久化,可能会在系统故障时丢失数据。

        Kafka的同类产品包括RabbitMQ、ActiveMQ、Amazon SQS等等。这些产品都是消息队列系统,具有类似的功能和用途。它们可以用于解耦和缓冲应用程序之间的消息传递,支持异步和同步通信,提供消息持久化和可靠性保证等功能。

        Kafka解决的场景问题包括实时数据流的处理和分析、大规模数据的分布式处理和存储、解耦和缓冲应用程序之间的消息传递等等。它可以用于在线购物平台、智能家居产品等互联网产品的消息传递和数据处理,以及金融、物流等行业的实时数据处理和分析。

        一个典型的Kafka集群包含若干个生产者(Producer)、若干Kafka集群节点(Broker)、若干消费者(Consumer)以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举Leader以及在消费者发生变化时进行负载均衡。生产者使用推(Push)模式将消息发布到集群节点,而消费者使用拉(Pull)模式从集群节点中订阅并消费消息。

安装

       zookeeper安装

       hadoop安装

       Kafka安装

        kafka安装、配置都比较简单,没有主从节点之分。

进入官网选择二进制安装

        所有节点均执行以下操作,我们在安装没有一个组件的时候都会设置软连接,主要是方便其他组件的挂接和后期组件的升级。

[hadoop@vm02 ~]$ tar -zxf kafka_2.12-3.6.0.tgz 
[hadoop@vm02 ~]$ rm -rf kafka_2.12-3.6.0.tgz 
[hadoop@vm02 ~]$ ln -s kafka_2.12-3.6.0/   kafka

配置文件设置

        进入到kafka的配置目录(/home/hadoop/kafka/config)中间,设置相关配置参数

zookeeper.properties配置文件

        该文件主要是挂接上zookeeper,参数kafka集群直接的选主实现运行的可靠性和高可用性

在kafka安装包中自带有zookeeper的程序,本文不适用自带的zookeeper程序,我们独立安装了zookeeper,用于管理hadoop中的所有组件,本文的kafka也将其挂到hadoop统一的zookeeper的程序中去。

        在默认文件中设置了五个参数(其中一个被#注释),所有节点均按照下图配置

# the directory where the snapshot is stored.
dataDir=/home/hadoop/zookeeper/zkdata
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production configq
maxClientCnxns=10
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
  1. maxClientCnxns: 这个参数控制单个 IP 地址可以同时连接到 ZooKeeper 服务的最大数量。默认值是 60。如果你的应用程序需要大量的客户端连接到 ZooKeeper,或者你发现客户端因为达到这个限制而被拒绝连接,你可以考虑增加这个值。但是请注意,过高的设置可能会导致服务器资源耗尽或拒绝服务攻击。

  2. admin.enableServer: 这个参数用来启用或禁用 ZooKeeper 的管理界面。默认情况下,这个参数的值为 false,这意味着 ZooKeeper 不会监听任何端口来提供管理界面。如果你想独立使用kafka内置的zookeeper程序,你可以打开,本问是独立安装的zookeeper,所以不适用参数,即设置为false

  3. dataDir:在参数的地址可以查看zookeeper具体的路径进行配置

    [hadoop@vm02 ~]$ cd zookeeper/
    [hadoop@vm02 zookeeper]$ ll
    total 40
    drwxr-xr-x. 2 hadoop hadoop  4096 Nov 16 22:59 bin
    drwxr-xr-x. 2 hadoop hadoop    87 Nov 15 15:44 conf
    drwxr-xr-x. 5 hadoop hadoop  4096 Oct  4 17:50 docs
    drwxrwxr-x. 2 hadoop hadoop  4096 Nov 15 15:07 lib
    -rw-r--r--. 1 hadoop hadoop 11358 Oct  4 17:50 LICENSE.txt
    drwxrwxr-x. 2 hadoop hadoop    46 Nov 15 15:41 logs
    -rw-r--r--. 1 hadoop hadoop  2084 Oct  4 17:50 NOTICE.txt
    -rw-r--r--. 1 hadoop hadoop  2335 Oct  4 17:50 README.md
    -rw-r--r--. 1 hadoop hadoop  3570 Oct  4 17:50 README_packaging.md
    drwxrwxr-x. 3 hadoop hadoop    63 Nov 28 00:33 zkdata
    drwxrwxr-x. 3 hadoop hadoop    23 Nov 15 15:41 zklog
    
  4. clientPort=2181:指定zookeeper对外的端口号,这里我们在独立部署zookeeper中使用的2181端口号

consumer.properties配置文件

        consumer.properties 文件是一个用于配置消费者客户端的属性文件。这个文件通常包含了消费者如何连接到 Kafka 集群、如何消费消息以及其它相关设置的信息 

  • bootstrap.servers: 这个参数指定了一个或多个 Kafka Broker 的地址列表(格式为:hostname:port),用逗号分隔。消费者将首先连接到这些服务器中的一个,并从中获取集群元数据,如 Topic 的分区信息和 Leader 服务器的位置等。然后,消费者会直接连接到负责其订阅 Topic 分区的 Leader 服务器来拉取消息。本文不采用

  • group.id: 这个参数指定了消费者的组 ID。同一组内的所有消费者共享订阅 Topic 的分区,以便实现负载均衡。每个分区只能被该组内的一个消费者实例消费。本文不采用

  • zookeeper.connect:用于指定zookeeper集群节点。hostname:port,hostname:port,hostname:port使用逗号隔开,列出zookeeper集群的所有节点。本文consumer.properties 配置文件仅采用此配置。

所有节点按照以下标准配置 

zookeeper.connect=vm02:2181,vm03:2181,vm04:2181

 

producer.properties配置文件 

       producer.properties 文件是一个用于配置生产者客户端的属性文件。

        学过kafka读者会知道metadata.broker.list参数,现在在kafka较新版本中已经使用bootstrap.servers参数进行替换。

        bootstrap.servers(旧版本metadata.broker.list):启动时producer查询borokers的列表,可以是集群中所有brokersl的一个子集。注意,这个参数只是用来获取topic的元信息用,producer会从元信息中挑选合适的broker并与之建立socket;连接。格式是:host1:port1,host2:port2。本文仅对该参数进行配置。所有节点均采用以下配置

bootstrap.servers=vm02:9092,vm03:9092,vm04:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none

server.properties配置文件

        server.properties 文件是一个用于配置 Kafka Broker 的属性文件。

以下讲解一下几个比较重要的参数。

  1. Broker ID (broker.id): 每个 Broker 必须有一个唯一的 ID。这个 ID 用于在 ZooKeeper 中注册 Broker。kafka集群中每一个节点的设置均不相同。本文采用(broker.id=2
                                        broker.id=3                                                                                                                           broker.id=4
    的值分别设置在vm02\vm03\vm04节点中去)

  2. Listeners (listeners): 指定 Broker 监听客户端连接的 IP 地址和端口。例如:PLAINTEXT://localhost:9092。本文采用(PLAINTEXT://vm02:9092                                       PLAINTEXT://vm03:9092
                             PLAINTEXT://vm04:9092。

    分别设置在vm02\vm03\vm04节点中去)

  3. ZooKeeper 连接 (zookeeper.connect): 指定 ZooKeeper 集群的地址列表(格式为:hostname:port/path),用逗号分隔。本文采用(zookeeper.connect=vm05:2181,vm06:2181,vm07:2181的值,所有节点同步)

  4. 日志目录 (log.dirs): 指定 Kafka 存储 Topic 分区日志的目录。可以指定多个路径以多副本日志模式增加kafka的可靠性。本文采用(
    log.dirs=/home/hadoop/zookeepr/zklog/kafka-logs的值,所有节点同步)

  5. 自动创建主题 (auto.create.topics.enable): 确保设置为 false,避免意外创建主题。本文采用(auto.create.topics.enable=false的值,所有节点同步)

  6. 日志保留策略 (log.retention.{ms,minutes,hours}):{ms,minutes,hours}是时间单位,据需求设置消息的保留时间或大小限制,建议保留七天。
    本文采用(log.retention.hours=128的值,所有节点同步)

  7. 自动领导选举 (unclean.leader.election.enable): 默认情况下,该参数的值为 true,这意味着当 ZooKeeper 发现一个 Broker 不可用时,它会允许从那些与 Leader 失去同步的副本(Out-of-Sync Replicas, OSRs)中选举新的 Leader。这种方式被称为 "不干净的 Leader 选举"(Unclean Leader Election)。这种选举方式的优点是速度快,因为它不需要等待所有副本都完成数据同步。然而,缺点是可能会导致数据丢失或不一致。建议为false。本文采用(unclean.leader.election.enable=false的值,所有节点同步)

        进入该文件中的添加参数,添加在文末尾,会覆盖前面的参数,其余的参数使用默认值。

##以下参数每一个节点值不能一样
broker.id=2
listeners=PLAINTEXT://vm02:9092
##以下参数所有节点配置均一致
zookeeper.connect=vm05:2181,vm06:2181,vm07:2181
log.dirs=/home/hadoop/zookeepr/zklog/kafka-logs
auto.create.topics.enable=false
log.retention.hours=128
unclean.leader.election.enable=false

环境变量配置 

        博主喜欢把环境变量全部配置在/etc/profile中,这样方便运维,一样望去可以知道服务器安装了哪些程序。然后在对应的用户(hadoop用户下).bash_profile用户环境变量文件中增加source /etc/profile    使得对应的用户能读到所有的环境变量,当然可以根据个人需求,在.bash_profile用户环境变量文件中只配置对应其用户开放的环境变量路径。        

#使用root用户编辑该文件
vim /etc/profile

增加以下变量(所有节点同步)

export KAFKA_HOME=/home/hadoop/kafka
export PATH=$KAFKA_HOME/bin:$PATH
export SERVER_PROPERTIES=$KAFKA_HOME/config/server.properties

记住:wq 退出保存后,需要使用source /etc/profile 加载一遍环境变量 

博主在该服务器上已经配置了一些其他程序,读者将就看吧

切换到hadoop用户下,编辑.bash_profile用户环境变量增加source /etc/profile 

启动Kafka集群

        启动zookeeper

#使用以下指令启动zookeeper(所有节点同步)
zkServer.sh start 

        启动Hadoop

#使用以下指令启动hadoop集群的hdfs\yarn\等服务(主节点vm02)
start-all start 

        启动hbase 

#使用以下指令启动hbase(hbase主节点vm02)
start-hbase.sh

        启动kafka

#使用以下指令启动kafka(所有节点同步)
kafka-server-start.sh -daemon $SERVER_PROPERTIES 

查看所有进程使用jps查看所有进程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/214871.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【拓展】Loguru:更为优雅、简洁的Python 日志管理模块

目录 一、简单介绍 二、安装与简单使用 ​三、常见用法 3.1 显示格式 3.2 写入文件 3.3 json日志 3.4 日志绕接 3.5 并发安全 四、高级用法 4.1 接管标准日志logging 4.2 输出日志到网络服务器 4.2.1 自定义日志服务器 ​4.2.2 第三方库日志服务器 4.3 与pytest结…

RT-Thread 汇编分析启动流程

文章目录 一、汇编指令二、启动文件三、流程图 一、汇编指令 这里介绍即几条最常见实用的汇编指令 LDR R0,[R1]:将R1指定内存地址数据,存储到寄存器R0中。STR R0,[R1,#4]:将寄存器R0中数据存储到寄存器R1加上偏移量4的位置。MOV r0,#0x01&a…

read()之后操作系统都干了什么

首先说明三个参数 file文件 buff从内存中开辟一段缓冲区用来接收读取的数据 size表示这个缓冲区的大小 有关file的参数: 状态:被打开 被关闭权限:可读可写最重要的是inode: 他包含了 文件的元数据(比如文件大小 文件类型 文件在访问前需要加…

Google Earth Engine谷歌地球引擎计算多年中某两个时间点之间遥感数据差值的平均值

本文介绍在谷歌地球引擎GEE中,提取、计算某一种遥感影像产品在连续的多年中,2个不同时相的数据差值的多年平均值,并将计算得到的这一景差值的结果图像导出的方法。 本文是谷歌地球引擎(Google Earth Engine,GEE&#x…

[二分查找]LeetCode2040:两个有序数组的第 K 小乘积

本文涉及的基础知识点 二分查找算法合集 题目 给你两个 从小到大排好序 且下标从 0 开始的整数数组 nums1 和 nums2 以及一个整数 k &#xff0c;请你返回第 k &#xff08;从 1 开始编号&#xff09;小的 nums1[i] * nums2[j] 的乘积&#xff0c;其中 0 < i < nums1.…

【Cell Signaling + 神经递质(neurotransmitter) ; 神经肽 】

Neuroscience EndocytosisExcitatory synapse pathwayGlutamatergic synapseInflammatory PainInhibitors of axonal regenerationNeurotrophin signaling pathwaySecreted Extracellular VesiclesSynaptic vesicle cycle

一个完整的手工构建的cuda动态链接库工程 03记

1&#xff0c; 源代码 仅仅是加入了模板函数和对应的 .cuh文件&#xff0c;当前的目录结构如下&#xff1a; icmm/gpu/add.cu #include <stdio.h> #include <cuda_runtime.h>#include "inc/add.cuh"// different name in this level for different type…

java企业财务管理系统springboot+jsp

1、基本内容 &#xff08;1&#xff09;搭建基础环境&#xff0c;下载JDK、开发工具eclipse/idea。 &#xff08;2&#xff09;通过HTML/CSS/JS搭建前端框架。 &#xff08;3&#xff09;下载MySql数据库&#xff0c;设计数据库表&#xff0c;用于存储系统数据。 &#xff08;4…

LongAddr

目录 1. 引言 2. AtomicInteger的局限性 3. AtomicInteger与LongAdder 的性能差异 4.LongAdder 的结构 LongAddr架构 Striped64中重要的属性 Striped64中一些变量或者方法的定义 Cell类 5. 分散热点的原理 具体流程图 6. 在实际项目中的应用 7. 总结 1. 引言 在这一…

【risc-v】易灵思efinix FPGA riscv 时钟配置的一些总结

系列文章目录 分享一些fpga内使用riscv软核的经验&#xff0c;共大家参考。后续内容比较多&#xff0c;会做成一个系列。 本系列会覆盖以下FPGA厂商 易灵思 efinix 赛灵思 xilinx 阿尔特拉 Altera 本文内容隶属于【易灵思efinix】系列。 文章目录 系列文章目录前言一、pan…

【算法】单调栈题单(矩阵系列、字典序最小、贡献法)⭐

文章目录 题单来源经典题单496. 下一个更大元素 I&#xff08;单调栈模板题&#xff09;503. 下一个更大元素 II&#xff08;单调栈循环数组&#xff09;2454. 下一个更大元素 IV&#xff08;第二个更大的元素&#xff1a;两个单调栈&#xff09;456. 132 模式&#xff08;单调…

java学习part19接口

113-面向对象(高级)-接口的使用_哔哩哔哩_bilibili 1.接口概念 个人认为是一种能力&#xff0c;某个类是否具有某种能力。一个类实现了一个接口就相当于学会了某些功能。 2.使用 接口里的属性都是全局常量public static final&#xff0c;即便不写也会自动加上。 3.多实现 4.接…

Python---函数递归---练习:斐波那契数列(本文以递归算法为主)

编程思想&#xff1a; 如何利用数学模型&#xff0c;来解决对应的需求问题&#xff1b;然后利用代码实现对应的数据模型。 算法&#xff1a;使用代码实现对应的数学模型&#xff0c;从而解决对应的业务问题 程序 算法 数据结构 在经常使用的算法中&#xff0c;有两种非常…

OGG实现Oracle19C到postgreSQL14的实时同步

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

如何从 Jira 成功迁移到极狐GitLab,看这个就够了!

内容来源&#xff1a;https://about.gitlab.com/blog 作者&#xff1a;Melissa Ushakov Atlassian 之前表示&#xff0c;到 2024 年 2 月会全面终止对于其服务器端产品的支持。 随着 Jira Server 的生命周期即将结束&#xff0c;众多组织都在考虑将其敏捷项目管理工具从Jira 迁…

51单片机应用从零开始(十)·指针

指针 C语言指针是一种保存变量地址的数据类型。它可以让程序直接访问内存中的数据&#xff0c;而不需要通过变量名来访问。指针变量存储的是一个地址&#xff0c;这个地址指向内存中的某个位置&#xff0c;该位置存储了一个值。 在C语言中&#xff0c;可以使用&运算符取得一…

网络安全现状

威胁不断演变&#xff1a; 攻击者不断变化和改进攻击方法&#xff0c;采用更复杂、更隐秘的技术&#xff0c;以逃避检测和追踪。这包括新型的勒索软件、零日漏洞利用和社交工程攻击等。 供应链攻击&#xff1a; 攻击者越来越关注供应链的弱点&#xff0c;通过在供应链中植入恶…

5_企业架构LNMP高可用负载均衡服务器

企业架构LNMP高可用负载均衡服务器之Nginx 学习目标和内容 1、能够描述负载均衡的作用 2、能够了解负载均衡常见实现方式 3、能够使用Nginx实现负载均衡 4、能够描述Nginx的常见负载均衡算法 一、背景描述及其方案设计 1、业务背景描述 时间&#xff1a;2011.6.-2013.9 发布产…

移动平均滤波的原理和C代码

移动平均滤波是一种简单有效的平滑信号的方法&#xff0c;它通过计算一系列数据点的平均值来减小信号中的波动。基本的移动平均滤波方法有两种&#xff1a;简单移动平均&#xff08;SMA&#xff09;和指数加权移动平均&#xff08;EWMA&#xff09;。 简单移动平均滤波&#xf…

Stream API 方法使用总结

文章目录 1.1、Stream介绍1.2、Stream创建对象&#xff08;1&#xff09;empty()方法&#xff08;2&#xff09;of()方法&#xff08;3&#xff09;Arrays.stream()方法&#xff08;4&#xff09;list.stream()方法 1.3、Stream中间方法&#xff08;1&#xff09;filter()方法&…