From d480027b7ada659a383fc4722bb01dcbac320311 Mon Sep 17 00:00:00 2001 From: Liyun Xiu Date: Fri, 14 Jun 2024 09:57:02 -0700 Subject: [PATCH] [Bugfix] Fix incorrect document order when there's exception during batch ingest (#14341) Signed-off-by: Liyun Xiu --- .../org/opensearch/ingest/IngestClientIT.java | 111 ++++++++++++++++++ .../org/opensearch/ingest/IngestService.java | 18 +-- .../opensearch/ingest/IngestServiceTests.java | 22 +++- 3 files changed, 136 insertions(+), 15 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java b/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java index 9481a6116cdbc..dbde31ef1eb65 100644 --- a/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java @@ -60,15 +60,18 @@ import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; +import org.hamcrest.MatcherAssert; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; import static org.opensearch.test.NodeRoles.nonIngestNode; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; @@ -159,6 +162,14 @@ public void testSimulate() throws Exception { } public void testBulkWithIngestFailures() throws Exception { + runBulkTestWithRandomDocs(false); + } + + public void testBulkWithIngestFailuresWithBatchSize() throws Exception { + runBulkTestWithRandomDocs(true); + } + + private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Exception { createIndex("index"); BytesReference source = BytesReference.bytes( @@ -177,6 +188,9 @@ public void testBulkWithIngestFailures() throws Exception { int numRequests = scaledRandomIntBetween(32, 128); BulkRequest bulkRequest = new BulkRequest(); + if (shouldSetBatchSize) { + bulkRequest.batchSize(numRequests); + } for (int i = 0; i < numRequests; i++) { IndexRequest indexRequest = new IndexRequest("index").id(Integer.toString(i)).setPipeline("_id"); indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", i % 2 == 0); @@ -209,6 +223,103 @@ public void testBulkWithIngestFailures() throws Exception { assertTrue(deletePipelineResponse.isAcknowledged()); } + public void testBulkWithIngestFailuresBatch() throws Exception { + createIndex("index"); + + BytesReference source = BytesReference.bytes( + jsonBuilder().startObject() + .field("description", "my_pipeline") + .startArray("processors") + .startObject() + .startObject("test") + .endObject() + .endObject() + .endArray() + .endObject() + ); + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.batchSize(2); + bulkRequest.add( + new IndexRequest("index").id("_fail").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true) + ); + bulkRequest.add( + new IndexRequest("index").id("_success").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", false) + ); + + BulkResponse response = client().bulk(bulkRequest).actionGet(); + MatcherAssert.assertThat(response.getItems().length, equalTo(bulkRequest.requests().size())); + + Map results = Arrays.stream(response.getItems()) + .collect(Collectors.toMap(BulkItemResponse::getId, r -> r)); + + MatcherAssert.assertThat(results.keySet(), containsInAnyOrder("_fail", "_success")); + assertNotNull(results.get("_fail").getFailure()); + assertNull(results.get("_success").getFailure()); + + // verify field of successful doc + Map successDoc = client().prepareGet("index", "_success").get().getSourceAsMap(); + assertThat(successDoc.get("processed"), equalTo(true)); + + // cleanup + AcknowledgedResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get(); + assertTrue(deletePipelineResponse.isAcknowledged()); + } + + public void testBulkWithIngestFailuresAndDropBatch() throws Exception { + createIndex("index"); + + BytesReference source = BytesReference.bytes( + jsonBuilder().startObject() + .field("description", "my_pipeline") + .startArray("processors") + .startObject() + .startObject("test") + .endObject() + .endObject() + .endArray() + .endObject() + ); + PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).get(); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.batchSize(3); + bulkRequest.add( + new IndexRequest("index").id("_fail").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true) + ); + bulkRequest.add( + new IndexRequest("index").id("_success").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", false) + ); + bulkRequest.add( + new IndexRequest("index").id("_drop").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "drop", true) + ); + + BulkResponse response = client().bulk(bulkRequest).actionGet(); + MatcherAssert.assertThat(response.getItems().length, equalTo(bulkRequest.requests().size())); + + Map results = Arrays.stream(response.getItems()) + .collect(Collectors.toMap(BulkItemResponse::getId, r -> r)); + + MatcherAssert.assertThat(results.keySet(), containsInAnyOrder("_fail", "_success", "_drop")); + assertNotNull(results.get("_fail").getFailure()); + assertNull(results.get("_success").getFailure()); + assertNull(results.get("_drop").getFailure()); + + // verify dropped doc not in index + assertNull(client().prepareGet("index", "_drop").get().getSourceAsMap()); + + // verify field of successful doc + Map successDoc = client().prepareGet("index", "_success").get().getSourceAsMap(); + assertThat(successDoc.get("processed"), equalTo(true)); + + // cleanup + AcknowledgedResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get(); + assertTrue(deletePipelineResponse.isAcknowledged()); + } + public void testBulkWithUpsert() throws Exception { createIndex("index"); diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index ab8e823199447..2281ccd4c0382 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -775,7 +775,7 @@ private void executePipelinesInBatchRequests( ), results.get(i).getException() ); - onFailure.accept(slots.get(i), results.get(i).getException()); + onFailure.accept(results.get(i).getSlot(), results.get(i).getException()); } } @@ -1092,15 +1092,15 @@ private void innerBatchExecute( } if (!exceptions.isEmpty()) { totalMetrics.failedN(exceptions.size()); - } else if (!dropped.isEmpty()) { + } + if (!dropped.isEmpty()) { dropped.forEach(t -> itemDroppedHandler.accept(t.getSlot())); - } else { - for (IngestDocumentWrapper ingestDocumentWrapper : succeeded) { - updateIndexRequestWithIngestDocument( - slotToindexRequestMap.get(ingestDocumentWrapper.getSlot()), - ingestDocumentWrapper.getIngestDocument() - ); - } + } + for (IngestDocumentWrapper ingestDocumentWrapper : succeeded) { + updateIndexRequestWithIngestDocument( + slotToindexRequestMap.get(ingestDocumentWrapper.getSlot()), + ingestDocumentWrapper.getIngestDocument() + ); } handler.accept(allResults); } diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 6d216370bae9a..a32cd2c3cad3f 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -97,6 +97,7 @@ import java.util.function.LongSupplier; import java.util.stream.Collectors; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; @@ -1894,7 +1895,7 @@ public void testExecuteBulkRequestInBatchWithException() { verify(mockCompoundProcessor, never()).execute(any(), any()); } - public void testExecuteBulkRequestInBatchWithExceptionInCallback() { + public void testExecuteBulkRequestInBatchWithExceptionAndDropInCallback() { CompoundProcessor mockCompoundProcessor = mockCompoundProcessor(); IngestService ingestService = createWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor) @@ -1906,11 +1907,14 @@ public void testExecuteBulkRequestInBatchWithExceptionInCallback() { bulkRequest.add(indexRequest1); IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); bulkRequest.add(indexRequest2); - bulkRequest.batchSize(2); + IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest3); + bulkRequest.batchSize(3); List results = Arrays.asList( new IngestDocumentWrapper(0, IngestService.toIngestDocument(indexRequest1), null), - new IngestDocumentWrapper(1, null, new RuntimeException()) + new IngestDocumentWrapper(1, null, new RuntimeException()), + new IngestDocumentWrapper(2, null, null) ); doAnswer(args -> { @SuppressWarnings("unchecked") @@ -1923,16 +1927,22 @@ public void testExecuteBulkRequestInBatchWithExceptionInCallback() { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); + final IntConsumer dropHandler = mock(IntConsumer.class); ingestService.executeBulkRequest( - 2, + 3, bulkRequest.requests(), failureHandler, completionHandler, - indexReq -> {}, + dropHandler, Names.WRITE, bulkRequest ); - verify(failureHandler, times(1)).accept(any(), any()); + ArgumentCaptor failureSlotCaptor = ArgumentCaptor.forClass(Integer.class); + verify(failureHandler, times(1)).accept(failureSlotCaptor.capture(), any()); + assertEquals(1, failureSlotCaptor.getValue().intValue()); + ArgumentCaptor dropSlotCaptor = ArgumentCaptor.forClass(Integer.class); + verify(dropHandler, times(1)).accept(dropSlotCaptor.capture()); + assertEquals(2, dropSlotCaptor.getValue().intValue()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); verify(mockCompoundProcessor, times(1)).batchExecute(any(), any()); verify(mockCompoundProcessor, never()).execute(any(), any());