Skip to content

Commit

Permalink
Fix Failure to Drain Stream in GCS Repo Tests
Browse files Browse the repository at this point in the history
Same as elastic#51933 but for the custom handler just used in this test.

Closes elastic#52430
  • Loading branch information
original-brownbear committed Feb 17, 2020
1 parent c835c9d commit a8e6e4f
Showing 1 changed file with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
Expand All @@ -52,7 +53,6 @@
import org.junit.Before;
import org.threeten.bp.Duration;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
Expand Down Expand Up @@ -323,14 +323,17 @@ public void testWriteLargeBlob() throws IOException {
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());

httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
// read all the request body, otherwise the SDK client throws a non-retryable StorageException
final BytesReference requestBody = Streams.readFully(exchange.getRequestBody());

final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
assertThat(params.get("uploadType"), equalTo("resumable"));

if ("POST".equals(exchange.getRequestMethod())) {
assertThat(params.get("name"), equalTo("write_large_blob"));
if (countInits.decrementAndGet() <= 0) {
byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8);
byte[] response = requestBody.utf8ToString().getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.getResponseHeaders().add("Location", httpServerUrl() +
"/upload/storage/v1/b/bucket/o?uploadType=resumable&upload_id=" + sessionUploadId.get());
Expand All @@ -348,7 +351,6 @@ public void testWriteLargeBlob() throws IOException {
if (uploadId.equals(sessionUploadId.get()) == false) {
logger.debug("session id [{}] is gone", uploadId);
assertThat(wrongChunk, greaterThan(0));
Streams.readFully(exchange.getRequestBody());
exchange.sendResponseHeaders(HttpStatus.SC_GONE, -1);
return;
}
Expand All @@ -367,7 +369,6 @@ public void testWriteLargeBlob() throws IOException {
countInits.set(nbErrors);
countUploads.set(nbErrors * totalChunks);

Streams.readFully(exchange.getRequestBody());
exchange.sendResponseHeaders(HttpStatus.SC_GONE, -1);
return;
}
Expand All @@ -377,14 +378,12 @@ public void testWriteLargeBlob() throws IOException {
assertTrue(Strings.hasLength(range));

if (countUploads.decrementAndGet() % 2 == 0) {
final ByteArrayOutputStream requestBody = new ByteArrayOutputStream();
final long bytesRead = Streams.copy(exchange.getRequestBody(), requestBody);
assertThat(Math.toIntExact(bytesRead), anyOf(equalTo(defaultChunkSize), equalTo(lastChunkSize)));
assertThat(Math.toIntExact(requestBody.length()), anyOf(equalTo(defaultChunkSize), equalTo(lastChunkSize)));

final int rangeStart = getContentRangeStart(range);
final int rangeEnd = getContentRangeEnd(range);
assertThat(rangeEnd + 1 - rangeStart, equalTo(Math.toIntExact(bytesRead)));
assertArrayEquals(Arrays.copyOfRange(data, rangeStart, rangeEnd + 1), requestBody.toByteArray());
assertThat(rangeEnd + 1 - rangeStart, equalTo(Math.toIntExact(requestBody.length())));
assertThat(new BytesArray(data, rangeStart, rangeEnd - rangeStart + 1), is(requestBody));

final Integer limit = getContentRangeLimit(range);
if (limit != null) {
Expand All @@ -399,8 +398,6 @@ public void testWriteLargeBlob() throws IOException {
}
}

// read all the request body, otherwise the SDK client throws a non-retryable StorageException
Streams.readFully(exchange.getRequestBody());
if (randomBoolean()) {
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
}
Expand Down

0 comments on commit a8e6e4f

Please sign in to comment.