Skip to content

Commit

Permalink
Successfully tested with storage
Browse files Browse the repository at this point in the history
  • Loading branch information
addyess committed Mar 1, 2024
1 parent c4c2d17 commit 096105c
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 22 deletions.
94 changes: 80 additions & 14 deletions pytest_operator/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
from zipfile import Path as ZipPath

import jinja2
import kubernetes.config
import pytest
import pytest_asyncio.plugin
import yaml
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from kubernetes import client as k8s_client
from kubernetes.client import Configuration as K8sConfiguration
from juju.client import client
from juju.client.jujudata import FileJujuData
Expand Down Expand Up @@ -937,7 +939,7 @@ async def is_model_alive():
if timeout and await is_model_alive():
log.warning("Waiting for model %s to die...", model_name)
while await is_model_alive():
asyncio.sleep(5)
await asyncio.sleep(5)

await model.disconnect()

Expand Down Expand Up @@ -1542,10 +1544,73 @@ def is_crash_dump_enabled(self) -> bool:
else:
return False

# Add K8S
async def add_k8s(self, config: K8sConfiguration, **kwargs) -> str:
async def add_k8s(
self,
cloud_name: Optional[str] = None,
kubeconfig: Optional[K8sConfiguration] = None,
context: Optional[str] = None,
skip_storage: bool = True,
storage_class: Optional[str] = None,
) -> str:
"""
Add a new k8s cloud in the existing controller.
@param Optional[str] cloud_name:
Name for the new cloud
None will autogenerate a name
@param Optional[kubernetes.client.configuration.Configuration] kubeconfig:
Configuration object from kubernetes.config.load_config
None will read from the usual kubeconfig locations like
os.environ.get('KUBECONFIG', '$HOME/.kube/config')
@param Optional[str] context:
context to use within the kubeconfig
None will use the default context
@param bool skip_storage:
True will not use cloud storage,
False either finds storage or uses storage_class
@param Optional[str] skip_storage:
cluster storage-class to use for juju storage
None will look for a default storage class within the cluster
@returns str: cloud_name
Common Examples:
----------------------------------
# make a new k8s cloud with any juju name and destroy it when the tests are over
await ops_test.add_k8s()
# make a cloud known to juju as "bob"
await ops_test.add_k8s(cloud_name="my-k8s")
----------------------------------
"""

if kubeconfig is None:
# kubeconfig should be auto-detected from the usual places
kubeconfig = type.__call__(K8sConfiguration)
kubernetes.config.load_config(
client_configuration=kubeconfig,
context=context,
temp_file_path=self.tmp_path,
)
juju_cloud_config = {}
if not skip_storage and storage_class is None:
# lookup default storage-class
api_client = kubernetes.client.ApiClient(configuration=kubeconfig)
cluster = k8s_client.StorageV1Api(api_client=api_client)
for sc in cluster.list_storage_class().items:
if (
sc.metadata.annotations.get(
"storageclass.kubernetes.io/is-default-class"
)
== "true"
):
storage_class = sc.metadata.name
if not skip_storage and storage_class:
juju_cloud_config["workload-storage"] = storage_class
juju_cloud_config["operator-storage"] = storage_class

controller = self._controller
cloud_name = self._generate_name("k8s-cloud")
cloud_name = cloud_name or self._generate_name("k8s-cloud")
log.info(f"Adding k8s cloud {cloud_name}")

cloud_def = client.Cloud(
Expand All @@ -1556,27 +1621,28 @@ async def add_k8s(self, config: K8sConfiguration, **kwargs) -> str:
"oauth2withcert",
"userpass",
],
ca_certificates=[Path(config.ssl_ca_cert).read_text()],
endpoint=config.host,
ca_certificates=[Path(kubeconfig.ssl_ca_cert).read_text()],
endpoint=kubeconfig.host,
host_cloud_region="kubernetes/ops-test",
regions=[client.CloudRegion(endpoint=config.host, name="k8s")],
skip_tls_verify=not config.verify_ssl,
regions=[client.CloudRegion(endpoint=kubeconfig.host, name="default")],
skip_tls_verify=not kubeconfig.verify_ssl,
type_="kubernetes",
config=juju_cloud_config,
)

if config.cert_file and config.key_file:
if kubeconfig.cert_file and kubeconfig.key_file:
auth_type = "clientcertificate"
attrs = dict(
ClientCertificateData=Path(config.cert_file).read_text(),
ClientKeyData=Path(config.key_file).read_text(),
ClientCertificateData=Path(kubeconfig.cert_file).read_text(),
ClientKeyData=Path(kubeconfig.key_file).read_text(),
)
elif token := config.api_key["authorization"]:
elif token := kubeconfig.api_key["authorization"]:
if token.startswith("Bearer "):
auth_type = "oauth2"
attrs = {"Token": token.split(" ")[1]}
elif token.startswith("Basic "):
auth_type, userpass = "userpass", token.split(" ")[1]
user, passwd = base64.b64decode(userpass).decode().split(":")
user, passwd = base64.b64decode(userpass).decode().split(":", 1)
attrs = {"username": user, "password": passwd}
else:
raise ValueError("Failed to find credentials in authorization token")
Expand All @@ -1596,7 +1662,7 @@ async def forget_cloud(self, cloud_name: str):
if cloud_name not in self._clouds:
raise KeyError(f"{cloud_name} not in clouds")
for model in reversed(self._clouds[cloud_name].models):
await self.forget_model(model)
await self.forget_model(model, destroy_storage=True)
log.info(f"Forgetting cloud: {cloud_name}...")
await self._controller.remove_cloud(cloud_name)
del self._clouds[cloud_name]
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
# test that pytest operator supports adding a k8s to an existing controller
# This is a new k8s cloud created/managed by pytest-operator

from kubernetes import config as k8s_config
from kubernetes.client import Configuration
from kubernetes.config.config_exception import ConfigException
import pytest

from pytest_operator.plugin import OpsTest


async def test_add_k8s(ops_test: OpsTest):
config = type.__call__(Configuration)
try:
k8s_config.load_config(client_configuration=config)
k8s_cloud = await ops_test.add_k8s(skip_storage=False)
except ConfigException:
pytest.skip("No Kubernetes config found to add-k8s")
k8s_cloud = await ops_test.add_k8s(config, skip_storage=True, storage_class=None)

k8s_model = await ops_test.track_model(
"secondary", cloud_name=k8s_cloud, keep=ops_test.ModelKeep.NEVER
)
with ops_test.model_context("secondary"):
await k8s_model.deploy("coredns", trust=True)
await k8s_model.wait_for_idle(apps=["coredns"], status="active")
await k8s_model.deploy("grafana-k8s", trust=True)
await k8s_model.wait_for_idle(apps=["grafana-k8s"], status="active")

0 comments on commit 096105c

Please sign in to comment.