Skip to content

Commit

Permalink
Make type hints use built-in types
Browse files Browse the repository at this point in the history
Signed-off-by: Leandro Lucarella <luca-frequenz@llucax.com>
  • Loading branch information
llucax committed Jun 8, 2023
1 parent 3c2a51c commit f69ff82
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions src/frequenz/channels/util/_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import asyncio as _asyncio
import logging as _logging
from dataclasses import dataclass as _dataclass
from typing import Any, Dict, List, Optional, Set, TypeVar
from typing import Any, TypeVar

from .. import _base_classes, _exceptions

Expand All @@ -27,7 +27,7 @@ class _Selected:
receiver gets closed.
"""

inner: Optional[Any]
inner: Any


@_dataclass
Expand All @@ -40,7 +40,7 @@ class _ReadyReceiver:
When a channel has closed, `recv` should be `None`.
"""

recv: Optional[_base_classes.Receiver[Any]]
recv: _base_classes.Receiver[Any] | None

def get(self) -> _Selected:
"""Consume a message from the receiver and return a `_Selected` object.
Expand Down Expand Up @@ -100,14 +100,14 @@ def __init__(self, **kwargs: _base_classes.Receiver[Any]) -> None:
**kwargs: sequence of receivers
"""
self._receivers = kwargs
self._pending: Set[_asyncio.Task[bool]] = set()
self._pending: set[_asyncio.Task[bool]] = set()

for name, recv in self._receivers.items():
self._pending.add(_asyncio.create_task(recv.ready(), name=name))

self._ready_count = 0
self._prev_ready_count = 0
self._result: Dict[str, Optional[_ReadyReceiver]] = {
self._result: dict[str, _ReadyReceiver | None] = {
name: None for name in self._receivers
}

Expand Down Expand Up @@ -137,7 +137,7 @@ async def ready(self) -> bool:
# pylint: disable=too-many-nested-blocks
if self._ready_count > 0:
if self._ready_count == self._prev_ready_count:
dropped_names: List[str] = []
dropped_names: list[str] = []
for name, value in self._result.items():
if value is not None:
dropped_names.append(name)
Expand Down Expand Up @@ -184,7 +184,7 @@ async def ready(self) -> bool:
self._pending.add(_asyncio.create_task(recv.ready(), name=name))
return True

def __getattr__(self, name: str) -> Optional[Any]:
def __getattr__(self, name: str) -> Any:
"""Return the latest unread message from a `Receiver`, if available.
Args:
Expand Down

0 comments on commit f69ff82

Please sign in to comment.