From d8576bfcc94c0b79f2756703e4e1ad35b658b448 Mon Sep 17 00:00:00 2001
From: Tyler Cloutier
Date: Thu, 29 Jun 2023 21:17:46 -0700
Subject: [PATCH] Fixes the restart problem (i.e. rebuilding the datastore
 from the message log) (#34)

* Fixes the restart problem (i.e. rebuilding the datastore from the message log)

* Remove logging
---
 .../db/datastore/locking_tx_datastore/mod.rs | 137 +++++++++++++++---
 crates/core/src/db/relational_db.rs          |  47 +-----
 2 files changed, 126 insertions(+), 58 deletions(-)

diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs
index 71f472c2d5..088c73f472 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs
@@ -32,8 +32,9 @@ use crate::{
         },
         messages::{
             transaction::Transaction,
-            write::{self, Write},
+            write::{self, Operation, Write},
         },
+        ostorage::ObjectDB,
     },
     error::{DBError, IndexError, TableError},
 };
@@ -410,6 +411,32 @@ impl Inner {
         Ok(())
     }
 
+    /// After replaying all old transactions, tables which have rows will
+    /// have been created in memory, but tables with no rows will not have
+    /// been created. This function ensures that they are created.
+    fn build_missing_tables(&mut self) -> super::Result<()> {
+        let st_tables = self.committed_state.tables.get(&ST_TABLES_ID).unwrap();
+        let rows = st_tables.scan_rows().cloned().collect::<Vec<_>>();
+        for row in rows {
+            let table_row = StTableRow::try_from(&row)?;
+            let table_id = TableId(table_row.table_id);
+            let schema = self.schema_for_table(table_id)?;
+            let row_type = self.row_type_for_table(table_id)?;
+            if self.committed_state.get_table(&table_id).is_none() {
+                self.committed_state.tables.insert(
+                    table_id,
+                    Table {
+                        row_type,
+                        schema,
+                        indexes: HashMap::new(),
+                        rows: BTreeMap::new(),
+                    },
+                );
+            }
+        }
+        Ok(())
+    }
+
     fn drop_table_from_st_tables(&mut self, table_id: TableId) -> super::Result<()> {
         const ST_TABLES_TABLE_ID_COL: ColId = ColId(0);
         let value = AlgebraicValue::U32(table_id.0);
@@ -963,7 +990,10 @@ impl Inner {
     }
 
     fn table_exists(&self, table_id: &TableId) -> bool {
-        self.tx_state.as_ref().unwrap().insert_tables.contains_key(table_id)
+        self.tx_state
+            .as_ref()
+            .map(|tx_state| tx_state.insert_tables.contains_key(table_id))
+            .unwrap_or(false)
             || self.committed_state.tables.contains_key(table_id)
     }
 
@@ -1023,7 +1053,7 @@ impl Inner {
     }
     #[tracing::instrument(skip_all)]
     fn insert_row(&mut self, table_id: TableId, mut row: ProductValue) -> super::Result<RowId> {
-        // TODO: Excuting schema_for_table for every row insert is expensive.
+        // TODO: Executing schema_for_table for every row insert is expensive.
         // We should store the schema in the [Table] struct instead.
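+        // A rough sketch of that fix (an assumption, not part of this patch:
+        // it presumes the `schema` field on `Table` is kept up to date on DDL
+        // changes, and ignores tx-local insert tables for brevity). Resolving
+        // the schema would become a map lookup instead of a scan over the
+        // system tables on every insert:
+        //
+        //     let schema = self
+        //         .committed_state
+        //         .get_table(&table_id)
+        //         .map(|table| table.schema.clone());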
         let schema = self.schema_for_table(table_id)?;
         for col in schema.columns {
@@ -1191,8 +1221,7 @@ impl Inner {
                 return Ok(self
                     .tx_state
                     .as_ref()
-                    .unwrap()
-                    .get_row(table_id, row_id)
+                    .and_then(|tx_state| tx_state.get_row(table_id, row_id))
                     .map(|row| DataRef::new(row.clone())));
             }
             RowOp::Delete => {
@@ -1212,9 +1241,7 @@ impl Inner {
         if let Some(row_type) = self
             .tx_state
             .as_ref()
-            .unwrap()
-            .insert_tables
-            .get(table_id)
+            .and_then(|tx_state| tx_state.insert_tables.get(table_id))
             .map(|table| table.get_row_type())
         {
             return Some(row_type);
@@ -1229,9 +1256,7 @@ impl Inner {
         if let Some(schema) = self
             .tx_state
             .as_ref()
-            .unwrap()
-            .insert_tables
-            .get(table_id)
+            .and_then(|tx_state| tx_state.insert_tables.get(table_id))
             .map(|table| table.get_schema())
         {
             return Some(schema);
@@ -1301,8 +1326,12 @@ impl Inner {
     /// yielding every row in the table identified by `table_id`,
     /// where the column data identified by `col_id` equates to `value`.
     fn seek<'a>(&'a self, table_id: &TableId, col_id: &ColId, value: &'a AlgebraicValue) -> super::Result<SeekIter<'a>> {
-        let tx_state = self.tx_state.as_ref().unwrap();
-        if let Some(inserted_rows) = tx_state.index_seek(table_id, col_id, value) {
+        if let Some(inserted_rows) = self
+            .tx_state
+            .as_ref()
+            .and_then(|tx_state| tx_state.index_seek(table_id, col_id, value))
+        {
+            let tx_state = self.tx_state.as_ref().unwrap();
             Ok(SeekIter::Index(IndexSeekIter {
                 value,
                 col_id: *col_id,
@@ -1374,9 +1403,66 @@ impl Locking {
             inner: Arc::new(Mutex::new(datastore)),
         })
     }
+
+    /// The purpose of this is to rebuild the state of the datastore
+    /// after having inserted all of the rows from the message log.
+    /// This is necessary because, for example, inserting a row into `st_table`
+    /// is not equivalent to calling `create_table`.
+    /// There may eventually be a better way to do this, but this will have to do for now.
+    pub fn rebuild_state_after_replay(&self) -> Result<(), DBError> {
+        let mut inner = self.inner.lock();
+
+        // `build_missing_tables` must be called before `build_indexes`.
+        // Honestly this should maybe just be one big procedure.
+        // See John Carmack's philosophy on this.
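+        // Ordering note (inferred from the calls below, not stated elsewhere
+        // in this patch): indexes can only be built over tables that already
+        // exist in memory, and the sequence state is rebuilt from rows in the
+        // system tables, so tables that were empty in the message log must be
+        // materialized first. For example, a table created via `create_table`
+        // but never written to produces only an `st_tables` row in the log;
+        // replay recreates that row but not the in-memory `Table`, which
+        // `build_missing_tables` then fills in.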
+        inner.build_missing_tables()?;
+        inner.build_indexes()?;
+        inner.build_sequence_state()?;
+
+        Ok(())
+    }
+
+    pub fn replay_transaction(
+        &self,
+        transaction: &Transaction,
+        odb: Arc<Mutex<Box<dyn ObjectDB + Send>>>,
+    ) -> Result<(), DBError> {
+        let mut inner = self.inner.lock();
+        for write in &transaction.writes {
+            let table_id = TableId(write.set_id);
+            let schema = inner.schema_for_table(table_id)?;
+            let row_type = inner.row_type_for_table(table_id)?;
+            let table = inner.committed_state.tables.entry(table_id).or_insert(Table {
+                row_type: row_type.clone(),
+                schema,
+                indexes: HashMap::new(),
+                rows: BTreeMap::new(),
+            });
+            match write.operation {
+                Operation::Delete => {
+                    table.rows.remove(&RowId(write.data_key));
+                }
+                Operation::Insert => {
+                    let product_value = match write.data_key {
+                        DataKey::Data(data) => ProductValue::decode(&row_type, &mut &data[..]).unwrap_or_else(|_| {
+                            panic!("Couldn't decode product value to {:?} from message log", row_type)
+                        }),
+                        DataKey::Hash(hash) => {
+                            let data = odb.lock().unwrap().get(hash).unwrap();
+                            ProductValue::decode(&row_type, &mut &data[..]).unwrap_or_else(|_| {
+                                panic!("Couldn't decode product value to {:?} from message log", row_type)
+                            })
+                        }
+                    };
+                    table.rows.insert(RowId(write.data_key), product_value);
+                }
+            }
+        }
+        Ok(())
+    }
 }
 
-#[derive(Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
+#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
 pub struct RowId(pub(crate) DataKey);
 
 impl DataRow for Locking {
@@ -1446,15 +1532,28 @@ impl Iterator for ScanIter<'_> {
                 }
                 ScanStage::Committed { iter } => {
                     for (row_id, row) in iter {
-                        match self.inner.tx_state.as_ref().unwrap().get_row_op(&self.table_id, row_id) {
-                            RowOp::Insert => (), // Do nothing, we'll get it in the next stage
-                            RowOp::Delete => (), // Skip it, it's been deleted
-                            RowOp::Absent => {
+                        match self
+                            .inner
+                            .tx_state
+                            .as_ref()
+                            .map(|tx_state| tx_state.get_row_op(&self.table_id, row_id))
+                        {
+                            Some(RowOp::Insert) => (), // Do nothing, we'll get it in the next stage
+                            Some(RowOp::Delete) => (), // Skip it, it's been deleted
+                            Some(RowOp::Absent) => {
+                                return Some(DataRef::new(row.clone()));
+                            }
+                            None => {
                                 return Some(DataRef::new(row.clone()));
                             }
                         }
                     }
-                    if let Some(table) = self.inner.tx_state.as_ref().unwrap().insert_tables.get(&self.table_id) {
+                    if let Some(table) = self
+                        .inner
+                        .tx_state
+                        .as_ref()
+                        .and_then(|tx_state| tx_state.insert_tables.get(&self.table_id))
+                    {
                         self.stage = ScanStage::CurrentTx {
                             iter: table.rows.iter(),
                         };

diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs
index b9ef844802..1827b7258f 100644
--- a/crates/core/src/db/relational_db.rs
+++ b/crates/core/src/db/relational_db.rs
@@ -10,7 +10,6 @@ use crate::db::db_metrics::{
     RDB_DELETE_IN_TIME, RDB_DELETE_PK_TIME, RDB_DROP_TABLE_TIME, RDB_INSERT_TIME, RDB_SCAN_TIME,
 };
 use crate::db::messages::commit::Commit;
-use crate::db::messages::write::Operation;
 use crate::db::ostorage::hashmap_object_db::HashMapObjectDB;
 use crate::db::ostorage::ObjectDB;
 use crate::error::{DBError, DatabaseError, TableError};
@@ -96,47 +95,17 @@ impl RelationalDB {
                 // is just to reduce memory usage while inserting. We don't
                 // really care about inserting these transactionally as long
                 // as all of the writes get inserted.
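+                // Replay is now delegated to `replay_transaction` (added above
+                // in locking_tx_datastore), which writes rows directly into
+                // committed state, so the per-transaction begin/commit
+                // bookkeeping that used to wrap this loop body is no longer
+                // needed here.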
-                let mut tx = datastore.begin_mut_tx();
-                for write in &transaction.writes {
-                    let table_id = TableId(write.set_id);
-                    match write.operation {
-                        Operation::Delete => {
-                            datastore
-                                .delete_row_mut_tx(&mut tx, table_id, RowId(write.data_key))
-                                .unwrap();
-                        }
-                        Operation::Insert => {
-                            let row_type = datastore.row_type_for_table_mut_tx(&tx, table_id).unwrap();
-                            match write.data_key {
-                                DataKey::Data(data) => {
-                                    let product_value = ProductValue::decode(&row_type, &mut &data[..])
-                                        .unwrap_or_else(|_| {
-                                            panic!(
-                                                "Couldn't decode product value to {:?} from message log",
-                                                row_type
-                                            )
-                                        });
-                                    datastore.insert_row_mut_tx(&mut tx, table_id, product_value).unwrap();
-                                }
-                                DataKey::Hash(hash) => {
-                                    let data = odb.lock().unwrap().get(hash).unwrap();
-                                    let product_value = ProductValue::decode(&row_type, &mut &data[..])
-                                        .unwrap_or_else(|_| {
-                                            panic!(
-                                                "Couldn't decode product value to {:?} from message log",
-                                                row_type
-                                            )
-                                        });
-                                    datastore.insert_row_mut_tx(&mut tx, table_id, product_value).unwrap();
-                                }
-                            };
-                        }
-                    }
-                }
-                datastore.commit_mut_tx(tx).unwrap();
+                datastore.replay_transaction(&transaction, odb.clone())?;
             }
         }
 
+        // The purpose of this is to rebuild the state of the datastore
+        // after having inserted all of the rows from the message log.
+        // This is necessary because, for example, inserting a row into `st_table`
+        // is not equivalent to calling `create_table`.
+        // There may eventually be a better way to do this, but this will have to do for now.
+        datastore.rebuild_state_after_replay()?;
+
         let commit_offset = if let Some(last_commit_offset) = last_commit_offset {
             last_commit_offset + 1
         } else {