diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs index 9d3c9168377..e4efde3bce8 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs @@ -556,26 +556,24 @@ impl Inner { Ok(()) } - fn drop_table_from_st_tables(&mut self, table_id: TableId) -> super::Result<()> { - const ST_TABLES_TABLE_ID_COL: ColId = ColId(0); - let rows = self.iter_by_col_eq(&ST_TABLES_ID, ST_TABLES_TABLE_ID_COL, table_id.into())?; - let rows = rows.map(|row| row.view().to_owned()).collect::>(); - if rows.is_empty() { + fn drop_col_eq(&mut self, table_id: TableId, col_id: ColId, value: AlgebraicValue) -> super::Result<()> { + let rows = self.iter_by_col_eq(&table_id, col_id, value)?; + let ids_to_delete = rows.map(|row| RowId(*row.id())).collect::>(); + if ids_to_delete.is_empty() { return Err(TableError::IdNotFound(table_id.0).into()); } - self.delete_by_rel(&table_id, rows)?; + self.delete(&table_id, ids_to_delete); Ok(()) } + fn drop_table_from_st_tables(&mut self, table_id: TableId) -> super::Result<()> { + const ST_TABLES_TABLE_ID_COL: ColId = ColId(0); + self.drop_col_eq(ST_TABLES_ID, ST_TABLES_TABLE_ID_COL, table_id.into()) + } + fn drop_table_from_st_columns(&mut self, table_id: TableId) -> super::Result<()> { const ST_COLUMNS_TABLE_ID_COL: ColId = ColId(0); - let rows = self.iter_by_col_eq(&ST_COLUMNS_ID, ST_COLUMNS_TABLE_ID_COL, table_id.into())?; - let rows = rows.map(|row| row.view().to_owned()).collect::>(); - if rows.is_empty() { - return Err(TableError::IdNotFound(table_id.0).into()); - } - self.delete_by_rel(&table_id, rows)?; - Ok(()) + self.drop_col_eq(ST_COLUMNS_ID, ST_COLUMNS_TABLE_ID_COL, table_id.into()) } #[tracing::instrument(skip_all)] @@ -611,7 +609,7 @@ impl Inner { (seq_row, old_seq_row_id) }; - self.delete(&ST_SEQUENCES_ID, &old_seq_row_id)?; + self.delete(&ST_SEQUENCES_ID, [old_seq_row_id]); self.insert(ST_SEQUENCES_ID, ProductValue::from(seq_row))?; let Some(sequence) = self.sequence_state.get_sequence_mut(seq_id) else { @@ -667,7 +665,7 @@ impl Inner { .unwrap() .data; let old_seq_row_id = RowId(old_seq_row.to_data_key()); - self.delete(&ST_SEQUENCES_ID, &old_seq_row_id)?; + self.delete(&ST_SEQUENCES_ID, [old_seq_row_id]); self.sequence_state.sequences.remove(&seq_id); Ok(()) } @@ -870,24 +868,22 @@ impl Inner { fn drop_table(&mut self, table_id: TableId) -> super::Result<()> { // First drop the tables indexes. const ST_INDEXES_TABLE_ID_COL: ColId = ColId(1); - let rows = self + let indexes = self .iter_by_col_eq(&ST_INDEXES_ID, ST_INDEXES_TABLE_ID_COL, table_id.into())? - .map(|x| x.view().clone()) - .collect::>(); - for row in rows { - let el = StIndexRow::try_from(&row)?; - self.drop_index(IndexId(el.index_id))?; + .map(|row| StIndexRow::try_from(row.view()).map(|el| el.index_id)) + .collect::, _>>()?; + for id in indexes { + self.drop_index(IndexId(id))?; } // Remove the table's sequences from st_sequences. const ST_SEQUENCES_TABLE_ID_COL: ColId = ColId(2); let rows = self .iter_by_col_eq(&ST_SEQUENCES_ID, ST_SEQUENCES_TABLE_ID_COL, table_id.into())? - .map(|x| x.view().clone()) - .collect::>(); - for row in rows { - let el = StSequenceRow::try_from(&row)?; - self.drop_sequence(SequenceId(el.sequence_id))?; + .map(|row| StSequenceRow::try_from(row.view()).map(|el| el.sequence_id)) + .collect::, _>>()?; + for id in rows { + self.drop_sequence(SequenceId(id))?; } // Remove the table's columns from st_columns. @@ -906,17 +902,21 @@ impl Inner { fn rename_table(&mut self, table_id: TableId, new_name: &str) -> super::Result<()> { // Update the table's name in st_tables. const ST_TABLES_TABLE_ID_COL: ColId = ColId(0); - let rows = self - .iter_by_col_eq(&ST_TABLES_ID, ST_TABLES_TABLE_ID_COL, table_id.into())? - .map(|x| x.view().clone()) - .collect::>(); - assert!(rows.len() <= 1, "Expected at most one row in st_tables for table_id"); - let row = rows.first().ok_or_else(|| TableError::IdNotFound(table_id.0))?; - let row_id = RowId(row.to_data_key()); - let mut el = StTableRow::try_from(row)?; + let mut row_iter = self.iter_by_col_eq(&ST_TABLES_ID, ST_TABLES_TABLE_ID_COL, table_id.into())?; + + let row = row_iter.next().ok_or_else(|| TableError::IdNotFound(table_id.0))?; + let row_id = RowId(*row.id); + let mut el = StTableRow::try_from(row.view())?; el.table_name = new_name; - self.delete(&ST_TABLES_ID, &row_id)?; - self.insert(ST_TABLES_ID, (&el).into())?; + let new_row = (&el).into(); + + assert!( + row_iter.next().is_none(), + "Expected at most one row in st_tables for table_id" + ); + + self.delete(&ST_TABLES_ID, [row_id]); + self.insert(ST_TABLES_ID, new_row)?; Ok(()) } @@ -1037,7 +1037,7 @@ impl Inner { .unwrap() .data; let old_index_row_id = RowId(old_index_row.to_data_key()); - self.delete(&ST_INDEXES_ID, &old_index_row_id)?; + self.delete(&ST_INDEXES_ID, [old_index_row_id]); self.drop_index_internal(&index_id); @@ -1387,8 +1387,11 @@ impl Inner { .map(|table| table.get_schema()) } - fn delete(&mut self, table_id: &TableId, row_id: &RowId) -> super::Result { - Ok(self.delete_row_internal(table_id, row_id)) + fn delete(&mut self, table_id: &TableId, row_ids: impl IntoIterator) -> u32 { + row_ids + .into_iter() + .map(|row_id| self.delete_row_internal(table_id, &row_id) as u32) + .sum() } fn delete_row_internal(&mut self, table_id: &TableId, row_id: &RowId) -> bool { @@ -1419,19 +1422,8 @@ impl Inner { } } - fn delete_by_rel( - &mut self, - table_id: &TableId, - relation: impl IntoIterator, - ) -> super::Result> { - let mut count = 0; - for tuple in relation { - let data_key = tuple.to_data_key(); - if self.delete(table_id, &RowId(data_key))? { - count += 1; - } - } - Ok(Some(count)) + fn delete_by_rel(&mut self, table_id: &TableId, relation: impl IntoIterator) -> u32 { + self.delete(table_id, relation.into_iter().map(|pv| RowId(pv.to_data_key()))) } fn iter(&self, table_id: &TableId) -> super::Result { @@ -2084,17 +2076,17 @@ impl MutTxDatastore for Locking { &'a self, tx: &'a mut Self::MutTxId, table_id: TableId, - row_id: Self::RowId, - ) -> super::Result { - tx.lock.delete(&table_id, &row_id) + row_ids: impl IntoIterator, + ) -> u32 { + tx.lock.delete(&table_id, row_ids) } - fn delete_by_rel_mut_tx>( + fn delete_by_rel_mut_tx( &self, tx: &mut Self::MutTxId, table_id: TableId, - relation: R, - ) -> super::Result> { + relation: impl IntoIterator, + ) -> u32 { tx.lock.delete_by_rel(&table_id, relation) } @@ -2129,7 +2121,7 @@ impl traits::MutProgrammable for Locking { if fence <= row.epoch.0 { return Err(anyhow!("stale fencing token: {}, storage is at epoch: {}", fence, row.epoch).into()); } - tx.lock.delete_by_rel(&ST_MODULE_ID, Some(ProductValue::from(&row)))?; + tx.lock.delete_by_rel(&ST_MODULE_ID, Some(ProductValue::from(&row))); } tx.lock.insert( @@ -2676,8 +2668,8 @@ mod tests { datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(); let created_row = u32_str_u32(1, "Foo", 18); - let num_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, vec![created_row])?; - assert_eq!(num_deleted, Some(1)); + let num_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, [created_row]); + assert_eq!(num_deleted, 1); assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0); let created_row = u32_str_u32(1, "Foo", 19); datastore.insert_mut_tx(&mut tx, table_id, created_row)?; @@ -2693,8 +2685,8 @@ mod tests { datastore.insert_mut_tx(&mut tx, table_id, row)?; for _ in 0..2 { let created_row = u32_str_u32(1, "Foo", 18); - let num_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, vec![created_row.clone()])?; - assert_eq!(num_deleted, Some(1)); + let num_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, [created_row.clone()]); + assert_eq!(num_deleted, 1); assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0); datastore.insert_mut_tx(&mut tx, table_id, created_row)?; #[rustfmt::skip] @@ -2930,8 +2922,8 @@ mod tests { assert_eq!(rows.len(), 1); assert_eq!(row, rows[0]); // Delete the row. - let count_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, rows)?; - assert_eq!(count_deleted, Some(1)); + let count_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, rows); + assert_eq!(count_deleted, 1); // We shouldn't see the row when iterating now that it's deleted. assert_eq!(all_rows_col_0_eq_1(&tx).len(), 0); diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs index dc3b438a695..491ef3dc3bb 100644 --- a/crates/core/src/db/datastore/traits.rs +++ b/crates/core/src/db/datastore/traits.rs @@ -546,13 +546,18 @@ pub trait MutTxDatastore: TxDatastore + MutTx { table_id: TableId, row_id: &'a Self::RowId, ) -> Result>>; - fn delete_mut_tx<'a>(&'a self, tx: &'a mut Self::MutTxId, table_id: TableId, row_id: Self::RowId) -> Result; - fn delete_by_rel_mut_tx>( + fn delete_mut_tx<'a>( + &'a self, + tx: &'a mut Self::MutTxId, + table_id: TableId, + row_ids: impl IntoIterator, + ) -> u32; + fn delete_by_rel_mut_tx( &self, tx: &mut Self::MutTxId, table_id: TableId, - relation: R, - ) -> Result>; + relation: impl IntoIterator, + ) -> u32; fn insert_mut_tx<'a>( &'a self, tx: &'a mut Self::MutTxId, diff --git a/crates/core/src/db/db_metrics/mod.rs b/crates/core/src/db/db_metrics/mod.rs index 8cbc420f071..9ebc21c25b0 100644 --- a/crates/core/src/db/db_metrics/mod.rs +++ b/crates/core/src/db/db_metrics/mod.rs @@ -46,7 +46,12 @@ metrics_group!( pub rdb_insert_row_time: HistogramVec, #[name = spacetime_rdb_delete_in_time] - #[help = "The time spent deleting values in a set from a table"] + #[help = "The time spent deleting values by row id from a table"] + #[labels(table_id: u32)] + pub rdb_delete: HistogramVec, + + #[name = spacetime_rdb_delete_in_time] + #[help = "The time spent deleting values by a relation from a table"] #[labels(table_id: u32)] pub rdb_delete_by_rel_time: HistogramVec, } diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 87096cfe59b..597cb999731 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,5 +1,5 @@ use super::commit_log::{CommitLog, CommitLogView}; -use super::datastore::locking_tx_datastore::{DataRef, Iter, IterByColEq, IterByColRange, MutTxId, RowId}; +use super::datastore::locking_tx_datastore::{DataRef, Iter, IterByColEq, IterByColRange, Locking, MutTxId, RowId}; use super::datastore::traits::{ ColId, DataRow, IndexDef, IndexId, MutProgrammable, MutTx, MutTxDatastore, Programmable, SequenceDef, SequenceId, TableDef, TableId, TableSchema, TxData, @@ -26,8 +26,6 @@ use std::ops::RangeBounds; use std::path::Path; use std::sync::{Arc, Mutex}; -use super::datastore::locking_tx_datastore::Locking; - pub const ST_TABLES_NAME: &str = "st_table"; pub const ST_COLUMNS_NAME: &str = "st_columns"; pub const ST_SEQUENCES_NAME: &str = "st_sequence"; @@ -528,17 +526,19 @@ impl RelationalDB { self.insert(tx, table_id, row) } + pub fn delete(&self, tx: &mut MutTxId, table_id: u32, row_ids: impl IntoIterator) -> u32 { + let _guard = DB_METRICS.rdb_delete.with_label_values(&table_id).start_timer(); + + self.inner.delete_mut_tx(tx, TableId(table_id), row_ids) + } + #[tracing::instrument(skip_all)] - pub fn delete_by_rel( - &self, - tx: &mut MutTxId, - table_id: u32, - relation: R, - ) -> Result, DBError> { + pub fn delete_by_rel(&self, tx: &mut MutTxId, table_id: u32, relation: R) -> u32 { let _guard = DB_METRICS .rdb_delete_by_rel_time .with_label_values(&table_id) .start_timer(); + self.inner.delete_by_rel_mut_tx(tx, TableId(table_id), relation) } @@ -547,9 +547,9 @@ impl RelationalDB { pub fn clear_table(&self, tx: &mut MutTxId, table_id: u32) -> Result<(), DBError> { let relation = self .iter(tx, table_id)? - .map(|data| data.view().clone()) + .map(|data| RowId(*data.id())) .collect::>(); - self.delete_by_rel(tx, table_id, relation)?; + self.delete(tx, table_id, relation); Ok(()) } diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 2f1f24c4e5e..28978969d2d 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -1,12 +1,13 @@ use nonempty::NonEmpty; use parking_lot::{Mutex, MutexGuard}; use spacetimedb_lib::{bsatn, ProductValue}; +use std::num::NonZeroU32; use std::ops::DerefMut; use std::sync::Arc; use crate::database_instance_context::DatabaseInstanceContext; use crate::database_logger::{BacktraceProvider, LogLevel, Record}; -use crate::db::datastore::locking_tx_datastore::MutTxId; +use crate::db::datastore::locking_tx_datastore::{MutTxId, RowId}; use crate::db::datastore::traits::{ColId, IndexDef}; use crate::error::{IndexError, NodesError}; use crate::util::ResultInspectExt; @@ -144,7 +145,7 @@ impl InstanceEnv { /// /// Returns an error if no columns were deleted or if the column wasn't found. #[tracing::instrument(skip(self, value))] - pub fn delete_by_col_eq(&self, table_id: u32, col_id: u32, value: &[u8]) -> Result { + pub fn delete_by_col_eq(&self, table_id: u32, col_id: u32, value: &[u8]) -> Result { let stdb = &*self.dbic.relational_db; let tx = &mut *self.get_tx()?; @@ -152,16 +153,14 @@ impl InstanceEnv { let eq_value = stdb.decode_column(tx, table_id, col_id, value)?; // Find all rows in the table where the column data equates to `value`. - let seek = stdb.iter_by_col_eq(tx, table_id, ColId(col_id), eq_value)?; - let seek = seek.map(|x| x.view().clone()).collect::>(); + let rows_to_delete = stdb + .iter_by_col_eq(tx, table_id, ColId(col_id), eq_value)? + .map(|x| RowId(*x.id())) + .collect::>(); // Delete them and count how many we deleted and error if none. - let count = stdb - .delete_by_rel(tx, table_id, seek) - .inspect_err_(|e| log::error!("delete_by_col_eq(table_id: {table_id}): {e}"))? - .ok_or(NodesError::ColumnValueNotFound)?; - - Ok(count) + let count = stdb.delete(tx, table_id, rows_to_delete); + NonZeroU32::new(count).ok_or(NodesError::ColumnValueNotFound) } /// Returns the `table_id` associated with the given `table_name`. diff --git a/crates/core/src/host/wasmer/wasm_instance_env.rs b/crates/core/src/host/wasmer/wasm_instance_env.rs index ac221faaffb..13796fbaf0e 100644 --- a/crates/core/src/host/wasmer/wasm_instance_env.rs +++ b/crates/core/src/host/wasmer/wasm_instance_env.rs @@ -427,7 +427,8 @@ impl WasmInstanceEnv { ) -> RtResult { Self::cvt_ret(caller, "delete_by_col_eq", Call::DeleteByColEq, out, |caller, mem| { let value = mem.read_bytes(&caller, value, value_len)?; - Ok(caller.data().instance_env.delete_by_col_eq(table_id, col_id, &value)?) + let count = caller.data().instance_env.delete_by_col_eq(table_id, col_id, &value)?; + Ok(count.get()) }) } diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index ad7564537d7..cd7fbf7dafb 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -247,9 +247,8 @@ mod tests { Ok(()) } - fn delete_row(db: &RelationalDB, tx: &mut MutTxId, table_id: u32, row: ProductValue) -> ResultTest<()> { - db.delete_by_rel(tx, table_id, vec![row])?; - Ok(()) + fn delete_row(db: &RelationalDB, tx: &mut MutTxId, table_id: u32, row: ProductValue) { + db.delete_by_rel(tx, table_id, [row]); } fn make_data( @@ -538,7 +537,7 @@ mod tests { let r1 = product!(10, 0, 2); let r2 = product!(10, 0, 3); - delete_row(&db, &mut tx, rhs_id, r1.clone())?; + delete_row(&db, &mut tx, rhs_id, r1.clone()); insert_row(&db, &mut tx, rhs_id, r2.clone())?; let updates = vec![ @@ -554,7 +553,7 @@ mod tests { // Clean up tx insert_row(&db, &mut tx, rhs_id, r1.clone())?; - delete_row(&db, &mut tx, rhs_id, r2.clone())?; + delete_row(&db, &mut tx, rhs_id, r2.clone()); } // Case 2: Delete a row outside the region and insert back outside the region @@ -563,7 +562,7 @@ mod tests { let r2 = product!(13, 3, 6); insert_row(&db, &mut tx, rhs_id, r1.clone())?; - delete_row(&db, &mut tx, rhs_id, r2.clone())?; + delete_row(&db, &mut tx, rhs_id, r2.clone()); let updates = vec![ delete_op(rhs_id, "rhs", r1.clone()), @@ -578,7 +577,7 @@ mod tests { // Clean up tx insert_row(&db, &mut tx, rhs_id, r1.clone())?; - delete_row(&db, &mut tx, rhs_id, r2.clone())?; + delete_row(&db, &mut tx, rhs_id, r2.clone()); } // Case 3: Delete a row inside the region and insert back outside the region @@ -586,7 +585,7 @@ mod tests { let r1 = product!(10, 0, 2); let r2 = product!(10, 0, 5); - delete_row(&db, &mut tx, rhs_id, r1.clone())?; + delete_row(&db, &mut tx, rhs_id, r1.clone()); insert_row(&db, &mut tx, rhs_id, r2.clone())?; let updates = vec![ @@ -603,7 +602,7 @@ mod tests { // Clean up tx insert_row(&db, &mut tx, rhs_id, r1.clone())?; - delete_row(&db, &mut tx, rhs_id, r2.clone())?; + delete_row(&db, &mut tx, rhs_id, r2.clone()); } // Case 4: Delete a row outside the region and insert back inside the region @@ -611,7 +610,7 @@ mod tests { let r1 = product!(13, 3, 5); let r2 = product!(13, 3, 4); - delete_row(&db, &mut tx, rhs_id, r1.clone())?; + delete_row(&db, &mut tx, rhs_id, r1.clone()); insert_row(&db, &mut tx, rhs_id, r2.clone())?; let updates = vec![ @@ -628,7 +627,7 @@ mod tests { // Clean up tx insert_row(&db, &mut tx, rhs_id, r1.clone())?; - delete_row(&db, &mut tx, rhs_id, r2.clone())?; + delete_row(&db, &mut tx, rhs_id, r2.clone()); } // Case 5: Insert a row into lhs and insert a matching row inside the region of rhs @@ -652,8 +651,8 @@ mod tests { assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(5, 10))); // Clean up tx - delete_row(&db, &mut tx, lhs_id, lhs_row.clone())?; - delete_row(&db, &mut tx, rhs_id, rhs_row.clone())?; + delete_row(&db, &mut tx, lhs_id, lhs_row.clone()); + delete_row(&db, &mut tx, rhs_id, rhs_row.clone()); } // Case 6: Insert a row into lhs and insert a matching row outside the region of rhs @@ -676,8 +675,8 @@ mod tests { assert_eq!(result.tables.len(), 0); // Clean up tx - delete_row(&db, &mut tx, lhs_id, lhs_row.clone())?; - delete_row(&db, &mut tx, rhs_id, rhs_row.clone())?; + delete_row(&db, &mut tx, lhs_id, lhs_row.clone()); + delete_row(&db, &mut tx, rhs_id, rhs_row.clone()); } // Case 7: Delete a row from lhs and delete a matching row inside the region of rhs @@ -685,8 +684,8 @@ mod tests { let lhs_row = product!(0, 5); let rhs_row = product!(10, 0, 2); - delete_row(&db, &mut tx, lhs_id, lhs_row.clone())?; - delete_row(&db, &mut tx, rhs_id, rhs_row.clone())?; + delete_row(&db, &mut tx, lhs_id, lhs_row.clone()); + delete_row(&db, &mut tx, rhs_id, rhs_row.clone()); let updates = vec![ delete_op(lhs_id, "lhs", lhs_row.clone()), @@ -710,8 +709,8 @@ mod tests { let lhs_row = product!(3, 8); let rhs_row = product!(13, 3, 5); - delete_row(&db, &mut tx, lhs_id, lhs_row.clone())?; - delete_row(&db, &mut tx, rhs_id, rhs_row.clone())?; + delete_row(&db, &mut tx, lhs_id, lhs_row.clone()); + delete_row(&db, &mut tx, rhs_id, rhs_row.clone()); let updates = vec![ delete_op(lhs_id, "lhs", lhs_row.clone()), diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index bbbfaa70be5..461b6245172 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -290,7 +290,7 @@ impl<'db, 'tx> DbProgram<'db, 'tx> { // TODO: How do we deal with mutating values? Table::MemTable(_) => Err(ErrorVm::Other(anyhow::anyhow!("How deal with mutating values?"))), Table::DbTable(t) => { - let count = self.db.delete_by_rel(self.tx, t.table_id, rows)?; + let count = self.db.delete_by_rel(self.tx, t.table_id, rows); Ok(Code::Value(count.into())) } }