fix for bug: https://github.com/opensearch-project/OpenSearch/issues/… #3665

Closed · wants to merge 5 commits
35 changes: 35 additions & 0 deletions client/rest/src/main/java/org/opensearch/client/RestClient.java
@@ -954,6 +954,41 @@ public InputStream getContent() throws IOException {
}
return out.asInput();
}

/**
* A gzip compressing entity doesn't work with chunked encoding with SigV4.
*
* @return false
*/
@Override
public boolean isChunked() {
return false;
}
Member: This is no longer always the case. A compressing entity can be either chunked or not chunked.

Contributor Author: Yes, but we need that flag to tell whether it's chunked or not and to set the correct headers.
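For illustration, a minimal sketch (class and field names here are hypothetical, not the PR's actual entity class) of how a compressing wrapper built on Apache HttpCore's HttpEntityWrapper could expose chunking as a flag rather than hard-coding it, which is roughly the trade-off being discussed:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.zip.GZIPOutputStream;

    import org.apache.http.HttpEntity;
    import org.apache.http.entity.HttpEntityWrapper;

    // Hypothetical sketch: chunking is a constructor flag so callers that must send a
    // Content-Length header (e.g. for SigV4 signing) can disable chunked transfer encoding.
    class ConfigurableGzipEntity extends HttpEntityWrapper {

        private final boolean chunked;

        ConfigurableGzipEntity(HttpEntity wrapped, boolean chunked) {
            super(wrapped);
            this.chunked = chunked;
        }

        @Override
        public boolean isChunked() {
            // false => the transport is expected to send Content-Length instead of chunked encoding
            return chunked;
        }

        @Override
        public long getContentLength() {
            // The compressed length is not known up front; when chunking is disabled it has to be
            // computed some other way (this PR drains the compressed stream, see the diff below).
            return -1;
        }

        @Override
        public void writeTo(OutputStream out) throws IOException {
            GZIPOutputStream gzip = new GZIPOutputStream(out);
            wrappedEntity.writeTo(gzip);
            gzip.finish();
        }
    }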


/**
* A gzip entity requires the content length to be set in the HTTP headers,
* as chunked encoding doesn't work with SigV4.
*
* @return content length of the gzip entity
*/
@Override
public long getContentLength() {
long size = 0;
int chunk = 0;
byte[] buffer = new byte[1024];

// Drain the compressed content to compute its exact length.
try {
InputStream is = getContent();

while ((chunk = is.read(buffer)) != -1) {
size += chunk;
}
} catch (Exception ex) {
throw new RuntimeException("failed to get compressed content length: " + ex.getMessage(), ex);
}

return size;
}
Member: Is there a way to get the size of the compressed entity directly without reading all the bytes?

Contributor Author: I didn't see any method that gives us that info directly.
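One way to avoid draining the stream on every call (a hedged sketch with hypothetical names, not what this PR implements) is to compress the payload once into an in-memory buffer; the length then comes from the buffer, at the cost of holding the compressed bytes in memory:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.GZIPOutputStream;

    import org.apache.http.HttpEntity;

    // Hypothetical sketch: compress once, then serve both the length and the content from the buffer.
    final class BufferedGzipBody {

        private final byte[] compressed;

        BufferedGzipBody(HttpEntity original) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
                original.writeTo(gzip);
            }
            this.compressed = buffer.toByteArray();
        }

        long getContentLength() {
            return compressed.length; // known without re-reading any stream
        }

        InputStream getContent() {
            return new ByteArrayInputStream(compressed);
        }
    }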

}

/**
@@ -0,0 +1,188 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
Member: This is new code and doesn't require a license to Elasticsearch, just the SPDX header.

Contributor Author: Ack, will update in the next commit.

Contributor Author: Do we need to do similar/related work in https://github.com/opensearch-project/opensearch-java? Its dependencies show it also uses opensearch-rest-client (https://github.com/opensearch-project/opensearch-java/blob/main/java-client/build.gradle.kts#L131), so it should only require a version bump.

Member: Do check if there's anything else to do, I am not sure.

Contributor Author: I only see the dependency; not much change is required in opensearch-java apart from that.


/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.client;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class RestClientCompressionTests extends RestClientTestCase {

private static HttpServer httpServer;

@BeforeClass
public static void startHttpServer() throws Exception {
httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.createContext("/", new GzipResponseHandler());
httpServer.start();
}

@AfterClass
public static void stopHttpServers() throws IOException {
httpServer.stop(0);
httpServer = null;
}

/**
* A response handler that accepts gzip-encoded data and replies with the request encoding, the response
* encoding and the request content length, followed by the request body. The response is compressed if
* "Accept-Encoding" is "gzip".
*/
private static class GzipResponseHandler implements HttpHandler {
@Override
public void handle(HttpExchange exchange) throws IOException {

// Decode body (if any)
String contentEncoding = exchange.getRequestHeaders().getFirst("Content-Encoding");
String contentLength = exchange.getRequestHeaders().getFirst("Content-Length");
InputStream body = exchange.getRequestBody();
boolean compressedRequest = false;
if ("gzip".equals(contentEncoding)) {
body = new GZIPInputStream(body);
compressedRequest = true;
}
byte[] bytes = readAll(body);
boolean compress = "gzip".equals(exchange.getRequestHeaders().getFirst("Accept-Encoding"));
if (compress) {
exchange.getResponseHeaders().add("Content-Encoding", "gzip");
}

exchange.sendResponseHeaders(200, 0);

// Encode response if needed
OutputStream out = exchange.getResponseBody();
if (compress) {
out = new GZIPOutputStream(out);
}

// Outputs <request-encoding|null>#<response-encoding|null>#<request-content-length|null>#<request-body>
out.write(String.valueOf(contentEncoding).getBytes(StandardCharsets.UTF_8));
out.write('#');
out.write((compress ? "gzip" : "null").getBytes(StandardCharsets.UTF_8));
out.write('#');
out.write((compressedRequest ? contentLength : "null").getBytes(StandardCharsets.UTF_8));
out.write('#');
out.write(bytes);
out.close();

exchange.close();
}
}

/** Read all bytes of an input stream and close it. */
private static byte[] readAll(InputStream in) throws IOException {
byte[] buffer = new byte[1024];
ByteArrayOutputStream bos = new ByteArrayOutputStream();
int len = 0;
while ((len = in.read(buffer)) > 0) {
bos.write(buffer, 0, len);
}
in.close();
return bos.toByteArray();
}

private RestClient createClient(boolean enableCompression) {
InetSocketAddress address = httpServer.getAddress();
return RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
.setCompressionEnabled(enableCompression)
.build();
}

public void testCompressingClientWithContentLengthSync() throws Exception {
RestClient restClient = createClient(true);

Request request = new Request("POST", "/");
request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN));

Response response = restClient.performRequest(request);

HttpEntity entity = response.getEntity();
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);
// Content-Encoding#Accept-Encoding#Content-Length#Content
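// 38 is the Content-Length header the client sent for the gzip-compressed request body, echoed back by the test server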
Assert.assertEquals("gzip#gzip#38#compressing client", content);

restClient.close();
}

public void testCompressingClientContentLengthAsync() throws Exception {
RestClient restClient = createClient(true);

Request request = new Request("POST", "/");
request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN));

FutureResponse futureResponse = new FutureResponse();
restClient.performRequestAsync(request, futureResponse);
Response response = futureResponse.get();

// Server should report it had a compressed request and sent back a compressed response
HttpEntity entity = response.getEntity();
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);

// Content-Encoding#Accept-Encoding#Content-Length#Content
Assert.assertEquals("gzip#gzip#38#compressing client", content);

restClient.close();
}

public static class FutureResponse extends CompletableFuture<Response> implements ResponseListener {
@Override
public void onSuccess(Response response) {
this.complete(response);
}

@Override
public void onFailure(Exception exception) {
this.completeExceptionally(exception);
}
}
}