Skip to content

Commit

Permalink
remove unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
fbrv committed Jul 19, 2024
1 parent dc5cef1 commit 0e96fdf
Show file tree
Hide file tree
Showing 13 changed files with 43 additions and 370 deletions.
229 changes: 6 additions & 223 deletions src/common/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,13 @@ use std::{
time::Duration,
};

use alloy::{
primitives::B256,
rpc::types::beacon::events::{HeadEvent, PayloadAttributesEvent},
};
use futures::{future::join_all, StreamExt};
use alloy::rpc::types::beacon::events::HeadEvent;
use futures::StreamExt;
use reqwest_eventsource::EventSource;
use tokio::{
sync::{
broadcast::{self, Sender},
mpsc::UnboundedSender,
},
task::JoinError,
time::sleep,
};
use tokio::{sync::broadcast::Sender, time::sleep};
use tracing::{debug, error, warn};
use url::Url;

use super::{
error::BeaconClientError,
types::{ApiResult, BeaconResponse, ProposerDuty, SyncStatus},
};
use crate::constants::EPOCH_SLOTS;

const BEACON_CLIENT_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
const PROPOSER_DUTIES_REFRESH_FREQ: u64 = EPOCH_SLOTS / 4;

/// Handles communication with multiple `BeaconClient` instances.
/// Load balances requests.
#[derive(Clone)]
Expand Down Expand Up @@ -61,84 +42,6 @@ impl MultiBeaconClient {
Self::new(clients)
}

/// Retrieves the sync status from multiple beacon clients and selects the best one.
///
/// The function spawns async tasks to fetch the sync status from each beacon client.
/// It then selects the sync status with the highest `head_slot`.
pub async fn best_sync_status(&self) -> Result<SyncStatus, BeaconClientError> {
let clients = self.beacon_clients_by_last_response();

let handles = clients
.into_iter()
.map(|(_, client)| tokio::spawn(async move { client.sync_status().await }))
.collect::<Vec<_>>();

let results: Vec<Result<Result<SyncStatus, BeaconClientError>, JoinError>> =
join_all(handles).await;

let mut best_sync_status: Option<SyncStatus> = None;
for join_result in results {
match join_result {
Ok(sync_status_result) => match sync_status_result {
Ok(sync_status) => {
if best_sync_status.as_ref().map_or(true, |current_best| {
current_best.head_slot < sync_status.head_slot
}) {
best_sync_status = Some(sync_status);
}
}
Err(err) => warn!("Failed to get sync status: {err:?}"),
},
Err(join_err) => {
error!("Tokio join error for best_sync_status: {join_err:?}")
}
}
}

best_sync_status.ok_or(BeaconClientError::BeaconNodeUnavailable)
}

pub async fn get_proposer_duties(
&self,
epoch: u64,
) -> Result<(B256, Vec<ProposerDuty>), BeaconClientError> {
let clients = self.beacon_clients_by_last_response();
let mut last_error = None;

for (i, client) in clients.into_iter() {
match client.get_proposer_duties(epoch).await {
Ok(proposer_duties) => {
self.best_beacon_instance.store(i, Ordering::Relaxed);
return Ok(proposer_duties);
}
Err(err) => {
last_error = Some(err);
}
}
}

Err(last_error.unwrap_or(BeaconClientError::BeaconNodeUnavailable))
}

/// `subscribe_to_payload_attributes_events` subscribes to payload attributes events from all
/// beacon nodes.
///
/// This function swaps async tasks for all beacon clients. Therefore,
/// a single payload event will be received multiple times, likely once for every beacon node.
pub async fn subscribe_to_payload_attributes_events(
&self,
chan: Sender<PayloadAttributesEvent>,
) {
let clients = self.beacon_clients_by_last_response();

for (_, client) in clients {
let chan = chan.clone();
tokio::spawn(async move {
client.subscribe_to_payload_attributes_events(chan).await;
});
}
}

/// `subscribe_to_head_events` subscribes to head events from all beacon nodes.
///
/// This function swaps async tasks for all beacon clients. Therefore,
Expand All @@ -154,27 +57,6 @@ impl MultiBeaconClient {
}
}

/// `subscribe_to_proposer_duties` listens to new `PayloadAttributesEvent`s through `rx`.
/// Fetches the chain proposer duties every 8 slots and sends them down `tx`.
pub async fn subscribe_to_proposer_duties(
self,
tx: UnboundedSender<Vec<ProposerDuty>>,
mut rx: broadcast::Receiver<PayloadAttributesEvent>,
) {
let mut last_updated_slot = 0;

while let Ok(payload) = rx.recv().await {
let new_slot = payload.data.proposal_slot;

if last_updated_slot == 0 ||
(new_slot > last_updated_slot && new_slot % PROPOSER_DUTIES_REFRESH_FREQ == 0)
{
last_updated_slot = new_slot;
tokio::spawn(fetch_and_send_duties_for_slot(new_slot, tx.clone(), self.clone()));
}
}
}

/// Returns a list of beacon clients, prioritized by the last successful response.
///
/// The beacon client with the most recent successful response is placed at the
Expand All @@ -193,63 +75,17 @@ impl MultiBeaconClient {
/// Handles communication to a single beacon client url.
#[derive(Clone, Debug)]
pub struct BeaconClient {
pub http: reqwest::Client,
pub endpoint: Url,
}

impl BeaconClient {
pub fn new(http: reqwest::Client, endpoint: Url) -> Self {
Self { http, endpoint }
pub fn new(endpoint: Url) -> Self {
Self { endpoint }
}

pub fn from_endpoint_str(endpoint: &str) -> Self {
let endpoint = Url::parse(endpoint).unwrap();
let client =
reqwest::ClientBuilder::new().timeout(BEACON_CLIENT_REQUEST_TIMEOUT).build().unwrap();
Self::new(client, endpoint)
}

pub async fn http_get(&self, path: &str) -> Result<reqwest::Response, BeaconClientError> {
let target = self.endpoint.join(path)?;
Ok(self.http.get(target).send().await?)
}

pub async fn get<T: serde::Serialize + serde::de::DeserializeOwned>(
&self,
path: &str,
) -> Result<T, BeaconClientError> {
let result = self.http_get(path).await?.json().await?;
match result {
ApiResult::Ok(result) => Ok(result),
ApiResult::Err(err) => Err(BeaconClientError::Api(err)),
}
}

pub async fn sync_status(&self) -> Result<SyncStatus, BeaconClientError> {
let response: BeaconResponse<SyncStatus> = self.get("eth/v1/node/syncing").await?;
Ok(response.data)
}

pub async fn get_proposer_duties(
&self,
epoch: u64,
) -> Result<(B256, Vec<ProposerDuty>), BeaconClientError> {
let endpoint = format!("eth/v1/validator/duties/proposer/{epoch}");
let mut result: BeaconResponse<Vec<ProposerDuty>> = self.get(&endpoint).await?;
let dependent_root_value = result.meta.remove("dependent_root").ok_or_else(|| {
BeaconClientError::MissingExpectedData(
"missing `dependent_root` in response".to_string(),
)
})?;
let dependent_root: B256 = serde_json::from_value(dependent_root_value)?;
Ok((dependent_root, result.data))
}

pub async fn subscribe_to_payload_attributes_events(
&self,
chan: Sender<PayloadAttributesEvent>,
) {
self.subscribe_to_sse("payload_attributes", chan).await
Self::new(endpoint)
}

async fn subscribe_to_head_events(&self, chan: Sender<HeadEvent>) {
Expand Down Expand Up @@ -291,56 +127,3 @@ impl BeaconClient {
}
}
}

async fn fetch_and_send_duties_for_slot(
slot: u64,
tx: UnboundedSender<Vec<ProposerDuty>>,
beacon_client: MultiBeaconClient,
) {
let epoch = slot / EPOCH_SLOTS;

// Fetch for `epoch` and `epoch + 1`;
let mut all_duties = Vec::with_capacity(64);
match beacon_client.get_proposer_duties(epoch).await {
Ok((_, mut duties)) => {
all_duties.append(&mut duties);
}
Err(err) => {
warn!(?err, %epoch, "failed fetching duties")
}
}
match beacon_client.get_proposer_duties(epoch + 1).await {
Ok((_, mut duties)) => {
all_duties.append(&mut duties);
}
Err(err) => {
warn!(?err, epoch=%epoch+1, "failed fetching duties")
}
}

if let Err(err) = tx.send(all_duties) {
error!(?err, "error sending duties");
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::initialize_tracing_log;

fn get_test_client() -> BeaconClient {
BeaconClient::from_endpoint_str("http://18.199.195.154:32945")
}

#[tokio::test]
async fn test_best_sync_status() {
initialize_tracing_log();

let client = get_test_client();

let sync_status = client.sync_status().await;
tracing::info!(?sync_status);
assert!(sync_status.is_ok());
assert!(sync_status.unwrap().head_slot > 0);
}
}
21 changes: 0 additions & 21 deletions src/common/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,4 @@ pub enum BeaconClientError {

#[error("JSON serialization/deserialization error: {0}")]
Json(#[from] serde_json::Error),

#[error("error from API: {0}")]
Api(String),

#[error("missing expected data in response: {0}")]
MissingExpectedData(String),

#[error("beacon node unavailable")]
BeaconNodeUnavailable,

#[error("block validation failed")]
BlockValidationFailed,

#[error("block integration failed")]
BlockIntegrationFailed,

#[error("beacon node syncing")]
BeaconNodeSyncing,

#[error("channel error")]
ChannelError,
}
1 change: 0 additions & 1 deletion src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@ pub const EPOCH_SLOTS: u64 = 32;

pub const GET_PRECONFER_PATH: &str = "/constraints/v1/preconfer/";
pub const GET_PRECONFERS_PATH: &str = "/constraints/v1/preconfers";
pub const SET_CONSTRAINTS_PATH: &str = "/eth/v1/builder/set_constraints";
42 changes: 23 additions & 19 deletions src/forward_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use reqwest_tracing::{
use tokio::task::JoinHandle;
use tower_http::trace::TraceLayer;
use tracing::Span;
use tracing_subscriber::fmt::format;

use crate::lookahead::LookaheadManager;

Expand Down Expand Up @@ -153,6 +152,7 @@ mod test {
Router,
};
use bytes::Bytes;
use dashmap::DashMap;
use eyre::Result;
use hashbrown::HashMap;
use http::StatusCode;
Expand All @@ -169,7 +169,10 @@ mod test {
#[tokio::test]
async fn test_missing_chain_id() -> Result<()> {
tokio::spawn(async move {
let manager = LookaheadManager::new(Lookahead::Single(None), LookaheadProvider::None);
let manager = LookaheadManager::new(
Lookahead::Multi(DashMap::new().into()),
LookaheadProvider::None,
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
let router = router(SharedState::new(managers).unwrap());
Expand All @@ -186,15 +189,18 @@ mod test {
#[tokio::test]
async fn test_invalid_chain_id() -> Result<()> {
tokio::spawn(async move {
let manager = LookaheadManager::new(Lookahead::Single(None), LookaheadProvider::None);
let manager = LookaheadManager::new(
Lookahead::Multi(DashMap::new().into()),
LookaheadProvider::None,
);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
let router = router(SharedState::new(managers).unwrap());
let listener = tokio::net::TcpListener::bind("localhost:12001").await.unwrap();
let listener = tokio::net::TcpListener::bind("localhost:12002").await.unwrap();
axum::serve(listener, router).await.unwrap();
});
tokio::time::sleep(Duration::from_secs(1)).await;
let res = reqwest::Client::new().post("http://localhost:12001/2").send().await.unwrap();
let res = reqwest::Client::new().post("http://localhost:12002/2").send().await.unwrap();
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
assert_eq!(res.text().await.unwrap(), "no lookahead provider found for id 2");
Ok(())
Expand All @@ -203,13 +209,12 @@ mod test {
#[tokio::test]
async fn test_unavailable_forwarded_service() -> Result<()> {
tokio::spawn(async move {
let manager = LookaheadManager::new(
Lookahead::Single(Some(LookaheadEntry {
url: "http://not-a-valid-url.gattaca".into(),
..Default::default()
})),
LookaheadProvider::None,
);
let map = Arc::new(DashMap::new());
map.insert(0, LookaheadEntry {
url: "http://not-a-valid-url".into(),
..Default::default()
});
let manager = LookaheadManager::new(Lookahead::Multi(map), LookaheadProvider::None);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
let router = router(SharedState::new(managers).unwrap());
Expand All @@ -234,13 +239,12 @@ mod test {
axum::serve(listener, router).await.unwrap();
});
tokio::spawn(async move {
let manager = LookaheadManager::new(
Lookahead::Single(Some(LookaheadEntry {
url: "http://localhost:12004".into(),
..Default::default()
})),
LookaheadProvider::None,
);
let map = Arc::new(DashMap::new());
map.insert(0, LookaheadEntry {
url: "http://localhost:12004".into(),
..Default::default()
});
let manager = LookaheadManager::new(Lookahead::Multi(map), LookaheadProvider::None);
let mut managers = HashMap::new();
managers.insert(1u16, manager);
let router = router(SharedState::new(managers).unwrap());
Expand Down
Loading

0 comments on commit 0e96fdf

Please sign in to comment.