From 19af20d1f3eaf7d49fd274471451deb99ffc30e9 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Tue, 18 Sep 2018 16:17:31 -0500 Subject: [PATCH 1/4] ingest: support simulate with verbose for pipeline processor This change better supports the use of simulate?verbose with the pipeline processor. Prior to this change any pipeline processors executed with simulate?verbose would not show the full steps that include the intermediate processors for the inner pipelines. This changes also moves the PipelineProcess and TrackingResultProcessor classes to enable instance checks and to avoid overly public classes. --- .../ingest/common/IngestCommonPlugin.java | 1 + .../rest-api-spec/test/ingest/90_simulate.yml | 147 +++++++++++++++ .../ingest/SimulateExecutionService.java | 14 +- .../ingest}/PipelineProcessor.java | 12 +- .../ingest/TrackingResultProcessor.java | 35 +++- .../ingest}/PipelineProcessorTests.java | 3 +- .../ingest/TrackingResultProcessorTests.java | 171 ++++++++++++++++-- 7 files changed, 351 insertions(+), 32 deletions(-) rename {modules/ingest-common/src/main/java/org/elasticsearch/ingest/common => server/src/main/java/org/elasticsearch/ingest}/PipelineProcessor.java (87%) rename server/src/main/java/org/elasticsearch/{action => }/ingest/TrackingResultProcessor.java (66%) rename {modules/ingest-common/src/test/java/org/elasticsearch/ingest/common => server/src/test/java/org/elasticsearch/ingest}/PipelineProcessorTests.java (98%) rename server/src/test/java/org/elasticsearch/{action => }/ingest/TrackingResultProcessorTests.java (50%) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index d9dba2cc10073..d58a48e70c9ad 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.grok.Grok; import org.elasticsearch.grok.ThreadWatchdog; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.IngestPlugin; diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 776a8af0c2420..5596227bbe0fc 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -605,3 +605,150 @@ teardown: - length: { docs.0.processor_results.1: 2 } - match: { docs.0.processor_results.1.tag: "rename-1" } - match: { docs.0.processor_results.1.doc._source.new_status: 200 } + +--- +"Test verbose simulate with Pipeline Processor with Circular Pipelines": +- do: + ingest.put_pipeline: + id: "outer" + body: > + { + "description" : "outer pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "inner" + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.put_pipeline: + id: "inner" + body: > + { + "description" : "inner pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "outer" + } + } + ] + } +- match: { acknowledged: true } + +- do: + catch: /illegal_state_exception/ + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "processors" : [ + { + "pipeline" : { + "pipeline": "outer" + } + } + ] + } + , + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "field1": "123.42 400 " + } + } + ] + } +- match: { error.root_cause.0.type: "illegal_state_exception" } +- match: { error.root_cause.0.reason: "Recursive invocation of pipeline [inner] detected." } + +--- +"Test verbose simulate with Pipeline Processor with Multiple Pipelines": +- do: + ingest.put_pipeline: + id: "pipeline1" + body: > + { + "processors": [ + { + "set": { + "field": "pipeline1", + "value": true + } + }, + { + "pipeline": { + "pipeline": "pipeline2" + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.put_pipeline: + id: "pipeline2" + body: > + { + "processors": [ + { + "set": { + "field": "pipeline2", + "value": true + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "processors": [ + { + "set": { + "field": "pipeline0", + "value": true + } + }, + { + "pipeline": { + "pipeline": "pipeline1" + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "field1": "123.42 400 " + } + } + ] + } +- length: { docs: 1 } +- length: { docs.0.processor_results: 3 } +- match: { docs.0.processor_results.0.doc._source.pipeline0: true } +- is_false: docs.0.processor_results.0.doc._source.pipeline1 +- is_false: docs.0.processor_results.0.doc._source.pipeline2 +- match: { docs.0.processor_results.1.doc._source.pipeline0: true } +- match: { docs.0.processor_results.1.doc._source.pipeline1: true } +- is_false: docs.0.processor_results.1.doc._source.pipeline2 +- match: { docs.0.processor_results.2.doc._source.pipeline0: true } +- match: { docs.0.processor_results.2.doc._source.pipeline1: true } +- match: { docs.0.processor_results.2.doc._source.pipeline2: true } + diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 430da9955bafa..c081707f4dbda 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -24,12 +24,16 @@ import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.CompoundProcessor; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; +import java.util.Set; -import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate; +import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; class SimulateExecutionService { @@ -42,11 +46,15 @@ class SimulateExecutionService { } SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { + // Prevent cycles in pipeline decoration + final Set pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); if (verbose) { List processorResultList = new ArrayList<>(); - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen); try { - verbosePipelineProcessor.execute(ingestDocument); + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline); return new SimulateDocumentVerboseResult(processorResultList); } catch (Exception e) { return new SimulateDocumentVerboseResult(processorResultList); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java similarity index 87% rename from modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java rename to server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index 1958a3e5232b8..918ff6b8aefee 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -17,15 +17,9 @@ * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; import java.util.Map; -import org.elasticsearch.ingest.AbstractProcessor; -import org.elasticsearch.ingest.ConfigurationUtils; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.Processor; public class PipelineProcessor extends AbstractProcessor { @@ -50,6 +44,10 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { return ingestDocument.executePipeline(pipeline); } + Pipeline getPipeline(){ + return ingestService.getPipeline(pipelineName); + } + @Override public String getType() { return TYPE; diff --git a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java similarity index 66% rename from server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java rename to server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 04c0fe7ca49dc..4c7ea8142e3a5 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -17,14 +17,13 @@ * under the License. */ -package org.elasticsearch.action.ingest; +package org.elasticsearch.ingest; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Processor; +import org.elasticsearch.action.ingest.SimulateProcessorResult; import java.util.ArrayList; import java.util.List; +import java.util.Set; /** * Processor to be used within Simulate API to keep track of processors executed in pipeline. @@ -35,7 +34,7 @@ public final class TrackingResultProcessor implements Processor { private final List processorResultList; private final boolean ignoreFailure; - public TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List processorResultList) { + TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List processorResultList) { this.ignoreFailure = ignoreFailure; this.processorResultList = processorResultList; this.actualProcessor = actualProcessor; @@ -67,19 +66,35 @@ public String getTag() { return actualProcessor.getTag(); } - public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList) { + public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList, + Set pipelinesSeen) { List processors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getProcessors()) { - if (processor instanceof CompoundProcessor) { - processors.add(decorate((CompoundProcessor) processor, processorResultList)); + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + if (pipelinesSeen.add(pipelineProcessor) == false) { + throw new IllegalStateException("Recursive invocation of pipeline [" + + pipelineProcessor.getPipeline().getId() + "] detected."); + } + processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen)); + } else if (processor instanceof CompoundProcessor) { + processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); } else { processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } } List onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getOnFailureProcessors()) { - if (processor instanceof CompoundProcessor) { - onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList)); + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + if (pipelinesSeen.add(pipelineProcessor) == false) { + throw new IllegalStateException("Recursive invocation of pipeline [" + + pipelineProcessor.getPipeline().getId() + "] detected."); + } + onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, + pipelinesSeen)); + } else if (processor instanceof CompoundProcessor) { + onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); } else { onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java similarity index 98% rename from modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java rename to server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 6e18bac40d4aa..509940ecafa9a 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; import java.util.Collections; import java.util.HashMap; @@ -27,6 +27,7 @@ import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.test.ESTestCase; diff --git a/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java similarity index 50% rename from server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java rename to server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index 3572a04529b43..307da6603c758 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -17,12 +17,14 @@ * under the License. */ -package org.elasticsearch.action.ingest; +package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ingest.SimulateProcessorResult; import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.TrackingResultProcessor; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -30,30 +32,38 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; -import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate; +import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TrackingResultProcessorTests extends ESTestCase { private IngestDocument ingestDocument; private List resultList; + private Set pipelinesSeen; @Before public void init() { ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); resultList = new ArrayList<>(); + pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); } public void testActualProcessor() throws Exception { - TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); + TestProcessor actualProcessor = new TestProcessor(ingestDocument -> { + }); TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); trackingProcessor.execute(ingestDocument); @@ -69,9 +79,11 @@ public void testActualProcessor() throws Exception { public void testActualCompoundProcessorWithoutOnFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { + throw exception; + }); CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); try { trackingProcessor.execute(ingestDocument); @@ -90,14 +102,17 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception { public void testActualCompoundProcessorWithOnFailure() throws Exception { RuntimeException exception = new RuntimeException("fail"); - TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); - TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); + TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { + throw exception; + }); + TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> { + }); CompoundProcessor actualProcessor = new CompoundProcessor(false, Arrays.asList(new CompoundProcessor(false, Arrays.asList(failProcessor, onFailureProcessor), Arrays.asList(onFailureProcessor, failProcessor))), - Arrays.asList(onFailureProcessor)); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + Arrays.asList(onFailureProcessor)); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); @@ -132,10 +147,12 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception { public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { + throw exception; + }); CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); trackingProcessor.execute(ingestDocument); @@ -146,4 +163,136 @@ public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { assertThat(resultList.get(0).getFailure(), sameInstance(exception)); assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); } + + public void testActualPipelineProcessor() throws Exception { + String innerPipelineId = "inner"; + IngestService ingestService = mock(IngestService.class); + Map outerConfig = new HashMap<>(); + outerConfig.put("pipeline", innerPipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + Pipeline inner = new Pipeline( + innerPipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { + ingestDocument.setFieldValue(key1, randomInt()); + }), + new TestProcessor(ingestDocument -> { + ingestDocument.setFieldValue(key2, randomInt()); + }), + new TestProcessor(ingestDocument -> { + ingestDocument.setFieldValue(key3, randomInt()); + })) + ); + when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, outerConfig); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService).getPipeline(innerPipelineId); + assertThat(resultList.size(), equalTo(3)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(2).getFailure(), nullValue()); + assertThat(resultList.get(2).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithHandledFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + + String innerPipelineId = "inner"; + IngestService ingestService = mock(IngestService.class); + Map outerConfig = new HashMap<>(); + outerConfig.put("pipeline", innerPipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + Pipeline inner = new Pipeline( + innerPipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { + ingestDocument.setFieldValue(key1, randomInt()); + }), + new CompoundProcessor( + false, + Collections.singletonList(new TestProcessor(ingestDocument -> { + throw exception; + })), + Collections.singletonList(new TestProcessor(ingestDocument -> { + ingestDocument.setFieldValue(key2, randomInt()); + })) + ), + new TestProcessor(ingestDocument -> { + ingestDocument.setFieldValue(key3, randomInt()); + })) + ); + when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, outerConfig); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService).getPipeline(innerPipelineId); + assertThat(resultList.size(), equalTo(4)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + //failed processor + assertNull(resultList.get(1).getIngestDocument()); + assertThat(resultList.get(1).getFailure().getMessage(), equalTo(exception.getMessage())); + + assertTrue(resultList.get(2).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(2).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(2).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithCycle() throws Exception { + String innerPipelineId = "inner"; + IngestService ingestService = mock(IngestService.class); + Map outerConfig = new HashMap<>(); + outerConfig.put("pipeline", innerPipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + PipelineProcessor outerPipeline = factory.create(Collections.emptyMap(), null, outerConfig); + Pipeline inner = new Pipeline( + innerPipelineId, null, null, new CompoundProcessor(outerPipeline) + ); + when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); + + CompoundProcessor actualProcessor = new CompoundProcessor(outerPipeline); + + IllegalStateException exception = expectThrows(IllegalStateException.class, + () -> decorate(actualProcessor, resultList, pipelinesSeen)); + assertThat(exception.getMessage(), equalTo("Recursive invocation of pipeline [inner] detected.")); + } } From 7f0d154d1b0fcc99f6edbf79c3c280adc43a547c Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 19 Sep 2018 08:39:21 -0500 Subject: [PATCH 2/4] fix noisy re-formatting --- .../ingest/TrackingResultProcessorTests.java | 42 +++++-------------- 1 file changed, 11 insertions(+), 31 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index 307da6603c758..6dffe879b7b0d 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -62,8 +62,7 @@ public void init() { } public void testActualProcessor() throws Exception { - TestProcessor actualProcessor = new TestProcessor(ingestDocument -> { - }); + TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); trackingProcessor.execute(ingestDocument); @@ -79,9 +78,7 @@ public void testActualProcessor() throws Exception { public void testActualCompoundProcessorWithoutOnFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { - throw exception; - }); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); @@ -102,11 +99,8 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception { public void testActualCompoundProcessorWithOnFailure() throws Exception { RuntimeException exception = new RuntimeException("fail"); - TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { - throw exception; - }); - TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> { - }); + TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); + TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); CompoundProcessor actualProcessor = new CompoundProcessor(false, Arrays.asList(new CompoundProcessor(false, Arrays.asList(failProcessor, onFailureProcessor), @@ -147,9 +141,7 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception { public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { - throw exception; - }); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); @@ -177,15 +169,9 @@ public void testActualPipelineProcessor() throws Exception { Pipeline inner = new Pipeline( innerPipelineId, null, null, new CompoundProcessor( - new TestProcessor(ingestDocument -> { - ingestDocument.setFieldValue(key1, randomInt()); - }), - new TestProcessor(ingestDocument -> { - ingestDocument.setFieldValue(key2, randomInt()); - }), - new TestProcessor(ingestDocument -> { - ingestDocument.setFieldValue(key3, randomInt()); - })) + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); }), + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })) ); when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); @@ -234,16 +220,10 @@ innerPipelineId, null, null, new CompoundProcessor( }), new CompoundProcessor( false, - Collections.singletonList(new TestProcessor(ingestDocument -> { - throw exception; - })), - Collections.singletonList(new TestProcessor(ingestDocument -> { - ingestDocument.setFieldValue(key2, randomInt()); - })) + Collections.singletonList(new TestProcessor(ingestDocument -> { throw exception; })), + Collections.singletonList(new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); })) ), - new TestProcessor(ingestDocument -> { - ingestDocument.setFieldValue(key3, randomInt()); - })) + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })) ); when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); From 36eafc98c75e4225a2a79377dce58291ff1fbd00 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 19 Sep 2018 10:11:27 -0500 Subject: [PATCH 3/4] support repeated invocations and clean up variable names in tests --- .../ingest/TrackingResultProcessor.java | 2 + .../ingest/TrackingResultProcessorTests.java | 101 ++++++++++++------ 2 files changed, 71 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 4c7ea8142e3a5..037fe9f7542c2 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -77,6 +77,7 @@ public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, Li + pipelineProcessor.getPipeline().getId() + "] detected."); } processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen)); + pipelinesSeen.remove(pipelineProcessor); } else if (processor instanceof CompoundProcessor) { processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); } else { @@ -93,6 +94,7 @@ public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, Li } onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen)); + pipelinesSeen.remove(pipelineProcessor); } else if (processor instanceof CompoundProcessor) { onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); } else { diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index 6dffe879b7b0d..e8b327c1c046f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -21,10 +21,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ingest.SimulateProcessorResult; -import org.elasticsearch.ingest.TestProcessor; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.TrackingResultProcessor; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -42,9 +38,11 @@ import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -157,25 +155,25 @@ public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { } public void testActualPipelineProcessor() throws Exception { - String innerPipelineId = "inner"; + String pipelineId = "pipeline1"; IngestService ingestService = mock(IngestService.class); - Map outerConfig = new HashMap<>(); - outerConfig.put("pipeline", innerPipelineId); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); String key1 = randomAlphaOfLength(10); String key2 = randomAlphaOfLength(10); String key3 = randomAlphaOfLength(10); - Pipeline inner = new Pipeline( - innerPipelineId, null, null, new CompoundProcessor( + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); }), new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })) ); - when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); - PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, outerConfig); + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); @@ -184,7 +182,7 @@ innerPipelineId, null, null, new CompoundProcessor( SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - verify(ingestService).getPipeline(innerPipelineId); + verify(ingestService).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(3)); assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); @@ -203,21 +201,19 @@ innerPipelineId, null, null, new CompoundProcessor( public void testActualPipelineProcessorWithHandledFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); - String innerPipelineId = "inner"; + String pipelineId = "pipeline1"; IngestService ingestService = mock(IngestService.class); - Map outerConfig = new HashMap<>(); - outerConfig.put("pipeline", innerPipelineId); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); String key1 = randomAlphaOfLength(10); String key2 = randomAlphaOfLength(10); String key3 = randomAlphaOfLength(10); - Pipeline inner = new Pipeline( - innerPipelineId, null, null, new CompoundProcessor( - new TestProcessor(ingestDocument -> { - ingestDocument.setFieldValue(key1, randomInt()); - }), + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); }), new CompoundProcessor( false, Collections.singletonList(new TestProcessor(ingestDocument -> { throw exception; })), @@ -225,9 +221,9 @@ innerPipelineId, null, null, new CompoundProcessor( ), new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })) ); - when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); - PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, outerConfig); + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); @@ -236,7 +232,7 @@ innerPipelineId, null, null, new CompoundProcessor( SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - verify(ingestService).getPipeline(innerPipelineId); + verify(ingestService).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(4)); assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); @@ -257,22 +253,63 @@ innerPipelineId, null, null, new CompoundProcessor( } public void testActualPipelineProcessorWithCycle() throws Exception { - String innerPipelineId = "inner"; + String pipelineId = "pipeline1"; IngestService ingestService = mock(IngestService.class); - Map outerConfig = new HashMap<>(); - outerConfig.put("pipeline", innerPipelineId); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); - PipelineProcessor outerPipeline = factory.create(Collections.emptyMap(), null, outerConfig); - Pipeline inner = new Pipeline( - innerPipelineId, null, null, new CompoundProcessor(outerPipeline) + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor(pipelineProcessor) ); - when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); - CompoundProcessor actualProcessor = new CompoundProcessor(outerPipeline); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); IllegalStateException exception = expectThrows(IllegalStateException.class, () -> decorate(actualProcessor, resultList, pipelinesSeen)); - assertThat(exception.getMessage(), equalTo("Recursive invocation of pipeline [inner] detected.")); + assertThat(exception.getMessage(), equalTo("Recursive invocation of pipeline [pipeline1] detected.")); } + + + public void testActualPipelineProcessorRepeatedInvocation() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService, times(2)).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(2)); + + assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument()))); + assertThat(resultList.get(0).getFailure(), nullValue()); + assertThat(resultList.get(0).getProcessorTag(), nullValue()); + + assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), nullValue()); + + //each invocation updates key1 with a random int + assertNotEquals(resultList.get(0).getIngestDocument().getSourceAndMetadata().get(key1), + resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1)); + } + } From db4d53e55cf576ef127fdb7ce3969b263e3b16e4 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 19 Sep 2018 16:32:42 -0500 Subject: [PATCH 4/4] re-word 'recursive' error case to 'cycle' --- .../rest-api-spec/test/ingest/210_pipeline_processor.yml | 2 +- .../resources/rest-api-spec/test/ingest/90_simulate.yml | 2 +- .../main/java/org/elasticsearch/ingest/IngestDocument.java | 2 +- .../org/elasticsearch/ingest/TrackingResultProcessor.java | 6 ++---- .../org/elasticsearch/ingest/PipelineProcessorTests.java | 2 +- .../elasticsearch/ingest/TrackingResultProcessorTests.java | 2 +- 6 files changed, 7 insertions(+), 9 deletions(-) diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index 355ba2d42104a..c7c5df1e06f99 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -110,4 +110,4 @@ teardown: pipeline: "outer" body: {} - match: { error.root_cause.0.type: "exception" } -- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Recursive invocation of pipeline [inner] detected." } +- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: inner" } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 5596227bbe0fc..46c4fb0a69e58 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -668,7 +668,7 @@ teardown: ] } - match: { error.root_cause.0.type: "illegal_state_exception" } -- match: { error.root_cause.0.reason: "Recursive invocation of pipeline [inner] detected." } +- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" } --- "Test verbose simulate with Pipeline Processor with Multiple Pipelines": diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 10cb2fd17fec6..719558edbf748 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -647,7 +647,7 @@ private static Object deepCopy(Object value) { public IngestDocument executePipeline(Pipeline pipeline) throws Exception { try { if (this.executedPipelines.add(pipeline) == false) { - throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected."); + throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()); } return pipeline.execute(this); } finally { diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 037fe9f7542c2..41a984be5adad 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -73,8 +73,7 @@ public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, Li if (processor instanceof PipelineProcessor) { PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); if (pipelinesSeen.add(pipelineProcessor) == false) { - throw new IllegalStateException("Recursive invocation of pipeline [" - + pipelineProcessor.getPipeline().getId() + "] detected."); + throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); } processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen)); pipelinesSeen.remove(pipelineProcessor); @@ -89,8 +88,7 @@ public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, Li if (processor instanceof PipelineProcessor) { PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); if (pipelinesSeen.add(pipelineProcessor) == false) { - throw new IllegalStateException("Recursive invocation of pipeline [" - + pipelineProcessor.getPipeline().getId() + "] detected."); + throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); } onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen)); diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 509940ecafa9a..99fa7633d085a 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -111,7 +111,7 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception { () -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument) ); assertEquals( - "Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage() + "Cycle detected for pipeline: inner", e.getRootCause().getMessage() ); } diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index e8b327c1c046f..7a7f9b773727f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -269,7 +269,7 @@ pipelineId, null, null, new CompoundProcessor(pipelineProcessor) IllegalStateException exception = expectThrows(IllegalStateException.class, () -> decorate(actualProcessor, resultList, pipelinesSeen)); - assertThat(exception.getMessage(), equalTo("Recursive invocation of pipeline [pipeline1] detected.")); + assertThat(exception.getMessage(), equalTo("Cycle detected for pipeline: pipeline1")); }