diff --git a/Cargo.lock b/Cargo.lock index a681de9ee..e24ebf0da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2048,6 +2048,17 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "delay_map" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df941644b671f05f59433e481ba0d31ac10e3667de725236a4c0d587c496fba1" +dependencies = [ + "futures", + "tokio", + "tokio-util", +] + [[package]] name = "der" version = "0.7.9" @@ -2209,7 +2220,7 @@ dependencies = [ "aes 0.7.5", "aes-gcm 0.9.4", "arrayvec 0.7.6", - "delay_map", + "delay_map 0.3.0", "enr", "fnv", "futures", @@ -5377,6 +5388,7 @@ dependencies = [ "async-trait", "chrono", "clap", + "delay_map 0.4.0", "discv5", "e2store", "env_logger 0.9.3", @@ -5416,7 +5428,7 @@ dependencies = [ "anyhow", "async-trait", "bytes", - "delay_map", + "delay_map 0.4.0", "directories", "discv5", "env_logger 0.9.3", @@ -8507,7 +8519,7 @@ version = "0.1.0-alpha.8" source = "git+https://github.com/ethereum/utp?tag=v0.1.0-alpha.14#d894487d80a650bffd9e511fd9071b05c1ddb40d" dependencies = [ "async-trait", - "delay_map", + "delay_map 0.3.0", "futures", "rand 0.8.5", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 0b322a407..96f79f397 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,7 @@ async-trait = "0.1.68" bytes = "1.3.0" chrono = "0.4.38" clap = { version = "4.2.1", features = ["derive"] } +delay_map = "0.4.0" directories = "3.0" discv5 = { version = "0.4.1", features = ["serde"] } e2store = { path = "e2store" } diff --git a/portal-bridge/Cargo.toml b/portal-bridge/Cargo.toml index d80f30b93..e745afec6 100644 --- a/portal-bridge/Cargo.toml +++ b/portal-bridge/Cargo.toml @@ -18,6 +18,7 @@ anyhow.workspace = true async-trait.workspace = true chrono.workspace = true clap.workspace = true +delay_map.workspace = true discv5.workspace = true e2store.workspace = true eth_trie.workspace = true diff --git a/portal-bridge/src/bridge/era1.rs b/portal-bridge/src/bridge/era1.rs index 35ce6064c..e1bff9022 100644 --- a/portal-bridge/src/bridge/era1.rs +++ b/portal-bridge/src/bridge/era1.rs @@ -49,7 +49,9 @@ pub struct Era1Bridge { pub era1_files: Vec, pub http_client: Client, pub metrics: BridgeMetricsReporter, - pub gossip_limit: usize, + // Semaphore used to limit the amount of active gossip transfers + // to make sure we don't overwhelm the trin client + pub gossip_semaphore: Arc, pub execution_api: ExecutionApi, } @@ -69,6 +71,7 @@ impl Era1Bridge { .try_into()?; let era1_files = get_shuffled_era1_files(&http_client).await?; let metrics = BridgeMetricsReporter::new("era1".to_string(), &format!("{mode:?}")); + let gossip_semaphore = Arc::new(Semaphore::new(gossip_limit)); Ok(Self { mode, portal_client, @@ -77,7 +80,7 @@ impl Era1Bridge { era1_files, http_client, metrics, - gossip_limit, + gossip_semaphore, execution_api, }) } @@ -228,9 +231,6 @@ impl Era1Bridge { async fn gossip_era1(&self, era1_path: String, gossip_range: Option>, hunt: bool) { info!("Processing era1 file at path: {era1_path:?}"); - // We are using a semaphore to limit the amount of active gossip transfers to make sure - // we don't overwhelm the trin client - let gossip_send_semaphore = Arc::new(Semaphore::new(self.gossip_limit)); let raw_era1 = self .http_client @@ -263,7 +263,8 @@ impl Era1Bridge { continue; } } - let permit = gossip_send_semaphore + let permit = self + .gossip_semaphore .clone() .acquire_owned() .await diff --git a/portal-bridge/src/bridge/history.rs b/portal-bridge/src/bridge/history.rs index 15da61b71..82d26f2ae 100644 --- a/portal-bridge/src/bridge/history.rs +++ b/portal-bridge/src/bridge/history.rs @@ -42,7 +42,9 @@ pub struct HistoryBridge { pub header_oracle: HeaderOracle, pub epoch_acc_path: PathBuf, pub metrics: BridgeMetricsReporter, - pub gossip_limit: usize, + // Semaphore used to limit the amount of active gossip transfers + // to make sure we don't overwhelm the trin client + pub gossip_semaphore: Arc, } impl HistoryBridge { @@ -55,6 +57,7 @@ impl HistoryBridge { gossip_limit: usize, ) -> Self { let metrics = BridgeMetricsReporter::new("history".to_string(), &format!("{mode:?}")); + let gossip_semaphore = Arc::new(Semaphore::new(gossip_limit)); Self { mode, portal_client, @@ -62,7 +65,7 @@ impl HistoryBridge { header_oracle, epoch_acc_path, metrics, - gossip_limit, + gossip_semaphore, } } } @@ -177,10 +180,6 @@ impl HistoryBridge { // epoch_acc gets set on the first iteration of the loop let mut current_epoch_index = u64::MAX; - // We are using a semaphore to limit the amount of active gossip transfers to make sure we - // don't overwhelm the trin client - let gossip_send_semaphore = Arc::new(Semaphore::new(self.gossip_limit)); - info!("fetching headers in range: {gossip_range:?}"); let mut epoch_acc = None; let mut serve_full_block_handles = vec![]; @@ -203,9 +202,7 @@ impl HistoryBridge { } else if height > MERGE_BLOCK_NUMBER { epoch_acc = None; } - let permit = gossip_send_semaphore.clone().acquire_owned().await.expect( - "acquire_owned() can only error on semaphore close, this should be impossible", - ); + let permit = self.acquire_gossip_permit().await; self.metrics.report_current_block(height as i64); serve_full_block_handles.push(Self::spawn_serve_full_block( height, @@ -408,4 +405,12 @@ impl HistoryBridge { metrics.stop_process_timer(timer); result } + + async fn acquire_gossip_permit(&self) -> OwnedSemaphorePermit { + self.gossip_semaphore + .clone() + .acquire_owned() + .await + .expect("to be able to acquire semaphore") + } } diff --git a/portal-bridge/src/bridge/state.rs b/portal-bridge/src/bridge/state.rs index a606711a0..1b3a7a202 100644 --- a/portal-bridge/src/bridge/state.rs +++ b/portal-bridge/src/bridge/state.rs @@ -1,22 +1,19 @@ -use std::{path::PathBuf, sync::Arc}; +use std::sync::{Arc, Mutex}; use alloy_rlp::Decodable; -use e2store::utils::get_shuffled_era1_files; use eth_trie::{decode_node, node::Node, RootWithTrieDiff}; use ethportal_api::{ jsonrpsee::http_client::HttpClient, - types::state_trie::account_state::AccountState as AccountStateInfo, StateContentKey, - StateContentValue, + types::state_trie::account_state::AccountState as AccountStateInfo, ContentValue, Enr, + OverlayContentKey, StateContentKey, StateContentValue, StateNetworkApiClient, }; use revm::Database; use revm_primitives::{keccak256, Bytecode, SpecId, B256}; -use surf::{Client, Config}; use tokio::{ - sync::{OwnedSemaphorePermit, Semaphore}, - task::JoinHandle, + sync::{mpsc, OwnedSemaphorePermit, Semaphore}, time::timeout, }; -use tracing::{debug, error, info, warn}; +use tracing::{debug, enabled, error, info, warn, Level}; use trin_execution::{ config::StateConfig, content::{ @@ -31,52 +28,39 @@ use trin_execution::{ utils::full_nibble_path_to_address_hash, }; use trin_metrics::bridge::BridgeMetricsReporter; -use trin_validation::oracle::HeaderOracle; use crate::{ bridge::history::SERVE_BLOCK_TIMEOUT, - gossip::gossip_state_content, + census::{ContentKey, EnrsRequest}, types::mode::{BridgeMode, ModeType}, }; pub struct StateBridge { - pub mode: BridgeMode, - pub portal_client: HttpClient, - pub header_oracle: HeaderOracle, - pub epoch_acc_path: PathBuf, - pub era1_files: Vec, - pub http_client: Client, - pub metrics: BridgeMetricsReporter, - pub gossip_limit_semaphore: Arc, + mode: BridgeMode, + portal_client: HttpClient, + metrics: BridgeMetricsReporter, + // Semaphore used to limit the amount of active offer transfers + // to make sure we don't overwhelm the trin client + offer_semaphore: Arc, + // Used to request all interested enrs in the network from census process. + census_tx: mpsc::UnboundedSender, } impl StateBridge { pub async fn new( mode: BridgeMode, portal_client: HttpClient, - header_oracle: HeaderOracle, - epoch_acc_path: PathBuf, - gossip_limit: usize, + offer_limit: usize, + census_tx: mpsc::UnboundedSender, ) -> anyhow::Result { - let http_client: Client = Config::new() - .add_header("Content-Type", "application/xml") - .expect("to be able to add header") - .try_into()?; - let era1_files = get_shuffled_era1_files(&http_client).await?; let metrics = BridgeMetricsReporter::new("state".to_string(), &format!("{mode:?}")); - - // We are using a semaphore to limit the amount of active gossip transfers to make sure - // we don't overwhelm the trin client - let gossip_limit_semaphore = Arc::new(Semaphore::new(gossip_limit)); + let offer_semaphore = Arc::new(Semaphore::new(offer_limit)); Ok(Self { mode, portal_client, - header_oracle, - epoch_acc_path, - era1_files, - http_client, metrics, - gossip_limit_semaphore, + offer_semaphore, + census_tx, }) } @@ -202,21 +186,9 @@ impl StateBridge { account_proof: &TrieProof, block_hash: B256, ) -> anyhow::Result<()> { - let permit = self - .gossip_limit_semaphore - .clone() - .acquire_owned() - .await - .expect("to be able to acquire semaphore"); let content_key = create_account_content_key(account_proof)?; let content_value = create_account_content_value(block_hash, account_proof)?; - Self::spawn_serve_state_proof( - self.portal_client.clone(), - content_key, - content_value, - permit, - self.metrics.clone(), - ); + self.spawn_offer_tasks(content_key, content_value).await; Ok(()) } @@ -228,21 +200,9 @@ impl StateBridge { code_hash: B256, code: Bytecode, ) -> anyhow::Result<()> { - let permit = self - .gossip_limit_semaphore - .clone() - .acquire_owned() - .await - .expect("to be able to acquire semaphore"); - let code_content_key = create_contract_content_key(address_hash, code_hash)?; - let code_content_value = create_contract_content_value(block_hash, account_proof, code)?; - Self::spawn_serve_state_proof( - self.portal_client.clone(), - code_content_key, - code_content_value, - permit, - self.metrics.clone(), - ); + let content_key = create_contract_content_key(address_hash, code_hash)?; + let content_value = create_contract_content_value(block_hash, account_proof, code)?; + self.spawn_offer_tasks(content_key, content_value).await; Ok(()) } @@ -253,73 +213,143 @@ impl StateBridge { address_hash: B256, block_hash: B256, ) -> anyhow::Result<()> { - let permit = self - .gossip_limit_semaphore - .clone() - .acquire_owned() - .await - .expect("to be able to acquire semaphore"); - let storage_content_key = create_storage_content_key(storage_proof, address_hash)?; - let storage_content_value = - create_storage_content_value(block_hash, account_proof, storage_proof)?; - Self::spawn_serve_state_proof( - self.portal_client.clone(), - storage_content_key, - storage_content_value, - permit, - self.metrics.clone(), - ); + let content_key = create_storage_content_key(storage_proof, address_hash)?; + let content_value = create_storage_content_value(block_hash, account_proof, storage_proof)?; + self.spawn_offer_tasks(content_key, content_value).await; Ok(()) } - fn spawn_serve_state_proof( - portal_client: HttpClient, + // request enrs interested in the content key from Census + async fn request_enrs(&self, content_key: &StateContentKey) -> anyhow::Result> { + let (resp_tx, resp_rx) = futures::channel::oneshot::channel(); + let enrs_request = EnrsRequest { + content_key: ContentKey::State(content_key.clone()), + resp_tx, + }; + self.census_tx.send(enrs_request)?; + Ok(resp_rx.await?) + } + + // spawn individual offer tasks of the content key for each interested enr found in Census + async fn spawn_offer_tasks( + &self, content_key: StateContentKey, content_value: StateContentValue, - permit: OwnedSemaphorePermit, - metrics: BridgeMetricsReporter, - ) -> JoinHandle<()> { - info!("Spawning serve_state_proof for content key: {content_key}"); - tokio::spawn(async move { - let timer = metrics.start_process_timer("spawn_serve_state_proof"); - match timeout( - SERVE_BLOCK_TIMEOUT, - Self::serve_state_proof( - portal_client, - content_key.clone(), - content_value, - metrics.clone() - )).await + ) { + let Ok(enrs) = self.request_enrs(&content_key).await else { + error!("Failed to request enrs for content key, skipping offer: {content_key:?}"); + return; + }; + let offer_report = Arc::new(Mutex::new(OfferReport::new( + content_key.clone(), + enrs.len(), + ))); + for enr in enrs.clone() { + let permit = self.acquire_offer_permit().await; + let portal_client = self.portal_client.clone(); + let content_key = content_key.clone(); + let content_value = content_value.clone(); + let offer_report = offer_report.clone(); + let metrics = self.metrics.clone(); + tokio::spawn(async move { + let timer = metrics.start_process_timer("spawn_offer_state_proof"); + match timeout( + SERVE_BLOCK_TIMEOUT, + StateNetworkApiClient::trace_offer( + &portal_client, + enr.clone(), + content_key.clone(), + content_value.encode(), + ), + ) + .await { - Ok(result) => match result { - Ok(_) => { - debug!("Done serving state proof: {content_key}"); + Ok(result) => match result { + Ok(result) => { + offer_report + .lock() + .expect("to acquire lock") + .update(&enr, result); + } + Err(e) => { + offer_report + .lock() + .expect("to acquire lock") + .update(&enr, false); + warn!("Error offering to: {enr}, error: {e:?}"); + } + }, + Err(_) => { + offer_report + .lock() + .expect("to acquire lock") + .update(&enr, false); + error!("trace_offer timed out on state proof {content_key}: indicating a bug is present"); } - Err(msg) => warn!("Error serving state proof: {content_key}: {msg:?}"), - }, - Err(_) => error!("serve_state_proof() timed out on state proof {content_key}: this is an indication a bug is present") - }; - drop(permit); - metrics.stop_process_timer(timer); - }) + }; + drop(permit); + metrics.stop_process_timer(timer); + }); + } } - async fn serve_state_proof( - portal_client: HttpClient, - content_key: StateContentKey, - content_value: StateContentValue, - metrics: BridgeMetricsReporter, - ) -> anyhow::Result<()> { - let timer = metrics.start_process_timer("gossip_state_content"); - match gossip_state_content(portal_client, content_key.clone(), content_value).await { - Ok(_) => { - debug!("Successfully gossiped state proof: {content_key}") - } - Err(msg) => { - warn!("Error gossiping state proof: {content_key} {msg}",); - } + async fn acquire_offer_permit(&self) -> OwnedSemaphorePermit { + self.offer_semaphore + .clone() + .acquire_owned() + .await + .expect("to be able to acquire semaphore") + } +} + +// Individual report for outcomes of offering a state content key +struct OfferReport { + content_key: StateContentKey, + total: usize, + success: Vec, + // includes both enrs that rejected offer and failed txs + // in the future we may want to differentiate between these cases + // and return a more detailed report from `trace_offer` + fail: Vec, +} + +impl OfferReport { + fn new(content_key: StateContentKey, total: usize) -> Self { + Self { + content_key, + total, + success: Vec::new(), + fail: Vec::new(), + } + } + + fn update(&mut self, enr: &Enr, success: bool) { + if success { + self.success.push(enr.clone()); + } else { + self.fail.push(enr.clone()); + } + if self.total == self.success.len() + self.fail.len() { + self.report(); + } + } + + fn report(&self) { + if enabled!(Level::DEBUG) { + debug!( + "Successfully offered to {}/{} peers. Content key: {}. Failed: {:?}", + self.success.len(), + self.total, + self.content_key.to_hex(), + self.fail, + ); + } else { + info!( + "Successfully offered to {}/{} peers. Content key: {}", + self.success.len(), + self.total, + self.content_key.to_hex(), + ); } - metrics.stop_process_timer(timer); - Ok(()) } } diff --git a/portal-bridge/src/census.rs b/portal-bridge/src/census.rs new file mode 100644 index 000000000..411186de6 --- /dev/null +++ b/portal-bridge/src/census.rs @@ -0,0 +1,310 @@ +use alloy_primitives::U256; +use anyhow::anyhow; +use delay_map::HashMapDelay; +use discv5::enr::NodeId; +use futures::{channel::oneshot, StreamExt}; +use tokio::{ + sync::mpsc, + time::{Duration, Instant}, +}; +use tracing::{error, info, warn}; + +use ethportal_api::{ + generate_random_remote_enr, + jsonrpsee::http_client::HttpClient, + types::{ + distance::{Distance, Metric, XorMetric}, + portal::PongInfo, + }, + BeaconContentKey, BeaconNetworkApiClient, Enr, HistoryContentKey, HistoryNetworkApiClient, + OverlayContentKey, StateContentKey, StateNetworkApiClient, +}; + +/// Ping delay for liveness check of peers in census +/// Two minutes was chosen somewhat arbitrarily, and can be adjusted +/// in the future based on performance +const LIVENESS_CHECK_DELAY: Duration = Duration::from_secs(120); + +/// The maximum number of enrs to return in a response, +/// limiting the number of OFFER requests spawned by the bridge +/// for each piece of content +const ENRS_RESPONSE_LIMIT: usize = 8; + +/// The census is responsible for maintaining a list of known peers in the network, +/// checking their liveness, updating their data radius, iterating through their +/// rfn to find new peers, and providing interested enrs for a given content key. +pub struct Census { + history: Network, + state: Network, + beacon: Network, + census_rx: mpsc::UnboundedReceiver, +} + +impl Census { + pub fn new(client: HttpClient, census_rx: mpsc::UnboundedReceiver) -> Self { + Self { + history: Network::new(client.clone(), Subnetwork::History), + state: Network::new(client.clone(), Subnetwork::State), + beacon: Network::new(client.clone(), Subnetwork::Beacon), + census_rx, + } + } +} + +impl Census { + pub async fn init(&mut self) { + // currently, the census is only initialized for the state network + // only initialized networks will yield inside `run()` loop + info!("Initializing state network census"); + self.state.init().await; + } + + pub async fn run(&mut self) { + loop { + // Randomly selects between what available task is ready + // and executes it. Ensures that the census will continue + // to update while it handles a stream of enr requests. + tokio::select! { + // handle enrs request + Some(request) = self.census_rx.recv() => { + let enrs = self.get_interested_enrs(request.content_key).await; + if let Err(err) = request.resp_tx.send(enrs) { + error!("Error sending enrs response: {err:?}"); + } + } + Some(Ok(known_enr)) = self.history.peers.next() => { + self.history.process_enr(known_enr.1.0).await; + info!("Updated history census: found peers: {}", self.history.peers.len()); + } + // yield next known state peer and ping for liveness + Some(Ok(known_enr)) = self.state.peers.next() => { + self.state.process_enr(known_enr.1.0).await; + info!("Updated state census: found peers: {}", self.state.peers.len()); + } + Some(Ok(known_enr)) = self.beacon.peers.next() => { + self.beacon.process_enr(known_enr.1.0).await; + info!("Updated beacon census: found peers: {}", self.beacon.peers.len()); + } + } + } + } + + pub async fn get_interested_enrs(&self, content_key: ContentKey) -> Vec { + match content_key { + ContentKey::History(content_key) => { + self.history + .get_interested_enrs(content_key.content_id()) + .await + } + ContentKey::State(content_key) => { + self.state + .get_interested_enrs(content_key.content_id()) + .await + } + ContentKey::Beacon(content_key) => { + self.beacon + .get_interested_enrs(content_key.content_id()) + .await + } + } + } +} + +/// The network struct is responsible for maintaining a list of known peers +/// in the given subnetwork. +struct Network { + peers: HashMapDelay<[u8; 32], (Enr, Distance)>, + client: HttpClient, + subnetwork: Subnetwork, +} + +impl Network { + fn new(client: HttpClient, subnetwork: Subnetwork) -> Self { + Self { + peers: HashMapDelay::new(LIVENESS_CHECK_DELAY), + client, + subnetwork, + } + } + + // We initialize a network with a random rfn lookup to get an initial view of the network + // and then iterate through the rfn of each peer to find new peers. Since this initialization + // blocks the bridge's gossip feature, there is a tradeoff between the time taken to initialize + // the census and the time taken to start gossiping. In the future, we might consider updating + // the initialization process to be considered complete after it has found ~100% of the network + // peers. However, since the census continues to iterate through the peers after initialization, + // the initialization is just to reach a critical mass of peers so that gossip can begin. + async fn init(&mut self) { + let (_, random_enr) = generate_random_remote_enr(); + let Ok(initial_enrs) = self + .subnetwork + .recursive_find_nodes(&self.client, random_enr.node_id()) + .await + else { + panic!("Failed to initialize network census"); + }; + + // if this initialization is too slow, we can consider + // refactoring the peers structure so that it can be + // run in parallel + for enr in initial_enrs { + self.process_enr(enr).await; + } + if self.peers.is_empty() { + panic!( + "Failed to initialize {} census, couldn't find any peers.", + self.subnetwork + ); + } + info!( + "Initialized {} census: found peers: {}", + self.subnetwork, + self.peers.len() + ); + } + + /// Only processes an enr (iterating through its rfn) if the enr's + /// liveness delay has expired + async fn process_enr(&mut self, enr: Enr) { + // ping for liveliness check + if !self.liveness_check(enr.clone()).await { + return; + } + // iterate peers routing table via rfn over various distances + for distance in 245..257 { + let Ok(result) = self + .subnetwork + .find_nodes(&self.client, enr.clone(), vec![distance]) + .await + else { + warn!("Find nodes request failed for enr: {}", enr); + continue; + }; + for found_enr in result { + let _ = self.liveness_check(found_enr).await; + } + } + } + + // Only perform liveness check on enrs if their deadline is up, + // since the same enr might appear multiple times between the + // routing tables of different peers. + async fn liveness_check(&mut self, enr: Enr) -> bool { + // if enr is already registered, check if delay map deadline has expired + if let Some(deadline) = self.peers.deadline(&enr.node_id().raw()) { + if Instant::now() < deadline { + return false; + } + } + + match self.subnetwork.ping(&self.client, enr.clone()).await { + Ok(pong_info) => { + let data_radius = Distance::from(U256::from(pong_info.data_radius)); + self.peers.insert(enr.node_id().raw(), (enr, data_radius)); + true + } + Err(_) => { + self.peers.remove(&enr.node_id().raw()); + false + } + } + } + + // Look up all known interested enrs for a given content id + async fn get_interested_enrs(&self, content_id: [u8; 32]) -> Vec { + if self.peers.is_empty() { + error!( + "No known peers in {} census, unable to offer.", + self.subnetwork + ); + return Vec::new(); + } + self.peers + .iter() + .filter_map(|(node_id, (enr, data_radius))| { + let distance = XorMetric::distance(node_id, &content_id); + if data_radius >= &distance { + Some(enr.clone()) + } else { + None + } + }) + .take(ENRS_RESPONSE_LIMIT) + .collect() + } +} + +/// The subnetwork enum represents the different subnetworks that the census +/// can operate on, and forwards requests to each respective overlay network. +#[derive(Debug)] +enum Subnetwork { + History, + State, + Beacon, +} + +impl std::fmt::Display for Subnetwork { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Subnetwork::History => write!(f, "history"), + Subnetwork::State => write!(f, "state"), + Subnetwork::Beacon => write!(f, "beacon"), + } + } +} + +impl Subnetwork { + async fn ping(&self, client: &HttpClient, enr: Enr) -> anyhow::Result { + let result = match self { + Subnetwork::History => HistoryNetworkApiClient::ping(client, enr).await, + Subnetwork::State => StateNetworkApiClient::ping(client, enr).await, + Subnetwork::Beacon => BeaconNetworkApiClient::ping(client, enr).await, + }; + result.map_err(|e| anyhow!(e)) + } + + async fn find_nodes( + &self, + client: &HttpClient, + enr: Enr, + distances: Vec, + ) -> anyhow::Result> { + let result = match self { + Subnetwork::History => { + HistoryNetworkApiClient::find_nodes(client, enr, distances).await + } + Subnetwork::State => StateNetworkApiClient::find_nodes(client, enr, distances).await, + Subnetwork::Beacon => BeaconNetworkApiClient::find_nodes(client, enr, distances).await, + }; + result.map_err(|e| anyhow!(e)) + } + + async fn recursive_find_nodes( + &self, + client: &HttpClient, + node_id: NodeId, + ) -> anyhow::Result> { + let result = match self { + Subnetwork::History => { + HistoryNetworkApiClient::recursive_find_nodes(client, node_id).await + } + Subnetwork::State => StateNetworkApiClient::recursive_find_nodes(client, node_id).await, + Subnetwork::Beacon => { + BeaconNetworkApiClient::recursive_find_nodes(client, node_id).await + } + }; + result.map_err(|e| anyhow!(e)) + } +} + +pub struct EnrsRequest { + pub content_key: ContentKey, + pub resp_tx: oneshot::Sender>, +} + +#[derive(Debug, Clone)] +pub enum ContentKey { + History(HistoryContentKey), + State(StateContentKey), + Beacon(BeaconContentKey), +} diff --git a/portal-bridge/src/client_handles.rs b/portal-bridge/src/client_handles.rs index 78d5772c5..36fdb6c79 100644 --- a/portal-bridge/src/client_handles.rs +++ b/portal-bridge/src/client_handles.rs @@ -58,6 +58,7 @@ pub fn trin_handle(bridge_config: &BridgeConfig) -> anyhow::Result { command .kill_on_drop(true) .args(["--ephemeral"]) + .args(["--no-upnp"]) .args(["--mb", "0"]) .args(["--web3-transport", "http"]) .args(["--network", bridge_config.network.get_network_name()]) diff --git a/portal-bridge/src/gossip.rs b/portal-bridge/src/gossip.rs index af4222611..ff086c481 100644 --- a/portal-bridge/src/gossip.rs +++ b/portal-bridge/src/gossip.rs @@ -9,14 +9,13 @@ use ethportal_api::{ jsonrpsee::core::Error, types::portal::{ContentInfo, TraceGossipInfo}, BeaconContentKey, BeaconContentValue, BeaconNetworkApiClient, ContentValue, HistoryContentKey, - HistoryContentValue, HistoryNetworkApiClient, OverlayContentKey, StateContentKey, - StateContentValue, StateNetworkApiClient, + HistoryContentValue, HistoryNetworkApiClient, OverlayContentKey, }; const GOSSIP_RETRY_COUNT: u64 = 3; const RETRY_AFTER: Duration = Duration::from_secs(15); -/// Gossip any given content key / value to the history network. +/// Gossip any given content key / value to the beacon network. pub async fn gossip_beacon_content( portal_client: HttpClient, content_key: BeaconContentKey, @@ -162,74 +161,6 @@ async fn history_trace_gossip( }) } -/// Gossip any given content key / value to the state network. -pub async fn gossip_state_content( - portal_client: HttpClient, - content_key: StateContentKey, - content_value: StateContentValue, -) -> anyhow::Result<()> { - // stats are not currently being reported for state content - let _result = tokio::spawn( - state_trace_gossip(portal_client, content_key, content_value).in_current_span(), - ) - .await?; - Ok(()) -} - -async fn state_trace_gossip( - client: HttpClient, - content_key: StateContentKey, - content_value: StateContentValue, -) -> Result { - let mut retries = 0; - let mut traces = vec![]; - let mut found = false; - while retries < GOSSIP_RETRY_COUNT { - let result = StateNetworkApiClient::trace_gossip( - &client, - content_key.clone(), - content_value.encode(), - ) - .await; - // check if content was successfully transferred to at least one peer on network - if let Ok(trace) = result { - traces.push(trace.clone()); - if !trace.transferred.is_empty() { - return Ok(GossipReport { - traces, - retries, - found, - }); - } - } - // if not, make rfc request to see if data is available on network - let result = - StateNetworkApiClient::recursive_find_content(&client, content_key.clone()).await; - if let Ok(ContentInfo::Content { .. }) = result { - debug!("Found content on network, after failing to gossip, aborting gossip. content key={:?}", content_key.to_hex()); - found = true; - return Ok(GossipReport { - traces, - retries, - found, - }); - } - retries += 1; - debug!("Unable to locate content on network, after failing to gossip, retrying in {:?} seconds. content key={:?}", RETRY_AFTER, content_key.to_hex()); - sleep(RETRY_AFTER).await; - } - warn!( - "Failed to gossip state content, without successfully locating data on network, after {} attempts: content key={:?}", - GOSSIP_RETRY_COUNT, - content_key.to_hex(), - ); - Ok(GossipReport { - traces, - retries, - found, - }) -} - pub struct GossipReport { pub traces: Vec, pub retries: u64, diff --git a/portal-bridge/src/lib.rs b/portal-bridge/src/lib.rs index b01008c47..1bde978fb 100644 --- a/portal-bridge/src/lib.rs +++ b/portal-bridge/src/lib.rs @@ -3,6 +3,7 @@ pub mod api; pub mod bridge; +pub mod census; pub mod cli; pub mod client_handles; pub mod constants; diff --git a/portal-bridge/src/main.rs b/portal-bridge/src/main.rs index 84f769131..506477134 100644 --- a/portal-bridge/src/main.rs +++ b/portal-bridge/src/main.rs @@ -1,11 +1,15 @@ use clap::Parser; -use tokio::time::{sleep, Duration}; +use tokio::{ + sync::mpsc, + time::{sleep, Duration}, +}; use tracing::Instrument; use ethportal_api::jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; use portal_bridge::{ api::{consensus::ConsensusApi, execution::ExecutionApi}, bridge::{beacon::BeaconBridge, era1::Era1Bridge, history::HistoryBridge, state::StateBridge}, + census::Census, cli::BridgeConfig, types::{mode::BridgeMode, network::NetworkKind}, }; @@ -40,22 +44,30 @@ async fn main() -> Result<(), Box> { .map_err(|e| e.to_string())?; let mut bridge_tasks = Vec::new(); + let mut census_handle = None; // Launch State Network portal bridge if bridge_config .portal_subnetworks .contains(&NetworkKind::State) { - let bridge_mode = bridge_config.mode.clone(); - let portal_client_clone = portal_client.clone(); - let epoch_acc_path = bridge_config.epoch_acc_path.clone(); - let header_oracle = HeaderOracle::default(); + // Initialize the census + let (census_tx, census_rx) = mpsc::unbounded_channel(); + let mut census = Census::new(portal_client.clone(), census_rx); + // initialize the census to acquire critical threshold view of network before gossiping + census.init().await; + census_handle = Some(tokio::spawn(async move { + census + .run() + .instrument(tracing::trace_span!("census")) + .await; + })); + let state_bridge = StateBridge::new( - bridge_mode, - portal_client_clone, - header_oracle, - epoch_acc_path, + bridge_config.mode.clone(), + portal_client.clone(), bridge_config.gossip_limit, + census_tx, ) .await?; @@ -146,5 +158,8 @@ async fn main() -> Result<(), Box> { futures::future::join_all(bridge_tasks).await; drop(handle); + if let Some(census_handle) = census_handle { + drop(census_handle); + } Ok(()) } diff --git a/portalnet/Cargo.toml b/portalnet/Cargo.toml index 8eddb1b9c..a92167e4e 100644 --- a/portalnet/Cargo.toml +++ b/portalnet/Cargo.toml @@ -15,7 +15,7 @@ alloy-primitives.workspace = true anyhow.workspace = true async-trait.workspace = true bytes.workspace = true -delay_map = "0.3.0" +delay_map.workspace = true directories.workspace = true discv5.workspace = true ethereum_ssz.workspace = true