From f9dd9545a5eddfe43ce27c02abea638b06429cd6 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sun, 20 Oct 2024 18:23:09 -0700 Subject: [PATCH] wip: have cloud event source use QueryContext --- .../table/timeline/HoodieDefaultTimeline.java | 33 ++++ .../common/table/timeline/HoodieTimeline.java | 25 +++ .../sources/GcsEventsHoodieIncrSource.java | 32 ++-- .../utilities/sources/HoodieIncrSource.java | 6 +- .../sources/S3EventsHoodieIncrSource.java | 26 ++- .../sources/SnapshotLoadQuerySplitter.java | 21 +-- .../sources/helpers/CloudDataFetcher.java | 12 +- .../helpers/CloudObjectsSelectorCommon.java | 51 +++++ .../sources/helpers/IncrSourceHelper.java | 176 ++++++++++-------- .../utilities/sources/helpers/QueryInfo.java | 1 + .../sources/helpers/QueryRunner.java | 76 +++++--- .../TestGcsEventsHoodieIncrSource.java | 4 +- .../sources/TestS3EventsHoodieIncrSource.java | 4 +- .../sources/helpers/TestIncrSourceHelper.java | 110 +++++++---- 14 files changed, 379 insertions(+), 198 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index c2f570fd10dd..6968740402e9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -286,6 +286,19 @@ public HoodieTimeline findInstantsAfter(String instantTime) { .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details); } + @Override + public HoodieTimeline findInstantsAfterByCompletionTime(String completionTime, int numCommits) { + return new HoodieDefaultTimeline(getInstantsOrderedByCompletionTime() + .filter(s -> compareTimestamps(s.getCompletionTime(), GREATER_THAN, completionTime)) + .limit(numCommits), details); + } + + @Override + public HoodieTimeline findInstantsAfterByCompletionTime(String completionTime) { + return new HoodieDefaultTimeline(getInstantsOrderedByCompletionTime() + .filter(s -> compareTimestamps(s.getCompletionTime(), GREATER_THAN, completionTime)), details); + } + @Override public HoodieDefaultTimeline findInstantsAfterOrEquals(String commitTime, int numCommits) { return new HoodieDefaultTimeline(getInstantsAsStream() @@ -299,6 +312,19 @@ public HoodieTimeline findInstantsAfterOrEquals(String commitTime) { .filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, commitTime)), details); } + @Override + public HoodieDefaultTimeline findInstantsAfterOrEqualsByCompletionTime(String completionTime, int numCommits) { + return new HoodieDefaultTimeline(getInstantsOrderedByCompletionTime() + .filter(s -> compareTimestamps(s.getCompletionTime(), GREATER_THAN_OR_EQUALS, completionTime)) + .limit(numCommits), details); + } + + @Override + public HoodieDefaultTimeline findInstantsAfterOrEqualsByCompletionTime(String completionTime) { + return new HoodieDefaultTimeline(getInstantsOrderedByCompletionTime() + .filter(s -> compareTimestamps(s.getCompletionTime(), GREATER_THAN_OR_EQUALS, completionTime)), details); + } + @Override public HoodieDefaultTimeline findInstantsBefore(String instantTime) { return new HoodieDefaultTimeline(getInstantsAsStream() @@ -313,6 +339,13 @@ public Option findInstantBefore(String instantTime) { .max(Comparator.comparing(HoodieInstant::getTimestamp))); } + @Override + public Option findInstantBeforeByCompletionTime(String completionTime) { + return 
Option.fromJavaOptional(getInstantsOrderedByCompletionTime() + .filter(instant -> compareTimestamps(instant.getCompletionTime(), LESSER_THAN, completionTime)) + .max(Comparator.comparing(HoodieInstant::getCompletionTime))); + } + @Override public HoodieDefaultTimeline findInstantsBeforeOrEquals(String instantTime) { return new HoodieDefaultTimeline(getInstantsAsStream() diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 3b45510d1314..dceda7e0457b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -260,6 +260,16 @@ public interface HoodieTimeline extends Serializable { */ HoodieTimeline findInstantsAfterOrEquals(String commitTime); + /** + * Create a new Timeline with all the instants completed after specified completion time. + */ + HoodieTimeline findInstantsAfterOrEqualsByCompletionTime(String completionTime, int numCommits); + + /** + * Create a new Timeline with all the instants completed after specified completion time. + */ + HoodieTimeline findInstantsAfterOrEqualsByCompletionTime(String completionTime); + /** * Create a new Timeline with instants after startTs and before or on endTs. */ @@ -291,6 +301,16 @@ public interface HoodieTimeline extends Serializable { */ HoodieTimeline findInstantsAfter(String instantTime); + /** + * Create a new Timeline with all the instants completed after specified completion time. + */ + HoodieTimeline findInstantsAfterByCompletionTime(String completionTime); + + /** + * Create a new Timeline with all the instants completed after specified completion time. + */ + HoodieTimeline findInstantsAfterByCompletionTime(String completionTime, int numCommits); + /** * Create a new Timeline with all instants before specified time. */ @@ -301,6 +321,11 @@ public interface HoodieTimeline extends Serializable { */ Option findInstantBefore(String instantTime); + /** + * Find the last completed instant before specified completion time + */ + Option findInstantBeforeByCompletionTime(String completionTime); + /** * Create new timeline with all instants before or equals specified time. 
*/ diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java index 7ab8894b315b..d44d7974d051 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java @@ -19,17 +19,16 @@ package org.apache.hudi.utilities.sources; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; +import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer; +import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint; -import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; +import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy; -import org.apache.hudi.utilities.sources.helpers.QueryInfo; import org.apache.hudi.utilities.sources.helpers.QueryRunner; import org.apache.hudi.utilities.streamer.DefaultStreamContext; import org.apache.hudi.utilities.streamer.StreamContext; @@ -52,8 +51,6 @@ import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH; import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH; import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.Type.GCS; -import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo; -import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy; /** @@ -165,22 +162,23 @@ public GcsEventsHoodieIncrSource( @Override public Pair>, String> fetchNextBatch(Option lastCheckpoint, long sourceLimit) { CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint = CloudObjectIncrCheckpoint.fromString(lastCheckpoint); - HollowCommitHandling handlingMode = getHollowCommitHandleMode(props); - QueryInfo queryInfo = generateQueryInfo( + IncrementalQueryAnalyzer analyzer = + IncrSourceHelper.generateQueryInfo( sparkContext, srcPath, numInstantsPerFetch, Option.of(cloudObjectIncrCheckpoint.getCommit()), - missingCheckpointStrategy, handlingMode, HoodieRecord.COMMIT_TIME_METADATA_FIELD, - CloudObjectsSelectorCommon.GCS_OBJECT_KEY, - CloudObjectsSelectorCommon.GCS_OBJECT_SIZE, true, + missingCheckpointStrategy, true, Option.ofNullable(cloudObjectIncrCheckpoint.getKey())); - LOG.info("Querying GCS with:" + cloudObjectIncrCheckpoint + " and queryInfo:" + queryInfo); + QueryContext queryContext = analyzer.analyze(); + LOG.info("Querying GCS with:" + cloudObjectIncrCheckpoint + " and queryInfo:" + analyzer); - if (isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) && queryInfo.areStartAndEndInstantsEqual()) { - LOG.info("Source of file names is empty. 
Returning empty result and endInstant: " - + queryInfo.getStartInstant()); - return Pair.of(Option.empty(), queryInfo.getStartInstant()); + if (isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) && queryContext.isEmpty()) { + LOG.info("Source of file names is empty. Returning empty result and lastCheckpoint: " + + lastCheckpoint.orElse(null)); + return Pair.of(Option.empty(), lastCheckpoint.orElse(null)); } - return cloudDataFetcher.fetchPartitionedSource(GCS, cloudObjectIncrCheckpoint, this.sourceProfileSupplier, queryRunner.run(queryInfo, snapshotLoadQuerySplitter), this.schemaProvider, sourceLimit); + return cloudDataFetcher.fetchPartitionedSource( + GCS, cloudObjectIncrCheckpoint, this.sourceProfileSupplier, + queryRunner.run(queryContext, snapshotLoadQuerySplitter), this.schemaProvider, sourceLimit); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index ecad4c1875c5..756b3be16452 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -227,11 +227,11 @@ public Pair>, String> fetchNextBatch(Option lastCkpt Option newCheckpointAndPredicate = snapshotLoadQuerySplitter.get().getNextCheckpointWithPredicates(snapshot, queryContext); if (newCheckpointAndPredicate.isPresent()) { - endCompletionTime = newCheckpointAndPredicate.get().endCompletionTime; - predicate = Option.of(newCheckpointAndPredicate.get().predicateFilter); + endCompletionTime = newCheckpointAndPredicate.get().getEndCompletionTime(); + predicate = Option.of(newCheckpointAndPredicate.get().getPredicateFilter()); instantTimeList = queryContext.getInstants().stream() .filter(instant -> HoodieTimeline.compareTimestamps( - instant.getCompletionTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, newCheckpointAndPredicate.get().endCompletionTime)) + instant.getCompletionTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, newCheckpointAndPredicate.get().getEndCompletionTime())) .map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); } else { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java index ab8c0a55bbd0..c67eca85a411 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java @@ -19,17 +19,15 @@ package org.apache.hudi.utilities.sources; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; +import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer; +import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher; import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint; -import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; -import 
org.apache.hudi.utilities.sources.helpers.QueryInfo; import org.apache.hudi.utilities.sources.helpers.QueryRunner; import org.apache.hudi.utilities.streamer.DefaultStreamContext; import org.apache.hudi.utilities.streamer.StreamContext; @@ -50,7 +48,6 @@ import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH; import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH; import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.Type.S3; -import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode; import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy; /** @@ -110,22 +107,21 @@ public S3EventsHoodieIncrSource( @Override public Pair>, String> fetchNextBatch(Option lastCheckpoint, long sourceLimit) { CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint = CloudObjectIncrCheckpoint.fromString(lastCheckpoint); - HollowCommitHandling handlingMode = getHollowCommitHandleMode(props); - QueryInfo queryInfo = + IncrementalQueryAnalyzer analyzer = IncrSourceHelper.generateQueryInfo( sparkContext, srcPath, numInstantsPerFetch, Option.of(cloudObjectIncrCheckpoint.getCommit()), - missingCheckpointStrategy, handlingMode, - HoodieRecord.COMMIT_TIME_METADATA_FIELD, - CloudObjectsSelectorCommon.S3_OBJECT_KEY, - CloudObjectsSelectorCommon.S3_OBJECT_SIZE, true, + missingCheckpointStrategy, true, Option.ofNullable(cloudObjectIncrCheckpoint.getKey())); - LOG.info("Querying S3 with:" + cloudObjectIncrCheckpoint + ", queryInfo:" + queryInfo); + QueryContext queryContext = analyzer.analyze(); + LOG.info("Querying S3 with:" + cloudObjectIncrCheckpoint + ", queryInfo:" + queryContext); - if (isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) && queryInfo.areStartAndEndInstantsEqual()) { + if (isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) && queryContext.isEmpty()) { LOG.warn("Already caught up. No new data to process"); - return Pair.of(Option.empty(), queryInfo.getEndInstant()); + return Pair.of(Option.empty(), lastCheckpoint.orElse(null)); } - return cloudDataFetcher.fetchPartitionedSource(S3, cloudObjectIncrCheckpoint, this.sourceProfileSupplier, queryRunner.run(queryInfo, snapshotLoadQuerySplitter), this.schemaProvider, sourceLimit); + return cloudDataFetcher.fetchPartitionedSource( + S3, cloudObjectIncrCheckpoint, this.sourceProfileSupplier, + queryRunner.run(queryContext, snapshotLoadQuerySplitter), this.schemaProvider, sourceLimit); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java index 78262beed0be..abb7c471b07c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java @@ -24,8 +24,6 @@ import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; -import org.apache.hudi.utilities.sources.helpers.QueryInfo; -import org.apache.hudi.utilities.streamer.SourceProfileSupplier; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -56,8 +54,8 @@ public static class Config { * Checkpoint returned for the SnapshotLoadQuerySplitter. 
*/ public static class CheckpointWithPredicates { - String endCompletionTime; - String predicateFilter; + private String endCompletionTime; + private String predicateFilter; public CheckpointWithPredicates(String endCompletionTime, String predicateFilter) { this.endCompletionTime = endCompletionTime; @@ -91,21 +89,6 @@ public SnapshotLoadQuerySplitter(TypedProperties properties) { */ public abstract Option getNextCheckpointWithPredicates(Dataset df, QueryContext queryContext); - /** - * Retrieves the next checkpoint based on query information and a SourceProfileSupplier. - * - * @param df The dataset to process. - * @param queryInfo The query information object. - * @param sourceProfileSupplier An Option of a SourceProfileSupplier to use in load splitting implementation - * @return Updated query information with the next checkpoint, in case of empty checkpoint, - * returning endPoint same as queryInfo.getEndInstant(). - */ - @Deprecated - public QueryInfo getNextCheckpoint(Dataset df, QueryInfo queryInfo, Option sourceProfileSupplier) { - // TODO(HUDI-8354): fix related usage in the event incremental source - throw new UnsupportedOperationException("getNextCheckpoint is no longer supported with instant time."); - } - public static Option getInstance(TypedProperties props) { return props.getNonEmptyStringOpt(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null) .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className, diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java index 5f14e73a0fd8..7765051543eb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudDataFetcher.java @@ -43,6 +43,7 @@ import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK; import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION; import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT; +import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.CloudDataColumnInfo.getCloudDataColumnInfo; /** * Connects to S3/GCS from Spark and downloads data from a given list of files. 
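Editor's note (not part of the patch): with the deprecated getNextCheckpoint(QueryInfo, ...) hook removed above, custom splitters are expected to implement the QueryContext-based method instead. Below is a minimal, hedged sketch of such an implementation built only against the API surface visible in this diff (QueryContext.isEmpty(), getMaxCompletionTime(), getInstantTimeList(), and the CheckpointWithPredicates constructor/getters); the class name and the single-batch heuristic are illustrative assumptions, not part of the change.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical splitter: checkpoint at the newest completion time resolved by the
// analyzer and keep only rows written by the instants in the analyzed range.
public class SingleBatchSplitter extends SnapshotLoadQuerySplitter {

  public SingleBatchSplitter(TypedProperties properties) {
    super(properties);
  }

  @Override
  public Option<CheckpointWithPredicates> getNextCheckpointWithPredicates(Dataset<Row> df, QueryContext queryContext) {
    if (queryContext.isEmpty()) {
      // Nothing completed in the analyzed range; let the caller use the snapshot as-is.
      return Option.empty();
    }
    String endCompletionTime = queryContext.getMaxCompletionTime();
    String predicateFilter = String.format("%s IN ('%s')",
        HoodieRecord.COMMIT_TIME_METADATA_FIELD,
        String.join("','", queryContext.getInstantTimeList()));
    return Option.of(new CheckpointWithPredicates(endCompletionTime, predicateFilter));
  }
}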
@@ -87,7 +88,7 @@ public Pair>, String> fetchPartitionedSource( CloudObjectsSelectorCommon.Type cloudType, CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint, Option sourceProfileSupplier, - Pair> queryInfoDatasetPair, + Pair> chkptDatasetPair, Option schemaProvider, long sourceLimit) { boolean isSourceProfileSupplierAvailable = sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null; @@ -96,15 +97,16 @@ public Pair>, String> fetchPartitionedSource( sourceLimit = sourceProfileSupplier.get().getSourceProfile().getMaxSourceBytes(); } - QueryInfo queryInfo = queryInfoDatasetPair.getLeft(); + String endCheckpoint = chkptDatasetPair.getLeft(); String filter = CloudObjectsSelectorCommon.generateFilter(cloudType, props); LOG.info("Adding filter string to Dataset: " + filter); - Dataset filteredSourceData = queryInfoDatasetPair.getRight().filter(filter); + Dataset filteredSourceData = chkptDatasetPair.getRight().filter(filter); - LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit); + LOG.info("Adjusting end checkpoint:" + endCheckpoint + " based on sourceLimit :" + sourceLimit); Pair>> checkPointAndDataset = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint); + filteredSourceData, sourceLimit, endCheckpoint, + cloudObjectIncrCheckpoint, getCloudDataColumnInfo(cloudType)); if (!checkPointAndDataset.getRight().isPresent()) { LOG.info("Empty source, returning endpoint:" + checkPointAndDataset.getLeft()); return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString()); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java index 864bbe6f5527..7486830b6ac4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java @@ -21,6 +21,7 @@ import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; @@ -551,4 +552,54 @@ public enum Type { S3, GCS } + + public enum CloudDataColumnInfo { + S3( + CloudObjectsSelectorCommon.S3_OBJECT_KEY, + CloudObjectsSelectorCommon.S3_OBJECT_SIZE + ), + GCS( + CloudObjectsSelectorCommon.GCS_OBJECT_KEY, + CloudObjectsSelectorCommon.GCS_OBJECT_SIZE + ); + + // both S3 and GCS use COMMIT_TIME_METADATA_FIELD as order column + private final String orderColumn = HoodieRecord.COMMIT_TIME_METADATA_FIELD; + private final String keyColumn; + private final String limitColumn; + private final List orderByColumns; + + CloudDataColumnInfo(String keyColumn, String limitColumn) { + this.keyColumn = keyColumn; + this.limitColumn = limitColumn; + orderByColumns = Arrays.asList(orderColumn, keyColumn); + } + + public String getOrderColumn() { + return orderColumn; + } + + public String getKeyColumn() { + return keyColumn; + } + + public String getLimitColumn() { + return limitColumn; + } + + public List getOrderByColumns() { + return orderByColumns; + } + + public static CloudDataColumnInfo 
getCloudDataColumnInfo(CloudObjectsSelectorCommon.Type type) { + switch (type) { + case S3: + return S3; + case GCS: + return GCS; + default: + throw new IllegalArgumentException("Unsupported cloud data column type: " + type); + } + } + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java index a7faeef72df0..470197a11d8f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java @@ -32,6 +32,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.hudi.utilities.sources.HoodieIncrSource; +import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.CloudDataColumnInfo; import org.apache.hudi.utilities.streamer.SourceProfile; import org.apache.spark.api.java.JavaSparkContext; @@ -44,11 +45,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.function.Function; - import static org.apache.hudi.DataSourceReadOptions.INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT; -import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.instantTimeMinusMillis; -import static org.apache.hudi.common.table.timeline.TimelineUtils.handleHollowCommitIfNeeded; +import static org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.START_COMMIT_EARLIEST; import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty; import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; @@ -82,89 +80,113 @@ public static HollowCommitHandling getHollowCommitHandleMode(TypedProperties pro * * @param jssc Java Spark Context * @param srcBasePath Base path of Hudi source table - * @param numInstantsPerFetch Max Instants per fetch + * @param numInstantsFromConfig Max Instants per fetch * @param beginInstant Last Checkpoint String * @param missingCheckpointStrategy when begin instant is missing, allow reading based on missing checkpoint strategy - * @param handlingMode Hollow Commit Handling Mode - * @param orderColumn Column to order by (used for size based incr source) - * @param keyColumn Key column (used for size based incr source) - * @param limitColumn Limit column (used for size based incr source) * @param sourceLimitBasedBatching When sourceLimit based batching is used, we need to fetch the current commit as well, * this flag is used to indicate that. * @param lastCheckpointKey Last checkpoint key (used in the upgrade code path) * @return begin and end instants along with query type and other information. 
*/ - public static QueryInfo generateQueryInfo(JavaSparkContext jssc, String srcBasePath, - int numInstantsPerFetch, Option beginInstant, + public static IncrementalQueryAnalyzer generateQueryInfo(JavaSparkContext jssc, String srcBasePath, + int numInstantsFromConfig, Option beginInstant, MissingCheckpointStrategy missingCheckpointStrategy, - HollowCommitHandling handlingMode, - String orderColumn, String keyColumn, String limitColumn, boolean sourceLimitBasedBatching, Option lastCheckpointKey) { - ValidationUtils.checkArgument(numInstantsPerFetch > 0, + // TODO: checkpoint key: completionTime#key (previous: timestamp#key) + ValidationUtils.checkArgument(numInstantsFromConfig > 0, "Make sure the config hoodie.streamer.source.hoodieincr.num_instants is set to a positive value"); - HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder() + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() .setConf(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration())) .setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build(); - HoodieTimeline completedCommitTimeline = srcMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); - final HoodieTimeline activeCommitTimeline = handleHollowCommitIfNeeded(completedCommitTimeline, srcMetaClient, handlingMode); - Function timestampForLastInstant = instant -> handlingMode == HollowCommitHandling.USE_TRANSITION_TIME - ? instant.getCompletionTime() : instant.getTimestamp(); - String beginInstantTime = beginInstant.orElseGet(() -> { - if (missingCheckpointStrategy != null) { - if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST) { - Option lastInstant = activeCommitTimeline.lastInstant(); - return lastInstant.map( - hoodieInstant -> instantTimeMinusMillis(timestampForLastInstant.apply(hoodieInstant), 1)) - .orElse(DEFAULT_START_TIMESTAMP); - } else { - return DEFAULT_START_TIMESTAMP; - } - } else { - throw new IllegalArgumentException("Missing begin instant for incremental pull. For reading from latest " - + "committed instant set hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy to a valid value"); + final HoodieTimeline completedCommitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); + String startCompletionTime; + RangeType rangeType; + + if (beginInstant.isPresent() && !beginInstant.get().isEmpty()) { + startCompletionTime = beginInstant.get(); + rangeType = RangeType.OPEN_CLOSED; + } else if (missingCheckpointStrategy != null) { + rangeType = RangeType.CLOSED_CLOSED; + switch (missingCheckpointStrategy) { + case READ_UPTO_LATEST_COMMIT: + startCompletionTime = DEFAULT_START_TIMESTAMP; + break; + case READ_LATEST: + // rely on IncrementalQueryAnalyzer to use the latest completed instant + startCompletionTime = null; + break; + default: + throw new IllegalArgumentException("Unknown missing checkpoint strategy: " + missingCheckpointStrategy); } - }); + } else { + throw new IllegalArgumentException("Missing start completion time for incremental pull. For reading from latest " + + "committed instant, set " + MISSING_CHECKPOINT_STRATEGY.key() + " to a valid value"); + } + // TODO: why is previous instant time needed? It's not used anywhere // When `beginInstantTime` is present, `previousInstantTime` is set to the completed commit before `beginInstantTime` if that exists. 
// If there is no completed commit before `beginInstantTime`, e.g., `beginInstantTime` is the first commit in the active timeline, // `previousInstantTime` is set to `DEFAULT_BEGIN_TIMESTAMP`. - String previousInstantTime = DEFAULT_START_TIMESTAMP; - if (!beginInstantTime.equals(DEFAULT_START_TIMESTAMP)) { - Option previousInstant = activeCommitTimeline.findInstantBefore(beginInstantTime); - if (previousInstant.isPresent()) { - previousInstantTime = previousInstant.get().getTimestamp(); + String previousInstantTime = DEFAULT_START_TIMESTAMP; // TODO: this should be removed + if (startCompletionTime != null && !startCompletionTime.equals(DEFAULT_START_TIMESTAMP)) { + // has a valid start completion time, try to find previous instant + Option previousCompletedInstant = completedCommitTimeline.findInstantBeforeByCompletionTime(startCompletionTime); + if (previousCompletedInstant.isPresent()) { + previousInstantTime = previousCompletedInstant.get().getTimestamp(); } else { // if begin instant time matches first entry in active timeline, we can set previous = beginInstantTime - 1 - if (activeCommitTimeline.filterCompletedInstants().firstInstant().isPresent() - && activeCommitTimeline.filterCompletedInstants().firstInstant().get().getTimestamp().equals(beginInstantTime)) { - previousInstantTime = String.valueOf(Long.parseLong(beginInstantTime) - 1); + Option firstCompletedInstant = completedCommitTimeline.filterCompletedInstants().firstInstant(); + if (firstCompletedInstant.isPresent() + && firstCompletedInstant.get().getCompletionTime().equals(startCompletionTime)) { + previousInstantTime = String.valueOf(Long.parseLong(startCompletionTime) - 1); } } } - if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST || !activeCommitTimeline.isBeforeTimelineStarts(beginInstantTime)) { - Option nthInstant; + IncrementalQueryAnalyzer.Builder analyzerBuilder = IncrementalQueryAnalyzer.builder(); + + if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST + || !completedCommitTimeline.isBeforeTimelineStartsByCompletionTime(startCompletionTime)) { + Option nthInstant; // TODO: remove this // When we are in the upgrade code path from non-sourcelimit-based batching to sourcelimit-based batching, we need to avoid fetching the commit // that is read already. Else we will have duplicates in append-only use case if we use "findInstantsAfterOrEquals". // As soon as we have a new format of checkpoint and a key we will move to the new code of fetching the current commit as well. 
if (sourceLimitBasedBatching && lastCheckpointKey.isPresent()) { - nthInstant = Option.fromJavaOptional(activeCommitTimeline - .findInstantsAfterOrEquals(beginInstantTime, numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y)); + // nthInstant = Option.fromJavaOptional(completedCommitTimeline + // .findInstantsAfterOrEqualsByCompletionTime(beginCompletionTime, numInstantsFromConfig).getInstantsAsStream().reduce((x, y) -> y)); + + // range stays as CLOSED_CLOSED to include the start instant + rangeType = RangeType.CLOSED_CLOSED; } else { - nthInstant = Option.fromJavaOptional(activeCommitTimeline - .findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y)); + // set the range type to OPEN_CLOSED to avoid duplicates + rangeType = RangeType.OPEN_CLOSED; } - return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), previousInstantTime, - beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime), - orderColumn, keyColumn, limitColumn); + + return analyzerBuilder + .metaClient(metaClient) + .startCompletionTime(startCompletionTime) + .endCompletionTime(null) + .rangeType(rangeType) + .limit(numInstantsFromConfig) + .build(); } else { // when MissingCheckpointStrategy is set to read everything until latest, trigger snapshot query. - Option lastInstant = activeCommitTimeline.lastInstant(); - return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(), - previousInstantTime, beginInstantTime, lastInstant.get().getTimestamp(), - orderColumn, keyColumn, limitColumn); + return analyzerBuilder + .metaClient(metaClient) + .startCompletionTime(START_COMMIT_EARLIEST) + .endCompletionTime(null) + .rangeType(rangeType) + .limit(-1) // snapshot query, disrespect limit + .build(); + + // TODO: Revisit this, maybe we can remove this check and have IncrementalQueryAnalyzer to handle + + // Option lastInstant = completedCommitTimeline.lastInstant(); + // return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(), + // previousInstantTime, beginCompletionTime, lastInstant.get().getTimestamp(), + // orderColumn, keyColumn, limitColumn); } } @@ -230,24 +252,29 @@ public static IncrementalQueryAnalyzer getIncrementalQueryAnalyzer( * * @param sourceData Source dataset * @param sourceLimit Max number of bytes to be read from source - * @param queryInfo Query Info + * @param endCheckpoint New checkpoint using completion time * @return end instants along with filtered rows. */ - public static Pair>> filterAndGenerateCheckpointBasedOnSourceLimit(Dataset sourceData, - long sourceLimit, QueryInfo queryInfo, - CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint) { + public static Pair>> filterAndGenerateCheckpointBasedOnSourceLimit( + Dataset sourceData, + long sourceLimit, + String endCheckpoint, + CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint, + CloudDataColumnInfo cloudDataColumnInfo + ) { if (sourceData.isEmpty()) { // There is no file matching the prefix. CloudObjectIncrCheckpoint updatedCheckpoint = - queryInfo.getEndInstant().equals(cloudObjectIncrCheckpoint.getCommit()) + endCheckpoint.equals(cloudObjectIncrCheckpoint.getCommit()) ? 
cloudObjectIncrCheckpoint - : new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), null); + : new CloudObjectIncrCheckpoint(endCheckpoint, null); return Pair.of(updatedCheckpoint, Option.empty()); } // Let's persist the dataset to avoid triggering the dag repeatedly sourceData.persist(StorageLevel.MEMORY_AND_DISK()); + // Set ordering in query to enable batching - Dataset orderedDf = QueryRunner.applyOrdering(sourceData, queryInfo.getOrderByColumns()); + Dataset orderedDf = QueryRunner.applyOrdering(sourceData, cloudDataColumnInfo.getOrderByColumns()); Option lastCheckpoint = Option.of(cloudObjectIncrCheckpoint.getCommit()); Option lastCheckpointKey = Option.ofNullable(cloudObjectIncrCheckpoint.getKey()); Option concatenatedKey = lastCheckpoint.flatMap(checkpoint -> lastCheckpointKey.map(key -> checkpoint + key)); @@ -255,41 +282,42 @@ public static Pair>> filterAndGen // Filter until last checkpoint key if (concatenatedKey.isPresent()) { orderedDf = orderedDf.withColumn("commit_key", - functions.concat(functions.col(queryInfo.getOrderColumn()), functions.col(queryInfo.getKeyColumn()))); + functions.concat( + functions.col(cloudDataColumnInfo.getOrderColumn()), functions.col(cloudDataColumnInfo.getKeyColumn()))); // Apply incremental filter orderedDf = orderedDf.filter(functions.col("commit_key").gt(concatenatedKey.get())).drop("commit_key"); // If there are no more files where commit_key is greater than lastCheckpointCommit#lastCheckpointKey if (orderedDf.isEmpty()) { - LOG.info("Empty ordered source, returning endpoint:" + queryInfo.getEndInstant()); + LOG.info("Empty ordered source, returning endpoint:" + endCheckpoint); sourceData.unpersist(); - // queryInfo.getEndInstant() represents source table's last completed instant - // If current checkpoint is c1#abc and queryInfo.getEndInstant() is c1, return c1#abc. - // If current checkpoint is c1#abc and queryInfo.getEndInstant() is c2, return c2. + // endCheckpoint represents source table's last completed instant + // If current checkpoint is c1#abc and endCheckpoint is c1, return c1#abc. + // If current checkpoint is c1#abc and endCheckpoint is c2, return c2. CloudObjectIncrCheckpoint updatedCheckpoint = - queryInfo.getEndInstant().equals(cloudObjectIncrCheckpoint.getCommit()) + endCheckpoint.equals(cloudObjectIncrCheckpoint.getCommit()) ? 
cloudObjectIncrCheckpoint - : new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), null); + : new CloudObjectIncrCheckpoint(endCheckpoint, null); return Pair.of(updatedCheckpoint, Option.empty()); } } // Limit based on sourceLimit - WindowSpec windowSpec = Window.orderBy(col(queryInfo.getOrderColumn()), col(queryInfo.getKeyColumn())); + WindowSpec windowSpec = Window.orderBy(col(cloudDataColumnInfo.getOrderColumn()), col(cloudDataColumnInfo.getKeyColumn())); // Add the 'cumulativeSize' column with running sum of 'limitColumn' Dataset aggregatedData = orderedDf.withColumn(CUMULATIVE_COLUMN_NAME, - sum(col(queryInfo.getLimitColumn())).over(windowSpec)); + sum(col(cloudDataColumnInfo.getLimitColumn())).over(windowSpec)); // TODO: replace this Dataset collectedRows = aggregatedData.filter(col(CUMULATIVE_COLUMN_NAME).leq(sourceLimit)); - Row row = null; + Row row; if (collectedRows.isEmpty()) { // If the first element itself exceeds limits then return first element LOG.info("First object exceeding source limit: " + sourceLimit + " bytes"); - row = aggregatedData.select(queryInfo.getOrderColumn(), queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME).first(); + row = aggregatedData.select(cloudDataColumnInfo.getOrderColumn(), cloudDataColumnInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME).first(); collectedRows = aggregatedData.limit(1); } else { // Get the last row and form composite key - row = collectedRows.select(queryInfo.getOrderColumn(), queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME).orderBy( - col(queryInfo.getOrderColumn()).desc(), col(queryInfo.getKeyColumn()).desc()).first(); + row = collectedRows.select(cloudDataColumnInfo.getOrderColumn(), cloudDataColumnInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME).orderBy( + col(cloudDataColumnInfo.getOrderColumn()).desc(), col(cloudDataColumnInfo.getKeyColumn()).desc()).first(); } LOG.info("Processed batch size: " + row.get(row.fieldIndex(CUMULATIVE_COLUMN_NAME)) + " bytes"); sourceData.unpersist(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java index dc564ea940c3..6183cde85758 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java @@ -28,6 +28,7 @@ import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL; import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL; +// TODO remove this class /** * This class is used to prepare query information for s3 and gcs incr source. * Some of the information in this class is used for batching based on sourceLimit. 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java index 2bb1ec153453..eeb065f800a5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java @@ -21,9 +21,11 @@ import org.apache.hudi.DataSourceReadOptions; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.utilities.config.HoodieIncrSourceConfig; import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter; @@ -37,6 +39,7 @@ import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; @@ -62,16 +65,15 @@ public QueryRunner(SparkSession sparkSession, TypedProperties props) { /** * This is used to execute queries for cloud stores incremental pipelines. * Regular Hudi incremental queries does not take this flow. - * @param queryInfo all meta info about the query to be executed. + * @param queryContext all meta info about the query to be executed. * @return the output of the query as Dataset < Row >. */ - public Pair> run(QueryInfo queryInfo, Option snapshotLoadQuerySplitterOption) { - if (queryInfo.isIncremental()) { - return runIncrementalQuery(queryInfo); - } else if (queryInfo.isSnapshot()) { - return runSnapshotQuery(queryInfo, snapshotLoadQuerySplitterOption); + public Pair> run(QueryContext queryContext, Option snapshotLoadQuerySplitterOption) { + // no need to check if it's incremental + if (queryContext.getInstantRange().isEmpty()) { + return runSnapshotQuery(queryContext, snapshotLoadQuerySplitterOption); } else { - throw new HoodieException("Unknown query type " + queryInfo.getQueryType()); + return runIncrementalQuery(queryContext); } } @@ -83,35 +85,51 @@ public static Dataset applyOrdering(Dataset dataset, List orde return dataset; } - public Pair> runIncrementalQuery(QueryInfo queryInfo) { + public Pair> runIncrementalQuery(QueryContext queryContext) { LOG.info("Running incremental query"); - return Pair.of(queryInfo, sparkSession.read().format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()) - .option(DataSourceReadOptions.START_COMMIT().key(), queryInfo.getStartInstant()) - .option(DataSourceReadOptions.END_COMMIT().key(), queryInfo.getEndInstant()) + String inclusiveStartCompletionTime = queryContext.getInstants().stream() + .min(HoodieInstant.COMPLETION_TIME_COMPARATOR) + .map(HoodieInstant::getCompletionTime) + .get(); + + return Pair.of(queryContext.getMaxCompletionTime(), sparkSession.read().format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL()) + .option(DataSourceReadOptions.START_COMMIT().key(), inclusiveStartCompletionTime) + .option(DataSourceReadOptions.END_COMMIT().key(), queryContext.getMaxCompletionTime()) 
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(), DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue())) .load(sourcePath)); } - public Pair> runSnapshotQuery(QueryInfo queryInfo, Option snapshotLoadQuerySplitterOption) { + public Pair> runSnapshotQuery(QueryContext queryContext, Option snapshotLoadQuerySplitter) { LOG.info("Running snapshot query"); - Dataset snapshot = sparkSession.read().format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(sourcePath); - QueryInfo snapshotQueryInfo = snapshotLoadQuerySplitterOption - .map(snapshotLoadQuerySplitter -> snapshotLoadQuerySplitter.getNextCheckpoint(snapshot, queryInfo, Option.empty())) - .orElse(queryInfo); - return Pair.of(snapshotQueryInfo, applySnapshotQueryFilters(snapshot, snapshotQueryInfo)); - } - - public Dataset applySnapshotQueryFilters(Dataset snapshot, QueryInfo snapshotQueryInfo) { - Dataset df = snapshot + Dataset snapshot = sparkSession.read() + .format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL()) + .load(sourcePath); + String endCompletionTime = queryContext.getMaxCompletionTime(); + Option predicate = Option.empty(); + List instantTimeList = queryContext.getInstantTimeList(); + if (snapshotLoadQuerySplitter.isPresent()) { + Option newCheckpointAndPredicate = + snapshotLoadQuerySplitter.get().getNextCheckpointWithPredicates(snapshot, queryContext); + if (newCheckpointAndPredicate.isPresent()) { + endCompletionTime = newCheckpointAndPredicate.get().getEndCompletionTime(); + predicate = Option.of(newCheckpointAndPredicate.get().getPredicateFilter()); + instantTimeList = queryContext.getInstants().stream() + .filter(instant -> HoodieTimeline.compareTimestamps( + instant.getCompletionTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, + newCheckpointAndPredicate.get().getEndCompletionTime())) + .map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()); + } + } + snapshot = predicate.map(snapshot::filter).orElse(snapshot); + snapshot = snapshot // add filtering so that only interested records are returned. 
- .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, - snapshotQueryInfo.getStartInstant())) - .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, - snapshotQueryInfo.getEndInstant())); - return snapshotQueryInfo.getPredicateFilter().map(df::filter).orElse(df); + .filter(String.format("%s IN ('%s')", HoodieRecord.COMMIT_TIME_METADATA_FIELD, + String.join("','", instantTimeList))); + return Pair.of(endCompletionTime, snapshot); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java index 67149d1e57f2..84dc947163a0 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.Option; @@ -91,6 +92,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +// TODO: remove QueryInfo from this test public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarness { private static final Schema GCS_METADATA_SCHEMA = SchemaTestUtil.getSchemaFromResource( @@ -326,7 +328,7 @@ private void setMockQueryRunner(Dataset inputDs) { private void setMockQueryRunner(Dataset inputDs, Option nextCheckPointOpt) { - when(queryRunner.run(any(QueryInfo.class), any())).thenAnswer(invocation -> { + when(queryRunner.run(any(QueryContext.class), any())).thenAnswer(invocation -> { QueryInfo queryInfo = invocation.getArgument(0); QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint -> queryInfo.withUpdatedEndInstant(nextCheckPoint)) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java index d818c6caae62..6aa65e5a30b5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.Option; @@ -88,6 +89,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +// TODO: remove QueryInfo from this test @ExtendWith(MockitoExtension.class) public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarness { private static final Schema S3_METADATA_SCHEMA = SchemaTestUtil.getSchemaFromResource( @@ -556,7 +558,7 @@ private void setMockQueryRunner(Dataset inputDs) { private void setMockQueryRunner(Dataset inputDs, Option 
nextCheckPointOpt) { - when(mockQueryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> { + when(mockQueryRunner.run(Mockito.any(QueryContext.class), Mockito.any())).thenAnswer(invocation -> { QueryInfo queryInfo = invocation.getArgument(0); QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint -> queryInfo.withUpdatedEndInstant(nextCheckPoint)) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java index b2480d6f587e..d9d69c897fc6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java @@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.Option; @@ -37,6 +37,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.sources.TestS3EventsHoodieIncrSource; +import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.CloudDataColumnInfo; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -62,8 +63,9 @@ import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; +// TODO: remove QueryInfo from this test class TestIncrSourceHelper extends SparkClientFunctionalTestHarness { private ObjectMapper mapper = new ObjectMapper(); @@ -119,10 +121,15 @@ void testEmptySource() { QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", "commit2", "_hoodie_commit_time", "s3.object.key", "s3.object.size"); - Pair>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - emptyDataset, 50L, queryInfo, new CloudObjectIncrCheckpoint(null, null)); + Pair>> result = + IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( + emptyDataset, + 50L, + queryInfo.getEndInstant(), + new CloudObjectIncrCheckpoint(null, null), + CloudDataColumnInfo.S3); assertEquals("commit2", result.getKey().toString()); - assertTrue(!result.getRight().isPresent()); + assertFalse(result.getRight().isPresent()); } @Test @@ -140,8 +147,13 @@ void testSingleObjectExceedingSourceLimit() { QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", "commit2", "_hoodie_commit_time", "s3.object.key", "s3.object.size"); - Pair>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - inputDs, 50L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null)); + Pair>> result = + IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( + inputDs, + 50L, + queryInfo.getEndInstant(), + new CloudObjectIncrCheckpoint("commit1", null), + CloudDataColumnInfo.S3); Row row = 
result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1); assertEquals("commit1#path/to/file1.json", result.getKey().toString()); List rows = result.getRight().get().collectAsList(); @@ -167,8 +179,13 @@ void testMultipleObjectExceedingSourceLimit() { QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", "commit2", "_hoodie_commit_time", "s3.object.key", "s3.object.size"); - Pair>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null)); + Pair>> result = + IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( + inputDs, + 350L, + queryInfo.getEndInstant(), + new CloudObjectIncrCheckpoint("commit1", null), + CloudDataColumnInfo.S3); Row row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1); assertEquals("commit1#path/to/file2.json", result.getKey().toString()); List rows = result.getRight().get().collectAsList(); @@ -177,7 +194,11 @@ void testMultipleObjectExceedingSourceLimit() { assertEquals(250L, row.get(0)); result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - inputDs, 550L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null)); + inputDs, + 550L, + queryInfo.getEndInstant(), + new CloudObjectIncrCheckpoint("commit1", null), + CloudDataColumnInfo.S3); row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1); assertEquals("commit2#path/to/file4.json", result.getKey().toString()); rows = result.getRight().get().collectAsList(); @@ -206,8 +227,13 @@ void testCatchAllObjects() { QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", "commit2", "_hoodie_commit_time", "s3.object.key", "s3.object.size"); - Pair>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit1", null)); + Pair>> result = + IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( + inputDs, + 1500L, + queryInfo.getEndInstant(), + new CloudObjectIncrCheckpoint("commit1", null), + CloudDataColumnInfo.S3); Row row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1); assertEquals("commit3#path/to/file8.json", result.getKey().toString()); List rows = result.getRight().get().collectAsList(); @@ -231,8 +257,13 @@ void testFileOrderingAcrossCommits() { QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit3", "commit3", "commit4", "_hoodie_commit_time", "s3.object.key", "s3.object.size"); - Pair>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - inputDs, 50L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json")); + Pair>> result = + IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( + inputDs, + 50L, + queryInfo.getEndInstant(), + new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"), + CloudDataColumnInfo.S3); Row row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1); assertEquals("commit4#path/to/file0.json", result.getKey().toString()); List rows = result.getRight().get().collectAsList(); @@ -240,7 +271,11 @@ void testFileOrderingAcrossCommits() { assertEquals(100L, row.get(0)); result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - inputDs, 350L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json")); + 
inputDs, + 350L, + queryInfo.getEndInstant(), + new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"), + CloudDataColumnInfo.S3); row = result.getRight().get().select("cumulativeSize").collectAsList().get((int) result.getRight().get().count() - 1); assertEquals("commit4#path/to/file2.json", result.getKey().toString()); rows = result.getRight().get().collectAsList(); @@ -268,19 +303,28 @@ void testLastObjectInCommit() { QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", "commit3", "_hoodie_commit_time", "s3.object.key", "s3.object.size"); - Pair>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json")); + Pair>> result = + IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( + inputDs, + 1500L, + queryInfo.getEndInstant(), + new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"), + CloudDataColumnInfo.S3); assertEquals("commit3#path/to/file8.json", result.getKey().toString()); - assertTrue(!result.getRight().isPresent()); + assertFalse(result.getRight().isPresent()); // Test case 2 when queryInfo.endInstant() is greater than lastCheckpointCommit queryInfo = new QueryInfo( QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1", "commit4", "_hoodie_commit_time", "s3.object.key", "s3.object.size"); result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit( - inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit3","path/to/file8.json")); + inputDs, + 1500L, + queryInfo.getEndInstant(), + new CloudObjectIncrCheckpoint("commit3","path/to/file8.json"), + CloudDataColumnInfo.S3); assertEquals("commit4", result.getKey().toString()); - assertTrue(!result.getRight().isPresent()); + assertFalse(result.getRight().isPresent()); } private HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) { @@ -361,21 +405,19 @@ void testQueryInfoGeneration() throws IOException { inserts = writeS3MetadataRecords(commitTimeForWrites); String startInstant = commitTimeForReads; - String orderColumn = "_hoodie_commit_time"; - String keyColumn = "s3.object.key"; - String limitColumn = "s3.object.size"; - QueryInfo queryInfo = IncrSourceHelper.generateQueryInfo(jsc, basePath(), 5, Option.of(startInstant), null, - TimelineUtils.HollowCommitHandling.BLOCK, orderColumn, keyColumn, limitColumn, true, Option.empty()); - assertEquals(String.valueOf(Integer.parseInt(commitTimeForReads) - 1), queryInfo.getPreviousInstant()); - assertEquals(commitTimeForReads, queryInfo.getStartInstant()); - assertEquals(commitTimeForWrites, queryInfo.getEndInstant()); - startInstant = commitTimeForWrites; - queryInfo = IncrSourceHelper.generateQueryInfo(jsc, basePath(), 5, Option.of(startInstant), null, - TimelineUtils.HollowCommitHandling.BLOCK, orderColumn, keyColumn, limitColumn, true, Option.empty()); - assertEquals(commitTimeForReads, queryInfo.getPreviousInstant()); - assertEquals(commitTimeForWrites, queryInfo.getStartInstant()); - assertEquals(commitTimeForWrites, queryInfo.getEndInstant()); + QueryContext queryContext = + IncrSourceHelper.generateQueryInfo(jsc, basePath(), 5, + Option.of(startInstant), null, true, Option.empty()) + .analyze(); + assertEquals(commitTimeForReads, queryContext.getStartInstant().get()); + assertEquals(commitTimeForWrites, queryContext.getEndInstant().get()); + startInstant = commitTimeForWrites; + queryContext = IncrSourceHelper.generateQueryInfo(jsc, basePath(), 5, + 
Option.of(startInstant), null, true, Option.empty()) + .analyze(); + assertEquals(commitTimeForWrites, queryContext.getStartInstant().get()); + assertEquals(commitTimeForWrites, queryContext.getEndInstant().get()); } } \ No newline at end of file
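Editor's note (not part of the patch): the tests above exercise the new analyzer path end to end, but the completion-time lookups added to HoodieTimeline at the top of this patch are only used indirectly. The snippet below is a test-style sketch of how they could be exercised directly; it assumes the TestIncrSourceHelper context (jsc, basePath()) and a placeholder someCompletionTime, and uses only methods introduced or shown in this diff.

// Build a meta client the same way IncrSourceHelper.generateQueryInfo does.
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()))
    .setBasePath(basePath())
    .setLoadActiveTimelineOnLoad(true)
    .build();
HoodieTimeline completed = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();

// Instants that completed strictly after someCompletionTime, capped at 5 commits.
HoodieTimeline newlyCompleted = completed.findInstantsAfterByCompletionTime(someCompletionTime, 5);

// The last instant that completed before someCompletionTime, if any.
Option<HoodieInstant> previous = completed.findInstantBeforeByCompletionTime(someCompletionTime);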