Skip to content

Commit

Permalink
Propagate serialization IOException instead of rethrowing as runtime (#…
Browse files Browse the repository at this point in the history
…6082)

Co-authored-by: Ricardo Mestre <ricardom57@hotmail.com>
  • Loading branch information
jack-berg and ricardo-mestre authored Jan 3, 2024
1 parent 63fe708 commit b4ed532
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -37,7 +35,6 @@ public final class HttpExporter<T extends Marshaler> {
private final String type;
private final HttpSender httpSender;
private final ExporterMetrics exporterMetrics;
private final boolean exportAsJson;

public HttpExporter(
String exporterName,
Expand All @@ -51,7 +48,6 @@ public HttpExporter(
exportAsJson
? ExporterMetrics.createHttpJson(exporterName, type, meterProviderSupplier)
: ExporterMetrics.createHttpProtobuf(exporterName, type, meterProviderSupplier);
this.exportAsJson = exportAsJson;
}

public CompletableResultCode export(T exportRequest, int numItems) {
Expand All @@ -63,21 +59,8 @@ public CompletableResultCode export(T exportRequest, int numItems) {

CompletableResultCode result = new CompletableResultCode();

Consumer<OutputStream> marshaler =
os -> {
try {
if (exportAsJson) {
exportRequest.writeJsonTo(os);
} else {
exportRequest.writeBinaryTo(os);
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
};

httpSender.send(
marshaler,
exportRequest,
exportRequest.getBinarySerializedSize(),
httpResponse -> {
int statusCode = httpResponse.statusCode();
Expand All @@ -90,11 +73,11 @@ public CompletableResultCode export(T exportRequest, int numItems) {

exporterMetrics.addFailed(numItems);

byte[] body;
byte[] body = null;
try {
body = httpResponse.responseBody();
} catch (IOException ex) {
throw new IllegalStateException(ex);
logger.log(Level.FINE, "Unable to obtain response body", ex);
}

String status = extractErrorStatus(httpResponse.statusMessage(), body);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public HttpExporter<T> build() {
httpSenderProvider.createSender(
endpoint,
compressor,
exportAsJson,
exportAsJson ? "application/json" : "application/x-protobuf",
timeoutNanos,
connectTimeoutNanos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

package io.opentelemetry.exporter.internal.http;

import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Consumer;

/**
Expand All @@ -33,7 +33,7 @@ public interface HttpSender {
* @param onError the callback to invoke when the HTTP request could not be executed
*/
void send(
Consumer<OutputStream> marshaler,
Marshaler marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public interface HttpSenderProvider {
HttpSender createSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeout,
Expand Down
2 changes: 2 additions & 0 deletions exporters/sender/jdk/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ otelJava.moduleName.set("io.opentelemetry.exporter.sender.jdk.internal")
dependencies {
implementation(project(":exporters:common"))
implementation(project(":sdk:common"))

compileOnly("com.fasterxml.jackson.core:jackson-core")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -53,6 +55,7 @@ public final class JdkHttpSender implements HttpSender {
private final HttpClient client;
private final URI uri;
@Nullable private final Compressor compressor;
private final boolean exportAsJson;
private final String contentType;
private final long timeoutNanos;
private final Supplier<Map<String, List<String>>> headerSupplier;
Expand All @@ -63,6 +66,7 @@ public final class JdkHttpSender implements HttpSender {
HttpClient client,
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
Supplier<Map<String, List<String>>> headerSupplier,
Expand All @@ -74,6 +78,7 @@ public final class JdkHttpSender implements HttpSender {
throw new IllegalArgumentException(e);
}
this.compressor = compressor;
this.exportAsJson = exportAsJson;
this.contentType = contentType;
this.timeoutNanos = timeoutNanos;
this.headerSupplier = headerSupplier;
Expand All @@ -83,6 +88,7 @@ public final class JdkHttpSender implements HttpSender {
JdkHttpSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeoutNanos,
Expand All @@ -93,6 +99,7 @@ public final class JdkHttpSender implements HttpSender {
configureClient(sslContext, connectTimeoutNanos),
endpoint,
compressor,
exportAsJson,
contentType,
timeoutNanos,
headerSupplier,
Expand All @@ -111,7 +118,7 @@ private static HttpClient configureClient(

@Override
public void send(
Consumer<OutputStream> marshaler,
Marshaler marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError) {
Expand All @@ -121,7 +128,7 @@ public void send(
try {
return sendInternal(marshaler);
} catch (IOException e) {
throw new IllegalStateException(e);
throw new UncheckedIOException(e);
}
},
executorService)
Expand All @@ -136,7 +143,7 @@ public void send(
}

// Visible for testing
HttpResponse<byte[]> sendInternal(Consumer<OutputStream> marshaler) throws IOException {
HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
long startTimeNanos = System.nanoTime();
HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder().uri(uri).timeout(Duration.ofNanos(timeoutNanos));
Expand All @@ -151,12 +158,12 @@ HttpResponse<byte[]> sendInternal(Consumer<OutputStream> marshaler) throws IOExc
if (compressor != null) {
requestBuilder.header("Content-Encoding", compressor.getEncoding());
try (OutputStream compressed = compressor.compress(os)) {
marshaler.accept(compressed);
write(marshaler, compressed);
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
marshaler.accept(os);
write(marshaler, os);
}

ByteBufferPool byteBufferPool = threadLocalByteBufPool.get();
Expand Down Expand Up @@ -211,6 +218,14 @@ HttpResponse<byte[]> sendInternal(Consumer<OutputStream> marshaler) throws IOExc
throw exception;
}

private void write(Marshaler marshaler, OutputStream os) throws IOException {
if (exportAsJson) {
marshaler.writeJsonTo(os);
} else {
marshaler.writeBinaryTo(os);
}
}

private HttpResponse<byte[]> sendRequest(
HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class JdkHttpSenderProvider implements HttpSenderProvider {
public HttpSender createSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeout,
Expand All @@ -40,6 +41,7 @@ public HttpSender createSender(
return new JdkHttpSender(
endpoint,
compressor,
exportAsJson,
contentType,
timeoutNanos,
connectTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -54,6 +56,7 @@ void setup() throws IOException, InterruptedException {
"http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection
// timeout
null,
false,
"text/plain",
Duration.ofSeconds(10).toNanos(),
Collections::emptyMap,
Expand All @@ -65,7 +68,7 @@ void setup() throws IOException, InterruptedException {

@Test
void sendInternal_RetryableConnectTimeoutException() throws IOException, InterruptedException {
assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.isInstanceOf(HttpConnectTimeoutException.class);

verify(mockHttpClient, times(2)).send(any(), any());
Expand All @@ -75,7 +78,7 @@ void sendInternal_RetryableConnectTimeoutException() throws IOException, Interru
void sendInternal_RetryableIoException() throws IOException, InterruptedException {
doThrow(new IOException("error!")).when(mockHttpClient).send(any(), any());

assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.isInstanceOf(IOException.class)
.hasMessage("error!");

Expand All @@ -86,7 +89,7 @@ void sendInternal_RetryableIoException() throws IOException, InterruptedExceptio
void sendInternal_NonRetryableException() throws IOException, InterruptedException {
doThrow(new SSLException("unknown error")).when(mockHttpClient).send(any(), any());

assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.isInstanceOf(IOException.class)
.hasMessage("unknown error");

Expand All @@ -99,6 +102,7 @@ void connectTimeout() {
new JdkHttpSender(
"http://localhost",
null,
false,
"text/plain",
1,
TimeUnit.SECONDS.toNanos(10),
Expand All @@ -112,4 +116,15 @@ void connectTimeout() {
httpClient ->
assertThat(httpClient.connectTimeout().get()).isEqualTo(Duration.ofSeconds(10)));
}

private static class NoOpMarshaler extends Marshaler {

@Override
public int getBinarySerializedSize() {
return 0;
}

@Override
protected void writeTo(Serializer output) {}
}
}
1 change: 1 addition & 0 deletions exporters/sender/okhttp/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
implementation("com.squareup.okhttp3:okhttp")

compileOnly("io.grpc:grpc-stub")
compileOnly("com.fasterxml.jackson.core:jackson-core")

testImplementation("com.linecorp.armeria:armeria-junit5")
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -44,6 +44,7 @@ public final class OkHttpHttpSender implements HttpSender {
private final OkHttpClient client;
private final HttpUrl url;
@Nullable private final Compressor compressor;
private final boolean exportAsJson;
private final Supplier<Map<String, List<String>>> headerSupplier;
private final MediaType mediaType;

Expand All @@ -52,6 +53,7 @@ public final class OkHttpHttpSender implements HttpSender {
public OkHttpHttpSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectionTimeoutNanos,
Expand Down Expand Up @@ -86,13 +88,14 @@ public OkHttpHttpSender(
this.client = builder.build();
this.url = HttpUrl.get(endpoint);
this.compressor = compressor;
this.exportAsJson = exportAsJson;
this.mediaType = MediaType.parse(contentType);
this.headerSupplier = headerSupplier;
}

@Override
public void send(
Consumer<OutputStream> marshaler,
Marshaler marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError) {
Expand All @@ -103,7 +106,7 @@ public void send(
headers.forEach(
(key, values) -> values.forEach(value -> requestBuilder.addHeader(key, value)));
}
RequestBody body = new RawRequestBody(marshaler, contentLength, mediaType);
RequestBody body = new RawRequestBody(marshaler, exportAsJson, contentLength, mediaType);
if (compressor != null) {
requestBuilder.addHeader("Content-Encoding", compressor.getEncoding());
requestBuilder.post(new CompressedRequestBody(compressor, body));
Expand Down Expand Up @@ -161,13 +164,15 @@ static boolean isRetryable(okhttp3.Response response) {

private static class RawRequestBody extends RequestBody {

private final Consumer<OutputStream> marshaler;
private final Marshaler marshaler;
private final boolean exportAsJson;
private final int contentLength;
private final MediaType mediaType;

private RawRequestBody(
Consumer<OutputStream> marshaler, int contentLength, MediaType mediaType) {
Marshaler marshaler, boolean exportAsJson, int contentLength, MediaType mediaType) {
this.marshaler = marshaler;
this.exportAsJson = exportAsJson;
this.contentLength = contentLength;
this.mediaType = mediaType;
}
Expand All @@ -183,8 +188,12 @@ public MediaType contentType() {
}

@Override
public void writeTo(BufferedSink bufferedSink) {
marshaler.accept(bufferedSink.outputStream());
public void writeTo(BufferedSink bufferedSink) throws IOException {
if (exportAsJson) {
marshaler.writeJsonTo(bufferedSink.outputStream());
} else {
marshaler.writeBinaryTo(bufferedSink.outputStream());
}
}
}

Expand Down
Loading

0 comments on commit b4ed532

Please sign in to comment.