Skip to content

Commit

Permalink
incomplete
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Nov 16, 2023
1 parent 86835ae commit 6adfd3e
Show file tree
Hide file tree
Showing 16 changed files with 545 additions and 225 deletions.
16 changes: 13 additions & 3 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
Expand Down Expand Up @@ -335,7 +336,7 @@ void shutdown() {
}

@Override
public void onError(Status error) {
public void onError(Status error, SettableFuture<Void> callback) {
Status status = Status.UNAVAILABLE
.withDescription(
String.format("Unable to load CDS %s. xDS server returned: %s: %s",
Expand All @@ -351,12 +352,15 @@ public void run() {
if (ClusterState.this == root) {
handleClusterDiscoveryError(status);
}
if (callback != null) {
callback.set(null);
}
}
});
}

@Override
public void onResourceDoesNotExist(String resourceName) {
public void onResourceDoesNotExist(String resourceName, SettableFuture<Void> callback) {
syncContext.execute(new Runnable() {
@Override
public void run() {
Expand All @@ -372,12 +376,15 @@ public void run() {
childClusterStates = null;
}
handleClusterDiscovered();
if (callback != null) {
callback.set(null);
}
}
});
}

@Override
public void onChanged(final CdsUpdate update) {
public void onChanged(final CdsUpdate update, SettableFuture<Void> callback) {
class ClusterDiscovered implements Runnable {
@Override
public void run() {
Expand Down Expand Up @@ -432,6 +439,9 @@ public void run() {
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
}
handleClusterDiscovered();
if (callback != null) {
callback.set(null);
}
}
}

Expand Down
16 changes: 13 additions & 3 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
Expand Down Expand Up @@ -378,7 +379,7 @@ protected void shutdown() {
}

@Override
public void onChanged(final EdsUpdate update) {
public void onChanged(final EdsUpdate update, SettableFuture<Void> callback) {
class EndpointsUpdated implements Runnable {
@Override
public void run() {
Expand Down Expand Up @@ -449,6 +450,9 @@ public void run() {
result = new ClusterResolutionResult(addresses, priorityChildConfigs,
sortedPriorityNames);
handleEndpointResourceUpdate();
if (callback != null) {
callback.set(null);
}
}
}

Expand Down Expand Up @@ -490,7 +494,7 @@ private List<String> generatePriorityNames(String name,
}

@Override
public void onResourceDoesNotExist(final String resourceName) {
public void onResourceDoesNotExist(final String resourceName, SettableFuture<Void> callback) {
syncContext.execute(new Runnable() {
@Override
public void run() {
Expand All @@ -502,12 +506,15 @@ public void run() {
resolved = true;
result = null; // resource revoked
handleEndpointResourceUpdate();
if (callback != null) {
callback.set(null);
}
}
});
}

@Override
public void onError(final Status error) {
public void onError(final Status error, SettableFuture<Void> callback) {
syncContext.execute(new Runnable() {
@Override
public void run() {
Expand All @@ -521,6 +528,9 @@ public void run() {
.withCause(error.getCause());
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
handleEndpointResolutionError();
if (callback != null) {
callback.set(null);
}
}
});
}
Expand Down
110 changes: 107 additions & 3 deletions xds/src/main/java/io/grpc/xds/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import com.google.rpc.Code;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
Expand All @@ -38,7 +39,6 @@
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ResourceStore;
Expand All @@ -54,6 +54,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -127,6 +128,12 @@ Channel channel() {
return channel;
}

void flowControlRequest(int count) {
if (adsStream != null) {
adsStream.request(count);
}
}

void shutdown() {
syncContext.execute(new Runnable() {
@Override
Expand Down Expand Up @@ -271,6 +278,63 @@ XdsResourceType<?> fromTypeUrl(String typeUrl) {
return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
}

void waitCompletion() {
if (adsStream != null) {
adsStream.waitHandleResponse();
}
}

void onFanOut(int count) {
if (adsStream != null) {
adsStream.onResourceFanOut(count);
}
}

SettableFuture<Void> flowControlCallBack() {
if (adsStream != null) {
return adsStream.flowControlCb();
}
return null;
}

/**
* For xds client flow control.
* For each xds response, increment counter when sending out one notification for a watcher with a
* callback.
* Mark waitComplete when all the watchers are notified (add integer MIN_VALUE). Listener on each
* watcher that calls back upon finish and decrement the counter. When all the watchers finish
* update, reset the counter and request the next response message.
*/
private final class FlowControlCounter {
private final AtomicInteger pendingWatcherCounter = new AtomicInteger(0);
private final Runnable callbackRunnable = new Runnable() {
@Override
public void run() {
if (pendingWatcherCounter.decrementAndGet() == Integer.MIN_VALUE) {
pendingWatcherCounter.getAndSet(0);
flowControlRequest(1);
}
}
};

void add(int newEventsCount) {
pendingWatcherCounter.getAndAdd(newEventsCount);
}

void waitCompletion() {
if (pendingWatcherCounter.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
pendingWatcherCounter.getAndSet(0);
flowControlRequest(1);
}
}

SettableFuture<Void> callback() {
SettableFuture<Void> callback = SettableFuture.create();
callback.addListener(callbackRunnable, syncContext);
return callback;
}
}

private abstract class AbstractAdsStream {
private boolean responseReceived;
private boolean closed;
Expand All @@ -288,6 +352,22 @@ private abstract class AbstractAdsStream {

abstract boolean isReady();

abstract void request(int count);

/**
* For xDS stream flow control. Sending each watcher notification increment the counter
* {@link #onFanOut(int)}.
* Processing completion on each watcher decrement the counter via the callback
* {@link #flowControlCb}. When all the watchers subscribed to the resources in the response
* have been notified {@link #waitHandleResponse} and counter reaches zero, ads stream is ready
* to receive the message.
*/
abstract void waitHandleResponse();

abstract void onResourceFanOut(int count);

abstract SettableFuture<Void> flowControlCb();

/**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
* {@code errorDetail}. Used for reacting to a specific discovery response. For
Expand Down Expand Up @@ -372,14 +452,16 @@ private void cleanUp() {
}

private final class AdsStreamV3 extends AbstractAdsStream {
private StreamObserver<DiscoveryRequest> requestWriter;
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;
private final FlowControlCounter flowControlCounter = new FlowControlCounter();

@Override
public boolean isReady() {
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
}

@Override
@SuppressWarnings("unchecked")
void start() {
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(channel);
Expand All @@ -389,6 +471,7 @@ final class AdsClientResponseObserver

@Override
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
requestStream.disableAutoRequestWithInitial(1);
requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
}

Expand Down Expand Up @@ -437,7 +520,8 @@ public void run() {
}
}

requestWriter = stub.streamAggregatedResources(new AdsClientResponseObserver());
requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources(
new AdsClientResponseObserver());
}

@Override
Expand Down Expand Up @@ -467,6 +551,26 @@ void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
}
}

@Override
void request(int count) {
requestWriter.request(count);
}

@Override
void waitHandleResponse() {
flowControlCounter.waitCompletion();
}

@Override
void onResourceFanOut(int count) {
flowControlCounter.add(count);
}

@Override
SettableFuture<Void> flowControlCb() {
return flowControlCounter.callback();
}

@Override
void sendError(Exception error) {
requestWriter.onError(error);
Expand Down
7 changes: 4 additions & 3 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Splitter;
import com.google.common.net.UrlEscapers;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import io.grpc.Status;
import io.grpc.xds.Bootstrapper.ServerInfo;
Expand Down Expand Up @@ -125,16 +126,16 @@ interface ResourceWatcher<T extends ResourceUpdate> {
* - Keep {@link Status} description in one form or another, as it contains valuable debugging
* information.
*/
void onError(Status error);
void onError(Status error, @Nullable SettableFuture<Void> callback);

/**
* Called when the requested resource is not available.
*
* @param resourceName name of the resource requested in discovery request.
*/
void onResourceDoesNotExist(String resourceName);
void onResourceDoesNotExist(String resourceName, @Nullable SettableFuture<Void> callback);

void onChanged(T update);
void onChanged(T update, @Nullable SettableFuture<Void> callback);
}

/**
Expand Down
Loading

0 comments on commit 6adfd3e

Please sign in to comment.