From 3f5784159969ac9569da9677f82f8e08eb2bbad2 Mon Sep 17 00:00:00 2001 From: Robert Gildein Date: Wed, 7 Aug 2024 09:03:53 +0200 Subject: [PATCH] chore(backport): Add metrics relation (#261) Expose metrics port and provide metrics-endpoint relation. With these changes, charm can be monitored with Prometheus. --- .../v1/kubernetes_service_patch.py | 433 +++ .../prometheus_k8s/v0/prometheus_scrape.py | 2378 +++++++++++++++++ charms/kserve-controller/metadata.yaml | 3 + charms/kserve-controller/src/charm.py | 21 +- .../src/templates/configmap_manifests.yaml.j2 | 4 +- .../integration/config-map-data-changed.yaml | 4 +- .../tests/integration/config-map-data.yaml | 4 +- .../tests/integration/test_charm.py | 13 +- .../tests/unit/test_charm.py | 21 +- 9 files changed, 2871 insertions(+), 10 deletions(-) create mode 100644 charms/kserve-controller/lib/charms/observability_libs/v1/kubernetes_service_patch.py create mode 100644 charms/kserve-controller/lib/charms/prometheus_k8s/v0/prometheus_scrape.py diff --git a/charms/kserve-controller/lib/charms/observability_libs/v1/kubernetes_service_patch.py b/charms/kserve-controller/lib/charms/observability_libs/v1/kubernetes_service_patch.py new file mode 100644 index 0000000..e85834b --- /dev/null +++ b/charms/kserve-controller/lib/charms/observability_libs/v1/kubernetes_service_patch.py @@ -0,0 +1,433 @@ +# Copyright 2021 Canonical Ltd. +# See LICENSE file for licensing details. + +"""# KubernetesServicePatch Library. + +This library is designed to enable developers to more simply patch the Kubernetes Service created +by Juju during the deployment of a sidecar charm. When sidecar charms are deployed, Juju creates a +service named after the application in the namespace (named after the Juju model). This service by +default contains a "placeholder" port, which is 65535/TCP. + +When modifying the default set of resources managed by Juju, one must consider the lifecycle of the +charm. In this case, any modifications to the default service (created during deployment), will be +overwritten during a charm upgrade. + +When initialised, this library binds a handler to the parent charm's `install` and `upgrade_charm` +events which applies the patch to the cluster. This should ensure that the service ports are +correct throughout the charm's life. + +The constructor simply takes a reference to the parent charm, and a list of +[`lightkube`](https://github.com/gtsystem/lightkube) ServicePorts that each define a port for the +service. For information regarding the `lightkube` `ServicePort` model, please visit the +`lightkube` [docs](https://gtsystem.github.io/lightkube-models/1.23/models/core_v1/#serviceport). + +Optionally, a name of the service (in case service name needs to be patched as well), labels, +selectors, and annotations can be provided as keyword arguments. + +## Getting Started + +To get started using the library, you just need to fetch the library using `charmcraft`. **Note +that you also need to add `lightkube` and `lightkube-models` to your charm's `requirements.txt`.** + +```shell +cd some-charm +charmcraft fetch-lib charms.observability_libs.v1.kubernetes_service_patch +cat << EOF >> requirements.txt +lightkube +lightkube-models +EOF +``` + +Then, to initialise the library: + +For `ClusterIP` services: + +```python +# ... +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + port = ServicePort(443, name=f"{self.app.name}") + self.service_patcher = KubernetesServicePatch(self, [port]) + # ... +``` + +For `LoadBalancer`/`NodePort` services: + +```python +# ... +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + port = ServicePort(443, name=f"{self.app.name}", targetPort=443, nodePort=30666) + self.service_patcher = KubernetesServicePatch( + self, [port], "LoadBalancer" + ) + # ... +``` + +Port protocols can also be specified. Valid protocols are `"TCP"`, `"UDP"`, and `"SCTP"` + +```python +# ... +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + tcp = ServicePort(443, name=f"{self.app.name}-tcp", protocol="TCP") + udp = ServicePort(443, name=f"{self.app.name}-udp", protocol="UDP") + sctp = ServicePort(443, name=f"{self.app.name}-sctp", protocol="SCTP") + self.service_patcher = KubernetesServicePatch(self, [tcp, udp, sctp]) + # ... +``` + +Bound with custom events by providing `refresh_event` argument: +For example, you would like to have a configurable port in your charm and want to apply +service patch every time charm config is changed. + +```python +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + port = ServicePort(int(self.config["charm-config-port"]), name=f"{self.app.name}") + self.service_patcher = KubernetesServicePatch( + self, + [port], + refresh_event=self.on.config_changed + ) + # ... +``` + +Creating a new k8s lb service instead of patching the one created by juju +Service name is optional. If not provided, it defaults to {app_name}-lb. +If provided and equal to app_name, it also defaults to {app_name}-lb to prevent conflicts with the Juju default service. +```python +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from lightkube.models.core_v1 import ServicePort + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + port = ServicePort(int(self.config["charm-config-port"]), name=f"{self.app.name}") + self.service_patcher = KubernetesServicePatch( + self, + [port], + service_type="LoadBalancer", + service_name="application-lb" + ) + # ... +``` + +Additionally, you may wish to use mocks in your charm's unit testing to ensure that the library +does not try to make any API calls, or open any files during testing that are unlikely to be +present, and could break your tests. The easiest way to do this is during your test `setUp`: + +```python +# ... + +@patch("charm.KubernetesServicePatch", lambda x, y: None) +def setUp(self, *unused): + self.harness = Harness(SomeCharm) + # ... +``` +""" + +import logging +from types import MethodType +from typing import Any, List, Literal, Optional, Union + +from lightkube import ApiError, Client # pyright: ignore +from lightkube.core import exceptions +from lightkube.models.core_v1 import ServicePort, ServiceSpec +from lightkube.models.meta_v1 import ObjectMeta +from lightkube.resources.core_v1 import Service +from lightkube.types import PatchType +from ops import UpgradeCharmEvent +from ops.charm import CharmBase +from ops.framework import BoundEvent, Object + +logger = logging.getLogger(__name__) + +# The unique Charmhub library identifier, never change it +LIBID = "0042f86d0a874435adef581806cddbbb" + +# Increment this major API version when introducing breaking changes +LIBAPI = 1 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 12 + +ServiceType = Literal["ClusterIP", "LoadBalancer"] + + +class KubernetesServicePatch(Object): + """A utility for patching the Kubernetes service set up by Juju.""" + + def __init__( + self, + charm: CharmBase, + ports: List[ServicePort], + service_name: Optional[str] = None, + service_type: ServiceType = "ClusterIP", + additional_labels: Optional[dict] = None, + additional_selectors: Optional[dict] = None, + additional_annotations: Optional[dict] = None, + *, + refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None, + ): + """Constructor for KubernetesServicePatch. + + Args: + charm: the charm that is instantiating the library. + ports: a list of ServicePorts + service_name: allows setting custom name to the patched service. If none given, + application name will be used. + service_type: desired type of K8s service. Default value is in line with ServiceSpec's + default value. + additional_labels: Labels to be added to the kubernetes service (by default only + "app.kubernetes.io/name" is set to the service name) + additional_selectors: Selectors to be added to the kubernetes service (by default only + "app.kubernetes.io/name" is set to the service name) + additional_annotations: Annotations to be added to the kubernetes service. + refresh_event: an optional bound event or list of bound events which + will be observed to re-apply the patch (e.g. on port change). + The `install` and `upgrade-charm` events would be observed regardless. + """ + super().__init__(charm, "kubernetes-service-patch") + self.charm = charm + self.service_name = service_name or self._app + # To avoid conflicts with the default Juju service, append "-lb" to the service name. + # The Juju application name is retained for the default service created by Juju. + if self.service_name == self._app and service_type == "LoadBalancer": + self.service_name = f"{self._app}-lb" + self.service_type = service_type + self.service = self._service_object( + ports, + self.service_name, + service_type, + additional_labels, + additional_selectors, + additional_annotations, + ) + + # Make mypy type checking happy that self._patch is a method + assert isinstance(self._patch, MethodType) + # Ensure this patch is applied during the 'install' and 'upgrade-charm' events + self.framework.observe(charm.on.install, self._patch) + self.framework.observe(charm.on.upgrade_charm, self._on_upgrade_charm) + self.framework.observe(charm.on.update_status, self._patch) + # Sometimes Juju doesn't clean-up a manually created LB service, + # so we clean it up ourselves just in case. + self.framework.observe(charm.on.remove, self._remove_service) + + # apply user defined events + if refresh_event: + if not isinstance(refresh_event, list): + refresh_event = [refresh_event] + + for evt in refresh_event: + self.framework.observe(evt, self._patch) + + def _service_object( + self, + ports: List[ServicePort], + service_name: Optional[str] = None, + service_type: ServiceType = "ClusterIP", + additional_labels: Optional[dict] = None, + additional_selectors: Optional[dict] = None, + additional_annotations: Optional[dict] = None, + ) -> Service: + """Creates a valid Service representation. + + Args: + ports: a list of ServicePorts + service_name: allows setting custom name to the patched service. If none given, + application name will be used. + service_type: desired type of K8s service. Default value is in line with ServiceSpec's + default value. + additional_labels: Labels to be added to the kubernetes service (by default only + "app.kubernetes.io/name" is set to the service name) + additional_selectors: Selectors to be added to the kubernetes service (by default only + "app.kubernetes.io/name" is set to the service name) + additional_annotations: Annotations to be added to the kubernetes service. + + Returns: + Service: A valid representation of a Kubernetes Service with the correct ports. + """ + if not service_name: + service_name = self._app + labels = {"app.kubernetes.io/name": self._app} + if additional_labels: + labels.update(additional_labels) + selector = {"app.kubernetes.io/name": self._app} + if additional_selectors: + selector.update(additional_selectors) + return Service( + apiVersion="v1", + kind="Service", + metadata=ObjectMeta( + namespace=self._namespace, + name=service_name, + labels=labels, + annotations=additional_annotations, # type: ignore[arg-type] + ), + spec=ServiceSpec( + selector=selector, + ports=ports, + type=service_type, + ), + ) + + def _patch(self, _) -> None: + """Patch the Kubernetes service created by Juju to map the correct port. + + Raises: + PatchFailed: if patching fails due to lack of permissions, or otherwise. + """ + try: + client = Client() # pyright: ignore + except exceptions.ConfigError as e: + logger.warning("Error creating k8s client: %s", e) + return + + try: + if self._is_patched(client): + return + if self.service_name != self._app: + if not self.service_type == "LoadBalancer": + self._delete_and_create_service(client) + else: + self._create_lb_service(client) + client.patch(Service, self.service_name, self.service, patch_type=PatchType.MERGE) + except ApiError as e: + if e.status.code == 403: + logger.error("Kubernetes service patch failed: `juju trust` this application.") + else: + logger.error("Kubernetes service patch failed: %s", str(e)) + else: + logger.info("Kubernetes service '%s' patched successfully", self._app) + + def _delete_and_create_service(self, client: Client): + service = client.get(Service, self._app, namespace=self._namespace) + service.metadata.name = self.service_name # type: ignore[attr-defined] + service.metadata.resourceVersion = service.metadata.uid = None # type: ignore[attr-defined] # noqa: E501 + client.delete(Service, self._app, namespace=self._namespace) + client.create(service) + + def _create_lb_service(self, client: Client): + try: + client.get(Service, self.service_name, namespace=self._namespace) + except ApiError: + client.create(self.service) + + def is_patched(self) -> bool: + """Reports if the service patch has been applied. + + Returns: + bool: A boolean indicating if the service patch has been applied. + """ + client = Client() # pyright: ignore + return self._is_patched(client) + + def _is_patched(self, client: Client) -> bool: + # Get the relevant service from the cluster + try: + service = client.get(Service, name=self.service_name, namespace=self._namespace) + except ApiError as e: + if e.status.code == 404 and self.service_name != self._app: + return False + logger.error("Kubernetes service get failed: %s", str(e)) + raise + + # Construct a list of expected ports, should the patch be applied + expected_ports = [(p.port, p.targetPort) for p in self.service.spec.ports] # type: ignore[attr-defined] + # Construct a list in the same manner, using the fetched service + fetched_ports = [ + (p.port, p.targetPort) for p in service.spec.ports # type: ignore[attr-defined] + ] # noqa: E501 + return expected_ports == fetched_ports + + def _on_upgrade_charm(self, event: UpgradeCharmEvent): + """Handle the upgrade charm event.""" + # If a charm author changed the service type from LB to ClusterIP across an upgrade, we need to delete the previous LB. + if self.service_type == "ClusterIP": + + client = Client() # pyright: ignore + + # Define a label selector to find services related to the app + selector: dict[str, Any] = {"app.kubernetes.io/name": self._app} + + # Check if any service of type LoadBalancer exists + services = client.list(Service, namespace=self._namespace, labels=selector) + for service in services: + if ( + not service.metadata + or not service.metadata.name + or not service.spec + or not service.spec.type + ): + logger.warning( + "Service patch: skipping resource with incomplete metadata: %s.", service + ) + continue + if service.spec.type == "LoadBalancer": + client.delete(Service, service.metadata.name, namespace=self._namespace) + logger.info(f"LoadBalancer service {service.metadata.name} deleted.") + + # Continue the upgrade flow normally + self._patch(event) + + def _remove_service(self, _): + """Remove a Kubernetes service associated with this charm. + + Specifically designed to delete the load balancer service created by the charm, since Juju only deletes the + default ClusterIP service and not custom services. + + Returns: + None + + Raises: + ApiError: for deletion errors, excluding when the service is not found (404 Not Found). + """ + client = Client() # pyright: ignore + + try: + client.delete(Service, self.service_name, namespace=self._namespace) + logger.info("The patched k8s service '%s' was deleted.", self.service_name) + except ApiError as e: + if e.status.code == 404: + # Service not found, so no action needed + return + # Re-raise for other statuses + raise + + @property + def _app(self) -> str: + """Name of the current Juju application. + + Returns: + str: A string containing the name of the current Juju application. + """ + return self.charm.app.name + + @property + def _namespace(self) -> str: + """The Kubernetes namespace we're running in. + + Returns: + str: A string containing the name of the current Kubernetes namespace. + """ + with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f: + return f.read().strip() diff --git a/charms/kserve-controller/lib/charms/prometheus_k8s/v0/prometheus_scrape.py b/charms/kserve-controller/lib/charms/prometheus_k8s/v0/prometheus_scrape.py new file mode 100644 index 0000000..e3d35c6 --- /dev/null +++ b/charms/kserve-controller/lib/charms/prometheus_k8s/v0/prometheus_scrape.py @@ -0,0 +1,2378 @@ +# Copyright 2021 Canonical Ltd. +# See LICENSE file for licensing details. +"""Prometheus Scrape Library. + +## Overview + +This document explains how to integrate with the Prometheus charm +for the purpose of providing a metrics endpoint to Prometheus. It +also explains how alternative implementations of the Prometheus charms +may maintain the same interface and be backward compatible with all +currently integrated charms. Finally this document is the +authoritative reference on the structure of relation data that is +shared between Prometheus charms and any other charm that intends to +provide a scrape target for Prometheus. + +## Source code + +Source code can be found on GitHub at: + https://github.com/canonical/prometheus-k8s-operator/tree/main/lib/charms/prometheus_k8s + +## Provider Library Usage + +This Prometheus charm interacts with its scrape targets using its +charm library. Charms seeking to expose metric endpoints for the +Prometheus charm, must do so using the `MetricsEndpointProvider` +object from this charm library. For the simplest use cases, using the +`MetricsEndpointProvider` object only requires instantiating it, +typically in the constructor of your charm (the one which exposes a +metrics endpoint). The `MetricsEndpointProvider` constructor requires +the name of the relation over which a scrape target (metrics endpoint) +is exposed to the Prometheus charm. This relation must use the +`prometheus_scrape` interface. By default address of the metrics +endpoint is set to the unit IP address, by each unit of the +`MetricsEndpointProvider` charm. These units set their address in +response to the `PebbleReady` event of each container in the unit, +since container restarts of Kubernetes charms can result in change of +IP addresses. The default name for the metrics endpoint relation is +`metrics-endpoint`. It is strongly recommended to use the same +relation name for consistency across charms and doing so obviates the +need for an additional constructor argument. The +`MetricsEndpointProvider` object may be instantiated as follows + + from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider + + def __init__(self, *args): + super().__init__(*args) + ... + self.metrics_endpoint = MetricsEndpointProvider(self) + ... + +Note that the first argument (`self`) to `MetricsEndpointProvider` is +always a reference to the parent (scrape target) charm. + +An instantiated `MetricsEndpointProvider` object will ensure that each +unit of its parent charm, is a scrape target for the +`MetricsEndpointConsumer` (Prometheus) charm. By default +`MetricsEndpointProvider` assumes each unit of the consumer charm +exports its metrics at a path given by `/metrics` on port 80. These +defaults may be changed by providing the `MetricsEndpointProvider` +constructor an optional argument (`jobs`) that represents a +Prometheus scrape job specification using Python standard data +structures. This job specification is a subset of Prometheus' own +[scrape +configuration](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) +format but represented using Python data structures. More than one job +may be provided using the `jobs` argument. Hence `jobs` accepts a list +of dictionaries where each dictionary represents one `` +object as described in the Prometheus documentation. The currently +supported configuration subset is: `job_name`, `metrics_path`, +`static_configs` + +Suppose it is required to change the port on which scraped metrics are +exposed to 8000. This may be done by providing the following data +structure as the value of `jobs`. + +``` +[ + { + "static_configs": [ + { + "targets": ["*:8000"] + } + ] + } +] +``` + +The wildcard ("*") host specification implies that the scrape targets +will automatically be set to the host addresses advertised by each +unit of the consumer charm. + +It is also possible to change the metrics path and scrape multiple +ports, for example + +``` +[ + { + "metrics_path": "/my-metrics-path", + "static_configs": [ + { + "targets": ["*:8000", "*:8081"], + } + ] + } +] +``` + +More complex scrape configurations are possible. For example + +``` +[ + { + "static_configs": [ + { + "targets": ["10.1.32.215:7000", "*:8000"], + "labels": { + "some_key": "some-value" + } + } + ] + } +] +``` + +This example scrapes the target "10.1.32.215" at port 7000 in addition +to scraping each unit at port 8000. There is however one difference +between wildcard targets (specified using "*") and fully qualified +targets (such as "10.1.32.215"). The Prometheus charm automatically +associates labels with metrics generated by each target. These labels +localise the source of metrics within the Juju topology by specifying +its "model name", "model UUID", "application name" and "unit +name". However unit name is associated only with wildcard targets but +not with fully qualified targets. + +Multiple jobs with different metrics paths and labels are allowed, but +each job must be given a unique name: + +``` +[ + { + "job_name": "my-first-job", + "metrics_path": "one-path", + "static_configs": [ + { + "targets": ["*:7000"], + "labels": { + "some_key": "some-value" + } + } + ] + }, + { + "job_name": "my-second-job", + "metrics_path": "another-path", + "static_configs": [ + { + "targets": ["*:8000"], + "labels": { + "some_other_key": "some-other-value" + } + } + ] + } +] +``` + +**Important:** `job_name` should be a fixed string (e.g. hardcoded literal). +For instance, if you include variable elements, like your `unit.name`, it may break +the continuity of the metrics time series gathered by Prometheus when the leader unit +changes (e.g. on upgrade or rescale). + +Additionally, it is also technically possible, but **strongly discouraged**, to +configure the following scrape-related settings, which behave as described by the +[Prometheus documentation](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config): + +- `static_configs` +- `scrape_interval` +- `scrape_timeout` +- `proxy_url` +- `relabel_configs` +- `metric_relabel_configs` +- `sample_limit` +- `label_limit` +- `label_name_length_limit` +- `label_value_length_limit` + +The settings above are supported by the `prometheus_scrape` library only for the sake of +specialized facilities like the [Prometheus Scrape Config](https://charmhub.io/prometheus-scrape-config-k8s) +charm. Virtually no charms should use these settings, and charmers definitely **should not** +expose them to the Juju administrator via configuration options. + +## Consumer Library Usage + +The `MetricsEndpointConsumer` object may be used by Prometheus +charms to manage relations with their scrape targets. For this +purposes a Prometheus charm needs to do two things + +1. Instantiate the `MetricsEndpointConsumer` object by providing it a +reference to the parent (Prometheus) charm and optionally the name of +the relation that the Prometheus charm uses to interact with scrape +targets. This relation must confirm to the `prometheus_scrape` +interface and it is strongly recommended that this relation be named +`metrics-endpoint` which is its default value. + +For example a Prometheus charm may instantiate the +`MetricsEndpointConsumer` in its constructor as follows + + from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointConsumer + + def __init__(self, *args): + super().__init__(*args) + ... + self.metrics_consumer = MetricsEndpointConsumer(self) + ... + +2. A Prometheus charm also needs to respond to the +`TargetsChangedEvent` event of the `MetricsEndpointConsumer` by adding itself as +an observer for these events, as in + + self.framework.observe( + self.metrics_consumer.on.targets_changed, + self._on_scrape_targets_changed, + ) + +In responding to the `TargetsChangedEvent` event the Prometheus +charm must update the Prometheus configuration so that any new scrape +targets are added and/or old ones removed from the list of scraped +endpoints. For this purpose the `MetricsEndpointConsumer` object +exposes a `jobs()` method that returns a list of scrape jobs. Each +element of this list is the Prometheus scrape configuration for that +job. In order to update the Prometheus configuration, the Prometheus +charm needs to replace the current list of jobs with the list provided +by `jobs()` as follows + + def _on_scrape_targets_changed(self, event): + ... + scrape_jobs = self.metrics_consumer.jobs() + for job in scrape_jobs: + prometheus_scrape_config.append(job) + ... + +## Alerting Rules + +This charm library also supports gathering alerting rules from all +related `MetricsEndpointProvider` charms and enabling corresponding alerts within the +Prometheus charm. Alert rules are automatically gathered by `MetricsEndpointProvider` +charms when using this library, from a directory conventionally named +`prometheus_alert_rules`. This directory must reside at the top level +in the `src` folder of the consumer charm. Each file in this directory +is assumed to be in one of two formats: +- the official prometheus alert rule format, conforming to the +[Prometheus docs](https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/) +- a single rule format, which is a simplified subset of the official format, +comprising a single alert rule per file, using the same YAML fields. + +The file name must have one of the following extensions: +- `.rule` +- `.rules` +- `.yml` +- `.yaml` + +An example of the contents of such a file in the custom single rule +format is shown below. + +``` +alert: HighRequestLatency +expr: job:request_latency_seconds:mean5m{my_key=my_value} > 0.5 +for: 10m +labels: + severity: Medium + type: HighLatency +annotations: + summary: High request latency for {{ $labels.instance }}. +``` + +The `MetricsEndpointProvider` will read all available alert rules and +also inject "filtering labels" into the alert expressions. The +filtering labels ensure that alert rules are localised to the metrics +provider charm's Juju topology (application, model and its UUID). Such +a topology filter is essential to ensure that alert rules submitted by +one provider charm generates alerts only for that same charm. When +alert rules are embedded in a charm, and the charm is deployed as a +Juju application, the alert rules from that application have their +expressions automatically updated to filter for metrics coming from +the units of that application alone. This remove risk of spurious +evaluation, e.g., when you have multiple deployments of the same charm +monitored by the same Prometheus. + +Not all alerts one may want to specify can be embedded in a +charm. Some alert rules will be specific to a user's use case. This is +the case, for example, of alert rules that are based on business +constraints, like expecting a certain amount of requests to a specific +API every five minutes. Such alert rules can be specified via the +[COS Config Charm](https://charmhub.io/cos-configuration-k8s), +which allows importing alert rules and other settings like dashboards +from a Git repository. + +Gathering alert rules and generating rule files within the Prometheus +charm is easily done using the `alerts()` method of +`MetricsEndpointConsumer`. Alerts generated by Prometheus will +automatically include Juju topology labels in the alerts. These labels +indicate the source of the alert. The following labels are +automatically included with each alert + +- `juju_model` +- `juju_model_uuid` +- `juju_application` + +## Relation Data + +The Prometheus charm uses both application and unit relation data to +obtain information regarding its scrape jobs, alert rules and scrape +targets. This relation data is in JSON format and it closely resembles +the YAML structure of Prometheus [scrape configuration] +(https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config). + +Units of Metrics provider charms advertise their names and addresses +over unit relation data using the `prometheus_scrape_unit_name` and +`prometheus_scrape_unit_address` keys. While the `scrape_metadata`, +`scrape_jobs` and `alert_rules` keys in application relation data +of Metrics provider charms hold eponymous information. + +""" # noqa: W505 + +import copy +import hashlib +import ipaddress +import json +import logging +import os +import platform +import re +import socket +import subprocess +import tempfile +from collections import defaultdict +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from urllib.parse import urlparse + +import yaml +from cosl import JujuTopology +from cosl.rules import AlertRules +from ops.charm import CharmBase, RelationRole +from ops.framework import ( + BoundEvent, + EventBase, + EventSource, + Object, + ObjectEvents, + StoredDict, + StoredList, + StoredState, +) +from ops.model import Relation + +# The unique Charmhub library identifier, never change it +LIBID = "bc84295fef5f4049878f07b131968ee2" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 47 + +PYDEPS = ["cosl"] + +logger = logging.getLogger(__name__) + + +ALLOWED_KEYS = { + "job_name", + "metrics_path", + "static_configs", + "scrape_interval", + "scrape_timeout", + "proxy_url", + "relabel_configs", + "metric_relabel_configs", + "sample_limit", + "label_limit", + "label_name_length_limit", + "label_value_length_limit", + "scheme", + "basic_auth", + "tls_config", + "authorization", + "params", +} +DEFAULT_JOB = { + "metrics_path": "/metrics", + "static_configs": [{"targets": ["*:80"]}], +} + + +DEFAULT_RELATION_NAME = "metrics-endpoint" +RELATION_INTERFACE_NAME = "prometheus_scrape" + +DEFAULT_ALERT_RULES_RELATIVE_PATH = "./src/prometheus_alert_rules" + + +class PrometheusConfig: + """A namespace for utility functions for manipulating the prometheus config dict.""" + + # relabel instance labels so that instance identifiers are globally unique + # stable over unit recreation + topology_relabel_config = { + "source_labels": ["juju_model", "juju_model_uuid", "juju_application"], + "separator": "_", + "target_label": "instance", + "regex": "(.*)", + } + + topology_relabel_config_wildcard = { + "source_labels": ["juju_model", "juju_model_uuid", "juju_application", "juju_unit"], + "separator": "_", + "target_label": "instance", + "regex": "(.*)", + } + + @staticmethod + def sanitize_scrape_config(job: dict) -> dict: + """Restrict permissible scrape configuration options. + + If job is empty then a default job is returned. The + default job is + + ``` + { + "metrics_path": "/metrics", + "static_configs": [{"targets": ["*:80"]}], + } + ``` + + Args: + job: a dict containing a single Prometheus job + specification. + + Returns: + a dictionary containing a sanitized job specification. + """ + sanitized_job = DEFAULT_JOB.copy() + sanitized_job.update({key: value for key, value in job.items() if key in ALLOWED_KEYS}) + return sanitized_job + + @staticmethod + def sanitize_scrape_configs(scrape_configs: List[dict]) -> List[dict]: + """A vectorized version of `sanitize_scrape_config`.""" + return [PrometheusConfig.sanitize_scrape_config(job) for job in scrape_configs] + + @staticmethod + def prefix_job_names(scrape_configs: List[dict], prefix: str) -> List[dict]: + """Adds the given prefix to all the job names in the given scrape_configs list.""" + modified_scrape_configs = [] + for scrape_config in scrape_configs: + job_name = scrape_config.get("job_name") + modified = scrape_config.copy() + modified["job_name"] = prefix + "_" + job_name if job_name else prefix + modified_scrape_configs.append(modified) + + return modified_scrape_configs + + @staticmethod + def expand_wildcard_targets_into_individual_jobs( + scrape_jobs: List[dict], + hosts: Dict[str, Tuple[str, str]], + topology: Optional[JujuTopology] = None, + ) -> List[dict]: + """Extract wildcard hosts from the given scrape_configs list into separate jobs. + + Args: + scrape_jobs: list of scrape jobs. + hosts: a dictionary mapping host names to host address for + all units of the relation for which this job configuration + must be constructed. + topology: optional arg for adding topology labels to scrape targets. + """ + # hosts = self._relation_hosts(relation) + + modified_scrape_jobs = [] + for job in scrape_jobs: + static_configs = job.get("static_configs") + if not static_configs: + continue + + # When a single unit specified more than one wildcard target, then they are expanded + # into a static_config per target + non_wildcard_static_configs = [] + + for static_config in static_configs: + targets = static_config.get("targets") + if not targets: + continue + + # All non-wildcard targets remain in the same static_config + non_wildcard_targets = [] + + # All wildcard targets are extracted to a job per unit. If multiple wildcard + # targets are specified, they remain in the same static_config (per unit). + wildcard_targets = [] + + for target in targets: + match = re.compile(r"\*(?:(:\d+))?").match(target) + if match: + # This is a wildcard target. + # Need to expand into separate jobs and remove it from this job here + wildcard_targets.append(target) + else: + # This is not a wildcard target. Copy it over into its own static_config. + non_wildcard_targets.append(target) + + # All non-wildcard targets remain in the same static_config + if non_wildcard_targets: + non_wildcard_static_config = static_config.copy() + non_wildcard_static_config["targets"] = non_wildcard_targets + + if topology: + # When non-wildcard targets (aka fully qualified hostnames) are specified, + # there is no reliable way to determine the name (Juju topology unit name) + # for such a target. Therefore labeling with Juju topology, excluding the + # unit name. + non_wildcard_static_config["labels"] = { + **topology.label_matcher_dict, + **non_wildcard_static_config.get("labels", {}), + } + + non_wildcard_static_configs.append(non_wildcard_static_config) + + # Extract wildcard targets into individual jobs + if wildcard_targets: + for unit_name, (unit_hostname, unit_path) in hosts.items(): + modified_job = job.copy() + modified_job["static_configs"] = [static_config.copy()] + modified_static_config = modified_job["static_configs"][0] + modified_static_config["targets"] = [ + target.replace("*", unit_hostname) for target in wildcard_targets + ] + + unit_num = unit_name.split("/")[-1] + job_name = modified_job.get("job_name", "unnamed-job") + "-" + unit_num + modified_job["job_name"] = job_name + modified_job["metrics_path"] = unit_path + ( + job.get("metrics_path") or "/metrics" + ) + + if topology: + # Add topology labels + modified_static_config["labels"] = { + **topology.label_matcher_dict, + **{"juju_unit": unit_name}, + **modified_static_config.get("labels", {}), + } + + # Instance relabeling for topology should be last in order. + modified_job["relabel_configs"] = modified_job.get( + "relabel_configs", [] + ) + [PrometheusConfig.topology_relabel_config_wildcard] + + modified_scrape_jobs.append(modified_job) + + if non_wildcard_static_configs: + modified_job = job.copy() + modified_job["static_configs"] = non_wildcard_static_configs + modified_job["metrics_path"] = modified_job.get("metrics_path") or "/metrics" + + if topology: + # Instance relabeling for topology should be last in order. + modified_job["relabel_configs"] = modified_job.get("relabel_configs", []) + [ + PrometheusConfig.topology_relabel_config + ] + + modified_scrape_jobs.append(modified_job) + + return modified_scrape_jobs + + @staticmethod + def render_alertmanager_static_configs(alertmanagers: List[str]): + """Render the alertmanager static_configs section from a list of URLs. + + Each target must be in the hostname:port format, and prefixes are specified in a separate + key. Therefore, with ingress in place, would need to extract the path into the + `path_prefix` key, which is higher up in the config hierarchy. + + https://prometheus.io/docs/prometheus/latest/configuration/configuration/#alertmanager_config + + Args: + alertmanagers: List of alertmanager URLs. + + Returns: + A dict representation for the static_configs section. + """ + # Make sure it's a valid url so urlparse could parse it. + scheme = re.compile(r"^https?://") + sanitized = [am if scheme.search(am) else "http://" + am for am in alertmanagers] + + # Create a mapping from paths to netlocs + # Group alertmanager targets into a dictionary of lists: + # {path: [netloc1, netloc2]} + paths = defaultdict(list) # type: Dict[Tuple[str, str], List[str]] + for parsed in map(urlparse, sanitized): + path = parsed.path or "/" + paths[(parsed.scheme, path)].append(parsed.netloc) + + return { + "alertmanagers": [ + { + # For https we still do not render a `tls_config` section because + # certs are expected to be made available by the charm via the + # `update-ca-certificates` mechanism. + "scheme": scheme, + "path_prefix": path_prefix, + "static_configs": [{"targets": netlocs}], + } + for (scheme, path_prefix), netlocs in paths.items() + ] + } + + +class RelationNotFoundError(Exception): + """Raised if there is no relation with the given name is found.""" + + def __init__(self, relation_name: str): + self.relation_name = relation_name + self.message = "No relation named '{}' found".format(relation_name) + + super().__init__(self.message) + + +class RelationInterfaceMismatchError(Exception): + """Raised if the relation with the given name has a different interface.""" + + def __init__( + self, + relation_name: str, + expected_relation_interface: str, + actual_relation_interface: str, + ): + self.relation_name = relation_name + self.expected_relation_interface = expected_relation_interface + self.actual_relation_interface = actual_relation_interface + self.message = ( + "The '{}' relation has '{}' as interface rather than the expected '{}'".format( + relation_name, actual_relation_interface, expected_relation_interface + ) + ) + + super().__init__(self.message) + + +class RelationRoleMismatchError(Exception): + """Raised if the relation with the given name has a different role.""" + + def __init__( + self, + relation_name: str, + expected_relation_role: RelationRole, + actual_relation_role: RelationRole, + ): + self.relation_name = relation_name + self.expected_relation_interface = expected_relation_role + self.actual_relation_role = actual_relation_role + self.message = "The '{}' relation has role '{}' rather than the expected '{}'".format( + relation_name, repr(actual_relation_role), repr(expected_relation_role) + ) + + super().__init__(self.message) + + +class InvalidAlertRuleEvent(EventBase): + """Event emitted when alert rule files are not parsable. + + Enables us to set a clear status on the provider. + """ + + def __init__(self, handle, errors: str = "", valid: bool = False): + super().__init__(handle) + self.errors = errors + self.valid = valid + + def snapshot(self) -> Dict: + """Save alert rule information.""" + return { + "valid": self.valid, + "errors": self.errors, + } + + def restore(self, snapshot): + """Restore alert rule information.""" + self.valid = snapshot["valid"] + self.errors = snapshot["errors"] + + +class InvalidScrapeJobEvent(EventBase): + """Event emitted when alert rule files are not valid.""" + + def __init__(self, handle, errors: str = ""): + super().__init__(handle) + self.errors = errors + + def snapshot(self) -> Dict: + """Save error information.""" + return {"errors": self.errors} + + def restore(self, snapshot): + """Restore error information.""" + self.errors = snapshot["errors"] + + +class MetricsEndpointProviderEvents(ObjectEvents): + """Events raised by :class:`InvalidAlertRuleEvent`s.""" + + alert_rule_status_changed = EventSource(InvalidAlertRuleEvent) + invalid_scrape_job = EventSource(InvalidScrapeJobEvent) + + +def _type_convert_stored(obj): + """Convert Stored* to their appropriate types, recursively.""" + if isinstance(obj, StoredList): + return list(map(_type_convert_stored, obj)) + if isinstance(obj, StoredDict): + rdict = {} # type: Dict[Any, Any] + for k in obj.keys(): + rdict[k] = _type_convert_stored(obj[k]) + return rdict + return obj + + +def _validate_relation_by_interface_and_direction( + charm: CharmBase, + relation_name: str, + expected_relation_interface: str, + expected_relation_role: RelationRole, +): + """Verifies that a relation has the necessary characteristics. + + Verifies that the `relation_name` provided: (1) exists in metadata.yaml, + (2) declares as interface the interface name passed as `relation_interface` + and (3) has the right "direction", i.e., it is a relation that `charm` + provides or requires. + + Args: + charm: a `CharmBase` object to scan for the matching relation. + relation_name: the name of the relation to be verified. + expected_relation_interface: the interface name to be matched by the + relation named `relation_name`. + expected_relation_role: whether the `relation_name` must be either + provided or required by `charm`. + + Raises: + RelationNotFoundError: If there is no relation in the charm's metadata.yaml + with the same name as provided via `relation_name` argument. + RelationInterfaceMismatchError: The relation with the same name as provided + via `relation_name` argument does not have the same relation interface + as specified via the `expected_relation_interface` argument. + RelationRoleMismatchError: If the relation with the same name as provided + via `relation_name` argument does not have the same role as specified + via the `expected_relation_role` argument. + """ + if relation_name not in charm.meta.relations: + raise RelationNotFoundError(relation_name) + + relation = charm.meta.relations[relation_name] + + actual_relation_interface = relation.interface_name + if actual_relation_interface != expected_relation_interface: + raise RelationInterfaceMismatchError( + relation_name, expected_relation_interface, actual_relation_interface or "None" + ) + + if expected_relation_role == RelationRole.provides: + if relation_name not in charm.meta.provides: + raise RelationRoleMismatchError( + relation_name, RelationRole.provides, RelationRole.requires + ) + elif expected_relation_role == RelationRole.requires: + if relation_name not in charm.meta.requires: + raise RelationRoleMismatchError( + relation_name, RelationRole.requires, RelationRole.provides + ) + else: + raise Exception("Unexpected RelationDirection: {}".format(expected_relation_role)) + + +class InvalidAlertRulePathError(Exception): + """Raised if the alert rules folder cannot be found or is otherwise invalid.""" + + def __init__( + self, + alert_rules_absolute_path: Path, + message: str, + ): + self.alert_rules_absolute_path = alert_rules_absolute_path + self.message = message + + super().__init__(self.message) + + +def _is_official_alert_rule_format(rules_dict: dict) -> bool: + """Are alert rules in the upstream format as supported by Prometheus. + + Alert rules in dictionary format are in "official" form if they + contain a "groups" key, since this implies they contain a list of + alert rule groups. + + Args: + rules_dict: a set of alert rules in Python dictionary format + + Returns: + True if alert rules are in official Prometheus file format. + """ + return "groups" in rules_dict + + +def _is_single_alert_rule_format(rules_dict: dict) -> bool: + """Are alert rules in single rule format. + + The Prometheus charm library supports reading of alert rules in a + custom format that consists of a single alert rule per file. This + does not conform to the official Prometheus alert rule file format + which requires that each alert rules file consists of a list of + alert rule groups and each group consists of a list of alert + rules. + + Alert rules in dictionary form are considered to be in single rule + format if in the least it contains two keys corresponding to the + alert rule name and alert expression. + + Returns: + True if alert rule is in single rule file format. + """ + # one alert rule per file + return set(rules_dict) >= {"alert", "expr"} + + +class TargetsChangedEvent(EventBase): + """Event emitted when Prometheus scrape targets change.""" + + def __init__(self, handle, relation_id): + super().__init__(handle) + self.relation_id = relation_id + + def snapshot(self): + """Save scrape target relation information.""" + return {"relation_id": self.relation_id} + + def restore(self, snapshot): + """Restore scrape target relation information.""" + self.relation_id = snapshot["relation_id"] + + +class MonitoringEvents(ObjectEvents): + """Event descriptor for events raised by `MetricsEndpointConsumer`.""" + + targets_changed = EventSource(TargetsChangedEvent) + + +class MetricsEndpointConsumer(Object): + """A Prometheus based Monitoring service.""" + + on = MonitoringEvents() # pyright: ignore + + def __init__(self, charm: CharmBase, relation_name: str = DEFAULT_RELATION_NAME): + """A Prometheus based Monitoring service. + + Args: + charm: a `CharmBase` instance that manages this + instance of the Prometheus service. + relation_name: an optional string name of the relation between `charm` + and the Prometheus charmed service. The default is "metrics-endpoint". + It is strongly advised not to change the default, so that people + deploying your charm will have a consistent experience with all + other charms that consume metrics endpoints. + + Raises: + RelationNotFoundError: If there is no relation in the charm's metadata.yaml + with the same name as provided via `relation_name` argument. + RelationInterfaceMismatchError: The relation with the same name as provided + via `relation_name` argument does not have the `prometheus_scrape` relation + interface. + RelationRoleMismatchError: If the relation with the same name as provided + via `relation_name` argument does not have the `RelationRole.requires` + role. + """ + _validate_relation_by_interface_and_direction( + charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.requires + ) + + super().__init__(charm, relation_name) + self._charm = charm + self._relation_name = relation_name + self._tool = CosTool(self._charm) + events = self._charm.on[relation_name] + self.framework.observe(events.relation_changed, self._on_metrics_provider_relation_changed) + self.framework.observe( + events.relation_departed, self._on_metrics_provider_relation_departed + ) + + def _on_metrics_provider_relation_changed(self, event): + """Handle changes with related metrics providers. + + Anytime there are changes in relations between Prometheus + and metrics provider charms the Prometheus charm is informed, + through a `TargetsChangedEvent` event. The Prometheus charm can + then choose to update its scrape configuration. + + Args: + event: a `CharmEvent` in response to which the Prometheus + charm must update its scrape configuration. + """ + rel_id = event.relation.id + + self.on.targets_changed.emit(relation_id=rel_id) + + def _on_metrics_provider_relation_departed(self, event): + """Update job config when a metrics provider departs. + + When a metrics provider departs the Prometheus charm is informed + through a `TargetsChangedEvent` event so that it can update its + scrape configuration to ensure that the departed metrics provider + is removed from the list of scrape jobs and + + Args: + event: a `CharmEvent` that indicates a metrics provider + unit has departed. + """ + rel_id = event.relation.id + self.on.targets_changed.emit(relation_id=rel_id) + + def jobs(self) -> list: + """Fetch the list of scrape jobs. + + Returns: + A list consisting of all the static scrape configurations + for each related `MetricsEndpointProvider` that has specified + its scrape targets. + """ + scrape_jobs = [] + + for relation in self._charm.model.relations[self._relation_name]: + static_scrape_jobs = self._static_scrape_config(relation) + if static_scrape_jobs: + # Duplicate job names will cause validate_scrape_jobs to fail. + # Therefore we need to dedupe here and after all jobs are collected. + static_scrape_jobs = _dedupe_job_names(static_scrape_jobs) + try: + self._tool.validate_scrape_jobs(static_scrape_jobs) + except subprocess.CalledProcessError as e: + if self._charm.unit.is_leader(): + data = json.loads(relation.data[self._charm.app].get("event", "{}")) + data["scrape_job_errors"] = str(e) + relation.data[self._charm.app]["event"] = json.dumps(data) + else: + scrape_jobs.extend(static_scrape_jobs) + + scrape_jobs = _dedupe_job_names(scrape_jobs) + + return scrape_jobs + + @property + def alerts(self) -> dict: + """Fetch alerts for all relations. + + A Prometheus alert rules file consists of a list of "groups". Each + group consists of a list of alerts (`rules`) that are sequentially + executed. This method returns all the alert rules provided by each + related metrics provider charm. These rules may be used to generate a + separate alert rules file for each relation since the returned list + of alert groups are indexed by that relations Juju topology identifier. + The Juju topology identifier string includes substrings that identify + alert rule related metadata such as the Juju model, model UUID and the + application name from where the alert rule originates. Since this + topology identifier is globally unique, it may be used for instance as + the name for the file into which the list of alert rule groups are + written. For each relation, the structure of data returned is a dictionary + representation of a standard prometheus rules file: + + {"groups": [{"name": ...}, ...]} + + per official prometheus documentation + https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/ + + The value of the `groups` key is such that it may be used to generate + a Prometheus alert rules file directly using `yaml.dump` but the + `groups` key itself must be included as this is required by Prometheus. + + For example the list of alert rule groups returned by this method may + be written into files consumed by Prometheus as follows + + ``` + for topology_identifier, alert_rule_groups in self.metrics_consumer.alerts().items(): + filename = "juju_" + topology_identifier + ".rules" + path = os.path.join(PROMETHEUS_RULES_DIR, filename) + rules = yaml.safe_dump(alert_rule_groups) + container.push(path, rules, make_dirs=True) + ``` + + Returns: + A dictionary mapping the Juju topology identifier of the source charm to + its list of alert rule groups. + """ + alerts = {} # type: Dict[str, dict] # mapping b/w juju identifiers and alert rule files + for relation in self._charm.model.relations[self._relation_name]: + if not relation.units or not relation.app: + continue + + alert_rules = json.loads(relation.data[relation.app].get("alert_rules", "{}")) + if not alert_rules: + continue + + alert_rules = self._inject_alert_expr_labels(alert_rules) + + identifier, topology = self._get_identifier_by_alert_rules(alert_rules) + if not topology: + try: + scrape_metadata = json.loads(relation.data[relation.app]["scrape_metadata"]) + identifier = JujuTopology.from_dict(scrape_metadata).identifier + + except KeyError as e: + logger.debug( + "Relation %s has no 'scrape_metadata': %s", + relation.id, + e, + ) + + if not identifier: + logger.error( + "Alert rules were found but no usable group or identifier was present." + ) + continue + + # We need to append the relation info to the identifier. This is to allow for cases for there are two + # relations which eventually scrape the same application. Issue #551. + identifier = f"{identifier}_{relation.name}_{relation.id}" + + alerts[identifier] = alert_rules + + _, errmsg = self._tool.validate_alert_rules(alert_rules) + if errmsg: + if alerts[identifier]: + del alerts[identifier] + if self._charm.unit.is_leader(): + data = json.loads(relation.data[self._charm.app].get("event", "{}")) + data["errors"] = errmsg + relation.data[self._charm.app]["event"] = json.dumps(data) + continue + + return alerts + + def _get_identifier_by_alert_rules( + self, rules: dict + ) -> Tuple[Union[str, None], Union[JujuTopology, None]]: + """Determine an appropriate dict key for alert rules. + + The key is used as the filename when writing alerts to disk, so the structure + and uniqueness is important. + + Args: + rules: a dict of alert rules + Returns: + A tuple containing an identifier, if found, and a JujuTopology, if it could + be constructed. + """ + if "groups" not in rules: + logger.debug("No alert groups were found in relation data") + return None, None + + # Construct an ID based on what's in the alert rules if they have labels + for group in rules["groups"]: + try: + labels = group["rules"][0]["labels"] + topology = JujuTopology( + # Don't try to safely get required constructor fields. There's already + # a handler for KeyErrors + model_uuid=labels["juju_model_uuid"], + model=labels["juju_model"], + application=labels["juju_application"], + unit=labels.get("juju_unit", ""), + charm_name=labels.get("juju_charm", ""), + ) + return topology.identifier, topology + except KeyError: + logger.debug("Alert rules were found but no usable labels were present") + continue + + logger.warning( + "No labeled alert rules were found, and no 'scrape_metadata' " + "was available. Using the alert group name as filename." + ) + try: + for group in rules["groups"]: + return group["name"], None + except KeyError: + logger.debug("No group name was found to use as identifier") + + return None, None + + def _inject_alert_expr_labels(self, rules: Dict[str, Any]) -> Dict[str, Any]: + """Iterate through alert rules and inject topology into expressions. + + Args: + rules: a dict of alert rules + """ + if "groups" not in rules: + return rules + + modified_groups = [] + for group in rules["groups"]: + # Copy off rules, so we don't modify an object we're iterating over + rules_copy = group["rules"] + for idx, rule in enumerate(rules_copy): + labels = rule.get("labels") + + if labels: + try: + topology = JujuTopology( + # Don't try to safely get required constructor fields. There's already + # a handler for KeyErrors + model_uuid=labels["juju_model_uuid"], + model=labels["juju_model"], + application=labels["juju_application"], + unit=labels.get("juju_unit", ""), + charm_name=labels.get("juju_charm", ""), + ) + + # Inject topology and put it back in the list + rule["expr"] = self._tool.inject_label_matchers( + re.sub(r"%%juju_topology%%,?", "", rule["expr"]), + topology.alert_expression_dict, + ) + except KeyError: + # Some required JujuTopology key is missing. Just move on. + pass + + group["rules"][idx] = rule + + modified_groups.append(group) + + rules["groups"] = modified_groups + return rules + + def _static_scrape_config(self, relation) -> list: + """Generate the static scrape configuration for a single relation. + + If the relation data includes `scrape_metadata` then the value + of this key is used to annotate the scrape jobs with Juju + Topology labels before returning them. + + Args: + relation: an `ops.model.Relation` object whose static + scrape configuration is required. + + Returns: + A list (possibly empty) of scrape jobs. Each job is a + valid Prometheus scrape configuration for that job, + represented as a Python dictionary. + """ + if not relation.units: + return [] + + scrape_configs = json.loads(relation.data[relation.app].get("scrape_jobs", "[]")) + + if not scrape_configs: + return [] + + scrape_metadata = json.loads(relation.data[relation.app].get("scrape_metadata", "{}")) + + if not scrape_metadata: + return scrape_configs + + topology = JujuTopology.from_dict(scrape_metadata) + + job_name_prefix = "juju_{}_prometheus_scrape".format(topology.identifier) + scrape_configs = PrometheusConfig.prefix_job_names(scrape_configs, job_name_prefix) + scrape_configs = PrometheusConfig.sanitize_scrape_configs(scrape_configs) + + hosts = self._relation_hosts(relation) + + scrape_configs = PrometheusConfig.expand_wildcard_targets_into_individual_jobs( + scrape_configs, hosts, topology + ) + + # For https scrape targets we still do not render a `tls_config` section because certs + # are expected to be made available by the charm via the `update-ca-certificates` mechanism. + return scrape_configs + + def _relation_hosts(self, relation: Relation) -> Dict[str, Tuple[str, str]]: + """Returns a mapping from unit names to (address, path) tuples, for the given relation.""" + hosts = {} + for unit in relation.units: + # TODO deprecate and remove unit.name + unit_name = relation.data[unit].get("prometheus_scrape_unit_name") or unit.name + # TODO deprecate and remove "prometheus_scrape_host" + unit_address = relation.data[unit].get( + "prometheus_scrape_unit_address" + ) or relation.data[unit].get("prometheus_scrape_host") + unit_path = relation.data[unit].get("prometheus_scrape_unit_path", "") + if unit_name and unit_address: + hosts.update({unit_name: (unit_address, unit_path)}) + + return hosts + + def _target_parts(self, target) -> list: + """Extract host and port from a wildcard target. + + Args: + target: a string specifying a scrape target. A + scrape target is expected to have the format + "host:port". The host part may be a wildcard + "*" and the port part can be missing (along + with ":") in which case port is set to 80. + + Returns: + a list with target host and port as in [host, port] + """ + if ":" in target: + parts = target.split(":") + else: + parts = [target, "80"] + + return parts + + +def _dedupe_job_names(jobs: List[dict]): + """Deduplicate a list of dicts by appending a hash to the value of the 'job_name' key. + + Additionally, fully de-duplicate any identical jobs. + + Args: + jobs: A list of prometheus scrape jobs + """ + jobs_copy = copy.deepcopy(jobs) + + # Convert to a dict with job names as keys + # I think this line is O(n^2) but it should be okay given the list sizes + jobs_dict = { + job["job_name"]: list(filter(lambda x: x["job_name"] == job["job_name"], jobs_copy)) + for job in jobs_copy + } + + # If multiple jobs have the same name, convert the name to "name_" + for key in jobs_dict: + if len(jobs_dict[key]) > 1: + for job in jobs_dict[key]: + job_json = json.dumps(job) + hashed = hashlib.sha256(job_json.encode()).hexdigest() + job["job_name"] = "{}_{}".format(job["job_name"], hashed) + new_jobs = [] + for key in jobs_dict: + new_jobs.extend(list(jobs_dict[key])) + + # Deduplicate jobs which are equal + # Again this in O(n^2) but it should be okay + deduped_jobs = [] + seen = [] + for job in new_jobs: + job_json = json.dumps(job) + hashed = hashlib.sha256(job_json.encode()).hexdigest() + if hashed in seen: + continue + seen.append(hashed) + deduped_jobs.append(job) + + return deduped_jobs + + +def _resolve_dir_against_charm_path(charm: CharmBase, *path_elements: str) -> str: + """Resolve the provided path items against the directory of the main file. + + Look up the directory of the `main.py` file being executed. This is normally + going to be the charm.py file of the charm including this library. Then, resolve + the provided path elements and, if the result path exists and is a directory, + return its absolute path; otherwise, raise en exception. + + Raises: + InvalidAlertRulePathError, if the path does not exist or is not a directory. + """ + charm_dir = Path(str(charm.charm_dir)) + if not charm_dir.exists() or not charm_dir.is_dir(): + # Operator Framework does not currently expose a robust + # way to determine the top level charm source directory + # that is consistent across deployed charms and unit tests + # Hence for unit tests the current working directory is used + # TODO: updated this logic when the following ticket is resolved + # https://github.com/canonical/operator/issues/643 + charm_dir = Path(os.getcwd()) + + alerts_dir_path = charm_dir.absolute().joinpath(*path_elements) + + if not alerts_dir_path.exists(): + raise InvalidAlertRulePathError(alerts_dir_path, "directory does not exist") + if not alerts_dir_path.is_dir(): + raise InvalidAlertRulePathError(alerts_dir_path, "is not a directory") + + return str(alerts_dir_path) + + +class MetricsEndpointProvider(Object): + """A metrics endpoint for Prometheus.""" + + on = MetricsEndpointProviderEvents() # pyright: ignore + + def __init__( + self, + charm, + relation_name: str = DEFAULT_RELATION_NAME, + jobs=None, + alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH, + refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None, + external_url: str = "", + lookaside_jobs_callable: Optional[Callable] = None, + ): + """Construct a metrics provider for a Prometheus charm. + + If your charm exposes a Prometheus metrics endpoint, the + `MetricsEndpointProvider` object enables your charm to easily + communicate how to reach that metrics endpoint. + + By default, a charm instantiating this object has the metrics + endpoints of each of its units scraped by the related Prometheus + charms. The scraped metrics are automatically tagged by the + Prometheus charms with Juju topology data via the + `juju_model_name`, `juju_model_uuid`, `juju_application_name` + and `juju_unit` labels. To support such tagging `MetricsEndpointProvider` + automatically forwards scrape metadata to a `MetricsEndpointConsumer` + (Prometheus charm). + + Scrape targets provided by `MetricsEndpointProvider` can be + customized when instantiating this object. For example in the + case of a charm exposing the metrics endpoint for each of its + units on port 8080 and the `/metrics` path, the + `MetricsEndpointProvider` can be instantiated as follows: + + self.metrics_endpoint_provider = MetricsEndpointProvider( + self, + jobs=[{ + "static_configs": [{"targets": ["*:8080"]}], + }]) + + The notation `*:` means "scrape each unit of this charm on port + ``. + + In case the metrics endpoints are not on the standard `/metrics` path, + a custom path can be specified as follows: + + self.metrics_endpoint_provider = MetricsEndpointProvider( + self, + jobs=[{ + "metrics_path": "/my/strange/metrics/path", + "static_configs": [{"targets": ["*:8080"]}], + }]) + + Note how the `jobs` argument is a list: this allows you to expose multiple + combinations of paths "metrics_path" and "static_configs" in case your charm + exposes multiple endpoints, which could happen, for example, when you have + multiple workload containers, with applications in each needing to be scraped. + The structure of the objects in the `jobs` list is one-to-one with the + `scrape_config` configuration item of Prometheus' own configuration (see + https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config + ), but with only a subset of the fields allowed. The permitted fields are + listed in `ALLOWED_KEYS` object in this charm library module. + + It is also possible to specify alert rules. By default, this library will look + into the `/prometheus_alert_rules`, which in a standard charm + layouts resolves to `src/prometheus_alert_rules`. Each alert rule goes into a + separate `*.rule` file. If the syntax of a rule is invalid, + the `MetricsEndpointProvider` logs an error and does not load the particular + rule. + + To avoid false positives and negatives in the evaluation of alert rules, + all ingested alert rule expressions are automatically qualified using Juju + Topology filters. This ensures that alert rules provided by your charm, trigger + alerts based only on data scrapped from your charm. For example an alert rule + such as the following + + alert: UnitUnavailable + expr: up < 1 + for: 0m + + will be automatically transformed into something along the lines of the following + + alert: UnitUnavailable + expr: up{juju_model=, juju_model_uuid=, juju_application=} < 1 + for: 0m + + An attempt will be made to validate alert rules prior to loading them into Prometheus. + If they are invalid, an event will be emitted from this object which charms can respond + to in order to set a meaningful status for administrators. + + This can be observed via `consumer.on.alert_rule_status_changed` which contains: + - The error(s) encountered when validating as `errors` + - A `valid` attribute, which can be used to reset the state of charms if alert rules + are updated via another mechanism (e.g. `cos-config`) and refreshed. + + Args: + charm: a `CharmBase` object that manages this + `MetricsEndpointProvider` object. Typically, this is + `self` in the instantiating class. + relation_name: an optional string name of the relation between `charm` + and the Prometheus charmed service. The default is "metrics-endpoint". + It is strongly advised not to change the default, so that people + deploying your charm will have a consistent experience with all + other charms that provide metrics endpoints. + jobs: an optional list of dictionaries where each + dictionary represents the Prometheus scrape + configuration for a single job. When not provided, a + default scrape configuration is provided for the + `/metrics` endpoint polling all units of the charm on port `80` + using the `MetricsEndpointProvider` object. + alert_rules_path: an optional path for the location of alert rules + files. Defaults to "./prometheus_alert_rules", + resolved relative to the directory hosting the charm entry file. + The alert rules are automatically updated on charm upgrade. + refresh_event: an optional bound event or list of bound events which + will be observed to re-set scrape job data (IP address and others) + external_url: an optional argument that represents an external url that + can be generated by an Ingress or a Proxy. + lookaside_jobs_callable: an optional `Callable` which should be invoked + when the job configuration is built as a secondary mapping. The callable + should return a `List[Dict]` which is syntactically identical to the + `jobs` parameter, but can be updated out of step initialization of + this library without disrupting the 'global' job spec. + + Raises: + RelationNotFoundError: If there is no relation in the charm's metadata.yaml + with the same name as provided via `relation_name` argument. + RelationInterfaceMismatchError: The relation with the same name as provided + via `relation_name` argument does not have the `prometheus_scrape` relation + interface. + RelationRoleMismatchError: If the relation with the same name as provided + via `relation_name` argument does not have the `RelationRole.provides` + role. + """ + _validate_relation_by_interface_and_direction( + charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.provides + ) + + try: + alert_rules_path = _resolve_dir_against_charm_path(charm, alert_rules_path) + except InvalidAlertRulePathError as e: + logger.debug( + "Invalid Prometheus alert rules folder at %s: %s", + e.alert_rules_absolute_path, + e.message, + ) + + super().__init__(charm, relation_name) + self.topology = JujuTopology.from_charm(charm) + + self._charm = charm + self._alert_rules_path = alert_rules_path + self._relation_name = relation_name + # sanitize job configurations to the supported subset of parameters + jobs = [] if jobs is None else jobs + self._jobs = PrometheusConfig.sanitize_scrape_configs(jobs) + + if external_url: + external_url = ( + external_url if urlparse(external_url).scheme else ("http://" + external_url) + ) + self.external_url = external_url + self._lookaside_jobs = lookaside_jobs_callable + + events = self._charm.on[self._relation_name] + self.framework.observe(events.relation_changed, self._on_relation_changed) + + if not refresh_event: + # FIXME remove once podspec charms are verified. + # `self.set_scrape_job_spec()` is called every re-init so this should not be needed. + if len(self._charm.meta.containers) == 1: + if "kubernetes" in self._charm.meta.series: + # This is a podspec charm + refresh_event = [self._charm.on.update_status] + else: + # This is a sidecar/pebble charm + container = list(self._charm.meta.containers.values())[0] + refresh_event = [self._charm.on[container.name.replace("-", "_")].pebble_ready] + else: + logger.warning( + "%d containers are present in metadata.yaml and " + "refresh_event was not specified. Defaulting to update_status. " + "Metrics IP may not be set in a timely fashion.", + len(self._charm.meta.containers), + ) + refresh_event = [self._charm.on.update_status] + + else: + if not isinstance(refresh_event, list): + refresh_event = [refresh_event] + + self.framework.observe(events.relation_joined, self.set_scrape_job_spec) + for ev in refresh_event: + self.framework.observe(ev, self.set_scrape_job_spec) + + def _on_relation_changed(self, event): + """Check for alert rule messages in the relation data before moving on.""" + if self._charm.unit.is_leader(): + ev = json.loads(event.relation.data[event.app].get("event", "{}")) + + if ev: + valid = bool(ev.get("valid", True)) + errors = ev.get("errors", "") + + if valid and not errors: + self.on.alert_rule_status_changed.emit(valid=valid) + else: + self.on.alert_rule_status_changed.emit(valid=valid, errors=errors) + + scrape_errors = ev.get("scrape_job_errors", None) + if scrape_errors: + self.on.invalid_scrape_job.emit(errors=scrape_errors) + + def update_scrape_job_spec(self, jobs): + """Update scrape job specification.""" + self._jobs = PrometheusConfig.sanitize_scrape_configs(jobs) + self.set_scrape_job_spec() + + def set_scrape_job_spec(self, _=None): + """Ensure scrape target information is made available to prometheus. + + When a metrics provider charm is related to a prometheus charm, the + metrics provider sets specification and metadata related to its own + scrape configuration. This information is set using Juju application + data. In addition, each of the consumer units also sets its own + host address in Juju unit relation data. + """ + self._set_unit_ip() + + if not self._charm.unit.is_leader(): + return + + alert_rules = AlertRules(query_type="promql", topology=self.topology) + alert_rules.add_path(self._alert_rules_path, recursive=True) + alert_rules_as_dict = alert_rules.as_dict() + + for relation in self._charm.model.relations[self._relation_name]: + relation.data[self._charm.app]["scrape_metadata"] = json.dumps(self._scrape_metadata) + relation.data[self._charm.app]["scrape_jobs"] = json.dumps(self._scrape_jobs) + + # Update relation data with the string representation of the rule file. + # Juju topology is already included in the "scrape_metadata" field above. + # The consumer side of the relation uses this information to name the rules file + # that is written to the filesystem. + relation.data[self._charm.app]["alert_rules"] = json.dumps(alert_rules_as_dict) + + def _set_unit_ip(self, _=None): + """Set unit host address. + + Each time a metrics provider charm container is restarted it updates its own + host address in the unit relation data for the prometheus charm. + + The only argument specified is an event, and it ignored. This is for expediency + to be able to use this method as an event handler, although no access to the + event is actually needed. + """ + for relation in self._charm.model.relations[self._relation_name]: + unit_ip = str(self._charm.model.get_binding(relation).network.bind_address) + + # TODO store entire url in relation data, instead of only select url parts. + + if self.external_url: + parsed = urlparse(self.external_url) + unit_address = parsed.hostname + path = parsed.path + elif self._is_valid_unit_address(unit_ip): + unit_address = unit_ip + path = "" + else: + unit_address = socket.getfqdn() + path = "" + + relation.data[self._charm.unit]["prometheus_scrape_unit_address"] = unit_address + relation.data[self._charm.unit]["prometheus_scrape_unit_path"] = path + relation.data[self._charm.unit]["prometheus_scrape_unit_name"] = str( + self._charm.model.unit.name + ) + + def _is_valid_unit_address(self, address: str) -> bool: + """Validate a unit address. + + At present only IP address validation is supported, but + this may be extended to DNS addresses also, as needed. + + Args: + address: a string representing a unit address + """ + try: + _ = ipaddress.ip_address(address) + except ValueError: + return False + + return True + + @property + def _scrape_jobs(self) -> list: + """Fetch list of scrape jobs. + + Returns: + A list of dictionaries, where each dictionary specifies a + single scrape job for Prometheus. + """ + jobs = self._jobs or [] + if callable(self._lookaside_jobs): + jobs.extend(PrometheusConfig.sanitize_scrape_configs(self._lookaside_jobs())) + return jobs or [DEFAULT_JOB] + + @property + def _scrape_metadata(self) -> dict: + """Generate scrape metadata. + + Returns: + Scrape configuration metadata for this metrics provider charm. + """ + return self.topology.as_dict() + + +class PrometheusRulesProvider(Object): + """Forward rules to Prometheus. + + This object may be used to forward rules to Prometheus. At present it only supports + forwarding alert rules. This is unlike :class:`MetricsEndpointProvider`, which + is used for forwarding both scrape targets and associated alert rules. This object + is typically used when there is a desire to forward rules that apply globally (across + all deployed charms and units) rather than to a single charm. All rule files are + forwarded using the same 'prometheus_scrape' interface that is also used by + `MetricsEndpointProvider`. + + Args: + charm: A charm instance that `provides` a relation with the `prometheus_scrape` interface. + relation_name: Name of the relation in `metadata.yaml` that + has the `prometheus_scrape` interface. + dir_path: Root directory for the collection of rule files. + recursive: Whether to scan for rule files recursively. + """ + + def __init__( + self, + charm: CharmBase, + relation_name: str = DEFAULT_RELATION_NAME, + dir_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH, + recursive=True, + ): + super().__init__(charm, relation_name) + self._charm = charm + self._relation_name = relation_name + self._recursive = recursive + + try: + dir_path = _resolve_dir_against_charm_path(charm, dir_path) + except InvalidAlertRulePathError as e: + logger.debug( + "Invalid Prometheus alert rules folder at %s: %s", + e.alert_rules_absolute_path, + e.message, + ) + self.dir_path = dir_path + + events = self._charm.on[self._relation_name] + event_sources = [ + events.relation_joined, + events.relation_changed, + self._charm.on.leader_elected, + self._charm.on.upgrade_charm, + ] + + for event_source in event_sources: + self.framework.observe(event_source, self._update_relation_data) + + def _reinitialize_alert_rules(self): + """Reloads alert rules and updates all relations.""" + self._update_relation_data(None) + + def _update_relation_data(self, _): + """Update application relation data with alert rules for all relations.""" + if not self._charm.unit.is_leader(): + return + + alert_rules = AlertRules(query_type="promql") + alert_rules.add_path(self.dir_path, recursive=self._recursive) + alert_rules_as_dict = alert_rules.as_dict() + + logger.info("Updating relation data with rule files from disk") + for relation in self._charm.model.relations[self._relation_name]: + relation.data[self._charm.app]["alert_rules"] = json.dumps( + alert_rules_as_dict, + sort_keys=True, # sort, to prevent unnecessary relation_changed events + ) + + +class MetricsEndpointAggregator(Object): + """Aggregate metrics from multiple scrape targets. + + `MetricsEndpointAggregator` collects scrape target information from one + or more related charms and forwards this to a `MetricsEndpointConsumer` + charm, which may be in a different Juju model. However, it is + essential that `MetricsEndpointAggregator` itself resides in the same + model as its scrape targets, as this is currently the only way to + ensure in Juju that the `MetricsEndpointAggregator` will be able to + determine the model name and uuid of the scrape targets. + + `MetricsEndpointAggregator` should be used in place of + `MetricsEndpointProvider` in the following two use cases: + + 1. Integrating one or more scrape targets that do not support the + `prometheus_scrape` interface. + + 2. Integrating one or more scrape targets through cross model + relations. Although the [Scrape Config Operator](https://charmhub.io/cos-configuration-k8s) + may also be used for the purpose of supporting cross model + relations. + + Using `MetricsEndpointAggregator` to build a Prometheus charm client + only requires instantiating it. Instantiating + `MetricsEndpointAggregator` is similar to `MetricsEndpointProvider` except + that it requires specifying the names of three relations: the + relation with scrape targets, the relation for alert rules, and + that with the Prometheus charms. For example + + ```python + self._aggregator = MetricsEndpointAggregator( + self, + { + "prometheus": "monitoring", + "scrape_target": "prometheus-target", + "alert_rules": "prometheus-rules" + } + ) + ``` + + `MetricsEndpointAggregator` assumes that each unit of a scrape target + sets in its unit-level relation data two entries with keys + "hostname" and "port". If it is required to integrate with charms + that do not honor these assumptions, it is always possible to + derive from `MetricsEndpointAggregator` overriding the `_get_targets()` + method, which is responsible for aggregating the unit name, host + address ("hostname") and port of the scrape target. + `MetricsEndpointAggregator` also assumes that each unit of a + scrape target sets in its unit-level relation data a key named + "groups". The value of this key is expected to be the string + representation of list of Prometheus Alert rules in YAML format. + An example of a single such alert rule is + + ```yaml + - alert: HighRequestLatency + expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5 + for: 10m + labels: + severity: page + annotations: + summary: High request latency + ``` + + Once again if it is required to integrate with charms that do not + honour these assumptions about alert rules then an object derived + from `MetricsEndpointAggregator` may be used by overriding the + `_get_alert_rules()` method. + + `MetricsEndpointAggregator` ensures that Prometheus scrape job + specifications and alert rules are annotated with Juju topology + information, just like `MetricsEndpointProvider` and + `MetricsEndpointConsumer` do. + + By default, `MetricsEndpointAggregator` ensures that Prometheus + "instance" labels refer to Juju topology. This ensures that + instance labels are stable over unit recreation. While it is not + advisable to change this option, if required it can be done by + setting the "relabel_instance" keyword argument to `False` when + constructing an aggregator object. + """ + + _stored = StoredState() + + def __init__( + self, + charm, + relation_names: Optional[dict] = None, + relabel_instance=True, + resolve_addresses=False, + ): + """Construct a `MetricsEndpointAggregator`. + + Args: + charm: a `CharmBase` object that manages this + `MetricsEndpointAggregator` object. Typically, this is + `self` in the instantiating class. + relation_names: a dictionary with three keys. The value + of the "scrape_target" and "alert_rules" keys are + the relation names over which scrape job and alert rule + information is gathered by this `MetricsEndpointAggregator`. + And the value of the "prometheus" key is the name of + the relation with a `MetricsEndpointConsumer` such as + the Prometheus charm. + relabel_instance: A boolean flag indicating if Prometheus + scrape job "instance" labels must refer to Juju Topology. + resolve_addresses: A boolean flag indiccating if the aggregator + should attempt to perform DNS lookups of targets and append + a `dns_name` label + """ + self._charm = charm + + relation_names = relation_names or {} + + self._prometheus_relation = relation_names.get( + "prometheus", "downstream-prometheus-scrape" + ) + self._target_relation = relation_names.get("scrape_target", "prometheus-target") + self._alert_rules_relation = relation_names.get("alert_rules", "prometheus-rules") + + super().__init__(charm, self._prometheus_relation) + self._stored.set_default(jobs=[], alert_rules=[]) + + self._relabel_instance = relabel_instance + self._resolve_addresses = resolve_addresses + + # manage Prometheus charm relation events + prometheus_events = self._charm.on[self._prometheus_relation] + self.framework.observe(prometheus_events.relation_joined, self._set_prometheus_data) + + # manage list of Prometheus scrape jobs from related scrape targets + target_events = self._charm.on[self._target_relation] + self.framework.observe(target_events.relation_changed, self._on_prometheus_targets_changed) + self.framework.observe( + target_events.relation_departed, self._on_prometheus_targets_departed + ) + + # manage alert rules for Prometheus from related scrape targets + alert_rule_events = self._charm.on[self._alert_rules_relation] + self.framework.observe(alert_rule_events.relation_changed, self._on_alert_rules_changed) + self.framework.observe(alert_rule_events.relation_departed, self._on_alert_rules_departed) + + def _set_prometheus_data(self, event): + """Ensure every new Prometheus instances is updated. + + Any time a new Prometheus unit joins the relation with + `MetricsEndpointAggregator`, that Prometheus unit is provided + with the complete set of existing scrape jobs and alert rules. + """ + if not self._charm.unit.is_leader(): + return + + jobs = [] + _type_convert_stored( + self._stored.jobs # pyright: ignore + ) # list of scrape jobs, one per relation + for relation in self.model.relations[self._target_relation]: + targets = self._get_targets(relation) + if targets and relation.app: + jobs.append(self._static_scrape_job(targets, relation.app.name)) + + groups = [] + _type_convert_stored( + self._stored.alert_rules # pyright: ignore + ) # list of alert rule groups + for relation in self.model.relations[self._alert_rules_relation]: + unit_rules = self._get_alert_rules(relation) + if unit_rules and relation.app: + appname = relation.app.name + rules = self._label_alert_rules(unit_rules, appname) + group = {"name": self.group_name(appname), "rules": rules} + groups.append(group) + + event.relation.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs) + event.relation.data[self._charm.app]["alert_rules"] = json.dumps({"groups": groups}) + + def _on_prometheus_targets_changed(self, event): + """Update scrape jobs in response to scrape target changes. + + When there is any change in relation data with any scrape + target, the Prometheus scrape job, for that specific target is + updated. + """ + targets = self._get_targets(event.relation) + if not targets: + return + + # new scrape job for the relation that has changed + self.set_target_job_data(targets, event.relation.app.name) + + def set_target_job_data(self, targets: dict, app_name: str, **kwargs) -> None: + """Update scrape jobs in response to scrape target changes. + + When there is any change in relation data with any scrape + target, the Prometheus scrape job, for that specific target is + updated. Additionally, if this method is called manually, do the + same. + + Args: + targets: a `dict` containing target information + app_name: a `str` identifying the application + kwargs: a `dict` of the extra arguments passed to the function + """ + if not self._charm.unit.is_leader(): + return + + # new scrape job for the relation that has changed + updated_job = self._static_scrape_job(targets, app_name, **kwargs) + + for relation in self.model.relations[self._prometheus_relation]: + jobs = json.loads(relation.data[self._charm.app].get("scrape_jobs", "[]")) + # list of scrape jobs that have not changed + jobs = [job for job in jobs if updated_job["job_name"] != job["job_name"]] + jobs.append(updated_job) + relation.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs) + + if not _type_convert_stored(self._stored.jobs) == jobs: # pyright: ignore + self._stored.jobs = jobs + + def _on_prometheus_targets_departed(self, event): + """Remove scrape jobs when a target departs. + + Any time a scrape target departs, any Prometheus scrape job + associated with that specific scrape target is removed. + """ + job_name = self._job_name(event.relation.app.name) + unit_name = event.unit.name + self.remove_prometheus_jobs(job_name, unit_name) + + def remove_prometheus_jobs(self, job_name: str, unit_name: Optional[str] = ""): + """Given a job name and unit name, remove scrape jobs associated. + + The `unit_name` parameter is used for automatic, relation data bag-based + generation, where the unit name in labels can be used to ensure that jobs with + similar names (which are generated via the app name when scanning relation data + bags) are not accidentally removed, as their unit name labels will differ. + For NRPE, the job name is calculated from an ID sent via the NRPE relation, and is + sufficient to uniquely identify the target. + """ + if not self._charm.unit.is_leader(): + return + + for relation in self.model.relations[self._prometheus_relation]: + jobs = json.loads(relation.data[self._charm.app].get("scrape_jobs", "[]")) + if not jobs: + continue + + changed_job = [j for j in jobs if j.get("job_name") == job_name] + if not changed_job: + continue + changed_job = changed_job[0] + + # list of scrape jobs that have not changed + jobs = [job for job in jobs if job.get("job_name") != job_name] + + # list of scrape jobs for units of the same application that still exist + configs_kept = [ + config + for config in changed_job["static_configs"] # type: ignore + if config.get("labels", {}).get("juju_unit") != unit_name + ] + + if configs_kept: + changed_job["static_configs"] = configs_kept # type: ignore + jobs.append(changed_job) + + relation.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs) + + if not _type_convert_stored(self._stored.jobs) == jobs: # pyright: ignore + self._stored.jobs = jobs + + def _job_name(self, appname) -> str: + """Construct a scrape job name. + + Each relation has its own unique scrape job name. All units in + the relation are scraped as part of the same scrape job. + + Args: + appname: string name of a related application. + + Returns: + a string Prometheus scrape job name for the application. + """ + return "juju_{}_{}_{}_prometheus_scrape".format( + self.model.name, self.model.uuid[:7], appname + ) + + def _get_targets(self, relation) -> dict: + """Fetch scrape targets for a relation. + + Scrape target information is returned for each unit in the + relation. This information contains the unit name, network + hostname (or address) for that unit, and port on which a + metrics endpoint is exposed in that unit. + + Args: + relation: an `ops.model.Relation` object for which scrape + targets are required. + + Returns: + a dictionary whose keys are names of the units in the + relation. There values associated with each key is itself + a dictionary of the form + ``` + {"hostname": hostname, "port": port} + ``` + """ + targets = {} + for unit in relation.units: + port = relation.data[unit].get("port", 80) + hostname = relation.data[unit].get("hostname") + if hostname: + targets.update({unit.name: {"hostname": hostname, "port": port}}) + + return targets + + def _static_scrape_job(self, targets, application_name, **kwargs) -> dict: + """Construct a static scrape job for an application. + + Args: + targets: a dictionary providing hostname and port for all + scrape target. The keys of this dictionary are unit + names. Values corresponding to these keys are + themselves a dictionary with keys "hostname" and + "port". + application_name: a string name of the application for + which this static scrape job is being constructed. + kwargs: a `dict` of the extra arguments passed to the function + + Returns: + A dictionary corresponding to a Prometheus static scrape + job configuration for one application. The returned + dictionary may be transformed into YAML and appended to + the list of any existing list of Prometheus static configs. + """ + juju_model = self.model.name + juju_model_uuid = self.model.uuid + + job = { + "job_name": self._job_name(application_name), + "static_configs": [ + { + "targets": ["{}:{}".format(target["hostname"], target["port"])], + "labels": { + "juju_model": juju_model, + "juju_model_uuid": juju_model_uuid, + "juju_application": application_name, + "juju_unit": unit_name, + "host": target["hostname"], + # Expanding this will merge the dicts and replace the + # topology labels if any were present/found + **self._static_config_extra_labels(target), + }, + } + for unit_name, target in targets.items() + ], + "relabel_configs": self._relabel_configs + kwargs.get("relabel_configs", []), + } + job.update(kwargs.get("updates", {})) + + return job + + def _static_config_extra_labels(self, target: Dict[str, str]) -> Dict[str, str]: + """Build a list of extra static config parameters, if specified.""" + extra_info = {} + + if self._resolve_addresses: + try: + dns_name = socket.gethostbyaddr(target["hostname"])[0] + except OSError: + logger.debug("Could not perform DNS lookup for %s", target["hostname"]) + dns_name = target["hostname"] + extra_info["dns_name"] = dns_name + + return extra_info + + @property + def _relabel_configs(self) -> list: + """Create Juju topology relabeling configuration. + + Using Juju topology for instance labels ensures that these + labels are stable across unit recreation. + + Returns: + a list of Prometheus relabeling configurations. Each item in + this list is one relabel configuration. + """ + return ( + [ + { + "source_labels": [ + "juju_model", + "juju_model_uuid", + "juju_application", + "juju_unit", + ], + "separator": "_", + "target_label": "instance", + "regex": "(.*)", + } + ] + if self._relabel_instance + else [] + ) + + def _on_alert_rules_changed(self, event): + """Update alert rules in response to scrape target changes. + + When there is any change in alert rule relation data for any + scrape target, the list of alert rules for that specific + target is updated. + """ + unit_rules = self._get_alert_rules(event.relation) + if not unit_rules: + return + + app_name = event.relation.app.name + self.set_alert_rule_data(app_name, unit_rules) + + def set_alert_rule_data(self, name: str, unit_rules: dict, label_rules: bool = True) -> None: + """Update alert rule data. + + The unit rules should be a dict, which is has additional Juju topology labels added. For + rules generated by the NRPE exporter, they are pre-labeled so lookups can be performed. + """ + if not self._charm.unit.is_leader(): + return + + if label_rules: + rules = self._label_alert_rules(unit_rules, name) + else: + rules = [unit_rules] + updated_group = {"name": self.group_name(name), "rules": rules} + + for relation in self.model.relations[self._prometheus_relation]: + alert_rules = json.loads(relation.data[self._charm.app].get("alert_rules", "{}")) + groups = alert_rules.get("groups", []) + # list of alert rule groups that have not changed + for group in groups: + if group["name"] == updated_group["name"]: + group["rules"] = [r for r in group["rules"] if r not in updated_group["rules"]] + group["rules"].extend(updated_group["rules"]) + + if updated_group["name"] not in [g["name"] for g in groups]: + groups.append(updated_group) + relation.data[self._charm.app]["alert_rules"] = json.dumps({"groups": groups}) + + if not _type_convert_stored(self._stored.alert_rules) == groups: # pyright: ignore + self._stored.alert_rules = groups + + def _on_alert_rules_departed(self, event): + """Remove alert rules for departed targets. + + Any time a scrape target departs any alert rules associated + with that specific scrape target is removed. + """ + group_name = self.group_name(event.relation.app.name) + unit_name = event.unit.name + self.remove_alert_rules(group_name, unit_name) + + def remove_alert_rules(self, group_name: str, unit_name: str) -> None: + """Remove an alert rule group from relation data.""" + if not self._charm.unit.is_leader(): + return + + for relation in self.model.relations[self._prometheus_relation]: + alert_rules = json.loads(relation.data[self._charm.app].get("alert_rules", "{}")) + if not alert_rules: + continue + + groups = alert_rules.get("groups", []) + if not groups: + continue + + changed_group = [group for group in groups if group["name"] == group_name] + if not changed_group: + continue + changed_group = changed_group[0] + + # list of alert rule groups that have not changed + groups = [group for group in groups if group["name"] != group_name] + + # list of alert rules not associated with departing unit + rules_kept = [ + rule + for rule in changed_group.get("rules") # type: ignore + if rule.get("labels").get("juju_unit") != unit_name + ] + + if rules_kept: + changed_group["rules"] = rules_kept # type: ignore + groups.append(changed_group) + + relation.data[self._charm.app]["alert_rules"] = ( + json.dumps({"groups": groups}) if groups else "{}" + ) + + if not _type_convert_stored(self._stored.alert_rules) == groups: # pyright: ignore + self._stored.alert_rules = groups + + def _get_alert_rules(self, relation) -> dict: + """Fetch alert rules for a relation. + + Each unit of the related scrape target may have its own + associated alert rules. Alert rules for all units are returned + indexed by unit name. + + Args: + relation: an `ops.model.Relation` object for which alert + rules are required. + + Returns: + a dictionary whose keys are names of the units in the + relation. There values associated with each key is a list + of alert rules. Each rule is in dictionary format. The + structure "rule dictionary" corresponds to single + Prometheus alert rule. + """ + rules = {} + for unit in relation.units: + unit_rules = yaml.safe_load(relation.data[unit].get("groups", "")) + if unit_rules: + rules.update({unit.name: unit_rules}) + + return rules + + def group_name(self, unit_name: str) -> str: + """Construct name for an alert rule group. + + Each unit in a relation may define its own alert rules. All + rules, for all units in a relation are grouped together and + given a single alert rule group name. + + Args: + unit_name: string name of a related application. + + Returns: + a string Prometheus alert rules group name for the unit. + """ + unit_name = re.sub(r"/", "_", unit_name) + return "juju_{}_{}_{}_alert_rules".format(self.model.name, self.model.uuid[:7], unit_name) + + def _label_alert_rules(self, unit_rules, app_name: str) -> list: + """Apply juju topology labels to alert rules. + + Args: + unit_rules: a list of alert rules, where each rule is in + dictionary format. + app_name: a string name of the application to which the + alert rules belong. + + Returns: + a list of alert rules with Juju topology labels. + """ + labeled_rules = [] + for unit_name, rules in unit_rules.items(): + for rule in rules: + # the new JujuTopology removed this, so build it up by hand + matchers = { + "juju_{}".format(k): v + for k, v in JujuTopology(self.model.name, self.model.uuid, app_name, unit_name) + .as_dict(excluded_keys=["charm_name"]) + .items() + } + rule["labels"].update(matchers.items()) + labeled_rules.append(rule) + + return labeled_rules + + +class CosTool: + """Uses cos-tool to inject label matchers into alert rule expressions and validate rules.""" + + _path = None + _disabled = False + + def __init__(self, charm): + self._charm = charm + + @property + def path(self): + """Lazy lookup of the path of cos-tool.""" + if self._disabled: + return None + if not self._path: + self._path = self._get_tool_path() + if not self._path: + logger.debug("Skipping injection of juju topology as label matchers") + self._disabled = True + return self._path + + def apply_label_matchers(self, rules) -> dict: + """Will apply label matchers to the expression of all alerts in all supplied groups.""" + if not self.path: + return rules + for group in rules["groups"]: + rules_in_group = group.get("rules", []) + for rule in rules_in_group: + topology = {} + # if the user for some reason has provided juju_unit, we'll need to honor it + # in most cases, however, this will be empty + for label in [ + "juju_model", + "juju_model_uuid", + "juju_application", + "juju_charm", + "juju_unit", + ]: + if label in rule["labels"]: + topology[label] = rule["labels"][label] + + rule["expr"] = self.inject_label_matchers(rule["expr"], topology) + return rules + + def validate_alert_rules(self, rules: dict) -> Tuple[bool, str]: + """Will validate correctness of alert rules, returning a boolean and any errors.""" + if not self.path: + logger.debug("`cos-tool` unavailable. Not validating alert correctness.") + return True, "" + + with tempfile.TemporaryDirectory() as tmpdir: + rule_path = Path(tmpdir + "/validate_rule.yaml") + rule_path.write_text(yaml.dump(rules)) + + args = [str(self.path), "validate", str(rule_path)] + # noinspection PyBroadException + try: + self._exec(args) + return True, "" + except subprocess.CalledProcessError as e: + logger.debug("Validating the rules failed: %s", e.output) + return False, ", ".join( + [ + line + for line in e.output.decode("utf8").splitlines() + if "error validating" in line + ] + ) + + def validate_scrape_jobs(self, jobs: list) -> bool: + """Validate scrape jobs using cos-tool.""" + if not self.path: + logger.debug("`cos-tool` unavailable. Not validating scrape jobs.") + return True + conf = {"scrape_configs": jobs} + with tempfile.NamedTemporaryFile() as tmpfile: + with open(tmpfile.name, "w") as f: + f.write(yaml.safe_dump(conf)) + try: + self._exec([str(self.path), "validate-config", tmpfile.name]) + except subprocess.CalledProcessError as e: + logger.error("Validating scrape jobs failed: {}".format(e.output)) + raise + return True + + def inject_label_matchers(self, expression, topology) -> str: + """Add label matchers to an expression.""" + if not topology: + return expression + if not self.path: + logger.debug("`cos-tool` unavailable. Leaving expression unchanged: %s", expression) + return expression + args = [str(self.path), "transform"] + args.extend( + ["--label-matcher={}={}".format(key, value) for key, value in topology.items()] + ) + + args.extend(["{}".format(expression)]) + # noinspection PyBroadException + try: + return self._exec(args) + except subprocess.CalledProcessError as e: + logger.debug('Applying the expression failed: "%s", falling back to the original', e) + return expression + + def _get_tool_path(self) -> Optional[Path]: + arch = platform.machine() + arch = "amd64" if arch == "x86_64" else arch + res = "cos-tool-{}".format(arch) + try: + path = Path(res).resolve() + path.chmod(0o777) + return path + except NotImplementedError: + logger.debug("System lacks support for chmod") + except FileNotFoundError: + logger.debug('Could not locate cos-tool at: "{}"'.format(res)) + return None + + def _exec(self, cmd) -> str: + result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + return result.stdout.decode("utf-8").strip() diff --git a/charms/kserve-controller/metadata.yaml b/charms/kserve-controller/metadata.yaml index 1571ea1..3097ded 100644 --- a/charms/kserve-controller/metadata.yaml +++ b/charms/kserve-controller/metadata.yaml @@ -23,6 +23,9 @@ resources: type: oci-image description: OCI image for kube rbac proxy upstream-source: gcr.io/kubebuilder/kube-rbac-proxy:v0.13.1 +provides: + metrics-endpoint: + interface: prometheus_scrape requires: object-storage: interface: object-storage diff --git a/charms/kserve-controller/src/charm.py b/charms/kserve-controller/src/charm.py index e974574..6d2065f 100755 --- a/charms/kserve-controller/src/charm.py +++ b/charms/kserve-controller/src/charm.py @@ -29,6 +29,8 @@ GatewayRequirer, ) from charms.loki_k8s.v1.loki_push_api import LogForwarder +from charms.observability_libs.v1.kubernetes_service_patch import KubernetesServicePatch +from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider from charms.resource_dispatcher.v0.kubernetes_manifests import ( KubernetesManifest, KubernetesManifestRequirerWrapper, @@ -36,6 +38,7 @@ from jinja2 import Template from jsonschema import ValidationError from lightkube import ApiError +from lightkube.models.core_v1 import ServicePort from ops.charm import CharmBase from ops.framework import StoredState from ops.main import main @@ -89,6 +92,8 @@ ] NO_MINIO_RELATION_DATA = {} +METRICS_PORT = 8080 + def parse_images_config(config: str) -> Dict: """ @@ -166,6 +171,18 @@ def __init__(self, *args): self._logging = LogForwarder(charm=self) + # metrics relation configuration + metrics_port = ServicePort( + port=METRICS_PORT, targetPort=METRICS_PORT, name=f"{self.app.name}-metrics" + ) + self.service_patcher = KubernetesServicePatch( + self, [metrics_port], service_name=f"{self.model.app.name}" + ) + self.prometheus_provider = MetricsEndpointProvider( + self, + jobs=[{"static_configs": [{"targets": [f"*:{METRICS_PORT}"]}]}], + ) + @property def _context(self): """Returns a dictionary containing context to be used for rendering.""" @@ -233,7 +250,7 @@ def _controller_pebble_layer(self): self._controller_container_name: { "override": "replace", "summary": "KServe Controller", - "command": "/manager --metrics-addr=:8080", + "command": f"/manager --metrics-addr=:{METRICS_PORT}", "startup": "enabled", "environment": { "POD_NAMESPACE": self.model.name, @@ -253,7 +270,7 @@ def _rbac_proxy_pebble_layer(self): self._rbac_proxy_container_name: { "override": "replace", "summary": "Kube Rbac Proxy", - "command": "/usr/local/bin/kube-rbac-proxy --secure-listen-address=0.0.0.0:8443 --upstream=http://127.0.0.1:8080 --logtostderr=true --v=10", # noqa E501 + "command": f"/usr/local/bin/kube-rbac-proxy --secure-listen-address=0.0.0.0:8443 --upstream=http://127.0.0.1:{METRICS_PORT} --logtostderr=true --v=10", # noqa E501 "startup": "enabled", } } diff --git a/charms/kserve-controller/src/templates/configmap_manifests.yaml.j2 b/charms/kserve-controller/src/templates/configmap_manifests.yaml.j2 index 8a562d8..dc26b72 100644 --- a/charms/kserve-controller/src/templates/configmap_manifests.yaml.j2 +++ b/charms/kserve-controller/src/templates/configmap_manifests.yaml.j2 @@ -72,8 +72,8 @@ data: } metricsAggregator: |- { - "enableMetricAggregation": "false", - "enablePrometheusScraping" : "false" + "enableMetricAggregation": "true", + "enablePrometheusScraping" : "true" } router: |- { diff --git a/charms/kserve-controller/tests/integration/config-map-data-changed.yaml b/charms/kserve-controller/tests/integration/config-map-data-changed.yaml index 46dfe2a..f834660 100644 --- a/charms/kserve-controller/tests/integration/config-map-data-changed.yaml +++ b/charms/kserve-controller/tests/integration/config-map-data-changed.yaml @@ -68,8 +68,8 @@ logger: |- } metricsAggregator: |- { - "enableMetricAggregation": "false", - "enablePrometheusScraping" : "false" + "enableMetricAggregation": "true", + "enablePrometheusScraping" : "true" } router: |- { diff --git a/charms/kserve-controller/tests/integration/config-map-data.yaml b/charms/kserve-controller/tests/integration/config-map-data.yaml index 2e9c751..614adc9 100644 --- a/charms/kserve-controller/tests/integration/config-map-data.yaml +++ b/charms/kserve-controller/tests/integration/config-map-data.yaml @@ -68,8 +68,8 @@ logger: |- } metricsAggregator: |- { - "enableMetricAggregation": "false", - "enablePrometheusScraping" : "false" + "enableMetricAggregation": "true", + "enablePrometheusScraping" : "true" } router: |- { diff --git a/charms/kserve-controller/tests/integration/test_charm.py b/charms/kserve-controller/tests/integration/test_charm.py index 189d0ce..cc74d0a 100644 --- a/charms/kserve-controller/tests/integration/test_charm.py +++ b/charms/kserve-controller/tests/integration/test_charm.py @@ -17,6 +17,7 @@ from charmed_kubeflow_chisme.kubernetes import KubernetesResourceHandler from charmed_kubeflow_chisme.testing import ( assert_logging, + assert_metrics_endpoint, deploy_and_assert_grafana_agent, ) from lightkube.core.exceptions import ApiError @@ -230,7 +231,7 @@ async def test_build_and_deploy(ops_test: OpsTest): # Deploying grafana-agent-k8s and add all relations await deploy_and_assert_grafana_agent( - ops_test.model, APP_NAME, metrics=False, dashboard=False, logging=True + ops_test.model, APP_NAME, metrics=True, dashboard=False, logging=True ) @@ -319,6 +320,16 @@ async def test_logging(ops_test: OpsTest): await assert_logging(app) +async def test_metrics_enpoint(ops_test): + """Test metrics_endpoints are defined in relation data bag and their accessibility. + This function gets all the metrics_endpoints from the relation data bag, checks if + they are available from the grafana-agent-k8s charm and finally compares them with the + ones provided to the function. + """ + app = ops_test.model.applications[APP_NAME] + await assert_metrics_endpoint(app, metrics_port=8080, metrics_path="/metrics") + + # # Remove the InferenceService deployed in RawDeployment mode # lightkube_client.delete( # inference_service_resource, name=inf_svc_name, namespace=rawdeployment_mode_namespace diff --git a/charms/kserve-controller/tests/unit/test_charm.py b/charms/kserve-controller/tests/unit/test_charm.py index 0a21e09..7dcadb2 100644 --- a/charms/kserve-controller/tests/unit/test_charm.py +++ b/charms/kserve-controller/tests/unit/test_charm.py @@ -72,7 +72,8 @@ def harness(): harness.set_leader(True) harness.set_can_connect("kserve-controller", True) harness.set_can_connect("kube-rbac-proxy", True) - return harness + with patch("charm.ServicePort"), patch("charm.KubernetesServicePatch"): + yield harness @pytest.fixture() @@ -91,6 +92,24 @@ def mocked_lightkube_client(mocker, mocked_resource_handler): yield mocked_resource_handler.lightkube_client +def test_metrics(harness): + """Test MetricsEndpointProvider initialization.""" + with patch("charm.MetricsEndpointProvider") as mock_metrics, patch( + "charm.KubernetesServicePatch" + ) as mock_service_patcher, patch("charm.ServicePort") as mock_service_port: + harness.begin() + mock_metrics.assert_called_once_with( + harness.charm, + jobs=[{"static_configs": [{"targets": ["*:8080"]}]}], + ) + mock_service_port.assert_called_once_with( + port=8080, targetPort=8080, name="kserve-controller-metrics" + ) + mock_service_patcher.assert_called_once_with( + harness.charm, [mock_service_port.return_value], service_name="kserve-controller" + ) + + def test_log_forwarding(harness): """Test LogForwarder initialization.""" with patch("charm.LogForwarder") as mock_logging: