wip: have cloud event source use QueryContext
Shawn Chang committed Oct 21, 2024
1 parent 358c69c commit f9dd954
Showing 14 changed files with 379 additions and 198 deletions.
========== changed file ==========
@@ -286,6 +286,19 @@ public HoodieTimeline findInstantsAfter(String instantTime) {
.filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details);
}

@Override
public HoodieTimeline findInstantsAfterByCompletionTime(String completionTime, int numCommits) {
return new HoodieDefaultTimeline(getInstantsOrderedByCompletionTime()
.filter(s -> compareTimestamps(s.getCompletionTime(), GREATER_THAN, completionTime))
.limit(numCommits), details);
}

@Override
public HoodieTimeline findInstantsAfterByCompletionTime(String completionTime) {
return new HoodieDefaultTimeline(getInstantsOrderedByCompletionTime()
.filter(s -> compareTimestamps(s.getCompletionTime(), GREATER_THAN, completionTime)), details);
}

@Override
public HoodieDefaultTimeline findInstantsAfterOrEquals(String commitTime, int numCommits) {
return new HoodieDefaultTimeline(getInstantsAsStream()
@@ -299,6 +312,19 @@ public HoodieTimeline findInstantsAfterOrEquals(String commitTime) {
.filter(s -> compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, commitTime)), details);
}

@Override
public HoodieDefaultTimeline findInstantsAfterOrEqualsByCompletionTime(String completionTime, int numCommits) {
return new HoodieDefaultTimeline(getInstantsOrderedByCompletionTime()
.filter(s -> compareTimestamps(s.getCompletionTime(), GREATER_THAN_OR_EQUALS, completionTime))
.limit(numCommits), details);
}

@Override
public HoodieDefaultTimeline findInstantsAfterOrEqualsByCompletionTime(String completionTime) {
return new HoodieDefaultTimeline(getInstantsOrderedByCompletionTime()
.filter(s -> compareTimestamps(s.getCompletionTime(), GREATER_THAN_OR_EQUALS, completionTime)), details);
}

@Override
public HoodieDefaultTimeline findInstantsBefore(String instantTime) {
return new HoodieDefaultTimeline(getInstantsAsStream()
@@ -313,6 +339,13 @@ public Option<HoodieInstant> findInstantBefore(String instantTime) {
.max(Comparator.comparing(HoodieInstant::getTimestamp)));
}

@Override
public Option<HoodieInstant> findInstantBeforeByCompletionTime(String completionTime) {
return Option.fromJavaOptional(getInstantsOrderedByCompletionTime()
.filter(instant -> compareTimestamps(instant.getCompletionTime(), LESSER_THAN, completionTime))
.max(Comparator.comparing(HoodieInstant::getCompletionTime)));
}

@Override
public HoodieDefaultTimeline findInstantsBeforeOrEquals(String instantTime) {
return new HoodieDefaultTimeline(getInstantsAsStream()
========== changed file ==========
@@ -260,6 +260,16 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline findInstantsAfterOrEquals(String commitTime);

/**
* Create a new Timeline with all the instants completed on or after the specified completion time, limited to the first numCommits instants.
*/
HoodieTimeline findInstantsAfterOrEqualsByCompletionTime(String completionTime, int numCommits);

/**
* Create a new Timeline with all the instants completed on or after the specified completion time.
*/
HoodieTimeline findInstantsAfterOrEqualsByCompletionTime(String completionTime);

/**
* Create a new Timeline with instants after startTs and before or on endTs.
*/
@@ -291,6 +301,16 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline findInstantsAfter(String instantTime);

/**
* Create a new Timeline with all the instants completed after the specified completion time.
*/
HoodieTimeline findInstantsAfterByCompletionTime(String completionTime);

/**
* Create a new Timeline with all the instants completed after the specified completion time, limited to the first numCommits instants.
*/
HoodieTimeline findInstantsAfterByCompletionTime(String completionTime, int numCommits);

/**
* Create a new Timeline with all instants before specified time.
*/
@@ -301,6 +321,11 @@
*/
Option<HoodieInstant> findInstantBefore(String instantTime);

/**
* Find the last instant that completed before the specified completion time.
*/
Option<HoodieInstant> findInstantBeforeByCompletionTime(String completionTime);

/**
* Create new timeline with all instants before or equals specified time.
*/
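
The completion-time lookups added above mirror the existing instant-time methods. A hedged usage sketch, not part of this commit: timeline is assumed to be an already-built HoodieTimeline, and only methods declared in this diff are called.

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

public class CompletionTimeLookupSketch {

  // Instants that completed strictly after the checkpointed completion time, capped at numCommits.
  static HoodieTimeline completedSince(HoodieTimeline timeline, String lastCompletionTime, int numCommits) {
    return timeline.findInstantsAfterByCompletionTime(lastCompletionTime, numCommits);
  }

  // Latest instant that completed strictly before the given completion time, else fall back to it.
  static String resumeFrom(HoodieTimeline timeline, String completionTime) {
    Option<HoodieInstant> prior = timeline.findInstantBeforeByCompletionTime(completionTime);
    return prior.isPresent() ? prior.get().getCompletionTime() : completionTime;
  }
}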
========== changed file ==========
@@ -19,17 +19,16 @@
package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.StreamContext;
@@ -52,8 +51,6 @@
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.Type.GCS;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;

/**
@@ -165,22 +162,23 @@ public GcsEventsHoodieIncrSource(
@Override
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCheckpoint, long sourceLimit) {
CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint = CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
HollowCommitHandling handlingMode = getHollowCommitHandleMode(props);

QueryInfo queryInfo = generateQueryInfo(
IncrementalQueryAnalyzer analyzer =
IncrSourceHelper.generateQueryInfo(
sparkContext, srcPath, numInstantsPerFetch,
Option.of(cloudObjectIncrCheckpoint.getCommit()),
missingCheckpointStrategy, handlingMode, HoodieRecord.COMMIT_TIME_METADATA_FIELD,
CloudObjectsSelectorCommon.GCS_OBJECT_KEY,
CloudObjectsSelectorCommon.GCS_OBJECT_SIZE, true,
missingCheckpointStrategy, true,
Option.ofNullable(cloudObjectIncrCheckpoint.getKey()));
LOG.info("Querying GCS with:" + cloudObjectIncrCheckpoint + " and queryInfo:" + queryInfo);
QueryContext queryContext = analyzer.analyze();
LOG.info("Querying GCS with:" + cloudObjectIncrCheckpoint + " and queryInfo:" + analyzer);

if (isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) && queryInfo.areStartAndEndInstantsEqual()) {
LOG.info("Source of file names is empty. Returning empty result and endInstant: "
+ queryInfo.getStartInstant());
return Pair.of(Option.empty(), queryInfo.getStartInstant());
if (isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) && queryContext.isEmpty()) {
LOG.info("Source of file names is empty. Returning empty result and lastCheckpoint: "
+ lastCheckpoint.orElse(null));
return Pair.of(Option.empty(), lastCheckpoint.orElse(null));
}
return cloudDataFetcher.fetchPartitionedSource(GCS, cloudObjectIncrCheckpoint, this.sourceProfileSupplier, queryRunner.run(queryInfo, snapshotLoadQuerySplitter), this.schemaProvider, sourceLimit);
return cloudDataFetcher.fetchPartitionedSource(
GCS, cloudObjectIncrCheckpoint, this.sourceProfileSupplier,
queryRunner.run(queryContext, snapshotLoadQuerySplitter), this.schemaProvider, sourceLimit);
}
}
========== changed file ==========
@@ -227,11 +227,11 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
Option<SnapshotLoadQuerySplitter.CheckpointWithPredicates> newCheckpointAndPredicate =
snapshotLoadQuerySplitter.get().getNextCheckpointWithPredicates(snapshot, queryContext);
if (newCheckpointAndPredicate.isPresent()) {
endCompletionTime = newCheckpointAndPredicate.get().endCompletionTime;
predicate = Option.of(newCheckpointAndPredicate.get().predicateFilter);
endCompletionTime = newCheckpointAndPredicate.get().getEndCompletionTime();
predicate = Option.of(newCheckpointAndPredicate.get().getPredicateFilter());
instantTimeList = queryContext.getInstants().stream()
.filter(instant -> HoodieTimeline.compareTimestamps(
instant.getCompletionTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, newCheckpointAndPredicate.get().endCompletionTime))
instant.getCompletionTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, newCheckpointAndPredicate.get().getEndCompletionTime()))
.map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
} else {
========== changed file ==========
@@ -19,17 +19,15 @@
package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectIncrCheckpoint;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.StreamContext;
@@ -50,7 +48,6 @@
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.Type.S3;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;

/**
@@ -110,22 +107,21 @@ public S3EventsHoodieIncrSource(
@Override
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCheckpoint, long sourceLimit) {
CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint = CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
HollowCommitHandling handlingMode = getHollowCommitHandleMode(props);
QueryInfo queryInfo =
IncrementalQueryAnalyzer analyzer =
IncrSourceHelper.generateQueryInfo(
sparkContext, srcPath, numInstantsPerFetch,
Option.of(cloudObjectIncrCheckpoint.getCommit()),
missingCheckpointStrategy, handlingMode,
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
CloudObjectsSelectorCommon.S3_OBJECT_KEY,
CloudObjectsSelectorCommon.S3_OBJECT_SIZE, true,
missingCheckpointStrategy, true,
Option.ofNullable(cloudObjectIncrCheckpoint.getKey()));
LOG.info("Querying S3 with:" + cloudObjectIncrCheckpoint + ", queryInfo:" + queryInfo);
QueryContext queryContext = analyzer.analyze();
LOG.info("Querying S3 with:" + cloudObjectIncrCheckpoint + ", queryInfo:" + queryContext);

if (isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) && queryInfo.areStartAndEndInstantsEqual()) {
if (isNullOrEmpty(cloudObjectIncrCheckpoint.getKey()) && queryContext.isEmpty()) {
LOG.warn("Already caught up. No new data to process");
return Pair.of(Option.empty(), queryInfo.getEndInstant());
return Pair.of(Option.empty(), lastCheckpoint.orElse(null));
}
return cloudDataFetcher.fetchPartitionedSource(S3, cloudObjectIncrCheckpoint, this.sourceProfileSupplier, queryRunner.run(queryInfo, snapshotLoadQuerySplitter), this.schemaProvider, sourceLimit);
return cloudDataFetcher.fetchPartitionedSource(
S3, cloudObjectIncrCheckpoint, this.sourceProfileSupplier,
queryRunner.run(queryContext, snapshotLoadQuerySplitter), this.schemaProvider, sourceLimit);
}
}
========== changed file ==========
@@ -24,8 +24,6 @@
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -56,8 +54,8 @@ public static class Config {
* Checkpoint returned for the SnapshotLoadQuerySplitter.
*/
public static class CheckpointWithPredicates {
String endCompletionTime;
String predicateFilter;
private String endCompletionTime;
private String predicateFilter;

public CheckpointWithPredicates(String endCompletionTime, String predicateFilter) {
this.endCompletionTime = endCompletionTime;
@@ -91,21 +89,6 @@ public SnapshotLoadQuerySplitter(TypedProperties properties) {
*/
public abstract Option<CheckpointWithPredicates> getNextCheckpointWithPredicates(Dataset<Row> df, QueryContext queryContext);

/**
* Retrieves the next checkpoint based on query information and a SourceProfileSupplier.
*
* @param df The dataset to process.
* @param queryInfo The query information object.
* @param sourceProfileSupplier An Option of a SourceProfileSupplier to use in load splitting implementation
* @return Updated query information with the next checkpoint, in case of empty checkpoint,
* returning endPoint same as queryInfo.getEndInstant().
*/
@Deprecated
public QueryInfo getNextCheckpoint(Dataset<Row> df, QueryInfo queryInfo, Option<SourceProfileSupplier> sourceProfileSupplier) {
// TODO(HUDI-8354): fix related usage in the event incremental source
throw new UnsupportedOperationException("getNextCheckpoint is no longer supported with instant time.");
}

public static Option<SnapshotLoadQuerySplitter> getInstance(TypedProperties props) {
return props.getNonEmptyStringOpt(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null)
.map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className,
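
For reference, a hedged sketch of a concrete splitter against the new QueryContext-based signature. It only uses calls that appear elsewhere in this diff (queryContext.getInstants(), HoodieInstant getCompletionTime/getTimestamp, the CheckpointWithPredicates constructor); the predicate shape, a Spark SQL filter on the commit-time metadata column, and the package placement are assumptions.

import java.util.Comparator;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Assumed to live in the same package as SnapshotLoadQuerySplitter.
public class LatestCompletionTimeSplitter extends SnapshotLoadQuerySplitter {

  public LatestCompletionTimeSplitter(TypedProperties properties) {
    super(properties);
  }

  @Override
  public Option<CheckpointWithPredicates> getNextCheckpointWithPredicates(Dataset<Row> df, QueryContext queryContext) {
    // Checkpoint at the latest completion time present in this query context.
    return Option.fromJavaOptional(
            queryContext.getInstants().stream()
                .max(Comparator.comparing(HoodieInstant::getCompletionTime)))
        .map(latest -> new CheckpointWithPredicates(
            latest.getCompletionTime(),
            // Assumed predicate shape: keep rows at or before the checkpointed commit.
            HoodieRecord.COMMIT_TIME_METADATA_FIELD + " <= '" + latest.getTimestamp() + "'"));
  }
}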
========== changed file ==========
@@ -43,6 +43,7 @@
import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
import static org.apache.hudi.utilities.config.CloudSourceConfig.SOURCE_MAX_BYTES_PER_PARTITION;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.CloudDataColumnInfo.getCloudDataColumnInfo;

/**
* Connects to S3/GCS from Spark and downloads data from a given list of files.
@@ -87,7 +88,7 @@ public Pair<Option<Dataset<Row>>, String> fetchPartitionedSource(
CloudObjectsSelectorCommon.Type cloudType,
CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint,
Option<SourceProfileSupplier> sourceProfileSupplier,
Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair,
Pair<String, Dataset<Row>> chkptDatasetPair,
Option<SchemaProvider> schemaProvider,
long sourceLimit) {
boolean isSourceProfileSupplierAvailable = sourceProfileSupplier.isPresent() && sourceProfileSupplier.get().getSourceProfile() != null;
@@ -96,15 +97,16 @@
sourceLimit = sourceProfileSupplier.get().getSourceProfile().getMaxSourceBytes();
}

QueryInfo queryInfo = queryInfoDatasetPair.getLeft();
String endCheckpoint = chkptDatasetPair.getLeft();
String filter = CloudObjectsSelectorCommon.generateFilter(cloudType, props);
LOG.info("Adding filter string to Dataset: " + filter);
Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(filter);
Dataset<Row> filteredSourceData = chkptDatasetPair.getRight().filter(filter);

LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
LOG.info("Adjusting end checkpoint:" + endCheckpoint + " based on sourceLimit :" + sourceLimit);
Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
filteredSourceData, sourceLimit, endCheckpoint,
cloudObjectIncrCheckpoint, getCloudDataColumnInfo(cloudType));
if (!checkPointAndDataset.getRight().isPresent()) {
LOG.info("Empty source, returning endpoint:" + checkPointAndDataset.getLeft());
return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString());
========== changed file ==========
@@ -21,6 +21,7 @@
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
@@ -551,4 +552,54 @@ public enum Type {
S3,
GCS
}

public enum CloudDataColumnInfo {
S3(
CloudObjectsSelectorCommon.S3_OBJECT_KEY,
CloudObjectsSelectorCommon.S3_OBJECT_SIZE
),
GCS(
CloudObjectsSelectorCommon.GCS_OBJECT_KEY,
CloudObjectsSelectorCommon.GCS_OBJECT_SIZE
);

// both S3 and GCS use COMMIT_TIME_METADATA_FIELD as order column
private final String orderColumn = HoodieRecord.COMMIT_TIME_METADATA_FIELD;
private final String keyColumn;
private final String limitColumn;
private final List<String> orderByColumns;

CloudDataColumnInfo(String keyColumn, String limitColumn) {
this.keyColumn = keyColumn;
this.limitColumn = limitColumn;
orderByColumns = Arrays.asList(orderColumn, keyColumn);
}

public String getOrderColumn() {
return orderColumn;
}

public String getKeyColumn() {
return keyColumn;
}

public String getLimitColumn() {
return limitColumn;
}

public List<String> getOrderByColumns() {
return orderByColumns;
}

public static CloudDataColumnInfo getCloudDataColumnInfo(CloudObjectsSelectorCommon.Type type) {
switch (type) {
case S3:
return S3;
case GCS:
return GCS;
default:
throw new IllegalArgumentException("Unsupported cloud data column type: " + type);
}
}
}
}
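
A hedged usage sketch of the new enum, not part of this commit; the concrete column names come from the CloudObjectsSelectorCommon constants referenced above.

import java.util.List;

import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.CloudDataColumnInfo;

public class CloudDataColumnInfoSketch {
  public static void main(String[] args) {
    // Resolve the per-cloud column metadata that CloudDataFetcher hands to IncrSourceHelper above.
    CloudDataColumnInfo s3 = CloudDataColumnInfo.getCloudDataColumnInfo(CloudObjectsSelectorCommon.Type.S3);
    String orderColumn = s3.getOrderColumn();      // commit-time metadata field, shared by S3 and GCS
    String keyColumn = s3.getKeyColumn();          // S3 object key column
    String limitColumn = s3.getLimitColumn();      // S3 object size column
    List<String> orderBy = s3.getOrderByColumns(); // [orderColumn, keyColumn]
    System.out.println(orderColumn + ", " + keyColumn + ", " + limitColumn + ", " + orderBy);
  }
}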