diff --git a/src/frequenz/channels/util/_select.py b/src/frequenz/channels/util/_select.py index 80c09d61..1349791d 100644 --- a/src/frequenz/channels/util/_select.py +++ b/src/frequenz/channels/util/_select.py @@ -11,7 +11,7 @@ import asyncio as _asyncio import logging as _logging from dataclasses import dataclass as _dataclass -from typing import Any, Dict, List, Optional, Set, TypeVar +from typing import Any, TypeVar from .. import _base_classes, _exceptions @@ -27,7 +27,7 @@ class _Selected: receiver gets closed. """ - inner: Optional[Any] + inner: Any @_dataclass @@ -40,7 +40,7 @@ class _ReadyReceiver: When a channel has closed, `recv` should be `None`. """ - recv: Optional[_base_classes.Receiver[Any]] + recv: _base_classes.Receiver[Any] | None def get(self) -> _Selected: """Consume a message from the receiver and return a `_Selected` object. @@ -100,14 +100,14 @@ def __init__(self, **kwargs: _base_classes.Receiver[Any]) -> None: **kwargs: sequence of receivers """ self._receivers = kwargs - self._pending: Set[_asyncio.Task[bool]] = set() + self._pending: set[_asyncio.Task[bool]] = set() for name, recv in self._receivers.items(): self._pending.add(_asyncio.create_task(recv.ready(), name=name)) self._ready_count = 0 self._prev_ready_count = 0 - self._result: Dict[str, Optional[_ReadyReceiver]] = { + self._result: dict[str, _ReadyReceiver | None] = { name: None for name in self._receivers } @@ -137,7 +137,7 @@ async def ready(self) -> bool: # pylint: disable=too-many-nested-blocks if self._ready_count > 0: if self._ready_count == self._prev_ready_count: - dropped_names: List[str] = [] + dropped_names: list[str] = [] for name, value in self._result.items(): if value is not None: dropped_names.append(name) @@ -184,7 +184,7 @@ async def ready(self) -> bool: self._pending.add(_asyncio.create_task(recv.ready(), name=name)) return True - def __getattr__(self, name: str) -> Optional[Any]: + def __getattr__(self, name: str) -> Any: """Return the latest unread message from a `Receiver`, if available. Args: