feat: refactor HoodieBaseRelation and its subclasses to introduce multiple file slices per task

1. Refactor HoodieBaseRelation and its subclasses so that a single Spark task can read multiple file slices.

Signed-off-by: TheR1sing3un <[email protected]>
TheR1sing3un committed Oct 21, 2024
1 parent 506f106 commit 89fa84b
Showing 9 changed files with 114 additions and 68 deletions.
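The diffs below are organized around two small types added to HoodieBaseRelation.scala: a HoodieFileSplit marker trait and a HoodieFilePartition that owns a sequence of splits, so that one Spark task can read several file slices. A minimal restatement of that model (my sketch, not an excerpt from the commit):

import org.apache.spark.Partition

// A marker for a unit of read work, e.g. one slice of a base file, or a base file plus its log files.
trait HoodieFileSplit

// A Spark partition now carries a sequence of splits instead of exactly one.
case class HoodieFilePartition(index: Int, fileSplits: Seq[HoodieFileSplit]) extends Partition

Downstream, composeRDD receives these partitions rather than raw splits, and each RDD's compute flattens partition.fileSplits into a single row iterator.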
@@ -56,7 +56,9 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema)
with SparkAdapterSupport {

case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit
case class HoodieBaseFileSplit(fileSplit: PartitionedFile) extends HoodieFileSplit
case class HoodieBaseFilePartition(override val index: Int, filePartition: FilePartition, override val fileSplits: Seq[HoodieFileSplit] = Seq.empty)
extends HoodieFilePartition(index, fileSplits)

override type FileSplit = HoodieBaseFileSplit
override type Relation = BaseFileOnlyRelation
@@ -78,7 +80,7 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
this.copy(prunedDataSchema = Some(prunedSchema))

protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
protected override def composeRDD(partitions: Seq[HoodieFilePartition],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
@@ -107,13 +109,14 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
val projectedReader = projectReader(baseFileReader, requiredSchema.structTypeSchema)

// SPARK-37273 FileScanRDD constructor changed in SPARK 3.3
sparkAdapter.createHoodieFileScanRDD(sparkSession, projectedReader.apply, fileSplits.map(_.filePartition), requiredSchema.structTypeSchema)
.asInstanceOf[HoodieUnsafeRDD]
sparkAdapter.createHoodieFileScanRDD(sparkSession, projectedReader.apply,
partitions.map(partition => partition.asInstanceOf[HoodieBaseFilePartition].filePartition),
requiredSchema.structTypeSchema).asInstanceOf[HoodieUnsafeRDD]
}

protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
val fileSplits = fileSlices.flatMap { fileSlice =>
fileSlices.flatMap { fileSlice =>
// TODO fix, currently assuming parquet as underlying format
val pathInfo: StoragePathInfo = fileSlice.getBaseFile.get.getPathInfo
HoodieDataSourceHelper.splitFiles(
@@ -124,12 +127,16 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
}
// NOTE: It's important to order the splits in the reverse order of their
// size so that we can subsequently bucket them in an efficient manner
.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
.sortBy(_.length)(implicitly[Ordering[Long]].reverse).map(HoodieBaseFileSplit)
}

override def mergeSplitsToPartitions(splits: Seq[HoodieBaseFileSplit]): Seq[HoodieFilePartition] = {
val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes

sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes)
.map(HoodieBaseFileSplit.apply)
sparkAdapter.getFilePartitions(sparkSession, splits.map(_.fileSplit), maxSplitBytes).zipWithIndex.map {
case (filePartition, index) =>
HoodieBaseFilePartition(index, filePartition)
}
}

/**
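BaseFileOnlyRelation delegates the actual grouping to sparkAdapter.getFilePartitions, which wraps Spark's FilePartition packing. As a rough approximation of that behavior (my sketch; Spark's real logic also accounts for spark.sql.files.openCostInBytes and expects pre-sorted input), the idea is to take splits largest-first and greedily fill a partition until adding the next split would exceed spark.sql.files.maxPartitionBytes:

object SplitPackingSketch {
  final case class SizedSplit(path: String, length: Long)

  // Greedy size-based packing: close the current partition once the next split would
  // push it past maxSplitBytes, mirroring the largest-first sort used above.
  def packBySize(splits: Seq[SizedSplit], maxSplitBytes: Long): Seq[Seq[SizedSplit]] = {
    val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[SizedSplit]]
    var current = Vector.empty[SizedSplit]
    var currentBytes = 0L
    splits.sortBy(-_.length).foreach { split =>
      if (current.nonEmpty && currentBytes + split.length > maxSplitBytes) {
        partitions += current
        current = Vector.empty
        currentBytes = 0L
      }
      current :+= split
      currentBytes += split.length
    }
    if (current.nonEmpty) partitions += current
    partitions.toSeq
  }

  def main(args: Array[String]): Unit = {
    val splits = Seq(SizedSplit("a", 90L), SizedSplit("b", 60L), SizedSplit("c", 40L), SizedSplit("d", 10L))
    packBySize(splits, maxSplitBytes = 100L).zipWithIndex.foreach { case (p, i) =>
      println(s"partition $i -> ${p.map(_.path).mkString(", ")}")
    }
  }
}

With the sample inputs above, this sketch produces three partitions: (a), (b, c), and (d).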
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.{Partition, SparkContext}

abstract class HoodieBaseRDD(@transient sc: SparkContext,
@transient partitions: Seq[HoodieFilePartition])
extends RDD[InternalRow](sc, Nil) {
override protected def getPartitions: Array[Partition] = partitions.toArray
}
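HoodieBaseRDD only centralizes the partition bookkeeping; each concrete RDD keeps its own compute. A hypothetical subclass (illustration only, not part of this commit) would look roughly like this, with one task chaining the row iterators of every split assigned to it:

package org.apache.hudi

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical example subclass: getPartitions comes from HoodieBaseRDD, and compute
// turns each HoodieFileSplit of the partition into rows via a caller-supplied reader.
class ExampleSplitRDD(@transient sc: SparkContext,
                      @transient parts: Seq[HoodieFilePartition],
                      readSplit: HoodieFileSplit => Iterator[InternalRow])
  extends HoodieBaseRDD(sc, parts) {

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] =
    split.asInstanceOf[HoodieFilePartition].fileSplits.iterator.flatMap(readSplit)
}

The readSplit function stands in for whatever file reader a concrete relation wires up; in a real job it would have to be serializable.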
@@ -48,12 +48,12 @@ import org.apache.hudi.io.storage.HoodieSparkIOFactory
import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
import org.apache.hudi.storage.{StoragePath, StoragePathInfo}

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.Partition
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -75,6 +75,8 @@ import scala.util.{Failure, Success, Try}

trait HoodieFileSplit {}

case class HoodieFilePartition(index: Int, fileSplits: Seq[HoodieFileSplit]) extends Partition

case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None)

case class HoodieTableState(tablePath: String,
@@ -371,6 +373,9 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

val fileSplits = collectFileSplits(partitionFilters, dataFilters)

// use different strategy to combine splits to partitions
val partitions = mergeSplitsToPartitions(fileSplits)

val tableAvroSchemaStr = tableAvroSchema.toString

val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)
@@ -379,7 +384,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
if (fileSplits.isEmpty) {
sparkSession.sparkContext.emptyRDD
} else {
val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, targetColumns, filters)
val rdd = composeRDD(partitions, tableSchema, requiredSchema, targetColumns, filters)

// Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
// Please check [[needConversion]] scala-doc for more details
@@ -397,7 +402,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
* @param filters data filters to be applied
* @return instance of RDD (holding [[InternalRow]]s)
*/
protected def composeRDD(fileSplits: Seq[FileSplit],
protected def composeRDD(partitions: Seq[HoodieFilePartition],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
@@ -413,6 +418,13 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*/
protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]

protected def mergeSplitsToPartitions(splits: Seq[FileSplit]): Seq[HoodieFilePartition] = {
// default strategy is to map each split to a partition
splits.zipWithIndex.map { case (split, index) =>
HoodieFilePartition(index, Seq(split))
}
}

protected def listLatestFileSlices(globPaths: Seq[StoragePath], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
queryTimestamp match {
case Some(ts) =>
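Putting the HoodieBaseRelation changes together, the read path now lists splits, groups them through the overridable mergeSplitsToPartitions hook, and only then calls composeRDD. A self-contained mock of that wiring (simplified, assumed names; not Hudi code):

object ReadPathSketch {
  final case class Split(path: String, length: Long)
  final case class FilePartition(index: Int, splits: Seq[Split])

  // Stand-in for collectFileSplits: each relation decides how to list its splits.
  def collectFileSplits(): Seq[Split] =
    Seq(Split("a.parquet", 10L), Split("b.parquet", 20L), Split("c.parquet", 5L))

  // Default hook mirrored from HoodieBaseRelation: one split per partition, which keeps
  // the pre-refactor behavior unless a subclass overrides it.
  def mergeSplitsToPartitions(splits: Seq[Split]): Seq[FilePartition] =
    splits.zipWithIndex.map { case (split, index) => FilePartition(index, Seq(split)) }

  // Stand-in for composeRDD: the relation now hands partitions, not raw splits, to the RDD.
  def composeRDD(partitions: Seq[FilePartition]): Unit =
    partitions.foreach(p => println(s"task ${p.index} reads ${p.splits.map(_.path).mkString(", ")}"))

  def main(args: Array[String]): Unit =
    composeRDD(mergeSplitsToPartitions(collectFileSplits()))
}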
@@ -25,7 +25,7 @@ import org.apache.hudi.storage.StoragePath

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{Partition, SerializableWritable, TaskContext}
import org.apache.spark.SerializableWritable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -38,22 +38,19 @@ class HoodieBootstrapMORRDD(@transient spark: SparkSession,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
tableState: HoodieTableState,
@transient splits: Seq[BaseHoodieBootstrapSplit])
@transient partitions: Seq[HoodieFilePartition])
extends HoodieBootstrapRDD(spark, bootstrapDataFileReader, bootstrapSkeletonFileReader,
regularFileReader, requiredSchema, splits) {
regularFileReader, requiredSchema, partitions) {

protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))

private val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableWritable(config))

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
maybeLog(bootstrapPartition)
val bootstrapMORSplit = bootstrapPartition.split.asInstanceOf[HoodieBootstrapMORSplit]

override protected def splitToIter(fileSplit: HoodieFileSplit): Iterator[InternalRow] = {
val bootstrapMORSplit = fileSplit.asInstanceOf[HoodieBootstrapMORSplit]
if (bootstrapMORSplit.logFiles.isEmpty) {
//no log files, treat like regular bootstrap
getIterator(bootstrapPartition)
super.splitToIter(fileSplit)
} else {
bootstrapMORSplit.skeletonFile match {
case Some(skeletonFile) =>
@@ -81,7 +81,7 @@ case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,
HoodieBootstrapMORSplit(dataFile, skeletonFile, fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList)
}

protected override def composeRDD(fileSplits: Seq[FileSplit],
protected override def composeRDD(partitions: Seq[HoodieFilePartition],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
@@ -97,7 +97,7 @@ case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,
regularFileReader = regularFileReader,
tableSchema = tableSchema,
requiredSchema = requiredSchema,
tableState = tableState, fileSplits)
tableState = tableState, partitions)
}


@@ -20,7 +20,6 @@ package org.apache.hudi

import org.apache.hudi.HoodieBaseRelation.BaseFileReader
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -34,8 +33,8 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
bootstrapSkeletonFileReader: BaseFileReader,
regularFileReader: BaseFileReader,
requiredSchema: HoodieTableSchema,
@transient splits: Seq[BaseHoodieBootstrapSplit])
extends RDD[InternalRow](spark.sparkContext, Nil) {
@transient partitions: Seq[HoodieFilePartition])
extends HoodieBaseRDD(spark.sparkContext, partitions) {


protected def getSkeletonIteratorSchema(dataFile: PartitionedFile, skeletonFile: PartitionedFile): (Iterator[InternalRow], StructType) = {
@@ -67,31 +66,44 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
iterator.map(unsafeProjection)
}

protected def maybeLog(bootstrapPartition: HoodieBootstrapPartition): Unit = {
protected def maybeLog(bootstrapPartition: HoodieFilePartition): Unit = {
if (log.isDebugEnabled) {
var msg = "Got Split => Index: " + bootstrapPartition.index + ", Data File: " +
bootstrapPartition.split.dataFile.filePath
if (bootstrapPartition.split.skeletonFile.isDefined) {
msg += ", Skeleton File: " + bootstrapPartition.split.skeletonFile.get.filePath
}
var msg = "Got Partition => Index: " + bootstrapPartition.index + ", Splits: "
bootstrapPartition.fileSplits.foreach(split => msg += logStrForSplit(split) + ", ")
logDebug(msg)
}
}

protected def getIterator(bootstrapPartition: HoodieBootstrapPartition): Iterator[InternalRow] = {
bootstrapPartition.split.skeletonFile match {
private def logStrForSplit(split: HoodieFileSplit): String = {
val bootstrapSplit = split.asInstanceOf[BaseHoodieBootstrapSplit]
var msg = "Data File: " + bootstrapSplit.dataFile.filePath
if (bootstrapSplit.skeletonFile.isDefined) {
msg += ", Skeleton File: " + bootstrapSplit.skeletonFile.get.filePath
} else {
msg += ", No Skeleton File"
}
msg
}

private def getIterator(bootstrapPartition: HoodieFilePartition): Iterator[InternalRow] = {
bootstrapPartition.fileSplits.iterator.flatMap((split) => splitToIter(split.asInstanceOf[HoodieBootstrapSplit]))
}

protected def splitToIter(split: HoodieFileSplit): Iterator[InternalRow] = {
val bootstrapSplit = split.asInstanceOf[BaseHoodieBootstrapSplit]
bootstrapSplit.skeletonFile match {
case Some(skeletonFile) =>
// It is a bootstrap split. Check both skeleton and data files.
val (iterator, schema) = getSkeletonIteratorSchema(bootstrapPartition.split.dataFile, skeletonFile)
val (iterator, schema) = getSkeletonIteratorSchema(bootstrapSplit.dataFile, skeletonFile)
unsafeProjectIterator(iterator, schema)
case _ =>
// NOTE: Regular file-reader is already projected into the required schema
regularFileReader.read(bootstrapPartition.split.dataFile)
regularFileReader.read(bootstrapSplit.dataFile)
}
}

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
override def compute(partition: Partition, context: TaskContext): Iterator[InternalRow] = {
val bootstrapPartition = partition.asInstanceOf[HoodieFilePartition]
maybeLog(bootstrapPartition)
getIterator(bootstrapPartition)
}
@@ -112,18 +124,4 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
}
}

override protected def getPartitions: Array[Partition] = {
splits.zipWithIndex.map(file => {
if (file._1.skeletonFile.isDefined) {
logDebug("Forming partition with => Index: " + file._2 + ", Files: " + file._1.dataFile.filePath
+ "," + file._1.skeletonFile.get.filePath)
HoodieBootstrapPartition(file._2, file._1)
} else {
logDebug("Forming partition with => Index: " + file._2 + ", File: " + file._1.dataFile.filePath)
HoodieBootstrapPartition(file._2, file._1)
}
}).toArray
}
}

case class HoodieBootstrapPartition(index: Int, split: BaseHoodieBootstrapSplit) extends Partition
}
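Taken together, the bootstrap RDD changes form a small template-method split: HoodieBootstrapRDD.compute walks the splits of a HoodieFilePartition and defers per-split reading to splitToIter, while HoodieBootstrapMORRDD overrides only the per-split step for splits that carry log files. A schematic of that shape (simplified, assumed types; not the actual Hudi classes):

object BootstrapShapeSketch {
  final case class BootstrapSplit(dataFile: String, skeletonFile: Option[String], logFiles: Seq[String])

  class BaseReader {
    // Base per-split behavior: merge skeleton and data file when a skeleton exists,
    // otherwise read the data file directly.
    def splitToIter(split: BootstrapSplit): Iterator[String] =
      split.skeletonFile match {
        case Some(skeleton) => Iterator(s"merge($skeleton, ${split.dataFile})")
        case None           => Iterator(s"read(${split.dataFile})")
      }

    // compute-equivalent: flatten every split of the partition into one iterator.
    final def partitionIter(splits: Seq[BootstrapSplit]): Iterator[String] =
      splits.iterator.flatMap(splitToIter)
  }

  class MorReader extends BaseReader {
    // MOR variant: only the per-split step changes; splits without log files fall back
    // to the base behavior, as HoodieBootstrapMORRDD does above.
    override def splitToIter(split: BootstrapSplit): Iterator[String] =
      if (split.logFiles.isEmpty) super.splitToIter(split)
      else Iterator(s"mergeWithLogs(${split.dataFile}, ${split.logFiles.mkString("+")})")
  }

  def main(args: Array[String]): Unit = {
    val splits = Seq(
      BootstrapSplit("f1.parquet", Some("s1.parquet"), Nil),
      BootstrapSplit("f2.parquet", None, Seq(".f2.log.1")))
    new MorReader().partitionIter(splits).foreach(println)
  }
}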
@@ -147,7 +147,7 @@ abstract class BaseHoodieBootstrapRelation(override val sqlContext: SQLContext,
(bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader)
}

protected override def composeRDD(fileSplits: Seq[FileSplit],
protected override def composeRDD(partitions: Seq[HoodieFilePartition],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
@@ -156,7 +156,7 @@ abstract class BaseHoodieBootstrapRelation(override val sqlContext: SQLContext,
val (bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader) = getFileReaders(tableSchema,
requiredSchema, requestedColumns, filters)
new HoodieBootstrapRDD(sqlContext.sparkSession, bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader,
requiredSchema, fileSplits)
requiredSchema, partitions)
}

/**
@@ -27,14 +27,13 @@ import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}

import java.io.Closeable
import java.util.function.Predicate

case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition
case class HoodieMergeOnReadPartition(index: Int, splits: Seq[HoodieMergeOnReadFileSplit]) extends Partition

/**
* Class holding base-file readers for 3 different use-cases:
@@ -78,19 +77,25 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
requiredSchema: HoodieTableSchema,
tableState: HoodieTableState,
mergeType: String,
@transient fileSplits: Seq[HoodieMergeOnReadFileSplit],
@transient partitions: Seq[HoodieFilePartition],
includeStartTime: Boolean = false,
startTimestamp: String = null,
endTimestamp: String = null)
extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD {
extends HoodieBaseRDD(sc, partitions) with HoodieUnsafeRDD {

protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))

private val hadoopConfBroadcast = sc.broadcast(new SerializableWritable(config))

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = partition.split match {
partition.splits.zipWithIndex.flatMap { case (split, splitIndex) =>
splitToIter(partition.index, splitIndex, split)
}.toIterator
}

private def splitToIter(partitionIndex: Int, splitIndex: Int, split: HoodieMergeOnReadFileSplit): Iterator[InternalRow] = {
val iter = split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
projectedReader(dataFileOnlySplit.dataFile.get)
@@ -112,10 +117,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}

case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
s"file path: ${partition.split.dataFile.get.filePath}" +
s"log paths: ${partition.split.logFiles.toString}" +
s"file path: ${split.dataFile.get.filePath}" +
s"log paths: ${split.logFiles.toString}" +
s"hoodie table path: ${tableState.tablePath}" +
s"spark partition Index: ${partition.index}" +
s"spark partition Index: ${partitionIndex}" +
s"split index in partition: ${splitIndex}" +
s"merge type: ${mergeType}")
}

@@ -169,9 +175,6 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
}

override protected def getPartitions: Array[Partition] =
fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray

private def getHadoopConf: Configuration = {
val conf = hadoopConfBroadcast.value.value
// TODO clean up, this lock is unnecessary