背景
最近在整理了一下 spark对Parquet的写文件的过程,也是为了更好的理解和调优Spark相关的任务,
因为对于Spark来说,任何一个事情都不是独立的存在的,比如说parquet文件的rowgroup设置的大小对读写的影响,以及parquet写之前排序对读parquet的影响,以及向量化读取等等
本文基于Spark 3.5
分析
我们以FileSourceScanExec的doExecute方法
为切口进行分析:
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
if (needsUnsafeRowConversion) {
inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
val toUnsafe = UnsafeProjection.create(schema)
toUnsafe.initialize(index)
iter.map { row =>
numOutputRows += 1
toUnsafe(row)
}
}
} else {
inputRDD.mapPartitionsInternal { iter =>
iter.map { row =>
numOutputRows += 1
row
}
}
}
}
这里的needsUnsafeRowConversion
判断如果是ParquetSource
,且配置了spark.sql.parquet.enableVectorizedReader
为‘true’(默认就是true),则会进行unsafeRow的转换,当然这里的好处就是节约内存以及能够减少GC
对于inputRDD
来说,就是创建了读取parquet
的RDD:
具体的见:ParquetFileFormat.buildReaderWithPartitionValues
方法,涉及到的代码多,所以只解读关键的几个部分:
- fileRooter的读取
val fileFooter = if (enableVectorizedReader) { // When there are vectorized reads, we can avoid reading the footer twice by reading // all row groups in advance and filter row groups according to filters that require // push down (no need to read the footer metadata again). ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS) } else { ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS) }
这里enableVectorizedReader
如果是true的话, fileFooter 只会得到所属Task的FileMetaData信息,其中只包括了所属Task的需要读取的parquet RowGroups,具体的数据流如下:
ParquetFooterReader.readFooter
||
\/
readFooter
||
\/
fileReader.getFooter
||
\/
readFooter(file, options, f, converter)
||
\/
converter.readParquetMetadata
||
\/
filter.accept(new MetadataFilterVisitor)
filter.accept(new MetadataFilterVisitor
就会根据对应的filter
类型进行不同的操作:
FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo = filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
}
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
}
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
// We must generate the map *before* filtering because it modifies `fileMetadata`.
Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap);
}
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
// We must generate the map *before* filtering because it modifies `fileMetadata`.
Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap);
}
});
-
如果是 ParquetFooterReader.SKIP_ROW_GROUPS ,则是走的
SkipMetadataFilter
这条filter,则只会拿出rowgroup的信息和rowgrups的的行数 -
如果是 enableVectorizedReader,也就是会走
RangeMetadataFilter
这个Filter,则会调用filterFileMetaDataByMidpoint
,该方法会根据Task分配的数据是否覆盖了Rowgroups的中点来纳入到该task的读取的数据中来,具体的可以见:Spark-读取Parquet-为什么task数量会多于Row Group的数量 -
vectorizedReader的创建
vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter)) logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) if (returningBatch) { vectorizedReader.enableReturningBatches() } // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. iter.asInstanceOf[Iterator[InternalRow]]
-
vectorizedReader.initialize
重要的点是这个主要是涉及到parquet messageType到 ParquetColumn的转换
,主要是ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration)这个的配置
-
vectorizedReader.initBatch
这里面主要涉及到了根据memMode
为OFF_HEAP
还是ON_HEAP
模式来构造不同的ColumnVector
,其中
如果是ON_HEAP
,则会创建OnHeapColumnVector
,用jvm数据的形式存储
如果是OFF_HEAP
,则会创建OffHeapColumnVector
,这里涉及到的对象都是都是用unsafe api来操作,这里涉及到一个有意思的点:Platform.putByte(null, data + rowId, value); Platform.putInt(null, data + 4L * rowId, value)
也就是说 无论是put什么 里面的第一个参数是为
null
,这个其实在Unsafed方法 putInt(Object o, long offset, int x)
类中有提到Fetches a value from a given Java variable. More specifically, fetches a field or array element within the given object o at the given offset, or (if o is null) from the memory address whose numerical value is the given offset.
也就是说如果传入的第一个参数为
null
,则会以offset
作为地址,而在OffHeapColumnVector中对应的put当法中涉及到的offset
就是data
这个变量会在
OffHeapColumnVector构造函数中的reserveInternal方法中赋值
,这其中涉及到unsafe.allocateMemory
方法会返回分配的内存地址
-
具体迭代获取
InternalRow
这里的迭代获取主要是通过vectorizedReader.getCurrentValue
方法实现的,也就是会返回columnarBatch
,但是这里的columnarBatch赋值
是通过
vectorizedReader.nextKeyValue
方法实现的,该方法会被RecordReaderIterator.hasNext
调用,vectorizedReader.nextKeyValue
的数据流如下:VectorizedParquetRecordReader.nextBatch || \/ checkEndOfRowGroup => 初始化 PageReadStore pages = reader.readNextRowGroup(); || || \/ \/ columnReader.readBatch(num, leafCv.getValueVector() initColumnReader(pages, cv); // columnVectors 设置ParquetColumnVector 里面包括了rowgroup里的所有page || \/ readPage || \/ pageReader.readPage() || \/ decompressor.decompress // 之类会进行解压
decompressor.decompress
中decompressor
是Chunk.readAllPages
中descriptor.metadata.getCodec()
传进来的,也就是从元数据里面读取的
具体的向量化的读取,细节比较多,包括批量读取definition levels
和repetition levels
等,这些读者自行分析注意:为什么
FileSourceScanExec中inputRDDs
返回的类型是RDD[InternalRow]
,而vectorizedReader.getCurrentValue
返回的类型是ColumnarBatch
也能运行,那是因为 我们在运行的时候,会有ColumnarToRow
,他最终调用的是FileSourceScanExec.doExecuteColumnar
,如下图:
-
且jvm
会对Iterator[InternalRow]
进行类型擦除,也就是说所有Iterator[InternalRow]在编译的时候会编译成Iterator[Object],会在运行时获取真正的类型
`FileScanRDD` 中的`compute方法` 最后有个
```
iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
```