Spark中读parquet文件是怎么实现的

背景

最近在整理了一下 spark对Parquet的写文件的过程,也是为了更好的理解和调优Spark相关的任务,
因为对于Spark来说,任何一个事情都不是独立的存在的,比如说parquet文件的rowgroup设置的大小对读写的影响,以及parquet写之前排序对读parquet的影响,以及向量化读取等等
本文基于Spark 3.5

分析

我们以FileSourceScanExec的doExecute方法 为切口进行分析:

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    if (needsUnsafeRowConversion) {
      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
        val toUnsafe = UnsafeProjection.create(schema)
        toUnsafe.initialize(index)
        iter.map { row =>
          numOutputRows += 1
          toUnsafe(row)
        }
      }
    } else {
      inputRDD.mapPartitionsInternal { iter =>
        iter.map { row =>
          numOutputRows += 1
          row
        }
      }
    }
  }

这里的needsUnsafeRowConversion判断如果是ParquetSource,且配置了spark.sql.parquet.enableVectorizedReader为‘true’(默认就是true),则会进行unsafeRow的转换,当然这里的好处就是节约内存以及能够减少GC
对于inputRDD来说,就是创建了读取parquet的RDD:
具体的见:ParquetFileFormat.buildReaderWithPartitionValues方法,涉及到的代码多,所以只解读关键的几个部分:

  • fileRooter的读取
        val fileFooter = if (enableVectorizedReader) {
          // When there are vectorized reads, we can avoid reading the footer twice by reading
          // all row groups in advance and filter row groups according to filters that require
          // push down (no need to read the footer metadata again).
          ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS)
        } else {
          ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
        }
    

这里enableVectorizedReader如果是true的话, fileFooter 只会得到所属Task的FileMetaData信息,其中只包括了所属Task的需要读取的parquet RowGroups,具体的数据流如下:

ParquetFooterReader.readFooter
   ||
   \/
readFooter
   ||
   \/
fileReader.getFooter
   ||
   \/
readFooter(file, options, f, converter)
   ||
   \/
converter.readParquetMetadata
   ||
   \/
filter.accept(new MetadataFilterVisitor)

filter.accept(new MetadataFilterVisitor就会根据对应的filter类型进行不同的操作:

FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo = filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
      @Override
      public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
        return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
      }

      @Override
      public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
        FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
        return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
      }

      @Override
      public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
        // We must generate the map *before* filtering because it modifies `fileMetadata`.
        Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
        FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap);
      }

      @Override
      public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
        // We must generate the map *before* filtering because it modifies `fileMetadata`.
        Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
        FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap);
      }
    });
  • 如果是 ParquetFooterReader.SKIP_ROW_GROUPS ,则是走的SkipMetadataFilter这条filter,则只会拿出rowgroup的信息和rowgrups的的行数

  • 如果是 enableVectorizedReader,也就是会走 RangeMetadataFilter这个Filter,则会调用filterFileMetaDataByMidpoint,该方法会根据Task分配的数据是否覆盖了Rowgroups的中点来纳入到该task的读取的数据中来,具体的可以见:Spark-读取Parquet-为什么task数量会多于Row Group的数量

  • vectorizedReader的创建

            vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter))
            logDebug(s"Appending $partitionSchema ${file.partitionValues}")
            vectorizedReader.initBatch(partitionSchema, file.partitionValues)
            if (returningBatch) {
              vectorizedReader.enableReturningBatches()
            }
    
            // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
            iter.asInstanceOf[Iterator[InternalRow]]
    
    • vectorizedReader.initialize
      重要的点是这个主要是涉及到 parquet messageType到 ParquetColumn的转换,主要是ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration)这个的配置

    • vectorizedReader.initBatch
      这里面主要涉及到了根据memModeOFF_HEAP还是ON_HEAP模式来构造不同的ColumnVector,其中
      如果是ON_HEAP,则会创建OnHeapColumnVector,用jvm数据的形式存储
      如果是OFF_HEAP,则会创建OffHeapColumnVector,这里涉及到的对象都是都是用unsafe api来操作,这里涉及到一个有意思的点:

       Platform.putByte(null, data + rowId, value);
       Platform.putInt(null, data + 4L * rowId, value)
      

      也就是说 无论是put什么 里面的第一个参数是为null,这个其实在Unsafed方法 putInt(Object o, long offset, int x)类中有提到

       Fetches a value from a given Java variable. More specifically, fetches a field or array element within the given object o at the given offset, or (if o is null) from the memory address whose numerical value is the given offset.
      

      也就是说如果传入的第一个参数为null,则会以offset作为地址,而在OffHeapColumnVector中对应的put当法中涉及到的offset就是data这个变量会在
      OffHeapColumnVector构造函数中的reserveInternal方法中赋值,这其中涉及到unsafe.allocateMemory方法会返回分配的内存地址

    • 具体迭代获取InternalRow
      这里的迭代获取主要是通过 vectorizedReader.getCurrentValue方法实现的,也就是会返回columnarBatch,但是这里的columnarBatch赋值是通过
      vectorizedReader.nextKeyValue方法实现的,该方法会被RecordReaderIterator.hasNext调用,vectorizedReader.nextKeyValue的数据流如下:

       VectorizedParquetRecordReader.nextBatch
           ||
           \/
          checkEndOfRowGroup    =>                               初始化  PageReadStore pages = reader.readNextRowGroup(); 
           ||                                                                         ||
           \/                                                                         \/
          columnReader.readBatch(num, leafCv.getValueVector()   initColumnReader(pages, cv); // columnVectors 设置ParquetColumnVector 里面包括了rowgroup里的所有page
           ||
           \/
          readPage
           ||
           \/
          pageReader.readPage()
           ||
           \/
          decompressor.decompress //  之类会进行解压
      
           
      

      decompressor.decompressdecompressorChunk.readAllPagesdescriptor.metadata.getCodec()传进来的,也就是从元数据里面读取的
      具体的向量化的读取,细节比较多,包括批量读取definition levelsrepetition levels等,这些读者自行分析

      注意:为什么 FileSourceScanExec中inputRDDs返回的类型是RDD[InternalRow] ,而vectorizedReader.getCurrentValue返回的类型是ColumnarBatch 也能运行,那是因为 我们在运行的时候,会有ColumnarToRow,他最终调用的是FileSourceScanExec.doExecuteColumnar,如下图:
      在这里插入图片描述

jvm会对Iterator[InternalRow]进行类型擦除,也就是说所有Iterator[InternalRow]在编译的时候会编译成Iterator[Object],会在运行时获取真正的类型

`FileScanRDD` 中的`compute方法` 最后有个
```
    iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.

```

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/429677.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据删除

目录 数据删除 删除员工编号为 7369 的员工信息 删除若干个数据 删除公司中工资最高的员工 Oracle从入门到总裁:​​​​​​https://blog.csdn.net/weixin_67859959/article/details/135209645 数据删除 删除数据就是指删除不再需要的数据 delete from 表名称 [where 删…

Java架构之路-架构应全面了解的技术栈和工作域

有时候我在想这么简单简单的东西&#xff0c;怎么那么难以贯通。比如作为一个架构师可能涉及的不单单是技术架构&#xff0c;还包含了项目管理&#xff0c;一套完整的技术架构也就那么几个技术栈&#xff0c;只要花点心思&#xff0c;不断的往里面憨实&#xff0c;总会学的会&a…

huggingface学习|controlnet实战:云服务器使用StableDiffusionControlNetPipeline生成图像

ControlNet核心基础知识 文章目录 一、环境配置和安装需要使用的库二、准备数据及相关模型三、参照样例编写代码&#xff08;一&#xff09;导入相关库&#xff08;二&#xff09;准备数据&#xff08;以知名画作《戴珍珠耳环的少女》为例&#xff09;&#xff08;三&#xff0…

C++惯用法之RAII思想: 资源管理

C编程技巧专栏&#xff1a;http://t.csdnimg.cn/eolY7 目录 1.概述 2.RAII的应用 2.1.智能指针 2.2.文件句柄管理 2.3.互斥锁 3.注意事项 3.1.禁止复制 3.2.对底层资源使用引用计数法 3.3.复制底部资源(深拷贝)或者转移资源管理权(移动语义) 4.RAII的优势和挑战 5.总…

制造业数字化赋能:1核心2关键3层面4方向

随着科技的飞速发展&#xff0c;制造业正站在数字化转型的风口浪尖。数字化转型不仅关乎企业效率与利润&#xff0c;更决定了制造业在全球竞争中的地位。那么&#xff0c;在这场波澜壮阔的数字化浪潮中&#xff0c;制造业如何抓住机遇&#xff0c;乘风破浪&#xff1f;本文将从…

Maven终端命令生成Spring-boot项目并输出“helloworld“

1. 生成项目 mvn archetype:generate填写groupId和artifactId&#xff0c;其余默认即可 2. 修改pom.xml文件 将如下内容放入pom.xml文件内 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artif…

计网面试题整理上

1. 计算机网络的各层协议及作用&#xff1f; 计算机网络体系可以大致分为一下三种&#xff0c;OSI七层模型、TCP/IP四层模型和五层模型。 OSI七层模型&#xff1a;大而全&#xff0c;但是比较复杂、而且是先有了理论模型&#xff0c;没有实际应用。TCP/IP四层模型&#xff1a…

Git入门学习笔记

Git 是一个非常强大的分布式版本控制工具&#xff01; 在下载好Git之后&#xff0c;鼠标右击就可以显示 Git Bash 和 Git GUI&#xff0c;Git Bash 就像是在电脑上安装了一个小型的 Linux 系统&#xff01; 1. 打开 Git Bash 2. 设置用户信息&#xff08;这是非常重要的&…

使用GRU进行天气变化的时间序列预测

本文基于最适合入门的100个深度学习项目的学习记录&#xff0c;同时在Google clolab上面是实现&#xff0c;文末有资源连接 天气变化的时间序列的难点 天气变化的时间序列预测涉及到了一系列复杂的挑战&#xff0c;主要是因为天气系统的高度动态性和非线性特征。以下是几个主…

(十五)【Jmeter】取样器(Sampler)之HTTP请求

简述 操作路径如下: HTTP请求 (HTTP Sampler): 作用:模拟发送HTTP请求并获取响应。配置:设置URL、请求方法、请求参数等参数。使用场景:测试Web应用程序的HTTP接口性能。优点:支持多种HTTP方法和请求参数,适用于大多数Web应用程序测试。缺点:功能较为基础,对于复杂…

突发,Anthropic推出突破性Claude 3系列模型,性能超越GPT-4

&#x1f989; AI新闻 &#x1f680; 突发&#xff0c;Anthropic推出突破性Claude 3系列模型 摘要&#xff1a;人工智能创业公司Anthropic宣布推出其Claude 3系列大型语言模型&#xff0c;该系列包括Claude 3 Haiku、Claude 3 Sonnet和Claude 3 Opus三个子模型&#xff0c;旨…

第18章 Java I/O系统

第18章 Java I/O系统 18.1 File类 File类不仅仅指文件&#xff0c;还能代表一个目录下的一组文件。 18.1.1 目录列表器 public class Test {public static void main(String[] args) {File file new File("E:\\test");String[] strings file.list(new DirFilte…

安装Proxmox VE虚拟机平台

PVE是专业的虚拟机平台&#xff0c;可以利用它安装操作系统&#xff0c;如&#xff1a;Win、Linux、Mac、群晖等。 1. 下载镜像 访问PVE官网&#xff0c;下载最新的PVE镜像。 https://www.proxmox.com/en/downloads 2. 下载balenaEtcher balenaEtcher用于将镜像文件&#…

「滚雪球学Java」:多线程(章节汇总)

咦咦咦&#xff0c;各位小可爱&#xff0c;我是你们的好伙伴——bug菌&#xff0c;今天又来给大家普及Java SE相关知识点了&#xff0c;别躲起来啊&#xff0c;听我讲干货还不快点赞&#xff0c;赞多了我就有动力讲得更嗨啦&#xff01;所以呀&#xff0c;养成先点赞后阅读的好…

数据可视化原理-腾讯-3D网格热力图

在做数据分析类的产品功能设计时&#xff0c;经常用到可视化方式&#xff0c;挖掘数据价值&#xff0c;表达数据的内在规律与特征展示给客户。 可是作为一个产品经理&#xff0c;&#xff08;1&#xff09;如果不能够掌握各类可视化图形的含义&#xff0c;就不知道哪类数据该用…

带你全方位体验 Amazon Connect

1.前言 授权说明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在亚马逊云科技开发者社区、 知乎、自媒体平台、第三方开发者媒体等亚马逊云科技官方渠道。 近日亚马逊云科技在拉斯维加斯拉开了年度客户大会-亚马逊云科技 re:Invent 的序…

python网络爬虫教程笔记(1)

系列文章目录 文章目录 系列文章目录前言一、爬虫入门1.爬虫是什么&#xff1f;2.爬虫工作原理3.爬虫基本原理4.工作流程5.HTTP请求6.HTTP响应7.HTTP原理&#xff1a;证书传递、验证和数据加密、解密过程解析8.Urllib.request库的使用9.TCP3次握手&#xff0c;4次挥手过程 总结…

PHP+MySQL实现后台管理系统增删改查之够用就好

说明 最近要给博客弄个后台&#xff0c;不想搞得很复杂&#xff0c;有基本的增删改查就够了&#xff0c;到网上找了一圈发现这个不错&#xff0c;很实用&#xff0c;希望可以帮到大家&#xff0c;需要的朋友评论区留下邮箱&#xff0c;我安排发送。 演示效果 项目介绍 本项目…

【RISC-V 指令集】RISC-V 向量V扩展指令集介绍(一)-向量扩展编程模型

1. 引言 以下是《riscv-v-spec-1.0.pdf》文档的关键内容&#xff1a; 这是一份关于向量扩展的详细技术文档&#xff0c;内容覆盖了向量指令集的多个关键方面&#xff0c;如向量寄存器状态映射、向量指令格式、向量加载和存储操作、向量内存对齐约束、向量内存一致性模型、向量…

【2024.03.05】定时执行专家 V7.1 发布 - TimingExecutor V7.1 Release

目录 ▉ 软件介绍 ▉ 新版本 V7.1 下载地址 ▉ V7.1 新功能 ▼2024-03-03 V7.1 - 更新日志 ▉ V7.0 新UI设计 ▉ 软件介绍 《定时执行专家》是一款制作精良、功能强大、毫秒精度、专业级的定时任务执行软件。软件具有 25 种【任务类型】、12 种【触发器】触发方式&#x…