From 8684882c06b91600ce52baaaefe1242f34aa45b8 Mon Sep 17 00:00:00 2001 From: jack-berg <34418638+jack-berg@users.noreply.github.com> Date: Wed, 31 Jan 2024 10:59:48 -0600 Subject: [PATCH] Add Compressor SPI support to OtlpGrpc{Signal}Exporters (#6103) --- .../internal/compression/CompressorUtil.java | 28 +++++++-------- .../internal/grpc/GrpcExporterBuilder.java | 16 +++++---- .../internal/grpc/GrpcSenderProvider.java | 3 +- .../reflect-config.json | 4 +++ .../grpc/GrpcExporterBuilderTest.java | 15 ++++---- .../otlp/trace/OltpExporterBenchmark.java | 2 +- .../OtlpHttpLogRecordExporterBuilder.java | 21 ++++------- .../OtlpHttpMetricExporterBuilder.java | 21 ++++------- .../trace/OtlpHttpSpanExporterBuilder.java | 21 ++++------- .../OtlpGrpcLogRecordExporterBuilder.java | 14 ++++---- .../OtlpGrpcMetricExporterBuilder.java | 14 ++++---- .../trace/OtlpGrpcSpanExporterBuilder.java | 14 ++++---- .../AbstractGrpcTelemetryExporterTest.java | 35 ++++++++++++++----- .../internal/UpstreamGrpcSenderProvider.java | 27 ++++++++++++-- .../okhttp/internal/GrpcRequestBody.java | 17 ++++----- .../okhttp/internal/OkHttpGrpcSender.java | 13 +++---- .../internal/OkHttpGrpcSenderProvider.java | 5 +-- .../internal/OkHttpGrpcSuppressionTest.java | 2 +- .../jaeger/sampler/OkHttpGrpcService.java | 2 +- 19 files changed, 156 insertions(+), 118 deletions(-) diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java index 6a777f759ba..9748ea508ad 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java @@ -5,11 +5,14 @@ package io.opentelemetry.exporter.internal.compression; -import java.util.Collections; +import static io.opentelemetry.api.internal.Utils.checkArgument; +import static java.util.stream.Collectors.joining; + import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; import java.util.Set; +import javax.annotation.Nullable; /** * Utilities for resolving SPI {@link Compressor}s. @@ -25,23 +28,20 @@ public final class CompressorUtil { private CompressorUtil() {} - /** Get list of loaded compressors, named according to {@link Compressor#getEncoding()}. */ - public static Set supportedCompressors() { - return Collections.unmodifiableSet(compressorRegistry.keySet()); - } - /** - * Resolve the {@link Compressor} with the {@link Compressor#getEncoding()} equal to the {@code - * encoding}. + * Validate that the {@code compressionMethod} is "none" or matches a registered compressor. * + * @return {@code null} if {@code compressionMethod} is "none" or the registered compressor * @throws IllegalArgumentException if no match is found */ - public static Compressor resolveCompressor(String encoding) { - Compressor compressor = compressorRegistry.get(encoding); - if (compressor == null) { - throw new IllegalArgumentException( - "Could not resolve compressor for encoding \"" + encoding + "\"."); - } + @Nullable + public static Compressor validateAndResolveCompressor(String compressionMethod) { + Set supportedEncodings = compressorRegistry.keySet(); + Compressor compressor = compressorRegistry.get(compressionMethod); + checkArgument( + "none".equals(compressionMethod) || compressor != null, + "Unsupported compressionMethod. Compression method must be \"none\" or one of: " + + supportedEncodings.stream().collect(joining(",", "[", "]"))); return compressor; } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java index bef1db06224..df9bb403099 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java @@ -12,6 +12,7 @@ import io.opentelemetry.api.metrics.MeterProvider; import io.opentelemetry.exporter.internal.ExporterBuilderUtil; import io.opentelemetry.exporter.internal.TlsConfigHelper; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.net.URI; @@ -21,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.ServiceLoader; import java.util.StringJoiner; import java.util.concurrent.TimeUnit; @@ -51,7 +53,7 @@ public class GrpcExporterBuilder { private long timeoutNanos; private URI endpoint; - private boolean compressionEnabled = false; + @Nullable private Compressor compressor; private final Map constantHeaders = new HashMap<>(); private Supplier> headerSupplier = Collections::emptyMap; private TlsConfigHelper tlsConfigHelper = new TlsConfigHelper(); @@ -95,8 +97,8 @@ public GrpcExporterBuilder setEndpoint(String endpoint) { return this; } - public GrpcExporterBuilder setCompression(String compressionMethod) { - this.compressionEnabled = compressionMethod.equals("gzip"); + public GrpcExporterBuilder setCompression(@Nullable Compressor compressor) { + this.compressor = compressor; return this; } @@ -150,7 +152,7 @@ public GrpcExporterBuilder copy() { copy.timeoutNanos = timeoutNanos; copy.endpoint = endpoint; - copy.compressionEnabled = compressionEnabled; + copy.compressor = compressor; copy.constantHeaders.putAll(constantHeaders); copy.headerSupplier = headerSupplier; copy.tlsConfigHelper = tlsConfigHelper.copy(); @@ -189,7 +191,7 @@ public GrpcExporter build() { grpcSenderProvider.createSender( endpoint, grpcEndpointPath, - compressionEnabled, + compressor, timeoutNanos, headerSupplier, grpcChannel, @@ -212,7 +214,9 @@ public String toString(boolean includePrefixAndSuffix) { joiner.add("endpoint=" + endpoint.toString()); joiner.add("endpointPath=" + grpcEndpointPath); joiner.add("timeoutNanos=" + timeoutNanos); - joiner.add("compressionEnabled=" + compressionEnabled); + joiner.add( + "compressorEncoding=" + + Optional.ofNullable(compressor).map(Compressor::getEncoding).orElse(null)); StringJoiner headersJoiner = new StringJoiner(", ", "Headers{", "}"); constantHeaders.forEach((key, value) -> headersJoiner.add(key + "=OBFUSCATED")); Map headers = headerSupplier.get(); diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java index c8c584f6e58..e7618e9bfaf 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java @@ -6,6 +6,7 @@ package io.opentelemetry.exporter.internal.grpc; import io.grpc.Channel; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.net.URI; @@ -31,7 +32,7 @@ public interface GrpcSenderProvider { GrpcSender createSender( URI endpoint, String endpointPath, - boolean compressionEnabled, + @Nullable Compressor compressor, long timeoutNanos, Supplier>> headersSupplier, @Nullable Object managedChannel, diff --git a/exporters/common/src/main/resources/META-INF/native-image/io.opentelemetry.opentelemetry-exporter-common/reflect-config.json b/exporters/common/src/main/resources/META-INF/native-image/io.opentelemetry.opentelemetry-exporter-common/reflect-config.json index 033ccf44f0c..1d93899460b 100644 --- a/exporters/common/src/main/resources/META-INF/native-image/io.opentelemetry.opentelemetry-exporter-common/reflect-config.json +++ b/exporters/common/src/main/resources/META-INF/native-image/io.opentelemetry.opentelemetry-exporter-common/reflect-config.json @@ -6,5 +6,9 @@ { "name":"io.opentelemetry.sdk.common.export.RetryPolicy", "queryAllDeclaredMethods":true + }, + { + "name":"io.opentelemetry.exporter.internal.compression.Compressor", + "queryAllDeclaredMethods":true } ] diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilderTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilderTest.java index dcf1ea64387..e562729ae1d 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilderTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilderTest.java @@ -7,6 +7,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.opentelemetry.exporter.internal.compression.GzipCompressor; import io.opentelemetry.exporter.internal.marshal.Marshaler; import java.net.URI; import org.junit.jupiter.api.BeforeEach; @@ -25,27 +26,27 @@ void setUp() { @Test void compressionDefault() { - assertThat(builder).extracting("compressionEnabled").isEqualTo(false); + assertThat(builder).extracting("compressor").isNull(); } @Test void compressionNone() { - builder.setCompression("none"); + builder.setCompression(null); - assertThat(builder).extracting("compressionEnabled").isEqualTo(false); + assertThat(builder).extracting("compressor").isNull(); } @Test void compressionGzip() { - builder.setCompression("gzip"); + builder.setCompression(GzipCompressor.getInstance()); - assertThat(builder).extracting("compressionEnabled").isEqualTo(true); + assertThat(builder).extracting("compressor").isEqualTo(GzipCompressor.getInstance()); } @Test void compressionEnabledAndDisabled() { - builder.setCompression("gzip").setCompression("none"); + builder.setCompression(GzipCompressor.getInstance()).setCompression(null); - assertThat(builder).extracting("compressionEnabled").isEqualTo(false); + assertThat(builder).extracting("compressor").isNull(); } } diff --git a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java index ef620333474..dd8ca40dba2 100644 --- a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java +++ b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java @@ -98,7 +98,7 @@ public void setUp() { URI.create("http://localhost:" + server.activeLocalPort()) .resolve(OtlpGrpcSpanExporterBuilder.GRPC_ENDPOINT_PATH) .toString(), - /* compressionEnabled= */ false, + null, 10, Collections::emptyMap, null, diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java index 5861cd15832..0b6253336b8 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java @@ -7,10 +7,11 @@ import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.joining; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.compression.Compressor; +import io.opentelemetry.exporter.internal.compression.CompressorProvider; import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; @@ -18,7 +19,6 @@ import io.opentelemetry.sdk.common.export.RetryPolicy; import java.time.Duration; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import javax.net.ssl.SSLContext; @@ -99,21 +99,14 @@ public OtlpHttpLogRecordExporterBuilder setEndpoint(String endpoint) { } /** - * Sets the method used to compress payloads. If unset, compression is disabled. Currently - * supported compression methods include "gzip" and "none". + * Sets the method used to compress payloads. If unset, compression is disabled. Compression + * method "gzip" and "none" are supported out of the box. Support for additional compression + * methods is available by implementing {@link Compressor} and {@link CompressorProvider}. */ public OtlpHttpLogRecordExporterBuilder setCompression(String compressionMethod) { requireNonNull(compressionMethod, "compressionMethod"); - if (compressionMethod.equals("none")) { - delegate.setCompression(null); - return this; - } - Set supportedCompressionMethods = CompressorUtil.supportedCompressors(); - checkArgument( - supportedCompressionMethods.contains(compressionMethod), - "Unsupported compressionMethod. Compression method must be \"none\" or one of: " - + supportedCompressionMethods.stream().collect(joining(",", "[", "]"))); - delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod)); + Compressor compressor = CompressorUtil.validateAndResolveCompressor(compressionMethod); + delegate.setCompression(compressor); return this; } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java index 9f97d7b3f9e..d4b2e57df88 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java @@ -7,9 +7,10 @@ import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.joining; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.compression.Compressor; +import io.opentelemetry.exporter.internal.compression.CompressorProvider; import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; @@ -21,7 +22,6 @@ import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.time.Duration; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import javax.net.ssl.SSLContext; @@ -111,21 +111,14 @@ public OtlpHttpMetricExporterBuilder setEndpoint(String endpoint) { } /** - * Sets the method used to compress payloads. If unset, compression is disabled. Currently - * supported compression methods include "gzip" and "none". + * Sets the method used to compress payloads. If unset, compression is disabled. Compression + * method "gzip" and "none" are supported out of the box. Support for additional compression + * methods is available by implementing {@link Compressor} and {@link CompressorProvider}. */ public OtlpHttpMetricExporterBuilder setCompression(String compressionMethod) { requireNonNull(compressionMethod, "compressionMethod"); - if (compressionMethod.equals("none")) { - delegate.setCompression(null); - return this; - } - Set supportedCompressionMethods = CompressorUtil.supportedCompressors(); - checkArgument( - supportedCompressionMethods.contains(compressionMethod), - "Unsupported compressionMethod. Compression method must be \"none\" or one of: " - + supportedCompressionMethods.stream().collect(joining(",", "[", "]"))); - delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod)); + Compressor compressor = CompressorUtil.validateAndResolveCompressor(compressionMethod); + delegate.setCompression(compressor); return this; } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java index ed5975aa4b2..c7e3549428a 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java @@ -7,10 +7,11 @@ import static io.opentelemetry.api.internal.Utils.checkArgument; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.joining; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.compression.Compressor; +import io.opentelemetry.exporter.internal.compression.CompressorProvider; import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; @@ -18,7 +19,6 @@ import io.opentelemetry.sdk.common.export.RetryPolicy; import java.time.Duration; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import javax.net.ssl.SSLContext; @@ -99,21 +99,14 @@ public OtlpHttpSpanExporterBuilder setEndpoint(String endpoint) { } /** - * Sets the method used to compress payloads. If unset, compression is disabled. Currently - * supported compression methods include "gzip" and "none". + * Sets the method used to compress payloads. If unset, compression is disabled. Compression + * method "gzip" and "none" are supported out of the box. Support for additional compression + * methods is available by implementing {@link Compressor} and {@link CompressorProvider}. */ public OtlpHttpSpanExporterBuilder setCompression(String compressionMethod) { requireNonNull(compressionMethod, "compressionMethod"); - if (compressionMethod.equals("none")) { - delegate.setCompression(null); - return this; - } - Set supportedCompressionMethods = CompressorUtil.supportedCompressors(); - checkArgument( - supportedCompressionMethods.contains(compressionMethod), - "Unsupported compressionMethod. Compression method must be \"none\" or one of: " - + supportedCompressionMethods.stream().collect(joining(",", "[", "]"))); - delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod)); + Compressor compressor = CompressorUtil.validateAndResolveCompressor(compressionMethod); + delegate.setCompression(compressor); return this; } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java index b04014fc729..3a50b3e1d1c 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java @@ -11,6 +11,9 @@ import io.grpc.ManagedChannel; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.compression.Compressor; +import io.opentelemetry.exporter.internal.compression.CompressorProvider; +import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; @@ -109,15 +112,14 @@ public OtlpGrpcLogRecordExporterBuilder setEndpoint(String endpoint) { } /** - * Sets the method used to compress payloads. If unset, compression is disabled. Currently - * supported compression methods include "gzip" and "none". + * Sets the method used to compress payloads. If unset, compression is disabled. Compression + * method "gzip" and "none" are supported out of the box. Support for additional compression + * methods is available by implementing {@link Compressor} and {@link CompressorProvider}. */ public OtlpGrpcLogRecordExporterBuilder setCompression(String compressionMethod) { requireNonNull(compressionMethod, "compressionMethod"); - checkArgument( - compressionMethod.equals("gzip") || compressionMethod.equals("none"), - "Unsupported compression method. Supported compression methods include: gzip, none."); - delegate.setCompression(compressionMethod); + Compressor compressor = CompressorUtil.validateAndResolveCompressor(compressionMethod); + delegate.setCompression(compressor); return this; } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java index 43a0dbdcc41..302eee845ed 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java @@ -10,6 +10,9 @@ import io.grpc.ManagedChannel; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.compression.Compressor; +import io.opentelemetry.exporter.internal.compression.CompressorProvider; +import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; @@ -121,15 +124,14 @@ public OtlpGrpcMetricExporterBuilder setEndpoint(String endpoint) { } /** - * Sets the method used to compress payloads. If unset, compression is disabled. Currently - * supported compression methods include "gzip" and "none". + * Sets the method used to compress payloads. If unset, compression is disabled. Compression + * method "gzip" and "none" are supported out of the box. Support for additional compression + * methods is available by implementing {@link Compressor} and {@link CompressorProvider}. */ public OtlpGrpcMetricExporterBuilder setCompression(String compressionMethod) { requireNonNull(compressionMethod, "compressionMethod"); - checkArgument( - compressionMethod.equals("gzip") || compressionMethod.equals("none"), - "Unsupported compression method. Supported compression methods include: gzip, none."); - delegate.setCompression(compressionMethod); + Compressor compressor = CompressorUtil.validateAndResolveCompressor(compressionMethod); + delegate.setCompression(compressor); return this; } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java index 884a8dea172..cac7cd5cb85 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java @@ -11,6 +11,9 @@ import io.grpc.ManagedChannel; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.compression.Compressor; +import io.opentelemetry.exporter.internal.compression.CompressorProvider; +import io.opentelemetry.exporter.internal.compression.CompressorUtil; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent; @@ -105,15 +108,14 @@ public OtlpGrpcSpanExporterBuilder setEndpoint(String endpoint) { } /** - * Sets the method used to compress payloads. If unset, compression is disabled. Currently - * supported compression methods include "gzip" and "none". + * Sets the method used to compress payloads. If unset, compression is disabled. Compression + * method "gzip" and "none" are supported out of the box. Support for additional compression + * methods is available by implementing {@link Compressor} and {@link CompressorProvider}. */ public OtlpGrpcSpanExporterBuilder setCompression(String compressionMethod) { requireNonNull(compressionMethod, "compressionMethod"); - checkArgument( - compressionMethod.equals("gzip") || compressionMethod.equals("none"), - "Unsupported compression method. Supported compression methods include: gzip, none."); - delegate.setCompression(compressionMethod); + Compressor compressor = CompressorUtil.validateAndResolveCompressor(compressionMethod); + delegate.setCompression(compressor); return this; } diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java index 0b3d79a2690..f3310eb36d8 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java @@ -26,9 +26,11 @@ import io.github.netmikey.logunit.api.LogCapturer; import io.grpc.ManagedChannel; import io.opentelemetry.exporter.internal.TlsUtil; +import io.opentelemetry.exporter.internal.compression.GzipCompressor; import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.otlp.testing.internal.compressor.Base64Compressor; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; @@ -272,9 +274,7 @@ void compressionWithNone() { assumeThat(exporter.unwrap()) .extracting("delegate.grpcSender") .matches(sender -> sender.getClass().getSimpleName().equals("OkHttpGrpcSender")); - assertThat(exporter.unwrap()) - .extracting("delegate.grpcSender.compressionEnabled") - .isEqualTo(false); + assertThat(exporter.unwrap()).extracting("delegate.grpcSender.compressor").isNull(); } finally { exporter.shutdown(); } @@ -290,8 +290,25 @@ void compressionWithGzip() { .extracting("delegate.grpcSender") .matches(sender -> sender.getClass().getSimpleName().equals("OkHttpGrpcSender")); assertThat(exporter.unwrap()) - .extracting("delegate.grpcSender.compressionEnabled") - .isEqualTo(true); + .extracting("delegate.grpcSender.compressor") + .isEqualTo(GzipCompressor.getInstance()); + } finally { + exporter.shutdown(); + } + } + + @Test + void compressionWithSpiCompressor() { + TelemetryExporter exporter = + exporterBuilder().setEndpoint(server.httpUri().toString()).setCompression("base64").build(); + try { + // UpstreamGrpcSender doesn't support compression, so we skip the assertion + assumeThat(exporter.unwrap()) + .extracting("delegate.grpcSender") + .matches(sender -> sender.getClass().getSimpleName().equals("OkHttpGrpcSender")); + assertThat(exporter.unwrap()) + .extracting("delegate.grpcSender.compressor") + .isEqualTo(Base64Compressor.getInstance()); } finally { exporter.shutdown(); } @@ -703,6 +720,8 @@ void validConfig() { .doesNotThrowAnyException(); assertThatCode(() -> exporterBuilder().setCompression("gzip")).doesNotThrowAnyException(); + // SPI compressor available for this test but not packaged with OTLP exporter + assertThatCode(() -> exporterBuilder().setCompression("base64")).doesNotThrowAnyException(); assertThatCode(() -> exporterBuilder().setCompression("none")).doesNotThrowAnyException(); assertThatCode(() -> exporterBuilder().addHeader("foo", "bar").addHeader("baz", "qux")) @@ -745,7 +764,7 @@ void invalidConfig() { assertThatThrownBy(() -> exporterBuilder().setCompression("foo")) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "Unsupported compression method. Supported compression methods include: gzip, none."); + "Unsupported compressionMethod. Compression method must be \"none\" or one of: [base64,gzip]"); } @Test @@ -821,7 +840,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException { + "timeoutNanos=" + TimeUnit.SECONDS.toNanos(10) + ", " - + "compressionEnabled=false, " + + "compressorEncoding=null, " + "headers=Headers\\{User-Agent=OBFUSCATED\\}" + ".*" // Maybe additional grpcChannel field + "\\}"); @@ -858,7 +877,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException { + "timeoutNanos=" + TimeUnit.SECONDS.toNanos(5) + ", " - + "compressionEnabled=true, " + + "compressorEncoding=gzip, " + "headers=Headers\\{.*foo=OBFUSCATED.*\\}, " + "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}" + ".*" // Maybe additional grpcChannel field diff --git a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java index a4effdbd09c..880d288417e 100644 --- a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java @@ -7,13 +7,17 @@ import io.grpc.Channel; import io.grpc.Codec; +import io.grpc.CompressorRegistry; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.grpc.GrpcSender; import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider; import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.io.IOException; +import java.io.OutputStream; import java.net.URI; import java.util.List; import java.util.Map; @@ -35,7 +39,7 @@ public class UpstreamGrpcSenderProvider implements GrpcSenderProvider { public GrpcSender createSender( URI endpoint, String endpointPath, - boolean compressionEnabled, + @Nullable Compressor compressor, long timeoutNanos, Supplier>> headersSupplier, @Nullable Object managedChannel, @@ -60,12 +64,29 @@ public GrpcSender createSender( } } - Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE; + String compression = Codec.Identity.NONE.getMessageEncoding(); + if (compressor != null) { + CompressorRegistry.getDefaultInstance() + .register( + new io.grpc.Compressor() { + @Override + public String getMessageEncoding() { + return compressor.getEncoding(); + } + + @Override + public OutputStream compress(OutputStream os) throws IOException { + return compressor.compress(os); + } + }); + compression = compressor.getEncoding(); + } + MarshalerServiceStub stub = stubFactory .get() .apply((Channel) managedChannel, authorityOverride) - .withCompression(codec.getMessageEncoding()); + .withCompression(compression); return new UpstreamGrpcSender<>(stub, shutdownChannel, timeoutNanos, headersSupplier); } diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/GrpcRequestBody.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/GrpcRequestBody.java index ca9191360db..7baa5c4dce0 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/GrpcRequestBody.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/GrpcRequestBody.java @@ -5,6 +5,7 @@ package io.opentelemetry.exporter.sender.okhttp.internal; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.marshal.Marshaler; import java.io.IOException; import javax.annotation.Nullable; @@ -12,7 +13,6 @@ import okhttp3.RequestBody; import okio.Buffer; import okio.BufferedSink; -import okio.GzipSink; import okio.Okio; /** @@ -33,15 +33,15 @@ public final class GrpcRequestBody extends RequestBody { private final Marshaler marshaler; private final int messageSize; private final int contentLength; - private final boolean compressed; + @Nullable private final Compressor compressor; /** Creates a new {@link GrpcRequestBody}. */ - public GrpcRequestBody(Marshaler marshaler, boolean compressed) { + public GrpcRequestBody(Marshaler marshaler, @Nullable Compressor compressor) { this.marshaler = marshaler; - this.compressed = compressed; + this.compressor = compressor; messageSize = marshaler.getBinarySerializedSize(); - if (compressed) { + if (compressor != null) { // Content length not known since we want to compress on the I/O thread. contentLength = -1; } else { @@ -62,14 +62,15 @@ public long contentLength() { @Override public void writeTo(BufferedSink sink) throws IOException { - if (!compressed) { + if (compressor == null) { sink.writeByte(UNCOMPRESSED_FLAG); sink.writeInt(messageSize); marshaler.writeBinaryTo(sink.outputStream()); } else { try (Buffer compressedBody = new Buffer()) { - try (BufferedSink gzipSink = Okio.buffer(new GzipSink(compressedBody))) { - marshaler.writeBinaryTo(gzipSink.outputStream()); + try (BufferedSink compressedSink = + Okio.buffer(Okio.sink(compressor.compress(compressedBody.outputStream())))) { + marshaler.writeBinaryTo(compressedSink.outputStream()); } sink.writeByte(COMPRESSED_FLAG); int compressedBytes = (int) compressedBody.size(); diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index 24ade7fae5a..477b3fdda59 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -25,6 +25,7 @@ import io.opentelemetry.exporter.internal.InstrumentationUtil; import io.opentelemetry.exporter.internal.RetryUtil; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil; import io.opentelemetry.exporter.internal.grpc.GrpcResponse; import io.opentelemetry.exporter.internal.grpc.GrpcSender; @@ -67,12 +68,12 @@ public final class OkHttpGrpcSender implements GrpcSender>> headersSupplier; - private final boolean compressionEnabled; + @Nullable private final Compressor compressor; /** Creates a new {@link OkHttpGrpcSender}. */ public OkHttpGrpcSender( String endpoint, - boolean compressionEnabled, + @Nullable Compressor compressor, long timeoutNanos, Supplier>> headersSupplier, @Nullable RetryPolicy retryPolicy, @@ -97,7 +98,7 @@ public OkHttpGrpcSender( this.client = clientBuilder.build(); this.headersSupplier = headersSupplier; this.url = HttpUrl.get(endpoint); - this.compressionEnabled = compressionEnabled; + this.compressor = compressor; } @Override @@ -110,10 +111,10 @@ public void send(T request, Runnable onSuccess, BiConsumer values.forEach(value -> requestBuilder.addHeader(key, value))); } requestBuilder.addHeader("te", "trailers"); - if (compressionEnabled) { - requestBuilder.addHeader("grpc-encoding", "gzip"); + if (compressor != null) { + requestBuilder.addHeader("grpc-encoding", compressor.getEncoding()); } - RequestBody requestBody = new GrpcRequestBody(request, compressionEnabled); + RequestBody requestBody = new GrpcRequestBody(request, compressor); requestBuilder.post(requestBody); InstrumentationUtil.suppressInstrumentation( diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java index 6ac663495b0..affaf09be68 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java @@ -6,6 +6,7 @@ package io.opentelemetry.exporter.sender.okhttp.internal; import io.grpc.Channel; +import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.grpc.GrpcSender; import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider; import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub; @@ -32,7 +33,7 @@ public class OkHttpGrpcSenderProvider implements GrpcSenderProvider { public GrpcSender createSender( URI endpoint, String endpointPath, - boolean compressionEnabled, + @Nullable Compressor compressor, long timeoutNanos, Supplier>> headersSupplier, @Nullable Object managedChannel, @@ -42,7 +43,7 @@ public GrpcSender createSender( @Nullable X509TrustManager trustManager) { return new OkHttpGrpcSender<>( endpoint.resolve(endpointPath).toString(), - compressionEnabled, + compressor, timeoutNanos, headersSupplier, retryPolicy, diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java index 482eadc1bd4..fcedd3bc25f 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java @@ -21,7 +21,7 @@ void send(OkHttpGrpcSender sender, Runnable onSuccess, Runnable @Override OkHttpGrpcSender createSender(String endpoint) { return new OkHttpGrpcSender<>( - "https://localhost", false, 10L, Collections::emptyMap, null, null, null); + "https://localhost", null, 10L, Collections::emptyMap, null, null, null); } protected static class DummyMarshaler extends MarshalerWithSize { diff --git a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/OkHttpGrpcService.java b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/OkHttpGrpcService.java index 02cc43869cb..ca1455ad3f3 100644 --- a/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/OkHttpGrpcService.java +++ b/sdk-extensions/jaeger-remote-sampler/src/main/java/io/opentelemetry/sdk/extension/trace/jaeger/sampler/OkHttpGrpcService.java @@ -53,7 +53,7 @@ public SamplingStrategyResponseUnMarshaler execute( SamplingStrategyResponseUnMarshaler responseUnmarshaller) { Request.Builder requestBuilder = new Request.Builder().url(url).headers(headers); - RequestBody requestBody = new GrpcRequestBody(exportRequest, false); + RequestBody requestBody = new GrpcRequestBody(exportRequest, null); requestBuilder.post(requestBody); try {