[HUDI-7104] Fixing cleaner savepoint interplay to fix edge case with incremental cleaning #10651

Merged 2 commits on Feb 15, 2024
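Context for the diff below: incremental cleaning only inspects partitions touched by commits newer than the last clean's earliestCommitToRetain, so when a savepoint that had been holding back older files is later deleted, the partitions of that savepointed commit were never revisited. The fix records the savepointed timestamps seen at planning time in the cleaner plan's extra metadata (key `savepointed_timestamps`), carries them into the completed clean metadata, and on the next planning round adds the partitions of any since-removed savepointed commits back into the candidate list. The hunks appear to touch the clean action executor, `CleanPlanActionExecutor`, and `CleanPlanner`. A minimal standalone sketch of the planning-side logic, using plain JDK collections (class and method names here are illustrative, not Hudi APIs):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DeletedSavepointPartitions {

  // Partitions to clean = partitions changed since the last clean, plus partitions of any
  // savepointed commit whose savepoint was removed since that clean.
  static List<String> partitionsOfInterest(List<String> incrementalPartitions,
                                           List<String> savepointsAtLastClean,
                                           List<String> savepointsNow,
                                           Map<String, List<String>> partitionsByCommit) {
    List<String> removedSavepoints = new ArrayList<>(savepointsAtLastClean);
    removedSavepoints.removeAll(savepointsNow);
    List<String> fromDeletedSavepoints = removedSavepoints.stream()
        .flatMap(commit -> partitionsByCommit.getOrDefault(commit, new ArrayList<>()).stream())
        .collect(Collectors.toList());
    return Stream.concat(incrementalPartitions.stream(), fromDeletedSavepoints.stream())
        .distinct()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, List<String>> partitionsByCommit = new HashMap<>();
    partitionsByCommit.put("c2", Arrays.asList("2023/12/01"));   // old commit, was savepointed
    partitionsByCommit.put("c5", Arrays.asList("2024/01/15"));   // recent commit

    // Savepoint on c2 existed at the last clean but has since been deleted.
    List<String> result = partitionsOfInterest(
        Arrays.asList("2024/01/15"),          // incremental partitions (commits since last clean)
        Arrays.asList("c2"),                  // savepoints recorded in the last clean's metadata
        Collections.emptyList(),              // savepoints now
        partitionsByCommit);

    System.out.println(result);               // [2024/01/15, 2023/12/01]
  }
}
```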
@@ -219,7 +219,8 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
inflightInstant.getTimestamp(),
Option.of(timer.endTimer()),
-       cleanStats
+       cleanStats,
+       cleanerPlan.getExtraMetadata()
);
if (!skipLocking) {
this.txnManager.beginTransaction(Option.of(inflightInstant), Option.empty());
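The hunk above now forwards the cleaner plan's extra metadata into the completed clean metadata via CleanerUtils.convertCleanMetadata. A short hedged sketch of reading that value back defensively, since clean commits written before this change may carry no such entry (the helper and its null guard are illustrative, not Hudi API):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class SavepointedTimestampsReader {
  // Returns the savepointed timestamps recorded at the last clean, or an empty list
  // when the metadata map is absent or has no "savepointed_timestamps" entry.
  static List<String> savepointsFromLastClean(Map<String, String> extraMetadata) {
    if (extraMetadata == null) {
      return Collections.emptyList();
    }
    String raw = extraMetadata.getOrDefault("savepointed_timestamps", "");
    return Arrays.stream(raw.split(","))
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toList());
  }
}
```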
@@ -49,11 +49,11 @@
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.MapUtils.nonEmpty;
+ import static org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;

public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {

private static final Logger LOG = LoggerFactory.getLogger(CleanPlanActionExecutor.class);

private final Option<Map<String, String>> extraMetadata;

public CleanPlanActionExecutor(HoodieEngineContext context,
@@ -142,12 +142,20 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
planner.getLastCompletedCommitTimestamp(),
config.getCleanerPolicy().name(), Collections.emptyMap(),
-           CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete);
+           CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete, prepareExtraMetadata(planner.getSavepointedTimestamps()));
} catch (IOException e) {
throw new HoodieIOException("Failed to schedule clean operation", e);
}
}

+ private Map<String, String> prepareExtraMetadata(List<String> savepointedTimestamps) {
+   if (savepointedTimestamps.isEmpty()) {
+     return Collections.emptyMap();
+   } else {
+     return Collections.singletonMap(SAVEPOINTED_TIMESTAMPS, savepointedTimestamps.stream().collect(Collectors.joining(",")));
+   }
+ }

/**
* Creates a Cleaner plan if there are files to be cleaned and stores them in instant file.
* Cleaner Plan contains absolute file paths.
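prepareExtraMetadata above encodes the planner's savepointed timestamps as a single comma-joined entry. A tiny standalone sketch of that encoding (plain JDK code mirroring the helper, not the Hudi class itself):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class SavepointedTimestampsEncoding {
  static final String SAVEPOINTED_TIMESTAMPS = "savepointed_timestamps";

  // Mirrors prepareExtraMetadata: empty list -> empty map, otherwise one comma-joined entry.
  static Map<String, String> encode(List<String> savepointedTimestamps) {
    return savepointedTimestamps.isEmpty()
        ? Collections.emptyMap()
        : Collections.singletonMap(SAVEPOINTED_TIMESTAMPS, String.join(",", savepointedTimestamps));
  }

  public static void main(String[] args) {
    System.out.println(encode(Collections.emptyList()));                           // {}
    System.out.println(encode(Arrays.asList("20240101000000", "20240105000000"))); // {savepointed_timestamps=20240101000000,20240105000000}
  }
}
```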
@@ -41,6 +41,7 @@
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
+ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
@@ -55,6 +56,7 @@
import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
+ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -78,6 +80,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
public static final Integer CLEAN_PLAN_VERSION_1 = CleanPlanV1MigrationHandler.VERSION;
public static final Integer CLEAN_PLAN_VERSION_2 = CleanPlanV2MigrationHandler.VERSION;
public static final Integer LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
+ public static final String SAVEPOINTED_TIMESTAMPS = "savepointed_timestamps";

private final SyncableFileSystemView fileSystemView;
private final HoodieTimeline commitTimeline;
@@ -86,6 +89,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
private final HoodieTable<T, I, K, O> hoodieTable;
private final HoodieWriteConfig config;
private transient HoodieEngineContext context;
+ private List<String> savepointedTimestamps;

public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config) {
this.context = context;
@@ -109,25 +113,43 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
LOG.info("Load all partitions and files into file system view in advance.");
fileSystemView.loadAllPartitions();
}
+   // collect savepointed timestamps to assist with incremental cleaning. For non-partitioned and metadata tables, we may not need this.
+   this.savepointedTimestamps = hoodieTable.isMetadataTable() ? Collections.EMPTY_LIST : (hoodieTable.isPartitioned() ? hoodieTable.getSavepointTimestamps().stream().collect(Collectors.toList())
+       : Collections.EMPTY_LIST);
}

+ /**
+  * @return list of savepointed timestamps in active timeline as of this clean planning.
+  */
+ List<String> getSavepointedTimestamps() {
+   return this.savepointedTimestamps;
+ }

/**
* Get the list of data file names savepointed.
*/
public Stream<String> getSavepointedDataFiles(String savepointTime) {
-   if (!hoodieTable.getSavepointTimestamps().contains(savepointTime)) {
+   HoodieSavepointMetadata metadata = getSavepointMetadata(savepointTime);
+   return metadata.getPartitionMetadata().values().stream().flatMap(s -> s.getSavepointDataFile().stream());
+ }
+
+ private Stream<String> getPartitionsFromSavepoint(String savepointTime) {
+   HoodieSavepointMetadata metadata = getSavepointMetadata(savepointTime);
+   return metadata.getPartitionMetadata().keySet().stream();
+ }
+
+ private HoodieSavepointMetadata getSavepointMetadata(String savepointTimestamp) {
+   if (!hoodieTable.getSavepointTimestamps().contains(savepointTimestamp)) {
throw new HoodieSavepointException(
"Could not get data files for savepoint " + savepointTime + ". No such savepoint.");
"Could not get data files for savepoint " + savepointTimestamp + ". No such savepoint.");
}
-   HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
-   HoodieSavepointMetadata metadata;
+   HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTimestamp);
try {
-     metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
+     return TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
hoodieTable.getActiveTimeline().getInstantDetails(instant).get());
} catch (IOException e) {
throw new HoodieSavepointException("Could not get savepointed data files for savepoint " + savepointTime, e);
throw new HoodieSavepointException("Could not get savepointed data files for savepoint " + savepointTimestamp, e);
}
-   return metadata.getPartitionMetadata().values().stream().flatMap(s -> s.getSavepointDataFile().stream());
}

/**
@@ -191,25 +213,71 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
-   return hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter(
+   List<String> incrementalPartitions = hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter(
instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
-           HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
-             try {
-               if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
-                 HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
-                     hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
-                 return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(), replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
-               } else {
-                 HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-                     .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
-                         HoodieCommitMetadata.class);
-                 return commitMetadata.getPartitionToWriteStats().keySet().stream();
-               }
-             } catch (IOException e) {
-               throw new HoodieIOException(e.getMessage(), e);
-             }
-           }).distinct().collect(Collectors.toList());
+           HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp()))
+       .flatMap(this::getPartitionsForInstants).distinct().collect(Collectors.toList());

+   // If any savepoint was removed between the previous clean and this clean planning, include the partitions of interest.
+   // For metadata tables and non-partitioned tables, we do not need this additional processing.
+   if (hoodieTable.isMetadataTable() || !hoodieTable.isPartitioned()) {
+     return incrementalPartitions;
+   }
+
+   List<String> partitionsFromDeletedSavepoints = getPartitionsFromDeletedSavepoint(cleanMetadata);
+   LOG.info("Including partitions from savepointed commits that were removed since the last known clean: " + partitionsFromDeletedSavepoints);
+   List<String> partitionsOfInterest = new ArrayList<>(incrementalPartitions);
+   partitionsOfInterest.addAll(partitionsFromDeletedSavepoints);
+   return partitionsOfInterest.stream().distinct().collect(Collectors.toList());
+ }

+ private List<String> getPartitionsFromDeletedSavepoint(HoodieCleanMetadata cleanMetadata) {
+   List<String> savepointedTimestampsFromLastClean = Arrays.stream(cleanMetadata.getExtraMetadata()
+       .getOrDefault(SAVEPOINTED_TIMESTAMPS, StringUtils.EMPTY_STRING).split(","))
+       .filter(timestamp -> !StringUtils.isNullOrEmpty(timestamp)).collect(Collectors.toList());
+   if (savepointedTimestampsFromLastClean.isEmpty()) {
+     return Collections.emptyList();
+   }
+   // check whether any savepoint was removed in the current list compared to the list saved at the last clean
+   List<String> removedSavepointedTimestamps = new ArrayList<>(savepointedTimestampsFromLastClean);
+   removedSavepointedTimestamps.removeAll(savepointedTimestamps);
+   if (removedSavepointedTimestamps.isEmpty()) {
+     return Collections.emptyList();
+   }
+
+   // fetch the partitions touched by the removed savepoints and add them to the returned list
+   return removedSavepointedTimestamps.stream().flatMap(savepointCommit -> {
+     Option<HoodieInstant> instantOption = hoodieTable.getCompletedCommitsTimeline().filter(instant -> instant.getTimestamp().equals(savepointCommit)).firstInstant();
+     if (!instantOption.isPresent()) {
+       LOG.warn("Skipping a commit whose savepoint was removed, since the instant has already moved to the archived timeline");
+       return Stream.empty();
+     }
+     return getPartitionsForInstants(instantOption.get());
+   }).collect(Collectors.toList());
+ }

+ /**
+  * Fetch partitions updated as part of a HoodieInstant.
+  * @param instant {@link HoodieInstant} of interest.
+  * @return partitions that were part of the given {@link HoodieInstant}.
+  */
+ private Stream<String> getPartitionsForInstants(HoodieInstant instant) {
+   try {
+     if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
+       HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
+           hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
+       return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(), replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
+     } else {
+       HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+           .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
+               HoodieCommitMetadata.class);
+       return commitMetadata.getPartitionToWriteStats().keySet().stream();
+     }
+   } catch (IOException e) {
+     throw new HoodieIOException(e.getMessage(), e);
+   }
+ }

/**
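The new getPartitionsForInstants helper gathers partitions differently for replace commits (union of replaced and newly written partitions) versus regular commits (written partitions only). A plain-JDK sketch of that dispatch, with maps standing in for the Hudi commit metadata objects (all names here are illustrative):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PartitionsForInstant {
  // Regular commit: partitions come from the write stats alone.
  static Stream<String> partitionsOfCommit(Map<String, List<String>> partitionToWriteStats) {
    return partitionToWriteStats.keySet().stream();
  }

  // Replace commit (e.g. clustering or insert_overwrite): include both the replaced
  // file-group partitions and the newly written ones.
  static Stream<String> partitionsOfReplaceCommit(Map<String, List<String>> partitionToReplaceFileIds,
                                                  Map<String, List<String>> partitionToWriteStats) {
    return Stream.concat(partitionToReplaceFileIds.keySet().stream(),
        partitionToWriteStats.keySet().stream());
  }

  public static void main(String[] args) {
    Map<String, List<String>> replaced = new HashMap<>();
    replaced.put("2023/12/01", Arrays.asList("fg-1"));
    Map<String, List<String>> written = new HashMap<>();
    written.put("2024/01/15", Arrays.asList("fg-2"));

    System.out.println(partitionsOfReplaceCommit(replaced, written).distinct().collect(Collectors.toList()));
    // [2023/12/01, 2024/01/15]
  }
}
```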