Skip to content

Commit

Permalink
add shutdown controller to force exit on stalled shutdown
Browse files Browse the repository at this point in the history
* start logstash with --allow-unsafe-shutdown to force_exit on stalled shutdown
* by default --allow-unsafe-shutdown is disabled
* stall detection kicks in when SIGTERM/SIGINT is received
* check if inflight event count isn't going down and if there are blocked/blocking plugin threads
  • Loading branch information
jsvd committed Nov 19, 2015
1 parent 64ae981 commit 79b90b1
Show file tree
Hide file tree
Showing 10 changed files with 353 additions and 40 deletions.
127 changes: 127 additions & 0 deletions lib/logstash/shutdown_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# encoding: utf-8

module LogStash
# Watches a pipeline that is shutting down and reports on its progress.
#
# Every `cycle_period` seconds a Report snapshot is taken from the pipeline.
# Every `report_every` snapshots the most recent one is logged and the
# retained history is checked for a stall. When unsafe shutdown has been
# enabled on the class and `abort_threshold` consecutive stalled checks have
# been observed, #force_exit is called.
class ShutdownController
  CHECK_EVERY = 1 # second
  REPORT_EVERY = 5 # checks
  ABORT_AFTER = 3 # stalled reports

  attr_reader :cycle_period, :report_every, :abort_threshold

  class << self
    # Class-level settings: whether a stalled shutdown may be aborted by
    # force, and the logger shared by every controller instance.
    attr_writer :unsafe_shutdown, :logger

    # Returns whatever was last assigned through unsafe_shutdown= (nil if never set).
    def unsafe_shutdown?
      @unsafe_shutdown
    end

    # Returns the assigned logger, falling back to the LogStash Cabin channel.
    def logger
      @logger ||= Cabin::Channel.get(LogStash)
    end

    # Builds a controller for +pipeline+ and runs it in a new thread.
    # Returns that Thread.
    def start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
      watcher = new(pipeline, cycle_period, report_every, abort_threshold)
      Thread.new(watcher) { |instance| instance.start }
    end
  end

  # pipeline        - object Report.from_pipeline can snapshot
  # cycle_period    - seconds between snapshots
  # report_every    - number of snapshots per logged report / stall check
  # abort_threshold - consecutive stalled checks that trigger a forced exit
  def initialize(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
    @pipeline = pipeline
    @cycle_period = cycle_period
    @report_every = report_every
    @abort_threshold = abort_threshold
    @reports = []
  end

  def logger
    self.class.logger
  end

  # Runs the watch loop: waits one cycle, then snapshots the pipeline every
  # cycle, keeping only the newest `report_every` snapshots.
  def start
    sleep(@cycle_period)
    tick = 0
    consecutive_stalls = 0
    Stud.interval(@cycle_period) do
      @reports << Report.from_pipeline(@pipeline)
      @reports.shift if @reports.size > @report_every # expire the oldest snapshot

      if tick == (@report_every - 1) # end of a reporting window
        logger.warn(@reports.last.to_hash)

        if shutdown_stalled?
          # only announce the stall once per run of consecutive stalled checks
          logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if consecutive_stalls.zero?
          consecutive_stalls += 1

          if self.class.unsafe_shutdown? && @abort_threshold == consecutive_stalls
            logger.fatal("Forcefully quitting logstash..")
            force_exit()
            break
          end
        else
          consecutive_stalls = 0
        end
      end

      tick = (tick + 1) % @report_every
    end
  end

  # A pipeline shutdown is considered stalled when
  # * exactly `report_every` snapshots are currently retained
  # * the total inflight event count never decreases from one snapshot to
  #   the next (it is constant or growing)
  # * the stalling thread list is identical in every retained snapshot
  def shutdown_stalled?
    return false unless @reports.size == @report_every

    neighbours = @reports.each_cons(2).to_a
    not_draining = neighbours.all? do |older, newer|
      older.inflight_count["total"] <= newer.inflight_count["total"]
    end
    return false unless not_draining

    neighbours.all? do |older, newer|
      older.stalling_threads == newer.stalling_threads
    end
  end

  # Terminates the process with exit status -1.
  def force_exit
    exit(-1)
  end
end

# Snapshot of a pipeline's shutdown progress: how many events are still in
# flight and which threads are holding the shutdown up, grouped by plugin.
class Report
  attr_reader :inflight_count, :stalling_threads

  # Builds a report from any object responding to #inflight_count and
  # #stalling_threads.
  def self.from_pipeline(pipeline)
    new(pipeline.inflight_count, pipeline.stalling_threads)
  end

  # inflight_count   - stored as given
  # stalling_threads - enumerable of per-thread Hashes; stored grouped by
  #                    their "plugin" value (see #format_threads_by_plugin)
  def initialize(inflight_count, stalling_threads)
    @inflight_count = inflight_count
    @stalling_threads = format_threads_by_plugin(stalling_threads)
  end

  # Returns the report as a Hash suitable for logging.
  def to_hash
    {
      "INFLIGHT_EVENT_COUNT" => @inflight_count,
      "STALLING_THREADS" => @stalling_threads
    }
  end

  # Groups thread-info Hashes by their "plugin" value ("other" when the value
  # is missing or nil). Each grouped entry is a copy of the input Hash without
  # the "plugin" key.
  #
  # The caller's Hashes are left untouched: the key is removed from a shallow
  # copy, so the input can be reused or frozen.
  def format_threads_by_plugin(stalling_threads)
    stalled_plugins = {}
    stalling_threads.each do |thr|
      entry = thr.dup
      key = entry.delete("plugin") || "other"
      (stalled_plugins[key] ||= []) << entry
    end
    stalled_plugins
  end
end
end
11 changes: 9 additions & 2 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class LogStash::Agent < Clamp::Command
I18n.t("logstash.agent.flag.configtest"),
:attribute_name => :config_test

option "--[no-]allow-unsafe-shutdown", :flag,
I18n.t("logstash.agent.flag.unsafe_shutdown"),
:attribute_name => :unsafe_shutdown,
:default => false

# Emit a warning message.
def warn(message)
# For now, all warnings are fatal.
Expand All @@ -75,6 +80,9 @@ def execute
require "logstash/plugin"
@logger = Cabin::Channel.get(LogStash)

LogStash::ShutdownController.unsafe_shutdown = unsafe_shutdown?
LogStash::ShutdownController.logger = @logger

if version?
show_version
return 0
Expand Down Expand Up @@ -176,8 +184,7 @@ def execute

# Shuts the given pipeline down, passing pipeline.shutdown a block that
# starts the shutdown monitoring.
# NOTE(review): this span comes from a diff view without +/- markers; the two
# InflightEventsReporter lines look like the pre-change body that the
# ShutdownController.start call replaces — confirm against the real file.
def shutdown(pipeline)
pipeline.shutdown do
InflightEventsReporter.logger = @logger
InflightEventsReporter.start(pipeline.input_to_filter, pipeline.filter_to_output, pipeline.outputs)
::LogStash::ShutdownController.start(pipeline)
end
end

Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/filters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def filter(event)
# @return [Array<LogStash::Event>] filtered events and any new events generated by the filter
public
def multi_filter(events)
LogStash::Util.set_thread_plugin(self)
result = []
events.each do |event|
unless event.cancelled?
Expand Down
10 changes: 7 additions & 3 deletions logstash-core/lib/logstash/outputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class LogStash::Outputs::Base < LogStash::Plugin
# Note that this setting may not be useful for all outputs.
config :workers, :validate => :number, :default => 1

attr_reader :worker_plugins, :worker_queue
attr_reader :worker_plugins, :worker_queue, :worker_threads

public
def workers_not_supported(message=nil)
Expand Down Expand Up @@ -56,13 +56,15 @@ def receive(event)
def worker_setup
if @workers == 1
@worker_plugins = [self]
@worker_threads = []
else
define_singleton_method(:handle, method(:handle_worker))
@worker_queue = SizedQueue.new(20)
@worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) }
@worker_plugins.map.with_index do |plugin, i|
@worker_threads = @worker_plugins.map.with_index do |plugin, i|
Thread.new(original_params, @worker_queue) do |params, queue|
LogStash::Util::set_thread_name(">#{self.class.config_name}.#{i}")
LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}")
LogStash::Util.set_thread_plugin(self)
plugin.register
while true
event = queue.pop
Expand All @@ -75,10 +77,12 @@ def worker_setup

public
# Records this plugin on the current thread (via
# LogStash::Util.set_thread_plugin) and then passes the event to #receive.
def handle(event)
LogStash::Util.set_thread_plugin(self)
receive(event)
end # def handle

# Records this plugin on the current thread, then pushes the event onto
# @worker_queue. worker_setup installs this method as #handle when more than
# one worker is configured; @worker_queue is a SizedQueue there, so the push
# may block while the queue is full.
def handle_worker(event)
LogStash::Util.set_thread_plugin(self)
@worker_queue.push(event)
end

Expand Down
64 changes: 57 additions & 7 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
require "logstash/filters/base"
require "logstash/inputs/base"
require "logstash/outputs/base"
require "logstash/util/reporter"
require "logstash/config/cpu_core_strategy"
require "logstash/util/defaults_printer"
require "logstash/shutdown_controller"

class LogStash::Pipeline
module LogStash; class Pipeline
attr_reader :inputs, :filters, :outputs, :input_to_filter, :filter_to_output

def initialize(configstr)
Expand All @@ -25,6 +25,7 @@ def initialize(configstr)

grammar = LogStashConfigParser.new
@config = grammar.parse(configstr)

if @config.nil?
raise LogStash::ConfigurationError, grammar.failure_reason
end
Expand Down Expand Up @@ -170,8 +171,11 @@ def start_filters
# dynamically get thread count based on filter threadsafety
# moved this test to here to allow for future config reloading
to_start = safe_filter_worker_count
@filter_threads = to_start.times.collect do
Thread.new { filterworker }
@filter_threads = to_start.times.collect do |i|
Thread.new do
LogStash::Util.set_thread_name("|filterworker.#{i}")
filterworker
end
end
actually_started = @filter_threads.select(&:alive?).size
msg = "Worker threads expected: #{to_start}, worker threads started: #{actually_started}"
Expand All @@ -195,7 +199,8 @@ def start_input(plugin)
end

def inputworker(plugin)
LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
LogStash::Util.set_thread_name("<#{plugin.class.config_name}")
LogStash::Util.set_thread_plugin(plugin)
begin
plugin.run(@input_to_filter)
rescue => e
Expand Down Expand Up @@ -228,7 +233,6 @@ def inputworker(plugin)
end # def inputworker

def filterworker
LogStash::Util.set_thread_name("|worker")
begin
while true
event = @input_to_filter.pop
Expand Down Expand Up @@ -270,6 +274,7 @@ def outputworker
event = @filter_to_output.pop
break if event == LogStash::SHUTDOWN
output_func(event)
LogStash::Util.set_thread_plugin(nil)
end
ensure
@outputs.each do |output|
Expand Down Expand Up @@ -329,4 +334,49 @@ def flush_filters_to_output!(options = {})
end
end # flush_filters_to_output!

end # class Pipeline
# Counts the events still queued inside the pipeline.
#
# Returns a Hash that always contains "total" and, only when non-zero,
# "input_to_filter" and "filter_to_output" queue sizes. When any output has a
# non-empty worker queue, "output_worker_queues" lists one entry per such
# output: that output's debug_info with the queue size appended.
def inflight_count
  in_to_filter = @input_to_filter.size
  pending = in_to_filter
  filter_to_out = @filter_to_output.size
  pending += filter_to_out

  counts = {}
  counts["input_to_filter"] = in_to_filter if in_to_filter > 0
  counts["filter_to_output"] = filter_to_out if filter_to_out > 0

  busy_outputs = []
  @outputs.each do |output|
    queue = output.worker_queue
    next unless queue && queue.size > 0

    entry = output.debug_info
    depth = queue.size
    pending += depth
    busy_outputs << (entry << depth)
  end

  counts["output_worker_queues"] = busy_outputs unless busy_outputs.empty?
  counts["total"] = pending
  counts
end

# Returns the plugin_threads entries that are not in a known benign blocking
# state (i.e. whose "blocked_on" is unset), with the "backtrace",
# "blocked_on" and "status" keys removed from each entry.
def stalling_threads
  suspects = plugin_threads.reject { |info| info["blocked_on"] }
  suspects.each do |info|
    %w[backtrace blocked_on status].each { |field| info.delete(field) }
  end
end

# Returns thread_info for every live input, filter, output and output-worker
# thread, in that order.
#
# Output worker threads are filtered by alive? like the other groups, so a
# worker that has already exited is not handed to thread_info.
def plugin_threads
  live_info = lambda do |threads|
    threads.select { |t| t.alive? }.map { |t| thread_info(t) }
  end

  output_worker_threads = @outputs.flat_map { |output| output.worker_threads }

  live_info.call(@input_threads) +
    live_info.call(@filter_threads) +
    live_info.call(@output_threads) +
    live_info.call(output_worker_threads)
end

# Returns the info Hash for the given thread by delegating to
# LogStash::Util.thread_info.
def thread_info(thread)
LogStash::Util.thread_info(thread)
end
end; end
5 changes: 5 additions & 0 deletions logstash-core/lib/logstash/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def inspect
end
end

public
# Returns a two-element Array identifying this plugin: its class name as a
# String, followed by original_params.
def debug_info
[self.class.to_s, original_params]
end

# Look up a plugin by type and name.
public
def self.lookup(type, name)
Expand Down
35 changes: 35 additions & 0 deletions logstash-core/lib/logstash/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,41 @@ def self.set_thread_name(name)
end
end # def set_thread_name

# Stores the given plugin (or nil) under the :plugin key of Thread.current,
# where thread_info reads it back through thread[:plugin].
def self.set_thread_plugin(plugin)
Thread.current[:plugin] = plugin
end

# Returns the native thread id of the given Ruby thread.
#
# Only available on JRuby; on any other engine an Exception is raised.
def self.get_thread_id(thread)
  unless RUBY_ENGINE == "jruby"
    raise Exception.new("Native thread IDs aren't supported outside of JRuby")
  end

  JRuby.reference(thread).native_thread.id
end

# Builds a Hash describing the given thread: its native id, :name and :plugin
# thread-locals (the plugin as its debug_info), its backtrace with
# LOGSTASH_HOME shortened to "[...]", its status, the top backtrace frame
# ("current_call"), and "blocked_on" — a label for the two known benign
# blocking states, or nil.
def self.thread_info(thread)
  # Thread#backtrace returns nil for a dead thread (and the thread may die
  # between a caller's alive? check and this call), so fall back to an empty
  # backtrace instead of raising NoMethodError.
  backtrace = (thread.backtrace || []).map do |line|
    line.gsub(LogStash::Environment::LOGSTASH_HOME, "[...]")
  end
  current_call = backtrace.first

  blocked_on = case current_call
               when /in `push'/ then "blocked_on_push"
               when /(?:pipeline|base).*pop/ then "waiting_for_events"
               else nil
               end

  plugin = thread[:plugin]

  {
    "thread_id" => get_thread_id(thread),
    "name" => thread[:name],
    "plugin" => (plugin ? plugin.debug_info : nil),
    "backtrace" => backtrace,
    "blocked_on" => blocked_on,
    "status" => thread.status,
    "current_call" => current_call
  }
end


# Merge hash 'src' into 'dst' nondestructively
#
# Duplicate keys will become array values
Expand Down
Loading

0 comments on commit 79b90b1

Please sign in to comment.