diff --git a/README.md b/README.md index c50cb3f..5119f8d 100644 --- a/README.md +++ b/README.md @@ -21,17 +21,20 @@ The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-pyth ```python import os -import grpc.aio +from frequenz.dispatch import Dispatcher, RunningState from unittest.mock import MagicMock async def run(): - host = os.getenv("DISPATCH_API_HOST", "localhost") - port = os.getenv("DISPATCH_API_PORT", "50051") + url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051") + key = os.getenv("DISPATCH_API_KEY", "some-key") - service_address = f"{host}:{port}" - grpc_channel = grpc.aio.insecure_channel(service_address) microgrid_id = 1 - dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) + + dispatcher = Dispatcher( + microgrid_id=microgrid_id, + server_url=url, + key=key + ) await dispatcher.start() actor = MagicMock() # replace with your actor diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 29ae957..ff2b4a7 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,11 +6,14 @@ ## Upgrading -- The dispatch high level interface now depends on `frequenz-sdk` version `v1.0.0-rc801`. +- The dispatch high level interface now depends on `frequenz-sdk` version `v1.0.0-rc900`. +- We are now using the version `0.6.0` of the underlying `frequenz-client-dispatch` client library. +- The init parameter of the `Dispatcher` class has been changed to accept a `server_url` instead. ## New Features - +* Using the new dispatch client, we now have support for pagination in the dispatch list request. +* The new client version also supports streaming, however it is not yet used internally in the high level interface. ## Bug Fixes diff --git a/mkdocs.yml b/mkdocs.yml index cceca90..716684f 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -116,7 +116,7 @@ plugins: # See https://mkdocstrings.github.io/python/usage/#import for details - https://docs.python.org/3/objects.inv - https://frequenz-floss.github.io/frequenz-channels-python/v1.0/objects.inv - - https://frequenz-floss.github.io/frequenz-client-dispatch-python/v0.5/objects.inv + - https://frequenz-floss.github.io/frequenz-client-dispatch-python/v0.6/objects.inv - https://frequenz-floss.github.io/frequenz-sdk-python/v1.0-pre/objects.inv - https://grpc.github.io/grpc/python/objects.inv - https://typing-extensions.readthedocs.io/en/stable/objects.inv diff --git a/pyproject.toml b/pyproject.toml index 0024767..38a50df 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,9 +39,9 @@ dependencies = [ # Make sure to update the version for cross-referencing also in the # mkdocs.yml file when changing the version here (look for the config key # plugins.mkdocstrings.handlers.python.import) - "frequenz-sdk == 1.0.0-rc801", + "frequenz-sdk == 1.0.0-rc900", "frequenz-channels >= 1.1.0, < 2.0.0", - "frequenz-client-dispatch >= 0.5.0, < 0.6.0", + "frequenz-client-dispatch >= 0.6.0, < 0.7.0", ] dynamic = ["version"] diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index e97c34c..69fac4b 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -6,7 +6,6 @@ import abc from typing import Protocol, TypeVar -import grpc.aio from frequenz.channels import Broadcast, Receiver from frequenz.client.dispatch import Client @@ -55,24 +54,18 @@ class Dispatcher: Example: Processing running state change dispatches ```python import os - import grpc.aio from frequenz.dispatch import Dispatcher, RunningState from unittest.mock import MagicMock async def run(): - host = os.getenv("DISPATCH_API_HOST", "localhost") - port = os.getenv("DISPATCH_API_PORT", "50051") + url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051") key = os.getenv("DISPATCH_API_KEY", "some-key") - service_address = f"{host}:{port}" - grpc_channel = grpc.aio.secure_channel( - service_address, - credentials=grpc.ssl_channel_credentials() - ) + microgrid_id = 1 + dispatcher = Dispatcher( - microgrid_id=1, - grpc_channel=grpc_channel, - svc_addr=service_address, + microgrid_id=microgrid_id, + server_url=url, key=key ) await dispatcher.start() @@ -112,23 +105,17 @@ async def run(): import os from typing import assert_never - import grpc.aio from frequenz.dispatch import Created, Deleted, Dispatcher, Updated async def run(): - host = os.getenv("DISPATCH_API_HOST", "localhost") - port = os.getenv("DISPATCH_API_PORT", "50051") + url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051") key = os.getenv("DISPATCH_API_KEY", "some-key") - service_address = f"{host}:{port}" - grpc_channel = grpc.aio.secure_channel( - service_address, - credentials=grpc.ssl_channel_credentials() - ) + microgrid_id = 1 + dispatcher = Dispatcher( - microgrid_id=1, - grpc_channel=grpc_channel, - svc_addr=service_address, + microgrid_id=microgrid_id, + server_url=url, key=key ) await dispatcher.start() # this will start the actor @@ -154,27 +141,19 @@ async def run(): import os from datetime import datetime, timedelta, timezone - import grpc.aio from frequenz.client.common.microgrid.components import ComponentCategory from frequenz.dispatch import Dispatcher async def run(): - host = os.getenv("DISPATCH_API_HOST", "localhost") - port = os.getenv("DISPATCH_API_PORT", "50051") + url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051") key = os.getenv("DISPATCH_API_KEY", "some-key") microgrid_id = 1 - service_address = f"{host}:{port}" - grpc_channel = grpc.aio.secure_channel( - service_address, - credentials=grpc.ssl_channel_credentials() - ) dispatcher = Dispatcher( microgrid_id=microgrid_id, - grpc_channel=grpc_channel, - svc_addr=service_address, + server_url=url, key=key ) await dispatcher.start() # this will start the actor @@ -208,23 +187,21 @@ def __init__( self, *, microgrid_id: int, - grpc_channel: grpc.aio.Channel, - svc_addr: str, + server_url: str, key: str, ): """Initialize the dispatcher. Args: microgrid_id: The microgrid id. - grpc_channel: The gRPC channel. - svc_addr: The service address. + server_url: The URL of the dispatch service. key: The key to access the service. """ self._running_state_channel = Broadcast[Dispatch](name="running_state_change") self._lifecycle_events_channel = Broadcast[DispatchEvent]( name="lifecycle_events" ) - self._client = Client(grpc_channel=grpc_channel, svc_addr=svc_addr, key=key) + self._client = Client(server_url=server_url, key=key) self._actor = DispatchingActor( microgrid_id, self._client, diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 30e137d..6e8a500 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -89,28 +89,27 @@ async def _fetch(self) -> None: try: _logger.info("Fetching dispatches for microgrid %s", self._microgrid_id) - async for client_dispatch in self._client.list( - microgrid_id=self._microgrid_id - ): - dispatch = Dispatch(client_dispatch) - - self._dispatches[dispatch.id] = Dispatch(client_dispatch) - old_dispatch = old_dispatches.pop(dispatch.id, None) - if not old_dispatch: - self._update_dispatch_schedule(dispatch, None) - _logger.info("New dispatch: %s", dispatch) - await self._lifecycle_updates_sender.send( - Created(dispatch=dispatch) - ) - elif dispatch.update_time != old_dispatch.update_time: - self._update_dispatch_schedule(dispatch, old_dispatch) - _logger.info("Updated dispatch: %s", dispatch) - await self._lifecycle_updates_sender.send( - Updated(dispatch=dispatch) - ) - - if self._running_state_change(dispatch, old_dispatch): - await self._send_running_state_change(dispatch) + async for page in self._client.list(microgrid_id=self._microgrid_id): + for client_dispatch in page: + dispatch = Dispatch(client_dispatch) + + self._dispatches[dispatch.id] = Dispatch(client_dispatch) + old_dispatch = old_dispatches.pop(dispatch.id, None) + if not old_dispatch: + self._update_dispatch_schedule(dispatch, None) + _logger.info("New dispatch: %s", dispatch) + await self._lifecycle_updates_sender.send( + Created(dispatch=dispatch) + ) + elif dispatch.update_time != old_dispatch.update_time: + self._update_dispatch_schedule(dispatch, old_dispatch) + _logger.info("Updated dispatch: %s", dispatch) + await self._lifecycle_updates_sender.send( + Updated(dispatch=dispatch) + ) + + if self._running_state_change(dispatch, old_dispatch): + await self._send_running_state_change(dispatch) except grpc.aio.AioRpcError as error: _logger.error("Error fetching dispatches: %s", error)