Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add label for differentiating OCP and HyperShift archives #196

Merged
merged 3 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 67 additions & 12 deletions ccx_messaging/watchers/stats_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@


LOG = logging.getLogger(__name__)
# A label is added to the metrics so we can differentiate OCP and HyperShift archives
ARCHIVE_TYPE_LABEL = "archive"
ARCHIVE_TYPE_VALUES = ["ocp", "hypershift"]


# pylint: disable=too-many-instance-attributes
Expand All @@ -37,6 +40,7 @@ def __init__(self, prometheus_port=8000):
self._recv_total = Counter(
"ccx_consumer_received_total", "Counter of received Kafka messages"
)

self._filtered_total = Counter(
"ccx_consumer_filtered_total", "Counter of filtered Kafka messages"
)
Expand All @@ -45,16 +49,22 @@ def __init__(self, prometheus_port=8000):
"ccx_downloaded_total", "Histogram of the size of downloaded items"
)

self._extracted_total = Counter(
"ccx_consumer_extracted_total", "Counter of extracted archives", [ARCHIVE_TYPE_LABEL]
)

self._processed_total = Counter(
"ccx_engine_processed_total", "Counter of files processed by the OCP Engine"
"ccx_engine_processed_total",
"Counter of files processed by the OCP Engine",
[ARCHIVE_TYPE_LABEL],
)

self._published_total = Counter(
"ccx_published_total", "Counter of reports successfully published"
"ccx_published_total", "Counter of reports successfully published", [ARCHIVE_TYPE_LABEL]
)

self._failures_total = Counter(
"ccx_failures_total", "Counter of failures during the pipeline"
"ccx_failures_total", "Counter of failures during the pipeline", [ARCHIVE_TYPE_LABEL]
)

self._not_handling_total = Counter(
Expand All @@ -69,23 +79,32 @@ def __init__(self, prometheus_port=8000):
self._process_duration = Histogram(
"ccx_process_duration_seconds",
"Histogram of durations of processing archives by the OCP engine",
[ARCHIVE_TYPE_LABEL],
)

self._publish_duration = Histogram(
"ccx_publish_duration_seconds",
"Histogram of durations of publishing the OCP engine results",
[ARCHIVE_TYPE_LABEL],
)

self._processed_timeout_total = Counter(
"ccx_engine_processed_timeout_total",
"Counter of timeouts while processing archives",
[ARCHIVE_TYPE_LABEL],
)

self._start_time = None
self._downloaded_time = None
self._processed_time = None
self._published_time = None

# Archive type used in the metrics is set within on_extract, as we need
# to extract the archive in order to know that information
self._archive_type = "ocp"

self._initialize_metrics_with_labels()

start_http_server(prometheus_port)
LOG.info("StatWatcher created and listening on port %s", prometheus_port)

Expand All @@ -95,11 +114,20 @@ def on_recv(self, input_msg):

self._start_time = time.time()
self._reset_times()
self._reset_archive_type()

def on_filter(self):
"""On filter event handler."""
self._filtered_total.inc()

def on_extract(self, ctx, broker, extraction):
"""On extract event handler."""
# Set archive_type label to hypershift if config/infrastructure.json is found
hcp_config_file = os.path.join(extraction.tmp_dir, "config", "infrastructure.json")
if os.path.exists(hcp_config_file):
self._archive_type = "hypershift"
self._extracted_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc()

def on_download(self, path):
"""On downloaded event handler."""
self._downloaded_total.observe(os.path.getsize(path))
Expand All @@ -109,33 +137,39 @@ def on_download(self, path):

def on_process(self, input_msg, results):
"""On processed event handler."""
self._processed_total.inc()
self._processed_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc()

self._processed_time = time.time()
self._process_duration.observe(self._processed_time - self._downloaded_time)
self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(
self._processed_time - self._downloaded_time
)

def on_process_timeout(self):
"""On process timeout event handler."""
self._processed_timeout_total.inc()
self._processed_timeout_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc()

def on_consumer_success(self, input_msg, broker, results):
"""On consumer success event handler."""
self._published_total.inc()
self._published_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc()

self._published_time = time.time()
self._publish_duration.observe(self._published_time - self._processed_time)
self._publish_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(
self._published_time - self._processed_time
)

def on_consumer_failure(self, input_msg, exception):
"""On consumer failure event handler."""
self._failures_total.inc()
self._failures_total.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).inc()

if self._downloaded_time is None:
self._download_duration.observe(time.time() - self._start_time)
self._process_duration.observe(0)
self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(0)
elif self._processed_time is None:
self._process_duration.observe(time.time() - self._downloaded_time)
self._process_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(
time.time() - self._downloaded_time
)

self._publish_duration.observe(0)
self._publish_duration.labels(**{ARCHIVE_TYPE_LABEL: self._archive_type}).observe(0)

def on_not_handled(self, input_msg):
"""On not handled messages success event handler."""
Expand All @@ -147,11 +181,32 @@ def _reset_times(self):
self._processed_time = None
self._published_time = None

def _reset_archive_type(self):
"""Reset the _archive_type label's value to `ocp`."""
self._archive_type = "ocp"

def _initialize_metrics_with_labels(self):
"""Initialize Prometheus metrics that have at least one label.

Metrics with labels are not initialized when declared, because the Prometheus
client can’t know what values the label can have. This is therefore needed in
order to initialize them.
"""
for val in ARCHIVE_TYPE_VALUES:
self._extracted_total.labels(val)
self._processed_total.labels(val)
self._published_total.labels(val)
self._failures_total.labels(val)
self._process_duration.labels(val)
self._publish_duration.labels(val)
self._processed_timeout_total.labels(val)

def __del__(self):
"""Destructor for handling counters unregistering."""
REGISTRY.unregister(self._recv_total)
REGISTRY.unregister(self._filtered_total)
REGISTRY.unregister(self._downloaded_total)
REGISTRY.unregister(self._extracted_total)
REGISTRY.unregister(self._processed_total)
REGISTRY.unregister(self._published_total)
REGISTRY.unregister(self._failures_total)
Expand Down
Loading
Loading