Skip to content

Commit

Permalink
Delete the participation cache (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Feb 22, 2024
1 parent a4427da commit 4380e05
Show file tree
Hide file tree
Showing 17 changed files with 101 additions and 565 deletions.
46 changes: 32 additions & 14 deletions beacon_node/beacon_chain/src/attestation_rewards.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::lighthouse::attestation_rewards::{IdealAttestationRewards, TotalAttestationRewards};
use eth2::lighthouse::StandardAttestationRewards;
use participation_cache::ParticipationCache;
use safe_arith::SafeArith;
use serde_utils::quoted_u64::Quoted;
use slog::debug;
Expand All @@ -10,7 +9,7 @@ use state_processing::per_epoch_processing::altair::{
};
use state_processing::{
common::altair::BaseRewardPerIncrement,
per_epoch_processing::altair::{participation_cache, rewards_and_penalties::get_flag_weight},
per_epoch_processing::altair::rewards_and_penalties::get_flag_weight,
};
use std::collections::HashMap;
use store::consts::altair::{
Expand Down Expand Up @@ -134,8 +133,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let spec = &self.spec;

// Calculate ideal_rewards
let participation_cache = ParticipationCache::new(&state, spec)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
process_justification_and_finalization(&state)?.apply_changes_to_state(&mut state);
process_inactivity_updates_slow(&mut state, spec)?;

Expand All @@ -147,14 +144,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let weight = get_flag_weight(flag_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;

let unslashed_participating_balance = participation_cache
.previous_epoch_flag_attesting_balance(flag_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
let unslashed_participating_balance = state
.progressive_balances_cache()
.previous_epoch_flag_attesting_balance(flag_index)?;

let unslashed_participating_increments =
unslashed_participating_balance.safe_div(spec.effective_balance_increment)?;

let total_active_balance = participation_cache.current_epoch_total_active_balance();
let total_active_balance = state.get_total_active_balance()?;

let active_increments =
total_active_balance.safe_div(spec.effective_balance_increment)?;
Expand Down Expand Up @@ -190,15 +187,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut total_rewards: Vec<TotalAttestationRewards> = Vec::new();

let validators = if validators.is_empty() {
participation_cache.eligible_validator_indices().to_vec()
Self::all_eligible_validator_indices(&state, previous_epoch)?
} else {
Self::validators_ids_to_indices(&mut state, validators)?
};

for &validator_index in &validators {
// Return 0s for unknown/inactive validator indices. This is a bit different from stable
// where we error for unknown pubkeys.
let Ok(validator) = participation_cache.get_validator(validator_index) else {
let Ok(validator) = state.get_validator(validator_index) else {
debug!(
self.log,
"No rewards for inactive/unknown validator";
Expand All @@ -215,7 +212,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
});
continue;
};
let eligible = validator.is_eligible;
let previous_epoch_participation_flags = state
.previous_epoch_participation()?
.get(validator_index)
.ok_or(BeaconChainError::AttestationRewardsError)?;
let eligible = state.is_eligible_validator(previous_epoch, validator)?;
let mut head_reward = 0i64;
let mut target_reward = 0i64;
let mut source_reward = 0i64;
Expand All @@ -228,9 +229,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let (ideal_reward, penalty) = ideal_rewards_hashmap
.get(&(flag_index, effective_balance))
.ok_or(BeaconChainError::AttestationRewardsError)?;
let voted_correctly = validator
.is_unslashed_participating_index(flag_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
let voted_correctly = !validator.slashed
&& previous_epoch_participation_flags.has_flag(flag_index)?;
if voted_correctly {
if flag_index == TIMELY_HEAD_FLAG_INDEX {
head_reward += *ideal_reward as i64;
Expand Down Expand Up @@ -314,6 +314,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(max_steps)
}

fn all_eligible_validator_indices(
state: &BeaconState<T::EthSpec>,
previous_epoch: Epoch,
) -> Result<Vec<usize>, BeaconChainError> {
state
.validators()
.iter()
.enumerate()
.filter_map(|(i, validator)| {
state
.is_eligible_validator(previous_epoch, validator)
.map(|eligible| eligible.then_some(i))
.map_err(BeaconChainError::BeaconStateError)
.transpose()
})
.collect()
}

fn validators_ids_to_indices(
state: &mut BeaconState<T::EthSpec>,
validators: Vec<ValidatorId>,
Expand Down
8 changes: 0 additions & 8 deletions beacon_node/http_api/src/attestation_performance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use eth2::lighthouse::{
AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics,
};
use state_processing::{
per_epoch_processing::altair::participation_cache::Error as ParticipationCacheError,
per_epoch_processing::EpochProcessingSummary, BlockReplayError, BlockReplayer,
};
use std::sync::Arc;
Expand All @@ -18,7 +17,6 @@ const BLOCK_ROOT_CHUNK_SIZE: usize = 100;
enum AttestationPerformanceError {
BlockReplay(#[allow(dead_code)] BlockReplayError),
BeaconState(#[allow(dead_code)] BeaconStateError),
ParticipationCache(#[allow(dead_code)] ParticipationCacheError),
UnableToFindValidator(#[allow(dead_code)] usize),
}

Expand All @@ -34,12 +32,6 @@ impl From<BeaconStateError> for AttestationPerformanceError {
}
}

impl From<ParticipationCacheError> for AttestationPerformanceError {
fn from(e: ParticipationCacheError) -> Self {
Self::ParticipationCache(e)
}
}

pub fn get_attestation_performance<T: BeaconChainTypes>(
target: String,
query: AttestationPerformanceQuery,
Expand Down
6 changes: 5 additions & 1 deletion consensus/fork_choice/src/fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,11 @@ where
| BeaconBlockRef::Capella(_)
| BeaconBlockRef::Merge(_)
| BeaconBlockRef::Altair(_) => {
// FIXME(sproul): initialize progressive balances
// NOTE: Processing justification & finalization requires the progressive
// balances cache, but we cannot initialize it here as we only have an
// immutable reference. The state *should* have come straight from block
// processing, which initialises the cache, but if we add other `on_block`
// calls in future it could be worth passing a mutable reference.
per_epoch_processing::altair::process_justification_and_finalization(state)?
}
BeaconBlockRef::Base(_) => {
Expand Down
2 changes: 1 addition & 1 deletion consensus/state_processing/src/all_caches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl<E: EthSpec> AllCaches for BeaconState<E> {
fn build_all_caches(&mut self, spec: &ChainSpec) -> Result<(), EpochCacheError> {
self.build_caches(spec)?;
initialize_epoch_cache(self, spec)?;
initialize_progressive_balances_cache(self, None, spec)?;
initialize_progressive_balances_cache(self, spec)?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,16 @@ use crate::metrics::{
PARTICIPATION_CURR_EPOCH_TARGET_ATTESTING_GWEI_PROGRESSIVE_TOTAL,
PARTICIPATION_PREV_EPOCH_TARGET_ATTESTING_GWEI_PROGRESSIVE_TOTAL,
};
use crate::per_epoch_processing::altair::ParticipationCache;
use crate::{BlockProcessingError, EpochProcessingError};
use lighthouse_metrics::set_gauge;
use std::borrow::Cow;
use types::{
is_progressive_balances_enabled, BeaconState, BeaconStateError, ChainSpec, Epoch,
EpochTotalBalances, EthSpec, ProgressiveBalancesCache,
EpochTotalBalances, EthSpec, ParticipationFlags, ProgressiveBalancesCache, Validator,
};

/// Initializes the `ProgressiveBalancesCache` cache using balance values from the
/// `ParticipationCache`. If the optional `&ParticipationCache` is not supplied, it will be computed
/// from the `BeaconState`.
/// Initializes the `ProgressiveBalancesCache` if it is unbuilt.
pub fn initialize_progressive_balances_cache<E: EthSpec>(
state: &mut BeaconState<E>,
maybe_participation_cache: Option<&ParticipationCache>,
spec: &ChainSpec,
) -> Result<(), BeaconStateError> {
if !is_progressive_balances_enabled(state)
Expand All @@ -26,29 +21,37 @@ pub fn initialize_progressive_balances_cache<E: EthSpec>(
return Ok(());
}

// FIXME(sproul): simplify the participation cache
let participation_cache = match maybe_participation_cache {
Some(cache) => Cow::Borrowed(cache),
None => {
state.build_total_active_balance_cache_at(state.current_epoch(), spec)?;
Cow::Owned(
ParticipationCache::new(state, spec)
.map_err(|e| BeaconStateError::ParticipationCacheError(format!("{e:?}")))?,
)
// Calculate the total flag balances for previous & current epoch in a single iteration.
// This calculates `get_total_balance(unslashed_participating_indices(..))` for each flag in
// the current and previous epoch.
let current_epoch = state.current_epoch();
let previous_epoch = state.previous_epoch();
let mut previous_epoch_cache = EpochTotalBalances::new(spec);
let mut current_epoch_cache = EpochTotalBalances::new(spec);
for ((validator, current_epoch_flags), previous_epoch_flags) in state
.validators()
.iter()
.zip(state.current_epoch_participation()?)
.zip(state.previous_epoch_participation()?)
{
// Exclude slashed validators. We are calculating *unslashed* participating totals.
if validator.slashed {
continue;
}
};

let current_epoch = state.current_epoch();
let previous_epoch_cache = EpochTotalBalances {
total_flag_balances: participation_cache
.previous_epoch_participation
.total_flag_balances,
};
let current_epoch_cache = EpochTotalBalances {
total_flag_balances: participation_cache
.current_epoch_participation
.total_flag_balances,
};
// Update current epoch flag balances.
if validator.is_active_at(current_epoch) {
update_flag_total_balances(&mut current_epoch_cache, *current_epoch_flags, validator)?;
}
// Update previous epoch flag balances.
if validator.is_active_at(previous_epoch) {
update_flag_total_balances(
&mut previous_epoch_cache,
*previous_epoch_flags,
validator,
)?;
}
}

state.progressive_balances_cache_mut().initialize(
current_epoch,
Expand All @@ -61,6 +64,26 @@ pub fn initialize_progressive_balances_cache<E: EthSpec>(
Ok(())
}

/// During the initialization of the progressive balances for a single epoch, add
/// `validator.effective_balance` to the flag total, for each flag present in `participation_flags`.
///
/// Pre-conditions:
///
/// - `validator` must not be slashed
/// - the `participation_flags` must be for `validator` in the same epoch as the `total_balances`
fn update_flag_total_balances(
total_balances: &mut EpochTotalBalances,
participation_flags: ParticipationFlags,
validator: &Validator,
) -> Result<(), BeaconStateError> {
for (flag, balance) in total_balances.total_flag_balances.iter_mut().enumerate() {
if participation_flags.has_flag(flag)? {
balance.safe_add_assign(validator.effective_balance)?;
}
}
Ok(())
}

/// Updates the `ProgressiveBalancesCache` when a new target attestation has been processed.
pub fn update_progressive_balances_on_attestation<T: EthSpec>(
state: &mut BeaconState<T>,
Expand Down
2 changes: 1 addition & 1 deletion consensus/state_processing/src/per_block_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub fn per_block_processing<T: EthSpec, Payload: AbstractExecPayload<T>>(

// Build epoch cache if it hasn't already been built, or if it is no longer valid
initialize_epoch_cache(state, spec)?;
initialize_progressive_balances_cache(state, None, spec)?;
initialize_progressive_balances_cache(state, spec)?;
state.build_slashings_cache()?;

let verify_signatures = match block_signature_strategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,10 @@ pub fn process_sync_aggregate<T: EthSpec>(
increase_balance(state, participant_index, participant_reward)?;
}
proposer_balance.safe_add_assign(proposer_reward)?;
} else if participant_index == proposer_index {
proposer_balance = proposer_balance.saturating_sub(participant_reward);
} else {
if participant_index == proposer_index {
proposer_balance = proposer_balance.saturating_sub(participant_reward);
} else {
decrease_balance(state, participant_index, participant_reward)?;
}
decrease_balance(state, participant_index, participant_reward)?;
}
}

Expand Down
9 changes: 0 additions & 9 deletions consensus/state_processing/src/per_block_processing/errors.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use super::signature_sets::Error as SignatureSetError;
use crate::per_epoch_processing::altair::participation_cache;
use crate::{ContextError, EpochCacheError};
use merkle_proof::MerkleTreeError;
use participation_cache::Error as ParticipationCacheError;
use safe_arith::ArithError;
use ssz::DecodeError;
use types::*;
Expand Down Expand Up @@ -90,7 +88,6 @@ pub enum BlockProcessingError {
found: Hash256,
},
WithdrawalCredentialsInvalid,
ParticipationCacheError(ParticipationCacheError),
}

impl From<BeaconStateError> for BlockProcessingError {
Expand Down Expand Up @@ -154,12 +151,6 @@ impl From<BlockOperationError<HeaderInvalid>> for BlockProcessingError {
}
}

impl From<ParticipationCacheError> for BlockProcessingError {
fn from(e: ParticipationCacheError) -> Self {
BlockProcessingError::ParticipationCacheError(e)
}
}

/// A conversion that consumes `self` and adds an `index` variable to resulting struct.
///
/// Used here to allow converting an error into an upstream error that points to the object that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@ use crate::per_epoch_processing::{
};
pub use inactivity_updates::process_inactivity_updates_slow;
pub use justification_and_finalization::process_justification_and_finalization;
pub use participation_cache::ParticipationCache;
pub use participation_flag_updates::process_participation_flag_updates;
pub use rewards_and_penalties::process_rewards_and_penalties_slow;
pub use sync_committee_updates::process_sync_committee_updates;
use types::{BeaconState, ChainSpec, EthSpec, RelativeEpoch};

pub mod inactivity_updates;
pub mod justification_and_finalization;
pub mod participation_cache;
pub mod participation_flag_updates;
pub mod rewards_and_penalties;
pub mod sync_committee_updates;
Expand All @@ -34,7 +32,7 @@ pub fn process_epoch<T: EthSpec>(
state.build_committee_cache(RelativeEpoch::Next, spec)?;
state.build_total_active_balance_cache_at(state.current_epoch(), spec)?;
initialize_epoch_cache(state, spec)?;
initialize_progressive_balances_cache::<T>(state, None, spec)?;
initialize_progressive_balances_cache::<T>(state, spec)?;

let sync_committee = state.current_sync_committee()?.clone();

Expand Down
Loading

0 comments on commit 4380e05

Please sign in to comment.