Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store linear blocks in freezer db #5905

Draft
wants to merge 4 commits into
base: unstable
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
warp = { version = "0.3.7", default-features = false, features = ["tls"] }
zeroize = { version = "1", features = ["zeroize_derive"] }
zip = "0.6"
zstd = "0.11.2"

# Local crates.
account_utils = { path = "common/account_utils" }
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
continue;
}

match streamer.beacon_chain.store.try_get_full_block(&root) {
match streamer.beacon_chain.store.try_get_full_block(&root, None) {
Err(e) => db_blocks.push((root, Err(e.into()))),
Ok(opt_block) => db_blocks.push((
root,
Expand Down
14 changes: 10 additions & 4 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<Option<SignedBlindedBeaconBlock<T::EthSpec>>, Error> {
let root = self.block_root_at_slot(request_slot, skips)?;

// Only hint the slot if expect a block at this exact slot.
let slot_hint = match skips {
WhenSlotSkipped::Prev => None,
WhenSlotSkipped::None => Some(request_slot),
};

if let Some(block_root) = root {
Ok(self.store.get_blinded_block(&block_root)?)
Ok(self.store.get_blinded_block(&block_root, slot_hint)?)
} else {
Ok(None)
}
Expand Down Expand Up @@ -1180,7 +1186,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
// Load block from database, returning immediately if we have the full block w payload
// stored.
let blinded_block = match self.store.try_get_full_block(block_root)? {
let blinded_block = match self.store.try_get_full_block(block_root, None)? {
Some(DatabaseBlock::Full(block)) => return Ok(Some(block)),
Some(DatabaseBlock::Blinded(block)) => block,
None => return Ok(None),
Expand Down Expand Up @@ -1248,7 +1254,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
block_root: &Hash256,
) -> Result<Option<SignedBlindedBeaconBlock<T::EthSpec>>, Error> {
Ok(self.store.get_blinded_block(block_root)?)
Ok(self.store.get_blinded_block(block_root, None)?)
}

/// Return the status of a block as it progresses through the various caches of the beacon
Expand Down Expand Up @@ -6379,7 +6385,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let beacon_block = self
.store
.get_blinded_block(&beacon_block_root)?
.get_blinded_block(&beacon_block_root, None)?
.ok_or_else(|| {
Error::DBInconsistent(format!("Missing block {}", beacon_block_root))
})?;
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ where
metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES);
let justified_block = self
.store
.get_blinded_block(&self.justified_checkpoint.root)
.get_blinded_block(&self.justified_checkpoint.root, None)
.map_err(Error::FailedToReadBlock)?
.ok_or(Error::MissingBlock(self.justified_checkpoint.root))?
.deconstruct()
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ where
.ok_or("Fork choice not found in store")?;

let genesis_block = store
.get_blinded_block(&chain.genesis_block_root)
.get_blinded_block(&chain.genesis_block_root, Some(Slot::new(0)))
.map_err(|e| descriptive_db_error("genesis block", &e))?
.ok_or("Genesis block not found in store")?;
let genesis_state = store
Expand Down Expand Up @@ -732,7 +732,7 @@ where
// Try to decode the head block according to the current fork, if that fails, try
// to backtrack to before the most recent fork.
let (head_block_root, head_block, head_reverted) =
match store.get_full_block(&initial_head_block_root) {
match store.get_full_block(&initial_head_block_root, None) {
Ok(Some(block)) => (initial_head_block_root, block, false),
Ok(None) => return Err("Head block not found in store".into()),
Err(StoreError::SszDecodeError(_)) => {
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl<T: BeaconChainTypes> CanonicalHead<T> {
let fork_choice_view = fork_choice.cached_fork_choice_view();
let beacon_block_root = fork_choice_view.head_block_root;
let beacon_block = store
.get_full_block(&beacon_block_root)?
.get_full_block(&beacon_block_root, None)?
.ok_or(Error::MissingBeaconBlock(beacon_block_root))?;
let current_slot = fork_choice.fc_store().get_current_slot();
let (_, beacon_state) = store
Expand Down Expand Up @@ -651,7 +651,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut new_snapshot = {
let beacon_block = self
.store
.get_full_block(&new_view.head_block_root)?
.get_full_block(&new_view.head_block_root, None)?
.ok_or(Error::MissingBeaconBlock(new_view.head_block_root))?;

let (_, beacon_state) = self
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/fork_revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
let finalized_checkpoint = head_state.finalized_checkpoint();
let finalized_block_root = finalized_checkpoint.root;
let finalized_block = store
.get_full_block(&finalized_block_root)
.get_full_block(&finalized_block_root, None)
.map_err(|e| format!("Error loading finalized block: {:?}", e))?
.ok_or_else(|| {
format!(
Expand Down
15 changes: 7 additions & 8 deletions beacon_node/beacon_chain/src/light_client_server_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,12 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
let signature_slot = block_slot;
let attested_block_root = block_parent_root;

let attested_block =
store
.get_full_block(attested_block_root)?
.ok_or(BeaconChainError::DBInconsistent(format!(
"Block not available {:?}",
attested_block_root
)))?;
let attested_block = store.get_full_block(attested_block_root, None)?.ok_or(
BeaconChainError::DBInconsistent(format!(
"Block not available {:?}",
attested_block_root
)),
)?;

let cached_parts = self.get_or_compute_prev_block_cache(
store.clone(),
Expand Down Expand Up @@ -130,7 +129,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
if is_latest_finality & !cached_parts.finalized_block_root.is_zero() {
// Immediately after checkpoint sync the finalized block may not be available yet.
if let Some(finalized_block) =
store.get_full_block(&cached_parts.finalized_block_root)?
store.get_full_block(&cached_parts.finalized_block_root, None)?
{
*self.latest_finality_update.write() = Some(LightClientFinalityUpdate::new(
&attested_block,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
// so delete it from the head tracker but leave it and its states in the database
// This is suboptimal as it wastes disk space, but it's difficult to fix. A re-sync
// can be used to reclaim the space.
let head_state_root = match store.get_blinded_block(&head_hash) {
let head_state_root = match store.get_blinded_block(&head_hash, Some(head_slot)) {
Ok(Some(block)) => block.state_root(),
Ok(None) => {
return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into())
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/pre_finalization_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

// 2. Check on disk.
if self.store.get_blinded_block(&block_root)?.is_some() {
if self.store.get_blinded_block(&block_root, None)?.is_some() {
cache.block_roots.put(block_root, ());
return Ok(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn get_slot_clock<T: BeaconChainTypes>(
log: &Logger,
) -> Result<Option<T::SlotClock>, Error> {
let spec = db.get_chain_spec();
let Some(genesis_block) = db.get_blinded_block(&Hash256::zero())? else {
let Some(genesis_block) = db.get_blinded_block(&Hash256::zero(), Some(Slot::new(0)))? else {
error!(log, "Missing genesis block");
return Ok(None);
};
Expand Down
10 changes: 10 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,16 @@ pub fn cli_app() -> Command {
.default_value("true")
.display_order(0)
)
.arg(
Arg::new("compression-level")
.long("compression-level")
.value_name("LEVEL")
.help("Compression level (-99 to 22) for zstd compression applied to states on disk \
[default: 1]. You may change the compression level freely without re-syncing.")
.action(ArgAction::Set)
.default_value("1")
.display_order(0)
)
.arg(
Arg::new("prune-payloads")
.long("prune-payloads")
Expand Down
1 change: 1 addition & 0 deletions beacon_node/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ lru = { workspace = true }
sloggers = { workspace = true }
directory = { workspace = true }
strum = { workspace = true }
zstd = { workspace = true }
27 changes: 27 additions & 0 deletions beacon_node/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
pub const DEFAULT_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(5);
pub const DEFAULT_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(128);
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
const EST_COMPRESSION_FACTOR: usize = 2;
pub const DEFAULT_HISTORIC_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(1);
pub const DEFAULT_EPOCHS_PER_BLOB_PRUNE: u64 = 1;
pub const DEFAULT_BLOB_PUNE_MARGIN_EPOCHS: u64 = 0;
Expand All @@ -25,6 +27,8 @@ pub struct StoreConfig {
pub block_cache_size: NonZeroUsize,
/// Maximum number of states to store in the in-memory state cache.
pub state_cache_size: NonZeroUsize,
/// Compression level for blocks, state diffs and other compressed values.
pub compression_level: i32,
/// Maximum number of states from freezer database to store in the in-memory state cache.
pub historic_state_cache_size: NonZeroUsize,
/// Whether to compact the database on initialization.
Expand All @@ -33,6 +37,8 @@ pub struct StoreConfig {
pub compact_on_prune: bool,
/// Whether to prune payloads on initialization and finalization.
pub prune_payloads: bool,
/// Whether to store finalized blocks compressed and linearised in the freezer database.
pub linear_blocks: bool,
/// Whether to prune blobs older than the blob data availability boundary.
pub prune_blobs: bool,
/// Frequency of blob pruning in epochs. Default: 1 (every epoch).
Expand Down Expand Up @@ -61,10 +67,12 @@ impl Default for StoreConfig {
slots_per_restore_point_set_explicitly: false,
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
state_cache_size: DEFAULT_STATE_CACHE_SIZE,
compression_level: DEFAULT_COMPRESSION_LEVEL,
historic_state_cache_size: DEFAULT_HISTORIC_STATE_CACHE_SIZE,
compact_on_init: false,
compact_on_prune: true,
prune_payloads: true,
linear_blocks: true,
prune_blobs: true,
epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE,
blob_prune_margin_epochs: DEFAULT_BLOB_PUNE_MARGIN_EPOCHS,
Expand All @@ -91,6 +99,25 @@ impl StoreConfig {
}
Ok(())
}

/// Estimate the size of `len` bytes after compression at the current compression level.
pub fn estimate_compressed_size(&self, len: usize) -> usize {
if self.compression_level == 0 {
len
} else {
len / EST_COMPRESSION_FACTOR
}
}

/// Estimate the size of `len` compressed bytes after decompression at the current compression
/// level.
pub fn estimate_decompressed_size(&self, len: usize) -> usize {
if self.compression_level == 0 {
len
} else {
len * EST_COMPRESSION_FACTOR
}
}
}

impl StoreItem for OnDiskStoreConfig {
Expand Down
1 change: 1 addition & 0 deletions beacon_node/store/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub enum Error {
computed: Hash256,
},
BlockReplayError(BlockReplayError),
Compression(std::io::Error),
AddPayloadLogicError,
SlotClockUnavailableForMigration,
InvalidKey,
Expand Down
Loading
Loading