diff --git a/.buildkite/scripts/health-report-tests/README.md b/.buildkite/scripts/health-report-tests/README.md new file mode 100644 index 00000000000..99bd198a823 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/README.md @@ -0,0 +1,18 @@ +## Description +This package for integration tests of the Health Report API. +Export `LS_BRANCH` to run on a specific branch. By default, it uses the main branch. + +## How to run the Health Report Integration test? +### Prerequisites +Make sure you have python installed. Install the integration test dependencies with the following command: +```shell +python3 -mpip install -r .buildkite/scripts/health-report-tests/requirements.txt +``` + +### Run the integration tests +```shell +python3 .buildkite/scripts/health-report-tests/main.py +``` + +### Troubleshooting +- If you get `WARNING: pip is configured with locations that require TLS/SSL,...` warning message, make sure you have python >=3.12.4 installed. \ No newline at end of file diff --git a/.buildkite/scripts/health-report-tests/__init__.py b/.buildkite/scripts/health-report-tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/.buildkite/scripts/health-report-tests/bootstrap.py b/.buildkite/scripts/health-report-tests/bootstrap.py new file mode 100644 index 00000000000..180592b4e53 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/bootstrap.py @@ -0,0 +1,101 @@ +""" +Health Report Integration test bootstrapper with Python script + - A script to resolve Logstash version if not provided + - Download LS docker image and spin up + - When tests finished, teardown the Logstash +""" +import os +import subprocess +import util +import yaml + + +class Bootstrap: + ELASTIC_STACK_VERSIONS_URL = "https://artifacts-api.elastic.co/v1/versions" + + def __init__(self) -> None: + f""" + A constructor of the {Bootstrap}. + Returns: + Resolves Logstash branch considering provided LS_BRANCH + Checks out git branch + """ + logstash_branch = os.environ.get("LS_BRANCH") + if logstash_branch is None: + # version is not specified, use the main branch, no need to git checkout + print(f"LS_BRANCH is not specified, using main branch.") + else: + # LS_BRANCH accepts major latest as a major.x or specific branch as X.Y + if logstash_branch.find(".x") == -1: + print(f"Using specified branch: {logstash_branch}") + util.git_check_out_branch(logstash_branch) + else: + major_version = logstash_branch.split(".")[0] + if major_version and major_version.isnumeric(): + resolved_version = self.__resolve_latest_stack_version_for(major_version) + minor_version = resolved_version.split(".")[1] + branch = major_version + "." + minor_version + print(f"Using resolved branch: {branch}") + util.git_check_out_branch(branch) + else: + raise ValueError(f"Invalid value set to LS_BRANCH. Please set it properly (ex: 8.x or 9.0) and " + f"rerun again") + + def __resolve_latest_stack_version_for(self, major_version: str) -> str: + resolved_version = "" + response = util.call_url_with_retry(self.ELASTIC_STACK_VERSIONS_URL) + release_versions = response.json()["versions"] + for release_version in reversed(release_versions): + if release_version.find("SNAPSHOT") > 0: + continue + if release_version.split(".")[0] == major_version: + print(f"Resolved latest version for {major_version} is {release_version}.") + resolved_version = release_version + break + + if resolved_version == "": + raise ValueError(f"Cannot resolve latest version for {major_version} major") + return resolved_version + + def install_plugin(self, plugin_path: str) -> None: + util.run_or_raise_error( + ["bin/logstash-plugin", "install", plugin_path], + f"Failed to install {plugin_path}") + + def build_logstash(self): + print(f"Building Logstash.") + util.run_or_raise_error( + ["./gradlew", "clean", "bootstrap", "assemble", "installDefaultGems"], + "Failed to build Logstash") + print(f"Logstash has successfully built.") + + def apply_config(self, config: dict) -> None: + with open(os.getcwd() + "/.buildkite/scripts/health-report-tests/config/pipelines.yml", 'w') as pipelines_file: + yaml.dump(config, pipelines_file) + + def run_logstash(self, full_start_required: bool) -> subprocess.Popen: + # --config.reload.automatic is to make instance active + # it is helpful when testing crash pipeline cases + config_path = os.getcwd() + "/.buildkite/scripts/health-report-tests/config" + process = subprocess.Popen(["bin/logstash", "--config.reload.automatic", "--path.settings", config_path, + "-w 1"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=False) + if process.poll() is not None: + print(f"Logstash failed to run, check the the config and logs, then rerun.") + return None + + # Read stdout and stderr in real-time + logs = [] + for stdout_line in iter(process.stdout.readline, ""): + logs.append(stdout_line.strip()) + # we don't wait for Logstash fully start as we also test slow pipeline start scenarios + if full_start_required is False and "Starting pipeline" in stdout_line: + break + if full_start_required is True and "Pipeline started" in stdout_line: + break + if "Logstash shut down" in stdout_line or "Logstash stopped" in stdout_line: + print(f"Logstash couldn't spin up.") + print(logs) + return None + + print(f"Logstash is running with PID: {process.pid}.") + return process diff --git a/.buildkite/scripts/health-report-tests/config/pipelines.yml b/.buildkite/scripts/health-report-tests/config/pipelines.yml new file mode 100644 index 00000000000..cfa2d9632f9 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/config/pipelines.yml @@ -0,0 +1 @@ +# Intentionally left blank \ No newline at end of file diff --git a/.buildkite/scripts/health-report-tests/config_validator.py b/.buildkite/scripts/health-report-tests/config_validator.py new file mode 100644 index 00000000000..a0b6df9b72d --- /dev/null +++ b/.buildkite/scripts/health-report-tests/config_validator.py @@ -0,0 +1,69 @@ +import yaml +from typing import Any, List, Dict + + +class ConfigValidator: + REQUIRED_KEYS = { + "root": ["name", "config", "conditions", "expectation"], + "config": ["pipeline.id", "config.string"], + "conditions": ["full_start_required"], + "expectation": ["status", "symptom", "indicators"], + "indicators": ["pipelines"], + "pipelines": ["status", "symptom", "indicators"], + "DYNAMIC": ["status", "symptom", "diagnosis", "impacts", "details"], + "details": ["status"], + "status": ["state"] + } + + def __init__(self): + self.yaml_content = None + + def __has_valid_keys(self, data: any, key_path: str, repeated: bool) -> bool: + if isinstance(data, str) or isinstance(data, bool): # we reached values + return True + + # we have two indicators section and for the next repeated ones, we go deeper + first_key = next(iter(data)) + data = data[first_key] if repeated and key_path == "indicators" else data + + if isinstance(data, dict): + # pipeline-id is a DYNAMIC + required = self.REQUIRED_KEYS.get("DYNAMIC" if repeated and key_path == "indicators" else key_path, []) + repeated = not repeated if key_path == "indicators" else repeated + for key in required: + if key not in data: + print(f"Missing key '{key}' in '{key_path}'") + return False + else: + dic_keys_result = self.__has_valid_keys(data[key], key, repeated) + if dic_keys_result is False: + return False + elif isinstance(data, list): + for item in data: + list_keys_result = self.__has_valid_keys(item, key_path, repeated) + if list_keys_result is False: + return False + return True + + def load(self, file_path: str) -> None: + """Load the YAML file content into self.yaml_content.""" + self.yaml_content: [Dict[str, Any]] = None + try: + with open(file_path, 'r') as file: + self.yaml_content = yaml.safe_load(file) + except yaml.YAMLError as exc: + print(f"Error in YAML file: {exc}") + self.yaml_content = None + + def is_valid(self) -> bool: + """Validate the entire YAML structure.""" + if self.yaml_content is None: + print(f"YAML content is empty.") + return False + + if not isinstance(self.yaml_content, dict): + print(f"YAML structure is not as expected, it should start with a Dict.") + return False + + result = self.__has_valid_keys(self.yaml_content, "root", False) + return True if result is True else False diff --git a/.buildkite/scripts/health-report-tests/logstash_health_report.py b/.buildkite/scripts/health-report-tests/logstash_health_report.py new file mode 100644 index 00000000000..6b00cff07bb --- /dev/null +++ b/.buildkite/scripts/health-report-tests/logstash_health_report.py @@ -0,0 +1,16 @@ +""" +A class to provide information about Logstash node stats. +""" + +import util + + +class LogstashHealthReport: + LOGSTASH_HEALTH_REPORT_URL = "http://localhost:9600/_health_report" + + def __init__(self): + pass + + def get(self): + response = util.call_url_with_retry(self.LOGSTASH_HEALTH_REPORT_URL) + return response.json() diff --git a/.buildkite/scripts/health-report-tests/main.py b/.buildkite/scripts/health-report-tests/main.py new file mode 100644 index 00000000000..bccfe7fe0c5 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/main.py @@ -0,0 +1,87 @@ +""" + Main entry point of the LS health report API integration test suites +""" +import glob +import os +import time +import traceback +import yaml +from bootstrap import Bootstrap +from scenario_executor import ScenarioExecutor +from config_validator import ConfigValidator + + +class BootstrapContextManager: + + def __init__(self): + pass + + def __enter__(self): + print(f"Starting Logstash Health Report Integration test.") + self.bootstrap = Bootstrap() + self.bootstrap.build_logstash() + + plugin_path = os.getcwd() + "/qa/support/logstash-integration-failure_injector/logstash-integration" \ + "-failure_injector-*.gem" + matching_files = glob.glob(plugin_path) + if len(matching_files) == 0: + raise ValueError(f"Could not find logstash-integration-failure_injector plugin.") + + self.bootstrap.install_plugin(matching_files[0]) + print(f"logstash-integration-failure_injector successfully installed.") + return self.bootstrap + + def __exit__(self, exc_type, exc_value, exc_traceback): + if exc_type is not None: + print(traceback.format_exception(exc_type, exc_value, exc_traceback)) + + +def main(): + with BootstrapContextManager() as bootstrap: + scenario_executor = ScenarioExecutor() + config_validator = ConfigValidator() + + working_dir = os.getcwd() + scenario_files_path = working_dir + "/.buildkite/scripts/health-report-tests/tests/*.yaml" + scenario_files = glob.glob(scenario_files_path) + + for scenario_file in scenario_files: + print(f"Validating {scenario_file} scenario file.") + config_validator.load(scenario_file) + if config_validator.is_valid() is False: + print(f"{scenario_file} scenario file is not valid.") + return + else: + print(f"Validation succeeded.") + + has_failed_scenario = False + for scenario_file in scenario_files: + with open(scenario_file, 'r') as file: + # scenario_content: Dict[str, Any] = None + scenario_content = yaml.safe_load(file) + print(f"Testing `{scenario_content.get('name')}` scenario.") + scenario_name = scenario_content['name'] + + is_full_start_required = next(sub.get('full_start_required') for sub in + scenario_content.get('conditions') if 'full_start_required' in sub) + config = scenario_content['config'] + if config is not None: + bootstrap.apply_config(config) + expectations = scenario_content.get("expectation") + process = bootstrap.run_logstash(is_full_start_required) + if process is not None: + try: + scenario_executor.on(scenario_name, expectations) + except Exception as e: + print(e) + has_failed_scenario = True + process.terminate() + time.sleep(5) # leave some window to terminate the process + + if has_failed_scenario: + # intentionally fail due to visibility + raise Exception("Some of scenarios failed, check the log for details.") + + +if __name__ == "__main__": + main() diff --git a/.buildkite/scripts/health-report-tests/main.sh b/.buildkite/scripts/health-report-tests/main.sh index c31a4b120e3..8b0dd00cd5f 100755 --- a/.buildkite/scripts/health-report-tests/main.sh +++ b/.buildkite/scripts/health-report-tests/main.sh @@ -1,9 +1,5 @@ #!/usr/bin/env bash set -eo pipefail -# TODO: -# if branch is specified with X.Y, pull branches from ACTIVE_BRANCHES_URL="https://raw.githubusercontent.com/elastic/logstash/main/ci/branches.json", parse and use -# build Logstash from specificed (ex: 8.x -> translates to 8.latest, 8.16) branch, defaults to main -# install requirements of the python package and run main.py - - +python3 -mpip install -r .buildkite/scripts/health-report-tests/requirements.txt +python3 .buildkite/scripts/health-report-tests/main.py diff --git a/.buildkite/scripts/health-report-tests/requirements.txt b/.buildkite/scripts/health-report-tests/requirements.txt new file mode 100644 index 00000000000..c48e502117b --- /dev/null +++ b/.buildkite/scripts/health-report-tests/requirements.txt @@ -0,0 +1,2 @@ +requests==2.32.3 +pyyaml==6.0.2 \ No newline at end of file diff --git a/.buildkite/scripts/health-report-tests/scenario_executor.py b/.buildkite/scripts/health-report-tests/scenario_executor.py new file mode 100644 index 00000000000..2db8a31d850 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/scenario_executor.py @@ -0,0 +1,65 @@ +""" +A class to execute the given scenario for Logstash Health Report integration test +""" +import time +from logstash_health_report import LogstashHealthReport + + +class ScenarioExecutor: + logstash_health_report_api = LogstashHealthReport() + + def __init__(self): + pass + + def __has_intersection(self, expects, results): + # we expect expects to be existing in results + for expect in expects: + for result in results: + if result.get('help_url') and "health-report-pipeline-status.html#" not in result.get('help_url'): + return False + if not all(key in result and result[key] == value for key, value in expect.items()): + return False + return True + + def __get_difference(self, differences: list, expectations: dict, reports: dict) -> dict: + for key in expectations.keys(): + + if type(expectations.get(key)) != type(reports.get(key)): + differences.append(f"Scenario expectation and Health API report structure differs for {key}.") + return differences + + if isinstance(expectations.get(key), str): + if expectations.get(key) != reports.get(key): + differences.append({key: {"expected": expectations.get(key), "got": reports.get(key)}}) + continue + elif isinstance(expectations.get(key), dict): + self.__get_difference(differences, expectations.get(key), reports.get(key)) + elif isinstance(expectations.get(key), list): + if not self.__has_intersection(expectations.get(key), reports.get(key)): + differences.append({key: {"expected": expectations.get(key), "got": reports.get(key)}}) + return differences + + def __is_expected(self, expectations: dict) -> None: + reports = self.logstash_health_report_api.get() + differences = self.__get_difference([], expectations, reports) + if differences: + print("Differences found in 'expectation' section between YAML content and stats:") + for diff in differences: + print(f"Difference: {diff}") + return False + else: + return True + + def on(self, scenario_name: str, expectations: dict) -> None: + # retriable check the expectations + attempts = 5 + while self.__is_expected(expectations) is False: + attempts = attempts - 1 + if attempts == 0: + break + time.sleep(1) + + if attempts == 0: + raise Exception(f"{scenario_name} failed.") + else: + print(f"Scenario `{scenario_name}` expectaion meets the health report stats.") diff --git a/.buildkite/scripts/health-report-tests/tests/abnormal-termination.yaml b/.buildkite/scripts/health-report-tests/tests/abnormal-termination.yaml new file mode 100644 index 00000000000..219d8e93b7b --- /dev/null +++ b/.buildkite/scripts/health-report-tests/tests/abnormal-termination.yaml @@ -0,0 +1,31 @@ +name: "Abnormally terminated pipeline" +config: + - pipeline.id: abnormally-terminated-pp + config.string: | + input { heartbeat { interval => 1 } } + filter { failure_injector { crash_at => filter } } + output { stdout {} } + pipeline.workers: 1 + pipeline.batch.size: 1 +conditions: + - full_start_required: true +expectation: + status: "red" + symptom: "1 indicator is unhealthy (`pipelines`)" + indicators: + pipelines: + status: "red" + symptom: "1 indicator is unhealthy (`abnormally-terminated-pp`)" + indicators: + abnormally-terminated-pp: + status: "red" + symptom: "The pipeline is unhealthy; 1 area is impacted and 1 diagnosis is available" + diagnosis: + - cause: "pipeline is not running, likely because it has encountered an error" + - action: "view logs to determine the cause of abnormal pipeline shutdown" + impacts: + - description: "the pipeline is not currently processing" + - impact_areas: ["pipeline_execution"] + details: + status: + state: "TERMINATED" \ No newline at end of file diff --git a/.buildkite/scripts/health-report-tests/tests/normal-termination.yaml b/.buildkite/scripts/health-report-tests/tests/normal-termination.yaml new file mode 100644 index 00000000000..86a05deb5fa --- /dev/null +++ b/.buildkite/scripts/health-report-tests/tests/normal-termination.yaml @@ -0,0 +1,29 @@ +name: "Successfully terminated pipeline" +config: + - pipeline.id: normally-terminated-pp + config.string: | + input { generator { count => 1 } } + output { stdout {} } + pipeline.workers: 1 + pipeline.batch.size: 1 +conditions: + - full_start_required: true +expectation: + status: "yellow" + symptom: "1 indicator is concerning (`pipelines`)" + indicators: + pipelines: + status: "yellow" + symptom: "1 indicator is concerning (`normally-terminated-pp`)" + indicators: + normally-terminated-pp: + status: "yellow" + symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available" + diagnosis: + - cause: "pipeline has finished running because its inputs have been closed and events have been processed" + - action: "if you expect this pipeline to run indefinitely, you will need to configure its inputs to continue receiving or fetching events" + impacts: + - impact_areas: ["pipeline_execution"] + details: + status: + state: "FINISHED" \ No newline at end of file diff --git a/.buildkite/scripts/health-report-tests/tests/slow-start.yaml b/.buildkite/scripts/health-report-tests/tests/slow-start.yaml new file mode 100644 index 00000000000..d036391a9c3 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/tests/slow-start.yaml @@ -0,0 +1,30 @@ +name: "Slow start pipeline" +config: + - pipeline.id: slow-start-pp + config.string: | + input { heartbeat {} } + filter { failure_injector { degrade_at => [register] } } + output { stdout {} } + pipeline.workers: 1 + pipeline.batch.size: 1 +conditions: + - full_start_required: false +expectation: + status: "yellow" + symptom: "1 indicator is concerning (`pipelines`)" + indicators: + pipelines: + status: "yellow" + symptom: "1 indicator is concerning (`slow-start-pp`)" + indicators: + slow-start-pp: + status: "yellow" + symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available" + diagnosis: + - cause: "pipeline is loading" + - action: "if pipeline does not come up quickly, you may need to check the logs to see if it is stalled" + impacts: + - impact_areas: ["pipeline_execution"] + details: + status: + state: "LOADING" \ No newline at end of file diff --git a/.buildkite/scripts/health-report-tests/util.py b/.buildkite/scripts/health-report-tests/util.py new file mode 100644 index 00000000000..c7e840f06f4 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/util.py @@ -0,0 +1,35 @@ +import requests +import subprocess +from requests.adapters import HTTPAdapter, Retry + + +def call_url_with_retry(url: str, max_retries: int = 5, delay: int = 1) -> requests.Response: + f""" + Calls the given {url} with maximum of {max_retries} retries with {delay} delay. + """ + schema = "https://" if "https://" in url else "http://" + session = requests.Session() + # retry on most common failures such as connection timeout(408), etc... + retries = Retry(total=max_retries, backoff_factor=delay, status_forcelist=[408, 502, 503, 504]) + session.mount(schema, HTTPAdapter(max_retries=retries)) + return session.get(url) + + +def git_check_out_branch(branch_name: str) -> None: + f""" + Checks out specified branch or fails with error if checkout operation fails. + """ + run_or_raise_error(["git", "checkout", branch_name], + "Error occurred while checking out the " + branch_name + " branch") + + +def run_or_raise_error(commands: list, error_message): + f""" + Executes the {list} commands and raises an {Exception} if opration fails. + """ + result = subprocess.run(commands, universal_newlines=True, stdout=subprocess.PIPE) + if result.returncode != 0: + full_error_message = (error_message + ", output: " + result.stdout.decode('utf-8')) \ + if result.stdout else error_message + raise Exception(f"{full_error_message}") + diff --git a/docs/static/monitoring/monitoring-apis.asciidoc b/docs/static/monitoring/monitoring-apis.asciidoc index 897507d1e22..68b4a0b8378 100644 --- a/docs/static/monitoring/monitoring-apis.asciidoc +++ b/docs/static/monitoring/monitoring-apis.asciidoc @@ -2,13 +2,13 @@ [[monitoring]] == APIs for monitoring {ls} -{ls} provides monitoring APIs for retrieving runtime metrics -about {ls}: +{ls} provides monitoring APIs for retrieving runtime information about {ls}: * <> * <> * <> * <> +* <> You can use the root resource to retrieve general information about the Logstash instance, including @@ -1184,3 +1184,155 @@ Example of a human-readable response: org.jruby.internal.runtime.NativeThread.join(NativeThread.java:75) -------------------------------------------------- + + +[[logstash-health-report-api]] +=== Health report API + +An API that reports the health status of Logstash. + +[source,js] +-------------------------------------------------- +curl -XGET 'localhost:9600/_health_report?pretty' +-------------------------------------------------- + +==== Description + +The health API returns a report with the health status of Logstash and the pipelines that are running inside of it. +The report contains a list of indicators that compose Logstash functionality. + +Each indicator has a health status of: `green`, `unknown`, `yellow`, or `red`. +The indicator will provide an explanation and metadata describing the reason for its current health status. + +The top-level status is controlled by the worst indicator status. + +In the event that an indicator's status is non-green, a list of impacts may be present in the indicator result which detail the functionalities that are negatively affected by the health issue. +Each impact carries with it a severity level, an area of the system that is affected, and a simple description of the impact on the system. + +Some health indicators can determine the root cause of a health problem and prescribe a set of steps that can be performed in order to improve the health of the system. +The root cause and remediation steps are encapsulated in a `diagnosis`. +A diagnosis contains a cause detailing a root cause analysis, an action containing a brief description of the steps to take to fix the problem, and the URL for detailed troubleshooting help. + +NOTE: The health indicators perform root cause analysis of non-green health statuses. + This can be computationally expensive when called frequently. + +==== Response body + +`status`:: +(Optional, string) Health status of {ls}, based on the aggregated status of all indicators. Statuses are: + +`green`::: +{ls} is healthy. + +`unknown`::: +The health of {ls} could not be determined. + +`yellow`::: +The functionality of {ls} is in a degraded state and may need remediation to avoid the health becoming `red`. + +`red`::: +{ls} is experiencing an outage or certain features are unavailable for use. + +`indicators`:: +(object) Information about the health of the {ls} indicators. + ++ +.Properties of `indicators` +[%collapsible%open] +==== +``:: +(object) Contains health results for an indicator. ++ +.Properties of `` +[%collapsible%open] +======= +`status`:: +(string) Health status of the indicator. Statuses are: + +`green`::: +The indicator is healthy. + +`unknown`::: +The health of the indicator could not be determined. + +`yellow`::: +The functionality of an indicator is in a degraded state and may need remediation to avoid the health becoming `red`. + +`red`::: +The indicator is experiencing an outage or certain features are unavailable for use. + +`symptom`:: +(string) A message providing information about the current health status. + +`details`:: +(Optional, object) An object that contains additional information about the indicator that has lead to the current health status result. +Each indicator has <>. + +`impacts`:: +(Optional, array) If a non-healthy status is returned, indicators may include a list of impacts that this health status will have on {ls}. ++ +.Properties of `impacts` +[%collapsible%open] +======== +`severity`:: +(integer) How important this impact is to the functionality of {ls}. +A value of 1 is the highest severity, with larger values indicating lower severity. + +`description`:: +(string) A description of the impact on {ls}. + +`impact_areas`:: +(array of strings) The areas {ls} functionality that this impact affects. +Possible values are: ++ +-- +* `pipeline_execution` +-- + +======== + +`diagnosis`:: +(Optional, array) If a non-healthy status is returned, indicators may include a list of diagnosis that encapsulate the cause of the health issue and an action to take in order to remediate the problem. ++ +.Properties of `diagnosis` +[%collapsible%open] +======== +`cause`:: +(string) A description of a root cause of this health problem. + +`action`:: +(string) A brief description the steps that should be taken to remediate the problem. +A more detailed step-by-step guide to remediate the problem is provided by the `help_url` field. + +`help_url`:: +(string) A link to the troubleshooting guide that'll fix the health problem. +======== +======= +==== + +[role="child_attributes"] +[[logstash-health-api-response-details]] +==== Indicator Details + +Each health indicator in the health API returns a set of details that further explains the state of the system. +The details have contents and a structure that is unique to each indicator. + +[[logstash-health-api-response-details-pipeline]] +===== Pipeline Indicator Details + +`pipelines/indicators//details`:: +(object) Information about the specified pipeline. ++ +.Properties of `pipelines/indicators//details` +[%collapsible%open] +==== +`status`:: +(object) Details related to the pipeline's current status and run-state. ++ +.Properties of `status` +[%collapsible%open] +======== +`state`:: +(string) The current state of the pipeline, including whether it is `loading`, `running`, `finished`, or `terminated`. +======== +==== diff --git a/docs/static/releasenotes.asciidoc b/docs/static/releasenotes.asciidoc index 72d3dcbab72..0151b278af7 100644 --- a/docs/static/releasenotes.asciidoc +++ b/docs/static/releasenotes.asciidoc @@ -2517,4 +2517,4 @@ We have added another flag to the Benchmark CLI to allow passing a data file wit This feature allows users to run the Benchmark CLI in a custom test case with a custom config and a custom dataset. https://github.com/elastic/logstash/pull/12437[#12437] ==== Plugin releases -Plugins align with release 7.14.0 \ No newline at end of file +Plugins align with release 7.14.0 diff --git a/docs/static/troubleshoot/health-pipeline-status.asciidoc b/docs/static/troubleshoot/health-pipeline-status.asciidoc new file mode 100644 index 00000000000..095ef85f950 --- /dev/null +++ b/docs/static/troubleshoot/health-pipeline-status.asciidoc @@ -0,0 +1,37 @@ +[[health-report-pipeline-status]] +=== Health Report Pipeline Status + +The Pipeline indicator has a `status` probe that is capable of producing one of several diagnoses about the pipeline's lifecycle, indicating whether the pipeline is currently running. + +[[health-report-pipeline-status-diagnosis-loading]] +==== [[loading]]Loading Pipeline + +A pipeline that is loading is not yet processing data, and is considered a temporarily-degraded pipeline state. +Some plugins perform actions or pre-validation that can delay the starting of the pipeline, such as when a plugin pre-establishes a connection to an external service before allowing the pipeline to start. +When these plugins take significant time to start up, the whole pipeline can remain in a loading state for an extended time. + +If your pipeline does not come up in a reasonable amount of time, consider checking the Logstash logs to see if the plugin shows evidence of being caught in a retry loop. + +[[health-report-pipeline-status-diagnosis-finished]] +==== [[finished]]Finished Pipeline + +A logstash pipeline whose input plugins have all completed will be shut down once events have finished processing. + +Many plugins can be configured to run indefinitely, either by listening for new inbound events or by polling for events on a schedule. +A finished pipeline will not produce or process any more events until it is restarted, which will occur if the pipeline's definition is changed and pipeline reloads are enabled. +If you wish to keep your pipeline runing, consider configuring its input to run on a schedule or otherwise listen for new events. + +[[health-report-pipeline-status-diagnosis-terminated]] +==== [[terminated]]Terminated Pipeline + +When a Logstash pipeline's filter or output plugins crash, the entire pipeline is terminated and intervention is required. + +A terminated pipeline will not produce or process any more events until it is restarted, which will occur if the pipeline's definition is changed and pipeline reloads are enabled. +Check the logs to determine the cause of the crash, and report the issue to the plugin maintainers. + +[[health-report-pipeline-status-diagnosis-unknown]] +==== [[unknown]]Unknown Pipeline + +When a Logstash pipeline either cannot be created or has recently been deleted the health report doesn't know enough to produce a meaningful status. + +Check the logs to determine if the pipeline crashed during creation, and report the issue to the plugin maintainers. diff --git a/docs/static/troubleshoot/troubleshooting.asciidoc b/docs/static/troubleshoot/troubleshooting.asciidoc index b4c8ee7a0d7..66bb60f45e5 100644 --- a/docs/static/troubleshoot/troubleshooting.asciidoc +++ b/docs/static/troubleshoot/troubleshooting.asciidoc @@ -28,3 +28,4 @@ include::ts-logstash.asciidoc[] include::ts-plugins-general.asciidoc[] include::ts-plugins.asciidoc[] include::ts-other-issues.asciidoc[] +include::health-pipeline-status.asciidoc[] diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index 8cb89141809..831f0fefde5 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -57,6 +57,7 @@ def versionMap = (Map) (new Yaml()).load(new File("$projectDir/../versions.yml") description = """Logstash Core Java""" +String logstashCoreVersion = versionMap['logstash-core'] String jacksonVersion = versionMap['jackson'] String jacksonDatabindVersion = versionMap['jackson-databind'] String jrubyVersion = versionMap['jruby']['version'] @@ -183,6 +184,23 @@ artifacts { } } +task generateVersionInfoResources(type: DefaultTask) { + ext.outDir = layout.buildDirectory.dir("generated-resources/version-info").get() + + inputs.property("version-info:logstash-core", logstashCoreVersion) + outputs.dir(ext.outDir) + + doLast { + mkdir outDir; + def resourceFile = outDir.file('version-info.properties').asFile + resourceFile.text = "logstash-core: ${logstashCoreVersion}" + } +} +sourceSets { + main { output.dir(generateVersionInfoResources.outputs.files) } +} +processResources.dependsOn generateVersionInfoResources + configurations { provided } diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 7f99abe224e..d1f8e006995 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -40,6 +40,8 @@ class LogStash::Agent attr_reader :metric, :name, :settings, :dispatcher, :ephemeral_id, :pipeline_bus attr_accessor :logger + attr_reader :health_observer + # initialize method for LogStash::Agent # @param params [Hash] potential parameters are: # :name [String] - identifier for the agent @@ -51,6 +53,9 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil) @auto_reload = setting("config.reload.automatic") @ephemeral_id = SecureRandom.uuid + java_import("org.logstash.health.HealthObserver") + @health_observer ||= HealthObserver.new + # Mutex to synchronize in the exclusive method # Initial usage for the Ruby pipeline initialization which is not thread safe @webserver_control_lock = Mutex.new @@ -151,6 +156,31 @@ def execute transition_to_stopped end + include org.logstash.health.PipelineIndicator::PipelineDetailsProvider + def pipeline_details(pipeline_id) + logger.trace("fetching pipeline details for `#{pipeline_id}`") + pipeline_id = pipeline_id.to_sym + + java_import org.logstash.health.PipelineIndicator + + pipeline_state = @pipelines_registry.states.get(pipeline_id) + if pipeline_state.nil? + return PipelineIndicator::Details.new(PipelineIndicator::Status::UNKNOWN) + end + + status = pipeline_state.synchronize do |sync_state| + case + when sync_state.loading? then PipelineIndicator::Status::LOADING + when sync_state.crashed? then PipelineIndicator::Status::TERMINATED + when sync_state.running? then PipelineIndicator::Status::RUNNING + when sync_state.finished? then PipelineIndicator::Status::FINISHED + else PipelineIndicator::Status::UNKNOWN + end + end + + return PipelineIndicator::Details.new(status) + end + def auto_reload? @auto_reload end diff --git a/logstash-core/lib/logstash/api/command_factory.rb b/logstash-core/lib/logstash/api/command_factory.rb index 6ed9e570142..0b246f14034 100644 --- a/logstash-core/lib/logstash/api/command_factory.rb +++ b/logstash-core/lib/logstash/api/command_factory.rb @@ -18,6 +18,7 @@ require "logstash/api/service" require "logstash/api/commands/system/basicinfo_command" require "logstash/api/commands/system/plugins_command" +require "logstash/api/commands/health_report" require "logstash/api/commands/stats" require "logstash/api/commands/node" require "logstash/api/commands/default_metadata" @@ -34,6 +35,7 @@ def initialize(service) :plugins_command => ::LogStash::Api::Commands::System::Plugins, :stats => ::LogStash::Api::Commands::Stats, :node => ::LogStash::Api::Commands::Node, + :health_report => ::LogStash::Api::Commands::HealthReport, :default_metadata => ::LogStash::Api::Commands::DefaultMetadata } end diff --git a/logstash-core/lib/logstash/api/commands/default_metadata.rb b/logstash-core/lib/logstash/api/commands/default_metadata.rb index 26f4af31337..635e3e5f43a 100644 --- a/logstash-core/lib/logstash/api/commands/default_metadata.rb +++ b/logstash-core/lib/logstash/api/commands/default_metadata.rb @@ -22,20 +22,14 @@ module Api module Commands class DefaultMetadata < Commands::Base def all - res = {:host => host, - :version => version, - :http_address => http_address, - :id => service.agent.id, - :name => service.agent.name, - :ephemeral_id => service.agent.ephemeral_id, - :status => "green", # This is hard-coded to mirror x-pack behavior - :snapshot => ::BUILD_INFO["build_snapshot"], + res = base_info.merge({ + :status => service.agent.health_observer.status, :pipeline => { :workers => LogStash::SETTINGS.get("pipeline.workers"), :batch_size => LogStash::SETTINGS.get("pipeline.batch.size"), :batch_delay => LogStash::SETTINGS.get("pipeline.batch.delay"), }, - } + }) monitoring = {} if enabled_xpack_monitoring? monitoring = monitoring.merge({ @@ -49,6 +43,18 @@ def all res.merge(monitoring.empty? ? {} : {:monitoring => monitoring}) end + def base_info + { + :host => host, + :version => version, + :http_address => http_address, + :id => service.agent.id, + :name => service.agent.name, + :ephemeral_id => service.agent.ephemeral_id, + :snapshot => ::BUILD_INFO["build_snapshot"], + } + end + def host @@host ||= Socket.gethostname end diff --git a/logstash-core/lib/logstash/api/commands/health_report.rb b/logstash-core/lib/logstash/api/commands/health_report.rb new file mode 100644 index 00000000000..d53a313b499 --- /dev/null +++ b/logstash-core/lib/logstash/api/commands/health_report.rb @@ -0,0 +1,31 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "logstash/api/commands/base" + +module LogStash + module Api + module Commands + class HealthReport < Commands::Base + + def all(selected_fields = []) + service.agent.health_observer.report + end + end + end + end +end \ No newline at end of file diff --git a/logstash-core/lib/logstash/api/modules/health_report.rb b/logstash-core/lib/logstash/api/modules/health_report.rb new file mode 100644 index 00000000000..ff5728d94e7 --- /dev/null +++ b/logstash-core/lib/logstash/api/modules/health_report.rb @@ -0,0 +1,49 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +module LogStash + module Api + module Modules + class HealthReport < ::LogStash::Api::Modules::Base + + get "/" do + payload = health_report.all.then do |health_report_pojo| + # The app_helper needs a ruby-hash. + # Manually creating a map of properties works around the issue. + base_metadata.merge({ + status: health_report_pojo.status, + symptom: health_report_pojo.symptom, + indicators: health_report_pojo.indicators, + }) + end + + respond_with(payload, {exclude_default_metadata: true}) + end + + private + + def health_report + @health_report ||= factory.build(:health_report) + end + + def base_metadata + @factory.build(:default_metadata).base_info + end + end + end + end +end \ No newline at end of file diff --git a/logstash-core/lib/logstash/api/rack_app.rb b/logstash-core/lib/logstash/api/rack_app.rb index c14bdf26a23..ee3e409e95e 100644 --- a/logstash-core/lib/logstash/api/rack_app.rb +++ b/logstash-core/lib/logstash/api/rack_app.rb @@ -18,6 +18,7 @@ require "rack" require "sinatra/base" require "logstash/api/modules/base" +require "logstash/api/modules/health_report" require "logstash/api/modules/node" require "logstash/api/modules/node_stats" require "logstash/api/modules/plugins" @@ -123,6 +124,7 @@ def self.app(logger, agent, environment) def self.rack_namespaces(agent) { + "/_health_report" => LogStash::Api::Modules::HealthReport, "/_node" => LogStash::Api::Modules::Node, "/_stats" => LogStash::Api::Modules::Stats, "/_node/stats" => LogStash::Api::Modules::NodeStats, diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index 9cec566ccf0..b30d11e2be7 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -65,6 +65,7 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil) @flushing = java.util.concurrent.atomic.AtomicBoolean.new(false) @flushRequested = java.util.concurrent.atomic.AtomicBoolean.new(false) @shutdownRequested = java.util.concurrent.atomic.AtomicBoolean.new(false) + @crash_detected = Concurrent::AtomicBoolean.new(false) @outputs_registered = Concurrent::AtomicBoolean.new(false) # @finished_execution signals that the pipeline thread has finished its execution @@ -87,6 +88,10 @@ def finished_execution? @finished_execution.true? end + def finished_run? + @finished_run.true? + end + def ready? @ready.value end @@ -229,6 +234,10 @@ def stopped? @running.false? end + def crashed? + @crash_detected.true? + end + # register_plugins calls #register_plugin on the plugins list and upon exception will call Plugin#do_close on all registered plugins # @param plugins [Array[Plugin]] the list of plugins to register def register_plugins(plugins) @@ -305,6 +314,7 @@ def start_workers rescue => e # WorkerLoop.run() catches all Java Exception class and re-throws as IllegalStateException with the # original exception as the cause + @crash_detected.make_true @logger.error( "Pipeline worker error, the pipeline will be stopped", default_logging_keys(:error => e.cause.message, :exception => e.cause.class, :backtrace => e.cause.backtrace) @@ -319,6 +329,7 @@ def start_workers begin start_inputs rescue => e + @crash_detected.make_true # if there is any exception in starting inputs, make sure we shutdown workers. # exception will already by logged in start_inputs shutdown_workers diff --git a/logstash-core/lib/logstash/pipeline_action/create.rb b/logstash-core/lib/logstash/pipeline_action/create.rb index ffd09777733..6f0fff00119 100644 --- a/logstash-core/lib/logstash/pipeline_action/create.rb +++ b/logstash-core/lib/logstash/pipeline_action/create.rb @@ -46,13 +46,21 @@ def execution_priority # The execute assume that the thread safety access of the pipeline # is managed by the caller. def execute(agent, pipelines_registry) + attach_health_indicator(agent) new_pipeline = LogStash::JavaPipeline.new(@pipeline_config, @metric, agent) success = pipelines_registry.create_pipeline(pipeline_id, new_pipeline) do new_pipeline.start # block until the pipeline is correctly started or crashed end + LogStash::ConvergeResult::ActionResult.create(self, success) end + def attach_health_indicator(agent) + health_observer = agent.health_observer + health_observer.detach_pipeline_indicator(pipeline_id) # just in case ... + health_observer.attach_pipeline_indicator(pipeline_id, agent) + end + def to_s "PipelineAction::Create<#{pipeline_id}>" end diff --git a/logstash-core/lib/logstash/pipeline_action/delete.rb b/logstash-core/lib/logstash/pipeline_action/delete.rb index 1a19509ba2f..c072e70bf38 100644 --- a/logstash-core/lib/logstash/pipeline_action/delete.rb +++ b/logstash-core/lib/logstash/pipeline_action/delete.rb @@ -27,10 +27,15 @@ def initialize(pipeline_id) def execute(agent, pipelines_registry) success = pipelines_registry.delete_pipeline(@pipeline_id) + detach_health_indicator(agent) if success LogStash::ConvergeResult::ActionResult.create(self, success) end + def detach_health_indicator(agent) + agent.health_observer.detach_pipeline_indicator(pipeline_id) + end + def to_s "PipelineAction::Delete<#{pipeline_id}>" end diff --git a/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb b/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb index c627087ed42..4c8e6ded037 100644 --- a/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb +++ b/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb @@ -31,10 +31,15 @@ def execute(agent, pipelines_registry) end success = pipelines_registry.delete_pipeline(@pipeline_id) + detach_health_indicator(agent) if success LogStash::ConvergeResult::ActionResult.create(self, success) end + def detach_health_indicator(agent) + agent.health_observer.detach_pipeline_indicator(pipeline_id) + end + def to_s "PipelineAction::StopAndDelete<#{pipeline_id}>" end diff --git a/logstash-core/lib/logstash/pipelines_registry.rb b/logstash-core/lib/logstash/pipelines_registry.rb index 3810201e8bb..3752003477c 100644 --- a/logstash-core/lib/logstash/pipelines_registry.rb +++ b/logstash-core/lib/logstash/pipelines_registry.rb @@ -28,6 +28,7 @@ def initialize(pipeline_id, pipeline) @lock = Monitor.new end + # a terminated pipeline has either crashed OR finished normally def terminated? @lock.synchronize do # a loading pipeline is never considered terminated @@ -35,6 +36,20 @@ def terminated? end end + # a finished pipeline finished _normally_ without exception + def finished? + @lock.synchronize do + # a loading pipeline is never considered terminated + @loading.false? && @pipeline.finished_run? + end + end + + def crashed? + @lock.synchronize do + @pipeline&.crashed? + end + end + def running? @lock.synchronize do # not terminated and not loading @@ -104,6 +119,7 @@ def size end end + def empty? @lock.synchronize do @states.empty? diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index 24bcb3adc0c..c64d9275470 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -587,6 +587,7 @@ def flush(options) # wait until there is no more worker thread since we have a single worker that should have died wait(5).for {subject.worker_threads.any?(&:alive?)}.to be_falsey + expect(subject.crashed?).to be true # at this point the input plugin should have been asked to stop wait(5).for {dummyinput.stop?}.to be_truthy @@ -614,6 +615,7 @@ def flush(options) # wait until there is no more worker thread since we have a single worker that should have died wait(5).for {subject.worker_threads.any?(&:alive?)}.to be_falsey + expect(subject.crashed?).to be true # at this point the input plugin should have been asked to stop wait(5).for {dummyinput.stop?}.to be_truthy @@ -779,6 +781,7 @@ def flush(options) expect(input).to receive(:do_close).once pipeline.start pipeline.shutdown + expect(pipeline.crashed?).to be false end end end diff --git a/logstash-core/spec/logstash/pipeline_action/create_spec.rb b/logstash-core/spec/logstash/pipeline_action/create_spec.rb index 16cabacb086..553576b543a 100644 --- a/logstash-core/spec/logstash/pipeline_action/create_spec.rb +++ b/logstash-core/spec/logstash/pipeline_action/create_spec.rb @@ -30,6 +30,7 @@ before do clear_data_dir + allow(agent).to receive(:health_observer).and_return(double("HealthObserver").as_null_object) end subject { described_class.new(pipeline_config, metric) } @@ -66,6 +67,11 @@ it "returns a successful execution status" do expect(subject.execute(agent, pipelines)).to be_truthy end + + it "attached an indicator to the agent's health observer" do + expect(agent.health_observer).to receive(:attach_pipeline_indicator).with(:main, agent) + subject.execute(agent, pipelines) + end end context "when the pipeline doesn't start" do diff --git a/logstash-core/spec/logstash/pipeline_action/delete_spec.rb b/logstash-core/spec/logstash/pipeline_action/delete_spec.rb new file mode 100644 index 00000000000..73193389ae0 --- /dev/null +++ b/logstash-core/spec/logstash/pipeline_action/delete_spec.rb @@ -0,0 +1,78 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "spec_helper" +require_relative "../../support/helpers" +require_relative "../../support/matchers" +require "logstash/pipelines_registry" +require "logstash/pipeline_action/delete" +require "logstash/inputs/generator" + + +describe LogStash::PipelineAction::Delete do + let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" } + let(:pipeline_id) { :main } + let(:pipeline) { mock_java_pipeline_from_string(pipeline_config) } + let(:pipelines) do + LogStash::PipelinesRegistry.new.tap do |chm| + chm.create_pipeline(pipeline_id, pipeline) { true } + end + end + let(:agent) { double("agent") } + + subject { described_class.new(pipeline_id) } + + before do + clear_data_dir + allow(agent).to receive(:health_observer).and_return(double("HealthObserver").as_null_object) + pipeline.start + end + + after do + pipeline.shutdown + end + + it "returns the pipeline_id" do + expect(subject.pipeline_id).to eq(:main) + end + + context "when the pipeline is still running" do + + it 'fails to delete the pipeline' do + action_result = subject.execute(agent, pipelines) + expect(action_result).to_not be_successful + + expect(pipelines.get_pipeline(pipeline_id)).to_not be_nil + end + end + + context "when the pipeline has completed" do + let(:pipeline_config) { "input { generator { count => 1 } } output { null {} }"} + + before(:each) do + sleep(0.1) until pipelines.non_running_pipelines.keys.include?(pipeline_id) + end + + it 'deletes the pipeline' do + action_result = subject.execute(agent, pipelines) + expect(action_result).to be_successful + + expect(pipelines.get_pipeline(pipeline_id)).to be_nil + expect(agent.health_observer).to have_received(:detach_pipeline_indicator).with(pipeline_id) + end + end +end \ No newline at end of file diff --git a/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb b/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb new file mode 100644 index 00000000000..a32ed5eb0fa --- /dev/null +++ b/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb @@ -0,0 +1,79 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "spec_helper" +require_relative "../../support/helpers" +require_relative "../../support/matchers" +require "logstash/pipelines_registry" +require "logstash/pipeline_action/delete" +require "logstash/inputs/generator" + + +describe LogStash::PipelineAction::StopAndDelete do + let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" } + let(:pipeline_id) { :main } + let(:pipeline) { mock_java_pipeline_from_string(pipeline_config) } + let(:pipelines) do + LogStash::PipelinesRegistry.new.tap do |chm| + chm.create_pipeline(pipeline_id, pipeline) { true } + end + end + let(:agent) { double("agent") } + + subject { described_class.new(pipeline_id) } + + before do + clear_data_dir + allow(agent).to receive(:health_observer).and_return(double("HealthObserver").as_null_object) + pipeline.start + end + + after do + pipeline.shutdown + end + + it "returns the pipeline_id" do + expect(subject.pipeline_id).to eq(:main) + end + + context "when the pipeline is still running" do + it 'stops and deletes the pipeline' do + action_result = subject.execute(agent, pipelines) + expect(action_result).to be_successful + + expect(pipelines.get_pipeline(pipeline_id)).to be_nil + expect(agent.health_observer).to have_received(:detach_pipeline_indicator).with(pipeline_id) + end + end + + context "when the pipeline has completed" do + let(:pipeline_config) { "input { generator { count => 1 } } output { null {} }"} + + before(:each) do + sleep(0.1) until pipelines.non_running_pipelines.keys.include?(pipeline_id) + end + + it 'deletes the pipeline' do + action_result = subject.execute(agent, pipelines) + expect(action_result).to be_successful + + expect(pipelines.get_pipeline(pipeline_id)).to be_nil + + expect(agent.health_observer).to have_received(:detach_pipeline_indicator).with(pipeline_id) + end + end +end \ No newline at end of file diff --git a/logstash-core/spec/logstash/webserver_spec.rb b/logstash-core/spec/logstash/webserver_spec.rb index 50c90911eeb..e766be329be 100644 --- a/logstash-core/spec/logstash/webserver_spec.rb +++ b/logstash-core/spec/logstash/webserver_spec.rb @@ -55,7 +55,13 @@ def free_ports(servers) end let(:logger) { LogStash::Logging::Logger.new("testing") } - let(:agent) { OpenStruct.new({:webserver => webserver_block, :http_address => "127.0.0.1", :id => "myid", :name => "myname"}) } + let(:agent) { OpenStruct.new({ + webserver: webserver_block, + http_address: "127.0.0.1", + id: "myid", + name: "myname", + health_observer: org.logstash.health.HealthObserver.new, + }) } let(:webserver_block) { OpenStruct.new({}) } subject(:webserver) { LogStash::WebServer.new(logger, agent, webserver_options) } diff --git a/logstash-core/src/main/java/org/logstash/Logstash.java b/logstash-core/src/main/java/org/logstash/Logstash.java index eb9c823f397..04ce1b1820f 100644 --- a/logstash-core/src/main/java/org/logstash/Logstash.java +++ b/logstash-core/src/main/java/org/logstash/Logstash.java @@ -26,6 +26,7 @@ import java.io.PrintStream; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Properties; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,6 +46,25 @@ */ public final class Logstash implements Runnable, AutoCloseable { + public static final String VERSION_FULL; + public static final String VERSION_MAJOR; + public static final String VERSION_MINOR; + public static final String VERSION_PATCH; + + static { + final Properties properties = new Properties(); + try { + properties.load(Logstash.class.getResourceAsStream("/version-info.properties")); + VERSION_FULL = properties.getProperty("logstash-core"); + final String[] versions = VERSION_FULL.split("\\."); + VERSION_MAJOR = versions[0]; + VERSION_MINOR = versions[1]; + VERSION_PATCH = versions[2]; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private static final Logger LOGGER = LogManager.getLogger(Logstash.class); /** diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java index e905d922c5b..dbbfb97de5b 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java @@ -22,6 +22,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; + import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.RubyNumeric; diff --git a/logstash-core/src/main/java/org/logstash/health/ApiHealthReport.java b/logstash-core/src/main/java/org/logstash/health/ApiHealthReport.java new file mode 100644 index 00000000000..d2d1cbf6889 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/ApiHealthReport.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.Map; + +@JsonSerialize(using = ApiHealthReport.JsonSerializer.class) +public class ApiHealthReport { + private final MultiIndicator.Report delegate; + + public ApiHealthReport(final MultiIndicator.Report delegate) { + this.delegate = delegate; + } + + public Status getStatus() { + return delegate.status(); + } + + public String getSymptom() { + return delegate.symptom(); + } + + public Map getIndicators() { + return delegate.indicators(); + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(final ApiHealthReport apiHealthReport, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField("status", apiHealthReport.getStatus()); + jsonGenerator.writeObjectField("symptom", apiHealthReport.getSymptom()); + jsonGenerator.writeObjectField("indicators", apiHealthReport.getIndicators()); + jsonGenerator.writeEndObject(); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Diagnosis.java b/logstash-core/src/main/java/org/logstash/health/Diagnosis.java new file mode 100644 index 00000000000..7e64f555638 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Diagnosis.java @@ -0,0 +1,113 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.UnaryOperator; + +@JsonSerialize(using = Diagnosis.JsonSerializer.class) +public final class Diagnosis { + public final String id; + public final String cause; + public final String action; + public final String helpUrl; + + private Diagnosis(final Builder builder) { + this.id = builder.id; + this.cause = builder.cause; + this.action = builder.action; + this.helpUrl = builder.helpUrl; + } + + static Builder builder() { + return new Builder(); + } + + public static class Builder { + private final String id; + private final String cause; + private final String action; + private final String helpUrl; + + public Builder() { + this(null, null, null, null); + } + + Builder(final String id, + final String cause, + final String action, + final String helpUrl) { + this.id = id; + this.cause = cause; + this.action = action; + this.helpUrl = helpUrl; + } + + public Builder withId(final String id) { + if (Objects.equals(id, this.id)) { + return this; + } + return new Builder(id, cause, action, helpUrl); + } + + public Builder withCause(final String cause) { + if (Objects.equals(cause, this.cause)) { + return this; + } + return new Builder(id, cause, action, helpUrl); + } + public Builder withAction(final String action) { + if (Objects.equals(action, this.action)) { + return this; + } + return new Builder(id, cause, action, helpUrl); + } + public Builder withHelpUrl(final String helpUrl) { + if (Objects.equals(helpUrl, this.helpUrl)) { + return this; + } + return new Builder(id, cause, action, helpUrl); + } + public Builder transform(final UnaryOperator configurator) { + return configurator.apply(this); + } + public synchronized Diagnosis build() { + return new Diagnosis(this); + } + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(Diagnosis diagnosis, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + if (diagnosis.id != null) { + jsonGenerator.writeStringField("id", diagnosis.id); + } + jsonGenerator.writeStringField("cause", diagnosis.cause); + jsonGenerator.writeStringField("action", diagnosis.action); + jsonGenerator.writeStringField("help_url", diagnosis.helpUrl); + jsonGenerator.writeEndObject(); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/HealthObserver.java b/logstash-core/src/main/java/org/logstash/health/HealthObserver.java new file mode 100644 index 00000000000..63a461a74e6 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/HealthObserver.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class HealthObserver { + + private static final Logger LOGGER = LogManager.getLogger(); + + private final MultiIndicator rootIndicator = new MultiIndicator(); + private final MultiIndicator pipelinesIndicator = new MultiIndicator(); + + public HealthObserver() { + this.rootIndicator.attachIndicator("pipelines", this.pipelinesIndicator); + } + + public final Status getStatus() { + return getReport().getStatus(); + } + + public MultiIndicator getIndicator() { + return this.rootIndicator; + } + + public ApiHealthReport getReport() { + return new ApiHealthReport(this.rootIndicator.report()); + } + + public void attachPipelineIndicator(final String pipelineId, final PipelineIndicator.PipelineDetailsProvider detailsProvider) { + try { + this.pipelinesIndicator.attachIndicator(pipelineId, PipelineIndicator.forPipeline(pipelineId, detailsProvider)); + LOGGER.debug(String.format("attached pipeline indicator [%s]", pipelineId)); + } catch (final Exception e) { + LOGGER.warn(String.format("failed to attach pipeline indicator [%s]", pipelineId), e); + } + } + + public void detachPipelineIndicator(final String pipelineId) { + try { + this.pipelinesIndicator.detachIndicator(pipelineId, null); + LOGGER.debug(String.format("detached pipeline indicator [%s]", pipelineId)); + } catch (final Exception e) { + LOGGER.warn(String.format("failed to detach pipeline indicator [%s]", pipelineId), e); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/HelpUrl.java b/logstash-core/src/main/java/org/logstash/health/HelpUrl.java new file mode 100644 index 00000000000..db2586765d1 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/HelpUrl.java @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import org.logstash.Logstash; + +import java.util.Objects; + +public class HelpUrl { + static final String BASE_URL; + static { + final String versionAnchor; + if (Integer.parseInt(Logstash.VERSION_MAJOR) >= 9) { + versionAnchor = "master"; + } else { + versionAnchor = String.format("%s.%s", Logstash.VERSION_MAJOR, Logstash.VERSION_MINOR); + } + BASE_URL = String.format("https://www.elastic.co/guide/en/logstash/%s/", versionAnchor); + } + + public HelpUrl(final String page) { + this(page, null); + } + + public HelpUrl withAnchor(final String anchor) { + return new HelpUrl(this.page, anchor); + } + + private HelpUrl(final String page, final String anchor) { + Objects.requireNonNull(page, "page cannot be null"); + this.page = page; + this.anchor = anchor; + } + + private final String page; + private final String anchor; + + private transient String resolved; + + @Override + public String toString() { + if (resolved == null) { + final StringBuilder sb = new StringBuilder(BASE_URL); + sb.append(page).append(".html"); + if (anchor != null) { + sb.append("#").append(anchor); + } + resolved = sb.toString(); + } + return resolved; + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Impact.java b/logstash-core/src/main/java/org/logstash/health/Impact.java new file mode 100644 index 00000000000..3ccda5de2af --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Impact.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.*; +import java.util.function.UnaryOperator; + +@JsonSerialize(using=Impact.JsonSerializer.class) +public final class Impact { + public final String id; + public final int severity; + public final String description; + public final Set impactAreas; + + public Impact(final Builder builder) { + this.id = builder.id; + this.severity = Objects.requireNonNullElse(builder.severity, 0); + this.description = builder.description; + this.impactAreas = Set.copyOf(builder.impactAreas); + } + + static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String id; + private Integer severity; + private String description; + private Set impactAreas; + + public Builder() { + this.impactAreas = Set.of(); + } + + private Builder(String id, Integer severity, String description, Set impactAreas) { + this.id = id; + this.severity = severity; + this.description = description; + this.impactAreas = Set.copyOf(impactAreas); + } + + public synchronized Builder withId(final String id) { + return new Builder(id, severity, description, impactAreas); + } + + public synchronized Builder withSeverity(int severity) { + return new Builder(id, severity, description, impactAreas); + } + + public synchronized Builder withDescription(String description) { + return new Builder(id, severity, description, impactAreas); + } + + public synchronized Builder withAdditionalImpactArea(ImpactArea impactArea) { + final Set mergedImpactAreas = new HashSet<>(impactAreas); + if (!mergedImpactAreas.add(impactArea)) { + return this; + } else { + return this.withImpactAreas(mergedImpactAreas); + } + } + + public synchronized Builder withImpactAreas(Collection impactAreas) { + return new Builder(id, severity, description, Set.copyOf(impactAreas)); + } + + public synchronized Builder transform(final UnaryOperator configurator) { + return configurator.apply(this); + } + + public synchronized Impact build() { + return new Impact(this); + } + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(Impact impact, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + if (impact.id != null) { + jsonGenerator.writeStringField("id", impact.id); + } + jsonGenerator.writeNumberField("severity", impact.severity); + jsonGenerator.writeStringField("description", impact.description); + jsonGenerator.writeObjectField("impact_areas", impact.impactAreas); + jsonGenerator.writeEndObject(); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/ImpactArea.java b/logstash-core/src/main/java/org/logstash/health/ImpactArea.java new file mode 100644 index 00000000000..94679c79ad9 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/ImpactArea.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.annotation.JsonValue; + +import java.util.Objects; + +public enum ImpactArea { + PIPELINE_EXECUTION, + ; + + private final String externalValue; + + ImpactArea(final String externalValue) { + this.externalValue = Objects.requireNonNullElseGet(externalValue, () -> name().toLowerCase()); + } + + ImpactArea() { + this(null); + } + + @JsonValue + public String externalValue() { + return this.externalValue; + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Indicator.java b/logstash-core/src/main/java/org/logstash/health/Indicator.java new file mode 100644 index 00000000000..df646e13584 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Indicator.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +public interface Indicator { + REPORT report(ReportContext reportContext); + + default REPORT report() { + return report(ReportContext.EMPTY); + } + + interface Report { + Status status(); + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/MultiIndicator.java b/logstash-core/src/main/java/org/logstash/health/MultiIndicator.java new file mode 100644 index 00000000000..d0699f7687a --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/MultiIndicator.java @@ -0,0 +1,164 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A {@code MultiIndicator} is an {@link Indicator} that combines multiple sub-{@link Indicator}s and produces a + * summarized {@link Report}. + */ +public class MultiIndicator implements Indicator { + private static final Logger LOGGER = LogManager.getLogger(); + + private final Map> indicators = new ConcurrentHashMap<>(); + + public void attachIndicator(final String name, + final Indicator indicatorToAttach) { + final Indicator existing = indicators.putIfAbsent(name, indicatorToAttach); + if (Objects.nonNull(existing) && !Objects.equals(existing, indicatorToAttach)) { + throw new IllegalArgumentException(String.format("Cannot attach indicator %s (%s) because a different one of the same name is already attached (%s).", name, indicatorToAttach, existing)); + } + LOGGER.debug("attached indicator {}=>{} (res:{})", name, indicatorToAttach, this); + } + + public void detachIndicator(final String name, + final Indicator indicatorToDetach) { + final Indicator remaining = indicators.computeIfPresent(name, (key, existing) -> Objects.isNull(indicatorToDetach) || Objects.equals(indicatorToDetach, existing) ? null : existing); + if (Objects.nonNull(remaining)) { + throw new IllegalArgumentException("Cannot detach indicator " + name + " because a different one of the same name is attached."); + } + LOGGER.debug("detached indicator {}<={} (res:{})", name, indicatorToDetach, this); + } + + public > Optional getIndicator(final String name, + final Class indicatorClass) { + return getIndicator(name).map(indicatorClass::cast); + } + + public Optional> getIndicator(final String name) { + return Optional.ofNullable(indicators.get(name)); + } + + @Override + public Report report(final ReportContext reportContext) { + LOGGER.debug("report starting with indicators {} for {}", this.indicators, reportContext); + final Status.Holder combinedStatus = new Status.Holder(); + + final Map reports = new HashMap<>(); + final Map> indicatorNamesByStatus = new HashMap<>(); + + this.indicators.forEach((indicatorName, indicator) -> { + if (reportContext.isMuted(indicatorName)) { + LOGGER.trace("sub-indicator {} is muted for {}", indicatorName, reportContext); + } else { + reportContext.descend(indicatorName, (scopedContext) -> { + final Indicator.Report report = indicator.report(scopedContext); + + combinedStatus.reduce(report.status()); + reports.put(indicatorName, report); + indicatorNamesByStatus.computeIfAbsent(report.status(), k -> new HashSet<>()).add(indicatorName); + }); + } + }); + + final StringBuilder symptom = new StringBuilder(); + // to highlight indicators by most-degraded status, we summarize in reverse-order + final List summaryByStatus = new ArrayList<>(indicatorNamesByStatus.size()); + for (int i = Status.values().length - 1; i >= 0; i--) { + final Status summarizingStatus = Status.values()[i]; + if (indicatorNamesByStatus.containsKey(summarizingStatus)) { + final Set indicatorNames = indicatorNamesByStatus.get(summarizingStatus); + summaryByStatus.add(String.format("%s "+(indicatorNames.size()==1 ? "indicator is" : "indicators are")+" %s (`%s`)", + indicatorNames.size(), + summarizingStatus.descriptiveValue(), + String.join("`, `", indicatorNames))); + } + } + if (summaryByStatus.isEmpty()) { + symptom.append("no indicators"); + } else if (summaryByStatus.size() == 1) { + symptom.append(summaryByStatus.get(0)); + } else if (summaryByStatus.size() == 2) { + symptom.append(summaryByStatus.get(0)).append(" and ").append(summaryByStatus.get(1)); + } else { + final int lastIndex = summaryByStatus.size() - 1; + symptom.append(String.join(", ", summaryByStatus.subList(0, lastIndex))) + .append(", and ").append(summaryByStatus.get(lastIndex)); + } + + return new Report(combinedStatus.value(), symptom.toString(), reports); + } + + @Override + public String toString() { + return "MultiIndicator{" + + "indicators=" + indicators + + '}'; + } + + @JsonSerialize(using=Report.JsonSerializer.class) + public static class Report implements Indicator.Report { + private final Status status; + private final String symptom; + private final Map indicators; + + Report(final Status status, + final String symptom, + final Map indicators) { + this.status = status; + this.symptom = symptom; + this.indicators = Map.copyOf(indicators); + } + + @Override + public Status status() { + return this.status; + } + + public String symptom() { + return this.symptom; + } + + public Map indicators() { + return this.indicators; + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(final Report report, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField("status", report.status()); + jsonGenerator.writeStringField("symptom", report.symptom); + jsonGenerator.writeObjectField("indicators", report.indicators()); + jsonGenerator.writeEndObject(); + } + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java b/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java new file mode 100644 index 00000000000..3f6821fec4f --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java @@ -0,0 +1,195 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.Objects; + +import static org.logstash.health.Status.*; + +/** + * A {@code PipelineIndicator} is a specialized {@link ProbeIndicator} that is meant for assessing the health of + * an individual pipeline. + */ +public class PipelineIndicator extends ProbeIndicator { + + public static PipelineIndicator forPipeline(final String pipelineId, + final PipelineDetailsProvider pipelineDetailsProvider) { + PipelineIndicator pipelineIndicator = new PipelineIndicator(new DetailsSupplier(pipelineId, pipelineDetailsProvider)); + pipelineIndicator.attachProbe("status", new StatusProbe()); + return pipelineIndicator; + } + + private PipelineIndicator(final DetailsSupplier detailsSupplier) { + super("pipeline", detailsSupplier::get); + } + + @JsonSerialize(using = Status.JsonSerializer.class) + public static class Status { + public enum State { + UNKNOWN, + LOADING, + RUNNING, + FINISHED, + TERMINATED, + } + + public static final Status UNKNOWN = new Status(State.UNKNOWN); + public static final Status LOADING = new Status(State.LOADING); + public static final Status RUNNING = new Status(State.RUNNING); + public static final Status FINISHED = new Status(State.FINISHED); + public static final Status TERMINATED = new Status(State.TERMINATED); + + private final State state; + public Status(final State state) { + this.state = state; + } + public State getState() { + return state; + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(Status value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + gen.writeStartObject(); + gen.writeStringField("state", value.getState().toString()); + gen.writeEndObject(); + } + } + } + + @JsonSerialize(using = Details.JsonSerializer.class) + public static class Details implements Observation { + private final Status status; + + public Details(final Status status) { + this.status = Objects.requireNonNull(status, "status cannot be null"); + } + public Status getStatus() { + return this.status; + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer
{ + @Override + public void serialize(final Details details, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField("status", details.getStatus()); + jsonGenerator.writeEndObject(); + } + } + } + + /** + * This interface is implemented by the ruby-Agent + */ + @FunctionalInterface + public interface PipelineDetailsProvider { + Details pipelineDetails(final String pipelineId); + } + + public static class DetailsSupplier { + private final String pipelineId; + private final PipelineDetailsProvider pipelineDetailsProvider; + DetailsSupplier(final String pipelineId, + final PipelineDetailsProvider pipelineDetailsProvider) { + this.pipelineId = pipelineId; + this.pipelineDetailsProvider = pipelineDetailsProvider; + } + + public Details get() { + return this.pipelineDetailsProvider.pipelineDetails(pipelineId); + } + } + + static class StatusProbe implements Probe
{ + static final Impact.Builder NOT_PROCESSING = Impact.builder() + .withId(impactId("not_processing")) + .withDescription("the pipeline is not currently processing") + .withAdditionalImpactArea(ImpactArea.PIPELINE_EXECUTION); + + static final HelpUrl HELP_URL = new HelpUrl("health-report-pipeline-status"); + + @Override + public Analysis analyze(final Details details) { + switch (details.getStatus().getState()) { + case LOADING: + return Analysis.builder() + .withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("loading")) + .withCause("pipeline is loading") + .withAction("if pipeline does not come up quickly, you may need to check the logs to see if it is stalled") + .withHelpUrl(HELP_URL.withAnchor("loading").toString())) + .withImpact(NOT_PROCESSING.withSeverity(1).withDescription("pipeline is loading").build()) + .build(); + case RUNNING: + return Analysis.builder() + .withStatus(GREEN) + .build(); + case FINISHED: + return Analysis.builder() + .withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("finished")) + .withCause("pipeline has finished running because its inputs have been closed and events have been processed") + .withAction("if you expect this pipeline to run indefinitely, you will need to configure its inputs to continue receiving or fetching events") + .withHelpUrl(HELP_URL.withAnchor("finished").toString())) + .withImpact(NOT_PROCESSING.withSeverity(10).withDescription("pipeline has finished running").build()) + .build(); + case TERMINATED: + return Analysis.builder() + .withStatus(RED) + .withDiagnosis(db -> db + .withId(diagnosisId("terminated")) + .withCause("pipeline is not running, likely because it has encountered an error") + .withAction("view logs to determine the cause of abnormal pipeline shutdown") + .withHelpUrl(HELP_URL.withAnchor("terminated").toString())) + .withImpact(NOT_PROCESSING.withSeverity(1).build()) + .build(); + case UNKNOWN: + default: + return Analysis.builder() + .withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("unknown")) + .withCause("pipeline is not known; it may have been recently deleted or failed to start") + .withAction("view logs to determine if the pipeline failed to start") + .withHelpUrl(HELP_URL.withAnchor("unknown").toString())) + .withImpact(NOT_PROCESSING.withSeverity(2).build()) + .build(); + } + } + + static String diagnosisId(final String state) { + return String.format("logstash:health:pipeline:status:diagnosis:%s", state); + } + + static String impactId(final String state) { + return String.format("logstash:health:pipeline:status:impact:%s", state); + } + + + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Probe.java b/logstash-core/src/main/java/org/logstash/health/Probe.java new file mode 100644 index 00000000000..ec9cf71d2b4 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Probe.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import java.util.Objects; +import java.util.function.UnaryOperator; + +public interface Probe { + Analysis analyze(OBSERVATION observation); + + final class Analysis { + public final Status status; + public final Diagnosis diagnosis; + public final Impact impact; + + Analysis(final Builder builder) { + this.status = builder.status; + this.diagnosis = builder.diagnosis; + this.impact = builder.impact; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private final Status status; + private final Diagnosis diagnosis; + private final Impact impact; + + public Builder() { + this(Status.UNKNOWN, null, null); + } + + public Builder(final Status status, + final Diagnosis diagnosis, + final Impact impact) { + this.status = status; + this.diagnosis = diagnosis; + this.impact = impact; + } + + public Builder withStatus(final Status status) { + if (Objects.equals(this.status, status)) { + return this; + } + return new Builder(status, this.diagnosis, this.impact); + } + + public Builder withDiagnosis(final Diagnosis diagnosis) { + if (Objects.equals(this.diagnosis, diagnosis)) { + return this; + } + return new Builder(status, diagnosis, impact); + } + + public Builder withDiagnosis(final UnaryOperator diagnosisConfigurator) { + return this.withDiagnosis(Diagnosis.builder().transform(diagnosisConfigurator).build()); + } + + public synchronized Builder withImpact(final Impact impact) { + if (Objects.equals(this.impact, impact)) { + return this; + } + return new Builder(status, this.diagnosis, impact); + } + + public Builder withImpact(final UnaryOperator impactConfigurator) { + return this.withImpact(Impact.builder().transform(impactConfigurator).build()); + } + + public synchronized Analysis build() { + return new Analysis(this); + } + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/ProbeIndicator.java b/logstash-core/src/main/java/org/logstash/health/ProbeIndicator.java new file mode 100644 index 00000000000..ddb4fce6c4b --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/ProbeIndicator.java @@ -0,0 +1,205 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * A {@code ProbeIndicator} is an {@link Indicator} that has one or more {@link Probe}s attached, and can be used + * to produce a {@link Report}. + */ +public class ProbeIndicator implements Indicator> { + private static final Logger LOGGER = LogManager.getLogger(); + + // Marker Interface + public interface Observation {} + + @FunctionalInterface + public interface Observer extends Supplier {} + + private final String subject; + private final Observer observer; + + private final Map> probes = new ConcurrentHashMap<>(); + + public ProbeIndicator(final String subject, + final Observer observer, + final Map> probes) { + this(subject, observer); + probes.forEach(this::attachProbe); + } + + public ProbeIndicator(final String subject, + final Observer observer) { + this.subject = subject; + this.observer = observer; + } + + public final void attachProbe(final String name, + final Probe probeToAttach) { + final Probe existing = probes.putIfAbsent(name, probeToAttach); + if (Objects.nonNull(existing) && !Objects.equals(existing, probeToAttach)) { + throw new IllegalArgumentException("Cannot attach probe " + name + " because a different one of the same name is already attached."); + } + } + + public final void detachProbe(final String name, + final Probe probeToDetach) { + Probe remaining = probes.computeIfPresent(name, (key, existing) -> Objects.equals(probeToDetach, existing) ? null : existing); + if (Objects.nonNull(remaining)) { + throw new IllegalArgumentException("Cannot detach probe " + name + " because a different one of the same name is attached."); + } + } + + public final void detachProbe(final String name) { + probes.remove(name); + } + + Probe getProbe(final String name) { + return probes.get(name); + } + + @Override + public Report report(final ReportContext reportContext) { + + LOGGER.debug("report starting with {} probes {}", this.probes.keySet(), reportContext); + final OBSERVATION observation = observer.get(); + + final Status.Holder combinedStatus = new Status.Holder(); + final List diagnoses = new ArrayList<>(); + final List impacts = new ArrayList<>(); + final Set distinctImpactAreas = new HashSet<>(); + + for (Map.Entry> probeEntry : this.probes.entrySet()) { + final String probeName = probeEntry.getKey(); + final Probe.Analysis probeAnalysis = probeEntry.getValue().analyze(observation); + LOGGER.trace("probe {}: {}", probeName, probeAnalysis); + + if (reportContext.isMuted(probeName)) { + LOGGER.trace("probe {} is muted", probeName); + } else { + combinedStatus.reduce(probeAnalysis.status); + Optional.ofNullable(probeAnalysis.diagnosis) + .ifPresent(diagnoses::add); + Optional.ofNullable(probeAnalysis.impact) + .filter(impacts::add) + .map(impact -> impact.impactAreas) + .ifPresent(distinctImpactAreas::addAll); + } + } + + final Status status = combinedStatus.value(); + final StringBuilder symptomBuilder = new StringBuilder(); + symptomBuilder.append(String.format("The %s is %s", this.subject, status.descriptiveValue())); + if (distinctImpactAreas.size() + diagnoses.size() > 0) { + symptomBuilder.append("; ") + .append(String.format(distinctImpactAreas.size() == 1 ? "%s area is impacted" : "%s areas are impacted", distinctImpactAreas.size())) + .append(" and ") + .append(String.format(diagnoses.size() == 1 ? "%s diagnosis is available" : "%s diagnoses are available", diagnoses.size())); + } + final String symptom = symptomBuilder.toString(); + + return new Report<>(status, observation, symptom, diagnoses, impacts); + } + + @Override + public String toString() { + return "ProbeIndicator{" + + "observer=" + observer + + ", probes=" + probes + + '}'; + } + + @JsonSerialize(using=Report.JsonSerializer.class) + public static class Report
implements Indicator.Report { + private final Status status; + private final DETAILS details; + private final String symptom; + private final List diagnosis; + + private final List impacts; + + public Report(final Status status, + final DETAILS details, + final String symptom, + final List diagnosis, + final List impacts) { + this.status = status; + this.details = details; + this.symptom = symptom; + this.diagnosis = List.copyOf(diagnosis); + this.impacts = List.copyOf(impacts); + } + public Status status() { + return status; + } + public DETAILS details() { + return details; + } + public String symptom() { + return symptom; + } + public List diagnosis() { + return diagnosis; + } + public List impacts() { + return impacts; + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer> { + @Override + public void serialize(final Report report, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + + jsonGenerator.writeObjectField("status", report.status); + jsonGenerator.writeStringField("symptom", report.symptom); + + if (Objects.nonNull(report.diagnosis) && !report.diagnosis.isEmpty()) { + jsonGenerator.writeObjectField("diagnosis", report.diagnosis); + } + + if (Objects.nonNull(report.impacts) && !report.impacts.isEmpty()) { + jsonGenerator.writeObjectField("impacts", report.impacts); + } + + jsonGenerator.writeObjectField("details", report.details); + + jsonGenerator.writeEndObject(); + } + } + + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/ReportContext.java b/logstash-core/src/main/java/org/logstash/health/ReportContext.java new file mode 100644 index 00000000000..9dc2970fcd4 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/ReportContext.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +/** + * A {@code ReportContext} is used when building an {@link Indicator.Report} to provide contextual configuration + * for a specific {@link Indicator} that is being reported on. + */ +public class ReportContext { + private final List path; + + public static final ReportContext EMPTY = new ReportContext(List.of()); + + ReportContext(final List path) { + this.path = List.copyOf(path); + } + + public ReportContext descend(final String node) { + final ArrayList newPath = new ArrayList<>(path); + newPath.add(node); + return new ReportContext(newPath); + } + + public void descend(final String node, final Consumer consumer) { + consumer.accept(this.descend(node)); + } + + public boolean isMuted(final String childNodeName) { + return false; + } + + @Override + public String toString() { + return "ReportContext{" + + "path=" + path + + '}'; + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Status.java b/logstash-core/src/main/java/org/logstash/health/Status.java new file mode 100644 index 00000000000..ba0ce68a3fd --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Status.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.annotation.JsonValue; + +public enum Status { + GREEN("healthy"), + UNKNOWN("unknown"), + YELLOW("concerning"), + RED("unhealthy"), + ; + + private final String externalValue = name().toLowerCase(); + private final String descriptiveValue; + + Status(String descriptiveValue) { + this.descriptiveValue = descriptiveValue; + } + + @JsonValue + public String externalValue() { + return externalValue; + } + + public String descriptiveValue() { return descriptiveValue; } + + /** + * Combine this status with another status. + * This method is commutative. + * @param status the other status + * @return the more-degraded of the two statuses. + */ + public Status reduce(Status status) { + if (compareTo(status) >= 0) { + return this; + } else { + return status; + } + } + + public static class Holder { + private Status status = Status.GREEN; + public synchronized Status reduce(Status status) { + return this.status = this.status.reduce(status); + } + public synchronized Status value() { + return this.status; + } + } +} diff --git a/logstash-core/src/test/java/org/logstash/health/ProbeIndicatorTest.java b/logstash-core/src/test/java/org/logstash/health/ProbeIndicatorTest.java new file mode 100644 index 00000000000..0ec8601112a --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/health/ProbeIndicatorTest.java @@ -0,0 +1,128 @@ +package org.logstash.health; + +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class ProbeIndicatorTest { + + @Test + public void attachProbeWhenNotExists() throws Exception { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + + final Probe probeToAdd = new ProbeImplementation(); + probeIndicator.attachProbe("new", probeToAdd); + + assertThat(probeIndicator.getProbe("new"), is(probeToAdd)); + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + } + + @Test + public void attachProbeWhenExists() throws Exception { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + + final Probe probeToAdd = new ProbeImplementation(); + assertThrows(IllegalArgumentException.class, () -> probeIndicator.attachProbe("existing", probeToAdd)); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + } + + @Test + public void attachProbeWhenAttached() throws Exception { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + + // attach it again + probeIndicator.attachProbe("existing", existingProbe); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + } + + + @Test + public void detachProbeByNameWhenAttached() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + final Probe existingProbeToRemove = new ProbeImplementation(); + probeIndicator.attachProbe("to_remove", existingProbeToRemove); + + probeIndicator.detachProbe("to_remove"); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void detachProbeByNameWhenDetached() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + + probeIndicator.detachProbe("to_remove"); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void detachProbeByValueWhenAttached() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + final Probe existingProbeToRemove = new ProbeImplementation(); + probeIndicator.attachProbe("to_remove", existingProbeToRemove); + + probeIndicator.detachProbe("to_remove", existingProbeToRemove); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void detachProbeByValueWhenDetached() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + final Probe existingProbeToRemove = new ProbeImplementation(); + + probeIndicator.detachProbe("to_remove", existingProbeToRemove); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void detachProbeByValueWhenConflict() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + final Probe anotherProbeToRemove = new ProbeImplementation(); + + assertThrows(IllegalArgumentException.class, () -> probeIndicator.detachProbe("existing", anotherProbeToRemove)); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void report() { + } + + private static class ProbeSubject implements ProbeIndicator.Observation {} + + private static class ProbeImplementation implements Probe { + @Override + public Analysis analyze(ProbeSubject observation) { + return Analysis.builder().build(); + } + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/health/StatusTest.java b/logstash-core/src/test/java/org/logstash/health/StatusTest.java new file mode 100644 index 00000000000..32eca41d593 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/health/StatusTest.java @@ -0,0 +1,106 @@ +package org.logstash.health; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.stream.Collectors; + +import static com.google.common.collect.Collections2.orderedPermutations; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.logstash.health.Status.*; + +@RunWith(Enclosed.class) +public class StatusTest { + + public static class Tests { + @Test + public void testReduceUnknown() { + assertThat(UNKNOWN.reduce(UNKNOWN), is(UNKNOWN)); + assertThat(UNKNOWN.reduce(GREEN), is(UNKNOWN)); + assertThat(UNKNOWN.reduce(YELLOW), is(YELLOW)); + assertThat(UNKNOWN.reduce(RED), is(RED)); + } + + @Test + public void testReduceGreen() { + assertThat(GREEN.reduce(UNKNOWN), is(UNKNOWN)); + assertThat(GREEN.reduce(GREEN), is(GREEN)); + assertThat(GREEN.reduce(YELLOW), is(YELLOW)); + assertThat(GREEN.reduce(RED), is(RED)); + } + + @Test + public void testReduceYellow() { + assertThat(YELLOW.reduce(UNKNOWN), is(YELLOW)); + assertThat(YELLOW.reduce(GREEN), is(YELLOW)); + assertThat(YELLOW.reduce(YELLOW), is(YELLOW)); + assertThat(YELLOW.reduce(RED), is(RED)); + } + + @Test + public void testReduceRed() { + assertThat(RED.reduce(UNKNOWN), is(RED)); + assertThat(RED.reduce(GREEN), is(RED)); + assertThat(RED.reduce(YELLOW), is(RED)); + assertThat(RED.reduce(RED), is(RED)); + } + } + + @RunWith(Parameterized.class) + public static class JacksonSerialization { + @Parameters(name = "{0}") + public static Iterable data() { + return EnumSet.allOf(Status.class); + } + + @Parameter + public Status status; + + private final ObjectMapper mapper = new ObjectMapper(); + + @Test + public void testSerialization() throws Exception { + assertThat(mapper.writeValueAsString(status), is(equalTo('"' + status.name().toLowerCase() + '"'))); + } + } + + @RunWith(Parameterized.class) + public static class ReduceCommutativeSpecification { + @Parameters(name = "{0}<=>{1}") + public static Collection data() { + return getCombinations(EnumSet.allOf(Status.class), 2); + } + + @Parameter(0) + public Status statusA; + @Parameter(1) + public Status statusB; + + @Test + public void testReduceCommutative() { + assertThat(statusA.reduce(statusB), is(statusB.reduce(statusA))); + } + + private static > List getCombinations(Collection source, int count) { + return orderedPermutations(source).stream() + .map((l) -> l.subList(0, count)) + .map(ArrayList::new).peek(Collections::sort) + .collect(Collectors.toSet()) + .stream() + .map(List::toArray) + .collect(Collectors.toList()); + } + } +} \ No newline at end of file diff --git a/qa/support/logstash-integration-failure_injector/README.md b/qa/support/logstash-integration-failure_injector/README.md new file mode 100644 index 00000000000..bfa4a01977b --- /dev/null +++ b/qa/support/logstash-integration-failure_injector/README.md @@ -0,0 +1,10 @@ +# About the plugin + +This plugin is for Logstash pipelines test purpose _only_. + +# How to apply changes + +When you change the source code, make sure to build a gem file. +```shell +gem build logstash-integration-failure_injector.gemspec +``` diff --git a/qa/support/logstash-integration-failure_injector/lib/logstash/filters/failure_injector.rb b/qa/support/logstash-integration-failure_injector/lib/logstash/filters/failure_injector.rb new file mode 100644 index 00000000000..919118a0f6b --- /dev/null +++ b/qa/support/logstash-integration-failure_injector/lib/logstash/filters/failure_injector.rb @@ -0,0 +1,91 @@ +# encoding: utf-8 + +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require 'logstash/inputs/base' +require 'logstash/namespace' + +require_relative '../utils/failure_injector_util' + +class LogStash::Filters::FailureInjector < LogStash::Filters::Base + + config_name "failure_injector" + + # Defines the phases where plugin needs to make a pipeline degraded state. + # Accepts one or any of [register, filter, close] + # Note that, order of the phases doesn't matter as it obeys on plugin phase + # Example config to degrade the pipeline status at register and receive phase: + # failure_injector { + # degrade_at => ['register', 'filter'] + # } + config :degrade_at, :validate => :array, :default => [] + + # Defines the phases where plugin needs to be crashed, causes pipeline terminated. + # Accepts one of [register, filter, close] + # Example config to degrade the pipeline status at register and receive phase: + # failure_injector { + # crash_at => 'register' + # } + # + # Note that, order of the phases doesn't matter as it obeys on plugin phase + # Plugin doesn't validate the order, if `crash_at` is combined with `degrade_at`, and plugin cannot simulate degraded state after crash phase + # Example, + # failure_injector { + # crash_at => 'register' + # degrade_at => ['filter'] + # } + config :crash_at, :validate => :string + + def initialize(params) + FailureInjectorUtils.validate_config('filter', params) + super + end + + def register + @logger.debug("Registering plugin") + degrade_or_crash_if_required('register') + end + + def filter(event) + @logger.trace("Received the event to filter: #{event}") + degrade_or_crash_if_required('filter') + end + + def close + @logger.debug("Plugin is closing") + degrade_or_crash_if_required('close') + end + + def degrade_or_crash_if_required(phase) + degrade(phase) if @degrade_at.include?(phase) + crash(phase) if @crash_at && @crash_at == phase + end + + def degrade(phase) + @logger.debug("Degraded at #{phase} phase") + (1..100).each { |i| + sleep(i * 0.005) + } + end + + def crash(phase) + @logger.debug("Crashing at #{phase} phase") + raise "`logstash-filter-failure_injector` is crashing at #{phase} phase" + end + +end diff --git a/qa/support/logstash-integration-failure_injector/lib/logstash/outputs/failure_injector.rb b/qa/support/logstash-integration-failure_injector/lib/logstash/outputs/failure_injector.rb new file mode 100644 index 00000000000..5e836a93f2e --- /dev/null +++ b/qa/support/logstash-integration-failure_injector/lib/logstash/outputs/failure_injector.rb @@ -0,0 +1,91 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# encoding: utf-8 + +require 'logstash/inputs/base' +require 'logstash/namespace' + +require_relative '../utils/failure_injector_util' + +class LogStash::Outputs::FailureInjector < LogStash::Outputs::Base + + config_name "failure_injector" + + # Defines the phases where plugin needs to make a pipeline degraded state. + # Accepts one or any of [register, receive, close] + # Note that, order of the phases doesn't matter as it obeys on plugin phase + # Example config to degrade the pipeline status at register and receive phase: + # failure_injector { + # degrade_at => ['register', 'receive'] + # } + config :degrade_at, :validate => :array, :default => [] + + # Defines the phases where plugin needs to be crashed, causes pipeline terminated. + # Accepts one of [register, receive, close] + # Example config to degrade the pipeline status at register and receive phase: + # failure_injector { + # crash_at => 'register' + # } + # + # Note that, order of the phases doesn't matter as it obeys on plugin phase + # Plugin doesn't validate the order, if `crash_at` is combined with `degrade_at`, and plugin cannot simulate degraded state after crash phase + # Example, + # failure_injector { + # crash_at => 'register' + # degrade_at => ['receive'] + # } + config :crash_at, :validate => :string + + def initialize(params) + FailureInjectorUtils.validate_config('output', params) + super + end + + def register + @logger.debug("Registering plugin") + degrade_or_crash_if_required('register') + end + + def multi_receive(events) + @logger.trace("Received #{events.size} size of events") + degrade_or_crash_if_required('receive') + end + + def close + @logger.debug("Plugin is closing") + degrade_or_crash_if_required('close') + end + + def degrade_or_crash_if_required(phase) + degrade(phase) if @degrade_at.include?(phase) + crash(phase) if @crash_at && @crash_at == phase + end + + def degrade(phase) + @logger.debug("Degraded at #{phase} phase") + (1..100).each { |i| + sleep(i * 0.01) + } + end + + def crash(phase) + @logger.debug("Crashing at #{phase} phase") + raise "`logstash-output-failure_injector` is crashing at #{phase} phase" + end + +end diff --git a/qa/support/logstash-integration-failure_injector/lib/logstash/utils/failure_injector_util.rb b/qa/support/logstash-integration-failure_injector/lib/logstash/utils/failure_injector_util.rb new file mode 100644 index 00000000000..7a2f0af9c13 --- /dev/null +++ b/qa/support/logstash-integration-failure_injector/lib/logstash/utils/failure_injector_util.rb @@ -0,0 +1,37 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# encoding: utf-8 + +class FailureInjectorUtils + + def self.validate_config(type, params) + type_error_message = "`logstash-integration-failure_injector` accepts 'filter' or 'output' type." + raise type_error_message unless type + raise type_error_message unless %w(filter output).include?(type) + + plugin_phase = type == 'output' ? 'receive' : 'filter' + accepted_configs = ['register', "#{plugin_phase}", 'close'] + config_error_message = "failure_injector #{type} plugin accepts #{accepted_configs} configs but received" + params['degrade_at']&.each do | degrade_phase | + raise "#{config_error_message} #{degrade_phase}" unless accepted_configs.include?(degrade_phase) + end + + crash_at = params['crash_at'] + raise "#{config_error_message} #{crash_at}" if crash_at && !accepted_configs.include?(crash_at) + end +end \ No newline at end of file diff --git a/qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector-0.0.1.gem b/qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector-0.0.1.gem new file mode 100644 index 00000000000..e241aaf9420 Binary files /dev/null and b/qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector-0.0.1.gem differ diff --git a/qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector.gemspec b/qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector.gemspec new file mode 100644 index 00000000000..5525dcf4f51 --- /dev/null +++ b/qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector.gemspec @@ -0,0 +1,26 @@ + +Gem::Specification.new do |s| + s.name = "logstash-integration-failure_injector" + s.version = "0.0.1" + s.licenses = ["Apache-2.0"] + s.summary = "A collection of Logstash plugins that halp simulating abnormal cases during the tests." + s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname." + s.authors = ["Elastic"] + s.email = "info@elastic.co" + s.homepage = "https://www.elastic.co/logstash" + s.metadata = { + "logstash_plugin" => "true", + "logstash_group" => "integration", + "integration_plugins" => %w( + logstash-filter-failure_injector + logstash-output-failure_injector + ).join(",") + } + + s.files = Dir["lib/**/*","spec/**/*","*.gemspec"] + s.test_files = s.files.grep(%r{^(test|spec|features)/}) + + s.add_runtime_dependency "logstash-core-plugin-api", ">= 2.1.12", "<= 2.99" + + s.add_development_dependency "logstash-devutils" +end diff --git a/qa/support/logstash-integration-failure_injector/spec/unit/failure_injector_spec.rb b/qa/support/logstash-integration-failure_injector/spec/unit/failure_injector_spec.rb new file mode 100644 index 00000000000..d455d8b66cc --- /dev/null +++ b/qa/support/logstash-integration-failure_injector/spec/unit/failure_injector_spec.rb @@ -0,0 +1,129 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# encoding: utf-8 + +require "logstash/devutils/rspec/spec_helper" +require "logstash/filters/failure_injector" +require "logstash/outputs/failure_injector" + +%w(filter output).each do | plugin_type | + instance = plugin_type == 'filter' ? LogStash::Filters::FailureInjector : LogStash::Outputs::FailureInjector + phase = plugin_type == 'filter' ? 'filter' : 'receive' + describe instance do + let(:params) { { 'degrade_at' => [], 'crash_at' => nil } } + let(:event) { LogStash::Event.new } + let(:plugin) { described_class.new(params) } + + before do + allow(plugin).to receive(:@logger).and_return(double('logger', :debug => nil, :trace => nil)) + end + + describe 'plugin base' do + subject { described_class } + it { is_expected.to be_a_kind_of Class } + it { is_expected.to be <= (plugin_type == 'filter' ? LogStash::Filters::Base : LogStash::Outputs::Base) } + it { is_expected.to have_attributes(:config_name => "failure_injector") } + end + + shared_examples 'a phase that can degrade or crash' do |phase| + context "when degrades at #{phase}" do + let(:params) { { 'degrade_at' => [phase] } } + + it 'calls the degrade method' do + expect(plugin).to receive(:degrade).with(phase) + case phase + when 'filter' + plugin.filter(event) + when 'receive' + plugin.multi_receive([event]) + else + plugin.send(phase) + end + end + end + + context "when crashes at #{phase}" do + let(:params) { { 'crash_at' => phase } } + + it 'raises a crash error' do + case phase + when 'filter' + expect { plugin.filter(event) }.to raise_error(RuntimeError, /crashing at #{phase}/) + when 'receive' + expect { plugin.multi_receive([event]) }.to raise_error(RuntimeError, /crashing at #{phase}/) + else + expect { plugin.send(phase) }.to raise_error(RuntimeError, /crashing at #{phase}/) + end + end + end + end + + describe '#initialize' do + context 'when valid params are passed' do + let(:params) { { 'degrade_at' => [], 'crash_at' => nil } } + + it 'does not raise any error' do + expect { described_class.new(params) }.not_to raise_error + end + end + + context 'when invalid params are passed' do + it 'raises an error on invalid config' do + configs = ["register", plugin_type == 'filter' ? "filter" : "receive", "close"] + expect { + described_class.new('degrade_at' => ['invalid'], 'crash_at' => 'invalid') + }.to raise_error("failure_injector #{plugin_type} plugin accepts #{configs} configs but received invalid") + end + end + end + + describe '#register' do + it_behaves_like 'a phase that can degrade or crash', 'register' + end + + if plugin_type == 'filter' + describe '#filter' do + it_behaves_like 'a phase that can degrade or crash', 'filter' + end + end + + if plugin_type == 'output' + describe '#receive' do + it_behaves_like 'a phase that can degrade or crash', 'receive' + end + end + + describe '#close' do + it_behaves_like 'a phase that can degrade or crash', 'close' + end + + describe '#degrade' do + it 'sleeps for a certain period of time' do + expect(plugin).to receive(:sleep).at_least(:once) + plugin.degrade('filter') + end + end + + describe '#crash' do + it 'raises an error with the phase' do + expect { plugin.crash(phase) }.to raise_error(RuntimeError, /crashing at #{phase}/) + end + end + end +end +