diff --git a/CHANGES.md b/CHANGES.md index e2d5768cb..5faa20943 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,7 @@ # 2.12.0 * Updated Pebble Notices `get_notices` parameter name to `users=all` (previously `select=all`). +* Added `Model.get_cloud_spec` which uses the `credential-get` hook tool to get details of the cloud where the model is deployed. # 2.11.0 diff --git a/ops/__init__.py b/ops/__init__.py index 2f359c2b0..60de7ad4a 100644 --- a/ops/__init__.py +++ b/ops/__init__.py @@ -129,6 +129,8 @@ 'BindingMapping', 'BlockedStatus', 'CheckInfoMapping', + 'CloudCredential', + 'CloudSpec', 'ConfigData', 'Container', 'ContainerMapping', @@ -270,6 +272,8 @@ BindingMapping, BlockedStatus, CheckInfoMapping, + CloudCredential, + CloudSpec, ConfigData, Container, ContainerMapping, diff --git a/ops/model.py b/ops/model.py index bc87b3bdb..9ccadcd82 100644 --- a/ops/model.py +++ b/ops/model.py @@ -281,6 +281,21 @@ def get_secret(self, *, id: Optional[str] = None, label: Optional[str] = None) - content = self._backend.secret_get(id=id, label=label) return Secret(self._backend, id=id, label=label, content=content) + def get_cloud_spec(self) -> 'CloudSpec': + """Get details of the cloud in which the model is deployed. + + Note: This information is only available for machine charms, + not Kubernetes sidecar charms. + + Returns: + a specification for the cloud in which the model is deployed, + including credential information. + + Raises: + :class:`ModelError`: if called in a Kubernetes model. + """ + return self._backend.credential_get() + if typing.TYPE_CHECKING: # (entity type, name): instance. @@ -3507,6 +3522,14 @@ def reboot(self, now: bool = False): else: self._run("juju-reboot") + def credential_get(self) -> 'CloudSpec': + """Access cloud credentials by running the credential-get hook tool. + + Returns the cloud specification used by the model. + """ + result = self._run('credential-get', return_output=True, use_json=True) + return CloudSpec.from_dict(typing.cast(Dict[str, Any], result)) + class _ModelBackendValidator: """Provides facilities for validating inputs and formatting them for model backends.""" @@ -3596,3 +3619,85 @@ def _ensure_loaded(self): self._notice = self._container.get_notice(self.id) assert self._notice.type == self.type assert self._notice.key == self.key + + +@dataclasses.dataclass(frozen=True) +class CloudCredential: + """Credentials for cloud. + + Used as the type of attribute `credential` in :class:`CloudSpec`. + """ + + auth_type: str + """Authentication type.""" + + attributes: Dict[str, str] = dataclasses.field(default_factory=dict) + """A dictionary containing cloud credentials. + + For example, for AWS, it contains `access-key` and `secret-key`; + for Azure, `application-id`, `application-password` and `subscription-id` + can be found here. + """ + + redacted: List[str] = dataclasses.field(default_factory=list) + """A list of redacted secrets.""" + + @classmethod + def from_dict(cls, d: Dict[str, Any]) -> 'CloudCredential': + """Create a new CloudCredential object from a dictionary.""" + return cls( + auth_type=d['auth-type'], + attributes=d.get('attrs') or {}, + redacted=d.get('redacted') or [], + ) + + +@dataclasses.dataclass(frozen=True) +class CloudSpec: + """Cloud specification information (metadata) including credentials.""" + + type: str + """Type of the cloud.""" + + name: str + """Juju cloud name.""" + + region: Optional[str] = None + """Region of the cloud.""" + + endpoint: Optional[str] = None + """Endpoint of the cloud.""" + + identity_endpoint: Optional[str] = None + """Identity endpoint of the cloud.""" + + storage_endpoint: Optional[str] = None + """Storage endpoint of the cloud.""" + + credential: Optional[CloudCredential] = None + """Cloud credentials with key-value attributes.""" + + ca_certificates: List[str] = dataclasses.field(default_factory=list) + """A list of CA certificates.""" + + skip_tls_verify: bool = False + """Whether to skip TLS verfication.""" + + is_controller_cloud: bool = False + """If this is the cloud used by the controller, defaults to False.""" + + @classmethod + def from_dict(cls, d: Dict[str, Any]) -> 'CloudSpec': + """Create a new CloudSpec object from a dict parsed from JSON.""" + return cls( + type=d['type'], + name=d['name'], + region=d.get('region') or None, + endpoint=d.get('endpoint') or None, + identity_endpoint=d.get('identity-endpoint') or None, + storage_endpoint=d.get('storage-endpoint') or None, + credential=CloudCredential.from_dict(d['credential']) if d.get('credential') else None, + ca_certificates=d.get('cacertificates') or [], + skip_tls_verify=d.get('skip-tls-verify') or False, + is_controller_cloud=d.get('is-controller-cloud') or False, + ) diff --git a/ops/testing.py b/ops/testing.py index cb395be51..5d8cc838c 100644 --- a/ops/testing.py +++ b/ops/testing.py @@ -1911,6 +1911,49 @@ def run_action(self, action_name: str, output=action_under_test.output) return action_under_test.output + def set_cloud_spec(self, spec: 'model.CloudSpec'): + """Set cloud specification (metadata) including credentials. + + Call this method before the charm calls :meth:`ops.Model.get_cloud_spec`. + + Example usage:: + + class MyVMCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + framework.observe(self.on.start, self._on_start) + + def _on_start(self, event: ops.StartEvent): + self.cloud_spec = self.model.get_cloud_spec() + + class TestCharm(unittest.TestCase): + def setUp(self): + self.harness = ops.testing.Harness(MyVMCharm) + self.addCleanup(self.harness.cleanup) + + def test_start(self): + cloud_spec_dict = { + 'name': 'localhost', + 'type': 'lxd', + 'endpoint': 'https://127.0.0.1:8443', + 'credential': { + 'authtype': 'certificate', + 'attrs': { + 'client-cert': 'foo', + 'client-key': 'bar', + 'server-cert': 'baz' + }, + }, + } + self.harness.set_cloud_spec(ops.CloudSpec.from_dict(cloud_spec_dict)) + self.harness.begin() + self.harness.charm.on.start.emit() + expected = ops.CloudSpec.from_dict(cloud_spec_dict) + self.assertEqual(harness.charm.cloud_spec, expected) + + """ + self._backend._cloud_spec = spec + def _get_app_or_unit_name(app_or_unit: AppUnitOrName) -> str: """Return name of given application or unit (return strings directly).""" @@ -2126,6 +2169,7 @@ def __init__(self, unit_name: str, meta: charm.CharmMeta, config: '_RawConfig'): self._networks: Dict[Tuple[Optional[str], Optional[int]], _NetworkDict] = {} self._reboot_count = 0 self._running_action: Optional[_RunningAction] = None + self._cloud_spec: Optional[model.CloudSpec] = None def _validate_relation_access(self, relation_name: str, relations: List[model.Relation]): """Ensures that the named relation exists/has been added. @@ -2709,6 +2753,12 @@ def reboot(self, now: bool = False): # to handle everything after the exit. raise SystemExit() + def credential_get(self) -> model.CloudSpec: + if not self._cloud_spec: + raise model.ModelError( + 'ERROR cloud spec is empty, set it with `Harness.set_cloud_spec()` first') + return self._cloud_spec + @_copy_docstrings(pebble.ExecProcess) class _TestingExecProcess: diff --git a/test/test_model.py b/test/test_model.py index 9155217d8..c1f990c65 100644 --- a/test/test_model.py +++ b/test/test_model.py @@ -3791,5 +3791,131 @@ def test_repr(self): ) +class TestCloudCredential(unittest.TestCase): + def test_from_dict(self): + d = { + 'auth-type': 'certificate', + } + cloud_cred = ops.CloudCredential.from_dict(d) + self.assertEqual(cloud_cred.auth_type, d['auth-type']) + self.assertEqual(cloud_cred.attributes, {}) + self.assertEqual(cloud_cred.redacted, []) + + def test_from_dict_full(self): + d = { + 'auth-type': 'certificate', + 'attrs': { + 'client-cert': 'foo', + 'client-key': 'bar', + 'server-cert': 'baz' + }, + 'redacted': ['foo'] + } + cloud_cred = ops.CloudCredential.from_dict(d) + self.assertEqual(cloud_cred.auth_type, d['auth-type']) + self.assertEqual(cloud_cred.attributes, d['attrs']) + self.assertEqual(cloud_cred.redacted, d['redacted']) + + +class TestCloudSpec(unittest.TestCase): + def test_from_dict(self): + cloud_spec = ops.CloudSpec.from_dict( + { + 'type': 'lxd', + 'name': 'localhost', + } + ) + self.assertEqual(cloud_spec.type, 'lxd') + self.assertEqual(cloud_spec.name, 'localhost') + self.assertEqual(cloud_spec.region, None) + self.assertEqual(cloud_spec.endpoint, None) + self.assertEqual(cloud_spec.identity_endpoint, None) + self.assertEqual(cloud_spec.storage_endpoint, None) + self.assertIsNone(cloud_spec.credential) + self.assertEqual(cloud_spec.ca_certificates, []) + self.assertEqual(cloud_spec.skip_tls_verify, False) + self.assertEqual(cloud_spec.is_controller_cloud, False) + + def test_from_dict_full(self): + cred = { + 'auth-type': 'certificate', + 'attrs': { + 'client-cert': 'foo', + 'client-key': 'bar', + 'server-cert': 'baz' + }, + 'redacted': ['foo'] + } + d = { + 'type': 'lxd', + 'name': 'localhost', + 'region': 'localhost', + 'endpoint': 'https://10.76.251.1:8443', + 'credential': cred, + 'identity-endpoint': 'foo', + 'storage-endpoint': 'bar', + 'cacertificates': ['foo', 'bar'], + 'skip-tls-verify': False, + 'is-controller-cloud': True, + } + cloud_spec = ops.CloudSpec.from_dict(d) + self.assertEqual(cloud_spec.type, d['type']) + self.assertEqual(cloud_spec.name, d['name']) + self.assertEqual(cloud_spec.region, d['region']) + self.assertEqual(cloud_spec.endpoint, d['endpoint']) + self.assertEqual(cloud_spec.credential, ops.CloudCredential.from_dict(cred)) + self.assertEqual(cloud_spec.identity_endpoint, d['identity-endpoint']) + self.assertEqual(cloud_spec.storage_endpoint, d['storage-endpoint']) + self.assertEqual(cloud_spec.ca_certificates, d['cacertificates']) + self.assertEqual(cloud_spec.skip_tls_verify, False) + self.assertEqual(cloud_spec.is_controller_cloud, True) + + def test_from_dict_no_credential(self): + d = { + 'type': 'lxd', + 'name': 'localhost', + 'region': 'localhost', + 'endpoint': 'https://10.76.251.1:8443', + 'identity-endpoint': 'foo', + 'storage-endpoint': 'bar', + 'cacertificates': ['foo', 'bar'], + 'skip-tls-verify': False, + 'is-controller-cloud': True, + } + cloud_spec = ops.CloudSpec.from_dict(d) + self.assertEqual(cloud_spec.type, d['type']) + self.assertEqual(cloud_spec.name, d['name']) + self.assertEqual(cloud_spec.region, d['region']) + self.assertEqual(cloud_spec.endpoint, d['endpoint']) + self.assertIsNone(cloud_spec.credential) + self.assertEqual(cloud_spec.identity_endpoint, d['identity-endpoint']) + self.assertEqual(cloud_spec.storage_endpoint, d['storage-endpoint']) + self.assertEqual(cloud_spec.ca_certificates, d['cacertificates']) + self.assertEqual(cloud_spec.skip_tls_verify, False) + self.assertEqual(cloud_spec.is_controller_cloud, True) + + +class TestGetCloudSpec(unittest.TestCase): + def setUp(self): + self.model = ops.Model(ops.CharmMeta(), _ModelBackend('myapp/0')) + + def test_success(self): + fake_script(self, 'credential-get', """echo '{"type": "lxd", "name": "localhost"}'""") + cloud_spec = self.model.get_cloud_spec() + self.assertEqual(cloud_spec.type, 'lxd') + self.assertEqual(cloud_spec.name, 'localhost') + self.assertEqual(fake_script_calls(self, clear=True), + [['credential-get', '--format=json']]) + + def test_error(self): + fake_script( + self, + 'credential-get', + """echo 'ERROR cannot access cloud credentials' >&2; exit 1""") + with self.assertRaises(ops.ModelError) as cm: + self.model.get_cloud_spec() + self.assertEqual(str(cm.exception), 'ERROR cannot access cloud credentials\n') + + if __name__ == "__main__": unittest.main() diff --git a/test/test_testing.py b/test/test_testing.py index d531e08d5..504a851e2 100644 --- a/test/test_testing.py +++ b/test/test_testing.py @@ -5854,3 +5854,41 @@ def test_get_notices(self): class TestNotices(unittest.TestCase, _TestingPebbleClientMixin, PebbleNoticesMixin): def setUp(self): self.client = self.get_testing_client() + + +class TestCloudSpec(unittest.TestCase): + def test_set_cloud_spec(self): + class TestCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + framework.observe(self.on.start, self._on_start) + + def _on_start(self, event: ops.StartEvent): + self.cloud_spec = self.model.get_cloud_spec() + + harness = ops.testing.Harness(TestCharm) + self.addCleanup(harness.cleanup) + cloud_spec_dict = { + 'name': 'localhost', + 'type': 'lxd', + 'endpoint': 'https://127.0.0.1:8443', + 'credential': { + 'auth-type': 'certificate', + 'attrs': { + 'client-cert': 'foo', + 'client-key': 'bar', + 'server-cert': 'baz' + }, + }, + } + harness.set_cloud_spec(ops.CloudSpec.from_dict(cloud_spec_dict)) + harness.begin() + harness.charm.on.start.emit() + self.assertEqual(harness.charm.cloud_spec, ops.CloudSpec.from_dict(cloud_spec_dict)) + + def test_get_cloud_spec_without_set_error(self): + harness = ops.testing.Harness(ops.CharmBase) + self.addCleanup(harness.cleanup) + harness.begin() + with self.assertRaises(ops.ModelError): + harness.model.get_cloud_spec()