From 4372193fddf946635e05440d7daadf47bfdc5e06 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Thu, 9 May 2024 11:02:42 -0400 Subject: [PATCH] [BUG] Generic HTTP Actions in Java Client does not work with AwsSdk2Transport Signed-off-by: Andriy Redko --- .../transport/aws/AwsSdk2Transport.java | 52 ++++++++++++++++++- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java index 2269b8e13b..4e10aecf24 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java @@ -17,6 +17,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; +import java.util.AbstractMap; import java.util.Collection; import java.util.Map; import java.util.Objects; @@ -24,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; @@ -35,6 +37,7 @@ import org.opensearch.client.opensearch._types.ErrorResponse; import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.transport.Endpoint; +import org.opensearch.client.transport.GenericEndpoint; import org.opensearch.client.transport.JsonEndpoint; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.TransportException; @@ -47,6 +50,7 @@ import software.amazon.awssdk.auth.signer.Aws4Signer; import software.amazon.awssdk.auth.signer.params.Aws4SignerParams; import software.amazon.awssdk.http.AbortableInputStream; +import software.amazon.awssdk.http.Header; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.HttpExecuteResponse; import software.amazon.awssdk.http.SdkHttpClient; @@ -393,7 +397,15 @@ private ResponseT executeSync( try { bodyStream = executeResponse.responseBody().orElse(null); SdkHttpResponse httpResponse = executeResponse.httpResponse(); - return parseResponse(httpResponse, bodyStream, endpoint, options); + return parseResponse( + httpRequest.getUri(), + httpRequest.method(), + httpRequest.protocol(), + httpResponse, + bodyStream, + endpoint, + options + ); } finally { if (bodyStream != null) { bodyStream.close(); @@ -421,7 +433,17 @@ private CompletableFuture executeAsync( CompletableFuture ret = new CompletableFuture<>(); try { InputStream bodyStream = new ByteArrayInputStream(responseBody); - ret.complete(parseResponse(response, bodyStream, endpoint, options)); + ret.complete( + parseResponse( + httpRequest.getUri(), + httpRequest.method(), + httpRequest.protocol(), + response, + bodyStream, + endpoint, + options + ) + ); } catch (Throwable e) { ret.completeExceptionally(e); } @@ -430,6 +452,9 @@ private CompletableFuture executeAsync( } private ResponseT parseResponse( + URI uri, + @Nonnull SdkHttpMethod method, + String protocol, @Nonnull SdkHttpResponse httpResponse, @CheckForNull InputStream bodyStream, @Nonnull Endpoint endpoint, @@ -523,6 +548,29 @@ private ResponseT parseResponse( ; } return response; + } else if (endpoint instanceof GenericEndpoint) { + @SuppressWarnings("unchecked") + final GenericEndpoint rawEndpoint = (GenericEndpoint) endpoint; + + String contentType = null; + if (bodyStream != null) { + contentType = httpResponse.firstMatchingHeader(Header.CONTENT_TYPE).orElse(null); + } + + return rawEndpoint.responseDeserializer( + uri.toString(), + method.name(), + protocol, + httpResponse.statusCode(), + httpResponse.statusText().orElse(null), + httpResponse.headers() + .entrySet() + .stream() + .map(h -> new AbstractMap.SimpleEntry(h.getKey(), Objects.toString(h.getValue()))) + .collect(Collectors.toList()), + contentType, + bodyStream + ); } else { throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'"); }