Skip to content

Commit

Permalink
Merge pull request #1081 from stanley-cheung/add-instrumentation-metr…
Browse files Browse the repository at this point in the history
…ic-tag

Add instrumentation tags to grpc metrics
  • Loading branch information
stanley-cheung authored Apr 1, 2024
2 parents de71ce3 + 2516559 commit 8239f8a
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.micrometer.core.instrument.Tags;
import net.devh.boot.grpc.common.util.Constants;

/**
* Provides factories for {@link io.grpc.StreamTracer} that records metrics.
Expand All @@ -47,6 +48,8 @@
final class MetricsClientStreamTracers {
private static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = Stopwatch::createUnstarted;
private final Supplier<Stopwatch> stopwatchSupplier;
private static final String INSTRUMENTATION_SOURCE_TAG_KEY = "instrumentation_source";
private static final String INSTRUMENTATION_VERSION_TAG_KEY = "instrumentation_version";

MetricsClientStreamTracers() {
this(STOPWATCH_SUPPLIER);
Expand Down Expand Up @@ -127,7 +130,10 @@ public void streamClosed(Status status) {

void recordFinishedAttempt() {
Tags attemptMetricTags =
Tags.of("grpc.method", fullMethodName, "grpc.status", statusCode.toString());
Tags.of("grpc.method", fullMethodName,
"grpc.status", statusCode.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION);
this.metricsClientMeters.getClientAttemptDuration()
.withTags(attemptMetricTags)
.record(attemptNanos, TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -168,7 +174,9 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory

// Record here in case newClientStreamTracer() would never be called.
this.metricsClientMeters.getAttemptCounter()
.withTags(Tags.of("grpc.method", fullMethodName))
.withTags(Tags.of("grpc.method", fullMethodName,
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION))
.increment();
}

Expand All @@ -188,7 +196,9 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada
// attempt, as first attempt cannot be a transparent retry.
if (attemptsPerCall.get() > 0) {
this.metricsClientMeters.getAttemptCounter()
.withTags((Tags.of("grpc.method", fullMethodName)))
.withTags((Tags.of("grpc.method", fullMethodName,
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION)))
.increment();
}
if (!info.isTransparentRetry()) {
Expand Down Expand Up @@ -248,7 +258,10 @@ void recordFinishedCall() {
}
callLatencyNanos = clientCallStopWatch.elapsed(TimeUnit.NANOSECONDS);
Tags clientCallMetricTags =
Tags.of("grpc.method", this.fullMethodName, "grpc.status", status.getCode().toString());
Tags.of("grpc.method", this.fullMethodName,
"grpc.status", status.getCode().toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION);
this.metricsClientMeters.getClientCallDuration()
.withTags(clientCallMetricTags)
.record(callLatencyNanos, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import net.devh.boot.grpc.client.metrics.MetricsClientStreamTracers.CallAttemptsTracerFactory;
import net.devh.boot.grpc.common.util.Constants;

/**
* Tests for {@link MetricsClientStreamTracers}.
Expand All @@ -61,6 +62,10 @@ class MetricsClientStreamTracersTest {
private static final String GRPC_METHOD_TAG_KEY = "grpc.method";
private static final String GRPC_STATUS_TAG_KEY = "grpc.status";
private static final String FULL_METHOD_NAME = "package1.service1/method1";
private static final String INSTRUMENTATION_SOURCE_TAG_KEY = "instrumentation_source";
private static final String INSTRUMENTATION_SOURCE_TAG_VALUE = Constants.LIBRARY_NAME;
private static final String INSTRUMENTATION_VERSION_TAG_KEY = "instrumentation_version";
private static final String INSTRUMENTATION_VERSION_TAG_VALUE = Constants.VERSION;

private static class StringInputStream extends InputStream {
final String string;
Expand Down Expand Up @@ -125,6 +130,8 @@ void clientBasicMetrics() {

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(1);

Expand All @@ -146,11 +153,16 @@ void clientBasicMetrics() {

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(1);

Tags expectedTags =
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME, GRPC_STATUS_TAG_KEY, Status.Code.OK.toString());
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME,
GRPC_STATUS_TAG_KEY, Status.Code.OK.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE,
INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE);

HistogramSnapshot attemptDurationSnapshot = meterRegistry.get(CLIENT_ATTEMPT_DURATION)
.tags(expectedTags)
Expand Down Expand Up @@ -200,6 +212,8 @@ void recordAttemptMetrics() {

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(1);

Expand All @@ -213,10 +227,15 @@ void recordAttemptMetrics() {
tracer.streamClosed(Status.UNAVAILABLE);

Tags expectedUnailableStatusTags =
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME, GRPC_STATUS_TAG_KEY, Status.Code.UNAVAILABLE.toString());
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME,
GRPC_STATUS_TAG_KEY, Status.Code.UNAVAILABLE.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE,
INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE);

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(1);
assertThat(meterRegistry.get(CLIENT_ATTEMPT_DURATION)
Expand Down Expand Up @@ -248,10 +267,15 @@ void recordAttemptMetrics() {
tracer.streamClosed(Status.NOT_FOUND);

Tags expectedNotFoundStatusTags =
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME, GRPC_STATUS_TAG_KEY, Status.Code.NOT_FOUND.toString());
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME,
GRPC_STATUS_TAG_KEY, Status.Code.NOT_FOUND.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE,
INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE);

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(2);

Expand Down Expand Up @@ -290,6 +314,8 @@ void recordAttemptMetrics() {

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(3);

Expand Down Expand Up @@ -342,10 +368,15 @@ void recordAttemptMetrics() {
callAttemptsTracerFactory.callEnded(Status.OK);

Tags expectedOKStatusTags =
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME, GRPC_STATUS_TAG_KEY, Status.Code.OK.toString());
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME,
GRPC_STATUS_TAG_KEY, Status.Code.OK.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE,
INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE);

assertThat(meterRegistry.get(CLIENT_ATTEMPT_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(4);
assertThat(meterRegistry.get(CLIENT_ATTEMPT_DURATION)
Expand Down Expand Up @@ -388,8 +419,10 @@ void clientStreamNeverCreatedStillRecordMetrics() {
callAttemptsTracerFactory.callEnded(status);

Tags expectedDeadlineExceededStatusTags =
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME, GRPC_STATUS_TAG_KEY,
Status.Code.DEADLINE_EXCEEDED.toString());
Tags.of(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME,
GRPC_STATUS_TAG_KEY, Status.Code.DEADLINE_EXCEEDED.toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE,
INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE);

HistogramSnapshot attemptDurationSnapshot = meterRegistry.get(CLIENT_ATTEMPT_DURATION)
.tags(expectedDeadlineExceededStatusTags)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2016-2023 The gRPC-Spring Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.devh.boot.grpc.common.util;

/**
* Class that contains shared constants.
*/
public final class Constants {

/**
* A constant that defines the current version of the library.
*/
public static final String VERSION = "v" + Constants.class.getPackage().getImplementationVersion();

/**
* A constant that defines the library name that can be used as metric tags.
*/
public static final String LIBRARY_NAME = "grpc-spring";

private Constants() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.grpc.Status;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import net.devh.boot.grpc.common.util.Constants;

/**
* Provides factories for {@link io.grpc.StreamTracer} that records metrics.
Expand All @@ -45,6 +46,8 @@ public final class MetricsServerStreamTracers {

private static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = Stopwatch::createUnstarted;
private final Supplier<Stopwatch> stopwatchSupplier;
private static final String INSTRUMENTATION_SOURCE_TAG_KEY = "instrumentation_source";
private static final String INSTRUMENTATION_VERSION_TAG_KEY = "instrumentation_version";

public MetricsServerStreamTracers() {
this(STOPWATCH_SUPPLIER);
Expand Down Expand Up @@ -100,7 +103,9 @@ private static final class ServerTracer extends ServerStreamTracer {
@Override
public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
this.metricsServerMeters.getServerCallCounter()
.withTags(Tags.of("grpc.method", this.fullMethodName))
.withTags(Tags.of("grpc.method", this.fullMethodName,
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION))
.increment();
}

Expand All @@ -122,7 +127,10 @@ public void streamClosed(Status status) {
long callLatencyNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);

Tags serverMetricTags =
Tags.of("grpc.method", this.fullMethodName, "grpc.status", status.getCode().toString());
Tags.of("grpc.method", this.fullMethodName,
"grpc.status", status.getCode().toString(),
INSTRUMENTATION_SOURCE_TAG_KEY, Constants.LIBRARY_NAME,
INSTRUMENTATION_VERSION_TAG_KEY, Constants.VERSION);
this.metricsServerMeters.getServerCallDuration()
.withTags(serverMetricTags)
.record(callLatencyNanos, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.micrometer.core.instrument.distribution.CountAtBucket;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import net.devh.boot.grpc.common.util.Constants;

/**
* Tests for {@link MetricsServerStreamTracers}.
Expand All @@ -54,6 +55,10 @@ class MetricsServerStreamTracersTest {
private static final String FULL_METHOD_NAME = "package1.service1/method1";
private static final String GRPC_METHOD_TAG_KEY = "grpc.method";
private static final String GRPC_STATUS_TAG_KEY = "grpc.status";
private static final String INSTRUMENTATION_SOURCE_TAG_KEY = "instrumentation_source";
private static final String INSTRUMENTATION_SOURCE_TAG_VALUE = Constants.LIBRARY_NAME;
private static final String INSTRUMENTATION_VERSION_TAG_KEY = "instrumentation_version";
private static final String INSTRUMENTATION_VERSION_TAG_VALUE = Constants.VERSION;


private static class StringInputStream extends InputStream {
Expand Down Expand Up @@ -150,6 +155,8 @@ void serverBasicMetrics() {

assertThat(meterRegistry.get(SERVER_CALL_STARTED)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.counter()
.count()).isEqualTo(1);

Expand All @@ -170,6 +177,8 @@ void serverBasicMetrics() {
HistogramSnapshot sentMessageSizeSnapShot = meterRegistry.get(SERVER_SENT_COMPRESSED_MESSAGE_SIZE)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(GRPC_STATUS_TAG_KEY, Status.Code.CANCELLED.toString())
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.summary()
.takeSnapshot();
HistogramSnapshot expectedSentMessageSizeHistogram = HistogramSnapshot.empty(1L, 1127L, 1127L);
Expand All @@ -181,6 +190,8 @@ void serverBasicMetrics() {
meterRegistry.get(SERVER_RECEIVED_COMPRESSED_MESSAGE_SIZE)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(GRPC_STATUS_TAG_KEY, Status.Code.CANCELLED.toString())
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.summary()
.takeSnapshot();
HistogramSnapshot expectedReceivedMessageSizeHistogram = HistogramSnapshot.empty(1L, 188L, 188L);
Expand All @@ -197,6 +208,8 @@ void serverBasicMetrics() {
HistogramSnapshot callDurationSnapshot = meterRegistry.get(SERVER_CALL_DURATION)
.tag(GRPC_METHOD_TAG_KEY, FULL_METHOD_NAME)
.tag(GRPC_STATUS_TAG_KEY, Status.Code.CANCELLED.toString())
.tag(INSTRUMENTATION_SOURCE_TAG_KEY, INSTRUMENTATION_SOURCE_TAG_VALUE)
.tag(INSTRUMENTATION_VERSION_TAG_KEY, INSTRUMENTATION_VERSION_TAG_VALUE)
.timer()
.takeSnapshot();
HistogramSnapshot expectedCallDurationHistogram = HistogramSnapshot.empty(1L, 40L, 40);
Expand Down

0 comments on commit 8239f8a

Please sign in to comment.