diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 676fcdf..98e9160 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,7 +6,7 @@ ## Upgrading - +* `Client.stream()` will now raise an Exception when the connection is lost. ## New Features diff --git a/src/frequenz/client/dispatch/_client.py b/src/frequenz/client/dispatch/_client.py index 5bac9cc..8f24b78 100644 --- a/src/frequenz/client/dispatch/_client.py +++ b/src/frequenz/client/dispatch/_client.py @@ -33,6 +33,7 @@ from frequenz.client.base.channel import ChannelOptions, SslOptions from frequenz.client.base.client import BaseApiClient from frequenz.client.base.conversion import to_timestamp +from frequenz.client.base.retry import LinearBackoff from frequenz.client.base.streaming import GrpcStreamBroadcaster from ._internal_types import DispatchCreateRequest @@ -208,6 +209,11 @@ def _get_stream( ) -> GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]: """Get an instance to the streaming helper.""" broadcaster = self.streams.get(microgrid_id) + # pylint: disable=protected-access + if broadcaster is not None and broadcaster._channel.is_closed: + # pylint: enable=protected-access + del self.streams[microgrid_id] + broadcaster = None if broadcaster is None: request = StreamMicrogridDispatchesRequest(microgrid_id=microgrid_id) broadcaster = GrpcStreamBroadcaster( @@ -219,6 +225,7 @@ def _get_stream( ), ), transform=DispatchEvent.from_protobuf, + retry_strategy=LinearBackoff(interval=1, limit=0), ) self.streams[microgrid_id] = broadcaster