From c3321dddb7e8d8d55a39dadaa6be4abee05dd9a4 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 6 Oct 2023 06:26:18 +0000 Subject: [PATCH] Reduce attestation subscription spam from VC (#4806) ## Proposed Changes Instead of sending every attestation subscription every slot to every BN: - Send subscriptions 32, 16, 8, 7, 6, 5, 4, 3 slots before they occur. - Track whether each subscription is sent successfully and retry it in subsequent slots if necessary. ## Additional Info - [x] Add unit tests for `SubscriptionSlots`. - [x] Test on Holesky. - [x] Based on #4774 for testing. --- validator_client/src/duties_service.rs | 192 +++++++++++++++++++++++-- 1 file changed, 177 insertions(+), 15 deletions(-) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index a3b3cabcccd..9b9105a6217 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -21,11 +21,12 @@ use eth2::types::{ }; use futures::{stream, StreamExt}; use parking_lot::RwLock; -use safe_arith::ArithError; +use safe_arith::{ArithError, SafeArith}; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; use std::cmp::min; use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use sync::poll_sync_committee_duties; @@ -33,14 +34,6 @@ use sync::SyncDutiesMap; use tokio::{sync::mpsc::Sender, time::sleep}; use types::{ChainSpec, Epoch, EthSpec, Hash256, PublicKeyBytes, SelectionProof, Slot}; -/// Since the BN does not like it when we subscribe to slots that are close to the current time, we -/// will only subscribe to slots which are further than `SUBSCRIPTION_BUFFER_SLOTS` away. -/// -/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the -/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid -/// bringing in the entire crate. -const SUBSCRIPTION_BUFFER_SLOTS: u64 = 2; - /// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch. const HISTORICAL_DUTIES_EPOCHS: u64 = 2; @@ -62,6 +55,36 @@ const VALIDATOR_METRICS_MIN_COUNT: usize = 64; /// reduces the amount of data that needs to be transferred. const INITIAL_DUTIES_QUERY_SIZE: usize = 1; +/// Offsets from the attestation duty slot at which a subscription should be sent. +const ATTESTATION_SUBSCRIPTION_OFFSETS: [u64; 8] = [3, 4, 5, 6, 7, 8, 16, 32]; + +/// Check that `ATTESTATION_SUBSCRIPTION_OFFSETS` is sorted ascendingly. +const _: () = assert!({ + let mut i = 0; + loop { + let prev = if i > 0 { + ATTESTATION_SUBSCRIPTION_OFFSETS[i - 1] + } else { + 0 + }; + let curr = ATTESTATION_SUBSCRIPTION_OFFSETS[i]; + if curr < prev { + break false; + } + i += 1; + if i == ATTESTATION_SUBSCRIPTION_OFFSETS.len() { + break true; + } + } +}); +/// Since the BN does not like it when we subscribe to slots that are close to the current time, we +/// will only subscribe to slots which are further than 2 slots away. +/// +/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the +/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid +/// bringing in the entire crate. +const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > 2); + #[derive(Debug)] pub enum Error { UnableToReadSlotClock, @@ -84,6 +107,16 @@ pub struct DutyAndProof { pub duty: AttesterData, /// This value is only set to `Some` if the proof indicates that the validator is an aggregator. pub selection_proof: Option, + /// Track which slots we should send subscriptions at for this duty. + /// + /// This value is updated after each subscription is successfully sent. + pub subscription_slots: Arc, +} + +/// Tracker containing the slots at which an attestation subscription should be sent. +pub struct SubscriptionSlots { + /// Pairs of `(slot, already_sent)` in slot-descending order. + slots: Vec<(Slot, AtomicBool)>, } impl DutyAndProof { @@ -111,17 +144,55 @@ impl DutyAndProof { } })?; + let subscription_slots = SubscriptionSlots::new(duty.slot); + Ok(Self { duty, selection_proof, + subscription_slots, }) } /// Create a new `DutyAndProof` with the selection proof waiting to be filled in. pub fn new_without_selection_proof(duty: AttesterData) -> Self { + let subscription_slots = SubscriptionSlots::new(duty.slot); Self { duty, selection_proof: None, + subscription_slots, + } + } +} + +impl SubscriptionSlots { + fn new(duty_slot: Slot) -> Arc { + let slots = ATTESTATION_SUBSCRIPTION_OFFSETS + .into_iter() + .filter_map(|offset| duty_slot.safe_sub(offset).ok()) + .map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false))) + .collect(); + Arc::new(Self { slots }) + } + + /// Return `true` if we should send a subscription at `slot`. + fn should_send_subscription_at(&self, slot: Slot) -> bool { + // Iterate slots from smallest to largest looking for one that hasn't been completed yet. + self.slots + .iter() + .rev() + .any(|(scheduled_slot, already_sent)| { + slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed) + }) + } + + /// Update our record of subscribed slots to account for successful subscription at `slot`. + fn record_successful_subscription_at(&self, slot: Slot) { + for (scheduled_slot, already_sent) in self.slots.iter().rev() { + if slot >= *scheduled_slot { + already_sent.store(true, Ordering::Relaxed); + } else { + break; + } } } } @@ -574,8 +645,24 @@ async fn poll_beacon_attesters( let subscriptions_timer = metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::SUBSCRIPTIONS]); - // This vector is likely to be a little oversized, but it won't reallocate. - let mut subscriptions = Vec::with_capacity(local_pubkeys.len() * 2); + // This vector is intentionally oversized by 10% so that it won't reallocate. + // Each validator has 2 attestation duties occuring in the current and next epoch, for which + // they must send `ATTESTATION_SUBSCRIPTION_OFFSETS.len()` subscriptions. These subscription + // slots are approximately evenly distributed over the two epochs, usually with a slight lag + // that balances out (some subscriptions for the current epoch were sent in the previous, and + // some subscriptions for the next next epoch will be sent in the next epoch but aren't included + // in our calculation). We cancel the factor of 2 from the formula for simplicity. + let overallocation_numerator = 110; + let overallocation_denominator = 100; + let num_expected_subscriptions = overallocation_numerator + * std::cmp::max( + 1, + local_pubkeys.len() * ATTESTATION_SUBSCRIPTION_OFFSETS.len() + / E::slots_per_epoch() as usize, + ) + / overallocation_denominator; + let mut subscriptions = Vec::with_capacity(num_expected_subscriptions); + let mut subscription_slots_to_confirm = Vec::with_capacity(num_expected_subscriptions); // For this epoch and the next epoch, produce any beacon committee subscriptions. // @@ -588,10 +675,10 @@ async fn poll_beacon_attesters( .read() .iter() .filter_map(|(_, map)| map.get(epoch)) - // The BN logs a warning if we try and subscribe to current or near-by slots. Give it a - // buffer. .filter(|(_, duty_and_proof)| { - current_slot + SUBSCRIPTION_BUFFER_SLOTS < duty_and_proof.duty.slot + duty_and_proof + .subscription_slots + .should_send_subscription_at(current_slot) }) .for_each(|(_, duty_and_proof)| { let duty = &duty_and_proof.duty; @@ -603,7 +690,8 @@ async fn poll_beacon_attesters( committees_at_slot: duty.committees_at_slot, slot: duty.slot, is_aggregator, - }) + }); + subscription_slots_to_confirm.push(duty_and_proof.subscription_slots.clone()); }); } @@ -632,6 +720,16 @@ async fn poll_beacon_attesters( "Failed to subscribe validators"; "error" => %e ) + } else { + // Record that subscriptions were successfully sent. + debug!( + log, + "Broadcast attestation subscriptions"; + "count" => subscriptions.len(), + ); + for subscription_slots in subscription_slots_to_confirm { + subscription_slots.record_successful_subscription_at(current_slot); + } } } @@ -1200,3 +1298,67 @@ async fn notify_block_production_service( }; } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn subscription_slots_exact() { + for duty_slot in [ + Slot::new(32), + Slot::new(47), + Slot::new(99), + Slot::new(1002003), + ] { + let subscription_slots = SubscriptionSlots::new(duty_slot); + + // Run twice to check idempotence (subscription slots shouldn't be marked as done until + // we mark them manually). + for _ in 0..2 { + for offset in ATTESTATION_SUBSCRIPTION_OFFSETS { + assert!(subscription_slots.should_send_subscription_at(duty_slot - offset)); + } + } + + // Mark each slot as complete and check that all prior slots are still marked + // incomplete. + for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS + .into_iter() + .rev() + .enumerate() + { + subscription_slots.record_successful_subscription_at(duty_slot - offset); + for lower_offset in ATTESTATION_SUBSCRIPTION_OFFSETS + .into_iter() + .rev() + .skip(i + 1) + { + assert!(lower_offset < offset); + assert!( + subscription_slots.should_send_subscription_at(duty_slot - lower_offset) + ); + } + } + } + } + #[test] + fn subscription_slots_mark_multiple() { + for (i, offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() { + let duty_slot = Slot::new(64); + let subscription_slots = SubscriptionSlots::new(duty_slot); + + subscription_slots.record_successful_subscription_at(duty_slot - offset); + + // All past offsets (earlier slots) should be marked as complete. + for (j, other_offset) in ATTESTATION_SUBSCRIPTION_OFFSETS.into_iter().enumerate() { + let past = j >= i; + assert_eq!(other_offset >= offset, past); + assert_eq!( + subscription_slots.should_send_subscription_at(duty_slot - other_offset), + !past + ); + } + } + } +}