Skip to content

Commit

Permalink
pipeline:refactored specs. added Exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jsvd committed Feb 6, 2015
1 parent 1cfa954 commit ef560ab
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 68 deletions.
96 changes: 52 additions & 44 deletions lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,65 +171,73 @@ def start_input(plugin)
def inputworker(plugin)
LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
plugin.run(@input_to_filter)
rescue LogStash::ShutdownSignal
# nothing
rescue => e
print_exception_information(e)
@logger.error exception_information(e)
# TODO: find a way to obtain the event caused the exception
sleep RETRY_INTERVAL
retry
rescue LogStash::ShutdownSignal
# nothing
rescue Exception => e
@logger.fatal exception_information(e)
shutdown
ensure
plugin.teardown
end # def inputworker

def filterworker
LogStash::Util::set_thread_name("|worker")
begin
while(event = @input_to_filter.pop)
case event
when LogStash::Event
# use events array to guarantee ordering of origin vs created events
# where created events are emitted by filters like split or metrics
events = []
filter(event) { |newevent| events << newevent }
events.each { |event| @filter_to_output.push(event) }
when LogStash::FlushEvent
# handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
# don't have to deal with thread safety implementing the flush method
@flusher_lock.synchronize { flush_filters_to_output! }
when LogStash::ShutdownEvent
# pass it down to any other filterworker and stop this worker
@input_to_filter.push(event)
break
end

while(event = @input_to_filter.pop)
case event
when LogStash::Event
# use events array to guarantee ordering of origin vs created events
# where created events are emitted by filters like split or metrics
events = []
filter(event) { |newevent| events << newevent }
events.each { |event| @filter_to_output.push(event) }
when LogStash::FlushEvent
# handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
# don't have to deal with thread safety implementing the flush method
@flusher_lock.synchronize { flush_filters_to_output! }
when LogStash::ShutdownEvent
# pass it down to any other filterworker and stop this worker
@input_to_filter.push(event)
break
end
rescue => e
print_exception_information(e)
@logger.warn("Discarded event: #{event.to_hash}")
sleep RETRY_INTERVAL
retry
ensure
@filters.each(&:teardown)
end

rescue => e
@logger.error exception_information(e)
@logger.warn("Discarded event: #{event.to_hash}")
sleep RETRY_INTERVAL
retry
rescue Exception => e
@logger.fatal exception_information(e)
shutdown
ensure
@filters.each(&:teardown)
end # def filterworker

def outputworker
LogStash::Util::set_thread_name(">output")

begin
while(event = @filter_to_output.pop)
break if event.is_a?(LogStash::ShutdownEvent)
output(event)
end # while true
rescue => e
print_exception_information(e)
@logger.warn("Discarded event: #{event.to_hash}")
sleep RETRY_INTERVAL
retry
ensure
@outputs.each do |output|
output.worker_plugins.each(&:teardown)
end
while(event = @filter_to_output.pop)
break if event.is_a?(LogStash::ShutdownEvent)
output(event)
end # while true

rescue => e
@logger.error exception_information(e)
@logger.warn("Discarded event: #{event.to_hash}")
sleep RETRY_INTERVAL
retry
rescue Exception => e
@logger.fatal exception_information(e)
shutdown
ensure
@outputs.each do |output|
output.worker_plugins.each(&:teardown)
end
end # def outputworker

Expand Down Expand Up @@ -296,7 +304,7 @@ def flush_filters_to_output!(options = {})
end # flush_filters_to_output!

private
def print_exception_information(exception)
@logger.error("Restarting worker: #{exception} => #{exception.backtrace}")
def exception_information(exception)
"Exception information: #{exception} => #{exception.backtrace}"
end
end # class Pipeline
55 changes: 31 additions & 24 deletions spec/core/pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def teardown

class TestPipeline < LogStash::Pipeline
attr_reader :outputs
attr_reader :inputs
attr_reader :filters
end

describe LogStash::Pipeline do
Expand Down Expand Up @@ -143,52 +145,54 @@ class TestPipeline < LogStash::Pipeline

let(:bad_event) { LogStash::Event.new("message" => "bad message") }
let(:good_event) { LogStash::Event.new("message" => "good message") }
let(:pipeline) { LogStash::Pipeline.new(dummy_config) }
let(:pipeline) { TestPipeline.new(dummy_config) }
let(:input) { pipeline.inputs.first }
let(:output) { pipeline.outputs.first }
let(:filter) { pipeline.filters.first }

context "transient exceptions" do
context "input" do
it "should restart and generate more events" do
expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue|
expect(input).to receive(:run).and_return do |queue|
raise StandardError
end
expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue|
expect(input).to receive(:run).and_return do |queue|
queue << good_event
end
expect_any_instance_of(DummyOutput).to receive(:receive).once.with(good_event)
expect_any_instance_of(DummyInput).to receive(:teardown).once
expect(output).to receive(:receive).once.with(good_event)
expect(input).to receive(:teardown).once
expect { pipeline.run }.to_not raise_error
end
end

context "filter" do
it "should restart and process the next event" do
expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue|
expect(input).to receive(:run).and_return do |queue|
queue << bad_event
queue << good_event
end
expect_any_instance_of(DummyFilter).to receive(:filter).with(bad_event).and_return do |event|
expect(filter).to receive(:filter).with(bad_event).and_return do |event|
raise StandardError
end
expect_any_instance_of(DummyFilter).to receive(:filter).with(good_event)
expect_any_instance_of(DummyOutput).to receive(:receive).once.with(good_event)
expect_any_instance_of(DummyFilter).to receive(:teardown).once
expect(filter).to receive(:filter).with(good_event)
expect(output).to receive(:receive).once.with(good_event)
expect { pipeline.run }.to_not raise_error
end
end

context "output" do
it "should restart and process the next message" do
expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue|
expect(input).to receive(:run).and_return do |queue|
queue << bad_event
queue << good_event
end
expect_any_instance_of(DummyOutput).to receive(:receive).with(bad_event).and_return do |event|
expect(output).to receive(:receive).with(bad_event).and_return do |event|
raise StandardError
end
expect_any_instance_of(DummyOutput).to receive(:receive).with(good_event).and_return do |event|
expect(output).to receive(:receive).with(good_event).and_return do |event|
# ...
end
expect_any_instance_of(DummyOutput).to receive(:teardown).once
expect(output).to receive(:teardown).once
expect { pipeline.run }.to_not raise_error
end
end
Expand All @@ -197,36 +201,39 @@ class TestPipeline < LogStash::Pipeline
context "fatal exceptions" do
context "input" do
it "should raise exception" do
expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue|
expect(input).to receive(:run).and_return do |queue|
raise Exception
end
expect_any_instance_of(DummyFilter).to_not receive(:filter)
expect { pipeline.run }.to raise_error(Exception)
expect(filter).to_not receive(:filter)
expect(pipeline).to receive(:shutdown)
expect { pipeline.run }.to_not raise_error
end
end

context "filter" do
it "should raise exception" do
expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue|
expect(input).to receive(:run).and_return do |queue|
queue << bad_event
end
expect_any_instance_of(DummyFilter).to receive(:filter).with(bad_event).and_return do |event|
expect(filter).to receive(:filter).with(bad_event).and_return do |event|
raise Exception
end
expect_any_instance_of(DummyOutput).to_not receive(:receive)
expect { pipeline.run }.to raise_error(Exception)
expect(output).to_not receive(:receive)
expect(pipeline).to receive(:shutdown)
expect { pipeline.run }.to_not raise_error
end
end

context "output" do
it "should raise exception" do
expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue|
expect(input).to receive(:run).and_return do |queue|
queue << bad_event
end
expect_any_instance_of(DummyOutput).to receive(:receive).with(bad_event).and_return do |event|
expect(output).to receive(:receive).with(bad_event).and_return do |event|
raise Exception
end
expect { pipeline.run }.to raise_error(Exception)
expect(pipeline).to receive(:shutdown)
expect { pipeline.run }.to_not raise_error
end
end
end
Expand Down

0 comments on commit ef560ab

Please sign in to comment.