Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store one fiber stack in a Store<T> #9604

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/fiber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ cfg_if::cfg_if! {
/// Represents an execution stack to use for a fiber.
pub struct FiberStack(imp::FiberStack);

fn _assert_send_sync() {
fn _assert_send<T: Send>() {}
fn _assert_sync<T: Sync>() {}

_assert_send::<FiberStack>();
_assert_sync::<FiberStack>();
}

impl FiberStack {
/// Creates a new fiber stack of the given size.
pub fn new(size: usize) -> io::Result<Self> {
Expand Down
23 changes: 14 additions & 9 deletions crates/fiber/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@ use std::ops::Range;
use std::ptr;

pub struct FiberStack {
base: *mut u8,
base: BasePtr,
len: usize,

/// Stored here to ensure that when this `FiberStack` the backing storage,
/// if any, is additionally dropped.
storage: FiberStackStorage,
}

struct BasePtr(*mut u8);

unsafe impl Send for BasePtr {}
unsafe impl Sync for BasePtr {}

enum FiberStackStorage {
Mmap(#[allow(dead_code)] MmapFiberStack),
Unmanaged(usize),
Expand All @@ -64,7 +69,7 @@ impl FiberStack {
// region so the base and length of our stack are both offset by a
// single page.
Ok(FiberStack {
base: stack.mapping_base.wrapping_byte_add(page_size),
base: BasePtr(stack.mapping_base.wrapping_byte_add(page_size)),
len: stack.mapping_len - page_size,
storage: FiberStackStorage::Mmap(stack),
})
Expand All @@ -77,7 +82,7 @@ impl FiberStack {
return Self::from_custom(asan::new_fiber_stack(len)?);
}
Ok(FiberStack {
base: base.add(guard_size),
base: BasePtr(base.add(guard_size)),
len,
storage: FiberStackStorage::Unmanaged(guard_size),
})
Expand All @@ -101,28 +106,28 @@ impl FiberStack {
"expected fiber stack end ({end_ptr:?}) to be page aligned ({page_size:#x})",
);
Ok(FiberStack {
base: start_ptr,
base: BasePtr(start_ptr),
len: range.len(),
storage: FiberStackStorage::Custom(custom),
})
}

pub fn top(&self) -> Option<*mut u8> {
Some(self.base.wrapping_byte_add(self.len))
Some(self.base.0.wrapping_byte_add(self.len))
}

pub fn range(&self) -> Option<Range<usize>> {
let base = self.base as usize;
let base = self.base.0 as usize;
Some(base..base + self.len)
}

pub fn guard_range(&self) -> Option<Range<*mut u8>> {
match &self.storage {
FiberStackStorage::Unmanaged(guard_size) => unsafe {
let start = self.base.sub(*guard_size);
Some(start..self.base)
let start = self.base.0.sub(*guard_size);
Some(start..self.base.0)
},
FiberStackStorage::Mmap(mmap) => Some(mmap.mapping_base..self.base),
FiberStackStorage::Mmap(mmap) => Some(mmap.mapping_base..self.base.0),
FiberStackStorage::Custom(custom) => Some(custom.guard_range()),
}
}
Expand Down
49 changes: 45 additions & 4 deletions crates/wasmtime/src/runtime/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ pub struct StoreOpaque {
table_limit: usize,
#[cfg(feature = "async")]
async_state: AsyncState,

// If fuel_yield_interval is enabled, then we store the remaining fuel (that isn't in
// runtime_limits) here. The total amount of fuel is the runtime limits and reserve added
// together. Then when we run out of gas, we inject the yield amount from the reserve
Expand Down Expand Up @@ -392,6 +393,8 @@ pub struct StoreOpaque {
struct AsyncState {
current_suspend: UnsafeCell<*mut wasmtime_fiber::Suspend<Result<()>, (), Result<()>>>,
current_poll_cx: UnsafeCell<PollContext>,
/// The last fiber stack that was in use by this store.
last_fiber_stack: Option<wasmtime_fiber::FiberStack>,
}

#[cfg(feature = "async")]
Expand Down Expand Up @@ -556,6 +559,7 @@ impl<T> Store<T> {
async_state: AsyncState {
current_suspend: UnsafeCell::new(ptr::null_mut()),
current_poll_cx: UnsafeCell::new(PollContext::default()),
last_fiber_stack: None,
},
fuel_reserve: 0,
fuel_yield_interval: None,
Expand Down Expand Up @@ -2099,6 +2103,31 @@ at https://bytecodealliance.org/security.
core::ptr::null_mut()..core::ptr::null_mut()
}
}

#[cfg(feature = "async")]
fn allocate_fiber_stack(&mut self) -> Result<wasmtime_fiber::FiberStack> {
if let Some(stack) = self.async_state.last_fiber_stack.take() {
return Ok(stack);
}
self.engine().allocator().allocate_fiber_stack()
}

#[cfg(feature = "async")]
fn deallocate_fiber_stack(&mut self, stack: wasmtime_fiber::FiberStack) {
self.flush_fiber_stack();
self.async_state.last_fiber_stack = Some(stack);
}

/// Releases the last fiber stack to the underlying instance allocator, if
/// present.
fn flush_fiber_stack(&mut self) {
#[cfg(feature = "async")]
if let Some(stack) = self.async_state.last_fiber_stack.take() {
unsafe {
self.engine.allocator().deallocate_fiber_stack(stack);
}
}
}
}

impl<T> StoreContextMut<'_, T> {
Expand All @@ -2124,13 +2153,14 @@ impl<T> StoreContextMut<'_, T> {
debug_assert!(config.async_stack_size > 0);

let mut slot = None;
let future = {
let mut future = {
let current_poll_cx = self.0.async_state.current_poll_cx.get();
let current_suspend = self.0.async_state.current_suspend.get();
let stack = self.engine().allocator().allocate_fiber_stack()?;
let stack = self.0.allocate_fiber_stack()?;

let engine = self.engine().clone();
let slot = &mut slot;
let this = &mut *self;
let fiber = wasmtime_fiber::Fiber::new(stack, move |keep_going, suspend| {
// First check and see if we were interrupted/dropped, and only
// continue if we haven't been.
Expand All @@ -2148,7 +2178,7 @@ impl<T> StoreContextMut<'_, T> {
let _reset = Reset(current_suspend, *current_suspend);
*current_suspend = suspend;

*slot = Some(func(self));
*slot = Some(func(this));
Ok(())
}
})?;
Expand All @@ -2163,7 +2193,12 @@ impl<T> StoreContextMut<'_, T> {
state: Some(crate::runtime::vm::AsyncWasmCallState::new()),
}
};
future.await?;
(&mut future).await?;
let stack = future.fiber.take().map(|f| f.into_stack());
drop(future);
if let Some(stack) = stack {
self.0.deallocate_fiber_stack(stack);
}

return Ok(slot.unwrap());

Expand Down Expand Up @@ -2373,6 +2408,10 @@ impl<T> StoreContextMut<'_, T> {
// completion.
impl Drop for FiberFuture<'_> {
fn drop(&mut self) {
if self.fiber.is_none() {
return;
}

if !self.fiber().done() {
let result = self.resume(Err(anyhow!("future dropped")));
// This resumption with an error should always complete the
Expand Down Expand Up @@ -2737,6 +2776,8 @@ impl<T: fmt::Debug> fmt::Debug for Store<T> {

impl<T> Drop for Store<T> {
fn drop(&mut self) {
self.inner.flush_fiber_stack();

// for documentation on this `unsafe`, see `into_data`.
unsafe {
ManuallyDrop::drop(&mut self.inner.data);
Expand Down