Skip to content

Commit

Permalink
Optimize drop_table, rename_table, delete_by_col_eq, and `clear_table` (#436)
Browse files Browse the repository at this point in the history

* optimize delete, drop, rename

* address review comments
  • Loading branch information
Centril committed Oct 19, 2023
1 parent b94ced9 commit 25c43f5
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 112 deletions.
122 changes: 57 additions & 65 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,26 +556,24 @@ impl Inner {
Ok(())
}

fn drop_table_from_st_tables(&mut self, table_id: TableId) -> super::Result<()> {
const ST_TABLES_TABLE_ID_COL: ColId = ColId(0);
let rows = self.iter_by_col_eq(&ST_TABLES_ID, ST_TABLES_TABLE_ID_COL, table_id.into())?;
let rows = rows.map(|row| row.view().to_owned()).collect::<Vec<_>>();
if rows.is_empty() {
fn drop_col_eq(&mut self, table_id: TableId, col_id: ColId, value: AlgebraicValue) -> super::Result<()> {
let rows = self.iter_by_col_eq(&table_id, col_id, value)?;
let ids_to_delete = rows.map(|row| RowId(*row.id())).collect::<Vec<_>>();
if ids_to_delete.is_empty() {
return Err(TableError::IdNotFound(table_id.0).into());
}
self.delete_by_rel(&table_id, rows)?;
self.delete(&table_id, ids_to_delete);
Ok(())
}

fn drop_table_from_st_tables(&mut self, table_id: TableId) -> super::Result<()> {
const ST_TABLES_TABLE_ID_COL: ColId = ColId(0);
self.drop_col_eq(ST_TABLES_ID, ST_TABLES_TABLE_ID_COL, table_id.into())
}

fn drop_table_from_st_columns(&mut self, table_id: TableId) -> super::Result<()> {
const ST_COLUMNS_TABLE_ID_COL: ColId = ColId(0);
let rows = self.iter_by_col_eq(&ST_COLUMNS_ID, ST_COLUMNS_TABLE_ID_COL, table_id.into())?;
let rows = rows.map(|row| row.view().to_owned()).collect::<Vec<_>>();
if rows.is_empty() {
return Err(TableError::IdNotFound(table_id.0).into());
}
self.delete_by_rel(&table_id, rows)?;
Ok(())
self.drop_col_eq(ST_COLUMNS_ID, ST_COLUMNS_TABLE_ID_COL, table_id.into())
}

#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -611,7 +609,7 @@ impl Inner {
(seq_row, old_seq_row_id)
};

self.delete(&ST_SEQUENCES_ID, &old_seq_row_id)?;
self.delete(&ST_SEQUENCES_ID, [old_seq_row_id]);
self.insert(ST_SEQUENCES_ID, ProductValue::from(seq_row))?;

let Some(sequence) = self.sequence_state.get_sequence_mut(seq_id) else {
Expand Down Expand Up @@ -667,7 +665,7 @@ impl Inner {
.unwrap()
.data;
let old_seq_row_id = RowId(old_seq_row.to_data_key());
self.delete(&ST_SEQUENCES_ID, &old_seq_row_id)?;
self.delete(&ST_SEQUENCES_ID, [old_seq_row_id]);
self.sequence_state.sequences.remove(&seq_id);
Ok(())
}
Expand Down Expand Up @@ -870,24 +868,22 @@ impl Inner {
fn drop_table(&mut self, table_id: TableId) -> super::Result<()> {
// First drop the table's indexes.
const ST_INDEXES_TABLE_ID_COL: ColId = ColId(1);
let rows = self
let indexes = self
.iter_by_col_eq(&ST_INDEXES_ID, ST_INDEXES_TABLE_ID_COL, table_id.into())?
.map(|x| x.view().clone())
.collect::<Vec<_>>();
for row in rows {
let el = StIndexRow::try_from(&row)?;
self.drop_index(IndexId(el.index_id))?;
.map(|row| StIndexRow::try_from(row.view()).map(|el| el.index_id))
.collect::<Result<Vec<_>, _>>()?;
for id in indexes {
self.drop_index(IndexId(id))?;
}

// Remove the table's sequences from st_sequences.
const ST_SEQUENCES_TABLE_ID_COL: ColId = ColId(2);
let rows = self
.iter_by_col_eq(&ST_SEQUENCES_ID, ST_SEQUENCES_TABLE_ID_COL, table_id.into())?
.map(|x| x.view().clone())
.collect::<Vec<_>>();
for row in rows {
let el = StSequenceRow::try_from(&row)?;
self.drop_sequence(SequenceId(el.sequence_id))?;
.map(|row| StSequenceRow::try_from(row.view()).map(|el| el.sequence_id))
.collect::<Result<Vec<_>, _>>()?;
for id in rows {
self.drop_sequence(SequenceId(id))?;
}

// Remove the table's columns from st_columns.
Expand All @@ -906,17 +902,21 @@ impl Inner {
fn rename_table(&mut self, table_id: TableId, new_name: &str) -> super::Result<()> {
// Update the table's name in st_tables.
const ST_TABLES_TABLE_ID_COL: ColId = ColId(0);
let rows = self
.iter_by_col_eq(&ST_TABLES_ID, ST_TABLES_TABLE_ID_COL, table_id.into())?
.map(|x| x.view().clone())
.collect::<Vec<_>>();
assert!(rows.len() <= 1, "Expected at most one row in st_tables for table_id");
let row = rows.first().ok_or_else(|| TableError::IdNotFound(table_id.0))?;
let row_id = RowId(row.to_data_key());
let mut el = StTableRow::try_from(row)?;
let mut row_iter = self.iter_by_col_eq(&ST_TABLES_ID, ST_TABLES_TABLE_ID_COL, table_id.into())?;

let row = row_iter.next().ok_or_else(|| TableError::IdNotFound(table_id.0))?;
let row_id = RowId(*row.id);
let mut el = StTableRow::try_from(row.view())?;
el.table_name = new_name;
self.delete(&ST_TABLES_ID, &row_id)?;
self.insert(ST_TABLES_ID, (&el).into())?;
let new_row = (&el).into();

assert!(
row_iter.next().is_none(),
"Expected at most one row in st_tables for table_id"
);

self.delete(&ST_TABLES_ID, [row_id]);
self.insert(ST_TABLES_ID, new_row)?;
Ok(())
}

Expand Down Expand Up @@ -1037,7 +1037,7 @@ impl Inner {
.unwrap()
.data;
let old_index_row_id = RowId(old_index_row.to_data_key());
self.delete(&ST_INDEXES_ID, &old_index_row_id)?;
self.delete(&ST_INDEXES_ID, [old_index_row_id]);

self.drop_index_internal(&index_id);

Expand Down Expand Up @@ -1387,8 +1387,11 @@ impl Inner {
.map(|table| table.get_schema())
}

fn delete(&mut self, table_id: &TableId, row_id: &RowId) -> super::Result<bool> {
Ok(self.delete_row_internal(table_id, row_id))
fn delete(&mut self, table_id: &TableId, row_ids: impl IntoIterator<Item = RowId>) -> u32 {
row_ids
.into_iter()
.map(|row_id| self.delete_row_internal(table_id, &row_id) as u32)
.sum()
}

fn delete_row_internal(&mut self, table_id: &TableId, row_id: &RowId) -> bool {
Expand Down Expand Up @@ -1419,19 +1422,8 @@ impl Inner {
}
}

fn delete_by_rel(
&mut self,
table_id: &TableId,
relation: impl IntoIterator<Item = spacetimedb_sats::ProductValue>,
) -> super::Result<Option<u32>> {
let mut count = 0;
for tuple in relation {
let data_key = tuple.to_data_key();
if self.delete(table_id, &RowId(data_key))? {
count += 1;
}
}
Ok(Some(count))
fn delete_by_rel(&mut self, table_id: &TableId, relation: impl IntoIterator<Item = ProductValue>) -> u32 {
self.delete(table_id, relation.into_iter().map(|pv| RowId(pv.to_data_key())))
}

fn iter(&self, table_id: &TableId) -> super::Result<Iter> {
Expand Down Expand Up @@ -2084,17 +2076,17 @@ impl MutTxDatastore for Locking {
&'a self,
tx: &'a mut Self::MutTxId,
table_id: TableId,
row_id: Self::RowId,
) -> super::Result<bool> {
tx.lock.delete(&table_id, &row_id)
row_ids: impl IntoIterator<Item = Self::RowId>,
) -> u32 {
tx.lock.delete(&table_id, row_ids)
}

fn delete_by_rel_mut_tx<R: IntoIterator<Item = spacetimedb_sats::ProductValue>>(
fn delete_by_rel_mut_tx(
&self,
tx: &mut Self::MutTxId,
table_id: TableId,
relation: R,
) -> super::Result<Option<u32>> {
relation: impl IntoIterator<Item = ProductValue>,
) -> u32 {
tx.lock.delete_by_rel(&table_id, relation)
}

Expand Down Expand Up @@ -2129,7 +2121,7 @@ impl traits::MutProgrammable for Locking {
if fence <= row.epoch.0 {
return Err(anyhow!("stale fencing token: {}, storage is at epoch: {}", fence, row.epoch).into());
}
tx.lock.delete_by_rel(&ST_MODULE_ID, Some(ProductValue::from(&row)))?;
tx.lock.delete_by_rel(&ST_MODULE_ID, Some(ProductValue::from(&row)));
}

tx.lock.insert(
Expand Down Expand Up @@ -2676,8 +2668,8 @@ mod tests {
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx();
let created_row = u32_str_u32(1, "Foo", 18);
let num_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, vec![created_row])?;
assert_eq!(num_deleted, Some(1));
let num_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, [created_row]);
assert_eq!(num_deleted, 1);
assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0);
let created_row = u32_str_u32(1, "Foo", 19);
datastore.insert_mut_tx(&mut tx, table_id, created_row)?;
Expand All @@ -2693,8 +2685,8 @@ mod tests {
datastore.insert_mut_tx(&mut tx, table_id, row)?;
for _ in 0..2 {
let created_row = u32_str_u32(1, "Foo", 18);
let num_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, vec![created_row.clone()])?;
assert_eq!(num_deleted, Some(1));
let num_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, [created_row.clone()]);
assert_eq!(num_deleted, 1);
assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0);
datastore.insert_mut_tx(&mut tx, table_id, created_row)?;
#[rustfmt::skip]
Expand Down Expand Up @@ -2930,8 +2922,8 @@ mod tests {
assert_eq!(rows.len(), 1);
assert_eq!(row, rows[0]);
// Delete the row.
let count_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, rows)?;
assert_eq!(count_deleted, Some(1));
let count_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, rows);
assert_eq!(count_deleted, 1);

// We shouldn't see the row when iterating now that it's deleted.
assert_eq!(all_rows_col_0_eq_1(&tx).len(), 0);
Expand Down
13 changes: 9 additions & 4 deletions crates/core/src/db/datastore/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,13 +546,18 @@ pub trait MutTxDatastore: TxDatastore + MutTx {
table_id: TableId,
row_id: &'a Self::RowId,
) -> Result<Option<Self::DataRef<'a>>>;
fn delete_mut_tx<'a>(&'a self, tx: &'a mut Self::MutTxId, table_id: TableId, row_id: Self::RowId) -> Result<bool>;
fn delete_by_rel_mut_tx<R: IntoIterator<Item = ProductValue>>(
fn delete_mut_tx<'a>(
&'a self,
tx: &'a mut Self::MutTxId,
table_id: TableId,
row_ids: impl IntoIterator<Item = Self::RowId>,
) -> u32;
fn delete_by_rel_mut_tx(
&self,
tx: &mut Self::MutTxId,
table_id: TableId,
relation: R,
) -> Result<Option<u32>>;
relation: impl IntoIterator<Item = ProductValue>,
) -> u32;
fn insert_mut_tx<'a>(
&'a self,
tx: &'a mut Self::MutTxId,
Expand Down
7 changes: 6 additions & 1 deletion crates/core/src/db/db_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ metrics_group!(
pub rdb_insert_row_time: HistogramVec,

#[name = spacetime_rdb_delete_in_time]
#[help = "The time spent deleting values in a set from a table"]
#[help = "The time spent deleting values by row id from a table"]
#[labels(table_id: u32)]
pub rdb_delete: HistogramVec,

#[name = spacetime_rdb_delete_in_time]
#[help = "The time spent deleting values by a relation from a table"]
#[labels(table_id: u32)]
pub rdb_delete_by_rel_time: HistogramVec,
}
Expand Down
22 changes: 11 additions & 11 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::commit_log::{CommitLog, CommitLogView};
use super::datastore::locking_tx_datastore::{DataRef, Iter, IterByColEq, IterByColRange, MutTxId, RowId};
use super::datastore::locking_tx_datastore::{DataRef, Iter, IterByColEq, IterByColRange, Locking, MutTxId, RowId};
use super::datastore::traits::{
ColId, DataRow, IndexDef, IndexId, MutProgrammable, MutTx, MutTxDatastore, Programmable, SequenceDef, SequenceId,
TableDef, TableId, TableSchema, TxData,
Expand All @@ -26,8 +26,6 @@ use std::ops::RangeBounds;
use std::path::Path;
use std::sync::{Arc, Mutex};

use super::datastore::locking_tx_datastore::Locking;

pub const ST_TABLES_NAME: &str = "st_table";
pub const ST_COLUMNS_NAME: &str = "st_columns";
pub const ST_SEQUENCES_NAME: &str = "st_sequence";
Expand Down Expand Up @@ -528,17 +526,19 @@ impl RelationalDB {
self.insert(tx, table_id, row)
}

pub fn delete(&self, tx: &mut MutTxId, table_id: u32, row_ids: impl IntoIterator<Item = RowId>) -> u32 {
let _guard = DB_METRICS.rdb_delete.with_label_values(&table_id).start_timer();

self.inner.delete_mut_tx(tx, TableId(table_id), row_ids)
}

#[tracing::instrument(skip_all)]
pub fn delete_by_rel<R: Relation>(
&self,
tx: &mut MutTxId,
table_id: u32,
relation: R,
) -> Result<Option<u32>, DBError> {
pub fn delete_by_rel<R: Relation>(&self, tx: &mut MutTxId, table_id: u32, relation: R) -> u32 {
let _guard = DB_METRICS
.rdb_delete_by_rel_time
.with_label_values(&table_id)
.start_timer();

self.inner.delete_by_rel_mut_tx(tx, TableId(table_id), relation)
}

Expand All @@ -547,9 +547,9 @@ impl RelationalDB {
pub fn clear_table(&self, tx: &mut MutTxId, table_id: u32) -> Result<(), DBError> {
let relation = self
.iter(tx, table_id)?
.map(|data| data.view().clone())
.map(|data| RowId(*data.id()))
.collect::<Vec<_>>();
self.delete_by_rel(tx, table_id, relation)?;
self.delete(tx, table_id, relation);
Ok(())
}

Expand Down
19 changes: 9 additions & 10 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use nonempty::NonEmpty;
use parking_lot::{Mutex, MutexGuard};
use spacetimedb_lib::{bsatn, ProductValue};
use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

use crate::database_instance_context::DatabaseInstanceContext;
use crate::database_logger::{BacktraceProvider, LogLevel, Record};
use crate::db::datastore::locking_tx_datastore::MutTxId;
use crate::db::datastore::locking_tx_datastore::{MutTxId, RowId};
use crate::db::datastore::traits::{ColId, IndexDef};
use crate::error::{IndexError, NodesError};
use crate::util::ResultInspectExt;
Expand Down Expand Up @@ -144,24 +145,22 @@ impl InstanceEnv {
///
/// Returns an error if no rows were deleted or if the column wasn't found.
#[tracing::instrument(skip(self, value))]
pub fn delete_by_col_eq(&self, table_id: u32, col_id: u32, value: &[u8]) -> Result<u32, NodesError> {
pub fn delete_by_col_eq(&self, table_id: u32, col_id: u32, value: &[u8]) -> Result<NonZeroU32, NodesError> {
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.get_tx()?;

// Interpret the `value` using the schema of the column.
let eq_value = stdb.decode_column(tx, table_id, col_id, value)?;

// Find all rows in the table where the column data equates to `value`.
let seek = stdb.iter_by_col_eq(tx, table_id, ColId(col_id), eq_value)?;
let seek = seek.map(|x| x.view().clone()).collect::<Vec<_>>();
let rows_to_delete = stdb
.iter_by_col_eq(tx, table_id, ColId(col_id), eq_value)?
.map(|x| RowId(*x.id()))
.collect::<Vec<_>>();

// Delete them and count how many we deleted and error if none.
let count = stdb
.delete_by_rel(tx, table_id, seek)
.inspect_err_(|e| log::error!("delete_by_col_eq(table_id: {table_id}): {e}"))?
.ok_or(NodesError::ColumnValueNotFound)?;

Ok(count)
let count = stdb.delete(tx, table_id, rows_to_delete);
NonZeroU32::new(count).ok_or(NodesError::ColumnValueNotFound)
}

/// Returns the `table_id` associated with the given `table_name`.
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/host/wasmer/wasm_instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,8 @@ impl WasmInstanceEnv {
) -> RtResult<u16> {
Self::cvt_ret(caller, "delete_by_col_eq", Call::DeleteByColEq, out, |caller, mem| {
let value = mem.read_bytes(&caller, value, value_len)?;
Ok(caller.data().instance_env.delete_by_col_eq(table_id, col_id, &value)?)
let count = caller.data().instance_env.delete_by_col_eq(table_id, col_id, &value)?;
Ok(count.get())
})
}

Expand Down
Loading

0 comments on commit 25c43f5

Please sign in to comment.