From 0d2d67efe4b2bd9c75e15349adda53dafcf1f09b Mon Sep 17 00:00:00 2001 From: jack-berg <34418638+jack-berg@users.noreply.github.com> Date: Thu, 9 May 2024 12:51:53 -0500 Subject: [PATCH] Add memory mode support to OTLP exporters (#6430) --- .../otlp/trace/OltpExporterBenchmark.java | 3 +- .../http/logs/OtlpHttpLogRecordExporter.java | 45 +++++++++++++--- .../OtlpHttpLogRecordExporterBuilder.java | 21 ++++++-- .../http/metrics/OtlpHttpMetricExporter.java | 33 +++++++++--- .../OtlpHttpMetricExporterBuilder.java | 7 ++- .../otlp/http/trace/OtlpHttpSpanExporter.java | 45 +++++++++++++--- .../trace/OtlpHttpSpanExporterBuilder.java | 21 ++++++-- .../otlp/internal/OtlpConfigUtil.java | 52 +++++++++++++++++-- .../OtlpLogRecordExporterProvider.java | 7 +++ .../internal/OtlpMetricExporterProvider.java | 6 +-- .../internal/OtlpSpanExporterProvider.java | 8 ++- .../otlp/logs/MarshalerLogsServiceGrpc.java | 30 +++++------ .../otlp/logs/OtlpGrpcLogRecordExporter.java | 41 ++++++++++++--- .../OtlpGrpcLogRecordExporterBuilder.java | 22 ++++++-- .../exporter/otlp/logs/OtlpGrpcLogUtil.java | 19 +++++++ .../metrics/MarshalerMetricsServiceGrpc.java | 29 +++++------ .../otlp/metrics/OtlpGrpcMetricExporter.java | 30 +++++++++-- .../OtlpGrpcMetricExporterBuilder.java | 7 ++- .../otlp/metrics/OtlpGrpcMetricUtil.java | 2 +- .../otlp/trace/MarshalerTraceServiceGrpc.java | 19 ++++--- .../otlp/trace/OtlpGrpcSpanExporter.java | 42 ++++++++++++--- .../trace/OtlpGrpcSpanExporterBuilder.java | 22 ++++++-- .../exporter/otlp/trace/OtlpGrpcSpanUtil.java | 19 +++++++ .../OtlpLogRecordExporterProviderTest.java | 7 +++ .../OtlpSpanExporterProviderTest.java | 8 +++ .../OtlpHttpSpanExporterOkHttpSenderTest.java | 30 +++++++++++ .../otlp/traces/OtlpGrpcSpanExporterTest.java | 26 ++++++++++ .../otlp/KeyValueStatelessMarshaler.java | 4 +- .../metrics/ExemplarStatelessMarshaler.java | 6 +-- ...lHistogramDataPointStatelessMarshaler.java | 6 +-- .../HistogramDataPointStatelessMarshaler.java | 6 +-- .../NumberDataPointStatelessMarshaler.java | 6 +-- .../SummaryDataPointStatelessMarshaler.java | 6 +-- .../OtlpExporterIntegrationTest.java | 4 +- 34 files changed, 499 insertions(+), 140 deletions(-) create mode 100644 exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogUtil.java create mode 100644 exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanUtil.java diff --git a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java index ba6492547c7..5dc9f2107c8 100644 --- a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java +++ b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java @@ -15,6 +15,7 @@ import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.internal.http.HttpExporter; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; import io.opentelemetry.exporter.sender.grpc.managedchannel.internal.UpstreamGrpcSender; import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSender; @@ -67,7 +68,7 @@ public void export( private static ManagedChannel defaultGrpcChannel; - private static GrpcExporter upstreamGrpcExporter; + private static GrpcExporter upstreamGrpcExporter; private static GrpcExporter okhttpGrpcSender; private static HttpExporter httpExporter; diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporter.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporter.java index de48a358f41..4f3202ceb99 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporter.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporter.java @@ -7,11 +7,17 @@ import io.opentelemetry.exporter.internal.http.HttpExporter; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; +import io.opentelemetry.exporter.internal.otlp.logs.LowAllocationLogsRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; +import java.util.ArrayDeque; import java.util.Collection; +import java.util.Deque; +import java.util.StringJoiner; import javax.annotation.concurrent.ThreadSafe; /** @@ -22,14 +28,18 @@ @ThreadSafe public final class OtlpHttpLogRecordExporter implements LogRecordExporter { - private final HttpExporterBuilder builder; - private final HttpExporter delegate; + private final Deque marshalerPool = new ArrayDeque<>(); + private final HttpExporterBuilder builder; + private final HttpExporter delegate; + private final MemoryMode memoryMode; OtlpHttpLogRecordExporter( - HttpExporterBuilder builder, - HttpExporter delegate) { + HttpExporterBuilder builder, + HttpExporter delegate, + MemoryMode memoryMode) { this.builder = builder; this.delegate = delegate; + this.memoryMode = memoryMode; } /** @@ -61,7 +71,7 @@ public static OtlpHttpLogRecordExporterBuilder builder() { * @since 1.29.0 */ public OtlpHttpLogRecordExporterBuilder toBuilder() { - return new OtlpHttpLogRecordExporterBuilder(builder.copy()); + return new OtlpHttpLogRecordExporterBuilder(builder.copy(), memoryMode); } /** @@ -72,8 +82,24 @@ public OtlpHttpLogRecordExporterBuilder toBuilder() { */ @Override public CompletableResultCode export(Collection logs) { - LogsRequestMarshaler exportRequest = LogsRequestMarshaler.create(logs); - return delegate.export(exportRequest, logs.size()); + if (memoryMode == MemoryMode.REUSABLE_DATA) { + LowAllocationLogsRequestMarshaler marshaler = marshalerPool.poll(); + if (marshaler == null) { + marshaler = new LowAllocationLogsRequestMarshaler(); + } + LowAllocationLogsRequestMarshaler exportMarshaler = marshaler; + exportMarshaler.initialize(logs); + return delegate + .export(exportMarshaler, logs.size()) + .whenComplete( + () -> { + exportMarshaler.reset(); + marshalerPool.add(exportMarshaler); + }); + } + // MemoryMode == MemoryMode.IMMUTABLE_DATA + LogsRequestMarshaler request = LogsRequestMarshaler.create(logs); + return delegate.export(request, logs.size()); } @Override @@ -89,6 +115,9 @@ public CompletableResultCode shutdown() { @Override public String toString() { - return "OtlpHttpLogRecordExporter{" + builder.toString(false) + "}"; + StringJoiner joiner = new StringJoiner(", ", "OtlpHttpLogRecordExporter{", "}"); + joiner.add(builder.toString(false)); + joiner.add("memoryMode=" + memoryMode); + return joiner.toString(); } } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java index 4d330c42546..619914f26e3 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java @@ -14,8 +14,9 @@ import io.opentelemetry.exporter.internal.compression.CompressorProvider; import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; -import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.common.export.ProxyOptions; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.time.Duration; @@ -33,16 +34,19 @@ public final class OtlpHttpLogRecordExporterBuilder { private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/logs"; + private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA; - private final HttpExporterBuilder delegate; + private final HttpExporterBuilder delegate; + private MemoryMode memoryMode; - OtlpHttpLogRecordExporterBuilder(HttpExporterBuilder delegate) { + OtlpHttpLogRecordExporterBuilder(HttpExporterBuilder delegate, MemoryMode memoryMode) { this.delegate = delegate; + this.memoryMode = memoryMode; OtlpUserAgent.addUserAgentHeader(delegate::addConstantHeaders); } OtlpHttpLogRecordExporterBuilder() { - this(new HttpExporterBuilder<>("otlp", "log", DEFAULT_ENDPOINT)); + this(new HttpExporterBuilder<>("otlp", "log", DEFAULT_ENDPOINT), DEFAULT_MEMORY_MODE); } /** @@ -206,12 +210,19 @@ public OtlpHttpLogRecordExporterBuilder setMeterProvider( return this; } + /** Set the {@link MemoryMode}. */ + OtlpHttpLogRecordExporterBuilder setMemoryMode(MemoryMode memoryMode) { + requireNonNull(memoryMode, "memoryMode"); + this.memoryMode = memoryMode; + return this; + } + /** * Constructs a new instance of the exporter based on the builder's values. * * @return a new exporter's instance */ public OtlpHttpLogRecordExporter build() { - return new OtlpHttpLogRecordExporter(delegate, delegate.build()); + return new OtlpHttpLogRecordExporter(delegate, delegate.build(), memoryMode); } } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporter.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporter.java index e361bb5d2da..1634d47fee5 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporter.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporter.java @@ -7,6 +7,8 @@ import io.opentelemetry.exporter.internal.http.HttpExporter; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.otlp.metrics.LowAllocationMetricsRequestMarshaler; import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.MemoryMode; @@ -17,7 +19,9 @@ import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector; import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.util.ArrayDeque; import java.util.Collection; +import java.util.Deque; import java.util.StringJoiner; import javax.annotation.concurrent.ThreadSafe; @@ -29,15 +33,16 @@ @ThreadSafe public final class OtlpHttpMetricExporter implements MetricExporter { - private final HttpExporterBuilder builder; - private final HttpExporter delegate; + private final Deque marshalerPool = new ArrayDeque<>(); + private final HttpExporterBuilder builder; + private final HttpExporter delegate; private final AggregationTemporalitySelector aggregationTemporalitySelector; private final DefaultAggregationSelector defaultAggregationSelector; private final MemoryMode memoryMode; OtlpHttpMetricExporter( - HttpExporterBuilder builder, - HttpExporter delegate, + HttpExporterBuilder builder, + HttpExporter delegate, AggregationTemporalitySelector aggregationTemporalitySelector, DefaultAggregationSelector defaultAggregationSelector, MemoryMode memoryMode) { @@ -103,8 +108,24 @@ public MemoryMode getMemoryMode() { */ @Override public CompletableResultCode export(Collection metrics) { - MetricsRequestMarshaler exportRequest = MetricsRequestMarshaler.create(metrics); - return delegate.export(exportRequest, metrics.size()); + if (memoryMode == MemoryMode.REUSABLE_DATA) { + LowAllocationMetricsRequestMarshaler marshaler = marshalerPool.poll(); + if (marshaler == null) { + marshaler = new LowAllocationMetricsRequestMarshaler(); + } + LowAllocationMetricsRequestMarshaler exportMarshaler = marshaler; + exportMarshaler.initialize(metrics); + return delegate + .export(exportMarshaler, metrics.size()) + .whenComplete( + () -> { + exportMarshaler.reset(); + marshalerPool.add(exportMarshaler); + }); + } + // MemoryMode == MemoryMode.IMMUTABLE_DATA + MetricsRequestMarshaler request = MetricsRequestMarshaler.create(metrics); + return delegate.export(request, metrics.size()); } /** diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java index 8390d5e03b3..e3b23aa77e0 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java @@ -13,7 +13,7 @@ import io.opentelemetry.exporter.internal.compression.CompressorProvider; import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; -import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.common.export.ProxyOptions; @@ -42,7 +42,7 @@ public final class OtlpHttpMetricExporterBuilder { AggregationTemporalitySelector.alwaysCumulative(); private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA; - private final HttpExporterBuilder delegate; + private final HttpExporterBuilder delegate; private AggregationTemporalitySelector aggregationTemporalitySelector = DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR; @@ -50,8 +50,7 @@ public final class OtlpHttpMetricExporterBuilder { DefaultAggregationSelector.getDefault(); private MemoryMode memoryMode; - OtlpHttpMetricExporterBuilder( - HttpExporterBuilder delegate, MemoryMode memoryMode) { + OtlpHttpMetricExporterBuilder(HttpExporterBuilder delegate, MemoryMode memoryMode) { this.delegate = delegate; this.memoryMode = memoryMode; delegate.setMeterProvider(MeterProvider::noop); diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java index d693090e316..86d4016adb6 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java @@ -7,11 +7,17 @@ import io.opentelemetry.exporter.internal.http.HttpExporter; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.otlp.traces.LowAllocationTraceRequestMarshaler; import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.ArrayDeque; import java.util.Collection; +import java.util.Deque; +import java.util.StringJoiner; import javax.annotation.concurrent.ThreadSafe; /** @@ -22,14 +28,18 @@ @ThreadSafe public final class OtlpHttpSpanExporter implements SpanExporter { - private final HttpExporterBuilder builder; - private final HttpExporter delegate; + private final Deque marshalerPool = new ArrayDeque<>(); + private final HttpExporterBuilder builder; + private final HttpExporter delegate; + private final MemoryMode memoryMode; OtlpHttpSpanExporter( - HttpExporterBuilder builder, - HttpExporter delegate) { + HttpExporterBuilder builder, + HttpExporter delegate, + MemoryMode memoryMode) { this.builder = builder; this.delegate = delegate; + this.memoryMode = memoryMode; } /** @@ -61,7 +71,7 @@ public static OtlpHttpSpanExporterBuilder builder() { * @since 1.29.0 */ public OtlpHttpSpanExporterBuilder toBuilder() { - return new OtlpHttpSpanExporterBuilder(builder.copy()); + return new OtlpHttpSpanExporterBuilder(builder.copy(), memoryMode); } /** @@ -72,8 +82,24 @@ public OtlpHttpSpanExporterBuilder toBuilder() { */ @Override public CompletableResultCode export(Collection spans) { - TraceRequestMarshaler exportRequest = TraceRequestMarshaler.create(spans); - return delegate.export(exportRequest, spans.size()); + if (memoryMode == MemoryMode.REUSABLE_DATA) { + LowAllocationTraceRequestMarshaler marshaler = marshalerPool.poll(); + if (marshaler == null) { + marshaler = new LowAllocationTraceRequestMarshaler(); + } + LowAllocationTraceRequestMarshaler exportMarshaler = marshaler; + exportMarshaler.initialize(spans); + return delegate + .export(exportMarshaler, spans.size()) + .whenComplete( + () -> { + exportMarshaler.reset(); + marshalerPool.add(exportMarshaler); + }); + } + // MemoryMode == MemoryMode.IMMUTABLE_DATA + TraceRequestMarshaler request = TraceRequestMarshaler.create(spans); + return delegate.export(request, spans.size()); } /** @@ -94,6 +120,9 @@ public CompletableResultCode shutdown() { @Override public String toString() { - return "OtlpHttpSpanExporter{" + builder.toString(false) + "}"; + StringJoiner joiner = new StringJoiner(", ", "OtlpHttpSpanExporter{", "}"); + joiner.add(builder.toString(false)); + joiner.add("memoryMode=" + memoryMode); + return joiner.toString(); } } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java index e5039c61907..1c735001ebf 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java @@ -14,8 +14,9 @@ import io.opentelemetry.exporter.internal.compression.CompressorProvider; import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; -import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.common.export.ProxyOptions; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.time.Duration; @@ -33,16 +34,19 @@ public final class OtlpHttpSpanExporterBuilder { private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/traces"; + private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA; - private final HttpExporterBuilder delegate; + private final HttpExporterBuilder delegate; + private MemoryMode memoryMode; - OtlpHttpSpanExporterBuilder(HttpExporterBuilder delegate) { + OtlpHttpSpanExporterBuilder(HttpExporterBuilder delegate, MemoryMode memoryMode) { this.delegate = delegate; + this.memoryMode = memoryMode; OtlpUserAgent.addUserAgentHeader(delegate::addConstantHeaders); } OtlpHttpSpanExporterBuilder() { - this(new HttpExporterBuilder<>("otlp", "span", DEFAULT_ENDPOINT)); + this(new HttpExporterBuilder<>("otlp", "span", DEFAULT_ENDPOINT), DEFAULT_MEMORY_MODE); } /** @@ -207,12 +211,19 @@ public OtlpHttpSpanExporterBuilder setMeterProvider( return this; } + /** Set the {@link MemoryMode}. */ + OtlpHttpSpanExporterBuilder setMemoryMode(MemoryMode memoryMode) { + requireNonNull(memoryMode, "memoryMode"); + this.memoryMode = memoryMode; + return this; + } + /** * Constructs a new instance of the exporter based on the builder's values. * * @return a new exporter's instance */ public OtlpHttpSpanExporter build() { - return new OtlpHttpSpanExporter(delegate, delegate.build()); + return new OtlpHttpSpanExporter(delegate, delegate.build(), memoryMode); } } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpConfigUtil.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpConfigUtil.java index 5bd08742b09..c06d5f1f60e 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpConfigUtil.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpConfigUtil.java @@ -7,8 +7,12 @@ import static io.opentelemetry.sdk.metrics.Aggregation.explicitBucketHistogram; +import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporterBuilder; import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder; +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder; +import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder; import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder; import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; import io.opentelemetry.sdk.autoconfigure.spi.ConfigurationException; import io.opentelemetry.sdk.common.export.MemoryMode; @@ -212,12 +216,12 @@ public static void configureOtlpHistogramDefaultAggregation( } /** - * Calls {@code #setMemoryMode} on the {@code Otlp{Protocol}MetricExporterBuilder} with the {@code - * memoryMode}. + * Calls {@code #setMemoryMode} on the {@code Otlp{Protocol}{Signal}ExporterBuilder} with the + * {@code memoryMode}. */ - public static void setMemoryModeOnOtlpMetricExporterBuilder( - Object builder, MemoryMode memoryMode) { + public static void setMemoryModeOnOtlpExporterBuilder(Object builder, MemoryMode memoryMode) { try { + // Metrics if (builder instanceof OtlpGrpcMetricExporterBuilder) { // Calling getDeclaredMethod causes all private methods to be read, which causes a // ClassNotFoundException when running with the OkHttHttpProvider as the private @@ -237,9 +241,47 @@ public static void setMemoryModeOnOtlpMetricExporterBuilder( "setMemoryMode", MemoryMode.class); method.setAccessible(true); method.invoke(builder, memoryMode); + } else if (builder instanceof OtlpGrpcSpanExporterBuilder) { + // Calling getDeclaredMethod causes all private methods to be read, which causes a + // ClassNotFoundException when running with the OkHttHttpProvider as the private + // setManagedChanel(io.grpc.ManagedChannel) is reached and io.grpc.ManagedChannel is not on + // the classpath. io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanUtil provides a layer + // of indirection which avoids scanning the OtlpGrpcSpanExporterBuilder private methods. + Class otlpGrpcMetricUtil = + Class.forName("io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanUtil"); + Method method = + otlpGrpcMetricUtil.getDeclaredMethod( + "setMemoryMode", OtlpGrpcSpanExporterBuilder.class, MemoryMode.class); + method.setAccessible(true); + method.invoke(null, builder, memoryMode); + } else if (builder instanceof OtlpHttpSpanExporterBuilder) { + Method method = + OtlpHttpSpanExporterBuilder.class.getDeclaredMethod("setMemoryMode", MemoryMode.class); + method.setAccessible(true); + method.invoke(builder, memoryMode); + } else if (builder instanceof OtlpGrpcLogRecordExporterBuilder) { + // Calling getDeclaredMethod causes all private methods to be read, which causes a + // ClassNotFoundException when running with the OkHttHttpProvider as the private + // setManagedChanel(io.grpc.ManagedChannel) is reached and io.grpc.ManagedChannel is not on + // the classpath. io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogUtil provides a layer + // of indirection which avoids scanning the OtlpGrpcLogRecordExporterBuilder private + // methods. + Class otlpGrpcMetricUtil = + Class.forName("io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogUtil"); + Method method = + otlpGrpcMetricUtil.getDeclaredMethod( + "setMemoryMode", OtlpGrpcLogRecordExporterBuilder.class, MemoryMode.class); + method.setAccessible(true); + method.invoke(null, builder, memoryMode); + } else if (builder instanceof OtlpHttpLogRecordExporterBuilder) { + Method method = + OtlpHttpLogRecordExporterBuilder.class.getDeclaredMethod( + "setMemoryMode", MemoryMode.class); + method.setAccessible(true); + method.invoke(builder, memoryMode); } else { throw new IllegalArgumentException( - "Can only set memory mode on OtlpHttpMetricExporterBuilder and OtlpGrpcMetricExporterBuilder."); + "Cannot set memory mode. Unrecognized OTLP exporter builder"); } } catch (NoSuchMethodException | InvocationTargetException diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpLogRecordExporterProvider.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpLogRecordExporterProvider.java index 1694189d69b..04cf661b58b 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpLogRecordExporterProvider.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpLogRecordExporterProvider.java @@ -10,6 +10,7 @@ import static io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil.PROTOCOL_HTTP_PROTOBUF; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.ExporterBuilderUtil; import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter; import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporterBuilder; import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter; @@ -53,6 +54,9 @@ public LogRecordExporter createExporter(ConfigProperties config) { builder::setClientTls, builder::setRetryPolicy); builder.setMeterProvider(meterProviderRef::get); + ExporterBuilderUtil.configureExporterMemoryMode( + config, + memoryMode -> OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode)); return builder.build(); } else if (protocol.equals(PROTOCOL_GRPC)) { @@ -69,6 +73,9 @@ public LogRecordExporter createExporter(ConfigProperties config) { builder::setClientTls, builder::setRetryPolicy); builder.setMeterProvider(meterProviderRef::get); + ExporterBuilderUtil.configureExporterMemoryMode( + config, + memoryMode -> OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode)); return builder.build(); } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpMetricExporterProvider.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpMetricExporterProvider.java index cfea314f028..490f4044bea 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpMetricExporterProvider.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpMetricExporterProvider.java @@ -51,8 +51,7 @@ public MetricExporter createExporter(ConfigProperties config) { config, builder::setDefaultAggregationSelector); ExporterBuilderUtil.configureExporterMemoryMode( config, - memoryMode -> - OtlpConfigUtil.setMemoryModeOnOtlpMetricExporterBuilder(builder, memoryMode)); + memoryMode -> OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode)); return builder.build(); } else if (protocol.equals(PROTOCOL_GRPC)) { @@ -74,8 +73,7 @@ public MetricExporter createExporter(ConfigProperties config) { config, builder::setDefaultAggregationSelector); ExporterBuilderUtil.configureExporterMemoryMode( config, - memoryMode -> - OtlpConfigUtil.setMemoryModeOnOtlpMetricExporterBuilder(builder, memoryMode)); + memoryMode -> OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode)); return builder.build(); } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpSpanExporterProvider.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpSpanExporterProvider.java index b4b8731b1bd..bdf9633a441 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpSpanExporterProvider.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpSpanExporterProvider.java @@ -10,6 +10,7 @@ import static io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil.PROTOCOL_HTTP_PROTOBUF; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.ExporterBuilderUtil; import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder; import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; @@ -52,7 +53,9 @@ public SpanExporter createExporter(ConfigProperties config) { builder::setClientTls, builder::setRetryPolicy); builder.setMeterProvider(meterProviderRef::get); - + ExporterBuilderUtil.configureExporterMemoryMode( + config, + memoryMode -> OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode)); return builder.build(); } else if (protocol.equals(PROTOCOL_GRPC)) { OtlpGrpcSpanExporterBuilder builder = grpcBuilder(); @@ -68,6 +71,9 @@ public SpanExporter createExporter(ConfigProperties config) { builder::setClientTls, builder::setRetryPolicy); builder.setMeterProvider(meterProviderRef::get); + ExporterBuilderUtil.configureExporterMemoryMode( + config, + memoryMode -> OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode)); return builder.build(); } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/MarshalerLogsServiceGrpc.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/MarshalerLogsServiceGrpc.java index 06d7da50bff..451e5abae32 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/MarshalerLogsServiceGrpc.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/MarshalerLogsServiceGrpc.java @@ -14,7 +14,7 @@ import io.grpc.stub.ClientCalls; import io.opentelemetry.exporter.internal.grpc.MarshalerInputStream; import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; -import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import java.io.InputStream; import javax.annotation.Nullable; @@ -23,15 +23,15 @@ final class MarshalerLogsServiceGrpc { private static final String SERVICE_NAME = "opentelemetry.proto.collector.logs.v1.LogsService"; - private static final MethodDescriptor.Marshaller REQUEST_MARSHALLER = - new MethodDescriptor.Marshaller() { + private static final MethodDescriptor.Marshaller REQUEST_MARSHALLER = + new MethodDescriptor.Marshaller() { @Override - public InputStream stream(LogsRequestMarshaler value) { + public InputStream stream(Marshaler value) { return new MarshalerInputStream(value); } @Override - public LogsRequestMarshaler parse(InputStream stream) { + public Marshaler parse(InputStream stream) { throw new UnsupportedOperationException("Only for serializing"); } }; @@ -49,14 +49,13 @@ public ExportLogsServiceResponse parse(InputStream stream) { } }; - private static final MethodDescriptor - getExportMethod = - MethodDescriptor.newBuilder() - .setType(MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "Export")) - .setRequestMarshaller(REQUEST_MARSHALLER) - .setResponseMarshaller(RESPONSE_MARSHALER) - .build(); + private static final MethodDescriptor getExportMethod = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName(generateFullMethodName(SERVICE_NAME, "Export")) + .setRequestMarshaller(REQUEST_MARSHALLER) + .setResponseMarshaller(RESPONSE_MARSHALER) + .build(); static LogsServiceFutureStub newFutureStub(Channel channel, @Nullable String authorityOverride) { return LogsServiceFutureStub.newStub( @@ -65,8 +64,7 @@ static LogsServiceFutureStub newFutureStub(Channel channel, @Nullable String aut } static final class LogsServiceFutureStub - extends MarshalerServiceStub< - LogsRequestMarshaler, ExportLogsServiceResponse, LogsServiceFutureStub> { + extends MarshalerServiceStub { private LogsServiceFutureStub(Channel channel, CallOptions callOptions) { super(channel, callOptions); } @@ -78,7 +76,7 @@ protected MarshalerLogsServiceGrpc.LogsServiceFutureStub build( } @Override - public ListenableFuture export(LogsRequestMarshaler request) { + public ListenableFuture export(Marshaler request) { return ClientCalls.futureUnaryCall( getChannel().newCall(getExportMethod, getCallOptions()), request); } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporter.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporter.java index a97a9553d13..efde0010450 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporter.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporter.java @@ -7,11 +7,17 @@ import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; +import io.opentelemetry.exporter.internal.otlp.logs.LowAllocationLogsRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; +import java.util.ArrayDeque; import java.util.Collection; +import java.util.Deque; +import java.util.StringJoiner; import javax.annotation.concurrent.ThreadSafe; /** @@ -22,8 +28,10 @@ @ThreadSafe public final class OtlpGrpcLogRecordExporter implements LogRecordExporter { - private final GrpcExporterBuilder builder; - private final GrpcExporter delegate; + private final Deque marshalerPool = new ArrayDeque<>(); + private final GrpcExporterBuilder builder; + private final GrpcExporter delegate; + private final MemoryMode memoryMode; /** * Returns a new {@link OtlpGrpcLogRecordExporter} using the default values. @@ -47,10 +55,12 @@ public static OtlpGrpcLogRecordExporterBuilder builder() { } OtlpGrpcLogRecordExporter( - GrpcExporterBuilder builder, - GrpcExporter delegate) { + GrpcExporterBuilder builder, + GrpcExporter delegate, + MemoryMode memoryMode) { this.builder = builder; this.delegate = delegate; + this.memoryMode = memoryMode; } /** @@ -61,7 +71,7 @@ public static OtlpGrpcLogRecordExporterBuilder builder() { * @since 1.29.0 */ public OtlpGrpcLogRecordExporterBuilder toBuilder() { - return new OtlpGrpcLogRecordExporterBuilder(builder.copy()); + return new OtlpGrpcLogRecordExporterBuilder(builder.copy(), memoryMode); } /** @@ -72,6 +82,22 @@ public OtlpGrpcLogRecordExporterBuilder toBuilder() { */ @Override public CompletableResultCode export(Collection logs) { + if (memoryMode == MemoryMode.REUSABLE_DATA) { + LowAllocationLogsRequestMarshaler marshaler = marshalerPool.poll(); + if (marshaler == null) { + marshaler = new LowAllocationLogsRequestMarshaler(); + } + LowAllocationLogsRequestMarshaler exportMarshaler = marshaler; + exportMarshaler.initialize(logs); + return delegate + .export(exportMarshaler, logs.size()) + .whenComplete( + () -> { + exportMarshaler.reset(); + marshalerPool.add(exportMarshaler); + }); + } + // MemoryMode == MemoryMode.IMMUTABLE_DATA LogsRequestMarshaler request = LogsRequestMarshaler.create(logs); return delegate.export(request, logs.size()); } @@ -92,6 +118,9 @@ public CompletableResultCode shutdown() { @Override public String toString() { - return "OtlpGrpcLogRecordExporter{" + builder.toString(false) + "}"; + StringJoiner joiner = new StringJoiner(", ", "OtlpGrpcLogRecordExporter{", "}"); + joiner.add(builder.toString(false)); + joiner.add("memoryMode=" + memoryMode); + return joiner.toString(); } } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java index df5f4769588..58cfc8e3c61 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java @@ -15,8 +15,9 @@ import io.opentelemetry.exporter.internal.compression.CompressorProvider; import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; -import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.net.URI; import java.time.Duration; @@ -41,12 +42,15 @@ public final class OtlpGrpcLogRecordExporterBuilder { private static final String DEFAULT_ENDPOINT_URL = "http://localhost:4317"; private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL); private static final long DEFAULT_TIMEOUT_SECS = 10; + private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA; // Visible for testing - final GrpcExporterBuilder delegate; + final GrpcExporterBuilder delegate; + private MemoryMode memoryMode; - OtlpGrpcLogRecordExporterBuilder(GrpcExporterBuilder delegate) { + OtlpGrpcLogRecordExporterBuilder(GrpcExporterBuilder delegate, MemoryMode memoryMode) { this.delegate = delegate; + this.memoryMode = memoryMode; OtlpUserAgent.addUserAgentHeader(delegate::addConstantHeader); } @@ -58,7 +62,8 @@ public final class OtlpGrpcLogRecordExporterBuilder { DEFAULT_TIMEOUT_SECS, DEFAULT_ENDPOINT, () -> MarshalerLogsServiceGrpc::newFutureStub, - GRPC_ENDPOINT_PATH)); + GRPC_ENDPOINT_PATH), + DEFAULT_MEMORY_MODE); } /** @@ -238,12 +243,19 @@ public OtlpGrpcLogRecordExporterBuilder setMeterProvider( return this; } + /** Set the {@link MemoryMode}. */ + OtlpGrpcLogRecordExporterBuilder setMemoryMode(MemoryMode memoryMode) { + requireNonNull(memoryMode, "memoryMode"); + this.memoryMode = memoryMode; + return this; + } + /** * Constructs a new instance of the exporter based on the builder's values. * * @return a new exporter's instance */ public OtlpGrpcLogRecordExporter build() { - return new OtlpGrpcLogRecordExporter(delegate, delegate.build()); + return new OtlpGrpcLogRecordExporter(delegate, delegate.build(), memoryMode); } } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogUtil.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogUtil.java new file mode 100644 index 00000000000..1602514e8d5 --- /dev/null +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogUtil.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.logs; + +import io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil; +import io.opentelemetry.sdk.common.export.MemoryMode; + +final class OtlpGrpcLogUtil { + + private OtlpGrpcLogUtil() {} + + /** See {@link OtlpConfigUtil#setMemoryModeOnOtlpExporterBuilder(Object, MemoryMode)}. */ + static void setMemoryMode(OtlpGrpcLogRecordExporterBuilder builder, MemoryMode memoryMode) { + builder.setMemoryMode(memoryMode); + } +} diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/MarshalerMetricsServiceGrpc.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/MarshalerMetricsServiceGrpc.java index 240ad81beb2..af70c6bd175 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/MarshalerMetricsServiceGrpc.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/MarshalerMetricsServiceGrpc.java @@ -14,7 +14,7 @@ import io.grpc.stub.ClientCalls; import io.opentelemetry.exporter.internal.grpc.MarshalerInputStream; import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; -import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import java.io.InputStream; import javax.annotation.Nullable; @@ -24,15 +24,15 @@ final class MarshalerMetricsServiceGrpc { private static final String SERVICE_NAME = "opentelemetry.proto.collector.metrics.v1.MetricsService"; - private static final MethodDescriptor.Marshaller REQUEST_MARSHALLER = - new MethodDescriptor.Marshaller() { + private static final MethodDescriptor.Marshaller REQUEST_MARSHALLER = + new MethodDescriptor.Marshaller() { @Override - public InputStream stream(MetricsRequestMarshaler value) { + public InputStream stream(Marshaler value) { return new MarshalerInputStream(value); } @Override - public MetricsRequestMarshaler parse(InputStream stream) { + public Marshaler parse(InputStream stream) { throw new UnsupportedOperationException("Only for serializing"); } }; @@ -51,14 +51,13 @@ public ExportMetricsServiceResponse parse(InputStream stream) { } }; - private static final MethodDescriptor - getExportMethod = - MethodDescriptor.newBuilder() - .setType(MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "Export")) - .setRequestMarshaller(REQUEST_MARSHALLER) - .setResponseMarshaller(RESPONSE_MARSHALER) - .build(); + private static final MethodDescriptor getExportMethod = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName(generateFullMethodName(SERVICE_NAME, "Export")) + .setRequestMarshaller(REQUEST_MARSHALLER) + .setResponseMarshaller(RESPONSE_MARSHALER) + .build(); static MetricsServiceFutureStub newFutureStub( Channel channel, @Nullable String authorityOverride) { @@ -69,7 +68,7 @@ static MetricsServiceFutureStub newFutureStub( static final class MetricsServiceFutureStub extends MarshalerServiceStub< - MetricsRequestMarshaler, ExportMetricsServiceResponse, MetricsServiceFutureStub> { + Marshaler, ExportMetricsServiceResponse, MetricsServiceFutureStub> { private MetricsServiceFutureStub(Channel channel, CallOptions callOptions) { super(channel, callOptions); } @@ -81,7 +80,7 @@ protected MarshalerMetricsServiceGrpc.MetricsServiceFutureStub build( } @Override - public ListenableFuture export(MetricsRequestMarshaler request) { + public ListenableFuture export(Marshaler request) { return ClientCalls.futureUnaryCall( getChannel().newCall(getExportMethod, getCallOptions()), request); } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporter.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporter.java index eeeba0773b7..1a9d3ed20e2 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporter.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporter.java @@ -7,6 +7,8 @@ import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.otlp.metrics.LowAllocationMetricsRequestMarshaler; import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.MemoryMode; @@ -17,7 +19,9 @@ import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector; import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.util.ArrayDeque; import java.util.Collection; +import java.util.Deque; import java.util.StringJoiner; import javax.annotation.concurrent.ThreadSafe; @@ -29,8 +33,9 @@ @ThreadSafe public final class OtlpGrpcMetricExporter implements MetricExporter { - private final GrpcExporterBuilder builder; - private final GrpcExporter delegate; + private final Deque marshalerPool = new ArrayDeque<>(); + private final GrpcExporterBuilder builder; + private final GrpcExporter delegate; private final AggregationTemporalitySelector aggregationTemporalitySelector; private final DefaultAggregationSelector defaultAggregationSelector; private final MemoryMode memoryMode; @@ -57,8 +62,8 @@ public static OtlpGrpcMetricExporterBuilder builder() { } OtlpGrpcMetricExporter( - GrpcExporterBuilder builder, - GrpcExporter delegate, + GrpcExporterBuilder builder, + GrpcExporter delegate, AggregationTemporalitySelector aggregationTemporalitySelector, DefaultAggregationSelector defaultAggregationSelector, MemoryMode memoryMode) { @@ -103,8 +108,23 @@ public MemoryMode getMemoryMode() { */ @Override public CompletableResultCode export(Collection metrics) { + if (memoryMode == MemoryMode.REUSABLE_DATA) { + LowAllocationMetricsRequestMarshaler marshaler = marshalerPool.poll(); + if (marshaler == null) { + marshaler = new LowAllocationMetricsRequestMarshaler(); + } + LowAllocationMetricsRequestMarshaler exportMarshaler = marshaler; + exportMarshaler.initialize(metrics); + return delegate + .export(exportMarshaler, metrics.size()) + .whenComplete( + () -> { + exportMarshaler.reset(); + marshalerPool.add(exportMarshaler); + }); + } + // MemoryMode == MemoryMode.IMMUTABLE_DATA MetricsRequestMarshaler request = MetricsRequestMarshaler.create(metrics); - return delegate.export(request, metrics.size()); } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java index 782907cd27c..343e2a5dd9b 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java @@ -14,7 +14,7 @@ import io.opentelemetry.exporter.internal.compression.CompressorProvider; import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; -import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.common.export.RetryPolicy; @@ -50,7 +50,7 @@ public final class OtlpGrpcMetricExporterBuilder { private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA; // Visible for testing - final GrpcExporterBuilder delegate; + final GrpcExporterBuilder delegate; private AggregationTemporalitySelector aggregationTemporalitySelector = DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR; @@ -59,8 +59,7 @@ public final class OtlpGrpcMetricExporterBuilder { DefaultAggregationSelector.getDefault(); private MemoryMode memoryMode; - OtlpGrpcMetricExporterBuilder( - GrpcExporterBuilder delegate, MemoryMode memoryMode) { + OtlpGrpcMetricExporterBuilder(GrpcExporterBuilder delegate, MemoryMode memoryMode) { this.delegate = delegate; this.memoryMode = memoryMode; delegate.setMeterProvider(MeterProvider::noop); diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricUtil.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricUtil.java index f99243cb0ba..7d4080b87ef 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricUtil.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricUtil.java @@ -12,7 +12,7 @@ final class OtlpGrpcMetricUtil { private OtlpGrpcMetricUtil() {} - /** See {@link OtlpConfigUtil#setMemoryModeOnOtlpMetricExporterBuilder(Object, MemoryMode)}. */ + /** See {@link OtlpConfigUtil#setMemoryModeOnOtlpExporterBuilder(Object, MemoryMode)}. */ static void setMemoryMode(OtlpGrpcMetricExporterBuilder builder, MemoryMode memoryMode) { builder.setMemoryMode(memoryMode); } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/MarshalerTraceServiceGrpc.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/MarshalerTraceServiceGrpc.java index 973bc37cb39..784eae98a49 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/MarshalerTraceServiceGrpc.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/MarshalerTraceServiceGrpc.java @@ -10,7 +10,7 @@ import io.grpc.MethodDescriptor; import io.opentelemetry.exporter.internal.grpc.MarshalerInputStream; import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; -import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import java.io.InputStream; import javax.annotation.Nullable; @@ -19,15 +19,15 @@ final class MarshalerTraceServiceGrpc { private static final String SERVICE_NAME = "opentelemetry.proto.collector.trace.v1.TraceService"; - private static final MethodDescriptor.Marshaller REQUEST_MARSHALLER = - new MethodDescriptor.Marshaller() { + private static final MethodDescriptor.Marshaller REQUEST_MARSHALLER = + new MethodDescriptor.Marshaller() { @Override - public InputStream stream(TraceRequestMarshaler value) { + public InputStream stream(Marshaler value) { return new MarshalerInputStream(value); } @Override - public TraceRequestMarshaler parse(InputStream stream) { + public Marshaler parse(InputStream stream) { throw new UnsupportedOperationException("Only for serializing"); } }; @@ -45,9 +45,9 @@ public ExportTraceServiceResponse parse(InputStream stream) { } }; - private static final io.grpc.MethodDescriptor + private static final io.grpc.MethodDescriptor getExportMethod = - io.grpc.MethodDescriptor.newBuilder() + io.grpc.MethodDescriptor.newBuilder() .setType(io.grpc.MethodDescriptor.MethodType.UNARY) .setFullMethodName(generateFullMethodName(SERVICE_NAME, "Export")) .setRequestMarshaller(REQUEST_MARSHALLER) @@ -62,8 +62,7 @@ static TraceServiceFutureStub newFutureStub( } static final class TraceServiceFutureStub - extends MarshalerServiceStub< - TraceRequestMarshaler, ExportTraceServiceResponse, TraceServiceFutureStub> { + extends MarshalerServiceStub { private TraceServiceFutureStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) { super(channel, callOptions); } @@ -76,7 +75,7 @@ protected MarshalerTraceServiceGrpc.TraceServiceFutureStub build( @Override public com.google.common.util.concurrent.ListenableFuture export( - TraceRequestMarshaler request) { + Marshaler request) { return io.grpc.stub.ClientCalls.futureUnaryCall( getChannel().newCall(getExportMethod, getCallOptions()), request); } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporter.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporter.java index a249c25e76f..a2c29d87bc1 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporter.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporter.java @@ -7,19 +7,27 @@ import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.otlp.traces.LowAllocationTraceRequestMarshaler; import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.ArrayDeque; import java.util.Collection; +import java.util.Deque; +import java.util.StringJoiner; import javax.annotation.concurrent.ThreadSafe; /** Exports spans using OTLP via gRPC, using OpenTelemetry's protobuf model. */ @ThreadSafe public final class OtlpGrpcSpanExporter implements SpanExporter { - private final GrpcExporterBuilder builder; - private final GrpcExporter delegate; + private final Deque marshalerPool = new ArrayDeque<>(); + private final GrpcExporterBuilder builder; + private final GrpcExporter delegate; + private final MemoryMode memoryMode; /** * Returns a new {@link OtlpGrpcSpanExporter} using the default values. @@ -43,10 +51,12 @@ public static OtlpGrpcSpanExporterBuilder builder() { } OtlpGrpcSpanExporter( - GrpcExporterBuilder builder, - GrpcExporter delegate) { + GrpcExporterBuilder builder, + GrpcExporter delegate, + MemoryMode memoryMode) { this.builder = builder; this.delegate = delegate; + this.memoryMode = memoryMode; } /** @@ -57,7 +67,7 @@ public static OtlpGrpcSpanExporterBuilder builder() { * @since 1.29.0 */ public OtlpGrpcSpanExporterBuilder toBuilder() { - return new OtlpGrpcSpanExporterBuilder(builder.copy()); + return new OtlpGrpcSpanExporterBuilder(builder.copy(), memoryMode); } /** @@ -68,8 +78,23 @@ public OtlpGrpcSpanExporterBuilder toBuilder() { */ @Override public CompletableResultCode export(Collection spans) { + if (memoryMode == MemoryMode.REUSABLE_DATA) { + LowAllocationTraceRequestMarshaler marshaler = marshalerPool.poll(); + if (marshaler == null) { + marshaler = new LowAllocationTraceRequestMarshaler(); + } + LowAllocationTraceRequestMarshaler exportMarshaler = marshaler; + exportMarshaler.initialize(spans); + return delegate + .export(exportMarshaler, spans.size()) + .whenComplete( + () -> { + exportMarshaler.reset(); + marshalerPool.add(exportMarshaler); + }); + } + // MemoryMode == MemoryMode.IMMUTABLE_DATA TraceRequestMarshaler request = TraceRequestMarshaler.create(spans); - return delegate.export(request, spans.size()); } @@ -94,6 +119,9 @@ public CompletableResultCode shutdown() { @Override public String toString() { - return "OtlpGrpcSpanExporter{" + builder.toString(false) + "}"; + StringJoiner joiner = new StringJoiner(", ", "OtlpGrpcSpanExporter{", "}"); + joiner.add(builder.toString(false)); + joiner.add("memoryMode=" + memoryMode); + return joiner.toString(); } } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java index 9f48078b701..20ad734da43 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java @@ -15,8 +15,9 @@ import io.opentelemetry.exporter.internal.compression.CompressorProvider; import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; -import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.net.URI; import java.time.Duration; @@ -37,12 +38,15 @@ public final class OtlpGrpcSpanExporterBuilder { private static final String DEFAULT_ENDPOINT_URL = "http://localhost:4317"; private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL); private static final long DEFAULT_TIMEOUT_SECS = 10; + private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA; // Visible for testing - final GrpcExporterBuilder delegate; + final GrpcExporterBuilder delegate; + private MemoryMode memoryMode; - OtlpGrpcSpanExporterBuilder(GrpcExporterBuilder delegate) { + OtlpGrpcSpanExporterBuilder(GrpcExporterBuilder delegate, MemoryMode memoryMode) { this.delegate = delegate; + this.memoryMode = memoryMode; OtlpUserAgent.addUserAgentHeader(delegate::addConstantHeader); } @@ -54,7 +58,8 @@ public final class OtlpGrpcSpanExporterBuilder { DEFAULT_TIMEOUT_SECS, DEFAULT_ENDPOINT, () -> MarshalerTraceServiceGrpc::newFutureStub, - GRPC_ENDPOINT_PATH)); + GRPC_ENDPOINT_PATH), + DEFAULT_MEMORY_MODE); } /** @@ -235,12 +240,19 @@ public OtlpGrpcSpanExporterBuilder setMeterProvider( return this; } + /** Set the {@link MemoryMode}. */ + OtlpGrpcSpanExporterBuilder setMemoryMode(MemoryMode memoryMode) { + requireNonNull(memoryMode, "memoryMode"); + this.memoryMode = memoryMode; + return this; + } + /** * Constructs a new instance of the exporter based on the builder's values. * * @return a new exporter's instance */ public OtlpGrpcSpanExporter build() { - return new OtlpGrpcSpanExporter(delegate, delegate.build()); + return new OtlpGrpcSpanExporter(delegate, delegate.build(), memoryMode); } } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanUtil.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanUtil.java new file mode 100644 index 00000000000..25955ffa265 --- /dev/null +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanUtil.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.otlp.trace; + +import io.opentelemetry.exporter.otlp.internal.OtlpConfigUtil; +import io.opentelemetry.sdk.common.export.MemoryMode; + +final class OtlpGrpcSpanUtil { + + private OtlpGrpcSpanUtil() {} + + /** See {@link OtlpConfigUtil#setMemoryModeOnOtlpExporterBuilder(Object, MemoryMode)}. */ + static void setMemoryMode(OtlpGrpcSpanExporterBuilder builder, MemoryMode memoryMode) { + builder.setMemoryMode(memoryMode); + } +} diff --git a/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/internal/OtlpLogRecordExporterProviderTest.java b/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/internal/OtlpLogRecordExporterProviderTest.java index 8bfb90e1714..5619ddf5aa4 100644 --- a/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/internal/OtlpLogRecordExporterProviderTest.java +++ b/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/internal/OtlpLogRecordExporterProviderTest.java @@ -20,6 +20,7 @@ import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder; import io.opentelemetry.sdk.autoconfigure.spi.ConfigurationException; import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.logs.export.LogRecordExporter; import java.io.IOException; import java.nio.file.Files; @@ -127,6 +128,7 @@ void createExporter_GrpcDefaults() { verify(grpcBuilder, never()).setTrustedCertificates(any()); verify(grpcBuilder, never()).setClientTls(any(), any()); assertThat(grpcBuilder).extracting("delegate").extracting("retryPolicy").isNull(); + assertThat(exporter).extracting("memoryMode").isEqualTo(MemoryMode.IMMUTABLE_DATA); } Mockito.verifyNoInteractions(httpBuilder); } @@ -176,6 +178,7 @@ void createExporter_GrpcWithSignalConfiguration() throws CertificateEncodingExce config.put("otel.exporter.otlp.logs.compression", "gzip"); config.put("otel.exporter.otlp.timeout", "1s"); config.put("otel.exporter.otlp.logs.timeout", "15s"); + config.put("otel.java.experimental.exporter.memory_mode", "reusable_data"); try (LogRecordExporter exporter = provider.createExporter(DefaultConfigProperties.createFromMap(config))) { @@ -188,6 +191,7 @@ void createExporter_GrpcWithSignalConfiguration() throws CertificateEncodingExce verify(grpcBuilder).setTrustedCertificates(serverTls.certificate().getEncoded()); verify(grpcBuilder) .setClientTls(clientTls.privateKey().getEncoded(), clientTls.certificate().getEncoded()); + assertThat(exporter).extracting("memoryMode").isEqualTo(MemoryMode.REUSABLE_DATA); } Mockito.verifyNoInteractions(httpBuilder); } @@ -207,6 +211,7 @@ void createExporter_HttpDefaults() { verify(httpBuilder, never()).setTrustedCertificates(any()); verify(httpBuilder, never()).setClientTls(any(), any()); assertThat(httpBuilder).extracting("delegate").extracting("retryPolicy").isNull(); + assertThat(exporter).extracting("memoryMode").isEqualTo(MemoryMode.IMMUTABLE_DATA); } Mockito.verifyNoInteractions(grpcBuilder); } @@ -259,6 +264,7 @@ void createExporter_HttpWithSignalConfiguration() throws CertificateEncodingExce config.put("otel.exporter.otlp.logs.compression", "gzip"); config.put("otel.exporter.otlp.timeout", "1s"); config.put("otel.exporter.otlp.logs.timeout", "15s"); + config.put("otel.java.experimental.exporter.memory_mode", "reusable_data"); try (LogRecordExporter exporter = provider.createExporter(DefaultConfigProperties.createFromMap(config))) { @@ -271,6 +277,7 @@ void createExporter_HttpWithSignalConfiguration() throws CertificateEncodingExce verify(httpBuilder).setTrustedCertificates(serverTls.certificate().getEncoded()); verify(httpBuilder) .setClientTls(clientTls.privateKey().getEncoded(), clientTls.certificate().getEncoded()); + assertThat(exporter).extracting("memoryMode").isEqualTo(MemoryMode.REUSABLE_DATA); } Mockito.verifyNoInteractions(grpcBuilder); } diff --git a/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/internal/OtlpSpanExporterProviderTest.java b/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/internal/OtlpSpanExporterProviderTest.java index 1918408e75b..b2b67b56b61 100644 --- a/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/internal/OtlpSpanExporterProviderTest.java +++ b/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/internal/OtlpSpanExporterProviderTest.java @@ -20,6 +20,7 @@ import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder; import io.opentelemetry.sdk.autoconfigure.spi.ConfigurationException; import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties; +import io.opentelemetry.sdk.common.export.MemoryMode; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.io.IOException; import java.nio.file.Files; @@ -128,6 +129,7 @@ void createExporter_GrpcDefaults() { verify(grpcBuilder, never()).setTrustedCertificates(any()); verify(grpcBuilder, never()).setClientTls(any(), any()); assertThat(grpcBuilder).extracting("delegate").extracting("retryPolicy").isNull(); + assertThat(exporter).extracting("memoryMode").isEqualTo(MemoryMode.IMMUTABLE_DATA); } Mockito.verifyNoInteractions(httpBuilder); } @@ -177,6 +179,7 @@ void createExporter_GrpcWithSignalConfiguration() throws CertificateEncodingExce config.put("otel.exporter.otlp.traces.compression", "gzip"); config.put("otel.exporter.otlp.timeout", "1s"); config.put("otel.exporter.otlp.traces.timeout", "15s"); + config.put("otel.java.experimental.exporter.memory_mode", "reusable_data"); try (SpanExporter exporter = provider.createExporter(DefaultConfigProperties.createFromMap(config))) { @@ -189,6 +192,7 @@ void createExporter_GrpcWithSignalConfiguration() throws CertificateEncodingExce verify(grpcBuilder).setTrustedCertificates(serverTls.certificate().getEncoded()); verify(grpcBuilder) .setClientTls(clientTls.privateKey().getEncoded(), clientTls.certificate().getEncoded()); + assertThat(exporter).extracting("memoryMode").isEqualTo(MemoryMode.REUSABLE_DATA); } Mockito.verifyNoInteractions(httpBuilder); } @@ -208,6 +212,7 @@ void createExporter_HttpDefaults() { verify(httpBuilder, never()).setTrustedCertificates(any()); verify(httpBuilder, never()).setClientTls(any(), any()); assertThat(httpBuilder).extracting("delegate").extracting("retryPolicy").isNull(); + assertThat(exporter).extracting("memoryMode").isEqualTo(MemoryMode.IMMUTABLE_DATA); } Mockito.verifyNoInteractions(grpcBuilder); } @@ -239,6 +244,7 @@ void createExporter_HttpWithGeneralConfiguration() throws CertificateEncodingExc verify(httpBuilder) .setClientTls(clientTls.privateKey().getEncoded(), clientTls.certificate().getEncoded()); assertThat(httpBuilder).extracting("delegate").extracting("retryPolicy").isNotNull(); + assertThat(exporter).extracting("memoryMode").isEqualTo(MemoryMode.IMMUTABLE_DATA); } Mockito.verifyNoInteractions(grpcBuilder); } @@ -262,6 +268,7 @@ void createExporter_HttpWithSignalConfiguration() throws CertificateEncodingExce config.put("otel.exporter.otlp.traces.compression", "gzip"); config.put("otel.exporter.otlp.timeout", "1s"); config.put("otel.exporter.otlp.traces.timeout", "15s"); + config.put("otel.java.experimental.exporter.memory_mode", "reusable_data"); try (SpanExporter exporter = provider.createExporter(DefaultConfigProperties.createFromMap(config))) { @@ -274,6 +281,7 @@ void createExporter_HttpWithSignalConfiguration() throws CertificateEncodingExce verify(httpBuilder).setTrustedCertificates(serverTls.certificate().getEncoded()); verify(httpBuilder) .setClientTls(clientTls.privateKey().getEncoded(), clientTls.certificate().getEncoded()); + assertThat(exporter).extracting("memoryMode").isEqualTo(MemoryMode.REUSABLE_DATA); } Mockito.verifyNoInteractions(grpcBuilder); } diff --git a/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterOkHttpSenderTest.java b/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterOkHttpSenderTest.java index 4752832bbc0..dbe9f9a816a 100644 --- a/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterOkHttpSenderTest.java +++ b/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterOkHttpSenderTest.java @@ -5,6 +5,8 @@ package io.opentelemetry.exporter.otlp.http.trace; +import static org.assertj.core.api.Assertions.assertThat; + import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler; import io.opentelemetry.exporter.otlp.testing.internal.AbstractHttpTelemetryExporterTest; @@ -14,7 +16,10 @@ import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder; import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; class OtlpHttpSpanExporterOkHttpSenderTest extends AbstractHttpTelemetryExporterTest { @@ -23,6 +28,31 @@ protected OtlpHttpSpanExporterOkHttpSenderTest() { super("span", "/v1/traces", ResourceSpans.getDefaultInstance()); } + /** Test configuration specific to metric exporter. */ + @Test + void stringRepresentation() { + try (SpanExporter spanExporter = OtlpHttpSpanExporter.builder().build()) { + assertThat(spanExporter.toString()) + .matches( + "OtlpHttpSpanExporter\\{" + + "exporterName=otlp, " + + "type=span, " + + "endpoint=http://localhost:4318/v1/traces, " + + "timeoutNanos=" + + TimeUnit.SECONDS.toNanos(10) + + ", " + + "proxyOptions=null, " + + "compressorEncoding=null, " + + "connectTimeoutNanos=" + + TimeUnit.SECONDS.toNanos(10) + + ", " + + "exportAsJson=false, " + + "headers=Headers\\{User-Agent=OBFUSCATED\\}, " + + "memoryMode=IMMUTABLE_DATA" + + "\\}"); + } + } + @Override protected TelemetryExporterBuilder exporterBuilder() { return new HttpSpanExporterBuilderWrapper(OtlpHttpSpanExporter.builder()); diff --git a/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/traces/OtlpGrpcSpanExporterTest.java b/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/traces/OtlpGrpcSpanExporterTest.java index a74b5e39977..05b8523ebd4 100644 --- a/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/traces/OtlpGrpcSpanExporterTest.java +++ b/exporters/otlp/all/src/testDefaultSender/java/io/opentelemetry/exporter/otlp/traces/OtlpGrpcSpanExporterTest.java @@ -17,8 +17,10 @@ import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSender; import io.opentelemetry.proto.trace.v1.ResourceSpans; import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; import java.io.Closeable; import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; class OtlpGrpcSpanExporterTest extends AbstractGrpcTelemetryExporterTest { @@ -27,6 +29,30 @@ class OtlpGrpcSpanExporterTest extends AbstractGrpcTelemetryExporterTest { +public final class KeyValueStatelessMarshaler implements StatelessMarshaler { - static final KeyValueStatelessMarshaler INSTANCE = new KeyValueStatelessMarshaler(); + public static final KeyValueStatelessMarshaler INSTANCE = new KeyValueStatelessMarshaler(); private static final byte[] EMPTY_BYTES = new byte[0]; private KeyValueStatelessMarshaler() {} diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExemplarStatelessMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExemplarStatelessMarshaler.java index 3982b697ee0..9959beb7160 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExemplarStatelessMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExemplarStatelessMarshaler.java @@ -14,7 +14,7 @@ import io.opentelemetry.exporter.internal.marshal.Serializer; import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler; import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil; -import io.opentelemetry.exporter.internal.otlp.KeyValueStatelessMarshaler; +import io.opentelemetry.exporter.internal.otlp.AttributeKeyValueStatelessMarshaler; import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.LongExemplarData; @@ -52,7 +52,7 @@ public void writeTo(Serializer output, ExemplarData exemplar, MarshalerContext c output.serializeRepeatedMessageWithContext( io.opentelemetry.proto.metrics.v1.internal.Exemplar.FILTERED_ATTRIBUTES, exemplar.getFilteredAttributes(), - KeyValueStatelessMarshaler.INSTANCE, + AttributeKeyValueStatelessMarshaler.INSTANCE, context); } @@ -85,7 +85,7 @@ public int getBinarySerializedSize(ExemplarData exemplar, MarshalerContext conte StatelessMarshalerUtil.sizeRepeatedMessageWithContext( io.opentelemetry.proto.metrics.v1.internal.Exemplar.FILTERED_ATTRIBUTES, exemplar.getFilteredAttributes(), - KeyValueStatelessMarshaler.INSTANCE, + AttributeKeyValueStatelessMarshaler.INSTANCE, context); return size; diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExponentialHistogramDataPointStatelessMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExponentialHistogramDataPointStatelessMarshaler.java index 23117aab5d5..bbf2a1d6881 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExponentialHistogramDataPointStatelessMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/ExponentialHistogramDataPointStatelessMarshaler.java @@ -10,7 +10,7 @@ import io.opentelemetry.exporter.internal.marshal.Serializer; import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler; import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil; -import io.opentelemetry.exporter.internal.otlp.KeyValueStatelessMarshaler; +import io.opentelemetry.exporter.internal.otlp.AttributeKeyValueStatelessMarshaler; import io.opentelemetry.proto.metrics.v1.internal.ExponentialHistogramDataPoint; import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; import java.io.IOException; @@ -58,7 +58,7 @@ public void writeTo( output.serializeRepeatedMessageWithContext( ExponentialHistogramDataPoint.ATTRIBUTES, point.getAttributes(), - KeyValueStatelessMarshaler.INSTANCE, + AttributeKeyValueStatelessMarshaler.INSTANCE, context); } @@ -105,7 +105,7 @@ public int getBinarySerializedSize( StatelessMarshalerUtil.sizeRepeatedMessageWithContext( ExponentialHistogramDataPoint.ATTRIBUTES, point.getAttributes(), - KeyValueStatelessMarshaler.INSTANCE, + AttributeKeyValueStatelessMarshaler.INSTANCE, context); return size; diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/HistogramDataPointStatelessMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/HistogramDataPointStatelessMarshaler.java index 2102aa8bf35..c078a9739f4 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/HistogramDataPointStatelessMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/HistogramDataPointStatelessMarshaler.java @@ -10,7 +10,7 @@ import io.opentelemetry.exporter.internal.marshal.Serializer; import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler; import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil; -import io.opentelemetry.exporter.internal.otlp.KeyValueStatelessMarshaler; +import io.opentelemetry.exporter.internal.otlp.AttributeKeyValueStatelessMarshaler; import io.opentelemetry.proto.metrics.v1.internal.HistogramDataPoint; import io.opentelemetry.sdk.metrics.data.HistogramPointData; import java.io.IOException; @@ -45,7 +45,7 @@ public void writeTo(Serializer output, HistogramPointData point, MarshalerContex output.serializeRepeatedMessageWithContext( HistogramDataPoint.ATTRIBUTES, point.getAttributes(), - KeyValueStatelessMarshaler.INSTANCE, + AttributeKeyValueStatelessMarshaler.INSTANCE, context); } @@ -77,7 +77,7 @@ public int getBinarySerializedSize(HistogramPointData point, MarshalerContext co StatelessMarshalerUtil.sizeRepeatedMessageWithContext( HistogramDataPoint.ATTRIBUTES, point.getAttributes(), - KeyValueStatelessMarshaler.INSTANCE, + AttributeKeyValueStatelessMarshaler.INSTANCE, context); return size; } diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/NumberDataPointStatelessMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/NumberDataPointStatelessMarshaler.java index 2981e8cd598..c907d59a7e7 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/NumberDataPointStatelessMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/NumberDataPointStatelessMarshaler.java @@ -13,7 +13,7 @@ import io.opentelemetry.exporter.internal.marshal.Serializer; import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler; import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil; -import io.opentelemetry.exporter.internal.otlp.KeyValueStatelessMarshaler; +import io.opentelemetry.exporter.internal.otlp.AttributeKeyValueStatelessMarshaler; import io.opentelemetry.proto.metrics.v1.internal.NumberDataPoint; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.LongPointData; @@ -45,7 +45,7 @@ public void writeTo(Serializer output, PointData point, MarshalerContext context output.serializeRepeatedMessageWithContext( NumberDataPoint.ATTRIBUTES, point.getAttributes(), - KeyValueStatelessMarshaler.INSTANCE, + AttributeKeyValueStatelessMarshaler.INSTANCE, context); } @@ -71,7 +71,7 @@ public int getBinarySerializedSize(PointData point, MarshalerContext context) { StatelessMarshalerUtil.sizeRepeatedMessageWithContext( NumberDataPoint.ATTRIBUTES, point.getAttributes(), - KeyValueStatelessMarshaler.INSTANCE, + AttributeKeyValueStatelessMarshaler.INSTANCE, context); return size; } diff --git a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/SummaryDataPointStatelessMarshaler.java b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/SummaryDataPointStatelessMarshaler.java index d8d611c12fb..bcd2306bb13 100644 --- a/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/SummaryDataPointStatelessMarshaler.java +++ b/exporters/otlp/common/src/main/java/io/opentelemetry/exporter/internal/otlp/metrics/SummaryDataPointStatelessMarshaler.java @@ -10,7 +10,7 @@ import io.opentelemetry.exporter.internal.marshal.Serializer; import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler; import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil; -import io.opentelemetry.exporter.internal.otlp.KeyValueStatelessMarshaler; +import io.opentelemetry.exporter.internal.otlp.AttributeKeyValueStatelessMarshaler; import io.opentelemetry.proto.metrics.v1.internal.SummaryDataPoint; import io.opentelemetry.sdk.metrics.data.SummaryPointData; import java.io.IOException; @@ -37,7 +37,7 @@ public void writeTo(Serializer output, SummaryPointData point, MarshalerContext output.serializeRepeatedMessageWithContext( SummaryDataPoint.ATTRIBUTES, point.getAttributes(), - KeyValueStatelessMarshaler.INSTANCE, + AttributeKeyValueStatelessMarshaler.INSTANCE, context); } @@ -60,7 +60,7 @@ public int getBinarySerializedSize(SummaryPointData point, MarshalerContext cont StatelessMarshalerUtil.sizeRepeatedMessageWithContext( SummaryDataPoint.ATTRIBUTES, point.getAttributes(), - KeyValueStatelessMarshaler.INSTANCE, + AttributeKeyValueStatelessMarshaler.INSTANCE, context); return size; } diff --git a/integration-tests/otlp/src/main/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java b/integration-tests/otlp/src/main/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java index 1339734f5b8..5310162ec60 100644 --- a/integration-tests/otlp/src/main/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java +++ b/integration-tests/otlp/src/main/java/io/opentelemetry/integrationtest/OtlpExporterIntegrationTest.java @@ -350,7 +350,7 @@ void testOtlpGrpcMetricExport(String compression) { @EnumSource(MemoryMode.class) void testOtlpGrpcMetricExport_memoryMode(MemoryMode memoryMode) { OtlpGrpcMetricExporterBuilder builder = OtlpGrpcMetricExporter.builder(); - OtlpConfigUtil.setMemoryModeOnOtlpMetricExporterBuilder(builder, memoryMode); + OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode); MetricExporter exporter = builder @@ -403,7 +403,7 @@ void testOtlpHttpMetricExport(String compression) { @EnumSource(MemoryMode.class) void testOtlpHttpMetricExport_memoryMode(MemoryMode memoryMode) { OtlpHttpMetricExporterBuilder builder = OtlpHttpMetricExporter.builder(); - OtlpConfigUtil.setMemoryModeOnOtlpMetricExporterBuilder(builder, memoryMode); + OtlpConfigUtil.setMemoryModeOnOtlpExporterBuilder(builder, memoryMode); MetricExporter exporter = builder