diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs index a9b76c27548..6608b7ca64f 100644 --- a/lighthouse/tests/validator_client.rs +++ b/lighthouse/tests/validator_client.rs @@ -442,3 +442,19 @@ fn monitoring_endpoint() { assert_eq!(api_conf.update_period_secs, Some(30)); }); } +#[test] +fn disable_run_on_all_default() { + CommandLineTest::new().run().with_config(|config| { + assert!(!config.disable_run_on_all); + }); +} + +#[test] +fn disable_run_on_all() { + CommandLineTest::new() + .flag("disable-run-on-all", None) + .run() + .with_config(|config| { + assert!(config.disable_run_on_all); + }); +} diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index df6c949aef0..82f085c43fb 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -105,11 +105,13 @@ impl Error { } /// The list of errors encountered whilst attempting to perform a query. -pub struct AllErrored(pub Vec<(String, Error)>); +pub struct Errors(pub Vec<(String, Error)>); -impl fmt::Display for AllErrored { +impl fmt::Display for Errors { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "All endpoints failed")?; + if !self.0.is_empty() { + write!(f, "Some endpoints failed, num_failed: {}", self.0.len())?; + } for (i, (id, error)) in self.0.iter().enumerate() { let comma = if i + 1 < self.0.len() { "," } else { "" }; @@ -294,15 +296,22 @@ impl CandidateBeaconNode { pub struct BeaconNodeFallback { candidates: Vec>, slot_clock: Option, + disable_run_on_all: bool, spec: ChainSpec, log: Logger, } impl BeaconNodeFallback { - pub fn new(candidates: Vec>, spec: ChainSpec, log: Logger) -> Self { + pub fn new( + candidates: Vec>, + disable_run_on_all: bool, + spec: ChainSpec, + log: Logger, + ) -> Self { Self { candidates, slot_clock: None, + disable_run_on_all, spec, log, } @@ -396,7 +405,7 @@ impl BeaconNodeFallback { require_synced: 
RequireSynced, offline_on_failure: OfflineOnFailure, func: F, - ) -> Result> + ) -> Result> where F: Fn(&'a BeaconNodeHttpClient) -> R, R: Future>, @@ -486,6 +495,145 @@ impl BeaconNodeFallback { } // There were no candidates already ready and we were unable to make any of them ready. - Err(AllErrored(errors)) + Err(Errors(errors)) + } + + /// Run `func` against all candidates in `self`, collecting the result of `func` against each + /// candidate. + /// + /// First this function will try all nodes with a suitable status. If no candidates are suitable + /// it will try updating the status of all unsuitable nodes and re-running `func` again. + /// + /// Note: This function returns `Ok(())` if `func` returned successfully on all beacon nodes. + /// It returns a list of errors along with the beacon node id that failed for `func`. + /// Since this ignores the actual result of `func`, this function should only be used for beacon + /// node calls whose results we do not care about, only that they completed successfully. + pub async fn run_on_all<'a, F, O, Err, R>( + &'a self, + require_synced: RequireSynced, + offline_on_failure: OfflineOnFailure, + func: F, + ) -> Result<(), Errors> + where + F: Fn(&'a BeaconNodeHttpClient) -> R, + R: Future>, + { + let mut results = vec![]; + let mut to_retry = vec![]; + let mut retry_unsynced = vec![]; + + // Run `func` using a `candidate`, returning the value or capturing errors. + // + // We use a macro instead of a closure here since it is not trivial to move `func` into a + // closure. + macro_rules! try_func { + ($candidate: ident) => {{ + inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]); + + // There exists a race condition where `func` may be called when the candidate is + // actually not ready. We deem this an acceptable inefficiency. + match func(&$candidate.beacon_node).await { + Ok(val) => results.push(Ok(val)), + Err(e) => { + // If we have an error on this function, mark the client as not-ready. 
+ // + // There exists a race condition where the candidate may have been marked + // as ready between the `func` call and now. We deem this an acceptable + // inefficiency. + if matches!(offline_on_failure, OfflineOnFailure::Yes) { + $candidate.set_offline().await; + } + results.push(Err(( + $candidate.beacon_node.to_string(), + Error::RequestFailed(e), + ))); + inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]); + } + } + }}; + } + + // First pass: try `func` on all synced and ready candidates. + // + // This ensures that we always choose a synced node if it is available. + for candidate in &self.candidates { + match candidate.status(RequireSynced::Yes).await { + Err(CandidateError::NotSynced) if require_synced == false => { + // This client is unsynced we will try it after trying all synced clients + retry_unsynced.push(candidate); + } + Err(_) => { + // This client was not ready on the first pass, we might try it again later. + to_retry.push(candidate); + } + Ok(_) => try_func!(candidate), + } + } + + // Second pass: try `func` on ready unsynced candidates. This only runs if we permit + // unsynced candidates. + // + // Due to async race-conditions, it is possible that we will send a request to a candidate + // that has been set to an offline/unready status. This is acceptable. + if require_synced == false { + for candidate in retry_unsynced { + try_func!(candidate); + } + } + + // Third pass: try again, attempting to make non-ready clients become ready. + for candidate in to_retry { + // If the candidate hasn't luckily transferred into the correct state in the meantime, + // force an update of the state. 
+ let new_status = match candidate.status(require_synced).await { + Ok(()) => Ok(()), + Err(_) => { + candidate + .refresh_status(self.slot_clock.as_ref(), &self.spec, &self.log) + .await + } + }; + + match new_status { + Ok(()) => try_func!(candidate), + Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate), + Err(e) => { + results.push(Err(( + candidate.beacon_node.to_string(), + Error::Unavailable(e), + ))); + } + } + } + + let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect(); + + if !errors.is_empty() { + Err(Errors(errors)) + } else { + Ok(()) + } + } + + /// Call `func` on first beacon node that returns success or on all beacon nodes + /// depending on the value of `disable_run_on_all`. + pub async fn run<'a, F, Err, R>( + &'a self, + require_synced: RequireSynced, + offline_on_failure: OfflineOnFailure, + func: F, + ) -> Result<(), Errors> + where + F: Fn(&'a BeaconNodeHttpClient) -> R, + R: Future>, + { + if self.disable_run_on_all { + self.first_success(require_synced, offline_on_failure, func) + .await?; + Ok(()) + } else { + self.run_on_all(require_synced, offline_on_failure, func) + .await + } } } diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index ac1ba116740..b0b69a4f50d 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -1,4 +1,4 @@ -use crate::beacon_node_fallback::{AllErrored, Error as FallbackError}; +use crate::beacon_node_fallback::{Error as FallbackError, Errors}; use crate::{ beacon_node_fallback::{BeaconNodeFallback, RequireSynced}, graffiti_file::GraffitiFile, @@ -20,8 +20,8 @@ pub enum BlockError { Irrecoverable(String), } -impl From> for BlockError { - fn from(e: AllErrored) -> Self { +impl From> for BlockError { + fn from(e: Errors) -> Self { if e.0.iter().any(|(_, error)| { matches!( error, diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index 5c7205a4ae4..ef2e66676a5 
100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -26,6 +26,16 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { ) .takes_value(true), ) + .arg( + Arg::with_name("disable-run-on-all") + .long("disable-run-on-all") + .value_name("DISABLE_RUN_ON_ALL") + .help("By default, Lighthouse publishes attestation, sync committee subscriptions \ + and proposer preparation messages to all beacon nodes provided in the \ + `--beacon-nodes` flag. This option changes that behaviour such that these \ + api calls only go out to the first available and synced beacon node") + .takes_value(false) + ) // This argument is deprecated, use `--beacon-nodes` instead. .arg( Arg::with_name("server") diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 22472f75124..277a4bd8ded 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -61,6 +61,8 @@ pub struct Config { /// A list of custom certificates that the validator client will additionally use when /// connecting to a beacon node over SSL/TLS. pub beacon_nodes_tls_certs: Option>, + /// Disables publishing http api requests to all beacon nodes for select api calls. 
+ pub disable_run_on_all: bool, } impl Default for Config { @@ -96,6 +98,7 @@ impl Default for Config { builder_proposals: false, builder_registration_timestamp_override: None, gas_limit: None, + disable_run_on_all: false, } } } @@ -177,6 +180,7 @@ impl Config { } config.allow_unsynced_beacon_node = cli_args.is_present("allow-unsynced"); + config.disable_run_on_all = cli_args.is_present("disable-run-on-all"); config.disable_auto_discover = cli_args.is_present("disable-auto-discover"); config.init_slashing_protection = cli_args.is_present("init-slashing-protection"); config.use_long_timeouts = cli_args.is_present("use-long-timeouts"); diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 60b617e6c8f..86b8ca870e2 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -570,12 +570,12 @@ async fn poll_beacon_attesters( }); } - // If there are any subscriptions, push them out to the beacon node. + // If there are any subscriptions, push them out to beacon nodes if !subscriptions.is_empty() { let subscriptions_ref = &subscriptions; if let Err(e) = duties_service .beacon_nodes - .first_success( + .run( duties_service.require_synced, OfflineOnFailure::Yes, |beacon_node| async move { diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 9db4cc03155..005a74edf60 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -327,8 +327,12 @@ impl ProductionValidatorClient { // Initialize the number of connected, avaliable beacon nodes to 0. 
set_gauge(&http_metrics::metrics::AVAILABLE_BEACON_NODES_COUNT, 0); - let mut beacon_nodes: BeaconNodeFallback<_, T> = - BeaconNodeFallback::new(candidates, context.eth2_config.spec.clone(), log.clone()); + let mut beacon_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new( + candidates, + config.disable_run_on_all, + context.eth2_config.spec.clone(), + log.clone(), + ); // Perform some potentially long-running initialization tasks. let (genesis_time, genesis_validators_root) = tokio::select! { diff --git a/validator_client/src/preparation_service.rs b/validator_client/src/preparation_service.rs index af152545e20..fc80f2ded08 100644 --- a/validator_client/src/preparation_service.rs +++ b/validator_client/src/preparation_service.rs @@ -331,7 +331,7 @@ impl PreparationService { let preparation_entries = preparation_data.as_slice(); match self .beacon_nodes - .first_success( + .run( RequireSynced::Yes, OfflineOnFailure::Yes, |beacon_node| async move { @@ -349,7 +349,7 @@ impl PreparationService { ), Err(e) => error!( log, - "Unable to publish proposer preparation"; + "Unable to publish proposer preparation to all beacon nodes"; "error" => %e, ), } diff --git a/validator_client/src/sync_committee_service.rs b/validator_client/src/sync_committee_service.rs index 1e6ff7a5b5f..5b959453024 100644 --- a/validator_client/src/sync_committee_service.rs +++ b/validator_client/src/sync_committee_service.rs @@ -568,7 +568,7 @@ impl SyncCommitteeService { if let Err(e) = self .beacon_nodes - .first_success( + .run( RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| async move {