
Commit

Fix col stats bootstrapping for an existing table to include log file stats
nsivabalan committed Oct 19, 2024
1 parent 01ad77a commit 0567058
Showing 27 changed files with 884 additions and 136 deletions.
@@ -121,6 +121,7 @@
import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
import static org.apache.hudi.metadata.MetadataPartitionType.fromPartitionPath;
import static org.apache.hudi.metadata.MetadataPartitionType.getEnabledPartitions;
import static org.apache.hudi.metadata.MetadataPartitionType.getEnabledAndInitializedPartitions;

/**
* Writer implementation backed by an internal hudi table. Partition and file listing are saved within an internal MOR table
@@ -153,6 +154,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
protected StorageConfiguration<?> storageConf;
protected final transient HoodieEngineContext engineContext;
protected final List<MetadataPartitionType> enabledPartitionTypes;
protected List<MetadataPartitionType> initializedPartitionTypes;
// Is the MDT bootstrapped and ready to be read from
private boolean initialized = false;
private HoodieMetadataFileSystemView metadataView;
@@ -179,6 +181,7 @@ protected HoodieBackedTableMetadataWriter(StorageConfiguration<?> storageConf,
.setBasePath(dataWriteConfig.getBasePath())
.setTimeGeneratorConfig(dataWriteConfig.getTimeGeneratorConfig()).build();
this.enabledPartitionTypes = getEnabledPartitions(dataWriteConfig.getProps(), dataMetaClient);
this.initializedPartitionTypes = new ArrayList<>(enabledPartitionTypes.size());
if (writeConfig.isMetadataTableEnabled()) {
this.metadataWriteConfig = createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy);
try {
@@ -233,7 +236,7 @@ public List<MetadataPartitionType> getEnabledPartitionTypes() {
* Initialize the metadata table if needed.
*
* @param dataMetaClient - meta client for the data table
* @param inflightInstantTimestamp - timestamp of an instant in progress on the dataset
* @throws IOException on errors
*/
protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
@@ -262,19 +265,26 @@ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
// No partitions left to initialize, since all the metadata-enabled partitions are either already initialized
// or currently in the process of initialization.
initMetadataReader();
this.initializedPartitionTypes = getEnabledAndInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
return true;
}

// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime = dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);

// Initialize partitions for the first time using data from the files on the file system
if (!initializeFromFilesystem(initializationTime, partitionsToInit, inflightInstantTimestamp)) {
boolean initializedAllPendingPartitions = initializeFromFilesystem(initializationTime, partitionsToInit, inflightInstantTimestamp);
if (!this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES)) {
LOG.error("Failed to initialize MDT from filesystem");
return false;
}

// If the FILES partition is available, we should proceed regardless of whether any new partitions were successfully initialized. For example,
// if the data table has any pending instants, we may not be able to initialize a new partition, but we should still proceed with other partitions that are
// ready to take in writes. So, let's initialize the metadata reader.
if (!initializedAllPendingPartitions) {
initMetadataReader();
}
this.initializedPartitionTypes = getEnabledAndInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
return true;
} catch (IOException e) {
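For context on the enabled-versus-initialized split used above: a partition type can be enabled in the write config yet not bootstrapped in the metadata table, for example while the data table has pending instants. A minimal sketch of the resulting contract, using only the two helpers imported at the top of this file (the guard itself is illustrative and not part of this commit):

// Illustrative only: "enabled" is config-driven; "initialized" additionally
// requires the partition to be bootstrapped in the metadata table.
List<MetadataPartitionType> enabled = getEnabledPartitions(dataWriteConfig.getProps(), dataMetaClient);
List<MetadataPartitionType> initialized = getEnabledAndInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
for (MetadataPartitionType type : enabled) {
  if (!initialized.contains(type)) {
    // enabled in config but not yet bootstrapped (e.g. blocked by a pending
    // data table instant); writers should not route records to it yet
  }
}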
@@ -372,7 +382,7 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
// Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT
List<DirectoryInfo> partitionInfoList;
if (filesPartitionAvailable) {
partitionInfoList = listAllPartitionsFromMDT(initializationTime, pendingDataInstants);
partitionInfoList = listAllPartitionsFromMDT(dataMetaClient.getBasePath(), initializationTime, pendingDataInstants);
} else {
// if auto initialization is enabled, then we need to list all partitions from the file system
if (dataWriteConfig.getMetadataConfig().shouldAutoInitialize()) {
@@ -461,8 +471,7 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
}

if (LOG.isInfoEnabled()) {
LOG.info("Initializing {} index with {} mappings and {} file groups.", partitionTypeName, fileGroupCountAndRecordsPair.getKey(),
fileGroupCountAndRecordsPair.getValue().count());
LOG.info("Initializing {} index with {} mappings", partitionTypeName, fileGroupCountAndRecordsPair.getKey());
}
HoodieTimer partitionInitTimer = HoodieTimer.start();

@@ -520,9 +529,11 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(Li
}

private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
// During initialization, we need stats for both base and log files.
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap, dataMetaClient, dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex());
dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), true,
dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize());

final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
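Since the two new trailing arguments are passed positionally, their roles are easy to miss. Below is an annotated restatement of the call above; the argument roles are inferred from the call site and the commit message, so check HoodieTableMetadataUtil for the authoritative signature:

HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
    engineContext,
    Collections.emptyMap(),              // likely the deleted-files map; empty at bootstrap
    partitionToFilesMap,
    dataMetaClient,
    dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
    dataWriteConfig.getColumnStatsIndexParallelism(),
    dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
    true,                                // include log file stats, the point of this fix
    dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize()); // log reader buffer size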
@@ -845,13 +856,14 @@ private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializatio
* @param pendingDataInstants Files coming from pending instants are neglected
* @return List consisting of {@code DirectoryInfo} for each partition found.
*/
private List<DirectoryInfo> listAllPartitionsFromMDT(String initializationTime, Set<String> pendingDataInstants) throws IOException {
private List<DirectoryInfo> listAllPartitionsFromMDT(StoragePath basePath, String initializationTime, Set<String> pendingDataInstants) throws IOException {
List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream()
.map(partitionPath -> dataWriteConfig.getBasePath() + StoragePath.SEPARATOR_CHAR + partitionPath).collect(Collectors.toList());
Map<String, List<StoragePathInfo>> partitionFileMap = metadata.getAllFilesInPartitions(allPartitionPaths);
List<DirectoryInfo> dirinfoList = new ArrayList<>(partitionFileMap.size());
for (Map.Entry<String, List<StoragePathInfo>> entry : partitionFileMap.entrySet()) {
dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(), initializationTime, pendingDataInstants));
String relativeDirPath = FSUtils.getRelativePartitionPath(basePath, new StoragePath(entry.getKey()));
dirinfoList.add(new DirectoryInfo(relativeDirPath, entry.getValue(), initializationTime, pendingDataInstants, false));
}
return dirinfoList;
}
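One easy-to-miss change in listAllPartitionsFromMDT above: DirectoryInfo is now keyed by the relative partition path rather than the absolute one, presumably to match how the filesystem-based listing keys its entries. A small worked example of the conversion, with a hypothetical base path:

// Hypothetical paths; FSUtils.getRelativePartitionPath strips the table base path.
StoragePath basePath = new StoragePath("/tmp/hudi_trips");
StoragePath partitionPath = new StoragePath("/tmp/hudi_trips/2024/10/19");
String relative = FSUtils.getRelativePartitionPath(basePath, partitionPath);
// relative == "2024/10/19"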
@@ -83,7 +83,7 @@ protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws Except

@Override
protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
throw new IllegalStateException("Not expected to see delete records in this log-scan mode. Check Job Config");
// No-op: delete records can legitimately appear in this log-scan mode; callers account for deletions separately.
}

/**
@@ -840,12 +845,17 @@ private Map<String, String> reverseLookupSecondaryKeys(String partitionName, Lis
}

Set<String> keySet = new TreeSet<>(recordKeys);
Set<String> deletedRecordsFromLogs = new HashSet<>();
Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new HashMap<>();
logRecordScanner.getRecords().forEach(record -> {
HoodieMetadataPayload payload = record.getData();
String recordKey = payload.getRecordKeyFromSecondaryIndex();
if (keySet.contains(recordKey)) {
logRecordsMap.put(recordKey, record);
if (!payload.isDeleted()) { // process only valid records.
String recordKey = payload.getRecordKeyFromSecondaryIndex();
if (keySet.contains(recordKey)) {
logRecordsMap.put(recordKey, record);
}
} else {
deletedRecordsFromLogs.add(record.getRecordKey());
}
});

@@ -856,7 +861,11 @@ private Map<String, String> reverseLookupSecondaryKeys(String partitionName, Lis
Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord = HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
return mergedRecord.orElse(null);
}));
baseFileRecords.forEach((key, value) -> recordKeyMap.put(key, value.getRecordKey()));
baseFileRecords.forEach((key, value) -> {
if (!deletedRecordsFromLogs.contains(key)) {
recordKeyMap.put(key, value.getRecordKey());
}
});
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table for " + recordKeys.size() + " keys: ", ioe);
} finally {
@@ -931,17 +940,22 @@ private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> lookupSecondaryKe
List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
Collections.sort(sortedSecondaryKeys);
secondaryKeySet.addAll(sortedSecondaryKeys);
Set<String> deletedRecordKeysFromLogs = new HashSet<>();

logRecordScanner.getRecords().forEach(record -> {
HoodieMetadataPayload payload = record.getData();
String recordKey = payload.getRecordKeyFromSecondaryIndex();
if (secondaryKeySet.contains(recordKey)) {
String secondaryKey = payload.getRecordKeyFromSecondaryIndex();
logRecordsMap.computeIfAbsent(secondaryKey, k -> new HashMap<>()).put(recordKey, record);
if (!payload.isDeleted()) {
String recordKey = payload.getRecordKeyFromSecondaryIndex();
if (secondaryKeySet.contains(recordKey)) {
String secondaryKey = payload.getRecordKeyFromSecondaryIndex();
logRecordsMap.computeIfAbsent(secondaryKey, k -> new HashMap<>()).put(recordKey, record);
}
} else {
deletedRecordKeysFromLogs.add(record.getRecordKey());
}
});

return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader, sortedSecondaryKeys, logRecordsMap, timings, partitionName);
return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader, sortedSecondaryKeys, logRecordsMap, timings, partitionName, deletedRecordKeysFromLogs);
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table for " + secondaryKeys.size() + " keys: ", ioe);
} finally {
@@ -955,7 +969,8 @@ private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> readNonUniqueReco
List<String> sortedKeys,
Map<String, HashMap<String, HoodieRecord>> logRecordsMap,
List<Long> timings,
String partitionName) throws IOException {
String partitionName,
Set<String> deleteRecordKeysFromLogs) throws IOException {
HoodieTimer timer = HoodieTimer.start();

Map<String, List<HoodieRecord<HoodieMetadataPayload>>> resultMap = new HashMap<>();
@@ -978,9 +993,13 @@ if (logRecordsMap.isEmpty() && !baseFileRecordsMap.isEmpty()) {
if (logRecordsMap.isEmpty() && !baseFileRecordsMap.isEmpty()) {
// file slice has only base file
timings.add(timer.endTimer());
if (!deleteRecordKeysFromLogs.isEmpty()) { // remove records deleted in the logs from the base-file record list
deleteRecordKeysFromLogs.forEach(key -> baseFileRecordsMap.remove(key));
}
return baseFileRecordsMap;
}

// TODO: check why we only consider deletions against the base file records, and not against the log records.
logRecordsMap.forEach((secondaryKey, logRecords) -> {
if (!baseFileRecordsMap.containsKey(secondaryKey)) {
List<HoodieRecord<HoodieMetadataPayload>> recordList = logRecords
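The secondary-index lookups above share one delete-masking rule: a key that appears as deleted in the log files must not surface from the base file. A self-contained sketch of just that rule (hypothetical helper with simplified types, not the Hudi API):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified illustration: live log entries override base-file entries, and
// keys deleted in the logs mask their base-file entries entirely.
static Map<String, String> mergeWithDeletes(Map<String, String> baseFileEntries,
                                            Map<String, String> liveLogEntries,
                                            Set<String> deletedKeysFromLogs) {
  Map<String, String> merged = new HashMap<>(baseFileEntries);
  deletedKeysFromLogs.forEach(merged::remove); // a delete in the logs wins over the base file
  merged.putAll(liveLogEntries);               // a live log entry overrides the base file
  return merged;
}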
@@ -206,38 +206,39 @@ public HoodieMetadataPayload(Option<GenericRecord> recordOpt) {
}

protected HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFileInfo> filesystemMetadata) {
this(key, type, filesystemMetadata, null, null, null, null);
this(key, type, filesystemMetadata, null, null, null, null, false);
}

protected HoodieMetadataPayload(String key, HoodieMetadataBloomFilter metadataBloomFilter) {
this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null, metadataBloomFilter, null, null, null);
this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null, metadataBloomFilter, null, null, null, metadataBloomFilter.getIsDeleted());
}

protected HoodieMetadataPayload(String key, HoodieMetadataColumnStats columnStats, int recordType) {
this(key, recordType, null, null, columnStats, null, null);
this(key, recordType, null, null, columnStats, null, null, columnStats.getIsDeleted());
}

private HoodieMetadataPayload(String key, HoodieRecordIndexInfo recordIndexMetadata) {
this(key, MetadataPartitionType.RECORD_INDEX.getRecordType(), null, null, null, recordIndexMetadata, null);
this(key, MetadataPartitionType.RECORD_INDEX.getRecordType(), null, null, null, recordIndexMetadata, null, false);
}

private HoodieMetadataPayload(String key, HoodieSecondaryIndexInfo secondaryIndexMetadata) {
this(key, MetadataPartitionType.SECONDARY_INDEX.getRecordType(), null, null, null, null, secondaryIndexMetadata);
this(key, MetadataPartitionType.SECONDARY_INDEX.getRecordType(), null, null, null, null, secondaryIndexMetadata, secondaryIndexMetadata.getIsDeleted());
}

protected HoodieMetadataPayload(String key, int type,
Map<String, HoodieMetadataFileInfo> filesystemMetadata,
HoodieMetadataBloomFilter metadataBloomFilter,
HoodieMetadataColumnStats columnStats,
HoodieRecordIndexInfo recordIndexMetadata,
HoodieSecondaryIndexInfo secondaryIndexMetadata) {
HoodieSecondaryIndexInfo secondaryIndexMetadata, boolean isDeletedRecord) {
this.key = key;
this.type = type;
this.filesystemMetadata = filesystemMetadata;
this.bloomFilterMetadata = metadataBloomFilter;
this.columnStatMetadata = columnStats;
this.recordIndexMetadata = recordIndexMetadata;
this.secondaryIndexMetadata = secondaryIndexMetadata;
this.isDeletedRecord = isDeletedRecord;
}

/**
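Tying these constructor changes back to the reader side earlier in the diff: the isDeletedRecord flag captured here, sourced from the Avro models' getIsDeleted(), is presumably what payload.isDeleted() reports during the secondary-index lookups. A minimal sketch of that round trip (only the names visible in this commit are real; the surrounding flow is illustrative):

// Reader side, as in the lookup code above: a payload whose metadata carried
// getIsDeleted() == true is treated as a mask, not as a live record.
HoodieMetadataPayload payload = record.getData();
if (payload.isDeleted()) {
  deletedRecordKeysFromLogs.add(record.getRecordKey()); // later removed from base-file results
} else {
  // process as a live record
}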