From 3d1f60280c32f3791f60d48bb08886e034a02aae Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Fri, 8 Nov 2024 02:02:21 +0000 Subject: [PATCH] feat: add put and delete ops to kv --- packages/infra/client/Cargo.lock | 12 +- .../infra/client/isolate-v8-runner/Cargo.toml | 2 +- .../isolate-v8-runner/js/40_rivet_kv.js | 110 +++++++++- .../isolate-v8-runner/js/90_rivet_ns.js | 4 + .../client/isolate-v8-runner/src/ext/kv.rs | 202 +++++++++++------- .../client/isolate-v8-runner/src/isolate.rs | 6 +- .../client/isolate-v8-runner/src/main.rs | 4 +- .../isolate-v8-runner/tests/integration.rs | 89 ++++++++ 8 files changed, 342 insertions(+), 87 deletions(-) create mode 100644 packages/infra/client/isolate-v8-runner/tests/integration.rs diff --git a/packages/infra/client/Cargo.lock b/packages/infra/client/Cargo.lock index 8bc73efc07..46da738ef0 100644 --- a/packages/infra/client/Cargo.lock +++ b/packages/infra/client/Cargo.lock @@ -3842,6 +3842,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -6802,6 +6811,7 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b1f47d22deb79c3f59fcf2a1f00f60cbdc05462bf17d1cd356c1fefa3f444bd" dependencies = [ + "nu-ansi-term 0.50.1", "time", "tracing", "tracing-core", @@ -6824,7 +6834,7 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ - "nu-ansi-term", + "nu-ansi-term 0.46.0", "serde", "serde_json", "sharded-slab", diff --git a/packages/infra/client/isolate-v8-runner/Cargo.toml b/packages/infra/client/isolate-v8-runner/Cargo.toml index 72bef958d8..1eb686dc6e 100644 --- a/packages/infra/client/isolate-v8-runner/Cargo.toml +++ b/packages/infra/client/isolate-v8-runner/Cargo.toml @@ -23,7 +23,7 @@ signal-hook = "0.3.17" tokio = { version = "1.36.0", features = [ "full" ] } tokio-tungstenite = "0.23.1" tracing = "0.1" -tracing-logfmt = "0.3" +tracing-logfmt = { version = "0.3" , features = ["ansi_logs"] } tracing-subscriber = { version = "0.3", default-features = false, features = [ "ansi", "fmt", diff --git a/packages/infra/client/isolate-v8-runner/js/40_rivet_kv.js b/packages/infra/client/isolate-v8-runner/js/40_rivet_kv.js index e235c5cd96..2f18861252 100644 --- a/packages/infra/client/isolate-v8-runner/js/40_rivet_kv.js +++ b/packages/infra/client/isolate-v8-runner/js/40_rivet_kv.js @@ -1,11 +1,113 @@ -import { op_rivet_kv_get, op_rivet_kv_get_batch } from "ext:core/ops"; +import { + op_rivet_kv_get, + op_rivet_kv_get_batch, + op_rivet_kv_put, + op_rivet_kv_put_batch, + op_rivet_kv_delete, +} from "ext:core/ops"; +import { core } from "ext:core/mod.js"; +/** + * Retrieves a value from the key-value store. + * + * @param {string} key - The key to retrieve the value for. + * @param {Object} [options] - Optional settings. + * @param {('value'|'arrayBuffer')} [options.format] - The format in which to return the data. + * If "arrayBuffer", returns an ArrayBuffer. + * Otherwise, returns the deserialized value. + * @returns {Promise} The retrieved value, or undefined if the key does not exist. + */ async function get(key, options) { - return (await op_rivet_kv_get(key, options)) ?? undefined; + let value = (await op_rivet_kv_get(key)) ?? undefined; + + return deserializeValue(key, value, options); } +/** + * Asynchronously retrieves a batch of key-value pairs. + * + * @param {string[]} keys - A list of keys to retrieve. + * @param {Object} [options] - Optional settings. + * @param {('value'|'arrayBuffer')} [options.format] - The format in which to return the data. + * If "arrayBuffer", returns an ArrayBuffer. + * Otherwise, returns the deserialized value. + * @returns {Promise>} The retrieved values. Keys that have no value in the key-value store + * will not be present. + */ async function getBatch(keys, options) { - return (await op_rivet_kv_get_batch(keys, options)) ?? undefined; + let values = await op_rivet_kv_get_batch(keys, options); + + let deserializedValues = new Map(); + + for (let key in values) { + deserializedValues.set(key, deserializeValue(key, values[key], options)); + } + + return deserializedValues; +} + +/** + * Stores a key-value pair in the key-value store. + * + * @param {string} key - The key under which the value will be stored. + * @param {any} value - The value to be stored, which will be serialized. + * @returns {Promise} A promise that resolves when the operation is complete. + */ +async function put(key, value) { + await op_rivet_kv_put(key, core.serialize(value)); +} + +/** + * Asynchronously stores a batch of key-value pairs. + * + * @param {Object} obj - An object containing key-value pairs to be stored. + * @returns {Promise} A promise that resolves when the batch operation is complete. + */ +async function putBatch(obj) { + let serializedObj = new Map(); + + for (let key in obj) { + serializedObj.set(key, core.serialize(obj[key])); + } + + await op_rivet_kv_put_batch(serializedObj); +} + +/** + * Deletes a key-value pair from the key-value store. + * + * @param {string} key - The key of the key-value pair to delete. + * @returns {Promise} A promise that resolves when the operation is complete. + */ +async function delete_(key) { + return await op_rivet_kv_delete(key); +} + +async function deleteBatch(keys) { + return await op_rivet_kv_delete_batch(keys); +} + +function deserializeValue(key, data, options) { + if (data != undefined) { + let format = options?.format ?? "value"; + + if (format == "value") { + try { + return core.deserialize(data, { forStorage: true }); + } catch (e) { + throw new Error( + `Could not deserialize data in key "${key}". You must use options.format = "arrayBuffer".`, + { cause: e } + ); + } + } else if (format == "arrayBuffer") { + return data.buffer; + } else { + throw Error(`Invalid format: "${options.format}". Expected "value" or "arrayBuffer".`); + } + } + + return undefined; } -export { get, getBatch }; +export { get, getBatch, put, putBatch, delete_, deleteBatch }; diff --git a/packages/infra/client/isolate-v8-runner/js/90_rivet_ns.js b/packages/infra/client/isolate-v8-runner/js/90_rivet_ns.js index d36d288364..449572a746 100644 --- a/packages/infra/client/isolate-v8-runner/js/90_rivet_ns.js +++ b/packages/infra/client/isolate-v8-runner/js/90_rivet_ns.js @@ -4,6 +4,10 @@ const rivetNs = { kv: { get: kv.get, getBatch: kv.getBatch, + put: kv.put, + putBatch: kv.putBatch, + delete: kv.delete_, + deleteBatch: kv.deleteBatch, }, }; diff --git a/packages/infra/client/isolate-v8-runner/src/ext/kv.rs b/packages/infra/client/isolate-v8-runner/src/ext/kv.rs index c154eb9280..c30bcf661b 100644 --- a/packages/infra/client/isolate-v8-runner/src/ext/kv.rs +++ b/packages/infra/client/isolate-v8-runner/src/ext/kv.rs @@ -1,16 +1,18 @@ -use std::{future::Future, sync::Arc}; +use std::{collections::HashMap, future::Future, sync::Arc}; -use anyhow::Context; -use deno_core::{error::AnyError, op2, OpState}; +use deno_core::{error::AnyError, op2, JsBuffer, OpState, ToJsBuffer}; use foundationdb as fdb; use futures_util::{StreamExt, TryStreamExt}; -use serde::{Deserialize, Serialize}; deno_core::extension!( rivet_kv, ops = [ op_rivet_kv_get, op_rivet_kv_get_batch, + op_rivet_kv_put, + op_rivet_kv_put_batch, + op_rivet_kv_delete, + // op_rivet_kv_delete_batch, ], esm = [ dir "js", @@ -24,40 +26,17 @@ deno_core::extension!( }, ); -#[derive(Deserialize, Default)] -#[serde(rename_all = "camelCase")] -enum OutputFormat { - Json, - #[default] - Text, - ArrayBuffer, -} - -#[derive(Deserialize, Default)] -#[serde(rename_all = "camelCase")] -struct GetOptions { - format: OutputFormat, -} - -#[derive(Serialize)] -#[serde(untagged)] -enum Output { - Text(String), - Json(serde_json::Value), - Buffer(deno_core::ToJsBuffer), -} - #[op2(async)] -#[serde] +#[buffer] pub fn op_rivet_kv_get( state: &mut OpState, #[string] key: String, - #[serde] options: Option, -) -> impl Future, AnyError>> { +) -> Result>, AnyError>>, AnyError> { + validate_key(&key)?; + let db = state.borrow::>().clone(); - let options = options.unwrap_or_default(); - async move { + Ok(async move { let bkey = key.as_bytes(); let data = db @@ -68,24 +47,8 @@ pub fn op_rivet_kv_get( return Ok(None); }; - let output = match options.format { - OutputFormat::Text => Output::Text( - std::str::from_utf8(&data) - .with_context(|| { - format!("failed to deserialize value as text for key {key:?}") - })? - .to_string(), - ), - OutputFormat::Json => Output::Json( - serde_json::from_slice::(&data).with_context(|| { - format!("failed to deserialize value as JSON for key {key:?}") - })?, - ), - OutputFormat::ArrayBuffer => Output::Buffer(data.to_vec().into()), - }; - - Ok(Some(output)) - } + Ok(Some(data.to_vec())) + }) } #[op2(async)] @@ -93,16 +56,18 @@ pub fn op_rivet_kv_get( pub fn op_rivet_kv_get_batch( state: &mut OpState, #[serde] keys: Vec, - #[serde] options: Option, -) -> Result, AnyError>>, AnyError> { - let db = state.borrow::>().clone(); - let options = options.unwrap_or_default(); - +) -> Result, AnyError>>, AnyError> { anyhow::ensure!( keys.len() <= 128, "a maximum of 128 keys is allowed for `Rivet.getBatch`" ); + for key in &keys { + validate_key(key)?; + } + + let db = state.borrow::>().clone(); + Ok(async move { let data = db .run(|tx, _maybe_committed| { @@ -113,37 +78,120 @@ pub fn op_rivet_kv_get_batch( .map(|key| { let tx = tx.clone(); async move { - Ok(tx.get(key.as_bytes(), false).await?.map(|data| (key, data))) + Ok(tx + .get(key.as_bytes(), false) + .await? + .map(|data| (key, data.to_vec().into()))) } }) .buffer_unordered(16) .try_filter_map(|x| std::future::ready(Ok(x))) - .try_collect::>() + .try_collect::>() .await } }) .await?; - data.into_iter() - .map(|(key, data)| { - let output = match options.format { - OutputFormat::Text => Output::Text( - std::str::from_utf8(&data) - .with_context(|| { - format!("failed to deserialize value as text for key {key:?}") - })? - .to_string(), - ), - OutputFormat::Json => Output::Json( - serde_json::from_slice::(&data).with_context(|| { - format!("failed to deserialize value as JSON for key {key:?}") - })?, - ), - OutputFormat::ArrayBuffer => Output::Buffer(data.to_vec().into()), - }; - - Ok(output) + Ok(data) + }) +} + +#[op2(async)] +pub fn op_rivet_kv_put( + state: &mut OpState, + #[string] key: String, + #[buffer] value: JsBuffer, +) -> Result>, AnyError> { + validate_key(&key)?; + validate_value(&key, &value)?; + + let db = state.borrow::>().clone(); + + Ok(async move { + let bkey = key.as_bytes(); + + db.run(|tx, _maybe_committed| { + let value = value.clone(); // Creates a new ref, does not clone data + + async move { + tx.set(bkey, &value); + Ok(()) + } + }) + .await?; + + Ok(()) + }) +} + +#[op2(async)] +pub fn op_rivet_kv_put_batch( + state: &mut OpState, + #[serde] obj: HashMap, +) -> Result>, AnyError> { + for (key, value) in &obj { + validate_key(&key)?; + validate_value(&key, &value)?; + } + + let db = state.borrow::>().clone(); + + Ok(async move { + db.run(|tx, _maybe_committed| { + let obj = obj.clone(); + + async move { + for (key, value) in obj { + tx.set(key.as_bytes(), &value); + } + + Ok(()) + } + }) + .await?; + + Ok(()) + }) +} + +#[op2(async)] +pub fn op_rivet_kv_delete( + state: &mut OpState, + #[string] key: String, +) -> Result>, AnyError> { + validate_key(&key)?; + + let db = state.borrow::>().clone(); + + Ok(async move { + let bkey = key.as_bytes(); + + let existed = db + .run(|tx, _maybe_committed| async move { + let existed = tx.get(bkey, false).await?.is_some(); + + tx.clear(bkey); + Ok(existed) }) - .collect() + .await?; + + Ok(existed) }) } + +fn validate_key(key: &str) -> Result<(), AnyError> { + // 2048 bytes + anyhow::ensure!(key.len() <= 2048, "key is too long (max 2048 bytes)"); + + Ok(()) +} + +fn validate_value(key: &str, value: &[u8]) -> Result<(), AnyError> { + // 2048 bytes + anyhow::ensure!( + value.len() <= 128 * 1024, + "value for key {key:?} is too large (max 128 KiB)" + ); + + Ok(()) +} diff --git a/packages/infra/client/isolate-v8-runner/src/isolate.rs b/packages/infra/client/isolate-v8-runner/src/isolate.rs index bdaddd7adb..1e088ea7ab 100644 --- a/packages/infra/client/isolate-v8-runner/src/isolate.rs +++ b/packages/infra/client/isolate-v8-runner/src/isolate.rs @@ -66,8 +66,8 @@ pub fn run( // Run the isolate let exit_code = match create_and_run_current_thread(run_inner( - actor_id, actor_path.clone(), + actor_id, terminate_tx, msg_tx.clone(), config, @@ -118,8 +118,8 @@ pub fn run( } pub async fn run_inner( - actor_id: Uuid, actor_path: PathBuf, + actor_id: Uuid, terminate_tx: mpsc::Sender, msg_tx: Option>, config: Config, @@ -129,7 +129,7 @@ pub async fn run_inner( // Load script into a static module loader. No dynamic scripts can be loaded this way. let script_content = fs::read_to_string(actor_path.join("index.js")) .await - .context("failed to load index.js")?; + .with_context(|| format!("failed to load {}", actor_path.join("index.js").display()))?; let main_module = ModuleSpecifier::from_file_path(Path::new("/index.js")) .map_err(|_| anyhow!("invalid file name"))?; let loader = StaticModuleLoader::new([(main_module.clone(), script_content)]); diff --git a/packages/infra/client/isolate-v8-runner/src/main.rs b/packages/infra/client/isolate-v8-runner/src/main.rs index 72d1483f3d..dcded180d7 100644 --- a/packages/infra/client/isolate-v8-runner/src/main.rs +++ b/packages/infra/client/isolate-v8-runner/src/main.rs @@ -209,7 +209,9 @@ async fn handle_connection( runner_protocol::ToRunner::Signal { actor_id, signal } => { if let Some(signal_tx) = actors.read().await.get(&actor_id) { // Tell actor thread to stop. Removing the actor is handled in the tokio task above. - signal_tx.try_send(signal).context("failed to send stop signal to actor poll task")?; + signal_tx + .try_send(signal) + .context("failed to send stop signal to actor poll task")?; } else { tracing::warn!("Actor {actor_id} not found for stopping"); } diff --git a/packages/infra/client/isolate-v8-runner/tests/integration.rs b/packages/infra/client/isolate-v8-runner/tests/integration.rs new file mode 100644 index 0000000000..e6e0e78b7a --- /dev/null +++ b/packages/infra/client/isolate-v8-runner/tests/integration.rs @@ -0,0 +1,89 @@ +// TODO: + +// #[cfg(test)] +// mod tests { +// use std::{ +// collections::HashMap, +// os::fd::AsRawFd, +// path::{Path, PathBuf}, +// result::Result::{Err, Ok}, +// sync::Arc, +// thread::JoinHandle, +// time::Duration, +// }; + +// use anyhow::*; +// use deno_core::JsRuntime; +// use deno_runtime::worker::MainWorkerTerminateHandle; +// use foundationdb as fdb; +// use futures_util::{stream::SplitStream, SinkExt, StreamExt}; +// use tokio::{ +// fs, +// net::TcpStream, +// sync::{mpsc, RwLock}, +// }; +// use tokio_tungstenite::{tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream}; +// use tracing_subscriber::prelude::*; +// use uuid::Uuid; + +// use super::run_inner; +// use crate::config::*; +// use crate::utils::{self, var}; + +// const THREAD_STATUS_POLL_INTERVAL: Duration = Duration::from_millis(500); + +// #[tokio::test] +// async fn test_isolate() -> Result<()> { +// tracing_subscriber::registry() +// .with( +// tracing_logfmt::builder() +// .with_ansi_color(true) +// .layer() +// .with_filter(tracing_subscriber::filter::LevelFilter::INFO), +// ) +// .init(); + +// // Start FDB network thread +// let _network = unsafe { fdb::boot() }; +// tokio::spawn(utils::fdb_health_check()); + +// // For receiving the terminate handle +// let (terminate_tx, _terminate_rx) = +// tokio::sync::mpsc::channel::(1); + +// let test_dir = Path::new("/tmp/pegboard-isolate-v8-runner-test/"); +// let actors_path = test_dir.join("actors"); +// let actor_id = Uuid::nil(); + +// unsafe { +// std::env::set_var( +// "FDB_CLUSTER_PATH", +// &test_dir.join("fdb.cluster").display().to_string(), +// ) +// }; + +// let config = Config { +// resources: Resources { +// memory: 26843545600, +// memory_max: 26843545600, +// }, +// ports: Default::default(), +// env: Default::default(), +// stakeholder: Stakeholder::DynamicServer { +// server_id: String::new(), +// }, +// vector_socket_addr: Default::default(), +// }; + +// run_inner( +// actors_path.join(actor_id.to_string()).to_path_buf(), +// actor_id, +// terminate_tx, +// None, +// config, +// ) +// .await?; + +// Ok(()) +// } +// }