diff --git a/crates/bindings-csharp/Runtime/Exceptions.cs b/crates/bindings-csharp/Runtime/Exceptions.cs index 4db463c70a..be6c287798 100644 --- a/crates/bindings-csharp/Runtime/Exceptions.cs +++ b/crates/bindings-csharp/Runtime/Exceptions.cs @@ -27,6 +27,11 @@ public class BufferTooSmallException : StdbException public override string Message => "The provided buffer is not large enough to store the data"; } +public class NoSuchBytesException : StdbException +{ + public override string Message => "The provided bytes source or sink does not exist"; +} + public class UnknownException : StdbException { private readonly FFI.CheckedStatus.Errno code; diff --git a/crates/bindings-csharp/Runtime/Internal/FFI.cs b/crates/bindings-csharp/Runtime/Internal/FFI.cs index dc08161263..03374ee7e9 100644 --- a/crates/bindings-csharp/Runtime/Internal/FFI.cs +++ b/crates/bindings-csharp/Runtime/Internal/FFI.cs @@ -3,6 +3,15 @@ namespace SpacetimeDB.Internal; using System.Runtime.InteropServices; using System.Runtime.InteropServices.Marshalling; +// This type is outside of the hidden `FFI` class because for now we need to do some public +// forwarding in the codegen for `__describe_module__` and `__call_reducer__` exports which both +// use this type. +[StructLayout(LayoutKind.Sequential)] +public readonly record struct BytesSource(uint Handle) +{ + public static readonly BytesSource INVALID = new(0); +} + // This type is outside of the hidden `FFI` class because for now we need to do some public // forwarding in the codegen for `__describe_module__` and `__call_reducer__` exports which both // use this type. 
@@ -49,6 +58,7 @@ public enum Errno : ushort LOOKUP_NOT_FOUND = 2, UNIQUE_ALREADY_EXISTS = 3, BUFFER_TOO_SMALL = 4, + NO_SUCH_BYTES = 8, } // This custom marshaller takes care of checking the status code @@ -75,6 +85,7 @@ public static CheckedStatus ConvertToManaged(Errno status) Errno.LOOKUP_NOT_FOUND => new LookupNotFoundException(), Errno.UNIQUE_ALREADY_EXISTS => new UniqueAlreadyExistsException(), Errno.BUFFER_TOO_SMALL => new BufferTooSmallException(), + Errno.NO_SUCH_BYTES => new NoSuchBytesException(), _ => new UnknownException(status), }; } @@ -163,7 +174,7 @@ out RowIter out_ [LibraryImport(StdbNamespace)] public static partial CheckedStatus _iter_advance( RowIter iter_handle, - [MarshalUsing(CountElementName = nameof(buffer_len))] [Out] byte[] buffer, + [MarshalUsing(CountElementName = nameof(buffer_len))][Out] byte[] buffer, ref uint buffer_len ); @@ -183,13 +194,10 @@ uint message_len ); [LibraryImport(StdbNamespace)] - public static partial uint _buffer_len(Buffer buf_handle); - - [LibraryImport(StdbNamespace)] - public static partial void _buffer_consume( - Buffer buf_handle, - [MarshalUsing(CountElementName = nameof(dst_len))] [Out] byte[] dst, - uint dst_len + public static partial short _bytes_source_read( + BytesSource source, + Span buffer, + ref uint buffer_len_ptr ); [LibraryImport(StdbNamespace)] diff --git a/crates/bindings-csharp/Runtime/Internal/Module.cs b/crates/bindings-csharp/Runtime/Internal/Module.cs index 2bba4d0c10..d4ffe2b275 100644 --- a/crates/bindings-csharp/Runtime/Internal/Module.cs +++ b/crates/bindings-csharp/Runtime/Internal/Module.cs @@ -3,6 +3,7 @@ namespace SpacetimeDB.Internal; using SpacetimeDB; using SpacetimeDB.BSATN; using System; +using System.Diagnostics; using System.Runtime.InteropServices; public static partial class Module @@ -219,12 +220,42 @@ public static void RegisterTable() moduleDef.RegisterTable(T.MakeTableDesc(typeRegistrar)); } - private static byte[] Consume(this Buffer buffer) + private static 
byte[] Consume(this BytesSource source) { - var len = FFI._buffer_len(buffer); - var result = new byte[len]; - FFI._buffer_consume(buffer, result, len); - return result; + if (source == BytesSource.INVALID) + { + return []; + } + var buffer = new byte[0x20_000]; + var written = 0U; + while (true) + { + // Write into the spare capacity of the buffer. + var spare = buffer.AsSpan((int)written); + var buf_len = (uint)spare.Length; + var ret = FFI._bytes_source_read(source, spare, ref buf_len); + written += buf_len; + switch (ret) + { + // Host side source exhausted, we're done. + case -1: + Array.Resize(ref buffer, (int)written); + return buffer; + // Wrote the entire spare capacity. + // Need to reserve more space in the buffer. + case 0 when written == buffer.Length: + Array.Resize(ref buffer, buffer.Length + 1024); + break; + // Host didn't write as much as possible. + // Try to read some more. + // The host will likely not trigger this branch (current host doesn't), + // but a module should be prepared for it. + case 0: + break; + default: + throw new UnreachableException(); + } + } } #pragma warning disable IDE1006 // Naming Styles - methods below are meant for FFI. @@ -257,7 +288,7 @@ public static Buffer __call_reducer__( ulong address_0, ulong address_1, DateTimeOffsetRepr timestamp, - Buffer args + BytesSource args ) { // Piece together the sender identity. 
@@ -269,6 +300,7 @@ Buffer args try { Runtime.Random = new((int)timestamp.MicrosecondsSinceEpoch); + using var stream = new MemoryStream(args.Consume()); using var reader = new BinaryReader(stream); reducers[(int)id].Invoke(reader, new(sender, address, timestamp.ToStd())); diff --git a/crates/bindings-csharp/Runtime/bindings.c b/crates/bindings-csharp/Runtime/bindings.c index 6c51f74d2d..74607aa694 100644 --- a/crates/bindings-csharp/Runtime/bindings.c +++ b/crates/bindings-csharp/Runtime/bindings.c @@ -19,6 +19,7 @@ OPAQUE_TYPEDEF(ColId, uint16_t); OPAQUE_TYPEDEF(IndexType, uint8_t); OPAQUE_TYPEDEF(LogLevel, uint8_t); OPAQUE_TYPEDEF(Buffer, uint32_t); +OPAQUE_TYPEDEF(BytesSource, uint32_t); OPAQUE_TYPEDEF(RowIter, uint32_t); #define CSTR(s) (uint8_t*)s, sizeof(s) - 1 @@ -68,9 +69,8 @@ IMPORT(Status, _iter_advance, (RowIter iter, uint8_t* buffer, size_t* buffer_len), (iter, buffer, buffer_len)); IMPORT(void, _iter_drop, (RowIter iter), (iter)); -IMPORT(uint32_t, _buffer_len, (Buffer buf), (buf)); -IMPORT(void, _buffer_consume, (Buffer buf, uint8_t* dst, uint32_t dst_len), - (buf, dst, dst_len)); +IMPORT(int16_t, _bytes_source_read, (BytesSource source, uint8_t* buffer_ptr, size_t* buffer_len_ptr), + (source, buffer_ptr, buffer_len_ptr)); IMPORT(Buffer, _buffer_alloc, (const uint8_t* data, uint32_t len), (data, len)); #ifndef EXPERIMENTAL_WASM_AOT diff --git a/crates/bindings-csharp/Runtime/build/SpacetimeDB.Runtime.targets b/crates/bindings-csharp/Runtime/build/SpacetimeDB.Runtime.targets index 1d94ac087a..1a779f6e18 100644 --- a/crates/bindings-csharp/Runtime/build/SpacetimeDB.Runtime.targets +++ b/crates/bindings-csharp/Runtime/build/SpacetimeDB.Runtime.targets @@ -14,8 +14,7 @@ - - + diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index 27f2e2eb80..f020457f8e 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -8,8 +8,6 @@ use core::mem::MaybeUninit; use core::num::NonZeroU16; use std::ptr; -use 
alloc::boxed::Box; - use spacetimedb_primitives::{errno, errnos, ColId, TableId}; /// Provides a raw set of sys calls which abstractions can be built atop of. @@ -252,6 +250,71 @@ pub mod raw { /// Traps if `data + data_len` overflows a 64-bit integer. pub fn _buffer_alloc(data: *const u8, data_len: usize) -> Buffer; + /// Reads bytes from `source`, registered in the host environment, + /// and stores them in the memory pointed to by `buffer = buffer_ptr[..buffer_len]`. + /// + /// The `buffer_len = buffer_len_ptr[..size_of::()]` stores the capacity of `buffer`. + /// On success (`0` or `-1` is returned), + /// `buffer_len` is set to the number of bytes written to `buffer`. + /// When `-1` is returned, the resource has been exhausted + /// and there are no more bytes to read, + /// leading to the resource being immediately destroyed. + /// Note that the host is free to reuse allocations in a pool, + /// destroying the handle logically does not entail that memory is necessarily reclaimed. + /// + /// # Traps + /// + /// Traps if: + /// + /// - `buffer_len_ptr` is NULL or `buffer_len` is not in bounds of WASM memory. + /// - `buffer_ptr` is NULL or `buffer` is not in bounds of WASM memory. + /// + /// # Errors + /// + /// Returns an error: + /// + /// - `NO_SUCH_BYTES`, when `source` is not a valid bytes source. + /// + /// # Example + /// + /// The typical use case for this ABI is in `__call_reducer__`, + /// to read and deserialize the `args`. + /// An example definition, dealing with `args` might be: + /// ```rust,ignore + /// /// #[no_mangle] + /// extern "C" fn __call_reducer__(..., args: BytesSource, ...) -> i16 { + /// // ... + /// + /// let mut buf = Vec::::with_capacity(1024); + /// loop { + /// // Write into the spare capacity of the buffer. 
+ /// let buf_ptr = buf.spare_capacity_mut(); + /// let spare_len = buf_ptr.len(); + /// let mut buf_len = buf_ptr.len(); + /// let buf_ptr = buf_ptr.as_mut_ptr().cast(); + /// let ret = unsafe { bytes_source_read(args, buf_ptr, &mut buf_len) }; + /// // SAFETY: `bytes_source_read` just appended `buf_len` bytes to `buf`. + /// unsafe { buf.set_len(buf.len() + buf_len) }; + /// match ret { + /// // Host side source exhausted, we're done. + /// -1 => break, + /// // Wrote the entire spare capacity. + /// // Need to reserve more space in the buffer. + /// 0 if spare_len == buf_len => buf.reserve(1024), + /// // Host didn't write as much as possible. + /// // Try to read some more. + /// // The host will likely not trigger this branch, + /// // but a module should be prepared for it. + /// 0 => {} + /// _ => unreachable!(), + /// } + /// } + /// + /// // ... + /// } + /// ``` + pub fn _bytes_source_read(source: BytesSource, buffer_ptr: *mut u8, buffer_len_ptr: *mut usize) -> i16; + /// Begin a timing span. /// /// When the returned `u32` span ID is passed to [`_span_end`], @@ -298,6 +361,18 @@ pub mod raw { /// A panic level is emitted just before a fatal error causes the WASM module to trap. pub const LOG_LEVEL_PANIC: u8 = 101; + /// A handle into a buffer of bytes in the host environment that can be read from. + /// + /// Used for transporting bytes from host to WASM linear memory. + #[derive(PartialEq, Eq, Copy, Clone)] + #[repr(transparent)] + pub struct BytesSource(u32); + + impl BytesSource { + /// An invalid handle, used e.g., when the reducer arguments were empty. + pub const INVALID: Self = Self(0); + } + /// A handle into a buffer of bytes in the host environment. /// /// Used for transporting bytes host <-> WASM linear memory. @@ -632,47 +707,6 @@ impl Buffer { raw: raw::INVALID_BUFFER, }; - /// Returns the number of bytes of the data stored in the buffer. 
- pub fn data_len(&self) -> usize { - unsafe { raw::_buffer_len(self.raw) } - } - - /// Read the contents of the buffer into the provided Vec. - /// The Vec is cleared in the process. - pub fn read_into(self, buf: &mut Vec) { - let data_len = self.data_len(); - buf.clear(); - buf.reserve(data_len); - self.read_uninit(&mut buf.spare_capacity_mut()[..data_len]); - // SAFETY: We just wrote `data_len` bytes into `buf`. - unsafe { buf.set_len(data_len) }; - } - - /// Read the contents of the buffer into a new boxed byte slice. - pub fn read(self) -> Box<[u8]> { - let mut buf = alloc::vec::Vec::new(); - self.read_into(&mut buf); - buf.into_boxed_slice() - } - - /// Read the contents of the buffer into an array of fixed size `N`. - /// - /// If the length is wrong, the module will crash. - pub fn read_array(self) -> [u8; N] { - // use MaybeUninit::uninit_array once stable - let mut arr = unsafe { MaybeUninit::<[MaybeUninit; N]>::uninit().assume_init() }; - self.read_uninit(&mut arr); - // use MaybeUninit::array_assume_init once stable - unsafe { (&arr as *const [_; N]).cast::<[u8; N]>().read() } - } - - /// Reads the buffer into an uninitialized byte string `buf`. - /// - /// The module will crash if `buf`'s length doesn't match the buffer. - pub fn read_uninit(self, buf: &mut [MaybeUninit]) { - unsafe { raw::_buffer_consume(self.raw, buf.as_mut_ptr().cast(), buf.len()) } - } - /// Allocates a buffer with the contents of `data`. 
pub fn alloc(data: &[u8]) -> Self { let raw = unsafe { raw::_buffer_alloc(data.as_ptr(), data.len()) }; diff --git a/crates/bindings/src/rt.rs b/crates/bindings/src/rt.rs index 740ce8aacc..244d66f2a2 100644 --- a/crates/bindings/src/rt.rs +++ b/crates/bindings/src/rt.rs @@ -4,10 +4,11 @@ use std::fmt; use std::marker::PhantomData; use std::sync::{Mutex, OnceLock}; use std::time::Duration; +use sys::raw::BytesSource; use sys::Buffer; use crate::timestamp::with_timestamp_set; -use crate::{sys, ReducerContext, SpacetimeType, TableType, Timestamp}; +use crate::{return_iter_buf, sys, take_iter_buf, ReducerContext, SpacetimeType, TableType, Timestamp}; use spacetimedb_lib::db::auth::StTableType; use spacetimedb_lib::db::raw_def::{ RawColumnDefV8, RawConstraintDefV8, RawIndexDefV8, RawSequenceDefV8, RawTableDefV8, @@ -400,7 +401,7 @@ extern "C" fn __describe_module__() -> Buffer { Buffer::alloc(&bytes) } -// TODO(1.0): update `__call_reducer__` docs + for `BytesSource` & `BytesSink`. +// TODO(1.0): update `__call_reducer__` docs + for `BytesSink`. /// Called by the host to execute a reducer /// when the `sender` calls the reducer identified by `id` at `timestamp` with `args`. @@ -417,6 +418,11 @@ extern "C" fn __describe_module__() -> Buffer { /// - `address_0` contains bytes `[0 ..8 ]`. /// - `address_1` contains bytes `[8 ..16]`. /// +/// The `args` is a `BytesSource`, registered on the host side, +/// which can be read with `bytes_source_read`. +/// The contents of the buffer are the BSATN-encoding of the arguments to the reducer. +/// In the case of empty arguments, `args` will be 0, that is, invalid. +/// /// The result of the reducer is written into a fresh buffer. #[no_mangle] extern "C" fn __call_reducer__( @@ -428,7 +434,7 @@ extern "C" fn __call_reducer__( address_0: u64, address_1: u64, timestamp: u64, - args: Buffer, + args: BytesSource, ) -> Buffer { // Piece together `sender_i` into an `Identity`. 
let sender = [sender_0, sender_1, sender_2, sender_3]; @@ -451,8 +457,59 @@ extern "C" fn __call_reducer__( }; let reducers = REDUCERS.get().unwrap(); - let args = args.read(); - reducers[id](ctx, &args) + with_read_args(args, |args| reducers[id](ctx, args)) +} + +/// Run `logic` with `args` read from the host into a `&[u8]`. +fn with_read_args(args: BytesSource, logic: impl FnOnce(&[u8]) -> R) -> R { + if args == BytesSource::INVALID { + return logic(&[]); + } + + // Steal an iteration row buffer. + // These were not meant for this purpose, + // but it's likely we have one sitting around being unused at this point, + // so use it to avoid allocating a temporary buffer if possible. + // And if we do allocate a temporary buffer now, it will likely be reused later. + let mut buf = take_iter_buf(); + + // Read `args` and run `logic`. + read_bytes_source_into(args, &mut buf); + let ret = logic(&buf); + + // Return the `buf` back to the pool. + // Should a panic occur before reaching here, + // the WASM module cannot recover and will trap, + // so we don't need to care that this is not returned to the pool. + return_iter_buf(buf); + ret +} + +/// Read `source` from the host fully into `buf`. +fn read_bytes_source_into(source: BytesSource, buf: &mut Vec) { + loop { + // Write into the spare capacity of the buffer. + let buf_ptr = buf.spare_capacity_mut(); + let spare_len = buf_ptr.len(); + let mut buf_len = buf_ptr.len(); + let buf_ptr = buf_ptr.as_mut_ptr().cast(); + let ret = unsafe { sys::raw::_bytes_source_read(source, buf_ptr, &mut buf_len) }; + // SAFETY: `bytes_source_read` just appended `buf_len` bytes to `buf`. + unsafe { buf.set_len(buf.len() + buf_len) }; + match ret { + // Host side source exhausted, we're done. + -1 => break, + // Wrote the entire spare capacity. + // Need to reserve more space in the buffer. + 0 if spare_len == buf_len => buf.reserve(1024), + // Host didn't write as much as possible. + // Try to read some more. 
+ // The host will likely not trigger this branch (current host doesn't), + // but a module should be prepared for it. + 0 => {} + _ => unreachable!(), + } + } } #[macro_export] diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 8761735ffa..88ae8febbd 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -141,9 +141,10 @@ fn from_json_seed<'de, T: serde::de::DeserializeSeed<'de>>(s: &'de str, seed: T) /// Tags for each call that a `WasmInstanceEnv` can make. #[derive(Debug, Display, Enum, Clone, Copy, strum::AsRefStr)] pub enum AbiCall { + BytesSourceRead, + CancelReducer, ConsoleLog, - CreateIndex, DeleteByColEq, DeleteByRel, GetTableId, diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index 8e214e39cc..ae27ba248f 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -287,10 +287,6 @@ impl ResourceSlab { I::from_u32(idx) } - pub fn get(&self, handle: I) -> Option<&I::Resource> { - self.slab.get(handle.to_u32() as usize) - } - pub fn get_mut(&mut self, handle: I) -> Option<&mut I::Resource> { self.slab.get_mut(handle.to_u32() as usize) } @@ -365,8 +361,7 @@ macro_rules! abi_funcs { ($mac:ident) => { $mac! { "spacetime_10.0"::buffer_alloc, - "spacetime_10.0"::buffer_consume, - "spacetime_10.0"::buffer_len, + "spacetime_10.0"::bytes_source_read, "spacetime_10.0"::console_log, "spacetime_10.0"::delete_by_col_eq, "spacetime_10.0"::delete_by_rel, diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 3196d4f9d4..0873091068 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -49,6 +49,10 @@ pub(super) struct WasmInstanceEnv { /// always be `Some`. mem: Option, + /// The arguments being passed to a reducer + /// that it can read via [`Self::bytes_source_read`]. 
+ call_reducer_args: Option<(bytes::Bytes, usize)>, + /// The slab of `Buffers` created for this instance. buffers: Buffers, @@ -71,6 +75,8 @@ pub(super) struct WasmInstanceEnv { reducer_name: String, } +const CALL_REDUCER_ARGS_SOURCE: u32 = 1; + type WasmResult = Result; type RtResult = anyhow::Result; @@ -83,6 +89,7 @@ impl WasmInstanceEnv { Self { instance_env, mem: None, + call_reducer_args: None, buffers: Default::default(), iters: Default::default(), timing_spans: Default::default(), @@ -119,22 +126,30 @@ impl WasmInstanceEnv { self.buffers.take(idx) } - /// Take ownership of the given `data` and give back a `BufferIdx` - /// as a handle to that data. - pub fn insert_buffer(&mut self, data: bytes::Bytes) -> BufferIdx { - self.buffers.insert(data) - } - /// Signal to this `WasmInstanceEnv` that a reducer call is beginning. - pub fn start_reducer(&mut self, name: &str) { + /// + /// Returns the handle used by reducers to read from `args`. + pub fn start_reducer(&mut self, name: &str, args: bytes::Bytes) -> u32 { + self.call_reducer_args = (!args.is_empty()).then_some((args, 0)); + self.reducer_start = Instant::now(); name.clone_into(&mut self.reducer_name); + + // Pass an invalid source when the reducer args were empty. + // This allows the module to avoid allocating and make a system call in those cases. + if self.call_reducer_args.is_some() { + CALL_REDUCER_ARGS_SOURCE + } else { + 0 + } } /// Signal to this `WasmInstanceEnv` that a reducer call is over. /// This resets all of the state associated to a single reducer call, /// and returns instrumentation records. pub fn finish_reducer(&mut self) -> ExecutionTimings { + self.call_reducer_args = None; + // For the moment, we only explicitly clear the set of buffers and the // "syscall" times. // TODO: should we be clearing `iters` and/or `timing_spans`? 
@@ -156,34 +171,20 @@ impl WasmInstanceEnv { self.instance_env().get_ctx().map_err(|err| WasmError::Db(err.into())) } - /// Call the function `f` with the name `func`. - /// The function `f` is provided with the callers environment and the host's memory. - /// - /// One of `cvt`, `cvt_ret`, or `cvt_noret` should be used in the implementation of any - /// host call, to provide consistent error handling and instrumentation. - /// - /// Some database errors are logged but are otherwise regarded as `Ok(_)`. - /// See `err_to_errno` for a list. - fn cvt( - mut caller: Caller<'_, Self>, - func: AbiCall, - f: impl FnOnce(&mut Caller<'_, Self>) -> WasmResult<()>, - ) -> RtResult { + fn with_span(mut caller: Caller<'_, Self>, func: AbiCall, run: impl FnOnce(&mut Caller<'_, Self>) -> R) -> R { let span_start = span::CallSpanStart::new(func); - // Call `f` with the caller and a handle to the memory. - let result = f(&mut caller); + // Call `run` with the caller and a handle to the memory. + let result = run(&mut caller); // Track the span of this call. let span = span_start.end(); span::record_span(&mut caller.data_mut().call_times, span); - // Bail if there were no errors. - let Err(err) = result else { - return Ok(0); - }; + result + } - // Handle any errors. + fn convert_wasm_result>(func: AbiCall, err: WasmError) -> RtResult { Err(match err { WasmError::Db(err) => match err_to_errno(&err) { Some(errno) => { @@ -200,6 +201,40 @@ impl WasmInstanceEnv { }) } + /// Call the function `run` with the name `func`. + /// The function `run` is provided with the callers environment and the host's memory. + /// + /// One of `cvt_custom`, `cvt`, `cvt_ret`, or `cvt_noret` should be used in the implementation of any + /// host call, to provide consistent error handling and instrumentation. + /// + /// Some database errors are logged but are otherwise regarded as `Ok(_)`. + /// See `err_to_errno` for a list. 
+ /// + /// This variant should be used when more control is needed over the success value. + fn cvt_custom>( + caller: Caller<'_, Self>, + func: AbiCall, + run: impl FnOnce(&mut Caller<'_, Self>) -> WasmResult, + ) -> RtResult { + Self::with_span(caller, func, run).or_else(|err| Self::convert_wasm_result(func, err)) + } + + /// Call the function `run` with the name `func`. + /// The function `run` is provided with the callers environment and the host's memory. + /// + /// One of `cvt`, `cvt_ret`, or `cvt_noret` should be used in the implementation of any + /// host call, to provide consistent error handling and instrumentation. + /// + /// Some database errors are logged but are otherwise regarded as `Ok(_)`. + /// See `err_to_errno` for a list. + fn cvt>( + caller: Caller<'_, Self>, + func: AbiCall, + run: impl FnOnce(&mut Caller<'_, Self>) -> WasmResult<()>, + ) -> RtResult { + Self::cvt_custom(caller, func, |c| run(c).map(|()| 0u16.into())) + } + /// Call the function `f` with any return value being written to the pointer `out`. /// /// Otherwise, `cvt_ret` (this function) behaves as `cvt`. @@ -211,11 +246,11 @@ impl WasmInstanceEnv { /// as it helps with upholding the safety invariants of [`bindings_sys::call`]. /// /// Returns an error if writing `T` to `out` errors. - fn cvt_ret( + fn cvt_ret( caller: Caller<'_, Self>, call: AbiCall, - out: WasmPtr, - f: impl FnOnce(&mut Caller<'_, Self>) -> WasmResult, + out: WasmPtr, + f: impl FnOnce(&mut Caller<'_, Self>) -> WasmResult, ) -> RtResult { Self::cvt(caller, call, |caller| { f(caller).and_then(|ret| { @@ -230,14 +265,8 @@ impl WasmInstanceEnv { /// This is the version of `cvt` or `cvt_ret` for functions with no return value. /// One of `cvt`, `cvt_ret`, or `cvt_noret` should be used in the implementation of any /// host call, to provide consistent error handling and instrumentation. 
- fn cvt_noret(mut caller: Caller<'_, Self>, call: AbiCall, f: impl FnOnce(&mut Caller<'_, Self>)) { - let span_start = span::CallSpanStart::new(call); - - // Call `f` with the caller and a handle to the memory. - f(&mut caller); - - let span = span_start.end(); - span::record_span(&mut caller.data_mut().call_times, span); + fn cvt_noret(caller: Caller<'_, Self>, call: AbiCall, f: impl FnOnce(&mut Caller<'_, Self>)) { + Self::with_span(caller, call, f) } fn convert_u32_to_col_id(col_id: u32) -> WasmResult { @@ -606,37 +635,6 @@ impl WasmInstanceEnv { Ok(()) } - /// Returns the length (number of bytes) of buffer `bufh` without - /// transferring ownership of the data into the function. - /// - /// The `bufh` must have previously been allocating using `_buffer_alloc`. - /// - /// Returns an error if the buffer does not exist. - // #[tracing::instrument(skip_all)] - pub fn buffer_len(caller: Caller<'_, Self>, buffer: u32) -> RtResult { - caller - .data() - .buffers - .get(BufferIdx(buffer)) - .map(|b| b.len() as u32) - .context("no such buffer") - } - - /// Consumes the `buffer`, - /// moving its contents to the slice `(dst, dst_len)`. - /// - /// Returns an error if - /// - the buffer does not exist - /// - `dst + dst_len` overflows a 64-bit integer - // #[tracing::instrument(skip_all)] - pub fn buffer_consume(mut caller: Caller<'_, Self>, buffer: u32, dst: WasmPtr, dst_len: u32) -> RtResult<()> { - let (mem, env) = Self::mem_env(&mut caller); - let buf = env.take_buffer(BufferIdx(buffer)).context("no such buffer")?; - anyhow::ensure!(dst_len as usize == buf.len(), "bad length passed to buffer_consume"); - mem.deref_slice_mut(dst, dst_len)?.copy_from_slice(&buf); - Ok(()) - } - /// Creates a buffer of size `data_len` in the host environment. 
/// /// The contents of the byte slice pointed to by `data` @@ -653,6 +651,113 @@ impl WasmInstanceEnv { Ok(env.buffers.insert(buf.to_vec().into()).0) } + /// Reads bytes from `source`, registered in the host environment, + /// and stores them in the memory pointed to by `buffer = buffer_ptr[..buffer_len]`. + /// + /// The `buffer_len = buffer_len_ptr[..size_of::()]` stores the capacity of `buffer`. + /// On success (`0` or `-1` is returned), + /// `buffer_len` is set to the number of bytes written to `buffer`. + /// When `-1` is returned, the resource has been exhausted + /// and there are no more bytes to read, + /// leading to the resource being immediately destroyed. + /// Note that the host is free to reuse allocations in a pool, + /// destroying the handle logically does not entail that memory is necessarily reclaimed. + /// + /// # Traps + /// + /// Traps if: + /// + /// - `buffer_len_ptr` is NULL or `buffer_len` is not in bounds of WASM memory. + /// - `buffer_ptr` is NULL or `buffer` is not in bounds of WASM memory. + /// + /// # Errors + /// + /// Returns an error: + /// + /// - `NO_SUCH_BYTES`, when `source` is not a valid bytes source. + /// + /// # Example + /// + /// The typical use case for this ABI is in `__call_reducer__`, + /// to read and deserialize the `args`. + /// An example definition, dealing with `args` might be: + /// ```rust,ignore + /// /// #[no_mangle] + /// extern "C" fn __call_reducer__(..., args: BytesSource, ...) -> i16 { + /// // ... + /// + /// let mut buf = Vec::::with_capacity(1024); + /// loop { + /// // Write into the spare capacity of the buffer. + /// let buf_ptr = buf.spare_capacity_mut(); + /// let spare_len = buf_ptr.len(); + /// let mut buf_len = buf_ptr.len(); + /// let buf_ptr = buf_ptr.as_mut_ptr().cast(); + /// let ret = unsafe { bytes_source_read(args, buf_ptr, &mut buf_len) }; + /// // SAFETY: `bytes_source_read` just appended `buf_len` bytes to `buf`. 
+ /// unsafe { buf.set_len(buf.len() + buf_len) }; + /// match ret { + /// // Host side source exhausted, we're done. + /// -1 => break, + /// // Wrote the entire spare capacity. + /// // Need to reserve more space in the buffer. + /// 0 if spare_len == buf_len => buf.reserve(1024), + /// // Host didn't write as much as possible. + /// // Try to read some more. + /// // The host will likely not trigger this branch, + /// // but a module should be prepared for it. + /// 0 => {} + /// _ => unreachable!(), + /// } + /// } + /// + /// // ... + /// } + /// ``` + pub fn bytes_source_read( + caller: Caller<'_, Self>, + source: u32, + buffer_ptr: WasmPtr, + buffer_len_ptr: WasmPtr, + ) -> RtResult { + Self::cvt_custom(caller, AbiCall::BytesSourceRead, |caller| { + let (mem, env) = Self::mem_env(caller); + + // Retrieve the reducer args if available and requested, or error. + let Some((reducer_args, cursor)) = env + .call_reducer_args + .as_mut() + .filter(|_| source == CALL_REDUCER_ARGS_SOURCE) + else { + return Ok(errno::NO_SUCH_BYTES.get().into()); + }; + + // Read `buffer_len`, i.e., the capacity of `buffer` pointed to by `buffer_ptr`. + let buffer_len = u32::read_from(mem, buffer_len_ptr)?; + // Get a mutable view to the `buffer`. + let buffer = mem.deref_slice_mut(buffer_ptr, buffer_len)?; + let buffer_len = buffer_len as usize; + + // Derive the portion that we can read and what remains, + // based on what is left to read and the capacity. + let left_to_read = &reducer_args[*cursor..]; + let can_read_len = buffer_len.min(left_to_read.len()); + let (can_read, remainder) = left_to_read.split_at(can_read_len); + // Copy to the `buffer` and write written bytes count to `buffer_len`. + buffer[..can_read_len].copy_from_slice(can_read); + (can_read_len as u32).write_to(mem, buffer_len_ptr)?; + + // Destroy the source if exhausted, or advance `cursor`. 
+ if remainder.is_empty() { + env.call_reducer_args = None; + Ok(-1i32) + } else { + *cursor += can_read_len; + Ok(0) + } + }) + } + pub fn span_start(mut caller: Caller<'_, Self>, name: WasmPtr, name_len: u32) -> RtResult { let (mem, env) = Self::mem_env(&mut caller); let name = mem.deref_slice(name, name_len)?.to_vec(); diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 823763e20a..5e786d7d66 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -187,11 +187,12 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { // otherwise, we'd return something like `used: i128::MAX - u64::MAX`, which is inaccurate. set_store_fuel(store, budget.into()); + // Prepare sender identity and address. let [sender_0, sender_1, sender_2, sender_3] = bytemuck::must_cast(*op.caller_identity.as_bytes()); let [address_0, address_1] = bytemuck::must_cast(*op.caller_address.as_slice()); - let args_buf = store.data_mut().insert_buffer(op.arg_bytes); - store.data_mut().start_reducer(op.name); + // Prepare arguments to the reducer & start timings. + let args_source = store.data_mut().start_reducer(op.name, op.arg_bytes); let call_result = self .call_reducer @@ -206,7 +207,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { address_0, address_1, op.timestamp.microseconds, - args_buf.0, + args_source, ), ) .and_then(|errbuf| { diff --git a/crates/primitives/src/errno.rs b/crates/primitives/src/errno.rs index b581e90b87..89a366ba3a 100644 --- a/crates/primitives/src/errno.rs +++ b/crates/primitives/src/errno.rs @@ -12,6 +12,7 @@ macro_rules! 
errnos { LOOKUP_NOT_FOUND(2, "Value or range provided not found in table"), UNIQUE_ALREADY_EXISTS(3, "Value with given unique identifier already exists"), BUFFER_TOO_SMALL(4, "The provided buffer is not large enough to store the data"), + NO_SUCH_BYTES(8, "The provided bytes source or sink is not valid"), ); }; }