Skip to content

Commit

Permalink
Fix panic when repairing databases with some multimap tables
Browse files Browse the repository at this point in the history
Multimap tables with fixed width values would likely cause a panic when
recovering the database after a crash
  • Loading branch information
cberner committed Jun 29, 2023
1 parent 1fa7774 commit b59d32a
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 99 deletions.
142 changes: 50 additions & 92 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use crate::error::TransactionError;
use crate::multimap_table::parse_subtree_roots;
use crate::multimap_table::{parse_subtree_roots, DynamicCollection};
use crate::sealed::Sealed;
use crate::transactions::SAVEPOINT_TABLE;
#[cfg(feature = "logging")]
Expand Down Expand Up @@ -263,76 +263,24 @@ impl Database {
self.mem.set_crash_countdown(value);
}

fn verify_primary_checksums(mem: &TransactionalMemory) -> Result<bool> {
if let Some((root, root_checksum)) = mem.get_data_root() {
if !RawBtree::new(
Some((root, root_checksum)),
<&str>::fixed_width(),
InternalTableDefinition::fixed_width(),
mem,
)
.verify_checksum()?
{
return Ok(false);
}

// Iterate over all other tables
let iter: BtreeRangeIter<&str, InternalTableDefinition> =
BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem)?;
for entry in iter {
let definition = entry?.value();
if let Some((table_root, table_checksum)) = definition.get_root() {
if !RawBtree::new(
Some((table_root, table_checksum)),
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
mem,
)
.verify_checksum()?
{
return Ok(false);
}
}
}
pub(crate) fn verify_primary_checksums(mem: &TransactionalMemory) -> Result<bool> {
let fake_freed_pages = Arc::new(Mutex::new(vec![]));
let table_tree = TableTree::new(mem.get_data_root(), mem, fake_freed_pages.clone());
if !table_tree.verify_checksums()? {
return Ok(false);
}

if let Some((root, root_checksum)) = mem.get_system_root() {
if !RawBtree::new(
Some((root, root_checksum)),
<&str>::fixed_width(),
InternalTableDefinition::fixed_width(),
mem,
)
.verify_checksum()?
{
return Ok(false);
}

// Iterate over all other tables
let iter: BtreeRangeIter<&str, InternalTableDefinition> =
BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem)?;
for entry in iter {
let definition = entry?.value();
if let Some((table_root, table_checksum)) = definition.get_root() {
if !RawBtree::new(
Some((table_root, table_checksum)),
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
mem,
)
.verify_checksum()?
{
return Ok(false);
}
}
}
let system_table_tree =
TableTree::new(mem.get_system_root(), mem, fake_freed_pages.clone());
if !system_table_tree.verify_checksums()? {
return Ok(false);
}
assert!(fake_freed_pages.lock().unwrap().is_empty());

if let Some((freed_root, freed_checksum)) = mem.get_freed_root() {
if !RawBtree::new(
Some((freed_root, freed_checksum)),
FreedTableKey::fixed_width(),
None,
FreedPageList::fixed_width(),
mem,
)
.verify_checksum()?
Expand Down Expand Up @@ -499,34 +447,44 @@ impl Database {
for entry in iter {
let definition = entry?.value();
if let Some((table_root, _)) = definition.get_root() {
let table_pages_iter = AllPageNumbersBtreeIter::new(
table_root,
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
mem,
)?;
mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;

// Multimap tables may have additional subtrees in their values
if definition.get_type() == TableType::Multimap {
let table_pages_iter = AllPageNumbersBtreeIter::new(
table_root,
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
mem,
)?;
for table_page in table_pages_iter {
let page = mem.get_page(table_page?)?;
let subtree_roots =
parse_subtree_roots(&page, definition.get_fixed_key_size());
for sub_root in subtree_roots {
let sub_root_iter = AllPageNumbersBtreeIter::new(
sub_root,
definition.get_fixed_value_size(),
<()>::fixed_width(),
mem,
)?;
mem.mark_pages_allocated(sub_root_iter, allow_duplicates)?;
match definition.get_type() {
TableType::Normal => {
let table_pages_iter = AllPageNumbersBtreeIter::new(
table_root,
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
mem,
)?;
mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;
}
TableType::Multimap => {
let table_pages_iter = AllPageNumbersBtreeIter::new(
table_root,
definition.get_fixed_key_size(),
<&DynamicCollection>::fixed_width(),
mem,
)?;
mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;

let table_pages_iter = AllPageNumbersBtreeIter::new(
table_root,
definition.get_fixed_key_size(),
<&DynamicCollection>::fixed_width(),
mem,
)?;
for table_page in table_pages_iter {
let page = mem.get_page(table_page?)?;
let subtree_roots =
parse_subtree_roots(&page, definition.get_fixed_key_size());
for (sub_root, _) in subtree_roots {
let sub_root_iter = AllPageNumbersBtreeIter::new(
sub_root,
definition.get_fixed_value_size(),
<()>::fixed_width(),
mem,
)?;
mem.mark_pages_allocated(sub_root_iter, allow_duplicates)?;
}
}
}
}
Expand Down
52 changes: 47 additions & 5 deletions src/multimap_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use crate::multimap_table::DynamicCollectionType::{Inline, Subtree};
use crate::sealed::Sealed;
use crate::tree_store::{
AllPageNumbersBtreeIter, Btree, BtreeMut, BtreeRangeIter, Checksum, LeafAccessor, LeafMutator,
Page, PageHint, PageNumber, RawLeafBuilder, TransactionalMemory, UntypedBtreeMut, BRANCH, LEAF,
MAX_VALUE_LENGTH,
Page, PageHint, PageNumber, RawBtree, RawLeafBuilder, TransactionalMemory, UntypedBtreeMut,
BRANCH, LEAF, MAX_VALUE_LENGTH,
};
use crate::types::{RedbKey, RedbValue, TypeName};
use crate::{AccessGuard, Result, StorageError, WriteTransaction};
Expand All @@ -15,6 +15,48 @@ use std::mem::size_of;
use std::ops::{RangeBounds, RangeFull};
use std::sync::{Arc, Mutex};

// Verify all the checksums in the tree, including any Dynamic collection subtrees
pub(crate) fn verify_tree_and_subtree_checksums(
root: Option<(PageNumber, Checksum)>,
key_size: Option<usize>,
value_size: Option<usize>,
mem: &TransactionalMemory,
) -> Result<bool> {
if let Some((root, root_checksum)) = root {
if !RawBtree::new(
Some((root, root_checksum)),
key_size,
<&DynamicCollection>::fixed_width(),
mem,
)
.verify_checksum()?
{
return Ok(false);
}

let table_pages_iter =
AllPageNumbersBtreeIter::new(root, key_size, <&DynamicCollection>::fixed_width(), mem)?;
for table_page in table_pages_iter {
let page = mem.get_page(table_page?)?;
let subtree_roots = parse_subtree_roots(&page, key_size);
for (sub_root, sub_root_checksum) in subtree_roots {
if !RawBtree::new(
Some((sub_root, sub_root_checksum)),
value_size,
<()>::fixed_width(),
mem,
)
.verify_checksum()?
{
return Ok(false);
}
}
}
}

Ok(true)
}

// Finalize all the checksums in the tree, including any Dynamic collection subtrees
// Returns the root checksum
pub(crate) fn finalize_tree_and_subtree_checksums(
Expand Down Expand Up @@ -80,7 +122,7 @@ pub(crate) fn finalize_tree_and_subtree_checksums(
pub(crate) fn parse_subtree_roots<T: Page>(
page: &T,
fixed_key_size: Option<usize>,
) -> Vec<PageNumber> {
) -> Vec<(PageNumber, Checksum)> {
match page.memory()[0] {
BRANCH => {
vec![]
Expand All @@ -96,7 +138,7 @@ pub(crate) fn parse_subtree_roots<T: Page>(
let entry = accessor.entry(i).unwrap();
let collection = <&DynamicCollection>::from_bytes(entry.value());
if matches!(collection.collection_type(), DynamicCollectionType::Subtree) {
result.push(collection.as_subtree().0);
result.push(collection.as_subtree());
}
}

Expand Down Expand Up @@ -201,7 +243,7 @@ impl Into<u8> for DynamicCollectionType {
/// (when type = 2) checksum (16 bytes): sub tree checksum
#[derive(Debug)]
#[repr(transparent)]
struct DynamicCollection {
pub(crate) struct DynamicCollection {
data: [u8],
// TODO: include V type when GATs are stable
// _value_type: PhantomData<V>,
Expand Down
10 changes: 10 additions & 0 deletions src/tree_store/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,16 @@ impl<'a, K: RedbKey + 'a, V: RedbValue + 'a> BtreeMut<'a, K, V> {
}
}

pub(crate) fn verify_checksum(&self) -> Result<bool> {
RawBtree::new(
self.get_root(),
K::fixed_width(),
V::fixed_width(),
self.mem,
)
.verify_checksum()
}

pub(crate) fn finalize_dirty_checksums(&mut self) -> Result {
let mut tree = UntypedBtreeMut::new(
self.get_root(),
Expand Down
48 changes: 46 additions & 2 deletions src/tree_store/table_tree.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::error::TableError;
use crate::multimap_table::finalize_tree_and_subtree_checksums;
use crate::multimap_table::{
finalize_tree_and_subtree_checksums, verify_tree_and_subtree_checksums,
};
use crate::tree_store::btree::{btree_stats, UntypedBtreeMut};
use crate::tree_store::btree_base::Checksum;
use crate::tree_store::btree_iters::AllPageNumbersBtreeIter;
use crate::tree_store::{BtreeMut, BtreeRangeIter, PageNumber, TransactionalMemory};
use crate::tree_store::{BtreeMut, BtreeRangeIter, PageNumber, RawBtree, TransactionalMemory};
use crate::types::{RedbKey, RedbValue, RedbValueMutInPlace, TypeName};
use crate::{DatabaseStats, Result};
use std::cmp::max;
Expand Down Expand Up @@ -476,6 +478,48 @@ impl<'txn> TableTree<'txn> {
self.pending_table_updates.clear();
}

pub(crate) fn verify_checksums(&self) -> Result<bool> {
assert!(self.pending_table_updates.is_empty());
if !self.tree.verify_checksum()? {
return Ok(false);
}

for entry in self.tree.range::<RangeFull, &str>(&(..))? {
let entry = entry?;
let definition = entry.value();
// TODO: all these matches on table_type() are pretty errorprone, because you can call .get_fixed_value_size() on either.
// Maybe InternalTableDefinition should be an enum for normal and multimap, instead
match definition.get_type() {
TableType::Normal => {
if let Some((table_root, table_checksum)) = definition.get_root() {
if !RawBtree::new(
Some((table_root, table_checksum)),
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
self.mem,
)
.verify_checksum()?
{
return Ok(false);
}
}
}
TableType::Multimap => {
if !verify_tree_and_subtree_checksums(
definition.get_root(),
definition.get_fixed_key_size(),
definition.get_fixed_value_size(),
self.mem,
)? {
return Ok(false);
}
}
}
}

Ok(true)
}

pub(crate) fn flush_table_root_updates(&mut self) -> Result<Option<(PageNumber, Checksum)>> {
for (name, table_root) in self.pending_table_updates.drain() {
// Bypass .get_table() since the table types are dynamic
Expand Down

0 comments on commit b59d32a

Please sign in to comment.