Skip to content

Commit

Permalink
Modularize beacon node backend (#4718)
Browse files Browse the repository at this point in the history
#4669


  Modularize the beacon node backend to make it easier to add new database implementations
  • Loading branch information
eserilev authored Jan 23, 2025
1 parent 266b241 commit a1b7d61
Show file tree
Hide file tree
Showing 38 changed files with 1,478 additions and 649 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ BUILD_PATH_AARCH64 = "target/$(AARCH64_TAG)/release"
PINNED_NIGHTLY ?= nightly

# List of features to use when cross-compiling. Can be overridden via the environment.
CROSS_FEATURES ?= gnosis,slasher-lmdb,slasher-mdbx,slasher-redb,jemalloc
CROSS_FEATURES ?= gnosis,slasher-lmdb,slasher-mdbx,slasher-redb,jemalloc,beacon-node-leveldb,beacon-node-redb

# Cargo profile for Cross builds. Default is for local builds, CI uses an override.
CROSS_PROFILE ?= release
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ impl<E: EthSpec> PendingComponents<E> {
None,
)
};

let executed_block = recover(diet_executed_block)?;

let AvailabilityPendingExecutedBlock {
Expand Down Expand Up @@ -732,7 +731,7 @@ mod test {
use slog::{info, Logger};
use state_processing::ConsensusContext;
use std::collections::VecDeque;
use store::{HotColdDB, ItemStore, LevelDB, StoreConfig};
use store::{database::interface::BeaconNodeBackend, HotColdDB, ItemStore, StoreConfig};
use tempfile::{tempdir, TempDir};
use types::non_zero_usize::new_non_zero_usize;
use types::{ExecPayload, MinimalEthSpec};
Expand All @@ -744,7 +743,7 @@ mod test {
db_path: &TempDir,
spec: Arc<ChainSpec>,
log: Logger,
) -> Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>> {
) -> Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>> {
let hot_path = db_path.path().join("hot_db");
let cold_path = db_path.path().join("cold_db");
let blobs_path = db_path.path().join("blobs_db");
Expand Down Expand Up @@ -920,7 +919,11 @@ mod test {
)
where
E: EthSpec,
T: BeaconChainTypes<HotStore = LevelDB<E>, ColdStore = LevelDB<E>, EthSpec = E>,
T: BeaconChainTypes<
HotStore = BeaconNodeBackend<E>,
ColdStore = BeaconNodeBackend<E>,
EthSpec = E,
>,
{
let log = test_logger();
let chain_db_path = tempdir().expect("should get temp dir");
Expand Down
11 changes: 5 additions & 6 deletions beacon_node/beacon_chain/src/historical_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use std::borrow::Cow;
use std::iter;
use std::time::Duration;
use store::metadata::DataColumnInfo;
use store::{
get_key_for_col, AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore,
KeyValueStoreOp,
};
use store::{AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp};
use strum::IntoStaticStr;
use types::{FixedBytesExtended, Hash256, Slot};

Expand Down Expand Up @@ -153,7 +150,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Store block roots, including at all skip slots in the freezer DB.
for slot in (block.slot().as_u64()..prev_block_slot.as_u64()).rev() {
cold_batch.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()),
DBColumn::BeaconBlockRoots,
slot.to_be_bytes().to_vec(),
block_root.as_slice().to_vec(),
));
}
Expand All @@ -169,7 +167,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let genesis_slot = self.spec.genesis_slot;
for slot in genesis_slot.as_u64()..prev_block_slot.as_u64() {
cold_batch.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()),
DBColumn::BeaconBlockRoots,
slot.to_be_bytes().to_vec(),
self.genesis_block_root.as_slice().to_vec(),
));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use crate::validator_pubkey_cache::DatabasePubkey;
use slog::{info, Logger};
use ssz::{Decode, Encode};
use std::sync::Arc;
use store::{
get_key_for_col, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem,
};
use store::{DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem};
use types::{Hash256, PublicKey};

const LOG_EVERY: usize = 200_000;
Expand Down Expand Up @@ -62,9 +60,9 @@ pub fn downgrade_from_v21<T: BeaconChainTypes>(
message: format!("{e:?}"),
})?;

let db_key = get_key_for_col(DBColumn::PubkeyCache.into(), key.as_slice());
ops.push(KeyValueStoreOp::PutKeyValue(
db_key,
DBColumn::PubkeyCache,
key.as_slice().to_vec(),
pubkey_bytes.as_ssz_bytes(),
));

Expand Down
17 changes: 6 additions & 11 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::sync::Arc;
use store::chunked_iter::ChunkedVectorIter;
use store::{
chunked_vector::BlockRootsChunked,
get_key_for_col,
metadata::{
SchemaVersion, ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_UNINITIALIZED, STATE_UPPER_LIMIT_NO_RETAIN,
},
Expand All @@ -21,7 +20,7 @@ fn load_old_schema_frozen_state<T: BeaconChainTypes>(
) -> Result<Option<BeaconState<T::EthSpec>>, Error> {
let Some(partial_state_bytes) = db
.cold_db
.get_bytes(DBColumn::BeaconState.into(), state_root.as_slice())?
.get_bytes(DBColumn::BeaconState, state_root.as_slice())?
else {
return Ok(None);
};
Expand Down Expand Up @@ -136,10 +135,7 @@ pub fn delete_old_schema_freezer_data<T: BeaconChainTypes>(
for column in columns {
for res in db.cold_db.iter_column_keys::<Vec<u8>>(column) {
let key = res?;
cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
column.as_str(),
&key,
)));
cold_ops.push(KeyValueStoreOp::DeleteKey(column, key));
}
}
let delete_ops = cold_ops.len();
Expand Down Expand Up @@ -175,7 +171,8 @@ pub fn write_new_schema_block_roots<T: BeaconChainTypes>(
// Store the genesis block root if it would otherwise not be stored.
if oldest_block_slot != 0 {
cold_ops.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(DBColumn::BeaconBlockRoots.into(), &0u64.to_be_bytes()),
DBColumn::BeaconBlockRoots,
0u64.to_be_bytes().to_vec(),
genesis_block_root.as_slice().to_vec(),
));
}
Expand All @@ -192,10 +189,8 @@ pub fn write_new_schema_block_roots<T: BeaconChainTypes>(
// OK to hold these in memory (10M slots * 43 bytes per KV ~= 430 MB).
for (i, (slot, block_root)) in block_root_iter.enumerate() {
cold_ops.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(
DBColumn::BeaconBlockRoots.into(),
&(slot as u64).to_be_bytes(),
),
DBColumn::BeaconBlockRoots,
slot.to_be_bytes().to_vec(),
block_root.as_slice().to_vec(),
));

Expand Down
15 changes: 11 additions & 4 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore};
use store::database::interface::BeaconNodeBackend;
use store::{config::StoreConfig, HotColdDB, ItemStore, MemoryStore};
use task_executor::TaskExecutor;
use task_executor::{test_utils::TestRuntime, ShutdownReason};
use tree_hash::TreeHash;
Expand Down Expand Up @@ -116,7 +117,7 @@ pub fn get_kzg(spec: &ChainSpec) -> Arc<Kzg> {
pub type BaseHarnessType<E, THotStore, TColdStore> =
Witness<TestingSlotClock, CachingEth1Backend<E>, E, THotStore, TColdStore>;

pub type DiskHarnessType<E> = BaseHarnessType<E, LevelDB<E>, LevelDB<E>>;
pub type DiskHarnessType<E> = BaseHarnessType<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>;
pub type EphemeralHarnessType<E> = BaseHarnessType<E, MemoryStore<E>, MemoryStore<E>>;

pub type BoxedMutator<E, Hot, Cold> = Box<
Expand Down Expand Up @@ -299,7 +300,10 @@ impl<E: EthSpec> Builder<EphemeralHarnessType<E>> {

impl<E: EthSpec> Builder<DiskHarnessType<E>> {
/// Disk store, start from genesis.
pub fn fresh_disk_store(mut self, store: Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>>) -> Self {
pub fn fresh_disk_store(
mut self,
store: Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>,
) -> Self {
let validator_keypairs = self
.validator_keypairs
.clone()
Expand All @@ -324,7 +328,10 @@ impl<E: EthSpec> Builder<DiskHarnessType<E>> {
}

/// Disk store, resume.
pub fn resumed_disk_store(mut self, store: Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>>) -> Self {
pub fn resumed_disk_store(
mut self,
store: Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>,
) -> Self {
let mutator = move |builder: BeaconChainBuilder<_>| {
builder
.resume_from_db()
Expand Down
5 changes: 3 additions & 2 deletions beacon_node/beacon_chain/tests/op_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use state_processing::per_block_processing::errors::{
AttesterSlashingInvalid, BlockOperationError, ExitInvalid, ProposerSlashingInvalid,
};
use std::sync::{Arc, LazyLock};
use store::{LevelDB, StoreConfig};
use store::database::interface::BeaconNodeBackend;
use store::StoreConfig;
use tempfile::{tempdir, TempDir};
use types::*;

Expand All @@ -26,7 +27,7 @@ static KEYPAIRS: LazyLock<Vec<Keypair>> =

type E = MinimalEthSpec;
type TestHarness = BeaconChainHarness<DiskHarnessType<E>>;
type HotColdDB = store::HotColdDB<E, LevelDB<E>, LevelDB<E>>;
type HotColdDB = store::HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>;

fn get_store(db_path: &TempDir) -> Arc<HotColdDB> {
let spec = Arc::new(test_spec::<E>());
Expand Down
17 changes: 10 additions & 7 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ use std::collections::HashSet;
use std::convert::TryInto;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use store::database::interface::BeaconNodeBackend;
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION, STATE_UPPER_LIMIT_NO_RETAIN};
use store::{
iter::{BlockRootsIterator, StateRootsIterator},
BlobInfo, DBColumn, HotColdDB, LevelDB, StoreConfig,
BlobInfo, DBColumn, HotColdDB, StoreConfig,
};
use tempfile::{tempdir, TempDir};
use tokio::time::sleep;
Expand All @@ -46,15 +47,15 @@ static KEYPAIRS: LazyLock<Vec<Keypair>> =
type E = MinimalEthSpec;
type TestHarness = BeaconChainHarness<DiskHarnessType<E>>;

fn get_store(db_path: &TempDir) -> Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>> {
fn get_store(db_path: &TempDir) -> Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>> {
get_store_generic(db_path, StoreConfig::default(), test_spec::<E>())
}

fn get_store_generic(
db_path: &TempDir,
config: StoreConfig,
spec: ChainSpec,
) -> Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>> {
) -> Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>> {
let hot_path = db_path.path().join("chain_db");
let cold_path = db_path.path().join("freezer_db");
let blobs_path = db_path.path().join("blobs_db");
Expand All @@ -73,7 +74,7 @@ fn get_store_generic(
}

fn get_harness(
store: Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>>,
store: Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>,
validator_count: usize,
) -> TestHarness {
// Most tests expect to retain historic states, so we use this as the default.
Expand All @@ -85,7 +86,7 @@ fn get_harness(
}

fn get_harness_generic(
store: Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>>,
store: Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>,
validator_count: usize,
chain_config: ChainConfig,
) -> TestHarness {
Expand Down Expand Up @@ -244,7 +245,6 @@ async fn full_participation_no_skips() {
AttestationStrategy::AllValidators,
)
.await;

check_finalization(&harness, num_blocks_produced);
check_split_slot(&harness, store);
check_chain_dump(&harness, num_blocks_produced + 1);
Expand Down Expand Up @@ -3508,7 +3508,10 @@ fn check_finalization(harness: &TestHarness, expected_slot: u64) {
}

/// Check that the HotColdDB's split_slot is equal to the start slot of the last finalized epoch.
fn check_split_slot(harness: &TestHarness, store: Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>>) {
fn check_split_slot(
harness: &TestHarness,
store: Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>,
) {
let split_slot = store.get_split_slot();
assert_eq!(
harness
Expand Down
5 changes: 3 additions & 2 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use beacon_chain::{
eth1_chain::{CachingEth1Backend, Eth1Chain},
slot_clock::{SlotClock, SystemTimeSlotClock},
state_advance_timer::spawn_state_advance_timer,
store::{HotColdDB, ItemStore, LevelDB, StoreConfig},
store::{HotColdDB, ItemStore, StoreConfig},
BeaconChain, BeaconChainTypes, Eth1ChainBackend, MigratorConfig, ServerSentEventHandler,
};
use beacon_chain::{Kzg, LightClientProducerEvent};
Expand All @@ -41,6 +41,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use store::database::interface::BeaconNodeBackend;
use timer::spawn_timer;
use tokio::sync::oneshot;
use types::{
Expand Down Expand Up @@ -1030,7 +1031,7 @@ where
}

impl<TSlotClock, TEth1Backend, E>
ClientBuilder<Witness<TSlotClock, TEth1Backend, E, LevelDB<E>, LevelDB<E>>>
ClientBuilder<Witness<TSlotClock, TEth1Backend, E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>
where
TSlotClock: SlotClock + 'static,
TEth1Backend: Eth1ChainBackend<E> + 'static,
Expand Down
4 changes: 1 addition & 3 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1933,7 +1933,7 @@ impl ApiTester {
.sync_committee_period(&self.chain.spec)
.unwrap();

let result = match self
match self
.client
.get_beacon_light_client_updates::<E>(current_sync_committee_period, 1)
.await
Expand All @@ -1954,7 +1954,6 @@ impl ApiTester {
.unwrap();

assert_eq!(1, expected.len());
assert_eq!(result.clone().unwrap().len(), expected.len());
self
}

Expand All @@ -1979,7 +1978,6 @@ impl ApiTester {
.get_light_client_bootstrap(&self.chain.store, &block_root, 1u64, &self.chain.spec);

assert!(expected.is_ok());

assert_eq!(result.unwrap().data, expected.unwrap().unwrap().0);

self
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1591,5 +1591,14 @@ pub fn cli_app() -> Command {
.action(ArgAction::Set)
.display_order(0)
)
.arg(
Arg::new("beacon-node-backend")
.long("beacon-node-backend")
.value_name("DATABASE")
.value_parser(store::config::DatabaseBackend::VARIANTS.to_vec())
.help("Set the database backend to be used by the beacon node.")
.action(ArgAction::Set)
.display_order(0)
)
.group(ArgGroup::new("enable_http").args(["http", "gui", "staking"]).multiple(true))
}
4 changes: 4 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,10 @@ pub fn get_config<E: EthSpec>(
warn!(log, "The slots-per-restore-point flag is deprecated");
}

if let Some(backend) = clap_utils::parse_optional(cli_args, "beacon-node-backend")? {
client_config.store.backend = backend;
}

if let Some(hierarchy_config) = clap_utils::parse_optional(cli_args, "hierarchy-exponents")? {
client_config.store.hierarchy_config = hierarchy_config;
}
Expand Down
Loading

0 comments on commit a1b7d61

Please sign in to comment.