Commit

Try to remove ugliness

pawanjay176 committed Jan 10, 2025
1 parent f13bdfc commit 70917f7
Showing 8 changed files with 71 additions and 63 deletions.
20 changes: 17 additions & 3 deletions beacon_node/lighthouse_network/src/rpc/codec.rs
@@ -186,6 +186,7 @@ impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
handle_rpc_request(
self.protocol.versioned_protocol,
&decoded_buffer,
self.fork_context.current_fork(),
&self.fork_context.spec,
)
}
@@ -552,6 +553,7 @@ fn handle_length(
fn handle_rpc_request<E: EthSpec>(
versioned_protocol: SupportedProtocol,
decoded_buffer: &[u8],
current_fork: ForkName,
spec: &ChainSpec,
) -> Result<Option<RequestType<E>>, RPCError> {
match versioned_protocol {
@@ -583,9 +585,21 @@ fn handle_rpc_request<E: EthSpec>(
)?,
}),
))),
SupportedProtocol::BlobsByRangeV1 => Ok(Some(RequestType::BlobsByRange(
BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
SupportedProtocol::BlobsByRangeV1 => {
let req = BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?;
// TODO(pawan): change this to max_blobs_per_rpc_request in the alpha10 PR
let max_blobs = spec.max_blobs_per_block_by_fork(current_fork);
if req.count > max_blobs {
return Err(RPCError::ErrorResponse(
RpcErrorResponse::InvalidRequest,
format!(
"requested exceeded limit. allowed: {}, requested: {}",
max_blobs, req.count
),
));
}
Ok(Some(RequestType::BlobsByRange(req)))
}
SupportedProtocol::BlobsByRootV1 => {
Ok(Some(RequestType::BlobsByRoot(BlobsByRootRequest {
blob_ids: RuntimeVariableList::from_ssz_bytes(
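The decoder-level check added above rejects an oversized BlobsByRange request as soon as it is decoded, using the fork-dependent blob limit instead of the old compile-time ceiling. A minimal standalone sketch of the same check follows; the ForkName enum, the per-fork limits (6 for Deneb, 9 for Electra) and the request struct are simplified stand-ins for illustration, not the Lighthouse definitions.

// Simplified stand-ins for the fork enum and the per-fork spec lookup.
#[derive(Clone, Copy)]
enum ForkName {
    Deneb,
    Electra,
}

// Assumed illustrative limits; the real values come from the ChainSpec.
fn max_blobs_per_block_by_fork(fork: ForkName) -> u64 {
    match fork {
        ForkName::Deneb => 6,
        ForkName::Electra => 9,
    }
}

struct BlobsByRangeRequest {
    count: u64,
}

// Mirrors the shape of the decoder check: reject a request whose `count`
// exceeds the current fork's blob limit before it reaches the processor.
fn validate_blobs_by_range(req: &BlobsByRangeRequest, fork: ForkName) -> Result<(), String> {
    let max_blobs = max_blobs_per_block_by_fork(fork);
    if req.count > max_blobs {
        return Err(format!(
            "request exceeded limit. allowed: {}, requested: {}",
            max_blobs, req.count
        ));
    }
    Ok(())
}

fn main() {
    assert!(validate_blobs_by_range(&BlobsByRangeRequest { count: 5 }, ForkName::Deneb).is_ok());
    assert!(validate_blobs_by_range(&BlobsByRangeRequest { count: 12 }, ForkName::Electra).is_err());
}

Enforcing the bound at decode time means an abusive request is dropped before it ever reaches the beacon processor, which is why the equivalent check is removed from the network beacon processor at the end of this diff.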
6 changes: 4 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
@@ -855,7 +855,8 @@ where
}

let (req, substream) = substream;
let max_responses = req.max_responses();
let max_responses =
req.max_responses(self.fork_context.current_fork(), &self.fork_context.spec);

// store requests that expect responses
if max_responses > 0 {
@@ -924,7 +925,8 @@ where
}

// add the stream to substreams if we expect a response, otherwise drop the stream.
let max_responses = request.max_responses();
let max_responses =
request.max_responses(self.fork_context.current_fork(), &self.fork_context.spec);
if max_responses > 0 {
let max_remaining_chunks = if request.expect_exactly_one_response() {
// Currently enforced only for multiple responses
31 changes: 4 additions & 27 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
@@ -15,6 +15,7 @@ use strum::IntoStaticStr;
use superstruct::superstruct;
use types::blob_sidecar::BlobIdentifier;
use types::light_client_update::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
use types::ForkName;
use types::{
blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar,
Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
@@ -25,13 +26,6 @@ use types::{
pub type MaxErrorLen = U256;
pub const MAX_ERROR_LEN: u64 = 256;

/// The max number of blobs we expect in the configs to set for compile time params.
/// Note: This value is an estimate that we should use only for rate limiting,
/// bounds checking and other non-consensus critical operations.
///
/// For exact value, we should always check the chainspec.
pub const MAX_BLOBS_PER_BLOCK_CEILING: u64 = 16;

/// Wrapper over SSZ List to represent error message in rpc responses.
#[derive(Debug, Clone)]
pub struct ErrorType(pub VariableList<u8, MaxErrorLen>);
@@ -334,13 +328,9 @@ pub struct BlobsByRangeRequest {
}

impl BlobsByRangeRequest {
/// This function provides an upper bound on number of blobs expected in
/// a certain slot range.
///
/// Note: **must not** use for anything consensus critical, only for
/// bounds checking and rate limiting.
pub fn max_blobs_requested<E: EthSpec>(&self) -> u64 {
self.count.saturating_mul(MAX_BLOBS_PER_BLOCK_CEILING)
pub fn max_blobs_requested(&self, current_fork: ForkName, spec: &ChainSpec) -> u64 {
let max_blobs_per_block = spec.max_blobs_per_block_by_fork(current_fork);
self.count.saturating_mul(max_blobs_per_block)
}
}

@@ -863,16 +853,3 @@ impl slog::KV for StatusMessage {
slog::Result::Ok(())
}
}

#[cfg(test)]
mod test {
use super::*;
use types::{ForkName, MainnetEthSpec};

#[test]
fn max_blobs_per_block_ceiling() {
let spec = MainnetEthSpec::default_spec();
let latest_fork = ForkName::latest();
assert!(spec.max_blobs_per_block_by_fork(latest_fork) <= MAX_BLOBS_PER_BLOCK_CEILING);
}
}
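With MAX_BLOBS_PER_BLOCK_CEILING removed, max_blobs_requested now scales with whichever limit the active fork defines. A rough worked sketch of the bound, using assumed mainnet-style values (6 blobs per block at Deneb, 9 at Electra) purely for illustration rather than a quoted spec:

// Assumed per-fork limits for illustration; the real lookup is
// ChainSpec::max_blobs_per_block_by_fork.
fn max_blobs_per_block(fork: &str) -> u64 {
    match fork {
        "electra" => 9,
        _ => 6, // deneb-style default
    }
}

// Same shape as BlobsByRangeRequest::max_blobs_requested: a saturating upper
// bound used only for rate limiting and bounds checking, never for consensus.
fn max_blobs_requested(slot_count: u64, fork: &str) -> u64 {
    slot_count.saturating_mul(max_blobs_per_block(fork))
}

fn main() {
    // A 32-slot range can return at most 192 blobs under Deneb and 288 under Electra.
    assert_eq!(max_blobs_requested(32, "deneb"), 192);
    assert_eq!(max_blobs_requested(32, "electra"), 288);
}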
5 changes: 3 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
@@ -181,12 +181,13 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {

let inbound_limiter = inbound_rate_limiter_config.map(|config| {
debug!(log, "Using inbound rate limiting params"; "config" => ?config);
RateLimiter::new_with_config(config.0)
RateLimiter::new_with_config(config.0, fork_context.clone())
.expect("Inbound limiter configuration parameters are valid")
});

let self_limiter = outbound_rate_limiter_config.map(|config| {
SelfRateLimiter::new(config, log.clone()).expect("Configuration parameters are valid")
SelfRateLimiter::new(config, fork_context.clone(), log.clone())
.expect("Configuration parameters are valid")
});

RPC {
20 changes: 12 additions & 8 deletions beacon_node/lighthouse_network/src/rpc/protocol.rs
@@ -93,15 +93,15 @@ pub static SIGNED_BEACON_BLOCK_DENEB_MAX: LazyLock<usize> = LazyLock::new(|| {
*SIGNED_BEACON_BLOCK_CAPELLA_MAX_WITHOUT_PAYLOAD
+ types::ExecutionPayload::<MainnetEthSpec>::max_execution_payload_deneb_size() // adding max size of execution payload (~16gb)
+ ssz::BYTES_PER_LENGTH_OFFSET // Adding the additional offsets for the `ExecutionPayload`
+ (<types::KzgCommitment as Encode>::ssz_fixed_len() * MAX_BLOBS_PER_BLOCK_CEILING as usize)
+ (<types::KzgCommitment as Encode>::ssz_fixed_len() * MainnetEthSpec::default_spec().max_blobs_per_block_by_fork(ForkName::Deneb) as usize)
+ ssz::BYTES_PER_LENGTH_OFFSET
}); // Length offset for the blob commitments field.
//
pub static SIGNED_BEACON_BLOCK_ELECTRA_MAX: LazyLock<usize> = LazyLock::new(|| {
*SIGNED_BEACON_BLOCK_ELECTRA_MAX_WITHOUT_PAYLOAD
+ types::ExecutionPayload::<MainnetEthSpec>::max_execution_payload_electra_size() // adding max size of execution payload (~16gb)
+ ssz::BYTES_PER_LENGTH_OFFSET // Adding the additional ssz offset for the `ExecutionPayload` field
+ (<types::KzgCommitment as Encode>::ssz_fixed_len() * MAX_BLOBS_PER_BLOCK_CEILING as usize)
+ (<types::KzgCommitment as Encode>::ssz_fixed_len() * MainnetEthSpec::default_spec().max_blobs_per_block_by_fork(ForkName::Electra) as usize)
+ ssz::BYTES_PER_LENGTH_OFFSET
}); // Length offset for the blob commitments field.

@@ -603,8 +603,10 @@ impl ProtocolId {
Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
Protocol::BlobsByRange => rpc_blob_limits::<E>(),
Protocol::BlobsByRoot => rpc_blob_limits::<E>(),
Protocol::DataColumnsByRoot => rpc_data_column_limits::<E>(),
Protocol::DataColumnsByRange => rpc_data_column_limits::<E>(),
Protocol::DataColumnsByRoot => rpc_data_column_limits::<E>(fork_context.current_fork()),
Protocol::DataColumnsByRange => {
rpc_data_column_limits::<E>(fork_context.current_fork())
}
Protocol::Ping => RpcLimits::new(
<Ping as Encode>::ssz_fixed_len(),
<Ping as Encode>::ssz_fixed_len(),
@@ -684,10 +686,12 @@ pub fn rpc_blob_limits<E: EthSpec>() -> RpcLimits {
}
}

pub fn rpc_data_column_limits<E: EthSpec>() -> RpcLimits {
pub fn rpc_data_column_limits<E: EthSpec>(fork_name: ForkName) -> RpcLimits {
RpcLimits::new(
DataColumnSidecar::<E>::empty().as_ssz_bytes().len(),
DataColumnSidecar::<E>::max_size(MAX_BLOBS_PER_BLOCK_CEILING as usize),
DataColumnSidecar::<E>::max_size(
E::default_spec().max_blobs_per_block_by_fork(fork_name) as usize
),
)
}

@@ -786,13 +790,13 @@ impl<E: EthSpec> RequestType<E> {
/* These functions are used in the handler for stream management */

/// Maximum number of responses expected for this request.
pub fn max_responses(&self) -> u64 {
pub fn max_responses(&self, current_fork: ForkName, spec: &ChainSpec) -> u64 {
match self {
RequestType::Status(_) => 1,
RequestType::Goodbye(_) => 0,
RequestType::BlocksByRange(req) => *req.count(),
RequestType::BlocksByRoot(req) => req.block_roots().len() as u64,
RequestType::BlobsByRange(req) => req.max_blobs_requested::<E>(),
RequestType::BlobsByRange(req) => req.max_blobs_requested(current_fork, spec),
RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64,
RequestType::DataColumnsByRoot(req) => req.data_column_ids.len() as u64,
RequestType::DataColumnsByRange(req) => req.max_requested::<E>(),
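The same per-fork limit now feeds the worst-case SSZ sizes above: the blob-commitment term in the SIGNED_BEACON_BLOCK_*_MAX constants is the fixed KZG commitment size times the fork's blob cap, plus one length offset. A small sketch of that term, assuming the standard 48-byte compressed commitment and illustrative caps of 6 and 9:

// A KZG commitment is a 48-byte compressed BLS12-381 G1 point in SSZ.
const KZG_COMMITMENT_SSZ_LEN: usize = 48;
// SSZ uses a 4-byte offset for each variable-length field.
const BYTES_PER_LENGTH_OFFSET: usize = 4;

// Worst-case bytes contributed by the blob_kzg_commitments list for a given
// per-fork blob cap (the cap values below are assumptions, not read from a spec).
fn blob_commitments_max_size(max_blobs_per_block: usize) -> usize {
    KZG_COMMITMENT_SSZ_LEN * max_blobs_per_block + BYTES_PER_LENGTH_OFFSET
}

fn main() {
    assert_eq!(blob_commitments_max_size(6), 292); // Deneb-style cap
    assert_eq!(blob_commitments_max_size(9), 436); // Electra-style cap
}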
24 changes: 16 additions & 8 deletions beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
@@ -6,10 +6,11 @@ use serde::{Deserialize, Serialize};
use std::future::Future;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::time::Interval;
use types::EthSpec;
use types::{ChainSpec, EthSpec, ForkName, ForkContext};

/// Nanoseconds since a given time.
// Maintained as u64 to reduce footprint
@@ -109,6 +110,7 @@ pub struct RPCRateLimiter {
lc_finality_update_rl: Limiter<PeerId>,
/// LightClientUpdatesByRange rate limiter.
lc_updates_by_range_rl: Limiter<PeerId>,
fork_context: Arc<ForkContext>,
}

/// Error type for non conformant requests
@@ -176,7 +178,7 @@ impl RPCRateLimiterBuilder {
self
}

pub fn build(self) -> Result<RPCRateLimiter, &'static str> {
pub fn build(self, fork_context: Arc<ForkContext>) -> Result<RPCRateLimiter, &'static str> {
// get our quotas
let ping_quota = self.ping_quota.ok_or("Ping quota not specified")?;
let metadata_quota = self.metadata_quota.ok_or("MetaData quota not specified")?;
@@ -253,27 +255,31 @@ impl RPCRateLimiterBuilder {
lc_finality_update_rl,
lc_updates_by_range_rl,
init_time: Instant::now(),
fork_context,
})
}
}

pub trait RateLimiterItem {
fn protocol(&self) -> Protocol;
fn max_responses(&self) -> u64;
fn max_responses(&self, current_fork: ForkName, spec: &ChainSpec) -> u64;
}

impl<E: EthSpec> RateLimiterItem for super::RequestType<E> {
fn protocol(&self) -> Protocol {
self.versioned_protocol().protocol()
}

fn max_responses(&self) -> u64 {
self.max_responses()
fn max_responses(&self, current_fork: ForkName, spec: &ChainSpec) -> u64 {
self.max_responses(current_fork, spec)
}
}

impl RPCRateLimiter {
pub fn new_with_config(config: RateLimiterConfig) -> Result<Self, &'static str> {
pub fn new_with_config(
config: RateLimiterConfig,
fork_context: Arc<ForkContext>,
) -> Result<Self, &'static str> {
// Destructure to make sure every configuration value is used.
let RateLimiterConfig {
ping_quota,
@@ -316,7 +322,7 @@ impl RPCRateLimiter {
Protocol::LightClientUpdatesByRange,
light_client_updates_by_range_quota,
)
.build()
.build(fork_context)
}

/// Get a builder instance.
@@ -330,7 +336,9 @@ impl RPCRateLimiter {
request: &Item,
) -> Result<(), RateLimitedErr> {
let time_since_start = self.init_time.elapsed();
let tokens = request.max_responses().max(1);
let tokens = request
.max_responses(self.fork_context.current_fork(), &self.fork_context.spec)
.max(1);

let check =
|limiter: &mut Limiter<PeerId>| limiter.allows(time_since_start, peer_id, tokens);
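Inside the limiter, the cost charged for an inbound request is its maximum response count for the current fork, floored at one token so that zero-response messages still consume quota. A stand-in sketch of that charge (the Quota and Limiter machinery is omitted, and the numbers are illustrative):

// Stand-in for the token charge in RPCRateLimiter::allows: one token per
// possible response, with a minimum of one.
fn tokens_for_request(max_responses: u64) -> u64 {
    max_responses.max(1)
}

fn main() {
    // Goodbye expects no response but still costs one token.
    assert_eq!(tokens_for_request(0), 1);
    // A BlobsByRange request over 32 slots with a 9-blob cap costs 288 tokens.
    assert_eq!(tokens_for_request(32 * 9), 288);
}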
20 changes: 15 additions & 5 deletions beacon_node/lighthouse_network/src/rpc/self_limiter.rs
@@ -1,5 +1,6 @@
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
sync::Arc,
task::{Context, Poll},
time::Duration,
};
@@ -9,7 +10,7 @@ use libp2p::{swarm::NotifyHandler, PeerId};
use slog::{crit, debug, Logger};
use smallvec::SmallVec;
use tokio_util::time::DelayQueue;
use types::EthSpec;
use types::{EthSpec, ForkContext};

use super::{
config::OutboundRateLimiterConfig,
@@ -50,9 +51,13 @@ pub enum Error {

impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
/// Creates a new [`SelfRateLimiter`] based on configuration values.
pub fn new(config: OutboundRateLimiterConfig, log: Logger) -> Result<Self, &'static str> {
pub fn new(
config: OutboundRateLimiterConfig,
fork_context: Arc<ForkContext>,
log: Logger,
) -> Result<Self, &'static str> {
debug!(log, "Using self rate limiting params"; "config" => ?config);
let limiter = RateLimiter::new_with_config(config.0)?;
let limiter = RateLimiter::new_with_config(config.0, fork_context)?;

Ok(SelfRateLimiter {
delayed_requests: Default::default(),
@@ -215,7 +220,7 @@ mod tests {
use crate::service::api_types::{AppRequestId, RequestId, SyncRequestId};
use libp2p::PeerId;
use std::time::Duration;
use types::MainnetEthSpec;
use types::{EthSpec, ForkContext, Hash256, MainnetEthSpec, Slot};

/// Test that `next_peer_request_ready` correctly maintains the queue.
#[tokio::test]
@@ -225,8 +230,13 @@
ping_quota: Quota::n_every(1, 2),
..Default::default()
});
let fork_context = std::sync::Arc::new(ForkContext::new::<MainnetEthSpec>(
Slot::new(0),
Hash256::ZERO,
&MainnetEthSpec::default_spec(),
));
let mut limiter: SelfRateLimiter<RequestId, MainnetEthSpec> =
SelfRateLimiter::new(config, log).unwrap();
SelfRateLimiter::new(config, fork_context, log).unwrap();
let peer_id = PeerId::random();

for i in 1..=5u32 {
@@ -890,14 +890,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"start_slot" => req.start_slot,
);

// Should not send more than max request blocks
if req.max_blobs_requested::<T::EthSpec>() > self.chain.spec.max_request_blob_sidecars {
return Err((
RpcErrorResponse::InvalidRequest,
"Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`",
));
}

let request_start_slot = Slot::from(req.start_slot);

let data_availability_boundary_slot = match self.chain.data_availability_boundary() {