Skip to content

Commit

Permalink
fix: fix eq check for actor kv
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Dec 31, 2024
1 parent f2cfdf7 commit 0e3b754
Show file tree
Hide file tree
Showing 34 changed files with 1,995 additions and 271 deletions.
1 change: 1 addition & 0 deletions packages/infra/client/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions packages/infra/client/actor-kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ anyhow.workspace = true
deno_core.workspace = true
foundationdb = {version = "0.9.1", features = [ "fdb-7_1", "embedded-fdb-include" ] }
futures-util = { version = "0.3" }
indexmap = { version = "2.0" }
prost = "0.13.3"
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
tokio.workspace = true
tokio-tungstenite = "0.23.1"
tracing.workspace = true
tokio.workspace = true
tracing-logfmt.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
uuid = { version = "1.6.1", features = ["v4"] }

pegboard = { path = "../../../services/pegboard", default-features = false }
11 changes: 6 additions & 5 deletions packages/infra/client/actor-kv/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{hash_map, HashMap},
collections::HashMap,
result::Result::{Err, Ok},
};

Expand All @@ -15,6 +15,7 @@ pub use list_query::ListQuery;
pub use metadata::Metadata;
use pegboard::protocol;
use prost::Message;
use indexmap::IndexMap;
use utils::{owner_segment, validate_entries, validate_keys, TransactionExt};

mod entry;
Expand Down Expand Up @@ -174,7 +175,7 @@ impl ActorKv {
query: ListQuery,
reverse: bool,
limit: Option<usize>,
) -> Result<HashMap<Key, Entry>> {
) -> Result<IndexMap<Key, Entry>> {
let subspace = self
.subspace
.as_ref()
Expand Down Expand Up @@ -231,13 +232,13 @@ impl ActorKv {
// With a limit, we short circuit out of the `try_fold` once the limit is reached
if let Some(limit) = limit {
stream
.try_fold(HashMap::new(), |mut acc, (key, sub_key)| async {
.try_fold(IndexMap::new(), |mut acc, (key, sub_key)| async {
let size = acc.len();
let entry = acc.entry(key);

// Short circuit when limit is reached. This relies on data from the stream
// being in order.
if size == limit && matches!(entry, hash_map::Entry::Vacant(_)) {
if size == limit && matches!(entry, indexmap::map::Entry::Vacant(_)) {
return Err(ListLimitReached(acc).into());
}

Expand All @@ -255,7 +256,7 @@ impl ActorKv {
})
} else {
stream
.try_fold(HashMap::new(), |mut acc, (key, sub_key)| async {
.try_fold(IndexMap::new(), |mut acc, (key, sub_key)| async {
acc.entry(key)
.or_insert_with(EntryBuilder::default)
.add_sub_key(sub_key)?;
Expand Down
5 changes: 2 additions & 3 deletions packages/infra/client/actor-kv/src/list_query.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::collections::HashMap;

use anyhow::*;
use foundationdb::tuple::Subspace;
use serde::Deserialize;
use indexmap::IndexMap;

use crate::{
entry::EntryBuilder,
Expand Down Expand Up @@ -71,7 +70,7 @@ impl ListQuery {
}

// Used to short circuit after the
pub struct ListLimitReached(pub HashMap<Key, EntryBuilder>);
pub struct ListLimitReached(pub IndexMap<Key, EntryBuilder>);

impl std::fmt::Debug for ListLimitReached {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down
52 changes: 39 additions & 13 deletions packages/infra/client/isolate-v8-runner/js/40_rivet_kv.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import { core } from "ext:core/mod.js";
import { op_rivet_kv_delete, op_rivet_kv_delete_all, op_rivet_kv_delete_batch, op_rivet_kv_get, op_rivet_kv_get_batch, op_rivet_kv_list, op_rivet_kv_put, op_rivet_kv_put_batch, } from "ext:core/ops";
import { deepEqual } from "./lib/fast-equals/index.js";
/**
* Retrieves a value from the key-value store.
*/
Expand All @@ -18,19 +19,17 @@ async function get(key, options) {
*/
async function getBatch(keys, options) {
const entries = await op_rivet_kv_get_batch(keys.map((x) => serializeKey(x)));
const deserializedValues = new Map();
for (const [key, entry] of entries) {
const jsKey = deserializeKey(key);
deserializedValues.set(jsKey, deserializeValue(jsKey, entry.value, options?.format));
}
return deserializedValues;
return new HashMap(entries.map(([key, entry]) => {
let jsKey = deserializeKey(key);
return [jsKey, deserializeValue(jsKey, entry.value, options?.format)];
}));
}
/**
* Retrieves all key-value pairs in the KV store. When using any of the options, the keys lexicographic order
* is used for filtering.
*
* @param {ListOptions} [options] - Options.
* @returns {Promise<Map<Key, Entry>>} The retrieved values.
* @returns {Promise<HashMap<Key, Entry>>} The retrieved values.
*/
async function list(options) {
// Build query
Expand Down Expand Up @@ -69,12 +68,10 @@ async function list(options) {
query = { all: {} };
}
const entries = await op_rivet_kv_list(query, options?.reverse ?? false, options?.limit);
const deserializedValues = new Map();
for (const [key, entry] of entries) {
const jsKey = deserializeKey(key);
deserializedValues.set(jsKey, deserializeValue(jsKey, entry.value, options?.format));
}
return deserializedValues;
return new HashMap(entries.map(([key, entry]) => {
let jsKey = deserializeKey(key);
return [jsKey, deserializeValue(jsKey, entry.value, options?.format)];
}));
}
/**
* Stores a key-value pair in the key-value store.
Expand Down Expand Up @@ -233,6 +230,35 @@ function deserializeValue(key, value, format = "value") {
throw Error(`invalid format: "${format}". expected "value" or "arrayBuffer".`);
}
}
class HashMap {
#internal;
constructor(internal) {
this.#internal = internal;
}
get(key) {
for (let [k, v] of this.#internal) {
if (deepEqual(key, k))
return v;
}
return undefined;
}
/**
* Returns a map of keys to values. **WARNING** Using `.get` on the returned map does not work as expected
* with complex types (arrays, objects, etc). Use `.get` on this class instead.
*/
raw() {
return new Map(this.#internal);
}
array() {
return this.#internal;
}
entries() {
return this[Symbol.iterator]();
}
[Symbol.iterator]() {
return this.#internal[Symbol.iterator]();
}
}
export const KV_NAMESPACE = {
get,
getBatch,
Expand Down
Loading

0 comments on commit 0e3b754

Please sign in to comment.