Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xDS: implement ADS stream flow control mechanism #10674

Merged
merged 14 commits into from
Dec 16, 2023
145 changes: 64 additions & 81 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@

private void start() {
shutdown = false;
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this);
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
}

void shutdown() {
Expand All @@ -341,102 +341,85 @@
String.format("Unable to load CDS %s. xDS server returned: %s: %s",
name, error.getCode(), error.getDescription()))
.withCause(error.getCause());
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
// All watchers should receive the same error, so we only propagate it once.
if (ClusterState.this == root) {
handleClusterDiscoveryError(status);
}
}
});
if (shutdown) {
return;

Check warning on line 345 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L345

Added line #L345 was not covered by tests
}
// All watchers should receive the same error, so we only propagate it once.
if (ClusterState.this == root) {
handleClusterDiscoveryError(status);
}
}

@Override
public void onResourceDoesNotExist(String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
discovered = true;
result = null;
if (childClusterStates != null) {
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
}
childClusterStates = null;
}
handleClusterDiscovered();
if (shutdown) {
return;

Check warning on line 356 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L356

Added line #L356 was not covered by tests
}
discovered = true;
result = null;
if (childClusterStates != null) {
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
}
});
childClusterStates = null;
}
handleClusterDiscovered();
}

@Override
public void onChanged(final CdsUpdate update) {
class ClusterDiscovered implements Runnable {
@Override
public void run() {
if (shutdown) {
return;
if (shutdown) {
return;

Check warning on line 372 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L372

Added line #L372 was not covered by tests
}
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
discovered = true;
result = update;
if (update.clusterType() == ClusterType.AGGREGATE) {
isLeaf = false;
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
update.clusterName(), update.prioritizedClusterNames());
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : update.prioritizedClusterNames()) {
if (newChildStates.containsKey(cluster)) {
logger.log(XdsLogLevel.WARNING,
String.format("duplicate cluster name %s in aggregate %s is being ignored",
cluster, update.clusterName()));
continue;
}

logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
discovered = true;
result = update;
if (update.clusterType() == ClusterType.AGGREGATE) {
isLeaf = false;
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
update.clusterName(), update.prioritizedClusterNames());
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : update.prioritizedClusterNames()) {
if (newChildStates.containsKey(cluster)) {
logger.log(XdsLogLevel.WARNING,
String.format("duplicate cluster name %s in aggregate %s is being ignored",
cluster, update.clusterName()));
continue;
}
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState;
if (clusterStates.containsKey(cluster)) {
childState = clusterStates.get(cluster);
if (childState.shutdown) {
childState.start();
}
} else {
childState = new ClusterState(cluster);
clusterStates.put(cluster, childState);
childState.start();
}
newChildStates.put(cluster, childState);
} else {
newChildStates.put(cluster, childClusterStates.remove(cluster));
}
}
if (childClusterStates != null) { // stop subscribing to revoked child clusters
for (ClusterState watcher : childClusterStates.values()) {
watcher.shutdown();
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState;
if (clusterStates.containsKey(cluster)) {
childState = clusterStates.get(cluster);
if (childState.shutdown) {
childState.start();

Check warning on line 394 in xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java#L394

Added line #L394 was not covered by tests
}
} else {
childState = new ClusterState(cluster);
clusterStates.put(cluster, childState);
childState.start();
}
childClusterStates = newChildStates;
} else if (update.clusterType() == ClusterType.EDS) {
isLeaf = true;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
update.clusterName(), update.edsServiceName());
} else { // logical DNS
isLeaf = true;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
newChildStates.put(cluster, childState);
} else {
newChildStates.put(cluster, childClusterStates.remove(cluster));
}
}
if (childClusterStates != null) { // stop subscribing to revoked child clusters
for (ClusterState watcher : childClusterStates.values()) {
watcher.shutdown();
}
handleClusterDiscovered();
}
childClusterStates = newChildStates;
} else if (update.clusterType() == ClusterType.EDS) {
isLeaf = true;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
update.clusterName(), update.edsServiceName());
} else { // logical DNS
isLeaf = true;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
}

syncContext.execute(new ClusterDiscovered());
handleClusterDiscovered();
}

}
}
}
51 changes: 21 additions & 30 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,8 @@
void start() {
String resourceName = edsServiceName != null ? edsServiceName : name;
logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName);
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resourceName, this);
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),
resourceName, this, syncContext);
}

@Override
Expand Down Expand Up @@ -452,7 +453,7 @@
}
}

syncContext.execute(new EndpointsUpdated());
new EndpointsUpdated().run();
}

private List<String> generatePriorityNames(String name,
Expand Down Expand Up @@ -491,38 +492,28 @@

@Override
public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
status = Status.OK;
resolved = true;
result = null; // resource revoked
handleEndpointResourceUpdate();
}
});
if (shutdown) {
return;

Check warning on line 496 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L496

Added line #L496 was not covered by tests
}
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
status = Status.OK;
resolved = true;
result = null; // resource revoked
handleEndpointResourceUpdate();
}

@Override
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
String resourceName = edsServiceName != null ? edsServiceName : name;
status = Status.UNAVAILABLE
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
resourceName, error.getCode(), error.getDescription()))
.withCause(error.getCause());
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
handleEndpointResolutionError();
}
});
if (shutdown) {
return;

Check warning on line 508 in xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java#L508

Added line #L508 was not covered by tests
}
String resourceName = edsServiceName != null ? edsServiceName : name;
status = Status.UNAVAILABLE
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
resourceName, error.getCode(), error.getDescription()))
.withCause(error.getCause());
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
handleEndpointResolutionError();
}
}

Expand Down
27 changes: 24 additions & 3 deletions xds/src/main/java/io/grpc/xds/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ResourceStore;
Expand All @@ -54,6 +53,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -89,6 +89,7 @@ final class ControlPlaneClient {
private BackoffPolicy retryBackoffPolicy;
@Nullable
private ScheduledHandle rpcRetryTimer;
private final AtomicInteger pendingPub = new AtomicInteger();

/** An entity that manages ADS RPCs over a single channel. */
// TODO: rename to XdsChannel
Expand Down Expand Up @@ -127,6 +128,16 @@ Channel channel() {
return channel;
}

void flowControlRequest(int count) {
if (adsStream != null) {
adsStream.request(count);
}
}

AtomicInteger flowControlWindow() {
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
return pendingPub;
}

void shutdown() {
syncContext.execute(new Runnable() {
@Override
Expand Down Expand Up @@ -288,6 +299,8 @@ private abstract class AbstractAdsStream {

abstract boolean isReady();

abstract void request(int count);

/**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
* {@code errorDetail}. Used for reacting to a specific discovery response. For
Expand Down Expand Up @@ -372,14 +385,15 @@ private void cleanUp() {
}

private final class AdsStreamV3 extends AbstractAdsStream {
private StreamObserver<DiscoveryRequest> requestWriter;
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;

@Override
public boolean isReady() {
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
}

@Override
@SuppressWarnings("unchecked")
void start() {
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(channel);
Expand All @@ -389,6 +403,7 @@ final class AdsClientResponseObserver

@Override
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
requestStream.disableAutoRequestWithInitial(1);
requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
}

Expand Down Expand Up @@ -437,7 +452,8 @@ public void run() {
}
}

requestWriter = stub.streamAggregatedResources(new AdsClientResponseObserver());
requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources(
new AdsClientResponseObserver());
}

@Override
Expand Down Expand Up @@ -467,6 +483,11 @@ void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
}
}

@Override
void request(int count) {
requestWriter.request(count);
}

@Override
void sendError(Exception error) {
requestWriter.onError(error);
Expand Down
7 changes: 7 additions & 0 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
Expand Down Expand Up @@ -304,6 +305,12 @@
/**
* Registers a data watcher for the given Xds resource.
*/
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher,
SynchronizationContext syncContext) {
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
throw new UnsupportedOperationException();

Check warning on line 311 in xds/src/main/java/io/grpc/xds/XdsClient.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsClient.java#L311

Added line #L311 was not covered by tests
}

<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher) {
throw new UnsupportedOperationException();
Expand Down
Loading