Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't upload remote input to remote cache #21825

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.flogger.GoogleLogger;
Expand All @@ -46,6 +47,7 @@
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.LostInputsEvent;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
import com.google.devtools.build.lib.vfs.FileSymlinkLoopException;
Expand Down Expand Up @@ -79,8 +81,6 @@ public abstract class AbstractActionInputPrefetcher implements ActionInputPrefet
protected final Path execRoot;
protected final RemoteOutputChecker remoteOutputChecker;

private final Set<ActionInput> missingActionInputs = Sets.newConcurrentHashSet();

private final ActionOutputDirectoryHelper outputDirectoryHelper;

/** The state of a directory tracked by {@link DirectoryTracker}, as explained below. */
Expand Down Expand Up @@ -538,7 +538,8 @@ private Completable downloadFileNoCheckRx(
.doOnError(
error -> {
if (error instanceof CacheNotFoundException) {
missingActionInputs.add(actionInput);
reporter.post(
new LostInputsEvent(ImmutableList.of(actionInput.getExecPath())));
}
}));

Expand Down Expand Up @@ -698,10 +699,6 @@ public void flushOutputTree() throws InterruptedException {
downloadCache.awaitInProgressTasks();
}

public ImmutableSet<ActionInput> getMissingActionInputs() {
return ImmutableSet.copyOf(missingActionInputs);
}

public RemoteOutputChecker getRemoteOutputChecker() {
return remoteOutputChecker;
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event",
"//src/main/java/com/google/devtools/build/lib/remote/disk",
"//src/main/java/com/google/devtools/build/lib/remote/downloader",
"//src/main/java/com/google/devtools/build/lib/remote/grpc",
Expand Down Expand Up @@ -232,6 +233,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/events",
"//src/main/java/com/google/devtools/build/lib/profiler",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
Expand All @@ -250,10 +252,12 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/actions:artifacts",
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
"//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event",
"//src/main/java/com/google/devtools/build/lib/skyframe:action_execution_value",
"//src/main/java/com/google/devtools/build/lib/skyframe:sky_functions",
"//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value",
"//src/main/java/com/google/devtools/build/skyframe",
"//third_party:guava",
"//third_party:jsr305",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import com.google.devtools.build.lib.actions.ActionInput;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.devtools.build.lib.actions.FileArtifactValue;
import com.google.devtools.build.lib.actions.cache.ActionCache;
import com.google.devtools.build.lib.remote.common.LostInputsEvent;
import com.google.devtools.build.lib.skyframe.ActionExecutionValue;
import com.google.devtools.build.lib.skyframe.SkyFunctions;
import com.google.devtools.build.lib.skyframe.TreeArtifactValue;
import com.google.devtools.build.skyframe.MemoizingEvaluator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

Expand All @@ -30,6 +31,7 @@ public class LeaseService {
@Nullable private final ActionCache actionCache;
private final AtomicBoolean leaseExtensionStarted = new AtomicBoolean(false);
@Nullable LeaseExtension leaseExtension;
private final AtomicBoolean hasMissingActionInputs = new AtomicBoolean(false);

public LeaseService(
MemoizingEvaluator memoizingEvaluator,
Expand All @@ -48,12 +50,18 @@ public void finalizeAction() {
}
}

public void finalizeExecution(Set<ActionInput> missingActionInputs) {
@AllowConcurrentEvents
@Subscribe
public void onLostInputs(LostInputsEvent event) {
hasMissingActionInputs.set(true);
}

public void finalizeExecution() {
if (leaseExtension != null) {
leaseExtension.stop();
}

if (!missingActionInputs.isEmpty()) {
if (hasMissingActionInputs.getAndSet(false)) {
handleMissingInputs();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
Expand All @@ -22,24 +23,27 @@
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.LostInputsEvent;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.Message;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Completable;
Expand All @@ -59,13 +63,33 @@
/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {

public interface RemotePathChecker {
boolean isRemote(Path path) throws IOException;
}

private RemotePathChecker remotePathChecker =
new RemotePathChecker() {
@Override
public boolean isRemote(Path path) throws IOException {
var fs = path.getFileSystem();
if (fs instanceof RemoteActionFileSystem) {
var remoteActionFileSystem = (RemoteActionFileSystem) fs;
return remoteActionFileSystem.isRemote(path);
}
return false;
}
};

public RemoteExecutionCache(
RemoteCacheClient protocolImpl,
RemoteOptions options,
DigestUtil digestUtil) {
RemoteCacheClient protocolImpl, RemoteOptions options, DigestUtil digestUtil) {
super(protocolImpl, options, digestUtil);
}

@VisibleForTesting
void setRemotePathChecker(RemotePathChecker remotePathChecker) {
this.remotePathChecker = remotePathChecker;
}

/**
* Ensures that the tree structure of the inputs, the input files themselves, and the command are
* available in the remote cache, such that the tree can be reassembled and executed on another
Expand All @@ -82,7 +106,9 @@ public void ensureInputsPresent(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
boolean force)
boolean force,
Path execRoot,
Reporter reporter)
throws IOException, InterruptedException {
Iterable<Digest> merkleTreeAllDigests;
try (SilentCloseable s = Profiler.instance().profile("merkleTree.getAllDigests()")) {
Expand All @@ -95,7 +121,8 @@ public void ensureInputsPresent(
}

Flowable<TransferResult> uploads =
createUploadTasks(context, merkleTree, additionalInputs, allDigests, force)
createUploadTasks(
context, merkleTree, additionalInputs, allDigests, force, execRoot, reporter)
.flatMapPublisher(
result ->
Flowable.using(
Expand All @@ -113,10 +140,7 @@ public void ensureInputsPresent(
}));

try {
// Workaround for https://github.com/bazelbuild/bazel/issues/19513.
if (!mergeBulkTransfer(uploads).blockingAwait(options.remoteTimeout.getSeconds(), SECONDS)) {
throw new IOException("Timed out when waiting for uploads");
}
mergeBulkTransfer(uploads).blockingAwait();
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause != null) {
Expand All @@ -131,7 +155,9 @@ private ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context,
Digest digest,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs) {
Map<Digest, Message> additionalInputs,
Path execRoot,
Reporter reporter) {
Directory node = merkleTree.getDirectoryByDigest(digest);
if (node != null) {
return cacheProtocol.uploadBlob(context, digest, node.toByteString());
Expand All @@ -142,7 +168,19 @@ private ListenableFuture<Void> uploadBlob(
if (file.getBytes() != null) {
return cacheProtocol.uploadBlob(context, digest, file.getBytes());
}
return cacheProtocol.uploadFile(context, digest, file.getPath());

var path = checkNotNull(file.getPath());
try {
if (remotePathChecker.isRemote(path)) {
// A remote input is missing from remote cache, probably evicted by the remote server.
var execPath = path.relativeTo(execRoot);
reporter.post(new LostInputsEvent(ImmutableList.of(execPath)));
throw new IOException(format("%s (%s) was evicted from remote cache", execPath, digest));
}
} catch (IOException e) {
return immediateFailedFuture(e);
}
return cacheProtocol.uploadFile(context, digest, path);
}

Message message = additionalInputs.get(digest);
Expand All @@ -169,14 +207,23 @@ private Single<List<UploadTask>> createUploadTasks(
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
Iterable<Digest> allDigests,
boolean force) {
boolean force,
Path execRoot,
Reporter reporter) {
return Single.using(
() -> Profiler.instance().profile("collect digests"),
ignored ->
Flowable.fromIterable(allDigests)
.flatMapMaybe(
digest ->
maybeCreateUploadTask(context, merkleTree, additionalInputs, digest, force))
maybeCreateUploadTask(
context,
merkleTree,
additionalInputs,
digest,
force,
execRoot,
reporter))
.collect(toImmutableList()),
SilentCloseable::close);
}
Expand All @@ -186,7 +233,9 @@ private Maybe<UploadTask> maybeCreateUploadTask(
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
Digest digest,
boolean force) {
boolean force,
Path execRoot,
Reporter reporter) {
return Maybe.create(
emitter -> {
AsyncSubject<Void> completion = AsyncSubject.create();
Expand All @@ -211,7 +260,12 @@ private Maybe<UploadTask> maybeCreateUploadTask(
return toCompletable(
() ->
uploadBlob(
context, uploadTask.digest, merkleTree, additionalInputs),
context,
uploadTask.digest,
merkleTree,
additionalInputs,
execRoot,
reporter),
directExecutor());
}),
/* onAlreadyRunning= */ () -> emitter.onSuccess(uploadTask),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,9 @@ public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
.withWriteCachePolicy(CachePolicy.REMOTE_CACHE_ONLY), // Only upload to remote cache
merkleTree,
additionalInputs,
force);
force,
execRoot,
reporter);
} finally {
maybeReleaseRemoteActionBuildingSemaphore();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
buildRequestId,
invocationId,
remoteOptions.remoteInstanceName,
remoteOptions.remoteAcceptCached));
remoteOptions.remoteAcceptCached,
env::getExecRoot,
env.getReporter()));
} else {
if (enableDiskCache) {
try {
Expand Down Expand Up @@ -1053,6 +1055,7 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB
env.getSkyframeExecutor().getEvaluator(),
env.getBlazeWorkspace().getPersistentActionCache(),
leaseExtension);
env.getEventBus().register(leaseService);

if (outputService instanceof RemoteOutputService remoteOutputService) {
remoteOutputService.setRemoteOutputChecker(remoteOutputChecker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,7 @@ public void finalizeBuild(boolean buildSuccessful) {
@Subscribe
public void onExecutionPhaseCompleteEvent(ExecutionPhaseCompleteEvent event) {
if (leaseService != null) {
var missingActionInputs = ImmutableSet.<ActionInput>of();
if (actionInputFetcher != null) {
missingActionInputs = actionInputFetcher.getMissingActionInputs();
}
leaseService.finalizeExecution(missingActionInputs);
leaseService.finalizeExecution();
}
}

Expand Down
Loading
Loading