大数据-玩转数据-Flume

一、Flume简介

    1. Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集、聚集、移动的服务,Flume只能在Unix环境下运行。
    1. Flume基于流式架构,容错性强,也很灵活简单。
    1. Flume、Kafka用来实时进行数据收集,Spark、Flink用来实时处理数据,impala用来实时查询。

二、Flume角色

2.1、Source

用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel,这个有点类似于Java IO部分的Channel。

2.2、Channel

用于桥接Sources和Sinks,类似于一个队列。

2.3、Sink

从Channel收集数据,将数据写到目标源(可以是下一个Source,也可以是HDFS或者HBase)。

2.4、Event

传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。

三、Flume传输过程

source监控某个文件或数据流,数据源产生新的数据,拿到该数据后,将数据封装在一个Event中,并put到channel后commit提交,channel队列先进先出,sink去channel队列中拉取数据,然后写入到HDFS中。

四、Flume部署及使用

4.1 采集架构

在这里插入图片描述

4.2 Flume安装

4.2.1 下载

apache-flume-1.6.0-bin.tar.gz
链接:https://pan.baidu.com/s/1ySmEEObFtKtyT7GsEldnfA
提取码:436t

4.2.2 安装

Flume的安装非常简单,只需要解压即可
tar -zxvf apache-flume-1.6.0-bin.tar.gz
然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME

在这里,我们使用集群模式,因此,需要把在master节点部署的flume分发到slave节点上:
]# scp -rp apache-flume-1.7.0-bin slave1:KaTeX parse error: Expected 'EOF', got '#' at position 6: PWD ]#̲ scp -rp apache…PWD

4.2.3 测试

采集配置:

vi netcat-logger.conf
# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 描述和配置sink组件:k1
a1.sinks.k1.type = logger
# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动agent去采集数据
启动命令:

bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
-c conf   指定flume自身的配置文件所在目录
-f conf/netcat-logger.con  指定我们所描述的采集方案
-n a1  指定我们这个agent的名字

在这里插入图片描述
先要往agent采集监听的端口上发送数据,让agent有数据可采
发送命令:

安装telnet:

]# yum install telnet
]# telnet anget-hostname port (telnet localhost 44444)

测试输入输出如下图:
在这里插入图片描述
在这里插入图片描述

4.3 Flume配置

1)Flume 配置分析
在这里插入图片描述
Flume 直接读 log 日志的数据,log 日志的格式是 app-yyyy-mm-dd.log。
2)Flume 的具体配置如下:
(1)在/opt/module/flume/conf 目录下创建 file-flume-kafka.conf 文件

vim file-flume-kafka.conf
a1.sources=r1
a1.channels=c1 c2
#configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/src/apache-flume-1.7.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/log/2020-11-03/app.*.log
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.zgjy.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.zgjy.flume.interceptor.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_resource = c1
a1.sources.r1.selector.mapping.topic_action = c2
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c1.kafka.topic = topic_resource
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
# configure channe2
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c2.kafka.topic = topic_action
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

测试日志:
在这里插入图片描述
配置说明如下:
在这里插入图片描述

4.4 Flume 的 ETL 和分类型拦截器

本项目中自定义了两个拦截器,分别是:ETL 拦截器、日志类型区分拦截器。
ETL 拦截器主要作用:过滤时间戳不合法和 Json 数据不完整的日志
日志类型区分拦截器主要作用:将启动日志和事件日志区分开来,方便发往 Kafka 的不 同 Topic。

1)创建 Maven 工程 flume-interceptor
2)创建包名:com.zgjy.flume.interceptor
3)在 pom.xml 文件中添加如下配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.zgjy</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.41</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.3</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>

    </build>

</project>

4)在 com.zgjy.flume.interceptor 包下创建 LogETLInterceptor 类名
Flume ETL 拦截器 LogETLInterceptor实现代码如下:

package 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/123330.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

挑战100天 AI In LeetCode Day05(热题+面试经典150题)

挑战100天 AI In LeetCode Day05&#xff08;热题面试经典150题&#xff09; 一、LeetCode介绍二、LeetCode 热题 HOT 100-72.1 题目2.2 题解 三、面试经典 150 题-73.1 题目3.2 题解 一、LeetCode介绍 LeetCode是一个在线编程网站&#xff0c;提供各种算法和数据结构的题目&am…

JVM GC 垃圾收集器

文章目录 System.gc()内存溢出&#xff08;OOM&#xff09;OOM 的原因 内存泄漏垃圾回收的并行与并发安全点与安全区域 Java 中的引用分类强引用&#xff08;Strong Reference&#xff09;软引用&#xff08;Soft Reference&#xff09;弱引用&#xff08;Weak Reference&#…

基于SSM的小区物业管理系统设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

Flink SQL Regular Join 、Interval Join、Temporal Join、Lookup Join 详解

Flink ⽀持⾮常多的数据 Join ⽅式&#xff0c;主要包括以下三种&#xff1a; 动态表&#xff08;流&#xff09;与动态表&#xff08;流&#xff09;的 Join动态表&#xff08;流&#xff09;与外部维表&#xff08;⽐如 Redis&#xff09;的 Join动态表字段的列转⾏&#xf…

Linux 入门

Linux 入门 1&#xff1a;linux 用户 root 用户 &#xff1a;也叫超级用户&#xff0c;UID0&#xff0c;其权限最高。系统用户&#xff1a;也叫虚拟用户&#xff0c;UID 1-999普通用户: UID1000-60000, 可以登录系统,操作自己目录下的文件. 1.1:用户操作命令 切换用户: su …

STM32外部中断大问题

问题&#xff1a;一直进入中断&#xff0c;没有触发信号&#xff0c;也一直进入。 描述&#xff1a;开PA0为外部中断&#xff0c;刚刚很好&#xff0c;一个触发信号一个中断&#xff0c;中断函数没有丢&#xff0c;也没有抢跑&#xff0c;开PA1为外部中断也是&#xff0c;都很好…

nfs配置

1.NFS介绍 NFS就是Network File System的缩写&#xff0c;它最大的功能就是可以通过网络&#xff0c;让不同的机器、不同的操 作系统可以共享彼此的文件。 NFS服务器可以让PC将网络中的NFS服务器共享的目录挂载到本地端的文 件系统中&#xff0c;而在本地端的系统中来看&#…

C++初阶 | [二] 类和对象(上)

摘要&#xff1a;class&#xff0c;成员函数&#xff0c;成员变量&#xff0c;类的大小&#xff0c;this 指针 C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解问题的步骤&#xff0c;通过函数调用逐步解决问题。 C是基于面向对象的&#xff0c;关注的是对象…

CSS 网页布局

网页布局有很多种方式&#xff0c;一般分为以下几个部分&#xff1a;头部区域、菜单导航区域、内容区域、底部区域&#xff1a; 1&#xff09;、头部区域位于整个网页的顶部&#xff0c;一般用于设置网页的标题或者网页的logo。 <style> body { margin: 0; } /* 头部样…

【Redis】list常用命令内部编码使用场景

文章目录 前置知识列表类型的特点 命令LPUSHLPUSHXRPUSHRPUSHXLRANGELPOPRPOPLINDEXLREMLINSERTLTRIMLSETLLEN 阻塞版本命令BLPOPBRPOP 命令总结内部编码测试内部编码 使用场景消息队列分频道的消息队列 模拟栈和队列 前置知识 列表类型是⽤来存储多个有序的字符串&#xff0c…

软件测试|Monkey基本参数介绍

说到android移动端稳定性测试&#xff0c;大家通常会想到android系统自动Monkey小猴子&#xff0c;通过Monkey命令模拟用户触摸点击屏幕、滑动、系统按键等操作来对设备上的app进行压力测试&#xff0c;来测试应用的稳定性和健壮性。 下面就说说monkey常用参数的用法~~ 1、-h…

Spring笔记(二)(黑马)(AOP面向切面编程)

01、AOP 简介 1.1 AOP的概念 AOP&#xff0c;Aspect Oriented Programming&#xff0c;面向切面编程&#xff0c;是对面向对象编程OOP的升华。OOP是纵向对一个事物的抽象&#xff0c;一个对象包括静态的属性信息&#xff0c;包括动态的方法信息等。而AOP是横向的对不同事物的…

spdk用户态块层详解

先通过回顾内核态的通用块层来详细介绍SPDK通用块层&#xff0c;包括通用块层的架构、核心数据结构、数据流方面的考量等。最后描述基于通用块层之上的两个特性&#xff1a;一是逻辑卷的支持&#xff0c;基于通用块设备的Blobstore和各种逻辑卷的特性&#xff0c;精简配置&…

C# OpenCvSharp 去除文字中的线条

效果 中间过程效果 项目 代码 using OpenCvSharp; using System; using System.Drawing; using System.Windows.Forms; using static System.Net.Mime.MediaTypeNames;namespace OpenCvSharp_Demo {public partial class frmMain : Form{public frmMain(){InitializeComponent…

Spring面试题:(四)Spring Bean生命周期

Bean生命周期的阶段 实例化初始化完成销毁 IoC容器实例化Bean的流程 Bean定义 Bean工厂处理 反射实例化Bean 初始化 完成存储到单例池 Bean生命周期 Bean初始化话过程 属性填充aware接口BeanPostProcessor前置处理InitialzingBean接口初始化方法自定义init方法BeanPost…

Oracle(15)Managing Users

目录 一、基础知识 1、Users and Security 用户和安全 2、Database Schema 3、Checklist for Creating Users创建用户步骤 二、基础操作 1、创建一个用户 2、OS Authentication 操作系统身份验证 3、Dropping a User 删除用户 4、Getting User Information 获取用户信…

搭建自己的MQTT服务器,实现设备上云(Ubuntu+EMQX)

一、EMQX介绍 这篇文章教大家在ECS云服务器上部署EMQX,搭建自己私有的MQTT服务器,配置EMQX实现设备上云,设备数据转发,存储;服务器我采用的华为云的ECS服务器,系统选择Ubuntu系统。 Windows版本的看这里: https://blog.csdn.net/xiaolong1126626497/article/details/1…

经验模态分解(Empirical Mode Decomposition,EMD)(附代码)

代码原理 EMD&#xff08;Empirical Mode Decomposition&#xff09;&#xff0c;也称为经验模态分解&#xff0c;是一种将非线性和非平稳信号分解成多个本征模态函数&#xff08;Intrinsic Mode Functions&#xff0c;简称IMF&#xff09;的方法。 EMD的基本原理是通过一系列…

掌动智能:UI自动化测试工具产品功能和优势

UI自动化测试工具是一种软件工具&#xff0c;用于模拟用户与应用程序的交互&#xff0c;检查应用程序的用户界面是否按预期工作。这些工具允许开发人员编写测试脚本&#xff0c;以模拟用户操作&#xff0c;例如点击按钮、输入文本、导航菜单等&#xff0c;然后验证应用程序的响…

哪款手机便签软件支持存储录音文件并支持转文字?

手机便签类软件带有存储录音转文字功能是比较实用的&#xff0c;很多人通常会整理很多录音类型的文件&#xff0c;录音文件整合在一起后&#xff0c;后续有需要可以逐条点开播放收听。尤其是在工作中&#xff0c;当领导说一些重点时&#xff0c;大家无法借助灵活的大脑来成功的…