Flume 拦截器概念及自定义拦截器的运用

文章目录

    • Flume 拦截器
    • 拦截器的作用
    • 拦截器运用
      • 1.创建项目
      • 2.实现拦截器接口
      • 3.编写事件处理逻辑
      • 4.拦截器构建
      • 5.打包与上传
      • 6.编写配置文件
      • 7.测试运行

Flume 拦截器

在 Flume 中,拦截器(Interceptors)是一种可以在事件传输过程中拦截、处理和修改事件的组件。

位于 Source 与 Channel 之间,在写入Channel 之前,拦截器可以对数据进行转换、提取或删除,以满足特定的需求。每个拦截器只处理同一个 Source 接收到的事件,你也可以同时配置多个拦截器,它们会按顺序执行。

拦截器的作用

  • 数据处理和转换: 拦截器可以对事件数据进行处理和转换。例如,可以对原始日志进行解析、过滤、格式化等操作,以便后续处理或存储。

  • 数据增强: 拦截器可以为事件数据添加额外的信息或元数据。例如,可以添加时间戳、主机信息、标签等,以丰富事件数据的内容。

  • 数据过滤: 拦截器可以根据特定条件过滤掉不需要的事件数据,减少数据传输的量或过滤掉无效数据。

  • 监控和日志: 拦截器可以用于监控数据流的运行情况,记录日志信息或统计数据流中的事件数量、处理速率等指标,帮助用户进行性能分析和故障排查。

拦截器运用

1.创建项目

创建一个 Maven 工程项目,引入 Flume 依赖。

在 IDEA 中创建 Maven 项目想必大家都会,这里不再赘述。

根据集群中的 Flume 版本,引入 Flume 依赖,如下所示:

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.10.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

无需将该依赖打包进最后的 JAR 包中,故将其作用域设置为 provided

当一个依赖项的 scope 被设置为 compile 时,它将在编译和运行时都可用,并包含在最终的项目包中。而 provided 范围的依赖项仅在编译和测试阶段需要,运行时不包括。

2.实现拦截器接口

创建测试类 TestInterceptor 实现拦截器 Interceptor,注意,导包时不要导错了。

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        return null;
    }

    @Override
    public void close() {

    }
}

在上面的代码中,我们实现了 Flume 拦截器接口 Interceptor,并重写了其中的四个方法:

  • initialize() 方法:初始化拦截器操作,读取配置信息、建立连接等。

  • intercept(Event event) 方法:用于拦截单个事件,并对事件进行处理。接收一个事件对象作为输入,并返回一个修改后的事件对象。

  • intercept(List<Event> list) 方法:事件批处理,拦截事件列表,并对事件列表进行处理。

  • close() 方法:关闭拦截器,在这里释放资源、关闭连接等。

3.编写事件处理逻辑

在这里做个简单的事件处理,如果数据中包含字符串 hello 则进行过滤操作,这样我们可以直观感受到拦截器的存在,下面来进行逻辑设计。

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // 获取事件数据
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);

        // 检查事件数据中是否包含指定字符串
        if (eventData.contains("hello")) {
            // 如果包含指定字符串,则过滤掉该事件,返回 null
            return null;
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // 创建一个新的列表,存储处理过后的事件
        List<Event> interceptedEvents = new ArrayList<>();

        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }

        return interceptedEvents;
    }

    @Override
    public void close() {

    }
    
}

intercept(List<Event> events) 方法用于对事件列表进行批量处理。这个方法会遍历传入的事件列表,并对每一个事件调用 intercept(Event event) 方法来进行单独处理。

注意,如果只有 intercept(Event event) 方法被重写了,而没有实现 intercept(List<Event> events) 批处理方法,那么在处理事件时会以单个事件的方式进行处理。

在不需要进行初始化和释放资源的情况下,我们可以选择不重写 initializeclose 方法。

4.拦截器构建

在编写完事件处理逻辑后,我们还需要对拦截器进行构建。

在 Flume 中,拦截器的创建和配置通常是通过 Builder 模式来完成的。

在程序中,我们可以定义一个静态内部类 Builder,实现 Interceptor.Builder 接口来对拦截器进行构建,如下所示:

    public static class Builder implements Interceptor.Builder {

        @Override
        public void configure(Context context) {
            // 配置操作,可留空
        }

        @Override
        public Interceptor build() {
            // 返回构建的拦截器类
        }
        
    }

在我们这个案例中,完整的代码如下所示:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // 获取事件数据
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);

        // 检查事件数据中是否包含指定字符串
        if (eventData.contains("hello")) {
            // 如果包含指定字符串,则过滤掉该事件,返回 null
            return null;
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<>();

        for (Event event : events) {
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }

        return interceptedEvents;
    }

    @Override
    public void close() {

    }

    // 拦截器构建
    public static class Builder implements Interceptor.Builder {

        @Override
        public void configure(Context context) {

        }

        @Override
        public Interceptor build() {
            return new TestInterceptor();
        }

    }

}

5.打包与上传

将写好的项目进行打包,并上传到集群中,进行测试。

注意,需要将打包好的拦截器包放在 Flume 安装目录下的 lib 文件夹中。

在这里插入图片描述

6.编写配置文件

这里为了验证拦截器的作用,通过一个 Flume 采集案例来进行体现。

如果你不知道如何编写配置文件,可以看我写的这篇文章 —— Flume 配置文件编写技巧(包会的,抄就完了)

这个配置案例是将发送到 HTTP 源中的数据采集到 HDFS 上,将本地文件作为缓冲通道,该配置文件命名为 httpToHDFS.conf

# 声明
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source 源配置
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.bind = localhost

# 拦截器配置
# 拦截器定义
a1.sources.r1.interceptors = i1
# 拦截器全类名
a1.sources.r1.interceptors.i1.type = TestInterceptor$Builder

# Sink 处理/存储配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
	
# Channel 通道配置
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/data

# 组装/绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

拦截器全类名配置那里需要注意,格式为 拦截器的全类名 + $Builder

在 IDEA 中获取全类名的方式:右击需要引用的类,依次选择【File——>Copy Path/Reference…——>Copy Reference】即可复制。

你可以根据你的需要对该配置文件进行修改。

7.测试运行

因为我们是将数据采集到 HDFS 上,所以需要先启动 Hadoop,然后再进行操作。

# 运行 Flume
cd $FLUME_HOME

# 注意引用路径,需要修改成你自己的
./bin/flume-ng agent -n a1 -c conf/ -f job/httpToHDFS.conf -Dflume.root.logger=INFO,console

Flume 启动完成后,如下所示:

在这里插入图片描述

我们通过其它窗口,使用 curl 命令向 HTTP 源发送两条模拟数据:

curl -X POST -d '[{"body":"hello body"}]'  http://localhost:5140

curl -X POST -d '[{"body":"HELLO FLUME"}]'  http://localhost:5140

在这里插入图片描述

数据发送完成后,Flume 会采集到该数据,并存储到 HDFS 上。

在这里插入图片描述

通过命令,查看 HDFS 中存储的内容,验证拦截器是否生效:

hdfs dfs -text /flume/events/2024-04-04/1630/00/ev* 

结果如下所示:

在这里插入图片描述

可以看到,我们在上面分别发送了两条数据 hello bodyHELLO FLUME,但最终 HDFS 中只存储了一条数据。

这是因为我们设置的拦截器生效了,它对数据中包含 hello 字符串的事件进行了过滤,故只存储了一条数据。

Flume 拦截器就是起到这样的效果,对数据进行处理、转换、删除等操作,是不是很简单呀。(同学,包会的呀)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/515770.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux网络协议栈从应用层到内核层④

文章目录 1、网卡接受数据2、网络设备层接收数据3、ip层接受数据4、tcp层接受数据5、上层应用读取数据6、数据从网卡到应用层的整体流程 1、网卡接受数据 当网卡收到数据时&#xff0c;会触发一个中断&#xff0c;然后就会调用对应的中断处理函数&#xff0c;再做进一步处理。…

python相对路径导包与绝对路径导包的正确方式

【python相对路径导包与绝对路径导包的正确方式】 python相对路径导包与绝对路径导包的正确方式_哔哩哔哩_bilibilipython导包的难题&#xff0c;今天解决了&#xff0c;相对路径导包和绝对路径导包&#xff0c;均可以&#xff01;&#xff01;&#xff01;, 视频播放量 5、弹…

如何(关闭)断开 Websocket 连接:简单易懂的实现指南

WebSocket 协议提供了一条用于 Web 应用程序中双向通讯的高效通道&#xff0c;让服务器能够实时地向客户端发送信息&#xff0c;而无需客户端每次都发起请求。本文旨在探讨有关结束 WebSocket 连接的适当时机&#xff0c;内容包括协议的基础知识、如何结束连接、一些使用场景&a…

maven本地仓库设置

1、背景 我们在本地安装好maven后&#xff0c;java环境也安装好了以后&#xff0c;运行java项目A,我希望把项目A所有的依赖安装在我电脑中的a文件夹下&#xff0c;项目B安装在我电脑的b文件夹下。 2、解决 需要在 maven 文件中找到 conf 文件夹下的 settings.xml 文件进行修…

Unity | Shader基础知识(第十一集:什么是Normal Map法线贴图)

目录 前言 一、图片是否有法线贴图的视觉区别 二、有视觉区别的原因 三、法线贴图的作用 四、信息是如何存进去的 五、自己写一个Shader用到法线贴图 六、注意事项 七、作者的话 前言 本小节会给大家解释&#xff0c;什么是法线贴图&#xff1f;为什么法线贴图会产生深…

SpringBoot -- 外部化配置

我们如果要对普通程序的jar包更改配置&#xff0c;那么我们需要对jar包解压&#xff0c;并在其中的配置文件中更改配置参数&#xff0c;然后再打包并重新运行。可以看到过程比较繁琐&#xff0c;SpringBoot也注意到了这个问题&#xff0c;其可以通过外部配置文件更新配置。 我…

前端三剑客 —— CSS (上)

上节内容中提到了 前端三剑客 —— HTML 超文本标记语言&#xff0c;这节内容 跟大家讲述三剑客中的第二个 CSS。 CSS 什么是CSS Cascading Style Sheel&#xff0c;简称CSS&#xff0c;中文叫层叠样式表&#xff0c;也叫级联样式表。主要作用是来修饰HTML页面的一种技术。 …

【C++学习】哈希表的底层实现及其在unordered_set与unordered_map中的封装

文章目录 1. unordered系列关联式容器1.1 unordered_map1.2 unordered_set1.3.底层结构 2.哈希2.1哈希概念2.2哈希冲突2.3 哈希函数2.4 哈希冲突解决2.4.1闭散列2.4.1开散列2.5开散列与闭散列比较 3.哈希的模拟实现1. 模板参数列表2. 迭代器的实现3. 增加通过key获取value操作4…

66toolkit终极网络工具系统:470+强大Web工具,助力您的网络运营与开发

一、产品介绍 66toolkit&#xff0c;被誉为“终极网络工具系统”&#xff08;SAAS&#xff09;&#xff0c;是一款功能强大的PHP脚本。它集合了超过470种快速且易用的Web工具&#xff0c;为日常任务处理和开发人员提供了极大的便利。作为一款综合性的网络工具系统&#xff0c;…

面试题目--fork

问题&#xff1a; (1)fork 以后&#xff0c;父进程打开的文件指针位置在子进程里面是否一样&#xff1f;(先open再fork) (2)能否用代码简单的验证一下? (3)先fork再打开文件父子进程是否共享偏移量?父进程打开的文件指针位置在子进程里面是否一样&#xff1f;能否用代码简…

武汉星起航:引领亚马逊孵化新篇章,助力合作伙伴共创商业辉煌

武汉星起航电子商务有限公司自2020年成立以来&#xff0c;凭借其敏锐的市场洞察和深度合作模式&#xff0c;在跨境电商领域取得了显著的成绩。为了进一步满足市场需求&#xff0c;公司决定推出亚马逊一站式孵化平台&#xff0c;为合作伙伴提供更全面的指导和支持。 该孵化平台…

【办公类-47-01】20240404 Word内部照片批量缩小长宽(课题资料系列)

作品展示 背景需求 最近在做《运用Python优化3-6岁幼儿学习操作材料的实践研究》的课题研究资料&#xff08;上半学期和下半学期&#xff09;。 将CSDN里面相关的研究照片文字贴入Word后&#xff0c;就发现一张图片就占了A4竖版一页&#xff0c;太大了。我想把word里面的所有…

数学结论在dsa中的应用

1. LC 3102 最小化曼哈顿距离 VP周赛391 T4。这是个结论题目。 首先曼哈顿距离是需要两个数对而不是两个数去进行比较的&#xff0c;两个数之间你很轻易就知道差的绝对值最大是多少了&#xff0c;只要挑最大和最小两个数一减就可以了。 但是两个数对之间各项差的绝对值之和最…

Spring的注入小技巧(接口前置处理,后置处理等优化写法)

目录 1.定一个公共&#xff08;前置、后置&#xff09;接口 2.添加接口的实现类&#xff08;就是不同的处理&#xff09; 3.测试小栗子 4.执行结果 接口的前置处理或是后置处理&#xff0c;这样写代码更优雅&#xff0c;可读性高&#xff0c;当然更有水平更装逼。前置处理或…

【信号处理】基于变分自编码器(VAE)的图片典型增强方法实现

关于 深度学习中&#xff0c;经常面临图片数据量较小的问题&#xff0c;此时&#xff0c;对数据进行增强&#xff0c;显得比较重要。传统的图片增强方法包括剪切&#xff0c;增加噪声&#xff0c;改变对比度等等方法&#xff0c;但是&#xff0c;对于后端任务的性能提升有限。…

运算符规则

console.log(null undefined) null和undefined都是原始类型&#xff0c;然后把这两个转换为数字。是0NaN.看规则有一个NaN的话就得到NaN. console.log({} []); 把{}和[]转换为原始类型分别为和[Object Object]。然后特殊情况有字符串&#xff0c;那就拼接字符串返回[Object…

Redis数据库——群集(主从、哨兵)

目录 前言 一、主从复制 1.基本原理 2.作用 3.流程 4.搭建主动复制 4.1环境准备 4.2修改主服务器配置 4.3从服务器配置&#xff08;Slave1和Slave2&#xff09; 4.4查看主从复制信息 4.5验证主从复制 二、哨兵模式——Sentinel 1.定义 2.原理 3.作用 4.组成 5.…

【Java多线程(3)】线程安全问题和解决方案

目录 一、线程安全问题 1. 线程不安全的示例 2. 线程安全的概念 3. 线程不安全的原因 二、线程不安全的解决方案 1. synchronized 关键字 1.1 synchronized 的互斥特性 1.2 synchronized 的可重入特性 1.3 死锁的进一步讨论 1.4 死锁的四个必要条件&#xff08;重点&…

Golang 内存管理和垃圾回收底层原理(一)

一、这篇文章我们来聊聊Golang内存管理和垃圾回收&#xff0c;主要注重基本底层原理讲解&#xff0c;进一步实战待后续文章 1、这篇我们来讨论一下Golang的内存管理 先上结构图 从图我们来讲Golang的基本内存结构&#xff0c;内存结构可以分为&#xff1a;协程缓存、中央缓存…

vue3+eachrts饼图轮流切换显示高亮数据

<template><div class"charts-box"><div class"charts-instance" ref"chartRef"></div>// 自定义legend 样式<div class"charts-note"><span v-for"(items, index) in data.dataList" cla…