From a14e4ca20ff73bf8429cfa3aae495f080155348d Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Mon, 27 Nov 2023 16:38:56 -0800 Subject: [PATCH] fix eric's 1 coment --- .../java/io/grpc/xds/ControlPlaneClient.java | 19 ++++++++----------- .../main/java/io/grpc/xds/XdsClientImpl.java | 10 ++++++---- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java index d0d6feea376..f11ff953027 100644 --- a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java @@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; -import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.Any; import com.google.rpc.Code; import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; @@ -290,9 +289,9 @@ void onFanOut(int count) { } } - SettableFuture createFlowControlCallBack() { + Runnable getFlowControlCallback() { if (adsStream != null) { - return adsStream.createFlowControlCb(); + return adsStream.getFlowControlCallback(); } return null; } @@ -329,10 +328,8 @@ void onFanOutDone() { } } - SettableFuture callback() { - SettableFuture callback = SettableFuture.create(); - callback.addListener(callbackRunnable, syncContext); - return callback; + Runnable getCallbackRunnable() { + return callbackRunnable; } } @@ -359,7 +356,7 @@ private abstract class AbstractAdsStream { * For xDS stream flow control. Sending each watcher notification increment the counter * {@link #onFanOut(int)}. * Processing completion on each watcher decrement the counter via the callback - * {@link #createFlowControlCb}. When all the watchers subscribed to the resources in the + * {@link #getFlowControlCallback}. When all the watchers subscribed to the resources in the * response have been notified {@link #onFanOutDone} and counter reaches zero, ads stream is * ready to receive the next message. */ @@ -367,7 +364,7 @@ private abstract class AbstractAdsStream { abstract void onResourceFanOut(int count); - abstract SettableFuture createFlowControlCb(); + abstract Runnable getFlowControlCallback(); /** * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and @@ -568,8 +565,8 @@ void onResourceFanOut(int count) { } @Override - SettableFuture createFlowControlCb() { - return flowControlCounter.callback(); + Runnable getFlowControlCallback() { + return flowControlCounter.getCallbackRunnable(); } @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 848fd1a564d..6e7f3326a64 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -568,15 +568,17 @@ void addWatcher(ResourceWatcher watcher, SynchronizationContext watcherSyncCo if (xdsChannel != null) { xdsChannel.onFanOut(1); } + T savedData = data; + boolean savedAbsent = absent; watchers.get(watcher).execute(() -> { try { if (errorDescription != null) { watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription)); return; } - if (data != null) { + if (savedData != null) { notifyWatcher(watcher, data); - } else if (absent) { + } else if (savedAbsent) { watcher.onResourceDoesNotExist(resource); } } finally { @@ -587,9 +589,9 @@ void addWatcher(ResourceWatcher watcher, SynchronizationContext watcherSyncCo private void callbackFlowControl() { if (xdsChannel != null) { - SettableFuture cb = xdsChannel.createFlowControlCallBack(); + Runnable cb = xdsChannel.getFlowControlCallback(); if (cb != null) { - cb.set(null); + cb.run(); } } }