diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java index 319110de3f144d..66c34323e55d7a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java +++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java @@ -26,6 +26,7 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.flogger.GoogleLogger; @@ -46,6 +47,7 @@ import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.ProfilerTask; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.util.AsyncTaskCache; import com.google.devtools.build.lib.remote.util.TempPathGenerator; import com.google.devtools.build.lib.vfs.FileSymlinkLoopException; @@ -79,8 +81,6 @@ public abstract class AbstractActionInputPrefetcher implements ActionInputPrefet protected final Path execRoot; protected final RemoteOutputChecker remoteOutputChecker; - private final Set missingActionInputs = Sets.newConcurrentHashSet(); - private final ActionOutputDirectoryHelper outputDirectoryHelper; /** The state of a directory tracked by {@link DirectoryTracker}, as explained below. */ @@ -538,7 +538,8 @@ private Completable downloadFileNoCheckRx( .doOnError( error -> { if (error instanceof CacheNotFoundException) { - missingActionInputs.add(actionInput); + reporter.post( + new LostInputsEvent(ImmutableList.of(actionInput.getExecPath()))); } })); @@ -698,10 +699,6 @@ public void flushOutputTree() throws InterruptedException { downloadCache.awaitInProgressTasks(); } - public ImmutableSet getMissingActionInputs() { - return ImmutableSet.copyOf(missingActionInputs); - } - public RemoteOutputChecker getRemoteOutputChecker() { return remoteOutputChecker; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index fae70705a23fea..27e73543830552 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -96,6 +96,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", + "//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event", "//src/main/java/com/google/devtools/build/lib/remote/disk", "//src/main/java/com/google/devtools/build/lib/remote/downloader", "//src/main/java/com/google/devtools/build/lib/remote/grpc", @@ -232,6 +233,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/events", "//src/main/java/com/google/devtools/build/lib/profiler", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", + "//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event", "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", @@ -250,10 +252,12 @@ java_library( "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/actions:artifacts", "//src/main/java/com/google/devtools/build/lib/actions:file_metadata", + "//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event", "//src/main/java/com/google/devtools/build/lib/skyframe:action_execution_value", "//src/main/java/com/google/devtools/build/lib/skyframe:sky_functions", "//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value", "//src/main/java/com/google/devtools/build/skyframe", + "//third_party:guava", "//third_party:jsr305", ], ) diff --git a/src/main/java/com/google/devtools/build/lib/remote/LeaseService.java b/src/main/java/com/google/devtools/build/lib/remote/LeaseService.java index f9ef0e05530082..2058fa14c1d178 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/LeaseService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/LeaseService.java @@ -13,14 +13,15 @@ // limitations under the License. package com.google.devtools.build.lib.remote; -import com.google.devtools.build.lib.actions.ActionInput; +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.Subscribe; import com.google.devtools.build.lib.actions.FileArtifactValue; import com.google.devtools.build.lib.actions.cache.ActionCache; +import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.skyframe.ActionExecutionValue; import com.google.devtools.build.lib.skyframe.SkyFunctions; import com.google.devtools.build.lib.skyframe.TreeArtifactValue; import com.google.devtools.build.skyframe.MemoizingEvaluator; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; @@ -30,6 +31,7 @@ public class LeaseService { @Nullable private final ActionCache actionCache; private final AtomicBoolean leaseExtensionStarted = new AtomicBoolean(false); @Nullable LeaseExtension leaseExtension; + private final AtomicBoolean hasMissingActionInputs = new AtomicBoolean(false); public LeaseService( MemoizingEvaluator memoizingEvaluator, @@ -48,12 +50,18 @@ public void finalizeAction() { } } - public void finalizeExecution(Set missingActionInputs) { + @AllowConcurrentEvents + @Subscribe + public void onLostInputs(LostInputsEvent event) { + hasMissingActionInputs.set(true); + } + + public void finalizeExecution() { if (leaseExtension != null) { leaseExtension.stop(); } - if (!missingActionInputs.isEmpty()) { + if (hasMissingActionInputs.getAndSet(false)) { handleMissingInputs(); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index ad3271b6c7c3d8..0799016c6b9295 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -13,6 +13,7 @@ // limitations under the License. package com.google.devtools.build.lib.remote; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.util.concurrent.Futures.immediateFailedFuture; import static com.google.common.util.concurrent.Futures.immediateFuture; @@ -22,17 +23,19 @@ import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer; import static com.google.devtools.build.lib.remote.util.RxUtils.toTransferResult; import static java.lang.String.format; -import static java.util.concurrent.TimeUnit.SECONDS; import build.bazel.remote.execution.v2.Digest; import build.bazel.remote.execution.v2.Directory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.ListenableFuture; +import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.SilentCloseable; +import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; @@ -40,6 +43,7 @@ import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult; +import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.Message; import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Completable; @@ -59,13 +63,33 @@ /** A {@link RemoteCache} with additional functionality needed for remote execution. */ public class RemoteExecutionCache extends RemoteCache { + public interface RemotePathChecker { + boolean isRemote(Path path) throws IOException; + } + + private RemotePathChecker remotePathChecker = + new RemotePathChecker() { + @Override + public boolean isRemote(Path path) throws IOException { + var fs = path.getFileSystem(); + if (fs instanceof RemoteActionFileSystem) { + var remoteActionFileSystem = (RemoteActionFileSystem) fs; + return remoteActionFileSystem.isRemote(path); + } + return false; + } + }; + public RemoteExecutionCache( - RemoteCacheClient protocolImpl, - RemoteOptions options, - DigestUtil digestUtil) { + RemoteCacheClient protocolImpl, RemoteOptions options, DigestUtil digestUtil) { super(protocolImpl, options, digestUtil); } + @VisibleForTesting + void setRemotePathChecker(RemotePathChecker remotePathChecker) { + this.remotePathChecker = remotePathChecker; + } + /** * Ensures that the tree structure of the inputs, the input files themselves, and the command are * available in the remote cache, such that the tree can be reassembled and executed on another @@ -82,7 +106,9 @@ public void ensureInputsPresent( RemoteActionExecutionContext context, MerkleTree merkleTree, Map additionalInputs, - boolean force) + boolean force, + Path execRoot, + Reporter reporter) throws IOException, InterruptedException { Iterable merkleTreeAllDigests; try (SilentCloseable s = Profiler.instance().profile("merkleTree.getAllDigests()")) { @@ -95,7 +121,8 @@ public void ensureInputsPresent( } Flowable uploads = - createUploadTasks(context, merkleTree, additionalInputs, allDigests, force) + createUploadTasks( + context, merkleTree, additionalInputs, allDigests, force, execRoot, reporter) .flatMapPublisher( result -> Flowable.using( @@ -113,10 +140,7 @@ public void ensureInputsPresent( })); try { - // Workaround for https://github.com/bazelbuild/bazel/issues/19513. - if (!mergeBulkTransfer(uploads).blockingAwait(options.remoteTimeout.getSeconds(), SECONDS)) { - throw new IOException("Timed out when waiting for uploads"); - } + mergeBulkTransfer(uploads).blockingAwait(); } catch (RuntimeException e) { Throwable cause = e.getCause(); if (cause != null) { @@ -131,7 +155,9 @@ private ListenableFuture uploadBlob( RemoteActionExecutionContext context, Digest digest, MerkleTree merkleTree, - Map additionalInputs) { + Map additionalInputs, + Path execRoot, + Reporter reporter) { Directory node = merkleTree.getDirectoryByDigest(digest); if (node != null) { return cacheProtocol.uploadBlob(context, digest, node.toByteString()); @@ -142,7 +168,19 @@ private ListenableFuture uploadBlob( if (file.getBytes() != null) { return cacheProtocol.uploadBlob(context, digest, file.getBytes()); } - return cacheProtocol.uploadFile(context, digest, file.getPath()); + + var path = checkNotNull(file.getPath()); + try { + if (remotePathChecker.isRemote(path)) { + // A remote input is missing from remote cache, probably evicted by the remote server. + var execPath = path.relativeTo(execRoot); + reporter.post(new LostInputsEvent(ImmutableList.of(execPath))); + throw new IOException(format("%s (%s) was evicted from remote cache", execPath, digest)); + } + } catch (IOException e) { + return immediateFailedFuture(e); + } + return cacheProtocol.uploadFile(context, digest, path); } Message message = additionalInputs.get(digest); @@ -169,14 +207,23 @@ private Single> createUploadTasks( MerkleTree merkleTree, Map additionalInputs, Iterable allDigests, - boolean force) { + boolean force, + Path execRoot, + Reporter reporter) { return Single.using( () -> Profiler.instance().profile("collect digests"), ignored -> Flowable.fromIterable(allDigests) .flatMapMaybe( digest -> - maybeCreateUploadTask(context, merkleTree, additionalInputs, digest, force)) + maybeCreateUploadTask( + context, + merkleTree, + additionalInputs, + digest, + force, + execRoot, + reporter)) .collect(toImmutableList()), SilentCloseable::close); } @@ -186,7 +233,9 @@ private Maybe maybeCreateUploadTask( MerkleTree merkleTree, Map additionalInputs, Digest digest, - boolean force) { + boolean force, + Path execRoot, + Reporter reporter) { return Maybe.create( emitter -> { AsyncSubject completion = AsyncSubject.create(); @@ -211,7 +260,12 @@ private Maybe maybeCreateUploadTask( return toCompletable( () -> uploadBlob( - context, uploadTask.digest, merkleTree, additionalInputs), + context, + uploadTask.digest, + merkleTree, + additionalInputs, + execRoot, + reporter), directExecutor()); }), /* onAlreadyRunning= */ () -> emitter.onSuccess(uploadTask), diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java index c6cd98d66c2a16..6377c920b3e183 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java @@ -1503,7 +1503,9 @@ public void uploadInputsIfNotPresent(RemoteAction action, boolean force) .withWriteCachePolicy(CachePolicy.REMOTE_CACHE_ONLY), // Only upload to remote cache merkleTree, additionalInputs, - force); + force, + execRoot, + reporter); } finally { maybeReleaseRemoteActionBuildingSemaphore(); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 72dae2a78e118e..051c43a15542ae 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -635,7 +635,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { buildRequestId, invocationId, remoteOptions.remoteInstanceName, - remoteOptions.remoteAcceptCached)); + remoteOptions.remoteAcceptCached, + env::getExecRoot, + env.getReporter())); } else { if (enableDiskCache) { try { @@ -1053,6 +1055,7 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB env.getSkyframeExecutor().getEvaluator(), env.getBlazeWorkspace().getPersistentActionCache(), leaseExtension); + env.getEventBus().register(leaseService); if (outputService instanceof RemoteOutputService remoteOutputService) { remoteOutputService.setRemoteOutputChecker(remoteOutputChecker); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOutputService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOutputService.java index f922a38a1ab865..4b947d55acb5ed 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOutputService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOutputService.java @@ -168,11 +168,7 @@ public void finalizeBuild(boolean buildSuccessful) { @Subscribe public void onExecutionPhaseCompleteEvent(ExecutionPhaseCompleteEvent event) { if (leaseService != null) { - var missingActionInputs = ImmutableSet.of(); - if (actionInputFetcher != null) { - missingActionInputs = actionInputFetcher.getMissingActionInputs(); - } - leaseService.finalizeExecution(missingActionInputs); + leaseService.finalizeExecution(); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java index 8fb79e208f4cc7..5358b223fb0697 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java @@ -28,6 +28,8 @@ import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Maps; import com.google.devtools.build.lib.analysis.platform.PlatformUtils; +import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions.CredentialHelperOption; +import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.ProfilerTask; import com.google.devtools.build.lib.profiler.SilentCloseable; @@ -48,6 +50,7 @@ import java.time.Duration; import java.util.Map; import java.util.TreeSet; +import java.util.function.Supplier; /** The remote package's implementation of {@link RepositoryRemoteExecutor}. */ public class RemoteRepositoryRemoteExecutor implements RepositoryRemoteExecutor { @@ -60,6 +63,8 @@ public class RemoteRepositoryRemoteExecutor implements RepositoryRemoteExecutor private final String remoteInstanceName; private final boolean acceptCached; + private final Supplier execRootSupplier; + private final Reporter reporter; public RemoteRepositoryRemoteExecutor( RemoteExecutionCache remoteCache, @@ -68,7 +73,9 @@ public RemoteRepositoryRemoteExecutor( String buildRequestId, String commandId, String remoteInstanceName, - boolean acceptCached) { + boolean acceptCached, + Supplier execRootSupplier, + Reporter reporter) { this.remoteCache = remoteCache; this.remoteExecutor = remoteExecutor; this.digestUtil = digestUtil; @@ -76,6 +83,8 @@ public RemoteRepositoryRemoteExecutor( this.commandId = commandId; this.remoteInstanceName = remoteInstanceName; this.acceptCached = acceptCached; + this.execRootSupplier = execRootSupplier; + this.reporter = reporter; } private ExecutionResult downloadOutErr(RemoteActionExecutionContext context, ActionResult result) @@ -162,7 +171,13 @@ public ExecutionResult execute( additionalInputs.put(actionDigest, action); additionalInputs.put(commandHash, command); - remoteCache.ensureInputsPresent(context, merkleTree, additionalInputs, /* force= */ true); + remoteCache.ensureInputsPresent( + context, + merkleTree, + additionalInputs, + /* force= */ true, + execRootSupplier.get(), + reporter); } try (SilentCloseable c = diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java index a67cf859bef185..5f1f0b39d4d5fd 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java @@ -13,10 +13,13 @@ // limitations under the License. package com.google.devtools.build.lib.remote; +import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.runtime.RepositoryRemoteExecutor; import com.google.devtools.build.lib.runtime.RepositoryRemoteExecutorFactory; +import com.google.devtools.build.lib.vfs.Path; +import java.util.function.Supplier; /** Factory for {@link RemoteRepositoryRemoteExecutor}. */ class RemoteRepositoryRemoteExecutorFactory implements RepositoryRemoteExecutorFactory { @@ -29,6 +32,8 @@ class RemoteRepositoryRemoteExecutorFactory implements RepositoryRemoteExecutorF private final String remoteInstanceName; private final boolean acceptCached; + private final Supplier execRootSupplier; + private final Reporter reporter; RemoteRepositoryRemoteExecutorFactory( RemoteExecutionCache remoteExecutionCache, @@ -37,7 +42,9 @@ class RemoteRepositoryRemoteExecutorFactory implements RepositoryRemoteExecutorF String buildRequestId, String commandId, String remoteInstanceName, - boolean acceptCached) { + boolean acceptCached, + Supplier execRootSupplier, + Reporter reporter) { this.remoteExecutionCache = remoteExecutionCache; this.remoteExecutor = remoteExecutor; this.digestUtil = digestUtil; @@ -45,6 +52,8 @@ class RemoteRepositoryRemoteExecutorFactory implements RepositoryRemoteExecutorF this.commandId = commandId; this.remoteInstanceName = remoteInstanceName; this.acceptCached = acceptCached; + this.execRootSupplier = execRootSupplier; + this.reporter = reporter; } @Override @@ -56,6 +65,8 @@ public RepositoryRemoteExecutor create() { buildRequestId, commandId, remoteInstanceName, - acceptCached); + acceptCached, + execRootSupplier, + reporter); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD index e3ac9bb7432b02..ef87c08f39519b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD @@ -30,6 +30,16 @@ java_library( ], ) +java_library( + name = "lost_inputs_event", + srcs = ["LostInputsEvent.java"], + deps = [ + "//src/main/java/com/google/devtools/build/lib/events", + "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", + "//third_party:guava", + ], +) + java_library( name = "common", srcs = glob( @@ -37,6 +47,7 @@ java_library( exclude = [ "BulkTransferException.java", "CacheNotFoundException.java", + "LostInputsEvent.java", ], ), deps = [ diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/LostInputsEvent.java b/src/main/java/com/google/devtools/build/lib/remote/common/LostInputsEvent.java new file mode 100644 index 00000000000000..1d0697a9c9be91 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/common/LostInputsEvent.java @@ -0,0 +1,31 @@ +// Copyright 2024 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.common; + +import com.google.common.collect.ImmutableList; +import com.google.devtools.build.lib.events.ExtendedEventHandler.Postable; +import com.google.devtools.build.lib.vfs.PathFragment; + +/** An event that is sent when inputs to an action are lost from remote server. */ +public class LostInputsEvent implements Postable { + private final ImmutableList lostInputs; + + public LostInputsEvent(ImmutableList lostInputs) { + this.lostInputs = lostInputs; + } + + public ImmutableList getLostInputs() { + return lostInputs; + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java index b39f2cf6836091..c875236c853819 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java @@ -36,6 +36,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import com.google.common.hash.HashCode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -53,6 +56,7 @@ import com.google.devtools.build.lib.actions.FileArtifactValue; import com.google.devtools.build.lib.actions.FileArtifactValue.RemoteFileArtifactValue; import com.google.devtools.build.lib.actions.util.ActionsTestUtil; +import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.util.TempPathGenerator; import com.google.devtools.build.lib.skyframe.TreeArtifactValue; import com.google.devtools.build.lib.testing.vfs.SpiedFileSystem; @@ -71,6 +75,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -115,6 +120,7 @@ void setChmodDelay(Duration chmodDelay) { protected Path execRoot; protected ArtifactRoot artifactRoot; protected TempPathGenerator tempPathGenerator; + protected EventBus eventBus; protected ActionExecutionMetadata action; @@ -134,6 +140,8 @@ public void setUp() throws IOException { Path tempDir = fs.getPath("/tmp"); tempDir.createDirectoryAndParents(); tempPathGenerator = new TempPathGenerator(tempDir); + + eventBus = new EventBus(); } protected Artifact createRemoteArtifact( @@ -804,11 +812,20 @@ public void prefetchFiles_onInterrupt_deletePartialDownloadedFile() throws Excep } @Test - public void missingInputs_addedToList() { + public void missingInputs_sendLostInputsEvent() { Map metadata = new HashMap<>(); Map cas = new HashMap<>(); Artifact a = createRemoteArtifact("file", "hello world", metadata, /* cas= */ null); AbstractActionInputPrefetcher prefetcher = createPrefetcher(cas); + var lostInputsEvents = new ConcurrentLinkedQueue(); + eventBus.register( + new Object() { + @Subscribe + @AllowConcurrentEvents + public void onLostInputsEvent(LostInputsEvent event) { + lostInputsEvents.add(event); + } + }); assertThrows( Exception.class, @@ -817,7 +834,8 @@ public void missingInputs_addedToList() { prefetcher.prefetchFiles( action, metadata.keySet(), metadata::get, Priority.MEDIUM))); - assertThat(prefetcher.getMissingActionInputs()).contains(a); + assertThat(lostInputsEvents).hasSize(1); + assertThat(lostInputsEvents.peek().getLostInputs()).containsExactly(a.getExecPath()); } protected static void wait(ListenableFuture future) diff --git a/src/test/java/com/google/devtools/build/lib/remote/BUILD b/src/test/java/com/google/devtools/build/lib/remote/BUILD index b273a8eecc32e4..02dee734fc9088 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/BUILD @@ -32,6 +32,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/actions:artifacts", "//src/main/java/com/google/devtools/build/lib/actions:file_metadata", "//src/main/java/com/google/devtools/build/lib/remote:abstract_action_input_prefetcher", + "//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event", "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value", "//src/main/java/com/google/devtools/build/lib/testing/vfs:spied_filesystem", @@ -105,6 +106,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", + "//src/main/java/com/google/devtools/build/lib/remote/common:lost_inputs_event", "//src/main/java/com/google/devtools/build/lib/remote/disk", "//src/main/java/com/google/devtools/build/lib/remote/downloader", "//src/main/java/com/google/devtools/build/lib/remote/http", diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java index 4df6a0cd31eddb..13b40ec4db9f24 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java @@ -55,6 +55,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Maps; +import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -68,6 +69,7 @@ import com.google.devtools.build.lib.authandtls.GoogleAuthUtils; import com.google.devtools.build.lib.clock.JavaClock; import com.google.devtools.build.lib.events.NullEventHandler; +import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.exec.SpawnCheckingCacheEvent; import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext; import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff; @@ -366,7 +368,13 @@ public void onError(Throwable t) { }); // Upload all missing inputs (that is, the virtual action input from above) - client.ensureInputsPresent(context, merkleTree, ImmutableMap.of(), /*force=*/ true); + client.ensureInputsPresent( + context, + merkleTree, + ImmutableMap.of(), + /* force= */ true, + execRoot, + new Reporter(new EventBus())); } @Test diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java index 0f78c2607b582c..8fee1fc6dc8c35 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java @@ -72,7 +72,7 @@ public void setUp() throws IOException { protected AbstractActionInputPrefetcher createPrefetcher(Map cas) { RemoteCache remoteCache = newCache(options, digestUtil, cas); return new RemoteActionInputFetcher( - new Reporter(new EventBus()), + new Reporter(eventBus), "none", "none", remoteCache, diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java index 5a724a7f4a0296..980dcc1bc2a1d6 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java @@ -16,6 +16,7 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer; +import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -29,6 +30,9 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Maps; +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; @@ -43,8 +47,10 @@ import com.google.devtools.build.lib.clock.JavaClock; import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder; import com.google.devtools.build.lib.collect.nestedset.Order; +import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext; import com.google.devtools.build.lib.exec.util.FakeOwner; +import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; @@ -73,6 +79,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -80,7 +87,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -102,6 +108,8 @@ public class RemoteCacheTest { private FakeActionInputFileCache fakeFileCache; private ListeningScheduledExecutorService retryService; + private EventBus eventBus; + private Reporter reporter; @Before public void setUp() throws Exception { @@ -127,6 +135,8 @@ public void setUp() throws Exception { artifactRoot = ArtifactRoot.asDerivedRoot(execRoot, RootType.Output, "outputs"); artifactRoot.getRoot().asPath().createDirectoryAndParents(); retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + eventBus = new EventBus(); + reporter = new Reporter(eventBus); } @After @@ -321,6 +331,45 @@ public void upload_failedUploads_doNotDeduplicate() throws Exception { .isEmpty(); } + @Test + public void ensureInputsPresent_missingInputs_sendLostInputsEvent() throws Exception { + RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient()); + RemoteExecutionCache remoteCache = spy(newRemoteExecutionCache(cacheProtocol)); + remoteCache.setRemotePathChecker( + (path) -> path.relativeTo(execRoot).equals(PathFragment.create("foo"))); + var lostInputsEvents = new ConcurrentLinkedQueue(); + eventBus.register( + new Object() { + @Subscribe + @AllowConcurrentEvents + public void onLostInputs(LostInputsEvent event) { + lostInputsEvents.add(event); + } + }); + + Path path = execRoot.getRelative("foo"); + FileSystemUtils.writeContentAsLatin1(path, "bar"); + SortedMap inputs = new TreeMap<>(); + inputs.put(PathFragment.create("foo"), path); + MerkleTree merkleTree = MerkleTree.build(inputs, digestUtil); + path.delete(); + + assertThrows( + IOException.class, + () -> { + remoteCache.ensureInputsPresent( + remoteActionExecutionContext, + merkleTree, + ImmutableMap.of(), + false, + execRoot, + reporter); + }); + + assertThat(lostInputsEvents).hasSize(1); + assertThat(lostInputsEvents.peek().getLostInputs()).containsExactly(PathFragment.create("foo")); + } + @Test public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUploadTasks() throws Exception { @@ -349,7 +398,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl .when(cacheProtocol) .uploadFile(any(), any(), any()); - Path path = fs.getPath("/execroot/foo"); + Path path = execRoot.getRelative("foo"); FileSystemUtils.writeContentAsLatin1(path, "bar"); SortedMap inputs = new TreeMap<>(); inputs.put(PathFragment.create("foo"), path); @@ -361,7 +410,12 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl () -> { try { remoteCache.ensureInputsPresent( - remoteActionExecutionContext, merkleTree, ImmutableMap.of(), false); + remoteActionExecutionContext, + merkleTree, + ImmutableMap.of(), + false, + execRoot, + reporter); } catch (IOException | InterruptedException ignored) { // ignored } finally { @@ -424,7 +478,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl .when(cacheProtocol) .uploadFile(any(), any(), any()); - Path path = fs.getPath("/execroot/foo"); + Path path = execRoot.getRelative("foo"); FileSystemUtils.writeContentAsLatin1(path, "bar"); SortedMap inputs = new TreeMap<>(); inputs.put(PathFragment.create("foo"), path); @@ -436,7 +490,12 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl () -> { try { remoteCache.ensureInputsPresent( - remoteActionExecutionContext, merkleTree, ImmutableMap.of(), false); + remoteActionExecutionContext, + merkleTree, + ImmutableMap.of(), + false, + execRoot, + reporter); } catch (IOException ignored) { // ignored } catch (InterruptedException e) { @@ -506,11 +565,11 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl .when(cacheProtocol) .uploadFile(any(), any(), any()); - Path foo = fs.getPath("/execroot/foo"); + Path foo = execRoot.getRelative("foo"); FileSystemUtils.writeContentAsLatin1(foo, "foo"); - Path bar = fs.getPath("/execroot/bar"); + Path bar = execRoot.getRelative("bar"); FileSystemUtils.writeContentAsLatin1(bar, "bar"); - Path qux = fs.getPath("/execroot/qux"); + Path qux = execRoot.getRelative("qux"); FileSystemUtils.writeContentAsLatin1(qux, "qux"); SortedMap input1 = new TreeMap<>(); @@ -530,7 +589,12 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl () -> { try { remoteCache.ensureInputsPresent( - remoteActionExecutionContext, merkleTree1, ImmutableMap.of(), false); + remoteActionExecutionContext, + merkleTree1, + ImmutableMap.of(), + false, + execRoot, + reporter); } catch (IOException ignored) { // ignored } catch (InterruptedException e) { @@ -544,7 +608,12 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl () -> { try { remoteCache.ensureInputsPresent( - remoteActionExecutionContext, merkleTree2, ImmutableMap.of(), false); + remoteActionExecutionContext, + merkleTree2, + ImmutableMap.of(), + false, + execRoot, + reporter); } catch (InterruptedException | IOException ignored) { // ignored } finally { @@ -598,17 +667,22 @@ public void ensureInputsPresent_uploadFailed_propagateErrors() throws Exception .when(cacheProtocol) .uploadFile(any(), any(), any()); RemoteExecutionCache remoteCache = spy(newRemoteExecutionCache(cacheProtocol)); - Path path = fs.getPath("/execroot/foo"); + Path path = execRoot.getRelative("foo"); FileSystemUtils.writeContentAsLatin1(path, "bar"); SortedMap inputs = ImmutableSortedMap.of(PathFragment.create("foo"), path); MerkleTree merkleTree = MerkleTree.build(inputs, digestUtil); IOException e = - Assert.assertThrows( + assertThrows( IOException.class, () -> remoteCache.ensureInputsPresent( - remoteActionExecutionContext, merkleTree, ImmutableMap.of(), false)); + remoteActionExecutionContext, + merkleTree, + ImmutableMap.of(), + false, + execRoot, + reporter)); assertThat(e).hasMessageThat().contains("upload failed"); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java index 2a5a9a5b0d9bf9..aec7d206f02eab 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java @@ -17,6 +17,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -26,11 +27,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; +import com.google.common.eventbus.EventBus; +import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CachedActionResult; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.runtime.RepositoryRemoteExecutor.ExecutionResult; import com.google.devtools.build.lib.vfs.DigestHashFunction; +import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.SyscallCache; import com.google.protobuf.ByteString; import java.io.IOException; @@ -66,7 +70,9 @@ public void setup() { "none", "none", /* remoteInstanceName= */ "foo", - /* acceptCached= */ true); + /* acceptCached= */ true, + () -> mock(Path.class), + new Reporter(new EventBus())); } @Test diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java index 348b786f77e8b8..7d0902eb266e51 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java @@ -302,7 +302,7 @@ public void nonCachableSpawnsShouldNotBeCached_localFallback() throws Exception runner.exec(spawn, policy); verify(localRunner).exec(spawn, policy); - verify(cache).ensureInputsPresent(any(), any(), any(), anyBoolean()); + verify(cache).ensureInputsPresent(any(), any(), any(), anyBoolean(), any(), any()); verifyNoMoreInteractions(cache); }