diff --git a/Cargo.lock b/Cargo.lock index 859bd3847c..def5ddae56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -770,7 +770,7 @@ dependencies = [ [[package]] name = "connect_disconnect_client" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "spacetimedb-sdk", @@ -4245,7 +4245,7 @@ dependencies = [ [[package]] name = "spacetimedb" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "bytemuck", "derive_more", @@ -4262,7 +4262,7 @@ dependencies = [ [[package]] name = "spacetimedb-bench" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "anymap", @@ -4300,7 +4300,7 @@ dependencies = [ [[package]] name = "spacetimedb-bindings-macro" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "bitflags 2.6.0", "heck 0.4.1", @@ -4313,14 +4313,14 @@ dependencies = [ [[package]] name = "spacetimedb-bindings-sys" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "spacetimedb-primitives", ] [[package]] name = "spacetimedb-cli" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "base64 0.21.7", @@ -4374,7 +4374,7 @@ dependencies = [ [[package]] name = "spacetimedb-client-api" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "async-trait", @@ -4414,7 +4414,7 @@ dependencies = [ [[package]] name = "spacetimedb-client-api-messages" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "brotli", "bytes", @@ -4437,7 +4437,7 @@ dependencies = [ [[package]] name = "spacetimedb-commitlog" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "bitflags 2.6.0", "crc32c", @@ -4458,7 +4458,7 @@ dependencies = [ [[package]] name = "spacetimedb-core" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "arrayvec", @@ -4556,7 +4556,7 @@ dependencies = [ [[package]] name = "spacetimedb-data-structures" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "hashbrown 0.14.5", "nohash-hasher", @@ -4567,7 +4567,7 @@ dependencies = [ [[package]] name = 
"spacetimedb-durability" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "log", @@ -4579,7 +4579,7 @@ dependencies = [ [[package]] name = "spacetimedb-fs-utils" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "hex", @@ -4590,7 +4590,7 @@ dependencies = [ [[package]] name = "spacetimedb-lib" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "bitflags 2.6.0", @@ -4616,7 +4616,7 @@ dependencies = [ [[package]] name = "spacetimedb-metrics" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "arrayvec", "itertools 0.12.1", @@ -4626,7 +4626,7 @@ dependencies = [ [[package]] name = "spacetimedb-primitives" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "bitflags 2.6.0", "either", @@ -4637,7 +4637,7 @@ dependencies = [ [[package]] name = "spacetimedb-query-planner" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "derive_more", "spacetimedb-lib", @@ -4659,7 +4659,7 @@ dependencies = [ [[package]] name = "spacetimedb-sats" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "ahash 0.8.11", "arrayvec", @@ -4689,7 +4689,7 @@ dependencies = [ [[package]] name = "spacetimedb-schema" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "enum-as-inner", @@ -4713,7 +4713,7 @@ dependencies = [ [[package]] name = "spacetimedb-sdk" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "anymap", @@ -4740,7 +4740,7 @@ dependencies = [ [[package]] name = "spacetimedb-snapshot" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "blake3", "hex", @@ -4756,7 +4756,7 @@ dependencies = [ [[package]] name = "spacetimedb-sql-parser" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "derive_more", "sqlparser", @@ -4765,7 +4765,7 @@ dependencies = [ [[package]] name = "spacetimedb-standalone" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "async-trait", @@ -4795,7 +4795,7 @@ dependencies = [ [[package]] name = 
"spacetimedb-table" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "ahash 0.8.11", "blake3", @@ -4819,7 +4819,7 @@ dependencies = [ [[package]] name = "spacetimedb-testing" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "clap 4.5.18", @@ -4844,7 +4844,7 @@ dependencies = [ [[package]] name = "spacetimedb-vm" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "arrayvec", @@ -4940,7 +4940,7 @@ dependencies = [ [[package]] name = "sqltest" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "async-trait", @@ -5272,7 +5272,7 @@ dependencies = [ [[package]] name = "test-client" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "env_logger", @@ -5283,7 +5283,7 @@ dependencies = [ [[package]] name = "test-counter" -version = "0.12.0" +version = "1.0.0-rc1" dependencies = [ "anyhow", "spacetimedb-data-structures", diff --git a/Cargo.toml b/Cargo.toml index fd354db69e..55aa4a2ebc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,34 +80,34 @@ inherits = "release" debug = true [workspace.package] -version = "0.12.0" +version = "1.0.0-rc1" edition = "2021" # update rust-toolchain.toml too! 
rust-version = "1.78.0" [workspace.dependencies] -spacetimedb = { path = "crates/bindings", version = "0.12.0" } -spacetimedb-bindings-macro = { path = "crates/bindings-macro", version = "0.12.0" } -spacetimedb-bindings-sys = { path = "crates/bindings-sys", version = "0.12.0" } -spacetimedb-cli = { path = "crates/cli", version = "0.12.0" } -spacetimedb-client-api = { path = "crates/client-api", version = "0.12.0" } -spacetimedb-client-api-messages = { path = "crates/client-api-messages", version = "0.12.0" } -spacetimedb-commitlog = { path = "crates/commitlog", version = "0.12.0" } -spacetimedb-core = { path = "crates/core", version = "0.12.0" } -spacetimedb-data-structures = { path = "crates/data-structures", version = "0.12.0" } -spacetimedb-durability = { path = "crates/durability", version = "0.12.0" } -spacetimedb-lib = { path = "crates/lib", default-features = false, version = "0.12.0" } -spacetimedb-metrics = { path = "crates/metrics", version = "0.12.0" } -spacetimedb-primitives = { path = "crates/primitives", version = "0.12.0" } -spacetimedb-query-planner = { path = "crates/planner", version = "0.12.0" } -spacetimedb-sats = { path = "crates/sats", version = "0.12.0" } -spacetimedb-schema = { path = "crates/schema", version = "0.12.0" } -spacetimedb-standalone = { path = "crates/standalone", version = "0.12.0" } -spacetimedb-sql-parser = { path = "crates/sql-parser", version = "0.12.0" } -spacetimedb-table = { path = "crates/table", version = "0.12.0" } -spacetimedb-vm = { path = "crates/vm", version = "0.12.0" } -spacetimedb-fs-utils = { path = "crates/fs-utils", version = "0.12.0" } -spacetimedb-snapshot = { path = "crates/snapshot", version = "0.12.0" } +spacetimedb = { path = "crates/bindings", version = "1.0.0-rc1" } +spacetimedb-bindings-macro = { path = "crates/bindings-macro", version = "1.0.0-rc1" } +spacetimedb-bindings-sys = { path = "crates/bindings-sys", version = "1.0.0-rc1" } +spacetimedb-cli = { path = "crates/cli", version = "1.0.0-rc1" } 
+spacetimedb-client-api = { path = "crates/client-api", version = "1.0.0-rc1" } +spacetimedb-client-api-messages = { path = "crates/client-api-messages", version = "1.0.0-rc1" } +spacetimedb-commitlog = { path = "crates/commitlog", version = "1.0.0-rc1" } +spacetimedb-core = { path = "crates/core", version = "1.0.0-rc1" } +spacetimedb-data-structures = { path = "crates/data-structures", version = "1.0.0-rc1" } +spacetimedb-durability = { path = "crates/durability", version = "1.0.0-rc1" } +spacetimedb-lib = { path = "crates/lib", default-features = false, version = "1.0.0-rc1" } +spacetimedb-metrics = { path = "crates/metrics", version = "1.0.0-rc1" } +spacetimedb-primitives = { path = "crates/primitives", version = "1.0.0-rc1" } +spacetimedb-query-planner = { path = "crates/planner", version = "1.0.0-rc1" } +spacetimedb-sats = { path = "crates/sats", version = "1.0.0-rc1" } +spacetimedb-schema = { path = "crates/schema", version = "1.0.0-rc1" } +spacetimedb-standalone = { path = "crates/standalone", version = "1.0.0-rc1" } +spacetimedb-sql-parser = { path = "crates/sql-parser", version = "1.0.0-rc1" } +spacetimedb-table = { path = "crates/table", version = "1.0.0-rc1" } +spacetimedb-vm = { path = "crates/vm", version = "1.0.0-rc1" } +spacetimedb-fs-utils = { path = "crates/fs-utils", version = "1.0.0-rc1" } +spacetimedb-snapshot = { path = "crates/snapshot", version = "1.0.0-rc1" } ahash = "0.8" anyhow = "1.0.68" diff --git a/LICENSE.txt b/LICENSE.txt index 4639053d48..7ef5688a85 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -5,7 +5,7 @@ Business Source License 1.1 Parameters Licensor: Clockwork Laboratories, Inc. -Licensed Work: SpacetimeDB 0.12.0 +Licensed Work: SpacetimeDB 1.0.0-rc1 The Licensed Work is (c) 2023 Clockwork Laboratories, Inc. 
diff --git a/crates/cli/src/subcommands/project/rust/Cargo._toml b/crates/cli/src/subcommands/project/rust/Cargo._toml index 945eeb8539..d9636308bb 100644 --- a/crates/cli/src/subcommands/project/rust/Cargo._toml +++ b/crates/cli/src/subcommands/project/rust/Cargo._toml @@ -9,5 +9,5 @@ edition = "2021" crate-type = ["cdylib"] [dependencies] -spacetimedb = "0.12.0" +spacetimedb = "1.0.0-rc1" log = "0.4" diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index a952b92c03..a71497fe67 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -10,7 +10,7 @@ use spacetimedb::client::ClientActorIndex; use spacetimedb::energy::{EnergyBalance, EnergyQuanta}; use spacetimedb::host::{HostController, UpdateDatabaseResult}; use spacetimedb::identity::Identity; -use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType, IdentityEmail, Node}; +use spacetimedb::messages::control_db::{Database, HostType, IdentityEmail, Node, Replica}; use spacetimedb::sendgrid_controller::SendGridController; use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld}; use spacetimedb_client_api_messages::recovery::RecoveryCode; @@ -92,10 +92,10 @@ pub trait ControlStateReadAccess { fn get_database_by_address(&self, address: &Address) -> anyhow::Result>; fn get_databases(&self) -> anyhow::Result>; - // Database instances - fn get_database_instance_by_id(&self, id: u64) -> anyhow::Result>; - fn get_database_instances(&self) -> anyhow::Result>; - fn get_leader_database_instance_by_database(&self, database_id: u64) -> Option; + // Replicas + fn get_replica_by_id(&self, id: u64) -> anyhow::Result>; + fn get_replicas(&self) -> anyhow::Result>; + fn get_leader_replica_by_database(&self, database_id: u64) -> Option; // Identities fn get_identities_for_email(&self, email: &str) -> anyhow::Result>; @@ -174,15 +174,15 @@ impl ControlStateReadAccess for Arc { (**self).get_databases() } - // Database instances 
- fn get_database_instance_by_id(&self, id: u64) -> anyhow::Result> { - (**self).get_database_instance_by_id(id) + // Replicas + fn get_replica_by_id(&self, id: u64) -> anyhow::Result> { + (**self).get_replica_by_id(id) } - fn get_database_instances(&self) -> anyhow::Result> { - (**self).get_database_instances() + fn get_replicas(&self) -> anyhow::Result> { + (**self).get_replicas() } - fn get_leader_database_instance_by_database(&self, database_id: u64) -> Option { - (**self).get_leader_database_instance_by_database(database_id) + fn get_leader_replica_by_database(&self, database_id: u64) -> Option { + (**self).get_leader_replica_by_database(database_id) } // Identities diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 0e2405325e..00071e926a 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -22,7 +22,7 @@ use spacetimedb::host::{DescribedEntityType, UpdateDatabaseResult}; use spacetimedb::host::{ModuleHost, ReducerArgs}; use spacetimedb::identity::Identity; use spacetimedb::json::client_api::StmtResultJson; -use spacetimedb::messages::control_db::{Database, DatabaseInstance, HostType}; +use spacetimedb::messages::control_db::{Database, HostType, Replica}; use spacetimedb::sql; use spacetimedb::sql::execute::{ctx_sql, translate_col}; use spacetimedb_client_api_messages::name::{self, DnsLookupResponse, DomainName, PublishOp, PublishResult}; @@ -74,16 +74,13 @@ pub async fn call( (StatusCode::NOT_FOUND, "No such database.") })?; let identity = database.owner_identity; - let database_instance = worker_ctx - .get_leader_database_instance_by_database(database.id) - .ok_or(( - StatusCode::NOT_FOUND, - "Database instance not scheduled to this node yet.", - ))?; - let instance_id = database_instance.id; + let replica = worker_ctx + .get_leader_replica_by_database(database.id) + .ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?; + let 
replica_id = replica.id; let host = worker_ctx.host_controller(); let module = host - .get_or_launch_module_host(database, instance_id) + .get_or_launch_module_host(database, replica_id) .await .map_err(log_and_500)?; @@ -172,7 +169,7 @@ pub enum DBCallErr { } pub struct DatabaseInformation { - database_instance: DatabaseInstance, + replica: Replica, auth: SpacetimeAuth, } /// Extract some common parameters that most API call invocations to the database will use. @@ -191,15 +188,11 @@ async fn extract_db_call_info( (StatusCode::NOT_FOUND, "No such database.") })?; - let database_instance = ctx.get_leader_database_instance_by_database(database.id).ok_or(( - StatusCode::NOT_FOUND, - "Database instance not scheduled to this node yet.", - ))?; + let replica = ctx + .get_leader_replica_by_database(database.id) + .ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?; - Ok(DatabaseInformation { - database_instance, - auth, - }) + Ok(DatabaseInformation { replica, auth }) } pub enum EntityDef<'a> { @@ -280,10 +273,10 @@ where let call_info = extract_db_call_info(&worker_ctx, auth, &address).await?; - let instance_id = call_info.database_instance.id; + let replica_id = call_info.replica.id; let module = worker_ctx .host_controller() - .get_or_launch_module_host(database, instance_id) + .get_or_launch_module_host(database, replica_id) .await .map_err(log_and_500)?; @@ -348,10 +341,10 @@ where let call_info = extract_db_call_info(&worker_ctx, auth, &address).await?; - let instance_id = call_info.database_instance.id; + let replica_id = call_info.replica.id; let host = worker_ctx.host_controller(); let module = host - .get_or_launch_module_host(database, instance_id) + .get_or_launch_module_host(database, replica_id) .await .map_err(log_and_500)?; @@ -448,21 +441,18 @@ where .into()); } - let database_instance = worker_ctx - .get_leader_database_instance_by_database(database.id) - .ok_or(( - StatusCode::NOT_FOUND, - "Database instance not scheduled to 
this node yet.", - ))?; - let instance_id = database_instance.id; + let replica = worker_ctx + .get_leader_replica_by_database(database.id) + .ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?; + let replica_id = replica.id; - let filepath = DatabaseLogger::filepath(&address, instance_id); + let filepath = DatabaseLogger::filepath(&address, replica_id); let lines = DatabaseLogger::read_latest(&filepath, num_lines).await; let body = if follow { let host = worker_ctx.host_controller(); let module = host - .get_or_launch_module_host(database, instance_id) + .get_or_launch_module_host(database, replica_id) .await .map_err(log_and_500)?; let log_rx = module.subscribe_to_logs().map_err(log_and_500)?; @@ -533,23 +523,20 @@ where let auth = AuthCtx::new(database.owner_identity, auth.identity); log::debug!("auth: {auth:?}"); - let database_instance = worker_ctx - .get_leader_database_instance_by_database(database.id) - .ok_or(( - StatusCode::NOT_FOUND, - "Database instance not scheduled to this node yet.", - ))?; - let instance_id = database_instance.id; + let replica = worker_ctx + .get_leader_replica_by_database(database.id) + .ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?; + let replica_id = replica.id; let host = worker_ctx.host_controller(); let module_host = host - .get_or_launch_module_host(database.clone(), instance_id) + .get_or_launch_module_host(database.clone(), replica_id) .await .map_err(log_and_500)?; let json = host .using_database( database, - instance_id, + replica_id, move |db| -> axum::response::Result<_, (StatusCode, String)> { tracing::info!(sql = body); let results = diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index f1439cf33d..cf45f04f92 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -88,16 +88,16 @@ where .get_database_by_address(&db_address) .unwrap() .ok_or(StatusCode::NOT_FOUND)?; 
- let database_instance = ctx - .get_leader_database_instance_by_database(database.id) + let replica = ctx + .get_leader_replica_by_database(database.id) .ok_or(StatusCode::NOT_FOUND)?; - let instance_id = database_instance.id; + let replica_id = replica.id; let identity_token = auth.creds.token().into(); let host = ctx.host_controller(); let module_rx = host - .watch_maybe_launch_module_host(database, instance_id) + .watch_maybe_launch_module_host(database, replica_id) .await .map_err(log_and_500)?; @@ -131,7 +131,7 @@ where } let actor = |client, sendrx| ws_client_actor(client, ws, sendrx); - let client = match ClientConnection::spawn(client_id, protocol, instance_id, module_rx, actor).await { + let client = match ClientConnection::spawn(client_id, protocol, replica_id, module_rx, actor).await { Ok(s) => s, Err(e) => { log::warn!("ModuleHost died while we were connecting: {e:#}"); diff --git a/crates/client-api/src/routes/tracelog.rs b/crates/client-api/src/routes/tracelog.rs index 04b574735a..1b3e6e11bd 100644 --- a/crates/client-api/src/routes/tracelog.rs +++ b/crates/client-api/src/routes/tracelog.rs @@ -6,7 +6,7 @@ use serde::Deserialize; use tempfile::TempDir; use spacetimedb::address::Address; -use spacetimedb::database_instance_context::DatabaseInstanceContext; +use spacetimedb::replica_context::ReplicaContext; use spacetimedb::db::Storage; use spacetimedb::hash::hash_bytes; use spacetimedb::host::instance_env::InstanceEnv; @@ -28,13 +28,13 @@ pub async fn get_tracelog( .get_database_by_address(&address) .map_err(log_and_500)? 
.ok_or((StatusCode::NOT_FOUND, "No such database."))?; - let database_instance = ctx.get_leader_database_instance_by_database(database.id); - let instance_id = database_instance.unwrap().id; + let replica = ctx.get_leader_replica_by_database(database.id); + let replica_id = replica.unwrap().id; let host = ctx.host_controller(); - let trace = host.get_trace(instance_id).await.map_err(|e| { + let trace = host.get_trace(replica_id).await.map_err(|e| { log::error!("Unable to retrieve tracelog {}", e); - (StatusCode::SERVICE_UNAVAILABLE, "Database instance not ready.") + (StatusCode::SERVICE_UNAVAILABLE, "Replica not ready.") })?; let trace = trace.ok_or(StatusCode::NOT_FOUND)?; @@ -54,13 +54,13 @@ pub async fn stop_tracelog( .get_database_by_address(&address) .map_err(log_and_500)? .ok_or((StatusCode::NOT_FOUND, "No such database."))?; - let database_instance = ctx.get_leader_database_instance_by_database(database.id); - let instance_id = database_instance.unwrap().id; + let replica = ctx.get_leader_replica_by_database(database.id); + let replica_id = replica.unwrap().id; let host = ctx.host_controller(); - host.stop_trace(instance_id).await.map_err(|e| { + host.stop_trace(replica_id).await.map_err(|e| { log::error!("Unable to retrieve tracelog {}", e); - (StatusCode::SERVICE_UNAVAILABLE, "Database instance not ready.") + (StatusCode::SERVICE_UNAVAILABLE, "Replica not ready.") })?; Ok(()) @@ -74,7 +74,7 @@ pub async fn perform_tracelog_replay(body: Bytes) -> axum::response::Result axum::response::Result, - pub database_instance_id: u64, + pub replica_id: u64, pub module: ModuleHost, module_rx: watch::Receiver, } @@ -143,7 +143,7 @@ impl ClientConnection { pub async fn spawn( id: ClientActorId, protocol: Protocol, - database_instance_id: u64, + replica_id: u64, mut module_rx: watch::Receiver, actor: F, ) -> Result @@ -152,8 +152,8 @@ impl ClientConnection { Fut: Future + Send + 'static, { // Add this client as a subscriber - // TODO: Right now this is connecting clients 
directly to an instance, but their requests should be - // logically subscribed to the database, not any particular instance. We should handle failover for + // TODO: Right now this is connecting clients directly to a replica, but their requests should be + // logically subscribed to the database, not any particular replica. We should handle failover for // them and stuff. Not right now though. let module = module_rx.borrow_and_update().clone(); module @@ -184,7 +184,7 @@ impl ClientConnection { }); let this = Self { sender, - database_instance_id, + replica_id, module, module_rx, }; @@ -199,13 +199,13 @@ impl ClientConnection { pub fn dummy( id: ClientActorId, protocol: Protocol, - database_instance_id: u64, + replica_id: u64, mut module_rx: watch::Receiver, ) -> Self { let module = module_rx.borrow_and_update().clone(); Self { sender: Arc::new(ClientConnectionSender::dummy(id, protocol)), - database_instance_id, + replica_id, module, module_rx, } diff --git a/crates/core/src/client/message_handlers.rs b/crates/core/src/client/message_handlers.rs index 5f71f3d0e9..1033c14aa1 100644 --- a/crates/core/src/client/message_handlers.rs +++ b/crates/core/src/client/message_handlers.rs @@ -36,12 +36,12 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst WORKER_METRICS .websocket_request_msg_size - .with_label_values(&client.database_instance_id, message_kind) + .with_label_values(&client.replica_id, message_kind) .observe(message.len() as f64); WORKER_METRICS .websocket_requests - .with_label_values(&client.database_instance_id, message_kind) + .with_label_values(&client.replica_id, message_kind) .inc(); let message = match message { diff --git a/crates/core/src/database_logger.rs b/crates/core/src/database_logger.rs index c06e8c2919..8e22664025 100644 --- a/crates/core/src/database_logger.rs +++ b/crates/core/src/database_logger.rs @@ -126,10 +126,10 @@ impl DatabaseLogger { // PathBuf::from(path) // } - pub fn filepath(address: &Address, 
instance_id: u64) -> PathBuf { - let root = crate::stdb_path("worker_node/database_instances"); + pub fn filepath(address: &Address, replica_id: u64) -> PathBuf { + let root = crate::stdb_path("worker_node/replicas"); root.join(&*address.to_hex()) - .join(instance_id.to_string()) + .join(replica_id.to_string()) .join("module_logs") } diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 639942046f..874f8af231 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1258,15 +1258,15 @@ pub async fn local_durability(db_path: &Path) -> io::Result<(Arc Result, Box> { let snapshot_dir = db_path.join("snapshots"); std::fs::create_dir_all(&snapshot_dir).map_err(|e| Box::new(SnapshotError::from(e)))?; - SnapshotRepository::open(snapshot_dir, database_address, database_instance_id) + SnapshotRepository::open(snapshot_dir, database_address, replica_id) .map(Arc::new) .map_err(Box::new) } diff --git a/crates/core/src/energy.rs b/crates/core/src/energy.rs index 2f272e3347..9fe3fd82f5 100644 --- a/crates/core/src/energy.rs +++ b/crates/core/src/energy.rs @@ -20,7 +20,7 @@ pub trait EnergyMonitor: Send + Sync + 'static { energy_used: EnergyQuanta, execution_duration: Duration, ); - fn record_disk_usage(&self, database: &Database, instance_id: u64, disk_usage: u64, period: Duration); + fn record_disk_usage(&self, database: &Database, replica_id: u64, disk_usage: u64, period: Duration); } #[derive(Default)] @@ -39,5 +39,5 @@ impl EnergyMonitor for NullEnergyMonitor { ) { } - fn record_disk_usage(&self, _database: &Database, _instance_id: u64, _disk_usage: u64, _period: Duration) {} + fn record_disk_usage(&self, _database: &Database, _replica_id: u64, _disk_usage: u64, _period: Duration) {} } diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 7d8a653547..b65b0e4b23 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -132,7 +132,7 @@ pub enum PlanError { 
#[derive(Error, Debug)] pub enum DatabaseError { - #[error("Database instance not found: {0}")] + #[error("Replica not found: {0}")] NotFound(u64), #[error("Database is already opened. Path:`{0}`. Error:{1}")] DatabasedOpened(PathBuf, anyhow::Error), diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index f6a7ffae1b..692beda21a 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1,7 +1,6 @@ use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule}; use super::scheduler::SchedulerStarter; use super::{Scheduler, UpdateDatabaseResult}; -use crate::database_instance_context::DatabaseInstanceContext; use crate::database_logger::DatabaseLogger; use crate::db::datastore::traits::Program; use crate::db::db_metrics::DB_METRICS; @@ -9,6 +8,7 @@ use crate::db::relational_db::{self, RelationalDB}; use crate::energy::{EnergyMonitor, EnergyQuanta}; use crate::messages::control_db::{Database, HostType}; use crate::module_host_context::ModuleCreationContext; +use crate::replica_context::ReplicaContext; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::util::spawn_rayon; use crate::{db, host}; @@ -64,12 +64,12 @@ pub struct HostController { /// Map of all hosts managed by this controller, /// keyed by database instance id. hosts: Hosts, - /// The directory to create database instances in. + /// The directory to create replicas in. /// /// For example: /// - /// - `$STDB_PATH/worker_node/database_instances` - /// - `$STDB_PATH/database_instances` + /// - `$STDB_PATH/worker_node/replicas` + /// - `$STDB_PATH/replicas` root_dir: Arc, /// The default configuration to use for databases created by this /// controller. 
@@ -205,8 +205,8 @@ impl HostController { /// /// See also: [`Self::get_module_host`] #[tracing::instrument(skip_all)] - pub async fn get_or_launch_module_host(&self, database: Database, instance_id: u64) -> anyhow::Result { - let mut rx = self.watch_maybe_launch_module_host(database, instance_id).await?; + pub async fn get_or_launch_module_host(&self, database: Database, replica_id: u64) -> anyhow::Result { + let mut rx = self.watch_maybe_launch_module_host(database, replica_id).await?; let module = rx.borrow_and_update(); Ok(module.clone()) } @@ -222,13 +222,13 @@ impl HostController { pub async fn watch_maybe_launch_module_host( &self, database: Database, - instance_id: u64, + replica_id: u64, ) -> anyhow::Result> { // Try a read lock first. { - let guard = self.acquire_read_lock(instance_id).await; + let guard = self.acquire_read_lock(replica_id).await; if let Some(host) = &*guard { - trace!("cached host {}/{}", database.address, instance_id); + trace!("cached host {}/{}", database.address, replica_id); return Ok(host.module.subscribe()); } } @@ -236,14 +236,14 @@ impl HostController { // We didn't find a running module, so take a write lock. // Since [`tokio::sync::RwLock`] doesn't support upgrading of read locks, // we'll need to check again if a module was added meanwhile. 
- let mut guard = self.acquire_write_lock(instance_id).await; + let mut guard = self.acquire_write_lock(replica_id).await; if let Some(host) = &*guard { - trace!("cached host {}/{} (lock upgrade)", database.address, instance_id); + trace!("cached host {}/{} (lock upgrade)", database.address, replica_id); return Ok(host.module.subscribe()); } - trace!("launch host {}/{}", database.address, instance_id); - let host = self.try_init_host(database, instance_id).await?; + trace!("launch host {}/{}", database.address, replica_id); + let host = self.try_init_host(database, replica_id).await?; let rx = host.module.subscribe(); *guard = Some(host); @@ -257,15 +257,15 @@ impl HostController { /// If the computation `F` panics, the host is removed from this controller, /// releasing its resources. #[tracing::instrument(skip_all)] - pub async fn using_database(&self, database: Database, instance_id: u64, f: F) -> anyhow::Result + pub async fn using_database(&self, database: Database, replica_id: u64, f: F) -> anyhow::Result where F: FnOnce(&RelationalDB) -> T + Send + 'static, T: Send + 'static, { - trace!("using database {}/{}", database.address, instance_id); - let module = self.get_or_launch_module_host(database, instance_id).await?; - let on_panic = self.unregister_fn(instance_id); - let result = tokio::task::spawn_blocking(move || f(&module.dbic().relational_db)) + trace!("using database {}/{}", database.address, replica_id); + let module = self.get_or_launch_module_host(database, replica_id).await?; + let on_panic = self.unregister_fn(replica_id); + let result = tokio::task::spawn_blocking(move || f(&module.replica_ctx().relational_db)) .await .unwrap_or_else(|e| { warn!("database operation panicked"); @@ -275,7 +275,7 @@ impl HostController { Ok(result) } - /// Update the [`ModuleHost`] identified by `instance_id` to the given + /// Update the [`ModuleHost`] identified by `replica_id` to the given /// program. 
/// /// The host may not be running, in which case it is spawned (see @@ -288,7 +288,7 @@ impl HostController { &self, database: Database, host_type: HostType, - instance_id: u64, + replica_id: u64, program_bytes: Box<[u8]>, ) -> anyhow::Result { let program = Program { @@ -298,16 +298,16 @@ impl HostController { trace!( "update module host {}/{}: genesis={} update-to={}", database.address, - instance_id, + replica_id, database.initial_program, program.hash ); - let mut guard = self.acquire_write_lock(instance_id).await; + let mut guard = self.acquire_write_lock(replica_id).await; let mut host = match guard.take() { None => { trace!("host not running, try_init"); - self.try_init_host(database, instance_id).await? + self.try_init_host(database, replica_id).await? } Some(host) => { trace!("host found, updating"); @@ -315,14 +315,14 @@ impl HostController { } }; let update_result = host - .update_module(host_type, program, self.unregister_fn(instance_id)) + .update_module(host_type, program, self.unregister_fn(replica_id)) .await?; *guard = Some(host); Ok(update_result) } - /// Start the host `instance_id` and conditionally update it. + /// Start the host `replica_id` and conditionally update it. 
/// /// If the host was not initialized before, it is initialized with the /// program [`Database::initial_program`], which is loaded from the @@ -344,19 +344,19 @@ impl HostController { pub async fn init_maybe_update_module_host( &self, database: Database, - instance_id: u64, + replica_id: u64, expected_hash: Option, ) -> anyhow::Result> { - trace!("custom bootstrap {}/{}", database.address, instance_id); + trace!("custom bootstrap {}/{}", database.address, replica_id); let db_addr = database.address; let host_type = database.host_type; let program_hash = database.initial_program; - let mut guard = self.acquire_write_lock(instance_id).await; + let mut guard = self.acquire_write_lock(replica_id).await; let mut host = match guard.take() { Some(host) => host, - None => self.try_init_host(database, instance_id).await?, + None => self.try_init_host(database, replica_id).await?, }; let module = host.module.subscribe(); @@ -386,7 +386,7 @@ impl HostController { ); let program = load_program(&self.program_storage, program_hash).await?; let update_result = host - .update_module(host_type, program, self.unregister_fn(instance_id)) + .update_module(host_type, program, self.unregister_fn(replica_id)) .await?; match update_result { UpdateDatabaseResult::NoUpdateNeeded | UpdateDatabaseResult::UpdatePerformed => { @@ -404,12 +404,12 @@ impl HostController { Ok(module) } - /// Release all resources of the [`ModuleHost`] identified by `instance_id`, + /// Release all resources of the [`ModuleHost`] identified by `replica_id`, /// and deregister it from the controller. 
#[tracing::instrument(skip_all)] - pub async fn exit_module_host(&self, instance_id: u64) -> Result<(), anyhow::Error> { - trace!("exit module host {}", instance_id); - let lock = self.hosts.lock().remove(&instance_id); + pub async fn exit_module_host(&self, replica_id: u64) -> Result<(), anyhow::Error> { + trace!("exit module host {}", replica_id); + let lock = self.hosts.lock().remove(&replica_id); if let Some(lock) = lock { if let Some(host) = lock.write_owned().await.take() { let module = host.module.borrow().clone(); @@ -420,73 +420,73 @@ impl HostController { Ok(()) } - /// Get the [`ModuleHost`] identified by `instance_id` or return an error + /// Get the [`ModuleHost`] identified by `replica_id` or return an error /// if it is not registered with the controller. /// /// See [`Self::get_or_launch_module_host`] for a variant which launches /// the host if it is not running. #[tracing::instrument(skip_all)] - pub async fn get_module_host(&self, instance_id: u64) -> Result { - trace!("get module host {}", instance_id); - let guard = self.acquire_read_lock(instance_id).await; + pub async fn get_module_host(&self, replica_id: u64) -> Result { + trace!("get module host {}", replica_id); + let guard = self.acquire_read_lock(replica_id).await; guard .as_ref() .map(|Host { module, .. }| module.borrow().clone()) .ok_or(NoSuchModule) } - /// Subscribe to updates of the [`ModuleHost`] identified by `instance_id`, + /// Subscribe to updates of the [`ModuleHost`] identified by `replica_id`, /// or return an error if it is not registered with the controller. /// /// See [`Self::watch_maybe_launch_module_host`] for a variant which /// launches the host if it is not running. 
#[tracing::instrument(skip_all)] - pub async fn watch_module_host(&self, instance_id: u64) -> Result, NoSuchModule> { - trace!("watch module host {}", instance_id); - let guard = self.acquire_read_lock(instance_id).await; + pub async fn watch_module_host(&self, replica_id: u64) -> Result, NoSuchModule> { + trace!("watch module host {}", replica_id); + let guard = self.acquire_read_lock(replica_id).await; guard .as_ref() .map(|Host { module, .. }| module.subscribe()) .ok_or(NoSuchModule) } - /// `true` if the module host `instance_id` is currently registered with + /// `true` if the module host `replica_id` is currently registered with /// the controller. - pub async fn has_module_host(&self, instance_id: u64) -> bool { - self.acquire_read_lock(instance_id).await.is_some() + pub async fn has_module_host(&self, replica_id: u64) -> bool { + self.acquire_read_lock(replica_id).await.is_some() } /// On-panic callback passed to [`ModuleHost`]s created by this controller. /// - /// Removes the module with the given `instance_id` from this controller. - fn unregister_fn(&self, instance_id: u64) -> impl Fn() + Send + Sync + 'static { + /// Removes the module with the given `replica_id` from this controller. 
+ fn unregister_fn(&self, replica_id: u64) -> impl Fn() + Send + Sync + 'static { let hosts = Arc::downgrade(&self.hosts); move || { if let Some(hosts) = hosts.upgrade() { - hosts.lock().remove(&instance_id); + hosts.lock().remove(&replica_id); } } } - async fn acquire_write_lock(&self, instance_id: u64) -> OwnedRwLockWriteGuard> { - let lock = self.hosts.lock().entry(instance_id).or_default().clone(); + async fn acquire_write_lock(&self, replica_id: u64) -> OwnedRwLockWriteGuard> { + let lock = self.hosts.lock().entry(replica_id).or_default().clone(); lock.write_owned().await } - async fn acquire_read_lock(&self, instance_id: u64) -> OwnedRwLockReadGuard> { - let lock = self.hosts.lock().entry(instance_id).or_default().clone(); + async fn acquire_read_lock(&self, replica_id: u64) -> OwnedRwLockReadGuard> { + let lock = self.hosts.lock().entry(replica_id).or_default().clone(); lock.read_owned().await } - async fn try_init_host(&self, database: Database, instance_id: u64) -> anyhow::Result { + async fn try_init_host(&self, database: Database, replica_id: u64) -> anyhow::Result { Host::try_init( &self.root_dir, self.default_config, database, - instance_id, + replica_id, self.program_storage.clone(), self.energy_monitor.clone(), - self.unregister_fn(instance_id), + self.unregister_fn(replica_id), ) .await } @@ -497,18 +497,18 @@ fn stored_program_hash(db: &RelationalDB) -> anyhow::Result> { Ok(meta.map(|meta| meta.program_hash)) } -async fn make_dbic( +async fn make_replica_ctx( database: Database, - instance_id: u64, + replica_id: u64, relational_db: Arc, -) -> anyhow::Result { - let log_path = DatabaseLogger::filepath(&database.address, instance_id); +) -> anyhow::Result { + let log_path = DatabaseLogger::filepath(&database.address, replica_id); let logger = tokio::task::block_in_place(|| Arc::new(DatabaseLogger::open(log_path))); let subscriptions = ModuleSubscriptions::new(relational_db.clone(), database.owner_identity); - Ok(DatabaseInstanceContext { + 
Ok(ReplicaContext { database, - database_instance_id: instance_id, + replica_id, logger, relational_db, subscriptions, @@ -516,10 +516,10 @@ async fn make_dbic( } /// Initialize a module host for the given program. -/// The passed dbic may not be configured for this version of the program's database schema yet. +/// The passed replica_ctx may not be configured for this version of the program's database schema yet. async fn make_module_host( host_type: HostType, - dbic: Arc, + replica_ctx: Arc, scheduler: Scheduler, program: Program, energy_monitor: Arc, @@ -529,7 +529,7 @@ async fn make_module_host( let module_host = match host_type { HostType::Wasm => { let mcc = ModuleCreationContext { - dbic, + replica_ctx, scheduler, program: &program, energy_monitor, @@ -554,7 +554,7 @@ async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result, + replica_ctx: Arc, module_host: ModuleHost, scheduler: Scheduler, scheduler_starter: SchedulerStarter, @@ -562,7 +562,7 @@ struct LaunchedModule { async fn launch_module( database: Database, - instance_id: u64, + replica_id: u64, program: Program, on_panic: impl Fn() + Send + Sync + 'static, relational_db: Arc, @@ -571,11 +571,13 @@ async fn launch_module( let address = database.address; let host_type = database.host_type; - let dbic = make_dbic(database, instance_id, relational_db).await.map(Arc::new)?; - let (scheduler, scheduler_starter) = Scheduler::open(dbic.relational_db.clone()); + let replica_ctx = make_replica_ctx(database, replica_id, relational_db) + .await + .map(Arc::new)?; + let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone()); let (program, module_host) = make_module_host( host_type, - dbic.clone(), + replica_ctx.clone(), scheduler.clone(), program, energy_monitor.clone(), @@ -588,7 +590,7 @@ async fn launch_module( Ok(( program, LaunchedModule { - dbic, + replica_ctx, module_host, scheduler, scheduler_starter, @@ -637,17 +639,17 @@ struct Host { /// clients may 
subscribe to the channel, so they get the most recent /// [`ModuleHost`] version or an error if the [`Host`] was dropped. module: watch::Sender, - /// Pointer to the `module`'s [`DatabaseInstanceContext`]. + /// Pointer to the `module`'s [`ReplicaContext`]. /// /// The database stays the same if and when the module is updated via /// [`Host::update_module`]. - dbic: Arc, + replica_ctx: Arc, /// Scheduler for repeating reducers, operating on the current `module`. scheduler: Scheduler, /// Handle to the metrics collection task started via [`disk_monitor`]. /// - /// The task collects metrics from the `dbic`, and so stays alive as long - /// as the `dbic` is live. The task is aborted when [`Host`] is dropped. + /// The task collects metrics from the `replica_ctx`, and so stays alive as long + /// as the `replica_ctx` is live. The task is aborted when [`Host`] is dropped. metrics_task: AbortHandle, /// [`EnergyMonitor`] to use for [`Host::update_module`]. @@ -664,13 +666,13 @@ impl Host { root_dir: &Path, config: db::Config, database: Database, - instance_id: u64, + replica_id: u64, program_storage: ProgramStorage, energy_monitor: Arc, on_panic: impl Fn() + Send + Sync + 'static, ) -> anyhow::Result { let mut db_path = root_dir.to_path_buf(); - db_path.extend([&*database.address.to_hex(), &*instance_id.to_string()]); + db_path.extend([&*database.address.to_hex(), &*replica_id.to_string()]); db_path.push("database"); let (db, connected_clients) = match config.storage { @@ -684,7 +686,7 @@ impl Host { )?, db::Storage::Disk => { let (durability, disk_size_fn) = relational_db::local_durability(&db_path).await?; - let snapshot_repo = relational_db::open_snapshot_repo(&db_path, database.address, instance_id)?; + let snapshot_repo = relational_db::open_snapshot_repo(&db_path, database.address, replica_id)?; let history = durability.clone(); RelationalDB::open( &db_path, @@ -697,7 +699,7 @@ impl Host { } }; let LaunchedModule { - dbic, + replica_ctx, module_host, scheduler, 
scheduler_starter, @@ -706,7 +708,7 @@ impl Host { Some(program) => { let (_, launched) = launch_module( database, - instance_id, + replica_id, program, on_panic, Arc::new(db), @@ -722,7 +724,7 @@ impl Host { let program = load_program(&program_storage, database.initial_program).await?; let (program, launched) = launch_module( database, - instance_id, + replica_id, program, on_panic, Arc::new(db), @@ -747,17 +749,17 @@ impl Host { .with_context(|| { format!( "Error calling disconnect for {} {} on {}", - identity, address, dbic.address + identity, address, replica_ctx.address ) })?; } scheduler_starter.start(&module_host)?; - let metrics_task = tokio::spawn(disk_monitor(dbic.clone(), energy_monitor.clone())).abort_handle(); + let metrics_task = tokio::spawn(disk_monitor(replica_ctx.clone(), energy_monitor.clone())).abort_handle(); Ok(Host { module: watch::Sender::new(module_host), - dbic, + replica_ctx, scheduler, metrics_task, @@ -768,7 +770,7 @@ impl Host { /// Attempt to replace this [`Host`]'s [`ModuleHost`] with a new one running /// the program `program_hash`. /// - /// The associated [`DatabaseInstanceContext`] stays the same. + /// The associated [`ReplicaContext`] stays the same. /// /// Executes [`ModuleHost::update_database`] on the newly instantiated /// module, updating the database schema and invoking the `__update__` @@ -783,12 +785,12 @@ impl Host { program: Program, on_panic: impl Fn() + Send + Sync + 'static, ) -> anyhow::Result { - let dbic = &self.dbic; + let replica_ctx = &self.replica_ctx; let (scheduler, scheduler_starter) = self.scheduler.new_with_same_db(); let (program, module) = make_module_host( host_type, - dbic.clone(), + replica_ctx.clone(), scheduler.clone(), program, self.energy_monitor.clone(), @@ -799,7 +801,7 @@ impl Host { // Get the old module info to diff against when building a migration plan. 
let old_module_info = self.module.borrow().info.clone(); - let update_result = update_module(&dbic.relational_db, &module, program, old_module_info).await?; + let update_result = update_module(&replica_ctx.relational_db, &module, program, old_module_info).await?; trace!("update result: {update_result:?}"); // Only replace the module + scheduler if the update succeeded. // Otherwise, we want the database to continue running with the old state. @@ -814,7 +816,7 @@ impl Host { } fn db(&self) -> &RelationalDB { - &self.dbic.relational_db + &self.replica_ctx.relational_db } } @@ -826,34 +828,34 @@ impl Drop for Host { const DISK_METERING_INTERVAL: Duration = Duration::from_secs(5); -/// Periodically collect the disk usage of `dbic` and update metrics as well as +/// Periodically collect the disk usage of `replica_ctx` and update metrics as well as /// the `energy_monitor` accordingly. -async fn disk_monitor(dbic: Arc, energy_monitor: Arc) { +async fn disk_monitor(replica_ctx: Arc, energy_monitor: Arc) { let mut interval = tokio::time::interval(DISK_METERING_INTERVAL); // We don't care about happening precisely every 5 seconds - it just matters // that the time between ticks is accurate. 
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - let mut prev_disk_usage = dbic.total_disk_usage(); + let mut prev_disk_usage = replica_ctx.total_disk_usage(); let mut prev_tick = interval.tick().await; loop { let tick = interval.tick().await; let dt = tick - prev_tick; - let disk_usage = tokio::task::block_in_place(|| dbic.total_disk_usage()); + let disk_usage = tokio::task::block_in_place(|| replica_ctx.total_disk_usage()); if let Some(num_bytes) = disk_usage.durability { DB_METRICS .message_log_size - .with_label_values(&dbic.address) + .with_label_values(&replica_ctx.address) .set(num_bytes as i64); } if let Some(num_bytes) = disk_usage.logs { DB_METRICS .module_log_file_size - .with_label_values(&dbic.address) + .with_label_values(&replica_ctx.address) .set(num_bytes as i64); } let disk_usage = disk_usage.or(prev_disk_usage); - energy_monitor.record_disk_usage(&dbic.database, dbic.database_instance_id, disk_usage.sum(), dt); + energy_monitor.record_disk_usage(&replica_ctx.database, replica_ctx.replica_id, disk_usage.sum(), dt); prev_disk_usage = disk_usage; prev_tick = tick; } diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 1158bb61d5..38435ad5a6 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -1,9 +1,9 @@ use super::scheduler::{get_schedule_from_row, ScheduleError, Scheduler}; -use crate::database_instance_context::DatabaseInstanceContext; use crate::database_logger::{BacktraceProvider, LogLevel, Record}; use crate::db::datastore::locking_tx_datastore::MutTxId; use crate::error::{IndexError, NodesError}; use crate::execution_context::ExecutionContext; +use crate::replica_context::ReplicaContext; use parking_lot::{Mutex, MutexGuard}; use smallvec::SmallVec; use spacetimedb_primitives::{ColId, IndexId, TableId}; @@ -14,7 +14,7 @@ use std::sync::Arc; #[derive(Clone)] pub struct InstanceEnv { - pub dbic: Arc, + pub replica_ctx: Arc, pub 
scheduler: Scheduler, pub tx: TxSlot, } @@ -73,9 +73,9 @@ impl ChunkedWriter { // Generic 'instance environment' delegated to from various host types. impl InstanceEnv { - pub fn new(dbic: Arc, scheduler: Scheduler) -> Self { + pub fn new(replica_ctx: Arc, scheduler: Scheduler) -> Self { Self { - dbic, + replica_ctx, scheduler, tx: TxSlot::default(), } @@ -91,8 +91,12 @@ impl InstanceEnv { #[tracing::instrument(skip_all)] pub fn console_log(&self, level: LogLevel, record: &Record, bt: &dyn BacktraceProvider) { - self.dbic.logger.write(level, record, bt); - log::trace!("MOD({}): {}", self.dbic.address.to_abbreviated_hex(), record.message); + self.replica_ctx.logger.write(level, record, bt); + log::trace!( + "MOD({}): {}", + self.replica_ctx.address.to_abbreviated_hex(), + record.message + ); } pub fn insert( @@ -101,7 +105,7 @@ impl InstanceEnv { table_id: TableId, buffer: &[u8], ) -> Result { - let stdb = &*self.dbic.relational_db; + let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.get_tx()?; let (gen_cols, row_ptr) = stdb @@ -149,7 +153,7 @@ impl InstanceEnv { col_id: ColId, value: &[u8], ) -> Result { - let stdb = &*self.dbic.relational_db; + let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.get_tx()?; // Interpret the `value` using the schema of the column. @@ -176,7 +180,7 @@ impl InstanceEnv { rstart: &[u8], rend: &[u8], ) -> Result { - let stdb = &*self.dbic.relational_db; + let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.tx.get()?; // Find all rows in the table to delete. @@ -198,7 +202,7 @@ impl InstanceEnv { /// - a row couldn't be decoded to the table schema type. #[tracing::instrument(skip(self, relation))] pub fn datastore_delete_all_by_eq_bsatn(&self, table_id: TableId, relation: &[u8]) -> Result { - let stdb = &*self.dbic.relational_db; + let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.get_tx()?; // Find the row schema using it to decode a vector of product values. 
@@ -217,7 +221,7 @@ impl InstanceEnv { /// and `TableNotFound` if the table does not exist. #[tracing::instrument(skip_all)] pub fn table_id_from_name(&self, table_name: &str) -> Result { - let stdb = &*self.dbic.relational_db; + let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.get_tx()?; // Query the table id from the name. @@ -231,7 +235,7 @@ impl InstanceEnv { /// and `IndexNotFound` if the index does not exist. #[tracing::instrument(skip_all)] pub fn index_id_from_name(&self, index_name: &str) -> Result { - let stdb = &*self.dbic.relational_db; + let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.get_tx()?; // Query the index id from the name. @@ -245,7 +249,7 @@ impl InstanceEnv { /// and `TableNotFound` if the table does not exist. #[tracing::instrument(skip_all)] pub fn datastore_table_row_count(&self, table_id: TableId) -> Result { - let stdb = &*self.dbic.relational_db; + let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.get_tx()?; // Query the row count for id. @@ -266,7 +270,7 @@ impl InstanceEnv { col_id: ColId, value: &[u8], ) -> Result>, NodesError> { - let stdb = &*self.dbic.relational_db; + let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.get_tx()?; // Interpret the `value` using the schema of the column. 
@@ -283,7 +287,7 @@ impl InstanceEnv { ctx: &ExecutionContext, table_id: TableId, ) -> Result>, NodesError> { - let stdb = &*self.dbic.relational_db; + let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.tx.get()?; let chunks = ChunkedWriter::collect_iter(stdb.iter_mut(ctx, tx, table_id)?); @@ -299,7 +303,7 @@ impl InstanceEnv { rstart: &[u8], rend: &[u8], ) -> Result>, NodesError> { - let stdb = &*self.dbic.relational_db; + let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.tx.get()?; let (_, iter) = stdb.btree_scan(tx, index_id, prefix, prefix_elems, rstart, rend)?; diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 10b3bd2582..76d21056b6 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1,7 +1,6 @@ use super::wasm_common::{CLIENT_CONNECTED_DUNDER, CLIENT_DISCONNECTED_DUNDER}; use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId}; use crate::client::{ClientActorId, ClientConnectionSender}; -use crate::database_instance_context::DatabaseInstanceContext; use crate::database_logger::{LogLevel, Record}; use crate::db::datastore::locking_tx_datastore::MutTxId; use crate::db::datastore::system_tables::{StClientFields, StClientRow, ST_CLIENT_ID}; @@ -12,6 +11,7 @@ use crate::execution_context::{ExecutionContext, ReducerContext}; use crate::hash::Hash; use crate::identity::Identity; use crate::messages::control_db::Database; +use crate::replica_context::ReplicaContext; use crate::sql; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::util::lending_pool::{Closed, LendingPool, LentResource, PoolClosed}; @@ -274,7 +274,7 @@ pub trait Module: Send + Sync + 'static { fn initial_instances(&mut self) -> Self::InitialInstances<'_>; fn info(&self) -> Arc; fn create_instance(&self) -> Self::Instance; - fn dbic(&self) -> &DatabaseInstanceContext; + fn replica_ctx(&self) -> &ReplicaContext; fn 
close(self); #[cfg(feature = "tracelogging")] fn get_trace(&self) -> Option; @@ -285,7 +285,7 @@ pub trait Module: Send + Sync + 'static { pub trait ModuleInstance: Send + 'static { fn trapped(&self) -> bool; - /// If the module instance's dbic is uninitialized, initialize it. + /// If the module instance's replica_ctx is uninitialized, initialize it. fn init_database(&mut self, program: Program) -> anyhow::Result>; /// Update the module instance's database to match the schema of the module instance. @@ -369,7 +369,7 @@ impl fmt::Debug for ModuleHost { #[async_trait::async_trait] trait DynModuleHost: Send + Sync + 'static { async fn get_instance(&self, db: Address) -> Result, NoSuchModule>; - fn dbic(&self) -> &DatabaseInstanceContext; + fn replica_ctx(&self) -> &ReplicaContext; fn exit(&self) -> Closed<'_>; fn exited(&self) -> Closed<'_>; } @@ -428,8 +428,8 @@ impl DynModuleHost for HostControllerActor { })) } - fn dbic(&self) -> &DatabaseInstanceContext { - self.module.dbic() + fn replica_ctx(&self) -> &ReplicaContext { + self.module.replica_ctx() } fn exit(&self) -> Closed<'_> { @@ -566,7 +566,7 @@ impl ModuleHost { CLIENT_DISCONNECTED_DUNDER }; - let db = &self.inner.dbic().relational_db; + let db = &self.inner.replica_ctx().relational_db; let ctx = || { ExecutionContext::reducer( db.address(), @@ -637,7 +637,7 @@ impl ModuleHost { caller_address: Address, connected: bool, ) -> Result<(), DBError> { - let db = &*self.inner.dbic().relational_db; + let db = &*self.inner.replica_ctx().relational_db; let ctx = &ExecutionContext::internal(db.address()); let row = &StClientRow { identity: caller_identity.into(), @@ -757,7 +757,7 @@ impl ModuleHost { &self, call_reducer_params: impl FnOnce(&MutTxId) -> anyhow::Result> + Send + 'static, ) -> Result { - let db = self.inner.dbic().relational_db.clone(); + let db = self.inner.replica_ctx().relational_db.clone(); // scheduled reducer name not fetched yet, anyway this is only for logging purpose const REDUCER: &str = 
"scheduled_reducer"; self.call(REDUCER, move |inst: &mut dyn ModuleInstance| { @@ -808,7 +808,7 @@ impl ModuleHost { } pub fn inject_logs(&self, log_level: LogLevel, message: &str) { - self.dbic().logger.write( + self.replica_ctx().logger.write( log_level, &Record { ts: chrono::Utc::now(), @@ -823,9 +823,9 @@ impl ModuleHost { #[tracing::instrument(skip_all)] pub fn one_off_query(&self, caller_identity: Identity, query: String) -> Result, anyhow::Error> { - let dbic = self.dbic(); - let db = &dbic.relational_db; - let auth = AuthCtx::new(dbic.owner_identity, caller_identity); + let replica_ctx = self.replica_ctx(); + let db = &replica_ctx.relational_db; + let auth = AuthCtx::new(replica_ctx.owner_identity, caller_identity); log::debug!("One-off query: {query}"); let ctx = &ExecutionContext::sql(db.address()); db.with_read_only(ctx, |tx| { @@ -839,7 +839,7 @@ impl ModuleHost { /// for tables without primary keys. It is only used in the benchmarks. /// Note: this doesn't drop the table, it just clears it! 
pub fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error> { - let db = &*self.dbic().relational_db; + let db = &*self.replica_ctx().relational_db; db.with_auto_commit(&ExecutionContext::internal(db.address()), |tx| { let tables = db.get_all_tables_mut(tx)?; // We currently have unique table names, @@ -863,11 +863,11 @@ impl ModuleHost { } pub fn database_info(&self) -> &Database { - &self.dbic().database + &self.replica_ctx().database } - pub(crate) fn dbic(&self) -> &DatabaseInstanceContext { - self.inner.dbic() + pub(crate) fn replica_ctx(&self) -> &ReplicaContext { + self.inner.replica_ctx() } } diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 95d1e6011a..41f9f0b7d6 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -274,7 +274,7 @@ impl SchedulerActor { let Some(module_host) = self.module_host.upgrade() else { return; }; - let db = module_host.dbic().relational_db.clone(); + let db = module_host.replica_ctx().relational_db.clone(); let ctx = ExecutionContext::internal(db.address()); let caller_identity = module_host.info().identity; let module_info = module_host.info.clone(); @@ -332,7 +332,7 @@ impl SchedulerActor { })) }; - let db = module_host.dbic().relational_db.clone(); + let db = module_host.replica_ctx().relational_db.clone(); let module_host_clone = module_host.clone(); let ctx = ExecutionContext::internal(db.address()); diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index a0a7d64a86..9f22d7a42e 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -13,7 +13,6 @@ use spacetimedb_lib::buffer::DecodeError; use spacetimedb_lib::{bsatn, Address, RawModuleDef}; use super::instrumentation::CallTimes; -use crate::database_instance_context::DatabaseInstanceContext; use crate::database_logger::SystemLogger; use 
crate::db::datastore::locking_tx_datastore::MutTxId; use crate::db::datastore::system_tables::{StClientRow, ST_CLIENT_ID}; @@ -28,6 +27,7 @@ use crate::host::{ArgsTuple, ReducerCallResult, ReducerId, ReducerOutcome, Sched use crate::identity::Identity; use crate::messages::control_db::HostType; use crate::module_host_context::ModuleCreationContext; +use crate::replica_context::ReplicaContext; use crate::subscription::module_subscription_actor::WriteConflict; use crate::util::const_unwrap; use crate::util::prometheus_handle::HistogramExt; @@ -82,7 +82,7 @@ pub struct ExecuteResult { pub(crate) struct WasmModuleHostActor { module: T::InstancePre, initial_instance: Option>>, - database_instance_context: Arc, + replica_context: Arc, scheduler: Scheduler, func_names: Arc, info: Arc, @@ -130,7 +130,7 @@ pub enum DescribeError { impl WasmModuleHostActor { pub fn new(mcc: ModuleCreationContext, module: T) -> Result { let ModuleCreationContext { - dbic: database_instance_context, + replica_ctx: replica_context, scheduler, program, energy_monitor, @@ -138,10 +138,10 @@ impl WasmModuleHostActor { let module_hash = program.hash; log::trace!( "Making new module host actor for database {} with module {}", - database_instance_context.address, + replica_context.address, module_hash, ); - let log_tx = database_instance_context.logger.tx.clone(); + let log_tx = replica_context.logger.tx.clone(); FuncNames::check_required(|name| module.get_export(name))?; let mut func_names = FuncNames::default(); @@ -150,7 +150,7 @@ impl WasmModuleHostActor { let uninit_instance = module.instantiate_pre()?; let mut instance = uninit_instance.instantiate( - InstanceEnv::new(database_instance_context.clone(), scheduler.clone()), + InstanceEnv::new(replica_context.clone(), scheduler.clone()), &func_names, )?; @@ -163,11 +163,11 @@ impl WasmModuleHostActor { // Note: assigns Reducer IDs based on the alphabetical order of reducer names. 
let info = ModuleInfo::new( def, - database_instance_context.owner_identity, - database_instance_context.address, + replica_context.owner_identity, + replica_context.address, module_hash, log_tx, - database_instance_context.subscriptions.clone(), + replica_context.subscriptions.clone(), ); let func_names = Arc::new(func_names); @@ -176,7 +176,7 @@ impl WasmModuleHostActor { initial_instance: None, func_names, info, - database_instance_context, + replica_context, scheduler, energy_monitor, }; @@ -211,7 +211,7 @@ impl Module for WasmModuleHostActor { } fn create_instance(&self) -> Self::Instance { - let env = InstanceEnv::new(self.database_instance_context.clone(), self.scheduler.clone()); + let env = InstanceEnv::new(self.replica_context.clone(), self.scheduler.clone()); // this shouldn't fail, since we already called module.create_instance() // before and it didn't error, and ideally they should be deterministic let mut instance = self @@ -222,8 +222,8 @@ impl Module for WasmModuleHostActor { self.make_from_instance(instance) } - fn dbic(&self) -> &DatabaseInstanceContext { - &self.database_instance_context + fn replica_ctx(&self) -> &ReplicaContext { + &self.replica_context } fn close(self) { @@ -247,8 +247,8 @@ impl std::fmt::Debug for WasmModuleInstance { } impl WasmModuleInstance { - fn database_instance_context(&self) -> &DatabaseInstanceContext { - &self.instance.instance_env().dbic + fn replica_context(&self) -> &ReplicaContext { + &self.instance.instance_env().replica_ctx } } @@ -257,11 +257,11 @@ impl ModuleInstance for WasmModuleInstance { self.trapped } - #[tracing::instrument(skip_all, fields(db_id = self.instance.instance_env().dbic.id))] + #[tracing::instrument(skip_all, fields(db_id = self.instance.instance_env().replica_ctx.id))] fn init_database(&mut self, program: Program) -> anyhow::Result> { log::debug!("init database"); let timestamp = Timestamp::now(); - let stdb = &*self.database_instance_context().relational_db; + let stdb = 
&*self.replica_context().relational_db; let ctx = ExecutionContext::internal(stdb.address()); let tx = stdb.begin_mut_tx(IsolationLevel::Serializable); let (tx, ()) = stdb @@ -291,7 +291,7 @@ impl ModuleInstance for WasmModuleInstance { Some(reducer_id) => { self.system_logger().info("Invoking `init` reducer"); - let caller_identity = self.database_instance_context().database.owner_identity; + let caller_identity = self.replica_context().database.owner_identity; let client = None; Some(self.call_reducer_with_tx( Some(tx), @@ -327,7 +327,7 @@ impl ModuleInstance for WasmModuleInstance { return Ok(UpdateDatabaseResult::AutoMigrateError(errs)); } }; - let stdb = &*self.database_instance_context().relational_db; + let stdb = &*self.replica_context().relational_db; let ctx = Lazy::new(|| ExecutionContext::internal(stdb.address())); let program_hash = program.hash; @@ -389,9 +389,9 @@ impl WasmModuleInstance { } = params; let caller_address_opt = (caller_address != Address::__DUMMY).then_some(caller_address); - let dbic = self.database_instance_context(); - let stdb = &*dbic.relational_db.clone(); - let address = dbic.address; + let replica_ctx = self.replica_context(); + let stdb = &*replica_ctx.relational_db.clone(); + let address = replica_ctx.address; let reducer_name = self .info .reducers_map @@ -538,11 +538,11 @@ impl WasmModuleInstance { // Helpers - NOT API fn system_logger(&self) -> &SystemLogger { - self.database_instance_context().logger.system_logger() + self.replica_context().logger.system_logger() } fn insert_st_client(&self, tx: &mut MutTxId, identity: Identity, address: Address) -> Result<(), DBError> { - let db = &*self.database_instance_context().relational_db; + let db = &*self.replica_context().relational_db; let row = &StClientRow { identity: identity.into(), address: address.into(), diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 3ad43d4376..132b602389 100644 --- 
a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -1040,7 +1040,7 @@ impl WasmInstanceEnv { message: &message, }; - // Write the log record to the `DatabaseLogger` in the replica context (replica_ctx). env.instance_env .console_log((level as u8).into(), &record, &caller.as_context()); Ok(()) } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index d08d7e5d69..267fb63ed7 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -31,12 +31,12 @@ pub use spacetimedb_sats::hash; pub mod callgrind_flag; pub mod client; pub mod config; -pub mod database_instance_context; pub mod database_logger; pub mod estimation; pub mod execution_context; pub mod host; pub mod module_host_context; +pub mod replica_context; pub mod sendgrid_controller; pub mod startup; pub mod subscription; diff --git a/crates/core/src/messages/control_db.rs b/crates/core/src/messages/control_db.rs index 7d0a04f226..4525432af1 100644 --- a/crates/core/src/messages/control_db.rs +++ b/crates/core/src/messages/control_db.rs @@ -45,14 +45,14 @@ pub struct DatabaseStatus { pub state: String, } #[derive(Clone, PartialEq, Serialize, Deserialize)] -pub struct DatabaseInstance { +pub struct Replica { pub id: u64, pub database_id: u64, pub node_id: u64, pub leader: bool, } #[derive(Clone, PartialEq, Serialize, Deserialize)] -pub struct DatabaseInstanceStatus { +pub struct ReplicaStatus { pub state: String, } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/crates/core/src/messages/control_worker_api.rs b/crates/core/src/messages/control_worker_api.rs index f77695b4e7..17c5d8bc33 100644 --- a/crates/core/src/messages/control_worker_api.rs +++ b/crates/core/src/messages/control_worker_api.rs @@ -19,7 +19,7 @@ pub enum ControlBoundMessage { } #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct 
ScheduleState { - pub database_instances: Vec, + pub replicas: Vec, pub databases: Vec, pub nodes: Vec, } @@ -31,19 +31,19 @@ pub enum ScheduleUpdate { } #[derive(Clone, PartialEq, Serialize, Deserialize)] pub enum InsertOperation { - DatabaseInstance(DatabaseInstance), + Replica(Replica), Database(Database), Node(Node), } #[derive(Clone, PartialEq, Serialize, Deserialize)] pub enum UpdateOperation { - DatabaseInstance(DatabaseInstance), + Replica(Replica), Database(Database), Node(Node), } #[derive(Clone, PartialEq, Serialize, Deserialize)] pub enum DeleteOperation { - DatabaseInstanceId(u64), + ReplicaId(u64), DatabaseId(u64), NodeId(u64), } diff --git a/crates/core/src/messages/worker_db.rs b/crates/core/src/messages/worker_db.rs index eab9d7055d..698957894f 100644 --- a/crates/core/src/messages/worker_db.rs +++ b/crates/core/src/messages/worker_db.rs @@ -2,7 +2,7 @@ use spacetimedb_sats::de::Deserialize; use spacetimedb_sats::ser::Serialize; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct DatabaseInstanceState { - pub database_instance_id: u64, +pub struct ReplicaState { + pub replica_id: u64, pub initialized: bool, } diff --git a/crates/core/src/module_host_context.rs b/crates/core/src/module_host_context.rs index 7ffc08ff81..15055b190a 100644 --- a/crates/core/src/module_host_context.rs +++ b/crates/core/src/module_host_context.rs @@ -1,11 +1,11 @@ -use crate::database_instance_context::DatabaseInstanceContext; use crate::db::datastore::traits::Program; use crate::energy::EnergyMonitor; use crate::host::scheduler::Scheduler; +use crate::replica_context::ReplicaContext; use std::sync::Arc; pub struct ModuleCreationContext<'a> { - pub dbic: Arc, + pub replica_ctx: Arc, pub scheduler: Scheduler, pub program: &'a Program, pub energy_monitor: Arc, diff --git a/crates/core/src/database_instance_context.rs b/crates/core/src/replica_context.rs similarity index 92% rename from crates/core/src/database_instance_context.rs rename to 
crates/core/src/replica_context.rs index c3ce095107..a502ae6493 100644 --- a/crates/core/src/database_instance_context.rs +++ b/crates/core/src/replica_context.rs @@ -12,18 +12,18 @@ pub type Result = anyhow::Result; /// A "live" database. #[derive(Clone)] -pub struct DatabaseInstanceContext { +pub struct ReplicaContext { pub database: Database, - pub database_instance_id: u64, + pub replica_id: u64, pub logger: Arc, pub subscriptions: ModuleSubscriptions, pub relational_db: Arc, } -impl DatabaseInstanceContext { +impl ReplicaContext { pub fn scheduler_db_path(&self, root_db_path: PathBuf) -> PathBuf { let mut scheduler_db_path = root_db_path; - scheduler_db_path.extend([&*self.address.to_hex(), &*self.database_instance_id.to_string()]); + scheduler_db_path.extend([&*self.address.to_hex(), &*self.replica_id.to_string()]); scheduler_db_path.push("scheduler"); scheduler_db_path } @@ -51,7 +51,7 @@ impl DatabaseInstanceContext { } } -impl Deref for DatabaseInstanceContext { +impl Deref for ReplicaContext { type Target = Database; fn deref(&self) -> &Self::Target { diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 5e33931511..6b4ff3f034 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -14,12 +14,12 @@ metrics_group!( #[name = spacetime_websocket_requests_total] #[help = "The cumulative number of websocket request messages"] - #[labels(instance_id: u64, protocol: str)] + #[labels(replica_id: u64, protocol: str)] pub websocket_requests: IntCounterVec, #[name = spacetime_websocket_request_msg_size] #[help = "The size of messages received on connected sessions"] - #[labels(instance_id: u64, protocol: str)] + #[labels(replica_id: u64, protocol: str)] pub websocket_request_msg_size: HistogramVec, #[name = spacetime_websocket_sent_msg_size_bytes] diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index 44fa27b5da..325c2ce2ac 100644 --- a/crates/snapshot/src/lib.rs 
+++ b/crates/snapshot/src/lib.rs @@ -163,7 +163,7 @@ pub struct Snapshot { /// The address of the snapshotted database. pub database_address: Address, /// The instance ID of the snapshotted database. - pub database_instance_id: u64, + pub replica_id: u64, /// ABI version of the module from which this snapshot was created, as [MAJOR, MINOR]. /// @@ -470,7 +470,7 @@ pub struct SnapshotRepository { database_address: Address, /// The database instance ID of the database instance for which this repository stores snapshots. - database_instance_id: u64, + replica_id: u64, // TODO(deduplication): track the most recent successful snapshot // (possibly in a file) // and hardlink its objects into the next snapshot for deduplication. @@ -563,7 +563,7 @@ impl SnapshotRepository { magic: MAGIC, version: CURRENT_SNAPSHOT_VERSION, database_address: self.database_address, - database_instance_id: self.database_instance_id, + replica_id: self.replica_id, module_abi_version: CURRENT_MODULE_ABI_VERSION, tx_offset, blobs: vec![], @@ -678,7 +678,7 @@ impl SnapshotRepository { Ok(ReconstructedSnapshot { database_address: snapshot.database_address, - database_instance_id: snapshot.database_instance_id, + replica_id: snapshot.replica_id, tx_offset: snapshot.tx_offset, module_abi_version: snapshot.module_abi_version, blob_store, @@ -690,14 +690,14 @@ impl SnapshotRepository { /// /// Calls [`Path::is_dir`] and requires that the result is `true`. /// See that method for more detailed preconditions on this function. - pub fn open(root: PathBuf, database_address: Address, database_instance_id: u64) -> Result { + pub fn open(root: PathBuf, database_address: Address, replica_id: u64) -> Result { if !root.is_dir() { return Err(SnapshotError::NotDirectory { root }); } Ok(Self { root, database_address, - database_instance_id, + replica_id, }) } @@ -778,7 +778,7 @@ pub struct ReconstructedSnapshot { /// The address of the snapshotted database. 
pub database_address: Address, /// The instance ID of the snapshotted database. - pub database_instance_id: u64, + pub replica_id: u64, /// The transaction offset of the state this snapshot reflects. pub tx_offset: TxOffset, /// ABI version of the module from which this snapshot was created, as [MAJOR, MINOR]. diff --git a/crates/standalone/src/control_db.rs b/crates/standalone/src/control_db.rs index c6877dc0ca..6f608e3dc7 100644 --- a/crates/standalone/src/control_db.rs +++ b/crates/standalone/src/control_db.rs @@ -1,7 +1,7 @@ use spacetimedb::address::Address; use spacetimedb::hash::hash_bytes; use spacetimedb::identity::Identity; -use spacetimedb::messages::control_db::{Database, DatabaseInstance, EnergyBalance, IdentityEmail, Node}; +use spacetimedb::messages::control_db::{Database, EnergyBalance, IdentityEmail, Node, Replica}; use spacetimedb::{energy, stdb_path}; use spacetimedb_client_api_messages::name::{ @@ -368,35 +368,35 @@ impl ControlDb { Ok(None) } - pub fn get_database_instances(&self) -> Result> { - let tree = self.db.open_tree("database_instance")?; - let mut database_instances = Vec::new(); + pub fn get_replicas(&self) -> Result> { + let tree = self.db.open_tree("replica")?; + let mut replicas = Vec::new(); let scan_key: &[u8] = b""; for result in tree.range(scan_key..) { let (_key, value) = result?; - let database_instance = bsatn::from_slice(&value[..]).unwrap(); - database_instances.push(database_instance); + let replica = bsatn::from_slice(&value[..]).unwrap(); + replicas.push(replica); } - Ok(database_instances) + Ok(replicas) } - pub fn get_database_instance_by_id(&self, database_instance_id: u64) -> Result> { - for di in self.get_database_instances()? { - if di.id == database_instance_id { + pub fn get_replica_by_id(&self, replica_id: u64) -> Result> { + for di in self.get_replicas()? 
{ + if di.id == replica_id { return Ok(Some(di)); } } Ok(None) } - pub fn get_leader_database_instance_by_database(&self, database_id: u64) -> Option { - self.get_database_instances() + pub fn get_leader_replica_by_database(&self, database_id: u64) -> Option { + self.get_replicas() .unwrap() .into_iter() .find(|instance| instance.database_id == database_id && instance.leader) } - pub fn get_database_instances_by_database(&self, database_id: u64) -> Result> { + pub fn get_replicas_by_database(&self, database_id: u64) -> Result> { // TODO: because we don't have foreign key constraints it's actually possible to have // instances in here with no database. Although we'd be in a bit of a corrupted state // in that case @@ -406,30 +406,30 @@ impl ControlDb { // return Err(anyhow::anyhow!("No such database.")); // } // - let database_instances = self - .get_database_instances()? + let replicas = self + .get_replicas()? .iter() .filter(|instance| instance.database_id == database_id) .cloned() .collect::>(); - Ok(database_instances) + Ok(replicas) } - pub fn insert_database_instance(&self, mut database_instance: DatabaseInstance) -> Result { - let tree = self.db.open_tree("database_instance")?; + pub fn insert_replica(&self, mut replica: Replica) -> Result { + let tree = self.db.open_tree("replica")?; let id = self.db.generate_id()?; - database_instance.id = id; - let buf = bsatn::to_vec(&database_instance).unwrap(); + replica.id = id; + let buf = bsatn::to_vec(&replica).unwrap(); tree.insert(id.to_be_bytes(), buf)?; Ok(id) } - pub fn delete_database_instance(&self, id: u64) -> Result<()> { - let tree = self.db.open_tree("database_instance")?; + pub fn delete_replica(&self, id: u64) -> Result<()> { + let tree = self.db.open_tree("replica")?; tree.remove(id.to_be_bytes())?; Ok(()) } diff --git a/crates/standalone/src/control_db/tests.rs b/crates/standalone/src/control_db/tests.rs index b36ce0c8a8..ebeb45fe0d 100644 --- a/crates/standalone/src/control_db/tests.rs +++ 
b/crates/standalone/src/control_db/tests.rs @@ -109,17 +109,17 @@ fn test_decode() -> ResultTest<()> { assert_eq!(dbs.len(), 1); assert_eq!(dbs[0].owner_identity, id); - let mut new_database_instance = DatabaseInstance { + let mut new_replica = Replica { id: 0, database_id: 1, node_id: 0, leader: true, }; - let id = cdb.insert_database_instance(new_database_instance.clone())?; - new_database_instance.id = id; + let id = cdb.insert_replica(new_replica.clone())?; + new_replica.id = id; - let dbs = cdb.get_database_instances()?; + let dbs = cdb.get_replicas()?; assert_eq!(dbs.len(), 1); assert_eq!(dbs[0].id, id); diff --git a/crates/standalone/src/energy_monitor.rs b/crates/standalone/src/energy_monitor.rs index f82318cf1f..74ed2e4d6a 100644 --- a/crates/standalone/src/energy_monitor.rs +++ b/crates/standalone/src/energy_monitor.rs @@ -36,7 +36,7 @@ impl EnergyMonitor for StandaloneEnergyMonitor { self.withdraw_energy(fingerprint.module_identity, energy_used) } - fn record_disk_usage(&self, database: &Database, _instance_id: u64, disk_usage: u64, period: Duration) { + fn record_disk_usage(&self, database: &Database, _replica_id: u64, disk_usage: u64, period: Duration) { let amount = EnergyQuanta::from_disk_usage(disk_usage, period); self.withdraw_energy(database.owner_identity, amount) } diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index f3014333df..8be943c446 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -21,7 +21,7 @@ use spacetimedb::db::{db_metrics::DB_METRICS, Config}; use spacetimedb::energy::{EnergyBalance, EnergyQuanta}; use spacetimedb::host::{DiskStorage, HostController, UpdateDatabaseResult}; use spacetimedb::identity::Identity; -use spacetimedb::messages::control_db::{Database, DatabaseInstance, IdentityEmail, Node}; +use spacetimedb::messages::control_db::{Database, IdentityEmail, Node, Replica}; use spacetimedb::sendgrid_controller::SendGridController; use spacetimedb::stdb_path; use 
spacetimedb::worker_metrics::WORKER_METRICS; @@ -52,7 +52,7 @@ impl StandaloneEnv { let program_store = Arc::new(DiskStorage::new(stdb_path("control_node/program_bytes")).await?); let host_controller = HostController::new( - stdb_path("worker_node/database_instances").into(), + stdb_path("worker_node/replicas").into(), config, program_store.clone(), energy_monitor, @@ -215,17 +215,17 @@ impl spacetimedb_client_api::ControlStateReadAccess for StandaloneEnv { Ok(self.control_db.get_databases()?) } - // Database instances - fn get_database_instance_by_id(&self, id: u64) -> anyhow::Result> { - Ok(self.control_db.get_database_instance_by_id(id)?) + // Replicas + fn get_replica_by_id(&self, id: u64) -> anyhow::Result> { + Ok(self.control_db.get_replica_by_id(id)?) } - fn get_database_instances(&self) -> anyhow::Result> { - Ok(self.control_db.get_database_instances()?) + fn get_replicas(&self) -> anyhow::Result> { + Ok(self.control_db.get_replicas()?) } - fn get_leader_database_instance_by_database(&self, database_id: u64) -> Option { - self.control_db.get_leader_database_instance_by_database(database_id) + fn get_leader_replica_by_database(&self, database_id: u64) -> Option { + self.control_db.get_leader_replica_by_database(database_id) } // Identities @@ -302,7 +302,7 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { let leader = self .control_db - .get_leader_database_instance_by_database(database_id) + .get_leader_replica_by_database(database_id) .with_context(|| format!("Not found: leader instance for database `{}`", database_addr))?; let update_result = self .host_controller @@ -310,23 +310,23 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { .await?; if update_result.was_successful() { - let instances = self.control_db.get_database_instances_by_database(database_id)?; - let desired_instances = spec.num_replicas as usize; - if desired_instances == 0 { - log::info!("Decommissioning all instances of database {}", 
database_addr); - for instance in instances { - self.delete_database_instance(instance.id).await?; + let replicas = self.control_db.get_replicas_by_database(database_id)?; + let desired_replicas = spec.num_replicas as usize; + if desired_replicas == 0 { + log::info!("Decommissioning all replicas of database {}", database_addr); + for instance in replicas { + self.delete_replica(instance.id).await?; } - } else if desired_instances > instances.len() { - let n = desired_instances - instances.len(); + } else if desired_replicas > replicas.len() { + let n = desired_replicas - replicas.len(); log::info!( - "Scaling up database {} from {} to {} instances", + "Scaling up database {} from {} to {} replicas", database_addr, - instances.len(), + replicas.len(), n ); for _ in 0..n { - self.insert_database_instance(DatabaseInstance { + self.insert_replica(Replica { id: 0, database_id, node_id: 0, @@ -334,21 +334,21 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { }) .await?; } - } else if desired_instances < instances.len() { - let n = instances.len() - desired_instances; + } else if desired_replicas < replicas.len() { + let n = replicas.len() - desired_replicas; log::info!( - "Scaling down database {} from {} to {} instances", + "Scaling down database {} from {} to {} replicas", database_addr, - instances.len(), + replicas.len(), n ); - for instance in instances.into_iter().filter(|instance| !instance.leader).take(n) { - self.delete_database_instance(instance.id).await?; + for instance in replicas.into_iter().filter(|instance| !instance.leader).take(n) { + self.delete_replica(instance.id).await?; } } else { log::debug!( "Desired replica count {} for database {} already satisfied", - desired_instances, + desired_replicas, database_addr ); } @@ -373,8 +373,8 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { ); self.control_db.delete_database(database.id)?; - for instance in 
self.control_db.get_database_instances_by_database(database.id)? { - self.delete_database_instance(instance.id).await?; + for instance in self.control_db.get_replicas_by_database(database.id)? { + self.delete_replica(instance.id).await?; } Ok(()) @@ -427,19 +427,19 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { } impl StandaloneEnv { - async fn insert_database_instance(&self, database_instance: DatabaseInstance) -> Result<(), anyhow::Error> { - let mut new_database_instance = database_instance.clone(); - let id = self.control_db.insert_database_instance(database_instance)?; - new_database_instance.id = id; + async fn insert_replica(&self, replica: Replica) -> Result<(), anyhow::Error> { + let mut new_replica = replica.clone(); + let id = self.control_db.insert_replica(replica)?; + new_replica.id = id; - self.on_insert_database_instance(&new_database_instance).await?; + self.on_insert_replica(&new_replica).await?; Ok(()) } - async fn delete_database_instance(&self, database_instance_id: u64) -> Result<(), anyhow::Error> { - self.control_db.delete_database_instance(database_instance_id)?; - self.on_delete_database_instance(database_instance_id).await?; + async fn delete_replica(&self, replica_id: u64) -> Result<(), anyhow::Error> { + self.control_db.delete_replica(replica_id)?; + self.on_delete_replica(replica_id).await?; Ok(()) } @@ -447,19 +447,19 @@ impl StandaloneEnv { async fn schedule_replicas(&self, database_id: u64, num_replicas: u32) -> Result<(), anyhow::Error> { // Just scheduling a bunch of replicas to the only machine for i in 0..num_replicas { - let database_instance = DatabaseInstance { + let replica = Replica { id: 0, database_id, node_id: 0, leader: i == 0, }; - self.insert_database_instance(database_instance).await?; + self.insert_replica(replica).await?; } Ok(()) } - async fn on_insert_database_instance(&self, instance: &DatabaseInstance) -> Result<(), anyhow::Error> { + async fn on_insert_replica(&self, instance: 
&Replica) -> Result<(), anyhow::Error> { if instance.leader { let database = self .control_db @@ -479,12 +479,12 @@ impl StandaloneEnv { Ok(()) } - async fn on_delete_database_instance(&self, instance_id: u64) -> anyhow::Result<()> { + async fn on_delete_replica(&self, replica_id: u64) -> anyhow::Result<()> { // TODO(cloutiertyler): We should think about how to clean up - // database instances which have been deleted. This will just drop + // replicas which have been deleted. This will just drop // them from memory, but will not remove them from disk. We need // some kind of database lifecycle manager long term. - self.host_controller.exit_module_host(instance_id).await?; + self.host_controller.exit_module_host(replica_id).await?; Ok(()) } diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index f9ca99f493..e0adf66e83 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -75,7 +75,7 @@ impl ModuleHandle { } pub async fn read_log(&self, size: Option) -> String { - let filepath = DatabaseLogger::filepath(&self.db_address, self.client.database_instance_id); + let filepath = DatabaseLogger::filepath(&self.db_address, self.client.replica_id); DatabaseLogger::read_latest(&filepath, size).await } } @@ -172,7 +172,7 @@ impl CompiledModule { .unwrap(); let database = env.get_database_by_address(&db_address).unwrap().unwrap(); - let instance = env.get_leader_database_instance_by_database(database.id).unwrap(); + let instance = env.get_leader_replica_by_database(database.id).unwrap(); let client_id = ClientActorId { identity,