diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 636eb1faa408..55509f8d26d8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -229,6 +229,16 @@ protected Map<PartitionPath, List<FileSlice>> getAllInputFileSlices() {
     return cachedAllInputFileSlices;
   }
 
+  /**
+   * Returns true if every file slice contains only a base file (no log files).
+   */
+  protected boolean isAllInputFileSlicesBaseFileOnly() {
+    Map<PartitionPath, List<FileSlice>> allInputFileSlices = getAllInputFileSlices();
+    return !allInputFileSlices.isEmpty() && allInputFileSlices.values().stream()
+        .flatMap(Collection::stream)
+        .allMatch(FileSlice::isBaseFileOnly);
+  }
+
   /**
    * Get input file slice for the given partition. Will use cache directly if it is computed before.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
index 3f0fcf941567..1aefedc11a15 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
@@ -131,6 +131,13 @@ public boolean isEmpty() {
     return (baseFile == null) && (logFiles.isEmpty());
   }
 
+  /**
+   * Returns true if there is a base file and no log files.
+   */
+  public boolean isBaseFileOnly() {
+    return (baseFile != null) && (logFiles.isEmpty());
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("FileSlice {");
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 45134f91278f..6567d2faf879 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -220,6 +220,14 @@ object DataSourceReadOptions {
 
   val INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT: ConfigProperty[String] = HoodieCommonConfig.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT
 
+  val ENABLE_OPTIMIZED_READ_FOR_MOR_WITH_ALL_BASE_FILE_ONLY_SLICE: ConfigProperty[Boolean] = ConfigProperty
+    .key("hoodie.datasource.read.optimized.mor.with.all.base.file.only.slice.enable")
+    .defaultValue(true)
+    .markAdvanced()
+    .sinceVersion("0.14.0")
+    .withDocumentation("Enables optimized reads for MOR tables whose file slices all contain only base files. " +
+      "When set to true, a snapshot query on such a table is served as a read-optimized query.")
+
   /** @deprecated Use {@link QUERY_TYPE} and its methods instead */
   @Deprecated
   val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 3a942285f097..bb679df1a3a2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -250,7 +250,12 @@ object DefaultSource {
     val isCdcQuery = queryType == QUERY_TYPE_INCREMENTAL_OPT_VAL &&
       parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL)
 
-    log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
+    val isReadOptimizedForMor = parameters.get(ENABLE_OPTIMIZED_READ_FOR_MOR_WITH_ALL_BASE_FILE_ONLY_SLICE.key()) match {
+      case Some(value) => value.toBoolean
+      case None => ENABLE_OPTIMIZED_READ_FOR_MOR_WITH_ALL_BASE_FILE_ONLY_SLICE.defaultValue()
+    }
+
+    log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType, isCdcQuery is: $isCdcQuery, isReadOptimizedForMor is: $isReadOptimizedForMor")
 
     // NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain
     //       Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. In that
@@ -288,7 +293,11 @@ object DefaultSource {
 
       case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
         if (newHudiFileFormatUtils.isEmpty) {
-          new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
+          if (isReadOptimizedForMor) {
+            resolveRelationForMorSnapshotQuery(sqlContext, globPaths, userSchema, metaClient, parameters)
+          } else {
+            new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
+          }
         } else {
           newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap = false)
         }
@@ -354,6 +363,29 @@ object DefaultSource {
     }
   }
 
+  private def resolveRelationForMorSnapshotQuery(sqlContext: SQLContext,
+                                                 globPaths: Seq[StoragePath],
+                                                 userSchema: Option[StructType],
+                                                 metaClient: HoodieTableMetaClient,
+                                                 optParams: Map[String, String]): BaseRelation = {
+    val snapshotRelation = new MergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, globPaths, userSchema)
+    // Check whether all file slices contain only base files
+    val relation = snapshotRelation.considerConvertToBaseFileOnlyRelation
+
+    // NOTE: We fall back to [[HadoopFsRelation]] in all of the cases except ones requiring usage of
+    //       [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/
+    //       vanilla Spark, since some of the Spark optimizations are predicated on the usage of [[HadoopFsRelation]].
+    //
+    //       You can check out HUDI-3896 for more details
+    val result = if (relation.isInstanceOf[BaseFileOnlyRelation] && !relation.asInstanceOf[BaseFileOnlyRelation].hasSchemaOnRead) {
+      relation.asInstanceOf[BaseFileOnlyRelation].toHadoopFsRelation
+    } else {
+      relation
+    }
+    log.info("Resolved MergeOnReadSnapshotRelation for MOR snapshot query, relation is: " + result)
+    result
+  }
+
   private def resolveSchema(metaClient: HoodieTableMetaClient,
                             parameters: Map[String, String],
                             schema: Option[StructType]): StructType = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index edd08fe5d6c0..1ad292855480 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -148,6 +148,9 @@ case class HoodieFileIndex(spark: SparkSession,
     }).toSeq
   }
 
+
+  def isBaseFileOnly: Boolean = !includeLogFiles || isAllInputFileSlicesBaseFileOnly
+
   /**
    * Invoked by Spark to fetch list of latest base files per partition.
    *
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 5b6be9c55857..7cd5b297842d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -25,14 +25,14 @@ import org.apache.hudi.MergeOnReadSnapshotRelation.{createPartitionedFile, isPro
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.exception.HoodieNotSupportedException
 import org.apache.hudi.storage.StoragePath
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConverters._
@@ -100,6 +100,15 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
   protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
     DataSourceReadOptions.REALTIME_MERGE.defaultValue)
 
+  def considerConvertToBaseFileOnlyRelation: BaseRelation = {
+    if (fileIndex.isBaseFileOnly) {
+      // TODO: move FileIndex's reference into BaseFileOnlyRelation to avoid FileIndex re-initialization
+      BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)
+    } else {
+      this
+    }
+  }
+
   /**
    * Determines whether relation's schema could be pruned by Spark's Optimizer
   */
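For context, a minimal usage sketch of the option introduced above (not part of the patch itself): the SparkSession, table path, and variable names below are illustrative assumptions; only the option key and its default come from this change.

// Assumes an existing SparkSession `spark` and a MOR table at an illustrative path whose
// file slices have all been compacted down to base files (no log files).
val df = spark.read
  .format("hudi")
  // Shown explicitly for clarity; the flag already defaults to true per this patch, so the
  // snapshot query is served through the base-file-only / HadoopFsRelation path.
  .option("hoodie.datasource.read.optimized.mor.with.all.base.file.only.slice.enable", "true")
  .load("/tmp/hudi/mor_table")

// Setting the flag to false keeps the original MergeOnReadSnapshotRelation code path.
val dfMor = spark.read
  .format("hudi")
  .option("hoodie.datasource.read.optimized.mor.with.all.base.file.only.slice.enable", "false")
  .load("/tmp/hudi/mor_table")

Because the option defaults to true, existing snapshot reads of fully compacted MOR tables pick up the optimized path without any configuration change; tables that still carry log files are unaffected, since isAllInputFileSlicesBaseFileOnly returns false for them.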