From 45ec5ed227e9cd514d7ed5ff2e82e9b1c7480942 Mon Sep 17 00:00:00 2001 From: Jitendra Kumar Date: Thu, 23 Jun 2022 19:43:15 +0530 Subject: [PATCH 1/5] fix for bug: https://github.com/opensearch-project/OpenSearch/issues/3640 Signed-off-by: Jitendra Kumar --- .../org/opensearch/client/RestClient.java | 35 ++++ .../client/RestClientCompressionTests.java | 188 ++++++++++++++++++ 2 files changed, 223 insertions(+) create mode 100644 client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java diff --git a/client/rest/src/main/java/org/opensearch/client/RestClient.java b/client/rest/src/main/java/org/opensearch/client/RestClient.java index 4f899fd709112..b88b132ec123f 100644 --- a/client/rest/src/main/java/org/opensearch/client/RestClient.java +++ b/client/rest/src/main/java/org/opensearch/client/RestClient.java @@ -954,6 +954,41 @@ public InputStream getContent() throws IOException { } return out.asInput(); } + + /** + * A gzip compressing enrity doesn't worked with chunked encoding with sigv4 + * + * @return false + */ + @Override + public boolean isChunked() { + return false; + } + + /** + * A gzip entity require to content length in http headers + * as it doesn't work with chunked encoding for sigv4 + * + * @return content lenght of gzip entity + */ + @Override + public long getContentLength() { + long size = 0; + int chunk = 0; + byte[] buffer = new byte[1024]; + + try { + InputStream is = getContent(); + + while ((chunk = is.read(buffer)) != -1) { + size += chunk; + } + } catch (Exception ex) { + throw new RuntimeException("failed to get compressed content lenght: " + ex.getMessage()); + } + + return size; + } } /** diff --git a/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java b/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java new file mode 100644 index 0000000000000..f473a9f2fea9d --- /dev/null +++ b/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java @@ -0,0 +1,188 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class RestClientCompressionTests extends RestClientTestCase { + + private static HttpServer httpServer; + + @BeforeClass + public static void startHttpServer() throws Exception { + httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); + httpServer.createContext("/", new GzipResponseHandler()); + httpServer.start(); + } + + @AfterClass + public static void stopHttpServers() throws IOException { + httpServer.stop(0); + httpServer = null; + } + + /** + * A response handler that accepts gzip-encoded data and replies request and response encoding values + * followed by the request body. The response is compressed if "Accept-Encoding" is "gzip". + */ + private static class GzipResponseHandler implements HttpHandler { + @Override + public void handle(HttpExchange exchange) throws IOException { + + // Decode body (if any) + String contentEncoding = exchange.getRequestHeaders().getFirst("Content-Encoding"); + String contentLength = exchange.getRequestHeaders().getFirst("Content-Length"); + InputStream body = exchange.getRequestBody(); + boolean compressedRequest = false; + if ("gzip".equals(contentEncoding)) { + body = new GZIPInputStream(body); + compressedRequest = true; + } + byte[] bytes = readAll(body); + boolean compress = "gzip".equals(exchange.getRequestHeaders().getFirst("Accept-Encoding")); + if (compress) { + exchange.getResponseHeaders().add("Content-Encoding", "gzip"); + } + + exchange.sendResponseHeaders(200, 0); + + // Encode response if needed + OutputStream out = exchange.getResponseBody(); + if (compress) { + out = new GZIPOutputStream(out); + } + + // Outputs ## + out.write(String.valueOf(contentEncoding).getBytes(StandardCharsets.UTF_8)); + out.write('#'); + out.write((compress ? "gzip" : "null").getBytes(StandardCharsets.UTF_8)); + out.write('#'); + out.write((compressedRequest ? contentLength : "null").getBytes(StandardCharsets.UTF_8)); + out.write('#'); + out.write(bytes); + out.close(); + + exchange.close(); + } + } + + /** Read all bytes of an input stream and close it. */ + private static byte[] readAll(InputStream in) throws IOException { + byte[] buffer = new byte[1024]; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int len = 0; + while ((len = in.read(buffer)) > 0) { + bos.write(buffer, 0, len); + } + in.close(); + return bos.toByteArray(); + } + + private RestClient createClient(boolean enableCompression) { + InetSocketAddress address = httpServer.getAddress(); + return RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http")) + .setCompressionEnabled(enableCompression) + .build(); + } + + public void testCompressingClientWithContentLengthSync() throws Exception { + RestClient restClient = createClient(true); + + Request request = new Request("POST", "/"); + request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN)); + + Response response = restClient.performRequest(request); + + HttpEntity entity = response.getEntity(); + String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8); + // Content-Encoding#Accept-Encoding#Content-Length#Content + Assert.assertEquals("gzip#gzip#38#compressing client", content); + + restClient.close(); + } + + public void testCompressingClientContentLengthAsync() throws Exception { + InetSocketAddress address = httpServer.getAddress(); + RestClient restClient = RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http")) + .setCompressionEnabled(true) + .build(); + + Request request = new Request("POST", "/"); + request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN)); + + FutureResponse futureResponse = new FutureResponse(); + restClient.performRequestAsync(request, futureResponse); + Response response = futureResponse.get(); + + // Server should report it had a compressed request and sent back a compressed response + HttpEntity entity = response.getEntity(); + String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8); + + // Content-Encoding#Accept-Encoding#Content-Length#Content + Assert.assertEquals("gzip#gzip#38#compressing client", content); + + restClient.close(); + } + + public static class FutureResponse extends CompletableFuture implements ResponseListener { + @Override + public void onSuccess(Response response) { + this.complete(response); + } + + @Override + public void onFailure(Exception exception) { + this.completeExceptionally(exception); + } + } +} From d023308f864e724529ad347a6c20a8c15c96247e Mon Sep 17 00:00:00 2001 From: Jitendra Kumar Date: Thu, 23 Jun 2022 23:26:42 +0530 Subject: [PATCH 2/5] fix for bug: https://github.com/opensearch-project/OpenSearch/issues/3640 Signed-off-by: Jitendra Kumar --- .../org/opensearch/client/RestClient.java | 23 +++++++------------ .../client/RestClientCompressionTests.java | 19 --------------- 2 files changed, 8 insertions(+), 34 deletions(-) diff --git a/client/rest/src/main/java/org/opensearch/client/RestClient.java b/client/rest/src/main/java/org/opensearch/client/RestClient.java index b88b132ec123f..4a7cd7225cbe1 100644 --- a/client/rest/src/main/java/org/opensearch/client/RestClient.java +++ b/client/rest/src/main/java/org/opensearch/client/RestClient.java @@ -956,7 +956,7 @@ public InputStream getContent() throws IOException { } /** - * A gzip compressing enrity doesn't worked with chunked encoding with sigv4 + * A gzip compressing entity doesn't work with chunked encoding with sigv4 * * @return false */ @@ -966,25 +966,18 @@ public boolean isChunked() { } /** - * A gzip entity require to content length in http headers + * A gzip entity requires content length in http headers * as it doesn't work with chunked encoding for sigv4 * - * @return content lenght of gzip entity + * @return content length of gzip entity */ @Override public long getContentLength() { - long size = 0; - int chunk = 0; - byte[] buffer = new byte[1024]; - - try { - InputStream is = getContent(); - - while ((chunk = is.read(buffer)) != -1) { - size += chunk; - } - } catch (Exception ex) { - throw new RuntimeException("failed to get compressed content lenght: " + ex.getMessage()); + long size; + try (InputStream is = getContent()) { + size = is.readAllBytes().length; + } catch (IOException ex) { + size = -1L; } return size; diff --git a/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java b/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java index f473a9f2fea9d..82266b7830e71 100644 --- a/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java +++ b/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java @@ -6,25 +6,6 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - /* * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. From 65a590c4fc2656f1e14a2ae3806fc8ca7c2de509 Mon Sep 17 00:00:00 2001 From: Jitendra Kumar Date: Fri, 24 Jun 2022 00:31:15 +0530 Subject: [PATCH 3/5] fix for bug: https://github.com/opensearch-project/OpenSearch/issues/3640 Signed-off-by: Jitendra Kumar --- .../org/opensearch/client/RestClientCompressionTests.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java b/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java index 82266b7830e71..cc35fe1137edd 100644 --- a/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java +++ b/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java @@ -6,11 +6,6 @@ * compatible open source license. */ -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - package org.opensearch.client; import com.sun.net.httpserver.HttpExchange; From 303e408efcd78d4261b34c09904cc33ff18371fe Mon Sep 17 00:00:00 2001 From: Jitendra Kumar Date: Wed, 29 Jun 2022 12:00:26 +0530 Subject: [PATCH 4/5] fix for bug: https://github.com/opensearch-project/OpenSearch/issues/3640 Signed-off-by: Jitendra Kumar --- .../org/opensearch/client/RestClient.java | 85 ++++++++++++++++--- .../opensearch/client/RestClientBuilder.java | 14 ++- .../client/RestClientCompressionTests.java | 13 +-- 3 files changed, 93 insertions(+), 19 deletions(-) diff --git a/client/rest/src/main/java/org/opensearch/client/RestClient.java b/client/rest/src/main/java/org/opensearch/client/RestClient.java index 4a7cd7225cbe1..dbfd9fe8d96c9 100644 --- a/client/rest/src/main/java/org/opensearch/client/RestClient.java +++ b/client/rest/src/main/java/org/opensearch/client/RestClient.java @@ -131,6 +131,7 @@ public class RestClient implements Closeable { private volatile NodeTuple> nodeTuple; private final WarningsHandler warningsHandler; private final boolean compressionEnabled; + private final boolean chunkedTransferEncodingEnabled; RestClient( CloseableHttpAsyncClient client, @@ -141,6 +142,20 @@ public class RestClient implements Closeable { NodeSelector nodeSelector, boolean strictDeprecationMode, boolean compressionEnabled + ) { + this(client, defaultHeaders, nodes, pathPrefix, failureListener, nodeSelector, strictDeprecationMode, compressionEnabled, true); + } + + RestClient( + CloseableHttpAsyncClient client, + Header[] defaultHeaders, + List nodes, + String pathPrefix, + FailureListener failureListener, + NodeSelector nodeSelector, + boolean strictDeprecationMode, + boolean compressionEnabled, + boolean chunkedTransferEncodingEnabled ) { this.client = client; this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders)); @@ -149,6 +164,7 @@ public class RestClient implements Closeable { this.nodeSelector = nodeSelector; this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE; this.compressionEnabled = compressionEnabled; + this.chunkedTransferEncodingEnabled = chunkedTransferEncodingEnabled; setNodes(nodes); } @@ -583,36 +599,51 @@ private static void addSuppressedException(Exception suppressedException, Except } } - private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity, boolean compressionEnabled) { + private static HttpRequestBase createHttpRequest( + String method, + URI uri, + HttpEntity entity, + boolean compressionEnabled, + boolean chunkedTransferEncodingEnabled + ) { switch (method.toUpperCase(Locale.ROOT)) { case HttpDeleteWithEntity.METHOD_NAME: - return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled); + return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); case HttpGetWithEntity.METHOD_NAME: - return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled); + return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); case HttpHead.METHOD_NAME: - return addRequestBody(new HttpHead(uri), entity, compressionEnabled); + return addRequestBody(new HttpHead(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); case HttpOptions.METHOD_NAME: - return addRequestBody(new HttpOptions(uri), entity, compressionEnabled); + return addRequestBody(new HttpOptions(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); case HttpPatch.METHOD_NAME: - return addRequestBody(new HttpPatch(uri), entity, compressionEnabled); + return addRequestBody(new HttpPatch(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); case HttpPost.METHOD_NAME: HttpPost httpPost = new HttpPost(uri); - addRequestBody(httpPost, entity, compressionEnabled); + addRequestBody(httpPost, entity, compressionEnabled, chunkedTransferEncodingEnabled); return httpPost; case HttpPut.METHOD_NAME: - return addRequestBody(new HttpPut(uri), entity, compressionEnabled); + return addRequestBody(new HttpPut(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); case HttpTrace.METHOD_NAME: - return addRequestBody(new HttpTrace(uri), entity, compressionEnabled); + return addRequestBody(new HttpTrace(uri), entity, compressionEnabled, chunkedTransferEncodingEnabled); default: throw new UnsupportedOperationException("http method not supported: " + method); } } - private static HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity, boolean compressionEnabled) { + private static HttpRequestBase addRequestBody( + HttpRequestBase httpRequest, + HttpEntity entity, + boolean compressionEnabled, + boolean chunkedTransferEncodingEnabled + ) { if (entity != null) { if (httpRequest instanceof HttpEntityEnclosingRequestBase) { if (compressionEnabled) { - entity = new ContentCompressingEntity(entity); + if (chunkedTransferEncodingEnabled) { + entity = new ContentCompressingChunkedEntity(entity); + } else { + entity = new ContentCompressingEntity(entity); + } } ((HttpEntityEnclosingRequestBase) httpRequest).setEntity(entity); } else { @@ -782,7 +813,13 @@ private class InternalRequest { String ignoreString = params.remove("ignore"); this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod()); URI uri = buildUri(pathPrefix, request.getEndpoint(), params); - this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity(), compressionEnabled); + this.httpRequest = createHttpRequest( + request.getMethod(), + uri, + request.getEntity(), + compressionEnabled, + chunkedTransferEncodingEnabled + ); this.cancellable = Cancellable.fromRequest(httpRequest); setHeaders(httpRequest, request.getOptions().getHeaders()); setRequestConfig(httpRequest, request.getOptions().getRequestConfig()); @@ -932,6 +969,30 @@ private static Exception extractAndWrapCause(Exception exception) { return new RuntimeException("error while performing request", exception); } + /** + * A gzip compressing entity that also implements {@code getContent()}. + */ + public static class ContentCompressingChunkedEntity extends GzipCompressingEntity { + + /** + * Creates a {@link ContentCompressingChunkedEntity} instance with the provided HTTP entity. + * + * @param entity the HTTP entity. + */ + public ContentCompressingChunkedEntity(HttpEntity entity) { + super(entity); + } + + @Override + public InputStream getContent() throws IOException { + ByteArrayInputOutputStream out = new ByteArrayInputOutputStream(1024); + try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) { + wrappedEntity.writeTo(gzipOut); + } + return out.asInput(); + } + } + /** * A gzip compressing entity that also implements {@code getContent()}. */ diff --git a/client/rest/src/main/java/org/opensearch/client/RestClientBuilder.java b/client/rest/src/main/java/org/opensearch/client/RestClientBuilder.java index 0b259c7983ca5..938d1e1710c99 100644 --- a/client/rest/src/main/java/org/opensearch/client/RestClientBuilder.java +++ b/client/rest/src/main/java/org/opensearch/client/RestClientBuilder.java @@ -84,6 +84,7 @@ public final class RestClientBuilder { private NodeSelector nodeSelector = NodeSelector.ANY; private boolean strictDeprecationMode = false; private boolean compressionEnabled = false; + private boolean chunkedTransferEncodingEnabled = true; /** * Creates a new builder instance and sets the hosts that the client will send requests to. @@ -238,6 +239,16 @@ public RestClientBuilder setCompressionEnabled(boolean compressionEnabled) { return this; } + /** + * Whether the REST client should use Transfer-Encoding: chunked for compress requests" + * + * @param chunkedTransferEncodingEnabled flag for enabling Transfer-Encoding: chunked + */ + public RestClientBuilder setChunkedTransferEncodingEnabled(boolean chunkedTransferEncodingEnabled) { + this.chunkedTransferEncodingEnabled = chunkedTransferEncodingEnabled; + return this; + } + /** * Creates a new {@link RestClient} based on the provided configuration. */ @@ -256,7 +267,8 @@ public RestClient build() { failureListener, nodeSelector, strictDeprecationMode, - compressionEnabled + compressionEnabled, + chunkedTransferEncodingEnabled ); httpClient.start(); return restClient; diff --git a/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java b/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java index cc35fe1137edd..c0280ed7563b7 100644 --- a/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java +++ b/client/rest/src/test/java/org/opensearch/client/RestClientCompressionTests.java @@ -92,7 +92,9 @@ public void handle(HttpExchange exchange) throws IOException { } } - /** Read all bytes of an input stream and close it. */ + /** + * Read all bytes of an input stream and close it. + */ private static byte[] readAll(InputStream in) throws IOException { byte[] buffer = new byte[1024]; ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -104,15 +106,16 @@ private static byte[] readAll(InputStream in) throws IOException { return bos.toByteArray(); } - private RestClient createClient(boolean enableCompression) { + private RestClient createClient(boolean enableCompression, boolean chunkedEnabled) { InetSocketAddress address = httpServer.getAddress(); return RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http")) .setCompressionEnabled(enableCompression) + .setChunkedTransferEncodingEnabled(chunkedEnabled) .build(); } public void testCompressingClientWithContentLengthSync() throws Exception { - RestClient restClient = createClient(true); + RestClient restClient = createClient(true, false); Request request = new Request("POST", "/"); request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN)); @@ -129,9 +132,7 @@ public void testCompressingClientWithContentLengthSync() throws Exception { public void testCompressingClientContentLengthAsync() throws Exception { InetSocketAddress address = httpServer.getAddress(); - RestClient restClient = RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http")) - .setCompressionEnabled(true) - .build(); + RestClient restClient = createClient(true, false); Request request = new Request("POST", "/"); request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN)); From e0b361558e1e59fd4837c888c786b2b0b1866fc0 Mon Sep 17 00:00:00 2001 From: Jitendra Kumar Date: Fri, 1 Jul 2022 19:47:38 +0530 Subject: [PATCH 5/5] adding chunk support for non-compressed request Signed-off-by: Jitendra Kumar --- .../org/opensearch/client/RestClient.java | 25 +++++++++++++++++++ .../opensearch/client/RestClientBuilder.java | 2 +- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/client/rest/src/main/java/org/opensearch/client/RestClient.java b/client/rest/src/main/java/org/opensearch/client/RestClient.java index dbfd9fe8d96c9..dc0baa34e21de 100644 --- a/client/rest/src/main/java/org/opensearch/client/RestClient.java +++ b/client/rest/src/main/java/org/opensearch/client/RestClient.java @@ -36,6 +36,7 @@ import org.apache.http.ConnectionClosedException; import org.apache.http.Header; import org.apache.http.HttpEntity; +import org.apache.http.entity.HttpEntityWrapper; import org.apache.http.HttpHost; import org.apache.http.HttpRequest; import org.apache.http.HttpResponse; @@ -644,6 +645,8 @@ private static HttpRequestBase addRequestBody( } else { entity = new ContentCompressingEntity(entity); } + } else if (chunkedTransferEncodingEnabled) { + entity = new ChunkedHttpEntity(entity); } ((HttpEntityEnclosingRequestBase) httpRequest).setEntity(entity); } else { @@ -1045,6 +1048,28 @@ public long getContentLength() { } } + public static class ChunkedHttpEntity extends HttpEntityWrapper { + /** + * Creates a {@link ChunkedHttpEntity} instance with the provided HTTP entity. + * + * @param entity the HTTP entity. + */ + public ChunkedHttpEntity(HttpEntity entity) { + super(entity); + } + + /** + * A chunked entity requires transfer-encoding:chunked in http headers + * which requires isChunked to be true + * + * @return true + */ + @Override + public boolean isChunked() { + return true; + } + } + /** * A ByteArrayOutputStream that can be turned into an input stream without copying the underlying buffer. */ diff --git a/client/rest/src/main/java/org/opensearch/client/RestClientBuilder.java b/client/rest/src/main/java/org/opensearch/client/RestClientBuilder.java index 938d1e1710c99..3032a43823cd8 100644 --- a/client/rest/src/main/java/org/opensearch/client/RestClientBuilder.java +++ b/client/rest/src/main/java/org/opensearch/client/RestClientBuilder.java @@ -240,7 +240,7 @@ public RestClientBuilder setCompressionEnabled(boolean compressionEnabled) { } /** - * Whether the REST client should use Transfer-Encoding: chunked for compress requests" + * Whether the REST client should use Transfer-Encoding: chunked for requests or not" * * @param chunkedTransferEncodingEnabled flag for enabling Transfer-Encoding: chunked */