Skip to content

Commit

Permalink
add flow control
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Nov 18, 2023
1 parent 86835ae commit 169dacc
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 18 deletions.
111 changes: 108 additions & 3 deletions xds/src/main/java/io/grpc/xds/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import com.google.rpc.Code;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
Expand All @@ -38,7 +39,6 @@
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ResourceStore;
Expand All @@ -54,6 +54,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -127,6 +128,12 @@ Channel channel() {
return channel;
}

void flowControlRequest(int count) {
if (adsStream != null) {
adsStream.request(count);
}
}

void shutdown() {
syncContext.execute(new Runnable() {
@Override
Expand Down Expand Up @@ -271,6 +278,64 @@ XdsResourceType<?> fromTypeUrl(String typeUrl) {
return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
}

void onFanOutDone() {
if (adsStream != null) {
adsStream.onFanOutDone();
}
}

void onFanOut(int count) {
if (adsStream != null) {
adsStream.onResourceFanOut(count);
}
}

SettableFuture<Void> createFlowControlCallBack() {
if (adsStream != null) {
return adsStream.createFlowControlCb();
}
return null;
}

/**
* For xds client flow control.
* For each xds response, increment counter when sending out one notification for a watcher with a
* callback.
* Mark waitComplete when all the watchers are notified (add integer MIN_VALUE). Listener on each
* watcher that calls back upon finish and decrement the counter. When all the watchers finish
* update, reset the counter and request the next response message.
*/
private final class FlowControlCounter {
private final AtomicInteger pendingWatcherCounter = new AtomicInteger(0);
private final Runnable callbackRunnable = new Runnable() {
@Override
public void run() {
if (pendingWatcherCounter.decrementAndGet() == Integer.MIN_VALUE) {
pendingWatcherCounter.getAndSet(0);
flowControlRequest(1);
}
}
};

void onFanOut(int newEventsCount) {
pendingWatcherCounter.getAndAdd(newEventsCount);
}

void onFanOutDone() {
if (pendingWatcherCounter.get() >= 0
&& pendingWatcherCounter.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
pendingWatcherCounter.getAndSet(0);
flowControlRequest(1);
}
}

SettableFuture<Void> callback() {
SettableFuture<Void> callback = SettableFuture.create();
callback.addListener(callbackRunnable, syncContext);
return callback;
}
}

private abstract class AbstractAdsStream {
private boolean responseReceived;
private boolean closed;
Expand All @@ -288,6 +353,22 @@ private abstract class AbstractAdsStream {

abstract boolean isReady();

abstract void request(int count);

/**
* For xDS stream flow control. Sending each watcher notification increment the counter
* {@link #onFanOut(int)}.
* Processing completion on each watcher decrement the counter via the callback
* {@link #createFlowControlCb}. When all the watchers subscribed to the resources in the
* response have been notified {@link #onFanOutDone} and counter reaches zero, ads stream is
* ready to receive the next message.
*/
abstract void onFanOutDone();

abstract void onResourceFanOut(int count);

abstract SettableFuture<Void> createFlowControlCb();

/**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
* {@code errorDetail}. Used for reacting to a specific discovery response. For
Expand Down Expand Up @@ -372,14 +453,16 @@ private void cleanUp() {
}

private final class AdsStreamV3 extends AbstractAdsStream {
private StreamObserver<DiscoveryRequest> requestWriter;
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;
private final FlowControlCounter flowControlCounter = new FlowControlCounter();

@Override
public boolean isReady() {
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
}

@Override
@SuppressWarnings("unchecked")
void start() {
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(channel);
Expand All @@ -389,6 +472,7 @@ final class AdsClientResponseObserver

@Override
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
requestStream.disableAutoRequestWithInitial(1);
requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
}

Expand Down Expand Up @@ -437,7 +521,8 @@ public void run() {
}
}

requestWriter = stub.streamAggregatedResources(new AdsClientResponseObserver());
requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources(
new AdsClientResponseObserver());
}

@Override
Expand Down Expand Up @@ -467,6 +552,26 @@ void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
}
}

@Override
void request(int count) {
requestWriter.request(count);
}

@Override
void onFanOutDone() {
flowControlCounter.onFanOutDone();
}

@Override
void onResourceFanOut(int count) {
flowControlCounter.onFanOut(count);
}

@Override
SettableFuture<Void> createFlowControlCb() {
return flowControlCounter.callback();
}

@Override
void sendError(Exception error) {
requestWriter.onError(error);
Expand Down
85 changes: 70 additions & 15 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public void run() {
subscribedResourceTypeUrls.put(type.typeUrl(), type);
}
ResourceSubscriber<T> subscriber =
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
if (subscriber == null) {
logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName);
subscriber = new ResourceSubscriber<>(type, resourceName);
Expand Down Expand Up @@ -483,6 +483,9 @@ private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Arg
subscriber.onAbsent();
}
}
for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
entry.getValue().onFinishUpdate();
}
}

/**
Expand Down Expand Up @@ -556,14 +559,32 @@ private ServerInfo getServerInfo(String resource) {
void addWatcher(ResourceWatcher<T> watcher) {
checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher);
watchers.add(watcher);
if (errorDescription != null) {
watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
return;
}
if (data != null) {
notifyWatcher(watcher, data);
} else if (absent) {
watcher.onResourceDoesNotExist(resource);
if (xdsChannel != null) {
xdsChannel.onFanOut(1);
}
syncContext.execute(() -> {
try {
if (errorDescription != null) {
watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
return;
}
if (data != null) {
notifyWatcher(watcher, data);
} else if (absent) {
watcher.onResourceDoesNotExist(resource);
}
} finally {
callbackFlowControl();
}
});
}

private void callbackFlowControl() {
if (xdsChannel != null) {
SettableFuture<Void> cb = xdsChannel.createFlowControlCallBack();
if (cb != null) {
cb.set(null);
}
}
}

Expand Down Expand Up @@ -650,9 +671,18 @@ void onData(ParsedResource<T> parsedResource, String version, long updateTime) {
resourceDeletionIgnored = false;
}
if (!Objects.equals(oldData, data)) {
for (ResourceWatcher<T> watcher : watchers) {
notifyWatcher(watcher, data);
if (xdsChannel != null) {
xdsChannel.onFanOut(watchers.size());
}
syncContext.execute(() -> {
for (ResourceWatcher<T> watcher : watchers) {
try {
notifyWatcher(watcher, data);
} finally {
callbackFlowControl();
}
}
});
}
}

Expand Down Expand Up @@ -680,9 +710,18 @@ void onAbsent() {
data = null;
absent = true;
metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
for (ResourceWatcher<T> watcher : watchers) {
watcher.onResourceDoesNotExist(resource);
if (xdsChannel != null) {
xdsChannel.onFanOut(watchers.size());
}
syncContext.execute(() -> {
for (ResourceWatcher<T> watcher : watchers) {
try {
watcher.onResourceDoesNotExist(resource);
} finally {
callbackFlowControl();
}
}
});
}
}

Expand All @@ -699,9 +738,18 @@ void onError(Status error) {
.withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
.withCause(error.getCause());

for (ResourceWatcher<T> watcher : watchers) {
watcher.onError(errorAugmented);
if (xdsChannel != null) {
xdsChannel.onFanOut(watchers.size());
}
syncContext.execute(() -> {
for (ResourceWatcher<T> watcher : watchers) {
try {
watcher.onError(errorAugmented);
} finally {
callbackFlowControl();
}
}
});
}

void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
Expand All @@ -712,6 +760,13 @@ void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetail
private void notifyWatcher(ResourceWatcher<T> watcher, T update) {
watcher.onChanged(update);
}

// For flow control
void onFinishUpdate() {
if (xdsChannel != null) {
xdsChannel.onFanOutDone();
}
}
}

static final class ResourceInvalidException extends Exception {
Expand Down

0 comments on commit 169dacc

Please sign in to comment.