diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index 5263975b49d..ad30674cf17 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -37,10 +37,14 @@ import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LongCounterMetricInstrument; +import io.grpc.LongGaugeMetricInstrument; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.MetricInstrumentRegistry; +import io.grpc.MetricRecorder.BatchCallback; +import io.grpc.MetricRecorder.BatchRecorder; +import io.grpc.MetricRecorder.Registration; import io.grpc.Status; import io.grpc.internal.BackoffPolicy; import io.grpc.internal.ExponentialBackoffPolicy; @@ -65,6 +69,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -94,6 +99,10 @@ final class CachingRlsLbClient { private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER; private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER; private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER; + private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE; + private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE; + private final Registration gaugeRegistration; + private final String metricsInstanceUuid = UUID.randomUUID().toString(); // All cache status changes (pending, backoff, success) must be under this lock private final Object lock = new Object(); @@ -138,6 +147,14 @@ final class CachingRlsLbClient { "Number of LB picks failed due to either a failed RLS request or the RLS channel being " + "throttled", "pick", Arrays.asList("grpc.target", "grpc.lb.rls.server_target"), Collections.emptyList(), true); + CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries", + "Number of entries in the RLS cache", "entry", + Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"), + Collections.emptyList(), true); + CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size", + "The current size of the RLS cache", "byte", + Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"), + Collections.emptyList(), true); } private CachingRlsLbClient(Builder builder) { @@ -202,6 +219,26 @@ private CachingRlsLbClient(Builder builder) { lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory, childLbHelperProvider, new BackoffRefreshListener()); + + gaugeRegistration = helper.getMetricRecorder() + .registerBatchCallback(new BatchCallback() { + @Override + public void accept(BatchRecorder recorder) { + int estimatedSize; + long estimatedSizeBytes; + synchronized (lock) { + estimatedSize = linkedHashLruCache.estimatedSize(); + estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes(); + } + recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize, + Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(), + metricsInstanceUuid), Collections.emptyList()); + recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes, + Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(), + metricsInstanceUuid), Collections.emptyList()); + } + }, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE); + logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created"); } @@ -306,6 +343,7 @@ void close() { pendingCallCache.clear(); rlsChannel.shutdownNow(); rlsPicker.close(); + gaugeRegistration.close(); } } diff --git a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java index 9cef95c2a95..7c5df2c96b3 100644 --- a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java +++ b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java @@ -23,12 +23,15 @@ import static io.grpc.rls.CachingRlsLbClient.RLS_DATA_KEY; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; +import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.common.base.Converter; import com.google.common.collect.ImmutableList; @@ -47,10 +50,14 @@ import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; +import io.grpc.LongGaugeMetricInstrument; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.MetricRecorder; +import io.grpc.MetricRecorder.BatchCallback; +import io.grpc.MetricRecorder.BatchRecorder; +import io.grpc.MetricRecorder.Registration; import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; import io.grpc.Status.Code; @@ -95,12 +102,14 @@ import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.AdditionalAnswers; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; +import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; @@ -124,6 +133,13 @@ public class CachingRlsLbClientTest { private SocketAddress socketAddress; @Mock private MetricRecorder mockMetricRecorder; + @Mock + private BatchRecorder mockBatchRecorder; + @Mock + private Registration mockGaugeRegistration; + @Captor + private ArgumentCaptor gaugeBatchCallbackCaptor; + private final SynchronizationContext syncContext = new SynchronizationContext(new UncaughtExceptionHandler() { @@ -145,7 +161,7 @@ public void uncaughtException(Thread t, Throwable e) { private final ChildLoadBalancingPolicy childLbPolicy = new ChildLoadBalancingPolicy("target", Collections.emptyMap(), lbProvider); private final Helper helper = - mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper())); + mock(Helper.class, delegatesTo(new FakeHelper())); private final FakeThrottler fakeThrottler = new FakeThrottler(); private final LbPolicyConfiguration lbPolicyConfiguration = new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, null, childLbPolicy); @@ -168,6 +184,11 @@ private void setUpRlsLbClient() { .build(); } + @Before + public void setUpMockMetricRecorder() { + when(mockMetricRecorder.registerBatchCallback(any(), any())).thenReturn(mockGaugeRegistration); + } + @After public void tearDown() throws Exception { rlsLbClient.close(); @@ -636,6 +657,51 @@ private void setState(ChildPolicyWrapper policyWrapper, ConnectivityState newSta policyWrapper.getHelper().updateBalancingState(newState, policyWrapper.getPicker()); } + @Test + public void metricGauges() throws ExecutionException, InterruptedException, TimeoutException { + setUpRlsLbClient(); + + verify(mockMetricRecorder).registerBatchCallback(gaugeBatchCallbackCaptor.capture(), + any()); + + BatchCallback gaugeBatchCallback = gaugeBatchCallbackCaptor.getValue(); + + // Verify the correct cache gauge values when requested at this point. + InOrder inOrder = inOrder(mockBatchRecorder); + gaugeBatchCallback.accept(mockBatchRecorder); + inOrder.verify(mockBatchRecorder).recordLongGauge( + argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")), eq(0L), + any(), any()); + inOrder.verify(mockBatchRecorder) + .recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")), + eq(0L), any(), any()); + + RouteLookupRequest routeLookupRequest = RouteLookupRequest.create( + ImmutableMap.of("server", "bigtable.googleapis.com", "service-key", "foo", "method-key", + "bar")); + rlsServerImpl.setLookupTable(ImmutableMap.of(routeLookupRequest, + RouteLookupResponse.create(ImmutableList.of("target"), "header"))); + + // Make a request that will populate the cache with an entry + getInSyncContext(routeLookupRequest); + fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + + // Gauge values should reflect the new cache entry. + gaugeBatchCallback.accept(mockBatchRecorder); + inOrder.verify(mockBatchRecorder).recordLongGauge( + argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries")), eq(1L), + any(), any()); + inOrder.verify(mockBatchRecorder) + .recordLongGauge(argThat(new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size")), + eq(260L), any(), any()); + + inOrder.verifyNoMoreInteractions(); + + // Shutdown + rlsLbClient.close(); + verify(mockGaugeRegistration).close(); + } + private static RouteLookupConfig getRouteLookupConfig() { return RouteLookupConfig.builder() .grpcKeybuilders(ImmutableList.of( @@ -667,6 +733,21 @@ public long nextBackoffNanos() { }; } + private static class LongGaugeInstrumentArgumentMatcher implements + ArgumentMatcher { + + private final String instrumentName; + + public LongGaugeInstrumentArgumentMatcher(String instrumentName) { + this.instrumentName = instrumentName; + } + + @Override + public boolean matches(LongGaugeMetricInstrument instrument) { + return instrument.getName().equals(instrumentName); + } + } + private static final class FakeBackoffProvider implements BackoffPolicy.Provider { private BackoffPolicy nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS); diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index 1f46b86db60..ce2926919ba 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -28,7 +28,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import com.google.common.base.Converter; import com.google.common.collect.ImmutableList; @@ -59,6 +59,7 @@ import io.grpc.MethodDescriptor.MethodType; import io.grpc.MetricInstrument; import io.grpc.MetricRecorder; +import io.grpc.MetricRecorder.Registration; import io.grpc.MetricSink; import io.grpc.NameResolver.ConfigOrError; import io.grpc.NoopMetricSink; @@ -128,6 +129,8 @@ public void uncaughtException(Thread t, Throwable e) { }); @Mock private MetricRecorder mockMetricRecorder; + @Mock + private Registration mockGaugeRegistration; private final FakeHelper helperDelegate = new FakeHelper(); private final Helper helper = mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate)); @@ -186,6 +189,8 @@ public CachingRlsLbClient.Builder get() { searchSubchannelArgs = newPickSubchannelArgs(fakeSearchMethod); rescueSubchannelArgs = newPickSubchannelArgs(fakeRescueMethod); + + when(mockMetricRecorder.registerBatchCallback(any(), any())).thenReturn(mockGaugeRegistration); } @After @@ -226,7 +231,6 @@ public void lb_serverStatusCodeConversion() throws Exception { assertThat(serverStatus.getDescription()).contains("RLS server returned: "); assertThat(serverStatus.getDescription()).endsWith("ABORTED: base desc"); assertThat(serverStatus.getDescription()).contains("RLS server conv.test"); - verifyNoMoreInteractions(mockMetricRecorder); } @Test @@ -290,8 +294,6 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail"); - - verifyNoMoreInteractions(mockMetricRecorder); } @Test @@ -351,7 +353,6 @@ public void lb_working_withoutDefaultTarget_noRlsResponse() throws Exception { inOrder.verify(helper).getChannelTarget(); inOrder.verifyNoMoreInteractions(); verifyFailedPicksCounterAdd(1, 1); - verifyNoMoreInteractions(mockMetricRecorder); } @Test @@ -377,7 +378,6 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception { int times = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2; verifyLongCounterAdd("grpc.lb.rls.default_target_picks", times, 1, "defaultTarget", "complete"); - verifyNoMoreInteractions(mockMetricRecorder); Subchannel subchannel = picker.pickSubchannel(searchSubchannelArgs).getSubchannel(); assertThat(subchannelIsReady(subchannel)).isTrue(); @@ -422,8 +422,6 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception { assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(res.getSubchannel()).isNull(); verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail"); - - verifyNoMoreInteractions(mockMetricRecorder); } @Test @@ -499,7 +497,6 @@ public void lb_working_withoutDefaultTarget() throws Exception { inOrder.verify(helper, atLeast(0)).refreshNameResolution(); inOrder.verifyNoMoreInteractions(); verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail"); - verifyNoMoreInteractions(mockMetricRecorder); } @Test @@ -542,7 +539,6 @@ public void lb_nameResolutionFailed() throws Exception { res = failedPicker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod)); assertThat(res.getStatus().isOk()).isFalse(); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); - verifyNoMoreInteractions(mockMetricRecorder); } private PickResult markReadyAndGetPickResult(InOrder inOrder,