From 096105c2afb1cef1731451996eeeafb8aa8ee918 Mon Sep 17 00:00:00 2001 From: Adam Dyess Date: Thu, 29 Feb 2024 22:26:14 -0600 Subject: [PATCH] Successfully tested with storage --- pytest_operator/plugin.py | 94 ++++++++++++++++--- ...est_opstest_add_k8s.py => test_add_k8s.py} | 12 +-- 2 files changed, 84 insertions(+), 22 deletions(-) rename tests/integration/{test_opstest_add_k8s.py => test_add_k8s.py} (58%) diff --git a/pytest_operator/plugin.py b/pytest_operator/plugin.py index 6188c7b..3fae6c3 100644 --- a/pytest_operator/plugin.py +++ b/pytest_operator/plugin.py @@ -39,11 +39,13 @@ from zipfile import Path as ZipPath import jinja2 +import kubernetes.config import pytest import pytest_asyncio.plugin import yaml from _pytest.config import Config from _pytest.config.argparsing import Parser +from kubernetes import client as k8s_client from kubernetes.client import Configuration as K8sConfiguration from juju.client import client from juju.client.jujudata import FileJujuData @@ -937,7 +939,7 @@ async def is_model_alive(): if timeout and await is_model_alive(): log.warning("Waiting for model %s to die...", model_name) while await is_model_alive(): - asyncio.sleep(5) + await asyncio.sleep(5) await model.disconnect() @@ -1542,10 +1544,73 @@ def is_crash_dump_enabled(self) -> bool: else: return False - # Add K8S - async def add_k8s(self, config: K8sConfiguration, **kwargs) -> str: + async def add_k8s( + self, + cloud_name: Optional[str] = None, + kubeconfig: Optional[K8sConfiguration] = None, + context: Optional[str] = None, + skip_storage: bool = True, + storage_class: Optional[str] = None, + ) -> str: + """ + Add a new k8s cloud in the existing controller. + + @param Optional[str] cloud_name: + Name for the new cloud + None will autogenerate a name + @param Optional[kubernetes.client.configuration.Configuration] kubeconfig: + Configuration object from kubernetes.config.load_config + None will read from the usual kubeconfig locations like + os.environ.get('KUBECONFIG', '$HOME/.kube/config') + @param Optional[str] context: + context to use within the kubeconfig + None will use the default context + @param bool skip_storage: + True will not use cloud storage, + False either finds storage or uses storage_class + @param Optional[str] skip_storage: + cluster storage-class to use for juju storage + None will look for a default storage class within the cluster + + @returns str: cloud_name + + Common Examples: + ---------------------------------- + # make a new k8s cloud with any juju name and destroy it when the tests are over + await ops_test.add_k8s() + + # make a cloud known to juju as "bob" + await ops_test.add_k8s(cloud_name="my-k8s") + ---------------------------------- + """ + + if kubeconfig is None: + # kubeconfig should be auto-detected from the usual places + kubeconfig = type.__call__(K8sConfiguration) + kubernetes.config.load_config( + client_configuration=kubeconfig, + context=context, + temp_file_path=self.tmp_path, + ) + juju_cloud_config = {} + if not skip_storage and storage_class is None: + # lookup default storage-class + api_client = kubernetes.client.ApiClient(configuration=kubeconfig) + cluster = k8s_client.StorageV1Api(api_client=api_client) + for sc in cluster.list_storage_class().items: + if ( + sc.metadata.annotations.get( + "storageclass.kubernetes.io/is-default-class" + ) + == "true" + ): + storage_class = sc.metadata.name + if not skip_storage and storage_class: + juju_cloud_config["workload-storage"] = storage_class + juju_cloud_config["operator-storage"] = storage_class + controller = self._controller - cloud_name = self._generate_name("k8s-cloud") + cloud_name = cloud_name or self._generate_name("k8s-cloud") log.info(f"Adding k8s cloud {cloud_name}") cloud_def = client.Cloud( @@ -1556,27 +1621,28 @@ async def add_k8s(self, config: K8sConfiguration, **kwargs) -> str: "oauth2withcert", "userpass", ], - ca_certificates=[Path(config.ssl_ca_cert).read_text()], - endpoint=config.host, + ca_certificates=[Path(kubeconfig.ssl_ca_cert).read_text()], + endpoint=kubeconfig.host, host_cloud_region="kubernetes/ops-test", - regions=[client.CloudRegion(endpoint=config.host, name="k8s")], - skip_tls_verify=not config.verify_ssl, + regions=[client.CloudRegion(endpoint=kubeconfig.host, name="default")], + skip_tls_verify=not kubeconfig.verify_ssl, type_="kubernetes", + config=juju_cloud_config, ) - if config.cert_file and config.key_file: + if kubeconfig.cert_file and kubeconfig.key_file: auth_type = "clientcertificate" attrs = dict( - ClientCertificateData=Path(config.cert_file).read_text(), - ClientKeyData=Path(config.key_file).read_text(), + ClientCertificateData=Path(kubeconfig.cert_file).read_text(), + ClientKeyData=Path(kubeconfig.key_file).read_text(), ) - elif token := config.api_key["authorization"]: + elif token := kubeconfig.api_key["authorization"]: if token.startswith("Bearer "): auth_type = "oauth2" attrs = {"Token": token.split(" ")[1]} elif token.startswith("Basic "): auth_type, userpass = "userpass", token.split(" ")[1] - user, passwd = base64.b64decode(userpass).decode().split(":") + user, passwd = base64.b64decode(userpass).decode().split(":", 1) attrs = {"username": user, "password": passwd} else: raise ValueError("Failed to find credentials in authorization token") @@ -1596,7 +1662,7 @@ async def forget_cloud(self, cloud_name: str): if cloud_name not in self._clouds: raise KeyError(f"{cloud_name} not in clouds") for model in reversed(self._clouds[cloud_name].models): - await self.forget_model(model) + await self.forget_model(model, destroy_storage=True) log.info(f"Forgetting cloud: {cloud_name}...") await self._controller.remove_cloud(cloud_name) del self._clouds[cloud_name] diff --git a/tests/integration/test_opstest_add_k8s.py b/tests/integration/test_add_k8s.py similarity index 58% rename from tests/integration/test_opstest_add_k8s.py rename to tests/integration/test_add_k8s.py index 2172787..d23d20e 100644 --- a/tests/integration/test_opstest_add_k8s.py +++ b/tests/integration/test_add_k8s.py @@ -1,24 +1,20 @@ # test that pytest operator supports adding a k8s to an existing controller # This is a new k8s cloud created/managed by pytest-operator -from kubernetes import config as k8s_config -from kubernetes.client import Configuration from kubernetes.config.config_exception import ConfigException import pytest - from pytest_operator.plugin import OpsTest async def test_add_k8s(ops_test: OpsTest): - config = type.__call__(Configuration) try: - k8s_config.load_config(client_configuration=config) + k8s_cloud = await ops_test.add_k8s(skip_storage=False) except ConfigException: pytest.skip("No Kubernetes config found to add-k8s") - k8s_cloud = await ops_test.add_k8s(config, skip_storage=True, storage_class=None) + k8s_model = await ops_test.track_model( "secondary", cloud_name=k8s_cloud, keep=ops_test.ModelKeep.NEVER ) with ops_test.model_context("secondary"): - await k8s_model.deploy("coredns", trust=True) - await k8s_model.wait_for_idle(apps=["coredns"], status="active") + await k8s_model.deploy("grafana-k8s", trust=True) + await k8s_model.wait_for_idle(apps=["grafana-k8s"], status="active")