Skip to content

Commit

Permalink
Manually merge tracing PR for beacon_node_fallback
Browse files Browse the repository at this point in the history
Co-authored-by: ThreeHrSleep <threehrsleep@gmail.com>
  • Loading branch information
dknopik and ThreeHrSleep committed Dec 19, 2024
1 parent c26126f commit 7b51633
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 83 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion validator_client/beacon_node_fallback/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ eth2 = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
serde = { workspace = true }
slog = { workspace = true }
slot_clock = { workspace = true }
strum = { workspace = true }
task_executor = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }
validator_metrics = { workspace = true }
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use super::CandidateError;
use eth2::BeaconNodeHttpClient;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use slog::{warn, Logger};
use std::cmp::Ordering;
use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
use tracing::warn;
use types::Slot;

/// Sync distances between 0 and DEFAULT_SYNC_TOLERANCE are considered `synced`.
Expand Down Expand Up @@ -290,15 +290,13 @@ impl BeaconNodeHealth {

pub async fn check_node_health(
beacon_node: &BeaconNodeHttpClient,
log: &Logger,
) -> Result<(Slot, bool, bool), CandidateError> {
let resp = match beacon_node.get_node_syncing().await {
Ok(resp) => resp,
Err(e) => {
warn!(
log,
"Unable connect to beacon node";
"error" => %e
error = %e,
"Unable connect to beacon node"
);

return Err(CandidateError::Offline);
Expand Down
117 changes: 43 additions & 74 deletions validator_client/beacon_node_fallback/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use beacon_node_health::{
use eth2::BeaconNodeHttpClient;
use futures::future;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
use std::cmp::Ordering;
use std::fmt;
Expand All @@ -21,6 +20,7 @@ use std::time::{Duration, Instant};
use strum::{EnumString, EnumVariantNames};
use task_executor::TaskExecutor;
use tokio::{sync::RwLock, time::sleep};
use tracing::{debug, error, warn};
use types::{ChainSpec, Config as ConfigSpec, EthSpec, Slot};
use validator_metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_REQUESTS};

Expand Down Expand Up @@ -216,15 +216,14 @@ impl CandidateBeaconNode {
distance_tiers: &BeaconNodeSyncDistanceTiers,
slot_clock: Option<&T>,
spec: &ChainSpec,
log: &Logger,
) -> Result<(), CandidateError> {
if let Err(e) = self.is_compatible::<E>(spec, log).await {
if let Err(e) = self.is_compatible::<E>(spec).await {
*self.health.write().await = Err(e);
return Err(e);
}

if let Some(slot_clock) = slot_clock {
match check_node_health(&self.beacon_node, log).await {
match check_node_health(&self.beacon_node).await {
Ok((head, is_optimistic, el_offline)) => {
let Some(slot_clock_head) = slot_clock.now() else {
let e = match slot_clock.is_prior_to_genesis() {
Expand Down Expand Up @@ -282,84 +281,73 @@ impl CandidateBeaconNode {
}

/// Checks if the node has the correct specification.
async fn is_compatible<E: EthSpec>(
&self,
spec: &ChainSpec,
log: &Logger,
) -> Result<(), CandidateError> {
async fn is_compatible<E: EthSpec>(&self, spec: &ChainSpec) -> Result<(), CandidateError> {
let config = self
.beacon_node
.get_config_spec::<ConfigSpec>()
.await
.map_err(|e| {
error!(
log,
"Unable to read spec from beacon node";
"error" => %e,
"endpoint" => %self.beacon_node,
error = %e,
endpoint = %self.beacon_node,
"Unable to read spec from beacon node"
);
CandidateError::Offline
})?
.data;

let beacon_node_spec = ChainSpec::from_config::<E>(&config).ok_or_else(|| {
error!(
log,
endpoint = %self.beacon_node,
"The minimal/mainnet spec type of the beacon node does not match the validator \
client. See the --network command.";
"endpoint" => %self.beacon_node,
client. See the --network command."

);
CandidateError::Incompatible
})?;

if beacon_node_spec.genesis_fork_version != spec.genesis_fork_version {
error!(
log,
"Beacon node is configured for a different network";
"endpoint" => %self.beacon_node,
"bn_genesis_fork" => ?beacon_node_spec.genesis_fork_version,
"our_genesis_fork" => ?spec.genesis_fork_version,
endpoint = %self.beacon_node,
bn_genesis_fork = ?beacon_node_spec.genesis_fork_version,
our_genesis_fork = ?spec.genesis_fork_version,
"Beacon node is configured for a different network"
);
return Err(CandidateError::Incompatible);
} else if beacon_node_spec.altair_fork_epoch != spec.altair_fork_epoch {
warn!(
log,
"Beacon node has mismatched Altair fork epoch";
"endpoint" => %self.beacon_node,
"endpoint_altair_fork_epoch" => ?beacon_node_spec.altair_fork_epoch,
"hint" => UPDATE_REQUIRED_LOG_HINT,
endpoint = %self.beacon_node,
endpoint_altair_fork_epoch = ?beacon_node_spec.altair_fork_epoch,
hint = UPDATE_REQUIRED_LOG_HINT,
"Beacon node has mismatched Altair fork epoch"
);
} else if beacon_node_spec.bellatrix_fork_epoch != spec.bellatrix_fork_epoch {
warn!(
log,
"Beacon node has mismatched Bellatrix fork epoch";
"endpoint" => %self.beacon_node,
"endpoint_bellatrix_fork_epoch" => ?beacon_node_spec.bellatrix_fork_epoch,
"hint" => UPDATE_REQUIRED_LOG_HINT,
endpoint = %self.beacon_node,
endpoint_bellatrix_fork_epoch = ?beacon_node_spec.bellatrix_fork_epoch,
hint = UPDATE_REQUIRED_LOG_HINT,
"Beacon node has mismatched Bellatrix fork epoch"
);
} else if beacon_node_spec.capella_fork_epoch != spec.capella_fork_epoch {
warn!(
log,
"Beacon node has mismatched Capella fork epoch";
"endpoint" => %self.beacon_node,
"endpoint_capella_fork_epoch" => ?beacon_node_spec.capella_fork_epoch,
"hint" => UPDATE_REQUIRED_LOG_HINT,
endpoint = %self.beacon_node,
endpoint_capella_fork_epoch = ?beacon_node_spec.capella_fork_epoch,
hint = UPDATE_REQUIRED_LOG_HINT,
"Beacon node has mismatched Capella fork epoch"
);
} else if beacon_node_spec.deneb_fork_epoch != spec.deneb_fork_epoch {
warn!(
log,
"Beacon node has mismatched Deneb fork epoch";
"endpoint" => %self.beacon_node,
"endpoint_deneb_fork_epoch" => ?beacon_node_spec.deneb_fork_epoch,
"hint" => UPDATE_REQUIRED_LOG_HINT,
endpoint = %self.beacon_node,
endpoint_deneb_fork_epoch = ?beacon_node_spec.deneb_fork_epoch,
hint = UPDATE_REQUIRED_LOG_HINT,
"Beacon node has mismatched Deneb fork epoch"
);
} else if beacon_node_spec.electra_fork_epoch != spec.electra_fork_epoch {
warn!(
log,
"Beacon node has mismatched Electra fork epoch";
"endpoint" => %self.beacon_node,
"endpoint_electra_fork_epoch" => ?beacon_node_spec.electra_fork_epoch,
"hint" => UPDATE_REQUIRED_LOG_HINT,
endpoint = %self.beacon_node,
endpoint_electra_fork_epoch = ?beacon_node_spec.electra_fork_epoch,
hint = UPDATE_REQUIRED_LOG_HINT,
"Beacon node has mismatched Electra fork epoch"
);
}

Expand All @@ -377,7 +365,6 @@ pub struct BeaconNodeFallback<T> {
slot_clock: Option<T>,
broadcast_topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
log: Logger,
}

impl<T: SlotClock> BeaconNodeFallback<T> {
Expand All @@ -386,7 +373,6 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
config: Config,
broadcast_topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
log: Logger,
) -> Self {
let distance_tiers = config.sync_tolerances;
Self {
Expand All @@ -395,7 +381,6 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
slot_clock: None,
broadcast_topics,
spec,
log,
}
}

Expand Down Expand Up @@ -478,7 +463,6 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
&self.distance_tiers,
self.slot_clock.as_ref(),
&self.spec,
&self.log,
));
nodes.push(candidate.beacon_node.to_string());
}
Expand All @@ -491,10 +475,9 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
if let Err(e) = result {
if *e != CandidateError::PreGenesis {
warn!(
self.log,
"A connected beacon node errored during routine health check";
"error" => ?e,
"endpoint" => node,
error = ?e,
endpoint = %node,
"A connected beacon node errored during routine health check"
);
}
}
Expand Down Expand Up @@ -566,11 +549,7 @@ impl<T: SlotClock> BeaconNodeFallback<T> {

// Run `func` using a `candidate`, returning the value or capturing errors.
for candidate in candidates.iter() {
futures.push(Self::run_on_candidate(
candidate.beacon_node.clone(),
&func,
&self.log,
));
futures.push(Self::run_on_candidate(candidate.beacon_node.clone(), &func));
}
drop(candidates);

Expand All @@ -588,11 +567,7 @@ impl<T: SlotClock> BeaconNodeFallback<T> {

// Run `func` using a `candidate`, returning the value or capturing errors.
for candidate in candidates.iter() {
futures.push(Self::run_on_candidate(
candidate.beacon_node.clone(),
&func,
&self.log,
));
futures.push(Self::run_on_candidate(candidate.beacon_node.clone(), &func));
}
drop(candidates);

Expand All @@ -611,7 +586,6 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
async fn run_on_candidate<F, R, Err, O>(
candidate: BeaconNodeHttpClient,
func: F,
log: &Logger,
) -> Result<O, (String, Error<Err>)>
where
F: Fn(BeaconNodeHttpClient) -> R,
Expand All @@ -626,10 +600,9 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
Ok(val) => Ok(val),
Err(e) => {
debug!(
log,
"Request to beacon node failed";
"node" => %candidate,
"error" => ?e,
node = %candidate,
error = ?e,
"Request to beacon node failed"
);
inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.as_ref()]);
Err((candidate.to_string(), Error::RequestFailed(e)))
Expand All @@ -656,11 +629,7 @@ impl<T: SlotClock> BeaconNodeFallback<T> {

// Run `func` using a `candidate`, returning the value or capturing errors.
for candidate in candidates.iter() {
futures.push(Self::run_on_candidate(
candidate.beacon_node.clone(),
&func,
&self.log,
));
futures.push(Self::run_on_candidate(candidate.beacon_node.clone(), &func));
}
drop(candidates);

Expand Down
2 changes: 0 additions & 2 deletions validator_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,15 +390,13 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
config.beacon_node_fallback,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
log.clone(),
);

let mut proposer_nodes: BeaconNodeFallback<_> = BeaconNodeFallback::new(
proposer_candidates,
config.beacon_node_fallback,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
log.clone(),
);

// Perform some potentially long-running initialization tasks.
Expand Down

0 comments on commit 7b51633

Please sign in to comment.