From dcdd7c35c72ffe28506edf38486e502f0ebb1798 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 2 Sep 2019 16:48:28 +0200 Subject: [PATCH 1/2] Allow ingest processors to execute in a non blocking manner. This PR changes the ingest executing to be non blocking by adding an additional method to the Processor interface that accepts a BiConsumer as handler and changing IngestService#executeBulkRequest(...) to ingest document in a non blocking fashion iff a processor executes in a non blocking fashion. This is the second PR that merges changes made to server module from the enrich branch (see #32789) into the master branch. The plan is to merge changes made to the server module separately from the pr that will merge enrich into master, so that these changes can be reviewed in isolation. --- .../ingest/common/ForEachProcessor.java | 53 +++-- .../ingest/common/ForEachProcessorTests.java | 27 +-- .../action/bulk/TransportBulkAction.java | 81 +++++-- .../ingest/SimulateExecutionService.java | 50 ++-- .../ingest/CompoundProcessor.java | 103 ++++---- .../ingest/ConditionalProcessor.java | 30 ++- .../elasticsearch/ingest/IngestDocument.java | 22 +- .../elasticsearch/ingest/IngestService.java | 109 ++++++--- .../org/elasticsearch/ingest/Pipeline.java | 20 +- .../ingest/PipelineProcessor.java | 16 +- .../org/elasticsearch/ingest/Processor.java | 16 ++ .../ingest/TrackingResultProcessor.java | 101 ++++---- .../action/bulk/BulkRequestModifierTests.java | 6 +- .../bulk/TransportBulkActionIngestTests.java | 52 +++-- .../action/ingest/AsyncIngestProcessorIT.java | 159 +++++++++++++ .../ingest/SimulateExecutionServiceTests.java | 99 ++++++-- .../ingest/CompoundProcessorTests.java | 32 +-- .../ingest/ConditionalProcessorTests.java | 14 +- .../ingest/IngestServiceTests.java | 219 +++++++++++------- .../ingest/PipelineProcessorTests.java | 26 +-- .../ingest/TrackingResultProcessorTests.java | 31 ++- 21 files changed, 862 insertions(+), 404 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/ingest/AsyncIngestProcessorIT.java diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index d56a2731d35b9..681b167c828e1 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; import org.elasticsearch.ingest.WrappingProcessor; import org.elasticsearch.script.ScriptService; @@ -65,29 +67,46 @@ boolean isIgnoreMissing() { } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { List values = ingestDocument.getFieldValue(field, List.class, ignoreMissing); if (values == null) { if (ignoreMissing) { - return ingestDocument; + handler.accept(ingestDocument, null); + } else { + handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.")); } - throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."); + } else { + List newValues = new CopyOnWriteArrayList<>(); + innerExecute(0, values, newValues, ingestDocument, handler); } - List newValues = new ArrayList<>(values.size()); - IngestDocument document = ingestDocument; - for (Object value : values) { - Object previousValue = ingestDocument.getIngestMetadata().put("_value", value); - try { - document = processor.execute(document); - if (document == null) { - return null; - } - } finally { - newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue)); - } + } + + void innerExecute(int index, List values, List newValues, IngestDocument document, + BiConsumer handler) { + if (index == values.size()) { + document.setFieldValue(field, new ArrayList<>(newValues)); + handler.accept(document, null); + return; } - document.setFieldValue(field, newValues); - return document; + + Object value = values.get(index); + Object previousValue = document.getIngestMetadata().put("_value", value); + processor.execute(document, (result, e) -> { + if (e != null) { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + handler.accept(null, e); + } else if (result == null) { + handler.accept(null, null); + } else { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + innerExecute(index + 1, values, newValues, document, handler); + } + }); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); } @Override diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 282994d8eb354..a4ee786315c03 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -53,7 +53,7 @@ public void testExecute() throws Exception { "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"), false ); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); @@ -73,12 +73,9 @@ public void testExecuteWithFailure() throws Exception { } }); ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false); - try { - processor.execute(ingestDocument); - fail("exception expected"); - } catch (RuntimeException e) { - assertThat(e.getMessage(), equalTo("failure")); - } + Exception[] exceptions = new Exception[1]; + processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;}); + assertThat(exceptions[0].getMessage(), equalTo("failure")); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("a", "b", "c"))); @@ -95,7 +92,7 @@ public void testExecuteWithFailure() throws Exception { "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)), false ); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("A", "B", "c"))); } @@ -114,7 +111,7 @@ public void testMetaDataAvailable() throws Exception { id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id")); }); ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); assertThat(ingestDocument.getFieldValue("values.0.index", String.class), equalTo("_index")); @@ -142,7 +139,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { "_tag", "values", new SetProcessor("_tag", new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), (model) -> model.get("other")), false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value")); assertThat(ingestDocument.getFieldValue("values.1.new_field", String.class), equalTo("value")); @@ -180,7 +177,7 @@ public String getTag() { ); ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.size(), equalTo(numValues)); @@ -205,7 +202,7 @@ public void testModifyFieldsOutsideArray() throws Exception { Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")), Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added")))) ), false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.get(0), equalTo("STRING")); @@ -231,7 +228,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_source._value", String.class))); ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false); - forEachProcessor.execute(ingestDocument); + forEachProcessor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.get(0), equalTo("new_value")); @@ -264,7 +261,7 @@ public void testNestedForEach() throws Exception { ); ForEachProcessor processor = new ForEachProcessor( "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values1.0.values2", List.class); assertThat(result.get(0), equalTo("ABC")); @@ -282,7 +279,7 @@ public void testIgnoreMissing() throws Exception { IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); TestProcessor testProcessor = new TestProcessor(doc -> {}); ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertIngestDocument(originalIngestDocument, ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(0)); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index a2f105df7e9b7..05e9a3f203a83 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.bulk; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SparseFixedBitSet; import org.elasticsearch.ElasticsearchParseException; @@ -57,6 +59,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; @@ -81,6 +84,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -575,14 +579,13 @@ private long relativeTime() { } void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener listener) { - long ingestStartTimeInNanos = System.nanoTime(); - BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); - ingestService.executeBulkRequest(() -> bulkRequestModifier, - (indexRequest, exception) -> { - logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", - indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); - bulkRequestModifier.markCurrentItemAsFailed(exception); - }, (exception) -> { + final long ingestStartTimeInNanos = System.nanoTime(); + final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); + ingestService.executeBulkRequest( + original.numberOfActions(), + () -> bulkRequestModifier, + bulkRequestModifier::markItemAsFailed, + (originalThread, exception) -> { if (exception != null) { logger.error("failed to execute pipeline for a bulk request", exception); listener.onFailure(exception); @@ -597,26 +600,56 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen // (this will happen if pre-processing all items in the bulk failed) actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); } else { - doExecute(task, bulkRequest, actionListener); + // If a processor went async and returned a response on a different thread then + // before we continue the bulk request we should fork back on a write thread: + if (originalThread == Thread.currentThread()) { + assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE); + doExecute(task, bulkRequest, actionListener); + } else { + threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + doExecute(task, bulkRequest, actionListener); + } + + @Override + public boolean isForceExecution() { + // If we fork back to a write thread we **not** should fail, because tp queue is full. + // (Otherwise the work done during ingest will be lost) + // It is okay to force execution here. Throttling of write requests happens prior to + // ingest when a node receives a bulk request. + return true; + } + }); + } } } }, - indexRequest -> bulkRequestModifier.markCurrentItemAsDropped()); + bulkRequestModifier::markItemAsDropped + ); } static final class BulkRequestModifier implements Iterator> { + private static final Logger LOGGER = LogManager.getLogger(BulkRequestModifier.class); + final BulkRequest bulkRequest; final SparseFixedBitSet failedSlots; final List itemResponses; + final AtomicIntegerArray originalSlots; - int currentSlot = -1; - int[] originalSlots; + volatile int currentSlot = -1; BulkRequestModifier(BulkRequest bulkRequest) { this.bulkRequest = bulkRequest; this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size()); this.itemResponses = new ArrayList<>(bulkRequest.requests().size()); + this.originalSlots = new AtomicIntegerArray(bulkRequest.requests().size()); // oversize, but that's ok } @Override @@ -640,12 +673,11 @@ BulkRequest getBulkRequest() { int slot = 0; List> requests = bulkRequest.requests(); - originalSlots = new int[requests.size()]; // oversize, but that's ok for (int i = 0; i < requests.size(); i++) { DocWriteRequest request = requests.get(i); if (failedSlots.get(i) == false) { modifiedBulkRequest.add(request); - originalSlots[slot++] = i; + originalSlots.set(slot++, i); } } return modifiedBulkRequest; @@ -660,7 +692,7 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> { BulkItemResponse[] items = response.getItems(); for (int i = 0; i < items.length; i++) { - itemResponses.add(originalSlots[i], response.getItems()[i]); + itemResponses.add(originalSlots.get(i), response.getItems()[i]); } delegatedListener.onResponse( new BulkResponse( @@ -669,11 +701,11 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, } } - void markCurrentItemAsDropped() { - IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); - failedSlots.set(currentSlot); + synchronized void markItemAsDropped(int slot) { + IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); + failedSlots.set(slot); itemResponses.add( - new BulkItemResponse(currentSlot, indexRequest.opType(), + new BulkItemResponse(slot, indexRequest.opType(), new UpdateResponse( new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0), indexRequest.type(), indexRequest.id(), SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, @@ -683,16 +715,19 @@ void markCurrentItemAsDropped() { ); } - void markCurrentItemAsFailed(Exception e) { - IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); + synchronized void markItemAsFailed(int slot, Exception e) { + IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); + LOGGER.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", + indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), e); + // We hit a error during preprocessing a request, so we: // 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed // 2) Add a bulk item failure for this request // 3) Continue with the next request in the bulk. - failedSlots.set(currentSlot); + failedSlots.set(slot); BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e); - itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure)); + itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure)); } } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 8ed2ffa5ba028..070e99cc5c775 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -26,8 +26,10 @@ import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; @@ -41,38 +43,42 @@ class SimulateExecutionService { this.threadPool = threadPool; } - SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { + void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose, + BiConsumer handler) { if (verbose) { - List processorResultList = new ArrayList<>(); + List processorResultList = new CopyOnWriteArrayList<>(); CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); - try { - Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), - verbosePipelineProcessor); - ingestDocument.executePipeline(verbosePipeline); - return new SimulateDocumentVerboseResult(processorResultList); - } catch (Exception e) { - return new SimulateDocumentVerboseResult(processorResultList); - } + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline, (result, e) -> { + handler.accept(new SimulateDocumentVerboseResult(processorResultList), e); + }); } else { - try { - IngestDocument result = pipeline.execute(ingestDocument); - return new SimulateDocumentBaseResult(result); - } catch (Exception e) { - return new SimulateDocumentBaseResult(e); - } + pipeline.execute(ingestDocument, (result, e) -> { + if (e == null) { + handler.accept(new SimulateDocumentBaseResult(result), null); + } else { + handler.accept(new SimulateDocumentBaseResult(e), null); + } + }); } } public void execute(SimulatePipelineRequest.Parsed request, ActionListener listener) { threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> { - List responses = new ArrayList<>(); - for (IngestDocument ingestDocument : request.getDocuments()) { - SimulateDocumentResult response = executeDocument(request.getPipeline(), ingestDocument, request.isVerbose()); + final AtomicInteger counter = new AtomicInteger(); + final List responses = new CopyOnWriteArrayList<>(); + for (IngestDocument ingestDocument : request.getDocuments()) { + executeDocument(request.getPipeline(), ingestDocument, request.isVerbose(), (response, e) -> { if (response != null) { responses.add(response); } - } - l.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); + if (counter.incrementAndGet() == request.getDocuments().size()) { + l.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), + request.isVerbose(), responses)); + } + }); + } })); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index a095d7647d90f..cf75ead37354d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -114,58 +115,78 @@ public String getTag() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - for (Tuple processorWithMetric : processorsWithMetrics) { - Processor processor = processorWithMetric.v1(); - IngestMetric metric = processorWithMetric.v2(); - long startTimeInNanos = relativeTimeProvider.getAsLong(); - try { - metric.preIngest(); - if (processor.execute(ingestDocument) == null) { - return null; - } - } catch (Exception e) { + throw new UnsupportedOperationException("this method should not get executed"); + } + + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + innerExecute(0, ingestDocument, handler); + } + + void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsumer handler) { + if (currentProcessor == processorsWithMetrics.size()) { + handler.accept(ingestDocument, null); + return; + } + + Tuple processorWithMetric = processorsWithMetrics.get(currentProcessor); + final Processor processor = processorWithMetric.v1(); + final IngestMetric metric = processorWithMetric.v2(); + final long startTimeInNanos = relativeTimeProvider.getAsLong(); + metric.preIngest(); + processor.execute(ingestDocument, (result, e) -> { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); + metric.postIngest(ingestTimeInMillis); + + if (e != null) { metric.ingestFailed(); if (ignoreFailure) { - continue; - } - - ElasticsearchException compoundProcessorException = - newCompoundProcessorException(e, processor.getType(), processor.getTag()); - if (onFailureProcessors.isEmpty()) { - throw compoundProcessorException; + innerExecute(currentProcessor + 1, ingestDocument, handler); } else { - if (executeOnFailure(ingestDocument, compoundProcessorException) == false) { - return null; + ElasticsearchException compoundProcessorException = + newCompoundProcessorException(e, processor.getType(), processor.getTag()); + if (onFailureProcessors.isEmpty()) { + handler.accept(null, compoundProcessorException); + } else { + executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler); } - break; } - } finally { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); - metric.postIngest(ingestTimeInMillis); + } else { + if (result != null) { + innerExecute(currentProcessor + 1, result, handler); + } else { + handler.accept(null, null); + } } - } - return ingestDocument; + }); } - /** - * @return true if execution should continue, false if document is dropped. - */ - boolean executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { - try { + void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestDocument, ElasticsearchException exception, + BiConsumer handler) { + if (currentOnFailureProcessor == 0) { putFailureMetadata(ingestDocument, exception); - for (Processor processor : onFailureProcessors) { - try { - if (processor.execute(ingestDocument) == null) { - return false; - } - } catch (Exception e) { - throw newCompoundProcessorException(e, processor.getType(), processor.getTag()); - } - } - } finally { + } + + if (currentOnFailureProcessor == onFailureProcessors.size()) { removeFailureMetadata(ingestDocument); + handler.accept(ingestDocument, null); + return; } - return true; + + final Processor onFailureProcessor = onFailureProcessors.get(currentOnFailureProcessor); + onFailureProcessor.execute(ingestDocument, (result, e) -> { + if (e != null) { + removeFailureMetadata(ingestDocument); + handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag())); + return; + } + if (result == null) { + removeFailureMetadata(ingestDocument); + handler.accept(null, null); + return; + } + executeOnFailureAsync(currentOnFailureProcessor + 1, ingestDocument, exception, handler); + }); } private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) { diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index e0a054df2db5a..63dd14f7b4ab8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -66,21 +67,28 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { if (evaluate(ingestDocument)) { - long startTimeInNanos = relativeTimeProvider.getAsLong(); - try { - metric.preIngest(); - return processor.execute(ingestDocument); - } catch (Exception e) { - metric.ingestFailed(); - throw e; - } finally { + final long startTimeInNanos = relativeTimeProvider.getAsLong(); + metric.preIngest(); + processor.execute(ingestDocument, (result, e) -> { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); metric.postIngest(ingestTimeInMillis); - } + if (e != null) { + metric.ingestFailed(); + handler.accept(null, e); + } else { + handler.accept(result, null); + } + }); + } else { + handler.accept(ingestDocument, null); } - return ingestDocument; + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); } boolean evaluate(IngestDocument ingestDocument) { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 90ebc8e074108..34299cb475a27 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.BiConsumer; /** * Represents a single document being captured before indexing and holds the source and metadata (like id, type and index). @@ -641,17 +642,18 @@ private static Object deepCopy(Object value) { /** * Executes the given pipeline with for this document unless the pipeline has already been executed * for this document. - * @param pipeline Pipeline to execute - * @throws Exception On exception in pipeline execution + * + * @param pipeline the pipeline to execute + * @param handler handles the result or failure */ - public IngestDocument executePipeline(Pipeline pipeline) throws Exception { - try { - if (this.executedPipelines.add(pipeline) == false) { - throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()); - } - return pipeline.execute(this); - } finally { - executedPipelines.remove(pipeline); + public void executePipeline(Pipeline pipeline, BiConsumer handler) { + if (executedPipelines.add(pipeline)) { + pipeline.execute(this, (result, e) -> { + executedPipelines.remove(pipeline); + handler.accept(result, e); + }); + } else { + handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId())); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 07a92767dbdc6..d5d8b860d1b66 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -62,8 +62,10 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.IntConsumer; /** * Holder class for several ingest related services. @@ -327,42 +329,72 @@ void validatePipeline(Map ingestInfos, PutPipelineReq ExceptionsHelper.rethrowAndSuppress(exceptions); } - public void executeBulkRequest(Iterable> actionRequests, - BiConsumer itemFailureHandler, Consumer completionHandler, - Consumer itemDroppedHandler) { + public void executeBulkRequest(int numberOfActionRequests, + Iterable> actionRequests, + BiConsumer itemFailureHandler, + BiConsumer completionHandler, + IntConsumer itemDroppedHandler) { threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - completionHandler.accept(e); + completionHandler.accept(null, e); } @Override protected void doRun() { + final Thread originalThread = Thread.currentThread(); + final AtomicInteger counter = new AtomicInteger(numberOfActionRequests); + int i = 0; for (DocWriteRequest actionRequest : actionRequests) { IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); if (indexRequest == null) { + if (counter.decrementAndGet() == 0){ + completionHandler.accept(originalThread, null); + } + assert counter.get() >= 0; continue; } String pipelineId = indexRequest.getPipeline(); - if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) { - try { - PipelineHolder holder = pipelines.get(pipelineId); - if (holder == null) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + if (NOOP_PIPELINE_NAME.equals(pipelineId)) { + if (counter.decrementAndGet() == 0){ + completionHandler.accept(originalThread, null); + } + assert counter.get() >= 0; + continue; + } + + final int slot = i; + try { + PipelineHolder holder = pipelines.get(pipelineId); + if (holder == null) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + } + Pipeline pipeline = holder.pipeline; + innerExecute(slot, indexRequest, pipeline, itemDroppedHandler, e -> { + if (e == null) { + // this shouldn't be needed here but we do it for consistency with index api + // which requires it to prevent double execution + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + } else { + itemFailureHandler.accept(slot, e); + } + + if (counter.decrementAndGet() == 0){ + completionHandler.accept(originalThread, null); } - Pipeline pipeline = holder.pipeline; - innerExecute(indexRequest, pipeline, itemDroppedHandler); - //this shouldn't be needed here but we do it for consistency with index api - // which requires it to prevent double execution - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - } catch (Exception e) { - itemFailureHandler.accept(indexRequest, e); + assert counter.get() >= 0; + }); + } catch (Exception e) { + itemFailureHandler.accept(slot, e); + if (counter.decrementAndGet() == 0){ + completionHandler.accept(originalThread, null); } + assert counter.get() >= 0; } + i++; } - completionHandler.accept(null); } }); } @@ -407,26 +439,34 @@ static String getProcessorName(Processor processor){ return sb.toString(); } - private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer itemDroppedHandler) throws Exception { + private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline, IntConsumer itemDroppedHandler, + Consumer handler) { if (pipeline.getProcessors().isEmpty()) { + handler.accept(null); return; } long startTimeInNanos = System.nanoTime(); // the pipeline specific stat holder may not exist and that is fine: // (e.g. the pipeline may have been removed while we're ingesting a document - try { - totalMetrics.preIngest(); - String index = indexRequest.index(); - String type = indexRequest.type(); - String id = indexRequest.id(); - String routing = indexRequest.routing(); - Long version = indexRequest.version(); - VersionType versionType = indexRequest.versionType(); - Map sourceAsMap = indexRequest.sourceAsMap(); - IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); - if (pipeline.execute(ingestDocument) == null) { - itemDroppedHandler.accept(indexRequest); + totalMetrics.preIngest(); + String index = indexRequest.index(); + String type = indexRequest.type(); + String id = indexRequest.id(); + String routing = indexRequest.routing(); + Long version = indexRequest.version(); + VersionType versionType = indexRequest.versionType(); + Map sourceAsMap = indexRequest.sourceAsMap(); + IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); + pipeline.execute(ingestDocument, (result, e) -> { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); + totalMetrics.postIngest(ingestTimeInMillis); + if (e != null) { + totalMetrics.ingestFailed(); + handler.accept(e); + } else if (result == null) { + itemDroppedHandler.accept(slot); + handler.accept(null); } else { Map metadataMap = ingestDocument.extractMetadata(); //it's fine to set all metadata fields all the time, as ingest document holds their starting values @@ -440,14 +480,9 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); } indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType()); + handler.accept(null); } - } catch (Exception e) { - totalMetrics.ingestFailed(); - throw e; - } finally { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); - totalMetrics.postIngest(ingestTimeInMillis); - } + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 218713383227e..3d41d991f3e10 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.LongSupplier; import org.elasticsearch.script.ScriptService; @@ -93,18 +94,17 @@ public static Pipeline create(String id, Map config, * If null is returned then this document will be dropped and not indexed, otherwise * this document will be kept and indexed. */ - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - long startTimeInNanos = relativeTimeProvider.getAsLong(); - try { - metrics.preIngest(); - return compoundProcessor.execute(ingestDocument); - } catch (Exception e) { - metrics.ingestFailed(); - throw e; - } finally { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + final long startTimeInNanos = relativeTimeProvider.getAsLong(); + metrics.preIngest(); + compoundProcessor.execute(ingestDocument, (result, e) -> { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); metrics.postIngest(ingestTimeInMillis); - } + if (e != null) { + metrics.ingestFailed(); + } + handler.accept(result, e); + }); } /** diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index b5794a3f76819..f5e37a1c1235e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest; import java.util.Map; +import java.util.function.BiConsumer; public class PipelineProcessor extends AbstractProcessor { @@ -36,12 +37,19 @@ private PipelineProcessor(String tag, String pipelineName, IngestService ingestS } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { Pipeline pipeline = ingestService.getPipeline(pipelineName); - if (pipeline == null) { - throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']'); + if (pipeline != null) { + ingestDocument.executePipeline(pipeline, handler); + } else { + handler.accept(null, + new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']')); } - return ingestDocument.executePipeline(pipeline); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); } Pipeline getPipeline(){ diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 10bd530e3c1e7..029e80234e9e8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -27,6 +27,7 @@ import org.elasticsearch.threadpool.Scheduler; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.LongSupplier; @@ -38,6 +39,21 @@ */ public interface Processor { + /** + * Introspect and potentially modify the incoming data. + * + * Expert method: only override this method if a processor implementation needs to make an asynchronous call, + * otherwise just overwrite {@link #execute(IngestDocument)}. + */ + default void execute(IngestDocument ingestDocument, BiConsumer handler) { + try { + IngestDocument result = execute(ingestDocument); + handler.accept(result, null); + } catch (Exception e) { + handler.accept(null, e); + } + } + /** * Introspect and potentially modify the incoming data. * diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 39ae9edcb2eee..e9d4ea6b2ad4b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.BiConsumer; /** * Processor to be used within Simulate API to keep track of processors executed in pipeline. @@ -41,56 +42,76 @@ public final class TrackingResultProcessor implements Processor { } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - Processor processor = actualProcessor; - try { - if (processor instanceof ConditionalProcessor) { - ConditionalProcessor conditionalProcessor = (ConditionalProcessor) processor; - if (conditionalProcessor.evaluate(ingestDocument) == false) { - return ingestDocument; - } - if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) { - processor = conditionalProcessor.getInnerProcessor(); - } - } - if (processor instanceof PipelineProcessor) { - PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); - Pipeline pipeline = pipelineProcessor.getPipeline(); - //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines - try { - IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); - ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline()); - } catch (ElasticsearchException elasticsearchException) { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + if (actualProcessor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor); + Pipeline pipeline = pipelineProcessor.getPipeline(); + //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines + IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); + ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(), (result, e) -> { + // do nothing, let the tracking processors throw the exception while recording the path up to the failure + if (e instanceof ElasticsearchException) { + ElasticsearchException elasticsearchException = (ElasticsearchException) e; + //else do nothing, let the tracking processors throw the exception while recording the path up to the failure if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) { - throw elasticsearchException; + if (ignoreFailure) { + processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), + new IngestDocument(ingestDocument), e)); + } else { + processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), e)); + } + handler.accept(null, elasticsearchException); } - //else do nothing, let the tracking processors throw the exception while recording the path up to the failure - } catch (Exception e) { - // do nothing, let the tracking processors throw the exception while recording the path up to the failure + } else { + //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline, handler); } - //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); - Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), - verbosePipelineProcessor); - ingestDocument.executePipeline(verbosePipeline); + }); + return; + } + + final Processor processor; + if (actualProcessor instanceof ConditionalProcessor) { + ConditionalProcessor conditionalProcessor = (ConditionalProcessor) actualProcessor; + if (conditionalProcessor.evaluate(ingestDocument) == false) { + handler.accept(ingestDocument, null); + return; + } + if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) { + processor = conditionalProcessor.getInnerProcessor(); + } else { + processor = actualProcessor; + } + } else { + processor = actualProcessor; + } + + processor.execute(ingestDocument, (result, e) -> { + if (e != null) { + if (ignoreFailure) { + processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); + } else { + processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); + } + handler.accept(null, e); } else { - IngestDocument result = processor.execute(ingestDocument); if (result != null) { processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument))); + handler.accept(result, null); } else { processorResultList.add(new SimulateProcessorResult(processor.getTag())); - return null; + handler.accept(null, null); } } - } catch (Exception e) { - if (ignoreFailure) { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); - } else { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); - } - throw e; - } - return ingestDocument; + }); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException(); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java index 82ed518256169..5a168264a740f 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java @@ -55,7 +55,7 @@ public void testBulkRequestModifier() { while (bulkRequestModifier.hasNext()) { bulkRequestModifier.next(); if (randomBoolean()) { - bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException()); + bulkRequestModifier.markItemAsFailed(i, new RuntimeException()); failedSlots.add(i); } i++; @@ -93,7 +93,7 @@ public void testPipelineFailures() { for (int i = 0; modifier.hasNext(); i++) { modifier.next(); if (i % 2 == 0) { - modifier.markCurrentItemAsFailed(new RuntimeException()); + modifier.markItemAsFailed(i, new RuntimeException()); } } @@ -102,7 +102,7 @@ public void testPipelineFailures() { assertThat(bulkRequest.requests().size(), Matchers.equalTo(16)); List responses = new ArrayList<>(); - ActionListener bulkResponseListener = modifier.wrapActionListenerIfNeeded(1L, new ActionListener() { + ActionListener bulkResponseListener = modifier.wrapActionListenerIfNeeded(1L, new ActionListener<>() { @Override public void onResponse(BulkResponse bulkItemResponses) { responses.addAll(Arrays.asList(bulkItemResponses.getItems())); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 1822ed75d6091..2690b5200d19a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -68,11 +68,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -94,6 +94,8 @@ public class TransportBulkActionIngestTests extends ESTestCase { private static final Settings SETTINGS = Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build(); + private static final Thread DUMMY_WRITE_THREAD = new Thread(ThreadPool.Names.WRITE); + /** Services needed by bulk action */ TransportService transportService; ClusterService clusterService; @@ -102,9 +104,9 @@ public class TransportBulkActionIngestTests extends ESTestCase { /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */ @Captor - ArgumentCaptor> failureHandler; + ArgumentCaptor> failureHandler; @Captor - ArgumentCaptor> completionHandler; + ArgumentCaptor> completionHandler; @Captor ArgumentCaptor> remoteResponseHandler; @Captor @@ -266,15 +268,16 @@ public void testIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); - completionHandler.getValue().accept(exception); + verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), + failureHandler.capture(), completionHandler.capture(), any()); + completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); // now check success Iterator> req = bulkDocsItr.getValue().iterator(); - failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request + failureHandler.getValue().accept(0, exception); // have an exception for our one index request indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); @@ -300,13 +303,14 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); - completionHandler.getValue().accept(exception); + verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), + completionHandler.capture(), any()); + completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); @@ -332,7 +336,7 @@ public void testIngestForward() throws Exception { ActionTestUtils.execute(action, null, bulkRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -376,7 +380,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -460,18 +464,19 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), + failureHandler.capture(), completionHandler.capture(), any()); assertEquals(indexRequest1.getPipeline(), "default_pipeline"); assertEquals(indexRequest2.getPipeline(), "default_pipeline"); assertEquals(indexRequest3.getPipeline(), "default_pipeline"); - completionHandler.getValue().accept(exception); + completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); // now check success of the transport bulk action indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); @@ -498,14 +503,15 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception { assertFalse(action.indexCreated); // no index yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); - completionHandler.getValue().accept(exception); + verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), + completionHandler.capture(), any()); + completionHandler.getValue().accept(null, exception); assertFalse(action.indexCreated); // still no index yet, the ingest node failed. assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path. assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one @@ -562,7 +568,8 @@ public void testFindDefaultPipelineFromTemplateMatch(){ })); assertEquals("pipeline2", indexRequest.getPipeline()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), + completionHandler.capture(), any()); } private void validateDefaultPipeline(IndexRequest indexRequest) { @@ -584,14 +591,15 @@ private void validateDefaultPipeline(IndexRequest indexRequest) { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), + completionHandler.capture(), any()); assertEquals(indexRequest.getPipeline(), "default_pipeline"); - completionHandler.getValue().accept(exception); + completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); diff --git a/server/src/test/java/org/elasticsearch/action/ingest/AsyncIngestProcessorIT.java b/server/src/test/java/org/elasticsearch/action/ingest/AsyncIngestProcessorIT.java new file mode 100644 index 0000000000000..fe753675e29c9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/ingest/AsyncIngestProcessorIT.java @@ -0,0 +1,159 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.ingest; + +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import static org.hamcrest.Matchers.equalTo; + +/** + * The purpose of this test is to verify that when a processor executes an operation asynchronously that + * the expected result is the same as if the same operation happens synchronously. + * + * In this test two test processor are defined that basically do the same operation, but a single processor + * executes asynchronously. The result of the operation should be the same and also the order in which the + * bulk responses are returned should be the same as how the corresponding index requests were defined. + */ +public class AsyncIngestProcessorIT extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return List.of(TestPlugin.class); + } + + public void testAsyncProcessorImplementation() { + // A pipeline with 2 processors: the test async processor and sync test processor. + BytesReference pipelineBody = new BytesArray("{\"processors\": [{\"test-async\": {}, \"test\": {}}]}"); + client().admin().cluster().putPipeline(new PutPipelineRequest("_id", pipelineBody, XContentType.JSON)).actionGet(); + + BulkRequest bulkRequest = new BulkRequest(); + int numDocs = randomIntBetween(8, 256); + for (int i = 0; i < numDocs; i++) { + bulkRequest.add(new IndexRequest("foobar") + .id(Integer.toString(i)) + .source("{}", XContentType.JSON) + .setPipeline("_id") + ); + } + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat(bulkResponse.getItems().length, equalTo(numDocs)); + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + assertThat(bulkResponse.getItems()[i].getId(), equalTo(id)); + GetResponse getResponse = client().get(new GetRequest("foobar", id)).actionGet(); + // The expected result of async test processor: + assertThat(getResponse.getSource().get("foo"), equalTo("bar-" + id)); + // The expected result of sync test processor: + assertThat(getResponse.getSource().get("bar"), equalTo("baz-" + id)); + } + } + + public static class TestPlugin extends Plugin implements IngestPlugin { + + private ThreadPool threadPool; + + @Override + public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, ScriptService scriptService, + NamedXContentRegistry xContentRegistry, Environment environment, + NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) { + this.threadPool = threadPool; + return List.of(); + } + + @Override + public Map getProcessors(Processor.Parameters parameters) { + return Map.of( + "test-async", (factories, tag, config) -> { + return new AbstractProcessor(tag) { + + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + threadPool.generic().execute(() -> { + String id = (String) ingestDocument.getSourceAndMetadata().get("_id"); + if (usually()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + // ignore + } + } + ingestDocument.setFieldValue("foo", "bar-" + id); + handler.accept(ingestDocument, null); + }); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public String getType() { + return "test-async"; + } + }; + }, + "test", (processorFactories, tag, config) -> { + return new AbstractProcessor(tag) { + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + String id = (String) ingestDocument.getSourceAndMetadata().get("_id"); + ingestDocument.setFieldValue("bar", "baz-" + id); + return ingestDocument; + } + + @Override + public String getType() { + return "test"; + } + }; + } + ); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index 5e44e196fe8d4..0ce8251c3a763 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -34,6 +34,8 @@ import java.util.Collections; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; @@ -66,7 +68,15 @@ public void destroy() { public void testExecuteVerboseItem() throws Exception { TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {}); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.getAcquire(); assertThat(processor.getInvokedCounter(), equalTo(2)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; @@ -91,7 +101,14 @@ public void testExecuteVerboseItem() throws Exception { public void testExecuteItem() throws Exception { TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {}); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.getAcquire(); assertThat(processor.getInvokedCounter(), equalTo(2)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse; @@ -105,7 +122,14 @@ public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception { ingestDocument -> { throw new RuntimeException("processor failed"); }); TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {}); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1)); assertThat(processor3.getInvokedCounter(), equalTo(0)); @@ -133,7 +157,14 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception { Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1), Collections.singletonList(processor2)), processor3)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); @@ -168,7 +199,14 @@ public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw exception; }); CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; @@ -185,7 +223,14 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { }); CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; @@ -201,7 +246,14 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws public void testExecuteItemWithFailure() throws Exception { TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); }); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse; @@ -212,12 +264,19 @@ public void testExecuteItemWithFailure() throws Exception { assertThat(exception.getMessage(), equalTo("java.lang.IllegalArgumentException: java.lang.RuntimeException: processor failed")); } - public void testDropDocument() { + public void testDropDocument() throws Exception { TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value")); Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, Map.of()); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse; @@ -225,12 +284,19 @@ public void testDropDocument() { assertThat(simulateDocumentBaseResult.getFailure(), nullValue()); } - public void testDropDocumentVerbose() { + public void testDropDocumentVerbose() throws Exception { TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value")); Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, Map.of()); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) actualItemResponse; @@ -241,13 +307,20 @@ public void testDropDocumentVerbose() { assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue()); } - public void testDropDocumentVerboseExtraProcessor() { + public void testDropDocumentVerboseExtraProcessor() throws Exception { TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field1", "value")); Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, Map.of()); TestProcessor processor3 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value")); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor3.getInvokedCounter(), equalTo(0)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index 24e3dcd76774b..e44f1fb4700ff 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -33,6 +33,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -51,7 +52,7 @@ public void testEmpty() throws Exception { CompoundProcessor processor = new CompoundProcessor(); assertThat(processor.getProcessors().isEmpty(), is(true)); assertThat(processor.getOnFailureProcessors().isEmpty(), is(true)); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); } public void testSingleProcessor() throws Exception { @@ -66,7 +67,7 @@ public void testSingleProcessor() throws Exception { assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor.getInvokedCounter(), equalTo(1)); assertStats(compoundProcessor, 1, 0, 1); @@ -81,12 +82,9 @@ public void testSingleProcessorWithException() throws Exception { assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); - try { - compoundProcessor.execute(ingestDocument); - fail("should throw exception"); - } catch (ElasticsearchException e) { - assertThat(e.getRootCause().getMessage(), equalTo("error")); - } + Exception[] holder = new Exception[1]; + compoundProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); + assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo("error")); assertThat(processor.getInvokedCounter(), equalTo(1)); assertStats(compoundProcessor, 1, 1, 0); @@ -99,7 +97,7 @@ public void testIgnoreFailure() throws Exception { when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor compoundProcessor = new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertStats(0, compoundProcessor, 0, 1, 1, 0); assertThat(processor2.getInvokedCounter(), equalTo(1)); @@ -121,7 +119,7 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception { when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), Collections.singletonList(processor2), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor1.getInvokedCounter(), equalTo(1)); @@ -153,7 +151,9 @@ public String getTag() { when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), Collections.singletonList(processor2), relativeTimeProvider); - assertNull(compoundProcessor.execute(ingestDocument)); + IngestDocument[] result = new IngestDocument[1]; + compoundProcessor.execute(ingestDocument, (r, e) -> result[0] = r); + assertThat(result[0], nullValue()); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertStats(compoundProcessor, 1, 1, 0); } @@ -181,7 +181,7 @@ public void testSingleProcessorWithNestedFailures() throws Exception { Collections.singletonList(lastProcessor), relativeTimeProvider); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(processorToFail.getInvokedCounter(), equalTo(1)); assertThat(lastProcessor.getInvokedCounter(), equalTo(1)); @@ -204,7 +204,7 @@ public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exceptio CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), Collections.singletonList(secondProcessor), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); @@ -230,7 +230,7 @@ public void testCompoundProcessorExceptionFail() throws Exception { CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), Collections.singletonList(secondProcessor), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); @@ -256,7 +256,7 @@ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), Collections.singletonList(secondProcessor), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); @@ -271,7 +271,7 @@ public void testBreakOnFailure() throws Exception { when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor), Collections.singletonList(onFailureProcessor), relativeTimeProvider); - pipeline.execute(ingestDocument); + pipeline.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(0)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index f484957d897f1..204a0bddbc339 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -98,7 +98,7 @@ public String getTag() { String falseValue = "falsy"; IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, falseValue); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); assertStats(processor, 0, 0, 0); @@ -106,13 +106,13 @@ public String getTag() { ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, falseValue); ingestDocument.setFieldValue("error", true); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertStats(processor, 0, 0, 0); //true, always call processor and increments metrics ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, trueValue); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue)); assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar")); assertStats(processor, 1, 0, 1); @@ -121,7 +121,9 @@ public String getTag() { ingestDocument.setFieldValue(conditionalField, trueValue); ingestDocument.setFieldValue("error", true); IngestDocument finalIngestDocument = ingestDocument; - expectThrows(RuntimeException.class, () -> processor.execute(finalIngestDocument)); + Exception holder[] = new Exception[1]; + processor.execute(finalIngestDocument, (result, e) -> {holder[0] = e;}); + assertThat(holder[0], instanceOf(RuntimeException.class)); assertStats(processor, 2, 1, 2); } @@ -177,7 +179,7 @@ public String getTag() { }, relativeTimeProvider); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertWarnings("[types removal] Looking up doc types [_type] in scripts is deprecated."); } @@ -213,7 +215,7 @@ private static void assertMutatingCtxThrows(Consumer> mutati ); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue("listField", new ArrayList<>()); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); Exception e = expectedException.get(); assertThat(e, instanceOf(UnsupportedOperationException.class)); assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage()); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 85ae970a7882f..e0ac0cf416c90 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -71,7 +71,7 @@ import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; -import java.util.function.Consumer; +import java.util.function.IntConsumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -85,6 +85,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; @@ -134,20 +135,20 @@ public void testExecuteIndexPipelineDoesNotExist() { final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final SetOnce failure = new SetOnce<>(); - final BiConsumer failureHandler = (request, e) -> { + final BiConsumer failureHandler = (slot, e) -> { failure.set(true); - assertThat(request, sameInstance(indexRequest)); + assertThat(slot, equalTo(0)); assertThat(e, instanceOf(IllegalArgumentException.class)); assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist")); }; @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); + final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); assertTrue(failure.get()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testUpdatePipelines() { @@ -621,7 +622,7 @@ public String getType() { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final SetOnce failure = new SetOnce<>(); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id); - final BiConsumer failureHandler = (request, e) -> { + final BiConsumer failureHandler = (slot, e) -> { assertThat(e.getCause(), instanceOf(IllegalArgumentException.class)); assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class)); assertThat(e.getCause().getCause().getMessage(), equalTo("error")); @@ -629,17 +630,17 @@ public String getType() { }; @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); + final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); assertTrue(failure.get()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteBulkPipelineDoesNotExist() { IngestService ingestService = createWithProcessors(Collections.singletonMap( - "mock", (factories, tag, config) -> mock(CompoundProcessor.class))); + "mock", (factories, tag, config) -> mockCompoundProcessor())); PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); @@ -656,15 +657,16 @@ public void testExecuteBulkPipelineDoesNotExist() { new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist"); bulkRequest.add(indexRequest2); @SuppressWarnings("unchecked") - BiConsumer failureHandler = mock(BiConsumer.class); + BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler, + completionHandler, indexReq -> {}); verify(failureHandler, times(1)).accept( - argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { + argThat(new CustomTypeSafeMatcher<>("failure handler was not called with the expected arguments") { @Override - protected boolean matchesSafely(IndexRequest item) { - return item == indexRequest2; + protected boolean matchesSafely(Integer item) { + return item == 1; } }), @@ -675,12 +677,12 @@ protected boolean matchesSafely(IllegalArgumentException iae) { } }) ); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteSuccess() { IngestService ingestService = createWithProcessors(Collections.singletonMap( - "mock", (factories, tag, config) -> mock(CompoundProcessor.class))); + "mock", (factories, tag, config) -> mockCompoundProcessor())); PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty @@ -689,12 +691,12 @@ public void testExecuteSuccess() { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteEmptyPipeline() throws Exception { @@ -707,16 +709,16 @@ public void testExecuteEmptyPipeline() throws Exception { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecutePropagateAllMetaDataUpdates() throws Exception { - final CompoundProcessor processor = mock(CompoundProcessor.class); + final CompoundProcessor processor = mockCompoundProcessor(); IngestService ingestService = createWithProcessors(Collections.singletonMap( "mock", (factories, tag, config) -> processor)); PutPipelineRequest putRequest = new PutPipelineRequest("_id", @@ -738,17 +740,21 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName()); } } - return ingestDocument; - }).when(processor).execute(any()); + + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[1]; + handler.accept(ingestDocument, null); + return null; + }).when(processor).execute(any(), any()); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(processor).execute(any()); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + verify(processor).execute(any(), any()); verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); assertThat(indexRequest.index(), equalTo("update_index")); assertThat(indexRequest.type(), equalTo("update_type")); assertThat(indexRequest.id(), equalTo("update_id")); @@ -758,7 +764,7 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { } public void testExecuteFailure() throws Exception { - final CompoundProcessor processor = mock(CompoundProcessor.class); + final CompoundProcessor processor = mockCompoundProcessor(); IngestService ingestService = createWithProcessors(Collections.singletonMap( "mock", (factories, tag, config) -> processor)); PutPipelineRequest putRequest = new PutPipelineRequest("_id", @@ -770,22 +776,37 @@ public void testExecuteFailure() throws Exception { final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()) .when(processor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); - verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); - verify(completionHandler, times(1)).accept(null); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); + verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteSuccessWithOnFailure() throws Exception { final Processor processor = mock(Processor.class); when(processor.getType()).thenReturn("mock_processor_type"); when(processor.getTag()).thenReturn("mock_processor_tag"); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(null, new RuntimeException()); + return null; + }).when(processor).execute(eqIndexTypeId(emptyMap()), any()); + final Processor onFailureProcessor = mock(Processor.class); + doAnswer(args -> { + IngestDocument ingestDocument = (IngestDocument) args.getArguments()[0]; + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(ingestDocument, null); + return null; + }).when(onFailureProcessor).execute(eqIndexTypeId(emptyMap()), any()); + final CompoundProcessor compoundProcessor = new CompoundProcessor( false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); IngestService ingestService = createWithProcessors(Collections.singletonMap( @@ -797,14 +818,13 @@ public void testExecuteSuccessWithOnFailure() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(emptyMap())); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class)); - verify(completionHandler, times(1)).accept(null); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + verify(failureHandler, never()).accept(eq(0), any(ElasticsearchException.class)); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteFailureWithNestedOnFailure() throws Exception { @@ -828,21 +848,21 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()) .when(onFailureOnFailureProcessor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); doThrow(new RuntimeException()) .when(onFailureProcessor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); doThrow(new RuntimeException()) .when(processor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); - verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); - verify(completionHandler, times(1)).accept(null); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); + verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testBulkRequestExecutionWithFailures() throws Exception { @@ -871,7 +891,12 @@ public void testBulkRequestExecutionWithFailures() throws Exception { CompoundProcessor processor = mock(CompoundProcessor.class); when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); Exception error = new RuntimeException(); - doThrow(error).when(processor).execute(any()); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(null, error); + return null; + }).when(processor).execute(any(), any()); IngestService ingestService = createWithProcessors(Collections.singletonMap( "mock", (factories, tag, config) -> processor)); PutPipelineRequest putRequest = new PutPipelineRequest("_id", @@ -882,18 +907,18 @@ public void testBulkRequestExecutionWithFailures() throws Exception { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @SuppressWarnings("unchecked") - BiConsumer requestItemErrorHandler = mock(BiConsumer.class); + BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); - verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), argThat(new ArgumentMatcher() { + verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(new ArgumentMatcher() { @Override public boolean matches(final Object o) { return ((Exception)o).getCause().getCause().equals(error); } })); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testBulkRequestExecution() throws Exception { @@ -916,7 +941,12 @@ public void testBulkRequestExecution() throws Exception { final Processor processor = mock(Processor.class); when(processor.getType()).thenReturn("mock"); when(processor.getTag()).thenReturn("mockTag"); - when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random())); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null); + return null; + }).when(processor).execute(any(), any()); Map map = new HashMap<>(2); map.put("mock", (factories, tag, config) -> processor); @@ -929,13 +959,13 @@ public void testBulkRequestExecution() throws Exception { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @SuppressWarnings("unchecked") - BiConsumer requestItemErrorHandler = mock(BiConsumer.class); + BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); verify(requestItemErrorHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); for (DocWriteRequest docWriteRequest : bulkRequest.requests()) { IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(docWriteRequest); assertThat(indexRequest, notNullValue()); @@ -950,8 +980,18 @@ public void testStats() throws Exception { when(processor.getTag()).thenReturn("mockTag"); when(processorFailure.getType()).thenReturn("failure-mock"); //avoid returning null and dropping the document - when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random())); - when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error")); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null); + return null; + }).when(processor).execute(any(IngestDocument.class), any()); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(null, new RuntimeException("error")); + return null; + }).when(processorFailure).execute(any(IngestDocument.class), any()); Map map = new HashMap<>(2); map.put("mock", (factories, tag, config) -> processor); map.put("failure-mock", (factories, tag, config) -> processorFailure); @@ -973,13 +1013,13 @@ public void testStats() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); + @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1"); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2)); @@ -997,7 +1037,7 @@ public void testStats() throws Exception { indexRequest.setPipeline("_id2"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterSecondRequestStats = ingestService.stats(); assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2)); //total @@ -1016,7 +1056,7 @@ public void testStats() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterThirdRequestStats = ingestService.stats(); assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2)); //total @@ -1040,7 +1080,7 @@ public void testStats() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterForthRequestStats = ingestService.stats(); assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2)); //total @@ -1106,19 +1146,19 @@ public String getTag() { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); + final BiConsumer completionHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer dropHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler); + final IntConsumer dropHandler = mock(IntConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler); verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); - verify(dropHandler, times(1)).accept(indexRequest); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(dropHandler, times(1)).accept(0); } private IngestDocument eqIndexTypeId(final Map source) { - return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source)); + return argThat(new IngestDocumentMatcher("_index", "_type", "_id", -3L, VersionType.INTERNAL, source)); } private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map source) { @@ -1154,6 +1194,17 @@ public Map getProcessors(final Processor.Parameters p }), client); } + private CompoundProcessor mockCompoundProcessor() { + CompoundProcessor processor = mock(CompoundProcessor.class); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept((IngestDocument) args.getArguments()[0], null); + return null; + }).when(processor).execute(any(), any()); + return processor; + } + private class IngestDocumentMatcher extends ArgumentMatcher { private final IngestDocument ingestDocument; diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 0ad88c05ccc6e..4f36727c7ac30 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -64,7 +64,7 @@ public String getTag() { PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); Map config = new HashMap<>(); config.put("name", pipelineId); - factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument); + factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument, (result, e) -> {}); assertEquals(testIngestDocument, invoked.get()); } @@ -74,12 +74,11 @@ public void testThrowsOnMissingPipeline() throws Exception { PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); Map config = new HashMap<>(); config.put("name", "missingPipelineId"); - IllegalStateException e = expectThrows( - IllegalStateException.class, - () -> factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument) - ); + IllegalStateException[] e = new IllegalStateException[1]; + factory.create(Collections.emptyMap(), null, config) + .execute(testIngestDocument, (result, e1) -> e[0] = (IllegalStateException) e1); assertEquals( - "Pipeline processor configured for non-existent pipeline [missingPipelineId]", e.getMessage() + "Pipeline processor configured for non-existent pipeline [missingPipelineId]", e[0].getMessage() ); } @@ -104,12 +103,11 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception { when(ingestService.getPipeline(outerPipelineId)).thenReturn(outer); when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); outerConfig.put("name", innerPipelineId); - ElasticsearchException e = expectThrows( - ElasticsearchException.class, - () -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument) - ); + ElasticsearchException[] e = new ElasticsearchException[1]; + factory.create(Collections.emptyMap(), null, outerConfig) + .execute(testIngestDocument, (result, e1) -> e[0] = (ElasticsearchException) e1); assertEquals( - "Cycle detected for pipeline: inner", e.getRootCause().getMessage() + "Cycle detected for pipeline: inner", e[0].getRootCause().getMessage() ); } @@ -125,8 +123,8 @@ innerPipelineId, null, null, new CompoundProcessor() ); when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); Processor outerProc = factory.create(Collections.emptyMap(), null, outerConfig); - outerProc.execute(testIngestDocument); - outerProc.execute(testIngestDocument); + outerProc.execute(testIngestDocument, (result, e) -> {}); + outerProc.execute(testIngestDocument, (result, e) -> {}); } public void testPipelineProcessorWithPipelineChain() throws Exception { @@ -177,7 +175,7 @@ pipeline3Id, null, null, new CompoundProcessor( IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); //start the chain - ingestDocument.executePipeline(pipeline1); + ingestDocument.executePipeline(pipeline1, (result, e) -> {}); assertNotNull(ingestDocument.getSourceAndMetadata().get(key1)); //check the stats diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index 2c047283ed1bb..7f68b33dd86bd 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -66,7 +66,7 @@ public void init() { public void testActualProcessor() throws Exception { TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -84,12 +84,9 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception { CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - try { - trackingProcessor.execute(ingestDocument); - fail("processor should throw exception"); - } catch (ElasticsearchException e) { - assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage())); - } + Exception[] holder = new Exception[1]; + trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); + assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage())); SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); @@ -109,7 +106,7 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception { Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor)); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); @@ -148,7 +145,7 @@ public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { Collections.emptyList()); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); @@ -178,7 +175,7 @@ public void testActualCompoundProcessorWithFalseConditional() throws Exception { new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })); CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument); //the step for key 2 is never executed due to conditional and thus not part of the result set @@ -221,7 +218,7 @@ pipelineId, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -287,7 +284,7 @@ pipelineId2, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -355,7 +352,7 @@ pipelineId2, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -407,7 +404,7 @@ pipelineId, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -457,7 +454,9 @@ public void testActualPipelineProcessorWithCycle() throws Exception { CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> trackingProcessor.execute(ingestDocument)); + Exception[] holder = new Exception[1]; + trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); + ElasticsearchException exception = (ElasticsearchException) holder[0]; assertThat(exception.getCause(), instanceOf(IllegalArgumentException.class)); assertThat(exception.getCause().getCause(), instanceOf(IllegalStateException.class)); assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1")); @@ -482,7 +481,7 @@ pipelineId, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); From 55b7a13ae6b2555eb99f6c6847564c400f8b0c45 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 25 Sep 2019 13:47:16 +0200 Subject: [PATCH 2/2] fixed test compile error --- .../test/java/org/elasticsearch/ingest/IngestServiceTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index ed91001f31a13..ec521e4761497 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -72,6 +72,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.IntConsumer; import java.util.function.LongSupplier; import java.util.stream.Collectors;