flume参考网址:Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了https://flume.liyifeng.org/?flag=fromDoc#要求:
使用Flume将日志抽取到hdfs上:通过java代码编写一个拦截器,将日志中不是json数据的数据过滤掉,只保留符合json格式的数据,抽出到hdfs中的数据要有日期路径
日志数据:
最终到hdfs上的数据格式:
步骤:
①首先确保hdfs的服务是开启的
start-dfs.sh
②将使用java写好的jar包导入到flume的lib下
③编写json文件
#为各组件命名
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/behavior/.*
a1.sources.r1.positionFile = /root/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = cn.wolfcode.flume.interceptor.ETLInterceptor$Builder
a1.sources.r1.interceptors = i2
a1.sources.r1.interceptors.i2.type = cn.wolfcode.flume.interceptor.TimeStampInterceptor$Builder
## channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /behavior/origin/log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat= Text
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
抽取:
flume-ng agent -n a1 -c ../ -f taildir2hdfs.conf -Dflume.root.logger=INFO,console
关于时间戳拦截器:
假如hdfs中使用了时间转义字符,此时必须指定时间,两种方案
(1)使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp =true
(2)使用时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp