Flume学习笔记(2)—— Flume进阶

Flume进阶

Flume 事务

事务处理流程如下:

Put

  • doPut:将批数据先写入临时缓冲区putList
  • doCommit:检查channel内存队列是否足够合并。
  • doRollback:channel内存队列空间不足,回滚数据

Take

  • doTake:将数据取到临时缓冲区takeList,并将数据发送到HDFS
  • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列

Flume Agent 内部原理

ChannelSelector

ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel

其共有两种类型,分别是 Replicating(复制)和 Multiplexing(多路复用)

  • ReplicatingSelector 会将同一个 Event 发往所有的 Channel
  • Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel

SinkProcessor

SinkProcessor 共有三种类型 , 分别是 DefaultSinkProcessor 、LoadBalancingSinkProcessor、FailoverSinkProcessor

  • DefaultSinkProcessor 对应的是单个的Sink
  • LoadBalancingSinkProcessor 可以实现负载均衡的功能
  • FailoverSinkProcessor 可以实现错误恢复的功能

Flume 拓扑结构

简单串联

将多个 flume 顺序连接起来,从最初的 source 开始到最终 sink 传送的目的存储系统

不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统

复制和多路复用

(单 source,多 channel、sink)

Flume 支持将事件流向一个或者多个目的地

这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地

负载均衡和故障转移

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能、

这里的agent1有三个sink,分别连接agent2,agent3,agent4,即使其中有的sink出现了故障,数据还是能同步到hdfs

聚合

业务中常用,比如说日志采集功能:

日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器,产生的日志处理起来也非常麻烦

可以采用聚合的方式,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析

Flume实战案例

复制和多路复用

需求:使用 Flume-1 监控文件变动

  1. Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS
  2. Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem

实现流程:
1.在job下创建文件夹group1,并在其中创建配置文件flume-file-flume.conf

配置文件中需要有1个source,2个channel,2个sink

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/apache-hive-3.1.2-bin/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

该配置文件的作用是将数据发送到两个不同的sink,再由sink发送到其他的agent进行处理

2.创建配置文件flume-flume-hdfs.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

source绑定上一个agent的sink1,然后上传到hdfs

3.创建配置文件:flume-flume-dir.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /home/why/data/flumeDemo/test1

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

参数说明:

sink类型为file_roll:Flume 1.11.0 User Guide — Apache Flume

可以将events保存到本地文件系统

  • directory:本地文件系统保存数据的路径(注意,该路径必须已经存在才可以)

4.分别启动相应的flume进程:

nohup bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf &

nohup bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf &

nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf &
5.在hdfs和文件夹中都能看到相应的内容:

hdfs:

文件系统:

负载均衡和故障转移

需求:使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能

实现流程:

1.在/opt/module/flume/job 目录下创建 group2 文件夹,创建配置文件flume-netcat-flume.conf

配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444


# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

参数说明:Flume 1.11.0 User Guide — Apache Flume

通过sink groups在一个agent中定义多个sink,并可以配置sink processor使用:Flume 1.11.0 User Guide — Apache Flume

2.创建 flume-flume-console1.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

sink输出到本地的控制台

3.创建 flume-flume-console2.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

sink输出到本地的控制台

4.执行指令:

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

5.使用nc localhost 44444发送数据

由于console2设置的优先级高于console1,因此数据由console2接收到;

接下来将console2进程kill掉,数据就由console1接收了:

聚合

需求:

hadoop102 上的 Flume-1 监控文件/home/why/data/flumeDemo/test3/test3.log

hadoop103 上的 Flume-2 监控某一个端口的数据流

Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台

实现流程:

1.首先在三台服务器的job文件夹先创建目录group3

2.在hadoop102上,创建配置文件flume1-logger-flume.conf,source用于监控log日志文件,sink用于输出数据到下一级的Flume

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/why/data/flumeDemo/test3/test3.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.在hadoop103上,创建配置文件flume2-netcat-flume.conf,source用于监控端口44444的数据流,sink用于将数据传输到下一级的flume

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

注意,这两个agent的sink目的地都是hadoop104这一个服务器,因此hostname和port都相同

4.在hadoop104上创建配置文件flume3-flume-logger.conf,source用于接收flume1和flume2发送来的数据流,sink用于输出数据到控制台;

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

5.分别在三台服务器上执行指令

hadoop104: bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

hadoop102:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf

hadoop103:bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf

6.在hadoop102上向日志文件中追加内容:

echo "hello" > /home/why/data/flumeDemo/test3/test3.log

在hadoop103中通过nc hadoop103 44444向44444端口发送数据;

然后在hadoop104中即可接收到数据:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/159812.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

笔记54:门控循环单元 GRU

本地笔记地址:D:\work_file\DeepLearning_Learning\03_个人笔记\3.循环神经网络\第9章:动手学深度学习~现代循环神经网络 a a a a a a a

802.1Qbb

[TOC] 802.1Qbb 802.1Qbb是什么? 802.1Qbb(基于优先级的流控制,PFC)是以太网数据中心中一项重要的标准,用于提供无丢包的网络环境。这项标准是IEEE 802.1Q标准的一部分,旨在解决以太网数据中心网络中的拥…

基于共生生物算法优化概率神经网络PNN的分类预测 - 附代码

基于共生生物算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于共生生物算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于共生生物优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要:针对PNN神…

C++语言的由来与发展历程

C语言的由来与发展历程可以追溯到1978年,当时美国电话电报公司(AT&T)的贝尔实验室发明了C语言,以满足UNIX操作系统的开发需求。在C语言的基础上,Bjarne Stroustrup于1983年创立了C编程语言,作为C语言的…

Linux 安装多版本 JDK 详细过程

背景说明 服务器已安装jdk1.8,但随着spring全家桶的升级换代,已不满足使用,先要用高版本jdk,暂时不想卸载旧的版本,故安装两个版本,jdk1.8和jdk17,jdk1.8的已经安装过了,所以此次只安装jdk17,以及配置jdk切…

homeassiant主题

下载主题 https://github.com/maartenpaauw/home-assistant-community-themes.git 使用file editor到homeassiant路径下,新建文件夹themes文件夹,用terminal新建也可以。 使用file editor上传文件 使用Terminal解压 mkdir themes unzip home-assistan…

振弦传感器表面钢筋计与振弦采集仪形成岩土工程监测的案例

振弦传感器表面钢筋计与振弦采集仪形成岩土工程监测的案例 振弦传感器和表面钢筋计是岩土工程监测中常用的仪器设备,可用于测量结构物的振动和变形情况,以及土体的变形和应力状态等。 以下是一个振弦传感器和表面钢筋计结合使用的案例: 在一…

【Python入门五】第三方库(包)介绍

Python第三方库/包介绍 前言安装方法 2 数据分析和处理netCDF4numpyxarray 3参考 前言 Python 的库分为2类。 标准库:不需要安装,需要导入。第三库:需要安装、需要导入。 Python的标准库中提供了许多有用的模块和功能,如字符串…

OpenCV技术应用(4)— 如何改变图像的透明度

前言:Hello大家好,我是小哥谈。本节课就手把手教你如何改变图像的透明度,希望大家学习之后能够有所收获~!🌈 目录 🚀1.技术介绍 🚀2.实现代码 🚀1.技术介绍 改变图像透明度的实…

Flink(六)【DataFrame 转换算子(下)】

前言 今天学习剩下的转换算子。 1、物理分区算子 常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast),下边我们分别来做…

【TEC100TAI-KIT】青翼科技基于复微青龙JFMQL100TAI的全国产化智能异构计算平台

板卡概述 TEC100TAI-KIT是我司自主研制的一款基于上海复旦微电子复微青龙100TAI的全国产智能异构计算平台开发套件,该套件包含1个复微青龙100TAI核心板和1个PCIE规格的扩展底板。 该套件的核心板集成了100TAI的最小系统,包含一颗JFMQL100TAI900片上系统…

《网络协议》08. 概念补充

title: 《网络协议》08. 概念补充 date: 2022-10-06 18:33:04 updated: 2023-11-17 10:35:52 categories: 学习记录:网络协议 excerpt: 代理、VPN、CDN、网络爬虫、无线网络、缓存、Cookie & Session、RESTful。 comments: false tags: top_image: /images/back…

网络规模与性能优化的一篇随笔

本周写篇轻松的话题,注意信息传输的尺度和缩放比例,写篇随笔。 控制面和数据面随规模缩放的影响,举几个例子就能说明白。 CSMA/CD,控制面和数据面在一起,控制信息交互时延和数据面时延在同一尺度时,就到了…

视频制作技巧:添加srt字幕,批量剪辑,省时省力

随着社交媒体的兴起,视频制作越来越成为人们表达自我、分享经验的重要方式。然而,视频制作需要耗费大量的时间和精力。在视频制作中,字幕是非常重要的元素,可以帮助观众更好地理解视频内容。而SRT字幕则是一种更为先进的字幕技术&…

计算机毕业设计选题推荐-高校后勤报修微信小程序/安卓APP-项目实战

✨作者主页:IT研究室✨ 个人简介:曾从事计算机专业培训教学,擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Python…

分发糖果(贪心算法)

题目描述 n 个孩子站成一排。给你一个整数数组 ratings 表示每个孩子的评分。 你需要按照以下要求,给这些孩子分发糖果: 每个孩子至少分配到 1 个糖果。相邻两个孩子评分更高的孩子会获得更多的糖果。 请你给每个孩子分发糖果,计算并返回…

利用NVIDIA DALI读取视频帧

1. NVIDIA DALI简介 NVIDIA DALI全称是NVIDIA Data Loading Library,是一个用GPU加速的数据加载和预处理库,可用于图像、视频和语音数据的加载和处理,从而为深度学习的训练和推理加速。 NVIDIA DALI库的出发点是,深度学习应用中…

网络基础(一)

文章目录: 计算机网络认识计算机网络背景网络发展认识 “协议” 网络协议初识协议分层OSI七层模型TC/IP 五层(或四层)模型 网络传输基本流程网络传输流程图同局域网的两台主机进行通信跨网络的两台主机进行通信数据包的封装和分用 网络中的地…

本周Github有趣项目:draw-a-ui等

有趣的项目、工具和库 gpt-crawler 抓取网站以生成知识文件,从而从 URL 创建您自己的自定义 GPT。 需要步骤: 配置运行爬虫、 将您的数据上传到 OpenAI:使用此选项通过 UI 访问您生成的知识,您可以轻松与他人共享 创建自定义助…

AR眼镜_单目光波导VS双目光波导方案

双目光波导AR眼镜方案是一种创新的智能设备,可以在现实场景中叠加虚拟信息,提供增强的视觉体验和交互体验。光学显示方案是AR眼镜的核心技术之一,它对眼镜的性能和使用体验起着决定性的作用。 相比于单目AR眼镜,双目AR眼镜具有更好…