Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rls: Add gauge metric recording #11175

Merged
merged 4 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MetricInstrumentRegistry;
import io.grpc.MetricRecorder.BatchCallback;
import io.grpc.MetricRecorder.BatchRecorder;
import io.grpc.MetricRecorder.Registration;
import io.grpc.Status;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ExponentialBackoffPolicy;
Expand All @@ -65,6 +69,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -94,6 +99,10 @@ final class CachingRlsLbClient {
private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
private final Registration gaugeRegistration;
private final String metricsInstanceUuid = UUID.randomUUID().toString();

// All cache status changes (pending, backoff, success) must be under this lock
private final Object lock = new Object();
Expand Down Expand Up @@ -138,6 +147,14 @@ final class CachingRlsLbClient {
"Number of LB picks failed due to either a failed RLS request or the RLS channel being "
+ "throttled", "pick", Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
Collections.emptyList(), true);
CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
"Number of entries in the RLS cache", "entry",
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"),
Collections.emptyList(), true);
CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size",
"The current size of the RLS cache", "byte",
Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"),
Collections.emptyList(), true);
}

private CachingRlsLbClient(Builder builder) {
Expand Down Expand Up @@ -202,6 +219,26 @@ private CachingRlsLbClient(Builder builder) {
lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
childLbHelperProvider,
new BackoffRefreshListener());

gaugeRegistration = helper.getMetricRecorder()
.registerBatchCallback(new BatchCallback() {
@Override
public void accept(BatchRecorder recorder) {
  // Read both cache statistics inside a single critical section on the cache lock so the
  // two gauges describe the same cache state; recording happens after the lock is released.
  final int entryCount;
  final long byteCount;
  synchronized (lock) {
    entryCount = linkedHashLruCache.estimatedSize();
    byteCount = linkedHashLruCache.estimatedSizeBytes();
  }

  // Required label values follow the order used when the gauges were registered:
  // grpc.target, grpc.lb.rls.server_target, grpc.lb.rls.instance_id.
  List<String> entryLabels = Arrays.asList(
      helper.getChannelTarget(), rlsConfig.lookupService(), metricsInstanceUuid);
  recorder.recordLongGauge(
      CACHE_ENTRIES_GAUGE, entryCount, entryLabels, Collections.emptyList());

  // The label values are looked up again for the second gauge rather than shared.
  List<String> sizeLabels = Arrays.asList(
      helper.getChannelTarget(), rlsConfig.lookupService(), metricsInstanceUuid);
  recorder.recordLongGauge(
      CACHE_SIZE_GAUGE, byteCount, sizeLabels, Collections.emptyList());
}
}, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);

logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
}

Expand Down Expand Up @@ -306,6 +343,7 @@ void close() {
pendingCallCache.clear();
rlsChannel.shutdownNow();
rlsPicker.close();
gaugeRegistration.close();
}
}

Expand Down
88 changes: 86 additions & 2 deletions rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import static io.grpc.rls.CachingRlsLbClient.RLS_DATA_KEY;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.base.Converter;
import com.google.common.collect.ImmutableList;
Expand All @@ -47,10 +51,14 @@
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MetricRecorder;
import io.grpc.MetricRecorder.BatchCallback;
import io.grpc.MetricRecorder.BatchRecorder;
import io.grpc.MetricRecorder.Registration;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.Status.Code;
Expand Down Expand Up @@ -95,12 +103,14 @@
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
Expand All @@ -124,6 +134,13 @@ public class CachingRlsLbClientTest {
private SocketAddress socketAddress;
@Mock
private MetricRecorder mockMetricRecorder;
@Mock
private BatchRecorder mockBatchRecorder;
@Mock
private Registration mockGaugeRegistration;
@Captor
private ArgumentCaptor<BatchCallback> gaugeBatchCallbackCaptor;


private final SynchronizationContext syncContext =
new SynchronizationContext(new UncaughtExceptionHandler() {
Expand All @@ -145,7 +162,7 @@ public void uncaughtException(Thread t, Throwable e) {
private final ChildLoadBalancingPolicy childLbPolicy =
new ChildLoadBalancingPolicy("target", Collections.<String, Object>emptyMap(), lbProvider);
private final Helper helper =
mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper()));
mock(Helper.class, delegatesTo(new FakeHelper()));
private final FakeThrottler fakeThrottler = new FakeThrottler();
private final LbPolicyConfiguration lbPolicyConfiguration =
new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, null, childLbPolicy);
Expand All @@ -168,6 +185,11 @@ private void setUpRlsLbClient() {
.build();
}

/**
 * Stubs {@code registerBatchCallback} on the mock metric recorder so that it returns
 * {@code mockGaugeRegistration} instead of the unstubbed mock's default return value.
 *
 * <p>NOTE(review): presumably required because the client under test stores the returned
 * registration and calls {@code close()} on it when the client is closed (which
 * {@code tearDown} triggers) -- confirm that the helper used here exposes this recorder.
 */
@Before
public void setUpMockMetricRecorder() {
// Both arguments (the callback and the instruments it reports) are matched loosely.
when(mockMetricRecorder.registerBatchCallback(any(), any())).thenReturn(mockGaugeRegistration);
}

@After
public void tearDown() throws Exception {
rlsLbClient.close();
Expand Down Expand Up @@ -636,6 +658,53 @@ private void setState(ChildPolicyWrapper policyWrapper, ConnectivityState newSta
policyWrapper.getHelper().updateBalancingState(newState, policyWrapper.getPicker());
}

/**
 * Drives the registered gauge callback by hand and checks the values it records: an empty
 * cache first, then a single cached entry, and finally that closing the client closes the
 * gauge registration.
 */
@Test
public void metricGauges() throws ExecutionException, InterruptedException, TimeoutException {
  // Instruments are identified by name only; the matcher objects are reused for each check.
  ArgumentMatcher<LongGaugeMetricInstrument> isEntriesGauge =
      new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_entries");
  ArgumentMatcher<LongGaugeMetricInstrument> isSizeGauge =
      new LongGaugeInstrumentArgumentMatcher("grpc.lb.rls.cache_size");

  setUpRlsLbClient();

  // Creating the client must have registered one batch callback; capture it so the test
  // can trigger gauge collection directly.
  verify(mockMetricRecorder).registerBatchCallback(
      gaugeBatchCallbackCaptor.capture(), any());
  BatchCallback callback = gaugeBatchCallbackCaptor.getValue();
  InOrder recordings = inOrder(mockBatchRecorder);

  // Nothing has been looked up yet, so both gauges are expected to report zero.
  callback.accept(mockBatchRecorder);
  recordings.verify(mockBatchRecorder).recordLongGauge(
      argThat(isEntriesGauge), eq(0L), isA(List.class), isA(List.class));
  recordings.verify(mockBatchRecorder).recordLongGauge(
      argThat(isSizeGauge), eq(0L), isA(List.class), isA(List.class));

  // Teach the fake RLS server a single route.
  RouteLookupRequest lookupRequest = RouteLookupRequest.create(
      ImmutableMap.of(
          "server", "bigtable.googleapis.com",
          "service-key", "foo",
          "method-key", "bar"));
  RouteLookupResponse lookupResponse =
      RouteLookupResponse.create(ImmutableList.of("target"), "header");
  rlsServerImpl.setLookupTable(ImmutableMap.of(lookupRequest, lookupResponse));

  // The first lookup starts the request; once the server latency has elapsed, the second
  // lookup is expected to be answered with data.
  CachedRouteLookupResponse lookupResult = getInSyncContext(lookupRequest);
  fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
  lookupResult = getInSyncContext(lookupRequest);
  assertThat(lookupResult.hasData()).isTrue();

  // One entry is now cached; 260 is the expected cache size reported for that entry.
  callback.accept(mockBatchRecorder);
  recordings.verify(mockBatchRecorder).recordLongGauge(
      argThat(isEntriesGauge), eq(1L), isA(List.class), isA(List.class));
  recordings.verify(mockBatchRecorder).recordLongGauge(
      argThat(isSizeGauge), eq(260L), isA(List.class), isA(List.class));

  recordings.verifyNoMoreInteractions();

  // Closing the client must also close the gauge registration.
  rlsLbClient.close();
  verify(mockGaugeRegistration).close();
}

private static RouteLookupConfig getRouteLookupConfig() {
return RouteLookupConfig.builder()
.grpcKeybuilders(ImmutableList.of(
Expand Down Expand Up @@ -667,6 +736,21 @@ public long nextBackoffNanos() {
};
}

/**
 * Mockito matcher that accepts a {@link LongGaugeMetricInstrument} based solely on its name.
 *
 * <p>A {@code null} argument never matches (instead of raising a
 * {@link NullPointerException} in the middle of a verification), and {@link #toString()}
 * names the expected instrument so that failed verifications are readable.
 */
private static class LongGaugeInstrumentArgumentMatcher implements
    ArgumentMatcher<LongGaugeMetricInstrument> {

  private final String instrumentName;

  /**
   * Creates a matcher for the given instrument name.
   *
   * @param instrumentName the name the matched instrument must have
   */
  public LongGaugeInstrumentArgumentMatcher(String instrumentName) {
    this.instrumentName = instrumentName;
  }

  /**
   * Returns whether {@code instrument} is non-null and carries the expected name.
   *
   * @param instrument the actual argument seen by the mock; may be {@code null}
   */
  @Override
  public boolean matches(LongGaugeMetricInstrument instrument) {
    // Guard against null so an unrelated invocation with a null instrument is reported as a
    // plain mismatch rather than blowing up the verification.
    return instrument != null && instrument.getName().equals(instrumentName);
  }

  /** Describes the expectation; used in the text of verification failures. */
  @Override
  public String toString() {
    return "LongGaugeMetricInstrument named \"" + instrumentName + "\"";
  }
}

private static final class FakeBackoffProvider implements BackoffPolicy.Provider {

private BackoffPolicy nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
Expand Down
16 changes: 6 additions & 10 deletions rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.base.Converter;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -59,6 +59,7 @@
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.MetricInstrument;
import io.grpc.MetricRecorder;
import io.grpc.MetricRecorder.Registration;
import io.grpc.MetricSink;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.NoopMetricSink;
Expand Down Expand Up @@ -128,6 +129,8 @@ public void uncaughtException(Thread t, Throwable e) {
});
@Mock
private MetricRecorder mockMetricRecorder;
@Mock
private Registration mockGaugeRegistration;
private final FakeHelper helperDelegate = new FakeHelper();
private final Helper helper =
mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate));
Expand Down Expand Up @@ -186,6 +189,8 @@ public CachingRlsLbClient.Builder get() {

searchSubchannelArgs = newPickSubchannelArgs(fakeSearchMethod);
rescueSubchannelArgs = newPickSubchannelArgs(fakeRescueMethod);

when(mockMetricRecorder.registerBatchCallback(any(), any())).thenReturn(mockGaugeRegistration);
}

@After
Expand Down Expand Up @@ -226,7 +231,6 @@ public void lb_serverStatusCodeConversion() throws Exception {
assertThat(serverStatus.getDescription()).contains("RLS server returned: ");
assertThat(serverStatus.getDescription()).endsWith("ABORTED: base desc");
assertThat(serverStatus.getDescription()).contains("RLS server conv.test");
verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand Down Expand Up @@ -290,8 +294,6 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");

verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand Down Expand Up @@ -351,7 +353,6 @@ public void lb_working_withoutDefaultTarget_noRlsResponse() throws Exception {
inOrder.verify(helper).getChannelTarget();
inOrder.verifyNoMoreInteractions();
verifyFailedPicksCounterAdd(1, 1);
verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand All @@ -377,7 +378,6 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
int times = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2;
verifyLongCounterAdd("grpc.lb.rls.default_target_picks", times, 1,
"defaultTarget", "complete");
verifyNoMoreInteractions(mockMetricRecorder);

Subchannel subchannel = picker.pickSubchannel(searchSubchannelArgs).getSubchannel();
assertThat(subchannelIsReady(subchannel)).isTrue();
Expand Down Expand Up @@ -422,8 +422,6 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(res.getSubchannel()).isNull();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");

verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand Down Expand Up @@ -499,7 +497,6 @@ public void lb_working_withoutDefaultTarget() throws Exception {
inOrder.verify(helper, atLeast(0)).refreshNameResolution();
inOrder.verifyNoMoreInteractions();
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "fail");
verifyNoMoreInteractions(mockMetricRecorder);
}

@Test
Expand Down Expand Up @@ -542,7 +539,6 @@ public void lb_nameResolutionFailed() throws Exception {
res = failedPicker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
assertThat(res.getStatus().isOk()).isFalse();
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
verifyNoMoreInteractions(mockMetricRecorder);
}

private PickResult markReadyAndGetPickResult(InOrder inOrder,
Expand Down
Loading