Fix tree-states sub-epoch diffs (sigp#5097)
michaelsproul committed Jan 30, 2024
1 parent 11461d8 commit 7862c71
Showing 12 changed files with 255 additions and 65 deletions.
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/historical_blocks.rs
@@ -130,6 +130,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

// Store block roots, including at all skip slots in the freezer DB.
// The block root mapping for `block.slot()` itself was already written when the block
// was stored, above.
for slot in (block.slot().as_usize() + 1..prev_block_slot.as_usize()).rev() {
cold_batch.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()),
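For intuition, here is a minimal standalone sketch (hypothetical helper and map, not the Lighthouse store API) of the mapping that loop produces during backfill: the block's own slot is mapped when the block is stored, and every following skip slot up to (but excluding) the next stored block maps to the same root.

    use std::collections::BTreeMap;

    /// Hypothetical stand-in for the freezer's BeaconBlockRoots column.
    fn backfill_block_roots(
        block_slot: u64,
        block_root: [u8; 32],
        prev_block_slot: u64, // slot of the more recent block processed just before this one
        column: &mut BTreeMap<u64, [u8; 32]>,
    ) {
        // The mapping for `block_slot` itself is written when the block is stored.
        column.insert(block_slot, block_root);
        // Skip slots between this block and the next one re-use the same root.
        for slot in block_slot + 1..prev_block_slot {
            column.insert(slot, block_root);
        }
    }

    fn main() {
        let mut column = BTreeMap::new();
        backfill_block_roots(96, [0xaa; 32], 100, &mut column);
        // Slots 96, 97, 98 and 99 all resolve to the block at slot 96.
        assert_eq!(column.len(), 4);
    }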
4 changes: 1 addition & 3 deletions beacon_node/src/cli.rs
@@ -593,9 +593,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
Arg::with_name("slots-per-restore-point")
.long("slots-per-restore-point")
.value_name("SLOT_COUNT")
.help("Specifies how often a freezer DB restore point should be stored. \
Cannot be changed after initialization. \
[default: 8192 (mainnet) or 64 (minimal)]")
.help("Deprecated.")
.takes_value(true)
)
.arg(
19 changes: 2 additions & 17 deletions beacon_node/src/config.rs
@@ -26,7 +26,6 @@ use std::num::NonZeroU16;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
-use store::hdiff::HierarchyConfig;
use types::{Checkpoint, Epoch, EthSpec, Hash256, PublicKeyBytes, GRAFFITI_BYTES_LEN};

/// Gets the fully-initialized global client.
@@ -435,22 +434,8 @@ pub fn get_config<E: EthSpec>(
client_config.store.epochs_per_state_diff = epochs_per_state_diff;
}

-    if let Some(hierarchy_exponents) =
-        clap_utils::parse_optional::<String>(cli_args, "hierarchy-exponents")?
-    {
-        let exponents = hierarchy_exponents
-            .split(',')
-            .map(|s| {
-                s.parse()
-                    .map_err(|e| format!("invalid hierarchy-exponents: {e:?}"))
-            })
-            .collect::<Result<Vec<u8>, _>>()?;
-
-        if exponents.windows(2).any(|w| w[0] >= w[1]) {
-            return Err("hierarchy-exponents must be in ascending order".to_string());
-        }
-
-        client_config.store.hierarchy_config = HierarchyConfig { exponents };
+    if let Some(hierarchy_config) = clap_utils::parse_optional(cli_args, "hierarchy-exponents")? {
+        client_config.store.hierarchy_config = hierarchy_config;
}

if let Some(epochs_per_migration) =
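The flag value is now parsed directly into a `HierarchyConfig` via `clap_utils::parse_optional`, which resolves the type through `FromStr`, so the validation lives in the `HierarchyConfig::from_str` impl added in `hdiff.rs` below. A rough usage sketch (the exponent list here is illustrative, not necessarily the default):

    use store::hdiff::HierarchyConfig;

    fn main() {
        // Comma-separated exponents parse into a HierarchyConfig...
        let config: HierarchyConfig = "5,9,11,13,16,18,21".parse().unwrap();
        assert_eq!(config.exponents, vec![5, 9, 11, 13, 16, 18, 21]);

        // ...and non-ascending input is rejected, as before.
        assert!("5,5,11".parse::<HierarchyConfig>().is_err());
    }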
59 changes: 51 additions & 8 deletions beacon_node/store/src/config.rs
@@ -1,5 +1,5 @@
use crate::hdiff::HierarchyConfig;
-use crate::{DBColumn, Error, StoreItem};
+use crate::{AnchorInfo, DBColumn, Error, Split, StoreItem};
use serde::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
@@ -117,15 +117,22 @@ impl StoreConfig {
pub fn check_compatibility(
&self,
on_disk_config: &OnDiskStoreConfig,
+        split: &Split,
+        anchor: Option<&AnchorInfo>,
    ) -> Result<(), StoreConfigError> {
        let db_config = self.as_disk_config();
-        if db_config.ne(on_disk_config) {
-            return Err(StoreConfigError::IncompatibleStoreConfig {
+        // Allow changing the hierarchy exponents if no historic states are stored.
+        if db_config.linear_blocks == on_disk_config.linear_blocks
+            && (db_config.hierarchy_config == on_disk_config.hierarchy_config
+                || anchor.map_or(false, |anchor| anchor.no_historic_states_stored(split.slot)))
+        {
+            Ok(())
+        } else {
+            Err(StoreConfigError::IncompatibleStoreConfig {
                config: db_config,
                on_disk: on_disk_config.clone(),
-            });
+            })
        }
-        Ok(())
}

/// Check that the configuration is valid.
@@ -218,6 +225,8 @@ impl StoreItem for OnDiskStoreConfig {
#[cfg(test)]
mod test {
use super::*;
use crate::{metadata::STATE_UPPER_LIMIT_NO_RETAIN, AnchorInfo, Split};
use types::{Hash256, Slot};

#[test]
fn check_compatibility_ok() {
@@ -229,7 +238,10 @@ mod test {
linear_blocks: true,
hierarchy_config: store_config.hierarchy_config.clone(),
};
-        assert!(store_config.check_compatibility(&on_disk_config).is_ok());
+        let split = Split::default();
+        assert!(store_config
+            .check_compatibility(&on_disk_config, &split, None)
+            .is_ok());
}

#[test]
@@ -242,7 +254,10 @@ mod test {
linear_blocks: false,
hierarchy_config: store_config.hierarchy_config.clone(),
};
-        assert!(store_config.check_compatibility(&on_disk_config).is_err());
+        let split = Split::default();
+        assert!(store_config
+            .check_compatibility(&on_disk_config, &split, None)
+            .is_err());
}

#[test]
@@ -257,6 +272,34 @@ mod test {
exponents: vec![5, 8, 11, 13, 16, 18, 21],
},
};
-        assert!(store_config.check_compatibility(&on_disk_config).is_err());
+        let split = Split::default();
+        assert!(store_config
+            .check_compatibility(&on_disk_config, &split, None)
+            .is_err());
}

#[test]
fn check_compatibility_hierarchy_config_update() {
let store_config = StoreConfig {
linear_blocks: true,
..Default::default()
};
let on_disk_config = OnDiskStoreConfig {
linear_blocks: true,
hierarchy_config: HierarchyConfig {
exponents: vec![5, 8, 11, 13, 16, 18, 21],
},
};
let split = Split::default();
let anchor = AnchorInfo {
anchor_slot: Slot::new(0),
oldest_block_slot: Slot::new(0),
oldest_block_parent: Hash256::zero(),
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
};
assert!(store_config
.check_compatibility(&on_disk_config, &split, Some(&anchor))
.is_ok());
}
}
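In short, the updated `check_compatibility` accepts the on-disk config when `linear_blocks` matches and either the hierarchy exponents are unchanged or the anchor shows that no historic states (beyond genesis) are stored yet. A distilled sketch of that predicate, using plain booleans rather than the real types:

    // Distilled form of the new compatibility rule (sketch, not the Lighthouse types).
    fn config_compatible(
        linear_blocks_match: bool,
        hierarchy_unchanged: bool,
        no_historic_states: bool,
    ) -> bool {
        linear_blocks_match && (hierarchy_unchanged || no_historic_states)
    }

    fn main() {
        // Changing the exponents is accepted on a node with no historic states yet...
        assert!(config_compatible(true, false, true));
        // ...but rejected once historic states exist.
        assert!(!config_compatible(true, false, false));
    }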
33 changes: 33 additions & 0 deletions beacon_node/store/src/hdiff.rs
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::io::{Read, Write};
use std::str::FromStr;
use types::{BeaconState, ChainSpec, EthSpec, Slot, VList};
use zstd::{Decoder, Encoder};

@@ -22,6 +23,26 @@ pub struct HierarchyConfig {
pub exponents: Vec<u8>,
}

impl FromStr for HierarchyConfig {
type Err = String;

fn from_str(s: &str) -> Result<Self, String> {
let exponents = s
.split(',')
.map(|s| {
s.parse()
.map_err(|e| format!("invalid hierarchy-exponents: {e:?}"))
})
.collect::<Result<Vec<u8>, _>>()?;

if exponents.windows(2).any(|w| w[0] >= w[1]) {
return Err("hierarchy-exponents must be in ascending order".to_string());
}

Ok(HierarchyConfig { exponents })
}
}

#[derive(Debug)]
pub struct HierarchyModuli {
moduli: Vec<u64>,
@@ -267,6 +288,18 @@ impl HierarchyModuli {
Ok((slot / last + 1) * last)
}
}

/// Return `true` if the database ops for this slot should be committed immediately.
///
/// This is the case for all diffs in the 2nd lowest layer and above, which are required by diffs
/// in the 1st layer.
pub fn should_commit_immediately(&self, slot: Slot) -> Result<bool, Error> {
// If there's only 1 layer of snapshots, then commit only when writing a snapshot.
self.moduli.get(1).map_or_else(
|| Ok(slot == self.next_snapshot_slot(slot)?),
|second_layer_moduli| Ok(slot % *second_layer_moduli == 0),
)
}
}

#[cfg(test)]
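For intuition about `should_commit_immediately`, which `reconstruct.rs` below uses to decide when to flush its write batch: assuming each layer's modulus is 2^exponent and the exponents are ascending (so the first layer is the finest-grained diff layer), first-layer diffs can be batched, but any diff on the second layer or above is committed straight away because subsequent first-layer diffs are built on top of it. A rough sketch with plain integers (the layer spans are illustrative, not necessarily the defaults):

    // Sketch of the commit cadence, mirroring the logic above with plain integers.
    fn should_commit_immediately(slot: u64, moduli: &[u64]) -> bool {
        match moduli.get(1) {
            // With a second layer present, commit on every second-layer boundary; this also
            // covers all coarser layers, whose moduli are multiples of it.
            Some(second_layer) => slot % second_layer == 0,
            // With a single layer there are only snapshots, so commit on snapshot slots.
            None => slot % moduli[0] == 0,
        }
    }

    fn main() {
        let moduli = [32, 512, 2048]; // illustrative 2^5, 2^9 and 2^11 slot layers
        assert!(!should_commit_immediately(96, &moduli)); // first-layer diff: batched
        assert!(should_commit_immediately(512, &moduli)); // second-layer diff: committed
        assert!(should_commit_immediately(2048, &moduli)); // coarser layers too
    }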
19 changes: 18 additions & 1 deletion beacon_node/store/src/hot_cold_store.rs
@@ -346,7 +346,20 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {

// Ensure that any on-disk config is compatible with the supplied config.
if let Some(disk_config) = db.load_config()? {
-            db.config.check_compatibility(&disk_config)?;
+            let split = db.get_split_info();
+            let anchor = db.get_anchor_info();
+            db.config
+                .check_compatibility(&disk_config, &split, anchor.as_ref())?;
+
+            // Inform user if hierarchy config is changing.
+            if db.config.hierarchy_config != disk_config.hierarchy_config {
+                info!(
+                    db.log,
+                    "Updating historic state config";
+                    "previous_config" => ?disk_config.hierarchy_config,
+                    "new_config" => ?db.config.hierarchy_config,
+                );
+            }
}
db.store_config()?;

@@ -2740,6 +2753,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let columns = [
DBColumn::BeaconState,
DBColumn::BeaconStateSummary,
DBColumn::BeaconStateDiff,
DBColumn::BeaconRestorePoint,
DBColumn::BeaconStateRoots,
DBColumn::BeaconHistoricalRoots,
@@ -2780,6 +2794,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.cold_db.do_atomically(cold_ops)?;
}

// In order to reclaim space, we need to compact the freezer DB as well.
self.cold_db.compact()?;

Ok(())
}
}
28 changes: 9 additions & 19 deletions beacon_node/store/src/leveldb_store.rs
@@ -155,25 +155,15 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
self.transaction_mutex.lock()
}

-    /// Compact all values in the states and states flag columns.
-    fn compact(&self) -> Result<(), Error> {
-        let endpoints = |column: DBColumn| {
-            (
-                BytesKey::from_vec(get_key_for_col(column.as_str(), Hash256::zero().as_bytes())),
-                BytesKey::from_vec(get_key_for_col(
-                    column.as_str(),
-                    Hash256::repeat_byte(0xff).as_bytes(),
-                )),
-            )
-        };
-
-        for (start_key, end_key) in [
-            endpoints(DBColumn::BeaconState),
-            endpoints(DBColumn::BeaconStateDiff),
-            endpoints(DBColumn::BeaconStateSummary),
-        ] {
-            self.db.compact(&start_key, &end_key);
-        }
+    fn compact_column(&self, column: DBColumn) -> Result<(), Error> {
+        // Use key-size-agnostic keys [] and 0xff..ff with a minimum of 32 bytes to account for
+        // columns that may change size between sub-databases or schema versions.
+        let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), &[]));
+        let end_key = BytesKey::from_vec(get_key_for_col(
+            column.as_str(),
+            &vec![0xff; std::cmp::max(column.key_size(), 32)],
+        ));
+        self.db.compact(&start_key, &end_key);
Ok(())
}

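To see why those endpoints bound every key in a column, note that (as assumed here) an on-disk key is the column name followed by the item key, compared as raw bytes: an empty item key sorts before every real key, and max(key_size, 32) bytes of 0xff sorts after any key the column will use. A rough sketch with a hypothetical helper and placeholder column name:

    // Sketch of the per-column key bounds, assuming keys are `column_name ++ item_key`
    // compared lexicographically (hypothetical helper, not the store's real function).
    fn column_bounds(column_name: &str, key_size: usize) -> (Vec<u8>, Vec<u8>) {
        let start = column_name.as_bytes().to_vec(); // column prefix + empty key: before all keys
        let mut end = column_name.as_bytes().to_vec();
        end.extend(std::iter::repeat(0xffu8).take(key_size.max(32))); // after every real key
        (start, end)
    }

    fn main() {
        let (start, end) = column_bounds("exa", 32); // "exa" is a placeholder column name
        let some_key = [b"exa".as_slice(), [0xabu8; 32].as_slice()].concat();
        assert!(start < some_key && some_key < end);
    }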
19 changes: 17 additions & 2 deletions beacon_node/store/src/lib.rs
@@ -79,8 +79,23 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
/// this method. In future we may implement a safer mandatory locking scheme.
fn begin_rw_transaction(&self) -> MutexGuard<()>;

-    /// Compact the database, freeing space used by deleted items.
-    fn compact(&self) -> Result<(), Error>;
+    /// Compact a single column in the database, freeing space used by deleted items.
+    fn compact_column(&self, column: DBColumn) -> Result<(), Error>;
+
+    /// Compact a default set of columns that are likely to free substantial space.
+    fn compact(&self) -> Result<(), Error> {
+        // Compact state and block related columns as they are likely to have the most churn,
+        // i.e. entries being created and deleted.
+        for column in [
+            DBColumn::BeaconState,
+            DBColumn::BeaconStateDiff,
+            DBColumn::BeaconStateSummary,
+            DBColumn::BeaconBlock,
+        ] {
+            self.compact_column(column)?;
+        }
+        Ok(())
+    }

/// Iterate through all keys and values in a particular column.
fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
2 changes: 1 addition & 1 deletion beacon_node/store/src/memory_store.rs
@@ -108,7 +108,7 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
self.transaction_mutex.lock()
}

-    fn compact(&self) -> Result<(), Error> {
+    fn compact_column(&self, _column: DBColumn) -> Result<(), Error> {
Ok(())
}
}
5 changes: 5 additions & 0 deletions beacon_node/store/src/metadata.rs
@@ -108,6 +108,11 @@ impl AnchorInfo {
pub fn block_backfill_complete(&self, target_slot: Slot) -> bool {
self.oldest_block_slot <= target_slot
}

/// Return true if no historic states other than genesis are stored in the database.
pub fn no_historic_states_stored(&self, split_slot: Slot) -> bool {
self.state_lower_limit == 0 && self.state_upper_limit >= split_slot
}
}

impl StoreItem for AnchorInfo {
2 changes: 1 addition & 1 deletion beacon_node/store/src/reconstruct.rs
@@ -106,7 +106,7 @@ where
self.store_cold_state(&state_root, &state, &mut io_batch)?;

// If the slot lies on an epoch boundary, commit the batch and update the anchor.
-            if slot % E::slots_per_epoch() == 0 || slot + 1 == upper_limit_slot {
+            if self.hierarchy.should_commit_immediately(slot)? || slot + 1 == upper_limit_slot {
info!(
self.log,
"State reconstruction in progress";