Flume基础知识(十一):Flume自定义接口

1)案例需求

使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

2)需求分析

在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要 发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予 不同的值。

在该案例中,我们以端口数据模拟日志,以是否包含”atguigu”模拟不同类型的日志, 我们需要自定义 interceptor 区分数据中是否包含”atguigu”,将其分别发往不同的分析 系统(Channel)。

3)实现步骤

(1)创建一个 maven 项目,并引入以下依赖。

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>

(2)定义 CustomInterceptor 类并实现 Interceptor 接口。

package com.atguigu.interceptor;
​
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
​
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
​
/**
 * @author:左泽林
 * @date:日期:2022-01-17-时间:14:47
 * @message:
 */
public class TypeInterceptor implements Interceptor {
​
    /*声明一个集合,用于存放拦截器处理后的事件*/
    private List<Event> addHeaderEvents ;
​
    //初始化
    public void initialize() {
        /*初始化*/
        addHeaderEvents = new ArrayList<Event>();
    }
​
    //处理单个事件
    public Event intercept(Event event) {
​
        //1.获取header & body
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody());
​
        //2.根据body中是否包含“atguigu”添加不同的头信息
        if (body.contains("atguigu")){
            headers.put("type","atguigu");
        }else{
            headers.put("type","other");
        }
​
        /*返回数据*/
        return event;
    }
​
    /*批处理事件*/
    public List<Event> intercept(List<Event> events) {
​
        //1. 清空集合
        addHeaderEvents.clear();
​
        /*遍历events*/
        for (Event event : events) {
            addHeaderEvents.add(intercept(event));
        }
​
        /*返回数据*/
        return events;
    }
    
    public void close() {
​
    }
    
    public static class Builder implements Interceptor.Builder{
​
        public Interceptor build() {
            return new TypeInterceptor();
        }
​
        public void configure(Context context) {
            
        }
    }
}
打包,上传到服务器Flume的lib下,Flume会在启动时调用

(3)编辑 flume 配置文件

为 hadoop100 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink), 并配置相应的 ChannelSelector 和 interceptor。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
#拦截器
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
#映射需要对应代码中的拦截类型,这里就是atguigu、other
a1.sources.r1.selector.mapping.atguigu = c1
a1.sources.r1.selector.mapping.other = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop101
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

为 hadoop103 上的 Flume4 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 4141
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

为 hadoop102 上的 Flume3 配置一个 avro source 和一个 logger sink。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4242
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

(4)分别在 hadoop100,hadoop101,hadoop102 上启动 flume 进程,注意先后顺序。

[root@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume3.conf -Dflume.root.logger=INFO,console
[root@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2.conf -Dflume.root.logger=INFO,console
[root@hadoop100 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1.conf

(5)在 hadoop100 使用 netcat 向 localhost:44444 发送字母和数字。

(6)观察 hadoop101 和 hadoop102 打印的日志。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/298770.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

卫星互联网与MEC融合方案研究

卫星互联网与MEC融合方案研究 作者&#xff1a;温特、王立中、司鹏、颜明明、马恬、郭伊蒙 中国卫通集团股份有限公司 本文首发&#xff1a;第十九届卫星通信学术年会 摘 要&#xff1a;在卫星互联网中引入移动边缘计算(MEC)技术可有效提高用户体验质量&#xff0c;降低运营成…

Android studio环境配置

1.搜索android studio下载 Android Studio - Download 2.安装 3.配置环境 配置gradle&#xff0c;gradle参考网络配置。最后根据项目需求选择不同的jdk。

SpringDoc注解解析

一、什么是SpringDoc SpringDoc注解的使用&#xff0c;它是基于OpenAPI 3和Swagger 3的现代化解决方案&#xff0c;相较于旧版的Swagger2(SpringFox)&#xff0c;SpringDoc提供了更简洁、更直观的注解方式。 二、SpringDoc的注解分类 2.1 作用于类的注解 1. Tag 用于说明…

docker部署simpleDocker

1&#xff0c;安装docker&#xff0c;请参考 linux安装docker 2&#xff0c;安装docker-compose&#xff0c;请参考 Docker-Compose 3&#xff0c;安装simpleDocker 准备docker-compose.yml文件 version: 3 services:redis:container_name: redisimage: redis:latestweb:conta…

未完成销量任务的智己汽车突发大规模车机故障,竞争压力不小

2024年刚开年&#xff0c;智己汽车便上演了一出“开门黑”。 近日&#xff0c;不少车主在社交平台发帖&#xff0c;反映智己LS6出现大规模车机故障&#xff0c;包括但不限于主驾驶屏幕不显示车速、档位、行驶里程&#xff0c;左右转盲区显示失效&#xff0c;无转向灯、雷达提醒…

时钟的实现(MFC)

文章目录 1.预备知识1.日期和时间类1.概述2.构造3.CTime类主要成员函数3.CTimeSpan类主要成员函数 2.计时器1.创建计时器2.销毁计时器 3.位图类1.构造2.初始化3.属性4.操作 2.实验目的3.实验内容4.代码实现1.准备工作2.基类CClockBaseClockBase.hClockBase.cpp 3.时钟背景类CCl…

【大数据】Zookeeper 数据写入与分布式锁

Zookeeper 数据写入与分布式锁 1.数据是怎么写入的2.基于 Zookeeper 实现分布式锁 1.数据是怎么写入的 无论是 Zookeeper 自带的客户端 zkCli.sh&#xff0c;还是使用 Python&#xff08;或者其它语言&#xff09;实现的客户端&#xff0c;本质上都是连接至集群&#xff0c;然…

1982-2022年GIMMS 标准化差异植被指数

GIMMS 标准化差异植被指数 1982-2022 PKU GIMMS 归一化植被指数数据集&#xff08;PKU GIMMS NDVI&#xff0c;版本 1.2&#xff09;提供了从 1982 年到 2022 年以半个月为间隔、分辨率为 1/12 的一致的全球 NDVI 数据。其主要目标是解决现有领域中普遍存在的关键不确定性。全…

BMS电池管理系统带充放电控制过流过压保护

2.4G无线采集BMS开发板&#xff08;主从一体&#xff09; 全新升级 &#xff08;赠送上位机源码TTL 上位机&#xff0c;可以改成自己想要的界面&#xff09; 12串电池TTL上位机 CAN通信上位机源码有偿开源&#xff0c;供项目二次开发。 增加STM32平台 USB转TTL通信 CAN通信 增加…

C++面向对象核心-继承

1、继承 1.1 概念 继承是面向对象的三大特性之一&#xff0c;体现了代码复用的思想。 继承就是在一个已存在的类的基础上建立一个新的类&#xff0c;并拥有其特性。 已存在的类被称为“基类”或者“父类”新建立的类被称为“派生类”或者“子类”对象间没有继承关系 #include &…

数据恢复与并发控制例题

例1: (1)重做&#xff08;REDO&#xff09;&#xff1a;T1,T2,T3; 撤销&#xff08;UNDO&#xff09;&#xff1a;T4。 (2)重做&#xff1a;T1,T2&#xff1b; 撤销&#xff1a;T3。 (3)重做&#xff1a;T1; 撤销&#xff1a;T2,T3. (4)重做&#xff1a;T1&#xff1b; 撤销…

手机上下载 Linux 系统

我们首先要下载 Ternux 点击下载以及vnc viewer (提取码&#xff1a;d9sX)&#xff0c;需要魔法才行 下载完以后我们打开 Ternux 敲第一个命令 pkg upgrade 这个命令是用来跟新软件的 敲完命令就直接回车&#xff0c;如果遇到需要输入 Y/N 的地方全部输入 Y 下一步 #启动TMOE…

java SSM问卷调查系统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM问卷调查管理系统是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代 码和数据库&#xff0c;系统主要采…

vite + vue3引入ant design vue 报错

npm install ant-design-vue --save下载插件并在main.ts 全局引入 报错 解决办法一&#xff1a; main.ts注释掉全局引入 模块按需引入 解决办法二 将package.json中的ant-design-vue的版本^4.0.0-rc.4改为 ^3.2.15版本 同时将将package-lock.json中的ant-design-vue的版本…

华为云服务器试用领取

系列文章目录 华为云服务器试用领取 领取的试用云服务器在哪 文章目录 系列文章目录介绍 介绍 我将会用该系列文章讲述如何在云服务器中安装大数据软件及其环境搭建。如有不足之处&#xff0c;还望指点。 本篇文章讲述的是华为云服务器的免费试用。 华为弹性云服务器 ECS 该云…

metaSPAdes,megahit,IDBA-UB:宏基因组装软件安装与使用

metaSPAdes,megahit,IDBA-UB是目前比较主流的宏基因组组装软件 metaSPAdes安装 GitHub - ablab/spades: SPAdes Genome Assembler #3.15.5的预编译版貌似有问题&#xff0c;使用源码安装试试 wget http://cab.spbu.ru/files/release3.15.5/SPAdes-3.15.5.tar.gz tar -xzf SP…

数据分析——快递电商

一、任务目标 1、任务 总体目的——对账 本项目解决同时使用多个快递发货&#xff0c;部分隔离区域出现不同程度涨价等情形下&#xff0c;如何快速准确核对账单的问题。 1、在订单表中新增一列【运费差异核对】来表示订单运费实际有多少差异&#xff0c;结果为数值。 2、将…

【书生·浦语大模型实战营02】《轻松玩转书生·浦语大模型趣味Demo》学习笔记

《轻松玩转书生浦语大模型趣味Demo》 1、InternLM-Chat-7B 智能对话&#xff1a;生成 300 字的小故事 本节中我们将使用InternLM-Chat-7B 模型部署一个智能对话 Demo。 1.1 环境准备 在InternStudio平台中选择 A100(1/4) 的配置&#xff0c;镜像选择 Cuda11.7-conda&#x…

idea中使用Lombok 失效,@Slf4j 找不到符号的解决办法

文章目录 一、前言二、问题排查和解决方案三、 其他解决方案3.1 另一种解决方案3.2 参考文章 一、前言 今天在一个多module工程中&#xff0c;新增了一个 springboot&#xff08;版本 2.2.4.RELEASE&#xff09; module&#xff0c;像往常一样&#xff0c;我引入了lombok依赖&…

电脑开启虚拟化如何查看自己的主机主板型号

问题描述 在使用virtualbox、vmware安装虚拟机的时候&#xff0c;需要本机电脑能够支持虚拟化。 但是不同厂家的主机&#xff08;主板&#xff09;幸好并不一致&#xff0c;所以需要先了解自己的电脑主板型号 操作方法 1、win r 键打开运行窗口&#xff0c;输入cmd并确定打开…