Flume1.9基础学习

文章目录

  • 一、Flume 入门概述
    • 1、概述
    • 2、Flume 基础架构
      • 2.1 Agent
      • 2.2 Source
      • 2.3 Sink
      • 2.4 Channel
      • 2.5 Event
    • 3、Flume 安装部署
      • 3.1 安装地址
      • 3.2 安装部署
  • 二、Flume 入门案例
    • 1、监控端口数据官方案例
      • 1.1 概述
      • 1.2 实现步骤
    • 2、实时监控单个追加文件
      • 2.1 概述
      • 2.2 实现步骤
    • 3、实时监控目录下多个新文件
      • 3.1 概述
      • 3.2 实现步骤
    • 4、实时监控目录下的多个追加文件
      • 4.1 概述
      • 4.2 实现步骤
      • 4.3 Taildir 问题说明
    • 5、Kafka相关
    • 6、Kafka群起脚本
  • 三、Flume 进阶
    • 1、Flume 事务
    • 2、Flume Agent 内部原理
    • 3、Flume 拓扑结构
      • 3.1 简单串联
      • 3.2 复制和多路复用
      • 3.3 负载均衡和故障转移
      • 3.4 聚合
    • 4、Flume 企业开发案例
      • 4.1 复制和多路复用
      • 4.2 负载均衡和故障转移
      • 4.3 聚合
    • 5、自定义 Interceptor
      • 5.1 概述
      • 5.2 官网实现
      • 5.3 代码实现
    • 6、自定义 Source
      • 6.1 概述
      • 6.2 需求与分析
      • 6.3 编码实现
    • 7、自定义 Sink
      • 7.1 概述
      • 7.2 需求
      • 7.3 编码实现
    • 8、Flume 数据流监控
      • 8.1 Ganglia 的安装与部署
      • 8.2 操作 Flume 测试监控
  • 四、企业真实面试题(重点)
    • 1、Flume 的 Source,Sink,Channel 的作用?你们 Source 是什么类型?
    • 2、Flume 的 Channel Selectors
    • 3、Flume 参数调优
    • 4、Flume 的事务机制
    • 5、Flume 采集数据会丢失吗?

一、Flume 入门概述

1、概述

Flume 是Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。Flume最主要的作用就是,实时读取服务器本地磁盘的数据(或者网络端口数据),将数据写入到HDFS

2、Flume 基础架构

2.1 Agent

Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。Agent 主要有 3 个部分组成,Source、Channel、Sink

2.2 Source

Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、taildir、 sequence generator、syslog、http、legacy

2.3 Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义

2.4 Channel

Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作。Flume 自带两种 Channel:Memory Channel 和 File Channel。

Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据

2.5 Event

传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。 Event 由 HeaderBody 两部分组成,Header 用来存放该 event 的一些属性,为K-V 结构, Body 用来存放该条数据,形式为字节数组

3、Flume 安装部署

3.1 安装地址

  • Flume 官网地址:http://flume.apache.org/
  • 文档查看地址:https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
  • 下载地址:http://archive.apache.org/dist/flume/

3.2 安装部署

# 首先已经搭建好hadoop和jdk了,可以参考之前的hadoop笔记
wget http://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
# 解压 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下
tar -zxf apache-flume-1.9.0-bin.tar.gz -C /opt/module/
# 修改 apache-flume-1.9.0-bin 的名称为 flume
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
# 将 lib 文件夹下的guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3
rm /opt/module/flume/lib/guava-11.0.2.jar

二、Flume 入门案例

1、监控端口数据官方案例

1.1 概述

使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。

首先通过netcat工具向本机的44444端口发送数据,Flume监控本机的44444端口,通过Flume的source端读取数据,最后Flume将获取的数据通过Sink端写出到控制台(测试命令使用nc localhost 44444)

1.2 实现步骤

# 安装 netcat 工具
sudo yum install -y nc
# nc -lk 9999
# nc local 9999 这样就可以聊天
# 判断 44444 端口是否被占用
sudo netstat -nlp | grep 44444
# 创建 Flume Agent 配置文件 flume-netcat-logger.conf
# 在 flume 目录下创建 job 文件夹并进入 job 文件夹
mkdir job 
cd job/cd ..
# 在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf
vim flume-netcat-logger.conf
# 添加内容如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 一个sink只能一个channel,一个source可以多个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 先开启 flume 监听端口
# 第一种写法
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
# 第二种写法
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
# --conf/-c:表示配置文件存储在 conf/目录
# --name/-n:表示给 agent 起名为 a1
# --conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf文件。
# -Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error

# 使用 netcat 工具向本机的 44444 端口发送内容
nc localhost 44444


# 12/06/19 15:32:19 INFO source.NetcatSource: Source starting
# 12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
# 12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }

2、实时监控单个追加文件

2.1 概述

案例需求:实时监控 Hive 日志,并上传到 HDFS 中

2.2 实现步骤

# Flume 要想将数据输出到 HDFS,依赖 Hadoop 相关 jar 包
# 检查/etc/profile.d/my_env.sh 文件,确认 Hadoop 和 Java 环境变量配置正确
JAVA_HOME=/opt/module/jdk1.8.0_212
HADOOP_HOME=/opt/module/ha/hadoop-3.1.3
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export PATH JAVA_HOME HADOOP_HOME

# 去job目录创建 flume-file-hdfs.conf 文件
vim flume-file-hdfs.conf
# 要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行Linux 命令来读取文件
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
# Describe the sink
a2.sinks.k2.type = hdfs
# 去core-site.xml查看,ha集群的话改成hdfs://mycluster
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9870/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件,单位s,一般配置3600
a2.sinks.k2.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
#注意:对于所有与时间相关的转义序列,Event Header 中必须存在以 “timestamp”的key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自动添加 timestamp)

# 运行 Flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
# 开启 Hadoop 和 Hive 并操作 Hive 产生日志,去hive目录,有数据才会生成新的文件
bin/hive

3、实时监控目录下多个新文件

3.1 概述

案例需求:使用 Flume 监听整个目录的文件,并上传至 HDFS

3.2 实现步骤

vim flume-dir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://mycluster/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

# 启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
# 说明:在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED 结尾;被监控文件夹每 500 毫秒扫描一次文件变动

# 向 upload 文件夹中添加文件,在/opt/module/flume 目录下创建 upload 文件夹
mkdir upload
# 不能上传同名的,否则会挂掉
touch atguigu.txt

4、实时监控目录下的多个追加文件

4.1 概述

Exec source 适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source
适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而 Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传

案例需求:使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS

4.2 实现步骤

vim flume-taildir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = TAILDIR
# 记录JSON格式的文件,记录每个尾文件的inode、绝对路径和最后位置
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://mycluster/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩,还有gzip等
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

# 启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
# 在/opt/module/flume 目录下创建 files 文件夹
mkdir files{1,2}
echo hello >> file1.txt

4.3 Taildir 问题说明

Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件。

但是例如log4j的日志是每过凌晨自动更名为新的文件,这会导致数据的重复上传,若后端不配合,可以修改源码,flume-ng-sources→flume-taildir-source源码包,ReliableTailEventReader读数据,TailFile的更新数据,将更新和读取仅按照inode来。修改完成后打包去lib文件夹,替换掉原来的的jar包

5、Kafka相关

kafka相关文档:https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#kafka-channel

flume一般都和kafka配合使用,用于离线和实时数仓的数据获取,kafka source相当于kafka的消费者,channel数据会存储到kafka topic中,而kafka sink相当于生产者

进入flume软件目录,编写配置文件vim job/file_to_kafka.conf

#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
# 自定义拦截器,这里是将不合格的json数据过滤
# a1.sources.r1.interceptors =  i1
# a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
# 不以flume event存储
a1.channels.c1.parseAsFlumeEvent = false

#组装
a1.sources.r1.channels = c1

测试

# 启动flume
bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console

# 启动一个Kafka的Console-Consumer
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log

同理还有kafka到hdfs的flume配置文件

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
# a1.sources.r1.interceptors = i1
# a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

6、Kafka群起脚本

#!/bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
        done
};; 
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                # $2默认获取脚本第二个参数,加个反斜杠是获取数据中的第二个参数
                ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac

三、Flume 进阶

1、Flume 事务

下面时commit和rollback的核心源码,回滚的时候,putList会直接清空,而takeList会将数据重新塞回到channel中(sink的hdfs写成功但通讯失败可能重复消费,source的nc可能会消息丢失);doCommit会提前判断channel够不够takeList回滚以防回滚失败

@Override
protected void doCommit() throws InterruptedException {
  int remainingChange = takeList.size() - putList.size();
  if (remainingChange < 0) {
    if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
      throw new ChannelException("Cannot commit transaction. Byte capacity " +
          "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
          "reached. Please increase heap space/byte capacity allocated to " +
          "the channel as the sinks may not be keeping up with the sources");
    }
    if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
      bytesRemaining.release(putByteCounter);
      throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
          " Sinks are likely not keeping up with sources, or the buffer size is too tight");
    }
  }
  int puts = putList.size();
  int takes = takeList.size();
  synchronized (queueLock) {
    if (puts > 0) {
      while (!putList.isEmpty()) {
        if (!queue.offer(putList.removeFirst())) {
          throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
        }
      }
    }
    putList.clear();
    takeList.clear();
  }
  bytesRemaining.release(takeByteCounter);
  takeByteCounter = 0;
  putByteCounter = 0;

  queueStored.release(puts);
  if (remainingChange > 0) {
    queueRemaining.release(remainingChange);
  }
  if (puts > 0) {
    channelCounter.addToEventPutSuccessCount(puts);
  }
  if (takes > 0) {
    channelCounter.addToEventTakeSuccessCount(takes);
  }

  channelCounter.setChannelSize(queue.size());
}

@Override
protected void doRollback() {
  int takes = takeList.size();
  synchronized (queueLock) {
    Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
        "Not enough space in memory channel " +
        "queue to rollback takes. This should never happen, please report");
    while (!takeList.isEmpty()) {
      queue.addFirst(takeList.removeLast());
    }
    putList.clear();
  }
  putByteCounter = 0;
  takeByteCounter = 0;

  queueStored.release(takes);
  channelCounter.setChannelSize(queue.size());
}

2、Flume Agent 内部原理

  • ChannelSelector

    ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是 **Replicating(复制)**和 Multiplexing(多路复用)

    ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel

  • SinkProcessor

    SinkProcessor 共 有 三 种 类 型 , 分 别 是 DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor

    DefaultSinkProcessor 对应的是单个的 Sink , LoadBalancingSinkProcessor 和 FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以错误恢复的功能

3、Flume 拓扑结构

3.1 简单串联

这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。

3.2 复制和多路复用

Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地

3.3 负载均衡和故障转移

Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能

3.4 聚合

这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析

4、Flume 企业开发案例

4.1 复制和多路复用

使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 LocalFileSystem

# https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#avro-sink
# 在/opt/module/flume/job 目录下创建 group1 文件夹并进入
# 在/opt/module/datas/目录下创建 flume3 文件夹并进入

# group1下创建 flume-file-flume.conf
# 配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flumehdfs 和 flume-flume-dir
vim flume-file-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel,默认不写就是复制
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

# 创建 flume-flume-hdfs.conf,配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink
vim flume-flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://mycluster/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

# 创建 flume-flume-dir.conf,配置上级 Flume 输出的 Source,输出是到本地目录的 Sink
vim flume-flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2


# 注意输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录

执行配置文件并检查

# 分别启动对应的 flume 进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume
# 有先后关系

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf

4.2 负载均衡和故障转移

使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用FailoverSinkProcessor,实现故障转移的功能

# https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sink-processors
# 在/opt/module/flume/job 目录下创建 group2 文件夹
# 配置 1 个 netcat source 和 1 个 channel、1 个 sink group(2 个 sink),分别输送给flume-flume-console1 和 flume-flume-console2
# 创建 flume-netcat-flume.conf
vim flume-netcat-flume.conf

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

# 创建 flume-flume-console1.conf,配置上级 Flume 输出的 Source,输出是到本地控制台
vim flume-flume-console1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

# 创建 flume-flume-console2.conf,配置上级 Flume 输出的 Source,输出是到本地控制台
vim flume-flume-console2.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2


# 分别开启对应配置文件:flume-flume-console2,flume-flume-console1,flumenetcat-flume
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

# 使用 netcat 工具向本机的 44444 端口发送内容
nc localhost 44444
# 使用 jps -ml 查看 Flume 进程

如果要换成负载均衡,只需要修改第一个文件

# 如果要改成负载均衡,就变成load_balance
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
# 开启退避
a1.sinkgroups.g1.processor.backoff = true
# 默认 round_robin随机,轮询是sink拉取轮询,不是推轮询
a1.sinkgroups.g1.processor.selector = random
# 退避最大的时间,防止一直退避下去
a1.sinkgroups.g1.processor.selector.maxTimeOut = 30000

4.3 聚合

hadoop102 上的 Flume-1 监控文件/opt/module/group.log,hadoop103 上的 Flume-2 监控某一个端口的数据流,Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台

# 分发 Flume,进入module模块分发
xsync flume
# 在 hadoop102、hadoop103 以及 hadoop104 的/opt/module/flume/job 目录下创建一个group3 文件夹
mkdir group3
# 创建 flume1-logger-flume.conf,配置 Source 用于监控 hive.log 文件,配置 Sink 输出数据到下一级 Flume
# 在 hadoop102 上编辑配置文件
vim flume1-logger-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


# 创建 flume2-netcat-flume.conf,配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume
# 在 hadoop103 上编辑配置文件
vim flume2-netcat-flume.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

# 创建 flume3-flume-logger.conf,配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台
vim flume3-flume-logger.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# Describe the sink
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1


# 分别开启对应配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume1-logger-flume.conf
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume2-netcat-flume.conf

# 在 hadoop103 上向/opt/module 目录下的 group.log 追加内容
echo 'hello' > group.log
# 在 hadoop102 上向 44444 端口发送数据
telnet hadoop102 44444

5、自定义 Interceptor

5.1 概述

使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统

在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。

在该案例中,我们以端口数据模拟日志,以是否包含"atguigu"模拟不同类型的日志,我们需要自定义 interceptor 区分数据中是否包含"atguigu",将其分别发往不同的分析系统(Channel)

5.2 官网实现

https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#multiplexing-channel-selector

# 匹配header为state的值,一般需要我们实现拦截器,实现多路复用,只有在source可以使用
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

5.3 代码实现

创建一个 maven 项目,并引入以下依赖

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

定义 com.atguigu.interceptor.TypeInterceptor.CustomInterceptor 类并实现 Interceptor 接口

public class TypeInterceptor implements Interceptor {
    //声明一个存放事件的集合
    private List<Event> addHeaderEvents;
    @Override
    public void initialize() {
        //初始化存放事件的集合
        addHeaderEvents = new ArrayList<>();
    }
    //单个事件拦截
    @Override
    public Event intercept(Event event) {
        //1.获取事件中的头信息
        Map<String, String> headers = event.getHeaders();
        //2.获取事件中的 body 信息
        String body = new String(event.getBody());
        //3.根据 body 中是否有"atguigu"来决定添加怎样的头信息
        if (body.contains("atguigu")) {
            //4.添加头信息
            headers.put("type", "first");
        } else {
            //4.添加头信息
            headers.put("type", "second");
        }
        return event;
    }
    //批量事件拦截
    @Override
    public List<Event> intercept(List<Event> events) {
        //1.清空集合
        addHeaderEvents.clear();
        //2.遍历 events
        for (Event event : events) {
            //3.给每一个事件添加头信息
            addHeaderEvents.add(intercept(event));
        }
        //4.返回结果
        return addHeaderEvents;
    }
    @Override
    public void close() {
    }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }
}

打包放入flume/lib目录下,启动时会自动通过反射扫描包

然后新建job/group4,编辑 flume 配置文件,为 hadoop102 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor

# https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-interceptors
# Name the components on this agent.conf
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1
a1.sources.r1.selector.mapping.second = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

为 hadoop103 上的 Flume4 配置一个 avro source 和一个 logger sink

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

为 hadoop104 上的 Flume3 配置一个 avro source 和一个 logger sink

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

启动,先启动103,104,最后启动102

# hadoop103
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2.conf -Dflume.root.logger=INFO,console
# hadoop104
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume3.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1.conf

6、自定义 Source

6.1 概述

官网给出的source:https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources

Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequencegenerator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。官方也提供了自定义 source 的接口,根据官方说明自定义MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。实现相应方法:

  • getBackOffSleepIncrement() :backoff 步长
  • getMaxBackOffSleepInterval():backoff 最长时间
  • configure(Context context):初始化 context(读取配置文件内容)
  • process():获取数据封装成 event 并写入 channel,这个方法将被循环调用

使用场景:读取 MySQL 数据或者其他文件系统

6.2 需求与分析

使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置

6.3 编码实现

导入依赖

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

创建com.atguigu.source.MySource

public class MySource extends AbstractSource implements
        Configurable, PollableSource {
    //定义配置文件将来要读取的字段
    private Long delay;
    private String field;
    //初始化配置信息
    @Override
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "Hello!");
    }
    @Override
    public Status process() throws EventDeliveryException {
        try {
            //创建事件头信息
            HashMap<String, String> hearderMap = new HashMap<>();
            //创建事件
            SimpleEvent event = new SimpleEvent();
            //循环封装事件
            for (int i = 0; i < 5; i++) {
                //给事件设置头信息
                event.setHeaders(hearderMap);
                //给事件设置内容
                event.setBody((field + i).getBytes());
                //将事件写入 channel,这里面包括了拦截器和channel选择器(包括事务),可以结合之前的流程读一下
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }
    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}

将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下,然后创建配置文件,启动可以查看效果

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.atguigu.source.MySource
a1.sources.r1.delay = 1000
# a1.sources.r1.field = atguigu
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

7、自定义 Sink

7.1 概述

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。Sink 是完全事务性的。在从Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。官方也提供了自定义 sink 的接口:MySink 需要继承 AbstractSink 类并实现 Configurable 接口。实现相应方法:

  • configure(Context context):初始化 context(读取配置文件内容)
  • process():从 Channel 读取获取数据(event),这个方法将被循环调用。使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。

7.2 需求

使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置

7.3 编码实现

public class MySink extends AbstractSink implements Configurable {
    //创建 Logger 对象
    private static final Logger LOG =
            LoggerFactory.getLogger(AbstractSink.class);
    private String prefix;
    private String suffix;
    @Override
    public Status process() throws EventDeliveryException {
        //声明返回值状态信息
        Status status;
        //获取当前 Sink 绑定的 Channel
        Channel ch = getChannel();
        //获取事务
        Transaction txn = ch.getTransaction();
        //声明事件
        Event event;
        //开启事务
        txn.begin();
        //读取 Channel 中的事件,直到读取到事件结束循环
        while (true) {
            event = ch.take();
            if (event != null) {
                break;
            }
        }
        try {
            //处理事件(打印)
            LOG.info(prefix + new String(event.getBody()) +
                    suffix);
            //事务提交
            txn.commit();
            status = Status.READY;
        } catch (Exception e) {
            //遇到异常,事务回滚
            txn.rollback();
            status = Status.BACKOFF;
        } finally {
            //关闭事务
            txn.close();
        }
        return status;
    }
    @Override
    public void configure(Context context) {
        //读取配置文件内容,有默认值
        prefix = context.getString("prefix", "hello:");
        //读取配置文件内容,无默认值
        suffix = context.getString("suffix");
    }
}

将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下,然后编写配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.atguigu.MySink
#a1.sinks.k1.prefix = atguigu:
a1.sinks.k1.suffix = :atguigu
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

开启任务

bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
# 开启
nc localhost 44444

8、Flume 数据流监控

8.1 Ganglia 的安装与部署

Ganglia 由 gmond、gmetad 和 gweb 三部分组成。

  • gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用 gmond,你可以很容易收集很多系统指标数据,如 CPU、内存、磁盘、网络和活跃进程的数据等。
  • gmetad(Ganglia Meta Daemon)整合所有信息,并将其以 RRD 格式存储至磁盘的服务。
  • gweb(Ganglia Web)Ganglia 可视化工具,gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据
# 在 102 103 104 分别安装 epel-release
sudo yum -y install epel-release
# 在 102 安装
sudo yum -y install ganglia-gmetad
sudo yum -y install ganglia-web
sudo yum -y install ganglia-gmond
# 在 103 和 104 安装
sudo yum -y install ganglia-gmond

# 在 102 修改配置文件/etc/httpd/conf.d/ganglia.conf
sudo vim /etc/ganglia/gmetad.conf
# 修改为:data_source "my cluster" hadoop102

# 在 102 103 104 修改配置文件/etc/ganglia/gmond.conf
sudo vim /etc/ganglia/gmond.conf
# 修改三个地方,name = "my cluster";host = hadoop102;bind = 0.0.0.0
cluster {
 name = "my cluster"
 owner = "unspecified"
 latlong = "unspecified"
 url = "unspecified"
}
 #bind_hostname = yes # Highly recommended, soon to be default.
 # This option tells gmond to use a source 
address
 # that resolves to the machine's hostname. 
Without
 # this, the metrics may appear to come from 
any
 # interface and the DNS names associated with
 # those IPs will be used to create the RRDs.
 # mcast_join = 239.2.11.71
 # 数据发送给 hadoop102
 host = hadoop102
 port = 8649
 ttl = 1
}
udp_recv_channel {
 # mcast_join = 239.2.11.71
 port = 8649
# 接收来自任意连接的数据
 bind = 0.0.0.0
 retry_bind = true
 # Size of the UDP buffer. If you are handling lots of metrics 
you really
 # should bump it up to e.g. 10MB or even higher.
 # buffer = 10485760
}


# 在 102 修改配置文件/etc/selinux/config
sudo vim /etc/selinux/config
# 设置SELINUX=disabled
# 上面生效要重启,可以暂时生效
sudo setenforce 0

#  设置权限
sudo chmod -R 777 /var/lib/ganglia
# 去hadoop102设置所有ip访问
sudo vim /etc/httpd/conf.d/ganglia.conf
# 添加Require all granted

# 打开网页启动(感觉界面很low)
http://hadoop102/ganglia

8.2 操作 Flume 测试监控

bin/flume-ng agent \
-c conf/ \
-n a1 \
-f job/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=hadoop102:8649


# 发送
nc localhost 44444

四、企业真实面试题(重点)

1、Flume 的 Source,Sink,Channel 的作用?你们 Source 是什么类型?

  • Source 组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy
  • Channel 组件对采集到的数据进行缓存,可以存放在 Memory 或 File 中
  • Sink 组件是用于把数据发送到目的地的组件,目的地包括 Hdfs、Logger、avro、thrift、ipc、file、Hbase、solr、自定义

2、Flume 的 Channel Selectors

3、Flume 参数调优

  • Source

    增加 Source 个(使用 Tair Dir Source 时可增加 FileGroups 个数)可以增大 Source的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。batchSize 参数决定 Source 一次批量运输到 Channel 的 event 条数,适当调大这个参数可以提高 Source 搬运Event 到 Channel 时的性能。

  • Channel

    type 选择 memory 时 Channel 的性能最好,但是如果 Flume 进程意外挂掉可能会丢失数据。type 选择 file 时Channel 的容错性更好,但是性能上会比 memory channel 差。使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。Capacity 参数决定 Channel 可容纳最大的 event 条数。transactionCapacity 参数决定每次 Source 往 channel 里面写的最大 event 条数和每次 Sink 从 channel 里面读的最大 event 条数。transactionCapacity 需要大于 Source Sink batchSize 参数

  • Sink

    增加 Sink 的个数可以增加 Sink 消费 event 的能力。Sink 也不是越多越好够用就行,过多的 Sink 会占用系统资源,造成系统资源不必要的浪费。batchSize 参数决定 Sink 一次批量从 Channel 读取的 event 条数,适当调大这个参数可以提高 Sink 从 Channel 搬出 event 的性能

4、Flume 的事务机制

Flume 的事务机制(类似数据库的事务机制):Flume 使用两个独立的事务分别负责从Soucrce 到 Channel,以及从 Channel 到 Sink 的事件传递。

比如 spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到 Channel 且提交成功,那么Soucrce 就将该文件标记为完成。同理,事务以类似的方式处理从 Channel 到 Sink 的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到 Channel 中,等待重新传递。

5、Flume 采集数据会丢失吗?

根据 Flume 的架构原理,Flume 是不可能丢失数据的,其内部有完善的事务机制, Source 到 Channel 是事务性的,Channel 到 Sink 是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是 Channel 采用 memoryChannel,agent 宕机导致数据丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失。

Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出,但是没有接收到响应,Sink 会再次发送数据,此时可能会导致数据的重复

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/348495.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ElasticSearch搜索引擎入门到精通

ES 是基于 Lucene 的全文检索引擎,它会对数据进行分词后保存索引,擅长管理大量的数据,相对于 MySQL 来说不擅长经常更新数据及关联查询。这篇文章就是为了进一步了解一下它,到底是如何做到这么高效的查询的。 在学习其他数据库的时候我们知道索引是一个数据库系统极其重要…

OpenAI正式推出GPT商店 ChatGPT团队订阅服务一并推出

2024年1月11日消息&#xff0c;据外媒报道&#xff0c;如上周在给开发者的邮件中所宣布的一样&#xff0c;因ChatGPT而名声大噪的人工智能公司OpenAI&#xff0c;在本周正式推出了GPT商店&#xff0c;供用户分享和发现个性化的ChatGPT&#xff0c;同时他们也推出了面向各种不同…

【Java 数据结构】ArrayList与顺序表

ArrayList 1.线性表2.顺序表2.1 接口的实现 3. ArrayList简介4. ArrayList使用4.1 ArrayList的构造4.2 ArrayList常见操作4.3 ArrayList的遍历4.4 ArrayList的扩容机制 1.线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一…

Java / Spring Boot + POI 给 Word 添加水印

1、前言(瞎扯) 事情是这样子的&#xff1a;我好哥们所在的公司实在是太忙了&#xff0c;多线程都开起来了还是忙不完&#xff0c;只能开了子线程叫我帮他整一个给 Word 加水印的&#xff0c;于是我就网上找呗~ 看到那个 Aspose 好像是收费的&#xff0c;然后就把目光转向了 POI…

基于物联网设计的水稻田智能灌溉系统(STM32+华为云IOT)

一、项目介绍 随着科技的不断发展和人们生活水平的提高&#xff0c;农业生产也逐渐向智能化、高效化的方向发展。水稻作为我国主要的粮食作物之一&#xff0c;其生长过程中的灌溉管理尤为重要。传统的灌溉方式往往依赖于人工观察和控制&#xff0c;不仅效率低下&#xff0c;而…

城市开发区视频系统建设方案:打造视频基座、加强图像数据治理

一、背景需求 随着城市建设的步伐日益加快&#xff0c;开发区已经成为了我国工业化、城镇化和对外开放的重要载体。自贸区、开发区和产业园的管理工作自然也变得至关重要。在城市经开区的展览展示馆、进出口商品展示交易中心等地&#xff0c;数千路监控摄像头遍布各角落&#…

Spring-Kafka 3.0 消费者消费失败处理方案

一、背景 我们作为Kafka在使用Kafka是&#xff0c;必然考虑消息消费失败的重试次数&#xff0c;重试后仍然失败如何处理&#xff0c;要么阻塞&#xff0c;要么丢弃&#xff0c;或者保存 二、设置消费失败重试次数 1 默认重试次数在哪里看 Kafka3.0 版本默认失败重试次数为1…

27.移除元素(力扣LeetCode)

文章目录 27.移除元素&#xff08;力扣LeetCode&#xff09;题目描述方法一&#xff1a;vector成员函数&#xff1a;erase方法二&#xff1a;暴力解法方法三&#xff1a;双指针法 27.移除元素&#xff08;力扣LeetCode&#xff09; 题目描述 给你一个数组 nums 和一个值 val&…

【开源】基于JAVA语言的班级考勤管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 系统基础支持模块2.2 班级学生教师支持模块2.3 考勤签到管理2.4 学生请假管理 三、系统设计3.1 功能设计3.1.1 系统基础支持模块3.1.2 班级学生教师档案模块3.1.3 考勤签到管理模块3.1.4 学生请假管理模块 3.2 数据库设…

python Django入门

1.创建Django项目 方式一:进入到指定要存放项目的目录&#xff0c;执行*django-admin startproject “projectname”* 来创建一个名方式二:使用Pycharm专业版创建Django项目 创建项目后&#xff0c;默认的目录结构: manage.py:是Django用于管理本项目的命令行工具&#xff0c…

前出深入-机器学习

文章目录 一、K近邻算法1.1 先画一个散列图1.2 使用K最近算法建模拟合数据1.3 进行预测1.4 K最近邻算法处理多元分类问题1.5 K最近邻算法用于回归分析1.6 K最近邻算法项目实战-酒的分类1.6.1 对数据进行分析1.6.2 生成训练数据集和测试数据集1.6.3 使用K最近邻算法对数据进行建…

如何在Shopee平台上进行选品 :充分利用渠道获取灵感和数据支持

在Shopee平台上进行选品是一个关键的决策过程&#xff0c;它直接影响到卖家的销售业绩和店铺的发展。为了帮助卖家更好地进行选品&#xff0c;Shopee提供了多种渠道来获取灵感和数据支持。下面将介绍一些主要的选品渠道以及如何利用它们来进行选品。 先给大家推荐一款shopee知…

【DDD】学习笔记-深入分析软件的复杂度

软件复杂度的成因 Eric Evans 的经典著作《领域驱动设计》的副标题为“软件核心复杂性应对之道”&#xff0c;这说明了 Eric 对领域驱动设计的定位就是应对软件开发的复杂度。Eric 甚至认为&#xff1a;“领域驱动设计只有应用在大型项目上才能产生最大的收益”。他通过 Smart…

C#,数据检索算法之线性检索(Linear Search)的源代码

数据检索算法是指从数据集合&#xff08;数组、表、哈希表等&#xff09;中检索指定的数据项。 数据检索算法是所有算法的基础算法之一。 线性&#xff1f;听起来就“高大上”&#xff0c;其实&#xff0c;只不过就是挨个比较呗。 本文发布&#xff08;听起来很正式 &#x…

TarGAN:多模态医学图像转换GAN

TarGAN 核心思想网络结构 核心思想 论文&#xff1a;https://arxiv.org/abs/2105.08993 代码&#xff1a;https://github.com/2165998/TarGAN 解决的问题&#xff1a;传统多模态医学图像转换通常&#xff0c;在生成高质量图像方面存在问题&#xff0c;特别是在关键目标区域或…

C语言中计算结构体的大小

一. 使用sizeof 计算结构体的大小 通常情况下&#xff0c;我们习惯于使用 sizeof 运算符来计算结构体的大小。 例如&#xff0c;下面是一个结构体的定义&#xff1a; struct Student {int id;char name[20];int age;float score; }; 其中&#xff0c;Student是该结构体的类…

IP被封怎么办?如何绕过IP禁令?

相信很多人遇到过IP禁令&#xff1a;比如你在访问社交媒体、搜索引擎或电子商务网站时会被限制访问&#xff0c;又或者你的的账号莫名被封&#xff0c;这些由于网络上的种种限制我们经常会遭遇IP被封的情况&#xff0c;导致无法使用继续进行网络行动。在本文中&#xff0c;我们…

Django笔记(七):JWT认证

首 前后端分离的项目更多使用JWT认证——Json Web Token。本文记录djangorestframework-simplejwt的使用方式。文档 安装 pip install djangorestframework-simplejwt 配置settings.py: INSTALLED_APPS [rest_framework_simplejwt, ]REST_FRAMEWORK {DEFAULT_AUTHENTICA…

ARM 400系列控制器IP简介

1. GIC-400 GIC-400是一个高性能、区域优化的中断控制器&#xff0c;具有高级微控制器总线架构&#xff08;AMBA&#xff09;高级可扩展接口&#xff08;AXI&#xff09;接口。它在片上系统&#xff08;SoC&#xff09;配置中检测、管理和分配中断。你可以对GIC-400进行配置&am…

shell脚本基础之循环语句

目录 一、循环语句的概念 二、for循环语句 1、列表循环 2、列表for循环案例大全 案例一 案例二 案例三 案例四 案例五 案例六 案例七 案例八 3、不带列表循环 4、类似C语言风格的for循环 5、for循环总结 三、while循环语句 1、while循环语句格式 2、while死循…