【Flume】尚硅谷学习笔记

实时监控目录下多个新文件

本案例是将虚拟机本地文件进行实时监控,并将上传的数据实时上传到HDFS中。

TAILDIR SOURCE【实现多目录监控、断点续传】

监视指定的文件,一旦检测到附加到每个文件的新行,就几乎实时地跟踪它们。如果正在写入新行,则该源将在等待写入完成时重试读取它们。它以JSON格式定期将每个文件的最后读位置写入给定的位置文件。如果Flume因某种原因停止或关闭,它可以从现有位置文件上写入的位置重新开始跟踪。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
# 多文件监控
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*
# 断点续传
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

文件改名、vim修改文件等操作(本地文件发生改变),Flume均要进行重新读取,其本质原因是因为inode发生了变化。

HDFS Sink

该sink将事件写入HDFS,可以根据经过的时间、数据大小或事件数量定期滚动文件(关闭当前文件并创建新文件)。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
# 多文件监控
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*
# 断点续传
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y-%m-%d/%H
a1.sinks.k1.hdfs.filePrefix = log-
# 实际开发使用时间需要调整为1h 学习使用改为10s
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Flume事务

在这里插入图片描述

多路复用及拦截器的使用

在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义 interceptor 区分数字和字母,将其分别发往不同的分析系统(Channel)。

Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。

在这里插入图片描述

首先编写拦截器,我觉得起主要作用是对header打标签,后传给channel选择器来进行分通道传输。

package com.atguigu.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

/**
 * 1、 继承flume的拦截器接口
 * 2、 重写4个抽象方法
 * 3、 编写静态内部类 builder
 */

public class MyInterceptor implements Interceptor {
    public void initialize() {

    }

    // 处理单个event
    public Event intercept(Event event) {
        // 在event的头信息里面添加标记
        // 提供给channel selector 选择发送到不同的channel
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody());

        // 判断第一个单符 如果是字母发送到channel1, 如果是数字发送到channel2
        char c = log.charAt(0);
        if (c >= '0' && c <= '9') {
            // 判断c为数字
            headers.put("type", "number");
        } else if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
            // 判断c为字母
            headers.put("type", "letter");
        }

        event.setHeaders(headers);
        return event;
    }

    // 处理多个event
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    public void close() {

    }

    public static class Builder implements Interceptor.Builder{
        // 创建一个拦截器对象
        public Interceptor build() {
            return new MyInterceptor();
        }

        // 读配置文件
        public void configure(Context context) {

        }
    }
}

其次完成配置文件的编写:

①为 hadoop102 上 的 Flume1 配 置 1 个 netcat source , 1 个 sink group ( 2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sources.r1.selector.type = multiplexing
# 填写标记的key
a1.sources.r1.selector.header = type
# value对应的channel
a1.sources.r1.selector.mapping.number = c2
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.default = c2

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.MyInterceptor$Builder

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

②为 hadoop102 上的 Flume4 配置一个 avro source 和一个 logger sink。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

③为 hadoop102 上的 Flume4 配置一个 avro source 和一个 logger sink。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Flume聚合案例

在这里插入图片描述

(1)准备工作

分发 Flume xsync flume

在 hadoop102、hadoop103 以及 hadoop104 的/opt/module/flume/job 目录下创建一个group3 文件夹。 mkdir group3

(2)创建配置文件

配置 Source 用于监控文件,配置 Sink 输出数据到下一级 Flume。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position4.json

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行配置文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/503168.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

前端学习<二>CSS基础——10-CSS3选择器详解

CSS3介绍 CSS3在CSS2基础上&#xff0c;增强或新增了许多特性&#xff0c; 弥补了CSS2的众多不足之处&#xff0c;使得Web开发变得更为高效和便捷。 CSS3的现状 浏览器支持程度不够好&#xff0c;有些需要添加私有前缀 移动端支持优于PC端 不断改进中 应用相对广泛 ### …

网络性能提升10%,ZStack Edge 云原生超融合基于第四代英特尔®至强®可扩展处理器解决方案发布

随着业务模式的逐渐转变、业务架构逐渐变得复杂&#xff0c;同时容器技术的兴起和逐渐成熟&#xff0c;使得Kubernetes、微服务等新潮技术逐步应用于业务应用系统上。 为了充分释放性能、为业务系统提供更高效的运行环境&#xff0c;ZStack Edge 云原生超融合采用了第四代英特尔…

第十四届蓝桥杯javaA组 阶乘的和

这个题我做的时候已经看出来&#xff0c;最小的m一定是最小的Ai&#xff0c;然后如果想让想让m1那么只能让最小的Ai的数量是m1的倍数因为比如说mAmin 1 那么想让m&#xff01;是 Ai阶乘和的倍数只有 nAmin&#xff01;%m&#xff01;0 然后我就不知道如何就m等于Amin2的情况了…

Spring 自定义注解 为 BeanDefinition 添加 qualifier 信息 从而约束自动装配范围

为什么写这篇文章 Spring 支持类型注入&#xff0c;并且可以通过Qualifier 或者Mate 调整类型注入的范围。但是通过自定义注解结合现有的 Qualifier 使用起来有种种困难。 将 Qualifier 融合在自定义注解中&#xff0c;在使用 AliasFor 遇到问题仅仅检查注解中的一部分内容是否…

2024年北京通信展|北京国际信息通信展览会|北京PT展

2024年北京通信展|北京国际信息通信展览会|北京PT展 2024年中国国际信息通信展览会&#xff08;PTEXPO&#xff09;&#xff0c;是由工业和信息化部主办的ICT行业盛会&#xff0c;自1990年创办以来&#xff0c;已成功举办31届&#xff0c;是反映信息通信行业发展最新成果的重要…

最小质因数 == 最大质因数,不等式秒了!

起因&#xff1a; 在洛谷做题遇到了这道题~ 一看咿呀&#xff0c;又是道数学题~ 首先我们要了解一下&#xff0c;什么是质数&#xff1f; 我记得好像有年高考题的前几题好像考了这玩意来着&#xff0c;质数的概念好像在小学学过&#xff0c;上了初中后基本都没有用过了~ 质数就…

error lsof 0.1 does not meet the minimal requirement

很多小伙伴在linux虚拟机中采用Centos 7镜像安装TitanIDE时&#xff0c;会报错如下信息 error lsof 0.1 does not meet the minimal requirement 这是因为lsof依赖版本较低&#xff0c;只需要在命令行输入 sudo yum install lsof 按下回车以后&#xff0c;命令行会弹出提示命令…

C语言例1-7:以下程序段中执行循环的次数是

代码如下&#xff1a; x-2; do { xx*x; } while(!x); 执行循环次数是&#xff1a;1 先执行后判断 代码如下&#xff1a; #include<stdio.h> int main(void) {int x;x-2;do{ xx*x; printf("\n");printf("x %d\n",x);}while(!x);return 0; } 结果…

Linux - 第三节

改变用户类型 su 仅单纯的进行身份变化 依旧处于普通用户里面 su - 进行重新登录更改身份 退出用exit / ctrld su 用户名 改成成其他身份 对一条命令进行提权 sudo command r:可读 w:可写 x:可执行 -:对应的权限位置&#xff0c;没有权限 去掉所有权限 chmod u…

2024 ccfcsp认证打卡【汇总】

202312-1 仓库规划 202312-2 因子化简 202312-3 树上搜索 202309-1 坐标变换&#xff08;其一&#xff09; 202309-2 坐标变换&#xff08;其二&#xff09; 202305-1 重复局面 202305-2 矩阵运算 202303-1 田地丈量 202303-2 垦田计划 202212-1 现值计算 202212-2 训练计划 20…

Redis实战篇-利用逻辑过期解决缓存击穿问题

实战篇Redis 3.0 、利用逻辑过期解决缓存击穿问题 需求&#xff1a;修改根据id查询商铺的业务&#xff0c;基于逻辑过期方式来解决缓存击穿问题 思路分析&#xff1a;当用户开始查询redis时&#xff0c;判断是否命中&#xff0c;如果没有命中则直接返回空数据&#xff0c;不…

打工人神器! Raccoon 代码小浣熊

继这三个之后&#xff0c;今天又来了一个 [ Raccoon代码小浣熊 ] 核心精要与产品特点 全面支持多种编程语言和IDE&#xff1a;「代码小浣熊」支持超过90种主流编程语言&#xff0c;包括但不限于Python、Java、JavaScript、C、Go和SQL等。同时&#xff0c;它集成了市面上主流的…

基于jsp+mysql+Spring+hibernate+的SSH在线学习交流论坛平台

基于jspmysqlSpringhibernate的SSH在线学习交流论坛平台 博主介绍&#xff1a;多年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 央顺技术团队 Java毕设项目精品实战案例《1000套》 欢迎点赞 收藏 ⭐留言 文末…

Unreal的Quixel Bridge下载速度过慢、下载失败

从Quixel Bridge下载MetaHuman模型&#xff0c;速度非常慢&#xff0c;而且经常下载失败&#xff0c;从头下载。 可以从Quixel Bridge的右上角我的图标->Support->Show Logs打开日志目录 downloaded-assets目录下为下载的资源 bridge-plugin.log文件记录了下载URL和下载…

java(1)之环境部署

1、下载安装包 直接百度java 点击这个就可以&#xff0c;进去之后下载&#xff0c;根据自身情况&#xff0c;window就下Windows版本的记得下那个jdk别下别的&#xff08;用不了&#xff09;&#xff0c;然后下一个编译器可以是idea可以是eclipse都可以 2、环境搭建 分为两步…

如何确保实物档案的安全

确保实物档案的安全有以下几个关键点&#xff1a; 1. 建立完善的安全措施&#xff1a;为实物档案建立专门的存储区域&#xff0c;控制进出口&#xff0c;限制访问权限&#xff0c;并使用安全锁和监控设备等物理安保措施。 2. 规范档案管理制度&#xff1a;建立档案管理制度&…

算法学习——LeetCode力扣动态规划篇10

算法学习——LeetCode力扣动态规划篇10 583. 两个字符串的删除操作 583. 两个字符串的删除操作 - 力扣&#xff08;LeetCode&#xff09; 描述 给定两个单词 word1 和 word2 &#xff0c;返回使得 word1 和 word2 相同所需的最小步数。 每步 可以删除任意一个字符串中的一个…

使用 golang 以及 Gin 框架,将上传的图片在不保存至本地的情况下添加水印,并上传至阿里云 OSS

正如标题所述&#xff0c;使用golang对上传图片添加水印&#xff0c;以及将图片上传到阿里云OSS&#xff0c;网上一搜索&#xff0c;便有你想要的结果了&#xff0c;可是&#xff0c;他们却先将上传图片添加水印后保存在本地&#xff0c;而后再将添加了水印的图片上传到阿里云O…

【个人笔记】python界面美化

目录 标题栏美化 样例展示 代码 配套鼠标移动 完整展示 标题栏美化 样例展示 代码 import tkinter as tk from tkinter import ttk from PIL import Image, ImageTk import subprocess import sysdef open_buy_quantity():window.destroy()subprocess.run(["p…

网际协议 - IP

文章目录 目录 文章目录 前言 1 . 网际协议IP 1.1 网络层和数据链路层的关系 2. IP基础知识 2.1 什么是IP地址? 2.2 路由控制 3. IP地址基础知识 3.1 IP地址定义 3.2 IP地址组成 3.3 IP地址分类 3.4 子网掩码 IP地址分类导致浪费? 子网与子网掩码 3.5 CIDR与…