Skip to content

Commit

Permalink
fix eric's 1 coment
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Nov 28, 2023
1 parent fc38e2b commit a14e4ca
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 15 deletions.
19 changes: 8 additions & 11 deletions xds/src/main/java/io/grpc/xds/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any;
import com.google.rpc.Code;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
Expand Down Expand Up @@ -290,9 +289,9 @@ void onFanOut(int count) {
}
}

SettableFuture<Void> createFlowControlCallBack() {
Runnable getFlowControlCallback() {
if (adsStream != null) {
return adsStream.createFlowControlCb();
return adsStream.getFlowControlCallback();
}
return null;
}
Expand Down Expand Up @@ -329,10 +328,8 @@ void onFanOutDone() {
}
}

SettableFuture<Void> callback() {
SettableFuture<Void> callback = SettableFuture.create();
callback.addListener(callbackRunnable, syncContext);
return callback;
Runnable getCallbackRunnable() {
return callbackRunnable;
}
}

Expand All @@ -359,15 +356,15 @@ private abstract class AbstractAdsStream {
* For xDS stream flow control. Sending each watcher notification increment the counter
* {@link #onFanOut(int)}.
* Processing completion on each watcher decrement the counter via the callback
* {@link #createFlowControlCb}. When all the watchers subscribed to the resources in the
* {@link #getFlowControlCallback}. When all the watchers subscribed to the resources in the
* response have been notified {@link #onFanOutDone} and counter reaches zero, ads stream is
* ready to receive the next message.
*/
abstract void onFanOutDone();

abstract void onResourceFanOut(int count);

abstract SettableFuture<Void> createFlowControlCb();
abstract Runnable getFlowControlCallback();

/**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
Expand Down Expand Up @@ -568,8 +565,8 @@ void onResourceFanOut(int count) {
}

@Override
SettableFuture<Void> createFlowControlCb() {
return flowControlCounter.callback();
Runnable getFlowControlCallback() {
return flowControlCounter.getCallbackRunnable();
}

@Override
Expand Down
10 changes: 6 additions & 4 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -568,15 +568,17 @@ void addWatcher(ResourceWatcher<T> watcher, SynchronizationContext watcherSyncCo
if (xdsChannel != null) {
xdsChannel.onFanOut(1);
}
T savedData = data;
boolean savedAbsent = absent;
watchers.get(watcher).execute(() -> {
try {
if (errorDescription != null) {
watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
return;
}
if (data != null) {
if (savedData != null) {
notifyWatcher(watcher, data);
} else if (absent) {
} else if (savedAbsent) {
watcher.onResourceDoesNotExist(resource);
}
} finally {
Expand All @@ -587,9 +589,9 @@ void addWatcher(ResourceWatcher<T> watcher, SynchronizationContext watcherSyncCo

private void callbackFlowControl() {
if (xdsChannel != null) {
SettableFuture<Void> cb = xdsChannel.createFlowControlCallBack();
Runnable cb = xdsChannel.getFlowControlCallback();
if (cb != null) {
cb.set(null);
cb.run();
}
}
}
Expand Down

0 comments on commit a14e4ca

Please sign in to comment.