From ef560ab421f9b47d7c1b8689d29e80b82c2e7bf8 Mon Sep 17 00:00:00 2001 From: Joao Duarte Date: Fri, 6 Feb 2015 11:13:59 +0000 Subject: [PATCH] pipeline:refactored specs. added Exception handling --- lib/logstash/pipeline.rb | 96 +++++++++++++++++++++----------------- spec/core/pipeline_spec.rb | 55 ++++++++++++---------- 2 files changed, 83 insertions(+), 68 deletions(-) diff --git a/lib/logstash/pipeline.rb b/lib/logstash/pipeline.rb index 06dda56c500..10e450b1f1f 100644 --- a/lib/logstash/pipeline.rb +++ b/lib/logstash/pipeline.rb @@ -171,65 +171,73 @@ def start_input(plugin) def inputworker(plugin) LogStash::Util::set_thread_name("<#{plugin.class.config_name}") plugin.run(@input_to_filter) + rescue LogStash::ShutdownSignal + # nothing rescue => e - print_exception_information(e) + @logger.error exception_information(e) # TODO: find a way to obtain the event caused the exception sleep RETRY_INTERVAL retry - rescue LogStash::ShutdownSignal - # nothing + rescue Exception => e + @logger.fatal exception_information(e) + shutdown ensure plugin.teardown end # def inputworker def filterworker LogStash::Util::set_thread_name("|worker") - begin - while(event = @input_to_filter.pop) - case event - when LogStash::Event - # use events array to guarantee ordering of origin vs created events - # where created events are emitted by filters like split or metrics - events = [] - filter(event) { |newevent| events << newevent } - events.each { |event| @filter_to_output.push(event) } - when LogStash::FlushEvent - # handle filter flushing here so that non threadsafe filters (thus only running one filterworker) - # don't have to deal with thread safety implementing the flush method - @flusher_lock.synchronize { flush_filters_to_output! } - when LogStash::ShutdownEvent - # pass it down to any other filterworker and stop this worker - @input_to_filter.push(event) - break - end + + while(event = @input_to_filter.pop) + case event + when LogStash::Event + # use events array to guarantee ordering of origin vs created events + # where created events are emitted by filters like split or metrics + events = [] + filter(event) { |newevent| events << newevent } + events.each { |event| @filter_to_output.push(event) } + when LogStash::FlushEvent + # handle filter flushing here so that non threadsafe filters (thus only running one filterworker) + # don't have to deal with thread safety implementing the flush method + @flusher_lock.synchronize { flush_filters_to_output! } + when LogStash::ShutdownEvent + # pass it down to any other filterworker and stop this worker + @input_to_filter.push(event) + break end - rescue => e - print_exception_information(e) - @logger.warn("Discarded event: #{event.to_hash}") - sleep RETRY_INTERVAL - retry - ensure - @filters.each(&:teardown) end + + rescue => e + @logger.error exception_information(e) + @logger.warn("Discarded event: #{event.to_hash}") + sleep RETRY_INTERVAL + retry + rescue Exception => e + @logger.fatal exception_information(e) + shutdown + ensure + @filters.each(&:teardown) end # def filterworker def outputworker LogStash::Util::set_thread_name(">output") - begin - while(event = @filter_to_output.pop) - break if event.is_a?(LogStash::ShutdownEvent) - output(event) - end # while true - rescue => e - print_exception_information(e) - @logger.warn("Discarded event: #{event.to_hash}") - sleep RETRY_INTERVAL - retry - ensure - @outputs.each do |output| - output.worker_plugins.each(&:teardown) - end + while(event = @filter_to_output.pop) + break if event.is_a?(LogStash::ShutdownEvent) + output(event) + end # while true + + rescue => e + @logger.error exception_information(e) + @logger.warn("Discarded event: #{event.to_hash}") + sleep RETRY_INTERVAL + retry + rescue Exception => e + @logger.fatal exception_information(e) + shutdown + ensure + @outputs.each do |output| + output.worker_plugins.each(&:teardown) end end # def outputworker @@ -296,7 +304,7 @@ def flush_filters_to_output!(options = {}) end # flush_filters_to_output! private - def print_exception_information(exception) - @logger.error("Restarting worker: #{exception} => #{exception.backtrace}") + def exception_information(exception) + "Exception information: #{exception} => #{exception.backtrace}" end end # class Pipeline diff --git a/spec/core/pipeline_spec.rb b/spec/core/pipeline_spec.rb index 467635a0574..47e32660555 100644 --- a/spec/core/pipeline_spec.rb +++ b/spec/core/pipeline_spec.rb @@ -68,6 +68,8 @@ def teardown class TestPipeline < LogStash::Pipeline attr_reader :outputs + attr_reader :inputs + attr_reader :filters end describe LogStash::Pipeline do @@ -143,52 +145,54 @@ class TestPipeline < LogStash::Pipeline let(:bad_event) { LogStash::Event.new("message" => "bad message") } let(:good_event) { LogStash::Event.new("message" => "good message") } - let(:pipeline) { LogStash::Pipeline.new(dummy_config) } + let(:pipeline) { TestPipeline.new(dummy_config) } + let(:input) { pipeline.inputs.first } + let(:output) { pipeline.outputs.first } + let(:filter) { pipeline.filters.first } context "transient exceptions" do context "input" do it "should restart and generate more events" do - expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue| + expect(input).to receive(:run).and_return do |queue| raise StandardError end - expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue| + expect(input).to receive(:run).and_return do |queue| queue << good_event end - expect_any_instance_of(DummyOutput).to receive(:receive).once.with(good_event) - expect_any_instance_of(DummyInput).to receive(:teardown).once + expect(output).to receive(:receive).once.with(good_event) + expect(input).to receive(:teardown).once expect { pipeline.run }.to_not raise_error end end context "filter" do it "should restart and process the next event" do - expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue| + expect(input).to receive(:run).and_return do |queue| queue << bad_event queue << good_event end - expect_any_instance_of(DummyFilter).to receive(:filter).with(bad_event).and_return do |event| + expect(filter).to receive(:filter).with(bad_event).and_return do |event| raise StandardError end - expect_any_instance_of(DummyFilter).to receive(:filter).with(good_event) - expect_any_instance_of(DummyOutput).to receive(:receive).once.with(good_event) - expect_any_instance_of(DummyFilter).to receive(:teardown).once + expect(filter).to receive(:filter).with(good_event) + expect(output).to receive(:receive).once.with(good_event) expect { pipeline.run }.to_not raise_error end end context "output" do it "should restart and process the next message" do - expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue| + expect(input).to receive(:run).and_return do |queue| queue << bad_event queue << good_event end - expect_any_instance_of(DummyOutput).to receive(:receive).with(bad_event).and_return do |event| + expect(output).to receive(:receive).with(bad_event).and_return do |event| raise StandardError end - expect_any_instance_of(DummyOutput).to receive(:receive).with(good_event).and_return do |event| + expect(output).to receive(:receive).with(good_event).and_return do |event| # ... end - expect_any_instance_of(DummyOutput).to receive(:teardown).once + expect(output).to receive(:teardown).once expect { pipeline.run }.to_not raise_error end end @@ -197,36 +201,39 @@ class TestPipeline < LogStash::Pipeline context "fatal exceptions" do context "input" do it "should raise exception" do - expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue| + expect(input).to receive(:run).and_return do |queue| raise Exception end - expect_any_instance_of(DummyFilter).to_not receive(:filter) - expect { pipeline.run }.to raise_error(Exception) + expect(filter).to_not receive(:filter) + expect(pipeline).to receive(:shutdown) + expect { pipeline.run }.to_not raise_error end end context "filter" do it "should raise exception" do - expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue| + expect(input).to receive(:run).and_return do |queue| queue << bad_event end - expect_any_instance_of(DummyFilter).to receive(:filter).with(bad_event).and_return do |event| + expect(filter).to receive(:filter).with(bad_event).and_return do |event| raise Exception end - expect_any_instance_of(DummyOutput).to_not receive(:receive) - expect { pipeline.run }.to raise_error(Exception) + expect(output).to_not receive(:receive) + expect(pipeline).to receive(:shutdown) + expect { pipeline.run }.to_not raise_error end end context "output" do it "should raise exception" do - expect_any_instance_of(DummyInput).to receive(:run).and_return do |queue| + expect(input).to receive(:run).and_return do |queue| queue << bad_event end - expect_any_instance_of(DummyOutput).to receive(:receive).with(bad_event).and_return do |event| + expect(output).to receive(:receive).with(bad_event).and_return do |event| raise Exception end - expect { pipeline.run }.to raise_error(Exception) + expect(pipeline).to receive(:shutdown) + expect { pipeline.run }.to_not raise_error end end end