diff --git a/portal-bridge/src/beacon_bridge.rs b/portal-bridge/src/beacon_bridge.rs index 1d43a34f5..8ce2f54c8 100644 --- a/portal-bridge/src/beacon_bridge.rs +++ b/portal-bridge/src/beacon_bridge.rs @@ -1,10 +1,22 @@ +use std::cmp::Ordering; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use std::time::SystemTime; + +use anyhow::bail; +use jsonrpsee::http_client::HttpClient; +use serde_json::Value; +use ssz_types::VariableList; +use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; +use tracing::{info, warn}; + use crate::consensus_api::ConsensusApi; use crate::constants::BEACON_GENESIS_TIME; use crate::mode::BridgeMode; +use crate::stats::{BeaconSlotStats, StatsReporter}; use crate::utils::{ duration_until_next_update, expected_current_slot, read_test_assets_from_file, TestAssets, }; -use anyhow::bail; use ethportal_api::types::consensus::fork::ForkName; use ethportal_api::types::consensus::light_client::bootstrap::LightClientBootstrapCapella; use ethportal_api::types::consensus::light_client::finality_update::LightClientFinalityUpdateCapella; @@ -12,6 +24,9 @@ use ethportal_api::types::consensus::light_client::optimistic_update::LightClien use ethportal_api::types::consensus::light_client::update::{ LightClientUpdate, LightClientUpdateCapella, }; +use ethportal_api::types::content_key::beacon::{ + LightClientFinalityUpdateKey, LightClientOptimisticUpdateKey, +}; use ethportal_api::types::content_value::beacon::{ ForkVersionedLightClientUpdate, LightClientUpdatesByRange, }; @@ -20,19 +35,6 @@ use ethportal_api::BeaconNetworkApiClient; use ethportal_api::{ BeaconContentKey, BeaconContentValue, LightClientBootstrapKey, LightClientUpdatesByRangeKey, }; -use jsonrpsee::http_client::HttpClient; -use serde_json::Value; -use ssz_types::VariableList; -use std::cmp::Ordering; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::SystemTime; -use tracing::{info, warn}; - -use ethportal_api::types::content_key::beacon::{ - LightClientFinalityUpdateKey, LightClientOptimisticUpdateKey, -}; -use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; pub struct BeaconBridge { pub api: ConsensusApi, @@ -67,11 +69,14 @@ impl BeaconBridge { .into_beacon_assets() .expect("Error parsing beacon test assets."); + // test files have no slot number data, so report all gossiped content at height 0. + let slot_stats = Arc::new(Mutex::new(BeaconSlotStats::new(0))); for asset in assets.0.into_iter() { BeaconBridge::gossip_beacon_content( Arc::clone(&self.portal_clients), asset.content_key, asset.content_value, + slot_stats.clone(), ) .await .expect("Error serving beacon data in test mode."); @@ -135,12 +140,15 @@ impl BeaconBridge { // Serve LightClientBootstrap data let api_clone = api.clone(); let portal_clients_clone = Arc::clone(&portal_clients); + let slot_stats = Arc::new(Mutex::new(BeaconSlotStats::new(finalized_slot))); + let slot_stats_clone = slot_stats.clone(); let bootstrap_result = tokio::spawn(async move { Self::serve_light_client_bootstrap( api_clone, portal_clients_clone, &finalized_block_root, + slot_stats_clone, ) .await .or_else(|err| { @@ -154,24 +162,32 @@ impl BeaconBridge { let api_clone = api.clone(); let portal_clients_clone = Arc::clone(&portal_clients); + let slot_stats_clone = slot_stats.clone(); let update_result = tokio::spawn(async move { - Self::serve_light_client_update(api_clone, portal_clients_clone, current_period) - .await - .or_else(|err| { - warn!("Failed to serve light client update: {err}"); - Ok::(current_period) - }) - .expect("always return the original or new period") + Self::serve_light_client_update( + api_clone, + portal_clients_clone, + current_period, + slot_stats_clone, + ) + .await + .or_else(|err| { + warn!("Failed to serve light client update: {err}"); + Ok::(current_period) + }) + .expect("always return the original or new period") }); // Serve `LightClientFinalityUpdate` data let api_clone = api.clone(); let portal_clients_clone = Arc::clone(&portal_clients); + let slot_stats_clone = slot_stats.clone(); let finalized_slot = tokio::spawn(async move { Self::serve_light_client_finality_update( api_clone, portal_clients_clone, finalized_slot, + slot_stats_clone, ) .await .or_else(|err| { @@ -182,8 +198,11 @@ impl BeaconBridge { }); // Serve `LightClientOptimisticUpdate` data - tokio::spawn(async move { - if let Err(err) = Self::serve_light_client_optimistic_update(api, portal_clients).await + let slot_stats_clone = slot_stats.clone(); + let optimistic_update = tokio::spawn(async move { + if let Err(err) = + Self::serve_light_client_optimistic_update(api, portal_clients, slot_stats_clone) + .await { warn!("Failed to serve light client optimistic update: {err}"); } @@ -196,6 +215,14 @@ impl BeaconBridge { let finalized_slot = finalized_slot .await .expect("finality update task is never cancelled"); + optimistic_update + .await + .expect("optimistic update task is never cancelled"); + if let Ok(stats) = slot_stats.lock() { + stats.report(); + } else { + warn!("Error displaying beacon gossip stats. Unable to acquire lock."); + }; (new_period, new_finalized_block_root, finalized_slot) } @@ -204,6 +231,7 @@ impl BeaconBridge { api: ConsensusApi, portal_clients: Arc>, finalized_block_root: &str, + slot_stats: Arc>, ) -> anyhow::Result { let response = api.get_beacon_block_root("finalized".to_owned()).await?; let response: Value = serde_json::from_str(&response)?; @@ -234,7 +262,7 @@ impl BeaconBridge { }); // Return the latest finalized block root if we successfully gossiped the latest bootstrap. - Self::gossip_beacon_content(portal_clients, content_key, content_value) + Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats) .await .map(|_| latest_finalized_block_root) } @@ -243,6 +271,7 @@ impl BeaconBridge { api: ConsensusApi, portal_clients: Arc>, current_period: u64, + slot_stats: Arc>, ) -> anyhow::Result { let now = SystemTime::now(); let expected_current_period = expected_current_slot(BEACON_GENESIS_TIME, now) / (32 * 256); @@ -282,7 +311,7 @@ impl BeaconBridge { ); // Update the current known period if we successfully gossiped the latest data. - Self::gossip_beacon_content(portal_clients, content_key, content_value).await?; + Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?; Ok(expected_current_period) } @@ -290,6 +319,7 @@ impl BeaconBridge { async fn serve_light_client_optimistic_update( api: ConsensusApi, portal_clients: Arc>, + slot_stats: Arc>, ) -> anyhow::Result<()> { let data = api.get_lc_optimistic_update().await?; let update: Value = serde_json::from_str(&data)?; @@ -304,14 +334,14 @@ impl BeaconBridge { LightClientOptimisticUpdateKey::new(update.signature_slot), ); let content_value = BeaconContentValue::LightClientOptimisticUpdate(update.into()); - - Self::gossip_beacon_content(portal_clients, content_key, content_value).await + Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await } async fn serve_light_client_finality_update( api: ConsensusApi, portal_clients: Arc>, finalized_slot: u64, + slot_stats: Arc>, ) -> anyhow::Result { let data = api.get_lc_finality_update().await?; let update: Value = serde_json::from_str(&data)?; @@ -343,7 +373,7 @@ impl BeaconBridge { ); let content_value = BeaconContentValue::LightClientFinalityUpdate(update.into()); - Self::gossip_beacon_content(portal_clients, content_key, content_value).await?; + Self::gossip_beacon_content(portal_clients, content_key, content_value, slot_stats).await?; Ok(new_finalized_slot) } @@ -353,11 +383,19 @@ impl BeaconBridge { portal_clients: Arc>, content_key: BeaconContentKey, content_value: BeaconContentValue, + slot_stats: Arc>, ) -> anyhow::Result<()> { + let mut results = vec![]; for client in portal_clients.as_ref() { - client - .gossip(content_key.clone(), content_value.clone()) - .await?; + let result = client + .trace_gossip(content_key.clone(), content_value.clone()) + .await; + results.push(result); + } + if let Ok(mut data) = slot_stats.lock() { + data.update(content_key, results.into()); + } else { + warn!("Error updating beacon gossip stats. Unable to acquire lock."); } Ok(()) } diff --git a/portal-bridge/src/bridge.rs b/portal-bridge/src/bridge.rs index bfa076ded..73bfaddf3 100644 --- a/portal-bridge/src/bridge.rs +++ b/portal-bridge/src/bridge.rs @@ -1,40 +1,43 @@ +use std::fs; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +use anyhow::{anyhow, bail}; +use futures::stream::StreamExt; +use ssz::Decode; +use surf::{ + middleware::{Middleware, Next}, + Body, Client, Request, Response, +}; +use tokio::time::{sleep, Duration}; +use tracing::{debug, info, warn}; + use crate::execution_api::ExecutionApi; use crate::full_header::FullHeader; use crate::mode::{BridgeMode, ModeType}; +use crate::stats::{HistoryBlockStats, StatsReporter}; use crate::utils::{read_test_assets_from_file, TestAssets}; -use anyhow::{anyhow, bail}; use ethportal_api::jsonrpsee::http_client::HttpClient; -use ethportal_api::types::execution::accumulator::EpochAccumulator; -use ethportal_api::types::execution::block_body::{ - BlockBody, BlockBodyLegacy, BlockBodyMerge, BlockBodyShanghai, MERGE_TIMESTAMP, - SHANGHAI_TIMESTAMP, -}; -use ethportal_api::types::execution::header::{ - AccumulatorProof, BlockHeaderProof, Header, HeaderWithProof, SszNone, +use ethportal_api::types::execution::{ + accumulator::EpochAccumulator, + block_body::{ + BlockBody, BlockBodyLegacy, BlockBodyMerge, BlockBodyShanghai, MERGE_TIMESTAMP, + SHANGHAI_TIMESTAMP, + }, + header::{AccumulatorProof, BlockHeaderProof, Header, HeaderWithProof, SszNone}, + receipts::Receipts, }; -use ethportal_api::types::execution::receipts::Receipts; use ethportal_api::utils::bytes::hex_encode; -use ethportal_api::HistoryContentValue; -use ethportal_api::HistoryNetworkApiClient; use ethportal_api::{ BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, EpochAccumulatorKey, HistoryContentKey, + HistoryContentValue, HistoryNetworkApiClient, }; -use futures::stream::StreamExt; -use ssz::Decode; -use std::fs; -use std::ops::Range; -use std::path::PathBuf; -use std::sync::{Arc, Mutex}; -use std::time; -use surf::{ - middleware::{Middleware, Next}, - Body, Client, Request, Response, +use trin_validation::{ + accumulator::MasterAccumulator, + constants::{EPOCH_SIZE as EPOCH_SIZE_USIZE, MERGE_BLOCK_NUMBER}, + oracle::HeaderOracle, }; -use tokio::time::{sleep, Duration}; -use tracing::{debug, info, warn}; -use trin_validation::accumulator::MasterAccumulator; -use trin_validation::constants::{EPOCH_SIZE as EPOCH_SIZE_USIZE, MERGE_BLOCK_NUMBER}; -use trin_validation::oracle::HeaderOracle; // todo: calculate / test optimal saturation delay const HEADER_SATURATION_DELAY: u64 = 10; // seconds @@ -85,14 +88,16 @@ impl Bridge { .into_history_assets() .expect("Error parsing history test assets."); + // test files have no block number data, so we report all gossiped content at height 0. + let block_stats = Arc::new(Mutex::new(HistoryBlockStats::new(0))); for asset in assets.0.into_iter() { Bridge::gossip_content( &self.portal_clients, asset.content_key.clone(), asset.content_value, + block_stats.clone(), ) - .await - .expect("Error serving block range in test mode."); + .await; if let HistoryContentKey::BlockHeaderWithProof(_) = asset.content_key { sleep(Duration::from_millis(50)).await; } @@ -120,9 +125,8 @@ impl Bridge { end: latest_block + 1, }; info!("Discovered new blocks to gossip: {gossip_range:?}"); - let gossip_stats = Arc::new(Mutex::new(GossipStats::new(gossip_range.clone()))); let epoch_acc = None; - self.serve(gossip_range.clone(), epoch_acc, gossip_stats.clone()) + self.serve(gossip_range.clone(), epoch_acc) .await .expect("Error serving block range in latest mode."); block_index = gossip_range.end; @@ -170,7 +174,6 @@ impl Bridge { start: start_block, end: end_block, }; - let gossip_stats = Arc::new(Mutex::new(GossipStats::new(gossip_range.clone()))); while epoch_index <= current_epoch { // Using epoch_size chunks & epoch boundaries ensures that every // "chunk" shares an epoch accumulator avoiding the need to @@ -187,14 +190,9 @@ impl Bridge { None }; info!("fetching headers in range: {gossip_range:?}"); - self.serve(gossip_range, epoch_acc, gossip_stats.clone()) + self.serve(gossip_range, epoch_acc) .await .expect("Error serving headers in backfill mode."); - if let Ok(gossip_stats) = gossip_stats.lock() { - gossip_stats.display_stats(); - } else { - warn!("Error displaying gossip stats. Unable to acquire lock."); - } if !looped { break; } @@ -213,25 +211,18 @@ impl Bridge { &self, gossip_range: Range, epoch_acc: Option>, - gossip_stats: Arc>, ) -> anyhow::Result<()> { let futures = futures::stream::iter(gossip_range.into_iter().map(|height| { let epoch_acc = epoch_acc.clone(); - let gossip_stats = gossip_stats.clone(); async move { let _ = self - .serve_full_block(height, epoch_acc, gossip_stats, self.portal_clients.clone()) + .serve_full_block(height, epoch_acc, self.portal_clients.clone()) .await; } })) .buffer_unordered(FUTURES_BUFFER_SIZE) .collect::>(); futures.await; - if let Ok(gossip_stats) = gossip_stats.lock() { - gossip_stats.display_stats(); - } else { - warn!("Error displaying gossip stats. Unable to acquire lock."); - } Ok(()) } @@ -239,7 +230,6 @@ impl Bridge { &self, height: u64, epoch_acc: Option>, - gossip_stats: Arc>, portal_clients: Vec, ) -> anyhow::Result<()> { debug!("Serving block: {height}"); @@ -247,24 +237,32 @@ impl Bridge { if full_header.header.number <= MERGE_BLOCK_NUMBER { full_header.epoch_acc = epoch_acc; } - Bridge::gossip_header(&full_header, &portal_clients, &gossip_stats).await?; + let block_stats = Arc::new(Mutex::new(HistoryBlockStats::new( + full_header.header.number, + ))); + Bridge::gossip_header(&full_header, &portal_clients, block_stats.clone()).await?; // Sleep for 10 seconds to allow headers to saturate network, // since they must be available for body / receipt validation. sleep(Duration::from_secs(HEADER_SATURATION_DELAY)).await; - self.construct_and_gossip_block_body(&full_header, &portal_clients, &gossip_stats) + self.construct_and_gossip_block_body(&full_header, &portal_clients, block_stats.clone()) .await .map_err(|err| anyhow!("Error gossiping block body #{height:?}: {err:?}"))?; - self.construct_and_gossip_receipt(&full_header, &portal_clients, &gossip_stats) + self.construct_and_gossip_receipt(&full_header, &portal_clients, block_stats.clone()) .await .map_err(|err| anyhow!("Error gossiping receipt #{height:?}: {err:?}"))?; + if let Ok(stats) = block_stats.lock() { + stats.report(); + } else { + warn!("Error displaying history gossip stats. Unable to acquire lock."); + } Ok(()) } async fn gossip_header( full_header: &FullHeader, portal_clients: &Vec, - gossip_stats: &Arc>, + block_stats: Arc>, ) -> anyhow::Result<()> { debug!("Serving header: {}", full_header.header.number); if full_header.header.number < MERGE_BLOCK_NUMBER && full_header.epoch_acc.is_none() { @@ -305,15 +303,8 @@ impl Bridge { "Gossip: Block #{:?} HeaderWithProof", full_header.header.number ); - let result = Bridge::gossip_content(portal_clients, content_key, content_value).await; - if result.is_ok() { - if let Ok(mut data) = gossip_stats.lock() { - data.header_with_proof_count += 1; - } else { - warn!("Error updating gossip header with proof stats. Unable to acquire lock."); - } - } - result + Bridge::gossip_content(portal_clients, content_key, content_value, block_stats).await; + Ok(()) } /// Attempt to lookup an epoch accumulator from local portal-accumulators path provided via cli @@ -337,7 +328,15 @@ impl Bridge { // Gossip epoch acc to network if found locally let content_key = HistoryContentKey::EpochAccumulator(EpochAccumulatorKey { epoch_hash }); let content_value = HistoryContentValue::EpochAccumulator(local_epoch_acc.clone()); - let _ = Bridge::gossip_content(&self.portal_clients, content_key, content_value).await; + // create unique stats for epoch accumulator, since it's rarely gossiped + let block_stats = Arc::new(Mutex::new(HistoryBlockStats::new(epoch_index * EPOCH_SIZE))); + Bridge::gossip_content( + &self.portal_clients, + content_key, + content_value, + block_stats, + ) + .await; Ok(Arc::new(local_epoch_acc)) } @@ -345,7 +344,7 @@ impl Bridge { &self, full_header: &FullHeader, portal_clients: &Vec, - gossip_stats: &Arc>, + block_stats: Arc>, ) -> anyhow::Result<()> { debug!("Serving receipt: {:?}", full_header.header.number); let receipts = match full_header.txs.len() { @@ -372,22 +371,15 @@ impl Bridge { }); let content_value = HistoryContentValue::Receipts(receipts); debug!("Gossip: Block #{:?} Receipts", full_header.header.number,); - let result = Bridge::gossip_content(portal_clients, content_key, content_value).await; - if result.is_ok() { - if let Ok(mut data) = gossip_stats.lock() { - data.receipts_count += 1; - } else { - warn!("Error updating gossip receipts stats. Unable to acquire lock."); - } - } - result + Bridge::gossip_content(portal_clients, content_key, content_value, block_stats).await; + Ok(()) } async fn construct_and_gossip_block_body( &self, full_header: &FullHeader, portal_clients: &Vec, - gossip_stats: &Arc>, + block_stats: Arc>, ) -> anyhow::Result<()> { let txs = full_header.txs.clone(); let block_body = if full_header.header.timestamp > SHANGHAI_TIMESTAMP { @@ -422,15 +414,8 @@ impl Bridge { }); let content_value = HistoryContentValue::BlockBody(block_body); debug!("Gossip: Block #{:?} BlockBody", full_header.header.number); - let result = Bridge::gossip_content(portal_clients, content_key, content_value).await; - if result.is_ok() { - if let Ok(mut data) = gossip_stats.lock() { - data.bodies_count += 1; - } else { - warn!("Error updating gossip bodies stats. Unable to acquire lock."); - } - } - result + Bridge::gossip_content(portal_clients, content_key, content_value, block_stats).await; + Ok(()) } /// Create a proof for the given header / epoch acc @@ -448,43 +433,21 @@ impl Bridge { portal_clients: &Vec, content_key: HistoryContentKey, content_value: HistoryContentValue, - ) -> anyhow::Result<()> { + block_stats: Arc>, + ) { + let mut results = vec![]; for client in portal_clients { - client - .gossip(content_key.clone(), content_value.clone()) - .await?; + let result = client + .trace_gossip(content_key.clone(), content_value.clone()) + .await; + results.push(result); } - Ok(()) - } -} - -#[derive(Debug, Clone)] -pub struct GossipStats { - pub range: Range, - pub header_with_proof_count: u64, - pub bodies_count: u64, - pub receipts_count: u64, - pub start_time: time::Instant, -} - -impl GossipStats { - fn new(range: Range) -> Self { - Self { - range, - header_with_proof_count: 0, - bodies_count: 0, - receipts_count: 0, - start_time: time::Instant::now(), + if let Ok(mut data) = block_stats.lock() { + data.update(content_key, results.into()); + } else { + warn!("Error updating history gossip stats. Unable to acquire lock."); } } - - fn display_stats(&self) { - info!( - "Header Group: Range {:?} - Header with proof: {:?} - Bodies: {:?} - Receipts: {:?}", - self.range, self.header_with_proof_count, self.bodies_count, self.receipts_count - ); - info!("Group took: {:?}", time::Instant::now() - self.start_time); - } } #[derive(Debug)] diff --git a/portal-bridge/src/lib.rs b/portal-bridge/src/lib.rs index 0c3724392..e12309c01 100644 --- a/portal-bridge/src/lib.rs +++ b/portal-bridge/src/lib.rs @@ -10,6 +10,7 @@ pub mod execution_api; pub mod full_header; pub mod mode; pub mod pandaops; +pub mod stats; pub mod types; pub mod utils; diff --git a/portal-bridge/src/main.rs b/portal-bridge/src/main.rs index 84338ff10..85da90ccf 100644 --- a/portal-bridge/src/main.rs +++ b/portal-bridge/src/main.rs @@ -42,6 +42,8 @@ async fn main() -> Result<(), Box> { .iter() .map(|address| { HttpClientBuilder::default() + // increase default timeout to allow for trace_gossip requests that can take a long time + .request_timeout(Duration::from_secs(120)) .build(address) .map_err(|e| e.to_string()) }) diff --git a/portal-bridge/src/stats.rs b/portal-bridge/src/stats.rs new file mode 100644 index 000000000..de6dd6e1b --- /dev/null +++ b/portal-bridge/src/stats.rs @@ -0,0 +1,332 @@ +use std::str::FromStr; + +use tracing::{info, trace}; + +use ethportal_api::jsonrpsee::core::Error; +use ethportal_api::types::enr::Enr; +use ethportal_api::types::portal::TraceGossipInfo; +use ethportal_api::{BeaconContentKey, HistoryContentKey}; + +// Trait for tracking / reporting gossip stats +pub trait StatsReporter { + fn new(number: u64) -> Self; + + fn report(&self); + + fn update(&mut self, _content_key: TContentKey, _results: ContentStats) {} +} + +// Struct for tracking gossip stats per slot in beacon bridge +#[derive(Debug, Clone, Default)] +pub struct BeaconSlotStats { + pub slot_number: u64, + pub bootstrap: Option, + pub update: Option, + pub finality_update: Option, + pub optimistic_update: Option, +} + +impl StatsReporter for BeaconSlotStats { + fn new(slot_number: u64) -> Self { + Self { + slot_number, + ..Default::default() + } + } + + fn report(&self) { + if let Some(stats) = &self.bootstrap { + info!( + "GossipReport: slot#{} - bootstrap - {}", + self.slot_number, + stats.report() + ); + trace!( + "GossipReport: slot#{} - bootstrap - offered {:?}", + self.slot_number, + stats.offered + ); + trace!( + "GossipReport: slot#{} - bootstrap - accepted {:?}", + self.slot_number, + stats.accepted + ); + trace!( + "GossipReport: slot#{} - bootstrap - transferred {:?}", + self.slot_number, + stats.transferred + ); + } + if let Some(stats) = &self.update { + info!( + "GossipReport: slot#{} - update - {}", + self.slot_number, + stats.report() + ); + trace!( + "GossipReport: slot#{} - update - offered {:?}", + self.slot_number, + stats.offered + ); + trace!( + "GossipReport: slot#{} - update - accepted {:?}", + self.slot_number, + stats.accepted + ); + trace!( + "GossipReport: slot#{} - update - transferred {:?}", + self.slot_number, + stats.transferred + ); + } + if let Some(stats) = &self.finality_update { + info!( + "GossipReport: slot#{} - finality_update - {}", + self.slot_number, + stats.report() + ); + trace!( + "GossipReport: slot#{} - finality_update - offered {:?}", + self.slot_number, + stats.offered + ); + trace!( + "GossipReport: slot#{} - finality_update - accepted {:?}", + self.slot_number, + stats.accepted + ); + trace!( + "GossipReport: slot#{} - finality_update - transferred {:?}", + self.slot_number, + stats.transferred + ); + } + if let Some(stats) = &self.optimistic_update { + info!( + "GossipReport: slot#{} - optimistic_update - {}", + self.slot_number, + stats.report() + ); + trace!( + "GossipReport: slot#{} - optimistic_update - offered {:?}", + self.slot_number, + stats.offered + ); + trace!( + "GossipReport: slot#{} - optimistic_update - accepted {:?}", + self.slot_number, + stats.accepted + ); + trace!( + "GossipReport: slot#{} - optimistic_update - transferred {:?}", + self.slot_number, + stats.transferred + ); + } + } + + fn update(&mut self, content_key: BeaconContentKey, results: ContentStats) { + match content_key { + BeaconContentKey::LightClientBootstrap(_) => { + self.bootstrap = Some(results); + } + BeaconContentKey::LightClientUpdatesByRange(_) => { + self.update = Some(results); + } + BeaconContentKey::LightClientFinalityUpdate(_) => { + self.finality_update = Some(results); + } + BeaconContentKey::LightClientOptimisticUpdate(_) => { + self.optimistic_update = Some(results); + } + } + } +} + +// Struct for tracking gossip stats per block in history bridge +#[derive(Debug, Clone, Default)] +pub struct HistoryBlockStats { + pub block_number: u64, + pub header_with_proof: Option, + pub block_body: Option, + pub receipts: Option, + pub epoch_accumulator: Option, +} + +impl StatsReporter for HistoryBlockStats { + fn new(block_number: u64) -> Self { + Self { + block_number, + ..Default::default() + } + } + + fn report(&self) { + if let Some(stats) = &self.header_with_proof { + info!( + "GossipReport: block#{}: header_with_proof - {}", + self.block_number, + stats.report() + ); + trace!( + "GossipReport: block#{}: header_with_proof - offered {:?}", + self.block_number, + stats.offered + ); + trace!( + "GossipReport: block#{}: header_with_proof - accepted {:?}", + self.block_number, + stats.accepted + ); + trace!( + "GossipReport: block#{}: header_with_proof - transferred {:?}", + self.block_number, + stats.transferred + ); + } + if let Some(stats) = &self.block_body { + info!( + "GossipReport: block#{}: block_body - {}", + self.block_number, + stats.report() + ); + trace!( + "GossipReport: block#{}: block_body - offered {:?}", + self.block_number, + stats.offered + ); + trace!( + "GossipReport: block#{}: block_body - accepted {:?}", + self.block_number, + stats.accepted + ); + trace!( + "GossipReport: block#{}: block_body - transferred {:?}", + self.block_number, + stats.transferred + ); + } + if let Some(stats) = &self.receipts { + info!( + "GossipReport: block#{}: receipts - {}", + self.block_number, + stats.report() + ); + trace!( + "GossipReport: block#{}: receipts - offered {:?}", + self.block_number, + stats.offered + ); + trace!( + "GossipReport: block#{}: receipts - accepted {:?}", + self.block_number, + stats.accepted + ); + trace!( + "GossipReport: block#{}: receipts - transferred {:?}", + self.block_number, + stats.transferred + ); + } + if let Some(stats) = &self.epoch_accumulator { + info!( + "GossipReport: block#{}: epoch_accumulator - {}", + self.block_number, + stats.report() + ); + trace!( + "GossipReport: block#{}: epoch_accumulator - offered {:?}", + self.block_number, + stats.offered + ); + trace!( + "GossipReport: block#{}: epoch_accumulator - accepted {:?}", + self.block_number, + stats.accepted + ); + trace!( + "GossipReport: block#{}: epoch_accumulator - transferred {:?}", + self.block_number, + stats.transferred + ); + } + } + + fn update(&mut self, content_key: HistoryContentKey, results: ContentStats) { + match content_key { + HistoryContentKey::BlockHeaderWithProof(_) => { + self.header_with_proof = Some(results); + } + HistoryContentKey::BlockBody(_) => { + self.block_body = Some(results); + } + HistoryContentKey::BlockReceipts(_) => { + self.receipts = Some(results); + } + HistoryContentKey::EpochAccumulator(_) => { + self.epoch_accumulator = Some(results); + } + } + } +} + +// Struct to record the gossip stats for a single piece of content (eg key/value pair), +// consolidating results from jsonrpc requests to 1/many clients. +#[derive(Debug, Clone, Default)] +pub struct ContentStats { + pub offered: Vec, + pub accepted: Vec, + pub transferred: Vec, + pub failures: u64, +} + +impl ContentStats { + pub fn report(&self) -> String { + format!( + "offered: {}, accepted: {}, transferred: {}, failures: {}", + self.offered.len(), + self.accepted.len(), + self.transferred.len(), + self.failures, + ) + } +} + +impl From>> for ContentStats { + fn from(results: Vec>) -> Self { + let mut content_stats = ContentStats::default(); + for trace_gossip_info in results.iter() { + match trace_gossip_info { + Ok(info) => { + for enr in info.offered.iter() { + let enr = Enr::from_str(enr) + .expect("ENR from trace gossip response to succesfully decode."); + // don't double count an enr if multiple clients offered the same content + // to a peer + if !content_stats.offered.contains(&enr) { + content_stats.offered.push(enr); + } + } + + for enr in info.accepted.iter() { + let enr = Enr::from_str(enr) + .expect("ENR from trace gossip response to succesfully decode."); + if !content_stats.accepted.contains(&enr) { + content_stats.accepted.push(enr); + } + } + + for enr in info.transferred.iter() { + let enr = Enr::from_str(enr) + .expect("ENR from trace gossip response to succesfully decode."); + if !content_stats.transferred.contains(&enr) { + content_stats.transferred.push(enr); + } + } + } + Err(_) => content_stats.failures += 1, + } + } + content_stats + } +}