


TAILDIR SOURCE【实现多目录监控、断点续传】


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
# 多文件监控
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*
# 断点续传
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1




# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
# 多文件监控
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*
# 断点续传
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y-%m-%d/%H
a1.sinks.k1.hdfs.filePrefix = log-
# 实际开发使用时间需要调整为1h 学习使用改为10s
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1




在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义 interceptor 区分数字和字母,将其分别发往不同的分析系统(Channel)。

Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。



package com.atguigu.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

 * 1、 继承flume的拦截器接口
 * 2、 重写4个抽象方法
 * 3、 编写静态内部类 builder

public class MyInterceptor implements Interceptor {
    public void initialize() {


    // 处理单个event
    public Event intercept(Event event) {
        // 在event的头信息里面添加标记
        // 提供给channel selector 选择发送到不同的channel
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody());

        // 判断第一个单符 如果是字母发送到channel1, 如果是数字发送到channel2
        char c = log.charAt(0);
        if (c >= '0' && c <= '9') {
            // 判断c为数字
            headers.put("type", "number");
        } else if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
            // 判断c为字母
            headers.put("type", "letter");

        return event;

    // 处理多个event
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
        return events;

    public void close() {


    public static class Builder implements Interceptor.Builder{
        // 创建一个拦截器对象
        public Interceptor build() {
            return new MyInterceptor();

        // 读配置文件
        public void configure(Context context) {



①为 hadoop102 上 的 Flume1 配 置 1 个 netcat source , 1 个 sink group ( 2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sources.r1.selector.type = multiplexing
# 填写标记的key
a1.sources.r1.selector.header = type
# value对应的channel
a1.sources.r1.selector.mapping.number = c2
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.default = c2

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.MyInterceptor$Builder

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

②为 hadoop102 上的 Flume4 配置一个 avro source 和一个 logger sink。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

③为 hadoop102 上的 Flume4 配置一个 avro source 和一个 logger sink。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1




分发 Flume xsync flume

在 hadoop102、hadoop103 以及 hadoop104 的/opt/module/flume/job 目录下创建一个group3 文件夹。 mkdir group3


配置 Source 用于监控文件,配置 Sink 输出数据到下一级 Flume。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position4.json

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1






CSS3介绍 CSS3在CSS2基础上&#xff0c;增强或新增了许多特性&#xff0c; 弥补了CSS2的众多不足之处&#xff0c;使得Web开发变得更为高效和便捷。 CSS3的现状 浏览器支持程度不够好&#xff0c;有些需要添加私有前缀 移动端支持优于PC端 不断改进中 应用相对广泛 ### …

网络性能提升10%,ZStack Edge 云原生超融合基于第四代英特尔®至强®可扩展处理器解决方案发布

随着业务模式的逐渐转变、业务架构逐渐变得复杂&#xff0c;同时容器技术的兴起和逐渐成熟&#xff0c;使得Kubernetes、微服务等新潮技术逐步应用于业务应用系统上。 为了充分释放性能、为业务系统提供更高效的运行环境&#xff0c;ZStack Edge 云原生超融合采用了第四代英特尔…

第十四届蓝桥杯javaA组 阶乘的和

这个题我做的时候已经看出来&#xff0c;最小的m一定是最小的Ai&#xff0c;然后如果想让想让m1那么只能让最小的Ai的数量是m1的倍数因为比如说mAmin 1 那么想让m&#xff01;是 Ai阶乘和的倍数只有 nAmin&#xff01;%m&#xff01;0 然后我就不知道如何就m等于Amin2的情况了…

Spring 自定义注解 为 BeanDefinition 添加 qualifier 信息 从而约束自动装配范围

为什么写这篇文章 Spring 支持类型注入&#xff0c;并且可以通过Qualifier 或者Mate 调整类型注入的范围。但是通过自定义注解结合现有的 Qualifier 使用起来有种种困难。 将 Qualifier 融合在自定义注解中&#xff0c;在使用 AliasFor 遇到问题仅仅检查注解中的一部分内容是否…


2024年北京通信展|北京国际信息通信展览会|北京PT展 2024年中国国际信息通信展览会&#xff08;PTEXPO&#xff09;&#xff0c;是由工业和信息化部主办的ICT行业盛会&#xff0c;自1990年创办以来&#xff0c;已成功举办31届&#xff0c;是反映信息通信行业发展最新成果的重要…

最小质因数 == 最大质因数,不等式秒了!

起因&#xff1a; 在洛谷做题遇到了这道题~ 一看咿呀&#xff0c;又是道数学题~ 首先我们要了解一下&#xff0c;什么是质数&#xff1f; 我记得好像有年高考题的前几题好像考了这玩意来着&#xff0c;质数的概念好像在小学学过&#xff0c;上了初中后基本都没有用过了~ 质数就…

error lsof 0.1 does not meet the minimal requirement

很多小伙伴在linux虚拟机中采用Centos 7镜像安装TitanIDE时&#xff0c;会报错如下信息 error lsof 0.1 does not meet the minimal requirement 这是因为lsof依赖版本较低&#xff0c;只需要在命令行输入 sudo yum install lsof 按下回车以后&#xff0c;命令行会弹出提示命令…


代码如下&#xff1a; x-2; do { xx*x; } while(!x); 执行循环次数是&#xff1a;1 先执行后判断 代码如下&#xff1a; #include<stdio.h> int main(void) {int x;x-2;do{ xx*x; printf("\n");printf("x %d\n",x);}while(!x);return 0; } 结果…

Linux - 第三节

改变用户类型 su 仅单纯的进行身份变化 依旧处于普通用户里面 su - 进行重新登录更改身份 退出用exit / ctrld su 用户名 改成成其他身份 对一条命令进行提权 sudo command r:可读 w:可写 x:可执行 -:对应的权限位置&#xff0c;没有权限 去掉所有权限 chmod u…

2024 ccfcsp认证打卡【汇总】

202312-1 仓库规划 202312-2 因子化简 202312-3 树上搜索 202309-1 坐标变换&#xff08;其一&#xff09; 202309-2 坐标变换&#xff08;其二&#xff09; 202305-1 重复局面 202305-2 矩阵运算 202303-1 田地丈量 202303-2 垦田计划 202212-1 现值计算 202212-2 训练计划 20…


实战篇Redis 3.0 、利用逻辑过期解决缓存击穿问题 需求&#xff1a;修改根据id查询商铺的业务&#xff0c;基于逻辑过期方式来解决缓存击穿问题 思路分析&#xff1a;当用户开始查询redis时&#xff0c;判断是否命中&#xff0c;如果没有命中则直接返回空数据&#xff0c;不…

打工人神器! Raccoon 代码小浣熊

继这三个之后&#xff0c;今天又来了一个 [ Raccoon代码小浣熊 ] 核心精要与产品特点 全面支持多种编程语言和IDE&#xff1a;「代码小浣熊」支持超过90种主流编程语言&#xff0c;包括但不限于Python、Java、JavaScript、C、Go和SQL等。同时&#xff0c;它集成了市面上主流的…


基于jspmysqlSpringhibernate的SSH在线学习交流论坛平台 博主介绍&#xff1a;多年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 央顺技术团队 Java毕设项目精品实战案例《1000套》 欢迎点赞 收藏 ⭐留言 文末…

Unreal的Quixel Bridge下载速度过慢、下载失败

从Quixel Bridge下载MetaHuman模型&#xff0c;速度非常慢&#xff0c;而且经常下载失败&#xff0c;从头下载。 可以从Quixel Bridge的右上角我的图标->Support->Show Logs打开日志目录 downloaded-assets目录下为下载的资源 bridge-plugin.log文件记录了下载URL和下载…


1、下载安装包 直接百度java 点击这个就可以&#xff0c;进去之后下载&#xff0c;根据自身情况&#xff0c;window就下Windows版本的记得下那个jdk别下别的&#xff08;用不了&#xff09;&#xff0c;然后下一个编译器可以是idea可以是eclipse都可以 2、环境搭建 分为两步…


确保实物档案的安全有以下几个关键点&#xff1a; 1. 建立完善的安全措施&#xff1a;为实物档案建立专门的存储区域&#xff0c;控制进出口&#xff0c;限制访问权限&#xff0c;并使用安全锁和监控设备等物理安保措施。 2. 规范档案管理制度&#xff1a;建立档案管理制度&…


算法学习——LeetCode力扣动态规划篇10 583. 两个字符串的删除操作 583. 两个字符串的删除操作 - 力扣&#xff08;LeetCode&#xff09; 描述 给定两个单词 word1 和 word2 &#xff0c;返回使得 word1 和 word2 相同所需的最小步数。 每步 可以删除任意一个字符串中的一个…

使用 golang 以及 Gin 框架,将上传的图片在不保存至本地的情况下添加水印,并上传至阿里云 OSS



目录 标题栏美化 样例展示 代码 配套鼠标移动 完整展示 标题栏美化 样例展示 代码 import tkinter as tk from tkinter import ttk from PIL import Image, ImageTk import subprocess import sysdef open_buy_quantity():window.destroy()subprocess.run(["p…

网际协议 - IP

文章目录 目录 文章目录 前言 1 . 网际协议IP 1.1 网络层和数据链路层的关系 2. IP基础知识 2.1 什么是IP地址? 2.2 路由控制 3. IP地址基础知识 3.1 IP地址定义 3.2 IP地址组成 3.3 IP地址分类 3.4 子网掩码 IP地址分类导致浪费? 子网与子网掩码 3.5 CIDR与…