Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Reduce bandwidth over the VC<>BN API using dependant roots #4170

Closed
wants to merge 11 commits into from
Closed
231 changes: 152 additions & 79 deletions validator_client/src/duties_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ use crate::{
validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore},
};
use environment::RuntimeContext;
use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId};
use eth2::types::{
AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId,
};
use futures::{stream, StreamExt};
use parking_lot::RwLock;
use safe_arith::ArithError;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use std::cmp::min;
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -54,6 +57,11 @@ const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
/// flag in the cli to enable collection of per validator metrics.
const VALIDATOR_METRICS_MIN_COUNT: usize = 64;

/// The number of validators to request duty information for in the initial request.
/// The initial request is used to determine if further requests are required, so that it
/// reduces the amount of data that needs to be transferred.
const INITIAL_DUTIES_QUERY_SIZE: usize = 1;

#[derive(Debug)]
pub enum Error {
UnableToReadSlotClock,
Expand Down Expand Up @@ -531,7 +539,6 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
current_epoch,
&local_indices,
&local_pubkeys,
current_slot,
)
.await
{
Expand All @@ -544,21 +551,18 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
)
}

update_per_validator_duty_metrics::<T, E>(duties_service, current_epoch, current_slot);

drop(current_epoch_timer);
let next_epoch_timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::UPDATE_ATTESTERS_NEXT_EPOCH],
);

// Download the duties and update the duties for the next epoch.
if let Err(e) = poll_beacon_attesters_for_epoch(
duties_service,
next_epoch,
&local_indices,
&local_pubkeys,
current_slot,
)
.await
if let Err(e) =
poll_beacon_attesters_for_epoch(duties_service, next_epoch, &local_indices, &local_pubkeys)
.await
{
error!(
log,
Expand All @@ -569,6 +573,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
)
}

update_per_validator_duty_metrics::<T, E>(duties_service, next_epoch, current_slot);

drop(next_epoch_timer);
let subscriptions_timer =
metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]);
Expand Down Expand Up @@ -655,7 +661,6 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
epoch: Epoch,
local_indices: &[u64],
local_pubkeys: &HashSet<PublicKeyBytes>,
current_slot: Slot,
) -> Result<(), Error> {
let log = duties_service.context.log();

Expand All @@ -674,84 +679,69 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
&[metrics::UPDATE_ATTESTERS_FETCH],
);

let response = duties_service
.beacon_nodes
.first_success(
duties_service.require_synced,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::ATTESTER_DUTIES_HTTP_POST],
);
beacon_node
.post_validator_duties_attester(epoch, local_indices)
.await
},
)
.await
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))?;

drop(fetch_timer);
let _store_timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::UPDATE_ATTESTERS_STORE],
);
// Request duties for all uninitialized validators. If there isn't any, we will just request for
// `INITIAL_DUTIES_QUERY_SIZE` validators. We use the `dependent_root` in the response to
// determine whether validator duties need to be updated. This is to ensure that we don't
// request for extra data unless necessary in order to save on network bandwidth.
let uninitialized_validators =
get_uninitialized_validators(duties_service, &epoch, local_pubkeys);
let indices_to_request = if !uninitialized_validators.is_empty() {
uninitialized_validators.as_slice()
} else {
&local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())]
};

let response =
post_validator_duties_attester(duties_service, epoch, indices_to_request).await?;
let dependent_root = response.dependent_root;

// Filter any duties that are not relevant or already known.
let new_duties = {
// Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch.
let validators_to_update: Vec<_> = {
// Avoid holding the read-lock for any longer than required.
let attesters = duties_service.attesters.read();
response
local_pubkeys
.iter()
.filter(|pubkey| {
attesters.get(pubkey).map_or(true, |duties| {
duties
.get(&epoch)
.map_or(true, |(prior, _)| *prior != dependent_root)
})
})
.collect::<Vec<_>>()
};

if validators_to_update.is_empty() {
// No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch.
return Ok(());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to make sure we run update_per_validator_duty_metrics so that we start reporting the next-epoch duties as the current_slot progresses past the current-epoch slot.

Since we've changed update_per_validator_duty_metrics, perhaps it makes sense to hoist it up into poll_beacon_attesters (perhaps after each call to poll_beacon_attesters_for_epoch in here)? That way we can exit this whenever we like and still be confident that update_per_validator_duty_metrics is being called.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch 🙏 , I've moved this to where you suggested!

}

// Filter out validators which have already been requested.
let initial_duties = &response.data;
let indices_to_request = validators_to_update
.iter()
.filter(|&&&pubkey| !initial_duties.iter().any(|duty| duty.pubkey == pubkey))
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.collect::<Vec<_>>();

let new_duties = if !indices_to_request.is_empty() {
post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice())
.await?
.data
.into_iter()
.filter(|duty| {
if duties_service.per_validator_metrics() {
let validator_index = duty.validator_index;
let duty_slot = duty.slot;
if let Some(existing_slot_gauge) =
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
{
let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
let existing_epoch = existing_slot.epoch(E::slots_per_epoch());

// First condition ensures that we switch to the next epoch duty slot
// once the current epoch duty slot passes.
// Second condition is to ensure that next epoch duties don't override
// current epoch duties.
if existing_slot < current_slot
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
&& duty_slot > current_slot
&& duty_slot != existing_slot)
{
existing_slot_gauge.set(duty_slot.as_u64() as i64);
}
} else {
set_int_gauge(
&ATTESTATION_DUTY,
&[&validator_index.to_string()],
duty_slot.as_u64() as i64,
);
}
}

local_pubkeys.contains(&duty.pubkey) && {
// Only update the duties if either is true:
//
// - There were no known duties for this epoch.
// - The dependent root has changed, signalling a re-org.
attesters.get(&duty.pubkey).map_or(true, |duties| {
duties
.get(&epoch)
.map_or(true, |(prior, _)| *prior != dependent_root)
})
}
})
.chain(response.data)
.collect::<Vec<_>>()
} else {
response.data
};

drop(fetch_timer);

let _store_timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::UPDATE_ATTESTERS_STORE],
);

debug!(
log,
"Downloaded attester duties";
Expand Down Expand Up @@ -799,6 +789,89 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
Ok(())
}

/// Get a filtered list of local validators for which we don't already know their duties for that epoch
fn get_uninitialized_validators<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
epoch: &Epoch,
local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Vec<u64> {
let attesters = duties_service.attesters.read();
local_pubkeys
.iter()
.filter(|pubkey| {
attesters
.get(pubkey)
.map_or(true, |duties| !duties.contains_key(epoch))
})
.filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
.collect::<Vec<_>>()
}

fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
epoch: Epoch,
current_slot: Slot,
) {
if duties_service.per_validator_metrics() {
let attesters = duties_service.attesters.read();
attesters.values().for_each(|attester_duties_by_epoch| {
if let Some((_, duty_and_proof)) = attester_duties_by_epoch.get(&epoch) {
let duty = &duty_and_proof.duty;
let validator_index = duty.validator_index;
let duty_slot = duty.slot;
if let Some(existing_slot_gauge) =
get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
{
let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
let existing_epoch = existing_slot.epoch(E::slots_per_epoch());

// First condition ensures that we switch to the next epoch duty slot
// once the current epoch duty slot passes.
// Second condition is to ensure that next epoch duties don't override
// current epoch duties.
if existing_slot < current_slot
|| (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
&& duty_slot > current_slot
&& duty_slot != existing_slot)
{
existing_slot_gauge.set(duty_slot.as_u64() as i64);
}
} else {
set_int_gauge(
&ATTESTATION_DUTY,
&[&validator_index.to_string()],
duty_slot.as_u64() as i64,
);
}
}
});
}
}

async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
duties_service: &Arc<DutiesService<T, E>>,
epoch: Epoch,
validator_indices: &[u64],
) -> Result<DutiesResponse<Vec<AttesterData>>, Error> {
duties_service
.beacon_nodes
.first_success(
duties_service.require_synced,
OfflineOnFailure::Yes,
|beacon_node| async move {
let _timer = metrics::start_timer_vec(
&metrics::DUTIES_SERVICE_TIMES,
&[metrics::ATTESTER_DUTIES_HTTP_POST],
);
beacon_node
.post_validator_duties_attester(epoch, validator_indices)
.await
},
)
.await
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))
}

/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
///
/// Duties are computed in batches each slot. If a re-org is detected then the process will
Expand Down