[HUDI-7416] Remove duplicate code for getFileFormat and Refactor filter methods for S3/GCS sources (#10701)
vinishjail97 authored Feb 20, 2024
1 parent ad8a15a commit 8a09c77
Showing 5 changed files with 68 additions and 82 deletions.
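
In short, file-format resolution is consolidated into a single CloudDataFetcher.getFileFormat(props) helper, and the per-source filter logic becomes a static generateFilter(props) method on each source, so the fetchers can be constructed from the properties alone. A rough before/after sketch of the constructor call sites, condensed from the diff below:

// Before: each call site resolved the file format itself and passed it in.
new GcsObjectMetadataFetcher(props, getSourceFileFormat(props));
new CloudDataFetcher(props, getStringWithAltKeys(props, DATAFILE_FORMAT, true));

// After: the fetchers take only the properties; the format is resolved
// internally through CloudDataFetcher.getFileFormat(props).
new GcsObjectMetadataFetcher(props);
new CloudDataFetcher(props);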
GcsEventsHoodieIncrSource.java
@@ -48,11 +48,9 @@
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy;
@@ -126,8 +124,8 @@ public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, Sp
SchemaProvider schemaProvider) {

this(props, jsc, spark, schemaProvider,
new GcsObjectMetadataFetcher(props, getSourceFileFormat(props)),
new CloudDataFetcher(props, getStringWithAltKeys(props, DATAFILE_FORMAT, true)),
new GcsObjectMetadataFetcher(props),
new CloudDataFetcher(props),
new QueryRunner(spark, props)
);
}
@@ -196,9 +194,4 @@ private Pair<Option<Dataset<Row>>, String> extractData(QueryInfo queryInfo, Data
Option<Dataset<Row>> fileDataRows = gcsObjectDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider);
return Pair.of(fileDataRows, queryInfo.getEndInstant());
}

private static String getSourceFileFormat(TypedProperties props) {
return getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true);
}

}
S3EventsHoodieIncrSource.java
@@ -25,7 +25,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
@@ -52,11 +51,9 @@
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_FS_PREFIX;
import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_PREFIX;
import static org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig.S3_IGNORE_KEY_SUBSTRING;
@@ -72,11 +69,9 @@
public class S3EventsHoodieIncrSource extends HoodieIncrSource {

private static final Logger LOG = LoggerFactory.getLogger(S3EventsHoodieIncrSource.class);
private static final String EMPTY_STRING = "";
private final String srcPath;
private final int numInstantsPerFetch;
private final boolean checkIfFileExists;
private final String fileFormat;
private final IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy;
private final QueryRunner queryRunner;
private final CloudDataFetcher cloudDataFetcher;
@@ -123,7 +118,7 @@ public S3EventsHoodieIncrSource(
SparkSession sparkSession,
SchemaProvider schemaProvider) {
this(props, sparkContext, sparkSession, schemaProvider, new QueryRunner(sparkSession, props),
new CloudDataFetcher(props, getStringWithAltKeys(props, CloudSourceConfig.DATAFILE_FORMAT, true)));
new CloudDataFetcher(props));
}

public S3EventsHoodieIncrSource(
@@ -138,20 +133,34 @@ public S3EventsHoodieIncrSource(
this.srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH);
this.numInstantsPerFetch = getIntWithAltKeys(props, NUM_INSTANTS_PER_FETCH);
this.checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK);

// This is to ensure backward compatibility where we were using the
// config SOURCE_FILE_FORMAT for file format in previous versions.
this.fileFormat = StringUtils.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
: getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);

this.missingCheckpointStrategy = getMissingCheckpointStrategy(props);
this.queryRunner = queryRunner;
this.cloudDataFetcher = cloudDataFetcher;
this.schemaProvider = Option.ofNullable(schemaProvider);
this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
}

public static String generateFilter(TypedProperties props) {
String fileFormat = CloudDataFetcher.getFileFormat(props);
String filter = S3_OBJECT_SIZE + " > 0";
if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_KEY_PREFIX, true))) {
filter = filter + " and " + S3_OBJECT_KEY + " like '" + getStringWithAltKeys(props, S3_KEY_PREFIX) + "%'";
}
if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX, true))) {
filter = filter + " and " + S3_OBJECT_KEY + " not like '" + getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX) + "%'";
}
if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING, true))) {
filter = filter + " and " + S3_OBJECT_KEY + " not like '%" + getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING) + "%'";
}
// Match files with a given extension, or use the fileFormat as the fallback in case the config is not set.
if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION, true))) {
filter = filter + " and " + S3_OBJECT_KEY + " like '%" + getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION) + "'";
} else {
filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + "%'";
}
return filter;
}

@Override
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCheckpoint, long sourceLimit) {
CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint = CloudObjectIncrCheckpoint.fromString(lastCheckpoint);
@@ -172,7 +181,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastChec
}
Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
queryInfo = queryInfoDatasetPair.getLeft();
Dataset<Row> filteredSourceData = applyFilter(queryInfoDatasetPair.getRight(), fileFormat);
Dataset<Row> filteredSourceData = queryInfoDatasetPair.getRight().filter(generateFilter(props));

LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
@@ -199,25 +208,4 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastChec
Option<Dataset<Row>> datasetOption = cloudDataFetcher.getCloudObjectDataDF(sparkSession, cloudObjectMetadata, props, schemaProvider);
return Pair.of(datasetOption, checkPointAndDataset.getLeft().toString());
}

Dataset<Row> applyFilter(Dataset<Row> source, String fileFormat) {
String filter = S3_OBJECT_SIZE + " > 0";
if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_KEY_PREFIX, true))) {
filter = filter + " and " + S3_OBJECT_KEY + " like '" + getStringWithAltKeys(props, S3_KEY_PREFIX) + "%'";
}
if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX, true))) {
filter = filter + " and " + S3_OBJECT_KEY + " not like '" + getStringWithAltKeys(props, S3_IGNORE_KEY_PREFIX) + "%'";
}
if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING, true))) {
filter = filter + " and " + S3_OBJECT_KEY + " not like '%" + getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING) + "%'";
}
// Match files with a given extension, or use the fileFormat as the fallback incase the config is not set.
if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION, true))) {
filter = filter + " and " + S3_OBJECT_KEY + " like '%" + getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION) + "'";
} else {
filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + "%'";
}

return source.filter(filter);
}
}
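
As a rough, untested sketch of the new static filter builder: the config keys are referenced through their ConfigProperty constants (the updated test further down does the same), S3_KEY_PREFIX is assumed to live in S3EventsHoodieIncrSourceConfig alongside the other S3 keys imported above, and the commented result only indicates the shape of the expression, since the literal column names come from the class's S3_OBJECT_SIZE and S3_OBJECT_KEY constants.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.S3EventsHoodieIncrSourceConfig;
import org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource;

class S3FilterSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    props.setProperty(S3EventsHoodieIncrSourceConfig.S3_KEY_PREFIX.key(), "data/");
    props.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "parquet");

    // With only these two keys set, the filter is roughly:
    //   <size column> > 0 and <key column> like 'data/%' and <key column> like '%parquet%'
    System.out.println(S3EventsHoodieIncrSource.generateFilter(props));
  }
}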
CloudDataFetcher.java
@@ -20,17 +20,21 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.schema.SchemaProvider;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.List;

import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.SOURCE_FILE_FORMAT;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.loadAsDataset;

/**
@@ -39,21 +43,28 @@
*/
public class CloudDataFetcher implements Serializable {

private final String fileFormat;
private TypedProperties props;
private static final String EMPTY_STRING = "";

private final TypedProperties props;

private static final Logger LOG = LoggerFactory.getLogger(CloudDataFetcher.class);

private static final long serialVersionUID = 1L;

public CloudDataFetcher(TypedProperties props, String fileFormat) {
this.fileFormat = fileFormat;
public CloudDataFetcher(TypedProperties props) {
this.props = props;
}

public static String getFileFormat(TypedProperties props) {
// This is to ensure backward compatibility where we were using the
// config SOURCE_FILE_FORMAT for file format in previous versions.
return StringUtils.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
: getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
}

public Option<Dataset<Row>> getCloudObjectDataDF(SparkSession spark, List<CloudObjectMetadata> cloudObjectMetadata,
TypedProperties props, Option<SchemaProvider> schemaProviderOption) {
return loadAsDataset(spark, cloudObjectMetadata, props, fileFormat, schemaProviderOption);
return loadAsDataset(spark, cloudObjectMetadata, props, getFileFormat(props), schemaProviderOption);
}

}
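
The shared helper keeps the older SOURCE_FILE_FORMAT behaviour intact. A rough, untested sketch of the resolution order (the commented return values assume the keys are set exactly as shown):

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;

class FileFormatSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();

    // Only the legacy key is set: getFileFormat falls back to SOURCE_FILE_FORMAT.
    props.setProperty(HoodieIncrSourceConfig.SOURCE_FILE_FORMAT.key(), "json");
    System.out.println(CloudDataFetcher.getFileFormat(props)); // "json"

    // Once the newer key is non-empty, it takes precedence.
    props.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "parquet");
    System.out.println(CloudDataFetcher.getFileFormat(props)); // "parquet"
  }
}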
GcsObjectMetadataFetcher.java
@@ -22,6 +22,7 @@
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.CloudObjectMetadata;

import org.apache.spark.api.java.JavaSparkContext;
@@ -51,24 +52,15 @@
*/
public class GcsObjectMetadataFetcher implements Serializable {

/**
* The default file format to assume if {@link GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION} is not given.
*/
private final String fileFormat;
private final TypedProperties props;

private static final String GCS_PREFIX = "gs://";
private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(GcsObjectMetadataFetcher.class);

/**
* @param fileFormat The default file format to assume if {@link GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION}
* is not given.
*/
public GcsObjectMetadataFetcher(TypedProperties props, String fileFormat) {
public GcsObjectMetadataFetcher(TypedProperties props) {
this.props = props;
this.fileFormat = fileFormat;
}

/**
@@ -86,41 +78,42 @@ public List<CloudObjectMetadata> getGcsObjectMetadata(JavaSparkContext jsc, Data
.collectAsList();
}

/**
* @param cloudObjectMetadataDF a Dataset that contains metadata of GCS objects. Assumed to be a persisted form
* of a Cloud Storage Pubsub Notification event.
* @return Dataset<Row> after apply the filtering.
*/
public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) {
String filter = createFilter();
LOG.info("Adding filter string to Dataset: " + filter);

return cloudObjectMetadataDF.filter(filter);
}

/**
* Add optional filters that narrow down the list of GCS objects to fetch.
*/
private String createFilter() {
public static String generateFilter(TypedProperties props) {
StringBuilder filter = new StringBuilder("size > 0");

getPropVal(SELECT_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name like '" + val + "%'"));
getPropVal(IGNORE_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name not like '" + val + "%'"));
getPropVal(IGNORE_RELATIVE_PATH_SUBSTR).ifPresent(val -> filter.append(" and name not like '%" + val + "%'"));
getPropVal(props, SELECT_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name like '" + val + "%'"));
getPropVal(props, IGNORE_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name not like '" + val + "%'"));
getPropVal(props, IGNORE_RELATIVE_PATH_SUBSTR).ifPresent(val -> filter.append(" and name not like '%" + val + "%'"));

// Match files with a given extension, or use the fileFormat as the default.
getPropVal(CLOUD_DATAFILE_EXTENSION).or(() -> Option.of(fileFormat))
String fileFormat = CloudDataFetcher.getFileFormat(props);
getPropVal(props, CLOUD_DATAFILE_EXTENSION).or(() -> Option.of(fileFormat))
.map(val -> filter.append(" and name like '%" + val + "'"));

return filter.toString();
}

private Option<String> getPropVal(ConfigProperty<String> configProperty) {
private static Option<String> getPropVal(TypedProperties props, ConfigProperty<String> configProperty) {
String value = getStringWithAltKeys(props, configProperty, true);
if (!isNullOrEmpty(value)) {
return Option.of(value);
}

return Option.empty();
}

/**
* @param cloudObjectMetadataDF a Dataset that contains metadata of GCS objects. Assumed to be a persisted form
* of a Cloud Storage Pubsub Notification event.
* @return Dataset<Row> after applying the filtering.
*/
public Dataset<Row> applyFilter(Dataset<Row> cloudObjectMetadataDF) {
String filter = generateFilter(props);
LOG.info("Adding filter string to Dataset: " + filter);

return cloudObjectMetadataDF.filter(filter);
}
}
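
The GCS filter builder follows the same pattern; note from the code above that when CLOUD_DATAFILE_EXTENSION is unset the fallback is a suffix match on the file format ('%<format>'), whereas the S3 variant uses a contains match. A rough, untested sketch (the package of GcsObjectMetadataFetcher is assumed to be the gcs helpers package):

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;

class GcsFilterSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    props.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "json");

    // With no prefix/ignore keys and no CLOUD_DATAFILE_EXTENSION set,
    // this reduces to roughly: size > 0 and name like '%json'
    System.out.println(GcsObjectMetadataFetcher.generateFilter(props));
  }
}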
TestGcsEventsHoodieIncrSource.java
@@ -36,6 +36,7 @@
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
@@ -283,7 +284,7 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe
TypedProperties typedProperties) {

GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(),
spark(), schemaProvider.orElse(null), new GcsObjectMetadataFetcher(typedProperties, "json"), gcsObjectDataFetcher, queryRunner);
spark(), schemaProvider.orElse(null), new GcsObjectMetadataFetcher(typedProperties), gcsObjectDataFetcher, queryRunner);

Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, sourceLimit);

@@ -374,7 +375,7 @@ private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy miss
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath());
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
missingCheckpointStrategy.name());
properties.setProperty("hoodie.deltastreamer.source.gcsincr.datafile.format", "json");
properties.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "json");
return new TypedProperties(properties);
}
