diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 2f419b439ff..54f86d6da48 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -320,7 +320,7 @@ private ClusterState(String name) { private void start() { shutdown = false; - xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this); } void shutdown() { @@ -341,85 +341,102 @@ public void onError(Status error) { String.format("Unable to load CDS %s. xDS server returned: %s: %s", name, error.getCode(), error.getDescription())) .withCause(error.getCause()); - if (shutdown) { - return; - } - // All watchers should receive the same error, so we only propagate it once. - if (ClusterState.this == root) { - handleClusterDiscoveryError(status); - } + syncContext.execute(new Runnable() { + @Override + public void run() { + if (shutdown) { + return; + } + // All watchers should receive the same error, so we only propagate it once. + if (ClusterState.this == root) { + handleClusterDiscoveryError(status); + } + } + }); } @Override public void onResourceDoesNotExist(String resourceName) { - if (shutdown) { - return; - } - discovered = true; - result = null; - if (childClusterStates != null) { - for (ClusterState state : childClusterStates.values()) { - state.shutdown(); + syncContext.execute(new Runnable() { + @Override + public void run() { + if (shutdown) { + return; + } + discovered = true; + result = null; + if (childClusterStates != null) { + for (ClusterState state : childClusterStates.values()) { + state.shutdown(); + } + childClusterStates = null; + } + handleClusterDiscovered(); } - childClusterStates = null; - } - handleClusterDiscovered(); + }); } @Override public void onChanged(final CdsUpdate update) { - if (shutdown) { - return; - } - logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update); - discovered = true; - result = update; - if (update.clusterType() == ClusterType.AGGREGATE) { - isLeaf = false; - logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}", - update.clusterName(), update.prioritizedClusterNames()); - Map newChildStates = new LinkedHashMap<>(); - for (String cluster : update.prioritizedClusterNames()) { - if (newChildStates.containsKey(cluster)) { - logger.log(XdsLogLevel.WARNING, - String.format("duplicate cluster name %s in aggregate %s is being ignored", - cluster, update.clusterName())); - continue; + class ClusterDiscovered implements Runnable { + @Override + public void run() { + if (shutdown) { + return; } - if (childClusterStates == null || !childClusterStates.containsKey(cluster)) { - ClusterState childState; - if (clusterStates.containsKey(cluster)) { - childState = clusterStates.get(cluster); - if (childState.shutdown) { - childState.start(); + + logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update); + discovered = true; + result = update; + if (update.clusterType() == ClusterType.AGGREGATE) { + isLeaf = false; + logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}", + update.clusterName(), update.prioritizedClusterNames()); + Map newChildStates = new LinkedHashMap<>(); + for (String cluster : update.prioritizedClusterNames()) { + if (newChildStates.containsKey(cluster)) { + logger.log(XdsLogLevel.WARNING, + String.format("duplicate cluster name %s in aggregate %s is being ignored", + cluster, update.clusterName())); + continue; + } + if (childClusterStates == null || !childClusterStates.containsKey(cluster)) { + ClusterState childState; + if (clusterStates.containsKey(cluster)) { + childState = clusterStates.get(cluster); + if (childState.shutdown) { + childState.start(); + } + } else { + childState = new ClusterState(cluster); + clusterStates.put(cluster, childState); + childState.start(); + } + newChildStates.put(cluster, childState); + } else { + newChildStates.put(cluster, childClusterStates.remove(cluster)); } - } else { - childState = new ClusterState(cluster); - clusterStates.put(cluster, childState); - childState.start(); } - newChildStates.put(cluster, childState); - } else { - newChildStates.put(cluster, childClusterStates.remove(cluster)); - } - } - if (childClusterStates != null) { // stop subscribing to revoked child clusters - for (ClusterState watcher : childClusterStates.values()) { - watcher.shutdown(); + if (childClusterStates != null) { // stop subscribing to revoked child clusters + for (ClusterState watcher : childClusterStates.values()) { + watcher.shutdown(); + } + } + childClusterStates = newChildStates; + } else if (update.clusterType() == ClusterType.EDS) { + isLeaf = true; + logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}", + update.clusterName(), update.edsServiceName()); + } else { // logical DNS + isLeaf = true; + logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName()); } + handleClusterDiscovered(); } - childClusterStates = newChildStates; - } else if (update.clusterType() == ClusterType.EDS) { - isLeaf = true; - logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}", - update.clusterName(), update.edsServiceName()); - } else { // logical DNS - isLeaf = true; - logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName()); } - handleClusterDiscovered(); - } + syncContext.execute(new ClusterDiscovered()); + } } } } diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 9626b290335..15354da0f95 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -366,8 +366,7 @@ private EdsClusterState(String name, @Nullable String edsServiceName, void start() { String resourceName = edsServiceName != null ? edsServiceName : name; logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), - resourceName, this, syncContext); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resourceName, this); } @Override @@ -453,7 +452,7 @@ public void run() { } } - new EndpointsUpdated().run(); + syncContext.execute(new EndpointsUpdated()); } private List generatePriorityNames(String name, @@ -492,28 +491,38 @@ private List generatePriorityNames(String name, @Override public void onResourceDoesNotExist(final String resourceName) { - if (shutdown) { - return; - } - logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName); - status = Status.OK; - resolved = true; - result = null; // resource revoked - handleEndpointResourceUpdate(); + syncContext.execute(new Runnable() { + @Override + public void run() { + if (shutdown) { + return; + } + logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName); + status = Status.OK; + resolved = true; + result = null; // resource revoked + handleEndpointResourceUpdate(); + } + }); } @Override public void onError(final Status error) { - if (shutdown) { - return; - } - String resourceName = edsServiceName != null ? edsServiceName : name; - status = Status.UNAVAILABLE - .withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s", - resourceName, error.getCode(), error.getDescription())) - .withCause(error.getCause()); - logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error); - handleEndpointResolutionError(); + syncContext.execute(new Runnable() { + @Override + public void run() { + if (shutdown) { + return; + } + String resourceName = edsServiceName != null ? edsServiceName : name; + status = Status.UNAVAILABLE + .withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s", + resourceName, error.getCode(), error.getDescription())) + .withCause(error.getCause()); + logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error); + handleEndpointResolutionError(); + } + }); } } diff --git a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java index 10994df6341..c919365d093 100644 --- a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java @@ -38,9 +38,9 @@ import io.grpc.internal.BackoffPolicy; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.EnvoyProtoData.Node; -import io.grpc.xds.XdsClient.ProcessingTracker; import io.grpc.xds.XdsClient.ResourceStore; import io.grpc.xds.XdsClient.XdsResponseHandler; import io.grpc.xds.XdsClientImpl.XdsChannelFactory; @@ -288,8 +288,6 @@ private abstract class AbstractAdsStream { abstract boolean isReady(); - abstract void request(int count); - /** * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and * {@code errorDetail}. Used for reacting to a specific discovery response. For @@ -316,10 +314,7 @@ final void handleRpcResponse(XdsResourceType type, String versionInfo, List request(1), syncContext); - xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce, - processingTracker); - processingTracker.onComplete(); + xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce); } final void handleRpcError(Throwable t) { @@ -377,7 +372,7 @@ private void cleanUp() { } private final class AdsStreamV3 extends AbstractAdsStream { - private ClientCallStreamObserver requestWriter; + private StreamObserver requestWriter; @Override public boolean isReady() { @@ -385,7 +380,6 @@ public boolean isReady() { } @Override - @SuppressWarnings("unchecked") void start() { AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(channel); @@ -395,7 +389,6 @@ final class AdsClientResponseObserver @Override public void beforeStart(ClientCallStreamObserver requestStream) { - requestStream.disableAutoRequestWithInitial(1); requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler); } @@ -444,8 +437,7 @@ public void run() { } } - requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources( - new AdsClientResponseObserver()); + requestWriter = stub.streamAggregatedResources(new AdsClientResponseObserver()); } @Override @@ -475,11 +467,6 @@ void sendDiscoveryRequest(XdsResourceType type, String versionInfo, } } - @Override - void request(int count) { - requestWriter.request(count); - } - @Override void sendError(Exception error) { requestWriter.onError(error); diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index aa222a98d5c..a66671bcdaa 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -24,7 +24,6 @@ import com.google.common.base.Splitter; import com.google.common.net.UrlEscapers; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; import io.grpc.Status; import io.grpc.xds.Bootstrapper.ServerInfo; @@ -37,8 +36,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; /** @@ -307,15 +304,9 @@ TlsContextManager getTlsContextManager() { /** * Registers a data watcher for the given Xds resource. */ - void watchXdsResource(XdsResourceType type, String resourceName, - ResourceWatcher watcher, - Executor executor) { - throw new UnsupportedOperationException(); - } - void watchXdsResource(XdsResourceType type, String resourceName, ResourceWatcher watcher) { - watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor()); + throw new UnsupportedOperationException(); } /** @@ -362,32 +353,11 @@ Map getServerLrsClientMap() { throw new UnsupportedOperationException(); } - static final class ProcessingTracker { - private final AtomicInteger pendingTask = new AtomicInteger(1); - private final Executor executor; - private final Runnable completionListener; - - ProcessingTracker(Runnable completionListener, Executor executor) { - this.executor = executor; - this.completionListener = completionListener; - } - - void startTask() { - pendingTask.incrementAndGet(); - } - - void onComplete() { - if (pendingTask.decrementAndGet() == 0) { - executor.execute(completionListener); - } - } - } - interface XdsResponseHandler { /** Called when a xds response is received. */ void handleResourceResponse( XdsResourceType resourceType, ServerInfo serverInfo, String versionInfo, - List resources, String nonce, ProcessingTracker processingTracker); + List resources, String nonce); /** Called when the ADS stream is closed passively. */ // Must be synchronized. diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 7389d7ebef1..f617f79acca 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -54,11 +54,11 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -166,7 +166,7 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { @Override public void handleResourceResponse( XdsResourceType xdsResourceType, ServerInfo serverInfo, String versionInfo, - List resources, String nonce, ProcessingTracker processingTracker) { + List resources, String nonce) { checkNotNull(xdsResourceType, "xdsResourceType"); syncContext.throwIfNotInThisSynchronizationContext(); Set toParseResourceNames = null; @@ -178,7 +178,7 @@ public void handleResourceResponse( XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce, bootstrapInfo, filterRegistry, loadBalancerRegistry, tlsContextManager, toParseResourceNames); - handleResourceUpdate(args, resources, xdsResourceType, processingTracker); + handleResourceUpdate(args, resources, xdsResourceType); } @Override @@ -189,7 +189,7 @@ public void handleStreamClosed(Status error) { resourceSubscribers.values()) { for (ResourceSubscriber subscriber : subscriberMap.values()) { if (!subscriber.hasResult()) { - subscriber.onError(error, null); + subscriber.onError(error); } } } @@ -289,8 +289,7 @@ TlsContextManager getTlsContextManager() { @Override void watchXdsResource(XdsResourceType type, String resourceName, - ResourceWatcher watcher, - Executor watcherExecutor) { + ResourceWatcher watcher) { syncContext.execute(new Runnable() { @Override @SuppressWarnings("unchecked") @@ -300,7 +299,7 @@ public void run() { subscribedResourceTypeUrls.put(type.typeUrl(), type); } ResourceSubscriber subscriber = - (ResourceSubscriber) resourceSubscribers.get(type).get(resourceName); + (ResourceSubscriber) resourceSubscribers.get(type).get(resourceName);; if (subscriber == null) { logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName); subscriber = new ResourceSubscriber<>(type, resourceName); @@ -309,7 +308,7 @@ public void run() { subscriber.xdsChannel.adjustResourceSubscription(type); } } - subscriber.addWatcher(watcher, watcherExecutor); + subscriber.addWatcher(watcher); } }); } @@ -334,6 +333,7 @@ public void run() { if (resourceSubscribers.get(type).isEmpty()) { resourceSubscribers.remove(type); subscribedResourceTypeUrls.remove(type.typeUrl()); + } } } @@ -420,9 +420,9 @@ private void cleanUpResourceTimers() { } @SuppressWarnings("unchecked") - private void handleResourceUpdate( - XdsResourceType.Args args, List resources, XdsResourceType xdsResourceType, - ProcessingTracker processingTracker) { + private void handleResourceUpdate(XdsResourceType.Args args, + List resources, + XdsResourceType xdsResourceType) { ValidatedResourceUpdate result = xdsResourceType.parse(args, resources); logger.log(XdsLogger.XdsLogLevel.INFO, "Received {0} Response version {1} nonce {2}. Parsed resources: {3}", @@ -449,10 +449,10 @@ private void handleResourceUpdate( for (Map.Entry> entry : subscribedResources.entrySet()) { String resourceName = entry.getKey(); ResourceSubscriber subscriber = (ResourceSubscriber) entry.getValue(); + if (parsedResources.containsKey(resourceName)) { // Happy path: the resource updated successfully. Notify the watchers of the update. - subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime, - processingTracker); + subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime); continue; } @@ -471,7 +471,7 @@ private void handleResourceUpdate( // The resource is missing. Reuse the cached resource if possible. if (subscriber.data == null) { // No cached data. Notify the watchers of an invalid update. - subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), processingTracker); + subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail)); } continue; } @@ -480,7 +480,7 @@ private void handleResourceUpdate( // from the ADS update. Note that we can only do this if the resource update is coming from // the same xDS server that the ResourceSubscriber is subscribed to. if (subscriber.serverInfo.equals(args.serverInfo)) { - subscriber.onAbsent(processingTracker); + subscriber.onAbsent(); } } } @@ -493,7 +493,7 @@ private final class ResourceSubscriber { @Nullable private final ControlPlaneClient xdsChannel; private final XdsResourceType type; private final String resource; - private final Map, Executor> watchers = new HashMap<>(); + private final Set> watchers = new HashSet<>(); @Nullable private T data; private boolean absent; // Tracks whether the deletion has been ignored per bootstrap server feature. @@ -553,26 +553,22 @@ private ServerInfo getServerInfo(String resource) { return bootstrapInfo.servers().get(0); // use first server } - void addWatcher(ResourceWatcher watcher, Executor watcherExecutor) { - checkArgument(!watchers.containsKey(watcher), "watcher %s already registered", watcher); - watchers.put(watcher, watcherExecutor); - T savedData = data; - boolean savedAbsent = absent; - watcherExecutor.execute(() -> { - if (errorDescription != null) { - watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription)); - return; - } - if (savedData != null) { - notifyWatcher(watcher, savedData); - } else if (savedAbsent) { - watcher.onResourceDoesNotExist(resource); - } - }); + void addWatcher(ResourceWatcher watcher) { + checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher); + watchers.add(watcher); + if (errorDescription != null) { + watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription)); + return; + } + if (data != null) { + notifyWatcher(watcher, data); + } else if (absent) { + watcher.onResourceDoesNotExist(resource); + } } void removeWatcher(ResourceWatcher watcher) { - checkArgument(watchers.containsKey(watcher), "watcher %s not registered", watcher); + checkArgument(watchers.contains(watcher), "watcher %s not registered", watcher); watchers.remove(watcher); } @@ -590,7 +586,7 @@ public void run() { logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout", type, resource); respTimer = null; - onAbsent(null); + onAbsent(); } @Override @@ -637,8 +633,7 @@ boolean hasResult() { return data != null || absent; } - void onData(ParsedResource parsedResource, String version, long updateTime, - ProcessingTracker processingTracker) { + void onData(ParsedResource parsedResource, String version, long updateTime) { if (respTimer != null && respTimer.isPending()) { respTimer.cancel(); respTimer = null; @@ -655,20 +650,13 @@ void onData(ParsedResource parsedResource, String version, long updateTime, resourceDeletionIgnored = false; } if (!Objects.equals(oldData, data)) { - for (ResourceWatcher watcher : watchers.keySet()) { - processingTracker.startTask(); - watchers.get(watcher).execute(() -> { - try { - notifyWatcher(watcher, data); - } finally { - processingTracker.onComplete(); - } - }); + for (ResourceWatcher watcher : watchers) { + notifyWatcher(watcher, data); } } } - void onAbsent(@Nullable ProcessingTracker processingTracker) { + void onAbsent() { if (respTimer != null && respTimer.isPending()) { // too early to conclude absence return; } @@ -692,24 +680,13 @@ void onAbsent(@Nullable ProcessingTracker processingTracker) { data = null; absent = true; metadata = ResourceMetadata.newResourceMetadataDoesNotExist(); - for (ResourceWatcher watcher : watchers.keySet()) { - if (processingTracker != null) { - processingTracker.startTask(); - } - watchers.get(watcher).execute(() -> { - try { - watcher.onResourceDoesNotExist(resource); - } finally { - if (processingTracker != null) { - processingTracker.onComplete(); - } - } - }); + for (ResourceWatcher watcher : watchers) { + watcher.onResourceDoesNotExist(resource); } } } - void onError(Status error, @Nullable ProcessingTracker tracker) { + void onError(Status error) { if (respTimer != null && respTimer.isPending()) { respTimer.cancel(); respTimer = null; @@ -722,19 +699,8 @@ void onError(Status error, @Nullable ProcessingTracker tracker) { .withDescription(description + "nodeID: " + bootstrapInfo.node().getId()) .withCause(error.getCause()); - for (ResourceWatcher watcher : watchers.keySet()) { - if (tracker != null) { - tracker.startTask(); - } - watchers.get(watcher).execute(() -> { - try { - watcher.onError(errorAugmented); - } finally { - if (tracker != null) { - tracker.onComplete(); - } - } - }); + for (ResourceWatcher watcher : watchers) { + watcher.onError(errorAugmented); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index a345c7eb263..079f862aeca 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -638,52 +638,66 @@ private class ResolveState implements ResourceWatcher { @Override public void onChanged(final LdsUpdate update) { - if (stopped) { - return; - } - logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update); - HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); - List virtualHosts = httpConnectionManager.virtualHosts(); - String rdsName = httpConnectionManager.rdsName(); - cleanUpRouteDiscoveryState(); - if (virtualHosts != null) { - updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(), - httpConnectionManager.httpFilterConfigs()); - } else { - routeDiscoveryState = new RouteDiscoveryState( - rdsName, httpConnectionManager.httpMaxStreamDurationNano(), - httpConnectionManager.httpFilterConfigs()); - logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), - rdsName, routeDiscoveryState, syncContext); - } + syncContext.execute(new Runnable() { + @Override + public void run() { + if (stopped) { + return; + } + logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update); + HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); + List virtualHosts = httpConnectionManager.virtualHosts(); + String rdsName = httpConnectionManager.rdsName(); + cleanUpRouteDiscoveryState(); + if (virtualHosts != null) { + updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(), + httpConnectionManager.httpFilterConfigs()); + } else { + routeDiscoveryState = new RouteDiscoveryState( + rdsName, httpConnectionManager.httpMaxStreamDurationNano(), + httpConnectionManager.httpFilterConfigs()); + logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), + rdsName, routeDiscoveryState); + } + } + }); } @Override public void onError(final Status error) { - if (stopped || receivedConfig) { - return; - } - listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( - String.format("Unable to load LDS %s. xDS server returned: %s: %s", - ldsResourceName, error.getCode(), error.getDescription()))); + syncContext.execute(new Runnable() { + @Override + public void run() { + if (stopped || receivedConfig) { + return; + } + listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( + String.format("Unable to load LDS %s. xDS server returned: %s: %s", + ldsResourceName, error.getCode(), error.getDescription()))); + } + }); } @Override public void onResourceDoesNotExist(final String resourceName) { - if (stopped) { - return; - } - String error = "LDS resource does not exist: " + resourceName; - logger.log(XdsLogLevel.INFO, error); - cleanUpRouteDiscoveryState(); - cleanUpRoutes(error); + syncContext.execute(new Runnable() { + @Override + public void run() { + if (stopped) { + return; + } + String error = "LDS resource does not exist: " + resourceName; + logger.log(XdsLogLevel.INFO, error); + cleanUpRouteDiscoveryState(); + cleanUpRoutes(error); + } + }); } private void start() { logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(), - ldsResourceName, this, syncContext); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), ldsResourceName, this); } private void stop() { @@ -851,31 +865,47 @@ private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano, @Override public void onChanged(final RdsUpdate update) { - if (RouteDiscoveryState.this != routeDiscoveryState) { - return; - } - logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update); - updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs); + syncContext.execute(new Runnable() { + @Override + public void run() { + if (RouteDiscoveryState.this != routeDiscoveryState) { + return; + } + logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update); + updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, + filterConfigs); + } + }); } @Override public void onError(final Status error) { - if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) { - return; - } - listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( - String.format("Unable to load RDS %s. xDS server returned: %s: %s", - resourceName, error.getCode(), error.getDescription()))); + syncContext.execute(new Runnable() { + @Override + public void run() { + if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) { + return; + } + listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( + String.format("Unable to load RDS %s. xDS server returned: %s: %s", + resourceName, error.getCode(), error.getDescription()))); + } + }); } @Override public void onResourceDoesNotExist(final String resourceName) { - if (RouteDiscoveryState.this != routeDiscoveryState) { - return; - } - String error = "RDS resource does not exist: " + resourceName; - logger.log(XdsLogLevel.INFO, error); - cleanUpRoutes(error); + syncContext.execute(new Runnable() { + @Override + public void run() { + if (RouteDiscoveryState.this != routeDiscoveryState) { + return; + } + String error = "RDS resource does not exist: " + resourceName; + logger.log(XdsLogLevel.INFO, error); + cleanUpRoutes(error); + } + }); } } } diff --git a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java index 8c15dca8ea2..b3bbe005825 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java @@ -366,78 +366,92 @@ public Listener interceptCall(ServerCall call, private DiscoveryState(String resourceName) { this.resourceName = checkNotNull(resourceName, "resourceName"); - xdsClient.watchXdsResource( - XdsListenerResource.getInstance(), resourceName, this, syncContext); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), resourceName, this); } @Override public void onChanged(final LdsUpdate update) { - if (stopped) { - return; - } - logger.log(Level.FINEST, "Received Lds update {0}", update); - checkNotNull(update.listener(), "update"); - if (!pendingRds.isEmpty()) { - // filter chain state has not yet been applied to filterChainSelectorManager and there - // are two sets of sslContextProviderSuppliers, so we release the old ones. - releaseSuppliersInFlight(); - pendingRds.clear(); - } - filterChains = update.listener().filterChains(); - defaultFilterChain = update.listener().defaultFilterChain(); - List allFilterChains = filterChains; - if (defaultFilterChain != null) { - allFilterChains = new ArrayList<>(filterChains); - allFilterChains.add(defaultFilterChain); - } - Set allRds = new HashSet<>(); - for (FilterChain filterChain : allFilterChains) { - HttpConnectionManager hcm = filterChain.httpConnectionManager(); - if (hcm.virtualHosts() == null) { - RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName()); - if (rdsState == null) { - rdsState = new RouteDiscoveryState(hcm.rdsName()); - routeDiscoveryStates.put(hcm.rdsName(), rdsState); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), - hcm.rdsName(), rdsState, syncContext); + syncContext.execute(new Runnable() { + @Override + public void run() { + if (stopped) { + return; } - if (rdsState.isPending) { - pendingRds.add(hcm.rdsName()); + logger.log(Level.FINEST, "Received Lds update {0}", update); + checkNotNull(update.listener(), "update"); + if (!pendingRds.isEmpty()) { + // filter chain state has not yet been applied to filterChainSelectorManager and there + // are two sets of sslContextProviderSuppliers, so we release the old ones. + releaseSuppliersInFlight(); + pendingRds.clear(); + } + filterChains = update.listener().filterChains(); + defaultFilterChain = update.listener().defaultFilterChain(); + List allFilterChains = filterChains; + if (defaultFilterChain != null) { + allFilterChains = new ArrayList<>(filterChains); + allFilterChains.add(defaultFilterChain); + } + Set allRds = new HashSet<>(); + for (FilterChain filterChain : allFilterChains) { + HttpConnectionManager hcm = filterChain.httpConnectionManager(); + if (hcm.virtualHosts() == null) { + RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName()); + if (rdsState == null) { + rdsState = new RouteDiscoveryState(hcm.rdsName()); + routeDiscoveryStates.put(hcm.rdsName(), rdsState); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), + hcm.rdsName(), rdsState); + } + if (rdsState.isPending) { + pendingRds.add(hcm.rdsName()); + } + allRds.add(hcm.rdsName()); + } + } + for (Map.Entry entry: routeDiscoveryStates.entrySet()) { + if (!allRds.contains(entry.getKey())) { + xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), + entry.getKey(), entry.getValue()); + } + } + routeDiscoveryStates.keySet().retainAll(allRds); + if (pendingRds.isEmpty()) { + updateSelector(); } - allRds.add(hcm.rdsName()); - } - } - for (Map.Entry entry: routeDiscoveryStates.entrySet()) { - if (!allRds.contains(entry.getKey())) { - xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), - entry.getKey(), entry.getValue()); } - } - routeDiscoveryStates.keySet().retainAll(allRds); - if (pendingRds.isEmpty()) { - updateSelector(); - } + }); } @Override public void onResourceDoesNotExist(final String resourceName) { - if (stopped) { - return; - } - StatusException statusException = Status.UNAVAILABLE.withDescription( - "Listener " + resourceName + " unavailable").asException(); - handleConfigNotFound(statusException); + syncContext.execute(new Runnable() { + @Override + public void run() { + if (stopped) { + return; + } + StatusException statusException = Status.UNAVAILABLE.withDescription( + "Listener " + resourceName + " unavailable").asException(); + handleConfigNotFound(statusException); + } + }); } @Override public void onError(final Status error) { - if (stopped) { - return; - } - logger.log(Level.FINE, "Error from XdsClient", error); - if (!isServing) { - listener.onNotServing(error.asException()); - } + syncContext.execute(new Runnable() { + @Override + public void run() { + if (stopped) { + return; + } + logger.log(Level.FINE, "Error from XdsClient", error); + if (!isServing) { + listener.onNotServing(error.asException()); + } + } + }); } private void shutdown() { diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index 3014c809ec2..4f30ec3ec4d 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -65,7 +65,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; @@ -665,7 +664,7 @@ public void unknownLbProvider() { outlierDetection) .lbPolicyConfig(ImmutableMap.of("unknown", ImmutableMap.of("foo", "bar"))).build()); } catch (Exception e) { - assertThat(e).hasMessageThat().contains("No provider available"); + assertThat(e).hasCauseThat().hasMessageThat().contains("No provider available"); return; } fail("Expected the unknown LB to cause an exception"); @@ -680,7 +679,7 @@ public void invalidLbConfig() { ImmutableMap.of("ring_hash_experimental", ImmutableMap.of("minRingSize", "-1"))) .build()); } catch (Exception e) { - assertThat(e).hasMessageThat().contains("Unable to parse"); + assertThat(e).hasCauseThat().hasMessageThat().contains("Unable to parse"); return; } fail("Expected the invalid config to cause an exception"); @@ -790,7 +789,7 @@ private final class FakeXdsClient extends XdsClient { @Override @SuppressWarnings("unchecked") void watchXdsResource(XdsResourceType type, String resourceName, - ResourceWatcher watcher, Executor syncContext) { + ResourceWatcher watcher) { assertThat(type.typeName()).isEqualTo("CDS"); watchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) .add((ResourceWatcher)watcher); diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index 76756e63301..1a10dde11c1 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -88,7 +88,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.junit.After; @@ -1182,11 +1181,11 @@ public String toString() { private static final class FakeXdsClient extends XdsClient { private final Map> watchers = new HashMap<>(); + @Override @SuppressWarnings("unchecked") void watchXdsResource(XdsResourceType type, String resourceName, - ResourceWatcher watcher, - Executor syncContext) { + ResourceWatcher watcher) { assertThat(type.typeName()).isEqualTo("EDS"); assertThat(watchers).doesNotContainKey(resourceName); watchers.put(resourceName, (ResourceWatcher) watcher); diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index 9a3304070d2..c18b324b14c 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -20,10 +20,7 @@ import static com.google.common.truth.Truth.assertWithMessage; import static io.grpc.xds.XdsClientImpl.XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -107,9 +104,6 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.BlockingDeque; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -127,10 +121,8 @@ import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; -import org.mockito.stubbing.Answer; /** * Tests for {@link XdsClientImpl}. @@ -2959,125 +2951,6 @@ public void cachedEdsResource_absent() { verifySubscribedResourcesMetadataSizes(0, 0, 0, 1); } - @Test - @SuppressWarnings("unchecked") - public void flowControlAbsent() throws Exception { - String anotherCdsResource = CDS_RESOURCE + "2"; - FakeClock fakeWatchClock = new FakeClock(); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE, - cdsResourceWatcher, fakeWatchClock.getScheduledExecutorService()); - ResourceWatcher anotherWatcher = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(), anotherCdsResource, - anotherWatcher, fakeWatchClock.getScheduledExecutorService()); - verifyResourceMetadataRequested(CDS, CDS_RESOURCE); - verifyResourceMetadataRequested(CDS, anotherCdsResource); - DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); - call.verifyRequest(CDS, Arrays.asList(CDS_RESOURCE, anotherCdsResource), "", "", NODE); - assertThat(fakeWatchClock.runDueTasks()).isEqualTo(2); - call.sendResponse(CDS, testClusterRoundRobin, VERSION_1, "0000"); - verifyResourceMetadataAcked( - CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1, TIME_INCREMENT); - call.verifyRequest(CDS, Arrays.asList(CDS_RESOURCE, anotherCdsResource), VERSION_1, - "0000", NODE); - verifyNoInteractions(cdsResourceWatcher, anotherWatcher); - fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); - assertThat(fakeWatchClock.getPendingTasks().size()).isEqualTo(2); - CyclicBarrier barrier = new CyclicBarrier(2); - doAnswer(blockUpdate(barrier)).when(cdsResourceWatcher).onChanged(any(CdsUpdate.class)); - - CountDownLatch latch = new CountDownLatch(1); - new Thread(() -> { - try { - fakeWatchClock.runDueTasks(); - latch.countDown(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - }).start(); - ImmutableMap resourcesV2 = ImmutableMap.of( - CDS_RESOURCE, Any.pack(mf.buildEdsCluster(CDS_RESOURCE, "A.2", "round_robin", null, - null, false, null, - "envoy.transport_sockets.tls", null, null - )), - anotherCdsResource, Any.pack(mf.buildClusterInvalid(anotherCdsResource))); - call.sendResponse(CDS, resourcesV2.values().asList(), VERSION_2, "0001"); - assertThat(call.isReady()).isFalse(); - verifyResourceMetadataAcked( - CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1, TIME_INCREMENT); - barrier.await(); - verify(cdsResourceWatcher, atLeastOnce()).onChanged(any()); - String errorMsg = "CDS response Cluster 'cluster.googleapis.com2' validation error: " - + "Cluster cluster.googleapis.com2: unspecified cluster discovery type"; - call.verifyRequestNack(CDS, Arrays.asList(CDS_RESOURCE, anotherCdsResource), VERSION_1, "0001", - NODE, Arrays.asList(errorMsg)); - verify(anotherWatcher).onResourceDoesNotExist(eq(anotherCdsResource)); - barrier.await(); - latch.await(10, TimeUnit.SECONDS); - verify(cdsResourceWatcher, times(2)).onChanged(any()); - verify(anotherWatcher).onError(any()); - } - - private Answer blockUpdate(CyclicBarrier barrier) { - return new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - barrier.await(); - return null; - } - }; - } - - @Test - public void simpleFlowControl() throws Exception { - FakeClock fakeWatchClock = new FakeClock(); - DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE, - edsResourceWatcher, fakeWatchClock.getScheduledExecutorService()); - verifyResourceMetadataRequested(EDS, EDS_RESOURCE); - assertThat(fakeWatchClock.runDueTasks()).isEqualTo(1); - - call.sendResponse(EDS, testClusterLoadAssignment, VERSION_1, "0000"); - call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE); - verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1, - TIME_INCREMENT); - verifyNoInteractions(edsResourceWatcher); - assertThat(fakeWatchClock.getPendingTasks().size()).isEqualTo(1); - - // Updated EDS response. - Any updatedClusterLoadAssignment = Any.pack(mf.buildClusterLoadAssignment(EDS_RESOURCE, - ImmutableList.of(mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2", - mf.buildLbEndpoint("172.44.2.2", 8000, "unknown", 3), 2, 0)), - ImmutableList.of())); - call.sendResponse(EDS, updatedClusterLoadAssignment, VERSION_2, "0001"); - // message not processed due to flow control - call.verifyNoMoreRequest(); - assertThat(call.isReady()).isFalse(); - - CyclicBarrier barrier = new CyclicBarrier(2); - doAnswer(blockUpdate(barrier)).when(edsResourceWatcher).onChanged(any(EdsUpdate.class)); - - CountDownLatch latch = new CountDownLatch(1); - new Thread(() -> { - try { - fakeWatchClock.runDueTasks(); - latch.countDown(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - }).start(); - - verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1, - TIME_INCREMENT); - barrier.await(); - verify(edsResourceWatcher, atLeastOnce()).onChanged(edsUpdateCaptor.capture()); - EdsUpdate edsUpdate = edsUpdateCaptor.getAllValues().get(0); - validateGoldenClusterLoadAssignment(edsUpdate); - barrier.await(); - latch.await(10, TimeUnit.SECONDS); - verify(edsResourceWatcher, times(2)).onChanged(any()); - verifyResourceMetadataAcked(EDS, EDS_RESOURCE, updatedClusterLoadAssignment, VERSION_2, - TIME_INCREMENT * 2); - } - @Test public void edsResourceUpdated() { DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE, @@ -3768,29 +3641,31 @@ private BootstrapInfo buildBootStrap(String serverUri) { } private DiscoveryRpcCall startResourceWatcher( - XdsResourceType type, String name, ResourceWatcher watcher, Executor executor) { - xdsClient.watchXdsResource(type, name, watcher, executor); - DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); - assertThat(call).isNotNull(); - call.verifyRequest(type, Collections.singletonList(name), "", "", NODE); - + XdsResourceType type, String name, ResourceWatcher watcher) { FakeClock.TaskFilter timeoutTaskFilter; switch (type.typeName()) { case "LDS": timeoutTaskFilter = LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; + xdsClient.watchXdsResource(type, name, watcher); break; case "RDS": timeoutTaskFilter = RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; + xdsClient.watchXdsResource(type, name, watcher); break; case "CDS": timeoutTaskFilter = CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; + xdsClient.watchXdsResource(type, name, watcher); break; case "EDS": timeoutTaskFilter = EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; + xdsClient.watchXdsResource(type, name, watcher); break; default: throw new AssertionError("should never be here"); } + + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(type, Collections.singletonList(name), "", "", NODE); ScheduledTask timeoutTask = Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter)); assertThat(timeoutTask.getDelay(TimeUnit.SECONDS)) @@ -3798,11 +3673,6 @@ private DiscoveryRpcCall startResourceWatcher( return call; } - private DiscoveryRpcCall startResourceWatcher( - XdsResourceType type, String name, ResourceWatcher watcher) { - return startResourceWatcher(type, name, watcher, MoreExecutors.directExecutor()); - } - protected abstract static class DiscoveryRpcCall { protected void verifyRequest( @@ -3849,10 +3719,6 @@ protected void sendError(Throwable t) { protected void sendCompleted() { throw new UnsupportedOperationException(); } - - protected boolean isReady() { - throw new UnsupportedOperationException(); - } } protected abstract static class LrsRpcCall { diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplV3Test.java b/xds/src/test/java/io/grpc/xds/XdsClientImplV3Test.java index abf8612e60d..eba41dc4989 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplV3Test.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplV3Test.java @@ -98,7 +98,6 @@ import io.grpc.Context; import io.grpc.Context.CancellationListener; import io.grpc.Status; -import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.Arrays; @@ -248,11 +247,6 @@ protected void sendError(Throwable t) { protected void sendCompleted() { responseObserver.onCompleted(); } - - @Override - protected boolean isReady() { - return ((ServerCallStreamObserver)responseObserver).isReady(); - } } private static class LrsRpcCallV3 extends LrsRpcCall { diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index 18b9ceffb66..885e2d3e121 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -96,7 +96,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -1916,9 +1915,8 @@ BootstrapInfo getBootstrapInfo() { @Override @SuppressWarnings("unchecked") void watchXdsResource(XdsResourceType resourceType, - String resourceName, - ResourceWatcher watcher, - Executor syncContext) { + String resourceName, + ResourceWatcher watcher) { switch (resourceType.typeName()) { case "LDS": @@ -1961,10 +1959,8 @@ void cancelXdsResourceWatch(XdsResourceType type, } void deliverLdsUpdate(long httpMaxStreamDurationNano, List virtualHosts) { - syncContext.execute(() -> { - ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( - httpMaxStreamDurationNano, virtualHosts, null))); - }); + ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( + httpMaxStreamDurationNano, virtualHosts, null))); } void deliverLdsUpdate(final List routes) { @@ -1972,10 +1968,8 @@ void deliverLdsUpdate(final List routes) { VirtualHost.create( "virtual-host", Collections.singletonList(expectedLdsResourceName), routes, ImmutableMap.of()); - syncContext.execute(() -> { - ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( - 0L, Collections.singletonList(virtualHost), null))); - }); + ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( + 0L, Collections.singletonList(virtualHost), null))); } void deliverLdsUpdateWithFaultInjection( @@ -2019,10 +2013,8 @@ void deliverLdsUpdateWithFaultInjection( Collections.singletonList(expectedLdsResourceName), Collections.singletonList(route), overrideConfig); - syncContext.execute(() -> { - ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( - 0L, Collections.singletonList(virtualHost), filterChain))); - }); + ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( + 0L, Collections.singletonList(virtualHost), filterChain))); } void deliverLdsUpdateForRdsNameWithFaultInjection( @@ -2034,23 +2026,17 @@ void deliverLdsUpdateForRdsNameWithFaultInjection( ImmutableList filterChain = ImmutableList.of( new NamedFilterConfig(FAULT_FILTER_INSTANCE_NAME, httpFilterFaultConfig), new NamedFilterConfig(ROUTER_FILTER_INSTANCE_NAME, RouterFilter.ROUTER_CONFIG)); - syncContext.execute(() -> { - ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forRdsName( - 0L, rdsName, filterChain))); - }); + ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forRdsName( + 0L, rdsName, filterChain))); } void deliverLdsUpdateForRdsName(String rdsName) { - syncContext.execute(() -> { - ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forRdsName( - 0, rdsName, null))); - }); + ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forRdsName( + 0, rdsName, null))); } void deliverLdsResourceNotFound() { - syncContext.execute(() -> { - ldsWatcher.onResourceDoesNotExist(expectedLdsResourceName); - }); + ldsWatcher.onResourceDoesNotExist(expectedLdsResourceName); } void deliverRdsUpdateWithFaultInjection( @@ -2086,39 +2072,29 @@ void deliverRdsUpdateWithFaultInjection( Collections.singletonList(expectedLdsResourceName), Collections.singletonList(route), overrideConfig); - syncContext.execute(() -> { - rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost))); - }); + rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost))); } void deliverRdsUpdate(String resourceName, List virtualHosts) { if (!resourceName.equals(rdsResource)) { return; } - syncContext.execute(() -> { - rdsWatcher.onChanged(new RdsUpdate(virtualHosts)); - }); + rdsWatcher.onChanged(new RdsUpdate(virtualHosts)); } void deliverRdsResourceNotFound(String resourceName) { if (!resourceName.equals(rdsResource)) { return; } - syncContext.execute(() -> { - rdsWatcher.onResourceDoesNotExist(rdsResource); - }); + rdsWatcher.onResourceDoesNotExist(rdsResource); } void deliverError(final Status error) { if (ldsWatcher != null) { - syncContext.execute(() -> { - ldsWatcher.onError(error); - }); + ldsWatcher.onError(error); } if (rdsWatcher != null) { - syncContext.execute(() -> { - rdsWatcher.onError(error); - }); + rdsWatcher.onError(error); } } } diff --git a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java index 5e2ab30de30..256e3f61fec 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import javax.annotation.Nullable; /** @@ -183,8 +182,7 @@ public BootstrapInfo getBootstrapInfo() { @SuppressWarnings("unchecked") void watchXdsResource(XdsResourceType resourceType, String resourceName, - ResourceWatcher watcher, - Executor syncContext) { + ResourceWatcher watcher) { switch (resourceType.typeName()) { case "LDS": assertThat(ldsWatcher).isNull(); diff --git a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java index 7bbe6853875..bd797ce6de9 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java @@ -47,7 +47,6 @@ import io.grpc.ServerInterceptor; import io.grpc.Status; import io.grpc.StatusException; -import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; import io.grpc.testing.TestMethodDescriptors; import io.grpc.xds.EnvoyServerProtoData.FilterChain; @@ -153,8 +152,7 @@ public void run() { verify(xdsClient, timeout(5000)).watchXdsResource( eq(listenerResource), eq("grpc/server?udpa.resource.listening_address=[::FFFF:129.144.52.38]:80"), - any(ResourceWatcher.class), - any(SynchronizationContext.class)); + any(ResourceWatcher.class)); } @Test @@ -226,8 +224,7 @@ public void run() { eq(listenerResource), eq("xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/server/" + "%5B::FFFF:129.144.52.38%5D:80"), - any(ResourceWatcher.class), - any(SynchronizationContext.class)); + any(ResourceWatcher.class)); } @Test