diff --git a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java index 8e4e95afd56810..bc2f6ec59940d9 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java @@ -13,10 +13,11 @@ // limitations under the License. package com.google.devtools.build.lib.remote; -import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory.ChannelConnection; import com.google.devtools.build.lib.remote.grpc.DynamicConnectionPool; @@ -28,9 +29,12 @@ import io.netty.util.ReferenceCounted; import io.reactivex.rxjava3.annotations.CheckReturnValue; import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; import io.reactivex.rxjava3.core.SingleSource; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.functions.Function; import java.io.IOException; +import java.util.concurrent.ExecutionException; /** * A wrapper around a {@link DynamicConnectionPool} exposing {@link Channel} and a reference count. @@ -80,19 +84,48 @@ public ListenableFuture withChannelFuture( } public T withChannelBlocking(Function source) - throws IOException, InterruptedException { + throws ExecutionException, IOException, InterruptedException { try { - return withChannel(channel -> Single.just(source.apply(channel))).blockingGet(); - } catch (RuntimeException e) { + return withChannelBlockingGet(source); + } catch (ExecutionException e) { Throwable cause = e.getCause(); - if (cause != null) { - throwIfInstanceOf(cause, IOException.class); - throwIfInstanceOf(cause, InterruptedException.class); - } + Throwables.throwIfInstanceOf(cause, IOException.class); + Throwables.throwIfUnchecked(cause); throw e; } } + // prevents rxjava silent possible wrap of RuntimeException and misinterpretation + private T withChannelBlockingGet(Function source) + throws ExecutionException, InterruptedException { + SettableFuture future = SettableFuture.create(); + withChannel(channel -> Single.just(source.apply(channel))) + .subscribe( + new SingleObserver() { + @Override + public void onError(Throwable t) { + future.setException(t); + } + + @Override + public void onSuccess(T t) { + future.set(t); + } + + @Override + public void onSubscribe(Disposable d) { + future.addListener( + () -> { + if (future.isCancelled()) { + d.dispose(); + } + }, + directExecutor()); + } + }); + return future.get(); + } + @CheckReturnValue public Single withChannel(Function> source) { return dynamicConnectionPool diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 18283b6b9c2774..18db228f3e254b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -188,6 +188,7 @@ private static boolean shouldEnableRemoteDownloader(RemoteOptions options) { return !Strings.isNullOrEmpty(options.remoteDownloader); } + @Nullable private static ServerCapabilities getAndVerifyServerCapabilities( RemoteOptions remoteOptions, ReferenceCountedChannel channel, @@ -535,7 +536,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { digestUtil, ServerCapabilitiesRequirement.CACHE); } - } catch (IOException e) { + } catch (AbruptExitException e) { + throw e; // prevent abrupt interception + } catch (Exception e) { String errorMessage = "Failed to query remote execution capabilities: " + Utils.grpcAwareErrorMessage(e, verboseFailures); @@ -560,12 +563,12 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { if (Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) { try { remoteBytestreamUriPrefix = cacheChannel.withChannelBlocking(Channel::authority); - } catch (IOException e) { + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; - } catch (InterruptedException e) { - handleInitFailure(env, new IOException(e), Code.CACHE_INIT_FAILURE); - return; } if (!Strings.isNullOrEmpty(remoteOptions.remoteInstanceName)) { remoteBytestreamUriPrefix += "/" + remoteOptions.remoteInstanceName; @@ -588,7 +591,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { !remoteOptions.remoteOutputsMode.downloadAllOutputs(), digestUtil, cacheClient); - } catch (IOException e) { + } catch (Exception e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; } @@ -653,7 +656,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { !remoteOptions.remoteOutputsMode.downloadAllOutputs(), digestUtil, cacheClient); - } catch (IOException e) { + } catch (Exception e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; } @@ -699,7 +702,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { } private static void handleInitFailure( - CommandEnvironment env, IOException e, Code remoteExecutionCode) { + CommandEnvironment env, Exception e, Code remoteExecutionCode) { env.getReporter().handle(Event.error(e.getMessage())); env.getBlazeModuleEnvironment() .exit( @@ -794,7 +797,7 @@ private static void checkClientServerCompatibility( } @Override - public void afterCommand() throws AbruptExitException { + public void afterCommand() { Preconditions.checkNotNull(blockWaitingModule, "blockWaitingModule must not be null"); // Some cleanup tasks must wait until every other BlazeModule's afterCommand() has run, as diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java index 0afb9894217f3c..9a21977d2becc5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java @@ -31,7 +31,6 @@ import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import io.grpc.CallCredentials; import io.grpc.Channel; -import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; @@ -73,21 +72,17 @@ public ServerCapabilities get(String buildRequestId, String commandId) RequestMetadata metadata = TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "capabilities", null); RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata); - try { - GetCapabilitiesRequest request = - instanceName == null - ? GetCapabilitiesRequest.getDefaultInstance() - : GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build(); - return retrier.execute( - () -> - channel.withChannelBlocking( - channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request))); - } catch (StatusRuntimeException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } - throw new IOException(e); - } + GetCapabilitiesRequest request = + instanceName == null + ? GetCapabilitiesRequest.getDefaultInstance() + : GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build(); + ServerCapabilities caps = + retrier.execute( + () -> + channel.withChannelBlocking( + channel -> + capabilitiesBlockingStub(context, channel).getCapabilities(request))); + return caps; } static class ClientServerCompatibilityStatus {