Skip to content

Commit

Permalink
Send stop signal when duration is reached (#53)
Browse files Browse the repository at this point in the history
- **Reset release notes**
- **Send STOPPED message when duration is reached**
  • Loading branch information
Marenz authored Sep 16, 2024
2 parents 05f333b + 46d5b17 commit 15e619f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 8 deletions.
9 changes: 3 additions & 6 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@

## Upgrading

- The dispatch high level interface now depends on `frequenz-sdk` version `v1.0.0-rc900`.
- We are now using the version `0.6.0` of the underlying `frequenz-client-dispatch` client library.
- The init parameter of the `Dispatcher` class has been changed to accept a `server_url` instead.
* `Dispatcher.running_state_change` now also sends a message when the duration specified in the dispatch has passed. If no duration is specified, no STOPPED message will be sent.

## New Features

* Using the new dispatch client, we now have support for pagination in the dispatch list request.
* The new client version also supports streaming, however it is not yet used internally in the high level interface.
<!-- Here goes the main new features and examples or instructions on how to use them -->

## Bug Fixes

- Fix documentation cross-linking to the `frequenz-client-dispatch` package.
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
14 changes: 12 additions & 2 deletions src/frequenz/dispatch/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,20 @@ def next_run_info() -> tuple[datetime, datetime] | None:
_logger.info("Dispatch %s scheduled for %s", dispatch.id, next_time)
await asyncio.sleep((next_time - now).total_seconds())

_logger.info("Dispatch ready: %s", dispatch)
_logger.info("Dispatch %s executing...", dispatch)
await self._running_state_change_sender.send(dispatch)

_logger.info("Dispatch finished: %s", dispatch)
# Wait for the duration of the dispatch if set
if dispatch.duration:
_logger.info(
"Dispatch %s running for %s", dispatch.id, dispatch.duration
)
await asyncio.sleep(dispatch.duration.total_seconds())

_logger.info("Dispatch %s runtime duration reached", dispatch.id)
await self._running_state_change_sender.send(dispatch)

_logger.info("Dispatch completed: %s", dispatch)
self._scheduled.pop(dispatch.id)

def _running_state_change(
Expand Down
9 changes: 9 additions & 0 deletions tests/test_frequenz_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,15 @@ async def test_dispatch_schedule(
fake_time.shift(next_run - _now() - timedelta(seconds=1))
await asyncio.sleep(1)

# Expect notification of the dispatch being ready to run
ready_dispatch = await actor_env.ready_dispatches.receive()

assert ready_dispatch == dispatch

# Shift time to the end of the dispatch
fake_time.shift(dispatch.duration + timedelta(seconds=1))
await asyncio.sleep(1)

# Expect notification to stop the dispatch
done_dispatch = await actor_env.ready_dispatches.receive()
assert done_dispatch == dispatch

0 comments on commit 15e619f

Please sign in to comment.