diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index d56a2731d35b9..681b167c828e1 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; import org.elasticsearch.ingest.WrappingProcessor; import org.elasticsearch.script.ScriptService; @@ -65,29 +67,46 @@ boolean isIgnoreMissing() { } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { List values = ingestDocument.getFieldValue(field, List.class, ignoreMissing); if (values == null) { if (ignoreMissing) { - return ingestDocument; + handler.accept(ingestDocument, null); + } else { + handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.")); } - throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."); + } else { + List newValues = new CopyOnWriteArrayList<>(); + innerExecute(0, values, newValues, ingestDocument, handler); } - List newValues = new ArrayList<>(values.size()); - IngestDocument document = ingestDocument; - for (Object value : values) { - Object previousValue = ingestDocument.getIngestMetadata().put("_value", value); - try { - document = processor.execute(document); - if (document == null) { - return null; - } - } finally { - newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue)); - } + } + + void innerExecute(int index, List values, List newValues, IngestDocument document, + BiConsumer handler) { + if (index == values.size()) { + document.setFieldValue(field, new ArrayList<>(newValues)); + handler.accept(document, null); + return; } - document.setFieldValue(field, newValues); - return document; + + Object value = values.get(index); + Object previousValue = document.getIngestMetadata().put("_value", value); + processor.execute(document, (result, e) -> { + if (e != null) { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + handler.accept(null, e); + } else if (result == null) { + handler.accept(null, null); + } else { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + innerExecute(index + 1, values, newValues, document, handler); + } + }); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); } @Override diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 282994d8eb354..a4ee786315c03 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -53,7 +53,7 @@ public void testExecute() throws Exception { "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"), false ); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); @@ -73,12 +73,9 @@ public void testExecuteWithFailure() throws Exception { } }); ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false); - try { - processor.execute(ingestDocument); - fail("exception expected"); - } catch (RuntimeException e) { - assertThat(e.getMessage(), equalTo("failure")); - } + Exception[] exceptions = new Exception[1]; + processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;}); + assertThat(exceptions[0].getMessage(), equalTo("failure")); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("a", "b", "c"))); @@ -95,7 +92,7 @@ public void testExecuteWithFailure() throws Exception { "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)), false ); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("A", "B", "c"))); } @@ -114,7 +111,7 @@ public void testMetaDataAvailable() throws Exception { id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id")); }); ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); assertThat(ingestDocument.getFieldValue("values.0.index", String.class), equalTo("_index")); @@ -142,7 +139,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { "_tag", "values", new SetProcessor("_tag", new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), (model) -> model.get("other")), false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value")); assertThat(ingestDocument.getFieldValue("values.1.new_field", String.class), equalTo("value")); @@ -180,7 +177,7 @@ public String getTag() { ); ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.size(), equalTo(numValues)); @@ -205,7 +202,7 @@ public void testModifyFieldsOutsideArray() throws Exception { Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")), Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added")))) ), false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.get(0), equalTo("STRING")); @@ -231,7 +228,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_source._value", String.class))); ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false); - forEachProcessor.execute(ingestDocument); + forEachProcessor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.get(0), equalTo("new_value")); @@ -264,7 +261,7 @@ public void testNestedForEach() throws Exception { ); ForEachProcessor processor = new ForEachProcessor( "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values1.0.values2", List.class); assertThat(result.get(0), equalTo("ABC")); @@ -282,7 +279,7 @@ public void testIgnoreMissing() throws Exception { IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); TestProcessor testProcessor = new TestProcessor(doc -> {}); ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertIngestDocument(originalIngestDocument, ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(0)); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index afa3b911d6d03..8b305bc980bd5 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.bulk; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SparseFixedBitSet; import org.elasticsearch.Assertions; @@ -57,6 +59,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; @@ -82,6 +85,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -644,14 +648,13 @@ private long relativeTime() { } void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener listener) { - long ingestStartTimeInNanos = System.nanoTime(); - BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); - ingestService.executeBulkRequest(() -> bulkRequestModifier, - (indexRequest, exception) -> { - logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", - indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); - bulkRequestModifier.markCurrentItemAsFailed(exception); - }, (exception) -> { + final long ingestStartTimeInNanos = System.nanoTime(); + final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); + ingestService.executeBulkRequest( + original.numberOfActions(), + () -> bulkRequestModifier, + bulkRequestModifier::markItemAsFailed, + (originalThread, exception) -> { if (exception != null) { logger.error("failed to execute pipeline for a bulk request", exception); listener.onFailure(exception); @@ -666,26 +669,56 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen // (this will happen if pre-processing all items in the bulk failed) actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); } else { - doExecute(task, bulkRequest, actionListener); + // If a processor went async and returned a response on a different thread then + // before we continue the bulk request we should fork back on a write thread: + if (originalThread == Thread.currentThread()) { + assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE); + doExecute(task, bulkRequest, actionListener); + } else { + threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + doExecute(task, bulkRequest, actionListener); + } + + @Override + public boolean isForceExecution() { + // If we fork back to a write thread we **not** should fail, because tp queue is full. + // (Otherwise the work done during ingest will be lost) + // It is okay to force execution here. Throttling of write requests happens prior to + // ingest when a node receives a bulk request. + return true; + } + }); + } } } }, - indexRequest -> bulkRequestModifier.markCurrentItemAsDropped()); + bulkRequestModifier::markItemAsDropped + ); } static final class BulkRequestModifier implements Iterator> { + private static final Logger LOGGER = LogManager.getLogger(BulkRequestModifier.class); + final BulkRequest bulkRequest; final SparseFixedBitSet failedSlots; final List itemResponses; + final AtomicIntegerArray originalSlots; - int currentSlot = -1; - int[] originalSlots; + volatile int currentSlot = -1; BulkRequestModifier(BulkRequest bulkRequest) { this.bulkRequest = bulkRequest; this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size()); this.itemResponses = new ArrayList<>(bulkRequest.requests().size()); + this.originalSlots = new AtomicIntegerArray(bulkRequest.requests().size()); // oversize, but that's ok } @Override @@ -709,12 +742,11 @@ BulkRequest getBulkRequest() { int slot = 0; List> requests = bulkRequest.requests(); - originalSlots = new int[requests.size()]; // oversize, but that's ok for (int i = 0; i < requests.size(); i++) { DocWriteRequest request = requests.get(i); if (failedSlots.get(i) == false) { modifiedBulkRequest.add(request); - originalSlots[slot++] = i; + originalSlots.set(slot++, i); } } return modifiedBulkRequest; @@ -729,7 +761,7 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> { BulkItemResponse[] items = response.getItems(); for (int i = 0; i < items.length; i++) { - itemResponses.add(originalSlots[i], response.getItems()[i]); + itemResponses.add(originalSlots.get(i), response.getItems()[i]); } delegatedListener.onResponse( new BulkResponse( @@ -738,12 +770,12 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, } } - void markCurrentItemAsDropped() { - IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); - failedSlots.set(currentSlot); + synchronized void markItemAsDropped(int slot) { + IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); + failedSlots.set(slot); final String id = indexRequest.id() == null ? DROPPED_ITEM_WITH_AUTO_GENERATED_ID : indexRequest.id(); itemResponses.add( - new BulkItemResponse(currentSlot, indexRequest.opType(), + new BulkItemResponse(slot, indexRequest.opType(), new UpdateResponse( new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0), indexRequest.type(), id, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, @@ -753,16 +785,19 @@ void markCurrentItemAsDropped() { ); } - void markCurrentItemAsFailed(Exception e) { - IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); + synchronized void markItemAsFailed(int slot, Exception e) { + IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); + LOGGER.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", + indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), e); + // We hit a error during preprocessing a request, so we: // 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed // 2) Add a bulk item failure for this request // 3) Continue with the next request in the bulk. - failedSlots.set(currentSlot); + failedSlots.set(slot); BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e); - itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure)); + itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure)); } } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 8ed2ffa5ba028..070e99cc5c775 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -26,8 +26,10 @@ import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; @@ -41,38 +43,42 @@ class SimulateExecutionService { this.threadPool = threadPool; } - SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { + void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose, + BiConsumer handler) { if (verbose) { - List processorResultList = new ArrayList<>(); + List processorResultList = new CopyOnWriteArrayList<>(); CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); - try { - Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), - verbosePipelineProcessor); - ingestDocument.executePipeline(verbosePipeline); - return new SimulateDocumentVerboseResult(processorResultList); - } catch (Exception e) { - return new SimulateDocumentVerboseResult(processorResultList); - } + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline, (result, e) -> { + handler.accept(new SimulateDocumentVerboseResult(processorResultList), e); + }); } else { - try { - IngestDocument result = pipeline.execute(ingestDocument); - return new SimulateDocumentBaseResult(result); - } catch (Exception e) { - return new SimulateDocumentBaseResult(e); - } + pipeline.execute(ingestDocument, (result, e) -> { + if (e == null) { + handler.accept(new SimulateDocumentBaseResult(result), null); + } else { + handler.accept(new SimulateDocumentBaseResult(e), null); + } + }); } } public void execute(SimulatePipelineRequest.Parsed request, ActionListener listener) { threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> { - List responses = new ArrayList<>(); - for (IngestDocument ingestDocument : request.getDocuments()) { - SimulateDocumentResult response = executeDocument(request.getPipeline(), ingestDocument, request.isVerbose()); + final AtomicInteger counter = new AtomicInteger(); + final List responses = new CopyOnWriteArrayList<>(); + for (IngestDocument ingestDocument : request.getDocuments()) { + executeDocument(request.getPipeline(), ingestDocument, request.isVerbose(), (response, e) -> { if (response != null) { responses.add(response); } - } - l.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); + if (counter.incrementAndGet() == request.getDocuments().size()) { + l.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), + request.isVerbose(), responses)); + } + }); + } })); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index a095d7647d90f..cf75ead37354d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -114,58 +115,78 @@ public String getTag() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - for (Tuple processorWithMetric : processorsWithMetrics) { - Processor processor = processorWithMetric.v1(); - IngestMetric metric = processorWithMetric.v2(); - long startTimeInNanos = relativeTimeProvider.getAsLong(); - try { - metric.preIngest(); - if (processor.execute(ingestDocument) == null) { - return null; - } - } catch (Exception e) { + throw new UnsupportedOperationException("this method should not get executed"); + } + + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + innerExecute(0, ingestDocument, handler); + } + + void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsumer handler) { + if (currentProcessor == processorsWithMetrics.size()) { + handler.accept(ingestDocument, null); + return; + } + + Tuple processorWithMetric = processorsWithMetrics.get(currentProcessor); + final Processor processor = processorWithMetric.v1(); + final IngestMetric metric = processorWithMetric.v2(); + final long startTimeInNanos = relativeTimeProvider.getAsLong(); + metric.preIngest(); + processor.execute(ingestDocument, (result, e) -> { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); + metric.postIngest(ingestTimeInMillis); + + if (e != null) { metric.ingestFailed(); if (ignoreFailure) { - continue; - } - - ElasticsearchException compoundProcessorException = - newCompoundProcessorException(e, processor.getType(), processor.getTag()); - if (onFailureProcessors.isEmpty()) { - throw compoundProcessorException; + innerExecute(currentProcessor + 1, ingestDocument, handler); } else { - if (executeOnFailure(ingestDocument, compoundProcessorException) == false) { - return null; + ElasticsearchException compoundProcessorException = + newCompoundProcessorException(e, processor.getType(), processor.getTag()); + if (onFailureProcessors.isEmpty()) { + handler.accept(null, compoundProcessorException); + } else { + executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler); } - break; } - } finally { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); - metric.postIngest(ingestTimeInMillis); + } else { + if (result != null) { + innerExecute(currentProcessor + 1, result, handler); + } else { + handler.accept(null, null); + } } - } - return ingestDocument; + }); } - /** - * @return true if execution should continue, false if document is dropped. - */ - boolean executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { - try { + void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestDocument, ElasticsearchException exception, + BiConsumer handler) { + if (currentOnFailureProcessor == 0) { putFailureMetadata(ingestDocument, exception); - for (Processor processor : onFailureProcessors) { - try { - if (processor.execute(ingestDocument) == null) { - return false; - } - } catch (Exception e) { - throw newCompoundProcessorException(e, processor.getType(), processor.getTag()); - } - } - } finally { + } + + if (currentOnFailureProcessor == onFailureProcessors.size()) { removeFailureMetadata(ingestDocument); + handler.accept(ingestDocument, null); + return; } - return true; + + final Processor onFailureProcessor = onFailureProcessors.get(currentOnFailureProcessor); + onFailureProcessor.execute(ingestDocument, (result, e) -> { + if (e != null) { + removeFailureMetadata(ingestDocument); + handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag())); + return; + } + if (result == null) { + removeFailureMetadata(ingestDocument); + handler.accept(null, null); + return; + } + executeOnFailureAsync(currentOnFailureProcessor + 1, ingestDocument, exception, handler); + }); } private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) { diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index e0a054df2db5a..63dd14f7b4ab8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -66,21 +67,28 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { if (evaluate(ingestDocument)) { - long startTimeInNanos = relativeTimeProvider.getAsLong(); - try { - metric.preIngest(); - return processor.execute(ingestDocument); - } catch (Exception e) { - metric.ingestFailed(); - throw e; - } finally { + final long startTimeInNanos = relativeTimeProvider.getAsLong(); + metric.preIngest(); + processor.execute(ingestDocument, (result, e) -> { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); metric.postIngest(ingestTimeInMillis); - } + if (e != null) { + metric.ingestFailed(); + handler.accept(null, e); + } else { + handler.accept(result, null); + } + }); + } else { + handler.accept(ingestDocument, null); } - return ingestDocument; + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); } boolean evaluate(IngestDocument ingestDocument) { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 90ebc8e074108..34299cb475a27 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.BiConsumer; /** * Represents a single document being captured before indexing and holds the source and metadata (like id, type and index). @@ -641,17 +642,18 @@ private static Object deepCopy(Object value) { /** * Executes the given pipeline with for this document unless the pipeline has already been executed * for this document. - * @param pipeline Pipeline to execute - * @throws Exception On exception in pipeline execution + * + * @param pipeline the pipeline to execute + * @param handler handles the result or failure */ - public IngestDocument executePipeline(Pipeline pipeline) throws Exception { - try { - if (this.executedPipelines.add(pipeline) == false) { - throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()); - } - return pipeline.execute(this); - } finally { - executedPipelines.remove(pipeline); + public void executePipeline(Pipeline pipeline, BiConsumer handler) { + if (executedPipelines.add(pipeline)) { + pipeline.execute(this, (result, e) -> { + executedPipelines.remove(pipeline); + handler.accept(result, e); + }); + } else { + handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId())); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index b9b6dce82acc4..243d8d7c3e5d2 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -63,8 +63,10 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.IntConsumer; /** * Holder class for several ingest related services. @@ -329,42 +331,72 @@ void validatePipeline(Map ingestInfos, PutPipelineReq ExceptionsHelper.rethrowAndSuppress(exceptions); } - public void executeBulkRequest(Iterable> actionRequests, - BiConsumer itemFailureHandler, Consumer completionHandler, - Consumer itemDroppedHandler) { + public void executeBulkRequest(int numberOfActionRequests, + Iterable> actionRequests, + BiConsumer itemFailureHandler, + BiConsumer completionHandler, + IntConsumer itemDroppedHandler) { threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - completionHandler.accept(e); + completionHandler.accept(null, e); } @Override protected void doRun() { + final Thread originalThread = Thread.currentThread(); + final AtomicInteger counter = new AtomicInteger(numberOfActionRequests); + int i = 0; for (DocWriteRequest actionRequest : actionRequests) { IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); if (indexRequest == null) { + if (counter.decrementAndGet() == 0){ + completionHandler.accept(originalThread, null); + } + assert counter.get() >= 0; continue; } String pipelineId = indexRequest.getPipeline(); - if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) { - try { - PipelineHolder holder = pipelines.get(pipelineId); - if (holder == null) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + if (NOOP_PIPELINE_NAME.equals(pipelineId)) { + if (counter.decrementAndGet() == 0){ + completionHandler.accept(originalThread, null); + } + assert counter.get() >= 0; + continue; + } + + final int slot = i; + try { + PipelineHolder holder = pipelines.get(pipelineId); + if (holder == null) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + } + Pipeline pipeline = holder.pipeline; + innerExecute(slot, indexRequest, pipeline, itemDroppedHandler, e -> { + if (e == null) { + // this shouldn't be needed here but we do it for consistency with index api + // which requires it to prevent double execution + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + } else { + itemFailureHandler.accept(slot, e); + } + + if (counter.decrementAndGet() == 0){ + completionHandler.accept(originalThread, null); } - Pipeline pipeline = holder.pipeline; - innerExecute(indexRequest, pipeline, itemDroppedHandler); - //this shouldn't be needed here but we do it for consistency with index api - // which requires it to prevent double execution - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - } catch (Exception e) { - itemFailureHandler.accept(indexRequest, e); + assert counter.get() >= 0; + }); + } catch (Exception e) { + itemFailureHandler.accept(slot, e); + if (counter.decrementAndGet() == 0){ + completionHandler.accept(originalThread, null); } + assert counter.get() >= 0; } + i++; } - completionHandler.accept(null); } }); } @@ -420,26 +452,34 @@ static String getProcessorName(Processor processor){ return sb.toString(); } - private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer itemDroppedHandler) throws Exception { + private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline, IntConsumer itemDroppedHandler, + Consumer handler) { if (pipeline.getProcessors().isEmpty()) { + handler.accept(null); return; } long startTimeInNanos = System.nanoTime(); // the pipeline specific stat holder may not exist and that is fine: // (e.g. the pipeline may have been removed while we're ingesting a document - try { - totalMetrics.preIngest(); - String index = indexRequest.index(); - String type = indexRequest.type(); - String id = indexRequest.id(); - String routing = indexRequest.routing(); - Long version = indexRequest.version(); - VersionType versionType = indexRequest.versionType(); - Map sourceAsMap = indexRequest.sourceAsMap(); - IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); - if (pipeline.execute(ingestDocument) == null) { - itemDroppedHandler.accept(indexRequest); + totalMetrics.preIngest(); + String index = indexRequest.index(); + String type = indexRequest.type(); + String id = indexRequest.id(); + String routing = indexRequest.routing(); + Long version = indexRequest.version(); + VersionType versionType = indexRequest.versionType(); + Map sourceAsMap = indexRequest.sourceAsMap(); + IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); + pipeline.execute(ingestDocument, (result, e) -> { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); + totalMetrics.postIngest(ingestTimeInMillis); + if (e != null) { + totalMetrics.ingestFailed(); + handler.accept(e); + } else if (result == null) { + itemDroppedHandler.accept(slot); + handler.accept(null); } else { Map metadataMap = ingestDocument.extractMetadata(); //it's fine to set all metadata fields all the time, as ingest document holds their starting values @@ -453,14 +493,9 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); } indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType()); + handler.accept(null); } - } catch (Exception e) { - totalMetrics.ingestFailed(); - throw e; - } finally { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); - totalMetrics.postIngest(ingestTimeInMillis); - } + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 218713383227e..3d41d991f3e10 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.LongSupplier; import org.elasticsearch.script.ScriptService; @@ -93,18 +94,17 @@ public static Pipeline create(String id, Map config, * If null is returned then this document will be dropped and not indexed, otherwise * this document will be kept and indexed. */ - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - long startTimeInNanos = relativeTimeProvider.getAsLong(); - try { - metrics.preIngest(); - return compoundProcessor.execute(ingestDocument); - } catch (Exception e) { - metrics.ingestFailed(); - throw e; - } finally { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + final long startTimeInNanos = relativeTimeProvider.getAsLong(); + metrics.preIngest(); + compoundProcessor.execute(ingestDocument, (result, e) -> { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); metrics.postIngest(ingestTimeInMillis); - } + if (e != null) { + metrics.ingestFailed(); + } + handler.accept(result, e); + }); } /** diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index b5794a3f76819..f5e37a1c1235e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest; import java.util.Map; +import java.util.function.BiConsumer; public class PipelineProcessor extends AbstractProcessor { @@ -36,12 +37,19 @@ private PipelineProcessor(String tag, String pipelineName, IngestService ingestS } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { Pipeline pipeline = ingestService.getPipeline(pipelineName); - if (pipeline == null) { - throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']'); + if (pipeline != null) { + ingestDocument.executePipeline(pipeline, handler); + } else { + handler.accept(null, + new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']')); } - return ingestDocument.executePipeline(pipeline); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); } Pipeline getPipeline(){ diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 10bd530e3c1e7..029e80234e9e8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -27,6 +27,7 @@ import org.elasticsearch.threadpool.Scheduler; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.LongSupplier; @@ -38,6 +39,21 @@ */ public interface Processor { + /** + * Introspect and potentially modify the incoming data. + * + * Expert method: only override this method if a processor implementation needs to make an asynchronous call, + * otherwise just overwrite {@link #execute(IngestDocument)}. + */ + default void execute(IngestDocument ingestDocument, BiConsumer handler) { + try { + IngestDocument result = execute(ingestDocument); + handler.accept(result, null); + } catch (Exception e) { + handler.accept(null, e); + } + } + /** * Introspect and potentially modify the incoming data. * diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 39ae9edcb2eee..e9d4ea6b2ad4b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.BiConsumer; /** * Processor to be used within Simulate API to keep track of processors executed in pipeline. @@ -41,56 +42,76 @@ public final class TrackingResultProcessor implements Processor { } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - Processor processor = actualProcessor; - try { - if (processor instanceof ConditionalProcessor) { - ConditionalProcessor conditionalProcessor = (ConditionalProcessor) processor; - if (conditionalProcessor.evaluate(ingestDocument) == false) { - return ingestDocument; - } - if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) { - processor = conditionalProcessor.getInnerProcessor(); - } - } - if (processor instanceof PipelineProcessor) { - PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); - Pipeline pipeline = pipelineProcessor.getPipeline(); - //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines - try { - IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); - ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline()); - } catch (ElasticsearchException elasticsearchException) { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + if (actualProcessor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor); + Pipeline pipeline = pipelineProcessor.getPipeline(); + //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines + IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); + ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(), (result, e) -> { + // do nothing, let the tracking processors throw the exception while recording the path up to the failure + if (e instanceof ElasticsearchException) { + ElasticsearchException elasticsearchException = (ElasticsearchException) e; + //else do nothing, let the tracking processors throw the exception while recording the path up to the failure if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) { - throw elasticsearchException; + if (ignoreFailure) { + processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), + new IngestDocument(ingestDocument), e)); + } else { + processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), e)); + } + handler.accept(null, elasticsearchException); } - //else do nothing, let the tracking processors throw the exception while recording the path up to the failure - } catch (Exception e) { - // do nothing, let the tracking processors throw the exception while recording the path up to the failure + } else { + //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline, handler); } - //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); - Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), - verbosePipelineProcessor); - ingestDocument.executePipeline(verbosePipeline); + }); + return; + } + + final Processor processor; + if (actualProcessor instanceof ConditionalProcessor) { + ConditionalProcessor conditionalProcessor = (ConditionalProcessor) actualProcessor; + if (conditionalProcessor.evaluate(ingestDocument) == false) { + handler.accept(ingestDocument, null); + return; + } + if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) { + processor = conditionalProcessor.getInnerProcessor(); + } else { + processor = actualProcessor; + } + } else { + processor = actualProcessor; + } + + processor.execute(ingestDocument, (result, e) -> { + if (e != null) { + if (ignoreFailure) { + processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); + } else { + processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); + } + handler.accept(null, e); } else { - IngestDocument result = processor.execute(ingestDocument); if (result != null) { processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument))); + handler.accept(result, null); } else { processorResultList.add(new SimulateProcessorResult(processor.getTag())); - return null; + handler.accept(null, null); } } - } catch (Exception e) { - if (ignoreFailure) { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); - } else { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); - } - throw e; - } - return ingestDocument; + }); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException(); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java index 82ed518256169..5a168264a740f 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java @@ -55,7 +55,7 @@ public void testBulkRequestModifier() { while (bulkRequestModifier.hasNext()) { bulkRequestModifier.next(); if (randomBoolean()) { - bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException()); + bulkRequestModifier.markItemAsFailed(i, new RuntimeException()); failedSlots.add(i); } i++; @@ -93,7 +93,7 @@ public void testPipelineFailures() { for (int i = 0; modifier.hasNext(); i++) { modifier.next(); if (i % 2 == 0) { - modifier.markCurrentItemAsFailed(new RuntimeException()); + modifier.markItemAsFailed(i, new RuntimeException()); } } @@ -102,7 +102,7 @@ public void testPipelineFailures() { assertThat(bulkRequest.requests().size(), Matchers.equalTo(16)); List responses = new ArrayList<>(); - ActionListener bulkResponseListener = modifier.wrapActionListenerIfNeeded(1L, new ActionListener() { + ActionListener bulkResponseListener = modifier.wrapActionListenerIfNeeded(1L, new ActionListener<>() { @Override public void onResponse(BulkResponse bulkItemResponses) { responses.addAll(Arrays.asList(bulkItemResponses.getItems())); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 1822ed75d6091..2690b5200d19a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -68,11 +68,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -94,6 +94,8 @@ public class TransportBulkActionIngestTests extends ESTestCase { private static final Settings SETTINGS = Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build(); + private static final Thread DUMMY_WRITE_THREAD = new Thread(ThreadPool.Names.WRITE); + /** Services needed by bulk action */ TransportService transportService; ClusterService clusterService; @@ -102,9 +104,9 @@ public class TransportBulkActionIngestTests extends ESTestCase { /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */ @Captor - ArgumentCaptor> failureHandler; + ArgumentCaptor> failureHandler; @Captor - ArgumentCaptor> completionHandler; + ArgumentCaptor> completionHandler; @Captor ArgumentCaptor> remoteResponseHandler; @Captor @@ -266,15 +268,16 @@ public void testIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); - completionHandler.getValue().accept(exception); + verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), + failureHandler.capture(), completionHandler.capture(), any()); + completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); // now check success Iterator> req = bulkDocsItr.getValue().iterator(); - failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request + failureHandler.getValue().accept(0, exception); // have an exception for our one index request indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); @@ -300,13 +303,14 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); - completionHandler.getValue().accept(exception); + verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), + completionHandler.capture(), any()); + completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); @@ -332,7 +336,7 @@ public void testIngestForward() throws Exception { ActionTestUtils.execute(action, null, bulkRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -376,7 +380,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -460,18 +464,19 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), + failureHandler.capture(), completionHandler.capture(), any()); assertEquals(indexRequest1.getPipeline(), "default_pipeline"); assertEquals(indexRequest2.getPipeline(), "default_pipeline"); assertEquals(indexRequest3.getPipeline(), "default_pipeline"); - completionHandler.getValue().accept(exception); + completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); // now check success of the transport bulk action indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); @@ -498,14 +503,15 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception { assertFalse(action.indexCreated); // no index yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); - completionHandler.getValue().accept(exception); + verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), + completionHandler.capture(), any()); + completionHandler.getValue().accept(null, exception); assertFalse(action.indexCreated); // still no index yet, the ingest node failed. assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path. assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one @@ -562,7 +568,8 @@ public void testFindDefaultPipelineFromTemplateMatch(){ })); assertEquals("pipeline2", indexRequest.getPipeline()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), + completionHandler.capture(), any()); } private void validateDefaultPipeline(IndexRequest indexRequest) { @@ -584,14 +591,15 @@ private void validateDefaultPipeline(IndexRequest indexRequest) { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), + completionHandler.capture(), any()); assertEquals(indexRequest.getPipeline(), "default_pipeline"); - completionHandler.getValue().accept(exception); + completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); diff --git a/server/src/test/java/org/elasticsearch/action/ingest/AsyncIngestProcessorIT.java b/server/src/test/java/org/elasticsearch/action/ingest/AsyncIngestProcessorIT.java new file mode 100644 index 0000000000000..fe753675e29c9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/ingest/AsyncIngestProcessorIT.java @@ -0,0 +1,159 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.ingest; + +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import static org.hamcrest.Matchers.equalTo; + +/** + * The purpose of this test is to verify that when a processor executes an operation asynchronously that + * the expected result is the same as if the same operation happens synchronously. + * + * In this test two test processor are defined that basically do the same operation, but a single processor + * executes asynchronously. The result of the operation should be the same and also the order in which the + * bulk responses are returned should be the same as how the corresponding index requests were defined. + */ +public class AsyncIngestProcessorIT extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return List.of(TestPlugin.class); + } + + public void testAsyncProcessorImplementation() { + // A pipeline with 2 processors: the test async processor and sync test processor. + BytesReference pipelineBody = new BytesArray("{\"processors\": [{\"test-async\": {}, \"test\": {}}]}"); + client().admin().cluster().putPipeline(new PutPipelineRequest("_id", pipelineBody, XContentType.JSON)).actionGet(); + + BulkRequest bulkRequest = new BulkRequest(); + int numDocs = randomIntBetween(8, 256); + for (int i = 0; i < numDocs; i++) { + bulkRequest.add(new IndexRequest("foobar") + .id(Integer.toString(i)) + .source("{}", XContentType.JSON) + .setPipeline("_id") + ); + } + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat(bulkResponse.getItems().length, equalTo(numDocs)); + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + assertThat(bulkResponse.getItems()[i].getId(), equalTo(id)); + GetResponse getResponse = client().get(new GetRequest("foobar", id)).actionGet(); + // The expected result of async test processor: + assertThat(getResponse.getSource().get("foo"), equalTo("bar-" + id)); + // The expected result of sync test processor: + assertThat(getResponse.getSource().get("bar"), equalTo("baz-" + id)); + } + } + + public static class TestPlugin extends Plugin implements IngestPlugin { + + private ThreadPool threadPool; + + @Override + public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, ScriptService scriptService, + NamedXContentRegistry xContentRegistry, Environment environment, + NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) { + this.threadPool = threadPool; + return List.of(); + } + + @Override + public Map getProcessors(Processor.Parameters parameters) { + return Map.of( + "test-async", (factories, tag, config) -> { + return new AbstractProcessor(tag) { + + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + threadPool.generic().execute(() -> { + String id = (String) ingestDocument.getSourceAndMetadata().get("_id"); + if (usually()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + // ignore + } + } + ingestDocument.setFieldValue("foo", "bar-" + id); + handler.accept(ingestDocument, null); + }); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public String getType() { + return "test-async"; + } + }; + }, + "test", (processorFactories, tag, config) -> { + return new AbstractProcessor(tag) { + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + String id = (String) ingestDocument.getSourceAndMetadata().get("_id"); + ingestDocument.setFieldValue("bar", "baz-" + id); + return ingestDocument; + } + + @Override + public String getType() { + return "test"; + } + }; + } + ); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index ac928d9194f80..a9496ec16fada 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -34,6 +34,8 @@ import java.util.Collections; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; @@ -66,7 +68,15 @@ public void destroy() { public void testExecuteVerboseItem() throws Exception { TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {}); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.getAcquire(); assertThat(processor.getInvokedCounter(), equalTo(2)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; @@ -91,7 +101,14 @@ public void testExecuteVerboseItem() throws Exception { public void testExecuteItem() throws Exception { TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {}); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.getAcquire(); assertThat(processor.getInvokedCounter(), equalTo(2)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse; @@ -104,7 +121,14 @@ public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception { TestProcessor processor2 = new TestProcessor("processor_1", "mock", new RuntimeException("processor failed")); TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {}); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1)); assertThat(processor3.getInvokedCounter(), equalTo(0)); @@ -131,7 +155,14 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception { Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1), Collections.singletonList(processor2)), processor3)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); @@ -166,7 +197,14 @@ public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception TestProcessor testProcessor = new TestProcessor("processor_0", "mock", exception); CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; @@ -183,7 +221,14 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { }); CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; @@ -199,7 +244,14 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws public void testExecuteItemWithFailure() throws Exception { TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); }); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse; @@ -210,12 +262,19 @@ public void testExecuteItemWithFailure() throws Exception { assertThat(exception.getMessage(), equalTo("java.lang.IllegalArgumentException: java.lang.RuntimeException: processor failed")); } - public void testDropDocument() { + public void testDropDocument() throws Exception { TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value")); Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, Map.of()); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse; @@ -223,12 +282,19 @@ public void testDropDocument() { assertThat(simulateDocumentBaseResult.getFailure(), nullValue()); } - public void testDropDocumentVerbose() { + public void testDropDocumentVerbose() throws Exception { TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value")); Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, Map.of()); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) actualItemResponse; @@ -239,13 +305,20 @@ public void testDropDocumentVerbose() { assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue()); } - public void testDropDocumentVerboseExtraProcessor() { + public void testDropDocumentVerboseExtraProcessor() throws Exception { TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field1", "value")); Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, Map.of()); TestProcessor processor3 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value")); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor3.getInvokedCounter(), equalTo(0)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index 575d5629b1a78..b3b8ee9762dc1 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -34,6 +34,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -52,7 +53,7 @@ public void testEmpty() throws Exception { CompoundProcessor processor = new CompoundProcessor(); assertThat(processor.getProcessors().isEmpty(), is(true)); assertThat(processor.getOnFailureProcessors().isEmpty(), is(true)); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); } public void testSingleProcessor() throws Exception { @@ -67,7 +68,7 @@ public void testSingleProcessor() throws Exception { assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor.getInvokedCounter(), equalTo(1)); assertStats(compoundProcessor, 1, 0, 1); @@ -82,12 +83,9 @@ public void testSingleProcessorWithException() throws Exception { assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); - try { - compoundProcessor.execute(ingestDocument); - fail("should throw exception"); - } catch (ElasticsearchException e) { - assertThat(e.getRootCause().getMessage(), equalTo("error")); - } + Exception[] holder = new Exception[1]; + compoundProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); + assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo("error")); assertThat(processor.getInvokedCounter(), equalTo(1)); assertStats(compoundProcessor, 1, 1, 0); @@ -100,7 +98,7 @@ public void testIgnoreFailure() throws Exception { when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor compoundProcessor = new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertStats(0, compoundProcessor, 0, 1, 1, 0); assertThat(processor2.getInvokedCounter(), equalTo(1)); @@ -122,7 +120,7 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception { when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), Collections.singletonList(processor2), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor1.getInvokedCounter(), equalTo(1)); @@ -154,7 +152,9 @@ public String getTag() { when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), Collections.singletonList(processor2), relativeTimeProvider); - assertNull(compoundProcessor.execute(ingestDocument)); + IngestDocument[] result = new IngestDocument[1]; + compoundProcessor.execute(ingestDocument, (r, e) -> result[0] = r); + assertThat(result[0], nullValue()); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertStats(compoundProcessor, 1, 1, 0); } @@ -182,7 +182,7 @@ public void testSingleProcessorWithNestedFailures() throws Exception { Collections.singletonList(lastProcessor), relativeTimeProvider); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(processorToFail.getInvokedCounter(), equalTo(1)); assertThat(lastProcessor.getInvokedCounter(), equalTo(1)); @@ -205,7 +205,7 @@ public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exceptio CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), Collections.singletonList(secondProcessor), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); @@ -231,7 +231,7 @@ public void testCompoundProcessorExceptionFail() throws Exception { CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), Collections.singletonList(secondProcessor), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); @@ -257,7 +257,7 @@ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), Collections.singletonList(secondProcessor), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); @@ -272,7 +272,7 @@ public void testBreakOnFailure() throws Exception { when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor), Collections.singletonList(onFailureProcessor), relativeTimeProvider); - pipeline.execute(ingestDocument); + pipeline.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(0)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index f484957d897f1..204a0bddbc339 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -98,7 +98,7 @@ public String getTag() { String falseValue = "falsy"; IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, falseValue); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); assertStats(processor, 0, 0, 0); @@ -106,13 +106,13 @@ public String getTag() { ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, falseValue); ingestDocument.setFieldValue("error", true); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertStats(processor, 0, 0, 0); //true, always call processor and increments metrics ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, trueValue); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue)); assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar")); assertStats(processor, 1, 0, 1); @@ -121,7 +121,9 @@ public String getTag() { ingestDocument.setFieldValue(conditionalField, trueValue); ingestDocument.setFieldValue("error", true); IngestDocument finalIngestDocument = ingestDocument; - expectThrows(RuntimeException.class, () -> processor.execute(finalIngestDocument)); + Exception holder[] = new Exception[1]; + processor.execute(finalIngestDocument, (result, e) -> {holder[0] = e;}); + assertThat(holder[0], instanceOf(RuntimeException.class)); assertStats(processor, 2, 1, 2); } @@ -177,7 +179,7 @@ public String getTag() { }, relativeTimeProvider); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertWarnings("[types removal] Looking up doc types [_type] in scripts is deprecated."); } @@ -213,7 +215,7 @@ private static void assertMutatingCtxThrows(Consumer> mutati ); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue("listField", new ArrayList<>()); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); Exception e = expectedException.get(); assertThat(e, instanceOf(UnsupportedOperationException.class)); assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage()); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 608816baf3010..ec521e4761497 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -73,6 +73,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.IntConsumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -86,6 +87,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; @@ -135,20 +137,20 @@ public void testExecuteIndexPipelineDoesNotExist() { final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final SetOnce failure = new SetOnce<>(); - final BiConsumer failureHandler = (request, e) -> { + final BiConsumer failureHandler = (slot, e) -> { failure.set(true); - assertThat(request, sameInstance(indexRequest)); + assertThat(slot, equalTo(0)); assertThat(e, instanceOf(IllegalArgumentException.class)); assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist")); }; @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); + final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); assertTrue(failure.get()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testUpdatePipelines() { @@ -622,7 +624,7 @@ public String getType() { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final SetOnce failure = new SetOnce<>(); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id); - final BiConsumer failureHandler = (request, e) -> { + final BiConsumer failureHandler = (slot, e) -> { assertThat(e.getCause(), instanceOf(IllegalArgumentException.class)); assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class)); assertThat(e.getCause().getCause().getMessage(), equalTo("error")); @@ -630,17 +632,17 @@ public String getType() { }; @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); + final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); assertTrue(failure.get()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteBulkPipelineDoesNotExist() { IngestService ingestService = createWithProcessors(Collections.singletonMap( - "mock", (factories, tag, config) -> mock(CompoundProcessor.class))); + "mock", (factories, tag, config) -> mockCompoundProcessor())); PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); @@ -657,15 +659,16 @@ public void testExecuteBulkPipelineDoesNotExist() { new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist"); bulkRequest.add(indexRequest2); @SuppressWarnings("unchecked") - BiConsumer failureHandler = mock(BiConsumer.class); + BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler, + completionHandler, indexReq -> {}); verify(failureHandler, times(1)).accept( - argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { + argThat(new CustomTypeSafeMatcher<>("failure handler was not called with the expected arguments") { @Override - protected boolean matchesSafely(IndexRequest item) { - return item == indexRequest2; + protected boolean matchesSafely(Integer item) { + return item == 1; } }), @@ -676,12 +679,12 @@ protected boolean matchesSafely(IllegalArgumentException iae) { } }) ); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteSuccess() { IngestService ingestService = createWithProcessors(Collections.singletonMap( - "mock", (factories, tag, config) -> mock(CompoundProcessor.class))); + "mock", (factories, tag, config) -> mockCompoundProcessor())); PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty @@ -690,12 +693,12 @@ public void testExecuteSuccess() { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteEmptyPipeline() throws Exception { @@ -708,16 +711,16 @@ public void testExecuteEmptyPipeline() throws Exception { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecutePropagateAllMetaDataUpdates() throws Exception { - final CompoundProcessor processor = mock(CompoundProcessor.class); + final CompoundProcessor processor = mockCompoundProcessor(); IngestService ingestService = createWithProcessors(Collections.singletonMap( "mock", (factories, tag, config) -> processor)); PutPipelineRequest putRequest = new PutPipelineRequest("_id", @@ -739,17 +742,21 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName()); } } - return ingestDocument; - }).when(processor).execute(any()); + + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[1]; + handler.accept(ingestDocument, null); + return null; + }).when(processor).execute(any(), any()); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(processor).execute(any()); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + verify(processor).execute(any(), any()); verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); assertThat(indexRequest.index(), equalTo("update_index")); assertThat(indexRequest.type(), equalTo("update_type")); assertThat(indexRequest.id(), equalTo("update_id")); @@ -759,7 +766,7 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { } public void testExecuteFailure() throws Exception { - final CompoundProcessor processor = mock(CompoundProcessor.class); + final CompoundProcessor processor = mockCompoundProcessor(); IngestService ingestService = createWithProcessors(Collections.singletonMap( "mock", (factories, tag, config) -> processor)); PutPipelineRequest putRequest = new PutPipelineRequest("_id", @@ -771,22 +778,37 @@ public void testExecuteFailure() throws Exception { final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()) .when(processor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); - verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); - verify(completionHandler, times(1)).accept(null); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); + verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteSuccessWithOnFailure() throws Exception { final Processor processor = mock(Processor.class); when(processor.getType()).thenReturn("mock_processor_type"); when(processor.getTag()).thenReturn("mock_processor_tag"); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(null, new RuntimeException()); + return null; + }).when(processor).execute(eqIndexTypeId(emptyMap()), any()); + final Processor onFailureProcessor = mock(Processor.class); + doAnswer(args -> { + IngestDocument ingestDocument = (IngestDocument) args.getArguments()[0]; + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(ingestDocument, null); + return null; + }).when(onFailureProcessor).execute(eqIndexTypeId(emptyMap()), any()); + final CompoundProcessor compoundProcessor = new CompoundProcessor( false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); IngestService ingestService = createWithProcessors(Collections.singletonMap( @@ -798,14 +820,13 @@ public void testExecuteSuccessWithOnFailure() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(emptyMap())); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class)); - verify(completionHandler, times(1)).accept(null); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + verify(failureHandler, never()).accept(eq(0), any(ElasticsearchException.class)); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteFailureWithNestedOnFailure() throws Exception { @@ -829,21 +850,21 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()) .when(onFailureOnFailureProcessor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); doThrow(new RuntimeException()) .when(onFailureProcessor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); doThrow(new RuntimeException()) .when(processor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); - verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); - verify(completionHandler, times(1)).accept(null); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); + verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testBulkRequestExecutionWithFailures() throws Exception { @@ -872,7 +893,12 @@ public void testBulkRequestExecutionWithFailures() throws Exception { CompoundProcessor processor = mock(CompoundProcessor.class); when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); Exception error = new RuntimeException(); - doThrow(error).when(processor).execute(any()); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(null, error); + return null; + }).when(processor).execute(any(), any()); IngestService ingestService = createWithProcessors(Collections.singletonMap( "mock", (factories, tag, config) -> processor)); PutPipelineRequest putRequest = new PutPipelineRequest("_id", @@ -883,18 +909,18 @@ public void testBulkRequestExecutionWithFailures() throws Exception { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @SuppressWarnings("unchecked") - BiConsumer requestItemErrorHandler = mock(BiConsumer.class); + BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); - verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), argThat(new ArgumentMatcher() { + verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(new ArgumentMatcher() { @Override public boolean matches(final Object o) { return ((Exception)o).getCause().getCause().equals(error); } })); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testBulkRequestExecution() throws Exception { @@ -917,7 +943,12 @@ public void testBulkRequestExecution() throws Exception { final Processor processor = mock(Processor.class); when(processor.getType()).thenReturn("mock"); when(processor.getTag()).thenReturn("mockTag"); - when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random())); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null); + return null; + }).when(processor).execute(any(), any()); Map map = new HashMap<>(2); map.put("mock", (factories, tag, config) -> processor); @@ -930,13 +961,13 @@ public void testBulkRequestExecution() throws Exception { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @SuppressWarnings("unchecked") - BiConsumer requestItemErrorHandler = mock(BiConsumer.class); + BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); verify(requestItemErrorHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); for (DocWriteRequest docWriteRequest : bulkRequest.requests()) { IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(docWriteRequest); assertThat(indexRequest, notNullValue()); @@ -951,8 +982,18 @@ public void testStats() throws Exception { when(processor.getTag()).thenReturn("mockTag"); when(processorFailure.getType()).thenReturn("failure-mock"); //avoid returning null and dropping the document - when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random())); - when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error")); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null); + return null; + }).when(processor).execute(any(IngestDocument.class), any()); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(null, new RuntimeException("error")); + return null; + }).when(processorFailure).execute(any(IngestDocument.class), any()); Map map = new HashMap<>(2); map.put("mock", (factories, tag, config) -> processor); map.put("failure-mock", (factories, tag, config) -> processorFailure); @@ -974,13 +1015,13 @@ public void testStats() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); + @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1"); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2)); @@ -998,7 +1039,7 @@ public void testStats() throws Exception { indexRequest.setPipeline("_id2"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterSecondRequestStats = ingestService.stats(); assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2)); //total @@ -1017,7 +1058,7 @@ public void testStats() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterThirdRequestStats = ingestService.stats(); assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2)); //total @@ -1041,7 +1082,7 @@ public void testStats() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterForthRequestStats = ingestService.stats(); assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2)); //total @@ -1107,15 +1148,15 @@ public String getTag() { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); + final BiConsumer completionHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer dropHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler); + final IntConsumer dropHandler = mock(IntConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler); verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); - verify(dropHandler, times(1)).accept(indexRequest); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(dropHandler, times(1)).accept(0); } public void testIngestClusterStateListeners_orderOfExecution() { @@ -1157,7 +1198,7 @@ public Map getProcessors(Processor.Parameters paramet } private IngestDocument eqIndexTypeId(final Map source) { - return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source)); + return argThat(new IngestDocumentMatcher("_index", "_type", "_id", -3L, VersionType.INTERNAL, source)); } private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map source) { @@ -1193,6 +1234,17 @@ public Map getProcessors(final Processor.Parameters p }), client); } + private CompoundProcessor mockCompoundProcessor() { + CompoundProcessor processor = mock(CompoundProcessor.class); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept((IngestDocument) args.getArguments()[0], null); + return null; + }).when(processor).execute(any(), any()); + return processor; + } + private class IngestDocumentMatcher extends ArgumentMatcher { private final IngestDocument ingestDocument; diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 0ad88c05ccc6e..4f36727c7ac30 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -64,7 +64,7 @@ public String getTag() { PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); Map config = new HashMap<>(); config.put("name", pipelineId); - factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument); + factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument, (result, e) -> {}); assertEquals(testIngestDocument, invoked.get()); } @@ -74,12 +74,11 @@ public void testThrowsOnMissingPipeline() throws Exception { PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); Map config = new HashMap<>(); config.put("name", "missingPipelineId"); - IllegalStateException e = expectThrows( - IllegalStateException.class, - () -> factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument) - ); + IllegalStateException[] e = new IllegalStateException[1]; + factory.create(Collections.emptyMap(), null, config) + .execute(testIngestDocument, (result, e1) -> e[0] = (IllegalStateException) e1); assertEquals( - "Pipeline processor configured for non-existent pipeline [missingPipelineId]", e.getMessage() + "Pipeline processor configured for non-existent pipeline [missingPipelineId]", e[0].getMessage() ); } @@ -104,12 +103,11 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception { when(ingestService.getPipeline(outerPipelineId)).thenReturn(outer); when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); outerConfig.put("name", innerPipelineId); - ElasticsearchException e = expectThrows( - ElasticsearchException.class, - () -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument) - ); + ElasticsearchException[] e = new ElasticsearchException[1]; + factory.create(Collections.emptyMap(), null, outerConfig) + .execute(testIngestDocument, (result, e1) -> e[0] = (ElasticsearchException) e1); assertEquals( - "Cycle detected for pipeline: inner", e.getRootCause().getMessage() + "Cycle detected for pipeline: inner", e[0].getRootCause().getMessage() ); } @@ -125,8 +123,8 @@ innerPipelineId, null, null, new CompoundProcessor() ); when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); Processor outerProc = factory.create(Collections.emptyMap(), null, outerConfig); - outerProc.execute(testIngestDocument); - outerProc.execute(testIngestDocument); + outerProc.execute(testIngestDocument, (result, e) -> {}); + outerProc.execute(testIngestDocument, (result, e) -> {}); } public void testPipelineProcessorWithPipelineChain() throws Exception { @@ -177,7 +175,7 @@ pipeline3Id, null, null, new CompoundProcessor( IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); //start the chain - ingestDocument.executePipeline(pipeline1); + ingestDocument.executePipeline(pipeline1, (result, e) -> {}); assertNotNull(ingestDocument.getSourceAndMetadata().get(key1)); //check the stats diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index 53a5cd1d753c2..b7afb13deafac 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -66,7 +66,7 @@ public void init() { public void testActualProcessor() throws Exception { TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -84,12 +84,9 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception { CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - try { - trackingProcessor.execute(ingestDocument); - fail("processor should throw exception"); - } catch (ElasticsearchException e) { - assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage())); - } + Exception[] holder = new Exception[1]; + trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); + assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage())); SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); @@ -109,7 +106,7 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception { Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor)); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); @@ -148,7 +145,7 @@ public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { Collections.emptyList()); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); @@ -178,7 +175,7 @@ public void testActualCompoundProcessorWithFalseConditional() throws Exception { new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })); CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument); //the step for key 2 is never executed due to conditional and thus not part of the result set @@ -221,7 +218,7 @@ pipelineId, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -287,7 +284,7 @@ pipelineId2, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -355,7 +352,7 @@ pipelineId2, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -407,7 +404,7 @@ pipelineId, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -457,7 +454,9 @@ public void testActualPipelineProcessorWithCycle() throws Exception { CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> trackingProcessor.execute(ingestDocument)); + Exception[] holder = new Exception[1]; + trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); + ElasticsearchException exception = (ElasticsearchException) holder[0]; assertThat(exception.getCause(), instanceOf(IllegalArgumentException.class)); assertThat(exception.getCause().getCause(), instanceOf(IllegalStateException.class)); assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1")); @@ -482,7 +481,7 @@ pipelineId, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument);