From fe659dfbcc799edb1e5d868159036482d3c0e6e1 Mon Sep 17 00:00:00 2001 From: Luke Wagner Date: Mon, 9 Dec 2024 17:19:50 -0600 Subject: [PATCH] Change async CABI, add context.{get,set} and waitable sets --- design/mvp/Async.md | 213 ++++++++---- design/mvp/Binary.md | 13 +- design/mvp/CanonicalABI.md | 419 +++++++++++++++++------- design/mvp/Explainer.md | 200 +++++++---- design/mvp/canonical-abi/definitions.py | 194 ++++++++--- design/mvp/canonical-abi/run_tests.py | 345 ++++++++++++------- 6 files changed, 969 insertions(+), 415 deletions(-) diff --git a/design/mvp/Async.md b/design/mvp/Async.md index 7189cc29..38e9bdd4 100644 --- a/design/mvp/Async.md +++ b/design/mvp/Async.md @@ -15,6 +15,7 @@ summary of the motivation and animated sketch of the design in action. * [Sync and Async Functions](#sync-and-async-functions) * [Task](#task) * [Current task](#current-task) + * [Context-Local Storage](#context-local-storage) * [Subtask and Supertask](#subtask-and-supertask) * [Structured concurrency](#structured-concurrency) * [Streams and Futures](#streams-and-futures) @@ -181,6 +182,38 @@ although there can be multiple live `Task` objects in a component instance, "the current one" is always clear: it's the one passed to the current function as a parameter. +### Context-Local Storage + +Each task contains a distinct mutable **context-local storage** array. The +current task's context-local storage can be read and written from core wasm +code by calling the [`context.get`] and [`context.set`] built-ins. + +The context-local storage array's length is currently fixed to contain exactly +2 `i32`s with the goal of allowing this array to be stored inline in whatever +existing runtime data structure is already efficiently reachable from ambient +compiled wasm code. 
Because module instantiation is declarative in the +Component Model, the imported `context.{get,set}` built-ins can be inlined by +the core wasm compiler as-if they were instructions, allowing the generated +machine code to be a single load or store. This makes context-local storage a +good place to store the linear-memory shadow stack pointer as well as the +pointer to the struct used to implement [thread-local storage] APIs used by +guest code. + +When [memory64] is integrated into the Component Model's Canonical ABI, +`context.{get,set}` will be backwards-compatibly relaxed to allow `i64` +pointers (overlaying the `i32` values like hardware 32/64-bit registers). When +[wasm-gc] is integrated, these integral context values can serve as indices +into guest-managed tables of typed GC references. + +When [threads are added](#interaction-with-multi-threading), each thread will +also get its own distinct mutable context-local storage array. This is the +reason why "context-local" storage is not called "task-local" storage (where a +"context" is a finer-grained unit of execution than either a "task" or a +"thread"). + +For details, see [`context.get`] in the AST explainer and [`canon_context_get`] +in the Canonical ABI explainer. + ### Subtask and Supertask Each component-to-component call necessarily creates a new task in the callee. @@ -322,32 +355,52 @@ maintained for streams and futures by the Canonical ABI. When a component asynchronously lowers an import, it is explicitly requesting that, if the import blocks, control flow be returned back to the calling task so that it can do something else. Similarly, if `stream.read` or `stream.write` -would block, they return a "blocked" code so that the caller can continue to -make progress on other things. 
But eventually, a task will run out of other -things to do and will need to **wait** for progress on one of the task's -subtasks, readable stream ends, writable stream ends, readable future ends or -writable future ends, which are collectively called its **waitables**. While a -task is waiting on its waitables, the Component Model runtime can switch to -other running tasks or start new tasks by invoking exports. - -The Canonical ABI provides two ways for a task to wait: -* The task can call the [`task.wait`] built-in to synchronously wait for - progress. This is specified in the Canonical ABI by the [`canon_task_wait`] - function. -* The task can specify a `callback` function (in the `canon lift` definition) - and return to the event loop to wait for notification of progress by a call - to the `callback` function. This is specified in the Canonical ABI by - the `opts.callback` case in [`canon_lift`]. +are called asynchronously and would block, they return a "blocked" code so that +the caller can continue to make progress on other things. But eventually, a +task will run out of other things to do and will need to **wait** for progress +on one of the task's subtasks, reads or writes, which are collectively called +its **waitables**. The Canonical ABI Python represents waitables with the +[`Waitable`] base class. While a task is waiting, the Component Model runtime +can switch to other running tasks or start new tasks by invoking exports. + +To avoid the O(N) cost of processing an N-ary list of waitables every time a +task needs to wait (which is the classic performance bottleneck of, e.g., POSIX +`select()`), the Canonical ABI allows waitables to be maintained in **waitable +sets** which (like `epoll()`) can be waited upon as a whole for any one of the +member waitables to make progress. Waitable sets are independent of tasks; +tasks can wait on different waitable sets over time and a single waitable set +can be waited upon by multiple tasks at once. 
Waitable sets are local to a +component instance and cannot be shared across component boundaries. + +The Canonical ABI provides two ways for a task to wait on a waitable set: +* Core wasm can pass (the index of) the waitable set as a parameter to the + [`waitable-set.wait`] built-in which blocks and returns the event that + occurred. +* If the task uses a `callback` function, core wasm can return (the index of) + the waitable set as a return value to the event loop, which will block and + then pass the event that occurred as a parameter to the `callback`. While the two approaches have significant runtime implementation differences (the former requires [fibers] or a [CPS transform] while the latter only -requires storing a small `i32` "context" in the task), semantically they do the -same thing which, in the Canonical ABI Python code, is factored out into -[`Task`]'s `wait` method. Thus, the difference between `callback` and -non-`callback` is mostly one of optimization, not expressivity. - -The Canonical ABI Python represents waitables with a common [`Waitable`] -base class. +requires storing fixed-size context-local storage and [`Task`] state), +semantically they do the same thing which, in the Canonical ABI Python code, is +factored out into the [`Task.wait`] method. Thus, the difference between +`callback` and non-`callback` is one of optimization, not expressivity. + +In addition to waiting for an event to occur, a task can also **poll** for +whether an event has already occurred. Polling does not block, but does allow +other tasks to be switched to and executed. Polling is opportunistic, allowing +the servicing of higher-priority events in the middle of longer-running +computations; when there is nothing left to do, a task must *wait*. A task +can poll by either calling [`waitable-set.poll`] or, when using a +`callback`, by returning the Canonical-ABI-defined "poll" code to the event loop +along with (the index of) the waitable set to poll. 
+ +Lastly, if a long-running task wants to allow other tasks to execute, without +having any of its own subtasks to wait on, it can **yield**, allowing other +tasks to be scheduled before continuing execution of the current task. A task +can yield by either calling [`yield`] or, when using a `callback`, by returning +the Canonical-ABI-defined "yield" code to the event loop. ### Backpressure @@ -356,16 +409,16 @@ export calls can start piling up, each consuming some of the component's finite private resources (like linear memory), requiring the component to be able to exert *backpressure* to allow some tasks to finish (and release private resources) before admitting new async export calls. To do this, a component may -call the `task.backpressure` built-in to set a "backpressure" flag that causes -subsequent export calls to immediately return in the "starting" state without -calling the component's Core WebAssembly code. +call the [`backpressure.set`] built-in to set a component-instance-wide +"backpressure" flag that causes subsequent export calls to immediately return +in the "starting" state without calling the component's Core WebAssembly code. Once task enables backpressure, it can [wait](#waiting) for existing tasks to finish and release their associated resources. Thus, a task can choose to [wait](#waiting) with or without backpressure enabled, depending on whether it wants to accept new accept new export calls while waiting or not. -See the [`canon_task_backpressure`] function and [`Task.enter`] method in the +See the [`canon_backpressure_set`] function and [`Task.enter`] method in the Canonical ABI explainer for the setting and implementation of backpressure. Once a task is allowed to start according to these backpressure rules, its @@ -415,18 +468,29 @@ replaced with `...` to focus on the overall flow of function calls. 
(import "libc" "mem" (memory 1)) (import "libc" "realloc" (func (param i32 i32 i32 i32) (result i32))) (import "" "fetch" (func $fetch (param i32 i32) (result i32))) + (import "" "waitable-set.new" (func $new_waitable_set (result i32))) + (import "" "waitable-set.wait" (func $wait (param i32 i32) (result i32))) + (import "" "waitable.join" (func $join (param i32 i32))) (import "" "task.return" (func $task_return (param i32 i32))) - (import "" "task.wait" (func $wait (param i32) (result i32))) + (global $wsi (mut i32)) + (func $start + (global.set $wsi (call $new_waitable_set)) + ) + (start $start) (func (export "summarize") (param i32 i32) ... loop ... call $fetch ;; pass a pointer-to-string and pointer-to-list-of-bytes outparam ... ;; ... and receive the index of a new async subtask + global.get $wsi + call $join ;; ... and add it to the waitable set + ... end loop ;; loop as long as there are any subtasks ... - call $task_wait ;; wait for a subtask to make progress + global.get $wsi + call $wait ;; wait for a subtask in the waitable set to make progress ... end ... @@ -438,14 +502,18 @@ replaced with `...` to focus on the overall flow of function calls. 
(alias $libc "mem" (core memory $mem)) (alias $libc "realloc" (core func $realloc)) (canon lower $fetch async (memory $mem) (realloc $realloc) (core func $fetch')) + (canon waitable-set.new (core func $new)) + (canon waitable-set.wait async (memory $mem) (core func $wait)) + (canon waitable.join (core func $join)) (canon task.return (result string) async (memory $mem) (realloc $realloc) (core func $task_return)) - (canon task.wait async (memory $mem) (core func $task_wait)) (core instance $main (instantiate $Main (with "" (instance (export "mem" (memory $mem)) (export "realloc" (func $realloc)) (export "fetch" (func $fetch')) + (export "waitable-set.new" (func $new)) + (export "waitable-set.wait" (func $wait)) + (export "waitable.join" (func $join)) (export "task.return" (func $task_return)) - (export "task.wait" (func $task_wait)) )))) (canon lift (core func $main "summarize") async (memory $mem) (realloc $realloc) @@ -456,25 +524,21 @@ replaced with `...` to focus on the overall flow of function calls. Because the imported `fetch` function is `canon lower`ed with `async`, its core function type (shown in the first import of `$Main`) takes pointers to the parameter and results (which are asynchronously read-from and written-to) and -returns the index of a new subtask. `summarize` calls `task.wait` repeatedly -until all `fetch` subtasks have finished, noting that `task.wait` can return -intermediate progress (as subtasks transition from "starting" to "started" to -"returned") which tell the surrounding core wasm code that it can reclaim the -memory passed arguments or use the results that have now been written to the -outparam memory. +returns the index of a new subtask. 
`summarize` calls `waitable-set.wait` +repeatedly until all `fetch` subtasks have finished, noting that +`waitable-set.wait` can return intermediate progress (as subtasks transition +from "starting" to "started" to "returned") which tell the surrounding core +wasm code that it can reclaim the memory passed arguments or use the results +that have now been written to the outparam memory. Because the `summarize` function is `canon lift`ed with `async`, its core -function type has no results, since results are passed out via `task.return`. -It also means that multiple `summarize` calls can be active at once: once the -first call to `task.wait` blocks, the runtime will suspend its callstack +function type has no results; results are passed out via `task.return`. It also +means that multiple `summarize` calls can be active at once: once the first +call to `waitable-set.wait` blocks, the runtime will suspend its callstack (fiber) and start a new stack for the new call to `summarize`. Thus, `summarize` must be careful to allocate a separate linear-memory stack in its -entry point, if one is needed, and to save and restore this before and after -calling `task.wait`. - -(Note that, for brevity this example ignores the `memory` and `realloc` -immediates required by `canon lift` and `canon lower` to allocate the `list` -param and `string` result, resp.) +entry point and store it in context-local storage (via `context.set`) instead +of simply using a `global`, as in a synchronous function. This same example can be re-written to use the `callback` immediate (thereby avoiding the need for fibers) as follows. Note that the internal structure of @@ -495,37 +559,55 @@ not externally-visible behavior. 
(import "libc" "mem" (memory 1)) (import "libc" "realloc" (func (param i32 i32 i32 i32) (result i32))) (import "" "fetch" (func $fetch (param i32 i32) (result i32))) + (import "" "waitable-set.new" (func $new_waitable_set (result i32))) + (import "" "waitable.join" (func $join (param i32 i32))) (import "" "task.return" (func $task_return (param i32 i32))) + (global $wsi (mut i32)) + (func $start + (global.set $wsi (call $new_waitable_set)) + ) + (start $start) (func (export "summarize") (param i32 i32) (result i32) ... loop ... call $fetch ;; pass a pointer-to-string and pointer-to-list-of-bytes outparam ... ;; ... and receive the index of a new async subtask + global.get $wsi + call $join ;; ... and add it to the waitable set + ... end - ... ;; return a non-zero "cx" value passed to the next call to "cb" + (i32.or ;; return (WAIT | ($wsi << 4)) + (i32.const 2) ;; 2 -> WAIT + (i32.shl + (global.get $wsi) + (i32.const 4))) ) - (func (export "cb") (param $cx i32) (param $event i32) (param $p1 i32) (param $p2 i32) + (func (export "cb") (param $event i32) (param $p1 i32) (param $p2 i32) ... - if ... subtasks remain ... - get_local $cx - return ;; wait for another subtask to make progress + if (result i32) ;; if subtasks remain: + i32.const 2 ;; return WAIT + else ;; if no subtasks remain: + ... + call $task_return ;; return the string result (pointer,length) + ... + i32.const 0 ;; return EXIT end - ... - call $task_return ;; return the string result (pointer,length) - ... 
- i32.const 0 ;; return zero to signal that this task is done ) ) (core instance $libc (instantiate $Libc)) (alias $libc "mem" (core memory $mem)) (alias $libc "realloc" (core func $realloc)) (canon lower $fetch async (memory $mem) (realloc $realloc) (core func $fetch')) + (canon waitable-set.new (core func $new)) + (canon waitable.join (core func $join)) (canon task.return (result string) async (memory $mem) (realloc $realloc) (core func $task_return)) (core instance $main (instantiate $Main (with "" (instance (export "mem" (memory $mem)) (export "realloc" (func $realloc)) (export "fetch" (func $fetch')) + (export "waitable-set.new" (func $new)) + (export "waitable.join" (func $join)) (export "task.return" (func $task_return)) )))) (canon lift (core func $main "summarize") @@ -534,6 +616,9 @@ not externally-visible behavior. (export "summarize" (func $summarize)) ) ``` +For an explanation of the bitpacking of the `i32` callback return value, +see [`unpack_callback_result`] in the Canonical ABI explainer. + While this example spawns all the subtasks in the initial call to `summarize`, subtasks can also be spawned from `cb` (even after the call to `task.return`). 
It's also possible for `summarize` to call `task.return` eagerly in the
https://github.com/WebAssembly/js-promise-integration/ [shared-everything-threads]: https://github.com/webAssembly/shared-everything-threads +[memory64]: https://github.com/webAssembly/memory64 +[wasm-gc]: https://github.com/WebAssembly/gc/blob/main/proposals/gc/MVP.md [WASI Preview 3]: https://github.com/WebAssembly/WASI/tree/main/wasip2#looking-forward-to-preview-3 [`wasi:http/handler.handle`]: https://github.com/WebAssembly/wasi-http/blob/main/wit-0.3.0-draft/handler.wit diff --git a/design/mvp/Binary.md b/design/mvp/Binary.md index f0456674..8026c608 100644 --- a/design/mvp/Binary.md +++ b/design/mvp/Binary.md @@ -290,11 +290,11 @@ canon ::= 0x00 0x00 f: opts: ft: => (canon lift | 0x04 rt: => (canon resource.rep rt (core func)) | 0x05 ft: => (canon thread.spawn ft (core func)) ๐Ÿงต | 0x06 => (canon thread.available_parallelism (core func)) ๐Ÿงต - | 0x08 => (canon task.backpressure (core func)) ๐Ÿ”€ + | 0x08 => (canon backpressure.set (core func)) ๐Ÿ”€ | 0x09 rs: opts: => (canon task.return rs opts (core func)) ๐Ÿ”€ - | 0x0a async?:? m: => (canon task.wait async? (memory m) (core func)) ๐Ÿ”€ - | 0x0b async?:? m: => (canon task.poll async? (memory m) (core func)) ๐Ÿ”€ - | 0x0c async?:? => (canon task.yield async? (core func)) ๐Ÿ”€ + | 0x0a 0x7f i: => (canon context.get i32 i (core func)) ๐Ÿ”€ + | 0x0b 0x7f i: => (canon context.set i32 i (core func)) ๐Ÿ”€ + | 0x0c async?:? => (canon yield async? (core func)) ๐Ÿ”€ | 0x0d => (canon subtask.drop (core func)) ๐Ÿ”€ | 0x0e t: => (canon stream.new t (core func)) ๐Ÿ”€ | 0x0f t: opts: => (canon stream.read t opts (core func)) ๐Ÿ”€ @@ -313,6 +313,11 @@ canon ::= 0x00 0x00 f: opts: ft: => (canon lift | 0x1c opts: => (canon error-context.new opts (core func)) ๐Ÿ”€ | 0x1d opts: => (canon error-context.debug-message opts (core func)) ๐Ÿ”€ | 0x1e => (canon error-context.drop (core func)) ๐Ÿ”€ + | 0x1f => (canon waitable-set.new (core func)) ๐Ÿ”€ + | 0x20 async?:? m: => (canon waitable-set.wait async? 
(memory m) (core func)) ๐Ÿ”€ + | 0x21 async?:? m: => (canon waitable-set.poll async? (memory m) (core func)) ๐Ÿ”€ + | 0x22 => (canon waitable-set.drop (core func)) ๐Ÿ”€ + | 0x23 => (canon waitable.join (core func)) ๐Ÿ”€ async? ::= 0x00 => | 0x01 => async opts ::= opt*:vec() => opt* diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index 043e2580..9576d437 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -36,11 +36,16 @@ being specified here. * [`canon resource.new`](#canon-resourcenew) * [`canon resource.drop`](#canon-resourcedrop) * [`canon resource.rep`](#canon-resourcerep) - * [`canon task.backpressure`](#-canon-taskbackpressure) ๐Ÿ”€ + * [`canon context.get`](#-canon-contextget) ๐Ÿ”€ + * [`canon context.set`](#-canon-contextset) ๐Ÿ”€ + * [`canon backpressure.set`](#-canon-backpressureset) ๐Ÿ”€ * [`canon task.return`](#-canon-taskreturn) ๐Ÿ”€ - * [`canon task.wait`](#-canon-taskwait) ๐Ÿ”€ - * [`canon task.poll`](#-canon-taskpoll) ๐Ÿ”€ - * [`canon task.yield`](#-canon-taskyield) ๐Ÿ”€ + * [`canon yield`](#-canon-yield) ๐Ÿ”€ + * [`canon waitable-set.new`](#-canon-waitable-setnew) ๐Ÿ”€ + * [`canon waitable-set.wait`](#-canon-waitable-setwait) ๐Ÿ”€ + * [`canon waitable-set.poll`](#-canon-waitable-setpoll) ๐Ÿ”€ + * [`canon waitable-set.drop`](#-canon-waitable-setdrop) ๐Ÿ”€ + * [`canon waitable.join`](#-canon-waitablejoin) ๐Ÿ”€ * [`canon subtask.drop`](#-canon-subtaskdrop) ๐Ÿ”€ * [`canon {stream,future}.new`](#-canon-streamfuturenew) ๐Ÿ”€ * [`canon {stream,future}.{read,write}`](#-canon-streamfuturereadwrite) ๐Ÿ”€ @@ -159,6 +164,7 @@ behavior and enforce invariants. 
class ComponentInstance: resources: Table[ResourceHandle] waitables: Table[Waitable] + waitable_sets: Table[WaitableSet] error_contexts: Table[ErrorContext] may_leave: bool backpressure: bool @@ -170,6 +176,7 @@ class ComponentInstance: def __init__(self): self.resources = Table[ResourceHandle]() self.waitables = Table[Waitable]() + self.waitable_sets = Table[WaitableSet]() self.error_contexts = Table[ErrorContext]() self.may_leave = True self.backpressure = False @@ -193,7 +200,7 @@ class Table(Generic[ElemT]): array: list[Optional[ElemT]] free: list[int] - MAX_LENGTH = 2**30 - 1 + MAX_LENGTH = 2**28 - 1 def __init__(self): self.array = [None] @@ -401,6 +408,30 @@ that do all the heavy lifting are shared with function parameter/result lifting and lowering and defined below. +#### Context-Local Storage + +The `ContextLocalStorage` class implements [context-local storage], with each +new `Task` getting a fresh, zero-initialized `ContextLocalStorage` that can be +accessed by core wasm code using `canon context.{get,set}`. (In the future, +when threads are integrated, each `thread.spawn`ed thread would also get a +fresh, zero-initialized `ContextLocalStorage`.) 
+```python +class ContextLocalStorage: + LENGTH = 2 + array: list[int] + + def __init__(self): + self.array = [0] * ContextLocalStorage.LENGTH + + def set(self, i, v): + assert(types_match_values(['i32'], [v])) + self.array[i] = v + + def get(self, i): + return self.array[i] +``` + + #### Task State A `Task` object is created for each call to `canon_lift` and is implicitly @@ -415,9 +446,9 @@ class Task: caller: Optional[Task] on_return: Optional[Callable] on_block: Callable[[Awaitable], Awaitable] - waitable_set: WaitableSet num_subtasks: int num_borrows: int + context: ContextLocalStorage def __init__(self, opts, inst, ft, caller, on_return, on_block): self.opts = opts @@ -426,9 +457,9 @@ class Task: self.caller = caller self.on_return = on_return self.on_block = on_block - self.waitable_set = WaitableSet() self.num_subtasks = 0 self.num_borrows = 0 + self.context = ContextLocalStorage() ``` Using a conservative syntactic analysis of a complete component, an optimizing implementation can statically eliminate fields when a particular feature (such @@ -580,7 +611,7 @@ Notably, the loop in `maybe_start_pending_task` allows pending async tasks to start even when there is a blocked pending sync task ahead of them in the `pending_tasks` queue. -The `Task.yield_` method is called by `canon task.yield` or, when a `callback` +The `Task.yield_` method is called by `canon yield` or, when a `callback` is used, when core wasm returns the "yield" code to the event loop. Yielding allows the runtime to switch execution to another task without having to wait for any external I/O as emulated in the Python definition by waiting on @@ -590,17 +621,6 @@ for any external I/O as emulated in the Python definition by waiting on await self.wait_on(asyncio.sleep(0), sync) ``` -The `Task.wait` and `Task.poll` methods delegate to `WaitableSet`, defined in -the next section. 
-```python - async def wait(self, sync) -> EventTuple: - return await self.wait_on(self.waitable_set.wait(), sync) - - async def poll(self, sync) -> Optional[EventTuple]: - await self.yield_(sync) - return self.waitable_set.poll() -``` - The `Task.wait_on` method defines how to block the current task until a given Python awaitable is resolved. By calling the `on_block` callback, `wait_on` allows other tasks to execute before the current task resumes, but *which* @@ -746,7 +766,6 @@ may be a synchronous task unblocked by the clearing of `calling_sync_export`. def exit(self): assert(Task.current.locked()) trap_if(self.num_subtasks > 0) - self.waitable_set.drop() trap_if(self.on_return) assert(self.num_borrows == 0) if self.opts.sync: @@ -771,19 +790,19 @@ code that produces the events (specifically, in `subtask_event` and `copy_event`). ```python class CallState(IntEnum): - STARTING = 0 - STARTED = 1 - RETURNED = 2 + STARTING = 1 + STARTED = 2 + RETURNED = 3 class EventCode(IntEnum): + NONE = 0 CALL_STARTING = CallState.STARTING CALL_STARTED = CallState.STARTED CALL_RETURNED = CallState.RETURNED - YIELDED = 3 - STREAM_READ = 4 - STREAM_WRITE = 5 - FUTURE_READ = 6 - FUTURE_WRITE = 7 + STREAM_READ = 5 + STREAM_WRITE = 6 + FUTURE_READ = 7 + FUTURE_WRITE = 8 EventTuple = tuple[EventCode, int, int] ``` @@ -876,15 +895,19 @@ polling. 
class WaitableSet: elems: list[Waitable] maybe_has_pending_event: asyncio.Event + num_waiting: int def __init__(self): self.elems = [] self.maybe_has_pending_event = asyncio.Event() + self.num_waiting = 0 async def wait(self) -> EventTuple: + self.num_waiting += 1 while True: await self.maybe_has_pending_event.wait() if (e := self.poll()): + self.num_waiting -= 1 return e def poll(self) -> Optional[EventTuple]: @@ -899,11 +922,11 @@ class WaitableSet: def drop(self): trap_if(len(self.elems) > 0) + trap_if(self.num_waiting > 0) ``` -The `WaitableSet.drop` method traps if dropped (by the owning `Task`) while it -still contains elements (whose `Waitable.waitable_set` field would become -dangling). This can happen if a task tries to exit in the middle of a stream or -future copy operation. +The `WaitableSet.drop` method traps if dropped while it still contains elements +(whose `Waitable.waitable_set` field would become dangling) or if it is being +waited-upon by another `Task`. Note: the `random.shuffle` in `poll` is meant to give runtimes the semantic freedom to schedule delivery of events non-deterministically (e.g., taking into @@ -955,7 +978,6 @@ turn only happens if the call is `async` *and* blocks. In this case, the self.supertask = task self.supertask.num_subtasks += 1 Waitable.__init__(self) - Waitable.join(self, task.waitable_set) return task.inst.waitables.add(self) ``` The `num_subtasks` increment ensures that the parent `Task` cannot `exit` @@ -2708,13 +2730,29 @@ declared types of the export. In any case, `canon lift` specifies how these variously-produced values are consumed as parameters (and produced as results) by a *single host-agnostic component*. 
-Based on this, `canon_lift` is defined: +Based on this, `canon_lift` is defined in chunks as follows: ```python async def canon_lift(opts, inst, ft, callee, caller, on_start, on_return, on_block): task = Task(opts, inst, ft, caller, on_return, on_block) flat_args = await task.enter(on_start) flat_ft = flatten_functype(opts, ft, 'lift') assert(types_match_values(flat_ft.params, flat_args)) +``` +Each call to `canon lift` creates a new `Task` and waits to enter the task, +allowing the task to express backpressure (for several independent reasons +listed in `Task.may_enter` above) before lowering the arguments into the +callee's memory. + +In the synchronous case, if `always-task-return` ABI option is set, the lifted +core wasm code must call `canon task.return` to return a value before returning +to `canon lift` (or else there will be a trap in `task.exit()`), which allows +the core wasm to do cleanup and finalization before returning. Otherwise, +if `always-task-return` is *not* set, `canon lift` calls `task.return` when +core wasm returns (which lifts the values return by core wasm) and then calls +the `post-return` function to let core wasm do cleanup and finalization. In +the future, `post-return` and the option to not set `always-task-return` may +be deprecated and removed. +```python if opts.sync: flat_results = await call_and_trap_on_throw(callee, task, flat_args) if not opts.always_task_return: @@ -2722,37 +2760,80 @@ async def canon_lift(opts, inst, ft, callee, caller, on_start, on_return, on_blo task.return_(flat_results) if opts.post_return is not None: [] = await call_and_trap_on_throw(opts.post_return, task, flat_results) + task.exit() + return +``` +Next, the asynchronous non-`callback` case is simple and requires `task.return` +to be called, effectively implying `always-task-return`. Asynchronous waiting +happens by core wasm calling `waitable-set.wait`. 
+```python else: if not opts.callback: [] = await call_and_trap_on_throw(callee, task, flat_args) assert(types_match_values(flat_ft.results, [])) + task.exit() + return +``` +In contrast, the aynchronous `callback` case does asynchronous waiting in +the event loop, with core wasm (repeatedly) returning instructions for what +to do next: +```python else: - [packed_ctx] = await call_and_trap_on_throw(callee, task, flat_args) - assert(types_match_values(flat_ft.results, [packed_ctx])) - while packed_ctx != 0: - is_yield = bool(packed_ctx & 1) - ctx = packed_ctx & ~1 - if is_yield: - await task.yield_(sync = False) - event, p1, p2 = (EventCode.YIELDED, 0, 0) + [packed] = await call_and_trap_on_throw(callee, task, flat_args) + s = None + while True: + code,si = unpack_callback_result(packed) + if si != 0: + s = task.inst.waitable_sets.get(si) + match code: + case CallbackCode.EXIT: + task.exit() + return + case CallbackCode.YIELD: + await task.yield_(opts.sync) + e = None + case CallbackCode.WAIT: + trap_if(not s) + e = await task.wait_on(s.wait(), sync = False) + case CallbackCode.POLL: + trap_if(not s) + await task.yield_(opts.sync) + e = s.poll() + if e: + event, p1, p2 = e else: - event, p1, p2 = await task.wait(sync = False) - [packed_ctx] = await call_and_trap_on_throw(opts.callback, task, [ctx, event, p1, p2]) - task.exit() -``` -In the `sync` case, if the `always-task-return` ABI option is *not* set, then -`task.return_` will be called by `callee` to return values; otherwise, -`task.return_` must be called by `canon_lift`. - -In the `async` case, there are two sub-cases depending on whether the -`callback` `canonopt` was set. When `callback` is present, waiting happens in -an "event loop" inside `canon_lift` which also allows yielding (i.e., allowing -other tasks to run without blocking) by setting the LSB of the returned `i32`. 
-Otherwise, waiting must happen by calling `task.wait` (defined below), which -potentially requires the runtime implementation to use a fiber (aka. stackful -coroutine) to switch to another task. Thus, `callback` is an optimization for -avoiding fiber creation for async languages that don't need it (e.g., JS, -Python, C# and Rust). + event, p1, p2 = (EventCode.NONE, 0, 0) + [packed] = await call_and_trap_on_throw(opts.callback, task, [event, p1, p2]) +``` +One detail worth noting here is that the index of the waitable set does not +need to be returned every time; as an optimization to avoid a `waitable_sets` +table access on every turn of the event loop, if the returned waitable set +index is `0` (which is an invalid table index anyways), the previous waitable +set will be used. + +The bit-packing scheme used for the `i32` `packed` return value is defined as +follows: +```python +class CallbackCode(IntEnum): + EXIT = 0 + YIELD = 1 + WAIT = 2 + POLL = 3 + MAX = 3 + +def unpack_callback_result(packed): + code = packed & 0xf + trap_if(code > CallbackCode.MAX) + assert(packed < 2**32) + assert(Table.MAX_LENGTH < 2**28) + waitable_set_index = packed >> 4 + return (CallbackCode(code), waitable_set_index) +``` +The ability to asynchronously wait, poll, yield and exit is thus available to +both the `callback` and non-`callback` cases, making `callback` just an +optimization to avoid allocating stacks for async languages that have avoided +the need for [stack-switching] by design (e.g., `async`/`await` in JS, Python, +C# and Rust). Uncaught Core WebAssembly [exceptions] result in a trap at component boundaries. 
Thus, if a component wishes to signal an error, it must use some @@ -2865,9 +2946,9 @@ immediately return control flow back to the `async` caller if `callee` blocks: subtask.finish() return (EventCode(subtask.state), subtaski, 0) subtask.set_event(subtask_event) - assert(0 < subtaski <= Table.MAX_LENGTH < 2**30) - assert(0 <= int(subtask.state) < 2**2) - flat_results = [subtaski | (int(subtask.state) << 30)] + assert(0 < subtaski <= Table.MAX_LENGTH < 2**28) + assert(0 <= int(subtask.state) < 2**4) + flat_results = [int(subtask.state) | (subtaski << 4)] return flat_results ``` @@ -3017,19 +3098,62 @@ Note that the "locally-defined" requirement above ensures that only the component instance defining a resource can access its representation. -### ๐Ÿ”€ `canon task.backpressure` +### ๐Ÿ”€ `canon context.get` + +For a canonical definition: +```wasm +(canon context.get $t $i (core func $f)) +``` +validation specifies: +* `$t` must be `i32` (for now; see [here][context-local storage]) +* `$i` must be less than `2` +* `$f` is given type `(func (result i32))` + +Calling `$f` invokes the following function, which reads the [context-local +storage] of the [current task]: +```python +async def canon_context_get(t, i, task): + assert(t == 'i32') + assert(i < ContextLocalStorage.LENGTH) + return [task.context.get(i)] +``` + + +### ๐Ÿ”€ `canon context.set` + +For a canonical definition: +```wasm +(canon context.set $t $i (core func $f)) +``` +validation specifies: +* `$t` must be `i32` (for now; see [here][context-local storage]) +* `$i` must be less than `2` +* `$f` is given type `(func (param $v i32))` + +Calling `$f` invokes the following function, which writes to the [context-local +storage] of the [current task]: +```python +async def canon_context_set(t, i, task, v): + assert(t == 'i32') + assert(i < ContextLocalStorage.LENGTH) + task.context.set(i, v) + return [] +``` + + +### ๐Ÿ”€ `canon backpressure.set` For a canonical definition: ```wasm -(canon task.backpressure (core 
func $f)) +(canon backpressure.set (core func $f)) ``` validation specifies: -* `$f` is given type `[i32] -> []` +* `$f` is given type `(func (param $enabled i32))` Calling `$f` invokes the following function, which sets the `backpressure` flag on the current `ComponentInstance`: ```python -async def canon_task_backpressure(task, flat_args): +async def canon_backpressure_set(task, flat_args): trap_if(task.opts.sync) task.inst.backpressure = bool(flat_args[0]) return [] @@ -3070,68 +3194,117 @@ This ensures that AOT fusion of `canon lift` and `canon lower` can generate a thunk that is indirectly called by `task.return` after these guards. -### ๐Ÿ”€ `canon task.wait` +### ๐Ÿ”€ `canon yield` For a canonical definition: ```wasm -(canon task.wait $async? (memory $mem) (core func $f)) +(canon yield $async? (core func $f)) ``` validation specifies: -* `$f` is given type `(func (param i32) (result i32))` +* `$f` is given type `(func)` + +Calling `$f` calls `Task.yield_` to allow other tasks to execute: +```python +async def canon_yield(sync, task): + trap_if(not task.inst.may_leave) + trap_if(task.opts.callback and not sync) + await task.yield_(sync) + return [] +``` +If `async` is not set, no other tasks *in the same component instance* can +execute, however tasks in *other* component instances may execute. This allows +a long-running task in one component to avoid starving other components without +needing support full reentrancy. + +The guard preventing `async` use of `task.poll` when a `callback` has +been used preserves the invariant that producer toolchains using +`callback` never need to handle multiple overlapping callback +activations. 
+ + +### ๐Ÿ”€ `canon waitable-set.new` + +For a canonical definition: +```wasm +(canon waitable-set.new (core func $f)) +``` +validation specifies: +* `$f` is given type `(func (result i32))` -Calling `$f` waits for progress to be made in a subtask of the current task, -returning the event (which is currently simply a `CallState` value) and -writing the subtask index as an outparam: +Calling `$f` invokes the following function, which adds an empty waitable set +to the component instance's `waitable_sets` table: ```python -async def canon_task_wait(sync, mem, task, ptr): +async def canon_waitable_set_new(task): + trap_if(not task.inst.may_leave) + return [ task.inst.waitable_sets.add(WaitableSet()) ] +``` + + +### ๐Ÿ”€ `canon waitable-set.wait` + +For a canonical definition: +```wasm +(canon waitable-set.wait $async? (memory $mem) (core func $f)) +``` +validation specifies: +* `$f` is given type `(func (param $si) (param $ptr i32) (result i32))` + +Calling `$f` invokes the following function which waits for progress to be made +on a waitable in the given waitable set (indicated by index `$si`) and then +returning its `EventCode` and writing the payload values into linear memory: +```python +async def canon_waitable_set_wait(sync, mem, task, si, ptr): trap_if(not task.inst.may_leave) trap_if(task.opts.callback and not sync) - event, p1, p2 = await task.wait(sync) + s = task.inst.waitable_sets.get(si) + e = await task.wait_on(s.wait(), sync) + return unpack_event(mem, task, ptr, e) + +def unpack_event(mem, task, ptr, e: EventTuple): + event, p1, p2 = e cx = LiftLowerContext(CanonicalOptions(memory = mem), task.inst) store(cx, p1, U32Type(), ptr) store(cx, p2, U32Type(), ptr + 4) return [event] ``` -If `async` is not set, no other tasks may execute during `task.wait`, which -can be useful for producer toolchains in situations where interleaving is not -supported. 
However, this is generally worse for concurrency and thus producer -toolchains should set `async` when possible. When `$async` is set, `task.wait` -will only block the current `Task`, allowing other tasks to start or resume. +If `async` is not set, `wait_on` will prevent other tasks from executing in +the same component instance, which can be useful for producer toolchains in +situations where interleaving is not supported. However, this is generally +worse for concurrency and thus producer toolchains should set `async` when +possible. -`task.wait` can be called from a synchronously-lifted export so that even +`wait` can be called from a synchronously-lifted export so that even synchronous code can make concurrent import calls. In these synchronous cases, though, the automatic backpressure (applied by `Task.enter`) will ensure there is only ever at most once synchronously-lifted task executing in a component instance at a time. -The guard preventing `async` use of `task.wait` when a `callback` has -been used preserves the invariant that producer toolchains using -`callback` never need to handle multiple overlapping callback -activations. +The guard preventing `async` use of `wait` when a `callback` has been used +preserves the invariant that producer toolchains using `callback` never need to +handle multiple overlapping callback activations. -### ๐Ÿ”€ `canon task.poll` +### ๐Ÿ”€ `canon waitable-set.poll` For a canonical definition: ```wasm -(canon task.poll $async? (memory $mem) (core func $f)) +(canon waitable-set.poll $async? (memory $mem) (core func $f)) ``` validation specifies: -* `$f` is given type `(func (param i32) (result i32))` +* `$f` is given type `(func (param $si i32) (param $ptr i32) (result i32))` -Calling `$f` does a non-blocking check for whether an event is already -available, returning whether or not there was such an event as a boolean and, -if there was an event, storing the `i32` event and payloads as outparams. 
+Calling `$f` invokes the following function, which returns `NONE` (`0`) instead +of blocking if there is no event available, and otherwise returns the event the +same way as `wait`. ```python -async def canon_task_poll(sync, mem, task, ptr): +async def canon_waitable_set_poll(sync, mem, task, si, ptr): trap_if(not task.inst.may_leave) trap_if(task.opts.callback and not sync) - ret = await task.poll(sync) - if ret is None: - return [0] - cx = LiftLowerContext(CanonicalOptions(memory = mem), task.inst) - store(cx, ret, TupleType([U32Type(), U32Type(), U32Type()]), ptr) - return [1] + s = task.inst.waitable_sets.get(si) + await task.yield_(sync) + if (e := s.poll()): + return unpack_event(mem, task, ptr, e) + return [EventCode.NONE] ``` When `async` is set, `task.poll` can yield to other tasks (in this or other components) as part of polling for an event. @@ -3142,32 +3315,51 @@ been used preserves the invariant that producer toolchains using activations. -### ๐Ÿ”€ `canon task.yield` +### ๐Ÿ”€ `canon waitable-set.drop` For a canonical definition: ```wasm -(canon task.yield $async? (core func $f)) +(canon waitable-set.drop (core func $f)) ``` validation specifies: -* `$f` is given type `(func)` +* `$f` is given type `(func (param i32))` -Calling `$f` calls `Task.yield_` to allow other tasks to execute: +Calling `$f` invokes the following function, which removes the given +waitable set from the component instance table, performing the guards defined +by `WaitableSet.drop` above: ```python -async def canon_task_yield(sync, task): +async def canon_waitable_set_drop(task, i): trap_if(not task.inst.may_leave) - trap_if(task.opts.callback and not sync) - await task.yield_(sync) + s = task.inst.waitable_sets.remove(i) + s.drop() return [] ``` -If `sync` is set, no other tasks *in the same component instance* can -execute, however tasks in *other* component instances may execute. 
This allows -a long-running task in one component to avoid starving other components -without needing support full reentrancy. -The guard preventing `async` use of `task.poll` when a `callback` has -been used preserves the invariant that producer toolchains using -`callback` never need to handle multiple overlapping callback -activations. + +### ๐Ÿ”€ `canon waitable.join` + +For a canonical definition: +```wasm +(canon waitable.join (core func $f)) +``` +validation specifies: +* `$f` is given type `(func (param $wi i32) (param $si i32))` + +Calling `$f` invokes the following function: +```python +async def canon_waitable_join(task, wi, si): + trap_if(not task.inst.may_leave) + w = task.inst.waitables.get(wi) + if si == 0: + w.join(None) + else: + w.join(task.inst.waitable_sets.get(si)) + return [] +``` +Note that tables do not allow elements at index `0`, so `0` is a valid sentinel +that tells `join` to remove the given waitable from any set that it is +currently a part of. Waitables can be a member of at most one set, so if the +given waitable is already in one set, it will be transferred. ### ๐Ÿ”€ `canon subtask.drop` @@ -3311,7 +3503,6 @@ context-switching overhead. def copy_event(revoke_buffer): revoke_buffer() e.copying = False - e.join(None) return (event_code, i, pack_copy_result(task, buffer, e)) def on_partial_copy(revoke_buffer): e.set_event(partial(copy_event, revoke_buffer)) @@ -3319,7 +3510,6 @@ context-switching overhead. 
e.set_event(partial(copy_event, revoke_buffer = lambda:())) if e.copy(buffer, on_partial_copy, on_copy_done) != 'done': e.copying = True - e.join(task.waitable_set) return [BLOCKED] return [pack_copy_result(task, buffer, e)] ``` @@ -3641,6 +3831,7 @@ def canon_thread_available_parallelism(): [Structured Concurrency]: Async.md#structured-concurrency [Current Task]: Async.md#current-task [Readable and Writable Ends]: Async.md#streams-and-futures +[Context-Local Storage]: Async.md#context-local-storage [Administrative Instructions]: https://webassembly.github.io/spec/core/exec/runtime.html#syntax-instr-admin [Implementation Limits]: https://webassembly.github.io/spec/core/appendix/implementation.html diff --git a/design/mvp/Explainer.md b/design/mvp/Explainer.md index 31463ea7..c7d5c878 100644 --- a/design/mvp/Explainer.md +++ b/design/mvp/Explainer.md @@ -1410,11 +1410,16 @@ canon ::= ... | (canon resource.new (core func ?)) | (canon resource.drop async? (core func ?)) | (canon resource.rep (core func ?)) - | (canon task.backpressure (core func ?)) ๐Ÿ”€ + | (canon context.get (core func ?)) ๐Ÿ”€ + | (canon context.set (core func ?)) ๐Ÿ”€ + | (canon backpressure.set (core func ?)) ๐Ÿ”€ | (canon task.return (result )? * (core func ?)) ๐Ÿ”€ - | (canon task.wait async? (memory ) (core func ?)) ๐Ÿ”€ - | (canon task.poll async? (memory ) (core func ?)) ๐Ÿ”€ - | (canon task.yield async? (core func ?)) ๐Ÿ”€ + | (canon yield async? (core func ?)) ๐Ÿ”€ + | (canon waitable-set.new (core func ?)) ๐Ÿ”€ + | (canon waitable-set.wait async? (memory ) (core func ?)) ๐Ÿ”€ + | (canon waitable-set.poll async? (memory ) (core func ?)) ๐Ÿ”€ + | (canon waitable-set.drop (core func ?)) ๐Ÿ”€ + | (canon waitable.join (core func ?)) ๐Ÿ”€ | (canon subtask.drop (core func ?)) ๐Ÿ”€ | (canon stream.new (core func ?)) ๐Ÿ”€ | (canon stream.read * (core func ?)) ๐Ÿ”€ @@ -1538,18 +1543,44 @@ transferring ownership of the newly-created resource to the export's caller. 
See the [async explainer](Async.md) for high-level context and terminology and the [Canonical ABI explainer] for detailed runtime semantics. -###### ๐Ÿ”€ `task.backpressure` +###### ๐Ÿ”€ `context.get` + +| Synopsis | | +| -------------------------- | ------------------ | +| Approximate WIT signature | `func() -> T` | +| Canonical ABI signature | `[] -> [T]` | + +The `context.get` built-in returns the `i`th element of the [current task]'s +[context-local storage] array. Validation currently restricts `i` to be less +than 2 and `t` to be `i32`, but will be relaxed in the future (as described +[here][context-local storage]). (See also [`canon_context_get`] in the +Canonical ABI explainer for details.) + +###### ๐Ÿ”€ `context.set` + +| Synopsis | | +| -------------------------- | ----------------- | +| Approximate WIT signature | `func(v: T)` | +| Canonical ABI signature | `[T] -> []` | + +The `context.set` built-in sets the `i`th element of the [current task]'s +[context-local storage] array to the value `v`. Validation currently +restricts `i` to be less than 2 and `t` to be `i32`, but will be relaxed in the +future (as described [here][context-local storage]). (See also +[`canon_context_set`] in the Canonical ABI explainer for details.) + +###### ๐Ÿ”€ `backpressure.set` | Synopsis | | | -------------------------- | --------------------- | | Approximate WIT signature | `func(enable: bool)` | | Canonical ABI signature | `[enable:i32] -> []` | -The `task.backpressure` built-in allows the async-lifted callee to toggle a +The `backpressure.set` built-in allows the async-lifted callee to toggle a per-component-instance flag that, when set, prevents new incoming export calls to the component (until the flag is unset). This allows the component to exert -[backpressure]. (See also [`canon_task_backpressure`] in the Canonical ABI -explainer.) +[backpressure]. (See also [`canon_backpressure_set`] in the Canonical ABI +explainer for details.) 
###### ๐Ÿ”€ `task.return` @@ -1561,24 +1592,52 @@ called, the declared return type and `canonopt`s are checked to exactly match those of the current task. (See also "[Returning]" in the async explainer and [`canon_task_return`] in the Canonical ABI explainer.) -###### ๐Ÿ”€ `task.wait` +###### ๐Ÿ”€ `yield` + +| Synopsis | | +| -------------------------- | ------------------ | +| Approximate WIT signature | `func()` | +| Canonical ABI signature | `[] -> []` | + +The `yield` built-in allows the runtime to switch to other tasks, enabling a +long-running computation to cooperatively interleave execution. If the `async` +immediate is present, the runtime can switch to other tasks in the *same* +component instance, which the calling core wasm must be prepared to handle. If +`async` is not present, only tasks in *other* component instances may be +switched to. (See also [`canon_yield`] in the Canonical ABI explainer for +details.) -| Synopsis | | -| -------------------------- | ---------------------------------------- | -| Approximate WIT signature | `func() -> event` | -| Canonical ABI signature | `[payload_addr:i32] -> [event-kind:i32]` | +###### ๐Ÿ”€ `waitable-set.new` -where `event`, `event-kind`, and `payload` are defined in WIT as: +| Synopsis | | +| -------------------------- | ------------------------ | +| Approximate WIT signature | `func() -> waitable-set` | +| Canonical ABI signature | `[] -> [i32]` | + +The `waitable-set.new` built-in returns the `i32` index of a new [waitable +set]. The `waitable-set` type is not a true WIT-level type but instead serves +to document associated built-ins below. Waitable sets start out empty and are +populated explicitly with [waitables] by `waitable.join`. (See also +[`canon_waitable_set_new`] in the Canonical ABI explainer for details.) 
+ +###### ๐Ÿ”€ `waitable-set.wait` + +| Synopsis | | +| -------------------------- | ---------------------------------------------- | +| Approximate WIT signature | `func(s: waitable-set) -> event` | +| Canonical ABI signature | `[s:i32 payload-addr:i32] -> [event-code:i32]` | + +where `event`, `event-code`, and `payload` are defined in WIT as: ```wit record event { - kind: event-kind, + kind: event-code, payload: payload, } -enum event-kind { +enum event-code { + none, call-starting, call-started, call-returned, - yielded, stream-read, stream-write, future-read, @@ -1590,8 +1649,11 @@ record payload { } ``` -The `task.wait` built-in waits for one of the pending events to occur, and then -returns an `event` describing it. +The `waitable-set.wait` built-in waits for any one of the [waitables] in the +given [waitable set] `s` to make progress and then returns an `event` +describing the event. The `event-code` `none` is never returned. Waitable sets +may be `wait`ed upon when empty, in which case the caller will necessarily +block until another task adds a waitable to the set that can make progress. If the `async` immediate is present, other tasks in the same component instance can be started (via export call) or resumed while the current task blocks. If @@ -1599,49 +1661,62 @@ can be started (via export call) or resumed while the current task blocks. If code until `wait` returns (however, *other* component instances may execute code in the interim). -In the Canonical ABI, the return value provides the `event-kind`, and the -`payload` value is stored at the address passed as the `payload_addr` -parameter. (See also "[Waiting]" in the async explainer and [`canon_task_wait`] -in the Canonical ABI explainer.) +In the Canonical ABI, the return value provides the `event-code`, and the +`payload` value is stored at the address passed as the `payload-addr` +parameter. (See also [`canon_waitable_set_wait`] in the Canonical ABI explainer +for details.) 
-###### ๐Ÿ”€ `task.poll` +###### ๐Ÿ”€ `waitable-set.poll` -| Synopsis | | -| -------------------------- | ----------------------------------- | -| Approximate WIT signature | `func() -> option ` | -| Canonical ABI signature | `[event_addr:i32] -> [is_some:i32]` | +| Synopsis | | +| -------------------------- | ---------------------------------------------- | +| Approximate WIT signature | `func(s: waitable-set) -> event` | +| Canonical ABI signature | `[s:i32 payload-addr:i32] -> [event-code:i32]` | -where `event`, `event-kind`, and `payload` are defined as in [`task.wait`](#-taskwait). +where `event`, `event-code`, and `payload` are defined as in +[`waitable-set.wait`](#-waitable-setwait). -The `task.poll` built-in returns either `none` if no event was immediately -available, or `some` containing an event code and payload. `poll` implicitly -performs a `task.yield`, allowing other tasks to be scheduled before `poll` -returns. The `async?` immediate is passed to `task.yield`, determining whether -other tasks in the same component instance may execute. +The `waitable-set.poll` built-in returns the `event-code` `none` if no event +was available without blocking. `poll` implicitly performs a `yield`, allowing +other tasks to be scheduled before `poll` returns. The `async?` immediate is +passed to `yield`, determining whether other code in the same component +instance may execute. -In the Canonical ABI, the return value `is_some` holds a boolean value -indicating whether an event was immediately available, and if so, the `event` -value, containing the code and payloads are stored into the buffer pointed to -by `event_addr`. (See also [`canon_task_poll`] n the Canonical ABI explainer.) +The Canonical ABI of `waitable-set.poll` is the same as `waitable-set.wait` +(with the `none` case indicated by returning `0`). (See also +[`canon_waitable_set_poll`] in the Canonical ABI explainer for details.) 
-###### ๐Ÿ”€ `task.yield` +###### ๐Ÿ”€ `waitable-set.drop` -| Synopsis | | -| -------------------------- | ------------------ | -| Approximate WIT signature | `func()` | -| Canonical ABI signature | `[] -> []` | - -The `task.yield` built-in allows the runtime to switch to another task, -enabling a long-running computation to cooperatively interleave execution with -other tasks. - -If the `async` immediate is present, other tasks in the same component instance -can be started (via export call) or resumed while the current task blocks and -thus the core wasm calling `task.yield` must be reentrant. If `async` is not -present, only tasks in *other* component instances may execute, and thus the -calling core wasm will not observe any reentrance. - -(See also [`canon_task_yield`] in the Canonical ABI explainer.) +| Synopsis | | +| -------------------------- | ------------------------ | +| Approximate WIT signature | `func(s: waitable-set)` | +| Canonical ABI signature | `[s:i32] -> []` | + +The `waitable-set.drop` built-in removes the indicated [waitable set] from the +current instance's table of waitable sets, trapping if the waitable set is not +empty or if another task is concurrently `wait`ing on it. (See also +[`canon_waitable_set_drop`] in the Canonical ABI explainer for details.) + +###### ๐Ÿ”€ `waitable.join` + +| Synopsis | | +| -------------------------- | ---------------------------------------------------- | +| Approximate WIT signature | `func(w: waitable, maybe_set: option)` | +| Canonical ABI signature | `[w:i32, maybe_set:i32] -> []` | + +The `waitable.join` built-in may be called given a [waitable] and an optional +[waitable set]. `join` first removes `w` from any waitable set that it is a +member of and then, if `maybe_set` is not `none`, `w` is added to that set. 
+Thus, `join` can be used to arbitrarily add, change and remove waitables from +waitable sets in the same component instance, preserving the invariant that a +waitable can be in at most one set. + +In the Canonical ABI, `w` is an index into the component instance's [waitables] +table and can be any type of waitable (`subtask` or +`{readable,writable}-{stream,future}-end`). A value of `0` represents a `none` +`maybe_set`, since `0` is not a valid table index. (See also +[`canon_waitable_join`] in the Canonical ABI explainer for details.) ###### ๐Ÿ”€ `subtask.drop` @@ -2711,11 +2786,16 @@ For some use-case-focused, worked examples, see: [Adapter Functions]: FutureFeatures.md#custom-abis-via-adapter-functions [Canonical ABI explainer]: CanonicalABI.md +[`canon_context_get`]: CanonicalABI.md#-canon-contextget +[`canon_context_set`]: CanonicalABI.md#-canon-contextset +[`canon_backpressure_set`]: CanonicalABI.md#-canon-backpressureset [`canon_task_return`]: CanonicalABI.md#-canon-taskreturn -[`canon_task_wait`]: CanonicalABI.md#-canon-taskwait -[`canon_task_poll`]: CanonicalABI.md#-canon-taskpoll -[`canon_task_yield`]: CanonicalABI.md#-canon-taskyield -[`canon_task_backpressure`]: CanonicalABI.md#-canon-taskbackpressure +[`canon_yield`]: CanonicalABI.md#-canon-yield +[`canon_waitable_set_new`]: CanonicalABI.md#-canon-waitable-setnew +[`canon_waitable_set_wait`]: CanonicalABI.md#-canon-waitable-setwait +[`canon_waitable_set_poll`]: CanonicalABI.md#-canon-waitable-setpoll +[`canon_waitable_set_drop`]: CanonicalABI.md#-canon-waitable-setdrop +[`canon_waitable_join`]: CanonicalABI.md#-canon-waitablejoin [`canon_stream_new`]: CanonicalABI.md#-canon-streamfuturenew [`canon_stream_read`]: CanonicalABI.md#-canon-streamfuturereadwrite [`canon_future_read`]: CanonicalABI.md#-canon-streamfuturereadwrite @@ -2737,12 +2817,14 @@ For some use-case-focused, worked examples, see: [Task]: Async.md#task [Current Task]: Async.md#current-task +[Context-Local Storage]: 
Async.md#context-local-storage [Subtask]: Async.md#subtask [Stream or Future]: Async.md#streams-and-futures [Readable or Writable End]: Async.md#streams-and-futures [Writable End]: Async.md#streams-and-futures [Waiting]: Async.md#waiting [Waitables]: Async.md#waiting +[Waitable Set]: Async.md#waiting [Backpressure]: Async.md#backpressure [Returning]: Async.md#returning diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index bad50607..ad901c8b 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -206,6 +206,7 @@ class CanonicalOptions: class ComponentInstance: resources: Table[ResourceHandle] waitables: Table[Waitable] + waitable_sets: Table[WaitableSet] error_contexts: Table[ErrorContext] may_leave: bool backpressure: bool @@ -217,6 +218,7 @@ class ComponentInstance: def __init__(self): self.resources = Table[ResourceHandle]() self.waitables = Table[Waitable]() + self.waitable_sets = Table[WaitableSet]() self.error_contexts = Table[ErrorContext]() self.may_leave = True self.backpressure = False @@ -232,7 +234,7 @@ class Table(Generic[ElemT]): array: list[Optional[ElemT]] free: list[int] - MAX_LENGTH = 2**30 - 1 + MAX_LENGTH = 2**28 - 1 def __init__(self): self.array = [None] @@ -343,6 +345,22 @@ def write(self, vs): assert(all(v == () for v in vs)) self.progress += len(vs) +#### Context-Local Storage + +class ContextLocalStorage: + LENGTH = 2 + array: list[int] + + def __init__(self): + self.array = [0] * ContextLocalStorage.LENGTH + + def set(self, i, v): + assert(types_match_values(['i32'], [v])) + self.array[i] = v + + def get(self, i): + return self.array[i] + #### Task State class Task: @@ -352,9 +370,9 @@ class Task: caller: Optional[Task] on_return: Optional[Callable] on_block: Callable[[Awaitable], Awaitable] - waitable_set: WaitableSet num_subtasks: int num_borrows: int + context: ContextLocalStorage def __init__(self, opts, inst, ft, caller, on_return, on_block): 
self.opts = opts @@ -363,9 +381,9 @@ def __init__(self, opts, inst, ft, caller, on_return, on_block): self.caller = caller self.on_return = on_return self.on_block = on_block - self.waitable_set = WaitableSet() self.num_subtasks = 0 self.num_borrows = 0 + self.context = ContextLocalStorage() current = asyncio.Lock() @@ -413,13 +431,6 @@ def maybe_start_pending_task(self): async def yield_(self, sync): await self.wait_on(asyncio.sleep(0), sync) - async def wait(self, sync) -> EventTuple: - return await self.wait_on(self.waitable_set.wait(), sync) - - async def poll(self, sync) -> Optional[EventTuple]: - await self.yield_(sync) - return self.waitable_set.poll() - async_waiting_tasks = asyncio.Condition(current) async def wait_on(self, awaitable, sync): @@ -480,7 +491,6 @@ def return_(self, flat_results): def exit(self): assert(Task.current.locked()) trap_if(self.num_subtasks > 0) - self.waitable_set.drop() trap_if(self.on_return) assert(self.num_borrows == 0) if self.opts.sync: @@ -491,19 +501,19 @@ def exit(self): #### Waitable State class CallState(IntEnum): - STARTING = 0 - STARTED = 1 - RETURNED = 2 + STARTING = 1 + STARTED = 2 + RETURNED = 3 class EventCode(IntEnum): + NONE = 0 CALL_STARTING = CallState.STARTING CALL_STARTED = CallState.STARTED CALL_RETURNED = CallState.RETURNED - YIELDED = 3 - STREAM_READ = 4 - STREAM_WRITE = 5 - FUTURE_READ = 6 - FUTURE_WRITE = 7 + STREAM_READ = 5 + STREAM_WRITE = 6 + FUTURE_READ = 7 + FUTURE_WRITE = 8 EventTuple = tuple[EventCode, int, int] @@ -553,15 +563,19 @@ def drop(self): class WaitableSet: elems: list[Waitable] maybe_has_pending_event: asyncio.Event + num_waiting: int def __init__(self): self.elems = [] self.maybe_has_pending_event = asyncio.Event() + self.num_waiting = 0 async def wait(self) -> EventTuple: + self.num_waiting += 1 while True: await self.maybe_has_pending_event.wait() if (e := self.poll()): + self.num_waiting -= 1 return e def poll(self) -> Optional[EventTuple]: @@ -576,6 +590,7 @@ def poll(self) -> 
Optional[EventTuple]: def drop(self): trap_if(len(self.elems) > 0) + trap_if(self.num_waiting > 0) #### Subtask State @@ -596,7 +611,6 @@ def add_to_waitables(self, task): self.supertask = task self.supertask.num_subtasks += 1 Waitable.__init__(self) - Waitable.join(self, task.waitable_set) return task.inst.waitables.add(self) def add_lender(self, lending_handle): @@ -1723,23 +1737,55 @@ async def canon_lift(opts, inst, ft, callee, caller, on_start, on_return, on_blo task.return_(flat_results) if opts.post_return is not None: [] = await call_and_trap_on_throw(opts.post_return, task, flat_results) + task.exit() + return else: if not opts.callback: [] = await call_and_trap_on_throw(callee, task, flat_args) assert(types_match_values(flat_ft.results, [])) + task.exit() + return else: - [packed_ctx] = await call_and_trap_on_throw(callee, task, flat_args) - assert(types_match_values(flat_ft.results, [packed_ctx])) - while packed_ctx != 0: - is_yield = bool(packed_ctx & 1) - ctx = packed_ctx & ~1 - if is_yield: - await task.yield_(sync = False) - event, p1, p2 = (EventCode.YIELDED, 0, 0) + [packed] = await call_and_trap_on_throw(callee, task, flat_args) + s = None + while True: + code,si = unpack_callback_result(packed) + if si != 0: + s = task.inst.waitable_sets.get(si) + match code: + case CallbackCode.EXIT: + task.exit() + return + case CallbackCode.YIELD: + await task.yield_(opts.sync) + e = None + case CallbackCode.WAIT: + trap_if(not s) + e = await task.wait_on(s.wait(), sync = False) + case CallbackCode.POLL: + trap_if(not s) + await task.yield_(opts.sync) + e = s.poll() + if e: + event, p1, p2 = e else: - event, p1, p2 = await task.wait(sync = False) - [packed_ctx] = await call_and_trap_on_throw(opts.callback, task, [ctx, event, p1, p2]) - task.exit() + event, p1, p2 = (EventCode.NONE, 0, 0) + [packed] = await call_and_trap_on_throw(opts.callback, task, [event, p1, p2]) + +class CallbackCode(IntEnum): + EXIT = 0 + YIELD = 1 + WAIT = 2 + POLL = 3 + MAX = 3 + +def 
unpack_callback_result(packed): + code = packed & 0xf + trap_if(code > CallbackCode.MAX) + assert(packed < 2**32) + assert(Table.MAX_LENGTH < 2**28) + waitable_set_index = packed >> 4 + return (CallbackCode(code), waitable_set_index) async def call_and_trap_on_throw(callee, task, args): try: @@ -1794,9 +1840,9 @@ def subtask_event(): subtask.finish() return (EventCode(subtask.state), subtaski, 0) subtask.set_event(subtask_event) - assert(0 < subtaski <= Table.MAX_LENGTH < 2**30) - assert(0 <= int(subtask.state) < 2**2) - flat_results = [subtaski | (int(subtask.state) << 30)] + assert(0 < subtaski <= Table.MAX_LENGTH < 2**28) + assert(0 <= int(subtask.state) < 2**4) + flat_results = [int(subtask.state) | (subtaski << 4)] return flat_results @@ -1842,9 +1888,24 @@ async def canon_resource_rep(rt, task, i): trap_if(h.rt is not rt) return [h.rep] -### ๐Ÿ”€ `canon task.backpressure` +### ๐Ÿ”€ `canon context.get` + +async def canon_context_get(t, i, task): + assert(t == 'i32') + assert(i < ContextLocalStorage.LENGTH) + return [task.context.get(i)] + +### ๐Ÿ”€ `canon context.set` -async def canon_task_backpressure(task, flat_args): +async def canon_context_set(t, i, task, v): + assert(t == 'i32') + assert(i < ContextLocalStorage.LENGTH) + task.context.set(i, v) + return [] + +### ๐Ÿ”€ `canon backpressure.set` + +async def canon_backpressure_set(task, flat_args): trap_if(task.opts.sync) task.inst.backpressure = bool(flat_args[0]) return [] @@ -1859,35 +1920,64 @@ async def canon_task_return(task, result_type, opts, flat_args): task.return_(flat_args) return [] -### ๐Ÿ”€ `canon task.wait` +### ๐Ÿ”€ `canon yield` + +async def canon_yield(sync, task): + trap_if(not task.inst.may_leave) + trap_if(task.opts.callback and not sync) + await task.yield_(sync) + return [] + +### ๐Ÿ”€ `canon waitable-set.new` -async def canon_task_wait(sync, mem, task, ptr): +async def canon_waitable_set_new(task): + trap_if(not task.inst.may_leave) + return [ 
task.inst.waitable_sets.add(WaitableSet()) ] + +### ๐Ÿ”€ `canon waitable-set.wait` + +async def canon_waitable_set_wait(sync, mem, task, si, ptr): trap_if(not task.inst.may_leave) trap_if(task.opts.callback and not sync) - event, p1, p2 = await task.wait(sync) + s = task.inst.waitable_sets.get(si) + e = await task.wait_on(s.wait(), sync) + return unpack_event(mem, task, ptr, e) + +def unpack_event(mem, task, ptr, e: EventTuple): + event, p1, p2 = e cx = LiftLowerContext(CanonicalOptions(memory = mem), task.inst) store(cx, p1, U32Type(), ptr) store(cx, p2, U32Type(), ptr + 4) return [event] -### ๐Ÿ”€ `canon task.poll` +### ๐Ÿ”€ `canon waitable-set.poll` -async def canon_task_poll(sync, mem, task, ptr): +async def canon_waitable_set_poll(sync, mem, task, si, ptr): trap_if(not task.inst.may_leave) trap_if(task.opts.callback and not sync) - ret = await task.poll(sync) - if ret is None: - return [0] - cx = LiftLowerContext(CanonicalOptions(memory = mem), task.inst) - store(cx, ret, TupleType([U32Type(), U32Type(), U32Type()]), ptr) - return [1] + s = task.inst.waitable_sets.get(si) + await task.yield_(sync) + if (e := s.poll()): + return unpack_event(mem, task, ptr, e) + return [EventCode.NONE] -### ๐Ÿ”€ `canon task.yield` +### ๐Ÿ”€ `canon waitable-set.drop` -async def canon_task_yield(sync, task): +async def canon_waitable_set_drop(task, i): trap_if(not task.inst.may_leave) - trap_if(task.opts.callback and not sync) - await task.yield_(sync) + s = task.inst.waitable_sets.remove(i) + s.drop() + return [] + +### ๐Ÿ”€ `canon waitable.join` + +async def canon_waitable_join(task, wi, si): + trap_if(not task.inst.may_leave) + w = task.inst.waitables.get(wi) + if si == 0: + w.join(None) + else: + w.join(task.inst.waitable_sets.get(si)) return [] ### ๐Ÿ”€ `canon subtask.drop` @@ -1954,7 +2044,6 @@ def on_partial_copy(revoke_buffer): def copy_event(revoke_buffer): revoke_buffer() e.copying = False - e.join(None) return (event_code, i, pack_copy_result(task, buffer, e)) def 
on_partial_copy(revoke_buffer): e.set_event(partial(copy_event, revoke_buffer)) @@ -1962,7 +2051,6 @@ def on_copy_done(): e.set_event(partial(copy_event, revoke_buffer = lambda:())) if e.copy(buffer, on_partial_copy, on_copy_done) != 'done': e.copying = True - e.join(task.waitable_set) return [BLOCKED] return [pack_copy_result(task, buffer, e)] diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index c0d1c84d..8af438d6 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -61,7 +61,7 @@ def mk_tup_rec(x): return { str(i):mk_tup_rec(v) for i,v in enumerate(a) } def unpack_lower_result(ret): - return (ret & ~(3 << 30), ret >> 30) + return (ret & 0xf, ret >> 4) def fail(msg): raise BaseException(msg) @@ -535,9 +535,9 @@ async def core_eager_producer(task, args): fut1 = asyncio.Future() async def core_toggle(task, args): assert(len(args) == 0) - [] = await canon_task_backpressure(task, [1]) + [] = await canon_backpressure_set(task, [1]) await task.on_block(fut1) - [] = await canon_task_backpressure(task, [0]) + [] = await canon_backpressure_set(task, [0]) [] = await canon_task_return(task, [], producer_opts, []) return [] toggle_callee = partial(canon_lift, producer_opts, producer_inst, toggle_ft, core_toggle) @@ -554,40 +554,50 @@ async def core_blocking_producer(task, args): return [] blocking_callee = partial(canon_lift, producer_opts, producer_inst, blocking_ft, core_blocking_producer) - consumer_heap = Heap(10) + consumer_heap = Heap(20) consumer_opts = mk_opts(consumer_heap.memory) consumer_opts.sync = False async def consumer(task, args): [b] = args + [seti] = await canon_waitable_set_new(task) ptr = consumer_heap.realloc(0, 0, 1, 1) [ret] = await canon_lower(consumer_opts, eager_ft, eager_callee, task, [ptr]) assert(ret == 0) u8 = consumer_heap.memory[ptr] assert(u8 == 43) [ret] = await canon_lower(consumer_opts, toggle_ft, toggle_callee, task, []) - subi,state = 
unpack_lower_result(ret) + state,subi1 = unpack_lower_result(ret) + assert(subi1 == 1) assert(state == CallState.STARTED) + [] = await canon_waitable_join(task, subi1, seti) retp = ptr consumer_heap.memory[retp] = 13 [ret] = await canon_lower(consumer_opts, blocking_ft, blocking_callee, task, [83, retp]) - assert(ret == (2 | (CallState.STARTING << 30))) + state,subi2 = unpack_lower_result(ret) + assert(subi2 == 2) + assert(state == CallState.STARTING) assert(consumer_heap.memory[retp] == 13) + [] = await canon_waitable_join(task, subi2, seti) fut1.set_result(None) - event, callidx, _ = await task.wait(sync = False) + + waitretp = consumer_heap.realloc(0, 0, 8, 4) + [event] = await canon_waitable_set_wait(False, consumer_heap.memory, task, seti, waitretp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 1) - [] = await canon_subtask_drop(task, callidx) - event, callidx, _ = await task.wait(sync = True) + assert(consumer_heap.memory[waitretp] == subi1) + [] = await canon_subtask_drop(task, subi1) + + [event] = await canon_waitable_set_wait(True, consumer_heap.memory, task, seti, waitretp) assert(event == EventCode.CALL_STARTED) - assert(callidx == 2) + assert(consumer_heap.memory[waitretp] == subi2) assert(consumer_heap.memory[retp] == 13) fut2.set_result(None) - event, callidx, _ = await task.wait(sync = False) + + [event] = await canon_waitable_set_wait(False, consumer_heap.memory, task, seti, waitretp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 2) + assert(consumer_heap.memory[waitretp] == subi2) assert(consumer_heap.memory[retp] == 44) - [] = await canon_subtask_drop(task, callidx) + [] = await canon_subtask_drop(task, subi2) fut3.set_result(None) assert(await task.on_block(fut4) == "done") @@ -605,13 +615,19 @@ async def dtor(task, args): assert(i == 1) assert(dtor_value is None) [ret] = await canon_resource_drop(rt, False, task, 1) - assert(ret == (2 | (CallState.STARTED << 30))) + state,dtorsubi = unpack_lower_result(ret) + 
assert(dtorsubi == 2) + assert(state == CallState.STARTED) assert(dtor_value is None) dtor_fut.set_result(None) - event, callidx, _ = await task.wait(sync = False) + + [] = await canon_waitable_join(task, dtorsubi, seti) + [event] = await canon_waitable_set_wait(False, consumer_heap.memory, task, seti, waitretp) assert(event == CallState.RETURNED) - assert(callidx == 2) - [] = await canon_subtask_drop(task, callidx) + assert(consumer_heap.memory[waitretp] == dtorsubi) + assert(dtor_value == 50) + [] = await canon_subtask_drop(task, dtorsubi) + [] = await canon_waitable_set_drop(task, seti) [] = await canon_task_return(task, [U8Type()], consumer_opts, [42]) return [] @@ -655,36 +671,52 @@ async def consumer(task, args): assert(len(args) == 0) [ret] = await canon_lower(opts, producer_ft, producer1, task, []) - assert(ret == (1 | (CallState.STARTED << 30))) + state,subi1 = unpack_lower_result(ret) + assert(subi1 == 1) + assert(state == CallState.STARTED) [ret] = await canon_lower(opts, producer_ft, producer2, task, []) - assert(ret == (2 | (CallState.STARTED << 30))) + state,subi2 = unpack_lower_result(ret) + assert(subi2 == 2) + assert(state == CallState.STARTED) + + [seti] = await canon_waitable_set_new(task) + assert(seti == 1) + [] = await canon_waitable_join(task, subi1, seti) + [] = await canon_waitable_join(task, subi2, seti) fut1.set_result(None) - return [42] + [] = await canon_context_set('i32', 0, task, 42) + return [definitions.CallbackCode.WAIT|(seti << 4)] async def callback(task, args): - assert(len(args) == 4) - if args[0] == 42: - assert(args[1] == EventCode.CALL_RETURNED) - assert(args[2] == 1) - assert(args[3] == 0) - await canon_subtask_drop(task, 1) - return [53] - elif args[0] == 52: - assert(args[1] == EventCode.YIELDED) - assert(args[2] == 0) - assert(args[3] == 0) - fut2.set_result(None) - return [62] - else: - assert(args[0] == 62) - assert(args[1] == EventCode.CALL_RETURNED) - assert(args[2] == 2) - assert(args[3] == 0) - await 
canon_subtask_drop(task, 2) - [] = await canon_task_return(task, [U32Type()], opts, [83]) - return [0] + assert(len(args) == 3) + seti = 1 + [ctx] = await canon_context_get('i32', 0, task) + match ctx: + case 42: + assert(args[0] == EventCode.CALL_RETURNED) + assert(args[1] == 1) + assert(args[2] == 0) + await canon_subtask_drop(task, 1) + [] = await canon_context_set('i32', 0, task, 52) + return [definitions.CallbackCode.YIELD] + case 52: + assert(args[0] == EventCode.NONE) + assert(args[1] == 0) + assert(args[2] == 0) + fut2.set_result(None) + [] = await canon_context_set('i32', 0, task, 62) + return [definitions.CallbackCode.WAIT] + case 62: + assert(args[0] == EventCode.CALL_RETURNED) + assert(args[1] == 2) + assert(args[2] == 0) + await canon_subtask_drop(task, 2) + [] = await canon_task_return(task, [U32Type()], opts, [83]) + return [definitions.CallbackCode.EXIT] + case _: + assert(False) consumer_inst = ComponentInstance() def on_start(): return [] @@ -727,7 +759,8 @@ async def producer2_core(task, args): producer1 = partial(canon_lift, producer_opts, producer_inst, producer_ft, producer1_core) producer2 = partial(canon_lift, producer_opts, producer_inst, producer_ft, producer2_core) - consumer_opts = mk_opts() + consumer_heap = Heap(20) + consumer_opts = mk_opts(consumer_heap.memory) consumer_opts.sync = False consumer_ft = FuncType([],[U8Type()]) @@ -735,31 +768,40 @@ async def consumer(task, args): assert(len(args) == 0) [ret] = await canon_lower(consumer_opts, producer_ft, producer1, task, []) - assert(ret == (1 | (CallState.STARTED << 30))) + state,subi1 = unpack_lower_result(ret) + assert(subi1 == 1) + assert(state == CallState.STARTED) [ret] = await canon_lower(consumer_opts, producer_ft, producer2, task, []) - assert(ret == (2 | (CallState.STARTING << 30))) + state,subi2 = unpack_lower_result(ret) + assert(subi2 == 2) + assert(state == CallState.STARTING) - assert(await task.poll(sync = False) is None) + [seti] = await canon_waitable_set_new(task) + 
[] = await canon_waitable_join(task, subi1, seti) + [] = await canon_waitable_join(task, subi2, seti) fut.set_result(None) assert(producer1_done == False) - event, callidx, _ = await task.wait(sync = False) + + retp = consumer_heap.realloc(0,0,8,4) + [event] = await canon_waitable_set_wait(False, consumer_heap.memory, task, seti, retp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 1) - await canon_subtask_drop(task, callidx) + assert(consumer_heap.memory[retp] == subi1) + await canon_subtask_drop(task, subi1) assert(producer1_done == True) assert(producer2_done == False) - await canon_task_yield(False, task) + await canon_yield(False, task) assert(producer2_done == True) - event, callidx, _ = await task.poll(sync = False) + + [event] = await canon_waitable_set_poll(False, consumer_heap.memory, task, seti, retp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 2) - await canon_subtask_drop(task, callidx) + assert(consumer_heap.memory[retp] == subi2) + await canon_subtask_drop(task, subi2) assert(producer2_done == True) - assert(await task.poll(sync = True) is None) + [] = await canon_waitable_set_drop(task, seti) await canon_task_return(task, [U8Type()], consumer_opts, [83]) return [] @@ -786,9 +828,9 @@ async def test_async_backpressure(): producer1_done = False async def producer1_core(task, args): nonlocal producer1_done - await canon_task_backpressure(task, [1]) + await canon_backpressure_set(task, [1]) await task.on_block(fut) - await canon_task_backpressure(task, [0]) + await canon_backpressure_set(task, [0]) await canon_task_return(task, [], producer_opts, []) producer1_done = True return [] @@ -804,37 +846,46 @@ async def producer2_core(task, args): producer1 = partial(canon_lift, producer_opts, producer_inst, producer_ft, producer1_core) producer2 = partial(canon_lift, producer_opts, producer_inst, producer_ft, producer2_core) - consumer_opts = CanonicalOptions() - consumer_opts.sync = False + consumer_heap = Heap(20) + 
consumer_opts = mk_opts(consumer_heap.memory, sync = False) consumer_ft = FuncType([],[U8Type()]) async def consumer(task, args): assert(len(args) == 0) [ret] = await canon_lower(consumer_opts, producer_ft, producer1, task, []) - assert(ret == (1 | (CallState.STARTED << 30))) + state,subi1 = unpack_lower_result(ret) + assert(subi1 == 1) + assert(state == CallState.STARTED) [ret] = await canon_lower(consumer_opts, producer_ft, producer2, task, []) - assert(ret == (2 | (CallState.STARTING << 30))) + state,subi2 = unpack_lower_result(ret) + assert(subi2 == 2) + assert(state == CallState.STARTING) - assert(await task.poll(sync = False) is None) + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, subi1, seti) + [] = await canon_waitable_join(task, subi2, seti) fut.set_result(None) assert(producer1_done == False) assert(producer2_done == False) - event, callidx, _ = await task.wait(sync = False) + + retp = consumer_heap.realloc(0,0,8,4) + [event] = await canon_waitable_set_wait(False, consumer_heap.memory, task, seti, retp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 1) + assert(consumer_heap.memory[retp] == subi1) assert(producer1_done == True) - event, callidx, _ = await task.poll(sync = False) + + [event] = await canon_waitable_set_poll(False, consumer_heap.memory, task, seti, retp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 2) + assert(consumer_heap.memory[retp] == subi2) assert(producer2_done == True) - await canon_subtask_drop(task, 1) - await canon_subtask_drop(task, 2) + await canon_subtask_drop(task, subi1) + await canon_subtask_drop(task, subi2) - assert(await task.poll(sync = False) is None) + [] = await canon_waitable_set_drop(task, seti) await canon_task_return(task, [U8Type()], consumer_opts, [84]) return [] @@ -868,26 +919,40 @@ async def core_hostcall_pre(fut, task, args): core_hostcall2 = partial(core_hostcall_pre, fut2) hostcall2 = partial(canon_lift, hostcall_opts, hostcall_inst, ft, 
core_hostcall2) - lower_opts = mk_opts() + lower_heap = Heap(20) + lower_opts = mk_opts(lower_heap.memory) lower_opts.sync = False async def core_func(task, args): [ret] = await canon_lower(lower_opts, ft, hostcall1, task, []) - assert(ret == (1 | (CallState.STARTED << 30))) + state,subi1 = unpack_lower_result(ret) + assert(subi1 == 1) + assert(state == CallState.STARTED) [ret] = await canon_lower(lower_opts, ft, hostcall2, task, []) - assert(ret == (2 | (CallState.STARTED << 30))) + state,subi2 = unpack_lower_result(ret) + assert(subi2 == 2) + assert(state == CallState.STARTED) + + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, subi1, seti) + [] = await canon_waitable_join(task, subi2, seti) fut1.set_result(None) - event, callidx, _ = await task.wait(sync = False) + + retp = lower_heap.realloc(0,0,8,4) + [event] = await canon_waitable_set_wait(False, lower_heap.memory, task, seti, retp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 1) + assert(lower_heap.memory[retp] == subi1) + fut2.set_result(None) - event, callidx, _ = await task.wait(sync = False) + + [event] = await canon_waitable_set_wait(False, lower_heap.memory, task, seti, retp) assert(event == EventCode.CALL_RETURNED) - assert(callidx == 2) + assert(lower_heap.memory[retp] == subi2) - await canon_subtask_drop(task, 1) - await canon_subtask_drop(task, 2) + await canon_subtask_drop(task, subi1) + await canon_subtask_drop(task, subi2) + await canon_waitable_set_drop(task, seti) return [] @@ -1118,7 +1183,7 @@ async def core_func(task, args): async def test_async_stream_ops(): ft = FuncType([StreamType(U8Type())], [StreamType(U8Type())]) inst = ComponentInstance() - mem = bytearray(20) + mem = bytearray(24) opts = mk_opts(memory=mem, sync=False) sync_opts = mk_opts(memory=mem, sync=True) @@ -1159,13 +1224,15 @@ async def core_func(task, args): [ret] = await canon_stream_read(U8Type(), opts, task, rsi1, 0, 4) assert(ret == definitions.BLOCKED) 
src_stream.write([1,2,3,4]) - event, p1, p2 = await task.wait(sync = False) + retp = 16 + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, rsi1, seti) + [event] = await canon_waitable_set_wait(False, mem, task, rsi1, retp) assert(event == EventCode.STREAM_READ) - assert(p1 == rsi1) - assert(p2 == 4) + assert(mem[retp+0] == rsi1) + assert(mem[retp+4] == 4) assert(mem[0:4] == b'\x01\x02\x03\x04') [wsi2] = await canon_stream_new(U8Type(), task) - retp = 16 [ret] = await canon_lower(opts, ft, host_import, task, [wsi2, retp]) assert(ret == 0) rsi2 = mem[16] @@ -1173,19 +1240,21 @@ async def core_func(task, args): [ret] = await canon_stream_write(U8Type(), opts, task, wsi2, 0, 4) assert(ret == definitions.BLOCKED) host_import_incoming.set_remain(100) - event, p1, p2 = await task.wait(sync = False) + [] = await canon_waitable_join(task, wsi2, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_WRITE) - assert(p1 == wsi2) - assert(p2 == 4) + assert(mem[retp+0] == wsi2) + assert(mem[retp+4] == 4) [ret] = await canon_stream_read(U8Type(), sync_opts, task, rsi2, 0, 4) assert(ret == 4) [ret] = await canon_stream_write(U8Type(), opts, task, wsi1, 0, 4) assert(ret == definitions.BLOCKED) dst_stream.set_remain(100) - event, p1, p2 = await task.wait(sync = False) + [] = await canon_waitable_join(task, wsi1, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_WRITE) - assert(p1 == wsi1) - assert(p2 == 4) + assert(mem[retp+0] == wsi1) + assert(mem[retp+4] == 4) src_stream.write([5,6,7,8]) src_stream.destroy_once_empty() [ret] = await canon_stream_read(U8Type(), opts, task, rsi1, 0, 4) @@ -1199,16 +1268,18 @@ async def core_func(task, args): [] = await canon_stream_close_writable(U8Type(), task, wsi2, 0) [ret] = await canon_stream_read(U8Type(), opts, task, rsi2, 0, 4) assert(ret == definitions.BLOCKED) - event, p1, p2 = await 
task.wait(sync = False) + [] = await canon_waitable_join(task, rsi2, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_READ) - assert(p1 == rsi2) - assert(p2 == 4) + assert(mem[retp+0] == rsi2) + assert(mem[retp+4] == 4) [ret] = await canon_stream_read(U8Type(), opts, task, rsi2, 0, 4) assert(ret == definitions.CLOSED) [] = await canon_stream_close_readable(U8Type(), task, rsi2) [ret] = await canon_stream_write(U8Type(), sync_opts, task, wsi1, 0, 4) assert(ret == 4) [] = await canon_stream_close_writable(U8Type(), task, wsi1, 0) + [] = await canon_waitable_set_drop(task, seti) return [] await canon_lift(opts, inst, ft, core_func, None, on_start, on_return, Task.sync_on_block) @@ -1311,10 +1382,13 @@ async def core_func(task, args): [ret] = await canon_stream_read(U8Type(), opts, task, rsi, 0, 4) assert(ret == definitions.BLOCKED) src.write([5,6]) - event, p1, p2 = await task.wait(sync = False) + + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, rsi, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_READ) - assert(p1 == rsi) - assert(p2 == 2) + assert(mem[retp+0] == rsi) + assert(mem[retp+4] == 2) [] = await canon_stream_close_readable(U8Type(), task, rsi) [wsi] = await canon_stream_new(U8Type(), task) @@ -1327,12 +1401,14 @@ async def core_func(task, args): [ret] = await canon_stream_write(U8Type(), opts, task, wsi, 2, 6) assert(ret == definitions.BLOCKED) dst.set_remain(4) - event, p1, p2 = await task.wait(sync = False) + [] = await canon_waitable_join(task, wsi, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_WRITE) - assert(p1 == wsi) - assert(p2 == 4) + assert(mem[retp+0] == wsi) + assert(mem[retp+4] == 4) assert(dst.received == [1,2,3,4,5,6]) [] = await canon_stream_close_writable(U8Type(), task, wsi, 0) + [] = await 
canon_waitable_set_drop(task, seti) dst.set_remain(100) assert(await dst.consume(100) is None) return [] @@ -1349,7 +1425,7 @@ async def test_wasm_to_wasm_stream(): fut1, fut2, fut3, fut4 = asyncio.Future(), asyncio.Future(), asyncio.Future(), asyncio.Future() inst1 = ComponentInstance() - mem1 = bytearray(10) + mem1 = bytearray(24) opts1 = mk_opts(memory=mem1, sync=False) ft1 = FuncType([], [StreamType(U8Type())]) async def core_func1(task, args): @@ -1373,22 +1449,26 @@ async def core_func1(task, args): fut3.set_result(None) - event, p1, p2 = await task.wait(sync = False) + retp = 16 + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, wsi, seti) + [event] = await canon_waitable_set_wait(False, mem1, task, seti, retp) assert(event == EventCode.STREAM_WRITE) - assert(p1 == wsi) - assert(p2 == 4) + assert(mem1[retp+0] == wsi) + assert(mem1[retp+4] == 4) fut4.set_result(None) [errctxi] = await canon_error_context_new(opts1, task, 0, 0) [] = await canon_stream_close_writable(U8Type(), task, wsi, errctxi) + [] = await canon_waitable_set_drop(task, seti) [] = await canon_error_context_drop(task, errctxi) return [] func1 = partial(canon_lift, opts1, inst1, ft1, core_func1) inst2 = ComponentInstance() - heap2 = Heap(10) + heap2 = Heap(24) mem2 = heap2.memory opts2 = mk_opts(memory=heap2.memory, realloc=heap2.realloc, sync=False) ft2 = FuncType([], []) @@ -1396,10 +1476,10 @@ async def core_func2(task, args): assert(not args) [] = await canon_task_return(task, [], opts2, []) - retp = 0 + retp = 16 [ret] = await canon_lower(opts2, ft1, func1, task, [retp]) assert(ret == 0) - rsi = mem2[0] + rsi = mem2[retp] assert(rsi == 1) [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 8) @@ -1407,10 +1487,12 @@ async def core_func2(task, args): fut1.set_result(None) - event, p1, p2 = await task.wait(sync = False) + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, rsi, seti) + [event] = await 
canon_waitable_set_wait(False, mem2, task, seti, retp) assert(event == EventCode.STREAM_READ) - assert(p1 == rsi) - assert(p2 == 4) + assert(mem2[retp+0] == rsi) + assert(mem2[retp+4] == 4) assert(mem2[0:8] == b'\x01\x02\x03\x04\x00\x00\x00\x00') fut2.set_result(None) @@ -1430,6 +1512,7 @@ async def core_func2(task, args): errctxi = 1 assert(ret == (definitions.CLOSED | errctxi)) [] = await canon_stream_close_readable(U8Type(), task, rsi) + [] = await canon_waitable_set_drop(task, seti) [] = await canon_error_context_debug_message(opts2, task, errctxi, 0) [] = await canon_error_context_drop(task, errctxi) return [] @@ -1441,7 +1524,8 @@ async def test_wasm_to_wasm_stream_empty(): fut1, fut2, fut3, fut4 = asyncio.Future(), asyncio.Future(), asyncio.Future(), asyncio.Future() inst1 = ComponentInstance() - opts1 = mk_opts(memory=None, sync=False) + mem1 = bytearray(24) + opts1 = mk_opts(memory=mem1, sync=False) ft1 = FuncType([], [StreamType(None)]) async def core_func1(task, args): assert(not args) @@ -1462,10 +1546,13 @@ async def core_func1(task, args): fut3.set_result(None) - event, p1, p2 = await task.wait(sync = False) + retp = 16 + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, wsi, seti) + [event] = await canon_waitable_set_wait(False, mem1, task, seti, retp) assert(event == EventCode.STREAM_WRITE) - assert(p1 == wsi) - assert(p2 == 4) + assert(mem1[retp+0] == wsi) + assert(mem1[retp+4] == 4) fut4.set_result(None) @@ -1496,10 +1583,12 @@ async def core_func2(task, args): fut1.set_result(None) - event, p1, p2 = await task.wait(sync = False) + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, rsi, seti) + [event] = await canon_waitable_set_wait(False, mem2, task, seti, retp) assert(event == EventCode.STREAM_READ) - assert(p1 == rsi) - assert(p2 == 4) + assert(mem2[retp+0] == rsi) + assert(mem2[retp+4] == 4) fut2.set_result(None) await task.on_block(fut3) @@ -1524,7 +1613,7 @@ async def 
core_func2(task, args): async def test_cancel_copy(): inst = ComponentInstance() - mem = bytearray(10) + mem = bytearray(24) lower_opts = mk_opts(memory=mem, sync=False) host_ft1 = FuncType([StreamType(U8Type())],[]) @@ -1577,7 +1666,7 @@ async def core_func(task, args): host_sink.set_remain(100) assert(await host_sink.consume(100) is None) - retp = 0 + retp = 16 [ret] = await canon_lower(lower_opts, host_ft2, host_func2, task, [retp]) assert(ret == 0) rsi = mem[retp] @@ -1587,7 +1676,6 @@ async def core_func(task, args): assert(ret == 0) [] = await canon_stream_close_readable(U8Type(), task, rsi) - retp = 0 [ret] = await canon_lower(lower_opts, host_ft2, host_func2, task, [retp]) assert(ret == 0) rsi = mem[retp] @@ -1598,12 +1686,15 @@ async def core_func(task, args): assert(ret == definitions.BLOCKED) host_source.write([7,8]) await asyncio.sleep(0) - event,p1,p2 = await task.wait(sync = False) + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, rsi, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.STREAM_READ) - assert(p1 == rsi) - assert(p2 == 2) + assert(mem[retp+0] == rsi) + assert(mem[retp+4] == 2) assert(mem[0:2] == b'\x07\x08') [] = await canon_stream_close_readable(U8Type(), task, rsi) + [] = await canon_waitable_set_drop(task, seti) return [] @@ -1674,7 +1765,7 @@ def close(self, errctx = None): async def test_futures(): inst = ComponentInstance() - mem = bytearray(10) + mem = bytearray(24) lower_opts = mk_opts(memory=mem, sync=False) host_ft1 = FuncType([FutureType(U8Type())],[FutureType(U8Type())]) @@ -1692,7 +1783,7 @@ async def host_func(task, on_start, on_return, on_block): async def core_func(task, args): assert(not args) [wfi] = await canon_future_new(U8Type(), task) - retp = 0 + retp = 16 [ret] = await canon_lower(lower_opts, host_ft1, host_func, task, [wfi, retp]) assert(ret == 0) rfi = mem[retp] @@ -1706,17 +1797,19 @@ async def core_func(task, args): [ret] = 
await canon_future_write(U8Type(), lower_opts, task, wfi, writep) assert(ret == 1) - event,p1,p2 = await task.wait(sync = False) + [seti] = await canon_waitable_set_new(task) + [] = await canon_waitable_join(task, rfi, seti) + [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.FUTURE_READ) - assert(p1 == rfi) - assert(p2 == 1) + assert(mem[retp+0] == rfi) + assert(mem[retp+4] == 1) assert(mem[readp] == 43) [] = await canon_future_close_writable(U8Type(), task, wfi, 0) [] = await canon_future_close_readable(U8Type(), task, rfi) + [] = await canon_waitable_set_drop(task, seti) [wfi] = await canon_future_new(U8Type(), task) - retp = 0 [ret] = await canon_lower(lower_opts, host_ft1, host_func, task, [wfi, retp]) assert(ret == 0) rfi = mem[retp]