diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index 8cb89141809..831f0fefde5 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -57,6 +57,7 @@ def versionMap = (Map) (new Yaml()).load(new File("$projectDir/../versions.yml") description = """Logstash Core Java""" +String logstashCoreVersion = versionMap['logstash-core'] String jacksonVersion = versionMap['jackson'] String jacksonDatabindVersion = versionMap['jackson-databind'] String jrubyVersion = versionMap['jruby']['version'] @@ -183,6 +184,23 @@ artifacts { } } +task generateVersionInfoResources(type: DefaultTask) { + ext.outDir = layout.buildDirectory.dir("generated-resources/version-info").get() + + inputs.property("version-info:logstash-core", logstashCoreVersion) + outputs.dir(ext.outDir) + + doLast { + mkdir outDir; + def resourceFile = outDir.file('version-info.properties').asFile + resourceFile.text = "logstash-core: ${logstashCoreVersion}" + } +} +sourceSets { + main { output.dir(generateVersionInfoResources.outputs.files) } +} +processResources.dependsOn generateVersionInfoResources + configurations { provided } diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index a358d9f2b80..bb32cbf14fb 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -54,7 +54,7 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil) @ephemeral_id = SecureRandom.uuid java_import("org.logstash.health.HealthObserver") - @health_observer = HealthObserver.new + @health_observer ||= HealthObserver.new # Mutex to synchronize in the exclusive method # Initial usage for the Ruby pipeline initialization which is not thread safe @@ -156,6 +156,31 @@ def execute transition_to_stopped end + include org.logstash.health.PipelineIndicator::PipelineDetailsProvider + def pipeline_details(pipeline_id) + logger.trace("fetching pipeline details for `#{pipeline_id}`") + pipeline_id = pipeline_id.to_sym + + java_import org.logstash.health.PipelineIndicator + + pipeline_state = @pipelines_registry.states.get(pipeline_id) + if pipeline_state.nil? + return PipelineIndicator::Details.new(PipelineIndicator::Status::UNKNOWN) + end + + status = pipeline_state.synchronize do |sync_state| + case + when sync_state.loading? then PipelineIndicator::Status::LOADING + when sync_state.crashed? then PipelineIndicator::Status::TERMINATED + when sync_state.running? then PipelineIndicator::Status::RUNNING + when sync_state.finished? then PipelineIndicator::Status::FINISHED + else PipelineIndicator::Status::UNKNOWN + end + end + + return PipelineIndicator::Details.new(status) + end + def auto_reload? @auto_reload end diff --git a/logstash-core/lib/logstash/api/command_factory.rb b/logstash-core/lib/logstash/api/command_factory.rb index 6ed9e570142..0b246f14034 100644 --- a/logstash-core/lib/logstash/api/command_factory.rb +++ b/logstash-core/lib/logstash/api/command_factory.rb @@ -18,6 +18,7 @@ require "logstash/api/service" require "logstash/api/commands/system/basicinfo_command" require "logstash/api/commands/system/plugins_command" +require "logstash/api/commands/health_report" require "logstash/api/commands/stats" require "logstash/api/commands/node" require "logstash/api/commands/default_metadata" @@ -34,6 +35,7 @@ def initialize(service) :plugins_command => ::LogStash::Api::Commands::System::Plugins, :stats => ::LogStash::Api::Commands::Stats, :node => ::LogStash::Api::Commands::Node, + :health_report => ::LogStash::Api::Commands::HealthReport, :default_metadata => ::LogStash::Api::Commands::DefaultMetadata } end diff --git a/logstash-core/lib/logstash/api/commands/default_metadata.rb b/logstash-core/lib/logstash/api/commands/default_metadata.rb index 31eba8950f2..635e3e5f43a 100644 --- a/logstash-core/lib/logstash/api/commands/default_metadata.rb +++ b/logstash-core/lib/logstash/api/commands/default_metadata.rb @@ -22,20 +22,14 @@ module Api module Commands class DefaultMetadata < Commands::Base def all - res = {:host => host, - :version => version, - :http_address => http_address, - :id => service.agent.id, - :name => service.agent.name, - :ephemeral_id => service.agent.ephemeral_id, + res = base_info.merge({ :status => service.agent.health_observer.status, - :snapshot => ::BUILD_INFO["build_snapshot"], :pipeline => { :workers => LogStash::SETTINGS.get("pipeline.workers"), :batch_size => LogStash::SETTINGS.get("pipeline.batch.size"), :batch_delay => LogStash::SETTINGS.get("pipeline.batch.delay"), }, - } + }) monitoring = {} if enabled_xpack_monitoring? monitoring = monitoring.merge({ @@ -49,6 +43,18 @@ def all res.merge(monitoring.empty? ? {} : {:monitoring => monitoring}) end + def base_info + { + :host => host, + :version => version, + :http_address => http_address, + :id => service.agent.id, + :name => service.agent.name, + :ephemeral_id => service.agent.ephemeral_id, + :snapshot => ::BUILD_INFO["build_snapshot"], + } + end + def host @@host ||= Socket.gethostname end diff --git a/logstash-core/lib/logstash/api/commands/health_report.rb b/logstash-core/lib/logstash/api/commands/health_report.rb new file mode 100644 index 00000000000..d53a313b499 --- /dev/null +++ b/logstash-core/lib/logstash/api/commands/health_report.rb @@ -0,0 +1,31 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "logstash/api/commands/base" + +module LogStash + module Api + module Commands + class HealthReport < Commands::Base + + def all(selected_fields = []) + service.agent.health_observer.report + end + end + end + end +end \ No newline at end of file diff --git a/logstash-core/lib/logstash/api/modules/health_report.rb b/logstash-core/lib/logstash/api/modules/health_report.rb new file mode 100644 index 00000000000..ff5728d94e7 --- /dev/null +++ b/logstash-core/lib/logstash/api/modules/health_report.rb @@ -0,0 +1,49 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +module LogStash + module Api + module Modules + class HealthReport < ::LogStash::Api::Modules::Base + + get "/" do + payload = health_report.all.then do |health_report_pojo| + # The app_helper needs a ruby-hash. + # Manually creating a map of properties works around the issue. + base_metadata.merge({ + status: health_report_pojo.status, + symptom: health_report_pojo.symptom, + indicators: health_report_pojo.indicators, + }) + end + + respond_with(payload, {exclude_default_metadata: true}) + end + + private + + def health_report + @health_report ||= factory.build(:health_report) + end + + def base_metadata + @factory.build(:default_metadata).base_info + end + end + end + end +end \ No newline at end of file diff --git a/logstash-core/lib/logstash/api/rack_app.rb b/logstash-core/lib/logstash/api/rack_app.rb index c14bdf26a23..ee3e409e95e 100644 --- a/logstash-core/lib/logstash/api/rack_app.rb +++ b/logstash-core/lib/logstash/api/rack_app.rb @@ -18,6 +18,7 @@ require "rack" require "sinatra/base" require "logstash/api/modules/base" +require "logstash/api/modules/health_report" require "logstash/api/modules/node" require "logstash/api/modules/node_stats" require "logstash/api/modules/plugins" @@ -123,6 +124,7 @@ def self.app(logger, agent, environment) def self.rack_namespaces(agent) { + "/_health_report" => LogStash::Api::Modules::HealthReport, "/_node" => LogStash::Api::Modules::Node, "/_stats" => LogStash::Api::Modules::Stats, "/_node/stats" => LogStash::Api::Modules::NodeStats, diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index 9cec566ccf0..b30d11e2be7 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -65,6 +65,7 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil) @flushing = java.util.concurrent.atomic.AtomicBoolean.new(false) @flushRequested = java.util.concurrent.atomic.AtomicBoolean.new(false) @shutdownRequested = java.util.concurrent.atomic.AtomicBoolean.new(false) + @crash_detected = Concurrent::AtomicBoolean.new(false) @outputs_registered = Concurrent::AtomicBoolean.new(false) # @finished_execution signals that the pipeline thread has finished its execution @@ -87,6 +88,10 @@ def finished_execution? @finished_execution.true? end + def finished_run? + @finished_run.true? + end + def ready? @ready.value end @@ -229,6 +234,10 @@ def stopped? @running.false? end + def crashed? + @crash_detected.true? + end + # register_plugins calls #register_plugin on the plugins list and upon exception will call Plugin#do_close on all registered plugins # @param plugins [Array[Plugin]] the list of plugins to register def register_plugins(plugins) @@ -305,6 +314,7 @@ def start_workers rescue => e # WorkerLoop.run() catches all Java Exception class and re-throws as IllegalStateException with the # original exception as the cause + @crash_detected.make_true @logger.error( "Pipeline worker error, the pipeline will be stopped", default_logging_keys(:error => e.cause.message, :exception => e.cause.class, :backtrace => e.cause.backtrace) @@ -319,6 +329,7 @@ def start_workers begin start_inputs rescue => e + @crash_detected.make_true # if there is any exception in starting inputs, make sure we shutdown workers. # exception will already by logged in start_inputs shutdown_workers diff --git a/logstash-core/lib/logstash/pipeline_action/create.rb b/logstash-core/lib/logstash/pipeline_action/create.rb index ffd09777733..6f0fff00119 100644 --- a/logstash-core/lib/logstash/pipeline_action/create.rb +++ b/logstash-core/lib/logstash/pipeline_action/create.rb @@ -46,13 +46,21 @@ def execution_priority # The execute assume that the thread safety access of the pipeline # is managed by the caller. def execute(agent, pipelines_registry) + attach_health_indicator(agent) new_pipeline = LogStash::JavaPipeline.new(@pipeline_config, @metric, agent) success = pipelines_registry.create_pipeline(pipeline_id, new_pipeline) do new_pipeline.start # block until the pipeline is correctly started or crashed end + LogStash::ConvergeResult::ActionResult.create(self, success) end + def attach_health_indicator(agent) + health_observer = agent.health_observer + health_observer.detach_pipeline_indicator(pipeline_id) # just in case ... + health_observer.attach_pipeline_indicator(pipeline_id, agent) + end + def to_s "PipelineAction::Create<#{pipeline_id}>" end diff --git a/logstash-core/lib/logstash/pipeline_action/delete.rb b/logstash-core/lib/logstash/pipeline_action/delete.rb index 1a19509ba2f..c072e70bf38 100644 --- a/logstash-core/lib/logstash/pipeline_action/delete.rb +++ b/logstash-core/lib/logstash/pipeline_action/delete.rb @@ -27,10 +27,15 @@ def initialize(pipeline_id) def execute(agent, pipelines_registry) success = pipelines_registry.delete_pipeline(@pipeline_id) + detach_health_indicator(agent) if success LogStash::ConvergeResult::ActionResult.create(self, success) end + def detach_health_indicator(agent) + agent.health_observer.detach_pipeline_indicator(pipeline_id) + end + def to_s "PipelineAction::Delete<#{pipeline_id}>" end diff --git a/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb b/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb index c627087ed42..4c8e6ded037 100644 --- a/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb +++ b/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb @@ -31,10 +31,15 @@ def execute(agent, pipelines_registry) end success = pipelines_registry.delete_pipeline(@pipeline_id) + detach_health_indicator(agent) if success LogStash::ConvergeResult::ActionResult.create(self, success) end + def detach_health_indicator(agent) + agent.health_observer.detach_pipeline_indicator(pipeline_id) + end + def to_s "PipelineAction::StopAndDelete<#{pipeline_id}>" end diff --git a/logstash-core/lib/logstash/pipelines_registry.rb b/logstash-core/lib/logstash/pipelines_registry.rb index 3810201e8bb..3752003477c 100644 --- a/logstash-core/lib/logstash/pipelines_registry.rb +++ b/logstash-core/lib/logstash/pipelines_registry.rb @@ -28,6 +28,7 @@ def initialize(pipeline_id, pipeline) @lock = Monitor.new end + # a terminated pipeline has either crashed OR finished normally def terminated? @lock.synchronize do # a loading pipeline is never considered terminated @@ -35,6 +36,20 @@ def terminated? end end + # a finished pipeline finished _normally_ without exception + def finished? + @lock.synchronize do + # a loading pipeline is never considered terminated + @loading.false? && @pipeline.finished_run? + end + end + + def crashed? + @lock.synchronize do + @pipeline&.crashed? + end + end + def running? @lock.synchronize do # not terminated and not loading @@ -104,6 +119,7 @@ def size end end + def empty? @lock.synchronize do @states.empty? diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index 3e743967273..ec44e020efa 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -410,6 +410,7 @@ def flush(options) # wait until there is no more worker thread since we have a single worker that should have died wait(5).for {subject.worker_threads.any?(&:alive?)}.to be_falsey + expect(subject.crashed?).to be true # at this point the input plugin should have been asked to stop wait(5).for {dummyinput.stop?}.to be_truthy @@ -437,6 +438,7 @@ def flush(options) # wait until there is no more worker thread since we have a single worker that should have died wait(5).for {subject.worker_threads.any?(&:alive?)}.to be_falsey + expect(subject.crashed?).to be true # at this point the input plugin should have been asked to stop wait(5).for {dummyinput.stop?}.to be_truthy @@ -602,6 +604,7 @@ def flush(options) expect(input).to receive(:do_close).once pipeline.start pipeline.shutdown + expect(pipeline.crashed?).to be false end end end diff --git a/logstash-core/spec/logstash/pipeline_action/create_spec.rb b/logstash-core/spec/logstash/pipeline_action/create_spec.rb index 16cabacb086..553576b543a 100644 --- a/logstash-core/spec/logstash/pipeline_action/create_spec.rb +++ b/logstash-core/spec/logstash/pipeline_action/create_spec.rb @@ -30,6 +30,7 @@ before do clear_data_dir + allow(agent).to receive(:health_observer).and_return(double("HealthObserver").as_null_object) end subject { described_class.new(pipeline_config, metric) } @@ -66,6 +67,11 @@ it "returns a successful execution status" do expect(subject.execute(agent, pipelines)).to be_truthy end + + it "attached an indicator to the agent's health observer" do + expect(agent.health_observer).to receive(:attach_pipeline_indicator).with(:main, agent) + subject.execute(agent, pipelines) + end end context "when the pipeline doesn't start" do diff --git a/logstash-core/spec/logstash/pipeline_action/delete_spec.rb b/logstash-core/spec/logstash/pipeline_action/delete_spec.rb new file mode 100644 index 00000000000..73193389ae0 --- /dev/null +++ b/logstash-core/spec/logstash/pipeline_action/delete_spec.rb @@ -0,0 +1,78 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "spec_helper" +require_relative "../../support/helpers" +require_relative "../../support/matchers" +require "logstash/pipelines_registry" +require "logstash/pipeline_action/delete" +require "logstash/inputs/generator" + + +describe LogStash::PipelineAction::Delete do + let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" } + let(:pipeline_id) { :main } + let(:pipeline) { mock_java_pipeline_from_string(pipeline_config) } + let(:pipelines) do + LogStash::PipelinesRegistry.new.tap do |chm| + chm.create_pipeline(pipeline_id, pipeline) { true } + end + end + let(:agent) { double("agent") } + + subject { described_class.new(pipeline_id) } + + before do + clear_data_dir + allow(agent).to receive(:health_observer).and_return(double("HealthObserver").as_null_object) + pipeline.start + end + + after do + pipeline.shutdown + end + + it "returns the pipeline_id" do + expect(subject.pipeline_id).to eq(:main) + end + + context "when the pipeline is still running" do + + it 'fails to delete the pipeline' do + action_result = subject.execute(agent, pipelines) + expect(action_result).to_not be_successful + + expect(pipelines.get_pipeline(pipeline_id)).to_not be_nil + end + end + + context "when the pipeline has completed" do + let(:pipeline_config) { "input { generator { count => 1 } } output { null {} }"} + + before(:each) do + sleep(0.1) until pipelines.non_running_pipelines.keys.include?(pipeline_id) + end + + it 'deletes the pipeline' do + action_result = subject.execute(agent, pipelines) + expect(action_result).to be_successful + + expect(pipelines.get_pipeline(pipeline_id)).to be_nil + expect(agent.health_observer).to have_received(:detach_pipeline_indicator).with(pipeline_id) + end + end +end \ No newline at end of file diff --git a/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb b/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb new file mode 100644 index 00000000000..a32ed5eb0fa --- /dev/null +++ b/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb @@ -0,0 +1,79 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "spec_helper" +require_relative "../../support/helpers" +require_relative "../../support/matchers" +require "logstash/pipelines_registry" +require "logstash/pipeline_action/delete" +require "logstash/inputs/generator" + + +describe LogStash::PipelineAction::StopAndDelete do + let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" } + let(:pipeline_id) { :main } + let(:pipeline) { mock_java_pipeline_from_string(pipeline_config) } + let(:pipelines) do + LogStash::PipelinesRegistry.new.tap do |chm| + chm.create_pipeline(pipeline_id, pipeline) { true } + end + end + let(:agent) { double("agent") } + + subject { described_class.new(pipeline_id) } + + before do + clear_data_dir + allow(agent).to receive(:health_observer).and_return(double("HealthObserver").as_null_object) + pipeline.start + end + + after do + pipeline.shutdown + end + + it "returns the pipeline_id" do + expect(subject.pipeline_id).to eq(:main) + end + + context "when the pipeline is still running" do + it 'stops and deletes the pipeline' do + action_result = subject.execute(agent, pipelines) + expect(action_result).to be_successful + + expect(pipelines.get_pipeline(pipeline_id)).to be_nil + expect(agent.health_observer).to have_received(:detach_pipeline_indicator).with(pipeline_id) + end + end + + context "when the pipeline has completed" do + let(:pipeline_config) { "input { generator { count => 1 } } output { null {} }"} + + before(:each) do + sleep(0.1) until pipelines.non_running_pipelines.keys.include?(pipeline_id) + end + + it 'deletes the pipeline' do + action_result = subject.execute(agent, pipelines) + expect(action_result).to be_successful + + expect(pipelines.get_pipeline(pipeline_id)).to be_nil + + expect(agent.health_observer).to have_received(:detach_pipeline_indicator).with(pipeline_id) + end + end +end \ No newline at end of file diff --git a/logstash-core/src/main/java/org/logstash/Logstash.java b/logstash-core/src/main/java/org/logstash/Logstash.java index eb9c823f397..04ce1b1820f 100644 --- a/logstash-core/src/main/java/org/logstash/Logstash.java +++ b/logstash-core/src/main/java/org/logstash/Logstash.java @@ -26,6 +26,7 @@ import java.io.PrintStream; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Properties; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,6 +46,25 @@ */ public final class Logstash implements Runnable, AutoCloseable { + public static final String VERSION_FULL; + public static final String VERSION_MAJOR; + public static final String VERSION_MINOR; + public static final String VERSION_PATCH; + + static { + final Properties properties = new Properties(); + try { + properties.load(Logstash.class.getResourceAsStream("/version-info.properties")); + VERSION_FULL = properties.getProperty("logstash-core"); + final String[] versions = VERSION_FULL.split("\\."); + VERSION_MAJOR = versions[0]; + VERSION_MINOR = versions[1]; + VERSION_PATCH = versions[2]; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private static final Logger LOGGER = LogManager.getLogger(Logstash.class); /** diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java index e905d922c5b..dbbfb97de5b 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java @@ -22,6 +22,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; + import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.RubyNumeric; diff --git a/logstash-core/src/main/java/org/logstash/health/ApiHealthReport.java b/logstash-core/src/main/java/org/logstash/health/ApiHealthReport.java new file mode 100644 index 00000000000..d2d1cbf6889 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/ApiHealthReport.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.Map; + +@JsonSerialize(using = ApiHealthReport.JsonSerializer.class) +public class ApiHealthReport { + private final MultiIndicator.Report delegate; + + public ApiHealthReport(final MultiIndicator.Report delegate) { + this.delegate = delegate; + } + + public Status getStatus() { + return delegate.status(); + } + + public String getSymptom() { + return delegate.symptom(); + } + + public Map getIndicators() { + return delegate.indicators(); + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(final ApiHealthReport apiHealthReport, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField("status", apiHealthReport.getStatus()); + jsonGenerator.writeObjectField("symptom", apiHealthReport.getSymptom()); + jsonGenerator.writeObjectField("indicators", apiHealthReport.getIndicators()); + jsonGenerator.writeEndObject(); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Diagnosis.java b/logstash-core/src/main/java/org/logstash/health/Diagnosis.java new file mode 100644 index 00000000000..7e64f555638 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Diagnosis.java @@ -0,0 +1,113 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.UnaryOperator; + +@JsonSerialize(using = Diagnosis.JsonSerializer.class) +public final class Diagnosis { + public final String id; + public final String cause; + public final String action; + public final String helpUrl; + + private Diagnosis(final Builder builder) { + this.id = builder.id; + this.cause = builder.cause; + this.action = builder.action; + this.helpUrl = builder.helpUrl; + } + + static Builder builder() { + return new Builder(); + } + + public static class Builder { + private final String id; + private final String cause; + private final String action; + private final String helpUrl; + + public Builder() { + this(null, null, null, null); + } + + Builder(final String id, + final String cause, + final String action, + final String helpUrl) { + this.id = id; + this.cause = cause; + this.action = action; + this.helpUrl = helpUrl; + } + + public Builder withId(final String id) { + if (Objects.equals(id, this.id)) { + return this; + } + return new Builder(id, cause, action, helpUrl); + } + + public Builder withCause(final String cause) { + if (Objects.equals(cause, this.cause)) { + return this; + } + return new Builder(id, cause, action, helpUrl); + } + public Builder withAction(final String action) { + if (Objects.equals(action, this.action)) { + return this; + } + return new Builder(id, cause, action, helpUrl); + } + public Builder withHelpUrl(final String helpUrl) { + if (Objects.equals(helpUrl, this.helpUrl)) { + return this; + } + return new Builder(id, cause, action, helpUrl); + } + public Builder transform(final UnaryOperator configurator) { + return configurator.apply(this); + } + public synchronized Diagnosis build() { + return new Diagnosis(this); + } + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(Diagnosis diagnosis, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + if (diagnosis.id != null) { + jsonGenerator.writeStringField("id", diagnosis.id); + } + jsonGenerator.writeStringField("cause", diagnosis.cause); + jsonGenerator.writeStringField("action", diagnosis.action); + jsonGenerator.writeStringField("help_url", diagnosis.helpUrl); + jsonGenerator.writeEndObject(); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/HealthObserver.java b/logstash-core/src/main/java/org/logstash/health/HealthObserver.java index 134fd51e1e7..63a461a74e6 100644 --- a/logstash-core/src/main/java/org/logstash/health/HealthObserver.java +++ b/logstash-core/src/main/java/org/logstash/health/HealthObserver.java @@ -18,22 +18,47 @@ */ package org.logstash.health; -import com.google.common.collect.Iterables; - -import java.util.EnumSet; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class HealthObserver { + + private static final Logger LOGGER = LogManager.getLogger(); + + private final MultiIndicator rootIndicator = new MultiIndicator(); + private final MultiIndicator pipelinesIndicator = new MultiIndicator(); + + public HealthObserver() { + this.rootIndicator.attachIndicator("pipelines", this.pipelinesIndicator); + } + public final Status getStatus() { - // INTERNAL-ONLY Proof-of-concept to show flow-through to API results - switch (System.getProperty("logstash.apiStatus", "green")) { - case "green": return Status.GREEN; - case "yellow": return Status.YELLOW; - case "red": return Status.RED; - case "random": - final EnumSet statuses = EnumSet.allOf(Status.class); - return Iterables.get(statuses, new java.util.Random().nextInt(statuses.size())); - default: - return Status.UNKNOWN; + return getReport().getStatus(); + } + + public MultiIndicator getIndicator() { + return this.rootIndicator; + } + + public ApiHealthReport getReport() { + return new ApiHealthReport(this.rootIndicator.report()); + } + + public void attachPipelineIndicator(final String pipelineId, final PipelineIndicator.PipelineDetailsProvider detailsProvider) { + try { + this.pipelinesIndicator.attachIndicator(pipelineId, PipelineIndicator.forPipeline(pipelineId, detailsProvider)); + LOGGER.debug(String.format("attached pipeline indicator [%s]", pipelineId)); + } catch (final Exception e) { + LOGGER.warn(String.format("failed to attach pipeline indicator [%s]", pipelineId), e); + } + } + + public void detachPipelineIndicator(final String pipelineId) { + try { + this.pipelinesIndicator.detachIndicator(pipelineId, null); + LOGGER.debug(String.format("detached pipeline indicator [%s]", pipelineId)); + } catch (final Exception e) { + LOGGER.warn(String.format("failed to detach pipeline indicator [%s]", pipelineId), e); } } } diff --git a/logstash-core/src/main/java/org/logstash/health/HelpUrl.java b/logstash-core/src/main/java/org/logstash/health/HelpUrl.java new file mode 100644 index 00000000000..db2586765d1 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/HelpUrl.java @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import org.logstash.Logstash; + +import java.util.Objects; + +public class HelpUrl { + static final String BASE_URL; + static { + final String versionAnchor; + if (Integer.parseInt(Logstash.VERSION_MAJOR) >= 9) { + versionAnchor = "master"; + } else { + versionAnchor = String.format("%s.%s", Logstash.VERSION_MAJOR, Logstash.VERSION_MINOR); + } + BASE_URL = String.format("https://www.elastic.co/guide/en/logstash/%s/", versionAnchor); + } + + public HelpUrl(final String page) { + this(page, null); + } + + public HelpUrl withAnchor(final String anchor) { + return new HelpUrl(this.page, anchor); + } + + private HelpUrl(final String page, final String anchor) { + Objects.requireNonNull(page, "page cannot be null"); + this.page = page; + this.anchor = anchor; + } + + private final String page; + private final String anchor; + + private transient String resolved; + + @Override + public String toString() { + if (resolved == null) { + final StringBuilder sb = new StringBuilder(BASE_URL); + sb.append(page).append(".html"); + if (anchor != null) { + sb.append("#").append(anchor); + } + resolved = sb.toString(); + } + return resolved; + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Impact.java b/logstash-core/src/main/java/org/logstash/health/Impact.java new file mode 100644 index 00000000000..3ccda5de2af --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Impact.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.*; +import java.util.function.UnaryOperator; + +@JsonSerialize(using=Impact.JsonSerializer.class) +public final class Impact { + public final String id; + public final int severity; + public final String description; + public final Set impactAreas; + + public Impact(final Builder builder) { + this.id = builder.id; + this.severity = Objects.requireNonNullElse(builder.severity, 0); + this.description = builder.description; + this.impactAreas = Set.copyOf(builder.impactAreas); + } + + static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String id; + private Integer severity; + private String description; + private Set impactAreas; + + public Builder() { + this.impactAreas = Set.of(); + } + + private Builder(String id, Integer severity, String description, Set impactAreas) { + this.id = id; + this.severity = severity; + this.description = description; + this.impactAreas = Set.copyOf(impactAreas); + } + + public synchronized Builder withId(final String id) { + return new Builder(id, severity, description, impactAreas); + } + + public synchronized Builder withSeverity(int severity) { + return new Builder(id, severity, description, impactAreas); + } + + public synchronized Builder withDescription(String description) { + return new Builder(id, severity, description, impactAreas); + } + + public synchronized Builder withAdditionalImpactArea(ImpactArea impactArea) { + final Set mergedImpactAreas = new HashSet<>(impactAreas); + if (!mergedImpactAreas.add(impactArea)) { + return this; + } else { + return this.withImpactAreas(mergedImpactAreas); + } + } + + public synchronized Builder withImpactAreas(Collection impactAreas) { + return new Builder(id, severity, description, Set.copyOf(impactAreas)); + } + + public synchronized Builder transform(final UnaryOperator configurator) { + return configurator.apply(this); + } + + public synchronized Impact build() { + return new Impact(this); + } + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(Impact impact, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + if (impact.id != null) { + jsonGenerator.writeStringField("id", impact.id); + } + jsonGenerator.writeNumberField("severity", impact.severity); + jsonGenerator.writeStringField("description", impact.description); + jsonGenerator.writeObjectField("impact_areas", impact.impactAreas); + jsonGenerator.writeEndObject(); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/ImpactArea.java b/logstash-core/src/main/java/org/logstash/health/ImpactArea.java new file mode 100644 index 00000000000..94679c79ad9 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/ImpactArea.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.annotation.JsonValue; + +import java.util.Objects; + +public enum ImpactArea { + PIPELINE_EXECUTION, + ; + + private final String externalValue; + + ImpactArea(final String externalValue) { + this.externalValue = Objects.requireNonNullElseGet(externalValue, () -> name().toLowerCase()); + } + + ImpactArea() { + this(null); + } + + @JsonValue + public String externalValue() { + return this.externalValue; + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Indicator.java b/logstash-core/src/main/java/org/logstash/health/Indicator.java new file mode 100644 index 00000000000..df646e13584 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Indicator.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +public interface Indicator { + REPORT report(ReportContext reportContext); + + default REPORT report() { + return report(ReportContext.EMPTY); + } + + interface Report { + Status status(); + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/MultiIndicator.java b/logstash-core/src/main/java/org/logstash/health/MultiIndicator.java new file mode 100644 index 00000000000..d0699f7687a --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/MultiIndicator.java @@ -0,0 +1,164 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A {@code MultiIndicator} is an {@link Indicator} that combines multiple sub-{@link Indicator}s and produces a + * summarized {@link Report}. + */ +public class MultiIndicator implements Indicator { + private static final Logger LOGGER = LogManager.getLogger(); + + private final Map> indicators = new ConcurrentHashMap<>(); + + public void attachIndicator(final String name, + final Indicator indicatorToAttach) { + final Indicator existing = indicators.putIfAbsent(name, indicatorToAttach); + if (Objects.nonNull(existing) && !Objects.equals(existing, indicatorToAttach)) { + throw new IllegalArgumentException(String.format("Cannot attach indicator %s (%s) because a different one of the same name is already attached (%s).", name, indicatorToAttach, existing)); + } + LOGGER.debug("attached indicator {}=>{} (res:{})", name, indicatorToAttach, this); + } + + public void detachIndicator(final String name, + final Indicator indicatorToDetach) { + final Indicator remaining = indicators.computeIfPresent(name, (key, existing) -> Objects.isNull(indicatorToDetach) || Objects.equals(indicatorToDetach, existing) ? null : existing); + if (Objects.nonNull(remaining)) { + throw new IllegalArgumentException("Cannot detach indicator " + name + " because a different one of the same name is attached."); + } + LOGGER.debug("detached indicator {}<={} (res:{})", name, indicatorToDetach, this); + } + + public > Optional getIndicator(final String name, + final Class indicatorClass) { + return getIndicator(name).map(indicatorClass::cast); + } + + public Optional> getIndicator(final String name) { + return Optional.ofNullable(indicators.get(name)); + } + + @Override + public Report report(final ReportContext reportContext) { + LOGGER.debug("report starting with indicators {} for {}", this.indicators, reportContext); + final Status.Holder combinedStatus = new Status.Holder(); + + final Map reports = new HashMap<>(); + final Map> indicatorNamesByStatus = new HashMap<>(); + + this.indicators.forEach((indicatorName, indicator) -> { + if (reportContext.isMuted(indicatorName)) { + LOGGER.trace("sub-indicator {} is muted for {}", indicatorName, reportContext); + } else { + reportContext.descend(indicatorName, (scopedContext) -> { + final Indicator.Report report = indicator.report(scopedContext); + + combinedStatus.reduce(report.status()); + reports.put(indicatorName, report); + indicatorNamesByStatus.computeIfAbsent(report.status(), k -> new HashSet<>()).add(indicatorName); + }); + } + }); + + final StringBuilder symptom = new StringBuilder(); + // to highlight indicators by most-degraded status, we summarize in reverse-order + final List summaryByStatus = new ArrayList<>(indicatorNamesByStatus.size()); + for (int i = Status.values().length - 1; i >= 0; i--) { + final Status summarizingStatus = Status.values()[i]; + if (indicatorNamesByStatus.containsKey(summarizingStatus)) { + final Set indicatorNames = indicatorNamesByStatus.get(summarizingStatus); + summaryByStatus.add(String.format("%s "+(indicatorNames.size()==1 ? "indicator is" : "indicators are")+" %s (`%s`)", + indicatorNames.size(), + summarizingStatus.descriptiveValue(), + String.join("`, `", indicatorNames))); + } + } + if (summaryByStatus.isEmpty()) { + symptom.append("no indicators"); + } else if (summaryByStatus.size() == 1) { + symptom.append(summaryByStatus.get(0)); + } else if (summaryByStatus.size() == 2) { + symptom.append(summaryByStatus.get(0)).append(" and ").append(summaryByStatus.get(1)); + } else { + final int lastIndex = summaryByStatus.size() - 1; + symptom.append(String.join(", ", summaryByStatus.subList(0, lastIndex))) + .append(", and ").append(summaryByStatus.get(lastIndex)); + } + + return new Report(combinedStatus.value(), symptom.toString(), reports); + } + + @Override + public String toString() { + return "MultiIndicator{" + + "indicators=" + indicators + + '}'; + } + + @JsonSerialize(using=Report.JsonSerializer.class) + public static class Report implements Indicator.Report { + private final Status status; + private final String symptom; + private final Map indicators; + + Report(final Status status, + final String symptom, + final Map indicators) { + this.status = status; + this.symptom = symptom; + this.indicators = Map.copyOf(indicators); + } + + @Override + public Status status() { + return this.status; + } + + public String symptom() { + return this.symptom; + } + + public Map indicators() { + return this.indicators; + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(final Report report, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField("status", report.status()); + jsonGenerator.writeStringField("symptom", report.symptom); + jsonGenerator.writeObjectField("indicators", report.indicators()); + jsonGenerator.writeEndObject(); + } + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java b/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java new file mode 100644 index 00000000000..3f6821fec4f --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java @@ -0,0 +1,195 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.Objects; + +import static org.logstash.health.Status.*; + +/** + * A {@code PipelineIndicator} is a specialized {@link ProbeIndicator} that is meant for assessing the health of + * an individual pipeline. + */ +public class PipelineIndicator extends ProbeIndicator { + + public static PipelineIndicator forPipeline(final String pipelineId, + final PipelineDetailsProvider pipelineDetailsProvider) { + PipelineIndicator pipelineIndicator = new PipelineIndicator(new DetailsSupplier(pipelineId, pipelineDetailsProvider)); + pipelineIndicator.attachProbe("status", new StatusProbe()); + return pipelineIndicator; + } + + private PipelineIndicator(final DetailsSupplier detailsSupplier) { + super("pipeline", detailsSupplier::get); + } + + @JsonSerialize(using = Status.JsonSerializer.class) + public static class Status { + public enum State { + UNKNOWN, + LOADING, + RUNNING, + FINISHED, + TERMINATED, + } + + public static final Status UNKNOWN = new Status(State.UNKNOWN); + public static final Status LOADING = new Status(State.LOADING); + public static final Status RUNNING = new Status(State.RUNNING); + public static final Status FINISHED = new Status(State.FINISHED); + public static final Status TERMINATED = new Status(State.TERMINATED); + + private final State state; + public Status(final State state) { + this.state = state; + } + public State getState() { + return state; + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(Status value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + gen.writeStartObject(); + gen.writeStringField("state", value.getState().toString()); + gen.writeEndObject(); + } + } + } + + @JsonSerialize(using = Details.JsonSerializer.class) + public static class Details implements Observation { + private final Status status; + + public Details(final Status status) { + this.status = Objects.requireNonNull(status, "status cannot be null"); + } + public Status getStatus() { + return this.status; + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer
{ + @Override + public void serialize(final Details details, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField("status", details.getStatus()); + jsonGenerator.writeEndObject(); + } + } + } + + /** + * This interface is implemented by the ruby-Agent + */ + @FunctionalInterface + public interface PipelineDetailsProvider { + Details pipelineDetails(final String pipelineId); + } + + public static class DetailsSupplier { + private final String pipelineId; + private final PipelineDetailsProvider pipelineDetailsProvider; + DetailsSupplier(final String pipelineId, + final PipelineDetailsProvider pipelineDetailsProvider) { + this.pipelineId = pipelineId; + this.pipelineDetailsProvider = pipelineDetailsProvider; + } + + public Details get() { + return this.pipelineDetailsProvider.pipelineDetails(pipelineId); + } + } + + static class StatusProbe implements Probe
{ + static final Impact.Builder NOT_PROCESSING = Impact.builder() + .withId(impactId("not_processing")) + .withDescription("the pipeline is not currently processing") + .withAdditionalImpactArea(ImpactArea.PIPELINE_EXECUTION); + + static final HelpUrl HELP_URL = new HelpUrl("health-report-pipeline-status"); + + @Override + public Analysis analyze(final Details details) { + switch (details.getStatus().getState()) { + case LOADING: + return Analysis.builder() + .withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("loading")) + .withCause("pipeline is loading") + .withAction("if pipeline does not come up quickly, you may need to check the logs to see if it is stalled") + .withHelpUrl(HELP_URL.withAnchor("loading").toString())) + .withImpact(NOT_PROCESSING.withSeverity(1).withDescription("pipeline is loading").build()) + .build(); + case RUNNING: + return Analysis.builder() + .withStatus(GREEN) + .build(); + case FINISHED: + return Analysis.builder() + .withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("finished")) + .withCause("pipeline has finished running because its inputs have been closed and events have been processed") + .withAction("if you expect this pipeline to run indefinitely, you will need to configure its inputs to continue receiving or fetching events") + .withHelpUrl(HELP_URL.withAnchor("finished").toString())) + .withImpact(NOT_PROCESSING.withSeverity(10).withDescription("pipeline has finished running").build()) + .build(); + case TERMINATED: + return Analysis.builder() + .withStatus(RED) + .withDiagnosis(db -> db + .withId(diagnosisId("terminated")) + .withCause("pipeline is not running, likely because it has encountered an error") + .withAction("view logs to determine the cause of abnormal pipeline shutdown") + .withHelpUrl(HELP_URL.withAnchor("terminated").toString())) + .withImpact(NOT_PROCESSING.withSeverity(1).build()) + .build(); + case UNKNOWN: + default: + return Analysis.builder() + .withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("unknown")) + .withCause("pipeline is not known; it may have been recently deleted or failed to start") + .withAction("view logs to determine if the pipeline failed to start") + .withHelpUrl(HELP_URL.withAnchor("unknown").toString())) + .withImpact(NOT_PROCESSING.withSeverity(2).build()) + .build(); + } + } + + static String diagnosisId(final String state) { + return String.format("logstash:health:pipeline:status:diagnosis:%s", state); + } + + static String impactId(final String state) { + return String.format("logstash:health:pipeline:status:impact:%s", state); + } + + + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Probe.java b/logstash-core/src/main/java/org/logstash/health/Probe.java new file mode 100644 index 00000000000..ec9cf71d2b4 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Probe.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import java.util.Objects; +import java.util.function.UnaryOperator; + +public interface Probe { + Analysis analyze(OBSERVATION observation); + + final class Analysis { + public final Status status; + public final Diagnosis diagnosis; + public final Impact impact; + + Analysis(final Builder builder) { + this.status = builder.status; + this.diagnosis = builder.diagnosis; + this.impact = builder.impact; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private final Status status; + private final Diagnosis diagnosis; + private final Impact impact; + + public Builder() { + this(Status.UNKNOWN, null, null); + } + + public Builder(final Status status, + final Diagnosis diagnosis, + final Impact impact) { + this.status = status; + this.diagnosis = diagnosis; + this.impact = impact; + } + + public Builder withStatus(final Status status) { + if (Objects.equals(this.status, status)) { + return this; + } + return new Builder(status, this.diagnosis, this.impact); + } + + public Builder withDiagnosis(final Diagnosis diagnosis) { + if (Objects.equals(this.diagnosis, diagnosis)) { + return this; + } + return new Builder(status, diagnosis, impact); + } + + public Builder withDiagnosis(final UnaryOperator diagnosisConfigurator) { + return this.withDiagnosis(Diagnosis.builder().transform(diagnosisConfigurator).build()); + } + + public synchronized Builder withImpact(final Impact impact) { + if (Objects.equals(this.impact, impact)) { + return this; + } + return new Builder(status, this.diagnosis, impact); + } + + public Builder withImpact(final UnaryOperator impactConfigurator) { + return this.withImpact(Impact.builder().transform(impactConfigurator).build()); + } + + public synchronized Analysis build() { + return new Analysis(this); + } + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/ProbeIndicator.java b/logstash-core/src/main/java/org/logstash/health/ProbeIndicator.java new file mode 100644 index 00000000000..ddb4fce6c4b --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/ProbeIndicator.java @@ -0,0 +1,205 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * A {@code ProbeIndicator} is an {@link Indicator} that has one or more {@link Probe}s attached, and can be used + * to produce a {@link Report}. + */ +public class ProbeIndicator implements Indicator> { + private static final Logger LOGGER = LogManager.getLogger(); + + // Marker Interface + public interface Observation {} + + @FunctionalInterface + public interface Observer extends Supplier {} + + private final String subject; + private final Observer observer; + + private final Map> probes = new ConcurrentHashMap<>(); + + public ProbeIndicator(final String subject, + final Observer observer, + final Map> probes) { + this(subject, observer); + probes.forEach(this::attachProbe); + } + + public ProbeIndicator(final String subject, + final Observer observer) { + this.subject = subject; + this.observer = observer; + } + + public final void attachProbe(final String name, + final Probe probeToAttach) { + final Probe existing = probes.putIfAbsent(name, probeToAttach); + if (Objects.nonNull(existing) && !Objects.equals(existing, probeToAttach)) { + throw new IllegalArgumentException("Cannot attach probe " + name + " because a different one of the same name is already attached."); + } + } + + public final void detachProbe(final String name, + final Probe probeToDetach) { + Probe remaining = probes.computeIfPresent(name, (key, existing) -> Objects.equals(probeToDetach, existing) ? null : existing); + if (Objects.nonNull(remaining)) { + throw new IllegalArgumentException("Cannot detach probe " + name + " because a different one of the same name is attached."); + } + } + + public final void detachProbe(final String name) { + probes.remove(name); + } + + Probe getProbe(final String name) { + return probes.get(name); + } + + @Override + public Report report(final ReportContext reportContext) { + + LOGGER.debug("report starting with {} probes {}", this.probes.keySet(), reportContext); + final OBSERVATION observation = observer.get(); + + final Status.Holder combinedStatus = new Status.Holder(); + final List diagnoses = new ArrayList<>(); + final List impacts = new ArrayList<>(); + final Set distinctImpactAreas = new HashSet<>(); + + for (Map.Entry> probeEntry : this.probes.entrySet()) { + final String probeName = probeEntry.getKey(); + final Probe.Analysis probeAnalysis = probeEntry.getValue().analyze(observation); + LOGGER.trace("probe {}: {}", probeName, probeAnalysis); + + if (reportContext.isMuted(probeName)) { + LOGGER.trace("probe {} is muted", probeName); + } else { + combinedStatus.reduce(probeAnalysis.status); + Optional.ofNullable(probeAnalysis.diagnosis) + .ifPresent(diagnoses::add); + Optional.ofNullable(probeAnalysis.impact) + .filter(impacts::add) + .map(impact -> impact.impactAreas) + .ifPresent(distinctImpactAreas::addAll); + } + } + + final Status status = combinedStatus.value(); + final StringBuilder symptomBuilder = new StringBuilder(); + symptomBuilder.append(String.format("The %s is %s", this.subject, status.descriptiveValue())); + if (distinctImpactAreas.size() + diagnoses.size() > 0) { + symptomBuilder.append("; ") + .append(String.format(distinctImpactAreas.size() == 1 ? "%s area is impacted" : "%s areas are impacted", distinctImpactAreas.size())) + .append(" and ") + .append(String.format(diagnoses.size() == 1 ? "%s diagnosis is available" : "%s diagnoses are available", diagnoses.size())); + } + final String symptom = symptomBuilder.toString(); + + return new Report<>(status, observation, symptom, diagnoses, impacts); + } + + @Override + public String toString() { + return "ProbeIndicator{" + + "observer=" + observer + + ", probes=" + probes + + '}'; + } + + @JsonSerialize(using=Report.JsonSerializer.class) + public static class Report
implements Indicator.Report { + private final Status status; + private final DETAILS details; + private final String symptom; + private final List diagnosis; + + private final List impacts; + + public Report(final Status status, + final DETAILS details, + final String symptom, + final List diagnosis, + final List impacts) { + this.status = status; + this.details = details; + this.symptom = symptom; + this.diagnosis = List.copyOf(diagnosis); + this.impacts = List.copyOf(impacts); + } + public Status status() { + return status; + } + public DETAILS details() { + return details; + } + public String symptom() { + return symptom; + } + public List diagnosis() { + return diagnosis; + } + public List impacts() { + return impacts; + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer> { + @Override + public void serialize(final Report report, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + + jsonGenerator.writeObjectField("status", report.status); + jsonGenerator.writeStringField("symptom", report.symptom); + + if (Objects.nonNull(report.diagnosis) && !report.diagnosis.isEmpty()) { + jsonGenerator.writeObjectField("diagnosis", report.diagnosis); + } + + if (Objects.nonNull(report.impacts) && !report.impacts.isEmpty()) { + jsonGenerator.writeObjectField("impacts", report.impacts); + } + + jsonGenerator.writeObjectField("details", report.details); + + jsonGenerator.writeEndObject(); + } + } + + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/ReportContext.java b/logstash-core/src/main/java/org/logstash/health/ReportContext.java new file mode 100644 index 00000000000..9dc2970fcd4 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/ReportContext.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +/** + * A {@code ReportContext} is used when building an {@link Indicator.Report} to provide contextual configuration + * for a specific {@link Indicator} that is being reported on. + */ +public class ReportContext { + private final List path; + + public static final ReportContext EMPTY = new ReportContext(List.of()); + + ReportContext(final List path) { + this.path = List.copyOf(path); + } + + public ReportContext descend(final String node) { + final ArrayList newPath = new ArrayList<>(path); + newPath.add(node); + return new ReportContext(newPath); + } + + public void descend(final String node, final Consumer consumer) { + consumer.accept(this.descend(node)); + } + + public boolean isMuted(final String childNodeName) { + return false; + } + + @Override + public String toString() { + return "ReportContext{" + + "path=" + path + + '}'; + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Status.java b/logstash-core/src/main/java/org/logstash/health/Status.java index 80bedc48531..ba0ce68a3fd 100644 --- a/logstash-core/src/main/java/org/logstash/health/Status.java +++ b/logstash-core/src/main/java/org/logstash/health/Status.java @@ -21,19 +21,26 @@ import com.fasterxml.jackson.annotation.JsonValue; public enum Status { - UNKNOWN, - GREEN, - YELLOW, - RED, + GREEN("healthy"), + UNKNOWN("unknown"), + YELLOW("concerning"), + RED("unhealthy"), ; private final String externalValue = name().toLowerCase(); + private final String descriptiveValue; + + Status(String descriptiveValue) { + this.descriptiveValue = descriptiveValue; + } @JsonValue public String externalValue() { return externalValue; } + public String descriptiveValue() { return descriptiveValue; } + /** * Combine this status with another status. * This method is commutative. @@ -47,4 +54,14 @@ public Status reduce(Status status) { return status; } } + + public static class Holder { + private Status status = Status.GREEN; + public synchronized Status reduce(Status status) { + return this.status = this.status.reduce(status); + } + public synchronized Status value() { + return this.status; + } + } } diff --git a/logstash-core/src/test/java/org/logstash/health/ProbeIndicatorTest.java b/logstash-core/src/test/java/org/logstash/health/ProbeIndicatorTest.java new file mode 100644 index 00000000000..0ec8601112a --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/health/ProbeIndicatorTest.java @@ -0,0 +1,128 @@ +package org.logstash.health; + +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class ProbeIndicatorTest { + + @Test + public void attachProbeWhenNotExists() throws Exception { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + + final Probe probeToAdd = new ProbeImplementation(); + probeIndicator.attachProbe("new", probeToAdd); + + assertThat(probeIndicator.getProbe("new"), is(probeToAdd)); + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + } + + @Test + public void attachProbeWhenExists() throws Exception { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + + final Probe probeToAdd = new ProbeImplementation(); + assertThrows(IllegalArgumentException.class, () -> probeIndicator.attachProbe("existing", probeToAdd)); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + } + + @Test + public void attachProbeWhenAttached() throws Exception { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + + // attach it again + probeIndicator.attachProbe("existing", existingProbe); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + } + + + @Test + public void detachProbeByNameWhenAttached() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + final Probe existingProbeToRemove = new ProbeImplementation(); + probeIndicator.attachProbe("to_remove", existingProbeToRemove); + + probeIndicator.detachProbe("to_remove"); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void detachProbeByNameWhenDetached() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + + probeIndicator.detachProbe("to_remove"); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void detachProbeByValueWhenAttached() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + final Probe existingProbeToRemove = new ProbeImplementation(); + probeIndicator.attachProbe("to_remove", existingProbeToRemove); + + probeIndicator.detachProbe("to_remove", existingProbeToRemove); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void detachProbeByValueWhenDetached() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + final Probe existingProbeToRemove = new ProbeImplementation(); + + probeIndicator.detachProbe("to_remove", existingProbeToRemove); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void detachProbeByValueWhenConflict() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + final Probe anotherProbeToRemove = new ProbeImplementation(); + + assertThrows(IllegalArgumentException.class, () -> probeIndicator.detachProbe("existing", anotherProbeToRemove)); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void report() { + } + + private static class ProbeSubject implements ProbeIndicator.Observation {} + + private static class ProbeImplementation implements Probe { + @Override + public Analysis analyze(ProbeSubject observation) { + return Analysis.builder().build(); + } + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/health/StatusTest.java b/logstash-core/src/test/java/org/logstash/health/StatusTest.java index 86e84107c62..32eca41d593 100644 --- a/logstash-core/src/test/java/org/logstash/health/StatusTest.java +++ b/logstash-core/src/test/java/org/logstash/health/StatusTest.java @@ -28,14 +28,14 @@ public static class Tests { @Test public void testReduceUnknown() { assertThat(UNKNOWN.reduce(UNKNOWN), is(UNKNOWN)); - assertThat(UNKNOWN.reduce(GREEN), is(GREEN)); + assertThat(UNKNOWN.reduce(GREEN), is(UNKNOWN)); assertThat(UNKNOWN.reduce(YELLOW), is(YELLOW)); assertThat(UNKNOWN.reduce(RED), is(RED)); } @Test public void testReduceGreen() { - assertThat(GREEN.reduce(UNKNOWN), is(GREEN)); + assertThat(GREEN.reduce(UNKNOWN), is(UNKNOWN)); assertThat(GREEN.reduce(GREEN), is(GREEN)); assertThat(GREEN.reduce(YELLOW), is(YELLOW)); assertThat(GREEN.reduce(RED), is(RED));