Return an error when scheduling a reducer with a long delay (#77)
Prior to this commit, it was possible for a module to crash SpacetimeDB
by scheduling a reducer with a delay longer than ~2 years.
This was due to our use of `tokio_util::time::DelayQueue` to handle scheduling.
`DelayQueue`'s internal data structure caps delays at 64^6 - 1 ms,
a little more than two years.
Attempting to insert with a longer delay panics.

With this commit, we avoid the panic by checking ourselves that the requested delay
is strictly shorter than that 64^6 - 1 ms limit.
This requires bubbling a `ScheduleError` up from `Scheduler::schedule`
to `WasmInstanceEnv::schedule`, where it is converted into a `RuntimeError`
which crashes the module.

`Scheduler::schedule` could also fail because the transaction it runs to compute a new id
is fallible. That transaction seems unlikely to ever fail, and if it does, we have bigger problems,
so `unwrap`ping might still be reasonable in that case,
but this commit converts it into a handleable `Err` anyway,
as doing so adds essentially no complexity.
gefjon authored Jul 13, 2023
1 parent 59c26f9 commit 882d4cf
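
To make the failure mode concrete, here is a minimal standalone sketch (not part of this commit) of the panic described above. It assumes `tokio` with full features and `tokio-util` 0.7 with its `time` feature; the three-year delay is simply an arbitrary value past the ~2-year limit.

```rust
use std::time::Duration;
use tokio_util::time::DelayQueue;

#[tokio::main]
async fn main() {
    let mut queue: DelayQueue<&str> = DelayQueue::new();

    // A delay well inside the ~2.18-year (64^6 - 1 ms) window is accepted.
    queue.insert("soon", Duration::from_secs(60));

    // A delay of roughly three years exceeds DelayQueue's internal limit;
    // per the commit message, this insert panics rather than returning an
    // error, which is how a module could previously take down SpacetimeDB.
    queue.insert("too far", Duration::from_secs(3 * 365 * 24 * 60 * 60));
}
```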
Showing 3 changed files with 100 additions and 15 deletions.
9 changes: 7 additions & 2 deletions crates/core/src/host/instance_env.rs
@@ -14,7 +14,7 @@ use crate::util::prometheus_handle::HistogramVecHandle;
use crate::util::ResultInspectExt;
use crate::worker_metrics::{INSTANCE_ENV_DELETE_BY_COL_EQ, INSTANCE_ENV_INSERT};

use super::scheduler::{ScheduledReducerId, Scheduler};
use super::scheduler::{ScheduleError, ScheduledReducerId, Scheduler};
use super::timestamp::Timestamp;
use super::tracelog::instance_trace::TraceLog;
use crate::vm::DbProgram;
@@ -54,7 +54,12 @@ impl InstanceEnv {
}

#[tracing::instrument(skip_all)]
pub fn schedule(&self, reducer: String, args: Vec<u8>, time: Timestamp) -> ScheduledReducerId {
pub fn schedule(
&self,
reducer: String,
args: Vec<u8>,
time: Timestamp,
) -> Result<ScheduledReducerId, ScheduleError> {
self.scheduler.schedule(reducer, args, time)
}

93 changes: 82 additions & 11 deletions crates/core/src/host/scheduler.rs
@@ -4,6 +4,7 @@ use futures::StreamExt;
use rustc_hash::FxHashMap;
use sled::transaction::{ConflictableTransactionError::Abort as TxAbort, TransactionError};
use spacetimedb_lib::bsatn;
use spacetimedb_lib::bsatn::ser::BsatnError;
use tokio::sync::mpsc;
use tokio_util::time::{delay_queue, DelayQueue};

@@ -101,26 +102,96 @@ impl SchedulerStarter {
}
}

/// The maximum `Duration` into the future that we can schedule a reducer.
///
/// `tokio_util::time::DelayQueue`, as of version 0.7.8,
/// limits its scheduling to at most approx. 2 years into the future.
/// More specifically, they define:
/// ```ignore
/// const NUM_LEVELS: usize = 6;
/// const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
/// ```
/// These specific incantations have to do with the internal representation
/// of `DelayQueue`.
///
/// Unfortunately, rather than returning an `Err`
/// if the requested duration is longer than `MAX_DURATION`,
/// `DelayQueue` will panic.
/// We can't allow users to crash SpacetimeDB
/// by scheduling a reducer in the distant future,
/// so we have to re-derive their maximum delay
/// and check against it ourselves.
///
/// The exact range of delays supported by `DelayQueue` may change in the future,
/// but (hopefully) it won't ever shrink, as that would be a user-visible regression.
/// If `DelayQueue` extends to support a larger range,
/// we may reject some long-delayed schedule calls which could succeed,
/// but we will never permit a schedule attempt which will panic.
const MAX_SCHEDULE_DELAY: std::time::Duration = std::time::Duration::from_millis(
// Equal to 64^6 - 1 milliseconds, which is 2.177589 years.
(1 << (6 * 6)) - 1,
);

#[derive(thiserror::Error, Debug)]
pub enum ScheduleError {
#[error("Unable to schedule with long delay at {0:?}")]
DelayTooLong(Timestamp),

#[error("Unable to generate a ScheduledReducerId: {0:?}")]
IdTransactionError(#[from] TransactionError<BsatnError>),
}

impl Scheduler {
pub fn schedule(&self, reducer: String, bsatn_args: Vec<u8>, at: Timestamp) -> ScheduledReducerId {
pub fn schedule(
&self,
reducer: String,
bsatn_args: Vec<u8>,
at: Timestamp,
) -> Result<ScheduledReducerId, ScheduleError> {
// Check that `at` is within `tokio_util::time::DelayQueue`'s accepted time-range.
//
// `DelayQueue` uses a sliding window,
// and there may be some non-zero delay between this check
// and the actual call to `DelayQueue::insert`.
//
// Assuming a monotonic clock,
// this means we may reject some otherwise acceptable schedule calls.
//
// If `Timestamp::to_duration_from_now` is not monotonic,
// i.e. `std::time::SystemTime` is not monotonic,
// `DelayQueue::insert` may panic.
// This will happen if a module attempts to schedule a reducer
// with a delay just before the two-year limit,
// and the system clock is adjusted backwards
// after the check but before scheduling so that after the adjustment,
// the delay is beyond the two-year limit.
//
// We could avoid this edge case by scheduling in terms of the monotonic `Instant`,
// rather than `SystemTime`,
// but we don't currently have a meaningful way
// to convert a `Timestamp` into an `Instant`.
let delay = at.to_duration_from_now();
if delay >= MAX_SCHEDULE_DELAY {
return Err(ScheduleError::DelayTooLong(at));
}

let reducer = ScheduledReducer {
at,
reducer,
bsatn_args,
};
let id = self
.db
.transaction(|tx| {
let id = tx.generate_id()?;
let reducer = bsatn::to_vec(&reducer).map_err(TxAbort)?;
tx.insert(&id.to_le_bytes(), reducer)?;
Ok(ScheduledReducerId(id))
})
.unwrap();

let id = self.db.transaction(|tx| {
let id = tx.generate_id()?;
let reducer = bsatn::to_vec(&reducer).map_err(TxAbort)?;
tx.insert(&id.to_le_bytes(), reducer)?;
Ok(ScheduledReducerId(id))
})?;

// if the actor has exited, it's fine to ignore; it means that the host actor calling
// schedule will exit soon as well, and it'll be scheduled to run when the module host restarts
let _ = self.tx.send(MsgOrExit::Msg(SchedulerMessage::Schedule { id, at }));
id
Ok(id)
}

pub fn cancel(&self, id: ScheduledReducerId) {
13 changes: 11 additions & 2 deletions crates/core/src/host/wasmer/wasm_instance_env.rs
@@ -1,7 +1,7 @@
#![allow(clippy::too_many_arguments)]

use crate::database_logger::{BacktraceFrame, BacktraceProvider, ModuleBacktrace, Record};
use crate::host::scheduler::ScheduledReducerId;
use crate::host::scheduler::{ScheduleError, ScheduledReducerId};
use crate::host::timestamp::Timestamp;
use crate::host::wasm_common::{err_to_errno, AbiRuntimeError, BufferIdx, BufferIterIdx, BufferIters, Buffers};
use bytes::Bytes;
@@ -124,7 +124,16 @@ impl WasmInstanceEnv {
// Noa: This would be nice but I think the eventual goal/desire is to switch to wasmtime,
// which doesn't allow user types to impl ValueType.
// Probably the correct API choice, but makes things a bit less ergonomic sometimes.
let ScheduledReducerId(id) = caller.data().instance_env.schedule(name, args, Timestamp(time));
let ScheduledReducerId(id) = caller
.data()
.instance_env
.schedule(name, args, Timestamp(time))
.map_err(|e| match e {
ScheduleError::DelayTooLong(_) => RuntimeError::new("requested delay is too long"),
ScheduleError::IdTransactionError(_) => {
RuntimeError::new("transaction to acquire ScheduleReducerId failed")
}
})?;
Ok(id)
})
.map(|_| ())
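
For reference, a hypothetical sketch (not part of this diff) of how other host-side code in the crate could call the now-fallible `Scheduler::schedule` and branch on the two `ScheduleError` variants. The `try_schedule` helper and its `tracing` log messages are illustrative only, and the `Debug` formatting of the inner error is an assumption.

```rust
use crate::host::scheduler::{ScheduleError, ScheduledReducerId, Scheduler};
use crate::host::timestamp::Timestamp;

fn try_schedule(scheduler: &Scheduler, reducer: String, args: Vec<u8>, at: Timestamp) {
    match scheduler.schedule(reducer, args, at) {
        Ok(ScheduledReducerId(id)) => {
            tracing::debug!("scheduled reducer with id {}", id);
        }
        Err(ScheduleError::DelayTooLong(at)) => {
            // The requested timestamp is more than ~2 years away; DelayQueue
            // would panic on it, so the scheduler now refuses it up front.
            tracing::warn!("cannot schedule reducer at {:?}: delay too long", at);
        }
        Err(ScheduleError::IdTransactionError(e)) => {
            // The sled transaction that generates a fresh id failed.
            tracing::error!("could not generate a ScheduledReducerId: {:?}", e);
        }
    }
}
```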
