Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Jan 20, 2025
1 parent 9619c06 commit 9c5bf18
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 95 deletions.
133 changes: 71 additions & 62 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,10 @@ class ComponentInstance:
waitables: Table[Waitable]
waitable_sets: Table[WaitableSet]
error_contexts: Table[ErrorContext]
num_tasks: int
may_leave: bool
backpressure: bool
interruptible: asyncio.Event
calling_sync_export: bool
calling_sync_import: bool
pending_tasks: list[tuple[Task, asyncio.Future]]
starting_pending_task: bool

Expand All @@ -178,11 +178,10 @@ class ComponentInstance:
self.waitables = Table[Waitable]()
self.waitable_sets = Table[WaitableSet]()
self.error_contexts = Table[ErrorContext]()
self.num_tasks = 0
self.may_leave = True
self.backpressure = False
self.interruptible = asyncio.Event()
self.interruptible.set()
self.calling_sync_export = False
self.calling_sync_import = False
self.pending_tasks = []
self.starting_pending_task = False
```
Expand Down Expand Up @@ -506,8 +505,7 @@ point can start running:
assert(self.may_enter(self) and self.inst.starting_pending_task)
self.inst.starting_pending_task = False
if self.opts.sync:
self.inst.interruptible.clear()
self.inst.num_tasks += 1
self.inst.calling_sync_export = True
cx = LiftLowerContext(self.opts, self.inst, self)
return lower_flat_values(cx, MAX_FLAT_PARAMS, on_start(), self.ft.param_types())
```
Expand All @@ -526,11 +524,8 @@ per-`ComponentInstance` `pending_tasks` queue and blocking until released by
disjunct ensures fairness, preventing continual new tasks from starving pending
tasks.

The `interruptible` boolean is kept clear when executing a synchronously-lifted
export to prevent new asynchronous export calls from starting. The `num_tasks`
counter tracks the number of tasks live in a component instances and is used by
`may_enter` to prevent a synchronous task from starting when there are already
active tasks.
The `calling_sync_export` flag set by `enter` (and cleared by `exit`) is used
by `may_enter` to prevent sync-lifted export calls from overlapping.

Once all the guards and bookkeeping has been done, the `enter` method lowers
the given arguments into the callee's memory (possibly executing `realloc`)
Expand Down Expand Up @@ -570,18 +565,23 @@ The `Task.may_enter` method called by `enter` blocks a new task from starting
for three reasons:
* The core wasm code has explicitly indicated that it is overloaded by calling
`task.backpressure` to set the `backpressure` field of `Task`.
* The component instance is not `interruptible` due to the current task
executing synchronous core wasm code or making a synchronous import call from
asynchronous core wasm code.
* The new task wants to execute synchronous core wasm code, but there are
currently asynchronous tasks executing (if a component mixes both kinds
for whatever strange reason).
* The component instance is currently blocked on a synchronous call from core
wasm into a Canonical ABI built-in and is thus not currently in a reentrant
state.
* The current pending call is to a synchronously-lifted export and there is
already a synchronously-lifted export in progress.
```python
def may_enter(self, pending_task):
return not self.inst.backpressure and \
self.inst.interruptible.is_set() and \
not (pending_task.opts.sync and self.inst.num_tasks > 0)
not self.inst.calling_sync_import and \
not (self.inst.calling_sync_export and pending_task.opts.sync)
```
Notably, the above definition of `may_enter` only prevents *synchronously*
lifted tasks from overlapping. *Asynchronously* lifted tasks are allowed to
freely overlap (with each other and synchronously-lifted tasks). This allows
purely-synchronous toolchains to stay simple and ignore asynchrony while
enabling more-advanced hybrid use cases (such as "background tasks"), putting
the burden of not interfering with the synchronous tasks on the toolchain.

The `Task.maybe_start_pending_task` method unblocks pending tasks enqueued by
`enter` above once `may_enter` is true for the pending task. One key property
Expand All @@ -598,13 +598,18 @@ specific points (`wait_on` and `exit`, below) where the core wasm code has had
the opportunity to re-enable backpressure if need be.
```python
def maybe_start_pending_task(self):
if self.inst.pending_tasks and not self.inst.starting_pending_task:
pending_task, pending_future = self.inst.pending_tasks[0]
if self.inst.starting_pending_task:
return
for i,(pending_task,pending_future) in enumerate(self.inst.pending_tasks):
if self.may_enter(pending_task):
self.inst.pending_tasks.pop(0)
self.inst.pending_tasks.pop(i)
self.inst.starting_pending_task = True
pending_future.set_result(None)
return
```
Notably, the loop in `maybe_start_pending_task` allows pending async tasks to
start even when there is a blocked pending sync task ahead of them in the
`pending_tasks` queue.

The `Task.yield_` method is called by `canon yield` or, when a `callback`
is used, when core wasm returns the "yield" code to the event loop. Yielding
Expand All @@ -618,48 +623,55 @@ for any external I/O as emulated in the Python definition by waiting on

The `Task.wait_on` method defines how to block the current task until a given
Python awaitable is resolved. By calling the `on_block` callback, `wait_on`
allows other tasks to execute before the current task resumes. If the `sync`
boolean parameter is true, then only tasks in *other* component instances may
allows other tasks to execute before the current task resumes, but *which*
other tasks depends on the `sync` boolean parameter:

When blocking *synchronously*, only tasks in *other* component instances may
execute; the current component instance must not observe any interleaved
execution. This is achieved by clearing the `interruptible` flag before calling
`on_block`. When blocking asynchronously, other tasks in the *same* component
instance may execute. In this case, it's possible that the other task performed
`wait_on(sync=True)`, which cleared `interruptible`, in which case *this* task
must wait until `interruptible` reenabled. Lastly, if `wait_on` is called from
a synchronously-lifted function, then `interruptible` will *already* be clear,
in which case we simply leave it clear before and after calling `on_block`.
execution. This is achieved by setting `calling_sync_import` during
`on_block`, which is checked by both `may_enter` (to prevent reentrance) as
well as `wait_on` itself in the asynchronous case (to prevent resumption of
already-started async tasks).

When blocking *asynchronously*, other tasks in the *same* component instance
may execute. In this case, it's possible that one of those other tasks
performed a synchronous `wait_on` (which set `calling_sync_import`) in which
case *this* task must wait until it is reenabled. This reenablement is
signalled asynchronously via the `async_waiting_tasks` [`asyncio.Condition`].
```python
async_waiting_tasks = asyncio.Condition(current)

async def wait_on(self, awaitable, sync):
if self.inst.interruptible.is_set():
if sync:
self.inst.interruptible.clear()
v = await self.on_block(awaitable)
self.inst.interruptible.set()
else:
self.maybe_start_pending_task()
v = await self.on_block(awaitable)
while not self.inst.interruptible.is_set():
await self.on_block(self.inst.interruptible.wait())
assert(not self.inst.calling_sync_import)
if sync:
self.inst.calling_sync_import = True
v = await self.on_block(awaitable)
self.inst.calling_sync_import = False
self.async_waiting_tasks.notify_all()
else:
self.maybe_start_pending_task()
v = await self.on_block(awaitable)
while self.inst.calling_sync_import:
Task.current.release()
await self.async_waiting_tasks.wait()
return v
```

The `Task.call_sync` method is used by `canon_lower` to make a synchronous call
to `callee`. Since this is a synchronous call, the current task clears
`interruptible` (just like `wait_on`) so that no other tasks in the current
component instance can execute in the interim. Whether or not we are "blocking"
here depends on whether the *callee* blocks, so we pass the current task's
`on_block` callback (that our caller passed us) to the callee, so that the
callee will effectively call `on_block` (or not) on our behalf.
to `callee` given `*args` and works just like the `sync` case of `wait_on`
above except that `call_sync` avoids unconditionally blocking (by calling
`on_block`). Instead, the caller simply passes its own `on_call` callback to
the callee, so that the caller blocks iff the callee blocks. This means that
N-deep synchronous callstacks avoid the overhead of async calls if none of the
calls in the stack actually block on external I/O.
```python
async def call_sync(self, callee, *args):
if self.inst.interruptible.is_set():
self.inst.interruptible.clear()
await callee(*args, self.on_block)
self.inst.interruptible.set()
else:
await callee(*args, self.on_block)
assert(not self.inst.calling_sync_import)
self.inst.calling_sync_import = True
v = await callee(*args, self.on_block)
self.inst.calling_sync_import = False
self.async_waiting_tasks.notify_all()
return v
```

The `Task.call_async` method is used by `canon_lower` to make an asynchronous
Expand Down Expand Up @@ -749,20 +761,16 @@ intends to exit. This method guards that the various obligations of the callee
implied by the Canonical ABI have in fact been met and also performs final
bookkeeping that matches initial bookkeeping performed by `enter`. Lastly, when
a `Task` exits, it attempts to start another pending task which, in particular,
may have just been unblocked by the preceding clearing of `interruptible` or
decrement of `num_tasks`.
may be a synchronous task unblocked by the clearing of `calling_sync_export`.
```python
def exit(self):
assert(Task.current.locked())
trap_if(self.num_subtasks > 0)
trap_if(self.on_return)
assert(self.num_borrows == 0)
trap_if(self.inst.num_tasks == 1 and self.inst.backpressure)
self.inst.num_tasks -= 1
assert(self.inst.num_tasks >= 0)
if self.opts.sync:
assert(not self.inst.interruptible.is_set())
self.inst.interruptible.set()
assert(self.inst.calling_sync_export)
self.inst.calling_sync_export = False
self.maybe_start_pending_task()
```

Expand Down Expand Up @@ -3855,7 +3863,8 @@ def canon_thread_available_parallelism():
[Shared-Everything Threads]: https://github.com/WebAssembly/shared-everything-threads/blob/main/proposals/shared-everything-threads/Overview.md

[`asyncio`]: https://docs.python.org/3/library/asyncio.html
[`asyncio.Event`]: https://docs.python.org/3/library/asyncio-sync.html#asyncio.Event
[`asyncio.Event`]: https://docs.python.org/3/library/asyncio-sync.html#event
[`asyncio.Condition`]: https://docs.python.org/3/library/asyncio-sync.html#condition

[OIO]: https://en.wikipedia.org/wiki/Overlapped_I/O
[io_uring]: https://en.wikipedia.org/wiki/Io_uring
65 changes: 32 additions & 33 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,10 @@ class ComponentInstance:
waitables: Table[Waitable]
waitable_sets: Table[WaitableSet]
error_contexts: Table[ErrorContext]
num_tasks: int
may_leave: bool
backpressure: bool
interruptible: asyncio.Event
calling_sync_export: bool
calling_sync_import: bool
pending_tasks: list[tuple[Task, asyncio.Future]]
starting_pending_task: bool

Expand All @@ -220,11 +220,10 @@ def __init__(self):
self.waitables = Table[Waitable]()
self.waitable_sets = Table[WaitableSet]()
self.error_contexts = Table[ErrorContext]()
self.num_tasks = 0
self.may_leave = True
self.backpressure = False
self.interruptible = asyncio.Event()
self.interruptible.set()
self.calling_sync_export = False
self.calling_sync_import = False
self.pending_tasks = []
self.starting_pending_task = False

Expand Down Expand Up @@ -404,8 +403,7 @@ async def enter(self, on_start):
assert(self.may_enter(self) and self.inst.starting_pending_task)
self.inst.starting_pending_task = False
if self.opts.sync:
self.inst.interruptible.clear()
self.inst.num_tasks += 1
self.inst.calling_sync_export = True
cx = LiftLowerContext(self.opts, self.inst, self)
return lower_flat_values(cx, MAX_FLAT_PARAMS, on_start(), self.ft.param_types())

Expand All @@ -417,42 +415,46 @@ def trap_if_on_the_stack(self, inst):

def may_enter(self, pending_task):
return not self.inst.backpressure and \
self.inst.interruptible.is_set() and \
not (pending_task.opts.sync and self.inst.num_tasks > 0)
not self.inst.calling_sync_import and \
not (self.inst.calling_sync_export and pending_task.opts.sync)

def maybe_start_pending_task(self):
if self.inst.pending_tasks and not self.inst.starting_pending_task:
pending_task, pending_future = self.inst.pending_tasks[0]
if self.inst.starting_pending_task:
return
for i,(pending_task,pending_future) in enumerate(self.inst.pending_tasks):
if self.may_enter(pending_task):
self.inst.pending_tasks.pop(0)
self.inst.pending_tasks.pop(i)
self.inst.starting_pending_task = True
pending_future.set_result(None)
return

async def yield_(self, sync):
await self.wait_on(asyncio.sleep(0), sync)

async_waiting_tasks = asyncio.Condition(current)

async def wait_on(self, awaitable, sync):
if self.inst.interruptible.is_set():
if sync:
self.inst.interruptible.clear()
v = await self.on_block(awaitable)
self.inst.interruptible.set()
else:
self.maybe_start_pending_task()
v = await self.on_block(awaitable)
while not self.inst.interruptible.is_set():
await self.on_block(self.inst.interruptible.wait())
assert(not self.inst.calling_sync_import)
if sync:
self.inst.calling_sync_import = True
v = await self.on_block(awaitable)
self.inst.calling_sync_import = False
self.async_waiting_tasks.notify_all()
else:
self.maybe_start_pending_task()
v = await self.on_block(awaitable)
while self.inst.calling_sync_import:
Task.current.release()
await self.async_waiting_tasks.wait()
return v

async def call_sync(self, callee, *args):
if self.inst.interruptible.is_set():
self.inst.interruptible.clear()
await callee(*args, self.on_block)
self.inst.interruptible.set()
else:
await callee(*args, self.on_block)
assert(not self.inst.calling_sync_import)
self.inst.calling_sync_import = True
v = await callee(*args, self.on_block)
self.inst.calling_sync_import = False
self.async_waiting_tasks.notify_all()
return v

async def call_async(self, callee, *args):
ret = asyncio.Future()
Expand Down Expand Up @@ -491,12 +493,9 @@ def exit(self):
trap_if(self.num_subtasks > 0)
trap_if(self.on_return)
assert(self.num_borrows == 0)
trap_if(self.inst.num_tasks == 1 and self.inst.backpressure)
self.inst.num_tasks -= 1
assert(self.inst.num_tasks >= 0)
if self.opts.sync:
assert(not self.inst.interruptible.is_set())
self.inst.interruptible.set()
assert(self.inst.calling_sync_export)
self.inst.calling_sync_export = False
self.maybe_start_pending_task()

#### Waitable State
Expand Down

0 comments on commit 9c5bf18

Please sign in to comment.