From 726c5aa9126f362f3e577acaeaa8536ea2ccd8c3 Mon Sep 17 00:00:00 2001 From: Jack Berg Date: Fri, 15 Dec 2023 17:02:47 -0600 Subject: [PATCH] Add connectTimeout configuration option OtlpGrpc{Signal}Exporters --- .../opentelemetry-exporter-otlp.txt | 13 ++++++- .../internal/grpc/GrpcExporterBuilder.java | 11 ++++++ .../internal/grpc/GrpcSenderProvider.java | 1 + .../otlp/trace/OltpExporterBenchmark.java | 1 + .../OtlpGrpcLogRecordExporterBuilder.java | 20 +++++++++++ .../OtlpGrpcMetricExporterBuilder.java | 20 +++++++++++ .../trace/OtlpGrpcSpanExporterBuilder.java | 20 +++++++++++ .../AbstractGrpcTelemetryExporterTest.java | 34 +++++++++++++++++++ .../GrpcLogRecordExporterBuilderWrapper.java | 3 +- .../GrpcMetricExporterBuilderWrapper.java | 3 +- .../GrpcSpanExporterBuilderWrapper.java | 3 +- .../internal/UpstreamGrpcSenderProvider.java | 1 + .../okhttp/internal/OkHttpGrpcSender.java | 4 ++- .../internal/OkHttpGrpcSenderProvider.java | 2 ++ .../internal/OkHttpGrpcSuppressionTest.java | 2 +- 15 files changed, 132 insertions(+), 6 deletions(-) diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt index df26146497b..f5f51eb6e25 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt @@ -1,2 +1,13 @@ Comparing source compatibility of against -No changes. \ No newline at end of file +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder setConnectTimeout(long, java.util.concurrent.TimeUnit) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder setConnectTimeout(java.time.Duration) +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder setConnectTimeout(long, java.util.concurrent.TimeUnit) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder setConnectTimeout(java.time.Duration) +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setConnectTimeout(long, java.util.concurrent.TimeUnit) + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setConnectTimeout(java.time.Duration) diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java index bef1db06224..7e5e1fd57df 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java @@ -41,6 +41,8 @@ @SuppressWarnings("JavadocMethod") public class GrpcExporterBuilder { + public static final long DEFAULT_CONNECT_TIMEOUT_SECS = 10; + private static final Logger LOGGER = Logger.getLogger(GrpcExporterBuilder.class.getName()); private final String exporterName; @@ -50,6 +52,7 @@ public class GrpcExporterBuilder { grpcStubFactory; private long timeoutNanos; + private long connectTimeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_CONNECT_TIMEOUT_SECS); private URI endpoint; private boolean compressionEnabled = false; private final Map constantHeaders = new HashMap<>(); @@ -90,6 +93,11 @@ public GrpcExporterBuilder setTimeout(Duration timeout) { return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); } + public GrpcExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) { + connectTimeoutNanos = unit.toNanos(timeout); + return this; + } + public GrpcExporterBuilder setEndpoint(String endpoint) { this.endpoint = ExporterBuilderUtil.validateEndpoint(endpoint); return this; @@ -149,6 +157,7 @@ public GrpcExporterBuilder copy() { grpcEndpointPath); copy.timeoutNanos = timeoutNanos; + copy.connectTimeoutNanos = connectTimeoutNanos; copy.endpoint = endpoint; copy.compressionEnabled = compressionEnabled; copy.constantHeaders.putAll(constantHeaders); @@ -191,6 +200,7 @@ public GrpcExporter build() { grpcEndpointPath, compressionEnabled, timeoutNanos, + connectTimeoutNanos, headerSupplier, grpcChannel, grpcStubFactory, @@ -212,6 +222,7 @@ public String toString(boolean includePrefixAndSuffix) { joiner.add("endpoint=" + endpoint.toString()); joiner.add("endpointPath=" + grpcEndpointPath); joiner.add("timeoutNanos=" + timeoutNanos); + joiner.add("connectTimeoutNanos=" + connectTimeoutNanos); joiner.add("compressionEnabled=" + compressionEnabled); StringJoiner headersJoiner = new StringJoiner(", ", "Headers{", "}"); constantHeaders.forEach((key, value) -> headersJoiner.add(key + "=OBFUSCATED")); diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java index c8c584f6e58..b38b2a7e567 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcSenderProvider.java @@ -33,6 +33,7 @@ GrpcSender createSender( String endpointPath, boolean compressionEnabled, long timeoutNanos, + long connectTimeoutNanos, Supplier>> headersSupplier, @Nullable Object managedChannel, Supplier>> stubFactory, diff --git a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java index 1ad124271a3..f6e02c5e7a7 100644 --- a/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java +++ b/exporters/otlp/all/src/jmh/java/io/opentelemetry/exporter/otlp/trace/OltpExporterBenchmark.java @@ -99,6 +99,7 @@ public void setUp() { .toString(), /* compressionEnabled= */ false, 10, + 10, Collections::emptyMap, null, null, diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java index b04014fc729..14efd2169f4 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java @@ -98,6 +98,26 @@ public OtlpGrpcLogRecordExporterBuilder setTimeout(Duration timeout) { return this; } + /** + * Sets the maximum time to wait for new connections to be established. If unset, defaults to + * {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s. + */ + public OtlpGrpcLogRecordExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) { + requireNonNull(unit, "unit"); + checkArgument(timeout >= 0, "timeout must be non-negative"); + delegate.setConnectTimeout(timeout, unit); + return this; + } + + /** + * Sets the maximum time to wait for new connections to be established. If unset, defaults to + * {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s. + */ + public OtlpGrpcLogRecordExporterBuilder setConnectTimeout(Duration timeout) { + requireNonNull(timeout, "timeout"); + return setConnectTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + } + /** * Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}. The * endpoint must start with either http:// or https://. diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java index 43a0dbdcc41..f35e760f3b4 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java @@ -110,6 +110,26 @@ public OtlpGrpcMetricExporterBuilder setTimeout(Duration timeout) { return this; } + /** + * Sets the maximum time to wait for new connections to be established. If unset, defaults to + * {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s. + */ + public OtlpGrpcMetricExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) { + requireNonNull(unit, "unit"); + checkArgument(timeout >= 0, "timeout must be non-negative"); + delegate.setConnectTimeout(timeout, unit); + return this; + } + + /** + * Sets the maximum time to wait for new connections to be established. If unset, defaults to + * {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s. + */ + public OtlpGrpcMetricExporterBuilder setConnectTimeout(Duration timeout) { + requireNonNull(timeout, "timeout"); + return setConnectTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + } + /** * Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}. The * endpoint must start with either http:// or https://. diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java index 884a8dea172..de5cd7c6f91 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java @@ -94,6 +94,26 @@ public OtlpGrpcSpanExporterBuilder setTimeout(Duration timeout) { return this; } + /** + * Sets the maximum time to wait for new connections to be established. If unset, defaults to + * {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s. + */ + public OtlpGrpcSpanExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) { + requireNonNull(unit, "unit"); + checkArgument(timeout >= 0, "timeout must be non-negative"); + delegate.setConnectTimeout(timeout, unit); + return this; + } + + /** + * Sets the maximum time to wait for new connections to be established. If unset, defaults to + * {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s. + */ + public OtlpGrpcSpanExporterBuilder setConnectTimeout(Duration timeout) { + requireNonNull(timeout, "timeout"); + return setConnectTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS); + } + /** * Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}. The * endpoint must start with either http:// or https://. diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java index c2490029aac..253914e3f48 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/AbstractGrpcTelemetryExporterTest.java @@ -418,6 +418,33 @@ public Stream provideArguments(ExtensionContext context) th } } + @Test + @SuppressLogger(GrpcExporter.class) + void connectTimeout() { + // UpstreamGrpcSender doesn't support connectTimeout, so we skip the test + assumeThat(exporter.unwrap()) + .extracting("delegate.grpcSender") + .matches(sender -> sender.getClass().getSimpleName().equals("OkHttpGrpcSender")); + + TelemetryExporter exporter = + exporterBuilder() + // Connecting to a non-routable IP address to trigger connection error + .setEndpoint("http://10.255.255.1") + .setConnectTimeout(Duration.ofMillis(1)) + .build(); + try { + long startTimeMillis = System.currentTimeMillis(); + CompletableResultCode result = + exporter.export(Collections.singletonList(generateFakeTelemetry())); + assertThat(result.join(10, TimeUnit.SECONDS).isSuccess()).isFalse(); + // Assert that the export request fails well before the default connect timeout of 10s + assertThat(System.currentTimeMillis() - startTimeMillis) + .isLessThan(TimeUnit.SECONDS.toMillis(1)); + } finally { + exporter.shutdown(); + } + } + @Test void deadlineSetPerExport() throws InterruptedException { TelemetryExporter exporter = @@ -800,6 +827,9 @@ void stringRepresentation() throws IOException, CertificateEncodingException { + "timeoutNanos=" + TimeUnit.SECONDS.toNanos(10) + ", " + + "connectTimeoutNanos=" + + TimeUnit.SECONDS.toNanos(10) + + ", " + "compressionEnabled=false, " + "headers=Headers\\{User-Agent=OBFUSCATED\\}" + ".*" // Maybe additional grpcChannel field @@ -811,6 +841,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException { telemetryExporter = exporterBuilder() .setTimeout(Duration.ofSeconds(5)) + .setConnectTimeout(Duration.ofSeconds(4)) .setEndpoint("http://example:4317") .setCompression("gzip") .addHeader("foo", "bar") @@ -837,6 +868,9 @@ void stringRepresentation() throws IOException, CertificateEncodingException { + "timeoutNanos=" + TimeUnit.SECONDS.toNanos(5) + ", " + + "connectTimeoutNanos=" + + TimeUnit.SECONDS.toNanos(4) + + ", " + "compressionEnabled=true, " + "headers=Headers\\{.*foo=OBFUSCATED.*\\}, " + "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}" diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcLogRecordExporterBuilderWrapper.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcLogRecordExporterBuilderWrapper.java index 5495a9d6bcd..60e616e7838 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcLogRecordExporterBuilderWrapper.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcLogRecordExporterBuilderWrapper.java @@ -49,7 +49,8 @@ public TelemetryExporterBuilder setConnectTimeout(long timeout, T @Override public TelemetryExporterBuilder setConnectTimeout(Duration timeout) { - throw new UnsupportedOperationException(); + builder.setConnectTimeout(timeout); + return this; } @Override diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcMetricExporterBuilderWrapper.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcMetricExporterBuilderWrapper.java index ff9a53a7ca5..6d1f2365512 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcMetricExporterBuilderWrapper.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcMetricExporterBuilderWrapper.java @@ -49,7 +49,8 @@ public TelemetryExporterBuilder setConnectTimeout(long timeout, Time @Override public TelemetryExporterBuilder setConnectTimeout(Duration timeout) { - throw new UnsupportedOperationException(); + builder.setConnectTimeout(timeout); + return this; } @Override diff --git a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcSpanExporterBuilderWrapper.java b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcSpanExporterBuilderWrapper.java index 59068809909..871eb559deb 100644 --- a/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcSpanExporterBuilderWrapper.java +++ b/exporters/otlp/testing-internal/src/main/java/io/opentelemetry/exporter/otlp/testing/internal/GrpcSpanExporterBuilderWrapper.java @@ -50,7 +50,8 @@ public TelemetryExporterBuilder setConnectTimeout(long timeout, TimeUn @Override public TelemetryExporterBuilder setConnectTimeout(Duration timeout) { - throw new UnsupportedOperationException(); + builder.setConnectTimeout(timeout); + return this; } @Override diff --git a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java index 34200681980..ba38519651d 100644 --- a/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java +++ b/exporters/sender/grpc-managed-channel/src/main/java/io/opentelemetry/exporter/sender/grpc/managedchannel/internal/UpstreamGrpcSenderProvider.java @@ -35,6 +35,7 @@ public GrpcSender createSender( String endpointPath, boolean compressionEnabled, long timeoutNanos, + long connectTimeoutNanos, Supplier>> headersSupplier, @Nullable Object managedChannel, Supplier>> stubFactory, diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index 24ade7fae5a..ad0d07ecec1 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -74,6 +74,7 @@ public OkHttpGrpcSender( String endpoint, boolean compressionEnabled, long timeoutNanos, + long connectTimeoutNanos, Supplier>> headersSupplier, @Nullable RetryPolicy retryPolicy, @Nullable SSLContext sslContext, @@ -81,7 +82,8 @@ public OkHttpGrpcSender( OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder() .dispatcher(OkHttpUtil.newDispatcher()) - .callTimeout(Duration.ofNanos(timeoutNanos)); + .callTimeout(Duration.ofNanos(timeoutNanos)) + .connectTimeout(Duration.ofNanos(connectTimeoutNanos)); if (retryPolicy != null) { clientBuilder.addInterceptor( new RetryInterceptor(retryPolicy, OkHttpGrpcSender::isRetryable)); diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java index 6ac663495b0..791e94ac68c 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSenderProvider.java @@ -34,6 +34,7 @@ public GrpcSender createSender( String endpointPath, boolean compressionEnabled, long timeoutNanos, + long connectTimeoutNanos, Supplier>> headersSupplier, @Nullable Object managedChannel, Supplier>> stubFactory, @@ -44,6 +45,7 @@ public GrpcSender createSender( endpoint.resolve(endpointPath).toString(), compressionEnabled, timeoutNanos, + connectTimeoutNanos, headersSupplier, retryPolicy, sslContext, diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java index 482eadc1bd4..9ce333cc924 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java @@ -21,7 +21,7 @@ void send(OkHttpGrpcSender sender, Runnable onSuccess, Runnable @Override OkHttpGrpcSender createSender(String endpoint) { return new OkHttpGrpcSender<>( - "https://localhost", false, 10L, Collections::emptyMap, null, null, null); + "https://localhost", false, 10L, 10L, Collections::emptyMap, null, null, null); } protected static class DummyMarshaler extends MarshalerWithSize {