feat: regard mor snapshot query with all base-file-only table as mor read-optimized query

1. Regard a MOR snapshot query over a table whose file slices contain only base files as a MOR read-optimized query.

Signed-off-by: TheR1sing3un <[email protected]>
TheR1sing3un committed Oct 16, 2024
1 parent 506f106 commit be6673e
Showing 6 changed files with 73 additions and 4 deletions.
@@ -229,6 +229,16 @@ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
return cachedAllInputFileSlices;
}

/**
* Returns true if all file slices contain only a base file (no log files).
*/
protected boolean isAllInputFileSlicesBaseFileOnly() {
Map<PartitionPath, List<FileSlice>> allInputFileSlices = getAllInputFileSlices();
return !allInputFileSlices.isEmpty() && allInputFileSlices.values().stream()
.flatMap(Collection::stream)
.allMatch(FileSlice::isBaseFileOnly);
}

/**
* Get input file slice for the given partition. Will use cache directly if it is computed before.
*/
@@ -131,6 +131,13 @@ public boolean isEmpty() {
return (baseFile == null) && (logFiles.isEmpty());
}

/**
* Returns true if there is a base file and no log files.
*/
public boolean isBaseFileOnly() {
return (baseFile != null) && (logFiles.isEmpty());
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("FileSlice {");
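For illustration only, here is a minimal Scala sketch of how the two predicates above interact, using plain Option/Seq stand-ins instead of the real FileSlice and file-system view (SliceState, allSlicesBaseFileOnly, and the sample values are hypothetical): a slice is base-file-only when it has a base file and no log files, and the table-level check (isAllInputFileSlicesBaseFileOnly) requires a non-empty input where every slice satisfies that.

// Hypothetical stand-in mirroring FileSlice.isEmpty / FileSlice.isBaseFileOnly from the diff.
case class SliceState(baseFile: Option[String], logFiles: Seq[String]) {
  def isEmpty: Boolean = baseFile.isEmpty && logFiles.isEmpty
  def isBaseFileOnly: Boolean = baseFile.nonEmpty && logFiles.isEmpty
}

// Table-level check, analogous to isAllInputFileSlicesBaseFileOnly: non-empty input,
// and every input slice carries only a base file.
def allSlicesBaseFileOnly(slices: Seq[SliceState]): Boolean =
  slices.nonEmpty && slices.forall(_.isBaseFileOnly)

// A fully compacted layout (no log files) passes; a slice with pending log files does not.
val compacted = Seq(SliceState(Some("f1.parquet"), Nil), SliceState(Some("f2.parquet"), Nil))
val withLogs  = Seq(SliceState(Some("f1.parquet"), Seq("f1.log.1")))
assert(allSlicesBaseFileOnly(compacted))
assert(!allSlicesBaseFileOnly(withLogs))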
@@ -220,6 +220,14 @@ object DataSourceReadOptions {

val INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT: ConfigProperty[String] = HoodieCommonConfig.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT

val ENABLE_OPTIMIZED_READ_FOR_MOR_WITH_ALL_BASE_FILE_ONLY_SLICE: ConfigProperty[Boolean] = ConfigProperty
.key("hoodie.datasource.read.optimized.mor.with.all.base.file.only.slice.enable")
.defaultValue(true)
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("Enables optimized read for MOR table whose file-slices only contains base-file. When set to true, " +
"it will regard query as read_optimized query for table which match above conditions.")

/** @deprecated Use {@link QUERY_TYPE} and its methods instead */
@Deprecated
val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
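A usage sketch only, assuming a running SparkSession named spark, Hudi on the classpath, and a hypothetical MOR table location basePath: the new key defaults to true, so it is shown here set to false to retain the previous snapshot-read behavior.

import org.apache.hudi.DataSourceReadOptions

val basePath = "/tmp/hudi/mor_table"  // hypothetical table location
val df = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  // Disable the optimization introduced in this commit (default is true).
  .option("hoodie.datasource.read.optimized.mor.with.all.base.file.only.slice.enable", "false")
  .load(basePath)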
@@ -250,7 +250,12 @@ object DefaultSource {
val isCdcQuery = queryType == QUERY_TYPE_INCREMENTAL_OPT_VAL &&
parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL)

log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
val isReadOptimizedForMor = parameters.get(ENABLE_OPTIMIZED_READ_FOR_MOR_WITH_ALL_BASE_FILE_ONLY_SLICE.key()) match {
case Some(value) => value.toBoolean
case None => ENABLE_OPTIMIZED_READ_FOR_MOR_WITH_ALL_BASE_FILE_ONLY_SLICE.defaultValue()
}

log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType, isCdcQuery is: $isCdcQuery, isReadOptimizedForMor is: $isReadOptimizedForMor")

// NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain
// Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. In that
@@ -288,7 +293,11 @@

case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
if (newHudiFileFormatUtils.isEmpty) {
new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
if (isReadOptimizedForMor) {
resolveRelationForMorSnapshotQuery(sqlContext, globPaths, userSchema, metaClient, parameters)
} else {
new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
}
} else {
newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap = false)
}
@@ -354,6 +363,29 @@
}
}

private def resolveRelationForMorSnapshotQuery(sqlContext: SQLContext,
globPaths: Seq[StoragePath],
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient,
optParams: Map[String, String]): BaseRelation = {
val snapshotRelation = new MergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, globPaths, userSchema)
// Check whether all file slices contain only base files
val relation = snapshotRelation.considerConvertToBaseFileOnlyRelation

// NOTE: We fall back to [[HadoopFsRelation]] in all cases except those requiring the use of
// [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/
// vanilla Spark, since some of the Spark optimizations are predicated on the use of [[HadoopFsRelation]].
//
// You can check out HUDI-3896 for more details
val result = if (relation.isInstanceOf[BaseFileOnlyRelation] && !relation.asInstanceOf[BaseFileOnlyRelation].hasSchemaOnRead) {
relation.asInstanceOf[BaseFileOnlyRelation].toHadoopFsRelation
} else {
relation
}
log.info("Resolved MergeOnReadSnapshotRelation for MOR snapshot query, relation is: " + result)
result
}

private def resolveSchema(metaClient: HoodieTableMetaClient,
parameters: Map[String, String],
schema: Option[StructType]): StructType = {
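Not part of the commit, just a compact restatement of the branching above: when the flag is enabled and the file index reports that every input file slice is base-file-only, the MOR snapshot query is served through BaseFileOnlyRelation (typically converted to HadoopFsRelation); otherwise it stays on MergeOnReadSnapshotRelation.

// Illustrative decision helper (hypothetical, not in the commit): which relation a MOR
// snapshot query resolves to after this change.
def morSnapshotRelationKind(flagEnabled: Boolean, allSlicesBaseFileOnly: Boolean): String =
  if (flagEnabled && allSlicesBaseFileOnly) "BaseFileOnlyRelation (read-optimized path)"
  else "MergeOnReadSnapshotRelation (merge-on-read path)"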
@@ -148,6 +148,9 @@ case class HoodieFileIndex(spark: SparkSession,
}).toSeq
}


def isBaseFileOnly: Boolean = !includeLogFiles || isAllInputFileSlicesBaseFileOnly

/**
* Invoked by Spark to fetch list of latest base files per partition.
*
@@ -25,14 +25,14 @@ import org.apache.hudi.MergeOnReadSnapshotRelation.{createPartitionedFile, isPro
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.exception.HoodieNotSupportedException
import org.apache.hudi.storage.StoragePath

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters._
@@ -100,6 +100,15 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
DataSourceReadOptions.REALTIME_MERGE.defaultValue)

def considerConvertToBaseFileOnlyRelation: BaseRelation = {
if (fileIndex.isBaseFileOnly) {
// TODO: move FileIndex's reference to BaseFileOnlyRelation to avoid FileIndex re-initialization
BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)
} else {
this
}
}

/**
* Determines whether relation's schema could be pruned by Spark's Optimizer
*/
