Skip to content

Commit

Permalink
fix and test
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Dec 8, 2023
1 parent e9f81f6 commit 176afae
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 71 deletions.
6 changes: 6 additions & 0 deletions xds/src/main/java/io/grpc/xds/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -88,6 +89,7 @@ final class ControlPlaneClient {
private BackoffPolicy retryBackoffPolicy;
@Nullable
private ScheduledHandle rpcRetryTimer;
private final AtomicInteger pendingPub = new AtomicInteger();

/** An entity that manages ADS RPCs over a single channel. */
// TODO: rename to XdsChannel
Expand Down Expand Up @@ -132,6 +134,10 @@ void flowControlRequest(int count) {
}
}

AtomicInteger flowControlWindow() {
return pendingPub;
}

void shutdown() {
syncContext.execute(new Runnable() {
@Override
Expand Down
160 changes: 89 additions & 71 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public void uncaughtException(Thread t, Throwable e) {
private final InternalLogId logId;
private final XdsLogger logger;
private volatile boolean isShutdown;
private final AtomicInteger pendingPub = new AtomicInteger();

XdsClientImpl(
XdsChannelFactory xdsChannelFactory,
Expand Down Expand Up @@ -341,7 +340,6 @@ public void run() {
if (resourceSubscribers.get(type).isEmpty()) {
resourceSubscribers.remove(type);
subscribedResourceTypeUrls.remove(type.typeUrl());

}
}
}
Expand Down Expand Up @@ -454,7 +452,11 @@ private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Arg
long updateTime = timeProvider.currentTimeNanos();
Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources =
resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap());
pendingPub.set(subscribedResources.size());
if (subscribedResources.size() == 0) {
maybeCompletedHandleResourceUpdate(args.serverInfo);

Check warning on line 456 in xds/src/main/java/io/grpc/xds/XdsClientImpl.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsClientImpl.java#L456

Added line #L456 was not covered by tests
} else {
serverChannelMap.get(args.serverInfo).flowControlWindow().set(subscribedResources.size());
}
for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
String resourceName = entry.getKey();
ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();
Expand All @@ -467,7 +469,7 @@ private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Arg
// Happy path: the resource updated successfully. Notify the watchers of the update.
subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime);
} else if (!xdsResourceType.isFullStateOfTheWorld()) {
pendingPub.decrementAndGet();
maybeCompletedHandleResourceUpdate(args.serverInfo);
// Nothing else to do for incremental ADS resources.
} else if (invalidResources.contains(resourceName)) {
// Handle State of the World ADS: invalid resources.
Expand All @@ -476,22 +478,22 @@ private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Arg
// No cached data. Notify the watchers of an invalid update.
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), true);
} else {
pendingPub.decrementAndGet();
maybeCompletedHandleResourceUpdate(args.serverInfo);
}
} else if (subscriber.serverInfo.equals(args.serverInfo)) {
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update. Note that we can only do this if the resource update is coming from
// the same xDS server that the ResourceSubscriber is subscribed to.
subscriber.onAbsent();
} else {
pendingPub.decrementAndGet();
maybeCompletedHandleResourceUpdate(args.serverInfo);
}
}
maybeCompletedHandleResourceUpdate(args.serverInfo);
}

private void maybeCompletedHandleResourceUpdate(ServerInfo serverInfo) {
if (pendingPub.get() == 0) {
if (serverInfo != null
&& serverChannelMap.get(serverInfo).flowControlWindow().decrementAndGet() == 0) {
serverChannelMap.get(serverInfo).flowControlRequest(1);
}
}
Expand Down Expand Up @@ -649,78 +651,92 @@ boolean hasResult() {
}

void onData(ParsedResource<T> parsedResource, String version, long updateTime) {
if (respTimer != null && respTimer.isPending()) {
respTimer.cancel();
respTimer = null;
}
this.metadata = ResourceMetadata
.newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
ResourceUpdate oldData = this.data;
this.data = parsedResource.getResourceUpdate();
absent = false;
if (resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
+ "of resource for which we previously ignored a deletion: type {1} name {2}",
serverInfo != null ? serverInfo.target() : "unknown", type, resource);
resourceDeletionIgnored = false;
}
if (!Objects.equals(oldData, data)) {
AtomicInteger fanOutCount = new AtomicInteger(watchers.size());
for (ResourceWatcher<T> watcher : watchers.keySet()) {
watchers.get(watcher).execute(() -> {
try {
notifyWatcher(watcher, data);
} finally {
if (fanOutCount.decrementAndGet() == 0) {
pendingPub.decrementAndGet();
maybeCompletedHandleResourceUpdate(serverInfo);
boolean pendingProcess = false;
try {
if (respTimer != null && respTimer.isPending()) {
respTimer.cancel();
respTimer = null;
}
this.metadata = ResourceMetadata
.newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
ResourceUpdate oldData = this.data;
this.data = parsedResource.getResourceUpdate();
absent = false;
if (resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
+ "of resource for which we previously ignored a deletion: type {1} name {2}",
serverInfo != null ? serverInfo.target() : "unknown", type, resource);
resourceDeletionIgnored = false;
}
if (!Objects.equals(oldData, data)) {
AtomicInteger fanOutCount = new AtomicInteger(watchers.size());
if (fanOutCount.get() > 0) {
pendingProcess = true;
}
for (ResourceWatcher<T> watcher : watchers.keySet()) {
watchers.get(watcher).execute(() -> {
try {
notifyWatcher(watcher, data);
} finally {
if (fanOutCount.decrementAndGet() == 0) {
maybeCompletedHandleResourceUpdate(serverInfo);
}
}
}
});
});
}
}
} finally {
if (!pendingProcess) {
maybeCompletedHandleResourceUpdate(serverInfo);
}
} else {
pendingPub.decrementAndGet();
}
}

void onAbsent() {
pendingPub.decrementAndGet();
if (respTimer != null && respTimer.isPending()) { // too early to conclude absence
return;
}
boolean pendingProcess = false;
try {
if (respTimer != null && respTimer.isPending()) { // too early to conclude absence
return;
}

// Ignore deletion of State of the World resources when this feature is on,
// and the resource is reusable.
boolean ignoreResourceDeletionEnabled =
serverInfo != null && serverInfo.ignoreResourceDeletion();
if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
if (!resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_WARNING,
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
serverInfo.target(), type, resource);
resourceDeletionIgnored = true;
// Ignore deletion of State of the World resources when this feature is on,
// and the resource is reusable.
boolean ignoreResourceDeletionEnabled =
serverInfo != null && serverInfo.ignoreResourceDeletion();
if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
if (!resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_WARNING,
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
serverInfo.target(), type, resource);
resourceDeletionIgnored = true;
}
return;
}
return;
}

logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
if (!absent) {
pendingPub.incrementAndGet();
data = null;
absent = true;
metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
AtomicInteger fanOutCount = new AtomicInteger(watchers.size());
for (ResourceWatcher<T> watcher : watchers.keySet()) {
watchers.get(watcher).execute(() -> {
try {
watcher.onResourceDoesNotExist(resource);
} finally {
if (fanOutCount.decrementAndGet() == 0) {
pendingPub.decrementAndGet();
maybeCompletedHandleResourceUpdate(serverInfo);
logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
if (!absent) {
data = null;
absent = true;
metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
AtomicInteger fanOutCount = new AtomicInteger(watchers.size());
if (fanOutCount.get() > 0) {
pendingProcess = true;
}
for (ResourceWatcher<T> watcher : watchers.keySet()) {
watchers.get(watcher).execute(() -> {
try {
watcher.onResourceDoesNotExist(resource);
} finally {
if (fanOutCount.decrementAndGet() == 0) {
maybeCompletedHandleResourceUpdate(serverInfo);
}
}
}
});
});
}
}
} finally {
if (!pendingProcess) {
maybeCompletedHandleResourceUpdate(serverInfo);
}
}
}
Expand All @@ -739,13 +755,15 @@ void onError(Status error, boolean doFlowControl) {
.withCause(error.getCause());

AtomicInteger fanOutCount = new AtomicInteger(watchers.size());
if (fanOutCount.get() == 0 && doFlowControl) {
maybeCompletedHandleResourceUpdate(serverInfo);

Check warning on line 759 in xds/src/main/java/io/grpc/xds/XdsClientImpl.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/XdsClientImpl.java#L759

Added line #L759 was not covered by tests
}
for (ResourceWatcher<T> watcher : watchers.keySet()) {
watchers.get(watcher).execute(() -> {
try {
watcher.onError(errorAugmented);
} finally {
if (fanOutCount.decrementAndGet() == 0 && doFlowControl) {
pendingPub.decrementAndGet();
maybeCompletedHandleResourceUpdate(serverInfo);
}
}
Expand Down

0 comments on commit 176afae

Please sign in to comment.