Skip to content

Commit

Permalink
todos and flag comments
Browse files Browse the repository at this point in the history
  • Loading branch information
eserilev committed Dec 3, 2023
2 parents 13942bf + 44aaf13 commit 6dc3273
Show file tree
Hide file tree
Showing 48 changed files with 1,455 additions and 533 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 84 additions & 13 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,11 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub data_availability_checker: Arc<DataAvailabilityChecker<T>>,
/// The KZG trusted setup used by this chain.
pub kzg: Option<Arc<Kzg>>,
/// State with complete tree hash cache, ready for block production.
///
/// NB: We can delete this once we have tree-states.
#[allow(clippy::type_complexity)]
pub block_production_state: Arc<Mutex<Option<(Hash256, BlockProductionPreState<T::EthSpec>)>>>,
}

pub enum BeaconBlockResponseType<T: EthSpec> {
Expand Down Expand Up @@ -4030,14 +4035,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
(re_org_state.pre_state, re_org_state.state_root)
}
// Normal case: proposing a block atop the current head. Use the snapshot cache.
// Normal case: proposing a block atop the current head using the cache.
else if let Some((_, cached_state)) = self
.block_production_state
.lock()
.take()
.filter(|(cached_block_root, _)| *cached_block_root == head_block_root)
{
(cached_state.pre_state, cached_state.state_root)
}
// Fall back to a direct read of the snapshot cache.
else if let Some(pre_state) = self
.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
snapshot_cache.get_state_for_block_production(head_block_root)
})
{
warn!(
self.log,
"Block production cache miss";
"message" => "falling back to snapshot cache clone",
"slot" => slot
);
(pre_state.pre_state, pre_state.state_root)
} else {
warn!(
Expand Down Expand Up @@ -4161,12 +4181,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
drop(proposer_head_timer);
let re_org_parent_block = proposer_head.parent_node.root;

// Only attempt a re-org if we hit the snapshot cache.
// Only attempt a re-org if we hit the block production cache or snapshot cache.
let pre_state = self
.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
snapshot_cache.get_state_for_block_production(re_org_parent_block)
.block_production_state
.lock()
.take()
.and_then(|(cached_block_root, state)| {
(cached_block_root == re_org_parent_block).then_some(state)
})
.or_else(|| {
warn!(
self.log,
"Block production cache miss";
"message" => "falling back to snapshot cache during re-org",
"slot" => slot,
"block_root" => ?re_org_parent_block
);
self.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
snapshot_cache.get_state_for_block_production(re_org_parent_block)
})
})
.or_else(|| {
debug!(
Expand Down Expand Up @@ -5326,15 +5361,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// This function will result in a call to `forkchoiceUpdated` on the EL if we're in the
/// tail-end of the slot (as defined by `self.config.prepare_payload_lookahead`).
///
/// Return `Ok(Some(head_block_root))` if this node prepared to propose at the next slot on
/// top of `head_block_root`.
pub async fn prepare_beacon_proposer(
self: &Arc<Self>,
current_slot: Slot,
) -> Result<(), Error> {
) -> Result<Option<Hash256>, Error> {
let prepare_slot = current_slot + 1;

// There's no need to run the proposer preparation routine before the bellatrix fork.
if self.slot_is_prior_to_bellatrix(prepare_slot) {
return Ok(());
return Ok(None);
}

let execution_layer = self
Expand All @@ -5347,7 +5385,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if !self.config.always_prepare_payload
&& !execution_layer.has_any_proposer_preparation_data().await
{
return Ok(());
return Ok(None);
}

// Load the cached head and its forkchoice update parameters.
Expand Down Expand Up @@ -5394,7 +5432,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let Some((forkchoice_update_params, Some(pre_payload_attributes))) = maybe_prep_data else {
// Appropriate log messages have already been logged above and in
// `get_pre_payload_attributes`.
return Ok(());
return Ok(None);
};

// If the execution layer doesn't have any proposer data for this validator then we assume
Expand All @@ -5405,7 +5443,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.has_proposer_preparation_data(proposer)
.await
{
return Ok(());
return Ok(None);
}

// Fetch payload attributes from the execution layer's cache, or compute them from scratch
Expand Down Expand Up @@ -5500,7 +5538,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"prepare_slot" => prepare_slot,
"validator" => proposer,
);
return Ok(());
return Ok(None);
};

// If we are close enough to the proposal slot, send an fcU, which will have payload
Expand All @@ -5523,7 +5561,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.await?;
}

Ok(())
Ok(Some(head_root))
}

pub async fn update_execution_engine_forkchoice(
Expand Down Expand Up @@ -6446,6 +6484,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn data_availability_boundary(&self) -> Option<Epoch> {
self.data_availability_checker.data_availability_boundary()
}

/// Gets the `LightClientBootstrap` object for a requested block root.
///
/// Returns `None` when the state or block is not found in the database.
#[allow(clippy::type_complexity)]
pub fn get_light_client_bootstrap(
&self,
block_root: &Hash256,
) -> Result<Option<(LightClientBootstrap<T::EthSpec>, ForkName)>, Error> {
let Some((state_root, slot)) = self
.get_blinded_block(block_root)?
.map(|block| (block.state_root(), block.slot()))
else {
return Ok(None);
};

let Some(mut state) = self.get_state(&state_root, Some(slot))? else {
return Ok(None);
};

let fork_name = state
.fork_name(&self.spec)
.map_err(Error::InconsistentFork)?;

match fork_name {
ForkName::Altair | ForkName::Merge => {
LightClientBootstrap::from_beacon_state(&mut state)
.map(|bootstrap| Some((bootstrap, fork_name)))
.map_err(Error::LightClientError)
}
ForkName::Base | ForkName::Capella | ForkName::Deneb => Err(Error::UnsupportedFork),
}
}
}

impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ where
.map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?,
),
kzg,
block_production_state: Arc::new(Mutex::new(None)),
};

let head = beacon_chain.head_snapshot();
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ pub enum BeaconChainError {
ProposerHeadForkChoiceError(fork_choice::Error<proto_array::Error>),
UnableToPublish,
AvailabilityCheckError(AvailabilityCheckError),
LightClientError(LightClientError),
UnsupportedFork,
}

easy_from_to!(SlotProcessingError, BeaconChainError);
Expand Down
22 changes: 22 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub struct ServerSentEventHandler<T: EthSpec> {
contribution_tx: Sender<EventKind<T>>,
payload_attributes_tx: Sender<EventKind<T>>,
late_head: Sender<EventKind<T>>,
light_client_finality_update_tx: Sender<EventKind<T>>,
light_client_optimistic_update_tx: Sender<EventKind<T>>,
block_reward_tx: Sender<EventKind<T>>,
log: Logger,
}
Expand All @@ -40,6 +42,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
let (contribution_tx, _) = broadcast::channel(capacity);
let (payload_attributes_tx, _) = broadcast::channel(capacity);
let (late_head, _) = broadcast::channel(capacity);
let (light_client_finality_update_tx, _) = broadcast::channel(capacity);
let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity);
let (block_reward_tx, _) = broadcast::channel(capacity);

Self {
Expand All @@ -53,6 +57,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
contribution_tx,
payload_attributes_tx,
late_head,
light_client_finality_update_tx,
light_client_optimistic_update_tx,
block_reward_tx,
log,
}
Expand Down Expand Up @@ -108,6 +114,14 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
.late_head
.send(kind)
.map(|count| log_count("late head", count)),
EventKind::LightClientFinalityUpdate(_) => self
.light_client_finality_update_tx
.send(kind)
.map(|count| log_count("light client finality update", count)),
EventKind::LightClientOptimisticUpdate(_) => self
.light_client_optimistic_update_tx
.send(kind)
.map(|count| log_count("light client optimistic update", count)),
EventKind::BlockReward(_) => self
.block_reward_tx
.send(kind)
Expand Down Expand Up @@ -158,6 +172,14 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.late_head.subscribe()
}

pub fn subscribe_light_client_finality_update(&self) -> Receiver<EventKind<T>> {
self.light_client_finality_update_tx.subscribe()
}

pub fn subscribe_light_client_optimistic_update(&self) -> Receiver<EventKind<T>> {
self.light_client_optimistic_update_tx.subscribe()
}

pub fn subscribe_block_reward(&self) -> Receiver<EventKind<T>> {
self.block_reward_tx.subscribe()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientFinalityUpdate<T> {
chain: &BeaconChain<T>,
seen_timestamp: Duration,
) -> Result<Self, Error> {
let gossiped_finality_slot = light_client_finality_update.finalized_header.slot;
let gossiped_finality_slot = light_client_finality_update.finalized_header.beacon.slot;
let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0);
let signature_slot = light_client_finality_update.signature_slot;
let start_time = chain.slot_clock.start_of(signature_slot);
Expand All @@ -88,7 +88,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientFinalityUpdate<T> {
.get_blinded_block(&finalized_block_root)?
.ok_or(Error::FailedConstructingUpdate)?;
let latest_seen_finality_update_slot = match latest_seen_finality_update.as_ref() {
Some(update) => update.finalized_header.slot,
Some(update) => update.finalized_header.beacon.slot,
None => Slot::new(0),
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
chain: &BeaconChain<T>,
seen_timestamp: Duration,
) -> Result<Self, Error> {
let gossiped_optimistic_slot = light_client_optimistic_update.attested_header.slot;
let gossiped_optimistic_slot = light_client_optimistic_update.attested_header.beacon.slot;
let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0);
let signature_slot = light_client_optimistic_update.signature_slot;
let start_time = chain.slot_clock.start_of(signature_slot);
Expand All @@ -88,7 +88,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
.get_state(&attested_block.state_root(), Some(attested_block.slot()))?
.ok_or(Error::FailedConstructingUpdate)?;
let latest_seen_optimistic_update_slot = match latest_seen_optimistic_update.as_ref() {
Some(update) => update.attested_header.slot,
Some(update) => update.attested_header.beacon.slot,
None => Slot::new(0),
};

Expand All @@ -114,6 +114,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
// otherwise queue
let canonical_root = light_client_optimistic_update
.attested_header
.beacon
.canonical_root();

if canonical_root != head_block.message().parent_root() {
Expand Down
Loading

0 comments on commit 6dc3273

Please sign in to comment.