大数据技术学习笔记(十一)—— Flume

目录

  • 1 Flume 概述
    • 1.1 Flume 定义
    • 1.2 Flume 基础架构
  • 2 Flume 安装
  • 3 Flume 入门案例
    • 3.1 监控端口数据
    • 3.2 实时监控单个追加文件
    • 3.3 实时监控目录下多个新文件
    • 3.4 实时监控目录下的多个追加文件
  • 4 Flume 进阶
    • 4.1 Flume 事务
    • 4.2 Flume Agent 内部原理
    • 4.3 Flume 拓扑结构
      • 4.3.1 简单串联
      • 4.3.2 复制和多路复用
      • 4.3.3 负载均衡和故障转移
      • 4.3.4 聚合
    • 4.4 企业开发案例
      • 4.4.1 复制
      • 4.4.2 负载均衡
      • 4.4.3 故障转移
      • 4.4.4 聚合
    • 4.5 自定义 Interceptor
    • 4.6 自定义 Source
    • 4.7 自定义 Sink
    • 4.8 Flume 数据流监控
      • 4.8.1 Ganglia 的安装与部署
      • 4.8.2 操作 Flume 测试监控

1 Flume 概述

1.1 Flume 定义


Flume 是 Cloudera 公司提供的一个 高可用 的, 高可靠 的,分布式海量日志采集聚合传输 的系统。Flume 基于流式架构,灵活简单。

这里的日志不是指框架工作运行的日志,而是跟业务相关的日志数据,如用户行为数据等

在这里插入图片描述

Flume 最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到 HDFS。

1.2 Flume 基础架构


Flume 组成架构如下图所示。

在这里插入图片描述
(1)Agent

Agent 是一个JVM 进程,它以 事件 的形式将数据从源头送至目的地。
Agent 主要有 3 个部分组成,SourceChannelSink

(2)Source

Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、 taildir 、sequence generator、syslog、http、legacy。

Flume中有两种source

  • Pullable Source,TailDirSource就是这种Source,这种Source是主动拉取数据,而不是由数据源推送过来的,这种Source在回滚等待的过程中source不会继续拉取数据。
  • Eventdriven Source,这种Source中的数据是由数据源主动不停的提交数据,在事务回滚的时候,会停止接收数据,这时有可能会产生数据丢失,这种丢失并不是发生在Flume内部,而是发生在Flume和数据源之间。

(3)Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

Flume 与 Flume 之间使用 avro

(4)Channel

Channel 是位于 SourceSink 之间的 缓冲区。因此,Channel 允许 SourceSink 运作在不同的速率上。Channel线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作。

Flume 自带两种 Channel:Memory ChannelFile Channel

  • Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
  • File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

(5)Event

Event 是传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。EventHeaderBody 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为 字节数组

在这里插入图片描述

2 Flume 安装


请移步 Flume 安装与部署

3 Flume 入门案例


Flume 官方文档

3.1 监控端口数据


案例需求

使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。

需求分析

确定每一个组件的类型

在这里插入图片描述

实现步骤

(1)在 flume 目录下创建 jobs 文件夹并进入 jobs 文件夹

[huwei@hadoop101 ~]$ cd /opt/module/flume-1.9.0/
[huwei@hadoop101 flume-1.9.0]$ mkdir jobs
[huwei@hadoop101 flume-1.9.0]$ cd jobs

(2)新建并编辑 flume-netcat-logger.conf 文件

[huwei@hadoop101 jobs]$ vim flume-netcat-logger.conf

添加如下内容

# Named
# a1表示agent的名称
a1.sources = r1 # r1表示a1的Source的名称
a1.channels = c1 # c1表示a1的channel的名称
a1.sinks = k1  # k1表示a1的Sink的名称

# Source
a1.sources.r1.type = netcat # 表示a1的输入源类型为netcat端口类型
a1.sources.r1.bind = localhost # 表示a1的监听主机
a1.sources.r1.port = 6666 # 表示a1的监听端口号

# Channel
a1.channels.c1.type = memory # 表示a1的channel类型是memory内存型
a1.channels.c1.capacity = 10000 # 表示a1的channel总容量是10000个event
a1.channels.c1.transactionCapacity = 100 # 表示a1的channel传输时收集到了100条event以后再去提交事务

# Sink
a1.sinks.k1.type = logger # 表示a1的输出目的地是控制台logger类型

# Bind
a1.sources.r1.channels = c1 # 表示将r1和c1连接起来
a1.sinks.k1.channel = c1 # 表示将k1和c1连接起来

(3)安装 netcat 工具(实现客户端发送数据到端口)

[huwei@hadoop101 jobs]$ sudo yum install -y nc

(4)开启 flume 监听端口

[huwei@hadoop101 jobs]$ flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/jobs/flume-netcat-logger.conf --name a1

也可以简写为

[huwei@hadoop101 jobs]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume-netcat-logger.conf -n a1

Logger Sink 本质上就是 log4j 的实现,默认是往文件里存日志

(5)使用 netcat 工具向本机的 6666 端口发送内容

重新开启一个 shell 窗口

[huwei@hadoop101 ~]$ nc localhost 6666

在这里插入图片描述
(6)查看日志

jobs 文件夹下会多一个 logs 文件夹

[huwei@hadoop101 jobs]$ cd /opt/module/flume-1.9.0/jobs/logs/
[huwei@hadoop101 logs]$ cat flume.log

在这里插入图片描述

(7)设置将日志打印到控制台

[huwei@hadoop101 flume-1.9.0]$ cd conf
[huwei@hadoop101 conf]$ vim log4j.properties

在这里插入图片描述
但是上述通过配置文件将日志打印控制台的方式并不推荐,因为我们并不是每次都需要打印控制台,推荐指定参数动态修改

修改(4)中的命令,添加参数 -Dflume.root.logger=INFO,console

[huwei@hadoop101 jobs]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume-netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console

此时重新启动(5)中的窗口再次发送数据,就可以在监听窗口的控制台看到发送的数据了

3.2 实时监控单个追加文件


案例需求

实时 监控单个追加文件,并将监控到的内容上传到 HDFS 中

需求分析

确定每一个组件的类型

在这里插入图片描述

实现步骤

(1)在 flume 目录下 jobs 文件夹下,新建一个要监控文件

[huwei@hadoop101 jobs]$ touch tail.txt

(2)新建并编辑 flume-exec-hdfs.conf 文件

[huwei@hadoop101 jobs]$ vim flume-exec-hdfs.conf

添加如下内容

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 

#Source
a1.sources.r1.type = exec 
a1.sources.r1.command = tail -f /opt/module/flume-1.9.0/jobs/tail.txt

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop101:9820/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

#Bind
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

这里 Sink 的端口号是 9820,跟着教程走写的 8020报错了,参考 flume 中sink用hdfs sink报拒绝连接错误hdfs-io

(3)开启 flume 监听端口

[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume-exec-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

建议加上-Dflume.root.logger=INFO,console,方便直观地观察

(4)向监控文件追加数据

[huwei@hadoop101 ~]$ cd /opt/module/flume-1.9.0/jobs/
[huwei@hadoop101 jobs]$ echo a >> tail.txt
[huwei@hadoop101 jobs]$ echo b >> tail.txt

观察 hdfs 文件系统 http://hadoop101:9870

在这里插入图片描述

3.3 实时监控目录下多个新文件


案例需求

实时监控目录下多个新文件,并上传到 HDFS 。

需求分析

确定每一个组件的类型

在这里插入图片描述

实现步骤

(1)在 flume 目录下 jobs 文件夹下,新建一个要监控目录

[huwei@hadoop101 jobs]$ mkdir spooling

(2)新建并编辑 flume-spooling-hdfs.conf 文件

[huwei@hadoop101 jobs]$ vim flume-spooling-hdfs.conf

添加如下内容

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 

#Source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/module/flume-1.9.0/jobs/spooling
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.ignorePattern = .*\.tmp # 忽略后缀名是.tmp的文件


#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop101:9820/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

#Bind
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

(3)开启 flume 监听端口

[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume-spooling-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

(4)新建文件向监控目录中逐个增加文件

[huwei@hadoop101 jobs]$ touch file1.txt
[huwei@hadoop101 jobs]$ touch file2.txt
[huwei@hadoop101 jobs]$ echo file1 > file1.txt
[huwei@hadoop101 jobs]$ echo file2 > file2.txt
[huwei@hadoop101 jobs]$ mv file1.txt ./spooling/
[huwei@hadoop101 jobs]$ mv file2.txt ./spooling/

观察 hdfs 文件系统 http://hadoop101:9870

(5)再去看 spooling 文件夹下的文件

文件已被添加后缀 .COMPLETED,用来区分是不是新文件,当添加的文件后缀本身就是.COMPLETED,flume 就不会认为它是新文件就不会采集

[huwei@hadoop101 jobs]$ cd spooling/
[huwei@hadoop101 spooling]$ ll
总用量 8
-rw-rw-r--. 1 huwei huwei 6 12月 17 15:20 file1.txt.COMPLETED
-rw-rw-r--. 1 huwei huwei 6 12月 17 15:20 file2.txt.COMPLETED

3.4 实时监控目录下的多个追加文件


案例需求

实时监控目录下多个追加文件,将内容上传到 HDFS 中。

需求分析

确定每一个组件的类型

在这里插入图片描述

实现步骤

(1)在 flume 目录下 jobs 文件夹下,新建一个要监控目录,并在其中创建一些文件

[huwei@hadoop101 jobs]$ mkdir taildir
[huwei@hadoop101 jobs]$ cd taildir
[huwei@hadoop101 taildir]$ touch file1.txt
[huwei@hadoop101 taildir]$ touch file2.txt
[huwei@hadoop101 taildir]$ touch log1.log
[huwei@hadoop101 taildir]$ touch log2.log

(2)新建一个存放每一个文件所采集到数据的位置的文件夹

[huwei@hadoop101 jobs]$ mkdir position

(3)新建并编辑 flume-taildir-hdfs.conf 文件

[huwei@hadoop101 jobs]$ vim flume-taildir-hdfs.conf

添加如下内容

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2 # 将文件分组
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt # f1组负责监控.txt文件
a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log # f2组负责监控.log文件
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json # 断点续传

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop101:9820/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

#Bind
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

(4)开启 flume 监听端口

[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume-taildir-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

(5)开始逐个往监控目录中的文件中追加数据

[huwei@hadoop101 taildir]$ echo file1 > file1.txt
[huwei@hadoop101 taildir]$ echo file2 > file2.txt
[huwei@hadoop101 taildir]$ echo log1 > log1.log
[huwei@hadoop101 taildir]$ echo log2 > log2.log

观察 hdfs 文件系统 http://hadoop101:9870

(6)查看 position.json 文件

[huwei@hadoop101 position]$ cat position.json
{"inode":535145,"pos":6,"file":"/opt/module/flume-1.9.0/jobs/taildir/file1.txt"},
{"inode":535146,"pos":6,"file":"/opt/module/flume-1.9.0/jobs/taildir/file2.txt"},
{"inode":535147,"pos":5,"file":"/opt/module/flume-1.9.0/jobs/taildir/log1.log"},
{"inode":535148,"pos":5,"file":"/opt/module/flume-1.9.0/jobs/taildir/log2.log"}

Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File 中更新每个文件读取到的最新的位置,因此能够实现断点续传。

注意:Linux中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。

4 Flume 进阶

4.1 Flume 事务


在这里插入图片描述

Put 事务流程

  • doPut:将批数据先写入临时缓冲区 putList
  • doCommit:检查channel 内存队列是否足够
  • doRollback:channel 内存队列空间不足,回滚数据(把数据直接丢掉,给Source端抛出异常,Source端会重新采集这一批数据)

Take 事务

  • doTake:将数据提取到临时缓冲区 takeList,并将数据发送到 HDFS
  • doCommit:如果数据全部发送成功,则清除临时缓冲区 takeList
  • doRollback:数据发送过程中如果出现异常,回滚数据,将临时缓冲区 takeList 中的数据归还给 channel 内存队列(如果 takeList处理到一半出现异常,则可能会导致数据重复)

4.2 Flume Agent 内部原理


在这里插入图片描述

(1)Channel Selector

ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是 Replicating(复制)和 Multiplexing(多路复用)。

  • ReplicatingChannelSelector (默认)会将同一个 Event 发往所有的 Channel
  • MultiplexingChannelSelector 会根据相应的原则,将不同的 Event 发往不同的 Channel。

(2)SinkProcessor

SinkProcessor 共有三种类型,分别是DefaultSinkProcessorLoadBalancingSinkProcessorFailoverSinkProcessor

  • DefaultSinkProcessor (默认)对应的是单个的Sink
  • LoadBalancingSinkProcessorFailoverSinkProcessor对应的是Sink Group
    • LoadBalancingSinkProcessor可以实现负载均衡的功能
    • FailoverSinkProcessor可以错误恢复的功能(对应单个的Sink,当该Sink出错,选择其他的Sink代替其工作)

4.3 Flume 拓扑结构

4.3.1 简单串联


在这里插入图片描述

这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。

此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。

上游的是客户端,下游的是服务端,所有启动的时候,先启动下游的服务端

简单串联的结构我们一般不用

4.3.2 复制和多路复用


单source,多channel、sink

在这里插入图片描述

Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地。

4.3.3 负载均衡和故障转移


Flume负载均衡或故障转移

在这里插入图片描述

Flume 支持使用将多个 sink 逻辑上分到一个 sink 组,sink 组配合不同的 SinkProcessor 可以实现负载均衡和错误恢复的功能。

4.3.4 聚合


在这里插入图片描述

这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志的 flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。

4.4 企业开发案例

4.4.1 复制


案例需求

使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。

需求分析

在这里插入图片描述

实现步骤

(1)在 flume 目录的 jobs 文件夹下 fileroll 文件夹作为 flume3 写入本地文件系统重的路径

[huwei@hadoop101 ~]$ cd /opt/module/flume-1.9.0/jobs
[huwei@hadoop101 jobs]$ mkdir fileroll

(2)在 flume 目录的 jobs 文件夹下创建 replication 文件夹

[huwei@hadoop101 jobs]$ mkdir replication
[huwei@hadoop101 jobs]$ cd replication

(3)在replication 文件夹中新建并编辑配置文件

flume1.conf

[huwei@hadoop101 replication]$ vim flume1.conf

添加如下内容

#Named
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

#channel selector
a1.sources.r1.selector.type = replicating

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888


#Bind
a1.sources.r1.channels = c1 c2  
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c2 

flume2.conf

[huwei@hadoop101 replication]$ vim flume2.conf

添加如下内容

a2.sources = r1
a2.channels = c1
a2.sinks = k1 

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop101:9820/flume/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0

#Bind
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1 

flume3.conf

[huwei@hadoop101 replication]$ vim flume3.conf

添加如下内容

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1 

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/jobs/fileroll

#Bind
a3.sources.r1.channels = c1 
a3.sinks.k1.channel = c1 

(4)启动 flume

开启三个 shell 窗口,分别启动 flume3、flume2、flume1

注意先启动下游后启动上游

[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replication/flume3.conf -n a3 -Dflume.root.logger=INFO,console
[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replication/flume2.conf -n a2 -Dflume.root.logger=INFO,console
[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replication/flume1.conf -n a1 -Dflume.root.logger=INFO,console

(5)向 Flume-1 的监控文件中追加内容

[huwei@hadoop101 ~]$ cd /opt/module/flume-1.9.0/jobs/taildir/
[huwei@hadoop101 taildir]$ echo abcdef >> file1.txt
[huwei@hadoop101 taildir]$ echo 123456 >> file2.txt

(6)查看 flume2、flume3 的输出

观察 hdfs 文件系统 http://hadoop101:9870

在这里插入图片描述

观察本地文件系统

在这里插入图片描述

每隔30秒就会生成新的文件(不管有没有新的数据)

4.4.2 负载均衡


案例需求

Flume1 监控端口数据,将监控到的内容通过轮询或者随机的方式给到Flume2、Flume3,Flume2、Flume3 将内容打印到控制台

需求分析

在这里插入图片描述

实现步骤

(1)在 flume 目录的 jobs 文件夹下创建 loadbalance 文件夹

[huwei@hadoop101 jobs]$ mkdir loadbalance

(2)在 loadbalance 文件夹中新建并编辑配置文件

flume1.conf

[huwei@hadoop101 loadbalance]$ vim flume1.conf

添加如下内容

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

#channel selector
a1.sources.r1.selector.type = replicating

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

#Sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random 


#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1

flume2.conf

[huwei@hadoop101 loadbalance]$ vim flume2.conf

添加如下内容

a2.sources = r1
a2.channels = c1
a2.sinks = k1 

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1 

flume3.conf

[huwei@hadoop101 loadbalance]$ vim flume3.conf

添加如下内容

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1 

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1 
a3.sinks.k1.channel = c1 

(3)启动 flume

开启三个 shell 窗口,分别启动 flume3、flume2、flume1

注意先启动下游后启动上游

[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalance/flume3.conf -n a3 -Dflume.root.logger=INFO,console
[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalance/flume2.conf -n a2 -Dflume.root.logger=INFO,console
[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalance/flume1.conf -n a1 -Dflume.root.logger=INFO,console

(4)使用 netcat 工具向本机的 6666 端口发送内容

[huwei@hadoop101 loadbalance]$ nc localhost 6666

在这里插入图片描述

观察 flume2、flume3

4.4.3 故障转移


案例需求

Flume1 监控端口数据,将监控到的内容发送给 active 的 sink,Flume2、Flume3 将内容打印到控制台

需求分析

在这里插入图片描述

实现步骤

(1)在 flume 目录的 jobs 文件夹下创建 failover 文件夹

[huwei@hadoop101 jobs]$ mkdir failover

(2)在 failover 文件夹中新建并编辑配置文件

flume1.conf

[huwei@hadoop101 failover]$ vim flume1.conf

添加如下内容

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

#channel selector
a1.sources.r1.selector.type = replicating

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

#Sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10


#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1

sink 代表优先级的数字越大,其优先级就越高, 优先级高的就是那个 active 的 sink。这里设置 flume3 的优先级更高,即 flume3 的输入为 active 的 sink 输出

flume2.conf

[huwei@hadoop101 failover]$ vim flume2.conf

添加如下内容

a2.sources = r1
a2.channels = c1
a2.sinks = k1 

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1 

flume3.conf

[huwei@hadoop101 failover]$ vim flume3.conf

添加如下内容

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1 

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1 
a3.sinks.k1.channel = c1 

(3)启动 flume

开启三个 shell 窗口,分别启动 flume3、flume2、flume1

注意先启动下游后启动上游

[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console
[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume2.conf -n a2 -Dflume.root.logger=INFO,console
[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume1.conf -n a1 -Dflume.root.logger=INFO,console

(4)使用 netcat 工具向本机的 6666 端口发送内容

[huwei@hadoop101 loadbalance]$ nc localhost 6666

可以发现只有 flume3 可以接收到发送的数据,当 flume3 故障后,再发送数据时,此时只有 flume2 可以接收到数据,再次启动 flume3 ,数据又要给到 flume3 了,因为其优先级更高。

4.4.4 聚合


案例需求

Flume1 (hadoop101)监控文件内容,Flume2 (hadoop102)监控端口数据,Flume1 和 Flume 2 将监控到的数据发往 Flume3(hadoop103), Flume3 将内容打印到控制台。

需求分析

在这里插入图片描述

实现步骤

(1)在 flume 目录的 jobs 文件夹下创建 aggre 文件夹

[huwei@hadoop101 jobs]$ mkdir aggre

(2)在 failover 文件夹中新建并编辑配置文件

flume1.conf

[huwei@hadoop101 aggre]$ vim flume1.conf

添加如下内容

#Named
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 8888

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 

flume2.conf

[huwei@hadoop101 aggre]$ vim flume2.conf

添加如下内容

a2.sources = r1
a2.channels = c1
a2.sinks = k1 

#Source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop103
a2.sinks.k1.port = 8888

#Bind
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1 

flume3.conf

[huwei@hadoop101 aggre]$ vim flume3.conf

添加如下内容

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1 

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop103
a3.sources.r1.port = 8888

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1 
a3.sinks.k1.channel = c1 

(3)向其他机器分发 flume

[huwei@hadoop101 ~]$ xsync /opt/module/flume-1.9.0

这里使用的是大数据技术学习笔记(三)—— Hadoop 的运行模式中编写集群分发脚本 xsync

同时发送环境变量配置

[huwei@hadoop101 ~]$ sudo xsync /etc/profile.d/my_env.sh

使得环境变量生效

[huwei@hadoop102 ~]$ source /etc/profile
[huwei@hadoop103 ~]$ source /etc/profile

(3)启动 flume

在 hadoop103、hadoop102、hadoop101,分别启动 flume3、flume2、flume1

注意先启动下游后启动上游

[huwei@hadoop103 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume3.conf -n a3 -Dflume.root.logger=INFO,console
[huwei@hadoop102 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume2.conf -n a2 -Dflume.root.logger=INFO,console
[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume1.conf -n a1 -Dflume.root.logger=INFO,console

后续测试同前文,不再赘述

4.5 自定义 Interceptor


案例需求

Flume1 监控端口数据,将监控到的数据发往 Flume2 、Flume3 、Flume4 ,包含“flume”的数据发往 Flume2,包含“hadoop”的数据发往 Flume 3,其他的数据发往Flume 4

需求分析

在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的event 发送到不同的 Channel 中,所以我们需要 自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。

在这里插入图片描述

实现步骤

(1)创建一个 maven 项目,并引入以下依赖。

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

(2)定义 CustomInterceptor 类并实现 Interceptor 接口,然后再定义一个静态内部类用来返回自定义的拦截器对象

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    /**
     * 一个event的处理
     */
    @Override
    public Event intercept(Event event) {
        // 1. 获取event的headers
        Map<String, String> headers = event.getHeaders();
        // 2. 获取event的body
        String body = new String(event.getBody());
        // 3. 判断event的body中是否包含"flume"、"hadoop"
        if (body.contains("flume")){
            headers.put("title","flume");
        }else if (body.contains("hadoop")){
            headers.put("title","hadoop");
        }
        return event;
    }

    /**
     * 迭代每一个event进行处理
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }

    /**
     * 定义一个静态内部类用来返回自定义的拦截器对象
     */
    public static class MyBuilder implements Builder{

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

(3)package 打成 jar 包

在这里插入图片描述
(4)将打好的 jar 包上传到 flume 目录下的 lib 文件夹下

(5)在 flume 目录的 jobs 文件夹下创建 multi 文件夹

[huwei@hadoop101 jobs]$ mkdir multi

(6)在 multi 文件夹中新建并编辑配置文件

flume1.conf

[huwei@hadoop101 multi]$ vim flume1.conf

添加如下内容

#Named
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = k1 k2 k3

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5555

#channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = title
a1.sources.r1.selector.mapping.flume = c1
a1.sources.r1.selector.mapping.hadoop = c2
a1.sources.r1.selector.default = c3

# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.huwei.flume.CustomInterceptor$MyBuilder

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100


a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 100


#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 6666

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 7777

a1.sinks.k3.type = avro
a1.sinks.k3.hostname = localhost
a1.sinks.k3.port = 8888

#Bind
a1.sources.r1.channels = c1 c2 c3   
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c2 
a1.sinks.k3.channel = c3 

flume2.conf

[huwei@hadoop101 failover]$ vim flume2.conf

添加如下内容

 a2.sources = r1
a2.channels = c1
a2.sinks = k1 

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 6666

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1 
a2.sinks.k1.channel = c1 

flume3.conf

[huwei@hadoop101 multi]$ vim flume3.conf

添加如下内容

#Named
a3.sources = r1
a3.channels = c1
a3.sinks = k1 

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 7777

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1 
a3.sinks.k1.channel = c1 

flume4.conf

[huwei@hadoop101 multi]$ vim flume4.conf

添加如下内容

#Named
a4.sources = r1
a4.channels = c1
a4.sinks = k1 

#Source
a4.sources.r1.type = avro
a4.sources.r1.bind = localhost
a4.sources.r1.port = 8888

#Channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100

#Sink
a4.sinks.k1.type = logger

#Bind
a4.sources.r1.channels = c1 
a4.sinks.k1.channel = c1 

(7)启动 Flume

[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume4.conf -n a4 -Dflume.root.logger=INFO,console
[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume3.conf -n a3 -Dflume.root.logger=INFO,console
[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume2.conf -n a2 -Dflume.root.logger=INFO,console
[huwei@hadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume1.conf -n a1 -Dflume.root.logger=INFO,console

(8)发送数据

输入要发送的数据进行测试

[huwei@hadoop101 ~]$ nc localhost 5555

4.6 自定义 Source


Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些source。

4.7 自定义 Sink


Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

Sink组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的Sink类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。

4.8 Flume 数据流监控

4.8.1 Ganglia 的安装与部署


Ganglia 由gmond、gmetad 和 gweb 三部分组成。

  • gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用 gmond,你可以很容易收集很多系统指标数据,如CPU、内存、磁盘、网络和活跃进程的数据等。
  • gmetad(Ganglia Meta Daemon)整合所有信息,并将其以 RRD 格式存储至磁盘的服务。
  • gweb(Ganglia Web)Ganglia 可视化工具,gweb 是一种利用浏览器显示gmetad 所存储数据的PHP前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。

(1)集群规划

hadoop101:gweb  gmetad gmod 
hadoop102:gmod
hadoop103:gmod

(2)在101 102 103 分别安装 epel-release

[huwei@hadoop101 ~]$ sudo yum -y install epel-release
[huwei@hadoop102 ~]$ sudo yum -y install epel-release
[huwei@hadoop103 ~]$ sudo yum -y install epel-release

(3)在101安装

[huwei@hadoop101 ~]$ sudo yum -y install ganglia-gmetad
[huwei@hadoop101 ~]$ sudo yum -y install ganglia-web
[huwei@hadoop101 ~]$ sudo yum -y install ganglia-gmond

(4)在102 和 103 安装

[huwei@hadoop102 ~]$ sudo yum -y install ganglia-gmond
[huwei@hadoop103 ~]$ sudo yum -y install ganglia-gmond

(5)在101修改配置文件 /etc/httpd/conf.d/ganglia.conf

[huwei@hadoop101 ~]$ sudo vim /etc/httpd/conf.d/ganglia.conf

在这里插入图片描述

通过windows访问ganglia,需要配置Linux对应的主机(windows)ip地址,这里需要根据自己的电脑 ip 来配置

(6)在 101 修改配置文件 /etc/ganglia/gmetad.conf

[huwei@hadoop101 ~]$ sudo vim /etc/ganglia/gmetad.conf

在这里插入图片描述

同时注意集群名称“my cluster”

(7)在101 102 103 修改配置文件 /etc/ganglia/gmond.conf

[huwei@hadoop101 ~]$ sudo vim /etc/ganglia/gmond.conf
[huwei@hadoop102 ~]$ sudo vim /etc/ganglia/gmond.conf
[huwei@hadoop103 ~]$ sudo vim /etc/ganglia/gmond.conf

在这里插入图片描述

数据发送给 hadoop101
在这里插入图片描述
接收来自任意连接的数据

在这里插入图片描述
(8)在101 修改配置文件 /etc/selinux/config

[huwei@hadoop101 ~]$ sudo vim /etc/selinux/config

在这里插入图片描述

selinux 本次生效关闭必须重启,如果此时不想重启,可以临时生效之 sudo setenforce 0

(9)启动 ganglia

在101 102 103 启动

[huwei@hadoop101 ~]$ sudo systemctl  start gmond
[huwei@hadoop102 ~]$ sudo systemctl  start gmond
[huwei@hadoop103 ~]$ sudo systemctl  start gmond

查看服务状态systemctl status gmond

在101 启动

[huwei@hadoop101 ~]$ sudo systemctl start httpd
[huwei@hadoop101 ~]$ sudo systemctl start gmetad

(10)打开网页浏览 ganglia 页面

http://hadoop101/ganglia

4.8.2 操作 Flume 测试监控


(1)启动 flume

[huwei@hadoop101 ~]$ flume-ng agent \
> -c $FLUME_HOME/conf \
> -n a1 \
> -f $FLUME_HOME/jobs/flume-netcat-logger.conf \
> -Dflume.root.logger=INFO,console \
> -Dflume.monitoring.type=ganglia \
> -Dflume.monitoring.hosts=hadoop101:8649

(2)发送数据,观察 web 界面变化

[huwei@hadoop101 ~]$ nc localhost 6666

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/270515.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PyQt5和Qt designer的详细安装教程

Qt designer界面和所有组件功能的详细介绍参考&#xff1a;https://blog.csdn.net/qq_43811536/article/details/135186862?spm1001.2014.3001.5501 目录 0. 写在前面1. Anaconda创建虚拟环境2. 安装PyQt5和Qt designer3. 测试安装成功 0. 写在前面 Qt Designer是Qt提供的一种…

智慧互联网银行引领金融变革,开源网安VulHunter护航数字化发展

某银行作为国内知名的互联网银行&#xff0c;以构建“智慧型互联行”为总体战略目标&#xff0c;始终坚持科技赋能金融的理念。通过AI、大数据、云计算等数字技术与金融业务的探索融合&#xff0c;实现以更低的成本为客户提供便捷、高效和优质体验的互联网金融服务。 架构升级助…

操作无法完成(错误 0x000006ba),Windows 11 PDF打印机无法使用解决办法

操作无法完成(错误 0x000006ba)&#xff0c;Windows 11 PDF打印机无法使用解决办法 解决方式一 先重启一次电脑&#xff0c;看看是否可以解决问题。 解决方式二 重新启动 Printer Spooler 服务

【JAVA】黑马MybatisPlus 学习笔记【三】【拓展功能】

3.扩展功能 3.1.代码生成 在使用MybatisPlus以后&#xff0c;基础的Mapper、Service、PO代码相对固定&#xff0c;重复编写也比较麻烦。因此MybatisPlus官方提供了代码生成器根据数据库表结构生成PO、Mapper、Service等相关代码。只不过代码生成器同样要编码使用&#xff0c;…

【QT】可执行文件图标由png格式手动改为ico格式,Qt程序报错原因及解决方案

1问题说明&#xff1a; 在修改可执行文件图标时&#xff0c;由png格式手动改为ico格式&#xff0c;Qt程序会报错。 报错如下&#xff1a; 2解决办法&#xff1a; 登录网页 在线生成透明ICO图标——ICO图标制作&#xff0c;利用ico在线生成透明ICO图标 将生成的ico图标由favicon…

CSRF(Pikachu)

CSRF&#xff08;get&#xff09; 首先我们先登录账号 admin 密码是&#xff1b;123456 点击修改个人信息 用F12或者BP 抓包看看我们的url 那么构成的CSRF攻击payload为http://pikachu.shifa23.com/pikachu/vul/csrf/csrfget/csrf_get_edit.php?sexboy&phonenum”手机…

如何申请云闪付支付接口?

随着移动支付的普及&#xff0c;越来越多的商家开始接受各种移动支付方式。而在众多移动支付工具中&#xff0c;云闪付支付接口因其安全、便捷的特点&#xff0c;成为了越来越多商家的首选。那么&#xff0c;如何申请云闪付支付接口呢&#xff1f;本文将为您详细介绍申请云闪付…

PaddleOCR 的使用,极简介绍

安装 参考github的官网就可以&#xff1a; github链接 简单的说&#xff0c;就是两句话&#xff1a; python3 -m pip install paddlepaddle-gpu -i https://mirror.baidu.com/pypi/simple pip install "paddleocr>2.0.1" # 推荐使用2.0.1版本 Python下的使用…

NAT协议的实现方式

在网络通信中&#xff0c;NAT协议&#xff08;Network Address Translation&#xff0c;网络地址转换&#xff09;扮演着关键角色&#xff0c;允许内部网络与外部网络之间进行有效的通信。 实现内外网之间网络地址转换的过程中&#xff0c;NAT采用了不同的实现方式&#xff0c;…

FL Studio 21最新版本for mac 21.2.2.3470中文解锁版

FL Studio 21最新版本for mac 21.2.2.3470中文解锁版是最新强大的音乐制作工具。它可以与所有类型的音乐一起创作出令人惊叹的音乐。它提供了一个非常简单且用户友好的集成开发环境&#xff08;IDE&#xff09;来工作。这个完整的音乐工作站是由比利时公司 Image-Line 开发的。…

了解OAuth 2.0以及社交登录认证授权流程

1.前言 目前在写一个电商项目&#xff0c;可以通过手机号进行注册登录&#xff0c;为了方便用户使用本平台的系统&#xff0c;引入社交登录功能&#xff0c;这里使用的是gittee。 2.OAuth 2.0介绍 当谈到网络安全和身份验证时&#xff0c;OAuth 2.0&#xff08;开放授权 2.0&a…

AXI总线协议---关键信号波形图分析

写过程协议图 读过程协议图 读协议执行顺序图 写协议顺序图 单箭头表示两个信号谁先有效无所谓&#xff0c;双箭头表示必须要等到前一个信号有效才能将后面的信号有效 如何体现协议图中的通道理解 声明&#xff1a;以上图均采用AMBA总线文档图 写过程关键信号 主机 写地址—M…

MySQL集群架构搭建以及多数据源管理实战

MySQL集群架构搭建以及多数据源管理实战 ​ 数据库的分库分表操作&#xff0c;是互联网大型应用所需要面对的最核心的问题。因为数据往往是一个应用最核心的价值所在。但是&#xff0c;在最开始的时候&#xff0c;需要强调下&#xff0c;在实际应用中&#xff0c;对于数据库&a…

Keil5 5.38官方下载、安装及注册教程(详细版)

一、下载地址 官方C51版本下载地址&#xff1a;https://www.keil.com/demo/eval/c51.htm 官方ARM版本下载地址&#xff1a;https://www.keil.com/demo/eval/arm.htm 注&#xff1a;两个版本的安装教程一样 Keil注册机2032年&#xff1a; 链接&#xff1a;https://pan.baidu.…

虚拟机安装配置winServer2012

&#x1f3ac; 艳艳耶✌️&#xff1a;个人主页 &#x1f525; 个人专栏 &#xff1a;《产品经理如何画泳道图&流程图》 ⛺️ 越努力 &#xff0c;越幸运 目录 1、准备工作&#xff1a; 2、VM虚拟机的安装 3、配置虚拟网络编辑器 3、安装系统 4、远程连接步骤 5、…

探索 HTTP 请求的世界:get 和 post 的奥秘(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

华为鸿蒙操作系统简介及系统架构分析(1)

本文部分内容参考&#xff1a; 鸿蒙系统学习笔记(一) 鸿蒙系统介绍 特此致谢。 一、简介及历史 1. 简介 鸿蒙操作系统&#xff08;HarmonyOS&#xff09;是华为公司研制的一款自主版权的操作系统。2019年8月9日&#xff0c;鸿蒙系统在华为开发者大会<HDC.2019>上正式…

AI大模型:未来科技的新篇章

目录 1AI大模型&#xff1a;未来科技的新篇章 2AI超越数学家攻克经典数学难题&#xff1b;非侵入式设备解码大脑思维 1AI大模型&#xff1a;未来科技的新篇章 随着科技的飞速发展&#xff0c;人工智能&#xff08;AI&#xff09;已经成为了我们生活中不可或缺的一部分。而AI大…

图解机器学习神器:Scikit-Learn

算法进阶 ​​本文详解 Scikit-learn 工具库的用法&#xff0c;覆盖机器学习基础知识、SKLearn讲解、SKLearn三大核心API、SKLearn高级API等内容。 https://www.showmeai.tech/article-detail/203 我们在上一篇SKLearn入门与简单应用案例 [1] 里给大家讲到了 SKLearn 工具的基…

docker的一些思考

1.docker是啥&#xff1f; 2.镜像执行流程 3.一些疑惑和解答 1. 2.