Skip to content

Commit

Permalink
[Bugfix] Fix incorrect document order when there's exception during b…
Browse files Browse the repository at this point in the history
…atch ingest (#14341)

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>
  • Loading branch information
chishui committed Jun 14, 2024
1 parent 0c2ff03 commit d480027
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,18 @@
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.hamcrest.MatcherAssert;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.test.NodeRoles.nonIngestNode;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -159,6 +162,14 @@ public void testSimulate() throws Exception {
}

public void testBulkWithIngestFailures() throws Exception {
runBulkTestWithRandomDocs(false);
}

public void testBulkWithIngestFailuresWithBatchSize() throws Exception {
runBulkTestWithRandomDocs(true);
}

private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Exception {
createIndex("index");

BytesReference source = BytesReference.bytes(
Expand All @@ -177,6 +188,9 @@ public void testBulkWithIngestFailures() throws Exception {

int numRequests = scaledRandomIntBetween(32, 128);
BulkRequest bulkRequest = new BulkRequest();
if (shouldSetBatchSize) {
bulkRequest.batchSize(numRequests);
}
for (int i = 0; i < numRequests; i++) {
IndexRequest indexRequest = new IndexRequest("index").id(Integer.toString(i)).setPipeline("_id");
indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", i % 2 == 0);
Expand Down Expand Up @@ -209,6 +223,103 @@ public void testBulkWithIngestFailures() throws Exception {
assertTrue(deletePipelineResponse.isAcknowledged());
}

public void testBulkWithIngestFailuresBatch() throws Exception {
createIndex("index");

BytesReference source = BytesReference.bytes(
jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.endObject()
.endObject()
.endArray()
.endObject()
);
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.batchSize(2);
bulkRequest.add(
new IndexRequest("index").id("_fail").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true)
);
bulkRequest.add(
new IndexRequest("index").id("_success").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", false)
);

BulkResponse response = client().bulk(bulkRequest).actionGet();
MatcherAssert.assertThat(response.getItems().length, equalTo(bulkRequest.requests().size()));

Map<String, BulkItemResponse> results = Arrays.stream(response.getItems())
.collect(Collectors.toMap(BulkItemResponse::getId, r -> r));

MatcherAssert.assertThat(results.keySet(), containsInAnyOrder("_fail", "_success"));
assertNotNull(results.get("_fail").getFailure());
assertNull(results.get("_success").getFailure());

// verify field of successful doc
Map<String, Object> successDoc = client().prepareGet("index", "_success").get().getSourceAsMap();
assertThat(successDoc.get("processed"), equalTo(true));

// cleanup
AcknowledgedResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get();
assertTrue(deletePipelineResponse.isAcknowledged());
}

public void testBulkWithIngestFailuresAndDropBatch() throws Exception {
createIndex("index");

BytesReference source = BytesReference.bytes(
jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.endObject()
.endObject()
.endArray()
.endObject()
);
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.batchSize(3);
bulkRequest.add(
new IndexRequest("index").id("_fail").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true)
);
bulkRequest.add(
new IndexRequest("index").id("_success").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", false)
);
bulkRequest.add(
new IndexRequest("index").id("_drop").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "drop", true)
);

BulkResponse response = client().bulk(bulkRequest).actionGet();
MatcherAssert.assertThat(response.getItems().length, equalTo(bulkRequest.requests().size()));

Map<String, BulkItemResponse> results = Arrays.stream(response.getItems())
.collect(Collectors.toMap(BulkItemResponse::getId, r -> r));

MatcherAssert.assertThat(results.keySet(), containsInAnyOrder("_fail", "_success", "_drop"));
assertNotNull(results.get("_fail").getFailure());
assertNull(results.get("_success").getFailure());
assertNull(results.get("_drop").getFailure());

// verify dropped doc not in index
assertNull(client().prepareGet("index", "_drop").get().getSourceAsMap());

// verify field of successful doc
Map<String, Object> successDoc = client().prepareGet("index", "_success").get().getSourceAsMap();
assertThat(successDoc.get("processed"), equalTo(true));

// cleanup
AcknowledgedResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get();
assertTrue(deletePipelineResponse.isAcknowledged());
}

public void testBulkWithUpsert() throws Exception {
createIndex("index");

Expand Down
18 changes: 9 additions & 9 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ private void executePipelinesInBatchRequests(
),
results.get(i).getException()
);
onFailure.accept(slots.get(i), results.get(i).getException());
onFailure.accept(results.get(i).getSlot(), results.get(i).getException());
}
}

Expand Down Expand Up @@ -1092,15 +1092,15 @@ private void innerBatchExecute(
}
if (!exceptions.isEmpty()) {
totalMetrics.failedN(exceptions.size());
} else if (!dropped.isEmpty()) {
}
if (!dropped.isEmpty()) {
dropped.forEach(t -> itemDroppedHandler.accept(t.getSlot()));
} else {
for (IngestDocumentWrapper ingestDocumentWrapper : succeeded) {
updateIndexRequestWithIngestDocument(
slotToindexRequestMap.get(ingestDocumentWrapper.getSlot()),
ingestDocumentWrapper.getIngestDocument()
);
}
}
for (IngestDocumentWrapper ingestDocumentWrapper : succeeded) {
updateIndexRequestWithIngestDocument(
slotToindexRequestMap.get(ingestDocumentWrapper.getSlot()),
ingestDocumentWrapper.getIngestDocument()
);
}
handler.accept(allResults);
}
Expand Down
22 changes: 16 additions & 6 deletions server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;

Expand Down Expand Up @@ -1894,7 +1895,7 @@ public void testExecuteBulkRequestInBatchWithException() {
verify(mockCompoundProcessor, never()).execute(any(), any());
}

public void testExecuteBulkRequestInBatchWithExceptionInCallback() {
public void testExecuteBulkRequestInBatchWithExceptionAndDropInCallback() {
CompoundProcessor mockCompoundProcessor = mockCompoundProcessor();
IngestService ingestService = createWithProcessors(
Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor)
Expand All @@ -1906,11 +1907,14 @@ public void testExecuteBulkRequestInBatchWithExceptionInCallback() {
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest2);
bulkRequest.batchSize(2);
IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest3);
bulkRequest.batchSize(3);

List<IngestDocumentWrapper> results = Arrays.asList(
new IngestDocumentWrapper(0, IngestService.toIngestDocument(indexRequest1), null),
new IngestDocumentWrapper(1, null, new RuntimeException())
new IngestDocumentWrapper(1, null, new RuntimeException()),
new IngestDocumentWrapper(2, null, null)
);
doAnswer(args -> {
@SuppressWarnings("unchecked")
Expand All @@ -1923,16 +1927,22 @@ public void testExecuteBulkRequestInBatchWithExceptionInCallback() {
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
final IntConsumer dropHandler = mock(IntConsumer.class);
ingestService.executeBulkRequest(
2,
3,
bulkRequest.requests(),
failureHandler,
completionHandler,
indexReq -> {},
dropHandler,
Names.WRITE,
bulkRequest
);
verify(failureHandler, times(1)).accept(any(), any());
ArgumentCaptor<Integer> failureSlotCaptor = ArgumentCaptor.forClass(Integer.class);
verify(failureHandler, times(1)).accept(failureSlotCaptor.capture(), any());
assertEquals(1, failureSlotCaptor.getValue().intValue());
ArgumentCaptor<Integer> dropSlotCaptor = ArgumentCaptor.forClass(Integer.class);
verify(dropHandler, times(1)).accept(dropSlotCaptor.capture());
assertEquals(2, dropSlotCaptor.getValue().intValue());
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
verify(mockCompoundProcessor, times(1)).batchExecute(any(), any());
verify(mockCompoundProcessor, never()).execute(any(), any());
Expand Down

0 comments on commit d480027

Please sign in to comment.