diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index f1bab25d87d..ae9336eb98d 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -716,6 +716,13 @@ public boolean isDrop() { return drop; } + /** + * Returns {@code true} unless subchannel is null and status is OK ({@link #withNoResult()}). + */ + public boolean hasResult() { + return !(subchannel == null && status.isOk()); + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index f3ef4f323f2..54c4c411fe5 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -24,6 +24,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Ticker; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -36,9 +37,11 @@ import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LongCounterMetricInstrument; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; +import io.grpc.MetricInstrumentRegistry; import io.grpc.Status; import io.grpc.internal.BackoffPolicy; import io.grpc.internal.ExponentialBackoffPolicy; @@ -87,6 +90,10 @@ final class CachingRlsLbClient { /** Minimum bytes for a Java Object. 
*/ public static final int OBJ_OVERHEAD_B = 16; + private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER; + private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER; + private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER; + // All cache status changes (pending, backoff, success) must be under this lock private final Object lock = new Object(); // LRU cache based on access order (BACKOFF and actual data will be here) @@ -115,6 +122,23 @@ final class CachingRlsLbClient { private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory; private final ChannelLogger logger; + static { + MetricInstrumentRegistry metricInstrumentRegistry + = MetricInstrumentRegistry.getDefaultRegistry(); + DEFAULT_TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter( + "grpc.lb.rls.default_target_picks", "Number of LB picks sent to the default target", "pick", + Lists.newArrayList("grpc.target", "grpc.lb.rls.server_target", + "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Lists.newArrayList(), true); + TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks", + "Number of LB picks sent to each RLS target", "pick", + Lists.newArrayList("grpc.target", "grpc.lb.rls.server_target", + "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Lists.newArrayList(), true); + FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks", + "Number of LB picks failed due to either a failed RLS request or the RLS channel being " + + "throttled", "pick", Lists.newArrayList("grpc.target", "grpc.lb.rls.server_target"), + Lists.newArrayList(), true); + } + private CachingRlsLbClient(Builder builder) { helper = new RlsLbHelper(checkNotNull(builder.helper, "helper")); scheduledExecutorService = helper.getScheduledExecutorService(); @@ -147,7 +171,7 @@ private CachingRlsLbClient(Builder builder) { } RlsRequestFactory requestFactory = new 
RlsRequestFactory( lbPolicyConfig.getRouteLookupConfig(), serverHost); - rlsPicker = new RlsPicker(requestFactory); + rlsPicker = new RlsPicker(requestFactory, rlsConfig.lookupService()); // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the // RLS server using the same authority as the backends, even though the RLS server’s addresses // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is @@ -904,9 +928,11 @@ public void onStatusChanged(ConnectivityState newState) { final class RlsPicker extends SubchannelPicker { private final RlsRequestFactory requestFactory; + private final String lookupService; - RlsPicker(RlsRequestFactory requestFactory) { + RlsPicker(RlsRequestFactory requestFactory, String lookupService) { this.requestFactory = checkNotNull(requestFactory, "requestFactory"); + this.lookupService = checkNotNull(lookupService, "lookupService"); } @Override @@ -941,7 +967,14 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } // Happy path logger.log(ChannelLogLevel.DEBUG, "Returning PickResult"); - return picker.pickSubchannel(args); + PickResult pickResult = picker.pickSubchannel(args); + // TODO: include the "grpc.target" label once target is available here. + if (pickResult.hasResult()) { + helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1, + Lists.newArrayList("", lookupService, childPolicyWrapper.getTarget(), + determineMetricsPickResult(pickResult)), Lists.newArrayList()); + } + return pickResult; } else if (response.hasError()) { logger.log(ChannelLogLevel.DEBUG, "RLS response has errors"); if (hasFallback) { @@ -949,6 +982,9 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { return useFallback(args); } logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error"); + // TODO: include the "grpc.target" label once target is available here. 
+ helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1, + Lists.newArrayList("", lookupService), Lists.newArrayList()); return PickResult.withError( convertRlsServerStatus(response.getStatus(), lbPolicyConfig.getRouteLookupConfig().lookupService())); @@ -969,7 +1005,24 @@ private PickResult useFallback(PickSubchannelArgs args) { if (picker == null) { return PickResult.withNoResult(); } - return picker.pickSubchannel(args); + PickResult pickResult = picker.pickSubchannel(args); + if (pickResult.hasResult()) { + // TODO: include the grpc.target label once target is available here. + helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1, + Lists.newArrayList("", lookupService, fallbackChildPolicyWrapper.getTarget(), + determineMetricsPickResult(pickResult)), Lists.newArrayList()); + } + return pickResult; + } + + private String determineMetricsPickResult(PickResult pickResult) { + if (pickResult.getStatus().isOk()) { + return "complete"; + } else if (pickResult.isDrop()) { + return "drop"; + } else { + return "fail"; + } } private void startFallbackChildPolicy() { diff --git a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java index 9fc59e7aa33..362e961fb79 100644 --- a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java +++ b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java @@ -50,6 +50,7 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; +import io.grpc.MetricRecorder; import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; import io.grpc.Status.Code; @@ -121,6 +122,8 @@ public class CachingRlsLbClientTest { private EvictionListener evictionListener; @Mock private SocketAddress socketAddress; + @Mock + private MetricRecorder mockMetricRecorder; private final SynchronizationContext syncContext = new SynchronizationContext(new UncaughtExceptionHandler() { @@ -892,6 +895,11 @@ public 
SynchronizationContext getSynchronizationContext() { public ChannelLogger getChannelLogger() { return mock(ChannelLogger.class); } + + @Override + public MetricRecorder getMetricRecorder() { + return mockMetricRecorder; + } } private static final class FakeThrottler implements Throttler { diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index 17f9a3584c5..2137dfeb3f5 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -20,15 +20,19 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.base.Converter; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.ChannelCredentials; @@ -46,14 +50,17 @@ import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelStateListener; +import io.grpc.LongCounterMetricInstrument; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.Marshaller; import io.grpc.MethodDescriptor.MethodType; +import io.grpc.MetricRecorder; import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import 
io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; @@ -86,6 +93,7 @@ import org.junit.runners.JUnit4; import org.mockito.AdditionalAnswers; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; @@ -108,6 +116,8 @@ public void uncaughtException(Thread t, Throwable e) { throw new RuntimeException(e); } }); + @Mock + private MetricRecorder mockMetricRecorder; private final FakeHelper helperDelegate = new FakeHelper(); private final Helper helper = mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate)); @@ -195,6 +205,7 @@ public void lb_serverStatusCodeConversion() throws Exception { subchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); res = picker.pickSubchannel(fakeSearchMethodArgs); assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete"); // Check on conversion Throwable cause = new Throwable("cause"); @@ -205,6 +216,7 @@ public void lb_serverStatusCodeConversion() throws Exception { assertThat(serverStatus.getDescription()).contains("RLS server returned: "); assertThat(serverStatus.getDescription()).endsWith("ABORTED: base desc"); assertThat(serverStatus.getDescription()).contains("RLS server conv.test"); + verifyNoMoreInteractions(mockMetricRecorder); } @Test @@ -226,6 +238,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); inOrder.verifyNoMoreInteractions(); + assertThat(res.getStatus().isOk()).isTrue(); assertThat(subchannels).hasSize(1); FakeSubchannel searchSubchannel = subchannels.getLast(); @@ -238,6 +251,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { res = 
picker.pickSubchannel(searchSubchannelArgs); assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); assertThat(res.getSubchannel()).isSameInstanceAs(searchSubchannel); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete"); // rescue should be pending status although the overall channel state is READY res = picker.pickSubchannel(rescueSubchannelArgs); @@ -262,6 +276,29 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail"); + + verifyNoMoreInteractions(mockMetricRecorder); + } + + @Test + public void lb_working_withoutDefaultTarget_noRlsResponse() throws Exception { + defaultTarget = ""; + fakeThrottler.nextResult = true; + + deliverResolvedAddresses(); + InOrder inOrder = inOrder(helper); + inOrder.verify(helper) + .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + + // With no RLS response and no fallback, we should see a failure + PickResult res = picker.pickSubchannel(searchSubchannelArgs); // create subchannel + assertThat(res.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + inOrder.verify(helper).getMetricRecorder(); + inOrder.verifyNoMoreInteractions(); + verifyFailedPicksCounterAdd(1, 1); + verifyNoMoreInteractions(mockMetricRecorder); } @Test @@ -281,15 +318,20 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception { (FakeSubchannel) markReadyAndGetPickResult(inOrder, searchSubchannelArgs).getSubchannel(); assertThat(fallbackSubchannel).isNotNull(); assertThat(subchannelIsReady(fallbackSubchannel)).isTrue(); + inOrder.verify(helper).getMetricRecorder(); inOrder.verifyNoMoreInteractions(); + 
verifyLongCounterAdd("grpc.lb.rls.default_target_picks", 1, 1, "defaultTarget", "complete"); + verifyNoMoreInteractions(mockMetricRecorder); Subchannel subchannel = picker.pickSubchannel(searchSubchannelArgs).getSubchannel(); assertThat(subchannelIsReady(subchannel)).isTrue(); assertThat(subchannel).isSameInstanceAs(fallbackSubchannel); + verifyLongCounterAdd("grpc.lb.rls.default_target_picks", 2, 1, "defaultTarget", "complete"); subchannel = picker.pickSubchannel(searchSubchannelArgs).getSubchannel(); assertThat(subchannelIsReady(subchannel)).isTrue(); assertThat(subchannel).isSameInstanceAs(fallbackSubchannel); + verifyLongCounterAdd("grpc.lb.rls.default_target_picks", 3, 1, "defaultTarget", "complete"); // Make sure that when RLS starts communicating that default stops being used fakeThrottler.nextResult = false; @@ -300,6 +342,7 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception { (FakeSubchannel) markReadyAndGetPickResult(inOrder, searchSubchannelArgs).getSubchannel(); assertThat(searchSubchannel).isNotNull(); assertThat(searchSubchannel).isNotSameInstanceAs(fallbackSubchannel); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete"); // create rescue subchannel picker.pickSubchannel(rescueSubchannelArgs); @@ -308,6 +351,7 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception { assertThat(rescueSubchannel).isNotNull(); assertThat(rescueSubchannel).isNotSameInstanceAs(fallbackSubchannel); assertThat(rescueSubchannel).isNotSameInstanceAs(searchSubchannel); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "civilization", "complete"); // all channels are failed rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); @@ -318,6 +362,9 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception { searchSubchannelArgs); assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE); 
assertThat(res.getSubchannel()).isNull(); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail"); + + verifyNoMoreInteractions(mockMetricRecorder); } @Test @@ -345,6 +392,7 @@ public void lb_working_withoutDefaultTarget() throws Exception { assertThat(subchannels).hasSize(1); FakeSubchannel searchSubchannel = (FakeSubchannel) markReadyAndGetPickResult(inOrder, searchSubchannelArgs).getSubchannel(); + inOrder.verify(helper).getMetricRecorder(); inOrder.verifyNoMoreInteractions(); assertThat(subchannelIsReady(searchSubchannel)).isTrue(); assertThat(subchannels.getLast()).isSameInstanceAs(searchSubchannel); @@ -373,11 +421,13 @@ public void lb_working_withoutDefaultTarget() throws Exception { res = picker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod)); assertThat(res.getStatus().isOk()).isFalse(); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete"); res = picker.pickSubchannel(newPickSubchannelArgs(fakeRescueMethod)); assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); assertThat(res.getSubchannel().getAddresses()).isEqualTo(rescueSubchannel.getAddresses()); assertThat(res.getSubchannel().getAttributes()).isEqualTo(rescueSubchannel.getAttributes()); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "civilization", "complete"); // all channels are failed rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND)); @@ -385,6 +435,8 @@ public void lb_working_withoutDefaultTarget() throws Exception { .updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); inOrder.verify(helper, atLeast(0)).refreshNameResolution(); inOrder.verifyNoMoreInteractions(); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail"); + verifyNoMoreInteractions(mockMetricRecorder); } @Test @@ -413,7 +465,9 @@ public void lb_nameResolutionFailed() throws Exception { 
assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); assertThat(res.getSubchannel().getAddresses()).isEqualTo(searchSubchannel.getAddresses()); assertThat(res.getSubchannel().getAttributes()).isEqualTo(searchSubchannel.getAttributes()); + verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete"); + inOrder.verify(helper).getMetricRecorder(); inOrder.verifyNoMoreInteractions(); rlsLb.handleNameResolutionError(Status.UNAVAILABLE); @@ -424,6 +478,7 @@ public void lb_nameResolutionFailed() throws Exception { res = failedPicker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod)); assertThat(res.getStatus().isOk()).isFalse(); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); + verifyNoMoreInteractions(mockMetricRecorder); } private PickResult markReadyAndGetPickResult(InOrder inOrder, @@ -492,6 +547,36 @@ private String getRlsConfigJsonStr() { + "}"; } + // Verifies that the MetricRecorder has been called to record the given long counter value for + // the given metric name and labels, the given number of times + private void verifyLongCounterAdd(String name, int times, long value, + String dataPlaneTargetLabel, String pickResult) { + // TODO: support the "grpc.target" label once available. + verify(mockMetricRecorder, times(times)).addLongCounter( + argThat(new ArgumentMatcher<LongCounterMetricInstrument>() { + @Override + public boolean matches(LongCounterMetricInstrument longCounterInstrument) { + return longCounterInstrument.getName().equals(name); + } + }), eq(value), + eq(Lists.newArrayList("", "localhost:8972", dataPlaneTargetLabel, pickResult)), + eq(Lists.newArrayList())); + } + + // This one is for verifying the failed_picks metric specifically. + private void verifyFailedPicksCounterAdd(int times, long value) { + // TODO: support the "grpc.target" label once available. 
+ verify(mockMetricRecorder, times(times)).addLongCounter( + argThat(new ArgumentMatcher<LongCounterMetricInstrument>() { + @Override + public boolean matches(LongCounterMetricInstrument longCounterInstrument) { + return longCounterInstrument.getName().equals("grpc.lb.rls.failed_picks"); + } + }), eq(value), + eq(Lists.newArrayList("", "localhost:8972")), + eq(Lists.newArrayList())); + } + private PickSubchannelArgs newPickSubchannelArgs(MethodDescriptor method) { return new PickSubchannelArgsImpl( method, new Metadata(), CallOptions.DEFAULT, new PickDetailsConsumer() {}); @@ -585,6 +670,11 @@ public SynchronizationContext getSynchronizationContext() { public ChannelLogger getChannelLogger() { return mock(ChannelLogger.class); } + + @Override + public MetricRecorder getMetricRecorder() { + return mockMetricRecorder; + } } private static final class FakeRlsServerImpl