From b4b5adf0162081b4f1eff6ed260b18427897bcbf Mon Sep 17 00:00:00 2001 From: George Kulakowski Date: Tue, 8 Aug 2023 14:36:36 -0700 Subject: [PATCH 01/13] Remove unused datastore traits We previously anticipated factoring the database in such a way that the typed vs untyped distinctions, and the transactional vs not, semantics would be more visible. We have not used these distinctions and no longer need to expose these traits. --- crates/core/src/db/datastore/traits.rs | 108 ------------------------- crates/core/src/db/relational_db.rs | 2 +- 2 files changed, 1 insertion(+), 109 deletions(-) diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs index fe87b40390..c5e97bdadd 100644 --- a/crates/core/src/db/datastore/traits.rs +++ b/crates/core/src/db/datastore/traits.rs @@ -316,24 +316,10 @@ pub struct TxData { pub(crate) records: Vec, } -pub trait Blob { - fn view(&self) -> &[u8]; -} - pub trait Data: Into { fn view(&self) -> &ProductValue; } -pub trait BlobRow: Send + Sync { - type TableId: Copy; - type RowId: Copy; - - type Blob: Blob; - type BlobRef: Clone; - - fn blob_to_owned(&self, blob_ref: Self::BlobRef) -> Self::Blob; -} - pub trait DataRow: Send + Sync { type RowId: Copy; @@ -358,100 +344,6 @@ pub trait MutTx { fn commit_mut_tx(&self, tx: Self::MutTxId) -> Result>; } -pub trait Blobstore: BlobRow { - type Iter<'a>: Iterator - where - Self: 'a; - - fn iter_blobs(&self, table_id: TableId) -> Result>; - - fn get_blob(&self, table_id: TableId, row_id: Self::RowId) -> Result>; -} - -pub trait MutBlobstore: Blobstore { - fn delete_blob(&self, table_id: TableId, row_id: Self::RowId) -> Result<()>; - - fn insert_blob(&self, table_id: TableId, row: &[u8]) -> Result; -} - -pub trait Datastore: DataRow { - type Iter<'a>: Iterator - where - Self: 'a; - - type IterByColRange<'a, R: RangeBounds>: Iterator - where - Self: 'a; - - type IterByColEq<'a>: Iterator - where - Self: 'a; - - fn iter(&self, table_id: TableId) -> Result>; - - fn iter_by_col_range>( - &self, - table_id: TableId, - col_id: ColId, - range: R, - ) -> Result>; - - fn iter_by_col_eq<'a>( - &'a self, - table_id: TableId, - col_id: ColId, - value: &'a AlgebraicValue, - ) -> Result>; - - fn get(&self, table_id: TableId, row_id: Self::RowId) -> Result>; -} - -pub trait MutDatastore: Datastore { - fn delete(&self, table_id: TableId, row_id: Self::RowId) -> Result<()>; - - fn insert(&self, table_id: TableId, row: ProductValue) -> Result; -} - -pub trait TxBlobstore: BlobRow + Tx { - type Iter<'a>: Iterator - where - Self: 'a; - - fn iter_blobs_tx<'a>(&'a self, tx: &'a Self::TxId, table_id: TableId) -> Result>; - - fn get_blob_tx<'a>( - &'a self, - tx: &'a Self::TxId, - table_id: TableId, - row_id: Self::RowId, - ) -> Result>; -} - -pub trait MutTxBlobstore: TxBlobstore + MutTx { - fn iter_blobs_mut_tx<'a>(&'a self, tx: &'a Self::MutTxId, table_id: TableId) -> Result>; - - fn get_blob_mut_tx<'a>( - &'a self, - tx: &'a Self::MutTxId, - table_id: TableId, - row_id: Self::RowId, - ) -> Result>; - - fn delete_blob_mut_tx<'a>( - &'a self, - tx: &'a mut Self::MutTxId, - table_id: TableId, - row_id: Self::RowId, - ) -> Result<()>; - - fn insert_blob_mut_tx<'a>( - &'a self, - tx: &'a mut Self::MutTxId, - table_id: TableId, - row: &[u8], - ) -> Result; -} - pub trait TxDatastore: DataRow + Tx { type Iter<'a>: Iterator where diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 630cdbadf4..80db98e74a 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -100,7 +100,7 @@ impl RelationalDB { last_commit_offset = Some(commit.commit_offset); for transaction in commit.transactions { transaction_offset += 1; - // NOTE: Although I am creating a blobstore transaction in a + // NOTE: Although I am creating a datastore transaction in a // one to one fashion for each message log transaction, this // is just to reduce memory usage while inserting. We don't // really care about inserting these transactionally as long From eef0089b7869b6f1e0d43dcbabe960ac447f7619 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Alejandro=20Montoya=20Corte=CC=81s?= Date: Tue, 8 Aug 2023 17:40:19 -0500 Subject: [PATCH 02/13] Fix recovery of sequences after restart --- .../db/datastore/locking_tx_datastore/mod.rs | 31 +++++++++++++++---- .../locking_tx_datastore/sequence.rs | 6 +++- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs index d6e65bc46f..9055647bd6 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs @@ -345,6 +345,7 @@ impl Inner { // If any columns are auto incrementing, we need to create a sequence // NOTE: This code with the `seq_start` is particularly fragile. + // TODO: If we exceed `SEQUENCE_PREALLOCATION_AMOUNT` we will get a unique violation if col.is_autoinc { let (seq_start, seq_id): (i128, SequenceId) = match TableId(schema.table_id) { ST_TABLES_ID => (4, TABLE_ID_SEQUENCE_ID), // The database is bootstrapped with 4 tables @@ -399,10 +400,26 @@ impl Inner { let rows = st_sequences.scan_rows().cloned().collect::>(); for row in rows { let sequence = StSequenceRow::try_from(&row)?; + // TODO: The system tables have initialized their value already, but this is wrong: + // If we exceed `SEQUENCE_PREALLOCATION_AMOUNT` we will get a unique violation + let is_system_table = self + .committed_state + .tables + .get(&TableId(sequence.table_id)) + .map(|x| x.schema.table_type == StTableType::System) + .unwrap_or(false); + let schema = (&sequence).into(); + + let mut seq = Sequence::new(schema); + //Now we need to recover the last allocation value... + if !is_system_table && seq.value < sequence.allocated + 1 { + seq.value = sequence.allocated + 1; + } + self.sequence_state .sequences - .insert(SequenceId(sequence.sequence_id), Sequence::new(schema)); + .insert(SequenceId(sequence.sequence_id), seq); } Ok(()) } @@ -483,9 +500,12 @@ impl Inner { return Err(SequenceError::NotFound(seq_id).into()); }; - // If there are allocated sequence values, return the new value. + // If there are allocated sequence values, return the new value, if it is not bigger than + // the upper range of `sequence.allocated` if let Some(value) = sequence.gen_next_value() { - return Ok(value); + if value < sequence.allocated() { + return Ok(value); + } } } // Allocate new sequence values @@ -506,8 +526,7 @@ impl Inner { }; let old_seq_row_id = RowId(old_seq_row.to_data_key()); let mut seq_row = StSequenceRow::try_from(&old_seq_row)?; - let num_to_allocate = 1024; - seq_row.allocated = sequence.nth_value(num_to_allocate); + seq_row.allocated = sequence.nth_value(SEQUENCE_PREALLOCATION_AMOUNT as usize); sequence.set_allocation(seq_row.allocated); (seq_row, old_seq_row_id) }; @@ -540,7 +559,7 @@ impl Inner { sequence_name: seq.sequence_name.as_str(), table_id: seq.table_id, col_id: seq.col_id, - allocated: 0, + allocated: seq.start.unwrap_or(1), increment: seq.increment, start: seq.start.unwrap_or(1), min_value: seq.min_value.unwrap_or(1), diff --git a/crates/core/src/db/datastore/locking_tx_datastore/sequence.rs b/crates/core/src/db/datastore/locking_tx_datastore/sequence.rs index 89f231d577..f048e26150 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/sequence.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/sequence.rs @@ -2,7 +2,7 @@ use crate::db::datastore::traits::SequenceSchema; pub struct Sequence { schema: SequenceSchema, - value: i128, + pub(crate) value: i128, } impl Sequence { @@ -46,6 +46,10 @@ impl Sequence { Some(value) } + pub fn allocated(&self) -> i128 { + self.schema.allocated + } + pub fn next_value(&self) -> i128 { self.nth_value(1) } From 9bac5ebe8d187052e6af06277fe161dfc689de71 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Thu, 10 Aug 2023 14:40:24 +0200 Subject: [PATCH 03/13] Debug --- crates/client-api/src/routes/database.rs | 3 +++ crates/client-api/src/util.rs | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index a9904de229..0666431cd3 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -354,10 +354,13 @@ pub async fn info( State(worker_ctx): State>, Path(InfoParams { name_or_address }): Path, ) -> axum::response::Result { + log::trace!("Trying to resolve address: {:?}", name_or_address); let address = name_or_address.resolve(&*worker_ctx).await?; + log::trace!("Resolved address to: {address:?}"); let database = worker_ctx_find_database(&*worker_ctx, &address) .await? .ok_or((StatusCode::NOT_FOUND, "No such database."))?; + log::trace!("Fetched database from the worker db for address: {address:?}"); let host_type = match database.host_type { HostType::Wasmer => "wasmer", diff --git a/crates/client-api/src/util.rs b/crates/client-api/src/util.rs index 1aa0bc7fbd..36c760dab3 100644 --- a/crates/client-api/src/util.rs +++ b/crates/client-api/src/util.rs @@ -60,7 +60,7 @@ impl headers::Header for XForwardedFor { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum NameOrAddress { Address(Address), Name(String), From df111d44db1fa76bb014149d12748f1a4b655e02 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Fri, 11 Aug 2023 06:23:50 +0200 Subject: [PATCH 04/13] More debug logs --- crates/client-api/src/routes/subscribe.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index baaed161db..d10cb6670f 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -39,8 +39,11 @@ pub async fn handle_websocket( auth: SpacetimeAuthHeader, ws: WebSocketUpgrade, ) -> axum::response::Result { + log::trace!("Handling WebSocket subscription request, getting or creating auth"); let auth = auth.get_or_create(&*worker_ctx).await?; + log::trace!("Auth: {}", auth.identity.to_hex()); + log::trace!("Resolving address: {:?}", name_or_address); let address = name_or_address.resolve(&*worker_ctx).await?; let (res, ws_upgrade, protocol) = @@ -50,6 +53,7 @@ pub async fn handle_websocket( // TODO: Should also maybe refactor the code and the protocol to allow a single websocket // to connect to multiple modules + log::trace!("Resolve address to {address:?}"); let database = worker_ctx .get_database_by_address(&address) .await @@ -61,6 +65,7 @@ pub async fn handle_websocket( .ok_or(StatusCode::BAD_REQUEST)?; let instance_id = database_instance.id; + log::trace!("Got databsae {} and instance {}", database.id, database_instance.id); let identity_token = auth.creds.token().to_owned(); let host = worker_ctx.host_controller(); From 33e68f4f10c6db203062975776ea8305627a0e6d Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Fri, 11 Aug 2023 08:09:25 +0200 Subject: [PATCH 05/13] Export more tyeps --- crates/client-api/src/routes/identity.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/client-api/src/routes/identity.rs b/crates/client-api/src/routes/identity.rs index 0b96744107..4cc7be4e61 100644 --- a/crates/client-api/src/routes/identity.rs +++ b/crates/client-api/src/routes/identity.rs @@ -8,7 +8,7 @@ use spacetimedb::auth::identity::encode_token_with_expiry; use spacetimedb_lib::de::serde::DeserializeWrapper; use spacetimedb_lib::Identity; -use crate::auth::{SpacetimeAuth, SpacetimeAuthHeader}; +pub use crate::auth::{SpacetimeAuth, SpacetimeAuthHeader}; use crate::{log_and_500, ControlCtx, ControlNodeDelegate}; #[derive(Deserialize)] @@ -175,7 +175,7 @@ pub async fn get_databases( #[derive(Debug, Serialize)] pub struct WebsocketTokenResponse { - token: String, + pub token: String, } pub async fn create_websocket_token( From e0c122b8c2f5a2dc0169e95932456782f81eaf9b Mon Sep 17 00:00:00 2001 From: Tyler Cloutier Date: Sun, 13 Aug 2023 19:36:59 -0700 Subject: [PATCH 06/13] More debug info and small changes --- .../core/src/host/wasm_common/module_host_actor.rs | 12 +++++++++--- crates/core/src/messages/worker_db.rs | 2 +- crates/lib/src/address.rs | 8 +++++++- crates/standalone/src/lib.rs | 4 ++-- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 0198ad80ea..2c1602b233 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -144,6 +144,8 @@ impl WasmModuleHostActor { scheduler: Scheduler, energy_monitor: Arc, ) -> Result { + log::trace!("Making new module host actor for database {}", database_instance_context.address); + let trace_log = if database_instance_context.trace_log { Some(Arc::new(Mutex::new(TraceLog::new().unwrap()))) } else { @@ -218,6 +220,7 @@ impl JobRunnerSeed for InstanceSeed { type Runner = WasmInstanceActor; type Job = InstanceMessage; fn make_runner(&self) -> Self::Runner { + log::trace!("Making new runner for database {}", self.worker_database_instance.address); let env = InstanceEnv::new( self.worker_database_instance.clone(), self.scheduler.clone(), @@ -529,10 +532,13 @@ impl WasmInstanceActor { stdb.with_auto_commit::<_, _, anyhow::Error>(|tx| { for table in self.info.catalog.values().filter_map(EntityDef::as_table) { let schema = self.schema_for(table)?; - stdb.create_table(tx, schema) - .with_context(|| format!("failed to create table {}", table.name))?; + let result = stdb.create_table(tx, schema) + .with_context(|| format!("failed to create table {}", table.name)); + if let Err(err) = result { + log::error!("{:?}", err); + return Err(err) + } } - Ok(()) })?; diff --git a/crates/core/src/messages/worker_db.rs b/crates/core/src/messages/worker_db.rs index a7367ef2e3..eab9d7055d 100644 --- a/crates/core/src/messages/worker_db.rs +++ b/crates/core/src/messages/worker_db.rs @@ -1,7 +1,7 @@ use spacetimedb_sats::de::Deserialize; use spacetimedb_sats::ser::Serialize; -#[derive(Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct DatabaseInstanceState { pub database_instance_id: u64, pub initialized: bool, diff --git a/crates/lib/src/address.rs b/crates/lib/src/address.rs index e5fabfea94..b5816deb35 100644 --- a/crates/lib/src/address.rs +++ b/crates/lib/src/address.rs @@ -1,4 +1,4 @@ -use std::net::Ipv6Addr; +use std::{net::Ipv6Addr, fmt::Display}; use anyhow::Context as _; use hex::FromHex as _; @@ -16,6 +16,12 @@ use crate::sats; #[cfg_attr(feature = "serde", derive(serde::Serialize))] pub struct Address(u128); +impl Display for Address { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.to_hex()) + } +} + impl Address { const ABBREVIATION_LEN: usize = 16; diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 7e8c2ae939..1948aab8da 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -472,10 +472,10 @@ impl StandaloneEnv { database_instance_id: instance.id, initialized: false, }; + state.initialized = true; + self.worker_db.upsert_database_instance_state(state.clone()).unwrap(); self.init_module_on_database_instance(instance.database_id, instance.id) .await?; - self.worker_db.upsert_database_instance_state(state.clone()).unwrap(); - state.initialized = true; self.worker_db.upsert_database_instance_state(state).unwrap(); Ok(()) } From c98eae4e05e0322db2ffcd8503ae97f1597574e7 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Mon, 14 Aug 2023 17:48:28 +0200 Subject: [PATCH 07/13] More logs --- crates/core/src/host/host_controller.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 5020a92e78..b737d49b09 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -165,6 +165,7 @@ impl HostController { } pub async fn init_module_host(&self, module_host_context: ModuleHostContext) -> Result { + log::trace!("spawn_module_host in init_module_host, database.instance_id: {}", module_host_context.dbic.database_instance_id); let module_host = self.spawn_module_host(module_host_context).await?; // TODO(cloutiertyler): Hook this up again // let identity = &module_host.info().identity; @@ -187,6 +188,7 @@ impl HostController { &self, module_host_context: ModuleHostContext, ) -> Result { + log::trace!("spawn_module_host in update_module_host, database.instance_id: {}", module_host_context.dbic.database_instance_id); let module_host = self.spawn_module_host(module_host_context).await?; // TODO: see init_module_host let update_result = module_host.update_database().await?; @@ -198,6 +200,7 @@ impl HostController { } pub async fn add_module_host(&self, module_host_context: ModuleHostContext) -> Result { + log::trace!("spawn_module_host in add_module_host, database.instance_id: {}", module_host_context.dbic.database_instance_id); let module_host = self.spawn_module_host(module_host_context).await?; // module_host.init_function(); ?? Ok(module_host) From 073650b729a49925047b6822d19c1a6d1a61c28c Mon Sep 17 00:00:00 2001 From: Tyler Cloutier Date: Mon, 14 Aug 2023 09:58:34 -0700 Subject: [PATCH 08/13] Change waiting on messages from client to be async --- crates/core/src/host/host_controller.rs | 24 ++++++++++++------- crates/core/src/host/module_host.rs | 10 ++++---- .../src/host/wasm_common/module_host_actor.rs | 1 + 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index b737d49b09..2e03169db8 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -8,7 +8,7 @@ use std::collections::HashMap; use std::fmt; use std::ops::Sub; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; use super::module_host::{ Catalog, EntityDef, EventStatus, ModuleHost, ModuleStarter, NoSuchModule, UpdateDatabaseResult, @@ -220,7 +220,7 @@ impl HostController { let key = module_host_context.dbic.database_instance_id; let (module_host, start_module, start_scheduler) = - tokio::task::block_in_place(|| Self::make_module_host(module_host_context, self.energy_monitor.clone()))?; + Self::make_module_host(module_host_context, self.energy_monitor.clone())?; let old_module = self.modules.lock().unwrap().insert(key, module_host.clone()); if let Some(old_module) = old_module { @@ -238,13 +238,19 @@ impl HostController { ) -> anyhow::Result<(ModuleHost, ModuleStarter, SchedulerStarter)> { let module_hash = hash_bytes(&mhc.program_bytes); let (module_host, module_starter) = match mhc.host_type { - HostType::Wasmer => ModuleHost::spawn(wasmer::make_actor( - mhc.dbic, - module_hash, - &mhc.program_bytes, - mhc.scheduler, - energy_monitor, - )?), + HostType::Wasmer => { + // make_actor with block_in_place since it's going to take some time to compute. + let start = Instant::now(); + let actor = tokio::task::block_in_place(|| wasmer::make_actor( + mhc.dbic, + module_hash, + &mhc.program_bytes, + mhc.scheduler, + energy_monitor, + ))?; + log::trace!("wasmer::make_actor blocked for {}us", start.elapsed().as_micros()); + ModuleHost::spawn(actor) + }, }; Ok((module_host, module_starter, mhc.scheduler_starter)) } diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 9e30b2ead1..41cd0c1c41 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -367,15 +367,15 @@ impl ModuleHost { let (tx, rx) = mpsc::channel(8); let (start_tx, start_rx) = oneshot::channel(); let info = actor.info(); - tokio::task::spawn_blocking(|| { - let _ = start_rx.blocking_recv(); - Self::run_actor(rx, actor) + tokio::spawn(async move { + let _ = start_rx.await; + Self::run_actor(rx, actor).await }); (ModuleHost { info, tx }, ModuleStarter { tx: start_tx }) } - fn run_actor(mut rx: mpsc::Receiver, mut actor: impl ModuleHostActor) { - while let Some(command) = rx.blocking_recv() { + async fn run_actor(mut rx: mpsc::Receiver, mut actor: impl ModuleHostActor) { + while let Some(command) = rx.recv().await { match command { CmdOrExit::Cmd(command) => command.dispatch(&mut actor), CmdOrExit::Exit => rx.close(), diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 2c1602b233..9efbf5ecba 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -306,6 +306,7 @@ impl JobPool { } }); } + fn spawn(&self) { self.spawn_from_runner(self.seed().make_runner()) } From 875e095f3b41948c441ba7ab560bf76f809afcd0 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Tue, 22 Aug 2023 22:38:01 +0200 Subject: [PATCH 09/13] Remove some trace lines --- crates/client-api/src/routes/subscribe.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index c811f59b67..85f575fc57 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -39,11 +39,8 @@ pub async fn handle_websocket( auth: SpacetimeAuthHeader, ws: WebSocketUpgrade, ) -> axum::response::Result { - log::trace!("Handling WebSocket subscription request, getting or creating auth"); let auth = auth.get_or_create(&*worker_ctx).await?; - log::trace!("Auth: {}", auth.identity.to_hex()); - log::trace!("Resolving address: {:?}", name_or_address); let address = name_or_address.resolve(&*worker_ctx).await?.into(); let (res, ws_upgrade, protocol) = @@ -53,7 +50,6 @@ pub async fn handle_websocket( // TODO: Should also maybe refactor the code and the protocol to allow a single websocket // to connect to multiple modules - log::trace!("Resolve address to {address:?}"); let database = worker_ctx .get_database_by_address(&address) .await @@ -65,7 +61,6 @@ pub async fn handle_websocket( .ok_or(StatusCode::BAD_REQUEST)?; let instance_id = database_instance.id; - log::trace!("Got databsae {} and instance {}", database.id, database_instance.id); let identity_token = auth.creds.token().to_owned(); let host = worker_ctx.host_controller(); From e9954878978fbcb5eea0797a4a295ebb56086dd1 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Tue, 22 Aug 2023 22:39:13 +0200 Subject: [PATCH 10/13] Revert "Fix recovery of sequences after restart" This reverts commit eef0089b7869b6f1e0d43dcbabe960ac447f7619. --- .../db/datastore/locking_tx_datastore/mod.rs | 31 ++++--------------- .../locking_tx_datastore/sequence.rs | 6 +--- 2 files changed, 7 insertions(+), 30 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs index f02e4736c0..a661097cba 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs @@ -380,7 +380,6 @@ impl Inner { // If any columns are auto incrementing, we need to create a sequence // NOTE: This code with the `seq_start` is particularly fragile. - // TODO: If we exceed `SEQUENCE_PREALLOCATION_AMOUNT` we will get a unique violation if col.is_autoinc { let (seq_start, seq_id): (i128, SequenceId) = match TableId(schema.table_id) { ST_TABLES_ID => (4, TABLE_ID_SEQUENCE_ID), // The database is bootstrapped with 4 tables @@ -435,26 +434,10 @@ impl Inner { let rows = st_sequences.scan_rows().cloned().collect::>(); for row in rows { let sequence = StSequenceRow::try_from(&row)?; - // TODO: The system tables have initialized their value already, but this is wrong: - // If we exceed `SEQUENCE_PREALLOCATION_AMOUNT` we will get a unique violation - let is_system_table = self - .committed_state - .tables - .get(&TableId(sequence.table_id)) - .map(|x| x.schema.table_type == StTableType::System) - .unwrap_or(false); - let schema = (&sequence).into(); - - let mut seq = Sequence::new(schema); - //Now we need to recover the last allocation value... - if !is_system_table && seq.value < sequence.allocated + 1 { - seq.value = sequence.allocated + 1; - } - self.sequence_state .sequences - .insert(SequenceId(sequence.sequence_id), seq); + .insert(SequenceId(sequence.sequence_id), Sequence::new(schema)); } Ok(()) } @@ -535,12 +518,9 @@ impl Inner { return Err(SequenceError::NotFound(seq_id).into()); }; - // If there are allocated sequence values, return the new value, if it is not bigger than - // the upper range of `sequence.allocated` + // If there are allocated sequence values, return the new value. if let Some(value) = sequence.gen_next_value() { - if value < sequence.allocated() { - return Ok(value); - } + return Ok(value); } } // Allocate new sequence values @@ -561,7 +541,8 @@ impl Inner { }; let old_seq_row_id = RowId(old_seq_row.to_data_key()); let mut seq_row = StSequenceRow::try_from(&old_seq_row)?; - seq_row.allocated = sequence.nth_value(SEQUENCE_PREALLOCATION_AMOUNT as usize); + let num_to_allocate = 1024; + seq_row.allocated = sequence.nth_value(num_to_allocate); sequence.set_allocation(seq_row.allocated); (seq_row, old_seq_row_id) }; @@ -594,7 +575,7 @@ impl Inner { sequence_name: seq.sequence_name.as_str(), table_id: seq.table_id, col_id: seq.col_id, - allocated: seq.start.unwrap_or(1), + allocated: 0, increment: seq.increment, start: seq.start.unwrap_or(1), min_value: seq.min_value.unwrap_or(1), diff --git a/crates/core/src/db/datastore/locking_tx_datastore/sequence.rs b/crates/core/src/db/datastore/locking_tx_datastore/sequence.rs index f048e26150..89f231d577 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/sequence.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/sequence.rs @@ -2,7 +2,7 @@ use crate::db::datastore::traits::SequenceSchema; pub struct Sequence { schema: SequenceSchema, - pub(crate) value: i128, + value: i128, } impl Sequence { @@ -46,10 +46,6 @@ impl Sequence { Some(value) } - pub fn allocated(&self) -> i128 { - self.schema.allocated - } - pub fn next_value(&self) -> i128 { self.nth_value(1) } From ea8df14bd806e9865fb964bd0279190a95eb5fbb Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Tue, 22 Aug 2023 22:41:06 +0200 Subject: [PATCH 11/13] Remove more trace lines --- crates/core/src/host/host_controller.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 2e03169db8..d5cdc2b27b 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -165,7 +165,6 @@ impl HostController { } pub async fn init_module_host(&self, module_host_context: ModuleHostContext) -> Result { - log::trace!("spawn_module_host in init_module_host, database.instance_id: {}", module_host_context.dbic.database_instance_id); let module_host = self.spawn_module_host(module_host_context).await?; // TODO(cloutiertyler): Hook this up again // let identity = &module_host.info().identity; @@ -188,7 +187,6 @@ impl HostController { &self, module_host_context: ModuleHostContext, ) -> Result { - log::trace!("spawn_module_host in update_module_host, database.instance_id: {}", module_host_context.dbic.database_instance_id); let module_host = self.spawn_module_host(module_host_context).await?; // TODO: see init_module_host let update_result = module_host.update_database().await?; @@ -200,7 +198,6 @@ impl HostController { } pub async fn add_module_host(&self, module_host_context: ModuleHostContext) -> Result { - log::trace!("spawn_module_host in add_module_host, database.instance_id: {}", module_host_context.dbic.database_instance_id); let module_host = self.spawn_module_host(module_host_context).await?; // module_host.init_function(); ?? Ok(module_host) From 92a7200d6e77ec9a3f3d1d1df5aed266a7925db4 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Tue, 22 Aug 2023 22:42:48 +0200 Subject: [PATCH 12/13] cargo fmt --- crates/core/src/host/host_controller.rs | 12 ++++-------- .../src/host/wasm_common/module_host_actor.rs | 15 +++++++++++---- crates/lib/src/address.rs | 2 +- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index d5cdc2b27b..4caac45ca7 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -238,16 +238,12 @@ impl HostController { HostType::Wasmer => { // make_actor with block_in_place since it's going to take some time to compute. let start = Instant::now(); - let actor = tokio::task::block_in_place(|| wasmer::make_actor( - mhc.dbic, - module_hash, - &mhc.program_bytes, - mhc.scheduler, - energy_monitor, - ))?; + let actor = tokio::task::block_in_place(|| { + wasmer::make_actor(mhc.dbic, module_hash, &mhc.program_bytes, mhc.scheduler, energy_monitor) + })?; log::trace!("wasmer::make_actor blocked for {}us", start.elapsed().as_micros()); ModuleHost::spawn(actor) - }, + } }; Ok((module_host, module_starter, mhc.scheduler_starter)) } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 9efbf5ecba..e4ca6761f8 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -144,7 +144,10 @@ impl WasmModuleHostActor { scheduler: Scheduler, energy_monitor: Arc, ) -> Result { - log::trace!("Making new module host actor for database {}", database_instance_context.address); + log::trace!( + "Making new module host actor for database {}", + database_instance_context.address + ); let trace_log = if database_instance_context.trace_log { Some(Arc::new(Mutex::new(TraceLog::new().unwrap()))) @@ -220,7 +223,10 @@ impl JobRunnerSeed for InstanceSeed { type Runner = WasmInstanceActor; type Job = InstanceMessage; fn make_runner(&self) -> Self::Runner { - log::trace!("Making new runner for database {}", self.worker_database_instance.address); + log::trace!( + "Making new runner for database {}", + self.worker_database_instance.address + ); let env = InstanceEnv::new( self.worker_database_instance.clone(), self.scheduler.clone(), @@ -533,11 +539,12 @@ impl WasmInstanceActor { stdb.with_auto_commit::<_, _, anyhow::Error>(|tx| { for table in self.info.catalog.values().filter_map(EntityDef::as_table) { let schema = self.schema_for(table)?; - let result = stdb.create_table(tx, schema) + let result = stdb + .create_table(tx, schema) .with_context(|| format!("failed to create table {}", table.name)); if let Err(err) = result { log::error!("{:?}", err); - return Err(err) + return Err(err); } } Ok(()) diff --git a/crates/lib/src/address.rs b/crates/lib/src/address.rs index 684d3e02f7..d90a6a149a 100644 --- a/crates/lib/src/address.rs +++ b/crates/lib/src/address.rs @@ -1,4 +1,4 @@ -use std::{net::Ipv6Addr, fmt::Display}; +use std::{fmt::Display, net::Ipv6Addr}; use anyhow::Context as _; use hex::FromHex as _; From a893de33cc0c183890c4b6a975420753642a58b8 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Tue, 22 Aug 2023 22:44:06 +0200 Subject: [PATCH 13/13] clippy --- crates/sdk/examples/cursive-chat/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/sdk/examples/cursive-chat/main.rs b/crates/sdk/examples/cursive-chat/main.rs index 7c0e214f40..0936fbc4bb 100644 --- a/crates/sdk/examples/cursive-chat/main.rs +++ b/crates/sdk/examples/cursive-chat/main.rs @@ -127,7 +127,7 @@ fn register_callbacks(send: UiSend) { on_set_name(on_name_set(send.clone())); // When we fail to send a message, print a warning. - on_send_message(on_message_sent(send.clone())); + on_send_message(on_message_sent(send)); } // ## Save credentials to a file