From 473eb317e58d197db11f712cebdc40f424e355cb Mon Sep 17 00:00:00 2001 From: Zeke Foppa Date: Fri, 23 Aug 2024 09:21:39 -0700 Subject: [PATCH] [release/v0.11.1-beta]: Manually apply open PR #1612: Add volatile_nonatomic_schedule_immediate --- crates/bindings-csharp/Codegen/Module.cs | 20 ++++++ .../bindings-csharp/Runtime/Internal/FFI.cs | 8 +++ .../Runtime/Internal/IReducer.cs | 13 ++++ crates/bindings-sys/Cargo.toml | 3 + crates/bindings-sys/src/lib.rs | 42 +++-------- crates/bindings/Cargo.toml | 1 + crates/bindings/src/lib.rs | 34 ++++++++- crates/bindings/src/rt.rs | 36 ++++++++-- crates/core/src/host/scheduler.rs | 70 ++++++++++++++++--- .../src/host/wasmtime/wasm_instance_env.rs | 18 +++++ smoketests/tests/schedule_reducer.py | 30 ++++++++ 11 files changed, 225 insertions(+), 50 deletions(-) diff --git a/crates/bindings-csharp/Codegen/Module.cs b/crates/bindings-csharp/Codegen/Module.cs index 449ab8d2a5b..a2669f87da2 100644 --- a/crates/bindings-csharp/Codegen/Module.cs +++ b/crates/bindings-csharp/Codegen/Module.cs @@ -420,5 +420,25 @@ SpacetimeDB.Internal.Buffer args ); } ); + + reducers + .Select( + (r, ct) => + new KeyValuePair( + r.FullName, + r.Scope.GenerateExtensions( + $@" + public static void VolatileNonatomicScheduleImmediate{r.Name}({string.Join(", ", r.GetNonContextArgs().Select(a => $"{a.Type} {a.Name}"))}) {{ + using var stream = new MemoryStream(); + using var writer = new BinaryWriter(stream); + {string.Join("\n", r.GetNonContextArgs().Select(a => $"new {a.TypeInfo}().Write(writer, {a.Name});"))} + SpacetimeDB.Internal.IReducer.VolatileNonatomicScheduleImmediate(""{r.ExportName}"", stream); + }} + " + ) + ) + ) + .WithTrackingName("SpacetimeDB.Reducer.GenerateSchedule") + .RegisterSourceOutputs(context); } } diff --git a/crates/bindings-csharp/Runtime/Internal/FFI.cs b/crates/bindings-csharp/Runtime/Internal/FFI.cs index 063b7cd11a4..44d1abd6b68 100644 --- a/crates/bindings-csharp/Runtime/Internal/FFI.cs +++ b/crates/bindings-csharp/Runtime/Internal/FFI.cs @@ -180,6 +180,14 @@ ref uint buffer_len [LibraryImport(StdbNamespace)] public static partial void _iter_drop(RowIter iter_handle); + [LibraryImport(StdbNamespace)] + public static partial void _volatile_nonatomic_schedule_immediate( + [In] byte[] name, + uint name_len, + [In] byte[] args, + uint args_len + ); + [LibraryImport(StdbNamespace)] public static partial void _console_log( byte level, diff --git a/crates/bindings-csharp/Runtime/Internal/IReducer.cs b/crates/bindings-csharp/Runtime/Internal/IReducer.cs index a498c286ec4..b5082cfeb1b 100644 --- a/crates/bindings-csharp/Runtime/Internal/IReducer.cs +++ b/crates/bindings-csharp/Runtime/Internal/IReducer.cs @@ -9,4 +9,17 @@ public interface IReducer // This one is not static because we need to be able to store IReducer in a list. void Invoke(BinaryReader reader, ReducerContext args); + + public static void VolatileNonatomicScheduleImmediate(string name, MemoryStream args) + { + var name_bytes = Encoding.UTF8.GetBytes(name); + var args_bytes = args.ToArray(); + + FFI._volatile_nonatomic_schedule_immediate( + name_bytes, + (uint)name_bytes.Length, + args_bytes, + (uint)args_bytes.Length + ); + } } diff --git a/crates/bindings-sys/Cargo.toml b/crates/bindings-sys/Cargo.toml index d650944793c..3ab56914af4 100644 --- a/crates/bindings-sys/Cargo.toml +++ b/crates/bindings-sys/Cargo.toml @@ -10,5 +10,8 @@ rust-version.workspace = true # Benching off, because of https://bheisler.github.io/criterion.rs/book/faq.html#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options bench = false +[features] +unstable_abi = [] + [dependencies] spacetimedb-primitives.workspace = true diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index 0093ac252e8..d8818208ea2 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -225,32 +225,23 @@ pub mod raw { message_len: usize, ); - /// Schedules a reducer to be called asynchronously at `time`. + /// Schedules a reducer to be called asynchronously, nonatomically, + /// and immediately on a best effort basis. /// /// The reducer is named as the valid UTF-8 slice `(name, name_len)`, /// and is passed the slice `(args, args_len)` as its argument. /// - /// A generated schedule id is assigned to the reducer. - /// This id is written to the pointer `out`. - /// /// Traps if - /// - the `time` delay exceeds `64^6 - 1` milliseconds from now /// - `name` does not point to valid UTF-8 /// - `name + name_len` or `args + args_len` overflow a 64-bit integer - pub fn _schedule_reducer( + #[cfg(feature = "unstable_abi")] + pub fn _volatile_nonatomic_schedule_immediate( name: *const u8, name_len: usize, args: *const u8, args_len: usize, - time: u64, - out: *mut u64, ); - /// Unschedule a reducer using the same `id` generated as when it was scheduled. - /// - /// This assumes that the reducer hasn't already been executed. - pub fn _cancel_reducer(id: u64); - /// Returns the length (number of bytes) of buffer `bufh` without /// transferring ownership of the data into the function. /// @@ -638,29 +629,14 @@ pub fn console_log( } } -/// Schedule a reducer to be called asynchronously at `time`. +/// Schedule a reducer to be called asynchronously, nonatomically, and immediately +/// on a best-effort basis. /// /// The reducer is assigned `name` and is provided `args` as its argument. -/// -/// A generated schedule id is assigned to the reducer which is returned. -/// -/// Returns an error if the `time` delay exceeds `64^6 - 1` milliseconds from now. -/// -/// TODO: not fully implemented yet -/// TODO(Centril): Unsure what is unimplemented; perhaps it refers to a new -/// implementation with a special system table rather than a special sys call. +#[cfg(feature = "unstable_abi")] #[inline] -pub fn schedule(name: &str, args: &[u8], time: u64) -> u64 { - let mut out = 0; - unsafe { raw::_schedule_reducer(name.as_ptr(), name.len(), args.as_ptr(), args.len(), time, &mut out) } - out -} - -/// Unschedule a reducer using the same `id` generated as when it was scheduled. -/// -/// This assumes that the reducer hasn't already been executed. -pub fn cancel_reducer(id: u64) { - unsafe { raw::_cancel_reducer(id) } +pub fn volatile_nonatomic_schedule_immediate(name: &str, args: &[u8]) { + unsafe { raw::_volatile_nonatomic_schedule_immediate(name.as_ptr(), name.len(), args.as_ptr(), args.len()) } } /// A RAII wrapper around [`raw::Buffer`]. diff --git a/crates/bindings/Cargo.toml b/crates/bindings/Cargo.toml index ce0a6cf10cf..9c2d759c7fb 100644 --- a/crates/bindings/Cargo.toml +++ b/crates/bindings/Cargo.toml @@ -16,6 +16,7 @@ bench = false [features] default = ["rand"] rand = ["dep:rand", "dep:getrandom"] +unstable_abi = ["spacetimedb-bindings-sys/unstable_abi"] [dependencies] spacetimedb-bindings-sys.workspace = true diff --git a/crates/bindings/src/lib.rs b/crates/bindings/src/lib.rs index 2a7f4b02869..14ca41a5c1c 100644 --- a/crates/bindings/src/lib.rs +++ b/crates/bindings/src/lib.rs @@ -570,7 +570,35 @@ pub mod query { } } -/// An erased reducer. -pub struct AnyReducer { - _never: std::convert::Infallible, +#[cfg(feature = "unstable_abi")] +#[macro_export] +macro_rules! volatile_nonatomic_schedule_immediate { + ($($args:tt)*) => { + $crate::__volatile_nonatomic_schedule_immediate_impl!([] [$($args)*]) + }; +} + +#[cfg(feature = "unstable_abi")] +#[doc(hidden)] +#[macro_export] +macro_rules! __volatile_nonatomic_schedule_immediate_impl { + ([$repeater:path] [($($args:tt)*)]) => { + $crate::__volatile_nonatomic_schedule_immediate_impl!(@process_args $repeater, ($($args)*)) + }; + ([$($cur:tt)*] [$next:tt $($rest:tt)*]) => { + $crate::__volatile_nonatomic_schedule_immediate_impl!([$($cur)* $next] [$($rest)*]) + }; + (@process_args $repeater:path, (_$(, $args:expr)* $(,)?)) => { + $crate::__volatile_nonatomic_schedule_immediate_impl!(@call $repeater, ($crate::ReducerContext::__dummy(), $($args),*)) + }; + (@process_args $repeater:path, ($($args:expr),* $(,)?)) => { + $crate::__volatile_nonatomic_schedule_immediate_impl!(@call $repeater, ($($args),*)) + }; + (@call $repeater:path, ($($args:expr),*)) => { + if false { + let _ = $repeater($($args,)*); + } else { + $crate::rt::volatile_nonatomic_schedule_immediate::<_, _, $repeater, _>($repeater, ($($args,)*)) + } + }; } diff --git a/crates/bindings/src/rt.rs b/crates/bindings/src/rt.rs index 92959c3cd1c..7dc3ee8bf0a 100644 --- a/crates/bindings/src/rt.rs +++ b/crates/bindings/src/rt.rs @@ -111,6 +111,9 @@ fn cvt_result(res: Result<(), Box>) -> Buffer { /// The type parameter `T` is used for determining whether there is a context argument. pub trait Reducer<'de, A: Args<'de>, T> { fn invoke(&self, ctx: ReducerContext, args: A) -> Result<(), Box>; + + type ArgsWithContext; + fn extract_args(args: Self::ArgsWithContext) -> A; } /// A trait for types that can *describe* a reducer. @@ -232,18 +235,18 @@ macro_rules! impl_reducer { Ok(($($T,)*)) } + #[allow(non_snake_case)] fn serialize_seq_product(&self, _prod: &mut Ser) -> Result<(), Ser::Error> { // For every element in the product, serialize. - #[allow(non_snake_case)] let ($($T,)*) = self; $(_prod.serialize_element($T)?;)* Ok(()) } #[inline] + #[allow(non_snake_case, irrefutable_let_patterns)] fn schema(_typespace: &mut impl TypespaceBuilder) -> ReducerDef { // Extract the names of the arguments. - #[allow(non_snake_case, irrefutable_let_patterns)] let [.., $($T),*] = Info::ARG_NAMES else { panic!() }; ReducerDef { name: Info::NAME.into(), @@ -263,11 +266,18 @@ macro_rules! impl_reducer { Func: Fn(ReducerContext, $($T),*) -> Ret, Ret: ReducerResult { + #[allow(non_snake_case)] fn invoke(&self, ctx: ReducerContext, args: ($($T,)*)) -> Result<(), Box> { - #[allow(non_snake_case)] let ($($T,)*) = args; self(ctx, $($T),*).into_result() } + + type ArgsWithContext = (ReducerContext, $($T,)*); + #[allow(non_snake_case, clippy::unused_unit)] + fn extract_args(args: Self::ArgsWithContext) -> ($($T,)*) { + let (_ctx, $($T,)*) = args; + ($($T,)*) + } } // Implement `Reducer<..., NoContextArg>` for the tuple type `($($T,)*)`. @@ -276,11 +286,16 @@ macro_rules! impl_reducer { Func: Fn($($T),*) -> Ret, Ret: ReducerResult { + #[allow(non_snake_case)] fn invoke(&self, _ctx: ReducerContext, args: ($($T,)*)) -> Result<(), Box> { - #[allow(non_snake_case)] let ($($T,)*) = args; self($($T),*).into_result() } + + type ArgsWithContext = ($($T,)*); + fn extract_args(args: Self::ArgsWithContext) -> ($($T,)*) { + args + } } }; // Counts the number of elements in the tuple. @@ -488,3 +503,16 @@ macro_rules! __make_register_reftype { }; }; } + +#[cfg(feature = "unstable_abi")] +#[doc(hidden)] +pub fn volatile_nonatomic_schedule_immediate<'de, A: Args<'de>, R: Reducer<'de, A, T>, R2: ReducerInfo, T>( + _reducer: R, + args: R::ArgsWithContext, +) { + let args = R::extract_args(args); + let arg_bytes = bsatn::to_vec(&SerDeArgs(args)).unwrap(); + + // Schedule the reducer. + sys::volatile_nonatomic_schedule_immediate(R2::NAME, &arg_bytes) +} diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 47fcabddea1..bc5e9865d16 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -47,6 +47,7 @@ enum MsgOrExit { enum SchedulerMessage { Schedule { id: ScheduledReducerId, at: ScheduleAt }, + ScheduleImmediate { reducer_name: String, args: ReducerArgs }, } pub struct ScheduledReducer { @@ -83,7 +84,7 @@ impl SchedulerStarter { // TODO(cloutiertyler): This whole start dance is scuffed, but I don't have // time to make it better right now. pub fn start(mut self, module_host: &ModuleHost) -> anyhow::Result<()> { - let mut queue: DelayQueue = DelayQueue::new(); + let mut queue: DelayQueue = DelayQueue::new(); let ctx = &ExecutionContext::internal(self.db.address()); let tx = self.db.begin_tx(); @@ -106,7 +107,7 @@ impl SchedulerStarter { let schedule_at = get_schedule_at(&tx, &self.db, table_id, &scheduled_row)?; // calculate duration left to call the scheduled reducer let duration = schedule_at.to_duration_from_now(); - queue.insert(ScheduledReducerId { table_id, schedule_id }, duration); + queue.insert(QueueItem::Id(ScheduledReducerId { table_id, schedule_id }), duration); } } @@ -205,6 +206,13 @@ impl Scheduler { Ok(()) } + pub fn volatile_nonatomic_schedule_immediate(&self, reducer_name: String, args: ReducerArgs) { + let _ = self.tx.send(MsgOrExit::Msg(SchedulerMessage::ScheduleImmediate { + reducer_name, + args, + })); + } + pub fn close(&self) { let _ = self.tx.send(MsgOrExit::Exit); } @@ -212,11 +220,16 @@ impl Scheduler { struct SchedulerActor { rx: mpsc::UnboundedReceiver>, - queue: DelayQueue, + queue: DelayQueue, key_map: FxHashMap, module_host: WeakModuleHost, } +enum QueueItem { + Id(ScheduledReducerId), + VolatileNonatomicImmediate { reducer_name: String, args: ReducerArgs }, +} + impl SchedulerActor { async fn run(mut self) { loop { @@ -237,15 +250,27 @@ impl SchedulerActor { fn handle_message(&mut self, msg: SchedulerMessage) { match msg { SchedulerMessage::Schedule { id, at } => { - let key = self.queue.insert(id, at.to_duration_from_now()); + let key = self.queue.insert(QueueItem::Id(id), at.to_duration_from_now()); self.key_map.insert(id, key); } + SchedulerMessage::ScheduleImmediate { reducer_name, args } => { + self.queue.insert( + QueueItem::VolatileNonatomicImmediate { reducer_name, args }, + Duration::ZERO, + ); + } } } - async fn handle_queued(&mut self, id: Expired) { - let id = id.into_inner(); - self.key_map.remove(&id); + async fn handle_queued(&mut self, id: Expired) { + let item = id.into_inner(); + let id = match item { + QueueItem::Id(id) => Some(id), + QueueItem::VolatileNonatomicImmediate { .. } => None, + }; + if let Some(id) = id { + self.key_map.remove(&id); + } let Some(module_host) = self.module_host.upgrade() else { return; @@ -256,6 +281,29 @@ impl SchedulerActor { let module_info = module_host.info.clone(); let call_reducer_params = move |tx: &MutTxId| -> Result, anyhow::Error> { + let id = match item { + QueueItem::Id(id) => id, + QueueItem::VolatileNonatomicImmediate { reducer_name, args } => { + let (reducer_id, schema) = module_info + .reducers + .lookup(&reducer_name) + .ok_or(ReducerCallError::NoSuchReducer)?; + + let reducer_args = args.into_tuple(module_info.typespace.with_type(schema))?; + + return Ok(Some(CallReducerParams { + timestamp: Timestamp::now(), + caller_identity, + caller_address: Address::default(), + client: None, + request_id: None, + timer: None, + reducer_id, + args: reducer_args, + })); + } + }; + let Ok(schedule_row) = get_schedule_row_mut(&ctx, tx, &db, id) else { // if the row is not found, it means the schedule is cancelled by the user log::debug!( @@ -302,8 +350,10 @@ impl SchedulerActor { // delete the scheduled reducer row if its not repeated reducer Ok(_) | Err(_) => { - self.delete_scheduled_reducer_row(&ctx, &db, id, module_host_clone) - .await; + if let Some(id) = id { + self.delete_scheduled_reducer_row(&ctx, &db, id, module_host_clone) + .await; + } } } @@ -324,7 +374,7 @@ impl SchedulerActor { let schedule_at = get_schedule_at_mut(tx, db, id.table_id, schedule_row)?; if let ScheduleAt::Interval(dur) = schedule_at { - self.queue.insert(id, Duration::from_micros(dur)); + self.queue.insert(QueueItem::Id(id), Duration::from_micros(dur)); Ok(true) } else { Ok(false) diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index c3e5629f566..bbe259e3cd9 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -667,6 +667,24 @@ impl WasmInstanceEnv { Ok(()) } + pub fn volatile_nonatomic_schedule_immediate( + mut caller: Caller<'_, Self>, + name: WasmPtr, + name_len: u32, + args: WasmPtr, + args_len: u32, + ) -> RtResult<()> { + let (mem, env) = Self::mem_env(&mut caller); + let name = mem.deref_str(name, name_len)?; + let args = mem.deref_slice(args, args_len)?; + env.instance_env.scheduler.volatile_nonatomic_schedule_immediate( + name.to_owned(), + crate::host::ReducerArgs::Bsatn(args.to_vec().into()), + ); + + Ok(()) + } + /// Creates a buffer of size `data_len` in the host environment. /// /// The contents of the byte slice pointed to by `data` diff --git a/smoketests/tests/schedule_reducer.py b/smoketests/tests/schedule_reducer.py index 93450a11a02..65cfbbfd6a9 100644 --- a/smoketests/tests/schedule_reducer.py +++ b/smoketests/tests/schedule_reducer.py @@ -110,3 +110,33 @@ def test_scheduled_table_subscription_repeated_reducer(self): # subscription should have 2 updates and should not have any deletes self.assertEqual(sub(), [{'ScheduledTable': {'deletes': [], 'inserts': [repeated_row_entry]}}, {'ScheduledTable': {'deletes': [], 'inserts': [row_entry]}}]) + + +class VolatileNonatomicScheduleImmediate(Smoketest): + MODULE_CODE = """ +use spacetimedb::spacetimedb; + +#[spacetimedb(table(public))] +pub struct MyTable { + x: String, +} + +#[spacetimedb(reducer)] +fn do_schedule() { + spacetimedb::volatile_nonatomic_schedule_immediate!(do_insert("hello".to_owned())); +} + +#[spacetimedb(reducer)] +fn do_insert(x: String) { + MyTable::insert(MyTable { x }); +} +""" + def test_volatile_nonatomic_schedule_immediate(self): + """Check that volatile_nonatomic_schedule_immediate works""" + + sub = self.subscribe("SELECT * FROM MyTable", n=2) + + self.call("do_insert", "yay!") + self.call("do_schedule") + + self.assertEqual(sub(), [{'MyTable': {'deletes': [], 'inserts': [{'x': 'yay!'}]}}, {'MyTable': {'deletes': [], 'inserts': [{'x': 'hello'}]}}])