From 4ff4020c5425f9d46f7cd382043647466fb0debb Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Fri, 30 Aug 2024 12:21:15 -0700 Subject: [PATCH 1/7] add filter context at tracing server --- opentelemetry/build.gradle | 5 +- .../OpenTelemetryTracingModule.java | 436 ++++++++++++ .../OpenTelemetryTracingModuleTest.java | 666 ++++++++++++++++++ 3 files changed, 1105 insertions(+), 2 deletions(-) create mode 100644 opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java create mode 100644 opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java diff --git a/opentelemetry/build.gradle b/opentelemetry/build.gradle index 509960e5dbc..338ac0f6870 100644 --- a/opentelemetry/build.gradle +++ b/opentelemetry/build.gradle @@ -14,8 +14,9 @@ dependencies { libraries.opentelemetry.api, libraries.auto.value.annotations - testImplementation testFixtures(project(':grpc-core')), - project(':grpc-testing'), + testImplementation project(':grpc-testing'), + project(':grpc-inprocess'), + testFixtures(project(':grpc-core')), libraries.opentelemetry.sdk.testing, libraries.assertj.core // opentelemetry.sdk.testing uses compileOnly for assertj diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java new file mode 100644 index 00000000000..8d8b898357a --- /dev/null +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java @@ -0,0 +1,436 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.opentelemetry; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientStreamTracer; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerStreamTracer; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** + * Provides factories for {@link io.grpc.StreamTracer} that records tracing to OpenTelemetry. + */ +final class OpenTelemetryTracingModule { + private static final Logger logger = Logger.getLogger(OpenTelemetryTracingModule.class.getName()); + + @VisibleForTesting + static final String OTEL_TRACING_SCOPE_NAME = "grpc-java"; + private final io.grpc.Context.Key otelSpan = io.grpc.Context.key("opentelemetry-span-key"); + @Nullable + private static final AtomicIntegerFieldUpdater callEndedUpdater; + @Nullable + private static final AtomicIntegerFieldUpdater streamClosedUpdater; + + /* + * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their JDK + * reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to + * (potentially racy) direct updates of the volatile variables. + */ + static { + AtomicIntegerFieldUpdater tmpCallEndedUpdater; + AtomicIntegerFieldUpdater tmpStreamClosedUpdater; + try { + tmpCallEndedUpdater = + AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded"); + tmpStreamClosedUpdater = + AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"); + } catch (Throwable t) { + logger.log(Level.SEVERE, "Creating atomic field updaters failed", t); + tmpCallEndedUpdater = null; + tmpStreamClosedUpdater = null; + } + callEndedUpdater = tmpCallEndedUpdater; + streamClosedUpdater = tmpStreamClosedUpdater; + } + + private final Tracer otelTracer; + private final ContextPropagators contextPropagators; + private final MetadataGetter metadataGetter = MetadataGetter.getInstance(); + private final MetadataSetter metadataSetter = MetadataSetter.getInstance(); + private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor(); + private final ServerInterceptor serverSpanPropagationInterceptor = + new TracingServerSpanPropagationInterceptor(); + private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory(); + + OpenTelemetryTracingModule(OpenTelemetry openTelemetry) { + this.otelTracer = checkNotNull(openTelemetry.getTracer(OTEL_TRACING_SCOPE_NAME), "otelTracer"); + this.contextPropagators = checkNotNull(openTelemetry.getPropagators(), "contextPropagators"); + } + + /** + * Creates a {@link CallAttemptsTracerFactory} for a new call. + */ + @VisibleForTesting + CallAttemptsTracerFactory newClientCallTracer(Span clientSpan, MethodDescriptor method) { + return new CallAttemptsTracerFactory(clientSpan, method); + } + + /** + * Returns the server tracer factory. + */ + ServerStreamTracer.Factory getServerTracerFactory() { + return serverTracerFactory; + } + + /** + * Returns the client interceptor that facilitates otel tracing reporting. + */ + ClientInterceptor getClientInterceptor() { + return clientInterceptor; + } + + ServerInterceptor getServerSpanPropagationInterceptor() { + return serverSpanPropagationInterceptor; + } + + @VisibleForTesting + final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory { + volatile int callEnded; + private final Span clientSpan; + private final String fullMethodName; + + CallAttemptsTracerFactory(Span clientSpan, MethodDescriptor method) { + checkNotNull(method, "method"); + this.fullMethodName = checkNotNull(method.getFullMethodName(), "fullMethodName"); + this.clientSpan = checkNotNull(clientSpan, "clientSpan"); + } + + @Override + public ClientStreamTracer newClientStreamTracer( + ClientStreamTracer.StreamInfo info, Metadata headers) { + Span attemptSpan = otelTracer.spanBuilder( + "Attempt." + fullMethodName.replace('/', '.')) + .setParent(Context.current().with(clientSpan)) + .startSpan(); + attemptSpan.setAttribute( + "previous-rpc-attempts", info.getPreviousAttempts()); + attemptSpan.setAttribute( + "transparent-retry",info.isTransparentRetry()); + if (info.getCallOptions().getOption(NAME_RESOLUTION_DELAYED) != null) { + clientSpan.addEvent("Delayed name resolution complete"); + } + return new ClientTracer(attemptSpan, clientSpan); + } + + /** + * Record a finished call and mark the current time as the end time. + * + *

Can be called from any thread without synchronization. Calling it the second time or more + * is a no-op. + */ + void callEnded(io.grpc.Status status) { + if (callEndedUpdater != null) { + if (callEndedUpdater.getAndSet(this, 1) != 0) { + return; + } + } else { + if (callEnded != 0) { + return; + } + callEnded = 1; + } + endSpanWithStatus(clientSpan, status); + } + } + + private final class ClientTracer extends ClientStreamTracer { + private final Span span; + private final Span parentSpan; + volatile int seqNo; + boolean isPendingStream; + + ClientTracer(Span span, Span parentSpan) { + this.span = checkNotNull(span, "span"); + this.parentSpan = checkNotNull(parentSpan, "parent span"); + } + + @Override + public void streamCreated(Attributes transportAtts, Metadata headers) { + contextPropagators.getTextMapPropagator().inject(Context.current().with(span), headers, + metadataSetter); + if (isPendingStream) { + span.addEvent("Delayed LB pick complete"); + } + } + + @Override + public void createPendingStream() { + isPendingStream = true; + } + + @Override + public void outboundMessageSent( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize); + } + + @Override + public void inboundMessageRead( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + //TODO(yifeizhuang): needs support from message deframer. + if (optionalWireSize != optionalUncompressedSize) { + recordInboundCompressedMessage(span, seqNo, optionalWireSize); + } + } + + @Override + public void inboundMessage(int seqNo) { + this.seqNo = seqNo; + } + + @Override + public void inboundUncompressedSize(long bytes) { + recordInboundMessageSize(parentSpan, seqNo, bytes); + } + + @Override + public void streamClosed(io.grpc.Status status) { + endSpanWithStatus(span, status); + } + } + + private final class ServerTracer extends ServerStreamTracer { + private final Span span; + volatile int streamClosed; + private int seqNo; + + ServerTracer(String fullMethodName, @Nullable Span remoteSpan) { + checkNotNull(fullMethodName, "fullMethodName"); + this.span = + otelTracer.spanBuilder(generateTraceSpanName(true, fullMethodName)) + .setParent(remoteSpan == null ? null : Context.current().with(remoteSpan)) + .startSpan(); + } + + /** + * Record a finished stream and mark the current time as the end time. + * + *

Can be called from any thread without synchronization. Calling it the second time or more + * is a no-op. + */ + @Override + public void streamClosed(io.grpc.Status status) { + if (streamClosedUpdater != null) { + if (streamClosedUpdater.getAndSet(this, 1) != 0) { + return; + } + } else { + if (streamClosed != 0) { + return; + } + streamClosed = 1; + } + endSpanWithStatus(span, status); + } + + @Override + public io.grpc.Context filterContext(io.grpc.Context context) { + return context.withValue(otelSpan, span); + } + + @Override + public void outboundMessageSent( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize); + } + + @Override + public void inboundMessageRead( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + if (optionalWireSize != optionalUncompressedSize) { + recordInboundCompressedMessage(span, seqNo, optionalWireSize); + } + } + + @Override + public void inboundMessage(int seqNo) { + this.seqNo = seqNo; + } + + @Override + public void inboundUncompressedSize(long bytes) { + recordInboundMessageSize(span, seqNo, bytes); + } + } + + @VisibleForTesting + final class ServerTracerFactory extends ServerStreamTracer.Factory { + @SuppressWarnings("ReferenceEquality") + @Override + public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) { + Context context = contextPropagators.getTextMapPropagator().extract( + Context.current(), headers, metadataGetter + ); + Span remoteSpan = Span.fromContext(context); + if (remoteSpan == Span.getInvalid()) { + remoteSpan = null; + } + return new ServerTracer(fullMethodName, remoteSpan); + } + } + + @VisibleForTesting + final class TracingServerSpanPropagationInterceptor implements ServerInterceptor { + @Override + public ServerCall.Listener interceptCall(ServerCall call, + Metadata headers, ServerCallHandler next) { + Span span = otelSpan.get(io.grpc.Context.current()); + try (Scope scope = Context.current().with(span).makeCurrent()) { + return next.startCall(call, headers); + } + } + } + + @VisibleForTesting + final class TracingClientInterceptor implements ClientInterceptor { + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + Span clientSpan = otelTracer.spanBuilder( + generateTraceSpanName(false, method.getFullMethodName())) + .startSpan(); + + final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(clientSpan, method); + ClientCall call = + next.newCall( + method, + callOptions.withStreamTracerFactory(tracerFactory)); + return new SimpleForwardingClientCall(call) { + @Override + public void start(Listener responseListener, Metadata headers) { + delegate().start( + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onClose(io.grpc.Status status, Metadata trailers) { + tracerFactory.callEnded(status); + super.onClose(status, trailers); + } + }, + headers); + } + }; + } + } + + // Attribute named "message-size" always means the message size the application sees. + // If there was compression, additional event reports "message-size-compressed". + // + // An example trace with message compression: + // + // Sending: + // |-- Event 'Outbound message sent', attributes('sequence-numer' = 0, 'message-size' = 7854, + // 'message-size-compressed' = 5493) ----| + // + // Receiving: + // |-- Event 'Inbound compressed message', attributes('sequence-numer' = 0, + // 'message-size-compressed' = 5493 ) ----| + // |-- Event 'Inbound message received', attributes('sequence-numer' = 0, + // 'message-size' = 7854) ----| + // + // An example trace with no message compression: + // + // Sending: + // |-- Event 'Outbound message sent', attributes('sequence-numer' = 0, 'message-size' = 7854) ---| + // + // Receiving: + // |-- Event 'Inbound message received', attributes('sequence-numer' = 0, + // 'message-size' = 7854) ----| + private void recordOutboundMessageSentEvent(Span span, + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder(); + attributesBuilder.put("sequence-number", seqNo); + if (optionalUncompressedSize != -1) { + attributesBuilder.put("message-size", optionalUncompressedSize); + } + if (optionalWireSize != -1 && optionalWireSize != optionalUncompressedSize) { + attributesBuilder.put("message-size-compressed", optionalWireSize); + } + span.addEvent("Outbound message sent", attributesBuilder.build()); + } + + private void recordInboundCompressedMessage(Span span, int seqNo, long optionalWireSize) { + AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder(); + attributesBuilder.put("sequence-number", seqNo); + attributesBuilder.put("message-size-compressed", optionalWireSize); + span.addEvent("Inbound compressed message", attributesBuilder.build()); + } + + private void recordInboundMessageSize(Span span, int seqNo, long bytes) { + AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder(); + attributesBuilder.put("sequence-number", seqNo); + attributesBuilder.put("message-size", bytes); + span.addEvent("Inbound message received", attributesBuilder.build()); + } + + private String generateErrorStatusDescription(io.grpc.Status status) { + if (status.getDescription() != null) { + return status.getCode() + ": " + status.getDescription(); + } else { + return status.getCode().toString(); + } + } + + private void endSpanWithStatus(Span span, io.grpc.Status status) { + if (status.isOk()) { + span.setStatus(StatusCode.OK); + } else { + span.setStatus(StatusCode.ERROR, generateErrorStatusDescription(status)); + } + span.end(); + } + + /** + * Convert a full method name to a tracing span name. + * + * @param isServer {@code false} if the span is on the client-side, {@code true} if on the + * server-side + * @param fullMethodName the method name as returned by + * {@link MethodDescriptor#getFullMethodName}. + */ + @VisibleForTesting + static String generateTraceSpanName(boolean isServer, String fullMethodName) { + String prefix = isServer ? "Recv" : "Sent"; + return prefix + "." + fullMethodName.replace('/', '.'); + } +} diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java new file mode 100644 index 00000000000..267637e9efb --- /dev/null +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java @@ -0,0 +1,666 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.opentelemetry; + +import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; +import static io.grpc.opentelemetry.OpenTelemetryTracingModule.OTEL_TRACING_SCOPE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.ClientStreamTracer; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptors; +import io.grpc.ServerServiceDefinition; +import io.grpc.ServerStreamTracer; +import io.grpc.Status; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.opentelemetry.OpenTelemetryTracingModule.CallAttemptsTracerFactory; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.testing.GrpcServerRule; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.EventData; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class OpenTelemetryTracingModuleTest { + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + + private static final ClientStreamTracer.StreamInfo STREAM_INFO = + ClientStreamTracer.StreamInfo.newBuilder() + .setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L)).build(); + private static final CallOptions.Key CUSTOM_OPTION = + CallOptions.Key.createWithDefault("option1", "default"); + private static final CallOptions CALL_OPTIONS = + CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue"); + + private static class StringInputStream extends InputStream { + final String string; + + StringInputStream(String string) { + this.string = string; + } + + @Override + public int read() { + // InProcessTransport doesn't actually read bytes from the InputStream. The InputStream is + // passed to the InProcess server and consumed by MARSHALLER.parse(). + throw new UnsupportedOperationException("Should not be called"); + } + } + + private static final MethodDescriptor.Marshaller MARSHALLER = + new MethodDescriptor.Marshaller() { + @Override + public InputStream stream(String value) { + return new StringInputStream(value); + } + + @Override + public String parse(InputStream stream) { + return ((StringInputStream) stream).string; + } + }; + + private final MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNKNOWN) + .setRequestMarshaller(MARSHALLER) + .setResponseMarshaller(MARSHALLER) + .setFullMethodName("package1.service2/method3") + .build(); + + @Rule + public final OpenTelemetryRule openTelemetryRule = OpenTelemetryRule.create(); + @Rule + public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor(); + @Rule + public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); + private Tracer tracerRule; + @Mock + private Tracer mockTracer; + @Mock + TextMapPropagator mockPropagator; + @Mock + private Span mockClientSpan; + @Mock + private Span mockAttemptSpan; + @Mock + private ServerCall.Listener mockServerCallListener; + @Mock + private ClientCall.Listener mockClientCallListener; + @Mock + private SpanBuilder mockSpanBuilder; + @Mock + private OpenTelemetry mockOpenTelemetry; + @Captor + private ArgumentCaptor eventNameCaptor; + @Captor + private ArgumentCaptor attributesCaptor; + @Captor + private ArgumentCaptor statusCaptor; + + @Before + public void setUp() { + tracerRule = openTelemetryRule.getOpenTelemetry().getTracer(OTEL_TRACING_SCOPE_NAME); + when(mockOpenTelemetry.getTracer(OTEL_TRACING_SCOPE_NAME)).thenReturn(mockTracer); + when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(mockPropagator)); + when(mockSpanBuilder.startSpan()).thenReturn(mockAttemptSpan); + when(mockSpanBuilder.setParent(any())).thenReturn(mockSpanBuilder); + when(mockTracer.spanBuilder(any())).thenReturn(mockSpanBuilder); + } + + // Use mock instead of OpenTelemetryRule to verify inOrder and propagator. + @Test + public void clientBasicTracingMocking() { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(mockOpenTelemetry); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(mockClientSpan, method); + Metadata headers = new Metadata(); + ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers); + clientStreamTracer.createPendingStream(); + clientStreamTracer.streamCreated(Attributes.EMPTY, headers); + + verify(mockTracer).spanBuilder(eq("Attempt.package1.service2.method3")); + verify(mockPropagator).inject(any(), eq(headers), eq(MetadataSetter.getInstance())); + verify(mockClientSpan, never()).end(); + verify(mockAttemptSpan, never()).end(); + + clientStreamTracer.outboundMessage(0); + clientStreamTracer.outboundMessageSent(0, 882, -1); + clientStreamTracer.inboundMessage(0); + clientStreamTracer.outboundMessage(1); + clientStreamTracer.outboundMessageSent(1, -1, 27); + clientStreamTracer.inboundMessageRead(0, 255, 90); + + clientStreamTracer.streamClosed(Status.OK); + callTracer.callEnded(Status.OK); + + InOrder inOrder = inOrder(mockClientSpan, mockAttemptSpan); + inOrder.verify(mockAttemptSpan) + .setAttribute("previous-rpc-attempts", 0); + inOrder.verify(mockAttemptSpan) + .setAttribute("transparent-retry", false); + inOrder.verify(mockClientSpan).addEvent("Delayed name resolution complete"); + inOrder.verify(mockAttemptSpan).addEvent("Delayed LB pick complete"); + inOrder.verify(mockAttemptSpan, times(3)).addEvent( + eventNameCaptor.capture(), attributesCaptor.capture() + ); + List events = eventNameCaptor.getAllValues(); + List attributes = attributesCaptor.getAllValues(); + assertEquals( + "Outbound message sent" , + events.get(0)); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size-compressed", 882) + .build(), + attributes.get(0)); + + assertEquals( + "Outbound message sent" , + events.get(1)); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 1) + .put("message-size", 27) + .build(), + attributes.get(1)); + + assertEquals( + "Inbound compressed message" , + events.get(2)); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size-compressed", 255) + .build(), + attributes.get(2)); + + inOrder.verify(mockAttemptSpan).setStatus(StatusCode.OK); + inOrder.verify(mockAttemptSpan).end(); + inOrder.verify(mockClientSpan).setStatus(StatusCode.OK); + inOrder.verify(mockClientSpan).end(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void clientBasicTracingRule() { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + Span clientSpan = tracerRule.spanBuilder("test-client-span").startSpan(); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(clientSpan, method); + Metadata headers = new Metadata(); + ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers); + clientStreamTracer.createPendingStream(); + clientStreamTracer.streamCreated(Attributes.EMPTY, headers); + clientStreamTracer.outboundMessage(0); + clientStreamTracer.outboundMessageSent(0, 882, -1); + clientStreamTracer.inboundMessage(0); + clientStreamTracer.outboundMessage(1); + clientStreamTracer.outboundMessageSent(1, -1, 27); + clientStreamTracer.inboundMessageRead(0, 255, -1); + clientStreamTracer.inboundUncompressedSize(288); + clientStreamTracer.inboundMessageRead(1, 128, 128); + clientStreamTracer.inboundMessage(1); + clientStreamTracer.inboundUncompressedSize(128); + + clientStreamTracer.streamClosed(Status.OK); + callTracer.callEnded(Status.OK); + + List spans = openTelemetryRule.getSpans(); + assertEquals(spans.size(), 2); + SpanData attemptSpanData = spans.get(0); + SpanData clientSpanData = spans.get(1); + assertEquals(attemptSpanData.getName(), "Attempt.package1.service2.method3"); + assertEquals(clientSpanData.getName(), "test-client-span"); + assertEquals(headers.keys(), ImmutableSet.of("traceparent")); + String spanContext = headers.get( + Metadata.Key.of("traceparent", Metadata.ASCII_STRING_MARSHALLER)); + assertEquals(spanContext.substring(3, 3 + TraceId.getLength()), + spans.get(1).getSpanContext().getTraceId()); + + // parent(client) span data + List clientSpanEvents = clientSpanData.getEvents(); + assertEquals(clientSpanEvents.size(), 3); + assertEquals( + "Delayed name resolution complete", + clientSpanEvents.get(0).getName()); + assertTrue(clientSpanEvents.get(0).getAttributes().isEmpty()); + + assertEquals( + "Inbound message received" , + clientSpanEvents.get(1).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size", 288) + .build(), + clientSpanEvents.get(1).getAttributes()); + + assertEquals( + "Inbound message received" , + clientSpanEvents.get(2).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 1) + .put("message-size", 128) + .build(), + clientSpanEvents.get(2).getAttributes()); + assertEquals(clientSpanData.hasEnded(), true); + + // child(attempt) span data + List attemptSpanEvents = attemptSpanData.getEvents(); + assertEquals(clientSpanEvents.size(), 3); + assertEquals( + "Delayed LB pick complete", + attemptSpanEvents.get(0).getName()); + assertTrue(clientSpanEvents.get(0).getAttributes().isEmpty()); + + assertEquals( + "Outbound message sent" , + attemptSpanEvents.get(1).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size-compressed", 882) + .build(), + attemptSpanEvents.get(1).getAttributes()); + + assertEquals( + "Outbound message sent" , + attemptSpanEvents.get(2).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 1) + .put("message-size", 27) + .build(), + attemptSpanEvents.get(2).getAttributes()); + + assertEquals( + "Inbound compressed message" , + attemptSpanEvents.get(3).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size-compressed", 255) + .build(), + attemptSpanEvents.get(3).getAttributes()); + + assertEquals(attemptSpanData.hasEnded(), true); + } + + @Test + public void clientInterceptor() { + testClientInterceptors(false); + } + + @Test + public void clientInterceptorNonDefaultOtelContext() { + testClientInterceptors(true); + } + + private void testClientInterceptors(boolean nonDefaultOtelContext) { + final AtomicReference capturedMetadata = new AtomicReference<>(); + grpcServerRule.getServiceRegistry().addService( + ServerServiceDefinition.builder("package1.service2").addMethod( + method, new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, Metadata headers) { + capturedMetadata.set(headers); + call.sendHeaders(new Metadata()); + call.sendMessage("Hello"); + call.close( + Status.PERMISSION_DENIED.withDescription("No you don't"), new Metadata()); + return mockServerCallListener; + } + }).build()); + + final AtomicReference capturedCallOptions = new AtomicReference<>(); + ClientInterceptor callOptionsCaptureInterceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + capturedCallOptions.set(callOptions); + return next.newCall(method, callOptions); + } + }; + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + Channel interceptedChannel = + ClientInterceptors.intercept( + grpcServerRule.getChannel(), callOptionsCaptureInterceptor, + tracingModule.getClientInterceptor()); + Span parentSpan = tracerRule.spanBuilder("test-parent-span").startSpan(); + ClientCall call; + + if (nonDefaultOtelContext) { + try (Scope scope = io.opentelemetry.context.Context.current().with(parentSpan) + .makeCurrent()) { + call = interceptedChannel.newCall(method, CALL_OPTIONS); + } + } else { + call = interceptedChannel.newCall(method, CALL_OPTIONS); + } + assertEquals("customvalue", capturedCallOptions.get().getOption(CUSTOM_OPTION)); + assertEquals(1, capturedCallOptions.get().getStreamTracerFactories().size()); + assertTrue( + capturedCallOptions.get().getStreamTracerFactories().get(0) + instanceof CallAttemptsTracerFactory); + + // Make the call + Metadata headers = new Metadata(); + call.start(mockClientCallListener, headers); + + // End the call + call.halfClose(); + call.request(1); + parentSpan.end(); + + verify(mockClientCallListener).onClose(statusCaptor.capture(), any(Metadata.class)); + Status status = statusCaptor.getValue(); + assertEquals(Status.Code.PERMISSION_DENIED, status.getCode()); + assertEquals("No you don't", status.getDescription()); + + List spans = openTelemetryRule.getSpans(); + assertEquals(spans.size(), 3); + + SpanData clientSpan = spans.get(1); + SpanData attemptSpan = spans.get(0); + if (nonDefaultOtelContext) { + assertEquals(clientSpan.getParentSpanContext(), parentSpan.getSpanContext()); + } else { + assertEquals(clientSpan.getParentSpanContext(), + Span.fromContext(Context.root()).getSpanContext()); + } + String spanContext = capturedMetadata.get().get( + Metadata.Key.of("traceparent", Metadata.ASCII_STRING_MARSHALLER)); + // W3C format: 00--- + assertEquals(spanContext.substring(3, 3 + TraceId.getLength()), + attemptSpan.getSpanContext().getTraceId()); + assertEquals(spanContext.substring(3 + TraceId.getLength() + 1, + 3 + TraceId.getLength() + 1 + SpanId.getLength()), + attemptSpan.getSpanContext().getSpanId()); + + assertEquals(attemptSpan.getParentSpanContext(), clientSpan.getSpanContext()); + assertTrue(clientSpan.hasEnded()); + assertEquals(clientSpan.getStatus().getStatusCode(), StatusCode.ERROR); + assertEquals(clientSpan.getStatus().getDescription(), "PERMISSION_DENIED: No you don't"); + assertTrue(attemptSpan.hasEnded()); + assertTrue(attemptSpan.hasEnded()); + assertEquals(attemptSpan.getStatus().getStatusCode(), StatusCode.ERROR); + assertEquals(attemptSpan.getStatus().getDescription(), "PERMISSION_DENIED: No you don't"); + } + + @Test + public void clientStreamNeverCreatedStillRecordTracing() { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(mockOpenTelemetry); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(mockClientSpan, method); + + callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds")); + verify(mockClientSpan).end(); + verify(mockClientSpan).setStatus(eq(StatusCode.ERROR), + eq("DEADLINE_EXCEEDED: 3 seconds")); + verifyNoMoreInteractions(mockClientSpan); + } + + @Test + public void serverBasicTracingNoHeaders() { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + ServerStreamTracer.Factory tracerFactory = tracingModule.getServerTracerFactory(); + ServerStreamTracer serverStreamTracer = + tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); + assertSame(Span.fromContext(Context.current()), Span.getInvalid()); + + serverStreamTracer.outboundMessage(0); + serverStreamTracer.outboundMessageSent(0, 882, 998); + serverStreamTracer.inboundMessage(0); + serverStreamTracer.outboundMessage(1); + serverStreamTracer.outboundMessageSent(1, -1, 27); + serverStreamTracer.inboundMessageRead(0, 90, -1); + serverStreamTracer.inboundUncompressedSize(255); + + serverStreamTracer.streamClosed(Status.CANCELLED); + + List spans = openTelemetryRule.getSpans(); + assertEquals(spans.size(), 1); + assertEquals(spans.get(0).getName(), "Recv.package1.service2.method3"); + assertEquals(spans.get(0).getParentSpanContext(), Span.getInvalid().getSpanContext()); + + List events = spans.get(0).getEvents(); + assertEquals(events.size(), 4); + assertEquals( + "Outbound message sent" , + events.get(0).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size-compressed", 882) + .put("message-size", 998) + .build(), + events.get(0).getAttributes()); + + assertEquals( + "Outbound message sent" , + events.get(1).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 1) + .put("message-size", 27) + .build(), + events.get(1).getAttributes()); + + assertEquals( + "Inbound compressed message" , + events.get(2).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size-compressed", 90) + .build(), + events.get(2).getAttributes()); + + assertEquals( + "Inbound message received" , + events.get(3).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size", 255) + .build(), + events.get(3).getAttributes()); + + assertEquals(spans.get(0).hasEnded(), true); + } + + @Test + public void grpcTraceBinPropagator() { + when(mockOpenTelemetry.getPropagators()).thenReturn( + ContextPropagators.create(GrpcTraceBinContextPropagator.defaultInstance())); + ArgumentCaptor contextArgumentCaptor = ArgumentCaptor.forClass(Context.class); + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(mockOpenTelemetry); + Span testClientSpan = tracerRule.spanBuilder("test-client-span").startSpan(); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(testClientSpan, method); + Span testAttemptSpan = tracerRule.spanBuilder("test-attempt-span").startSpan(); + when(mockSpanBuilder.startSpan()).thenReturn(testAttemptSpan); + + Metadata headers = new Metadata(); + ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers); + clientStreamTracer.streamCreated(Attributes.EMPTY, headers); + clientStreamTracer.streamClosed(Status.CANCELLED); + + Metadata.Key key = Metadata.Key.of( + GrpcTraceBinContextPropagator.GRPC_TRACE_BIN_HEADER, Metadata.BINARY_BYTE_MARSHALLER); + assertTrue(Arrays.equals(BinaryFormat.getInstance().toBytes(testAttemptSpan.getSpanContext()), + headers.get(key) + )); + verify(mockSpanBuilder).setParent(contextArgumentCaptor.capture()); + assertEquals(testClientSpan, Span.fromContext(contextArgumentCaptor.getValue())); + + Span serverSpan = tracerRule.spanBuilder("test-server-span").startSpan(); + when(mockSpanBuilder.startSpan()).thenReturn(serverSpan); + ServerStreamTracer.Factory tracerFactory = tracingModule.getServerTracerFactory(); + ServerStreamTracer serverStreamTracer = + tracerFactory.newServerStreamTracer(method.getFullMethodName(), headers); + serverStreamTracer.streamClosed(Status.CANCELLED); + + verify(mockSpanBuilder, times(2)) + .setParent(contextArgumentCaptor.capture()); + assertEquals(testAttemptSpan.getSpanContext(), + Span.fromContext(contextArgumentCaptor.getValue()).getSpanContext()); + } + + @Test + public void testServerParentSpanPropagation() throws Exception { + final AtomicReference applicationSpan = new AtomicReference<>(); + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + ServerServiceDefinition serviceDefinition = + ServerServiceDefinition.builder("package1.service2").addMethod( + method, new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, Metadata headers) { + applicationSpan.set(Span.fromContext(Context.current())); + call.sendHeaders(new Metadata()); + call.sendMessage("Hello"); + call.close( + Status.PERMISSION_DENIED.withDescription("No you don't"), new Metadata()); + return mockServerCallListener; + } + }).build(); + + Server server = InProcessServerBuilder.forName("test-server-span") + .addService( + ServerInterceptors.intercept(serviceDefinition, + tracingModule.getServerSpanPropagationInterceptor())) + .addStreamTracerFactory(tracingModule.getServerTracerFactory()) + .directExecutor().build().start(); + grpcCleanupRule.register(server); + + ManagedChannel channel = InProcessChannelBuilder.forName("test-server-span") + .directExecutor().build(); + grpcCleanupRule.register(channel); + + Span parentSpan = tracerRule.spanBuilder("test-parent-span").startSpan(); + try (Scope scope = Context.current().with(parentSpan).makeCurrent()) { + Channel interceptedChannel = + ClientInterceptors.intercept( + channel, tracingModule.getClientInterceptor()); + ClientCall call = interceptedChannel.newCall(method, CALL_OPTIONS); + Metadata headers = new Metadata(); + call.start(mockClientCallListener, headers); + + // End the call + call.halfClose(); + call.request(1); + parentSpan.end(); + } + + verify(mockClientCallListener).onClose(statusCaptor.capture(), any(Metadata.class)); + Status rpcStatus = statusCaptor.getValue(); + assertEquals(rpcStatus.getCode(), Status.Code.PERMISSION_DENIED); + assertEquals(rpcStatus.getDescription(), "No you don't"); + assertEquals(applicationSpan.get().getSpanContext().getTraceId(), + parentSpan.getSpanContext().getTraceId()); + + List spans = openTelemetryRule.getSpans(); + assertEquals(spans.size(), 4); + SpanData clientSpan = spans.get(2); + SpanData attemptSpan = spans.get(1); + + assertEquals(clientSpan.getName(), "Sent.package1.service2.method3"); + assertTrue(clientSpan.hasEnded()); + assertEquals(clientSpan.getStatus().getStatusCode(), StatusCode.ERROR); + assertEquals(clientSpan.getStatus().getDescription(), "PERMISSION_DENIED: No you don't"); + + assertEquals(attemptSpan.getName(), "Attempt.package1.service2.method3"); + assertTrue(attemptSpan.hasEnded()); + assertEquals(attemptSpan.getStatus().getStatusCode(), StatusCode.ERROR); + assertEquals(attemptSpan.getStatus().getDescription(), "PERMISSION_DENIED: No you don't"); + + SpanData serverSpan = spans.get(0); + assertEquals(serverSpan.getName(), "Recv.package1.service2.method3"); + assertTrue(serverSpan.hasEnded()); + assertEquals(serverSpan.getStatus().getStatusCode(), StatusCode.ERROR); + assertEquals(serverSpan.getStatus().getDescription(), "PERMISSION_DENIED: No you don't"); + } + + @Test + public void generateTraceSpanName() { + assertEquals( + "Sent.io.grpc.Foo", OpenTelemetryTracingModule.generateTraceSpanName( + false, "io.grpc/Foo")); + assertEquals( + "Recv.io.grpc.Bar", OpenTelemetryTracingModule.generateTraceSpanName( + true, "io.grpc/Bar")); + } +} From d8c947a872bb04de5f46d1b37b1024196cdd2690 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Wed, 4 Sep 2024 14:20:00 -0700 Subject: [PATCH 2/7] fix server listener set span in context --- interop-testing/build.gradle | 1 + .../OpenTelemetryContextFilterTest.java | 131 ++++++++++++++++++ .../grpc/opentelemetry/GrpcOpenTelemetry.java | 6 + .../OpenTelemetryTracingModule.java | 53 ++++++- 4 files changed, 190 insertions(+), 1 deletion(-) create mode 100644 interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryContextFilterTest.java diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index a19efb00155..a85aec97adf 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -13,6 +13,7 @@ dependencies { implementation project(path: ':grpc-alts', configuration: 'shadow'), project(':grpc-auth'), project(':grpc-census'), + project(':grpc-opentelemetry'), project(':grpc-gcp-csm-observability'), project(':grpc-netty'), project(':grpc-okhttp'), diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryContextFilterTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryContextFilterTest.java new file mode 100644 index 00000000000..6ff8966e602 --- /dev/null +++ b/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryContextFilterTest.java @@ -0,0 +1,131 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import static org.junit.Assert.assertEquals; + +import io.grpc.ForwardingServerCallListener; +import io.grpc.InsecureServerCredentials; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.ServerBuilder; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.opentelemetry.GrpcOpenTelemetry; +import io.grpc.testing.integration.Messages.SimpleRequest; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class OpenTelemetryContextFilterTest extends AbstractInteropTest { + private final OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder() + .setTracerProvider(SdkTracerProvider.builder().build()) + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .build(); + private final Tracer tracer = openTelemetrySdk.getTracer("grpc-java"); + private final GrpcOpenTelemetry grpcOpenTelemetry = GrpcOpenTelemetry.newBuilder() + .sdk(openTelemetrySdk).build(); + private final AtomicReference applicationSpan = new AtomicReference<>(); + + @Override + protected ServerBuilder getServerBuilder() { + NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create()) + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); + builder.intercept(new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall(ServerCall call, + Metadata headers, ServerCallHandler next) { + ServerCall.Listener listener = next.startCall(call, headers); + return new ForwardingServerCallListener() { + @Override + protected ServerCall.Listener delegate() { + return listener; + } + + @Override + public void onMessage(ReqT request) { + applicationSpan.set(tracer.spanBuilder("InteropTest.Application.Span").startSpan()); + delegate().onMessage(request); + } + + @Override + public void onHalfClose() { + if (applicationSpan.get() != null) { + applicationSpan.get().end(); + } + delegate().onHalfClose(); + } + + @Override + public void onCancel() { + if (applicationSpan.get() != null) { + applicationSpan.get().end(); + } + delegate().onCancel(); + } + + @Override + public void onComplete() { + if (applicationSpan.get() != null) { + applicationSpan.get().end(); + } + delegate().onComplete(); + } + }; + } + }); + grpcOpenTelemetry.configureServerBuilder(builder); + return builder; + } + + @Override + protected boolean metricsExpected() { + return false; + } + + @Override + protected ManagedChannelBuilder createChannelBuilder() { + NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress()) + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) + .usePlaintext(); + grpcOpenTelemetry.configureChannelBuilder(builder); + return builder; + } + + @Test + public void otelSpanContextPropagation() { + Span parentSpan = tracer.spanBuilder("Test.interopTest").startSpan(); + try (Scope scope = Context.current().with(parentSpan).makeCurrent()) { + blockingStub.unaryCall(SimpleRequest.getDefaultInstance()); + } + assertEquals(parentSpan.getSpanContext().getTraceId(), + applicationSpan.get().getSpanContext().getTraceId()); + } +} diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 03183ef4920..96fb8062c93 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -68,6 +68,7 @@ public Stopwatch get() { private final boolean disableDefault; private final OpenTelemetryMetricsResource resource; private final OpenTelemetryMetricsModule openTelemetryMetricsModule; + private final OpenTelemetryTracingModule openTelemetryTracingModule; private final List optionalLabels; private final MetricSink sink; @@ -88,6 +89,7 @@ private GrpcOpenTelemetry(Builder builder) { this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels); this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule( STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins); + this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk); this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels); } @@ -152,6 +154,7 @@ public void configureChannelBuilder(ManagedChannelBuilder builder) { InternalManagedChannelBuilder.addMetricSink(builder, sink); InternalManagedChannelBuilder.interceptWithTarget( builder, openTelemetryMetricsModule::getClientInterceptor); + builder.intercept(openTelemetryTracingModule.getClientInterceptor()); } /** @@ -161,6 +164,9 @@ public void configureChannelBuilder(ManagedChannelBuilder builder) { */ public void configureServerBuilder(ServerBuilder serverBuilder) { serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory()); + serverBuilder.addStreamTracerFactory( + openTelemetryTracingModule.getServerTracerFactory()); + serverBuilder.intercept(openTelemetryTracingModule.getServerSpanPropagationInterceptor()); } @VisibleForTesting diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java index 8d8b898357a..b628d371000 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java @@ -28,6 +28,7 @@ import io.grpc.ClientStreamTracer; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.ForwardingServerCallListener; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; @@ -315,9 +316,59 @@ final class TracingServerSpanPropagationInterceptor implements ServerInterceptor public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { Span span = otelSpan.get(io.grpc.Context.current()); - try (Scope scope = Context.current().with(span).makeCurrent()) { + if (span == null) { + logger.log(Level.FINE, "Server span not found. ServerTracerFactory for server " + + "tracing must be set."); return next.startCall(call, headers); } + try (Scope scope = Context.current().with(span).makeCurrent()) { + return new AttachSpanServerCallListener<>(next.startCall(call, headers), span); + } + } + } + + private static class AttachSpanServerCallListener extends + ForwardingServerCallListener.SimpleForwardingServerCallListener { + private final Span span; + + protected AttachSpanServerCallListener(ServerCall.Listener delegate, Span span) { + super(delegate); + this.span = checkNotNull(span, "span"); + } + + @Override + public void onMessage(ReqT message) { + try (Scope scope = Context.current().with(span).makeCurrent()) { + delegate().onMessage(message); + } + } + + @Override + public void onHalfClose() { + try (Scope scope = Context.current().with(span).makeCurrent()) { + delegate().onHalfClose(); + } + } + + @Override + public void onCancel() { + try (Scope scope = Context.current().with(span).makeCurrent()) { + delegate().onCancel(); + } + } + + @Override + public void onComplete() { + try (Scope scope = Context.current().with(span).makeCurrent()) { + delegate().onComplete(); + } + } + + @Override + public void onReady() { + try (Scope scope = Context.current().with(span).makeCurrent()) { + delegate().onReady(); + } } } From 5a8cc176a7e57d666c5f3636405d4cc5f522848c Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 10 Sep 2024 11:07:35 -0700 Subject: [PATCH 3/7] split interop test, add spanPropagationServerInterceptor test --- interop-testing/build.gradle | 1 - .../OpenTelemetryContextFilterTest.java | 131 ------------------ opentelemetry/build.gradle | 7 +- .../grpc/opentelemetry/GrpcOpenTelemetry.java | 17 ++- .../InternalGrpcOpenTelemetry.java | 13 ++ .../OpenTelemetryTracingModule.java | 38 +++-- .../opentelemetry/GrpcOpenTelemetryTest.java | 24 ++++ .../OpenTelemetryTracingModuleTest.java | 97 ++++++++++++- 8 files changed, 176 insertions(+), 152 deletions(-) delete mode 100644 interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryContextFilterTest.java diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle index a85aec97adf..a19efb00155 100644 --- a/interop-testing/build.gradle +++ b/interop-testing/build.gradle @@ -13,7 +13,6 @@ dependencies { implementation project(path: ':grpc-alts', configuration: 'shadow'), project(':grpc-auth'), project(':grpc-census'), - project(':grpc-opentelemetry'), project(':grpc-gcp-csm-observability'), project(':grpc-netty'), project(':grpc-okhttp'), diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryContextFilterTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryContextFilterTest.java deleted file mode 100644 index 6ff8966e602..00000000000 --- a/interop-testing/src/test/java/io/grpc/testing/integration/OpenTelemetryContextFilterTest.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright 2024 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.testing.integration; - -import static org.junit.Assert.assertEquals; - -import io.grpc.ForwardingServerCallListener; -import io.grpc.InsecureServerCredentials; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Metadata; -import io.grpc.ServerBuilder; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; -import io.grpc.netty.NettyChannelBuilder; -import io.grpc.netty.NettyServerBuilder; -import io.grpc.opentelemetry.GrpcOpenTelemetry; -import io.grpc.testing.integration.Messages.SimpleRequest; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.context.propagation.ContextPropagators; -import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.opentelemetry.sdk.trace.SdkTracerProvider; -import java.util.concurrent.atomic.AtomicReference; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class OpenTelemetryContextFilterTest extends AbstractInteropTest { - private final OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder() - .setTracerProvider(SdkTracerProvider.builder().build()) - .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) - .build(); - private final Tracer tracer = openTelemetrySdk.getTracer("grpc-java"); - private final GrpcOpenTelemetry grpcOpenTelemetry = GrpcOpenTelemetry.newBuilder() - .sdk(openTelemetrySdk).build(); - private final AtomicReference applicationSpan = new AtomicReference<>(); - - @Override - protected ServerBuilder getServerBuilder() { - NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create()) - .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); - builder.intercept(new ServerInterceptor() { - @Override - public ServerCall.Listener interceptCall(ServerCall call, - Metadata headers, ServerCallHandler next) { - ServerCall.Listener listener = next.startCall(call, headers); - return new ForwardingServerCallListener() { - @Override - protected ServerCall.Listener delegate() { - return listener; - } - - @Override - public void onMessage(ReqT request) { - applicationSpan.set(tracer.spanBuilder("InteropTest.Application.Span").startSpan()); - delegate().onMessage(request); - } - - @Override - public void onHalfClose() { - if (applicationSpan.get() != null) { - applicationSpan.get().end(); - } - delegate().onHalfClose(); - } - - @Override - public void onCancel() { - if (applicationSpan.get() != null) { - applicationSpan.get().end(); - } - delegate().onCancel(); - } - - @Override - public void onComplete() { - if (applicationSpan.get() != null) { - applicationSpan.get().end(); - } - delegate().onComplete(); - } - }; - } - }); - grpcOpenTelemetry.configureServerBuilder(builder); - return builder; - } - - @Override - protected boolean metricsExpected() { - return false; - } - - @Override - protected ManagedChannelBuilder createChannelBuilder() { - NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress()) - .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) - .usePlaintext(); - grpcOpenTelemetry.configureChannelBuilder(builder); - return builder; - } - - @Test - public void otelSpanContextPropagation() { - Span parentSpan = tracer.spanBuilder("Test.interopTest").startSpan(); - try (Scope scope = Context.current().with(parentSpan).makeCurrent()) { - blockingStub.unaryCall(SimpleRequest.getDefaultInstance()); - } - assertEquals(parentSpan.getSpanContext().getTraceId(), - applicationSpan.get().getSpanContext().getTraceId()); - } -} diff --git a/opentelemetry/build.gradle b/opentelemetry/build.gradle index 338ac0f6870..b554f9f9c98 100644 --- a/opentelemetry/build.gradle +++ b/opentelemetry/build.gradle @@ -7,6 +7,8 @@ plugins { description = 'gRPC: OpenTelemetry' +test.testLogging.showStandardStreams(true) + dependencies { api project(':grpc-api') implementation libraries.guava, @@ -17,8 +19,11 @@ dependencies { testImplementation project(':grpc-testing'), project(':grpc-inprocess'), testFixtures(project(':grpc-core')), + testFixtures(project(':grpc-api')), libraries.opentelemetry.sdk.testing, - libraries.assertj.core // opentelemetry.sdk.testing uses compileOnly for assertj + libraries.assertj.core + testImplementation project(path: ':grpc-api') + // opentelemetry.sdk.testing uses compileOnly for assertj annotationProcessor libraries.auto.value diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 96fb8062c93..723f16a1acb 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -37,6 +37,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.api.trace.Tracer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -89,8 +90,8 @@ private GrpcOpenTelemetry(Builder builder) { this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels); this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule( STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins); - this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk); this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels); + this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk); } @VisibleForTesting @@ -127,6 +128,11 @@ MetricSink getSink() { return sink; } + @VisibleForTesting + Tracer getTracer() { + return this.openTelemetryTracingModule.getTracer(); + } + /** * Registers GrpcOpenTelemetry globally, applying its configuration to all subsequently created * gRPC channels and servers. @@ -154,9 +160,18 @@ public void configureChannelBuilder(ManagedChannelBuilder builder) { InternalManagedChannelBuilder.addMetricSink(builder, sink); InternalManagedChannelBuilder.interceptWithTarget( builder, openTelemetryMetricsModule::getClientInterceptor); + } + + void configTracingForChannel(ManagedChannelBuilder builder) { builder.intercept(openTelemetryTracingModule.getClientInterceptor()); } + void configureTracingForServer(ServerBuilder serverBuilder) { + serverBuilder.addStreamTracerFactory( + openTelemetryTracingModule.getServerTracerFactory()); + serverBuilder.intercept(openTelemetryTracingModule.getServerSpanPropagationInterceptor()); + } + /** * Configures the given {@link ServerBuilder} with OpenTelemetry metrics instrumentation. * diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/InternalGrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/InternalGrpcOpenTelemetry.java index 5d5543dddda..6424c471598 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/InternalGrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/InternalGrpcOpenTelemetry.java @@ -17,6 +17,8 @@ package io.grpc.opentelemetry; import io.grpc.Internal; +import io.grpc.ManagedChannelBuilder; +import io.grpc.ServerBuilder; /** * Internal accessor for {@link GrpcOpenTelemetry}. @@ -29,4 +31,15 @@ public static void builderPlugin( GrpcOpenTelemetry.Builder builder, InternalOpenTelemetryPlugin plugin) { builder.plugin(plugin); } + + + public static void configTracingForChannelExperimental( + ManagedChannelBuilder builder, GrpcOpenTelemetry grpcOpenTelemetry) { + grpcOpenTelemetry.configTracingForChannel(builder); + } + + public static void configChannelBuilderExperimental( + ServerBuilder builder, GrpcOpenTelemetry grpcOpenTelemetry) { + grpcOpenTelemetry.configureTracingForServer(builder); + } } diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java index b628d371000..6f2d3268ae0 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; +import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION; import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; @@ -35,6 +36,7 @@ import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.ServerStreamTracer; +import io.grpc.opentelemetry.internal.OpenTelemetryConstants; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.trace.Span; @@ -55,8 +57,7 @@ final class OpenTelemetryTracingModule { private static final Logger logger = Logger.getLogger(OpenTelemetryTracingModule.class.getName()); @VisibleForTesting - static final String OTEL_TRACING_SCOPE_NAME = "grpc-java"; - private final io.grpc.Context.Key otelSpan = io.grpc.Context.key("opentelemetry-span-key"); + final io.grpc.Context.Key otelSpan = io.grpc.Context.key("opentelemetry-span-key"); @Nullable private static final AtomicIntegerFieldUpdater callEndedUpdater; @Nullable @@ -94,10 +95,18 @@ final class OpenTelemetryTracingModule { private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory(); OpenTelemetryTracingModule(OpenTelemetry openTelemetry) { - this.otelTracer = checkNotNull(openTelemetry.getTracer(OTEL_TRACING_SCOPE_NAME), "otelTracer"); + this.otelTracer = checkNotNull(openTelemetry.getTracerProvider(), "tracerProvider") + .tracerBuilder(OpenTelemetryConstants.INSTRUMENTATION_SCOPE) + .setInstrumentationVersion(IMPLEMENTATION_VERSION) + .build(); this.contextPropagators = checkNotNull(openTelemetry.getPropagators(), "contextPropagators"); } + @VisibleForTesting + Tracer getTracer() { + return otelTracer; + } + /** * Creates a {@link CallAttemptsTracerFactory} for a new call. */ @@ -321,52 +330,53 @@ public ServerCall.Listener interceptCall(ServerCall(next.startCall(call, headers), span); + Context serverCallContext = Context.current().with(span); + try (Scope scope = serverCallContext.makeCurrent()) { + return new ContextServerCallListener<>(next.startCall(call, headers), serverCallContext); } } } - private static class AttachSpanServerCallListener extends + private static class ContextServerCallListener extends ForwardingServerCallListener.SimpleForwardingServerCallListener { - private final Span span; + private final Context context; - protected AttachSpanServerCallListener(ServerCall.Listener delegate, Span span) { + protected ContextServerCallListener(ServerCall.Listener delegate, Context context) { super(delegate); - this.span = checkNotNull(span, "span"); + this.context = checkNotNull(context, "context"); } @Override public void onMessage(ReqT message) { - try (Scope scope = Context.current().with(span).makeCurrent()) { + try (Scope scope = context.makeCurrent()) { delegate().onMessage(message); } } @Override public void onHalfClose() { - try (Scope scope = Context.current().with(span).makeCurrent()) { + try (Scope scope = context.makeCurrent()) { delegate().onHalfClose(); } } @Override public void onCancel() { - try (Scope scope = Context.current().with(span).makeCurrent()) { + try (Scope scope = context.makeCurrent()) { delegate().onCancel(); } } @Override public void onComplete() { - try (Scope scope = Context.current().with(span).makeCurrent()) { + try (Scope scope = context.makeCurrent()) { delegate().onComplete(); } } @Override public void onReady() { - try (Scope scope = Context.current().with(span).makeCurrent()) { + try (Scope scope = context.makeCurrent()) { delegate().onReady(); } } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java index e4a0fa46e8b..a070d88ec21 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java @@ -25,6 +25,7 @@ import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.opentelemetry.sdk.trace.SdkTracerProvider; import java.util.Arrays; import org.junit.Test; import org.junit.runner.RunWith; @@ -35,6 +36,7 @@ public class GrpcOpenTelemetryTest { private final InMemoryMetricReader inMemoryMetricReader = InMemoryMetricReader.create(); private final SdkMeterProvider meterProvider = SdkMeterProvider.builder().registerMetricReader(inMemoryMetricReader).build(); + private final SdkTracerProvider tracerProvider = SdkTracerProvider.builder().build(); private final OpenTelemetry noopOpenTelemetry = OpenTelemetry.noop(); @Test @@ -56,6 +58,21 @@ public void build() { assertThat(openTelemetryModule.getOptionalLabels()).isEqualTo(ImmutableList.of("version")); } + @Test + public void buildTracer() { + OpenTelemetrySdk sdk = + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build(); + + GrpcOpenTelemetry openTelemetryModule = GrpcOpenTelemetry.newBuilder() + .sdk(sdk).build(); + + assertThat(openTelemetryModule.getOpenTelemetryInstance()).isSameInstanceAs(sdk); + assertThat(openTelemetryModule.getTracer()).isSameInstanceAs( + tracerProvider.tracerBuilder("grpc-java") + .setInstrumentationVersion(GrpcUtil.IMPLEMENTATION_VERSION) + .build()); + } + @Test public void builderDefaults() { GrpcOpenTelemetry module = GrpcOpenTelemetry.newBuilder().build(); @@ -73,6 +90,13 @@ public void builderDefaults() { assertThat(module.getEnableMetrics()).isEmpty(); assertThat(module.getOptionalLabels()).isEmpty(); assertThat(module.getSink()).isInstanceOf(MetricSink.class); + + assertThat(module.getTracer()).isSameInstanceAs(noopOpenTelemetry + .getTracerProvider() + .tracerBuilder("grpc-java") + .setInstrumentationVersion(GrpcUtil.IMPLEMENTATION_VERSION) + .build() + ); } @Test diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java index 267637e9efb..a0c0674f309 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java @@ -17,13 +17,14 @@ package io.grpc.opentelemetry; import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; -import static io.grpc.opentelemetry.OpenTelemetryTracingModule.OTEL_TRACING_SCOPE_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -41,9 +42,11 @@ import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.NoopServerCall; import io.grpc.Server; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; import io.grpc.ServerServiceDefinition; import io.grpc.ServerStreamTracer; @@ -51,6 +54,7 @@ import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.opentelemetry.OpenTelemetryTracingModule.CallAttemptsTracerFactory; +import io.grpc.opentelemetry.internal.OpenTelemetryConstants; import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.GrpcServerRule; import io.opentelemetry.api.OpenTelemetry; @@ -60,6 +64,8 @@ import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.TraceId; import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.TracerBuilder; +import io.opentelemetry.api.trace.TracerProvider; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.ContextPropagators; @@ -164,8 +170,15 @@ public String parse(InputStream stream) { @Before public void setUp() { - tracerRule = openTelemetryRule.getOpenTelemetry().getTracer(OTEL_TRACING_SCOPE_NAME); - when(mockOpenTelemetry.getTracer(OTEL_TRACING_SCOPE_NAME)).thenReturn(mockTracer); + tracerRule = openTelemetryRule.getOpenTelemetry().getTracer( + OpenTelemetryConstants.INSTRUMENTATION_SCOPE); + TracerProvider mockTracerProvider = mock(TracerProvider.class); + when(mockOpenTelemetry.getTracerProvider()).thenReturn(mockTracerProvider); + TracerBuilder mockTracerBuilder = mock(TracerBuilder.class); + when(mockTracerProvider.tracerBuilder(OpenTelemetryConstants.INSTRUMENTATION_SCOPE)) + .thenReturn(mockTracerBuilder); + when(mockTracerBuilder.setInstrumentationVersion(any())).thenReturn(mockTracerBuilder); + when(mockTracerBuilder.build()).thenReturn(mockTracer); when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(mockPropagator)); when(mockSpanBuilder.startSpan()).thenReturn(mockAttemptSpan); when(mockSpanBuilder.setParent(any())).thenReturn(mockSpanBuilder); @@ -459,7 +472,8 @@ public ClientCall interceptCall( @Test public void clientStreamNeverCreatedStillRecordTracing() { - OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(mockOpenTelemetry); + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); CallAttemptsTracerFactory callTracer = tracingModule.newClientCallTracer(mockClientSpan, method); @@ -654,6 +668,81 @@ public ServerCall.Listener startCall( assertEquals(serverSpan.getStatus().getDescription(), "PERMISSION_DENIED: No you don't"); } + @Test + public void serverSpanPropagationInterceptor() throws Exception { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + Server server = InProcessServerBuilder.forName("test-span-propagation-interceptor") + .directExecutor().build().start(); + grpcCleanupRule.register(server); + final AtomicReference callbackSpan = new AtomicReference<>(); + ServerCall.Listener getContextListener = new ServerCall.Listener() { + @Override + public void onMessage(Integer message) { + callbackSpan.set(Span.fromContext(Context.current())); + } + + @Override + public void onHalfClose() { + callbackSpan.set(Span.fromContext(Context.current())); + } + + @Override + public void onCancel() { + callbackSpan.set(Span.fromContext(Context.current())); + } + + @Override + public void onComplete() { + callbackSpan.set(Span.fromContext(Context.current())); + } + }; + ServerInterceptor interceptor = tracingModule.getServerSpanPropagationInterceptor(); + @SuppressWarnings("unchecked") + ServerCallHandler handler = mock(ServerCallHandler.class); + when(handler.startCall(any(), any())).thenReturn(getContextListener); + ServerCall call = new NoopServerCall<>(); + Metadata metadata = new Metadata(); + ServerCall.Listener listener = + interceptor.interceptCall(call, metadata, handler); + verify(handler).startCall(same(call), same(metadata)); + listener.onMessage(1); + assertEquals(callbackSpan.get(), Span.getInvalid()); + listener.onReady(); + assertEquals(callbackSpan.get(), Span.getInvalid()); + listener.onCancel(); + assertEquals(callbackSpan.get(), Span.getInvalid()); + listener.onHalfClose(); + assertEquals(callbackSpan.get(), Span.getInvalid()); + listener.onComplete(); + assertEquals(callbackSpan.get(), Span.getInvalid()); + + Span parentSpan = tracerRule.spanBuilder("parent-span").startSpan(); + io.grpc.Context context = io.grpc.Context.current().withValue(tracingModule.otelSpan, parentSpan); + io.grpc.Context previous = context.attach(); + try { + listener = interceptor.interceptCall(call, metadata, handler); + verify(handler, times(2)).startCall(same(call), same(metadata)); + listener.onMessage(1); + assertEquals(callbackSpan.get().getSpanContext().getTraceId(), + parentSpan.getSpanContext().getTraceId()); + listener.onReady(); + assertEquals(callbackSpan.get().getSpanContext().getTraceId(), + parentSpan.getSpanContext().getTraceId()); + listener.onCancel(); + assertEquals(callbackSpan.get().getSpanContext().getTraceId(), + parentSpan.getSpanContext().getTraceId()); + listener.onHalfClose(); + assertEquals(callbackSpan.get().getSpanContext().getTraceId(), + parentSpan.getSpanContext().getTraceId()); + listener.onComplete(); + assertEquals(callbackSpan.get().getSpanContext().getTraceId(), + parentSpan.getSpanContext().getTraceId()); + } finally { + context.detach(previous); + } + } + @Test public void generateTraceSpanName() { assertEquals( From 5509b8fd25eef8ec92d8d2afa1389969e26df95b Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 10 Sep 2024 11:23:53 -0700 Subject: [PATCH 4/7] sync --- opentelemetry/build.gradle | 6 +----- .../main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java | 2 +- .../grpc/opentelemetry/OpenTelemetryTracingModuleTest.java | 6 +++--- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/opentelemetry/build.gradle b/opentelemetry/build.gradle index b554f9f9c98..00d913c280d 100644 --- a/opentelemetry/build.gradle +++ b/opentelemetry/build.gradle @@ -7,8 +7,6 @@ plugins { description = 'gRPC: OpenTelemetry' -test.testLogging.showStandardStreams(true) - dependencies { api project(':grpc-api') implementation libraries.guava, @@ -21,9 +19,7 @@ dependencies { testFixtures(project(':grpc-core')), testFixtures(project(':grpc-api')), libraries.opentelemetry.sdk.testing, - libraries.assertj.core - testImplementation project(path: ':grpc-api') - // opentelemetry.sdk.testing uses compileOnly for assertj + libraries.assertj.core // opentelemetry.sdk.testing uses compileOnly for assertj annotationProcessor libraries.auto.value diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 723f16a1acb..e95ff580689 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -90,8 +90,8 @@ private GrpcOpenTelemetry(Builder builder) { this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels); this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule( STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins); - this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels); this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk); + this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels); } @VisibleForTesting diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java index a0c0674f309..89e79d55d25 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java @@ -703,8 +703,7 @@ public void onComplete() { when(handler.startCall(any(), any())).thenReturn(getContextListener); ServerCall call = new NoopServerCall<>(); Metadata metadata = new Metadata(); - ServerCall.Listener listener = - interceptor.interceptCall(call, metadata, handler); + ServerCall.Listener listener = interceptor.interceptCall(call, metadata, handler); verify(handler).startCall(same(call), same(metadata)); listener.onMessage(1); assertEquals(callbackSpan.get(), Span.getInvalid()); @@ -718,7 +717,8 @@ public void onComplete() { assertEquals(callbackSpan.get(), Span.getInvalid()); Span parentSpan = tracerRule.spanBuilder("parent-span").startSpan(); - io.grpc.Context context = io.grpc.Context.current().withValue(tracingModule.otelSpan, parentSpan); + io.grpc.Context context = io.grpc.Context.current().withValue( + tracingModule.otelSpan, parentSpan); io.grpc.Context previous = context.attach(); try { listener = interceptor.interceptCall(call, metadata, handler); From 4a72006812a76816d94076c2287e36db3793e95c Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 10 Sep 2024 11:36:37 -0700 Subject: [PATCH 5/7] grpcOpentelemetry --- .../io/grpc/opentelemetry/GrpcOpenTelemetry.java | 3 --- .../grpc/opentelemetry/GrpcOpenTelemetryTest.java | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index e95ff580689..de32f00cd36 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -179,9 +179,6 @@ void configureTracingForServer(ServerBuilder serverBuilder) { */ public void configureServerBuilder(ServerBuilder serverBuilder) { serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory()); - serverBuilder.addStreamTracerFactory( - openTelemetryTracingModule.getServerTracerFactory()); - serverBuilder.intercept(openTelemetryTracingModule.getServerSpanPropagationInterceptor()); } @VisibleForTesting diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java index a070d88ec21..872d71feb06 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java @@ -17,9 +17,16 @@ package io.grpc.opentelemetry; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.collect.ImmutableList; +import io.grpc.ManagedChannelBuilder; import io.grpc.MetricSink; +import io.grpc.ServerBuilder; import io.grpc.internal.GrpcUtil; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.sdk.OpenTelemetrySdk; @@ -71,6 +78,14 @@ public void buildTracer() { tracerProvider.tracerBuilder("grpc-java") .setInstrumentationVersion(GrpcUtil.IMPLEMENTATION_VERSION) .build()); + ServerBuilder mockServerBuiler = mock(ServerBuilder.class); + openTelemetryModule.configureServerBuilder(mockServerBuiler); + verify(mockServerBuiler).addStreamTracerFactory(any()); + verifyNoMoreInteractions(mockServerBuiler); + + ManagedChannelBuilder mockChannelBuilder = mock(ManagedChannelBuilder.class); + openTelemetryModule.configureChannelBuilder(mockChannelBuilder); + verify(mockChannelBuilder, never()).intercept(); } @Test From b2e6ff16fab343eb2e765fab4fe1f1e8bd7033bb Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Wed, 11 Sep 2024 16:43:02 -0700 Subject: [PATCH 6/7] environmental variable --- .../grpc/opentelemetry/GrpcOpenTelemetry.java | 29 +++++++++++------ .../InternalGrpcOpenTelemetry.java | 13 ++------ .../opentelemetry/GrpcOpenTelemetryTest.java | 32 ++++++++++++++----- 3 files changed, 45 insertions(+), 29 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index de32f00cd36..6a6162de7e4 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -62,6 +63,11 @@ public Stopwatch get() { } }; + @VisibleForTesting + static boolean ENABLE_OTEL_TRACING = + Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING")) + || Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING")); + private final OpenTelemetry openTelemetrySdk; private final MeterProvider meterProvider; private final Meter meter; @@ -160,16 +166,9 @@ public void configureChannelBuilder(ManagedChannelBuilder builder) { InternalManagedChannelBuilder.addMetricSink(builder, sink); InternalManagedChannelBuilder.interceptWithTarget( builder, openTelemetryMetricsModule::getClientInterceptor); - } - - void configTracingForChannel(ManagedChannelBuilder builder) { - builder.intercept(openTelemetryTracingModule.getClientInterceptor()); - } - - void configureTracingForServer(ServerBuilder serverBuilder) { - serverBuilder.addStreamTracerFactory( - openTelemetryTracingModule.getServerTracerFactory()); - serverBuilder.intercept(openTelemetryTracingModule.getServerSpanPropagationInterceptor()); + if (ENABLE_OTEL_TRACING) { + builder.intercept(openTelemetryTracingModule.getClientInterceptor()); + } } /** @@ -179,6 +178,11 @@ void configureTracingForServer(ServerBuilder serverBuilder) { */ public void configureServerBuilder(ServerBuilder serverBuilder) { serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory()); + if (ENABLE_OTEL_TRACING) { + serverBuilder.addStreamTracerFactory( + openTelemetryTracingModule.getServerTracerFactory()); + serverBuilder.intercept(openTelemetryTracingModule.getServerSpanPropagationInterceptor()); + } } @VisibleForTesting @@ -360,6 +364,11 @@ public Builder disableAllMetrics() { return this; } + Builder enableTracing(boolean enable) { + ENABLE_OTEL_TRACING = enable; + return this; + } + /** * Returns a new {@link GrpcOpenTelemetry} built with the configuration of this {@link * Builder}. diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/InternalGrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/InternalGrpcOpenTelemetry.java index 6424c471598..ea1e7ab803f 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/InternalGrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/InternalGrpcOpenTelemetry.java @@ -17,8 +17,6 @@ package io.grpc.opentelemetry; import io.grpc.Internal; -import io.grpc.ManagedChannelBuilder; -import io.grpc.ServerBuilder; /** * Internal accessor for {@link GrpcOpenTelemetry}. @@ -32,14 +30,7 @@ public static void builderPlugin( builder.plugin(plugin); } - - public static void configTracingForChannelExperimental( - ManagedChannelBuilder builder, GrpcOpenTelemetry grpcOpenTelemetry) { - grpcOpenTelemetry.configTracingForChannel(builder); - } - - public static void configChannelBuilderExperimental( - ServerBuilder builder, GrpcOpenTelemetry grpcOpenTelemetry) { - grpcOpenTelemetry.configureTracingForServer(builder); + public static void enableTracing(GrpcOpenTelemetry.Builder builder, boolean enable) { + builder.enableTracing(enable); } } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java index 872d71feb06..1ae7b755a48 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/GrpcOpenTelemetryTest.java @@ -19,11 +19,12 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.collect.ImmutableList; +import io.grpc.ClientInterceptor; import io.grpc.ManagedChannelBuilder; import io.grpc.MetricSink; import io.grpc.ServerBuilder; @@ -34,6 +35,8 @@ import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.trace.SdkTracerProvider; import java.util.Arrays; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -45,6 +48,17 @@ public class GrpcOpenTelemetryTest { SdkMeterProvider.builder().registerMetricReader(inMemoryMetricReader).build(); private final SdkTracerProvider tracerProvider = SdkTracerProvider.builder().build(); private final OpenTelemetry noopOpenTelemetry = OpenTelemetry.noop(); + private boolean originalEnableOtelTracing; + + @Before + public void setup() { + originalEnableOtelTracing = GrpcOpenTelemetry.ENABLE_OTEL_TRACING; + } + + @After + public void tearDown() { + GrpcOpenTelemetry.ENABLE_OTEL_TRACING = originalEnableOtelTracing; + } @Test public void build() { @@ -70,22 +84,24 @@ public void buildTracer() { OpenTelemetrySdk sdk = OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build(); - GrpcOpenTelemetry openTelemetryModule = GrpcOpenTelemetry.newBuilder() + GrpcOpenTelemetry grpcOpenTelemetry = GrpcOpenTelemetry.newBuilder() + .enableTracing(true) .sdk(sdk).build(); - assertThat(openTelemetryModule.getOpenTelemetryInstance()).isSameInstanceAs(sdk); - assertThat(openTelemetryModule.getTracer()).isSameInstanceAs( + assertThat(grpcOpenTelemetry.getOpenTelemetryInstance()).isSameInstanceAs(sdk); + assertThat(grpcOpenTelemetry.getTracer()).isSameInstanceAs( tracerProvider.tracerBuilder("grpc-java") .setInstrumentationVersion(GrpcUtil.IMPLEMENTATION_VERSION) .build()); ServerBuilder mockServerBuiler = mock(ServerBuilder.class); - openTelemetryModule.configureServerBuilder(mockServerBuiler); - verify(mockServerBuiler).addStreamTracerFactory(any()); + grpcOpenTelemetry.configureServerBuilder(mockServerBuiler); + verify(mockServerBuiler, times(2)).addStreamTracerFactory(any()); + verify(mockServerBuiler).intercept(any()); verifyNoMoreInteractions(mockServerBuiler); ManagedChannelBuilder mockChannelBuilder = mock(ManagedChannelBuilder.class); - openTelemetryModule.configureChannelBuilder(mockChannelBuilder); - verify(mockChannelBuilder, never()).intercept(); + grpcOpenTelemetry.configureChannelBuilder(mockChannelBuilder); + verify(mockChannelBuilder).intercept(any(ClientInterceptor.class)); } @Test From 93ffd4fa8bc1c35d0c58fe4cf04be2dfe239c648 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Wed, 11 Sep 2024 16:50:34 -0700 Subject: [PATCH 7/7] fix env variable --- .../main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 6a6162de7e4..6b257a18a6c 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; -import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -34,6 +33,7 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.MetricSink; import io.grpc.ServerBuilder; +import io.grpc.internal.GrpcUtil; import io.grpc.opentelemetry.internal.OpenTelemetryConstants; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.metrics.Meter; @@ -64,9 +64,8 @@ public Stopwatch get() { }; @VisibleForTesting - static boolean ENABLE_OTEL_TRACING = - Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING")) - || Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING")); + static boolean ENABLE_OTEL_TRACING = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING", + false); private final OpenTelemetry openTelemetrySdk; private final MeterProvider meterProvider;