Flume学习笔记(3)—— Flume 自定义组件

前置知识:

Flume学习笔记(1)—— Flume入门-CSDN博客

Flume学习笔记(2)—— Flume进阶-CSDN博客

Flume 自定义组件

自定义 Interceptor

需求分析:使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统

需要使用Flume 拓扑结构中的 Multiplexing 结构,Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值

实现流程:

代码

导入依赖:

<dependencies>
  <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
  </dependency>
</dependencies>

自定义拦截器的代码:

package com.why.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeInterceptor implements Interceptor {

    //存放事件的集合
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() {
        //初始化集合
        addHeaderEvents = new ArrayList<>();
    }

    //单个事件拦截
    @Override
    public Event intercept(Event event) {
        //获取头信息
        Map<String, String> headers = event.getHeaders();
        //获取body信息
        String body = new String(event.getBody());

        //根据数据中是否包含”why“来分组
        if(body.contains("why"))
        {
            headers.put("type","first");
        }else {
            headers.put("type","second");
        }
        return event;
    }

    //批量事件拦截
    @Override
    public List<Event> intercept(List<Event> events) {
        //清空集合
        addHeaderEvents.clear();

        //遍历events
        for(Event event : events)
        {
            //给每一个事件添加头信息
            addHeaderEvents.add(intercept(event));
        }
        return addHeaderEvents;
    }

    @Override
    public void close() {

    }

    //构建生成器
    public static class TypeBuilder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }

}

将代码打包放入flume安装路径下的lib文件夹中

配置文件

在job文件夹下创建group4目录,添加配置文件;

为 hadoop102 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.why.interceptor.TypeInterceptor$TypeBuilder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.first = c1
a1.sources.r1.selector.mapping.second = c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

hadoop103:配置一个 avro source 和一个 logger sink

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

hadoop104:配置一个 avro source 和一个 logger sink

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

执行指令

hadoop102:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume-interceptor-flume.conf

hadoop103:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1-flume-logger.conf -Dflume.root.logger=INFO,console

hadoop104:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2-flume-logger.conf -Dflume.root.logger=INFO,console

然后hadoop102通过nc连接44444端口,发送数据:

在hadoop103和104上分别接收到:

自定义 Source

官方提供的文档:Flume 1.11.0 Developer Guide — Apache Flume

给出的示例代码如下:

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}

需要继承AbstractSource,实现Configurable, PollableSource

实战需求分析

使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置

代码

package com.why.source;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;
import java.util.concurrent.ConcurrentMap;

public class MySource extends AbstractSource implements PollableSource, Configurable {

    //定义配置文件将来要读取的字段
    private Long delay;
    private String field;

    //获取数据封装成 event 并写入 channel,这个方法将被循环调用
    @Override
    public Status process() throws EventDeliveryException {
        try {
            //事件头信息
            HashMap<String,String> headerMap = new HashMap<>();
            //创建事件
            SimpleEvent event = new SimpleEvent();
            //循环封装事件
            for (int i = 0; i < 5; i++) {
                //设置头信息
                event.setHeaders(headerMap);
                //设置事件内容
                event.setBody((field + i).getBytes());
                //将事件写入Channel
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }

        }catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return Status.READY;
    }

    //backoff 步长
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    //backoff 最长时间
    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    //初始化 context(读取配置文件内容)
    @Override
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field","Hello");
    }

}

打包放到flume安装路径下的lib文件夹中;

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.why.source.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = why

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行指令

hadoop102上:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group5/mysource.conf -Dflume.root.logger=INFO,console

结果如下:

自定义 Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件

官方文档:Flume 1.11.0 Developer Guide — Apache Flume

接口实例:

public class MySink extends AbstractSink implements Configurable {
    private String myProp;

    @Override
    public void configure(Context context) {
        String myProp = context.getString("myProp", "defaultValue");

        // Process the myProp value (e.g. validation)

        // Store myProp for later retrieval by process() method
        this.myProp = myProp;
    }

    @Override
    public void start() {
        // Initialize the connection to the external repository (e.g. HDFS) that
        // this Sink will forward Events to ..
    }

    @Override
    public void stop () {
        // Disconnect from the external respository and do any
        // additional cleanup (e.g. releasing resources or nulling-out
        // field values) ..
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;

        // Start transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // This try clause includes whatever Channel operations you want to do

            Event event = ch.take();

            // Send the Event to the external repository.
            // storeSomeData(e);

            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            txn.rollback();

            // Log exception, handle individual exceptions as needed

            status = Status.BACKOFF;

            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error)t;
            }
        }
        return status;
    }
}

自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口

实战需求分析

使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置

代码

package com.why.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    //创建 Logger 对象
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
    //前后缀
    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        //声明返回值状态信息
        Status status;
        //获取当前 Sink 绑定的 Channel
        Channel ch = getChannel();
        //获取事务
        Transaction txn = ch.getTransaction();
        //声明事件
        Event event;
        //开启事务
        txn.begin();
        //读取 Channel 中的事件,直到读取到事件结束循环
        while (true) {
            event = ch.take();
            if (event != null) {
                break;
            }
        }
        try {
            //处理事件(打印)
            LOG.info(prefix + new String(event.getBody()) + suffix);
            //事务提交
            txn.commit();
            status = Status.READY;
        } catch (Exception e) {
            //遇到异常,事务回滚
            txn.rollback();
            status = Status.BACKOFF;
        } finally {
            //关闭事务
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix", "hello");
        suffix = context.getString("suffix");
    }
}

打包放到flume安装路径下的lib文件夹中;

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.why.sink.MySink
a1.sinks.k1.prefix = why:
a1.sinks.k1.suffix = :why

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行指令

hadoop102上:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group6/mysink.conf -Dflume.root.logger=INFO,console

结果如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/160392.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

浅析AcrelEMS-CIA机场智慧能源管平台解决方案-安科瑞 蒋静

1 概述 机场智慧能源管平台解决方案对机场范围内变电站内的高低压配电设备 、 发电机、变压器 、UPS、EPS 、广场照明 、 室内照明 、通风及排水等机电设备进行实时分布式监控和集中管理 , 实现无人值守 , 确保高速公路安全畅通 , 提高 自动化管理水平 , 降低机电设备的运行维…

计算机视觉:驾驶员疲劳检测

目录 前言 关键点讲解 代码详解 结果展示 改进方向&#xff08;打哈欠检测疲劳方法&#xff09; 改进方向&#xff08;点头检测疲劳&#xff09; GUI界面设计展示 前言 上次博客我们讲到了如何定位人脸&#xff0c;并且在人脸上进行关键点定位。其中包括5点定位和68点定…

骨传导能保护听力吗?骨传导耳机是智商税吗?

先说答案&#xff0c;骨传导耳机是可以保护听力的&#xff01;并且骨传导耳机也不是智商税&#xff01;甚至在某些场景下&#xff0c;骨传导耳机比其他耳机更适合。 为什么说骨传导耳机会保护听力呢&#xff1f;因为骨传导耳机跟入耳式耳机的传递声音方式是不一样的&#xff0c…

PACS医学影像信息化数字平台源码

PACS系统对医院影像科意义重大&#xff0c;将业务量巨大的影像检验流程依托于信息化技术&#xff0c;对于进行信息化建设的医院而言&#xff0c;是十分必要的。 PACS系统源码&#xff0c;集成三维影像后处理功能&#xff0c;包括三维多平面重建、三维容积重建、三维表面重建、三…

【HarmonyOS开发】设备调试避坑指南

备注&#xff1a;通过开发验证&#xff0c;发现每个设备调试都会存在不小的差别&#xff0c;开发验证需要注意~ 1、预览器调试&#xff08;只能预览具有Entry修饰的文件&#xff09; 修改文件&#xff0c;预览器将自动刷新 注意&#xff1a;当我们只修改了Component 组件的文件…

基于单片机温湿度PM2.5报警系统

**单片机设计介绍&#xff0c; 基于单片机温湿度PM2.5报警设置系统 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 单片机温湿度PM2.5报警设置系统是一种智能化的环境检测与报警系统。它主要由单片机、传感器、液晶显示屏、蜂鸣器…

嵌入式QTGit面试题

自己在秋招过程中遇到的QT和嵌入式和Git相关的面试题&#xff0c;因为比较少就一起放了 QT connect第5个参数是什么&#xff1f; Qt::AutoConnection&#xff1a; 默认值&#xff0c;使用这个值则连接类型会在信号发送时决定。 如果接收者和发送者在同一个线程&#xff0c;则…

轻地图+数据闭环加速落地,觉非科技获多家头部车企定点

‍作者 |张祥威 编辑 |德新 智能驾驶日益普及&#xff0c;「轻地图」和「数据闭环」成为各家能力比拼的关键&#xff0c;车企对此的需求也逐渐迫切。 11月16日&#xff0c;觉非科技宣布已与多家头部主机厂达成量产定点合作&#xff0c;围绕轻地图与数据闭环服务&#xff0c;支…

《Linux从练气到飞升》No.31 多线程编程实践与线程安全技术

&#x1f57a;作者&#xff1a; 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux菜鸟刷题集 &#x1f618;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 &#x1f3c7;码字不易&#xff0c;你的&#x1f44d;点赞&#x1f64c;收藏❤️关注对我真的…

【数据结构】图的存储结构(邻接矩阵)

一.邻接矩阵 1.图的特点 任何两个顶点之间都可能存在边&#xff0c;无法通过存储位置表示这种任意的逻辑关系。 图无法采用顺序存储结构。 2.如何存储图&#xff1f; 将顶点与边分开存储。 3.邻接矩阵&#xff08;数组表示法&#xff09; 基本思想&#xff1a; 用一个一维数…

Vatee万腾的科技征程:Vatee数字化创新的前沿探讨

在Vatee万腾的科技征程中&#xff0c;我们目睹了一场数字化创新的引领之旅&#xff0c;探讨了Vatee在科技前沿的独到见解。Vatee万腾不仅仅是一家科技公司&#xff0c;更是一支前行不辍的冒险队伍&#xff0c;通过不断突破自我&#xff0c;探索未知领域&#xff0c;引领着数字化…

Ghidra逆向工具配置 MacOS 的启动台显示(Python)

写在前面 通过 ghidra 工具, 但是只能用命令行启动, 不太舒服, 写个脚本生成 MacOS 的 app 格式并导入启动台. 不算复杂, 主要是解析包的一些元信息还有裁剪软件图标(通过 MacOS 自带的 API) 脚本 #!/opt/homebrew/bin/python3import os import re import subprocess as sp…

SpringCloud 2022有哪些变化

目录 前提条件 AOT支持 Spring Native支持 前提条件 Spring Cloud 2022.0.0是构建在Spring Framework 6.0和Spring Boot 3.0 之上的一S个主要版本。 JDK要求最低需要是Java 17J2EE要求最低需要Jakarta EE 9 AOT支持 Spring cloud 2022支持AOT编译&#xff0c;它是将程序源…

【HarmonyOS开发】配置开发工具DevEco Studio

1、下载 注意&#xff1a; 1、安装过程中&#xff0c;一定要自定义安装位置&#xff0c;包比较大&#xff0c;包比较大&#xff0c;包比较大&#xff01;&#xff01;&#xff01; 2、可以将该工具添加到右键中&#xff0c;否则&#xff0c;如果你的项目不是HarmonyOS&#xff…

Maven依赖传递和依赖冲突以及继承和聚合关系详解

Java全能学习面试指南&#xff1a;https://javaxiaobear.cn 1、Maven依赖传递和依赖冲突 1. Maven依赖传递特性 概念 假如有Maven项目A&#xff0c;项目B依赖A&#xff0c;项目C依赖B。那么我们可以说 C依赖A。也就是说&#xff0c;依赖的关系为&#xff1a;C—>B—>…

探索亚马逊大语言模型:开启人工智能时代的语言创作新篇章

文章目录 前言一、大语言模型是什么&#xff1f;应用范围 二、Amazon Bedrock总结 前言 想必大家在ChatGPT的突然兴起&#xff0c;大家多多少少都会有各种各样的问题&#xff0c;比如&#xff1a;大语言模型和生成式AI有什么关系呢&#xff1f;大语言模型为什么这么火&#xf…

linux使用chage修改用户密码过期时间解决rac安装互信问题

文章目录 一、RAC建多实例库提示互信问题二、原因分析1.修改系统用户密码期限2.修改语法&#xff1a;chage [选项] 用户名3.常用示例&#xff1a; 一、RAC建多实例库提示互信问题 二、原因分析 因为此次是在原有集群情况下创建多个实例&#xff0c;其实不需要优先排查俩节点的…

Aerial for Mac: 沉浸在高清鸟瞰的世界,让你的屏幕焕发新生

你是否已经厌倦了那些平淡无奇的屏保程序&#xff1f;是否希望你的Mac屏幕能更生动、更有趣&#xff1f;如果你对此抱有强烈的期待&#xff0c;那么Aerial for Mac绝对会是你期待已久的解决方案。 Aerial for Mac是一款独具特色的高清屏保程序&#xff0c;它以鸟瞰的视角带你领…

4.5 Windows驱动开发:实现进程数据转储

多数ARK反内核工具中都存在驱动级别的内存转存功能&#xff0c;该功能可以将应用层中运行进程的内存镜像转存到特定目录下&#xff0c;内存转存功能在应对加壳程序的分析尤为重要&#xff0c;当进程在内存中解码后&#xff0c;我们可以很容易的将内存镜像导出&#xff0c;从而更…

FISCO BCOS 3.0【01】搭建第一个区块链网络

官方技术文档&#xff1a;https://fisco-bcos-doc.readthedocs.io/zh-cn/latest/index.html 我们在官方技术文档的基础上&#xff0c;进行&#xff0c;对文档中一些不清楚的地方进行修正 搭建Air版本FISCO BCOS联盟链 本节以搭建单群组FISCO BCOS链为例操作&#xff0c;使用开…