diff --git a/Cargo.lock b/Cargo.lock index af9d0a0ad88..9c1b591349b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3875,7 +3875,9 @@ dependencies = [ "eth2_wallet", "ethereum_hashing", "ethereum_ssz", + "execution_layer", "genesis", + "hex", "int_to_bytes", "lighthouse_network", "lighthouse_version", @@ -8437,6 +8439,7 @@ dependencies = [ "slashing_protection", "slog", "slot_clock", + "strum", "sysinfo", "system_health", "task_executor", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 168bbfca496..53583390fa3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -482,6 +482,11 @@ pub struct BeaconChain { pub data_availability_checker: Arc>, /// The KZG trusted setup used by this chain. pub kzg: Option>, + /// State with complete tree hash cache, ready for block production. + /// + /// NB: We can delete this once we have tree-states. + #[allow(clippy::type_complexity)] + pub block_production_state: Arc)>>>, } pub enum BeaconBlockResponseType { @@ -4030,7 +4035,16 @@ impl BeaconChain { ); (re_org_state.pre_state, re_org_state.state_root) } - // Normal case: proposing a block atop the current head. Use the snapshot cache. + // Normal case: proposing a block atop the current head using the cache. + else if let Some((_, cached_state)) = self + .block_production_state + .lock() + .take() + .filter(|(cached_block_root, _)| *cached_block_root == head_block_root) + { + (cached_state.pre_state, cached_state.state_root) + } + // Fall back to a direct read of the snapshot cache. else if let Some(pre_state) = self .snapshot_cache .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) @@ -4038,6 +4052,12 @@ impl BeaconChain { snapshot_cache.get_state_for_block_production(head_block_root) }) { + warn!( + self.log, + "Block production cache miss"; + "message" => "falling back to snapshot cache clone", + "slot" => slot + ); (pre_state.pre_state, pre_state.state_root) } else { warn!( @@ -4161,12 +4181,27 @@ impl BeaconChain { drop(proposer_head_timer); let re_org_parent_block = proposer_head.parent_node.root; - // Only attempt a re-org if we hit the snapshot cache. + // Only attempt a re-org if we hit the block production cache or snapshot cache. let pre_state = self - .snapshot_cache - .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) - .and_then(|snapshot_cache| { - snapshot_cache.get_state_for_block_production(re_org_parent_block) + .block_production_state + .lock() + .take() + .and_then(|(cached_block_root, state)| { + (cached_block_root == re_org_parent_block).then_some(state) + }) + .or_else(|| { + warn!( + self.log, + "Block production cache miss"; + "message" => "falling back to snapshot cache during re-org", + "slot" => slot, + "block_root" => ?re_org_parent_block + ); + self.snapshot_cache + .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .and_then(|snapshot_cache| { + snapshot_cache.get_state_for_block_production(re_org_parent_block) + }) }) .or_else(|| { debug!( @@ -5326,15 +5361,18 @@ impl BeaconChain { /// /// This function will result in a call to `forkchoiceUpdated` on the EL if we're in the /// tail-end of the slot (as defined by `self.config.prepare_payload_lookahead`). + /// + /// Return `Ok(Some(head_block_root))` if this node prepared to propose at the next slot on + /// top of `head_block_root`. 
pub async fn prepare_beacon_proposer( self: &Arc, current_slot: Slot, - ) -> Result<(), Error> { + ) -> Result, Error> { let prepare_slot = current_slot + 1; // There's no need to run the proposer preparation routine before the bellatrix fork. if self.slot_is_prior_to_bellatrix(prepare_slot) { - return Ok(()); + return Ok(None); } let execution_layer = self @@ -5347,7 +5385,7 @@ impl BeaconChain { if !self.config.always_prepare_payload && !execution_layer.has_any_proposer_preparation_data().await { - return Ok(()); + return Ok(None); } // Load the cached head and its forkchoice update parameters. @@ -5394,7 +5432,7 @@ impl BeaconChain { let Some((forkchoice_update_params, Some(pre_payload_attributes))) = maybe_prep_data else { // Appropriate log messages have already been logged above and in // `get_pre_payload_attributes`. - return Ok(()); + return Ok(None); }; // If the execution layer doesn't have any proposer data for this validator then we assume @@ -5405,7 +5443,7 @@ impl BeaconChain { .has_proposer_preparation_data(proposer) .await { - return Ok(()); + return Ok(None); } // Fetch payload attributes from the execution layer's cache, or compute them from scratch @@ -5500,7 +5538,7 @@ impl BeaconChain { "prepare_slot" => prepare_slot, "validator" => proposer, ); - return Ok(()); + return Ok(None); }; // If we are close enough to the proposal slot, send an fcU, which will have payload @@ -5523,7 +5561,7 @@ impl BeaconChain { .await?; } - Ok(()) + Ok(Some(head_root)) } pub async fn update_execution_engine_forkchoice( @@ -6446,6 +6484,39 @@ impl BeaconChain { pub fn data_availability_boundary(&self) -> Option { self.data_availability_checker.data_availability_boundary() } + + /// Gets the `LightClientBootstrap` object for a requested block root. + /// + /// Returns `None` when the state or block is not found in the database. + #[allow(clippy::type_complexity)] + pub fn get_light_client_bootstrap( + &self, + block_root: &Hash256, + ) -> Result, ForkName)>, Error> { + let Some((state_root, slot)) = self + .get_blinded_block(block_root)? + .map(|block| (block.state_root(), block.slot())) + else { + return Ok(None); + }; + + let Some(mut state) = self.get_state(&state_root, Some(slot))? 
else { + return Ok(None); + }; + + let fork_name = state + .fork_name(&self.spec) + .map_err(Error::InconsistentFork)?; + + match fork_name { + ForkName::Altair | ForkName::Merge => { + LightClientBootstrap::from_beacon_state(&mut state) + .map(|bootstrap| Some((bootstrap, fork_name))) + .map_err(Error::LightClientError) + } + ForkName::Base | ForkName::Capella | ForkName::Deneb => Err(Error::UnsupportedFork), + } + } } impl Drop for BeaconChain { diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index bffb23aeb7e..fbd255126ee 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -925,6 +925,7 @@ where .map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?, ), kzg, + block_production_state: Arc::new(Mutex::new(None)), }; let head = beacon_chain.head_snapshot(); diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 7c1bb04917d..9c1ba06f853 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -221,6 +221,8 @@ pub enum BeaconChainError { ProposerHeadForkChoiceError(fork_choice::Error), UnableToPublish, AvailabilityCheckError(AvailabilityCheckError), + LightClientError(LightClientError), + UnsupportedFork, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 8d3a6827946..0e5dfc80596 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -17,6 +17,8 @@ pub struct ServerSentEventHandler { contribution_tx: Sender>, payload_attributes_tx: Sender>, late_head: Sender>, + light_client_finality_update_tx: Sender>, + light_client_optimistic_update_tx: Sender>, block_reward_tx: Sender>, log: Logger, } @@ -40,6 +42,8 @@ impl ServerSentEventHandler { let (contribution_tx, _) = broadcast::channel(capacity); let (payload_attributes_tx, _) = broadcast::channel(capacity); let (late_head, _) = broadcast::channel(capacity); + let (light_client_finality_update_tx, _) = broadcast::channel(capacity); + let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity); let (block_reward_tx, _) = broadcast::channel(capacity); Self { @@ -53,6 +57,8 @@ impl ServerSentEventHandler { contribution_tx, payload_attributes_tx, late_head, + light_client_finality_update_tx, + light_client_optimistic_update_tx, block_reward_tx, log, } @@ -108,6 +114,14 @@ impl ServerSentEventHandler { .late_head .send(kind) .map(|count| log_count("late head", count)), + EventKind::LightClientFinalityUpdate(_) => self + .light_client_finality_update_tx + .send(kind) + .map(|count| log_count("light client finality update", count)), + EventKind::LightClientOptimisticUpdate(_) => self + .light_client_optimistic_update_tx + .send(kind) + .map(|count| log_count("light client optimistic update", count)), EventKind::BlockReward(_) => self .block_reward_tx .send(kind) @@ -158,6 +172,14 @@ impl ServerSentEventHandler { self.late_head.subscribe() } + pub fn subscribe_light_client_finality_update(&self) -> Receiver> { + self.light_client_finality_update_tx.subscribe() + } + + pub fn subscribe_light_client_optimistic_update(&self) -> Receiver> { + self.light_client_optimistic_update_tx.subscribe() + } + pub fn subscribe_block_reward(&self) -> Receiver> { self.block_reward_tx.subscribe() } diff --git a/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs 
b/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs index 86cc6695edb..791d63ccfe5 100644 --- a/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs +++ b/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs @@ -67,7 +67,7 @@ impl VerifiedLightClientFinalityUpdate { chain: &BeaconChain, seen_timestamp: Duration, ) -> Result { - let gossiped_finality_slot = light_client_finality_update.finalized_header.slot; + let gossiped_finality_slot = light_client_finality_update.finalized_header.beacon.slot; let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0); let signature_slot = light_client_finality_update.signature_slot; let start_time = chain.slot_clock.start_of(signature_slot); @@ -88,7 +88,7 @@ impl VerifiedLightClientFinalityUpdate { .get_blinded_block(&finalized_block_root)? .ok_or(Error::FailedConstructingUpdate)?; let latest_seen_finality_update_slot = match latest_seen_finality_update.as_ref() { - Some(update) => update.finalized_header.slot, + Some(update) => update.finalized_header.beacon.slot, None => Slot::new(0), }; diff --git a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs index 8b7e6445273..374cc9a7753 100644 --- a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs +++ b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs @@ -71,7 +71,7 @@ impl VerifiedLightClientOptimisticUpdate { chain: &BeaconChain, seen_timestamp: Duration, ) -> Result { - let gossiped_optimistic_slot = light_client_optimistic_update.attested_header.slot; + let gossiped_optimistic_slot = light_client_optimistic_update.attested_header.beacon.slot; let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0); let signature_slot = light_client_optimistic_update.signature_slot; let start_time = chain.slot_clock.start_of(signature_slot); @@ -88,7 +88,7 @@ impl VerifiedLightClientOptimisticUpdate { .get_state(&attested_block.state_root(), Some(attested_block.slot()))? .ok_or(Error::FailedConstructingUpdate)?; let latest_seen_optimistic_update_slot = match latest_seen_optimistic_update.as_ref() { - Some(update) => update.attested_header.slot, + Some(update) => update.attested_header.beacon.slot, None => Slot::new(0), }; @@ -114,6 +114,7 @@ impl VerifiedLightClientOptimisticUpdate { // otherwise queue let canonical_root = light_client_optimistic_update .attested_header + .beacon .canonical_root(); if canonical_root != head_block.message().parent_root() { diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index f3e97168a5c..c04815ebc13 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -45,6 +45,9 @@ const MAX_ADVANCE_DISTANCE: u64 = 4; /// impact whilst having 8 epochs without a block is a comfortable grace period. const MAX_FORK_CHOICE_DISTANCE: u64 = 256; +/// Drop any unused block production state cache after this many slots. +const MAX_BLOCK_PRODUCTION_CACHE_DISTANCE: u64 = 4; + #[derive(Debug)] enum Error { BeaconChain(BeaconChainError), @@ -227,19 +230,73 @@ async fn state_advance_timer( // Prepare proposers so that the node can send payload attributes in the case where // it decides to abandon a proposer boost re-org. 
- if let Err(e) = beacon_chain.prepare_beacon_proposer(current_slot).await { - warn!( - log, - "Unable to prepare proposer with lookahead"; - "error" => ?e, - "slot" => next_slot, - ); - } + let proposer_head = beacon_chain + .prepare_beacon_proposer(current_slot) + .await + .unwrap_or_else(|e| { + warn!( + log, + "Unable to prepare proposer with lookahead"; + "error" => ?e, + "slot" => next_slot, + ); + None + }); // Use a blocking task to avoid blocking the core executor whilst waiting for locks // in `ForkChoiceSignalTx`. beacon_chain.task_executor.clone().spawn_blocking( move || { + // If we're proposing, clone the head state preemptively so that it isn't on + // the hot path of proposing. We can delete this once we have tree-states. + if let Some(proposer_head) = proposer_head { + let mut cache = beacon_chain.block_production_state.lock(); + + // Avoid holding two states in memory. It's OK to hold the lock because + // we always lock the block production cache before the snapshot cache + // and we prefer for block production to wait for the block production + // cache if a clone is in-progress. + if cache + .as_ref() + .map_or(false, |(cached_head, _)| *cached_head != proposer_head) + { + drop(cache.take()); + } + if let Some(proposer_state) = beacon_chain + .snapshot_cache + .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) + .and_then(|snapshot_cache| { + snapshot_cache.get_state_for_block_production(proposer_head) + }) + { + *cache = Some((proposer_head, proposer_state)); + debug!( + log, + "Cloned state ready for block production"; + "head_block_root" => ?proposer_head, + "slot" => next_slot + ); + } else { + warn!( + log, + "Block production state missing from snapshot cache"; + "head_block_root" => ?proposer_head, + "slot" => next_slot + ); + } + } else { + // If we aren't proposing, drop any old block production cache to save + // memory. + let mut cache = beacon_chain.block_production_state.lock(); + if let Some((_, state)) = &*cache { + if state.pre_state.slot() + MAX_BLOCK_PRODUCTION_CACHE_DISTANCE + <= current_slot + { + drop(cache.take()); + } + } + } + // Signal block proposal for the next slot (if it happens to be waiting). if let Some(tx) = &beacon_chain.fork_choice_signal_tx { if let Err(e) = tx.notify_fork_choice_complete(next_slot) { diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 3993e442ad7..07fdf6414c1 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -320,6 +320,7 @@ pub struct BuilderParams { pub chain_health: ChainHealth, } +#[derive(PartialEq)] pub enum ChainHealth { Healthy, Unhealthy(FailedCondition), @@ -327,7 +328,7 @@ pub enum ChainHealth { PreMerge, } -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum FailedCondition { Skips, SkipsPerEpoch, @@ -928,259 +929,99 @@ impl ExecutionLayer { } } - async fn determine_and_fetch_payload( + /// Fetches local and builder payloads concurrently. Logs and returns the results.
+ async fn fetch_builder_and_local_payloads( &self, + builder: &BuilderHttpClient, parent_hash: ExecutionBlockHash, + builder_params: &BuilderParams, payload_attributes: &PayloadAttributes, forkchoice_update_params: ForkchoiceUpdateParameters, - builder_params: BuilderParams, current_fork: ForkName, - spec: &ChainSpec, - ) -> Result>, Error> { - if let Some(builder) = self.builder() { - let slot = builder_params.slot; - let pubkey = builder_params.pubkey; - match builder_params.chain_health { - ChainHealth::Healthy => { - info!( - self.log(), - "Requesting blinded header from connected builder"; - "slot" => ?slot, - "pubkey" => ?pubkey, - "parent_hash" => ?parent_hash, - ); - - // Wait for the builder *and* local EL to produce a payload (or return an error). - let ((relay_result, relay_duration), (local_result_type, local_duration)) = tokio::join!( - timed_future(metrics::GET_BLINDED_PAYLOAD_BUILDER, async { - builder - .get_builder_header::(slot, parent_hash, &pubkey) - .await - }), - timed_future(metrics::GET_BLINDED_PAYLOAD_LOCAL, async { - self.get_full_payload_caching( - parent_hash, - payload_attributes, - forkchoice_update_params, - current_fork, - ) - .await - }) - ); - - let local_result = match local_result_type? { - GetPayloadResponseType::Full(payload) => Ok(payload), - GetPayloadResponseType::Blinded(_) => Err(Error::PayloadTypeMismatch), - }; - - info!( - self.log(), - "Requested blinded execution payload"; - "relay_fee_recipient" => match &relay_result { - Ok(Some(r)) => format!("{:?}", r.data.message.header().fee_recipient()), - Ok(None) => "empty response".to_string(), - Err(_) => "request failed".to_string(), - }, - "relay_response_ms" => relay_duration.as_millis(), - "local_fee_recipient" => match &local_result { - Ok(get_payload_response) => format!("{:?}", get_payload_response.fee_recipient()), - Err(_) => "request failed".to_string() - }, - "local_response_ms" => local_duration.as_millis(), - "parent_hash" => ?parent_hash, - ); - - return match (relay_result, local_result) { - (Err(e), Ok(local)) => { - warn!( - self.log(), - "Builder error when requesting payload"; - "info" => "falling back to local execution client", - "relay_error" => ?e, - "local_block_hash" => ?local.block_hash(), - "parent_hash" => ?parent_hash, - ); - Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full( - local.try_into()?, - ))) - } - (Ok(None), Ok(local)) => { - info!( - self.log(), - "Builder did not return a payload"; - "info" => "falling back to local execution client", - "local_block_hash" => ?local.block_hash(), - "parent_hash" => ?parent_hash, - ); - Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full( - local.try_into()?, - ))) - } - (Ok(Some(relay)), Ok(local)) => { - let header = &relay.data.message.header(); + ) -> ( + Result>>, builder_client::Error>, + Result, Error>, + ) { + let slot = builder_params.slot; + let pubkey = &builder_params.pubkey; - info!( - self.log(), - "Received local and builder payloads"; - "relay_block_hash" => ?header.block_hash(), - "local_block_hash" => ?local.block_hash(), - "parent_hash" => ?parent_hash, - ); + info!( + self.log(), + "Requesting blinded header from connected builder"; + "slot" => ?slot, + "pubkey" => ?pubkey, + "parent_hash" => ?parent_hash, + ); - let relay_value = relay.data.message.value(); - let local_value = *local.block_value(); - - if !self.inner.always_prefer_builder_payload { - if local_value >= *relay_value { - info!( - self.log(), - "Local block is more profitable than relay block"; - "local_block_value" => 
%local_value, - "relay_value" => %relay_value - ); - return Ok(ProvenancedPayload::Local( - BlockProposalContentsType::Full(local.try_into()?), - )); - } else if local.should_override_builder().unwrap_or(false) { - let percentage_difference = - percentage_difference_u256(local_value, *relay_value); - if percentage_difference.map_or(false, |percentage| { - percentage - < self - .inner - .ignore_builder_override_suggestion_threshold - }) { - info!( - self.log(), - "Using local payload because execution engine suggested we ignore builder payload"; - "local_block_value" => %local_value, - "relay_value" => %relay_value - ); - return Ok(ProvenancedPayload::Local( - BlockProposalContentsType::Full(local.try_into()?), - )); - } - } else { - info!( - self.log(), - "Relay block is more profitable than local block"; - "local_block_value" => %local_value, - "relay_value" => %relay_value - ); - } - } - - match verify_builder_bid( - &relay, - parent_hash, - payload_attributes, - Some(local.block_number()), - self.inner.builder_profit_threshold, - current_fork, - spec, - ) { - Ok(()) => Ok(ProvenancedPayload::try_from(relay.data.message)?), - Err(reason) if !reason.payload_invalid() => { - info!( - self.log(), - "Builder payload ignored"; - "info" => "using local payload", - "reason" => %reason, - "relay_block_hash" => ?header.block_hash(), - "parent_hash" => ?parent_hash, - ); - Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full( - local.try_into()?, - ))) - } - Err(reason) => { - metrics::inc_counter_vec( - &metrics::EXECUTION_LAYER_GET_PAYLOAD_BUILDER_REJECTIONS, - &[reason.as_ref().as_ref()], - ); - warn!( - self.log(), - "Builder returned invalid payload"; - "info" => "using local payload", - "reason" => %reason, - "relay_block_hash" => ?header.block_hash(), - "parent_hash" => ?parent_hash, - ); - Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full( - local.try_into()?, - ))) - } - } - } - (Ok(Some(relay)), Err(local_error)) => { - let header = &relay.data.message.header(); + // Wait for the builder *and* local EL to produce a payload (or return an error). 
+ let ((relay_result, relay_duration), (local_result, local_duration)) = tokio::join!( + timed_future(metrics::GET_BLINDED_PAYLOAD_BUILDER, async { + builder + .get_builder_header::(slot, parent_hash, pubkey) + .await + }), + timed_future(metrics::GET_BLINDED_PAYLOAD_LOCAL, async { + self.get_full_payload_caching( + parent_hash, + payload_attributes, + forkchoice_update_params, + current_fork, + ) + .await + .and_then(|local_result_type| match local_result_type { + GetPayloadResponseType::Full(payload) => Ok(payload), + GetPayloadResponseType::Blinded(_) => Err(Error::PayloadTypeMismatch), + }) + }) + ); - info!( - self.log(), - "Received builder payload with local error"; - "relay_block_hash" => ?header.block_hash(), - "local_error" => ?local_error, - "parent_hash" => ?parent_hash, - ); + info!( + self.log(), + "Requested blinded execution payload"; + "relay_fee_recipient" => match &relay_result { + Ok(Some(r)) => format!("{:?}", r.data.message.header().fee_recipient()), + Ok(None) => "empty response".to_string(), + Err(_) => "request failed".to_string(), + }, + "relay_response_ms" => relay_duration.as_millis(), + "local_fee_recipient" => match &local_result { + Ok(get_payload_response) => format!("{:?}", get_payload_response.fee_recipient()), + Err(_) => "request failed".to_string() + }, + "local_response_ms" => local_duration.as_millis(), + "parent_hash" => ?parent_hash, + ); - match verify_builder_bid( - &relay, - parent_hash, - payload_attributes, - None, - self.inner.builder_profit_threshold, - current_fork, - spec, - ) { - Ok(()) => Ok(ProvenancedPayload::try_from(relay.data.message)?), - // If the payload is valid then use it. The local EE failed - // to produce a payload so we have no alternative. - Err(e) if !e.payload_invalid() => { - Ok(ProvenancedPayload::try_from(relay.data.message)?) - } - Err(reason) => { - metrics::inc_counter_vec( - &metrics::EXECUTION_LAYER_GET_PAYLOAD_BUILDER_REJECTIONS, - &[reason.as_ref().as_ref()], - ); - crit!( - self.log(), - "Builder returned invalid payload"; - "info" => "no local payload either - unable to propose block", - "reason" => %reason, - "relay_block_hash" => ?header.block_hash(), - "parent_hash" => ?parent_hash, - ); - Err(Error::CannotProduceHeader) - } - } - } - (Err(relay_error), Err(local_error)) => { - crit!( - self.log(), - "Unable to produce execution payload"; - "info" => "the local EL and builder both failed - unable to propose block", - "relay_error" => ?relay_error, - "local_error" => ?local_error, - "parent_hash" => ?parent_hash, - ); + (relay_result, local_result) + } - Err(Error::CannotProduceHeader) - } - (Ok(None), Err(local_error)) => { - crit!( - self.log(), - "Unable to produce execution payload"; - "info" => "the local EL failed and the builder returned nothing - \ - the block proposal will be missed", - "local_error" => ?local_error, - "parent_hash" => ?parent_hash, - ); + async fn determine_and_fetch_payload( + &self, + parent_hash: ExecutionBlockHash, + payload_attributes: &PayloadAttributes, + forkchoice_update_params: ForkchoiceUpdateParameters, + builder_params: BuilderParams, + current_fork: ForkName, + spec: &ChainSpec, + ) -> Result>, Error> { + let Some(builder) = self.builder() else { + // no builder.. 
return local payload + return self + .get_full_payload_caching( + parent_hash, + payload_attributes, + forkchoice_update_params, + current_fork, + ) + .await + .and_then(GetPayloadResponseType::try_into) + .map(ProvenancedPayload::Local); + }; - Err(Error::CannotProduceHeader) - } - }; - } + // check chain health + if builder_params.chain_health != ChainHealth::Healthy { + // chain is unhealthy, gotta use local payload + match builder_params.chain_health { ChainHealth::Unhealthy(condition) => info!( self.log(), "Chain is unhealthy, using local payload"; @@ -1196,17 +1037,220 @@ impl ExecutionLayer { "info" => "the local execution engine is syncing and the builder network \ cannot safely be used - unable to propose block" ), + ChainHealth::Healthy => crit!( + self.log(), + "got healthy but also not healthy.. this shouldn't happen!" + ), + } + return self + .get_full_payload_caching( + parent_hash, + payload_attributes, + forkchoice_update_params, + current_fork, + ) + .await + .and_then(GetPayloadResponseType::try_into) + .map(ProvenancedPayload::Local); + } + + let (relay_result, local_result) = self + .fetch_builder_and_local_payloads( + builder.as_ref(), + parent_hash, + &builder_params, + payload_attributes, + forkchoice_update_params, + current_fork, + ) + .await; + + match (relay_result, local_result) { + (Err(e), Ok(local)) => { + warn!( + self.log(), + "Builder error when requesting payload"; + "info" => "falling back to local execution client", + "relay_error" => ?e, + "local_block_hash" => ?local.block_hash(), + "parent_hash" => ?parent_hash, + ); + Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full( + local.try_into()?, + ))) + } + (Ok(None), Ok(local)) => { + info!( + self.log(), + "Builder did not return a payload"; + "info" => "falling back to local execution client", + "local_block_hash" => ?local.block_hash(), + "parent_hash" => ?parent_hash, + ); + Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full( + local.try_into()?, + ))) + } + (Err(relay_error), Err(local_error)) => { + crit!( + self.log(), + "Unable to produce execution payload"; + "info" => "the local EL and builder both failed - unable to propose block", + "relay_error" => ?relay_error, + "local_error" => ?local_error, + "parent_hash" => ?parent_hash, + ); + + Err(Error::CannotProduceHeader) + } + (Ok(None), Err(local_error)) => { + crit!( + self.log(), + "Unable to produce execution payload"; + "info" => "the local EL failed and the builder returned nothing - \ + the block proposal will be missed", + "local_error" => ?local_error, + "parent_hash" => ?parent_hash, + ); + + Err(Error::CannotProduceHeader) + } + (Ok(Some(relay)), Ok(local)) => { + let header = &relay.data.message.header(); + + info!( + self.log(), + "Received local and builder payloads"; + "relay_block_hash" => ?header.block_hash(), + "local_block_hash" => ?local.block_hash(), + "parent_hash" => ?parent_hash, + ); + + // check relay payload validity + if let Err(reason) = verify_builder_bid( + &relay, + parent_hash, + payload_attributes, + Some(local.block_number()), + current_fork, + spec, + ) { + // relay payload invalid -> return local + metrics::inc_counter_vec( + &metrics::EXECUTION_LAYER_GET_PAYLOAD_BUILDER_REJECTIONS, + &[reason.as_ref().as_ref()], + ); + warn!( + self.log(), + "Builder returned invalid payload"; + "info" => "using local payload", + "reason" => %reason, + "relay_block_hash" => ?header.block_hash(), + "parent_hash" => ?parent_hash, + ); + return Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full( + 
local.try_into()?, + ))); + } + + if self.inner.always_prefer_builder_payload { + return ProvenancedPayload::try_from(relay.data.message); + } + + let relay_value = *relay.data.message.value(); + let local_value = *local.block_value(); + + if local_value >= relay_value { + info!( + self.log(), + "Local block is more profitable than relay block"; + "local_block_value" => %local_value, + "relay_value" => %relay_value + ); + return Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full( + local.try_into()?, + ))); + } + + if relay_value < self.inner.builder_profit_threshold { + info!( + self.log(), + "Builder payload ignored"; + "info" => "using local payload", + "reason" => format!("payload value of {} does not meet user-configured profit-threshold of {}", relay_value, self.inner.builder_profit_threshold), + "relay_block_hash" => ?header.block_hash(), + "parent_hash" => ?parent_hash, + ); + return Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full( + local.try_into()?, + ))); + } + + if local.should_override_builder().unwrap_or(false) { + let percentage_difference = + percentage_difference_u256(local_value, relay_value); + if percentage_difference.map_or(false, |percentage| { + percentage < self.inner.ignore_builder_override_suggestion_threshold + }) { + info!( + self.log(), + "Using local payload because execution engine suggested we ignore builder payload"; + "local_block_value" => %local_value, + "relay_value" => %relay_value + ); + return Ok(ProvenancedPayload::Local(BlockProposalContentsType::Full( + local.try_into()?, + ))); + } + } + + info!( + self.log(), + "Relay block is more profitable than local block"; + "local_block_value" => %local_value, + "relay_value" => %relay_value + ); + + Ok(ProvenancedPayload::try_from(relay.data.message)?) + } + (Ok(Some(relay)), Err(local_error)) => { + let header = &relay.data.message.header(); + + info!( + self.log(), + "Received builder payload with local error"; + "relay_block_hash" => ?header.block_hash(), + "local_error" => ?local_error, + "parent_hash" => ?parent_hash, + ); + + match verify_builder_bid( + &relay, + parent_hash, + payload_attributes, + None, + current_fork, + spec, + ) { + Ok(()) => Ok(ProvenancedPayload::try_from(relay.data.message)?), + Err(reason) => { + metrics::inc_counter_vec( + &metrics::EXECUTION_LAYER_GET_PAYLOAD_BUILDER_REJECTIONS, + &[reason.as_ref().as_ref()], + ); + crit!( + self.log(), + "Builder returned invalid payload"; + "info" => "no local payload either - unable to propose block", + "reason" => %reason, + "relay_block_hash" => ?header.block_hash(), + "parent_hash" => ?parent_hash, + ); + Err(Error::CannotProduceHeader) + } + } } } - self.get_full_payload_caching( - parent_hash, - payload_attributes, - forkchoice_update_params, - current_fork, - ) - .await - .and_then(GetPayloadResponseType::try_into) - .map(ProvenancedPayload::Local) } /// Get a full payload and cache its result in the execution layer's payload cache. @@ -2027,10 +2071,6 @@ impl ExecutionLayer { #[derive(AsRefStr)] #[strum(serialize_all = "snake_case")] enum InvalidBuilderPayload { - LowValue { - profit_threshold: Uint256, - payload_value: Uint256, - }, ParentHash { payload: ExecutionBlockHash, expected: ExecutionBlockHash, @@ -2061,34 +2101,9 @@ enum InvalidBuilderPayload { }, } -impl InvalidBuilderPayload { - /// Returns `true` if a payload is objectively invalid and should never be included on chain. 
- fn payload_invalid(&self) -> bool { - match self { - // A low-value payload isn't invalid, it should just be avoided if possible. - InvalidBuilderPayload::LowValue { .. } => false, - InvalidBuilderPayload::ParentHash { .. } => true, - InvalidBuilderPayload::PrevRandao { .. } => true, - InvalidBuilderPayload::Timestamp { .. } => true, - InvalidBuilderPayload::BlockNumber { .. } => true, - InvalidBuilderPayload::Fork { .. } => true, - InvalidBuilderPayload::Signature { .. } => true, - InvalidBuilderPayload::WithdrawalsRoot { .. } => true, - } - } -} - impl fmt::Display for InvalidBuilderPayload { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - InvalidBuilderPayload::LowValue { - profit_threshold, - payload_value, - } => write!( - f, - "payload value of {} does not meet user-configured profit-threshold of {}", - payload_value, profit_threshold - ), InvalidBuilderPayload::ParentHash { payload, expected } => { write!(f, "payload block hash was {} not {}", payload, expected) } @@ -2132,13 +2147,11 @@ fn verify_builder_bid( parent_hash: ExecutionBlockHash, payload_attributes: &PayloadAttributes, block_number: Option, - profit_threshold: Uint256, current_fork: ForkName, spec: &ChainSpec, ) -> Result<(), Box> { let is_signature_valid = bid.data.verify_signature(spec); let header = &bid.data.message.header(); - let payload_value = bid.data.message.value(); // Avoid logging values that we can't represent with our Prometheus library. let payload_value_gwei = bid.data.message.value() / 1_000_000_000; @@ -2157,12 +2170,7 @@ fn verify_builder_bid( .map(|withdrawals| Withdrawals::::from(withdrawals).tree_hash_root()); let payload_withdrawals_root = header.withdrawals_root().ok().copied(); - if *payload_value < profit_threshold { - Err(Box::new(InvalidBuilderPayload::LowValue { - profit_threshold, - payload_value: *payload_value, - })) - } else if header.parent_hash() != parent_hash { + if header.parent_hash() != parent_hash { Err(Box::new(InvalidBuilderPayload::ParentHash { payload: header.parent_hash(), expected: parent_hash, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index a6f9d9ffcec..5b00a80bdf0 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -77,9 +77,10 @@ use tokio_stream::{ use types::{ Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, - ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData, - SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, + ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, + SignedAggregateAndProof, SignedBlsToExecutionChange, SignedContributionAndProof, + SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, + SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ @@ -143,6 +144,7 @@ pub struct Config { pub enable_beacon_processor: bool, #[serde(with = "eth2::types::serde_status_code")] pub duplicate_block_status_code: StatusCode, + pub enable_light_client_server: bool, } impl Default for Config { @@ -159,6 +161,7 @@ impl Default for Config { sse_capacity_multiplier: 1, enable_beacon_processor: true, duplicate_block_status_code: StatusCode::ACCEPTED, + enable_light_client_server: false, } } } @@ -279,6 +282,18 @@ pub fn 
prometheus_metrics() -> warp::filters::log::Log impl Filter + Clone { + warp::any() + .and_then(move || async move { + if is_enabled { + Ok(()) + } else { + Err(warp::reject::not_found()) + } + }) + .untuple_one() +} + /// Creates a server that will serve requests using information from `ctx`. /// /// The server will shut down gracefully when the `shutdown` future resolves. @@ -2379,6 +2394,164 @@ pub fn serve( }, ); + /* + * beacon/light_client + */ + + let beacon_light_client_path = eth_v1 + .and(warp::path("beacon")) + .and(warp::path("light_client")) + .and(chain_filter.clone()); + + // GET beacon/light_client/bootstrap/{block_root} + let get_beacon_light_client_bootstrap = beacon_light_client_path + .clone() + .and(task_spawner_filter.clone()) + .and(warp::path("bootstrap")) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid block root value".to_string(), + )) + })) + .and(warp::path::end()) + .and(warp::header::optional::("accept")) + .then( + |chain: Arc>, + task_spawner: TaskSpawner, + block_root: Hash256, + accept_header: Option| { + task_spawner.blocking_response_task(Priority::P1, move || { + let (bootstrap, fork_name) = match chain.get_light_client_bootstrap(&block_root) + { + Ok(Some(res)) => res, + Ok(None) => { + return Err(warp_utils::reject::custom_not_found( + "Light client bootstrap unavailable".to_string(), + )); + } + Err(e) => { + return Err(warp_utils::reject::custom_server_error(format!( + "Unable to obtain LightClientBootstrap instance: {e:?}" + ))); + } + }; + + match accept_header { + Some(api_types::Accept::Ssz) => Response::builder() + .status(200) + .header("Content-Type", "application/octet-stream") + .body(bootstrap.as_ssz_bytes().into()) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "failed to create response: {}", + e + )) + }), + _ => Ok(warp::reply::json(&ForkVersionedResponse { + version: Some(fork_name), + data: bootstrap, + }) + .into_response()), + } + .map(|resp| add_consensus_version_header(resp, fork_name)) + }) + }, + ); + + // GET beacon/light_client/optimistic_update + let get_beacon_light_client_optimistic_update = beacon_light_client_path + .clone() + .and(task_spawner_filter.clone()) + .and(warp::path("optimistic_update")) + .and(warp::path::end()) + .and(warp::header::optional::("accept")) + .then( + |chain: Arc>, + task_spawner: TaskSpawner, + accept_header: Option| { + task_spawner.blocking_response_task(Priority::P1, move || { + let update = chain + .latest_seen_optimistic_update + .lock() + .clone() + .ok_or_else(|| { + warp_utils::reject::custom_not_found( + "No LightClientOptimisticUpdate is available".to_string(), + ) + })?; + + let fork_name = chain + .spec + .fork_name_at_slot::(update.signature_slot); + match accept_header { + Some(api_types::Accept::Ssz) => Response::builder() + .status(200) + .header("Content-Type", "application/octet-stream") + .body(update.as_ssz_bytes().into()) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "failed to create response: {}", + e + )) + }), + _ => Ok(warp::reply::json(&ForkVersionedResponse { + version: Some(fork_name), + data: update, + }) + .into_response()), + } + .map(|resp| add_consensus_version_header(resp, fork_name)) + }) + }, + ); + + // GET beacon/light_client/finality_update + let get_beacon_light_client_finality_update = beacon_light_client_path + .clone() + .and(task_spawner_filter.clone()) + .and(warp::path("finality_update")) + .and(warp::path::end()) + 
.and(warp::header::optional::("accept")) + .then( + |chain: Arc>, + task_spawner: TaskSpawner, + accept_header: Option| { + task_spawner.blocking_response_task(Priority::P1, move || { + let update = chain + .latest_seen_finality_update + .lock() + .clone() + .ok_or_else(|| { + warp_utils::reject::custom_not_found( + "No LightClientFinalityUpdate is available".to_string(), + ) + })?; + + let fork_name = chain + .spec + .fork_name_at_slot::(update.signature_slot); + match accept_header { + Some(api_types::Accept::Ssz) => Response::builder() + .status(200) + .header("Content-Type", "application/octet-stream") + .body(update.as_ssz_bytes().into()) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "failed to create response: {}", + e + )) + }), + _ => Ok(warp::reply::json(&ForkVersionedResponse { + version: Some(fork_name), + data: update, + }) + .into_response()), + } + .map(|resp| add_consensus_version_header(resp, fork_name)) + }) + }, + ); + /* * beacon/rewards */ @@ -4339,6 +4512,12 @@ pub fn serve( api_types::EventTopic::LateHead => { event_handler.subscribe_late_head() } + api_types::EventTopic::LightClientFinalityUpdate => { + event_handler.subscribe_light_client_finality_update() + } + api_types::EventTopic::LightClientOptimisticUpdate => { + event_handler.subscribe_light_client_optimistic_update() + } api_types::EventTopic::BlockReward => { event_handler.subscribe_block_reward() } @@ -4492,6 +4671,18 @@ pub fn serve( .uor(get_lighthouse_database_info) .uor(get_lighthouse_block_rewards) .uor(get_lighthouse_attestation_performance) + .uor( + enable(ctx.config.enable_light_client_server) + .and(get_beacon_light_client_optimistic_update), + ) + .uor( + enable(ctx.config.enable_light_client_server) + .and(get_beacon_light_client_finality_update), + ) + .uor( + enable(ctx.config.enable_light_client_server) + .and(get_beacon_light_client_bootstrap), + ) .uor(get_lighthouse_block_packing_efficiency) .uor(get_lighthouse_merge_readiness) .uor(get_events) diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index fe47e56dc57..bafb5738194 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -209,6 +209,7 @@ pub async fn create_api_server( enabled: true, listen_port: port, data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR), + enable_light_client_server: true, ..Config::default() }, chain: Some(chain), diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index c0aa4fe58b7..d5fa50ba219 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1646,6 +1646,59 @@ impl ApiTester { self } + pub async fn test_get_beacon_light_client_bootstrap(self) -> Self { + let block_id = BlockId(CoreBlockId::Finalized); + let (block_root, _, _) = block_id.root(&self.chain).unwrap(); + let (block, _, _) = block_id.full_block(&self.chain).await.unwrap(); + + let result = match self + .client + .get_light_client_bootstrap::(block_root) + .await + { + Ok(result) => result.unwrap().data, + Err(e) => panic!("query failed incorrectly: {e:?}"), + }; + + let expected = block.slot(); + assert_eq!(result.header.beacon.slot, expected); + + self + } + + pub async fn test_get_beacon_light_client_optimistic_update(self) -> Self { + // get_beacon_light_client_optimistic_update returns Ok(None) on 404 NOT FOUND + let result = match self + .client + .get_beacon_light_client_optimistic_update::() + .await + { + Ok(result) => result.map(|res| res.data), + 
Err(e) => panic!("query failed incorrectly: {e:?}"), + }; + + let expected = self.chain.latest_seen_optimistic_update.lock().clone(); + assert_eq!(result, expected); + + self + } + + pub async fn test_get_beacon_light_client_finality_update(self) -> Self { + let result = match self + .client + .get_beacon_light_client_finality_update::() + .await + { + Ok(result) => result.map(|res| res.data), + Err(e) => panic!("query failed incorrectly: {e:?}"), + }; + + let expected = self.chain.latest_seen_finality_update.lock().clone(); + assert_eq!(result, expected); + + self + } + pub async fn test_get_beacon_pool_attestations(self) -> Self { let result = self .client @@ -3376,7 +3429,7 @@ impl ApiTester { let result = self .client - .post_validator_liveness_epoch(epoch, indices.clone()) + .post_validator_liveness_epoch(epoch, &indices) .await .unwrap() .data; @@ -3391,7 +3444,7 @@ impl ApiTester { let result = self .client - .post_validator_liveness_epoch(epoch, indices.clone()) + .post_validator_liveness_epoch(epoch, &indices) .await .unwrap() .data; @@ -5701,6 +5754,42 @@ async fn node_get() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_light_client_bootstrap() { + let config = ApiTesterConfig { + spec: ForkName::Altair.make_genesis_spec(E::default_spec()), + ..<_>::default() + }; + ApiTester::new_from_config(config) + .await + .test_get_beacon_light_client_bootstrap() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_light_client_optimistic_update() { + let config = ApiTesterConfig { + spec: ForkName::Altair.make_genesis_spec(E::default_spec()), + ..<_>::default() + }; + ApiTester::new_from_config(config) + .await + .test_get_beacon_light_client_optimistic_update() + .await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_light_client_finality_update() { + let config = ApiTesterConfig { + spec: ForkName::Altair.make_genesis_spec(E::default_spec()), + ..<_>::default() + }; + ApiTester::new_from_config(config) + .await + .test_get_beacon_light_client_finality_update() + .await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn get_validator_duties_early() { ApiTester::new() diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 6e83cfc86de..787c3dcb7a5 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -15,11 +15,10 @@ use std::io::{Read, Write}; use std::marker::PhantomData; use std::sync::Arc; use tokio_util::codec::{Decoder, Encoder}; -use types::{light_client_bootstrap::LightClientBootstrap, BlobSidecar}; use types::{ - EthSpec, ForkContext, ForkName, Hash256, SignedBeaconBlock, SignedBeaconBlockAltair, - SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockDeneb, - SignedBeaconBlockMerge, + BlobSidecar, EthSpec, ForkContext, ForkName, Hash256, LightClientBootstrap, SignedBeaconBlock, + SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockCapella, + SignedBeaconBlockDeneb, SignedBeaconBlockMerge, }; use unsigned_varint::codec::Uvi; diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 438c2c75447..627c871c471 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -17,8 +17,8 @@ use superstruct::superstruct; use 
types::blob_sidecar::BlobIdentifier; use types::consts::deneb::MAX_BLOBS_PER_BLOCK; use types::{ - blob_sidecar::BlobSidecar, light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec, - Hash256, SignedBeaconBlock, Slot, + blob_sidecar::BlobSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap, SignedBeaconBlock, + Slot, }; /// Maximum number of blocks in a single request. @@ -571,7 +571,11 @@ impl std::fmt::Display for RPCResponse { RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data), RPCResponse::MetaData(metadata) => write!(f, "Metadata: {}", metadata.seq_number()), RPCResponse::LightClientBootstrap(bootstrap) => { - write!(f, "LightClientBootstrap Slot: {}", bootstrap.header.slot) + write!( + f, + "LightClientBootstrap Slot: {}", + bootstrap.header.beacon.slot + ) } } } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 6ede0666a3a..96c9d283327 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -1,8 +1,7 @@ use std::sync::Arc; use libp2p::swarm::ConnectionId; -use types::light_client_bootstrap::LightClientBootstrap; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use types::{BlobSidecar, EthSpec, LightClientBootstrap, SignedBeaconBlock}; use crate::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; use crate::rpc::{ diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index f2ca3428097..430e0571b7e 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -18,9 +18,7 @@ use std::sync::Arc; use task_executor::TaskExecutor; use tokio_stream::StreamExt; use types::blob_sidecar::BlobIdentifier; -use types::{ - light_client_bootstrap::LightClientBootstrap, Epoch, EthSpec, ForkName, Hash256, Slot, -}; +use types::{Epoch, EthSpec, ForkName, Hash256, Slot}; impl NetworkBeaconProcessor { /* Auxiliary functions */ @@ -304,66 +302,32 @@ impl NetworkBeaconProcessor { request: LightClientBootstrapRequest, ) { let block_root = request.root; - let state_root = match self.chain.get_blinded_block(&block_root) { - Ok(signed_block) => match signed_block { - Some(signed_block) => signed_block.state_root(), - None => { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not available".into(), - request_id, - ); - return; - } - }, - Err(_) => { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not available".into(), - request_id, - ); - return; - } - }; - let mut beacon_state = match self.chain.get_state(&state_root, None) { - Ok(beacon_state) => match beacon_state { - Some(state) => state, - None => { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not available".into(), - request_id, - ); - return; - } - }, - Err(_) => { + match self.chain.get_light_client_bootstrap(&block_root) { + Ok(Some((bootstrap, _))) => self.send_response( + peer_id, + Response::LightClientBootstrap(bootstrap), + request_id, + ), + Ok(None) => self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Bootstrap not available".into(), + request_id, + ), + Err(e) => { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, "Bootstrap not available".into(), request_id, ); - return; + 
error!(self.log, "Error getting LightClientBootstrap instance"; + "block_root" => ?block_root, + "peer" => %peer_id, + "error" => ?e + ) } }; - let Ok(bootstrap) = LightClientBootstrap::from_beacon_state(&mut beacon_state) else { - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not available".into(), - request_id, - ); - return; - }; - self.send_response( - peer_id, - Response::LightClientBootstrap(bootstrap), - request_id, - ) } /// Handle a `BlocksByRange` request from the peer. diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 609626ae88a..9ceab47f336 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -151,6 +151,9 @@ pub fn get_config( client_config.http_api.duplicate_block_status_code = parse_required(cli_args, "http-duplicate-block-status")?; + + client_config.http_api.enable_light_client_server = + cli_args.is_present("light-client-server"); } if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? { diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 043c0f197e5..da5c1a5a122 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -667,6 +667,59 @@ impl BeaconNodeHttpClient { self.get_opt(path).await } + /// `GET beacon/light_client/bootstrap` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_light_client_bootstrap( + &self, + block_root: Hash256, + ) -> Result>>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("light_client") + .push("bootstrap") + .push(&format!("{:?}", block_root)); + + self.get_opt(path).await + } + + /// `GET beacon/light_client/optimistic_update` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_beacon_light_client_optimistic_update( + &self, + ) -> Result>>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("light_client") + .push("optimistic_update"); + + self.get_opt(path).await + } + + /// `GET beacon/light_client/finality_update` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_beacon_light_client_finality_update( + &self, + ) -> Result>>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("light_client") + .push("finality_update"); + + self.get_opt(path).await + } + /// `GET beacon/headers?slot,parent_root` /// /// Returns `Ok(None)` on a 404 error. 
@@ -2092,7 +2145,7 @@ impl BeaconNodeHttpClient { pub async fn post_validator_liveness_epoch( &self, epoch: Epoch, - indices: Vec, + indices: &Vec, ) -> Result>, Error> { let mut path = self.eth_path(V1)?; @@ -2102,7 +2155,7 @@ impl BeaconNodeHttpClient { .push("liveness") .push(&epoch.to_string()); - self.post_with_timeout_and_response(path, &indices, self.timeouts.liveness) + self.post_with_timeout_and_response(path, indices, self.timeouts.liveness) .await } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index c2d85a31d31..dea8b2bf568 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1047,6 +1047,8 @@ pub enum EventKind { ChainReorg(SseChainReorg), ContributionAndProof(Box>), LateHead(SseLateHead), + LightClientFinalityUpdate(Box>), + LightClientOptimisticUpdate(Box>), #[cfg(feature = "lighthouse")] BlockReward(BlockReward), PayloadAttributes(VersionedSsePayloadAttributes), @@ -1065,6 +1067,8 @@ impl EventKind { EventKind::ContributionAndProof(_) => "contribution_and_proof", EventKind::PayloadAttributes(_) => "payload_attributes", EventKind::LateHead(_) => "late_head", + EventKind::LightClientFinalityUpdate(_) => "light_client_finality_update", + EventKind::LightClientOptimisticUpdate(_) => "light_client_optimistic_update", #[cfg(feature = "lighthouse")] EventKind::BlockReward(_) => "block_reward", } @@ -1127,6 +1131,22 @@ impl EventKind { ServerError::InvalidServerSentEvent(format!("Payload Attributes: {:?}", e)) })?, )), + "light_client_finality_update" => Ok(EventKind::LightClientFinalityUpdate( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!( + "Light Client Finality Update: {:?}", + e + )) + })?, + )), + "light_client_optimistic_update" => Ok(EventKind::LightClientOptimisticUpdate( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!( + "Light Client Optimistic Update: {:?}", + e + )) + })?, + )), #[cfg(feature = "lighthouse")] "block_reward" => Ok(EventKind::BlockReward(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block Reward: {:?}", e)), @@ -1158,6 +1178,8 @@ pub enum EventTopic { ContributionAndProof, LateHead, PayloadAttributes, + LightClientFinalityUpdate, + LightClientOptimisticUpdate, #[cfg(feature = "lighthouse")] BlockReward, } @@ -1177,6 +1199,8 @@ impl FromStr for EventTopic { "contribution_and_proof" => Ok(EventTopic::ContributionAndProof), "payload_attributes" => Ok(EventTopic::PayloadAttributes), "late_head" => Ok(EventTopic::LateHead), + "light_client_finality_update" => Ok(EventTopic::LightClientFinalityUpdate), + "light_client_optimistic_update" => Ok(EventTopic::LightClientOptimisticUpdate), #[cfg(feature = "lighthouse")] "block_reward" => Ok(EventTopic::BlockReward), _ => Err("event topic cannot be parsed.".to_string()), @@ -1197,6 +1221,8 @@ impl fmt::Display for EventTopic { EventTopic::ContributionAndProof => write!(f, "contribution_and_proof"), EventTopic::PayloadAttributes => write!(f, "payload_attributes"), EventTopic::LateHead => write!(f, "late_head"), + EventTopic::LightClientFinalityUpdate => write!(f, "light_client_finality_update"), + EventTopic::LightClientOptimisticUpdate => write!(f, "light_client_optimistic_update"), #[cfg(feature = "lighthouse")] EventTopic::BlockReward => write!(f, "block_reward"), } diff --git a/common/eth2_network_config/built_in_network_configs/holesky/config.yaml b/common/eth2_network_config/built_in_network_configs/holesky/config.yaml index 9f4faeb27a4..d699b9a39f8 
100644 --- a/common/eth2_network_config/built_in_network_configs/holesky/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/holesky/config.yaml @@ -111,5 +111,3 @@ MAX_REQUEST_BLOB_SIDECARS: 768 MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: 4096 # `6` BLOB_SIDECAR_SUBNET_COUNT: 6 -# `uint64(6)` -MAX_BLOBS_PER_BLOCK: 6 diff --git a/consensus/state_processing/src/upgrade/altair.rs b/consensus/state_processing/src/upgrade/altair.rs index 26b1192bc16..5bb4f0bd592 100644 --- a/consensus/state_processing/src/upgrade/altair.rs +++ b/consensus/state_processing/src/upgrade/altair.rs @@ -54,7 +54,7 @@ pub fn upgrade_to_altair( VariableList::new(vec![ParticipationFlags::default(); pre.validators.len()])?; let inactivity_scores = VariableList::new(vec![0; pre.validators.len()])?; - let temp_sync_committee = Arc::new(SyncCommittee::temporary()?); + let temp_sync_committee = Arc::new(SyncCommittee::temporary()); // Where possible, use something like `mem::take` to move fields from behind the &mut // reference. For other fields that don't have a good default value, use `clone`. diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 26541a188c6..0f284bde9d2 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -99,6 +99,7 @@ pub mod slot_data; pub mod sqlite; pub mod blob_sidecar; +pub mod light_client_header; pub mod sidecar; pub mod signed_blob; @@ -154,8 +155,11 @@ pub use crate::fork_versioned_response::{ForkVersionDeserialize, ForkVersionedRe pub use crate::graffiti::{Graffiti, GRAFFITI_BYTES_LEN}; pub use crate::historical_batch::HistoricalBatch; pub use crate::indexed_attestation::IndexedAttestation; +pub use crate::light_client_bootstrap::LightClientBootstrap; pub use crate::light_client_finality_update::LightClientFinalityUpdate; +pub use crate::light_client_header::LightClientHeader; pub use crate::light_client_optimistic_update::LightClientOptimisticUpdate; +pub use crate::light_client_update::{Error as LightClientError, LightClientUpdate}; pub use crate::participation_flags::ParticipationFlags; pub use crate::participation_list::ParticipationList; pub use crate::payload::{ diff --git a/consensus/types/src/light_client_bootstrap.rs b/consensus/types/src/light_client_bootstrap.rs index 1d70456d732..616aced483a 100644 --- a/consensus/types/src/light_client_bootstrap.rs +++ b/consensus/types/src/light_client_bootstrap.rs @@ -1,10 +1,13 @@ -use super::{BeaconBlockHeader, BeaconState, EthSpec, FixedVector, Hash256, SyncCommittee}; -use crate::{light_client_update::*, test_utils::TestRandom}; -use serde::{Deserialize, Serialize}; +use super::{BeaconState, EthSpec, FixedVector, Hash256, SyncCommittee}; +use crate::{ + light_client_update::*, test_utils::TestRandom, ForkName, ForkVersionDeserialize, + LightClientHeader, +}; +use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::Value; use ssz_derive::{Decode, Encode}; use std::sync::Arc; use test_random_derive::TestRandom; -use tree_hash::TreeHash; /// A LightClientBootstrap is the initializer we send over to lightclient nodes /// that are trying to generate their basic storage when booting up. @@ -22,8 +25,8 @@ use tree_hash::TreeHash; #[serde(bound = "T: EthSpec")] #[arbitrary(bound = "T: EthSpec")] pub struct LightClientBootstrap { - /// Requested beacon block header. - pub header: BeaconBlockHeader, + /// The requested beacon block header. + pub header: LightClientHeader, /// The `SyncCommittee` used in the requested period. 
pub current_sync_committee: Arc>, /// Merkle proof for sync committee @@ -33,17 +36,37 @@ pub struct LightClientBootstrap { impl LightClientBootstrap { pub fn from_beacon_state(beacon_state: &mut BeaconState) -> Result { let mut header = beacon_state.latest_block_header().clone(); - header.state_root = beacon_state.tree_hash_root(); + header.state_root = beacon_state.update_tree_hash_cache()?; let current_sync_committee_branch = beacon_state.compute_merkle_proof(CURRENT_SYNC_COMMITTEE_INDEX)?; Ok(LightClientBootstrap { - header, + header: header.into(), current_sync_committee: beacon_state.current_sync_committee()?.clone(), current_sync_committee_branch: FixedVector::new(current_sync_committee_branch)?, }) } } +impl ForkVersionDeserialize for LightClientBootstrap { + fn deserialize_by_fork<'de, D: Deserializer<'de>>( + value: Value, + fork_name: ForkName, + ) -> Result { + match fork_name { + ForkName::Altair | ForkName::Merge => { + Ok(serde_json::from_value::>(value) + .map_err(serde::de::Error::custom))? + } + ForkName::Base | ForkName::Capella | ForkName::Deneb => { + Err(serde::de::Error::custom(format!( + "LightClientBootstrap failed to deserialize: unsupported fork '{}'", + fork_name + ))) + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/light_client_finality_update.rs b/consensus/types/src/light_client_finality_update.rs index 30f337fb2bb..87601b81565 100644 --- a/consensus/types/src/light_client_finality_update.rs +++ b/consensus/types/src/light_client_finality_update.rs @@ -1,9 +1,12 @@ use super::{ - BeaconBlockHeader, EthSpec, FixedVector, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, - Slot, SyncAggregate, + EthSpec, FixedVector, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, Slot, SyncAggregate, }; -use crate::{light_client_update::*, test_utils::TestRandom, BeaconState, ChainSpec}; -use serde::{Deserialize, Serialize}; +use crate::{ + light_client_update::*, test_utils::TestRandom, BeaconState, ChainSpec, ForkName, + ForkVersionDeserialize, LightClientHeader, +}; +use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::Value; use ssz_derive::{Decode, Encode}; use test_random_derive::TestRandom; use tree_hash::TreeHash; @@ -25,9 +28,9 @@ use tree_hash::TreeHash; #[arbitrary(bound = "T: EthSpec")] pub struct LightClientFinalityUpdate { /// The last `BeaconBlockHeader` from the last attested block by the sync committee. - pub attested_header: BeaconBlockHeader, + pub attested_header: LightClientHeader, /// The last `BeaconBlockHeader` from the last attested finalized block (end of epoch). - pub finalized_header: BeaconBlockHeader, + pub finalized_header: LightClientHeader, /// Merkle proof attesting finalized header. 
pub finality_branch: FixedVector, /// current sync aggreggate @@ -68,8 +71,8 @@ impl LightClientFinalityUpdate { let finality_branch = attested_state.compute_merkle_proof(FINALIZED_ROOT_INDEX)?; Ok(Self { - attested_header, - finalized_header, + attested_header: attested_header.into(), + finalized_header: finalized_header.into(), finality_branch: FixedVector::new(finality_branch)?, sync_aggregate: sync_aggregate.clone(), signature_slot: block.slot(), @@ -77,6 +80,26 @@ impl LightClientFinalityUpdate { } } +impl ForkVersionDeserialize for LightClientFinalityUpdate { + fn deserialize_by_fork<'de, D: Deserializer<'de>>( + value: Value, + fork_name: ForkName, + ) -> Result { + match fork_name { + ForkName::Altair | ForkName::Merge => Ok(serde_json::from_value::< + LightClientFinalityUpdate, + >(value) + .map_err(serde::de::Error::custom))?, + ForkName::Base | ForkName::Capella | ForkName::Deneb => { + Err(serde::de::Error::custom(format!( + "LightClientFinalityUpdate failed to deserialize: unsupported fork '{}'", + fork_name + ))) + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/light_client_header.rs b/consensus/types/src/light_client_header.rs new file mode 100644 index 00000000000..8fe31f7af8c --- /dev/null +++ b/consensus/types/src/light_client_header.rs @@ -0,0 +1,26 @@ +use crate::test_utils::TestRandom; +use crate::BeaconBlockHeader; +use serde::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; +use test_random_derive::TestRandom; + +#[derive( + Debug, + Clone, + PartialEq, + Serialize, + Deserialize, + Encode, + Decode, + TestRandom, + arbitrary::Arbitrary, +)] +pub struct LightClientHeader { + pub beacon: BeaconBlockHeader, +} + +impl From for LightClientHeader { + fn from(beacon: BeaconBlockHeader) -> Self { + LightClientHeader { beacon } + } +} diff --git a/consensus/types/src/light_client_optimistic_update.rs b/consensus/types/src/light_client_optimistic_update.rs index fbb0558eced..d883d735f35 100644 --- a/consensus/types/src/light_client_optimistic_update.rs +++ b/consensus/types/src/light_client_optimistic_update.rs @@ -1,8 +1,10 @@ -use super::{BeaconBlockHeader, EthSpec, Slot, SyncAggregate}; +use super::{EthSpec, ForkName, ForkVersionDeserialize, Slot, SyncAggregate}; +use crate::light_client_header::LightClientHeader; use crate::{ light_client_update::Error, test_utils::TestRandom, BeaconState, ChainSpec, SignedBeaconBlock, }; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::Value; use ssz_derive::{Decode, Encode}; use test_random_derive::TestRandom; use tree_hash::TreeHash; @@ -24,7 +26,7 @@ use tree_hash::TreeHash; #[arbitrary(bound = "T: EthSpec")] pub struct LightClientOptimisticUpdate { /// The last `BeaconBlockHeader` from the last attested block by the sync committee. 
- pub attested_header: BeaconBlockHeader, + pub attested_header: LightClientHeader, /// current sync aggreggate pub sync_aggregate: SyncAggregate, /// Slot of the sync aggregated singature @@ -53,13 +55,33 @@ impl LightClientOptimisticUpdate { let mut attested_header = attested_state.latest_block_header().clone(); attested_header.state_root = attested_state.tree_hash_root(); Ok(Self { - attested_header, + attested_header: attested_header.into(), sync_aggregate: sync_aggregate.clone(), signature_slot: block.slot(), }) } } +impl ForkVersionDeserialize for LightClientOptimisticUpdate { + fn deserialize_by_fork<'de, D: Deserializer<'de>>( + value: Value, + fork_name: ForkName, + ) -> Result { + match fork_name { + ForkName::Altair | ForkName::Merge => Ok(serde_json::from_value::< + LightClientOptimisticUpdate, + >(value) + .map_err(serde::de::Error::custom))?, + ForkName::Base | ForkName::Capella | ForkName::Deneb => { + Err(serde::de::Error::custom(format!( + "LightClientOptimisticUpdate failed to deserialize: unsupported fork '{}'", + fork_name + ))) + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/light_client_update.rs b/consensus/types/src/light_client_update.rs index 6e53e14c994..718cd7553f9 100644 --- a/consensus/types/src/light_client_update.rs +++ b/consensus/types/src/light_client_update.rs @@ -1,7 +1,11 @@ use super::{BeaconBlockHeader, EthSpec, FixedVector, Hash256, Slot, SyncAggregate, SyncCommittee}; -use crate::{beacon_state, test_utils::TestRandom, BeaconBlock, BeaconState, ChainSpec}; +use crate::{ + beacon_state, test_utils::TestRandom, BeaconBlock, BeaconState, ChainSpec, ForkName, + ForkVersionDeserialize, LightClientHeader, +}; use safe_arith::ArithError; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::Value; use ssz_derive::{Decode, Encode}; use ssz_types::typenum::{U5, U6}; use std::sync::Arc; @@ -67,13 +71,13 @@ impl From for Error { #[arbitrary(bound = "T: EthSpec")] pub struct LightClientUpdate { /// The last `BeaconBlockHeader` from the last attested block by the sync committee. - pub attested_header: BeaconBlockHeader, + pub attested_header: LightClientHeader, /// The `SyncCommittee` used in the next period. pub next_sync_committee: Arc>, /// Merkle proof for next sync committee pub next_sync_committee_branch: FixedVector, /// The last `BeaconBlockHeader` from the last attested finalized block (end of epoch). - pub finalized_header: BeaconBlockHeader, + pub finalized_header: LightClientHeader, /// Merkle proof attesting finalized header. 
pub finality_branch: FixedVector, /// current sync aggreggate @@ -128,10 +132,10 @@ impl LightClientUpdate { attested_state.compute_merkle_proof(NEXT_SYNC_COMMITTEE_INDEX)?; let finality_branch = attested_state.compute_merkle_proof(FINALIZED_ROOT_INDEX)?; Ok(Self { - attested_header, + attested_header: attested_header.into(), next_sync_committee: attested_state.next_sync_committee()?.clone(), next_sync_committee_branch: FixedVector::new(next_sync_committee_branch)?, - finalized_header, + finalized_header: finalized_header.into(), finality_branch: FixedVector::new(finality_branch)?, sync_aggregate: sync_aggregate.clone(), signature_slot: block.slot(), @@ -139,6 +143,26 @@ impl LightClientUpdate { } } +impl ForkVersionDeserialize for LightClientUpdate { + fn deserialize_by_fork<'de, D: Deserializer<'de>>( + value: Value, + fork_name: ForkName, + ) -> Result { + match fork_name { + ForkName::Altair | ForkName::Merge => { + Ok(serde_json::from_value::>(value) + .map_err(serde::de::Error::custom))? + } + ForkName::Base | ForkName::Capella | ForkName::Deneb => { + Err(serde::de::Error::custom(format!( + "LightClientUpdate failed to deserialize: unsupported fork '{}'", + fork_name + ))) + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/sync_committee.rs b/consensus/types/src/sync_committee.rs index 0bcf505f257..b42a000bb00 100644 --- a/consensus/types/src/sync_committee.rs +++ b/consensus/types/src/sync_committee.rs @@ -1,5 +1,4 @@ use crate::test_utils::TestRandom; -use crate::typenum::Unsigned; use crate::{EthSpec, FixedVector, SyncSubnetId}; use bls::PublicKeyBytes; use safe_arith::{ArithError, SafeArith}; @@ -46,14 +45,11 @@ pub struct SyncCommittee { impl SyncCommittee { /// Create a temporary sync committee that should *never* be included in a legitimate consensus object. - pub fn temporary() -> Result { - Ok(Self { - pubkeys: FixedVector::new(vec![ - PublicKeyBytes::empty(); - T::SyncCommitteeSize::to_usize() - ])?, + pub fn temporary() -> Self { + Self { + pubkeys: FixedVector::from_elem(PublicKeyBytes::empty()), aggregate_pubkey: PublicKeyBytes::empty(), - }) + } } /// Return the pubkeys in this `SyncCommittee` for the given `subcommittee_index`. diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index 854f718c591..d6c3ff197ae 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -43,6 +43,8 @@ beacon_chain = { workspace = true } store = { workspace = true } malloc_utils = { workspace = true } rayon = { workspace = true } +execution_layer = { workspace = true } +hex = { workspace = true } [package.metadata.cargo-udeps.ignore] normal = ["malloc_utils"] diff --git a/lcli/src/main.rs b/lcli/src/main.rs index 2aa855474fa..17fafe6ec1e 100644 --- a/lcli/src/main.rs +++ b/lcli/src/main.rs @@ -11,6 +11,7 @@ mod indexed_attestations; mod insecure_validators; mod interop_genesis; mod mnemonic_validators; +mod mock_el; mod new_testnet; mod parse_ssz; mod replace_state_pubkeys; @@ -891,6 +892,61 @@ fn main() { .help("Number of repeat runs, useful for benchmarking."), ) ) + .subcommand( + SubCommand::with_name("mock-el") + .about("Creates a mock execution layer server. This is NOT SAFE and should only \ + be used for testing and development on testnets. Do not use in production. Do not \ + use on mainnet. 
It cannot perform validator duties.") + .arg( + Arg::with_name("jwt-output-path") + .long("jwt-output-path") + .value_name("PATH") + .takes_value(true) + .required(true) + .help("Path to write the JWT secret."), + ) + .arg( + Arg::with_name("listen-address") + .long("listen-address") + .value_name("IP_ADDRESS") + .takes_value(true) + .help("The server will listen on this address.") + .default_value("127.0.0.1") + ) + .arg( + Arg::with_name("listen-port") + .long("listen-port") + .value_name("PORT") + .takes_value(true) + .help("The server will listen on this port.") + .default_value("8551") + ) + .arg( + Arg::with_name("all-payloads-valid") + .long("all-payloads-valid") + .takes_value(true) + .help("Controls the response to newPayload and forkchoiceUpdated. \ + Set to 'true' to return VALID. Set to 'false' to return SYNCING.") + .default_value("false") + .hidden(true) + ) + .arg( + Arg::with_name("shanghai-time") + .long("shanghai-time") + .value_name("UNIX_TIMESTAMP") + .takes_value(true) + .help("The payload timestamp that enables Shanghai. Defaults to the mainnet value.") + .default_value("1681338479") + ) + .arg( + Arg::with_name("cancun-time") + .long("cancun-time") + .value_name("UNIX_TIMESTAMP") + .takes_value(true) + .help("The payload timestamp that enables Cancun. No default is provided \ + until Cancun is triggered on mainnet.") + ) + ) .get_matches(); let result = matches @@ -1032,6 +1088,8 @@ fn run( state_root::run::(env, network_config, matches) .map_err(|e| format!("Failed to run state-root command: {}", e)) } + ("mock-el", Some(matches)) => mock_el::run::(env, matches) + .map_err(|e| format!("Failed to run mock-el command: {}", e)), (other, _) => Err(format!("Unknown subcommand {}. See --help.", other)), } } diff --git a/lcli/src/mock_el.rs b/lcli/src/mock_el.rs new file mode 100644 index 00000000000..094e23c3b40 --- /dev/null +++ b/lcli/src/mock_el.rs @@ -0,0 +1,62 @@ +use clap::ArgMatches; +use clap_utils::{parse_optional, parse_required}; +use environment::Environment; +use execution_layer::{ + auth::JwtKey, + test_utils::{ + Config, MockExecutionConfig, MockServer, DEFAULT_JWT_SECRET, DEFAULT_TERMINAL_BLOCK, + }, +}; +use std::net::Ipv4Addr; +use std::path::PathBuf; +use types::*; + +pub fn run(mut env: Environment, matches: &ArgMatches) -> Result<(), String> { + let jwt_path: PathBuf = parse_required(matches, "jwt-output-path")?; + let listen_addr: Ipv4Addr = parse_required(matches, "listen-address")?; + let listen_port: u16 = parse_required(matches, "listen-port")?; + let all_payloads_valid: bool = parse_required(matches, "all-payloads-valid")?; + let shanghai_time = parse_required(matches, "shanghai-time")?; + let cancun_time = parse_optional(matches, "cancun-time")?; + + let handle = env.core_context().executor.handle().unwrap(); + let spec = &T::default_spec(); + let jwt_key = JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap(); + std::fs::write(jwt_path, hex::encode(DEFAULT_JWT_SECRET)).unwrap(); + + let config = MockExecutionConfig { + server_config: Config { + listen_addr, + listen_port, + }, + jwt_key, + terminal_difficulty: spec.terminal_total_difficulty, + terminal_block: DEFAULT_TERMINAL_BLOCK, + terminal_block_hash: spec.terminal_block_hash, + shanghai_time: Some(shanghai_time), + cancun_time, + }; + let kzg = None; + let server: MockServer = MockServer::new_with_config(&handle, config, kzg); + + if all_payloads_valid { + eprintln!( + "Using --all-payloads-valid=true can be dangerous. \ + Never use this flag when operating validators." 
+ ); + // Indicate that all payloads are valid. + server.all_payloads_valid(); + } + + eprintln!( + "This tool is for TESTING PURPOSES ONLY. Do not use in production or on mainnet. \ + It cannot perform validator duties. It may cause nodes to follow an invalid chain." + ); + eprintln!("Server listening on {}:{}", listen_addr, listen_port); + + let shutdown_reason = env.block_until_shutdown_requested()?; + + eprintln!("Shutting down: {:?}", shutdown_reason); + + Ok(()) +} diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index c8e064e224d..5f75cb1acff 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -2346,7 +2346,10 @@ fn sync_eth1_chain_disable_deposit_contract_sync_flag() { fn light_client_server_default() { CommandLineTest::new() .run_with_zero_port() - .with_config(|config| assert_eq!(config.network.enable_light_client_server, false)); + .with_config(|config| { + assert_eq!(config.network.enable_light_client_server, false); + assert_eq!(config.http_api.enable_light_client_server, false); + }); } #[test] @@ -2354,7 +2357,20 @@ fn light_client_server_enabled() { CommandLineTest::new() .flag("light-client-server", None) .run_with_zero_port() - .with_config(|config| assert_eq!(config.network.enable_light_client_server, true)); + .with_config(|config| { + assert_eq!(config.network.enable_light_client_server, true); + }); +} + +#[test] +fn light_client_http_server_enabled() { + CommandLineTest::new() + .flag("http", None) + .flag("light-client-server", None) + .run_with_zero_port() + .with_config(|config| { + assert_eq!(config.http_api.enable_light_client_server, true); + }); } #[test] diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs index 14d65468a78..2fb2ff09a74 100644 --- a/lighthouse/tests/validator_client.rs +++ b/lighthouse/tests/validator_client.rs @@ -1,4 +1,4 @@ -use validator_client::Config; +use validator_client::{ApiTopic, Config}; use crate::exec::CommandLineTestExec; use bls::{Keypair, PublicKeyBytes}; @@ -508,20 +508,78 @@ fn monitoring_endpoint() { assert_eq!(api_conf.update_period_secs, Some(30)); }); } + #[test] -fn disable_run_on_all_default() { +fn disable_run_on_all_flag() { + CommandLineTest::new() + .flag("disable-run-on-all", None) + .run() + .with_config(|config| { + assert_eq!(config.broadcast_topics, vec![]); + }); + // --broadcast flag takes precedence + CommandLineTest::new() + .flag("disable-run-on-all", None) + .flag("broadcast", Some("attestations")) + .run() + .with_config(|config| { + assert_eq!(config.broadcast_topics, vec![ApiTopic::Attestations]); + }); +} + +#[test] +fn no_broadcast_flag() { CommandLineTest::new().run().with_config(|config| { - assert!(!config.disable_run_on_all); + assert_eq!(config.broadcast_topics, vec![ApiTopic::Subscriptions]); }); } #[test] -fn disable_run_on_all() { +fn broadcast_flag() { + // "none" variant CommandLineTest::new() - .flag("disable-run-on-all", None) + .flag("broadcast", Some("none")) + .run() + .with_config(|config| { + assert_eq!(config.broadcast_topics, vec![]); + }); + // "none" with other values is ignored + CommandLineTest::new() + .flag("broadcast", Some("none,sync-committee")) .run() .with_config(|config| { - assert!(config.disable_run_on_all); + assert_eq!(config.broadcast_topics, vec![ApiTopic::SyncCommittee]); + }); + // Other valid variants + CommandLineTest::new() + .flag("broadcast", Some("blocks, subscriptions")) + .run() + .with_config(|config| { + assert_eq!( + config.broadcast_topics, + 
vec![ApiTopic::Blocks, ApiTopic::Subscriptions], + ); + }); + // Omitted "subscription" overrides default + CommandLineTest::new() + .flag("broadcast", Some("attestations")) + .run() + .with_config(|config| { + assert_eq!(config.broadcast_topics, vec![ApiTopic::Attestations]); + }); +} + +#[test] +#[should_panic(expected = "Unknown API topic")] +fn wrong_broadcast_flag() { + CommandLineTest::new() + .flag("broadcast", Some("foo, subscriptions")) + .run() + .with_config(|config| { + assert_eq!( + config.broadcast_topics, + vec![ApiTopic::Blocks, ApiTopic::Subscriptions], + ); }); } diff --git a/testing/node_test_rig/Cargo.toml b/testing/node_test_rig/Cargo.toml index 5fe820d15b6..4696d8d2f1d 100644 --- a/testing/node_test_rig/Cargo.toml +++ b/testing/node_test_rig/Cargo.toml @@ -11,7 +11,7 @@ types = { workspace = true } tempfile = { workspace = true } eth2 = { workspace = true } validator_client = { workspace = true } -validator_dir = { workspace = true } +validator_dir = { workspace = true, features = ["insecure_keys"] } sensitive_url = { workspace = true } execution_layer = { workspace = true } tokio = { workspace = true } diff --git a/testing/node_test_rig/src/lib.rs b/testing/node_test_rig/src/lib.rs index 3f08c837169..6c9af707f57 100644 --- a/testing/node_test_rig/src/lib.rs +++ b/testing/node_test_rig/src/lib.rs @@ -21,7 +21,7 @@ pub use eth2; pub use execution_layer::test_utils::{ Config as MockServerConfig, MockExecutionConfig, MockServer, }; -pub use validator_client::Config as ValidatorConfig; +pub use validator_client::{ApiTopic, Config as ValidatorConfig}; /// The global timeout for HTTP requests to the beacon node. const HTTP_TIMEOUT: Duration = Duration::from_secs(8); diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index 57c944cf1a7..953dcf5822f 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -10,7 +10,8 @@ use futures::prelude::*; use node_test_rig::environment::RuntimeContext; use node_test_rig::{ environment::{EnvironmentBuilder, LoggerConfig}, - testing_client_config, testing_validator_config, ClientConfig, ClientGenesis, ValidatorFiles, + testing_client_config, testing_validator_config, ApiTopic, ClientConfig, ClientGenesis, + ValidatorFiles, }; use rayon::prelude::*; use sensitive_url::SensitiveUrl; @@ -159,10 +160,25 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { validator_config.fee_recipient = Some(SUGGESTED_FEE_RECIPIENT.into()); } println!("Adding validator client {}", i); - network_1 - .add_validator_client(validator_config, i, files, i % 2 == 0) - .await - .expect("should add validator"); + + // Enable broadcast on every 4th node. 
+ if i % 4 == 0 { + validator_config.broadcast_topics = ApiTopic::all(); + let beacon_nodes = vec![i, (i + 1) % node_count]; + network_1 + .add_validator_client_with_fallbacks( + validator_config, + i, + beacon_nodes, + files, + ) + .await + } else { + network_1 + .add_validator_client(validator_config, i, files, i % 2 == 0) + .await + } + .expect("should add validator"); }, "vc", ); diff --git a/testing/simulator/src/local_network.rs b/testing/simulator/src/local_network.rs index 1024c46e491..dc8bf0d27dd 100644 --- a/testing/simulator/src/local_network.rs +++ b/testing/simulator/src/local_network.rs @@ -270,6 +270,48 @@ impl LocalNetwork { Ok(()) } + pub async fn add_validator_client_with_fallbacks( + &self, + mut validator_config: ValidatorConfig, + validator_index: usize, + beacon_nodes: Vec, + validator_files: ValidatorFiles, + ) -> Result<(), String> { + let context = self + .context + .service_context(format!("validator_{}", validator_index)); + let self_1 = self.clone(); + let mut beacon_node_urls = vec![]; + for beacon_node in beacon_nodes { + let socket_addr = { + let read_lock = self.beacon_nodes.read(); + let beacon_node = read_lock + .get(beacon_node) + .ok_or_else(|| format!("No beacon node for index {}", beacon_node))?; + beacon_node + .client + .http_api_listen_addr() + .expect("Must have http started") + }; + let beacon_node = SensitiveUrl::parse( + format!("http://{}:{}", socket_addr.ip(), socket_addr.port()).as_str(), + ) + .unwrap(); + beacon_node_urls.push(beacon_node); + } + + validator_config.beacon_nodes = beacon_node_urls; + + let validator_client = LocalValidatorClient::production_with_insecure_keypairs( + context, + validator_config, + validator_files, + ) + .await?; + self_1.validator_clients.write().push(validator_client); + Ok(()) + } + /// For all beacon nodes in `Self`, return a HTTP client to access each nodes HTTP API. pub fn remote_nodes(&self) -> Result, String> { let beacon_nodes = self.beacon_nodes.read(); diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 0b648a81553..90a82b7e3b2 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -61,3 +61,4 @@ malloc_utils = { workspace = true } sysinfo = { workspace = true } system_health = { path = "../common/system_health" } logging = { workspace = true } +strum = { workspace = true } diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index b5bb6702ae2..43b9d60e234 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -1,4 +1,4 @@ -use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced}; use crate::{ duties_service::{DutiesService, DutyAndProof}, http_metrics::metrics, @@ -433,9 +433,10 @@ impl AttestationService { // Post the attestations to the BN. 
match self .beacon_nodes - .first_success( + .request( RequireSynced::No, OfflineOnFailure::Yes, + ApiTopic::Attestations, |beacon_node| async move { let _timer = metrics::start_timer_vec( &metrics::ATTESTATION_SERVICE_TIMES, diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 3fce61a55a6..23458d327b9 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -7,6 +7,7 @@ use crate::http_metrics::metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_RE use environment::RuntimeContext; use eth2::BeaconNodeHttpClient; use futures::future; +use serde::{Deserialize, Serialize}; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; use std::fmt; @@ -15,6 +16,7 @@ use std::future::Future; use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, Instant}; +use strum::{EnumString, EnumVariantNames}; use tokio::{sync::RwLock, time::sleep}; use types::{ChainSpec, Config, EthSpec}; @@ -330,7 +332,7 @@ impl CandidateBeaconNode { pub struct BeaconNodeFallback { candidates: Vec>, slot_clock: Option, - disable_run_on_all: bool, + broadcast_topics: Vec, spec: ChainSpec, log: Logger, } @@ -338,14 +340,14 @@ pub struct BeaconNodeFallback { impl BeaconNodeFallback { pub fn new( candidates: Vec>, - disable_run_on_all: bool, + broadcast_topics: Vec, spec: ChainSpec, log: Logger, ) -> Self { Self { candidates, slot_clock: None, - disable_run_on_all, + broadcast_topics, spec, log, } @@ -579,7 +581,7 @@ impl BeaconNodeFallback { /// It returns a list of errors along with the beacon node id that failed for `func`. /// Since this ignores the actual result of `func`, this function should only be used for beacon /// node calls whose results we do not care about, only that they completed successfully. - pub async fn run_on_all<'a, F, O, Err, R>( + pub async fn broadcast<'a, F, O, Err, R>( &'a self, require_synced: RequireSynced, offline_on_failure: OfflineOnFailure, @@ -687,11 +689,12 @@ impl BeaconNodeFallback { } /// Call `func` on first beacon node that returns success or on all beacon nodes - /// depending on the value of `disable_run_on_all`. - pub async fn run<'a, F, Err, R>( + /// depending on the `topic` and configuration. + pub async fn request<'a, F, Err, R>( &'a self, require_synced: RequireSynced, offline_on_failure: OfflineOnFailure, + topic: ApiTopic, func: F, ) -> Result<(), Errors> where @@ -699,13 +702,47 @@ impl BeaconNodeFallback { R: Future>, Err: Debug, { - if self.disable_run_on_all { + if self.broadcast_topics.contains(&topic) { + self.broadcast(require_synced, offline_on_failure, func) + .await + } else { self.first_success(require_synced, offline_on_failure, func) .await?; Ok(()) - } else { - self.run_on_all(require_synced, offline_on_failure, func) - .await } } } + +/// Serves as a cue for `BeaconNodeFallback` to tell which requests need to be broadcasted. 
+#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize, EnumString, EnumVariantNames)] +#[strum(serialize_all = "kebab-case")] +pub enum ApiTopic { + Attestations, + Blocks, + Subscriptions, + SyncCommittee, +} + +impl ApiTopic { + pub fn all() -> Vec { + use ApiTopic::*; + vec![Attestations, Blocks, Subscriptions, SyncCommittee] + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::str::FromStr; + use strum::VariantNames; + + #[test] + fn api_topic_all() { + let all = ApiTopic::all(); + assert_eq!(all.len(), ApiTopic::VARIANTS.len()); + assert!(ApiTopic::VARIANTS + .iter() + .map(|topic| ApiTopic::from_str(topic).unwrap()) + .eq(all.into_iter())); + } +} diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 30f53ffe38d..aeac9e880f5 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -1,6 +1,6 @@ use crate::beacon_node_fallback::{Error as FallbackError, Errors}; use crate::{ - beacon_node_fallback::{BeaconNodeFallback, RequireSynced}, + beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced}, determine_graffiti, graffiti_file::GraffitiFile, OfflineOnFailure, @@ -21,7 +21,6 @@ use std::ops::Deref; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; -use tokio::time::sleep; use types::{ AbstractExecPayload, BlindedPayload, BlockType, EthSpec, FullPayload, Graffiti, PublicKeyBytes, Slot, @@ -57,7 +56,6 @@ pub struct BlockServiceBuilder { context: Option>, graffiti: Option, graffiti_file: Option, - block_delay: Option, } impl BlockServiceBuilder { @@ -70,7 +68,6 @@ impl BlockServiceBuilder { context: None, graffiti: None, graffiti_file: None, - block_delay: None, } } @@ -109,11 +106,6 @@ impl BlockServiceBuilder { self } - pub fn block_delay(mut self, block_delay: Option) -> Self { - self.block_delay = block_delay; - self - } - pub fn build(self) -> Result, String> { Ok(BlockService { inner: Arc::new(Inner { @@ -132,7 +124,6 @@ impl BlockServiceBuilder { proposer_nodes: self.proposer_nodes, graffiti: self.graffiti, graffiti_file: self.graffiti_file, - block_delay: self.block_delay, }), }) } @@ -147,35 +138,41 @@ pub struct ProposerFallback { impl ProposerFallback { // Try `func` on `self.proposer_nodes` first. If that doesn't work, try `self.beacon_nodes`. - pub async fn first_success_try_proposers_first<'a, F, O, Err, R>( + pub async fn request_proposers_first<'a, F, Err, R>( &'a self, require_synced: RequireSynced, offline_on_failure: OfflineOnFailure, func: F, - ) -> Result> + ) -> Result<(), Errors> where F: Fn(&'a BeaconNodeHttpClient) -> R + Clone, - R: Future>, + R: Future>, Err: Debug, { // If there are proposer nodes, try calling `func` on them and return early if they are successful. if let Some(proposer_nodes) = &self.proposer_nodes { - if let Ok(result) = proposer_nodes - .first_success(require_synced, offline_on_failure, func.clone()) + if proposer_nodes + .request( + require_synced, + offline_on_failure, + ApiTopic::Blocks, + func.clone(), + ) .await + .is_ok() { - return Ok(result); + return Ok(()); } } // If the proposer nodes failed, try on the non-proposer nodes. self.beacon_nodes - .first_success(require_synced, offline_on_failure, func) + .request(require_synced, offline_on_failure, ApiTopic::Blocks, func) .await } // Try `func` on `self.beacon_nodes` first. If that doesn't work, try `self.proposer_nodes`. 
- pub async fn first_success_try_proposers_last<'a, F, O, Err, R>( + pub async fn request_proposers_last<'a, F, O, Err, R>( &'a self, require_synced: RequireSynced, offline_on_failure: OfflineOnFailure, @@ -216,7 +213,6 @@ pub struct Inner { context: RuntimeContext, graffiti: Option, graffiti_file: Option, - block_delay: Option, } /// Attempts to produce attestations for any block producer(s) at the start of the epoch. @@ -260,18 +256,7 @@ impl BlockService { executor.spawn( async move { while let Some(notif) = notification_rx.recv().await { - let service = self.clone(); - - if let Some(delay) = service.block_delay { - debug!( - service.context.log(), - "Delaying block production by {}ms", - delay.as_millis() - ); - sleep(delay).await; - } - - service.do_update(notif).await.ok(); + self.do_update(notif).await.ok(); } debug!(log, "Block service shutting down"); }, @@ -346,6 +331,7 @@ impl BlockService { }) .unwrap_or(false); + // TODO produce_block_v3 should be deprecated post deneb if self.validator_store.produce_block_v3() || deneb_fork_activated { for validator_pubkey in proposers { let service: BlockService = self.clone(); @@ -374,6 +360,7 @@ impl BlockService { ) } } else { + // TODO this can be deprecated post deneb for validator_pubkey in proposers { let builder_proposals = self .validator_store @@ -744,7 +731,7 @@ impl BlockService { // Try the proposer nodes last, since it's likely that they don't have a // great view of attestations on the network. let block_contents = proposer_fallback - .first_success_try_proposers_last( + .request_proposers_last( RequireSynced::No, OfflineOnFailure::Yes, move |beacon_node| { diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index e0a80055864..d3325e2b05b 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -26,15 +26,28 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { ) .takes_value(true), ) + // TODO remove this flag in a future release .arg( Arg::with_name("disable-run-on-all") .long("disable-run-on-all") .value_name("DISABLE_RUN_ON_ALL") - .help("By default, Lighthouse publishes attestation, sync committee subscriptions \ + .help("DEPRECATED. Use --broadcast. \ + By default, Lighthouse publishes attestation, sync committee subscriptions \ and proposer preparation messages to all beacon nodes provided in the \ `--beacon-nodes flag`. This option changes that behaviour such that these \ api calls only go out to the first available and synced beacon node") - .takes_value(false) + .takes_value(false), + ) + .arg( + Arg::with_name("broadcast") + .long("broadcast") + .value_name("API_TOPICS") + .help("Comma-separated list of beacon API topics to broadcast to all beacon nodes. \ + Possible values are: none, attestations, blocks, subscriptions, \ + sync-committee. Default (when flag is omitted) is to broadcast \ + subscriptions only." + ) + .takes_value(true), ) .arg( Arg::with_name("validators-dir") @@ -289,13 +302,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { immediately.") .takes_value(false), ) + // TODO deprecate this flag post Deneb .arg( Arg::with_name("builder-proposals") .long("builder-proposals") .alias("private-tx-proposals") .help("If this flag is set, Lighthouse will query the Beacon Node for only block \ headers during proposals and will sign over headers. Useful for outsourcing \ - execution payload construction during proposals.") + execution payload construction during proposals. If the produce-block-v3 flag is present \ + this flag will be ignored. 
This flag will also be ignored after the Deneb fork.") .takes_value(false), ) .arg( @@ -335,16 +350,4 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("500") .takes_value(true), ) - /* - * Experimental/development options. - */ - .arg( - Arg::with_name("block-delay-ms") - .long("block-delay-ms") - .value_name("MILLIS") - .hidden(true) - .help("Time to delay block production from the start of the slot. Should only be \ - used for testing.") - .takes_value(true), - ) } diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 808f1f805bb..4b7da764287 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -1,3 +1,4 @@ +use crate::beacon_node_fallback::ApiTopic; use crate::graffiti_file::GraffitiFile; use crate::{http_api, http_metrics}; use clap::ArgMatches; @@ -9,11 +10,10 @@ use directory::{ use eth2::types::Graffiti; use sensitive_url::SensitiveUrl; use serde::{Deserialize, Serialize}; -use slog::{info, Logger}; +use slog::{info, warn, Logger}; use std::fs; use std::net::IpAddr; use std::path::PathBuf; -use std::time::Duration; use types::{Address, GRAFFITI_BYTES_LEN}; pub const DEFAULT_BEACON_NODE: &str = "http://localhost:5052/"; @@ -69,12 +69,8 @@ pub struct Config { /// A list of custom certificates that the validator client will additionally use when /// connecting to a beacon node over SSL/TLS. pub beacon_nodes_tls_certs: Option>, - /// Delay from the start of the slot to wait before publishing a block. - /// - /// This is *not* recommended in prod and should only be used for testing. - pub block_delay: Option, - /// Disables publishing http api requests to all beacon nodes for select api calls. - pub disable_run_on_all: bool, + /// Enables broadcasting of various requests (by topic) to all beacon nodes. + pub broadcast_topics: Vec, /// Enables a service which attempts to measure latency between the VC and BNs. pub enable_latency_measurement_service: bool, /// Defines the number of validators per `validator/register_validator` request sent to the BN. 
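Note on the new `broadcast_topics` config: the hunk above replaces the `disable_run_on_all` flag with a list of `ApiTopic`s, and the config.rs hunks that follow change the default to subscriptions-only and parse the `--broadcast` value. The snippet below is a minimal, self-contained sketch of that parsing rule (not part of the diff); it uses a hand-rolled `FromStr` in place of the `strum` kebab-case derive from `beacon_node_fallback.rs`, and the `parse_broadcast` helper name is made up for illustration.

use std::str::FromStr;

// Simplified stand-in for the real `ApiTopic`, which the PR derives with
// strum's `EnumString` using kebab-case variant names.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ApiTopic {
    Attestations,
    Blocks,
    Subscriptions,
    SyncCommittee,
}

impl FromStr for ApiTopic {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "attestations" => Ok(ApiTopic::Attestations),
            "blocks" => Ok(ApiTopic::Blocks),
            "subscriptions" => Ok(ApiTopic::Subscriptions),
            "sync-committee" => Ok(ApiTopic::SyncCommittee),
            other => Err(format!("Unknown API topic to broadcast: {other}")),
        }
    }
}

// Mirror of the `--broadcast` parsing in config.rs: split on commas, drop the
// literal "none", then trim and parse each remaining topic.
fn parse_broadcast(value: &str) -> Result<Vec<ApiTopic>, String> {
    value
        .split(',')
        .filter(|t| *t != "none")
        .map(|t| t.trim().parse::<ApiTopic>())
        .collect()
}

fn main() {
    // Explicit topics replace the default of `Subscriptions` only.
    assert_eq!(
        parse_broadcast("blocks, subscriptions").unwrap(),
        vec![ApiTopic::Blocks, ApiTopic::Subscriptions]
    );
    // "none" is dropped, so on its own it disables broadcasting entirely.
    assert_eq!(parse_broadcast("none").unwrap(), Vec::<ApiTopic>::new());
    // Unknown topics are rejected, as exercised by the `wrong_broadcast_flag` test.
    assert!(parse_broadcast("foo, subscriptions").is_err());
}

As in the real config code, the literal "none" is filtered out before parsing, so `--broadcast none` yields an empty topic list and every request falls back to the first available beacon node.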
@@ -115,11 +111,10 @@ impl Default for Config { enable_doppelganger_protection: false, enable_high_validator_count_metrics: false, beacon_nodes_tls_certs: None, - block_delay: None, builder_proposals: false, builder_registration_timestamp_override: None, gas_limit: None, - disable_run_on_all: false, + broadcast_topics: vec![ApiTopic::Subscriptions], enable_latency_measurement_service: true, validator_registration_batch_size: 500, produce_block_v3: false, @@ -182,7 +177,6 @@ impl Config { .map_err(|e| format!("Unable to parse proposer node URL: {:?}", e))?; } - config.disable_run_on_all = cli_args.is_present("disable-run-on-all"); config.disable_auto_discover = cli_args.is_present("disable-auto-discover"); config.init_slashing_protection = cli_args.is_present("init-slashing-protection"); config.use_long_timeouts = cli_args.is_present("use-long-timeouts"); @@ -225,6 +219,26 @@ impl Config { config.beacon_nodes_tls_certs = Some(tls_certs.split(',').map(PathBuf::from).collect()); } + if cli_args.is_present("disable-run-on-all") { + warn!( + log, + "The --disable-run-on-all flag is deprecated"; + "msg" => "please use --broadcast instead" + ); + config.broadcast_topics = vec![]; + } + if let Some(broadcast_topics) = cli_args.value_of("broadcast") { + config.broadcast_topics = broadcast_topics + .split(',') + .filter(|t| *t != "none") + .map(|t| { + t.trim() + .parse::() + .map_err(|_| format!("Unknown API topic to broadcast: {t}")) + }) + .collect::>()?; + } + /* * Http API server */ @@ -360,13 +374,6 @@ impl Config { return Err("validator-registration-batch-size cannot be 0".to_string()); } - /* - * Experimental - */ - if let Some(delay_ms) = parse_optional::(cli_args, "block-delay-ms")? { - config.block_delay = Some(Duration::from_millis(delay_ms)); - } - Ok(config) } } diff --git a/validator_client/src/doppelganger_service.rs b/validator_client/src/doppelganger_service.rs index 231cea86c01..86584d794c3 100644 --- a/validator_client/src/doppelganger_service.rs +++ b/validator_client/src/doppelganger_service.rs @@ -163,8 +163,6 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( current_epoch: Epoch, validator_indices: Vec, ) -> LivenessResponses { - let validator_indices = validator_indices.as_slice(); - let previous_epoch = current_epoch.saturating_sub(1_u64); let previous_epoch_responses = if previous_epoch == current_epoch { @@ -180,12 +178,22 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( .first_success( RequireSynced::Yes, OfflineOnFailure::Yes, - |beacon_node| async move { + |beacon_node| async { beacon_node - .post_lighthouse_liveness(validator_indices, previous_epoch) + .post_validator_liveness_epoch(previous_epoch, &validator_indices) .await .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) - .map(|result| result.data) + .map(|result| { + result + .data + .into_iter() + .map(|response| LivenessResponseData { + index: response.index, + epoch: previous_epoch, + is_live: response.is_live, + }) + .collect() + }) }, ) .await @@ -207,12 +215,22 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( .first_success( RequireSynced::Yes, OfflineOnFailure::Yes, - |beacon_node| async move { + |beacon_node| async { beacon_node - .post_lighthouse_liveness(validator_indices, current_epoch) + .post_validator_liveness_epoch(current_epoch, &validator_indices) .await .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) - .map(|result| result.data) + .map(|result| { + result + .data + .into_iter() + 
.map(|response| LivenessResponseData { + index: response.index, + epoch: current_epoch, + is_live: response.is_live, + }) + .collect() + }) }, ) .await diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 9b9105a6217..26747f81110 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -8,7 +8,7 @@ mod sync; -use crate::beacon_node_fallback::{BeaconNodeFallback, OfflineOnFailure, RequireSynced}; +use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, OfflineOnFailure, RequireSynced}; use crate::http_metrics::metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY}; use crate::{ block_service::BlockServiceNotification, @@ -700,9 +700,10 @@ async fn poll_beacon_attesters( let subscriptions_ref = &subscriptions; if let Err(e) = duties_service .beacon_nodes - .run( + .request( RequireSynced::No, OfflineOnFailure::Yes, + ApiTopic::Subscriptions, |beacon_node| async move { let _timer = metrics::start_timer_vec( &metrics::DUTIES_SERVICE_TIMES, diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 6925e285fc5..d52247df4d2 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -19,6 +19,7 @@ pub mod http_api; pub mod initialized_validators; pub mod validator_store; +pub use beacon_node_fallback::ApiTopic; pub use cli::cli_app; pub use config::Config; use initialized_validators::InitializedValidators; @@ -369,14 +370,14 @@ impl ProductionValidatorClient { let mut beacon_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new( candidates, - config.disable_run_on_all, + config.broadcast_topics.clone(), context.eth2_config.spec.clone(), log.clone(), ); let mut proposer_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new( proposer_candidates, - config.disable_run_on_all, + config.broadcast_topics.clone(), context.eth2_config.spec.clone(), log.clone(), ); @@ -471,8 +472,7 @@ impl ProductionValidatorClient { .beacon_nodes(beacon_nodes.clone()) .runtime_context(context.service_context("block".into())) .graffiti(config.graffiti) - .graffiti_file(config.graffiti_file.clone()) - .block_delay(config.block_delay); + .graffiti_file(config.graffiti_file.clone()); // If we have proposer nodes, add them to the block service builder. 
if proposer_nodes_num > 0 { diff --git a/validator_client/src/preparation_service.rs b/validator_client/src/preparation_service.rs index 2d2221680f9..7aabc7d5abb 100644 --- a/validator_client/src/preparation_service.rs +++ b/validator_client/src/preparation_service.rs @@ -1,4 +1,4 @@ -use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced}; use crate::validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore}; use crate::OfflineOnFailure; use bls::PublicKeyBytes; @@ -342,9 +342,10 @@ impl PreparationService { let preparation_entries = preparation_data.as_slice(); match self .beacon_nodes - .run( + .request( RequireSynced::No, OfflineOnFailure::Yes, + ApiTopic::Subscriptions, |beacon_node| async move { beacon_node .post_validator_prepare_beacon_proposer(preparation_entries) diff --git a/validator_client/src/sync_committee_service.rs b/validator_client/src/sync_committee_service.rs index 8e904e5dd1a..90b62cd3b44 100644 --- a/validator_client/src/sync_committee_service.rs +++ b/validator_client/src/sync_committee_service.rs @@ -1,4 +1,4 @@ -use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced}; use crate::{ duties_service::DutiesService, validator_store::{Error as ValidatorStoreError, ValidatorStore}, @@ -299,9 +299,10 @@ impl SyncCommitteeService { .collect::>(); self.beacon_nodes - .first_success( + .request( RequireSynced::No, OfflineOnFailure::Yes, + ApiTopic::SyncCommittee, |beacon_node| async move { beacon_node .post_beacon_pool_sync_committee_signatures(committee_signatures) @@ -594,9 +595,10 @@ impl SyncCommitteeService { if let Err(e) = self .beacon_nodes - .run( + .request( RequireSynced::No, OfflineOnFailure::Yes, + ApiTopic::Subscriptions, |beacon_node| async move { beacon_node .post_validator_sync_committee_subscriptions(subscriptions_slice)
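The service-level changes above (attestation, preparation, duties and sync committee services) all route through the new `BeaconNodeFallback::request`, which broadcasts to every candidate node only when the request's `ApiTopic` is in the configured `broadcast_topics`, and otherwise behaves like the old `first_success`. Below is a minimal, synchronous sketch of that dispatch rule only (illustrative, not part of the diff); the real method is async, generic over the slot clock and spec, and also threads through `RequireSynced`/`OfflineOnFailure` and error aggregation.

// Simplified stand-in types; the real code holds `CandidateBeaconNode`s rather
// than plain string labels.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ApiTopic {
    Attestations,
    Blocks,
    Subscriptions,
    SyncCommittee,
}

struct Fallback {
    nodes: Vec<&'static str>,
    broadcast_topics: Vec<ApiTopic>,
}

impl Fallback {
    // Send the request to every node, collecting errors (the old `run_on_all`,
    // renamed `broadcast` in this PR).
    fn broadcast<F: Fn(&str) -> Result<(), String>>(&self, func: F) -> Vec<String> {
        self.nodes.iter().filter_map(|&n| func(n).err()).collect()
    }

    // Stop at the first node that succeeds (the pre-existing `first_success`).
    fn first_success<F: Fn(&str) -> Result<(), String>>(&self, func: F) -> Result<(), String> {
        let mut last_err = "no candidate nodes".to_string();
        for &n in &self.nodes {
            match func(n) {
                Ok(()) => return Ok(()),
                Err(e) => last_err = e,
            }
        }
        Err(last_err)
    }

    // The new `request`: broadcast only for topics the operator opted into,
    // otherwise fall back to first-success.
    fn request<F: Fn(&str) -> Result<(), String>>(&self, topic: ApiTopic, func: F) {
        if self.broadcast_topics.contains(&topic) {
            let _errors = self.broadcast(func);
        } else {
            let _ = self.first_success(func);
        }
    }
}

fn main() {
    let fallback = Fallback {
        nodes: vec!["bn-1", "bn-2"],
        // Default config: only subscriptions go to every node.
        broadcast_topics: vec![ApiTopic::Subscriptions],
    };
    // Subscriptions are sent to both nodes; attestations stop at the first success.
    fallback.request(ApiTopic::Subscriptions, |n| {
        println!("subscribe via {n}");
        Ok(())
    });
    fallback.request(ApiTopic::Attestations, |n| {
        println!("attest via {n}");
        Ok(())
    });
}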