Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store linear blocks in freezer db #5905

Draft
wants to merge 4 commits into
base: unstable
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
warp = { version = "0.3.7", default-features = false, features = ["tls"] }
zeroize = { version = "1", features = ["zeroize_derive"] }
zip = "0.6"
zstd = "0.11.2"

# Local crates.
account_utils = { path = "common/account_utils" }
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
continue;
}

match streamer.beacon_chain.store.try_get_full_block(&root) {
match streamer.beacon_chain.store.try_get_full_block(&root, None) {
Err(e) => db_blocks.push((root, Err(e.into()))),
Ok(opt_block) => db_blocks.push((
root,
Expand Down
14 changes: 10 additions & 4 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<Option<SignedBlindedBeaconBlock<T::EthSpec>>, Error> {
let root = self.block_root_at_slot(request_slot, skips)?;

// Only hint the slot if expect a block at this exact slot.
let slot_hint = match skips {
WhenSlotSkipped::Prev => None,
WhenSlotSkipped::None => Some(request_slot),
};

if let Some(block_root) = root {
Ok(self.store.get_blinded_block(&block_root)?)
Ok(self.store.get_blinded_block(&block_root, slot_hint)?)
} else {
Ok(None)
}
Expand Down Expand Up @@ -1180,7 +1186,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
// Load block from database, returning immediately if we have the full block w payload
// stored.
let blinded_block = match self.store.try_get_full_block(block_root)? {
let blinded_block = match self.store.try_get_full_block(block_root, None)? {
Some(DatabaseBlock::Full(block)) => return Ok(Some(block)),
Some(DatabaseBlock::Blinded(block)) => block,
None => return Ok(None),
Expand Down Expand Up @@ -1248,7 +1254,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block_root: &Hash256,
) -> Result<Option<SignedBlindedBeaconBlock<T::EthSpec>>, Error> {
Ok(self.store.get_blinded_block(block_root)?)
Ok(self.store.get_blinded_block(block_root, None)?)
}

/// Return the status of a block as it progresses through the various caches of the beacon
Expand Down Expand Up @@ -6379,7 +6385,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let beacon_block = self
.store
.get_blinded_block(&beacon_block_root)?
.get_blinded_block(&beacon_block_root, None)?
.ok_or_else(|| {
Error::DBInconsistent(format!("Missing block {}", beacon_block_root))
})?;
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ where
metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES);
let justified_block = self
.store
.get_blinded_block(&self.justified_checkpoint.root)
.get_blinded_block(&self.justified_checkpoint.root, None)
.map_err(Error::FailedToReadBlock)?
.ok_or(Error::MissingBlock(self.justified_checkpoint.root))?
.deconstruct()
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ where
.ok_or("Fork choice not found in store")?;

let genesis_block = store
.get_blinded_block(&chain.genesis_block_root)
.get_blinded_block(&chain.genesis_block_root, Some(Slot::new(0)))
.map_err(|e| descriptive_db_error("genesis block", &e))?
.ok_or("Genesis block not found in store")?;
let genesis_state = store
Expand Down Expand Up @@ -732,7 +732,7 @@ where
// Try to decode the head block according to the current fork, if that fails, try
// to backtrack to before the most recent fork.
let (head_block_root, head_block, head_reverted) =
match store.get_full_block(&initial_head_block_root) {
match store.get_full_block(&initial_head_block_root, None) {
Ok(Some(block)) => (initial_head_block_root, block, false),
Ok(None) => return Err("Head block not found in store".into()),
Err(StoreError::SszDecodeError(_)) => {
Expand Down Expand Up @@ -1212,7 +1212,7 @@ mod test {
assert_eq!(
chain
.store
.get_blinded_block(&Hash256::zero())
.get_blinded_block(&Hash256::zero(), None)
.expect("should read db")
.expect("should find genesis block"),
block.clone_as_blinded(),
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl<T: BeaconChainTypes> CanonicalHead<T> {
let fork_choice_view = fork_choice.cached_fork_choice_view();
let beacon_block_root = fork_choice_view.head_block_root;
let beacon_block = store
.get_full_block(&beacon_block_root)?
.get_full_block(&beacon_block_root, None)?
.ok_or(Error::MissingBeaconBlock(beacon_block_root))?;
let current_slot = fork_choice.fc_store().get_current_slot();
let (_, beacon_state) = store
Expand Down Expand Up @@ -651,7 +651,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut new_snapshot = {
let beacon_block = self
.store
.get_full_block(&new_view.head_block_root)?
.get_full_block(&new_view.head_block_root, None)?
.ok_or(Error::MissingBeaconBlock(new_view.head_block_root))?;

let (_, beacon_state) = self
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/fork_revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
let finalized_checkpoint = head_state.finalized_checkpoint();
let finalized_block_root = finalized_checkpoint.root;
let finalized_block = store
.get_full_block(&finalized_block_root)
.get_full_block(&finalized_block_root, None)
.map_err(|e| format!("Error loading finalized block: {:?}", e))?
.ok_or_else(|| {
format!(
Expand Down
15 changes: 7 additions & 8 deletions beacon_node/beacon_chain/src/light_client_server_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,12 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
let signature_slot = block_slot;
let attested_block_root = block_parent_root;

let attested_block =
store
.get_full_block(attested_block_root)?
.ok_or(BeaconChainError::DBInconsistent(format!(
"Block not available {:?}",
attested_block_root
)))?;
let attested_block = store.get_full_block(attested_block_root, None)?.ok_or(
BeaconChainError::DBInconsistent(format!(
"Block not available {:?}",
attested_block_root
)),
)?;

let cached_parts = self.get_or_compute_prev_block_cache(
store.clone(),
Expand Down Expand Up @@ -130,7 +129,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
if is_latest_finality & !cached_parts.finalized_block_root.is_zero() {
// Immediately after checkpoint sync the finalized block may not be available yet.
if let Some(finalized_block) =
store.get_full_block(&cached_parts.finalized_block_root)?
store.get_full_block(&cached_parts.finalized_block_root, None)?
{
*self.latest_finality_update.write() = Some(LightClientFinalityUpdate::new(
&attested_block,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
// so delete it from the head tracker but leave it and its states in the database
// This is suboptimal as it wastes disk space, but it's difficult to fix. A re-sync
// can be used to reclaim the space.
let head_state_root = match store.get_blinded_block(&head_hash) {
let head_state_root = match store.get_blinded_block(&head_hash, Some(head_slot)) {
Ok(Some(block)) => block.state_root(),
Ok(None) => {
return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into())
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/pre_finalization_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

// 2. Check on disk.
if self.store.get_blinded_block(&block_root)?.is_some() {
if self.store.get_blinded_block(&block_root, None)?.is_some() {
cache.block_roots.put(block_root, ());
return Ok(true);
}
Expand Down
8 changes: 8 additions & 0 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
mod migration_schema_v17;
mod migration_schema_v18;
mod migration_schema_v19;
mod migration_schema_v20;

use crate::beacon_chain::BeaconChainTypes;
use crate::types::ChainSpec;
Expand Down Expand Up @@ -78,6 +79,13 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v19::downgrade_from_v19::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(19), SchemaVersion(20)) => {
let ops = migration_schema_v20::upgrade_to_v20::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(20), SchemaVersion(19)) => {
unimplemented!()
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn get_slot_clock<T: BeaconChainTypes>(
log: &Logger,
) -> Result<Option<T::SlotClock>, Error> {
let spec = db.get_chain_spec();
let Some(genesis_block) = db.get_blinded_block(&Hash256::zero())? else {
let Some(genesis_block) = db.get_blinded_block(&Hash256::zero(), Some(Slot::new(0)))? else {
error!(log, "Missing genesis block");
return Ok(None);
};
Expand Down
108 changes: 108 additions & 0 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use crate::BeaconChainTypes;
use slog::{info, Logger};
use std::sync::Arc;
use store::{get_key_for_col, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp};
use types::{Hash256, Slot};

/// Chunk size for freezer block roots in the old database schema.
const OLD_SCHEMA_CHUNK_SIZE: u64 = 128;

fn old_schema_chunk_key(cindex: u64) -> [u8; 8] {
(cindex + 1).to_be_bytes()
}

pub fn upgrade_to_v20<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Upgrading freezer database schema");
upgrade_freezer_database::<T>(&db, &log)?;

// No hot DB changes
return Ok(vec![]);
Copy link
Collaborator Author

@dapplion dapplion Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michaelsproul thoughts on migrating all finalized blocks? If we just delete that's going to be a lot of bandwidth on the entire network to backfill. However, migrating all blocks will take a while. Are DB migrations blocking the start routine of Lighthouse?

}

fn upgrade_freezer_database<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
log: &Logger,
) -> Result<(), Error> {
let mut cold_db_ops = vec![];

// Re-write the beacon block roots array.
let mut freezer_block_roots = vec![];
let oldest_block_slot = db.get_oldest_block_slot();
let mut current_slot = oldest_block_slot;

for result in db
.cold_db
.iter_column::<Vec<u8>>(DBColumn::BeaconBlockRoots)
{
let (chunk_key, chunk_bytes) = result?;

// Stage this chunk for deletion.
cold_db_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
DBColumn::BeaconBlockRoots.into(),
&chunk_key,
)));

// Skip the 0x0 key which is for the genesis block.
if chunk_key.iter().all(|b| *b == 0u8) {
continue;
}
// Skip the 0x00..01 key which is for slot 0.
if chunk_key == old_schema_chunk_key(0).as_slice() && current_slot != 0 {
continue;
}

let current_chunk_index = current_slot.as_u64() / OLD_SCHEMA_CHUNK_SIZE;
if chunk_key != old_schema_chunk_key(current_chunk_index).as_slice() {
return Err(Error::DBError {
message: format!(
"expected chunk index {} but got {:?}",
current_chunk_index, chunk_key
),
});
}

for (i, block_root_bytes) in chunk_bytes.chunks_exact(32).enumerate() {
let block_root = Hash256::from_slice(block_root_bytes);

if block_root.is_zero() {
continue;
}

let slot = Slot::new(current_chunk_index * OLD_SCHEMA_CHUNK_SIZE + i as u64);
if slot != current_slot {
return Err(Error::DBError {
message: format!(
"expected block root for slot {} but got {}",
current_slot, slot
),
});
}
freezer_block_roots.push((slot, block_root));
current_slot += 1;
}
}

// Write the freezer block roots in the new schema.
for (slot, block_root) in freezer_block_roots {
cold_db_ops.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(
DBColumn::BeaconBlockRoots.into(),
&slot.as_u64().to_be_bytes(),
),
block_root.as_bytes().to_vec(),
));
}

db.cold_db.do_atomically(cold_db_ops)?;
info!(
log,
"Freezer database upgrade complete";
"oldest_block_slot" => oldest_block_slot,
"newest_block_slot" => current_slot - 1
);

Ok(())
}
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/tests/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ async fn attestation_that_skips_epochs() {
let block_slot = harness
.chain
.store
.get_blinded_block(&block_root)
.get_blinded_block(&block_root, None)
.expect("should not error getting block")
.expect("should find attestation block")
.message()
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/tests/payload_invalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl InvalidPayloadRig {
self.harness
.chain
.store
.get_full_block(&block_root)
.get_full_block(&block_root, None)
.unwrap()
.unwrap(),
*block,
Expand Down
12 changes: 6 additions & 6 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ async fn epoch_boundary_state_attestation_processing() {
// load_epoch_boundary_state is idempotent!
let block_root = attestation.data.beacon_block_root;
let block = store
.get_blinded_block(&block_root)
.get_blinded_block(&block_root, None)
.unwrap()
.expect("block exists");
let epoch_boundary_state = store
Expand Down Expand Up @@ -849,7 +849,7 @@ async fn delete_blocks_and_states() {
);

let faulty_head_block = store
.get_blinded_block(&faulty_head.into())
.get_blinded_block(&faulty_head.into(), None)
.expect("no errors")
.expect("faulty head block exists");

Expand Down Expand Up @@ -891,7 +891,7 @@ async fn delete_blocks_and_states() {
break;
}
store.delete_block(&block_root).unwrap();
assert_eq!(store.get_blinded_block(&block_root).unwrap(), None);
assert_eq!(store.get_blinded_block(&block_root, None).unwrap(), None);
}

// Deleting frozen states should do nothing
Expand Down Expand Up @@ -1135,7 +1135,7 @@ fn get_state_for_block(harness: &TestHarness, block_root: Hash256) -> BeaconStat
let head_block = harness
.chain
.store
.get_blinded_block(&block_root)
.get_blinded_block(&block_root, None)
.unwrap()
.unwrap();
harness
Expand Down Expand Up @@ -2355,7 +2355,7 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
let wss_block = harness
.chain
.store
.get_full_block(&wss_block_root)
.get_full_block(&wss_block_root, None)
.unwrap()
.unwrap();
let wss_blobs_opt = harness.chain.store.get_blobs(&wss_block_root).unwrap();
Expand Down Expand Up @@ -2576,7 +2576,7 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
.unwrap()
.map(Result::unwrap)
{
let block = store.get_blinded_block(&block_root).unwrap().unwrap();
let block = store.get_blinded_block(&block_root, None).unwrap().unwrap();
if block_root != prev_block_root {
assert_eq!(block.slot(), slot);
}
Expand Down
Loading
Loading