From 169d97b0dd6ba933789881ff6a74e0d1ffacb67e Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 19 Nov 2024 16:52:55 +0100 Subject: [PATCH 1/8] Remove methods that were moved to client repo * The `running` property was renamed to `started` * The running state was removed Signed-off-by: Mathias L. Baumann --- README.md | 48 +++--- RELEASE_NOTES.md | 2 +- src/frequenz/dispatch/__init__.py | 3 +- src/frequenz/dispatch/_dispatch.py | 201 ++--------------------- src/frequenz/dispatch/_dispatcher.py | 48 +++--- src/frequenz/dispatch/_managing_actor.py | 40 +++-- src/frequenz/dispatch/actor.py | 14 +- tests/test_frequenz_dispatch.py | 29 ++-- 8 files changed, 100 insertions(+), 285 deletions(-) diff --git a/README.md b/README.md index 95ae323..870cbfe 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-pyth ```python import os -from frequenz.dispatch import Dispatcher, RunningState +from frequenz.dispatch import Dispatcher from unittest.mock import MagicMock async def run(): @@ -42,29 +42,29 @@ async def run(): changed_running_status_rx = dispatcher.running_status_change.new_receiver() async for dispatch in changed_running_status_rx: - match dispatch.running("DEMO_TYPE"): - case RunningState.RUNNING: - print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") - if actor.is_running: - actor.reconfigure( - components=dispatch.target, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) # this will reconfigure the actor - else: - # this will start a new actor with the given components - # and run it for the duration of the dispatch - actor.start( - components=dispatch.target, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) - case RunningState.STOPPED: - actor.stop() # this will stop the actor - case RunningState.DIFFERENT_TYPE: - pass # dispatch not for this type + if dispatch.type != "MY_TYPE": + continue + + if dispatch.started: + print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") + if actor.is_running: + actor.reconfigure( + components=dispatch.target, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) # this will reconfigure the actor + else: + # this will start a new actor with the given components + # and run it for the duration of the dispatch + actor.start( + components=dispatch.target, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) + else: + actor.stop() # this will stop the actor ``` ## Supported Platforms diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 4b5cede..961bb7e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,7 +6,7 @@ ## Upgrading - +* The method `Dispatch.running(type: str)` was replaced with the property `Dispatch.started: bool`. ## New Features diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index 037665c..0dee73a 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -15,7 +15,7 @@ """ -from ._dispatch import Dispatch, RunningState +from ._dispatch import Dispatch from ._dispatcher import Dispatcher, ReceiverFetcher from ._event import Created, Deleted, DispatchEvent, Updated from ._managing_actor import DispatchManagingActor, DispatchUpdate @@ -28,7 +28,6 @@ "ReceiverFetcher", "Updated", "Dispatch", - "RunningState", "DispatchManagingActor", "DispatchUpdate", ] diff --git a/src/frequenz/dispatch/_dispatch.py b/src/frequenz/dispatch/_dispatch.py index e28bc7e..c998fca 100644 --- a/src/frequenz/dispatch/_dispatch.py +++ b/src/frequenz/dispatch/_dispatch.py @@ -3,53 +3,12 @@ """Dispatch type with support for next_run calculation.""" - -import logging from dataclasses import dataclass from datetime import datetime, timezone -from enum import Enum -from typing import Iterator, cast +from typing import Iterator -from dateutil import rrule -from frequenz.client.dispatch.recurrence import Frequency, Weekday from frequenz.client.dispatch.types import Dispatch as BaseDispatch -_logger = logging.getLogger(__name__) -"""The logger for this module.""" - -_RRULE_FREQ_MAP = { - Frequency.MINUTELY: rrule.MINUTELY, - Frequency.HOURLY: rrule.HOURLY, - Frequency.DAILY: rrule.DAILY, - Frequency.WEEKLY: rrule.WEEKLY, - Frequency.MONTHLY: rrule.MONTHLY, -} -"""To map from our Frequency enum to the dateutil library enum.""" - -_RRULE_WEEKDAY_MAP = { - Weekday.MONDAY: rrule.MO, - Weekday.TUESDAY: rrule.TU, - Weekday.WEDNESDAY: rrule.WE, - Weekday.THURSDAY: rrule.TH, - Weekday.FRIDAY: rrule.FR, - Weekday.SATURDAY: rrule.SA, - Weekday.SUNDAY: rrule.SU, -} -"""To map from our Weekday enum to the dateutil library enum.""" - - -class RunningState(Enum): - """The running state of a dispatch.""" - - RUNNING = "RUNNING" - """The dispatch is running.""" - - STOPPED = "STOPPED" - """The dispatch is stopped.""" - - DIFFERENT_TYPE = "DIFFERENT_TYPE" - """The dispatch is for a different type.""" - @dataclass(frozen=True) class Dispatch(BaseDispatch): @@ -87,6 +46,18 @@ def _set_deleted(self) -> None: """Mark the dispatch as deleted.""" object.__setattr__(self, "deleted", True) + @property + def started(self) -> bool: + """Check if the dispatch is started. + + Returns: + True if the dispatch is started, False otherwise. + """ + if self.deleted: + return False + + return super().started + @property def _running_status_notified(self) -> bool: """Check that the latest running state change notification was sent. @@ -100,52 +71,6 @@ def _set_running_status_notified(self) -> None: """Mark the latest running state change notification as sent.""" object.__setattr__(self, "running_state_change_synced", self.update_time) - def running(self, type_: str) -> RunningState: - """Check if the dispatch is currently supposed to be running. - - Args: - type_: The type of the dispatch that should be running. - - Returns: - RUNNING if the dispatch is running, - STOPPED if it is stopped, - DIFFERENT_TYPE if it is for a different type. - """ - if self.type != type_: - return RunningState.DIFFERENT_TYPE - - if not self.active or self.deleted: - return RunningState.STOPPED - - now = datetime.now(tz=timezone.utc) - - if now < self.start_time: - return RunningState.STOPPED - # A dispatch without duration is always running once it started - if self.duration is None: - return RunningState.RUNNING - - if until := self._until(now): - return RunningState.RUNNING if now < until else RunningState.STOPPED - - return RunningState.STOPPED - - @property - def until(self) -> datetime | None: - """Time when the dispatch should end. - - Returns the time that a running dispatch should end. - If the dispatch is not running, None is returned. - - Returns: - The time when the dispatch should end or None if the dispatch is not running. - """ - if not self.active or self.deleted: - return None - - now = datetime.now(tz=timezone.utc) - return self._until(now) - @property # noqa is needed because of a bug in pydoclint that makes it think a `return` without a return # value needs documenting @@ -170,103 +95,3 @@ def missed_runs(self) -> Iterator[datetime]: # noqa: DOC405 while (next_run := self.next_run_after(from_time)) and next_run < now: yield next_run from_time = next_run - - @property - def next_run(self) -> datetime | None: - """Calculate the next run of a dispatch. - - Returns: - The next run of the dispatch or None if the dispatch is finished. - """ - return self.next_run_after(datetime.now(tz=timezone.utc)) - - def next_run_after(self, after: datetime) -> datetime | None: - """Calculate the next run of a dispatch. - - Args: - after: The time to calculate the next run from. - - Returns: - The next run of the dispatch or None if the dispatch is finished. - """ - if ( - not self.recurrence.frequency - or self.recurrence.frequency == Frequency.UNSPECIFIED - or self.duration is None # Infinite duration - ): - if after > self.start_time: - return None - return self.start_time - - # Make sure no weekday is UNSPECIFIED - if Weekday.UNSPECIFIED in self.recurrence.byweekdays: - _logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id) - return None - - # No type information for rrule, so we need to cast - return cast(datetime | None, self._prepare_rrule().after(after, inc=True)) - - def _prepare_rrule(self) -> rrule.rrule: - """Prepare the rrule object. - - Returns: - The rrule object. - - Raises: - ValueError: If the interval is invalid. - """ - count, until = (None, None) - if end := self.recurrence.end_criteria: - count = end.count - until = end.until - - if self.recurrence.interval is None or self.recurrence.interval < 1: - raise ValueError("Interval must be at least 1") - - rrule_obj = rrule.rrule( - freq=_RRULE_FREQ_MAP[self.recurrence.frequency], - dtstart=self.start_time, - count=count, - until=until, - byminute=self.recurrence.byminutes or None, - byhour=self.recurrence.byhours or None, - byweekday=[ - _RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays - ] - or None, - bymonthday=self.recurrence.bymonthdays or None, - bymonth=self.recurrence.bymonths or None, - interval=self.recurrence.interval, - ) - - return rrule_obj - - def _until(self, now: datetime) -> datetime | None: - """Calculate the time when the dispatch should end. - - If no previous run is found, None is returned. - - Args: - now: The current time. - - Returns: - The time when the dispatch should end or None if the dispatch is not running. - - Raises: - ValueError: If the dispatch has no duration. - """ - if self.duration is None: - raise ValueError("_until: Dispatch has no duration") - - if ( - not self.recurrence.frequency - or self.recurrence.frequency == Frequency.UNSPECIFIED - ): - return self.start_time + self.duration - - latest_past_start: datetime | None = self._prepare_rrule().before(now, inc=True) - - if not latest_past_start: - return None - - return latest_past_start + self.duration diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 01b327d..357ea60 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -54,7 +54,7 @@ class Dispatcher: Example: Processing running state change dispatches ```python import os - from frequenz.dispatch import Dispatcher, RunningState + from frequenz.dispatch import Dispatcher from unittest.mock import MagicMock async def run(): @@ -75,29 +75,29 @@ async def run(): changed_running_status = dispatcher.running_status_change.new_receiver() async for dispatch in changed_running_status: - match dispatch.running("DEMO_TYPE"): - case RunningState.RUNNING: - print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") - if actor.is_running: - actor.reconfigure( - components=dispatch.target, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) # this will reconfigure the actor - else: - # this will start a new actor with the given components - # and run it for the duration of the dispatch - actor.start( - components=dispatch.target, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) - case RunningState.STOPPED: - actor.stop() # this will stop the actor - case RunningState.DIFFERENT_TYPE: - pass # dispatch not for this type + if dispatch.type != "YOUR_DISPATCH_TYPE": + continue + + if dispatch.started: + print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") + if actor.is_running: + actor.reconfigure( + components=dispatch.target, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) # this will reconfigure the actor + else: + # this will start a new actor with the given components + # and run it for the duration of the dispatch + actor.start( + components=dispatch.target, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) + else: + actor.stop() # this will stop the actor ``` Example: Getting notification about dispatch lifecycle events diff --git a/src/frequenz/dispatch/_managing_actor.py b/src/frequenz/dispatch/_managing_actor.py index 5538e95..d7ef29f 100644 --- a/src/frequenz/dispatch/_managing_actor.py +++ b/src/frequenz/dispatch/_managing_actor.py @@ -11,7 +11,7 @@ from frequenz.client.dispatch.types import TargetComponents from frequenz.sdk.actor import Actor -from ._dispatch import Dispatch, RunningState +from ._dispatch import Dispatch _logger = logging.getLogger(__name__) @@ -156,25 +156,23 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None: Args: dispatch: The dispatch to handle. """ - running = dispatch.running(self._dispatch_type) - match running: - case RunningState.STOPPED: - _logger.info("Stopped by dispatch %s", dispatch.id) - await self._stop_actors("Dispatch stopped") - case RunningState.RUNNING: - if self._updates_sender is not None: - _logger.info("Updated by dispatch %s", dispatch.id) - await self._updates_sender.send( - DispatchUpdate( - components=dispatch.target, - dry_run=dispatch.dry_run, - options=dispatch.payload, - ) + if dispatch.type != self._dispatch_type: + _logger.debug("Ignoring dispatch %s", dispatch.id) + return + + if dispatch.started: + if self._updates_sender is not None: + _logger.info("Updated by dispatch %s", dispatch.id) + await self._updates_sender.send( + DispatchUpdate( + components=dispatch.target, + dry_run=dispatch.dry_run, + options=dispatch.payload, ) - - _logger.info("Started by dispatch %s", dispatch.id) - self._start_actors() - case RunningState.DIFFERENT_TYPE: - _logger.debug( - "Unknown dispatch! Ignoring dispatch of type %s", dispatch.type ) + + _logger.info("Started by dispatch %s", dispatch.id) + self._start_actors() + else: + _logger.info("Stopped by dispatch %s", dispatch.id) + await self._stop_actors("Dispatch stopped") diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 2f58845..56a40dd 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -15,7 +15,7 @@ from frequenz.client.dispatch.types import Event from frequenz.sdk.actor import Actor -from ._dispatch import Dispatch, RunningState +from ._dispatch import Dispatch from ._event import Created, Deleted, DispatchEvent, Updated _logger = logging.getLogger(__name__) @@ -142,7 +142,7 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None: # The timer is always a tiny bit delayed, so we need to check if the # actor is supposed to be running now (we're assuming it wasn't already # running, as all checks are done before scheduling) - if dispatch.running(dispatch.type) == RunningState.RUNNING: + if dispatch.started: # If it should be running, schedule the stop event self._schedule_stop(dispatch) # If the actor is not running, we need to schedule the next start @@ -193,7 +193,7 @@ async def _fetch(self) -> None: await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) await self._update_dispatch_schedule_and_notify(None, dispatch) - # Set deleted only here as it influences the result of dispatch.running() + # Set deleted only here as it influences the result of dispatch.started # which is used in above in _running_state_change dispatch._set_deleted() # pylint: disable=protected-access await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) @@ -222,7 +222,7 @@ async def _update_dispatch_schedule_and_notify( self._remove_scheduled(old_dispatch) # If the dispatch was running, we need to notify - if old_dispatch.running(old_dispatch.type) == RunningState.RUNNING: + if old_dispatch.started: await self._send_running_state_change(old_dispatch) # A new dispatch was created @@ -232,7 +232,7 @@ async def _update_dispatch_schedule_and_notify( ), "New dispatch already scheduled?!" # If its currently running, send notification right away - if dispatch.running(dispatch.type) == RunningState.RUNNING: + if dispatch.started: await self._send_running_state_change(dispatch) self._schedule_stop(dispatch) @@ -249,7 +249,7 @@ async def _update_dispatch_schedule_and_notify( if self._update_changed_running_state(dispatch, old_dispatch): await self._send_running_state_change(dispatch) - if dispatch.running(dispatch.type) == RunningState.RUNNING: + if dispatch.started: self._schedule_stop(dispatch) else: self._schedule_start(dispatch) @@ -336,7 +336,7 @@ def _update_changed_running_state( """ # If any of the runtime attributes changed, we need to send a message runtime_state_attributes = [ - "running", + "started", "type", "target", "duration", diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index 9771210..3a3c058 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -18,14 +18,7 @@ from frequenz.client.dispatch.types import Dispatch as BaseDispatch from pytest import fixture -from frequenz.dispatch import ( - Created, - Deleted, - Dispatch, - DispatchEvent, - RunningState, - Updated, -) +from frequenz.dispatch import Created, Deleted, Dispatch, DispatchEvent, Updated from frequenz.dispatch.actor import DispatchingActor @@ -241,7 +234,7 @@ async def test_dispatch_inf_duration_deleted( await asyncio.sleep(40) # Expect notification of the dispatch being ready to run ready_dispatch = await actor_env.running_state_change.receive() - assert ready_dispatch.running(sample.type) == RunningState.RUNNING + assert ready_dispatch.started # Now delete the dispatch await actor_env.client.delete( @@ -251,7 +244,7 @@ async def test_dispatch_inf_duration_deleted( await asyncio.sleep(1) # Expect notification to stop the dispatch done_dispatch = await actor_env.running_state_change.receive() - assert done_dispatch.running(sample.type) == RunningState.STOPPED + assert done_dispatch.started is False async def test_dispatch_inf_duration_updated_stopped_started( @@ -272,7 +265,7 @@ async def test_dispatch_inf_duration_updated_stopped_started( await asyncio.sleep(40) # Expect notification of the dispatch being ready to run ready_dispatch = await actor_env.running_state_change.receive() - assert ready_dispatch.running(sample.type) == RunningState.RUNNING + assert ready_dispatch.started # Now update the dispatch to set active=False (stop it) await actor_env.client.update( @@ -284,7 +277,7 @@ async def test_dispatch_inf_duration_updated_stopped_started( await asyncio.sleep(1) # Expect notification to stop the dispatch stopped_dispatch = await actor_env.running_state_change.receive() - assert stopped_dispatch.running(sample.type) == RunningState.STOPPED + assert stopped_dispatch.started is False # Now update the dispatch to set active=True (start it again) await actor_env.client.update( @@ -296,7 +289,7 @@ async def test_dispatch_inf_duration_updated_stopped_started( await asyncio.sleep(1) # Expect notification of the dispatch being ready to run again started_dispatch = await actor_env.running_state_change.receive() - assert started_dispatch.running(sample.type) == RunningState.RUNNING + assert started_dispatch.started async def test_dispatch_inf_duration_updated_to_finite_and_stops( @@ -321,7 +314,7 @@ async def test_dispatch_inf_duration_updated_to_finite_and_stops( await asyncio.sleep(1) # Expect notification of the dispatch being ready to run ready_dispatch = await actor_env.running_state_change.receive() - assert ready_dispatch.running(sample.type) == RunningState.RUNNING + assert ready_dispatch.started # Update the dispatch to set duration to a finite duration that has already passed # The dispatch has been running for 5 seconds; set duration to 5 seconds @@ -335,7 +328,7 @@ async def test_dispatch_inf_duration_updated_to_finite_and_stops( await asyncio.sleep(1) # Expect notification to stop the dispatch because the duration has passed stopped_dispatch = await actor_env.running_state_change.receive() - assert stopped_dispatch.running(sample.type) == RunningState.STOPPED + assert stopped_dispatch.started is False async def test_dispatch_schedule( @@ -396,7 +389,7 @@ async def test_dispatch_inf_duration_updated_to_finite_and_continues( await asyncio.sleep(1) # Expect notification of the dispatch being ready to run ready_dispatch = await actor_env.running_state_change.receive() - assert ready_dispatch.running(sample.type) == RunningState.RUNNING + assert ready_dispatch.started # Update the dispatch to set duration to a finite duration that hasn't passed yet # The dispatch has been running for 5 seconds; set duration to 100 seconds @@ -414,7 +407,7 @@ async def test_dispatch_inf_duration_updated_to_finite_and_continues( await asyncio.sleep(1) # Expect notification to stop the dispatch because the duration has now passed stopped_dispatch = await actor_env.running_state_change.receive() - assert stopped_dispatch.running(sample.type) == RunningState.STOPPED + assert stopped_dispatch.started is False async def test_dispatch_new_but_finished( @@ -497,4 +490,4 @@ async def test_notification_on_actor_start( # Expect notification of the running dispatch being ready to run ready_dispatch = await actor_env.running_state_change.receive() - assert ready_dispatch.running(running_dispatch.type) == RunningState.RUNNING + assert ready_dispatch.started From bcf3d9581772f18220a0f3cd16f855ea029e7d5b Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 3 Dec 2024 14:20:31 +0100 Subject: [PATCH 2/8] Remove unused dependency dateutil The modules were moved to client-dispatch Signed-off-by: Mathias L. Baumann --- pyproject.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4563695..32fa321 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,6 @@ classifiers = [ ] requires-python = ">= 3.11, < 4" dependencies = [ - "python-dateutil >= 2.8.2, < 3.0", "typing-extensions >= 4.11.0, < 5.0.0", # Make sure to update the version for cross-referencing also in the # mkdocs.yml file when changing the version here (look for the config key @@ -74,7 +73,6 @@ dev-mypy = [ "mypy == 1.13.0", "grpc-stubs == 1.53.0.5", # This dependency introduces breaking changes in patch releases "types-Markdown == 3.7.0.20240822", - "types-python-dateutil==2.9.0.20241003", # For checking the noxfile, docs/ script, and tests "frequenz-dispatch[dev-mkdocs,dev-noxfile,dev-pytest]", ] From 5e2c123d42bb0d55360b8af83fa384e007ebf47f Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 3 Dec 2024 14:20:45 +0100 Subject: [PATCH 3/8] Fix formatting in pyproject Signed-off-by: Mathias L. Baumann --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 32fa321..d33bddc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,7 +71,8 @@ dev-mkdocs = [ ] dev-mypy = [ "mypy == 1.13.0", - "grpc-stubs == 1.53.0.5", # This dependency introduces breaking changes in patch releases + # This dependency introduces breaking changes in patch releases + "grpc-stubs == 1.53.0.5", "types-Markdown == 3.7.0.20240822", # For checking the noxfile, docs/ script, and tests "frequenz-dispatch[dev-mkdocs,dev-noxfile,dev-pytest]", From f65bed5b7c7f68b591eec504ae57132fc780b4c9 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 3 Dec 2024 14:47:18 +0100 Subject: [PATCH 4/8] Remove unused RunningState Sent tracking Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_dispatch.py | 42 ++++++------------------------ src/frequenz/dispatch/actor.py | 3 --- tests/test_frequenz_dispatch.py | 11 +------- 3 files changed, 9 insertions(+), 47 deletions(-) diff --git a/src/frequenz/dispatch/_dispatch.py b/src/frequenz/dispatch/_dispatch.py index c998fca..5d0d3a0 100644 --- a/src/frequenz/dispatch/_dispatch.py +++ b/src/frequenz/dispatch/_dispatch.py @@ -17,30 +17,20 @@ class Dispatch(BaseDispatch): deleted: bool = False """Whether the dispatch is deleted.""" - running_state_change_synced: datetime | None = None - """The last time a message was sent about the running state change.""" - def __init__( self, client_dispatch: BaseDispatch, deleted: bool = False, - running_state_change_synced: datetime | None = None, ): """Initialize the dispatch. Args: client_dispatch: The client dispatch. deleted: Whether the dispatch is deleted. - running_state_change_synced: Timestamp of the last running state change message. """ super().__init__(**client_dispatch.__dict__) # Work around frozen to set deleted object.__setattr__(self, "deleted", deleted) - object.__setattr__( - self, - "running_state_change_synced", - running_state_change_synced, - ) def _set_deleted(self) -> None: """Mark the dispatch as deleted.""" @@ -58,40 +48,24 @@ def started(self) -> bool: return super().started - @property - def _running_status_notified(self) -> bool: - """Check that the latest running state change notification was sent. - - Returns: - True if the latest running state change notification was sent, False otherwise. - """ - return self.running_state_change_synced == self.update_time - - def _set_running_status_notified(self) -> None: - """Mark the latest running state change notification as sent.""" - object.__setattr__(self, "running_state_change_synced", self.update_time) - - @property # noqa is needed because of a bug in pydoclint that makes it think a `return` without a return # value needs documenting - def missed_runs(self) -> Iterator[datetime]: # noqa: DOC405 + def missed_runs(self, since: datetime) -> Iterator[datetime]: # noqa: DOC405 """Yield all missed runs of a dispatch. Yields all missed runs of a dispatch. - If a running state change notification was not sent in time - due to connection issues, this method will yield all missed runs - since the last sent notification. + Args: + since: The time to start checking for missed runs. Returns: A generator that yields all missed runs of a dispatch. - """ - if self.update_time == self.running_state_change_synced: - return - from_time = self.update_time + Yields: + datetime: The missed run. + """ now = datetime.now(tz=timezone.utc) - while (next_run := self.next_run_after(from_time)) and next_run < now: + while (next_run := self.next_run_after(since)) and next_run < now: yield next_run - from_time = next_run + since = next_run diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 56a40dd..85a20ae 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -359,6 +359,3 @@ async def _send_running_state_change(self, dispatch: Dispatch) -> None: dispatch: The dispatch that changed. """ await self._running_state_change_sender.send(dispatch) - # Update the last sent notification time - # so we know if this change was already sent - dispatch._set_running_status_notified() # pylint: disable=protected-access diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index 3a3c058..1406121 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -144,8 +144,6 @@ async def _test_new_dispatch_created( assert False, "Expected a created event" case Created(dispatch): received = Dispatch(update_dispatch(sample, dispatch)) - received._set_running_status_notified() # pylint: disable=protected-access - dispatch._set_running_status_notified() # pylint: disable=protected-access assert dispatch == received return dispatch @@ -184,10 +182,7 @@ async def test_existing_dispatch_updated( case Created(dispatch) | Deleted(dispatch): assert False, f"Expected an updated event, got {dispatch_event}" case Updated(dispatch): - assert dispatch == Dispatch( - updated, - running_state_change_synced=dispatch.running_state_change_synced, - ) + assert dispatch == Dispatch(updated) await asyncio.sleep(1) @@ -212,7 +207,6 @@ async def test_existing_dispatch_deleted( assert False, "Expected a deleted event" case Deleted(dispatch): sample._set_deleted() # pylint: disable=protected-access - dispatch._set_running_status_notified() # pylint: disable=protected-access assert dispatch == sample @@ -352,9 +346,6 @@ async def test_dispatch_schedule( # Expect notification of the dispatch being ready to run ready_dispatch = await actor_env.running_state_change.receive() - # Set flag we expect to be different to compare the dispatch with the one received - dispatch._set_running_status_notified() # pylint: disable=protected-access - assert ready_dispatch == dispatch assert dispatch.duration is not None From 9345acd49f20e1488135e26628b953977b6361c7 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 3 Dec 2024 18:29:51 +0100 Subject: [PATCH 5/8] Mark dispatch as deleted only in the update function. This makes the code cleaner. Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/actor.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 85a20ae..7bd6e49 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -126,7 +126,6 @@ async def _run(self) -> None: self._dispatches.pop(dispatch.id) await self._update_dispatch_schedule_and_notify(None, dispatch) - dispatch._set_deleted() # pylint: disable=protected-access await self._lifecycle_updates_sender.send( Deleted(dispatch=dispatch) ) @@ -221,8 +220,11 @@ async def _update_dispatch_schedule_and_notify( if not dispatch and old_dispatch: self._remove_scheduled(old_dispatch) + was_running = old_dispatch.started + old_dispatch._set_deleted() # pylint: disable=protected-access) + # If the dispatch was running, we need to notify - if old_dispatch.started: + if was_running: await self._send_running_state_change(old_dispatch) # A new dispatch was created @@ -230,7 +232,6 @@ async def _update_dispatch_schedule_and_notify( assert not self._remove_scheduled( dispatch ), "New dispatch already scheduled?!" - # If its currently running, send notification right away if dispatch.started: await self._send_running_state_change(dispatch) From bb6260d3bb57617ba41eb092101c9ed1b2b30e48 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 3 Dec 2024 18:30:26 +0100 Subject: [PATCH 6/8] Lower CI timeout to expected runtime for faster failures Signed-off-by: Mathias L. Baumann --- .github/workflows/ci.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a33e6b3..cc11ff0 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -78,7 +78,7 @@ jobs: env: NOX_SESSION: ${{ matrix.nox-session }} run: nox -R -e "$NOX_SESSION" - timeout-minutes: 10 + timeout-minutes: 2 # This job runs if all the `nox` matrix jobs ran and succeeded. # It is only used to have a single job that we can require in branch @@ -185,7 +185,7 @@ jobs: --platform linux/${{ matrix.arch }} \ localhost/nox-cross-arch:latest \ bash -c "pip install -e .[dev-noxfile]; nox --install-only -e ${{ matrix.nox-session }}; pip freeze; nox -R -e ${{ matrix.nox-session }}" - timeout-minutes: 30 + timeout-minutes: 3 # This ensures that the runner has access to the pip cache. - name: Reset pip cache ownership From 4a1b0c5924e1fdd258a2f4f84c2da0e932fd7931 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 3 Dec 2024 18:30:43 +0100 Subject: [PATCH 7/8] Add type information to member variable Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_managing_actor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/frequenz/dispatch/_managing_actor.py b/src/frequenz/dispatch/_managing_actor.py index d7ef29f..6ed4e57 100644 --- a/src/frequenz/dispatch/_managing_actor.py +++ b/src/frequenz/dispatch/_managing_actor.py @@ -121,7 +121,9 @@ def __init__( """ super().__init__() self._dispatch_rx = running_status_receiver - self._actors = frozenset([actor] if isinstance(actor, Actor) else actor) + self._actors: frozenset[Actor] = frozenset( + [actor] if isinstance(actor, Actor) else actor + ) self._dispatch_type = dispatch_type self._updates_sender = updates_sender From cca1cc8230e4f57fee30fd0ea00c2c79f5f6ef99 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 3 Dec 2024 19:18:48 +0100 Subject: [PATCH 8/8] Remove outdated part of documentation Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_dispatcher.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 357ea60..c0c8a60 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -255,10 +255,6 @@ def running_status_change(self) -> ReceiverFetcher[Dispatch]: - The payload changed - The dispatch was deleted - Note: Reaching the end time (start_time + duration) will not - send a message, except when it was reached by modifying the duration. - - Returns: A new receiver for dispatches whose running status changed. """