diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index c916ee2778..5ab73e670b 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -14,7 +14,7 @@ use crate::util::prometheus_handle::HistogramVecHandle; use crate::util::ResultInspectExt; use crate::worker_metrics::{INSTANCE_ENV_DELETE_BY_COL_EQ, INSTANCE_ENV_INSERT}; -use super::scheduler::{ScheduledReducerId, Scheduler}; +use super::scheduler::{ScheduleError, ScheduledReducerId, Scheduler}; use super::timestamp::Timestamp; use super::tracelog::instance_trace::TraceLog; use crate::vm::DbProgram; @@ -54,7 +54,12 @@ impl InstanceEnv { } #[tracing::instrument(skip_all)] - pub fn schedule(&self, reducer: String, args: Vec, time: Timestamp) -> ScheduledReducerId { + pub fn schedule( + &self, + reducer: String, + args: Vec, + time: Timestamp, + ) -> Result { self.scheduler.schedule(reducer, args, time) } diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 2c803a7340..4f8e4a6c15 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -4,6 +4,7 @@ use futures::StreamExt; use rustc_hash::FxHashMap; use sled::transaction::{ConflictableTransactionError::Abort as TxAbort, TransactionError}; use spacetimedb_lib::bsatn; +use spacetimedb_lib::bsatn::ser::BsatnError; use tokio::sync::mpsc; use tokio_util::time::{delay_queue, DelayQueue}; @@ -101,26 +102,96 @@ impl SchedulerStarter { } } +/// The maximum `Duration` into the future that we can schedule a reducer. +/// +/// `tokio_util::time::DelayQueue`, as of version 0.7.8, +/// limits its scheduling to at most approx. 2 years into the future. +/// More specifically, they define: +/// ```ignore +/// const NUM_LEVELS: usize = 6; +/// const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1; +/// ``` +/// These specific incantations have to do with the internal representation +/// of `DelayQueue`. 
+/// +/// Unfortunately, rather than returning an `Err` +/// if the requested duration is longer than `MAX_DURATION`, +/// `DelayQueue` will panic. +/// We can't allow users to crash SpacetimeDB +/// by scheduling a reducer in the distant future, +/// so we have to re-derive their maximum delay +/// and check against it ourselves. +/// +/// The exact range of delays supported by `DelayQueue` may change in the future, +/// but (hopefully) it won't ever shrink, as that would be a user-visible regression. +/// If `DelayQueue` extends to support a larger range, +/// we may reject some long-delayed schedule calls which could succeed, +/// but we will never permit a schedule attempt which will panic. +const MAX_SCHEDULE_DELAY: std::time::Duration = std::time::Duration::from_millis( + // Equal to 64^6 - 1 milliseconds, which is 2.177589 years. + (1 << (6 * 6)) - 1, +); + +#[derive(thiserror::Error, Debug)] +pub enum ScheduleError { + #[error("Unable to schedule with long delay at {0:?}")] + DelayTooLong(Timestamp), + + #[error("Unable to generate a ScheduledReducerId: {0:?}")] + IdTransactionError(#[from] TransactionError), +} + impl Scheduler { - pub fn schedule(&self, reducer: String, bsatn_args: Vec, at: Timestamp) -> ScheduledReducerId { + pub fn schedule( + &self, + reducer: String, + bsatn_args: Vec, + at: Timestamp, + ) -> Result { + // Check that `at` is within `tokio_util::time::DelayQueue`'s accepted time-range. + // + // `DelayQueue` uses a sliding window, + // and there may be some non-zero delay between this check + // and the actual call to `DelayQueue::insert`. + // + // Assuming a monotonic clock, + // this means we may reject some otherwise acceptable schedule calls. + // + // If `Timestamp::to_duration_from_now` is not monotonic, + // i.e. `std::time::SystemTime` is not monotonic, + // `DelayQueue::insert` may panic. 
+ // This will happen if a module attempts to schedule a reducer + // with a delay just before the two-year limit, + // and the system clock is adjusted backwards + // after the check but before scheduling so that after the adjustment, + // the delay is beyond the two-year limit. + // + // We could avoid this edge case by scheduling in terms of the monotonic `Instant`, + // rather than `SystemTime`, + // but we don't currently have a meaningful way + // to convert a `Timestamp` into an `Instant`. + let delay = at.to_duration_from_now(); + if delay >= MAX_SCHEDULE_DELAY { + return Err(ScheduleError::DelayTooLong(at)); + } + let reducer = ScheduledReducer { at, reducer, bsatn_args, }; - let id = self - .db - .transaction(|tx| { - let id = tx.generate_id()?; - let reducer = bsatn::to_vec(&reducer).map_err(TxAbort)?; - tx.insert(&id.to_le_bytes(), reducer)?; - Ok(ScheduledReducerId(id)) - }) - .unwrap(); + + let id = self.db.transaction(|tx| { + let id = tx.generate_id()?; + let reducer = bsatn::to_vec(&reducer).map_err(TxAbort)?; + tx.insert(&id.to_le_bytes(), reducer)?; + Ok(ScheduledReducerId(id)) + })?; + // if the actor has exited, it's fine to ignore; it means that the host actor calling // schedule will exit soon as well, and it'll be scheduled to run when the module host restarts let _ = self.tx.send(MsgOrExit::Msg(SchedulerMessage::Schedule { id, at })); - id + Ok(id) } pub fn cancel(&self, id: ScheduledReducerId) { diff --git a/crates/core/src/host/wasmer/wasm_instance_env.rs b/crates/core/src/host/wasmer/wasm_instance_env.rs index b7f9ad8105..c75bbe6268 100644 --- a/crates/core/src/host/wasmer/wasm_instance_env.rs +++ b/crates/core/src/host/wasmer/wasm_instance_env.rs @@ -1,7 +1,7 @@ #![allow(clippy::too_many_arguments)] use crate::database_logger::{BacktraceFrame, BacktraceProvider, ModuleBacktrace, Record}; -use crate::host::scheduler::ScheduledReducerId; +use crate::host::scheduler::{ScheduleError, ScheduledReducerId}; use 
crate::host::timestamp::Timestamp; use crate::host::wasm_common::{err_to_errno, AbiRuntimeError, BufferIdx, BufferIterIdx, BufferIters, Buffers}; use bytes::Bytes; @@ -124,7 +124,16 @@ impl WasmInstanceEnv { // Noa: This would be nice but I think the eventual goal/desire is to switch to wasmtime, // which doesn't allow user types to impl ValueType. // Probably the correct API choice, but makes things a bit less ergonomic sometimes. - let ScheduledReducerId(id) = caller.data().instance_env.schedule(name, args, Timestamp(time)); + let ScheduledReducerId(id) = caller + .data() + .instance_env + .schedule(name, args, Timestamp(time)) + .map_err(|e| match e { + ScheduleError::DelayTooLong(_) => RuntimeError::new("requested delay is too long"), + ScheduleError::IdTransactionError(_) => { + RuntimeError::new("transaction to acquire ScheduleReducerId failed") + } + })?; Ok(id) }) .map(|_| ())