From ca80a6e9bfc1ee51c1c3d5ffec35c48c64650e32 Mon Sep 17 00:00:00 2001 From: matejcik Date: Wed, 26 Jul 2023 10:59:20 +0200 Subject: [PATCH] feat(core): add mailbox as a simplified version of chan --- core/src/trezor/loop.py | 103 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/core/src/trezor/loop.py b/core/src/trezor/loop.py index 004b70b19c1..e76c692dd40 100644 --- a/core/src/trezor/loop.py +++ b/core/src/trezor/loop.py @@ -350,6 +350,109 @@ def __iter__(self) -> Task: # type: ignore [awaitable-is-generator] raise +class mailbox(Syscall): + """ + Wait to receive a value. + + In terms of synchronization primitives, this is a condition variable that also + contains a value. It is a simplification of Go channels, which is one-ended and + only has a buffer of size 1. + + The receiving end pauses until a value is received, and then empties the mailbox + to wait again. + + The sending end synchronously posts a value. It is impossible to wait until + the value is consumed. Trying to post a value when the mailbox is full raises + an error, unless `replace=True` is specified + + Example: + + >>> # in task #1: + >>> box = loop.mailbox() + >>> while True: + >>> result = await box + >>> print("awaited result:", result) + + >>> # in task #2: + >>> box.put("Hello from the other task") + >>> print("put completed") + + Example Output: + + put completed + awaited result: Hello from the other task + """ + + _NO_VALUE = object() + + def __init__(self, initial_value: Any = _NO_VALUE) -> None: + self.value = initial_value + self.taker: Task | None = None + + def is_empty(self) -> bool: + """Is the mailbox empty?""" + return self.value is self._NO_VALUE + + def clear(self) -> None: + """Empty the mailbox.""" + assert self.taker is None + self.value = self._NO_VALUE + + def put(self, value: Any, replace: bool = False) -> None: + """Put a value into the mailbox. + + If there is another task waiting for the value, it will be scheduled to resume. + Otherwise, the mailbox will hold the value until someone consumes it. + + It is an error to call `put()` when there is a value already held, unless + `replace` is set to `True`. In such case, the held value is replaced with + the new one. + """ + if not self.is_empty() and not replace: + raise ValueError("mailbox already has a value") + + self.value = value + if self.taker is not None: + self._take(self.taker) + + def _take(self, task: Task) -> None: + """Take a value and schedule the taker.""" + self.taker = None + schedule(task, self.value) + self.clear() + + def handle(self, task: Task) -> None: + assert self.taker is None + if not self.is_empty(): + self._take(task) + else: + self.taker = task + + def __iter__(self) -> Generator: + assert self.taker is None + + # short-circuit if there is a value already + if not self.is_empty(): + value = self.value + self.clear() + return value + + # otherwise, wait for a value + try: + return (yield self) + finally: + # Clear the taker even in case of exception. This way stale takers don't + # blow up someone calling `maybe_close()` + self.taker = None + + def maybe_close(self) -> None: + """Shut down the taker if possible.""" + taker = self.taker + self.taker = None + if taker is not None and taker is not this_task: + taker.close() + + class chan: """ Two-ended channel.