Skip to content

Commit

Permalink
Fix recovery of sequences after restart
Browse files Browse the repository at this point in the history
  • Loading branch information
mamcx committed Aug 8, 2023
1 parent 45621c8 commit cd22737
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
31 changes: 25 additions & 6 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ impl Inner {

// If any columns are auto incrementing, we need to create a sequence
// NOTE: This code with the `seq_start` is particularly fragile.
// TODO: If we exceed `SEQUENCE_PREALLOCATION_AMOUNT` we will get a unique violation
if col.is_autoinc {
let (seq_start, seq_id): (i128, SequenceId) = match TableId(schema.table_id) {
ST_TABLES_ID => (4, TABLE_ID_SEQUENCE_ID), // The database is bootstrapped with 4 tables
Expand Down Expand Up @@ -399,10 +400,26 @@ impl Inner {
let rows = st_sequences.scan_rows().cloned().collect::<Vec<_>>();
for row in rows {
let sequence = StSequenceRow::try_from(&row)?;
// TODO: The system tables have initialized their value already, but this is wrong:
// If we exceed `SEQUENCE_PREALLOCATION_AMOUNT` we will get a unique violation
let is_system_table = self
.committed_state
.tables
.get(&TableId(sequence.table_id))
.map(|x| x.schema.table_type == StTableType::System)
.unwrap_or(false);

let schema = (&sequence).into();

let mut seq = Sequence::new(schema);
//Now we need to recover the last allocation value...
if !is_system_table && seq.value < sequence.allocated + 1 {
seq.value = sequence.allocated + 1;
}

self.sequence_state
.sequences
.insert(SequenceId(sequence.sequence_id), Sequence::new(schema));
.insert(SequenceId(sequence.sequence_id), seq);
}
Ok(())
}
Expand Down Expand Up @@ -483,9 +500,12 @@ impl Inner {
return Err(SequenceError::NotFound(seq_id).into());
};

// If there are allocated sequence values, return the new value.
// If there are allocated sequence values, return the new value, if it is not bigger than
// the upper range of `sequence.allocated`
if let Some(value) = sequence.gen_next_value() {
return Ok(value);
if value < sequence.allocated() {
return Ok(value);
}
}
}
// Allocate new sequence values
Expand All @@ -506,8 +526,7 @@ impl Inner {
};
let old_seq_row_id = RowId(old_seq_row.to_data_key());
let mut seq_row = StSequenceRow::try_from(&old_seq_row)?;
let num_to_allocate = 1024;
seq_row.allocated = sequence.nth_value(num_to_allocate);
seq_row.allocated = sequence.nth_value(SEQUENCE_PREALLOCATION_AMOUNT as usize);
sequence.set_allocation(seq_row.allocated);
(seq_row, old_seq_row_id)
};
Expand Down Expand Up @@ -540,7 +559,7 @@ impl Inner {
sequence_name: seq.sequence_name.as_str(),
table_id: seq.table_id,
col_id: seq.col_id,
allocated: 0,
allocated: seq.start.unwrap_or(1),
increment: seq.increment,
start: seq.start.unwrap_or(1),
min_value: seq.min_value.unwrap_or(1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::db::datastore::traits::SequenceSchema;

pub struct Sequence {
schema: SequenceSchema,
value: i128,
pub(crate) value: i128,
}

impl Sequence {
Expand Down Expand Up @@ -46,6 +46,10 @@ impl Sequence {
Some(value)
}

pub fn allocated(&self) -> i128 {
self.schema.allocated
}

pub fn next_value(&self) -> i128 {
self.nth_value(1)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ mod tests {
use crate::db::datastore::traits::IndexDef;
use crate::db::datastore::traits::TableDef;
use crate::db::message_log::MessageLog;
use crate::db::relational_db::ST_TABLES_ID;
use crate::db::relational_db::{open_db, ST_TABLES_ID};

use super::RelationalDB;
use crate::db::relational_db::make_default_ostorage;
Expand Down

0 comments on commit cd22737

Please sign in to comment.