diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml index 3be038aca2499..accc30faa21e7 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml @@ -57,3 +57,44 @@ teardown: type: test id: 2 - match: { _source.foo: "blub" } + +--- +"Test Drop Processor On Failure": +- do: + ingest.put_pipeline: + id: "my_pipeline_with_failure" + body: > + { + "description" : "pipeline with on failure drop", + "processors": [ + { + "fail": { + "message": "failed", + "on_failure": [ + { + "drop": {} + } + ] + } + } + ] + } +- match: { acknowledged: true } + +- do: + index: + index: test + type: test + id: 3 + pipeline: "my_pipeline_with_failure" + body: { + foo: "bar" + } + +- do: + catch: missing + get: + index: test + type: test + id: 3 +- match: { found: false } diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 364e19d1f2e8e..5286c6ea7880a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -135,7 +135,9 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { if (onFailureProcessors.isEmpty()) { throw compoundProcessorException; } else { - executeOnFailure(ingestDocument, compoundProcessorException); + if (executeOnFailure(ingestDocument, compoundProcessorException) == false) { + return null; + } break; } } finally { @@ -146,13 +148,17 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { return ingestDocument; } - - void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { + /** + * @return true if execution should continue, false if document is dropped. + */ + boolean executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { try { putFailureMetadata(ingestDocument, exception); for (Processor processor : onFailureProcessors) { try { - processor.execute(ingestDocument); + if (processor.execute(ingestDocument) == null) { + return false; + } } catch (Exception e) { throw newCompoundProcessorException(e, processor.getType(), processor.getTag()); } @@ -160,6 +166,7 @@ void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exce } finally { removeFailureMetadata(ingestDocument); } + return true; } private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) { diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index dabcae533a0bf..24e3dcd76774b 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -129,6 +129,35 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception { assertThat(processor2.getInvokedCounter(), equalTo(1)); } + public void testSingleProcessorWithOnFailureDropProcessor() throws Exception { + TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");}); + Processor processor2 = new Processor() { + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + //Simulates the drop processor + return null; + } + + @Override + public String getType() { + return "drop"; + } + + @Override + public String getTag() { + return null; + } + }; + + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1), + Collections.singletonList(processor2), relativeTimeProvider); + assertNull(compoundProcessor.execute(ingestDocument)); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(compoundProcessor, 1, 1, 0); + } + public void testSingleProcessorWithNestedFailures() throws Exception { TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");}); TestProcessor processorToFail = new TestProcessor("id2", "second", ingestDocument -> {