diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 9d03127692975..355ccacc01945 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; +import org.hamcrest.MatcherAssert; import org.opensearch.OpenSearchParseException; import org.opensearch.ResourceNotFoundException; import org.opensearch.Version; @@ -104,8 +105,10 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -1106,27 +1109,23 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } - public void testBulkRequestExecutionWithFailures() throws Exception { + public void testBulkRequestExecutionWithFailures() { BulkRequest bulkRequest = new BulkRequest(); String pipelineId = "_id"; - int numRequest = scaledRandomIntBetween(8, 64); - int numIndexRequests = 0; - for (int i = 0; i < numRequest; i++) { - DocWriteRequest request; + int numIndexRequests = scaledRandomIntBetween(4, 32); + for (int i = 0; i < numIndexRequests; i++) { + IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); + indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); + bulkRequest.add(indexRequest); + } + int numOtherRequests = scaledRandomIntBetween(4, 32); + for (int i = 0; i < numOtherRequests; i++) { if (randomBoolean()) { - if (randomBoolean()) { - request = new DeleteRequest("_index", "_id"); - } else { - request = new UpdateRequest("_index", "_id"); - } + bulkRequest.add(new DeleteRequest("_index", "_id")); } else { - IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); - indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); - request = indexRequest; - numIndexRequests++; + bulkRequest.add(new UpdateRequest("_index", "_id")); } - bulkRequest.add(request); } CompoundProcessor processor = mock(CompoundProcessor.class); @@ -1155,23 +1154,22 @@ public void testBulkRequestExecutionWithFailures() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - @SuppressWarnings("unchecked") - BiConsumer requestItemErrorHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + final Map errorHandler = new HashMap<>(); + final Map completionHandler = new HashMap<>(); ingestService.executeBulkRequest( - numRequest, + numIndexRequests + numOtherRequests, bulkRequest.requests(), - requestItemErrorHandler, - completionHandler, + errorHandler::put, + completionHandler::put, indexReq -> {}, Names.WRITE, bulkRequest ); - verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(o -> o.getCause().equals(error))); + MatcherAssert.assertThat(errorHandler.entrySet(), hasSize(numIndexRequests)); + errorHandler.values().forEach(e -> assertEquals(e.getCause(), error)); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + MatcherAssert.assertThat(completionHandler.keySet(), contains(Thread.currentThread())); } public void testBulkRequestExecution() throws Exception {