3-Flume之拦截器与GangLia监控

Flume

Interceptor

概述

  1. Interceptor(拦截器)本身是Source的子组件之一,可以对数据进行拦截、过滤、替换等操作
  2. 不同于Selector,一个Source上可以配置多个Interceptor,构成拦截器链。需要注意的是,后一个拦截器不能和前一个拦截器的规则相反!

Timestamp Interceptor

  1. 在Event的headers中添加一个timestamp字段来表示数据被收集的时间戳(单位是毫秒!)

  2. 案例:Event的header中自动添加上时间戳

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8080
    # 给拦截器起名
    a1.sources.s1.interceptors = i1
    # 配置Timestamp Interceptor
    # 类型必须是timestamp
    a1.sources.s1.interceptors.i1.type = timestamp
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    a1.sinks.k1.type = logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    
  3. Timestamp Interceptor结合HDFS Sink可以实现数据的按时间段存放。文件名后会添加上年月日,并且每日的Event输出到当天对应的文件中。

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8080
    a1.sources.s1.interceptors = i1
    a1.sources.s1.interceptors.i1.type = timestamp
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flume_data/logdate=%Y-%m-%d
    a1.sinks.k1.hdfs.rollInterval = 3600
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 1000000000
    a1.sinks.k1.hdfs.fileType = DataStream
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

Host Interceptor

  1. 在Event的headers中添加一个host字段,用于标记这个数据的来源主机

  2. 案例

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8080
    # 给拦截器起名。多个拦截器的命名必须在一行。不能写在两行
    a1.sources.s1.interceptors = i1 i2
    # 配置Timestamp Interceptor
    # 类型必须是timestamp
    a1.sources.s1.interceptors.i1.type = timestamp
    # 配置Host Interceptor
    # 类型必须是host
    a1.sources.s1.interceptors.i2.type = host
    # 使用IP还是主机名
    a1.sources.s1.interceptors.i2.useIP = false
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    a1.sinks.k1.type = logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

Static Interceptor

  1. 在Event的headers中添加指定的字段以及指定的值,实际过程中用于做标记

  2. 案例 Event的header中自动添加 class:flume这一key value

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8080
    # 给拦截器起名
    a1.sources.s1.interceptors = i1 i2 i3
    # 配置Timestamp Interceptor
    # 类型必须是timestamp
    a1.sources.s1.interceptors.i1.type = timestamp
    # 配置Host Interceptor
    # 类型必须是host
    a1.sources.s1.interceptors.i2.type = host
    # 使用IP还是主机名
    a1.sources.s1.interceptors.i2.useIP = true
    # 配置Static Interceptor
    # 类型必须是static
    a1.sources.s1.interceptors.i3.type = static
    # 指定键
    a1.sources.s1.interceptors.i3.key = class
    # 指定值
    a1.sources.s1.interceptors.i3.value = flume
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    a1.sinks.k1.type = logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

UUID Interceptor

  1. UUID计算产生一串编号,由于编号位数比较多,因此几乎不太可能产生重复的编号,因此实际过程中经常使用UUID作为唯一标记

  2. UUID Interceptor是在Event的headers中添加一个id字段来标记这个数据的唯一性

  3. 案例

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8080
    # 给拦截器起名 
    a1.sources.s1.interceptors = i1 i2 i3 i4
    # 配置Timestamp Interceptor
    # 类型必须是timestamp
    a1.sources.s1.interceptors.i1.type = timestamp
    # 配置Host Interceptor
    # 类型必须是host
    a1.sources.s1.interceptors.i2.type = host
    # 使用IP还是主机名
    a1.sources.s1.interceptors.i2.useIP = true
    # 配置Static Interceptor
    # 类型必须是static
    a1.sources.s1.interceptors.i3.type = static
    # 指定键
    a1.sources.s1.interceptors.i3.key = class
    # 指定值
    a1.sources.s1.interceptors.i3.value = flume
    # 配置UUID Interceptor
    # 类型是UUIDInterceptor$Builder
    a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
    
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    a1.sinks.k1.type = logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

Search And Replace Interceptor

  1. 在使用的时候需要指定正则表达式,会将满足正则表达式的数据替换为指定形式的数据。在替换的时候,只替换body中的数据,不替换headers中的数据!!!

  2. 案例

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.s1.type = http
    a1.sources.s1.port = 8888
    a1.sources.s1.interceptors = i1
    # 类型必须是search_replace
    a1.sources.s1.interceptors.i1.type = search_replace
    # 指定正则表达式
    a1.sources.s1.interceptors.i1.searchPattern = [0-9]
    # 指定替换形式
    a1.sources.s1.interceptors.i1.replaceString = *
    
    a1.channels.c1.type = memory
    
    a1.sinks.k1.type = logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

Regex Filtering Interceptor

  1. 在使用的时候需要指定正则表达式。通过属性excludeEvents来决定过滤方式。如果excludeEvents的值为true,表示符合正则表达式形式的数据会被过滤掉;如果excludeEvents的值为false,那么表示不符合正则表达式形式的数据会被过滤掉

  2. 案例

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8888
    a1.sources.s1.interceptors = i1
    # 类型必须是regex_filter
    a1.sources.s1.interceptors.i1.type = regex_filter
    # 指定正则表达式
    a1.sources.s1.interceptors.i1.regex = .*[0-9].*
    # 指定过滤方式
    a1.sources.s1.interceptors.i1.excludeEvents = true
    
    a1.channels.c1.type = memory
    
    a1.sinks.k1.type = logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

Custom Interceptor

  1. 定义一个类实现Interceptor接口,同时还需要提供内部类Builder
//模拟TimeStamp
public class AuthInterceptor implements Interceptor {


    @Override
    public void initialize() {

    }

    //单条处理
    @Override
    public Event intercept(Event event){

        Map<String, String> headers = event.getHeaders();
        if(headers.containsKey("time")||headers.containsKey("timestamp"))  return event;
        //时间格式可自定义
        headers.put("time", String.valueOf(System.currentTimeMillis()));
        return event;
    }

  
    //按批处理
    @Override
    public List<Event> intercept(List<Event> list) {
        // 定义集合存储处理之后的数据
        List<Event> es = new ArrayList<>();
        for (Event event : es) {
            Event e = intercept(event);
            es.add(e);
        }
        return es;
    }

    @Override
    public void close() {

    }


//这个权限修饰符手动改为public  。  默认为default,外部程序访问不到
public static class AuthBuilder implements Builder {

    // 通过这个函数来获取当前拦截器对象
    @Override
    public Interceptor build() {
        return new AuthInterceptor();
    }

    // 获取配置
    @Override
    public void configure(Context context) {
    }
}
}
  1. 打成jar包,然后放到Flume的lib目录下

    cd /opt/software/flume-1.11.0/lib/
    rz
    
  2. 格式文件

    cd ../data
    

    在文件中添加

    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = 0.0.0.0
    a1.sources.s1.port = 8080
    a1.sources.s1.interceptors = i1
    a1.sources.s1.interceptors.i1.type = com.fesco.in.AuthInterceptor$AuthBuilder
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000
    
    a1.sinks.k1.type = logger
    
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    

**拦截器的功能:**自动给headers加时间戳。将events按时间在channel中存储在不同文件。给Headers中自动添加该条events的来源主机号;自定义一对kv添加到headers中。将body中的内容按正则匹配并调换。 将body中的内容过滤。 生成UUID唯一标识。

其他

执行流程

Flume执行流程
  1. Source采集数据,数据被采集之后,会交给ChannelProcessor来处理
  2. ChannelProcessor收到数据之后,会将数据交给Interceptor来进行过滤、拦截、替换等操作。需要注意的是,可以存在多个Interceptor,构成拦截器链
  3. Interceptor处理完数据之后,会将数据交给Selector来进行分发。Selector有3种模式:replicatingmultipexingload balancing。根据指定的模式,将数据分发给对应的Channel
  4. Channel收到数据之后,会将数据推送给SinkProcessor。SinkProcessor本质上是一个SinkGroup,需要将一个或者多个Sink绑定到一个组中,支持三种模式:defaultfailoverload balancing
  5. SinkProcessor收到数据之后,将数据按照指定模式推送给Sink,Sink将数据写到目的地

扩展:Flume监控 - Ganglia

概述
  1. 实际过程中,可以使用Ganglia监控Flume的数据流。Ganglia是Berkeley发起的一个开源的集群监控项目,可以检测数以千计的节点的性能
  2. Ganglia包含三个模块
    1. gmond(Ganglia Monitoring Daemon):轻量级的监控服务,需要监控哪一个节点的性能,就在这个节点上安装gmond服务,可以监控当前节点(系统)的各种指标数据:CPU、内存、磁盘、网络等信息
    2. gmetad(Ganglia Meta Daemon):轻量级的汇合服务,可以将监控信息以RRD格式来存储到磁盘上
    3. gweb(Ganglia Web):Ganglia提供的轻量级的可视化服务,本身是使用PHP来开发的,提供了WEB页面,能够使得用户较为直观和简便的查看到节点的性能
安装
  1. 三个节点上都需要安装httpd和php服务

    yum -y install httpd php
    
  2. 三个节点上都需要安装rrd服务

    yum -y install rrdtool perl-rrdtool rrdtool-devel apr-devel
    
  3. 三个节点依赖Epel

    yum -y install epel-release
    
  4. 第一个节点上安装Ganglia

    yum -y install ganglia-gmetad ganglia-gmond ganglia-web
    
  5. 其他两个节点上安装gmond服务

    yum -y install ganglia-gmond
    
  6. 第一个节点上修改ganglia.conf

    vim /etc/httpd/conf.d/ganglia.conf
    

    文件修改如下

    <Location /ganglia>
      # Require local
      # Require ip 10.1.2.3
      # Require host example.org
      Require all granted
    </Location>
    
  7. 第一个节点上修改gmetad.conf

    vim /etc/ganglia/gmetad.conf
    

    修改data_source属性的值

    data_source "flume_cluster" hadoop01
    
  8. 三个节点上修改gmond.conf

    vim /etc/ganglia/gmond.conf
    

    修改cluster中的属性值

    cluster {
      name = "flume_cluster"
      owner = "unspecified"
      latlong = "unspecified"
      url = "unspecified"
    }
    

    修改udp_send_channel中的属性值

    udp_send_channel {
      #bind_hostname = yes # Highly recommended, soon to be default.
                           # This option tells gmond to use a source address
                           # that resolves to the machine's hostname.  Without
                           # this, the metrics may appear to come from any
                           # interface and the DNS names associated with
                           # those IPs will be used to create the RRDs.
      # mcast_join = 239.2.11.71
      # 将监控的信息发送到指定的节点收集
      host = hadoop01
      port = 8649
      ttl = 1
    }
    

    修改udp_recv_channel中的属性值

    udp_recv_channel {
      # mcast_join = 239.2.11.71
      port = 8649
      # 接收任意主机的连接
      bind = 0.0.0.0
      retry_bind = true
      # Size of the UDP buffer. If you are handling lots of metrics you really
      # should bump it up to e.g. 10MB or even higher.
      # buffer = 10485760
    }
    
  9. 三个节点启动gmond服务

    systemctl start gmond
    # 查看进程是否启动
    ps -ef | grep -i gmond
    
  10. 在一个节点上启动gmetad和httpd服务

    :systemctl start gmetad
    systemctl start httpd
    
  11. 在浏览器中输入http://IP/ganglia/

监控Flume
  1. 修改Flume的配置

    # 进入Flume的配置目录
    cd /opt/software/flume-1.11.0/conf/
    # 复制文件
    cp flume-env.sh.template flume-env.sh
    # 编辑文件
    vim flume-env.sh
    # 在文件尾部添加
    export JAVA_HOME=/opt/software/jdk1.8
    export JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.root.monitoring.hosts=hadoop01:8649 -Xms100m -Xmx200m"
    # 保存退出,生效
    source flume-env.sh
    
  2. 启动Flume

    cd ../data
    
    //type =ganglia      type和=之间有空格会报错
    
    //开启监控后,在执行之间的格式文件,就要用这个格式的指令
    flume-ng agent -n a1 -c $FLUME_HOME/conf -f basic.properties -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=hadoop01:8649
    
  3. 属性解释

    属性解释
    ChannelCapacityChannel的容量
    ChannelFillPercentageChannel的利用率
    ChannelSizeChannel的大小
    EventPutAttemptCountPutList向Channel尝试推送数据的次数
    EventPutSuccessCountPutList向Channel推送数据成功的次数
    EventTakeAttemptCountTakeList向Sink推送数据的次数
    EventTakeSuccessCountTakeList向Sink推送数据成功的次数
    StartTime起始时间
    StopTime结束时间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/491299.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

zookeeper面试题

文章目录 ZooKeeper 是什么&#xff1f;ZooKeeper 提供什么&#xff1f;1. 文件系统2. 通知机制 ZooKeeper 文件系统四种类型的 znode1. PERSISTENT (持久化目录节点)2. PERSISTENT_SEQUENTIAL (持久化顺序编号目录节点)3. EPHEMERAL (临时目录节点)4. EPHEMERAL_SEQUENTIAL (临…

flutter 弹窗之系列二

自定义弹窗&#xff08;含底部抽屉&#xff09;Dialog class MyHomePage extends StatefulWidget {const MyHomePage({super.key, required this.title});final String title;overrideState<MyHomePage> createState() > _MyHomePageState(); }class _MyHomePageState…

教程3_图像的轮廓

目录 目标 1. 特征矩 2、轮廓质心 3. 轮廓面积 4. 轮廓周长 5. 轮廓近似 6. 轮廓凸包 7. 边界矩形 7.1.直角矩形 7.2. 旋转矩形 8. 最小闭合圈 9. 拟合一个椭圆 10. 拟合直线 目标 在本文中&#xff0c;我们将学习 - 如何找到轮廓的不同特征&#xff0c;例如面积&…

【数据分享】1929-2023年全球站点的逐年平均露点(Shp\Excel\免费获取)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、能见度等指标&#xff0c;说到气象数据&#xff0c;最详细的气象数据是具体到气象监测站点的数据&#xff01; 有关气象指标的监测站点数据&#xff0c;之前我们分享过1929-2023年全球气象站…

某云盘encryptMsg 加密之自动化扣webpack

前言 本文主要介绍了webpack半自动化扣代码的流程。不需要ast基础。也不涉及加密的过程。网站硬扣的话很麻烦。特地挑一个模块多的网站去搞。着重介绍于webpack工具的使用与实现方式。 这里的话。本人也开源到了github上了。本人代码写的烂。大佬勿喷&#xff0c; https://gi…

PriorityQueue详细解读

咦咦咦&#xff0c;各位小可爱&#xff0c;我是你们的好伙伴——bug菌&#xff0c;今天又来给大家普及Java SE相关知识点了&#xff0c;别躲起来啊&#xff0c;听我讲干货还不快点赞&#xff0c;赞多了我就有动力讲得更嗨啦&#xff01;所以呀&#xff0c;养成先点赞后阅读的好…

Quartus II仿真出现错误

ModelSim executable not found in D:/intelFPGA/18.0/quartus/bin64/modelsim_ase/win32aloem/ Error. 找不到modelsim地址&#xff0c;原来是我下载了.exe,但没有双击启动安装ase文件夹呀&#xff01;&#xff01;&#xff01;&#xff01;晕&#xff0c;服了我自己

【C++11】thread线程库

【C11】thread线程库 目录 【C11】thread线程库thread类的简单介绍函数指针lambda表达式常用在线程中 线程函数参数join与detach利用RAII思想来自动回收线程 原子性操作库(atomic)atomic中的load函数&#xff1a;atomic中对变量进行原子操作的一些函数 CAS(Compare-And-Swap)无…

YOLOv5全网独家改进: 红外小目标 | 注意力改进 | 多膨胀通道精炼(MDCR)模块,红外小目标暴力涨点| 2024年3月最新成果

💡💡💡本文独家改进:多膨胀通道精炼(MDCR)模块,解决目标的大小微小以及红外图像中通常具有复杂的背景的问题点,2024年3月最新成果 💡💡💡红外小目标实现暴力涨点,只有几个像素的小目标识别率大幅度提升 改进结构图如下: 收录 YOLOv5原创自研 https://b…

jupyter lab使用虚拟环境

python -m ipykernel install --name 虚拟环境名 --display-name 虚拟环境名然后再启动jupyter lab就行了

Gitea CORS Access-Control-Allow-Origin 的问题

最近我们在想使用我们提供的代码库进行元数据提供的时候&#xff0c;启动的服务报 CORS 问题。 如果你的 Gitea 服务器是直接暴露给外部使用的话&#xff0c;可以在 Gitea 的配置文件中添加下面的配置&#xff1a; [cors] ENABLED true ALLOW_DOMAIN *在完成上面的…

【学习心得】神经网络知识中的符号解释

这里我对我学到的神经网络知识中&#xff0c;常见的符号做一下记录和总结&#xff0c;方便自己在后面学习中复习。下图二分类识别图像识别猫为例。为了保存一张图片&#xff0c;需要三个矩阵&#xff0c;它们分别对应图片中的红、绿、蓝三种颜色通道&#xff0c;如果图片大小为…

MySQL高阶SQL语句

文章目录 MySQL高阶SQL语句MySQL常用查询1、按关键字排序1.1 语法1.2 ASC和DESC1.3 对数据表中信息进行排序1.3.1 普通排序1.3.2 结合where进行条件过滤1.3.3 对多个字段进行排序 2、区间判断及查询不重复记录2.1 and/or —— 且/或2.1.1 普通查询2.1.2 嵌套/多条件查询 2.2 di…

aspect-ratio宽高比

<div class"wrapper"><div class"item">grid-tamplate-columns&#xff1a;设置容器每列的宽度(项目的宽度)grid-template-rows&#xff1a;设置容器每行的宽度(项目的高度)grid-row-gap&#xff1a;设置每行之间的行间距grid-column-gap&…

指定的文件类型无效: animExport

指定的文件类型无效: animExport 原因anim插件没有启用 你可以在 Maya 的“窗口”&#xff08;Window&#xff09;> “设置/首选项”&#xff08;Settings/Preferences&#xff09;> “插件管理器”&#xff08;Plug-in Manager&#xff09;中查看和管理插件。

⼗多种免费Unity VR资源⼯具

1、VRTK是⼀种⾼效的VR⼯具包&#xff0c;⽤于在Unity3d中快速构建VR解决⽅案VRTK - Virtual Reality Toolkit - [ VR Toolkit ] | Integration | Unity Asset StoreUse the VRTK - Virtual Reality Toolkit - [ VR Toolkit ] from Sysdia Solutions Ltd on your next project.…

蓝桥杯刷题-子串简写

子串简写 代码 kint(input()) s,c1,c2input().split() pre[0]*len(s) ans0 for i in range(len(s)):pre[i]pre[i-1]if c1s[i]:pre[i]1elif c2s[i] and i1-k>0:anspre[i-k1] print(ans)

亲测有效Djiango连接oracle

navicat连接本地oracle截图。 Djiango下面settings.py下面的DATABASES&#xff1a; 注意&#xff1a;USER最好不要用sys或者system可能会导致连接不了&#xff0c;最好是自己新建的oracle用户。

Linux一键式安装JDK、Mysql、Redis、Nginx(附带安装包,无需手动配置密码等)

安装包 新服务器安装前置准备 1. 设置系统时区 # 查看服务器时区 timedatectl # 设置服务器时区为上海 timedatectl set-timezone Asia/Shanghai # 设置系统时间为“2021-3-19 11:00:00” date -s "2021-3-19 11:00:00" # 查看校准后的系统时间 date …

笔记本如何调节亮度?笔记本亮度调节方法

对于经常长时间面对笔记本电脑的小伙伴们来说&#xff0c;屏幕亮度过暗或者过亮&#xff0c;都会对眼睛造成伤害。那么&#xff0c;我们如何调节笔记本亮度至适中呢?下面为大家介绍3种简单的调节屏幕亮度的方法&#xff0c;一起来看看吧! 笔记本亮度调节方法一&#xff1a; 1、…