Skip to content

Commit

Permalink
Fixing getLatestFileSlice calls to getLatestMergedFileSlice in MDT wr…
Browse files Browse the repository at this point in the history
…iter
  • Loading branch information
nsivabalan committed Oct 18, 2024
1 parent c4680e4 commit 4f07da9
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,9 @@ private List<Pair<String, FileSlice>> getPartitionFileSlicePairs() throws IOExce
List<String> partitions = metadata.getAllPartitionPaths();
fsView.loadAllPartitions();
List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
partitions.forEach(partition -> fsView.getLatestFileSlices(partition).forEach(fs -> partitionFileSlicePairs.add(Pair.of(partition, fs))));
partitions.forEach(partition -> fsView.getLatestMergedFileSlicesBeforeOrOn(partition,
dataMetaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().get().getTimestamp())
.forEach(fs -> partitionFileSlicePairs.add(Pair.of(partition, fs))));
return partitionFileSlicePairs;
}
}
Expand Down Expand Up @@ -645,8 +647,9 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition()
this.getClass().getSimpleName());
} else {
final List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
String latestCommit = dataMetaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();
for (String partition : partitions) {
fsView.getLatestFileSlices(partition).forEach(fs -> partitionFileSlicePairs.add(Pair.of(partition, fs)));
fsView.getLatestMergedFileSlicesBeforeOrOn(partition, latestCommit).forEach(fs -> partitionFileSlicePairs.add(Pair.of(partition, fs)));
}

LOG.info("Initializing record index from " + partitionFileSlicePairs.size() + " file slices in "
Expand Down

0 comments on commit 4f07da9

Please sign in to comment.