From 623ef8aac588464158b5c051857763a87a508ec3 Mon Sep 17 00:00:00 2001 From: Liyun Xiu Date: Wed, 31 Jul 2024 15:09:29 +0800 Subject: [PATCH 1/4] Fix ingest NPE for empty pipeline Signed-off-by: Liyun Xiu --- .../org/opensearch/ingest/IngestService.java | 10 ++++- .../opensearch/ingest/IngestServiceTests.java | 37 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index 17eb23422e68b..938ca7493926e 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -997,7 +997,7 @@ private void innerBatchExecute( Consumer> handler ) { if (pipeline.getProcessors().isEmpty()) { - handler.accept(null); + handler.accept(toIngestDocumentWrappers(slots, indexRequests)); return; } @@ -1271,6 +1271,14 @@ private static IngestDocumentWrapper toIngestDocumentWrapper(int slot, IndexRequ return new IngestDocumentWrapper(slot, toIngestDocument(indexRequest), null); } + private static List toIngestDocumentWrappers(List slots, List indexRequests) { + List ingestDocumentWrappers = new ArrayList<>(); + for (int i = 0; i < slots.size(); ++i) { + ingestDocumentWrappers.add(toIngestDocumentWrapper(slots.get(i), indexRequests.get(i))); + } + return ingestDocumentWrappers; + } + private static Map createSlotIndexRequestMap(List slots, List indexRequests) { Map slotIndexRequestMap = new HashMap<>(); for (int i = 0; i < slots.size(); ++i) { diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 166b94966196c..a97712169022a 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -1995,6 +1995,43 @@ public void testExecuteBulkRequestInBatchWithDefaultBatchSize() { verify(mockCompoundProcessor, never()).execute(any(), any()); } + public void testExecuteEmptyPipelineInBatch() throws Exception { + IngestService ingestService = createWithProcessors(emptyMap()); + PutPipelineRequest putRequest = new PutPipelineRequest( + "_id", + new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), + MediaTypeRegistry.JSON + ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + BulkRequest bulkRequest = new BulkRequest(); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest1); + IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest2); + IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_none").setFinalPipeline("_id"); + bulkRequest.add(indexRequest3); + IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest4); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest( + 4, + bulkRequest.requests(), + failureHandler, + completionHandler, + indexReq -> {}, + Names.WRITE, + mockBulkRequest + ); + verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + } + public void testPrepareBatches_same_index_pipeline() { IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1")); IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index1", Collections.singletonList("p1")); From 6940c06fd5b3b0fa2b7d3d26692338d62b205f4a Mon Sep 17 00:00:00 2001 From: Liyun Xiu Date: Wed, 31 Jul 2024 15:14:21 +0800 Subject: [PATCH 2/4] Update changelog Signed-off-by: Liyun Xiu --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9689e391c6df3..439bcd4d36c40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Fixed - Fix constraint bug which allows more primary shards than average primary shards per index ([#14908](https://github.com/opensearch-project/OpenSearch/pull/14908)) +- Fix bulk ingest NPE with empty pipeline ([#15033](https://github.com/opensearch-project/OpenSearch/pull/15033)) - Fix missing value of FieldSort for unsigned_long ([#14963](https://github.com/opensearch-project/OpenSearch/pull/14963)) ### Security From 09630bef4d551a69c6eb0e56528e4b959f35cfe4 Mon Sep 17 00:00:00 2001 From: Liyun Xiu Date: Thu, 1 Aug 2024 10:03:07 +0800 Subject: [PATCH 3/4] Fix UT Signed-off-by: Liyun Xiu --- .../opensearch/ingest/IngestServiceTests.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index a97712169022a..1f4b1d635d438 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -2011,25 +2011,24 @@ public void testExecuteEmptyPipelineInBatch() throws Exception { bulkRequest.add(indexRequest1); IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); bulkRequest.add(indexRequest2); - IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_none").setFinalPipeline("_id"); + IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); bulkRequest.add(indexRequest3); IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); bulkRequest.add(indexRequest4); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final BiConsumer completionHandler = mock(BiConsumer.class); + bulkRequest.batchSize(4); + final Map failureHandler = new HashMap<>(); + final Map completionHandler = new HashMap<>(); ingestService.executeBulkRequest( 4, bulkRequest.requests(), - failureHandler, - completionHandler, + failureHandler::put, + completionHandler::put, indexReq -> {}, Names.WRITE, - mockBulkRequest + bulkRequest ); - verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + assertTrue(failureHandler.isEmpty()); + assertEquals(Set.of(Thread.currentThread()), completionHandler.keySet()); } public void testPrepareBatches_same_index_pipeline() { From 5557696d7529628a7e0bf85696e9f12664b62c3e Mon Sep 17 00:00:00 2001 From: Liyun Xiu Date: Tue, 6 Aug 2024 09:44:41 +0800 Subject: [PATCH 4/4] Update changelog to trigger build Signed-off-by: Liyun Xiu --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8830cf9e8464..6d59e9fef61d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,7 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Fixed - Fix constraint bug which allows more primary shards than average primary shards per index ([#14908](https://github.com/opensearch-project/OpenSearch/pull/14908)) -- Fix bulk ingest NPE with empty pipeline ([#15033](https://github.com/opensearch-project/OpenSearch/pull/15033)) +- Fix NPE when bulk ingest with empty pipeline ([#15033](https://github.com/opensearch-project/OpenSearch/pull/15033)) - Fix missing value of FieldSort for unsigned_long ([#14963](https://github.com/opensearch-project/OpenSearch/pull/14963)) - Fix delete index template failed when the index template matches a data stream but is unused ([#15080](https://github.com/opensearch-project/OpenSearch/pull/15080))