From 025251e8d39bc09cb7d3348676d69922a8b310e1 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Thu, 20 Sep 2018 08:33:07 -0500 Subject: [PATCH] ingest: support simulate with verbose for pipeline processor (#33839) * ingest: support simulate with verbose for pipeline processor This change better supports the use of simulate?verbose with the pipeline processor. Prior to this change any pipeline processors executed with simulate?verbose would not show all intermediate processors for the inner pipelines. This changes also moves the PipelineProcess and TrackingResultProcessor classes to enable instance checks and to avoid overly public classes. As well this updates the error message for when cycles are detected in pipelines calling other pipelines. --- .../ingest/common/IngestCommonPlugin.java | 1 + .../test/ingest/210_pipeline_processor.yml | 2 +- .../rest-api-spec/test/ingest/90_simulate.yml | 147 ++++++++ .../ingest/SimulateExecutionService.java | 14 +- .../elasticsearch/ingest/IngestDocument.java | 2 +- .../ingest}/PipelineProcessor.java | 12 +- .../ingest/TrackingResultProcessor.java | 35 +- .../ingest/TrackingResultProcessorTests.java | 149 --------- .../ingest}/PipelineProcessorTests.java | 5 +- .../ingest/TrackingResultProcessorTests.java | 315 ++++++++++++++++++ 10 files changed, 509 insertions(+), 173 deletions(-) rename {modules/ingest-common/src/main/java/org/elasticsearch/ingest/common => server/src/main/java/org/elasticsearch/ingest}/PipelineProcessor.java (87%) rename server/src/main/java/org/elasticsearch/{action => }/ingest/TrackingResultProcessor.java (65%) delete mode 100644 server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java rename {modules/ingest-common/src/test/java/org/elasticsearch/ingest/common => server/src/test/java/org/elasticsearch/ingest}/PipelineProcessorTests.java (97%) create mode 100644 server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index d9dba2cc10073..d58a48e70c9ad 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.grok.Grok; import org.elasticsearch.grok.ThreadWatchdog; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.IngestPlugin; diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index 355ba2d42104a..c7c5df1e06f99 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -110,4 +110,4 @@ teardown: pipeline: "outer" body: {} - match: { error.root_cause.0.type: "exception" } -- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Recursive invocation of pipeline [inner] detected." } +- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: inner" } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 8b3ed313314bb..fca80ab8fac1d 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -605,3 +605,150 @@ teardown: - length: { docs.0.processor_results.1: 2 } - match: { docs.0.processor_results.1.tag: "rename-1" } - match: { docs.0.processor_results.1.doc._source.new_status: 200 } + +--- +"Test verbose simulate with Pipeline Processor with Circular Pipelines": +- do: + ingest.put_pipeline: + id: "outer" + body: > + { + "description" : "outer pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "inner" + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.put_pipeline: + id: "inner" + body: > + { + "description" : "inner pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "outer" + } + } + ] + } +- match: { acknowledged: true } + +- do: + catch: /illegal_state_exception/ + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "processors" : [ + { + "pipeline" : { + "pipeline": "outer" + } + } + ] + } + , + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "field1": "123.42 400 " + } + } + ] + } +- match: { error.root_cause.0.type: "illegal_state_exception" } +- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" } + +--- +"Test verbose simulate with Pipeline Processor with Multiple Pipelines": +- do: + ingest.put_pipeline: + id: "pipeline1" + body: > + { + "processors": [ + { + "set": { + "field": "pipeline1", + "value": true + } + }, + { + "pipeline": { + "pipeline": "pipeline2" + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.put_pipeline: + id: "pipeline2" + body: > + { + "processors": [ + { + "set": { + "field": "pipeline2", + "value": true + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "processors": [ + { + "set": { + "field": "pipeline0", + "value": true + } + }, + { + "pipeline": { + "pipeline": "pipeline1" + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "field1": "123.42 400 " + } + } + ] + } +- length: { docs: 1 } +- length: { docs.0.processor_results: 3 } +- match: { docs.0.processor_results.0.doc._source.pipeline0: true } +- is_false: docs.0.processor_results.0.doc._source.pipeline1 +- is_false: docs.0.processor_results.0.doc._source.pipeline2 +- match: { docs.0.processor_results.1.doc._source.pipeline0: true } +- match: { docs.0.processor_results.1.doc._source.pipeline1: true } +- is_false: docs.0.processor_results.1.doc._source.pipeline2 +- match: { docs.0.processor_results.2.doc._source.pipeline0: true } +- match: { docs.0.processor_results.2.doc._source.pipeline1: true } +- match: { docs.0.processor_results.2.doc._source.pipeline2: true } + diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 430da9955bafa..c081707f4dbda 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -24,12 +24,16 @@ import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.CompoundProcessor; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; +import java.util.Set; -import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate; +import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; class SimulateExecutionService { @@ -42,11 +46,15 @@ class SimulateExecutionService { } SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { + // Prevent cycles in pipeline decoration + final Set pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); if (verbose) { List processorResultList = new ArrayList<>(); - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen); try { - verbosePipelineProcessor.execute(ingestDocument); + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline); return new SimulateDocumentVerboseResult(processorResultList); } catch (Exception e) { return new SimulateDocumentVerboseResult(processorResultList); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index faf0b49de3a49..1ad2d0f747221 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -651,7 +651,7 @@ private static Object deepCopy(Object value) { public IngestDocument executePipeline(Pipeline pipeline) throws Exception { try { if (this.executedPipelines.add(pipeline) == false) { - throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected."); + throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()); } return pipeline.execute(this); } finally { diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java similarity index 87% rename from modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java rename to server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index 1958a3e5232b8..918ff6b8aefee 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -17,15 +17,9 @@ * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; import java.util.Map; -import org.elasticsearch.ingest.AbstractProcessor; -import org.elasticsearch.ingest.ConfigurationUtils; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.Processor; public class PipelineProcessor extends AbstractProcessor { @@ -50,6 +44,10 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { return ingestDocument.executePipeline(pipeline); } + Pipeline getPipeline(){ + return ingestService.getPipeline(pipelineName); + } + @Override public String getType() { return TYPE; diff --git a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java similarity index 65% rename from server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java rename to server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 04c0fe7ca49dc..41a984be5adad 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -17,14 +17,13 @@ * under the License. */ -package org.elasticsearch.action.ingest; +package org.elasticsearch.ingest; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Processor; +import org.elasticsearch.action.ingest.SimulateProcessorResult; import java.util.ArrayList; import java.util.List; +import java.util.Set; /** * Processor to be used within Simulate API to keep track of processors executed in pipeline. @@ -35,7 +34,7 @@ public final class TrackingResultProcessor implements Processor { private final List processorResultList; private final boolean ignoreFailure; - public TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List processorResultList) { + TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List processorResultList) { this.ignoreFailure = ignoreFailure; this.processorResultList = processorResultList; this.actualProcessor = actualProcessor; @@ -67,19 +66,35 @@ public String getTag() { return actualProcessor.getTag(); } - public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList) { + public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList, + Set pipelinesSeen) { List processors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getProcessors()) { - if (processor instanceof CompoundProcessor) { - processors.add(decorate((CompoundProcessor) processor, processorResultList)); + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + if (pipelinesSeen.add(pipelineProcessor) == false) { + throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); + } + processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen)); + pipelinesSeen.remove(pipelineProcessor); + } else if (processor instanceof CompoundProcessor) { + processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); } else { processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } } List onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getOnFailureProcessors()) { - if (processor instanceof CompoundProcessor) { - onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList)); + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + if (pipelinesSeen.add(pipelineProcessor) == false) { + throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); + } + onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, + pipelinesSeen)); + pipelinesSeen.remove(pipelineProcessor); + } else if (processor instanceof CompoundProcessor) { + onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); } else { onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java deleted file mode 100644 index 3572a04529b43..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.ingest; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ingest.TestProcessor; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; -import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; -import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; -import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; - -public class TrackingResultProcessorTests extends ESTestCase { - - private IngestDocument ingestDocument; - private List resultList; - - @Before - public void init() { - ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); - resultList = new ArrayList<>(); - } - - public void testActualProcessor() throws Exception { - TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); - TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); - - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - - assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); - assertThat(resultList.size(), equalTo(1)); - - assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(0).getFailure(), nullValue()); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); - } - - public void testActualCompoundProcessorWithoutOnFailure() throws Exception { - RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); - CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - - try { - trackingProcessor.execute(ingestDocument); - fail("processor should throw exception"); - } catch (ElasticsearchException e) { - assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage())); - } - - SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); - assertThat(testProcessor.getInvokedCounter(), equalTo(1)); - assertThat(resultList.size(), equalTo(1)); - assertThat(resultList.get(0).getIngestDocument(), nullValue()); - assertThat(resultList.get(0).getFailure(), equalTo(exception)); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag())); - } - - public void testActualCompoundProcessorWithOnFailure() throws Exception { - RuntimeException exception = new RuntimeException("fail"); - TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); - TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); - CompoundProcessor actualProcessor = new CompoundProcessor(false, - Arrays.asList(new CompoundProcessor(false, - Arrays.asList(failProcessor, onFailureProcessor), - Arrays.asList(onFailureProcessor, failProcessor))), - Arrays.asList(onFailureProcessor)); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); - - SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); - SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); - - assertThat(failProcessor.getInvokedCounter(), equalTo(2)); - assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); - assertThat(resultList.size(), equalTo(4)); - - assertThat(resultList.get(0).getIngestDocument(), nullValue()); - assertThat(resultList.get(0).getFailure(), equalTo(exception)); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); - - Map metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); - assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); - assertThat(resultList.get(1).getFailure(), nullValue()); - assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); - - assertThat(resultList.get(2).getIngestDocument(), nullValue()); - assertThat(resultList.get(2).getFailure(), equalTo(exception)); - assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); - - metadata = resultList.get(3).getIngestDocument().getIngestMetadata(); - assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); - assertThat(resultList.get(3).getFailure(), nullValue()); - assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); - } - - public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { - RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); - CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), - Collections.emptyList()); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - - trackingProcessor.execute(ingestDocument); - - SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); - assertThat(testProcessor.getInvokedCounter(), equalTo(1)); - assertThat(resultList.size(), equalTo(1)); - assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(0).getFailure(), sameInstance(exception)); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); - } -} diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java similarity index 97% rename from modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java rename to server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 6e18bac40d4aa..99fa7633d085a 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; import java.util.Collections; import java.util.HashMap; @@ -27,6 +27,7 @@ import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.test.ESTestCase; @@ -110,7 +111,7 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception { () -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument) ); assertEquals( - "Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage() + "Cycle detected for pipeline: inner", e.getRootCause().getMessage() ); } diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java new file mode 100644 index 0000000000000..7a7f9b773727f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -0,0 +1,315 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ingest.SimulateProcessorResult; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; +import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; +import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; +import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TrackingResultProcessorTests extends ESTestCase { + + private IngestDocument ingestDocument; + private List resultList; + private Set pipelinesSeen; + + @Before + public void init() { + ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); + resultList = new ArrayList<>(); + pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); + } + + public void testActualProcessor() throws Exception { + TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); + TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + + assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(0).getFailure(), nullValue()); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithoutOnFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + try { + trackingProcessor.execute(ingestDocument); + fail("processor should throw exception"); + } catch (ElasticsearchException e) { + assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage())); + } + + SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); + assertThat(testProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + assertThat(resultList.get(0).getIngestDocument(), nullValue()); + assertThat(resultList.get(0).getFailure(), equalTo(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithOnFailure() throws Exception { + RuntimeException exception = new RuntimeException("fail"); + TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); + TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); + CompoundProcessor actualProcessor = new CompoundProcessor(false, + Arrays.asList(new CompoundProcessor(false, + Arrays.asList(failProcessor, onFailureProcessor), + Arrays.asList(onFailureProcessor, failProcessor))), + Arrays.asList(onFailureProcessor)); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); + + assertThat(failProcessor.getInvokedCounter(), equalTo(2)); + assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); + assertThat(resultList.size(), equalTo(4)); + + assertThat(resultList.get(0).getIngestDocument(), nullValue()); + assertThat(resultList.get(0).getFailure(), equalTo(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + + Map metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); + assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); + + assertThat(resultList.get(2).getIngestDocument(), nullValue()); + assertThat(resultList.get(2).getFailure(), equalTo(exception)); + assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + + metadata = resultList.get(3).getIngestDocument().getIngestMetadata(); + assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), + Collections.emptyList()); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); + assertThat(testProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(0).getFailure(), sameInstance(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); + } + + public void testActualPipelineProcessor() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); }), + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(3)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(2).getFailure(), nullValue()); + assertThat(resultList.get(2).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithHandledFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); }), + new CompoundProcessor( + false, + Collections.singletonList(new TestProcessor(ingestDocument -> { throw exception; })), + Collections.singletonList(new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); })) + ), + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(4)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + //failed processor + assertNull(resultList.get(1).getIngestDocument()); + assertThat(resultList.get(1).getFailure().getMessage(), equalTo(exception.getMessage())); + + assertTrue(resultList.get(2).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(2).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(2).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithCycle() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor(pipelineProcessor) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + IllegalStateException exception = expectThrows(IllegalStateException.class, + () -> decorate(actualProcessor, resultList, pipelinesSeen)); + assertThat(exception.getMessage(), equalTo("Cycle detected for pipeline: pipeline1")); + } + + + public void testActualPipelineProcessorRepeatedInvocation() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService, times(2)).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(2)); + + assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument()))); + assertThat(resultList.get(0).getFailure(), nullValue()); + assertThat(resultList.get(0).getProcessorTag(), nullValue()); + + assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), nullValue()); + + //each invocation updates key1 with a random int + assertNotEquals(resultList.get(0).getIngestDocument().getSourceAndMetadata().get(key1), + resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1)); + } + +}