Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connectTimeout configuration option OtlpGrpc{Signal}Exporters #6079

Merged
merged 2 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,2 +1,13 @@
Comparing source compatibility of against
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder setConnectTimeout(long, java.util.concurrent.TimeUnit)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder setConnectTimeout(java.time.Duration)
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder setConnectTimeout(long, java.util.concurrent.TimeUnit)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder setConnectTimeout(java.time.Duration)
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setConnectTimeout(long, java.util.concurrent.TimeUnit)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setConnectTimeout(java.time.Duration)
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
@SuppressWarnings("JavadocMethod")
public class GrpcExporterBuilder<T extends Marshaler> {

public static final long DEFAULT_CONNECT_TIMEOUT_SECS = 10;

private static final Logger LOGGER = Logger.getLogger(GrpcExporterBuilder.class.getName());

private final String exporterName;
Expand All @@ -52,6 +54,7 @@ public class GrpcExporterBuilder<T extends Marshaler> {
grpcStubFactory;

private long timeoutNanos;
private long connectTimeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_CONNECT_TIMEOUT_SECS);
private URI endpoint;
@Nullable private Compressor compressor;
private final Map<String, String> constantHeaders = new HashMap<>();
Expand Down Expand Up @@ -92,6 +95,11 @@ public GrpcExporterBuilder<T> setTimeout(Duration timeout) {
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}

public GrpcExporterBuilder<T> setConnectTimeout(long timeout, TimeUnit unit) {
connectTimeoutNanos = unit.toNanos(timeout);
return this;
}

public GrpcExporterBuilder<T> setEndpoint(String endpoint) {
this.endpoint = ExporterBuilderUtil.validateEndpoint(endpoint);
return this;
Expand Down Expand Up @@ -151,6 +159,7 @@ public GrpcExporterBuilder<T> copy() {
grpcEndpointPath);

copy.timeoutNanos = timeoutNanos;
copy.connectTimeoutNanos = connectTimeoutNanos;
copy.endpoint = endpoint;
copy.compressor = compressor;
copy.constantHeaders.putAll(constantHeaders);
Expand Down Expand Up @@ -193,6 +202,7 @@ public GrpcExporter<T> build() {
grpcEndpointPath,
compressor,
timeoutNanos,
connectTimeoutNanos,
headerSupplier,
grpcChannel,
grpcStubFactory,
Expand All @@ -214,6 +224,7 @@ public String toString(boolean includePrefixAndSuffix) {
joiner.add("endpoint=" + endpoint.toString());
joiner.add("endpointPath=" + grpcEndpointPath);
joiner.add("timeoutNanos=" + timeoutNanos);
joiner.add("connectTimeoutNanos=" + connectTimeoutNanos);
joiner.add(
"compressorEncoding="
+ Optional.ofNullable(compressor).map(Compressor::getEncoding).orElse(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ <T extends Marshaler> GrpcSender<T> createSender(
String endpointPath,
@Nullable Compressor compressor,
long timeoutNanos,
long connectTimeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable Object managedChannel,
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public void setUp() {
.toString(),
null,
10,
10,
Collections::emptyMap,
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,26 @@ public OtlpGrpcLogRecordExporterBuilder setTimeout(Duration timeout) {
return this;
}

/**
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
*/
public OtlpGrpcLogRecordExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(timeout >= 0, "timeout must be non-negative");
breedx-splk marked this conversation as resolved.
Show resolved Hide resolved
delegate.setConnectTimeout(timeout, unit);
return this;
}

/**
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
*/
public OtlpGrpcLogRecordExporterBuilder setConnectTimeout(Duration timeout) {
requireNonNull(timeout, "timeout");
return setConnectTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}

/**
* Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}. The
* endpoint must start with either http:// or https://.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,26 @@ public OtlpGrpcMetricExporterBuilder setTimeout(Duration timeout) {
return this;
}

/**
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
*/
public OtlpGrpcMetricExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(timeout >= 0, "timeout must be non-negative");
delegate.setConnectTimeout(timeout, unit);
return this;
}

/**
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
*/
public OtlpGrpcMetricExporterBuilder setConnectTimeout(Duration timeout) {
requireNonNull(timeout, "timeout");
return setConnectTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}

/**
* Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}. The
* endpoint must start with either http:// or https://.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,26 @@ public OtlpGrpcSpanExporterBuilder setTimeout(Duration timeout) {
return this;
}

/**
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
*/
public OtlpGrpcSpanExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(timeout >= 0, "timeout must be non-negative");
delegate.setConnectTimeout(timeout, unit);
return this;
}

/**
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
*/
public OtlpGrpcSpanExporterBuilder setConnectTimeout(Duration timeout) {
requireNonNull(timeout, "timeout");
return setConnectTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}

/**
* Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}. The
* endpoint must start with either http:// or https://.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,33 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) th
}
}

@Test
@SuppressLogger(GrpcExporter.class)
void connectTimeout() {
// UpstreamGrpcSender doesn't support connectTimeout, so we skip the test
assumeThat(exporter.unwrap())
.extracting("delegate.grpcSender")
.matches(sender -> sender.getClass().getSimpleName().equals("OkHttpGrpcSender"));

TelemetryExporter<T> exporter =
exporterBuilder()
// Connecting to a non-routable IP address to trigger connection error
.setEndpoint("http://10.255.255.1")
.setConnectTimeout(Duration.ofMillis(1))
.build();
try {
long startTimeMillis = System.currentTimeMillis();
CompletableResultCode result =
exporter.export(Collections.singletonList(generateFakeTelemetry()));
assertThat(result.join(10, TimeUnit.SECONDS).isSuccess()).isFalse();
// Assert that the export request fails well before the default connect timeout of 10s
assertThat(System.currentTimeMillis() - startTimeMillis)
.isLessThan(TimeUnit.SECONDS.toMillis(1));
breedx-splk marked this conversation as resolved.
Show resolved Hide resolved
} finally {
exporter.shutdown();
}
}

@Test
void deadlineSetPerExport() throws InterruptedException {
TelemetryExporter<T> exporter =
Expand Down Expand Up @@ -840,6 +867,9 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
+ "timeoutNanos="
+ TimeUnit.SECONDS.toNanos(10)
+ ", "
+ "connectTimeoutNanos="
+ TimeUnit.SECONDS.toNanos(10)
+ ", "
+ "compressorEncoding=null, "
+ "headers=Headers\\{User-Agent=OBFUSCATED\\}"
+ ".*" // Maybe additional grpcChannel field
Expand All @@ -851,6 +881,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
telemetryExporter =
exporterBuilder()
.setTimeout(Duration.ofSeconds(5))
.setConnectTimeout(Duration.ofSeconds(4))
.setEndpoint("http://example:4317")
.setCompression("gzip")
.addHeader("foo", "bar")
Expand All @@ -877,6 +908,9 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
+ "timeoutNanos="
+ TimeUnit.SECONDS.toNanos(5)
+ ", "
+ "connectTimeoutNanos="
+ TimeUnit.SECONDS.toNanos(4)
+ ", "
+ "compressorEncoding=gzip, "
+ "headers=Headers\\{.*foo=OBFUSCATED.*\\}, "
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public TelemetryExporterBuilder<LogRecordData> setConnectTimeout(long timeout, T

@Override
public TelemetryExporterBuilder<LogRecordData> setConnectTimeout(Duration timeout) {
throw new UnsupportedOperationException();
builder.setConnectTimeout(timeout);
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public TelemetryExporterBuilder<MetricData> setConnectTimeout(long timeout, Time

@Override
public TelemetryExporterBuilder<MetricData> setConnectTimeout(Duration timeout) {
throw new UnsupportedOperationException();
builder.setConnectTimeout(timeout);
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public TelemetryExporterBuilder<SpanData> setConnectTimeout(long timeout, TimeUn

@Override
public TelemetryExporterBuilder<SpanData> setConnectTimeout(Duration timeout) {
throw new UnsupportedOperationException();
builder.setConnectTimeout(timeout);
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public <T extends Marshaler> GrpcSender<T> createSender(
String endpointPath,
@Nullable Compressor compressor,
long timeoutNanos,
long connectTimeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable Object managedChannel,
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@ public OkHttpGrpcSender(
String endpoint,
@Nullable Compressor compressor,
long timeoutNanos,
long connectTimeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
OkHttpClient.Builder clientBuilder =
new OkHttpClient.Builder()
.dispatcher(OkHttpUtil.newDispatcher())
.callTimeout(Duration.ofNanos(timeoutNanos));
.callTimeout(Duration.ofNanos(timeoutNanos))
.connectTimeout(Duration.ofNanos(connectTimeoutNanos));
if (retryPolicy != null) {
clientBuilder.addInterceptor(
new RetryInterceptor(retryPolicy, OkHttpGrpcSender::isRetryable));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public <T extends Marshaler> GrpcSender<T> createSender(
String endpointPath,
@Nullable Compressor compressor,
long timeoutNanos,
long connectTimeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable Object managedChannel,
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
Expand All @@ -45,6 +46,7 @@ public <T extends Marshaler> GrpcSender<T> createSender(
endpoint.resolve(endpointPath).toString(),
compressor,
timeoutNanos,
connectTimeoutNanos,
headersSupplier,
retryPolicy,
sslContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void send(OkHttpGrpcSender<DummyMarshaler> sender, Runnable onSuccess, Runnable
@Override
OkHttpGrpcSender<DummyMarshaler> createSender(String endpoint) {
return new OkHttpGrpcSender<>(
"https://localhost", null, 10L, Collections::emptyMap, null, null, null);
"https://localhost", null, 10L, 10L, Collections::emptyMap, null, null, null);
}

protected static class DummyMarshaler extends MarshalerWithSize {
Expand Down
Loading