Flume实践

1 NetCat方式

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/flume_netcat.conf --name a1 -Dflume.root.logger=INFO,console

[root@master ~]# yum -y intalll telnet

发数据:

]# telnet master 44444

数据接收,是在终端上接收的,而且接收数据已经是编码后的

工作输入主要是来自文件,输出也不是终端上,这里只是测试

2 Exec方式

监控一个输入文件,一般是日记文件,日记变化,flume自动接收

运行flume-ng

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/flume_exec.conf --name a1 -Dflume.root.logger=INFO,console

发数据:

数据接收

3 输出到HDFS

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/flume.conf --name a1 -Dflume.root.logger=INFO,console

写入

[root@master flume]# echo'flume-hdfs1'>> 2.log

[root@master flume]# echo'flume-hdfs2'>> 2.log

[root@master flume]# echo'flume-hdfs3'>> 2.log

HDFS查看

[root@master ~]# hadoop fs -ls /flume/18-09-17

4 故障转移(failover)

刚开始数据接收服务器是A的,A出问题了,转移B接收数据,A恢复之后A继续B接收数据

这需要集群模式才能实现,三台机器:master、slave1、slave2

avro是网络协议,用于连接agent与agent

master配置:

[root@master agent_agent_collector_base]#pwd

/root/07/flume/apache-flume-1.6.0-bin/conf/agent_agent_collector_base

[root@master agent_agent_collector_base]#ll flume-client.properties

-rwxrwxrwx 1 root root 880 4月 22 10:47 flume-client.properties

master启动:

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/agent_agent_collector_base/flume-client.properties--name agent1 -Dflume.root.logger=INFO,console

Slave1启动:

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/agent_agent_collector_base/flume-server.properties--name a1 -Dflume.root.logger=INFO,console

slave2启动:

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/agent_agent_collector_base/flume-server.properties--name a1 -Dflume.root.logger=INFO,console

测试

写数据

Slave1接收==》正常

把slave1停掉,再写数据,发现是Slave2接收

再开启slave1,再测试

Slave1接收

5 负载均衡(loadbalance)

从服务器轮训读取信息

maste启动:

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/agent_agent_collector_base/flume-client.properties_loadbalance--name a1 -Dflume.root.logger=INFO,console

slave1启动:

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/agent_agent_collector_base/flume-server.properties--name a1 -Dflume.root.logger=INFO,console

slave2启动:

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/agent_agent_collector_base/flume-server.properties--name a1 -Dflume.root.logger=INFO,console

测试:

写数据

接收数据

Slave1

Slave2

可以看到,它并不是按数量轮流分发的,而是按批次分发的,再写for循环验证

#for i in `seq 1 100`;do echo “$i” >>2.log; sleep 1;done

6 拦截与过滤(Interceptor)

(1) TimestampInterceptor:在event的header中添加一个key叫:timestamp,value为当前的时间戳

[root@master interceptor_test]# catflume_ts_interceptor.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = http ##以http方式连接

a1.sources.r1.host = master

a1.sources.r1.port = 52020

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.preserveExisting=false

a1.sources.r1.interceptors.i1.type =timestamp

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path =hdfs://master:9000/flume/%Y-%m-%d/%H%M #接收地址

a1.sinks.k1.hdfs.filePrefix = badou.

a1.sinks.k1.hdfs.fileType=DataStream

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/interceptor_test/flume_ts_interceptor.conf --name a1-Dflume.root.logger=INFO,console

http方式输入:(也是在master端)

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"hellobadou"}]' http://master:52020

头信息用来做路由选择

输出

(2) HostInterceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip

[root@master interceptor_test]# catflume_hostname_interceptor.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = syslogtcp #以syslogtcp输入

a1.sources.r1.host = master

a1.sources.r1.port = 52020

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1 i2 #可以定义多个拦截器

a1.sources.r1.interceptors.i1.preserveExisting=false

a1.sources.r1.interceptors.i1.type=timestamp

a1.sources.r1.interceptors.i2.type =host

a1.sources.r1.interceptors.i2.hostHeader=hostname

a1.sources.r1.interceptors.i2.useIP =false

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path =hdfs://master:9000/flume/%Y-%m-%d/%H%M

a1.sinks.k1.hdfs.filePrefix = %{hostname}.

a1.sinks.k1.hdfs.fileType=DataStream

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/interceptor_test/flume_hostname_interceptor.conf --name a1-Dflume.root.logger=INFO,console

syslogtcp方式输入:

]# echo "xxxxx" | nc master 52020

输出

(3) StaticInterceptor:可以在event的header中添加自定义的key和value

[root@master interceptor_test]# catflume_static_interceptor.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = http #http方式输入

a1.sources.r1.host = master

a1.sources.r1.port = 52020

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

a1.sources.r1.interceptors.i1.key = badou_flume

a1.sources.r1.interceptors.i1.value = so_easy

#设置为static之后,强制输出badou_flume和so_easy

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/interceptor_test/flume_static_interceptor.conf --name a1 -Dflume.root.logger=INFO,console

输入:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"hellobadou"}]' http://master:52020

输出

把之前强制加入的<key,value>添加进来了

作用:为后面路由选择做准备

(4) RegexFiltering Interceptor:正则过滤器,通过正则来清洗或包含匹配的events

[root@master interceptor_test]# catflume_regex_interceptor.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = http

a1.sources.r1.host = master

a1.sources.r1.port = 52020

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type=regex_filter

a1.sources.r1.interceptors.i1.regex =^[0-9]*$

a1.sources.r1.interceptors.i1.excludeEvents=true

#数字开头并且数字结尾的过滤掉

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/interceptor_test/flume_regex_interceptor.conf --name a1-Dflume.root.logger=INFO,console

输入1:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"123"}]' http://master:52020

输出无反应

输入2:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"12345a"}]' http://master:52020

输出有反应:

输入3:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"abc12345"}]' http://master:52020

输出有反应:

(5) RegexExtractor Interceptor:正则筛选器,通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

[root@master interceptor_test]# catflume_regex_interceptor.conf_extractor

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = http

a1.sources.r1.host = master

a1.sources.r1.port = 52020

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type =regex_extractor

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)

a1.sources.r1.interceptors.i1.serializers =s1 s2 s3

a1.sources.r1.interceptors.i1.serializers.s1.name= one

a1.sources.r1.interceptors.i1.serializers.s2.name= two

a1.sources.r1.interceptors.i1.serializers.s3.name= three

#\\d代表数字,输入三个单位数数字s1:s2:s3,并且分别赋予给one、two、three,one、two、three作为key输出

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/interceptor_test/flume_regex_interceptor.conf_extractor--name a1 -Dflume.root.logger=INFO,console

输入1:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"1:2:3"}]' http://master:52020

输出

输入2:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is header"},"body":"1:2:3asd"}]' http://master:52020

输出

可以看到只匹配数字,并加入到header中输出

输入3:

]# curl -X POST -d'[{"headers":{"hadoop1":"hadoop1 is

header"},"body":"6:7:8:9bbb5"}]' http://master:52020

只会匹配前三位

7 复制与复用(选择器Selector)

(1) 复制(广播的形式发送给下游节点)

Master配置文件

[root@master selector_test]# catflume_client_replicating.conf

# Name the components on this agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 50000

a1.sources.r1.host = master

a1.sources.r1.selector.type = replicating #复制

a1.sources.r1.channels = c1 c2

# Describe the sink

a1.sinks.k1.type = avro #与下游sinks是通过svro协议连接的

a1.sinks.k1.channel = c1 #连接通道是c1

a1.sinks.k1.hostname = slave1 #服务器slave1

a1.sinks.k1.port = 50000

a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2 #也可以用c1通道的

a1.sinks.k2.hostname = slave2

a1.sinks.k2.port = 50000

#还可以再加slave,就这种形式配置

# Use a channel which buffers eventsinmemory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity =100

a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

slave配置文件

[root@slave1 selector_test]# catflume_server.conf

# Name the components on this agent

a1.sources = r1 #slave1的agent-name是a1,slave2的是a2

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = slave1 #slave2配置也是一样的,只是这里更改为slave2

a1.sources.r1.port = 50000 #端口要跟master对应

# Describe the sink

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

# Use a channel which buffers eventsinmemory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/selector_test/flume_client_replicating.conf --name a1-Dflume.root.logger=INFO,console

Slave1启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/selector_test/flume_server.conf --name a1-Dflume.root.logger=INFO,console

Slave2启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/selector_test/flume_server.conf --name a2-Dflume.root.logger=INFO,console

输入:

输出:slave1、slave2都接收到了

(2) 复用

Master配置文件

[root@master selector_test]# catflume_client_multiplexing.conf

# Name the components on this agent

a1.sources = r1

a1.sinks = k1 k2 #有两个sink

a1.channels = c1 c2 #有两个channel

# Describe/configure the source

a1.sources.r1.type= org.apache.flume.source.http.HTTPSource

a1.sources.r1.port= 50000

a1.sources.r1.host= master

a1.sources.r1.selector.type= multiplexing

a1.sources.r1.channels= c1 c2

a1.sources.r1.selector.header= areyouok

a1.sources.r1.selector.mapping.OK = c1

a1.sources.r1.selector.mapping.NO = c2

a1.sources.r1.selector.default= c1

#定义输入策略,ok是走c1通道,on是走c2通道,c1c2对应不同的机器,有个默认通道是c1

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = slave1

a1.sinks.k1.port = 50000

a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname = slave2

a1.sinks.k2.port = 50000

# Use a channel which buffers eventsinmemory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity =100

a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

slave配置文件跟复制的是一样的

先启动slave

Master启动

]# ./bin/flume-ng agent --conf conf--conf-file ./conf/selector_test/flume_client_multiplexing.conf --name a1-Dflume.root.logger=INFO,console

输入1:

]# curl -X POST -d'[{"headers":{"areyouok":"OK","hadoop1":"hadoop1is header"}, "body":"6:7:8bbb5"}]' http://master:50000

输出:slave1接收到信息

输入2:

]# curl -X POST -d '[{"headers":{"areyouok":"OK","hadoop1":"hadoop1is header"}, "body":"abcabc111111"}]'http://master:50000

输出:也一样是slave1接收到信息

输入3:

]# curl -X POST -d'[{"headers":{"areyouok":"NO","hadoop1":"hadoop1is header"}, "body":"abcabc111111"}]' http://master:50000

]# curl -X POST -d'[{"headers":{"areyouok":"NO","hadoop1":"hadoop1is header"}, "body":"abcabc111111"}]' http://master:50000

输出:slave2接收到信息

不做标记输入4:

]# curl -X POST -d'[{"headers":{"areyouok":"IDONEKNOW","hadoop1":"hadoop1is header"}, "body":"abcabc111111"}]'http://master:50000

默认设置slave1输出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/23903.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

从组件化角度聊聊设计工程化

目录 设计系统 设计系统的定义 设计系统的优势 设计系统存在的问题 设计工程化 设计系统探索 设计系统落地实践 Design Token Design Token 实践 设计工程化理想方案构想 展望 参考文献 近几年围绕业务中台化的场景&#xff0c;涌现出了许多低代码平台。面对多组件…

Qt翻金币小游戏详细教程(内涵所有源码、图片资源)

一、项目简介 翻金币项目是一款经典的益智类游戏&#xff0c;我们需要将金币都翻成同色&#xff0c;才视为胜利。首先&#xff0c;开始界面如下&#xff1a; 点击start按钮&#xff0c;进入下层界面&#xff0c;选择关卡&#xff1a; 在这里我们设立了20个关卡供玩家选择&…

Jmeter组件:Random CSV Data Set Config(随机读取文件数据)

一、Jmeter组件&#xff1a;Random CSV Data Set Config(随机读取文件数据) 功能&#xff1a;该组件可以随机读取CSV文件中的每一行的数据 二、下载插件&#xff1a;(jmeter-plugins-random-csv-data-set-xx.jar),并放到lib/ext目录下&#xff0c;重启jmeter 也可以在Jmeter…

Nginx基本使用以及部署前端项目

前言 最近学习了一下Nginx&#xff0c;整理了一个博客&#xff0c;主要参考的是狂神说的b站视频教程&#xff0c;文章链接如下&#xff1a;狂神说Nginx快速入门 一、下载、启动Nginx 1.下载Nginx 到Nginx官方选择自己电脑适用的稳定版本下载&#xff0c;我下载的的windows版…

ChatGPT免费使用的方法有哪些?

目录 一、ChatGpt是什么&#xff1f; 二、ChatGPT国内免费使用的方法&#xff1a; 第一点&#xff1a;电脑端 第二点&#xff1a;手机端 三、结语&#xff1a; 一、ChatGpt是什么&#xff1f; ChatGPt是美国OpenAI [1] 研发的聊天机器人程序 。更是人工智能技术驱动的自然语…

图像算法工程师岗位的主要职责(合集)

图像算法工程师岗位的主要职责 一、确定岗位的职责 1.根据工作任务的需要确立工作岗位名称及其数量; 2.根据岗位工种确定岗位职务范围; 3.根据工种性质确定岗位使用的设备、工具、工作质量和效率; 4.明确岗位环境和确定岗位任职资格; 5.确定各个岗位之间的相互关系; 6.根据岗位…

Solidity基础七

无论风暴将我带到什么岸边&#xff0c;我都将以主人的身份上岸 目录 一、Solidity的单位 1. 货币Ether 2. 时间单位Time 二、地址的形成 三、以太坊的账户 1.内部账户&#xff08;简称CA&#xff09; 2.外部账户&#xff08;简称EOA&#xff09; 3.内部账户和外部账户…

【Linux性能优化】你知道什么是平衡负载么

什么是平衡负载 首先大家思考一下&#xff0c;当你发现自己的服务变慢时&#xff0c;你会首先使用什么命令来排查&#xff1f;我通常做的第一件事&#xff0c;就是执行top或者uptime命令来了解系统的负载情况。比如像下面这样&#xff0c;我在命令行里输入top命令&#xff0c;…

[高光谱]使用PyTorch的dataloader加载高光谱数据

本文实验的部分代码参考 Hyperspectral-Classificationhttps://github.com/eecn/Hyperspectral-Classification如果对dataloader的工作原理不太清楚可以参见 [Pytorch]DataSet和DataLoader逐句详解https://blog.csdn.net/weixin_37878740/article/details/129350390?spm1001…

网络货运平台源码 管理平台端+司机端APP+货主端APP源码

网络货运平台系统源码&#xff0c;网络货运平台源码 管理平台端司机端APP货主端APP 遵循政策要求的八项基本功能&#xff0c;结合货主、实际承运人、监管方等多方业务场景&#xff0c;构建人、车、货、企一体的标准化网络货运平台系统。具有信息发布、线上交易、全程监控、金融…

数据库基础——6.排序与分页

这篇文章来讲一下数据库的排序与分页 目录 1.排序数据 1.1排序规则 1.2 单列排序 1.3 多列排序 2.分页 2.1 背景 2.2 实现规则 2.3 拓展 1.排序数据 1.1排序规则 使用 ORDER BY 子句排序 ASC&#xff08;ascend&#xff09;&#xff1a;升序 &#xff1b; DESC&a…

vue项目中使用depcheck检查缺失的依赖项目

使用depcheck检查缺失的项目依赖 由来&#xff1a;今天在做地铁的时候&#xff0c;刷短视频发现一个非常好用的东西&#xff0c;分享一下 它可以帮助我们找出问题&#xff0c;在 package.json 中&#xff0c;每个依赖包如何被使用、哪些依赖包没有用处、哪些依赖包缺失。它是解…

2023年,推荐10个让你事半功倍的CSS在线生产力工具

1、CSS Gradient CSS Gradient 是一个在线工具&#xff0c;可以帮助用户创建并生成 CSS 渐变代码。用户可以使用该工具中提供的图形用户界面来调整颜色、方向和渐变类型&#xff0c;然后生成相应的 CSS 代码。用户可以将生成的代码复制并粘贴到自己的 CSS 样式表中&#xff0c…

Linux——操作系统详解

目录 一.操作系统的含义 1.操作系统是什么&#xff1f; 2.那么操作系统为什么要对软硬件资源进行管理呢&#xff1f;这样做的好处在哪里&#xff1f; 3.操作系统又是怎么进行管理的&#xff1f; 如何理解“先描述&#xff0c;再组织”&#xff1f; 二.总结&#xff1a; …

音乐小白乐器选择,如何学一手才艺,推荐尤克里里

乐器难度说明 注意&#xff1a;这里的难度说明是音准的难度&#xff0c;就是能不能发出标准的声音 乐器按照演奏方式分类&#xff0c;分为 演奏方式乐器举例难度等级难度说明敲击木鱼&#xff0c;架子鼓&#xff0c;钢琴1敲击乐是音最准的&#xff0c;敲哪个地方就发什么音&…

记录--超长溢出头部省略打点,坑这么大,技巧这么多?

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 在业务中&#xff0c;有这么一种场景&#xff0c;表格下的某一列 ID 值&#xff0c;文本超长了&#xff0c;正常而言会是这样&#xff1a; 通常&#xff0c;这种情况都需要超长省略溢出打点&#xff0…

Kali-linux Gerix Wifi Cracker破解无线网络

Gerix Wifi Cracker是另一个aircrack图形用户界面的无线网络破解工具。本节将介绍使用该工具破解无线网络及创建假的接入点。 9.3.1 Gerix破解WEP加密的无线网络 在前面介绍了手动使用Aircrack-ng破解WEP和WPA/WPA2加密的无线网络。为了方便&#xff0c;本小节将介绍使用Geri…

学习RabbitMQ高级特性

目标&#xff1a; 了解熟悉RabbitMQ的高级特性 学习步骤&#xff1a; 高级特性主要分为以下几点, 官网介绍 1、消息可靠性投递 【confirm 确认模式、return 退回模式】 2、Consumer ACK 【acknowledge】 3、消费端限流 【prefetch】 4、TTL过期时间 【time to live】 5、死信队…

没有权限merge到源git仓库?一招教你如何解决。

在git上贡献项目的时候&#xff0c;一般步骤是&#xff0c;clone源项目到本地&#xff0c;切出一个新的分支&#xff0c;然后在新分支上开发&#xff0c;最后push到远程&#xff0c;然后提出mr。但是对于一些非开源的项目&#xff0c;可能会出现&#xff1a; 这就是说明没有权…

【C++】布隆过滤器

文章目录 布隆过滤器的引入布隆过滤器的概念如何选择哈希函数个数和布隆过滤器长度布隆过滤器的实现布隆过滤器的优缺点 布隆过滤器的引入 我们在使用新闻客户端看新闻时&#xff0c;它会给我们不停地推荐新的内容&#xff0c;它每次推荐时要去重&#xff0c;去掉那些已经看过…