Skip to content

Commit

Permalink
Download outputs that match the regex in the background instead of bl…
Browse files Browse the repository at this point in the history
…ocking spawn execution

PiperOrigin-RevId: 481894591
Change-Id: I3bcfad6e878288d1b014da85108f6a2e25b303ca
  • Loading branch information
coeuvre authored and copybara-github committed Oct 18, 2022
1 parent 9a13051 commit e01e7f5
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.Action;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputPrefetcher;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.Artifact.SpecialArtifact;
import com.google.devtools.build.lib.actions.Artifact.TreeFileArtifact;
import com.google.devtools.build.lib.actions.FileArtifactValue;
import com.google.devtools.build.lib.actions.MetadataProvider;
import com.google.devtools.build.lib.actions.cache.MetadataHandler;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
Expand All @@ -48,7 +52,9 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

/**
* Abstract implementation of {@link ActionInputPrefetcher} which implements the orchestration of
Expand All @@ -59,8 +65,10 @@ public abstract class AbstractActionInputPrefetcher implements ActionInputPrefet

private final AsyncTaskCache.NoResult<Path> downloadCache = AsyncTaskCache.NoResult.create();
private final TempPathGenerator tempPathGenerator;
protected final Set<Artifact> outputsAreInputs = Sets.newConcurrentHashSet();

protected final Path execRoot;
protected final ImmutableList<Pattern> patternsToDownload;

/** Priority for the staging task. */
protected enum Priority {
Expand All @@ -86,9 +94,13 @@ protected enum Priority {
LOW,
}

protected AbstractActionInputPrefetcher(Path execRoot, TempPathGenerator tempPathGenerator) {
protected AbstractActionInputPrefetcher(
Path execRoot,
TempPathGenerator tempPathGenerator,
ImmutableList<Pattern> patternsToDownload) {
this.execRoot = execRoot;
this.tempPathGenerator = tempPathGenerator;
this.patternsToDownload = patternsToDownload;
}

protected abstract boolean shouldDownloadFile(Path path, FileArtifactValue metadata);
Expand Down Expand Up @@ -360,4 +372,31 @@ public void shutdown() {
}
}
}

@SuppressWarnings({"CheckReturnValue", "FutureReturnValueIgnored"})
public void finalizeAction(Action action, MetadataHandler metadataHandler) {
List<Artifact> inputsToDownload = new ArrayList<>();
List<Artifact> outputsToDownload = new ArrayList<>();

for (Artifact output : action.getOutputs()) {
if (outputsAreInputs.remove(output)) {
inputsToDownload.add(output);
}

for (Pattern pattern : patternsToDownload) {
if (pattern.matcher(output.getExecPathString()).matches()) {
outputsToDownload.add(output);
break;
}
}
}

if (!inputsToDownload.isEmpty()) {
prefetchFiles(inputsToDownload, metadataHandler, Priority.HIGH);
}

if (!outputsToDownload.isEmpty()) {
prefetchFiles(outputsToDownload, metadataHandler, Priority.LOW);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.FileArtifactValue;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
Expand All @@ -33,6 +34,7 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import io.reactivex.rxjava3.core.Completable;
import java.io.IOException;
import java.util.regex.Pattern;

/**
* Stages output files that are stored remotely to the local filesystem.
Expand All @@ -51,8 +53,9 @@ class RemoteActionInputFetcher extends AbstractActionInputPrefetcher {
String commandId,
RemoteCache remoteCache,
Path execRoot,
TempPathGenerator tempPathGenerator) {
super(execRoot, tempPathGenerator);
TempPathGenerator tempPathGenerator,
ImmutableList<Pattern> patternsToDownload) {
super(execRoot, tempPathGenerator, patternsToDownload);
this.buildRequestId = Preconditions.checkNotNull(buildRequestId);
this.commandId = Preconditions.checkNotNull(commandId);
this.remoteCache = Preconditions.checkNotNull(remoteCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -170,8 +169,6 @@ public class RemoteExecutionService {

private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean buildInterrupted = new AtomicBoolean(false);
private final boolean shouldForceDownloads;
private final Predicate<String> shouldForceDownloadPredicate;

public RemoteExecutionService(
Executor executor,
Expand Down Expand Up @@ -215,27 +212,6 @@ public RemoteExecutionService(
this.captureCorruptedOutputsDir = captureCorruptedOutputsDir;

this.scheduler = Schedulers.from(executor, /*interruptibleWorker=*/ true);

// TODO(bazel-team): Consider adding a warning or more validation if the remoteDownloadRegex is
// used without Build without the Bytes.
ImmutableList.Builder<Pattern> builder = ImmutableList.builder();
if (remoteOptions.remoteOutputsMode == RemoteOutputsMode.MINIMAL
|| remoteOptions.remoteOutputsMode == RemoteOutputsMode.TOPLEVEL) {
for (String regex : remoteOptions.remoteDownloadRegex) {
builder.add(Pattern.compile(regex));
}
}
ImmutableList<Pattern> patterns = builder.build();
this.shouldForceDownloads = !patterns.isEmpty();
this.shouldForceDownloadPredicate =
path -> {
for (Pattern pattern : patterns) {
if (pattern.matcher(path).matches()) {
return true;
}
}
return false;
};
}

static Command buildCommand(
Expand Down Expand Up @@ -1004,8 +980,6 @@ public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult re
/* exitCode = */ result.getExitCode(),
hasFilesToDownload(action.getSpawn().getOutputFiles(), filesToDownload));

ImmutableList<ListenableFuture<FileMetadata>> forcedDownloads = ImmutableList.of();

// Download into temporary paths, then move everything at the end.
// This avoids holding the output lock while downloading, which would prevent the local branch
// from completing sooner under the dynamic execution strategy.
Expand All @@ -1030,15 +1004,6 @@ public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult re
"Symlinks in action outputs are not yet supported by "
+ "--experimental_remote_download_outputs=minimal");
}
if (shouldForceDownloads) {
forcedDownloads =
buildFilesToDownloadWithPredicate(
context,
progressStatusListener,
metadata,
shouldForceDownloadPredicate,
realToTmpPath);
}
}

FileOutErr tmpOutErr = outErr.childOutErr();
Expand All @@ -1059,17 +1024,6 @@ public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult re
throw e;
}

// TODO(bazel-team): Unify this block with the equivalent block above.
try (SilentCloseable c = Profiler.instance().profile("Remote.forcedDownload")) {
waitForBulkTransfer(forcedDownloads, /* cancelRemainingOnInterrupt= */ true);
} catch (Exception e) {
// TODO(bazel-team): Consider adding better case-by-case exception handling instead of just
// rethrowing
captureCorruptedOutputs(e);
deletePartialDownloadedOutputs(realToTmpPath, tmpOutErr, e);
throw e;
}

FileOutErr.dump(tmpOutErr, outErr);

// Ensure that we are the only ones writing to the output files when using the dynamic spawn
Expand Down Expand Up @@ -1109,13 +1063,6 @@ public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult re
// might not be supported on all platforms.
createSymlinks(symlinks);
} else {
// TODO(bazel-team): We should unify this if-block to rely on downloadOutputs above but, as of
// 2022-07-05, downloadOuputs' semantics isn't exactly the same as build-without-the-bytes
// which is necessary for using remoteDownloadRegex.
if (!forcedDownloads.isEmpty()) {
moveOutputsToFinalLocation(forcedDownloads, realToTmpPath);
}

ActionInput inMemoryOutput = null;
Digest inMemoryOutputDigest = null;
PathFragment inMemoryOutputPath = getInMemoryOutputPath(action.getSpawn());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/** RemoteModule provides distributed cache and remote execution for Bazel. */
Expand All @@ -143,6 +144,7 @@ public final class RemoteModule extends BlazeModule {
@Nullable private RemoteOutputService remoteOutputService;
@Nullable private TempPathGenerator tempPathGenerator;
@Nullable private BlockWaitingModule blockWaitingModule;
@Nullable private ImmutableList<Pattern> patternsToDownload;

private ChannelFactory channelFactory =
new ChannelFactory() {
Expand Down Expand Up @@ -277,6 +279,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
Preconditions.checkState(actionInputFetcher == null, "actionInputFetcher must be null");
Preconditions.checkState(remoteOptions == null, "remoteOptions must be null");
Preconditions.checkState(tempPathGenerator == null, "tempPathGenerator must be null");
Preconditions.checkState(patternsToDownload == null, "patternsToDownload must be null");

RemoteOptions remoteOptions = env.getOptions().getOptions(RemoteOptions.class);
if (remoteOptions == null) {
Expand Down Expand Up @@ -326,6 +329,16 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
FailureDetails.RemoteOptions.Code.EXECUTION_WITH_INVALID_CACHE);
}

// TODO(bazel-team): Consider adding a warning or more validation if the remoteDownloadRegex is
// used without Build without the Bytes.
if (!remoteOptions.remoteOutputsMode.downloadAllOutputs()) {
ImmutableList.Builder<Pattern> patternsToDownloadBuilder = ImmutableList.builder();
for (String regex : remoteOptions.remoteDownloadRegex) {
patternsToDownloadBuilder.add(Pattern.compile(regex));
}
patternsToDownload = patternsToDownloadBuilder.build();
}

env.getEventBus().register(this);
String invocationId = env.getCommandId().toString();
String buildRequestId = env.getBuildRequestId();
Expand Down Expand Up @@ -914,6 +927,7 @@ public void afterCommand() throws AbruptExitException {
remoteOutputService = null;
tempPathGenerator = null;
rpcLogFile = null;
patternsToDownload = null;
}

private static void afterCommandTask(
Expand Down Expand Up @@ -1017,13 +1031,16 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB
RemoteOutputsMode remoteOutputsMode = remoteOptions.remoteOutputsMode;

if (!remoteOutputsMode.downloadAllOutputs() && actionContextProvider.getRemoteCache() != null) {
Preconditions.checkNotNull(patternsToDownload, "patternsToDownload must not be null");
actionInputFetcher =
new RemoteActionInputFetcher(
env.getBuildRequestId(),
env.getCommandId().toString(),
actionContextProvider.getRemoteCache(),
env.getExecRoot(),
tempPathGenerator);
tempPathGenerator,
patternsToDownload);
env.getEventBus().register(actionInputFetcher);
builder.setActionInputPrefetcher(actionInputFetcher);
remoteOutputService.setActionInputFetcher(actionInputFetcher);
actionContextProvider.setActionInputFetcher(actionInputFetcher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ public void flushActionFileSystem(FileSystem actionFileSystem) throws IOExceptio

@Override
public void finalizeAction(Action action, MetadataHandler metadataHandler) {
// Intentionally left empty.
if (actionInputFetcher != null) {
actionInputFetcher.finalizeAction(action, metadataHandler);
}
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public void setUp() throws IOException {
@Override
protected AbstractActionInputPrefetcher createPrefetcher(Map<HashCode, byte[]> cas) {
RemoteCache remoteCache = newCache(options, digestUtil, cas);
return new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
return new RemoteActionInputFetcher(
"none", "none", remoteCache, execRoot, tempPathGenerator, ImmutableList.of());
}

@Test
Expand All @@ -68,7 +69,8 @@ public void testStagingVirtualActionInput() throws Exception {
MetadataProvider metadataProvider = new StaticMetadataProvider(new HashMap<>());
RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
RemoteActionInputFetcher actionInputFetcher =
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
new RemoteActionInputFetcher(
"none", "none", remoteCache, execRoot, tempPathGenerator, ImmutableList.of());
VirtualActionInput a = ActionsTestUtil.createVirtualActionInput("file1", "hello world");

// act
Expand All @@ -88,7 +90,8 @@ public void testStagingEmptyVirtualActionInput() throws Exception {
MetadataProvider metadataProvider = new StaticMetadataProvider(new HashMap<>());
RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
RemoteActionInputFetcher actionInputFetcher =
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
new RemoteActionInputFetcher(
"none", "none", remoteCache, execRoot, tempPathGenerator, ImmutableList.of());

// act
wait(
Expand Down

0 comments on commit e01e7f5

Please sign in to comment.