diff --git a/Cargo.lock b/Cargo.lock
index eabb1b8145..f28d47b569 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4459,6 +4459,7 @@ dependencies = [
  "proptest-derive",
  "rand 0.8.5",
  "serde",
+ "spacetimedb-paths",
  "spacetimedb-primitives",
  "spacetimedb-sats",
  "tempfile",
@@ -4518,6 +4519,7 @@ dependencies = [
  "rustc-demangle",
  "rustc-hash",
  "scopeguard",
+ "semver",
  "serde",
  "serde_json",
  "serde_path_to_error",
@@ -4534,6 +4536,7 @@ dependencies = [
  "spacetimedb-expr",
  "spacetimedb-lib",
  "spacetimedb-metrics",
+ "spacetimedb-paths",
  "spacetimedb-primitives",
  "spacetimedb-sats",
  "spacetimedb-schema",
@@ -4582,6 +4585,7 @@ dependencies = [
  "itertools 0.12.1",
  "log",
  "spacetimedb-commitlog",
+ "spacetimedb-paths",
  "spacetimedb-sats",
  "tokio",
  "tracing",
@@ -4648,6 +4652,16 @@ dependencies = [
  "prometheus",
 ]
 
+[[package]]
+name = "spacetimedb-paths"
+version = "1.0.0-rc1"
+dependencies = [
+ "chrono",
+ "fs2",
+ "itoa",
+ "thiserror",
+]
+
 [[package]]
 name = "spacetimedb-primitives"
 version = "1.0.0-rc1"
@@ -4759,6 +4773,7 @@ dependencies = [
  "spacetimedb-durability",
  "spacetimedb-fs-utils",
  "spacetimedb-lib",
+ "spacetimedb-paths",
  "spacetimedb-primitives",
  "spacetimedb-sats",
  "spacetimedb-table",
@@ -4798,6 +4813,7 @@ dependencies = [
  "spacetimedb-client-api-messages",
  "spacetimedb-core",
  "spacetimedb-lib",
+ "spacetimedb-paths",
  "tempfile",
  "thiserror",
  "tokio",
@@ -4847,6 +4863,7 @@ dependencies = [
  "spacetimedb-core",
  "spacetimedb-data-structures",
  "spacetimedb-lib",
+ "spacetimedb-paths",
  "spacetimedb-standalone",
  "tempfile",
  "tokio",
diff --git a/Cargo.toml b/Cargo.toml
index 0295922e05..b1131f3ae7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,7 @@ members = [
     "crates/fs-utils",
     "crates/lib",
     "crates/metrics",
+    "crates/paths",
    "crates/primitives",
     "crates/sats",
     "crates/schema",
@@ -100,6 +101,7 @@ spacetimedb-expr = { path = "crates/expr", version = "1.0.0-rc1" }
 spacetimedb-lib = { path = "crates/lib", default-features = false, version = "1.0.0-rc1" }
 spacetimedb-metrics = { path = "crates/metrics", version = "1.0.0-rc1" }
 spacetimedb-primitives = { path = "crates/primitives", version = "1.0.0-rc1" }
+spacetimedb-paths = { path = "crates/paths", version = "1.0.0-rc1" }
 spacetimedb-sats = { path = "crates/sats", version = "1.0.0-rc1" }
 spacetimedb-schema = { path = "crates/schema", version = "1.0.0-rc1" }
 spacetimedb-standalone = { path = "crates/standalone", version = "1.0.0-rc1" }
@@ -169,6 +171,7 @@ indicatif = "0.16"
 insta = { version = "1.21.0", features = ["toml"] }
 is-terminal = "0.4"
 itertools = "0.12"
+itoa = "1"
 jsonwebtoken = { version = "8.1.0" }
 lazy_static = "1.4.0"
 log = "0.4.17"
@@ -201,6 +204,7 @@ rustyline = { version = "12.0.0", features = [] }
 scoped-tls = "1.0.1"
 scopeguard = "1.1.0"
 second-stack = "0.3"
+semver = "1"
 serde = { version = "1.0.136", features = ["derive"] }
 serde_json = { version = "1.0.128", features = ["raw_value", "arbitrary_precision"] }
 serde_path_to_error = "0.1.9"
diff --git a/crates/cli/src/subcommands/server.rs b/crates/cli/src/subcommands/server.rs
index 0e0bb28553..bf787873b8 100644
--- a/crates/cli/src/subcommands/server.rs
+++ b/crates/cli/src/subcommands/server.rs
@@ -5,7 +5,7 @@ use crate::{
 };
 use anyhow::Context;
 use clap::{Arg, ArgAction, ArgMatches, Command};
-use spacetimedb::stdb_path;
+use spacetimedb_standalone::subcommands::start::default_data_dir;
 use std::path::PathBuf;
 use tabled::{
     settings::{object::Columns, Alignment, Modify, Style},
@@ -122,6 +122,13 @@ fn get_subcommands() -> Vec<Command> {
             .arg(common_args::yes()),
Command::new("clear") .about("Deletes all data from all local databases") + .arg( + Arg::new("data_dir") + .long("data-dir") + .help("The path to the data directory for the database") + .default_value(default_data_dir().into_os_string()) + .value_parser(clap::value_parser!(PathBuf)), + ) .arg(common_args::yes()), // TODO: set-name, set-protocol, set-host, set-url ] @@ -425,26 +432,10 @@ pub async fn exec_edit(mut config: Config, args: &ArgMatches) -> Result<(), anyh async fn exec_clear(_config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> { let force = args.get_flag("force"); - if std::env::var_os("STDB_PATH").map(PathBuf::from).is_none() { - let mut path = dirs::home_dir().unwrap_or_default(); - path.push(".spacetime"); - std::env::set_var("STDB_PATH", path.to_str().unwrap()); - } - - let control_node_dir = stdb_path("control_node"); - let worker_node_dir = stdb_path("worker_node"); - if control_node_dir.exists() || worker_node_dir.exists() { - if control_node_dir.exists() { - println!("Control node database path: {}", control_node_dir.to_str().unwrap()); - } else { - println!("Control node database path: "); - } + let data_dir = args.get_one::("data_dir").unwrap(); - if worker_node_dir.exists() { - println!("Worker node database path: {}", worker_node_dir.to_str().unwrap()); - } else { - println!("Worker node database path: "); - } + if data_dir.exists() { + println!("Database path: {}", data_dir.display()); if !y_or_n( force, @@ -454,14 +445,8 @@ async fn exec_clear(_config: Config, args: &ArgMatches) -> Result<(), anyhow::Er return Ok(()); } - if control_node_dir.exists() { - std::fs::remove_dir_all(&control_node_dir)?; - println!("Deleted control node database: {}", control_node_dir.to_str().unwrap()); - } - if worker_node_dir.exists() { - std::fs::remove_dir_all(&worker_node_dir)?; - println!("Deleted worker node database: {}", worker_node_dir.to_str().unwrap()); - } + std::fs::remove_dir_all(&data_dir)?; + println!("Deleted database: {}", data_dir.display()); } else { println!("Local database not found. 
     }
 }
diff --git a/crates/client-api/Cargo.toml b/crates/client-api/Cargo.toml
index 649feb2489..0ed057c1e3 100644
--- a/crates/client-api/Cargo.toml
+++ b/crates/client-api/Cargo.toml
@@ -38,6 +38,6 @@ bytes = "1"
 tracing.workspace = true
 bytestring = "1"
 tokio-tungstenite.workspace = true
-itoa = "1.0.9"
+itoa.workspace = true
 derive_more = "0.99.17"
 scopeguard.workspace = true
diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs
index 00071e926a..4015e9d0da 100644
--- a/crates/client-api/src/routes/database.rs
+++ b/crates/client-api/src/routes/database.rs
@@ -446,11 +446,12 @@ where
         .ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?;
     let replica_id = replica.id;
 
-    let filepath = DatabaseLogger::filepath(&address, replica_id);
+    let host = worker_ctx.host_controller();
+
+    let filepath = DatabaseLogger::filepath(host.data_dir.replica(replica_id));
     let lines = DatabaseLogger::read_latest(&filepath, num_lines).await;
 
     let body = if follow {
-        let host = worker_ctx.host_controller();
         let module = host
             .get_or_launch_module_host(database, replica_id)
             .await
diff --git a/crates/commitlog/Cargo.toml b/crates/commitlog/Cargo.toml
index 205c93e015..d8a2b6f733 100644
--- a/crates/commitlog/Cargo.toml
+++ b/crates/commitlog/Cargo.toml
@@ -18,6 +18,7 @@ log.workspace = true
 memmap2 = "0.9.4"
 serde = { workspace = true, optional = true }
 spacetimedb-primitives.workspace = true
+spacetimedb-paths.workspace = true
 spacetimedb-sats.workspace = true
 tempfile.workspace = true
 thiserror.workspace = true
diff --git a/crates/commitlog/src/index/indexfile.rs b/crates/commitlog/src/index/indexfile.rs
index 2584173530..b7009b8fab 100644
--- a/crates/commitlog/src/index/indexfile.rs
+++ b/crates/commitlog/src/index/indexfile.rs
@@ -3,22 +3,16 @@ use std::{
     io,
     marker::PhantomData,
     mem,
-    path::{Path, PathBuf},
 };
 
 use log::debug;
 use memmap2::MmapMut;
+use spacetimedb_paths::server::OffsetIndexFile;
 
 use super::IndexError;
 
-const OFFSET_INDEX_FILE_EXT: &str = ".stdb.ofs";
 const KEY_SIZE: usize = mem::size_of::<u64>();
 const ENTRY_SIZE: usize = KEY_SIZE + mem::size_of::<u64>();
 
-/// Returns the offset index file path based on the root path and offset
-pub fn offset_index_file_path(root: &Path, offset: u64) -> PathBuf {
-    root.join(format!("{offset:0>20}{OFFSET_INDEX_FILE_EXT}"))
-}
-
 /// A mutable representation of an index file using memory-mapped I/O.
 ///
 /// `IndexFileMut` provides efficient read and write access to an index file, which stores
@@ -35,12 +29,8 @@ pub struct IndexFileMut<Key: Into<u64> + From<u64>> {
 }
 
 impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
-    pub fn create_index_file(path: &Path, offset: u64, cap: u64) -> io::Result<Self> {
-        File::options()
-            .write(true)
-            .read(true)
-            .create_new(true)
-            .open(offset_index_file_path(path, offset))
+    pub fn create_index_file(path: &OffsetIndexFile, cap: u64) -> io::Result<Self> {
+        path.open_file(File::options().write(true).read(true).create_new(true))
             .and_then(|file| {
                 file.set_len(cap * ENTRY_SIZE as u64)?;
                 let mmap = unsafe { MmapMut::map_mut(&file) }?;
@@ -54,18 +44,15 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
             .or_else(|e| {
                 if e.kind() == io::ErrorKind::AlreadyExists {
                     debug!("Index file {} already exists", path.display());
-                    Self::open_index_file(path, offset, cap)
+                    Self::open_index_file(path, cap)
                 } else {
                     Err(e)
                 }
             })
     }
 
-    pub fn open_index_file(path: &Path, offset: u64, cap: u64) -> io::Result<Self> {
-        let file = File::options()
-            .read(true)
-            .write(true)
-            .open(offset_index_file_path(path, offset))?;
+    pub fn open_index_file(path: &OffsetIndexFile, cap: u64) -> io::Result<Self> {
+        let file = path.open_file(File::options().read(true).write(true))?;
         file.set_len(cap * ENTRY_SIZE as u64)?;
 
         let mmap = unsafe { MmapMut::map_mut(&file)? };
@@ -79,8 +66,8 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
         Ok(me)
     }
 
-    pub fn delete_index_file(path: &Path, offset: u64) -> io::Result<()> {
-        fs::remove_file(offset_index_file_path(path, offset)).map_err(Into::into)
+    pub fn delete_index_file(path: &OffsetIndexFile) -> io::Result<()> {
+        fs::remove_file(path).map_err(Into::into)
     }
 
     // Searches for first 0-key, to count number of entries
@@ -244,11 +231,8 @@ pub struct IndexFile<Key: Into<u64> + From<u64>> {
 }
 
 impl<Key: Into<u64> + From<u64>> IndexFile<Key> {
-    pub fn open_index_file(path: &Path, offset: u64) -> io::Result<Self> {
-        let file = File::options()
-            .read(true)
-            .append(true)
-            .open(offset_index_file_path(path, offset))?;
+    pub fn open_index_file(path: &OffsetIndexFile) -> io::Result<Self> {
+        let file = path.open_file(File::options().read(true).append(true))?;
         let mmap = unsafe { MmapMut::map_mut(&file)? };
 
         let mut inner = IndexFileMut {
@@ -274,16 +258,18 @@ impl<Key: Into<u64> + From<u64>> IndexFile<Key> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use spacetimedb_paths::server::CommitLogDir;
     use tempfile::TempDir;
 
     /// Create and fill index file with key as first `fill_till - 1` even numbers
     fn create_and_fill_index(cap: u64, fill_till: u64) -> Result<IndexFileMut<u64>, IndexError> {
         // Create a temporary directory for testing
         let temp_dir = TempDir::new()?;
-        let path = temp_dir.path().to_path_buf();
+        let path = CommitLogDir(temp_dir.path().to_path_buf());
+        let index_path = path.index(0);
 
         // Create an index file
-        let mut index_file: IndexFileMut<u64> = IndexFileMut::create_index_file(&path, 0, cap)?;
+        let mut index_file: IndexFileMut<u64> = IndexFileMut::create_index_file(&index_path, cap)?;
 
         // Enter even number keys from 2
         for i in 1..fill_till {
@@ -364,10 +350,11 @@ mod tests {
     fn test_close_open_index() -> Result<(), IndexError> {
         // Create a temporary directory for testing
         let temp_dir = TempDir::new()?;
-        let path = temp_dir.path().to_path_buf();
+        let path = CommitLogDir(temp_dir.path().to_path_buf());
+        let index_path = path.index(0);
 
         // Create an index file
-        let mut index_file: IndexFileMut<u64> = IndexFileMut::create_index_file(&path, 0, 100)?;
+        let mut index_file: IndexFileMut<u64> = IndexFileMut::create_index_file(&index_path, 100)?;
 
         for i in 1..10 {
             index_file.append(i * 2, i * 2 * 100)?;
@@ -376,7 +363,7 @@ mod tests {
         assert_eq!(index_file.num_entries, 9);
         drop(index_file);
 
-        let open_index_file: IndexFileMut<u64> = IndexFileMut::open_index_file(&path, 0, 100)?;
+        let open_index_file: IndexFileMut<u64> = IndexFileMut::open_index_file(&index_path, 100)?;
         assert_eq!(open_index_file.num_entries, 9);
         assert_eq!(open_index_file.key_lookup(6)?, (6, 600));
diff --git a/crates/commitlog/src/index/mod.rs b/crates/commitlog/src/index/mod.rs
index 5469b8111d..3c72b4e32d 100644
--- a/crates/commitlog/src/index/mod.rs
+++ b/crates/commitlog/src/index/mod.rs
@@ -3,7 +3,6 @@ use std::io;
 use thiserror::Error;
 
 mod indexfile;
-pub use indexfile::offset_index_file_path;
 pub use indexfile::{IndexFile, IndexFileMut};
 
 #[derive(Error, Debug)]
diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs
index 3f83af6916..821e0a13cd 100644
--- a/crates/commitlog/src/lib.rs
+++ b/crates/commitlog/src/lib.rs
@@ -1,11 +1,11 @@
 use std::{
     io,
     num::{NonZeroU16, NonZeroU64},
-    path::PathBuf,
     sync::RwLock,
 };
 
 use log::trace;
+use spacetimedb_paths::server::CommitLogDir;
 
 pub mod commit;
 pub mod commitlog;
@@ -105,7 +105,7 @@ impl<T> Commitlog<T> {
     /// This is only necessary when opening the commitlog for writing. See the
     /// free-standing functions in this module for how to traverse a read-only
     /// commitlog.
-    pub fn open(root: impl Into<PathBuf>, opts: Options) -> io::Result<Self> {
+    pub fn open(root: CommitLogDir, opts: Options) -> io::Result<Self> {
         let inner = commitlog::Generic::open(repo::Fs::new(root), opts)?;
 
         Ok(Self {
@@ -402,7 +402,7 @@ impl<T> Commitlog<T> {
 ///
 /// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
 /// See [`Commitlog::commits`] for more information.
-pub fn commits(root: impl Into<PathBuf>) -> io::Result<impl Iterator<Item = Result<Commit, error::Traversal>>> {
+pub fn commits(root: CommitLogDir) -> io::Result<impl Iterator<Item = Result<Commit, error::Traversal>>> {
     commits_from(root, 0)
 }
 
@@ -412,7 +412,7 @@ pub fn commits_from(
-    root: impl Into<PathBuf>,
+    root: CommitLogDir,
     offset: u64,
 ) -> io::Result<impl Iterator<Item = Result<Commit, error::Traversal>>> {
     commitlog::commits_from(repo::Fs::new(root), DEFAULT_LOG_FORMAT_VERSION, offset)
@@ -424,7 +424,7 @@ pub fn commits_from(
 ///
 /// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
 /// See [`Commitlog::transactions`] for more information.
 pub fn transactions<'a, D, T>(
-    root: impl Into<PathBuf>,
+    root: CommitLogDir,
     de: &'a D,
 ) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
 where
@@ -441,7 +441,7 @@ where
 /// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
 /// See [`Commitlog::transactions_from`] for more information.
 pub fn transactions_from<'a, D, T>(
-    root: impl Into<PathBuf>,
+    root: CommitLogDir,
     offset: u64,
     de: &'a D,
 ) -> io::Result<impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a>
 where
@@ -458,7 +458,7 @@ where
 ///
 /// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
 /// See [`Commitlog::fold_transactions`] for more information.
-pub fn fold_transactions<D>(root: impl Into<PathBuf>, de: D) -> Result<(), D::Error>
+pub fn fold_transactions<D>(root: CommitLogDir, de: D) -> Result<(), D::Error>
 where
     D: Decoder,
     D::Error: From<error::Traversal> + From<io::Error>,
@@ -471,7 +471,7 @@ where
 ///
 /// Starts the traversal without the upfront I/O imposed by [`Commitlog::open`].
 /// See [`Commitlog::fold_transactions_from`] for more information.
-pub fn fold_transactions_from<D>(root: impl Into<PathBuf>, offset: u64, de: D) -> Result<(), D::Error>
+pub fn fold_transactions_from<D>(root: CommitLogDir, offset: u64, de: D) -> Result<(), D::Error>
 where
     D: Decoder,
     D::Error: From<error::Traversal> + From<io::Error>,
diff --git a/crates/commitlog/src/repo/fs.rs b/crates/commitlog/src/repo/fs.rs
index 9430e4c624..717a9768e0 100644
--- a/crates/commitlog/src/repo/fs.rs
+++ b/crates/commitlog/src/repo/fs.rs
@@ -1,24 +1,13 @@
+use std::fs::{self, File};
 use std::io;
-use std::{
-    fs::{self, File},
-    path::PathBuf,
-};
 
 use log::{debug, warn};
-
-use crate::index::offset_index_file_path;
+use spacetimedb_paths::server::CommitLogDir;
 
 use super::{Repo, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
 
 const SEGMENT_FILE_EXT: &str = ".stdb.log";
 
-/// By convention, the file name of a segment consists of the minimum
-/// transaction offset contained in it, left-padded with zeroes to 20 digits,
-/// and the file extension `.stdb.log`.
-pub fn segment_file_name(offset: u64) -> String {
-    format!("{offset:0>20}{SEGMENT_FILE_EXT}")
-}
-
 // TODO
 //
 // - should use advisory locks?
@@ -35,21 +24,15 @@ pub fn segment_file_name(offset: u64) -> String {
 #[derive(Clone, Debug)]
 pub struct Fs {
     /// The base directory within which segment files will be stored.
-    root: PathBuf,
+    root: CommitLogDir,
 }
 
 impl Fs {
     /// Create a commitlog repository which stores segments in the directory `root`.
     ///
     /// `root` must name an extant, accessible, writeable directory.
-    pub fn new(root: impl Into<PathBuf>) -> Self {
-        Self { root: root.into() }
-    }
-
-    /// Get the filename for a segment starting with `offset` within this
-    /// repository.
-    pub fn segment_path(&self, offset: u64) -> PathBuf {
-        self.root.join(segment_file_name(offset))
+    pub fn new(root: CommitLogDir) -> Self {
+        Self { root }
     }
 
     /// Determine the size on disk as the sum of the sizes of all segments.
@@ -58,12 +41,9 @@ impl Fs {
     pub fn size_on_disk(&self) -> io::Result<u64> {
         let mut sz = 0;
         for offset in self.existing_offsets()? {
-            sz += self.segment_path(offset).metadata()?.len();
+            sz += self.root.segment(offset).metadata()?.len();
             // Add the size of the offset index file if present
-            sz += offset_index_file_path(&self.root, offset)
-                .metadata()
-                .map(|m| m.len())
-                .unwrap_or(0);
+            sz += self.root.index(offset).metadata().map(|m| m.len()).unwrap_or(0);
         }
 
         Ok(sz)
@@ -78,7 +58,10 @@ impl Repo for Fs {
             .read(true)
             .append(true)
             .create_new(true)
-            .open(self.segment_path(offset))
+            .open(self.root.segment(offset))
             .or_else(|e| {
                 if e.kind() == io::ErrorKind::AlreadyExists {
                     debug!("segment {offset} already exists");
@@ -94,14 +77,20 @@ impl Repo for Fs {
     }
 
     fn open_segment(&self, offset: u64) -> io::Result<Self::Segment> {
-        File::options().read(true).append(true).open(self.segment_path(offset))
+        File::options().read(true).append(true).open(self.root.segment(offset))
     }
 
     fn remove_segment(&self, offset: u64) -> io::Result<()> {
         let _ = self.remove_offset_index(offset).map_err(|e| {
             warn!("failed to remove offset index for segment {offset}, error: {e}");
         });
-        fs::remove_file(self.segment_path(offset))
+        fs::remove_file(self.root.segment(offset))
     }
 
     fn existing_offsets(&self) -> io::Result<Vec<u64>> {
@@ -129,14 +118,14 @@ impl Repo for Fs {
     }
 
     fn create_offset_index(&self, offset: TxOffset, cap: u64) -> io::Result<TxOffsetIndexMut> {
-        TxOffsetIndexMut::create_index_file(&self.root, offset, cap)
+        TxOffsetIndexMut::create_index_file(&self.root.index(offset), cap)
     }
 
     fn remove_offset_index(&self, offset: TxOffset) -> io::Result<()> {
-        TxOffsetIndexMut::delete_index_file(&self.root, offset)
+        TxOffsetIndexMut::delete_index_file(&self.root.index(offset))
     }
 
     fn get_offset_index(&self, offset: TxOffset) -> io::Result<TxOffsetIndex> {
-        TxOffsetIndex::open_index_file(&self.root, offset)
+        TxOffsetIndex::open_index_file(&self.root.index(offset))
     }
 }
diff --git a/crates/commitlog/tests/random_payload/mod.rs b/crates/commitlog/tests/random_payload/mod.rs
index 9d10b256fe..7ab8643334 100644
--- a/crates/commitlog/tests/random_payload/mod.rs
+++ b/crates/commitlog/tests/random_payload/mod.rs
@@ -2,6 +2,7 @@ use std::num::NonZeroU16;
 
 use rand::Rng;
 use spacetimedb_commitlog::{payload, Commitlog, Options};
+use spacetimedb_paths::server::CommitLogDir;
 use tempfile::tempdir;
 
 fn gen_payload() -> [u8; 256] {
@@ -15,7 +16,7 @@ fn gen_payload() -> [u8; 256] {
 fn smoke() {
     let root = tempdir().unwrap();
     let clog = Commitlog::open(
-        root.path(),
+        CommitLogDir(root.path().into()),
         Options {
             max_segment_size: 8 * 1024,
             max_records_in_commit: NonZeroU16::MIN,
@@ -44,7 +45,7 @@ fn smoke() {
 fn resets() {
     let root = tempdir().unwrap();
     let mut clog = Commitlog::open(
-        root.path(),
+        CommitLogDir(root.path().into()),
         Options {
             max_segment_size: 512,
             max_records_in_commit: NonZeroU16::MIN,
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 91a23e587a..3829537c77 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -22,6 +22,7 @@ spacetimedb-commitlog.workspace = true
 spacetimedb-durability.workspace = true
 spacetimedb-metrics.workspace = true
 spacetimedb-primitives.workspace = true
+spacetimedb-paths.workspace = true
 spacetimedb-sats = { workspace = true, features = ["serde"] }
 spacetimedb-schema.workspace = true
 spacetimedb-table.workspace = true
@@ -72,6 +73,7 @@ regex.workspace = true
 rustc-demangle.workspace = true
 rustc-hash.workspace = true
 scopeguard.workspace = true
+semver = { workspace = true, features = ["serde"] }
 serde.workspace = true
 serde_json.workspace = true
 serde_path_to_error.workspace = true
diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs
index fb95523ee7..235ef9c329 100644
--- a/crates/core/src/config.rs
+++ b/crates/core/src/config.rs
@@ -1,6 +1,10 @@
 use std::env::temp_dir;
+use std::fmt;
 use std::path::{Path, PathBuf};
 
+use spacetimedb_lib::Address;
+use spacetimedb_paths::server::MetadataTomlPath;
+
 #[cfg(not(any(target_os = "macos", target_os = "windows")))]
 mod paths {
     use super::*;
@@ -152,3 +156,47 @@ impl SpacetimeDbFiles for FilesGlobal {
         paths::config_path()
     }
 }
+
+#[derive(serde::Serialize, serde::Deserialize)]
+pub struct MetadataFile {
+    pub version: semver::Version,
+    pub edition: String,
+    pub client_address: Option<Address>,
+}
+
+impl MetadataFile {
+    pub fn read(path: &MetadataTomlPath) -> anyhow::Result<Option<Self>> {
+        match std::fs::read_to_string(path) {
+            Ok(contents) => Ok(Some(toml::from_str(&contents)?)),
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
+            Err(e) => Err(e.into()),
+        }
+    }
+
+    pub fn write(&self, path: &MetadataTomlPath) -> std::io::Result<()> {
+        std::fs::write(path, self.to_string())
+    }
+
+    pub fn version_compatible_with(&self, version: &semver::Version) -> bool {
+        semver::Comparator {
+            op: semver::Op::Caret,
+            major: self.version.major,
+            minor: Some(self.version.minor),
+            patch: Some(self.version.patch),
+            pre: self.version.pre.clone(),
+        }
+        .matches(version)
+    }
+}
+
+impl fmt::Display for MetadataFile {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        writeln!(f, "# THIS FILE IS GENERATED BY SPACETIMEDB, DO NOT MODIFY!")?;
+        writeln!(f)?;
+        f.write_str(&toml::to_string(self).unwrap())
+    }
+}
+
+pub fn current_version() -> semver::Version {
+    env!("CARGO_PKG_VERSION").parse().unwrap()
+}
diff --git a/crates/core/src/database_logger.rs b/crates/core/src/database_logger.rs
index 8e22664025..a54db3243c 100644
--- a/crates/core/src/database_logger.rs
+++ b/crates/core/src/database_logger.rs
@@ -1,11 +1,9 @@
-use core::str::FromStr;
-use spacetimedb_lib::address::Address;
-use std::fs::OpenOptions;
-use std::fs::{self, File};
-use std::io::{self, prelude::*, SeekFrom};
-use std::path::{Path, PathBuf};
+use std::fs::File;
+use std::io::{self, Seek, SeekFrom, Write};
 use tokio::sync::broadcast;
 
+use spacetimedb_paths::server::{ModuleLogPath, ReplicaPath};
+
 pub struct DatabaseLogger {
     file: File,
     pub tx: broadcast::Sender<bytes::Bytes>,
@@ -126,21 +124,16 @@ impl DatabaseLogger {
     //     PathBuf::from(path)
     // }
 
-    pub fn filepath(address: &Address, replica_id: u64) -> PathBuf {
-        let root = crate::stdb_path("worker_node/replicas");
-        root.join(&*address.to_hex())
-            .join(replica_id.to_string())
-            .join("module_logs")
+    pub fn filepath(replica_path: ReplicaPath) -> ModuleLogPath {
+        replica_path.module_log(chrono::Utc::now().date_naive())
     }
 
-    pub fn open(root: impl AsRef<Path>) -> Self {
-        let root = root.as_ref();
-        fs::create_dir_all(root).unwrap();
-
-        let mut filepath = PathBuf::from(root);
-        filepath.push(&PathBuf::from_str("0.log").unwrap());
+    pub fn open(replica_path: ReplicaPath) -> Self {
+        Self::open_from_path(&Self::filepath(replica_path))
+    }
 
-        let file = OpenOptions::new().create(true).append(true).open(&filepath).unwrap();
+    pub fn open_from_path(path: &ModuleLogPath) -> Self {
+        let file = path.open_file(File::options().create(true).append(true)).unwrap();
         let (tx, _) = broadcast::channel(64);
         Self { file, tx }
     }
@@ -175,17 +168,13 @@ impl DatabaseLogger {
         let _ = self.tx.send(buf.into());
     }
 
-    pub async fn _read_all(root: &Path) -> String {
-        let filepath = root.join("0.log");
-
-        tokio::fs::read_to_string(&filepath).await.unwrap()
+    pub async fn _read_all(filepath: &ModuleLogPath) -> String {
+        tokio::fs::read_to_string(filepath).await.unwrap()
     }
 
-    pub async fn read_latest(root: &Path, num_lines: Option<u32>) -> String {
-        let filepath = root.join("0.log");
-
+    pub async fn read_latest(filepath: &ModuleLogPath, num_lines: Option<u32>) -> String {
         // TODO: Read backwards from the end of the file to only read in the latest lines
-        let text = tokio::fs::read_to_string(&filepath).await.expect("reading log file");
+        let text = tokio::fs::read_to_string(filepath).await.expect("reading log file");
 
         let Some(num_lines) = num_lines else { return text };
diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs
index 58e431f11d..50a3a2d7d7 100644
--- a/crates/core/src/db/relational_db.rs
+++ b/crates/core/src/db/relational_db.rs
@@ -29,6 +29,7 @@ use spacetimedb_lib::address::Address;
 use spacetimedb_lib::db::auth::StAccess;
 use spacetimedb_lib::db::raw_def::v9::{RawIndexAlgorithm, RawModuleDefV9Builder, RawSql};
 use spacetimedb_lib::Identity;
+use spacetimedb_paths::server::{CommitLogDir, ReplicaPath, SnapshotsPath};
 use spacetimedb_primitives::*;
 use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
 use spacetimedb_schema::def::{ModuleDef, TableDef};
@@ -40,7 +41,7 @@ use spacetimedb_table::MemoryUsage;
 use std::borrow::Cow;
 use std::collections::HashSet;
 use std::fmt;
-use std::fs::{self, File};
+use std::fs::File;
 use std::io;
 use std::ops::RangeBounds;
 use std::path::Path;
@@ -96,6 +97,7 @@ pub struct RelationalDB {
     // DO NOT ADD FIELDS AFTER THIS.
     // By default, fields are dropped in declaration order.
     // We want to release the file lock last.
+    // TODO(noa): is this lockfile still necessary now that we have data-dir?
     _lock: LockFile,
 }
 
@@ -201,7 +203,6 @@ impl RelationalDB {
             row_count_fn: default_row_count_fn(address),
             disk_size_fn,
-
             _lock: lock,
         }
     }
@@ -286,7 +287,7 @@ impl RelationalDB {
     ///
     /// [ModuleHost]: crate::host::module_host::ModuleHost
     pub fn open(
-        root: &Path,
+        root: &ReplicaPath,
        address: Address,
         owner_identity: Identity,
         history: impl durability::History<TxData = Txdata>,
@@ -1193,8 +1194,8 @@ struct LockFile {
 }
 
 impl LockFile {
-    pub fn lock(root: impl AsRef<Path>) -> Result<Self, DBError> {
-        fs::create_dir_all(&root)?;
+    pub fn lock(root: &ReplicaPath) -> Result<Self, DBError> {
+        root.create()?;
         let path = root.as_ref().join("db.lock");
         let lock = File::create(&path)?;
         lock.try_lock_exclusive()
@@ -1258,8 +1259,9 @@ where
 ///
 /// Note that this operation can be expensive, as it needs to traverse a suffix
 /// of the commitlog.
-pub async fn local_durability(db_path: &Path) -> io::Result<(Arc<durability::Local<ProductValue>>, DiskSizeFn)> {
-    let commitlog_dir = db_path.join("clog");
+pub async fn local_durability(
+    commitlog_dir: CommitLogDir,
+) -> io::Result<(Arc<durability::Local<ProductValue>>, DiskSizeFn)> {
     tokio::fs::create_dir_all(&commitlog_dir).await?;
     let rt = tokio::runtime::Handle::current();
     // TODO: Should this better be spawn_blocking?
@@ -1289,13 +1291,12 @@ pub async fn local_durability(
 pub fn open_snapshot_repo(
-    db_path: &Path,
+    path: SnapshotsPath,
     database_address: Address,
     replica_id: u64,
 ) -> Result<Arc<SnapshotRepository>, Box<SnapshotError>> {
-    let snapshot_dir = db_path.join("snapshots");
-    std::fs::create_dir_all(&snapshot_dir).map_err(|e| Box::new(SnapshotError::from(e)))?;
-    SnapshotRepository::open(snapshot_dir, database_address, replica_id)
+    path.create().map_err(SnapshotError::from)?;
+    SnapshotRepository::open(path, database_address, replica_id)
         .map(Arc::new)
         .map_err(Box::new)
 }
@@ -1349,7 +1350,7 @@ pub mod tests_utils {
     /// Create a [`TestDB`] which does not store data on disk.
     pub fn in_memory() -> Result<Self, DBError> {
         let dir = TempDir::with_prefix("stdb_test")?;
-        let db = Self::in_memory_internal(dir.path())?;
+        let db = Self::in_memory_internal(ReplicaPath(dir.path().into()))?;
 
         Ok(Self {
             db,
@@ -1368,7 +1369,7 @@ pub mod tests_utils {
         let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?;
         // Enter the runtime so that `Self::durable_internal` can spawn a `SnapshotWorker`.
         let _rt = rt.enter();
-        let (db, handle) = Self::durable_internal(dir.path(), rt.handle().clone())?;
+        let (db, handle) = Self::durable_internal(ReplicaPath(dir.path().into()), rt.handle().clone())?;
         let durable = DurableState { handle, rt };
 
         Ok(Self {
@@ -1391,7 +1392,8 @@ pub mod tests_utils {
             // Enter the runtime so that `Self::durable_internal` can spawn a `SnapshotWorker`.
             let _rt = rt.enter();
 
-            let (db, handle) = Self::durable_internal(self.tmp_dir.path(), rt.handle().clone())?;
+            let (db, handle) =
+                Self::durable_internal(ReplicaPath(self.tmp_dir.path().into()), rt.handle().clone())?;
             let durable = DurableState { handle, rt };
 
             Ok(Self {
@@ -1400,7 +1402,7 @@ pub mod tests_utils {
                 ..self
             })
         } else {
-            let db = Self::in_memory_internal(self.tmp_dir.path())?;
+            let db = Self::in_memory_internal(ReplicaPath(self.tmp_dir.path().into()))?;
             Ok(Self { db, ..self })
         }
     }
@@ -1431,8 +1433,8 @@ pub mod tests_utils {
     }
 
     /// The root path of the (temporary) database directory.
-    pub fn path(&self) -> &Path {
-        self.tmp_dir.path()
+    pub fn path(&self) -> ReplicaPath {
+        ReplicaPath(self.tmp_dir.path().into())
     }
 
     /// Handle to the tokio runtime, available if [`Self::durable`] was used
@@ -1458,31 +1460,31 @@ pub mod tests_utils {
         (db, durability, rt, tmp_dir)
     }
 
-    fn in_memory_internal(root: &Path) -> Result<RelationalDB, DBError> {
+    fn in_memory_internal(root: ReplicaPath) -> Result<RelationalDB, DBError> {
         Self::open_db(root, EmptyHistory::new(), None, None)
     }
 
     fn durable_internal(
-        root: &Path,
+        root: ReplicaPath,
         rt: tokio::runtime::Handle,
     ) -> Result<(RelationalDB, Arc<durability::Local<ProductValue>>), DBError> {
-        let (local, disk_size_fn) = rt.block_on(local_durability(root))?;
+        let (local, disk_size_fn) = rt.block_on(local_durability(root.clone().commit_log()))?;
         let history = local.clone();
         let durability = local.clone() as Arc<dyn Durability<TxData = Txdata>>;
-        let snapshot_repo = open_snapshot_repo(root, Address::default(), 0)?;
+        let snapshot_repo = open_snapshot_repo(root.clone().snapshots(), Address::default(), 0)?;
         let db = Self::open_db(root, history, Some((durability, disk_size_fn)), Some(snapshot_repo))?;
 
         Ok((db, local))
     }
 
     fn open_db(
-        root: &Path,
+        root: ReplicaPath,
         history: impl durability::History<TxData = Txdata>,
         durability: Option<(Arc<dyn Durability<TxData = Txdata>>, DiskSizeFn)>,
        snapshot_repo: Option<Arc<SnapshotRepository>>,
     ) -> Result<RelationalDB, DBError> {
         let (db, connected_clients) =
-            RelationalDB::open(root, Self::ADDRESS, Self::OWNER, history, durability, snapshot_repo)?;
+            RelationalDB::open(&root, Self::ADDRESS, Self::OWNER, history, durability, snapshot_repo)?;
         debug_assert!(connected_clients.is_empty());
         let db = db.with_row_count(Self::row_count_fn());
         db.with_auto_commit(&ExecutionContext::internal(db.address()), |tx| {
@@ -1609,7 +1611,7 @@ mod tests {
         stdb.commit_tx(&ExecutionContext::default(), tx)?;
 
         match RelationalDB::open(
-            stdb.path(),
+            &stdb.path(),
             Address::ZERO,
             Identity::ZERO,
             EmptyHistory::new(),
@@ -2402,8 +2404,8 @@ mod tests {
             row_ty,
         }));
         {
-            let clog =
-                Commitlog::<()>::open(dir.path().join("clog"), Default::default()).expect("failed to open commitlog");
+            let clog = Commitlog::<()>::open(ReplicaPath(dir.path().into()).commit_log(), Default::default())
+                .expect("failed to open commitlog");
             let decoder = Decoder(Rc::clone(&inputs));
             clog.fold_transactions(decoder).unwrap();
         }
diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs
index 316e3afbc1..ab7bca5be1 100644
--- a/crates/core/src/host/host_controller.rs
+++ b/crates/core/src/host/host_controller.rs
@@ -1,7 +1,9 @@
 use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
 use super::scheduler::SchedulerStarter;
+use super::wasmtime::WasmtimeRuntime;
 use super::{Scheduler, UpdateDatabaseResult};
 use crate::database_logger::DatabaseLogger;
+use crate::db;
 use crate::db::datastore::traits::Program;
 use crate::db::db_metrics::DB_METRICS;
 use crate::db::relational_db::{self, RelationalDB};
@@ -11,7 +13,6 @@ use crate::module_host_context::ModuleCreationContext;
 use crate::replica_context::ReplicaContext;
 use crate::subscription::module_subscription_actor::ModuleSubscriptions;
 use crate::util::spawn_rayon;
-use crate::{db, host};
 use anyhow::{anyhow, ensure, Context};
 use async_trait::async_trait;
 use durability::EmptyHistory;
@@ -21,10 +22,10 @@ use serde::Serialize;
 use spacetimedb_data_structures::map::IntMap;
 use spacetimedb_durability as durability;
 use spacetimedb_lib::hash_bytes;
+use spacetimedb_paths::server::{ReplicaPath, ServerDataPath};
 use spacetimedb_sats::hash::Hash;
 use std::fmt;
 use std::future::Future;
-use std::path::Path;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::sync::{watch, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock as AsyncRwLock};
@@ -64,13 +65,8 @@ pub struct HostController {
     /// Map of all hosts managed by this controller,
     /// keyed by database instance id.
     hosts: Hosts,
-    /// The directory to create replicas in.
-    ///
-    /// For example:
-    ///
-    /// - `$STDB_PATH/worker_node/replicas`
-    /// - `$STDB_PATH/replicas`
-    root_dir: Arc<Path>,
+    /// The root directory for database data.
+    pub data_dir: Arc<ServerDataPath>,
     /// The default configuration to use for databases created by this
     /// controller.
     default_config: db::Config,
@@ -78,6 +74,19 @@ pub struct HostController {
     program_storage: ProgramStorage,
     /// The [`EnergyMonitor`] used by this controller.
     energy_monitor: Arc<dyn EnergyMonitor>,
+    /// The runtimes for running our modules.
+    runtimes: Arc<HostRuntimes>,
+}
+
+struct HostRuntimes {
+    wasmtime: WasmtimeRuntime,
+}
+
+impl HostRuntimes {
+    fn new(data_dir: &ServerDataPath) -> Arc<Self> {
+        let wasmtime = WasmtimeRuntime::new(data_dir);
+        Arc::new(Self { wasmtime })
+    }
 }
 
 #[derive(PartialEq, Eq, Hash, Copy, Clone, Serialize, Debug)]
@@ -167,17 +176,18 @@ impl From<&EventStatus> for ReducerOutcome {
 
 impl HostController {
     pub fn new(
-        root_dir: Arc<Path>,
+        data_dir: Arc<ServerDataPath>,
         default_config: db::Config,
         program_storage: ProgramStorage,
         energy_monitor: Arc<dyn EnergyMonitor>,
     ) -> Self {
         Self {
             hosts: <_>::default(),
-            root_dir,
             default_config,
             program_storage,
             energy_monitor,
+            runtimes: HostRuntimes::new(&data_dir),
+            data_dir,
         }
     }
@@ -315,7 +325,13 @@ impl HostController {
             }
         };
         let update_result = host
-            .update_module(host_type, program, self.unregister_fn(replica_id))
+            .update_module(
+                self.runtimes.clone(),
+                host_type,
+                program,
+                self.energy_monitor.clone(),
+                self.unregister_fn(replica_id),
+            )
             .await?;
         *guard = Some(host);
@@ -386,7 +402,13 @@ impl HostController {
         );
         let program = load_program(&self.program_storage, program_hash).await?;
         let update_result = host
-            .update_module(host_type, program, self.unregister_fn(replica_id))
+            .update_module(
+                self.runtimes.clone(),
+                host_type,
+                program,
+                self.energy_monitor.clone(),
+                self.unregister_fn(replica_id),
+            )
             .await?;
         match update_result {
             UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => {
@@ -479,16 +501,7 @@ impl HostController {
     }
 
     async fn try_init_host(&self, database: Database, replica_id: u64) -> anyhow::Result<Host> {
-        Host::try_init(
-            &self.root_dir,
-            self.default_config,
-            database,
-            replica_id,
-            self.program_storage.clone(),
-            self.energy_monitor.clone(),
-            self.unregister_fn(replica_id),
-        )
-        .await
+        Host::try_init(self, database, replica_id).await
     }
 }
 
@@ -498,12 +511,12 @@ fn stored_program_hash(db: &RelationalDB) -> anyhow::Result<Option<Hash>> {
 }
 
 async fn make_replica_ctx(
+    path: ReplicaPath,
     database: Database,
     replica_id: u64,
     relational_db: Arc<RelationalDB>,
 ) -> anyhow::Result<ReplicaContext> {
-    let log_path = DatabaseLogger::filepath(&database.address, replica_id);
-    let logger = tokio::task::block_in_place(|| Arc::new(DatabaseLogger::open(log_path)));
+    let logger = tokio::task::block_in_place(move || Arc::new(DatabaseLogger::open(path)));
     let subscriptions = ModuleSubscriptions::new(relational_db.clone(), database.owner_identity);
 
     Ok(ReplicaContext {
@@ -518,6 +531,7 @@ async fn make_replica_ctx(
 /// Initialize a module host for the given program.
 /// The passed replica_ctx may not be configured for this version of the program's database schema yet.
 async fn make_module_host(
+    runtimes: Arc<HostRuntimes>,
     host_type: HostType,
     replica_ctx: Arc<ReplicaContext>,
     scheduler: Scheduler,
@@ -535,7 +549,7 @@ async fn make_module_host(
                 energy_monitor,
             };
             let start = Instant::now();
-            let actor = host::wasmtime::make_actor(mcc)?;
+            let actor = runtimes.wasmtime.make_actor(mcc)?;
             trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
             ModuleHost::new(actor, unregister)
@@ -567,15 +581,18 @@ async fn launch_module(
     on_panic: impl Fn() + Send + Sync + 'static,
     relational_db: Arc<RelationalDB>,
     energy_monitor: Arc<dyn EnergyMonitor>,
+    replica_path: ReplicaPath,
+    runtimes: Arc<HostRuntimes>,
 ) -> anyhow::Result<(Program, LaunchedModule)> {
     let address = database.address;
     let host_type = database.host_type;
 
-    let replica_ctx = make_replica_ctx(database, replica_id, relational_db)
+    let replica_ctx = make_replica_ctx(replica_path, database, replica_id, relational_db)
         .await
         .map(Arc::new)?;
     let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone());
     let (program, module_host) = make_module_host(
+        runtimes.clone(),
         host_type,
         replica_ctx.clone(),
         scheduler.clone(),
@@ -651,9 +668,6 @@ struct Host {
     /// The task collects metrics from the `replica_ctx`, and so stays alive as long
     /// as the `replica_ctx` is live. The task is aborted when [`Host`] is dropped.
     metrics_task: AbortHandle,
-
-    /// [`EnergyMonitor`] to use for [`Host::update_module`].
-    energy_monitor: Arc<dyn EnergyMonitor>,
 }
 
 impl Host {
@@ -662,22 +676,21 @@ impl Host {
     /// Note that this does **not** run module initialization routines, but may
     /// create on-disk artifacts if the host / database did not exist.
     #[tracing::instrument(skip_all)]
-    async fn try_init(
-        root_dir: &Path,
-        config: db::Config,
-        database: Database,
-        replica_id: u64,
-        program_storage: ProgramStorage,
-        energy_monitor: Arc<dyn EnergyMonitor>,
-        on_panic: impl Fn() + Send + Sync + 'static,
-    ) -> anyhow::Result<Self> {
-        let mut db_path = root_dir.to_path_buf();
-        db_path.extend([&*database.address.to_hex(), &*replica_id.to_string()]);
-        db_path.push("database");
+    async fn try_init(host_controller: &HostController, database: Database, replica_id: u64) -> anyhow::Result<Self> {
+        let HostController {
+            data_dir,
+            default_config: config,
+            program_storage,
+            energy_monitor,
+            runtimes,
+            ..
+        } = host_controller;
+        let on_panic = host_controller.unregister_fn(replica_id);
+        let replica_path = data_dir.replica(replica_id);
 
         let (db, connected_clients) = match config.storage {
             db::Storage::Memory => RelationalDB::open(
-                &db_path,
+                &replica_path,
                 database.address,
                 database.owner_identity,
                 EmptyHistory::new(),
@@ -685,11 +698,13 @@ impl Host {
                 None,
             )?,
             db::Storage::Disk => {
-                let (durability, disk_size_fn) = relational_db::local_durability(&db_path).await?;
-                let snapshot_repo = relational_db::open_snapshot_repo(&db_path, database.address, replica_id)?;
+                let (durability, disk_size_fn) =
+                    relational_db::local_durability(replica_path.clone().commit_log()).await?;
+                let snapshot_repo =
+                    relational_db::open_snapshot_repo(replica_path.clone().snapshots(), database.address, replica_id)?;
                 let history = durability.clone();
                 RelationalDB::open(
-                    &db_path,
+                    &replica_path,
                     database.address,
                     database.owner_identity,
                     history,
@@ -698,48 +713,41 @@ impl Host {
                 )?
             }
         };
-        let LaunchedModule {
-            replica_ctx,
-            module_host,
-            scheduler,
-            scheduler_starter,
-        } = match db.program()? {
+        let (program, program_needs_init) = match db.program()? {
             // Launch module with program from existing database.
-            Some(program) => {
-                let (_, launched) = launch_module(
-                    database,
-                    replica_id,
-                    program,
-                    on_panic,
-                    Arc::new(db),
-                    energy_monitor.clone(),
-                )
-                .await?;
-                launched
-            }
-
+            Some(program) => (program, false),
             // Database is empty, load program from external storage and run
             // initialization.
-            None => {
-                let program = load_program(&program_storage, database.initial_program).await?;
-                let (program, launched) = launch_module(
-                    database,
-                    replica_id,
-                    program,
-                    on_panic,
-                    Arc::new(db),
-                    energy_monitor.clone(),
-                )
-                .await?;
+            None => (load_program(&program_storage, database.initial_program).await?, true),
+        };
 
-                let call_result = launched.module_host.init_database(program).await?;
-                if let Some(call_result) = call_result {
-                    Result::from(call_result)?;
-                }
+        let (program, launched) = launch_module(
+            database,
+            replica_id,
+            program,
+            on_panic,
+            Arc::new(db),
+            energy_monitor.clone(),
+            replica_path,
+            runtimes.clone(),
+        )
+        .await?;
 
-                launched
+        if program_needs_init {
+            let call_result = launched.module_host.init_database(program).await?;
+            if let Some(call_result) = call_result {
+                Result::from(call_result)?;
             }
-        };
+        } else {
+            drop(program)
+        }
+
+        let LaunchedModule {
+            replica_ctx,
+            module_host,
+            scheduler,
+            scheduler_starter,
+        } = launched;
 
         // Disconnect dangling clients.
         for (identity, address) in connected_clients {
@@ -762,8 +770,6 @@ impl Host {
             replica_ctx,
             scheduler,
             metrics_task,
-
-            energy_monitor,
         })
     }
 
@@ -781,19 +787,22 @@ impl Host {
     /// Either way, the [`UpdateDatabaseResult`] is returned.
     async fn update_module(
         &mut self,
+        runtimes: Arc<HostRuntimes>,
         host_type: HostType,
         program: Program,
+        energy_monitor: Arc<dyn EnergyMonitor>,
         on_panic: impl Fn() + Send + Sync + 'static,
     ) -> anyhow::Result<UpdateDatabaseResult> {
         let replica_ctx = &self.replica_ctx;
-        let (scheduler, scheduler_starter) = self.scheduler.new_with_same_db();
+        let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone());
 
         let (program, module) = make_module_host(
+            runtimes,
             host_type,
             replica_ctx.clone(),
             scheduler.clone(),
             program,
-            self.energy_monitor.clone(),
+            energy_monitor,
             on_panic,
         )
         .await?;
diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs
index 41f9f0b7d6..cf22f5e01d 100644
--- a/crates/core/src/host/scheduler.rs
+++ b/crates/core/src/host/scheduler.rs
@@ -56,7 +56,6 @@ pub struct ScheduledReducer {
 #[derive(Clone)]
 pub struct Scheduler {
     tx: mpsc::UnboundedSender>,
-    db: Arc<RelationalDB>,
 }
 
 pub struct SchedulerStarter {
@@ -67,11 +66,7 @@ impl Scheduler {
     pub fn open(db: Arc<RelationalDB>) -> (Self, SchedulerStarter) {
         let (tx, rx) = mpsc::unbounded_channel();
-        (Scheduler { tx, db: db.clone() }, SchedulerStarter { rx, db })
-    }
-
-    pub fn new_with_same_db(&self) -> (Self, SchedulerStarter) {
-        Self::open(self.db.clone())
+        (Scheduler { tx }, SchedulerStarter { rx, db })
     }
 }
diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs
index e15d025474..0075d89112 100644
--- a/crates/core/src/host/wasmtime/mod.rs
+++ b/crates/core/src/host/wasmtime/mod.rs
@@ -1,13 +1,12 @@
 use std::borrow::Cow;
 
 use anyhow::Context;
-use once_cell::sync::Lazy;
+use spacetimedb_paths::server::{ServerDataPath, WasmtimeCacheDir};
 use wasmtime::{Engine, Linker, Module, StoreContext, StoreContextMut};
 
 use crate::energy::{EnergyQuanta, ReducerBudget};
 use crate::error::NodesError;
 use crate::module_host_context::ModuleCreationContext;
-use crate::stdb_path;
 
 mod wasm_instance_env;
 mod wasmtime_module;
@@ -19,56 +18,66 @@ use self::wasm_instance_env::WasmInstanceEnv;
 use super::wasm_common::module_host_actor::InitializationError;
 use super::wasm_common::{abi, module_host_actor::WasmModuleHostActor, ModuleCreationError};
 
-static ENGINE: Lazy<Engine> = Lazy::new(|| {
-    let mut config = wasmtime::Config::new();
-    config
-        .cranelift_opt_level(wasmtime::OptLevel::Speed)
-        .consume_fuel(true)
-        .wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
-
-    let cache_config = toml::toml! {
-        // see for options here
-        [cache]
-        enabled = true
-        directory = (toml::Value::try_from(stdb_path("worker_node/wasmtime_cache")).unwrap())
-    };
-    // ignore errors for this - if we're not able to set up caching, that's fine, it's just an optimization
-    let _ = set_cache_config(&mut config, cache_config);
+pub struct WasmtimeRuntime {
+    engine: Engine,
+    linker: Box<Linker<WasmInstanceEnv>>,
 }
 
-    Engine::new(&config).unwrap()
-});
+impl WasmtimeRuntime {
+    pub fn new(data_dir: &ServerDataPath) -> Self {
+        let mut config = wasmtime::Config::new();
+        config
+            .cranelift_opt_level(wasmtime::OptLevel::Speed)
+            .consume_fuel(true)
+            .wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
 
-fn set_cache_config(config: &mut wasmtime::Config, cache_config: toml::value::Table) -> anyhow::Result<()> {
-    use std::io::Write;
-    let tmpfile = tempfile::NamedTempFile::new()?;
-    write!(&tmpfile, "{cache_config}")?;
-    config.cache_config_load(tmpfile.path())?;
-    Ok(())
-}
+        // ignore errors for this - if we're not able to set up caching, that's fine, it's just an optimization
+        let _ = Self::set_cache_config(&mut config, data_dir.wasmtime_cache());
 
-static LINKER: Lazy<Linker<WasmInstanceEnv>> = Lazy::new(|| {
-    let mut linker = Linker::new(&ENGINE);
-    WasmtimeModule::link_imports(&mut linker).unwrap();
-    linker
-});
+        let engine = Engine::new(&config).unwrap();
 
-pub fn make_actor(mcc: ModuleCreationContext) -> Result<impl Module, ModuleCreationError> {
-    let module = Module::new(&ENGINE, &mcc.program.bytes).map_err(ModuleCreationError::WasmCompileError)?;
+        let mut linker = Box::new(Linker::new(&engine));
+        WasmtimeModule::link_imports(&mut linker).unwrap();
 
-    let func_imports = module
-        .imports()
-        .filter(|imp| matches!(imp.ty(), wasmtime::ExternType::Func(_)));
-    let abi = abi::determine_spacetime_abi(func_imports, |imp| imp.module())?;
+        WasmtimeRuntime { engine, linker }
+    }
 
-    abi::verify_supported(WasmtimeModule::IMPLEMENTED_ABI, abi)?;
+    fn set_cache_config(config: &mut wasmtime::Config, cache_dir: WasmtimeCacheDir) -> anyhow::Result<()> {
+        use std::io::Write;
+        let cache_config = toml::toml! {
+            // see for options here
+            [cache]
+            enabled = true
+            directory = (toml::Value::try_from(cache_dir.0)?)
+        };
+        let tmpfile = tempfile::NamedTempFile::new()?;
+        write!(&tmpfile, "{cache_config}")?;
+        config.cache_config_load(tmpfile.path())?;
+        Ok(())
+    }
 
-    let module = LINKER
-        .instantiate_pre(&module)
-        .map_err(InitializationError::Instantiation)?;
+    pub fn make_actor(
+        &self,
+        mcc: ModuleCreationContext,
+    ) -> Result<impl Module, ModuleCreationError> {
+        let module = Module::new(&self.engine, &mcc.program.bytes).map_err(ModuleCreationError::WasmCompileError)?;
 
-    let module = WasmtimeModule::new(module);
+        let func_imports = module
+            .imports()
+            .filter(|imp| matches!(imp.ty(), wasmtime::ExternType::Func(_)));
+        let abi = abi::determine_spacetime_abi(func_imports, |imp| imp.module())?;
 
-    WasmModuleHostActor::new(mcc, module).map_err(Into::into)
+        abi::verify_supported(WasmtimeModule::IMPLEMENTED_ABI, abi)?;
+
+        let module = self
+            .linker
+            .instantiate_pre(&module)
+            .map_err(InitializationError::Instantiation)?;
+
+        let module = WasmtimeModule::new(module);
+
+        WasmModuleHostActor::new(mcc, module).map_err(Into::into)
+    }
 }
 
 #[derive(Debug, derive_more::From)]
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index e4d68a9b64..941c99a6e3 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -1,23 +1,9 @@
-use std::path::{Path, PathBuf};
-
-use once_cell::sync::Lazy;
-
 extern crate core;
 
 pub mod energy;
 pub mod json;
 pub mod sql;
 
-pub static STDB_PATH: Lazy<PathBuf> =
-    Lazy::new(|| PathBuf::from(std::env::var_os("STDB_PATH").expect("STDB_PATH must be set")));
-
-pub fn stdb_path<S>(s: &S) -> PathBuf
-where
-    S: AsRef<Path> + ?Sized,
-{
-    STDB_PATH.join(s)
-}
-
 pub mod address {
     pub use spacetimedb_lib::Address;
 }
diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs
index 0a88f345b7..9df1628358 100644
--- a/crates/core/src/replica_context.rs
+++ b/crates/core/src/replica_context.rs
@@ -5,7 +5,6 @@ use crate::messages::control_db::Database;
 use crate::subscription::module_subscription_actor::ModuleSubscriptions;
 use std::io;
 use std::ops::Deref;
-use std::path::PathBuf;
 use std::sync::Arc;
 
 pub type Result<T> = anyhow::Result<T>;
@@ -21,13 +20,6 @@ pub struct ReplicaContext {
 }
 
 impl ReplicaContext {
-    pub fn scheduler_db_path(&self, root_db_path: PathBuf) -> PathBuf {
-        let mut scheduler_db_path = root_db_path;
-        scheduler_db_path.extend([&*self.address.to_hex(), &*self.replica_id.to_string()]);
-        scheduler_db_path.push("scheduler");
-        scheduler_db_path
-    }
-
     /// The number of bytes on disk occupied by the database's durability layer.
     ///
     /// An in-memory database will return `Ok(0)`.
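
Aside: the change above replaces the process-global `Lazy<Engine>`/`LINKER` statics with a per-controller `WasmtimeRuntime`, so the compilation cache directory can come from the configured data dir rather than the `STDB_PATH` environment variable. A minimal, self-contained sketch of the engine/linker reuse pattern this relies on — assuming a wasmtime version with the store-independent `InstancePre` API (the same `linker.instantiate_pre(&module)` call the patch uses); the `host.log` import is a hypothetical stand-in for the real spacetime ABI linked by `WasmtimeModule::link_imports`:

```rust
use anyhow::Result;
use wasmtime::{Engine, Linker, Module, Store};

fn main() -> Result<()> {
    // One engine and one linker per runtime, as in `WasmtimeRuntime::new`.
    let engine = Engine::default();
    let mut linker: Linker<()> = Linker::new(&engine);
    // Hypothetical host import; the real code links the spacetime ABI here.
    linker.func_wrap("host", "log", |x: i32| println!("guest says {x}"))?;

    let module = Module::new(
        &engine,
        r#"(module
            (import "host" "log" (func $log (param i32)))
            (func (export "run") (call $log (i32.const 42))))"#,
    )?;

    // The expensive import-resolution work happens once per module...
    let pre = linker.instantiate_pre(&module)?;

    // ...and each subsequent instantiation only needs a fresh store.
    let mut store = Store::new(&engine, ());
    let instance = pre.instantiate(&mut store)?;
    instance
        .get_typed_func::<(), ()>(&mut store, "run")?
        .call(&mut store, ())?;
    Ok(())
}
```

Because the `Engine` and pre-linked imports are shared, launching another replica of the same module pays only for a new `Store` and instantiation, not for recompiling or relinking.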
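Aside: the `MetadataFile::version_compatible_with` check added in `crates/core/src/config.rs` earlier in this patch builds a caret (`^`) comparator from the version stored in `metadata.toml`. A small sketch of those semantics, using the same `semver` crate API — the `compatible` helper here is illustrative, not part of the patch:

```rust
use semver::{Comparator, Op, Version};

// Mirrors the caret comparator built in `MetadataFile::version_compatible_with`.
fn compatible(stored: &Version, current: &Version) -> bool {
    Comparator {
        op: Op::Caret,
        major: stored.major,
        minor: Some(stored.minor),
        patch: Some(stored.patch),
        pre: stored.pre.clone(),
    }
    .matches(current)
}

fn main() {
    let stored = Version::new(1, 0, 0);
    // ^1.0.0 admits any later 1.x release...
    assert!(compatible(&stored, &Version::new(1, 2, 3)));
    // ...but a major bump is treated as incompatible with existing data.
    assert!(!compatible(&stored, &Version::new(2, 0, 0)));

    // Caret semantics are stricter below 1.0: ^0.9.0 excludes 0.10.0,
    // so a pre-1.0 data dir only accepts patch-level upgrades of 0.9.x.
    let zero = Version::new(0, 9, 0);
    assert!(!compatible(&zero, &Version::new(0, 10, 0)));
    assert!(compatible(&zero, &Version::new(0, 9, 7)));
}
```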
diff --git a/crates/durability/Cargo.toml b/crates/durability/Cargo.toml
index e805acd04e..2616ff4d89 100644
--- a/crates/durability/Cargo.toml
+++ b/crates/durability/Cargo.toml
@@ -12,6 +12,7 @@ anyhow.workspace = true
 itertools.workspace = true
 log.workspace = true
 spacetimedb-commitlog.workspace = true
+spacetimedb-paths.workspace = true
 spacetimedb-sats.workspace = true
 tokio.workspace = true
 tracing.workspace = true
diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs
index 67b8b986c8..88cbb58bd7 100644
--- a/crates/durability/src/imp/local.rs
+++ b/crates/durability/src/imp/local.rs
@@ -2,7 +2,6 @@ use std::{
     io,
     num::NonZeroU16,
     panic,
-    path::PathBuf,
     sync::{
         atomic::{
             AtomicI64, AtomicU64,
@@ -17,6 +16,7 @@ use anyhow::Context as _;
 use itertools::Itertools as _;
 use log::{info, trace, warn};
 use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction};
+use spacetimedb_paths::server::CommitLogDir;
 use tokio::{
     sync::mpsc,
     task::{spawn_blocking, AbortHandle, JoinHandle},
@@ -94,7 +94,7 @@ impl<T: Encode + 'static> Local<T> {
     /// The `root` directory must already exist.
     ///
     /// Background tasks are spawned onto the provided tokio runtime.
-    pub fn open(root: impl Into<PathBuf>, rt: tokio::runtime::Handle, opts: Options) -> io::Result<Self> {
+    pub fn open(root: CommitLogDir, rt: tokio::runtime::Handle, opts: Options) -> io::Result<Self> {
         info!("open local durability");
 
         let clog = Arc::new(Commitlog::open(root, opts.commitlog)?);
diff --git a/crates/fs-utils/src/lockfile.rs b/crates/fs-utils/src/lockfile.rs
index 4bb2ef6199..70297fc828 100644
--- a/crates/fs-utils/src/lockfile.rs
+++ b/crates/fs-utils/src/lockfile.rs
@@ -36,7 +36,8 @@ impl Lockfile {
     /// Acquire an exclusive lock on the file `file_path`.
     ///
     /// `file_path` should be the full path of the file to which to acquire exclusive access.
-    pub fn for_file(file_path: &Path) -> Result<Self, LockfileError> {
+    pub fn for_file<P: AsRef<Path>>(file_path: P) -> Result<Self, LockfileError> {
+        let file_path = file_path.as_ref();
         // TODO: Someday, it would be nice to use OS locks to minimize edge cases (see
         // https://github.com/clockworklabs/SpacetimeDB/pull/1341#issuecomment-2151018992).
         //
@@ -62,8 +63,8 @@ impl Lockfile {
 
     /// Returns the path of a lockfile for the file `file_path`,
     /// without actually acquiring the lock.
-    pub fn lock_path(file_path: &Path) -> PathBuf {
-        file_path.with_extension("lock")
+    pub fn lock_path<P: AsRef<Path>>(file_path: P) -> PathBuf {
+        file_path.as_ref().with_extension("lock")
     }
 
     fn release_internal(path: &Path) -> Result<(), LockfileError> {
diff --git a/crates/paths/Cargo.toml b/crates/paths/Cargo.toml
new file mode 100644
index 0000000000..81b21e62e3
--- /dev/null
+++ b/crates/paths/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "spacetimedb-paths"
+version.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+chrono.workspace = true
+fs2.workspace = true
+itoa.workspace = true
+thiserror.workspace = true
diff --git a/crates/paths/src/lib.rs b/crates/paths/src/lib.rs
new file mode 100644
index 0000000000..81edfe18c9
--- /dev/null
+++ b/crates/paths/src/lib.rs
@@ -0,0 +1,71 @@
+use std::path::{Path, PathBuf};
+
+trait PathBufExt {
+    fn joined<P: AsRef<Path>>(self, path: P) -> Self;
+    fn joined_int<I: itoa::Integer>(self, path_seg: I) -> Self;
+}
+
+impl PathBufExt for PathBuf {
+    fn joined<P: AsRef<Path>>(mut self, path: P) -> Self {
+        self.push(path);
+        self
+    }
+
+    fn joined_int<I: itoa::Integer>(self, path_seg: I) -> Self {
+        self.joined(itoa::Buffer::new().format(path_seg))
+    }
+}
+
+macro_rules! path_type {
path_type { + ($name:ident) => { + #[derive(Clone, Debug)] + pub struct $name(pub std::path::PathBuf); + + impl AsRef for $name { + fn as_ref(&self) -> &std::path::Path { + &self.0 + } + } + + impl $name { + #[inline] + pub fn display(&self) -> std::path::Display<'_> { + self.0.display() + } + + #[inline] + pub fn metadata(&self) -> std::io::Result { + self.0.metadata() + } + } + }; + ($name:ident: dir) => { + path_type!($name); + impl $name { + #[inline] + pub fn create(&self) -> std::io::Result<()> { + std::fs::create_dir_all(self) + } + #[inline] + pub fn read_dir(&self) -> std::io::Result { + self.0.read_dir() + } + } + }; + ($name:ident: file) => { + path_type!($name); + impl $name { + /// Opens a file at this path with the given options, ensuring its parent directory exists. + #[inline] + pub fn open_file(&self, options: &std::fs::OpenOptions) -> std::io::Result { + if let Some(parent) = self.0.parent() { + std::fs::create_dir_all(parent)?; + } + options.open(self) + } + } + }; +} + +pub mod server; +pub mod standalone; diff --git a/crates/paths/src/server.rs b/crates/paths/src/server.rs new file mode 100644 index 0000000000..00df96119a --- /dev/null +++ b/crates/paths/src/server.rs @@ -0,0 +1,130 @@ +use std::fmt::Write; +use std::{fs, io}; + +use crate::PathBufExt; +use chrono::NaiveDate; + +path_type!(ServerDataPath: dir); + +impl ServerDataPath { + pub fn wasmtime_cache(&self) -> WasmtimeCacheDir { + WasmtimeCacheDir(self.0.join("cache/wasmtime")) + } + + pub fn metadata_toml(&self) -> MetadataTomlPath { + MetadataTomlPath(self.0.join("metadata.toml")) + } + + pub fn pid_file(&self) -> Result { + use io::Write; + let path = self.0.join("spacetime.pid"); + let mut file = fs::File::create_new(&path).map_err(|e| { + if e.kind() == io::ErrorKind::AlreadyExists { + let pid = fs::read_to_string(&path).ok().and_then(|s| s.trim().parse().ok()); + PidFileError::Exists { pid } + } else { + PidFileError::Io(e) + } + })?; + let pidfile = PidFile(path); + write!(file, "{}", std::process::id()).map_err(PidFileError::Io)?; + Ok(pidfile) + } + + pub fn replica(&self, replica_id: u64) -> ReplicaPath { + ReplicaPath(self.0.join("replicas").joined_int(replica_id)) + } +} + +path_type!(WasmtimeCacheDir: dir); + +path_type!(MetadataTomlPath: file); + +#[derive(thiserror::Error, Debug)] +pub enum PidFileError { + #[error("error while taking database lock on spacetime.pid")] + Io(io::Error), + #[error("cannot take lock on database; spacetime.pid already exists (owned by pid {pid:?})")] + Exists { pid: Option }, +} + +/// Removes file upon drop +pub struct PidFile(std::path::PathBuf); + +impl Drop for PidFile { + fn drop(&mut self) { + let _ = fs::remove_file(&self.0); + } +} + +path_type!(ReplicaPath: dir); + +impl ReplicaPath { + /// `date` should be in UTC. 
+    pub fn module_log(self, date: NaiveDate) -> ModuleLogPath {
+        let mut path = self.0.joined("module_logs/");
+        write!(path.as_mut_os_string(), "{date}.date").unwrap();
+        ModuleLogPath(path)
+    }
+
+    pub fn snapshots(self) -> SnapshotsPath {
+        SnapshotsPath(self.0.joined("snapshots"))
+    }
+
+    pub fn commit_log(self) -> CommitLogDir {
+        CommitLogDir(self.0.join("clog"))
+    }
+}
+
+path_type!(ModuleLogPath: file);
+
+path_type!(SnapshotsPath: dir);
+
+impl SnapshotsPath {
+    pub fn snapshot_dir(&self, tx_offset: u64) -> SnapshotDirPath {
+        let dir_name = format!("{tx_offset:0>20}.snapshot_dir");
+        SnapshotDirPath(self.0.join(dir_name))
+    }
+}
+
+path_type!(SnapshotDirPath: dir);
+
+impl SnapshotDirPath {
+    pub fn snapshot_file(&self, tx_offset: u64) -> SnapshotFilePath {
+        let file_name = format!("{tx_offset:0>20}.snapshot_bsatn");
+        SnapshotFilePath(self.0.join(file_name))
+    }
+
+    pub fn objects(&self) -> SnapshotObjectsPath {
+        SnapshotObjectsPath(self.0.join("objects"))
+    }
+
+    pub fn rename_invalid(&self) -> io::Result<()> {
+        let invalid_path = self.0.with_extension("invalid_snapshot");
+        fs::rename(self, invalid_path)
+    }
+}
+
+path_type!(SnapshotFilePath: file);
+path_type!(SnapshotObjectsPath: dir);
+
+path_type!(CommitLogDir: dir);
+
+impl CommitLogDir {
+    /// By convention, the file name of a segment consists of the minimum
+    /// transaction offset contained in it, left-padded with zeroes to 20 digits,
+    /// and the file extension `.stdb.log`.
+    pub fn segment(&self, offset: u64) -> SegmentFile {
+        let file_name = format!("{offset:0>20}.stdb.log");
+        SegmentFile(self.0.join(file_name))
+    }
+
+    /// Returns the offset index file path based on the root path and offset
+    pub fn index(&self, offset: u64) -> OffsetIndexFile {
+        let file_name = format!("{offset:0>20}.stdb.ofs");
+        OffsetIndexFile(self.0.join(file_name))
+    }
+}
+
+path_type!(SegmentFile: file);
+path_type!(OffsetIndexFile: file);
diff --git a/crates/paths/src/standalone.rs b/crates/paths/src/standalone.rs
new file mode 100644
index 0000000000..637299852a
--- /dev/null
+++ b/crates/paths/src/standalone.rs
@@ -0,0 +1,20 @@
+use crate::server::ServerDataPath;
+
+mod sealed {
+    pub trait Sealed: AsRef<std::path::Path> {}
+}
+
+pub trait StandaloneDataDirExt: sealed::Sealed {
+    fn program_bytes(&self) -> ProgramBytesDir {
+        ProgramBytesDir(self.as_ref().join("program-bytes"))
+    }
+    fn control_db(&self) -> ControlDbDir {
+        ControlDbDir(self.as_ref().join("control-db"))
+    }
+}
+
+impl sealed::Sealed for ServerDataPath {}
+impl StandaloneDataDirExt for ServerDataPath {}
+
+path_type!(ProgramBytesDir: dir);
+path_type!(ControlDbDir: dir);
diff --git a/crates/snapshot/Cargo.toml b/crates/snapshot/Cargo.toml
index a3b0a9f558..f11a8bd7b0 100644
--- a/crates/snapshot/Cargo.toml
+++ b/crates/snapshot/Cargo.toml
@@ -12,6 +12,7 @@ spacetimedb-durability.workspace = true
 spacetimedb-lib.workspace = true
 spacetimedb-sats = { workspace = true, features = ["blake3"] }
 spacetimedb-primitives.workspace = true
+spacetimedb-paths.workspace = true
 spacetimedb-fs-utils.workspace = true
 blake3.workspace = true
diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs
index 325c2ce2ac..42f48f5918 100644
--- a/crates/snapshot/src/lib.rs
+++ b/crates/snapshot/src/lib.rs
@@ -34,6 +34,7 @@ use spacetimedb_lib::{
     ser::Serialize,
     Address,
 };
+use spacetimedb_paths::server::{SnapshotDirPath, SnapshotFilePath, SnapshotsPath};
 use spacetimedb_primitives::TableId;
 use spacetimedb_table::{
     blob_store::{BlobHash, BlobStore, HashMapBlobStore},
@@ -44,7 +45,7 @@ use std::{
diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs
index 325c2ce2ac..42f48f5918 100644
--- a/crates/snapshot/src/lib.rs
+++ b/crates/snapshot/src/lib.rs
@@ -34,6 +34,7 @@ use spacetimedb_lib::{
     ser::Serialize,
     Address,
 };
+use spacetimedb_paths::server::{SnapshotDirPath, SnapshotFilePath, SnapshotsPath};
 use spacetimedb_primitives::TableId;
 use spacetimedb_table::{
     blob_store::{BlobHash, BlobStore, HashMapBlobStore},
@@ -44,7 +45,7 @@ use std::{
     collections::BTreeMap,
     ffi::OsStr,
     io::{Read, Write},
-    path::{Path, PathBuf},
+    path::PathBuf,
 };
 
 #[derive(Debug, Copy, Clone)]
@@ -110,7 +111,7 @@ pub enum SnapshotError {
     #[error("Refusing to reconstruct snapshot {tx_offset} with unsupported version {version}")]
     BadVersion { tx_offset: TxOffset, version: u8 },
     #[error("Cannot open snapshot repository in non-directory {root:?}")]
-    NotDirectory { root: PathBuf },
+    NotDirectory { root: SnapshotsPath },
     #[error(transparent)]
     Lockfile(#[from] LockfileError),
     #[error(transparent)]
@@ -303,13 +304,13 @@ impl Snapshot {
     /// - `path` does not refer to a readable file.
     /// - The file at `path` is corrupted,
     ///   as detected by comparing the hash of its bytes to a hash recorded in the file.
-    pub fn read_from_file(path: &Path) -> Result<Self, SnapshotError> {
+    pub fn read_from_file(path: &SnapshotFilePath) -> Result<Self, SnapshotError> {
         let err_read_object = |cause| SnapshotError::ReadObject {
             ty: ObjectType::Snapshot,
-            source_repo: path.to_path_buf(),
+            source_repo: path.0.clone(),
             cause,
         };
-        let mut snapshot_file = o_rdonly().open(path).map_err(err_read_object)?;
+        let mut snapshot_file = path.open_file(&o_rdonly()).map_err(err_read_object)?;
 
         // The snapshot file is prefixed with the hash of the `Snapshot`'s BSATN.
         // Read that hash.
@@ -330,13 +331,13 @@
                 ty: ObjectType::Snapshot,
                 expected: *hash.as_bytes(),
                 computed: *computed_hash.as_bytes(),
-                source_repo: path.to_path_buf(),
+                source_repo: path.0.clone(),
             });
         }
 
         let snapshot = bsatn::from_slice::<Self>(&snapshot_bsatn).map_err(|cause| SnapshotError::Deserialize {
             ty: ObjectType::Snapshot,
-            source_repo: path.to_path_buf(),
+            source_repo: path.0.clone(),
             cause,
         })?;
 
@@ -464,7 +465,7 @@
 /// A repository of snapshots of a particular database instance.
 pub struct SnapshotRepository {
     /// The directory which contains all the snapshots.
-    root: PathBuf,
+    root: SnapshotsPath,
 
     /// The database address of the database instance for which this repository stores snapshots.
     database_address: Address,
@@ -492,7 +493,7 @@
         tables: impl Iterator<Item = &'db Table>,
         blobs: &'db dyn BlobStore,
         tx_offset: TxOffset,
-    ) -> Result<PathBuf, SnapshotError> {
+    ) -> Result<SnapshotDirPath, SnapshotError> {
         // If a previous snapshot exists in this snapshot repo,
         // get a handle on its object repo in order to hardlink shared objects into the new snapshot.
         let prev_snapshot = self
@@ -500,7 +501,7 @@
             .map(|tx_offset| self.snapshot_dir_path(tx_offset));
         let prev_snapshot = if let Some(prev_snapshot) = prev_snapshot {
             assert!(
-                prev_snapshot.is_dir(),
+                prev_snapshot.0.is_dir(),
                 "prev_snapshot {prev_snapshot:?} is not a directory"
             );
             let object_repo = Self::object_repo(&prev_snapshot)?;
@@ -515,10 +516,11 @@
 
         // Before performing any observable operations,
         // acquire a lockfile on the snapshot you want to create.
+        // TODO(noa): is this lockfile still necessary now that we have data-dir?
         let _lock = Lockfile::for_file(&snapshot_dir)?;
 
         // Create the snapshot directory.
-        std::fs::create_dir_all(&snapshot_dir)?;
+        snapshot_dir.create()?;
 
         // Create a new `DirTrie` to hold all the content-addressed objects in the snapshot.
         let object_repo = Self::object_repo(&snapshot_dir)?;
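The on-disk layout that `read_from_file` and `create_snapshot` agree on is: a fixed-width hash of the payload first, then the BSATN-encoded `Snapshot`. A simplified, std-only sketch of the read side (the 32-byte hash width is an assumption based on the crate's `blake3` dependency; the real code also maps failures into `SnapshotError` and verifies the hash before deserializing):

```rust
use std::fs::File;
use std::io::{self, Read};
use std::path::Path;

/// Split a snapshot file into its hash prefix and BSATN payload.
fn read_hash_prefixed(path: &Path) -> io::Result<([u8; 32], Vec<u8>)> {
    let mut file = File::open(path)?;
    // The hash of the payload comes first, at a fixed width...
    let mut hash = [0u8; 32];
    file.read_exact(&mut hash)?;
    // ...and the remainder of the file is the serialized `Snapshot`,
    // to be checked against the hash before use.
    let mut payload = Vec::new();
    file.read_to_end(&mut payload)?;
    Ok((hash, payload))
}
```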
@@ -540,7 +542,7 @@
         // Create the snapshot file, containing first the hash, then the `Snapshot`.
         {
-            let mut snapshot_file = o_excl().open(Self::snapshot_file_path(tx_offset, &snapshot_dir))?;
+            let mut snapshot_file = snapshot_dir.snapshot_file(tx_offset).open_file(&o_excl())?;
             snapshot_file.write_all(hash.as_bytes())?;
             snapshot_file.write_all(&snapshot_bsatn)?;
         }
 
@@ -587,24 +589,8 @@
     /// Any mutations to any files contained in the returned directory
     /// will likely corrupt the snapshot,
     /// causing attempts to reconstruct it to fail.
-    pub fn snapshot_dir_path(&self, tx_offset: TxOffset) -> PathBuf {
-        let dir_name = format!("{tx_offset:0>20}.{SNAPSHOT_DIR_EXT}");
-        self.root.join(dir_name)
-    }
-
-    /// Given `snapshot_dir` as the result of `self.snapshot_dir_path(tx_offset)`,
-    /// get the path to the root snapshot file, which contains a serialized [`Snapshot`].
-    ///
-    /// This method does not validate that the `snapshot_dir` exists or is valid,
-    /// so the returned snapshot file path may be nonexistant or refer to a locked or incomplete snapshot file.
-    /// This method also does not check if the snapshot file, or any other part of the snapshot, is corrupted.
-    /// Consumers should verify the hashes stored in the snapshot file and object repository.
-    ///
-    /// Any mutations to the snapshot file will likely render the snapshot corrupted,
-    /// causing future attempts to reconstruct it to fail.
-    pub fn snapshot_file_path(tx_offset: TxOffset, snapshot_dir: &Path) -> PathBuf {
-        let file_name = format!("{tx_offset:0>20}.{SNAPSHOT_FILE_EXT}");
-        snapshot_dir.join(file_name)
+    pub fn snapshot_dir_path(&self, tx_offset: TxOffset) -> SnapshotDirPath {
+        self.root.snapshot_dir(tx_offset)
     }
 
     /// Given `snapshot_dir` as the result of [`Self::snapshot_dir_path`],
@@ -617,8 +603,8 @@
     /// Any mutations to the returned [`DirTrie`] or its contents
     /// will likely render the snapshot corrupted,
     /// causing future attempts to reconstruct it to fail.
-    pub fn object_repo(snapshot_dir: &Path) -> Result<DirTrie, std::io::Error> {
-        DirTrie::open(snapshot_dir.join("objects"))
+    pub fn object_repo(snapshot_dir: &SnapshotDirPath) -> Result<DirTrie, std::io::Error> {
+        DirTrie::open(snapshot_dir.objects().0)
     }
 
     /// Read a snapshot contained in self referring to `tx_offset`,
@@ -653,7 +639,7 @@
             return Err(SnapshotError::Incomplete { tx_offset, lockfile });
         }
 
-        let snapshot_file_path = Self::snapshot_file_path(tx_offset, &snapshot_dir);
+        let snapshot_file_path = snapshot_dir.snapshot_file(tx_offset);
         let snapshot = Snapshot::read_from_file(&snapshot_file_path)?;
 
         if snapshot.magic != MAGIC {
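Worth noting why `snapshot_dir_path` (and the commitlog segment names) pad to exactly 20 digits: `u64::MAX` is 20 decimal digits wide, so zero-padding makes lexicographic directory order agree with numeric transaction-offset order, which is what makes "find the latest snapshot" a simple sorted scan. A self-contained check:

```rust
fn main() {
    // 18_446_744_073_709_551_615 has exactly 20 digits.
    assert_eq!(u64::MAX.to_string().len(), 20);
    let a = format!("{:0>20}.snapshot_dir", 9_u64);
    let b = format!("{:0>20}.snapshot_dir", 10_u64);
    // Without the padding, "9..." would sort after "10..." as a string.
    assert!(a < b);
}
```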
@@ -690,8 +676,8 @@
     ///
     /// Calls [`Path::is_dir`] and requires that the result is `true`.
     /// See that method for more detailed preconditions on this function.
-    pub fn open(root: PathBuf, database_address: Address, replica_id: u64) -> Result<Self, SnapshotError> {
-        if !root.is_dir() {
+    pub fn open(root: SnapshotsPath, database_address: Address, replica_id: u64) -> Result<Self, SnapshotError> {
+        if !root.0.is_dir() {
             return Err(SnapshotError::NotDirectory { root });
         }
         Ok(Self {
@@ -766,9 +752,8 @@
 
         for newer_snapshot in newer_snapshots {
             let path = self.snapshot_dir_path(newer_snapshot);
-            let invalid_path = path.with_extension(INVALID_SNAPSHOT_DIR_EXT);
             log::info!("Renaming snapshot newer than {upper_bound} from {path:?} to {path:?}");
-            std::fs::rename(path, invalid_path)?;
+            path.rename_invalid()?;
         }
         Ok(())
     }
diff --git a/crates/standalone/Cargo.toml b/crates/standalone/Cargo.toml
index aedbf6e226..5501748ae3 100644
--- a/crates/standalone/Cargo.toml
+++ b/crates/standalone/Cargo.toml
@@ -17,10 +17,11 @@ harness = true             # Use libtest harness.
 required-features = []     # Features required to build this target (N/A for lib)
 
 [dependencies]
+spacetimedb-client-api-messages.workspace = true
+spacetimedb-client-api.workspace = true
 spacetimedb-core.workspace = true
 spacetimedb-lib.workspace = true
-spacetimedb-client-api.workspace = true
-spacetimedb-client-api-messages.workspace = true
+spacetimedb-paths.workspace = true
 
 anyhow.workspace = true
 async-trait.workspace = true
diff --git a/crates/standalone/src/control_db.rs b/crates/standalone/src/control_db.rs
index fc116ca148..2faab348d6 100644
--- a/crates/standalone/src/control_db.rs
+++ b/crates/standalone/src/control_db.rs
@@ -1,13 +1,14 @@
 use spacetimedb::address::Address;
+use spacetimedb::energy;
 use spacetimedb::hash::hash_bytes;
 use spacetimedb::identity::Identity;
 use spacetimedb::messages::control_db::{Database, EnergyBalance, Node, Replica};
-use spacetimedb::{energy, stdb_path};
 use spacetimedb_client_api_messages::name::{
     DomainName, DomainParsingError, InsertDomainResult, RegisterTldResult, Tld, TldRef,
 };
 use spacetimedb_lib::bsatn;
+use spacetimedb_paths::standalone::ControlDbDir;
 
 #[cfg(test)]
 mod tests;
@@ -53,9 +54,9 @@ impl From<sled::Error> for Error {
 }
 
 impl ControlDb {
-    pub fn new() -> Result<Self, Error> {
+    pub fn new(path: &ControlDbDir) -> Result<Self, Error> {
         let config = sled::Config::default()
-            .path(stdb_path("control_node/control_db"))
+            .path(path)
             .flush_every_ms(Some(50))
             .mode(sled::Mode::HighThroughput);
         let db = config.open()?;
diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs
index b45fcdcbf4..765a0f9a39 100644
--- a/crates/standalone/src/lib.rs
+++ b/crates/standalone/src/lib.rs
@@ -17,14 +17,16 @@ use openssl::pkey::PKey;
 use spacetimedb::address::Address;
 use spacetimedb::auth::identity::{DecodingKey, EncodingKey};
 use spacetimedb::client::ClientActorIndex;
+use spacetimedb::config::MetadataFile;
 use spacetimedb::db::{db_metrics::DB_METRICS, Config};
 use spacetimedb::energy::{EnergyBalance, EnergyQuanta};
 use spacetimedb::host::{DiskStorage, HostController, UpdateDatabaseResult};
 use spacetimedb::identity::Identity;
 use spacetimedb::messages::control_db::{Database, Node, Replica};
-use spacetimedb::stdb_path;
 use spacetimedb::worker_metrics::WORKER_METRICS;
 use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld};
+use spacetimedb_paths::server::{PidFile, ServerDataPath};
+use spacetimedb_paths::standalone::StandaloneDataDirExt;
 use std::fs::File;
 use std::io::Write;
 use std::path::{Path, PathBuf};
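On the `ControlDb::new(path: &ControlDbDir)` change above: callers can only obtain a `ControlDbDir` through the sealed `StandaloneDataDirExt` trait, which keeps the standalone-only layout out of the shared paths crate. A usage sketch (the tuple-struct constructor and the `.0` field follow the conventions used elsewhere in this diff):

```rust
use std::path::PathBuf;

use spacetimedb_paths::server::ServerDataPath;
use spacetimedb_paths::standalone::StandaloneDataDirExt;

fn main() {
    let data_dir = ServerDataPath(PathBuf::from("/var/lib/spacetime/data"));
    // Only types implementing the private `sealed::Sealed` trait get these
    // accessors, so other crates can't bolt this layout onto arbitrary paths.
    let control_db = data_dir.control_db(); // .../control-db (sled database)
    let program_bytes = data_dir.program_bytes(); // .../program-bytes
    println!("{:?} {:?}", control_db.0, program_bytes.0);
}
```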
@@ -41,20 +43,33 @@ pub struct StandaloneEnv {
     private_key: EncodingKey,
     public_key_bytes: Box<[u8]>,
     metrics_registry: prometheus::Registry,
+
+    _pid_file: PidFile,
 }
 
 impl StandaloneEnv {
-    pub async fn init(config: Config) -> anyhow::Result<Arc<Self>> {
-        let control_db = ControlDb::new().context("failed to initialize control db")?;
+    pub async fn init(config: Config, data_dir: Arc<ServerDataPath>) -> anyhow::Result<Arc<Self>> {
+        data_dir.create()?;
+        let _pid_file = data_dir.pid_file()?;
+        let meta_path = data_dir.metadata_toml();
+        let meta = MetadataFile {
+            version: spacetimedb::config::current_version(),
+            edition: "standalone".to_owned(),
+            client_address: None,
+        };
+        if let Some(existing_meta) = MetadataFile::read(&meta_path).context("failed reading metadata.toml")? {
+            anyhow::ensure!(
+                existing_meta.version_compatible_with(&meta.version) && existing_meta.edition == meta.edition,
+                "metadata.toml indicates that this database is from an incompatible \
+                 version of SpacetimeDB. please run a migration before proceeding."
+            );
+        }
+        meta.write(&meta_path).context("failed writing metadata.toml")?;
+
+        let control_db = ControlDb::new(&data_dir.control_db()).context("failed to initialize control db")?;
         let energy_monitor = Arc::new(StandaloneEnergyMonitor::new(control_db.clone()));
-        let program_store = Arc::new(DiskStorage::new(stdb_path("control_node/program_bytes")).await?);
+        let program_store = Arc::new(DiskStorage::new(data_dir.program_bytes().0).await?);
 
-        let host_controller = HostController::new(
-            stdb_path("worker_node/replicas").into(),
-            config,
-            program_store.clone(),
-            energy_monitor,
-        );
+        let host_controller = HostController::new(data_dir, config, program_store.clone(), energy_monitor);
         let client_actor_index = ClientActorIndex::new();
         let (public_key, private_key, public_key_bytes) = get_or_create_keys()?;
@@ -71,8 +86,13 @@
             private_key,
             public_key_bytes,
             metrics_registry,
+            _pid_file,
         }))
     }
+
+    pub fn data_dir(&self) -> &Arc<ServerDataPath> {
+        &self.host_controller.data_dir
+    }
 }
 
 fn get_or_create_keys() -> anyhow::Result<(DecodingKey, EncodingKey, Box<[u8]>)> {
diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs
index 0037c7b38a..18ceac5834 100644
--- a/crates/standalone/src/subcommands/start.rs
+++ b/crates/standalone/src/subcommands/start.rs
@@ -1,3 +1,6 @@
+use std::path::PathBuf;
+use std::sync::Arc;
+
 use crate::routes::router;
 use crate::util::{create_dir_or_err, create_file_with_contents};
 use crate::StandaloneEnv;
@@ -6,6 +9,7 @@ use clap::{Arg, ArgMatches};
 use spacetimedb::config::{FilesGlobal, FilesLocal, SpacetimeDbFiles};
 use spacetimedb::db::{Config, Storage};
 use spacetimedb::startup;
+use spacetimedb_paths::server::ServerDataPath;
 use tokio::net::TcpListener;
 
 #[cfg(feature = "string")]
@@ -50,6 +54,14 @@ impl ProgramMode {
     }
 }
 
+pub fn default_data_dir() -> PathBuf {
+    dirs::data_local_dir().unwrap().join(if cfg!(windows) {
+        "SpacetimeDB/data"
+    } else {
+        "spacetime/data"
+    })
+}
+
 pub fn cli(mode: ProgramMode) -> clap::Command {
     let mut log_conf_path_arg = Arg::new("log_conf_path")
         .long("log-conf-path")
@@ -65,6 +77,11 @@
     let mut jwt_priv_key_path_arg = Arg::new("jwt_priv_key_path")
         .long("jwt-priv-key-path")
         .help("The path to the private jwt key for issuing identities (SPACETIMEDB_JWT_PRIV_KEY)");
+    let data_dir_arg = Arg::new("data_dir")
+        .long("data-dir")
+        .help("The path to the data directory for the database")
+        .default_value(default_data_dir().into_os_string())
+        .value_parser(clap::value_parser!(PathBuf));
 
     let in_memory_arg = Arg::new("in_memory")
         .long("in-memory")
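The compatibility gate in `StandaloneEnv::init` above hinges on `MetadataFile::version_compatible_with`, whose rules live outside this diff. With the `semver` crate, a caret-style same-major check would look like this (an illustrative assumption, not necessarily the implemented policy):

```rust
use semver::Version;

// Hypothetical stand-in for `MetadataFile::version_compatible_with`:
// versions are compatible when their major components match, with the
// usual pre-1.0 tightening to matching minors as well.
fn version_compatible_with(on_disk: &Version, running: &Version) -> bool {
    if on_disk.major == 0 || running.major == 0 {
        on_disk.major == running.major && on_disk.minor == running.minor
    } else {
        on_disk.major == running.major
    }
}

fn main() {
    let on_disk: Version = "1.0.0-rc1".parse().unwrap();
    let running: Version = "1.0.0".parse().unwrap();
    assert!(version_compatible_with(&on_disk, &running));
}
```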
@@ -122,6 +139,7 @@ pub fn cli(mode: ProgramMode) -> clap::Command {
             .default_value(mode.listen_addr())
             .help(mode.listen_addr_help())
         )
+        .arg(data_dir_arg)
         .arg(log_conf_path_arg)
         .arg(log_dir_path_arg)
         .arg(database_path_arg)
@@ -168,6 +186,7 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> {
     let stdb_path = read_argument(args, "database_path", "STDB_PATH");
     let jwt_pub_key_path = read_argument(args, "jwt_pub_key_path", "SPACETIMEDB_JWT_PUB_KEY");
     let jwt_priv_key_path = read_argument(args, "jwt_priv_key_path", "SPACETIMEDB_JWT_PRIV_KEY");
+    let data_dir = args.get_one::<PathBuf>("data_dir").unwrap();
     let enable_tracy = args.get_flag("enable_tracy");
     let storage = if args.get_flag("in_memory") {
         Storage::Memory
@@ -213,7 +232,9 @@ pub async fn exec(args: &ArgMatches) -> anyhow::Result<()> {
 
     startup::StartupOptions::default().configure();
 
-    let ctx = StandaloneEnv::init(config).await?;
+    // TODO:
+    let data_dir = Arc::new(ServerDataPath(data_dir.clone()));
+    let ctx = StandaloneEnv::init(config, data_dir).await?;
 
     let service = router(ctx);
diff --git a/crates/testing/Cargo.toml b/crates/testing/Cargo.toml
index 119f97a692..29633397df 100644
--- a/crates/testing/Cargo.toml
+++ b/crates/testing/Cargo.toml
@@ -10,6 +10,7 @@ spacetimedb-lib.workspace = true
 spacetimedb-core.workspace = true
 spacetimedb-standalone.workspace = true
 spacetimedb-client-api.workspace = true
+spacetimedb-paths.workspace = true
 
 anyhow.workspace = true
 env_logger.workspace = true
diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs
index 69226880a6..c8aececcf5 100644
--- a/crates/testing/src/modules.rs
+++ b/crates/testing/src/modules.rs
@@ -7,6 +7,7 @@ use std::time::Instant;
 use spacetimedb::messages::control_db::HostType;
 use spacetimedb::Identity;
 use spacetimedb_lib::ser::serde::SerializeWrapper;
+use spacetimedb_paths::server::ServerDataPath;
 use tokio::runtime::{Builder, Runtime};
 
 use spacetimedb::address::Address;
@@ -76,7 +77,8 @@ impl ModuleHandle {
     }
 
     pub async fn read_log(&self, size: Option<u32>) -> String {
-        let filepath = DatabaseLogger::filepath(&self.db_address, self.client.replica_id);
+        let replica_path = self._env.data_dir().replica(self.client.replica_id);
+        let filepath = DatabaseLogger::filepath(replica_path);
         DatabaseLogger::read_latest(&filepath, size).await
     }
 }
@@ -148,9 +150,12 @@ impl CompiledModule {
                 paths
             }
         };
+        let data_dir = ServerDataPath(paths.db_path().join("data"));
 
         crate::set_key_env_vars(&paths);
-        let env = spacetimedb_standalone::StandaloneEnv::init(config).await.unwrap();
+        let env = spacetimedb_standalone::StandaloneEnv::init(config, data_dir.into())
+            .await
+            .unwrap();
         // TODO: Fix this when we update identity generation.
         let identity = Identity::ZERO;
         let db_address = env.create_address().await.unwrap();
diff --git a/crates/testing/src/sdk.rs b/crates/testing/src/sdk.rs
index 4885402c81..8159cb526e 100644
--- a/crates/testing/src/sdk.rs
+++ b/crates/testing/src/sdk.rs
@@ -19,8 +19,9 @@ pub fn ensure_standalone_process() {
             // We need the tempdir to live for the duration of the process,
            // and all the options for post-`main` cleanup seem sketchy.
             .into_path();
-        std::env::set_var("STDB_PATH", stdb_path);
-        Mutex::new(Some(std::thread::spawn(|| invoke_cli(&["start"]))))
+        std::env::set_var("STDB_PATH", &stdb_path);
+        let data_dir = stdb_path.join("data").into_os_string().into_string().unwrap();
+        Mutex::new(Some(std::thread::spawn(move || invoke_cli(&["start", "--data-dir", &data_dir]))))
     };
 }
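One closing note on `data_dir.pid_file()` from `StandaloneEnv::init`: holding a `PidFile` for the lifetime of `StandaloneEnv` is what prevents two standalone processes from sharing one `--data-dir` (the test harness above sidesteps this by giving each run its own tempdir). `PidFile` itself is not part of this diff; a minimal sketch of the pattern using the `fs2` crate, with hypothetical names:

```rust
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::Path;

use fs2::FileExt;

// Take an exclusive advisory lock on the pid file and record our pid.
// The lock is released automatically when the returned handle is dropped,
// which mirrors the `_pid_file` field kept alive on `StandaloneEnv`.
fn acquire_pid_file(path: &Path) -> std::io::Result<File> {
    let mut file = OpenOptions::new().create(true).write(true).open(path)?;
    file.try_lock_exclusive()?; // fails if another process holds the lock
    write!(file, "{}", std::process::id())?;
    Ok(file)
}
```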