Implement epochs-per-migration
michaelsproul committed Jun 26, 2023
1 parent cc780aa commit b9938eb
Showing 7 changed files with 102 additions and 2 deletions.
3 changes: 3 additions & 0 deletions beacon_node/beacon_chain/src/chain_config.rs
@@ -81,6 +81,8 @@ pub struct ChainConfig {
pub always_prepare_payload: bool,
/// Whether backfill sync processing should be rate-limited.
pub enable_backfill_rate_limiting: bool,
/// Number of epochs between each migration of data from the hot database to the freezer.
pub epochs_per_migration: u64,
}

impl Default for ChainConfig {
@@ -111,6 +113,7 @@ impl Default for ChainConfig {
genesis_backfill: false,
always_prepare_payload: false,
enable_backfill_rate_limiting: true,
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
}
}
}
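For context, a downstream consumer could override the new field with Rust's struct-update syntax. A minimal sketch (illustrative, not part of this commit), assuming the `beacon_chain::ChainConfig` re-export is available:

use beacon_chain::ChainConfig;

fn main() {
    // Keep every default except the new migration spacing (value hypothetical).
    let chain_config = ChainConfig {
        epochs_per_migration: 4,
        ..ChainConfig::default()
    };
    assert_eq!(chain_config.epochs_per_migration, 4);
}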
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
@@ -73,6 +73,7 @@ pub use execution_layer::EngineState;
pub use execution_payload::NotifyExecutionLayer;
pub use fork_choice::{ExecutionStatus, ForkchoiceUpdateParameters};
pub use metrics::scrape_for_metrics;
pub use migrate::MigratorConfig;
pub use parking_lot;
pub use slot_clock;
pub use state_processing::per_block_processing::errors::{
61 changes: 60 additions & 1 deletion beacon_node/beacon_chain/src/migrate.rs
@@ -25,27 +25,58 @@ const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200;
/// Compact after a large finality gap, if we respect `MIN_COMPACTION_PERIOD_SECONDS`.
const COMPACTION_FINALITY_DISTANCE: u64 = 1024;

/// Default number of epochs to wait between finalization migrations.
pub const DEFAULT_EPOCHS_PER_MIGRATION: u64 = 1;

/// The background migrator runs a thread to perform pruning and migrate state from the hot
/// to the cold database.
pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
db: Arc<HotColdDB<E, Hot, Cold>>,
/// Record of when the last migration ran, for enforcing `epochs_per_migration`.
prev_migration: Arc<Mutex<PrevMigration>>,
#[allow(clippy::type_complexity)]
tx_thread: Option<Mutex<(mpsc::Sender<Notification>, thread::JoinHandle<()>)>>,
/// Genesis block root, for persisting the `PersistedBeaconChain`.
genesis_block_root: Hash256,
log: Logger,
}

-#[derive(Debug, Default, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigratorConfig {
pub blocking: bool,
/// Run migrations at most once every `epochs_per_migration` epochs.
///
/// If set to 0 or 1, then run on every finalization.
pub epochs_per_migration: u64,
}

impl Default for MigratorConfig {
fn default() -> Self {
Self {
blocking: false,
epochs_per_migration: DEFAULT_EPOCHS_PER_MIGRATION,
}
}
}

impl MigratorConfig {
pub fn blocking(mut self) -> Self {
self.blocking = true;
self
}

pub fn epochs_per_migration(mut self, epochs_per_migration: u64) -> Self {
self.epochs_per_migration = epochs_per_migration;
self
}
}

/// Record of when the last migration ran.
pub struct PrevMigration {
/// The epoch at which the last finalization migration ran.
epoch: Epoch,
/// The number of epochs to wait between runs.
epochs_per_migration: u64,
}

/// Pruning can be successful, or in rare cases deferred to a later point.
@@ -92,6 +123,7 @@ pub struct FinalizationNotification {
finalized_state_root: BeaconStateHash,
finalized_checkpoint: Checkpoint,
head_tracker: Arc<HeadTracker>,
prev_migration: Arc<Mutex<PrevMigration>>,
genesis_block_root: Hash256,
}

@@ -103,6 +135,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
genesis_block_root: Hash256,
log: Logger,
) -> Self {
// Estimate last migration run from DB split slot.
let prev_migration = Arc::new(Mutex::new(PrevMigration {
epoch: db.get_split_slot().epoch(E::slots_per_epoch()),
epochs_per_migration: config.epochs_per_migration,
}));
let tx_thread = if config.blocking {
None
} else {
@@ -111,6 +148,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
Self {
db,
tx_thread,
prev_migration,
genesis_block_root,
log,
}
@@ -131,6 +169,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
finalized_state_root,
finalized_checkpoint,
head_tracker,
prev_migration: self.prev_migration.clone(),
genesis_block_root: self.genesis_block_root,
};

@@ -204,6 +243,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
notif: FinalizationNotification,
log: &Logger,
) {
// Do not run too frequently.
let epoch = notif.finalized_checkpoint.epoch;
let mut prev_migration = notif.prev_migration.lock();
if epoch < prev_migration.epoch + prev_migration.epochs_per_migration {
debug!(
log,
"Database consolidation deferred";
"last_finalized_epoch" => prev_migration.epoch,
"new_finalized_epoch" => epoch,
"epochs_per_migration" => prev_migration.epochs_per_migration,
);
return;
}

// Update the previous migration epoch immediately so the lock isn't held for the
// duration of the migration. If this migration doesn't succeed, it will simply be
// retried at the next scheduled run.
prev_migration.epoch = epoch;
drop(prev_migration);

debug!(log, "Database consolidation started");

let finalized_state_root = notif.finalized_state_root;
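The deferral check at the top of the migration handler is the core of this change. Below is a self-contained sketch of the same rule (hypothetical names, plain `u64` epochs instead of the `Epoch` type), illustrating why a spacing of 0 or 1 still runs on every finalization:

/// Mirror of `epoch < prev_migration.epoch + prev_migration.epochs_per_migration`:
/// returns true when a migration should run for `new_epoch`.
fn should_run_migration(prev_epoch: u64, epochs_per_migration: u64, new_epoch: u64) -> bool {
    new_epoch >= prev_epoch + epochs_per_migration
}

fn main() {
    // Default spacing of 1: every newly finalized epoch triggers a run.
    assert!(should_run_migration(10, 1, 11));
    // Spacing of 4: epochs 11..=13 are deferred, epoch 14 runs.
    assert!(!should_run_migration(10, 4, 12));
    assert!(should_run_migration(10, 4, 14));
    // Spacing of 0 also runs every time, because finalized epochs only increase.
    assert!(should_run_migration(10, 0, 11));
}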
5 changes: 4 additions & 1 deletion beacon_node/client/src/builder.rs
@@ -11,7 +11,7 @@ use beacon_chain::{
slot_clock::{SlotClock, SystemTimeSlotClock},
state_advance_timer::spawn_state_advance_timer,
store::{HotColdDB, ItemStore, LevelDB, StoreConfig},
-BeaconChain, BeaconChainTypes, Eth1ChainBackend, ServerSentEventHandler,
+BeaconChain, BeaconChainTypes, Eth1ChainBackend, MigratorConfig, ServerSentEventHandler,
};
use environment::RuntimeContext;
use eth1::{Config as Eth1Config, Service as Eth1Service};
@@ -167,6 +167,9 @@ where
.store(store)
.task_executor(context.executor.clone())
.custom_spec(spec.clone())
.store_migrator_config(
MigratorConfig::default().epochs_per_migration(chain_config.epochs_per_migration),
)
.chain_config(chain_config)
.graffiti(graffiti)
.event_handler(event_handler)
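The setters added to `MigratorConfig` in `migrate.rs` (plus its re-export from `lib.rs`) are what make the one-liner above possible. An illustrative standalone use of the builder style, with hypothetical values:

use beacon_chain::MigratorConfig;

fn main() {
    let config = MigratorConfig::default()
        .blocking()               // pre-existing setter, shown for shape
        .epochs_per_migration(4); // at most one migration every 4 epochs
    assert!(config.blocking);
    assert_eq!(config.epochs_per_migration, 4);
}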
10 changes: 10 additions & 0 deletions beacon_node/src/cli.rs
@@ -532,6 +532,16 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
[default: 8192 (mainnet) or 64 (minimal)]")
.takes_value(true)
)
.arg(
Arg::with_name("epochs-per-migration")
.long("epochs-per-migration")
.value_name("N")
.help("The number of epochs to wait between running the migration of data from the \
hot DB to the cold DB. Less frequent runs can be useful for minimizing disk \
writes")
.default_value("1")
.takes_value(true)
)
.arg(
Arg::with_name("block-cache-size")
.long("block-cache-size")
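For reference, the new flag would be passed like the other store flags in this file, e.g. `lighthouse bn --epochs-per-migration 4` (assuming the usual `bn` alias for the beacon node subcommand). With the default value of 1, migration behaviour is unchanged.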
6 changes: 6 additions & 0 deletions beacon_node/src/config.rs
@@ -400,6 +400,12 @@ pub fn get_config<E: EthSpec>(
client_config.store.prune_payloads = prune_payloads;
}

if let Some(epochs_per_migration) =
clap_utils::parse_optional(cli_args, "epochs-per-migration")?
{
client_config.chain.epochs_per_migration = epochs_per_migration;
}

/*
* Zero-ports
*
18 changes: 18 additions & 0 deletions lighthouse/tests/beacon_node.rs