Addressing feedback from Danny
nsivabalan committed Oct 20, 2024 · 1 parent 6b75315 · commit ab5deba
Showing 3 changed files with 30 additions and 30 deletions.
HoodieBackedTableMetadataWriter.java
@@ -236,7 +236,7 @@ public List<MetadataPartitionType> getEnabledPartitionTypes() {
    * Initialize the metadata table if needed.
    *
    * @param dataMetaClient - meta client for the data table
-   * @param inflightInstantTimestamp - timestamp of an instant in progress on the dataset
+   * @param inflightInstantTimestamp - timestamp of an instant in progress on the dataset
    * @throws IOException on errors
    */
   protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
@@ -1053,7 +1053,7 @@ public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieD
     Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
         HoodieTableMetadataUtil.convertMetadataToRecords(
             engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient,
-            enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
+            initializedPartitionTypes, dataWriteConfig.getBloomFilterType(),
             dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
             dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getMetadataConfig());
 
@@ -1077,7 +1077,7 @@ public void update(HoodieCommitMetadata commitMetadata, HoodieData<HoodieRecord>
     Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
         HoodieTableMetadataUtil.convertMetadataToRecords(
             engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient,
-            enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
+            initializedPartitionTypes, dataWriteConfig.getBloomFilterType(),
             dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
             dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getMetadataConfig());
     HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(records, commitMetadata);
@@ -1230,7 +1230,7 @@ private static List<Pair<String, Pair<String, List<String>>>> getPartitionFilePa
   @Override
   public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
     processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
-        cleanMetadata, instantTime, dataMetaClient, enabledPartitionTypes,
+        cleanMetadata, instantTime, dataMetaClient, initializedPartitionTypes,
         dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
         dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex()));
     closeInternal();
HoodieMetadataPayload.java
@@ -230,7 +230,8 @@ protected HoodieMetadataPayload(String key, int type,
                                   HoodieMetadataBloomFilter metadataBloomFilter,
                                   HoodieMetadataColumnStats columnStats,
                                   HoodieRecordIndexInfo recordIndexMetadata,
-                                  HoodieSecondaryIndexInfo secondaryIndexMetadata, boolean isDeletedRecord) {
+                                  HoodieSecondaryIndexInfo secondaryIndexMetadata,
+                                  boolean isDeletedRecord) {
     this.key = key;
     this.type = type;
     this.filesystemMetadata = filesystemMetadata;
HoodieTableMetadataUtil.java
@@ -749,8 +749,8 @@ public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(Hoodi
     return engineContext.parallelize(deleteFileList, parallelism)
         .flatMap(deleteFileInfoPair -> {
           String partitionPath = deleteFileInfoPair.getLeft();
-          String filePath = partitionPath.equals(EMPTY_PARTITION_NAME) ? deleteFileInfoPair.getRight() : partitionPath + "/" + deleteFileInfoPair.getRight();
-          return getColumnStatsRecords(partitionPath, filePath, dataMetaClient, columnsToIndex, true).iterator();
+          String fileName = deleteFileInfoPair.getRight();
+          return getColumnStatsRecords(partitionPath, fileName, dataMetaClient, columnsToIndex, true).iterator();
         });
   }
 
@@ -969,11 +969,10 @@ public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEn
     Option<Schema> writerSchemaOpt = fetchStatsForLogFiles ? tryResolveSchemaForTable(dataMetaClient) : Option.empty();
     int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), columnStatsIndexParallelism), 1);
     return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> {
-      final String partitionName = partitionFileFlagTuple.f0;
+      final String partitionPath = partitionFileFlagTuple.f0;
       final String filename = partitionFileFlagTuple.f1;
       final boolean isDeleted = partitionFileFlagTuple.f2;
-      final String filePathWithPartition = partitionName.equals(NON_PARTITIONED_NAME) ? filename : partitionName + "/" + filename;
-      return getColumnStatsRecords(partitionName, filePathWithPartition, dataMetaClient, columnsToIndex, isDeleted,
+      return getColumnStatsRecords(partitionPath, filename, dataMetaClient, columnsToIndex, isDeleted,
           fetchStatsForLogFiles, writerSchemaOpt, maxReaderBufferSize).iterator();
     });
   }
@@ -1214,27 +1213,26 @@ private static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteS
       return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
     }
 
-    return getColumnStatsRecords(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
+    String filePath = writeStat.getPath();
+    return getColumnStatsRecords(writeStat.getPartitionPath(), filePath.substring(filePath.lastIndexOf("/") + 1), datasetMetaClient, columnsToIndex, false);
   }
 
   private static Stream<HoodieRecord> getColumnStatsRecords(String partitionPath,
-                                                            String filePathWithPartition,
+                                                            String fileName,
                                                             HoodieTableMetaClient datasetMetaClient,
                                                             List<String> columnsToIndex,
                                                             boolean isDeleted) {
-    return getColumnStatsRecords(partitionPath, filePathWithPartition, datasetMetaClient, columnsToIndex, isDeleted, false, Option.empty(), -1);
+    return getColumnStatsRecords(partitionPath, fileName, datasetMetaClient, columnsToIndex, isDeleted, false, Option.empty(), -1);
   }
 
   private static Stream<HoodieRecord> getColumnStatsRecords(String partitionPath,
-                                                            String filePathWithPartition,
+                                                            String fileName,
                                                             HoodieTableMetaClient datasetMetaClient,
                                                             List<String> columnsToIndex,
                                                             boolean isDeleted,
                                                             boolean fetchStatsForLogFiles,
                                                             Option<Schema> writerSchemaOpt,
                                                             int maxBufferSize) {
-    String filePartitionPath = filePathWithPartition.startsWith("/") ? filePathWithPartition.substring(1) : filePathWithPartition;
-    String fileName = filePartitionPath.substring(filePartitionPath.lastIndexOf("/") + 1);
 
     if (isDeleted) {
       // TODO we should delete records instead of stubbing them
@@ -1245,34 +1243,35 @@ private static Stream<HoodieRecord> getColumnStatsRecords(String partitionPath,
       return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, true);
     }
 
+    String partitionPathFileName = partitionPath.equals(EMPTY_PARTITION_NAME) ? fileName : partitionPath + "/" + fileName;
     List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
-        readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, columnsToIndex, fetchStatsForLogFiles, writerSchemaOpt, maxBufferSize);
+        readColumnRangeMetadataFrom(partitionPathFileName, datasetMetaClient, columnsToIndex, fetchStatsForLogFiles, writerSchemaOpt, maxBufferSize);
 
     return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadata, false);
   }
 
-  private static List<HoodieColumnRangeMetadata<Comparable>> readColumnRangeMetadataFrom(String filePathWithPartition,
+  private static List<HoodieColumnRangeMetadata<Comparable>> readColumnRangeMetadataFrom(String partitionPathFileName,
                                                                                          HoodieTableMetaClient datasetMetaClient,
                                                                                          List<String> columnsToIndex,
                                                                                          boolean fetchStatsForLogFiles,
                                                                                          Option<Schema> writerSchemaOpt,
                                                                                          int maxBufferSize) {
     try {
-      StoragePath fullFilePath = new StoragePath(datasetMetaClient.getBasePath(), filePathWithPartition);
-      if (filePathWithPartition.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+      StoragePath fullFilePath = new StoragePath(datasetMetaClient.getBasePath(), partitionPathFileName);
+      if (partitionPathFileName.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
         return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage())
             .getFileFormatUtils(HoodieFileFormat.PARQUET)
             .readColumnStatsFromMetadata(datasetMetaClient.getStorage(), fullFilePath, columnsToIndex);
-      } else if (fetchStatsForLogFiles && FSUtils.isLogFile(filePathWithPartition.substring(filePathWithPartition.lastIndexOf("/") + 1))) {
-        LOG.warn("Reading log file: {}, to build column range metadata.", filePathWithPartition);
+      } else if (fetchStatsForLogFiles && FSUtils.isLogFile(partitionPathFileName.substring(partitionPathFileName.lastIndexOf("/") + 1))) {
+        LOG.warn("Reading log file: {}, to build column range metadata.", partitionPathFileName);
         return getLogFileColumnRangeMetadata(fullFilePath.toString(), datasetMetaClient, columnsToIndex, writerSchemaOpt, maxBufferSize);
       }
-      LOG.warn("Column range index not supported for: {}", filePathWithPartition);
+      LOG.warn("Column range index not supported for: {}", partitionPathFileName);
       return Collections.emptyList();
     } catch (Exception e) {
       // NOTE: In case reading column range metadata from individual file failed,
       //       we simply fall back, in lieu of failing the whole task
-      LOG.error("Failed to fetch column range metadata for: {}", filePathWithPartition);
+      LOG.error("Failed to fetch column range metadata for: {}", partitionPathFileName);
       return Collections.emptyList();
     }
   }
@@ -2151,7 +2150,7 @@ public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(Hoodi
       final String partitionPath = partitionInfo.getRelativePath();
       // Step 1: Collect Column Metadata for Each File
       List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionInfo.getFileNameToSizeMap().keySet().stream()
-          .map(fileName -> getFileStatsRangeMetadata(partitionPath, partitionPath + "/" + fileName, dataTableMetaClient, columnsToIndex, false, true, writerSchemaOpt,
+          .map(fileName -> getFileStatsRangeMetadata(partitionPath, fileName, dataTableMetaClient, columnsToIndex, false, true, writerSchemaOpt,
               metadataConfig.getMaxReaderBufferSize()))
           .collect(Collectors.toList());
 
@@ -2160,21 +2159,20 @@ public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(Hoodi
   }
 
   private static List<HoodieColumnRangeMetadata<Comparable>> getFileStatsRangeMetadata(String partitionPath,
-                                                                                       String filePath,
+                                                                                       String fileName,
                                                                                        HoodieTableMetaClient datasetMetaClient,
                                                                                        List<String> columnsToIndex,
                                                                                        boolean isDeleted,
                                                                                        boolean shouldReadColumnMetadataForLogFiles,
                                                                                        Option<Schema> writerSchemaOpt,
                                                                                        int maxBufferSize) {
-    String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath;
-    String fileName = FSUtils.getFileName(filePath, partitionPath);
     if (isDeleted) {
       return columnsToIndex.stream()
           .map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
           .collect(Collectors.toList());
     }
-    return readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, columnsToIndex, shouldReadColumnMetadataForLogFiles, writerSchemaOpt, maxBufferSize);
+    String partitionPathFileName = partitionPath.equals(EMPTY_PARTITION_NAME) ? fileName : partitionPath + "/" + fileName;
+    return readColumnRangeMetadataFrom(partitionPathFileName, datasetMetaClient, columnsToIndex, shouldReadColumnMetadataForLogFiles, writerSchemaOpt, maxBufferSize);
   }
 
   public static HoodieData<HoodieRecord> convertMetadataToPartitionStatsRecords(HoodieCommitMetadata commitMetadata,
@@ -2281,7 +2279,8 @@ private static List<HoodieColumnRangeMetadata<Comparable>> translateWriteStatToF
       return columnRangeMap.values().stream().collect(Collectors.toList());
     }
 
-    return getFileStatsRangeMetadata(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false, false, writerSchemaOpt,
+    String filePath = writeStat.getPath();
+    return getFileStatsRangeMetadata(writeStat.getPartitionPath(), filePath.substring(filePath.lastIndexOf("/") + 1), datasetMetaClient, columnsToIndex, false, false, writerSchemaOpt,
         -1);
   }
 
