Add file extension filter for s3 incr source
rmahindra123 authored and rmahindra123 committed Feb 18, 2024
1 parent 3a97b01 commit 0dcca94
Showing 2 changed files with 32 additions and 12 deletions.
@@ -51,6 +51,7 @@
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
@@ -210,8 +211,13 @@ Dataset<Row> applyFilter(Dataset<Row> source, String fileFormat) {
if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING, true))) {
filter = filter + " and " + S3_OBJECT_KEY + " not like '%" + getStringWithAltKeys(props, S3_IGNORE_KEY_SUBSTRING) + "%'";
}
// add file format filtering by default
filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + "%'";
// Match files with the configured extension, or fall back to the fileFormat if the config is not set.
if (!StringUtils.isNullOrEmpty(getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION, true))) {
filter = filter + " and " + S3_OBJECT_KEY + " like '%" + getStringWithAltKeys(props, CLOUD_DATAFILE_EXTENSION) + "'";
} else {
filter = filter + " and " + S3_OBJECT_KEY + " like '%" + fileFormat + "%'";
}

return source.filter(filter);
}
}
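
For illustration only (not part of this commit), a minimal sketch of how the new config drives the filter built in applyFilter; the ".gz" value and the property setup below are hypothetical, while TypedProperties and CLOUD_DATAFILE_EXTENSION come from the code above:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.CloudSourceConfig;

// Hypothetical setup: select only objects whose key ends in ".gz".
TypedProperties props = new TypedProperties();
props.setProperty(CloudSourceConfig.CLOUD_DATAFILE_EXTENSION.key(), ".gz");
// applyFilter then appends a suffix match: " and " + S3_OBJECT_KEY + " like '%.gz'"
// With the config unset, the previous substring match on the fileFormat is kept:
// " and " + S3_OBJECT_KEY + " like '%json%'"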
@@ -36,6 +36,7 @@
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
@@ -59,6 +60,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -287,32 +289,44 @@ public void testTwoFilesAndContinueInSameCommit() throws IOException {

}

@Test
public void testTwoFilesAndContinueAcrossCommits() throws IOException {
@ParameterizedTest
@ValueSource(strings = {
".json",
".gz"
})
public void testTwoFilesAndContinueAcrossCommits(String extension) throws IOException {
String commitTimeForWrites = "2";
String commitTimeForReads = "1";

Pair<String, List<HoodieRecord>> inserts = writeS3MetadataRecords(commitTimeForReads);
inserts = writeS3MetadataRecords(commitTimeForWrites);

TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
// Set the extension config explicitly when it is something other than the file format.
if (!extension.endsWith("json")) {
typedProperties.setProperty(CloudSourceConfig.CLOUD_DATAFILE_EXTENSION.key(), extension);
}

List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
// Add file paths and sizes to the list
filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 100L, "1"));
filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 150L, "1"));
filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "2"));
filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 150L, "2"));
filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file1%s", extension), 100L, "1"));
filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file3%s", extension), 200L, "1"));
filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file2%s", extension), 150L, "1"));
filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file4%s", extension), 50L, "2"));
filePathSizeAndCommitTime.add(Triple.of(String.format("path/to/file5%s", extension), 150L, "2"));

Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);

setMockQueryRunner(inputDs);
when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
.thenReturn(Option.empty());

readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L, "1#path/to/file1.json");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 100L, "1#path/to/file2.json");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 1000L, "2#path/to/file5.json");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 100L,
"1#path/to/file1" + extension, typedProperties);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1" + extension), 100L,
"1#path/to/file2" + extension, typedProperties);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2" + extension), 1000L,
"2#path/to/file5" + extension, typedProperties);
}
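
For context (also not part of the diff), the checkpoint strings asserted above follow a "<commitTime>#<lastProcessedObjectKey>" shape; a hypothetical helper that would produce them:

// Hypothetical helper, not in the test: builds the checkpoints passed to readAndAssert,
// combining the source commit time with the last processed S3 object key.
static String checkpoint(String commitTime, String objectKey) {
  return commitTime + "#" + objectKey;
}
// e.g. checkpoint("1", "path/to/file1" + extension) yields "1#path/to/file1.json" for the ".json" case.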

@Test
