Skip to content

Commit

Permalink
health api: expose GET /_health_report with pipelines/*/up probe
Browse files Browse the repository at this point in the history
  • Loading branch information
yaauie committed Aug 21, 2024
1 parent 2b8c473 commit f967f70
Show file tree
Hide file tree
Showing 20 changed files with 949 additions and 6 deletions.
27 changes: 26 additions & 1 deletion logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
@ephemeral_id = SecureRandom.uuid

java_import("org.logstash.health.HealthObserver")
@health_observer = HealthObserver.new
@health_observer ||= HealthObserver.new
@health_observer.indicator.attach_indicator("pipelines", org.logstash.health.MultiIndicator.new)

# Mutex to synchronize in the exclusive method
# Initial usage for the Ruby pipeline initialization which is not thread safe
Expand Down Expand Up @@ -156,6 +157,30 @@ def execute
transition_to_stopped
end

include org.logstash.health.PipelineIndicator::PipelineDetailsProvider
def pipeline_details(pipeline_id)
logger.trace("fetching pipeline details for `#{pipeline_id}`")
pipeline_id = pipeline_id.to_sym

java_import org.logstash.health.PipelineIndicator

pipeline_state = @pipelines_registry.states.get(pipeline_id)
if pipeline_state.nil?
return PipelineIndicator::Details.new(PipelineIndicator::RunState::UNKNOWN)
end

run_state = pipeline_state.synchronize do |sync_state|
case
when sync_state.loading? then PipelineIndicator::RunState::LOADING
when sync_state.running? then PipelineIndicator::RunState::RUNNING
when sync_state.finished? then PipelineIndicator::RunState::FINISHED # must check before terminated
when sync_state.terminated? then PipelineIndicator::RunState::TERMINATED
end
end
return PipelineIndicator::Details.new(run_state)

end

def auto_reload?
@auto_reload
end
Expand Down
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/api/command_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require "logstash/api/service"
require "logstash/api/commands/system/basicinfo_command"
require "logstash/api/commands/system/plugins_command"
require "logstash/api/commands/health_report"
require "logstash/api/commands/stats"
require "logstash/api/commands/node"
require "logstash/api/commands/default_metadata"
Expand All @@ -34,6 +35,7 @@ def initialize(service)
:plugins_command => ::LogStash::Api::Commands::System::Plugins,
:stats => ::LogStash::Api::Commands::Stats,
:node => ::LogStash::Api::Commands::Node,
:health_report => ::LogStash::Api::Commands::HealthReport,
:default_metadata => ::LogStash::Api::Commands::DefaultMetadata
}
end
Expand Down
32 changes: 32 additions & 0 deletions logstash-core/lib/logstash/api/commands/health_report.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

require "logstash/api/commands/base"
require_relative "hot_threads_reporter"

module LogStash
module Api
module Commands
class HealthReport < Commands::Base

def all(selected_fields = [])
service.agent.health_observer.report
end
end
end
end
end
45 changes: 45 additions & 0 deletions logstash-core/lib/logstash/api/modules/health_report.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

module LogStash
module Api
module Modules
class HealthReport < ::LogStash::Api::Modules::Base

get "/" do
payload = health_report.all.then do |health_report_pojo|
# The app_helper needs a ruby-hash.
# Manually creating a map of properties works around the issue.
{
status: health_report_pojo.status,
symptom: health_report_pojo.symptom,
indicators: health_report_pojo.indicators,
}
end

respond_with(payload, {exclude_default_metadata: true})
end

private

def health_report
@health_report ||= factory.build(:health_report)
end
end
end
end
end
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/api/rack_app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require "rack"
require "sinatra/base"
require "logstash/api/modules/base"
require "logstash/api/modules/health_report"
require "logstash/api/modules/node"
require "logstash/api/modules/node_stats"
require "logstash/api/modules/plugins"
Expand Down Expand Up @@ -123,6 +124,7 @@ def self.app(logger, agent, environment)

def self.rack_namespaces(agent)
{
"/_health_report" => LogStash::Api::Modules::HealthReport,
"/_node" => LogStash::Api::Modules::Node,
"/_stats" => LogStash::Api::Modules::Stats,
"/_node/stats" => LogStash::Api::Modules::NodeStats,
Expand Down
4 changes: 4 additions & 0 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ def finished_execution?
@finished_execution.true?
end

def finished_run?
@finished_run.true?
end

def ready?
@ready.value
end
Expand Down
10 changes: 10 additions & 0 deletions logstash-core/lib/logstash/pipeline_action/create.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,23 @@ def execution_priority
# The execute assume that the thread safety access of the pipeline
# is managed by the caller.
def execute(agent, pipelines_registry)
attach_health_indicator(agent)
new_pipeline = LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
success = pipelines_registry.create_pipeline(pipeline_id, new_pipeline) do
new_pipeline.start # block until the pipeline is correctly started or crashed
end
LogStash::ConvergeResult::ActionResult.create(self, success)
end

def attach_health_indicator(agent)
java_import org.logstash.health.PipelineIndicator
agent.health_observer.indicator.getIndicator("pipelines").get.attachIndicator(pipeline_id, PipelineIndicator.forPipeline(pipeline_id, agent))
logger.info("attached pipeline health indicator: #{pipeline_id}")
logger.debug("health observer: #{agent.health_observer.indicator}")
rescue
logger.warn("Failed to attach pipeline health indicator: #{$!.message}")
end

def to_s
"PipelineAction::Create<#{pipeline_id}>"
end
Expand Down
15 changes: 15 additions & 0 deletions logstash-core/lib/logstash/pipelines_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,22 @@ def initialize(pipeline_id, pipeline)
@lock = Monitor.new
end

# a terminated pipeline has either crashed OR finished normally
def terminated?
@lock.synchronize do
# a loading pipeline is never considered terminated
@loading.false? && @pipeline.finished_execution?
end
end

# a finished pipeline finished _normally_ without exception
def finished?
@lock.synchronize do
# a loading pipeline is never considered terminated
@loading.false? && @pipeline.finished_run?
end
end

def running?
@lock.synchronize do
# not terminated and not loading
Expand Down Expand Up @@ -104,6 +113,12 @@ def size
end
end

def list
@lock.synchronize do
@states.keys.dup
end
end

def empty?
@lock.synchronize do
@states.empty?
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.logstash.health;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;

import java.io.IOException;
import java.util.Map;

@JsonSerialize(using = ApiHealthReport.JsonSerializer.class)
public class ApiHealthReport {
private final MultiIndicator.Report delegate;

public ApiHealthReport(final MultiIndicator.Report delegate) {
this.delegate = delegate;
}

public Status getStatus() {
return delegate.status();
}

public String getSymptom() {
return delegate.symptom();
}

public Map<String, Indicator.Report> getIndicators() {
return delegate.indicators();
}

public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer<ApiHealthReport> {
@Override
public void serialize(final ApiHealthReport apiHealthReport,
final JsonGenerator jsonGenerator,
final SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeObjectField("status", apiHealthReport.getStatus());
jsonGenerator.writeObjectField("symptom", apiHealthReport.getSymptom());
jsonGenerator.writeObjectField("indicators", apiHealthReport.getIndicators());
jsonGenerator.writeEndObject();
}
}
}
74 changes: 74 additions & 0 deletions logstash-core/src/main/java/org/logstash/health/Diagnosis.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.logstash.health;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;

import java.io.IOException;
import java.util.Objects;
import java.util.function.UnaryOperator;

@JsonSerialize(using = Diagnosis.JsonSerializer.class)
public final class Diagnosis {
final String cause;
final String action;
final String helpUrl;

private Diagnosis(final Builder builder) {
this.cause = builder.cause;
this.action = builder.action;
this.helpUrl = builder.helpUrl;
}

String cause() {
return cause;
}
String action() {
return action;
}
String helpUrl() {
return helpUrl;
}

static Builder builder() {
return new Builder();
}

public static class Builder {
private String cause;
private String action;
private String helpUrl;
public synchronized Builder setCause(final String cause) {
assert Objects.isNull(this.cause) : "cause has already been set";
this.cause = cause;
return this;
}
public synchronized Builder setAction(final String action) {
assert Objects.isNull(this.action) : "action has already been set";
this.action = action;
return this;
}
public synchronized Builder setHelpUrl(final String helpUrl) {
assert Objects.isNull(this.helpUrl) : "helpUrl has already been set";
this.helpUrl = helpUrl;
return this;
}
public synchronized Builder configure(final UnaryOperator<Builder> configurator) {
return configurator.apply(this);
}
public synchronized Diagnosis build() {
return new Diagnosis(this);
}
}

public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer<Diagnosis> {
@Override
public void serialize(Diagnosis diagnosis, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField("cause", diagnosis.cause());
jsonGenerator.writeStringField("action", diagnosis.action());
jsonGenerator.writeStringField("help_url", diagnosis.helpUrl());
jsonGenerator.writeEndObject();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.util.EnumSet;

public class HealthObserver {

private final MultiIndicator rootIndicator = new MultiIndicator();

public final Status getStatus() {
// INTERNAL-ONLY Proof-of-concept to show flow-through to API results
switch (System.getProperty("logstash.apiStatus", "green")) {
Expand All @@ -33,7 +36,15 @@ public final Status getStatus() {
final EnumSet<Status> statuses = EnumSet.allOf(Status.class);
return Iterables.get(statuses, new java.util.Random().nextInt(statuses.size()));
default:
return Status.UNKNOWN;
return getReport().getStatus();
}
}

public MultiIndicator getIndicator() {
return this.rootIndicator;
}

public ApiHealthReport getReport() {
return new ApiHealthReport(this.rootIndicator.report());
}
}
Loading

0 comments on commit f967f70

Please sign in to comment.