Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate serialization IOException instead of rethrowing as runtime #6082

Merged
merged 7 commits into from
Jan 3, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -37,7 +35,6 @@
private final String type;
private final HttpSender httpSender;
private final ExporterMetrics exporterMetrics;
private final boolean exportAsJson;

public HttpExporter(
String exporterName,
Expand All @@ -51,7 +48,6 @@
exportAsJson
? ExporterMetrics.createHttpJson(exporterName, type, meterProviderSupplier)
: ExporterMetrics.createHttpProtobuf(exporterName, type, meterProviderSupplier);
this.exportAsJson = exportAsJson;
}

public CompletableResultCode export(T exportRequest, int numItems) {
Expand All @@ -63,21 +59,8 @@

CompletableResultCode result = new CompletableResultCode();

Consumer<OutputStream> marshaler =
os -> {
try {
if (exportAsJson) {
exportRequest.writeJsonTo(os);
} else {
exportRequest.writeBinaryTo(os);
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
};

httpSender.send(
marshaler,
exportRequest,
exportRequest.getBinarySerializedSize(),
httpResponse -> {
int statusCode = httpResponse.statusCode();
Expand All @@ -90,11 +73,11 @@

exporterMetrics.addFailed(numItems);

byte[] body;
byte[] body = null;
try {
body = httpResponse.responseBody();
} catch (IOException ex) {
throw new IllegalStateException(ex);
logger.log(Level.FINE, "Unable to obtain response body", ex);

Check warning on line 80 in exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java

View check run for this annotation

Codecov / codecov/patch

exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java#L80

Added line #L80 was not covered by tests
}

String status = extractErrorStatus(httpResponse.statusMessage(), body);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public HttpExporter<T> build() {
httpSenderProvider.createSender(
endpoint,
compressor,
exportAsJson,
exportAsJson ? "application/json" : "application/x-protobuf",
timeoutNanos,
connectTimeoutNanos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

package io.opentelemetry.exporter.internal.http;

import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Consumer;

/**
Expand All @@ -33,7 +33,7 @@ public interface HttpSender {
* @param onError the callback to invoke when the HTTP request could not be executed
*/
void send(
Consumer<OutputStream> marshaler,
Marshaler marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public interface HttpSenderProvider {
HttpSender createSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeout,
Expand Down
2 changes: 2 additions & 0 deletions exporters/sender/jdk/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ otelJava.moduleName.set("io.opentelemetry.exporter.sender.jdk.internal")
dependencies {
implementation(project(":exporters:common"))
implementation(project(":sdk:common"))

compileOnly("com.fasterxml.jackson.core:jackson-core")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -53,6 +55,7 @@
private final HttpClient client;
private final URI uri;
@Nullable private final Compressor compressor;
private final boolean exportAsJson;
private final String contentType;
private final long timeoutNanos;
private final Supplier<Map<String, List<String>>> headerSupplier;
Expand All @@ -63,6 +66,7 @@
HttpClient client,
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
Supplier<Map<String, List<String>>> headerSupplier,
Expand All @@ -74,6 +78,7 @@
throw new IllegalArgumentException(e);
}
this.compressor = compressor;
this.exportAsJson = exportAsJson;
this.contentType = contentType;
this.timeoutNanos = timeoutNanos;
this.headerSupplier = headerSupplier;
Expand All @@ -83,6 +88,7 @@
JdkHttpSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeoutNanos,
Expand All @@ -93,6 +99,7 @@
configureClient(sslContext, connectTimeoutNanos),
endpoint,
compressor,
exportAsJson,
contentType,
timeoutNanos,
headerSupplier,
Expand All @@ -111,7 +118,7 @@

@Override
public void send(
Consumer<OutputStream> marshaler,
Marshaler marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError) {
Expand All @@ -121,7 +128,7 @@
try {
return sendInternal(marshaler);
} catch (IOException e) {
throw new IllegalStateException(e);
throw new UncheckedIOException(e);
}
},
executorService)
Expand All @@ -136,7 +143,7 @@
}

// Visible for testing
HttpResponse<byte[]> sendInternal(Consumer<OutputStream> marshaler) throws IOException {
HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
long startTimeNanos = System.nanoTime();
HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder().uri(uri).timeout(Duration.ofNanos(timeoutNanos));
Expand All @@ -151,12 +158,12 @@
if (compressor != null) {
requestBuilder.header("Content-Encoding", compressor.getEncoding());
try (OutputStream compressed = compressor.compress(os)) {
marshaler.accept(compressed);
write(marshaler, compressed);
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
marshaler.accept(os);
write(marshaler, os);
}

ByteBufferPool byteBufferPool = threadLocalByteBufPool.get();
Expand Down Expand Up @@ -211,6 +218,14 @@
throw exception;
}

private void write(Marshaler marshaler, OutputStream os) throws IOException {
if (exportAsJson) {
marshaler.writeJsonTo(os);

Check warning on line 223 in exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java#L223

Added line #L223 was not covered by tests
} else {
marshaler.writeBinaryTo(os);
}
}

private HttpResponse<byte[]> sendRequest(
HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class JdkHttpSenderProvider implements HttpSenderProvider {
public HttpSender createSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeout,
Expand All @@ -40,6 +41,7 @@ public HttpSender createSender(
return new JdkHttpSender(
endpoint,
compressor,
exportAsJson,
contentType,
timeoutNanos,
connectTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -54,6 +56,7 @@ void setup() throws IOException, InterruptedException {
"http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection
// timeout
null,
false,
"text/plain",
Duration.ofSeconds(10).toNanos(),
Collections::emptyMap,
Expand All @@ -65,7 +68,7 @@ void setup() throws IOException, InterruptedException {

@Test
void sendInternal_RetryableConnectTimeoutException() throws IOException, InterruptedException {
assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.isInstanceOf(HttpConnectTimeoutException.class);

verify(mockHttpClient, times(2)).send(any(), any());
Expand All @@ -75,7 +78,7 @@ void sendInternal_RetryableConnectTimeoutException() throws IOException, Interru
void sendInternal_RetryableIoException() throws IOException, InterruptedException {
doThrow(new IOException("error!")).when(mockHttpClient).send(any(), any());

assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.isInstanceOf(IOException.class)
.hasMessage("error!");

Expand All @@ -86,7 +89,7 @@ void sendInternal_RetryableIoException() throws IOException, InterruptedExceptio
void sendInternal_NonRetryableException() throws IOException, InterruptedException {
doThrow(new SSLException("unknown error")).when(mockHttpClient).send(any(), any());

assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.isInstanceOf(IOException.class)
.hasMessage("unknown error");

Expand All @@ -99,6 +102,7 @@ void connectTimeout() {
new JdkHttpSender(
"http://localhost",
null,
false,
"text/plain",
1,
TimeUnit.SECONDS.toNanos(10),
Expand All @@ -112,4 +116,15 @@ void connectTimeout() {
httpClient ->
assertThat(httpClient.connectTimeout().get()).isEqualTo(Duration.ofSeconds(10)));
}

private static class NoOpMarshaler extends Marshaler {

@Override
public int getBinarySerializedSize() {
return 0;
}

@Override
protected void writeTo(Serializer output) {}
}
}
1 change: 1 addition & 0 deletions exporters/sender/okhttp/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
implementation("com.squareup.okhttp3:okhttp")

compileOnly("io.grpc:grpc-stub")
compileOnly("com.fasterxml.jackson.core:jackson-core")

testImplementation("com.linecorp.armeria:armeria-junit5")
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -44,6 +44,7 @@
private final OkHttpClient client;
private final HttpUrl url;
@Nullable private final Compressor compressor;
private final boolean exportAsJson;
private final Supplier<Map<String, List<String>>> headerSupplier;
private final MediaType mediaType;

Expand All @@ -52,6 +53,7 @@
public OkHttpHttpSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectionTimeoutNanos,
Expand Down Expand Up @@ -86,13 +88,14 @@
this.client = builder.build();
this.url = HttpUrl.get(endpoint);
this.compressor = compressor;
this.exportAsJson = exportAsJson;
this.mediaType = MediaType.parse(contentType);
this.headerSupplier = headerSupplier;
}

@Override
public void send(
Consumer<OutputStream> marshaler,
Marshaler marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError) {
Expand All @@ -103,7 +106,7 @@
headers.forEach(
(key, values) -> values.forEach(value -> requestBuilder.addHeader(key, value)));
}
RequestBody body = new RawRequestBody(marshaler, contentLength, mediaType);
RequestBody body = new RawRequestBody(marshaler, exportAsJson, contentLength, mediaType);
if (compressor != null) {
requestBuilder.addHeader("Content-Encoding", compressor.getEncoding());
requestBuilder.post(new CompressedRequestBody(compressor, body));
Expand Down Expand Up @@ -161,13 +164,15 @@

private static class RawRequestBody extends RequestBody {

private final Consumer<OutputStream> marshaler;
private final Marshaler marshaler;
private final boolean exportAsJson;
private final int contentLength;
private final MediaType mediaType;

private RawRequestBody(
Consumer<OutputStream> marshaler, int contentLength, MediaType mediaType) {
Marshaler marshaler, boolean exportAsJson, int contentLength, MediaType mediaType) {
this.marshaler = marshaler;
this.exportAsJson = exportAsJson;
this.contentLength = contentLength;
this.mediaType = mediaType;
}
Expand All @@ -183,8 +188,12 @@
}

@Override
public void writeTo(BufferedSink bufferedSink) {
marshaler.accept(bufferedSink.outputStream());
public void writeTo(BufferedSink bufferedSink) throws IOException {
if (exportAsJson) {
marshaler.writeJsonTo(bufferedSink.outputStream());

Check warning on line 193 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java#L193

Added line #L193 was not covered by tests
} else {
marshaler.writeBinaryTo(bufferedSink.outputStream());
}
}
}

Expand Down
Loading
Loading