From 6e97b180b4d1f94977ec1b91295c99333462dfa0 Mon Sep 17 00:00:00 2001
From: Eric Anderson
Date: Wed, 3 Apr 2024 12:22:04 -0700
Subject: [PATCH] rls: Synchronization fixes in CachingRlsLbClient

This started with combining handleNewRequest with asyncRlsCall, but that
surfaced pre-existing synchronization issues, and trying to fix those
exposed others. It was hard to split this into smaller commits because
the changes were interconnected.

handleNewRequest was combined with asyncRlsCall to use a single code
flow for handling the completed future while also failing the pick
immediately for throttled requests. That flow was then reused for
refreshing after backoff and after data becomes stale. It no longer
optimizes for the RPC completing immediately, because that does not
happen in real life; it only happens in tests because of
inprocess+directExecutor(), and we don't want tests to exercise a
different code flow than production. This did require updating some of
the tests.

One small behavior change needed to share the combined asyncRlsCall
with the backoff path: we now always invalidate an entry after the
backoff. Previously the code could replace the entry with its new value
in one operation if the asyncRlsCall future completed immediately. That
only mattered to a single test, which now sees an EXPLICIT eviction.

SynchronizationContext used to provide atomic scheduling in
BackoffCacheEntry, but it did not guarantee that scheduledRunnable was
only accessed from the sync context. The same was true for calling up
the LB tree with `updateBalancingState()`. In particular, adding
entries to the cache during a pick could evict entries without running
the cleanup methods within the context, and the same could happen when
the RLS channel transitioned from TRANSIENT_FAILURE to READY. This was
replaced with a bare Future plus a lock to provide atomicity.

BackoffCacheEntry no longer uses the current time and instead waits for
the backoff timer to actually run before considering itself expired.
Previously, it could race with periodic cleanup and get evicted before
the timer ran, which would cancel the timer and forget the
backoffPolicy. Since the backoff timer invalidates the entry, it is
probably useless to claim it ever expires, but that behavior was
preserved since I didn't look into the LRU cache deeply.

propagateRlsError() was moved out of asyncRlsCall because it was not
guaranteed to run after the cache was updated. If something was already
running on the sync context, RPCs would hang until another update
caused updateBalancingState().

Some methods were moved out of the CacheEntry classes to avoid
shared-state mutation in constructors. If we add something in a factory
method, we want to remove it in a sibling of that factory method, so
additional code was moved for symmetry. Moving shared-state mutation
out of constructors is important because 1) it is surprising and 2)
ErrorProne doesn't validate locking within constructors. In general,
having shared-state methods in CacheEntries also has the problem that
ErrorProne can't validate CachingRlsLbClient's calls to CacheEntry:
ErrorProne can't know that "lock" is already held, because the
CacheEntry could have been created by a _different instance_ of
CachingRlsLbClient, and there's no way for us to let ErrorProne prove
it is the same instance of "lock". DataCacheEntry still mutates global
state that requires the lock in its constructor, but that is a less
severe problem and addressing it requires more design choices.
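
To make the Future-plus-lock atomicity concrete, here is a minimal
sketch of the claim-by-cancel idiom the new code relies on. This is not
code from the patch; BackoffSlot, schedule(), refresh(), and cleanup()
are made-up names. Whichever of the scheduled refresh and cleanup()
calls cancel(false) first "claims" the future, so the refresh runs at
most once and never after the entry has been cleaned up.

    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    /** Minimal sketch of the claim-by-cancel idiom; not the grpc implementation. */
    final class BackoffSlot {
      private final Object lock = new Object();
      private Future<?> scheduledFuture;

      void schedule(ScheduledExecutorService scheduler, long delayNanos) {
        synchronized (lock) {
          // The lock is held, so refresh() cannot read scheduledFuture before it is assigned.
          scheduledFuture = scheduler.schedule(this::refresh, delayNanos, TimeUnit.NANOSECONDS);
        }
      }

      void refresh() {
        synchronized (lock) {
          // cancel(false) succeeds at most once. If cleanup() already cancelled the future,
          // this returns false and the refresh is skipped; otherwise the task marks itself
          // cancelled, so the timer (after a forced refresh) or a second forced refresh
          // cannot run the body again.
          if (!scheduledFuture.cancel(false)) {
            return;
          }
          // ... invalidate the cache entry and start a new RLS lookup here ...
        }
      }

      void cleanup() {
        synchronized (lock) {
          // If the timer has not fired yet, this prevents the refresh from ever running.
          scheduledFuture.cancel(false);
        }
      }
    }

In the patch itself this shows up as createBackOffEntry() assigning
entry.scheduledFuture while holding the lock, with refreshBackoffEntry()
and BackoffCacheEntry.cleanup() racing on cancel(false).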
--- .../java/io/grpc/rls/CachingRlsLbClient.java | 319 +++++++----------- rls/src/main/java/io/grpc/rls/Throttler.java | 23 -- .../io/grpc/rls/CachingRlsLbClientTest.java | 2 +- .../java/io/grpc/rls/RlsLoadBalancerTest.java | 17 +- 4 files changed, 134 insertions(+), 227 deletions(-) diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index a82ca770093..91007c392d1 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -24,7 +24,9 @@ import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Ticker; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import io.grpc.ChannelLogger; import io.grpc.ChannelLogger.ChannelLogLevel; @@ -38,8 +40,6 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.Status; -import io.grpc.SynchronizationContext; -import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.lookup.v1.RouteLookupServiceGrpc; @@ -54,7 +54,6 @@ import io.grpc.rls.RlsProtoData.RouteLookupConfig; import io.grpc.rls.RlsProtoData.RouteLookupRequest; import io.grpc.rls.RlsProtoData.RouteLookupResponse; -import io.grpc.rls.Throttler.ThrottledException; import io.grpc.stub.StreamObserver; import io.grpc.util.ForwardingLoadBalancerHelper; import java.net.URI; @@ -62,6 +61,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.annotation.CheckReturnValue; @@ -96,7 +96,6 @@ final class CachingRlsLbClient { @GuardedBy("lock") private final Map pendingCallCache = new HashMap<>(); - private final SynchronizationContext synchronizationContext; private final ScheduledExecutorService scheduledExecutorService; private final Ticker ticker; private final Throttler throttler; @@ -118,7 +117,6 @@ final class CachingRlsLbClient { private CachingRlsLbClient(Builder builder) { helper = new RlsLbHelper(checkNotNull(builder.helper, "helper")); scheduledExecutorService = helper.getScheduledExecutorService(); - synchronizationContext = helper.getSynchronizationContext(); lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig"); RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig(); maxAgeNanos = rlsConfig.maxAgeInNanos(); @@ -129,10 +127,11 @@ private CachingRlsLbClient(Builder builder) { linkedHashLruCache = new RlsAsyncLruCache( rlsConfig.cacheSizeBytes(), - builder.evictionListener, + new AutoCleaningEvictionListener(builder.evictionListener), scheduledExecutorService, ticker, - lock); + lock, + helper); logger = helper.getChannelLogger(); String serverHost = null; try { @@ -193,15 +192,19 @@ static Status convertRlsServerStatus(Status status, String serverName) { serverName, status.getCode(), status.getDescription())); } - @CheckReturnValue - private ListenableFuture asyncRlsCall(RouteLookupRequest request) { + /** Populates async cache entry for new request. 
*/ + @GuardedBy("lock") + private CachedRouteLookupResponse asyncRlsCall( + RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) { logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS"); - final SettableFuture response = SettableFuture.create(); if (throttler.shouldThrottle()) { logger.log(ChannelLogLevel.DEBUG, "Request is throttled"); - response.setException(new ThrottledException()); - return response; + // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting + // on this result + return CachedRouteLookupResponse.backoffEntry(createBackOffEntry( + request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy)); } + final SettableFuture response = SettableFuture.create(); io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request); logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest); rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS) @@ -219,7 +222,6 @@ public void onError(Throwable t) { logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t); response.setException(t); throttler.registerBackendResponse(true); - helper.propagateRlsError(); } @Override @@ -228,7 +230,8 @@ public void onCompleted() { throttler.registerBackendResponse(false); } }); - return response; + return CachedRouteLookupResponse.pendingResponse( + createPendingEntry(request, response, backoffPolicy)); } /** @@ -245,7 +248,11 @@ final CachedRouteLookupResponse get(final RouteLookupRequest request) { cacheEntry = linkedHashLruCache.read(request); if (cacheEntry == null) { logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request"); - return handleNewRequest(request); + PendingCacheEntry pendingEntry = pendingCallCache.get(request); + if (pendingEntry != null) { + return CachedRouteLookupResponse.pendingResponse(pendingEntry); + } + return asyncRlsCall(request, /* backoffPolicy= */ null); } if (cacheEntry instanceof DataCacheEntry) { @@ -276,46 +283,86 @@ void close() { } } - /** - * Populates async cache entry for new request. This is only methods directly modifies the cache, - * any status change is happening via event (async request finished, timed out, etc) in {@link - * PendingCacheEntry}, {@link DataCacheEntry} and {@link BackoffCacheEntry}. 
- */ - private CachedRouteLookupResponse handleNewRequest(RouteLookupRequest request) { + void requestConnection() { + rlsChannel.getState(true); + } + + @GuardedBy("lock") + private PendingCacheEntry createPendingEntry( + RouteLookupRequest request, + ListenableFuture pendingCall, + @Nullable BackoffPolicy backoffPolicy) { + PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy); + // Add the entry to the map before adding the Listener, because the listener removes the + // entry from the map + pendingCallCache.put(request, entry); + // Beware that the listener can run immediately on the current thread + pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor()); + return entry; + } + + private void pendingRpcComplete(PendingCacheEntry entry) { synchronized (lock) { - PendingCacheEntry pendingEntry = pendingCallCache.get(request); - if (pendingEntry != null) { - return CachedRouteLookupResponse.pendingResponse(pendingEntry); + boolean clientClosed = pendingCallCache.remove(entry.request) == null; + if (clientClosed) { + return; } - ListenableFuture asyncCall = asyncRlsCall(request); - if (!asyncCall.isDone()) { - pendingEntry = new PendingCacheEntry(request, asyncCall); - // Add the entry to the map before adding the Listener, because the listener removes the - // entry from the map - pendingCallCache.put(request, pendingEntry); - // Beware that the listener can run immediately on the current thread - asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext); - return CachedRouteLookupResponse.pendingResponse(pendingEntry); - } else { - // async call returned finished future is most likely throttled - try { - RouteLookupResponse response = asyncCall.get(); - DataCacheEntry dataEntry = new DataCacheEntry(request, response); - linkedHashLruCache.cacheAndClean(request, dataEntry); - return CachedRouteLookupResponse.dataEntry(dataEntry); - } catch (Exception e) { - BackoffCacheEntry backoffEntry = - new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get()); - linkedHashLruCache.cacheAndClean(request, backoffEntry); - return CachedRouteLookupResponse.backoffEntry(backoffEntry); - } + try { + createDataEntry(entry.request, Futures.getDone(entry.pendingCall)); + // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to + // reattempt picks when the child LB is done connecting + } catch (Exception e) { + createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy); + // Cache updated. 
updateBalancingState() to reattempt picks + helper.propagateRlsError(); } } } - void requestConnection() { - rlsChannel.getState(true); + @GuardedBy("lock") + private DataCacheEntry createDataEntry( + RouteLookupRequest request, RouteLookupResponse routeLookupResponse) { + logger.log( + ChannelLogLevel.DEBUG, + "Transition to data cache: routeLookupResponse={0}", + routeLookupResponse); + DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse); + // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until + // this cache update because the lock is held + linkedHashLruCache.cacheAndClean(request, entry); + return entry; + } + + @GuardedBy("lock") + private BackoffCacheEntry createBackOffEntry( + RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) { + logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status); + if (backoffPolicy == null) { + backoffPolicy = backoffProvider.get(); + } + long delayNanos = backoffPolicy.nextBackoffNanos(); + BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy); + // Lock is held, so the task can't execute before the assignment + entry.scheduledFuture = scheduledExecutorService.schedule( + () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS); + linkedHashLruCache.cacheAndClean(request, entry); + logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos", + delayNanos); + return entry; + } + + private void refreshBackoffEntry(BackoffCacheEntry entry) { + synchronized (lock) { + // This checks whether the task has been cancelled and prevents a second execution. + if (!entry.scheduledFuture.cancel(false)) { + // Future was previously cancelled + return; + } + logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending"); + linkedHashLruCache.invalidate(entry.request); + asyncRlsCall(entry.request, entry.backoffPolicy); + } } private static final class RlsLbHelper extends ForwardingLoadBalancerHelper { @@ -353,7 +400,8 @@ public void run() { } void triggerPendingRpcProcessing() { - super.updateBalancingState(state, picker); + helper.getSynchronizationContext().execute( + () -> super.updateBalancingState(state, picker)); } } @@ -455,60 +503,19 @@ public String toString() { } /** A pending cache entry when the async RouteLookup RPC is still on the fly. */ - final class PendingCacheEntry { + static final class PendingCacheEntry { private final ListenableFuture pendingCall; private final RouteLookupRequest request; + @Nullable private final BackoffPolicy backoffPolicy; - PendingCacheEntry( - RouteLookupRequest request, ListenableFuture pendingCall) { - this(request, pendingCall, null); - } - PendingCacheEntry( RouteLookupRequest request, ListenableFuture pendingCall, @Nullable BackoffPolicy backoffPolicy) { this.request = checkNotNull(request, "request"); - this.pendingCall = pendingCall; - this.backoffPolicy = backoffPolicy == null ? 
backoffProvider.get() : backoffPolicy; - } - - void handleDoneFuture() { - synchronized (lock) { - pendingCallCache.remove(request); - if (pendingCall.isCancelled()) { - return; - } - - try { - transitionToDataEntry(pendingCall.get()); - } catch (Exception e) { - if (e instanceof ThrottledException) { - transitionToBackOff(Status.RESOURCE_EXHAUSTED.withCause(e)); - } else { - transitionToBackOff(Status.fromThrowable(e)); - } - } - } - } - - private void transitionToDataEntry(RouteLookupResponse routeLookupResponse) { - synchronized (lock) { - logger.log( - ChannelLogLevel.DEBUG, - "Transition to data cache: routeLookupResponse={0}", - routeLookupResponse); - linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, routeLookupResponse)); - } - } - - private void transitionToBackOff(Status status) { - synchronized (lock) { - logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status); - linkedHashLruCache.cacheAndClean(request, - new BackoffCacheEntry(request, status, backoffPolicy)); - } + this.pendingCall = checkNotNull(pendingCall, "pendingCall"); + this.backoffPolicy = backoffPolicy; } @Override @@ -541,10 +548,6 @@ final boolean isExpired() { protected long getMinEvictionTime() { return 0L; } - - protected void triggerPendingRpcProcessing() { - helper.triggerPendingRpcProcessing(); - } } /** Implementation of {@link CacheEntry} contains valid data. */ @@ -584,38 +587,14 @@ final class DataCacheEntry extends CacheEntry { * */ void maybeRefresh() { - logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to maybe refresh cache entry"); - synchronized (lock) { - logger.log(ChannelLogLevel.DEBUG, "Lock to maybe refresh cache entry acquired"); + synchronized (lock) { // Lock is already held, but ErrorProne can't tell if (pendingCallCache.containsKey(request)) { // pending already requested logger.log(ChannelLogLevel.DEBUG, "A pending refresh request already created, no need to proceed with refresh"); return; } - final ListenableFuture asyncCall = asyncRlsCall(request); - if (!asyncCall.isDone()) { - logger.log(ChannelLogLevel.DEBUG, - "Async call to rls not yet complete, adding a pending cache entry"); - PendingCacheEntry pendingEntry = new PendingCacheEntry(request, asyncCall); - pendingCallCache.put(request, pendingEntry); - asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext); - } else { - // async call returned finished future is most likely throttled - try { - logger.log(ChannelLogLevel.DEBUG, "Waiting for RLS call to return"); - RouteLookupResponse response = asyncCall.get(); - logger.log(ChannelLogLevel.DEBUG, "RLS call to returned"); - linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - logger.log(ChannelLogLevel.DEBUG, "RLS call failed, adding a backoff entry", e); - BackoffCacheEntry backoffEntry = - new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get()); - linkedHashLruCache.cacheAndClean(request, backoffEntry); - } - } + asyncRlsCall(request, /* backoffPolicy= */ null); } } @@ -701,75 +680,13 @@ public String toString() { private final class BackoffCacheEntry extends CacheEntry { private final Status status; - private final ScheduledHandle scheduledHandle; private final BackoffPolicy backoffPolicy; - private final long expireNanos; - private boolean shutdown = false; + private Future scheduledFuture; BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy 
backoffPolicy) { super(request); this.status = checkNotNull(status, "status"); this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy"); - long delayNanos = backoffPolicy.nextBackoffNanos(); - this.expireNanos = ticker.read() + delayNanos; - this.scheduledHandle = - synchronizationContext.schedule( - new Runnable() { - @Override - public void run() { - transitionToPending(); - } - }, - delayNanos, - TimeUnit.NANOSECONDS, - scheduledExecutorService); - logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos", - delayNanos); - } - - /** Forcefully refreshes cache entry by ignoring the backoff timer. */ - void forceRefresh() { - logger.log(ChannelLogLevel.DEBUG, "Forcefully refreshing cache entry"); - if (scheduledHandle.isPending()) { - scheduledHandle.cancel(); - transitionToPending(); - } - } - - private void transitionToPending() { - logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to transition to pending"); - synchronized (lock) { - logger.log(ChannelLogLevel.DEBUG, "Acquired lock to transition to pending"); - if (shutdown) { - logger.log(ChannelLogLevel.DEBUG, "Already shut down, not transitioning to pending"); - return; - } - logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending"); - ListenableFuture call = asyncRlsCall(request); - if (!call.isDone()) { - logger.log(ChannelLogLevel.DEBUG, - "Transition to pending RLS call not done, adding a pending cache entry"); - linkedHashLruCache.invalidate(request); - PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy); - pendingCallCache.put(request, pendingEntry); - call.addListener(pendingEntry::handleDoneFuture, synchronizationContext); - } else { - try { - logger.log(ChannelLogLevel.DEBUG, - "Waiting for transition to pending RLS call response"); - RouteLookupResponse response = call.get(); - linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - logger.log(ChannelLogLevel.DEBUG, - "Transition to pending RLS call failed, creating a backoff entry", e); - linkedHashLruCache.cacheAndClean( - request, - new BackoffCacheEntry(request, Status.fromThrowable(e), backoffPolicy)); - } - } - } } Status getStatus() { @@ -783,18 +700,12 @@ int getSizeBytes() { @Override boolean isExpired(long now) { - return expireNanos - now <= 0; + return scheduledFuture.isDone(); } @Override void cleanup() { - if (shutdown) { - return; - } - shutdown = true; - if (!scheduledHandle.isPending()) { - scheduledHandle.cancel(); - } + scheduledFuture.cancel(false); } @Override @@ -911,18 +822,20 @@ public void registerBackendResponse(boolean throttled) { /** Implementation of {@link LinkedHashLruCache} for RLS. 
*/ private static final class RlsAsyncLruCache extends LinkedHashLruCache { + private final RlsLbHelper helper; RlsAsyncLruCache(long maxEstimatedSizeBytes, @Nullable EvictionListener evictionListener, - ScheduledExecutorService ses, Ticker ticker, Object lock) { + ScheduledExecutorService ses, Ticker ticker, Object lock, RlsLbHelper helper) { super( maxEstimatedSizeBytes, - new AutoCleaningEvictionListener(evictionListener), + evictionListener, 1, TimeUnit.MINUTES, ses, ticker, lock); + this.helper = checkNotNull(helper, "helper"); } @Override @@ -951,7 +864,7 @@ public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) { // force cleanup if new entry pushed cache over max size (in bytes) if (fitToLimit()) { - value.triggerPendingRpcProcessing(); + helper.triggerPendingRpcProcessing(); } return newEntry; } @@ -977,7 +890,7 @@ public void onStatusChanged(ConnectivityState newState) { logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries"); for (CacheEntry value : linkedHashLruCache.values()) { if (value instanceof BackoffCacheEntry) { - ((BackoffCacheEntry) value).forceRefresh(); + refreshBackoffEntry((BackoffCacheEntry) value); } } } @@ -1077,9 +990,11 @@ private void startFallbackChildPolicy() { // GuardedBy CachingRlsLbClient.lock void close() { - logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker"); - if (fallbackChildPolicyWrapper != null) { - refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper); + synchronized (lock) { // Lock is already held, but ErrorProne can't tell + logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker"); + if (fallbackChildPolicyWrapper != null) { + refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper); + } } } diff --git a/rls/src/main/java/io/grpc/rls/Throttler.java b/rls/src/main/java/io/grpc/rls/Throttler.java index 08f54c2e1b3..96d17e70adf 100644 --- a/rls/src/main/java/io/grpc/rls/Throttler.java +++ b/rls/src/main/java/io/grpc/rls/Throttler.java @@ -42,27 +42,4 @@ interface Throttler { * @param throttled specifies whether the request was throttled by the backend. */ void registerBackendResponse(boolean throttled); - - /** - * A ThrottledException indicates the call is throttled. This exception is meant to be used by - * caller of {@link Throttler}, the implementation of Throttler should not throw - * this exception when {@link #shouldThrottle()} is called. 
- */ - final class ThrottledException extends RuntimeException { - - static final long serialVersionUID = 1L; - - public ThrottledException() { - super(); - } - - public ThrottledException(String s) { - super(s); - } - - @Override - public synchronized Throwable fillInStackTrace() { - return this; - } - } } diff --git a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java index 61cf4023779..8dd99bff320 100644 --- a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java +++ b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java @@ -301,7 +301,7 @@ public void get_throttledAndRecover() throws Exception { fakeClock.forwardTime(10, TimeUnit.MILLISECONDS); // initially backed off entry is backed off again verify(evictionListener) - .onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.REPLACED)); + .onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.EXPLICIT)); resp = getInSyncContext(routeLookupRequest); diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index 78f4ee28a52..936539af6bb 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -186,13 +186,18 @@ public void lb_serverStatusCodeConversion() throws Exception { Metadata headers = new Metadata(); PickSubchannelArgsImpl fakeSearchMethodArgs = new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT); + // Warm-up pick; will be queued + PickResult res = picker.pickSubchannel(fakeSearchMethodArgs); + assertThat(res.getStatus().isOk()).isTrue(); + assertThat(res.getSubchannel()).isNull(); + // Cache is warm, but still unconnected picker.pickSubchannel(fakeSearchMethodArgs); // Will create the subchannel FakeSubchannel subchannel = subchannels.peek(); assertThat(subchannel).isNotNull(); // Ensure happy path is unaffected subchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); - PickResult res = picker.pickSubchannel(fakeSearchMethodArgs); + res = picker.pickSubchannel(fakeSearchMethodArgs); assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK); // Check on conversion @@ -213,7 +218,12 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); + // Warm-up pick; will be queued PickResult res = picker.pickSubchannel(searchSubchannelArgs); + assertThat(res.getStatus().isOk()).isTrue(); + assertThat(res.getSubchannel()).isNull(); + // Cache is warm, but still unconnected + res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); inOrder.verify(helper, atLeast(0)) .updateBalancingState(eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); @@ -323,7 +333,12 @@ public void lb_working_withoutDefaultTarget() throws Exception { .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); Metadata headers = new Metadata(); + // Warm-up pick; will be queued PickResult res = picker.pickSubchannel(searchSubchannelArgs); + assertThat(res.getStatus().isOk()).isTrue(); + assertThat(res.getSubchannel()).isNull(); + // Cache is warm, but still unconnected + res = picker.pickSubchannel(searchSubchannelArgs); 
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); inOrder.verify(helper, atLeast(0)) .updateBalancingState(eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class));