Skip to content

Commit

Permalink
sync context
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Nov 18, 2023
1 parent 169dacc commit b1f62e5
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 20 deletions.
45 changes: 42 additions & 3 deletions xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Context;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
Expand All @@ -32,6 +33,8 @@
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -42,19 +45,37 @@
*/
@ThreadSafe
final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {

private static final Logger log = Logger.getLogger(SharedXdsClientPoolProvider.class.getName());
private final Bootstrapper bootstrapper;
private final Object lock = new Object();
private final AtomicReference<Map<String, ?>> bootstrapOverride = new AtomicReference<>();
private volatile ObjectPool<XdsClient> xdsClientPool;
private final SynchronizationContext syncContext;

SharedXdsClientPoolProvider() {
this(new BootstrapperImpl());
}

SharedXdsClientPoolProvider(SynchronizationContext syncContext) {
this(new BootstrapperImpl(), syncContext);
}

@VisibleForTesting
SharedXdsClientPoolProvider(Bootstrapper bootstrapper) {
this(bootstrapper, new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.log(Level.WARNING,
"Uncaught exception in XdsClient SynchronizationContext. Panic!", e);
throw new AssertionError(e);
}
}));
}

SharedXdsClientPoolProvider(Bootstrapper bootstrapper, SynchronizationContext syncContext) {
this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper");
this.syncContext = checkNotNull(syncContext, "syncContext");
}

static SharedXdsClientPoolProvider getDefaultProvider() {
Expand Down Expand Up @@ -89,7 +110,7 @@ public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
if (bootstrapInfo.servers().isEmpty()) {
throw new XdsInitializationException("No xDS server provided");
}
ref = xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo);
ref = xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo, syncContext);
}
}
}
Expand All @@ -112,10 +133,27 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
private XdsClient xdsClient;
@GuardedBy("lock")
private int refCount;
private final SynchronizationContext syncContext;

@VisibleForTesting
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo) {
this(bootstrapInfo, new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.log(
Level.WARNING,
"Uncaught exception in XdsClient SynchronizationContext. Panic!",
e);
// TODO(chengyuanzhang): better error handling.
throw new AssertionError(e);
}
}));
}

RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, SynchronizationContext syncContext) {
this.bootstrapInfo = checkNotNull(bootstrapInfo);
this.syncContext = checkNotNull(syncContext);
}

@Override
Expand All @@ -131,7 +169,8 @@ public XdsClient getObject() {
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
new TlsContextManagerImpl(bootstrapInfo));
new TlsContextManagerImpl(bootstrapInfo),
syncContext);
}
refCount++;
return xdsClient;
Expand Down
17 changes: 4 additions & 13 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,7 @@ final class XdsClientImpl extends XdsClient
// Longest time to wait, since the subscription to some resource, for concluding its absence.
@VisibleForTesting
static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.log(
XdsLogLevel.ERROR,
"Uncaught exception in XdsClient SynchronizationContext. Panic!",
e);
// TODO(chengyuanzhang): better error handling.
throw new AssertionError(e);
}
});
private final SynchronizationContext syncContext;
private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
private final LoadBalancerRegistry loadBalancerRegistry
= LoadBalancerRegistry.getDefaultRegistry();
Expand Down Expand Up @@ -120,7 +109,8 @@ public void uncaughtException(Thread t, Throwable e) {
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier,
TimeProvider timeProvider,
TlsContextManager tlsContextManager) {
TlsContextManager tlsContextManager,
SynchronizationContext syncContext) {
this.xdsChannelFactory = xdsChannelFactory;
this.bootstrapInfo = bootstrapInfo;
this.context = context;
Expand All @@ -135,6 +125,7 @@ public void uncaughtException(Thread t, Throwable e) {
if (LOG_XDS_NODE_ID) {
classLogger.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
}
this.syncContext = checkNotNull(syncContext, "syncContext");
}

private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
Expand Down
4 changes: 2 additions & 2 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ final class XdsNameResolver extends NameResolver {
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
@Nullable Map<String, ?> bootstrapOverride) {
this(targetAuthority, name, overrideAuthority, serviceConfigParser, syncContext, scheduler,
SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance,
new SharedXdsClientPoolProvider(syncContext), ThreadSafeRandomImpl.instance,
FilterRegistry.getDefaultRegistry(), bootstrapOverride);
}

Expand All @@ -161,7 +161,7 @@ final class XdsNameResolver extends NameResolver {
this.syncContext = checkNotNull(syncContext, "syncContext");
this.scheduler = checkNotNull(scheduler, "scheduler");
this.xdsClientPoolFactory = bootstrapOverride == null ? checkNotNull(xdsClientPoolFactory,
"xdsClientPoolFactory") : new SharedXdsClientPoolProvider();
"xdsClientPoolFactory") : new SharedXdsClientPoolProvider(syncContext);
this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride);
this.random = checkNotNull(random, "random");
this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
Expand Down
15 changes: 13 additions & 2 deletions xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.BackoffPolicy;
Expand Down Expand Up @@ -292,6 +293,13 @@ public long currentTimeNanos() {
private final String serverName = InProcessServerBuilder.generateName();
private BindableService adsService = createAdsService();
private BindableService lrsService = createLrsService();
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});

@Before
public void setUp() throws IOException {
Expand Down Expand Up @@ -358,6 +366,7 @@ ManagedChannel create(ServerInfo serverInfo) {
.certProviders(ImmutableMap.of("cert-instance-name",
CertificateProviderInfo.create("file-watcher", ImmutableMap.<String, Object>of())))
.build();

xdsClient =
new XdsClientImpl(
xdsChannelFactory,
Expand All @@ -367,7 +376,8 @@ ManagedChannel create(ServerInfo serverInfo) {
backoffPolicyProvider,
fakeClock.getStopwatchSupplier(),
timeProvider,
tlsContextManager);
tlsContextManager,
syncContext);

assertThat(resourceDiscoveryCalls).isEmpty();
assertThat(loadReportCalls).isEmpty();
Expand Down Expand Up @@ -3613,7 +3623,8 @@ private XdsClientImpl createXdsClient(String serverUri) {
backoffPolicyProvider,
fakeClock.getStopwatchSupplier(),
timeProvider,
tlsContextManager);
tlsContextManager,
syncContext);
}

private BootstrapInfo buildBootStrap(String serverUri) {
Expand Down

0 comments on commit b1f62e5

Please sign in to comment.