Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add processing and processed caching to the DA checker #4732

Merged
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
3a7ea05
add processing and processed caching to the DA checker
realbigsean Sep 13, 2023
6ed602f
move processing cache out of critical cache
realbigsean Sep 14, 2023
904e593
get it compiling
realbigsean Sep 18, 2023
654485f
fix lints
realbigsean Sep 19, 2023
0ad729c
add docs to `AvailabilityView`
realbigsean Sep 19, 2023
e25c609
some self review
realbigsean Sep 19, 2023
77afea3
fix lints
realbigsean Sep 19, 2023
2402b79
fix beacon chain tests
realbigsean Sep 19, 2023
fb768a3
Merge branch 'deneb-free-blobs' of https://github.com/sigp/lighthouse…
realbigsean Sep 19, 2023
aeeddfc
cargo fmt
realbigsean Sep 19, 2023
91b81e9
make availability view easier to implement, start on testing
realbigsean Sep 20, 2023
420e7cf
move child component cache and finish test
realbigsean Sep 21, 2023
787eb37
cargo fix
realbigsean Sep 21, 2023
c5263b6
cargo fix
realbigsean Sep 21, 2023
90e3e4e
cargo fix
realbigsean Sep 21, 2023
2534dff
fmt and lint
realbigsean Sep 21, 2023
65fa033
make blob commitments not optional, rename some caches, add missing b…
realbigsean Sep 21, 2023
bc23daf
Update beacon_node/beacon_chain/src/data_availability_checker/process…
realbigsean Sep 21, 2023
e07dd7c
marks review feedback and other general cleanup
realbigsean Sep 21, 2023
a1f6b17
cargo fix
realbigsean Sep 21, 2023
d6862e7
improve availability view docs
realbigsean Sep 21, 2023
1b10a29
Merge branch 'move-deneb-lookup-delay-logic' of https://github.com/re…
realbigsean Sep 21, 2023
3986d33
some renames
realbigsean Sep 21, 2023
a868a6c
some renames and docs
realbigsean Sep 21, 2023
8d85c83
fix should delay lookup logic
realbigsean Sep 21, 2023
2777b73
get rid of some wrapper methods
realbigsean Sep 21, 2023
8eecea5
fix up single lookup changes
realbigsean Sep 21, 2023
f2644ab
add a couple docs
realbigsean Sep 21, 2023
14ad2d2
add single blob merge method and improve process_... docs
realbigsean Sep 21, 2023
ad1fabb
update some names
realbigsean Sep 21, 2023
f1a1bc9
lints
realbigsean Sep 21, 2023
21ae04a
Merge branch 'deneb-free-blobs' of https://github.com/sigp/lighthouse…
realbigsean Sep 25, 2023
9384352
fix merge
realbigsean Sep 25, 2023
32b92ff
remove blob indices from lookup creation log
realbigsean Sep 25, 2023
9fc5ca5
remove blob indices from lookup creation log
realbigsean Sep 25, 2023
bf80cab
delayed lookup logging improvement
realbigsean Sep 25, 2023
c774a35
check fork choice before doing any blob processing
realbigsean Sep 25, 2023
816744c
remove unused dep
realbigsean Sep 25, 2023
6a5178a
Update beacon_node/beacon_chain/src/data_availability_checker/availab…
realbigsean Sep 27, 2023
303771d
Update beacon_node/beacon_chain/src/data_availability_checker/availab…
realbigsean Sep 27, 2023
4a6caa3
Update beacon_node/beacon_chain/src/data_availability_checker/availab…
realbigsean Sep 27, 2023
581ea31
Update beacon_node/beacon_chain/src/data_availability_checker/availab…
realbigsean Sep 27, 2023
5e3ee97
Update beacon_node/network/src/sync/block_lookups/delayed_lookup.rs
realbigsean Sep 27, 2023
aba1ce5
Merge branch 'deneb-free-blobs' of https://github.com/sigp/lighthouse…
realbigsean Sep 29, 2023
176e587
Merge branch 'move-deneb-lookup-delay-logic' of https://github.com/re…
realbigsean Sep 29, 2023
61d56c5
remove duplicate deps
realbigsean Sep 29, 2023
188c974
use gen range in random blobs geneartor
realbigsean Oct 2, 2023
49b0c2b
rename processing cache fields
realbigsean Oct 2, 2023
220fda1
require block root in rpc block construction and check block root con…
realbigsean Oct 2, 2023
75c50db
send peers as vec in single message
realbigsean Oct 2, 2023
1eec921
spawn delayed lookup service from network beacon processor
realbigsean Oct 2, 2023
2aba776
fix tests
realbigsean Oct 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

102 changes: 94 additions & 8 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
/// The slot at which blocks are downloaded back to.
pub genesis_backfill_slot: Slot,
// Provides a KZG verification and temporary storage for blocks and blobs as
// they are collected and combined.
/// Provides a KZG verification and temporary storage for blocks and blobs as
/// they are collected and combined.
pub data_availability_checker: Arc<DataAvailabilityChecker<T>>,
/// The KZG trusted setup used by this chain.
pub kzg: Option<Arc<Kzg<<T::EthSpec as EthSpec>::Kzg>>>,
Expand Down Expand Up @@ -2791,11 +2791,97 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(BeaconChainError::TokioJoin)?
}

pub async fn process_blob(
/// Cache the blob in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_gossip_blob(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
self.check_gossip_blob_availability_and_import(blob).await
let block_root = blob.block_root();

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown);
}

self.data_availability_checker
.notify_gossip_blob(blob.as_blob().slot, block_root, &blob);
let r = self.check_gossip_blob_availability_and_import(blob).await;
self.remove_notified(&block_root, r)
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_rpc_blobs(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown);
}

self.data_availability_checker
.notify_rpc_blobs(slot, block_root, &blobs);
let r = self
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await;
self.remove_notified(&block_root, r)
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
&self,
block_root: &Hash256,
r: Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
self.data_availability_checker.remove_notified(block_root);
}
r
}

/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
/// and evict if the block was imported or erred.
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
self: &Arc<Self>,
block_root: Hash256,
unverified_block: B,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
if let Ok(commitments) = unverified_block
.block()
.message()
.body()
.blob_kzg_commitments()
{
self.data_availability_checker.notify_block_commitments(
unverified_block.block().slot(),
block_root,
commitments.clone(),
);
};
let r = self
.process_block(block_root, unverified_block, notify_execution_layer, || {
Ok(())
})
.await;
self.remove_notified(&block_root, r)
}

/// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and
Expand Down Expand Up @@ -2961,7 +3047,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Checks if the block is available, and imports immediately if so, otherwise caches the block
/// in the data availability checker.
pub async fn check_block_availability_and_import(
async fn check_block_availability_and_import(
self: &Arc<Self>,
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
Expand All @@ -2974,7 +3060,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Checks if the provided blob can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
pub async fn check_gossip_blob_availability_and_import(
async fn check_gossip_blob_availability_and_import(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
Expand All @@ -2986,7 +3072,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
pub async fn check_rpc_blob_availability_and_import(
async fn check_rpc_blob_availability_and_import(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
Expand Down Expand Up @@ -5250,7 +5336,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(());
}

// Fetch payoad attributes from the execution layer's cache, or compute them from scratch
// Fetch payload attributes from the execution layer's cache, or compute them from scratch
// if no matching entry is found. This saves recomputing the withdrawals which can take
// considerable time to compute if a state load is required.
let head_root = forkchoice_update_params.head_root;
Expand Down
32 changes: 30 additions & 2 deletions beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use crate::beacon_chain::{
use crate::block_verification::cheap_state_advance_to_obtain_committees;
use crate::data_availability_checker::AvailabilityCheckError;
use crate::kzg_utils::{validate_blob, validate_blobs};
use crate::BeaconChainError;
use crate::{metrics, BeaconChainError};
use kzg::Kzg;
use slog::{debug, warn};
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
use types::{
BeaconStateError, BlobSidecar, BlobSidecarList, CloneConfig, EthSpec, Hash256,
Expand Down Expand Up @@ -172,6 +173,9 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
pub fn to_blob(self) -> Arc<BlobSidecar<T::EthSpec>> {
self.blob.message
}
pub fn as_blob(&self) -> &BlobSidecar<T::EthSpec> {
&self.blob.message
}
pub fn signed_blob(&self) -> SignedBlobSidecar<T::EthSpec> {
self.blob.clone()
}
Expand Down Expand Up @@ -203,6 +207,8 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
});
}

let blob_root = get_blob_root(&signed_blob_sidecar);

// Verify that the sidecar is not from a future slot.
let latest_permissible_slot = chain
.slot_clock
Expand Down Expand Up @@ -393,7 +399,7 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
.ok_or_else(|| GossipBlobError::UnknownValidator(proposer_index as u64))?;

signed_blob_sidecar.verify_signature(
None,
Some(blob_root),
pubkey,
&fork,
chain.genesis_validators_root,
Expand Down Expand Up @@ -473,6 +479,15 @@ impl<T: EthSpec> KzgVerifiedBlob<T> {
}
}

#[cfg(test)]
impl<T: EthSpec> KzgVerifiedBlob<T> {
pub fn new(blob: BlobSidecar<T>) -> Self {
Self {
blob: Arc::new(blob),
}
}
}

/// Complete kzg verification for a `GossipVerifiedBlob`.
///
/// Returns an error if the kzg verification check fails.
Expand Down Expand Up @@ -518,3 +533,16 @@ pub fn verify_kzg_for_blob_list<T: EthSpec>(
Err(AvailabilityCheckError::KzgVerificationFailed)
}
}

/// Returns the canonical root of the given `blob`.
///
/// Use this function to ensure that we report the blob hashing time Prometheus metric.
pub fn get_blob_root<E: EthSpec>(blob: &SignedBlobSidecar<E>) -> Hash256 {
let blob_root_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_BLOB_ROOT);

let blob_root = blob.message.tree_hash_root();

metrics::stop_timer(blob_root_timer);

blob_root
}
38 changes: 21 additions & 17 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::block_verification::BlockError;
use crate::data_availability_checker::AvailabilityCheckError;
pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock};
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::{data_availability_checker, GossipVerifiedBlock, PayloadVerificationOutcome};
use crate::{GossipVerifiedBlock, PayloadVerificationOutcome};
use derivative::Derivative;
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
Expand Down Expand Up @@ -65,8 +65,22 @@ impl<E: EthSpec> RpcBlock<E> {
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>,
) -> Result<Self, AvailabilityCheckError> {
if let Some(blobs) = blobs.as_ref() {
data_availability_checker::consistency_checks(&block, blobs)?;
if let (Some(blobs), Ok(block_commitments)) = (
blobs.as_ref(),
block.message().body().blob_kzg_commitments(),
) {
if blobs.len() != block_commitments.len() {
return Err(AvailabilityCheckError::MissingBlobs);
}
for (blob, &block_commitment) in blobs.iter().zip(block_commitments.iter()) {
let blob_commitment = blob.kzg_commitment;
if blob_commitment != block_commitment {
pawanjay176 marked this conversation as resolved.
Show resolved Hide resolved
return Err(AvailabilityCheckError::KzgCommitmentMismatch {
block_commitment,
blob_commitment,
});
}
}
}
let inner = match blobs {
Some(blobs) => RpcBlockInner::BlockAndBlobs(block, blobs),
Expand Down Expand Up @@ -235,27 +249,17 @@ impl<E: EthSpec> AvailabilityPendingExecutedBlock<E> {
}
}

pub fn as_block(&self) -> &SignedBeaconBlock<E> {
&self.block
}

pub fn num_blobs_expected(&self) -> usize {
self.block
.message()
.body()
.blob_kzg_commitments()
.map_or(0, |commitments| commitments.len())
}

pub fn get_all_blob_ids(&self) -> Vec<BlobIdentifier> {
let block_root = self.import_data.block_root;
self.block
.get_filtered_blob_ids(Some(block_root), |_, _| true)
}

pub fn get_filtered_blob_ids(
&self,
filter: impl Fn(usize, Hash256) -> bool,
) -> Vec<BlobIdentifier> {
self.block
.get_filtered_blob_ids(Some(self.import_data.block_root), filter)
}
}

#[derive(Debug, PartialEq, Encode, Decode, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ where
validator_monitor: RwLock::new(validator_monitor),
genesis_backfill_slot,
data_availability_checker: Arc::new(
DataAvailabilityChecker::new(slot_clock, kzg.clone(), store, self.spec)
DataAvailabilityChecker::new(slot_clock, kzg.clone(), store, &log, self.spec)
.map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?,
),
kzg,
Expand Down
Loading
Loading