diff --git a/test/test_charm.py b/test/test_charm.py index 47121422b..5d72bc7d2 100644 --- a/test/test_charm.py +++ b/test/test_charm.py @@ -12,12 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. import functools -import os import pathlib -import shutil import tempfile import typing -import unittest from pathlib import Path import pytest @@ -25,175 +22,146 @@ import ops import ops.charm -from ops.model import ModelError, _ModelBackend -from ops.storage import SQLiteStorage +from ops.model import ModelError -from .test_helpers import fake_script, fake_script_calls +from .test_helpers import FakeScript, create_framework -class TestCharm(unittest.TestCase): +@pytest.fixture +def fake_script(request: pytest.FixtureRequest) -> FakeScript: + return FakeScript(request) - def setUp(self): - def restore_env(env: typing.Dict[str, str]): - os.environ.clear() - os.environ.update(env) - self.addCleanup(restore_env, os.environ.copy()) - os.environ['PATH'] = os.pathsep.join([ - str(Path(__file__).parent / 'bin'), - os.environ['PATH']]) - os.environ['JUJU_UNIT_NAME'] = 'local/0' +def test_basic(request: pytest.FixtureRequest): + class MyCharm(ops.CharmBase): - self.tmpdir = Path(tempfile.mkdtemp()) - self.addCleanup(shutil.rmtree, str(self.tmpdir)) - self.meta = ops.CharmMeta() + def __init__(self, framework: ops.Framework): + super().__init__(framework) - class CustomEvent(ops.EventBase): - pass + self.started = False + framework.observe(self.on.start, self._on_start) - class TestCharmEvents(ops.CharmEvents): - custom = ops.EventSource(CustomEvent) - - # Relations events are defined dynamically and modify the class attributes. - # We use a subclass temporarily to prevent these side effects from leaking. - ops.CharmBase.on = TestCharmEvents() # type: ignore - - def cleanup(): - ops.CharmBase.on = ops.CharmEvents() # type: ignore - self.addCleanup(cleanup) - - def create_framework(self): - model = ops.Model(self.meta, _ModelBackend('local/0')) - # we can pass foo_event as event_name because we're not actually testing dispatch - framework = ops.Framework(SQLiteStorage(':memory:'), self.tmpdir, self.meta, - model) - self.addCleanup(framework.close) - return framework - - def test_basic(self): - - class MyCharm(ops.CharmBase): - - def __init__(self, *args: typing.Any): - super().__init__(*args) - - self.started = False - framework.observe(self.on.start, self._on_start) - - def _on_start(self, event: ops.EventBase): - self.started = True - - events: typing.List[str] = list(MyCharm.on.events()) # type: ignore - assert 'install' in events - assert 'custom' in events - - framework = self.create_framework() - charm = MyCharm(framework) - charm.on.start.emit() - - assert charm.started - - with pytest.raises(TypeError): - framework.observe(charm.on.start, charm) # type: ignore - - def test_observe_decorated_method(self): - # we test that charm methods decorated with @functools.wraps(wrapper) - # can be observed by Framework. Simpler decorators won't work because - # Framework searches for __self__ and other method things; functools.wraps - # is more careful and it still works, this test is here to ensure that - # it keeps working in future releases, as this is presently the only - # way we know of to cleanly decorate charm event observers. - events: typing.List[ops.EventBase] = [] - - def dec(fn: typing.Any) -> typing.Callable[..., None]: - # simple decorator that appends to the nonlocal - # `events` list all events it receives - @functools.wraps(fn) - def wrapper(charm: 'MyCharm', evt: ops.EventBase): - events.append(evt) - fn(charm, evt) - return wrapper - - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - framework.observe(self.on.start, self._on_start) - self.seen = None - - @dec - def _on_start(self, event: ops.EventBase): - self.seen = event - - framework = self.create_framework() - charm = MyCharm(framework) - charm.on.start.emit() - # check that the event has been seen by the decorator - assert len(events) == 1 - # check that the event has been seen by the observer - assert isinstance(charm.seen, ops.StartEvent) - - def test_observer_not_referenced_warning(self): - class MyObj(ops.Object): - def __init__(self, charm: ops.CharmBase): - super().__init__(charm, "obj") - framework.observe(charm.on.start, self._on_start) - - def _on_start(self, _: ops.StartEvent): - raise RuntimeError() # never reached! - - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - MyObj(self) # not assigned! - framework.observe(self.on.start, self._on_start) - - def _on_start(self, _: ops.StartEvent): - pass # is reached - - framework = self.create_framework() - c = MyCharm(framework) - with self.assertLogs() as logs: - c.on.start.emit() - assert any('Reference to ops.Object' in log for log in logs.output) - - def test_empty_action(self): - meta = ops.CharmMeta.from_yaml('name: my-charm', '') - assert meta.actions == {} - - def test_helper_properties(self): - framework = self.create_framework() - - class MyCharm(ops.CharmBase): - pass + def _on_start(self, event: ops.EventBase): + self.started = True - charm = MyCharm(framework) - assert charm.app == framework.model.app - assert charm.unit == framework.model.unit - assert charm.meta == framework.meta - assert charm.charm_dir == framework.charm_dir - assert charm.config is framework.model.config - - def test_relation_events(self): - - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - self.seen: typing.List[str] = [] - for rel in ('req1', 'req-2', 'pro1', 'pro-2', 'peer1', 'peer-2'): - # Hook up relation events to generic handler. - self.framework.observe(self.on[rel].relation_joined, self.on_any_relation) - self.framework.observe(self.on[rel].relation_changed, self.on_any_relation) - self.framework.observe(self.on[rel].relation_departed, self.on_any_relation) - self.framework.observe(self.on[rel].relation_broken, self.on_any_relation) - - def on_any_relation(self, event: ops.RelationEvent): - assert event.relation.name == 'req1' - assert event.relation.app is not None - assert event.relation.app.name == 'remote' - self.seen.append(type(event).__name__) + framework = create_framework(request) - # language=YAML - self.meta = ops.CharmMeta.from_yaml(metadata=''' + events: typing.List[str] = list(MyCharm.on.events()) # type: ignore + assert 'install' in events + assert 'custom' in events + + charm = MyCharm(framework) + charm.on.start.emit() + + assert charm.started + + with pytest.raises(TypeError): + framework.observe(charm.on.start, charm) # type: ignore + + +def test_observe_decorated_method(request: pytest.FixtureRequest): + # We test that charm methods decorated with @functools.wraps(wrapper) + # can be observed by Framework. Simpler decorators won't work because + # Framework searches for __self__ and other method things; functools.wraps + # is more careful and it still works, this test is here to ensure that + # it keeps working in future releases, as this is presently the only + # way we know of to cleanly decorate charm event observers. + events: typing.List[ops.EventBase] = [] + + def dec(fn: typing.Any) -> typing.Callable[..., None]: + # simple decorator that appends to the nonlocal + # `events` list all events it receives + @functools.wraps(fn) + def wrapper(charm: 'MyCharm', evt: ops.EventBase): + events.append(evt) + fn(charm, evt) + return wrapper + + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + framework.observe(self.on.start, self._on_start) + self.seen = None + + @dec + def _on_start(self, event: ops.EventBase): + self.seen = event + + framework = create_framework(request) + charm = MyCharm(framework) + charm.on.start.emit() + # check that the event has been seen by the decorator + assert len(events) == 1 + # check that the event has been seen by the observer + assert isinstance(charm.seen, ops.StartEvent) + + +def test_observer_not_referenced_warning( + request: pytest.FixtureRequest, + caplog: pytest.LogCaptureFixture +): + class MyObj(ops.Object): + def __init__(self, charm: ops.CharmBase): + super().__init__(charm, "obj") + framework.observe(charm.on.start, self._on_start) + + def _on_start(self, _: ops.StartEvent): + raise RuntimeError() # never reached! + + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + MyObj(self) # not assigned! + framework.observe(self.on.start, self._on_start) + + def _on_start(self, _: ops.StartEvent): + pass # is reached + + framework = create_framework(request) + c = MyCharm(framework) + c.on.start.emit() + assert 'Reference to ops.Object' in caplog.text + + +def test_empty_action(): + meta = ops.CharmMeta.from_yaml('name: my-charm', '') + assert meta.actions == {} + + +def test_helper_properties(request: pytest.FixtureRequest): + class MyCharm(ops.CharmBase): + pass + + framework = create_framework(request) + charm = MyCharm(framework) + assert charm.app == framework.model.app + assert charm.unit == framework.model.unit + assert charm.meta == framework.meta + assert charm.charm_dir == framework.charm_dir + assert charm.config is framework.model.config + + +def test_relation_events(request: pytest.FixtureRequest): + + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.seen: typing.List[str] = [] + for rel in ('req1', 'req-2', 'pro1', 'pro-2', 'peer1', 'peer-2'): + # Hook up relation events to generic handler. + self.framework.observe(self.on[rel].relation_joined, self.on_any_relation) + self.framework.observe(self.on[rel].relation_changed, self.on_any_relation) + self.framework.observe(self.on[rel].relation_departed, self.on_any_relation) + self.framework.observe(self.on[rel].relation_broken, self.on_any_relation) + + def on_any_relation(self, event: ops.RelationEvent): + assert event.relation.name == 'req1' + assert event.relation.app is not None + assert event.relation.app.name == 'remote' + self.seen.append(type(event).__name__) + + # language=YAML + meta = ops.CharmMeta.from_yaml(metadata=''' name: my-charm requires: req1: @@ -211,59 +179,60 @@ def on_any_relation(self, event: ops.RelationEvent): peer-2: interface: peer2 ''') - - charm = MyCharm(self.create_framework()) - - assert 'pro_2_relation_broken' in repr(charm.on) - - rel = charm.framework.model.get_relation('req1', 1) - app = charm.framework.model.get_app('remote') - unit = charm.framework.model.get_unit('remote/0') - charm.on['req1'].relation_joined.emit(rel, app, unit) - charm.on['req1'].relation_changed.emit(rel, app, unit) - charm.on['req1'].relation_changed.emit(rel, app) - charm.on['req-2'].relation_changed.emit(rel, app, unit) - charm.on['pro1'].relation_departed.emit(rel, app, unit) - charm.on['pro-2'].relation_departed.emit(rel, app, unit) - charm.on['peer1'].relation_broken.emit(rel, app) - charm.on['peer-2'].relation_broken.emit(rel, app) - - assert charm.seen == [ - 'RelationJoinedEvent', - 'RelationChangedEvent', - 'RelationChangedEvent', - 'RelationChangedEvent', - 'RelationDepartedEvent', - 'RelationDepartedEvent', - 'RelationBrokenEvent', - 'RelationBrokenEvent', - ] - - def test_storage_events(self): - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - self.seen: typing.List[str] = [] - self.framework.observe(self.on['stor1'].storage_attached, self._on_stor1_attach) - self.framework.observe(self.on['stor2'].storage_detaching, self._on_stor2_detach) - self.framework.observe(self.on['stor3'].storage_attached, self._on_stor3_attach) - self.framework.observe(self.on['stor-4'].storage_attached, self._on_stor4_attach) - - def _on_stor1_attach(self, event: ops.StorageAttachedEvent): - self.seen.append(type(event).__name__) - assert event.storage.location == Path("/var/srv/stor1/0") - - def _on_stor2_detach(self, event: ops.StorageDetachingEvent): - self.seen.append(type(event).__name__) - - def _on_stor3_attach(self, event: ops.StorageAttachedEvent): - self.seen.append(type(event).__name__) - - def _on_stor4_attach(self, event: ops.StorageAttachedEvent): - self.seen.append(type(event).__name__) - - # language=YAML - self.meta = ops.CharmMeta.from_yaml(''' + framework = create_framework(request, meta=meta) + charm = MyCharm(framework) + + assert 'pro_2_relation_broken' in repr(charm.on) + + rel = charm.framework.model.get_relation('req1', 1) + app = charm.framework.model.get_app('remote') + unit = charm.framework.model.get_unit('remote/0') + charm.on['req1'].relation_joined.emit(rel, app, unit) + charm.on['req1'].relation_changed.emit(rel, app, unit) + charm.on['req1'].relation_changed.emit(rel, app) + charm.on['req-2'].relation_changed.emit(rel, app, unit) + charm.on['pro1'].relation_departed.emit(rel, app, unit) + charm.on['pro-2'].relation_departed.emit(rel, app, unit) + charm.on['peer1'].relation_broken.emit(rel, app) + charm.on['peer-2'].relation_broken.emit(rel, app) + + assert charm.seen == [ + 'RelationJoinedEvent', + 'RelationChangedEvent', + 'RelationChangedEvent', + 'RelationChangedEvent', + 'RelationDepartedEvent', + 'RelationDepartedEvent', + 'RelationBrokenEvent', + 'RelationBrokenEvent', + ] + + +def test_storage_events(request: pytest.FixtureRequest, fake_script: FakeScript): + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.seen: typing.List[str] = [] + self.framework.observe(self.on['stor1'].storage_attached, self._on_stor1_attach) + self.framework.observe(self.on['stor2'].storage_detaching, self._on_stor2_detach) + self.framework.observe(self.on['stor3'].storage_attached, self._on_stor3_attach) + self.framework.observe(self.on['stor-4'].storage_attached, self._on_stor4_attach) + + def _on_stor1_attach(self, event: ops.StorageAttachedEvent): + self.seen.append(type(event).__name__) + assert event.storage.location == Path("/var/srv/stor1/0") + + def _on_stor2_detach(self, event: ops.StorageDetachingEvent): + self.seen.append(type(event).__name__) + + def _on_stor3_attach(self, event: ops.StorageAttachedEvent): + self.seen.append(type(event).__name__) + + def _on_stor4_attach(self, event: ops.StorageAttachedEvent): + self.seen.append(type(event).__name__) + + # language=YAML + meta = ops.CharmMeta.from_yaml(''' name: my-charm storage: stor-4: @@ -290,124 +259,125 @@ def _on_stor4_attach(self, event: ops.StorageAttachedEvent): type: filesystem ''') - fake_script( - self, - "storage-get", - """ - if [ "$1" = "-s" ]; then - id=${2#*/} - key=${2%/*} - echo "\\"/var/srv/${key}/${id}\\"" # NOQA: test_quote_backslashes - elif [ "$1" = '--help' ]; then - printf '%s\\n' \\ - 'Usage: storage-get [options] []' \\ - ' ' \\ - 'Summary:' \\ - 'print information for storage instance with specified id' \\ - ' ' \\ - 'Options:' \\ - '--format (= smart)' \\ - ' Specify output format (json|smart|yaml)' \\ - '-o, --output (= "")' \\ - ' Specify an output file' \\ - '-s (= test-stor/0)' \\ - ' specify a storage instance by id' \\ - ' ' \\ - 'Details:' \\ - 'When no is supplied, all keys values are printed.' - else - # Return the same path for all disks since `storage-get` - # on attach and detach takes no parameters and is not - # deterministically faked with fake_script - exit 1 - fi - """, - ) - fake_script( - self, - "storage-list", - """ - echo '["disks/0"]' - """, - ) + fake_script.write( + "storage-get", + """ + if [ "$1" = "-s" ]; then + id=${2#*/} + key=${2%/*} + echo "\\"/var/srv/${key}/${id}\\"" # NOQA: test_quote_backslashes + elif [ "$1" = '--help' ]; then + printf '%s\\n' \\ + 'Usage: storage-get [options] []' \\ + ' ' \\ + 'Summary:' \\ + 'print information for storage instance with specified id' \\ + ' ' \\ + 'Options:' \\ + '--format (= smart)' \\ + ' Specify output format (json|smart|yaml)' \\ + '-o, --output (= "")' \\ + ' Specify an output file' \\ + '-s (= test-stor/0)' \\ + ' specify a storage instance by id' \\ + ' ' \\ + 'Details:' \\ + 'When no is supplied, all keys values are printed.' + else + # Return the same path for all disks since `storage-get` + # on attach and detach takes no parameters and is not + # deterministically faked with fake_script + exit 1 + fi + """, + ) + fake_script.write( + "storage-list", + """ + echo '["disks/0"]' + """, + ) + + assert meta.storages['stor1'].multiple_range is None + assert meta.storages['stor2'].multiple_range == (2, 2) + assert meta.storages['stor3'].multiple_range == (2, None) + assert meta.storages['stor-4'].multiple_range == (2, 4) + assert meta.storages['stor-plus'].multiple_range == (10, None) + + framework = create_framework(request, meta=meta) + charm = MyCharm(framework) + + charm.on['stor1'].storage_attached.emit(ops.Storage("stor1", 0, charm.model._backend)) + charm.on['stor2'].storage_detaching.emit(ops.Storage("stor2", 0, charm.model._backend)) + charm.on['stor3'].storage_attached.emit(ops.Storage("stor3", 0, charm.model._backend)) + charm.on['stor-4'].storage_attached.emit(ops.Storage("stor-4", 0, charm.model._backend)) + charm.on['stor-multiple-dashes'].storage_attached.emit( + ops.Storage("stor-multiple-dashes", 0, charm.model._backend)) + + assert charm.seen == [ + 'StorageAttachedEvent', + 'StorageDetachingEvent', + 'StorageAttachedEvent', + 'StorageAttachedEvent', + ] + + +def test_workload_events(request: pytest.FixtureRequest): + + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.seen: typing.List[str] = [] + for workload in ('container-a', 'containerb'): + # Hook up relation events to generic handler. + self.framework.observe( + self.on[workload].pebble_ready, + self.on_any_pebble_ready) + self.framework.observe( + self.on[workload].pebble_custom_notice, + self.on_any_pebble_custom_notice, + ) - assert self.meta.storages['stor1'].multiple_range is None - assert self.meta.storages['stor2'].multiple_range == (2, 2) - assert self.meta.storages['stor3'].multiple_range == (2, None) - assert self.meta.storages['stor-4'].multiple_range == (2, 4) - assert self.meta.storages['stor-plus'].multiple_range == (10, None) - - charm = MyCharm(self.create_framework()) - - charm.on['stor1'].storage_attached.emit(ops.Storage("stor1", 0, charm.model._backend)) - charm.on['stor2'].storage_detaching.emit(ops.Storage("stor2", 0, charm.model._backend)) - charm.on['stor3'].storage_attached.emit(ops.Storage("stor3", 0, charm.model._backend)) - charm.on['stor-4'].storage_attached.emit(ops.Storage("stor-4", 0, charm.model._backend)) - charm.on['stor-multiple-dashes'].storage_attached.emit( - ops.Storage("stor-multiple-dashes", 0, charm.model._backend)) - - assert charm.seen == [ - 'StorageAttachedEvent', - 'StorageDetachingEvent', - 'StorageAttachedEvent', - 'StorageAttachedEvent', - ] - - def test_workload_events(self): - - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - self.seen: typing.List[str] = [] - for workload in ('container-a', 'containerb'): - # Hook up relation events to generic handler. - self.framework.observe( - self.on[workload].pebble_ready, - self.on_any_pebble_ready) - self.framework.observe( - self.on[workload].pebble_custom_notice, - self.on_any_pebble_custom_notice, - ) - - def on_any_pebble_ready(self, event: ops.PebbleReadyEvent): - self.seen.append(type(event).__name__) - - def on_any_pebble_custom_notice(self, event: ops.PebbleCustomNoticeEvent): - self.seen.append(type(event).__name__) + def on_any_pebble_ready(self, event: ops.PebbleReadyEvent): + self.seen.append(type(event).__name__) - # language=YAML - self.meta = ops.CharmMeta.from_yaml(metadata=''' + def on_any_pebble_custom_notice(self, event: ops.PebbleCustomNoticeEvent): + self.seen.append(type(event).__name__) + + # language=YAML + meta = ops.CharmMeta.from_yaml(metadata=''' name: my-charm containers: container-a: containerb: ''') + framework = create_framework(request, meta=meta) + charm = MyCharm(framework) - charm = MyCharm(self.create_framework()) + assert 'container_a_pebble_ready' in repr(charm.on) + assert 'containerb_pebble_ready' in repr(charm.on) - assert 'container_a_pebble_ready' in repr(charm.on) - assert 'containerb_pebble_ready' in repr(charm.on) + charm.on['container-a'].pebble_ready.emit( + charm.framework.model.unit.get_container('container-a')) + charm.on['containerb'].pebble_ready.emit( + charm.framework.model.unit.get_container('containerb')) - charm.on['container-a'].pebble_ready.emit( - charm.framework.model.unit.get_container('container-a')) - charm.on['containerb'].pebble_ready.emit( - charm.framework.model.unit.get_container('containerb')) + charm.on['container-a'].pebble_custom_notice.emit( + charm.framework.model.unit.get_container('container-a'), '1', 'custom', 'x') + charm.on['containerb'].pebble_custom_notice.emit( + charm.framework.model.unit.get_container('containerb'), '2', 'custom', 'y') - charm.on['container-a'].pebble_custom_notice.emit( - charm.framework.model.unit.get_container('container-a'), '1', 'custom', 'x') - charm.on['containerb'].pebble_custom_notice.emit( - charm.framework.model.unit.get_container('containerb'), '2', 'custom', 'y') + assert charm.seen == [ + 'PebbleReadyEvent', + 'PebbleReadyEvent', + 'PebbleCustomNoticeEvent', + 'PebbleCustomNoticeEvent', + ] - assert charm.seen == [ - 'PebbleReadyEvent', - 'PebbleReadyEvent', - 'PebbleCustomNoticeEvent', - 'PebbleCustomNoticeEvent', - ] - def test_relations_meta(self): - # language=YAML - self.meta = ops.CharmMeta.from_yaml(''' +def test_relations_meta(): + # language=YAML + meta = ops.CharmMeta.from_yaml(metadata=''' name: my-charm requires: database: @@ -419,20 +389,21 @@ def test_relations_meta(self): optional: true ''') - assert self.meta.requires['database'].interface_name == 'mongodb' - assert self.meta.requires['database'].limit == 1 - assert self.meta.requires['database'].scope == 'container' - assert not self.meta.requires['database'].optional + assert meta.requires['database'].interface_name == 'mongodb' + assert meta.requires['database'].limit == 1 + assert meta.requires['database'].scope == 'container' + assert not meta.requires['database'].optional - assert self.meta.requires['metrics'].interface_name == 'prometheus-scraping' - assert self.meta.requires['metrics'].limit is None - assert self.meta.requires['metrics'].scope == 'global' # Default value - assert self.meta.requires['metrics'].optional + assert meta.requires['metrics'].interface_name == 'prometheus-scraping' + assert meta.requires['metrics'].limit is None + assert meta.requires['metrics'].scope == 'global' # Default value + assert meta.requires['metrics'].optional - def test_relations_meta_limit_type_validation(self): - with pytest.raises(TypeError, match=r"limit should be an int, not "): - # language=YAML - self.meta = ops.CharmMeta.from_yaml(''' + +def test_relations_meta_limit_type_validation(): + with pytest.raises(TypeError, match=r"limit should be an int, not "): + # language=YAML + ops.CharmMeta.from_yaml(''' name: my-charm requires: database: @@ -440,13 +411,14 @@ def test_relations_meta_limit_type_validation(self): limit: foobar ''') - def test_relations_meta_scope_type_validation(self): - with pytest.raises( - TypeError, - match="scope should be one of 'global', 'container'; not 'foobar'" - ): - # language=YAML - self.meta = ops.CharmMeta.from_yaml(''' + +def test_relations_meta_scope_type_validation(): + with pytest.raises( + TypeError, + match="scope should be one of 'global', 'container'; not 'foobar'" + ): + # language=YAML + ops.CharmMeta.from_yaml(''' name: my-charm requires: database: @@ -454,10 +426,57 @@ def test_relations_meta_scope_type_validation(self): scope: foobar ''') - @classmethod - def _get_action_test_meta(cls): - # language=YAML - return ops.CharmMeta.from_yaml(metadata=''' + +def test_meta_from_charm_root(): + with tempfile.TemporaryDirectory() as d: + td = pathlib.Path(d) + (td / 'metadata.yaml').write_text( + yaml.safe_dump( + {"name": "bob", + "requires": { + "foo": + {"interface": "bar"} + }})) + meta = ops.CharmMeta.from_charm_root(td) + assert meta.name == "bob" + assert meta.requires['foo'].interface_name == "bar" + + +def test_actions_from_charm_root(): + with tempfile.TemporaryDirectory() as d: + td = pathlib.Path(d) + (td / 'actions.yaml').write_text( + yaml.safe_dump( + {"foo": { + "description": "foos the bar", + "additionalProperties": False + }} + ) + ) + (td / 'metadata.yaml').write_text( + yaml.safe_dump( + {"name": "bob", + "requires": { + "foo": + {"interface": "bar"} + }})) + + meta = ops.CharmMeta.from_charm_root(td) + assert meta.name == "bob" + assert meta.requires['foo'].interface_name == "bar" + assert not meta.actions['foo'].additional_properties + assert meta.actions['foo'].description == "foos the bar" + + +def _setup_test_action(fake_script: FakeScript): + fake_script.write('action-get', """echo '{"foo-name": "name", "silent": true}'""") + fake_script.write('action-set', "") + fake_script.write('action-log', "") + fake_script.write('action-fail', "") + + +def _get_action_test_meta(): + return ops.CharmMeta.from_yaml(metadata=''' name: my-charm ''', actions=''' foo-bar: @@ -477,141 +496,108 @@ def _get_action_test_meta(cls): additionalProperties: false ''') - def test_meta_from_charm_root(self): - with tempfile.TemporaryDirectory() as d: - td = pathlib.Path(d) - (td / 'metadata.yaml').write_text( - yaml.safe_dump( - {"name": "bob", - "requires": { - "foo": - {"interface": "bar"} - }})) - meta = ops.CharmMeta.from_charm_root(td) - assert meta.name == "bob" - assert meta.requires['foo'].interface_name == "bar" - - def test_actions_from_charm_root(self): - with tempfile.TemporaryDirectory() as d: - td = pathlib.Path(d) - (td / 'actions.yaml').write_text( - yaml.safe_dump( - {"foo": { - "description": "foos the bar", - "additionalProperties": False - }} - ) - ) - (td / 'metadata.yaml').write_text( - yaml.safe_dump( - {"name": "bob", - "requires": { - "foo": - {"interface": "bar"} - }})) - - meta = ops.CharmMeta.from_charm_root(td) - assert meta.name == "bob" - assert meta.requires['foo'].interface_name == "bar" - assert not meta.actions['foo'].additional_properties - assert meta.actions['foo'].description == "foos the bar" - - def _setup_test_action(self): - fake_script(self, 'action-get', """echo '{"foo-name": "name", "silent": true}'""") - fake_script(self, 'action-set', "") - fake_script(self, 'action-log', "") - fake_script(self, 'action-fail', "") - self.meta = self._get_action_test_meta() - - def test_action_events(self): - - class MyCharm(ops.CharmBase): - - def __init__(self, *args: typing.Any): - super().__init__(*args) - framework.observe(self.on.foo_bar_action, self._on_foo_bar_action) - framework.observe(self.on.start_action, self._on_start_action) - - def _on_foo_bar_action(self, event: ops.ActionEvent): - self.seen_action_params = event.params - event.log('test-log') - event.set_results({'res': 'val with spaces', 'id': event.id}) - event.fail('test-fail') - - def _on_start_action(self, event: ops.ActionEvent): - pass - - self._setup_test_action() - framework = self.create_framework() - charm = MyCharm(framework) - - events: typing.List[str] = list(MyCharm.on.events()) # type: ignore - assert 'foo_bar_action' in events - assert 'start_action' in events - - action_id = "1234" - charm.on.foo_bar_action.emit(id=action_id) - assert charm.seen_action_params == {"foo-name": "name", "silent": True} - assert fake_script_calls(self) == [ - ['action-get', '--format=json'], - ['action-log', "test-log"], - ['action-set', "res=val with spaces", f"id={action_id}"], - ['action-fail', "test-fail"], - ] - - def test_invalid_action_results(self): - - class MyCharm(ops.CharmBase): - - def __init__(self, *args: typing.Any): - super().__init__(*args) - self.res: typing.Dict[str, typing.Any] = {} - framework.observe(self.on.foo_bar_action, self._on_foo_bar_action) - - def _on_foo_bar_action(self, event: ops.ActionEvent): - event.set_results(self.res) - - self._setup_test_action() - framework = self.create_framework() - charm = MyCharm(framework) - - for bad_res in ( - {'a': {'b': 'c'}, 'a.b': 'c'}, - {'a': {'B': 'c'}}, - {'a': {(1, 2): 'c'}}, - {'a': {None: 'c'}}, - {'aBc': 'd'}): - charm.res = bad_res - - with pytest.raises(ValueError): - charm.on.foo_bar_action.emit(id='1') - - def _test_action_event_defer_fails(self, cmd_type: str): - - class MyCharm(ops.CharmBase): - - def __init__(self, *args: typing.Any): - super().__init__(*args) - framework.observe(self.on.start_action, self._on_start_action) - - def _on_start_action(self, event: ops.ActionEvent): - event.defer() - - fake_script(self, f"{cmd_type}-get", """echo '{"foo-name": "name", "silent": true}'""") - self.meta = self._get_action_test_meta() - - os.environ[f'JUJU_{cmd_type.upper()}_NAME'] = 'start' - framework = self.create_framework() - charm = MyCharm(framework) - - with pytest.raises(RuntimeError): - charm.on.start_action.emit(id='2') - def test_action_event_defer_fails(self): - self._test_action_event_defer_fails('action') +def test_action_events(request: pytest.FixtureRequest, fake_script: FakeScript): + + class MyCharm(ops.CharmBase): + + def __init__(self, framework: ops.Framework): + super().__init__(framework) + framework.observe(self.on.foo_bar_action, self._on_foo_bar_action) + framework.observe(self.on.start_action, self._on_start_action) + + def _on_foo_bar_action(self, event: ops.ActionEvent): + self.seen_action_params = event.params + event.log('test-log') + event.set_results({'res': 'val with spaces', 'id': event.id}) + event.fail('test-fail') + + def _on_start_action(self, event: ops.ActionEvent): + pass - def test_containers(self): - meta = ops.CharmMeta.from_yaml(""" + _setup_test_action(fake_script) + meta = _get_action_test_meta() + framework = create_framework(request, meta=meta) + charm = MyCharm(framework) + + events: typing.List[str] = list(MyCharm.on.events()) # type: ignore + assert 'foo_bar_action' in events + assert 'start_action' in events + + action_id = "1234" + charm.on.foo_bar_action.emit(id=action_id) + assert charm.seen_action_params == {"foo-name": "name", "silent": True} + assert fake_script.calls() == [ + ['action-get', '--format=json'], + ['action-log', "test-log"], + ['action-set', "res=val with spaces", f"id={action_id}"], + ['action-fail', "test-fail"], + ] + + +@pytest.mark.parametrize("bad_res", [ + {'a': {'b': 'c'}, 'a.b': 'c'}, + {'a': {'B': 'c'}}, + {'a': {(1, 2): 'c'}}, + {'a': {None: 'c'}}, + {'aBc': 'd'} +]) +def test_invalid_action_results( + request: pytest.FixtureRequest, + fake_script: FakeScript, + bad_res: typing.Dict[str, typing.Any] +): + + class MyCharm(ops.CharmBase): + + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.res: typing.Dict[str, typing.Any] = {} + framework.observe(self.on.foo_bar_action, self._on_foo_bar_action) + + def _on_foo_bar_action(self, event: ops.ActionEvent): + event.set_results(self.res) + + _setup_test_action(fake_script) + meta = _get_action_test_meta() + framework = create_framework(request, meta=meta) + charm = MyCharm(framework) + + charm.res = bad_res + with pytest.raises(ValueError): + charm.on.foo_bar_action.emit(id='1') + + +def test_action_event_defer_fails( + request: pytest.FixtureRequest, + monkeypatch: pytest.MonkeyPatch, + fake_script: FakeScript +): + + cmd_type = 'action' + + class MyCharm(ops.CharmBase): + + def __init__(self, framework: ops.Framework): + super().__init__(framework) + framework.observe(self.on.start_action, self._on_start_action) + + def _on_start_action(self, event: ops.ActionEvent): + event.defer() + + fake_script.write(f"{cmd_type}-get", + """echo '{"foo-name": "name", "silent": true}'""") + monkeypatch.setenv(f'JUJU_{cmd_type.upper()}_NAME', 'start') + meta = _get_action_test_meta() + framework = create_framework(request, meta=meta) + charm = MyCharm(framework) + + with pytest.raises(RuntimeError): + charm.on.start_action.emit(id='2') + + +def test_containers(): + meta = ops.CharmMeta.from_yaml(""" name: k8s-charm containers: test1: @@ -619,13 +605,14 @@ def test_containers(self): test2: k: v """) - assert isinstance(meta.containers['test1'], ops.ContainerMeta) - assert isinstance(meta.containers['test2'], ops.ContainerMeta) - assert meta.containers['test1'].name == 'test1' - assert meta.containers['test2'].name == 'test2' + assert isinstance(meta.containers['test1'], ops.ContainerMeta) + assert isinstance(meta.containers['test2'], ops.ContainerMeta) + assert meta.containers['test1'].name == 'test1' + assert meta.containers['test2'].name == 'test2' - def test_containers_storage(self): - meta = ops.CharmMeta.from_yaml(""" + +def test_containers_storage(): + meta = ops.CharmMeta.from_yaml(""" name: k8s-charm storage: data: @@ -655,23 +642,23 @@ def test_containers_storage(self): architectures: - arm """) - assert isinstance(meta.containers['test1'], ops.ContainerMeta) - assert isinstance(meta.containers['test1'].mounts["data"], ops.ContainerStorageMeta) - assert meta.containers['test1'].mounts["data"].location == '/test/storagemount' - assert meta.containers['test1'].mounts["other"].location == '/test/otherdata' - assert meta.storages['other'].properties == ['transient'] - assert meta.containers['test1'].resource == 'ubuntu-22.10' - assert meta.containers['test2'].bases is not None - assert len(meta.containers['test2'].bases) == 2 - assert meta.containers['test2'].bases[0].os_name == 'ubuntu' - assert meta.containers['test2'].bases[0].channel == '23.10' - assert meta.containers['test2'].bases[0].architectures == ['amd64'] - assert meta.containers['test2'].bases[1].os_name == 'ubuntu' - assert meta.containers['test2'].bases[1].channel == '23.04/stable/fips' - assert meta.containers['test2'].bases[1].architectures == ['arm'] - # It's an error to specify both the 'resource' and the 'bases' fields. - with pytest.raises(ModelError): - ops.CharmMeta.from_yaml(""" + assert isinstance(meta.containers['test1'], ops.ContainerMeta) + assert isinstance(meta.containers['test1'].mounts["data"], ops.ContainerStorageMeta) + assert meta.containers['test1'].mounts["data"].location == '/test/storagemount' + assert meta.containers['test1'].mounts["other"].location == '/test/otherdata' + assert meta.storages['other'].properties == ['transient'] + assert meta.containers['test1'].resource == 'ubuntu-22.10' + assert meta.containers['test2'].bases is not None + assert len(meta.containers['test2'].bases) == 2 + assert meta.containers['test2'].bases[0].os_name == 'ubuntu' + assert meta.containers['test2'].bases[0].channel == '23.10' + assert meta.containers['test2'].bases[0].architectures == ['amd64'] + assert meta.containers['test2'].bases[1].os_name == 'ubuntu' + assert meta.containers['test2'].bases[1].channel == '23.04/stable/fips' + assert meta.containers['test2'].bases[1].architectures == ['arm'] + # It's an error to specify both the 'resource' and the 'bases' fields. + with pytest.raises(ModelError): + ops.CharmMeta.from_yaml(""" name: invalid-charm containers: test1: @@ -682,8 +669,9 @@ def test_containers_storage(self): resource: ubuntu-23.10 """) - def test_containers_storage_multiple_mounts(self): - meta = ops.CharmMeta.from_yaml(""" + +def test_containers_storage_multiple_mounts(): + meta = ops.CharmMeta.from_yaml(""" name: k8s-charm storage: data: @@ -697,262 +685,273 @@ def test_containers_storage_multiple_mounts(self): - storage: data location: /test/otherdata """) - assert isinstance(meta.containers['test1'], ops.ContainerMeta) - assert isinstance(meta.containers['test1'].mounts["data"], ops.ContainerStorageMeta) - assert meta.containers['test1'].mounts["data"].locations[0] == \ - '/test/storagemount' - assert meta.containers['test1'].mounts["data"].locations[1] == '/test/otherdata' - - with pytest.raises(RuntimeError): - meta.containers["test1"].mounts["data"].location - - def test_secret_events(self): - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - self.seen: typing.List[str] = [] - self.framework.observe(self.on.secret_changed, self.on_secret_changed) - self.framework.observe(self.on.secret_rotate, self.on_secret_rotate) - self.framework.observe(self.on.secret_remove, self.on_secret_remove) - self.framework.observe(self.on.secret_expired, self.on_secret_expired) - - def on_secret_changed(self, event: ops.SecretChangedEvent): - assert event.secret.id == 'secret:changed' - assert event.secret.label is None - self.seen.append(type(event).__name__) - - def on_secret_rotate(self, event: ops.SecretRotateEvent): - assert event.secret.id == 'secret:rotate' - assert event.secret.label == 'rot' - self.seen.append(type(event).__name__) - - def on_secret_remove(self, event: ops.SecretRemoveEvent): - assert event.secret.id == 'secret:remove' - assert event.secret.label == 'rem' - assert event.revision == 7 - self.seen.append(type(event).__name__) - - def on_secret_expired(self, event: ops.SecretExpiredEvent): - assert event.secret.id == 'secret:expired' - assert event.secret.label == 'exp' - assert event.revision == 42 - self.seen.append(type(event).__name__) - - self.meta = ops.CharmMeta.from_yaml(metadata='name: my-charm') - charm = MyCharm(self.create_framework()) - - charm.on.secret_changed.emit('secret:changed', None) - charm.on.secret_rotate.emit('secret:rotate', 'rot') - charm.on.secret_remove.emit('secret:remove', 'rem', 7) - charm.on.secret_expired.emit('secret:expired', 'exp', 42) - - assert charm.seen == [ - 'SecretChangedEvent', - 'SecretRotateEvent', - 'SecretRemoveEvent', - 'SecretExpiredEvent', - ] - - def test_collect_app_status_leader(self): - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - self.framework.observe(self.on.collect_app_status, self._on_collect_status) - - def _on_collect_status(self, event: ops.CollectStatusEvent): - event.add_status(ops.ActiveStatus()) - event.add_status(ops.BlockedStatus('first')) - event.add_status(ops.WaitingStatus('waiting')) - event.add_status(ops.BlockedStatus('second')) - - fake_script(self, 'is-leader', 'echo true') - fake_script(self, 'status-set', 'exit 0') - - charm = MyCharm(self.create_framework()) - ops.charm._evaluate_status(charm) - - assert fake_script_calls(self, True) == [ - ['is-leader', '--format=json'], - ['status-set', '--application=True', 'blocked', 'first'], - ] - - def test_collect_app_status_no_statuses(self): - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - self.framework.observe(self.on.collect_app_status, self._on_collect_status) - - def _on_collect_status(self, event: ops.CollectStatusEvent): - pass - - fake_script(self, 'is-leader', 'echo true') - - charm = MyCharm(self.create_framework()) - ops.charm._evaluate_status(charm) + assert isinstance(meta.containers['test1'], ops.ContainerMeta) + assert isinstance(meta.containers['test1'].mounts["data"], ops.ContainerStorageMeta) + assert meta.containers['test1'].mounts["data"].locations[0] == \ + '/test/storagemount' + assert meta.containers['test1'].mounts["data"].locations[1] == '/test/otherdata' + + with pytest.raises(RuntimeError): + meta.containers["test1"].mounts["data"].location + + +def test_secret_events(request: pytest.FixtureRequest): + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.seen: typing.List[str] = [] + self.framework.observe(self.on.secret_changed, self.on_secret_changed) + self.framework.observe(self.on.secret_rotate, self.on_secret_rotate) + self.framework.observe(self.on.secret_remove, self.on_secret_remove) + self.framework.observe(self.on.secret_expired, self.on_secret_expired) + + def on_secret_changed(self, event: ops.SecretChangedEvent): + assert event.secret.id == 'secret:changed' + assert event.secret.label is None + self.seen.append(type(event).__name__) + + def on_secret_rotate(self, event: ops.SecretRotateEvent): + assert event.secret.id == 'secret:rotate' + assert event.secret.label == 'rot' + self.seen.append(type(event).__name__) + + def on_secret_remove(self, event: ops.SecretRemoveEvent): + assert event.secret.id == 'secret:remove' + assert event.secret.label == 'rem' + assert event.revision == 7 + self.seen.append(type(event).__name__) + + def on_secret_expired(self, event: ops.SecretExpiredEvent): + assert event.secret.id == 'secret:expired' + assert event.secret.label == 'exp' + assert event.revision == 42 + self.seen.append(type(event).__name__) + + framework = create_framework(request) + charm = MyCharm(framework) + + charm.on.secret_changed.emit('secret:changed', None) + charm.on.secret_rotate.emit('secret:rotate', 'rot') + charm.on.secret_remove.emit('secret:remove', 'rem', 7) + charm.on.secret_expired.emit('secret:expired', 'exp', 42) + + assert charm.seen == [ + 'SecretChangedEvent', + 'SecretRotateEvent', + 'SecretRemoveEvent', + 'SecretExpiredEvent', + ] + + +def test_collect_app_status_leader(request: pytest.FixtureRequest, fake_script: FakeScript): + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.framework.observe(self.on.collect_app_status, self._on_collect_status) + + def _on_collect_status(self, event: ops.CollectStatusEvent): + event.add_status(ops.ActiveStatus()) + event.add_status(ops.BlockedStatus('first')) + event.add_status(ops.WaitingStatus('waiting')) + event.add_status(ops.BlockedStatus('second')) + + fake_script.write('is-leader', 'echo true') + fake_script.write('status-set', 'exit 0') + + framework = create_framework(request) + charm = MyCharm(framework) + ops.charm._evaluate_status(charm) + + assert fake_script.calls(True) == [ + ['is-leader', '--format=json'], + ['status-set', '--application=True', 'blocked', 'first'], + ] + + +def test_collect_app_status_no_statuses(request: pytest.FixtureRequest, fake_script: FakeScript): + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.framework.observe(self.on.collect_app_status, self._on_collect_status) + + def _on_collect_status(self, event: ops.CollectStatusEvent): + pass - assert fake_script_calls(self, True) == [ - ['is-leader', '--format=json'], - ] + fake_script.write('is-leader', 'echo true') - def test_collect_app_status_non_leader(self): - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - self.framework.observe(self.on.collect_app_status, self._on_collect_status) + framework = create_framework(request) + charm = MyCharm(framework) + ops.charm._evaluate_status(charm) - def _on_collect_status(self, event: ops.CollectStatusEvent): - raise Exception # shouldn't be called + assert fake_script.calls(True) == [ + ['is-leader', '--format=json'], + ] - fake_script(self, 'is-leader', 'echo false') - charm = MyCharm(self.create_framework()) - ops.charm._evaluate_status(charm) +def test_collect_app_status_non_leader(request: pytest.FixtureRequest, fake_script: FakeScript): + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.framework.observe(self.on.collect_app_status, self._on_collect_status) - assert fake_script_calls(self, True) == [ - ['is-leader', '--format=json'], - ] + def _on_collect_status(self, event: ops.CollectStatusEvent): + raise Exception # shouldn't be called - def test_collect_unit_status(self): - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - self.framework.observe(self.on.collect_unit_status, self._on_collect_status) + fake_script.write('is-leader', 'echo false') - def _on_collect_status(self, event: ops.CollectStatusEvent): - event.add_status(ops.ActiveStatus()) - event.add_status(ops.BlockedStatus('first')) - event.add_status(ops.WaitingStatus('waiting')) - event.add_status(ops.BlockedStatus('second')) + framework = create_framework(request) + charm = MyCharm(framework) + ops.charm._evaluate_status(charm) - fake_script(self, 'is-leader', 'echo false') # called only for collecting app statuses - fake_script(self, 'status-set', 'exit 0') + assert fake_script.calls(True) == [ + ['is-leader', '--format=json'], + ] - charm = MyCharm(self.create_framework()) - ops.charm._evaluate_status(charm) - assert fake_script_calls(self, True) == [ - ['is-leader', '--format=json'], - ['status-set', '--application=False', 'blocked', 'first'], - ] +def test_collect_unit_status(request: pytest.FixtureRequest, fake_script: FakeScript): + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.framework.observe(self.on.collect_unit_status, self._on_collect_status) - def test_collect_unit_status_no_statuses(self): - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - self.framework.observe(self.on.collect_unit_status, self._on_collect_status) + def _on_collect_status(self, event: ops.CollectStatusEvent): + event.add_status(ops.ActiveStatus()) + event.add_status(ops.BlockedStatus('first')) + event.add_status(ops.WaitingStatus('waiting')) + event.add_status(ops.BlockedStatus('second')) - def _on_collect_status(self, event: ops.CollectStatusEvent): - pass + # called only for collecting app statuses + fake_script.write('is-leader', 'echo false') + fake_script.write('status-set', 'exit 0') - fake_script(self, 'is-leader', 'echo false') # called only for collecting app statuses + framework = create_framework(request) + charm = MyCharm(framework) + ops.charm._evaluate_status(charm) - charm = MyCharm(self.create_framework()) - ops.charm._evaluate_status(charm) + assert fake_script.calls(True) == [ + ['is-leader', '--format=json'], + ['status-set', '--application=False', 'blocked', 'first'], + ] - assert fake_script_calls(self, True) == [ - ['is-leader', '--format=json'], - ] - def test_collect_app_and_unit_status(self): - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - self.framework.observe(self.on.collect_app_status, self._on_collect_app_status) - self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status) +def test_collect_unit_status_no_statuses(request: pytest.FixtureRequest, fake_script: FakeScript): + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.framework.observe(self.on.collect_unit_status, self._on_collect_status) - def _on_collect_app_status(self, event: ops.CollectStatusEvent): - event.add_status(ops.ActiveStatus()) + def _on_collect_status(self, event: ops.CollectStatusEvent): + pass - def _on_collect_unit_status(self, event: ops.CollectStatusEvent): - event.add_status(ops.WaitingStatus('blah')) + # called only for collecting app statuses + fake_script.write('is-leader', 'echo false') - fake_script(self, 'is-leader', 'echo true') - fake_script(self, 'status-set', 'exit 0') + framework = create_framework(request) + charm = MyCharm(framework) + ops.charm._evaluate_status(charm) - charm = MyCharm(self.create_framework()) - ops.charm._evaluate_status(charm) + assert fake_script.calls(True) == [ + ['is-leader', '--format=json'], + ] - assert fake_script_calls(self, True) == [ - ['is-leader', '--format=json'], - ['status-set', '--application=True', 'active', ''], - ['status-set', '--application=False', 'waiting', 'blah'], - ] - def test_add_status_type_error(self): - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any): - super().__init__(*args) - self.framework.observe(self.on.collect_app_status, self._on_collect_status) +def test_collect_app_and_unit_status(request: pytest.FixtureRequest, fake_script: FakeScript): + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.framework.observe(self.on.collect_app_status, self._on_collect_app_status) + self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status) - def _on_collect_status(self, event: ops.CollectStatusEvent): - event.add_status('active') # type: ignore + def _on_collect_app_status(self, event: ops.CollectStatusEvent): + event.add_status(ops.ActiveStatus()) - fake_script(self, 'is-leader', 'echo true') + def _on_collect_unit_status(self, event: ops.CollectStatusEvent): + event.add_status(ops.WaitingStatus('blah')) - charm = MyCharm(self.create_framework()) - with pytest.raises(TypeError): - ops.charm._evaluate_status(charm) + fake_script.write('is-leader', 'echo true') + fake_script.write('status-set', 'exit 0') - def test_collect_status_priority(self): - class MyCharm(ops.CharmBase): - def __init__(self, *args: typing.Any, statuses: typing.List[str]): - super().__init__(*args) - self.framework.observe(self.on.collect_app_status, self._on_collect_status) - self.statuses = statuses + framework = create_framework(request) + charm = MyCharm(framework) + ops.charm._evaluate_status(charm) - def _on_collect_status(self, event: ops.CollectStatusEvent): - for status in self.statuses: - event.add_status(ops.StatusBase.from_name(status, '')) + assert fake_script.calls(True) == [ + ['is-leader', '--format=json'], + ['status-set', '--application=True', 'active', ''], + ['status-set', '--application=False', 'waiting', 'blah'], + ] - fake_script(self, 'is-leader', 'echo true') - fake_script(self, 'status-set', 'exit 0') - charm = MyCharm(self.create_framework(), statuses=['blocked', 'error']) - ops.charm._evaluate_status(charm) +def test_add_status_type_error(request: pytest.FixtureRequest, fake_script: FakeScript): + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.framework.observe(self.on.collect_app_status, self._on_collect_status) - charm = MyCharm(self.create_framework(), statuses=['waiting', 'blocked']) - ops.charm._evaluate_status(charm) + def _on_collect_status(self, event: ops.CollectStatusEvent): + event.add_status('active') # type: ignore - charm = MyCharm(self.create_framework(), statuses=['waiting', 'maintenance']) - ops.charm._evaluate_status(charm) - - charm = MyCharm(self.create_framework(), statuses=['active', 'waiting']) - ops.charm._evaluate_status(charm) + fake_script.write('is-leader', 'echo true') - charm = MyCharm(self.create_framework(), statuses=['active', 'unknown']) + framework = create_framework(request) + charm = MyCharm(framework) + with pytest.raises(TypeError): ops.charm._evaluate_status(charm) - charm = MyCharm(self.create_framework(), statuses=['unknown']) - ops.charm._evaluate_status(charm) - status_set_calls = [call for call in fake_script_calls(self, True) - if call[0] == 'status-set'] - assert status_set_calls == [ - ['status-set', '--application=True', 'error', ''], - ['status-set', '--application=True', 'blocked', ''], - ['status-set', '--application=True', 'maintenance', ''], - ['status-set', '--application=True', 'waiting', ''], - ['status-set', '--application=True', 'active', ''], - ['status-set', '--application=True', 'unknown', ''], - ] - - -class TestCharmMeta(unittest.TestCase): - def test_links(self): - # Each type of link can be a single item. - meta = ops.CharmMeta.from_yaml(""" +@pytest.mark.parametrize("statuses,expected", [ + (['blocked', 'error'], 'error'), + (['waiting', 'blocked'], 'blocked'), + (['waiting', 'maintenance'], 'maintenance'), + (['active', 'waiting'], 'waiting'), + (['active', 'unknown'], 'active'), + (['unknown'], 'unknown') +]) +def test_collect_status_priority( + request: pytest.FixtureRequest, + fake_script: FakeScript, + statuses: typing.List[str], + expected: str, +): + class MyCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework, statuses: typing.List[str]): + super().__init__(framework) + self.framework.observe(self.on.collect_app_status, self._on_collect_status) + self.statuses = statuses + + def _on_collect_status(self, event: ops.CollectStatusEvent): + for status in self.statuses: + event.add_status(ops.StatusBase.from_name(status, '')) + + fake_script.write('is-leader', 'echo true') + fake_script.write('status-set', 'exit 0') + + framework = create_framework(request) + charm = MyCharm(framework, statuses=statuses) + ops.charm._evaluate_status(charm) + + status_set_calls = [call for call in fake_script.calls(True) + if call[0] == 'status-set'] + assert status_set_calls == [ + ['status-set', '--application=True', expected, ''] + ] + + +def test_meta_links(): + # Each type of link can be a single item. + meta = ops.CharmMeta.from_yaml(""" name: my-charm website: https://example.com source: https://git.example.com issues: https://bugs.example.com docs: https://docs.example.com """) - assert meta.links.websites == ['https://example.com'] - assert meta.links.sources == ['https://git.example.com'] - assert meta.links.issues == ['https://bugs.example.com'] - assert meta.links.documentation == 'https://docs.example.com' - # Other than documentation, they can also all be lists of items. - meta = ops.CharmMeta.from_yaml(""" + assert meta.links.websites == ['https://example.com'] + assert meta.links.sources == ['https://git.example.com'] + assert meta.links.issues == ['https://bugs.example.com'] + assert meta.links.documentation == 'https://docs.example.com' + # Other than documentation, they can also all be lists of items. + meta = ops.CharmMeta.from_yaml(""" name: my-charm website: - https://example.com @@ -964,14 +963,15 @@ def test_links(self): - https://bugs.example.com - https://features.example.com """) - assert meta.links.websites == ['https://example.com', 'https://example.org'] - assert meta.links.sources == [ - 'https://git.example.com', 'https://bzr.example.com'] - assert meta.links.issues == [ - 'https://bugs.example.com', 'https://features.example.com'] - - def test_links_charmcraft_yaml(self): - meta = ops.CharmMeta.from_yaml(""" + assert meta.links.websites == ['https://example.com', 'https://example.org'] + assert meta.links.sources == [ + 'https://git.example.com', 'https://bzr.example.com'] + assert meta.links.issues == [ + 'https://bugs.example.com', 'https://features.example.com'] + + +def test_meta_links_charmcraft_yaml(): + meta = ops.CharmMeta.from_yaml(""" links: documentation: https://discourse.example.com/ issues: @@ -982,25 +982,26 @@ def test_links_charmcraft_yaml(self): - https://example.com/ contact: Support Team """) - assert meta.links.websites == ['https://example.com/'] - assert meta.links.sources == ['https://git.example.com/issues/'] - assert meta.links.issues == ['https://git.example.com/'] - assert meta.links.documentation == 'https://discourse.example.com/' - assert meta.maintainers == ['Support Team '] - - def test_assumes(self): - meta = ops.CharmMeta.from_yaml(""" + assert meta.links.websites == ['https://example.com/'] + assert meta.links.sources == ['https://git.example.com/issues/'] + assert meta.links.issues == ['https://git.example.com/'] + assert meta.links.documentation == 'https://discourse.example.com/' + assert meta.maintainers == ['Support Team '] + + +def test_meta_assumes(): + meta = ops.CharmMeta.from_yaml(""" assumes: - juju """) - assert meta.assumes.features == ['juju'] - meta = ops.CharmMeta.from_yaml(""" + assert meta.assumes.features == ['juju'] + meta = ops.CharmMeta.from_yaml(""" assumes: - juju > 3 - k8s-api """) - assert meta.assumes.features == ['juju > 3', 'k8s-api'] - meta = ops.CharmMeta.from_yaml(""" + assert meta.assumes.features == ['juju > 3', 'k8s-api'] + meta = ops.CharmMeta.from_yaml(""" assumes: - k8s-api - any-of: @@ -1011,11 +1012,11 @@ def test_assumes(self): - juju >= 3.1.5 - juju < 4 """) - assert meta.assumes.features == [ - 'k8s-api', - ops.JujuAssumes( - [ops.JujuAssumes(['juju >= 2.9.44', 'juju < 3']), - ops.JujuAssumes(['juju >= 3.1.5', 'juju < 4'])], - ops.JujuAssumesCondition.ANY - ), - ] + assert meta.assumes.features == [ + 'k8s-api', + ops.JujuAssumes( + [ops.JujuAssumes(['juju >= 2.9.44', 'juju < 3']), + ops.JujuAssumes(['juju >= 3.1.5', 'juju < 4'])], + ops.JujuAssumesCondition.ANY + ), + ] diff --git a/test/test_helpers.py b/test/test_helpers.py index 5f9b0e316..38e52e92a 100644 --- a/test/test_helpers.py +++ b/test/test_helpers.py @@ -20,6 +20,8 @@ import typing import unittest +import pytest + import ops from ops.model import _ModelBackend from ops.storage import SQLiteStorage @@ -74,6 +76,101 @@ def fake_script_calls(test_case: unittest.TestCase, return calls # type: ignore +def create_framework( + request: pytest.FixtureRequest, + *, + meta: typing.Optional[ops.CharmMeta] = None): + env_backup = os.environ.copy() + os.environ['PATH'] = os.pathsep.join([ + str(pathlib.Path(__file__).parent / 'bin'), + os.environ['PATH']]) + os.environ['JUJU_UNIT_NAME'] = 'local/0' + + tmpdir = pathlib.Path(tempfile.mkdtemp()) + + class CustomEvent(ops.EventBase): + pass + + class TestCharmEvents(ops.CharmEvents): + custom = ops.EventSource(CustomEvent) + + # Relations events are defined dynamically and modify the class attributes. + # We use a subclass temporarily to prevent these side effects from leaking. + ops.CharmBase.on = TestCharmEvents() # type: ignore + + if meta is None: + meta = ops.CharmMeta() + model = ops.Model(meta, _ModelBackend('local/0')) # type: ignore + # We can pass foo_event as event_name because we're not actually testing dispatch. + framework = ops.Framework(SQLiteStorage(':memory:'), tmpdir, meta, model) # type: ignore + + def finalizer(): + os.environ.clear() + os.environ.update(env_backup) + shutil.rmtree(tmpdir) + ops.CharmBase.on = ops.CharmEvents() # type: ignore + framework.close() + + request.addfinalizer(finalizer) + + return framework + + +class FakeScript: + def __init__( + self, + request: pytest.FixtureRequest, + path: typing.Optional[pathlib.Path] = None, + ): + if path is None: + fake_script_path = tempfile.mkdtemp('-fake_script') + self.path = pathlib.Path(fake_script_path) + old_path = os.environ['PATH'] + os.environ['PATH'] = os.pathsep.join([fake_script_path, os.environ['PATH']]) + + def cleanup(): + shutil.rmtree(self.path) + os.environ['PATH'] = old_path + + request.addfinalizer(cleanup) + else: + self.path = path + + def write(self, name: str, content: str): + template_args: typing.Dict[str, str] = { + 'name': name, + 'path': self.path.as_posix(), + 'content': content, + } + + path: pathlib.Path = self.path / name + with path.open('wt') as f: + # Before executing the provided script, dump the provided arguments in calls.txt. + # RS 'record separator' (octal 036 in ASCII), FS 'file separator' (octal 034 in ASCII). + f.write( + '''#!/bin/sh +{{ printf {name}; printf "\\036%s" "$@"; printf "\\034"; }} >> {path}/calls.txt +{content}'''.format_map(template_args)) + path.chmod(0o755) + # TODO: this hardcodes the path to bash.exe, which works for now but might + # need to be set via environ or something like that. + path.with_suffix(".bat").write_text( # type: ignore + f'@"C:\\Program Files\\git\\bin\\bash.exe" {path} %*\n') + + def calls(self, clear: bool = False) -> typing.List[typing.List[str]]: + calls_file: pathlib.Path = self.path / 'calls.txt' + if not calls_file.exists(): + return [] + + # Newline and encoding forced to Linux-y defaults because on + # windows they're written from git-bash. + with calls_file.open('r+t', newline='\n', encoding='utf8') as f: + calls = [line.split('\036') for line in f.read().split('\034')[:-1]] + if clear: + f.truncate(0) + return calls + + class FakeScriptTest(unittest.TestCase): def test_fake_script_works(self):