diff --git a/Cargo.lock b/Cargo.lock index 53e6438cdeccd5..8f2cfeff019943 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -205,7 +205,7 @@ dependencies = [ "asn1-rs-derive", "asn1-rs-impl", "displaydoc", - "nom", + "nom 7.1.3", "num-traits", "rusticata-macros", "thiserror", @@ -1280,6 +1280,19 @@ dependencies = [ "v8", ] +[[package]] +name = "deno_cron" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "deno_core", + "deno_unsync 0.1.1", + "saffron", + "tokio", +] + [[package]] name = "deno_crypto" version = "0.135.0" @@ -1696,6 +1709,7 @@ dependencies = [ "deno_cache", "deno_console", "deno_core", + "deno_cron", "deno_crypto", "deno_fetch", "deno_ffi", @@ -1967,7 +1981,7 @@ checksum = "dbd676fbbab537128ef0278adb5576cf363cff6aa22a7b24effe97347cfab61e" dependencies = [ "asn1-rs", "displaydoc", - "nom", + "nom 7.1.3", "num-bigint", "num-traits", "rusticata-macros", @@ -3680,6 +3694,16 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "nom" +version = "5.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08959a387a676302eebf4ddbcbc611da04285579f76f88ee0506c63b1a61dd4b" +dependencies = [ + "memchr", + "version_check", +] + [[package]] name = "nom" version = "7.1.3" @@ -4747,7 +4771,7 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" dependencies = [ - "nom", + "nom 7.1.3", ] [[package]] @@ -4888,6 +4912,16 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" +[[package]] +name = "saffron" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03fb9a628596fc7590eb7edbf7b0613287be78df107f5f97b118aad59fb2eea9" +dependencies = [ + "chrono", + "nom 5.1.3", +] + [[package]] name = "salsa20" version = "0.10.2" @@ -7033,7 +7067,7 @@ dependencies = [ "data-encoding", "der-parser", "lazy_static", - "nom", + "nom 7.1.3", "oid-registry", "rusticata-macros", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index d2a4f66380bef6..d6c8c81cd916da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "ext/broadcast_channel", "ext/cache", "ext/console", + "ext/cron", "ext/crypto", "ext/fetch", "ext/ffi", @@ -56,6 +57,7 @@ denokv_remote = "0.2.3" deno_broadcast_channel = { version = "0.115.0", path = "./ext/broadcast_channel" } deno_cache = { version = "0.53.0", path = "./ext/cache" } deno_console = { version = "0.121.0", path = "./ext/console" } +deno_cron = { version = "0.1.0", path = "./ext/cron" } deno_crypto = { version = "0.135.0", path = "./ext/crypto" } deno_fetch = { version = "0.145.0", path = "./ext/fetch" } deno_ffi = { version = "0.108.0", path = "./ext/ffi" } @@ -133,6 +135,7 @@ rustls-webpki = "0.101.4" rustls-native-certs = "0.6.2" webpki-roots = "0.25.2" scopeguard = "1.2.0" +saffron = "=0.1.0" serde = { version = "1.0.149", features = ["derive"] } serde_bytes = "0.11" serde_json = "1.0.85" diff --git a/cli/build.rs b/cli/build.rs index e6b9dc0a42b39e..b7a4524657e64f 100644 --- a/cli/build.rs +++ b/cli/build.rs @@ -349,6 +349,7 @@ deno_core::extension!( fn create_cli_snapshot(snapshot_path: PathBuf) -> CreateSnapshotOutput { use deno_core::Extension; use deno_runtime::deno_cache::SqliteBackedCache; + use deno_runtime::deno_cron::local::LocalCronHandler; use deno_runtime::deno_http::DefaultHttpPropertyExtractor; use deno_runtime::deno_kv::sqlite::SqliteDbHandler; use deno_runtime::permissions::PermissionsContainer; @@ -383,6 +384,7 @@ fn create_cli_snapshot(snapshot_path: PathBuf) -> CreateSnapshotOutput { deno_kv::deno_kv::init_ops(SqliteDbHandler::::new( None, None, )), + deno_cron::deno_cron::init_ops(LocalCronHandler::new()), deno_napi::deno_napi::init_ops::(), deno_http::deno_http::init_ops::(), deno_io::deno_io::init_ops(Default::default()), diff --git a/cli/tests/integration/js_unit_tests.rs b/cli/tests/integration/js_unit_tests.rs index f110f8aa6cebe6..863776aa225594 100644 --- a/cli/tests/integration/js_unit_tests.rs +++ b/cli/tests/integration/js_unit_tests.rs @@ -24,6 +24,7 @@ util::unit_test_factory!( console_test, copy_file_test, custom_event_test, + cron_test, dir_test, dom_exception_test, error_stack_test, diff --git a/cli/tests/unit/cron_test.ts b/cli/tests/unit/cron_test.ts new file mode 100644 index 00000000000000..636a04fd25d464 --- /dev/null +++ b/cli/tests/unit/cron_test.ts @@ -0,0 +1,242 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +import { assertEquals, assertThrows, deferred } from "./test_util.ts"; + +const sleep = (time: number) => new Promise((r) => setTimeout(r, time)); + +Deno.test(function noNameTest() { + assertThrows( + // @ts-ignore test + () => Deno.cron(), + TypeError, + "Deno.cron requires a unique name", + ); +}); + +Deno.test(function noSchedule() { + assertThrows( + // @ts-ignore test + () => Deno.cron("foo"), + TypeError, + "Deno.cron requires a valid schedule", + ); +}); + +Deno.test(function noHandler() { + assertThrows( + // @ts-ignore test + () => Deno.cron("foo", "*/1 * * * *"), + TypeError, + "Deno.cron requires a handler", + ); +}); + +Deno.test(function invalidNameTest() { + assertThrows( + () => Deno.cron("abc[]", "*/1 * * * *", () => {}), + TypeError, + "Invalid cron name", + ); + assertThrows( + () => Deno.cron("a**bc", "*/1 * * * *", () => {}), + TypeError, + "Invalid cron name", + ); + assertThrows( + () => Deno.cron("abc<>", "*/1 * * * *", () => {}), + TypeError, + "Invalid cron name", + ); + assertThrows( + () => Deno.cron(";']", "*/1 * * * *", () => {}), + TypeError, + "Invalid cron name", + ); + assertThrows( + () => + Deno.cron( + "0000000000000000000000000000000000000000000000000000000000000000000000", + "*/1 * * * *", + () => {}, + ), + TypeError, + "Cron name is too long", + ); +}); + +Deno.test(function invalidScheduleTest() { + assertThrows( + () => Deno.cron("abc", "bogus", () => {}), + TypeError, + "Invalid cron schedule", + ); + assertThrows( + () => Deno.cron("abc", "* * * * * *", () => {}), + TypeError, + "Invalid cron schedule", + ); + assertThrows( + () => Deno.cron("abc", "* * * *", () => {}), + TypeError, + "Invalid cron schedule", + ); + assertThrows( + () => Deno.cron("abc", "m * * * *", () => {}), + TypeError, + "Invalid cron schedule", + ); +}); + +Deno.test(function invalidBackoffScheduleTest() { + assertThrows( + () => + Deno.cron("abc", "*/1 * * * *", () => {}, { + backoffSchedule: [1, 1, 1, 1, 1, 1], + }), + TypeError, + "Invalid backoff schedule", + ); + assertThrows( + () => + Deno.cron("abc", "*/1 * * * *", () => {}, { + backoffSchedule: [3600001], + }), + TypeError, + "Invalid backoff schedule", + ); +}); + +Deno.test(async function tooManyCrons() { + const crons: Promise[] = []; + const ac = new AbortController(); + for (let i = 0; i <= 100; i++) { + const c = Deno.cron(`abc_${i}`, "*/1 * * * *", () => {}, { + signal: ac.signal, + }); + crons.push(c); + } + + try { + assertThrows( + () => { + Deno.cron("next-cron", "*/1 * * * *", () => {}, { signal: ac.signal }); + }, + TypeError, + "Too many crons", + ); + } finally { + ac.abort(); + for (const c of crons) { + await c; + } + } +}); + +Deno.test(async function duplicateCrons() { + const ac = new AbortController(); + const c = Deno.cron("abc", "*/20 * * * *", () => { + }, { signal: ac.signal }); + try { + assertThrows( + () => Deno.cron("abc", "*/20 * * * *", () => {}), + TypeError, + "Cron with this name already exists", + ); + } finally { + ac.abort(); + await c; + } +}); + +Deno.test(async function basicTest() { + Deno.env.set("DENO_CRON_TEST_SCHEDULE_OFFSET", "100"); + + let count = 0; + const promise = deferred(); + const ac = new AbortController(); + const c = Deno.cron("abc", "*/20 * * * *", () => { + count++; + if (count > 5) { + promise.resolve(); + } + }, { signal: ac.signal }); + try { + await promise; + } finally { + ac.abort(); + await c; + } +}); + +Deno.test(async function multipleCrons() { + Deno.env.set("DENO_CRON_TEST_SCHEDULE_OFFSET", "100"); + + let count0 = 0; + let count1 = 0; + const promise0 = deferred(); + const promise1 = deferred(); + const ac = new AbortController(); + const c0 = Deno.cron("abc", "*/20 * * * *", () => { + count0++; + if (count0 > 5) { + promise0.resolve(); + } + }, { signal: ac.signal }); + const c1 = Deno.cron("xyz", "*/20 * * * *", () => { + count1++; + if (count1 > 5) { + promise1.resolve(); + } + }, { signal: ac.signal }); + try { + await promise0; + await promise1; + } finally { + ac.abort(); + await c0; + await c1; + } +}); + +Deno.test(async function overlappingExecutions() { + Deno.env.set("DENO_CRON_TEST_SCHEDULE_OFFSET", "100"); + + let count = 0; + const promise0 = deferred(); + const promise1 = deferred(); + const ac = new AbortController(); + const c = Deno.cron("abc", "*/20 * * * *", async () => { + promise0.resolve(); + count++; + await promise1; + }, { signal: ac.signal }); + try { + await promise0; + } finally { + await sleep(2000); + promise1.resolve(); + ac.abort(); + await c; + } + assertEquals(count, 1); +}); + +Deno.test(async function retriesWithBackkoffSchedule() { + Deno.env.set("DENO_CRON_TEST_SCHEDULE_OFFSET", "5000"); + + let count = 0; + const ac = new AbortController(); + const c = Deno.cron("abc", "*/20 * * * *", async () => { + count += 1; + await sleep(10); + throw new TypeError("cron error"); + }, { signal: ac.signal, backoffSchedule: [10, 20] }); + try { + await sleep(6000); + } finally { + ac.abort(); + await c; + } + + // The cron should have executed 3 times (1st attempt and 2 retries). + assertEquals(count, 3); +}); diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index 56188f3b8ab592..c758e620c47b6a 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1317,6 +1317,31 @@ declare namespace Deno { */ export function openKv(path?: string): Promise; + /** **UNSTABLE**: New API, yet to be vetted. + * + * Create a cron job that will periodically execute the provided handler + * callback based on the specified schedule. + * + * ```ts + * Deno.cron("sample cron", "*\/20 * * * *", () => { + * console.log("cron job executed"); + * }); + * ``` + * `backoffSchedule` option can be used to specify the retry policy for failed + * executions. Each element in the array represents the number of milliseconds + * to wait before retrying the execution. For example, `[1000, 5000, 10000]` + * means that a failed execution will be retried at most 3 times, with 1 + * second, 5 seconds, and 10 seconds delay between each retry. + * + * @category Cron + */ + export function cron( + name: string, + schedule: string, + handler: () => Promise | void, + options?: { backoffSchedule?: number[]; signal?: AbortSignal }, + ): Promise; + /** **UNSTABLE**: New API, yet to be vetted. * * A key to be persisted in a {@linkcode Deno.Kv}. A key is a sequence diff --git a/ext/cron/01_cron.ts b/ext/cron/01_cron.ts new file mode 100644 index 00000000000000..a615ae34bf5aab --- /dev/null +++ b/ext/cron/01_cron.ts @@ -0,0 +1,58 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +// @ts-ignore internal api +const core = Deno.core; + +function cron( + name: string, + schedule: string, + handler: () => Promise | void, + options?: { backoffSchedule?: number[]; signal?: AbortSignal }, +) { + if (name === undefined) { + throw new TypeError("Deno.cron requires a unique name"); + } + if (schedule === undefined) { + throw new TypeError("Deno.cron requires a valid schedule"); + } + if (handler === undefined) { + throw new TypeError("Deno.cron requires a handler"); + } + + const rid = core.ops.op_cron_create( + name, + schedule, + options?.backoffSchedule, + ); + + if (options?.signal) { + const signal = options?.signal; + signal.addEventListener( + "abort", + () => { + core.close(rid); + }, + { once: true }, + ); + } + + return (async () => { + let success = true; + while (true) { + const r = await core.opAsync("op_cron_next", rid, success); + if (r === false) { + break; + } + try { + const result = handler(); + const _res = result instanceof Promise ? (await result) : result; + success = true; + } catch (error) { + console.error(`Exception in cron handler ${name}`, error); + success = false; + } + } + })(); +} + +export { cron }; diff --git a/ext/cron/Cargo.toml b/ext/cron/Cargo.toml new file mode 100644 index 00000000000000..ac107c5675d26a --- /dev/null +++ b/ext/cron/Cargo.toml @@ -0,0 +1,23 @@ +# Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +[package] +name = "deno_cron" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +license.workspace = true +readme = "README.md" +repository.workspace = true +description = "Implementation of the Deno cron API" + +[lib] +path = "lib.rs" + +[dependencies] +anyhow.workspace = true +async-trait.workspace = true +chrono.workspace = true +deno_core.workspace = true +deno_unsync = "0.1.1" +saffron.workspace = true +tokio.workspace = true diff --git a/ext/cron/interface.rs b/ext/cron/interface.rs new file mode 100644 index 00000000000000..c70988788295ff --- /dev/null +++ b/ext/cron/interface.rs @@ -0,0 +1,23 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use async_trait::async_trait; +use deno_core::error::AnyError; + +pub trait CronHandler { + type EH: CronHandle + 'static; + + fn create(&self, spec: CronSpec) -> Result; +} + +#[async_trait(?Send)] +pub trait CronHandle { + async fn next(&self, prev_success: bool) -> Result; + fn close(&self); +} + +#[derive(Clone)] +pub struct CronSpec { + pub name: String, + pub cron_schedule: String, + pub backoff_schedule: Option>, +} diff --git a/ext/cron/lib.rs b/ext/cron/lib.rs new file mode 100644 index 00000000000000..c496597039eadd --- /dev/null +++ b/ext/cron/lib.rs @@ -0,0 +1,128 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +mod interface; +pub mod local; +mod time; + +use std::borrow::Cow; +use std::cell::RefCell; +use std::rc::Rc; + +use deno_core::error::get_custom_error_class; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::op2; +use deno_core::OpState; +use deno_core::Resource; +use deno_core::ResourceId; + +pub use crate::interface::*; + +pub const UNSTABLE_FEATURE_NAME: &str = "cron"; + +deno_core::extension!(deno_cron, + deps = [ deno_console ], + parameters = [ C: CronHandler ], + ops = [ + op_cron_create, + op_cron_next, + ], + esm = [ "01_cron.ts" ], + options = { + cron_handler: C, + }, + state = |state, options| { + state.put(Rc::new(options.cron_handler)); + } +); + +struct CronResource { + handle: Rc, +} + +impl Resource for CronResource { + fn name(&self) -> Cow { + "cron".into() + } + + fn close(self: Rc) { + self.handle.close(); + } +} + +#[op2] +#[smi] +fn op_cron_create( + state: Rc>, + #[string] name: String, + #[string] cron_schedule: String, + #[serde] backoff_schedule: Option>, +) -> Result +where + C: CronHandler + 'static, +{ + let cron_handler = { + let state = state.borrow(); + // TODO(bartlomieju): replace with `state.feature_checker.check_or_exit` + // once we phase out `check_or_exit_with_legacy_fallback` + state + .feature_checker + .check_or_exit_with_legacy_fallback(UNSTABLE_FEATURE_NAME, "Deno.cron"); + state.borrow::>().clone() + }; + + validate_cron_name(&name)?; + + let handle = cron_handler.create(CronSpec { + name, + cron_schedule, + backoff_schedule, + })?; + + let handle_rid = { + let mut state = state.borrow_mut(); + state.resource_table.add(CronResource { + handle: Rc::new(handle), + }) + }; + Ok(handle_rid) +} + +#[op2(async)] +async fn op_cron_next( + state: Rc>, + #[smi] rid: ResourceId, + prev_success: bool, +) -> Result +where + C: CronHandler + 'static, +{ + let cron_handler = { + let state = state.borrow(); + let resource = match state.resource_table.get::>(rid) { + Ok(resource) => resource, + Err(err) => { + if get_custom_error_class(&err) == Some("BadResource") { + return Ok(false); + } else { + return Err(err); + } + } + }; + resource.handle.clone() + }; + + cron_handler.next(prev_success).await +} + +fn validate_cron_name(name: &str) -> Result<(), AnyError> { + if name.len() > 64 { + return Err(type_error("Cron name is too long")); + } + if !name.chars().all(|c| { + c.is_ascii_whitespace() || c.is_ascii_alphanumeric() || c == '_' || c == '-' + }) { + return Err(type_error("Invalid cron name")); + } + Ok(()) +} diff --git a/ext/cron/local.rs b/ext/cron/local.rs new file mode 100644 index 00000000000000..0b6dcae2e94185 --- /dev/null +++ b/ext/cron/local.rs @@ -0,0 +1,343 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use std::cell::OnceCell; +use std::cell::RefCell; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::env; +use std::rc::Rc; +use std::rc::Weak; +use std::sync::Arc; + +use async_trait::async_trait; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures; +use deno_core::futures::FutureExt; +use deno_unsync::spawn; +use deno_unsync::JoinHandle; +use tokio::sync::mpsc; +use tokio::sync::mpsc::WeakSender; +use tokio::sync::OwnedSemaphorePermit; +use tokio::sync::Semaphore; + +use crate::CronHandle; +use crate::CronHandler; +use crate::CronSpec; + +const MAX_CRONS: usize = 100; +const DISPATCH_CONCURRENCY_LIMIT: usize = 50; +const MAX_BACKOFF_MS: u32 = 60 * 60 * 1_000; // 1 hour +const MAX_BACKOFF_COUNT: usize = 5; +const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1_000, 5_000, 30_000, 60_000]; + +pub struct LocalCronHandler { + cron_schedule_tx: OnceCell>, + concurrency_limiter: Arc, + cron_loop_join_handle: OnceCell>, + runtime_state: Rc>, +} + +struct RuntimeState { + crons: HashMap, + scheduled_deadlines: BTreeMap>, +} + +struct Cron { + spec: CronSpec, + next_tx: mpsc::WeakSender<()>, + current_execution_retries: u32, +} + +impl Cron { + fn backoff_schedule(&self) -> &[u32] { + self + .spec + .backoff_schedule + .as_deref() + .unwrap_or(&DEFAULT_BACKOFF_SCHEDULE) + } +} + +impl Default for LocalCronHandler { + fn default() -> Self { + Self::new() + } +} + +impl LocalCronHandler { + pub fn new() -> Self { + Self { + cron_schedule_tx: OnceCell::new(), + concurrency_limiter: Arc::new(Semaphore::new(DISPATCH_CONCURRENCY_LIMIT)), + cron_loop_join_handle: OnceCell::new(), + runtime_state: Rc::new(RefCell::new(RuntimeState { + crons: HashMap::new(), + scheduled_deadlines: BTreeMap::new(), + })), + } + } + + async fn cron_loop( + runtime_state: Rc>, + mut cron_schedule_rx: mpsc::Receiver<(String, bool)>, + ) -> Result<(), AnyError> { + loop { + let earliest_deadline = runtime_state + .borrow() + .scheduled_deadlines + .keys() + .next() + .copied(); + + let sleep_fut = if let Some(earliest_deadline) = earliest_deadline { + let now = crate::time::utc_now().timestamp_millis() as u64; + if let Some(delta) = earliest_deadline.checked_sub(now) { + tokio::time::sleep(std::time::Duration::from_millis(delta)).boxed() + } else { + futures::future::ready(()).boxed() + } + } else { + futures::future::pending().boxed() + }; + + let cron_to_schedule = tokio::select! { + _ = sleep_fut => None, + x = cron_schedule_rx.recv() => { + if x.is_none() { + return Ok(()); + }; + x + } + }; + + // Schedule next execution of the cron if needed. + if let Some((name, prev_success)) = cron_to_schedule { + let mut runtime_state = runtime_state.borrow_mut(); + if let Some(cron) = runtime_state.crons.get_mut(&name) { + let backoff_schedule = cron.backoff_schedule(); + let next_deadline = if !prev_success + && cron.current_execution_retries < backoff_schedule.len() as u32 + { + let backoff_ms = + backoff_schedule[cron.current_execution_retries as usize]; + let now = crate::time::utc_now().timestamp_millis() as u64; + cron.current_execution_retries += 1; + now + backoff_ms as u64 + } else { + let next_ts = compute_next_deadline(&cron.spec.cron_schedule)?; + cron.current_execution_retries = 0; + next_ts + }; + runtime_state + .scheduled_deadlines + .entry(next_deadline) + .or_default() + .push(name.to_string()); + } + } + + // Dispatch ready to execute crons. + let crons_to_execute = { + let mut runtime_state = runtime_state.borrow_mut(); + runtime_state.get_ready_crons()? + }; + for (_, tx) in crons_to_execute { + if let Some(tx) = tx.upgrade() { + let _ = tx.send(()).await; + } + } + } + } +} + +impl RuntimeState { + fn get_ready_crons( + &mut self, + ) -> Result)>, AnyError> { + let now = crate::time::utc_now().timestamp_millis() as u64; + + let ready = { + let to_remove = self + .scheduled_deadlines + .range(..=now) + .map(|(ts, _)| *ts) + .collect::>(); + to_remove + .iter() + .flat_map(|ts| { + self + .scheduled_deadlines + .remove(ts) + .unwrap() + .iter() + .map(move |name| (*ts, name.clone())) + .collect::>() + }) + .map(|(_, name)| { + (name.clone(), self.crons.get(&name).unwrap().next_tx.clone()) + }) + .collect::>() + }; + + Ok(ready) + } +} + +#[async_trait(?Send)] +impl CronHandler for LocalCronHandler { + type EH = CronExecutionHandle; + + fn create(&self, spec: CronSpec) -> Result { + // Ensure that the cron loop is started. + self.cron_loop_join_handle.get_or_init(|| { + let (cron_schedule_tx, cron_schedule_rx) = + mpsc::channel::<(String, bool)>(1); + self.cron_schedule_tx.set(cron_schedule_tx).unwrap(); + let runtime_state = self.runtime_state.clone(); + spawn(async move { + LocalCronHandler::cron_loop(runtime_state, cron_schedule_rx) + .await + .unwrap(); + }) + }); + + let mut runtime_state = self.runtime_state.borrow_mut(); + + if runtime_state.crons.len() > MAX_CRONS { + return Err(type_error("Too many crons")); + } + if runtime_state.crons.contains_key(&spec.name) { + return Err(type_error("Cron with this name already exists")); + } + + // Validate schedule expression. + spec + .cron_schedule + .parse::() + .map_err(|_| type_error("Invalid cron schedule"))?; + + // Validate backoff_schedule. + if let Some(backoff_schedule) = &spec.backoff_schedule { + validate_backoff_schedule(backoff_schedule)?; + } + + let (next_tx, next_rx) = mpsc::channel::<()>(1); + let cron = Cron { + spec: spec.clone(), + next_tx: next_tx.downgrade(), + current_execution_retries: 0, + }; + runtime_state.crons.insert(spec.name.clone(), cron); + + Ok(CronExecutionHandle { + name: spec.name.clone(), + cron_schedule_tx: self.cron_schedule_tx.get().unwrap().clone(), + concurrency_limiter: self.concurrency_limiter.clone(), + runtime_state: Rc::downgrade(&self.runtime_state), + inner: RefCell::new(Inner { + next_rx: Some(next_rx), + shutdown_tx: Some(next_tx), + permit: None, + }), + }) + } +} + +pub struct CronExecutionHandle { + name: String, + runtime_state: Weak>, + cron_schedule_tx: mpsc::Sender<(String, bool)>, + concurrency_limiter: Arc, + inner: RefCell, +} + +struct Inner { + next_rx: Option>, + shutdown_tx: Option>, + permit: Option, +} + +#[async_trait(?Send)] +impl CronHandle for CronExecutionHandle { + async fn next(&self, prev_success: bool) -> Result { + self.inner.borrow_mut().permit.take(); + + if self + .cron_schedule_tx + .send((self.name.clone(), prev_success)) + .await + .is_err() + { + return Ok(false); + }; + + let Some(mut next_rx) = self.inner.borrow_mut().next_rx.take() else { + return Ok(false); + }; + if next_rx.recv().await.is_none() { + return Ok(false); + }; + + let permit = self.concurrency_limiter.clone().acquire_owned().await?; + let mut inner = self.inner.borrow_mut(); + inner.next_rx = Some(next_rx); + inner.permit = Some(permit); + Ok(true) + } + + fn close(&self) { + if let Some(tx) = self.inner.borrow_mut().shutdown_tx.take() { + drop(tx) + } + if let Some(runtime_state) = self.runtime_state.upgrade() { + let mut runtime_state = runtime_state.borrow_mut(); + runtime_state.crons.remove(&self.name); + } + } +} + +fn compute_next_deadline(cron_expression: &str) -> Result { + let now = crate::time::utc_now(); + + if let Ok(test_schedule) = env::var("DENO_CRON_TEST_SCHEDULE_OFFSET") { + if let Ok(offset) = test_schedule.parse::() { + return Ok(now.timestamp_millis() as u64 + offset); + } + } + + let cron = cron_expression + .parse::() + .map_err(|_| anyhow::anyhow!("invalid cron expression"))?; + let Some(next_deadline) = cron.next_after(now) else { + return Err(anyhow::anyhow!("invalid cron expression")); + }; + Ok(next_deadline.timestamp_millis() as u64) +} + +fn validate_backoff_schedule( + backoff_schedule: &Vec, +) -> Result<(), AnyError> { + if backoff_schedule.len() > MAX_BACKOFF_COUNT { + return Err(type_error("Invalid backoff schedule")); + } + if backoff_schedule.iter().any(|s| *s > MAX_BACKOFF_MS) { + return Err(type_error("Invalid backoff schedule")); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_compute_next_deadline() { + let now = crate::time::utc_now().timestamp_millis() as u64; + assert!(compute_next_deadline("*/1 * * * *").unwrap() > now); + assert!(compute_next_deadline("* * * * *").unwrap() > now); + assert!(compute_next_deadline("bogus").is_err()); + assert!(compute_next_deadline("* * * * * *").is_err()); + assert!(compute_next_deadline("* * *").is_err()); + } +} diff --git a/ext/cron/time.rs b/ext/cron/time.rs new file mode 100644 index 00000000000000..60375818b6dff9 --- /dev/null +++ b/ext/cron/time.rs @@ -0,0 +1,19 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +/// Identical to chrono::Utc::now() but without the system "clock" +/// feature flag. +/// +/// The "clock" feature flag pulls in the "iana-time-zone" crate +/// which links to macOS's "CoreFoundation" framework which increases +/// startup time for the CLI. +pub fn utc_now() -> chrono::DateTime { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system time before Unix epoch"); + let naive = chrono::NaiveDateTime::from_timestamp_opt( + now.as_secs() as i64, + now.subsec_nanos(), + ) + .unwrap(); + chrono::DateTime::from_naive_utc_and_offset(naive, chrono::Utc) +} diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 4634d758a9f431..7c60fd4a81b9ca 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -44,6 +44,7 @@ deno_broadcast_channel.workspace = true deno_cache.workspace = true deno_console.workspace = true deno_core.workspace = true +deno_cron.workspace = true deno_crypto.workspace = true deno_fetch.workspace = true deno_ffi.workspace = true @@ -71,6 +72,7 @@ deno_broadcast_channel.workspace = true deno_cache.workspace = true deno_console.workspace = true deno_core.workspace = true +deno_cron.workspace = true deno_crypto.workspace = true deno_fetch.workspace = true deno_ffi.workspace = true diff --git a/runtime/build.rs b/runtime/build.rs index ce1896e6fbf029..2c6f0a4d8469f8 100644 --- a/runtime/build.rs +++ b/runtime/build.rs @@ -223,6 +223,9 @@ mod startup_snapshot { deno_kv::deno_kv::init_ops_and_esm(deno_kv::sqlite::SqliteDbHandler::< Permissions, >::new(None, None)), + deno_cron::deno_cron::init_ops_and_esm( + deno_cron::local::LocalCronHandler::new(), + ), deno_napi::deno_napi::init_ops_and_esm::(), deno_http::deno_http::init_ops_and_esm::(), deno_io::deno_io::init_ops_and_esm(Default::default()), diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index 5364a60eeb2a3a..c44c14bbe07922 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -24,6 +24,7 @@ import * as tty from "ext:runtime/40_tty.js"; // TODO(bartlomieju): this is funky we have two `http` imports import * as httpRuntime from "ext:runtime/40_http.js"; import * as kv from "ext:deno_kv/01_db.ts"; +import * as cron from "ext:deno_cron/01_cron.ts"; const denoNs = { metrics: core.metrics, @@ -179,6 +180,7 @@ const denoNsUnstable = { Kv: kv.Kv, KvU64: kv.KvU64, KvListIterator: kv.KvListIterator, + cron: cron.cron, }; export { denoNs, denoNsUnstable }; diff --git a/runtime/lib.rs b/runtime/lib.rs index bc49a2fefae88e..c4944637559f80 100644 --- a/runtime/lib.rs +++ b/runtime/lib.rs @@ -4,6 +4,7 @@ pub use deno_broadcast_channel; pub use deno_cache; pub use deno_console; pub use deno_core; +pub use deno_cron; pub use deno_crypto; pub use deno_fetch; pub use deno_ffi; diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index de69ce43b307c8..5c42d752fb290e 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -36,6 +36,7 @@ use deno_core::RuntimeOptions; use deno_core::SharedArrayBufferStore; use deno_core::Snapshot; use deno_core::SourceMapGetter; +use deno_cron::local::LocalCronHandler; use deno_fs::FileSystem; use deno_http::DefaultHttpPropertyExtractor; use deno_io::Stdio; @@ -450,6 +451,7 @@ impl WebWorker { }, ), ), + deno_cron::deno_cron::init_ops_and_esm(LocalCronHandler::new()), deno_napi::deno_napi::init_ops_and_esm::(), deno_http::deno_http::init_ops_and_esm::(), deno_io::deno_io::init_ops_and_esm(Some(options.stdio)), diff --git a/runtime/worker.rs b/runtime/worker.rs index f0fc25aa2db84e..2e29101097ff0b 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -31,6 +31,7 @@ use deno_core::RuntimeOptions; use deno_core::SharedArrayBufferStore; use deno_core::Snapshot; use deno_core::SourceMapGetter; +use deno_cron::local::LocalCronHandler; use deno_fs::FileSystem; use deno_http::DefaultHttpPropertyExtractor; use deno_io::Stdio; @@ -273,6 +274,7 @@ impl MainWorker { }, ), ), + deno_cron::deno_cron::init_ops_and_esm(LocalCronHandler::new()), deno_napi::deno_napi::init_ops_and_esm::(), deno_http::deno_http::init_ops_and_esm::(), deno_io::deno_io::init_ops_and_esm(Some(options.stdio)), diff --git a/tools/core_import_map.json b/tools/core_import_map.json index 3c1d65f9342d33..0f356eb976ddb7 100644 --- a/tools/core_import_map.json +++ b/tools/core_import_map.json @@ -3,6 +3,7 @@ "ext:deno_broadcast_channel/01_broadcast_channel.js": "../ext/broadcast_channel/01_broadcast_channel.js", "ext:deno_cache/01_cache.js": "../ext/cache/01_cache.js", "ext:deno_console/01_console.js": "../ext/console/01_console.js", + "ext:deno_cron/01_cron.ts": "../ext/cron/01_cron.ts", "ext:deno_crypto/00_crypto.js": "../ext/crypto/00_crypto.js", "ext:deno_fetch/20_headers.js": "../ext/fetch/20_headers.js", "ext:deno_fetch/21_formdata.js": "../ext/fetch/21_formdata.js",