Skip to content

Commit

Permalink
Make the corresponding change for delete as well
Browse files Browse the repository at this point in the history
  • Loading branch information
kulakowski committed Aug 11, 2023
1 parent d271997 commit adafcd3
Showing 1 changed file with 49 additions and 27 deletions.
76 changes: 49 additions & 27 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,19 @@ struct TxState {
delete_tables: HashMap<TableId, BTreeSet<RowId>>,
}

/// Represents whether a row has been inserted, deleted, or has not been
/// alterted in the current transaction.
enum RowOp {
/// Represents whether a row has been previously committed, inserted
/// or deleted this transaction, or simply not present at all.
enum RowState {
/// The row is present in the table because it was inserted
/// in a previously committed transaction.
Committed(ProductValue),
/// The row is present because it has been inserted in the
/// current transaction.
Insert(ProductValue),
/// The row is absent because it has been deleted in the
/// current transaction.
Delete,
/// The row is not present in the table.
Absent,
}

Expand All @@ -244,17 +252,17 @@ impl TxState {
}
}

pub fn get_row_op(&self, table_id: &TableId, row_id: &RowId) -> RowOp {
pub fn get_row_op(&self, table_id: &TableId, row_id: &RowId) -> RowState {
if let Some(true) = self.delete_tables.get(table_id).map(|set| set.contains(row_id)) {
return RowOp::Delete;
return RowState::Delete;
}
let Some(table) = self.insert_tables.get(table_id) else {
return RowOp::Absent;
return RowState::Absent;
};
table
.get_row(row_id)
.map(|pv| RowOp::Insert(pv.clone()))
.unwrap_or(RowOp::Absent)
.map(|pv| RowState::Insert(pv.clone()))
.unwrap_or(RowState::Absent)
}

pub fn get_row(&self, table_id: &TableId, row_id: &RowId) -> Option<&ProductValue> {
Expand Down Expand Up @@ -1035,20 +1043,21 @@ impl Inner {
})
}

fn contains_row(&self, table_id: &TableId, row_id: &RowId) -> RowOp {
fn contains_row(&self, table_id: &TableId, row_id: &RowId) -> RowState {
match self.tx_state.as_ref().unwrap().get_row_op(table_id, row_id) {
RowOp::Insert(pv) => return RowOp::Insert(pv),
RowOp::Delete => return RowOp::Delete,
RowOp::Absent => (),
RowState::Committed(_) => unreachable!("a row cannot be committed in a tx state"),
RowState::Insert(pv) => return RowState::Insert(pv),
RowState::Delete => return RowState::Delete,
RowState::Absent => (),
}
match self
.committed_state
.tables
.get(table_id)
.and_then(|table| table.rows.get(row_id))
{
Some(pv) => RowOp::Insert(pv.clone()),
None => RowOp::Absent,
Some(pv) => RowState::Committed(pv.clone()),
None => RowState::Absent,
}
}

Expand Down Expand Up @@ -1287,13 +1296,14 @@ impl Inner {
return Err(TableError::IdNotFound(table_id.0).into());
}
match self.tx_state.as_ref().unwrap().get_row_op(table_id, row_id) {
RowOp::Insert(row) => {
RowState::Committed(_) => unreachable!("a row cannot be committed in a tx state"),
RowState::Insert(row) => {
return Ok(Some(DataRef::new(row)));
}
RowOp::Delete => {
RowState::Delete => {
return Ok(None);
}
RowOp::Absent => {}
RowState::Absent => {}
}
Ok(self
.committed_state
Expand Down Expand Up @@ -1338,19 +1348,30 @@ impl Inner {
}

fn delete_row_internal(&mut self, table_id: &TableId, row_id: &RowId) -> bool {
if let RowOp::Insert(_) = self.contains_row(table_id, row_id) {
if let Some(table) = self.tx_state.as_mut().unwrap().get_insert_table_mut(table_id) {
table.delete(row_id);
} else {
match self.contains_row(table_id, row_id) {
RowState::Committed(_) => {
// If the row is present because of a previously committed transaction,
// we need to add it to the appropriate delete_table.
self.tx_state
.as_mut()
.unwrap()
.get_or_create_delete_table(*table_id)
.insert(*row_id);
// True because we did delete the row.
true
}
RowState::Insert(_) => {
// If the row is present because of a an insertion in this transaction,
// we need to remove it from the appropriate insert_table.
let insert_table = self.tx_state.as_mut().unwrap().get_insert_table_mut(table_id).unwrap();
insert_table.delete(row_id);
// True because we did delete a row.
true
}
RowState::Delete | RowState::Absent => {
// In either case, there's nothing to delete.
false
}
true
} else {
false
}
}

Expand Down Expand Up @@ -1634,9 +1655,10 @@ impl Iterator for Iter<'_> {
.as_ref()
.map(|tx_state| tx_state.get_row_op(&self.table_id, row_id))
{
Some(RowOp::Insert(_)) => (), // Do nothing, we'll get it in the next stage
Some(RowOp::Delete) => (), // Skip it, it's been deleted
Some(RowOp::Absent) => {
Some(RowState::Committed(_)) => unreachable!("a row cannot be committed in a tx state"),
Some(RowState::Insert(_)) => (), // Do nothing, we'll get it in the next stage
Some(RowState::Delete) => (), // Skip it, it's been deleted
Some(RowState::Absent) => {
return Some(DataRef::new(row.clone()));
}
None => {
Expand Down

0 comments on commit adafcd3

Please sign in to comment.