Fixes the restart problem (i.e. rebuilding the datastore from the message log) #34

Merged 2 commits on Jun 30, 2023
137 changes: 118 additions & 19 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
@@ -32,8 +32,9 @@ use crate::{
},
messages::{
transaction::Transaction,
write::{self, Write},
write::{self, Operation, Write},
},
ostorage::ObjectDB,
},
error::{DBError, IndexError, TableError},
};
@@ -408,6 +409,32 @@ impl Inner {
Ok(())
}

    /// After replaying all old transactions, tables which have rows will
    /// have been created in memory, but tables with no rows will not have
    /// been created. This function ensures that they are created.
    fn build_missing_tables(&mut self) -> super::Result<()> {
        let st_tables = self.committed_state.tables.get(&ST_TABLES_ID).unwrap();
        let rows = st_tables.scan_rows().cloned().collect::<Vec<_>>();
        for row in rows {
            let table_row = StTableRow::try_from(&row)?;
            let table_id = TableId(table_row.table_id);
            let schema = self.schema_for_table(table_id)?;
            let row_type = self.row_type_for_table(table_id)?;
            if self.committed_state.get_table(&table_id).is_none() {
                self.committed_state.tables.insert(
                    table_id,
                    Table {
                        row_type,
                        schema,
                        indexes: HashMap::new(),
                        rows: BTreeMap::new(),
                    },
                );
            }
        }
        Ok(())
    }

fn drop_table_from_st_tables(&mut self, table_id: TableId) -> super::Result<()> {
const ST_TABLES_TABLE_ID_COL: ColId = ColId(0);
let value = AlgebraicValue::U32(table_id.0);
@@ -961,7 +988,10 @@ impl Inner {
}

fn table_exists(&self, table_id: &TableId) -> bool {
self.tx_state.as_ref().unwrap().insert_tables.contains_key(table_id)
self.tx_state
.as_ref()
.map(|tx_state| tx_state.insert_tables.contains_key(table_id))
.unwrap_or(false)
|| self.committed_state.tables.contains_key(table_id)
}

@@ -1021,7 +1051,7 @@ impl Inner {

#[tracing::instrument(skip_all)]
fn insert_row(&mut self, table_id: TableId, mut row: ProductValue) -> super::Result<ProductValue> {
// TODO: Excuting schema_for_table for every row insert is expensive.
// TODO: Executing schema_for_table for every row insert is expensive.
// We should store the schema in the [Table] struct instead.
let schema = self.schema_for_table(table_id)?;
for col in schema.columns {
@@ -1189,8 +1219,7 @@ impl Inner {
return Ok(self
.tx_state
.as_ref()
.unwrap()
.get_row(table_id, row_id)
.and_then(|tx_state| tx_state.get_row(table_id, row_id))
.map(|row| DataRef::new(row.clone())));
}
RowOp::Delete => {
@@ -1210,9 +1239,7 @@ impl Inner {
if let Some(row_type) = self
.tx_state
.as_ref()
.unwrap()
.insert_tables
.get(table_id)
.and_then(|tx_state| tx_state.insert_tables.get(table_id))
.map(|table| table.get_row_type())
{
return Some(row_type);
@@ -1227,9 +1254,7 @@ impl Inner {
if let Some(schema) = self
.tx_state
.as_ref()
.unwrap()
.insert_tables
.get(table_id)
.and_then(|tx_state| tx_state.insert_tables.get(table_id))
Contributor commented:
I have mixed feelings about this pattern. I don't think the ability to reason about the state (tx_state is None or not, corresponding to being currently in a transaction or not) was great, but I think this also muddles it a little further. I don't think it's actionable now, but something that we could want to make clearer in the future.

Contributor Author replied:
Yeah I hate this whole thing tbh, but I'm just trying to get it to build in time. :/
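
For readers following this thread, here is a minimal standalone sketch of the pattern under discussion. It assumes simplified, hypothetical stand-ins (`TxState`, `Inner`, `u32` table IDs, `String` schemas) rather than the crate's real types. It only illustrates how `as_ref().map(..).unwrap_or(false)` and `as_ref().and_then(..)` silently treat "not in a transaction" as "nothing found in the transaction", which is the ambiguity the comment points at.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the per-transaction state in the diff.
struct TxState {
    insert_tables: HashMap<u32, String>,
}

/// Hypothetical stand-in for `Inner`; `tx_state` is `None` outside a transaction.
struct Inner {
    tx_state: Option<TxState>,
    committed_tables: HashMap<u32, String>,
}

impl Inner {
    /// Same shape as the new `table_exists`: a missing transaction
    /// contributes `false` instead of panicking on `unwrap()`.
    fn table_exists(&self, table_id: u32) -> bool {
        self.tx_state
            .as_ref()
            .map(|tx| tx.insert_tables.contains_key(&table_id))
            .unwrap_or(false)
            || self.committed_tables.contains_key(&table_id)
    }

    /// Same shape as the new schema lookup: check the transaction first
    /// (if there is one), then fall back to committed state.
    fn schema_for_table(&self, table_id: u32) -> Option<&String> {
        self.tx_state
            .as_ref()
            .and_then(|tx| tx.insert_tables.get(&table_id))
            .or_else(|| self.committed_tables.get(&table_id))
    }
}
```

Both lookups now succeed whether or not a transaction is open, which is what replay needs, but the call sites no longer say which of the two states they expect to be in.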

.map(|table| table.get_schema())
{
return Some(schema);
@@ -1299,8 +1324,12 @@ impl Inner {
/// yielding every row in the table identified by `table_id`,
/// where the column data identified by `col_id` equates to `value`.
fn seek<'a>(&'a self, table_id: &TableId, col_id: &ColId, value: &'a AlgebraicValue) -> super::Result<SeekIter> {
let tx_state = self.tx_state.as_ref().unwrap();
if let Some(inserted_rows) = tx_state.index_seek(table_id, col_id, value) {
if let Some(inserted_rows) = self
.tx_state
.as_ref()
.and_then(|tx_state| tx_state.index_seek(table_id, col_id, value))
{
let tx_state = self.tx_state.as_ref().unwrap();
Ok(SeekIter::Index(IndexSeekIter {
value,
col_id: *col_id,
@@ -1372,9 +1401,66 @@ impl Locking {
inner: Arc::new(Mutex::new(datastore)),
})
}

    /// The purpose of this is to rebuild the state of the datastore
    /// after having inserted all of the rows from the message log.
    /// This is necessary because, for example, inserting a row into `st_table`
    /// is not equivalent to calling `create_table`.
    /// There may eventually be a better way to do this, but this will have to do for now.
    pub fn rebuild_state_after_replay(&self) -> Result<(), DBError> {
        let mut inner = self.inner.lock();

        // `build_missing_tables` must be called before indexes.
Contributor commented:
I'd be fine with that if the very deep for loop bodies inside got factored out.

        // Honestly this should maybe just be one big procedure.
        // See John Carmack's philosophy on this.
        inner.build_missing_tables()?;
        inner.build_indexes()?;
        inner.build_sequence_state()?;

        Ok(())
    }

    pub fn replay_transaction(
        &self,
        transaction: &Transaction,
        odb: Arc<std::sync::Mutex<Box<dyn ObjectDB + Send>>>,
    ) -> Result<(), DBError> {
        let mut inner = self.inner.lock();
        for write in &transaction.writes {
            let table_id = TableId(write.set_id);
            let schema = inner.schema_for_table(table_id)?;
            let row_type = inner.row_type_for_table(table_id)?;
            let table = inner.committed_state.tables.entry(table_id).or_insert(Table {
                row_type: row_type.clone(),
                schema,
                indexes: HashMap::new(),
                rows: BTreeMap::new(),
            });
            match write.operation {
                Operation::Delete => {
                    table.rows.remove(&RowId(write.data_key));
                }
                Operation::Insert => {
                    let product_value = match write.data_key {
                        DataKey::Data(data) => ProductValue::decode(&row_type, &mut &data[..]).unwrap_or_else(|_| {
                            panic!("Couldn't decode product value to {:?} from message log", row_type)
                        }),
                        DataKey::Hash(hash) => {
                            let data = odb.lock().unwrap().get(hash).unwrap();
                            ProductValue::decode(&row_type, &mut &data[..]).unwrap_or_else(|_| {
                                panic!("Couldn't decode product value to {:?} from message log", row_type)
                            })
                        }
                    };
                    table.rows.insert(RowId(write.data_key), product_value);
                }
            }
        }
        Ok(())
    }
}

#[derive(Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct RowId(pub(crate) DataKey);

impl DataRow for Locking {
@@ -1444,15 +1530,28 @@ impl Iterator for ScanIter<'_> {
}
ScanStage::Committed { iter } => {
for (row_id, row) in iter {
match self.inner.tx_state.as_ref().unwrap().get_row_op(&self.table_id, row_id) {
RowOp::Insert => (), // Do nothing, we'll get it in the next stage
RowOp::Delete => (), // Skip it, it's been deleted
RowOp::Absent => {
match self
.inner
.tx_state
.as_ref()
.map(|tx_state| tx_state.get_row_op(&self.table_id, row_id))
{
Some(RowOp::Insert) => (), // Do nothing, we'll get it in the next stage
Some(RowOp::Delete) => (), // Skip it, it's been deleted
Some(RowOp::Absent) => {
return Some(DataRef::new(row.clone()));
}
None => {
Contributor commented:
This is an example of my comment about what state we are in, now there's this extra None case in which you are supposed to do the same thing as when you see Some(Absent).

Contributor Author replied:
Agreed.
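
One possible way to remove the duplicated arm, shown here only as a hedged sketch and not as what the PR does: fold "no transaction" into `RowOp::Absent` before matching. The `RowOp` variants mirror those visible in the diff; `TxState`, its `ops` map, the `u64` row IDs, and `committed_row_visible` are hypothetical names invented for this example.

```rust
use std::collections::HashMap;

/// Mirrors the `RowOp` variants visible in the diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RowOp {
    Insert,
    Delete,
    Absent,
}

/// Hypothetical stand-in for the transaction state: pending ops keyed by row id.
struct TxState {
    ops: HashMap<u64, RowOp>,
}

impl TxState {
    /// Rows the transaction has not touched are reported as `Absent`.
    fn get_row_op(&self, row_id: u64) -> RowOp {
        self.ops.get(&row_id).copied().unwrap_or(RowOp::Absent)
    }
}

/// Decides whether the committed-stage scan should yield this row.
fn committed_row_visible(tx_state: Option<&TxState>, row_id: u64) -> bool {
    // No open transaction means no pending operation on any row,
    // so `None` is treated exactly like `RowOp::Absent`.
    let op = tx_state.map_or(RowOp::Absent, |tx| tx.get_row_op(row_id));
    match op {
        // Inserted rows are yielded by the later current-transaction stage;
        // deleted rows are skipped entirely.
        RowOp::Insert | RowOp::Delete => false,
        RowOp::Absent => true,
    }
}
```

With this shape the match has three arms again and the "which state are we in" question is answered in a single place, although it still does not make the state explicit in the types.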

return Some(DataRef::new(row.clone()));
}
}
}
if let Some(table) = self.inner.tx_state.as_ref().unwrap().insert_tables.get(&self.table_id) {
if let Some(table) = self
.inner
.tx_state
.as_ref()
.and_then(|tx_state| tx_state.insert_tables.get(&self.table_id))
{
self.stage = ScanStage::CurrentTx {
iter: table.rows.iter(),
};
47 changes: 8 additions & 39 deletions crates/core/src/db/relational_db.rs
@@ -10,7 +10,6 @@ use crate::db::db_metrics::{
RDB_DELETE_IN_TIME, RDB_DELETE_PK_TIME, RDB_DROP_TABLE_TIME, RDB_INSERT_TIME, RDB_SCAN_TIME,
};
use crate::db::messages::commit::Commit;
use crate::db::messages::write::Operation;
use crate::db::ostorage::hashmap_object_db::HashMapObjectDB;
use crate::db::ostorage::ObjectDB;
use crate::error::{DBError, DatabaseError, TableError};
@@ -96,47 +95,17 @@ impl RelationalDB {
// is just to reduce memory usage while inserting. We don't
// really care about inserting these transactionally as long
// as all of the writes get inserted.
let mut tx = datastore.begin_mut_tx();
for write in &transaction.writes {
let table_id = TableId(write.set_id);
match write.operation {
Operation::Delete => {
datastore
.delete_row_mut_tx(&mut tx, table_id, RowId(write.data_key))
.unwrap();
}
Operation::Insert => {
let row_type = datastore.row_type_for_table_mut_tx(&tx, table_id).unwrap();
match write.data_key {
DataKey::Data(data) => {
let product_value = ProductValue::decode(&row_type, &mut &data[..])
.unwrap_or_else(|_| {
panic!(
"Couldn't decode product value to {:?} from message log",
row_type
)
});
datastore.insert_row_mut_tx(&mut tx, table_id, product_value).unwrap();
}
DataKey::Hash(hash) => {
let data = odb.lock().unwrap().get(hash).unwrap();
let product_value = ProductValue::decode(&row_type, &mut &data[..])
.unwrap_or_else(|_| {
panic!(
"Couldn't decode product value to {:?} from message log",
row_type
)
});
datastore.insert_row_mut_tx(&mut tx, table_id, product_value).unwrap();
}
};
}
}
}
datastore.commit_mut_tx(tx).unwrap();
datastore.replay_transaction(&transaction, odb.clone())?;
}
}

// The purpose of this is to rebuild the state of the datastore
// after having inserted all of the rows from the message log.
// This is necessary because, for example, inserting a row into `st_table`
// is not equivalent to calling `create_table`.
// There may eventually be a better way to do this, but this will have to do for now.
datastore.rebuild_state_after_replay()?;

let commit_offset = if let Some(last_commit_offset) = last_commit_offset {
last_commit_offset + 1
} else {
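
To illustrate why a rebuild pass is needed after replay, here is a minimal self-contained sketch of the two-phase flow. It is not the crate's implementation: `Store`, `Write`, the `u32` row payloads, and the convention that table `0` is a catalog whose row values name the tables that should exist are all assumptions made for this example. The point it shows is that replaying raw writes only creates tables that received rows, so a table listed in the catalog but never written to is missing until the rebuild step runs.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified log operation (the real one is `write::Operation`).
#[derive(Debug, Clone, Copy)]
enum Operation {
    Insert,
    Delete,
}

/// Hypothetical, simplified log entry.
#[derive(Debug, Clone)]
struct Write {
    table_id: u32,
    row_id: u64,
    operation: Operation,
    row: u32,
}

/// Assumed convention for this sketch only: table 0 is the catalog,
/// and each of its row values is the id of a table that should exist.
const CATALOG_TABLE_ID: u32 = 0;

/// Hypothetical in-memory store: table id -> (row id -> row payload).
#[derive(Debug, Default)]
struct Store {
    tables: BTreeMap<u32, BTreeMap<u64, u32>>,
}

impl Store {
    /// Applies logged writes directly to committed state, without a transaction.
    /// A table is created lazily the first time a write mentions it.
    fn replay_transaction(&mut self, writes: &[Write]) {
        for write in writes {
            let table = self.tables.entry(write.table_id).or_default();
            match write.operation {
                Operation::Insert => {
                    table.insert(write.row_id, write.row);
                }
                Operation::Delete => {
                    table.remove(&write.row_id);
                }
            }
        }
    }

    /// Creates every table the catalog lists that replay never touched.
    fn build_missing_tables(&mut self) {
        let listed = self
            .tables
            .get(&CATALOG_TABLE_ID)
            .map(|catalog| catalog.values().copied().collect::<Vec<u32>>())
            .unwrap_or_default();
        for table_id in listed {
            self.tables.entry(table_id).or_default();
        }
    }
}
```

In this sketch, replaying a log that registers tables 5 and 6 in the catalog but only ever writes rows to table 5 leaves table 6 absent until `build_missing_tables` is called, which mirrors the gap that `rebuild_state_after_replay` closes in the PR.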