diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java index 5aabd976085..a55b05ba568 100644 --- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import io.grpc.Context; +import io.grpc.SynchronizationContext; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ObjectPool; @@ -32,6 +33,8 @@ import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; @@ -42,19 +45,37 @@ */ @ThreadSafe final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { - + private static final Logger log = Logger.getLogger(SharedXdsClientPoolProvider.class.getName()); private final Bootstrapper bootstrapper; private final Object lock = new Object(); private final AtomicReference> bootstrapOverride = new AtomicReference<>(); private volatile ObjectPool xdsClientPool; + private final SynchronizationContext syncContext; SharedXdsClientPoolProvider() { this(new BootstrapperImpl()); } + SharedXdsClientPoolProvider(SynchronizationContext syncContext) { + this(new BootstrapperImpl(), syncContext); + } + @VisibleForTesting SharedXdsClientPoolProvider(Bootstrapper bootstrapper) { + this(bootstrapper, new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + log.log(Level.WARNING, + "Uncaught exception in XdsClient SynchronizationContext. Panic!", e); + throw new AssertionError(e); + } + })); + } + + SharedXdsClientPoolProvider(Bootstrapper bootstrapper, SynchronizationContext syncContext) { this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper"); + this.syncContext = checkNotNull(syncContext, "syncContext"); } static SharedXdsClientPoolProvider getDefaultProvider() { @@ -89,7 +110,7 @@ public ObjectPool getOrCreate() throws XdsInitializationException { if (bootstrapInfo.servers().isEmpty()) { throw new XdsInitializationException("No xDS server provided"); } - ref = xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo); + ref = xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo, syncContext); } } } @@ -112,10 +133,27 @@ static class RefCountedXdsClientObjectPool implements ObjectPool { private XdsClient xdsClient; @GuardedBy("lock") private int refCount; + private final SynchronizationContext syncContext; @VisibleForTesting RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo) { + this(bootstrapInfo, new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + log.log( + Level.WARNING, + "Uncaught exception in XdsClient SynchronizationContext. Panic!", + e); + // TODO(chengyuanzhang): better error handling. + throw new AssertionError(e); + } + })); + } + + RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, SynchronizationContext syncContext) { this.bootstrapInfo = checkNotNull(bootstrapInfo); + this.syncContext = checkNotNull(syncContext); } @Override @@ -131,7 +169,8 @@ public XdsClient getObject() { new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER, TimeProvider.SYSTEM_TIME_PROVIDER, - new TlsContextManagerImpl(bootstrapInfo)); + new TlsContextManagerImpl(bootstrapInfo), + syncContext); } refCount++; return xdsClient; diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 9d61fb2b462..d33eba883ed 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -78,18 +78,7 @@ final class XdsClientImpl extends XdsClient // Longest time to wait, since the subscription to some resource, for concluding its absence. @VisibleForTesting static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15; - private final SynchronizationContext syncContext = new SynchronizationContext( - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - logger.log( - XdsLogLevel.ERROR, - "Uncaught exception in XdsClient SynchronizationContext. Panic!", - e); - // TODO(chengyuanzhang): better error handling. - throw new AssertionError(e); - } - }); + private final SynchronizationContext syncContext; private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry(); private final LoadBalancerRegistry loadBalancerRegistry = LoadBalancerRegistry.getDefaultRegistry(); @@ -120,7 +109,8 @@ public void uncaughtException(Thread t, Throwable e) { BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier, TimeProvider timeProvider, - TlsContextManager tlsContextManager) { + TlsContextManager tlsContextManager, + SynchronizationContext syncContext) { this.xdsChannelFactory = xdsChannelFactory; this.bootstrapInfo = bootstrapInfo; this.context = context; @@ -135,6 +125,7 @@ public void uncaughtException(Thread t, Throwable e) { if (LOG_XDS_NODE_ID) { classLogger.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId()); } + this.syncContext = checkNotNull(syncContext, "syncContext"); } private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 079f862aeca..8b2fb55713f 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -138,7 +138,7 @@ final class XdsNameResolver extends NameResolver { SynchronizationContext syncContext, ScheduledExecutorService scheduler, @Nullable Map bootstrapOverride) { this(targetAuthority, name, overrideAuthority, serviceConfigParser, syncContext, scheduler, - SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance, + new SharedXdsClientPoolProvider(syncContext), ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride); } @@ -161,7 +161,7 @@ final class XdsNameResolver extends NameResolver { this.syncContext = checkNotNull(syncContext, "syncContext"); this.scheduler = checkNotNull(scheduler, "scheduler"); this.xdsClientPoolFactory = bootstrapOverride == null ? checkNotNull(xdsClientPoolFactory, - "xdsClientPoolFactory") : new SharedXdsClientPoolProvider(); + "xdsClientPoolFactory") : new SharedXdsClientPoolProvider(syncContext); this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride); this.random = checkNotNull(random, "random"); this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry"); diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index c18b324b14c..c5b8c91e8d7 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -58,6 +58,7 @@ import io.grpc.Server; import io.grpc.Status; import io.grpc.Status.Code; +import io.grpc.SynchronizationContext; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.BackoffPolicy; @@ -292,6 +293,13 @@ public long currentTimeNanos() { private final String serverName = InProcessServerBuilder.generateName(); private BindableService adsService = createAdsService(); private BindableService lrsService = createLrsService(); + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); @Before public void setUp() throws IOException { @@ -358,6 +366,7 @@ ManagedChannel create(ServerInfo serverInfo) { .certProviders(ImmutableMap.of("cert-instance-name", CertificateProviderInfo.create("file-watcher", ImmutableMap.of()))) .build(); + xdsClient = new XdsClientImpl( xdsChannelFactory, @@ -367,7 +376,8 @@ ManagedChannel create(ServerInfo serverInfo) { backoffPolicyProvider, fakeClock.getStopwatchSupplier(), timeProvider, - tlsContextManager); + tlsContextManager, + syncContext); assertThat(resourceDiscoveryCalls).isEmpty(); assertThat(loadReportCalls).isEmpty(); @@ -3613,7 +3623,8 @@ private XdsClientImpl createXdsClient(String serverUri) { backoffPolicyProvider, fakeClock.getStopwatchSupplier(), timeProvider, - tlsContextManager); + tlsContextManager, + syncContext); } private BootstrapInfo buildBootStrap(String serverUri) {