大数据Flume--入门

文章目录

  • Flume
    • Flume 定义
    • Flume 基础架构
      • Agent
      • Source
      • Sink
      • Channel
      • Event
    • Flume 安装部署
      • 安装地址
      • 安装部署
    • Flume 入门案例
      • 监控端口数据官方案例
      • 实时监控单个追加文件
      • 实时监控目录下多个新文件
      • 实时监控目录下的多个追加文件

Flume

Flume 定义

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。
为什么选择Flume

Flume 基础架构

在这里插入图片描述

Agent

Agent 是一个JVM进程,它以事件的形式将数据从源头送至目的。

Agent 主要有3个部分组成,Source、Channel、Sink

Source

Source 是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种
格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy。

Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。

Sink 组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

Channel

Channel 是位于Source 和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel 是线程安全的,可以同时处理几个Source 的写入操作和几个Sink 的读取操作。

Flume 自带两种Channel:Memory ChannelFile Channel

Memory Channel 是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适
用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。

File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数
据。

Event

传输单元,Flume 数据传输的基本单元,以Event 的形式将数据从源头送至目的地。Event 由Header Body 两部分组成,Header用来存放该event的一些属性,为K-V结构,Body 用来存放该条数据,形式为字节数组。

Flume 安装部署

安装地址

(1)Flume 官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume
(4)Flume tar包
链接:https://pan.baidu.com/s/1O_CEiuHafNyuWSsrtZaydg?pwd=kw9k
提取码:kw9k

安装部署

(1)将apache-flume-1.9.0-bin.tar.gz 上传到 linux 的/opt/software 目录下

(2)解压apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下

[yudan@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/ 

(3)修改apache-flume-1.9.0-bin 的名称为flume

[yudan@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume 

(4)将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

[yudan@hadoop102 lib]$  rm /opt/module/flume/lib/guava-11.0.2.jar 

Flume 入门案例

监控端口数据官方案例

1)案例需求:

使用Flume监听一个端口,收集该端口数据,并打印到控制台。

2)需求分析:
在这里插入图片描述
3)实现步骤:

(1)安装netcat工具

[yudan@hadoop102 software]$ sudo yum install -y nc

(2)判断44444端口是否被占用

[yudan@hadoop102 flume-telnet]$ sudo netstat -nlp | grep 44444

(3)创建Flume Agent配置文件flume-netcat-logger.conf

(4)在flume目录下创建job文件夹并进入job文件夹。

[yudan@hadoop102 flume]$ mkdir job 
[yudan@hadoop102 flume]$ cd job/ 

(5)在job文件夹下创建Flume Agent配置文件flume-netcat-logger.conf

[yudan@hadoop102 job]$ vim flume-netcat-logger.conf

(6)在flume-netcat-logger.conf 文件中添加如下内容。

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 

# Describe the sink 
a1.sinks.k1.type = logger 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

配置文件解析
(7)先开启flume监听端口

  • 第一种写法:

      [yudan@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console 
    
  • 第二种写法:

      [yudan@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console 
    
  • 参数说明:

    • –conf/-c:表示配置文件存储在conf/目录
    • –name/-n:表示给agent起名为a1
    • –conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf
      文件。
    • -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error。

(8)使用netcat工具向本机的44444端口发送内容

[yudan@hadoop102 ~]$ nc localhost 44444 
hello  
yudan 

(9)在Flume监听页面观察接收数据情况

实时监控单个追加文件

1)案例需求:实时监控Hive日志,并上传到HDFS中

2)需求分析:
实时读取本地文件到HDFS案例
3)实现步骤:

(1)Flume 要想将数据输出到HDFS,依赖Hadoop相关jar包

检查/etc/profile.d/my_env.sh 文件,确认 Hadoop和 Java 环境变量配置正确

JAVA_HOME=/opt/module/jdk1.8.0_212 
HADOOP_HOME=/opt/module/hadoop-3.1.3 
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin 
export PATH JAVA_HOME HADOOP_HOME

(2)创建flume-file-hdfs.conf 文件

[yudan@hadoop102 job]$ vim flume-file-hdfs.conf

注:要想读取Linux系统中的文件,就得按照Linux命令的规则执行命令。由于Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行Linux 命令来读取文件。

# Name the components on this agent 
a2.sources = r2 
a2.sinks = k2 
a2.channels = c2 
 
# Describe/configure the source 
a2.sources.r2.type = exec 
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log 
 
# Describe the sink 
a2.sinks.k2.type = hdfs 
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H 
#上传文件的前缀 
a2.sinks.k2.hdfs.filePrefix = logs- 
#是否按照时间滚动文件夹 
a2.sinks.k2.hdfs.round = true 
#多少时间单位创建一个新的文件夹 
a2.sinks.k2.hdfs.roundValue = 1 
#重新定义时间单位 
a2.sinks.k2.hdfs.roundUnit = hour 
#是否使用本地时间戳 
a2.sinks.k2.hdfs.useLocalTimeStamp = true 
#积攒多少个Event才flush到HDFS一次 
a2.sinks.k2.hdfs.batchSize = 100 
#设置文件类型,可支持压缩 
a2.sinks.k2.hdfs.fileType = DataStream 
#多久生成一个新的文件 
a2.sinks.k2.hdfs.rollInterval = 60 
#设置每个文件的滚动大小 
a2.sinks.k2.hdfs.rollSize = 134217700 
#文件的滚动与Event数量无关 
a2.sinks.k2.hdfs.rollCount = 0 
 
# Use a channel which buffers events in memory 
a2.channels.c2.type = memory 
a2.channels.c2.capacity = 1000 
a2.channels.c2.transactionCapacity = 100 
 
# Bind the source and sink to the channel 
a2.sources.r2.channels = c2 
a2.sinks.k2.channel = c2 

a2.sinks.k2.hdfs.path = hdfs://hadoop102:端口号/flume/%Y%m%d/%H
端口号是NameNode的地址,这个端口号在/opt/module/hadoop-3.1.3/etc/hadoop下core-site.xml文件中的fs.defaultFS配置过

注意:对于所有与时间相关的转义序列,Event Header中必须存在以 “timestamp”的
key(除非hdfs.useLocalTimeStamp设置为true,此方法会使用TimestampInterceptor自
动添加timestamp)。

a3.sinks.k3.hdfs.useLocalTimeStamp = true

在这里插入图片描述
(3)运行Flume

[yudan@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf

(4)开启Hadoop和Hive并操作Hive产生日志

[yudan@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh 
[yudan@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh
 
[yudan@hadoop102 hive]$ bin/hive 
hive (default)>

(5)在HDFS上查看文件。

实时监控目录下多个新文件

1)案例需求:使用Flume监听整个目录的文件,并上传至HDFS

2)需求分析:
在这里插入图片描述
3)实现步骤:

(1)创建配置文件flume-dir-hdfs.conf

创建一个文件 
[yudan@hadoop102 job]$ vim flume-dir-hdfs.conf
# 添加以下内容

a3.sources = r3 
a3.sinks = k3 
a3.channels = c3 
 
# Describe/configure the source 
a3.sources.r3.type = spooldir 
a3.sources.r3.spoolDir = /opt/module/flume/upload 
a3.sources.r3.fileSuffix = .COMPLETED 
a3.sources.r3.fileHeader = true 
#忽略所有以.tmp结尾的文件,不上传 
a3.sources.r3.ignorePattern = ([^ ]*\.tmp) 
 
# Describe the sink 
a3.sinks.k3.type = hdfs 
a3.sinks.k3.hdfs.path = 
hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H 
#上传文件的前缀 
a3.sinks.k3.hdfs.filePrefix = upload- 
#是否按照时间滚动文件夹 
a3.sinks.k3.hdfs.round = true 
#多少时间单位创建一个新的文件夹 
a3.sinks.k3.hdfs.roundValue = 1 
#重新定义时间单位 
a3.sinks.k3.hdfs.roundUnit = hour 
#是否使用本地时间戳 
a3.sinks.k3.hdfs.useLocalTimeStamp = true 
#积攒多少个Event才flush到HDFS一次 
a3.sinks.k3.hdfs.batchSize = 100 
#设置文件类型,可支持压缩 
a3.sinks.k3.hdfs.fileType = DataStream 
#多久生成一个新的文件 
a3.sinks.k3.hdfs.rollInterval = 60 
#设置每个文件的滚动大小大概是128M 
a3.sinks.k3.hdfs.rollSize = 134217700 
#文件的滚动与Event数量无关 
a3.sinks.k3.hdfs.rollCount = 0 

# Use a channel which buffers events in memory 
a3.channels.c3.type = memory 
a3.channels.c3.capacity = 1000 
a3.channels.c3.transactionCapacity = 100 

# Bind the source and sink to the channel 
a3.sources.r3.channels = c3 
a3.sinks.k3.channel = c3 

在这里插入图片描述
(2)启动监控文件夹命令

[yudan@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf 

说明:在使用Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED结尾;被监控文件夹每500毫秒扫描一次文件变动。

(3)向upload文件夹中添加文件

在/opt/module/flume 目录下创建upload 文件夹

[yudan@hadoop102 flume]$ mkdir upload 

向upload文件夹中添加文件

[yudan@hadoop102 upload]$ touch 1.txt 
[yudan@hadoop102 upload]$ touch 2.tmp 
[yudan@hadoop102 upload]$ touch 3.log 

(4)查看HDFS上的数据

实时监控目录下的多个追加文件

Exec source 适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。

1)案例需求:使用Flume监听整个目录的实时追加文件,并上传至HDFS

2)需求分析:
在这里插入图片描述
3)实现步骤:

(1)创建配置文件flume-taildir-hdfs.conf

创建一个文件 
[yudan@hadoop102 job]$ vim flume-taildir-hdfs.conf 
# 添加如下内容 
a3.sources = r3 
a3.sinks = k3 
a3.channels = c3 

# Describe/configure the source 
a3.sources.r3.type = TAILDIR 
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json 
a3.sources.r3.filegroups = f1 f2 
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.* 
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.* 

# Describe the sink 
a3.sinks.k3.type = hdfs 
a3.sinks.k3.hdfs.path = 
hdfs://hadoop102:8020/flume/upload2/%Y%m%d/%H 
#上传文件的前缀 
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹 
a3.sinks.k3.hdfs.round = true 
#多少时间单位创建一个新的文件夹 
a3.sinks.k3.hdfs.roundValue = 1 
#重新定义时间单位 
a3.sinks.k3.hdfs.roundUnit = hour 
#是否使用本地时间戳 
a3.sinks.k3.hdfs.useLocalTimeStamp = true 
#积攒多少个Event才flush到HDFS一次 
a3.sinks.k3.hdfs.batchSize = 100 
#设置文件类型,可支持压缩 
a3.sinks.k3.hdfs.fileType = DataStream 
#多久生成一个新的文件 
a3.sinks.k3.hdfs.rollInterval = 60 
#设置每个文件的滚动大小大概是128M 
a3.sinks.k3.hdfs.rollSize = 134217700 
#文件的滚动与Event数量无关 
a3.sinks.k3.hdfs.rollCount = 0 
 
# Use a channel which buffers events in memory 
a3.channels.c3.type = memory 
a3.channels.c3.capacity = 1000 
a3.channels.c3.transactionCapacity = 100 
 
# Bind the source and sink to the channel 
a3.sources.r3.channels = c3 
a3.sinks.k3.channel = c3

在这里插入图片描述
(2)启动监控文件夹命令

[yudan@hadoop102 flume]$ bin/flume-ng agent -cconf/ -n a3 -f job/flume-taildir-hdfs.conf

(3)向files文件夹中追加内容

在/opt/module/flume目录下创建files文件夹 
[yudan@hadoop102 flume]$ mkdir files 

向upload文件夹中添加文件 
[yudan@hadoop102 files]$ echo hello >> file1.txt 
[yudan@hadoop102 files]$ echo atguigu >> file2.txt 

(4)查看HDFS上的数据

Taildir 说明:

Taildir Source 维护了一个json 格式的position File,其会定期的往position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File的格式如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.t
 xt"} 
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.t
 xt"}

注:Linux中储存文件元数据的区域就叫做inode,每个inode都有一个号码,操作系统用inode 号码来识别不同的文件,Unix/Linux系统内部不使用文件名,而使用inode号码来识别文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/382052.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《CSS 简易速速上手小册》第4章:视觉美学(2024 最新版)

文章目录 4.1 颜色理论在 CSS 设计中的应用:网页的调色盘4.1.1 基础知识4.1.2 重点案例:创建一个具有情感设计的登录页面4.1.3 拓展案例 1:使用颜色增强信息的可视化表示4.1.4 拓展案例 2:利用颜色创建网站的品牌身份 4.2 字体与文…

C#使用哈希表对XML文件进行查询

目录 一、使用的方法 1.Hashtable哈希表 2.Hashtable哈希表的Add方法 (1)定义 (2)示例 3.XML文件的使用 二、实例 1.源码 2.生成效果 可以通过使用哈希表可以对XML文件进行查询。 一、使用的方法 1.Hashtable哈希表…

视觉开发板—K210自学笔记(三)

本期我们来遵循其他单片机的学习路线开始去做一位点灯大师—点亮一个LED。那么第一步还是先知道K210里面的硬件电路是怎么连接的,需要查看上一节的文档,看看开发板原理图到底是哪个LED跟哪个IO连在一起。 一、硬件电路 根据之前官方提供的assembly draw…

最简单的基于 FFmpeg 的音频编码器(PCM 编码为 AAC)

最简单的基于 FFmpeg 的音频编码器(PCM 编码为 AAC) 最简单的基于 FFmpeg 的音频编码器(PCM 编码为 AAC)正文结果工程文件下载其他参考链接 最简单的基于 FFmpeg 的音频编码器(PCM 编码为 AAC) 参考雷霄骅…

【小沐学GIS】基于Android绘制三维数字地球Earth(OpenGL)

🍺三维数字地球系列相关文章如下🍺:1【小沐学GIS】基于C绘制三维数字地球Earth(OpenGL、glfw、glut)第一期2【小沐学GIS】基于C绘制三维数字地球Earth(OpenGL、glfw、glut)第二期3【小沐学GIS】…

Java:常用API接上篇 --黑马笔记

一、 StringBuilder类 StringBuilder代表可变字符串对象,相当于是一个容器,它里面的字符串是可以改变的,就是用来操作字符串的。 好处:StringBuilder比String更合适做字符串的修改操作,效率更高,代码也更…

例36:打开文件读出文件内容

1.建立一个EXE工程,在主窗体上放一个按钮,如图32。 图32 在按钮的单击事件中输入代码: Sub Form1_Command1_BN_Clicked(hWndForm As hWnd, hWndControl As hWnd)Dim s as StringDim 文件 As CWSTR FF_OpenFileDialog(hWndForm,_"打开…

微信自动预约小程序开发指南:从小白到专家

在数字化时代,预约小程序已成为各类服务行业的必备工具。本文将指导你从零开始,通过第三方小程序制作平台,顺利开发出一款具有预约功能的实用小程序。 第一步:注册登录第三方小程序制作平台 首先,你需要选择一个适合你…

案例:三台主机实现 级联复制

介绍:级联复制架构 级联复制架构 是一种特殊的主从结构,之前聊到的几种主从结构都只有两层,但级联复制架构中会有三层,关系如下: 也就是在级联复制架构中,存在两层从库,这实际上属于一主多从架…

Hive-架构与设计

架构与设计 一、背景和起源二、框架概述1.设计特点 三、架构图1.UI交互层2.Driver驱动层3.Compiler4.Metastore5.Execution Engine 四、执行流程1.发起请求2.获取执行计划3.获取元数据4.返回元数据5.返回执行计划6.运行执行计划7.运行结果获取 五、数据模型1.DataBase数据库2.T…

fast.ai 机器学习笔记(四)

机器学习 1:第 11 课 原文:medium.com/hiromi_suenaga/machine-learning-1-lesson-11-7564c3c18bbb 译者:飞龙 协议:CC BY-NC-SA 4.0 来自机器学习课程的个人笔记。随着我继续复习课程以“真正”理解它,这些笔记将继续…

[office] Excel2019函数MAXIFS怎么使用?Excel2019函数MAXIFS使用教程 #知识分享#微信#经验分享

Excel2019函数MAXIFS怎么使用?Excel2019函数MAXIFS使用教程 Excel2019函数MAXIFS怎么使用?这篇文章主要介绍了Excel2019函数MAXIFS使用教程,需要的朋友可以参考下 在今年,Excel除了新版本Excel2019,其中有一个新功能MAXIFS函数&am…

python_django高校运动会成绩管理系统4o4c3

田径运动会报名管理系统就是给学生进行网上报名,管理员管理报名信息的一种通用管理平台,从而方便管理人员对运动会的日常报名工作的管理。本系统的前台功能模块包括系统的基本操作、最新公告、运动项目和报名项目;系统的后台功能模块包括系统…

opencv计算机视觉

树莓派主机的无键盘解决 进入控制面板,更改适配器设置,WIFI属性,勾选 1.将网线两头分别接入树莓派和笔记本的网线接口 2.在无线连接属性那里勾选允许其他用户连接 3.运行cmd使用arp -a查看树莓派ip地址,或者使用ipscanner查看 cmd…

Nginx实战:1-安装搭建

目录 前言 一、yum安装 二、编译安装 1.下载安装包 2.解压 3.生成makefile文件 4.编译 5.安装执行 6.执行命令软连接 7.Nginx命令 前言 nginx的安装有两种方式: 1、yum安装:安装快速,但是无法在安装的时候带上想要的第三方包 2、…

巧用liteflow,告别if else,SpringBoot整合liteflow

假设有一个三个原子业务&#xff0c;吃饭、喝水、刷牙。 现在有三个场景&#xff0c;分别是 场景A: 吃饭->刷牙->喝水 官网地址&#xff1a;https://liteflow.cc/ 1.添加依赖&#xff1a; <dependency><groupId>com.yomahub</groupId><artifactI…

FPGA_工程_基于rom的vga显示

一 框图 二 代码修改 module Display #(parameter H_DISP 1280,parameter V_DISP 1024,parameter H_lcd 12d150,parameter V_lcd 12d150,parameter LCD_SIZE 15d10_000 ) ( input wire clk, input wire rst_n, input wire [11:0] lcd_xpos, //lcd horizontal coo…

python+django+vue汽车票在线预订系统58ip7

本课题使用Python语言进行开发。基于web,代码层面的操作主要在PyCharm中进行&#xff0c;将系统所使用到的表以及数据存储到MySQL数据库中 使用说明 使用Navicat或者其它工具&#xff0c;在mysql中创建对应名称的数据库&#xff0c;并导入项目的sql文件&#xff1b; 使用PyChar…

【Linux】学习-基础IO拓展篇

Linux基础IO拓展篇—详解文件系统 理解文件系统 在Linux基础IO篇中&#xff0c;我们站在用户的视角对文件进行了理解&#xff0c;主要是针对被打开的文件&#xff0c;那么有没有没有被打开的文件呢&#xff1f;当然有&#xff01;今天我们换个视角&#xff0c;来站在系统的角…

XSS-Lab

1.关于20关的payload合集。 <script>alert(1)</script> "><script>alert(1)</script> onclickalert(1) " onclick"alert(1) "><a href"javascript:alert(1)"> "><a HrEf"javascript:alert…