Skip to content

Commit

Permalink
Update to latest SDK and dispatch-client versions
Browse files Browse the repository at this point in the history
Signed-off-by: Mathias L. Baumann <mathias.baumann@frequenz.com>
  • Loading branch information
Marenz committed Sep 4, 2024
1 parent fdde894 commit 11dd7b9
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 71 deletions.
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@ The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-pyth

```python
import os
import grpc.aio
from frequenz.dispatch import Dispatcher, RunningState
from unittest.mock import MagicMock

async def run():
host = os.getenv("DISPATCH_API_HOST", "localhost")
port = os.getenv("DISPATCH_API_PORT", "50051")
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")

service_address = f"{host}:{port}"
grpc_channel = grpc.aio.insecure_channel(service_address)
microgrid_id = 1
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)

dispatcher = Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key
)
await dispatcher.start()

actor = MagicMock() # replace with your actor
Expand Down
7 changes: 5 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@

## Upgrading

- The dispatch high level interface now depends on `frequenz-sdk` version `v1.0.0-rc801`.
- The dispatch high level interface now depends on `frequenz-sdk` version `v1.0.0-rc900`.
- We are now using the version `0.6.0` of the underlying `frequenz-client-dispatch` client library.
- The init parameter of the `Dispatcher` class has been changed to accept a `server_url` instead.

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* Using the new dispatch client, we now have support for pagination in the dispatch list request.
* The new client version also supports streaming, however it is not yet used internally in the high level interface.

## Bug Fixes

Expand Down
2 changes: 1 addition & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ plugins:
# See https://mkdocstrings.github.io/python/usage/#import for details
- https://docs.python.org/3/objects.inv
- https://frequenz-floss.github.io/frequenz-channels-python/v1.0/objects.inv
- https://frequenz-floss.github.io/frequenz-client-dispatch-python/v0.5/objects.inv
- https://frequenz-floss.github.io/frequenz-client-dispatch-python/v0.6/objects.inv
- https://frequenz-floss.github.io/frequenz-sdk-python/v1.0-pre/objects.inv
- https://grpc.github.io/grpc/python/objects.inv
- https://typing-extensions.readthedocs.io/en/stable/objects.inv
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ dependencies = [
# Make sure to update the version for cross-referencing also in the
# mkdocs.yml file when changing the version here (look for the config key
# plugins.mkdocstrings.handlers.python.import)
"frequenz-sdk == 1.0.0-rc801",
"frequenz-sdk == 1.0.0-rc900",
"frequenz-channels >= 1.1.0, < 2.0.0",
"frequenz-client-dispatch >= 0.5.0, < 0.6.0",
"frequenz-client-dispatch >= 0.6.0, < 0.7.0",
]
dynamic = ["version"]

Expand Down
53 changes: 15 additions & 38 deletions src/frequenz/dispatch/_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import abc
from typing import Protocol, TypeVar

import grpc.aio
from frequenz.channels import Broadcast, Receiver
from frequenz.client.dispatch import Client

Expand Down Expand Up @@ -55,24 +54,18 @@ class Dispatcher:
Example: Processing running state change dispatches
```python
import os
import grpc.aio
from frequenz.dispatch import Dispatcher, RunningState
from unittest.mock import MagicMock
async def run():
host = os.getenv("DISPATCH_API_HOST", "localhost")
port = os.getenv("DISPATCH_API_PORT", "50051")
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")
service_address = f"{host}:{port}"
grpc_channel = grpc.aio.secure_channel(
service_address,
credentials=grpc.ssl_channel_credentials()
)
microgrid_id = 1
dispatcher = Dispatcher(
microgrid_id=1,
grpc_channel=grpc_channel,
svc_addr=service_address,
microgrid_id=microgrid_id,
server_url=url,
key=key
)
await dispatcher.start()
Expand Down Expand Up @@ -112,23 +105,17 @@ async def run():
import os
from typing import assert_never
import grpc.aio
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
async def run():
host = os.getenv("DISPATCH_API_HOST", "localhost")
port = os.getenv("DISPATCH_API_PORT", "50051")
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")
service_address = f"{host}:{port}"
grpc_channel = grpc.aio.secure_channel(
service_address,
credentials=grpc.ssl_channel_credentials()
)
microgrid_id = 1
dispatcher = Dispatcher(
microgrid_id=1,
grpc_channel=grpc_channel,
svc_addr=service_address,
microgrid_id=microgrid_id,
server_url=url,
key=key
)
await dispatcher.start() # this will start the actor
Expand All @@ -154,27 +141,19 @@ async def run():
import os
from datetime import datetime, timedelta, timezone
import grpc.aio
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.dispatch import Dispatcher
async def run():
host = os.getenv("DISPATCH_API_HOST", "localhost")
port = os.getenv("DISPATCH_API_PORT", "50051")
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")
microgrid_id = 1
service_address = f"{host}:{port}"
grpc_channel = grpc.aio.secure_channel(
service_address,
credentials=grpc.ssl_channel_credentials()
)
dispatcher = Dispatcher(
microgrid_id=microgrid_id,
grpc_channel=grpc_channel,
svc_addr=service_address,
server_url=url,
key=key
)
await dispatcher.start() # this will start the actor
Expand Down Expand Up @@ -208,23 +187,21 @@ def __init__(
self,
*,
microgrid_id: int,
grpc_channel: grpc.aio.Channel,
svc_addr: str,
server_url: str,
key: str,
):
"""Initialize the dispatcher.
Args:
microgrid_id: The microgrid id.
grpc_channel: The gRPC channel.
svc_addr: The service address.
server_url: The URL of the dispatch service.
key: The key to access the service.
"""
self._running_state_channel = Broadcast[Dispatch](name="running_state_change")
self._lifecycle_events_channel = Broadcast[DispatchEvent](
name="lifecycle_events"
)
self._client = Client(grpc_channel=grpc_channel, svc_addr=svc_addr, key=key)
self._client = Client(server_url=server_url, key=key)
self._actor = DispatchingActor(
microgrid_id,
self._client,
Expand Down
43 changes: 21 additions & 22 deletions src/frequenz/dispatch/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,28 +89,27 @@ async def _fetch(self) -> None:

try:
_logger.info("Fetching dispatches for microgrid %s", self._microgrid_id)
async for client_dispatch in self._client.list(
microgrid_id=self._microgrid_id
):
dispatch = Dispatch(client_dispatch)

self._dispatches[dispatch.id] = Dispatch(client_dispatch)
old_dispatch = old_dispatches.pop(dispatch.id, None)
if not old_dispatch:
self._update_dispatch_schedule(dispatch, None)
_logger.info("New dispatch: %s", dispatch)
await self._lifecycle_updates_sender.send(
Created(dispatch=dispatch)
)
elif dispatch.update_time != old_dispatch.update_time:
self._update_dispatch_schedule(dispatch, old_dispatch)
_logger.info("Updated dispatch: %s", dispatch)
await self._lifecycle_updates_sender.send(
Updated(dispatch=dispatch)
)

if self._running_state_change(dispatch, old_dispatch):
await self._send_running_state_change(dispatch)
async for page in self._client.list(microgrid_id=self._microgrid_id):
for client_dispatch in page:
dispatch = Dispatch(client_dispatch)

self._dispatches[dispatch.id] = Dispatch(client_dispatch)
old_dispatch = old_dispatches.pop(dispatch.id, None)
if not old_dispatch:
self._update_dispatch_schedule(dispatch, None)
_logger.info("New dispatch: %s", dispatch)
await self._lifecycle_updates_sender.send(
Created(dispatch=dispatch)
)
elif dispatch.update_time != old_dispatch.update_time:
self._update_dispatch_schedule(dispatch, old_dispatch)
_logger.info("Updated dispatch: %s", dispatch)
await self._lifecycle_updates_sender.send(
Updated(dispatch=dispatch)
)

if self._running_state_change(dispatch, old_dispatch):
await self._send_running_state_change(dispatch)

except grpc.aio.AioRpcError as error:
_logger.error("Error fetching dispatches: %s", error)
Expand Down

0 comments on commit 11dd7b9

Please sign in to comment.