Skip to content

Commit

Permalink
Actually use indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
kulakowski committed Jul 20, 2023
1 parent 2f9390e commit 6e9bf4b
Showing 1 changed file with 74 additions and 8 deletions.
82 changes: 74 additions & 8 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,10 +705,19 @@ impl Inner {

// Look up the table_name for the table in question.
let table_id_col: ColId = ColId(0);
let rows = self
.iter_by_col_eq(&ST_TABLES_ID, &table_id_col, &AlgebraicValue::U32(table_id.0))?
.collect::<Vec<_>>();

// TODO(george): As part of the bootstrapping process, we add a bunch of rows
// and only at very end do we patch things up and create table metadata, indexes,
// and so on. Early parts of that process insert rows, and need the schema to do
// so. We can't just call iter_by_col_eq here as that would attempt to use the
// index which we haven't created yet. So instead we just manually Scan here.
let rows = IterByColEq::Scan(ScanIterByColEq {
value: &AlgebraicValue::U32(table_id.0),
col_id: table_id_col,
scan_iter: self.iter(&ST_TABLES_ID)?,
}).collect::<Vec<_>>();
assert!(rows.len() <= 1, "Expected at most one row in st_tables for table_id");

let row = rows.first().ok_or_else(|| TableError::IdNotFound(table_id.0))?;
let el = StTableRow::try_from(row.view())?;
let table_name = el.table_name.to_owned();
Expand Down Expand Up @@ -1342,11 +1351,23 @@ impl Inner {
col_id: &ColId,
value: &'a AlgebraicValue,
) -> super::Result<IterByColEq> {
// We have to index_seek in both the committed state and the current tx state.
// First, we will check modifications in the current tx. It may be that the table
// has not been modified yet in the current tx, in which case we will only search
// the committed state. Finally, the table may not be indexed at all, in which case
// we fall back to iterating the entire table.

// We need to check the tx_state first. In particular, it may be that the index
// was only added in the current transaction.
// TODO(george): It's unclear that we truly support dynamically creating an index
// yet. In particular, I don't know if creating an index in a transaction and
// rolling it back will leave the index in place.
if let Some(inserted_rows) = self
.tx_state
.as_ref()
.and_then(|tx_state| tx_state.index_seek(table_id, col_id, value))
{
// The current transaction has modified this table, and the table is indexed.
let tx_state = self.tx_state.as_ref().unwrap();
Ok(IterByColEq::Index(IndexIterByColEq {
value,
Expand All @@ -1360,11 +1381,21 @@ impl Inner {
},
}))
} else {
Ok(IterByColEq::Scan(ScanIterByColEq {
value,
col_id: *col_id,
scan_iter: self.iter(table_id)?,
}))
// Either the current transaction has not modified this table, or the table is not
// indexed.
match self.committed_state.index_seek(table_id, col_id, value) {
Some(committed_rows) => Ok(IterByColEq::CommittedIndex(CommittedIndexIterByColEq {
table_id: *table_id,
tx_state: self.tx_state.as_ref().unwrap(),
committed_state: &self.committed_state,
committed_rows,
})),
None => Ok(IterByColEq::Scan(ScanIterByColEq {
value,
col_id: *col_id,
scan_iter: self.iter(table_id)?,
})),
}
}
}

Expand Down Expand Up @@ -1594,6 +1625,7 @@ impl Iterator for Iter<'_> {
pub enum IterByColEq<'a> {
Scan(ScanIterByColEq<'a>),
Index(IndexIterByColEq<'a>),
CommittedIndex(CommittedIndexIterByColEq<'a>),
}

impl Iterator for IterByColEq<'_> {
Expand All @@ -1603,6 +1635,7 @@ impl Iterator for IterByColEq<'_> {
match self {
IterByColEq::Scan(seek) => seek.next(),
IterByColEq::Index(seek) => seek.next(),
IterByColEq::CommittedIndex(seek) => seek.next(),
}
}
}
Expand Down Expand Up @@ -1689,6 +1722,39 @@ impl Iterator for IndexSeekIterInner<'_> {
}
}

pub struct CommittedIndexIterByColEq<'a> {
table_id: TableId,
tx_state: &'a TxState,
committed_state: &'a CommittedState,
committed_rows: BTreeIndexRangeIter<'a>,
}

impl<'a> Iterator for CommittedIndexIterByColEq<'a> {
type Item = DataRef;

fn next(&mut self) -> Option<Self::Item> {
if let Some(row_id) = self.committed_rows.find(|row_id| {
!self
.tx_state
.delete_tables
.get(&self.table_id)
.map_or(false, |table| table.contains(row_id))
}) {
return Some(DataRef::new(
self.committed_state
.tables
.get(&self.table_id)
.unwrap()
.get_row(&row_id)
.unwrap()
.clone(),
));
}

None
}
}

pub enum IterByColRange<'a, R: RangeBounds<AlgebraicValue>> {
Scan(ScanIterByColRange<'a, R>),
// TODO: Index(IndexRangeScanIter<'a>),
Expand Down

0 comments on commit 6e9bf4b

Please sign in to comment.