Skip to content

Commit

Permalink
Apply linters
Browse files Browse the repository at this point in the history
  • Loading branch information
epapbak committed Apr 16, 2024
1 parent 6a26d20 commit 7b685f7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
41 changes: 26 additions & 15 deletions ccx_messaging/watchers/stats_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@

LOG = logging.getLogger(__name__)
# A label is added to the metrics so we can differentiate OCP and HyperShift archives
ARCHIVE_TYPE_LABEL = 'archive'
ARCHIVE_TYPE_VALUES = ['ocp', 'hypershift']
ARCHIVE_TYPE_LABEL = "archive"
ARCHIVE_TYPE_VALUES = ["ocp", "hypershift"]


# pylint: disable=too-many-instance-attributes
class StatsWatcher(ConsumerWatcher):
"""A Watcher that stores different Prometheus `Counter`s."""
Expand All @@ -52,7 +54,9 @@ def __init__(self, prometheus_port=8000):
)

self._processed_total = Counter(
"ccx_engine_processed_total", "Counter of files processed by the OCP Engine", [ARCHIVE_TYPE_LABEL]
"ccx_engine_processed_total",
"Counter of files processed by the OCP Engine",
[ARCHIVE_TYPE_LABEL],
)

self._published_total = Counter(
Expand Down Expand Up @@ -87,7 +91,7 @@ def __init__(self, prometheus_port=8000):
self._processed_timeout_total = Counter(
"ccx_engine_processed_timeout_total",
"Counter of timeouts while processing archives",
[ARCHIVE_TYPE_LABEL]
[ARCHIVE_TYPE_LABEL],
)

self._start_time = None
Expand All @@ -97,14 +101,13 @@ def __init__(self, prometheus_port=8000):

# Archive type used in the metrics is set within on_extract, as we need
# to extract the archive in order to know that information
self._archive_type = 'ocp'
self._archive_type = "ocp"

self._initialize_metrics_with_labels()

start_http_server(prometheus_port)
LOG.info("StatWatcher created and listening on port %s", prometheus_port)


def on_recv(self, input_msg):
"""On received event handler."""
self._recv_total.inc()
Expand All @@ -118,9 +121,7 @@ def on_filter(self):
self._filtered_total.inc()

def on_extract(self, ctx, broker, extraction):
"""
Fired just after the archive is extracted but before any analysis.
"""
"""On extract event handler."""
# Set archive_type label to hypershift if config/infrastructure.json is found
hcp_config_file = os.path.join(extraction.tmp_dir, "config", "infrastructure.json")
if os.path.exists(hcp_config_file):
Expand All @@ -139,7 +140,9 @@ def on_process(self, input_msg, results):
self._processed_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc()

self._processed_time = time.time()
self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(self._processed_time - self._downloaded_time)
self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(
self._processed_time - self._downloaded_time
)

def on_process_timeout(self):
"""On process timeout event handler."""
Expand All @@ -150,7 +153,9 @@ def on_consumer_success(self, input_msg, broker, results):
self._published_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc()

self._published_time = time.time()
self._publish_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(self._published_time - self._processed_time)
self._publish_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(
self._published_time - self._processed_time
)

def on_consumer_failure(self, input_msg, exception):
"""On consumer failure event handler."""
Expand All @@ -160,7 +165,9 @@ def on_consumer_failure(self, input_msg, exception):
self._download_duration.observe(time.time() - self._start_time)
self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(0)
elif self._processed_time is None:
self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(time.time() - self._downloaded_time)
self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(
time.time() - self._downloaded_time
)

self._publish_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(0)

Expand All @@ -175,12 +182,16 @@ def _reset_times(self):
self._published_time = None

def _reset_archive_type(self):
"""Resets the _archive_type label's value to 'ocp'"""
"""Reset the _archive_type label's value to `ocp`."""
self._archive_type = "ocp"

def _initialize_metrics_with_labels(self):
"""Metrics with labels are not initialized when declared, because the Prometheus
client can’t know what values the label can have. This will initialize them."""
"""Initialize Prometheus metrics that have at least one label.
Metrics with labels are not initialized when declared, because the Prometheus
client can’t know what values the label can have. This is therefore needed in
order to initialize them.
"""
for val in ARCHIVE_TYPE_VALUES:
self._extracted_total.labels(val)
self._processed_total.labels(val)
Expand Down
3 changes: 2 additions & 1 deletion test/watchers/stats_watcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def test_stats_watcher_initialize_invalid_port(value):

@pytest.fixture(params=ARCHIVE_TYPE_VALUES)
def label_value(request):
"""Set the label value for the running test."""
return request.param

@pytest.mark.parametrize("value", _VALID_PORTS)
Expand Down Expand Up @@ -330,4 +331,4 @@ def test_reset_archive_type(label_value):

w._reset_archive_type()

assert w._archive_type is 'ocp'
assert w._archive_type == 'ocp'

0 comments on commit 7b685f7

Please sign in to comment.