【大数据学习 | flume】flume之常见的sink组件

Flume Sink取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。Flume也提供了各种sink的实现,包括HDFS sink、Logger sink、Avro sink、File Roll sink、HBase sink,。

​ Flume Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据,在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。

1. File_roll Sink

File_roll sink是将收集到的数据存放在本地文件系统中,根据指定的时间生成新的文件用来保存数据。

# file_role sink

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/root/file_role
a1.sinks.k1.sink.rollInterval=60
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

rollInterval=60:每隔60s滚动生成一个文件。

创建数据输出目录

mkdir -p /root/file_role

启动flume agent a1 服务端

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./file_roll.agent -Dflume.root.logger=INFO,console

2. hdfs sink

hdfs sink是将flume收集到的数据写入到hdfs中,方便数据可靠的保存。

其中:

sink 输出到hdfs中,默认每10个event 生成一个hdfs文件,hdfs文件目录会根据hdfs.path 的配置自动创建。

sink hdfs 配置参数描述:

名称描述
hdfs.pathhdfs目录路径
hdfs.filePrefix文件前缀。默认值FlumeData
hdfs.fileSuffix文件后缀
hdfs.rollInterval多久时间后close hdfs文件。单位是秒,默认30秒。设置为0的话表示不根据时间close hdfs文件
hdfs.rollSize文件大小超过一定值后,close文件。默认值1024,单位是字节。设置为0的话表示不基于文件大小
hdfs.rollCount写入了多少个事件后close文件。默认值是10个。设置为0的话表示不基于事件个数
hdfs.fileType文件格式, 有3种格式可选择:SequenceFile(默认), DataStream(不压缩) or CompressedStream(可压缩)
hdfs.batchSize批次数,HDFS Sink每次从Channel中拿的事件个数。默认值100
hdfs.minBlockReplicasHDFS每个块最小的replicas数字,不设置的话会取hadoop中的配置
hdfs.maxOpenFiles允许最多打开的文件数,默认是5000。如果超过了这个值,越早的文件会被关闭
hdfs.callTimeoutHDFS操作允许的时间,比如hdfs文件的open,write,flush,close操作。单位是毫秒,默认值是10000
hdfs.codeC压缩编解码器。以下之一:gzip,bzip2,lzo,lzop,snappy
# hdfs sink
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/data/xinniu/output/%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.filePrefix=hainiu-
a1.sinks.k1.hdfs.fileSuffix=.log
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

3. kafka sink

将数据写入到kafka中

# kafka sink
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.topic = hainiu
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动kafka消费者消费hainiu topic中的数据

启动fluem agent

启动flume agent a1 服务端

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./kafkasink.agent -Dflume.root.logger=INFO,console

kafka保存flume收集到的数据,并通过kafka消费者消费到收集到的数据

4. avro sink

将flume收集到的数据通过avro sink序列化出去,通常用于数据跨服服务多级流动。

启动三台机器:

在第一台节点编写agent

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=avro
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 55555

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

第二台节点编写agent

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=avro
a1.sources.r1.bind=11.94.204.87
a1.sources.r1.port=55555

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=avro
a1.sinks.k1.hostname =11.147.251.96
a1.sinks.k1.port = 55555

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

第三台节点编写agent

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=avro
a1.sources.r1.bind=11.147.251.96
a1.sources.r1.port=55555

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

从后往前分别启动三台agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./avro.agent -Dflume.root.logger=INFO,console

测试给第一台flume发送数据,第三台节点打印数据到控制台

4.1 扇出操作

还可以通过avro sink 实现扇出操作:即第一台服务器收集数据,将数据发送到第二台和第三台服务器。

需要修改第一台服务器agent

a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 c2

a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=100

a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555

a1.sinks.k2.type=avro
a1.sinks.k2.hostname = worke-2
a1.sinks.k2.port = 55555

a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

第二台和第三台agent编写如下:

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=avro
a1.sources.r1.bind=11.147.251.96
a1.sources.r1.port=55555

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

从后往前分别启动三台agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./avro.agent -Dflume.root.logger=INFO,console

测试给第一台flume发送数据,第二台和第三台节点打印数据到控制台

4.2 扇入操作

还可以通过avro sink 实现扇入操作:即第一台和第二台手机数据,将数据发送到第三台服务器。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/917848.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++构造函数详解

构造函数详解:C 中对象初始化与构造函数的使用 在 C 中,构造函数是一种特殊的成员函数,它在创建对象时自动调用,用来初始化对象的状态。构造函数帮助我们确保每个对象在被创建时就处于一个有效的状态,并且在不传递任何…

最小的子数组(leetcode 209)

给定一个正整数数组,找到大于等于s的连续的最小长度的区间。 解法一:暴力解法 两层for循环,一个区间终止位置,一个区间起始位置,找到大于等于s的最小区间长度(超时了) 解法二:双指…

应用系统开发(10) 钢轨缺陷的检测系统

涡流检测系统框图 其中信号发生器为一定频率的正弦信号作为激励信号,这个激励信号同时输入给交流电桥中的两个检测线圈,将两个线圈输出的电压差值作为差分信号引出至差分放大电路进行放大,经过放大后信号变为低频的缺陷信号叠加在高频载波上…

结构化需求分析与设计

前言: 感觉书本上和线上课程, 讲的太抽象, 不好理解, 但软件开发不就是为了开发应用程序吗?! 干嘛搞这么抽象,对吧, 下面是个人对于软件开发的看法, 结合我的一些看法, 主打简单易懂, 当然,我一IT界小菜鸟, 对软件开发的认识也很浅显, 这个思维导图也仅仅是现阶段我的看…

列出D3的所有交互方法,并给出示例

D3.js 提供了丰富的交互方法,可以用来增强图表的用户交互体验。以下是一些常用的交互方法及其示例: 1. 鼠标事件 on("mouseover", function) 用途: 当鼠标悬停在元素上时触发。示例:svg.selectAll(".bar").on("mouseover&qu…

【传知代码】VRT_ 关于视频修复的模型

📝个人主页🌹:Eternity._ 🌹🌹期待您的关注 🌹🌹 ❀ VRT_ 关于视频修复的模型 背景介绍:重要性: VRT的重要性和研究背景VRT的背景:VRT的重要性: 视…

软考教材重点内容 信息安全工程师 第 4 章 网络安全体系与网络安全模型

4,1 网络安全体系的主要特征: (1)整体性。网络安全体系从全局、长远的角度实现安全保障,网络安全单元按照一定的规则,相互依赖、相互约束、相互作用而形成人机物一体化的网络安全保护方式。 (2)协同性。网络安全体系依赖于多种安全机制,通过各…

【java】链表:找到成环的起始节点

分析: 定义快慢双指针,在上一篇博客中,分析过若有环,快慢指针一定会相遇。 在这里,想要找到成环的起始节点,我们在快慢指针相遇的时候,让其中一个指针回到开始结点,然后两个指针一步…

使用nossl模式连接MySQL数据库详解

使用nossl模式连接MySQL数据库详解 摘要一、引言二、nossl模式概述2.1 SSL与nossl模式的区别2.2 选择nossl模式的场景三、在nossl模式下连接MySQL数据库3.1 准备工作3.2 C++代码示例3.3 代码详解3.3.1 初始化MySQL连接对象3.3.2 连接到MySQL数据库3.3.3 执行查询操作3.3.4 处理…

Android OpenGL ES详解——立方体贴图

目录 一、概念 二、如何使用 1、创建立方体贴图 2、生成纹理 3、设置纹理环绕和过滤方式 4、激活和绑定立方体贴图 三、应用举例——天空盒 1、概念 2、加载天空盒 3、显示天空盒 4、优化 四、应用举例——环境映射:反射 五、应用举例——环境映射:折射 六、应用…

【模拟仿真】基于区间观测器的故障诊断与容错控制

摘要 本文提出了一种基于区间观测器的故障诊断与容错控制方法。该方法通过构建区间观测器,实现对系统状态的上下边界估计,从而在存在不确定性和外部噪声的情况下进行高效的故障诊断。进一步地,本文设计了一种容错控制策略,以保证…

深度学习-卷积神经网络CNN

案例-图像分类 网络结构: 卷积BN激活池化 数据集介绍 CIFAR-10数据集5万张训练图像、1万张测试图像、10个类别、每个类别有6k个图像,图像大小32323。下图列举了10个类,每一类随机展示了10张图片: 特征图计算 在卷积层和池化层结束后, 将特征…

PHP Switch 语句

<?php switch (expression) {case value1:// 代码块1break;case value2:// 代码块2break;// 更多的 case 语句default:// 如果没有匹配的值&#xff0c;输出这一行 } ?> $color 表示自己的颜色&#xff0c;需要switch循环找到对应的值。 case value : 表示对应的值&am…

Python Plotly 库使用教程

Python Plotly 库使用教程 引言 数据可视化是数据分析中至关重要的一部分&#xff0c;它能够帮助我们更直观地理解数据、发现潜在的模式和趋势。Python 提供了多种数据可视化库&#xff0c;其中 Plotly 是一个功能强大且灵活的库&#xff0c;支持交互式图表的创建。与静态图表…

ubuntu:20.04安装协议逆向工具netzob

创建容器 docker run -d --name ubuntu_env ubuntu:20.04 /bin/bash -c "while true; do sleep 1; done" 63a8f5cf5431a930671ff0e7bb2b667adf001efb05fd7261da244879d2699bec 进入容器 PS E:\src> docker exec -it ubuntu_env /bin/bash 安装常用工具 apt upda…

H3C NX30Pro刷机教程-2024-11-16

H3C NX30Pro刷机教程-2024-11-16 ref: http://www.ttcoder.cn/index.php/2024/11/03/h3c-nx30pro亲测无需分区备份 路由器-新机初始化设置路由器登录密码telnet进入路由器后台 刷机上传uboot到路由器后台在Windows环境下解压后的软件包中打开 tftpd64.exe在NX30Pro环境下通过以…

什么是嵌入式?

目录 一、什么是嵌入式 二、嵌入式系统的特点 &#xff08;一&#xff09;专用性与隐蔽性 &#xff08;二&#xff09;高可靠性与实时性 &#xff08;三&#xff09;资源固定与小型化 三、嵌入式系统的发展历史 &#xff08;一&#xff09;20 世纪 60 年代早期雏形 &am…

学习大数据DAY62 指标计算

客户需求 第一张汇总报表需要的指标 - 决策报表 汇总表 每次计算只有一天的记录 - 大 BOSS: - 全部会员数 新增会员数 - 有效会员数 有效会员占比 - 流失会员数: 倒推一年含一年无消费记录的会员 - 净增有效会员数 - 会员消费级别分类人数 (A >2000 B >1000 < …

快速上手 Vue 3 的高效组件库Element Plus

目录 前言1. 什么是组件&#xff1f;2. 安装与引入 Element Plus2.1 安装 Element Plus2.2 在 main.js 中引入 Element Plus 3. 使用 Element Plus 组件3.1 组件的基本使用3.2 控制组件状态 4. 常用组件实例解析4.1 表单与输入框4.2 表格与分页 5. 组件库的扩展性结语 前言 在…

自动驾驶车载SoC设计功能安全

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 所有人的看法和评价都是暂时的&#xff0c;只有自己的经历是伴随一生的&#xff0c;几乎所有的担忧和畏惧…