Reduce bandwidth over the VC<>BN API using dependent roots (#4170)
## Issue Addressed

#4157 

## Proposed Changes

See description in #4157.

In diagram form:

![reduce-attestation-bandwidth](https://user-images.githubusercontent.com/742762/230277084-f97301c1-0c5d-4fb3-92f9-91f99e4dc7d4.png)
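In code form, the flow works roughly as follows. The block below is a minimal, self-contained sketch of the strategy, not code from this diff: the `Index`/`Epoch`/`Root` aliases, the flat `known` map, and the `post_duties` stand-in for `POST /eth/v1/validator/duties/attester/{epoch}` are simplified assumptions; the real logic lives in `poll_beacon_attesters_for_epoch` in the diff below.

```rust
use std::collections::HashMap;

type Index = u64;
type Epoch = u64;
type Root = u64;

/// Simplified stand-in for the duties response: the chain's dependent root
/// plus duty data for the requested indices.
struct DutiesResponse {
    dependent_root: Root,
    data: Vec<Index>,
}

/// Stand-in for the beacon node's attester duties endpoint. `current_root`
/// simulates whatever dependent root the BN would report right now.
fn post_duties(_epoch: Epoch, indices: &[Index], current_root: Root) -> DutiesResponse {
    DutiesResponse { dependent_root: current_root, data: indices.to_vec() }
}

/// Returns the indices that still need a full duties download for `epoch`.
/// `known` maps validator index -> (epoch, dependent_root) of stored duties.
fn poll(known: &HashMap<Index, (Epoch, Root)>, local: &[Index], epoch: Epoch, root: Root) -> Vec<Index> {
    // Step 1: a cheap probe. Ask only for validators with no duties for this
    // epoch; if there are none, ask for a single validator (query size 1).
    let uninitialized: Vec<Index> = local
        .iter()
        .copied()
        .filter(|i| known.get(i).map_or(true, |(e, _)| *e != epoch))
        .collect();
    let probe: &[Index] = if uninitialized.is_empty() {
        &local[..local.len().min(1)]
    } else {
        &uninitialized
    };
    let response = post_duties(epoch, probe, root);

    // Step 2: the probe's dependent_root tells us whether stored duties are
    // still valid. Re-request only stale validators not covered by the probe.
    local
        .iter()
        .copied()
        .filter(|i| {
            known
                .get(i)
                .map_or(true, |(e, r)| *e != epoch || *r != response.dependent_root)
        })
        .filter(|i| !response.data.contains(i))
        .collect()
}

fn main() {
    // Steady state: duties known, dependent root unchanged. Only the
    // one-validator probe goes over the wire; nothing further is requested.
    let known: HashMap<Index, (Epoch, Root)> = (0..4).map(|i| (i, (5, 50))).collect();
    assert!(poll(&known, &[0, 1, 2, 3], 5, 50).is_empty());
    // After a re-org the dependent root changes, so everything except the
    // probed validator is re-requested.
    assert_eq!(poll(&known, &[0, 1, 2, 3], 5, 99).len(), 3);
}
```

In the steady state (duties already known, no re-org) each poll therefore transfers duty data for a single validator (`INITIAL_DUTIES_QUERY_SIZE = 1`) rather than for every local validator; a full re-download only happens for uninitialized validators or when the `dependent_root` changes.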


Co-authored-by: Jimmy Chen <jimmy@sigmaprime.io>
jimmygchen committed May 15, 2023
1 parent b7b4549 commit cf239fe
Showing 1 changed file with 152 additions and 79 deletions.

validator_client/src/duties_service.rs
@@ -16,12 +16,15 @@ use crate::{
     validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore},
 };
 use environment::RuntimeContext;
-use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId};
+use eth2::types::{
+    AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId,
+};
 use futures::{stream, StreamExt};
 use parking_lot::RwLock;
 use safe_arith::ArithError;
 use slog::{debug, error, info, warn, Logger};
 use slot_clock::SlotClock;
+use std::cmp::min;
 use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
 use std::sync::Arc;
 use std::time::Duration;
@@ -54,6 +57,11 @@ const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
 /// flag in the cli to enable collection of per validator metrics.
 const VALIDATOR_METRICS_MIN_COUNT: usize = 64;
 
+/// The number of validators to request duty information for in the initial request.
+/// The initial request is used to determine if further requests are required, so that it
+/// reduces the amount of data that needs to be transferred.
+const INITIAL_DUTIES_QUERY_SIZE: usize = 1;
+
 #[derive(Debug)]
 pub enum Error {
     UnableToReadSlotClock,
@@ -531,7 +539,6 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
         current_epoch,
         &local_indices,
         &local_pubkeys,
-        current_slot,
     )
     .await
     {
@@ -544,21 +551,18 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
         )
     }
 
+    update_per_validator_duty_metrics::<T, E>(duties_service, current_epoch, current_slot);
+
     drop(current_epoch_timer);
     let next_epoch_timer = metrics::start_timer_vec(
         &metrics::DUTIES_SERVICE_TIMES,
         &[metrics::UPDATE_ATTESTERS_NEXT_EPOCH],
     );
 
     // Download the duties and update the duties for the next epoch.
-    if let Err(e) = poll_beacon_attesters_for_epoch(
-        duties_service,
-        next_epoch,
-        &local_indices,
-        &local_pubkeys,
-        current_slot,
-    )
-    .await
+    if let Err(e) =
+        poll_beacon_attesters_for_epoch(duties_service, next_epoch, &local_indices, &local_pubkeys)
+            .await
     {
         error!(
             log,
@@ -569,6 +573,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
         )
     }
 
+    update_per_validator_duty_metrics::<T, E>(duties_service, next_epoch, current_slot);
+
     drop(next_epoch_timer);
     let subscriptions_timer =
         metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]);
@@ -655,7 +661,6 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
     epoch: Epoch,
     local_indices: &[u64],
     local_pubkeys: &HashSet<PublicKeyBytes>,
-    current_slot: Slot,
 ) -> Result<(), Error> {
     let log = duties_service.context.log();
 
@@ -674,84 +679,69 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
         &[metrics::UPDATE_ATTESTERS_FETCH],
     );
 
-    let response = duties_service
-        .beacon_nodes
-        .first_success(
-            duties_service.require_synced,
-            OfflineOnFailure::Yes,
-            |beacon_node| async move {
-                let _timer = metrics::start_timer_vec(
-                    &metrics::DUTIES_SERVICE_TIMES,
-                    &[metrics::ATTESTER_DUTIES_HTTP_POST],
-                );
-                beacon_node
-                    .post_validator_duties_attester(epoch, local_indices)
-                    .await
-            },
-        )
-        .await
-        .map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))?;
-
-    drop(fetch_timer);
-    let _store_timer = metrics::start_timer_vec(
-        &metrics::DUTIES_SERVICE_TIMES,
-        &[metrics::UPDATE_ATTESTERS_STORE],
-    );
-
+    // Request duties for all uninitialized validators. If there isn't any, we will just request for
+    // `INITIAL_DUTIES_QUERY_SIZE` validators. We use the `dependent_root` in the response to
+    // determine whether validator duties need to be updated. This is to ensure that we don't
+    // request for extra data unless necessary in order to save on network bandwidth.
+    let uninitialized_validators =
+        get_uninitialized_validators(duties_service, &epoch, local_pubkeys);
+    let indices_to_request = if !uninitialized_validators.is_empty() {
+        uninitialized_validators.as_slice()
+    } else {
+        &local_indices[0..min(INITIAL_DUTIES_QUERY_SIZE, local_indices.len())]
+    };
+
+    let response =
+        post_validator_duties_attester(duties_service, epoch, indices_to_request).await?;
     let dependent_root = response.dependent_root;
 
-    // Filter any duties that are not relevant or already known.
-    let new_duties = {
+    // Find any validators which have conflicting (epoch, dependent_root) values or missing duties for the epoch.
+    let validators_to_update: Vec<_> = {
         // Avoid holding the read-lock for any longer than required.
         let attesters = duties_service.attesters.read();
-        response
-            .data
-            .into_iter()
-            .filter(|duty| {
-                if duties_service.per_validator_metrics() {
-                    let validator_index = duty.validator_index;
-                    let duty_slot = duty.slot;
-                    if let Some(existing_slot_gauge) =
-                        get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
-                    {
-                        let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
-                        let existing_epoch = existing_slot.epoch(E::slots_per_epoch());
-
-                        // First condition ensures that we switch to the next epoch duty slot
-                        // once the current epoch duty slot passes.
-                        // Second condition is to ensure that next epoch duties don't override
-                        // current epoch duties.
-                        if existing_slot < current_slot
-                            || (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
-                                && duty_slot > current_slot
-                                && duty_slot != existing_slot)
-                        {
-                            existing_slot_gauge.set(duty_slot.as_u64() as i64);
-                        }
-                    } else {
-                        set_int_gauge(
-                            &ATTESTATION_DUTY,
-                            &[&validator_index.to_string()],
-                            duty_slot.as_u64() as i64,
-                        );
-                    }
-                }
-
-                local_pubkeys.contains(&duty.pubkey) && {
-                    // Only update the duties if either is true:
-                    //
-                    // - There were no known duties for this epoch.
-                    // - The dependent root has changed, signalling a re-org.
-                    attesters.get(&duty.pubkey).map_or(true, |duties| {
-                        duties
-                            .get(&epoch)
-                            .map_or(true, |(prior, _)| *prior != dependent_root)
-                    })
-                }
+        local_pubkeys
+            .iter()
+            .filter(|pubkey| {
+                attesters.get(pubkey).map_or(true, |duties| {
+                    duties
+                        .get(&epoch)
+                        .map_or(true, |(prior, _)| *prior != dependent_root)
+                })
             })
             .collect::<Vec<_>>()
     };
 
+    if validators_to_update.is_empty() {
+        // No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch.
+        return Ok(());
+    }
+
+    // Filter out validators which have already been requested.
+    let initial_duties = &response.data;
+    let indices_to_request = validators_to_update
+        .iter()
+        .filter(|&&&pubkey| !initial_duties.iter().any(|duty| duty.pubkey == pubkey))
+        .filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
+        .collect::<Vec<_>>();
+
+    let new_duties = if !indices_to_request.is_empty() {
+        post_validator_duties_attester(duties_service, epoch, indices_to_request.as_slice())
+            .await?
+            .data
+            .into_iter()
+            .chain(response.data)
+            .collect::<Vec<_>>()
+    } else {
+        response.data
+    };
+
+    drop(fetch_timer);
+
+    let _store_timer = metrics::start_timer_vec(
+        &metrics::DUTIES_SERVICE_TIMES,
+        &[metrics::UPDATE_ATTESTERS_STORE],
+    );
+
     debug!(
         log,
         "Downloaded attester duties";
@@ -799,6 +789,89 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
     Ok(())
 }
 
+/// Get a filtered list of local validators for which we don't already know their duties for that epoch
+fn get_uninitialized_validators<T: SlotClock + 'static, E: EthSpec>(
+    duties_service: &Arc<DutiesService<T, E>>,
+    epoch: &Epoch,
+    local_pubkeys: &HashSet<PublicKeyBytes>,
+) -> Vec<u64> {
+    let attesters = duties_service.attesters.read();
+    local_pubkeys
+        .iter()
+        .filter(|pubkey| {
+            attesters
+                .get(pubkey)
+                .map_or(true, |duties| !duties.contains_key(epoch))
+        })
+        .filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey))
+        .collect::<Vec<_>>()
+}
+
+fn update_per_validator_duty_metrics<T: SlotClock + 'static, E: EthSpec>(
+    duties_service: &Arc<DutiesService<T, E>>,
+    epoch: Epoch,
+    current_slot: Slot,
+) {
+    if duties_service.per_validator_metrics() {
+        let attesters = duties_service.attesters.read();
+        attesters.values().for_each(|attester_duties_by_epoch| {
+            if let Some((_, duty_and_proof)) = attester_duties_by_epoch.get(&epoch) {
+                let duty = &duty_and_proof.duty;
+                let validator_index = duty.validator_index;
+                let duty_slot = duty.slot;
+                if let Some(existing_slot_gauge) =
+                    get_int_gauge(&ATTESTATION_DUTY, &[&validator_index.to_string()])
+                {
+                    let existing_slot = Slot::new(existing_slot_gauge.get() as u64);
+                    let existing_epoch = existing_slot.epoch(E::slots_per_epoch());
+
+                    // First condition ensures that we switch to the next epoch duty slot
+                    // once the current epoch duty slot passes.
+                    // Second condition is to ensure that next epoch duties don't override
+                    // current epoch duties.
+                    if existing_slot < current_slot
+                        || (duty_slot.epoch(E::slots_per_epoch()) <= existing_epoch
+                            && duty_slot > current_slot
+                            && duty_slot != existing_slot)
+                    {
+                        existing_slot_gauge.set(duty_slot.as_u64() as i64);
+                    }
+                } else {
+                    set_int_gauge(
+                        &ATTESTATION_DUTY,
+                        &[&validator_index.to_string()],
+                        duty_slot.as_u64() as i64,
+                    );
+                }
+            }
+        });
+    }
+}
+
+async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
+    duties_service: &Arc<DutiesService<T, E>>,
+    epoch: Epoch,
+    validator_indices: &[u64],
+) -> Result<DutiesResponse<Vec<AttesterData>>, Error> {
+    duties_service
+        .beacon_nodes
+        .first_success(
+            duties_service.require_synced,
+            OfflineOnFailure::Yes,
+            |beacon_node| async move {
+                let _timer = metrics::start_timer_vec(
+                    &metrics::DUTIES_SERVICE_TIMES,
+                    &[metrics::ATTESTER_DUTIES_HTTP_POST],
+                );
+                beacon_node
                    .post_validator_duties_attester(epoch, validator_indices)
+                    .await
+            },
+        )
+        .await
+        .map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))
+}
+
 /// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
 ///
 /// Duties are computed in batches each slot. If a re-org is detected then the process will
