Skip to content

Commit

Permalink
DLQ-ing events that trigger an conditional evaluation error. (#16423)
Browse files Browse the repository at this point in the history
When a conditional evaluation encounter an error in the expression the event that triggered the issue is sent to pipeline's DLQ, if enabled for the executing pipeline.

This PR engage with the work done in #16322, the `ConditionalEvaluationListener` that is receives notifications about if-statements evaluation failure, is improved to also send the event to DLQ (if enabled in the pipeline) and not just logging it.

(cherry picked from commit b69d993)
  • Loading branch information
andsel authored and logstashmachine committed Oct 2, 2024
1 parent 2c024da commit 3ba0e25
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 11 deletions.
12 changes: 9 additions & 3 deletions docs/static/dead-letter-queues.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ loss in this situation, you can <<configuring-dlq,configure Logstash>> to write
unsuccessful events to a dead letter queue instead of dropping them.

NOTE: The dead letter queue is currently supported only for the
<<plugins-outputs-elasticsearch,{es} output>>. The dead letter queue is used for
documents with response codes of 400 or 404, both of which indicate an event
<<plugins-outputs-elasticsearch,{es} output>> and <<conditionals, conditional statements evaluation>>.
The dead letter queue is used for documents with response codes of 400 or 404, both of which indicate an event
that cannot be retried.
It's also used when a conditional evaluation encounter an error.

Each event written to the dead letter queue includes the original event,
metadata that describes the reason the event could not be processed, information
Expand Down Expand Up @@ -57,7 +58,12 @@ status code per entry to indicate why the action could not be performed.
If the DLQ is configured, individual indexing failures are routed there.

Even if you regularly process events, events remain in the dead letter queue.
The dead letter queue requires <<dlq-clear,manual intervention>> to clear it.
The dead letter queue requires <<dlq-clear,manual intervention>> to clear it.

[[conditionals-dlq]]
==== Conditional statements and the dead letter queue
When a conditional statement reaches an error in processing an event, such as comparing string and integer values,
the event, as it is at the time of evaluation, is inserted into the dead letter queue.

[[configuring-dlq]]
==== Configuring {ls} to use dead letter queues
Expand Down
50 changes: 44 additions & 6 deletions logstash-core/spec/logstash/java_pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,8 @@ def flush(options)
LogStash::PLUGIN_REGISTRY.add(:output, "spec_sampler_output", PipelineHelpers::SpecSamplerOutput)
end

describe "given a pipeline executing an event that would trigger an evaluation error" do
context "given a pipeline executing an event that would trigger an evaluation error" do
let(:pipeline) do
settings.set_value("queue.drain", true)
LogStash::JavaPipeline.new(
org.logstash.config.ir.PipelineConfig.new(
LogStash::Config::Source::Local, :main,
Expand All @@ -470,11 +469,50 @@ def flush(options)
pipeline.close
end

subject {results.length > 1 ? results : results.first}
describe "when DLQ is disabled" do
let(:settings) do
s = super()
s.set_value("queue.drain", true)
s
end

it "should raise an error without killing the pipeline" do
expect(subject).to be nil
expect(pipeline.last_error_evaluation_received).to match(/no implicit conversion of nil into Integer/)
subject {results.length > 1 ? results : results.first}

it "should raise an error without killing the pipeline" do
expect(subject).to be nil
expect(pipeline.last_error_evaluation_received).to match(/no implicit conversion of nil into Integer/)
end
end

describe "when DLQ is enabled" do
let(:dlq_path) { Dir.mktmpdir }

let(:settings) do
s = super()
s.set_value("queue.drain", true)
s.set_value("pipeline.id", "test_dlq")
s.set_value("dead_letter_queue.enable", true)
s.set_value("path.dead_letter_queue", dlq_path)
s
end

after do
FileUtils.rm_rf(settings.get_value("path.dead_letter_queue"))
end

subject {results.length > 1 ? results : results.first}

it "should raise an error without killing the pipeline and insert the event into DLQ" do
expect(subject).to be nil
expect(pipeline.last_error_evaluation_received).to match(/no implicit conversion of nil into Integer/)
dlq_path = java.nio.file.Paths.get(settings.get_value("path.dead_letter_queue"), "test_dlq")
dlq_reader = org.logstash.common.io.DeadLetterQueueReader.new(dlq_path)
entry = dlq_reader.pollEntry(40)
expect(entry).to_not be_nil
expect(entry.reason).to match(/condition evaluation error.*no implicit conversion of nil into Integer/)
expect(entry.plugin_id).to eq("if-statement")
expect(entry.plugin_type).to eq("if-statement")
end
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public class AbstractPipelineExt extends RubyBasicObject {
private @SuppressWarnings("rawtypes") RubyArray outputs;

private String lastErrorEvaluationReceived = "";
private DeadLetterQueueWriter javaDlqWriter;

public AbstractPipelineExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
Expand All @@ -180,10 +181,32 @@ public final class LogErrorEvaluationListener implements ConditionalEvaluationLi
@Override
public void notify(ConditionalEvaluationError err) {
lastErrorEvaluationReceived = err.getCause().getMessage();
LOGGER.warn("{}. Event was dropped, enable debug logging to see the event's payload.", lastErrorEvaluationReceived);
if (isDLQEnabled()) {
LOGGER.warn("{}. Failing event was sent to dead letter queue", lastErrorEvaluationReceived);
} else {
LOGGER.warn("{}. Event was dropped, enable debug logging to see the event's payload", lastErrorEvaluationReceived);
}
LOGGER.debug("Event generating the fault: {}", err.failedEvent().toMap().toString());

// logs the exception at debug level
if (LOGGER.isDebugEnabled()) {
debugLogStackTrace(err);
}

if (isDLQEnabled()) {
try {
javaDlqWriter.writeEntry(err.failedEvent(), "if-statement", "if-statement", "condition evaluation error, " + lastErrorEvaluationReceived);
} catch (IOException ioex) {
LOGGER.error("Can't write in DLQ", ioex);
}
}
}

private boolean isDLQEnabled() {
return javaDlqWriter != null;
}

private void debugLogStackTrace(ConditionalEvaluationError err) {
try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) {
err.printStackTrace(pw);
LOGGER.debug("{}", sw);
Expand Down Expand Up @@ -372,7 +395,7 @@ public IRubyObject lirExecution(final ThreadContext context) {
public final IRubyObject dlqWriter(final ThreadContext context) {
if (dlqWriter == null) {
if (dlqEnabled(context).isTrue()) {
final DeadLetterQueueWriter javaDlqWriter = createDeadLetterQueueWriterFromSettings(context);
javaDlqWriter = createDeadLetterQueueWriterFromSettings(context);
dlqWriter = JavaUtil.convertJavaToUsableRubyObject(context.runtime, javaDlqWriter);
} else {
dlqWriter = RubyUtil.DUMMY_DLQ_WRITER_CLASS.callMethod(context, "new");
Expand Down

0 comments on commit 3ba0e25

Please sign in to comment.