From ceba7335fe7010f85caa9b17ee7813139b07875a Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Wed, 6 Dec 2023 10:07:16 -0800 Subject: [PATCH] local update --- .../java/io/grpc/xds/ControlPlaneClient.java | 87 ------------ .../main/java/io/grpc/xds/XdsClientImpl.java | 125 ++++++++---------- 2 files changed, 56 insertions(+), 156 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java index f11ff9530270..e1d5f51b7fe0 100644 --- a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java @@ -53,7 +53,6 @@ import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; /** @@ -277,62 +276,6 @@ XdsResourceType fromTypeUrl(String typeUrl) { return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl); } - void onFanOutDone() { - if (adsStream != null) { - adsStream.onFanOutDone(); - } - } - - void onFanOut(int count) { - if (adsStream != null) { - adsStream.onResourceFanOut(count); - } - } - - Runnable getFlowControlCallback() { - if (adsStream != null) { - return adsStream.getFlowControlCallback(); - } - return null; - } - - /** - * For xds client flow control. - * For each xds response, increment counter when sending out one notification for a watcher with a - * callback. - * Mark waitComplete when all the watchers are notified (add integer MIN_VALUE). Listener on each - * watcher that calls back upon finish and decrement the counter. When all the watchers finish - * update, reset the counter and request the next response message. - */ - private final class FlowControlCounter { - private final AtomicInteger pendingWatcherCounter = new AtomicInteger(0); - private final Runnable callbackRunnable = new Runnable() { - @Override - public void run() { - if (pendingWatcherCounter.decrementAndGet() == Integer.MIN_VALUE) { - pendingWatcherCounter.getAndSet(0); - flowControlRequest(1); - } - } - }; - - void onFanOut(int newEventsCount) { - pendingWatcherCounter.getAndAdd(newEventsCount); - } - - void onFanOutDone() { - if (pendingWatcherCounter.get() >= 0 - && pendingWatcherCounter.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) { - pendingWatcherCounter.getAndSet(0); - flowControlRequest(1); - } - } - - Runnable getCallbackRunnable() { - return callbackRunnable; - } - } - private abstract class AbstractAdsStream { private boolean responseReceived; private boolean closed; @@ -352,20 +295,6 @@ private abstract class AbstractAdsStream { abstract void request(int count); - /** - * For xDS stream flow control. Sending each watcher notification increment the counter - * {@link #onFanOut(int)}. - * Processing completion on each watcher decrement the counter via the callback - * {@link #getFlowControlCallback}. When all the watchers subscribed to the resources in the - * response have been notified {@link #onFanOutDone} and counter reaches zero, ads stream is - * ready to receive the next message. - */ - abstract void onFanOutDone(); - - abstract void onResourceFanOut(int count); - - abstract Runnable getFlowControlCallback(); - /** * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and * {@code errorDetail}. Used for reacting to a specific discovery response. For @@ -451,7 +380,6 @@ private void cleanUp() { private final class AdsStreamV3 extends AbstractAdsStream { private ClientCallStreamObserver requestWriter; - private final FlowControlCounter flowControlCounter = new FlowControlCounter(); @Override public boolean isReady() { @@ -554,21 +482,6 @@ void request(int count) { requestWriter.request(count); } - @Override - void onFanOutDone() { - flowControlCounter.onFanOutDone(); - } - - @Override - void onResourceFanOut(int count) { - flowControlCounter.onFanOut(count); - } - - @Override - Runnable getFlowControlCallback() { - return flowControlCounter.getCallbackRunnable(); - } - @Override void sendError(Exception error) { requestWriter.onError(error); diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 6e7f3326a64d..67f61cc63f99 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -60,6 +60,7 @@ import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -110,6 +111,7 @@ public void uncaughtException(Thread t, Throwable e) { private final InternalLogId logId; private final XdsLogger logger; private volatile boolean isShutdown; + private final AtomicInteger pendingPub = new AtomicInteger(); XdsClientImpl( XdsChannelFactory xdsChannelFactory, @@ -188,7 +190,7 @@ public void handleStreamClosed(Status error) { resourceSubscribers.values()) { for (ResourceSubscriber subscriber : subscriberMap.values()) { if (!subscriber.hasResult()) { - subscriber.onError(error); + subscriber.onError(error, false); } } } @@ -452,45 +454,46 @@ private void handleResourceUpdate(XdsResourceType.Arg long updateTime = timeProvider.currentTimeNanos(); Map> subscribedResources = resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap()); + pendingPub.set(subscribedResources.size()); for (Map.Entry> entry : subscribedResources.entrySet()) { String resourceName = entry.getKey(); ResourceSubscriber subscriber = (ResourceSubscriber) entry.getValue(); - - if (parsedResources.containsKey(resourceName)) { - // Happy path: the resource updated successfully. Notify the watchers of the update. - subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime); - continue; - } - if (invalidResources.contains(resourceName)) { // The resource update is invalid. Capture the error without notifying the watchers. subscriber.onRejected(args.versionInfo, updateTime, errorDetail); } - // Nothing else to do for incremental ADS resources. - if (!xdsResourceType.isFullStateOfTheWorld()) { - continue; - } - - // Handle State of the World ADS: invalid resources. - if (invalidResources.contains(resourceName)) { + if (parsedResources.containsKey(resourceName)) { + // Happy path: the resource updated successfully. Notify the watchers of the update. + subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime); + } else if (!xdsResourceType.isFullStateOfTheWorld()) { + pendingPub.decrementAndGet(); + // Nothing else to do for incremental ADS resources. + } else if (invalidResources.contains(resourceName)) { + // Handle State of the World ADS: invalid resources. // The resource is missing. Reuse the cached resource if possible. if (subscriber.data == null) { // No cached data. Notify the watchers of an invalid update. - subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail)); + subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), true); + } else { + pendingPub.decrementAndGet(); } - continue; - } - - // For State of the World services, notify watchers when their watched resource is missing - // from the ADS update. Note that we can only do this if the resource update is coming from - // the same xDS server that the ResourceSubscriber is subscribed to. - if (subscriber.serverInfo.equals(args.serverInfo)) { + } else if (subscriber.serverInfo.equals(args.serverInfo)) { + // For State of the World services, notify watchers when their watched resource is missing + // from the ADS update. Note that we can only do this if the resource update is coming from + // the same xDS server that the ResourceSubscriber is subscribed to. subscriber.onAbsent(); + } else { + pendingPub.decrementAndGet(); } } - for (Map.Entry> entry : subscribedResources.entrySet()) { - entry.getValue().onFinishUpdate(); + maybeCompletedHandleResourceUpdate(args.serverInfo); + } + + private void maybeCompletedHandleResourceUpdate(ServerInfo serverInfo) { + syncContext.throwIfNotInThisSynchronizationContext(); + if (pendingPub.get() == 0) { + serverChannelMap.get(serverInfo).flowControlRequest(1); } } @@ -565,35 +568,19 @@ private ServerInfo getServerInfo(String resource) { void addWatcher(ResourceWatcher watcher, SynchronizationContext watcherSyncContext) { checkArgument(!watchers.containsKey(watcher), "watcher %s already registered", watcher); watchers.put(watcher, watcherSyncContext); - if (xdsChannel != null) { - xdsChannel.onFanOut(1); - } T savedData = data; boolean savedAbsent = absent; watchers.get(watcher).execute(() -> { - try { - if (errorDescription != null) { - watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription)); - return; - } - if (savedData != null) { - notifyWatcher(watcher, data); - } else if (savedAbsent) { - watcher.onResourceDoesNotExist(resource); - } - } finally { - callbackFlowControl(); + if (errorDescription != null) { + watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription)); + return; } - }); - } - - private void callbackFlowControl() { - if (xdsChannel != null) { - Runnable cb = xdsChannel.getFlowControlCallback(); - if (cb != null) { - cb.run(); + if (savedData != null) { + notifyWatcher(watcher, data); + } else if (savedAbsent) { + watcher.onResourceDoesNotExist(resource); } - } + }); } void removeWatcher(ResourceWatcher watcher) { @@ -679,22 +666,26 @@ void onData(ParsedResource parsedResource, String version, long updateTime) { resourceDeletionIgnored = false; } if (!Objects.equals(oldData, data)) { - if (xdsChannel != null) { - xdsChannel.onFanOut(watchers.size()); - } + AtomicInteger fanOutCount = new AtomicInteger(watchers.size()); for (ResourceWatcher watcher : watchers.keySet()) { watchers.get(watcher).execute(() -> { try { notifyWatcher(watcher, data); } finally { - callbackFlowControl(); + if (fanOutCount.decrementAndGet() == 0) { + pendingPub.decrementAndGet(); + maybeCompletedHandleResourceUpdate(serverInfo); + } } }); } + } else { + pendingPub.decrementAndGet(); } } void onAbsent() { + pendingPub.decrementAndGet(); if (respTimer != null && respTimer.isPending()) { // too early to conclude absence return; } @@ -715,25 +706,27 @@ void onAbsent() { logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource); if (!absent) { + pendingPub.incrementAndGet(); data = null; absent = true; metadata = ResourceMetadata.newResourceMetadataDoesNotExist(); - if (xdsChannel != null) { - xdsChannel.onFanOut(watchers.size()); - } + AtomicInteger fanOutCount = new AtomicInteger(watchers.size()); for (ResourceWatcher watcher : watchers.keySet()) { watchers.get(watcher).execute(() -> { try { watcher.onResourceDoesNotExist(resource); } finally { - callbackFlowControl(); + if (fanOutCount.decrementAndGet() == 0) { + pendingPub.decrementAndGet(); + maybeCompletedHandleResourceUpdate(serverInfo); + } } }); } } } - void onError(Status error) { + void onError(Status error, boolean doFlowControl) { if (respTimer != null && respTimer.isPending()) { respTimer.cancel(); respTimer = null; @@ -746,15 +739,16 @@ void onError(Status error) { .withDescription(description + "nodeID: " + bootstrapInfo.node().getId()) .withCause(error.getCause()); - if (xdsChannel != null) { - xdsChannel.onFanOut(watchers.size()); - } + AtomicInteger fanOutCount = new AtomicInteger(watchers.size()); for (ResourceWatcher watcher : watchers.keySet()) { watchers.get(watcher).execute(() -> { try { watcher.onError(errorAugmented); } finally { - callbackFlowControl(); + if (fanOutCount.decrementAndGet() == 0 && doFlowControl) { + pendingPub.decrementAndGet(); + maybeCompletedHandleResourceUpdate(serverInfo); + } } }); } @@ -768,13 +762,6 @@ void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetail private void notifyWatcher(ResourceWatcher watcher, T update) { watcher.onChanged(update); } - - // For flow control - void onFinishUpdate() { - if (xdsChannel != null) { - xdsChannel.onFanOutDone(); - } - } } static final class ResourceInvalidException extends Exception {