diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index d9dba2cc10073..d58a48e70c9ad 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.grok.Grok; import org.elasticsearch.grok.ThreadWatchdog; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.IngestPlugin; diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index 355ba2d42104a..c7c5df1e06f99 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -110,4 +110,4 @@ teardown: pipeline: "outer" body: {} - match: { error.root_cause.0.type: "exception" } -- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Recursive invocation of pipeline [inner] detected." } +- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: inner" } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 776a8af0c2420..46c4fb0a69e58 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -605,3 +605,150 @@ teardown: - length: { docs.0.processor_results.1: 2 } - match: { docs.0.processor_results.1.tag: "rename-1" } - match: { docs.0.processor_results.1.doc._source.new_status: 200 } + +--- +"Test verbose simulate with Pipeline Processor with Circular Pipelines": +- do: + ingest.put_pipeline: + id: "outer" + body: > + { + "description" : "outer pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "inner" + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.put_pipeline: + id: "inner" + body: > + { + "description" : "inner pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "outer" + } + } + ] + } +- match: { acknowledged: true } + +- do: + catch: /illegal_state_exception/ + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "processors" : [ + { + "pipeline" : { + "pipeline": "outer" + } + } + ] + } + , + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "field1": "123.42 400 " + } + } + ] + } +- match: { error.root_cause.0.type: "illegal_state_exception" } +- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" } + +--- +"Test verbose simulate with Pipeline Processor with Multiple Pipelines": +- do: + ingest.put_pipeline: + id: "pipeline1" + body: > + { + "processors": [ + { + "set": { + "field": "pipeline1", + "value": true + } + }, + { + "pipeline": { + "pipeline": "pipeline2" + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.put_pipeline: + id: "pipeline2" + body: > + { + "processors": [ + { + "set": { + "field": "pipeline2", + "value": true + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "processors": [ + { + "set": { + "field": "pipeline0", + "value": true + } + }, + { + "pipeline": { + "pipeline": "pipeline1" + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "field1": "123.42 400 " + } + } + ] + } +- length: { docs: 1 } +- length: { docs.0.processor_results: 3 } +- match: { docs.0.processor_results.0.doc._source.pipeline0: true } +- is_false: docs.0.processor_results.0.doc._source.pipeline1 +- is_false: docs.0.processor_results.0.doc._source.pipeline2 +- match: { docs.0.processor_results.1.doc._source.pipeline0: true } +- match: { docs.0.processor_results.1.doc._source.pipeline1: true } +- is_false: docs.0.processor_results.1.doc._source.pipeline2 +- match: { docs.0.processor_results.2.doc._source.pipeline0: true } +- match: { docs.0.processor_results.2.doc._source.pipeline1: true } +- match: { docs.0.processor_results.2.doc._source.pipeline2: true } + diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 430da9955bafa..c081707f4dbda 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -24,12 +24,16 @@ import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.CompoundProcessor; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; +import java.util.Set; -import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate; +import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; class SimulateExecutionService { @@ -42,11 +46,15 @@ class SimulateExecutionService { } SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { + // Prevent cycles in pipeline decoration + final Set pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); if (verbose) { List processorResultList = new ArrayList<>(); - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen); try { - verbosePipelineProcessor.execute(ingestDocument); + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline); return new SimulateDocumentVerboseResult(processorResultList); } catch (Exception e) { return new SimulateDocumentVerboseResult(processorResultList); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 10cb2fd17fec6..719558edbf748 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -647,7 +647,7 @@ private static Object deepCopy(Object value) { public IngestDocument executePipeline(Pipeline pipeline) throws Exception { try { if (this.executedPipelines.add(pipeline) == false) { - throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected."); + throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()); } return pipeline.execute(this); } finally { diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java similarity index 87% rename from modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java rename to server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index 1958a3e5232b8..918ff6b8aefee 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -17,15 +17,9 @@ * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; import java.util.Map; -import org.elasticsearch.ingest.AbstractProcessor; -import org.elasticsearch.ingest.ConfigurationUtils; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.Processor; public class PipelineProcessor extends AbstractProcessor { @@ -50,6 +44,10 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { return ingestDocument.executePipeline(pipeline); } + Pipeline getPipeline(){ + return ingestService.getPipeline(pipelineName); + } + @Override public String getType() { return TYPE; diff --git a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java similarity index 65% rename from server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java rename to server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 04c0fe7ca49dc..41a984be5adad 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -17,14 +17,13 @@ * under the License. */ -package org.elasticsearch.action.ingest; +package org.elasticsearch.ingest; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Processor; +import org.elasticsearch.action.ingest.SimulateProcessorResult; import java.util.ArrayList; import java.util.List; +import java.util.Set; /** * Processor to be used within Simulate API to keep track of processors executed in pipeline. @@ -35,7 +34,7 @@ public final class TrackingResultProcessor implements Processor { private final List processorResultList; private final boolean ignoreFailure; - public TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List processorResultList) { + TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List processorResultList) { this.ignoreFailure = ignoreFailure; this.processorResultList = processorResultList; this.actualProcessor = actualProcessor; @@ -67,19 +66,35 @@ public String getTag() { return actualProcessor.getTag(); } - public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList) { + public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList, + Set pipelinesSeen) { List processors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getProcessors()) { - if (processor instanceof CompoundProcessor) { - processors.add(decorate((CompoundProcessor) processor, processorResultList)); + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + if (pipelinesSeen.add(pipelineProcessor) == false) { + throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); + } + processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen)); + pipelinesSeen.remove(pipelineProcessor); + } else if (processor instanceof CompoundProcessor) { + processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); } else { processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } } List onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getOnFailureProcessors()) { - if (processor instanceof CompoundProcessor) { - onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList)); + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + if (pipelinesSeen.add(pipelineProcessor) == false) { + throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); + } + onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, + pipelinesSeen)); + pipelinesSeen.remove(pipelineProcessor); + } else if (processor instanceof CompoundProcessor) { + onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); } else { onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java deleted file mode 100644 index 3572a04529b43..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.ingest; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ingest.TestProcessor; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; -import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; -import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; -import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; - -public class TrackingResultProcessorTests extends ESTestCase { - - private IngestDocument ingestDocument; - private List resultList; - - @Before - public void init() { - ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); - resultList = new ArrayList<>(); - } - - public void testActualProcessor() throws Exception { - TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); - TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); - - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - - assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); - assertThat(resultList.size(), equalTo(1)); - - assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(0).getFailure(), nullValue()); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); - } - - public void testActualCompoundProcessorWithoutOnFailure() throws Exception { - RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); - CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - - try { - trackingProcessor.execute(ingestDocument); - fail("processor should throw exception"); - } catch (ElasticsearchException e) { - assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage())); - } - - SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); - assertThat(testProcessor.getInvokedCounter(), equalTo(1)); - assertThat(resultList.size(), equalTo(1)); - assertThat(resultList.get(0).getIngestDocument(), nullValue()); - assertThat(resultList.get(0).getFailure(), equalTo(exception)); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag())); - } - - public void testActualCompoundProcessorWithOnFailure() throws Exception { - RuntimeException exception = new RuntimeException("fail"); - TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); - TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); - CompoundProcessor actualProcessor = new CompoundProcessor(false, - Arrays.asList(new CompoundProcessor(false, - Arrays.asList(failProcessor, onFailureProcessor), - Arrays.asList(onFailureProcessor, failProcessor))), - Arrays.asList(onFailureProcessor)); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); - - SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); - SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); - - assertThat(failProcessor.getInvokedCounter(), equalTo(2)); - assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); - assertThat(resultList.size(), equalTo(4)); - - assertThat(resultList.get(0).getIngestDocument(), nullValue()); - assertThat(resultList.get(0).getFailure(), equalTo(exception)); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); - - Map metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); - assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); - assertThat(resultList.get(1).getFailure(), nullValue()); - assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); - - assertThat(resultList.get(2).getIngestDocument(), nullValue()); - assertThat(resultList.get(2).getFailure(), equalTo(exception)); - assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); - - metadata = resultList.get(3).getIngestDocument().getIngestMetadata(); - assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); - assertThat(resultList.get(3).getFailure(), nullValue()); - assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); - } - - public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { - RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); - CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), - Collections.emptyList()); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - - trackingProcessor.execute(ingestDocument); - - SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); - assertThat(testProcessor.getInvokedCounter(), equalTo(1)); - assertThat(resultList.size(), equalTo(1)); - assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(0).getFailure(), sameInstance(exception)); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); - } -} diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java similarity index 97% rename from modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java rename to server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 6e18bac40d4aa..99fa7633d085a 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; import java.util.Collections; import java.util.HashMap; @@ -27,6 +27,7 @@ import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.test.ESTestCase; @@ -110,7 +111,7 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception { () -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument) ); assertEquals( - "Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage() + "Cycle detected for pipeline: inner", e.getRootCause().getMessage() ); } diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java new file mode 100644 index 0000000000000..7a7f9b773727f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -0,0 +1,315 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ingest.SimulateProcessorResult; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; +import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; +import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; +import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TrackingResultProcessorTests extends ESTestCase { + + private IngestDocument ingestDocument; + private List resultList; + private Set pipelinesSeen; + + @Before + public void init() { + ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); + resultList = new ArrayList<>(); + pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); + } + + public void testActualProcessor() throws Exception { + TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); + TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + + assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(0).getFailure(), nullValue()); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithoutOnFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + try { + trackingProcessor.execute(ingestDocument); + fail("processor should throw exception"); + } catch (ElasticsearchException e) { + assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage())); + } + + SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); + assertThat(testProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + assertThat(resultList.get(0).getIngestDocument(), nullValue()); + assertThat(resultList.get(0).getFailure(), equalTo(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithOnFailure() throws Exception { + RuntimeException exception = new RuntimeException("fail"); + TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); + TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); + CompoundProcessor actualProcessor = new CompoundProcessor(false, + Arrays.asList(new CompoundProcessor(false, + Arrays.asList(failProcessor, onFailureProcessor), + Arrays.asList(onFailureProcessor, failProcessor))), + Arrays.asList(onFailureProcessor)); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); + + assertThat(failProcessor.getInvokedCounter(), equalTo(2)); + assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); + assertThat(resultList.size(), equalTo(4)); + + assertThat(resultList.get(0).getIngestDocument(), nullValue()); + assertThat(resultList.get(0).getFailure(), equalTo(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + + Map metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); + assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); + + assertThat(resultList.get(2).getIngestDocument(), nullValue()); + assertThat(resultList.get(2).getFailure(), equalTo(exception)); + assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + + metadata = resultList.get(3).getIngestDocument().getIngestMetadata(); + assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), + Collections.emptyList()); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); + assertThat(testProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(0).getFailure(), sameInstance(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); + } + + public void testActualPipelineProcessor() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); }), + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(3)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(2).getFailure(), nullValue()); + assertThat(resultList.get(2).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithHandledFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); }), + new CompoundProcessor( + false, + Collections.singletonList(new TestProcessor(ingestDocument -> { throw exception; })), + Collections.singletonList(new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); })) + ), + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(4)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + //failed processor + assertNull(resultList.get(1).getIngestDocument()); + assertThat(resultList.get(1).getFailure().getMessage(), equalTo(exception.getMessage())); + + assertTrue(resultList.get(2).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(2).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(2).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithCycle() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor(pipelineProcessor) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + IllegalStateException exception = expectThrows(IllegalStateException.class, + () -> decorate(actualProcessor, resultList, pipelinesSeen)); + assertThat(exception.getMessage(), equalTo("Cycle detected for pipeline: pipeline1")); + } + + + public void testActualPipelineProcessorRepeatedInvocation() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService, times(2)).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(2)); + + assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument()))); + assertThat(resultList.get(0).getFailure(), nullValue()); + assertThat(resultList.get(0).getProcessorTag(), nullValue()); + + assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), nullValue()); + + //each invocation updates key1 with a random int + assertNotEquals(resultList.get(0).getIngestDocument().getSourceAndMetadata().get(key1), + resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1)); + } + +}