Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remote: make the dynamic spawn scheduler work. Fixes #8646 #8648

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.actions.cache.MetadataInjector;
import com.google.devtools.build.lib.concurrent.ThreadSafety;
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.AbstractRemoteActionCache.ActionResultMetadata.DirectoryMetadata;
Expand Down Expand Up @@ -83,6 +84,12 @@
@ThreadSafety.ThreadSafe
public abstract class AbstractRemoteActionCache implements AutoCloseable {

/** See {@link SpawnExecutionContext#lockOutputFiles()}. */
@FunctionalInterface
interface OutputFilesLocker {
void lock() throws InterruptedException;
}

private static final ListenableFuture<Void> COMPLETED_SUCCESS = SettableFuture.create();
private static final ListenableFuture<byte[]> EMPTY_BYTES = SettableFuture.create();

Expand Down Expand Up @@ -161,16 +168,24 @@ public void onFailure(Throwable t) {
return outerF;
}

private static Path toTmpDownloadPath(Path actualPath) {
return actualPath.getParentDirectory().getRelative(actualPath.getBaseName() + ".tmp");
}

/**
* Download the output files and directory trees of a remotely executed action to the local
* machine, as well stdin / stdout to the given files.
*
* <p>In case of failure, this method deletes any output files it might have already created.
*
* @param outputFilesLocker ensures that we are the only ones writing to the output files when
* using the dynamic spawn strategy.
*
* @throws IOException in case of a cache miss or if the remote cache is unavailable.
* @throws ExecException in case clean up after a failed download failed.
*/
public void download(ActionResult result, Path execRoot, FileOutErr origOutErr)
public void download(ActionResult result, Path execRoot, FileOutErr origOutErr,
OutputFilesLocker outputFilesLocker)
throws ExecException, IOException, InterruptedException {
ActionResultMetadata metadata = parseActionResultMetadata(result, execRoot);

Expand All @@ -182,7 +197,8 @@ public void download(ActionResult result, Path execRoot, FileOutErr origOutErr)
.map(
(file) -> {
try {
ListenableFuture<Void> download = downloadFile(file.path(), file.digest());
ListenableFuture<Void> download =
downloadFile(toTmpDownloadPath(file.path()), file.digest());
return Futures.transform(download, (d) -> file, directExecutor());
} catch (IOException e) {
return Futures.<FileMetadata>immediateFailedFuture(e);
Expand All @@ -209,10 +225,8 @@ public void download(ActionResult result, Path execRoot, FileOutErr origOutErr)

for (ListenableFuture<FileMetadata> download : downloads) {
try {
FileMetadata outputFile = getFromFuture(download);
if (outputFile != null) {
outputFile.path().setExecutable(outputFile.isExecutable());
}
// Wait for all downloads to finish.
getFromFuture(download);
} catch (IOException e) {
downloadException = downloadException == null ? e : downloadException;
} catch (InterruptedException e) {
Expand All @@ -222,10 +236,9 @@ public void download(ActionResult result, Path execRoot, FileOutErr origOutErr)

if (downloadException != null || interruptedException != null) {
try {
// Delete any (partially) downloaded output files, since any subsequent local execution
// of this action may expect none of the output files to exist.
// Delete any (partially) downloaded output files.
for (OutputFile file : result.getOutputFilesList()) {
execRoot.getRelative(file.getPath()).delete();
toTmpDownloadPath(execRoot.getRelative(file.getPath())).delete();
}
for (OutputDirectory directory : result.getOutputDirectoriesList()) {
// Only delete the directories below the output directories because the output
Expand Down Expand Up @@ -261,6 +274,19 @@ public void download(ActionResult result, Path execRoot, FileOutErr origOutErr)
tmpOutErr.clearErr();
}

// Ensure that we are the only ones writing to the output files when using the dynamic spawn
// strategy.
outputFilesLocker.lock();

// Move the output files from their temporary name to the actual output file name.
for (ListenableFuture<FileMetadata> finishedDownload : downloads) {
FileMetadata outputFile = getFromFuture(finishedDownload);
if (outputFile != null) {
FileSystemUtils.moveFile(toTmpDownloadPath(outputFile.path()), outputFile.path());
outputFile.path().setExecutable(outputFile.isExecutable());
}
}

List<SymlinkMetadata> symlinksInDirectories = new ArrayList<>();
for (Entry<Path, DirectoryMetadata> entry : metadata.directories()) {
entry.getKey().createDirectoryAndParents();
Expand Down Expand Up @@ -376,6 +402,8 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
* @param execRoot the execution root
* @param metadataInjector the action's metadata injector that allows this method to inject
* metadata about an action output instead of downloading the output
* @param outputFilesLocker ensures that we are the only ones writing to the output files when
* using the dynamic spawn strategy.
* @throws IOException in case of failure
* @throws InterruptedException in case of receiving an interrupt
*/
Expand All @@ -386,8 +414,8 @@ public InMemoryOutput downloadMinimal(
@Nullable PathFragment inMemoryOutputPath,
OutErr outErr,
Path execRoot,
MetadataInjector metadataInjector)
throws IOException, InterruptedException {
MetadataInjector metadataInjector,
OutputFilesLocker outputFilesLocker) throws IOException, InterruptedException {
Preconditions.checkState(
result.getExitCode() == 0,
"injecting remote metadata is only supported for successful actions (exit code 0).");
Expand All @@ -403,6 +431,10 @@ public InMemoryOutput downloadMinimal(
+ "--experimental_remote_download_outputs=minimal");
}

// Ensure that when using dynamic spawn strategy that we are the only ones writing to the
// output files.
outputFilesLocker.lock();

ActionInput inMemoryOutput = null;
Digest inMemoryOutputDigest = null;
for (ActionInput output : outputs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
if (downloadOutputs) {
try (SilentCloseable c =
prof.profile(ProfilerTask.REMOTE_DOWNLOAD, "download outputs")) {
remoteCache.download(result, execRoot, context.getFileOutErr());
remoteCache.download(result, execRoot, context.getFileOutErr(),
context::lockOutputFiles);
}
} else {
PathFragment inMemoryOutputPath = getInMemoryOutputPath(spawn);
Expand All @@ -181,7 +182,8 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
inMemoryOutputPath,
context.getFileOutErr(),
execRoot,
context.getMetadataInjector());
context.getMetadataInjector(),
context::lockOutputFiles);
}
}
SpawnResult spawnResult =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ private SpawnResult downloadAndFinalizeSpawnResult(
InMemoryOutput inMemoryOutput = null;
if (downloadOutputs) {
try (SilentCloseable c = Profiler.instance().profile(REMOTE_DOWNLOAD, "download outputs")) {
remoteCache.download(actionResult, execRoot, context.getFileOutErr());
remoteCache.download(actionResult, execRoot, context.getFileOutErr(),
context::lockOutputFiles);
}
} else {
PathFragment inMemoryOutputPath = getInMemoryOutputPath(spawn);
Expand All @@ -325,7 +326,8 @@ private SpawnResult downloadAndFinalizeSpawnResult(
inMemoryOutputPath,
context.getFileOutErr(),
execRoot,
context.getMetadataInjector());
context.getMetadataInjector(),
context::lockOutputFiles);
}
}
return createSpawnResult(actionResult.getExitCode(), cacheHit, getName(), inMemoryOutput);
Expand Down Expand Up @@ -405,10 +407,11 @@ private SpawnResult execLocallyAndUploadOrFail(
return execLocallyAndUpload(
spawn, context, inputMap, remoteCache, actionKey, action, command, uploadLocalResults);
}
return handleError(cause, context.getFileOutErr(), actionKey);
return handleError(cause, context.getFileOutErr(), actionKey, context);
}

private SpawnResult handleError(IOException exception, FileOutErr outErr, ActionKey actionKey)
private SpawnResult handleError(IOException exception, FileOutErr outErr, ActionKey actionKey,
SpawnExecutionContext context)
throws ExecException, InterruptedException, IOException {
if (exception.getCause() instanceof ExecutionStatusException) {
ExecutionStatusException e = (ExecutionStatusException) exception.getCause();
Expand All @@ -417,7 +420,7 @@ private SpawnResult handleError(IOException exception, FileOutErr outErr, Action
maybeDownloadServerLogs(resp, actionKey);
if (resp.hasResult()) {
// We try to download all (partial) results even on server error, for debuggability.
remoteCache.download(resp.getResult(), execRoot, outErr);
remoteCache.download(resp.getResult(), execRoot, outErr, context::lockOutputFiles);
}
}
if (e.isExecutionTimeout()) {
Expand Down
Loading