diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java index 3103fb0392e96..6e18bac40d4aa 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java @@ -113,4 +113,20 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception { "Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage() ); } + + public void testAllowsRepeatedPipelineInvocations() throws Exception { + String innerPipelineId = "inner"; + IngestService ingestService = mock(IngestService.class); + IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + Map outerConfig = new HashMap<>(); + outerConfig.put("pipeline", innerPipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + Pipeline inner = new Pipeline( + innerPipelineId, null, null, new CompoundProcessor() + ); + when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); + Processor outerProc = factory.create(Collections.emptyMap(), null, outerConfig); + outerProc.execute(testIngestDocument); + outerProc.execute(testIngestDocument); + } } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 5f122358d0c43..10cb2fd17fec6 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -645,10 +645,14 @@ private static Object deepCopy(Object value) { * @throws Exception On exception in pipeline execution */ public IngestDocument executePipeline(Pipeline pipeline) throws Exception { - if (this.executedPipelines.add(pipeline) == false) { - throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected."); + try { + if (this.executedPipelines.add(pipeline) == false) { + throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected."); + } + return pipeline.execute(this); + } finally { + executedPipelines.remove(pipeline); } - return pipeline.execute(this); } @Override