解决Spark流处理产生的小文件问题

做流批一体,湖仓一体的大数据架构,常见的做法就是:

数据源->spark Streaming->ODS(数据湖)->spark streaming->DWD(数据湖)->...

那么数据源->spark Streaming->ODS,以这段为例,在数据源通过spark structured streaming写入ODS在数据湖(Delta Lake)落盘时候必然会产生很多小文件。

图片

1、目的

为了在批处理spark-sql运行更快,也避免因为小文件而导致报错。

2、影响

WARNING: Failed to connect to /172.16.xx.xx:9866 for block, add to deadNodes and continue. java.net.SocketException: 
Too many open files
  1. 小文件在批处理数据IO消耗巨大,程序可能卡死。

  2. 小文件块都有对应的元数据,元数据放在NameNode,导致需要的内存大大增大,增加NameNode压力,这样会限制了集群的扩展。

  3. 在HDFS或者对象储存中,小文件的读写处理速度要远远小于大文件,(寻址耗时)。

3、解决思路

3.1、事前

(1)避免写入时候产生过多小文件 做好分区partitionBy(年,月,日), 避免小文件过于分散 Trigger触发时间可以设置为1分钟,这样会攒一批一写入,避免秒级别写入而产生大量小文件(但是使用spark structured 想要做real-time不能这样,只适合做准实时)

(2)打开自适应框架的开关

spark.sql.adaptive.enabled true

(3)通过spark的coalesce()方法和repartition()方法

val rdd2 = rdd1.coalesce(8, true) //(true表示是否shuffle)
val rdd3 = rdd1.repartition(8)

coalesce:coalesce()方法的作用是返回指定一个新的指定分区的Rdd,如果是生成一个窄依赖的结果,那么可以不发生shuffle,分区的数量发生激烈的变化,计算节点不足,不设置true可能会出错。

repartition:coalesce()方法shuffle为true的情况。

3.2、事后(小文件引起已经产生)

(1)优化 Delta 表的写入,避免小文件产生 在开源版 Spark 中,每个 executor 向 partition 中写入数据时,都会创建一个表文件进行写入,最终会导致一个 partition 中产生很多的小文件。

Databricks 对 Delta 表的写入过程进行了优化,对每个 partition,使用一个专门的 executor 合并其他 executor 对该 partition 的写入,从而避免了小文件的产生。

图片

该特性由表属性 delta.autoOptimize.optimizeWrite 来控制:可以在创建表时指定

CREATE TABLE student (id INT, name STRING)
TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

也可以修改表属性

ALTER TABLE table_name
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

该特性有两个优点:

(1)通过减少被写入的表文件数量,提高写数据的吞吐量;

(2)避免小文件的产生,提升查询性能。

缺点:

其缺点也是显而易见的,由于使用了一个 executor 来合并表文件的写入,从而降低了表文件写入的并行度,此外,多引入的一层 executor 需要对写入的数据进行 shuffle,带来额外的开销。因此,在使用该特性时,需要对场景进行评估。

场景:

该特性适用的场景:频繁使用 MERGE,UPDATE,DELETE,INSERT INTO,CREATE TABLE AS SELECT 等 SQL 语句的场景;

该特性不适用的场景:写入 TB 级以上数据。

(2)自动合并小文件

在流处理场景中,比如流式数据入湖场景下,需要持续的将到达的数据插入到 Delta 表中,每次插入都会创建一个新的表文件用于存储新到达的数据,假设每10s触发一次,那么这样的流处理作业一天产生的表文件数量将达到8640个,且由于流处理作业通常是 long-running 的,运行该流处理作业100天将产生上百万个表文件。这样的 Delta 表,仅元数据的维护就是一个很大的挑战,查询性能更是急剧恶化。

为了解决上述问题,Databricks 提供了小文件自动合并功能,在每次向 Delta 表中写入数据之后,会检查 Delta 表中的表文件数量,如果 Delta 表中的小文件(size < 128MB 的视为小文件)数量达到阈值,则会执行一次小文件合并,将 Delta 表中的小文件合并为一个新的大文件。

该特性由表属性 delta.autoOptimize.autoCompact 控制,和特性 delta.autoOptimize.optimizeWrite 相同,可以在创建表时指定,也可以对已创建的表进行修改。自动合并的阈值由 spark.databricks.delta.autoCompact.minNumFiles 控制,默认为50,即小文件数量达到50会执行表文件合并;

合并后产生的文件最大为128MB,如果需要调整合并后的目标文件大小,可以通过调整配置 spark.databricks.delta.autoCompact.maxFileSize 实现。

(3)手动合并小文件(我常用,每天定时运行合并分区内小文件,再去处理批任务)

自动小文件合并会在对 Delta 表进行写入,且写入后表中小文件达到阈值时被触发。除了自动合并之外,Databricks 还提供了 Optimize 命令使用户可以手动合并小文件,优化表结构,使得表文件的结构更加紧凑。

在实现上 Optimize 使用 bin-packing 算法,该算法不但会合并表中的小文件,且合并后生成的表文件也更均衡(表文件大小相近)。例如,我们要对 Delta 表 student 的表文件进行优化,仅需执行如下命令即可实现:(Optimize 命令不但支持全表小文件的合并,还支持特定的分区的表文件的合并)。

OPTIMIZE student WHERE date >= '2024-01-01'

4、附加

面试官可能会问,我运行optimize合并小文件,但是小文件太多了,直接卡死运行不了程序(某互联网面试题)

(1)首先停掉程序,这里注意deltalake因为有历史版本这个概念,所以不存在运行一半覆盖原来版本情况,可以基于上一个版本重新运行(考点)。

(2)第二点,大数据思想分而治之,“分”,即把复杂的任务分解为若干个“简单的任务”来处理。

OPTIMIZE student WHERE date > '2024-01-01' and date < '2024-01-02'

因为前面做了partitionby(年月日),那么缩小optimize范围,在遍历这个月的每一天日期,分治处理。

(3)第三点,大数据思想,自己不行找兄弟,加节点,加计算资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/686534.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

充电桩产业链及商业模式

产业链概况 充电桩产业链分为上游元器件和设备生产商、建设商&#xff0c;中游为运营商&#xff0c;下游为各类充电场景。其中&#xff0c;上游零部件厂商提供充电模块&#xff08;IGBT、逆变器等&#xff09;、配电滤波设备、监控计费设备、充电枪等&#xff1b;中游充电桩厂…

Linux Ext2/3/4文件系统

文章目录 前言一、Linux文件系统简介1.1 简介1.2 Linux File System Structure1.3 Directory Structure 二、Ext2/3/4文件系统2.1 Minix2.2 EXT2.3 EXT22.4 EXT32.5 EXT4 三、EXT Inode参考资料 前言 这篇文章介绍了Linux文件系统的一些基础知识&#xff1a;Linux 文件系统简介…

sing-task message

文章目录 1.起因2.查因过程2.1 定位job2.2 定位sql text2.3 定位db_link2.4 测试dblink2.5 tnsping host2.6 检查host信息2.7检查网路状况 3.处置办法&#xff1a;4.结论 1.起因 在巡查长事务时&#xff0c;有两个事务执行了很长时间没有完成 SELECT SE.SID,SE.SERIAL#,to_ch…

创新案例 | AI数据驱动下的全域数字化转型的五大关键洞见

近年来通过全域数字化转型在竞争激烈的市场中脱颖而出。传统零食行业面临市场竞争加剧和消费者需求多样化的挑战&#xff0c;如何利用数据驱动和AI技术&#xff0c;能更好地实现会员运营效率和用户满意度的显著提升呢&#xff1f;本文将探讨全域数字化转型的五大关键洞见&#…

Application UI

本节包含关于如何用DevExpress控件模拟许多流行的应用程序ui的教程。 Windows 11 UI Windows 11和最新一代微软Office产品启发的UI。 Office Inspired UI Word、Excel、PowerPoint和Visio等微软Office应用程序启发的UI。 How to: Build an Office-inspired UI manually 本教…

关于Stream.toList()方法使用小记

对照示例 public static void main(String[] args) {final List<String> list new ArrayList<>();list.add("aa");list.add("bb");list.add("cc");list.remove("cc");System.out.println(list);}结果&#xff1a; Stre…

华为机考入门python3--(33)牛客33-图片整理

分类&#xff1a;排序 知识点&#xff1a; 对字符串中的字符ASCII码排序 sorted(my_str) 题目来自【牛客】 def sort_images(s):# 可以使用ord(A)求A的ASCII值&#xff0c;需要注意的是A的值&#xff08;65&#xff09;比a的值小&#xff08;97&#xff09;sorted_images …

经济与安全兼顾:茶饮店购买可燃气体报警器的价格考量

可燃气体报警器在如今的社会中扮演着至关重要的角色。它们用于检测环境中的可燃气体浓度&#xff0c;及早发现潜在的火灾隐患&#xff0c;保护人们的生命和财产安全。 在这篇文章中&#xff0c;佰德将介绍可燃气体报警器的安装、检定以及价格&#xff0c;通过实际案例和数据&a…

【MySQL】SQL通用语法

【MySQL】SQL通用语法 SQL是结构化查询语言&#xff08;Structured Query Language&#xff09;的缩写&#xff0c;是一种专门用来管理和操作关系型数据库的标准化语言。SQL能够实现数据库的创建、查询、更新和删除操作&#xff0c;以及对数据进行存储、检索和管理。通过SQL语句…

【MySQL】数据库的增删查改

文章目录 前言1. 新增1.1 全插入1.2 指定某些列名插入1.3 多行插入1.4 边查询边插入 2. 约束2.1 非空约束2.2 唯一性约束2.3 默认值约束2.4 主键约束2.5 外键约束2.6 check 约束2.7 外键的逻辑删除 3. 查询 - 初阶3.1 全列查询3.2 指定列查询3.3 指定表达式查询3.4 别名查询3.5…

Python pandas openpyxl excel合并单元格,设置边框,背景色

Python pandas openpyxl excel合并单元格&#xff0c;设置边框&#xff0c;背景色 1. 效果图2. 源码参考 1. 效果图 pandas设置单元格背景色&#xff0c;字体颜色&#xff0c;边框 openpyxl合并单元格&#xff0c;设置丰富的字体 2. 源码 # excel数字与列名互转 import o…

【Python】把指定组织形式的txt转换为xmind

人工智能训练通常需要使用文本格式&#xff0c;把基于训练之后的内容&#xff0c;让GLM大模型输出如下格式的文本&#xff1a; weltestDemo|#|weltest|#|静态界面|#|输入|#|长度|#|不超过四位 weltestDemo|#|weltest|#|静态界面|#|输入|#|长度|#|不超过五位 weltestDemo|#|wel…

零基础打靶—Glasgow Smile靶场

一、打靶的主要五大步骤 1.确定目标&#xff1a;在所有的靶场中&#xff0c;确定目标就是使用nmap进行ip扫描&#xff0c;确定ip即为目标&#xff0c;其他实战中确定目标的方式包括nmap进行扫描&#xff0c;但不局限于这个nmap。 2.常见的信息收集&#xff1a;比如平常挖洞使用…

Xxl-Job二开踩坑记录

Xxl-Job踩坑记录 将xxl-job二次开发了&#xff0c;然后在对接于拓展功能的时候发现了一些xxl-job在使用或性能上隐藏的坑&#xff1b; 接口请求超时 起初是设定业务方通过http接口调用xxl-job的增删改接口完成对任务的数据操作&#xff1b; 因此直接使用了内置提供的 XxlJo…

GWT 与 Python App Engine 集成

将 Google Web Toolkit (GWT) 与 Python App Engine 集成可以实现强大的 Web 应用程序开发。这种集成允许你使用 GWT 的 Java 客户端技术构建丰富的用户界面&#xff0c;并将其与 Python 后端结合在一起&#xff0c;后端可以运行在 Google App Engine 上。 1、问题背景 在 Pyt…

【python进阶】python图形化编程之美--tkinter模块初探

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

B=2W,奈奎斯特极限定理详解

一直没搞明白奈奎斯特极限定理的含义&#xff0c;网上搜了很久也没得到答案。最近深思几天后&#xff0c;终于有了点心得。顺便吐槽一下&#xff0c;csdn的提问栏目&#xff0c;有很多人用chatgpt秒回这个事&#xff0c;实在是解决不了问题&#xff0c;有时候人的问题大多数都是…

javaweb的新能源充电系统的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;管理员管理&#xff0c;用户管理&#xff0c;充电桩管理&#xff0c;报修管理&#xff0c;新能源公告管理 前台账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;充电桩&#xff0c;新…

QA测试开发工程师面试题满分问答26: Cookie、Session、Token和JWT的定义、区别和使用场景

这是一个非常常见的面试题,需要全面掌握 Cookie、Session、Token 和 JWT 的定义和使用场景,以及它们之间的区别。下面是一个详细的满分回答: Cookie: 定义: Cookie 是一种存储在客户端(通常是浏览器)的小型文本文件,用于在客户端与服务器之间保持会话状态。使用场景: 常用于保存…

参数传递和剪枝,从修剪二叉树谈起

669. 修剪二叉搜索树 - 力扣&#xff08;LeetCode&#xff09; 一、参数传递 Java中的参数传递方式只有一种&#xff0c;那就是值传递。如果我们传的是基本数据类型&#xff0c;那么函数接收到的就是该数据的副本&#xff0c;如果我们传的是对象&#xff0c;那么函数接收到的就…