大数据-226 离线数仓 - Flume 优化配置 自定义拦截器 拦截原理 拦截器实现 Java

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 需求分析 指标口径
  • 日志数据采集 taildir source HDFS Sink Agent Flume
  • 优化配置

在这里插入图片描述

Flume的优化配置

Flume 是一种分布式、可靠且高效的数据收集、聚合和传输系统,广泛应用于大数据生态系统中。为了提升 Flume 的性能和稳定性,优化配置至关重要。

使用如下的指令,启动Agent进行测试:

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs1.conf -name a1 -Dflum
e.roog.logger=INFO,console

启动后的截图如下所示:
在这里插入图片描述

查看刚才的Flume窗口:
在这里插入图片描述

查看HDFS的内容:
在这里插入图片描述

批量处理

  • 参数:batchSize
  • 作用:控制 Flume 在批量传输时每次传输的事件数量。
    配置建议:
  • Source 到 Channel:根据 Source 的吞吐量和 Channel 的吞吐能力调整,推荐值为 100-1000。
  • Channel 到 Sink:根据 Sink 的处理能力和目标系统的写入性能调整,推荐值为 500-5000。

压缩传输

  • 参数:compressionType
  • 作用:对事件进行压缩后传输,减少网络带宽消耗。
  • 支持的压缩类型:gzip、snappy、lz4 等。
  • 配置建议:根据目标系统是否支持解压缩功能选择合适的压缩类型。

Source 优化

Taildir Source

  • 参数:batchSize 和 fileHeader
  • batchSize:设置单次从文件中读取的事件数量。
  • fileHeader:是否在事件头部添加文件名,推荐开启以便于后续处理。

Kafka Source

  • 参数:kafka.consumer.timeout.ms 和 fetch.message.max.bytes
  • kafka.consumer.timeout.ms:设置 Kafka 消费者读取数据的超时时间,通常为 100-500ms。
  • fetch.message.max.bytes:设置每次读取的最大消息大小,默认值通常为 1MB,可以根据业务场景适当调整。

Channel 优化

Memory Channel

  • 参数:capacity 和 transactionCapacity
  • capacity:Channel 中允许的最大事件数。
  • transactionCapacity:单次事务中允许的最大事件数。

File Channel

  • 参数:checkpointDir 和 dataDirs
  • checkpointDir:存储 Channel 状态的目录。
  • dataDirs:存储事件数据的目录,建议设置多个磁盘路径以提升 IO 性能。
  • 配置建议:确保磁盘 IO 性能足够,避免瓶颈。

Flume报错解决

向 logs 目录中存放入日志文件,此时如果出现OOM的日志,是因为缺省情况下FlumeJVM的最大分配20M,这个值太小,需要调整。
我这里直接放入:

vim /opt/wzk/logs/start/test.log

2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

解决方案:
在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容:

export JAVA_OPTS="-Xms4000m -Xmx4000m -
Dcom.sun.management.jmxremote"
# 要想使配置文件生效,还要在命令行中指定配置文件目录
flume-ng agent --conf flume-1.9/conf --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console

flume-ng agent --conf-file flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console

Flume内存参数设置及优化:

  • 根据日志数据量大小,JVM堆一般要设置为4G或者更高
  • -Xms -Xmx最好设置一致,减少内存抖动带来的性能影响

自定义拦截器

前面FlumeAgent的配置使用了本地时间,可能导致数据存放的路径不正确。要解决上面的问题就需要使用自定义拦截器。
Agent用于测试自定义拦截器,source => logger sink
flumetest1.conf

# a1是agent的名称。source、channel、sink的名称分别为:r1 c1 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = h122.wzk.icu
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# sink
a1.sinks.k1.type = logger
# source、channel、sink之间的关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

自定义拦截器原理

自定义拦截器的原理:

  • 自定义拦截器要集成 Flume 的 Interceptor
  • Event 分为 header 和 body (接收的字符串)
  • 获取 header 和 body
  • 从 body 中 获取 time,并将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置到header中

自定义拦截器实现

自定义拦截器的实现:

  • 获取event的header
  • 获取event的body
  • 解析body获取json串
  • 解析json串获取时间戳
  • 将时间戳转换为字符串 yyyy-MM-dd
  • 将转换后的字符串放置header中
  • 返回event

导入依赖

<dependencies>
  <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.1.23</version>
  </dependency>
</dependencies>

编写代码

package icu.wzk;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // 这里是逐条处理
        String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);
        // 获取Event的Header
        Map<String, String> headerMap = event.getHeaders();
        // 解析Body获取JSON字符串
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            // 解析JSON字符串获取时间戳
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getJSONObject("app_active").getString("time");
            // 将时间戳转换字符串 yyyy-MM-dd
            // 将字符串转换为Long
            long timestampLong = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestampLong);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);
            // 将转换后的字符串放置header中
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            headerMap.put("logtime", "Unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event : list) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/918578.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【3D Slicer】的小白入门使用指南九

定量医学影像临床研究与实践 任务 定量成像教程 定量成像是从医学影像中提取定量测量的过程。 本教程基于两个定量成像的例子构建: - 形态学:缓慢生长肿瘤中的小体积变化 - 功能:鳞状细胞癌中的代谢活动 第1部分:使用变化跟踪模块测量脑膜瘤的小体积变化第2部分:使用PET标…

Visual Studio 圈复杂度评估

VisualStudio自带的有工具 之后就可以看到分析结果

计算机毕业设计 | SpringBoot+vue城镇保障性住房管理 公租房系统(附源码+论文)

1&#xff0c;绪论 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理城镇保障性住房管理系统的相关信…

Cherno OpenGL(28 ~ 33)

批量渲染-介绍 在这里我们将在一个drawcall打包多个几何体。即 batch geometry。 我们在这里将聚焦于2d渲染&#xff0c;我们如何渲染一堆2d的quads或者说rectangles呢&#xff1f; 一种情况是比如一个2d游戏有很多个tile组成&#xff0c;要去渲染这些tile&#xff1b;另一种…

物联网——UNIX时间戳、BKP备份寄存器、RTC时钟

RTC时钟 Unix时间戳 UTC/GMT 时间戳转换 时间戳转换 BKP简介 RTC框图 RTC基本结构 硬件供电电路 RTC操作注意事项 接线图&#xff08;读写备份寄存器和实时时钟&#xff09;

EPANET供水系统水力和水质模拟,管网系统水龄模拟、余氯模拟、消毒副产物模拟、失效分析、弹性分析等

EPANET 是美国环保综述开发的用于供水系统水力和水质模拟的软件工具&#xff0c;可以视作是SWMM模型&#xff08;主要用于排水系统&#xff09;的姊妹软件。它主要用于供水系统的设计、操作和分析&#xff0c;适用于包括城市、工业区和农村地区各种规模的供水系统分析。在水力模…

51单片机基础04 LCD1602时序;Proteus仿真单片机、总线、网络标号等;

目录 一、LCD显示字符 1、写指令 &#xff08;1&#xff09;、LCD状态配置 &#xff08;2&#xff09;、显示开关与光标 2、写数据 &#xff08;1&#xff09;、设置地址 &#xff08;2&#xff09;、设置数据 3、初始化代码 &#xff08;1&#xff09;、初始化流程 …

OceanBase 闪回查询

前言 在OB中&#xff0c;drop表可以通过 回收站 或者 以往的备份恢复来还原单表。当delete数据时&#xff0c;由于delete操作的对象不会进入回收站&#xff0c;此时需要通过闪回查询功能查看delete的数据&#xff0c;以便后续恢复 本次实验版本为 OceanBase 4.2.1.8&#xff0…

小版本大不同 | Navicat 17 新增 TiDB 功能

近日&#xff0c;Navicat 17 迎来了小版本更新。此次版本新增了对 PingCap 公司的 TiDB 开源分布式关系型数据库的支持&#xff0c;进一步拓展了 Navicat 的兼容边界。即日起&#xff0c;Navicat 17 所有用户可免费升级至最新版本&#xff0c;通过 Navicat 工具实现 TiDB 数据库…

SpringBoot开发——SpringBoot3.3 实现停止/重启定时任务

文章目录 一、运行效果二、项目结构三、功能实现1、项目依赖配置(pom.xml)2、配置文件(application.yaml)3、创建 TaskSchedulerProperties 配置类4、定时任务的实现5、任务管理器的实现6、控制器的实现7、启动应用程序类8、视图控制器9、前端页面(Thymeleaf + Bootstrap)…

【大数据技术基础 | 实验十一】Hive实验:新建Hive表

文章目录 一、实验目的二、实验要求三、实验原理四、实验环境五、实验内容和步骤&#xff08;一&#xff09;启动Hive&#xff08;二&#xff09;创建表&#xff08;三&#xff09;显示表&#xff08;四&#xff09;显示表列&#xff08;五&#xff09;更改表&#xff08;六&am…

c++ 后端

基础知识 1. 指针、引用2. 数组3. 缺省参数4. 函数重载5. 内联函数6. 宏7. auto8. const9. 类和对象10. 类的6个默认成员函数11. 初始化列表12. this指针13. C/C的区别14. C 三大特性15. 结构体内存对齐规则16. explicit17. static18. 友元类、友元函数19. 内部类20. 内存管理&…

[C++]:C++11(一)

1. 统一列表初始化 1.1 C11 之前的初始化方式 在 C11 标准中&#xff0c;引入了一个非常实用且强大的特性——统一列表初始化&#xff08;Uniform Initialization&#xff09;&#xff0c;它为我们在初始化各种类型的对象时提供了一种统一且方便的语法形式&#xff0c;极大地…

基于的图的异常检测算法OddBall

OddBall异常检测算法出自2010年的论文《OddBall: Spotting Anomalies in Weighted Graphs》&#xff0c;它是一个在加权图(weighted graph)上检测异常点的算法&#xff0c;基本思路为计算每一个点的一度邻域特征&#xff0c;然后在整个图上用这些特征拟合出一个函数&#xff0c…

基于AOA算术优化的KNN数据聚类算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 5.完整程序 1.程序功能描述 基于AOA算术优化的KNN数据聚类算法matlab仿真。通过AOA优化算法&#xff0c;搜索最优的几个特征数据&#xff0c;进行KNN聚类&#xff0c;同时对比不同个数特征下…

【模块一】kubernetes容器编排进阶实战之CoreDNS的介绍与使用

CoreDNS进阶 CoreDNS进阶-简介 DNS组件历史版本有skydns、kube-dns和coredns三个&#xff0c;k8s 1.3版本之前使用skydns&#xff0c;之后的版本到1.17及之间的版本使用kube-dns&#xff0c; 1.18开始目前主要使用coredns&#xff0c;DNS组件用于解析k8s集群中service name所对…

栈Stack和队列Queue

目录 一、栈 &#xff08;1&#xff09;用数组实现 &#xff08;2&#xff09;用单链表实现 &#xff08;3&#xff09;用标注尾结点的单链表实现 &#xff08;4&#xff09;用双向链表实现 2、栈的实际应用 &#xff08;1&#xff09;改变元素的序列 &#xff08;2&am…

ES6标准-Promise对象

目录 Promise对象的含义 Promise对象的特点 Promise对象的缺点 Promise对象的基本用法 Promise对象的简单例子 Promise新建后就会立即执行 Promise对象回调函数的参数 Promise参数不会中断运行 Promise对象的then方法 Promise对象的catch()方法 Promise状态为resolv…

【隐私计算】隐私计算的应用场景探索(大模型隐私计算、隐私数据存储计算、Web3、隐私物联网等)

1. 背景分析 隐私计算作为一种实现“原始数据不出域&#xff0c;可用不可见”的数据流通价值的关键技术&#xff0c;经历了2020-2023年的高光时刻&#xff0c;却在2024年骤然走向低谷。从各种渠道了解到一些业内曾经风光无两的隐私计算公司都有不同程度的裁员。几乎一夜之间&am…

【大数据学习 | flume】flume的概述与组件的介绍

1. flume概述 Flume是cloudera(CDH版本的hadoop) 开发的一个分布式、可靠、高可用的海量日志收集系统。它将各个服务器中的数据收集起来并送到指定的地方去&#xff0c;比如说送到HDFS、Hbase&#xff0c;简单来说flume就是收集日志的。 Flume两个版本区别&#xff1a; ​ 1&…