Skip to content

Commit

Permalink
Fork aware max values in rpc (#6847)
Browse files Browse the repository at this point in the history
N/A


  In #6329 we changed `max_blobs_per_block` from a preset to a config value.
We weren't using the right value based on fork in that PR. This is a follow up PR to use the fork dependent values.

In the proces, I also updated other places where we weren't using fork dependent values from the ChainSpec.

Note to reviewer: easier to go through by commit
  • Loading branch information
pawanjay176 authored Jan 29, 2025
1 parent e7ea696 commit 4a07c08
Show file tree
Hide file tree
Showing 16 changed files with 203 additions and 114 deletions.
61 changes: 30 additions & 31 deletions beacon_node/lighthouse_network/src/rpc/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,40 +576,26 @@ fn handle_rpc_request<E: EthSpec>(
BlocksByRootRequest::V2(BlocksByRootRequestV2 {
block_roots: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_blocks as usize,
spec.max_request_blocks(current_fork),
)?,
}),
))),
SupportedProtocol::BlocksByRootV1 => Ok(Some(RequestType::BlocksByRoot(
BlocksByRootRequest::V1(BlocksByRootRequestV1 {
block_roots: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_blocks as usize,
spec.max_request_blocks(current_fork),
)?,
}),
))),
SupportedProtocol::BlobsByRangeV1 => {
let req = BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?;
let max_requested_blobs = req
.count
.saturating_mul(spec.max_blobs_per_block_by_fork(current_fork));
// TODO(pawan): change this to max_blobs_per_rpc_request in the alpha10 PR
if max_requested_blobs > spec.max_request_blob_sidecars {
return Err(RPCError::ErrorResponse(
RpcErrorResponse::InvalidRequest,
format!(
"requested exceeded limit. allowed: {}, requested: {}",
spec.max_request_blob_sidecars, max_requested_blobs
),
));
}
Ok(Some(RequestType::BlobsByRange(req)))
}
SupportedProtocol::BlobsByRangeV1 => Ok(Some(RequestType::BlobsByRange(
BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?,
))),
SupportedProtocol::BlobsByRootV1 => {
Ok(Some(RequestType::BlobsByRoot(BlobsByRootRequest {
blob_ids: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_blob_sidecars as usize,
spec.max_request_blob_sidecars(current_fork),
)?,
})))
}
Expand Down Expand Up @@ -1097,21 +1083,21 @@ mod tests {
}
}

fn bbroot_request_v1(spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new_v1(vec![Hash256::zero()], spec)
fn bbroot_request_v1(fork_name: ForkName) -> BlocksByRootRequest {
BlocksByRootRequest::new_v1(vec![Hash256::zero()], &fork_context(fork_name))
}

fn bbroot_request_v2(spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![Hash256::zero()], spec)
fn bbroot_request_v2(fork_name: ForkName) -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![Hash256::zero()], &fork_context(fork_name))
}

fn blbroot_request(spec: &ChainSpec) -> BlobsByRootRequest {
fn blbroot_request(fork_name: ForkName) -> BlobsByRootRequest {
BlobsByRootRequest::new(
vec![BlobIdentifier {
block_root: Hash256::zero(),
index: 0,
}],
spec,
&fork_context(fork_name),
)
}

Expand Down Expand Up @@ -1909,29 +1895,42 @@ mod tests {

#[test]
fn test_encode_then_decode_request() {
let chain_spec = Spec::default_spec();
let fork_context = fork_context(ForkName::Electra);
let chain_spec = fork_context.spec.clone();

let requests: &[RequestType<Spec>] = &[
RequestType::Ping(ping_message()),
RequestType::Status(status_message()),
RequestType::Goodbye(GoodbyeReason::Fault),
RequestType::BlocksByRange(bbrange_request_v1()),
RequestType::BlocksByRange(bbrange_request_v2()),
RequestType::BlocksByRoot(bbroot_request_v1(&chain_spec)),
RequestType::BlocksByRoot(bbroot_request_v2(&chain_spec)),
RequestType::MetaData(MetadataRequest::new_v1()),
RequestType::BlobsByRange(blbrange_request()),
RequestType::BlobsByRoot(blbroot_request(&chain_spec)),
RequestType::DataColumnsByRange(dcbrange_request()),
RequestType::DataColumnsByRoot(dcbroot_request(&chain_spec)),
RequestType::MetaData(MetadataRequest::new_v2()),
];

for req in requests.iter() {
for fork_name in ForkName::list_all() {
encode_then_decode_request(req.clone(), fork_name, &chain_spec);
}
}

// Request types that have different length limits depending on the fork
// Handled separately to have consistent `ForkName` across request and responses
let fork_dependent_requests = |fork_name| {
[
RequestType::BlobsByRoot(blbroot_request(fork_name)),
RequestType::BlocksByRoot(bbroot_request_v1(fork_name)),
RequestType::BlocksByRoot(bbroot_request_v2(fork_name)),
]
};
for fork_name in ForkName::list_all() {
let requests = fork_dependent_requests(fork_name);
for req in requests {
encode_then_decode_request(req.clone(), fork_name, &chain_spec);
}
}
}

/// Test a malicious snappy encoding for a V1 `Status` message where the attacker
Expand Down
39 changes: 39 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,45 @@ where
}

let (req, substream) = substream;
let current_fork = self.fork_context.current_fork();
let spec = &self.fork_context.spec;

match &req {
RequestType::BlocksByRange(request) => {
let max_allowed = spec.max_request_blocks(current_fork) as u64;
if *request.count() > max_allowed {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto: Protocol::BlocksByRange,
error: RPCError::InvalidData(format!(
"requested exceeded limit. allowed: {}, requested: {}",
max_allowed,
request.count()
)),
}));
return self.shutdown(None);
}
}
RequestType::BlobsByRange(request) => {
let max_requested_blobs = request
.count
.saturating_mul(spec.max_blobs_per_block_by_fork(current_fork));
let max_allowed = spec.max_request_blob_sidecars(current_fork) as u64;
if max_requested_blobs > max_allowed {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto: Protocol::BlobsByRange,
error: RPCError::InvalidData(format!(
"requested exceeded limit. allowed: {}, requested: {}",
max_allowed, max_requested_blobs
)),
}));
return self.shutdown(None);
}
}
_ => {}
};

let max_responses =
req.max_responses(self.fork_context.current_fork(), &self.fork_context.spec);

Expand Down
26 changes: 16 additions & 10 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use strum::IntoStaticStr;
use superstruct::superstruct;
use types::blob_sidecar::BlobIdentifier;
use types::light_client_update::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
use types::ForkName;
use types::{
blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar,
Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList, SignedBeaconBlock, Slot,
};
use types::{ForkContext, ForkName};

/// Maximum length of error message.
pub type MaxErrorLen = U256;
Expand Down Expand Up @@ -420,15 +420,19 @@ pub struct BlocksByRootRequest {
}

impl BlocksByRootRequest {
pub fn new(block_roots: Vec<Hash256>, spec: &ChainSpec) -> Self {
let block_roots =
RuntimeVariableList::from_vec(block_roots, spec.max_request_blocks as usize);
pub fn new(block_roots: Vec<Hash256>, fork_context: &ForkContext) -> Self {
let max_request_blocks = fork_context
.spec
.max_request_blocks(fork_context.current_fork());
let block_roots = RuntimeVariableList::from_vec(block_roots, max_request_blocks);
Self::V2(BlocksByRootRequestV2 { block_roots })
}

pub fn new_v1(block_roots: Vec<Hash256>, spec: &ChainSpec) -> Self {
let block_roots =
RuntimeVariableList::from_vec(block_roots, spec.max_request_blocks as usize);
pub fn new_v1(block_roots: Vec<Hash256>, fork_context: &ForkContext) -> Self {
let max_request_blocks = fork_context
.spec
.max_request_blocks(fork_context.current_fork());
let block_roots = RuntimeVariableList::from_vec(block_roots, max_request_blocks);
Self::V1(BlocksByRootRequestV1 { block_roots })
}
}
Expand All @@ -441,9 +445,11 @@ pub struct BlobsByRootRequest {
}

impl BlobsByRootRequest {
pub fn new(blob_ids: Vec<BlobIdentifier>, spec: &ChainSpec) -> Self {
let blob_ids =
RuntimeVariableList::from_vec(blob_ids, spec.max_request_blob_sidecars as usize);
pub fn new(blob_ids: Vec<BlobIdentifier>, fork_context: &ForkContext) -> Self {
let max_request_blob_sidecars = fork_context
.spec
.max_request_blob_sidecars(fork_context.current_fork());
let blob_ids = RuntimeVariableList::from_vec(blob_ids, max_request_blob_sidecars);
Self { blob_ids }
}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl<E: EthSpec> Network<E> {

let max_topics = ctx.chain_spec.attestation_subnet_count as usize
+ SYNC_COMMITTEE_SUBNET_COUNT as usize
+ ctx.chain_spec.blob_sidecar_subnet_count_electra as usize
+ ctx.chain_spec.blob_sidecar_subnet_count_max() as usize
+ ctx.chain_spec.data_column_sidecar_subnet_count as usize
+ BASE_CORE_TOPICS.len()
+ ALTAIR_CORE_TOPICS.len()
Expand Down
6 changes: 1 addition & 5 deletions beacon_node/lighthouse_network/src/service/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,7 @@ pub(crate) fn create_whitelist_filter(
for id in 0..sync_committee_subnet_count {
add(SyncCommitteeMessage(SyncSubnetId::new(id)));
}
let blob_subnet_count = if spec.electra_fork_epoch.is_some() {
spec.blob_sidecar_subnet_count_electra
} else {
spec.blob_sidecar_subnet_count
};
let blob_subnet_count = spec.blob_sidecar_subnet_count_max();
for id in 0..blob_subnet_count {
add(BlobSidecar(id));
}
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/types/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ pub fn fork_core_topics<E: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> V
ForkName::Deneb => {
// All of deneb blob topics are core topics
let mut deneb_blob_topics = Vec::new();
for i in 0..spec.blob_sidecar_subnet_count {
for i in 0..spec.blob_sidecar_subnet_count(ForkName::Deneb) {
deneb_blob_topics.push(GossipKind::BlobSidecar(i));
}
deneb_blob_topics
}
ForkName::Electra => {
// All of electra blob topics are core topics
let mut electra_blob_topics = Vec::new();
for i in 0..spec.blob_sidecar_subnet_count_electra {
for i in 0..spec.blob_sidecar_subnet_count(ForkName::Electra) {
electra_blob_topics.push(GossipKind::BlobSidecar(i));
}
electra_blob_topics
Expand Down
60 changes: 33 additions & 27 deletions beacon_node/lighthouse_network/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::time::sleep;
use types::{
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BlobSidecar, ChainSpec,
EmptyBlock, Epoch, EthSpec, FixedBytesExtended, ForkContext, ForkName, Hash256, MinimalEthSpec,
Signature, SignedBeaconBlock, Slot,
RuntimeVariableList, Signature, SignedBeaconBlock, Slot,
};

type E = MinimalEthSpec;
Expand Down Expand Up @@ -810,17 +810,20 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
.await;

// BlocksByRoot Request
let rpc_request = RequestType::BlocksByRoot(BlocksByRootRequest::new(
vec![
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
],
&spec,
));
let rpc_request =
RequestType::BlocksByRoot(BlocksByRootRequest::V2(BlocksByRootRequestV2 {
block_roots: RuntimeVariableList::from_vec(
vec![
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
],
spec.max_request_blocks_upper_bound(),
),
}));

// BlocksByRoot Response
let full_block = BeaconBlock::Base(BeaconBlockBase::<E>::full(&spec));
Expand Down Expand Up @@ -953,21 +956,24 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
.await;

// BlocksByRoot Request
let rpc_request = RequestType::BlocksByRoot(BlocksByRootRequest::new(
vec![
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
],
&spec,
));
let rpc_request =
RequestType::BlocksByRoot(BlocksByRootRequest::V2(BlocksByRootRequestV2 {
block_roots: RuntimeVariableList::from_vec(
vec![
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
Hash256::zero(),
],
spec.max_request_blocks_upper_bound(),
),
}));

// BlocksByRoot Response
let full_block = BeaconBlock::Base(BeaconBlockBase::<E>::full(&spec));
Expand Down
18 changes: 0 additions & 18 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,24 +659,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"start_slot" => req.start_slot(),
);

// Should not send more than max request blocks
let max_request_size =
self.chain
.epoch()
.map_or(self.chain.spec.max_request_blocks, |epoch| {
if self.chain.spec.fork_name_at_epoch(epoch).deneb_enabled() {
self.chain.spec.max_request_blocks_deneb
} else {
self.chain.spec.max_request_blocks
}
});
if *req.count() > max_request_size {
return Err((
RpcErrorResponse::InvalidRequest,
"Request exceeded max size",
));
}

let forwards_block_root_iter = match self
.chain
.forwards_iter_block_roots(Slot::from(*req.start_slot()))
Expand Down
4 changes: 3 additions & 1 deletion beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock};
use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock};

/// Handles messages from the network and routes them to the appropriate service to be handled.
pub struct Router<T: BeaconChainTypes> {
Expand Down Expand Up @@ -90,6 +90,7 @@ impl<T: BeaconChainTypes> Router<T> {
invalid_block_storage: InvalidBlockStorage,
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
beacon_processor_reprocess_tx: mpsc::Sender<ReprocessQueueMessage>,
fork_context: Arc<ForkContext>,
log: slog::Logger,
) -> Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>, String> {
let message_handler_log = log.new(o!("service"=> "router"));
Expand Down Expand Up @@ -122,6 +123,7 @@ impl<T: BeaconChainTypes> Router<T> {
network_send.clone(),
network_beacon_processor.clone(),
sync_recv,
fork_context,
sync_logger,
);

Expand Down
Loading

0 comments on commit 4a07c08

Please sign in to comment.