flume使用实例

1、监听端口a1.sources.r1.type = netcat

配置文件nc-flume-console.conf

# Name the components on this agent a1 表示jvm进程名

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = node1

a1.sources.r1.port = 44444

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000 #1000个event

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume-ng agent -n a1 -c conf/ -f /export/server/flume/job/nc-flume-console.conf

 参数-n 表示jvm进程名 -c表示本次启动读取的配置文件conf目录下的文件 -f 表示具体执行的文件

另开窗口输入内容后控制台会自动返回OK

2、实时监控单个追加文件

配置文件 flume-exec-logger.conf

#Agent_name

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#Sources

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /export/server/hive/logs/hive.log

#Channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

#sinks

a1.sinks.k1.type = logger

#组合

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

启动flume监听

flume-ng agent -c conf/ -f job/flume-exec-logger.conf -n a1

手动追加数据到hive.log文件 并查看监控窗口

echo INFO [main] spark.HiveSparkClientFactory >> logs/hive.log

动态添加数据到hive.log

连接hive 观察flume监控变化

 beeline -u jdbc:hive2://node1:10000 -n ljr

 show databases;

  由此可见当我们操作hive的时候 hive.log 就更新,由于我们监控了hive.log文件所以当有新数据追加到hive.log的时候 就会监听到 并打印到控制台

3、实时监控单个追加文件,并将数据输出到hdfs

配置文件 flume-hivelogs-hdfs.con

# Name the components on this agent

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = exec

a2.sources.r2.command = tail -F /export/server/hive/logs/hive.log

# Describe the sink

a2.sinks.k2.type = hdfs

a2.sinks.k2.hdfs.path = hdfs://node1:8020/flume/%Y%m%d/%H

#上传文件的前缀

a2.sinks.k2.hdfs.filePrefix = logs-

#是否按照时间滚动文件夹

a2.sinks.k2.hdfs.round = true

#多少时间单位创建一个新的文件夹

a2.sinks.k2.hdfs.roundValue = 1

#重新定义时间单位

a2.sinks.k2.hdfs.roundUnit = hour

#是否使用本地时间戳

a2.sinks.k2.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a2.sinks.k2.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a2.sinks.k2.hdfs.fileType = DataStream

#多久生成一个新的文件

a2.sinks.k2.hdfs.rollInterval = 60

#设置每个文件的滚动大小

a2.sinks.k2.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

启动flume监听,操作hive

flume-ng agent -n a2 -c conf/ -f flume-hivelogs-hdfs.con

查看hdfs,有新文件产生

使用 Flume 监听整个目录(a3.sources.r3.type = TAILDIR)

的实时追加文件,并上传至 HDFS

实现步骤:

【1】创建被监控目录

我这里监控data目录  此目录需要提前创建

mkdir data

cd data

touch file1.txt

touch file2.txt

touch log2.txt

toch log1.txt

【2】创建文件 flume-taildir-hdfs.conf

a3.sources = r3

a3.sinks = k3

a3.channels = c3

# Describe/configure the source

a3.sources.r3.type = TAILDIR

#记录最后监控文件的断点的文件,此文件位置可不改

a3.sources.r3.positionFile =  /export/server/flume/data /tail_dir.json

a3.sources.r3.filegroups = f1 f2

a3.sources.r3.filegroups.f1 = /export/server/flume/data/.*file.*

a3.sources.r3.filegroups.f2 =/export/server/flume/data/.*log.*

# Describe the sink

a3.sinks.k3.type = hdfs

# hdfs://node1:8020 可省略

a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload2/%Y%m%d/%H

#上传文件的前缀

a3.sinks.k3.hdfs.filePrefix = upload-

#是否按照时间滚动文件夹

a3.sinks.k3.hdfs.round = true

#多少时间单位创建一个新的文件夹

a3.sinks.k3.hdfs.roundValue = 1

#重新定义时间单位

a3.sinks.k3.hdfs.roundUnit = hour

#是否使用本地时间戳

a3.sinks.k3.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a3.sinks.k3.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a3.sinks.k3.hdfs.fileType = DataStream

#多久生成一个新的文件,单位是秒

a3.sinks.k3.hdfs.rollInterval = 3600

#设置每个文件的滚动大小大概是 128M,单位是byte

a3.sinks.k3.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

【3】启动flume监控

        bin/flume-ng agent -c conf -f datas/flume-taildir-hdfs.conf -n a3

【4】向文件中追加内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/637096.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《王者荣耀》4月狂揽2.34亿美元 单日流水1亿美元 全球销量第二

易采游戏网5月24日消息,在刚刚过去的四月,全球手游市场迎来了一场收益的盛宴,其中《王者荣耀》以其惊人的吸金能力,以2.34亿美元的月收入在全球手游排行榜上位列第二。4月5日,这款由腾讯游戏开发的多人在线战斗竞技游戏…

软考考前前怎么复习?

有一些经验,可以和大家分享一下。 软考的考试内容 软考包含许多科目,共分为五大类,27个专业。 软考的等级不同,考试内容也有所不同。初级和中级考试只包括两门科目,而高级则需要考三门科目。每门科目满分75分&#x…

knife4j-swagger

文章目录 knife4j-swagger第 1 步:引入 jar 包第 2 步:添加注释来开启 knife4j第 3 步:验证问题解决新增功能:ApiOperationSupport 注解新增功能:DynamicParameters 注解忽略参数属性 knife4j-swagger knife4j 是 Swa…

W801 实现获取天气情况

看了小安派(AiPi-Eyes 天气站)的源码,感觉用W801也可以实现。 一、部分源码 main.c #include "wm_include.h" #include "Lcd_Driver.h"void UserMain(void) {printf("\n user task \n");Lcd_Init();Lcd_Clea…

Qt官方示例---opengl

文件相对路径:Examples\Qt-5.9.1\opengl 2dpainting cube computegles31 contextinfo hellogl2 hellowindow paintedwindow qopenglwidget qopenglwindow textures threadedqopenglwidget

VirtualBox设置共享文件夹,用于在Window11 和 Ubuntu22 中共享文件,2024亲测可用

VirtualBox设置共享文件夹,用于在Window11 和 Ubuntu22 中共享文件,2024亲测可用 Windows操作 1、新建文件夹,用于共享 Linux操作 1、添加共享文件夹 共享文件夹路径:选择Windows系统中你需要共享的文件夹 共享文件夹名称&am…

ROS2入门21讲__第07讲__节点:机器人的工作细胞

目录 前言 通信模型 案例一:Hello World节点(面向过程) 运行效果 代码解析 创建节点流程 案例二:Hello World节点(面向对象) 运行效果 代码解析 创建节点流程 案例三:物体识别节点 …

小蓝和小青在做数字破解游戏

小蓝和小青在做数字破解游戏,设某图案由m*n的0和1点阵组成,依照以下规则破解连续一组数值,从点阵图第一行第一个符号开始计算,从左到右,由上至下。第一个数表示连续有几个0,第二个数表示接下来连续有几个1,…

Nuxt3 实战 (七):配置 Supabase 数据库

前言 这个章节我们要先把数据库的环境配置好,古人云:工欲善其事,必先利其器。 这两天我一直在网上寻找最适合 Nuxt 的数据库,之前在做个人项目时经常用的是 Mysql 和 MongoDB,也用过 ORM 框架比如:Sequel…

flutter webview加载本地文件出现跨域解决方案

一直报错 [INFO:CONSOLE(17)] "Access to image at file:///android_asset/flutter_assets/assets/jump/box_bottom.png from origin null has been blocked by CORS policy: Cross origin requests are only supported for protocol schemes: http, data, chrome, chrome…

留学服务平台应用架构的设计与优化

随着全球化进程的加速和人们对国际化教育的需求不断增长,留学服务平台在满足学生留学需求的同时也面临着诸多挑战。在这样的背景下,设计一个高效、稳定且安全可靠的留学服务平台应用架构显得尤为重要。本文将就留学服务平台应用架构的设计与优化进行探讨…

JKTECH柔性振动盘用途

柔性振动盘的作用与用途 在现代工业自动化领域,柔性振动盘凭借其独特的功能和广泛的应用场景,正逐渐成为生产线上的重要工具。柔性振动盘,又称柔性供料器,它结合了传统振动盘的高效性和现代自动化技术的灵活性,为各种…

vulhub——Aria2、bash、catic

文章目录 一、Aria2 任意文件写入漏洞二、CVE-2014-6271(Bash Shell 漏洞)三、CVE-2022-46169(Cacti 前台命令注入漏洞) 一、Aria2 任意文件写入漏洞 Aria2是一个命令行下轻量级、多协议、多来源的下载工具(支持 HTTP…

【MySQL精通之路】InnoDB(10)-行格式

目录 1.表数据结构 1.1 聚集索引数据结构 1.2 辅助索引数据结构 2.行格式 2.1 REDUNDANT行格式 2.2 REDUNDANT存储特性 2.3 COMPACT行格式 2.4 COMPACT存储特性 2.5 DYNAMIC行格式 2.6 DYNAMIC存储特性 2.7 COMPRESSED行格式 2.8 COMPRESSED存储特性 3.定义表格的…

我怎么使用AI大语言模型学英语

今天已经是我开始英语拉练任务的第39天了,一直在笃定的、雷打不动的、机械笨拙的重复做一件事,那就是使用AI工具,将我想要说的话翻译成英文,生成语音文件,每天朗读三小时,最终整个背下来。我也在思考&#…

【人工智能】数据分析与机器学习——泰坦尼克号(更新中)

1912年4月15日,泰坦尼克号在首次航行期间撞上冰山后沉没,船上共有2224名乘客和乘务人员,最终有1502人遇难。沉船导致大量伤亡的重要原因之一是,没有足够的救生艇给乘客和船员。虽然从这样的悲剧性事故中幸存下来有一定的运气因素&…

Aware接口作用

介绍 Aware(感知)接口是一个标记,里面没有任何方法,实际方法定义都是子接口确定(相当于定义了一套规则,并建议子接口中应该只有一个无返回值的方法)。 我们知道spring已经定义好了很多对象,如…

半导体测试基础 - 基本概念

随着芯片集成度越来越高,手动测试已无法满足需求,因此要用到自动化测试设备(ATE,Automated Test Equipment)。因为现在的芯片原来越复杂,普通的 Bench 测试没法满足需求。ATE 可检测集成电路功能之完整性&a…

Joomla 3.7.0 (CVE-2017-8917) SQL注入漏洞环境

1 漏洞概述 Joomla是一个基于PHP的内容管理系统(CMS),广泛应用于各类网站。2017年,Joomla 3.7.0版本被发现存在SQL注入漏洞(CVE-2017-8917),攻击者可以利用该漏洞对数据库进行未授权查询或操作…

Centos 7.9 使用 iso 搭建本地 YUM 源

Centos 7.9 使用 iso 搭建本地 YUM 源 1 建立挂载点 [rootlocalhost ~]# mkdir -p /media/cdrom/ 2 创建光盘存储路径 [rootlocalhost ~]# mkdir -p /mnt/cdrom/ 3 上传 CentOS-7-x86_64-Everything-2207-02.iso 到 光盘存储路径 [rootlocalhost ~]# ls /mnt/cdrom/ CentOS-…