0301taildir-source报错-flume-大数据

1 基础环境简介

linux系统:centos,前置安装:jdk、hadoop、zookeeper、kafka,版本如下

软件版本描述
centos7linux系统发行版
jdk1.8java开发工具集
hadoop2.10.0大数据生态基础组件
zookeeper3.5.7分布式应用程序协调服务
kafka3.0分布式mq组件
flume1.9.0分布式采集传输组件

2 报错

  • 场景1:动态监控目录多个日志变化,通过flume采集传输到kafka

  • 报错日志

    org.apache.flume.FlumeException: Error creating positionFile parent directories
            at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:170)
            at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
            at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:325)
            at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)
            at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:750)
    Caused by: java.nio.file.FileAlreadyExistsException: /export/server/flume
            at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
            at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
            at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
            at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
            at java.nio.file.Files.createDirectory(Files.java:674)
            at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
            at java.nio.file.Files.createDirectories(Files.java:727)
            at org.apache.flume.source.taildir.TaildirSource.configure(TaildirSource.java:168)
            ... 11 more
    
    
  • conf文件如下

    #定义组件
    a1.sources = r1
    a1.channels = c1
    
    #配置source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
    a1.sources.r1.positionFile = /export/server/flume/taildir_position.json
    
    #配置channel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = node1:9092,node2:9092
    a1.channels.c1.kafka.topic = topic_log01
    a1.channels.c1.parseAsFlumeEvent = false
    
    #组装 
    a1.sources.r1.channels = c1
    
  • 原因就是在创建positionFile的时候父目录已存在

  • 场景2:我们生成的日志文件app.log 每经过一天会按照日期重命名文件,然后生成新的app.log,此时flume会重新采集所有的日志信息,导致信息重复采集2次。

  • Taildir 说明: Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下:

    {"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.t
    
    xt"}
    
    {"inode":2496275,"pos":12,"file":"/opt/module/flume/files2/log.t
    
    xt"}
    
  • 而flume会同时判断Inode和file来确定是否同一文件

    注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统

    用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来

    识别文件。

3 解决

场景1解决方案有两种:

  1. 既然是创建父目录已存在,我们可以吧positionFile位置重新配置。

  2. 修改源代码,我们通过源代码找下处理逻辑,下载1.9.0版本的flume源代码,官网地址:https://archive.apache.org/dist/flume/,找到TailSource 170行

     @Override
      public synchronized void configure(Context context) {
        String fileGroups = context.getString(FILE_GROUPS);
        Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS);
    
        filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX),
                                 fileGroups.split("\\s+"));
        Preconditions.checkState(!filePaths.isEmpty(),
            "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'");
    
        String homePath = System.getProperty("user.home").replace('\\', '/');
        positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE);
        Path positionFile = Paths.get(positionFilePath);
        try {
          // 此处创建父目录,如果存在报错
          Files.createDirectories(positionFile.getParent());
        } catch (IOException e) {
          throw new FlumeException("Error creating positionFile parent directories", e);
        }
        headerTable = getTable(context, HEADERS_PREFIX);
        batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
        skipToEnd = context.getBoolean(SKIP_TO_END, DEFAULT_SKIP_TO_END);
        byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER);
        idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT);
        writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL);
        cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING,
            DEFAULT_CACHE_PATTERN_MATCHING);
    
        backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
            PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
        maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
            PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
        fileHeader = context.getBoolean(FILENAME_HEADER,
                DEFAULT_FILE_HEADER);
        fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
                DEFAULT_FILENAME_HEADER_KEY);
        maxBatchCount = context.getLong(MAX_BATCH_COUNT, DEFAULT_MAX_BATCH_COUNT);
        if (maxBatchCount <= 0) {
          maxBatchCount = DEFAULT_MAX_BATCH_COUNT;
          logger.warn("Invalid maxBatchCount specified, initializing source "
              + "default maxBatchCount of {}", maxBatchCount);
        }
    
        if (sourceCounter == null) {
          sourceCounter = new SourceCounter(getName());
        }
      }
    

    在这里插入图片描述

可以在创建父目录之前检测是否已存在,如果已存在,直接跳过创建即可,修改try代码块中内容如下

boolean exists = Files.exists(positionFile.getParent());
      if (!exists)
        Files.createDirectories(positionFile.getParent());

maven打包替换flume/lib/下 flume-taildir-source-1.9.0.jar 如图所示:在这里插入图片描述

重新运行,正常启动,如下图日志所示:在这里插入图片描述

kafka中新接收的数据如下图所示:在这里插入图片描述

场景2解决方案 把TailFile如下代码

  public boolean updatePos(String path, long inode, long pos) throws IOException {
    if (this.inode == inode && this.path.equals(path)) {
      setPos(pos);
      updateFilePos(pos);
      logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
      return true;
    }
    return false;
  }
  
  // 修改为
    public boolean updatePos(String path, long inode, long pos) throws IOException {
    if (this.inode == inode) {
      setPos(pos);
      updateFilePos(pos);
      logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
      return true;
    }
    return false;
  }

即不校验file只校验inode,具体这里不再去验证,有兴趣自己验证下

结语

如果小伙伴什么问题或者指教,欢迎交流。

❓QQ:806797785

参考链接:

[1]flume教学视频[CP/OL].2020-04-16.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/458015.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

私域运营的模式

私域运营的模式 | 想要建立私域流量&#xff0c;但由于对私域流量的认知不够全面&#xff0c;不知道该从何处着手进行落地实施。 整理了私域建设的五个主要模式一个SOP 供大家参考。 需要明确的是&#xff0c;每种模式都有各自的利弊&#xff0c;并不存在绝对的优劣之分。最重要…

国创证券策略:股指预计维持震荡格局 关注汽车、通信设备等板块

国创证券指出&#xff0c;近期两市指数持续反弹创新高&#xff0c;但沪指现已率先出现滞涨状况&#xff0c;一起均已进入阻力重压区。不过当时技术形状上坚持较好&#xff0c;可持续做多&#xff0c;一旦跌破重要支撑如沪指的3030点&#xff0c;则需降仓防卫&#xff0c;防止指…

CompletionService 处理异步任务

案例: public static void main(String[] args) throws Exception {ExecutorService executorService Executors.newCachedThreadPool();ArrayList<Future<Integer>> list new ArrayList<>();Future<Integer> future_15 executorService.submit(()…

海外媒体宣发套餐推广:如何选择最佳方案-华媒舍

在信息时代&#xff0c;传播和宣传已经成为各个行业发展的关键部分。尤其对于拓展国际市场的企业来说&#xff0c;海外媒体宣发更是至关重要。由于各种原因&#xff0c;很多企业在选择海外媒体宣发套餐时感到困惑。本文将为您介绍如何选择最佳的海外媒体宣发方案。 1.了解目标市…

目标检测——YOLOv3算法解读

论文&#xff1a;YOLOv3&#xff1a;An Incremental Improvement 作者&#xff1a;Joseph Redmon, Ali Farhadi 链接&#xff1a;https://arxiv.org/abs/1804.02767 代码&#xff1a;http://pjreddie.com/yolo/ YOLO系列其他文章&#xff1a; YOLOv1通俗易懂版解读SSD算法解读…

mac输入su命令报错如何重置密码

diannao1xiejiandeMacBook-Air ~ % su Password: su: Sorry输入 sudo passwd 命令重置密码即可。

名创优品“主战场”增速放缓,第四季度国内市场收入环比下滑

近日&#xff0c;名创优品&#xff08;NYSE:MNSO、HK:09896&#xff09;公布了截至12月31日的2023年第四季度及全年财报。财报显示&#xff0c;名创优品2023年第四季度收入、净利润均实现了双位数增长&#xff0c;多项业绩指标创下历史新高。 然而&#xff0c;在名创优品这份可…

Windows Server 各版本搭建终端服务器实现远程访问(03~19)

一、Windows Server 2003 左下角开始➡管理工具➡管理您的服务器&#xff0c;点击添加或删除角色 点击下一步 勾选自定义&#xff0c;点击下一步 点击终端服务器&#xff0c;点击下一步 点击确定 重新登录后点击确定 点击开始➡管理工具➡计算机管理&#xff0c;展开本地用户…

海康威视相机SDK二次开发(JAVA语言)

目录 前言客户端创建虚拟相机示例代码保存图片程序运行结果修改需求 二次开发引入外部包对SaveImage.java文件进行修改保存图片saveDataToFile方法选择相机chooseCamera方法主方法 FileUtil类处理过期照片启动类与配置文件application.yml通过实体类读取yml启动类 SaveImage.ja…

sqllab第十一关通关笔记

知识点&#xff1a; 发现登录框就可以尝试注入登录框一般都是字符型注入通过注入可以获取其他表的信息绕过手段 单引号闭合联合注入也可以进行错误注入 首先看界面是一个登录框&#xff1b;通过admin admin登录进去&#xff0c;发现页面会把用户名和密码的登录信息打印出来&am…

前端路由跳转bug

路由后面拼接了id的千万不能取相近的名字&#xff0c;浏览器分辩不出&#xff0c;只会匹配前面的路径 浏览器自动跳转到上面的路径页面&#xff0c;即使在菜单管理里面配置了正确的路由 跳转了无数次&#xff0c;页面始终不对&#xff0c;检查了路由配置&#xff0c;没有任何问…

SSL VPN基础原理

目录 SSL ---安全传输协议&#xff08;安全套接层&#xff09;---TLS ----传输层安全协议 SSL的工作原理 SSL会话建立的过程 ​编辑 数据传输过程中的封装示意图 无客户端认证的过程 有客户端认证的过程 SSL VPN的核心技术---虚拟网关技术 服务器验证的点&#xff1a; 资源…

通过路由器监控,优化网络效率

路由器是网络的基本连接组件&#xff0c;路由器监控涉及将路由器网络作为一个整体进行管理&#xff0c;其中持续监控路由器的性能、运行状况、安全性和可用性&#xff0c;以确保更好的操作和最短的停机时间&#xff0c;因此监控路由器至关重要。 为什么路由器监控对组织很重要…

code摘录日记[矩阵变元素,变列向量,3D表面图,table行列设置] Matlab

矩阵变元素&#xff0c;变列向量 W1(Z1 < Z2) nan; % Z1,Z2 all matrix,Only plot points where Z1 > Z2;Z1 < Z2位置值填为NaNx x(:); % Now x is a 30-by-1 vector; matrix变列vector技巧3D表面图 hand figure; % Handle to the figure, for more plotting later…

根据服务器系统选择对应的MySQL版本

1. 根据服务器系统选择对应的MySQL版本 MySQL有多个版本&#xff0c;选择对应的版本&#xff0c;重点信息是Linux的GLIBC版本号&#xff0c;Linux的版本、系统位数。 1.1 查看Linux的GLIBC版本号 通常libc.so会支持多个版本&#xff0c;即向前兼容&#xff0c;查看该文件中…

java-模拟的例题实战

例题实战 在实际的开发工作中&#xff0c;对字符串的处理是最常见的编程惹怒我。本题目即是要求程序对用户输入的串进行处理。具体规则如下&#xff1a; 1 把每个单词的首字母变成大写 2 把数字与字母之间用下划线字符&#xff08;_&#xff09;分开&#xff0c;使得更清晰 …

【论文阅读】MSGNet:学习多变量时间序列预测中的多尺度间序列相关性

MSGNet&#xff1a;学习多变量时间序列预测中的多尺度间序列相关性 文献介绍摘要总体介绍背景及当前面临的问题现有解决方案及其局限性本文的解决方案及其贡献 背景知识的相关工作背景知识问题表述&#xff1a; Method论文主要工作1.输入嵌入和剩余连接 (Input Embedding and R…

Java高级编程—泛型

文章目录 1.为什么要有泛型 (Generic)1.1 泛型的概念1.2 使用泛型后的好处 2.在集合中使用泛型3.自定义泛型结构3.1 自定义泛型类、泛型接口3.2 自定义泛型方法 4.泛型在继承上的体现5.通配符的使用5.1 基本使用5.2 有限制的通配符的使用 1.为什么要有泛型 (Generic) Java中的…

边缘计算与物联网的核心 —— 低功耗芯片

一、低功耗芯片 在边缘计算与物联网&#xff08;IoT&#xff09;中&#xff0c;低功耗芯片扮演了至关重要的角色&#xff0c;主要体现在以下几个方面&#xff1a; 延长设备寿命&#xff1a;物联网设备通常需要部署在难以更换电池或不方便进行频繁维护的环境中&#xff0c;比如…

软考高级:信息系统分类-业务处理系统(TPS)概念和例题

作者&#xff1a;明明如月学长&#xff0c; CSDN 博客专家&#xff0c;大厂高级 Java 工程师&#xff0c;《性能优化方法论》作者、《解锁大厂思维&#xff1a;剖析《阿里巴巴Java开发手册》》、《再学经典&#xff1a;《Effective Java》独家解析》专栏作者。 热门文章推荐&am…