Restore crash safety for database pruning #4975

Merged · 2 commits · Dec 4, 2023
12 changes: 1 addition & 11 deletions beacon_node/beacon_chain/src/builder.rs
@@ -34,7 +34,7 @@ use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::{ShutdownReason, TaskExecutor};
use types::{
BeaconBlock, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, Graffiti, Hash256, Signature,
BeaconBlock, BeaconState, ChainSpec, Epoch, EthSpec, Graffiti, Hash256, Signature,
SignedBeaconBlock, Slot,
};

@@ -559,16 +559,6 @@ where
.map_err(|e| format!("Failed to initialize blob info: {:?}", e))?,
);

// Store pruning checkpoint to prevent attempting to prune before the anchor state.
self.pending_io_batch.push(
store
.pruning_checkpoint_store_op(Checkpoint {
root: weak_subj_block_root,
epoch: weak_subj_state.slot().epoch(TEthSpec::slots_per_epoch()),
})
.map_err(|e| format!("{:?}", e))?,
);

let snapshot = BeaconSnapshot {
beacon_block_root: weak_subj_block_root,
beacon_block: Arc::new(weak_subj_block),
39 changes: 25 additions & 14 deletions beacon_node/beacon_chain/src/migrate.rs
@@ -512,13 +512,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
genesis_block_root: Hash256,
log: &Logger,
) -> Result<PruningOutcome, BeaconChainError> {
let old_finalized_checkpoint =
store
.load_pruning_checkpoint()?
.unwrap_or_else(|| Checkpoint {
epoch: Epoch::new(0),
root: Hash256::zero(),
});
let old_finalized_checkpoint = store.get_pruning_checkpoint();

let old_finalized_slot = old_finalized_checkpoint
.epoch
@@ -572,6 +566,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
})
.collect::<Result<_, _>>()?;

// Quick sanity check. If the canonical block & state roots are incorrect then we could
// incorrectly delete canonical states, which would corrupt the database.
let expected_canonical_block_roots = new_finalized_slot
.saturating_sub(old_finalized_slot)
.as_usize()
.saturating_add(1);
if newly_finalized_chain.len() != expected_canonical_block_roots {
return Err(BeaconChainError::DBInconsistent(format!(
"canonical chain iterator is corrupt; \
expected {} but got {} block roots",
expected_canonical_block_roots,
newly_finalized_chain.len()
)));
}

// We don't know which blocks are shared among abandoned chains, so we buffer and delete
// everything in one fell swoop.
let mut abandoned_blocks: HashSet<SignedBeaconBlockHash> = HashSet::new();
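
Note: a minimal, self-contained illustration of the length check in the hunk above, using plain integers in place of Lighthouse's `Slot` type (the function name is illustrative, not part of the codebase):

```rust
// The canonical-chain iterator yields one (block_root, state_root) entry per
// slot, inclusive of both the old and new finalized slots, so the expected
// count is (new_slot - old_slot) + 1.
fn expected_canonical_block_roots(old_finalized_slot: u64, new_finalized_slot: u64) -> usize {
    (new_finalized_slot.saturating_sub(old_finalized_slot) as usize).saturating_add(1)
}

fn main() {
    // Finalization advanced by one epoch (32 slots): 33 roots expected.
    assert_eq!(expected_canonical_block_roots(64, 96), 33);
    // Finalization did not advance: only the shared finalized root remains.
    assert_eq!(expected_canonical_block_roots(96, 96), 1);
}
```
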
@@ -735,11 +744,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY)?,
));

// Persist the new finalized checkpoint as the pruning checkpoint.
batch.push(StoreOp::KeyValueOp(
store.pruning_checkpoint_store_op(new_finalized_checkpoint)?,
));

store.do_atomically_with_block_and_blobs_cache(batch)?;
debug!(
log,
@@ -753,19 +757,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
let (state_root, summary) = res?;

if summary.slot <= new_finalized_slot {
// If state root doesn't match state root from canonical chain, or this slot
// is not part of the recently finalized chain, then delete.
// If state root doesn't match state root from canonical chain, then delete.
// We may also find older states here that should have been deleted by `migrate_db`
// but weren't due to wonky I/O atomicity.
if newly_finalized_chain
.get(&summary.slot)
.map_or(true, |(_, canonical_state_root)| {
state_root != Hash256::from(*canonical_state_root)
})
{
let reason = if summary.slot < old_finalized_slot {
"old dangling state"
} else {
"non-canonical"
};
debug!(
log,
"Deleting state";
"state_root" => ?state_root,
"slot" => summary.slot,
"reason" => reason,
);
state_delete_batch.push(StoreOp::DeleteState(state_root, Some(summary.slot)));
}
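
Note: the hot-state pruning rule introduced above can be summarised in a small self-contained sketch (plain integers stand in for roots and slots; `deletion_reason` is an illustrative name, not a function in the codebase):

```rust
use std::collections::BTreeMap;

/// Decide whether a hot state at or below the new finalized slot should be
/// deleted, and why. `None` means the state is canonical and is kept.
fn deletion_reason(
    slot: u64,
    state_root: u64,
    canonical: &BTreeMap<u64, u64>, // slot -> canonical state root
    old_finalized_slot: u64,
) -> Option<&'static str> {
    match canonical.get(&slot) {
        // Canonical state for this slot: keep it.
        Some(&root) if root == state_root => None,
        // Below the previous finalization point: should already have been
        // migrated or deleted, so it is a leftover from an interrupted run.
        _ if slot < old_finalized_slot => Some("old dangling state"),
        // Otherwise it belongs to an abandoned fork.
        _ => Some("non-canonical"),
    }
}

fn main() {
    let canonical: BTreeMap<u64, u64> = [(10, 100), (11, 110)].into_iter().collect();
    assert_eq!(deletion_reason(11, 110, &canonical, 10), None);
    assert_eq!(deletion_reason(11, 111, &canonical, 10), Some("non-canonical"));
    assert_eq!(deletion_reason(5, 50, &canonical, 10), Some("old dangling state"));
}
```
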
62 changes: 30 additions & 32 deletions beacon_node/store/src/hot_cold_store.rs
@@ -10,9 +10,9 @@ use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
use crate::leveldb_store::{BytesKey, LevelDB};
use crate::memory_store::MemoryStore;
use crate::metadata::{
AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
AnchorInfo, BlobInfo, CompactionTimestamp, SchemaVersion, ANCHOR_INFO_KEY, BLOB_INFO_KEY,
COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, SCHEMA_VERSION_KEY, SPLIT_KEY,
STATE_UPPER_LIMIT_NO_RETAIN,
};
use crate::metrics;
use crate::state_cache::{PutStateOutcome, StateCache};
@@ -77,6 +77,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// LRU cache of deserialized blocks and blobs. Updated whenever a block or blob is loaded.
block_cache: Mutex<BlockCache<E>>,
/// Cache of beacon states.
///
/// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required.
state_cache: Mutex<StateCache<E>>,
/// Immutable validator cache.
pub immutable_validators: Arc<RwLock<ValidatorPubkeyCache<E, Hot, Cold>>>,
@@ -2385,26 +2387,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.config.compact_on_prune
}

/// Load the checkpoint to begin pruning from (the "old finalized checkpoint").
pub fn load_pruning_checkpoint(&self) -> Result<Option<Checkpoint>, Error> {
Ok(self
.hot_db
.get(&PRUNING_CHECKPOINT_KEY)?
.map(|pc: PruningCheckpoint| pc.checkpoint))
}

/// Store the checkpoint to begin pruning from (the "old finalized checkpoint").
pub fn store_pruning_checkpoint(&self, checkpoint: Checkpoint) -> Result<(), Error> {
self.hot_db
.do_atomically(vec![self.pruning_checkpoint_store_op(checkpoint)?])
}

/// Create a staged store for the pruning checkpoint.
pub fn pruning_checkpoint_store_op(
&self,
checkpoint: Checkpoint,
) -> Result<KeyValueStoreOp, Error> {
PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY)
/// Get the checkpoint to begin pruning from (the "old finalized checkpoint").
pub fn get_pruning_checkpoint(&self) -> Checkpoint {
// Since tree-states, we infer the pruning checkpoint from the split, as this is simpler &
// safer in the presence of crashes that occur after pruning but before the split is
// updated.
// FIXME(sproul): ensure PRUNING_CHECKPOINT_KEY is deleted in the DB migration
let split = self.get_split_info();
Checkpoint {
epoch: split.slot.epoch(E::slots_per_epoch()),
root: split.block_root,
}
}

/// Load the timestamp of the last compaction as a `Duration` since the UNIX epoch.
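
Note: a standalone sketch of the inference now performed by `get_pruning_checkpoint` (the `Split` and `Checkpoint` structs below are simplified stand-ins for the store types, and 32 slots per epoch is the mainnet value):

```rust
const SLOTS_PER_EPOCH: u64 = 32; // mainnet preset

struct Split {
    slot: u64,
    block_root: [u8; 32],
}

struct Checkpoint {
    epoch: u64,
    root: [u8; 32],
}

/// Derive the "old finalized checkpoint" to prune from using only the split,
/// which is the last point durably advanced by a completed migration.
fn pruning_checkpoint_from_split(split: &Split) -> Checkpoint {
    Checkpoint {
        epoch: split.slot / SLOTS_PER_EPOCH,
        root: split.block_root,
    }
}

fn main() {
    let split = Split { slot: 6400, block_root: [0u8; 32] };
    assert_eq!(pruning_checkpoint_from_split(&split).epoch, 200);
}
```
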
@@ -2917,8 +2910,8 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
}

// There are data dependencies between calls to `store_cold_state()` that prevent us from
// doing one big call to `store.cold_db.do_atomically()` at end of the loop.
// Cold states are diffed with respect to each other, so we need to finish writing previous
// states before storing new ones.
store.cold_db.do_atomically(cold_db_ops)?;
}

@@ -2927,15 +2920,20 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// procedure.
//
// Since it is pretty much impossible to be atomic across more than one database, we trade
// losing track of states to delete, for consistency. In other words: We should be safe to die
// at any point below but it may happen that some states won't be deleted from the hot database
// and will remain there forever. Since dying in these particular few lines should be an
// exceedingly rare event, this should be an acceptable tradeoff.
// temporarily losing track of blocks to delete, for consistency. In other words: We should be
// safe to die at any point below but it may happen that some blocks won't be deleted from the
// hot database and will remain there forever. We may also temporarily abandon states, but
// they will get picked up by the state pruning that iterates over the whole column.

// Flush to disk all the states that have just been migrated to the cold store.
store.cold_db.do_atomically(cold_db_block_ops)?;
store.cold_db.sync()?;

// Update the split.
//
// NOTE(sproul): We do this in its own fsync'd transaction mostly for historical reasons, but
// I'm scared to change it, because doing an fsync with *more data* while holding the split
// write lock might have terrible performance implications (jamming the split for 100-500ms+).
{
let mut split_guard = store.split.write();
let latest_split_slot = split_guard.slot;
@@ -2966,13 +2964,13 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
};
store.hot_db.put_sync(&SPLIT_KEY, &split)?;

// Split point is now persisted in the hot database on disk. The in-memory split point
// hasn't been modified elsewhere since we keep a write lock on it. It's safe to update
// the in-memory split point now.
*split_guard = split;
}

// Delete the states from the hot database if we got this far.
// Delete the blocks and states from the hot database if we got this far.
store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?;

// Update the cache's view of the finalized state.
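
Note: the ordering that the comments in `migrate_database` rely on can be summarised in a toy model (in-memory maps stand in for the hot and cold LevelDBs; names and structure are ours, not the real `HotColdDB`): make the cold data and the split durable first, and only then delete from the hot DB, so a crash at any intermediate point leaves redundant data rather than missing data.

```rust
use std::collections::HashMap;

/// Toy database: `put_sync` stands in for an fsync'd write.
#[derive(Default)]
struct Db {
    data: HashMap<String, Vec<u8>>,
}

impl Db {
    fn put_sync(&mut self, key: &str, value: Vec<u8>) {
        self.data.insert(key.to_string(), value);
    }
    fn delete(&mut self, key: &str) {
        self.data.remove(key);
    }
}

/// Crash-safe migration order: cold writes, then split, then hot deletions.
fn migrate(hot: &mut Db, cold: &mut Db, new_split_slot: u64, migrated: &[(String, Vec<u8>)]) {
    // 1. Copy the finalized states into the cold DB and flush them.
    //    Crash here: nothing is lost, the migration simply re-runs.
    for (key, value) in migrated {
        cold.put_sync(key, value.clone());
    }
    // 2. Persist the new split point in the hot DB (its own fsync'd write).
    //    Crash here: cold data is already durable, hot DB still has copies.
    hot.put_sync("split", new_split_slot.to_le_bytes().to_vec());
    // 3. Only now delete the migrated states (and blocks) from the hot DB.
    //    Crash here: leftovers are later swept up by the state pruning pass.
    for (key, _) in migrated {
        hot.delete(key);
    }
}

fn main() {
    let (mut hot, mut cold) = (Db::default(), Db::default());
    hot.put_sync("state_0x01", vec![1]);
    migrate(&mut hot, &mut cold, 6400, &[("state_0x01".into(), vec![1])]);
    assert!(cold.data.contains_key("state_0x01"));
    assert!(!hot.data.contains_key("state_0x01"));
}
```
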
21 changes: 3 additions & 18 deletions database_manager/src/lib.rs
@@ -294,32 +294,17 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result<InspectConfig, String>
pub fn inspect_db<E: EthSpec>(
inspect_config: InspectConfig,
client_config: ClientConfig,
runtime_context: &RuntimeContext<E>,
log: Logger,
) -> Result<(), String> {
let spec = runtime_context.eth2_config.spec.clone();
let hot_path = client_config.get_db_path();
let cold_path = client_config.get_freezer_db_path();
let blobs_path = client_config.get_blobs_db_path();

let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
&hot_path,
&cold_path,
&blobs_path,
|_, _, _| Ok(()),
client_config.store,
spec,
log,
)
.map_err(|e| format!("{:?}", e))?;

let mut total = 0;
let mut num_keys = 0;

let sub_db = if inspect_config.freezer {
&db.cold_db
LevelDB::<E>::open(&cold_path).map_err(|e| format!("Unable to open freezer DB: {e:?}"))?
} else {
&db.hot_db
LevelDB::<E>::open(&hot_path).map_err(|e| format!("Unable to open hot DB: {e:?}"))?
};

let skip = inspect_config.skip.unwrap_or(0);
@@ -653,7 +638,7 @@ pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, env: Environment<T>) -> Result
}
("inspect", Some(cli_args)) => {
let inspect_config = parse_inspect_config(cli_args)?;
inspect_db(inspect_config, client_config, &context, log)
inspect_db::<T>(inspect_config, client_config)
}
("prune-payloads", Some(_)) => {
prune_payloads(client_config, &context, log).map_err(format_err)
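
Note on the `database_manager` change: `inspect_db` no longer constructs a full `HotColdDB` (which requires a chain spec and a logger and performs schema and config checks); it opens only the single LevelDB it was asked to inspect. A simplified sketch of that selection, with an illustrative helper name and example paths:

```rust
use std::path::{Path, PathBuf};

/// Pick the single on-disk database to open for inspection.
fn db_path_to_inspect(freezer: bool, hot_path: &Path, cold_path: &Path) -> PathBuf {
    if freezer {
        cold_path.to_path_buf()
    } else {
        hot_path.to_path_buf()
    }
}

fn main() {
    // Example paths, for illustration only.
    let hot = Path::new("beacon/chain_db");
    let cold = Path::new("beacon/freezer_db");
    assert_eq!(db_path_to_inspect(true, hot, cold), cold.to_path_buf());
}
```

Opening the sub-database directly also means inspection no longer depends on the runtime context, which is why the `runtime_context` and `log` parameters were dropped from `inspect_db`.
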