From 921bde65f65dc16ac3c5290a3d2ca671fa490fff Mon Sep 17 00:00:00 2001 From: Chi Wang Date: Thu, 28 Mar 2024 04:19:48 -0700 Subject: [PATCH] Don't upload remote input to remote cache and when it's missing, treat it as remote cache eviction. Also revert the workaround for #19513. Fixes #21777. Potential fix for #21626 and #21778. Closes #21825. PiperOrigin-RevId: 619877088 Change-Id: Ib1204de8440b780e5a6ee6a563a87da08f196ca5 --- .../remote/AbstractActionInputPrefetcher.java | 9 +- .../google/devtools/build/lib/remote/BUILD | 5 +- .../build/lib/remote/LeaseService.java | 16 +++- .../lib/remote/RemoteExecutionCache.java | 94 +++++++++++++++---- .../lib/remote/RemoteExecutionService.java | 3 +- .../build/lib/remote/RemoteModule.java | 4 +- .../build/lib/remote/RemoteOutputService.java | 8 +- .../RemoteRepositoryRemoteExecutor.java | 9 +- ...RemoteRepositoryRemoteExecutorFactory.java | 9 +- .../devtools/build/lib/remote/common/BUILD | 9 ++ .../remote/common/CacheNotFoundException.java | 5 + .../lib/remote/common/LostInputsEvent.java | 22 +++++ .../remote/ActionInputPrefetcherTestBase.java | 23 ++++- .../google/devtools/build/lib/remote/BUILD | 2 + .../build/lib/remote/GrpcCacheClientTest.java | 5 +- .../remote/RemoteActionInputFetcherTest.java | 2 +- .../build/lib/remote/RemoteCacheTest.java | 69 +++++++++++--- .../RemoteRepositoryRemoteExecutorTest.java | 5 +- .../lib/remote/RemoteSpawnRunnerTest.java | 2 +- 19 files changed, 240 insertions(+), 61 deletions(-) create mode 100644 src/main/java/com/google/devtools/build/lib/remote/common/LostInputsEvent.java diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java index a81718008a27ca..5c8798164a2bef 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java +++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java @@ 
-46,6 +46,7 @@ import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.ProfilerTask; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.util.AsyncTaskCache; import com.google.devtools.build.lib.remote.util.TempPathGenerator; import com.google.devtools.build.lib.vfs.FileSymlinkLoopException; @@ -79,8 +80,6 @@ public abstract class AbstractActionInputPrefetcher implements ActionInputPrefet protected final Path execRoot; protected final RemoteOutputChecker remoteOutputChecker; - private final Set missingActionInputs = Sets.newConcurrentHashSet(); - private final ActionOutputDirectoryHelper outputDirectoryHelper; /** The state of a directory tracked by {@link DirectoryTracker}, as explained below. */ @@ -538,7 +537,7 @@ private Completable downloadFileNoCheckRx( .doOnError( error -> { if (error instanceof CacheNotFoundException) { - missingActionInputs.add(actionInput); + reporter.post(new LostInputsEvent()); } })); @@ -700,10 +699,6 @@ public void flushOutputTree() throws InterruptedException { downloadCache.awaitInProgressTasks(); } - public ImmutableSet getMissingActionInputs() { - return ImmutableSet.copyOf(missingActionInputs); - } - public RemoteOutputChecker getRemoteOutputChecker() { return remoteOutputChecker; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index 91fa5cde4a2729..fd600682ed8e35 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -96,6 +96,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", + 
"//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event", "//src/main/java/com/google/devtools/build/lib/remote/disk", "//src/main/java/com/google/devtools/build/lib/remote/downloader", "//src/main/java/com/google/devtools/build/lib/remote/grpc", @@ -229,6 +230,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/events", "//src/main/java/com/google/devtools/build/lib/profiler", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", + "//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event", "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", @@ -245,12 +247,13 @@ java_library( srcs = ["LeaseService.java"], deps = [ "//src/main/java/com/google/devtools/build/lib/actions", - "//src/main/java/com/google/devtools/build/lib/actions:artifacts", "//src/main/java/com/google/devtools/build/lib/actions:file_metadata", + "//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event", "//src/main/java/com/google/devtools/build/lib/skyframe:action_execution_value", "//src/main/java/com/google/devtools/build/lib/skyframe:sky_functions", "//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value", "//src/main/java/com/google/devtools/build/skyframe", + "//third_party:guava", "//third_party:jsr305", ], ) diff --git a/src/main/java/com/google/devtools/build/lib/remote/LeaseService.java b/src/main/java/com/google/devtools/build/lib/remote/LeaseService.java index f9ef0e05530082..2058fa14c1d178 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/LeaseService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/LeaseService.java @@ -13,14 +13,15 @@ // limitations under the License. 
package com.google.devtools.build.lib.remote; -import com.google.devtools.build.lib.actions.ActionInput; +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.Subscribe; import com.google.devtools.build.lib.actions.FileArtifactValue; import com.google.devtools.build.lib.actions.cache.ActionCache; +import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.skyframe.ActionExecutionValue; import com.google.devtools.build.lib.skyframe.SkyFunctions; import com.google.devtools.build.lib.skyframe.TreeArtifactValue; import com.google.devtools.build.skyframe.MemoizingEvaluator; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; @@ -30,6 +31,7 @@ public class LeaseService { @Nullable private final ActionCache actionCache; private final AtomicBoolean leaseExtensionStarted = new AtomicBoolean(false); @Nullable LeaseExtension leaseExtension; + private final AtomicBoolean hasMissingActionInputs = new AtomicBoolean(false); public LeaseService( MemoizingEvaluator memoizingEvaluator, @@ -48,12 +50,18 @@ public void finalizeAction() { } } - public void finalizeExecution(Set missingActionInputs) { + @AllowConcurrentEvents + @Subscribe + public void onLostInputs(LostInputsEvent event) { + hasMissingActionInputs.set(true); + } + + public void finalizeExecution() { if (leaseExtension != null) { leaseExtension.stop(); } - if (!missingActionInputs.isEmpty()) { + if (hasMissingActionInputs.getAndSet(false)) { handleMissingInputs(); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index ad3271b6c7c3d8..f9f7603af469ed 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -13,6 +13,7 @@ // limitations under the 
License. package com.google.devtools.build.lib.remote; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.util.concurrent.Futures.immediateFailedFuture; import static com.google.common.util.concurrent.Futures.immediateFuture; @@ -22,17 +23,21 @@ import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer; import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult; import static java.lang.String.format; -import static java.util.concurrent.TimeUnit.SECONDS; import build.bazel.remote.execution.v2.Digest; import build.bazel.remote.execution.v2.Directory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.flogger.GoogleLogger; import com.google.common.util.concurrent.ListenableFuture; +import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.SilentCloseable; +import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; @@ -40,6 +45,7 @@ import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult; +import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.Message; import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Completable; 
@@ -59,13 +65,50 @@ /** A {@link RemoteCache} with additional functionality needed for remote execution. */ public class RemoteExecutionCache extends RemoteCache { + private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); + + /** + * An interface used to check whether a given {@link Path} is stored in a remote or a disk cache. + */ + public interface RemotePathChecker { + boolean isRemote(RemoteActionExecutionContext context, Path path) throws IOException; + } + + private RemotePathChecker remotePathChecker = + new RemotePathChecker() { + @Override + public boolean isRemote(RemoteActionExecutionContext context, Path path) + throws IOException { + var fs = path.getFileSystem(); + if (fs instanceof RemoteActionFileSystem) { + var remoteActionFileSystem = (RemoteActionFileSystem) fs; + if (remoteActionFileSystem.isRemote(path)) { + if (context.getReadCachePolicy().allowDiskCache()) { + try (var inputStream = path.getInputStream()) { + // If the file exists in the disk cache, download it and continue the upload. 
+ return false; + } catch (IOException e) { + logger.atWarning().withCause(e).log( + "Failed to get input stream for %s", path.getPathString()); + } + } + return true; + } + } + return false; + } + }; + public RemoteExecutionCache( - RemoteCacheClient protocolImpl, - RemoteOptions options, - DigestUtil digestUtil) { + RemoteCacheClient protocolImpl, RemoteOptions options, DigestUtil digestUtil) { super(protocolImpl, options, digestUtil); } + @VisibleForTesting + void setRemotePathChecker(RemotePathChecker remotePathChecker) { + this.remotePathChecker = remotePathChecker; + } + /** * Ensures that the tree structure of the inputs, the input files themselves, and the command are * available in the remote cache, such that the tree can be reassembled and executed on another @@ -82,7 +125,8 @@ public void ensureInputsPresent( RemoteActionExecutionContext context, MerkleTree merkleTree, Map additionalInputs, - boolean force) + boolean force, + Reporter reporter) throws IOException, InterruptedException { Iterable merkleTreeAllDigests; try (SilentCloseable s = Profiler.instance().profile("merkleTree.getAllDigests()")) { @@ -95,7 +139,7 @@ public void ensureInputsPresent( } Flowable uploads = - createUploadTasks(context, merkleTree, additionalInputs, allDigests, force) + createUploadTasks(context, merkleTree, additionalInputs, allDigests, force, reporter) .flatMapPublisher( result -> Flowable.using( @@ -113,10 +157,7 @@ public void ensureInputsPresent( })); try { - // Workaround for https://github.com/bazelbuild/bazel/issues/19513. 
- if (!mergeBulkTransfer(uploads).blockingAwait(options.remoteTimeout.getSeconds(), SECONDS)) { - throw new IOException("Timed out when waiting for uploads"); - } + mergeBulkTransfer(uploads).blockingAwait(); } catch (RuntimeException e) { Throwable cause = e.getCause(); if (cause != null) { @@ -131,7 +172,8 @@ private ListenableFuture uploadBlob( RemoteActionExecutionContext context, Digest digest, MerkleTree merkleTree, - Map additionalInputs) { + Map additionalInputs, + Reporter reporter) { Directory node = merkleTree.getDirectoryByDigest(digest); if (node != null) { return cacheProtocol.uploadBlob(context, digest, node.toByteString()); @@ -142,7 +184,20 @@ private ListenableFuture uploadBlob( if (file.getBytes() != null) { return cacheProtocol.uploadBlob(context, digest, file.getBytes()); } - return cacheProtocol.uploadFile(context, digest, file.getPath()); + + var path = checkNotNull(file.getPath()); + try { + if (remotePathChecker.isRemote(context, path)) { + // If we get here, the remote input was determined to exist in the remote or disk cache at + // some point before action execution, but reported to be missing when querying the remote + // for missing action inputs; possibly because it was evicted in the interim. 
+ reporter.post(new LostInputsEvent()); + throw new CacheNotFoundException(digest, path.getPathString()); + } + } catch (IOException e) { + return immediateFailedFuture(e); + } + return cacheProtocol.uploadFile(context, digest, path); } Message message = additionalInputs.get(digest); @@ -169,14 +224,16 @@ private Single> createUploadTasks( MerkleTree merkleTree, Map additionalInputs, Iterable allDigests, - boolean force) { + boolean force, + Reporter reporter) { return Single.using( () -> Profiler.instance().profile("collect digests"), ignored -> Flowable.fromIterable(allDigests) .flatMapMaybe( digest -> - maybeCreateUploadTask(context, merkleTree, additionalInputs, digest, force)) + maybeCreateUploadTask( + context, merkleTree, additionalInputs, digest, force, reporter)) .collect(toImmutableList()), SilentCloseable::close); } @@ -186,7 +243,8 @@ private Maybe maybeCreateUploadTask( MerkleTree merkleTree, Map additionalInputs, Digest digest, - boolean force) { + boolean force, + Reporter reporter) { return Maybe.create( emitter -> { AsyncSubject completion = AsyncSubject.create(); @@ -211,7 +269,11 @@ private Maybe maybeCreateUploadTask( return toCompletable( () -> uploadBlob( - context, uploadTask.digest, merkleTree, additionalInputs), + context, + uploadTask.digest, + merkleTree, + additionalInputs, + reporter), directExecutor()); }), /* onAlreadyRunning= */ () -> emitter.onSuccess(uploadTask), diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java index 149f9c752c1e01..1a6f945beec46a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java @@ -1477,7 +1477,8 @@ public void uploadInputsIfNotPresent(RemoteAction action, boolean force) .withWriteCachePolicy(CachePolicy.REMOTE_CACHE_ONLY), // Only upload to remote cache merkleTree, 
additionalInputs, - force); + force, + reporter); } finally { maybeReleaseRemoteActionBuildingSemaphore(); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index f23d9870f34b8b..e5d108b030a0e3 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -586,7 +586,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { buildRequestId, invocationId, remoteOptions.remoteInstanceName, - remoteOptions.remoteAcceptCached)); + remoteOptions.remoteAcceptCached, + env.getReporter())); } else { if (enableDiskCache) { try { @@ -997,6 +998,7 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB env.getSkyframeExecutor().getEvaluator(), env.getBlazeWorkspace().getPersistentActionCache(), leaseExtension); + env.getEventBus().register(leaseService); remoteOutputService.setRemoteOutputChecker(remoteOutputChecker); remoteOutputService.setActionInputFetcher(actionInputFetcher); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOutputService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOutputService.java index ab19f071381b67..7ab01b673a802c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOutputService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOutputService.java @@ -19,11 +19,9 @@ import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.eventbus.Subscribe; import com.google.devtools.build.lib.actions.Action; import com.google.devtools.build.lib.actions.ActionExecutionMetadata; -import com.google.devtools.build.lib.actions.ActionInput; import 
com.google.devtools.build.lib.actions.ActionInputMap; import com.google.devtools.build.lib.actions.Artifact; import com.google.devtools.build.lib.actions.ArtifactPathResolver; @@ -168,11 +166,7 @@ public void finalizeBuild(boolean buildSuccessful) { @Subscribe public void onExecutionPhaseCompleteEvent(ExecutionPhaseCompleteEvent event) { if (leaseService != null) { - var missingActionInputs = ImmutableSet.of(); - if (actionInputFetcher != null) { - missingActionInputs = actionInputFetcher.getMissingActionInputs(); - } - leaseService.finalizeExecution(missingActionInputs); + leaseService.finalizeExecution(); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java index 8fb79e208f4cc7..b30306fca1c360 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Maps; import com.google.devtools.build.lib.analysis.platform.PlatformUtils; +import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.ProfilerTask; import com.google.devtools.build.lib.profiler.SilentCloseable; @@ -60,6 +61,7 @@ public class RemoteRepositoryRemoteExecutor implements RepositoryRemoteExecutor private final String remoteInstanceName; private final boolean acceptCached; + private final Reporter reporter; public RemoteRepositoryRemoteExecutor( RemoteExecutionCache remoteCache, @@ -68,7 +70,8 @@ public RemoteRepositoryRemoteExecutor( String buildRequestId, String commandId, String remoteInstanceName, - boolean acceptCached) { + boolean acceptCached, + Reporter reporter) { this.remoteCache = remoteCache; this.remoteExecutor = 
remoteExecutor; this.digestUtil = digestUtil; @@ -76,6 +79,7 @@ public RemoteRepositoryRemoteExecutor( this.commandId = commandId; this.remoteInstanceName = remoteInstanceName; this.acceptCached = acceptCached; + this.reporter = reporter; } private ExecutionResult downloadOutErr(RemoteActionExecutionContext context, ActionResult result) @@ -162,7 +166,8 @@ public ExecutionResult execute( additionalInputs.put(actionDigest, action); additionalInputs.put(commandHash, command); - remoteCache.ensureInputsPresent(context, merkleTree, additionalInputs, /* force= */ true); + remoteCache.ensureInputsPresent( + context, merkleTree, additionalInputs, /* force= */ true, reporter); } try (SilentCloseable c = diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java index a67cf859bef185..bb5e80a12e7c15 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java @@ -13,6 +13,7 @@ // limitations under the License. 
package com.google.devtools.build.lib.remote; +import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.runtime.RepositoryRemoteExecutor; @@ -29,6 +30,7 @@ class RemoteRepositoryRemoteExecutorFactory implements RepositoryRemoteExecutorF private final String remoteInstanceName; private final boolean acceptCached; + private final Reporter reporter; RemoteRepositoryRemoteExecutorFactory( RemoteExecutionCache remoteExecutionCache, @@ -37,7 +39,8 @@ class RemoteRepositoryRemoteExecutorFactory implements RepositoryRemoteExecutorF String buildRequestId, String commandId, String remoteInstanceName, - boolean acceptCached) { + boolean acceptCached, + Reporter reporter) { this.remoteExecutionCache = remoteExecutionCache; this.remoteExecutor = remoteExecutor; this.digestUtil = digestUtil; @@ -45,6 +48,7 @@ class RemoteRepositoryRemoteExecutorFactory implements RepositoryRemoteExecutorF this.commandId = commandId; this.remoteInstanceName = remoteInstanceName; this.acceptCached = acceptCached; + this.reporter = reporter; } @Override @@ -56,6 +60,7 @@ public RepositoryRemoteExecutor create() { buildRequestId, commandId, remoteInstanceName, - acceptCached); + acceptCached, + reporter); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD index e3ac9bb7432b02..6b8390e8a06fec 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD @@ -30,6 +30,14 @@ java_library( ], ) +java_library( + name = "lost_inputs_event", + srcs = ["LostInputsEvent.java"], + deps = [ + "//src/main/java/com/google/devtools/build/lib/events", + ], +) + java_library( name = "common", srcs = glob( @@ -37,6 +45,7 @@ java_library( exclude = [ 
"BulkTransferException.java", "CacheNotFoundException.java", + "LostInputsEvent.java", ], ), deps = [ diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/CacheNotFoundException.java b/src/main/java/com/google/devtools/build/lib/remote/common/CacheNotFoundException.java index b48ccb2b23fa9b..caa8a41a676a8c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/CacheNotFoundException.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/CacheNotFoundException.java @@ -31,6 +31,11 @@ public CacheNotFoundException(Digest missingDigest) { this.missingDigest = missingDigest; } + public CacheNotFoundException(Digest missingDigest, String filename) { + this.missingDigest = missingDigest; + this.filename = filename; + } + public void setFilename(@Nullable String filename) { this.filename = filename; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/LostInputsEvent.java b/src/main/java/com/google/devtools/build/lib/remote/common/LostInputsEvent.java new file mode 100644 index 00000000000000..b770ff263c5aed --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/common/LostInputsEvent.java @@ -0,0 +1,22 @@ +// Copyright 2024 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package com.google.devtools.build.lib.remote.common; + +import com.google.devtools.build.lib.events.ExtendedEventHandler.Postable; + +/** + * An event sent when an input to an action was previously determined to exist remotely, but has + * since been evicted. + */ +public class LostInputsEvent implements Postable {} diff --git a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java index b39f2cf6836091..78f260ecb13763 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java @@ -36,6 +36,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import com.google.common.hash.HashCode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -53,6 +56,7 @@ import com.google.devtools.build.lib.actions.FileArtifactValue; import com.google.devtools.build.lib.actions.FileArtifactValue.RemoteFileArtifactValue; import com.google.devtools.build.lib.actions.util.ActionsTestUtil; +import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.util.TempPathGenerator; import com.google.devtools.build.lib.skyframe.TreeArtifactValue; import com.google.devtools.build.lib.testing.vfs.SpiedFileSystem; @@ -71,6 +75,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -115,6 +120,7 
@@ void setChmodDelay(Duration chmodDelay) { protected Path execRoot; protected ArtifactRoot artifactRoot; protected TempPathGenerator tempPathGenerator; + protected EventBus eventBus; protected ActionExecutionMetadata action; @@ -134,6 +140,8 @@ public void setUp() throws IOException { Path tempDir = fs.getPath("/tmp"); tempDir.createDirectoryAndParents(); tempPathGenerator = new TempPathGenerator(tempDir); + + eventBus = new EventBus(); } protected Artifact createRemoteArtifact( @@ -804,11 +812,20 @@ public void prefetchFiles_onInterrupt_deletePartialDownloadedFile() throws Excep } @Test - public void missingInputs_addedToList() { + public void missingInputs_sendLostInputsEvent() { Map metadata = new HashMap<>(); Map cas = new HashMap<>(); - Artifact a = createRemoteArtifact("file", "hello world", metadata, /* cas= */ null); + var unused = createRemoteArtifact("file", "hello world", metadata, /* cas= */ null); AbstractActionInputPrefetcher prefetcher = createPrefetcher(cas); + var lostInputsEvents = new ConcurrentLinkedQueue<LostInputsEvent>(); + eventBus.register( + new Object() { + @Subscribe + @AllowConcurrentEvents + public void onLostInputsEvent(LostInputsEvent event) { + lostInputsEvents.add(event); + } + }); assertThrows( Exception.class, @@ -817,7 +834,7 @@ public void missingInputs_addedToList() { prefetcher.prefetchFiles( action, metadata.keySet(), metadata::get, Priority.MEDIUM))); - assertThat(prefetcher.getMissingActionInputs()).contains(a); + assertThat(lostInputsEvents).hasSize(1); } protected static void wait(ListenableFuture future) diff --git a/src/test/java/com/google/devtools/build/lib/remote/BUILD b/src/test/java/com/google/devtools/build/lib/remote/BUILD index 4440a2c67e6a37..00de4ab632c360 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/BUILD @@ -32,6 +32,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/actions:artifacts", 
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata", "//src/main/java/com/google/devtools/build/lib/remote:abstract_action_input_prefetcher", + "//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event", "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value", "//src/main/java/com/google/devtools/build/lib/testing/vfs:spied_filesystem", @@ -104,6 +105,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", + "//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event", "//src/main/java/com/google/devtools/build/lib/remote/disk", "//src/main/java/com/google/devtools/build/lib/remote/downloader", "//src/main/java/com/google/devtools/build/lib/remote/http", diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java index 4df6a0cd31eddb..476346bc01fada 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java @@ -55,6 +55,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Maps; +import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -68,6 +69,7 @@ import com.google.devtools.build.lib.authandtls.GoogleAuthUtils; import com.google.devtools.build.lib.clock.JavaClock; import com.google.devtools.build.lib.events.NullEventHandler; +import 
com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.exec.SpawnCheckingCacheEvent; import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext; import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff; @@ -366,7 +368,8 @@ public void onError(Throwable t) { }); // Upload all missing inputs (that is, the virtual action input from above) - client.ensureInputsPresent(context, merkleTree, ImmutableMap.of(), /*force=*/ true); + client.ensureInputsPresent( + context, merkleTree, ImmutableMap.of(), /* force= */ true, new Reporter(new EventBus())); } @Test diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java index 0f78c2607b582c..8fee1fc6dc8c35 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java @@ -72,7 +72,7 @@ public void setUp() throws IOException { protected AbstractActionInputPrefetcher createPrefetcher(Map cas) { RemoteCache remoteCache = newCache(options, digestUtil, cas); return new RemoteActionInputFetcher( - new Reporter(new EventBus()), + new Reporter(eventBus), "none", "none", remoteCache, diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java index 5a724a7f4a0296..6fd7d0acdea5a9 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java @@ -16,6 +16,7 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer; +import static org.junit.Assert.assertThrows; import static 
org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -29,6 +30,9 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Maps; +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; @@ -43,8 +47,10 @@ import com.google.devtools.build.lib.clock.JavaClock; import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder; import com.google.devtools.build.lib.collect.nestedset.Order; +import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext; import com.google.devtools.build.lib.exec.util.FakeOwner; +import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; @@ -73,6 +79,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -80,7 +87,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -102,6 +108,8 @@ public class RemoteCacheTest { private FakeActionInputFileCache fakeFileCache; private ListeningScheduledExecutorService 
retryService; + private EventBus eventBus; + private Reporter reporter; @Before public void setUp() throws Exception { @@ -127,6 +135,8 @@ public void setUp() throws Exception { artifactRoot = ArtifactRoot.asDerivedRoot(execRoot, RootType.Output, "outputs"); artifactRoot.getRoot().asPath().createDirectoryAndParents(); retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + eventBus = new EventBus(); + reporter = new Reporter(eventBus); } @After @@ -321,6 +331,39 @@ public void upload_failedUploads_doNotDeduplicate() throws Exception { .isEmpty(); } + @Test + public void ensureInputsPresent_missingInputs_sendLostInputsEvent() throws Exception { + RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient()); + RemoteExecutionCache remoteCache = spy(newRemoteExecutionCache(cacheProtocol)); + remoteCache.setRemotePathChecker( + (context, path) -> path.relativeTo(execRoot).equals(PathFragment.create("foo"))); + var lostInputsEvents = new ConcurrentLinkedQueue<LostInputsEvent>(); + eventBus.register( + new Object() { + @Subscribe + @AllowConcurrentEvents + public void onLostInputs(LostInputsEvent event) { + lostInputsEvents.add(event); + } + }); + + Path path = execRoot.getRelative("foo"); + FileSystemUtils.writeContentAsLatin1(path, "bar"); + SortedMap<PathFragment, Path> inputs = new TreeMap<>(); + inputs.put(PathFragment.create("foo"), path); + MerkleTree merkleTree = MerkleTree.build(inputs, digestUtil); + path.delete(); + + assertThrows( + IOException.class, + () -> { + remoteCache.ensureInputsPresent( + remoteActionExecutionContext, merkleTree, ImmutableMap.of(), false, reporter); + }); + + assertThat(lostInputsEvents).hasSize(1); + } + @Test public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUploadTasks() throws Exception { @@ -349,7 +392,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl .when(cacheProtocol) .uploadFile(any(), any(), any()); - Path path = fs.getPath("/execroot/foo"); + Path path = 
execRoot.getRelative("foo"); FileSystemUtils.writeContentAsLatin1(path, "bar"); SortedMap<PathFragment, Path> inputs = new TreeMap<>(); inputs.put(PathFragment.create("foo"), path); @@ -361,7 +404,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl () -> { try { remoteCache.ensureInputsPresent( - remoteActionExecutionContext, merkleTree, ImmutableMap.of(), false); + remoteActionExecutionContext, merkleTree, ImmutableMap.of(), false, reporter); } catch (IOException | InterruptedException ignored) { // ignored } finally { @@ -424,7 +467,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl .when(cacheProtocol) .uploadFile(any(), any(), any()); - Path path = fs.getPath("/execroot/foo"); + Path path = execRoot.getRelative("foo"); FileSystemUtils.writeContentAsLatin1(path, "bar"); SortedMap<PathFragment, Path> inputs = new TreeMap<>(); inputs.put(PathFragment.create("foo"), path); @@ -436,7 +479,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl () -> { try { remoteCache.ensureInputsPresent( - remoteActionExecutionContext, merkleTree, ImmutableMap.of(), false); + remoteActionExecutionContext, merkleTree, ImmutableMap.of(), false, reporter); } catch (IOException ignored) { // ignored } catch (InterruptedException e) { @@ -506,11 +549,11 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl .when(cacheProtocol) .uploadFile(any(), any(), any()); - Path foo = fs.getPath("/execroot/foo"); + Path foo = execRoot.getRelative("foo"); FileSystemUtils.writeContentAsLatin1(foo, "foo"); - Path bar = fs.getPath("/execroot/bar"); + Path bar = execRoot.getRelative("bar"); FileSystemUtils.writeContentAsLatin1(bar, "bar"); - Path qux = fs.getPath("/execroot/qux"); + Path qux = execRoot.getRelative("qux"); FileSystemUtils.writeContentAsLatin1(qux, "qux"); SortedMap<PathFragment, Path> input1 = new TreeMap<>(); @@ -530,7 +573,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl () -> { 
try { remoteCache.ensureInputsPresent( - remoteActionExecutionContext, merkleTree1, ImmutableMap.of(), false); + remoteActionExecutionContext, merkleTree1, ImmutableMap.of(), false, reporter); } catch (IOException ignored) { // ignored } catch (InterruptedException e) { @@ -544,7 +587,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl () -> { try { remoteCache.ensureInputsPresent( - remoteActionExecutionContext, merkleTree2, ImmutableMap.of(), false); + remoteActionExecutionContext, merkleTree2, ImmutableMap.of(), false, reporter); } catch (InterruptedException | IOException ignored) { // ignored } finally { @@ -598,17 +641,17 @@ public void ensureInputsPresent_uploadFailed_propagateErrors() throws Exception .when(cacheProtocol) .uploadFile(any(), any(), any()); RemoteExecutionCache remoteCache = spy(newRemoteExecutionCache(cacheProtocol)); - Path path = fs.getPath("/execroot/foo"); + Path path = execRoot.getRelative("foo"); FileSystemUtils.writeContentAsLatin1(path, "bar"); SortedMap<PathFragment, Path> inputs = ImmutableSortedMap.of(PathFragment.create("foo"), path); MerkleTree merkleTree = MerkleTree.build(inputs, digestUtil); IOException e = - Assert.assertThrows( + assertThrows( IOException.class, () -> remoteCache.ensureInputsPresent( - remoteActionExecutionContext, merkleTree, ImmutableMap.of(), false)); + remoteActionExecutionContext, merkleTree, ImmutableMap.of(), false, reporter)); assertThat(e).hasMessageThat().contains("upload failed"); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java index 2a95c1349db099..bca097555e8a82 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java @@ -26,6 +26,8 @@ import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; +import com.google.common.eventbus.EventBus; +import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CachedActionResult; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.util.DigestUtil; @@ -66,7 +68,8 @@ public void setup() { "none", "none", /* remoteInstanceName= */ "foo", - /* acceptCached= */ true); + /* acceptCached= */ true, + new Reporter(new EventBus())); } @Test diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java index 7884ad37e85d74..51e916d5bdc866 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java @@ -301,7 +301,7 @@ public void nonCachableSpawnsShouldNotBeCached_localFallback() throws Exception runner.exec(spawn, policy); verify(localRunner).exec(spawn, policy); - verify(cache).ensureInputsPresent(any(), any(), any(), anyBoolean()); + verify(cache).ensureInputsPresent(any(), any(), any(), anyBoolean(), any()); verifyNoMoreInteractions(cache); }