Shutdown Semantics: Exception handling in the pipeline #2477
Not sure it's related. When I have an exception in a filter, I can no longer stop the Logstash process when using a Linux pipe. For example, consider the following config:

input {
  stdin { }
}
filter {
  grok {
    match => {
      "message" => '^%{DATA:start}\[%{HTTPDATE:timestamp}\]%{DATA:end}$'
    }
  }
  date {
    match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
    locale => en
  }
  # Bump the time forward
  ruby {
    init => "last = Time.iso8601('2014-09-25T12:00:00+00:00'); @shift = Time.iso8601('2015-04-10T15:00:00.000+02:00') - last"
    code => "event['@timestamp'] += @shift"
  }
  mutate {
    replace => { "message" => "%{start}[%{+dd/MMM/YYYY:HH:mm:ss Z}]%{end}" }
  }
}
output {
  stdout { codec => line { format => "%{message}" } }
}
When running bin/logstash -f logstash.conf I get the error and I can still stop the process. But when running cat logs | bin/logstash -f logstash.conf I cannot stop the Logstash process and I have to kill it.
okay I'll try out the latest again
The aforementioned fixes make 1.5 appear to be stable and working.
@joekiller exactly which "aforementioned fixes"?
#2928 I'm thinking, however I did take the build from master so it could
My 2 cents: many plugins do not handle the range of possible exceptions with enough forethought. For example, imagine an SMTP output plugin; there are many ways an SMTP call can fail. We need a mechanism in the plugin to handle these sorts of errors appropriately. I suggest a default wrapper that provides the minimum standard, and encourage plugin developers to provide a specific subclass for their plugin if the default one is too generic.

def receive(event)
  DefaultErrorHandler.new(self).exec do
    # some receive code
  end
end

and in LogStash::Plugin:

class DefaultErrorHandler < SimpleDelegator
  # initialize provided by SimpleDelegator

  def exec
    begin
      yield if block_given?
    rescue StandardError => e
      logger.error(default_error_log_message, :error => e.inspect)
    end
  end

  private

  def default_error_log_message
    "Unexpected error occurred in #{type} plugin: #{name}"
  end
end

and in the fictitious SMTP output:

SmtpTransientError = Class.new(TransientException)

class SmtpOutputErrorHandler < SimpleDelegator
  MaxRetries = 10

  attr_reader :retry_count

  # initialize provided by SimpleDelegator

  def exec
    @retry_count = 0
    begin
      yield if block_given?
    rescue Net::SMTPAuthenticationError,
           Net::SMTPSyntaxError,
           Net::SMTPFatalError,
           Net::SMTPUnknownError => e
      raise SmtpTransientError.build_from(e)
    rescue Net::SMTPServerBusy,
           Net::OpenTimeout,
           IOError => e
      if retry_count < MaxRetries
        @retry_count += 1
        sleep 0.1
        retry
      else
        logger.error(retry_error_log_message(e), :error => e.inspect)
      end
    rescue StandardError => e
      logger.error(default_error_log_message, :error => e.inspect)
    end
  end

  private

  def retry_error_log_message(error)
    "#{error.class} error occurred in output plugin: SMTP, retries exhausted"
  end

  def default_error_log_message
    "Unexpected error occurred in output plugin: SMTP"
  end
end

Another example: a Mongo plugin should provide rescue clauses for Mongo errors and non-Mongo errors, as a Mongo error has an error_code method and other errors do not.
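As a rough sketch of that Mongo case, following the same SimpleDelegator pattern as above (MongoOutputErrorHandler is a hypothetical name; the only assumption about the driver is the error_code method just mentioned):

require 'delegate'

class MongoOutputErrorHandler < SimpleDelegator
  # initialize provided by SimpleDelegator

  def exec
    yield if block_given?
  rescue StandardError => e
    if e.respond_to?(:error_code)
      # a driver error: log its error code alongside the exception itself
      logger.error("Mongo error occurred in output plugin: Mongo",
                   :error_code => e.error_code, :error => e.inspect)
    else
      # anything else: fall back to the generic message
      logger.error("Unexpected error occurred in output plugin: Mongo",
                   :error => e.inspect)
    end
  end
end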
Related issues/PRs:
#1373
#1250
#2130
#2152
The pipeline is composed of {input,filter,output} workers that execute the plugins' code and take care of uncaught exceptions that may occur during their execution.
These workers should tolerate (hopefully) transient exceptions and not allow the pipeline to crash. Also, known fatal exceptions should abort the pipeline execution and terminate Logstash. Therefore, workers should have the following behaviour (a sketch of such a worker loop follows the list below):
- Input workers have a long-running method call: run
- Filter and output workers continuously pop events from a queue
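A rough sketch of what a filter/output worker loop could look like once transient and fatal exceptions are distinguished; worker_loop, TransientException and FatalException are illustrative names rather than the actual pipeline code, and LogStash::ShutdownEvent stands for the shutdown marker the pipeline pushes through the queue:

def worker_loop(queue, plugin)
  loop do
    event = queue.pop
    # cooperative shutdown: stop once the shutdown marker reaches this worker
    break if event.is_a?(LogStash::ShutdownEvent)
    begin
      plugin.receive(event)
    rescue TransientException => e
      logger.warn("transient error, retrying event", :error => e.inspect)
      sleep 1
      retry
    rescue FatalException => e
      logger.error("fatal error, aborting pipeline", :error => e.inspect)
      raise
    end
  end
  plugin.teardown
end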
What is a TransientException / FatalException?
We have no way of knowing what the plugin considers a transient or a fatal exception; there are a couple of possible choices.
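Whatever the choice, the distinction could be made explicit with small wrapper classes. A minimal sketch, using hypothetical names consistent with the build_from call in the SMTP example above:

class PluginException < StandardError
  # wrap an arbitrary exception while keeping its message and backtrace
  def self.build_from(original)
    error = new("#{original.class}: #{original.message}")
    error.set_backtrace(original.backtrace)
    error
  end
end

# the worker may retry the operation
TransientException = Class.new(PluginException)

# the worker should abort the pipeline and terminate Logstash
FatalException = Class.new(PluginException)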
Questions
Should a transient failure call some setup/teardown behavior on the plugin?
Current code for the input plugin calls teardown and then retries, but at that point the plugin instance might no longer be usable.
For filters and outputs, the shutdown is done through a ShutdownEvent. However, if a plugin is very slow (e.g. executing sleep 100000), it will never pop the ShutdownEvent from the queue and execute teardown. Should there be a pipeline.shutdown(force=true) in some situations?
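A sketch of what such a forced shutdown could look like, assuming the pipeline keeps references to its worker threads; the queue and thread instance variables, the grace period and ForcedShutdownError are all hypothetical:

ForcedShutdownError = Class.new(StandardError)

def shutdown(force = false)
  # cooperative path: one ShutdownEvent per worker so each loop can exit
  @worker_threads.size.times { @filter_queue.push(LogStash::ShutdownEvent.new) }
  return unless force

  @worker_threads.each do |thread|
    # give the worker a short grace period, then interrupt it where it is stuck
    next if thread.join(5)
    thread.raise(ForcedShutdownError, "forced pipeline shutdown")
  end
end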
Thread naming problem
Related issues:
#2462
#2425
Best practices when developing plugins
Exception handling recommendations: