Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

health api: expose GET /_health_report with pipelines/*/status probe #16398

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions logstash-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def versionMap = (Map) (new Yaml()).load(new File("$projectDir/../versions.yml")

description = """Logstash Core Java"""

String logstashCoreVersion = versionMap['logstash-core']
String jacksonVersion = versionMap['jackson']
String jacksonDatabindVersion = versionMap['jackson-databind']
String jrubyVersion = versionMap['jruby']['version']
Expand Down Expand Up @@ -183,6 +184,23 @@ artifacts {
}
}

task generateVersionInfoResources(type: DefaultTask) {
ext.outDir = layout.buildDirectory.dir("generated-resources/version-info").get()

inputs.property("version-info:logstash-core", logstashCoreVersion)
outputs.dir(ext.outDir)

doLast {
mkdir outDir;
def resourceFile = outDir.file('version-info.properties').asFile
resourceFile.text = "logstash-core: ${logstashCoreVersion}"
}
}
sourceSets {
main { output.dir(generateVersionInfoResources.outputs.files) }
}
processResources.dependsOn generateVersionInfoResources

configurations {
provided
}
Expand Down
27 changes: 26 additions & 1 deletion logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
@ephemeral_id = SecureRandom.uuid

java_import("org.logstash.health.HealthObserver")
@health_observer = HealthObserver.new
@health_observer ||= HealthObserver.new

# Mutex to synchronize in the exclusive method
# Initial usage for the Ruby pipeline initialization which is not thread safe
Expand Down Expand Up @@ -156,6 +156,31 @@ def execute
transition_to_stopped
end

include org.logstash.health.PipelineIndicator::PipelineDetailsProvider
def pipeline_details(pipeline_id)
logger.trace("fetching pipeline details for `#{pipeline_id}`")
pipeline_id = pipeline_id.to_sym

java_import org.logstash.health.PipelineIndicator

pipeline_state = @pipelines_registry.states.get(pipeline_id)
if pipeline_state.nil?
return PipelineIndicator::Details.new(PipelineIndicator::Status::UNKNOWN)
end

status = pipeline_state.synchronize do |sync_state|
case
when sync_state.loading? then PipelineIndicator::Status::LOADING
when sync_state.crashed? then PipelineIndicator::Status::TERMINATED
when sync_state.running? then PipelineIndicator::Status::RUNNING
when sync_state.finished? then PipelineIndicator::Status::FINISHED
else PipelineIndicator::Status::UNKNOWN
end
end

return PipelineIndicator::Details.new(status)
end

def auto_reload?
@auto_reload
end
Expand Down
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/api/command_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require "logstash/api/service"
require "logstash/api/commands/system/basicinfo_command"
require "logstash/api/commands/system/plugins_command"
require "logstash/api/commands/health_report"
require "logstash/api/commands/stats"
require "logstash/api/commands/node"
require "logstash/api/commands/default_metadata"
Expand All @@ -34,6 +35,7 @@ def initialize(service)
:plugins_command => ::LogStash::Api::Commands::System::Plugins,
:stats => ::LogStash::Api::Commands::Stats,
:node => ::LogStash::Api::Commands::Node,
:health_report => ::LogStash::Api::Commands::HealthReport,
:default_metadata => ::LogStash::Api::Commands::DefaultMetadata
}
end
Expand Down
22 changes: 14 additions & 8 deletions logstash-core/lib/logstash/api/commands/default_metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,14 @@ module Api
module Commands
class DefaultMetadata < Commands::Base
def all
res = {:host => host,
:version => version,
:http_address => http_address,
:id => service.agent.id,
:name => service.agent.name,
:ephemeral_id => service.agent.ephemeral_id,
res = base_info.merge({
:status => service.agent.health_observer.status,
:snapshot => ::BUILD_INFO["build_snapshot"],
:pipeline => {
:workers => LogStash::SETTINGS.get("pipeline.workers"),
:batch_size => LogStash::SETTINGS.get("pipeline.batch.size"),
:batch_delay => LogStash::SETTINGS.get("pipeline.batch.delay"),
},
}
})
monitoring = {}
if enabled_xpack_monitoring?
monitoring = monitoring.merge({
Expand All @@ -49,6 +43,18 @@ def all
res.merge(monitoring.empty? ? {} : {:monitoring => monitoring})
end

def base_info
{
:host => host,
:version => version,
:http_address => http_address,
:id => service.agent.id,
:name => service.agent.name,
:ephemeral_id => service.agent.ephemeral_id,
:snapshot => ::BUILD_INFO["build_snapshot"],
}
end

def host
@@host ||= Socket.gethostname
end
Expand Down
31 changes: 31 additions & 0 deletions logstash-core/lib/logstash/api/commands/health_report.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

require "logstash/api/commands/base"

module LogStash
module Api
module Commands
class HealthReport < Commands::Base

def all(selected_fields = [])
service.agent.health_observer.report
end
end
end
end
end
49 changes: 49 additions & 0 deletions logstash-core/lib/logstash/api/modules/health_report.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

module LogStash
module Api
module Modules
class HealthReport < ::LogStash::Api::Modules::Base

get "/" do
payload = health_report.all.then do |health_report_pojo|
# The app_helper needs a ruby-hash.
# Manually creating a map of properties works around the issue.
base_metadata.merge({
status: health_report_pojo.status,
symptom: health_report_pojo.symptom,
indicators: health_report_pojo.indicators,
})
end

respond_with(payload, {exclude_default_metadata: true})
yaauie marked this conversation as resolved.
Show resolved Hide resolved
end

private

def health_report
@health_report ||= factory.build(:health_report)
yaauie marked this conversation as resolved.
Show resolved Hide resolved
end

def base_metadata
@factory.build(:default_metadata).base_info
end
end
end
end
end
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/api/rack_app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require "rack"
require "sinatra/base"
require "logstash/api/modules/base"
require "logstash/api/modules/health_report"
require "logstash/api/modules/node"
require "logstash/api/modules/node_stats"
require "logstash/api/modules/plugins"
Expand Down Expand Up @@ -123,6 +124,7 @@ def self.app(logger, agent, environment)

def self.rack_namespaces(agent)
{
"/_health_report" => LogStash::Api::Modules::HealthReport,
"/_node" => LogStash::Api::Modules::Node,
"/_stats" => LogStash::Api::Modules::Stats,
"/_node/stats" => LogStash::Api::Modules::NodeStats,
Expand Down
11 changes: 11 additions & 0 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
@flushing = java.util.concurrent.atomic.AtomicBoolean.new(false)
@flushRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
@shutdownRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
@crash_detected = Concurrent::AtomicBoolean.new(false)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had thought we could rely on the difference between finished_run and finished_execution to tell us whether we had crashed, because finished_run is only set in a non-exceptional path, but it turns out that the execeptions from a crash are already handled by the time it gets there.

Ideally we would have a finite state machine for the possible states of a given pipeline instead of having state sprinkled throughout, but that task was a much larger lift than simply adding a single flag.

I found a dirty way to abuse the ruby filter to cause an exception that escapes its handling and causes the pipeline to crash, emulating the type of crash that is possible by a plugin that fails to handle exceptions:

- pipeline.id: uppy
  config.string: |
    input { heartbeat {} }
    output { sink {} }
- pipeline.id: crashy
  config.string: |
    input { heartbeat { } }
    filter {
      ruby {
        init => "@poison = ::Class.new(::Exception) { def backtrace; throw(:boom); end; }"
        code => "fail @poison.new"
      }
    }
    output { sink {} }
    output { stdout { codec => rubydebug } }

@outputs_registered = Concurrent::AtomicBoolean.new(false)

# @finished_execution signals that the pipeline thread has finished its execution
Expand All @@ -87,6 +88,10 @@ def finished_execution?
@finished_execution.true?
end

def finished_run?
@finished_run.true?
end

def ready?
@ready.value
end
Expand Down Expand Up @@ -229,6 +234,10 @@ def stopped?
@running.false?
end

def crashed?
@crash_detected.true?
end

# register_plugins calls #register_plugin on the plugins list and upon exception will call Plugin#do_close on all registered plugins
# @param plugins [Array[Plugin]] the list of plugins to register
def register_plugins(plugins)
Expand Down Expand Up @@ -305,6 +314,7 @@ def start_workers
rescue => e
# WorkerLoop.run() catches all Java Exception class and re-throws as IllegalStateException with the
# original exception as the cause
@crash_detected.make_true
@logger.error(
"Pipeline worker error, the pipeline will be stopped",
default_logging_keys(:error => e.cause.message, :exception => e.cause.class, :backtrace => e.cause.backtrace)
Expand All @@ -319,6 +329,7 @@ def start_workers
begin
start_inputs
rescue => e
@crash_detected.make_true
# if there is any exception in starting inputs, make sure we shutdown workers.
# exception will already by logged in start_inputs
shutdown_workers
Expand Down
8 changes: 8 additions & 0 deletions logstash-core/lib/logstash/pipeline_action/create.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,21 @@ def execution_priority
# The execute assume that the thread safety access of the pipeline
# is managed by the caller.
def execute(agent, pipelines_registry)
attach_health_indicator(agent)
new_pipeline = LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
success = pipelines_registry.create_pipeline(pipeline_id, new_pipeline) do
new_pipeline.start # block until the pipeline is correctly started or crashed
end

LogStash::ConvergeResult::ActionResult.create(self, success)
end

def attach_health_indicator(agent)
health_observer = agent.health_observer
health_observer.detach_pipeline_indicator(pipeline_id) # just in case ...
health_observer.attach_pipeline_indicator(pipeline_id, agent)
end

def to_s
"PipelineAction::Create<#{pipeline_id}>"
end
Expand Down
5 changes: 5 additions & 0 deletions logstash-core/lib/logstash/pipeline_action/delete.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,15 @@ def initialize(pipeline_id)

def execute(agent, pipelines_registry)
success = pipelines_registry.delete_pipeline(@pipeline_id)
detach_health_indicator(agent) if success

LogStash::ConvergeResult::ActionResult.create(self, success)
end

def detach_health_indicator(agent)
agent.health_observer.detach_pipeline_indicator(pipeline_id)
end

def to_s
"PipelineAction::Delete<#{pipeline_id}>"
end
Expand Down
5 changes: 5 additions & 0 deletions logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@ def execute(agent, pipelines_registry)
end

success = pipelines_registry.delete_pipeline(@pipeline_id)
detach_health_indicator(agent) if success

LogStash::ConvergeResult::ActionResult.create(self, success)
end

def detach_health_indicator(agent)
agent.health_observer.detach_pipeline_indicator(pipeline_id)
end

def to_s
"PipelineAction::StopAndDelete<#{pipeline_id}>"
end
Expand Down
16 changes: 16 additions & 0 deletions logstash-core/lib/logstash/pipelines_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,28 @@ def initialize(pipeline_id, pipeline)
@lock = Monitor.new
end

# a terminated pipeline has either crashed OR finished normally
def terminated?
@lock.synchronize do
# a loading pipeline is never considered terminated
@loading.false? && @pipeline.finished_execution?
end
end

# a finished pipeline finished _normally_ without exception
def finished?
@lock.synchronize do
# a loading pipeline is never considered terminated
@loading.false? && @pipeline.finished_run?
end
end

def crashed?
@lock.synchronize do
@pipeline&.crashed?
end
end

def running?
@lock.synchronize do
# not terminated and not loading
Expand Down Expand Up @@ -104,6 +119,7 @@ def size
end
end


def empty?
@lock.synchronize do
@states.empty?
Expand Down
3 changes: 3 additions & 0 deletions logstash-core/spec/logstash/java_pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ def flush(options)

# wait until there is no more worker thread since we have a single worker that should have died
wait(5).for {subject.worker_threads.any?(&:alive?)}.to be_falsey
expect(subject.crashed?).to be true

# at this point the input plugin should have been asked to stop
wait(5).for {dummyinput.stop?}.to be_truthy
Expand Down Expand Up @@ -437,6 +438,7 @@ def flush(options)

# wait until there is no more worker thread since we have a single worker that should have died
wait(5).for {subject.worker_threads.any?(&:alive?)}.to be_falsey
expect(subject.crashed?).to be true

# at this point the input plugin should have been asked to stop
wait(5).for {dummyinput.stop?}.to be_truthy
Expand Down Expand Up @@ -602,6 +604,7 @@ def flush(options)
expect(input).to receive(:do_close).once
pipeline.start
pipeline.shutdown
expect(pipeline.crashed?).to be false
end
end
end
Expand Down
Loading