Flume详解(3)

Host Interceptor

主机拦截器,本质上不是拦截数据,而是在数据的headers中添加一个host字段,可以用于标记数据来源(被收集)的主机。

Host Interceptor可以配置的选项有:

表-22 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是host

preserveExisting

optional

如果host字段已经存在,是否替换。默认是false

useIP

optional

是否使用IP。如果为true,则显示为IP;如果为false,则显示为主机名。默认为true

hostHeader

optional

显示的字段,默认是host

案例:

a1.sources = s1

a1.channels = c1

a1.sinks = k1



a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

# 配置Host Interceptor

a1.sources.s1.interceptors.i1.type = host

# 是否使用IP

a1.sources.s1.interceptors.i1.useIP = false



a1.channels.c1.type = memory



a1.sinks.k1.type = logger



a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

Static Interceptor

静态拦截器,本质上不是拦截器,而是在数据的headers中添加一个指定的字段。

Static Interceptor可以配置的选项有:

表-23 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是static

preserveExisting

optional

如果指定字段已经存在,是否替换。默认是false

key

optional

指定的键。默认是key

value

optional

指定的值,默认是value

案例:

a1.sources = s1

a1.channels = c1

a1.sinks = k1



a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

# 配置Static Interceptor

a1.sources.s1.interceptors.i1.type = static

# 指定键

a1.sources.s1.interceptors.i1.key = kind

# 指定值

a1.sources.s1.interceptors.i1.value = test



a1.channels.c1.type = memory



a1.sinks.k1.type = logger



a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

UUID Interceptor

UUID拦截器,本质上也不是一个拦截器,而是在数据的headers中添加一个id字段,可以用于标记数据的唯一性。

UUID Interceptor可以配置的选项包含:

表-24 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

headerName

optional

headers中添加的字段名,默认是id

preserveExisting

optional

如果headers中已经存在id字段,是否替换。默认是true

prefix

optional

在生成的id之前添加前缀

案例:

a1.sources = s1

a1.channels = c1

a1.sinks = k1



a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

# 配置UUID Interceptor

a1.sources.s1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder



a1.channels.c1.type = memory



a1.sinks.k1.type = logger



a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

Search And Replace Interceptor

搜索和替换拦截器,在使用的时候需要指定正则表达式,会根据正则表达式指定的规则,对Event中body部分的数据进行替换。注意,只替换body部分的数据,而不会影响headers中的数据。正则表达式的使用规则和Java中正则表达式的规则是一致的。

Search And Replace Interceptor中可以配置的选项包含:

表-25 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是search_replace

searchPattern

optional

搜索的正则表达式形式

replaceString

optional

替换的形式

charset

optional

body部分的字符集编码,默认是UTF-8

案例:

a1.sources = s1

a1.channels = c1

a1.sinks = k1



a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

# 配置Search And Replace Interceptor

a1.sources.s1.interceptors.i1.type = search_replace

# 指定正则表达式

a1.sources.s1.interceptors.i1.searchPattern = [a-z]

# 指定替换后的形式

a1.sources.s1.interceptors.i1.replaceString = *



a1.channels.c1.type = memory



a1.sinks.k1.type = logger



a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

Regex Filtering Interceptor

正则过滤拦截器,在使用的时候需要指定一个正则表达式,然后根据属性excludeEvents的值来确定过滤方式。如果excludeEvents的值为true,则表示过滤掉符合正则表达式形式的数据,其他数据留下来;如果excludeEvents的值为false,则表示过滤掉不合符正则表达式形式的数据,符合形式的数据留下来。默认情况下,excludeEvents的值为false。

Regex Filtering Interceptor中可以配置的选项包含:

表-26 配置选项

选项

备注

解释

type

required

拦截器类型,此处必须是regex_filter

regex

optional

正则表达式

excludeEvents

optional

替换规则,默认为false

案例:

a1.sources = s1

a1.channels = c1

a1.sinks = k1



a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

# 配置Regex Filtering Interceptor

a1.sources.s1.interceptors.i1.type = regex_filter

# 指定正则表达式

a1.sources.s1.interceptors.i1.regex = .*[0-9].*

# 指定替换规则

a1.sources.s1.interceptors.i1.excludeEvents = true



a1.channels.c1.type = memory



a1.sinks.k1.type = logger



a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

Custom Interceptor

在实际开发过程中,如果Flume提供的拦截器无法满足业务需求,那么Flume同样支持自定义拦截器。但是不同于其他组件的地方在于,Flume中的拦截器在启动的时候,是通过内部类Builder来启动,所以拦截器中需要覆盖其内部类。

步骤:

1)定义类实现Interceptor接口,覆盖其中的intercept方法,其他方法可以忽略掉;同时需要定义内部类实现Interceptor.Builder接口,覆盖其中的build方法;如果需要获取配置,那么可以配置configure方法。

package com.flume.interceptor;



import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.interceptor.Interceptor;



import java.util.ArrayList;

import java.util.List;

import java.util.Map;



public class AuthInterceptor implements Interceptor {

    @Override

    public void initialize() {

    }



    @Override

    public Event intercept(Event event) {

        // 获取headers部分

        Map<String, String> headers = event.getHeaders();

        // 判断headers是否包含time或者timestamp字段

        if (headers.containsKey("time") || headers.containsKey("timestamp")) return event;

        // 如果没有,则添加当前的时间戳

        headers.put("timestamp", String.valueOf(System.currentTimeMillis()));

        return event;

    }



    @Override

    public List<Event> intercept(List<Event> events) {

        // 定义新的集合

        List<Event> es = new ArrayList<>();

        // 遍历

        for (Event event : events) {

            es.add(intercept(event));

        }

        return es;

    }



    @Override

    public void close() {

    }



    public static class Builder implements Interceptor.Builder {



        @Override

        public Interceptor build() {

            return new AuthInterceptor();

        }



        @Override

        public void configure(Context context) {

        }

    }

}

2)将定义好的Interceptor打成jar包。

3)将jar包上传到Flume安装目录的lib目录下:

# 进入Flume的lib目录

cd /opt/software/flume-1.11.0/lib/

# 选择jar包,上传

rz

4)回到数据目录,编辑文件:

# 回到数据目录

cd ../data

# 编辑文件

vim authin.properties

在文件中添加:

a1.sources = s1

a1.channels = c1

a1.sinks = k1



a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 8090

a1.sources.s1.interceptors = i1

# 配置Custom Interceptor

a1.sources.s1.interceptors.i1.type = com.flume.interceptor.AuthInterceptor$Builder



a1.channels.c1.type = memory



a1.sinks.k1.type = logger



a1.sources.s1.channels = c1

a1.sinks.k1.channel = c1

5)启动Flume:

flume-ng agent -n a1 -c $FLUME_HOME/conf -f authin.properties -Dflume.root.logger=INFO,console

其他

事务

在Flume中,也存在事务的问题:

图-7 Flume事务

流程如下:

1)Source进行doPut将数据写到临时的缓冲区PutList中;

2)PutList会推送数据给Channel,如果Channel中有足够的位置,则数据推送成功(doCommit),如果Channel中没有位置,则推送失败,进行回滚(doRollback);

3)Channel进行doTake操作将数据写到临时缓冲区TakeList中;

4)将TakeList中的数据通过Sink批量写往目的地;

5)如果写成功了,则执行doCommit操作;如果写失败了,则执行doRollback操作。

执行流程

Flume执行流程如下图所示:

图-8 Flume执行流程

流程如下:

1)Source会先采集数据,然后将数据发送给ChannelProcessor进行处理;

2)ChannelProcessor收到数据处理之后,会将数据交给Interceptor来处理,注意,在Flume允许存在多个Interceptor来构成拦截器链;

3)Interceptor处理完成之后,会交给Channel Selector处理,Selector存在三种模式:replicating、multiplexing和load_balancing。Selector收到数据之后会根据对应的模式将数据交给对应的Channel来处理;

4)Channel处理之后会交给SinkProcessor。SinkProcessor本质上是一个Sinkgroup,包含了三种方式:default,failover和load_balance。SinkProcessor收到数据之后会根据对应的方式将数据交给Sink来处理;

5)Sink收到数据之后,会将数据写到指定的目的地。

扩展:Flume监控

Ganglia概述

Flume的数据流可以通过Ganglia来进行监控。Ganglia是UC Berkeley发起的一个开源的集群监控项目,可以用于监测数以千计的节点性能。

Ganglia的核心主要包含了三个模块:

1)gmond(Ganglia Monitoring Daemon):轻量级的服务,需要安装在每一个需要收集指标数据的主机上。gmond可以用于收集系统指标数据,包含CPU、内存、磁盘、网络以及活跃的进程数量等。

2)gmetad(Ganglia Meta Daemon):用于整合所有的信息,并将这些信息以RRD格式来存储到磁盘上。

3)gweb(Ganglia Web):Ganglia提供的一个可视化工具,本身是使用PHP开发的,提供了WEB页面,在WEB界面中以图标形式来显示集群的运行状态,以及所收集到的不同的指标数据。

Ganglia安装

1)所有节点安装httpd和php服务:

yum -y install httpd php

2)所有节点安装关联依赖:

yum -y install rrdtool perl-rrdtool rrdtool-devel apr-devel

3)安装Epel:

yum -y install epel-release

4)在第一个节点上安装gmetad,gmond和gweb:

yum -y install ganglia-gmetad

yum -y install ganglia-gmond

yum -y install ganglia-web

5)在其他节点上安装gmond:

yum -y install ganglia-gmond

6)第一个节点上修改ganglia.conf:

# 编辑文件

vim /etc/httpd/conf.d/ganglia.conf

# 修改内容如下

<Location /ganglia>

  # Require local

  # Require ip 10.1.2.3

  # Require host example.org

  Require all granted

</Location>

7)第一个节点上修改gmetad.conf:

# 编辑文件

vim /etc/ganglia/gmetad.conf

# 修改data_source属性

data_source "flume_cluster" hadoop01

8)所有节点修改gmond.conf文件:

# 编辑文件

vim /etc/ganglia/gmond.conf

# 修改cluster中的属性值

cluster {

  name = "flume_cluster"

  owner = "unspecified"

  latlong = "unspecified"

  url = "unspecified"

}

# 修改udp_send_channel中的属性值

udp_send_channel {

  #bind_hostname = yes # Highly recommended, soon to be default.

                       # This option tells gmond to use a source address

                       # that resolves to the machine's hostname.  Without

                       # this, the metrics may appear to come from any

                       # interface and the DNS names associated with

                       # those IPs will be used to create the RRDs.

  #mcast_join = 239.2.11.71

  # 监控数据发送给hadoop01

  host = hadoop01

  port = 8649

  ttl = 1

}

# 修改udp_recv_channel中的属性值

udp_recv_channel {

  # mcast_join = 239.2.11.71

  port = 8649

  # 接收来自任意连接的数据

  bind = 0.0.0.0

  retry_bind = true

  # Size of the UDP buffer. If you are handling lots of metrics you really

  # should bump it up to e.g. 10MB or even higher.

  # buffer = 10485760

}

9)所有节点关闭selinux:

# 编辑文件

vim /etc/selinux/config

# 修改SELINUX属性的值为disabled

SELINUX=disabled

# 保存退出,重启服务器

reboot

10)所有节点启动gmond:

systemctl start gmond

11)在第一个节点上启动gmetad和httpd:

systemctl start gmetad

systemctl start httpd

12)在浏览器输入http://IP/ganglia,查看Ganglia页面。

图-9 Ganglia界面

监控Flume

1)修改Flume的配置文件:

# 进入Flume的配置目录

cd /opt/software/flume-1.11.0/conf/

# 复制文件

cp flume-env.sh.template flume-env.sh

# 编辑文件

vim flume-env.sh

# 在文件尾部添加

export JAVA_HOME=/opt/software/jdk1.8

export JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.root.monitoring.hosts=hadoop01:8649 -Xms100m -Xmx200m"

# 保存退出,生效

source flume-env.sh

2)启动Flume:

# 回到数据目录

cd ../data

# 启动Flume

flume-ng agent -n a1 -c $FLUME_HOME/conf -f basic.properties -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=hadoop01:8649

3)属性解释:

表-27 属性解释

属性

解释

ChannelCapacity

Channel的容量

ChannelFillPercentage

Channel的利用率

ChannelSize

Channel的大小

EventPutAttemptCount

Source试图放入Channel的次数

EventPutSuccessCount

Source向Channel成功放入数据的次数

EventTakeAttemptCount

Channel试图向Sink发送数据的次数

EventTakeSuccessCount

Channel向Sink成功发送数据的次数

startTime

起始时间

stopTime

结束时间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/487078.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

翻过DP这座大山

1.AcWing 跳台阶 第一种方法:暴力搜索DFS #include <iostream> using namespace std;int dfs(int n) {if(n 1) return 1;else if(n 2) return 2;else return dfs(n-1)dfs(n-2); }int main() {int x; cin>>x;cout<<dfs(x)<<endl;return 0; }显然如…

CSS设置移动端页面底部安全距离

env(safe-area-inset-bottom)是一个CSS属性值&#xff0c;用于设置底部安全距离。它表示使用环境变量来获取底部安全距离的值。当使用环境变量时&#xff0c;需要使用env()函数来引用具体的环境变量。例如&#xff1a; <style> .box{padding-bottom: env(safe-area-inse…

OSCP靶场--Crane

OSCP靶场–Crane 考点(CVE-2022-23940sudo service提权) 1.nmap扫描 ┌──(root㉿kali)-[~/Desktop] └─# nmap 192.168.229.146 -sC -sV --min-rate 2500 Starting Nmap 7.92 ( https://nmap.org ) at 2024-03-25 08:07 EDT Nmap scan report for 192.16…

MySQL索引18连问,谁能顶住

前言 过完这个节&#xff0c;就要进入金银季&#xff0c;准备了 18 道 MySQL 索引题&#xff0c;一定用得上。 作者&#xff1a;感谢每一个支持&#xff1a; github 1. 索引是什么 索引是一种数据结构&#xff0c;用来帮助提升查询和检索数据速度。可以理解为一本书的目录&…

2-Flume之Sink与Channel

Flume Sink HDFS Sink 将数据写到HDFS上。数据以文件形式落地到HDFS上&#xff0c;文件名默认是以FlumeData开头&#xff0c;可以通过hdfs.filePrefix来修改 HDFS Sink默认每隔30s会滚动一次生成一个文件&#xff0c;因此会导致在HDFS上生成大量的小文件&#xff0c;实际过程…

蔚来JAVA面试(收集)

先叠加&#xff0c;这个是自己找的答案不一定对&#xff0c;只是给我参考看看而已。 一、项目 这个没有&#xff0c;根据实际项目情况来。蔚来比较喜欢拷打项目&#xff0c;所以要对项目非常熟悉&#xff08;慌&#xff09; 二、JAVA基础 2.1 Java中的IO模型有用到过吗&#…

代码学习记录27---贪心算法

随想录日记part27 t i m e &#xff1a; time&#xff1a; time&#xff1a; 2024.03.25 主要内容&#xff1a;今天深入学习贪心算法&#xff0c;接下来是针对题目的讲解&#xff1a;1.K次取反后最大化的数组和 &#xff1b;2. 加油站 &#xff1b;3.分发糖果 1005.K次取反后最…

华为防火墙二层墙(VAN/SVI/单臂路由)

二层墙只能做地址池形式的NAT。 交换机安全策略防火墙二层墙 路由器安全策略防火墙三层墙 交换机的光口是不能直接插线的&#xff0c;光模块&#xff0c;包括进和出 长距离&#xff1a;单模 短距离&#xff1a;多模 防火墙自身的ping流量需要单独配置

202447读书笔记|《围炉夜话》——多记先正格言,胸中方有主宰 闲看他人行事,眼前即是规箴

202447读书笔记|《围炉夜话》——多记先正格言&#xff0c;胸中方有主宰&#xff1b;闲看他人行事&#xff0c;眼前即是规箴 围炉夜话 《围炉夜话&#xff08;读客三个圈经典文库&#xff09;》作者王永彬。读《围炉夜话》&#xff0c;可以掌握君子安身立业的大智慧&#xff01…

MySQL数据库的备份

文章目录 MySQL数据库的备份MySQL备份方法完全备份物理备份备份 逻辑热备完全备份逻辑热备恢复恢复库恢复表 增量备份备份增量备份恢复基于位置进行恢复基于时间 MySQL数据库的备份 MySQL备份方法 物理备份&#xff1a; 物理备份涉及直接复制MySQL的数据文件和日志文件。这种…

(进程线程)的状态和线程安全

进程有两个状态就绪状态和阻塞状态。 这些状态决定了系统会按照什么样的态度来调度这个进程&#xff08;这些一般是针对一个进程里面有一个线程的情况&#xff09;。在实际的大多数情况下&#xff0c;一个进程中包含多个线程&#xff0c;其状态则会绑定在线程上。 上诉状态一…

计算机408炸了!大多数人都栽在这门课上

组成原理>>数据结构>操作系统>计算机网络 在本科时&#xff0c;我在学习组成原理之前已经学过数字电路和模拟电路&#xff0c;但在接下来学习组成原理时&#xff0c;我依然感到困难。也许是因为自己理解能力不足&#xff0c;总觉得难以掌握&#xff0c;甚至在考研…

算法打卡day28|贪心算法篇02|Leetcode 122.买卖股票的最佳时机 II、55. 跳跃游戏、45.跳跃游戏 II

算法题 Leetcode 122.买卖股票的最佳时机 II 题目链接:122.买卖股票的最佳时机 II 大佬视频讲解&#xff1a;买卖股票的最佳时机 II视频讲解 个人思路 因为只有一只股票&#xff0c;且两天作一个交易单元&#xff0c;那每次只收集正利润就可以最终最多可以获取的利润&#xf…

数据运营常用的8大模型

✅作者简介&#xff1a;《数据运营&#xff1a;数据分析模型撬动新零售实战》作者、《数据实践之美》作者、数据科技公司创始人、多次参加国家级大数据行业标准研讨及制定、高端企培合作讲师。 &#x1f338;公众号&#xff1a;风姑娘的数字视角&#xff0c;免费分享数据应用相…

10个优秀的Github开源项目

1Panel 是一个现代化、开源的 Linux 服务器运维管理面板 EX-chatGPT-精准搜索工具 feishu-chatgpt-飞一般的工作体验工具 Knife4j-是一个集Swagger2 和 OpenAPI3为一体的增强解决方案 Kooder 是 Gitee 团队开发的一个代码搜索系统 mtbird 是一款低代码可视化页面生成器 S…

<Linux> 模拟实现文件流 - 简易版

目录 1. FILE 结构设计 2、函数使用及分析 3、文件打开 fopen 4. 缓冲区刷新fflush 5. 数据写入fwrite 6. 文件关闭 fclose 7. 测试 8. 小结 1. FILE 结构设计 在设计 FILE 结构体前&#xff0c;首先要清楚 FILE 中有自己的缓冲区及冲刷方式 缓冲区的大小和刷新方式因…

巧用 20个 Linux 命令贴士与技巧,让你生产力瞬间翻倍?

在本文中&#xff0c;我将向您演示一些专业的Linux命令技巧&#xff0c;这些技巧将使您节省大量时间&#xff0c;在某些情况下还可以避免很多麻烦&#xff0c;而且它也将帮助您提高工作效率。 并不是说这些只是针对初学者的 Linux 技巧。即使有经验的Linux用户也有可能没有发现…

C++ 扫描当前路径下文件并删除大文件

C 扫描当前路径下文件并删除大文件 C获取当前路径扫描文件路径下规定后缀名称的文件计算文件大小 1. 获取当前路径 使用<Windows.h>中的GetCurrentDirectory方法实现&#xff0c;单独编写验证程序如下&#xff1a; #include<iostream> #include<Windows.h&g…

R语言基础入门

1.保存或加载工作空间 改变工作目录——进行文件读写&#xff0c;默认去指定文件进行操作。&#xff08;使用R时&#xff0c;最好先设定工作目录&#xff08;setwd(),getwd()&#xff09;&#xff09; setwd(“工作文件路径”)&#xff1a;建立工作目录 getwd&#xff08;&…

Linux的进程控制(创建和终止)

进程创建 fork 我们前面已经认识过fork函数&#xff0c; 用fork创建新进程后&#xff0c; 新建立的进程为子进程&#xff0c; 该进程为父进程。fork给父进程返回的是子进程的pid&#xff0c; 给子进程返回的是0&#xff0c; 出错时返回-1 进程调用fork后&#xff0c; 当控制…