From 597101c0891c82c45fc86d1c5cab9d5268438cff Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Sun, 31 Dec 2023 12:54:07 -0800 Subject: [PATCH] xds: fix flow control message not delivered when previous message type is unknown (#10785) --- .../java/io/grpc/xds/CdsLoadBalancer2.java | 145 +++++++--------- .../grpc/xds/ClusterResolverLoadBalancer.java | 51 +++--- .../java/io/grpc/xds/ControlPlaneClient.java | 22 ++- xds/src/main/java/io/grpc/xds/XdsClient.java | 34 +++- .../main/java/io/grpc/xds/XdsClientImpl.java | 112 +++++++----- .../java/io/grpc/xds/XdsNameResolver.java | 132 ++++++--------- .../java/io/grpc/xds/XdsServerWrapper.java | 128 +++++++------- .../io/grpc/xds/CdsLoadBalancer2Test.java | 7 +- .../xds/ClusterResolverLoadBalancerTest.java | 5 +- .../io/grpc/xds/XdsClientImplTestBase.java | 160 +++++++++++++++++- .../java/io/grpc/xds/XdsClientImplV3Test.java | 6 + .../java/io/grpc/xds/XdsNameResolverTest.java | 60 +++++-- .../java/io/grpc/xds/XdsServerTestHelper.java | 4 +- .../io/grpc/xds/XdsServerWrapperTest.java | 7 +- 14 files changed, 531 insertions(+), 342 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 54f86d6da48..2f419b439ff 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -320,7 +320,7 @@ private ClusterState(String name) { private void start() { shutdown = false; - xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext); } void shutdown() { @@ -341,102 +341,85 @@ public void onError(Status error) { String.format("Unable to load CDS %s. xDS server returned: %s: %s", name, error.getCode(), error.getDescription())) .withCause(error.getCause()); - syncContext.execute(new Runnable() { - @Override - public void run() { - if (shutdown) { - return; - } - // All watchers should receive the same error, so we only propagate it once. - if (ClusterState.this == root) { - handleClusterDiscoveryError(status); - } - } - }); + if (shutdown) { + return; + } + // All watchers should receive the same error, so we only propagate it once. + if (ClusterState.this == root) { + handleClusterDiscoveryError(status); + } } @Override public void onResourceDoesNotExist(String resourceName) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (shutdown) { - return; - } - discovered = true; - result = null; - if (childClusterStates != null) { - for (ClusterState state : childClusterStates.values()) { - state.shutdown(); - } - childClusterStates = null; - } - handleClusterDiscovered(); + if (shutdown) { + return; + } + discovered = true; + result = null; + if (childClusterStates != null) { + for (ClusterState state : childClusterStates.values()) { + state.shutdown(); } - }); + childClusterStates = null; + } + handleClusterDiscovered(); } @Override public void onChanged(final CdsUpdate update) { - class ClusterDiscovered implements Runnable { - @Override - public void run() { - if (shutdown) { - return; + if (shutdown) { + return; + } + logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update); + discovered = true; + result = update; + if (update.clusterType() == ClusterType.AGGREGATE) { + isLeaf = false; + logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}", + update.clusterName(), update.prioritizedClusterNames()); + Map newChildStates = new LinkedHashMap<>(); + for (String cluster : update.prioritizedClusterNames()) { + if (newChildStates.containsKey(cluster)) { + logger.log(XdsLogLevel.WARNING, + String.format("duplicate cluster name %s in aggregate %s is being ignored", + cluster, update.clusterName())); + continue; } - - logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update); - discovered = true; - result = update; - if (update.clusterType() == ClusterType.AGGREGATE) { - isLeaf = false; - logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}", - update.clusterName(), update.prioritizedClusterNames()); - Map newChildStates = new LinkedHashMap<>(); - for (String cluster : update.prioritizedClusterNames()) { - if (newChildStates.containsKey(cluster)) { - logger.log(XdsLogLevel.WARNING, - String.format("duplicate cluster name %s in aggregate %s is being ignored", - cluster, update.clusterName())); - continue; - } - if (childClusterStates == null || !childClusterStates.containsKey(cluster)) { - ClusterState childState; - if (clusterStates.containsKey(cluster)) { - childState = clusterStates.get(cluster); - if (childState.shutdown) { - childState.start(); - } - } else { - childState = new ClusterState(cluster); - clusterStates.put(cluster, childState); - childState.start(); - } - newChildStates.put(cluster, childState); - } else { - newChildStates.put(cluster, childClusterStates.remove(cluster)); - } - } - if (childClusterStates != null) { // stop subscribing to revoked child clusters - for (ClusterState watcher : childClusterStates.values()) { - watcher.shutdown(); + if (childClusterStates == null || !childClusterStates.containsKey(cluster)) { + ClusterState childState; + if (clusterStates.containsKey(cluster)) { + childState = clusterStates.get(cluster); + if (childState.shutdown) { + childState.start(); } + } else { + childState = new ClusterState(cluster); + clusterStates.put(cluster, childState); + childState.start(); } - childClusterStates = newChildStates; - } else if (update.clusterType() == ClusterType.EDS) { - isLeaf = true; - logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}", - update.clusterName(), update.edsServiceName()); - } else { // logical DNS - isLeaf = true; - logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName()); + newChildStates.put(cluster, childState); + } else { + newChildStates.put(cluster, childClusterStates.remove(cluster)); + } + } + if (childClusterStates != null) { // stop subscribing to revoked child clusters + for (ClusterState watcher : childClusterStates.values()) { + watcher.shutdown(); } - handleClusterDiscovered(); } + childClusterStates = newChildStates; + } else if (update.clusterType() == ClusterType.EDS) { + isLeaf = true; + logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}", + update.clusterName(), update.edsServiceName()); + } else { // logical DNS + isLeaf = true; + logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName()); } - - syncContext.execute(new ClusterDiscovered()); + handleClusterDiscovered(); } + } } } diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 15354da0f95..9626b290335 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -366,7 +366,8 @@ private EdsClusterState(String name, @Nullable String edsServiceName, void start() { String resourceName = edsServiceName != null ? edsServiceName : name; logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resourceName, this); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), + resourceName, this, syncContext); } @Override @@ -452,7 +453,7 @@ public void run() { } } - syncContext.execute(new EndpointsUpdated()); + new EndpointsUpdated().run(); } private List generatePriorityNames(String name, @@ -491,38 +492,28 @@ private List generatePriorityNames(String name, @Override public void onResourceDoesNotExist(final String resourceName) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (shutdown) { - return; - } - logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName); - status = Status.OK; - resolved = true; - result = null; // resource revoked - handleEndpointResourceUpdate(); - } - }); + if (shutdown) { + return; + } + logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName); + status = Status.OK; + resolved = true; + result = null; // resource revoked + handleEndpointResourceUpdate(); } @Override public void onError(final Status error) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (shutdown) { - return; - } - String resourceName = edsServiceName != null ? edsServiceName : name; - status = Status.UNAVAILABLE - .withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s", - resourceName, error.getCode(), error.getDescription())) - .withCause(error.getCause()); - logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error); - handleEndpointResolutionError(); - } - }); + if (shutdown) { + return; + } + String resourceName = edsServiceName != null ? edsServiceName : name; + status = Status.UNAVAILABLE + .withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s", + resourceName, error.getCode(), error.getDescription())) + .withCause(error.getCause()); + logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error); + handleEndpointResolutionError(); } } diff --git a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java index c919365d093..5a344336ba2 100644 --- a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java @@ -38,9 +38,9 @@ import io.grpc.internal.BackoffPolicy; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; -import io.grpc.stub.StreamObserver; import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.EnvoyProtoData.Node; +import io.grpc.xds.XdsClient.ProcessingTracker; import io.grpc.xds.XdsClient.ResourceStore; import io.grpc.xds.XdsClient.XdsResponseHandler; import io.grpc.xds.XdsClientImpl.XdsChannelFactory; @@ -288,6 +288,8 @@ private abstract class AbstractAdsStream { abstract boolean isReady(); + abstract void request(int count); + /** * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and * {@code errorDetail}. Used for reacting to a specific discovery response. For @@ -314,7 +316,10 @@ final void handleRpcResponse(XdsResourceType type, String versionInfo, List request(1), syncContext); + xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce, + processingTracker); + processingTracker.onComplete(); } final void handleRpcError(Throwable t) { @@ -372,7 +377,7 @@ private void cleanUp() { } private final class AdsStreamV3 extends AbstractAdsStream { - private StreamObserver requestWriter; + private ClientCallStreamObserver requestWriter; @Override public boolean isReady() { @@ -380,6 +385,7 @@ public boolean isReady() { } @Override + @SuppressWarnings("unchecked") void start() { AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(channel); @@ -389,6 +395,7 @@ final class AdsClientResponseObserver @Override public void beforeStart(ClientCallStreamObserver requestStream) { + requestStream.disableAutoRequestWithInitial(1); requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler); } @@ -408,6 +415,7 @@ public void run() { XdsLogLevel.WARNING, "Ignore an unknown type of DiscoveryResponse: {0}", response.getTypeUrl()); + request(1); return; } handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(), @@ -437,7 +445,8 @@ public void run() { } } - requestWriter = stub.streamAggregatedResources(new AdsClientResponseObserver()); + requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources( + new AdsClientResponseObserver()); } @Override @@ -467,6 +476,11 @@ void sendDiscoveryRequest(XdsResourceType type, String versionInfo, } } + @Override + void request(int count) { + requestWriter.request(count); + } + @Override void sendError(Exception error) { requestWriter.onError(error); diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index a66671bcdaa..aa222a98d5c 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -24,6 +24,7 @@ import com.google.common.base.Splitter; import com.google.common.net.UrlEscapers; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; import io.grpc.Status; import io.grpc.xds.Bootstrapper.ServerInfo; @@ -36,6 +37,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; /** @@ -305,10 +308,16 @@ TlsContextManager getTlsContextManager() { * Registers a data watcher for the given Xds resource. */ void watchXdsResource(XdsResourceType type, String resourceName, - ResourceWatcher watcher) { + ResourceWatcher watcher, + Executor executor) { throw new UnsupportedOperationException(); } + void watchXdsResource(XdsResourceType type, String resourceName, + ResourceWatcher watcher) { + watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor()); + } + /** * Unregisters the given resource watcher. */ @@ -353,11 +362,32 @@ Map getServerLrsClientMap() { throw new UnsupportedOperationException(); } + static final class ProcessingTracker { + private final AtomicInteger pendingTask = new AtomicInteger(1); + private final Executor executor; + private final Runnable completionListener; + + ProcessingTracker(Runnable completionListener, Executor executor) { + this.executor = executor; + this.completionListener = completionListener; + } + + void startTask() { + pendingTask.incrementAndGet(); + } + + void onComplete() { + if (pendingTask.decrementAndGet() == 0) { + executor.execute(completionListener); + } + } + } + interface XdsResponseHandler { /** Called when a xds response is received. */ void handleResourceResponse( XdsResourceType resourceType, ServerInfo serverInfo, String versionInfo, - List resources, String nonce); + List resources, String nonce, ProcessingTracker processingTracker); /** Called when the ADS stream is closed passively. */ // Must be synchronized. diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index f617f79acca..7389d7ebef1 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -54,11 +54,11 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -166,7 +166,7 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { @Override public void handleResourceResponse( XdsResourceType xdsResourceType, ServerInfo serverInfo, String versionInfo, - List resources, String nonce) { + List resources, String nonce, ProcessingTracker processingTracker) { checkNotNull(xdsResourceType, "xdsResourceType"); syncContext.throwIfNotInThisSynchronizationContext(); Set toParseResourceNames = null; @@ -178,7 +178,7 @@ public void handleResourceResponse( XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce, bootstrapInfo, filterRegistry, loadBalancerRegistry, tlsContextManager, toParseResourceNames); - handleResourceUpdate(args, resources, xdsResourceType); + handleResourceUpdate(args, resources, xdsResourceType, processingTracker); } @Override @@ -189,7 +189,7 @@ public void handleStreamClosed(Status error) { resourceSubscribers.values()) { for (ResourceSubscriber subscriber : subscriberMap.values()) { if (!subscriber.hasResult()) { - subscriber.onError(error); + subscriber.onError(error, null); } } } @@ -289,7 +289,8 @@ TlsContextManager getTlsContextManager() { @Override void watchXdsResource(XdsResourceType type, String resourceName, - ResourceWatcher watcher) { + ResourceWatcher watcher, + Executor watcherExecutor) { syncContext.execute(new Runnable() { @Override @SuppressWarnings("unchecked") @@ -299,7 +300,7 @@ public void run() { subscribedResourceTypeUrls.put(type.typeUrl(), type); } ResourceSubscriber subscriber = - (ResourceSubscriber) resourceSubscribers.get(type).get(resourceName);; + (ResourceSubscriber) resourceSubscribers.get(type).get(resourceName); if (subscriber == null) { logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName); subscriber = new ResourceSubscriber<>(type, resourceName); @@ -308,7 +309,7 @@ public void run() { subscriber.xdsChannel.adjustResourceSubscription(type); } } - subscriber.addWatcher(watcher); + subscriber.addWatcher(watcher, watcherExecutor); } }); } @@ -333,7 +334,6 @@ public void run() { if (resourceSubscribers.get(type).isEmpty()) { resourceSubscribers.remove(type); subscribedResourceTypeUrls.remove(type.typeUrl()); - } } } @@ -420,9 +420,9 @@ private void cleanUpResourceTimers() { } @SuppressWarnings("unchecked") - private void handleResourceUpdate(XdsResourceType.Args args, - List resources, - XdsResourceType xdsResourceType) { + private void handleResourceUpdate( + XdsResourceType.Args args, List resources, XdsResourceType xdsResourceType, + ProcessingTracker processingTracker) { ValidatedResourceUpdate result = xdsResourceType.parse(args, resources); logger.log(XdsLogger.XdsLogLevel.INFO, "Received {0} Response version {1} nonce {2}. Parsed resources: {3}", @@ -449,10 +449,10 @@ private void handleResourceUpdate(XdsResourceType.Arg for (Map.Entry> entry : subscribedResources.entrySet()) { String resourceName = entry.getKey(); ResourceSubscriber subscriber = (ResourceSubscriber) entry.getValue(); - if (parsedResources.containsKey(resourceName)) { // Happy path: the resource updated successfully. Notify the watchers of the update. - subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime); + subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime, + processingTracker); continue; } @@ -471,7 +471,7 @@ private void handleResourceUpdate(XdsResourceType.Arg // The resource is missing. Reuse the cached resource if possible. if (subscriber.data == null) { // No cached data. Notify the watchers of an invalid update. - subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail)); + subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), processingTracker); } continue; } @@ -480,7 +480,7 @@ private void handleResourceUpdate(XdsResourceType.Arg // from the ADS update. Note that we can only do this if the resource update is coming from // the same xDS server that the ResourceSubscriber is subscribed to. if (subscriber.serverInfo.equals(args.serverInfo)) { - subscriber.onAbsent(); + subscriber.onAbsent(processingTracker); } } } @@ -493,7 +493,7 @@ private final class ResourceSubscriber { @Nullable private final ControlPlaneClient xdsChannel; private final XdsResourceType type; private final String resource; - private final Set> watchers = new HashSet<>(); + private final Map, Executor> watchers = new HashMap<>(); @Nullable private T data; private boolean absent; // Tracks whether the deletion has been ignored per bootstrap server feature. @@ -553,22 +553,26 @@ private ServerInfo getServerInfo(String resource) { return bootstrapInfo.servers().get(0); // use first server } - void addWatcher(ResourceWatcher watcher) { - checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher); - watchers.add(watcher); - if (errorDescription != null) { - watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription)); - return; - } - if (data != null) { - notifyWatcher(watcher, data); - } else if (absent) { - watcher.onResourceDoesNotExist(resource); - } + void addWatcher(ResourceWatcher watcher, Executor watcherExecutor) { + checkArgument(!watchers.containsKey(watcher), "watcher %s already registered", watcher); + watchers.put(watcher, watcherExecutor); + T savedData = data; + boolean savedAbsent = absent; + watcherExecutor.execute(() -> { + if (errorDescription != null) { + watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription)); + return; + } + if (savedData != null) { + notifyWatcher(watcher, savedData); + } else if (savedAbsent) { + watcher.onResourceDoesNotExist(resource); + } + }); } void removeWatcher(ResourceWatcher watcher) { - checkArgument(watchers.contains(watcher), "watcher %s not registered", watcher); + checkArgument(watchers.containsKey(watcher), "watcher %s not registered", watcher); watchers.remove(watcher); } @@ -586,7 +590,7 @@ public void run() { logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout", type, resource); respTimer = null; - onAbsent(); + onAbsent(null); } @Override @@ -633,7 +637,8 @@ boolean hasResult() { return data != null || absent; } - void onData(ParsedResource parsedResource, String version, long updateTime) { + void onData(ParsedResource parsedResource, String version, long updateTime, + ProcessingTracker processingTracker) { if (respTimer != null && respTimer.isPending()) { respTimer.cancel(); respTimer = null; @@ -650,13 +655,20 @@ void onData(ParsedResource parsedResource, String version, long updateTime) { resourceDeletionIgnored = false; } if (!Objects.equals(oldData, data)) { - for (ResourceWatcher watcher : watchers) { - notifyWatcher(watcher, data); + for (ResourceWatcher watcher : watchers.keySet()) { + processingTracker.startTask(); + watchers.get(watcher).execute(() -> { + try { + notifyWatcher(watcher, data); + } finally { + processingTracker.onComplete(); + } + }); } } } - void onAbsent() { + void onAbsent(@Nullable ProcessingTracker processingTracker) { if (respTimer != null && respTimer.isPending()) { // too early to conclude absence return; } @@ -680,13 +692,24 @@ void onAbsent() { data = null; absent = true; metadata = ResourceMetadata.newResourceMetadataDoesNotExist(); - for (ResourceWatcher watcher : watchers) { - watcher.onResourceDoesNotExist(resource); + for (ResourceWatcher watcher : watchers.keySet()) { + if (processingTracker != null) { + processingTracker.startTask(); + } + watchers.get(watcher).execute(() -> { + try { + watcher.onResourceDoesNotExist(resource); + } finally { + if (processingTracker != null) { + processingTracker.onComplete(); + } + } + }); } } } - void onError(Status error) { + void onError(Status error, @Nullable ProcessingTracker tracker) { if (respTimer != null && respTimer.isPending()) { respTimer.cancel(); respTimer = null; @@ -699,8 +722,19 @@ void onError(Status error) { .withDescription(description + "nodeID: " + bootstrapInfo.node().getId()) .withCause(error.getCause()); - for (ResourceWatcher watcher : watchers) { - watcher.onError(errorAugmented); + for (ResourceWatcher watcher : watchers.keySet()) { + if (tracker != null) { + tracker.startTask(); + } + watchers.get(watcher).execute(() -> { + try { + watcher.onError(errorAugmented); + } finally { + if (tracker != null) { + tracker.onComplete(); + } + } + }); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 079f862aeca..a345c7eb263 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -638,66 +638,52 @@ private class ResolveState implements ResourceWatcher { @Override public void onChanged(final LdsUpdate update) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (stopped) { - return; - } - logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update); - HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); - List virtualHosts = httpConnectionManager.virtualHosts(); - String rdsName = httpConnectionManager.rdsName(); - cleanUpRouteDiscoveryState(); - if (virtualHosts != null) { - updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(), - httpConnectionManager.httpFilterConfigs()); - } else { - routeDiscoveryState = new RouteDiscoveryState( - rdsName, httpConnectionManager.httpMaxStreamDurationNano(), - httpConnectionManager.httpFilterConfigs()); - logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), - rdsName, routeDiscoveryState); - } - } - }); + if (stopped) { + return; + } + logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update); + HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); + List virtualHosts = httpConnectionManager.virtualHosts(); + String rdsName = httpConnectionManager.rdsName(); + cleanUpRouteDiscoveryState(); + if (virtualHosts != null) { + updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(), + httpConnectionManager.httpFilterConfigs()); + } else { + routeDiscoveryState = new RouteDiscoveryState( + rdsName, httpConnectionManager.httpMaxStreamDurationNano(), + httpConnectionManager.httpFilterConfigs()); + logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), + rdsName, routeDiscoveryState, syncContext); + } } @Override public void onError(final Status error) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (stopped || receivedConfig) { - return; - } - listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( - String.format("Unable to load LDS %s. xDS server returned: %s: %s", - ldsResourceName, error.getCode(), error.getDescription()))); - } - }); + if (stopped || receivedConfig) { + return; + } + listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( + String.format("Unable to load LDS %s. xDS server returned: %s: %s", + ldsResourceName, error.getCode(), error.getDescription()))); } @Override public void onResourceDoesNotExist(final String resourceName) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (stopped) { - return; - } - String error = "LDS resource does not exist: " + resourceName; - logger.log(XdsLogLevel.INFO, error); - cleanUpRouteDiscoveryState(); - cleanUpRoutes(error); - } - }); + if (stopped) { + return; + } + String error = "LDS resource does not exist: " + resourceName; + logger.log(XdsLogLevel.INFO, error); + cleanUpRouteDiscoveryState(); + cleanUpRoutes(error); } private void start() { logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(), ldsResourceName, this); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), + ldsResourceName, this, syncContext); } private void stop() { @@ -865,47 +851,31 @@ private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano, @Override public void onChanged(final RdsUpdate update) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (RouteDiscoveryState.this != routeDiscoveryState) { - return; - } - logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update); - updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, - filterConfigs); - } - }); + if (RouteDiscoveryState.this != routeDiscoveryState) { + return; + } + logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update); + updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs); } @Override public void onError(final Status error) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) { - return; - } - listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( - String.format("Unable to load RDS %s. xDS server returned: %s: %s", - resourceName, error.getCode(), error.getDescription()))); - } - }); + if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) { + return; + } + listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription( + String.format("Unable to load RDS %s. xDS server returned: %s: %s", + resourceName, error.getCode(), error.getDescription()))); } @Override public void onResourceDoesNotExist(final String resourceName) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (RouteDiscoveryState.this != routeDiscoveryState) { - return; - } - String error = "RDS resource does not exist: " + resourceName; - logger.log(XdsLogLevel.INFO, error); - cleanUpRoutes(error); - } - }); + if (RouteDiscoveryState.this != routeDiscoveryState) { + return; + } + String error = "RDS resource does not exist: " + resourceName; + logger.log(XdsLogLevel.INFO, error); + cleanUpRoutes(error); } } } diff --git a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java index b3bbe005825..8c15dca8ea2 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java @@ -366,92 +366,78 @@ public Listener interceptCall(ServerCall call, private DiscoveryState(String resourceName) { this.resourceName = checkNotNull(resourceName, "resourceName"); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(), resourceName, this); + xdsClient.watchXdsResource( + XdsListenerResource.getInstance(), resourceName, this, syncContext); } @Override public void onChanged(final LdsUpdate update) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (stopped) { - return; - } - logger.log(Level.FINEST, "Received Lds update {0}", update); - checkNotNull(update.listener(), "update"); - if (!pendingRds.isEmpty()) { - // filter chain state has not yet been applied to filterChainSelectorManager and there - // are two sets of sslContextProviderSuppliers, so we release the old ones. - releaseSuppliersInFlight(); - pendingRds.clear(); - } - filterChains = update.listener().filterChains(); - defaultFilterChain = update.listener().defaultFilterChain(); - List allFilterChains = filterChains; - if (defaultFilterChain != null) { - allFilterChains = new ArrayList<>(filterChains); - allFilterChains.add(defaultFilterChain); - } - Set allRds = new HashSet<>(); - for (FilterChain filterChain : allFilterChains) { - HttpConnectionManager hcm = filterChain.httpConnectionManager(); - if (hcm.virtualHosts() == null) { - RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName()); - if (rdsState == null) { - rdsState = new RouteDiscoveryState(hcm.rdsName()); - routeDiscoveryStates.put(hcm.rdsName(), rdsState); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), - hcm.rdsName(), rdsState); - } - if (rdsState.isPending) { - pendingRds.add(hcm.rdsName()); - } - allRds.add(hcm.rdsName()); - } + if (stopped) { + return; + } + logger.log(Level.FINEST, "Received Lds update {0}", update); + checkNotNull(update.listener(), "update"); + if (!pendingRds.isEmpty()) { + // filter chain state has not yet been applied to filterChainSelectorManager and there + // are two sets of sslContextProviderSuppliers, so we release the old ones. + releaseSuppliersInFlight(); + pendingRds.clear(); + } + filterChains = update.listener().filterChains(); + defaultFilterChain = update.listener().defaultFilterChain(); + List allFilterChains = filterChains; + if (defaultFilterChain != null) { + allFilterChains = new ArrayList<>(filterChains); + allFilterChains.add(defaultFilterChain); + } + Set allRds = new HashSet<>(); + for (FilterChain filterChain : allFilterChains) { + HttpConnectionManager hcm = filterChain.httpConnectionManager(); + if (hcm.virtualHosts() == null) { + RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName()); + if (rdsState == null) { + rdsState = new RouteDiscoveryState(hcm.rdsName()); + routeDiscoveryStates.put(hcm.rdsName(), rdsState); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), + hcm.rdsName(), rdsState, syncContext); } - for (Map.Entry entry: routeDiscoveryStates.entrySet()) { - if (!allRds.contains(entry.getKey())) { - xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), - entry.getKey(), entry.getValue()); - } - } - routeDiscoveryStates.keySet().retainAll(allRds); - if (pendingRds.isEmpty()) { - updateSelector(); + if (rdsState.isPending) { + pendingRds.add(hcm.rdsName()); } + allRds.add(hcm.rdsName()); } - }); + } + for (Map.Entry entry: routeDiscoveryStates.entrySet()) { + if (!allRds.contains(entry.getKey())) { + xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), + entry.getKey(), entry.getValue()); + } + } + routeDiscoveryStates.keySet().retainAll(allRds); + if (pendingRds.isEmpty()) { + updateSelector(); + } } @Override public void onResourceDoesNotExist(final String resourceName) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (stopped) { - return; - } - StatusException statusException = Status.UNAVAILABLE.withDescription( - "Listener " + resourceName + " unavailable").asException(); - handleConfigNotFound(statusException); - } - }); + if (stopped) { + return; + } + StatusException statusException = Status.UNAVAILABLE.withDescription( + "Listener " + resourceName + " unavailable").asException(); + handleConfigNotFound(statusException); } @Override public void onError(final Status error) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (stopped) { - return; - } - logger.log(Level.FINE, "Error from XdsClient", error); - if (!isServing) { - listener.onNotServing(error.asException()); - } - } - }); + if (stopped) { + return; + } + logger.log(Level.FINE, "Error from XdsClient", error); + if (!isServing) { + listener.onNotServing(error.asException()); + } } private void shutdown() { diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index 4f30ec3ec4d..3014c809ec2 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -65,6 +65,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; @@ -664,7 +665,7 @@ public void unknownLbProvider() { outlierDetection) .lbPolicyConfig(ImmutableMap.of("unknown", ImmutableMap.of("foo", "bar"))).build()); } catch (Exception e) { - assertThat(e).hasCauseThat().hasMessageThat().contains("No provider available"); + assertThat(e).hasMessageThat().contains("No provider available"); return; } fail("Expected the unknown LB to cause an exception"); @@ -679,7 +680,7 @@ public void invalidLbConfig() { ImmutableMap.of("ring_hash_experimental", ImmutableMap.of("minRingSize", "-1"))) .build()); } catch (Exception e) { - assertThat(e).hasCauseThat().hasMessageThat().contains("Unable to parse"); + assertThat(e).hasMessageThat().contains("Unable to parse"); return; } fail("Expected the invalid config to cause an exception"); @@ -789,7 +790,7 @@ private final class FakeXdsClient extends XdsClient { @Override @SuppressWarnings("unchecked") void watchXdsResource(XdsResourceType type, String resourceName, - ResourceWatcher watcher) { + ResourceWatcher watcher, Executor syncContext) { assertThat(type.typeName()).isEqualTo("CDS"); watchers.computeIfAbsent(resourceName, k -> new ArrayList<>()) .add((ResourceWatcher)watcher); diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index 1a10dde11c1..76756e63301 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -88,6 +88,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.junit.After; @@ -1181,11 +1182,11 @@ public String toString() { private static final class FakeXdsClient extends XdsClient { private final Map> watchers = new HashMap<>(); - @Override @SuppressWarnings("unchecked") void watchXdsResource(XdsResourceType type, String resourceName, - ResourceWatcher watcher) { + ResourceWatcher watcher, + Executor syncContext) { assertThat(type.typeName()).isEqualTo("EDS"); assertThat(watchers).doesNotContainKey(resourceName); watchers.put(resourceName, (ResourceWatcher) watcher); diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index c18b324b14c..054b1252ec0 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -20,7 +20,10 @@ import static com.google.common.truth.Truth.assertWithMessage; import static io.grpc.xds.XdsClientImpl.XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -104,6 +107,9 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -121,8 +127,10 @@ import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; +import org.mockito.stubbing.Answer; /** * Tests for {@link XdsClientImpl}. @@ -2951,6 +2959,135 @@ public void cachedEdsResource_absent() { verifySubscribedResourcesMetadataSizes(0, 0, 0, 1); } + @Test + @SuppressWarnings("unchecked") + public void flowControlAbsent() throws Exception { + String anotherCdsResource = CDS_RESOURCE + "2"; + FakeClock fakeWatchClock = new FakeClock(); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE, + cdsResourceWatcher, fakeWatchClock.getScheduledExecutorService()); + ResourceWatcher anotherWatcher = mock(ResourceWatcher.class); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), anotherCdsResource, + anotherWatcher, fakeWatchClock.getScheduledExecutorService()); + verifyResourceMetadataRequested(CDS, CDS_RESOURCE); + verifyResourceMetadataRequested(CDS, anotherCdsResource); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + call.verifyRequest(CDS, Arrays.asList(CDS_RESOURCE, anotherCdsResource), "", "", NODE); + assertThat(fakeWatchClock.runDueTasks()).isEqualTo(2); + call.sendResponse(CDS, testClusterRoundRobin, VERSION_1, "0000"); + verifyResourceMetadataAcked( + CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1, TIME_INCREMENT); + call.verifyRequest(CDS, Arrays.asList(CDS_RESOURCE, anotherCdsResource), VERSION_1, + "0000", NODE); + verifyNoInteractions(cdsResourceWatcher, anotherWatcher); + fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); + assertThat(fakeWatchClock.getPendingTasks().size()).isEqualTo(2); + CyclicBarrier barrier = new CyclicBarrier(2); + doAnswer(blockUpdate(barrier)).when(cdsResourceWatcher).onChanged(any(CdsUpdate.class)); + + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + try { + fakeWatchClock.runDueTasks(); + latch.countDown(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }).start(); + ImmutableMap resourcesV2 = ImmutableMap.of( + CDS_RESOURCE, Any.pack(mf.buildEdsCluster(CDS_RESOURCE, "A.2", "round_robin", null, + null, false, null, + "envoy.transport_sockets.tls", null, null + )), + anotherCdsResource, Any.pack(mf.buildClusterInvalid(anotherCdsResource))); + call.sendResponse(CDS, resourcesV2.values().asList(), VERSION_2, "0001"); + assertThat(call.isReady()).isFalse(); + verifyResourceMetadataAcked( + CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1, TIME_INCREMENT); + barrier.await(); + verify(cdsResourceWatcher, atLeastOnce()).onChanged(any()); + String errorMsg = "CDS response Cluster 'cluster.googleapis.com2' validation error: " + + "Cluster cluster.googleapis.com2: unspecified cluster discovery type"; + call.verifyRequestNack(CDS, Arrays.asList(CDS_RESOURCE, anotherCdsResource), VERSION_1, "0001", + NODE, Arrays.asList(errorMsg)); + verify(anotherWatcher).onResourceDoesNotExist(eq(anotherCdsResource)); + barrier.await(); + latch.await(10, TimeUnit.SECONDS); + verify(cdsResourceWatcher, times(2)).onChanged(any()); + verify(anotherWatcher).onError(any()); + } + + private Answer blockUpdate(CyclicBarrier barrier) { + return new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + barrier.await(); + return null; + } + }; + } + + @Test + public void simpleFlowControl() throws Exception { + FakeClock fakeWatchClock = new FakeClock(); + DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE, + edsResourceWatcher, fakeWatchClock.getScheduledExecutorService()); + verifyResourceMetadataRequested(EDS, EDS_RESOURCE); + assertThat(fakeWatchClock.runDueTasks()).isEqualTo(1); + + call.sendResponse(EDS, testClusterLoadAssignment, VERSION_1, "0000"); + call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE); + verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1, + TIME_INCREMENT); + verifyNoInteractions(edsResourceWatcher); + assertThat(fakeWatchClock.getPendingTasks().size()).isEqualTo(1); + + // Updated EDS response. + Any updatedClusterLoadAssignment = Any.pack(mf.buildClusterLoadAssignment(EDS_RESOURCE, + ImmutableList.of(mf.buildLocalityLbEndpoints("region2", "zone2", "subzone2", + mf.buildLbEndpoint("172.44.2.2", 8000, "unknown", 3), 2, 0)), + ImmutableList.of())); + call.sendResponse(EDS, updatedClusterLoadAssignment, VERSION_2, "0001"); + // message not processed due to flow control + call.verifyNoMoreRequest(); + assertThat(call.isReady()).isFalse(); + + CyclicBarrier barrier = new CyclicBarrier(2); + doAnswer(blockUpdate(barrier)).when(edsResourceWatcher).onChanged(any(EdsUpdate.class)); + + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + try { + fakeWatchClock.runDueTasks(); + latch.countDown(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }).start(); + + verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1, + TIME_INCREMENT); + barrier.await(); + verify(edsResourceWatcher, atLeastOnce()).onChanged(edsUpdateCaptor.capture()); + EdsUpdate edsUpdate = edsUpdateCaptor.getAllValues().get(0); + validateGoldenClusterLoadAssignment(edsUpdate); + barrier.await(); + latch.await(10, TimeUnit.SECONDS); + verify(edsResourceWatcher, times(2)).onChanged(any()); + verifyResourceMetadataAcked(EDS, EDS_RESOURCE, updatedClusterLoadAssignment, VERSION_2, + TIME_INCREMENT * 2); + } + + @Test + public void flowControlUnknownType() { + DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE, + edsResourceWatcher); + call.sendResponse(CDS, testClusterRoundRobin, VERSION_1, "0000"); + call.sendResponse(EDS, testClusterLoadAssignment, VERSION_1, "0000"); + call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE); + verify(edsResourceWatcher).onChanged(any()); + } + @Test public void edsResourceUpdated() { DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE, @@ -3641,31 +3778,29 @@ private BootstrapInfo buildBootStrap(String serverUri) { } private DiscoveryRpcCall startResourceWatcher( - XdsResourceType type, String name, ResourceWatcher watcher) { + XdsResourceType type, String name, ResourceWatcher watcher, Executor executor) { + xdsClient.watchXdsResource(type, name, watcher, executor); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + assertThat(call).isNotNull(); + call.verifyRequest(type, Collections.singletonList(name), "", "", NODE); + FakeClock.TaskFilter timeoutTaskFilter; switch (type.typeName()) { case "LDS": timeoutTaskFilter = LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; - xdsClient.watchXdsResource(type, name, watcher); break; case "RDS": timeoutTaskFilter = RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; - xdsClient.watchXdsResource(type, name, watcher); break; case "CDS": timeoutTaskFilter = CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; - xdsClient.watchXdsResource(type, name, watcher); break; case "EDS": timeoutTaskFilter = EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER; - xdsClient.watchXdsResource(type, name, watcher); break; default: throw new AssertionError("should never be here"); } - - DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); - call.verifyRequest(type, Collections.singletonList(name), "", "", NODE); ScheduledTask timeoutTask = Iterables.getOnlyElement(fakeClock.getPendingTasks(timeoutTaskFilter)); assertThat(timeoutTask.getDelay(TimeUnit.SECONDS)) @@ -3673,6 +3808,11 @@ private DiscoveryRpcCall startResourceWatcher( return call; } + private DiscoveryRpcCall startResourceWatcher( + XdsResourceType type, String name, ResourceWatcher watcher) { + return startResourceWatcher(type, name, watcher, MoreExecutors.directExecutor()); + } + protected abstract static class DiscoveryRpcCall { protected void verifyRequest( @@ -3719,6 +3859,10 @@ protected void sendError(Throwable t) { protected void sendCompleted() { throw new UnsupportedOperationException(); } + + protected boolean isReady() { + throw new UnsupportedOperationException(); + } } protected abstract static class LrsRpcCall { diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplV3Test.java b/xds/src/test/java/io/grpc/xds/XdsClientImplV3Test.java index eba41dc4989..abf8612e60d 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplV3Test.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplV3Test.java @@ -98,6 +98,7 @@ import io.grpc.Context; import io.grpc.Context.CancellationListener; import io.grpc.Status; +import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.Arrays; @@ -247,6 +248,11 @@ protected void sendError(Throwable t) { protected void sendCompleted() { responseObserver.onCompleted(); } + + @Override + protected boolean isReady() { + return ((ServerCallStreamObserver)responseObserver).isReady(); + } } private static class LrsRpcCallV3 extends LrsRpcCall { diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index 885e2d3e121..18b9ceffb66 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -96,6 +96,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -1915,8 +1916,9 @@ BootstrapInfo getBootstrapInfo() { @Override @SuppressWarnings("unchecked") void watchXdsResource(XdsResourceType resourceType, - String resourceName, - ResourceWatcher watcher) { + String resourceName, + ResourceWatcher watcher, + Executor syncContext) { switch (resourceType.typeName()) { case "LDS": @@ -1959,8 +1961,10 @@ void cancelXdsResourceWatch(XdsResourceType type, } void deliverLdsUpdate(long httpMaxStreamDurationNano, List virtualHosts) { - ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( - httpMaxStreamDurationNano, virtualHosts, null))); + syncContext.execute(() -> { + ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( + httpMaxStreamDurationNano, virtualHosts, null))); + }); } void deliverLdsUpdate(final List routes) { @@ -1968,8 +1972,10 @@ void deliverLdsUpdate(final List routes) { VirtualHost.create( "virtual-host", Collections.singletonList(expectedLdsResourceName), routes, ImmutableMap.of()); - ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( - 0L, Collections.singletonList(virtualHost), null))); + syncContext.execute(() -> { + ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( + 0L, Collections.singletonList(virtualHost), null))); + }); } void deliverLdsUpdateWithFaultInjection( @@ -2013,8 +2019,10 @@ void deliverLdsUpdateWithFaultInjection( Collections.singletonList(expectedLdsResourceName), Collections.singletonList(route), overrideConfig); - ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( - 0L, Collections.singletonList(virtualHost), filterChain))); + syncContext.execute(() -> { + ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forVirtualHosts( + 0L, Collections.singletonList(virtualHost), filterChain))); + }); } void deliverLdsUpdateForRdsNameWithFaultInjection( @@ -2026,17 +2034,23 @@ void deliverLdsUpdateForRdsNameWithFaultInjection( ImmutableList filterChain = ImmutableList.of( new NamedFilterConfig(FAULT_FILTER_INSTANCE_NAME, httpFilterFaultConfig), new NamedFilterConfig(ROUTER_FILTER_INSTANCE_NAME, RouterFilter.ROUTER_CONFIG)); - ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forRdsName( - 0L, rdsName, filterChain))); + syncContext.execute(() -> { + ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forRdsName( + 0L, rdsName, filterChain))); + }); } void deliverLdsUpdateForRdsName(String rdsName) { - ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forRdsName( - 0, rdsName, null))); + syncContext.execute(() -> { + ldsWatcher.onChanged(LdsUpdate.forApiListener(HttpConnectionManager.forRdsName( + 0, rdsName, null))); + }); } void deliverLdsResourceNotFound() { - ldsWatcher.onResourceDoesNotExist(expectedLdsResourceName); + syncContext.execute(() -> { + ldsWatcher.onResourceDoesNotExist(expectedLdsResourceName); + }); } void deliverRdsUpdateWithFaultInjection( @@ -2072,29 +2086,39 @@ void deliverRdsUpdateWithFaultInjection( Collections.singletonList(expectedLdsResourceName), Collections.singletonList(route), overrideConfig); - rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost))); + syncContext.execute(() -> { + rdsWatcher.onChanged(new RdsUpdate(Collections.singletonList(virtualHost))); + }); } void deliverRdsUpdate(String resourceName, List virtualHosts) { if (!resourceName.equals(rdsResource)) { return; } - rdsWatcher.onChanged(new RdsUpdate(virtualHosts)); + syncContext.execute(() -> { + rdsWatcher.onChanged(new RdsUpdate(virtualHosts)); + }); } void deliverRdsResourceNotFound(String resourceName) { if (!resourceName.equals(rdsResource)) { return; } - rdsWatcher.onResourceDoesNotExist(rdsResource); + syncContext.execute(() -> { + rdsWatcher.onResourceDoesNotExist(rdsResource); + }); } void deliverError(final Status error) { if (ldsWatcher != null) { - ldsWatcher.onError(error); + syncContext.execute(() -> { + ldsWatcher.onError(error); + }); } if (rdsWatcher != null) { - rdsWatcher.onError(error); + syncContext.execute(() -> { + rdsWatcher.onError(error); + }); } } } diff --git a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java index 256e3f61fec..5e2ab30de30 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import javax.annotation.Nullable; /** @@ -182,7 +183,8 @@ public BootstrapInfo getBootstrapInfo() { @SuppressWarnings("unchecked") void watchXdsResource(XdsResourceType resourceType, String resourceName, - ResourceWatcher watcher) { + ResourceWatcher watcher, + Executor syncContext) { switch (resourceType.typeName()) { case "LDS": assertThat(ldsWatcher).isNull(); diff --git a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java index bd797ce6de9..7bbe6853875 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java @@ -47,6 +47,7 @@ import io.grpc.ServerInterceptor; import io.grpc.Status; import io.grpc.StatusException; +import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; import io.grpc.testing.TestMethodDescriptors; import io.grpc.xds.EnvoyServerProtoData.FilterChain; @@ -152,7 +153,8 @@ public void run() { verify(xdsClient, timeout(5000)).watchXdsResource( eq(listenerResource), eq("grpc/server?udpa.resource.listening_address=[::FFFF:129.144.52.38]:80"), - any(ResourceWatcher.class)); + any(ResourceWatcher.class), + any(SynchronizationContext.class)); } @Test @@ -224,7 +226,8 @@ public void run() { eq(listenerResource), eq("xdstp://xds.authority.com/envoy.config.listener.v3.Listener/grpc/server/" + "%5B::FFFF:129.144.52.38%5D:80"), - any(ResourceWatcher.class)); + any(ResourceWatcher.class), + any(SynchronizationContext.class)); } @Test