Skip to content

Commit

Permalink
fix: use all nodes to fetch blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
alextes committed Aug 27, 2024
1 parent 3b88207 commit 192df8e
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 37 deletions.
96 changes: 64 additions & 32 deletions src/beacon_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use anyhow::anyhow;
use rand::seq::SliceRandom;
use reqwest::{StatusCode, Url};
use serde::Deserialize;
use tracing::warn;

#[derive(Deserialize)]
struct BeaconResponse<T> {
Expand All @@ -19,7 +18,7 @@ pub struct SyncStatus {
pub is_syncing: bool,
}

#[derive(Deserialize)]
#[derive(Deserialize, Clone)]
pub struct ExecutionPayload {
pub block_hash: String,
#[serde(deserialize_with = "parse_i64_from_string")]
Expand Down Expand Up @@ -52,33 +51,36 @@ where

#[derive(Clone)]
pub struct BeaconApi {
node_urls: Vec<Url>,
node_hosts: Vec<Url>,
client: reqwest::Client,
}

impl BeaconApi {
pub fn new(nodes: &[Url]) -> Self {
if !nodes.is_empty() {
Self {
node_urls: nodes.to_vec(),
client: reqwest::Client::new(),
}
} else {
panic!("tried to instantiate BeaconAPI without at least one url");
pub fn new(node_hosts: &[Url]) -> Self {
assert!(
!node_hosts.is_empty(),
"tried to instantiate BeaconAPI without at least one url"
);

Self {
node_hosts: node_hosts.to_vec(),
client: reqwest::Client::new(),
}
}

// poor mans load balancer, get random node from list
fn get_node(&self) -> &Url {
self.node_urls.choose(&mut rand::thread_rng()).unwrap()
/// Pick a random beacon node from the list we've been initialized with.
fn random_host(&self) -> &Url {
self.node_hosts.choose(&mut rand::thread_rng()).unwrap()
}

/// Fetch a validator index from a pubkey.
pub async fn validator_index(&self, pubkey: &String) -> reqwest::Result<String> {
let url = format!(
"{}eth/v1/beacon/states/head/validators/{}",
self.get_node(),
self.random_host(),
pubkey
);

self.client
.get(url)
.send()
Expand All @@ -88,15 +90,19 @@ impl BeaconApi {
.map(|body| body.data.index)
}

// Method to fetch the payload from a node and a slot
async fn fetch_payload(
/// Method to fetch the payload from a node and a slot.
async fn block_by_slot(
&self,
node: &Url,
slot: i64,
) -> anyhow::Result<Option<ExecutionPayload>> {
let url = format!("{}eth/v2/beacon/blocks/{}", node, slot);
let res = self.client.get(&url).send().await?;
match res.status() {
// Could mean:
// 1. Slot doesn't have a block.
// 2. Slot is in the future and doesn't have a block yet.
// 3. Slot is before the beacon node backfill limit.
StatusCode::NOT_FOUND => Ok(None),
StatusCode::OK => {
let block = res
Expand All @@ -114,8 +120,45 @@ impl BeaconApi {
}
}

/// Fetch a beacon block by slot, asking every configured node.
///
/// This function is intended to be highly reliable. It requests the block from every host in
/// `node_hosts`, waits for all requests to finish (`join_all`), and then picks the most useful
/// answer in this order of preference:
///
/// 1. the first `Ok(Some(payload))`, i.e. any node that has the block,
/// 2. `Ok(None)`, if at least one node answered but none had the block,
/// 3. the first error, if every node failed.
///
/// Errors from individual nodes are discarded whenever at least one node answered successfully.
///
/// # Panics
///
/// Panics if there are no results at all, i.e. `node_hosts` is empty. `BeaconApi::new` asserts
/// that at least one host is configured, so this indicates a broken invariant.
pub async fn block_by_slot_any(&self, slot: i64) -> anyhow::Result<Option<ExecutionPayload>> {
    let futures = self
        .node_hosts
        .iter()
        .map(|node| self.block_by_slot(node, slot));
    let results = futures::future::join_all(futures).await;

    // Single pass over the owned results: return the first payload immediately (no clone
    // needed), and remember whether we saw an `Ok(None)` and what the first error was.
    let mut saw_missing_block = false;
    let mut first_error = None;
    for result in results {
        match result {
            Ok(Some(payload)) => return Ok(Some(payload)),
            Ok(None) => saw_missing_block = true,
            Err(err) => {
                // Keep only the first error, in node order.
                first_error.get_or_insert(err);
            }
        }
    }

    // No node had the block, but at least one answered: report "no block".
    if saw_missing_block {
        return Ok(None);
    }

    // Every node failed; surface the first failure. `None` here means zero requests were made.
    Err(first_error.expect("expected at least one beacon node result; node_hosts must not be empty"))
}

// Method to fetch the sync status from a node
pub async fn fetch_sync_status(&self, node_url: &Url) -> reqwest::Result<SyncStatus> {
async fn sync_status(&self, node_url: &Url) -> reqwest::Result<SyncStatus> {
let url = format!("{}eth/v1/node/syncing", node_url);
self.client
.get(&url)
Expand All @@ -128,19 +171,8 @@ impl BeaconApi {
.map_err(Into::into)
}

pub async fn fetch_payload_all(&self, slot: i64) -> anyhow::Result<Option<ExecutionPayload>> {
for (i, node) in self.node_urls.iter().enumerate() {
match self.fetch_payload(node, slot).await {
Ok(res) => return Ok(res),
Err(err) => {
warn!("failed to fetch payload from {}: {:?}", node, err);
if i == self.node_urls.len() - 1 {
return Err(err);
}
}
}
}

unreachable!("last iteration should always return")
pub async fn sync_status_all(&self) -> Vec<reqwest::Result<SyncStatus>> {
let futures = self.node_hosts.iter().map(|node| self.sync_status(node));
futures::future::join_all(futures).await
}
}
5 changes: 2 additions & 3 deletions src/phoenix/consensus_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ impl ConsensusNodeMonitor {
async fn num_unsynced_nodes(&self) -> usize {
let mut results = Vec::new();

for url in &APP_CONFIG.consensus_nodes {
let status = self.beacon_api.fetch_sync_status(url).await;

let statuses = self.beacon_api.sync_status_all().await;
for status in statuses {
match status {
Ok(s) => results.push(!s.is_syncing),
Err(err) => {
Expand Down
4 changes: 2 additions & 2 deletions src/phoenix/inclusion_monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ async fn was_attempted_reorg(
delivered: &DeliveredPayload,
) -> anyhow::Result<bool> {
let prev_slot = delivered.slot - 1;
let prev_payload = beacon_api.fetch_payload_all(prev_slot).await?;
let prev_payload = beacon_api.block_by_slot_any(prev_slot).await?;
Ok(prev_payload
.map(|p| p.block_number == delivered.block_number)
.unwrap_or(false))
Expand All @@ -334,7 +334,7 @@ async fn check_missing_payload(
payload: &DeliveredPayload,
relay_pool: &PgPool,
) -> anyhow::Result<()> {
let block = beacon_api.fetch_payload_all(payload.slot).await?;
let block = beacon_api.block_by_slot_any(payload.slot).await?;

match block {
Some(ExecutionPayload { block_hash, .. }) => {
Expand Down

0 comments on commit 192df8e

Please sign in to comment.