合并spark structured streaming处理流式数据产生的小文件

备注:

By 远方时光原创,可转载,不能复制到其他平台

背景:做流批一体,湖仓一体的大数据架构,常见的做法就是

数据源->spark Streaming->ODS(数据湖)->spark streaming->DWD(数据湖)->...

那么数据源->spark Streaming->ODS,以这段为例,在数据源通过spark structured streaming写入ODS在数据湖(Delta Lake)落盘时候必然会产生很多小文件

目的:

为了在批处理spark-sql运行更快,也避免因为小文件而导致报错

影响:

WARNING: Failed to connect to /172.16.xx.xx:9866 for block, add to deadNodes and continue. java.net.SocketException: Too many open files
 

1.小文件在批处理数据IO消耗巨大,程序可能卡死

2.小文件块都有对应的元数据,元数据放在NameNode,导致需要的内存大大增大,增加NameNode压力,这样会限制了集群的扩展。

3.在HDFS或者对象储存中,小文件的读写处理速度要远远小于大文件,(寻址耗时)

解决思路:

事前:

1.避免写入时候产生过多小文件

  • 做好分区partitionBy(年,月,日), 避免小文件过于分散
  • Trigger触发时间可以设置为1分钟,这样会攒一批一写入,避免秒级别写入而产生大量小文件(但是使用spark structured 想要做real-time不能这样,只适合做准实时

2.打开自适应框架的开关

spark.sql.adaptive.enabled true

3.通过spark的coalesce()方法和repartition()方法

val rdd2 = rdd1.coalesce(8, true) //(true表示是否shuffle)
val rdd3 = rdd1.repartition(8)

coalesce:coalesce()方法的作用是返回指定一个新的指定分区的Rdd,如果是生成一个窄依赖的结果,那么可以不发生shuffle,分区的数量发生激烈的变化,计算节点不足,不设置true可能会出错。
repartition:coalesce()方法shuffle为true的情况。

事后(小文件引起已经产生):

1:优化 Delta 表的写入,避免小文件产生

在开源版 Spark 中,每个 executor 向 partition 中写入数据时,都会创建一个表文件进行写入,最终会导致一个 partition 中产生很多的小文件。

Databricks 对 Delta 表的写入过程进行了优化,对每个 partition,使用一个专门的 executor 合并其他 executor 对该 partition 的写入,从而避免了小文件的产生。

该特性由表属性 delta.autoOptimize.optimizeWrite 来控制:

可以在创建表时指定

CREATE TABLE student (id INT, name STRING)
TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

也可以修改表属性

ALTER TABLE table_name
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

该特性有两个优点:通过减少被写入的表文件数量,提高写数据的吞吐量;避免小文件的产生,提升查询性能。

其缺点也是显而易见的,由于使用了一个 executor 来合并表文件的写入,从而降低了表文件写入的并行度,此外,多引入的一层 executor 需要对写入的数据进行 shuffle,带来额外的开销。因此,在使用该特性时,需要对场景进行评估:

该特性适用的场景:频繁使用 MERGE,UPDATE,DELETE,INSERT INTO,CREATE TABLE AS SELECT 等 SQL 语句的场景;

该特性不适用的场景:写入 TB 级以上数据。

2.自动合并小文件

在流处理场景中,比如流式数据入湖场景下,需要持续的将到达的数据插入到 Delta 表中,每次插入都会创建一个新的表文件用于存储新到达的数据,假设每10s触发一次,那么这样的流处理作业一天产生的表文件数量将达到8640个,且由于流处理作业通常是 long-running 的,运行该流处理作业100天将产生上百万个表文件。这样的 Delta 表,仅元数据的维护就是一个很大的挑战,查询性能更是急剧恶化。

为了解决上述问题,Databricks 提供了小文件自动合并功能,在每次向 Delta 表中写入数据之后,会检查 Delta 表中的表文件数量,如果 Delta 表中的小文件(size < 128MB 的视为小文件)数量达到阈值,则会执行一次小文件合并,将 Delta 表中的小文件合并为一个新的大文件。

该特性由表属性 delta.autoOptimize.autoCompact 控制,和特性 delta.autoOptimize.optimizeWrite 相同,可以在创建表时指定,也可以对已创建的表进行修改。自动合并的阈值由 spark.databricks.delta.autoCompact.minNumFiles 控制,默认为50,即小文件数量达到50会执行表文件合并;合并后产生的文件最大为128MB,如果需要调整合并后的目标文件大小,可以通过调整配置 spark.databricks.delta.autoCompact.maxFileSize 实现。

3:手动合并小文件(我常用,每天定时运行合并分区内小文件,再去处理批任务)

自动小文件合并会在对 Delta 表进行写入,且写入后表中小文件达到阈值时被触发。除了自动合并之外,Databricks 还提供了 Optimize 命令使用户可以手动合并小文件,优化表结构,使得表文件的结构更加紧凑。在实现上 Optimize 使用 bin-packing 算法,该算法不但会合并表中的小文件,且合并后生成的表文件也更均衡(表文件大小相近)。例如,我们要对 Delta 表 student 的表文件进行优化,仅需执行如下命令即可实现:(Optimize 命令不但支持全表小文件的合并,还支持特定的分区的表文件的合并

OPTIMIZE student WHERE date >= '2024-01-01'

一般批处理是day+1(每天凌晨运行前一天的数据),我在运行批处理前,运行一下optimize,合并昨天产生的小文件

附加:

面试官可能会问,我运行optimize合并小文件,但是小文件太多了,直接卡死运行不了程序(某互联网面试题,背景是:一次合并一个月,甚至一年产生的小文件

回答:

1.首先停掉程序,这里注意deltalake因为有历史版本这个概念,所以不存在运行一半覆盖原来版本情况,可以基于上一个版本重新运行(考点)

2.第二点,大数据思想分而治之“分”,即把复杂的任务分解为若干个“简单的任务”来处理。

​OPTIMIZE student WHERE date > '2024-01-01' and date < '2024-01-02'

因为前面做了partitionby(年月日),那么缩小optimize范围,在遍历这个月的每一天日期,分治处理

3.第三点,大数据思想,自己不行找兄弟,加节点,加计算资源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/412536.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

STM32--低功耗模式详解

一、PWR简介 正常模式与睡眠模式耗电是mA级&#xff0c;停机模式与待机模式是uA级。 二、电源框图 供电区域有三处&#xff0c;分别是模拟部分供电&#xff08;VDDA&#xff09;&#xff0c;数字部分供电&#xff0c;包括VDD供电区域和1.8V供电区域&#xff0c;后备供电&…

Java 学习和实践笔记(22):package(包机制)、JDK常见的包、类的导入

前面学的类&#xff0c;每创建一个类&#xff0c;在电脑上就是创建了一个对应的类文件。而package 相当于文件夹对文件的管理作用。主要用于管理类、用于解决类的重名问题。这个含义很简单。因为实际的程序&#xff0c;类可能有成千上万个&#xff0c;这样就需要把不同功能的类…

视频和音频使用ffmpeg进行合并和分离(MP4)

1.下载ffmpeg 官网地址&#xff1a;https://ffmpeg.org/download.html 2.配置环境变量 此电脑右键点击 属性 - 高级系统配置 -高级 -环境变量 - 系统变量 path 新增 文件的bin路径 3.验证配置成功 ffmpeg -version 返回版本信息说明配置成功4.执行合并 ffmpeg -i 武家坡20…

dpdk协议栈之udp架构优化

dpdk优势 传统网络架构与 DPDK&#xff08;Data Plane Development Kit&#xff09;网络架构之间存在许多区别&#xff0c;而 DPDK 的优势主要体现在以下几个方面&#xff1a; 数据包处理性能&#xff1a;传统网络架构中&#xff0c;网络数据包的处理通常由操作系统的网络协议…

测试环境搭建整套大数据系统(七:集群搭建kafka(2.13)+flink(1.14)+dinky+hudi)

一&#xff1a;搭建kafka。 1. 三台机器执行以下命令。 cd /opt wget wget https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz tar zxvf kafka_2.13-3.6.1.tgz cd kafka_2.13-3.6.1/config vim server.properties修改以下俩内容 1.三台机器分别给予各自的broker_id…

网络安全与IP安全网络安全

网络安全与IP安全网络安全 网络安全 是指网络系统的硬件&#xff0c;软件以及系统中的数据收到的保护。 保护的基本属性为&#xff1a;机密性&#xff0c;身份认证&#xff0c;完整性和可用性&#xff1b; 基本特征&#xff1a;相对性&#xff0c;时效性&#xff0c;相关性…

vue3 + vite + ts 中使用less文件全局变量

文章目录 安装依赖新建css变量文件全局引入css变量文件使用css变量 一、安装依赖 npm install less less-loader --save-dev 二、新建CSS变量文件 (1) :在根目录下的src文件中 src-> asset -> css ->glibal.less // glibal.less :root{--public_background_font_Col…

Leetcoder Day23| 回溯part03:组合+分割

语言&#xff1a;Java/Go 39. 组合总和 给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target &#xff0c;找出 candidates 中可以使数字和为目标数 target 的所有不同组合 &#xff0c;并以列表形式返回。你可以按任意顺序返回这些组合。 candidates 中的同一个…

LDR6020双盲插音频随便插充电听歌随便插

随着智能手机的普及和功能的日益丰富&#xff0c;手机已经成为我们日常生活中不可或缺的一部分。音乐、电影、游戏等娱乐内容更是丰富了手机的使用体验。而在这其中&#xff0c;音频转接器的作用愈发凸显&#xff0c;特别是在边听边充的场景下&#xff0c;一款高效且便捷的手机…

【底层解读】ArrayList源码学习

成员变量 学习源码前&#xff0c;我们还是先看一下ArrayList中成员变量有哪些 构造函数 ArrayList一共有三个构造函数。 第一个&#xff1a;带有指定初始容量的构造函数 第二个&#xff1a;空参构造 第三个&#xff1a;包含指定集合的构造函数 OK&#xff0c;看完构造函数&a…

Gemma谷歌(google)开源大模型微调实战(fintune gemma-2b)

Gemma-SFT Gemma-SFT(谷歌, Google), gemma-2b/gemma-7b微调(transformers)/LORA(peft)/推理 项目地址 https://github.com/yongzhuo/gemma-sft全部weights要用fp32/tf32, 使用fp16微调十几或几十的步数后大概率lossnan;(即便layer-norm是fp32也不行, LLaMA就没有这个问题, …

Open CASCADE学习|视图

目录 Mainwin.h Mainwin.cpp Mainwin.h ​#pragma once#include <QtWidgets/QMainWindow>#include "Displaywin.h"#include "OCC.h"class Mainwin : public QMainWindow{ Q_OBJECTpublic: Mainwin(QWidget* parent nullptr); ~Mainwin();​pri…

Python中高效的爬虫框架,你用过几个?

在信息时代&#xff0c;数据是无价之宝。许多开发者和数据分析师需要从互联网上采集大量的数据&#xff0c;用于各种用途&#xff0c;如分析、建模、可视化等。Python作为一门强大的编程语言&#xff0c;提供了多种高效的爬虫框架&#xff0c;使数据采集变得更加容易和高效。本…

IT资讯——全速推进“AI+鸿蒙”战略布局!

文章目录 每日一句正能量前言坚持长期研发投入全速推进“AI鸿蒙”战略 人才战略新章落地持续加码核心技术生态建设 后记 每日一句正能量 人总要咽下一些委屈&#xff0c;然后一字不提的擦干眼泪往前走&#xff0c;没有人能像白纸一样没有故事&#xff0c;成长的代价就是失去原来…

HashMap的put()方法执行流程

HashMap的数据结构在jdk1.8之前是数组链表&#xff0c;为了解决数据量过大、链表过长是查询效率会降低的问题变成了数组链表红黑树的结构&#xff0c;利用的是红黑树自平衡的特点。 链表的平均查找时间复杂度是O(n)&#xff0c;红黑树是O(log(n))。 HashMap中的put方法执行过…

机器视觉运动控制一体机在光伏汇流焊机器人系统的解决方案

一、市场应用背景 汇流焊是光伏太阳能电池板中段加工工艺&#xff0c;其前道工序为串焊&#xff0c;在此环节流程中&#xff0c;需要在多个太阳能电池片表面以平行方式串焊多条焊带&#xff0c;形成电池串。串焊好的多组电池串被有序排列输送到汇流焊接工作台&#xff0c;通过…

springboot+vue实现微信公众号扫码登录

通常在个人网站中&#xff0c;都会有各种第三方登录&#xff0c;其中微信登录需要认证才能使用&#xff0c;导致个人开发者不能进行使用此功能&#xff0c;但是我们可以使用微信公众号回复特定验证码来进行登录操作。 微信关键词处理 微信公众号关键词自动回复&#xff0c;具体…

第四套CCF信息学奥赛c++ CSP-J认证初级组 中小学信奥赛入门组初赛考前模拟冲刺题(完善程序题)

第四套中小学信息学奥赛CSP-J考前冲刺题 三、完善程序题 第一题 田忌赛马 田忌赛马&#xff0c;田忌每赢一次齐王的马就得200金&#xff0c;,当然输了就扣200金币&#xff0c;平局则金币数 不变。 #include<iostream> using namespace std; int main(){int n;while(c…

ARM系列 -- 虚拟化(二)

上一篇介绍了虚拟化和hypervisor的基本概念。为了配合虚拟化&#xff0c;ARM做了许多工作&#xff0c;首先是定义了四个异常等级&#xff08;Exception Level&#xff0c;简称EL&#xff09;。 前面介绍异常和特权的文章中有介绍&#xff0c;此处再啰嗦几句。每个异常级别都有…

css transform 会影响position 定位

比如通过以下代码.实现导航条上的每个li栏目,以不同的时间间隔,从上向下移动进来并显示 .my-navbar ul li {position: relative;opacity: 0;transform: translateY(-30px);transition: transform .6s cubic-bezier(.165,.84,.44,1),opacity .6s cubic-bezier(.165,.84,.44,1);…