Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Otel server context interceptor and add experimental variable for experimental API #11500

Merged
merged 9 commits into from
Sep 16, 2024
6 changes: 4 additions & 2 deletions opentelemetry/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ dependencies {
libraries.opentelemetry.api,
libraries.auto.value.annotations

testImplementation testFixtures(project(':grpc-core')),
project(':grpc-testing'),
testImplementation project(':grpc-testing'),
project(':grpc-inprocess'),
testFixtures(project(':grpc-core')),
testFixtures(project(':grpc-api')),
libraries.opentelemetry.sdk.testing,
libraries.assertj.core // opentelemetry.sdk.testing uses compileOnly for assertj

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import io.grpc.ManagedChannelBuilder;
import io.grpc.MetricSink;
import io.grpc.ServerBuilder;
import io.grpc.internal.GrpcUtil;
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.api.trace.Tracer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -61,13 +63,18 @@ public Stopwatch get() {
}
};

@VisibleForTesting
static boolean ENABLE_OTEL_TRACING = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING",
false);

private final OpenTelemetry openTelemetrySdk;
private final MeterProvider meterProvider;
private final Meter meter;
private final Map<String, Boolean> enableMetrics;
private final boolean disableDefault;
private final OpenTelemetryMetricsResource resource;
private final OpenTelemetryMetricsModule openTelemetryMetricsModule;
private final OpenTelemetryTracingModule openTelemetryTracingModule;
private final List<String> optionalLabels;
private final MetricSink sink;

Expand All @@ -88,6 +95,7 @@ private GrpcOpenTelemetry(Builder builder) {
this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels);
this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule(
STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins);
this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk);
this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels);
}

Expand Down Expand Up @@ -125,6 +133,11 @@ MetricSink getSink() {
return sink;
}

@VisibleForTesting
Tracer getTracer() {
return this.openTelemetryTracingModule.getTracer();
}

/**
* Registers GrpcOpenTelemetry globally, applying its configuration to all subsequently created
* gRPC channels and servers.
Expand Down Expand Up @@ -152,6 +165,9 @@ public void configureChannelBuilder(ManagedChannelBuilder<?> builder) {
InternalManagedChannelBuilder.addMetricSink(builder, sink);
InternalManagedChannelBuilder.interceptWithTarget(
builder, openTelemetryMetricsModule::getClientInterceptor);
if (ENABLE_OTEL_TRACING) {
builder.intercept(openTelemetryTracingModule.getClientInterceptor());
}
}

/**
Expand All @@ -161,6 +177,11 @@ public void configureChannelBuilder(ManagedChannelBuilder<?> builder) {
*/
public void configureServerBuilder(ServerBuilder<?> serverBuilder) {
serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory());
if (ENABLE_OTEL_TRACING) {
serverBuilder.addStreamTracerFactory(
openTelemetryTracingModule.getServerTracerFactory());
serverBuilder.intercept(openTelemetryTracingModule.getServerSpanPropagationInterceptor());
}
}

@VisibleForTesting
Expand Down Expand Up @@ -342,6 +363,11 @@ public Builder disableAllMetrics() {
return this;
}

Builder enableTracing(boolean enable) {
ENABLE_OTEL_TRACING = enable;
return this;
}

/**
* Returns a new {@link GrpcOpenTelemetry} built with the configuration of this {@link
* Builder}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@
GrpcOpenTelemetry.Builder builder, InternalOpenTelemetryPlugin plugin) {
builder.plugin(plugin);
}

public static void enableTracing(GrpcOpenTelemetry.Builder builder, boolean enable) {
builder.enableTracing(enable);
}

Check warning on line 35 in opentelemetry/src/main/java/io/grpc/opentelemetry/InternalGrpcOpenTelemetry.java

View check run for this annotation

Codecov / codecov/patch

opentelemetry/src/main/java/io/grpc/opentelemetry/InternalGrpcOpenTelemetry.java#L34-L35

Added lines #L34 - L35 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
Expand All @@ -28,15 +29,21 @@
import io.grpc.ClientStreamTracer;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerStreamTracer;
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.ContextPropagators;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.logging.Level;
Expand All @@ -50,7 +57,7 @@ final class OpenTelemetryTracingModule {
private static final Logger logger = Logger.getLogger(OpenTelemetryTracingModule.class.getName());

@VisibleForTesting
static final String OTEL_TRACING_SCOPE_NAME = "grpc-java";
final io.grpc.Context.Key<Span> otelSpan = io.grpc.Context.key("opentelemetry-span-key");
@Nullable
private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
@Nullable
Expand Down Expand Up @@ -83,13 +90,23 @@ final class OpenTelemetryTracingModule {
private final MetadataGetter metadataGetter = MetadataGetter.getInstance();
private final MetadataSetter metadataSetter = MetadataSetter.getInstance();
private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
private final ServerInterceptor serverSpanPropagationInterceptor =
new TracingServerSpanPropagationInterceptor();
private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();

OpenTelemetryTracingModule(OpenTelemetry openTelemetry) {
this.otelTracer = checkNotNull(openTelemetry.getTracer(OTEL_TRACING_SCOPE_NAME), "otelTracer");
this.otelTracer = checkNotNull(openTelemetry.getTracerProvider(), "tracerProvider")
.tracerBuilder(OpenTelemetryConstants.INSTRUMENTATION_SCOPE)
.setInstrumentationVersion(IMPLEMENTATION_VERSION)
.build();
this.contextPropagators = checkNotNull(openTelemetry.getPropagators(), "contextPropagators");
}

@VisibleForTesting
Tracer getTracer() {
return otelTracer;
}

/**
* Creates a {@link CallAttemptsTracerFactory} for a new call.
*/
Expand All @@ -112,6 +129,10 @@ ClientInterceptor getClientInterceptor() {
return clientInterceptor;
}

ServerInterceptor getServerSpanPropagationInterceptor() {
return serverSpanPropagationInterceptor;
}

@VisibleForTesting
final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
volatile int callEnded;
Expand Down Expand Up @@ -252,6 +273,11 @@ public void streamClosed(io.grpc.Status status) {
endSpanWithStatus(span, status);
}

@Override
public io.grpc.Context filterContext(io.grpc.Context context) {
return context.withValue(otelSpan, span);
}

@Override
public void outboundMessageSent(
int seqNo, long optionalWireSize, long optionalUncompressedSize) {
Expand Down Expand Up @@ -293,6 +319,69 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
}
}

@VisibleForTesting
final class TracingServerSpanPropagationInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
Span span = otelSpan.get(io.grpc.Context.current());
if (span == null) {
logger.log(Level.FINE, "Server span not found. ServerTracerFactory for server "
+ "tracing must be set.");
return next.startCall(call, headers);
}
Context serverCallContext = Context.current().with(span);
try (Scope scope = serverCallContext.makeCurrent()) {
return new ContextServerCallListener<>(next.startCall(call, headers), serverCallContext);
}
}
}

private static class ContextServerCallListener<ReqT> extends
ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
private final Context context;

protected ContextServerCallListener(ServerCall.Listener<ReqT> delegate, Context context) {
super(delegate);
this.context = checkNotNull(context, "context");
}

@Override
public void onMessage(ReqT message) {
try (Scope scope = context.makeCurrent()) {
delegate().onMessage(message);
}
}

@Override
public void onHalfClose() {
try (Scope scope = context.makeCurrent()) {
delegate().onHalfClose();
}
}

@Override
public void onCancel() {
try (Scope scope = context.makeCurrent()) {
delegate().onCancel();
}
}

@Override
public void onComplete() {
try (Scope scope = context.makeCurrent()) {
delegate().onComplete();
}
}

@Override
public void onReady() {
try (Scope scope = context.makeCurrent()) {
delegate().onReady();
}
}
}

@VisibleForTesting
final class TracingClientInterceptor implements ClientInterceptor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,26 @@
package io.grpc.opentelemetry;

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import com.google.common.collect.ImmutableList;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MetricSink;
import io.grpc.ServerBuilder;
import io.grpc.internal.GrpcUtil;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import java.util.Arrays;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -35,7 +46,19 @@ public class GrpcOpenTelemetryTest {
private final InMemoryMetricReader inMemoryMetricReader = InMemoryMetricReader.create();
private final SdkMeterProvider meterProvider =
SdkMeterProvider.builder().registerMetricReader(inMemoryMetricReader).build();
private final SdkTracerProvider tracerProvider = SdkTracerProvider.builder().build();
private final OpenTelemetry noopOpenTelemetry = OpenTelemetry.noop();
private boolean originalEnableOtelTracing;

@Before
public void setup() {
originalEnableOtelTracing = GrpcOpenTelemetry.ENABLE_OTEL_TRACING;
}

@After
public void tearDown() {
GrpcOpenTelemetry.ENABLE_OTEL_TRACING = originalEnableOtelTracing;
}

@Test
public void build() {
Expand All @@ -56,6 +79,31 @@ public void build() {
assertThat(openTelemetryModule.getOptionalLabels()).isEqualTo(ImmutableList.of("version"));
}

@Test
public void buildTracer() {
OpenTelemetrySdk sdk =
OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();

GrpcOpenTelemetry grpcOpenTelemetry = GrpcOpenTelemetry.newBuilder()
.enableTracing(true)
.sdk(sdk).build();

assertThat(grpcOpenTelemetry.getOpenTelemetryInstance()).isSameInstanceAs(sdk);
assertThat(grpcOpenTelemetry.getTracer()).isSameInstanceAs(
tracerProvider.tracerBuilder("grpc-java")
.setInstrumentationVersion(GrpcUtil.IMPLEMENTATION_VERSION)
.build());
ServerBuilder<?> mockServerBuiler = mock(ServerBuilder.class);
grpcOpenTelemetry.configureServerBuilder(mockServerBuiler);
verify(mockServerBuiler, times(2)).addStreamTracerFactory(any());
verify(mockServerBuiler).intercept(any());
verifyNoMoreInteractions(mockServerBuiler);

ManagedChannelBuilder<?> mockChannelBuilder = mock(ManagedChannelBuilder.class);
grpcOpenTelemetry.configureChannelBuilder(mockChannelBuilder);
verify(mockChannelBuilder).intercept(any(ClientInterceptor.class));
}

@Test
public void builderDefaults() {
GrpcOpenTelemetry module = GrpcOpenTelemetry.newBuilder().build();
Expand All @@ -73,6 +121,13 @@ public void builderDefaults() {
assertThat(module.getEnableMetrics()).isEmpty();
assertThat(module.getOptionalLabels()).isEmpty();
assertThat(module.getSink()).isInstanceOf(MetricSink.class);

assertThat(module.getTracer()).isSameInstanceAs(noopOpenTelemetry
.getTracerProvider()
.tracerBuilder("grpc-java")
.setInstrumentationVersion(GrpcUtil.IMPLEMENTATION_VERSION)
.build()
);
}

@Test
Expand Down
Loading