Skip to content

Commit

Permalink
[BUG] Generic HTTP Actions in Java Client does not work with AwsSdk2T…
Browse files Browse the repository at this point in the history
…ransport

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed May 9, 2024
1 parent 73c1f5b commit 4372193
Showing 1 changed file with 50 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
Expand All @@ -35,6 +37,7 @@
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.GenericEndpoint;
import org.opensearch.client.transport.JsonEndpoint;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.TransportException;
Expand All @@ -47,6 +50,7 @@
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.Header;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
import software.amazon.awssdk.http.SdkHttpClient;
Expand Down Expand Up @@ -393,7 +397,15 @@ private <ResponseT> ResponseT executeSync(
try {
bodyStream = executeResponse.responseBody().orElse(null);
SdkHttpResponse httpResponse = executeResponse.httpResponse();
return parseResponse(httpResponse, bodyStream, endpoint, options);
return parseResponse(
httpRequest.getUri(),
httpRequest.method(),
httpRequest.protocol(),
httpResponse,
bodyStream,
endpoint,
options
);
} finally {
if (bodyStream != null) {
bodyStream.close();
Expand Down Expand Up @@ -421,7 +433,17 @@ private <ResponseT> CompletableFuture<ResponseT> executeAsync(
CompletableFuture<ResponseT> ret = new CompletableFuture<>();
try {
InputStream bodyStream = new ByteArrayInputStream(responseBody);
ret.complete(parseResponse(response, bodyStream, endpoint, options));
ret.complete(
parseResponse(
httpRequest.getUri(),
httpRequest.method(),
httpRequest.protocol(),
response,
bodyStream,
endpoint,
options
)
);
} catch (Throwable e) {
ret.completeExceptionally(e);
}
Expand All @@ -430,6 +452,9 @@ private <ResponseT> CompletableFuture<ResponseT> executeAsync(
}

private <ResponseT, ErrorT> ResponseT parseResponse(
URI uri,
@Nonnull SdkHttpMethod method,
String protocol,
@Nonnull SdkHttpResponse httpResponse,
@CheckForNull InputStream bodyStream,
@Nonnull Endpoint<?, ResponseT, ErrorT> endpoint,
Expand Down Expand Up @@ -523,6 +548,29 @@ private <ResponseT, ErrorT> ResponseT parseResponse(
;
}
return response;
} else if (endpoint instanceof GenericEndpoint) {
@SuppressWarnings("unchecked")
final GenericEndpoint<?, ResponseT> rawEndpoint = (GenericEndpoint<?, ResponseT>) endpoint;

String contentType = null;
if (bodyStream != null) {
contentType = httpResponse.firstMatchingHeader(Header.CONTENT_TYPE).orElse(null);
}

return rawEndpoint.responseDeserializer(
uri.toString(),
method.name(),
protocol,
httpResponse.statusCode(),
httpResponse.statusText().orElse(null),
httpResponse.headers()
.entrySet()
.stream()
.map(h -> new AbstractMap.SimpleEntry<String, String>(h.getKey(), Objects.toString(h.getValue())))
.collect(Collectors.toList()),
contentType,
bodyStream
);
} else {
throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'");
}
Expand Down

0 comments on commit 4372193

Please sign in to comment.