Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统

  • 一、Kafka 概述
    • 1.1 Kafka 定义
    • 1.2 Kafka 设计目标
    • 1.3 Kafka 特点
  • 二、Kafka 架构设计
    • 2.1 基本架构
    • 2.2 Topic 和 Partition
    • 2.3 消费者和消费者组
    • 2.4 Replica 副本
  • 三、Kafka 分布式集群搭建
    • 3.1 下载解压
      • 3.1.1 上传解压
    • 3.2 修改 Kafka 配置文件
      • 3.2.1 修改zookeeper.properties配置文件
      • 3.2.2 修改consumer.properties配置文件
      • 3.2.3 修改producer.properties配置
      • 3.2.4 修改server.properties配置
    • 3.3 修改 Kafka 配置同步到其他节点
    • 3.4 修改 Kafka Server 编号
    • 3.5 启动Kafka 集群
    • 3.5.1 启动Zookeeper集群
    • 3.5.1 启动 Kafka 集群
    • 3.6 Kafka 集群测试
      • 3.6.1 创建Topic
      • 3.6.2 查看Topic列表
      • 3.6.2 查看Topic详情
      • 3.6.3 消费者消费Topic
      • 3.6.4 生产者向Topic发送消息
  • 四、案例实践:Flume 与 Kafka 集成开发
    • 4.1 配置Flume聚合服务
    • 4.2 Flume与Kafka集成测试
      • 4.2.1 启动Flume聚合服务
      • 4.2.2 启动 Flume 采集服务
      • 4.2.3 启动 Kafka 消费者服务
      • 4.2.4 准备测试数据

一、Kafka 概述

1.1 Kafka 定义

Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala语言编写,它以可水平扩展和高吞吐率的特点而被广泛使用。目前越来越多的开源分布式处理系统,如Spark、Flink都支持与Kafka集成。比如一个实时日志分析系统,Flume采集数据通过接口传输到Kafka集群(多台Kafka服务器组成的集群称为Kafka集群),然后Flink或者Spark直接调用接口从Kafka实时读取数据并进行统计分析。

1.2 Kafka 设计目标

  • 以时间复杂度为O(1)的方式提供消息持久化(Kafka)能力,即使对TB级以上数据也能保证常数时间的访问性能。持久化是将程序数据在持久状态和瞬时状态间转换的机制。通俗地讲,就是瞬时数据(比如内存中的数据是不能永久保存的)持久化为持久数据(比如持久化至磁盘中能够长久保存)。
  • 保证高吞吐率,即使在非常廉价的商用机器上,也能做到单机支持每秒100,000条消息的传输速度。
  • 支持Kafka Server间的消息分区,以及分布式消息消费,同时保证每个Partition内的消息顺序传输。
  • 支持离线数据处理和实时数据处理。

1.3 Kafka 特点

  • 高吞吐量、低延迟:Kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。
  • 可扩展性:Kafka集群同Hadoop集群一样,支持横向扩展。
  • 持久性、可靠性:Kafka消息可以被持久化到本地磁盘,并且支持Partition数据备份,防止数据丢失。
  • 容错性:允许Kafka集群中的节点失败,如果Partition(分区)副本数量为n,则最多允许n-1个节点失败。
  • 高并发:单节点支持上千个客户端同时读写,每秒钟有上百MB的吞吐量,基本上达到了网卡的极限。

二、Kafka 架构设计

2.1 基本架构

在这里插入图片描述
生产者将数据写入 Kafka,消费者从 Kafka 中读取数据,Zookeeper 提供协调服务,如生产者和消费者的负载均衡

2.2 Topic 和 Partition

在这里插入图片描述
生产者将数据写入主题,实际写入分区(轮询,随机等),一个分区只能对应一个消费者组中的一个消费组,而一个消费者可以对应多个分区。

2.3 消费者和消费者组

在这里插入图片描述
一个分区只能对应一个消费者组中的一个消费者,消费者组相互独立,一个分区可以对应多个不同消费者组中的消费者,一个消费者可以对应多个分区。

2.4 Replica 副本

  • Leader:每个Replica集合中的分区都会选出一个唯一的Leader,所有的读写请求都由Leader处理,其他副本从Leader处把数据更新同步到本地。

  • Follower:是副本中的另外一个角色,可以从Leader中复制数据。

  • ISR:Kafka集群通过数据冗余来实现容错。每个分区都会有一个Leader,以及零个或多个Follower,Leader加上Follower总和就是副本因子。Follower与Leader之间的数据同步是通过Follower主动拉取Leader上面的消息来实现的。所有的Follower不可能与Leader中的数据一直保持同步,那么与Leader数据保持同步的这些Follower称为IS(In Sync Replica)。Zookeeper维护着每个分区的Leader信息和ISR信息。

三、Kafka 分布式集群搭建

3.1 下载解压

下载地址:https://archive.apache.org/dist/kafka/

此处使用的下载的版本式:kafka_2.12_2.8.2.tgz

3.1.1 上传解压

[root@hadoop1 local]# tar -zxvf kafka_2.12-2.8.2.tgz 

添加软连接

[root@hadoop1 local]# ln -s kafka_2.12-2.8.2 kafka

在这里插入图片描述

3.2 修改 Kafka 配置文件

3.2.1 修改zookeeper.properties配置文件

进入Kafka的config目录下,修改zookeeper. properties配置文件,具体内容如下:

[root@hadoop1 local]# vim /usr/local/kafka/config/zookeeper.properties 

修改如下内容:

dataDir=/usr/local/data/zookeeper/zkdata
clientPort=2181

3.2.2 修改consumer.properties配置文件

进入Kafka的config目录下,修改consumer. properties配置文件,具体内容如下:

[root@hadoop1 local]# vim /usr/local/kafka/config/consumer.properties

修改如下内容:

bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092

备注:hadoop1:9092,hadoop2:9092,hadoop3:9092 为集群hadoop地址

3.2.3 修改producer.properties配置

进入Kafka的config目录中,修改producer. properties配置文件,具体内容如下:

[root@hadoop1 local]# vim /usr/local/kafka/config/producer.properties 

修改内容如下:

bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092

3.2.4 修改server.properties配置

进入Kafka的config目录下,修改server. properties配置文件,具体内容如下:

[root@hadoop1 local]# vim /usr/local/kafka/config/server.properties 

修改内容如下:

zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181

3.3 修改 Kafka 配置同步到其他节点

将hadoop1节点中配置好的Kafka安装目录分发给hadoop2和hadoop3节点,具体操作如下所示:

[root@hadoop1 local]# deploy.sh /usr/local/kafka_2.12-2.8.2 /usr/local/ slave

给从节点创建软链接:

[root@hadoop1 local]# runRemoteCmd.sh "ln -s /usr/local/kafka_2.12-2.8.2 /usr/local/kafka" slave

备注:deploy.sh 是集群推送脚本,可以参考《ZooKeeper 集群的详细部署》

3.4 修改 Kafka Server 编号

登录hadoop1、hadoop2和hadoop3节点,分别进入Kafka的config目录下,修改server.properties配置文件中的broker.id项,具体操作如下所示:
[root@hadoop1 local]# vim /usr/local/kafka/config/server.properties
#标识hadoop1节点
broker.id=1
[root@hadoop2 local]# vim /usr/local/kafka/config/server.properties
#标识hadoop2节点
broker.id=2
[root@hadoop3 local]# vim /usr/local/kafka/config/server.properties
#标识hadoop3节点
broker.id=3

3.5 启动Kafka 集群

Zookeeper管理着Kafka Broker集群,同时Kafka将元数据信息保存在Zookeeper中,说明Kafka集群依赖Zookeeper提供协调服务,所以需要先启动Zookeeper集群,然后再启动Kafka集群。

3.5.1 启动Zookeeper集群

在集群各个节点中进入Zookeeper安装目录,使用如下命令启动Zookeeper集群。

# 启动集群
[root@hadoop1 local]# runRemoteCmd.sh "/usr/local/zookeeper/bin/zkServer.sh start" all
# 查看zookeeper 集群状态
[root@hadoop1 local]# runRemoteCmd.sh "/usr/local/zookeeper/bin/zkServer.sh status" all

在这里插入图片描述

3.5.1 启动 Kafka 集群

在集群各个节点中进入Kafka安装目录,使用如下命令启动Kafka集群。

[root@hadoop1 local]# runRemoteCmd.sh "/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties" all

在这里插入图片描述
显示 Kafka 已经启动。

3.6 Kafka 集群测试

Kafka自带有很多种Shell脚本供用户使用,包含生产消息、消费消息、Topic管理等功能。接下来利用Kafka Shell脚本测试使用Kafka集群。

3.6.1 创建Topic

使用Kafka的bin目录下的kafka-topics.sh脚本,通过create命令创建名为test的Topic,具体操作如下所示。

[root@hadoop1 local]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3

上述命令中,–zookeeper 指定 Zookeeper 集群;–create 是创建 Topic 命令;–topic指定Topic名称;–replication-factor 指定副本数量;–partitions指定分区个数。

在这里插入图片描述

3.6.2 查看Topic列表

通过list命令可以查看Kafka 的Topic列表,具体操作如下所示。

[root@hadoop1 kafka]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper hadoop1:2181  --list

在这里插入图片描述

3.6.2 查看Topic详情

通过describe命令查看Topic内部结构,具体操作如下所示。

[root@hadoop1 kafka]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper hadoop1:2181 --describe --topic test

在这里插入图片描述

3.6.3 消费者消费Topic

在hadoop1节点上,通过Kafka自带的kafka-console-consumer.sh脚本,开启消费者消费 test中的消息。

[root@hadoop1 kafka]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic test

在这里插入图片描述

3.6.4 生产者向Topic发送消息

在hadoop1节点上,通过Kafka自带的kafka-console-producer.sh脚本启动生产者,然后向 test发送3条消息,具体操作如下所示。

[root@hadoop1 logs]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list  hadoop1:9092 --topic test

生成者输入:
在这里插入图片描述
消费者展示:
在这里插入图片描述

四、案例实践:Flume 与 Kafka 集成开发

在 《Flume 日志采集系统》 的基础上进行 kafka 集成开发

4.1 配置Flume聚合服务

在 hadoop2 和 hadoop3 服务器配置分配配置 Flume 聚合服务

[root@hadoop1 conf]# vim /usr/local/flume/conf/avro-file-selector-kafka.properties
[root@hadoop2 conf]# vim /usr/local/flume/conf/avro-file-selector-kafka.properties

分别写入如下内容并保存:

#定义source、channel、sink的名称
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
# 定义和配置一个avro Source
agent1.sources.r1.type = avro
agent1.sources.r1.channels = c1
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 1234
# 定义和配置一个file channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /usr/local/data/flume/checkpointDir
agent1.channels.c1.dataDirs = /usr/local/data/flume/dataDirs
# 定义和配置一个kafka sink
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = test
agent1.sinks.k1.brokerList = hadoop1:9092,hadoop2:9092,hadoop3:9092
agent1.sinks.k1.producer.acks = 1
agent1.sinks.k1.channel = c1

4.2 Flume与Kafka集成测试

4.2.1 启动Flume聚合服务

在 采集服务器 hadoop2 和 hadoop3 分别启动聚合服务

[root@hadoop2 conf]# /usr/local/flume/bin/flume-ng agent -n agent1 -c conf -f /usr/local/flume/conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console

[root@hadoop3 local]# /usr/local/flume/bin/flume-ng agent -n agent1 -c conf -f /usr/local/flume/conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console

在这里插入图片描述

4.2.2 启动 Flume 采集服务

在 Hadoop1 启动 Flume 采集脚本:

[root@hadoop1 conf]# /usr/local/flume/bin/flume-ng agent -n agent1 -c conf -f /usr/local/flume/conf/taildir-file-selector-avro.properties -Dflume.root.logger=INFO,console

在这里插入图片描述
正常启动 Flume 采集脚本

4.2.3 启动 Kafka 消费者服务

在 hadoop1 启动 Kafka 消费者服务脚本

[root@hadoop1 data]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic test

在这里插入图片描述

4.2.4 准备测试数据

在 hadoop1 另开连接,执行如下脚本:

[root@hadoop1 logs]# echo '00:00:100971413028304674[火炬传递路线时间]1 2www.olympic.cn/news/beijing/2008-03-19/1417291.html' >> /usr/local/data/flume/logs/sogou.log

输入三条测试数据
在这里插入图片描述

消费者打印三条测试数据:
在这里插入图片描述
至此,案例测试成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/873527.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[网络原理]关于网络的基本概念 及 协议

文章目录 一. 关于网络的概念介绍1. 局域⽹LAN2. ⼴域⽹WAN3. 主机4. 路由器5. 交换机IP地址端口号 二. 协议协议分层TCP/IP五层模型(或四层)OSI七层模型封装分用 一. 关于网络的概念介绍 1. 局域⽹LAN 局域⽹,即 Local Area Network,简称LAN。 Local …

NGINX开启HTTP3,给web应用提个速

环境说明 linuxdockernginx版本:1.27 HTTP3/QUIC介绍 HTTP3是由IETF于2022年发布的一个标准,文档地址为:https://datatracker.ietf.org/doc/html/rfc9114 如rfc9114所述,http3主要基于QUIC协议实现,在具备高性能的同时又兼备了…

TCP远程命令执行

目录 一. 命令集 二. 命令执行模块实现 三. 服务端模块实现 四. 服务端调用模块实现 五. 客户端模块实现 六. 效果展示 此篇教大家如何利用TCP进行远程命令执行。 一. 命令集 将值得信任的命令放进一个txt文件中,执行命令时,就去这…

elementUI根据列表id进行列合并@莫成尘

本文章提供了elementUI根据列表id进行列合并的demo&#xff0c;效果如图&#xff08;可直接复制代码粘贴&#xff09; <template><div id"app"><el-table border :data"tableList" style"width: 100%" :span-method"objectS…

【STM32】SPI通信-软件与硬件读写SPI

SPI通信-软件与硬件读写SPI 软件SPI一、SPI通信协议1、SPI通信2、硬件电路3、移位示意图4、SPI时序基本单元(1)开始通信和结束通信(2)模式0---用的最多(3)模式1(4)模式2(5)模式3 5、SPI时序(1)写使能(2)指定地址写(3)指定地址读 二、W25Q64模块介绍&#xff11;、W25Q64简介&am…

小怡分享之数据结构LinkedList与链表

前言&#xff1a; &#x1f308;✨前面小怡给大家介绍了ArrayList&#xff0c;今天小怡给大家介绍一下链表。 1.ArrayList的缺陷 当在ArrayList任意位置插入或者删除元素时&#xff0c;就需要后续元素整体往前或者往后搬移&#xff0c;时间复杂度为O&#xff08;n&#xff09;…

网恋照妖镜源码搭建教程

文章目录 前言原理创建网站1.打开网站设置 配置ssl2.要打开强制HTTPS&#xff0c;用宝塔免费的ssl证书即可&#xff0c;也可以使用其他证书&#xff0c;必须是与域名匹配的3.上传文件至根目录进行解压4.解压后&#xff0c;修改文件 sc.php 里面的内容5.其余探索 结语 前言 前俩…

Excel和Word日常使用记录:

Excel使用总结 表格颜色填充&#xff1a; 合并单元格&#xff1a; 选中你要合并的单元格区域。 按下快捷键 Alt H&#xff0c;然后松开这些键。 再按下 M&#xff0c;接着按 C。 这个组合键执行的操作是&#xff1a;Alt H&#xff1a;打开“主页”选项卡。 M&#xff1a;选…

“阡陌云旅”黄河九省文化旅游平台

“阡陌云旅”黄河九省文化旅游平台 GitHub地址&#xff1a;https://github.com/guoJiaQi-123/Yellow-River-Cloud-Journey 项目背景 “阡陌云旅”黄河九省文化旅游平台 “阡陌云旅” 黄河九省文化旅游平台是一个专注于黄河流域九省文化旅游资源整合与推广的项目。 黄河是中…

[oeasy]python0004_游乐场_和python一起玩耍_python解释器_数学运算

和python玩耍 &#x1f94a; Python 回忆 上次 了解shell环境中的命令 <colgroup><col span"1"><col span"1"></colgroup> | 命令 | 作用 | | whoami | 显示当前用户名 | | pwd | 显示当前文件夹 | | ls | 列出当前文件夹下的内容…

51单片机的无线病床呼叫系统【proteus仿真+程序+报告+原理图+演示视频】

1、主要功能 该系统由AT89C51/STC89C52单片机LCD1602显示模块温湿度传感器模块矩阵按键时钟模块等模块构成。适用于病床呼叫系统、16床位呼叫等相似项目。 可实现基本功能: 1、LCD1602实时显示北京时间、温湿度信息、呼叫床位等信息&#xff1b; 2、DHT11采集病房温湿度信息&…

WSL 下的 CentOS 装 Docker

WSL 下的 CentOS 装 Docker 卸载旧版本安装前的准备工作1. 安装 yum-utils2. 添加阿里云的 yum 镜像仓库3. 快速生成 Yum 缓存 安装Docker启动docker运行 hello-world设置镜像加速器&#xff08;阿里云的&#xff09;卸载 Docker 引擎参考资料 卸载旧版本 sudo yum remove doc…

鸿蒙(API 12 Beta6版)超帧功能开发【ABR功能开发】

业务流程 基于相机运动感知策略的ABR主要业务流程如下&#xff1a; 用户进入ABR适用的游戏场景。游戏应用调用[HMS_ABR_CreateContext]接口并指定图形API类型&#xff0c;创建ABR上下文实例。游戏应用调用[HMS_ABR_SetTargetFps]接口初始化ABR实例&#xff0c;配置目标帧率属性…

excel透视图、看板案例(超详细)

一、简介 Excel透视图&#xff08;Pivot Table&#xff09; 功能&#xff1a;透视图是一种强大的数据分析工具&#xff0c;用于汇总、分析和展示数据。它允许用户对数据进行重新排列和分类&#xff0c;从而更容易发现数据中的模式和趋势。用途&#xff1a;可以用来生成动态报表…

【Python机器学习】词向量推理——词向量

目录 面向向量的推理 使用词向量的更多原因 如何计算Word2vec表示 skip-gram方法 什么是softmax 神经网络如何学习向量表示 用线性代数检索词向量 连续词袋方法 skip-gram和CBOW&#xff1a;什么时候用哪种方法 word2vec计算技巧 高频2-gram 高频词条降采样 负采样…

fastadmin 文件上传七牛云

1-安装七牛云官方SDK composer require qiniu/php-sdk 2-七牛云配置 <?phpnamespace app\common\controller;use Qiniu\Storage\BucketManager; use think\Config; use Qiniu\Auth; use Qiniu\Storage\UploadManager; use think\Controller; use think\Db;/*** 七牛基类*…

echarts 实现签到记录日历组件

以下笔记来源&#xff1a;编程导航 分析 有三种基本图表可以选择&#xff1a; 基础日历图&#xff1a;https://echarts.apache.org/examples/zh/editor.html?ccalendar-simple日历热力图&#xff1a;https://echarts.apache.org/examples/zh/editor.html?ccalendar-heatmap…

使用paddlerocr识别固定颜色验证码

1 引言 本文使用opencv和paddlerocr识别出固定颜色的验证码&#xff0c;原理不解释&#xff0c;安装包的方法自行查找&#xff0c;只提供代码和思路。 1 使用opencv对特定颜色区域进行提取2 使用paddlerocr识别并输出验证码 2 代码 2.1 读取图片&#xff0c;提取蓝色区域 …

C语言深入理解指针4

1.回调函数 回调函数是通过函数指针调用的函数 将函数指针作为参数传递给另一个函数&#xff0c;当这个函数指针被用来调用其所指向的函数时&#xff0c;被调用的函数就是回调函数&#xff0c;回调函数不是应该由该函数的实现方直接调用&#xff0c;而是在特定的事件或条件发生…

ASIO网络调试助手之一:简介

多年前&#xff0c;写过几篇《Boost.Asio C网络编程》的学习文章&#xff0c;一直没机会实践。最近项目中用到了Asio&#xff0c;于是抽空写了个网络调试助手。 开发环境&#xff1a; Win10 Qt5.12.6 Asio(standalone) spdlog 支持协议&#xff1a; UDP TCP Client TCP Ser…