Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add memory mode support to OTLP exporters #6430

Merged
merged 5 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSender;
import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSender;
Expand Down Expand Up @@ -67,7 +68,7 @@ public void export(

private static ManagedChannel defaultGrpcChannel;

private static GrpcExporter<TraceRequestMarshaler> upstreamGrpcExporter;
private static GrpcExporter<Marshaler> upstreamGrpcExporter;
private static GrpcExporter<TraceRequestMarshaler> okhttpGrpcSender;
private static HttpExporter<TraceRequestMarshaler> httpExporter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@

import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.traces.LowAllocationTraceRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.StringJoiner;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand All @@ -22,14 +28,18 @@
@ThreadSafe
public final class OtlpHttpSpanExporter implements SpanExporter {

private final HttpExporterBuilder<TraceRequestMarshaler> builder;
private final HttpExporter<TraceRequestMarshaler> delegate;
private final Deque<LowAllocationTraceRequestMarshaler> marshalerPool = new ArrayDeque<>();
private final HttpExporterBuilder<Marshaler> builder;
private final HttpExporter<Marshaler> delegate;
private final MemoryMode memoryMode;

OtlpHttpSpanExporter(
HttpExporterBuilder<TraceRequestMarshaler> builder,
HttpExporter<TraceRequestMarshaler> delegate) {
HttpExporterBuilder<Marshaler> builder,
HttpExporter<Marshaler> delegate,
MemoryMode memoryMode) {
this.builder = builder;
this.delegate = delegate;
this.memoryMode = memoryMode;
}

/**
Expand Down Expand Up @@ -61,7 +71,7 @@
* @since 1.29.0
*/
public OtlpHttpSpanExporterBuilder toBuilder() {
return new OtlpHttpSpanExporterBuilder(builder.copy());
return new OtlpHttpSpanExporterBuilder(builder.copy(), memoryMode);
}

/**
Expand All @@ -72,8 +82,24 @@
*/
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
TraceRequestMarshaler exportRequest = TraceRequestMarshaler.create(spans);
return delegate.export(exportRequest, spans.size());
if (memoryMode == MemoryMode.REUSABLE_DATA) {
LowAllocationTraceRequestMarshaler marshaler = marshalerPool.poll();

Check warning on line 86 in exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java#L86

Added line #L86 was not covered by tests
if (marshaler == null) {
marshaler = new LowAllocationTraceRequestMarshaler();

Check warning on line 88 in exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java#L88

Added line #L88 was not covered by tests
}
LowAllocationTraceRequestMarshaler exportMarshaler = marshaler;
exportMarshaler.initialize(spans);
return delegate
.export(exportMarshaler, spans.size())
.whenComplete(

Check warning on line 94 in exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java#L90-L94

Added lines #L90 - L94 were not covered by tests
() -> {
exportMarshaler.reset();
marshalerPool.add(exportMarshaler);
});

Check warning on line 98 in exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java#L96-L98

Added lines #L96 - L98 were not covered by tests
}
// MemoryMode == MemoryMode.IMMUTABLE_DATA
TraceRequestMarshaler request = TraceRequestMarshaler.create(spans);
return delegate.export(request, spans.size());
}

/**
Expand All @@ -94,6 +120,9 @@

@Override
public String toString() {
return "OtlpHttpSpanExporter{" + builder.toString(false) + "}";
StringJoiner joiner = new StringJoiner(", ", "OtlpHttpSpanExporter{", "}");
joiner.add(builder.toString(false));
joiner.add("memoryMode=" + memoryMode);
return joiner.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
import io.opentelemetry.exporter.internal.compression.CompressorProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.common.export.ProxyOptions;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.time.Duration;
Expand All @@ -33,16 +34,19 @@
public final class OtlpHttpSpanExporterBuilder {

private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/traces";
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;

private final HttpExporterBuilder<TraceRequestMarshaler> delegate;
private final HttpExporterBuilder<Marshaler> delegate;
private MemoryMode memoryMode;

OtlpHttpSpanExporterBuilder(HttpExporterBuilder<TraceRequestMarshaler> delegate) {
OtlpHttpSpanExporterBuilder(HttpExporterBuilder<Marshaler> delegate, MemoryMode memoryMode) {
this.delegate = delegate;
this.memoryMode = memoryMode;
OtlpUserAgent.addUserAgentHeader(delegate::addConstantHeaders);
}

OtlpHttpSpanExporterBuilder() {
this(new HttpExporterBuilder<>("otlp", "span", DEFAULT_ENDPOINT));
this(new HttpExporterBuilder<>("otlp", "span", DEFAULT_ENDPOINT), DEFAULT_MEMORY_MODE);
}

/**
Expand Down Expand Up @@ -207,12 +211,19 @@ public OtlpHttpSpanExporterBuilder setMeterProvider(
return this;
}

/** Set the {@link MemoryMode}. */
OtlpHttpSpanExporterBuilder setMemoryMode(MemoryMode memoryMode) {
requireNonNull(memoryMode, "memoryMode");
this.memoryMode = memoryMode;
return this;
}

/**
* Constructs a new instance of the exporter based on the builder's values.
*
* @return a new exporter's instance
*/
public OtlpHttpSpanExporter build() {
return new OtlpHttpSpanExporter(delegate, delegate.build());
return new OtlpHttpSpanExporter(delegate, delegate.build(), memoryMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import static io.opentelemetry.sdk.metrics.Aggregation.explicitBucketHistogram;

import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigurationException;
import io.opentelemetry.sdk.common.export.MemoryMode;
Expand Down Expand Up @@ -212,12 +214,12 @@ public static void configureOtlpHistogramDefaultAggregation(
}

/**
* Calls {@code #setMemoryMode} on the {@code Otlp{Protocol}MetricExporterBuilder} with the {@code
* memoryMode}.
* Calls {@code #setMemoryMode} on the {@code Otlp{Protocol}{Signal}ExporterBuilder} with the
* {@code memoryMode}.
*/
public static void setMemoryModeOnOtlpMetricExporterBuilder(
Object builder, MemoryMode memoryMode) {
public static void setMemoryModeOnOtlpExporterBuilder(Object builder, MemoryMode memoryMode) {
try {
// Metrics
if (builder instanceof OtlpGrpcMetricExporterBuilder) {
// Calling getDeclaredMethod causes all private methods to be read, which causes a
// ClassNotFoundException when running with the OkHttHttpProvider as the private
Expand All @@ -237,9 +239,27 @@ public static void setMemoryModeOnOtlpMetricExporterBuilder(
"setMemoryMode", MemoryMode.class);
method.setAccessible(true);
method.invoke(builder, memoryMode);
} else if (builder instanceof OtlpGrpcSpanExporterBuilder) {
// Calling getDeclaredMethod causes all private methods to be read, which causes a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could have been avoided by using method handles. Alternatively could have used an internal interface to expose this method.

// ClassNotFoundException when running with the OkHttHttpProvider as the private
// setManagedChanel(io.grpc.ManagedChannel) is reached and io.grpc.ManagedChannel is not on
// the classpath. io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanUtil provides a layer
// of indirection which avoids scanning the OtlpGrpcSpanExporterBuilder private methods.
Class<?> otlpGrpcMetricUtil =
Class.forName("io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanUtil");
Method method =
otlpGrpcMetricUtil.getDeclaredMethod(
"setMemoryMode", OtlpGrpcSpanExporterBuilder.class, MemoryMode.class);
method.setAccessible(true);
method.invoke(null, builder, memoryMode);
} else if (builder instanceof OtlpHttpSpanExporterBuilder) {
Method method =
OtlpHttpSpanExporterBuilder.class.getDeclaredMethod("setMemoryMode", MemoryMode.class);
method.setAccessible(true);
method.invoke(builder, memoryMode);
} else {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ugliness will go away once we promote memory mode to the stable API. I think we just need a release or so to be sure we think this concept is going to work.

throw new IllegalArgumentException(
"Can only set memory mode on OtlpHttpMetricExporterBuilder and OtlpGrpcMetricExporterBuilder.");
"Cannot set memory mode. Unrecognized OTLP exporter builder");
}
} catch (NoSuchMethodException
| InvocationTargetException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ public MetricExporter createExporter(ConfigProperties config) {
config, builder::setDefaultAggregationSelector);
ExporterBuilderUtil.configureExporterMemoryMode(
config,
memoryMode ->
OtlpConfigUtil.setMemoryModeOnOtlpMetricExporterBuilder(builder, memoryMode));
memoryMode -> OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode));

return builder.build();
} else if (protocol.equals(PROTOCOL_GRPC)) {
Expand All @@ -74,8 +73,7 @@ public MetricExporter createExporter(ConfigProperties config) {
config, builder::setDefaultAggregationSelector);
ExporterBuilderUtil.configureExporterMemoryMode(
config,
memoryMode ->
OtlpConfigUtil.setMemoryModeOnOtlpMetricExporterBuilder(builder, memoryMode));
memoryMode -> OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode));

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil.PROTOCOL_HTTP_PROTOBUF;

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
Expand Down Expand Up @@ -52,7 +53,9 @@ public SpanExporter createExporter(ConfigProperties config) {
builder::setClientTls,
builder::setRetryPolicy);
builder.setMeterProvider(meterProviderRef::get);

ExporterBuilderUtil.configureExporterMemoryMode(
config,
memoryMode -> OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode));
return builder.build();
} else if (protocol.equals(PROTOCOL_GRPC)) {
OtlpGrpcSpanExporterBuilder builder = grpcBuilder();
Expand All @@ -68,6 +71,9 @@ public SpanExporter createExporter(ConfigProperties config) {
builder::setClientTls,
builder::setRetryPolicy);
builder.setMeterProvider(meterProviderRef::get);
ExporterBuilderUtil.configureExporterMemoryMode(
config,
memoryMode -> OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode));

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ final class OtlpGrpcMetricUtil {

private OtlpGrpcMetricUtil() {}

/** See {@link OtlpConfigUtil#setMemoryModeOnOtlpMetricExporterBuilder(Object, MemoryMode)}. */
/** See {@link OtlpConfigUtil#setMemoryModeOnOtlpExporterBuilder(Object, MemoryMode)}. */
static void setMemoryMode(OtlpGrpcMetricExporterBuilder builder, MemoryMode memoryMode) {
builder.setMemoryMode(memoryMode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.grpc.MethodDescriptor;
import io.opentelemetry.exporter.internal.grpc.MarshalerInputStream;
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import java.io.InputStream;
import javax.annotation.Nullable;

Expand All @@ -19,15 +19,15 @@ final class MarshalerTraceServiceGrpc {

private static final String SERVICE_NAME = "opentelemetry.proto.collector.trace.v1.TraceService";

private static final MethodDescriptor.Marshaller<TraceRequestMarshaler> REQUEST_MARSHALLER =
new MethodDescriptor.Marshaller<TraceRequestMarshaler>() {
private static final MethodDescriptor.Marshaller<Marshaler> REQUEST_MARSHALLER =
new MethodDescriptor.Marshaller<Marshaler>() {
@Override
public InputStream stream(TraceRequestMarshaler value) {
public InputStream stream(Marshaler value) {
return new MarshalerInputStream(value);
}

@Override
public TraceRequestMarshaler parse(InputStream stream) {
public Marshaler parse(InputStream stream) {
throw new UnsupportedOperationException("Only for serializing");
}
};
Expand All @@ -45,9 +45,9 @@ public ExportTraceServiceResponse parse(InputStream stream) {
}
};

private static final io.grpc.MethodDescriptor<TraceRequestMarshaler, ExportTraceServiceResponse>
private static final io.grpc.MethodDescriptor<Marshaler, ExportTraceServiceResponse>
getExportMethod =
io.grpc.MethodDescriptor.<TraceRequestMarshaler, ExportTraceServiceResponse>newBuilder()
io.grpc.MethodDescriptor.<Marshaler, ExportTraceServiceResponse>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "Export"))
.setRequestMarshaller(REQUEST_MARSHALLER)
Expand All @@ -62,8 +62,7 @@ static TraceServiceFutureStub newFutureStub(
}

static final class TraceServiceFutureStub
extends MarshalerServiceStub<
TraceRequestMarshaler, ExportTraceServiceResponse, TraceServiceFutureStub> {
extends MarshalerServiceStub<Marshaler, ExportTraceServiceResponse, TraceServiceFutureStub> {
private TraceServiceFutureStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
Expand All @@ -76,7 +75,7 @@ protected MarshalerTraceServiceGrpc.TraceServiceFutureStub build(

@Override
public com.google.common.util.concurrent.ListenableFuture<ExportTraceServiceResponse> export(
TraceRequestMarshaler request) {
Marshaler request) {
return io.grpc.stub.ClientCalls.futureUnaryCall(
getChannel().newCall(getExportMethod, getCallOptions()), request);
}
Expand Down
Loading
Loading