From c4faaef2f5b95bfd125192723e54be9245c7e5be Mon Sep 17 00:00:00 2001 From: Papa Bakary Camara Date: Mon, 15 Apr 2024 19:25:05 +0200 Subject: [PATCH 1/3] Add label for differentiating OCP and HyperShift archives The "archive" label allows us to differentiate the type of archive we are handling. Since that information is known only after extracting the archive, the following changes have been implemented: - "archive" label was added to StatsWatcher - The new label is only used in metrics that are updated after the extraction of the archive: - ccx_consumer_extracted_total - ccx_engine_processed_total - ccx_published_total - ccx_failures_total - ccx_download_duration_seconds and ccx_process_duration_seconds if there was a failure during download or processing of the archive - ccx_publish_duration_seconds - ccx_engine_processed_timeout_total - The label's value is reset to 'ocp' everytime a new archive is received (when `on_recv` is fired). --- ccx_messaging/watchers/stats_watcher.py | 72 ++++++++++++++++++++----- test/watchers/stats_watcher_test.py | 42 ++++++++------- 2 files changed, 82 insertions(+), 32 deletions(-) diff --git a/ccx_messaging/watchers/stats_watcher.py b/ccx_messaging/watchers/stats_watcher.py index fd7eb28..799cde3 100644 --- a/ccx_messaging/watchers/stats_watcher.py +++ b/ccx_messaging/watchers/stats_watcher.py @@ -24,8 +24,9 @@ LOG = logging.getLogger(__name__) - - +# A label is added to the metrics so we can differentiate OCP and HyperShift archives +ARCHIVE_TYPE_LABEL = 'archive' +ARCHIVE_TYPE_VALUES = ['ocp', 'hypershift'] # pylint: disable=too-many-instance-attributes class StatsWatcher(ConsumerWatcher): """A Watcher that stores different Prometheus `Counter`s.""" @@ -37,6 +38,7 @@ def __init__(self, prometheus_port=8000): self._recv_total = Counter( "ccx_consumer_received_total", "Counter of received Kafka messages" ) + self._filtered_total = Counter( "ccx_consumer_filtered_total", "Counter of filtered Kafka messages" ) @@ -45,16 +47,20 @@ def __init__(self, prometheus_port=8000): "ccx_downloaded_total", "Histogram of the size of downloaded items" ) + self._extracted_total = Counter( + "ccx_consumer_extracted_total", "Counter of extracted archives", [ARCHIVE_TYPE_LABEL] + ) + self._processed_total = Counter( - "ccx_engine_processed_total", "Counter of files processed by the OCP Engine" + "ccx_engine_processed_total", "Counter of files processed by the OCP Engine", [ARCHIVE_TYPE_LABEL] ) self._published_total = Counter( - "ccx_published_total", "Counter of reports successfully published" + "ccx_published_total", "Counter of reports successfully published", [ARCHIVE_TYPE_LABEL] ) self._failures_total = Counter( - "ccx_failures_total", "Counter of failures during the pipeline" + "ccx_failures_total", "Counter of failures during the pipeline", [ARCHIVE_TYPE_LABEL] ) self._not_handling_total = Counter( @@ -69,16 +75,19 @@ def __init__(self, prometheus_port=8000): self._process_duration = Histogram( "ccx_process_duration_seconds", "Histogram of durations of processing archives by the OCP engine", + [ARCHIVE_TYPE_LABEL], ) self._publish_duration = Histogram( "ccx_publish_duration_seconds", "Histogram of durations of publishing the OCP engine results", + [ARCHIVE_TYPE_LABEL], ) self._processed_timeout_total = Counter( "ccx_engine_processed_timeout_total", "Counter of timeouts while processing archives", + [ARCHIVE_TYPE_LABEL] ) self._start_time = None @@ -86,20 +95,38 @@ def __init__(self, prometheus_port=8000): self._processed_time = None self._published_time = None + # Archive type used in the metrics is set within on_extract, as we need + # to extract the archive in order to know that information + self._archive_type = 'ocp' + + self._initialize_metrics_with_labels() + start_http_server(prometheus_port) LOG.info("StatWatcher created and listening on port %s", prometheus_port) + def on_recv(self, input_msg): """On received event handler.""" self._recv_total.inc() self._start_time = time.time() self._reset_times() + self._reset_archive_type() def on_filter(self): """On filter event handler.""" self._filtered_total.inc() + def on_extract(self, ctx, broker, extraction): + """ + Fired just after the archive is extracted but before any analysis. + """ + # Set archive_type label to hypershift if config/infrastructure.json is found + hcp_config_file = os.path.join(extraction.tmp_dir, "config", "infrastructure.json") + if os.path.exists(hcp_config_file): + self._archive_type = "hypershift" + self._extracted_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc() + def on_download(self, path): """On downloaded event handler.""" self._downloaded_total.observe(os.path.getsize(path)) @@ -109,33 +136,33 @@ def on_download(self, path): def on_process(self, input_msg, results): """On processed event handler.""" - self._processed_total.inc() + self._processed_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc() self._processed_time = time.time() - self._process_duration.observe(self._processed_time - self._downloaded_time) + self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(self._processed_time - self._downloaded_time) def on_process_timeout(self): """On process timeout event handler.""" - self._processed_timeout_total.inc() + self._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc() def on_consumer_success(self, input_msg, broker, results): """On consumer success event handler.""" - self._published_total.inc() + self._published_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc() self._published_time = time.time() - self._publish_duration.observe(self._published_time - self._processed_time) + self._publish_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(self._published_time - self._processed_time) def on_consumer_failure(self, input_msg, exception): """On consumer failure event handler.""" - self._failures_total.inc() + self._failures_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc() if self._downloaded_time is None: self._download_duration.observe(time.time() - self._start_time) - self._process_duration.observe(0) + self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(0) elif self._processed_time is None: - self._process_duration.observe(time.time() - self._downloaded_time) + self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(time.time() - self._downloaded_time) - self._publish_duration.observe(0) + self._publish_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(0) def on_not_handled(self, input_msg): """On not handled messages success event handler.""" @@ -147,11 +174,28 @@ def _reset_times(self): self._processed_time = None self._published_time = None + def _reset_archive_type(self): + """Resets the _archive_type label's value to 'ocp'""" + self._archive_type = "ocp" + + def _initialize_metrics_with_labels(self): + """Metrics with labels are not initialized when declared, because the Prometheus + client can’t know what values the label can have. This will initialize them.""" + for val in ARCHIVE_TYPE_VALUES: + self._extracted_total.labels(val) + self._processed_total.labels(val) + self._published_total.labels(val) + self._failures_total.labels(val) + self._process_duration.labels(val) + self._publish_duration.labels(val) + self._processed_timeout_total.labels(val) + def __del__(self): """Destructor for handling counters unregistering.""" REGISTRY.unregister(self._recv_total) REGISTRY.unregister(self._filtered_total) REGISTRY.unregister(self._downloaded_total) + REGISTRY.unregister(self._extracted_total) REGISTRY.unregister(self._processed_total) REGISTRY.unregister(self._published_total) REGISTRY.unregister(self._failures_total) diff --git a/test/watchers/stats_watcher_test.py b/test/watchers/stats_watcher_test.py index e47c70d..afee8a7 100644 --- a/test/watchers/stats_watcher_test.py +++ b/test/watchers/stats_watcher_test.py @@ -19,7 +19,7 @@ import pytest import time -from ccx_messaging.watchers.stats_watcher import StatsWatcher +from ccx_messaging.watchers.stats_watcher import StatsWatcher, ARCHIVE_TYPE_LABEL, ARCHIVE_TYPE_VALUES _INVALID_PORTS = [None, "8000", 8000.0, 70000] @@ -35,6 +35,10 @@ def test_stats_watcher_initialize_invalid_port(value): _VALID_PORTS = [{}, {"prometheus_port": 9500}, {"prometheus_port": 80}] +@pytest.fixture(params=ARCHIVE_TYPE_VALUES) +def label_value(request): + return request.param + @pytest.mark.parametrize("value", _VALID_PORTS) @patch("ccx_messaging.watchers.stats_watcher.start_http_server") def test_stats_watcher_initialize(start_http_server_mock, value): @@ -46,13 +50,15 @@ def test_stats_watcher_initialize(start_http_server_mock, value): def check_initial_metrics_state(w): """Check that all metrics are initialized.""" - assert w._recv_total._value.get() == 0 - assert w._filtered_total._value.get() == 0 - assert w._processed_total._value.get() == 0 - assert w._processed_timeout_total._value.get() == 0 - assert w._published_total._value.get() == 0 - assert w._failures_total._value.get() == 0 - assert w._not_handling_total._value.get() == 0 + for value in ARCHIVE_TYPE_VALUES: + assert w._recv_total._value.get() == 0 + assert w._filtered_total._value.get() == 0 + assert w._extracted_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 + assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 + assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 + assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 + assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 + assert w._not_handling_total._value.get() == 0 def init_timestamps(w): @@ -65,7 +71,7 @@ def init_timestamps(w): @patch("ccx_messaging.watchers.stats_watcher.start_http_server", lambda *args: None) -def test_stats_watcher_on_recv(): +def test_stats_watcher_on_recv(label_value): """Test the on_recv() method.""" input_msg = {"identity": {}} @@ -83,15 +89,15 @@ def test_stats_watcher_on_recv(): assert w._recv_total._value.get() == 1 assert w._filtered_total._value.get() == 0 assert len(w._downloaded_total._labelvalues) == 0 - assert w._processed_total._value.get() == 0 - assert w._processed_timeout_total._value.get() == 0 - assert w._published_total._value.get() == 0 - assert w._failures_total._value.get() == 0 + assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 assert w._not_handling_total._value.get() == 0 @patch("ccx_messaging.watchers.stats_watcher.start_http_server", lambda *args: None) -def test_stats_watcher_on_filter(): +def test_stats_watcher_on_filter(label_value): """Test the on_filter() method.""" # construct watcher object w = StatsWatcher(prometheus_port=8001) @@ -107,10 +113,10 @@ def test_stats_watcher_on_filter(): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 1 assert w._downloaded_total._sum.get() == 0 - assert w._processed_total._value.get() == 0 - assert w._processed_timeout_total._value.get() == 0 - assert w._published_total._value.get() == 0 - assert w._failures_total._value.get() == 0 + assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 assert w._not_handling_total._value.get() == 0 From 6a26d2007cb753797d7c84436c1f9e7294d0a2c6 Mon Sep 17 00:00:00 2001 From: Papa Bakary Camara Date: Tue, 16 Apr 2024 15:01:33 +0200 Subject: [PATCH 2/3] Update stats_watcher_test.py to take archive_type label into account --- test/watchers/stats_watcher_test.py | 90 ++++++++++++++++++----------- 1 file changed, 57 insertions(+), 33 deletions(-) diff --git a/test/watchers/stats_watcher_test.py b/test/watchers/stats_watcher_test.py index afee8a7..ef846ad 100644 --- a/test/watchers/stats_watcher_test.py +++ b/test/watchers/stats_watcher_test.py @@ -59,6 +59,11 @@ def check_initial_metrics_state(w): assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 assert w._not_handling_total._value.get() == 0 + # Check initial values of histogram metrics + assert w._downloaded_total._sum.get() == 0 + assert w._download_duration._sum.get() == 0 + assert w._process_duration.labels(**{ARCHIVE_TYPE_LABEL: value})._sum.get() == 0 + assert w._publish_duration.labels(**{ARCHIVE_TYPE_LABEL: value})._sum.get() == 0 def init_timestamps(w): @@ -78,6 +83,7 @@ def test_stats_watcher_on_recv(label_value): # construct watcher object w = StatsWatcher(prometheus_port=8001) init_timestamps(w) + w._archive_type = label_value # check that all metrics are initialized check_initial_metrics_state(w) @@ -102,6 +108,7 @@ def test_stats_watcher_on_filter(label_value): # construct watcher object w = StatsWatcher(prometheus_port=8001) init_timestamps(w) + w._archive_type = label_value # check that all metrics are initialized check_initial_metrics_state(w) @@ -121,11 +128,12 @@ def test_stats_watcher_on_filter(label_value): @patch("ccx_messaging.watchers.stats_watcher.start_http_server", lambda *args: None) -def test_stats_watcher_on_download(): +def test_stats_watcher_on_download(label_value): """Test the on_download() method.""" # construct watcher object w = StatsWatcher(prometheus_port=8002) init_timestamps(w) + w._archive_type = label_value # check that all metrics are initialized check_initial_metrics_state(w) @@ -138,21 +146,22 @@ def test_stats_watcher_on_download(): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 assert w._downloaded_total._sum.get() == 100 - assert w._processed_total._value.get() == 0 - assert w._processed_timeout_total._value.get() == 0 - assert w._published_total._value.get() == 0 - assert w._failures_total._value.get() == 0 + assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 assert w._not_handling_total._value.get() == 0 @patch("ccx_messaging.watchers.stats_watcher.start_http_server", lambda *args: None) -def test_stats_watcher_on_process(): +def test_stats_watcher_on_process(label_value): """Test the on_process() method.""" input_msg = {"identity": {}} # construct watcher object w = StatsWatcher(prometheus_port=8003) init_timestamps(w) + w._archive_type = label_value # check that all metrics are initialized check_initial_metrics_state(w) @@ -164,19 +173,20 @@ def test_stats_watcher_on_process(): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 assert w._downloaded_total._sum.get() == 0 - assert w._processed_total._value.get() == 1 - assert w._processed_timeout_total._value.get() == 0 - assert w._published_total._value.get() == 0 - assert w._failures_total._value.get() == 0 + assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 1 + assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 assert w._not_handling_total._value.get() == 0 @patch("ccx_messaging.watchers.stats_watcher.start_http_server", lambda *args: None) -def test_stats_watcher_on_process_timeout(): +def test_stats_watcher_on_process_timeout(label_value): """Test the on_process_timeout() method.""" # construct watcher object w = StatsWatcher(prometheus_port=8004) init_timestamps(w) + w._archive_type = label_value # change metrics w.on_process_timeout() @@ -185,21 +195,22 @@ def test_stats_watcher_on_process_timeout(): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 assert w._downloaded_total._sum.get() == 0 - assert w._processed_total._value.get() == 0 - assert w._processed_timeout_total._value.get() == 1 - assert w._published_total._value.get() == 0 - assert w._failures_total._value.get() == 0 + assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 1 + assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 assert w._not_handling_total._value.get() == 0 @patch("ccx_messaging.watchers.stats_watcher.start_http_server", lambda *args: None) -def test_stats_watcher_on_consumer_success(): +def test_stats_watcher_on_consumer_success(label_value): """Test the on_consumer_success() method.""" input_msg = {"identity": {}} # construct watcher object w = StatsWatcher(prometheus_port=8005) init_timestamps(w) + w._archive_type = label_value # change metrics w.on_consumer_success(input_msg, "broker", "{result}") @@ -208,21 +219,22 @@ def test_stats_watcher_on_consumer_success(): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 assert w._downloaded_total._sum.get() == 0 - assert w._processed_total._value.get() == 0 - assert w._processed_timeout_total._value.get() == 0 - assert w._published_total._value.get() == 1 - assert w._failures_total._value.get() == 0 + assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 1 + assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 assert w._not_handling_total._value.get() == 0 @patch("ccx_messaging.watchers.stats_watcher.start_http_server", lambda *args: None) -def test_stats_watcher_on_consumer_failure(): +def test_stats_watcher_on_consumer_failure(label_value): """Test the on_consumer_failure() method.""" input_msg = {"identity": {}} # construct watcher object w = StatsWatcher(prometheus_port=8006) init_timestamps(w) + w._archive_type = label_value # change metrics w.on_consumer_failure(input_msg, Exception("something")) @@ -231,10 +243,10 @@ def test_stats_watcher_on_consumer_failure(): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 assert w._downloaded_total._sum.get() == 0 - assert w._processed_total._value.get() == 0 - assert w._processed_timeout_total._value.get() == 0 - assert w._published_total._value.get() == 0 - assert w._failures_total._value.get() == 1 + assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 1 assert w._not_handling_total._value.get() == 0 # reset downloaded time @@ -244,7 +256,7 @@ def test_stats_watcher_on_consumer_failure(): w.on_consumer_failure(input_msg, Exception("something")) # metric should change - assert w._failures_total._value.get() == 2 + assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 2 # reset processed time w._processed_time = None @@ -253,7 +265,7 @@ def test_stats_watcher_on_consumer_failure(): w.on_consumer_failure(input_msg, Exception("something")) # metric should change again - assert w._failures_total._value.get() == 3 + assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 3 # now try this - downloaded time is not None and processed time is none w._downloaded_time = time.time() @@ -263,17 +275,18 @@ def test_stats_watcher_on_consumer_failure(): w.on_consumer_failure(input_msg, Exception("something")) # metric should change again - assert w._failures_total._value.get() == 4 + assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 4 @patch("ccx_messaging.watchers.stats_watcher.start_http_server", lambda *args: None) -def test_stats_watcher_on_not_handled(): +def test_stats_watcher_on_not_handled(label_value): """Test the on_not_handled() method.""" input_msg = {"identity": {}} # construct watcher object w = StatsWatcher(prometheus_port=8007) init_timestamps(w) + w._archive_type = label_value # change metrics w.on_not_handled(input_msg) @@ -282,10 +295,10 @@ def test_stats_watcher_on_not_handled(): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 assert w._downloaded_total._sum.get() == 0 - assert w._processed_total._value.get() == 0 - assert w._processed_timeout_total._value.get() == 0 - assert w._published_total._value.get() == 0 - assert w._failures_total._value.get() == 0 + assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 assert w._not_handling_total._value.get() == 1 @@ -307,3 +320,14 @@ def test_reset_times(): assert w._downloaded_time is None assert w._processed_time is None assert w._published_time is None + +@patch("ccx_messaging.watchers.stats_watcher.start_http_server", lambda *args: None) +def test_reset_archive_type(label_value): + """Test the method _reset_times().""" + # construct watcher object + w = StatsWatcher(prometheus_port=8009) + w._archive_type = label_value + + w._reset_archive_type() + + assert w._archive_type is 'ocp' From a7751f950a7caf51da2a27e47876ee678d58af9a Mon Sep 17 00:00:00 2001 From: Papa Bakary Camara Date: Tue, 16 Apr 2024 20:28:46 +0200 Subject: [PATCH 3/3] Apply linters --- ccx_messaging/watchers/stats_watcher.py | 41 +++-- test/watchers/stats_watcher_test.py | 202 ++++++++++++++++++------ 2 files changed, 183 insertions(+), 60 deletions(-) diff --git a/ccx_messaging/watchers/stats_watcher.py b/ccx_messaging/watchers/stats_watcher.py index 799cde3..d969171 100644 --- a/ccx_messaging/watchers/stats_watcher.py +++ b/ccx_messaging/watchers/stats_watcher.py @@ -25,8 +25,10 @@ LOG = logging.getLogger(__name__) # A label is added to the metrics so we can differentiate OCP and HyperShift archives -ARCHIVE_TYPE_LABEL = 'archive' -ARCHIVE_TYPE_VALUES = ['ocp', 'hypershift'] +ARCHIVE_TYPE_LABEL = "archive" +ARCHIVE_TYPE_VALUES = ["ocp", "hypershift"] + + # pylint: disable=too-many-instance-attributes class StatsWatcher(ConsumerWatcher): """A Watcher that stores different Prometheus `Counter`s.""" @@ -52,7 +54,9 @@ def __init__(self, prometheus_port=8000): ) self._processed_total = Counter( - "ccx_engine_processed_total", "Counter of files processed by the OCP Engine", [ARCHIVE_TYPE_LABEL] + "ccx_engine_processed_total", + "Counter of files processed by the OCP Engine", + [ARCHIVE_TYPE_LABEL], ) self._published_total = Counter( @@ -87,7 +91,7 @@ def __init__(self, prometheus_port=8000): self._processed_timeout_total = Counter( "ccx_engine_processed_timeout_total", "Counter of timeouts while processing archives", - [ARCHIVE_TYPE_LABEL] + [ARCHIVE_TYPE_LABEL], ) self._start_time = None @@ -97,14 +101,13 @@ def __init__(self, prometheus_port=8000): # Archive type used in the metrics is set within on_extract, as we need # to extract the archive in order to know that information - self._archive_type = 'ocp' + self._archive_type = "ocp" self._initialize_metrics_with_labels() start_http_server(prometheus_port) LOG.info("StatWatcher created and listening on port %s", prometheus_port) - def on_recv(self, input_msg): """On received event handler.""" self._recv_total.inc() @@ -118,9 +121,7 @@ def on_filter(self): self._filtered_total.inc() def on_extract(self, ctx, broker, extraction): - """ - Fired just after the archive is extracted but before any analysis. - """ + """On extract event handler.""" # Set archive_type label to hypershift if config/infrastructure.json is found hcp_config_file = os.path.join(extraction.tmp_dir, "config", "infrastructure.json") if os.path.exists(hcp_config_file): @@ -139,7 +140,9 @@ def on_process(self, input_msg, results): self._processed_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc() self._processed_time = time.time() - self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(self._processed_time - self._downloaded_time) + self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe( + self._processed_time - self._downloaded_time + ) def on_process_timeout(self): """On process timeout event handler.""" @@ -150,7 +153,9 @@ def on_consumer_success(self, input_msg, broker, results): self._published_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc() self._published_time = time.time() - self._publish_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(self._published_time - self._processed_time) + self._publish_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe( + self._published_time - self._processed_time + ) def on_consumer_failure(self, input_msg, exception): """On consumer failure event handler.""" @@ -160,7 +165,9 @@ def on_consumer_failure(self, input_msg, exception): self._download_duration.observe(time.time() - self._start_time) self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(0) elif self._processed_time is None: - self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(time.time() - self._downloaded_time) + self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe( + time.time() - self._downloaded_time + ) self._publish_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(0) @@ -175,12 +182,16 @@ def _reset_times(self): self._published_time = None def _reset_archive_type(self): - """Resets the _archive_type label's value to 'ocp'""" + """Reset the _archive_type label's value to `ocp`.""" self._archive_type = "ocp" def _initialize_metrics_with_labels(self): - """Metrics with labels are not initialized when declared, because the Prometheus - client can’t know what values the label can have. This will initialize them.""" + """Initialize Prometheus metrics that have at least one label. + + Metrics with labels are not initialized when declared, because the Prometheus + client can’t know what values the label can have. This is therefore needed in + order to initialize them. + """ for val in ARCHIVE_TYPE_VALUES: self._extracted_total.labels(val) self._processed_total.labels(val) diff --git a/test/watchers/stats_watcher_test.py b/test/watchers/stats_watcher_test.py index ef846ad..4ae1f03 100644 --- a/test/watchers/stats_watcher_test.py +++ b/test/watchers/stats_watcher_test.py @@ -19,12 +19,22 @@ import pytest import time -from ccx_messaging.watchers.stats_watcher import StatsWatcher, ARCHIVE_TYPE_LABEL, ARCHIVE_TYPE_VALUES +from ccx_messaging.watchers.stats_watcher import ( + StatsWatcher, + ARCHIVE_TYPE_LABEL, + ARCHIVE_TYPE_VALUES, +) _INVALID_PORTS = [None, "8000", 8000.0, 70000] +@pytest.fixture(params=ARCHIVE_TYPE_VALUES) +def label_value(request): + """Set the label value for the running test.""" + return request.param + + @pytest.mark.parametrize("value", _INVALID_PORTS) def test_stats_watcher_initialize_invalid_port(value): """Test passing invalid data types or values to the `StatsWatcher` initializer fails.""" @@ -35,10 +45,6 @@ def test_stats_watcher_initialize_invalid_port(value): _VALID_PORTS = [{}, {"prometheus_port": 9500}, {"prometheus_port": 80}] -@pytest.fixture(params=ARCHIVE_TYPE_VALUES) -def label_value(request): - return request.param - @pytest.mark.parametrize("value", _VALID_PORTS) @patch("ccx_messaging.watchers.stats_watcher.start_http_server") def test_stats_watcher_initialize(start_http_server_mock, value): @@ -53,10 +59,21 @@ def check_initial_metrics_state(w): for value in ARCHIVE_TYPE_VALUES: assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 - assert w._extracted_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 - assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 - assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 - assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 + assert ( + w._extracted_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 + ) + assert ( + w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 + ) + assert ( + w._processed_timeout_total.labels( + **{ARCHIVE_TYPE_LABEL: value} + )._value.get() + == 0 + ) + assert ( + w._published_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 + ) assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: value})._value.get() == 0 assert w._not_handling_total._value.get() == 0 # Check initial values of histogram metrics @@ -95,10 +112,21 @@ def test_stats_watcher_on_recv(label_value): assert w._recv_total._value.get() == 1 assert w._filtered_total._value.get() == 0 assert len(w._downloaded_total._labelvalues) == 0 - assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert ( + w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._processed_timeout_total.labels( + **{ARCHIVE_TYPE_LABEL: label_value} + )._value.get() + == 0 + ) + assert ( + w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) assert w._not_handling_total._value.get() == 0 @@ -120,10 +148,21 @@ def test_stats_watcher_on_filter(label_value): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 1 assert w._downloaded_total._sum.get() == 0 - assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert ( + w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._processed_timeout_total.labels( + **{ARCHIVE_TYPE_LABEL: label_value} + )._value.get() + == 0 + ) + assert ( + w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) assert w._not_handling_total._value.get() == 0 @@ -146,10 +185,21 @@ def test_stats_watcher_on_download(label_value): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 assert w._downloaded_total._sum.get() == 100 - assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert ( + w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._processed_timeout_total.labels( + **{ARCHIVE_TYPE_LABEL: label_value} + )._value.get() + == 0 + ) + assert ( + w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) assert w._not_handling_total._value.get() == 0 @@ -173,10 +223,21 @@ def test_stats_watcher_on_process(label_value): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 assert w._downloaded_total._sum.get() == 0 - assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 1 - assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert ( + w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 1 + ) + assert ( + w._processed_timeout_total.labels( + **{ARCHIVE_TYPE_LABEL: label_value} + )._value.get() + == 0 + ) + assert ( + w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) assert w._not_handling_total._value.get() == 0 @@ -195,10 +256,21 @@ def test_stats_watcher_on_process_timeout(label_value): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 assert w._downloaded_total._sum.get() == 0 - assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 1 - assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert ( + w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._processed_timeout_total.labels( + **{ARCHIVE_TYPE_LABEL: label_value} + )._value.get() + == 1 + ) + assert ( + w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) assert w._not_handling_total._value.get() == 0 @@ -219,10 +291,21 @@ def test_stats_watcher_on_consumer_success(label_value): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 assert w._downloaded_total._sum.get() == 0 - assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 1 - assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert ( + w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._processed_timeout_total.labels( + **{ARCHIVE_TYPE_LABEL: label_value} + )._value.get() + == 0 + ) + assert ( + w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 1 + ) + assert ( + w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) assert w._not_handling_total._value.get() == 0 @@ -243,10 +326,21 @@ def test_stats_watcher_on_consumer_failure(label_value): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 assert w._downloaded_total._sum.get() == 0 - assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 1 + assert ( + w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._processed_timeout_total.labels( + **{ARCHIVE_TYPE_LABEL: label_value} + )._value.get() + == 0 + ) + assert ( + w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 1 + ) assert w._not_handling_total._value.get() == 0 # reset downloaded time @@ -256,7 +350,9 @@ def test_stats_watcher_on_consumer_failure(label_value): w.on_consumer_failure(input_msg, Exception("something")) # metric should change - assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 2 + assert ( + w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 2 + ) # reset processed time w._processed_time = None @@ -265,7 +361,9 @@ def test_stats_watcher_on_consumer_failure(label_value): w.on_consumer_failure(input_msg, Exception("something")) # metric should change again - assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 3 + assert ( + w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 3 + ) # now try this - downloaded time is not None and processed time is none w._downloaded_time = time.time() @@ -275,7 +373,9 @@ def test_stats_watcher_on_consumer_failure(label_value): w.on_consumer_failure(input_msg, Exception("something")) # metric should change again - assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 4 + assert ( + w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 4 + ) @patch("ccx_messaging.watchers.stats_watcher.start_http_server", lambda *args: None) @@ -295,10 +395,21 @@ def test_stats_watcher_on_not_handled(label_value): assert w._recv_total._value.get() == 0 assert w._filtered_total._value.get() == 0 assert w._downloaded_total._sum.get() == 0 - assert w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 - assert w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + assert ( + w._processed_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._processed_timeout_total.labels( + **{ARCHIVE_TYPE_LABEL: label_value} + )._value.get() + == 0 + ) + assert ( + w._published_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) + assert ( + w._failures_total.labels(**{ARCHIVE_TYPE_LABEL: label_value})._value.get() == 0 + ) assert w._not_handling_total._value.get() == 1 @@ -321,6 +432,7 @@ def test_reset_times(): assert w._processed_time is None assert w._published_time is None + @patch("ccx_messaging.watchers.stats_watcher.start_http_server", lambda *args: None) def test_reset_archive_type(label_value): """Test the method _reset_times().""" @@ -330,4 +442,4 @@ def test_reset_archive_type(label_value): w._reset_archive_type() - assert w._archive_type is 'ocp' + assert w._archive_type == "ocp"