From 0e96fdf0ff7d37f571a2495cbcb3967f90f31250 Mon Sep 17 00:00:00 2001 From: fbrv Date: Fri, 19 Jul 2024 19:13:23 +0100 Subject: [PATCH] remove unused code --- src/common/client.rs | 229 +------------------------------------ src/common/error.rs | 21 ---- src/constants.rs | 1 - src/forward_service.rs | 42 ++++--- src/lookahead/manager.rs | 8 +- src/lookahead/mod.rs | 45 +------- src/lookahead/provider.rs | 7 +- src/main.rs | 2 +- src/preconf/constraints.rs | 15 --- src/preconf/election.rs | 8 -- src/relay_client/client.rs | 29 +---- src/relay_client/error.rs | 5 - src/ssz/mod.rs | 1 - 13 files changed, 43 insertions(+), 370 deletions(-) diff --git a/src/common/client.rs b/src/common/client.rs index eea86c7..bb098c4 100644 --- a/src/common/client.rs +++ b/src/common/client.rs @@ -6,32 +6,13 @@ use std::{ time::Duration, }; -use alloy::{ - primitives::B256, - rpc::types::beacon::events::{HeadEvent, PayloadAttributesEvent}, -}; -use futures::{future::join_all, StreamExt}; +use alloy::rpc::types::beacon::events::HeadEvent; +use futures::StreamExt; use reqwest_eventsource::EventSource; -use tokio::{ - sync::{ - broadcast::{self, Sender}, - mpsc::UnboundedSender, - }, - task::JoinError, - time::sleep, -}; +use tokio::{sync::broadcast::Sender, time::sleep}; use tracing::{debug, error, warn}; use url::Url; -use super::{ - error::BeaconClientError, - types::{ApiResult, BeaconResponse, ProposerDuty, SyncStatus}, -}; -use crate::constants::EPOCH_SLOTS; - -const BEACON_CLIENT_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); -const PROPOSER_DUTIES_REFRESH_FREQ: u64 = EPOCH_SLOTS / 4; - /// Handles communication with multiple `BeaconClient` instances. /// Load balances requests. #[derive(Clone)] @@ -61,84 +42,6 @@ impl MultiBeaconClient { Self::new(clients) } - /// Retrieves the sync status from multiple beacon clients and selects the best one. - /// - /// The function spawns async tasks to fetch the sync status from each beacon client. - /// It then selects the sync status with the highest `head_slot`. - pub async fn best_sync_status(&self) -> Result { - let clients = self.beacon_clients_by_last_response(); - - let handles = clients - .into_iter() - .map(|(_, client)| tokio::spawn(async move { client.sync_status().await })) - .collect::>(); - - let results: Vec, JoinError>> = - join_all(handles).await; - - let mut best_sync_status: Option = None; - for join_result in results { - match join_result { - Ok(sync_status_result) => match sync_status_result { - Ok(sync_status) => { - if best_sync_status.as_ref().map_or(true, |current_best| { - current_best.head_slot < sync_status.head_slot - }) { - best_sync_status = Some(sync_status); - } - } - Err(err) => warn!("Failed to get sync status: {err:?}"), - }, - Err(join_err) => { - error!("Tokio join error for best_sync_status: {join_err:?}") - } - } - } - - best_sync_status.ok_or(BeaconClientError::BeaconNodeUnavailable) - } - - pub async fn get_proposer_duties( - &self, - epoch: u64, - ) -> Result<(B256, Vec), BeaconClientError> { - let clients = self.beacon_clients_by_last_response(); - let mut last_error = None; - - for (i, client) in clients.into_iter() { - match client.get_proposer_duties(epoch).await { - Ok(proposer_duties) => { - self.best_beacon_instance.store(i, Ordering::Relaxed); - return Ok(proposer_duties); - } - Err(err) => { - last_error = Some(err); - } - } - } - - Err(last_error.unwrap_or(BeaconClientError::BeaconNodeUnavailable)) - } - - /// `subscribe_to_payload_attributes_events` subscribes to payload attributes events from all - /// beacon nodes. - /// - /// This function swaps async tasks for all beacon clients. Therefore, - /// a single payload event will be received multiple times, likely once for every beacon node. - pub async fn subscribe_to_payload_attributes_events( - &self, - chan: Sender, - ) { - let clients = self.beacon_clients_by_last_response(); - - for (_, client) in clients { - let chan = chan.clone(); - tokio::spawn(async move { - client.subscribe_to_payload_attributes_events(chan).await; - }); - } - } - /// `subscribe_to_head_events` subscribes to head events from all beacon nodes. /// /// This function swaps async tasks for all beacon clients. Therefore, @@ -154,27 +57,6 @@ impl MultiBeaconClient { } } - /// `subscribe_to_proposer_duties` listens to new `PayloadAttributesEvent`s through `rx`. - /// Fetches the chain proposer duties every 8 slots and sends them down `tx`. - pub async fn subscribe_to_proposer_duties( - self, - tx: UnboundedSender>, - mut rx: broadcast::Receiver, - ) { - let mut last_updated_slot = 0; - - while let Ok(payload) = rx.recv().await { - let new_slot = payload.data.proposal_slot; - - if last_updated_slot == 0 || - (new_slot > last_updated_slot && new_slot % PROPOSER_DUTIES_REFRESH_FREQ == 0) - { - last_updated_slot = new_slot; - tokio::spawn(fetch_and_send_duties_for_slot(new_slot, tx.clone(), self.clone())); - } - } - } - /// Returns a list of beacon clients, prioritized by the last successful response. /// /// The beacon client with the most recent successful response is placed at the @@ -193,63 +75,17 @@ impl MultiBeaconClient { /// Handles communication to a single beacon client url. #[derive(Clone, Debug)] pub struct BeaconClient { - pub http: reqwest::Client, pub endpoint: Url, } impl BeaconClient { - pub fn new(http: reqwest::Client, endpoint: Url) -> Self { - Self { http, endpoint } + pub fn new(endpoint: Url) -> Self { + Self { endpoint } } pub fn from_endpoint_str(endpoint: &str) -> Self { let endpoint = Url::parse(endpoint).unwrap(); - let client = - reqwest::ClientBuilder::new().timeout(BEACON_CLIENT_REQUEST_TIMEOUT).build().unwrap(); - Self::new(client, endpoint) - } - - pub async fn http_get(&self, path: &str) -> Result { - let target = self.endpoint.join(path)?; - Ok(self.http.get(target).send().await?) - } - - pub async fn get( - &self, - path: &str, - ) -> Result { - let result = self.http_get(path).await?.json().await?; - match result { - ApiResult::Ok(result) => Ok(result), - ApiResult::Err(err) => Err(BeaconClientError::Api(err)), - } - } - - pub async fn sync_status(&self) -> Result { - let response: BeaconResponse = self.get("eth/v1/node/syncing").await?; - Ok(response.data) - } - - pub async fn get_proposer_duties( - &self, - epoch: u64, - ) -> Result<(B256, Vec), BeaconClientError> { - let endpoint = format!("eth/v1/validator/duties/proposer/{epoch}"); - let mut result: BeaconResponse> = self.get(&endpoint).await?; - let dependent_root_value = result.meta.remove("dependent_root").ok_or_else(|| { - BeaconClientError::MissingExpectedData( - "missing `dependent_root` in response".to_string(), - ) - })?; - let dependent_root: B256 = serde_json::from_value(dependent_root_value)?; - Ok((dependent_root, result.data)) - } - - pub async fn subscribe_to_payload_attributes_events( - &self, - chan: Sender, - ) { - self.subscribe_to_sse("payload_attributes", chan).await + Self::new(endpoint) } async fn subscribe_to_head_events(&self, chan: Sender) { @@ -291,56 +127,3 @@ impl BeaconClient { } } } - -async fn fetch_and_send_duties_for_slot( - slot: u64, - tx: UnboundedSender>, - beacon_client: MultiBeaconClient, -) { - let epoch = slot / EPOCH_SLOTS; - - // Fetch for `epoch` and `epoch + 1`; - let mut all_duties = Vec::with_capacity(64); - match beacon_client.get_proposer_duties(epoch).await { - Ok((_, mut duties)) => { - all_duties.append(&mut duties); - } - Err(err) => { - warn!(?err, %epoch, "failed fetching duties") - } - } - match beacon_client.get_proposer_duties(epoch + 1).await { - Ok((_, mut duties)) => { - all_duties.append(&mut duties); - } - Err(err) => { - warn!(?err, epoch=%epoch+1, "failed fetching duties") - } - } - - if let Err(err) = tx.send(all_duties) { - error!(?err, "error sending duties"); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::initialize_tracing_log; - - fn get_test_client() -> BeaconClient { - BeaconClient::from_endpoint_str("http://18.199.195.154:32945") - } - - #[tokio::test] - async fn test_best_sync_status() { - initialize_tracing_log(); - - let client = get_test_client(); - - let sync_status = client.sync_status().await; - tracing::info!(?sync_status); - assert!(sync_status.is_ok()); - assert!(sync_status.unwrap().head_slot > 0); - } -} diff --git a/src/common/error.rs b/src/common/error.rs index 875a197..aa61ea5 100644 --- a/src/common/error.rs +++ b/src/common/error.rs @@ -8,25 +8,4 @@ pub enum BeaconClientError { #[error("JSON serialization/deserialization error: {0}")] Json(#[from] serde_json::Error), - - #[error("error from API: {0}")] - Api(String), - - #[error("missing expected data in response: {0}")] - MissingExpectedData(String), - - #[error("beacon node unavailable")] - BeaconNodeUnavailable, - - #[error("block validation failed")] - BlockValidationFailed, - - #[error("block integration failed")] - BlockIntegrationFailed, - - #[error("beacon node syncing")] - BeaconNodeSyncing, - - #[error("channel error")] - ChannelError, } diff --git a/src/constants.rs b/src/constants.rs index 7479b70..38a94e9 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -2,4 +2,3 @@ pub const EPOCH_SLOTS: u64 = 32; pub const GET_PRECONFER_PATH: &str = "/constraints/v1/preconfer/"; pub const GET_PRECONFERS_PATH: &str = "/constraints/v1/preconfers"; -pub const SET_CONSTRAINTS_PATH: &str = "/eth/v1/builder/set_constraints"; diff --git a/src/forward_service.rs b/src/forward_service.rs index 6051c35..ed2d3b9 100644 --- a/src/forward_service.rs +++ b/src/forward_service.rs @@ -21,7 +21,6 @@ use reqwest_tracing::{ use tokio::task::JoinHandle; use tower_http::trace::TraceLayer; use tracing::Span; -use tracing_subscriber::fmt::format; use crate::lookahead::LookaheadManager; @@ -153,6 +152,7 @@ mod test { Router, }; use bytes::Bytes; + use dashmap::DashMap; use eyre::Result; use hashbrown::HashMap; use http::StatusCode; @@ -169,7 +169,10 @@ mod test { #[tokio::test] async fn test_missing_chain_id() -> Result<()> { tokio::spawn(async move { - let manager = LookaheadManager::new(Lookahead::Single(None), LookaheadProvider::None); + let manager = LookaheadManager::new( + Lookahead::Multi(DashMap::new().into()), + LookaheadProvider::None, + ); let mut managers = HashMap::new(); managers.insert(1u16, manager); let router = router(SharedState::new(managers).unwrap()); @@ -186,15 +189,18 @@ mod test { #[tokio::test] async fn test_invalid_chain_id() -> Result<()> { tokio::spawn(async move { - let manager = LookaheadManager::new(Lookahead::Single(None), LookaheadProvider::None); + let manager = LookaheadManager::new( + Lookahead::Multi(DashMap::new().into()), + LookaheadProvider::None, + ); let mut managers = HashMap::new(); managers.insert(1u16, manager); let router = router(SharedState::new(managers).unwrap()); - let listener = tokio::net::TcpListener::bind("localhost:12001").await.unwrap(); + let listener = tokio::net::TcpListener::bind("localhost:12002").await.unwrap(); axum::serve(listener, router).await.unwrap(); }); tokio::time::sleep(Duration::from_secs(1)).await; - let res = reqwest::Client::new().post("http://localhost:12001/2").send().await.unwrap(); + let res = reqwest::Client::new().post("http://localhost:12002/2").send().await.unwrap(); assert_eq!(res.status(), StatusCode::BAD_REQUEST); assert_eq!(res.text().await.unwrap(), "no lookahead provider found for id 2"); Ok(()) @@ -203,13 +209,12 @@ mod test { #[tokio::test] async fn test_unavailable_forwarded_service() -> Result<()> { tokio::spawn(async move { - let manager = LookaheadManager::new( - Lookahead::Single(Some(LookaheadEntry { - url: "http://not-a-valid-url.gattaca".into(), - ..Default::default() - })), - LookaheadProvider::None, - ); + let map = Arc::new(DashMap::new()); + map.insert(0, LookaheadEntry { + url: "http://not-a-valid-url".into(), + ..Default::default() + }); + let manager = LookaheadManager::new(Lookahead::Multi(map), LookaheadProvider::None); let mut managers = HashMap::new(); managers.insert(1u16, manager); let router = router(SharedState::new(managers).unwrap()); @@ -234,13 +239,12 @@ mod test { axum::serve(listener, router).await.unwrap(); }); tokio::spawn(async move { - let manager = LookaheadManager::new( - Lookahead::Single(Some(LookaheadEntry { - url: "http://localhost:12004".into(), - ..Default::default() - })), - LookaheadProvider::None, - ); + let map = Arc::new(DashMap::new()); + map.insert(0, LookaheadEntry { + url: "http://localhost:12004".into(), + ..Default::default() + }); + let manager = LookaheadManager::new(Lookahead::Multi(map), LookaheadProvider::None); let mut managers = HashMap::new(); managers.insert(1u16, manager); let router = router(SharedState::new(managers).unwrap()); diff --git a/src/lookahead/manager.rs b/src/lookahead/manager.rs index 50a90fa..6201c1b 100644 --- a/src/lookahead/manager.rs +++ b/src/lookahead/manager.rs @@ -2,7 +2,7 @@ use alloy::rpc::types::beacon::events::HeadEvent; use dashmap::DashMap; use eyre::{bail, Result}; use hashbrown::HashMap; -use tokio::{sync::broadcast, task::JoinHandle}; +use tokio::sync::broadcast; use super::{ provider::LookaheadProvider, Lookahead, LookaheadEntry, LookaheadProviderOptions, @@ -12,7 +12,7 @@ use crate::config::Config; enum LookaheadProviderManager { Initialized(LookaheadProvider), - Running(JoinHandle<()>), + Running, } pub struct LookaheadManager { @@ -32,10 +32,10 @@ impl LookaheadManager { self.provider_manager.take().expect("provider manager should never be None"); match provider_manager { LookaheadProviderManager::Initialized(provider) => { - let handle = tokio::spawn(async move { + let _handle = tokio::spawn(async move { provider.run().await; }); - self.provider_manager = Some(LookaheadProviderManager::Running(handle)); + self.provider_manager = Some(LookaheadProviderManager::Running); Ok(()) } _ => bail!("context provider is already running."), diff --git a/src/lookahead/mod.rs b/src/lookahead/mod.rs index 34f9439..6f7cfde 100644 --- a/src/lookahead/mod.rs +++ b/src/lookahead/mod.rs @@ -1,9 +1,6 @@ use std::sync::Arc; -use alloy::rpc::types::beacon::BlsPublicKey; use dashmap::DashMap; -use hashbrown::HashMap; -use tokio::sync::broadcast; use crate::preconf::election::SignedPreconferElection; @@ -24,28 +21,21 @@ impl LookaheadEntry { pub fn slot(&self) -> u64 { self.election.slot() } - - pub fn preconfer_pubkey(&self) -> BlsPublicKey { - self.election.message.preconfer_pubkey - } } #[derive(Debug, Clone)] pub enum Lookahead { - Single(Option), Multi(Arc>), } impl Lookahead { pub fn clear_slots(&mut self, head_slot: u64) { match self { - Lookahead::Single(_) => (), Lookahead::Multi(m) => m.retain(|slot, _| *slot >= head_slot), } } pub fn insert(&mut self, election_slot: u64, slot: LookaheadEntry) { match self { - Lookahead::Single(s) => *s = Some(slot), Lookahead::Multi(m) => { m.insert(election_slot, slot); } @@ -58,42 +48,9 @@ impl Lookahead { /// getting the preconfer with the lowest slot number. pub fn get_next_elected_preconfer(&self) -> Option { match self { - Lookahead::Single(s) => s.clone(), Lookahead::Multi(m) => { m.iter().min_by_key(|entry| entry.slot()).map(|entry| entry.value().clone()) } } } -} - -#[cfg(test)] -mod test { - use provider::{LookaheadProviderOptions, RelayLookaheadProvider}; - - use super::*; - use crate::{common::client::MultiBeaconClient, initialize_tracing_log}; - - #[ignore] - #[tokio::test] - async fn test_lookahead() { - std::env::set_var("RUST_LOG", "lookahead=trace"); - - initialize_tracing_log(); - - let beacons = vec!["https://bn.bootnode.helder-devnets.xyz/".into()]; - - let (beacon_tx, beacon_rx) = broadcast::channel(16); - let client = MultiBeaconClient::from_endpoint_strs(&beacons); - client.subscribe_to_head_events(beacon_tx.clone()).await; - - let lookahead = Lookahead::Multi(DashMap::new().into()); - let relays = vec!["http://18.192.244.122:4040".into()]; - let provider = LookaheadProviderOptions { - head_event_receiver: Some(beacon_rx), - relay_provider: Some(RelayLookaheadProvider::new(lookahead, relays, HashMap::new())), - } - .build_relay_provider(); - - provider.run().await; - } -} +} \ No newline at end of file diff --git a/src/lookahead/provider.rs b/src/lookahead/provider.rs index 43730f3..bae4805 100644 --- a/src/lookahead/provider.rs +++ b/src/lookahead/provider.rs @@ -191,7 +191,12 @@ impl LookaheadProviderOptions { } pub enum LookaheadProvider { - Relay { provider: RelayLookaheadProvider, receiver: Receiver }, + Relay { + provider: RelayLookaheadProvider, + receiver: Receiver, + }, + #[allow(dead_code)] + /// used for testing purpose, LookaheadProvider::None does not fetch any lookhead. None, } diff --git a/src/main.rs b/src/main.rs index 37a2fed..7c1fb9f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -44,7 +44,7 @@ async fn main() -> Result<()> { match &cli.command { Commands::Forward { port } => { let config = Config::from_file(&cli.config)?; - let (beacon_tx, beacon_rx) = broadcast::channel(16); + let (beacon_tx, _beacon_rx) = broadcast::channel(16); let client = MultiBeaconClient::from_endpoint_strs(&config.beacon_urls); client.subscribe_to_head_events(beacon_tx.clone()).await; let listening_addr = format!("0.0.0.0:{}", port.unwrap_or(8000)); diff --git a/src/preconf/constraints.rs b/src/preconf/constraints.rs index ff3a512..2307419 100644 --- a/src/preconf/constraints.rs +++ b/src/preconf/constraints.rs @@ -28,21 +28,6 @@ pub struct ConstraintsMessage { >, } -impl ConstraintsMessage { - /// Creates a new Constraints message from preconfs. Treats each preconf as individual - /// and makes no bundles. - pub fn new(preconfs: Vec, proposal_slot: u64) -> Self { - let mut msg = ConstraintsMessage { slot: proposal_slot, constraints: Default::default() }; - - for preconf in preconfs { - let constraint = VariableList::new(vec![preconf]).unwrap(); - msg.constraints.push(constraint).unwrap(); - } - - msg - } -} - /// Constraint representing a transaction that must be *included* in a block. #[derive(Debug, Clone, Default, PartialEq, Serialize, TreeHash)] pub struct InclusionConstraint { diff --git a/src/preconf/election.rs b/src/preconf/election.rs index 883cebe..136866e 100644 --- a/src/preconf/election.rs +++ b/src/preconf/election.rs @@ -17,14 +17,6 @@ impl SignedPreconferElection { pub fn slot(&self) -> u64 { self.message.slot_number } - - pub fn chain_id(&self) -> u64 { - self.message.chain_id - } - - pub fn gas_limit(&self) -> u64 { - self.message.gas_limit - } } #[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, TreeHash)] diff --git a/src/relay_client/client.rs b/src/relay_client/client.rs index b34ba73..9561d98 100644 --- a/src/relay_client/client.rs +++ b/src/relay_client/client.rs @@ -6,8 +6,8 @@ use tracing::{error, trace}; use super::RelayClientConfig; use crate::{ - constants::{EPOCH_SLOTS, GET_PRECONFERS_PATH, GET_PRECONFER_PATH, SET_CONSTRAINTS_PATH}, - preconf::{constraints::SignedConstraints, election::SignedPreconferElection}, + constants::{EPOCH_SLOTS, GET_PRECONFERS_PATH, GET_PRECONFER_PATH}, + preconf::election::SignedPreconferElection, relay_client::error::RelayClientError, }; @@ -28,11 +28,6 @@ impl RelayClient { Self { client, config } } - /// Returns the ID of the relay. - pub fn id(&self) -> String { - self.config.url.clone() - } - /// Fetches elected preconfers for the entire epoch. /// /// If the relay supports lookahead, it uses it to fetch all preconfers at once. @@ -112,26 +107,6 @@ impl RelayClient { Ok(Some(preconfer_election)) } - /// Sets constraints for the relay by making a request to the constraints API. - /// See spec: [https://www.notion.so/Aligning-Preconfirmation-APIs-db7907d9e66e41718e6bc2cff19604e4?pvs=4#99734b0684f741c8b15eec7661f8940d]. - pub async fn set_constraints( - &self, - constraints: SignedConstraints, - ) -> Result<(), RelayClientError> { - let url = format!("{}{}", self.url(), SET_CONSTRAINTS_PATH); - - let response = self.client.post(&url).json(&constraints).send().await?; - - let status_code = response.status(); - if status_code.is_success() { - Ok(()) - } else { - let error_message = - response.text().await.unwrap_or_else(|_| "Unknown error".to_string()); - Err(RelayClientError::RelayError { status_code, error: error_message }) - } - } - /// Returns the URL of the relay. pub fn url(&self) -> &str { &self.config.url diff --git a/src/relay_client/error.rs b/src/relay_client/error.rs index 5ced861..126aa03 100644 --- a/src/relay_client/error.rs +++ b/src/relay_client/error.rs @@ -1,10 +1,5 @@ -use reqwest::StatusCode; - #[derive(Debug, thiserror::Error)] pub enum RelayClientError { #[error("Reqwest error: {0}")] ReqwestError(#[from] reqwest::Error), - - #[error("Relay responded with an error. Code: {status_code:?}, Error: {error}")] - RelayError { status_code: StatusCode, error: String }, } diff --git a/src/ssz/mod.rs b/src/ssz/mod.rs index 8178a08..d999887 100644 --- a/src/ssz/mod.rs +++ b/src/ssz/mod.rs @@ -7,4 +7,3 @@ pub type MaxBytesPerTransaction = U1073741824; // 1,073,741,824 pub type MaxTransactionsPerPayload = U1048576; // 1,048,576 pub type SszTransaction = VariableList; -pub type SszTransactions = VariableList;