From e7828c09b54ff4aef6d58b80cbeb6dcb9a293ec0 Mon Sep 17 00:00:00 2001 From: Liyun Xiu Date: Wed, 1 May 2024 02:30:21 +0800 Subject: [PATCH] [Backport 2.x] Support batch ingestion in bulk API (#12457) (#13306) (#13462) * Support batch ingestion in bulk API (#12457) (#13306) * [PoC][issues-12457] Support Batch Ingestion Signed-off-by: Liyun Xiu * Rewrite batch interface and handle error and metrics Signed-off-by: Liyun Xiu * Remove unnecessary change Signed-off-by: Liyun Xiu * Revert some unnecessary test change Signed-off-by: Liyun Xiu * Keep executeBulkRequest main logic untouched Signed-off-by: Liyun Xiu * Add UT Signed-off-by: Liyun Xiu * Add UT & yamlRest test, fix BulkRequest se/deserialization Signed-off-by: Liyun Xiu * Add missing java docs Signed-off-by: Liyun Xiu * Remove Writable from BatchIngestionOption Signed-off-by: Liyun Xiu * Add more UTs Signed-off-by: Liyun Xiu * Fix spotlesscheck Signed-off-by: Liyun Xiu * Rename parameter name to batch_size Signed-off-by: Liyun Xiu * Add more rest yaml tests & update rest spec Signed-off-by: Liyun Xiu * Remove batch_ingestion_option and only use batch_size Signed-off-by: Liyun Xiu * Throw invalid request exception for invalid batch_size Signed-off-by: Liyun Xiu * Update server/src/main/java/org/opensearch/action/bulk/BulkRequest.java Co-authored-by: Andriy Redko Signed-off-by: Liyun Xiu * Remove version constant Signed-off-by: Liyun Xiu --------- Signed-off-by: Liyun Xiu Signed-off-by: Liyun Xiu Co-authored-by: Andriy Redko (cherry picked from commit 1219c568248fafa479d67a1eaa6e3e2d9748701e) * Adjust changelog item position to trigger CI Signed-off-by: Liyun Xiu --------- Signed-off-by: Liyun Xiu --- CHANGELOG.md | 1 + .../rest-api-spec/test/ingest/70_bulk.yml | 87 ++++ .../resources/rest-api-spec/api/bulk.json | 4 + .../opensearch/action/bulk/BulkRequest.java | 30 +- .../action/bulk/TransportBulkAction.java | 3 +- .../common/metrics/OperationMetrics.java | 30 ++ .../opensearch/ingest/CompoundProcessor.java | 113 +++++ .../ingest/IngestDocumentWrapper.java | 42 ++ .../org/opensearch/ingest/IngestService.java | 409 +++++++++++++++++- .../java/org/opensearch/ingest/Pipeline.java | 25 ++ .../java/org/opensearch/ingest/Processor.java | 40 ++ .../rest/action/document/RestBulkAction.java | 1 + .../bulk/TransportBulkActionIngestTests.java | 28 +- .../ingest/CompoundProcessorTests.java | 212 +++++++++ .../ingest/IngestDocumentPreparer.java | 32 ++ .../ingest/IngestDocumentWrapperTests.java | 46 ++ .../opensearch/ingest/IngestServiceTests.java | 359 ++++++++++++++- .../org/opensearch/ingest/ProcessorTests.java | 74 ++++ 18 files changed, 1486 insertions(+), 50 deletions(-) create mode 100644 server/src/main/java/org/opensearch/ingest/IngestDocumentWrapper.java create mode 100644 server/src/test/java/org/opensearch/ingest/IngestDocumentPreparer.java create mode 100644 server/src/test/java/org/opensearch/ingest/IngestDocumentWrapperTests.java create mode 100644 server/src/test/java/org/opensearch/ingest/ProcessorTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index a320e9a97aabc..c952cec16feec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225)) - [Remote Store] Upload remote paths during remote enabled index creation ([#13386](https://github.com/opensearch-project/OpenSearch/pull/13386)) - [Search Pipeline] Handle 
default pipeline for multiple indices ([#13276](https://github.com/opensearch-project/OpenSearch/pull/13276)) +- [Batch Ingestion] Add `batch_size` to `_bulk` API. ([#12457](https://github.com/opensearch-project/OpenSearch/issues/12457)) - [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992)) ### Dependencies diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml index d7be48a92908c..edb7b77eb8d28 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml @@ -167,3 +167,90 @@ teardown: index: test_index id: test_id3 - match: { _source: {"f1": "v2", "f2": 47, "field1": "value1"}} + +--- +"Test bulk API with batch enabled happy case": + - skip: + version: " - 2.13.99" + reason: "Added in 2.14.0" + + - do: + bulk: + refresh: true + batch_size: 2 + pipeline: "pipeline1" + body: + - '{"index": {"_index": "test_index", "_id": "test_id1"}}' + - '{"text": "text1"}' + - '{"index": {"_index": "test_index", "_id": "test_id2"}}' + - '{"text": "text2"}' + - '{"index": {"_index": "test_index", "_id": "test_id3"}}' + - '{"text": "text3"}' + - '{"index": {"_index": "test_index", "_id": "test_id4"}}' + - '{"text": "text4"}' + - '{"index": {"_index": "test_index", "_id": "test_id5", "pipeline": "pipeline2"}}' + - '{"text": "text5"}' + - '{"index": {"_index": "test_index", "_id": "test_id6", "pipeline": "pipeline2"}}' + - '{"text": "text6"}' + + - match: { errors: false } + + - do: + get: + index: test_index + id: test_id5 + - match: { _source: {"text": "text5", "field2": "value2"}} + + - do: + get: + index: test_index + id: test_id3 + - match: { _source: { "text": "text3", "field1": "value1" } } + +--- +"Test bulk API with batch_size missing": + - skip: + version: " - 2.13.99" + reason: "Added in 2.14.0" + + - do: + bulk: + refresh: true + pipeline: "pipeline1" + body: + - '{"index": {"_index": "test_index", "_id": "test_id1"}}' + - '{"text": "text1"}' + - '{"index": {"_index": "test_index", "_id": "test_id2"}}' + - '{"text": "text2"}' + + - match: { errors: false } + + - do: + get: + index: test_index + id: test_id1 + - match: { _source: { "text": "text1", "field1": "value1" } } + + - do: + get: + index: test_index + id: test_id2 + - match: { _source: { "text": "text2", "field1": "value1" } } + +--- +"Test bulk API with invalid batch_size": + - skip: + version: " - 2.13.99" + reason: "Added in 2.14.0" + + - do: + catch: bad_request + bulk: + refresh: true + batch_size: -1 + pipeline: "pipeline1" + body: + - '{"index": {"_index": "test_index", "_id": "test_id1"}}' + - '{"text": "text1"}' + - '{"index": {"_index": "test_index", "_id": "test_id2"}}' + - '{"text": "text2"}' diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json b/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json index bb066cd131480..e0566b811ff07 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json @@ -74,6 +74,10 @@ "require_alias": { "type": "boolean", "description": "Sets require_alias for all incoming documents. 
Defaults to unset (false)" + }, + "batch_size": { + "type": "int", + "description": "Sets the batch size" } }, "body":{ diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java b/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java index 47abd0337fcf9..7614206cd226f 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java @@ -34,6 +34,7 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; +import org.opensearch.Version; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.CompositeIndicesRequest; @@ -80,7 +81,6 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkRequest.class); private static final int REQUEST_OVERHEAD = 50; - /** * Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and * {@link WriteRequest}s to this but java doesn't support syntax to declare that everything in the array has both types so we declare @@ -96,6 +96,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques private String globalRouting; private String globalIndex; private Boolean globalRequireAlias; + private int batchSize = 1; private long sizeInBytes = 0; @@ -107,6 +108,9 @@ public BulkRequest(StreamInput in) throws IOException { requests.addAll(in.readList(i -> DocWriteRequest.readDocumentRequest(null, i))); refreshPolicy = RefreshPolicy.readFrom(in); timeout = in.readTimeValue(); + if (in.getVersion().onOrAfter(Version.V_2_14_0)) { + batchSize = in.readInt(); + } } public BulkRequest(@Nullable String globalIndex) { @@ -346,6 +350,27 @@ public final BulkRequest timeout(TimeValue timeout) { return this; } + /** + * Set batch size + * @param size batch size from input + * @return {@link BulkRequest} + */ + public BulkRequest batchSize(int size) { + if (size < 1) { + throw new IllegalArgumentException("batch_size must be greater than 0"); + } + this.batchSize = size; + return this; + } + + /** + * Get batch size + * @return batch size + */ + public int batchSize() { + return this.batchSize; + } + /** * Note for internal callers (NOT high level rest client), * the global parameter setting is ignored when used with: @@ -453,6 +478,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeCollection(requests, DocWriteRequest::writeDocumentRequest); refreshPolicy.writeTo(out); out.writeTimeValue(timeout); + if (out.getVersion().onOrAfter(Version.V_2_14_0)) { + out.writeInt(batchSize); + } } @Override diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 9796afe28f8a8..6a8ede6cc2d58 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -928,7 +928,8 @@ public boolean isForceExecution() { } }, bulkRequestModifier::markItemAsDropped, - executorName + executorName, + original ); } diff --git a/server/src/main/java/org/opensearch/common/metrics/OperationMetrics.java b/server/src/main/java/org/opensearch/common/metrics/OperationMetrics.java index 97fbbc2ce5cde..71c4a29f0f610 100644 --- 
a/server/src/main/java/org/opensearch/common/metrics/OperationMetrics.java
+++ b/server/src/main/java/org/opensearch/common/metrics/OperationMetrics.java
@@ -37,6 +37,14 @@ public void before() {
         current.incrementAndGet();
     }
 
+    /**
+     * Invoked before the given operation begins on multiple items at the same time.
+     * @param n number of items
+     */
+    public void beforeN(int n) {
+        current.addAndGet(n);
+    }
+
     /**
      * Invoked upon completion (success or failure) of the given operation
      * @param currentTime elapsed time of the operation
@@ -46,6 +54,18 @@ public void after(long currentTime) {
         time.inc(currentTime);
     }
 
+    /**
+     * Invoked upon completion (success or failure) of the given operation for multiple items.
+     * @param n number of items completed
+     * @param currentTime elapsed time of the operation
+     */
+    public void afterN(int n, long currentTime) {
+        current.addAndGet(-n);
+        for (int i = 0; i < n; ++i) {
+            time.inc(currentTime);
+        }
+    }
+
     /**
      * Invoked upon failure of the operation.
      */
@@ -53,6 +73,16 @@ public void failed() {
         failed.inc();
     }
 
+    /**
+     * Invoked upon failure of the operation on multiple items.
+     * @param n number of items that failed
+     */
+    public void failedN(int n) {
+        for (int i = 0; i < n; ++i) {
+            failed.inc();
+        }
+    }
+
     public void add(OperationMetrics other) {
         // Don't try copying over current, since in-flight requests will be linked to the existing metrics instance.
         failed.inc(other.failed.count());
diff --git a/server/src/main/java/org/opensearch/ingest/CompoundProcessor.java b/server/src/main/java/org/opensearch/ingest/CompoundProcessor.java
index a5f4870029e87..64d71691bf818 100644
--- a/server/src/main/java/org/opensearch/ingest/CompoundProcessor.java
+++ b/server/src/main/java/org/opensearch/ingest/CompoundProcessor.java
@@ -39,10 +39,13 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
@@ -150,6 +153,108 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
+    @Override
+    public void batchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) {
+        innerBatchExecute(0, ingestDocumentWrappers, handler);
+    }
+
+    /**
+     * Internal logic to process documents with the current processor.
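+     * The whole batch is handed to the processor at {@code currentProcessor}; once all documents have
+     * called back, the results are partitioned into dropped, failed, and to-continue documents, and the
+     * to-continue group recurses into the processor at {@code currentProcessor + 1}.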
+ * + * @param currentProcessor index of processor to process batched documents + * @param ingestDocumentWrappers batched documents to be processed + * @param handler callback function + */ + void innerBatchExecute( + int currentProcessor, + List ingestDocumentWrappers, + Consumer> handler + ) { + if (currentProcessor == processorsWithMetrics.size()) { + handler.accept(ingestDocumentWrappers); + return; + } + Tuple processorWithMetric = processorsWithMetrics.get(currentProcessor); + final Processor processor = processorWithMetric.v1(); + final OperationMetrics metric = processorWithMetric.v2(); + final long startTimeInNanos = relativeTimeProvider.getAsLong(); + int size = ingestDocumentWrappers.size(); + metric.beforeN(size); + // Use synchronization to ensure batches are processed by processors in sequential order + AtomicInteger counter = new AtomicInteger(size); + List allResults = Collections.synchronizedList(new ArrayList<>()); + Map slotToWrapperMap = createSlotIngestDocumentWrapperMap(ingestDocumentWrappers); + processor.batchExecute(ingestDocumentWrappers, results -> { + if (results.isEmpty()) return; + allResults.addAll(results); + // counter equals to 0 means all documents are processed and called back. + if (counter.addAndGet(-results.size()) == 0) { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); + metric.afterN(allResults.size(), ingestTimeInMillis); + + List documentsDropped = new ArrayList<>(); + List documentsWithException = new ArrayList<>(); + List documentsToContinue = new ArrayList<>(); + int totalFailed = 0; + // iterate all results to categorize them to: to continue, to drop, with exception + for (IngestDocumentWrapper resultDocumentWrapper : allResults) { + IngestDocumentWrapper originalDocumentWrapper = slotToWrapperMap.get(resultDocumentWrapper.getSlot()); + if (resultDocumentWrapper.getException() != null) { + ++totalFailed; + if (ignoreFailure) { + documentsToContinue.add(originalDocumentWrapper); + } else { + IngestProcessorException compoundProcessorException = newCompoundProcessorException( + resultDocumentWrapper.getException(), + processor, + originalDocumentWrapper.getIngestDocument() + ); + documentsWithException.add( + new IngestDocumentWrapper( + resultDocumentWrapper.getSlot(), + originalDocumentWrapper.getIngestDocument(), + compoundProcessorException + ) + ); + } + } else { + if (resultDocumentWrapper.getIngestDocument() == null) { + documentsDropped.add(resultDocumentWrapper); + } else { + documentsToContinue.add(resultDocumentWrapper); + } + } + } + if (totalFailed > 0) { + metric.failedN(totalFailed); + } + if (!documentsDropped.isEmpty()) { + handler.accept(documentsDropped); + } + if (!documentsToContinue.isEmpty()) { + innerBatchExecute(currentProcessor + 1, documentsToContinue, handler); + } + if (!documentsWithException.isEmpty()) { + if (onFailureProcessors.isEmpty()) { + handler.accept(documentsWithException); + } else { + documentsWithException.forEach( + doc -> executeOnFailureAsync( + 0, + doc.getIngestDocument(), + (IngestProcessorException) doc.getException(), + (result, ex) -> { + handler.accept(Collections.singletonList(new IngestDocumentWrapper(doc.getSlot(), result, ex))); + } + ) + ); + } + } + } + assert counter.get() >= 0; + }); + } + void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsumer handler) { if (currentProcessor == processorsWithMetrics.size()) { handler.accept(ingestDocument, null); @@ -266,4 +371,12 @@ static 
IngestProcessorException newCompoundProcessorException(Exception e, Processor processor, IngestDocument document) {
         return exception;
     }
 
+    private Map<Integer, IngestDocumentWrapper> createSlotIngestDocumentWrapperMap(List<IngestDocumentWrapper> ingestDocumentWrappers) {
+        Map<Integer, IngestDocumentWrapper> slotIngestDocumentWrapperMap = new HashMap<>();
+        for (IngestDocumentWrapper ingestDocumentWrapper : ingestDocumentWrappers) {
+            slotIngestDocumentWrapperMap.put(ingestDocumentWrapper.getSlot(), ingestDocumentWrapper);
+        }
+        return slotIngestDocumentWrapperMap;
+    }
+
 }
diff --git a/server/src/main/java/org/opensearch/ingest/IngestDocumentWrapper.java b/server/src/main/java/org/opensearch/ingest/IngestDocumentWrapper.java
new file mode 100644
index 0000000000000..6fb9f245f4996
--- /dev/null
+++ b/server/src/main/java/org/opensearch/ingest/IngestDocumentWrapper.java
@@ -0,0 +1,42 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.ingest;
+
+/**
+ * An IngestDocument wrapper that includes the slot of the IngestDocument in the original IndexRequests.
+ * It also stores any exception that occurred while the document was being ingested.
+ */
+public final class IngestDocumentWrapper {
+    private final int slot;
+    private IngestDocument ingestDocument;
+    private Exception exception;
+
+    public IngestDocumentWrapper(int slot, IngestDocument ingestDocument, Exception ex) {
+        this.slot = slot;
+        this.ingestDocument = ingestDocument;
+        this.exception = ex;
+    }
+
+    public int getSlot() {
+        return this.slot;
+    }
+
+    public IngestDocument getIngestDocument() {
+        return this.ingestDocument;
+    }
+
+    public Exception getException() {
+        return this.exception;
+    }
+
+    public void update(IngestDocument result, Exception ex) {
+        this.ingestDocument = result;
+        this.exception = ex;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java
index 2d4439e86461b..ab8e823199447 100644
--- a/server/src/main/java/org/opensearch/ingest/IngestService.java
+++ b/server/src/main/java/org/opensearch/ingest/IngestService.java
@@ -39,6 +39,7 @@
 import org.opensearch.OpenSearchParseException;
 import org.opensearch.ResourceNotFoundException;
 import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.bulk.BulkRequest;
 import org.opensearch.action.bulk.TransportBulkAction;
 import org.opensearch.action.index.IndexRequest;
 import org.opensearch.action.ingest.DeletePipelineRequest;
@@ -93,6 +94,7 @@
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.IntConsumer;
+import java.util.stream.Collectors;
 
 /**
  * Holder class for several ingest related services.
@@ -511,9 +513,9 @@ public void executeBulkRequest( BiConsumer onFailure, BiConsumer onCompletion, IntConsumer onDropped, - String executorName + String executorName, + BulkRequest originalBulkRequest ) { - threadPool.executor(executorName).execute(new AbstractRunnable() { @Override @@ -523,6 +525,12 @@ public void onFailure(Exception e) { @Override protected void doRun() { + int batchSize = originalBulkRequest.batchSize(); + if (shouldExecuteBulkRequestInBatch(originalBulkRequest.requests().size(), batchSize)) { + runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest); + return; + } + final Thread originalThread = Thread.currentThread(); final AtomicInteger counter = new AtomicInteger(numberOfActionRequests); int i = 0; @@ -536,7 +544,6 @@ protected void doRun() { i++; continue; } - final String pipelineId = indexRequest.getPipeline(); indexRequest.setPipeline(NOOP_PIPELINE_NAME); final String finalPipelineId = indexRequest.getFinalPipeline(); @@ -571,13 +578,286 @@ protected void doRun() { onCompletion, originalThread ); - i++; } } }); } + private void runBulkRequestInBatch( + int numberOfActionRequests, + Iterable> actionRequests, + BiConsumer onFailure, + BiConsumer onCompletion, + IntConsumer onDropped, + BulkRequest originalBulkRequest + ) { + + final Thread originalThread = Thread.currentThread(); + final AtomicInteger counter = new AtomicInteger(numberOfActionRequests); + int i = 0; + List indexRequestWrappers = new ArrayList<>(); + for (DocWriteRequest actionRequest : actionRequests) { + IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); + if (indexRequest == null) { + if (counter.decrementAndGet() == 0) { + onCompletion.accept(originalThread, null); + } + assert counter.get() >= 0; + i++; + continue; + } + + final String pipelineId = indexRequest.getPipeline(); + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + final String finalPipelineId = indexRequest.getFinalPipeline(); + indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); + boolean hasFinalPipeline = true; + final List pipelines; + if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false + && IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { + pipelines = Arrays.asList(pipelineId, finalPipelineId); + } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) { + pipelines = Collections.singletonList(pipelineId); + hasFinalPipeline = false; + } else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { + pipelines = Collections.singletonList(finalPipelineId); + } else { + if (counter.decrementAndGet() == 0) { + onCompletion.accept(originalThread, null); + } + assert counter.get() >= 0; + i++; + continue; + } + + indexRequestWrappers.add(new IndexRequestWrapper(i, indexRequest, pipelines, hasFinalPipeline)); + i++; + } + + int batchSize = originalBulkRequest.batchSize(); + List> batches = prepareBatches(batchSize, indexRequestWrappers); + logger.debug("batchSize: {}, batches: {}", batchSize, batches.size()); + + for (List batch : batches) { + executePipelinesInBatchRequests( + batch.stream().map(IndexRequestWrapper::getSlot).collect(Collectors.toList()), + batch.get(0).getPipelines().iterator(), + batch.get(0).isHasFinalPipeline(), + batch.stream().map(IndexRequestWrapper::getIndexRequest).collect(Collectors.toList()), + onDropped, + onFailure, + counter, + onCompletion, + originalThread + ); + } + } + + private boolean shouldExecuteBulkRequestInBatch(int documentSize, int 
batchSize) { + return documentSize > 1 && batchSize > 1; + } + + /** + * IndexRequests are grouped by unique (index + pipeline_ids) before batching. + * Only IndexRequests in the same group could be batched. It's to ensure batched documents always + * flow through the same pipeline together. + * + * An IndexRequest could be preprocessed by at most two pipelines: default_pipeline and final_pipeline. + * A final_pipeline is configured on index level. The default_pipeline for a IndexRequest in a _bulk API + * could come from three places: + * 1. bound with index + * 2. a request parameter of _bulk API + * 3. a parameter of an IndexRequest. + */ + static List> prepareBatches(int batchSize, List indexRequestWrappers) { + final Map> indexRequestsPerIndexAndPipelines = new HashMap<>(); + for (IndexRequestWrapper indexRequestWrapper : indexRequestWrappers) { + // IndexRequests are grouped by their index + pipeline ids + List indexAndPipelineIds = new ArrayList<>(); + String index = indexRequestWrapper.getIndexRequest().index(); + List pipelines = indexRequestWrapper.getPipelines(); + indexAndPipelineIds.add(index); + indexAndPipelineIds.addAll(pipelines); + int hashCode = indexAndPipelineIds.hashCode(); + indexRequestsPerIndexAndPipelines.putIfAbsent(hashCode, new ArrayList<>()); + indexRequestsPerIndexAndPipelines.get(hashCode).add(indexRequestWrapper); + } + List> batchedIndexRequests = new ArrayList<>(); + for (Map.Entry> indexRequestsPerKey : indexRequestsPerIndexAndPipelines.entrySet()) { + for (int i = 0; i < indexRequestsPerKey.getValue().size(); i += batchSize) { + batchedIndexRequests.add( + new ArrayList<>( + indexRequestsPerKey.getValue().subList(i, i + Math.min(batchSize, indexRequestsPerKey.getValue().size() - i)) + ) + ); + } + } + return batchedIndexRequests; + } + + /* visible for testing */ + static final class IndexRequestWrapper { + private final int slot; + private final IndexRequest indexRequest; + private final List pipelines; + private final boolean hasFinalPipeline; + + IndexRequestWrapper(int slot, IndexRequest indexRequest, List pipelines, boolean hasFinalPipeline) { + this.slot = slot; + this.indexRequest = indexRequest; + this.pipelines = pipelines; + this.hasFinalPipeline = hasFinalPipeline; + } + + public int getSlot() { + return slot; + } + + public IndexRequest getIndexRequest() { + return indexRequest; + } + + public List getPipelines() { + return pipelines; + } + + public boolean isHasFinalPipeline() { + return hasFinalPipeline; + } + } + + private void executePipelinesInBatchRequests( + final List slots, + final Iterator pipelineIterator, + final boolean hasFinalPipeline, + final List indexRequests, + final IntConsumer onDropped, + final BiConsumer onFailure, + final AtomicInteger counter, + final BiConsumer onCompletion, + final Thread originalThread + ) { + if (indexRequests.size() == 1) { + executePipelines( + slots.get(0), + pipelineIterator, + hasFinalPipeline, + indexRequests.get(0), + onDropped, + onFailure, + counter, + onCompletion, + originalThread + ); + return; + } + while (pipelineIterator.hasNext()) { + final String pipelineId = pipelineIterator.next(); + try { + PipelineHolder holder = pipelines.get(pipelineId); + if (holder == null) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + } + Pipeline pipeline = holder.pipeline; + String originalIndex = indexRequests.get(0).indices()[0]; + Map slotIndexRequestMap = createSlotIndexRequestMap(slots, indexRequests); + innerBatchExecute(slots, indexRequests, 
pipeline, onDropped, results -> { + for (int i = 0; i < results.size(); ++i) { + if (results.get(i).getException() != null) { + IndexRequest indexRequest = slotIndexRequestMap.get(results.get(i).getSlot()); + logger.debug( + () -> new ParameterizedMessage( + "failed to execute pipeline [{}] for document [{}/{}]", + pipelineId, + indexRequest.index(), + indexRequest.id() + ), + results.get(i).getException() + ); + onFailure.accept(slots.get(i), results.get(i).getException()); + } + } + + Iterator newPipelineIterator = pipelineIterator; + boolean newHasFinalPipeline = hasFinalPipeline; + // indexRequests are grouped for the same index and same pipelines + String newIndex = indexRequests.get(0).indices()[0]; + + // handle index change case + if (Objects.equals(originalIndex, newIndex) == false) { + if (hasFinalPipeline && pipelineIterator.hasNext() == false) { + totalMetrics.failed(); + for (int slot : slots) { + onFailure.accept( + slot, + new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index") + ); + } + } else { + // Drain old it so it's not looped over + pipelineIterator.forEachRemaining($ -> {}); + for (IndexRequest indexRequest : indexRequests) { + indexRequest.isPipelineResolved(false); + resolvePipelines(null, indexRequest, state.metadata()); + if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) { + newPipelineIterator = Collections.singleton(indexRequest.getFinalPipeline()).iterator(); + newHasFinalPipeline = true; + } else { + newPipelineIterator = Collections.emptyIterator(); + } + } + } + } + + if (newPipelineIterator.hasNext()) { + executePipelinesInBatchRequests( + slots, + newPipelineIterator, + newHasFinalPipeline, + indexRequests, + onDropped, + onFailure, + counter, + onCompletion, + originalThread + ); + } else { + if (counter.addAndGet(-results.size()) == 0) { + onCompletion.accept(originalThread, null); + } + assert counter.get() >= 0; + } + }); + } catch (Exception e) { + StringBuilder documentLogBuilder = new StringBuilder(); + for (int i = 0; i < indexRequests.size(); ++i) { + IndexRequest indexRequest = indexRequests.get(i); + documentLogBuilder.append(indexRequest.index()); + documentLogBuilder.append("/"); + documentLogBuilder.append(indexRequest.id()); + if (i < indexRequests.size() - 1) { + documentLogBuilder.append(", "); + } + onFailure.accept(slots.get(i), e); + } + logger.debug( + () -> new ParameterizedMessage( + "failed to execute pipeline [{}] for documents [{}]", + pipelineId, + documentLogBuilder.toString() + ), + e + ); + if (counter.addAndGet(-indexRequests.size()) == 0) { + onCompletion.accept(originalThread, null); + } + assert counter.get() >= 0; + break; + } + } + } + private void executePipelines( final int slot, final Iterator it, @@ -761,28 +1041,73 @@ private void innerExecute( itemDroppedHandler.accept(slot); handler.accept(null); } else { - Map metadataMap = ingestDocument.extractMetadata(); - // it's fine to set all metadata fields all the time, as ingest document holds their starting values - // before ingestion, which might also get modified during ingestion. 
- indexRequest.index((String) metadataMap.get(IngestDocument.Metadata.INDEX)); - indexRequest.id((String) metadataMap.get(IngestDocument.Metadata.ID)); - indexRequest.routing((String) metadataMap.get(IngestDocument.Metadata.ROUTING)); - indexRequest.version(((Number) metadataMap.get(IngestDocument.Metadata.VERSION)).longValue()); - if (metadataMap.get(IngestDocument.Metadata.VERSION_TYPE) != null) { - indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.Metadata.VERSION_TYPE))); - } - if (metadataMap.get(IngestDocument.Metadata.IF_SEQ_NO) != null) { - indexRequest.setIfSeqNo(((Number) metadataMap.get(IngestDocument.Metadata.IF_SEQ_NO)).longValue()); - } - if (metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM) != null) { - indexRequest.setIfPrimaryTerm(((Number) metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM)).longValue()); - } - indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType()); + updateIndexRequestWithIngestDocument(indexRequest, ingestDocument); handler.accept(null); } }); } + private void innerBatchExecute( + List slots, + List indexRequests, + Pipeline pipeline, + IntConsumer itemDroppedHandler, + Consumer> handler + ) { + if (pipeline.getProcessors().isEmpty()) { + handler.accept(null); + return; + } + + int size = indexRequests.size(); + long startTimeInNanos = System.nanoTime(); + // the pipeline specific stat holder may not exist and that is fine: + // (e.g. the pipeline may have been removed while we're ingesting a document + totalMetrics.beforeN(size); + List ingestDocumentWrappers = new ArrayList<>(); + Map slotToindexRequestMap = new HashMap<>(); + for (int i = 0; i < slots.size(); ++i) { + slotToindexRequestMap.put(slots.get(i), indexRequests.get(i)); + ingestDocumentWrappers.add(toIngestDocumentWrapper(slots.get(i), indexRequests.get(i))); + } + AtomicInteger counter = new AtomicInteger(size); + List allResults = Collections.synchronizedList(new ArrayList<>()); + pipeline.batchExecute(ingestDocumentWrappers, results -> { + if (results.isEmpty()) return; + allResults.addAll(results); + if (counter.addAndGet(-results.size()) == 0) { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); + totalMetrics.afterN(size, ingestTimeInMillis); + List succeeded = new ArrayList<>(); + List dropped = new ArrayList<>(); + List exceptions = new ArrayList<>(); + for (IngestDocumentWrapper result : allResults) { + if (result.getException() != null) { + exceptions.add(result); + } else if (result.getIngestDocument() == null) { + dropped.add(result); + } else { + succeeded.add(result); + } + } + if (!exceptions.isEmpty()) { + totalMetrics.failedN(exceptions.size()); + } else if (!dropped.isEmpty()) { + dropped.forEach(t -> itemDroppedHandler.accept(t.getSlot())); + } else { + for (IngestDocumentWrapper ingestDocumentWrapper : succeeded) { + updateIndexRequestWithIngestDocument( + slotToindexRequestMap.get(ingestDocumentWrapper.getSlot()), + ingestDocumentWrapper.getIngestDocument() + ); + } + } + handler.accept(allResults); + } + assert counter.get() >= 0; + }); + } + @Override public void applyClusterState(final ClusterChangedEvent event) { state = event.state(); @@ -969,4 +1294,46 @@ static class PipelineHolder { } } + public static void updateIndexRequestWithIngestDocument(IndexRequest indexRequest, IngestDocument ingestDocument) { + Map metadataMap = ingestDocument.extractMetadata(); + // it's fine to set all metadata fields all the time, as ingest document 
holds their starting values + // before ingestion, which might also get modified during ingestion. + indexRequest.index((String) metadataMap.get(IngestDocument.Metadata.INDEX)); + indexRequest.id((String) metadataMap.get(IngestDocument.Metadata.ID)); + indexRequest.routing((String) metadataMap.get(IngestDocument.Metadata.ROUTING)); + indexRequest.version(((Number) metadataMap.get(IngestDocument.Metadata.VERSION)).longValue()); + if (metadataMap.get(IngestDocument.Metadata.VERSION_TYPE) != null) { + indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.Metadata.VERSION_TYPE))); + } + if (metadataMap.get(IngestDocument.Metadata.IF_SEQ_NO) != null) { + indexRequest.setIfSeqNo(((Number) metadataMap.get(IngestDocument.Metadata.IF_SEQ_NO)).longValue()); + } + if (metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM) != null) { + indexRequest.setIfPrimaryTerm(((Number) metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM)).longValue()); + } + indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType()); + } + + static IngestDocument toIngestDocument(IndexRequest indexRequest) { + return new IngestDocument( + indexRequest.index(), + indexRequest.id(), + indexRequest.routing(), + indexRequest.version(), + indexRequest.versionType(), + indexRequest.sourceAsMap() + ); + } + + private static IngestDocumentWrapper toIngestDocumentWrapper(int slot, IndexRequest indexRequest) { + return new IngestDocumentWrapper(slot, toIngestDocument(indexRequest), null); + } + + private static Map createSlotIndexRequestMap(List slots, List indexRequests) { + Map slotIndexRequestMap = new HashMap<>(); + for (int i = 0; i < slots.size(); ++i) { + slotIndexRequestMap.put(slots.get(i), indexRequests.get(i)); + } + return slotIndexRequestMap; + } } diff --git a/server/src/main/java/org/opensearch/ingest/Pipeline.java b/server/src/main/java/org/opensearch/ingest/Pipeline.java index 2541cfbf4af77..708416cfca3b7 100644 --- a/server/src/main/java/org/opensearch/ingest/Pipeline.java +++ b/server/src/main/java/org/opensearch/ingest/Pipeline.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.LongSupplier; /** @@ -201,4 +202,28 @@ public List flattenAllProcessors() { public OperationMetrics getMetrics() { return metrics; } + + /** + * Modifies the data of batched multiple documents to be indexed based on the processor this pipeline holds + *
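+ * Delegates to the compound processor's {@code batchExecute} and updates this pipeline's operation
+ * metrics for the whole batch when the results are called back.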

+ * If {@code null} is returned then this document will be dropped and not indexed, otherwise + * this document will be kept and indexed. Document and the exception happened during processing are kept in + * IngestDocumentWrapper and callback to upper level. + * + * @param ingestDocumentWrappers a list of wrapped IngestDocument to ingest. + * @param handler callback with IngestDocument result and exception wrapped in IngestDocumentWrapper. + */ + public void batchExecute(List ingestDocumentWrappers, Consumer> handler) { + final long startTimeInNanos = relativeTimeProvider.getAsLong(); + int size = ingestDocumentWrappers.size(); + metrics.beforeN(size); + compoundProcessor.batchExecute(ingestDocumentWrappers, results -> { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); + metrics.afterN(results.size(), ingestTimeInMillis); + + int failedCount = (int) results.stream().filter(t -> t.getException() != null).count(); + metrics.failedN(failedCount); + handler.accept(results); + }); + } } diff --git a/server/src/main/java/org/opensearch/ingest/Processor.java b/server/src/main/java/org/opensearch/ingest/Processor.java index ecae1c139ea5e..9af1104502047 100644 --- a/server/src/main/java/org/opensearch/ingest/Processor.java +++ b/server/src/main/java/org/opensearch/ingest/Processor.java @@ -33,6 +33,7 @@ package org.opensearch.ingest; import org.opensearch.client.Client; +import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.env.Environment; import org.opensearch.index.analysis.AnalysisRegistry; @@ -40,7 +41,10 @@ import org.opensearch.script.ScriptService; import org.opensearch.threadpool.Scheduler; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -81,6 +85,42 @@ default void execute(IngestDocument ingestDocument, BiConsumer ingestDocumentWrappers, Consumer> handler) { + if (ingestDocumentWrappers.isEmpty()) { + handler.accept(Collections.emptyList()); + return; + } + int size = ingestDocumentWrappers.size(); + AtomicInteger counter = new AtomicInteger(size); + AtomicArray results = new AtomicArray<>(size); + for (int i = 0; i < size; ++i) { + innerExecute(i, ingestDocumentWrappers.get(i), results, counter, handler); + } + } + + private void innerExecute( + int slot, + IngestDocumentWrapper ingestDocumentWrapper, + AtomicArray results, + AtomicInteger counter, + Consumer> handler + ) { + execute(ingestDocumentWrapper.getIngestDocument(), (doc, ex) -> { + results.set(slot, new IngestDocumentWrapper(ingestDocumentWrapper.getSlot(), doc, ex)); + if (counter.decrementAndGet() == 0) { + handler.accept(results.asList()); + } + }); + } + /** * Gets the type of a processor */ diff --git a/server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java index b046146707885..0bc4234c9b8b8 100644 --- a/server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java @@ -97,6 +97,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null); 
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.setRefreshPolicy(request.param("refresh")); + bulkRequest.batchSize(request.paramAsInt("batch_size", 1)); bulkRequest.add( request.requiredContent(), defaultIndex, diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java index 141c630b94020..da9156ccdb71a 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java @@ -341,7 +341,8 @@ public void testIngestLocal() throws Exception { failureHandler.capture(), completionHandler.capture(), any(), - eq(Names.WRITE) + eq(Names.WRITE), + eq(bulkRequest) ); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -378,7 +379,8 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { failureHandler.capture(), completionHandler.capture(), any(), - eq(Names.WRITE) + eq(Names.WRITE), + any() ); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -424,7 +426,8 @@ public void testIngestSystemLocal() throws Exception { failureHandler.capture(), completionHandler.capture(), any(), - eq(Names.SYSTEM_WRITE) + eq(Names.SYSTEM_WRITE), + eq(bulkRequest) ); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -455,7 +458,7 @@ public void testIngestForward() throws Exception { action.execute(null, bulkRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -495,7 +498,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { singleItemBulkWriteAction.execute(null, indexRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -581,7 +584,8 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa failureHandler.capture(), completionHandler.capture(), any(), - eq(Names.WRITE) + eq(Names.WRITE), + eq(bulkRequest) ); assertEquals(indexRequest1.getPipeline(), "default_pipeline"); assertEquals(indexRequest2.getPipeline(), "default_pipeline"); @@ -624,7 +628,8 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception { failureHandler.capture(), completionHandler.capture(), any(), - eq(Names.WRITE) + eq(Names.WRITE), + any() ); completionHandler.getValue().accept(null, exception); assertFalse(action.indexCreated); // still no index yet, the ingest node failed. 
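For orientation, the verifications above all capture the one new trailing argument: the original BulkRequest itself, which is what carries batch_size into IngestService. A minimal caller sketch of the widened signature, with hypothetical handler names, illustrative only and not part of the patch:

    // The bulk request is now threaded through so that IngestService.executeBulkRequest
    // can read batchSize() and group documents before pipeline execution.
    ingestService.executeBulkRequest(
        bulkRequest.numberOfActions(),           // number of action requests
        bulkRequest.requests(),                  // the DocWriteRequests to preprocess
        (slot, e) -> markItemAsFailed(slot, e),  // onFailure (hypothetical handler)
        (thread, e) -> onIngestDone(thread, e),  // onCompletion (hypothetical handler)
        slot -> markItemAsDropped(slot),         // onDropped
        Names.WRITE,                             // executor name
        bulkRequest                              // new: carries batch_size
    );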
@@ -711,7 +716,8 @@ public void testFindDefaultPipelineFromTemplateMatch() { failureHandler.capture(), completionHandler.capture(), any(), - eq(Names.WRITE) + eq(Names.WRITE), + any() ); } @@ -750,7 +756,8 @@ public void testFindDefaultPipelineFromV2TemplateMatch() { failureHandler.capture(), completionHandler.capture(), any(), - eq(Names.WRITE) + eq(Names.WRITE), + any() ); } @@ -775,7 +782,8 @@ private void validateDefaultPipeline(IndexRequest indexRequest) { failureHandler.capture(), completionHandler.capture(), any(), - eq(Names.WRITE) + eq(Names.WRITE), + any() ); assertEquals(indexRequest.getPipeline(), "default_pipeline"); completionHandler.getValue().accept(null, exception); diff --git a/server/src/test/java/org/opensearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/opensearch/ingest/CompoundProcessorTests.java index 76301acac0c19..aad6063bd3f4d 100644 --- a/server/src/test/java/org/opensearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/opensearch/ingest/CompoundProcessorTests.java @@ -37,16 +37,23 @@ import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongSupplier; import static java.util.Collections.singletonList; +import static org.opensearch.ingest.IngestDocumentPreparer.SHOULD_FAIL_KEY; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -429,6 +436,211 @@ public String getType() { assertThat(ingestProcessorException.getHeader("pipeline_origin"), equalTo(Arrays.asList("2", "1"))); } + public void testBatchExecute_happyCase() { + List wrapperList = Arrays.asList( + IngestDocumentPreparer.createIngestDocumentWrapper(1), + IngestDocumentPreparer.createIngestDocumentWrapper(2), + IngestDocumentPreparer.createIngestDocumentWrapper(3) + ); + TestProcessor firstProcessor = new TestProcessor(doc -> {}); + TestProcessor secondProcessor = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + Arrays.asList(firstProcessor, secondProcessor), + null, + relativeTimeProvider + ); + + compoundProcessor.batchExecute(wrapperList, results -> { + assertEquals(firstProcessor.getInvokedCounter(), wrapperList.size()); + assertEquals(secondProcessor.getInvokedCounter(), wrapperList.size()); + assertEquals(results.size(), wrapperList.size()); + OperationStats stats = compoundProcessor.getProcessorsWithMetrics().get(0).v2().createStats(); + assertEquals(0, stats.getCurrent()); + assertEquals(3, stats.getCount()); + for (int i = 0; i < wrapperList.size(); ++i) { + assertEquals(wrapperList.get(i).getSlot(), results.get(i).getSlot()); + assertEquals(wrapperList.get(i).getIngestDocument(), results.get(i).getIngestDocument()); + assertEquals(wrapperList.get(i).getException(), results.get(i).getException()); + } + }); + } + + public void testBatchExecute_documentToDrop() { + List wrapperList = Arrays.asList( + IngestDocumentPreparer.createIngestDocumentWrapper(1), + IngestDocumentPreparer.createIngestDocumentWrapper(2, true), + 
IngestDocumentPreparer.createIngestDocumentWrapper(3) + ); + TestProcessor firstProcessor = new TestProcessor("", "", "", doc -> { + if (doc.hasField(SHOULD_FAIL_KEY) && doc.getFieldValue(SHOULD_FAIL_KEY, Boolean.class)) { + return null; + } + return doc; + }); + TestProcessor secondProcessor = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + Arrays.asList(firstProcessor, secondProcessor), + null, + relativeTimeProvider + ); + + AtomicInteger callCounter = new AtomicInteger(); + List totalResults = Collections.synchronizedList(new ArrayList<>()); + compoundProcessor.batchExecute(wrapperList, results -> { + totalResults.addAll(results); + if (callCounter.addAndGet(results.size()) == 3) { + assertEquals(firstProcessor.getInvokedCounter(), wrapperList.size()); + assertEquals(secondProcessor.getInvokedCounter(), wrapperList.size() - 1); + assertEquals(totalResults.size(), wrapperList.size()); + OperationStats stats = compoundProcessor.getProcessorsWithMetrics().get(0).v2().createStats(); + assertEquals(0, stats.getCurrent()); + assertEquals(3, stats.getCount()); + totalResults.sort(Comparator.comparingInt(IngestDocumentWrapper::getSlot)); + for (int i = 0; i < wrapperList.size(); ++i) { + assertEquals(wrapperList.get(i).getSlot(), totalResults.get(i).getSlot()); + if (2 == wrapperList.get(i).getSlot()) { + assertNull(totalResults.get(i).getIngestDocument()); + } else { + assertEquals(wrapperList.get(i).getIngestDocument(), totalResults.get(i).getIngestDocument()); + } + assertEquals(wrapperList.get(i).getException(), totalResults.get(i).getException()); + } + } + }); + } + + public void testBatchExecute_ignoreFailure() { + List wrapperList = Arrays.asList( + IngestDocumentPreparer.createIngestDocumentWrapper(1), + IngestDocumentPreparer.createIngestDocumentWrapper(2, true), + IngestDocumentPreparer.createIngestDocumentWrapper(3, true) + ); + TestProcessor firstProcessor = new TestProcessor(doc -> { + if (doc.hasField(SHOULD_FAIL_KEY) && doc.getFieldValue(SHOULD_FAIL_KEY, Boolean.class)) { + throw new RuntimeException("fail"); + } + }); + TestProcessor secondProcessor = new TestProcessor(doc -> {}); + TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", null, doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + CompoundProcessor compoundProcessor = new CompoundProcessor( + true, + Arrays.asList(firstProcessor, secondProcessor), + singletonList(onFailureProcessor), + relativeTimeProvider + ); + + compoundProcessor.batchExecute(wrapperList, results -> { + assertEquals(firstProcessor.getInvokedCounter(), wrapperList.size()); + assertEquals(secondProcessor.getInvokedCounter(), wrapperList.size()); + assertEquals(0, onFailureProcessor.getInvokedCounter()); + assertEquals(results.size(), wrapperList.size()); + OperationStats stats = compoundProcessor.getProcessorsWithMetrics().get(0).v2().createStats(); + assertEquals(0, stats.getCurrent()); + assertEquals(3, stats.getCount()); + for (int i = 0; i < wrapperList.size(); ++i) { + assertEquals(wrapperList.get(i).getSlot(), results.get(i).getSlot()); + assertEquals(wrapperList.get(i).getIngestDocument(), results.get(i).getIngestDocument()); + assertEquals(wrapperList.get(i).getException(), results.get(i).getException()); + } + }); + } + + public void testBatchExecute_exception_no_onFailureProcessor() { + Set failureSlot = new HashSet<>(Arrays.asList(2, 3)); + List wrapperList = Arrays.asList( 
+ IngestDocumentPreparer.createIngestDocumentWrapper(1), + IngestDocumentPreparer.createIngestDocumentWrapper(2, true), + IngestDocumentPreparer.createIngestDocumentWrapper(3, true) + ); + TestProcessor firstProcessor = new TestProcessor(doc -> { + if (doc.hasField(SHOULD_FAIL_KEY) && doc.getFieldValue(SHOULD_FAIL_KEY, Boolean.class)) { + throw new RuntimeException("fail"); + } + }); + TestProcessor secondProcessor = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + Arrays.asList(firstProcessor, secondProcessor), + Collections.emptyList(), + relativeTimeProvider + ); + + AtomicInteger callCounter = new AtomicInteger(); + List totalResults = Collections.synchronizedList(new ArrayList<>()); + compoundProcessor.batchExecute(wrapperList, results -> { + totalResults.addAll(results); + if (callCounter.incrementAndGet() == 3) { + assertEquals(wrapperList.size(), firstProcessor.getInvokedCounter()); + assertEquals(1, secondProcessor.getInvokedCounter()); + assertEquals(totalResults.size(), wrapperList.size()); + OperationStats stats = compoundProcessor.getProcessorsWithMetrics().get(0).v2().createStats(); + assertEquals(0, stats.getCurrent()); + assertEquals(3, stats.getCount()); + assertEquals(2, stats.getFailedCount()); + totalResults.sort(Comparator.comparingInt(IngestDocumentWrapper::getSlot)); + for (int i = 0; i < wrapperList.size(); ++i) { + assertEquals(wrapperList.get(i).getSlot(), totalResults.get(i).getSlot()); + if (failureSlot.contains(wrapperList.get(i).getSlot())) { + assertNotNull(totalResults.get(i).getException()); + } else { + assertEquals(wrapperList.get(i).getIngestDocument(), totalResults.get(i).getIngestDocument()); + assertEquals(wrapperList.get(i).getException(), totalResults.get(i).getException()); + } + } + } + }); + } + + public void testBatchExecute_exception_with_onFailureProcessor() { + List wrapperList = Arrays.asList( + IngestDocumentPreparer.createIngestDocumentWrapper(1), + IngestDocumentPreparer.createIngestDocumentWrapper(2, true), + IngestDocumentPreparer.createIngestDocumentWrapper(3, true) + ); + TestProcessor firstProcessor = new TestProcessor(doc -> { + if (doc.hasField(SHOULD_FAIL_KEY) && doc.getFieldValue(SHOULD_FAIL_KEY, Boolean.class)) { + throw new RuntimeException("fail"); + } + }); + TestProcessor secondProcessor = new TestProcessor(doc -> {}); + TestProcessor onFailureProcessor = new TestProcessor("id2", "on_failure", null, doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + Arrays.asList(firstProcessor, secondProcessor), + singletonList(onFailureProcessor), + relativeTimeProvider + ); + + AtomicInteger callCounter = new AtomicInteger(); + List totalResults = Collections.synchronizedList(new ArrayList<>()); + compoundProcessor.batchExecute(wrapperList, results -> { + totalResults.addAll(results); + if (callCounter.incrementAndGet() == 3) { + assertEquals(wrapperList.size(), firstProcessor.getInvokedCounter()); + assertEquals(1, secondProcessor.getInvokedCounter()); + assertEquals(2, onFailureProcessor.getInvokedCounter()); + assertEquals(totalResults.size(), wrapperList.size()); + OperationStats stats = compoundProcessor.getProcessorsWithMetrics().get(0).v2().createStats(); + assertEquals(0, stats.getCurrent()); + assertEquals(3, stats.getCount()); + assertEquals(2, stats.getFailedCount()); + 
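+            // Callbacks arrive separately for the failed and succeeded splits, so sort by slot before per-document assertions.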
totalResults.sort(Comparator.comparingInt(IngestDocumentWrapper::getSlot)); + for (int i = 0; i < wrapperList.size(); ++i) { + assertEquals(wrapperList.get(i).getSlot(), totalResults.get(i).getSlot()); + assertEquals(wrapperList.get(i).getIngestDocument(), totalResults.get(i).getIngestDocument()); + assertNull(totalResults.get(i).getException()); + } + } + }); + } + private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) { assertStats(0, compoundProcessor, 0L, count, failed, time); } diff --git a/server/src/test/java/org/opensearch/ingest/IngestDocumentPreparer.java b/server/src/test/java/org/opensearch/ingest/IngestDocumentPreparer.java new file mode 100644 index 0000000000000..a02595df5589d --- /dev/null +++ b/server/src/test/java/org/opensearch/ingest/IngestDocumentPreparer.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest; + +import java.util.HashMap; +import java.util.Map; + +public class IngestDocumentPreparer { + public static final String SHOULD_FAIL_KEY = "shouldFail"; + + public static IngestDocument createIngestDocument(boolean shouldFail) { + Map source = new HashMap<>(); + if (shouldFail) { + source.put(SHOULD_FAIL_KEY, true); + } + return new IngestDocument(source, new HashMap<>()); + } + + public static IngestDocumentWrapper createIngestDocumentWrapper(int slot) { + return createIngestDocumentWrapper(slot, false); + } + + public static IngestDocumentWrapper createIngestDocumentWrapper(int slot, boolean shouldFail) { + return new IngestDocumentWrapper(slot, createIngestDocument(shouldFail), null); + } +} diff --git a/server/src/test/java/org/opensearch/ingest/IngestDocumentWrapperTests.java b/server/src/test/java/org/opensearch/ingest/IngestDocumentWrapperTests.java new file mode 100644 index 0000000000000..9d09cd80abd05 --- /dev/null +++ b/server/src/test/java/org/opensearch/ingest/IngestDocumentWrapperTests.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.ingest; + +import org.opensearch.index.VersionType; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; + +public class IngestDocumentWrapperTests extends OpenSearchTestCase { + + private IngestDocument ingestDocument; + + private static final String INDEX = "index"; + private static final String ID = "id"; + private static final String ROUTING = "routing"; + private static final Long VERSION = 1L; + private static final VersionType VERSION_TYPE = VersionType.INTERNAL; + private static final String DOCUMENT_KEY = "foo"; + private static final String DOCUMENT_VALUE = "bar"; + private static final int SLOT = 12; + + @Before + public void setup() throws Exception { + super.setUp(); + Map document = new HashMap<>(); + document.put(DOCUMENT_KEY, DOCUMENT_VALUE); + ingestDocument = new IngestDocument(INDEX, ID, ROUTING, VERSION, VERSION_TYPE, document); + } + + public void testIngestDocumentWrapper() { + Exception ex = new RuntimeException("runtime exception"); + IngestDocumentWrapper wrapper = new IngestDocumentWrapper(SLOT, ingestDocument, ex); + assertEquals(wrapper.getSlot(), SLOT); + assertEquals(wrapper.getException(), ex); + assertEquals(wrapper.getIngestDocument(), ingestDocument); + } +} diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 2edfe87387c92..6d216370bae9a 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -116,6 +116,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -132,6 +133,7 @@ public Map getProcessors(Processor.Parameters paramet }; private ThreadPool threadPool; + private BulkRequest mockBulkRequest; @Before public void setup() { @@ -139,6 +141,8 @@ public void setup() { ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); when(threadPool.generic()).thenReturn(executorService); when(threadPool.executor(anyString())).thenReturn(executorService); + mockBulkRequest = mock(BulkRequest.class); + lenient().when(mockBulkRequest.batchSize()).thenReturn(1); } public void testIngestPlugin() { @@ -210,7 +214,8 @@ public void testExecuteIndexPipelineDoesNotExist() { failureHandler, completionHandler, indexReq -> {}, - Names.WRITE + Names.WRITE, + new BulkRequest() ); assertTrue(failure.get()); @@ -761,7 +766,8 @@ public String getType() { failureHandler, completionHandler, indexReq -> {}, - Names.WRITE + Names.WRITE, + bulkRequest ); assertTrue(failure.get()); @@ -807,7 +813,8 @@ public void testExecuteBulkPipelineDoesNotExist() { failureHandler, completionHandler, indexReq -> {}, - Names.WRITE + Names.WRITE, + bulkRequest ); verify(failureHandler, times(1)).accept( argThat((Integer item) -> item == 2), @@ -843,7 +850,8 @@ public void testExecuteSuccess() { failureHandler, completionHandler, indexReq -> {}, - Names.WRITE + Names.WRITE, + mockBulkRequest ); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); @@ -874,7 +882,8 @@ public void testExecuteEmptyPipeline() throws Exception { failureHandler, completionHandler, 
+        mockBulkRequest = mock(BulkRequest.class);
+        lenient().when(mockBulkRequest.batchSize()).thenReturn(1);
     }
 
     public void testIngestPlugin() {
@@ -210,7 +214,8 @@ public void testExecuteIndexPipelineDoesNotExist() {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            new BulkRequest()
         );
 
         assertTrue(failure.get());
@@ -761,7 +766,8 @@ public String getType() {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            bulkRequest
         );
 
         assertTrue(failure.get());
@@ -807,7 +813,8 @@ public void testExecuteBulkPipelineDoesNotExist() {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            bulkRequest
         );
         verify(failureHandler, times(1)).accept(
             argThat((Integer item) -> item == 2),
@@ -843,7 +850,8 @@ public void testExecuteSuccess() {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            mockBulkRequest
         );
         verify(failureHandler, never()).accept(any(), any());
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@@ -874,7 +882,8 @@ public void testExecuteEmptyPipeline() throws Exception {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            mockBulkRequest
         );
         verify(failureHandler, never()).accept(any(), any());
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@@ -933,7 +942,8 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            mockBulkRequest
         );
         verify(processor).execute(any(), any());
         verify(failureHandler, never()).accept(any(), any());
@@ -977,7 +987,8 @@ public void testExecuteFailure() throws Exception {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            mockBulkRequest
         );
         verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
         verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
@@ -1035,7 +1046,8 @@ public void testExecuteSuccessWithOnFailure() throws Exception {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            mockBulkRequest
         );
         verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class));
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@@ -1084,7 +1096,8 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            mockBulkRequest
         );
         verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any());
         verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class));
@@ -1146,7 +1159,8 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
             requestItemErrorHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            bulkRequest
         );
 
         verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(o -> o.getCause().equals(error)));
@@ -1204,7 +1218,8 @@ public void testBulkRequestExecution() throws Exception {
             requestItemErrorHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            bulkRequest
         );
 
         verify(requestItemErrorHandler, never()).accept(any(), any());
@@ -1272,7 +1287,8 @@ public void testStats() throws Exception {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            mockBulkRequest
         );
         final IngestStats afterFirstRequestStats = ingestService.stats();
         assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2));
@@ -1296,7 +1312,8 @@ public void testStats() throws Exception {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            mockBulkRequest
         );
         final IngestStats afterSecondRequestStats = ingestService.stats();
         assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2));
@@ -1325,7 +1342,8 @@ public void testStats() throws Exception {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            mockBulkRequest
         );
         final IngestStats afterThirdRequestStats = ingestService.stats();
         assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2));
@@ -1358,7 +1376,8 @@ public void testStats() throws Exception {
             failureHandler,
             completionHandler,
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            mockBulkRequest
         );
         final IngestStats afterForthRequestStats = ingestService.stats();
         assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2));
@@ -1456,7 +1475,8 @@ public String getDescription() {
             failureHandler,
             completionHandler,
             dropHandler,
-            Names.WRITE
+            Names.WRITE,
+            bulkRequest
         );
         verify(failureHandler, never()).accept(any(), any());
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@@ -1543,7 +1563,8 @@ public void testCBORParsing() throws Exception {
             (integer, e) -> {},
             (thread, e) -> {},
             indexReq -> {},
-            Names.WRITE
+            Names.WRITE,
+            mockBulkRequest
         );
     }
 
@@ -1672,6 +1693,283 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() {
         }
     }
 
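+    // The tests below cover the new batch path. Sketch of the flow under test, as
+    // reconstructed from this patch: executeBulkRequest(...) now also receives the
+    // originating BulkRequest, reads its batchSize(), and for batch sizes > 1 groups
+    // slot-wrapped documents so pipelines invoke Processor#batchExecute per sub-batch
+    // instead of execute() per document. For example:
+    //
+    //     bulkRequest.batchSize(2);  // 4 index requests, all resolving to pipeline "_id"
+    //     ingestService.executeBulkRequest(4, bulkRequest.requests(), failureHandler,
+    //         completionHandler, indexReq -> {}, Names.WRITE, bulkRequest);
+    //     // expected: batchExecute invoked twice, execute never
+    //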
+    public void testExecuteBulkRequestInBatch() {
+        CompoundProcessor mockCompoundProcessor = mockCompoundProcessor();
+        IngestService ingestService = createWithProcessors(
+            Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor)
+        );
+        createPipeline("_id", ingestService);
+        BulkRequest bulkRequest = new BulkRequest();
+        IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
+        bulkRequest.add(indexRequest1);
+        IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
+        bulkRequest.add(indexRequest2);
+        IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_none").setFinalPipeline("_id");
+        bulkRequest.add(indexRequest3);
+        IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
+        bulkRequest.add(indexRequest4);
+        bulkRequest.batchSize(2);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
+        ingestService.executeBulkRequest(
+            4,
+            bulkRequest.requests(),
+            failureHandler,
+            completionHandler,
+            indexReq -> {},
+            Names.WRITE,
+            bulkRequest
+        );
+        verify(failureHandler, never()).accept(any(), any());
+        verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
+        verify(mockCompoundProcessor, times(2)).batchExecute(any(), any());
+        verify(mockCompoundProcessor, never()).execute(any(), any());
+    }
+
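+    // With both a default and a final pipeline configured, each of the two sub-batches
+    // passes through two pipelines, hence the four expected batchExecute invocations.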
+    public void testExecuteBulkRequestInBatchWithDefaultAndFinalPipeline() {
+        CompoundProcessor mockCompoundProcessor = mockCompoundProcessor();
+        IngestService ingestService = createWithProcessors(
+            Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor)
+        );
+        ClusterState clusterState = createPipeline("_id", ingestService);
+        createPipeline("_final", ingestService, clusterState);
+        BulkRequest bulkRequest = new BulkRequest();
+        IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_final");
+        bulkRequest.add(indexRequest1);
+        IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_final");
+        bulkRequest.add(indexRequest2);
+        IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_id").setFinalPipeline("_final");
+        bulkRequest.add(indexRequest3);
+        IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_final");
+        bulkRequest.add(indexRequest4);
+        bulkRequest.batchSize(2);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
+        ingestService.executeBulkRequest(
+            4,
+            bulkRequest.requests(),
+            failureHandler,
+            completionHandler,
+            indexReq -> {},
+            Names.WRITE,
+            bulkRequest
+        );
+        verify(failureHandler, never()).accept(any(), any());
+        verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
+        verify(mockCompoundProcessor, times(4)).batchExecute(any(), any());
+        verify(mockCompoundProcessor, never()).execute(any(), any());
+    }
+
+    public void testExecuteBulkRequestInBatchFallbackWithOneDocument() {
+        CompoundProcessor mockCompoundProcessor = mockCompoundProcessor();
+        IngestService ingestService = createWithProcessors(
+            Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor)
+        );
+        createPipeline("_id", ingestService);
+        BulkRequest bulkRequest = new BulkRequest();
+        IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
+        bulkRequest.add(indexRequest1);
+        bulkRequest.batchSize(2);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
+        ingestService.executeBulkRequest(
+            1,
+            bulkRequest.requests(),
+            failureHandler,
+            completionHandler,
+            indexReq -> {},
+            Names.WRITE,
+            bulkRequest
+        );
+        verify(failureHandler, never()).accept(any(), any());
+        verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
+        verify(mockCompoundProcessor, never()).batchExecute(any(), any());
+        verify(mockCompoundProcessor, times(1)).execute(any(), any());
+    }
+
+    public void testExecuteBulkRequestInBatchNoValidPipeline() {
+        CompoundProcessor mockCompoundProcessor = mockCompoundProcessor();
+        IngestService ingestService = createWithProcessors(
+            Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor)
+        );
+        createPipeline("_id", ingestService);
+        BulkRequest bulkRequest = new BulkRequest();
+        // not processed: neither a default nor a final pipeline is set on these requests
+        IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1")
+            .source(emptyMap())
+            .setPipeline("_none")
+            .setFinalPipeline("_none");
+        bulkRequest.add(indexRequest1);
+        IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2")
+            .source(emptyMap())
+            .setPipeline("_none")
+            .setFinalPipeline("_none");
+        bulkRequest.add(indexRequest2);
+        bulkRequest.batchSize(2);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
+        ingestService.executeBulkRequest(
+            2,
+            bulkRequest.requests(),
+            failureHandler,
+            completionHandler,
+            indexReq -> {},
+            Names.WRITE,
+            bulkRequest
+        );
+        verify(failureHandler, never()).accept(any(), any());
+        verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
+        verify(mockCompoundProcessor, never()).batchExecute(any(), any());
+        verify(mockCompoundProcessor, never()).execute(any(), any());
+    }
+
+    public void testExecuteBulkRequestInBatchNoValidDocument() {
+        CompoundProcessor mockCompoundProcessor = mockCompoundProcessor();
+        IngestService ingestService = createWithProcessors(
+            Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor)
+        );
+        createPipeline("_id", ingestService);
+        BulkRequest bulkRequest = new BulkRequest();
+        // not processed: delete requests are not a valid document type for ingest pipelines
+        bulkRequest.add(new DeleteRequest("_index", "_id"));
+        bulkRequest.add(new DeleteRequest("_index", "_id"));
+        bulkRequest.batchSize(2);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
+        ingestService.executeBulkRequest(
+            2,
+            bulkRequest.requests(),
+            failureHandler,
+            completionHandler,
+            indexReq -> {},
+            Names.WRITE,
+            bulkRequest
+        );
+        verify(failureHandler, never()).accept(any(), any());
+        verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
+        verify(mockCompoundProcessor, never()).batchExecute(any(), any());
+        verify(mockCompoundProcessor, never()).execute(any(), any());
+    }
+
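+    // When batchExecute itself throws, every slot in the affected batch is expected to
+    // surface as an item-level failure (two documents -> two failureHandler calls).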
+    public void testExecuteBulkRequestInBatchWithException() {
+        CompoundProcessor mockCompoundProcessor = mockCompoundProcessor();
+        IngestService ingestService = createWithProcessors(
+            Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor)
+        );
+        doThrow(new RuntimeException()).when(mockCompoundProcessor).batchExecute(any(), any());
+        createPipeline("_id", ingestService);
+        BulkRequest bulkRequest = new BulkRequest();
+        IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
+        bulkRequest.add(indexRequest1);
+        IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
+        bulkRequest.add(indexRequest2);
+        bulkRequest.batchSize(2);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
+        ingestService.executeBulkRequest(
+            2,
+            bulkRequest.requests(),
+            failureHandler,
+            completionHandler,
+            indexReq -> {},
+            Names.WRITE,
+            bulkRequest
+        );
+        verify(failureHandler, times(2)).accept(any(), any());
+        verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
+        verify(mockCompoundProcessor, times(1)).batchExecute(any(), any());
+        verify(mockCompoundProcessor, never()).execute(any(), any());
+    }
+
+    public void testExecuteBulkRequestInBatchWithExceptionInCallback() {
+        CompoundProcessor mockCompoundProcessor = mockCompoundProcessor();
+        IngestService ingestService = createWithProcessors(
+            Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor)
+        );
+        createPipeline("_id", ingestService);
+        BulkRequest bulkRequest = new BulkRequest();
+        IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
+        bulkRequest.add(indexRequest1);
+        IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
+        bulkRequest.add(indexRequest2);
+        bulkRequest.batchSize(2);
+
+        List<IngestDocumentWrapper> results = Arrays.asList(
+            new IngestDocumentWrapper(0, IngestService.toIngestDocument(indexRequest1), null),
+            new IngestDocumentWrapper(1, null, new RuntimeException())
+        );
+        doAnswer(args -> {
+            @SuppressWarnings("unchecked")
+            Consumer<List<IngestDocumentWrapper>> handler = (Consumer<List<IngestDocumentWrapper>>) args.getArguments()[1];
+            handler.accept(results);
+            return null;
+        }).when(mockCompoundProcessor).batchExecute(any(), any());
+
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
+        @SuppressWarnings("unchecked")
+        final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
+        ingestService.executeBulkRequest(
+            2,
+            bulkRequest.requests(),
+            failureHandler,
+            completionHandler,
+            indexReq -> {},
+            Names.WRITE,
+            bulkRequest
+        );
+        verify(failureHandler, times(1)).accept(any(), any());
+        verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
+        verify(mockCompoundProcessor, times(1)).batchExecute(any(), any());
+        verify(mockCompoundProcessor, never()).execute(any(), any());
+    }
+
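+    // prepareBatches groups requests by their (index, resolved pipelines) key and then
+    // splits each group into chunks of at most batch_size, e.g. with batch_size = 2:
+    //   4 x (index1, [p1])                        -> 2 batches of 2
+    //   4 distinct (index, pipelines) combinations -> 4 single-document batches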
+    public void testPrepareBatches_same_index_pipeline() {
+        IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
+        IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
+        IngestService.IndexRequestWrapper wrapper3 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
+        IngestService.IndexRequestWrapper wrapper4 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
+        List<List<IngestService.IndexRequestWrapper>> batches = IngestService.prepareBatches(
+            2,
+            Arrays.asList(wrapper1, wrapper2, wrapper3, wrapper4)
+        );
+        assertEquals(2, batches.size());
+        for (int i = 0; i < 2; ++i) {
+            assertEquals(2, batches.get(i).size());
+        }
+    }
+
+    public void testPrepareBatches_different_index_pipeline() {
+        IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
+        IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index2", Collections.singletonList("p1"));
+        IngestService.IndexRequestWrapper wrapper3 = createIndexRequestWrapper("index1", Arrays.asList("p1", "p2"));
+        IngestService.IndexRequestWrapper wrapper4 = createIndexRequestWrapper("index1", Collections.singletonList("p2"));
+        List<List<IngestService.IndexRequestWrapper>> batches = IngestService.prepareBatches(
+            2,
+            Arrays.asList(wrapper1, wrapper2, wrapper3, wrapper4)
+        );
+        assertEquals(4, batches.size());
+    }
+
+    private IngestService.IndexRequestWrapper createIndexRequestWrapper(String index, List<String> pipelines) {
+        IndexRequest indexRequest = new IndexRequest(index);
+        return new IngestService.IndexRequestWrapper(0, indexRequest, pipelines, true);
+    }
+
     private IngestDocument eqIndexTypeId(final Map<String, Object> source) {
         return argThat(new IngestDocumentMatcher("_index", "_type", "_id", -3L, VersionType.INTERNAL, source));
     }
@@ -1718,6 +2016,13 @@ private CompoundProcessor mockCompoundProcessor() {
             handler.accept((IngestDocument) args.getArguments()[0], null);
             return null;
         }).when(processor).execute(any(), any());
+
+        doAnswer(args -> {
+            @SuppressWarnings("unchecked")
+            Consumer<List<IngestDocumentWrapper>> handler = (Consumer<List<IngestDocumentWrapper>>) args.getArguments()[1];
+            handler.accept((List<IngestDocumentWrapper>) args.getArguments()[0]);
+            return null;
+        }).when(processor).batchExecute(any(), any());
         return processor;
     }
 
@@ -1757,4 +2062,24 @@ private void assertStats(OperationStats stats, long count, long failed, long tim
     private OperationStats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
         return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
     }
+
+    private ClusterState createPipeline(String pipeline, IngestService ingestService) {
+        return createPipeline(pipeline, ingestService, null);
+    }
+
+    private ClusterState createPipeline(String pipeline, IngestService ingestService, ClusterState previousState) {
+        PutPipelineRequest putRequest = new PutPipelineRequest(
+            pipeline,
+            new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
+            MediaTypeRegistry.JSON
+        );
+        ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
+        if (previousState != null) {
+            clusterState = previousState;
+        }
+        ClusterState previousClusterState = clusterState;
+        clusterState = IngestService.innerPut(putRequest, clusterState);
+        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+        return clusterState;
+    }
 }
diff --git a/server/src/test/java/org/opensearch/ingest/ProcessorTests.java b/server/src/test/java/org/opensearch/ingest/ProcessorTests.java
new file mode 100644
index 0000000000000..d6ef3be73adb8
--- /dev/null
+++ b/server/src/test/java/org/opensearch/ingest/ProcessorTests.java
@@ -0,0 +1,74 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.ingest;
+
+import org.opensearch.test.OpenSearchTestCase;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.opensearch.ingest.IngestDocumentPreparer.SHOULD_FAIL_KEY;
+
+public class ProcessorTests extends OpenSearchTestCase {
+    private Processor processor;
+    private static final String FIELD_KEY = "result";
+    private static final String FIELD_VALUE_PROCESSED = "processed";
+
+    @Before
+    public void setup() {}
+
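+    // These tests exercise the default Processor#batchExecute implementation added in
+    // this patch which, per the expectations below, processes wrappers one by one and
+    // records a per-document exception on its wrapper instead of aborting the batch.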
+    public void test_batchExecute_success() {
+        processor = new FakeProcessor("type", "tag", "description", doc -> { doc.setFieldValue(FIELD_KEY, FIELD_VALUE_PROCESSED); });
+        List<IngestDocumentWrapper> wrapperList = Arrays.asList(
+            IngestDocumentPreparer.createIngestDocumentWrapper(1),
+            IngestDocumentPreparer.createIngestDocumentWrapper(2),
+            IngestDocumentPreparer.createIngestDocumentWrapper(3)
+        );
+        processor.batchExecute(wrapperList, results -> {
+            assertEquals(3, results.size());
+            for (IngestDocumentWrapper wrapper : results) {
+                assertNull(wrapper.getException());
+                assertEquals(FIELD_VALUE_PROCESSED, wrapper.getIngestDocument().getFieldValue(FIELD_KEY, String.class));
+            }
+        });
+    }
+
+    public void test_batchExecute_empty() {
+        processor = new FakeProcessor("type", "tag", "description", doc -> { doc.setFieldValue(FIELD_KEY, FIELD_VALUE_PROCESSED); });
+        processor.batchExecute(Collections.emptyList(), results -> { assertEquals(0, results.size()); });
+    }
+
+    public void test_batchExecute_exception() {
+        processor = new FakeProcessor("type", "tag", "description", doc -> {
+            if (doc.hasField(SHOULD_FAIL_KEY) && doc.getFieldValue(SHOULD_FAIL_KEY, Boolean.class)) {
+                throw new RuntimeException("fail");
+            }
+            doc.setFieldValue(FIELD_KEY, FIELD_VALUE_PROCESSED);
+        });
+        List<IngestDocumentWrapper> wrapperList = Arrays.asList(
+            IngestDocumentPreparer.createIngestDocumentWrapper(1),
+            IngestDocumentPreparer.createIngestDocumentWrapper(2, true),
+            IngestDocumentPreparer.createIngestDocumentWrapper(3)
+        );
+        processor.batchExecute(wrapperList, results -> {
+            assertEquals(3, results.size());
+            for (IngestDocumentWrapper wrapper : results) {
+                if (wrapper.getSlot() == 2) {
+                    assertNotNull(wrapper.getException());
+                    assertNull(wrapper.getIngestDocument());
+                } else {
+                    assertNull(wrapper.getException());
+                    assertEquals(FIELD_VALUE_PROCESSED, wrapper.getIngestDocument().getFieldValue(FIELD_KEY, String.class));
+                }
+            }
+        });
+    }
+}