Skip to content

Commit

Permalink
Revert "use sequentialBatchedExecute"
Browse files (browse the repository at this point in the history)
This reverts commit 43350a6.
  • Loading branch information
bknbkn committed Oct 16, 2024
1 parent 43350a6 commit 0705afa
Showing 1 changed file with 26 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
import static org.apache.paimon.utils.ThreadPoolUtils.sequentialBatchedExecute;

/**
* Local {@link OrphanFilesClean} that uses a thread pool to execute deletion.
Expand All @@ -71,8 +70,6 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {

private Set<String> candidateDeletes;

private final int parallelism;

public LocalOrphanFilesClean(FileStoreTable table) {
this(table, System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1));
}
Expand All @@ -85,8 +82,9 @@ public LocalOrphanFilesClean(
FileStoreTable table, long olderThanMillis, SerializableConsumer<Path> fileCleaner) {
super(table, olderThanMillis, fileCleaner);
this.deleteFiles = new ArrayList<>();
this.parallelism = table.coreOptions().deleteFileThreadNum();
this.executor = createCachedThreadPool(parallelism, "ORPHAN_FILES_CLEAN");
this.executor =
createCachedThreadPool(
table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
}

public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
Expand Down Expand Up @@ -141,39 +139,29 @@ private Set<String> getUsedFiles(String branch) {
try {
Set<String> manifests = ConcurrentHashMap.newKeySet();
collectWithoutDataFile(branch, usedFiles::add, manifests::add);
Iterable<String> dataFiles =
sequentialBatchedExecute(
executor,
manifestName -> {
try {
List<String> dataFilesInBatch = new ArrayList<>();
retryReadingFiles(
() ->
manifestFile.readWithIOException(
manifestName),
Collections.<ManifestEntry>emptyList())
.stream()
.map(ManifestEntry::file)
.forEach(
f -> {
if (candidateDeletes.contains(
f.fileName())) {
dataFilesInBatch.add(f.fileName());
}
f.extraFiles().stream()
.filter(candidateDeletes::contains)
.forEach(dataFilesInBatch::add);
});
return dataFilesInBatch;
} catch (IOException e) {
throw new RuntimeException(e);
}
},
new ArrayList<>(manifests),
parallelism);
for (String fileName : dataFiles) {
usedFiles.add(fileName);
}
randomlyOnlyExecute(
executor,
manifestName -> {
try {
retryReadingFiles(
() -> manifestFile.readWithIOException(manifestName),
Collections.<ManifestEntry>emptyList())
.stream()
.map(ManifestEntry::file)
.forEach(
f -> {
if (candidateDeletes.contains(f.fileName())) {
usedFiles.add(f.fileName());
}
f.extraFiles().stream()
.filter(candidateDeletes::contains)
.forEach(usedFiles::add);
});
} catch (IOException e) {
throw new RuntimeException(e);
}
},
manifests);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit 0705afa

Please sign in to comment.