Skip to content

Commit

Permalink
[release/v0.11.1-beta]: Manually apply open PR #1612: Add volatile_no…
Browse files Browse the repository at this point in the history
…natomic_schedule_immediate
  • Loading branch information
bfops committed Aug 23, 2024
1 parent 6cccf94 commit 473eb31
Show file tree
Hide file tree
Showing 11 changed files with 225 additions and 50 deletions.
20 changes: 20 additions & 0 deletions crates/bindings-csharp/Codegen/Module.cs
Original file line number Diff line number Diff line change
Expand Up @@ -420,5 +420,25 @@ SpacetimeDB.Internal.Buffer args
);
}
);

reducers
.Select(
(r, ct) =>
new KeyValuePair<string, string>(
r.FullName,
r.Scope.GenerateExtensions(
$@"
public static void VolatileNonatomicScheduleImmediate{r.Name}({string.Join(", ", r.GetNonContextArgs().Select(a => $"{a.Type} {a.Name}"))}) {{
using var stream = new MemoryStream();
using var writer = new BinaryWriter(stream);
{string.Join("\n", r.GetNonContextArgs().Select(a => $"new {a.TypeInfo}().Write(writer, {a.Name});"))}
SpacetimeDB.Internal.IReducer.VolatileNonatomicScheduleImmediate(""{r.ExportName}"", stream);
}}
"
)
)
)
.WithTrackingName("SpacetimeDB.Reducer.GenerateSchedule")
.RegisterSourceOutputs(context);
}
}
8 changes: 8 additions & 0 deletions crates/bindings-csharp/Runtime/Internal/FFI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ ref uint buffer_len
[LibraryImport(StdbNamespace)]
public static partial void _iter_drop(RowIter iter_handle);

[LibraryImport(StdbNamespace)]
public static partial void _volatile_nonatomic_schedule_immediate(
[In] byte[] name,
uint name_len,
[In] byte[] args,
uint args_len
);

[LibraryImport(StdbNamespace)]
public static partial void _console_log(
byte level,
Expand Down
13 changes: 13 additions & 0 deletions crates/bindings-csharp/Runtime/Internal/IReducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,17 @@ public interface IReducer

// This one is not static because we need to be able to store IReducer in a list.
void Invoke(BinaryReader reader, ReducerContext args);

public static void VolatileNonatomicScheduleImmediate(string name, MemoryStream args)
{
var name_bytes = Encoding.UTF8.GetBytes(name);
var args_bytes = args.ToArray();

FFI._volatile_nonatomic_schedule_immediate(
name_bytes,
(uint)name_bytes.Length,
args_bytes,
(uint)args_bytes.Length
);
}
}
3 changes: 3 additions & 0 deletions crates/bindings-sys/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ rust-version.workspace = true
# Benching off, because of https://bheisler.github.io/criterion.rs/book/faq.html#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options
bench = false

[features]
unstable_abi = []

[dependencies]
spacetimedb-primitives.workspace = true
42 changes: 9 additions & 33 deletions crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,32 +225,23 @@ pub mod raw {
message_len: usize,
);

/// Schedules a reducer to be called asynchronously at `time`.
/// Schedules a reducer to be called asynchronously, nonatomically,
/// and immediately on a best effort basis.
///
/// The reducer is named as the valid UTF-8 slice `(name, name_len)`,
/// and is passed the slice `(args, args_len)` as its argument.
///
/// A generated schedule id is assigned to the reducer.
/// This id is written to the pointer `out`.
///
/// Traps if
/// - the `time` delay exceeds `64^6 - 1` milliseconds from now
/// - `name` does not point to valid UTF-8
/// - `name + name_len` or `args + args_len` overflow a 64-bit integer
pub fn _schedule_reducer(
#[cfg(feature = "unstable_abi")]
pub fn _volatile_nonatomic_schedule_immediate(
name: *const u8,
name_len: usize,
args: *const u8,
args_len: usize,
time: u64,
out: *mut u64,
);

/// Unschedule a reducer using the same `id` generated as when it was scheduled.
///
/// This assumes that the reducer hasn't already been executed.
pub fn _cancel_reducer(id: u64);

/// Returns the length (number of bytes) of buffer `bufh` without
/// transferring ownership of the data into the function.
///
Expand Down Expand Up @@ -638,29 +629,14 @@ pub fn console_log(
}
}

/// Schedule a reducer to be called asynchronously at `time`.
/// Schedule a reducer to be called asynchronously, nonatomically, and immediately
/// on a best-effort basis.
///
/// The reducer is assigned `name` and is provided `args` as its argument.
///
/// A generated schedule id is assigned to the reducer which is returned.
///
/// Returns an error if the `time` delay exceeds `64^6 - 1` milliseconds from now.
///
/// TODO: not fully implemented yet
/// TODO(Centril): Unsure what is unimplemented; perhaps it refers to a new
/// implementation with a special system table rather than a special sys call.
#[cfg(feature = "unstable_abi")]
#[inline]
pub fn schedule(name: &str, args: &[u8], time: u64) -> u64 {
let mut out = 0;
unsafe { raw::_schedule_reducer(name.as_ptr(), name.len(), args.as_ptr(), args.len(), time, &mut out) }
out
}

/// Unschedule a reducer using the same `id` generated as when it was scheduled.
///
/// This assumes that the reducer hasn't already been executed.
pub fn cancel_reducer(id: u64) {
unsafe { raw::_cancel_reducer(id) }
pub fn volatile_nonatomic_schedule_immediate(name: &str, args: &[u8]) {
unsafe { raw::_volatile_nonatomic_schedule_immediate(name.as_ptr(), name.len(), args.as_ptr(), args.len()) }
}

/// A RAII wrapper around [`raw::Buffer`].
Expand Down
1 change: 1 addition & 0 deletions crates/bindings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ bench = false
[features]
default = ["rand"]
rand = ["dep:rand", "dep:getrandom"]
unstable_abi = ["spacetimedb-bindings-sys/unstable_abi"]

[dependencies]
spacetimedb-bindings-sys.workspace = true
Expand Down
34 changes: 31 additions & 3 deletions crates/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,35 @@ pub mod query {
}
}

/// An erased reducer.
pub struct AnyReducer {
_never: std::convert::Infallible,
#[cfg(feature = "unstable_abi")]
#[macro_export]
macro_rules! volatile_nonatomic_schedule_immediate {
($($args:tt)*) => {
$crate::__volatile_nonatomic_schedule_immediate_impl!([] [$($args)*])
};
}

#[cfg(feature = "unstable_abi")]
#[doc(hidden)]
#[macro_export]
macro_rules! __volatile_nonatomic_schedule_immediate_impl {
([$repeater:path] [($($args:tt)*)]) => {
$crate::__volatile_nonatomic_schedule_immediate_impl!(@process_args $repeater, ($($args)*))
};
([$($cur:tt)*] [$next:tt $($rest:tt)*]) => {
$crate::__volatile_nonatomic_schedule_immediate_impl!([$($cur)* $next] [$($rest)*])
};
(@process_args $repeater:path, (_$(, $args:expr)* $(,)?)) => {
$crate::__volatile_nonatomic_schedule_immediate_impl!(@call $repeater, ($crate::ReducerContext::__dummy(), $($args),*))
};
(@process_args $repeater:path, ($($args:expr),* $(,)?)) => {
$crate::__volatile_nonatomic_schedule_immediate_impl!(@call $repeater, ($($args),*))
};
(@call $repeater:path, ($($args:expr),*)) => {
if false {
let _ = $repeater($($args,)*);
} else {
$crate::rt::volatile_nonatomic_schedule_immediate::<_, _, $repeater, _>($repeater, ($($args,)*))
}
};
}
36 changes: 32 additions & 4 deletions crates/bindings/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ fn cvt_result(res: Result<(), Box<str>>) -> Buffer {
/// The type parameter `T` is used for determining whether there is a context argument.
pub trait Reducer<'de, A: Args<'de>, T> {
fn invoke(&self, ctx: ReducerContext, args: A) -> Result<(), Box<str>>;

type ArgsWithContext;
fn extract_args(args: Self::ArgsWithContext) -> A;
}

/// A trait for types that can *describe* a reducer.
Expand Down Expand Up @@ -232,18 +235,18 @@ macro_rules! impl_reducer {
Ok(($($T,)*))
}

#[allow(non_snake_case)]
fn serialize_seq_product<Ser: SerializeSeqProduct>(&self, _prod: &mut Ser) -> Result<(), Ser::Error> {
// For every element in the product, serialize.
#[allow(non_snake_case)]
let ($($T,)*) = self;
$(_prod.serialize_element($T)?;)*
Ok(())
}

#[inline]
#[allow(non_snake_case, irrefutable_let_patterns)]
fn schema<Info: ReducerInfo>(_typespace: &mut impl TypespaceBuilder) -> ReducerDef {
// Extract the names of the arguments.
#[allow(non_snake_case, irrefutable_let_patterns)]
let [.., $($T),*] = Info::ARG_NAMES else { panic!() };
ReducerDef {
name: Info::NAME.into(),
Expand All @@ -263,11 +266,18 @@ macro_rules! impl_reducer {
Func: Fn(ReducerContext, $($T),*) -> Ret,
Ret: ReducerResult
{
#[allow(non_snake_case)]
fn invoke(&self, ctx: ReducerContext, args: ($($T,)*)) -> Result<(), Box<str>> {
#[allow(non_snake_case)]
let ($($T,)*) = args;
self(ctx, $($T),*).into_result()
}

type ArgsWithContext = (ReducerContext, $($T,)*);
#[allow(non_snake_case, clippy::unused_unit)]
fn extract_args(args: Self::ArgsWithContext) -> ($($T,)*) {
let (_ctx, $($T,)*) = args;
($($T,)*)
}
}

// Implement `Reducer<..., NoContextArg>` for the tuple type `($($T,)*)`.
Expand All @@ -276,11 +286,16 @@ macro_rules! impl_reducer {
Func: Fn($($T),*) -> Ret,
Ret: ReducerResult
{
#[allow(non_snake_case)]
fn invoke(&self, _ctx: ReducerContext, args: ($($T,)*)) -> Result<(), Box<str>> {
#[allow(non_snake_case)]
let ($($T,)*) = args;
self($($T),*).into_result()
}

type ArgsWithContext = ($($T,)*);
fn extract_args(args: Self::ArgsWithContext) -> ($($T,)*) {
args
}
}
};
// Counts the number of elements in the tuple.
Expand Down Expand Up @@ -488,3 +503,16 @@ macro_rules! __make_register_reftype {
};
};
}

#[cfg(feature = "unstable_abi")]
#[doc(hidden)]
pub fn volatile_nonatomic_schedule_immediate<'de, A: Args<'de>, R: Reducer<'de, A, T>, R2: ReducerInfo, T>(
_reducer: R,
args: R::ArgsWithContext,
) {
let args = R::extract_args(args);
let arg_bytes = bsatn::to_vec(&SerDeArgs(args)).unwrap();

// Schedule the reducer.
sys::volatile_nonatomic_schedule_immediate(R2::NAME, &arg_bytes)
}
Loading

0 comments on commit 473eb31

Please sign in to comment.