Skip to content

Commit

Permalink
Data dir impl
Browse files Browse the repository at this point in the history
  • Loading branch information
coolreader18 committed Oct 21, 2024
1 parent 263511e commit 5795709
Show file tree
Hide file tree
Showing 36 changed files with 663 additions and 377 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ members = [
"crates/fs-utils",
"crates/lib",
"crates/metrics",
"crates/paths",
"crates/primitives",
"crates/sats",
"crates/schema",
Expand Down Expand Up @@ -100,6 +101,7 @@ spacetimedb-expr = { path = "crates/expr", version = "1.0.0-rc1" }
spacetimedb-lib = { path = "crates/lib", default-features = false, version = "1.0.0-rc1" }
spacetimedb-metrics = { path = "crates/metrics", version = "1.0.0-rc1" }
spacetimedb-primitives = { path = "crates/primitives", version = "1.0.0-rc1" }
spacetimedb-paths = { path = "crates/paths", version = "1.0.0-rc1" }
spacetimedb-sats = { path = "crates/sats", version = "1.0.0-rc1" }
spacetimedb-schema = { path = "crates/schema", version = "1.0.0-rc1" }
spacetimedb-standalone = { path = "crates/standalone", version = "1.0.0-rc1" }
Expand Down Expand Up @@ -169,6 +171,7 @@ indicatif = "0.16"
insta = { version = "1.21.0", features = ["toml"] }
is-terminal = "0.4"
itertools = "0.12"
itoa = "1"
jsonwebtoken = { version = "8.1.0" }
lazy_static = "1.4.0"
log = "0.4.17"
Expand Down Expand Up @@ -201,6 +204,7 @@ rustyline = { version = "12.0.0", features = [] }
scoped-tls = "1.0.1"
scopeguard = "1.1.0"
second-stack = "0.3"
semver = "1"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = { version = "1.0.128", features = ["raw_value", "arbitrary_precision"] }
serde_path_to_error = "0.1.9"
Expand Down
41 changes: 13 additions & 28 deletions crates/cli/src/subcommands/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
};
use anyhow::Context;
use clap::{Arg, ArgAction, ArgMatches, Command};
use spacetimedb::stdb_path;
use spacetimedb_standalone::subcommands::start::default_data_dir;
use std::path::PathBuf;
use tabled::{
settings::{object::Columns, Alignment, Modify, Style},
Expand Down Expand Up @@ -122,6 +122,13 @@ fn get_subcommands() -> Vec<Command> {
.arg(common_args::yes()),
Command::new("clear")
.about("Deletes all data from all local databases")
.arg(
Arg::new("data_dir")
.long("data-dir")
.help("The path to the data directory for the database")
.default_value(default_data_dir().into_os_string())
.value_parser(clap::value_parser!(PathBuf)),
)
.arg(common_args::yes()),
// TODO: set-name, set-protocol, set-host, set-url
]
Expand Down Expand Up @@ -425,26 +432,10 @@ pub async fn exec_edit(mut config: Config, args: &ArgMatches) -> Result<(), anyh

async fn exec_clear(_config: Config, args: &ArgMatches) -> Result<(), anyhow::Error> {
let force = args.get_flag("force");
if std::env::var_os("STDB_PATH").map(PathBuf::from).is_none() {
let mut path = dirs::home_dir().unwrap_or_default();
path.push(".spacetime");
std::env::set_var("STDB_PATH", path.to_str().unwrap());
}

let control_node_dir = stdb_path("control_node");
let worker_node_dir = stdb_path("worker_node");
if control_node_dir.exists() || worker_node_dir.exists() {
if control_node_dir.exists() {
println!("Control node database path: {}", control_node_dir.to_str().unwrap());
} else {
println!("Control node database path: <not found>");
}
let data_dir = args.get_one::<PathBuf>("data_dir").unwrap();

if worker_node_dir.exists() {
println!("Worker node database path: {}", worker_node_dir.to_str().unwrap());
} else {
println!("Worker node database path: <not found>");
}
if data_dir.exists() {
println!("Database path: {}", data_dir.display());

if !y_or_n(
force,
Expand All @@ -454,14 +445,8 @@ async fn exec_clear(_config: Config, args: &ArgMatches) -> Result<(), anyhow::Er
return Ok(());
}

if control_node_dir.exists() {
std::fs::remove_dir_all(&control_node_dir)?;
println!("Deleted control node database: {}", control_node_dir.to_str().unwrap());
}
if worker_node_dir.exists() {
std::fs::remove_dir_all(&worker_node_dir)?;
println!("Deleted worker node database: {}", worker_node_dir.to_str().unwrap());
}
std::fs::remove_dir_all(&data_dir)?;
println!("Deleted database: {}", data_dir.display());
} else {
println!("Local database not found. Nothing has been deleted.");
}
Expand Down
2 changes: 1 addition & 1 deletion crates/client-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ bytes = "1"
tracing.workspace = true
bytestring = "1"
tokio-tungstenite.workspace = true
itoa = "1.0.9"
itoa.workspace = true
derive_more = "0.99.17"
scopeguard.workspace = true
5 changes: 3 additions & 2 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,12 @@ where
.ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?;
let replica_id = replica.id;

let filepath = DatabaseLogger::filepath(&address, replica_id);
let host = worker_ctx.host_controller();

let filepath = DatabaseLogger::filepath(host.data_dir.replica(replica_id));
let lines = DatabaseLogger::read_latest(&filepath, num_lines).await;

let body = if follow {
let host = worker_ctx.host_controller();
let module = host
.get_or_launch_module_host(database, replica_id)
.await
Expand Down
1 change: 1 addition & 0 deletions crates/commitlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ log.workspace = true
memmap2 = "0.9.4"
serde = { workspace = true, optional = true }
spacetimedb-primitives.workspace = true
spacetimedb-paths.workspace = true
spacetimedb-sats.workspace = true
tempfile.workspace = true
thiserror.workspace = true
Expand Down
49 changes: 18 additions & 31 deletions crates/commitlog/src/index/indexfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,16 @@ use std::{
io,
marker::PhantomData,
mem,
path::{Path, PathBuf},
};

use log::debug;
use memmap2::MmapMut;
use spacetimedb_paths::server::OffsetIndexFile;

use super::IndexError;
const OFFSET_INDEX_FILE_EXT: &str = ".stdb.ofs";
const KEY_SIZE: usize = mem::size_of::<u64>();
const ENTRY_SIZE: usize = KEY_SIZE + mem::size_of::<u64>();

/// Returns the offset index file path based on the root path and offset
pub fn offset_index_file_path(root: &Path, offset: u64) -> PathBuf {
root.join(format!("{offset:0>20}{OFFSET_INDEX_FILE_EXT}"))
}

/// A mutable representation of an index file using memory-mapped I/O.
///
/// `IndexFileMut` provides efficient read and write access to an index file, which stores
Expand All @@ -35,12 +29,8 @@ pub struct IndexFileMut<Key: Into<u64> + From<u64>> {
}

impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
pub fn create_index_file(path: &Path, offset: u64, cap: u64) -> io::Result<Self> {
File::options()
.write(true)
.read(true)
.create_new(true)
.open(offset_index_file_path(path, offset))
pub fn create_index_file(path: &OffsetIndexFile, cap: u64) -> io::Result<Self> {
path.open_file(File::options().write(true).read(true).create_new(true))
.and_then(|file| {
file.set_len(cap * ENTRY_SIZE as u64)?;
let mmap = unsafe { MmapMut::map_mut(&file) }?;
Expand All @@ -54,18 +44,15 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
.or_else(|e| {
if e.kind() == io::ErrorKind::AlreadyExists {
debug!("Index file {} already exists", path.display());
Self::open_index_file(path, offset, cap)
Self::open_index_file(path, cap)
} else {
Err(e)
}
})
}

pub fn open_index_file(path: &Path, offset: u64, cap: u64) -> io::Result<Self> {
let file = File::options()
.read(true)
.write(true)
.open(offset_index_file_path(path, offset))?;
pub fn open_index_file(path: &OffsetIndexFile, cap: u64) -> io::Result<Self> {
let file = path.open_file(File::options().read(true).write(true))?;
file.set_len(cap * ENTRY_SIZE as u64)?;
let mmap = unsafe { MmapMut::map_mut(&file)? };

Expand All @@ -79,8 +66,8 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
Ok(me)
}

pub fn delete_index_file(path: &Path, offset: u64) -> io::Result<()> {
fs::remove_file(offset_index_file_path(path, offset)).map_err(Into::into)
pub fn delete_index_file(path: &OffsetIndexFile) -> io::Result<()> {
fs::remove_file(path).map_err(Into::into)
}

// Searches for first 0-key, to count number of entries
Expand Down Expand Up @@ -244,11 +231,8 @@ pub struct IndexFile<Key: Into<u64> + From<u64>> {
}

impl<Key: Into<u64> + From<u64>> IndexFile<Key> {
pub fn open_index_file(path: &Path, offset: u64) -> io::Result<Self> {
let file = File::options()
.read(true)
.append(true)
.open(offset_index_file_path(path, offset))?;
pub fn open_index_file(path: &OffsetIndexFile) -> io::Result<Self> {
let file = path.open_file(File::options().read(true).append(true))?;
let mmap = unsafe { MmapMut::map_mut(&file)? };

let mut inner = IndexFileMut {
Expand All @@ -274,16 +258,18 @@ impl<Key: Into<u64> + From<u64>> IndexFile<Key> {
#[cfg(test)]
mod tests {
use super::*;
use spacetimedb_paths::server::CommitLogDir;
use tempfile::TempDir;

/// Create and fill index file with key as first `fill_till - 1` even numbers
fn create_and_fill_index(cap: u64, fill_till: u64) -> Result<IndexFileMut<u64>, IndexError> {
// Create a temporary directory for testing
let temp_dir = TempDir::new()?;
let path = temp_dir.path().to_path_buf();
let path = CommitLogDir(temp_dir.path().to_path_buf());
let index_path = path.index(0);

// Create an index file
let mut index_file: IndexFileMut<u64> = IndexFileMut::create_index_file(&path, 0, cap)?;
let mut index_file: IndexFileMut<u64> = IndexFileMut::create_index_file(&index_path, cap)?;

// Enter even number keys from 2
for i in 1..fill_till {
Expand Down Expand Up @@ -364,10 +350,11 @@ mod tests {
fn test_close_open_index() -> Result<(), IndexError> {
// Create a temporary directory for testing
let temp_dir = TempDir::new()?;
let path = temp_dir.path().to_path_buf();
let path = CommitLogDir(temp_dir.path().to_path_buf());
let index_path = path.index(0);

// Create an index file
let mut index_file: IndexFileMut<u64> = IndexFileMut::create_index_file(&path, 0, 100)?;
let mut index_file: IndexFileMut<u64> = IndexFileMut::create_index_file(&index_path, 100)?;

for i in 1..10 {
index_file.append(i * 2, i * 2 * 100)?;
Expand All @@ -376,7 +363,7 @@ mod tests {
assert_eq!(index_file.num_entries, 9);
drop(index_file);

let open_index_file: IndexFileMut<u64> = IndexFileMut::open_index_file(&path, 0, 100)?;
let open_index_file: IndexFileMut<u64> = IndexFileMut::open_index_file(&index_path, 100)?;
assert_eq!(open_index_file.num_entries, 9);
assert_eq!(open_index_file.key_lookup(6)?, (6, 600));

Expand Down
1 change: 0 additions & 1 deletion crates/commitlog/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::io;
use thiserror::Error;
mod indexfile;

pub use indexfile::offset_index_file_path;
pub use indexfile::{IndexFile, IndexFileMut};

#[derive(Error, Debug)]
Expand Down
Loading

0 comments on commit 5795709

Please sign in to comment.