03数据仓库Flume

Flume 功能

在这里插入图片描述
Flume主要作用,就是实时读取服务器本地磁盘数据,将数据写入到 HDFS。

Flume是 Cloudera提供的高可用,高可靠性,分布式的海量日志采集、聚合和传输的系统工具。

Flume 架构

Flume组成架构如下图所示:
在这里插入图片描述

Agent

每个 Agent 代表着一个 JVM 进程,它以事件的方式将数据从源头送至目的地。
Agent 由 3 个部分组成,Source、Channel、Sink。

  • Source:Source是负责接收数据到Flume Agent 的组件,Source 组件可以处理各种类型、各种格式的日志数据。
  • Sink:Sink不断轮询Channel中的事件并且批量移除他们,并将这些数据批量写入到存储或者索引系统,或被发送到另一个Flume Agent。
  • Channel:Channel是位于Source 和Sink 之间的缓冲区。因此,Channel允许 Source和 Sink 有不同的运行速率。Channel 是线程安全的, 可以同时处理几个 Source 的写操作和多个 Sink 的读操作。
    Flume自带两种 Channel:Memory Channel 和File Channel。
    Memory Channel 是一个内存队列,Source将事件写入其尾部,Sink从其头部读取事件。 Memory Channel 将源写入的事件存储在堆上。由于它将所有数据存储在内存中,因此提供了高吞吐量。 它最适合那些不担心数据丢失的流。 它不适合涉及数据丢失的数据流。
    File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
  • Event:Flume 传输数据的基本单元,数据以 Event的形式将数据从源头送至目的地。Event由Header 和 Body 组成,Header用来存放event 的属性,为 K-V结构,Body 用于存放数据,为字节数组结构。

安装 flume

话不多说,直接开始安装 flume。

  1. 前往 http://archive.apache.org/dist/flume/ , 选择1.9.0
  2. 将apache-flume-1.9.0-bin.tar.gz使用 sftp上传到linux的/opt/software目录下
    在这里插入图片描述
  3. 解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
[logan@hadoop101 hadoop]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
  1. 创建软链接
cd /opt/module
ln -snf flume-1.9.0/ flume
  1. 删除jar 包,否则会报错
cd /opt/module/flume
rm lib/guava-11.0.2.jar

注意删除guava,一定要配置 Hadoop 环境变量,否则会报错:

Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Lists
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 1 more
  1. 修改log4j.properties配置文件
[logan@hadoop101 flume]$ vim conf/log4j.properties 
# 注意是修改内容
flume.log.dir=/opt/module/flume/logs
  1. 验证 flume
[logan@hadoop101 flume]$ /opt/module/flume/bin/flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9

Flume日志采集

配置解析

需要采集的日志文件分布在hadoop101,hadoop102两台日志服务器,故需要在hadoop101,hadoop102上配置日志采集 Flume。Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到 Kafka。
此处选择TailDirSource和 KafkaChannel。

  • TailDirSource优势:支持断点续传、多目录。
  • KafkaChannel优势:省去了 Sink,提高了效率。
  • 日志采集 Flume关键配置 :
    在这里插入图片描述

Flume日志采集配置

  1. 创建Flume 配置文件
[logan@hadoop101 flume]$ pwd
/opt/module/flume
[logan@hadoop101 flume]$ mkdir job
[logan@hadoop101 flume]$ vim job/file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.logan.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1
  1. 编写拦截器
    • 创建Maven工程flume-interceptor
    • 创建包:com.logan.gmall.flume.interceptor
    • 在pom.xml文件中添加如下配置
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>
    
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
    • 在com.logan.gmall.flume.utils包下创建JSONUtil类
    package com.logan.gmall.flume.utils;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONException;
    
    public class JSONUtil {
        /**
         * 通过异常判定是否是JSON字符串
         */
        public static boolean isJSONValidate(String input){
            try {
                JSON.parseObject(input);
                return true;
            } catch (JSONException e) {
                return false;
            }
        }
    }
    
    • 在com.logan.gmall.flume.interceptor包下创建ETLInterceptor类
    package com.logan.gmall.flume.interceptor;
    
    import com.logan.gmall.flume.utils.JSONUtil;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.List;
    
    public class ETLInterceptor implements Interceptor {
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
            //1、获取body当中的数据并转成字符串
            byte[] body = event.getBody();
            String log = new String(body, StandardCharsets.UTF_8);
            //2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null
            if (JSONUtil.isJSONValidate(log)) {
                return event;
            } else {
                return null;
            }
        }
    
    
        @Override
        public List<Event> intercept(List<Event> list) {
    
            Iterator<Event> iterator = list.iterator();
    
            while (iterator.hasNext()){
                Event next = iterator.next();
                if(intercept(next)==null){
                    iterator.remove();
                }
            }
    
            return list;
        }
    
        public static class Builder implements Interceptor.Builder{
    
            @Override
            public Interceptor build() {
                return new ETLInterceptor();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
    
    
        @Override
        public void close() {
    
        }
    }
    
    • 打包
      在这里插入图片描述
    • 将打包文件放到hadoop101 的/opt/moduie/flume/lib文件夹下

采集测试

  1. 启动Zookeeper、Kafka集群
  2. 启动 hadoop101上的日志采集Flume
[logan@hadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
  1. 启动一个Kafka的Console-Consumer
[logan@hadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topic_log
  1. 生成模拟数据
[logan@hadoop101 module]$ mkdir -p /opt/module/applog/log/
[logan@hadoop101 module]$ vim /opt/module/applog/log/app.2023-12-02.log
{"common":{"ar":"500000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs","mid":"mid_552171","os":"iOS 13.3.1","uid":"919","vc":"v2.1.134"},"displays":[{"display_type":"activity","item":"1","item_type":"activity_id","order":1,"pos_id":5},{"display_type":"activity","item":"2","item_type":"activity_id","order":2,"pos_id":5},{"display_type":"query","item":"19","item_type":"sku_id","order":3,"pos_id":4},{"display_type":"query","item":"3","item_type":"sku_id","order":4,"pos_id":2},{"display_type":"query","item":"5","item_type":"sku_id","order":5,"pos_id":2},{"display_type":"promotion","item":"19","item_type":"sku_id","order":6,"pos_id":4},{"display_type":"query","item":"14","item_type":"sku_id","order":7,"pos_id":2},{"display_type":"query","item":"9","item_type":"sku_id","order":8,"pos_id":2},{"display_type":"promotion","item":"35","item_type":"sku_id","order":9,"pos_id":1}],"page":{"during_time":9853,"page_id":"home"},"ts":1672512476000}
{"actions":[{"action_id":"favor_add","item":"9","item_type":"sku_id","ts":1672512480386},{"action_id":"get_coupon","item":"2","item_type":"coupon_id","ts":1672512483772}],"common":{"ar":"500000","ba":"iPhone","ch":"Appstore","is_new":"1","md":"iPhone Xs","mid":"mid_552171","os":"iOS 13.3.1","uid":"919","vc":"v2.1.134"},"displays":[{"display_type":"promotion","item":"19","item_type":"sku_id","order":1,"pos_id":4},{"display_type":"promotion","item":"14","item_type":"sku_id","order":2,"pos_id":5},{"display_type":"query","item":"21","item_type":"sku_id","order":3,"pos_id":1},{"display_type":"query","item":"11","item_type":"sku_id","order":4,"pos_id":2},{"display_type":"promotion","item":"28","item_type":"sku_id","order":5,"pos_id":1}],"page":{"during_time":10158,"item":"9","item_type":"sku_id","last_page_id":"home","page_id":"good_detail","source_type":"promotion"},"ts":1672512477000}
  1. 观察kafka是否有消费到数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/209563.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

力扣232-用栈实现队列

文章目录 力扣232-用栈实现队列示例代码实现总结收获 力扣232-用栈实现队列 示例 代码实现 class MyQueue {Deque<Integer> instack;Deque<Integer> outstack ;public MyQueue() {instacknew ArrayDeque<Integer>();outstacknew ArrayDeque<Integer>(…

scrapy介绍,并创建第一个项目

一、scrapy简介 scrapy的概念 Scrapy是一个Python编写的开源网络爬虫框架。它是一个被设计用于爬取网络数据、提取结构性数据的框架。 Scrapy 使用了Twisted异步网络框架&#xff0c;可以加快我们的下载速度。 Scrapy文档地址&#xff1a;http://scrapy-chs.readthedocs.io/z…

西南科技大学模拟电子技术实验三(BJT单管共射放大电路测试)预习报告

一、计算/设计过程 说明:本实验是验证性实验,计算预测验证结果。是设计性实验一定要从系统指标计算出元件参数过程,越详细越好。用公式输入法完成相关公式内容,不得贴手写图片。(注意:从抽象公式直接得出结果,不得分,页数可根据内容调整) 二、画出并填写实验指导书上…

前缀和 LeetCode1094 拼车

1094. 拼车 车上最初有 capacity 个空座位。车 只能 向一个方向行驶&#xff08;也就是说&#xff0c;不允许掉头或改变方向&#xff09; 给定整数 capacity 和一个数组 trips , trip[i] [numPassengersi, fromi, toi] 表示第 i 次旅行有 numPassengersi 乘客&#xff0c;接…

【面经八股】搜广推方向:面试记录(三)

【面经&八股】搜广推方向:面试记录(三) 文章目录 【面经&八股】搜广推方向:面试记录(三)1. 编程题1.1 大数乘法1.2 大数加法2. 项目介绍3. 有了解过的广告推荐模型吗4. 广告模型回归问题1. 编程题 上来直接写编程题,有点儿懵逼。 1.1 大数乘法 可以参考 该博…

【实战教程】PHP如何轻松对接腾讯云COS,实现文件上传下载?

腾讯云提供了一系列丰富的云服务&#xff0c;其中包括对象存储&#xff08;Cloud Object Storage&#xff0c;简称COS&#xff09;&#xff0c;它是一种高可靠性、可扩展性强的云存储服务。本文将介绍如何使用PHP对接腾讯云COS存储服务&#xff0c;实现文件的上传和下载功能。 …

《地理信息系统原理》笔记/期末复习资料(7. 空间分析)

目录 7. 空间分析 7.1 空间分析的内容与步骤 7.2 数据检索及表格分析 7.2.1 属性统计分析 7.2.2 布尔逻辑查询 7.2.3 空间数据库查询语言 7.2.4 重分类&#xff0c;边界消除与合并 7.3 叠置分析 7.3.1 栅格系统的叠加分析 7.3.2 矢量系统的叠加分析&#xff08;拓扑叠…

idea一步一步安装教程

目录 一、什么是idea 二、IDEA安装步骤 1、进入官网下载 2、 下载安装包 3、安装 一、什么是idea IntelliJ IDEA是由JetBrains公司开发的一款集成开发环境&#xff08;IDE&#xff09;&#xff0c;主要用于Java、Kotlin、Groovy等编程语言的开发。IDE是指一种集成了开发者…

Redis 入门、基础。(五种基本类型使用场景)

文章目录 1. 概况1.1 认识 NoSQL1.1.1 查询方式1.1.2 事务1.1.3 总结 2. 认识 Redis4. Redis 常见命令4.1 Redis 数据结构介绍4.2 Redis 通用命令4.3 Redis 命令之 String 命令4.4 Redis 命令的层级结构4.5 Redis 命令之 Hash 命令4.6 Redis 命令之 List 命令4.7 set 唯一不排序…

配备与业务相适应的质检人员。质检人员应当是测绘专业技术人员

配备与业务相适应的质检人员。质检人员应当是测绘专业技术人员 任命质检人员的任命文件&#xff1b;质检人员的专业技术职称。

深入理解Servlet(中)

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 上篇有一张图&#xff…

L1-012:计算指数

⭐题目描述⭐ 真的没骗你&#xff0c;这道才是简单题 —— 对任意给定的不超过 10 的正整数 n&#xff0c;要求你输出 2n。不难吧&#xff1f; 输入格式&#xff1a; 输入在一行中给出一个不超过 10 的正整数 n。 输出格式&#xff1a; 在一行中按照格式 2^n 计算结果 输出 2n…

DELL EMC unity 存储系统日志收集方法

对于一些非简单的硬件故障&#xff0c;解决故障最有效、最快速的方法就是收集日志&#xff0c;而不是瞎搞。常见的乱搞方法就是 1. reimage系统‘ 2. 更换控制器&#xff1b;3&#xff0c; 重启。 本文详细介绍了图形界面GUI和命令行CLI下如何收集DELL EMC Unity日志的方法和常…

如何在财税行业查找批量客户?

现在市场上代记账公司也不算少&#xff0c;做过这行的都知道&#xff0c;最初呢行业竞争不强&#xff0c;都是靠地推、老客户转介绍&#xff0c;或者长期以往的蹲守各个地区的工商注册服务中心&#xff0c;找那些才注册企业的老板或者创业者。但是&#xff0c;随着市场经济的发…

【前端】利用正则生成目录,附加解决a链接锚点偏移

前言 从html字符串中提取出来目录。 目标和源内容都很明确&#xff0c;源内容是html字符串&#xff0c;提取目标是html字符串中h1~h6元素和其闭合标签中间的内容。 思路 分析 获取html字符串 第一步要获取html字符串内容。 观察html字符串 第二步&#xff0c; 观察html字…

鸿蒙开发笔记

最近比较火&#xff0c;本身也是做前端的&#xff0c;就抽空学习了下。对前端很友好 原视频地址&#xff1a;黑马b站鸿蒙OS视频 下载安装跟着视频或者文档就可以了。如果你电脑上安装的有node&#xff0c;但是开发工具显示你没安装&#xff0c;不用动咱们的node&#xff0c;直…

《凤凰项目》读书笔记

文章目录 一、书名和作者二、书籍概览2.1 主要论点和结构2.2 目标读者和应用场景 三、核心观点与主题3.1 DevOps的核心原则与文化变革3.2 持续交付与自动化3.3 变更管理与风险控制3.4 关键绩效指标与持续改进 四、亮点与启发4.1 最有影响的观点4.2 对个人专业发展的启示 五、批…

安装以及使用 stylepro_artistic 所遇问题解决

根据https://github.com/PaddlePaddle/PaddleHub/blob/develop/docs/docs_ch/get_start/windows_quickstart.md 安装 hub install stylepro_artistic1.0.1 出现ImportError: cannot import name ‘Constant’ from ‘paddle.fluid.initializer’&#xff0c;提示在File: "…

谈谈Listener

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 Tomcat三大组件&#x…

图书馆座位预约时间冲突提示(前后端全) 前端elementUI 时间选择器只显示时和分,SQL实现时间冲突判断

背景 帮客户定制项目&#xff0c;要实现图书馆预约座位的功能。 功能描述如下&#xff1a;学生选择开始时间和结束时间&#xff0c;只选择小时和分钟&#xff0c;提交预约后&#xff0c;如果该时间有冲突提示学生修改预约时间。 问题 前端样式选择的是elmentUI&#xff0c;但…