diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index ad93298c646e1..9eebc5acdb6e9 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -28,6 +28,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; + import org.elasticsearch.script.ScriptService; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; @@ -63,29 +66,46 @@ boolean isIgnoreMissing() { } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { List values = ingestDocument.getFieldValue(field, List.class, ignoreMissing); if (values == null) { if (ignoreMissing) { - return ingestDocument; + handler.accept(ingestDocument, null); + } else { + handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.")); } - throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."); + } else { + List newValues = new CopyOnWriteArrayList<>(); + innerExecute(0, values, newValues, ingestDocument, handler); } - List newValues = new ArrayList<>(values.size()); - IngestDocument document = ingestDocument; - for (Object value : values) { - Object previousValue = ingestDocument.getIngestMetadata().put("_value", value); - try { - document = processor.execute(document); - if (document == null) { - return null; - } - } finally { - newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue)); - } + } + + void innerExecute(int index, List values, List newValues, IngestDocument document, + BiConsumer handler) { + if (index == values.size()) { + document.setFieldValue(field, new ArrayList<>(newValues)); + handler.accept(document, null); + return; } - document.setFieldValue(field, newValues); - return document; + + Object value = values.get(index); + Object previousValue = document.getIngestMetadata().put("_value", value); + processor.execute(document, (result, e) -> { + if (e != null) { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + handler.accept(null, e); + } else if (result == null) { + handler.accept(null, null); + } else { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + innerExecute(index + 1, values, newValues, document, handler); + } + }); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); } @Override diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 282994d8eb354..a4ee786315c03 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -53,7 +53,7 @@ public void testExecute() throws Exception { "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"), false ); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); @@ -73,12 +73,9 @@ public void testExecuteWithFailure() throws Exception { } }); ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false); - try { - processor.execute(ingestDocument); - fail("exception expected"); - } catch (RuntimeException e) { - assertThat(e.getMessage(), equalTo("failure")); - } + Exception[] exceptions = new Exception[1]; + processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;}); + assertThat(exceptions[0].getMessage(), equalTo("failure")); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("a", "b", "c"))); @@ -95,7 +92,7 @@ public void testExecuteWithFailure() throws Exception { "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)), false ); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); assertThat(ingestDocument.getFieldValue("values", List.class), equalTo(Arrays.asList("A", "B", "c"))); } @@ -114,7 +111,7 @@ public void testMetaDataAvailable() throws Exception { id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id")); }); ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); assertThat(ingestDocument.getFieldValue("values.0.index", String.class), equalTo("_index")); @@ -142,7 +139,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { "_tag", "values", new SetProcessor("_tag", new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), (model) -> model.get("other")), false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value")); assertThat(ingestDocument.getFieldValue("values.1.new_field", String.class), equalTo("value")); @@ -180,7 +177,7 @@ public String getTag() { ); ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.size(), equalTo(numValues)); @@ -205,7 +202,7 @@ public void testModifyFieldsOutsideArray() throws Exception { Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")), Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added")))) ), false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.get(0), equalTo("STRING")); @@ -231,7 +228,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_source._value", String.class))); ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false); - forEachProcessor.execute(ingestDocument); + forEachProcessor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.get(0), equalTo("new_value")); @@ -264,7 +261,7 @@ public void testNestedForEach() throws Exception { ); ForEachProcessor processor = new ForEachProcessor( "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values1.0.values2", List.class); assertThat(result.get(0), equalTo("ABC")); @@ -282,7 +279,7 @@ public void testIgnoreMissing() throws Exception { IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); TestProcessor testProcessor = new TestProcessor(doc -> {}); ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertIngestDocument(originalIngestDocument, ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(0)); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 877d9ca0e4a24..34b5ffb40486d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -19,6 +19,8 @@ package org.elasticsearch.action.bulk; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SparseFixedBitSet; import org.elasticsearch.ElasticsearchParseException; @@ -80,6 +82,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -581,14 +584,13 @@ private long relativeTime() { } void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener listener) { - long ingestStartTimeInNanos = System.nanoTime(); - BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); - ingestService.executeBulkRequest(() -> bulkRequestModifier, - (indexRequest, exception) -> { - logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", - indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); - bulkRequestModifier.markCurrentItemAsFailed(exception); - }, (exception) -> { + final long ingestStartTimeInNanos = System.nanoTime(); + final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); + ingestService.executeBulkRequest( + original.numberOfActions(), + () -> bulkRequestModifier, + bulkRequestModifier::markItemAsFailed, + (originalThread, exception) -> { if (exception != null) { logger.error("failed to execute pipeline for a bulk request", exception); listener.onFailure(exception); @@ -603,26 +605,56 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen // (this will happen if pre-processing all items in the bulk failed) actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); } else { - doExecute(task, bulkRequest, actionListener); + // If a processor went async and returned a response on a different thread then + // before we continue the bulk request we should fork back on a write thread: + if (originalThread == Thread.currentThread()) { + assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE); + doExecute(task, bulkRequest, actionListener); + } else { + threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + doExecute(task, bulkRequest, actionListener); + } + + @Override + public boolean isForceExecution() { + // If we fork back to a write thread we **not** should fail, because tp queue is full. + // (Otherwise the work done during ingest will be lost) + // It is okay to force execution here. Throttling of write requests happens prior to + // ingest when a node receives a bulk request. + return true; + } + }); + } } } }, - indexRequest -> bulkRequestModifier.markCurrentItemAsDropped()); + bulkRequestModifier::markItemAsDropped + ); } static final class BulkRequestModifier implements Iterator> { + private static final Logger LOGGER = LogManager.getLogger(BulkRequestModifier.class); + final BulkRequest bulkRequest; final SparseFixedBitSet failedSlots; final List itemResponses; + final AtomicIntegerArray originalSlots; - int currentSlot = -1; - int[] originalSlots; + volatile int currentSlot = -1; BulkRequestModifier(BulkRequest bulkRequest) { this.bulkRequest = bulkRequest; this.failedSlots = new SparseFixedBitSet(bulkRequest.requests().size()); this.itemResponses = new ArrayList<>(bulkRequest.requests().size()); + this.originalSlots = new AtomicIntegerArray(bulkRequest.requests().size()); // oversize, but that's ok } @Override @@ -646,12 +678,11 @@ BulkRequest getBulkRequest() { int slot = 0; List> requests = bulkRequest.requests(); - originalSlots = new int[requests.size()]; // oversize, but that's ok for (int i = 0; i < requests.size(); i++) { DocWriteRequest request = requests.get(i); if (failedSlots.get(i) == false) { modifiedBulkRequest.add(request); - originalSlots[slot++] = i; + originalSlots.set(slot++, i); } } return modifiedBulkRequest; @@ -666,7 +697,7 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> { BulkItemResponse[] items = response.getItems(); for (int i = 0; i < items.length; i++) { - itemResponses.add(originalSlots[i], response.getItems()[i]); + itemResponses.add(originalSlots.get(i), response.getItems()[i]); } delegatedListener.onResponse( new BulkResponse( @@ -675,11 +706,11 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, } } - void markCurrentItemAsDropped() { - IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); - failedSlots.set(currentSlot); + synchronized void markItemAsDropped(int slot) { + IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); + failedSlots.set(slot); itemResponses.add( - new BulkItemResponse(currentSlot, indexRequest.opType(), + new BulkItemResponse(slot, indexRequest.opType(), new UpdateResponse( new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0), indexRequest.type(), indexRequest.id(), indexRequest.version(), DocWriteResponse.Result.NOOP @@ -688,16 +719,19 @@ void markCurrentItemAsDropped() { ); } - void markCurrentItemAsFailed(Exception e) { - IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); + synchronized void markItemAsFailed(int slot, Exception e) { + IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); + LOGGER.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", + indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), e); + // We hit a error during preprocessing a request, so we: // 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed // 2) Add a bulk item failure for this request // 3) Continue with the next request in the bulk. - failedSlots.set(currentSlot); + failedSlots.set(slot); BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e); - itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure)); + itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure)); } } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index e2b44ae2a7a60..8014219ad6d38 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -26,8 +26,10 @@ import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; @@ -41,40 +43,44 @@ class SimulateExecutionService { this.threadPool = threadPool; } - SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { + void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose, + BiConsumer handler) { if (verbose) { - List processorResultList = new ArrayList<>(); + List processorResultList = new CopyOnWriteArrayList<>(); CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); - try { - Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), - verbosePipelineProcessor); - ingestDocument.executePipeline(verbosePipeline); - return new SimulateDocumentVerboseResult(processorResultList); - } catch (Exception e) { - return new SimulateDocumentVerboseResult(processorResultList); - } + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline, (result, e) -> { + handler.accept(new SimulateDocumentVerboseResult(processorResultList), e); + }); } else { - try { - pipeline.execute(ingestDocument); - return new SimulateDocumentBaseResult(ingestDocument); - } catch (Exception e) { - return new SimulateDocumentBaseResult(e); - } + pipeline.execute(ingestDocument, (result, e) -> { + if (e == null) { + handler.accept(new SimulateDocumentBaseResult(ingestDocument), null); + } else { + handler.accept(new SimulateDocumentBaseResult(e), null); + } + }); } } public void execute(SimulatePipelineRequest.Parsed request, ActionListener listener) { - threadPool.executor(THREAD_POOL_NAME).execute(new ActionRunnable(listener) { + threadPool.executor(THREAD_POOL_NAME).execute(new ActionRunnable<>(listener) { @Override - protected void doRun() throws Exception { - List responses = new ArrayList<>(); + protected void doRun() { + final AtomicInteger counter = new AtomicInteger(); + final List responses = new CopyOnWriteArrayList<>(); for (IngestDocument ingestDocument : request.getDocuments()) { - SimulateDocumentResult response = executeDocument(request.getPipeline(), ingestDocument, request.isVerbose()); - if (response != null) { - responses.add(response); - } + executeDocument(request.getPipeline(), ingestDocument, request.isVerbose(), (response, e) -> { + if (response != null) { + responses.add(response); + } + if (counter.incrementAndGet() == request.getDocuments().size()) { + listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), + request.isVerbose(), responses)); + } + }); } - listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); } }); } diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index a095d7647d90f..cf75ead37354d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -114,58 +115,78 @@ public String getTag() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - for (Tuple processorWithMetric : processorsWithMetrics) { - Processor processor = processorWithMetric.v1(); - IngestMetric metric = processorWithMetric.v2(); - long startTimeInNanos = relativeTimeProvider.getAsLong(); - try { - metric.preIngest(); - if (processor.execute(ingestDocument) == null) { - return null; - } - } catch (Exception e) { + throw new UnsupportedOperationException("this method should not get executed"); + } + + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + innerExecute(0, ingestDocument, handler); + } + + void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsumer handler) { + if (currentProcessor == processorsWithMetrics.size()) { + handler.accept(ingestDocument, null); + return; + } + + Tuple processorWithMetric = processorsWithMetrics.get(currentProcessor); + final Processor processor = processorWithMetric.v1(); + final IngestMetric metric = processorWithMetric.v2(); + final long startTimeInNanos = relativeTimeProvider.getAsLong(); + metric.preIngest(); + processor.execute(ingestDocument, (result, e) -> { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); + metric.postIngest(ingestTimeInMillis); + + if (e != null) { metric.ingestFailed(); if (ignoreFailure) { - continue; - } - - ElasticsearchException compoundProcessorException = - newCompoundProcessorException(e, processor.getType(), processor.getTag()); - if (onFailureProcessors.isEmpty()) { - throw compoundProcessorException; + innerExecute(currentProcessor + 1, ingestDocument, handler); } else { - if (executeOnFailure(ingestDocument, compoundProcessorException) == false) { - return null; + ElasticsearchException compoundProcessorException = + newCompoundProcessorException(e, processor.getType(), processor.getTag()); + if (onFailureProcessors.isEmpty()) { + handler.accept(null, compoundProcessorException); + } else { + executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler); } - break; } - } finally { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); - metric.postIngest(ingestTimeInMillis); + } else { + if (result != null) { + innerExecute(currentProcessor + 1, result, handler); + } else { + handler.accept(null, null); + } } - } - return ingestDocument; + }); } - /** - * @return true if execution should continue, false if document is dropped. - */ - boolean executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { - try { + void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestDocument, ElasticsearchException exception, + BiConsumer handler) { + if (currentOnFailureProcessor == 0) { putFailureMetadata(ingestDocument, exception); - for (Processor processor : onFailureProcessors) { - try { - if (processor.execute(ingestDocument) == null) { - return false; - } - } catch (Exception e) { - throw newCompoundProcessorException(e, processor.getType(), processor.getTag()); - } - } - } finally { + } + + if (currentOnFailureProcessor == onFailureProcessors.size()) { removeFailureMetadata(ingestDocument); + handler.accept(ingestDocument, null); + return; } - return true; + + final Processor onFailureProcessor = onFailureProcessors.get(currentOnFailureProcessor); + onFailureProcessor.execute(ingestDocument, (result, e) -> { + if (e != null) { + removeFailureMetadata(ingestDocument); + handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag())); + return; + } + if (result == null) { + removeFailureMetadata(ingestDocument); + handler.accept(null, null); + return; + } + executeOnFailureAsync(currentOnFailureProcessor + 1, ingestDocument, exception, handler); + }); } private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) { diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index e5333485db52b..bd22d6ca4dbdb 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -66,21 +67,28 @@ public class ConditionalProcessor extends AbstractProcessor { } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { if (evaluate(ingestDocument)) { - long startTimeInNanos = relativeTimeProvider.getAsLong(); - try { - metric.preIngest(); - return processor.execute(ingestDocument); - } catch (Exception e) { - metric.ingestFailed(); - throw e; - } finally { + final long startTimeInNanos = relativeTimeProvider.getAsLong(); + metric.preIngest(); + processor.execute(ingestDocument, (result, e) -> { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); metric.postIngest(ingestTimeInMillis); - } + if (e != null) { + metric.ingestFailed(); + handler.accept(null, e); + } else { + handler.accept(result, null); + } + }); + } else { + handler.accept(ingestDocument, null); } - return ingestDocument; + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); } boolean evaluate(IngestDocument ingestDocument) { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 90ebc8e074108..34299cb475a27 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.BiConsumer; /** * Represents a single document being captured before indexing and holds the source and metadata (like id, type and index). @@ -641,17 +642,18 @@ private static Object deepCopy(Object value) { /** * Executes the given pipeline with for this document unless the pipeline has already been executed * for this document. - * @param pipeline Pipeline to execute - * @throws Exception On exception in pipeline execution + * + * @param pipeline the pipeline to execute + * @param handler handles the result or failure */ - public IngestDocument executePipeline(Pipeline pipeline) throws Exception { - try { - if (this.executedPipelines.add(pipeline) == false) { - throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()); - } - return pipeline.execute(this); - } finally { - executedPipelines.remove(pipeline); + public void executePipeline(Pipeline pipeline, BiConsumer handler) { + if (executedPipelines.add(pipeline)) { + pipeline.execute(this, (result, e) -> { + executedPipelines.remove(pipeline); + handler.accept(result, e); + }); + } else { + handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId())); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 20c09eccb7bd5..9a44b20bb4538 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -63,8 +63,10 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.IntConsumer; /** * Holder class for several ingest related services. @@ -327,42 +329,72 @@ void validatePipeline(Map ingestInfos, PutPipelineReq ExceptionsHelper.rethrowAndSuppress(exceptions); } - public void executeBulkRequest(Iterable> actionRequests, - BiConsumer itemFailureHandler, Consumer completionHandler, - Consumer itemDroppedHandler) { + public void executeBulkRequest(int numberOfActionRequests, + Iterable> actionRequests, + BiConsumer itemFailureHandler, + BiConsumer completionHandler, + IntConsumer itemDroppedHandler) { threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - completionHandler.accept(e); + completionHandler.accept(null, e); } @Override protected void doRun() { + final Thread originalThread = Thread.currentThread(); + final AtomicInteger counter = new AtomicInteger(numberOfActionRequests); + int i = 0; for (DocWriteRequest actionRequest : actionRequests) { IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); if (indexRequest == null) { + if (counter.decrementAndGet() == 0){ + completionHandler.accept(originalThread, null); + } + assert counter.get() >= 0; continue; } String pipelineId = indexRequest.getPipeline(); - if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) { - try { - PipelineHolder holder = pipelines.get(pipelineId); - if (holder == null) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + if (NOOP_PIPELINE_NAME.equals(pipelineId)) { + if (counter.decrementAndGet() == 0){ + completionHandler.accept(originalThread, null); + } + assert counter.get() >= 0; + continue; + } + + final int slot = i; + try { + PipelineHolder holder = pipelines.get(pipelineId); + if (holder == null) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + } + Pipeline pipeline = holder.pipeline; + innerExecute(slot, indexRequest, pipeline, itemDroppedHandler, e -> { + if (e == null) { + // this shouldn't be needed here but we do it for consistency with index api + // which requires it to prevent double execution + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + } else { + itemFailureHandler.accept(slot, e); + } + + if (counter.decrementAndGet() == 0){ + completionHandler.accept(originalThread, null); } - Pipeline pipeline = holder.pipeline; - innerExecute(indexRequest, pipeline, itemDroppedHandler); - //this shouldn't be needed here but we do it for consistency with index api - // which requires it to prevent double execution - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - } catch (Exception e) { - itemFailureHandler.accept(indexRequest, e); + assert counter.get() >= 0; + }); + } catch (Exception e) { + itemFailureHandler.accept(slot, e); + if (counter.decrementAndGet() == 0){ + completionHandler.accept(originalThread, null); } + assert counter.get() >= 0; } + i++; } - completionHandler.accept(null); } }); } @@ -418,26 +450,34 @@ static String getProcessorName(Processor processor){ return sb.toString(); } - private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer itemDroppedHandler) throws Exception { + private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline, IntConsumer itemDroppedHandler, + Consumer handler) { if (pipeline.getProcessors().isEmpty()) { + handler.accept(null); return; } long startTimeInNanos = System.nanoTime(); // the pipeline specific stat holder may not exist and that is fine: // (e.g. the pipeline may have been removed while we're ingesting a document - try { - totalMetrics.preIngest(); - String index = indexRequest.index(); - String type = indexRequest.type(); - String id = indexRequest.id(); - String routing = indexRequest.routing(); - Long version = indexRequest.version(); - VersionType versionType = indexRequest.versionType(); - Map sourceAsMap = indexRequest.sourceAsMap(); - IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); - if (pipeline.execute(ingestDocument) == null) { - itemDroppedHandler.accept(indexRequest); + totalMetrics.preIngest(); + String index = indexRequest.index(); + String type = indexRequest.type(); + String id = indexRequest.id(); + String routing = indexRequest.routing(); + Long version = indexRequest.version(); + VersionType versionType = indexRequest.versionType(); + Map sourceAsMap = indexRequest.sourceAsMap(); + IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); + pipeline.execute(ingestDocument, (result, e) -> { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); + totalMetrics.postIngest(ingestTimeInMillis); + if (e != null) { + totalMetrics.ingestFailed(); + handler.accept(e); + } else if (result == null) { + itemDroppedHandler.accept(slot); + handler.accept(null); } else { Map metadataMap = ingestDocument.extractMetadata(); //it's fine to set all metadata fields all the time, as ingest document holds their starting values @@ -451,14 +491,9 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); } indexRequest.source(ingestDocument.getSourceAndMetadata()); + handler.accept(null); } - } catch (Exception e) { - totalMetrics.ingestFailed(); - throw e; - } finally { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); - totalMetrics.postIngest(ingestTimeInMillis); - } + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 218713383227e..3d41d991f3e10 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.LongSupplier; import org.elasticsearch.script.ScriptService; @@ -93,18 +94,17 @@ public static Pipeline create(String id, Map config, * If null is returned then this document will be dropped and not indexed, otherwise * this document will be kept and indexed. */ - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - long startTimeInNanos = relativeTimeProvider.getAsLong(); - try { - metrics.preIngest(); - return compoundProcessor.execute(ingestDocument); - } catch (Exception e) { - metrics.ingestFailed(); - throw e; - } finally { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + final long startTimeInNanos = relativeTimeProvider.getAsLong(); + metrics.preIngest(); + compoundProcessor.execute(ingestDocument, (result, e) -> { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); metrics.postIngest(ingestTimeInMillis); - } + if (e != null) { + metrics.ingestFailed(); + } + handler.accept(result, e); + }); } /** diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index b5794a3f76819..f5e37a1c1235e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest; import java.util.Map; +import java.util.function.BiConsumer; public class PipelineProcessor extends AbstractProcessor { @@ -36,12 +37,19 @@ private PipelineProcessor(String tag, String pipelineName, IngestService ingestS } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { Pipeline pipeline = ingestService.getPipeline(pipelineName); - if (pipeline == null) { - throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']'); + if (pipeline != null) { + ingestDocument.executePipeline(pipeline, handler); + } else { + handler.accept(null, + new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']')); } - return ingestDocument.executePipeline(pipeline); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); } Pipeline getPipeline(){ diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 4de02ddeaa691..54fd07a5d2e43 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -27,6 +27,7 @@ import org.elasticsearch.threadpool.Scheduler; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.LongSupplier; @@ -38,6 +39,21 @@ */ public interface Processor { + /** + * Introspect and potentially modify the incoming data. + * + * Expert method: only override this method if a processor implementation needs to make an asynchronous call, + * otherwise just overwrite {@link #execute(IngestDocument)}. + */ + default void execute(IngestDocument ingestDocument, BiConsumer handler) { + try { + IngestDocument result = execute(ingestDocument); + handler.accept(result, null); + } catch (Exception e) { + handler.accept(null, e); + } + } + /** * Introspect and potentially modify the incoming data. * diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 4b78715144649..1bbb9b85eb37c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.BiConsumer; /** * Processor to be used within Simulate API to keep track of processors executed in pipeline. @@ -41,51 +42,71 @@ public final class TrackingResultProcessor implements Processor { } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - Processor processor = actualProcessor; - try { - if (processor instanceof ConditionalProcessor) { - ConditionalProcessor conditionalProcessor = (ConditionalProcessor) processor; - if (conditionalProcessor.evaluate(ingestDocument) == false) { - return ingestDocument; - } - if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) { - processor = conditionalProcessor.getProcessor(); - } - } - if (processor instanceof PipelineProcessor) { - PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); - Pipeline pipeline = pipelineProcessor.getPipeline(); - //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines - try { - IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); - ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline()); - } catch (ElasticsearchException elasticsearchException) { + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + if (actualProcessor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) actualProcessor); + Pipeline pipeline = pipelineProcessor.getPipeline(); + //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines + IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); + ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline(), (result, e) -> { + // do nothing, let the tracking processors throw the exception while recording the path up to the failure + if (e instanceof ElasticsearchException) { + ElasticsearchException elasticsearchException = (ElasticsearchException) e; + //else do nothing, let the tracking processors throw the exception while recording the path up to the failure if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) { - throw elasticsearchException; + if (ignoreFailure) { + processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), + new IngestDocument(ingestDocument), e)); + } else { + processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getTag(), e)); + } + handler.accept(null, elasticsearchException); } - //else do nothing, let the tracking processors throw the exception while recording the path up to the failure - } catch (Exception e) { - // do nothing, let the tracking processors throw the exception while recording the path up to the failure + } else { + //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline, handler); } - //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); - Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), - verbosePipelineProcessor); - ingestDocument.executePipeline(verbosePipeline); - } else { - processor.execute(ingestDocument); - processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument))); + }); + return; + } + + final Processor processor; + if (actualProcessor instanceof ConditionalProcessor) { + ConditionalProcessor conditionalProcessor = (ConditionalProcessor) actualProcessor; + if (conditionalProcessor.evaluate(ingestDocument) == false) { + handler.accept(ingestDocument, null); + return; } - } catch (Exception e) { - if (ignoreFailure) { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); + if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) { + processor = conditionalProcessor.getProcessor(); } else { - processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); + processor = actualProcessor; } - throw e; + } else { + processor = actualProcessor; } - return ingestDocument; + + processor.execute(ingestDocument, (result, e) -> { + if (e != null) { + if (ignoreFailure) { + processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); + } else { + processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); + } + handler.accept(null, e); + } else { + processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument))); + handler.accept(result, null); + } + }); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException(); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java index 82ed518256169..5a168264a740f 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java @@ -55,7 +55,7 @@ public void testBulkRequestModifier() { while (bulkRequestModifier.hasNext()) { bulkRequestModifier.next(); if (randomBoolean()) { - bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException()); + bulkRequestModifier.markItemAsFailed(i, new RuntimeException()); failedSlots.add(i); } i++; @@ -93,7 +93,7 @@ public void testPipelineFailures() { for (int i = 0; modifier.hasNext(); i++) { modifier.next(); if (i % 2 == 0) { - modifier.markCurrentItemAsFailed(new RuntimeException()); + modifier.markItemAsFailed(i, new RuntimeException()); } } @@ -102,7 +102,7 @@ public void testPipelineFailures() { assertThat(bulkRequest.requests().size(), Matchers.equalTo(16)); List responses = new ArrayList<>(); - ActionListener bulkResponseListener = modifier.wrapActionListenerIfNeeded(1L, new ActionListener() { + ActionListener bulkResponseListener = modifier.wrapActionListenerIfNeeded(1L, new ActionListener<>() { @Override public void onResponse(BulkResponse bulkItemResponses) { responses.addAll(Arrays.asList(bulkItemResponses.getItems())); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 95afbbf54f573..5120066b4aa65 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -67,11 +67,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -93,6 +93,8 @@ public class TransportBulkActionIngestTests extends ESTestCase { private static final Settings SETTINGS = Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build(); + private static final Thread DUMMY_WRITE_THREAD = new Thread(ThreadPool.Names.WRITE); + /** Services needed by bulk action */ TransportService transportService; ClusterService clusterService; @@ -101,9 +103,9 @@ public class TransportBulkActionIngestTests extends ESTestCase { /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */ @Captor - ArgumentCaptor> failureHandler; + ArgumentCaptor> failureHandler; @Captor - ArgumentCaptor> completionHandler; + ArgumentCaptor> completionHandler; @Captor ArgumentCaptor> remoteResponseHandler; @Captor @@ -265,15 +267,16 @@ public void testIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); - completionHandler.getValue().accept(exception); + verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), + failureHandler.capture(), completionHandler.capture(), any()); + completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); // now check success Iterator> req = bulkDocsItr.getValue().iterator(); - failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request + failureHandler.getValue().accept(0, exception); // have an exception for our one index request indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); @@ -299,13 +302,14 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); - completionHandler.getValue().accept(exception); + verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), + completionHandler.capture(), any()); + completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); @@ -331,7 +335,7 @@ public void testIngestForward() throws Exception { action.execute(null, bulkRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -375,7 +379,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { singleItemBulkWriteAction.execute(null, indexRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -459,18 +463,19 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + verify(ingestService).executeBulkRequest(eq(bulkRequest.numberOfActions()), bulkDocsItr.capture(), + failureHandler.capture(), completionHandler.capture(), any()); assertEquals(indexRequest1.getPipeline(), "default_pipeline"); assertEquals(indexRequest2.getPipeline(), "default_pipeline"); assertEquals(indexRequest3.getPipeline(), "default_pipeline"); - completionHandler.getValue().accept(exception); + completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); // now check success of the transport bulk action indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); @@ -497,14 +502,15 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception { assertFalse(action.indexCreated); // no index yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); - completionHandler.getValue().accept(exception); + verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), + completionHandler.capture(), any()); + completionHandler.getValue().accept(null, exception); assertFalse(action.indexCreated); // still no index yet, the ingest node failed. assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path. assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one @@ -561,7 +567,8 @@ public void testFindDefaultPipelineFromTemplateMatch(){ })); assertEquals("pipeline2", indexRequest.getPipeline()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), + completionHandler.capture(), any()); } private void validateDefaultPipeline(IndexRequest indexRequest) { @@ -583,14 +590,15 @@ private void validateDefaultPipeline(IndexRequest indexRequest) { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + verify(ingestService).executeBulkRequest(eq(1), bulkDocsItr.capture(), failureHandler.capture(), + completionHandler.capture(), any()); assertEquals(indexRequest.getPipeline(), "default_pipeline"); - completionHandler.getValue().accept(exception); + completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); // now check success indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); + completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); assertTrue(action.isExecuted); assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one verifyZeroInteractions(transportService); diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index 4c6fb7df62050..2925eeb023556 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -20,19 +20,21 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; @@ -51,11 +53,7 @@ public class SimulateExecutionServiceTests extends ESTestCase { @Before public void setup() { - threadPool = new ThreadPool( - Settings.builder() - .put("node.name", getClass().getName()) - .build() - ); + threadPool = new TestThreadPool(SimulateExecutionServiceTests.class.getSimpleName()); executionService = new SimulateExecutionService(threadPool); ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); } @@ -68,7 +66,15 @@ public void destroy() { public void testExecuteVerboseItem() throws Exception { TestProcessor processor = new TestProcessor("test-id", "mock", ingestDocument -> {}); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.getAcquire(); assertThat(processor.getInvokedCounter(), equalTo(2)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; @@ -93,7 +99,14 @@ public void testExecuteVerboseItem() throws Exception { public void testExecuteItem() throws Exception { TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {}); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.getAcquire(); assertThat(processor.getInvokedCounter(), equalTo(2)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse; @@ -107,7 +120,14 @@ public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception { ingestDocument -> { throw new RuntimeException("processor failed"); }); TestProcessor processor3 = new TestProcessor("processor_2", "mock", ingestDocument -> {}); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1)); assertThat(processor3.getInvokedCounter(), equalTo(0)); @@ -135,7 +155,14 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception { Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1), Collections.singletonList(processor2)), processor3)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertThat(processor2.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); @@ -170,7 +197,14 @@ public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { throw exception; }); CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; @@ -187,7 +221,14 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws TestProcessor testProcessor = new TestProcessor("processor_0", "mock", ingestDocument -> { }); CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; @@ -203,7 +244,14 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws public void testExecuteItemWithFailure() throws Exception { TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); }); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); - SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference holder = new AtomicReference<>(); + executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> { + holder.set(r); + latch.countDown(); + }); + latch.await(); + SimulateDocumentResult actualItemResponse = holder.get(); assertThat(processor.getInvokedCounter(), equalTo(1)); assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class)); SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse; diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index 24e3dcd76774b..e44f1fb4700ff 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -33,6 +33,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -51,7 +52,7 @@ public void testEmpty() throws Exception { CompoundProcessor processor = new CompoundProcessor(); assertThat(processor.getProcessors().isEmpty(), is(true)); assertThat(processor.getOnFailureProcessors().isEmpty(), is(true)); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); } public void testSingleProcessor() throws Exception { @@ -66,7 +67,7 @@ public void testSingleProcessor() throws Exception { assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor.getInvokedCounter(), equalTo(1)); assertStats(compoundProcessor, 1, 0, 1); @@ -81,12 +82,9 @@ public void testSingleProcessorWithException() throws Exception { assertThat(compoundProcessor.getProcessors().size(), equalTo(1)); assertThat(compoundProcessor.getProcessors().get(0), sameInstance(processor)); assertThat(compoundProcessor.getOnFailureProcessors().isEmpty(), is(true)); - try { - compoundProcessor.execute(ingestDocument); - fail("should throw exception"); - } catch (ElasticsearchException e) { - assertThat(e.getRootCause().getMessage(), equalTo("error")); - } + Exception[] holder = new Exception[1]; + compoundProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); + assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo("error")); assertThat(processor.getInvokedCounter(), equalTo(1)); assertStats(compoundProcessor, 1, 1, 0); @@ -99,7 +97,7 @@ public void testIgnoreFailure() throws Exception { when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor compoundProcessor = new CompoundProcessor(true, Arrays.asList(processor1, processor2), Collections.emptyList(), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertStats(0, compoundProcessor, 0, 1, 1, 0); assertThat(processor2.getInvokedCounter(), equalTo(1)); @@ -121,7 +119,7 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception { when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(1)); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), Collections.singletonList(processor2), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); verify(relativeTimeProvider, times(2)).getAsLong(); assertThat(processor1.getInvokedCounter(), equalTo(1)); @@ -153,7 +151,9 @@ public String getTag() { when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), Collections.singletonList(processor2), relativeTimeProvider); - assertNull(compoundProcessor.execute(ingestDocument)); + IngestDocument[] result = new IngestDocument[1]; + compoundProcessor.execute(ingestDocument, (r, e) -> result[0] = r); + assertThat(result[0], nullValue()); assertThat(processor1.getInvokedCounter(), equalTo(1)); assertStats(compoundProcessor, 1, 1, 0); } @@ -181,7 +181,7 @@ public void testSingleProcessorWithNestedFailures() throws Exception { Collections.singletonList(lastProcessor), relativeTimeProvider); CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor), Collections.singletonList(compoundOnFailProcessor), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(processorToFail.getInvokedCounter(), equalTo(1)); assertThat(lastProcessor.getInvokedCounter(), equalTo(1)); @@ -204,7 +204,7 @@ public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exceptio CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), Collections.singletonList(secondProcessor), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); @@ -230,7 +230,7 @@ public void testCompoundProcessorExceptionFail() throws Exception { CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), Collections.singletonList(secondProcessor), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); @@ -256,7 +256,7 @@ public void testCompoundProcessorExceptionFailInOnFailure() throws Exception { CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(failCompoundProcessor), Collections.singletonList(secondProcessor), relativeTimeProvider); - compoundProcessor.execute(ingestDocument); + compoundProcessor.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(1)); @@ -271,7 +271,7 @@ public void testBreakOnFailure() throws Exception { when(relativeTimeProvider.getAsLong()).thenReturn(0L); CompoundProcessor pipeline = new CompoundProcessor(false, Arrays.asList(firstProcessor, secondProcessor), Collections.singletonList(onFailureProcessor), relativeTimeProvider); - pipeline.execute(ingestDocument); + pipeline.execute(ingestDocument, (result, e) -> {}); assertThat(firstProcessor.getInvokedCounter(), equalTo(1)); assertThat(secondProcessor.getInvokedCounter(), equalTo(0)); assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1)); diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index f484957d897f1..204a0bddbc339 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -98,7 +98,7 @@ public String getTag() { String falseValue = "falsy"; IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, falseValue); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(falseValue)); assertThat(ingestDocument.getSourceAndMetadata(), not(hasKey("foo"))); assertStats(processor, 0, 0, 0); @@ -106,13 +106,13 @@ public String getTag() { ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, falseValue); ingestDocument.setFieldValue("error", true); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertStats(processor, 0, 0, 0); //true, always call processor and increments metrics ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue(conditionalField, trueValue); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getSourceAndMetadata().get(conditionalField), is(trueValue)); assertThat(ingestDocument.getSourceAndMetadata().get("foo"), is("bar")); assertStats(processor, 1, 0, 1); @@ -121,7 +121,9 @@ public String getTag() { ingestDocument.setFieldValue(conditionalField, trueValue); ingestDocument.setFieldValue("error", true); IngestDocument finalIngestDocument = ingestDocument; - expectThrows(RuntimeException.class, () -> processor.execute(finalIngestDocument)); + Exception holder[] = new Exception[1]; + processor.execute(finalIngestDocument, (result, e) -> {holder[0] = e;}); + assertThat(holder[0], instanceOf(RuntimeException.class)); assertStats(processor, 2, 1, 2); } @@ -177,7 +179,7 @@ public String getTag() { }, relativeTimeProvider); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); assertWarnings("[types removal] Looking up doc types [_type] in scripts is deprecated."); } @@ -213,7 +215,7 @@ private static void assertMutatingCtxThrows(Consumer> mutati ); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); ingestDocument.setFieldValue("listField", new ArrayList<>()); - processor.execute(ingestDocument); + processor.execute(ingestDocument, (result, e) -> {}); Exception e = expectedException.get(); assertThat(e, instanceOf(UnsupportedOperationException.class)); assertEquals("Mutating ingest documents in conditionals is not supported", e.getMessage()); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 1e2946a78e234..1dd15064b4dd0 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -63,7 +63,7 @@ import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; -import java.util.function.Consumer; +import java.util.function.IntConsumer; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -75,6 +75,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; @@ -121,20 +122,20 @@ public void testExecuteIndexPipelineDoesNotExist() { final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); final SetOnce failure = new SetOnce<>(); - final BiConsumer failureHandler = (request, e) -> { + final BiConsumer failureHandler = (slot, e) -> { failure.set(true); - assertThat(request, sameInstance(indexRequest)); + assertThat(slot, equalTo(0)); assertThat(e, instanceOf(IllegalArgumentException.class)); assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist")); }; @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); + final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); assertTrue(failure.get()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testUpdatePipelines() { @@ -521,7 +522,7 @@ public String getType() { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final SetOnce failure = new SetOnce<>(); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id); - final BiConsumer failureHandler = (request, e) -> { + final BiConsumer failureHandler = (slot, e) -> { assertThat(e.getCause(), instanceOf(IllegalArgumentException.class)); assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class)); assertThat(e.getCause().getCause().getMessage(), equalTo("error")); @@ -529,17 +530,17 @@ public String getType() { }; @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); + final BiConsumer completionHandler = mock(BiConsumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); assertTrue(failure.get()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteBulkPipelineDoesNotExist() { IngestService ingestService = createWithProcessors(Collections.singletonMap( - "mock", (factories, tag, config) -> mock(CompoundProcessor.class))); + "mock", (factories, tag, config) -> mockCompoundProcessor())); PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); @@ -556,15 +557,16 @@ public void testExecuteBulkPipelineDoesNotExist() { new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist"); bulkRequest.add(indexRequest2); @SuppressWarnings("unchecked") - BiConsumer failureHandler = mock(BiConsumer.class); + BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler, + completionHandler, indexReq -> {}); verify(failureHandler, times(1)).accept( - argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { + argThat(new CustomTypeSafeMatcher<>("failure handler was not called with the expected arguments") { @Override - protected boolean matchesSafely(IndexRequest item) { - return item == indexRequest2; + protected boolean matchesSafely(Integer item) { + return item == 1; } }), @@ -575,12 +577,12 @@ protected boolean matchesSafely(IllegalArgumentException iae) { } }) ); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteSuccess() { IngestService ingestService = createWithProcessors(Collections.singletonMap( - "mock", (factories, tag, config) -> mock(CompoundProcessor.class))); + "mock", (factories, tag, config) -> mockCompoundProcessor())); PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty @@ -589,12 +591,12 @@ public void testExecuteSuccess() { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteEmptyPipeline() throws Exception { @@ -607,16 +609,16 @@ public void testExecuteEmptyPipeline() throws Exception { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecutePropagateAllMetaDataUpdates() throws Exception { - final CompoundProcessor processor = mock(CompoundProcessor.class); + final CompoundProcessor processor = mockCompoundProcessor(); IngestService ingestService = createWithProcessors(Collections.singletonMap( "mock", (factories, tag, config) -> processor)); PutPipelineRequest putRequest = new PutPipelineRequest("_id", @@ -638,17 +640,21 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName()); } } - return ingestDocument; - }).when(processor).execute(any()); + + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[1]; + handler.accept(ingestDocument, null); + return null; + }).when(processor).execute(any(), any()); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(processor).execute(any()); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + verify(processor).execute(any(), any()); verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); assertThat(indexRequest.index(), equalTo("update_index")); assertThat(indexRequest.type(), equalTo("update_type")); assertThat(indexRequest.id(), equalTo("update_id")); @@ -658,7 +664,7 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { } public void testExecuteFailure() throws Exception { - final CompoundProcessor processor = mock(CompoundProcessor.class); + final CompoundProcessor processor = mockCompoundProcessor(); IngestService ingestService = createWithProcessors(Collections.singletonMap( "mock", (factories, tag, config) -> processor)); PutPipelineRequest putRequest = new PutPipelineRequest("_id", @@ -670,22 +676,37 @@ public void testExecuteFailure() throws Exception { final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()) .when(processor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); - verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); - verify(completionHandler, times(1)).accept(null); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); + verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteSuccessWithOnFailure() throws Exception { final Processor processor = mock(Processor.class); when(processor.getType()).thenReturn("mock_processor_type"); when(processor.getTag()).thenReturn("mock_processor_tag"); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(null, new RuntimeException()); + return null; + }).when(processor).execute(eqIndexTypeId(emptyMap()), any()); + final Processor onFailureProcessor = mock(Processor.class); + doAnswer(args -> { + IngestDocument ingestDocument = (IngestDocument) args.getArguments()[0]; + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(ingestDocument, null); + return null; + }).when(onFailureProcessor).execute(eqIndexTypeId(emptyMap()), any()); + final CompoundProcessor compoundProcessor = new CompoundProcessor( false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); IngestService ingestService = createWithProcessors(Collections.singletonMap( @@ -697,14 +718,13 @@ public void testExecuteSuccessWithOnFailure() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(emptyMap())); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class)); - verify(completionHandler, times(1)).accept(null); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + verify(failureHandler, never()).accept(eq(0), any(ElasticsearchException.class)); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testExecuteFailureWithNestedOnFailure() throws Exception { @@ -728,21 +748,21 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); doThrow(new RuntimeException()) .when(onFailureOnFailureProcessor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); doThrow(new RuntimeException()) .when(onFailureProcessor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); doThrow(new RuntimeException()) .when(processor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); - verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); - verify(completionHandler, times(1)).accept(null); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); + verify(failureHandler, times(1)).accept(eq(0), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testBulkRequestExecutionWithFailures() throws Exception { @@ -771,7 +791,12 @@ public void testBulkRequestExecutionWithFailures() throws Exception { CompoundProcessor processor = mock(CompoundProcessor.class); when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); Exception error = new RuntimeException(); - doThrow(error).when(processor).execute(any()); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(null, error); + return null; + }).when(processor).execute(any(), any()); IngestService ingestService = createWithProcessors(Collections.singletonMap( "mock", (factories, tag, config) -> processor)); PutPipelineRequest putRequest = new PutPipelineRequest("_id", @@ -782,18 +807,18 @@ public void testBulkRequestExecutionWithFailures() throws Exception { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @SuppressWarnings("unchecked") - BiConsumer requestItemErrorHandler = mock(BiConsumer.class); + BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); - verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), argThat(new ArgumentMatcher() { + verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(new ArgumentMatcher() { @Override public boolean matches(final Object o) { return ((Exception)o).getCause().getCause().equals(error); } })); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testBulkRequestExecution() { @@ -816,13 +841,13 @@ public void testBulkRequestExecution() { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @SuppressWarnings("unchecked") - BiConsumer requestItemErrorHandler = mock(BiConsumer.class); + BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(numRequest, bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); verify(requestItemErrorHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } public void testStats() throws Exception { @@ -832,8 +857,18 @@ public void testStats() throws Exception { when(processor.getTag()).thenReturn("mockTag"); when(processorFailure.getType()).thenReturn("failure-mock"); //avoid returning null and dropping the document - when(processor.execute(any(IngestDocument.class))).thenReturn( RandomDocumentPicks.randomIngestDocument(random())); - when(processorFailure.execute(any(IngestDocument.class))).thenThrow(new RuntimeException("error")); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null); + return null; + }).when(processor).execute(any(IngestDocument.class), any()); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(null, new RuntimeException("error")); + return null; + }).when(processorFailure).execute(any(IngestDocument.class), any()); Map map = new HashMap<>(2); map.put("mock", (factories, tag, config) -> processor); map.put("failure-mock", (factories, tag, config) -> processorFailure); @@ -855,13 +890,13 @@ public void testStats() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); + @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1"); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); assertThat(afterFirstRequestStats.getPipelineStats().size(), equalTo(2)); @@ -879,7 +914,7 @@ public void testStats() throws Exception { indexRequest.setPipeline("_id2"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterSecondRequestStats = ingestService.stats(); assertThat(afterSecondRequestStats.getPipelineStats().size(), equalTo(2)); //total @@ -898,7 +933,7 @@ public void testStats() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterThirdRequestStats = ingestService.stats(); assertThat(afterThirdRequestStats.getPipelineStats().size(), equalTo(2)); //total @@ -922,7 +957,7 @@ public void testStats() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterForthRequestStats = ingestService.stats(); assertThat(afterForthRequestStats.getPipelineStats().size(), equalTo(2)); //total @@ -988,19 +1023,19 @@ public String getTag() { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); + final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); + final BiConsumer completionHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") - final Consumer dropHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler); + final IntConsumer dropHandler = mock(IntConsumer.class); + ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler); verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); - verify(dropHandler, times(1)).accept(indexRequest); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(dropHandler, times(1)).accept(0); } private IngestDocument eqIndexTypeId(final Map source) { - return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source)); + return argThat(new IngestDocumentMatcher("_index", "_type", "_id", -3L, VersionType.INTERNAL, source)); } private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map source) { @@ -1066,6 +1101,17 @@ public Map getProcessors(final Processor.Parameters p }), null); } + private CompoundProcessor mockCompoundProcessor() { + CompoundProcessor processor = mock(CompoundProcessor.class); + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept((IngestDocument) args.getArguments()[0], null); + return null; + }).when(processor).execute(any(), any()); + return processor; + } + private class IngestDocumentMatcher extends ArgumentMatcher { private final IngestDocument ingestDocument; diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 0ad88c05ccc6e..4f36727c7ac30 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -64,7 +64,7 @@ public String getTag() { PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); Map config = new HashMap<>(); config.put("name", pipelineId); - factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument); + factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument, (result, e) -> {}); assertEquals(testIngestDocument, invoked.get()); } @@ -74,12 +74,11 @@ public void testThrowsOnMissingPipeline() throws Exception { PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); Map config = new HashMap<>(); config.put("name", "missingPipelineId"); - IllegalStateException e = expectThrows( - IllegalStateException.class, - () -> factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument) - ); + IllegalStateException[] e = new IllegalStateException[1]; + factory.create(Collections.emptyMap(), null, config) + .execute(testIngestDocument, (result, e1) -> e[0] = (IllegalStateException) e1); assertEquals( - "Pipeline processor configured for non-existent pipeline [missingPipelineId]", e.getMessage() + "Pipeline processor configured for non-existent pipeline [missingPipelineId]", e[0].getMessage() ); } @@ -104,12 +103,11 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception { when(ingestService.getPipeline(outerPipelineId)).thenReturn(outer); when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); outerConfig.put("name", innerPipelineId); - ElasticsearchException e = expectThrows( - ElasticsearchException.class, - () -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument) - ); + ElasticsearchException[] e = new ElasticsearchException[1]; + factory.create(Collections.emptyMap(), null, outerConfig) + .execute(testIngestDocument, (result, e1) -> e[0] = (ElasticsearchException) e1); assertEquals( - "Cycle detected for pipeline: inner", e.getRootCause().getMessage() + "Cycle detected for pipeline: inner", e[0].getRootCause().getMessage() ); } @@ -125,8 +123,8 @@ innerPipelineId, null, null, new CompoundProcessor() ); when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); Processor outerProc = factory.create(Collections.emptyMap(), null, outerConfig); - outerProc.execute(testIngestDocument); - outerProc.execute(testIngestDocument); + outerProc.execute(testIngestDocument, (result, e) -> {}); + outerProc.execute(testIngestDocument, (result, e) -> {}); } public void testPipelineProcessorWithPipelineChain() throws Exception { @@ -177,7 +175,7 @@ pipeline3Id, null, null, new CompoundProcessor( IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); //start the chain - ingestDocument.executePipeline(pipeline1); + ingestDocument.executePipeline(pipeline1, (result, e) -> {}); assertNotNull(ingestDocument.getSourceAndMetadata().get(key1)); //check the stats diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index 2c047283ed1bb..7f68b33dd86bd 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -66,7 +66,7 @@ public void init() { public void testActualProcessor() throws Exception { TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -84,12 +84,9 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception { CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - try { - trackingProcessor.execute(ingestDocument); - fail("processor should throw exception"); - } catch (ElasticsearchException e) { - assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage())); - } + Exception[] holder = new Exception[1]; + trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); + assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage())); SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); @@ -109,7 +106,7 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception { Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor)); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); @@ -148,7 +145,7 @@ public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { Collections.emptyList()); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); @@ -178,7 +175,7 @@ public void testActualCompoundProcessorWithFalseConditional() throws Exception { new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })); CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument); //the step for key 2 is never executed due to conditional and thus not part of the result set @@ -221,7 +218,7 @@ pipelineId, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -287,7 +284,7 @@ pipelineId2, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -355,7 +352,7 @@ pipelineId2, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -407,7 +404,7 @@ pipelineId, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); @@ -457,7 +454,9 @@ public void testActualPipelineProcessorWithCycle() throws Exception { CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> trackingProcessor.execute(ingestDocument)); + Exception[] holder = new Exception[1]; + trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); + ElasticsearchException exception = (ElasticsearchException) holder[0]; assertThat(exception.getCause(), instanceOf(IllegalArgumentException.class)); assertThat(exception.getCause().getCause(), instanceOf(IllegalStateException.class)); assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1")); @@ -482,7 +481,7 @@ pipelineId, null, null, new CompoundProcessor( CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); + trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java index fc323f33d5b4d..820991321fc16 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java @@ -5,11 +5,11 @@ */ package org.elasticsearch.xpack.enrich; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.routing.Preference; -import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.ingest.AbstractProcessor; @@ -21,13 +21,14 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.function.BiConsumer; final class ExactMatchProcessor extends AbstractProcessor { static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field"; - private final CheckedFunction searchRunner; - + private final BiConsumer> searchRunner; private final String policyName; private final String enrichKey; private final boolean ignoreMissing; @@ -39,11 +40,18 @@ final class ExactMatchProcessor extends AbstractProcessor { String enrichKey, boolean ignoreMissing, List specifications) { - this(tag, (req) -> client.search(req).actionGet(), policyName, enrichKey, ignoreMissing, specifications); + this( + tag, + createSearchRunner(client), + policyName, + enrichKey, + ignoreMissing, + specifications + ); } ExactMatchProcessor(String tag, - CheckedFunction searchRunner, + BiConsumer> searchRunner, String policyName, String enrichKey, boolean ignoreMissing, @@ -57,48 +65,64 @@ final class ExactMatchProcessor extends AbstractProcessor { } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - // If a document does not have the enrich key, return the unchanged document - final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing); - if (value == null) { - return ingestDocument; - } - - TermQueryBuilder termQuery = new TermQueryBuilder(enrichKey, value); - ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery); - // TODO: Use a custom transport action instead of the search API - SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); - searchBuilder.size(1); - searchBuilder.trackScores(false); - searchBuilder.fetchSource(specifications.stream().map(s -> s.sourceField).toArray(String[]::new), null); - searchBuilder.query(constantScore); - - SearchRequest req = new SearchRequest(); - req.indices(EnrichPolicy.getBaseName(policyName)); - req.preference(Preference.LOCAL.type()); - req.source(searchBuilder); - - // TODO: Make this Async - SearchResponse searchResponse = searchRunner.apply(req); - - // If the index is empty, return the unchanged document - // If the enrich key does not exist in the index, throw an error - // If no documents match the key, return the unchanged document - SearchHit[] searchHits = searchResponse.getHits().getHits(); - if (searchHits.length < 1) { - return ingestDocument; - } else if (searchHits.length > 1) { - throw new IllegalStateException("more than one doc id matching for [" + enrichKey + "]"); + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + try { + // If a document does not have the enrich key, return the unchanged document + final String value = ingestDocument.getFieldValue(enrichKey, String.class, ignoreMissing); + if (value == null) { + handler.accept(ingestDocument, null); + return; + } + + TermQueryBuilder termQuery = new TermQueryBuilder(enrichKey, value); + ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery); + // TODO: Use a custom transport action instead of the search API + SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); + searchBuilder.size(1); + searchBuilder.trackScores(false); + searchBuilder.fetchSource(specifications.stream().map(s -> s.sourceField).toArray(String[]::new), null); + searchBuilder.query(constantScore); + + SearchRequest req = new SearchRequest(); + req.indices(EnrichPolicy.getBaseName(policyName)); + req.preference(Preference.LOCAL.type()); + req.source(searchBuilder); + + searchRunner.accept(req, (searchResponse, e) -> { + if (e != null) { + handler.accept(null, e); + return; + } + + // If the index is empty, return the unchanged document + // If the enrich key does not exist in the index, throw an error + // If no documents match the key, return the unchanged document + SearchHit[] searchHits = searchResponse.getHits().getHits(); + if (searchHits.length < 1) { + handler.accept(ingestDocument, null); + return; + } else if (searchHits.length > 1) { + handler.accept(null, new IllegalStateException("more than one doc id matching for [" + enrichKey + "]")); + return; + } + + // If a document is returned, add its fields to the document + Map enrichDocument = searchHits[0].getSourceAsMap(); + assert enrichDocument != null : "enrich document for id [" + enrichKey + "] was empty despite non-zero search hits length"; + for (EnrichSpecification specification : specifications) { + Object enrichFieldValue = enrichDocument.get(specification.sourceField); + ingestDocument.setFieldValue(specification.targetField, enrichFieldValue); + } + handler.accept(ingestDocument, null); + }); + } catch (Exception e) { + handler.accept(null, e); } + } - // If a document is returned, add its fields to the document - Map enrichDocument = searchHits[0].getSourceAsMap(); - assert enrichDocument != null : "enrich document for id [" + enrichKey + "] was empty despite non-zero search hits length"; - for (EnrichSpecification specification : specifications) { - Object enrichFieldValue = enrichDocument.get(specification.sourceField); - ingestDocument.setFieldValue(specification.targetField, enrichFieldValue); - } - return ingestDocument; + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); } @Override @@ -121,4 +145,31 @@ boolean isIgnoreMissing() { List getSpecifications() { return specifications; } + + // TODO: This is temporary and will be removed once internal transport action that does an efficient lookup instead of a search. + // This semaphore purpose is to throttle the number of concurrent search requests, if this is not done then search thread pool + // on nodes may get full and search request fail because they get rejected. + // Because this code is going to change, a semaphore seemed like an easy quick fix to address this problem. + private static final Semaphore SEMAPHORE = new Semaphore(100); + + private static BiConsumer> createSearchRunner(Client client) { + return (req, handler) -> { + try { + SEMAPHORE.acquire(); + } catch (InterruptedException e) { + Thread.interrupted(); + handler.accept(null, e); + return; + } + client.search(req, ActionListener.wrap( + resp -> { + SEMAPHORE.release(); + handler.accept(resp, null); + }, + e -> { + SEMAPHORE.release(); + handler.accept(null, e); + })); + }; + } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java index 1c80f3540c016..4dc8ae51262d3 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/ExactMatchProcessorTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.search.SearchResponseSections; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.routing.Preference; -import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -36,11 +35,13 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.BiConsumer; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class ExactMatchProcessorTests extends ESTestCase { @@ -51,7 +52,9 @@ public void testBasics() throws Exception { IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of("domain", "elastic.co")); // Run - assertThat(processor.execute(ingestDocument), notNullValue()); + IngestDocument[] holder = new IngestDocument[1]; + processor.execute(ingestDocument, (result, e) -> holder[0] = result); + assertThat(holder[0], notNullValue()); // Check request SearchRequest request = mockSearch.getCapturedRequest(); assertThat(request.indices().length, equalTo(1)); @@ -79,7 +82,9 @@ public void testNoMatch() throws Exception { Map.of("domain", "elastic.com")); int numProperties = ingestDocument.getSourceAndMetadata().size(); // Run - assertThat(processor.execute(ingestDocument), notNullValue()); + IngestDocument[] holder = new IngestDocument[1]; + processor.execute(ingestDocument, (result, e) -> holder[0] = result); + assertThat(holder[0], notNullValue()); // Check request SearchRequest request = mockSearch.getCapturedRequest(); assertThat(request.indices().length, equalTo(1)); @@ -106,7 +111,15 @@ public void testSearchFailure() throws Exception { IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of("domain", "elastic.com")); // Run - IndexNotFoundException expectedException = expectThrows(IndexNotFoundException.class, () -> processor.execute(ingestDocument)); + IngestDocument[] resultHolder = new IngestDocument[1]; + Exception[] exceptionHolder = new Exception[1]; + processor.execute(ingestDocument, (result, e) -> { + resultHolder[0] = result; + exceptionHolder[0] = e; + }); + assertThat(resultHolder[0], nullValue()); + assertThat(exceptionHolder[0], notNullValue()); + assertThat(exceptionHolder[0], instanceOf(IndexNotFoundException.class)); // Check request SearchRequest request = mockSearch.getCapturedRequest(); assertThat(request.indices().length, equalTo(1)); @@ -122,7 +135,7 @@ public void testSearchFailure() throws Exception { assertThat(termQueryBuilder.fieldName(), equalTo("domain")); assertThat(termQueryBuilder.value(), equalTo("elastic.com")); // Check result - assertThat(expectedException.getMessage(), equalTo("no such index [" + indexName + "]")); + assertThat(exceptionHolder[0].getMessage(), equalTo("no such index [" + indexName + "]")); } public void testIgnoreKeyMissing() throws Exception { @@ -132,18 +145,28 @@ public void testIgnoreKeyMissing() throws Exception { IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of()); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); - assertThat(processor.execute(ingestDocument), notNullValue()); + IngestDocument[] holder = new IngestDocument[1]; + processor.execute(ingestDocument, (result, e) -> holder[0] = result); + assertThat(holder[0], notNullValue()); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); } { ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", false, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld"))); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of()); - expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); + IngestDocument[] resultHolder = new IngestDocument[1]; + Exception[] exceptionHolder = new Exception[1]; + processor.execute(ingestDocument, (result, e) -> { + resultHolder[0] = result; + exceptionHolder[0] = e; + }); + assertThat(resultHolder[0], nullValue()); + assertThat(exceptionHolder[0], notNullValue()); + assertThat(exceptionHolder[0], instanceOf(IllegalArgumentException.class)); } } - private static final class MockSearchFunction implements CheckedFunction { + private static final class MockSearchFunction implements BiConsumer> { private final SearchResponse mockResponse; private final SetOnce capturedRequest; private final Exception exception; @@ -161,12 +184,12 @@ private static final class MockSearchFunction implements CheckedFunction handler) { capturedRequest.set(request); if (exception != null) { - throw exception; + handler.accept(null, exception); } else { - return mockResponse; + handler.accept(mockResponse, null); } }