In the previous article we covered Flume's architecture, an overview, and installation. Now we will work through ten cases to learn how Flume is used.
Before diving in, here is the general workflow: decide on the source type, channel type, and sink type, write a .conf file and start the agent, then feed data in at the capture end so it flows to the destination.
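Every agent configuration in the cases below follows the same skeleton (a rough sketch; the agent and component names here are placeholders, not values from any specific case):
# declare the components
<agent>.sources = <src>
<agent>.channels = <ch>
<agent>.sinks = <snk>
# configure each component
<agent>.sources.<src>.type = ...
<agent>.channels.<ch>.type = ...
<agent>.sinks.<snk>.type = ...
# wire them together
<agent>.sources.<src>.channels = <ch>
<agent>.sinks.<snk>.channel = <ch>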
The usage for each case can also be found in the official documentation:
Official docs: Flume 1.11.0 User Guide — Apache Flume, https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html
Case studies
Case 1: send data from the console and display it on the console
1. Decide on the source, channel, and sink types
Here we use a netcat source, a memory channel, and a logger sink.
2. Create and edit the conf file
vim netcat2logger.conf
#a is the agent name; r1 is the source name, c1 the channel name, k1 the sink name
#declare the components
a.sources=r1
a.channels=c1
a.sinks=k1
#define the source type, here netcat
a.sources.r1.type=netcat
a.sources.r1.bind=192.168.254.100
a.sources.r1.port=8888
#point the source at its downstream channel
a.sources.r1.channels=c1
#define the channel
a.channels.c1.type=memory
#number of events the channel can buffer
a.channels.c1.capacity=1000
#number of events per transaction
a.channels.c1.transactionCapacity=1000
#define the sink type and its upstream channel
a.sinks.k1.channel=c1
a.sinks.k1.type=logger
3. Start the service; open a new session on master to start it
Command (note: -n takes the agent name you defined in the conf file, -f takes the path to the conf file):
flume-ng agent -n a -c /usr/local/soft/flume-1.11/conf -f ./netcat2logger.conf -Dflume.root.logger=DEBUG,console
4. In the original session run:
(1) yum install -y telnet
(2) telnet master 8888
You can now type some test data into this port. Open yet another session on master to watch flume.log (the file is created in the directory from which the agent was started).
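For example (the flume.log location is an assumption; it sits in whichever directory the agent was started from):
# in the telnet session, type a line and press Enter
hello flume
# in another session, watch the logger sink output
tail -F ./flume.log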
Case 2: push data from a local directory into HDFS / Hive
HDFS:
1. Decide on the source, channel, and sink types
Here we use a spooldir source, a memory channel, and an hdfs sink.
2. Write the conf file
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#spooldir source properties
a1.sources.r1.type = spooldir
#this directory must be created manually beforehand
a1.sources.r1.spoolDir = /usr/local/soft/bigdata29/flumedata2
#timestamp interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
#channel
a1.channels.c1.type = memory
#number of events the channel can buffer
a1.channels.c1.capacity = 10000
#number of events per transaction (how many the sink takes at a time)
a1.channels.c1.transactionCapacity = 1000
#sink type
a1.sinks.k1.type = hdfs
#HDFS cluster address and path; the path is created automatically if it does not exist
a1.sinks.k1.hdfs.path = hdfs://master:9000/bigdata29/flumeout2/log_s/dt=%Y-%m-%d
#prefix of the files generated under the HDFS path
a1.sinks.k1.hdfs.filePrefix = log_%Y-%m-%d
#minimum HDFS block replication for the files being written
a1.sinks.k1.hdfs.minBlockReplicas=1
#data transfer (file) type
a1.sinks.k1.hdfs.fileType = DataStream
#0 means do not roll files by event count; n means roll a new file every n events
a1.sinks.k1.hdfs.rollCount = 100
#roll by file size in bytes; 0 disables size-based rolling
a1.sinks.k1.hdfs.rollSize = 0
#roll the temporary file into a target file every n seconds; 0 disables time-based rolling
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout=0
#wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start the service
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11/conf -f ./spooldir2hdfs.conf -Dflume.root.logger=DEBUG,console
4. Drop data files into /usr/local/soft/bigdata29/flumedata2 and watch the log
Then check the data in HDFS:
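A quick check from the command line (the paths come from the conf above; the date partition expands from %Y-%m-%d, hence the wildcard):
hdfs dfs -ls /bigdata29/flumeout2/log_s/
hdfs dfs -cat /bigdata29/flumeout2/log_s/dt=*/log_*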
Hive table:
1. Write the conf file
vim spooldir2hive.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#spooldir source properties
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/soft/bigdata29/flumedata3
#timestamp interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
#sink type
a1.sinks.k1.type = hdfs
#HDFS cluster address and path; the path is created automatically if it does not exist
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hive/warehouse/bigdata29.db/students_flume
#prefix of the files generated under the HDFS path
a1.sinks.k1.hdfs.filePrefix = students_test
#minimum HDFS block replication for the files being written
a1.sinks.k1.hdfs.minBlockReplicas=1
#data transfer (file) type
a1.sinks.k1.hdfs.fileType = DataStream
#0 means do not roll files by event count; n means roll a new file every n events
a1.sinks.k1.hdfs.rollCount = 1000
#roll by file size in bytes; 0 disables size-based rolling
a1.sinks.k1.hdfs.rollSize = 0
#roll the temporary file into a target file every n seconds; 0 disables time-based rolling
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
#number of events written to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1000
#channel
a1.channels.c1.type = memory
#number of events the channel can buffer
a1.channels.c1.capacity = 10000
#number of events per transaction (how many the sink takes at a time)
a1.channels.c1.transactionCapacity = 1000
#wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Start hiveserver2 and create the students_flume table
DDL:
create table students_flume(id string,name string,age string,gender string,clazz string)
row format delimited fields terminated by ',';
3. Start the service
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11/conf -f ./spooldir2hive.conf -Dflume.root.logger=DEBUG,console
4. Drop data into the flumedata3 directory:
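For example (students.txt is a hypothetical comma-delimited file matching the table columns):
cp students.txt /usr/local/soft/bigdata29/flumedata3/
# then, in the Hive CLI / beeline:
select count(*) from bigdata29.students_flume;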
Case 3: capture data from Java code and push it into HDFS
1. The three component types are avro source, memory channel, hdfs sink
2. Open a Maven project and add the dependencies
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.11.0</version>
</dependency>
3. Create a log4j configuration file with the following content
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.19.100
log4j.appender.flume.Port = 12345
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%m%n
4. Write the Java code
import org.apache.log4j.Logger;

import java.text.SimpleDateFormat;
import java.util.Date;

public class LoggerToFlume {
    public static void main(String[] args) throws InterruptedException {
        // create a log4j logger (it must be log4j, so the flume appender configured above is used)
        Logger logger = Logger.getLogger(LoggerToFlume.class.getName());
        // date formatter used in the log message
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // loop forever, emitting log events
        while (true) {
            logger.info("dataToBigdata29:" + simpleDateFormat.format(new Date()));
            // emit one event every 0.1 seconds
            Thread.sleep(100);
        }
    }
}
5. Write the conf file
vim avro2hdfs.conf
#define the agent name and the source, channel, and sink names
a.sources = r1
a.channels = c1
a.sinks = k1
#configure the source
a.sources.r1.type = avro
a.sources.r1.bind = 192.168.19.100
a.sources.r1.port = 12345
#configure the channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
a.channels.c1.transactionCapacity = 1000
#configure the sink
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = hdfs://master:9000/bigdata29/flumeout4/jd_goods
a.sinks.k1.hdfs.filePrefix = events-
a.sinks.k1.hdfs.minBlockReplicas=1
a.sinks.k1.hdfs.fileType = DataStream
#roll a new file every 1000 events (0 would disable count-based rolling)
a.sinks.k1.hdfs.rollCount = 1000
a.sinks.k1.hdfs.rollSize =0
#roll the temporary file into a target file every n seconds; 0 disables time-based rolling
a.sinks.k1.hdfs.rollInterval = 0
a.sinks.k1.hdfs.idleTimeout = 0
#wire source, channel, and sink together
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
6. Start the service:
flume-ng agent -n a -c /usr/local/soft/flume-1.11/conf -f ./avro2hdfs.conf -Dflume.root.logger=DEBUG,console
7. Run the Java code
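To verify that events are landing in HDFS (the path comes from the conf above; file names carry generated suffixes, hence the wildcard):
hdfs dfs -ls /bigdata29/flumeout4/jd_goods
hdfs dfs -cat /bigdata29/flumeout4/jd_goods/events-* | head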
Case 4: monitor the HBase log into an HBase table (and file data into Hive)
4.1 Ship the HBase log into an HBase table
1. Create the table in advance
create 'foo_table','bar_cf'
2. Write the conf file
vim exec2base.conf
# a is the agent name
# r1 is the source name
a.sources = r1
# k1 is the sink name
a.sinks = k1
# c1 is the channel name
a.channels = c1
#exec source properties
a.sources.r1.type = exec
a.sources.r1.command = tail -F /usr/local/soft/hbase-2.2.7/logs/hbase-root-master-master.log
#sink type
a.sinks.k1.type = hbase2
a.sinks.k1.table = foo_table
a.sinks.k1.columnFamily = bar_cf
a.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
#channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
# how many events the sink takes from the channel at a time
a.channels.c1.transactionCapacity = 100
# wire the components together
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
3. Start the service and monitor hbase-root-master-master.log
Start command:
flume-ng agent -n a -c /usr/local/soft/flume-1.11/conf -f ./exec2base.conf -Dflume.root.logger=DEBUG,console
4. Perform some operations (for example in the HBase shell) so that new entries are appended to the HBase master log
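You can then confirm that events landed in the target table with a minimal check from the HBase shell:
hbase shell
scan 'foo_table', {LIMIT => 5}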
4.2 Ship file data into a Hive table
1. Start the Hive service and create the table
create table exec2hive
(
id string not null,
name string not null,
price string not null,
comments string not null,
shop string not null,
tags string not null
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
2. Create the configuration file:
vim exec2hive.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/soft/bigdata29/wendang/jd_goods.txt
#configure the sink
a1.sinks.k1.type = hdfs
#the path must match the Hive table's location
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hive/warehouse/bigdata29.db/exec2hive
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.minBlockReplicas=1
a1.sinks.k1.hdfs.fileType = DataStream
#roll a new file every 1000 events (0 would disable count-based rolling)
a1.sinks.k1.hdfs.rollCount = 1000
a1.sinks.k1.hdfs.rollSize = 0
#roll the temporary file into a target file every n seconds; 0 disables time-based rolling
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout=0
a1.sinks.k1.hdfs.batchSize=100
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Push data in:
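For example, append a few comma-delimited lines matching the table columns (the values below are made up for illustration), then query the table:
echo '1001,phone,1999,5000+,jd_self,hot' >> /usr/local/soft/bigdata29/wendang/jd_goods.txt
echo '1002,laptop,4999,3000+,jd_self,new' >> /usr/local/soft/bigdata29/wendang/jd_goods.txt
# in hive:
select * from exec2hive limit 10;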
Case 5: Flume with an HTTP source
1. The three component types are http source, memory channel, logger sink
2. Write the conf file
vim http2logger.conf
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
# how many events the sink takes from the channel at a time
a1.channels.c1.transactionCapacity=100
3. Start the service
flume-ng agent -n a1 -c /usr/local/soft/flume-1.11/conf -f ./http2logger.conf -Dflume.root.logger=DEBUG,console
4. Open another window and send some data
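The default handler of the HTTP source (JSONHandler) expects a JSON array of events, so a simple curl request works as a test:
curl -X POST -H 'Content-Type: application/json' -d '[{"headers":{"h1":"v1"},"body":"hello flume http source"}]' http://master:50000
The event should then show up in the logger sink output.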
Case 6: replication (one source fanned out to multiple channels)
Diagram:
1. Copy Flume to node1 and node2
scp -r flume-1.11 node1:`pwd`
scp -r flume-1.11 node2:`pwd`
2. Copy master's /etc/profile to node1 and node2 and source it so it takes effect
scp /etc/profile node1:/etc/
scp /etc/profile node2:/etc/
3. Create a configuration file on node1:
vim avro2logger.conf
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.19.110
a1.sources.r1.port = 11111
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
4. Create a configuration file on node2:
vim avro2logger.conf
a2.sources = r2
a2.channels = c2
a2.sources.r2.type = avro
a2.sources.r2.channels = c2
a2.sources.r2.bind = 192.168.19.120
a2.sources.r2.port = 22222
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
a2.sinks = k2
a2.sinks.k2.type = logger
a2.sinks.k2.channel = c2
5. Create a configuration file on master:
vim netcat2avro.conf
m.sources = r1
m.channels = c1 c2
m.sinks = k1 k2
# Describe/configure the source
m.sources.r1.type = netcat
m.sources.r1.bind = 192.168.19.100
m.sources.r1.port = 12345
# Use a channel which buffers events in memory
m.channels.c1.type = memory
m.channels.c1.capacity = 1000
m.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
m.channels.c2.type = memory
m.channels.c2.capacity = 1000
m.channels.c2.transactionCapacity = 100
# Describe the sink
m.sinks.k1.type = avro
m.sinks.k1.hostname = 192.168.19.110
m.sinks.k1.port = 11111
m.sinks.k2.type = avro
m.sinks.k2.hostname = 192.168.19.120
m.sinks.k2.port = 22222
# Bind the source and sink to the channel
m.sources.r1.channels = c1 c2
m.sinks.k1.channel = c1
m.sinks.k2.channel = c2
6. Start the agents on node1 and node2 first, then start the one on master
7. Open a connection to the port on master and send data
telnet master 12345
Watch the node1 and node2 logs:
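Each line you type should appear in the logger sink on both node1 and node2. If nc/netcat is installed you can also script the test (the flume.log location is an assumption; it is created in the directory each agent was started from):
echo "hello replication" | nc master 12345
# on node1 and node2:
tail -F ./flume.log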
Case 7: failover
The effect is similar to an HA Hadoop cluster, but Hadoop elects its active node through ZooKeeper, whereas Flume decides which machine takes precedence via the priority weights configured in the sinkgroup; only one sink is active at a time.
Unlike Hadoop HA, if the currently active sink (say priority 10) goes down, the agent immediately switches to the other one (say priority 9), and once the failed sink is repaired and restarted, Flume switches back to the priority-10 sink after a while.
1. On master: vim netcat2failover.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 12345
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#send the data to another Flume server
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 11111
#send the data to another Flume server
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 22222
#use a sink processor to control where the channel's data goes
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
#set the priorities (scores)
a1.sinkgroups.g1.processor.priority.k1 = 1
a1.sinkgroups.g1.processor.priority.k2 = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
2. On node1: vim avro2logger.conf
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 192.168.19.110
a1.sources.r1.port = 11111
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
3. On node2: vim avro2logger.conf
a2.sources = r2
a2.channels = c2
a2.sources.r2.type = avro
a2.sources.r2.channels = c2
a2.sources.r2.bind = 192.168.19.120
a2.sources.r2.port = 22222
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
a2.sinks = k2
a2.sinks.k2.type = logger
a2.sinks.k2.channel = c2
4. Start the node1 and node2 agents first, then the master agent
5. Open the port and test
Because node1's priority is 1 and node2's is 100, the data goes to node2.
If you now shut down the node2 agent, the data goes to node1 instead.
If node2 is then brought back up, Flume switches back to node2 (the higher-priority sink) after a short while; this is the main difference from Hadoop HA, so we will not demonstrate it further here.
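One way to simulate the failure by hand (the process lookup below is an assumption about your environment): on node2, find and kill the Flume agent process, watch events move to node1, then restart it and watch them move back.
jps            # the Flume agent shows up as "Application"
kill -9 <pid>  # kill the node2 agent to trigger failover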
Case 8: load balancing
With load balancing, the sink processor spreads events across the sinks (here using the random selector), reducing the load on any single machine.
1. Configuration file on master: vim netcat2balance.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 12345
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#send the data to another Flume server
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 11111
#send the data to another Flume server
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 22222
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
2. node1 and node2 use the same configuration as in case 7
3. Start the node1 and node2 agents first, then the master agent
node1: flume-ng agent -n a1 -c /usr/local/soft/flume-1.11/conf -f ./avro2logger.conf -Dflume.root.logger=DEBUG,console
node2: flume-ng agent -n a2 -c /usr/local/soft/flume-1.11/conf -f ./avro2logger.conf -Dflume.root.logger=DEBUG,console
master: flume-ng agent -n a1 -c /usr/local/soft/flume-1.11/conf -f ./netcat2balance.conf -Dflume.root.logger=DEBUG,console
4. Send data to port 12345 and watch the node1 and node2 log files:
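A quick loop to see the random distribution (assuming nc/netcat is available; otherwise just type lines into telnet master 12345):
for i in $(seq 1 10); do echo "msg_${i}" | nc master 12345; done
Some of the lines should show up in node1's logger output and the rest in node2's.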
Case 9: aggregation
node1 and node2 are two log-producing machines whose main log types are access.log, nginx.log, and web.log. The requirement:
collect access.log, nginx.log, and web.log from node1 and node2, aggregate them on the master machine, and then write everything into HDFS, using the directory layout required below.
Diagram:
1. Create three files on both node1 and node2 to simulate the data sources
mkdir -p /usr/local/soft/bigdata29/wendang/juhe
touch /usr/local/soft/bigdata29/wendang/juhe/access.log
touch /usr/local/soft/bigdata29/wendang/juhe/nginx.log
touch /usr/local/soft/bigdata29/wendang/juhe/web.log
2. Configuration file on node1 and node2
vim juhetohdfs.conf
# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/soft/bigdata29/wendang/juhe/access.log
# the static interceptor inserts a user-defined key-value pair into the header of every collected event
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
# Event: { headers:{type=access} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 0D hello world. }
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /usr/local/soft/bigdata29/wendang/juhe/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /usr/local/soft/bigdata29/wendang/juhe/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 12345
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
3. On master: vim juhetohdfs.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# define the source
a1.sources.r1.type = avro
a1.sources.r1.bind = master
a1.sources.r1.port = 12345
# add a timestamp interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000
# define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://master:9000/bigdata29/flumelogs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# use the local timestamp for the time escapes in the path
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# do not roll files by event count
a1.sinks.k1.hdfs.rollCount = 0
# roll files by time (every 30 seconds)
a1.sinks.k1.hdfs.rollInterval = 30
# roll files by size (10 MB)
a1.sinks.k1.hdfs.rollSize = 10485760
# number of events written to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 10000
# number of threads Flume uses for HDFS operations (creating files, writing, etc.)
a1.sinks.k1.hdfs.threadsPoolSize = 10
# timeout for HDFS operations, in milliseconds
a1.sinks.k1.hdfs.callTimeout = 30000
# wire source, channel, and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4. Data-generation script: write a shell script on node1 and node2 to simulate log production
vim createdata.sh
#!/bin/bash
num=1
while true
do
num=$((num+1))
echo "${num}_access" >> /usr/local/soft/bigdata17/scrips/taillogs/access.log;
echo "${num}_web" >> /usr/local/soft/bigdata17/scrips/taillogs/web.log;
echo "${num}_nginx" >> /usr/local/soft/bigdata17/scrips/taillogs/nginx.log;
sleep 0.5;
done
5. Start the master agent first, then the node1 and node2 agents
6. Run the script to generate data and watch the log files
node1: sh createdata.sh
node2: sh createdata.sh
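The aggregated data should show up in HDFS under one directory per log type, with a date subdirectory created by the %{type}/%Y%m%d escapes in the sink path:
hdfs dfs -ls /bigdata29/flumelogs/
hdfs dfs -ls /bigdata29/flumelogs/access/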
Case 10: custom Interceptor
Flume collects local server logs, and logs of different types must be routed to different analysis systems.
1. Add the dependencies
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.11.0</version> <!-- same version as in case 3 -->
    </dependency>
    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>1.11.0</version> <!-- same version as in case 3 -->
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <!-- name suffix of the jar built with its dependencies -->
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <!-- the executions below bind the assembly to "mvn package"; without them you would run "mvn assembly:single" -->
            <executions>
                <execution>
                    <id>make-assemble</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2. Implement the custom interceptor in Java
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * 1. How do you define a custom interceptor?
 *    A custom Flume interceptor implements the Interceptor interface provided by Flume.
 *
 *    Methods to implement:
 *      initialize: one-time setup work.
 *      close: cleanup work.
 *      intercept: the core interception logic.
 *        intercept(Event event): handles a single event.
 *        intercept(List<Event> events): handles a batch of events.
 *
 * 2. How is the interceptor instantiated?
 *    Define a static inner class that implements Flume's Interceptor.Builder interface.
 *
 *    Methods to implement:
 *      build: constructs the interceptor object.
 *      configure: reads configuration (from the xxxx.conf file).
 */
public class ChannelSelector implements Interceptor {

    @Override
    public void initialize() {
        // no other components to set up, so nothing to do here
    }

    /*
      Parse the received event, set its headers, and return the event.
      The headers are later used to decide which channel (and therefore which sink) handles the event:
      if the body contains "caocao", set type=wei in the headers, otherwise set type=dongwu.
     */
    @Override
    public Event intercept(Event event) {
        // the body is the data that was collected
        String info = new String(event.getBody());
        // get the headers; by default an event's headers are {}
        Map<String, String> headers = event.getHeaders();
        if (info.contains("caocao")) {
            headers.put("type", "wei");
            event.setHeaders(headers);
        } else {
            headers.put("type", "dongwu");
            event.setHeaders(headers);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        ArrayList<Event> events = new ArrayList<>();
        // intercept each event in the batch and collect the results
        for (Event event : list) {
            events.add(intercept(event));
        }
        return events;
    }

    @Override
    public void close() {
        // no connections were opened, so nothing to close
    }

    public static class MyBuilder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new ChannelSelector();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
3. Package the code into a jar (with dependencies) and put the jar into Flume's lib directory
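For example (the artifact name below is a placeholder for whatever your pom produces):
mvn clean package
cp target/<your-artifact>-jar-with-dependencies.jar /usr/local/soft/flume-1.11/lib/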
4. Configure node1
vim custom.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.sources.r1.type = avro
a2.sources.r1.bind = node1
a2.sources.r1.port = 5555
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100
a2.sinks.k1.type = logger
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Configure node2
vim custom.conf
a3.sources = r1
a3.channels = c1
a3.sinks = k1
a3.sources.r1.type = avro
a3.sources.r1.bind = node2
a3.sources.r1.port = 6666
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
a3.sinks.k1.type = logger
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
6. Configure master
vim custom.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
a1.sources.r1.type = netcat
a1.sources.r1.bind = master
a1.sources.r1.port = 11111
#switch the channel selector to multiplexing
a1.sources.r1.selector.type = multiplexing
#inspect the type key in each event's headers
a1.sources.r1.selector.header = type
#if type is wei, send the event to channel c1; if it is dongwu, send it to c2; anything unmatched goes to c2 by default
a1.sources.r1.selector.mapping.wei = c1
a1.sources.r1.selector.mapping.dongwu = c2
a1.sources.r1.selector.default=c2
#name the interceptor i1
a1.sources.r1.interceptors = i1
#the fully qualified name of the custom builder class
a1.sources.r1.interceptors.i1.type = com.shujia.jinjie.ChannelSelector$MyBuilder
# wire the channels to the source
a1.sources.r1.channels = c1 c2
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 5555
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node2
a1.sinks.k2.port = 6666
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
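To test it, start the node1 and node2 agents first (as in the earlier cases), then the master agent, and type some lines into the netcat port; the sample lines below are only illustrations:
telnet master 11111
# a line containing caocao gets header type=wei and appears in node1's logger output
# any other line gets header type=dongwu and appears in node2's logger output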