Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return an error when scheduling a reducer with a long delay #77

Merged
merged 1 commit into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::util::prometheus_handle::HistogramVecHandle;
use crate::util::ResultInspectExt;
use crate::worker_metrics::{INSTANCE_ENV_DELETE_BY_COL_EQ, INSTANCE_ENV_INSERT};

use super::scheduler::{ScheduledReducerId, Scheduler};
use super::scheduler::{ScheduleError, ScheduledReducerId, Scheduler};
use super::timestamp::Timestamp;
use super::tracelog::instance_trace::TraceLog;
use crate::vm::DbProgram;
Expand Down Expand Up @@ -54,7 +54,12 @@ impl InstanceEnv {
}

#[tracing::instrument(skip_all)]
pub fn schedule(&self, reducer: String, args: Vec<u8>, time: Timestamp) -> ScheduledReducerId {
pub fn schedule(
&self,
reducer: String,
args: Vec<u8>,
time: Timestamp,
) -> Result<ScheduledReducerId, ScheduleError> {
self.scheduler.schedule(reducer, args, time)
}

Expand Down
93 changes: 82 additions & 11 deletions crates/core/src/host/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::StreamExt;
use rustc_hash::FxHashMap;
use sled::transaction::{ConflictableTransactionError::Abort as TxAbort, TransactionError};
use spacetimedb_lib::bsatn;
use spacetimedb_lib::bsatn::ser::BsatnError;
use tokio::sync::mpsc;
use tokio_util::time::{delay_queue, DelayQueue};

Expand Down Expand Up @@ -101,26 +102,96 @@ impl SchedulerStarter {
}
}

/// The maximum `Duration` into the future that we can schedule a reducer.
///
/// `tokio_utils::time::DelayQueue`, as of version 0.7.8,
/// limits its scheduling to at most approx. 2 years into the future.
/// More specifically, they define:
/// ```ignore
/// const NUM_LEVELS: usize = 6;
/// const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
/// ```
/// These specific incantations have to do with the internal representation
/// of `DelayQueue`.
///
/// Unfortunately, rather than returning an `Err`
/// if the requested duration is longer than `MAX_DURATION`,
/// `DelayQueue` will panic.
/// We can't allow users to crash SpacetimeDB
/// by scheduling a reducer in the distant future,
/// so we have to re-derive their maximum delay
/// and check against it ourselves.
///
/// The exact range of delays supported by `DelayQueue` may change in the future,
/// but (hopefully) it won't ever shrink, as that would be a user-visible regression.
/// If `DelayQueue` extends to support a larger range,
/// we may reject some long-delayed schedule calls which could succeed,
/// but we will never permit a schedule attempt which will panic.
const MAX_SCHEDULE_DELAY: std::time::Duration = std::time::Duration::from_millis(
// Equal to 64^6 - 1 milliseconds, which is 2.177589 years.
(1 << (6 * 6)) - 1,
);

#[derive(thiserror::Error, Debug)]
pub enum ScheduleError {
#[error("Unable to schedule with long delay at {0:?}")]
DelayTooLong(Timestamp),

#[error("Unable to generate a ScheduledReducerId: {0:?}")]
IdTransactionError(#[from] TransactionError<BsatnError>),
}

impl Scheduler {
pub fn schedule(&self, reducer: String, bsatn_args: Vec<u8>, at: Timestamp) -> ScheduledReducerId {
pub fn schedule(
&self,
reducer: String,
bsatn_args: Vec<u8>,
at: Timestamp,
) -> Result<ScheduledReducerId, ScheduleError> {
// Check that `at` is within `tokio_utils::time::DelayQueue`'s accepted time-range.
//
// `DelayQueue` uses a sliding window,
// and there may be some non-zero delay between this check
// and the actual call to `DelayQueue::insert`.
//
// Assuming a monotonic clock,
// this means we may reject some otherwise acceptable schedule calls.
//
// If `Timestamp::to_duration_from_now` is not monotonic,
// i.e. `std::time::SystemTime` is not monotonic,
// `DelayQueue::insert` may panic.
// This will happen if a module attempts to schedule a reducer
// with a delay just before the two-year limit,
// and the system clock is adjusted backwards
// after the check but before scheduling so that after the adjustment,
// the delay is beyond the two-year limit.
//
// We could avoid this edge case by scheduling in terms of the monotonic `Instant`,
// rather than `SystemTime`,
// but we don't currently have a meaningful way
// to convert a `Timestamp` into an `Instant`.
let delay = at.to_duration_from_now();
if delay >= MAX_SCHEDULE_DELAY {
return Err(ScheduleError::DelayTooLong(at));
}

let reducer = ScheduledReducer {
at,
reducer,
bsatn_args,
};
let id = self
.db
.transaction(|tx| {
let id = tx.generate_id()?;
let reducer = bsatn::to_vec(&reducer).map_err(TxAbort)?;
tx.insert(&id.to_le_bytes(), reducer)?;
Ok(ScheduledReducerId(id))
})
.unwrap();

let id = self.db.transaction(|tx| {
let id = tx.generate_id()?;
let reducer = bsatn::to_vec(&reducer).map_err(TxAbort)?;
tx.insert(&id.to_le_bytes(), reducer)?;
Ok(ScheduledReducerId(id))
})?;

// if the actor has exited, it's fine to ignore; it means that the host actor calling
// schedule will exit soon as well, and it'll be scheduled to run when the module host restarts
let _ = self.tx.send(MsgOrExit::Msg(SchedulerMessage::Schedule { id, at }));
id
Ok(id)
}

pub fn cancel(&self, id: ScheduledReducerId) {
Expand Down
13 changes: 11 additions & 2 deletions crates/core/src/host/wasmer/wasm_instance_env.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(clippy::too_many_arguments)]

use crate::database_logger::{BacktraceFrame, BacktraceProvider, ModuleBacktrace, Record};
use crate::host::scheduler::ScheduledReducerId;
use crate::host::scheduler::{ScheduleError, ScheduledReducerId};
use crate::host::timestamp::Timestamp;
use crate::host::wasm_common::{err_to_errno, AbiRuntimeError, BufferIdx, BufferIterIdx, BufferIters, Buffers};
use bytes::Bytes;
Expand Down Expand Up @@ -124,7 +124,16 @@ impl WasmInstanceEnv {
// Noa: This would be nice but I think the eventual goal/desire is to switch to wasmtime,
// which doesn't allow user types to impl ValueType.
// Probably the correct API choice, but makes things a bit less ergonomic sometimes.
let ScheduledReducerId(id) = caller.data().instance_env.schedule(name, args, Timestamp(time));
let ScheduledReducerId(id) = caller
.data()
.instance_env
.schedule(name, args, Timestamp(time))
.map_err(|e| match e {
ScheduleError::DelayTooLong(_) => RuntimeError::new("requested delay is too long"),
ScheduleError::IdTransactionError(_) => {
RuntimeError::new("transaction to acquire ScheduleReducerId failed")
}
})?;
Ok(id)
})
.map(|_| ())
Expand Down