From 4380e0550299c78fb6a55be3f80c6b5a2cda9b3d Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 22 Feb 2024 13:22:29 +1100 Subject: [PATCH] Delete the participation cache (#16) --- .../beacon_chain/src/attestation_rewards.rs | 46 +- .../http_api/src/attestation_performance.rs | 8 - consensus/fork_choice/src/fork_choice.rs | 6 +- consensus/state_processing/src/all_caches.rs | 2 +- .../update_progressive_balances_cache.rs | 79 ++- .../src/per_block_processing.rs | 2 +- .../altair/sync_committee.rs | 8 +- .../src/per_block_processing/errors.rs | 9 - .../src/per_epoch_processing/altair.rs | 4 +- .../altair/participation_cache.rs | 479 ------------------ .../src/per_epoch_processing/errors.rs | 8 - .../src/per_epoch_processing/single_pass.rs | 2 +- .../state_processing/src/upgrade/altair.rs | 2 +- consensus/types/src/beacon_state.rs | 1 - .../progressive_balances_cache.rs | 2 +- .../ef_tests/src/cases/epoch_processing.rs | 2 +- testing/ef_tests/src/cases/operations.rs | 6 +- 17 files changed, 101 insertions(+), 565 deletions(-) delete mode 100644 consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs diff --git a/beacon_node/beacon_chain/src/attestation_rewards.rs b/beacon_node/beacon_chain/src/attestation_rewards.rs index 25f05b32ff4..5c3ddb9b3b9 100644 --- a/beacon_node/beacon_chain/src/attestation_rewards.rs +++ b/beacon_node/beacon_chain/src/attestation_rewards.rs @@ -1,7 +1,6 @@ use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::lighthouse::attestation_rewards::{IdealAttestationRewards, TotalAttestationRewards}; use eth2::lighthouse::StandardAttestationRewards; -use participation_cache::ParticipationCache; use safe_arith::SafeArith; use serde_utils::quoted_u64::Quoted; use slog::debug; @@ -10,7 +9,7 @@ use state_processing::per_epoch_processing::altair::{ }; use state_processing::{ common::altair::BaseRewardPerIncrement, - per_epoch_processing::altair::{participation_cache, rewards_and_penalties::get_flag_weight}, + per_epoch_processing::altair::rewards_and_penalties::get_flag_weight, }; use std::collections::HashMap; use store::consts::altair::{ @@ -134,8 +133,6 @@ impl BeaconChain { let spec = &self.spec; // Calculate ideal_rewards - let participation_cache = ParticipationCache::new(&state, spec) - .map_err(|_| BeaconChainError::AttestationRewardsError)?; process_justification_and_finalization(&state)?.apply_changes_to_state(&mut state); process_inactivity_updates_slow(&mut state, spec)?; @@ -147,14 +144,14 @@ impl BeaconChain { let weight = get_flag_weight(flag_index) .map_err(|_| BeaconChainError::AttestationRewardsError)?; - let unslashed_participating_balance = participation_cache - .previous_epoch_flag_attesting_balance(flag_index) - .map_err(|_| BeaconChainError::AttestationRewardsError)?; + let unslashed_participating_balance = state + .progressive_balances_cache() + .previous_epoch_flag_attesting_balance(flag_index)?; let unslashed_participating_increments = unslashed_participating_balance.safe_div(spec.effective_balance_increment)?; - let total_active_balance = participation_cache.current_epoch_total_active_balance(); + let total_active_balance = state.get_total_active_balance()?; let active_increments = total_active_balance.safe_div(spec.effective_balance_increment)?; @@ -190,7 +187,7 @@ impl BeaconChain { let mut total_rewards: Vec = Vec::new(); let validators = if validators.is_empty() { - participation_cache.eligible_validator_indices().to_vec() + Self::all_eligible_validator_indices(&state, previous_epoch)? } else { Self::validators_ids_to_indices(&mut state, validators)? }; @@ -198,7 +195,7 @@ impl BeaconChain { for &validator_index in &validators { // Return 0s for unknown/inactive validator indices. This is a bit different from stable // where we error for unknown pubkeys. - let Ok(validator) = participation_cache.get_validator(validator_index) else { + let Ok(validator) = state.get_validator(validator_index) else { debug!( self.log, "No rewards for inactive/unknown validator"; @@ -215,7 +212,11 @@ impl BeaconChain { }); continue; }; - let eligible = validator.is_eligible; + let previous_epoch_participation_flags = state + .previous_epoch_participation()? + .get(validator_index) + .ok_or(BeaconChainError::AttestationRewardsError)?; + let eligible = state.is_eligible_validator(previous_epoch, validator)?; let mut head_reward = 0i64; let mut target_reward = 0i64; let mut source_reward = 0i64; @@ -228,9 +229,8 @@ impl BeaconChain { let (ideal_reward, penalty) = ideal_rewards_hashmap .get(&(flag_index, effective_balance)) .ok_or(BeaconChainError::AttestationRewardsError)?; - let voted_correctly = validator - .is_unslashed_participating_index(flag_index) - .map_err(|_| BeaconChainError::AttestationRewardsError)?; + let voted_correctly = !validator.slashed + && previous_epoch_participation_flags.has_flag(flag_index)?; if voted_correctly { if flag_index == TIMELY_HEAD_FLAG_INDEX { head_reward += *ideal_reward as i64; @@ -314,6 +314,24 @@ impl BeaconChain { Ok(max_steps) } + fn all_eligible_validator_indices( + state: &BeaconState, + previous_epoch: Epoch, + ) -> Result, BeaconChainError> { + state + .validators() + .iter() + .enumerate() + .filter_map(|(i, validator)| { + state + .is_eligible_validator(previous_epoch, validator) + .map(|eligible| eligible.then_some(i)) + .map_err(BeaconChainError::BeaconStateError) + .transpose() + }) + .collect() + } + fn validators_ids_to_indices( state: &mut BeaconState, validators: Vec, diff --git a/beacon_node/http_api/src/attestation_performance.rs b/beacon_node/http_api/src/attestation_performance.rs index 6e3ebcccec5..d4f9916814a 100644 --- a/beacon_node/http_api/src/attestation_performance.rs +++ b/beacon_node/http_api/src/attestation_performance.rs @@ -3,7 +3,6 @@ use eth2::lighthouse::{ AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, }; use state_processing::{ - per_epoch_processing::altair::participation_cache::Error as ParticipationCacheError, per_epoch_processing::EpochProcessingSummary, BlockReplayError, BlockReplayer, }; use std::sync::Arc; @@ -18,7 +17,6 @@ const BLOCK_ROOT_CHUNK_SIZE: usize = 100; enum AttestationPerformanceError { BlockReplay(#[allow(dead_code)] BlockReplayError), BeaconState(#[allow(dead_code)] BeaconStateError), - ParticipationCache(#[allow(dead_code)] ParticipationCacheError), UnableToFindValidator(#[allow(dead_code)] usize), } @@ -34,12 +32,6 @@ impl From for AttestationPerformanceError { } } -impl From for AttestationPerformanceError { - fn from(e: ParticipationCacheError) -> Self { - Self::ParticipationCache(e) - } -} - pub fn get_attestation_performance( target: String, query: AttestationPerformanceQuery, diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index cbc08c79afc..faa34834b11 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -747,7 +747,11 @@ where | BeaconBlockRef::Capella(_) | BeaconBlockRef::Merge(_) | BeaconBlockRef::Altair(_) => { - // FIXME(sproul): initialize progressive balances + // NOTE: Processing justification & finalization requires the progressive + // balances cache, but we cannot initialize it here as we only have an + // immutable reference. The state *should* have come straight from block + // processing, which initialises the cache, but if we add other `on_block` + // calls in future it could be worth passing a mutable reference. per_epoch_processing::altair::process_justification_and_finalization(state)? } BeaconBlockRef::Base(_) => { diff --git a/consensus/state_processing/src/all_caches.rs b/consensus/state_processing/src/all_caches.rs index bd64d9f6643..af0e394221c 100644 --- a/consensus/state_processing/src/all_caches.rs +++ b/consensus/state_processing/src/all_caches.rs @@ -24,7 +24,7 @@ impl AllCaches for BeaconState { fn build_all_caches(&mut self, spec: &ChainSpec) -> Result<(), EpochCacheError> { self.build_caches(spec)?; initialize_epoch_cache(self, spec)?; - initialize_progressive_balances_cache(self, None, spec)?; + initialize_progressive_balances_cache(self, spec)?; Ok(()) } diff --git a/consensus/state_processing/src/common/update_progressive_balances_cache.rs b/consensus/state_processing/src/common/update_progressive_balances_cache.rs index fb65e583ba8..3fc21ed41d8 100644 --- a/consensus/state_processing/src/common/update_progressive_balances_cache.rs +++ b/consensus/state_processing/src/common/update_progressive_balances_cache.rs @@ -3,21 +3,16 @@ use crate::metrics::{ PARTICIPATION_CURR_EPOCH_TARGET_ATTESTING_GWEI_PROGRESSIVE_TOTAL, PARTICIPATION_PREV_EPOCH_TARGET_ATTESTING_GWEI_PROGRESSIVE_TOTAL, }; -use crate::per_epoch_processing::altair::ParticipationCache; use crate::{BlockProcessingError, EpochProcessingError}; use lighthouse_metrics::set_gauge; -use std::borrow::Cow; use types::{ is_progressive_balances_enabled, BeaconState, BeaconStateError, ChainSpec, Epoch, - EpochTotalBalances, EthSpec, ProgressiveBalancesCache, + EpochTotalBalances, EthSpec, ParticipationFlags, ProgressiveBalancesCache, Validator, }; -/// Initializes the `ProgressiveBalancesCache` cache using balance values from the -/// `ParticipationCache`. If the optional `&ParticipationCache` is not supplied, it will be computed -/// from the `BeaconState`. +/// Initializes the `ProgressiveBalancesCache` if it is unbuilt. pub fn initialize_progressive_balances_cache( state: &mut BeaconState, - maybe_participation_cache: Option<&ParticipationCache>, spec: &ChainSpec, ) -> Result<(), BeaconStateError> { if !is_progressive_balances_enabled(state) @@ -26,29 +21,37 @@ pub fn initialize_progressive_balances_cache( return Ok(()); } - // FIXME(sproul): simplify the participation cache - let participation_cache = match maybe_participation_cache { - Some(cache) => Cow::Borrowed(cache), - None => { - state.build_total_active_balance_cache_at(state.current_epoch(), spec)?; - Cow::Owned( - ParticipationCache::new(state, spec) - .map_err(|e| BeaconStateError::ParticipationCacheError(format!("{e:?}")))?, - ) + // Calculate the total flag balances for previous & current epoch in a single iteration. + // This calculates `get_total_balance(unslashed_participating_indices(..))` for each flag in + // the current and previous epoch. + let current_epoch = state.current_epoch(); + let previous_epoch = state.previous_epoch(); + let mut previous_epoch_cache = EpochTotalBalances::new(spec); + let mut current_epoch_cache = EpochTotalBalances::new(spec); + for ((validator, current_epoch_flags), previous_epoch_flags) in state + .validators() + .iter() + .zip(state.current_epoch_participation()?) + .zip(state.previous_epoch_participation()?) + { + // Exclude slashed validators. We are calculating *unslashed* participating totals. + if validator.slashed { + continue; } - }; - let current_epoch = state.current_epoch(); - let previous_epoch_cache = EpochTotalBalances { - total_flag_balances: participation_cache - .previous_epoch_participation - .total_flag_balances, - }; - let current_epoch_cache = EpochTotalBalances { - total_flag_balances: participation_cache - .current_epoch_participation - .total_flag_balances, - }; + // Update current epoch flag balances. + if validator.is_active_at(current_epoch) { + update_flag_total_balances(&mut current_epoch_cache, *current_epoch_flags, validator)?; + } + // Update previous epoch flag balances. + if validator.is_active_at(previous_epoch) { + update_flag_total_balances( + &mut previous_epoch_cache, + *previous_epoch_flags, + validator, + )?; + } + } state.progressive_balances_cache_mut().initialize( current_epoch, @@ -61,6 +64,26 @@ pub fn initialize_progressive_balances_cache( Ok(()) } +/// During the initialization of the progressive balances for a single epoch, add +/// `validator.effective_balance` to the flag total, for each flag present in `participation_flags`. +/// +/// Pre-conditions: +/// +/// - `validator` must not be slashed +/// - the `participation_flags` must be for `validator` in the same epoch as the `total_balances` +fn update_flag_total_balances( + total_balances: &mut EpochTotalBalances, + participation_flags: ParticipationFlags, + validator: &Validator, +) -> Result<(), BeaconStateError> { + for (flag, balance) in total_balances.total_flag_balances.iter_mut().enumerate() { + if participation_flags.has_flag(flag)? { + balance.safe_add_assign(validator.effective_balance)?; + } + } + Ok(()) +} + /// Updates the `ProgressiveBalancesCache` when a new target attestation has been processed. pub fn update_progressive_balances_on_attestation( state: &mut BeaconState, diff --git a/consensus/state_processing/src/per_block_processing.rs b/consensus/state_processing/src/per_block_processing.rs index c994066c5d9..b358a16447f 100644 --- a/consensus/state_processing/src/per_block_processing.rs +++ b/consensus/state_processing/src/per_block_processing.rs @@ -121,7 +121,7 @@ pub fn per_block_processing>( // Build epoch cache if it hasn't already been built, or if it is no longer valid initialize_epoch_cache(state, spec)?; - initialize_progressive_balances_cache(state, None, spec)?; + initialize_progressive_balances_cache(state, spec)?; state.build_slashings_cache()?; let verify_signatures = match block_signature_strategy { diff --git a/consensus/state_processing/src/per_block_processing/altair/sync_committee.rs b/consensus/state_processing/src/per_block_processing/altair/sync_committee.rs index 99653d75618..3faa0d1754f 100644 --- a/consensus/state_processing/src/per_block_processing/altair/sync_committee.rs +++ b/consensus/state_processing/src/per_block_processing/altair/sync_committee.rs @@ -68,12 +68,10 @@ pub fn process_sync_aggregate( increase_balance(state, participant_index, participant_reward)?; } proposer_balance.safe_add_assign(proposer_reward)?; + } else if participant_index == proposer_index { + proposer_balance = proposer_balance.saturating_sub(participant_reward); } else { - if participant_index == proposer_index { - proposer_balance = proposer_balance.saturating_sub(participant_reward); - } else { - decrease_balance(state, participant_index, participant_reward)?; - } + decrease_balance(state, participant_index, participant_reward)?; } } diff --git a/consensus/state_processing/src/per_block_processing/errors.rs b/consensus/state_processing/src/per_block_processing/errors.rs index 4576853af5c..15c2b800912 100644 --- a/consensus/state_processing/src/per_block_processing/errors.rs +++ b/consensus/state_processing/src/per_block_processing/errors.rs @@ -1,8 +1,6 @@ use super::signature_sets::Error as SignatureSetError; -use crate::per_epoch_processing::altair::participation_cache; use crate::{ContextError, EpochCacheError}; use merkle_proof::MerkleTreeError; -use participation_cache::Error as ParticipationCacheError; use safe_arith::ArithError; use ssz::DecodeError; use types::*; @@ -90,7 +88,6 @@ pub enum BlockProcessingError { found: Hash256, }, WithdrawalCredentialsInvalid, - ParticipationCacheError(ParticipationCacheError), } impl From for BlockProcessingError { @@ -154,12 +151,6 @@ impl From> for BlockProcessingError { } } -impl From for BlockProcessingError { - fn from(e: ParticipationCacheError) -> Self { - BlockProcessingError::ParticipationCacheError(e) - } -} - /// A conversion that consumes `self` and adds an `index` variable to resulting struct. /// /// Used here to allow converting an error into an upstream error that points to the object that diff --git a/consensus/state_processing/src/per_epoch_processing/altair.rs b/consensus/state_processing/src/per_epoch_processing/altair.rs index f5905aaa351..be2bba405bf 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair.rs @@ -11,7 +11,6 @@ use crate::per_epoch_processing::{ }; pub use inactivity_updates::process_inactivity_updates_slow; pub use justification_and_finalization::process_justification_and_finalization; -pub use participation_cache::ParticipationCache; pub use participation_flag_updates::process_participation_flag_updates; pub use rewards_and_penalties::process_rewards_and_penalties_slow; pub use sync_committee_updates::process_sync_committee_updates; @@ -19,7 +18,6 @@ use types::{BeaconState, ChainSpec, EthSpec, RelativeEpoch}; pub mod inactivity_updates; pub mod justification_and_finalization; -pub mod participation_cache; pub mod participation_flag_updates; pub mod rewards_and_penalties; pub mod sync_committee_updates; @@ -34,7 +32,7 @@ pub fn process_epoch( state.build_committee_cache(RelativeEpoch::Next, spec)?; state.build_total_active_balance_cache_at(state.current_epoch(), spec)?; initialize_epoch_cache(state, spec)?; - initialize_progressive_balances_cache::(state, None, spec)?; + initialize_progressive_balances_cache::(state, spec)?; let sync_committee = state.current_sync_committee()?.clone(); diff --git a/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs b/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs deleted file mode 100644 index 689be28adc0..00000000000 --- a/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs +++ /dev/null @@ -1,479 +0,0 @@ -//! Provides the `ParticipationCache`, a custom Lighthouse cache which attempts to reduce CPU and -//! memory usage by: -//! -//! - Caching a map of `validator_index -> participation_flags` for all active validators in the -//! previous and current epochs. -//! - Caching the total balances of: -//! - All active validators. -//! - All active validators matching each of the three "timely" flags. -//! - Caching the "eligible" validators. -//! -//! Additionally, this cache is returned from the `altair::process_epoch` function and can be used -//! to get useful summaries about the validator participation in an epoch. - -use std::collections::HashMap; - -use crate::common::altair::{get_base_reward, BaseRewardPerIncrement}; -use safe_arith::{ArithError, SafeArith}; -use types::{ - consts::altair::{ - NUM_FLAG_INDICES, TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX, - TIMELY_TARGET_FLAG_INDEX, - }, - Balance, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, ParticipationFlags, - RelativeEpoch, Unsigned, Validator, -}; - -#[derive(Debug, PartialEq, Clone)] -pub enum Error { - InvalidFlagIndex(usize), - NoUnslashedParticipatingIndices, - MissingValidator(usize), - BeaconState(BeaconStateError), - Arith(ArithError), - InvalidValidatorIndex(usize), - InconsistentTotalActiveBalance { cached: u64, computed: u64 }, -} - -impl From for Error { - fn from(e: BeaconStateError) -> Self { - Self::BeaconState(e) - } -} - -impl From for Error { - fn from(e: ArithError) -> Self { - Self::Arith(e) - } -} - -/// Caches the participation values for one epoch (either the previous or current). -#[derive(PartialEq, Debug, Clone)] -pub(crate) struct SingleEpochParticipationCache { - /// Stores the sum of the balances for all validators in `self.unslashed_participating_indices` - /// for all flags in `NUM_FLAG_INDICES`. - /// - /// A flag balance is only incremented if a validator is in that flag set. - pub(crate) total_flag_balances: [Balance; NUM_FLAG_INDICES], - /// Stores the sum of all balances of all validators in `self.unslashed_participating_indices` - /// (regardless of which flags are set). - total_active_balance: Balance, -} - -impl SingleEpochParticipationCache { - fn new(spec: &ChainSpec) -> Self { - let zero_balance = Balance::zero(spec.effective_balance_increment); - - Self { - total_flag_balances: [zero_balance; NUM_FLAG_INDICES], - total_active_balance: zero_balance, - } - } - - /// Returns the total balance of attesters who have `flag_index` set. - fn total_flag_balance(&self, flag_index: usize) -> Result { - self.total_flag_balances - .get(flag_index) - .map(Balance::get) - .ok_or(Error::InvalidFlagIndex(flag_index)) - } - - /// Returns the raw total balance of attesters who have `flag_index` set. - fn total_flag_balance_raw(&self, flag_index: usize) -> Result { - self.total_flag_balances - .get(flag_index) - .copied() - .ok_or(Error::InvalidFlagIndex(flag_index)) - } - - /// Process an **active** validator, reading from the `epoch_participation` with respect to the - /// `relative_epoch`. - /// - /// ## Errors - /// - /// - An error will be returned if the `val_index` validator is inactive at the given - /// `relative_epoch`. - fn process_active_validator( - &mut self, - val_index: usize, - validator: &Validator, - epoch_participation: &ParticipationFlags, - current_epoch: Epoch, - relative_epoch: RelativeEpoch, - ) -> Result<(), BeaconStateError> { - // Sanity check to ensure the validator is active. - let epoch = relative_epoch.into_epoch(current_epoch); - if !validator.is_active_at(epoch) { - return Err(BeaconStateError::ValidatorIsInactive { val_index }); - } - - // All active validators increase the total active balance. - self.total_active_balance - .safe_add_assign(validator.effective_balance)?; - - // Only unslashed validators may proceed. - if validator.slashed { - return Ok(()); - } - - // Iterate through all the flags and increment the total flag balances for whichever flags - // are set for `val_index`. - for (flag, balance) in self.total_flag_balances.iter_mut().enumerate() { - if epoch_participation.has_flag(flag)? { - balance.safe_add_assign(validator.effective_balance)?; - } - } - - Ok(()) - } -} - -#[derive(Debug, PartialEq, Clone)] -pub struct ValidatorInfo { - pub effective_balance: u64, - pub base_reward: u64, - pub is_eligible: bool, - pub is_slashed: bool, - pub is_active_current_epoch: bool, - pub is_active_previous_epoch: bool, - pub previous_epoch_participation: ParticipationFlags, -} - -impl ValidatorInfo { - #[inline] - pub fn is_unslashed_participating_index(&self, flag_index: usize) -> Result { - Ok(self.is_active_previous_epoch - && !self.is_slashed - && self - .previous_epoch_participation - .has_flag(flag_index) - .map_err(|_| Error::InvalidFlagIndex(flag_index))?) - } -} - -/// Single `HashMap` for validator info relevant to `process_epoch`. -#[derive(Debug, PartialEq, Clone)] -struct ValidatorInfoCache { - info: Vec>, -} - -impl ValidatorInfoCache { - pub fn new(capacity: usize) -> Self { - Self { - info: vec![None; capacity], - } - } -} - -/// Maintains a cache to be used during `altair::process_epoch`. -#[derive(PartialEq, Debug, Clone)] -pub struct ParticipationCache { - current_epoch: Epoch, - /// Caches information about active validators pertaining to `self.current_epoch`. - pub(crate) current_epoch_participation: SingleEpochParticipationCache, - previous_epoch: Epoch, - /// Caches information about active validators pertaining to `self.previous_epoch`. - pub(crate) previous_epoch_participation: SingleEpochParticipationCache, - /// Caches validator information relevant to `process_epoch`. - validators: ValidatorInfoCache, - /// Caches the result of the `get_eligible_validator_indices` function. - eligible_indices: Vec, - /// Caches the indices and effective balances of validators that need to be processed by - /// `process_slashings`. - process_slashings_indices: Vec<(usize, u64)>, - /// Updates to the inactivity scores if we are definitely not in an inactivity leak. - pub inactivity_score_updates: Option>, -} - -impl ParticipationCache { - /// Instantiate `Self`, returning a fully initialized cache. - /// - /// ## Errors - /// - /// - The provided `state` **must** be an Altair state. An error will be returned otherwise. - pub fn new(state: &BeaconState, spec: &ChainSpec) -> Result { - let current_epoch = state.current_epoch(); - let previous_epoch = state.previous_epoch(); - - // Both the current/previous epoch participations are set to a capacity that is slightly - // larger than required. The difference will be due slashed-but-active validators. - let mut current_epoch_participation = SingleEpochParticipationCache::new(spec); - let mut previous_epoch_participation = SingleEpochParticipationCache::new(spec); - - let mut validators = ValidatorInfoCache::new(state.validators().len()); - - let current_epoch_total_active_balance = state.get_total_active_balance()?; - let base_reward_per_increment = - BaseRewardPerIncrement::new(current_epoch_total_active_balance, spec)?; - - // Contains the set of validators which are either: - // - // - Active in the previous epoch. - // - Slashed, but not yet withdrawable. - // - // Using the full length of `state.validators` is almost always overkill, but it ensures no - // reallocations. - let mut eligible_indices = Vec::with_capacity(state.validators().len()); - - let mut process_slashings_indices = vec![]; - - // Fast path for inactivity scores update when we are definitely not in an inactivity leak. - // This breaks the dependence of `process_inactivity_updates` on the finalization - // re-calculation. - let definitely_not_in_inactivity_leak = state - .finalized_checkpoint() - .epoch - .safe_add(spec.min_epochs_to_inactivity_penalty)? - .safe_add(1)? - >= state.current_epoch(); - let mut inactivity_score_updates = HashMap::default(); - - // Iterate through all validators, updating: - // - // 1. Validator participation for current and previous epochs. - // 2. The "eligible indices". - // - // Care is taken to ensure that the ordering of `eligible_indices` is the same as the - // `get_eligible_validator_indices` function in the spec. - let iter = state - .validators() - .iter() - .zip(state.current_epoch_participation()?) - .zip(state.previous_epoch_participation()?) - .zip(state.inactivity_scores()?) - .enumerate(); - for (val_index, (((val, curr_epoch_flags), prev_epoch_flags), inactivity_score)) in iter { - let is_active_current_epoch = val.is_active_at(current_epoch); - let is_active_previous_epoch = val.is_active_at(previous_epoch); - let is_eligible = state.is_eligible_validator(previous_epoch, val)?; - - if is_active_current_epoch { - current_epoch_participation.process_active_validator( - val_index, - val, - curr_epoch_flags, - current_epoch, - RelativeEpoch::Current, - )?; - } - - if is_active_previous_epoch { - assert!(is_eligible); - - previous_epoch_participation.process_active_validator( - val_index, - val, - prev_epoch_flags, - current_epoch, - RelativeEpoch::Previous, - )?; - } - - if val.slashed - && current_epoch.safe_add(T::EpochsPerSlashingsVector::to_u64().safe_div(2)?)? - == val.withdrawable_epoch - { - process_slashings_indices.push((val_index, val.effective_balance)); - } - - // Note: a validator might still be "eligible" whilst returning `false` to - // `Validator::is_active_at`. It's also possible for a validator to be active - // in the current epoch without being eligible (if it was just activated). - if is_eligible { - eligible_indices.push(val_index); - } - - let mut validator_info = ValidatorInfo { - effective_balance: val.effective_balance, - base_reward: 0, // not read - is_eligible, - is_slashed: val.slashed, - is_active_current_epoch, - is_active_previous_epoch, - previous_epoch_participation: *prev_epoch_flags, - }; - - // Calculate inactivity updates. - if is_eligible && definitely_not_in_inactivity_leak { - let mut new_inactivity_score = - if validator_info.is_unslashed_participating_index(TIMELY_TARGET_FLAG_INDEX)? { - inactivity_score.saturating_sub(1) - } else { - inactivity_score.safe_add(spec.inactivity_score_bias)? - }; - - // Decrease the score of all validators for forgiveness when not during a leak - new_inactivity_score = - new_inactivity_score.saturating_sub(spec.inactivity_score_recovery_rate); - - if new_inactivity_score != *inactivity_score { - inactivity_score_updates.insert(val_index, new_inactivity_score); - } - } - - #[allow(clippy::indexing_slicing)] - if is_eligible || is_active_current_epoch { - let effective_balance = val.effective_balance; - let base_reward = - get_base_reward(effective_balance, base_reward_per_increment, spec)?; - validator_info.base_reward = base_reward; - validators.info[val_index] = Some(validator_info); - } - } - - // Sanity check total active balance. - if current_epoch_participation.total_active_balance.get() - != current_epoch_total_active_balance - { - return Err(Error::InconsistentTotalActiveBalance { - cached: current_epoch_total_active_balance, - computed: current_epoch_participation.total_active_balance.get(), - }); - } - - Ok(Self { - current_epoch, - current_epoch_participation, - previous_epoch, - previous_epoch_participation, - validators, - eligible_indices, - process_slashings_indices, - inactivity_score_updates: definitely_not_in_inactivity_leak - .then_some(inactivity_score_updates), - }) - } - - /// Equivalent to the specification `get_eligible_validator_indices` function. - pub fn eligible_validator_indices(&self) -> &[usize] { - &self.eligible_indices - } - - pub fn process_slashings_indices(&mut self) -> Vec<(usize, u64)> { - std::mem::take(&mut self.process_slashings_indices) - } - - /* - * Balances - */ - - pub fn current_epoch_total_active_balance(&self) -> u64 { - self.current_epoch_participation.total_active_balance.get() - } - - pub fn current_epoch_target_attesting_balance(&self) -> Result { - self.current_epoch_participation - .total_flag_balance(TIMELY_TARGET_FLAG_INDEX) - } - - pub fn current_epoch_target_attesting_balance_raw(&self) -> Result { - self.current_epoch_participation - .total_flag_balance_raw(TIMELY_TARGET_FLAG_INDEX) - } - - pub fn previous_epoch_total_active_balance(&self) -> u64 { - self.previous_epoch_participation.total_active_balance.get() - } - - pub fn previous_epoch_target_attesting_balance(&self) -> Result { - self.previous_epoch_flag_attesting_balance(TIMELY_TARGET_FLAG_INDEX) - } - - pub fn previous_epoch_target_attesting_balance_raw(&self) -> Result { - self.previous_epoch_participation - .total_flag_balance_raw(TIMELY_TARGET_FLAG_INDEX) - } - - pub fn previous_epoch_source_attesting_balance(&self) -> Result { - self.previous_epoch_flag_attesting_balance(TIMELY_SOURCE_FLAG_INDEX) - } - - pub fn previous_epoch_head_attesting_balance(&self) -> Result { - self.previous_epoch_flag_attesting_balance(TIMELY_HEAD_FLAG_INDEX) - } - - pub fn previous_epoch_flag_attesting_balance(&self, flag_index: usize) -> Result { - self.previous_epoch_participation - .total_flag_balance(flag_index) - } - - /* - * Active/Unslashed - */ - - pub fn is_active_unslashed_in_previous_epoch(&self, val_index: usize) -> bool { - self.get_validator(val_index).map_or(false, |validator| { - validator.is_active_previous_epoch && !validator.is_slashed - }) - } - - pub fn is_active_unslashed_in_current_epoch(&self, val_index: usize) -> bool { - self.get_validator(val_index).map_or(false, |validator| { - validator.is_active_current_epoch && !validator.is_slashed - }) - } - - pub fn get_validator(&self, val_index: usize) -> Result<&ValidatorInfo, Error> { - self.validators - .info - .get(val_index) - .ok_or(Error::MissingValidator(val_index))? - .as_ref() - .ok_or(Error::MissingValidator(val_index)) - } - - /* - * Flags - */ - /// Always returns false for a slashed validator. - pub fn is_previous_epoch_timely_source_attester( - &self, - val_index: usize, - ) -> Result { - self.get_validator(val_index) - .map_or(Ok(false), |validator| { - Ok(!validator.is_slashed - && validator - .previous_epoch_participation - .has_flag(TIMELY_SOURCE_FLAG_INDEX) - .map_err(|_| Error::InvalidFlagIndex(TIMELY_SOURCE_FLAG_INDEX))?) - }) - } - - /// Always returns false for a slashed validator. - pub fn is_previous_epoch_timely_target_attester( - &self, - val_index: usize, - ) -> Result { - self.get_validator(val_index) - .map_or(Ok(false), |validator| { - Ok(!validator.is_slashed - && validator - .previous_epoch_participation - .has_flag(TIMELY_TARGET_FLAG_INDEX) - .map_err(|_| Error::InvalidFlagIndex(TIMELY_TARGET_FLAG_INDEX))?) - }) - } - - /// Always returns false for a slashed validator. - pub fn is_previous_epoch_timely_head_attester(&self, val_index: usize) -> Result { - self.get_validator(val_index) - .map_or(Ok(false), |validator| { - Ok(!validator.is_slashed - && validator - .previous_epoch_participation - .has_flag(TIMELY_HEAD_FLAG_INDEX) - .map_err(|_| Error::InvalidFlagIndex(TIMELY_TARGET_FLAG_INDEX))?) - }) - } - - /// Always returns false for a slashed validator. - pub fn is_current_epoch_timely_target_attester( - &self, - _val_index: usize, - ) -> Result { - // FIXME(sproul): decide whether it's worth storing the current epoch participation flags - // *just* for this call. Perhaps the validator API could source it from the state directly. - Ok(false) - } -} diff --git a/consensus/state_processing/src/per_epoch_processing/errors.rs b/consensus/state_processing/src/per_epoch_processing/errors.rs index c1da929c686..c18e1303b26 100644 --- a/consensus/state_processing/src/per_epoch_processing/errors.rs +++ b/consensus/state_processing/src/per_epoch_processing/errors.rs @@ -1,4 +1,3 @@ -use crate::per_epoch_processing::altair::participation_cache::Error as ParticipationCacheError; use types::{BeaconStateError, EpochCacheError, InconsistentFork}; #[derive(Debug, PartialEq)] @@ -24,7 +23,6 @@ pub enum EpochProcessingError { InconsistentStateFork(InconsistentFork), InvalidJustificationBit(ssz_types::Error), InvalidFlagIndex(usize), - ParticipationCache(ParticipationCacheError), EpochCache(EpochCacheError), } @@ -52,12 +50,6 @@ impl From for EpochProcessingError { } } -impl From for EpochProcessingError { - fn from(e: ParticipationCacheError) -> EpochProcessingError { - EpochProcessingError::ParticipationCache(e) - } -} - impl From for EpochProcessingError { fn from(e: EpochCacheError) -> Self { EpochProcessingError::EpochCache(e) diff --git a/consensus/state_processing/src/per_epoch_processing/single_pass.rs b/consensus/state_processing/src/per_epoch_processing/single_pass.rs index fd0825f2e5a..5f23ce0081c 100644 --- a/consensus/state_processing/src/per_epoch_processing/single_pass.rs +++ b/consensus/state_processing/src/per_epoch_processing/single_pass.rs @@ -110,7 +110,7 @@ pub fn process_epoch_single_pass( conf: SinglePassConfig, ) -> Result, Error> { initialize_epoch_cache(state, spec)?; - initialize_progressive_balances_cache(state, None, spec)?; + initialize_progressive_balances_cache(state, spec)?; state.build_exit_cache(spec)?; let previous_epoch = state.previous_epoch(); diff --git a/consensus/state_processing/src/upgrade/altair.rs b/consensus/state_processing/src/upgrade/altair.rs index 589e4c7a479..cfbc6eba9e9 100644 --- a/consensus/state_processing/src/upgrade/altair.rs +++ b/consensus/state_processing/src/upgrade/altair.rs @@ -114,7 +114,7 @@ pub fn upgrade_to_altair( // Fill in previous epoch participation from the pre state's pending attestations. translate_participation(&mut post, &pre.previous_epoch_attestations, spec)?; - initialize_progressive_balances_cache(&mut post, None, spec)?; + initialize_progressive_balances_cache(&mut post, spec)?; // Fill in sync committees // Note: A duplicate committee is assigned for the current and next committee at the fork diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 67ba34d0e29..77f15c0ed1f 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -115,7 +115,6 @@ pub enum Error { SszTypesError(ssz_types::Error), TreeHashCacheNotInitialized, NonLinearTreeHashCacheHistory, - ParticipationCacheError(String), ProgressiveBalancesCacheNotInitialized, ProgressiveBalancesCacheInconsistent, TreeHashCacheSkippedSlot { diff --git a/consensus/types/src/beacon_state/progressive_balances_cache.rs b/consensus/types/src/beacon_state/progressive_balances_cache.rs index 69c76447b46..a7620540bb4 100644 --- a/consensus/types/src/beacon_state/progressive_balances_cache.rs +++ b/consensus/types/src/beacon_state/progressive_balances_cache.rs @@ -283,7 +283,7 @@ impl ProgressiveBalancesCache { } } -/// `ProgressiveBalancesCache` is only enabled from `Altair` as it requires `ParticipationCache`. +/// `ProgressiveBalancesCache` is only enabled from `Altair` as it uses Altair-specific logic. pub fn is_progressive_balances_enabled(state: &BeaconState) -> bool { match state { BeaconState::Base(_) => false, diff --git a/testing/ef_tests/src/cases/epoch_processing.rs b/testing/ef_tests/src/cases/epoch_processing.rs index 7ad38cb6f10..af695eb94e0 100644 --- a/testing/ef_tests/src/cases/epoch_processing.rs +++ b/testing/ef_tests/src/cases/epoch_processing.rs @@ -110,7 +110,7 @@ impl EpochTransition for JustificationAndFinalization { | BeaconState::Merge(_) | BeaconState::Capella(_) | BeaconState::Deneb(_) => { - initialize_progressive_balances_cache(state, None, spec)?; + initialize_progressive_balances_cache(state, spec)?; let justification_and_finalization_state = altair::process_justification_and_finalization(state)?; justification_and_finalization_state.apply_changes_to_state(state); diff --git a/testing/ef_tests/src/cases/operations.rs b/testing/ef_tests/src/cases/operations.rs index 14d047dae88..177fa301471 100644 --- a/testing/ef_tests/src/cases/operations.rs +++ b/testing/ef_tests/src/cases/operations.rs @@ -104,7 +104,7 @@ impl Operation for Attestation { | BeaconState::Merge(_) | BeaconState::Capella(_) | BeaconState::Deneb(_) => { - initialize_progressive_balances_cache(state, None, spec)?; + initialize_progressive_balances_cache(state, spec)?; altair_deneb::process_attestation( state, self, @@ -134,7 +134,7 @@ impl Operation for AttesterSlashing { _: &Operations, ) -> Result<(), BlockProcessingError> { let mut ctxt = ConsensusContext::new(state.slot()); - initialize_progressive_balances_cache(state, None, spec)?; + initialize_progressive_balances_cache(state, spec)?; process_attester_slashings( state, &[self.clone()], @@ -185,7 +185,7 @@ impl Operation for ProposerSlashing { _: &Operations, ) -> Result<(), BlockProcessingError> { let mut ctxt = ConsensusContext::new(state.slot()); - initialize_progressive_balances_cache(state, None, spec)?; + initialize_progressive_balances_cache(state, spec)?; process_proposer_slashings( state, &[self.clone()],