Skip to content

Commit

Permalink
Avoid discarding SRE state for IO cause
Browse files Browse the repository at this point in the history
Unwrapping all StatusRuntimeExceptions in in ReferenceCountedChannel when caused by IOException will discard critical tracing and retriability. The Retrier evaluations may not see an SRE in the causal chain, and presume it is invariably an unretriable exception. In general, IOExceptions as SRE wrappers are unsuitable containers and are routinely misued either for identification (grpc aware status), or capture (handleInitError).

Partially addresses #18764 (retries will occur with SSL handshake timeout, but the actual connection will not be retried)

Closes #18836.

PiperOrigin-RevId: 546037698
Change-Id: I7f6efcb857c557aa97ad3df085fc032c8538eb9a
  • Loading branch information
werkt authored and copybara-github committed Jul 6, 2023
1 parent 70e66eb commit 6f737d4
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory.ChannelConnection;
import com.google.devtools.build.lib.remote.grpc.DynamicConnectionPool;
Expand All @@ -28,9 +29,12 @@
import io.netty.util.ReferenceCounted;
import io.reactivex.rxjava3.annotations.CheckReturnValue;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.core.SingleSource;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Function;
import java.io.IOException;
import java.util.concurrent.ExecutionException;

/**
* A wrapper around a {@link DynamicConnectionPool} exposing {@link Channel} and a reference count.
Expand Down Expand Up @@ -80,19 +84,48 @@ public <T> ListenableFuture<T> withChannelFuture(
}

public <T> T withChannelBlocking(Function<Channel, T> source)
throws IOException, InterruptedException {
throws ExecutionException, IOException, InterruptedException {
try {
return withChannel(channel -> Single.just(source.apply(channel))).blockingGet();
} catch (RuntimeException e) {
return withChannelBlockingGet(source);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
throwIfInstanceOf(cause, IOException.class);
throwIfInstanceOf(cause, InterruptedException.class);
}
Throwables.throwIfInstanceOf(cause, IOException.class);
Throwables.throwIfUnchecked(cause);
throw e;
}
}

// prevents rxjava silent possible wrap of RuntimeException and misinterpretation
private <T> T withChannelBlockingGet(Function<Channel, T> source)
throws ExecutionException, InterruptedException {
SettableFuture<T> future = SettableFuture.create();
withChannel(channel -> Single.just(source.apply(channel)))
.subscribe(
new SingleObserver<T>() {
@Override
public void onError(Throwable t) {
future.setException(t);
}

@Override
public void onSuccess(T t) {
future.set(t);
}

@Override
public void onSubscribe(Disposable d) {
future.addListener(
() -> {
if (future.isCancelled()) {
d.dispose();
}
},
directExecutor());
}
});
return future.get();
}

@CheckReturnValue
public <T> Single<T> withChannel(Function<Channel, ? extends SingleSource<? extends T>> source) {
return dynamicConnectionPool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ private static boolean shouldEnableRemoteDownloader(RemoteOptions options) {
return !Strings.isNullOrEmpty(options.remoteDownloader);
}

@Nullable
private static ServerCapabilities getAndVerifyServerCapabilities(
RemoteOptions remoteOptions,
ReferenceCountedChannel channel,
Expand Down Expand Up @@ -578,7 +579,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
digestUtil,
ServerCapabilitiesRequirement.CACHE);
}
} catch (IOException e) {
} catch (AbruptExitException e) {
throw e; // prevent abrupt interception
} catch (Exception e) {
String errorMessage =
"Failed to query remote execution capabilities: "
+ Utils.grpcAwareErrorMessage(e, verboseFailures);
Expand All @@ -603,12 +606,12 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
if (Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) {
try {
remoteBytestreamUriPrefix = cacheChannel.withChannelBlocking(Channel::authority);
} catch (IOException e) {
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
} catch (InterruptedException e) {
handleInitFailure(env, new IOException(e), Code.CACHE_INIT_FAILURE);
return;
}
if (!Strings.isNullOrEmpty(remoteOptions.remoteInstanceName)) {
remoteBytestreamUriPrefix += "/" + remoteOptions.remoteInstanceName;
Expand All @@ -630,7 +633,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteVerifyDownloads,
digestUtil,
cacheClient);
} catch (IOException e) {
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
Expand Down Expand Up @@ -695,7 +698,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteVerifyDownloads,
digestUtil,
cacheClient);
} catch (IOException e) {
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
Expand Down Expand Up @@ -741,7 +744,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}

private static void handleInitFailure(
CommandEnvironment env, IOException e, Code remoteExecutionCode) {
CommandEnvironment env, Exception e, Code remoteExecutionCode) {
env.getReporter().handle(Event.error(e.getMessage()));
env.getBlazeModuleEnvironment()
.exit(
Expand Down Expand Up @@ -884,7 +887,7 @@ private static void checkClientServerCompatibility(
}

@Override
public void afterCommand() throws AbruptExitException {
public void afterCommand() {
Preconditions.checkNotNull(blockWaitingModule, "blockWaitingModule must not be null");

// Some cleanup tasks must wait until every other BlazeModule's afterCommand() has run, as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -73,21 +72,17 @@ public ServerCapabilities get(String buildRequestId, String commandId)
RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "capabilities", null);
RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata);
try {
GetCapabilitiesRequest request =
instanceName == null
? GetCapabilitiesRequest.getDefaultInstance()
: GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build();
return retrier.execute(
() ->
channel.withChannelBlocking(
channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request)));
} catch (StatusRuntimeException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new IOException(e);
}
GetCapabilitiesRequest request =
instanceName == null
? GetCapabilitiesRequest.getDefaultInstance()
: GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build();
ServerCapabilities caps =
retrier.execute(
() ->
channel.withChannelBlocking(
channel ->
capabilitiesBlockingStub(context, channel).getCapabilities(request)));
return caps;
}

static class ClientServerCompatibilityStatus {
Expand Down

0 comments on commit 6f737d4

Please sign in to comment.