Fixes the restart problem (i.e. rebuilding the datastore from the message log) (#34)

* Fixes the restart problem (i.e. rebuilding the datastore from the message log)

* Remove logging
cloutiertyler committed Aug 1, 2023
1 parent 3da3402 commit 68b4452
Showing 2 changed files with 126 additions and 58 deletions.
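
Before the per-file diffs, a note on the shape of the change: replay previously drove the normal mutable-transaction API from `RelationalDB::open`; this commit moves replay into the datastore itself (`replay_transaction`) and adds a one-time post-replay rebuild step (`rebuild_state_after_replay`). A minimal sketch of the resulting startup sequence, assuming a `Locking` datastore handle and an already-decoded stream of `Transaction`s (the `replay_all` wrapper and `transactions` iterator are illustrative, not part of the commit):

// Hypothetical driver for the two entry points added in this commit.
fn replay_all(
    datastore: &Locking,
    transactions: impl Iterator<Item = Transaction>,
    odb: Arc<std::sync::Mutex<Box<dyn ObjectDB + Send>>>,
) -> Result<(), DBError> {
    // Apply every logged write directly to committed state,
    // bypassing the mutable-transaction machinery.
    for transaction in transactions {
        datastore.replay_transaction(&transaction, odb.clone())?;
    }
    // Then rebuild derived state (empty tables, indexes, sequences) once.
    datastore.rebuild_state_after_replay()?;
    Ok(())
}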
137 changes: 118 additions & 19 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
@@ -32,8 +32,9 @@ use crate::{
        },
        messages::{
            transaction::Transaction,
            write::{self, Write},
            write::{self, Operation, Write},
        },
        ostorage::ObjectDB,
    },
    error::{DBError, IndexError, TableError},
};
@@ -410,6 +411,32 @@ impl Inner {
        Ok(())
    }

    /// After replaying all old transactions, tables which have rows will
    /// have been created in memory, but tables with no rows will not have
    /// been created. This function ensures that they are created.
    fn build_missing_tables(&mut self) -> super::Result<()> {
        let st_tables = self.committed_state.tables.get(&ST_TABLES_ID).unwrap();
        let rows = st_tables.scan_rows().cloned().collect::<Vec<_>>();
        for row in rows {
            let table_row = StTableRow::try_from(&row)?;
            let table_id = TableId(table_row.table_id);
            let schema = self.schema_for_table(table_id)?;
            let row_type = self.row_type_for_table(table_id)?;
            if self.committed_state.get_table(&table_id).is_none() {
                self.committed_state.tables.insert(
                    table_id,
                    Table {
                        row_type,
                        schema,
                        indexes: HashMap::new(),
                        rows: BTreeMap::new(),
                    },
                );
            }
        }
        Ok(())
    }

    fn drop_table_from_st_tables(&mut self, table_id: TableId) -> super::Result<()> {
        const ST_TABLES_TABLE_ID_COL: ColId = ColId(0);
        let value = AlgebraicValue::U32(table_id.0);
@@ -963,7 +990,10 @@ impl Inner {
    }

    fn table_exists(&self, table_id: &TableId) -> bool {
        self.tx_state.as_ref().unwrap().insert_tables.contains_key(table_id)
        self.tx_state
            .as_ref()
            .map(|tx_state| tx_state.insert_tables.contains_key(table_id))
            .unwrap_or(false)
            || self.committed_state.tables.contains_key(table_id)
    }

@@ -1023,7 +1053,7 @@ impl Inner {

    #[tracing::instrument(skip_all)]
    fn insert_row(&mut self, table_id: TableId, mut row: ProductValue) -> super::Result<ProductValue> {
        // TODO: Excuting schema_for_table for every row insert is expensive.
        // TODO: Executing schema_for_table for every row insert is expensive.
        // We should store the schema in the [Table] struct instead.
        let schema = self.schema_for_table(table_id)?;
        for col in schema.columns {
@@ -1191,8 +1221,7 @@ impl Inner {
                return Ok(self
                    .tx_state
                    .as_ref()
                    .unwrap()
                    .get_row(table_id, row_id)
                    .and_then(|tx_state| tx_state.get_row(table_id, row_id))
                    .map(|row| DataRef::new(row.clone())));
            }
            RowOp::Delete => {
@@ -1212,9 +1241,7 @@ impl Inner {
        if let Some(row_type) = self
            .tx_state
            .as_ref()
            .unwrap()
            .insert_tables
            .get(table_id)
            .and_then(|tx_state| tx_state.insert_tables.get(table_id))
            .map(|table| table.get_row_type())
        {
            return Some(row_type);
@@ -1229,9 +1256,7 @@ impl Inner {
        if let Some(schema) = self
            .tx_state
            .as_ref()
            .unwrap()
            .insert_tables
            .get(table_id)
            .and_then(|tx_state| tx_state.insert_tables.get(table_id))
            .map(|table| table.get_schema())
        {
            return Some(schema);
@@ -1301,8 +1326,12 @@ impl Inner {
    /// yielding every row in the table identified by `table_id`,
    /// where the column data identified by `col_id` equates to `value`.
    fn seek<'a>(&'a self, table_id: &TableId, col_id: &ColId, value: &'a AlgebraicValue) -> super::Result<SeekIter> {
        let tx_state = self.tx_state.as_ref().unwrap();
        if let Some(inserted_rows) = tx_state.index_seek(table_id, col_id, value) {
        if let Some(inserted_rows) = self
            .tx_state
            .as_ref()
            .and_then(|tx_state| tx_state.index_seek(table_id, col_id, value))
        {
            let tx_state = self.tx_state.as_ref().unwrap();
            Ok(SeekIter::Index(IndexSeekIter {
                value,
                col_id: *col_id,
@@ -1374,9 +1403,66 @@ impl Locking {
            inner: Arc::new(Mutex::new(datastore)),
        })
    }

    /// The purpose of this is to rebuild the state of the datastore
    /// after having inserted all of the rows from the message log.
    /// This is necessary because, for example, inserting a row into `st_table`
    /// is not equivalent to calling `create_table`.
    /// There may eventually be a better way to do this, but this will have to do for now.
    pub fn rebuild_state_after_replay(&self) -> Result<(), DBError> {
        let mut inner = self.inner.lock();

        // `build_missing_tables` must be called before `build_indexes`.
        // Honestly this should maybe just be one big procedure.
        // See John Carmack's philosophy on this.
        inner.build_missing_tables()?;
        inner.build_indexes()?;
        inner.build_sequence_state()?;

        Ok(())
    }

    pub fn replay_transaction(
        &self,
        transaction: &Transaction,
        odb: Arc<std::sync::Mutex<Box<dyn ObjectDB + Send>>>,
    ) -> Result<(), DBError> {
        let mut inner = self.inner.lock();
        for write in &transaction.writes {
            let table_id = TableId(write.set_id);
            let schema = inner.schema_for_table(table_id)?;
            let row_type = inner.row_type_for_table(table_id)?;
            let table = inner.committed_state.tables.entry(table_id).or_insert(Table {
                row_type: row_type.clone(),
                schema,
                indexes: HashMap::new(),
                rows: BTreeMap::new(),
            });
            match write.operation {
                Operation::Delete => {
                    table.rows.remove(&RowId(write.data_key));
                }
                Operation::Insert => {
                    let product_value = match write.data_key {
                        DataKey::Data(data) => ProductValue::decode(&row_type, &mut &data[..]).unwrap_or_else(|_| {
                            panic!("Couldn't decode product value to {:?} from message log", row_type)
                        }),
                        DataKey::Hash(hash) => {
                            let data = odb.lock().unwrap().get(hash).unwrap();
                            ProductValue::decode(&row_type, &mut &data[..]).unwrap_or_else(|_| {
                                panic!("Couldn't decode product value to {:?} from message log", row_type)
                            })
                        }
                    };
                    table.rows.insert(RowId(write.data_key), product_value);
                }
            }
        }
        Ok(())
    }
}

#[derive(Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct RowId(pub(crate) DataKey);

impl DataRow for Locking {
@@ -1446,15 +1532,28 @@ impl Iterator for ScanIter<'_> {
                }
                ScanStage::Committed { iter } => {
                    for (row_id, row) in iter {
                        match self.inner.tx_state.as_ref().unwrap().get_row_op(&self.table_id, row_id) {
                            RowOp::Insert => (), // Do nothing, we'll get it in the next stage
                            RowOp::Delete => (), // Skip it, it's been deleted
                            RowOp::Absent => {
                        match self
                            .inner
                            .tx_state
                            .as_ref()
                            .map(|tx_state| tx_state.get_row_op(&self.table_id, row_id))
                        {
                            Some(RowOp::Insert) => (), // Do nothing, we'll get it in the next stage
                            Some(RowOp::Delete) => (), // Skip it, it's been deleted
                            Some(RowOp::Absent) => {
                                return Some(DataRef::new(row.clone()));
                            }
                            None => {
                                return Some(DataRef::new(row.clone()));
                            }
                        }
                    }
                    if let Some(table) = self
                        .inner
                        .tx_state
                        .as_ref()
                        .and_then(|tx_state| tx_state.insert_tables.get(&self.table_id))
                    {
                        self.stage = ScanStage::CurrentTx {
                            iter: table.rows.iter(),
                        };
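
A pattern recurs throughout this file: every `self.tx_state.as_ref().unwrap()` becomes an `Option`-aware chain, presumably because replay now runs against `committed_state` with no mutable transaction open, so `tx_state` can legitimately be `None`. A condensed before/after sketch (the `pending_contains` helper is a hypothetical name; the fields are as used in the diff above):

// Before: panics whenever no transaction is open.
// self.tx_state.as_ref().unwrap().insert_tables.contains_key(table_id)

// After: treats "no open transaction" as "no pending inserts".
fn pending_contains(tx_state: Option<&TxState>, table_id: &TableId) -> bool {
    tx_state
        .map(|tx| tx.insert_tables.contains_key(table_id))
        .unwrap_or(false)
}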
47 changes: 8 additions & 39 deletions crates/core/src/db/relational_db.rs
@@ -10,7 +10,6 @@ use crate::db::db_metrics::{
    RDB_DELETE_IN_TIME, RDB_DELETE_PK_TIME, RDB_DROP_TABLE_TIME, RDB_INSERT_TIME, RDB_SCAN_TIME,
};
use crate::db::messages::commit::Commit;
use crate::db::messages::write::Operation;
use crate::db::ostorage::hashmap_object_db::HashMapObjectDB;
use crate::db::ostorage::ObjectDB;
use crate::error::{DBError, DatabaseError, TableError};
@@ -96,47 +95,17 @@ impl RelationalDB {
            // is just to reduce memory usage while inserting. We don't
            // really care about inserting these transactionally as long
            // as all of the writes get inserted.
            let mut tx = datastore.begin_mut_tx();
            for write in &transaction.writes {
                let table_id = TableId(write.set_id);
                match write.operation {
                    Operation::Delete => {
                        datastore
                            .delete_row_mut_tx(&mut tx, table_id, RowId(write.data_key))
                            .unwrap();
                    }
                    Operation::Insert => {
                        let row_type = datastore.row_type_for_table_mut_tx(&tx, table_id).unwrap();
                        match write.data_key {
                            DataKey::Data(data) => {
                                let product_value = ProductValue::decode(&row_type, &mut &data[..])
                                    .unwrap_or_else(|_| {
                                        panic!(
                                            "Couldn't decode product value to {:?} from message log",
                                            row_type
                                        )
                                    });
                                datastore.insert_row_mut_tx(&mut tx, table_id, product_value).unwrap();
                            }
                            DataKey::Hash(hash) => {
                                let data = odb.lock().unwrap().get(hash).unwrap();
                                let product_value = ProductValue::decode(&row_type, &mut &data[..])
                                    .unwrap_or_else(|_| {
                                        panic!(
                                            "Couldn't decode product value to {:?} from message log",
                                            row_type
                                        )
                                    });
                                datastore.insert_row_mut_tx(&mut tx, table_id, product_value).unwrap();
                            }
                        };
                    }
                }
            }
            datastore.commit_mut_tx(tx).unwrap();
            datastore.replay_transaction(&transaction, odb.clone())?;
        }
    }

    // The purpose of this is to rebuild the state of the datastore
    // after having inserted all of the rows from the message log.
    // This is necessary because, for example, inserting a row into `st_table`
    // is not equivalent to calling `create_table`.
    // There may eventually be a better way to do this, but this will have to do for now.
    datastore.rebuild_state_after_replay()?;

    let commit_offset = if let Some(last_commit_offset) = last_commit_offset {
        last_commit_offset + 1
    } else {
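
The insert arm of `replay_transaction` (like the removed loop above) decodes each row one of two ways depending on its `DataKey`: `DataKey::Data` carries the row bytes inline in the message log, while `DataKey::Hash` names content stored in the object database. A consolidated sketch of the two paths (the `decode_row` helper is hypothetical, and `get` returning owned bytes plus `Hash` being `Copy` are assumptions):

// Hypothetical consolidation of the two decode arms shown above.
fn decode_row(
    row_type: &ProductType,
    data_key: &DataKey,
    odb: &std::sync::Mutex<Box<dyn ObjectDB + Send>>,
) -> ProductValue {
    let bytes = match data_key {
        // Row bytes carried inline in the write itself.
        DataKey::Data(data) => data[..].to_vec(),
        // Row bytes stored out-of-line, keyed by content hash.
        DataKey::Hash(hash) => odb.lock().unwrap().get(*hash).unwrap().to_vec(),
    };
    ProductValue::decode(row_type, &mut &bytes[..])
        .unwrap_or_else(|_| panic!("Couldn't decode product value to {:?} from message log", row_type))
}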
