Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compressor SPI to support additional compression algos #5990

Merged
merged 3 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.compression;

import java.io.IOException;
import java.io.OutputStream;
import javax.annotation.concurrent.ThreadSafe;

/**
* An abstraction for compressing messages. Implementation MUST be thread safe as the same instance
* is expected to be used many times and concurrently. Instances are usually singletons.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
@ThreadSafe
public interface Compressor {

/**
* The name of the compressor encoding.
*
* <p>Used to identify the compressor during configuration and to populate the {@code
* Content-Encoding} header.
*/
String getEncoding();

/** Wrap the {@code outputStream} with a compressing output stream. */
OutputStream compress(OutputStream outputStream) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.compression;

/**
* A service provider interface (SPI) for providing {@link Compressor}s.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface CompressorProvider {

/** Return the {@link Compressor}. */
Compressor getInstance();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.compression;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;

/**
* Utilities for resolving SPI {@link Compressor}s.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*
* @see CompressorProvider
*/
public final class CompressorUtil {

private static final Map<String, Compressor> compressorRegistry = buildCompressorRegistry();

private CompressorUtil() {}

/** Get list of loaded compressors, named according to {@link Compressor#getEncoding()}. */
public static Set<String> supportedCompressors() {
return Collections.unmodifiableSet(compressorRegistry.keySet());
}

/**
* Resolve the {@link Compressor} with the {@link Compressor#getEncoding()} equal to the {@code
* encoding}.
*
* @throws IllegalArgumentException if no match is found
*/
public static Compressor resolveCompressor(String encoding) {
Compressor compressor = compressorRegistry.get(encoding);
if (compressor == null) {
throw new IllegalArgumentException(

Check warning on line 42 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/compression/CompressorUtil.java#L42

Added line #L42 was not covered by tests
"Could not resolve compressor for encoding \"" + encoding + "\".");
}
return compressor;
}

private static Map<String, Compressor> buildCompressorRegistry() {
Map<String, Compressor> compressors = new HashMap<>();
for (CompressorProvider spi :
ServiceLoader.load(CompressorProvider.class, CompressorUtil.class.getClassLoader())) {
Compressor compressor = spi.getInstance();
compressors.put(compressor.getEncoding(), compressor);
}
// Hardcode gzip compressor
compressors.put(GzipCompressor.getInstance().getEncoding(), GzipCompressor.getInstance());
return compressors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal.compression;

import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

/**
* Gzip {@link Compressor}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class GzipCompressor implements Compressor {

private static final GzipCompressor INSTANCE = new GzipCompressor();

private GzipCompressor() {}

public static GzipCompressor getInstance() {
return INSTANCE;
}

@Override
public String getEncoding() {
return "gzip";
}

@Override
public OutputStream compress(OutputStream outputStream) throws IOException {
return new GZIPOutputStream(outputStream);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.TlsConfigHelper;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.net.URI;
Expand All @@ -19,6 +20,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -48,8 +50,8 @@ public final class HttpExporterBuilder<T extends Marshaler> {
private String endpoint;

private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS);
@Nullable private Compressor compressor;
private long connectTimeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_CONNECT_TIMEOUT_SECS);
private boolean compressionEnabled = false;
private boolean exportAsJson = false;
private final Map<String, String> constantHeaders = new HashMap<>();
private Supplier<Map<String, String>> headerSupplier = Collections::emptyMap;
Expand Down Expand Up @@ -82,8 +84,8 @@ public HttpExporterBuilder<T> setEndpoint(String endpoint) {
return this;
}

public HttpExporterBuilder<T> setCompression(String compressionMethod) {
this.compressionEnabled = compressionMethod.equals("gzip");
public HttpExporterBuilder<T> setCompression(@Nullable Compressor compressor) {
this.compressor = compressor;
return this;
}

Expand Down Expand Up @@ -141,7 +143,7 @@ public HttpExporterBuilder<T> copy() {
copy.timeoutNanos = timeoutNanos;
copy.connectTimeoutNanos = connectTimeoutNanos;
copy.exportAsJson = exportAsJson;
copy.compressionEnabled = compressionEnabled;
copy.compressor = compressor;
copy.constantHeaders.putAll(constantHeaders);
copy.headerSupplier = headerSupplier;
copy.tlsConfigHelper = tlsConfigHelper.copy();
Expand Down Expand Up @@ -179,7 +181,7 @@ public HttpExporter<T> build() {
HttpSender httpSender =
httpSenderProvider.createSender(
endpoint,
compressionEnabled,
compressor,
exportAsJson ? "application/json" : "application/x-protobuf",
timeoutNanos,
connectTimeoutNanos,
Expand All @@ -202,8 +204,10 @@ public String toString(boolean includePrefixAndSuffix) {
joiner.add("type=" + type);
joiner.add("endpoint=" + endpoint);
joiner.add("timeoutNanos=" + timeoutNanos);
joiner.add(
"compressorEncoding="
+ Optional.ofNullable(compressor).map(Compressor::getEncoding).orElse(null));
joiner.add("connectTimeoutNanos=" + connectTimeoutNanos);
joiner.add("compressionEnabled=" + compressionEnabled);
joiner.add("exportAsJson=" + exportAsJson);
StringJoiner headersJoiner = new StringJoiner(", ", "Headers{", "}");
constantHeaders.forEach((key, value) -> headersJoiner.add(key + "=OBFUSCATED"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.exporter.internal.http;

import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.util.List;
import java.util.Map;
Expand All @@ -27,7 +28,7 @@ public interface HttpSenderProvider {
@SuppressWarnings("TooManyParameters")
HttpSender createSender(
String endpoint,
boolean compressionEnabled,
@Nullable Compressor compressor,
String contentType,
long timeoutNanos,
long connectTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@

import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -101,10 +104,16 @@ public OtlpHttpLogRecordExporterBuilder setEndpoint(String endpoint) {
*/
public OtlpHttpLogRecordExporterBuilder setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
if (compressionMethod.equals("none")) {
delegate.setCompression(null);
return this;
}
Set<String> supportedCompressionMethods = CompressorUtil.supportedCompressors();
checkArgument(
compressionMethod.equals("gzip") || compressionMethod.equals("none"),
"Unsupported compression method. Supported compression methods include: gzip, none.");
delegate.setCompression(compressionMethod);
supportedCompressionMethods.contains(compressionMethod),
"Unsupported compressionMethod. Compression method must be \"none\" or one of: "
+ supportedCompressionMethods.stream().collect(joining(",", "[", "]")));
delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
Expand All @@ -19,6 +21,7 @@
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -113,10 +116,16 @@ public OtlpHttpMetricExporterBuilder setEndpoint(String endpoint) {
*/
public OtlpHttpMetricExporterBuilder setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
if (compressionMethod.equals("none")) {
delegate.setCompression(null);
return this;
}
Set<String> supportedCompressionMethods = CompressorUtil.supportedCompressors();
checkArgument(
compressionMethod.equals("gzip") || compressionMethod.equals("none"),
"Unsupported compression method. Supported compression methods include: gzip, none.");
delegate.setCompression(compressionMethod);
supportedCompressionMethods.contains(compressionMethod),
"Unsupported compressionMethod. Compression method must be \"none\" or one of: "
+ supportedCompressionMethods.stream().collect(joining(",", "[", "]")));
delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@

import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.compression.CompressorUtil;
import io.opentelemetry.exporter.internal.http.HttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.exporter.otlp.internal.OtlpUserAgent;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -101,10 +104,16 @@ public OtlpHttpSpanExporterBuilder setEndpoint(String endpoint) {
*/
public OtlpHttpSpanExporterBuilder setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
if (compressionMethod.equals("none")) {
delegate.setCompression(null);
return this;
}
Set<String> supportedCompressionMethods = CompressorUtil.supportedCompressors();
checkArgument(
compressionMethod.equals("gzip") || compressionMethod.equals("none"),
"Unsupported compression method. Supported compression methods include: gzip, none.");
delegate.setCompression(compressionMethod);
supportedCompressionMethods.contains(compressionMethod),
"Unsupported compressionMethod. Compression method must be \"none\" or one of: "
+ supportedCompressionMethods.stream().collect(joining(",", "[", "]")));
delegate.setCompression(CompressorUtil.resolveCompressor(compressionMethod));
return this;
}

Expand Down
Loading