From 48af2decebc16a527025ab7bc9a6f7091033f758 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 5 Sep 2018 14:02:48 +0200 Subject: [PATCH] INGEST: Allow Repeated Invocation of Pipeline * Allows repeated, non-recursive invocation of the same pipeline --- .../ingest/common/PipelineProcessorTests.java | 16 ++++++++++++++++ .../org/elasticsearch/ingest/IngestDocument.java | 10 +++++++--- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java index 5baf3cf822d72..38d6cb2e84479 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java @@ -112,4 +112,20 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception { "Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage() ); } + + public void testAllowsRepeatedPipelineInvocations() throws Exception { + String innerPipelineId = "inner"; + IngestService ingestService = mock(IngestService.class); + IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + Map outerConfig = new HashMap<>(); + outerConfig.put("pipeline", innerPipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + Pipeline inner = new Pipeline( + innerPipelineId, null, null, new CompoundProcessor() + ); + when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); + Processor outerProc = factory.create(Collections.emptyMap(), null, outerConfig); + outerProc.execute(testIngestDocument); + outerProc.execute(testIngestDocument); + } } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index e218168eeb7b5..95bdea74d6929 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -645,10 +645,14 @@ private static Object deepCopy(Object value) { * @throws Exception On exception in pipeline execution */ public void executePipeline(Pipeline pipeline) throws Exception { - if (this.executedPipelines.add(pipeline) == false) { - throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected."); + try { + if (this.executedPipelines.add(pipeline) == false) { + throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected."); + } + pipeline.execute(this); + } finally { + executedPipelines.remove(pipeline); } - pipeline.execute(this); } @Override