From b50d185666455bd6120539faabe366e9eb2b0d9c Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Thu, 9 May 2024 11:02:42 -0400 Subject: [PATCH] [BUG] Generic HTTP Actions in Java Client does not work with AwsSdk2Transport Signed-off-by: Andriy Redko --- CHANGELOG.md | 1 + .../generic/OpenSearchGenericClient.java | 30 ++--- .../transport/aws/AwsSdk2Transport.java | 113 +++++++++++++++--- 3 files changed, 105 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 36175f5200..15d5f3f826 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ This section is for maintaining a changelog for all breaking changes for the cli ### Removed ### Fixed +- [BUG] Generic HTTP Actions in Java Client does not work with AwsSdk2Transport ([#978](https://github.com/opensearch-project/opensearch-java/pull/978)) ### Security diff --git a/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchGenericClient.java b/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchGenericClient.java index 948444e150..4a8fffa8a4 100644 --- a/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchGenericClient.java +++ b/java-client/src/main/java/org/opensearch/client/opensearch/generic/OpenSearchGenericClient.java @@ -95,27 +95,17 @@ public GenericResponse responseDeserializer( @Nullable String contentType, @Nullable InputStream body ) { - if (isError(status)) { - // Fully consume the response body since the it will be propagated as an exception with possible no chance to be closed - try (Body b = Body.from(body, contentType)) { - if (b != null) { - return new GenericResponse( - uri, - protocol, - method, - status, - reason, - headers, - Body.from(b.bodyAsBytes(), b.contentType()) - ); - } else { - return new GenericResponse(uri, protocol, method, status, reason, headers); - } - } catch (final IOException ex) { - throw new UncheckedIOException(ex); + try (Body b = Body.from(body, contentType)) { + if (b != null) { + // Fully consume the response body: + // - if it will be propagated as an exception with possible no chance to be closed + // - the entity stream will be consumed and become unavailable + return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(b.bodyAsBytes(), b.contentType())); + } else { + return new GenericResponse(uri, protocol, method, status, reason, headers); } - } else { - return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(body, contentType)); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); } } diff --git a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java index 2269b8e13b..289399b610 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java @@ -17,6 +17,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; +import java.util.AbstractMap; import java.util.Collection; import java.util.Map; import java.util.Objects; @@ -24,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; @@ -35,6 +37,7 @@ import org.opensearch.client.opensearch._types.ErrorResponse; import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.transport.Endpoint; +import org.opensearch.client.transport.GenericEndpoint; import org.opensearch.client.transport.JsonEndpoint; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.TransportException; @@ -47,6 +50,7 @@ import software.amazon.awssdk.auth.signer.Aws4Signer; import software.amazon.awssdk.auth.signer.params.Aws4SignerParams; import software.amazon.awssdk.http.AbortableInputStream; +import software.amazon.awssdk.http.Header; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.HttpExecuteResponse; import software.amazon.awssdk.http.SdkHttpClient; @@ -393,7 +397,15 @@ private ResponseT executeSync( try { bodyStream = executeResponse.responseBody().orElse(null); SdkHttpResponse httpResponse = executeResponse.httpResponse(); - return parseResponse(httpResponse, bodyStream, endpoint, options); + return parseResponse( + httpRequest.getUri(), + httpRequest.method(), + httpRequest.protocol(), + httpResponse, + bodyStream, + endpoint, + options + ); } finally { if (bodyStream != null) { bodyStream.close(); @@ -421,7 +433,17 @@ private CompletableFuture executeAsync( CompletableFuture ret = new CompletableFuture<>(); try { InputStream bodyStream = new ByteArrayInputStream(responseBody); - ret.complete(parseResponse(response, bodyStream, endpoint, options)); + ret.complete( + parseResponse( + httpRequest.getUri(), + httpRequest.method(), + httpRequest.protocol(), + response, + bodyStream, + endpoint, + options + ) + ); } catch (Throwable e) { ret.completeExceptionally(e); } @@ -430,6 +452,9 @@ private CompletableFuture executeAsync( } private ResponseT parseResponse( + URI uri, + @Nonnull SdkHttpMethod method, + String protocol, @Nonnull SdkHttpResponse httpResponse, @CheckForNull InputStream bodyStream, @Nonnull Endpoint endpoint, @@ -478,24 +503,51 @@ private ResponseT parseResponse( } if (endpoint.isError(statusCode)) { - JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); - if (errorDeserializer == null || bodyStream == null) { - throw new TransportException("Request failed with status code '" + statusCode + "'"); - } - try { - try (JsonParser parser = mapper.jsonProvider().createParser(bodyStream)) { - ErrorT error = errorDeserializer.deserialize(parser, mapper); - throw new OpenSearchException((ErrorResponse) error); + if (endpoint instanceof GenericEndpoint) { + @SuppressWarnings("unchecked") + final GenericEndpoint rawEndpoint = (GenericEndpoint) endpoint; + + String contentType = null; + if (bodyStream != null) { + contentType = httpResponse.firstMatchingHeader(Header.CONTENT_TYPE).orElse(null); + } + + final ResponseT error = rawEndpoint.responseDeserializer( + uri.toString(), + method.name(), + protocol, + httpResponse.statusCode(), + httpResponse.statusText().orElse(null), + httpResponse.headers() + .entrySet() + .stream() + .map(h -> new AbstractMap.SimpleEntry(h.getKey(), Objects.toString(h.getValue()))) + .collect(Collectors.toList()), + contentType, + bodyStream + ); + + throw rawEndpoint.exceptionConverter(statusCode, error); + } else { + JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); + if (errorDeserializer == null || bodyStream == null) { + throw new TransportException("Request failed with status code '" + statusCode + "'"); + } + try { + try (JsonParser parser = mapper.jsonProvider().createParser(bodyStream)) { + ErrorT error = errorDeserializer.deserialize(parser, mapper); + throw new OpenSearchException((ErrorResponse) error); + } + } catch (OpenSearchException e) { + throw e; + } catch (Exception e) { + // can't parse the error - use a general exception + ErrorCause.Builder cause = new ErrorCause.Builder(); + cause.type("http_exception"); + cause.reason("server returned " + statusCode); + ErrorResponse error = ErrorResponse.of(err -> err.status(statusCode).error(cause.build())); + throw new OpenSearchException(error); } - } catch (OpenSearchException e) { - throw e; - } catch (Exception e) { - // can't parse the error - use a general exception - ErrorCause.Builder cause = new ErrorCause.Builder(); - cause.type("http_exception"); - cause.reason("server returned " + statusCode); - ErrorResponse error = ErrorResponse.of(err -> err.status(statusCode).error(cause.build())); - throw new OpenSearchException(error); } } else { if (endpoint instanceof BooleanEndpoint) { @@ -523,6 +575,29 @@ private ResponseT parseResponse( ; } return response; + } else if (endpoint instanceof GenericEndpoint) { + @SuppressWarnings("unchecked") + final GenericEndpoint rawEndpoint = (GenericEndpoint) endpoint; + + String contentType = null; + if (bodyStream != null) { + contentType = httpResponse.firstMatchingHeader(Header.CONTENT_TYPE).orElse(null); + } + + return rawEndpoint.responseDeserializer( + uri.toString(), + method.name(), + protocol, + httpResponse.statusCode(), + httpResponse.statusText().orElse(null), + httpResponse.headers() + .entrySet() + .stream() + .map(h -> new AbstractMap.SimpleEntry(h.getKey(), Objects.toString(h.getValue()))) + .collect(Collectors.toList()), + contentType, + bodyStream + ); } else { throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'"); }