From 70917f7b409112ce8dbeb5d32fe9885096fd623b Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 9 Jan 2025 17:52:06 -0800 Subject: [PATCH] Try to remove ugliness --- .../lighthouse_network/src/rpc/codec.rs | 20 ++++++++++-- .../lighthouse_network/src/rpc/handler.rs | 6 ++-- .../lighthouse_network/src/rpc/methods.rs | 31 +++---------------- beacon_node/lighthouse_network/src/rpc/mod.rs | 5 +-- .../lighthouse_network/src/rpc/protocol.rs | 20 +++++++----- .../src/rpc/rate_limiter.rs | 24 +++++++++----- .../src/rpc/self_limiter.rs | 20 +++++++++--- .../network_beacon_processor/rpc_methods.rs | 8 ----- 8 files changed, 71 insertions(+), 63 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index 5d86936d41d..9fa1a3adcca 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -186,6 +186,7 @@ impl Decoder for SSZSnappyInboundCodec { handle_rpc_request( self.protocol.versioned_protocol, &decoded_buffer, + self.fork_context.current_fork(), &self.fork_context.spec, ) } @@ -552,6 +553,7 @@ fn handle_length( fn handle_rpc_request( versioned_protocol: SupportedProtocol, decoded_buffer: &[u8], + current_fork: ForkName, spec: &ChainSpec, ) -> Result>, RPCError> { match versioned_protocol { @@ -583,9 +585,21 @@ fn handle_rpc_request( )?, }), ))), - SupportedProtocol::BlobsByRangeV1 => Ok(Some(RequestType::BlobsByRange( - BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?, - ))), + SupportedProtocol::BlobsByRangeV1 => { + let req = BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?; + // TODO(pawan): change this to max_blobs_per_rpc_request in the alpha10 PR + let max_blobs = spec.max_blobs_per_block_by_fork(current_fork); + if req.count > max_blobs { + return Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + format!( + "requested exceeded limit. allowed: {}, requested: {}", + max_blobs, req.count + ), + )); + } + Ok(Some(RequestType::BlobsByRange(req))) + } SupportedProtocol::BlobsByRootV1 => { Ok(Some(RequestType::BlobsByRoot(BlobsByRootRequest { blob_ids: RuntimeVariableList::from_ssz_bytes( diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 0a0a6ca754f..3a008df023d 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -855,7 +855,8 @@ where } let (req, substream) = substream; - let max_responses = req.max_responses(); + let max_responses = + req.max_responses(self.fork_context.current_fork(), &self.fork_context.spec); // store requests that expect responses if max_responses > 0 { @@ -924,7 +925,8 @@ where } // add the stream to substreams if we expect a response, otherwise drop the stream. - let max_responses = request.max_responses(); + let max_responses = + request.max_responses(self.fork_context.current_fork(), &self.fork_context.spec); if max_responses > 0 { let max_remaining_chunks = if request.expect_exactly_one_response() { // Currently enforced only for multiple responses diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index e35af6fb40a..500188beefb 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -15,6 +15,7 @@ use strum::IntoStaticStr; use superstruct::superstruct; use types::blob_sidecar::BlobIdentifier; use types::light_client_update::MAX_REQUEST_LIGHT_CLIENT_UPDATES; +use types::ForkName; use types::{ blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate, @@ -25,13 +26,6 @@ use types::{ pub type MaxErrorLen = U256; pub const MAX_ERROR_LEN: u64 = 256; -/// The max number of blobs we expect in the configs to set for compile time params. -/// Note: This value is an estimate that we should use only for rate limiting, -/// bounds checking and other non-consensus critical operations. -/// -/// For exact value, we should always check the chainspec. -pub const MAX_BLOBS_PER_BLOCK_CEILING: u64 = 16; - /// Wrapper over SSZ List to represent error message in rpc responses. #[derive(Debug, Clone)] pub struct ErrorType(pub VariableList); @@ -334,13 +328,9 @@ pub struct BlobsByRangeRequest { } impl BlobsByRangeRequest { - /// This function provides an upper bound on number of blobs expected in - /// a certain slot range. - /// - /// Note: **must not** use for anything consensus critical, only for - /// bounds checking and rate limiting. - pub fn max_blobs_requested(&self) -> u64 { - self.count.saturating_mul(MAX_BLOBS_PER_BLOCK_CEILING) + pub fn max_blobs_requested(&self, current_fork: ForkName, spec: &ChainSpec) -> u64 { + let max_blobs_per_block = spec.max_blobs_per_block_by_fork(current_fork); + self.count.saturating_mul(max_blobs_per_block) } } @@ -863,16 +853,3 @@ impl slog::KV for StatusMessage { slog::Result::Ok(()) } } - -#[cfg(test)] -mod test { - use super::*; - use types::{ForkName, MainnetEthSpec}; - - #[test] - fn max_blobs_per_block_ceiling() { - let spec = MainnetEthSpec::default_spec(); - let latest_fork = ForkName::latest(); - assert!(spec.max_blobs_per_block_by_fork(latest_fork) <= MAX_BLOBS_PER_BLOCK_CEILING); - } -} diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 7d091da7660..03f1395b8b5 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -181,12 +181,13 @@ impl RPC { let inbound_limiter = inbound_rate_limiter_config.map(|config| { debug!(log, "Using inbound rate limiting params"; "config" => ?config); - RateLimiter::new_with_config(config.0) + RateLimiter::new_with_config(config.0, fork_context.clone()) .expect("Inbound limiter configuration parameters are valid") }); let self_limiter = outbound_rate_limiter_config.map(|config| { - SelfRateLimiter::new(config, log.clone()).expect("Configuration parameters are valid") + SelfRateLimiter::new(config, fork_context.clone(), log.clone()) + .expect("Configuration parameters are valid") }); RPC { diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index e4e8232e18f..a7c4524a2e1 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -93,7 +93,7 @@ pub static SIGNED_BEACON_BLOCK_DENEB_MAX: LazyLock = LazyLock::new(|| { *SIGNED_BEACON_BLOCK_CAPELLA_MAX_WITHOUT_PAYLOAD + types::ExecutionPayload::::max_execution_payload_deneb_size() // adding max size of execution payload (~16gb) + ssz::BYTES_PER_LENGTH_OFFSET // Adding the additional offsets for the `ExecutionPayload` - + (::ssz_fixed_len() * MAX_BLOBS_PER_BLOCK_CEILING as usize) + + (::ssz_fixed_len() * MainnetEthSpec::default_spec().max_blobs_per_block_by_fork(ForkName::Deneb) as usize) + ssz::BYTES_PER_LENGTH_OFFSET }); // Length offset for the blob commitments field. // @@ -101,7 +101,7 @@ pub static SIGNED_BEACON_BLOCK_ELECTRA_MAX: LazyLock = LazyLock::new(|| { *SIGNED_BEACON_BLOCK_ELECTRA_MAX_WITHOUT_PAYLOAD + types::ExecutionPayload::::max_execution_payload_electra_size() // adding max size of execution payload (~16gb) + ssz::BYTES_PER_LENGTH_OFFSET // Adding the additional ssz offset for the `ExecutionPayload` field - + (::ssz_fixed_len() * MAX_BLOBS_PER_BLOCK_CEILING as usize) + + (::ssz_fixed_len() * MainnetEthSpec::default_spec().max_blobs_per_block_by_fork(ForkName::Electra) as usize) + ssz::BYTES_PER_LENGTH_OFFSET }); // Length offset for the blob commitments field. @@ -603,8 +603,10 @@ impl ProtocolId { Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::BlobsByRange => rpc_blob_limits::(), Protocol::BlobsByRoot => rpc_blob_limits::(), - Protocol::DataColumnsByRoot => rpc_data_column_limits::(), - Protocol::DataColumnsByRange => rpc_data_column_limits::(), + Protocol::DataColumnsByRoot => rpc_data_column_limits::(fork_context.current_fork()), + Protocol::DataColumnsByRange => { + rpc_data_column_limits::(fork_context.current_fork()) + } Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -684,10 +686,12 @@ pub fn rpc_blob_limits() -> RpcLimits { } } -pub fn rpc_data_column_limits() -> RpcLimits { +pub fn rpc_data_column_limits(fork_name: ForkName) -> RpcLimits { RpcLimits::new( DataColumnSidecar::::empty().as_ssz_bytes().len(), - DataColumnSidecar::::max_size(MAX_BLOBS_PER_BLOCK_CEILING as usize), + DataColumnSidecar::::max_size( + E::default_spec().max_blobs_per_block_by_fork(fork_name) as usize + ), ) } @@ -786,13 +790,13 @@ impl RequestType { /* These functions are used in the handler for stream management */ /// Maximum number of responses expected for this request. - pub fn max_responses(&self) -> u64 { + pub fn max_responses(&self, current_fork: ForkName, spec: &ChainSpec) -> u64 { match self { RequestType::Status(_) => 1, RequestType::Goodbye(_) => 0, RequestType::BlocksByRange(req) => *req.count(), RequestType::BlocksByRoot(req) => req.block_roots().len() as u64, - RequestType::BlobsByRange(req) => req.max_blobs_requested::(), + RequestType::BlobsByRange(req) => req.max_blobs_requested(current_fork, spec), RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64, RequestType::DataColumnsByRoot(req) => req.data_column_ids.len() as u64, RequestType::DataColumnsByRange(req) => req.max_requested::(), diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index ecbacc8c112..f9e66504aa8 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -6,10 +6,11 @@ use serde::{Deserialize, Serialize}; use std::future::Future; use std::hash::Hash; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use tokio::time::Interval; -use types::EthSpec; +use types::{ChainSpec, EthSpec, ForkName, ForkContext}; /// Nanoseconds since a given time. // Maintained as u64 to reduce footprint @@ -109,6 +110,7 @@ pub struct RPCRateLimiter { lc_finality_update_rl: Limiter, /// LightClientUpdatesByRange rate limiter. lc_updates_by_range_rl: Limiter, + fork_context: Arc, } /// Error type for non conformant requests @@ -176,7 +178,7 @@ impl RPCRateLimiterBuilder { self } - pub fn build(self) -> Result { + pub fn build(self, fork_context: Arc) -> Result { // get our quotas let ping_quota = self.ping_quota.ok_or("Ping quota not specified")?; let metadata_quota = self.metadata_quota.ok_or("MetaData quota not specified")?; @@ -253,13 +255,14 @@ impl RPCRateLimiterBuilder { lc_finality_update_rl, lc_updates_by_range_rl, init_time: Instant::now(), + fork_context, }) } } pub trait RateLimiterItem { fn protocol(&self) -> Protocol; - fn max_responses(&self) -> u64; + fn max_responses(&self, current_fork: ForkName, spec: &ChainSpec) -> u64; } impl RateLimiterItem for super::RequestType { @@ -267,13 +270,16 @@ impl RateLimiterItem for super::RequestType { self.versioned_protocol().protocol() } - fn max_responses(&self) -> u64 { - self.max_responses() + fn max_responses(&self, current_fork: ForkName, spec: &ChainSpec) -> u64 { + self.max_responses(current_fork, spec) } } impl RPCRateLimiter { - pub fn new_with_config(config: RateLimiterConfig) -> Result { + pub fn new_with_config( + config: RateLimiterConfig, + fork_context: Arc, + ) -> Result { // Destructure to make sure every configuration value is used. let RateLimiterConfig { ping_quota, @@ -316,7 +322,7 @@ impl RPCRateLimiter { Protocol::LightClientUpdatesByRange, light_client_updates_by_range_quota, ) - .build() + .build(fork_context) } /// Get a builder instance. @@ -330,7 +336,9 @@ impl RPCRateLimiter { request: &Item, ) -> Result<(), RateLimitedErr> { let time_since_start = self.init_time.elapsed(); - let tokens = request.max_responses().max(1); + let tokens = request + .max_responses(self.fork_context.current_fork(), &self.fork_context.spec) + .max(1); let check = |limiter: &mut Limiter| limiter.allows(time_since_start, peer_id, tokens); diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index e968ad11e3d..e0c8593f29f 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -1,5 +1,6 @@ use std::{ collections::{hash_map::Entry, HashMap, VecDeque}, + sync::Arc, task::{Context, Poll}, time::Duration, }; @@ -9,7 +10,7 @@ use libp2p::{swarm::NotifyHandler, PeerId}; use slog::{crit, debug, Logger}; use smallvec::SmallVec; use tokio_util::time::DelayQueue; -use types::EthSpec; +use types::{EthSpec, ForkContext}; use super::{ config::OutboundRateLimiterConfig, @@ -50,9 +51,13 @@ pub enum Error { impl SelfRateLimiter { /// Creates a new [`SelfRateLimiter`] based on configration values. - pub fn new(config: OutboundRateLimiterConfig, log: Logger) -> Result { + pub fn new( + config: OutboundRateLimiterConfig, + fork_context: Arc, + log: Logger, + ) -> Result { debug!(log, "Using self rate limiting params"; "config" => ?config); - let limiter = RateLimiter::new_with_config(config.0)?; + let limiter = RateLimiter::new_with_config(config.0, fork_context)?; Ok(SelfRateLimiter { delayed_requests: Default::default(), @@ -215,7 +220,7 @@ mod tests { use crate::service::api_types::{AppRequestId, RequestId, SyncRequestId}; use libp2p::PeerId; use std::time::Duration; - use types::MainnetEthSpec; + use types::{EthSpec, ForkContext, Hash256, MainnetEthSpec, Slot}; /// Test that `next_peer_request_ready` correctly maintains the queue. #[tokio::test] @@ -225,8 +230,13 @@ mod tests { ping_quota: Quota::n_every(1, 2), ..Default::default() }); + let fork_context = std::sync::Arc::new(ForkContext::new::( + Slot::new(0), + Hash256::ZERO, + &MainnetEthSpec::default_spec(), + )); let mut limiter: SelfRateLimiter = - SelfRateLimiter::new(config, log).unwrap(); + SelfRateLimiter::new(config, fork_context, log).unwrap(); let peer_id = PeerId::random(); for i in 1..=5u32 { diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index c4944078fef..b4f19f668d7 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -890,14 +890,6 @@ impl NetworkBeaconProcessor { "start_slot" => req.start_slot, ); - // Should not send more than max request blocks - if req.max_blobs_requested::() > self.chain.spec.max_request_blob_sidecars { - return Err(( - RpcErrorResponse::InvalidRequest, - "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`", - )); - } - let request_start_slot = Slot::from(req.start_slot); let data_availability_boundary_slot = match self.chain.data_availability_boundary() {