Skip to content

Commit

Permalink
[WASM ABI 1.0] impl __call_reducer__ using bytes_source_read (#1609)
Browse files Browse the repository at this point in the history
  • Loading branch information
Centril authored Aug 20, 2024
1 parent 3be5c83 commit 9ea5a2f
Show file tree
Hide file tree
Showing 12 changed files with 384 additions and 146 deletions.
5 changes: 5 additions & 0 deletions crates/bindings-csharp/Runtime/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public class BufferTooSmallException : StdbException
public override string Message => "The provided buffer is not large enough to store the data";
}

/// <summary>
/// Error raised for the <c>NO_SUCH_BYTES</c> host status:
/// the bytes source or sink that was handed to the host does not exist.
/// </summary>
public class NoSuchBytesException : StdbException
{
    /// <inheritdoc/>
    public override string Message
    {
        get { return "The provided bytes source or sink does not exist"; }
    }
}

public class UnknownException : StdbException
{
private readonly FFI.CheckedStatus.Errno code;
Expand Down
24 changes: 16 additions & 8 deletions crates/bindings-csharp/Runtime/Internal/FFI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ namespace SpacetimeDB.Internal;
using System.Runtime.InteropServices;
using System.Runtime.InteropServices.Marshalling;

// Declared outside of the hidden `FFI` class because, for now, the codegen has to
// publicly forward FFI exports (e.g. `__call_reducer__`, which receives its `args`
// as a value of this type), so the type itself must be publicly reachable.
/// <summary>
/// Handle to a host-side buffer of bytes that the module can read from.
/// </summary>
/// <param name="Handle">Raw 32-bit handle value as passed across the FFI boundary.</param>
[StructLayout(LayoutKind.Sequential)]
public readonly record struct BytesSource(uint Handle)
{
    /// <summary>The handle value <c>0</c>, which denotes "no source".</summary>
    public static readonly BytesSource INVALID = new BytesSource(0);
}

// This type is outside of the hidden `FFI` class because for now we need to do some public
// forwarding in the codegen for `__describe_module__` and `__call_reducer__` exports which both
// use this type.
Expand Down Expand Up @@ -49,6 +58,7 @@ public enum Errno : ushort
LOOKUP_NOT_FOUND = 2,
UNIQUE_ALREADY_EXISTS = 3,
BUFFER_TOO_SMALL = 4,
NO_SUCH_BYTES = 8,
}

// This custom marshaller takes care of checking the status code
Expand All @@ -75,6 +85,7 @@ public static CheckedStatus ConvertToManaged(Errno status)
Errno.LOOKUP_NOT_FOUND => new LookupNotFoundException(),
Errno.UNIQUE_ALREADY_EXISTS => new UniqueAlreadyExistsException(),
Errno.BUFFER_TOO_SMALL => new BufferTooSmallException(),
Errno.NO_SUCH_BYTES => new NoSuchBytesException(),
_ => new UnknownException(status),
};
}
Expand Down Expand Up @@ -163,7 +174,7 @@ out RowIter out_
[LibraryImport(StdbNamespace)]
public static partial CheckedStatus _iter_advance(
RowIter iter_handle,
[MarshalUsing(CountElementName = nameof(buffer_len))] [Out] byte[] buffer,
[MarshalUsing(CountElementName = nameof(buffer_len))][Out] byte[] buffer,
ref uint buffer_len
);

Expand All @@ -183,13 +194,10 @@ uint message_len
);

[LibraryImport(StdbNamespace)]
public static partial uint _buffer_len(Buffer buf_handle);

[LibraryImport(StdbNamespace)]
public static partial void _buffer_consume(
Buffer buf_handle,
[MarshalUsing(CountElementName = nameof(dst_len))] [Out] byte[] dst,
uint dst_len
public static partial short _bytes_source_read(
BytesSource source,
Span<byte> buffer,
ref uint buffer_len_ptr
);

[LibraryImport(StdbNamespace)]
Expand Down
44 changes: 38 additions & 6 deletions crates/bindings-csharp/Runtime/Internal/Module.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace SpacetimeDB.Internal;
using SpacetimeDB;
using SpacetimeDB.BSATN;
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;

public static partial class Module
Expand Down Expand Up @@ -219,12 +220,42 @@ public static void RegisterTable<T>()
moduleDef.RegisterTable(T.MakeTableDesc(typeRegistrar));
}

/// <summary>
/// Drains a host-side bytes source into a newly allocated managed array.
/// </summary>
/// <param name="source">
/// Handle to read from. <see cref="BytesSource.INVALID"/> yields an empty array
/// without calling into the host.
/// </param>
/// <returns>Every byte the host wrote before it reported the source as exhausted.</returns>
/// <exception cref="NoSuchBytesException">
/// The host answered with <c>NO_SUCH_BYTES</c>, i.e. <paramref name="source"/> is not a valid bytes source.
/// </exception>
/// <exception cref="UnreachableException">
/// The host answered with a status other than <c>0</c>, <c>-1</c> or <c>NO_SUCH_BYTES</c>.
/// </exception>
private static byte[] Consume(this BytesSource source)
{
    if (source == BytesSource.INVALID)
    {
        return [];
    }

    var buffer = new byte[0x20_000];
    var written = 0U;
    while (true)
    {
        // Write into the spare capacity of the buffer.
        var spare = buffer.AsSpan((int)written);
        var buf_len = (uint)spare.Length;
        var ret = FFI._bytes_source_read(source, spare, ref buf_len);
        switch (ret)
        {
            // Host side source exhausted, we're done.
            // `buf_len` now holds the number of bytes written by this last call.
            case -1:
                written += buf_len;
                Array.Resize(ref buffer, (int)written);
                return buffer;
            // More data may follow.
            case 0:
                written += buf_len;
                if (written == buffer.Length)
                {
                    // Wrote the entire spare capacity.
                    // Need to reserve more space in the buffer.
                    // `Array.Resize` copies the whole array, so grow geometrically
                    // to keep the total amount of copying linear in the payload size.
                    Array.Resize(ref buffer, buffer.Length * 2);
                }
                // Otherwise the host didn't write as much as possible.
                // Try to read some more.
                // The host will likely not trigger this branch (current host doesn't),
                // but a module should be prepared for it.
                break;
            // The host does not know this handle; report it as the dedicated
            // exception instead of claiming the branch is unreachable.
            case (short)FFI.CheckedStatus.Errno.NO_SUCH_BYTES:
                throw new NoSuchBytesException();
            default:
                throw new UnreachableException();
        }
    }
}

#pragma warning disable IDE1006 // Naming Styles - methods below are meant for FFI.
Expand Down Expand Up @@ -257,7 +288,7 @@ public static Buffer __call_reducer__(
ulong address_0,
ulong address_1,
DateTimeOffsetRepr timestamp,
Buffer args
BytesSource args
)
{
// Piece together the sender identity.
Expand All @@ -269,6 +300,7 @@ Buffer args
try
{
Runtime.Random = new((int)timestamp.MicrosecondsSinceEpoch);

using var stream = new MemoryStream(args.Consume());
using var reader = new BinaryReader(stream);
reducers[(int)id].Invoke(reader, new(sender, address, timestamp.ToStd()));
Expand Down
6 changes: 3 additions & 3 deletions crates/bindings-csharp/Runtime/bindings.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ OPAQUE_TYPEDEF(ColId, uint16_t);
OPAQUE_TYPEDEF(IndexType, uint8_t);
OPAQUE_TYPEDEF(LogLevel, uint8_t);
OPAQUE_TYPEDEF(Buffer, uint32_t);
OPAQUE_TYPEDEF(BytesSource, uint32_t);
OPAQUE_TYPEDEF(RowIter, uint32_t);

#define CSTR(s) (uint8_t*)s, sizeof(s) - 1
Expand Down Expand Up @@ -68,9 +69,8 @@ IMPORT(Status, _iter_advance,
(RowIter iter, uint8_t* buffer, size_t* buffer_len),
(iter, buffer, buffer_len));
IMPORT(void, _iter_drop, (RowIter iter), (iter));
IMPORT(uint32_t, _buffer_len, (Buffer buf), (buf));
IMPORT(void, _buffer_consume, (Buffer buf, uint8_t* dst, uint32_t dst_len),
(buf, dst, dst_len));
IMPORT(int16_t, _bytes_source_read, (BytesSource source, uint8_t* buffer_ptr, size_t* buffer_len_ptr),
(source, buffer_ptr, buffer_len_ptr));
IMPORT(Buffer, _buffer_alloc, (const uint8_t* data, uint32_t len), (data, len));

#ifndef EXPERIMENTAL_WASM_AOT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
<WasmImport Include="$(SpacetimeNamespace)!_iter_start_filtered" />
<WasmImport Include="$(SpacetimeNamespace)!_iter_next" />
<WasmImport Include="$(SpacetimeNamespace)!_iter_drop" />
<WasmImport Include="$(SpacetimeNamespace)!_buffer_len" />
<WasmImport Include="$(SpacetimeNamespace)!_buffer_consume" />
<WasmImport Include="$(SpacetimeNamespace)!_bytes_source_read" />
<WasmImport Include="$(SpacetimeNamespace)!_buffer_alloc" />

<PackageReference Include="Microsoft.DotNet.ILCompiler.LLVM" Version="8.0.0-*" />
Expand Down
120 changes: 77 additions & 43 deletions crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use core::mem::MaybeUninit;
use core::num::NonZeroU16;
use std::ptr;

use alloc::boxed::Box;

use spacetimedb_primitives::{errno, errnos, ColId, TableId};

/// Provides a raw set of sys calls which abstractions can be built atop of.
Expand Down Expand Up @@ -252,6 +250,71 @@ pub mod raw {
/// Traps if `data + data_len` overflows a 64-bit integer.
pub fn _buffer_alloc(data: *const u8, data_len: usize) -> Buffer;

/// Reads bytes from `source`, registered in the host environment,
/// and stores them in the memory pointed to by `buffer = buffer_ptr[..buffer_len]`.
///
/// The `buffer_len = buffer_len_ptr[..size_of::<usize>()]` stores the capacity of `buffer`.
/// On success (`0` or `-1` is returned),
/// `buffer_len` is set to the number of bytes written to `buffer`.
/// When `-1` is returned, the resource has been exhausted
/// and there are no more bytes to read,
/// leading to the resource being immediately destroyed.
/// Note that the host is free to reuse allocations in a pool,
/// so destroying the handle logically does not entail that memory is necessarily reclaimed.
///
/// # Traps
///
/// Traps if:
///
/// - `buffer_len_ptr` is NULL or `buffer_len` is not in bounds of WASM memory.
/// - `buffer_ptr` is NULL or `buffer` is not in bounds of WASM memory.
///
/// # Errors
///
/// Returns an error:
///
/// - `NO_SUCH_BYTES`, when `source` is not a valid bytes source.
///
/// # Example
///
/// The typical use case for this ABI is in `__call_reducer__`,
/// to read and deserialize the `args`.
/// An example definition, dealing with `args` might be:
/// ```rust,ignore
/// #[no_mangle]
/// extern "C" fn __call_reducer__(..., args: BytesSource, ...) -> i16 {
///     // ...
///
///     let mut buf = Vec::<u8>::with_capacity(1024);
///     loop {
///         // Write into the spare capacity of the buffer.
///         let buf_ptr = buf.spare_capacity_mut();
///         let spare_len = buf_ptr.len();
///         let mut buf_len = buf_ptr.len();
///         let buf_ptr = buf_ptr.as_mut_ptr().cast();
///         let ret = unsafe { bytes_source_read(args, buf_ptr, &mut buf_len) };
///         // SAFETY: `bytes_source_read` just wrote `buf_len` bytes into the spare capacity,
///         // which may be fewer than `spare_len`, so only `buf_len` bytes are initialized.
///         unsafe { buf.set_len(buf.len() + buf_len) };
///         match ret {
///             // Host side source exhausted, we're done.
///             -1 => break,
///             // Wrote the entire spare capacity.
///             // Need to reserve more space in the buffer.
///             0 if spare_len == buf_len => buf.reserve(1024),
///             // Host didn't write as much as possible.
///             // Try to read some more.
///             // The host will likely not trigger this branch,
///             // but a module should be prepared for it.
///             0 => {}
///             _ => unreachable!(),
///         }
///     }
///
///     // ...
/// }
/// ```
pub fn _bytes_source_read(source: BytesSource, buffer_ptr: *mut u8, buffer_len_ptr: *mut usize) -> i16;

/// Begin a timing span.
///
/// When the returned `u32` span ID is passed to [`_span_end`],
Expand Down Expand Up @@ -298,6 +361,18 @@ pub mod raw {
/// A panic level is emitted just before a fatal error causes the WASM module to trap.
pub const LOG_LEVEL_PANIC: u8 = 101;

/// Handle to a readable buffer of bytes that lives in the host environment.
///
/// Serves to move bytes from the host into WASM linear memory.
#[repr(transparent)]
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct BytesSource(u32);

impl BytesSource {
    /// An invalid handle, used e.g., when the reducer arguments were empty.
    pub const INVALID: BytesSource = BytesSource(0);
}

/// A handle into a buffer of bytes in the host environment.
///
/// Used for transporting bytes host <-> WASM linear memory.
Expand Down Expand Up @@ -632,47 +707,6 @@ impl Buffer {
raw: raw::INVALID_BUFFER,
};

/// Returns the number of bytes of the data stored in the buffer.
pub fn data_len(&self) -> usize {
unsafe { raw::_buffer_len(self.raw) }
}

/// Read the contents of the buffer into the provided Vec.
/// The Vec is cleared in the process.
pub fn read_into(self, buf: &mut Vec<u8>) {
let data_len = self.data_len();
buf.clear();
buf.reserve(data_len);
self.read_uninit(&mut buf.spare_capacity_mut()[..data_len]);
// SAFETY: We just wrote `data_len` bytes into `buf`.
unsafe { buf.set_len(data_len) };
}

/// Read the contents of the buffer into a new boxed byte slice.
pub fn read(self) -> Box<[u8]> {
let mut buf = alloc::vec::Vec::new();
self.read_into(&mut buf);
buf.into_boxed_slice()
}

/// Read the contents of the buffer into an array of fixed size `N`.
///
/// If the length is wrong, the module will crash.
pub fn read_array<const N: usize>(self) -> [u8; N] {
// use MaybeUninit::uninit_array once stable
let mut arr = unsafe { MaybeUninit::<[MaybeUninit<u8>; N]>::uninit().assume_init() };
self.read_uninit(&mut arr);
// use MaybeUninit::array_assume_init once stable
unsafe { (&arr as *const [_; N]).cast::<[u8; N]>().read() }
}

/// Reads the buffer into an uninitialized byte string `buf`.
///
/// The module will crash if `buf`'s length doesn't match the buffer.
pub fn read_uninit(self, buf: &mut [MaybeUninit<u8>]) {
unsafe { raw::_buffer_consume(self.raw, buf.as_mut_ptr().cast(), buf.len()) }
}

/// Allocates a buffer with the contents of `data`.
pub fn alloc(data: &[u8]) -> Self {
let raw = unsafe { raw::_buffer_alloc(data.as_ptr(), data.len()) };
Expand Down
Loading

2 comments on commit 9ea5a2f

@github-actions
Copy link

@github-actions github-actions bot commented on 9ea5a2f Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

@github-actions
Copy link

@github-actions github-actions bot commented on 9ea5a2f Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

Please sign in to comment.