From 79b90b19d7ef72b77bea1725616fde9af5c3eddb Mon Sep 17 00:00:00 2001 From: Joao Duarte Date: Wed, 12 Aug 2015 16:36:35 +0100 Subject: [PATCH] add shutdown controller to force exit on stalled shutdown * start logstash with --allow-unsafe-shutdown to force_exit on stalled shutdown * by default --allow-unsafe-shutdown is disabled * stall detection kicks in when SIGTERM/SIGINT is received * check if inflight event count isn't going down and if there are blocked/blocking plugin threads --- lib/logstash/shutdown_controller.rb | 127 ++++++++++++++++++++ logstash-core/lib/logstash/agent.rb | 11 +- logstash-core/lib/logstash/filters/base.rb | 1 + logstash-core/lib/logstash/outputs/base.rb | 10 +- logstash-core/lib/logstash/pipeline.rb | 64 ++++++++-- logstash-core/lib/logstash/plugin.rb | 5 + logstash-core/lib/logstash/util.rb | 35 ++++++ logstash-core/lib/logstash/util/reporter.rb | 28 ----- logstash-core/locales/en.yml | 5 + spec/core/shutdown_controller_spec.rb | 107 +++++++++++++++++ 10 files changed, 353 insertions(+), 40 deletions(-) create mode 100644 lib/logstash/shutdown_controller.rb delete mode 100644 logstash-core/lib/logstash/util/reporter.rb create mode 100644 spec/core/shutdown_controller_spec.rb diff --git a/lib/logstash/shutdown_controller.rb b/lib/logstash/shutdown_controller.rb new file mode 100644 index 00000000000..6941753bbc8 --- /dev/null +++ b/lib/logstash/shutdown_controller.rb @@ -0,0 +1,127 @@ +# encoding: utf-8 + +module LogStash + class ShutdownController + + CHECK_EVERY = 1 # second + REPORT_EVERY = 5 # checks + ABORT_AFTER = 3 # stalled reports + + attr_reader :cycle_period, :report_every, :abort_threshold + + def initialize(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER) + @pipeline = pipeline + @cycle_period = cycle_period + @report_every = report_every + @abort_threshold = abort_threshold + @reports = [] + end + + def self.unsafe_shutdown=(boolean) + @unsafe_shutdown = boolean + end + + def self.unsafe_shutdown? + @unsafe_shutdown + end + + def self.logger=(logger) + @logger = logger + end + + def self.logger + @logger ||= Cabin::Channel.get(LogStash) + end + + def self.start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER) + controller = self.new(pipeline, cycle_period, report_every, abort_threshold) + Thread.new(controller) { |controller| controller.start } + end + + def logger + self.class.logger + end + + def start + sleep(@cycle_period) + cycle_number = 0 + stalled_count = 0 + Stud.interval(@cycle_period) do + @reports << Report.from_pipeline(@pipeline) + @reports.delete_at(0) if @reports.size > @report_every # expire old report + if cycle_number == (@report_every - 1) # it's report time! + logger.warn(@reports.last.to_hash) + + if shutdown_stalled? + logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if stalled_count == 0 + stalled_count += 1 + + if self.class.unsafe_shutdown? && @abort_threshold == stalled_count + logger.fatal("Forcefully quitting logstash..") + force_exit() + break + end + else + stalled_count = 0 + end + end + cycle_number = (cycle_number + 1) % @report_every + end + end + + # A pipeline shutdown is stalled if + # * at least REPORT_EVERY reports have been created + # * the inflight event count is in monotonically increasing + # * there are worker threads running which aren't blocked on SizedQueue pop/push + # * the stalled thread list is constant in the previous REPORT_EVERY reports + def shutdown_stalled? + return false unless @reports.size == @report_every # + # is stalled if inflight count is either constant or increasing + stalled_event_count = @reports.each_cons(2).all? do |prev_report, next_report| + prev_report.inflight_count["total"] <= next_report.inflight_count["total"] + end + if stalled_event_count + @reports.each_cons(2).all? do |prev_report, next_report| + prev_report.stalling_threads == next_report.stalling_threads + end + else + false + end + end + + def force_exit + exit(-1) + end + end + + class Report + + attr_reader :inflight_count, :stalling_threads + + def self.from_pipeline(pipeline) + new(pipeline.inflight_count, pipeline.stalling_threads) + end + + def initialize(inflight_count, stalling_threads) + @inflight_count = inflight_count + @stalling_threads = format_threads_by_plugin(stalling_threads) + end + + def to_hash + { + "INFLIGHT_EVENT_COUNT" => @inflight_count, + "STALLING_THREADS" => @stalling_threads + } + end + + def format_threads_by_plugin(stalling_threads) + stalled_plugins = {} + stalling_threads.each do |thr| + key = (thr.delete("plugin") || "other") + stalled_plugins[key] ||= [] + stalled_plugins[key] << thr + end + stalled_plugins + end + end +end diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index bcf1fba0740..cd9d0c6941e 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -50,6 +50,11 @@ class LogStash::Agent < Clamp::Command I18n.t("logstash.agent.flag.configtest"), :attribute_name => :config_test + option "--[no-]allow-unsafe-shutdown", :flag, + I18n.t("logstash.agent.flag.unsafe_shutdown"), + :attribute_name => :unsafe_shutdown, + :default => false + # Emit a warning message. def warn(message) # For now, all warnings are fatal. @@ -75,6 +80,9 @@ def execute require "logstash/plugin" @logger = Cabin::Channel.get(LogStash) + LogStash::ShutdownController.unsafe_shutdown = unsafe_shutdown? + LogStash::ShutdownController.logger = @logger + if version? show_version return 0 @@ -176,8 +184,7 @@ def execute def shutdown(pipeline) pipeline.shutdown do - InflightEventsReporter.logger = @logger - InflightEventsReporter.start(pipeline.input_to_filter, pipeline.filter_to_output, pipeline.outputs) + ::LogStash::ShutdownController.start(pipeline) end end diff --git a/logstash-core/lib/logstash/filters/base.rb b/logstash-core/lib/logstash/filters/base.rb index d2f1c601d05..d2813e5f9c0 100644 --- a/logstash-core/lib/logstash/filters/base.rb +++ b/logstash-core/lib/logstash/filters/base.rb @@ -143,6 +143,7 @@ def filter(event) # @return [Array :number, :default => 1 - attr_reader :worker_plugins, :worker_queue + attr_reader :worker_plugins, :worker_queue, :worker_threads public def workers_not_supported(message=nil) @@ -56,13 +56,15 @@ def receive(event) def worker_setup if @workers == 1 @worker_plugins = [self] + @worker_threads = [] else define_singleton_method(:handle, method(:handle_worker)) @worker_queue = SizedQueue.new(20) @worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) } - @worker_plugins.map.with_index do |plugin, i| + @worker_threads = @worker_plugins.map.with_index do |plugin, i| Thread.new(original_params, @worker_queue) do |params, queue| - LogStash::Util::set_thread_name(">#{self.class.config_name}.#{i}") + LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}") + LogStash::Util.set_thread_plugin(self) plugin.register while true event = queue.pop @@ -75,10 +77,12 @@ def worker_setup public def handle(event) + LogStash::Util.set_thread_plugin(self) receive(event) end # def handle def handle_worker(event) + LogStash::Util.set_thread_plugin(self) @worker_queue.push(event) end diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 069707f02e2..ed943f3180c 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -9,11 +9,11 @@ require "logstash/filters/base" require "logstash/inputs/base" require "logstash/outputs/base" -require "logstash/util/reporter" require "logstash/config/cpu_core_strategy" require "logstash/util/defaults_printer" +require "logstash/shutdown_controller" -class LogStash::Pipeline +module LogStash; class Pipeline attr_reader :inputs, :filters, :outputs, :input_to_filter, :filter_to_output def initialize(configstr) @@ -25,6 +25,7 @@ def initialize(configstr) grammar = LogStashConfigParser.new @config = grammar.parse(configstr) + if @config.nil? raise LogStash::ConfigurationError, grammar.failure_reason end @@ -170,8 +171,11 @@ def start_filters # dynamically get thread count based on filter threadsafety # moved this test to here to allow for future config reloading to_start = safe_filter_worker_count - @filter_threads = to_start.times.collect do - Thread.new { filterworker } + @filter_threads = to_start.times.collect do |i| + Thread.new do + LogStash::Util.set_thread_name("|filterworker.#{i}") + filterworker + end end actually_started = @filter_threads.select(&:alive?).size msg = "Worker threads expected: #{to_start}, worker threads started: #{actually_started}" @@ -195,7 +199,8 @@ def start_input(plugin) end def inputworker(plugin) - LogStash::Util::set_thread_name("<#{plugin.class.config_name}") + LogStash::Util.set_thread_name("<#{plugin.class.config_name}") + LogStash::Util.set_thread_plugin(plugin) begin plugin.run(@input_to_filter) rescue => e @@ -228,7 +233,6 @@ def inputworker(plugin) end # def inputworker def filterworker - LogStash::Util.set_thread_name("|worker") begin while true event = @input_to_filter.pop @@ -270,6 +274,7 @@ def outputworker event = @filter_to_output.pop break if event == LogStash::SHUTDOWN output_func(event) + LogStash::Util.set_thread_plugin(nil) end ensure @outputs.each do |output| @@ -329,4 +334,49 @@ def flush_filters_to_output!(options = {}) end end # flush_filters_to_output! -end # class Pipeline + def inflight_count + data = {} + total = 0 + + input_to_filter = @input_to_filter.size + total += input_to_filter + filter_to_output = @filter_to_output.size + total += filter_to_output + + data["input_to_filter"] = input_to_filter if input_to_filter > 0 + data["filter_to_output"] = filter_to_output if filter_to_output > 0 + + output_worker_queues = [] + @outputs.each do |output| + next unless output.worker_queue && output.worker_queue.size > 0 + plugin_info = output.debug_info + size = output.worker_queue.size + total += size + plugin_info << size + output_worker_queues << plugin_info + end + data["output_worker_queues"] = output_worker_queues unless output_worker_queues.empty? + data["total"] = total + data + end + + def stalling_threads + plugin_threads + .reject {|t| t["blocked_on"] } # known begnin blocking statuses + .each {|t| t.delete("backtrace") } + .each {|t| t.delete("blocked_on") } + .each {|t| t.delete("status") } + end + + def plugin_threads + input_threads = @input_threads.select {|t| t.alive? }.map {|t| thread_info(t) } + filter_threads = @filter_threads.select {|t| t.alive? }.map {|t| thread_info(t) } + output_threads = @output_threads.select {|t| t.alive? }.map {|t| thread_info(t) } + output_worker_threads = @outputs.flat_map {|output| output.worker_threads }.map {|t| thread_info(t) } + input_threads + filter_threads + output_threads + output_worker_threads + end + + def thread_info(thread) + LogStash::Util.thread_info(thread) + end +end; end diff --git a/logstash-core/lib/logstash/plugin.rb b/logstash-core/lib/logstash/plugin.rb index e4ed6171ecc..bfab9a58d28 100644 --- a/logstash-core/lib/logstash/plugin.rb +++ b/logstash-core/lib/logstash/plugin.rb @@ -59,6 +59,11 @@ def inspect end end + public + def debug_info + [self.class.to_s, original_params] + end + # Look up a plugin by type and name. public def self.lookup(type, name) diff --git a/logstash-core/lib/logstash/util.rb b/logstash-core/lib/logstash/util.rb index 2034803f43c..d3c5fe6ff41 100644 --- a/logstash-core/lib/logstash/util.rb +++ b/logstash-core/lib/logstash/util.rb @@ -24,6 +24,41 @@ def self.set_thread_name(name) end end # def set_thread_name + def self.set_thread_plugin(plugin) + Thread.current[:plugin] = plugin + end + + def self.get_thread_id(thread) + if RUBY_ENGINE == "jruby" + JRuby.reference(thread).native_thread.id + else + raise Exception.new("Native thread IDs aren't supported outside of JRuby") + end + end + + def self.thread_info(thread) + backtrace = thread.backtrace.map do |line| + line.gsub(LogStash::Environment::LOGSTASH_HOME, "[...]") + end + + blocked_on = case backtrace.first + when /in `push'/ then "blocked_on_push" + when /(?:pipeline|base).*pop/ then "waiting_for_events" + else nil + end + + { + "thread_id" => get_thread_id(thread), + "name" => thread[:name], + "plugin" => (thread[:plugin] ? thread[:plugin].debug_info : nil), + "backtrace" => backtrace, + "blocked_on" => blocked_on, + "status" => thread.status, + "current_call" => backtrace.first + } + end + + # Merge hash 'src' into 'dst' nondestructively # # Duplicate keys will become array values diff --git a/logstash-core/lib/logstash/util/reporter.rb b/logstash-core/lib/logstash/util/reporter.rb deleted file mode 100644 index 4d983a25e3e..00000000000 --- a/logstash-core/lib/logstash/util/reporter.rb +++ /dev/null @@ -1,28 +0,0 @@ -# encoding: utf-8 -class InflightEventsReporter - def self.logger=(logger) - @logger = logger - end - - def self.start(input_to_filter, filter_to_output, outputs) - Thread.new do - loop do - sleep 5 - report(input_to_filter, filter_to_output, outputs) - end - end - end - - def self.report(input_to_filter, filter_to_output, outputs) - report = { - "input_to_filter" => input_to_filter.size, - "filter_to_output" => filter_to_output.size, - "outputs" => [] - } - outputs.each do |output| - next unless output.worker_queue && output.worker_queue.size > 0 - report["outputs"] << [output.inspect, output.worker_queue.size] - end - @logger.warn ["INFLIGHT_EVENTS_REPORT", Time.now.iso8601, report] - end -end diff --git a/logstash-core/locales/en.yml b/logstash-core/locales/en.yml index f89fb254fed..3626144521a 100644 --- a/logstash-core/locales/en.yml +++ b/logstash-core/locales/en.yml @@ -187,3 +187,8 @@ en: debug: |+ Most verbose logging. This causes 'debug' level logs to be emitted. + unsafe_shutdown: |+ + Force logstash to exit during shutdown even + if there are still inflight events in memory. + By default, logstash will refuse to quit until all + received events have been pushed to the outputs. diff --git a/spec/core/shutdown_controller_spec.rb b/spec/core/shutdown_controller_spec.rb new file mode 100644 index 00000000000..5f755f290a8 --- /dev/null +++ b/spec/core/shutdown_controller_spec.rb @@ -0,0 +1,107 @@ +# encoding: utf-8 +require "spec_helper" +require "logstash/shutdown_controller" + +describe LogStash::ShutdownController do + + let(:check_every) { 0.01 } + let(:check_threshold) { 100 } + subject { LogStash::ShutdownController.new(pipeline, check_every) } + let(:pipeline) { double("pipeline") } + report_count = 0 + + before :each do + allow(LogStash::Report).to receive(:from_pipeline).and_wrap_original do |m, *args| + report_count += 1 + m.call(*args) + end + end + + after :each do + report_count = 0 + end + + context "when pipeline is stalled" do + let(:increasing_count) { (1..5000).to_a.map {|i| { "total" => i } } } + before :each do + allow(pipeline).to receive(:inflight_count).and_return(*increasing_count) + allow(pipeline).to receive(:stalling_threads) { { } } + end + + describe ".unsafe_shutdown = true" do + let(:abort_threshold) { subject.abort_threshold } + let(:report_every) { subject.report_every } + + before :each do + subject.class.unsafe_shutdown = true + end + + it "should force the shutdown" do + expect(subject).to receive(:force_exit).once + subject.start + end + + it "should do exactly \"abort_threshold\" stall checks" do + allow(subject).to receive(:force_exit) + expect(subject).to receive(:shutdown_stalled?).exactly(abort_threshold).times.and_call_original + subject.start + end + + it "should do exactly \"abort_threshold\"*\"report_every\" stall checks" do + allow(subject).to receive(:force_exit) + expect(LogStash::Report).to receive(:from_pipeline).exactly(abort_threshold*report_every).times.and_call_original + subject.start + end + end + + describe ".unsafe_shutdown = false" do + + before :each do + subject.class.unsafe_shutdown = false + end + + it "shouldn't force the shutdown" do + expect(subject).to_not receive(:force_exit) + thread = Thread.new(subject) {|subject| subject.start } + sleep 0.1 until report_count > check_threshold + thread.kill + end + end + end + + context "when pipeline is not stalled" do + let(:decreasing_count) { (1..5000).to_a.reverse.map {|i| { "total" => i } } } + before :each do + allow(pipeline).to receive(:inflight_count).and_return(*decreasing_count) + allow(pipeline).to receive(:stalling_threads) { { } } + end + + describe ".unsafe_shutdown = true" do + + before :each do + subject.class.unsafe_shutdown = true + end + + it "should force the shutdown" do + expect(subject).to_not receive(:force_exit) + thread = Thread.new(subject) {|subject| subject.start } + sleep 0.1 until report_count > check_threshold + thread.kill + end + end + + describe ".unsafe_shutdown = false" do + + before :each do + subject.class.unsafe_shutdown = false + end + + it "shouldn't force the shutdown" do + expect(subject).to_not receive(:force_exit) + thread = Thread.new(subject) {|subject| subject.start } + sleep 0.1 until report_count > check_threshold + thread.kill + end + end + end +end