Installing and Using Flume
Table of Contents
- Installing and Using Flume
- Installing Flume
- 1. Upload to the virtual machine and extract
- 2. Rename the directory and configure environment variables
- 3. Check the Flume version
- 4. Test Flume
- 5. Using Flume
Installing Flume
1. Upload to the virtual machine and extract
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/soft/
Add the following alias to your environment configuration so that `soft` quickly switches to /usr/local/soft:
alias soft='cd /usr/local/soft/'
2. Rename the directory and configure environment variables
mv apache-flume-1.9.0-bin/ flume-1.9.0
vim /etc/profile
source /etc/profile
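The exact lines added to /etc/profile are not shown above; a minimal sketch, assuming Flume was extracted to /usr/local/soft/flume-1.9.0 as in the rename step, might look like this (it makes the `flume-ng` command and the `$FLUME_HOME` variable used later available):

```shell
# Assumed /etc/profile additions (paths follow this guide's install location)
export FLUME_HOME=/usr/local/soft/flume-1.9.0
export PATH=$FLUME_HOME/bin:$PATH
```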
3. Check the Flume version
flume-ng version
[root@master soft]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@master soft]#
4. Test Flume

- Monitor a port and print the data to the console
```properties
# a1 is the agent name; it is set by the -n argument of the startup command
# r1 is the name of a1's source
a1.sources = r1
# k1 is the name of a1's sink
a1.sinks = k1
# c1 is the name of a1's channel
a1.channels = c1

# The source type is netcat, which listens on a TCP port
a1.sources.r1.type = netcat
# Bind address; 0.0.0.0 accepts connections from any IP
a1.sources.r1.bind = 0.0.0.0
# Port to listen on
a1.sources.r1.port = 6666

# The sink writes events to the console via the logger type
a1.sinks.k1.type = logger

# The channel type is memory, i.e. events are buffered in memory
a1.channels.c1.type = memory
# Total channel capacity: 1000 events
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Connect r1 to c1
a1.sources.r1.channels = c1
# Connect k1 to c1
a1.sinks.k1.channel = c1
```
Startup command:
flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/job/natcat2logger.conf -Dflume.root.logger=INFO,console
Parameter description:
- -c: the directory containing Flume's configuration files (here the conf directory)
- -n: the agent name, here a1
- -f: the job configuration file Flume reads at startup
- -Dflume.root.logger=INFO,console: overrides the flume.root.logger property at runtime and sets the console log level to INFO; available levels include DEBUG, INFO, WARN, and ERROR
Where port 6666 comes from: it is the port set by `a1.sources.r1.port` in the configuration above.
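With the agent running, a quick way to test it is to push a line of text to the monitored port, for example with netcat (an assumption: the `nc` tool is installed on the machine):

```shell
# Send a test line to port 6666; the agent should print the event via the logger sink
echo "hello flume" | nc master 6666
```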
- Monitor a directory and print the data to the console
- Configuration file
```properties
# Name the agent a1
# Name the source, channel, and sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Configure the source
# The spooldir type watches a directory for new files
# Since different components may share property names, each property is set as
# <agent>.sources.<source>.<property> = <value>
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/
a1.sources.r1.fileSuffix = .ok
a1.sources.r1.fileHeader = true

# Attach an interceptor named i1 to source r1
a1.sources.r1.interceptors = i1
# A timestamp interceptor would insert the processing time (in milliseconds) into the event header
# a1.sources.r1.interceptors.i1.type = timestamp
# Use the regex_filter interceptor to filter events with a regular expression
a1.sources.r1.interceptors.i1.type = regex_filter
# The regular expression: any line containing digits is kept as a whole
a1.sources.r1.interceptors.i1.regex = [0-9]+
# excludeEvents = true would drop matching events and pass the rest through
# a1.sources.r1.interceptors.i1.excludeEvents = true

# Configure the sink
# The logger sink prints collected data to the console
a1.sinks.k1.type = logger

# Configure the channel
# The memory channel buffers events in memory
a1.channels.c1.type = memory

# Wire everything together
# Point the source's channels property at c1
a1.sources.r1.channels = c1
# Point the sink's channel property at c1
a1.sinks.k1.channel = c1
```
- Start the agent
flume-ng agent -n a1 -f ./spoolingtest.conf -Dflume.root.logger=DEBUG,console
- Create the /root/data directory
mkdir /root/data
- Create a file under /root/data/, add some content, and watch the logs printed by the Flume process
```shell
# Add some arbitrary content to a.txt
vim /root/data/a.txt
```
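As a quick illustration of the regex_filter interceptor configured above (a sketch; the file names are arbitrary), only lines containing digits should reach the logger sink:

```shell
# This line matches the [0-9]+ regex, so it is kept and printed by the logger sink
echo "123 some digits" > /root/data/b.txt
# This line contains no digits, so the interceptor drops it
echo "only letters here" > /root/data/c.txt
```

After each file is processed, the spooldir source renames it with the configured .ok suffix.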
5. Using Flume

- spoolingToHDFS.conf
- Configuration file
```properties
# Name the agent a
# Name the source r1
a.sources = r1
# Name the sink k1
a.sinks = k1
# Name the channel c1
a.channels = c1

# spooldir source properties
a.sources.r1.type = spooldir
a.sources.r1.spoolDir = /root/data
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp

# Sink type
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir1
# File name prefix
a.sinks.k1.hdfs.filePrefix = student
# Roll the file after this many bytes
a.sinks.k1.hdfs.rollSize = 102400
# Roll the file after this many events
a.sinks.k1.hdfs.rollCount = 1000
# File type: DataStream, i.e. write the data out as-is
a.sinks.k1.hdfs.fileType = DataStream
# Output format: text
a.sinks.k1.hdfs.writeFormat = text
# File name suffix
a.sinks.k1.hdfs.fileSuffix = .txt

# Channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# How many events the sink takes from the channel per transaction
a.channels.c1.transactionCapacity = 100

# Wire everything together
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
```
- Prepare some data in the /root/data/ directory
```
The Zen of Python, by Tim Peters

Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
```
- Start the agent
flume-ng agent -n a -f ./spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console
Note: if Hadoop is version 3.x, you need to replace the guava jar bundled with Flume with the one from Hadoop's HDFS lib directory.
Run the following:
mv $FLUME_HOME/lib/guava-11.0.2.jar $FLUME_HOME/lib/guava-11.0.2.jar.bak
cd /usr/local/soft/hadoop-3.1.3/share/hadoop/hdfs/lib
cp guava-27.0-jre.jar $FLUME_HOME/lib/
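Once a few files have been processed, the sink's output can be inspected in HDFS (a sketch, using the path from the spoolingToHDFS.conf configuration above; assumes HDFS is running):

```shell
# List the files written by the hdfs sink; names follow the student/.txt prefix and suffix configured above
hdfs dfs -ls /flume/data/dir1
# Print their contents
hdfs dfs -cat /flume/data/dir1/*
```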
- httpToLogger
- Configuration file
```properties
# Name the agent a
# Name the source r1
a.sources = r1
# Name the sink k1
a.sinks = k1
# Name the channel c1
a.channels = c1

# http source properties
a.sources.r1.type = http
a.sources.r1.port = 6666

# Sink type
a.sinks.k1.type = logger

# Channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
# How many events the sink takes from the channel per transaction
a.channels.c1.transactionCapacity = 100

# Wire everything together
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
```
- Start
- First start the agent
flume-ng agent -n a -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console
- Then use curl to send an HTTP request
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"},{ "headers" :{"a2" : "a11","b2" : "b11"},"body" : "hello~http~flume2~"}]' http://master:6666
- exec
- Configuration file
```properties
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /usr/local/soft/hive-3.1.2/log/hive.log

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = /flume/hivelog/%Y%m%d/%H
# Prefix of the uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll directories based on time
a2.sinks.k2.hdfs.round = true
# How many time units before a new directory is created
a2.sinks.k2.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 100
# File type; compression is also supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often (in seconds) to roll a new file
a2.sinks.k2.hdfs.rollInterval = 60
# Roll size of each file in bytes
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a2.sinks.k2.hdfs.rollCount = 0

a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
```
- Start

Start Flume:
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
Start Hive, then check HDFS.
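To verify the result, the sink's output directory can be listed (a sketch, using the hdfs.path from the configuration above; the %Y%m%d/%H directories are created as events arrive):

```shell
# Recursively list the date/hour directories and log files created by the hdfs sink
hdfs dfs -ls -R /flume/hivelog
```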
- Single data source, multiple outputs

Flume1 monitors changes to a file and sends what it captures to both flume2 and flume3; flume2 writes the content to HDFS, and flume3 writes the data to the local file system.
- Configuration files
flume1.conf

```properties
# Named
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /usr/local/soft/flume-1.9.0/job/taildir/.*\.txt
# The read positions of the tailed files are stored in this JSON file
a1.sources.r1.positionFile = /usr/local/soft/flume-1.9.0/job/position/position.json
# Channel selector
a1.sources.r1.selector.type = replicating

# Channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

# Sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

# Bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
```

flume2.conf

```properties
a2.sources = r1
a2.channels = c1
a2.sinks = k1

# Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

# Sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = /flume/replicating/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0

# Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
```

flume3.conf

```properties
# Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1

# Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

# Sink
a3.sinks.k1.type = file_roll
# Writes the data into a local directory, rolling to a new file continuously;
# if no data arrives during an interval, an empty file is still created
a3.sinks.k1.sink.directory = /usr/local/soft/flume-1.9.0/job/fileroll

# Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
```
- Start (the downstream agents a3 and a2 first, then a1)

```shell
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/Replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/Replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/Replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console
```
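With all three agents running, a rough end-to-end check (a sketch; the file name is arbitrary, the paths come from the configurations above) is to append a line to a .txt file under the tailed directory and then look at both outputs:

```shell
# Append a line to a file matched by the TAILDIR filegroup (.*\.txt)
echo "replicating test" >> /usr/local/soft/flume-1.9.0/job/taildir/test1.txt
# flume2 should write it to HDFS under the replicating path
hdfs dfs -ls -R /flume/replicating
# flume3 should roll it into a local file in the file_roll directory
ls /usr/local/soft/flume-1.9.0/job/fileroll
```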