From f89fc05f8403474cf5f95a00411aa18059105256 Mon Sep 17 00:00:00 2001 From: jack-berg <34418638+jack-berg@users.noreply.github.com> Date: Tue, 13 Jun 2023 13:20:33 -0500 Subject: [PATCH] Add HttpSender abstraction (#5505) --- .../exporter/internal/auth/Authenticator.java | 8 +- .../exporter/internal/http/HttpExporter.java | 145 +++++++++++++ .../internal/http/HttpExporterBuilder.java | 142 ++++++++++++ .../exporter/internal/http/HttpSender.java | 56 +++++ .../internal/okhttp/JsonRequestBody.java | 38 ---- .../internal/okhttp/OkHttpExporter.java | 193 ----------------- .../okhttp/OkHttpExporterBuilder.java | 163 -------------- .../internal/okhttp/OkHttpHttpSender.java | 204 ++++++++++++++++++ .../internal/okhttp/ProtoRequestBody.java | 47 ---- .../exporter/internal/retry/RetryUtil.java | 8 +- .../auth/AuthenticatingExporterTest.java | 19 +- .../internal/auth/AuthenticatorTest.java | 6 +- .../okhttp/HttpExporterBuilderTest.java | 75 +++++++ .../okhttp/OkHttpExporterBuilderTest.java | 70 ------ .../internal/retry/RetryUtilTest.java | 6 +- .../otlp/trace/OltpExporterBenchmark.java | 8 +- .../http/logs/OtlpHttpLogRecordExporter.java | 6 +- .../OtlpHttpLogRecordExporterBuilder.java | 10 +- .../http/metrics/OtlpHttpMetricExporter.java | 6 +- .../OtlpHttpMetricExporterBuilder.java | 10 +- .../otlp/http/trace/OtlpHttpSpanExporter.java | 6 +- .../trace/OtlpHttpSpanExporterBuilder.java | 10 +- .../AbstractHttpTelemetryExporterTest.java | 24 ++- 23 files changed, 687 insertions(+), 573 deletions(-) create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSender.java delete mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/JsonRequestBody.java delete mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpExporter.java delete mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpExporterBuilder.java create mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpHttpSender.java delete mode 100644 exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/ProtoRequestBody.java create mode 100644 exporters/common/src/test/java/io/opentelemetry/exporter/internal/okhttp/HttpExporterBuilderTest.java delete mode 100644 exporters/common/src/test/java/io/opentelemetry/exporter/internal/okhttp/OkHttpExporterBuilderTest.java diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/auth/Authenticator.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/auth/Authenticator.java index 52f083ebc74..7ad2547ee20 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/auth/Authenticator.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/auth/Authenticator.java @@ -6,7 +6,7 @@ package io.opentelemetry.exporter.internal.auth; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder; +import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import java.lang.reflect.Field; import java.util.Map; @@ -27,7 +27,7 @@ public interface Authenticator { Map getHeaders(); /** - * Reflectively access a {@link GrpcExporterBuilder}, or {@link OkHttpExporterBuilder} instance in + * Reflectively access a {@link GrpcExporterBuilder}, or {@link HttpExporterBuilder} instance in * field called "delegate" of the instance, and set the {@link Authenticator}. * * @param builder export builder to modify @@ -42,8 +42,8 @@ static void setAuthenticatorOnDelegate(Object builder, Authenticator authenticat Object value = field.get(builder); if (value instanceof GrpcExporterBuilder) { throw new IllegalArgumentException("GrpcExporterBuilder not supported yet."); - } else if (value instanceof OkHttpExporterBuilder) { - ((OkHttpExporterBuilder) value).setAuthenticator(authenticator); + } else if (value instanceof HttpExporterBuilder) { + ((HttpExporterBuilder) value).setAuthenticator(authenticator); } else { throw new IllegalArgumentException( "Delegate field is not type DefaultGrpcExporterBuilder or OkHttpGrpcExporterBuilder."); diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java new file mode 100644 index 00000000000..f13f018c9b9 --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java @@ -0,0 +1,145 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.http; + +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.ExporterMetrics; +import io.opentelemetry.exporter.internal.grpc.GrpcStatusUtil; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.internal.ThrottlingLogger; +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** + * An exporter for http/protobuf or http/json using a signal-specific Marshaler. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +@SuppressWarnings("checkstyle:JavadocMethod") +public final class HttpExporter { + + private static final Logger internalLogger = Logger.getLogger(HttpExporter.class.getName()); + + private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); + private final AtomicBoolean isShutdown = new AtomicBoolean(); + + private final String type; + private final HttpSender httpSender; + private final ExporterMetrics exporterMetrics; + private final boolean exportAsJson; + + public HttpExporter( + String exporterName, + String type, + HttpSender httpSender, + Supplier meterProviderSupplier, + boolean exportAsJson) { + this.type = type; + this.httpSender = httpSender; + this.exporterMetrics = + exportAsJson + ? ExporterMetrics.createHttpJson(exporterName, type, meterProviderSupplier) + : ExporterMetrics.createHttpProtobuf(exporterName, type, meterProviderSupplier); + this.exportAsJson = exportAsJson; + } + + public CompletableResultCode export(T exportRequest, int numItems) { + if (isShutdown.get()) { + return CompletableResultCode.ofFailure(); + } + + exporterMetrics.addSeen(numItems); + + CompletableResultCode result = new CompletableResultCode(); + + Consumer marshaler = + os -> { + try { + if (exportAsJson) { + exportRequest.writeJsonTo(os); + } else { + exportRequest.writeBinaryTo(os); + } + } catch (IOException e) { + throw new IllegalStateException(e); + } + }; + + httpSender.send( + marshaler, + exportRequest.getBinarySerializedSize(), + httpResponse -> { + int statusCode = httpResponse.statusCode(); + + if (statusCode >= 200 && statusCode < 300) { + exporterMetrics.addSuccess(numItems); + result.succeed(); + return; + } + + exporterMetrics.addFailed(numItems); + + byte[] body; + try { + body = httpResponse.responseBody(); + } catch (IOException ex) { + throw new IllegalStateException(ex); + } + + String status = extractErrorStatus(httpResponse.statusMessage(), body); + + logger.log( + Level.WARNING, + "Failed to export " + + type + + "s. Server responded with HTTP status code " + + statusCode + + ". Error message: " + + status); + result.fail(); + }, + e -> { + exporterMetrics.addFailed(numItems); + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. The request could not be executed. Full error message: " + + e.getMessage(), + e); + result.fail(); + }); + + return result; + } + + public CompletableResultCode shutdown() { + if (!isShutdown.compareAndSet(false, true)) { + logger.log(Level.INFO, "Calling shutdown() multiple times."); + return CompletableResultCode.ofSuccess(); + } + return httpSender.shutdown(); + } + + private static String extractErrorStatus(String statusMessage, @Nullable byte[] responseBody) { + if (responseBody == null) { + return "Response body missing, HTTP status message: " + statusMessage; + } + try { + return GrpcStatusUtil.getStatusMessage(responseBody); + } catch (IOException e) { + return "Unable to parse response body, HTTP status message: " + statusMessage; + } + } +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java new file mode 100644 index 00000000000..db65b6336a6 --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java @@ -0,0 +1,142 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.http; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.exporter.internal.ExporterBuilderUtil; +import io.opentelemetry.exporter.internal.TlsConfigHelper; +import io.opentelemetry.exporter.internal.auth.Authenticator; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.okhttp.OkHttpHttpSender; +import io.opentelemetry.exporter.internal.retry.RetryPolicy; +import java.net.URI; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.net.ssl.X509TrustManager; + +/** + * A builder for {@link HttpExporter}. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +@SuppressWarnings("checkstyle:JavadocMethod") +public final class HttpExporterBuilder { + public static final long DEFAULT_TIMEOUT_SECS = 10; + + private final String exporterName; + private final String type; + + private String endpoint; + + private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS); + private boolean compressionEnabled = false; + private boolean exportAsJson = false; + @Nullable private Map headers; + + private final TlsConfigHelper tlsConfigHelper = new TlsConfigHelper(); + @Nullable private RetryPolicy retryPolicy; + private Supplier meterProviderSupplier = GlobalOpenTelemetry::getMeterProvider; + @Nullable private Authenticator authenticator; + + public HttpExporterBuilder(String exporterName, String type, String defaultEndpoint) { + this.exporterName = exporterName; + this.type = type; + + endpoint = defaultEndpoint; + } + + public HttpExporterBuilder setTimeout(long timeout, TimeUnit unit) { + timeoutNanos = unit.toNanos(timeout); + return this; + } + + public HttpExporterBuilder setTimeout(Duration timeout) { + return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + } + + public HttpExporterBuilder setEndpoint(String endpoint) { + URI uri = ExporterBuilderUtil.validateEndpoint(endpoint); + this.endpoint = uri.toString(); + return this; + } + + public HttpExporterBuilder setCompression(String compressionMethod) { + this.compressionEnabled = compressionMethod.equals("gzip"); + return this; + } + + public HttpExporterBuilder addHeader(String key, String value) { + if (headers == null) { + headers = new HashMap<>(); + } + headers.put(key, value); + return this; + } + + public HttpExporterBuilder setAuthenticator(Authenticator authenticator) { + this.authenticator = authenticator; + return this; + } + + public HttpExporterBuilder setTrustManagerFromCerts(byte[] trustedCertificatesPem) { + tlsConfigHelper.setTrustManagerFromCerts(trustedCertificatesPem); + return this; + } + + public HttpExporterBuilder setKeyManagerFromCerts( + byte[] privateKeyPem, byte[] certificatePem) { + tlsConfigHelper.setKeyManagerFromCerts(privateKeyPem, certificatePem); + return this; + } + + public HttpExporterBuilder setSslContext( + SSLContext sslContext, X509TrustManager trustManager) { + tlsConfigHelper.setSslContext(sslContext, trustManager); + return this; + } + + public HttpExporterBuilder setMeterProvider(MeterProvider meterProvider) { + this.meterProviderSupplier = () -> meterProvider; + return this; + } + + public HttpExporterBuilder setRetryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + return this; + } + + public HttpExporterBuilder exportAsJson() { + this.exportAsJson = true; + return this; + } + + public HttpExporter build() { + Map headers = this.headers == null ? Collections.emptyMap() : this.headers; + Supplier> headerSupplier = () -> headers; + + HttpSender httpSender = + new OkHttpHttpSender( + endpoint, + compressionEnabled, + exportAsJson ? "application/json" : "application/x-protobuf", + timeoutNanos, + headerSupplier, + authenticator, + retryPolicy, + tlsConfigHelper.getSslContext(), + tlsConfigHelper.getTrustManager()); + + return new HttpExporter<>(exporterName, type, httpSender, meterProviderSupplier, exportAsJson); + } +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSender.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSender.java new file mode 100644 index 00000000000..f7e21cb781a --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSender.java @@ -0,0 +1,56 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.http; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.io.IOException; +import java.io.OutputStream; +import java.util.function.Consumer; + +/** + * An abstraction for sending HTTP requests and handling responses. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + * + * @see HttpExporter + * @see HttpExporterBuilder + */ +public interface HttpSender { + + /** + * Send an HTTP request, including any retry attempts. {@code onResponse} is called with the HTTP + * response, either a success response or a error response after retries. {@code onError} is + * called when the request could not be executed due to cancellation, connectivity problems, or + * timeout. + * + * @param marshaler the request body marshaler + * @param contentLength the request body content length + * @param onResponse the callback to invoke with the HTTP response + * @param onError the callback to invoke when the HTTP request could not be executed + */ + void send( + Consumer marshaler, + int contentLength, + Consumer onResponse, + Consumer onError); + + /** Shutdown the sender. */ + CompletableResultCode shutdown(); + + /** The HTTP response. */ + interface Response { + + /** The HTTP status code. */ + int statusCode(); + + /** The HTTP status message. */ + String statusMessage(); + + /** The HTTP response body. */ + byte[] responseBody() throws IOException; + } +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/JsonRequestBody.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/JsonRequestBody.java deleted file mode 100644 index 6f5b0f4cafb..00000000000 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/JsonRequestBody.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.internal.okhttp; - -import io.opentelemetry.exporter.internal.marshal.Marshaler; -import java.io.IOException; -import okhttp3.MediaType; -import okhttp3.RequestBody; -import okio.BufferedSink; - -final class JsonRequestBody extends RequestBody { - private static final MediaType JSON_MEDIA_TYPE = MediaType.parse("application/json"); - - private final Marshaler marshaler; - - /** Creates a new {@link JsonRequestBody}. */ - public JsonRequestBody(Marshaler marshaler) { - this.marshaler = marshaler; - } - - @Override - public long contentLength() { - return -1; - } - - @Override - public MediaType contentType() { - return JSON_MEDIA_TYPE; - } - - @Override - public void writeTo(BufferedSink bufferedSink) throws IOException { - marshaler.writeJsonTo(bufferedSink.outputStream()); - } -} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpExporter.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpExporter.java deleted file mode 100644 index 5cd207e327a..00000000000 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpExporter.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.internal.okhttp; - -import io.opentelemetry.api.metrics.MeterProvider; -import io.opentelemetry.exporter.internal.ExporterMetrics; -import io.opentelemetry.exporter.internal.grpc.GrpcStatusUtil; -import io.opentelemetry.exporter.internal.marshal.Marshaler; -import io.opentelemetry.exporter.internal.retry.RetryUtil; -import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.internal.ThrottlingLogger; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.annotation.Nullable; -import okhttp3.Call; -import okhttp3.Callback; -import okhttp3.Headers; -import okhttp3.HttpUrl; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; -import okhttp3.ResponseBody; -import okio.BufferedSink; -import okio.GzipSink; -import okio.Okio; - -/** - * An exporter for http/protobuf or http/json using a signal-specific Marshaler. - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - */ -@SuppressWarnings("checkstyle:JavadocMethod") -public final class OkHttpExporter { - - private static final Logger internalLogger = Logger.getLogger(OkHttpExporter.class.getName()); - - private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); - private final AtomicBoolean isShutdown = new AtomicBoolean(); - - private final String type; - private final OkHttpClient client; - private final HttpUrl url; - @Nullable private final Headers headers; - private final boolean compressionEnabled; - private final Function requestBodyCreator; - - private final ExporterMetrics exporterMetrics; - - OkHttpExporter( - String exporterName, - String type, - OkHttpClient client, - Supplier meterProviderSupplier, - String endpoint, - @Nullable Headers headers, - boolean compressionEnabled, - boolean exportAsJson) { - this.type = type; - this.client = client; - this.url = HttpUrl.get(endpoint); - this.headers = headers; - this.compressionEnabled = compressionEnabled; - this.requestBodyCreator = exportAsJson ? JsonRequestBody::new : ProtoRequestBody::new; - this.exporterMetrics = - exportAsJson - ? ExporterMetrics.createHttpJson(exporterName, type, meterProviderSupplier) - : ExporterMetrics.createHttpProtobuf(exporterName, type, meterProviderSupplier); - } - - public CompletableResultCode export(T exportRequest, int numItems) { - if (isShutdown.get()) { - return CompletableResultCode.ofFailure(); - } - - exporterMetrics.addSeen(numItems); - - Request.Builder requestBuilder = new Request.Builder().url(url); - if (headers != null) { - requestBuilder.headers(headers); - } - RequestBody requestBody = requestBodyCreator.apply(exportRequest); - if (compressionEnabled) { - requestBuilder.addHeader("Content-Encoding", "gzip"); - requestBuilder.post(gzipRequestBody(requestBody)); - } else { - requestBuilder.post(requestBody); - } - - CompletableResultCode result = new CompletableResultCode(); - - client - .newCall(requestBuilder.build()) - .enqueue( - new Callback() { - @Override - public void onFailure(Call call, IOException e) { - exporterMetrics.addFailed(numItems); - logger.log( - Level.SEVERE, - "Failed to export " - + type - + "s. The request could not be executed. Full error message: " - + e.getMessage()); - result.fail(); - } - - @Override - public void onResponse(Call call, Response response) { - try (ResponseBody body = response.body()) { - if (response.isSuccessful()) { - exporterMetrics.addSuccess(numItems); - result.succeed(); - return; - } - - exporterMetrics.addFailed(numItems); - int code = response.code(); - - String status = extractErrorStatus(response, body); - - logger.log( - Level.WARNING, - "Failed to export " - + type - + "s. Server responded with HTTP status code " - + code - + ". Error message: " - + status); - result.fail(); - } - } - }); - - return result; - } - - public CompletableResultCode shutdown() { - if (!isShutdown.compareAndSet(false, true)) { - logger.log(Level.INFO, "Calling shutdown() multiple times."); - return CompletableResultCode.ofSuccess(); - } - client.dispatcher().cancelAll(); - client.dispatcher().executorService().shutdownNow(); - client.connectionPool().evictAll(); - return CompletableResultCode.ofSuccess(); - } - - static boolean isRetryable(Response response) { - return RetryUtil.retryableHttpResponseCodes().contains(response.code()); - } - - private static RequestBody gzipRequestBody(RequestBody requestBody) { - return new RequestBody() { - @Override - public MediaType contentType() { - return requestBody.contentType(); - } - - @Override - public long contentLength() { - return -1; - } - - @Override - public void writeTo(BufferedSink bufferedSink) throws IOException { - BufferedSink gzipSink = Okio.buffer(new GzipSink(bufferedSink)); - requestBody.writeTo(gzipSink); - gzipSink.close(); - } - }; - } - - private static String extractErrorStatus(Response response, @Nullable ResponseBody responseBody) { - if (responseBody == null) { - return "Response body missing, HTTP status message: " + response.message(); - } - try { - return GrpcStatusUtil.getStatusMessage(responseBody.bytes()); - } catch (IOException e) { - return "Unable to parse response body, HTTP status message: " + response.message(); - } - } -} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpExporterBuilder.java deleted file mode 100644 index aad41b5c6a3..00000000000 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpExporterBuilder.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.internal.okhttp; - -import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.api.metrics.MeterProvider; -import io.opentelemetry.exporter.internal.ExporterBuilderUtil; -import io.opentelemetry.exporter.internal.TlsConfigHelper; -import io.opentelemetry.exporter.internal.auth.Authenticator; -import io.opentelemetry.exporter.internal.marshal.Marshaler; -import io.opentelemetry.exporter.internal.retry.RetryInterceptor; -import io.opentelemetry.exporter.internal.retry.RetryPolicy; -import java.net.URI; -import java.time.Duration; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; -import javax.annotation.Nullable; -import javax.net.ssl.SSLContext; -import javax.net.ssl.X509TrustManager; -import okhttp3.Headers; -import okhttp3.OkHttpClient; -import okhttp3.Request; - -/** - * A builder for {@link OkHttpExporter}. - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - */ -@SuppressWarnings("checkstyle:JavadocMethod") -public final class OkHttpExporterBuilder { - public static final long DEFAULT_TIMEOUT_SECS = 10; - - private final String exporterName; - private final String type; - - private String endpoint; - - private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS); - private boolean compressionEnabled = false; - private boolean exportAsJson = false; - @Nullable private Headers.Builder headersBuilder; - - private final TlsConfigHelper tlsConfigHelper = new TlsConfigHelper(); - @Nullable private RetryPolicy retryPolicy; - private Supplier meterProviderSupplier = GlobalOpenTelemetry::getMeterProvider; - @Nullable private Authenticator authenticator; - - public OkHttpExporterBuilder(String exporterName, String type, String defaultEndpoint) { - this.exporterName = exporterName; - this.type = type; - - endpoint = defaultEndpoint; - } - - public OkHttpExporterBuilder setTimeout(long timeout, TimeUnit unit) { - timeoutNanos = unit.toNanos(timeout); - return this; - } - - public OkHttpExporterBuilder setTimeout(Duration timeout) { - return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); - } - - public OkHttpExporterBuilder setEndpoint(String endpoint) { - URI uri = ExporterBuilderUtil.validateEndpoint(endpoint); - this.endpoint = uri.toString(); - return this; - } - - public OkHttpExporterBuilder setCompression(String compressionMethod) { - this.compressionEnabled = compressionMethod.equals("gzip"); - return this; - } - - public OkHttpExporterBuilder addHeader(String key, String value) { - if (headersBuilder == null) { - headersBuilder = new Headers.Builder(); - } - headersBuilder.add(key, value); - return this; - } - - public OkHttpExporterBuilder setAuthenticator(Authenticator authenticator) { - this.authenticator = authenticator; - return this; - } - - public OkHttpExporterBuilder setTrustManagerFromCerts(byte[] trustedCertificatesPem) { - tlsConfigHelper.setTrustManagerFromCerts(trustedCertificatesPem); - return this; - } - - public OkHttpExporterBuilder setKeyManagerFromCerts( - byte[] privateKeyPem, byte[] certificatePem) { - tlsConfigHelper.setKeyManagerFromCerts(privateKeyPem, certificatePem); - return this; - } - - public OkHttpExporterBuilder setSslContext( - SSLContext sslContext, X509TrustManager trustManager) { - tlsConfigHelper.setSslContext(sslContext, trustManager); - return this; - } - - public OkHttpExporterBuilder setMeterProvider(MeterProvider meterProvider) { - this.meterProviderSupplier = () -> meterProvider; - return this; - } - - public OkHttpExporterBuilder setRetryPolicy(RetryPolicy retryPolicy) { - this.retryPolicy = retryPolicy; - return this; - } - - public OkHttpExporterBuilder exportAsJson() { - this.exportAsJson = true; - return this; - } - - public OkHttpExporter build() { - OkHttpClient.Builder clientBuilder = - new OkHttpClient.Builder() - .dispatcher(OkHttpUtil.newDispatcher()) - .callTimeout(Duration.ofNanos(timeoutNanos)); - - SSLContext sslContext = tlsConfigHelper.getSslContext(); - X509TrustManager trustManager = tlsConfigHelper.getTrustManager(); - if (sslContext != null && trustManager != null) { - clientBuilder.sslSocketFactory(sslContext.getSocketFactory(), trustManager); - } - - Headers headers = headersBuilder == null ? null : headersBuilder.build(); - - if (retryPolicy != null) { - clientBuilder.addInterceptor(new RetryInterceptor(retryPolicy, OkHttpExporter::isRetryable)); - } - - if (authenticator != null) { - Authenticator finalAuthenticator = authenticator; - // Generate and attach OkHttp Authenticator implementation - clientBuilder.authenticator( - (route, response) -> { - Request.Builder requestBuilder = response.request().newBuilder(); - finalAuthenticator.getHeaders().forEach(requestBuilder::header); - return requestBuilder.build(); - }); - } - - return new OkHttpExporter<>( - exporterName, - type, - clientBuilder.build(), - meterProviderSupplier, - endpoint, - headers, - compressionEnabled, - exportAsJson); - } -} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpHttpSender.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpHttpSender.java new file mode 100644 index 00000000000..442f0f8efa7 --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/OkHttpHttpSender.java @@ -0,0 +1,204 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.okhttp; + +import io.opentelemetry.exporter.internal.auth.Authenticator; +import io.opentelemetry.exporter.internal.http.HttpSender; +import io.opentelemetry.exporter.internal.retry.RetryInterceptor; +import io.opentelemetry.exporter.internal.retry.RetryPolicy; +import io.opentelemetry.exporter.internal.retry.RetryUtil; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.io.IOException; +import java.io.OutputStream; +import java.time.Duration; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import javax.net.ssl.X509TrustManager; +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.ResponseBody; +import okio.BufferedSink; +import okio.GzipSink; +import okio.Okio; + +/** + * {@link HttpSender} which is backed by OkHttp. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class OkHttpHttpSender implements HttpSender { + + private final OkHttpClient client; + private final HttpUrl url; + private final boolean compressionEnabled; + private final Supplier> headerSupplier; + private final MediaType mediaType; + + /** Create a sender. */ + public OkHttpHttpSender( + String endpoint, + boolean compressionEnabled, + String contentType, + long timeoutNanos, + Supplier> headerSupplier, + @Nullable Authenticator authenticator, + @Nullable RetryPolicy retryPolicy, + @Nullable SSLContext sslContext, + @Nullable X509TrustManager trustManager) { + OkHttpClient.Builder builder = + new OkHttpClient.Builder() + .dispatcher(OkHttpUtil.newDispatcher()) + .callTimeout(Duration.ofNanos(timeoutNanos)); + + if (authenticator != null) { + Authenticator finalAuthenticator = authenticator; + // Generate and attach OkHttp Authenticator implementation + builder.authenticator( + (route, response) -> { + Request.Builder requestBuilder = response.request().newBuilder(); + finalAuthenticator.getHeaders().forEach(requestBuilder::header); + return requestBuilder.build(); + }); + } + + if (retryPolicy != null) { + builder.addInterceptor(new RetryInterceptor(retryPolicy, OkHttpHttpSender::isRetryable)); + } + if (sslContext != null && trustManager != null) { + builder.sslSocketFactory(sslContext.getSocketFactory(), trustManager); + } + this.client = builder.build(); + this.url = HttpUrl.get(endpoint); + this.compressionEnabled = compressionEnabled; + this.mediaType = MediaType.parse(contentType); + this.headerSupplier = headerSupplier; + } + + @Override + public void send( + Consumer marshaler, + int contentLength, + Consumer onResponse, + Consumer onError) { + Request.Builder requestBuilder = new Request.Builder().url(url); + headerSupplier.get().forEach(requestBuilder::addHeader); + RequestBody body = new RawRequestBody(marshaler, contentLength, mediaType); + if (compressionEnabled) { + requestBuilder.addHeader("Content-Encoding", "gzip"); + requestBuilder.post(new GzipRequestBody(body)); + } else { + requestBuilder.post(body); + } + + client + .newCall(requestBuilder.build()) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + onError.accept(e); + } + + @Override + public void onResponse(Call call, okhttp3.Response response) { + try (ResponseBody body = response.body()) { + onResponse.accept( + new Response() { + @Override + public int statusCode() { + return response.code(); + } + + @Override + public String statusMessage() { + return response.message(); + } + + @Override + public byte[] responseBody() throws IOException { + return body.bytes(); + } + }); + } + } + }); + } + + @Override + public CompletableResultCode shutdown() { + client.dispatcher().cancelAll(); + client.dispatcher().executorService().shutdownNow(); + client.connectionPool().evictAll(); + return CompletableResultCode.ofSuccess(); + } + + static boolean isRetryable(okhttp3.Response response) { + return RetryUtil.retryableHttpResponseCodes().contains(response.code()); + } + + private static class RawRequestBody extends RequestBody { + + private final Consumer marshaler; + private final int contentLength; + private final MediaType mediaType; + + private RawRequestBody( + Consumer marshaler, int contentLength, MediaType mediaType) { + this.marshaler = marshaler; + this.contentLength = contentLength; + this.mediaType = mediaType; + } + + @Override + public long contentLength() { + return contentLength; + } + + @Override + public MediaType contentType() { + return mediaType; + } + + @Override + public void writeTo(BufferedSink bufferedSink) { + marshaler.accept(bufferedSink.outputStream()); + } + } + + private static class GzipRequestBody extends RequestBody { + private final RequestBody requestBody; + + private GzipRequestBody(RequestBody requestBody) { + this.requestBody = requestBody; + } + + @Override + public MediaType contentType() { + return requestBody.contentType(); + } + + @Override + public long contentLength() { + return -1; + } + + @Override + public void writeTo(BufferedSink bufferedSink) throws IOException { + BufferedSink gzipSink = Okio.buffer(new GzipSink(bufferedSink)); + requestBody.writeTo(gzipSink); + gzipSink.close(); + } + } +} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/ProtoRequestBody.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/ProtoRequestBody.java deleted file mode 100644 index 81581207995..00000000000 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/okhttp/ProtoRequestBody.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.internal.okhttp; - -import io.opentelemetry.exporter.internal.marshal.Marshaler; -import java.io.IOException; -import okhttp3.MediaType; -import okhttp3.RequestBody; -import okio.BufferedSink; - -/** - * A {@link RequestBody} for reading from a {@link Marshaler}. - * - *

This class is internal and is hence not for public use. Its APIs are unstable and can change - * at any time. - */ -final class ProtoRequestBody extends RequestBody { - - private static final MediaType PROTOBUF_MEDIA_TYPE = MediaType.parse("application/x-protobuf"); - - private final Marshaler marshaler; - private final int contentLength; - - /** Creates a new {@link ProtoRequestBody}. */ - public ProtoRequestBody(Marshaler marshaler) { - this.marshaler = marshaler; - contentLength = marshaler.getBinarySerializedSize(); - } - - @Override - public long contentLength() { - return contentLength; - } - - @Override - public MediaType contentType() { - return PROTOBUF_MEDIA_TYPE; - } - - @Override - public void writeTo(BufferedSink bufferedSink) throws IOException { - marshaler.writeBinaryTo(bufferedSink.outputStream()); - } -} diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/RetryUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/RetryUtil.java index 90f05deddae..4c28655afd3 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/RetryUtil.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/retry/RetryUtil.java @@ -7,7 +7,7 @@ import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; import io.opentelemetry.exporter.internal.grpc.GrpcStatusUtil; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder; +import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import java.lang.reflect.Field; import java.util.Arrays; import java.util.Collections; @@ -49,7 +49,7 @@ public static Set retryableHttpResponseCodes() { } /** - * Reflectively access a {@link GrpcExporterBuilder}, or {@link OkHttpExporterBuilder} instance in + * Reflectively access a {@link GrpcExporterBuilder}, or {@link HttpExporterBuilder} instance in * field called "delegate" of the instance, and set the {@link RetryPolicy}. * * @throws IllegalArgumentException if the instance does not contain a field called "delegate" of @@ -62,8 +62,8 @@ public static void setRetryPolicyOnDelegate(Object instance, RetryPolicy retryPo Object value = field.get(instance); if (value instanceof GrpcExporterBuilder) { ((GrpcExporterBuilder) value).setRetryPolicy(retryPolicy); - } else if (value instanceof OkHttpExporterBuilder) { - ((OkHttpExporterBuilder) value).setRetryPolicy(retryPolicy); + } else if (value instanceof HttpExporterBuilder) { + ((HttpExporterBuilder) value).setRetryPolicy(retryPolicy); } else { throw new IllegalArgumentException( "delegate field is not type DefaultGrpcExporterBuilder or OkHttpGrpcExporterBuilder"); diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatingExporterTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatingExporterTest.java index 93e6082721a..4a430b9bb0e 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatingExporterTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatingExporterTest.java @@ -10,12 +10,11 @@ import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension; +import io.opentelemetry.exporter.internal.http.HttpExporter; +import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.exporter.internal.marshal.Serializer; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporter; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder; import io.opentelemetry.sdk.common.CompletableResultCode; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -38,13 +37,13 @@ public int getBinarySerializedSize() { } @Override - protected void writeTo(Serializer output) throws IOException {} + protected void writeTo(Serializer output) {} }; @Test - void export() throws Exception { - OkHttpExporter exporter = - new OkHttpExporterBuilder<>("otlp", "test", server.httpUri().toASCIIString()) + void export() { + HttpExporter exporter = + new HttpExporterBuilder<>("otlp", "test", server.httpUri().toASCIIString()) .setAuthenticator( () -> { Map headers = new HashMap<>(); @@ -67,9 +66,9 @@ void export() throws Exception { /** Ensure that exporter gives up if a request is always considered UNAUTHORIZED. */ @Test - void export_giveup() throws Exception { - OkHttpExporter exporter = - new OkHttpExporterBuilder<>("otlp", "test", server.httpUri().toASCIIString()) + void export_giveup() { + HttpExporter exporter = + new HttpExporterBuilder<>("otlp", "test", server.httpUri().toASCIIString()) .setAuthenticator( () -> { server.enqueue(HttpResponse.of(HttpStatus.UNAUTHORIZED)); diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatorTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatorTest.java index 92e3614279a..9722aeb617d 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatorTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/auth/AuthenticatorTest.java @@ -10,7 +10,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.opentelemetry.exporter.internal.grpc.GrpcExporter; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder; +import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -31,8 +31,8 @@ void getHeaders() { @Test void setAuthenticatorOnDelegate_Success() { - OkHttpExporterBuilder builder = - new OkHttpExporterBuilder<>("otlp", "test", "http://localhost:4318/test"); + HttpExporterBuilder builder = + new HttpExporterBuilder<>("otlp", "test", "http://localhost:4318/test"); assertThat(builder).extracting("authenticator").isNull(); diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/okhttp/HttpExporterBuilderTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/okhttp/HttpExporterBuilderTest.java new file mode 100644 index 00000000000..f81db2ae116 --- /dev/null +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/okhttp/HttpExporterBuilderTest.java @@ -0,0 +1,75 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal.okhttp; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.exporter.internal.http.HttpExporter; +import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import org.junit.jupiter.api.Test; + +class HttpExporterBuilderTest { + + private final HttpExporterBuilder builder = + new HttpExporterBuilder<>("otlp", "span", "http://localhost:4318/v1/traces"); + + @Test + void compressionDefault() { + HttpExporter exporter = builder.build(); + try { + assertThat(exporter) + .isInstanceOfSatisfying( + HttpExporter.class, + otlp -> + assertThat(otlp).extracting("httpSender.compressionEnabled").isEqualTo(false)); + } finally { + exporter.shutdown(); + } + } + + @Test + void compressionNone() { + HttpExporter exporter = builder.setCompression("none").build(); + try { + assertThat(exporter) + .isInstanceOfSatisfying( + HttpExporter.class, + otlp -> + assertThat(otlp).extracting("httpSender.compressionEnabled").isEqualTo(false)); + } finally { + exporter.shutdown(); + } + } + + @Test + void compressionGzip() { + HttpExporter exporter = builder.setCompression("gzip").build(); + try { + assertThat(exporter) + .isInstanceOfSatisfying( + HttpExporter.class, + otlp -> assertThat(otlp).extracting("httpSender.compressionEnabled").isEqualTo(true)); + } finally { + exporter.shutdown(); + } + } + + @Test + void compressionEnabledAndDisabled() { + HttpExporter exporter = + builder.setCompression("gzip").setCompression("none").build(); + try { + assertThat(exporter) + .isInstanceOfSatisfying( + HttpExporter.class, + otlp -> + assertThat(otlp).extracting("httpSender.compressionEnabled").isEqualTo(false)); + } finally { + exporter.shutdown(); + } + } +} diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/okhttp/OkHttpExporterBuilderTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/okhttp/OkHttpExporterBuilderTest.java deleted file mode 100644 index d6cd07b5d90..00000000000 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/okhttp/OkHttpExporterBuilderTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.exporter.internal.okhttp; - -import static org.assertj.core.api.Assertions.assertThat; - -import io.opentelemetry.exporter.internal.marshal.Marshaler; -import org.junit.jupiter.api.Test; - -class OkHttpExporterBuilderTest { - - private final OkHttpExporterBuilder builder = - new OkHttpExporterBuilder<>("otlp", "span", "http://localhost:4318/v1/traces"); - - @Test - void compressionDefault() { - OkHttpExporter exporter = builder.build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - OkHttpExporter.class, - otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false)); - } finally { - exporter.shutdown(); - } - } - - @Test - void compressionNone() { - OkHttpExporter exporter = builder.setCompression("none").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - OkHttpExporter.class, - otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false)); - } finally { - exporter.shutdown(); - } - } - - @Test - void compressionGzip() { - OkHttpExporter exporter = builder.setCompression("gzip").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - OkHttpExporter.class, - otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(true)); - } finally { - exporter.shutdown(); - } - } - - @Test - void compressionEnabledAndDisabled() { - OkHttpExporter exporter = - builder.setCompression("gzip").setCompression("none").build(); - try { - assertThat(exporter) - .isInstanceOfSatisfying( - OkHttpExporter.class, - otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false)); - } finally { - exporter.shutdown(); - } - } -} diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/retry/RetryUtilTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/retry/RetryUtilTest.java index f4c43f735e3..2897ff7ab43 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/retry/RetryUtilTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/retry/RetryUtilTest.java @@ -11,7 +11,7 @@ import io.opentelemetry.exporter.internal.grpc.GrpcExporter; import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder; +import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import java.net.URI; import java.net.URISyntaxException; import org.assertj.core.api.InstanceOfAssertFactories; @@ -36,8 +36,8 @@ void setRetryPolicyOnDelegate_GrpcExporterBuilder() throws URISyntaxException { @Test void setRetryPolicyOnDelegate_OkHttpExporterBuilder() { RetryPolicy retryPolicy = RetryPolicy.getDefault(); - OkHttpExporterBuilder builder = - new OkHttpExporterBuilder<>("otlp", "test", "http://localhost:4318/test"); + HttpExporterBuilder builder = + new HttpExporterBuilder<>("otlp", "test", "http://localhost:4318/test"); RetryUtil.setRetryPolicyOnDelegate(new WithDelegate(builder), retryPolicy); assertThat(builder) diff --git a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java index 036e35409d8..d06060056c7 100644 --- a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java +++ b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java @@ -12,8 +12,8 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; import io.opentelemetry.exporter.internal.grpc.GrpcExporter; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporter; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder; +import io.opentelemetry.exporter.internal.http.HttpExporter; +import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; @@ -64,7 +64,7 @@ public void export( private static GrpcExporter defaultGrpcExporter; private static GrpcExporter okhttpGrpcExporter; - private static OkHttpExporter httpExporter; + private static HttpExporter httpExporter; @Setup(Level.Trial) public void setUp() { @@ -96,7 +96,7 @@ public void setUp() { .build(); httpExporter = - new OkHttpExporterBuilder( + new HttpExporterBuilder( "otlp", "span", "http://localhost:" + server.activeLocalPort() + "/v1/traces") .build(); } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporter.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporter.java index 6b6d4cdbfa4..1d1b326aa5e 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporter.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporter.java @@ -5,7 +5,7 @@ package io.opentelemetry.exporter.otlp.http.logs; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporter; +import io.opentelemetry.exporter.internal.http.HttpExporter; import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.data.LogRecordData; @@ -21,9 +21,9 @@ @ThreadSafe public final class OtlpHttpLogRecordExporter implements LogRecordExporter { - private final OkHttpExporter delegate; + private final HttpExporter delegate; - OtlpHttpLogRecordExporter(OkHttpExporter delegate) { + OtlpHttpLogRecordExporter(HttpExporter delegate) { this.delegate = delegate; } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java index a67d86f8e05..b49ab80e3d5 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java @@ -10,7 +10,7 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder; +import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.OtlpUserAgent; import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler; import java.time.Duration; @@ -27,16 +27,16 @@ public final class OtlpHttpLogRecordExporterBuilder { private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/logs"; - private final OkHttpExporterBuilder delegate; + private final HttpExporterBuilder delegate; OtlpHttpLogRecordExporterBuilder() { - delegate = new OkHttpExporterBuilder<>("otlp", "log", DEFAULT_ENDPOINT); + delegate = new HttpExporterBuilder<>("otlp", "log", DEFAULT_ENDPOINT); OtlpUserAgent.addUserAgentHeader(delegate::addHeader); } /** * Sets the maximum time to wait for the collector to process an exported batch of logs. If unset, - * defaults to {@value OkHttpExporterBuilder#DEFAULT_TIMEOUT_SECS}s. + * defaults to {@value HttpExporterBuilder#DEFAULT_TIMEOUT_SECS}s. */ public OtlpHttpLogRecordExporterBuilder setTimeout(long timeout, TimeUnit unit) { requireNonNull(unit, "unit"); @@ -47,7 +47,7 @@ public OtlpHttpLogRecordExporterBuilder setTimeout(long timeout, TimeUnit unit) /** * Sets the maximum time to wait for the collector to process an exported batch of logs. If unset, - * defaults to {@value OkHttpExporterBuilder#DEFAULT_TIMEOUT_SECS}s. + * defaults to {@value HttpExporterBuilder#DEFAULT_TIMEOUT_SECS}s. */ public OtlpHttpLogRecordExporterBuilder setTimeout(Duration timeout) { requireNonNull(timeout, "timeout"); diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporter.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporter.java index 8dac934577e..2db30be7646 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporter.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporter.java @@ -5,7 +5,7 @@ package io.opentelemetry.exporter.otlp.http.metrics; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporter; +import io.opentelemetry.exporter.internal.http.HttpExporter; import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.Aggregation; @@ -26,12 +26,12 @@ @ThreadSafe public final class OtlpHttpMetricExporter implements MetricExporter { - private final OkHttpExporter delegate; + private final HttpExporter delegate; private final AggregationTemporalitySelector aggregationTemporalitySelector; private final DefaultAggregationSelector defaultAggregationSelector; OtlpHttpMetricExporter( - OkHttpExporter delegate, + HttpExporter delegate, AggregationTemporalitySelector aggregationTemporalitySelector, DefaultAggregationSelector defaultAggregationSelector) { this.delegate = delegate; diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java index 2816b973957..2b0a86eee8a 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java @@ -9,7 +9,7 @@ import static java.util.Objects.requireNonNull; import io.opentelemetry.api.metrics.MeterProvider; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder; +import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.OtlpUserAgent; import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler; import io.opentelemetry.sdk.metrics.InstrumentType; @@ -33,7 +33,7 @@ public final class OtlpHttpMetricExporterBuilder { private static final AggregationTemporalitySelector DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR = AggregationTemporalitySelector.alwaysCumulative(); - private final OkHttpExporterBuilder delegate; + private final HttpExporterBuilder delegate; private AggregationTemporalitySelector aggregationTemporalitySelector = DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR; @@ -41,14 +41,14 @@ public final class OtlpHttpMetricExporterBuilder { DefaultAggregationSelector.getDefault(); OtlpHttpMetricExporterBuilder() { - delegate = new OkHttpExporterBuilder<>("otlp", "metric", DEFAULT_ENDPOINT); + delegate = new HttpExporterBuilder<>("otlp", "metric", DEFAULT_ENDPOINT); delegate.setMeterProvider(MeterProvider.noop()); OtlpUserAgent.addUserAgentHeader(delegate::addHeader); } /** * Sets the maximum time to wait for the collector to process an exported batch of metrics. If - * unset, defaults to {@value OkHttpExporterBuilder#DEFAULT_TIMEOUT_SECS}s. + * unset, defaults to {@value HttpExporterBuilder#DEFAULT_TIMEOUT_SECS}s. */ public OtlpHttpMetricExporterBuilder setTimeout(long timeout, TimeUnit unit) { requireNonNull(unit, "unit"); @@ -59,7 +59,7 @@ public OtlpHttpMetricExporterBuilder setTimeout(long timeout, TimeUnit unit) { /** * Sets the maximum time to wait for the collector to process an exported batch of metrics. If - * unset, defaults to {@value OkHttpExporterBuilder#DEFAULT_TIMEOUT_SECS}s. + * unset, defaults to {@value HttpExporterBuilder#DEFAULT_TIMEOUT_SECS}s. */ public OtlpHttpMetricExporterBuilder setTimeout(Duration timeout) { requireNonNull(timeout, "timeout"); diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java index 9e093e9b0e1..de832e40dd0 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporter.java @@ -5,7 +5,7 @@ package io.opentelemetry.exporter.otlp.http.trace; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporter; +import io.opentelemetry.exporter.internal.http.HttpExporter; import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; @@ -21,9 +21,9 @@ @ThreadSafe public final class OtlpHttpSpanExporter implements SpanExporter { - private final OkHttpExporter delegate; + private final HttpExporter delegate; - OtlpHttpSpanExporter(OkHttpExporter delegate) { + OtlpHttpSpanExporter(HttpExporter delegate) { this.delegate = delegate; } diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java index 8f7b7513da4..8f1b2910b37 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java @@ -10,7 +10,7 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.metrics.MeterProvider; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder; +import io.opentelemetry.exporter.internal.http.HttpExporterBuilder; import io.opentelemetry.exporter.internal.otlp.OtlpUserAgent; import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler; import java.time.Duration; @@ -27,16 +27,16 @@ public final class OtlpHttpSpanExporterBuilder { private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/traces"; - private final OkHttpExporterBuilder delegate; + private final HttpExporterBuilder delegate; OtlpHttpSpanExporterBuilder() { - delegate = new OkHttpExporterBuilder<>("otlp", "span", DEFAULT_ENDPOINT); + delegate = new HttpExporterBuilder<>("otlp", "span", DEFAULT_ENDPOINT); OtlpUserAgent.addUserAgentHeader(delegate::addHeader); } /** * Sets the maximum time to wait for the collector to process an exported batch of spans. If - * unset, defaults to {@value OkHttpExporterBuilder#DEFAULT_TIMEOUT_SECS}s. + * unset, defaults to {@value HttpExporterBuilder#DEFAULT_TIMEOUT_SECS}s. */ public OtlpHttpSpanExporterBuilder setTimeout(long timeout, TimeUnit unit) { requireNonNull(unit, "unit"); @@ -47,7 +47,7 @@ public OtlpHttpSpanExporterBuilder setTimeout(long timeout, TimeUnit unit) { /** * Sets the maximum time to wait for the collector to process an exported batch of spans. If - * unset, defaults to {@value OkHttpExporterBuilder#DEFAULT_TIMEOUT_SECS}s. + * unset, defaults to {@value HttpExporterBuilder#DEFAULT_TIMEOUT_SECS}s. */ public OtlpHttpSpanExporterBuilder setTimeout(Duration timeout) { requireNonNull(timeout, "timeout"); diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java index 78e26c047d6..3c5a799dffc 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractHttpTelemetryExporterTest.java @@ -28,8 +28,8 @@ import io.github.netmikey.logunit.api.LogCapturer; import io.opentelemetry.exporter.internal.TlsUtil; import io.opentelemetry.exporter.internal.grpc.UpstreamGrpcExporter; +import io.opentelemetry.exporter.internal.http.HttpExporter; import io.opentelemetry.exporter.internal.marshal.Marshaler; -import io.opentelemetry.exporter.internal.okhttp.OkHttpExporter; import io.opentelemetry.exporter.internal.retry.RetryPolicy; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; @@ -192,7 +192,7 @@ private static byte[] maybeGzipInflate(RequestHeaders requestHeaders, byte[] con } } - @RegisterExtension LogCapturer logs = LogCapturer.create().captureForType(OkHttpExporter.class); + @RegisterExtension LogCapturer logs = LogCapturer.create().captureForType(HttpExporter.class); private final String type; private final String path; @@ -274,7 +274,9 @@ void multipleItems() { void compressionWithNone() { TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpUri() + path).setCompression("none").build(); - assertThat(exporter.unwrap()).extracting("delegate.compressionEnabled").isEqualTo(false); + assertThat(exporter.unwrap()) + .extracting("delegate.httpSender.compressionEnabled") + .isEqualTo(false); try { CompletableResultCode result = exporter.export(Collections.singletonList(generateFakeTelemetry())); @@ -295,7 +297,9 @@ void compressionWithGzip() { assumeThat(exporter.unwrap()) .extracting("delegate") .isNotInstanceOf(UpstreamGrpcExporter.class); - assertThat(exporter.unwrap()).extracting("delegate.compressionEnabled").isEqualTo(true); + assertThat(exporter.unwrap()) + .extracting("delegate.httpSender.compressionEnabled") + .isEqualTo(true); try { CompletableResultCode result = exporter.export(Collections.singletonList(generateFakeTelemetry())); @@ -409,7 +413,7 @@ void tlsViaSslContext() throws Exception { } @Test - @SuppressLogger(OkHttpExporter.class) + @SuppressLogger(HttpExporter.class) void tls_untrusted() { TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpsUri() + path).build(); try { @@ -479,7 +483,7 @@ void deadlineSetPerExport() throws InterruptedException { } @Test - @SuppressLogger(OkHttpExporter.class) + @SuppressLogger(HttpExporter.class) void exportAfterShutdown() { TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpUri() + path).build(); exporter.shutdown(); @@ -493,7 +497,7 @@ void exportAfterShutdown() { } @Test - @SuppressLogger(OkHttpExporter.class) + @SuppressLogger(HttpExporter.class) void doubleShutdown() { int logsSizeBefore = logs.getEvents().size(); TelemetryExporter exporter = exporterBuilder().setEndpoint(server.httpUri() + path).build(); @@ -505,7 +509,7 @@ void doubleShutdown() { } @Test - @SuppressLogger(OkHttpExporter.class) + @SuppressLogger(HttpExporter.class) void error() { addHttpError(500); assertThat( @@ -544,7 +548,7 @@ void retryableError(int code) { } @Test - @SuppressLogger(OkHttpExporter.class) + @SuppressLogger(HttpExporter.class) void retryableError_tooManyAttempts() { addHttpError(502); addHttpError(502); @@ -566,7 +570,7 @@ void retryableError_tooManyAttempts() { } @ParameterizedTest - @SuppressLogger(OkHttpExporter.class) + @SuppressLogger(HttpExporter.class) @ValueSource(ints = {400, 401, 403, 500, 501}) void nonRetryableError(int code) { addHttpError(code);