Skip to content

Commit

Permalink
xds: XdsClient should unsubscribe on last resource (#11264)
Browse files Browse the repository at this point in the history
Otherwise, the server will continue sending updates and if we
re-subscribe to the last resource, the server won't re-send it. Also
completely remove the per-type state, as it could only add confusion.
  • Loading branch information
lujiajing1126 authored and ejona86 committed Aug 2, 2024
1 parent 752a045 commit 19c9b99
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 6 deletions.
10 changes: 8 additions & 2 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,14 @@ void adjustResourceSubscription(XdsResourceType<?> resourceType) {
startRpcStream();
}
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
if (resources != null) {
adsStream.sendDiscoveryRequest(resourceType, resources);
if (resources == null) {
resources = Collections.emptyList();
}
adsStream.sendDiscoveryRequest(resourceType, resources);
if (resources.isEmpty()) {
// The resource type no longer has subscribing resources; clean up references to it
versions.remove(resourceType);
adsStream.respNonces.remove(resourceType);
}
}

Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T>
@SuppressWarnings("unchecked")
public void run() {
ResourceSubscriber<T> subscriber =
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.cancelResourceWatch();
Expand Down
46 changes: 45 additions & 1 deletion xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

/**
* Tests for {@link XdsClientImpl}.
Expand Down Expand Up @@ -2757,6 +2758,37 @@ public void edsResourceNotFound() {
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
}

@Test
public void edsCleanupNonceAfterUnsubscription() {
Assume.assumeFalse(ignoreResourceDeletion());

// Suppose we have an EDS subscription A.1
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
assertThat(call).isNotNull();
call.verifyRequest(EDS, "A.1", "", "", NODE);

// EDS -> {A.1}, version 1
List<Message> dropOverloads = ImmutableList.of();
List<Message> endpointsV1 = ImmutableList.of(lbEndpointHealthy);
ImmutableMap<String, Any> resourcesV1 = ImmutableMap.of(
"A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads)));
call.sendResponse(EDS, resourcesV1.values().asList(), VERSION_1, "0000");
// {A.1} -> ACK, version 1
call.verifyRequest(EDS, "A.1", VERSION_1, "0000", NODE);
verify(edsResourceWatcher, times(1)).onChanged(any());

// trigger an EDS resource unsubscription.
xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
verifySubscribedResourcesMetadataSizes(0, 0, 0, 0);
call.verifyRequest(EDS, Arrays.asList(), VERSION_1, "0000", NODE);

// When re-subscribing, the version and nonce were properly forgotten, so the request is the
// same as the initial request
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
call.verifyRequest(EDS, "A.1", "", "", NODE, Mockito.timeout(2000).times(2));
}

@Test
public void edsResponseErrorHandling_allResourcesFailedUnpack() {
DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE,
Expand Down Expand Up @@ -3787,10 +3819,22 @@ protected abstract static class DiscoveryRpcCall {

protected void verifyRequest(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
Node node) {
Node node, VerificationMode verificationMode) {
throw new UnsupportedOperationException();
}

protected void verifyRequest(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
Node node) {
verifyRequest(type, resources, versionInfo, nonce, node, Mockito.timeout(2000));
}

protected void verifyRequest(
XdsResourceType<?> type, String resource, String versionInfo, String nonce,
Node node, VerificationMode verificationMode) {
verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node, verificationMode);
}

protected void verifyRequest(
XdsResourceType<?> type, String resource, String versionInfo, String nonce, Node node) {
verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node);
Expand Down
5 changes: 3 additions & 2 deletions xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
* Tests for {@link XdsClientImpl} with protocol version v3.
Expand Down Expand Up @@ -205,8 +206,8 @@ private DiscoveryRpcCallV3(StreamObserver<DiscoveryRequest> requestObserver,
@Override
protected void verifyRequest(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
EnvoyProtoData.Node node) {
verify(requestObserver, Mockito.timeout(2000)).onNext(argThat(new DiscoveryRequestMatcher(
EnvoyProtoData.Node node, VerificationMode verificationMode) {
verify(requestObserver, verificationMode).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, null, null)));
}

Expand Down

0 comments on commit 19c9b99

Please sign in to comment.