Skip to content

Commit

Permalink
Add more test cases
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Aug 27, 2024
1 parent 5a351ca commit b833dce
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testStreamingRequestNoBatching() throws IOException {
assertThat(count, equalTo(5));
}

public void testStreamingRequestOneBatch() throws IOException, InterruptedException {
public void testStreamingRequestOneBatchBySize() throws IOException, InterruptedException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

Expand Down Expand Up @@ -126,7 +126,7 @@ public void testStreamingRequestOneBatch() throws IOException, InterruptedExcept
assertThat(count, equalTo(5));
}

public void testStreamingRequestManyBatches() throws IOException {
public void testStreamingRequestManyBatchesBySize() throws IOException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

Expand Down Expand Up @@ -169,6 +169,40 @@ public void testStreamingRequestManyBatches() throws IOException {
assertThat(count, equalTo(5));
}

public void testStreamingRequestManyBatchesByIntervalAndSize() throws IOException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

final Duration delay = Duration.ofSeconds(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");
streamingRequest.addParameter("batch_interval", "3s");
streamingRequest.addParameter("batch_size", "4");

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

// We don't check for a other documents here since those may appear in any of the chunks (it is very
// difficult to get the timing right). But at the end, the total number of the documents is being checked.
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\""))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"5\""))
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());

final Request request = new Request("GET", "/test-streaming/_count");
final Response response = client().performRequest(request);
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Integer count = objectPath.evaluate("count");
assertThat(count, equalTo(5));
}

public void testStreamingBadRequest() throws IOException {
final Stream<String> stream = Stream.of(
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n" + "{ \"name\": \"josh\" }\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,11 @@ public boolean allowsUnsafeBuffers() {
return true;
}

private Flux<List<HttpChunk>> createBufferedFlux(final TimeValue batch_interval, final int batch_size, StreamingRestChannel channel) {
if (batch_interval != null) {
return Flux.from(channel).bufferTimeout(batch_size, Duration.ofMillis(batch_interval.millis()));
private Flux<List<HttpChunk>> createBufferedFlux(final TimeValue batchInterval, final int batchSize, StreamingRestChannel channel) {
if (batchInterval != null) {
return Flux.from(channel).bufferTimeout(batchSize, Duration.ofMillis(batchInterval.millis()));

Check warning on line 238 in server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java#L238

Added line #L238 was not covered by tests
} else {
return Flux.from(channel).buffer(batch_size);
return Flux.from(channel).buffer(batchSize);

Check warning on line 240 in server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java#L240

Added line #L240 was not covered by tests
}
}
}

0 comments on commit b833dce

Please sign in to comment.