Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalize the exception behaviour for inputs outputs and filters #2386

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 52 additions & 47 deletions lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
class LogStash::Pipeline

FLUSH_EVENT = LogStash::FlushEvent.new
RETRY_INTERVAL = 0.5 # seconds

def initialize(configstr)
@logger = Cabin::Channel.get(LogStash)
Expand Down Expand Up @@ -159,9 +160,8 @@ def start_filters

def start_outputs
@outputs.each(&:register)
@output_threads = [
Thread.new { outputworker }
]
@outputs.each(&:worker_setup)
@output_threads = [ Thread.new { outputworker } ]
end

def start_input(plugin)
Expand All @@ -170,71 +170,72 @@ def start_input(plugin)

def inputworker(plugin)
LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
begin
plugin.run(@input_to_filter)
rescue LogStash::ShutdownSignal
return
rescue => e
if @logger.debug?
@logger.error(I18n.t("logstash.pipeline.worker-error-debug",
:plugin => plugin.inspect, :error => e.to_s,
:exception => e.class,
:stacktrace => e.backtrace.join("\n")))
else
@logger.error(I18n.t("logstash.pipeline.worker-error",
:plugin => plugin.inspect, :error => e))
end
puts e.backtrace if @logger.debug?
plugin.teardown
sleep 1
retry
end
plugin.run(@input_to_filter)
rescue LogStash::ShutdownSignal
# nothing
rescue => e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StandardError => e

@logger.error exception_information(e)
# TODO: find a way to obtain the event caused the exception
sleep RETRY_INTERVAL
retry
rescue Exception => e
@logger.fatal exception_information(e)
shutdown
ensure
plugin.teardown
end # def inputworker

def filterworker
LogStash::Util::set_thread_name("|worker")
begin
while true
event = @input_to_filter.pop

case event
when LogStash::Event
# use events array to guarantee ordering of origin vs created events
# where created events are emitted by filters like split or metrics
events = []
filter(event) { |newevent| events << newevent }
events.each { |event| @filter_to_output.push(event) }
when LogStash::FlushEvent
# handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
# don't have to deal with thread safety implementing the flush method
@flusher_lock.synchronize { flush_filters_to_output! }
when LogStash::ShutdownEvent
# pass it down to any other filterworker and stop this worker
@input_to_filter.push(event)
break
end

while(event = @input_to_filter.pop)
case event
when LogStash::Event
# use events array to guarantee ordering of origin vs created events
# where created events are emitted by filters like split or metrics
events = []
filter(event) { |newevent| events << newevent }
events.each { |event| @filter_to_output.push(event) }
when LogStash::FlushEvent
# handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
# don't have to deal with thread safety implementing the flush method
@flusher_lock.synchronize { flush_filters_to_output! }
when LogStash::ShutdownEvent
# pass it down to any other filterworker and stop this worker
@input_to_filter.push(event)
break
end
rescue => e
@logger.error("Exception in filterworker", "exception" => e, "backtrace" => e.backtrace)
end

rescue => e
@logger.error exception_information(e)
@logger.warn("Discarded event: #{event.to_hash}")
sleep RETRY_INTERVAL
retry
rescue Exception => e
@logger.fatal exception_information(e)
shutdown
ensure
@filters.each(&:teardown)
end # def filterworker

def outputworker
LogStash::Util::set_thread_name(">output")
@outputs.each(&:worker_setup)

while true
event = @filter_to_output.pop
while(event = @filter_to_output.pop)
break if event.is_a?(LogStash::ShutdownEvent)
output(event)
end # while true

rescue => e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rescue StandardError => e

@logger.error exception_information(e)
@logger.warn("Discarded event: #{event.to_hash}")
sleep RETRY_INTERVAL
retry
rescue Exception => e
@logger.fatal exception_information(e)
shutdown
ensure
@outputs.each do |output|
output.worker_plugins.each(&:teardown)
end
Expand Down Expand Up @@ -302,4 +303,8 @@ def flush_filters_to_output!(options = {})
end
end # flush_filters_to_output!

private
def exception_information(exception)
"Exception information: #{exception} => #{exception.backtrace}"
end
end # class Pipeline
2 changes: 1 addition & 1 deletion lib/logstash/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
LogStash::Environment.bundler_setup!
LogStash::Environment.load_locale!

Thread.abort_on_exception = true
Thread.abort_on_exception = false

require "logstash/namespace"
require "logstash/program"
Expand Down
124 changes: 124 additions & 0 deletions spec/core/pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ def teardown
end
end

class DummyFilter < LogStash::Filters::Base
config_name "dummyfilter"
milestone 2

def register
end

def filter(event)
end

def teardown
end
end

class DummyCodec < LogStash::Codecs::Base
config_name "dummycodec"
milestone 2
Expand Down Expand Up @@ -54,6 +68,8 @@ def teardown

class TestPipeline < LogStash::Pipeline
attr_reader :outputs
attr_reader :inputs
attr_reader :filters
end

describe LogStash::Pipeline do
Expand All @@ -65,6 +81,8 @@ class TestPipeline < LogStash::Pipeline
.with("codec", "plain").and_return(DummyCodec)
LogStash::Plugin.stub(:lookup)
.with("output", "dummyoutput").and_return(DummyOutput)
LogStash::Plugin.stub(:lookup)
.with("filter", "dummyfilter").and_return(DummyFilter)
end

let(:test_config_without_output_workers) {
Expand Down Expand Up @@ -114,4 +132,110 @@ class TestPipeline < LogStash::Pipeline
end
end
end

context "when plugins raise exceptions" do

let(:dummy_config) {
<<-eos
input { dummyinput {} }
filter { dummyfilter {} }
output { dummyoutput {} }
eos
}

let(:bad_event) { LogStash::Event.new("message" => "bad message") }
let(:good_event) { LogStash::Event.new("message" => "good message") }
let(:pipeline) { TestPipeline.new(dummy_config) }
let(:input) { pipeline.inputs.first }
let(:output) { pipeline.outputs.first }
let(:filter) { pipeline.filters.first }

context "transient exceptions" do
context "input" do
it "should restart and generate more events" do
expect(input).to receive(:run).and_return do |queue|
raise StandardError
end
expect(input).to receive(:run).and_return do |queue|
queue << good_event
end
expect(output).to receive(:receive).once.with(good_event)
expect(input).to receive(:teardown).once
expect { pipeline.run }.to_not raise_error
end
end

context "filter" do
it "should restart and process the next event" do
expect(input).to receive(:run).and_return do |queue|
queue << bad_event
queue << good_event
end
expect(filter).to receive(:filter).with(bad_event).and_return do |event|
raise StandardError
end
expect(filter).to receive(:filter).with(good_event)
expect(output).to receive(:receive).once.with(good_event)
expect { pipeline.run }.to_not raise_error
end
end

context "output" do
it "should restart and process the next message" do
expect(input).to receive(:run).and_return do |queue|
queue << bad_event
queue << good_event
end
expect(output).to receive(:receive).with(bad_event).and_return do |event|
raise StandardError
end
expect(output).to receive(:receive).with(good_event).and_return do |event|
# ...
end
expect(output).to receive(:teardown).once
expect { pipeline.run }.to_not raise_error
end
end
end

context "fatal exceptions" do
context "input" do
it "should raise exception" do
expect(input).to receive(:run).and_return do |queue|
raise Exception
end
expect(filter).to_not receive(:filter)
expect(pipeline).to receive(:shutdown)
expect { pipeline.run }.to_not raise_error
end
end

context "filter" do
it "should raise exception" do
expect(input).to receive(:run).and_return do |queue|
queue << bad_event
end
expect(filter).to receive(:filter).with(bad_event).and_return do |event|
raise Exception
end
expect(output).to_not receive(:receive)
expect(pipeline).to receive(:shutdown)
expect { pipeline.run }.to_not raise_error
end
end

context "output" do
it "should raise exception" do
expect(input).to receive(:run).and_return do |queue|
queue << bad_event
end
expect(output).to receive(:receive).with(bad_event).and_return do |event|
raise Exception
end
expect(pipeline).to receive(:shutdown)
expect { pipeline.run }.to_not raise_error
end
end
end
end
end