diff --git a/docs/static/dead-letter-queues.asciidoc b/docs/static/dead-letter-queues.asciidoc index ae377d76986..70afd8337ac 100644 --- a/docs/static/dead-letter-queues.asciidoc +++ b/docs/static/dead-letter-queues.asciidoc @@ -21,9 +21,10 @@ loss in this situation, you can <> to write unsuccessful events to a dead letter queue instead of dropping them. NOTE: The dead letter queue is currently supported only for the -<>. The dead letter queue is used for -documents with response codes of 400 or 404, both of which indicate an event +<> and <>. +The dead letter queue is used for documents with response codes of 400 or 404, both of which indicate an event that cannot be retried. +It's also used when a conditional evaluation encounter an error. Each event written to the dead letter queue includes the original event, metadata that describes the reason the event could not be processed, information @@ -57,7 +58,12 @@ status code per entry to indicate why the action could not be performed. If the DLQ is configured, individual indexing failures are routed there. Even if you regularly process events, events remain in the dead letter queue. -The dead letter queue requires <> to clear it. +The dead letter queue requires <> to clear it. + +[[conditionals-dlq]] +==== Conditional statements and the dead letter queue +When a conditional statement reaches an error in processing an event, such as comparing string and integer values, +the event, as it is at the time of evaluation, is inserted into the dead letter queue. [[configuring-dlq]] ==== Configuring {ls} to use dead letter queues diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index d30872e1430..24bcb3adc0c 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -443,9 +443,8 @@ def flush(options) LogStash::PLUGIN_REGISTRY.add(:output, "spec_sampler_output", PipelineHelpers::SpecSamplerOutput) end - describe "given a pipeline executing an event that would trigger an evaluation error" do + context "given a pipeline executing an event that would trigger an evaluation error" do let(:pipeline) do - settings.set_value("queue.drain", true) LogStash::JavaPipeline.new( org.logstash.config.ir.PipelineConfig.new( LogStash::Config::Source::Local, :main, @@ -470,11 +469,50 @@ def flush(options) pipeline.close end - subject {results.length > 1 ? results : results.first} + describe "when DLQ is disabled" do + let(:settings) do + s = super() + s.set_value("queue.drain", true) + s + end - it "should raise an error without killing the pipeline" do - expect(subject).to be nil - expect(pipeline.last_error_evaluation_received).to match(/no implicit conversion of nil into Integer/) + subject {results.length > 1 ? results : results.first} + + it "should raise an error without killing the pipeline" do + expect(subject).to be nil + expect(pipeline.last_error_evaluation_received).to match(/no implicit conversion of nil into Integer/) + end + end + + describe "when DLQ is enabled" do + let(:dlq_path) { Dir.mktmpdir } + + let(:settings) do + s = super() + s.set_value("queue.drain", true) + s.set_value("pipeline.id", "test_dlq") + s.set_value("dead_letter_queue.enable", true) + s.set_value("path.dead_letter_queue", dlq_path) + s + end + + after do + FileUtils.rm_rf(settings.get_value("path.dead_letter_queue")) + end + + subject {results.length > 1 ? results : results.first} + + it "should raise an error without killing the pipeline and insert the event into DLQ" do + expect(subject).to be nil + expect(pipeline.last_error_evaluation_received).to match(/no implicit conversion of nil into Integer/) + dlq_path = java.nio.file.Paths.get(settings.get_value("path.dead_letter_queue"), "test_dlq") + dlq_reader = org.logstash.common.io.DeadLetterQueueReader.new(dlq_path) + entry = dlq_reader.pollEntry(40) + expect(entry).to_not be_nil + expect(entry.reason).to match(/condition evaluation error.*no implicit conversion of nil into Integer/) + expect(entry.plugin_id).to eq("if-statement") + expect(entry.plugin_type).to eq("if-statement") + end end end diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 0ac584f1595..5440f1af05b 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -167,6 +167,7 @@ public class AbstractPipelineExt extends RubyBasicObject { private @SuppressWarnings("rawtypes") RubyArray outputs; private String lastErrorEvaluationReceived = ""; + private DeadLetterQueueWriter javaDlqWriter; public AbstractPipelineExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); @@ -180,10 +181,32 @@ public final class LogErrorEvaluationListener implements ConditionalEvaluationLi @Override public void notify(ConditionalEvaluationError err) { lastErrorEvaluationReceived = err.getCause().getMessage(); - LOGGER.warn("{}. Event was dropped, enable debug logging to see the event's payload.", lastErrorEvaluationReceived); + if (isDLQEnabled()) { + LOGGER.warn("{}. Failing event was sent to dead letter queue", lastErrorEvaluationReceived); + } else { + LOGGER.warn("{}. Event was dropped, enable debug logging to see the event's payload", lastErrorEvaluationReceived); + } LOGGER.debug("Event generating the fault: {}", err.failedEvent().toMap().toString()); // logs the exception at debug level + if (LOGGER.isDebugEnabled()) { + debugLogStackTrace(err); + } + + if (isDLQEnabled()) { + try { + javaDlqWriter.writeEntry(err.failedEvent(), "if-statement", "if-statement", "condition evaluation error, " + lastErrorEvaluationReceived); + } catch (IOException ioex) { + LOGGER.error("Can't write in DLQ", ioex); + } + } + } + + private boolean isDLQEnabled() { + return javaDlqWriter != null; + } + + private void debugLogStackTrace(ConditionalEvaluationError err) { try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) { err.printStackTrace(pw); LOGGER.debug("{}", sw); @@ -372,7 +395,7 @@ public IRubyObject lirExecution(final ThreadContext context) { public final IRubyObject dlqWriter(final ThreadContext context) { if (dlqWriter == null) { if (dlqEnabled(context).isTrue()) { - final DeadLetterQueueWriter javaDlqWriter = createDeadLetterQueueWriterFromSettings(context); + javaDlqWriter = createDeadLetterQueueWriterFromSettings(context); dlqWriter = JavaUtil.convertJavaToUsableRubyObject(context.runtime, javaDlqWriter); } else { dlqWriter = RubyUtil.DUMMY_DLQ_WRITER_CLASS.callMethod(context, "new");