diff --git a/.github/workflows/integrate.yaml b/.github/workflows/integrate.yaml index 9e56cc57..c6cee518 100644 --- a/.github/workflows/integrate.yaml +++ b/.github/workflows/integrate.yaml @@ -73,6 +73,19 @@ jobs: - kfp-profile-controller - kfp-api steps: + # Ideally we'd use self-hosted runners, but this effort is still not stable + # This action will remove unused software (dotnet, haskell, android libs, codeql, + # and docker images) from the GH runner, which will free up around 60 GB of storage + # distributed as 40 GB for the root volume and around 20 GB for a mount point. + - name: Maximise GH runner space + uses: easimon/maximize-build-space@v7 + with: + root-reserve-mb: 40960 + remove-dotnet: 'true' + remove-haskell: 'true' + remove-android: 'true' + remove-codeql: 'true' + remove-docker-images: 'true' - uses: actions/checkout@v3 - name: Setup operator environment uses: charmed-kubernetes/actions-operator@main @@ -82,21 +95,12 @@ jobs: juju-channel: 2.9/stable charmcraft-channel: latest/candidate - # TODO: Remove once the actions-operator does this automatically - - name: Configure kubectl - run: | - sg microk8s -c "microk8s config > ~/.kube/config" - - run: | sg microk8s -c "tox -e ${{ matrix.charm }}-integration" - # Collect debug logs if failed - - name: Dump Juju/k8s logs on failure - uses: canonical/charm-logdump-action@main - if: failure() - with: - app: ${{ matrix.charm }} - model: testing + - name: Collect charm debug artifacts + uses: canonical/kubeflow-ci/actions/dump-charm-debug-artifacts@main + if: always() test-bundle: name: Test the bundle @@ -128,21 +132,13 @@ jobs: channel: 1.24/stable juju-channel: 2.9/stable charmcraft-channel: latest/candidate + microk8s-addons: "dns hostpath-storage rbac metallb:10.64.140.43-10.64.140.49" # TODO: Remove once https://bugs.launchpad.net/juju/+bug/2024897 is fixed - name: Refresh Juju snap run: | sudo snap refresh juju --revision 22345 - # Required until https://github.com/charmed-kubernetes/actions-operator/pull/33 is 
merged - - run: | - sg microk8s -c "microk8s enable metallb:'10.64.140.43-10.64.140.49,192.168.0.105-192.168.0.111'" - - # TODO: Remove once the actions-operator does this automatically - - name: Configure kubectl - run: | - sg microk8s -c "microk8s config > ~/.kube/config" - - name: Run test run: | # Requires the model to be called kubeflow due to kfp-viewer @@ -160,16 +156,9 @@ jobs: run: juju status if: failure() - - name: Save relevant debug artifacts + - name: Collect charm debug artifacts + uses: canonical/kubeflow-ci/actions/dump-charm-debug-artifacts@main if: always() - run: | - mkdir ~/kfp-operators-debug-logs - juju debug-log --replay | tee ~/kfp-operators-debug-logs/juju-debug-log.log - juju status | tee ~/kfp-operators-debug-logs/juju-status.log - bash -c "cp ~/.local/state/charmcraft/log/* ~/kfp-operators-debug-logs/" - sg microk8s -c "microk8s.kubectl describe deployments -A | tee ~/kfp-operators-debug-logs/deployments.log" - sg microk8s -c "microk8s.kubectl describe pods -A | tee ~/kfp-operators-debug-logs/pods.log" - sg microk8s -c "microk8s.kubectl describe workloads -A | tee ~/kfp-operators-debug-logs/workloads.log" - name: Upload debug artifacts if: always() diff --git a/charms/kfp-api/lib/charms/data_platform_libs/v0/data_interfaces.py b/charms/kfp-api/lib/charms/data_platform_libs/v0/data_interfaces.py index e9152e1d..74db75db 100644 --- a/charms/kfp-api/lib/charms/data_platform_libs/v0/data_interfaces.py +++ b/charms/kfp-api/lib/charms/data_platform_libs/v0/data_interfaces.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Library to manage the relation for the data-platform products. +r"""Library to manage the relation for the data-platform products. 
This library contains the Requires and Provides classes for handling the relation between an application and multiple managed application supported by the data-team: @@ -144,6 +144,19 @@ def _on_cluster2_database_created(self, event: DatabaseCreatedEvent) -> None: ``` +When it's needed to check whether a plugin (extension) is enabled on the PostgreSQL +charm, you can use the is_postgresql_plugin_enabled method. To use that, you need to +add the following dependency to your charmcraft.yaml file: + +```yaml + +parts: + charm: + charm-binary-python-packages: + - psycopg[binary] + +``` + ### Provider Charm Following an example of using the DatabaseRequestedEvent, in the context of the @@ -283,17 +296,17 @@ def _on_topic_requested(self, event: TopicRequestedEvent): from abc import ABC, abstractmethod from collections import namedtuple from datetime import datetime -from typing import List, Optional +from typing import List, Optional, Union from ops.charm import ( CharmBase, CharmEvents, RelationChangedEvent, + RelationCreatedEvent, RelationEvent, - RelationJoinedEvent, ) from ops.framework import EventSource, Object -from ops.model import Relation +from ops.model import Application, ModelError, Relation, Unit # The unique Charmhub library identifier, never change it LIBID = "6c3e6b6680d64e9c89e611d1a15f65be" @@ -303,7 +316,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 9 +LIBPATCH = 16 PYDEPS = ["ops>=2.0.0"] @@ -318,7 +331,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): deleted - key that were deleted""" -def diff(event: RelationChangedEvent, bucket: str) -> Diff: +def diff(event: RelationChangedEvent, bucket: Union[Unit, Application]) -> Diff: """Retrieves the diff of the data in the relation changed databag. 
Args: @@ -332,9 +345,11 @@ def diff(event: RelationChangedEvent, bucket: str) -> Diff: # Retrieve the old data from the data key in the application relation databag. old_data = json.loads(event.relation.data[bucket].get("data", "{}")) # Retrieve the new data from the event relation databag. - new_data = { - key: value for key, value in event.relation.data[event.app].items() if key != "data" - } + new_data = ( + {key: value for key, value in event.relation.data[event.app].items() if key != "data"} + if event.app + else {} + ) # These are the keys that were added to the databag and triggered this event. added = new_data.keys() - old_data.keys() @@ -396,9 +411,11 @@ def fetch_relation_data(self) -> dict: """ data = {} for relation in self.relations: - data[relation.id] = { - key: value for key, value in relation.data[relation.app].items() if key != "data" - } + data[relation.id] = ( + {key: value for key, value in relation.data[relation.app].items() if key != "data"} + if relation.app + else {} + ) return data def _update_relation_data(self, relation_id: int, data: dict) -> None: @@ -413,8 +430,8 @@ def _update_relation_data(self, relation_id: int, data: dict) -> None: that should be updated in the relation. """ if self.local_unit.is_leader(): - relation = self.charm.model.get_relation(self.relation_name, relation_id) - relation.data[self.local_app].update(data) + if relation := self.charm.model.get_relation(self.relation_name, relation_id): + relation.data[self.local_app].update(data) @property def relations(self) -> List[Relation]: @@ -456,7 +473,7 @@ def set_tls_ca(self, relation_id: int, tls_ca: str) -> None: relation_id: the identifier for a particular relation. tls_ca: TLS certification authority. 
""" - self._update_relation_data(relation_id, {"tls_ca": tls_ca}) + self._update_relation_data(relation_id, {"tls-ca": tls_ca}) class DataRequires(Object, ABC): @@ -466,7 +483,7 @@ def __init__( self, charm, relation_name: str, - extra_user_roles: str = None, + extra_user_roles: Optional[str] = None, ): """Manager of base client relations.""" super().__init__(charm, relation_name) @@ -476,15 +493,15 @@ def __init__( self.local_unit = self.charm.unit self.relation_name = relation_name self.framework.observe( - self.charm.on[relation_name].relation_joined, self._on_relation_joined_event + self.charm.on[relation_name].relation_created, self._on_relation_created_event ) self.framework.observe( self.charm.on[relation_name].relation_changed, self._on_relation_changed_event ) @abstractmethod - def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None: - """Event emitted when the application joins the relation.""" + def _on_relation_created_event(self, event: RelationCreatedEvent) -> None: + """Event emitted when the relation is created.""" raise NotImplementedError @abstractmethod @@ -504,9 +521,11 @@ def fetch_relation_data(self) -> dict: """ data = {} for relation in self.relations: - data[relation.id] = { - key: value for key, value in relation.data[relation.app].items() if key != "data" - } + data[relation.id] = ( + {key: value for key, value in relation.data[relation.app].items() if key != "data"} + if relation.app + else {} + ) return data def _update_relation_data(self, relation_id: int, data: dict) -> None: @@ -550,11 +569,14 @@ def _is_relation_active(relation: Relation): try: _ = repr(relation.data) return True - except RuntimeError: + except (RuntimeError, ModelError): return False @staticmethod - def _is_resource_created_for_relation(relation: Relation): + def _is_resource_created_for_relation(relation: Relation) -> bool: + if not relation.app: + return False + return ( "username" in relation.data[relation.app] and "password" in 
relation.data[relation.app] ) @@ -586,10 +608,7 @@ def is_resource_created(self, relation_id: Optional[int] = None) -> bool: else: return ( all( - [ - self._is_resource_created_for_relation(relation) - for relation in self.relations - ] + self._is_resource_created_for_relation(relation) for relation in self.relations ) if self.relations else False @@ -605,6 +624,9 @@ class ExtraRoleEvent(RelationEvent): @property def extra_user_roles(self) -> Optional[str]: """Returns the extra user roles that were requested.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("extra-user-roles") @@ -614,21 +636,33 @@ class AuthenticationEvent(RelationEvent): @property def username(self) -> Optional[str]: """Returns the created username.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("username") @property def password(self) -> Optional[str]: """Returns the password for the created user.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("password") @property def tls(self) -> Optional[str]: """Returns whether TLS is configured.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("tls") @property def tls_ca(self) -> Optional[str]: """Returns TLS CA.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("tls-ca") @@ -641,6 +675,9 @@ class DatabaseProvidesEvent(RelationEvent): @property def database(self) -> Optional[str]: """Returns the database that was requested.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("database") @@ -663,16 +700,33 @@ class DatabaseRequiresEvent(RelationEvent): @property def database(self) -> Optional[str]: """Returns the database name.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("database") @property def endpoints(self) -> Optional[str]: 
- """Returns a comma separated list of read/write endpoints.""" + """Returns a comma separated list of read/write endpoints. + + In VM charms, this is the primary's address. + In kubernetes charms, this is the service to the primary pod. + """ + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("endpoints") @property def read_only_endpoints(self) -> Optional[str]: - """Returns a comma separated list of read only endpoints.""" + """Returns a comma separated list of read only endpoints. + + In VM charms, this is the address of all the secondary instances. + In kubernetes charms, this is the service to all replica pod instances. + """ + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("read-only-endpoints") @property @@ -681,6 +735,9 @@ def replset(self) -> Optional[str]: MongoDB only. """ + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("replset") @property @@ -689,6 +746,9 @@ def uris(self) -> Optional[str]: MongoDB, Redis, OpenSearch. """ + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("uris") @property @@ -697,6 +757,9 @@ def version(self) -> Optional[str]: Version as informed by the database daemon. 
""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("version") @@ -729,7 +792,7 @@ class DatabaseRequiresEvents(CharmEvents): class DatabaseProvides(DataProvides): """Provider-side of the database relations.""" - on = DatabaseProvidesEvents() + on = DatabaseProvidesEvents() # pyright: ignore [reportGeneralTypeIssues] def __init__(self, charm: CharmBase, relation_name: str) -> None: super().__init__(charm, relation_name) @@ -746,7 +809,9 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: # Emit a database requested event if the setup key (database name and optional # extra user roles) was added to the relation databag by the application. if "database" in diff.added: - self.on.database_requested.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "database_requested").emit( + event.relation, app=event.app, unit=event.unit + ) def set_database(self, relation_id: int, database_name: str) -> None: """Set database name. @@ -766,6 +831,10 @@ def set_endpoints(self, relation_id: int, connection_strings: str) -> None: This function writes in the application data bag, therefore, only the leader unit can call it. + In VM charms, only the primary's address should be passed as an endpoint. + In kubernetes charms, the service endpoint to the primary pod should be + passed as an endpoint. + Args: relation_id: the identifier for a particular relation. connection_strings: database hosts and ports comma separated list. 
@@ -819,15 +888,15 @@ def set_version(self, relation_id: int, version: str) -> None: class DatabaseRequires(DataRequires): """Requires-side of the database relation.""" - on = DatabaseRequiresEvents() + on = DatabaseRequiresEvents() # pyright: ignore [reportGeneralTypeIssues] def __init__( self, charm, relation_name: str, database_name: str, - extra_user_roles: str = None, - relations_aliases: List[str] = None, + extra_user_roles: Optional[str] = None, + relations_aliases: Optional[List[str]] = None, ): """Manager of database client relations.""" super().__init__(charm, relation_name, extra_user_roles) @@ -915,8 +984,52 @@ def _get_relation_alias(self, relation_id: int) -> Optional[str]: return relation.data[self.local_unit].get("alias") return None - def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None: - """Event emitted when the application joins the database relation.""" + def is_postgresql_plugin_enabled(self, plugin: str, relation_index: int = 0) -> bool: + """Returns whether a plugin is enabled in the database. + + Args: + plugin: name of the plugin to check. + relation_index: optional relation index to check the database + (default: 0 - first relation). + + PostgreSQL only. + """ + # Psycopg 3 is imported locally to avoid the need of its package installation + # when relating to a database charm other than PostgreSQL. + import psycopg + + # Return False if no relation is established. + if len(self.relations) == 0: + return False + + relation_data = self.fetch_relation_data()[self.relations[relation_index].id] + host = relation_data.get("endpoints") + + # Return False if there is no endpoint available. 
+ if host is None: + return False + + host = host.split(":")[0] + user = relation_data.get("username") + password = relation_data.get("password") + connection_string = ( + f"host='{host}' dbname='{self.database}' user='{user}' password='{password}'" + ) + try: + with psycopg.connect(connection_string) as connection: + with connection.cursor() as cursor: + cursor.execute( + "SELECT TRUE FROM pg_extension WHERE extname=%s::text;", (plugin,) + ) + return cursor.fetchone() is not None + except psycopg.Error as e: + logger.exception( + f"failed to check whether {plugin} plugin is enabled in the database: %s", str(e) + ) + return False + + def _on_relation_created_event(self, event: RelationCreatedEvent) -> None: + """Event emitted when the database relation is created.""" # If relations aliases were provided, assign one to the relation. self._assign_relation_alias(event.relation.id) @@ -943,7 +1056,9 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: if "username" in diff.added and "password" in diff.added: # Emit the default event (the one without an alias). logger.info("database created at %s", datetime.now()) - self.on.database_created.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "database_created").emit( + event.relation, app=event.app, unit=event.unit + ) # Emit the aliased event (if any). self._emit_aliased_event(event, "database_created") @@ -957,7 +1072,9 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: if "endpoints" in diff.added or "endpoints" in diff.changed: # Emit the default event (the one without an alias). logger.info("endpoints changed on %s", datetime.now()) - self.on.endpoints_changed.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "endpoints_changed").emit( + event.relation, app=event.app, unit=event.unit + ) # Emit the aliased event (if any). 
self._emit_aliased_event(event, "endpoints_changed") @@ -971,7 +1088,7 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: if "read-only-endpoints" in diff.added or "read-only-endpoints" in diff.changed: # Emit the default event (the one without an alias). logger.info("read-only-endpoints changed on %s", datetime.now()) - self.on.read_only_endpoints_changed.emit( + getattr(self.on, "read_only_endpoints_changed").emit( event.relation, app=event.app, unit=event.unit ) @@ -988,11 +1105,17 @@ class KafkaProvidesEvent(RelationEvent): @property def topic(self) -> Optional[str]: """Returns the topic that was requested.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("topic") @property def consumer_group_prefix(self) -> Optional[str]: """Returns the consumer-group-prefix that was requested.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("consumer-group-prefix") @@ -1015,21 +1138,33 @@ class KafkaRequiresEvent(RelationEvent): @property def topic(self) -> Optional[str]: """Returns the topic.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("topic") @property def bootstrap_server(self) -> Optional[str]: - """Returns a a comma-seperated list of broker uris.""" + """Returns a comma-separated list of broker uris.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("endpoints") @property def consumer_group_prefix(self) -> Optional[str]: """Returns the consumer-group-prefix.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("consumer-group-prefix") @property def zookeeper_uris(self) -> Optional[str]: """Returns a comma separated list of Zookeeper uris.""" + if not self.relation.app: + return None + return self.relation.data[self.relation.app].get("zookeeper-uris") @@ -1057,7 +1192,7 @@ class KafkaRequiresEvents(CharmEvents): 
class KafkaProvides(DataProvides): """Provider-side of the Kafka relation.""" - on = KafkaProvidesEvents() + on = KafkaProvidesEvents() # pyright: ignore [reportGeneralTypeIssues] def __init__(self, charm: CharmBase, relation_name: str) -> None: super().__init__(charm, relation_name) @@ -1074,7 +1209,9 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: # Emit a topic requested event if the setup key (topic name and optional # extra user roles) was added to the relation databag by the application. if "topic" in diff.added: - self.on.topic_requested.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "topic_requested").emit( + event.relation, app=event.app, unit=event.unit + ) def set_topic(self, relation_id: int, topic: str) -> None: """Set topic name in the application relation databag. @@ -1108,7 +1245,7 @@ def set_zookeeper_uris(self, relation_id: int, zookeeper_uris: str) -> None: Args: relation_id: the identifier for a particular relation. - zookeeper_uris: comma-seperated list of ZooKeeper server uris. + zookeeper_uris: comma-separated list of ZooKeeper server uris. 
""" self._update_relation_data(relation_id, {"zookeeper-uris": zookeeper_uris}) @@ -1116,7 +1253,7 @@ def set_zookeeper_uris(self, relation_id: int, zookeeper_uris: str) -> None: class KafkaRequires(DataRequires): """Requires-side of the Kafka relation.""" - on = KafkaRequiresEvents() + on = KafkaRequiresEvents() # pyright: ignore [reportGeneralTypeIssues] def __init__( self, @@ -1133,8 +1270,20 @@ def __init__( self.topic = topic self.consumer_group_prefix = consumer_group_prefix or "" - def _on_relation_joined_event(self, event: RelationJoinedEvent) -> None: - """Event emitted when the application joins the Kafka relation.""" + @property + def topic(self): + """Topic to use in Kafka.""" + return self._topic + + @topic.setter + def topic(self, value): + # Avoid wildcards + if value == "*": + raise ValueError(f"Error on topic '{value}', cannot be a wildcard.") + self._topic = value + + def _on_relation_created_event(self, event: RelationCreatedEvent) -> None: + """Event emitted when the Kafka relation is created.""" # Sets topic, extra user roles, and "consumer-group-prefix" in the relation relation_data = { f: getattr(self, f.replace("-", "_"), "") @@ -1153,18 +1302,183 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: if "username" in diff.added and "password" in diff.added: # Emit the default event (the one without an alias). logger.info("topic created at %s", datetime.now()) - self.on.topic_created.emit(event.relation, app=event.app, unit=event.unit) + getattr(self.on, "topic_created").emit(event.relation, app=event.app, unit=event.unit) # To avoid unnecessary application restarts do not trigger # “endpoints_changed“ event if “topic_created“ is triggered. return - # Emit an endpoints (bootstap-server) changed event if the Kafka endpoints + # Emit an endpoints (bootstrap-server) changed event if the Kafka endpoints # added or changed this info in the relation databag. 
if "endpoints" in diff.added or "endpoints" in diff.changed: # Emit the default event (the one without an alias). logger.info("endpoints changed on %s", datetime.now()) - self.on.bootstrap_server_changed.emit( + getattr(self.on, "bootstrap_server_changed").emit( + event.relation, app=event.app, unit=event.unit + ) # here check if this is the right design + return + + +# Opensearch related events + + +class OpenSearchProvidesEvent(RelationEvent): + """Base class for OpenSearch events.""" + + @property + def index(self) -> Optional[str]: + """Returns the index that was requested.""" + if not self.relation.app: + return None + + return self.relation.data[self.relation.app].get("index") + + +class IndexRequestedEvent(OpenSearchProvidesEvent, ExtraRoleEvent): + """Event emitted when a new index is requested for use on this relation.""" + + +class OpenSearchProvidesEvents(CharmEvents): + """OpenSearch events. + + This class defines the events that OpenSearch can emit. + """ + + index_requested = EventSource(IndexRequestedEvent) + + +class OpenSearchRequiresEvent(DatabaseRequiresEvent): + """Base class for OpenSearch requirer events.""" + + +class IndexCreatedEvent(AuthenticationEvent, OpenSearchRequiresEvent): + """Event emitted when a new index is created for use on this relation.""" + + +class OpenSearchRequiresEvents(CharmEvents): + """OpenSearch events. + + This class defines the events that the opensearch requirer can emit. 
+ """ + + index_created = EventSource(IndexCreatedEvent) + endpoints_changed = EventSource(DatabaseEndpointsChangedEvent) + authentication_updated = EventSource(AuthenticationEvent) + + +# OpenSearch Provides and Requires Objects + + +class OpenSearchProvides(DataProvides): + """Provider-side of the OpenSearch relation.""" + + on = OpenSearchProvidesEvents() # pyright: ignore[reportGeneralTypeIssues] + + def __init__(self, charm: CharmBase, relation_name: str) -> None: + super().__init__(charm, relation_name) + + def _on_relation_changed(self, event: RelationChangedEvent) -> None: + """Event emitted when the relation has changed.""" + # Only the leader should handle this event. + if not self.local_unit.is_leader(): + return + + # Check which data has changed to emit customs events. + diff = self._diff(event) + + # Emit an index requested event if the setup key (index name and optional extra user roles) + # have been added to the relation databag by the application. + if "index" in diff.added: + getattr(self.on, "index_requested").emit( + event.relation, app=event.app, unit=event.unit + ) + + def set_index(self, relation_id: int, index: str) -> None: + """Set the index in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + index: the index as it is _created_ on the provider charm. This needn't match the + requested index, and can be used to present a different index name if, for example, + the requested index is invalid. + """ + self._update_relation_data(relation_id, {"index": index}) + + def set_endpoints(self, relation_id: int, endpoints: str) -> None: + """Set the endpoints in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + endpoints: the endpoint addresses for opensearch nodes. 
+ """ + self._update_relation_data(relation_id, {"endpoints": endpoints}) + + def set_version(self, relation_id: int, version: str) -> None: + """Set the opensearch version in the application relation databag. + + Args: + relation_id: the identifier for a particular relation. + version: database version. + """ + self._update_relation_data(relation_id, {"version": version}) + + +class OpenSearchRequires(DataRequires): + """Requires-side of the OpenSearch relation.""" + + on = OpenSearchRequiresEvents() # pyright: ignore[reportGeneralTypeIssues] + + def __init__( + self, charm, relation_name: str, index: str, extra_user_roles: Optional[str] = None + ): + """Manager of OpenSearch client relations.""" + super().__init__(charm, relation_name, extra_user_roles) + self.charm = charm + self.index = index + + def _on_relation_created_event(self, event: RelationCreatedEvent) -> None: + """Event emitted when the OpenSearch relation is created.""" + # Sets both index and extra user roles in the relation if the roles are provided. + # Otherwise, sets only the index. + data = {"index": self.index} + if self.extra_user_roles: + data["extra-user-roles"] = self.extra_user_roles + + self._update_relation_data(event.relation.id, data) + + def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: + """Event emitted when the OpenSearch relation has changed. + + This event triggers individual custom events depending on the changing relation. + """ + # Check which data has changed to emit customs events. + diff = self._diff(event) + + # Check if authentication has updated, emit event if so + updates = {"username", "password", "tls", "tls-ca"} + if len(set(diff._asdict().keys()) - updates) < len(diff): + logger.info("authentication updated at: %s", datetime.now()) + getattr(self.on, "authentication_updated").emit( + event.relation, app=event.app, unit=event.unit + ) + + # Check if the index is created + # (the OpenSearch charm shares the credentials). 
+ if "username" in diff.added and "password" in diff.added: + # Emit the default event (the one without an alias). + logger.info("index created at: %s", datetime.now()) + getattr(self.on, "index_created").emit(event.relation, app=event.app, unit=event.unit) + + # To avoid unnecessary application restarts do not trigger + # “endpoints_changed“ event if “index_created“ is triggered. + return + + # Emit a endpoints changed event if the OpenSearch application added or changed this info + # in the relation databag. + if "endpoints" in diff.added or "endpoints" in diff.changed: + # Emit the default event (the one without an alias). + logger.info("endpoints changed on %s", datetime.now()) + getattr(self.on, "endpoints_changed").emit( event.relation, app=event.app, unit=event.unit ) # here check if this is the right design return diff --git a/charms/kfp-api/lib/charms/grafana_k8s/v0/grafana_dashboard.py b/charms/kfp-api/lib/charms/grafana_k8s/v0/grafana_dashboard.py index fa7ed773..c20ab2b1 100644 --- a/charms/kfp-api/lib/charms/grafana_k8s/v0/grafana_dashboard.py +++ b/charms/kfp-api/lib/charms/grafana_k8s/v0/grafana_dashboard.py @@ -219,7 +219,7 @@ def __init__(self, *args): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 28 +LIBPATCH = 32 logger = logging.getLogger(__name__) @@ -525,7 +525,7 @@ def _validate_relation_by_interface_and_direction( relation = charm.meta.relations[relation_name] actual_relation_interface = relation.interface_name - if actual_relation_interface != expected_relation_interface: + if actual_relation_interface and actual_relation_interface != expected_relation_interface: raise RelationInterfaceMismatchError( relation_name, expected_relation_interface, actual_relation_interface ) @@ -582,7 +582,7 @@ def _convert_dashboard_fields(content: str, inject_dropdowns: bool = True) -> st # If no existing template variables exist, just insert our own if "templating" 
not in dict_content: - dict_content["templating"] = {"list": [d for d in template_dropdowns]} # type: ignore + dict_content["templating"] = {"list": list(template_dropdowns)} # type: ignore else: # Otherwise, set a flag so we can go back later existing_templates = True @@ -830,18 +830,18 @@ def _modify_panel(panel: dict, topology: dict, transformer: "CosTool") -> dict: if "datasource" not in panel.keys(): continue - else: - if type(panel["datasource"]) == str: - if panel["datasource"] not in known_datasources: - continue - querytype = known_datasources[panel["datasource"]] - elif type(panel["datasource"]) == dict: - if panel["datasource"]["uid"] not in known_datasources: - continue - querytype = known_datasources[panel["datasource"]["uid"]] - else: - logger.error("Unknown datasource format: skipping") + + if type(panel["datasource"]) == str: + if panel["datasource"] not in known_datasources: + continue + querytype = known_datasources[panel["datasource"]] + elif type(panel["datasource"]) == dict: + if panel["datasource"]["uid"] not in known_datasources: continue + querytype = known_datasources[panel["datasource"]["uid"]] + else: + logger.error("Unknown datasource format: skipping") + continue # Capture all values inside `[]` into a list which we'll iterate over later to # put them back in-order. 
Then apply the regex again and replace everything with @@ -901,13 +901,12 @@ def _type_convert_stored(obj): """Convert Stored* to their appropriate types, recursively.""" if isinstance(obj, StoredList): return list(map(_type_convert_stored, obj)) - elif isinstance(obj, StoredDict): + if isinstance(obj, StoredDict): rdict = {} # type: Dict[Any, Any] for k in obj.keys(): rdict[k] = _type_convert_stored(obj[k]) return rdict - else: - return obj + return obj class GrafanaDashboardsChanged(EventBase): @@ -956,7 +955,7 @@ def restore(self, snapshot): """Restore grafana source information.""" self.error_message = snapshot["error_message"] self.valid = snapshot["valid"] - self.errors = json.loads(snapshot["errors"]) + self.errors = json.loads(str(snapshot["errors"])) class GrafanaProviderEvents(ObjectEvents): @@ -969,7 +968,7 @@ class GrafanaDashboardProvider(Object): """An API to provide Grafana dashboards to a Grafana charm.""" _stored = StoredState() - on = GrafanaProviderEvents() + on = GrafanaProviderEvents() # pyright: ignore def __init__( self, @@ -1073,7 +1072,7 @@ def add_dashboard(self, content: str, inject_dropdowns: bool = True) -> None: """ # Update of storage must be done irrespective of leadership, so # that the stored state is there when this unit becomes leader. - stored_dashboard_templates = self._stored.dashboard_templates # type: Any + stored_dashboard_templates: Any = self._stored.dashboard_templates # pyright: ignore encoded_dashboard = _encode_dashboard_content(content) @@ -1094,7 +1093,7 @@ def remove_non_builtin_dashboards(self) -> None: """Remove all dashboards to the relation added via :method:`add_dashboard`.""" # Update of storage must be done irrespective of leadership, so # that the stored state is there when this unit becomes leader. 
- stored_dashboard_templates = self._stored.dashboard_templates # type: Any + stored_dashboard_templates: Any = self._stored.dashboard_templates # pyright: ignore for dashboard_id in list(stored_dashboard_templates.keys()): if dashboard_id.startswith("prog:"): @@ -1121,7 +1120,7 @@ def _update_all_dashboards_from_dir( # Ensure we do not leave outdated dashboards by removing from stored all # the encoded dashboards that start with "file/". if self._dashboards_path: - stored_dashboard_templates = self._stored.dashboard_templates # type: Any + stored_dashboard_templates: Any = self._stored.dashboard_templates # pyright: ignore for dashboard_id in list(stored_dashboard_templates.keys()): if dashboard_id.startswith("file:"): @@ -1175,7 +1174,7 @@ def _reinitialize_dashboard_data(self, inject_dropdowns: bool = True) -> None: e.grafana_dashboards_absolute_path, e.message, ) - stored_dashboard_templates = self._stored.dashboard_templates # type: Any + stored_dashboard_templates: Any = self._stored.dashboard_templates # pyright: ignore for dashboard_id in list(stored_dashboard_templates.keys()): if dashboard_id.startswith("file:"): @@ -1213,16 +1212,18 @@ def _on_grafana_dashboard_relation_changed(self, event: RelationChangedEvent) -> valid = bool(data.get("valid", True)) errors = data.get("errors", []) if valid and not errors: - self.on.dashboard_status_changed.emit(valid=valid) + self.on.dashboard_status_changed.emit(valid=valid) # pyright: ignore else: - self.on.dashboard_status_changed.emit(valid=valid, errors=errors) + self.on.dashboard_status_changed.emit( # pyright: ignore + valid=valid, errors=errors + ) def _upset_dashboards_on_relation(self, relation: Relation) -> None: """Update the dashboards in the relation data bucket.""" # It's completely ridiculous to add a UUID, but if we don't have some # pseudo-random value, this never makes it across 'juju set-state' stored_data = { - "templates": _type_convert_stored(self._stored.dashboard_templates), + "templates": 
_type_convert_stored(self._stored.dashboard_templates), # pyright: ignore "uuid": str(uuid.uuid4()), } @@ -1251,13 +1252,13 @@ def _juju_topology(self) -> Dict: @property def dashboard_templates(self) -> List: """Return a list of the known dashboard templates.""" - return [v for v in self._stored.dashboard_templates.values()] # type: ignore + return list(self._stored.dashboard_templates.values()) # type: ignore class GrafanaDashboardConsumer(Object): """A consumer object for working with Grafana Dashboards.""" - on = GrafanaDashboardEvents() + on = GrafanaDashboardEvents() # pyright: ignore _stored = StoredState() def __init__( @@ -1305,7 +1306,7 @@ def __init__( self._relation_name = relation_name self._tranformer = CosTool(self._charm) - self._stored.set_default(dashboards=dict()) # type: ignore + self._stored.set_default(dashboards={}) # type: ignore self.framework.observe( self._charm.on[self._relation_name].relation_changed, @@ -1349,13 +1350,13 @@ def _on_grafana_dashboard_relation_changed(self, event: RelationChangedEvent) -> changes = self._render_dashboards_and_signal_changed(event.relation) if changes: - self.on.dashboards_changed.emit() + self.on.dashboards_changed.emit() # pyright: ignore def _on_grafana_peer_changed(self, _: RelationChangedEvent) -> None: """Emit dashboard events on peer events so secondary charm data updates.""" if self._charm.unit.is_leader(): return - self.on.dashboards_changed.emit() + self.on.dashboards_changed.emit() # pyright: ignore def update_dashboards(self, relation: Optional[Relation] = None) -> None: """Re-establish dashboards on one or more relations. 
@@ -1402,7 +1403,7 @@ def _render_dashboards_and_signal_changed(self, relation: Relation) -> bool: # """ other_app = relation.app - raw_data = relation.data[other_app].get("dashboards", {}) # type: ignore + raw_data = relation.data[other_app].get("dashboards", "") # pyright: ignore if not raw_data: logger.warning( @@ -1417,11 +1418,6 @@ def _render_dashboards_and_signal_changed(self, relation: Relation) -> bool: # # The only piece of data needed on this side of the relations is "templates" templates = data.pop("templates") - # Import only if a charmed operator uses the consumer, we don't impose these - # dependencies on the client - from jinja2 import Template - from jinja2.exceptions import TemplateSyntaxError - # The dashboards are WAY too big since this ultimately calls out to Juju to # set the relation data, and it overflows the maximum argument length for # subprocess, so we have to use b64, annoyingly. @@ -1434,14 +1430,12 @@ def _render_dashboards_and_signal_changed(self, relation: Relation) -> bool: # relation_has_invalid_dashboards = False for _, (fname, template) in enumerate(templates.items()): - decoded_content = None content = None error = None topology = template.get("juju_topology", {}) try: - decoded_content = _decode_dashboard_content(template["content"]) + content = _decode_dashboard_content(template["content"]) inject_dropdowns = template.get("inject_dropdowns", True) - content = Template(decoded_content).render() content = self._manage_dashboard_uid(content, template) content = _convert_dashboard_fields(content, inject_dropdowns) @@ -1456,9 +1450,6 @@ def _render_dashboards_and_signal_changed(self, relation: Relation) -> bool: # error = str(e.msg) logger.warning("Invalid JSON in Grafana dashboard: {}".format(fname)) continue - except TemplateSyntaxError as e: - error = str(e) - relation_has_invalid_dashboards = True # Prepend the relation name and ID to the dashboard ID to avoid clashes with # multiple relations with apps from the same charm, or 
having dashboards with @@ -1505,28 +1496,27 @@ def _render_dashboards_and_signal_changed(self, relation: Relation) -> bool: # # Dropping dashboards for a relation needs to be signalled return True - else: - stored_data = rendered_dashboards - currently_stored_data = self._get_stored_dashboards(relation.id) - coerced_data = ( - _type_convert_stored(currently_stored_data) if currently_stored_data else {} - ) + stored_data = rendered_dashboards + currently_stored_data = self._get_stored_dashboards(relation.id) + + coerced_data = _type_convert_stored(currently_stored_data) if currently_stored_data else {} - if not coerced_data == stored_data: - stored_dashboards = self.get_peer_data("dashboards") - stored_dashboards[relation.id] = stored_data - self.set_peer_data("dashboards", stored_dashboards) - return True + if not coerced_data == stored_data: + stored_dashboards = self.get_peer_data("dashboards") + stored_dashboards[relation.id] = stored_data + self.set_peer_data("dashboards", stored_dashboards) + return True + return None # type: ignore def _manage_dashboard_uid(self, dashboard: str, template: dict) -> str: """Add an uid to the dashboard if it is not present.""" - dashboard = json.loads(dashboard) + dashboard_dict = json.loads(dashboard) - if not dashboard.get("uid", None) and "dashboard_alt_uid" in template: - dashboard["uid"] = template["dashboard_alt_uid"] + if not dashboard_dict.get("uid", None) and "dashboard_alt_uid" in template: + dashboard_dict["uid"] = template["dashboard_alt_uid"] - return json.dumps(dashboard) + return json.dumps(dashboard_dict) def _remove_all_dashboards_for_relation(self, relation: Relation) -> None: """If an errored dashboard is in stored data, remove it and trigger a deletion.""" @@ -1534,7 +1524,7 @@ def _remove_all_dashboards_for_relation(self, relation: Relation) -> None: stored_dashboards = self.get_peer_data("dashboards") stored_dashboards.pop(str(relation.id)) self.set_peer_data("dashboards", stored_dashboards) - 
self.on.dashboards_changed.emit() + self.on.dashboards_changed.emit() # pyright: ignore def _to_external_object(self, relation_id, dashboard): return { @@ -1616,7 +1606,7 @@ class GrafanaDashboardAggregator(Object): """ _stored = StoredState() - on = GrafanaProviderEvents() + on = GrafanaProviderEvents() # pyright: ignore def __init__( self, @@ -1681,7 +1671,7 @@ def _update_remote_grafana(self, _: Optional[RelationEvent] = None) -> None: """Push dashboards to the downstream Grafana relation.""" # It's still ridiculous to add a UUID here, but needed stored_data = { - "templates": _type_convert_stored(self._stored.dashboard_templates), + "templates": _type_convert_stored(self._stored.dashboard_templates), # pyright: ignore "uuid": str(uuid.uuid4()), } @@ -1702,7 +1692,7 @@ def remove_dashboards(self, event: RelationBrokenEvent) -> None: del self._stored.dashboard_templates[id] # type: ignore stored_data = { - "templates": _type_convert_stored(self._stored.dashboard_templates), + "templates": _type_convert_stored(self._stored.dashboard_templates), # pyright: ignore "uuid": str(uuid.uuid4()), } @@ -1830,10 +1820,10 @@ def _handle_reactive_dashboards(self, event: RelationEvent) -> Optional[Dict]: # Replace old piechart panels dash = re.sub(r'"type": "grafana-piechart-panel"', '"type": "piechart"', dash) - from jinja2 import Template + from jinja2 import DebugUndefined, Template content = _encode_dashboard_content( - Template(dash).render(host=r"$host", datasource=r"${prometheusds}") # type: ignore + Template(dash, undefined=DebugUndefined).render(datasource=r"${prometheusds}") # type: ignore ) id = "prog:{}".format(content[-24:-16]) diff --git a/charms/kfp-api/lib/charms/observability_libs/v1/kubernetes_service_patch.py b/charms/kfp-api/lib/charms/observability_libs/v1/kubernetes_service_patch.py index 56cca01a..64dd13ce 100644 --- a/charms/kfp-api/lib/charms/observability_libs/v1/kubernetes_service_patch.py +++ 
b/charms/kfp-api/lib/charms/observability_libs/v1/kubernetes_service_patch.py @@ -146,7 +146,7 @@ def setUp(self, *unused): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 6 +LIBPATCH = 7 ServiceType = Literal["ClusterIP", "LoadBalancer"] @@ -310,9 +310,8 @@ def _is_patched(self, client: Client) -> bool: except ApiError as e: if e.status.code == 404 and self.service_name != self._app: return False - else: - logger.error("Kubernetes service get failed: %s", str(e)) - raise + logger.error("Kubernetes service get failed: %s", str(e)) + raise # Construct a list of expected ports, should the patch be applied expected_ports = [(p.port, p.targetPort) for p in self.service.spec.ports] diff --git a/charms/kfp-api/lib/charms/prometheus_k8s/v0/prometheus_scrape.py b/charms/kfp-api/lib/charms/prometheus_k8s/v0/prometheus_scrape.py index 13ae0c48..cac364e3 100644 --- a/charms/kfp-api/lib/charms/prometheus_k8s/v0/prometheus_scrape.py +++ b/charms/kfp-api/lib/charms/prometheus_k8s/v0/prometheus_scrape.py @@ -370,7 +370,7 @@ def _on_scrape_targets_changed(self, event): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 34 +LIBPATCH = 38 logger = logging.getLogger(__name__) @@ -391,6 +391,7 @@ def _on_scrape_targets_changed(self, event): "scheme", "basic_auth", "tls_config", + "authorization", } DEFAULT_JOB = { "metrics_path": "/metrics", @@ -601,15 +602,22 @@ def render_alertmanager_static_configs(alertmanagers: List[str]): # Create a mapping from paths to netlocs # Group alertmanager targets into a dictionary of lists: # {path: [netloc1, netloc2]} - paths = defaultdict(list) # type: Dict[str, List[str]] + paths = defaultdict(list) # type: Dict[Tuple[str, str], List[str]] for parsed in map(urlparse, sanitized): path = parsed.path or "/" - paths[path].append(parsed.netloc) + paths[(parsed.scheme, 
path)].append(parsed.netloc) return { "alertmanagers": [ - {"path_prefix": path_prefix, "static_configs": [{"targets": netlocs}]} - for path_prefix, netlocs in paths.items() + { + "scheme": scheme, + "path_prefix": path_prefix, + "static_configs": [{"targets": netlocs}], + # FIXME figure out how to get alertmanager's ca_file into here + # Without this, prom errors: "x509: certificate signed by unknown authority" + "tls_config": {"insecure_skip_verify": True}, + } + for (scheme, path_prefix), netlocs in paths.items() ] } @@ -715,13 +723,12 @@ def _type_convert_stored(obj): """Convert Stored* to their appropriate types, recursively.""" if isinstance(obj, StoredList): return list(map(_type_convert_stored, obj)) - elif isinstance(obj, StoredDict): + if isinstance(obj, StoredDict): rdict = {} # type: Dict[Any, Any] for k in obj.keys(): rdict[k] = _type_convert_stored(obj[k]) return rdict - else: - return obj + return obj def _validate_relation_by_interface_and_direction( @@ -1353,29 +1360,39 @@ def _static_scrape_config(self, relation) -> list: if not relation.units: return [] - scrape_jobs = json.loads(relation.data[relation.app].get("scrape_jobs", "[]")) + scrape_configs = json.loads(relation.data[relation.app].get("scrape_jobs", "[]")) - if not scrape_jobs: + if not scrape_configs: return [] scrape_metadata = json.loads(relation.data[relation.app].get("scrape_metadata", "{}")) if not scrape_metadata: - return scrape_jobs + return scrape_configs topology = JujuTopology.from_dict(scrape_metadata) job_name_prefix = "juju_{}_prometheus_scrape".format(topology.identifier) - scrape_jobs = PrometheusConfig.prefix_job_names(scrape_jobs, job_name_prefix) - scrape_jobs = PrometheusConfig.sanitize_scrape_configs(scrape_jobs) + scrape_configs = PrometheusConfig.prefix_job_names(scrape_configs, job_name_prefix) + scrape_configs = PrometheusConfig.sanitize_scrape_configs(scrape_configs) hosts = self._relation_hosts(relation) - scrape_jobs = 
PrometheusConfig.expand_wildcard_targets_into_individual_jobs( - scrape_jobs, hosts, topology + scrape_configs = PrometheusConfig.expand_wildcard_targets_into_individual_jobs( + scrape_configs, hosts, topology ) - return scrape_jobs + # If scheme is https but no ca section present, then auto add "insecure_skip_verify", + # otherwise scraping errors out with "x509: certificate signed by unknown authority". + # https://prometheus.io/docs/prometheus/latest/configuration/configuration/#tls_config + for scrape_config in scrape_configs: + tls_config = scrape_config.get("tls_config", {}) + ca_present = "ca" in tls_config or "ca_file" in tls_config + if scrape_config.get("scheme") == "https" and not ca_present: + tls_config["insecure_skip_verify"] = True + scrape_config["tls_config"] = tls_config + + return scrape_configs def _relation_hosts(self, relation: Relation) -> Dict[str, Tuple[str, str]]: """Returns a mapping from unit names to (address, path) tuples, for the given relation.""" @@ -1440,7 +1457,7 @@ def _dedupe_job_names(jobs: List[dict]): job["job_name"] = "{}_{}".format(job["job_name"], hashed) new_jobs = [] for key in jobs_dict: - new_jobs.extend([i for i in jobs_dict[key]]) + new_jobs.extend(list(jobs_dict[key])) # Deduplicate jobs which are equal # Again this in O(n^2) but it should be okay @@ -1793,11 +1810,10 @@ def _scrape_jobs(self) -> list: A list of dictionaries, where each dictionary specifies a single scrape job for Prometheus. 
""" - jobs = self._jobs if self._jobs else [DEFAULT_JOB] + jobs = self._jobs or [] if callable(self._lookaside_jobs): - return jobs + PrometheusConfig.sanitize_scrape_configs(self._lookaside_jobs()) - else: - return jobs + jobs.extend(PrometheusConfig.sanitize_scrape_configs(self._lookaside_jobs())) + return jobs or [DEFAULT_JOB] @property def _scrape_metadata(self) -> dict: @@ -2078,6 +2094,7 @@ def set_target_job_data(self, targets: dict, app_name: str, **kwargs) -> None: Args: targets: a `dict` containing target information app_name: a `str` identifying the application + kwargs: a `dict` of the extra arguments passed to the function """ if not self._charm.unit.is_leader(): return @@ -2203,6 +2220,7 @@ def _static_scrape_job(self, targets, application_name, **kwargs) -> dict: "port". application_name: a string name of the application for which this static scrape job is being constructed. + kwargs: a `dict` of the extra arguments passed to the function Returns: A dictionary corresponding to a Prometheus static scrape @@ -2248,7 +2266,7 @@ def _static_config_extra_labels(self, target: Dict[str, str]) -> Dict[str, str]: logger.debug("Could not perform DNS lookup for %s", target["hostname"]) dns_name = target["hostname"] extra_info["dns_name"] = dns_name - label_re = re.compile(r'(?P