Skip to content

Commit

Permalink
feat(core): add mailbox as a simplified version of chan
Browse files Browse the repository at this point in the history
  • Loading branch information
matejcik committed Apr 7, 2024
1 parent c01a86d commit ca80a6e
Showing 1 changed file with 103 additions and 0 deletions.
103 changes: 103 additions & 0 deletions core/src/trezor/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,109 @@ def __iter__(self) -> Task: # type: ignore [awaitable-is-generator]
raise


class mailbox(Syscall):
"""
Wait to receive a value.
In terms of synchronization primitives, this is a condition variable that also
contains a value. It is a simplification of Go channels, which is one-ended and
only has a buffer of size 1.
The receiving end pauses until a value is received, and then empties the mailbox
to wait again.
The sending end synchronously posts a value. It is impossible to wait until
the value is consumed. Trying to post a value when the mailbox is full raises
an error, unless `replace=True` is specified
Example:
>>> # in task #1:
>>> box = loop.mailbox()
>>> while True:
>>> result = await box
>>> print("awaited result:", result)
>>> # in task #2:
>>> box.put("Hello from the other task")
>>> print("put completed")
Example Output:
put completed
awaited result: Hello from the other task
"""

_NO_VALUE = object()

def __init__(self, initial_value: Any = _NO_VALUE) -> None:
self.value = initial_value
self.taker: Task | None = None

def is_empty(self) -> bool:
"""Is the mailbox empty?"""
return self.value is self._NO_VALUE

def clear(self) -> None:
"""Empty the mailbox."""
assert self.taker is None
self.value = self._NO_VALUE

def put(self, value: Any, replace: bool = False) -> None:
"""Put a value into the mailbox.
If there is another task waiting for the value, it will be scheduled to resume.
Otherwise, the mailbox will hold the value until someone consumes it.
It is an error to call `put()` when there is a value already held, unless
`replace` is set to `True`. In such case, the held value is replaced with
the new one.
"""
if not self.is_empty() and not replace:
raise ValueError("mailbox already has a value")

self.value = value
if self.taker is not None:
self._take(self.taker)

def _take(self, task: Task) -> None:
"""Take a value and schedule the taker."""
self.taker = None
schedule(task, self.value)
self.clear()

def handle(self, task: Task) -> None:
assert self.taker is None
if not self.is_empty():
self._take(task)
else:
self.taker = task

def __iter__(self) -> Generator:
assert self.taker is None

# short-circuit if there is a value already
if not self.is_empty():
value = self.value
self.clear()
return value

# otherwise, wait for a value
try:
return (yield self)
finally:
# Clear the taker even in case of exception. This way stale takers don't
# blow up someone calling `maybe_close()`
self.taker = None

def maybe_close(self) -> None:
"""Shut down the taker if possible."""
taker = self.taker
self.taker = None
if taker is not None and taker is not this_task:
taker.close()


class chan:
"""
Two-ended channel.
Expand Down

0 comments on commit ca80a6e

Please sign in to comment.