From 2a9ae03bae34f3a87eb754b421b9957355b05331 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 20 Aug 2024 11:16:08 +0200 Subject: [PATCH] wasm abi: impl __call_reducer__ & __describe_module__ using bytes_sink_write --- .../server/snapshots/Module#FFI.verified.cs | 12 +- crates/bindings-csharp/Codegen/Module.cs | 10 +- crates/bindings-csharp/Runtime/Exceptions.cs | 9 +- .../bindings-csharp/Runtime/Internal/FFI.cs | 43 +++--- .../Runtime/Internal/Module.cs | 37 ++++-- crates/bindings-csharp/Runtime/bindings.c | 23 ++-- .../Runtime/build/SpacetimeDB.Runtime.targets | 2 +- crates/bindings-macro/src/lib.rs | 2 +- crates/bindings-sys/src/lib.rs | 78 +++-------- crates/bindings/src/lib.rs | 2 + crates/bindings/src/rt.rs | 118 +++++++++++------ crates/cli/src/subcommands/generate/mod.rs | 91 ++++++------- crates/core/src/host/mod.rs | 3 +- crates/core/src/host/wasm_common.rs | 27 +--- .../src/host/wasm_common/module_host_actor.rs | 4 +- crates/core/src/host/wasmtime/mod.rs | 32 ++--- .../src/host/wasmtime/wasm_instance_env.rs | 121 +++++++++++------ .../core/src/host/wasmtime/wasmtime_module.rs | 123 +++++++++--------- crates/primitives/src/errno.rs | 9 +- 19 files changed, 393 insertions(+), 353 deletions(-) diff --git a/crates/bindings-csharp/Codegen.Tests/fixtures/server/snapshots/Module#FFI.verified.cs b/crates/bindings-csharp/Codegen.Tests/fixtures/server/snapshots/Module#FFI.verified.cs index 730b9c4cf23..28285c8754e 100644 --- a/crates/bindings-csharp/Codegen.Tests/fixtures/server/snapshots/Module#FFI.verified.cs +++ b/crates/bindings-csharp/Codegen.Tests/fixtures/server/snapshots/Module#FFI.verified.cs @@ -114,11 +114,11 @@ public static void Main() // Exports only work from the main assembly, so we need to generate forwarding methods. 
#if EXPERIMENTAL_WASM_AOT [UnmanagedCallersOnly(EntryPoint = "__describe_module__")] - public static SpacetimeDB.Internal.Buffer __describe_module__() => - SpacetimeDB.Internal.Module.__describe_module__(); + public static void __describe_module__(SpacetimeDB.Internal.BytesSink d) => + SpacetimeDB.Internal.Module.__describe_module__(d); [UnmanagedCallersOnly(EntryPoint = "__call_reducer__")] - public static SpacetimeDB.Internal.Buffer __call_reducer__( + public static short __call_reducer__( uint id, ulong sender_0, ulong sender_1, @@ -127,7 +127,8 @@ public static SpacetimeDB.Internal.Buffer __call_reducer__( ulong address_0, ulong address_1, SpacetimeDB.Internal.DateTimeOffsetRepr timestamp, - SpacetimeDB.Internal.Buffer args + SpacetimeDB.Internal.BytesSource args, + SpacetimeDB.Internal.BytesSink error ) => SpacetimeDB.Internal.Module.__call_reducer__( id, @@ -138,7 +139,8 @@ SpacetimeDB.Internal.Buffer args address_0, address_0, timestamp, - args + args, + error ); #endif } diff --git a/crates/bindings-csharp/Codegen/Module.cs b/crates/bindings-csharp/Codegen/Module.cs index 05c38ab56f3..7ce0a0e5d35 100644 --- a/crates/bindings-csharp/Codegen/Module.cs +++ b/crates/bindings-csharp/Codegen/Module.cs @@ -443,10 +443,10 @@ public static void Main() { // Exports only work from the main assembly, so we need to generate forwarding methods. 
#if EXPERIMENTAL_WASM_AOT [UnmanagedCallersOnly(EntryPoint = "__describe_module__")] - public static SpacetimeDB.Internal.Buffer __describe_module__() => SpacetimeDB.Internal.Module.__describe_module__(); + public static void __describe_module__(SpacetimeDB.Internal.BytesSink d) => SpacetimeDB.Internal.Module.__describe_module__(d); [UnmanagedCallersOnly(EntryPoint = "__call_reducer__")] - public static SpacetimeDB.Internal.Buffer __call_reducer__( + public static short __call_reducer__( uint id, ulong sender_0, ulong sender_1, @@ -455,7 +455,8 @@ public static SpacetimeDB.Internal.Buffer __call_reducer__( ulong address_0, ulong address_1, SpacetimeDB.Internal.DateTimeOffsetRepr timestamp, - SpacetimeDB.Internal.Buffer args + SpacetimeDB.Internal.BytesSource args, + SpacetimeDB.Internal.BytesSink error ) => SpacetimeDB.Internal.Module.__call_reducer__( id, sender_0, @@ -465,7 +466,8 @@ SpacetimeDB.Internal.Buffer args address_0, address_0, timestamp, - args + args, + error ); #endif } diff --git a/crates/bindings-csharp/Runtime/Exceptions.cs b/crates/bindings-csharp/Runtime/Exceptions.cs index be6c287798b..3b217635e47 100644 --- a/crates/bindings-csharp/Runtime/Exceptions.cs +++ b/crates/bindings-csharp/Runtime/Exceptions.cs @@ -32,11 +32,16 @@ public class NoSuchBytesException : StdbException public override string Message => "The provided bytes source or sink does not exist"; } +public class NoSpaceException : StdbException +{ + public override string Message => "The provided bytes sink has no more room left"; +} + public class UnknownException : StdbException { - private readonly FFI.CheckedStatus.Errno code; + private readonly FFI.Errno code; - internal UnknownException(FFI.CheckedStatus.Errno code) => this.code = code; + internal UnknownException(FFI.Errno code) => this.code = code; public override string Message => $"SpacetimeDB error code {code}"; } diff --git a/crates/bindings-csharp/Runtime/Internal/FFI.cs b/crates/bindings-csharp/Runtime/Internal/FFI.cs 
index 03374ee7e90..6646a9620a2 100644 --- a/crates/bindings-csharp/Runtime/Internal/FFI.cs +++ b/crates/bindings-csharp/Runtime/Internal/FFI.cs @@ -16,22 +16,8 @@ public readonly record struct BytesSource(uint Handle) // forwarding in the codegen for `__describe_module__` and `__call_reducer__` exports which both // use this type. [StructLayout(LayoutKind.Sequential)] -[NativeMarshalling(typeof(Marshaller))] -public readonly record struct Buffer(uint Handle) +public readonly record struct BytesSink(uint Handle) { - public static readonly Buffer INVALID = new(uint.MaxValue); - - // We need custom marshaller for `Buffer` because we return it by value - // instead of passing an `out` reference, and C# currently doesn't match - // the common Wasm C ABI in that a struct with a single field is supposed - // to have the same ABI as the field itself. - [CustomMarshaller(typeof(Buffer), MarshalMode.Default, typeof(Marshaller))] - internal static class Marshaller - { - public static Buffer ConvertToManaged(uint buf_handle) => new(buf_handle); - - public static uint ConvertToUnmanaged(Buffer buf) => buf.Handle; - } } #pragma warning disable IDE1006 // Naming Styles - Not applicable to FFI stuff. @@ -48,19 +34,21 @@ internal static partial class FFI #endif ; + public enum Errno : ushort + { + OK = 0, + HOST_CALL_FAILURE = 1, + NO_SUCH_TABLE = 4, + LOOKUP_NOT_FOUND = 2, + NO_SUCH_BYTES = 8, + NO_SPACE = 9, + BUFFER_TOO_SMALL = 11, + UNIQUE_ALREADY_EXISTS = 12, + } + [NativeMarshalling(typeof(Marshaller))] public struct CheckedStatus { - public enum Errno : ushort - { - OK = 0, - NO_SUCH_TABLE = 1, - LOOKUP_NOT_FOUND = 2, - UNIQUE_ALREADY_EXISTS = 3, - BUFFER_TOO_SMALL = 4, - NO_SUCH_BYTES = 8, - } - // This custom marshaller takes care of checking the status code // returned from the host and throwing an exception if it's not 0. 
// The only reason it doesn't return `void` is because the C# compiler @@ -86,6 +74,7 @@ public static CheckedStatus ConvertToManaged(Errno status) Errno.UNIQUE_ALREADY_EXISTS => new UniqueAlreadyExistsException(), Errno.BUFFER_TOO_SMALL => new BufferTooSmallException(), Errno.NO_SUCH_BYTES => new NoSuchBytesException(), + Errno.NO_SPACE => new NoSpaceException(), _ => new UnknownException(status), }; } @@ -197,9 +186,9 @@ uint message_len public static partial short _bytes_source_read( BytesSource source, Span buffer, - ref uint buffer_len_ptr + ref uint buffer_len ); [LibraryImport(StdbNamespace)] - public static partial Buffer _buffer_alloc([In] byte[] data, uint data_len); + public static partial CheckedStatus _bytes_sink_write(BytesSink sink, Span buffer, ref uint buffer_len); } diff --git a/crates/bindings-csharp/Runtime/Internal/Module.cs b/crates/bindings-csharp/Runtime/Internal/Module.cs index d4ffe2b275c..9e60a778359 100644 --- a/crates/bindings-csharp/Runtime/Internal/Module.cs +++ b/crates/bindings-csharp/Runtime/Internal/Module.cs @@ -252,15 +252,33 @@ private static byte[] Consume(this BytesSource source) // but a module should be prepared for it. case 0: break; + case (short)(ushort)FFI.Errno.NO_SUCH_BYTES: + throw new NoSuchBytesException(); default: - throw new UnreachableException(); + throw new UnknownException((FFI.Errno)(ushort)ret); + } + } + } + + private static void Write(this BytesSink sink, byte[] bytes) + { + var start = 0U; + while (true) + { + var written = (uint)bytes.Length; + var buffer = bytes.AsSpan((int)start); + FFI._bytes_sink_write(sink, buffer, ref written); + start += written; + if (start == bytes.Length) + { + return; } } } #pragma warning disable IDE1006 // Naming Styles - methods below are meant for FFI. 
- public static Buffer __describe_module__() + public static void __describe_module__(BytesSink description) { // replace `module` with a temporary internal module that will register RawModuleDefV8, AlgebraicType and other internal types // during the RawModuleDefV8.GetSatsTypeInfo() instead of exposing them via user's module. @@ -269,17 +287,15 @@ public static Buffer __describe_module__() // We need this explicit cast here to make `ToBytes` understand the types correctly. RawModuleDef versioned = new RawModuleDef.V8BackCompat(moduleDef); var moduleBytes = IStructuralReadWrite.ToBytes(new RawModuleDef.BSATN(), versioned); - var res = FFI._buffer_alloc(moduleBytes, (uint)moduleBytes.Length); - return res; + description.Write(moduleBytes); } catch (Exception e) { Runtime.Log($"Error while describing the module: {e}", Runtime.LogLevel.Error); - return Buffer.INVALID; } } - public static Buffer __call_reducer__( + public static short __call_reducer__( uint id, ulong sender_0, ulong sender_1, @@ -288,7 +304,8 @@ public static Buffer __call_reducer__( ulong address_0, ulong address_1, DateTimeOffsetRepr timestamp, - BytesSource args + BytesSource args, + BytesSink error ) { // Piece together the sender identity. 
@@ -308,14 +325,14 @@ BytesSource args { throw new Exception("Unrecognised extra bytes in the reducer arguments"); } - return /* no exception */ - Buffer.INVALID; + return 0; /* no exception */ } catch (Exception e) { var error_str = e.ToString(); var error_bytes = System.Text.Encoding.UTF8.GetBytes(error_str); - return FFI._buffer_alloc(error_bytes, (uint)error_bytes.Length); + error.Write(error_bytes); + return (short)(ushort)FFI.Errno.HOST_CALL_FAILURE; } } } diff --git a/crates/bindings-csharp/Runtime/bindings.c b/crates/bindings-csharp/Runtime/bindings.c index 74607aa694c..c272d0ea1d5 100644 --- a/crates/bindings-csharp/Runtime/bindings.c +++ b/crates/bindings-csharp/Runtime/bindings.c @@ -18,7 +18,7 @@ OPAQUE_TYPEDEF(TableId, uint32_t); OPAQUE_TYPEDEF(ColId, uint16_t); OPAQUE_TYPEDEF(IndexType, uint8_t); OPAQUE_TYPEDEF(LogLevel, uint8_t); -OPAQUE_TYPEDEF(Buffer, uint32_t); +OPAQUE_TYPEDEF(BytesSink, uint32_t); OPAQUE_TYPEDEF(BytesSource, uint32_t); OPAQUE_TYPEDEF(RowIter, uint32_t); @@ -71,7 +71,8 @@ IMPORT(Status, _iter_advance, IMPORT(void, _iter_drop, (RowIter iter), (iter)); IMPORT(int16_t, _bytes_source_read, (BytesSource source, uint8_t* buffer_ptr, size_t* buffer_len_ptr), (source, buffer_ptr, buffer_len_ptr)); -IMPORT(Buffer, _buffer_alloc, (const uint8_t* data, uint32_t len), (data, len)); +IMPORT(uint16_t, _bytes_sink_write, (BytesSink sink, uint8_t* buffer_ptr, size_t* buffer_len_ptr), + (sink, buffer_ptr, buffer_len_ptr)); #ifndef EXPERIMENTAL_WASM_AOT static MonoClass* ffi_class; @@ -94,7 +95,7 @@ PREINIT(10, startup) { "FFI export class (SpacetimeDB.Internal.Module) not found"); } -#define EXPORT(ret, name, params, args...) \ +#define EXPORT_WITH_MONO_RES(ret, res_code, name, params, args...) 
\ static MonoMethod* ffi_method_##name; \ PREINIT(20, find_##name) { \ ffi_method_##name = mono_wasm_assembly_find_method(ffi_class, #name, -1); \ @@ -104,20 +105,26 @@ PREINIT(10, startup) { MonoObject* res; \ mono_wasm_invoke_method_ref(ffi_method_##name, NULL, (void*[]){args}, \ NULL, &res); \ - return *(ret*)mono_object_unbox(res); \ + res_code \ } -EXPORT(Buffer, __describe_module__, ()); +#define EXPORT(ret, name, params, args...) \ + EXPORT_WITH_MONO_RES(ret, return *(ret*)mono_object_unbox(res);, name, params, args) \ -EXPORT(Buffer, __call_reducer__, +#define EXPORT_VOID(name, params, args...) \ + EXPORT_WITH_MONO_RES(void, return;, name, params, args) \ + +EXPORT_VOID(__describe_module__, (BytesSink description), &description); + +EXPORT(int16_t, __call_reducer__, (uint32_t id, uint64_t sender_0, uint64_t sender_1, uint64_t sender_2, uint64_t sender_3, uint64_t address_0, uint64_t address_1, - uint64_t timestamp, Buffer args), + uint64_t timestamp, BytesSource args, BytesSink error), &id, &sender_0, &sender_1, &sender_2, &sender_3, &address_0, &address_1, - &timestamp, &args); + &timestamp, &args, &error); #endif // Shims to avoid dependency on WASI in the generated Wasm file. 
diff --git a/crates/bindings-csharp/Runtime/build/SpacetimeDB.Runtime.targets b/crates/bindings-csharp/Runtime/build/SpacetimeDB.Runtime.targets index 1a779f6e182..0b48a3ca6d7 100644 --- a/crates/bindings-csharp/Runtime/build/SpacetimeDB.Runtime.targets +++ b/crates/bindings-csharp/Runtime/build/SpacetimeDB.Runtime.targets @@ -15,7 +15,7 @@ - + diff --git a/crates/bindings-macro/src/lib.rs b/crates/bindings-macro/src/lib.rs index 68344be7e5d..d31873114aa 100644 --- a/crates/bindings-macro/src/lib.rs +++ b/crates/bindings-macro/src/lib.rs @@ -377,7 +377,7 @@ fn gen_reducer(original_function: ItemFn, reducer_name: &str) -> syn::Result spacetimedb::sys::Buffer { + fn __reducer(__ctx: spacetimedb::ReducerContext, __args: &[u8]) -> spacetimedb::ReducerResult { #(spacetimedb::rt::assert_reducer_arg::<#arg_tys>();)* #(spacetimedb::rt::assert_reducer_ret::<#ret_ty>();)* spacetimedb::rt::invoke_reducer(#func_name, __ctx, __args) diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index f020457f8e7..20729ca5fdc 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -20,20 +20,6 @@ pub mod raw { // For breaking changes, all functions should be moved into one new `spacetime_X.0` block. #[link(wasm_import_module = "spacetime_10.0")] extern "C" { - /* - /// Create a table with `name`, a UTF-8 slice in WASM memory lasting `name_len` bytes, - /// and with the table's `schema` in a slice in WASM memory lasting `schema_len` bytes. - /// - /// Writes the table id of the new table into the WASM pointer `out`. - pub fn _create_table( - name: *const u8, - name_len: usize, - schema: *const u8, - schema_len: usize, - out: *mut TableId, - ) -> u16; - */ - /// Queries the `table_id` associated with the given (table) `name` /// where `name` points to a UTF-8 slice in WASM memory of `name_len` bytes. /// @@ -223,32 +209,25 @@ pub mod raw { /// This assumes that the reducer hasn't already been executed. 
pub fn _cancel_reducer(id: u64); - /// Returns the length (number of bytes) of buffer `bufh` without - /// transferring ownership of the data into the function. + /// Writes up to `buffer_len` bytes from `buffer = buffer_ptr[..buffer_len]`, + /// to the `sink`, registered in the host environment. /// - /// The `bufh` must have previously been allocating using `_buffer_alloc`. + /// The `buffer_len = buffer_len_ptr[..size_of::()]` stores the capacity of `buffer`. + /// On success (`0` is returned), + /// `buffer_len` is set to the number of bytes written to `sink`. /// - /// Traps if the buffer does not exist. - pub fn _buffer_len(bufh: Buffer) -> usize; - - /// Consumes the `buffer`, - /// moving its contents to the slice `(dst, dst_len)`. + /// # Traps /// - /// Traps if - /// - the buffer does not exist - /// - `dst + dst_len` overflows a 64-bit integer - pub fn _buffer_consume(buffer: Buffer, dst: *mut u8, dst_len: usize); - - /// Creates a buffer of size `data_len` in the host environment. + /// - `buffer_len_ptr` is NULL or `buffer_len` is not in bounds of WASM memory. + /// - `buffer_ptr` is NULL or `buffer` is not in bounds of WASM memory. /// - /// The contents of the byte slice pointed to by `data` - /// and lasting `data_len` bytes - /// is written into the newly initialized buffer. + /// # Errors /// - /// The buffer is registered in the host environment and is indexed by the returned `u32`. + /// Returns an error: /// - /// Traps if `data + data_len` overflows a 64-bit integer. - pub fn _buffer_alloc(data: *const u8, data_len: usize) -> Buffer; + /// - `NO_SUCH_BYTES`, when `sink` is not a valid bytes sink. + /// - `NO_SPACE`, when there is no room for more bytes in `sink`. + pub fn _bytes_sink_write(sink: BytesSink, buffer_ptr: *const u8, buffer_len_ptr: *mut usize) -> u16; /// Reads bytes from `source`, registered in the host environment, /// and stores them in the memory pointed to by `buffer = buffer_ptr[..buffer_len]`. 
@@ -373,19 +352,12 @@ pub mod raw { pub const INVALID: Self = Self(0); } - /// A handle into a buffer of bytes in the host environment. + /// A handle into a buffer of bytes in the host environment that can be written to. /// - /// Used for transporting bytes host <-> WASM linear memory. + /// Used for transporting bytes from WASM linear memory to host. #[derive(PartialEq, Eq, Copy, Clone)] #[repr(transparent)] - pub struct Buffer(u32); - - /// An invalid buffer handle. - /// - /// Could happen if too many buffers exist, making the key overflow a `u32`. - /// `INVALID_BUFFER` is also used for parts of the protocol - /// that are "morally" sending a `None`s in `Option>`s. - pub const INVALID_BUFFER: Buffer = Buffer(u32::MAX); + pub struct BytesSink(u32); /// Represents table iterators. #[derive(PartialEq, Eq, Copy, Clone)] @@ -696,24 +668,6 @@ pub fn cancel_reducer(id: u64) { unsafe { raw::_cancel_reducer(id) } } -/// A RAII wrapper around [`raw::Buffer`]. -#[repr(transparent)] -pub struct Buffer { - raw: raw::Buffer, -} - -impl Buffer { - pub const INVALID: Self = Buffer { - raw: raw::INVALID_BUFFER, - }; - - /// Allocates a buffer with the contents of `data`. - pub fn alloc(data: &[u8]) -> Self { - let raw = unsafe { raw::_buffer_alloc(data.as_ptr(), data.len()) }; - Buffer { raw } - } -} - pub struct RowIter { raw: raw::RowIter, } diff --git a/crates/bindings/src/lib.rs b/crates/bindings/src/lib.rs index 006222b95b8..037c5936835 100644 --- a/crates/bindings/src/lib.rs +++ b/crates/bindings/src/lib.rs @@ -47,6 +47,8 @@ pub use timestamp::Timestamp; pub type Result = core::result::Result; +pub type ReducerResult = core::result::Result<(), Box>; + /// A context that any reducer is provided with. 
#[non_exhaustive] #[derive(Copy, Clone)] diff --git a/crates/bindings/src/rt.rs b/crates/bindings/src/rt.rs index 244d66f2a20..080f2f155ed 100644 --- a/crates/bindings/src/rt.rs +++ b/crates/bindings/src/rt.rs @@ -1,14 +1,7 @@ #![deny(unsafe_op_in_unsafe_fn)] -use std::fmt; -use std::marker::PhantomData; -use std::sync::{Mutex, OnceLock}; -use std::time::Duration; -use sys::raw::BytesSource; -use sys::Buffer; - use crate::timestamp::with_timestamp_set; -use crate::{return_iter_buf, sys, take_iter_buf, ReducerContext, SpacetimeType, TableType, Timestamp}; +use crate::{return_iter_buf, sys, take_iter_buf, ReducerContext, ReducerResult, SpacetimeType, TableType, Timestamp}; use spacetimedb_lib::db::auth::StTableType; use spacetimedb_lib::db::raw_def::{ RawColumnDefV8, RawConstraintDefV8, RawIndexDefV8, RawSequenceDefV8, RawTableDefV8, @@ -19,6 +12,11 @@ use spacetimedb_lib::sats::{impl_deserialize, impl_serialize, ProductTypeElement use spacetimedb_lib::ser::{Serialize, SerializeSeqProduct}; use spacetimedb_lib::{bsatn, Address, Identity, ModuleDefBuilder, RawModuleDef, ReducerDef, TableDesc}; use spacetimedb_primitives::*; +use std::fmt; +use std::marker::PhantomData; +use std::sync::{Mutex, OnceLock}; +use std::time::Duration; +use sys::raw::{BytesSink, BytesSource}; /// The `sender` invokes `reducer` at `timestamp` and provides it with the given `args`. /// @@ -28,7 +26,7 @@ pub fn invoke_reducer<'a, A: Args<'a>, T>( reducer: impl Reducer<'a, A, T>, ctx: ReducerContext, args: &'a [u8], -) -> Buffer { +) -> Result<(), Box> { // Deserialize the arguments from a bsatn encoding. let SerDeArgs(args) = bsatn::from_slice(args).expect("unable to decode args"); @@ -36,26 +34,13 @@ pub fn invoke_reducer<'a, A: Args<'a>, T>( let invoke = || reducer.invoke(ctx, args); #[cfg(feature = "rand")] let invoke = || crate::rng::with_rng_set(invoke); - let res = with_timestamp_set(ctx.timestamp, invoke); - - // Any error is pushed into a `Buffer`. 
- cvt_result(res) + with_timestamp_set(ctx.timestamp, invoke) } - -/// Converts `res` into a `Buffer` where `Ok(_)` results in an invalid buffer -/// and an error message is moved into a fresh buffer. -fn cvt_result(res: Result<(), Box>) -> Buffer { - match res { - Ok(()) => Buffer::INVALID, - Err(errmsg) => Buffer::alloc(errmsg.as_bytes()), - } -} - /// A trait for types representing the *execution logic* of a reducer. /// /// The type parameter `T` is used for determining whether there is a context argument. pub trait Reducer<'de, A: Args<'de>, T> { - fn invoke(&self, ctx: ReducerContext, args: A) -> Result<(), Box>; + fn invoke(&self, ctx: ReducerContext, args: A) -> ReducerResult; } /// A trait for types that can *describe* a reducer. @@ -92,18 +77,18 @@ pub trait Args<'de>: Sized { } /// A trait of types representing the result of executing a reducer. -pub trait ReducerResult { +pub trait IntoReducerResult { /// Convert the result into form where there is no value /// and the error message is a string. fn into_result(self) -> Result<(), Box>; } -impl ReducerResult for () { +impl IntoReducerResult for () { #[inline] fn into_result(self) -> Result<(), Box> { Ok(self) } } -impl ReducerResult for Result<(), E> { +impl IntoReducerResult for Result<(), E> { #[inline] fn into_result(self) -> Result<(), Box> { self.map_err(|e| format!("{e:?}").into()) @@ -116,8 +101,8 @@ impl<'de, T: Deserialize<'de>> ReducerArg<'de> for T {} impl ReducerArg<'_> for ReducerContext {} /// Assert that `T: ReducerArg`. pub fn assert_reducer_arg<'de, T: ReducerArg<'de>>() {} -/// Assert that `T: ReducerResult`. -pub fn assert_reducer_ret() {} +/// Assert that `T: IntoReducerResult`. +pub fn assert_reducer_ret() {} /// Assert that `T: TableType`. pub const fn assert_table() {} @@ -206,7 +191,7 @@ macro_rules! 
impl_reducer { impl<'de, Func, Ret, $($T: SpacetimeType + Deserialize<'de> + Serialize),*> Reducer<'de, ($($T,)*), ContextArg> for Func where Func: Fn(ReducerContext, $($T),*) -> Ret, - Ret: ReducerResult + Ret: IntoReducerResult { fn invoke(&self, ctx: ReducerContext, args: ($($T,)*)) -> Result<(), Box> { #[allow(non_snake_case)] @@ -219,7 +204,7 @@ macro_rules! impl_reducer { impl<'de, Func, Ret, $($T: SpacetimeType + Deserialize<'de> + Serialize),*> Reducer<'de, ($($T,)*), NoContextArg> for Func where Func: Fn($($T),*) -> Ret, - Ret: ReducerResult + Ret: IntoReducerResult { fn invoke(&self, _ctx: ReducerContext, args: ($($T,)*)) -> Result<(), Box> { #[allow(non_snake_case)] @@ -376,13 +361,28 @@ struct ModuleBuilder { // Not actually a mutex; because WASM is single-threaded this basically just turns into a refcell. static DESCRIBERS: Mutex> = Mutex::new(Vec::new()); -/// A reducer function takes in `(Sender, Timestamp, Args)` and writes to a new `Buffer`. -pub type ReducerFn = fn(ReducerContext, &[u8]) -> Buffer; +/// A reducer function takes in `(Sender, Timestamp, Args)` +/// and returns a result with a possible error message. +pub type ReducerFn = fn(ReducerContext, &[u8]) -> ReducerResult; static REDUCERS: OnceLock> = OnceLock::new(); -/// Describes the module into a serialized form that is returned and writes the set of `REDUCERS`. +/// Called by the host when the module is initialized +/// to describe the module into a serialized form that is returned. +/// +/// This is also the module's opportunity to ready `__call_reducer__` +/// (by writing the set of `REDUCERS`). +/// +/// To `description`, a BSATN-encoded `ModuleDef` should be written. +/// For the time being, the definition of `ModuleDef` is not stabilized, +/// as it is being changed by the schema proposal. +/// +/// The `ModuleDef` is used to define tables, constraints, indices, reducers, etc. 
+/// This affords the module the opportunity +/// to define and, to a limited extent, alter the schema at initialization time, +/// including when modules are updated (re-publishing). +/// After initialization, the module cannot alter the schema. #[no_mangle] -extern "C" fn __describe_module__() -> Buffer { +extern "C" fn __describe_module__(description: BytesSink) { // Collect the `module`. let mut module = ModuleBuilder::default(); for describer in &*DESCRIBERS.lock().unwrap() { @@ -397,8 +397,8 @@ extern "C" fn __describe_module__() -> Buffer { // Write the set of reducers. REDUCERS.set(module.reducers).ok().unwrap(); - // Allocate the bsatn data into a fresh buffer. - Buffer::alloc(&bytes) + // Write the bsatn data into the sink. + write_to_sink(description, &bytes); } // TODO(1.0): update `__call_reducer__` docs + for `BytesSink`. @@ -423,7 +423,12 @@ extern "C" fn __describe_module__() -> Buffer { /// The contents of the buffer are the BSATN-encoding of the arguments to the reducer. /// In the case of empty arguments, `args` will be 0, that is, invalid. /// -/// The result of the reducer is written into a fresh buffer. +/// The `error` is a `BytesSink`, registered on the host side, +/// which can be written to with `bytes_sink_write`. +/// When `error` is written to, +/// it is expected that `HOST_CALL_FAILURE` is returned. +/// Otherwise, `0` should be returned, i.e., the reducer completed successfully. +/// Note that in the future, more failure codes could be supported. #[no_mangle] extern "C" fn __call_reducer__( id: usize, @@ -435,7 +440,8 @@ extern "C" fn __call_reducer__( address_1: u64, timestamp: u64, args: BytesSource, -) -> Buffer { + error: BytesSink, +) -> i16 { // Piece together `sender_i` into an `Identity`. let sender = [sender_0, sender_1, sender_2, sender_3]; let sender: [u8; 32] = bytemuck::must_cast(sender); @@ -456,8 +462,18 @@ extern "C" fn __call_reducer__( address, }; + // Fetch reducer function. 
let reducers = REDUCERS.get().unwrap(); - with_read_args(args, |args| reducers[id](ctx, args)) + // Dispatch to it with the arguments read. + let res = with_read_args(args, |args| reducers[id](ctx, args)); + // Convert any error message to an error code and writes to the `error` sink. + match res { + Ok(()) => 0, + Err(msg) => { + write_to_sink(error, msg.as_bytes()); + errno::HOST_CALL_FAILURE.get() as i16 + } + } } /// Run `logic` with `args` read from the host into a `&[u8]`. @@ -512,6 +528,28 @@ fn read_bytes_source_into(source: BytesSource, buf: &mut Vec) { } } +/// Write `buf` to `sink`. +fn write_to_sink(sink: BytesSink, mut buf: &[u8]) { + const NO_SPACE: u16 = errno::NO_SPACE.get(); + const NO_SUCH_BYTES: u16 = errno::NO_SUCH_BYTES.get(); + + loop { + let len = &mut buf.len(); + match unsafe { sys::raw::_bytes_sink_write(sink, buf.as_ptr(), len) } { + 0 => { + // Set `buf` to remainder and bail if it's empty. + (_, buf) = buf.split_at(*len); + if buf.is_empty() { + break; + } + } + NO_SUCH_BYTES => panic!("invalid sink passed"), + NO_SPACE => panic!("no space left at sink"), + _ => unreachable!(), + } + } +} + #[macro_export] #[doc(hidden)] macro_rules! 
__make_register_reftype { diff --git a/crates/cli/src/subcommands/generate/mod.rs b/crates/cli/src/subcommands/generate/mod.rs index b067bc643f2..e88251d278f 100644 --- a/crates/cli/src/subcommands/generate/mod.rs +++ b/crates/cli/src/subcommands/generate/mod.rs @@ -1,20 +1,22 @@ #![warn(clippy::uninlined_format_args)] -use std::fs; -use std::io::Write; -use std::ops::Deref; -use std::path::{Path, PathBuf}; - use clap::Arg; use clap::ArgAction::SetTrue; use convert_case::{Case, Casing}; +use core::mem; use duct::cmd; +use spacetimedb::host::wasmtime::{Mem, MemView, WasmPointee as _}; use spacetimedb_lib::db::raw_def::RawColumnDefV8; use spacetimedb_lib::de::serde::DeserializeWrapper; use spacetimedb_lib::sats::{AlgebraicType, Typespace}; use spacetimedb_lib::{bsatn, MiscModuleExport, RawModuleDefV8, ReducerDef, TableDesc, TypeAlias}; use spacetimedb_lib::{RawModuleDef, MODULE_ABI_MAJOR_VERSION}; -use wasmtime::{AsContext, Caller}; +use spacetimedb_primitives::errno; +use std::fs; +use std::io::Write; +use std::ops::Deref; +use std::path::{Path, PathBuf}; +use wasmtime::{Caller, StoreContextMut}; use crate::Config; @@ -402,7 +404,7 @@ pub fn extract_descriptions(wasm_file: &Path) -> anyhow::Result println!("compilation took {:?}", t.elapsed()); let ctx = WasmCtx { mem: None, - buffers: slab::Slab::new(), + sink: Vec::new(), }; let mut store = wasmtime::Store::new(&engine, ctx); let mut linker = wasmtime::Linker::new(&engine); @@ -411,7 +413,7 @@ pub fn extract_descriptions(wasm_file: &Path) -> anyhow::Result linker.func_wrap( module_name, "_console_log", - |caller: Caller<'_, WasmCtx>, + |mut caller: Caller<'_, WasmCtx>, _level: u32, _target: u32, _target_len: u32, @@ -420,20 +422,14 @@ pub fn extract_descriptions(wasm_file: &Path) -> anyhow::Result _line_number: u32, message: u32, message_len: u32| { - let mem = caller.data().mem.unwrap(); - let slice = mem.deref_slice(&caller, message, message_len); - if let Some(slice) = slice { - println!("from wasm: {}", 
String::from_utf8_lossy(slice)); - } else { - println!("tried to print from wasm but out of bounds") - } + let (mem, _) = WasmCtx::mem_env(&mut caller); + let slice = mem.deref_slice(message, message_len).unwrap(); + println!("from wasm: {}", String::from_utf8_lossy(slice)); }, )?; - linker.func_wrap(module_name, "_buffer_alloc", WasmCtx::buffer_alloc)?; + linker.func_wrap(module_name, "_bytes_sink_write", WasmCtx::bytes_sink_write)?; let instance = linker.instantiate(&mut store, &module)?; - let memory = Memory { - mem: instance.get_memory(&mut store, "memory").unwrap(), - }; + let memory = Mem::extract(&instance, &mut store)?; store.data_mut().mem = Some(memory); let mut preinits = instance @@ -446,9 +442,10 @@ pub fn extract_descriptions(wasm_file: &Path) -> anyhow::Result } let module: RawModuleDef = match instance.get_func(&mut store, "__describe_module__") { Some(f) => { - let buf: u32 = f.typed(&store)?.call(&mut store, ()).unwrap(); - let slice = store.data_mut().buffers.remove(buf as usize); - bsatn::from_slice(&slice)? + store.data_mut().sink = Vec::new(); + f.typed::(&store)?.call(&mut store, 1).unwrap(); + let buf = mem::take(&mut store.data_mut().sink); + bsatn::from_slice(&buf)? } // TODO: shouldn't we return an error here? 
None => RawModuleDef::V8BackCompat(RawModuleDefV8::default()), @@ -461,36 +458,40 @@ pub fn extract_descriptions(wasm_file: &Path) -> anyhow::Result } struct WasmCtx { - mem: Option, - buffers: slab::Slab>, + mem: Option, + sink: Vec, } impl WasmCtx { - fn mem(&self) -> Memory { - self.mem.unwrap() + pub fn get_mem(&self) -> Mem { + self.mem.expect("Initialized memory") } - fn buffer_alloc(mut caller: Caller<'_, Self>, data: u32, data_len: u32) -> u32 { - let buf = caller - .data() - .mem() - .deref_slice(&caller, data, data_len) - .unwrap() - .to_vec(); - caller.data_mut().buffers.insert(buf) as u32 + + fn mem_env<'a>(ctx: impl Into>) -> (&'a mut MemView, &'a mut Self) { + let ctx = ctx.into(); + let mem = ctx.data().get_mem(); + mem.view_and_store_mut(ctx) } -} -#[derive(Copy, Clone)] -struct Memory { - mem: wasmtime::Memory, -} + pub fn bytes_sink_write( + mut caller: Caller<'_, Self>, + sink_handle: u32, + buffer_ptr: u32, + buffer_len_ptr: u32, + ) -> anyhow::Result { + if sink_handle != 1 { + return Ok(errno::NO_SUCH_BYTES.get().into()); + } + + let (mem, env) = Self::mem_env(&mut caller); + + // Read `buffer_len`, i.e., the capacity of `buffer` pointed to by `buffer_ptr`. + let buffer_len = u32::read_from(mem, buffer_len_ptr)?; + // Write `buffer` to `sink`. + let buffer = mem.deref_slice(buffer_ptr, buffer_len)?; + env.sink.extend(buffer); -impl Memory { - fn deref_slice<'a>(&self, store: &'a impl AsContext, offset: u32, len: u32) -> Option<&'a [u8]> { - self.mem - .data(store.as_context()) - .get(offset as usize..)? - .get(..len as usize) + Ok(0) } } diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 88ae8febbdd..19b7071a602 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -15,7 +15,7 @@ mod host_controller; #[allow(clippy::too_many_arguments)] pub mod module_host; pub mod scheduler; -mod wasmtime; +pub mod wasmtime; // Visible for integration testing. 
pub mod instance_env; mod wasm_common; @@ -142,6 +142,7 @@ fn from_json_seed<'de, T: serde::de::DeserializeSeed<'de>>(s: &'de str, seed: T) #[derive(Debug, Display, Enum, Clone, Copy, strum::AsRefStr)] pub enum AbiCall { BytesSourceRead, + BytesSinkWrite, CancelReducer, ConsoleLog, diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index ae27ba248fa..6d26d6e0bc3 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -113,8 +113,8 @@ pub trait FuncSigLike: PartialEq { } const PREINIT_SIG: StaticFuncSig = FuncSig::new(&[], &[]); -const INIT_SIG: StaticFuncSig = FuncSig::new(&[], &[WasmType::I32]); -const DESCRIBE_MODULE_SIG: StaticFuncSig = FuncSig::new(&[], &[WasmType::I32]); +const INIT_SIG: StaticFuncSig = FuncSig::new(&[WasmType::I32], &[WasmType::I32]); +const DESCRIBE_MODULE_SIG: StaticFuncSig = FuncSig::new(&[WasmType::I32], &[]); const CALL_REDUCER_SIG: StaticFuncSig = FuncSig::new( &[ WasmType::I32, // Reducer ID @@ -131,10 +131,11 @@ const CALL_REDUCER_SIG: StaticFuncSig = FuncSig::new( WasmType::I64, // `address_1` contains bytes `[8..16]`. // ---------------------------------------------------- WasmType::I64, // Timestamp - WasmType::I32, // Args buffer + WasmType::I32, // Args source buffer + WasmType::I32, // Errors sink buffer ], &[ - WasmType::I32, // Result buffer + WasmType::I32, // Result code ], ); @@ -294,21 +295,6 @@ impl ResourceSlab { pub fn take(&mut self, handle: I) -> Option { self.slab.try_remove(handle.to_u32() as usize) } - - pub fn clear(&mut self) { - self.slab.clear() - } -} - -decl_index!(BufferIdx => bytes::Bytes); -pub(super) type Buffers = ResourceSlab; - -impl BufferIdx { - pub const INVALID: Self = Self(u32::MAX); - - pub const fn is_invalid(&self) -> bool { - self.0 == Self::INVALID.0 - } } decl_index!(RowIterIdx => std::vec::IntoIter>); @@ -360,8 +346,9 @@ pub struct AbiRuntimeError { macro_rules! abi_funcs { ($mac:ident) => { $mac! 
{ - "spacetime_10.0"::buffer_alloc, "spacetime_10.0"::bytes_source_read, + "spacetime_10.0"::bytes_sink_write, + "spacetime_10.0"::console_log, "spacetime_10.0"::delete_by_col_eq, "spacetime_10.0"::delete_by_rel, diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 7f20cc0a97f..f94673ae62f 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -50,7 +50,7 @@ pub trait WasmInstancePre: Send + Sync + 'static { } pub trait WasmInstance: Send + Sync + 'static { - fn extract_descriptions(&mut self) -> Result; + fn extract_descriptions(&mut self) -> Result, DescribeError>; fn instance_env(&self) -> &InstanceEnv; @@ -121,8 +121,6 @@ pub enum DescribeError { Decode(#[from] DecodeError), #[error(transparent)] RuntimeError(anyhow::Error), - #[error("invalid buffer")] - BadBuffer, #[error("unimplemented RawModuleDef version")] UnimplementedRawModuleDefVersion, } diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index 2c8b5b5a645..e15d0254740 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -72,7 +72,7 @@ pub fn make_actor(mcc: ModuleCreationContext) -> Result for EnergyQuanta { } } -trait WasmPointee { +pub trait WasmPointee { type Pointer; - fn write_to(self, mem: &mut MemView, ptr: Self::Pointer) -> Result<(), WasmError>; - fn read_from(mem: &mut MemView, ptr: Self::Pointer) -> Result + fn write_to(self, mem: &mut MemView, ptr: Self::Pointer) -> Result<(), MemError>; + fn read_from(mem: &mut MemView, ptr: Self::Pointer) -> Result where Self: Sized; } @@ -119,31 +119,31 @@ macro_rules! 
impl_pointee { ($($t:ty),*) => { $(impl WasmPointee for $t { type Pointer = u32; - fn write_to(self, mem: &mut MemView, ptr: Self::Pointer) -> Result<(), WasmError> { + fn write_to(self, mem: &mut MemView, ptr: Self::Pointer) -> Result<(), MemError> { let bytes = self.to_le_bytes(); mem.deref_slice_mut(ptr, bytes.len() as u32)?.copy_from_slice(&bytes); Ok(()) } - fn read_from(mem: &mut MemView, ptr: Self::Pointer) -> Result { + fn read_from(mem: &mut MemView, ptr: Self::Pointer) -> Result { Ok(Self::from_le_bytes(*mem.deref_array(ptr)?)) } })* }; } impl_pointee!(u8, u16, u32, u64); -impl_pointee!(super::wasm_common::BufferIdx, super::wasm_common::RowIterIdx); +impl_pointee!(super::wasm_common::RowIterIdx); type WasmPtr = ::Pointer; /// Wraps access to WASM linear memory with some additional functionality. #[derive(Clone, Copy)] -struct Mem { +pub struct Mem { /// The underlying WASM `memory` instance. pub memory: wasmtime::Memory, } impl Mem { /// Constructs an instance of `Mem` from an exports map. - fn extract(exports: &wasmtime::Instance, store: impl wasmtime::AsContextMut) -> anyhow::Result { + pub fn extract(exports: &wasmtime::Instance, store: impl wasmtime::AsContextMut) -> anyhow::Result { Ok(Self { memory: exports.get_memory(store, "memory").context("no memory export")?, }) @@ -151,7 +151,7 @@ impl Mem { /// Creates and returns a view into the actual memory `store`. /// This view allows for reads and writes. 
- fn view_and_store_mut<'a, T>(&self, store: impl Into>) -> (&'a mut MemView, &'a mut T) { + pub fn view_and_store_mut<'a, T>(&self, store: impl Into>) -> (&'a mut MemView, &'a mut T) { let (mem, store_data) = self.memory.data_and_store_mut(store); (MemView::from_slice_mut(mem), store_data) } @@ -162,7 +162,7 @@ impl Mem { } #[repr(transparent)] -struct MemView([u8]); +pub struct MemView([u8]); impl MemView { fn from_slice_mut(v: &mut [u8]) -> &mut Self { @@ -175,7 +175,7 @@ impl MemView { } /// Get a byte slice of wasm memory given a pointer and a length. - fn deref_slice(&self, offset: WasmPtr, len: u32) -> Result<&[u8], MemError> { + pub fn deref_slice(&self, offset: WasmPtr, len: u32) -> Result<&[u8], MemError> { if offset == 0 { return Err(MemError::Null); } @@ -216,7 +216,7 @@ impl MemView { /// An error that can result from operations on [`MemView`]. #[derive(thiserror::Error, Debug)] -enum MemError { +pub enum MemError { #[error("out of bounds pointer passed to a spacetime function")] OutOfBounds, #[error("null pointer passed to a spacetime function")] @@ -234,14 +234,14 @@ impl From for WasmError { /// Extension trait to gracefully handle null `WasmPtr`s, e.g. /// `mem.deref_slice(ptr, len).check_nullptr()? == Option<&[u8]>`. 
trait NullableMemOp { - fn check_nullptr(self) -> Result, WasmError>; + fn check_nullptr(self) -> Result, MemError>; } impl NullableMemOp for Result { - fn check_nullptr(self) -> Result, WasmError> { + fn check_nullptr(self) -> Result, MemError> { match self { Ok(x) => Ok(Some(x)), Err(MemError::Null) => Ok(None), - Err(e) => Err(e.into()), + Err(e) => Err(e), } } } diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 08730910689..3f17e5834e6 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -8,8 +8,8 @@ use crate::execution_context::ExecutionContext; use crate::host::wasm_common::instrumentation; use crate::host::wasm_common::module_host_actor::ExecutionTimings; use crate::host::wasm_common::{ - err_to_errno, instrumentation::CallTimes, AbiRuntimeError, BufferIdx, Buffers, RowIterIdx, RowIters, TimingSpan, - TimingSpanIdx, TimingSpanSet, + err_to_errno, instrumentation::CallTimes, AbiRuntimeError, RowIterIdx, RowIters, TimingSpan, TimingSpanIdx, + TimingSpanSet, }; use crate::host::AbiCall; use anyhow::Context as _; @@ -53,8 +53,8 @@ pub(super) struct WasmInstanceEnv { /// that it can read via [`Self::bytes_source_read`]. call_reducer_args: Option<(bytes::Bytes, usize)>, - /// The slab of `Buffers` created for this instance. - buffers: Buffers, + /// The standard sink used for [`Self::bytes_sink_write`]. + standard_bytes_sink: Option>, /// The slab of `BufferIters` created for this instance. 
iters: RowIters, @@ -76,6 +76,7 @@ pub(super) struct WasmInstanceEnv { } const CALL_REDUCER_ARGS_SOURCE: u32 = 1; +const STANDARD_BYTES_SINK: u32 = 1; type WasmResult = Result; type RtResult = anyhow::Result; @@ -90,7 +91,7 @@ impl WasmInstanceEnv { instance_env, mem: None, call_reducer_args: None, - buffers: Default::default(), + standard_bytes_sink: None, iters: Default::default(), timing_spans: Default::default(), reducer_start, @@ -121,49 +122,60 @@ impl WasmInstanceEnv { &self.instance_env } - /// Take ownership of a particular `Buffer` from this instance. - pub fn take_buffer(&mut self, idx: BufferIdx) -> Option { - self.buffers.take(idx) + /// Setup the standard bytes sink and return a handle to it for writing. + pub fn setup_standard_bytes_sink(&mut self) -> u32 { + self.standard_bytes_sink = Some(Vec::new()); + STANDARD_BYTES_SINK + } + + /// Extract all the bytes written to the standard bytes sink + /// and prevent further writes to it. + pub fn take_standard_bytes_sink(&mut self) -> Vec { + self.standard_bytes_sink.take().unwrap_or_default() } /// Signal to this `WasmInstanceEnv` that a reducer call is beginning. /// - /// Returns the handle used by reducers to read from `args`. - pub fn start_reducer(&mut self, name: &str, args: bytes::Bytes) -> u32 { - self.call_reducer_args = (!args.is_empty()).then_some((args, 0)); - - self.reducer_start = Instant::now(); - name.clone_into(&mut self.reducer_name); + /// Returns the handle used by reducers to read from `args` + /// as well as the handle used to write the error message, if any. + pub fn start_reducer(&mut self, name: &str, args: bytes::Bytes) -> (u32, u32) { + let errors = self.setup_standard_bytes_sink(); // Pass an invalid source when the reducer args were empty. // This allows the module to avoid allocating and make a system call in those cases. 
- if self.call_reducer_args.is_some() { + self.call_reducer_args = (!args.is_empty()).then_some((args, 0)); + let args = if self.call_reducer_args.is_some() { CALL_REDUCER_ARGS_SOURCE } else { 0 - } + }; + + self.reducer_start = Instant::now(); + name.clone_into(&mut self.reducer_name); + + (args, errors) } /// Signal to this `WasmInstanceEnv` that a reducer call is over. /// This resets all of the state associated to a single reducer call, /// and returns instrumentation records. - pub fn finish_reducer(&mut self) -> ExecutionTimings { - self.call_reducer_args = None; - - // For the moment, we only explicitly clear the set of buffers and the - // "syscall" times. + pub fn finish_reducer(&mut self) -> (ExecutionTimings, Vec) { + // For the moment, + // we only explicitly clear the source/sink buffers and the "syscall" times. // TODO: should we be clearing `iters` and/or `timing_spans`? - self.buffers.clear(); let total_duration = self.reducer_start.elapsed(); // Taking the call times record also resets timings to 0s for the next call. let wasm_instance_env_call_times = self.call_times.take(); - ExecutionTimings { + let timings = ExecutionTimings { total_duration, wasm_instance_env_call_times, - } + }; + + self.call_reducer_args = None; + (timings, self.take_standard_bytes_sink()) } /// Returns an execution context for a reducer call. @@ -255,7 +267,7 @@ impl WasmInstanceEnv { Self::cvt(caller, call, |caller| { f(caller).and_then(|ret| { let (mem, _) = Self::mem_env(caller); - ret.write_to(mem, out) + ret.write_to(mem, out).map_err(|e| e.into()) }) }) } @@ -635,22 +647,6 @@ impl WasmInstanceEnv { Ok(()) } - /// Creates a buffer of size `data_len` in the host environment. - /// - /// The contents of the byte slice pointed to by `data` - /// and lasting `data_len` bytes - /// is written into the newly initialized buffer. - /// - /// The buffer is registered in the host environment and is indexed by the returned `u32`. 
- /// - /// Returns an error if `data + data_len` overflows a 64-bit integer. - // #[tracing::instrument(skip_all)] - pub fn buffer_alloc(mut caller: Caller<'_, Self>, data: WasmPtr, data_len: u32) -> RtResult { - let (mem, env) = Self::mem_env(&mut caller); - let buf = mem.deref_slice(data, data_len)?; - Ok(env.buffers.insert(buf.to_vec().into()).0) - } - /// Reads bytes from `source`, registered in the host environment, /// and stores them in the memory pointed to by `buffer = buffer_ptr[..buffer_len]`. /// @@ -758,6 +754,49 @@ impl WasmInstanceEnv { }) } + /// Writes up to `buffer_len` bytes from `buffer = buffer_ptr[..buffer_len]`, + /// to the `sink`, registered in the host environment. + /// + /// The `buffer_len = buffer_len_ptr[..size_of::()]` stores the capacity of `buffer`. + /// On success (`0` is returned), + /// `buffer_len` is set to the number of bytes written to `sink`. + /// + /// # Traps + /// + /// - `buffer_len_ptr` is NULL or `buffer_len` is not in bounds of WASM memory. + /// - `buffer_ptr` is NULL or `buffer` is not in bounds of WASM memory. + /// + /// # Errors + /// + /// Returns an error: + /// + /// - `NO_SUCH_BYTES`, when `sink` is not a valid bytes sink. + /// - `NO_SPACE`, when there is no room for more bytes in `sink`. + /// (Doesn't currently happen.) + pub fn bytes_sink_write( + caller: Caller<'_, Self>, + sink: u32, + buffer_ptr: WasmPtr, + buffer_len_ptr: WasmPtr, + ) -> RtResult { + Self::cvt_custom(caller, AbiCall::BytesSinkWrite, |caller| { + let (mem, env) = Self::mem_env(caller); + + // Retrieve the reducer args if available and requested, or error. + let Some(sink) = env.standard_bytes_sink.as_mut().filter(|_| sink == STANDARD_BYTES_SINK) else { + return Ok(errno::NO_SUCH_BYTES.get().into()); + }; + + // Read `buffer_len`, i.e., the capacity of `buffer` pointed to by `buffer_ptr`. + let buffer_len = u32::read_from(mem, buffer_len_ptr)?; + // Write `buffer` to `sink`. 
+ let buffer = mem.deref_slice(buffer_ptr, buffer_len)?; + sink.extend(buffer); + + Ok(0) + }) + } + pub fn span_start(mut caller: Caller<'_, Self>, name: WasmPtr, name_len: u32) -> RtResult { let (mem, env) = Self::mem_env(&mut caller); let name = mem.deref_slice(name, name_len)?.to_vec(); diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 5e786d7d669..8013634173b 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -6,8 +6,8 @@ use crate::energy::ReducerBudget; use crate::host::instance_env::InstanceEnv; use crate::host::wasm_common::module_host_actor::{DescribeError, InitializationError}; use crate::host::wasm_common::*; -use anyhow::anyhow; -use bytes::Bytes; +use crate::util::string_from_utf8_lossy_owned; +use spacetimedb_primitives::errno::HOST_CALL_FAILURE; use wasmtime::{AsContext, AsContextMut, ExternType, Instance, InstancePre, Linker, Store, TypedFunc, WasmBacktrace}; fn log_traceback(func_type: &str, func: &str, e: &wasmtime::Error) { @@ -75,6 +75,16 @@ impl module_host_actor::WasmModule for WasmtimeModule { } } +fn handle_error_sink_code(code: i32, error: Vec) -> Result<(), Box> { + match code { + 0 => Ok(()), + CALL_FAILURE => Err(string_from_utf8_lossy_owned(error).into()), + _ => Err("unknown return code".into()), + } +} + +const CALL_FAILURE: i32 = HOST_CALL_FAILURE.get() as i32; + impl module_host_actor::WasmInstancePre for WasmtimeModule { type Instance = WasmtimeInstance; @@ -101,24 +111,16 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { })?; } - let init = instance.get_typed_func::<(), u32>(&mut store, SETUP_DUNDER); - if let Ok(init) = init { - match init.call(&mut store, ()).map(BufferIdx) { - Ok(errbuf) if errbuf.is_invalid() => {} - Ok(errbuf) => { - let errbuf = store - .data_mut() - .take_buffer(errbuf) - .unwrap_or_else(|| "unknown error".as_bytes().into()); - let errbuf = 
crate::util::string_from_utf8_lossy_owned(errbuf.into()).into(); - // TODO: catch this and return the error message to the http client - return Err(InitializationError::Setup(errbuf)); - } + if let Ok(init) = instance.get_typed_func::(&mut store, SETUP_DUNDER) { + let setup_error = store.data_mut().setup_standard_bytes_sink(); + let res = init.call(&mut store, setup_error); + let error = store.data_mut().take_standard_bytes_sink(); + match res { + // TODO: catch this and return the error message to the http client + Ok(code) => handle_error_sink_code(code, error).map_err(InitializationError::Setup)?, Err(err) => { - return Err(InitializationError::RuntimeError { - err, - func: SETUP_DUNDER.to_owned(), - }); + let func = SETUP_DUNDER.to_owned(); + return Err(InitializationError::RuntimeError { err, func }); } } } @@ -135,7 +137,7 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { } } -type CallReducerType = TypedFunc<(u32, u64, u64, u64, u64, u64, u64, u64, u32), u32>; +type CallReducerType = TypedFunc<(u32, u64, u64, u64, u64, u64, u64, u64, u32, u32), i32>; pub struct WasmtimeInstance { store: Store, @@ -144,27 +146,31 @@ pub struct WasmtimeInstance { } impl module_host_actor::WasmInstance for WasmtimeInstance { - fn extract_descriptions(&mut self) -> Result { + fn extract_descriptions(&mut self) -> Result, DescribeError> { let describer_func_name = DESCRIBE_MODULE_DUNDER; - let describer = self.instance.get_func(&mut self.store, describer_func_name).unwrap(); + let store = &mut self.store; + + let describer = self.instance.get_func(&mut *store, describer_func_name).unwrap(); + let describer = describer + .typed::(&mut *store) + .map_err(|_| DescribeError::Signature)?; + + let sink = store.data_mut().setup_standard_bytes_sink(); let start = std::time::Instant::now(); log::trace!("Start describer \"{}\"...", describer_func_name); - let store = &mut self.store; - let describer = describer - .typed::<(), u32>(&mut *store) - .map_err(|_| 
DescribeError::Signature)?; - let result = describer.call(&mut *store, ()).map(BufferIdx); + let result = describer.call(&mut *store, sink); + let duration = start.elapsed(); - log::trace!("Describer \"{}\" ran: {} us", describer_func_name, duration.as_micros(),); - let buf = result + log::trace!("Describer \"{}\" ran: {} us", describer_func_name, duration.as_micros()); + + result .inspect_err(|err| log_traceback("describer", describer_func_name, err)) .map_err(DescribeError::RuntimeError)?; - let bytes = store.data_mut().take_buffer(buf).ok_or(DescribeError::BadBuffer)?; - // Clear all of the instance state associated to this describer call. - store.data_mut().finish_reducer(); + // Fetch the bsatn returned by the describer call. + let bytes = store.data_mut().take_standard_bytes_sink(); Ok(bytes) } @@ -191,42 +197,31 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let [sender_0, sender_1, sender_2, sender_3] = bytemuck::must_cast(*op.caller_identity.as_bytes()); let [address_0, address_1] = bytemuck::must_cast(*op.caller_address.as_slice()); - // Prepare arguments to the reducer & start timings. - let args_source = store.data_mut().start_reducer(op.name, op.arg_bytes); - - let call_result = self - .call_reducer - .call( - &mut *store, - ( - op.id.0, - sender_0, - sender_1, - sender_2, - sender_3, - address_0, - address_1, - op.timestamp.microseconds, - args_source, - ), - ) - .and_then(|errbuf| { - let errbuf = BufferIdx(errbuf); - Ok(if errbuf.is_invalid() { - Ok(()) - } else { - let errmsg = store - .data_mut() - .take_buffer(errbuf) - .ok_or_else(|| anyhow!("invalid buffer handle"))?; - Err(crate::util::string_from_utf8_lossy_owned(errmsg.into()).into()) - }) - }); + // Prepare arguments to the reducer + the error sink & start timings. 
+ let (args_source, errors_sink) = store.data_mut().start_reducer(op.name, op.arg_bytes); + + let call_result = self.call_reducer.call( + &mut *store, + ( + op.id.0, + sender_0, + sender_1, + sender_2, + sender_3, + address_0, + address_1, + op.timestamp.microseconds, + args_source, + errors_sink, + ), + ); // Signal that this reducer call is finished. This gets us the timings // associated to our reducer call, and clears all of the instance state // associated to the call. - let timings = store.data_mut().finish_reducer(); + let (timings, error) = store.data_mut().finish_reducer(); + + let call_result = call_result.map(|code| handle_error_sink_code(code, error)); let remaining: ReducerBudget = get_store_fuel(store).into(); let energy = module_host_actor::EnergyStats { diff --git a/crates/primitives/src/errno.rs b/crates/primitives/src/errno.rs index 89a366ba3aa..bb5a98b1c90 100644 --- a/crates/primitives/src/errno.rs +++ b/crates/primitives/src/errno.rs @@ -8,11 +8,14 @@ use core::num::NonZeroU16; macro_rules! errnos { ($mac:ident) => { $mac!( - NO_SUCH_TABLE(1, "No such table"), + // TODO(1.0): remove this. LOOKUP_NOT_FOUND(2, "Value or range provided not found in table"), - UNIQUE_ALREADY_EXISTS(3, "Value with given unique identifier already exists"), - BUFFER_TOO_SMALL(4, "The provided buffer is not large enough to store the data"), + HOST_CALL_FAILURE(1, "ABI called by host returned an error"), + NO_SUCH_TABLE(4, "No such table"), NO_SUCH_BYTES(8, "The provided bytes source or sink is not valid"), + NO_SPACE(9, "The provided sink has no more space left"), + BUFFER_TOO_SMALL(11, "The provided buffer is not large enough to store the data"), + UNIQUE_ALREADY_EXISTS(12, "Value with given unique identifier already exists"), ); }; }