Skip to content

Commit

Permalink
INGEST: Allow Repeated Invocation of Pipeline (elastic#33419)
Browse files Browse the repository at this point in the history
* Allows repeated, non-recursive invocation
of the same pipeline
  • Loading branch information
original-brownbear authored and jakelandis committed Oct 21, 2018
1 parent 28bbf45 commit f1db75d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,20 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception {
"Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage()
);
}

public void testAllowsRepeatedPipelineInvocations() throws Exception {
String innerPipelineId = "inner";
IngestService ingestService = mock(IngestService.class);
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Map<String, Object> outerConfig = new HashMap<>();
outerConfig.put("pipeline", innerPipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Pipeline inner = new Pipeline(
innerPipelineId, null, null, new CompoundProcessor()
);
when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner);
Processor outerProc = factory.create(Collections.emptyMap(), null, outerConfig);
outerProc.execute(testIngestDocument);
outerProc.execute(testIngestDocument);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -649,10 +649,14 @@ private static Object deepCopy(Object value) {
* @throws Exception On exception in pipeline execution
*/
public IngestDocument executePipeline(Pipeline pipeline) throws Exception {
if (this.executedPipelines.add(pipeline) == false) {
throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
try {
if (this.executedPipelines.add(pipeline) == false) {
throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
}
return pipeline.execute(this);
} finally {
executedPipelines.remove(pipeline);
}
return pipeline.execute(this);
}

@Override
Expand Down

0 comments on commit f1db75d

Please sign in to comment.