Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xDS: implement ADS stream flow control mechanism #10674

Merged
merged 14 commits into from
Dec 16, 2023
145 changes: 64 additions & 81 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@

private void start() {
shutdown = false;
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this);
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
}

void shutdown() {
Expand All @@ -341,102 +341,85 @@
String.format("Unable to load CDS %s. xDS server returned: %s: %s",
name, error.getCode(), error.getDescription()))
.withCause(error.getCause());
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
// All watchers should receive the same error, so we only propagate it once.
if (ClusterState.this == root) {
handleClusterDiscoveryError(status);
}
}
});
if (shutdown) {
return;

Check warning on line 345 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L345

Added line #L345 was not covered by tests
}
// All watchers should receive the same error, so we only propagate it once.
if (ClusterState.this == root) {
handleClusterDiscoveryError(status);
}
}

@Override
public void onResourceDoesNotExist(String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
discovered = true;
result = null;
if (childClusterStates != null) {
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
}
childClusterStates = null;
}
handleClusterDiscovered();
if (shutdown) {
return;

Check warning on line 356 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L356

Added line #L356 was not covered by tests
}
discovered = true;
result = null;
if (childClusterStates != null) {
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
}
});
childClusterStates = null;
}
handleClusterDiscovered();
}

@Override
public void onChanged(final CdsUpdate update) {
class ClusterDiscovered implements Runnable {
@Override
public void run() {
if (shutdown) {
return;
if (shutdown) {
return;

Check warning on line 372 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L372

Added line #L372 was not covered by tests
}
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
discovered = true;
result = update;
if (update.clusterType() == ClusterType.AGGREGATE) {
isLeaf = false;
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
update.clusterName(), update.prioritizedClusterNames());
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : update.prioritizedClusterNames()) {
if (newChildStates.containsKey(cluster)) {
logger.log(XdsLogLevel.WARNING,
String.format("duplicate cluster name %s in aggregate %s is being ignored",
cluster, update.clusterName()));
continue;
}

logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
discovered = true;
result = update;
if (update.clusterType() == ClusterType.AGGREGATE) {
isLeaf = false;
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
update.clusterName(), update.prioritizedClusterNames());
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : update.prioritizedClusterNames()) {
if (newChildStates.containsKey(cluster)) {
logger.log(XdsLogLevel.WARNING,
String.format("duplicate cluster name %s in aggregate %s is being ignored",
cluster, update.clusterName()));
continue;
}
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState;
if (clusterStates.containsKey(cluster)) {
childState = clusterStates.get(cluster);
if (childState.shutdown) {
childState.start();
}
} else {
childState = new ClusterState(cluster);
clusterStates.put(cluster, childState);
childState.start();
}
newChildStates.put(cluster, childState);
} else {
newChildStates.put(cluster, childClusterStates.remove(cluster));
}
}
if (childClusterStates != null) { // stop subscribing to revoked child clusters
for (ClusterState watcher : childClusterStates.values()) {
watcher.shutdown();
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState;
if (clusterStates.containsKey(cluster)) {
childState = clusterStates.get(cluster);
if (childState.shutdown) {
childState.start();

Check warning on line 394 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L394

Added line #L394 was not covered by tests
}
} else {
childState = new ClusterState(cluster);
clusterStates.put(cluster, childState);
childState.start();
}
childClusterStates = newChildStates;
} else if (update.clusterType() == ClusterType.EDS) {
isLeaf = true;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
update.clusterName(), update.edsServiceName());
} else { // logical DNS
isLeaf = true;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
newChildStates.put(cluster, childState);
} else {
newChildStates.put(cluster, childClusterStates.remove(cluster));
}
}
if (childClusterStates != null) { // stop subscribing to revoked child clusters
for (ClusterState watcher : childClusterStates.values()) {
watcher.shutdown();
}
handleClusterDiscovered();
}
childClusterStates = newChildStates;
} else if (update.clusterType() == ClusterType.EDS) {
isLeaf = true;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
update.clusterName(), update.edsServiceName());
} else { // logical DNS
isLeaf = true;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
}

syncContext.execute(new ClusterDiscovered());
handleClusterDiscovered();
}

}
}
}
51 changes: 21 additions & 30 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,8 @@
void start() {
String resourceName = edsServiceName != null ? edsServiceName : name;
logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName);
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resourceName, this);
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),
resourceName, this, syncContext);
}

@Override
Expand Down Expand Up @@ -452,7 +453,7 @@
}
}

syncContext.execute(new EndpointsUpdated());
new EndpointsUpdated().run();
}

private List<String> generatePriorityNames(String name,
Expand Down Expand Up @@ -491,38 +492,28 @@

@Override
public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
status = Status.OK;
resolved = true;
result = null; // resource revoked
handleEndpointResourceUpdate();
}
});
if (shutdown) {
return;

Check warning on line 496 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L496

Added line #L496 was not covered by tests
}
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
status = Status.OK;
resolved = true;
result = null; // resource revoked
handleEndpointResourceUpdate();
}

@Override
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
String resourceName = edsServiceName != null ? edsServiceName : name;
status = Status.UNAVAILABLE
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
resourceName, error.getCode(), error.getDescription()))
.withCause(error.getCause());
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
handleEndpointResolutionError();
}
});
if (shutdown) {
return;

Check warning on line 508 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L508

Added line #L508 was not covered by tests
}
String resourceName = edsServiceName != null ? edsServiceName : name;
status = Status.UNAVAILABLE
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
resourceName, error.getCode(), error.getDescription()))
.withCause(error.getCause());
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
handleEndpointResolutionError();
}
}

Expand Down
21 changes: 17 additions & 4 deletions xds/src/main/java/io/grpc/xds/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ProcessingTracker;
import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
Expand Down Expand Up @@ -288,6 +288,8 @@ private abstract class AbstractAdsStream {

abstract boolean isReady();

abstract void request(int count);

/**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
* {@code errorDetail}. Used for reacting to a specific discovery response. For
Expand All @@ -314,7 +316,10 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
}
responseReceived = true;
respNonces.put(type, nonce);
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce);
ProcessingTracker processingTracker = new ProcessingTracker(() -> request(1), syncContext);
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
processingTracker);
processingTracker.onComplete();
}

final void handleRpcError(Throwable t) {
Expand Down Expand Up @@ -372,14 +377,15 @@ private void cleanUp() {
}

private final class AdsStreamV3 extends AbstractAdsStream {
private StreamObserver<DiscoveryRequest> requestWriter;
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;

/**
 * Reports whether the ADS request stream has been created and is currently ready.
 *
 * @return {@code false} while no request stream exists; otherwise the value returned by the
 *     request stream's own {@code isReady()}
 */
@Override
public boolean isReady() {
  // requestWriter is declared as ClientCallStreamObserver, so it can be queried directly
  // without a cast.
  return requestWriter != null && requestWriter.isReady();
}

@Override
@SuppressWarnings("unchecked")
void start() {
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(channel);
Expand All @@ -389,6 +395,7 @@ final class AdsClientResponseObserver

/**
 * Configures the request stream before the call is started: turns off automatic inbound
 * requests (keeping an initial request of one message) and registers the ready handler.
 *
 * @param requestStream the request-side stream of the call being started
 */
@Override
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
// Only the first response is requested up front; every later response has to be asked
// for explicitly through request(int).
requestStream.disableAutoRequestWithInitial(1);
requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
}

Expand Down Expand Up @@ -437,7 +444,8 @@ public void run() {
}
}

requestWriter = stub.streamAggregatedResources(new AdsClientResponseObserver());
requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources(
new AdsClientResponseObserver());
}

@Override
Expand Down Expand Up @@ -467,6 +475,11 @@ void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
}
}

/**
 * Forwards to the request stream's {@code request(int)}, asking for {@code count} more
 * inbound messages.
 *
 * @param count the number of additional messages to request
 */
@Override
void request(int count) {
requestWriter.request(count);
}

@Override
void sendError(Exception error) {
requestWriter.onError(error);
Expand Down
34 changes: 32 additions & 2 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Splitter;
import com.google.common.net.UrlEscapers;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import io.grpc.Status;
import io.grpc.xds.Bootstrapper.ServerInfo;
Expand All @@ -36,6 +37,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -305,10 +308,16 @@ TlsContextManager getTlsContextManager() {
* Registers a data watcher for the given Xds resource.
*/
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher) {
ResourceWatcher<T> watcher,
Executor executor) {
throw new UnsupportedOperationException();
}

/**
 * Registers a data watcher for the given Xds resource without an explicit executor.
 *
 * <p>Delegates to the four-argument overload, passing
 * {@link MoreExecutors#directExecutor()} as the executor.
 */
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher) {
watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor());
}

/**
* Unregisters the given resource watcher.
*/
Expand Down Expand Up @@ -353,11 +362,32 @@ Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
throw new UnsupportedOperationException();
}

/**
 * Counts outstanding pieces of work and hands a listener to an executor once that count
 * reaches zero.
 *
 * <p>The count starts at one, so the listener is only submitted after {@link #onComplete()}
 * has been invoked one more time than {@link #startTask()}.
 */
static final class ProcessingTracker {
  private final Runnable onAllDone;
  private final Executor notifier;
  private final AtomicInteger outstanding = new AtomicInteger(1);

  /**
   * Creates a tracker with a single outstanding task.
   *
   * @param completionListener submitted to {@code executor} when the count reaches zero
   * @param executor runs {@code completionListener}
   */
  ProcessingTracker(Runnable completionListener, Executor executor) {
    this.onAllDone = completionListener;
    this.notifier = executor;
  }

  /** Records one additional outstanding task. */
  void startTask() {
    outstanding.addAndGet(1);
  }

  /**
   * Records that one task has finished; submits the completion listener when this call
   * brings the count to exactly zero.
   */
  void onComplete() {
    int remaining = outstanding.addAndGet(-1);
    if (remaining != 0) {
      return;
    }
    notifier.execute(onAllDone);
  }
}

interface XdsResponseHandler {
/** Called when a xds response is received. */
void handleResourceResponse(
XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo,
List<Any> resources, String nonce);
List<Any> resources, String nonce, ProcessingTracker processingTracker);

/** Called when the ADS stream is closed passively. */
// Must be synchronized.
Expand Down
Loading