From 89dc07bdd958574810fea533c87d2bf3c3d2b38a Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Tue, 23 Oct 2018 11:33:48 -0500 Subject: [PATCH] ingest: better support for conditionals with simulate?verbose (#34155) This commit introduces two corrections to the way simulate?verbose handles conditionals on processors. 1) Prior to this change when executing simulate?verbose for processors with conditionals that evaluate to false, that processor would still be displayed in the result set. What was displayed was correct, such that no changes to the document occurred. However, if the conditional evaluates to false, the processor should not even be displayed. 2) Prior to this change when executing simulate?verbose for pipeline processors with conditionals, the individual steps would no longer be displayed. Commit e37e5df addressed the issue, but failed account for a conditional on the pipeline processor. Since a pipeline processor can introduce cycles and is effectively a single processor that encapsulates multiple other processors that are potentially guarded by a single conditional, special handling is needed to for pipeline and conditional pipeline processors. --- .../rest-api-spec/test/ingest/90_simulate.yml | 7 +- .../ingest/SimulateExecutionService.java | 10 +- .../ingest/ConditionalProcessor.java | 11 +- .../ingest/TrackingResultProcessor.java | 68 +++-- .../ingest/TrackingResultProcessorTests.java | 250 +++++++++++++++--- 5 files changed, 275 insertions(+), 71 deletions(-) diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index bdcfdc7f6b18b..65e888d8caa32 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -641,7 +641,6 @@ teardown: - match: { acknowledged: true } - do: - catch: /illegal_state_exception/ ingest.simulate: verbose: true body: > @@ -667,8 +666,10 @@ teardown: } ] } -- match: { error.root_cause.0.type: "illegal_state_exception" } -- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" } +- length: { docs: 1 } +- length: { docs.0.processor_results: 1 } +- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: outer" } +- match: { docs.0.processor_results.0.error.caused_by.caused_by.reason: "Cycle detected for pipeline: outer" } --- "Test verbose simulate with Pipeline Processor with Multiple Pipelines": diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index c081707f4dbda..e2b44ae2a7a60 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -21,17 +21,13 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; -import java.util.Collections; -import java.util.IdentityHashMap; import java.util.List; -import java.util.Set; import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; @@ -46,11 +42,9 @@ class SimulateExecutionService { } SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { - // Prevent cycles in pipeline decoration - final Set pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); if (verbose) { List processorResultList = new ArrayList<>(); - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); try { Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), verbosePipelineProcessor); diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index 9078dc86c1b07..2493f291bcddf 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -62,10 +62,7 @@ public class ConditionalProcessor extends AbstractProcessor { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - IngestConditionalScript script = - scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); - if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { - // Only record metric if the script evaluates to true + if (evaluate(ingestDocument)) { long startTimeInNanos = relativeTimeProvider.getAsLong(); try { metric.preIngest(); @@ -81,6 +78,12 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { return ingestDocument; } + boolean evaluate(IngestDocument ingestDocument) { + IngestConditionalScript script = + scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); + return script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata())); + } + Processor getProcessor() { return processor; } diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 41a984be5adad..4b78715144649 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -19,11 +19,11 @@ package org.elasticsearch.ingest; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ingest.SimulateProcessorResult; import java.util.ArrayList; import java.util.List; -import java.util.Set; /** * Processor to be used within Simulate API to keep track of processors executed in pipeline. @@ -42,14 +42,46 @@ public final class TrackingResultProcessor implements Processor { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + Processor processor = actualProcessor; try { - actualProcessor.execute(ingestDocument); - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument))); + if (processor instanceof ConditionalProcessor) { + ConditionalProcessor conditionalProcessor = (ConditionalProcessor) processor; + if (conditionalProcessor.evaluate(ingestDocument) == false) { + return ingestDocument; + } + if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) { + processor = conditionalProcessor.getProcessor(); + } + } + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + Pipeline pipeline = pipelineProcessor.getPipeline(); + //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines + try { + IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); + ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline()); + } catch (ElasticsearchException elasticsearchException) { + if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) { + throw elasticsearchException; + } + //else do nothing, let the tracking processors throw the exception while recording the path up to the failure + } catch (Exception e) { + // do nothing, let the tracking processors throw the exception while recording the path up to the failure + } + //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline); + } else { + processor.execute(ingestDocument); + processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument))); + } } catch (Exception e) { if (ignoreFailure) { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e)); + processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); } else { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e)); + processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); } throw e; } @@ -66,35 +98,19 @@ public String getTag() { return actualProcessor.getTag(); } - public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList, - Set pipelinesSeen) { + public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList) { List processors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getProcessors()) { - if (processor instanceof PipelineProcessor) { - PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); - if (pipelinesSeen.add(pipelineProcessor) == false) { - throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); - } - processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen)); - pipelinesSeen.remove(pipelineProcessor); - } else if (processor instanceof CompoundProcessor) { - processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); + if (processor instanceof CompoundProcessor) { + processors.add(decorate((CompoundProcessor) processor, processorResultList)); } else { processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } } List onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getOnFailureProcessors()) { - if (processor instanceof PipelineProcessor) { - PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); - if (pipelinesSeen.add(pipelineProcessor) == false) { - throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); - } - onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, - pipelinesSeen)); - pipelinesSeen.remove(pipelineProcessor); - } else if (processor instanceof CompoundProcessor) { - onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); + if (processor instanceof CompoundProcessor) { + onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList)); } else { onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index fbb46fc8787af..2c047283ed1bb 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -21,17 +21,22 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ingest.SimulateProcessorResult; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.script.MockScriptEngine; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.ScriptType; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; @@ -39,10 +44,11 @@ import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -50,13 +56,11 @@ public class TrackingResultProcessorTests extends ESTestCase { private IngestDocument ingestDocument; private List resultList; - private Set pipelinesSeen; @Before public void init() { ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); resultList = new ArrayList<>(); - pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); } public void testActualProcessor() throws Exception { @@ -76,9 +80,9 @@ public void testActualProcessor() throws Exception { public void testActualCompoundProcessorWithoutOnFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); try { trackingProcessor.execute(ingestDocument); @@ -97,14 +101,14 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception { public void testActualCompoundProcessorWithOnFailure() throws Exception { RuntimeException exception = new RuntimeException("fail"); - TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); + TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); CompoundProcessor actualProcessor = new CompoundProcessor(false, Arrays.asList(new CompoundProcessor(false, Arrays.asList(failProcessor, onFailureProcessor), Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor)); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); @@ -139,10 +143,10 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception { public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); @@ -154,6 +158,45 @@ public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); } + public void testActualCompoundProcessorWithFalseConditional() throws Exception { + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + String scriptName = "conditionalScript"; + ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> false), Collections.emptyMap())), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + + CompoundProcessor compoundProcessor = new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new ConditionalProcessor( + randomAlphaOfLength(10), + new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); })), + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })); + + CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList); + trackingProcessor.execute(ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument); + + //the step for key 2 is never executed due to conditional and thus not part of the result set + assertThat(resultList.size(), equalTo(2)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), nullValue()); + } + public void testActualPipelineProcessor() throws Exception { String pipelineId = "pipeline1"; IngestService ingestService = mock(IngestService.class); @@ -176,13 +219,13 @@ pipelineId, null, null, new CompoundProcessor( PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - verify(ingestService).getPipeline(pipelineId); + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(3)); assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); @@ -198,6 +241,142 @@ pipelineId, null, null, new CompoundProcessor( assertThat(resultList.get(2).getProcessorTag(), nullValue()); } + public void testActualPipelineProcessorWithTrueConditional() throws Exception { + String pipelineId1 = "pipeline1"; + String pipelineId2 = "pipeline2"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig0 = new HashMap<>(); + pipelineConfig0.put("name", pipelineId1); + Map pipelineConfig1 = new HashMap<>(); + pipelineConfig1.put("name", pipelineId1); + Map pipelineConfig2 = new HashMap<>(); + pipelineConfig2.put("name", pipelineId2); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + String scriptName = "conditionalScript"; + + ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> true), Collections.emptyMap())), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + + Pipeline pipeline1 = new Pipeline( + pipelineId1, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new ConditionalProcessor( + randomAlphaOfLength(10), + new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, + factory.create(Collections.emptyMap(), null, pipelineConfig2)), + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key3, randomInt()); }) + ) + ); + + Pipeline pipeline2 = new Pipeline( + pipelineId2, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); }))); + + when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1); + when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + + trackingProcessor.execute(ingestDocument); + + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId2); + assertThat(resultList.size(), equalTo(3)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(2).getFailure(), nullValue()); + assertThat(resultList.get(2).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithFalseConditional() throws Exception { + String pipelineId1 = "pipeline1"; + String pipelineId2 = "pipeline2"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig0 = new HashMap<>(); + pipelineConfig0.put("name", pipelineId1); + Map pipelineConfig1 = new HashMap<>(); + pipelineConfig1.put("name", pipelineId1); + Map pipelineConfig2 = new HashMap<>(); + pipelineConfig2.put("name", pipelineId2); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + String scriptName = "conditionalScript"; + + ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> false), Collections.emptyMap())), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + + Pipeline pipeline1 = new Pipeline( + pipelineId1, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new ConditionalProcessor( + randomAlphaOfLength(10), + new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, + factory.create(Collections.emptyMap(), null, pipelineConfig2)), + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key3, randomInt()); }) + ) + ); + + Pipeline pipeline2 = new Pipeline( + pipelineId2, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); }))); + + when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1); + when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + + trackingProcessor.execute(ingestDocument); + + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); + verify(ingestService, Mockito.never()).getPipeline(pipelineId2); + assertThat(resultList.size(), equalTo(2)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), nullValue()); + } + public void testActualPipelineProcessorWithHandledFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); @@ -226,13 +405,13 @@ pipelineId, null, null, new CompoundProcessor( PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - verify(ingestService).getPipeline(pipelineId); + verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(4)); assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); @@ -253,25 +432,36 @@ pipelineId, null, null, new CompoundProcessor( } public void testActualPipelineProcessorWithCycle() throws Exception { - String pipelineId = "pipeline1"; + String pipelineId1 = "pipeline1"; + String pipelineId2 = "pipeline2"; IngestService ingestService = mock(IngestService.class); - Map pipelineConfig = new HashMap<>(); - pipelineConfig.put("name", pipelineId); + Map pipelineConfig0 = new HashMap<>(); + pipelineConfig0.put("name", pipelineId1); + Map pipelineConfig1 = new HashMap<>(); + pipelineConfig1.put("name", pipelineId1); + Map pipelineConfig2 = new HashMap<>(); + pipelineConfig2.put("name", pipelineId2); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); - PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); - Pipeline pipeline = new Pipeline( - pipelineId, null, null, new CompoundProcessor(pipelineProcessor) - ); - when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + Pipeline pipeline1 = new Pipeline( + pipelineId1, null, null, new CompoundProcessor(factory.create(Collections.emptyMap(), null, pipelineConfig2))); + + Pipeline pipeline2 = new Pipeline( + pipelineId2, null, null, new CompoundProcessor(factory.create(Collections.emptyMap(), null, pipelineConfig1))); + when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1); + when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - IllegalStateException exception = expectThrows(IllegalStateException.class, - () -> decorate(actualProcessor, resultList, pipelinesSeen)); - assertThat(exception.getMessage(), equalTo("Cycle detected for pipeline: pipeline1")); - } + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> trackingProcessor.execute(ingestDocument)); + assertThat(exception.getCause(), instanceOf(IllegalArgumentException.class)); + assertThat(exception.getCause().getCause(), instanceOf(IllegalStateException.class)); + assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1")); + } public void testActualPipelineProcessorRepeatedInvocation() throws Exception { String pipelineId = "pipeline1"; @@ -284,19 +474,19 @@ public void testActualPipelineProcessorRepeatedInvocation() throws Exception { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); Pipeline pipeline = new Pipeline( pipelineId, null, null, new CompoundProcessor( - new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); })) + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); })) ); when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - verify(ingestService, times(2)).getPipeline(pipelineId); + verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(2)); assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument())));