Skip to content

Commit

Permalink
Incremental API and disk format for task caching (#210)
Browse files Browse the repository at this point in the history
The Task.execute() call can now examine old results and changes to
inputs, so that it can decide how to build new outputs incrementally
and more efficiently.

Fixes #176
  • Loading branch information
niloc132 authored Aug 15, 2023
1 parent d90ded0 commit c18f3ea
Show file tree
Hide file tree
Showing 23 changed files with 886 additions and 101 deletions.
7 changes: 7 additions & 0 deletions build-caching/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
<version>0.15.0</version>
</dependency>

<!-- JSON serialization -->
<!-- NOTE(review): gson 2.8.6 appears to predate 2.8.9, the release that addressed CVE-2022-25647; confirm whether a newer version should be pinned here. -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>

<!-- Test -->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.vertispan.j2cl.build;

import com.vertispan.j2cl.build.impl.CollectedTaskInputs;
import com.vertispan.j2cl.build.task.BuildLog;
import com.vertispan.j2cl.build.task.OutputTypes;
import com.vertispan.j2cl.build.task.TaskFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.vertispan.j2cl.build;

import com.vertispan.j2cl.build.task.CachedPath;

import java.nio.file.Path;
import java.util.Optional;

/**
 * Simple immutable implementation of the task-facing ChangedCachedPath interface: it records
 * the kind of change, the source path the change applies to, and, when one was supplied, the
 * new cached path for that file.
 */
public class ChangedCachedPath implements com.vertispan.j2cl.build.task.ChangedCachedPath {
    private final ChangeType changeType;
    private final Path source;
    /** May be null; {@link #getNewAbsolutePath()} then reports an empty Optional. */
    private final CachedPath replacement;

    /**
     * @param type the kind of change being described
     * @param sourcePath the source path of the changed file
     * @param newPath the new cached entry for the file, or null if there is none
     */
    public ChangedCachedPath(ChangeType type, Path sourcePath, CachedPath newPath) {
        this.changeType = type;
        this.source = sourcePath;
        this.replacement = newPath;
    }

    @Override
    public ChangeType changeType() {
        return changeType;
    }

    @Override
    public Path getSourcePath() {
        return source;
    }

    /**
     * @return the absolute path of the new cached entry, or empty when no new entry was given
     *         (or when that entry reports a null absolute path)
     */
    @Override
    public Optional<Path> getNewAbsolutePath() {
        if (replacement == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(replacement.getAbsolutePath());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,8 @@ public DefaultDiskCache(File cacheDir, Executor executor) throws IOException {
}

@Override
protected Path taskDir(CollectedTaskInputs inputs) {
String projectName = inputs.getProject().getKey();
Murmur3F hash = new Murmur3F();

hash.update(inputs.getTaskFactory().getClass().toString().getBytes(StandardCharsets.UTF_8));
hash.update(inputs.getTaskFactory().getTaskName().getBytes(StandardCharsets.UTF_8));
hash.update(inputs.getTaskFactory().getVersion().getBytes(StandardCharsets.UTF_8));

for (Input input : inputs.getInputs()) {
input.updateHash(hash);
}

for (Map.Entry<String, String> entry : inputs.getUsedConfigs().entrySet()) {
hash.update(entry.getKey().getBytes(StandardCharsets.UTF_8));
if (entry.getValue() == null) {
hash.update(0);
} else {
hash.update(entry.getValue().getBytes(StandardCharsets.UTF_8));
}
}

return cacheDir.toPath().resolve(projectName.replaceAll("[^\\-_a-zA-Z0-9.]", "-")).resolve(hash.getValueHexString() + "-" + inputs.getTaskFactory().getOutputType());
protected Path taskDir(String projectName, String hashString, String outputType) {
    // Any character of the project name outside [-_a-zA-Z0-9.] is replaced with '-' before
    // the name is used as a directory name under the cache dir.
    final String projectDirName = projectName.replaceAll("[^\\-_a-zA-Z0-9.]", "-");
    final Path projectDir = cacheDir.toPath().resolve(projectDirName);

    // The task directory itself is named "<hash>-<outputType>".
    final String taskDirName = hashString + "-" + outputType;
    return projectDir.resolve(taskDirName);
}

@Override
Expand All @@ -72,4 +52,9 @@ protected Path logFile(Path taskDir) {
/**
 * @param taskDir the directory of a single task
 * @return the "results" child of the given task directory
 */
protected Path outputDir(Path taskDir) {
    final Path resultsDir = taskDir.resolve("results");
    return resultsDir;
}

@Override
/**
 * @param taskDir the directory of a single task
 * @return the "cacheSummary.json" child of the given task directory
 */
protected Path cacheSummary(Path taskDir) {
    final Path summaryFile = taskDir.resolve("cacheSummary.json");
    return summaryFile;
}
}
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
package com.vertispan.j2cl.build;

import com.google.gson.GsonBuilder;
import com.vertispan.j2cl.build.impl.CollectedTaskInputs;
import com.vertispan.j2cl.build.task.CachedPath;
import io.methvin.watcher.PathUtils;
import io.methvin.watcher.hashing.FileHash;
import io.methvin.watcher.hashing.FileHasher;
import io.methvin.watcher.hashing.Murmur3F;
import io.methvin.watchservice.MacOSXListeningWatchService;
import io.methvin.watchservice.WatchablePath;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.WatchService;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.time.temporal.TemporalUnit;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

/**
* Manages the cached task inputs and outputs.
* Manages the cached task inputs and outputs, without direct knowledge of the project or task apis.
*/
public abstract class DiskCache {
private static final boolean IS_MAC = System.getProperty("os.name").toLowerCase().contains("mac");
Expand All @@ -38,6 +40,10 @@ public CacheResult(Path taskDir) {
this.taskDir = taskDir;
}

/**
 * @return the task directory this result was created for
 */
public Path taskDir() {
    return this.taskDir;
}

public Path logFile() {
//TODO finish building a logger that will write to this
return DiskCache.this.logFile(taskDir);
Expand All @@ -54,6 +60,10 @@ public TaskOutput output() {
return taskOutput;
}

/**
 * @return the cache summary path for this result's task directory, as resolved by the
 *         enclosing DiskCache
 */
public Path cachedSummary() {
    final Path summaryPath = DiskCache.this.cacheSummary(taskDir);
    return summaryPath;
}

public void markSuccess() {
markFinished(this);
runningTasks.remove(taskDir);
Expand Down Expand Up @@ -237,7 +247,9 @@ public Path getAbsolutePath() {
return absoluteParent.resolve(sourcePath);
}

@Override
/**
 * Internal API: returns the hash held by this entry. It is considered internal because it
 * is not, at this time, used by any caller.
 */
public FileHash getHash() {
    return this.hash;
}
Expand Down Expand Up @@ -340,16 +352,57 @@ private void deleteRecursively(Path path) throws IOException {
}
}

protected abstract Path taskDir(CollectedTaskInputs inputs);
/**
 * Builds the pretty-printed JSON summary describing a task: its project key, output type,
 * task implementation class name and version, the file hashes of every input (merged per
 * project key and output type), and the configs it used.
 * <p>
 * The bytes of this string are hashed to name the task's cache directory, so the text must
 * be identical whenever the described task is identical. For that reason the inputs are
 * grouped into, and the file hashes collected into, sorted maps ({@link TreeMap}) rather than
 * maps whose iteration order is unspecified. Note that this changes the serialized order
 * compared to earlier output, so previously written cache entries may not be matched again.
 * <p>
 * NOTE(review): the configs map is serialized in whatever order {@code getUsedConfigs()}
 * provides; confirm that order is stable as well.
 *
 * @param inputs the collected details of the task to describe
 * @return the JSON text of the task summary
 * @throws IllegalStateException if the same file is listed with two different hashes within
 *         one project/output-type group
 */
private String taskSummaryContents(CollectedTaskInputs inputs) {
    TaskSummaryDiskFormat src = new TaskSummaryDiskFormat();
    src.setProjectKey(inputs.getProject().getKey());
    src.setOutputType(inputs.getTaskFactory().getOutputType());
    src.setTaskImpl(inputs.getTaskFactory().getClass().getName());
    src.setTaskImplVersion(inputs.getTaskFactory().getVersion());

    src.setInputs(inputs.getInputs().stream()
            .map(Input::makeDiskFormat)
            // Sorted by "projectKey-outputType" so the order of the serialized inputs is stable.
            .collect(Collectors.groupingBy(
                    i -> i.getProjectKey() + "-" + i.getOutputType(),
                    TreeMap::new,
                    Collectors.toList()))
            .values().stream()
            .map(list -> {
                // Every element of a group shares the same project key and output type.
                TaskSummaryDiskFormat.InputDiskFormat result = new TaskSummaryDiskFormat.InputDiskFormat();
                result.setProjectKey(list.get(0).getProjectKey());
                result.setOutputType(list.get(0).getOutputType());

                result.setFileHashes(
                        list.stream().flatMap(i -> i.getFileHashes().entrySet().stream())
                                .collect(Collectors.toMap(
                                        Map.Entry::getKey,
                                        Map.Entry::getValue,
                                        (left, right) -> {
                                            // The same file may be listed more than once, but only with the same hash.
                                            if (left.equals(right)) {
                                                return left;
                                            }
                                            throw new IllegalStateException("Two hashes for one file! " + left + " vs " + right);
                                        },
                                        // Sorted by file path so the serialized hashes are in a stable order.
                                        TreeMap::new))
                );

                return result;
            })
            .collect(Collectors.toUnmodifiableList()));

    src.setConfigs(inputs.getUsedConfigs());

    return new GsonBuilder().serializeNulls().setPrettyPrinting().create().toJson(src);
}

/**
 * Resolves the directory used for one task, given the owning project's name, the hex hash
 * string computed from the task summary, and the task's output type.
 */
protected abstract Path taskDir(String projectName, String hashString, String outputType);

/** Path of the marker for the given task directory whose existence is checked when looking up a cached result. */
protected abstract Path successMarker(Path taskDir);
/** Path of the failure marker for the given task directory (presumably the counterpart of the success marker; confirm against implementations). */
protected abstract Path failureMarker(Path taskDir);
/** Path of the log file for the given task directory; the file is created when a task is handed out to be run. */
protected abstract Path logFile(Path taskDir);
/** Path of the directory that holds the task's output files. */
protected abstract Path outputDir(Path taskDir);
/** Path of the file the JSON task summary is written to when a task is handed out to be run. */
protected abstract Path cacheSummary(Path taskDir);

/**
 * Callbacks supplied to waitForTask, reporting what happened to the requested task.
 */
interface Listener {
/** The work is ready to be done by the current listener. */
void onReady(CacheResult result);
/** Someone else did the work, but it failed for some reason; it is not re-runnable. */
void onFailure(CacheResult result);
/** Someone else tried to do the work but ran into an error; possibly recoverable if we try again. */
void onError(Throwable throwable);
/** Someone else finished the work successfully; listeners are notified. */
void onSuccess(CacheResult result);
}
public class PendingCacheResult implements Cancelable {
Expand Down Expand Up @@ -431,7 +484,14 @@ private synchronized void failure() {
*/
public void waitForTask(CollectedTaskInputs taskDetails, Listener listener) {
assert taskDetails.getInputs().stream().allMatch(Input::hasContents);
final Path taskDir = taskDir(taskDetails);

Murmur3F murmur3F = new Murmur3F();
byte[] taskSummaryContents = taskSummaryContents(taskDetails).getBytes(StandardCharsets.UTF_8);
murmur3F.update(taskSummaryContents);
String hashString = murmur3F.getValueHexString();

final Path taskDir = taskDir(taskDetails.getProject().getKey(), hashString, taskDetails.getTaskFactory().getOutputType());

PendingCacheResult cancelable = new PendingCacheResult(taskDir, listener);
taskFutures.computeIfAbsent(taskDir, ignore -> Collections.newSetFromMap(new ConcurrentHashMap<>())).add(cancelable);
try {
Expand All @@ -449,6 +509,7 @@ public void waitForTask(CollectedTaskInputs taskDetails, Listener listener) {
// caller can begin work right away
Files.createDirectory(outputDir);
Files.createFile(logFile(taskDir));
Files.write(cacheSummary(taskDir), taskSummaryContents);
cancelable.ready();
return;
}
Expand Down Expand Up @@ -546,4 +607,13 @@ public void markFailed(CacheResult failedResult) {
}
}

/**
 * Looks up an existing result for the given task directory.
 * <p>
 * NOTE(review): because the two checks are joined with {@code ||}, an existing task directory
 * is enough to produce a result even when its success marker does not exist. Confirm that
 * {@code &&} was not intended.
 *
 * @param taskDir the task directory to look for
 * @return a result for that directory if the directory or its success marker exists,
 *         otherwise empty
 */
public Optional<CacheResult> getCacheResult(Path taskDir) {
    if (!Files.exists(taskDir) && !Files.exists(successMarker(taskDir))) {
        return Optional.empty();
    }

    final CacheResult found = new CacheResult(taskDir);
    // Make sure an output is registered for this directory before handing the result out.
    knownOutputs.computeIfAbsent(taskDir, this::makeOutput);
    return Optional.of(found);
}

}
73 changes: 63 additions & 10 deletions build-caching/src/main/java/com/vertispan/j2cl/build/Input.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.vertispan.j2cl.build;

import io.methvin.watcher.hashing.Murmur3F;
import com.vertispan.j2cl.build.task.ChangedCachedPath;

import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.*;
Expand All @@ -18,10 +17,29 @@
* interested in, and take the hash of the hashes to represent
*/
public class Input implements com.vertispan.j2cl.build.task.Input {
/**
 * Computes the list of changes that apply to an input for a particular build.
 */
public interface BuildSpecificChanges {
    /**
     * @return the changed paths for this build
     */
    List<? extends ChangedCachedPath> compute();

    /**
     * Wraps the given instance so that its {@link #compute()} is invoked at most once (as long
     * as it returns a non-null list); the first result is copied into a new list and that copy
     * is returned from then on. No synchronization is performed here.
     *
     * @param changes the computation to wrap
     * @return a caching wrapper around {@code changes}
     */
    static BuildSpecificChanges memoize(BuildSpecificChanges changes) {
        return new BuildSpecificChanges() {
            private List<ChangedCachedPath> cached;

            @Override
            public List<ChangedCachedPath> compute() {
                if (cached != null) {
                    return cached;
                }

                List<? extends ChangedCachedPath> fresh = changes.compute();
                assert fresh != null;
                cached = new ArrayList<>(fresh);
                return cached;
            }
        };
    }
}
private final Project project;
private final String outputType;

private TaskOutput contents;
private BuildSpecificChanges buildSpecificChanges;

public Input(Project project, String outputType) {
this.project = project;
Expand Down Expand Up @@ -66,6 +84,13 @@ public Collection<DiskCache.CacheEntry> getFilesAndHashes() {
.collect(Collectors.toUnmodifiableList());
}

@Override
public Collection<? extends ChangedCachedPath> getChanges() {
    // Only changes whose source path is accepted by at least one of the filters are kept.
    final Collection<? extends ChangedCachedPath> unfiltered = wrapped.getChanges();
    return unfiltered.stream()
            .filter(change -> {
                final Path changedSource = change.getSourcePath();
                return Arrays.stream(filters).anyMatch(matcher -> matcher.matches(changedSource));
            })
            .collect(Collectors.toUnmodifiableList());
}

@Override
public com.vertispan.j2cl.build.task.Project getProject() {
return wrapped.getProject();
Expand Down Expand Up @@ -103,17 +128,37 @@ public void setCurrentContents(TaskOutput contents) {
/**
* Internal API.
*
* Updates the given hash object with the filtered file inputs - their paths and their
* hashes, so that if files are moved or changed we change the hash value, but we don't
* re-hash each file every time we ask.
* Once a task is finished, we can let it be used by other tasks as an input. In
* order for those tasks to execute incrementally, each particular Input must
* only have changed relative to the last time that its owner task ran successfully.
* Instead of being set all at once, this is re-assigned before each task actually
* executes.
*/
public void updateHash(Murmur3F hash) {
for (DiskCache.CacheEntry fileAndHash : getFilesAndHashes()) {
hash.update(fileAndHash.getSourcePath().toString().getBytes(StandardCharsets.UTF_8));
hash.update(fileAndHash.getHash().asBytes());
}
public void setBuildSpecificChanges(BuildSpecificChanges buildSpecificChanges) {
    // Wrap the supplied computation so its result is cached after the first compute() call.
    final BuildSpecificChanges memoized = BuildSpecificChanges.memoize(buildSpecificChanges);
    this.buildSpecificChanges = memoized;
}

/**
 * Internal API.
 *
 * Builds the plain-data description of this input that can be written to disk: the key of
 * its project, its output type, and a map from each file's source path (as a string) to that
 * file's hash (as a string).
 */
public TaskSummaryDiskFormat.InputDiskFormat makeDiskFormat() {
    final TaskSummaryDiskFormat.InputDiskFormat diskFormat = new TaskSummaryDiskFormat.InputDiskFormat();
    diskFormat.setProjectKey(getProject().getKey());
    diskFormat.setOutputType(getOutputType());

    final Map<String, String> hashesByPath = getFilesAndHashes().stream()
            .collect(Collectors.toMap(
                    entry -> entry.getSourcePath().toString(),
                    entry -> entry.getHash().asString()));
    diskFormat.setFileHashes(hashesByPath);

    return diskFormat;
}



@Override
public Project getProject() {
return project;
Expand All @@ -139,6 +184,14 @@ public Collection<DiskCache.CacheEntry> getFilesAndHashes() {
return contents.filesAndHashes();
}

@Override
public Collection<? extends ChangedCachedPath> getChanges() {
    // The changes must have been assigned via setBuildSpecificChanges before they are read.
    final BuildSpecificChanges assigned = Objects.requireNonNull(
            buildSpecificChanges,
            () -> "Changes not yet provided " + this);
    return assigned.compute();
}

@Override
public String toString() {
return "Input{" +
Expand Down
Loading

0 comments on commit c18f3ea

Please sign in to comment.