Skip to content

Commit

Permalink
local update
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Dec 7, 2023
1 parent a14e4ca commit e9f81f6
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 156 deletions.
87 changes: 0 additions & 87 deletions xds/src/main/java/io/grpc/xds/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -277,62 +276,6 @@ XdsResourceType<?> fromTypeUrl(String typeUrl) {
return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
}

void onFanOutDone() {
if (adsStream != null) {
adsStream.onFanOutDone();
}
}

void onFanOut(int count) {
if (adsStream != null) {
adsStream.onResourceFanOut(count);
}
}

Runnable getFlowControlCallback() {
if (adsStream != null) {
return adsStream.getFlowControlCallback();
}
return null;
}

/**
* For xds client flow control.
* For each xds response, increment counter when sending out one notification for a watcher with a
* callback.
* Mark waitComplete when all the watchers are notified (add integer MIN_VALUE). Listener on each
* watcher that calls back upon finish and decrement the counter. When all the watchers finish
* update, reset the counter and request the next response message.
*/
private final class FlowControlCounter {
private final AtomicInteger pendingWatcherCounter = new AtomicInteger(0);
private final Runnable callbackRunnable = new Runnable() {
@Override
public void run() {
if (pendingWatcherCounter.decrementAndGet() == Integer.MIN_VALUE) {
pendingWatcherCounter.getAndSet(0);
flowControlRequest(1);
}
}
};

void onFanOut(int newEventsCount) {
pendingWatcherCounter.getAndAdd(newEventsCount);
}

void onFanOutDone() {
if (pendingWatcherCounter.get() >= 0
&& pendingWatcherCounter.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
pendingWatcherCounter.getAndSet(0);
flowControlRequest(1);
}
}

Runnable getCallbackRunnable() {
return callbackRunnable;
}
}

private abstract class AbstractAdsStream {
private boolean responseReceived;
private boolean closed;
Expand All @@ -352,20 +295,6 @@ private abstract class AbstractAdsStream {

abstract void request(int count);

/**
* For xDS stream flow control. Sending each watcher notification increment the counter
* {@link #onFanOut(int)}.
* Processing completion on each watcher decrement the counter via the callback
* {@link #getFlowControlCallback}. When all the watchers subscribed to the resources in the
* response have been notified {@link #onFanOutDone} and counter reaches zero, ads stream is
* ready to receive the next message.
*/
abstract void onFanOutDone();

abstract void onResourceFanOut(int count);

abstract Runnable getFlowControlCallback();

/**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
* {@code errorDetail}. Used for reacting to a specific discovery response. For
Expand Down Expand Up @@ -451,7 +380,6 @@ private void cleanUp() {

private final class AdsStreamV3 extends AbstractAdsStream {
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;
private final FlowControlCounter flowControlCounter = new FlowControlCounter();

@Override
public boolean isReady() {
Expand Down Expand Up @@ -554,21 +482,6 @@ void request(int count) {
requestWriter.request(count);
}

@Override
void onFanOutDone() {
flowControlCounter.onFanOutDone();
}

@Override
void onResourceFanOut(int count) {
flowControlCounter.onFanOut(count);
}

@Override
Runnable getFlowControlCallback() {
return flowControlCounter.getCallbackRunnable();
}

@Override
void sendError(Exception error) {
requestWriter.onError(error);
Expand Down
124 changes: 55 additions & 69 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -110,6 +111,7 @@ public void uncaughtException(Thread t, Throwable e) {
private final InternalLogId logId;
private final XdsLogger logger;
private volatile boolean isShutdown;
private final AtomicInteger pendingPub = new AtomicInteger();

XdsClientImpl(
XdsChannelFactory xdsChannelFactory,
Expand Down Expand Up @@ -188,7 +190,7 @@ public void handleStreamClosed(Status error) {
resourceSubscribers.values()) {
for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
if (!subscriber.hasResult()) {
subscriber.onError(error);
subscriber.onError(error, false);
}
}
}
Expand Down Expand Up @@ -452,45 +454,45 @@ private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Arg
long updateTime = timeProvider.currentTimeNanos();
Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources =
resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap());
pendingPub.set(subscribedResources.size());
for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
String resourceName = entry.getKey();
ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();

if (parsedResources.containsKey(resourceName)) {
// Happy path: the resource updated successfully. Notify the watchers of the update.
subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime);
continue;
}

if (invalidResources.contains(resourceName)) {
// The resource update is invalid. Capture the error without notifying the watchers.
subscriber.onRejected(args.versionInfo, updateTime, errorDetail);
}

// Nothing else to do for incremental ADS resources.
if (!xdsResourceType.isFullStateOfTheWorld()) {
continue;
}

// Handle State of the World ADS: invalid resources.
if (invalidResources.contains(resourceName)) {
if (parsedResources.containsKey(resourceName)) {
// Happy path: the resource updated successfully. Notify the watchers of the update.
subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime);
} else if (!xdsResourceType.isFullStateOfTheWorld()) {
pendingPub.decrementAndGet();
// Nothing else to do for incremental ADS resources.
} else if (invalidResources.contains(resourceName)) {
// Handle State of the World ADS: invalid resources.
// The resource is missing. Reuse the cached resource if possible.
if (subscriber.data == null) {
// No cached data. Notify the watchers of an invalid update.
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), true);
} else {
pendingPub.decrementAndGet();
}
continue;
}

// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update. Note that we can only do this if the resource update is coming from
// the same xDS server that the ResourceSubscriber is subscribed to.
if (subscriber.serverInfo.equals(args.serverInfo)) {
} else if (subscriber.serverInfo.equals(args.serverInfo)) {
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update. Note that we can only do this if the resource update is coming from
// the same xDS server that the ResourceSubscriber is subscribed to.
subscriber.onAbsent();
} else {
pendingPub.decrementAndGet();
}
}
for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
entry.getValue().onFinishUpdate();
maybeCompletedHandleResourceUpdate(args.serverInfo);
}

private void maybeCompletedHandleResourceUpdate(ServerInfo serverInfo) {
if (pendingPub.get() == 0) {
serverChannelMap.get(serverInfo).flowControlRequest(1);
}
}

Expand Down Expand Up @@ -565,35 +567,19 @@ private ServerInfo getServerInfo(String resource) {
void addWatcher(ResourceWatcher<T> watcher, SynchronizationContext watcherSyncContext) {
checkArgument(!watchers.containsKey(watcher), "watcher %s already registered", watcher);
watchers.put(watcher, watcherSyncContext);
if (xdsChannel != null) {
xdsChannel.onFanOut(1);
}
T savedData = data;
boolean savedAbsent = absent;
watchers.get(watcher).execute(() -> {
try {
if (errorDescription != null) {
watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
return;
}
if (savedData != null) {
notifyWatcher(watcher, data);
} else if (savedAbsent) {
watcher.onResourceDoesNotExist(resource);
}
} finally {
callbackFlowControl();
if (errorDescription != null) {
watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
return;
}
});
}

private void callbackFlowControl() {
if (xdsChannel != null) {
Runnable cb = xdsChannel.getFlowControlCallback();
if (cb != null) {
cb.run();
if (savedData != null) {
notifyWatcher(watcher, data);
} else if (savedAbsent) {
watcher.onResourceDoesNotExist(resource);
}
}
});
}

void removeWatcher(ResourceWatcher<T> watcher) {
Expand Down Expand Up @@ -679,22 +665,26 @@ void onData(ParsedResource<T> parsedResource, String version, long updateTime) {
resourceDeletionIgnored = false;
}
if (!Objects.equals(oldData, data)) {
if (xdsChannel != null) {
xdsChannel.onFanOut(watchers.size());
}
AtomicInteger fanOutCount = new AtomicInteger(watchers.size());
for (ResourceWatcher<T> watcher : watchers.keySet()) {
watchers.get(watcher).execute(() -> {
try {
notifyWatcher(watcher, data);
} finally {
callbackFlowControl();
if (fanOutCount.decrementAndGet() == 0) {
pendingPub.decrementAndGet();
maybeCompletedHandleResourceUpdate(serverInfo);
}
}
});
}
} else {
pendingPub.decrementAndGet();
}
}

void onAbsent() {
pendingPub.decrementAndGet();
if (respTimer != null && respTimer.isPending()) { // too early to conclude absence
return;
}
Expand All @@ -715,25 +705,27 @@ void onAbsent() {

logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
if (!absent) {
pendingPub.incrementAndGet();
data = null;
absent = true;
metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
if (xdsChannel != null) {
xdsChannel.onFanOut(watchers.size());
}
AtomicInteger fanOutCount = new AtomicInteger(watchers.size());
for (ResourceWatcher<T> watcher : watchers.keySet()) {
watchers.get(watcher).execute(() -> {
try {
watcher.onResourceDoesNotExist(resource);
} finally {
callbackFlowControl();
if (fanOutCount.decrementAndGet() == 0) {
pendingPub.decrementAndGet();
maybeCompletedHandleResourceUpdate(serverInfo);
}
}
});
}
}
}

void onError(Status error) {
void onError(Status error, boolean doFlowControl) {
if (respTimer != null && respTimer.isPending()) {
respTimer.cancel();
respTimer = null;
Expand All @@ -746,15 +738,16 @@ void onError(Status error) {
.withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
.withCause(error.getCause());

if (xdsChannel != null) {
xdsChannel.onFanOut(watchers.size());
}
AtomicInteger fanOutCount = new AtomicInteger(watchers.size());
for (ResourceWatcher<T> watcher : watchers.keySet()) {
watchers.get(watcher).execute(() -> {
try {
watcher.onError(errorAugmented);
} finally {
callbackFlowControl();
if (fanOutCount.decrementAndGet() == 0 && doFlowControl) {
pendingPub.decrementAndGet();
maybeCompletedHandleResourceUpdate(serverInfo);
}
}
});
}
Expand All @@ -768,13 +761,6 @@ void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetail
private void notifyWatcher(ResourceWatcher<T> watcher, T update) {
watcher.onChanged(update);
}

// For flow control
void onFinishUpdate() {
if (xdsChannel != null) {
xdsChannel.onFanOutDone();
}
}
}

static final class ResourceInvalidException extends Exception {
Expand Down

0 comments on commit e9f81f6

Please sign in to comment.