Skip to content

Commit

Permalink
Make imports private
Browse files Browse the repository at this point in the history
Everything that is not supposed to be exported by this module is renamed
to start with a `_` to make it more explicit that it is private.

Signed-off-by: Leandro Lucarella <luca-frequenz@llucax.com>
  • Loading branch information
llucax committed Jun 8, 2023
1 parent 5103821 commit 3c2a51c
Showing 1 changed file with 18 additions and 19 deletions.
37 changes: 18 additions & 19 deletions src/frequenz/channels/util/_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@
is closed in case of `Receiver` class.
"""

import asyncio
import logging
from dataclasses import dataclass
import asyncio as _asyncio
import logging as _logging
from dataclasses import dataclass as _dataclass
from typing import Any, Dict, List, Optional, Set, TypeVar

from .._base_classes import Receiver
from .._exceptions import ReceiverStoppedError
from .. import _base_classes, _exceptions

logger = logging.Logger(__name__)
T = TypeVar("T")
_logger = _logging.Logger(__name__)
_T = TypeVar("_T")


@dataclass
@_dataclass
class _Selected:
"""A wrapper class for holding values in `Select`.
Expand All @@ -31,7 +30,7 @@ class _Selected:
inner: Optional[Any]


@dataclass
@_dataclass
class _ReadyReceiver:
"""A class for tracking receivers that have a message ready to be read.
Expand All @@ -41,7 +40,7 @@ class _ReadyReceiver:
When a channel has closed, `recv` should be `None`.
"""

recv: Optional[Receiver[Any]]
recv: Optional[_base_classes.Receiver[Any]]

def get(self) -> _Selected:
"""Consume a message from the receiver and return a `_Selected` object.
Expand Down Expand Up @@ -94,17 +93,17 @@ class Select:
```
"""

def __init__(self, **kwargs: Receiver[Any]) -> None:
def __init__(self, **kwargs: _base_classes.Receiver[Any]) -> None:
"""Create a `Select` instance.
Args:
**kwargs: sequence of receivers
"""
self._receivers = kwargs
self._pending: Set[asyncio.Task[bool]] = set()
self._pending: Set[_asyncio.Task[bool]] = set()

for name, recv in self._receivers.items():
self._pending.add(asyncio.create_task(recv.ready(), name=name))
self._pending.add(_asyncio.create_task(recv.ready(), name=name))

self._ready_count = 0
self._prev_ready_count = 0
Expand All @@ -122,7 +121,7 @@ async def stop(self) -> None:
"""Stop the `Select` instance and cleanup any pending tasks."""
for task in self._pending:
task.cancel()
await asyncio.gather(*self._pending, return_exceptions=True)
await _asyncio.gather(*self._pending, return_exceptions=True)
self._pending = set()

async def ready(self) -> bool:
Expand All @@ -145,12 +144,12 @@ async def ready(self) -> bool:
if value.recv is not None:
try:
value.recv.consume()
except ReceiverStoppedError:
except _exceptions.ReceiverStoppedError:
pass
self._result[name] = None
self._ready_count = 0
self._prev_ready_count = 0
logger.warning(
_logger.warning(
"Select.ready() dropped data from receiver(s): %s, "
"because no messages have been fetched since the last call to ready().",
dropped_names,
Expand All @@ -165,8 +164,8 @@ async def ready(self) -> bool:
# `_prev_ready_count` as well, and wait for new messages.
self._prev_ready_count = 0

done, self._pending = await asyncio.wait(
self._pending, return_when=asyncio.FIRST_COMPLETED
done, self._pending = await _asyncio.wait(
self._pending, return_when=_asyncio.FIRST_COMPLETED
)
for task in done:
name = task.get_name()
Expand All @@ -182,7 +181,7 @@ async def ready(self) -> bool:
# don't add a task for it again.
if not receiver_active:
continue
self._pending.add(asyncio.create_task(recv.ready(), name=name))
self._pending.add(_asyncio.create_task(recv.ready(), name=name))
return True

def __getattr__(self, name: str) -> Optional[Any]:
Expand Down

0 comments on commit 3c2a51c

Please sign in to comment.