Skip to content

Commit

Permalink
feat: add durability to ds (#1364)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Nov 21, 2024
1 parent 12f01f7 commit 293be3d
Show file tree
Hide file tree
Showing 89 changed files with 1,075 additions and 655 deletions.
19 changes: 11 additions & 8 deletions packages/api/actor/src/route/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ pub async fn create(

let server_id = Uuid::new_v4();

let mut create_sub = ctx
.subscribe::<ds::workflows::server::CreateComplete>(("server_id", server_id))
let mut ready_sub = ctx
.subscribe::<ds::workflows::server::Ready>(("server_id", server_id))
.await?;
let mut fail_sub = ctx
.subscribe::<ds::workflows::server::CreateFailed>(("server_id", server_id))
Expand All @@ -147,11 +147,12 @@ pub async fn create(
runtime: game_config.runtime,
tags,
resources: (*body.resources).api_into(),
kill_timeout_ms: body
.lifecycle
.as_ref()
.and_then(|x| x.kill_timeout)
.unwrap_or_default(),
lifecycle: body.lifecycle.map(|x| (*x).api_into()).unwrap_or_else(|| {
ds::types::ServerLifecycle {
kill_timeout_ms: 0,
durable: false,
}
}),
image_id: body.runtime.build,
root_user_enabled: game_config.root_user_enabled,
args: body.runtime.arguments.unwrap_or_default(),
Expand Down Expand Up @@ -211,8 +212,9 @@ pub async fn create(
.dispatch()
.await?;

// Wait for ready or fail
tokio::select! {
res = create_sub.next() => { res?; },
res = ready_sub.next() => { res?; },
res = fail_sub.next() => {
res?;
bail_with!(SERVERS_SERVER_FAILED_TO_CREATE);
Expand Down Expand Up @@ -260,6 +262,7 @@ pub async fn create_deprecated(
lifecycle: body.lifecycle.map(|l| {
Box::new(models::ActorLifecycle {
kill_timeout: l.kill_timeout,
durable: Some(false),
})
}),
network: Some(Box::new(models::ActorCreateActorNetworkRequest {
Expand Down
24 changes: 14 additions & 10 deletions packages/infra/client/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/infra/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ members = [
"isolate-v8-runner",
"actor-kv",
"manager",
"runner-protocol",
"config",
]

[workspace.package]
Expand All @@ -18,7 +18,7 @@ license = "Apache-2.0"
container-runner = { path = "container-runner" }
echo = { path = "echo" }
isolate-v8-runner = { path = "isolate-v8-runner" }
runner-protocol = { path = "runner-protocol", package = "pegboard-runner-protocol" }
pegboard-config = { path = "config" }
actor-kv = { path = "actor-kv", package = "pegboard-actor-kv" }

[workspace.dependencies.sqlx]
Expand Down
2 changes: 2 additions & 0 deletions packages/infra/client/actor-kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
"json",
] }
uuid = { version = "1.6.1", features = ["v4"] }

pegboard = { path = "../../../services/pegboard", default-features = false }
62 changes: 62 additions & 0 deletions packages/infra/client/actor-kv/src/entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use anyhow::*;
use foundationdb as fdb;
use prost::Message;
use serde::Serialize;

use crate::{key::Key, metadata::Metadata};

#[derive(Default)]
pub(crate) struct EntryBuilder {
metadata: Option<Metadata>,
value: Vec<u8>,
next_idx: usize,
}

impl EntryBuilder {
pub(crate) fn add_sub_key(&mut self, sub_key: SubKey) -> Result<()> {
match sub_key {
SubKey::Metadata(value) => {
// We ignore setting the metadata again because it means the same key was given twice in the
// input keys for `ActorKv::get`. We don't perform automatic deduplication.
if self.metadata.is_none() {
self.metadata = Some(Metadata::decode(value.value())?);
}
}
SubKey::Chunk(idx, value) => {
// We don't perform deduplication on the input keys for `ActorKv::get` so we might have
// duplicate data chunks. This idx check ignores chunks that were already passed and ensures
// contiguity.
if idx == self.next_idx {
self.value.extend(value.value());
self.next_idx = idx + 1;
}
}
}

Ok(())
}

pub(crate) fn build(self, key: &Key) -> Result<Entry> {
ensure!(!self.value.is_empty(), "empty value at key {key:?}");

Ok(Entry {
metadata: self
.metadata
.with_context(|| format!("no metadata for key {key:?}"))?,
value: self.value,
})
}
}

/// Represents a Rivet KV value.
#[derive(Serialize)]
pub struct Entry {
pub metadata: Metadata,
pub value: Vec<u8>,
}

/// Represents FDB keys within a Rivet KV key.
pub(crate) enum SubKey {
Metadata(fdb::future::FdbValue),
Chunk(usize, fdb::future::FdbValue),
}
156 changes: 14 additions & 142 deletions packages/infra/client/actor-kv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ use std::{

use anyhow::*;
use deno_core::JsBuffer;
pub use entry::Entry;
use entry::{EntryBuilder, SubKey};
use foundationdb::{self as fdb, directory::Directory, tuple::Subspace};
use futures_util::{StreamExt, TryStreamExt};
use key::{Key, ListKey};
use key::Key;
use list_query::ListLimitReached;
pub use list_query::ListQuery;
pub use metadata::Metadata;
use pegboard::protocol;
use prost::Message;
use serde::{Deserialize, Serialize};
use utils::{validate_entries, validate_keys, TransactionExt};
use uuid::Uuid;
use utils::{owner_segment, validate_entries, validate_keys, TransactionExt};

mod entry;
pub mod key;
mod list_query;
mod metadata;
mod utils;

Expand All @@ -29,16 +34,16 @@ const VALUE_CHUNK_SIZE: usize = 1000; // 1 KB, not KiB
pub struct ActorKv {
version: &'static str,
db: fdb::Database,
actor_id: Uuid,
owner: protocol::ActorOwner,
subspace: Option<Subspace>,
}

impl ActorKv {
pub fn new(db: fdb::Database, actor_id: Uuid) -> Self {
pub fn new(db: fdb::Database, owner: protocol::ActorOwner) -> Self {
Self {
version: env!("CARGO_PKG_VERSION"),
db,
actor_id,
owner,
subspace: None,
}
}
Expand All @@ -50,7 +55,7 @@ impl ActorKv {
let actor_dir = root
.create_or_open(
&tx,
&["pegboard".into(), self.actor_id.into()],
&["pegboard".into(), owner_segment(&self.owner)],
None,
Some(b"partition"),
)
Expand Down Expand Up @@ -398,144 +403,11 @@ impl ActorKv {
let root = fdb::directory::DirectoryLayer::default();

let tx = self.db.create_trx()?;
root.remove_if_exists(&tx, &["pegboard".into(), self.actor_id.into()])
root.remove_if_exists(&tx, &["pegboard".into(), owner_segment(&self.owner)])
.await
.map_err(|err| anyhow!("{err:?}"))?;
tx.commit().await.map_err(|err| anyhow!("{err:?}"))?;

Ok(())
}
}

#[derive(Default)]
struct EntryBuilder {
metadata: Option<Metadata>,
value: Vec<u8>,
next_idx: usize,
}

impl EntryBuilder {
fn add_sub_key(&mut self, sub_key: SubKey) -> Result<()> {
match sub_key {
SubKey::Metadata(value) => {
// We ignore setting the metadata again because it means the same key was given twice in the
// input keys for `ActorKv::get`. We don't perform automatic deduplication.
if self.metadata.is_none() {
self.metadata = Some(Metadata::decode(value.value())?);
}
}
SubKey::Chunk(idx, value) => {
// We don't perform deduplication on the input keys for `ActorKv::get` so we might have
// duplicate data chunks. This idx check ignores chunks that were already passed and ensures
// contiguity.
if idx == self.next_idx {
self.value.extend(value.value());
self.next_idx = idx + 1;
}
}
}

Ok(())
}

fn build(self, key: &Key) -> Result<Entry> {
ensure!(!self.value.is_empty(), "empty value at key {key:?}");

Ok(Entry {
metadata: self
.metadata
.with_context(|| format!("no metadata for key {key:?}"))?,
value: self.value,
})
}
}

/// Represents a Rivet KV value.
#[derive(Serialize)]
pub struct Entry {
pub metadata: Metadata,
pub value: Vec<u8>,
}

/// Represents FDB keys within a Rivet KV key.
enum SubKey {
Metadata(fdb::future::FdbValue),
Chunk(usize, fdb::future::FdbValue),
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ListQuery {
All,
RangeInclusive(ListKey, Key),
RangeExclusive(ListKey, Key),
Prefix(ListKey),
}

impl ListQuery {
fn range(&self, subspace: &Subspace) -> (Vec<u8>, Vec<u8>) {
match self {
ListQuery::All => subspace.range(),
ListQuery::RangeInclusive(start, end) => (
subspace.subspace(&start).range().0,
subspace.subspace(&end).range().1,
),
ListQuery::RangeExclusive(start, end) => (
subspace.subspace(&start).range().0,
subspace.subspace(&end).range().1,
),
ListQuery::Prefix(prefix) => subspace.subspace(&prefix).range(),
}
}

fn validate(&self) -> Result<()> {
match self {
ListQuery::All => {}
ListQuery::RangeInclusive(start, end) => {
ensure!(
start.len() <= MAX_KEY_SIZE,
"start key is too long (max 2048 bytes)"
);
ensure!(
end.len() <= MAX_KEY_SIZE,
"end key is too long (max 2048 bytes)"
);
}
ListQuery::RangeExclusive(start, end) => {
ensure!(
start.len() <= MAX_KEY_SIZE,
"startAfter key is too long (max 2048 bytes)"
);
ensure!(
end.len() <= MAX_KEY_SIZE,
"end key is too long (max 2048 bytes)"
);
}
ListQuery::Prefix(prefix) => {
ensure!(
prefix.len() <= MAX_KEY_SIZE,
"prefix key is too long (max 2048 bytes)"
);
}
}

Ok(())
}
}

// Used to short circuit after the
struct ListLimitReached(HashMap<Key, EntryBuilder>);

impl std::fmt::Debug for ListLimitReached {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ListLimitReached")
}
}

impl std::fmt::Display for ListLimitReached {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "List limit reached")
}
}

impl std::error::Error for ListLimitReached {}
Loading

0 comments on commit 293be3d

Please sign in to comment.