Skip to content

Commit

Permalink
remote: make the dynamic spawn scheduler work. Fixes #8646
Browse files Browse the repository at this point in the history
This change fixes the correctness issue of the dynamic spawn scheduler when it is used with remote execution. See #8646 for more details.

There's a performance issue remaining: #8647

Closes #8648.

PiperOrigin-RevId: 253998300
  • Loading branch information
buchgr authored and copybara-github committed Jun 19, 2019
1 parent e4545fe commit d75b6cf
Show file tree
Hide file tree
Showing 11 changed files with 472 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.actions.cache.MetadataInjector;
import com.google.devtools.build.lib.concurrent.ThreadSafety;
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.ActionResultMetadata.DirectoryMetadata;
Expand All @@ -70,6 +71,7 @@
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand All @@ -83,6 +85,12 @@
@ThreadSafety.ThreadSafe
public abstract class AbstractRemoteActionCache implements AutoCloseable {

/**
 * Callback invoked before output files are written, so that the caller is the only one writing
 * to them when the dynamic spawn strategy is in use.
 *
 * <p>See {@link SpawnExecutionContext#lockOutputFiles()}.
 */
@FunctionalInterface
interface OutputFilesLocker {
  /**
   * Acquires the output files lock.
   *
   * @throws InterruptedException if the implementation signals an interrupt
   */
  void lock() throws InterruptedException;
}

private static final ListenableFuture<Void> COMPLETED_SUCCESS = SettableFuture.create();
private static final ListenableFuture<byte[]> EMPTY_BYTES = SettableFuture.create();

Expand Down Expand Up @@ -161,16 +169,26 @@ public void onFailure(Throwable t) {
return outerF;
}

/**
 * Returns the temporary download location for {@code actualPath}: a sibling in the same parent
 * directory whose name is the base name of {@code actualPath} with {@code ".tmp"} appended.
 */
private static Path toTmpDownloadPath(Path actualPath) {
  Path parentDirectory = actualPath.getParentDirectory();
  String tmpBaseName = actualPath.getBaseName() + ".tmp";
  return parentDirectory.getRelative(tmpBaseName);
}

/**
* Download the output files and directory trees of a remotely executed action to the local
* machine, as well as stdout / stderr to the given files.
*
* <p>In case of failure, this method deletes any output files it might have already created.
*
* @param outputFilesLocker ensures that we are the only ones writing to the output files when
* using the dynamic spawn strategy.
* @throws IOException in case of a cache miss or if the remote cache is unavailable.
* @throws ExecException in case clean up after a failed download failed.
*/
public void download(ActionResult result, Path execRoot, FileOutErr origOutErr)
public void download(
ActionResult result,
Path execRoot,
FileOutErr origOutErr,
OutputFilesLocker outputFilesLocker)
throws ExecException, IOException, InterruptedException {
ActionResultMetadata metadata = parseActionResultMetadata(result, execRoot);

Expand All @@ -182,7 +200,8 @@ public void download(ActionResult result, Path execRoot, FileOutErr origOutErr)
.map(
(file) -> {
try {
ListenableFuture<Void> download = downloadFile(file.path(), file.digest());
ListenableFuture<Void> download =
downloadFile(toTmpDownloadPath(file.path()), file.digest());
return Futures.transform(download, (d) -> file, directExecutor());
} catch (IOException e) {
return Futures.<FileMetadata>immediateFailedFuture(e);
Expand All @@ -209,10 +228,8 @@ public void download(ActionResult result, Path execRoot, FileOutErr origOutErr)

for (ListenableFuture<FileMetadata> download : downloads) {
try {
FileMetadata outputFile = getFromFuture(download);
if (outputFile != null) {
outputFile.path().setExecutable(outputFile.isExecutable());
}
// Wait for all downloads to finish.
getFromFuture(download);
} catch (IOException e) {
downloadException = downloadException == null ? e : downloadException;
} catch (InterruptedException e) {
Expand All @@ -222,10 +239,9 @@ public void download(ActionResult result, Path execRoot, FileOutErr origOutErr)

if (downloadException != null || interruptedException != null) {
try {
// Delete any (partially) downloaded output files, since any subsequent local execution
// of this action may expect none of the output files to exist.
// Delete any (partially) downloaded output files.
for (OutputFile file : result.getOutputFilesList()) {
execRoot.getRelative(file.getPath()).delete();
toTmpDownloadPath(execRoot.getRelative(file.getPath())).delete();
}
for (OutputDirectory directory : result.getOutputDirectoriesList()) {
// Only delete the directories below the output directories because the output
Expand Down Expand Up @@ -261,6 +277,12 @@ public void download(ActionResult result, Path execRoot, FileOutErr origOutErr)
tmpOutErr.clearErr();
}

// Ensure that we are the only ones writing to the output files when using the dynamic spawn
// strategy.
outputFilesLocker.lock();

moveOutputsToFinalLocation(downloads);

List<SymlinkMetadata> symlinksInDirectories = new ArrayList<>();
for (Entry<Path, DirectoryMetadata> entry : metadata.directories()) {
entry.getKey().createDirectoryAndParents();
Expand All @@ -275,6 +297,36 @@ public void download(ActionResult result, Path execRoot, FileOutErr origOutErr)
createSymlinks(symlinks);
}

/**
 * Moves the downloaded outputs from their temporary download location to their declared location
 * and then applies the executable bit recorded in each output's metadata.
 *
 * @param downloads the download futures to resolve; futures that resolve to {@code null} are
 *     skipped
 * @throws IOException propagated from resolving a download, moving a file or setting its
 *     executable bit
 * @throws InterruptedException propagated from resolving a download
 */
private void moveOutputsToFinalLocation(List<ListenableFuture<FileMetadata>> downloads)
    throws IOException, InterruptedException {
  List<FileMetadata> completed = new ArrayList<>(downloads.size());
  for (ListenableFuture<FileMetadata> pending : downloads) {
    FileMetadata resolved = getFromFuture(pending);
    if (resolved == null) {
      continue;
    }
    completed.add(resolved);
  }

  // Order the renames lexicographically by temporary download path to avoid filename clashes.
  //
  // Example: an action produces the outputs foo and foo.tmp. They are first downloaded to
  // foo.tmp and foo.tmp.tmp respectively. rename(foo.tmp, foo) has to run before
  // rename(foo.tmp.tmp, foo.tmp), otherwise the second rename would overwrite the not yet
  // moved download of foo. Processing the temporary names in lexicographical order guarantees
  // this.
  Comparator<FileMetadata> byTmpDownloadPath =
      Comparator.comparing(f -> toTmpDownloadPath(f.path()));
  completed.sort(byTmpDownloadPath);

  // Rename each temporary file to the declared output name, then set its executable bit.
  for (FileMetadata output : completed) {
    Path declaredPath = output.path();
    FileSystemUtils.moveFile(toTmpDownloadPath(declaredPath), declaredPath);
    declaredPath.setExecutable(output.isExecutable());
  }
}

private void createSymlinks(Iterable<SymlinkMetadata> symlinks) throws IOException {
for (SymlinkMetadata symlink : symlinks) {
if (symlink.target().isAbsolute()) {
Expand Down Expand Up @@ -376,6 +428,8 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
* @param execRoot the execution root
* @param metadataInjector the action's metadata injector that allows this method to inject
* metadata about an action output instead of downloading the output
* @param outputFilesLocker ensures that we are the only ones writing to the output files when
* using the dynamic spawn strategy.
* @throws IOException in case of failure
* @throws InterruptedException in case of receiving an interrupt
*/
Expand All @@ -386,7 +440,8 @@ public InMemoryOutput downloadMinimal(
@Nullable PathFragment inMemoryOutputPath,
OutErr outErr,
Path execRoot,
MetadataInjector metadataInjector)
MetadataInjector metadataInjector,
OutputFilesLocker outputFilesLocker)
throws IOException, InterruptedException {
Preconditions.checkState(
result.getExitCode() == 0,
Expand All @@ -403,6 +458,10 @@ public InMemoryOutput downloadMinimal(
+ "--experimental_remote_download_outputs=minimal");
}

// Ensure that we are the only ones writing to the output files when using the dynamic spawn
// strategy.
outputFilesLocker.lock();

ActionInput inMemoryOutput = null;
Digest inMemoryOutputDigest = null;
for (ActionInput output : outputs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
if (downloadOutputs) {
try (SilentCloseable c =
prof.profile(ProfilerTask.REMOTE_DOWNLOAD, "download outputs")) {
remoteCache.download(result, execRoot, context.getFileOutErr());
remoteCache.download(
result, execRoot, context.getFileOutErr(), context::lockOutputFiles);
}
} else {
PathFragment inMemoryOutputPath = getInMemoryOutputPath(spawn);
Expand All @@ -181,7 +182,8 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
inMemoryOutputPath,
context.getFileOutErr(),
execRoot,
context.getMetadataInjector());
context.getMetadataInjector(),
context::lockOutputFiles);
}
}
SpawnResult spawnResult =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ private SpawnResult downloadAndFinalizeSpawnResult(
InMemoryOutput inMemoryOutput = null;
if (downloadOutputs) {
try (SilentCloseable c = Profiler.instance().profile(REMOTE_DOWNLOAD, "download outputs")) {
remoteCache.download(actionResult, execRoot, context.getFileOutErr());
remoteCache.download(
actionResult, execRoot, context.getFileOutErr(), context::lockOutputFiles);
}
} else {
PathFragment inMemoryOutputPath = getInMemoryOutputPath(spawn);
Expand All @@ -325,7 +326,8 @@ private SpawnResult downloadAndFinalizeSpawnResult(
inMemoryOutputPath,
context.getFileOutErr(),
execRoot,
context.getMetadataInjector());
context.getMetadataInjector(),
context::lockOutputFiles);
}
}
return createSpawnResult(actionResult.getExitCode(), cacheHit, getName(), inMemoryOutput);
Expand Down Expand Up @@ -405,10 +407,11 @@ private SpawnResult execLocallyAndUploadOrFail(
return execLocallyAndUpload(
spawn, context, inputMap, remoteCache, actionKey, action, command, uploadLocalResults);
}
return handleError(cause, context.getFileOutErr(), actionKey);
return handleError(cause, context.getFileOutErr(), actionKey, context);
}

private SpawnResult handleError(IOException exception, FileOutErr outErr, ActionKey actionKey)
private SpawnResult handleError(
IOException exception, FileOutErr outErr, ActionKey actionKey, SpawnExecutionContext context)
throws ExecException, InterruptedException, IOException {
if (exception.getCause() instanceof ExecutionStatusException) {
ExecutionStatusException e = (ExecutionStatusException) exception.getCause();
Expand All @@ -417,7 +420,7 @@ private SpawnResult handleError(IOException exception, FileOutErr outErr, Action
maybeDownloadServerLogs(resp, actionKey);
if (resp.hasResult()) {
// We try to download all (partial) results even on server error, for debuggability.
remoteCache.download(resp.getResult(), execRoot, outErr);
remoteCache.download(resp.getResult(), execRoot, outErr, context::lockOutputFiles);
}
}
if (e.isExecutionTimeout()) {
Expand Down
Loading

0 comments on commit d75b6cf

Please sign in to comment.