From b4343cc494842b6552348dc6300657a2a8b00128 Mon Sep 17 00:00:00 2001 From: Googler Date: Wed, 17 Jan 2024 05:02:00 -0800 Subject: [PATCH] Automated rollback of commit 915fb3e861dd28e16f42072101adf498242d26d0. *** Reason for rollback *** Might cause build to hang forever. b/320630578 *** Original change description *** Optimize prefetchInputs. Use a pre-allocated array to hold the intermediate transfers to avoid allocations. Replace some of RxJava code with Futures to avoid RxJava overheads. This improves the perfromance of prefetchInputs on a large set of inputs from ~400ms to ~16ms. Fixes #20555. Closes #20557. PiperOrigin-RevId: 599135847 Change-Id: Idae6a1c57e634d16091e31e097b16ca97a67e62d --- .../remote/AbstractActionInputPrefetcher.java | 77 ++++++++----------- .../devtools/build/lib/remote/util/Utils.java | 68 +++------------- .../remote/ActionInputPrefetcherTestBase.java | 3 +- 3 files changed, 46 insertions(+), 102 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java index 319110de3f144d..bbe91c809aa703 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java +++ b/src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java @@ -16,20 +16,17 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.util.concurrent.Futures.immediateFailedFuture; -import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable; import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture; +import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer; import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; -import static com.google.devtools.build.lib.remote.util.Utils.mergeBulkTransfer; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.flogger.GoogleLogger; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.actions.Action; import com.google.devtools.build.lib.actions.ActionExecutionMetadata; @@ -47,6 +44,8 @@ import com.google.devtools.build.lib.profiler.ProfilerTask; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.util.AsyncTaskCache; +import com.google.devtools.build.lib.remote.util.RxUtils; +import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult; import com.google.devtools.build.lib.remote.util.TempPathGenerator; import com.google.devtools.build.lib.vfs.FileSymlinkLoopException; import com.google.devtools.build.lib.vfs.FileSystemUtils; @@ -54,6 +53,8 @@ import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -282,10 +283,6 @@ public ListenableFuture prefetchFiles( files.add(input); } - if (files.isEmpty()) { - return immediateVoidFuture(); - } - // Collect the set of directories whose output permissions must be set at the end of this call. // This responsibility cannot lie with the downloading of an individual file, because multiple // files may be concurrently downloaded into the same directory within a single call to @@ -294,38 +291,30 @@ public ListenableFuture prefetchFiles( // it must still synchronize on the output permissions having been set. Set dirsWithOutputPermissions = Sets.newConcurrentHashSet(); - // Using plain futures to avoid RxJava overheads. - List> transfers = new ArrayList<>(files.size()); - try (var s = Profiler.instance().profile("compose prefetches")) { - for (var file : files) { - transfers.add( - prefetchFile(action, dirsWithOutputPermissions, metadataSupplier, file, priority)); - } - } - - ListenableFuture mergedTransfer; - try (var s = Profiler.instance().profile("mergeBulkTransfer")) { - mergedTransfer = mergeBulkTransfer(transfers); - } - - return Futures.transformAsync( - mergedTransfer, - unused -> { - try { - // Set output permissions on tree artifact subdirectories, matching the behavior of - // SkyframeActionExecutor#checkOutputs for artifacts produced by local actions. - for (Path dir : dirsWithOutputPermissions) { - directoryTracker.setOutputPermissions(dir); - } - } catch (IOException e) { - return immediateFailedFuture(e); - } - return immediateVoidFuture(); - }, - directExecutor()); + Completable prefetch = + mergeBulkTransfer( + Flowable.fromIterable(files) + .flatMapSingle( + input -> + prefetchFile( + action, + dirsWithOutputPermissions, + metadataSupplier, + input, + priority))) + .doOnComplete( + // Set output permissions on tree artifact subdirectories, matching the behavior of + // SkyframeActionExecutor#checkOutputs for artifacts produced by local actions. + () -> { + for (Path dir : dirsWithOutputPermissions) { + directoryTracker.setOutputPermissions(dir); + } + }); + + return toListenableFuture(prefetch); } - private ListenableFuture prefetchFile( + private Single prefetchFile( ActionExecutionMetadata action, Set dirsWithOutputPermissions, MetadataSupplier metadataSupplier, @@ -334,14 +323,14 @@ private ListenableFuture prefetchFile( try { if (input instanceof VirtualActionInput) { prefetchVirtualActionInput((VirtualActionInput) input); - return immediateVoidFuture(); + return Single.just(TransferResult.ok()); } PathFragment execPath = input.getExecPath(); FileArtifactValue metadata = metadataSupplier.getMetadata(input); if (metadata == null || !canDownloadFile(execRoot.getRelative(execPath), metadata)) { - return immediateVoidFuture(); + return Single.just(TransferResult.ok()); } @Nullable Symlink symlink = maybeGetSymlink(action, input, metadata, metadataSupplier); @@ -368,9 +357,11 @@ private ListenableFuture prefetchFile( result = result.andThen(plantSymlink(symlink)); } - return toListenableFuture(result); - } catch (IOException | InterruptedException e) { - return immediateFailedFuture(e); + return RxUtils.toTransferResult(result); + } catch (IOException e) { + return Single.just(TransferResult.error(e)); + } catch (InterruptedException e) { + return Single.just(TransferResult.interrupted()); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java index 61c4c4f4b9d9a4..3e8adc3a9de49c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java @@ -13,12 +13,8 @@ // limitations under the License. package com.google.devtools.build.lib.remote.util; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Throwables.getStackTraceAsString; -import static com.google.common.util.concurrent.Futures.immediateFailedFuture; -import static com.google.common.util.concurrent.Futures.immediateVoidFuture; -import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static java.util.stream.Collectors.joining; import build.bazel.remote.execution.v2.Action; @@ -33,6 +29,7 @@ import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ExecutionRequirements; import com.google.devtools.build.lib.actions.Spawn; @@ -420,11 +417,11 @@ public static ListenableFuture downloadAsActionResult( try { return Futures.immediateFuture(ActionResult.parseFrom(data.toByteArray())); } catch (InvalidProtocolBufferException e) { - return immediateFailedFuture(e); + return Futures.immediateFailedFuture(e); } }, - directExecutor()) - .catching(CacheNotFoundException.class, (e) -> null, directExecutor()); + MoreExecutors.directExecutor()) + .catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor()); } public static void verifyBlobContents(Digest expected, Digest actual) throws IOException { @@ -486,15 +483,15 @@ public ByteString getContents() { */ public static ListenableFuture refreshIfUnauthenticatedAsync( AsyncCallable call, CallCredentialsProvider callCredentialsProvider) { - checkNotNull(call); - checkNotNull(callCredentialsProvider); + Preconditions.checkNotNull(call); + Preconditions.checkNotNull(callCredentialsProvider); try { return Futures.catchingAsync( call.call(), Throwable.class, (e) -> refreshIfUnauthenticatedAsyncOnException(e, call, callCredentialsProvider), - directExecutor()); + MoreExecutors.directExecutor()); } catch (Throwable t) { return refreshIfUnauthenticatedAsyncOnException(t, call, callCredentialsProvider); } @@ -514,15 +511,15 @@ private static ListenableFuture refreshIfUnauthenticatedAsyncOnException( } } - return immediateFailedFuture(t); + return Futures.immediateFailedFuture(t); } /** Same as {@link #refreshIfUnauthenticatedAsync} but calling a synchronous code block. */ public static V refreshIfUnauthenticated( Callable call, CallCredentialsProvider callCredentialsProvider) throws IOException, InterruptedException { - checkNotNull(call); - checkNotNull(callCredentialsProvider); + Preconditions.checkNotNull(call); + Preconditions.checkNotNull(callCredentialsProvider); try { return call.call(); @@ -621,49 +618,4 @@ public static void waitForBulkTransfer( throw bulkTransferException; } } - - public static ListenableFuture mergeBulkTransfer( - Iterable> transfers) { - return Futures.whenAllComplete(transfers) - .callAsync( - () -> { - BulkTransferException bulkTransferException = null; - - for (var transfer : transfers) { - IOException error = null; - try { - transfer.get(); - } catch (CancellationException e) { - return immediateFailedFuture(new InterruptedException()); - } catch (InterruptedException e) { - return immediateFailedFuture(e); - } catch (ExecutionException e) { - var cause = e.getCause(); - if (cause instanceof InterruptedException) { - return immediateFailedFuture(cause); - } else if (cause instanceof IOException) { - error = (IOException) cause; - } else { - error = new IOException(cause); - } - } - - if (error == null) { - continue; - } - - if (bulkTransferException == null) { - bulkTransferException = new BulkTransferException(); - } - bulkTransferException.add(error); - } - - if (bulkTransferException != null) { - return immediateFailedFuture(bulkTransferException); - } - - return immediateVoidFuture(); - }, - directExecutor()); - } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java index b39f2cf6836091..76b949eaf6d798 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ActionInputPrefetcherTestBase.java @@ -71,6 +71,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -759,7 +760,7 @@ public void prefetchFile_interruptingMetadataSupplier_interruptsDownload() throw prefetcher.prefetchFiles( action, ImmutableList.of(a1), interruptedMetadataSupplier, Priority.MEDIUM); - assertThrows(InterruptedException.class, () -> getFromFuture(future)); + assertThrows(CancellationException.class, future::get); } @Test