Skip to content

Commit

Permalink
test(414): multi-column index scan (#415)
Browse files Browse the repository at this point in the history
Closes #414.

Adds a test for calling iter_by_col_eq with multiple columns.
Ensures the index is scanned if an applicable multi-column index is present.
To write this test, iter_by_col_eq was updated to take multiple columns.
  • Loading branch information
joshua-spacetime authored Oct 12, 2023
1 parent a836aed commit ca532c9
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 76 deletions.
5 changes: 3 additions & 2 deletions crates/bench/src/spacetime_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
schemas::{table_name, BenchTable, IndexStrategy},
ResultBench,
};
use spacetimedb::db::datastore::traits::{IndexDef, TableDef};
use spacetimedb::db::datastore::traits::{ColId, IndexDef, TableDef};
use spacetimedb::db::relational_db::{open_db, RelationalDB};
use spacetimedb_lib::sats::AlgebraicValue;
use std::hint::black_box;
Expand Down Expand Up @@ -107,8 +107,9 @@ impl BenchDatabase for SpacetimeRaw {
column_index: u32,
value: AlgebraicValue,
) -> ResultBench<()> {
let col: ColId = column_index.into();
self.db.with_auto_commit(|tx| {
for row in self.db.iter_by_col_eq(tx, *table_id, column_index, value)? {
for row in self.db.iter_by_col_eq(tx, *table_id, col, value)? {
black_box(row);
}
Ok(())
Expand Down
81 changes: 38 additions & 43 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl CommittedState {
pub fn index_seek<'a>(
&'a self,
table_id: &TableId,
cols: NonEmpty<ColId>,
cols: &NonEmpty<ColId>,
range: &impl RangeBounds<AlgebraicValue>,
) -> Option<BTreeIndexRangeIter<'a>> {
if let Some(table) = self.tables.get(table_id) {
Expand Down Expand Up @@ -323,7 +323,7 @@ impl TxState {
pub fn index_seek<'a>(
&'a self,
table_id: &TableId,
cols: NonEmpty<ColId>,
cols: &NonEmpty<ColId>,
range: &impl RangeBounds<AlgebraicValue>,
) -> Option<BTreeIndexRangeIter<'a>> {
self.insert_tables.get(table_id)?.index_seek(cols, range)
Expand Down Expand Up @@ -582,7 +582,7 @@ impl Inner {

fn drop_table_from_st_tables(&mut self, table_id: TableId) -> super::Result<()> {
const ST_TABLES_TABLE_ID_COL: ColId = ColId(0);
let rows = self.iter_by_col_eq(&ST_TABLES_ID, &ST_TABLES_TABLE_ID_COL, table_id.into())?;
let rows = self.iter_by_col_eq(&ST_TABLES_ID, ST_TABLES_TABLE_ID_COL, table_id.into())?;
let rows = rows.map(|row| row.view().to_owned()).collect::<Vec<_>>();
if rows.is_empty() {
return Err(TableError::IdNotFound(table_id.0).into());
Expand All @@ -593,7 +593,7 @@ impl Inner {

fn drop_table_from_st_columns(&mut self, table_id: TableId) -> super::Result<()> {
const ST_COLUMNS_TABLE_ID_COL: ColId = ColId(0);
let rows = self.iter_by_col_eq(&ST_COLUMNS_ID, &ST_COLUMNS_TABLE_ID_COL, table_id.into())?;
let rows = self.iter_by_col_eq(&ST_COLUMNS_ID, ST_COLUMNS_TABLE_ID_COL, table_id.into())?;
let rows = rows.map(|row| row.view().to_owned()).collect::<Vec<_>>();
if rows.is_empty() {
return Err(TableError::IdNotFound(table_id.0).into());
Expand All @@ -619,7 +619,7 @@ impl Inner {
// If we're out of allocations, then update the sequence row in st_sequences to allocate a fresh batch of sequences.
const ST_SEQUENCES_SEQUENCE_ID_COL: ColId = ColId(0);
let old_seq_row = self
.iter_by_col_eq(&ST_SEQUENCES_ID, &ST_SEQUENCES_SEQUENCE_ID_COL, seq_id.into())?
.iter_by_col_eq(&ST_SEQUENCES_ID, ST_SEQUENCES_SEQUENCE_ID_COL, seq_id.into())?
.last()
.unwrap()
.data;
Expand Down Expand Up @@ -684,7 +684,7 @@ impl Inner {
fn drop_sequence(&mut self, seq_id: SequenceId) -> super::Result<()> {
const ST_SEQUENCES_SEQUENCE_ID_COL: ColId = ColId(0);
let old_seq_row = self
.iter_by_col_eq(&ST_SEQUENCES_ID, &ST_SEQUENCES_SEQUENCE_ID_COL, seq_id.into())?
.iter_by_col_eq(&ST_SEQUENCES_ID, ST_SEQUENCES_SEQUENCE_ID_COL, seq_id.into())?
.last()
.unwrap()
.data;
Expand All @@ -698,7 +698,7 @@ impl Inner {
let seq_name_col: ColId = ColId(1);
self.iter_by_col_eq(
&ST_SEQUENCES_ID,
&seq_name_col,
seq_name_col,
AlgebraicValue::String(seq_name.to_owned()),
)
.map(|mut iter| {
Expand Down Expand Up @@ -819,7 +819,7 @@ impl Inner {
}

// Look up the table_name for the table in question.
let table_id_col: ColId = ColId(0);
let table_id_col = NonEmpty::new(0);

// TODO(george): As part of the bootstrapping process, we add a bunch of rows
// and only at very end do we patch things up and create table metadata, indexes,
Expand All @@ -829,7 +829,7 @@ impl Inner {
let value: AlgebraicValue = table_id.into();
let rows = IterByColRange::Scan(ScanIterByColRange {
range: value,
col_id: table_id_col,
cols: table_id_col,
scan_iter: self.iter(&ST_TABLES_ID)?,
})
.collect::<Vec<_>>();
Expand All @@ -843,7 +843,7 @@ impl Inner {
// Look up the columns for the table in question.
let mut columns = Vec::new();
const TABLE_ID_COL: ColId = ColId(0);
for data_ref in self.iter_by_col_eq(&ST_COLUMNS_ID, &TABLE_ID_COL, table_id.into())? {
for data_ref in self.iter_by_col_eq(&ST_COLUMNS_ID, TABLE_ID_COL, table_id.into())? {
let row = data_ref.view();

let el = StColumnRow::try_from(row)?;
Expand All @@ -862,7 +862,7 @@ impl Inner {
// Look up the indexes for the table in question.
let mut indexes = Vec::new();
let table_id_col: ColId = ColId(1);
for data_ref in self.iter_by_col_eq(&ST_INDEXES_ID, &table_id_col, table_id.into())? {
for data_ref in self.iter_by_col_eq(&ST_INDEXES_ID, table_id_col, table_id.into())? {
let row = data_ref.view();

let el = StIndexRow::try_from(row)?;
Expand Down Expand Up @@ -891,7 +891,7 @@ impl Inner {
// First drop the tables indexes.
const ST_INDEXES_TABLE_ID_COL: ColId = ColId(1);
let rows = self
.iter_by_col_eq(&ST_INDEXES_ID, &ST_INDEXES_TABLE_ID_COL, table_id.into())?
.iter_by_col_eq(&ST_INDEXES_ID, ST_INDEXES_TABLE_ID_COL, table_id.into())?
.collect::<Vec<_>>();
for data_ref in rows {
let row = data_ref.view();
Expand All @@ -902,7 +902,7 @@ impl Inner {
// Remove the table's sequences from st_sequences.
const ST_SEQUENCES_TABLE_ID_COL: ColId = ColId(2);
let rows = self
.iter_by_col_eq(&ST_SEQUENCES_ID, &ST_SEQUENCES_TABLE_ID_COL, table_id.into())?
.iter_by_col_eq(&ST_SEQUENCES_ID, ST_SEQUENCES_TABLE_ID_COL, table_id.into())?
.collect::<Vec<_>>();
for data_ref in rows {
let row = data_ref.view();
Expand All @@ -927,7 +927,7 @@ impl Inner {
// Update the table's name in st_tables.
const ST_TABLES_TABLE_ID_COL: ColId = ColId(0);
let rows = self
.iter_by_col_eq(&ST_TABLES_ID, &ST_TABLES_TABLE_ID_COL, table_id.into())?
.iter_by_col_eq(&ST_TABLES_ID, ST_TABLES_TABLE_ID_COL, table_id.into())?
.collect::<Vec<_>>();
assert!(rows.len() <= 1, "Expected at most one row in st_tables for table_id");
let row = rows.first().ok_or_else(|| TableError::IdNotFound(table_id.0))?;
Expand All @@ -943,7 +943,7 @@ impl Inner {
let table_name_col: ColId = ColId(1);
self.iter_by_col_eq(
&ST_TABLES_ID,
&table_name_col,
table_name_col,
AlgebraicValue::String(table_name.to_owned()),
)
.map(|mut iter| {
Expand All @@ -954,7 +954,7 @@ impl Inner {

fn table_name_from_id(&self, table_id: TableId) -> super::Result<Option<String>> {
let table_id_col: ColId = ColId(0);
self.iter_by_col_eq(&ST_TABLES_ID, &table_id_col, table_id.into())
self.iter_by_col_eq(&ST_TABLES_ID, table_id_col, table_id.into())
.map(|mut iter| {
iter.next()
.map(|row| row.view().elements[1].as_string().unwrap().to_owned())
Expand Down Expand Up @@ -1055,7 +1055,7 @@ impl Inner {
// Remove the index from st_indexes.
const ST_INDEXES_INDEX_ID_COL: ColId = ColId(0);
let old_index_row = self
.iter_by_col_eq(&ST_INDEXES_ID, &ST_INDEXES_INDEX_ID_COL, index_id.into())?
.iter_by_col_eq(&ST_INDEXES_ID, ST_INDEXES_INDEX_ID_COL, index_id.into())?
.last()
.unwrap()
.data;
Expand Down Expand Up @@ -1104,7 +1104,7 @@ impl Inner {
let index_name_col: ColId = ColId(3);
self.iter_by_col_eq(
&ST_INDEXES_ID,
&index_name_col,
index_name_col,
AlgebraicValue::String(index_name.to_owned()),
)
.map(|mut iter| {
Expand Down Expand Up @@ -1195,7 +1195,7 @@ impl Inner {
continue;
}
let st_sequences_table_id_col = ColId(2);
for seq_row in self.iter_by_col_eq(&ST_SEQUENCES_ID, &st_sequences_table_id_col, table_id.into())? {
for seq_row in self.iter_by_col_eq(&ST_SEQUENCES_ID, st_sequences_table_id_col, table_id.into())? {
let seq_row = seq_row.view();
let seq_row = StSequenceRow::try_from(seq_row)?;
if seq_row.col_id != col.col_id {
Expand Down Expand Up @@ -1473,10 +1473,10 @@ impl Inner {
fn iter_by_col_eq(
&self,
table_id: &TableId,
col_id: &ColId,
cols: impl Into<NonEmpty<ColId>>,
value: AlgebraicValue,
) -> super::Result<IterByColEq<'_>> {
self.iter_by_col_range(table_id, col_id, value)
self.iter_by_col_range(table_id, cols.into(), value)
}

/// Returns an iterator,
Expand All @@ -1485,7 +1485,7 @@ impl Inner {
fn iter_by_col_range<'a, R: RangeBounds<AlgebraicValue>>(
&'a self,
table_id: &TableId,
col_id: &ColId,
cols: NonEmpty<ColId>,
range: R,
) -> super::Result<IterByColRange<'a, R>> {
// We have to index_seek in both the committed state and the current tx state.
Expand All @@ -1502,31 +1502,26 @@ impl Inner {
if let Some(inserted_rows) = self
.tx_state
.as_ref()
.and_then(|tx_state| tx_state.index_seek(table_id, NonEmpty::new(*col_id), &range))
.and_then(|tx_state| tx_state.index_seek(table_id, &cols, &range))
{
// The current transaction has modified this table, and the table is indexed.
let tx_state = self.tx_state.as_ref().unwrap();
Ok(IterByColRange::Index(IndexSeekIterInner {
table_id: *table_id,
tx_state,
inserted_rows,
committed_rows: self
.committed_state
.index_seek(table_id, NonEmpty::new(*col_id), &range),
committed_rows: self.committed_state.index_seek(table_id, &cols, &range),
committed_state: &self.committed_state,
}))
} else {
// Either the current transaction has not modified this table, or the table is not
// indexed.
match self
.committed_state
.index_seek(table_id, NonEmpty::new(*col_id), &range)
{
match self.committed_state.index_seek(table_id, &cols, &range) {
//If we don't have `self.tx_state` yet is likely we are running the bootstrap process
Some(committed_rows) => match self.tx_state.as_ref() {
None => Ok(IterByColRange::Scan(ScanIterByColRange {
range,
col_id: *col_id,
cols: NonEmpty::collect(cols.map(|col| col.0)).unwrap(),
scan_iter: self.iter(table_id)?,
})),
Some(tx_state) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter {
Expand All @@ -1538,7 +1533,7 @@ impl Inner {
},
None => Ok(IterByColRange::Scan(ScanIterByColRange {
range,
col_id: *col_id,
cols: NonEmpty::collect(cols.map(|col| col.0)).unwrap(),
scan_iter: self.iter(table_id)?,
})),
}
Expand Down Expand Up @@ -1887,7 +1882,7 @@ impl<R: RangeBounds<AlgebraicValue>> Iterator for IterByColRange<'_, R> {

pub struct ScanIterByColRange<'a, R: RangeBounds<AlgebraicValue>> {
scan_iter: Iter<'a>,
col_id: ColId,
cols: NonEmpty<u32>,
range: R,
}

Expand All @@ -1898,8 +1893,8 @@ impl<R: RangeBounds<AlgebraicValue>> Iterator for ScanIterByColRange<'_, R> {
fn next(&mut self) -> Option<Self::Item> {
for data_ref in &mut self.scan_iter {
let row = data_ref.view();
let value = &row.elements[self.col_id.0 as usize];
if self.range.contains(value) {
let value = row.project_not_empty(&self.cols).unwrap();
if self.range.contains(&value) {
return Some(data_ref);
}
}
Expand All @@ -1920,20 +1915,20 @@ impl TxDatastore for Locking {
&'a self,
tx: &'a Self::TxId,
table_id: TableId,
col_id: ColId,
cols: NonEmpty<ColId>,
range: R,
) -> super::Result<Self::IterByColRange<'a, R>> {
self.iter_by_col_range_mut_tx(tx, table_id, col_id, range)
self.iter_by_col_range_mut_tx(tx, table_id, cols, range)
}

fn iter_by_col_eq_tx<'a>(
&'a self,
tx: &'a Self::TxId,
table_id: TableId,
col_id: ColId,
cols: NonEmpty<ColId>,
value: AlgebraicValue,
) -> super::Result<Self::IterByColEq<'a>> {
self.iter_by_col_eq_mut_tx(tx, table_id, col_id, value)
self.iter_by_col_eq_mut_tx(tx, table_id, cols, value)
}

fn get_tx<'a>(
Expand Down Expand Up @@ -2059,20 +2054,20 @@ impl MutTxDatastore for Locking {
&'a self,
tx: &'a Self::MutTxId,
table_id: TableId,
col_id: ColId,
cols: impl Into<NonEmpty<ColId>>,
range: R,
) -> super::Result<Self::IterByColRange<'a, R>> {
tx.lock.iter_by_col_range(&table_id, &col_id, range)
tx.lock.iter_by_col_range(&table_id, cols.into(), range)
}

fn iter_by_col_eq_mut_tx<'a>(
&'a self,
tx: &'a Self::MutTxId,
table_id: TableId,
col_id: ColId,
cols: impl Into<NonEmpty<ColId>>,
value: AlgebraicValue,
) -> super::Result<Self::IterByColEq<'a>> {
tx.lock.iter_by_col_eq(&table_id, &col_id, value)
tx.lock.iter_by_col_eq(&table_id, cols, value)
}

fn get_mut_tx<'a>(
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/db/datastore/locking_tx_datastore/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ impl Table {
/// Matching is defined by `Ord for AlgebraicValue`.
pub(crate) fn index_seek(
&self,
cols: NonEmpty<ColId>,
cols: &NonEmpty<ColId>,
range: &impl RangeBounds<AlgebraicValue>,
) -> Option<BTreeIndexRangeIter<'_>> {
self.indexes.get(&cols).map(|index| index.seek(range))
self.indexes.get(cols).map(|index| index.seek(range))
}
}
Loading

0 comments on commit ca532c9

Please sign in to comment.