Skip to content

Commit

Permalink
Introduce atomic DB operations
Browse files Browse the repository at this point in the history
  • Loading branch information
adaszko committed May 13, 2020
1 parent 19bda54 commit 7dc9ffd
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 11 deletions.
20 changes: 11 additions & 9 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use store::iter::{ParentRootBlockIterator, RootsIterator};
use store::{hot_cold_store::HotColdDBError, Error, SimpleDiskStore, Store};
use store::{hot_cold_store::HotColdDBError, Error, SimpleDiskStore, Store, StoreOp};
pub use store::{DiskStore, MemoryStore};
use types::*;
use types::{BeaconState, EthSpec, Hash256, Slot};
Expand Down Expand Up @@ -145,14 +145,16 @@ pub trait Migrate<S: Store<E>, E: EthSpec>: Send + Sync + 'static {
}
}

// XXX Should be performed atomically, see
// https://github.com/sigp/lighthouse/issues/692
for block_hash in abandoned_blocks.into_iter() {
store.delete_block(&block_hash.into())?;
}
for (slot, state_hash) in abandoned_states.into_iter() {
store.delete_state(&state_hash.into(), slot)?;
}
let batch: Vec<StoreOp> = abandoned_blocks
.into_iter()
.map(|block_hash| StoreOp::DeleteBlock(block_hash))
.chain(
abandoned_states
.into_iter()
.map(|(slot, state_hash)| StoreOp::DeleteState(state_hash, slot)),
)
.collect();
store.do_atomically(&batch)?;
for head_hash in abandoned_heads.into_iter() {
head_tracker.remove_head(head_hash);
}
Expand Down
16 changes: 16 additions & 0 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::metrics;
use crate::{
leveldb_store::LevelDB, DBColumn, Error, PartialBeaconState, SimpleStoreItem, Store, StoreItem,
StoreOp,
};
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
Expand Down Expand Up @@ -203,6 +204,21 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
Ok(())
}

fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
let mut guard = self.block_cache.lock();
self.hot_db.do_atomically(batch)?;
for op in batch {
match op {
StoreOp::DeleteBlock(block_hash) => {
let untyped_hash: Hash256 = (*block_hash).into();
guard.pop(&untyped_hash);
},
StoreOp::DeleteState(_, _) => (),
}
}
Ok(())
}

/// Advance the split point of the store, moving new finalized states to the freezer.
fn process_finalization(
store: Arc<Self>,
Expand Down
30 changes: 30 additions & 0 deletions beacon_node/store/src/leveldb_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::forwards_iter::SimpleForwardsBlockRootsIterator;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::metrics;
use db_key::Key;
use leveldb::database::batch::{Batch, Writebatch};
use leveldb::database::kv::KV;
use leveldb::database::Database;
use leveldb::error::Error as LevelDBError;
Expand Down Expand Up @@ -145,6 +146,35 @@ impl<E: EthSpec> Store<E> for LevelDB<E> {
) -> Self::ForwardsBlockRootsIterator {
SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root)
}

fn do_atomically(&self, ops_batch: &[StoreOp]) -> Result<(), Error> {
let mut leveldb_batch = Writebatch::new();
for op in ops_batch {
match op {
StoreOp::DeleteBlock(block_hash) => {
let untyped_hash: Hash256 = (*block_hash).into();
let key = Self::get_key_for_col(
DBColumn::BeaconBlock.into(),
untyped_hash.as_bytes(),
);
leveldb_batch.delete(key);
}

StoreOp::DeleteState(state_hash, slot) => {
let untyped_hash: Hash256 = (*state_hash).into();
let state_summary_key = Self::get_key_for_col(DBColumn::BeaconStateSummary.into(), untyped_hash.as_bytes());
leveldb_batch.delete(state_summary_key);

if *slot % E::slots_per_epoch() == 0 {
let state_key = Self::get_key_for_col(DBColumn::BeaconState.into(), untyped_hash.as_bytes());
leveldb_batch.delete(state_key);
}
}
}
}
self.db.write(self.write_options(), &leveldb_batch)?;
Ok(())
}
}

impl From<LevelDBError> for Error {
Expand Down
10 changes: 9 additions & 1 deletion beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,15 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {

/// Delete a block from the store.
fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
self.delete::<SignedBeaconBlock<E>>(block_root)
self.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())
}

/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error>;

/// Execute either all of operations in `batch` or none at all, returning an error.
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error>;

/// Store a state summary in the store.
// NOTE: this is a hack for the HotColdDb, we could consider splitting this
// trait and removing the generic `S: Store` types everywhere?
Expand Down Expand Up @@ -180,6 +183,11 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
}
}

pub enum StoreOp {
DeleteBlock(SignedBeaconBlockHash),
DeleteState(BeaconStateHash, Slot),
}

/// A unique column identifier.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DBColumn {
Expand Down
26 changes: 25 additions & 1 deletion beacon_node/store/src/memory_store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Error, Store};
use super::{DBColumn, Error, Store, StoreOp};
use crate::forwards_iter::SimpleForwardsBlockRootsIterator;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use parking_lot::RwLock;
Expand Down Expand Up @@ -89,6 +89,30 @@ impl<E: EthSpec> Store<E> for MemoryStore<E> {
get_full_state(self, state_root)
}

fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
for op in batch {
match op {
StoreOp::DeleteBlock(block_hash) => {
let untyped_hash: Hash256 = (*block_hash).into();
self.key_delete(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes())?;
}

StoreOp::DeleteState(state_hash, slot) => {
let untyped_hash: Hash256 = (*state_hash).into();
if *slot % E::slots_per_epoch() == 0 {
self.key_delete(DBColumn::BeaconState.into(), untyped_hash.as_bytes())?;
} else {
self.key_delete(
DBColumn::BeaconStateSummary.into(),
untyped_hash.as_bytes(),
)?;
}
}
}
}
Ok(())
}

fn forwards_block_roots_iterator(
store: Arc<Self>,
start_slot: Slot,
Expand Down

0 comments on commit 7dc9ffd

Please sign in to comment.