Skip to content

Commit

Permalink
Don't upload remote input to remote cache
Browse files Browse the repository at this point in the history
and when it's missing, treat it as remote cache eviction.

Also revert the workaround for bazelbuild#19513.

Fixes bazelbuild#21777.
Potential fix for bazelbuild#21626 and bazelbuild#21778.

Closes bazelbuild#21825.

PiperOrigin-RevId: 619877088
Change-Id: Ib1204de8440b780e5a6ee6a563a87da08f196ca5
  • Loading branch information
coeuvre authored and iancha1992 committed Mar 28, 2024
1 parent 43abec8 commit 921bde6
Show file tree
Hide file tree
Showing 19 changed files with 240 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.LostInputsEvent;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
import com.google.devtools.build.lib.vfs.FileSymlinkLoopException;
Expand Down Expand Up @@ -79,8 +80,6 @@ public abstract class AbstractActionInputPrefetcher implements ActionInputPrefet
protected final Path execRoot;
protected final RemoteOutputChecker remoteOutputChecker;

private final Set<ActionInput> missingActionInputs = Sets.newConcurrentHashSet();

private final ActionOutputDirectoryHelper outputDirectoryHelper;

/** The state of a directory tracked by {@link DirectoryTracker}, as explained below. */
Expand Down Expand Up @@ -538,7 +537,7 @@ private Completable downloadFileNoCheckRx(
.doOnError(
error -> {
if (error instanceof CacheNotFoundException) {
missingActionInputs.add(actionInput);
reporter.post(new LostInputsEvent());
}
}));

Expand Down Expand Up @@ -700,10 +699,6 @@ public void flushOutputTree() throws InterruptedException {
downloadCache.awaitInProgressTasks();
}

public ImmutableSet<ActionInput> getMissingActionInputs() {
return ImmutableSet.copyOf(missingActionInputs);
}

public RemoteOutputChecker getRemoteOutputChecker() {
return remoteOutputChecker;
}
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event",
"//src/main/java/com/google/devtools/build/lib/remote/disk",
"//src/main/java/com/google/devtools/build/lib/remote/downloader",
"//src/main/java/com/google/devtools/build/lib/remote/grpc",
Expand Down Expand Up @@ -229,6 +230,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/events",
"//src/main/java/com/google/devtools/build/lib/profiler",
"//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception",
"//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
Expand All @@ -245,12 +247,13 @@ java_library(
srcs = ["LeaseService.java"],
deps = [
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/actions:artifacts",
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
"//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event",
"//src/main/java/com/google/devtools/build/lib/skyframe:action_execution_value",
"//src/main/java/com/google/devtools/build/lib/skyframe:sky_functions",
"//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value",
"//src/main/java/com/google/devtools/build/skyframe",
"//third_party:guava",
"//third_party:jsr305",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import com.google.devtools.build.lib.actions.ActionInput;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.devtools.build.lib.actions.FileArtifactValue;
import com.google.devtools.build.lib.actions.cache.ActionCache;
import com.google.devtools.build.lib.remote.common.LostInputsEvent;
import com.google.devtools.build.lib.skyframe.ActionExecutionValue;
import com.google.devtools.build.lib.skyframe.SkyFunctions;
import com.google.devtools.build.lib.skyframe.TreeArtifactValue;
import com.google.devtools.build.skyframe.MemoizingEvaluator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

Expand All @@ -30,6 +31,7 @@ public class LeaseService {
@Nullable private final ActionCache actionCache;
private final AtomicBoolean leaseExtensionStarted = new AtomicBoolean(false);
@Nullable LeaseExtension leaseExtension;
private final AtomicBoolean hasMissingActionInputs = new AtomicBoolean(false);

public LeaseService(
MemoizingEvaluator memoizingEvaluator,
Expand All @@ -48,12 +50,18 @@ public void finalizeAction() {
}
}

public void finalizeExecution(Set<ActionInput> missingActionInputs) {
@AllowConcurrentEvents
@Subscribe
public void onLostInputs(LostInputsEvent event) {
hasMissingActionInputs.set(true);
}

public void finalizeExecution() {
if (leaseExtension != null) {
leaseExtension.stop();
}

if (!missingActionInputs.isEmpty()) {
if (hasMissingActionInputs.getAndSet(false)) {
handleMissingInputs();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
Expand All @@ -22,24 +23,29 @@
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.LostInputsEvent;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.Message;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Completable;
Expand All @@ -59,13 +65,50 @@
/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

/**
* An interface used to check whether a given {@link Path} is stored in a remote or a disk cache.
*/
public interface RemotePathChecker {
boolean isRemote(RemoteActionExecutionContext context, Path path) throws IOException;
}

private RemotePathChecker remotePathChecker =
new RemotePathChecker() {
@Override
public boolean isRemote(RemoteActionExecutionContext context, Path path)
throws IOException {
var fs = path.getFileSystem();
if (fs instanceof RemoteActionFileSystem) {
var remoteActionFileSystem = (RemoteActionFileSystem) fs;
if (remoteActionFileSystem.isRemote(path)) {
if (context.getReadCachePolicy().allowDiskCache()) {
try (var inputStream = path.getInputStream()) {
// If the file exists in the disk cache, download it and continue the upload.
return false;
} catch (IOException e) {
logger.atWarning().withCause(e).log(
"Failed to get input stream for %s", path.getPathString());
}
}
return true;
}
}
return false;
}
};

public RemoteExecutionCache(
RemoteCacheClient protocolImpl,
RemoteOptions options,
DigestUtil digestUtil) {
RemoteCacheClient protocolImpl, RemoteOptions options, DigestUtil digestUtil) {
super(protocolImpl, options, digestUtil);
}

@VisibleForTesting
void setRemotePathChecker(RemotePathChecker remotePathChecker) {
this.remotePathChecker = remotePathChecker;
}

/**
* Ensures that the tree structure of the inputs, the input files themselves, and the command are
* available in the remote cache, such that the tree can be reassembled and executed on another
Expand All @@ -82,7 +125,8 @@ public void ensureInputsPresent(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
boolean force)
boolean force,
Reporter reporter)
throws IOException, InterruptedException {
Iterable<Digest> merkleTreeAllDigests;
try (SilentCloseable s = Profiler.instance().profile("merkleTree.getAllDigests()")) {
Expand All @@ -95,7 +139,7 @@ public void ensureInputsPresent(
}

Flowable<TransferResult> uploads =
createUploadTasks(context, merkleTree, additionalInputs, allDigests, force)
createUploadTasks(context, merkleTree, additionalInputs, allDigests, force, reporter)
.flatMapPublisher(
result ->
Flowable.using(
Expand All @@ -113,10 +157,7 @@ public void ensureInputsPresent(
}));

try {
// Workaround for https://github.com/bazelbuild/bazel/issues/19513.
if (!mergeBulkTransfer(uploads).blockingAwait(options.remoteTimeout.getSeconds(), SECONDS)) {
throw new IOException("Timed out when waiting for uploads");
}
mergeBulkTransfer(uploads).blockingAwait();
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause != null) {
Expand All @@ -131,7 +172,8 @@ private ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context,
Digest digest,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs) {
Map<Digest, Message> additionalInputs,
Reporter reporter) {
Directory node = merkleTree.getDirectoryByDigest(digest);
if (node != null) {
return cacheProtocol.uploadBlob(context, digest, node.toByteString());
Expand All @@ -142,7 +184,20 @@ private ListenableFuture<Void> uploadBlob(
if (file.getBytes() != null) {
return cacheProtocol.uploadBlob(context, digest, file.getBytes());
}
return cacheProtocol.uploadFile(context, digest, file.getPath());

var path = checkNotNull(file.getPath());
try {
if (remotePathChecker.isRemote(context, path)) {
// If we get here, the remote input was determined to exist in the remote or disk cache at
// some point before action execution, but reported to be missing when querying the remote
// for missing action inputs; possibly because it was evicted in the interim.
reporter.post(new LostInputsEvent());
throw new CacheNotFoundException(digest, path.getPathString());
}
} catch (IOException e) {
return immediateFailedFuture(e);
}
return cacheProtocol.uploadFile(context, digest, path);
}

Message message = additionalInputs.get(digest);
Expand All @@ -169,14 +224,16 @@ private Single<List<UploadTask>> createUploadTasks(
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
Iterable<Digest> allDigests,
boolean force) {
boolean force,
Reporter reporter) {
return Single.using(
() -> Profiler.instance().profile("collect digests"),
ignored ->
Flowable.fromIterable(allDigests)
.flatMapMaybe(
digest ->
maybeCreateUploadTask(context, merkleTree, additionalInputs, digest, force))
maybeCreateUploadTask(
context, merkleTree, additionalInputs, digest, force, reporter))
.collect(toImmutableList()),
SilentCloseable::close);
}
Expand All @@ -186,7 +243,8 @@ private Maybe<UploadTask> maybeCreateUploadTask(
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
Digest digest,
boolean force) {
boolean force,
Reporter reporter) {
return Maybe.create(
emitter -> {
AsyncSubject<Void> completion = AsyncSubject.create();
Expand All @@ -211,7 +269,11 @@ private Maybe<UploadTask> maybeCreateUploadTask(
return toCompletable(
() ->
uploadBlob(
context, uploadTask.digest, merkleTree, additionalInputs),
context,
uploadTask.digest,
merkleTree,
additionalInputs,
reporter),
directExecutor());
}),
/* onAlreadyRunning= */ () -> emitter.onSuccess(uploadTask),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,8 @@ public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
.withWriteCachePolicy(CachePolicy.REMOTE_CACHE_ONLY), // Only upload to remote cache
merkleTree,
additionalInputs,
force);
force,
reporter);
} finally {
maybeReleaseRemoteActionBuildingSemaphore();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
buildRequestId,
invocationId,
remoteOptions.remoteInstanceName,
remoteOptions.remoteAcceptCached));
remoteOptions.remoteAcceptCached,
env.getReporter()));
} else {
if (enableDiskCache) {
try {
Expand Down Expand Up @@ -997,6 +998,7 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB
env.getSkyframeExecutor().getEvaluator(),
env.getBlazeWorkspace().getPersistentActionCache(),
leaseExtension);
env.getEventBus().register(leaseService);

remoteOutputService.setRemoteOutputChecker(remoteOutputChecker);
remoteOutputService.setActionInputFetcher(actionInputFetcher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.eventbus.Subscribe;
import com.google.devtools.build.lib.actions.Action;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputMap;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.ArtifactPathResolver;
Expand Down Expand Up @@ -168,11 +166,7 @@ public void finalizeBuild(boolean buildSuccessful) {
@Subscribe
public void onExecutionPhaseCompleteEvent(ExecutionPhaseCompleteEvent event) {
if (leaseService != null) {
var missingActionInputs = ImmutableSet.<ActionInput>of();
if (actionInputFetcher != null) {
missingActionInputs = actionInputFetcher.getMissingActionInputs();
}
leaseService.finalizeExecution(missingActionInputs);
leaseService.finalizeExecution();
}
}

Expand Down
Loading

0 comments on commit 921bde6

Please sign in to comment.