Skip to content

Commit

Permalink
Handle broadcast_validation parameter in POST /eth/v2/beacon/blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
Tumas committed Aug 29, 2024
1 parent bdeb14e commit fa5e309
Show file tree
Hide file tree
Showing 27 changed files with 836 additions and 173 deletions.
2 changes: 1 addition & 1 deletion ad_hoc_bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ fn run<P: Preset>(

if let Some(block_blobs) = blobs.remove(&slot) {
for blob in block_blobs {
controller.on_api_blob_sidecar(blob)
controller.on_api_blob_sidecar(blob, None)
}
}

Expand Down
29 changes: 27 additions & 2 deletions fork_choice_control/src/block_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use types::{
nonstandard::{BlockRewards, Phase, SlashingKind},
phase0::primitives::H256,
preset::Preset,
traits::{BeaconBlock as _, SignedBeaconBlock as _},
traits::{BeaconBlock as _, BeaconState as _, SignedBeaconBlock as _},
};

#[derive(Constructor)]
Expand Down Expand Up @@ -156,6 +156,31 @@ impl<P: Preset> BlockProcessor<P> {
.map(|(state, _)| state)
}

pub fn validate_block_for_gossip(
&self,
store: &Store<P>,
block: &Arc<SignedBeaconBlock<P>>,
) -> Result<Option<BlockAction<P>>> {
store.validate_block_for_gossip(block, |parent| {
let block_slot = block.message().slot();

// > Make a copy of the state to avoid mutability issues
let mut state = self
.state_cache
.before_or_at_slot(store, parent.block_root, block_slot)
.unwrap_or_else(|| parent.state(store));

// > Process slots (including those with no blocks) since block
if state.slot() < block_slot {
combined::process_slots(&self.chain_config, state.make_mut(), block_slot)?;
}

combined::process_block_for_gossip(&self.chain_config, &state, block)?;

Ok(None)
})
}

pub fn validate_block<E: ExecutionEngine<P> + Send>(
&self,
store: &Store<P>,
Expand Down Expand Up @@ -190,7 +215,7 @@ impl<P: Preset> BlockProcessor<P> {
{
PartialBlockAction::Accept => {}
PartialBlockAction::Ignore => {
return Ok((state, Some(BlockAction::Ignore)))
return Ok((state, Some(BlockAction::Ignore(false))))
}
}
}
Expand Down
29 changes: 22 additions & 7 deletions fork_choice_control/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::{
storage::Storage,
tasks::{
AggregateAndProofTask, AttestationTask, AttesterSlashingTask, BlobSidecarTask, BlockTask,
BlockVerifyForGossipTask,
},
thread_pool::{Spawn, ThreadPool},
unbounded_sink::UnboundedSink,
Expand Down Expand Up @@ -205,10 +206,6 @@ where
self.spawn_block_task(block, BlockOrigin::Requested(peer_id))
}

pub fn on_semi_verified_block(&self, block: Arc<SignedBeaconBlock<P>>) {
self.spawn_block_task(block, BlockOrigin::SemiVerified)
}

pub fn on_own_block(&self, wait_group: W, block: Arc<SignedBeaconBlock<P>>) {
self.spawn_block_task_with_wait_group(wait_group, block, BlockOrigin::Own)
}
Expand All @@ -222,16 +219,34 @@ where
)
}

pub fn on_api_blob_sidecar(&self, blob_sidecar: Arc<BlobSidecar<P>>) {
self.spawn_blob_sidecar_task(blob_sidecar, true, BlobSidecarOrigin::Api)
pub fn on_api_blob_sidecar(
&self,
blob_sidecar: Arc<BlobSidecar<P>>,
sender: Option<OneshotSender<Result<ValidationOutcome>>>,
) {
self.spawn_blob_sidecar_task(blob_sidecar, true, BlobSidecarOrigin::Api(sender))
}

pub fn on_api_block(
&self,
block: Arc<SignedBeaconBlock<P>>,
sender: MultiSender<Result<ValidationOutcome>>,
) {
self.spawn_block_task(block, BlockOrigin::Api(sender))
self.spawn_block_task(block, BlockOrigin::Api(Some(sender)))
}

pub fn on_api_block_for_gossip(
&self,
block: Arc<SignedBeaconBlock<P>>,
sender: MultiSender<Result<ValidationOutcome>>,
) {
self.spawn(BlockVerifyForGossipTask {
store_snapshot: self.owned_store_snapshot(),
block_processor: self.block_processor.clone_arc(),
wait_group: self.owned_wait_group(),
block,
sender,
})
}

pub fn on_notified_fork_choice_update(&self, payload_status: PayloadStatusV1) {
Expand Down
119 changes: 108 additions & 11 deletions fork_choice_control/src/mutator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,14 +466,17 @@ where

self.accept_block(&wait_group, pending_chain_link)?;
}
Ok(BlockAction::Ignore) => {
Ok(BlockAction::Ignore(publishable)) => {
let (gossip_id, sender) = origin.split();

if let Some(gossip_id) = gossip_id {
P2pMessage::Ignore(gossip_id).send(&self.p2p_tx);
}

reply_block_validation_result_to_http_api(sender, Ok(ValidationOutcome::Ignore));
reply_block_validation_result_to_http_api(
sender,
Ok(ValidationOutcome::Ignore(publishable)),
);
}
Ok(BlockAction::DelayUntilBlobs(block)) => {
let slot = block.message().slot();
Expand All @@ -497,6 +500,11 @@ where
P2pMessage::Accept(gossip_id).send(&self.p2p_tx);
}

let pending_block = reply_delayed_block_validation_result(
pending_block,
Ok(ValidationOutcome::Ignore(false)),
);

let blob_ids = missing_blob_indices
.into_iter()
.map(|index| BlobIdentifier { block_root, index })
Expand All @@ -521,6 +529,11 @@ where
if self.store.contains_block(parent_root) {
self.retry_block(wait_group, pending_block);
} else {
let pending_block = reply_delayed_block_validation_result(
pending_block,
Ok(ValidationOutcome::Ignore(false)),
);

debug!("block delayed until parent: {pending_block:?}");

let peer_id = pending_block.origin.peer_id();
Expand All @@ -542,6 +555,11 @@ where
if slot <= self.store.slot() {
self.retry_block(wait_group, pending_block);
} else {
let pending_block = reply_delayed_block_validation_result(
pending_block,
Ok(ValidationOutcome::Ignore(false)),
);

debug!("block delayed until slot: {pending_block:?}");

self.delay_block_until_slot(pending_block);
Expand Down Expand Up @@ -673,7 +691,7 @@ where
P2pMessage::Ignore(gossip_id).send(&self.p2p_tx);
}

reply_to_http_api(sender, Ok(ValidationOutcome::Ignore));
reply_to_http_api(sender, Ok(ValidationOutcome::Ignore(false)));
}
Ok(AggregateAndProofAction::DelayUntilBlock(aggregate_and_proof, block_root)) => {
if let Some(metrics) = self.metrics.as_ref() {
Expand Down Expand Up @@ -835,7 +853,7 @@ where
P2pMessage::Ignore(gossip_id).send(&self.p2p_tx);
}

reply_to_http_api(sender, Ok(ValidationOutcome::Ignore));
reply_to_http_api(sender, Ok(ValidationOutcome::Ignore(false)));
}
Ok(AttestationAction::DelayUntilBlock(attestation, block_root)) => {
if let Some(metrics) = self.metrics.as_ref() {
Expand Down Expand Up @@ -1014,16 +1032,24 @@ where
) {
match result {
Ok(BlobSidecarAction::Accept(blob_sidecar)) => {
if let Some(gossip_id) = origin.gossip_id() {
let (gossip_id, sender) = origin.split();

if let Some(gossip_id) = gossip_id {
P2pMessage::Accept(gossip_id).send(&self.p2p_tx);
}

reply_to_http_api(sender, Ok(ValidationOutcome::Accept));

self.accept_blob_sidecar(&wait_group, blob_sidecar);
}
Ok(BlobSidecarAction::Ignore) => {
if let Some(gossip_id) = origin.gossip_id() {
Ok(BlobSidecarAction::Ignore(publishable)) => {
let (gossip_id, sender) = origin.split();

if let Some(gossip_id) = gossip_id {
P2pMessage::Ignore(gossip_id).send(&self.p2p_tx);
}

reply_to_http_api(sender, Ok(ValidationOutcome::Ignore(publishable)));
}
Ok(BlobSidecarAction::DelayUntilParent(blob_sidecar)) => {
let parent_root = blob_sidecar.signed_block_header.message.parent_root;
Expand All @@ -1044,6 +1070,11 @@ where

P2pMessage::BlockNeeded(parent_root, peer_id).send(&self.p2p_tx);

let pending_blob_sidecar = reply_delayed_blob_sidecar_validation_result(
pending_blob_sidecar,
Ok(ValidationOutcome::Ignore(false)),
);

self.delay_blob_sidecar_until_parent(pending_blob_sidecar);
}
}
Expand All @@ -1062,16 +1093,25 @@ where
} else {
debug!("blob sidecar delayed until slot: {slot}");

let pending_blob_sidecar = reply_delayed_blob_sidecar_validation_result(
pending_blob_sidecar,
Ok(ValidationOutcome::Ignore(false)),
);

self.delay_blob_sidecar_until_slot(pending_blob_sidecar);
}
}
Err(error) => {
warn!("blob sidecar rejected (error: {error}, origin: {origin:?})");

if let Some(gossip_id) = origin.gossip_id() {
let (gossip_id, sender) = origin.split();

if let Some(gossip_id) = gossip_id {
P2pMessage::Reject(gossip_id, MutatorRejectionReason::InvalidBlobSidecar)
.send(&self.p2p_tx);
}

reply_to_http_api(sender, Err(error));
}
}
}
Expand Down Expand Up @@ -1325,7 +1365,7 @@ where
P2pMessage::Ignore(gossip_id).send(&self.p2p_tx);
}

reply_block_validation_result_to_http_api(sender, Ok(ValidationOutcome::Ignore));
reply_block_validation_result_to_http_api(sender, Ok(ValidationOutcome::Ignore(true)));

return Ok(());
}
Expand All @@ -1346,7 +1386,7 @@ where
P2pMessage::Ignore(gossip_id).send(&self.p2p_tx);
}

reply_block_validation_result_to_http_api(sender, Ok(ValidationOutcome::Ignore));
reply_block_validation_result_to_http_api(sender, Ok(ValidationOutcome::Ignore(false)));

return Ok(());
}
Expand Down Expand Up @@ -1438,7 +1478,7 @@ where
P2pMessage::Ignore(gossip_id).send(&self.p2p_tx);
}

reply_block_validation_result_to_http_api(sender, Ok(ValidationOutcome::Ignore));
reply_block_validation_result_to_http_api(sender, Ok(ValidationOutcome::Ignore(false)));
} else {
let (gossip_id, sender) = origin.split();

Expand Down Expand Up @@ -2401,3 +2441,60 @@ fn reply_block_validation_result_to_http_api(
}
}
}

fn reply_delayed_block_validation_result<P: Preset>(
pending_block: PendingBlock<P>,
reply: Result<ValidationOutcome>,
) -> PendingBlock<P> {
let PendingBlock {
block,
origin,
submission_time,
} = pending_block;

if let BlockOrigin::Api(Some(sender)) = origin {
reply_block_validation_result_to_http_api(Some(sender), reply);

PendingBlock {
block,
origin: BlockOrigin::Api(None),
submission_time,
}
} else {
PendingBlock {
block,
origin,
submission_time,
}
}
}

fn reply_delayed_blob_sidecar_validation_result<P: Preset>(
pending_blob_sidecar: PendingBlobSidecar<P>,
reply: Result<ValidationOutcome>,
) -> PendingBlobSidecar<P> {
let PendingBlobSidecar {
blob_sidecar,
block_seen,
origin,
submission_time,
} = pending_blob_sidecar;

if let BlobSidecarOrigin::Api(Some(sender)) = origin {
reply_to_http_api(Some(sender), reply);

PendingBlobSidecar {
blob_sidecar,
block_seen,
origin: BlobSidecarOrigin::Api(None),
submission_time,
}
} else {
PendingBlobSidecar {
blob_sidecar,
block_seen,
origin,
submission_time,
}
}
}
15 changes: 15 additions & 0 deletions fork_choice_control/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,21 @@ where
Ok(None)
}

pub fn exibits_equivocation(&self, block: &Arc<SignedBeaconBlock<P>>) -> bool {
let block_slot = block.message().slot();
let store = self.store_snapshot();

if store.is_slot_finalized(block_slot) {
return false;
}

let block_proposer_index = block.message().proposer_index();
let block_root = block.message().hash_tree_root();

store.exibits_equivocation_on_blobs(block_slot, block_proposer_index, block_root)
|| store.exibits_equivocation_on_blocks(block_slot, block_proposer_index, block_root)
}

pub fn check_block_root(&self, block_root: H256) -> Result<Option<WithStatus<H256>>> {
let store = self.store_snapshot();

Expand Down
Loading

0 comments on commit fa5e309

Please sign in to comment.