diff --git a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java index e1d5f51b7fe..6030b7b2528 100644 --- a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java @@ -53,6 +53,7 @@ import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; /** @@ -88,6 +89,7 @@ final class ControlPlaneClient { private BackoffPolicy retryBackoffPolicy; @Nullable private ScheduledHandle rpcRetryTimer; + private final AtomicInteger pendingPub = new AtomicInteger(); /** An entity that manages ADS RPCs over a single channel. */ // TODO: rename to XdsChannel @@ -132,6 +134,10 @@ void flowControlRequest(int count) { } } + AtomicInteger flowControlWindow() { + return pendingPub; + } + void shutdown() { syncContext.execute(new Runnable() { @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 6a2db899a04..8edcc9692ab 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -111,7 +111,6 @@ public void uncaughtException(Thread t, Throwable e) { private final InternalLogId logId; private final XdsLogger logger; private volatile boolean isShutdown; - private final AtomicInteger pendingPub = new AtomicInteger(); XdsClientImpl( XdsChannelFactory xdsChannelFactory, @@ -341,7 +340,6 @@ public void run() { if (resourceSubscribers.get(type).isEmpty()) { resourceSubscribers.remove(type); subscribedResourceTypeUrls.remove(type.typeUrl()); - } } } @@ -454,7 +452,11 @@ private void handleResourceUpdate(XdsResourceType.Arg long updateTime = timeProvider.currentTimeNanos(); Map> subscribedResources = resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap()); - pendingPub.set(subscribedResources.size()); + if (subscribedResources.size() == 0) { + maybeCompletedHandleResourceUpdate(args.serverInfo); + } else { + serverChannelMap.get(args.serverInfo).flowControlWindow().set(subscribedResources.size()); + } for (Map.Entry> entry : subscribedResources.entrySet()) { String resourceName = entry.getKey(); ResourceSubscriber subscriber = (ResourceSubscriber) entry.getValue(); @@ -467,7 +469,7 @@ private void handleResourceUpdate(XdsResourceType.Arg // Happy path: the resource updated successfully. Notify the watchers of the update. subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime); } else if (!xdsResourceType.isFullStateOfTheWorld()) { - pendingPub.decrementAndGet(); + maybeCompletedHandleResourceUpdate(args.serverInfo); // Nothing else to do for incremental ADS resources. } else if (invalidResources.contains(resourceName)) { // Handle State of the World ADS: invalid resources. @@ -476,7 +478,7 @@ private void handleResourceUpdate(XdsResourceType.Arg // No cached data. Notify the watchers of an invalid update. subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), true); } else { - pendingPub.decrementAndGet(); + maybeCompletedHandleResourceUpdate(args.serverInfo); } } else if (subscriber.serverInfo.equals(args.serverInfo)) { // For State of the World services, notify watchers when their watched resource is missing @@ -484,14 +486,14 @@ private void handleResourceUpdate(XdsResourceType.Arg // the same xDS server that the ResourceSubscriber is subscribed to. subscriber.onAbsent(); } else { - pendingPub.decrementAndGet(); + maybeCompletedHandleResourceUpdate(args.serverInfo); } } - maybeCompletedHandleResourceUpdate(args.serverInfo); } private void maybeCompletedHandleResourceUpdate(ServerInfo serverInfo) { - if (pendingPub.get() == 0) { + if (serverInfo != null + && serverChannelMap.get(serverInfo).flowControlWindow().decrementAndGet() == 0) { serverChannelMap.get(serverInfo).flowControlRequest(1); } } @@ -649,78 +651,92 @@ boolean hasResult() { } void onData(ParsedResource parsedResource, String version, long updateTime) { - if (respTimer != null && respTimer.isPending()) { - respTimer.cancel(); - respTimer = null; - } - this.metadata = ResourceMetadata - .newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime); - ResourceUpdate oldData = this.data; - this.data = parsedResource.getResourceUpdate(); - absent = false; - if (resourceDeletionIgnored) { - logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version " - + "of resource for which we previously ignored a deletion: type {1} name {2}", - serverInfo != null ? serverInfo.target() : "unknown", type, resource); - resourceDeletionIgnored = false; - } - if (!Objects.equals(oldData, data)) { - AtomicInteger fanOutCount = new AtomicInteger(watchers.size()); - for (ResourceWatcher watcher : watchers.keySet()) { - watchers.get(watcher).execute(() -> { - try { - notifyWatcher(watcher, data); - } finally { - if (fanOutCount.decrementAndGet() == 0) { - pendingPub.decrementAndGet(); - maybeCompletedHandleResourceUpdate(serverInfo); + boolean pendingProcess = false; + try { + if (respTimer != null && respTimer.isPending()) { + respTimer.cancel(); + respTimer = null; + } + this.metadata = ResourceMetadata + .newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime); + ResourceUpdate oldData = this.data; + this.data = parsedResource.getResourceUpdate(); + absent = false; + if (resourceDeletionIgnored) { + logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version " + + "of resource for which we previously ignored a deletion: type {1} name {2}", + serverInfo != null ? serverInfo.target() : "unknown", type, resource); + resourceDeletionIgnored = false; + } + if (!Objects.equals(oldData, data)) { + AtomicInteger fanOutCount = new AtomicInteger(watchers.size()); + if (fanOutCount.get() > 0) { + pendingProcess = true; + } + for (ResourceWatcher watcher : watchers.keySet()) { + watchers.get(watcher).execute(() -> { + try { + notifyWatcher(watcher, data); + } finally { + if (fanOutCount.decrementAndGet() == 0) { + maybeCompletedHandleResourceUpdate(serverInfo); + } } - } - }); + }); + } + } + } finally { + if (!pendingProcess) { + maybeCompletedHandleResourceUpdate(serverInfo); } - } else { - pendingPub.decrementAndGet(); } } void onAbsent() { - pendingPub.decrementAndGet(); - if (respTimer != null && respTimer.isPending()) { // too early to conclude absence - return; - } + boolean pendingProcess = false; + try { + if (respTimer != null && respTimer.isPending()) { // too early to conclude absence + return; + } - // Ignore deletion of State of the World resources when this feature is on, - // and the resource is reusable. - boolean ignoreResourceDeletionEnabled = - serverInfo != null && serverInfo.ignoreResourceDeletion(); - if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) { - if (!resourceDeletionIgnored) { - logger.log(XdsLogLevel.FORCE_WARNING, - "xds server {0}: ignoring deletion for resource type {1} name {2}}", - serverInfo.target(), type, resource); - resourceDeletionIgnored = true; + // Ignore deletion of State of the World resources when this feature is on, + // and the resource is reusable. + boolean ignoreResourceDeletionEnabled = + serverInfo != null && serverInfo.ignoreResourceDeletion(); + if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) { + if (!resourceDeletionIgnored) { + logger.log(XdsLogLevel.FORCE_WARNING, + "xds server {0}: ignoring deletion for resource type {1} name {2}}", + serverInfo.target(), type, resource); + resourceDeletionIgnored = true; + } + return; } - return; - } - logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource); - if (!absent) { - pendingPub.incrementAndGet(); - data = null; - absent = true; - metadata = ResourceMetadata.newResourceMetadataDoesNotExist(); - AtomicInteger fanOutCount = new AtomicInteger(watchers.size()); - for (ResourceWatcher watcher : watchers.keySet()) { - watchers.get(watcher).execute(() -> { - try { - watcher.onResourceDoesNotExist(resource); - } finally { - if (fanOutCount.decrementAndGet() == 0) { - pendingPub.decrementAndGet(); - maybeCompletedHandleResourceUpdate(serverInfo); + logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource); + if (!absent) { + data = null; + absent = true; + metadata = ResourceMetadata.newResourceMetadataDoesNotExist(); + AtomicInteger fanOutCount = new AtomicInteger(watchers.size()); + if (fanOutCount.get() > 0) { + pendingProcess = true; + } + for (ResourceWatcher watcher : watchers.keySet()) { + watchers.get(watcher).execute(() -> { + try { + watcher.onResourceDoesNotExist(resource); + } finally { + if (fanOutCount.decrementAndGet() == 0) { + maybeCompletedHandleResourceUpdate(serverInfo); + } } - } - }); + }); + } + } + } finally { + if (!pendingProcess) { + maybeCompletedHandleResourceUpdate(serverInfo); } } } @@ -739,13 +755,15 @@ void onError(Status error, boolean doFlowControl) { .withCause(error.getCause()); AtomicInteger fanOutCount = new AtomicInteger(watchers.size()); + if (fanOutCount.get() == 0 && doFlowControl) { + maybeCompletedHandleResourceUpdate(serverInfo); + } for (ResourceWatcher watcher : watchers.keySet()) { watchers.get(watcher).execute(() -> { try { watcher.onError(errorAugmented); } finally { if (fanOutCount.decrementAndGet() == 0 && doFlowControl) { - pendingPub.decrementAndGet(); maybeCompletedHandleResourceUpdate(serverInfo); } }