From 0e20598b3a0f69703b10fb670458cceee48a3b6d Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 12 Jul 2023 14:03:26 +0000 Subject: [PATCH] Fix HTTP state API bug and add `--epochs-per-migration` (#4236) ## Issue Addressed Fix an issue observed by `@zlan` on Discord where Lighthouse would sometimes return this error when looking up states via the API: > {"code":500,"message":"UNHANDLED_ERROR: ForkChoiceError(MissingProtoArrayBlock(0xc9cf1495421b6ef3215d82253b388d77321176a1dcef0db0e71a0cd0ffc8cdb7))","stacktraces":[]} ## Proposed Changes The error stems from a faulty assumption in the HTTP API logic: that any state in the hot database must have its block in fork choice. This isn't true because the state's hot database may update much less frequently than the fork choice store, e.g. if reconstructing states (where freezer migration pauses), or if the freezer migration runs slowly. There could also be a race between loading the hot state and checking fork choice, e.g. even if the finalization migration of DB+fork choice were atomic, the update could happen between the 1st and 2nd calls. To address this I've changed the HTTP API logic to use the finalized block's execution status as a fallback where it is safe to do so. In the case where a block is non-canonical and prior to finalization (permanently orphaned) we default `execution_optimistic` to `true`. ## Additional Info I've also added a new CLI flag to reduce the frequency of the finalization migration as this is useful for several purposes: - Spacing out database writes (less frequent, larger batches) - Keeping a limited chain history with high availability, e.g. the last month in the hot database. This new flag made it _substantially_ easier to test this change. It was extracted from `tree-states` (where it's called `--db-migration-period`), which is why this PR also carries the `tree-states` label. --- beacon_node/beacon_chain/src/beacon_chain.rs | 35 ++++++++- beacon_node/beacon_chain/src/chain_config.rs | 3 + beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/beacon_chain/src/migrate.rs | 61 ++++++++++++++- beacon_node/beacon_chain/src/test_utils.rs | 9 ++- beacon_node/client/src/builder.rs | 5 +- beacon_node/http_api/src/state_id.rs | 33 ++++++-- .../http_api/tests/interactive_tests.rs | 75 ++++++++++++++++++- beacon_node/src/cli.rs | 10 +++ beacon_node/src/config.rs | 6 ++ lighthouse/tests/beacon_node.rs | 18 +++++ 11 files changed, 240 insertions(+), 16 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 01343ff3b15..78f2c3f03b4 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -197,6 +197,17 @@ pub struct PrePayloadAttributes { pub parent_block_number: u64, } +/// Information about a state/block at a specific slot. +#[derive(Debug, Clone, Copy)] +pub struct FinalizationAndCanonicity { + /// True if the slot of the state or block is finalized. + /// + /// This alone DOES NOT imply that the state/block is finalized, use `self.is_finalized()`. + pub slot_is_finalized: bool, + /// True if the state or block is canonical at its slot. + pub canonical: bool, +} + /// Define whether a forkchoiceUpdate needs to be checked for an override (`Yes`) or has already /// been checked (`AlreadyApplied`). It is safe to specify `Yes` even if re-orgs are disabled. #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] @@ -426,6 +437,12 @@ pub struct BeaconChain { type BeaconBlockAndState = (BeaconBlock, BeaconState); +impl FinalizationAndCanonicity { + pub fn is_finalized(self) -> bool { + self.slot_is_finalized && self.canonical + } +} + impl BeaconChain { /// Checks if a block is finalized. /// The finalization check is done with the block slot. The block root is used to verify that @@ -455,16 +472,30 @@ impl BeaconChain { state_root: &Hash256, state_slot: Slot, ) -> Result { + self.state_finalization_and_canonicity(state_root, state_slot) + .map(FinalizationAndCanonicity::is_finalized) + } + + /// Fetch the finalization and canonicity status of the state with `state_root`. + pub fn state_finalization_and_canonicity( + &self, + state_root: &Hash256, + state_slot: Slot, + ) -> Result { let finalized_slot = self .canonical_head .cached_head() .finalized_checkpoint() .epoch .start_slot(T::EthSpec::slots_per_epoch()); - let is_canonical = self + let slot_is_finalized = state_slot <= finalized_slot; + let canonical = self .state_root_at_slot(state_slot)? .map_or(false, |canonical_root| state_root == &canonical_root); - Ok(state_slot <= finalized_slot && is_canonical) + Ok(FinalizationAndCanonicity { + slot_is_finalized, + canonical, + }) } /// Persists the head tracker and fork choice. diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index cc7a957ecc8..efbc9905b7a 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -83,6 +83,8 @@ pub struct ChainConfig { pub enable_backfill_rate_limiting: bool, /// Whether to use `ProgressiveBalancesCache` in unrealized FFG progression calculation. pub progressive_balances_mode: ProgressiveBalancesMode, + /// Number of epochs between each migration of data from the hot database to the freezer. + pub epochs_per_migration: u64, } impl Default for ChainConfig { @@ -114,6 +116,7 @@ impl Default for ChainConfig { always_prepare_payload: false, enable_backfill_rate_limiting: true, progressive_balances_mode: ProgressiveBalancesMode::Checked, + epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION, } } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index c5cf74e179c..85ff0f20a01 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -73,6 +73,7 @@ pub use execution_layer::EngineState; pub use execution_payload::NotifyExecutionLayer; pub use fork_choice::{ExecutionStatus, ForkchoiceUpdateParameters}; pub use metrics::scrape_for_metrics; +pub use migrate::MigratorConfig; pub use parking_lot; pub use slot_clock; pub use state_processing::per_block_processing::errors::{ diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 66f082742eb..8306b66d7b5 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -25,10 +25,15 @@ const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200; /// Compact after a large finality gap, if we respect `MIN_COMPACTION_PERIOD_SECONDS`. const COMPACTION_FINALITY_DISTANCE: u64 = 1024; +/// Default number of epochs to wait between finalization migrations. +pub const DEFAULT_EPOCHS_PER_MIGRATION: u64 = 1; + /// The background migrator runs a thread to perform pruning and migrate state from the hot /// to the cold database. pub struct BackgroundMigrator, Cold: ItemStore> { db: Arc>, + /// Record of when the last migration ran, for enforcing `epochs_per_migration`. + prev_migration: Arc>, #[allow(clippy::type_complexity)] tx_thread: Option, thread::JoinHandle<()>)>>, /// Genesis block root, for persisting the `PersistedBeaconChain`. @@ -36,9 +41,22 @@ pub struct BackgroundMigrator, Cold: ItemStore> log: Logger, } -#[derive(Debug, Default, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct MigratorConfig { pub blocking: bool, + /// Run migrations at most once per `epochs_per_migration`. + /// + /// If set to 0 or 1, then run every finalization. + pub epochs_per_migration: u64, +} + +impl Default for MigratorConfig { + fn default() -> Self { + Self { + blocking: false, + epochs_per_migration: DEFAULT_EPOCHS_PER_MIGRATION, + } + } } impl MigratorConfig { @@ -46,6 +64,19 @@ impl MigratorConfig { self.blocking = true; self } + + pub fn epochs_per_migration(mut self, epochs_per_migration: u64) -> Self { + self.epochs_per_migration = epochs_per_migration; + self + } +} + +/// Record of when the last migration ran. +pub struct PrevMigration { + /// The epoch at which the last finalization migration ran. + epoch: Epoch, + /// The number of epochs to wait between runs. + epochs_per_migration: u64, } /// Pruning can be successful, or in rare cases deferred to a later point. @@ -92,6 +123,7 @@ pub struct FinalizationNotification { finalized_state_root: BeaconStateHash, finalized_checkpoint: Checkpoint, head_tracker: Arc, + prev_migration: Arc>, genesis_block_root: Hash256, } @@ -103,6 +135,11 @@ impl, Cold: ItemStore> BackgroundMigrator Self { + // Estimate last migration run from DB split slot. + let prev_migration = Arc::new(Mutex::new(PrevMigration { + epoch: db.get_split_slot().epoch(E::slots_per_epoch()), + epochs_per_migration: config.epochs_per_migration, + })); let tx_thread = if config.blocking { None } else { @@ -111,6 +148,7 @@ impl, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator prev_migration.epoch, + "new_finalized_epoch" => epoch, + "epochs_per_migration" => prev_migration.epochs_per_migration, + ); + return; + } + + // Update the previous migration epoch immediately to avoid holding the lock. If the + // migration doesn't succeed then the next migration will be retried at the next scheduled + // run. + prev_migration.epoch = epoch; + drop(prev_migration); + debug!(log, "Database consolidation started"); let finalized_state_root = notif.finalized_state_root; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 6520c9ba9c8..5fc05c5551f 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -516,18 +516,23 @@ where let validator_keypairs = self .validator_keypairs .expect("cannot build without validator keypairs"); + let chain_config = self.chain_config.unwrap_or_default(); let mut builder = BeaconChainBuilder::new(self.eth_spec_instance) .logger(log.clone()) .custom_spec(spec) .store(self.store.expect("cannot build without store")) - .store_migrator_config(MigratorConfig::default().blocking()) + .store_migrator_config( + MigratorConfig::default() + .blocking() + .epochs_per_migration(chain_config.epochs_per_migration), + ) .task_executor(self.runtime.task_executor.clone()) .execution_layer(self.execution_layer) .dummy_eth1_backend() .expect("should build dummy backend") .shutdown_sender(shutdown_tx) - .chain_config(self.chain_config.unwrap_or_default()) + .chain_config(chain_config) .event_handler(Some(ServerSentEventHandler::new_with_capacity( log.clone(), 5, diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index f3ed7f65a90..b1a507eaa59 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -11,7 +11,7 @@ use beacon_chain::{ slot_clock::{SlotClock, SystemTimeSlotClock}, state_advance_timer::spawn_state_advance_timer, store::{HotColdDB, ItemStore, LevelDB, StoreConfig}, - BeaconChain, BeaconChainTypes, Eth1ChainBackend, ServerSentEventHandler, + BeaconChain, BeaconChainTypes, Eth1ChainBackend, MigratorConfig, ServerSentEventHandler, }; use beacon_processor::{ work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessor, BeaconProcessorSend, @@ -184,6 +184,9 @@ where .store(store) .task_executor(context.executor.clone()) .custom_spec(spec.clone()) + .store_migrator_config( + MigratorConfig::default().epochs_per_migration(chain_config.epochs_per_migration), + ) .chain_config(chain_config) .graffiti(graffiti) .event_handler(event_handler) diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index 9e4aadef17e..5e86053771e 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -70,15 +70,32 @@ impl StateId { .map_err(BeaconChainError::DBError) .map_err(warp_utils::reject::beacon_chain_error)? { - let execution_optimistic = chain - .canonical_head - .fork_choice_read_lock() - .is_optimistic_or_invalid_block_no_fallback(&hot_summary.latest_block_root) - .map_err(BeaconChainError::ForkChoiceError) - .map_err(warp_utils::reject::beacon_chain_error)?; - let finalized = chain - .is_finalized_state(root, hot_summary.slot) + let finalization_status = chain + .state_finalization_and_canonicity(root, hot_summary.slot) .map_err(warp_utils::reject::beacon_chain_error)?; + let finalized = finalization_status.is_finalized(); + let fork_choice = chain.canonical_head.fork_choice_read_lock(); + let execution_optimistic = if finalization_status.slot_is_finalized + && !finalization_status.canonical + { + // This block is permanently orphaned and has likely been pruned from fork + // choice. If it isn't found in fork choice, mark it optimistic to be on the + // safe side. + fork_choice + .is_optimistic_or_invalid_block_no_fallback( + &hot_summary.latest_block_root, + ) + .unwrap_or(true) + } else { + // This block is either old and finalized, or recent and unfinalized, so + // it's safe to fallback to the optimistic status of the finalized block. + chain + .canonical_head + .fork_choice_read_lock() + .is_optimistic_or_invalid_block(&hot_summary.latest_block_root) + .map_err(BeaconChainError::ForkChoiceError) + .map_err(warp_utils::reject::beacon_chain_error)? + }; return Ok((*root, execution_optimistic, finalized)); } else if let Some(_cold_state_slot) = chain .store diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index da92419744e..d7ea7c26284 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -2,8 +2,9 @@ use beacon_chain::{ chain_config::{DisallowedReOrgOffsets, ReOrgThreshold}, test_utils::{AttestationStrategy, BlockStrategy, SyncCommitteeStrategy}, + ChainConfig, }; -use eth2::types::DepositContractData; +use eth2::types::{DepositContractData, StateId}; use execution_layer::{ForkchoiceState, PayloadAttributes}; use http_api::test_utils::InteractiveTester; use parking_lot::Mutex; @@ -17,7 +18,7 @@ use std::time::Duration; use tree_hash::TreeHash; use types::{ Address, Epoch, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, FullPayload, - MainnetEthSpec, ProposerPreparationData, Slot, + MainnetEthSpec, MinimalEthSpec, ProposerPreparationData, Slot, }; type E = MainnetEthSpec; @@ -48,6 +49,76 @@ async fn deposit_contract_custom_network() { assert_eq!(result, expected); } +// Test that state lookups by root function correctly for states that are finalized but still +// present in the hot database, and have had their block pruned from fork choice. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn state_by_root_pruned_from_fork_choice() { + type E = MinimalEthSpec; + + let validator_count = 24; + let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + + let tester = InteractiveTester::::new_with_initializer_and_mutator( + Some(spec.clone()), + validator_count, + Some(Box::new(move |builder| { + builder + .deterministic_keypairs(validator_count) + .fresh_ephemeral_store() + .chain_config(ChainConfig { + epochs_per_migration: 1024, + ..ChainConfig::default() + }) + })), + None, + ) + .await; + + let client = &tester.client; + let harness = &tester.harness; + + // Create some chain depth and finalize beyond fork choice's pruning depth. + let num_epochs = 8_u64; + let num_initial = num_epochs * E::slots_per_epoch(); + harness.advance_slot(); + harness + .extend_chain_with_sync( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + SyncCommitteeStrategy::NoValidators, + ) + .await; + + // Should now be finalized. + let finalized_epoch = harness.finalized_checkpoint().epoch; + assert_eq!(finalized_epoch, num_epochs - 2); + + // The split slot should still be at 0. + assert_eq!(harness.chain.store.get_split_slot(), 0); + + // States that are between the split and the finalized slot should be able to be looked up by + // state root. + for slot in 0..finalized_epoch.start_slot(E::slots_per_epoch()).as_u64() { + let state_root = harness + .chain + .state_root_at_slot(Slot::new(slot)) + .unwrap() + .unwrap(); + let response = client + .get_debug_beacon_states::(StateId::Root(state_root)) + .await + .unwrap() + .unwrap(); + + assert!(response.finalized.unwrap()); + assert!(!response.execution_optimistic.unwrap()); + + let mut state = response.data; + assert_eq!(state.update_tree_hash_cache().unwrap(), state_root); + } +} + /// Data structure for tracking fork choice updates received by the mock execution layer. #[derive(Debug, Default)] struct ForkChoiceUpdates { diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index fc53f578880..e46c3d8ca11 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -533,6 +533,16 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { [default: 8192 (mainnet) or 64 (minimal)]") .takes_value(true) ) + .arg( + Arg::with_name("epochs-per-migration") + .long("epochs-per-migration") + .value_name("N") + .help("The number of epochs to wait between running the migration of data from the \ + hot DB to the cold DB. Less frequent runs can be useful for minimizing disk \ + writes") + .default_value("1") + .takes_value(true) + ) .arg( Arg::with_name("block-cache-size") .long("block-cache-size") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 948c70dd41f..4abf649bfbb 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -400,6 +400,12 @@ pub fn get_config( client_config.store.prune_payloads = prune_payloads; } + if let Some(epochs_per_migration) = + clap_utils::parse_optional(cli_args, "epochs-per-migration")? + { + client_config.chain.epochs_per_migration = epochs_per_migration; + } + /* * Zero-ports * diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 4badcef3cf2..bc5f610881b 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1733,6 +1733,24 @@ fn no_reconstruct_historic_states_flag() { .run_with_zero_port() .with_config(|config| assert!(!config.chain.reconstruct_historic_states)); } +#[test] +fn epochs_per_migration_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.chain.epochs_per_migration, + beacon_node::beacon_chain::migrate::DEFAULT_EPOCHS_PER_MIGRATION + ) + }); +} +#[test] +fn epochs_per_migration_override() { + CommandLineTest::new() + .flag("epochs-per-migration", Some("128")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.chain.epochs_per_migration, 128)); +} // Tests for Slasher flags. // Using `--slasher-max-db-size` to work around https://github.com/sigp/lighthouse/issues/2342