From 01ae37ac37c746f3275894b454701b00d1fbd65b Mon Sep 17 00:00:00 2001
From: Paul Hauner
Date: Fri, 19 May 2023 05:13:07 +0000
Subject: [PATCH] Add more metrics for tracking sync messages (#4308)

## Issue Addressed

NA

## Proposed Changes

Adds metrics to track validators that are submitting equivocating (but not
slashable) sync messages. This follows on from some research we've been doing
in a separate fork of LH.

## Additional Info

@jimmygchen and @michaelsproul have already run their eyes over this so it
should be easy to get into v4.2.0, IMO.
---
 beacon_node/beacon_chain/src/metrics.rs       |   8 +
 .../beacon_chain/src/observed_attesters.rs    | 283 ++++++++++++++----
 .../src/sync_committee_verification.rs        |  61 +++-
 .../tests/sync_committee_verification.rs      | 175 ++++++++++-
 beacon_node/http_api/src/sync_committees.rs   |   4 +
 .../beacon_processor/worker/gossip_methods.rs |  19 ++
 6 files changed, 473 insertions(+), 77 deletions(-)

diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs
index b52c4258fe7..d0f695062f3 100644
--- a/beacon_node/beacon_chain/src/metrics.rs
+++ b/beacon_node/beacon_chain/src/metrics.rs
@@ -874,6 +874,14 @@ lazy_static! {
         "beacon_sync_committee_message_gossip_verification_seconds",
         "Full runtime of sync contribution gossip verification"
     );
+    pub static ref SYNC_MESSAGE_EQUIVOCATIONS: Result<IntCounter> = try_create_int_counter(
+        "sync_message_equivocations_total",
+        "Number of sync messages with the same validator index for different blocks"
+    );
+    pub static ref SYNC_MESSAGE_EQUIVOCATIONS_TO_HEAD: Result<IntCounter> = try_create_int_counter(
+        "sync_message_equivocations_to_head_total",
+        "Number of sync messages which conflict with a previous message but elect the head"
+    );

     /*
      * Sync Committee Contribution Verification
diff --git a/beacon_node/beacon_chain/src/observed_attesters.rs b/beacon_node/beacon_chain/src/observed_attesters.rs
index ed22beaec68..59c67bd1b95 100644
--- a/beacon_node/beacon_chain/src/observed_attesters.rs
+++ b/beacon_node/beacon_chain/src/observed_attesters.rs
@@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet};
 use std::hash::Hash;
 use std::marker::PhantomData;
 use types::slot_data::SlotData;
-use types::{Epoch, EthSpec, Slot, Unsigned};
+use types::{Epoch, EthSpec, Hash256, Slot, Unsigned};

 /// The maximum capacity of the `AutoPruningEpochContainer`.
 ///
@@ -39,10 +39,10 @@ pub const MAX_CACHED_EPOCHS: u64 = 3;

 pub type ObservedAttesters<E> = AutoPruningEpochContainer<EpochBitfield, E>;
 pub type ObservedSyncContributors<E> =
-    AutoPruningSlotContainer<SlotSubcommitteeIndex, SyncContributorSlotHashSet<E>, E>;
+    AutoPruningSlotContainer<SlotSubcommitteeIndex, Hash256, SyncContributorSlotHashSet<E>, E>;
 pub type ObservedAggregators<E> = AutoPruningEpochContainer<EpochHashSet, E>;
 pub type ObservedSyncAggregators<E> =
-    AutoPruningSlotContainer<SlotSubcommitteeIndex, SyncAggregatorSlotHashSet, E>;
+    AutoPruningSlotContainer<SlotSubcommitteeIndex, (), SyncAggregatorSlotHashSet, E>;

 #[derive(Debug, PartialEq)]
 pub enum Error {
@@ -62,7 +62,7 @@ pub enum Error {
 }

 /// Implemented on an item in an `AutoPruningContainer`.
-pub trait Item {
+pub trait Item<T> {
     /// Instantiate `Self` with the given `capacity`.
     fn with_capacity(capacity: usize) -> Self;

@@ -75,11 +75,11 @@ pub trait Item {
     /// Returns the number of validators that have been observed by `self`.
     fn validator_count(&self) -> usize;

-    /// Store `validator_index` in `self`.
-    fn insert(&mut self, validator_index: usize) -> bool;
+    /// Store `validator_index` and `value` in `self`.
+    fn insert(&mut self, validator_index: usize, value: T) -> bool;

-    /// Returns `true` if `validator_index` has been stored in `self`.
-    fn contains(&self, validator_index: usize) -> bool;
+    /// Returns `Some(T)` if there is an entry for `validator_index`.
+    fn get(&self, validator_index: usize) -> Option<T>;
 }

 /// Stores a `BitVec` that represents which validator indices have attested or sent sync committee
@@ -88,7 +88,7 @@ pub struct EpochBitfield {
     bitfield: BitVec,
 }

-impl Item for EpochBitfield {
+impl Item<()> for EpochBitfield {
     fn with_capacity(capacity: usize) -> Self {
         Self {
             bitfield: BitVec::with_capacity(capacity),
@@ -108,7 +108,7 @@ impl Item for EpochBitfield {
         self.bitfield.iter().filter(|bit| **bit).count()
     }

-    fn insert(&mut self, validator_index: usize) -> bool {
+    fn insert(&mut self, validator_index: usize, _value: ()) -> bool {
         self.bitfield
             .get_mut(validator_index)
             .map(|mut bit| {
@@ -129,8 +129,11 @@ impl Item for EpochBitfield {
         })
     }

-    fn contains(&self, validator_index: usize) -> bool {
-        self.bitfield.get(validator_index).map_or(false, |bit| *bit)
+    fn get(&self, validator_index: usize) -> Option<()> {
+        self.bitfield
+            .get(validator_index)
+            .map_or(false, |bit| *bit)
+            .then_some(())
     }
 }

@@ -140,7 +143,7 @@ pub struct EpochHashSet {
     set: HashSet<usize>,
 }

-impl Item for EpochHashSet {
+impl Item<()> for EpochHashSet {
     fn with_capacity(capacity: usize) -> Self {
         Self {
             set: HashSet::with_capacity(capacity),
@@ -163,27 +166,27 @@ impl Item for EpochHashSet {

     /// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was
     /// already in the set.
-    fn insert(&mut self, validator_index: usize) -> bool {
+    fn insert(&mut self, validator_index: usize, _value: ()) -> bool {
         !self.set.insert(validator_index)
     }

     /// Returns `true` if the `validator_index` is in the set.
-    fn contains(&self, validator_index: usize) -> bool {
-        self.set.contains(&validator_index)
+    fn get(&self, validator_index: usize) -> Option<()> {
+        self.set.contains(&validator_index).then_some(())
     }
 }

 /// Stores a `HashSet` of which validator indices have created a sync aggregate during a
 /// slot.
 pub struct SyncContributorSlotHashSet<E> {
-    set: HashSet<usize>,
+    map: HashMap<usize, Hash256>,
     phantom: PhantomData<E>,
 }

-impl<E: EthSpec> Item for SyncContributorSlotHashSet<E> {
+impl<E: EthSpec> Item<Hash256> for SyncContributorSlotHashSet<E> {
     fn with_capacity(capacity: usize) -> Self {
         Self {
-            set: HashSet::with_capacity(capacity),
+            map: HashMap::with_capacity(capacity),
             phantom: PhantomData,
         }
     }
@@ -194,22 +197,24 @@ impl<E: EthSpec> Item for SyncContributorSlotHashSet<E> {
     }

     fn len(&self) -> usize {
-        self.set.len()
+        self.map.len()
     }

     fn validator_count(&self) -> usize {
-        self.set.len()
+        self.map.len()
     }

     /// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was
     /// already in the set.
-    fn insert(&mut self, validator_index: usize) -> bool {
-        !self.set.insert(validator_index)
+    fn insert(&mut self, validator_index: usize, beacon_block_root: Hash256) -> bool {
+        self.map
+            .insert(validator_index, beacon_block_root)
+            .is_some()
     }

     /// Returns `true` if the `validator_index` is in the set.
-    fn contains(&self, validator_index: usize) -> bool {
-        self.set.contains(&validator_index)
+    fn get(&self, validator_index: usize) -> Option<Hash256> {
+        self.map.get(&validator_index).copied()
     }
 }

@@ -219,7 +224,7 @@ pub struct SyncAggregatorSlotHashSet {
     set: HashSet<usize>,
 }

-impl Item for SyncAggregatorSlotHashSet {
+impl Item<()> for SyncAggregatorSlotHashSet {
     fn with_capacity(capacity: usize) -> Self {
         Self {
             set: HashSet::with_capacity(capacity),
@@ -241,13 +246,13 @@ impl Item for SyncAggregatorSlotHashSet {

     /// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was
     /// already in the set.
- fn insert(&mut self, validator_index: usize) -> bool { + fn insert(&mut self, validator_index: usize, _value: ()) -> bool { !self.set.insert(validator_index) } /// Returns `true` if the `validator_index` is in the set. - fn contains(&self, validator_index: usize) -> bool { - self.set.contains(&validator_index) + fn get(&self, validator_index: usize) -> Option<()> { + self.set.contains(&validator_index).then_some(()) } } @@ -275,7 +280,7 @@ impl Default for AutoPruningEpochContainer { } } -impl AutoPruningEpochContainer { +impl, E: EthSpec> AutoPruningEpochContainer { /// Observe that `validator_index` has produced attestation `a`. Returns `Ok(true)` if `a` has /// previously been observed for `validator_index`. /// @@ -293,7 +298,7 @@ impl AutoPruningEpochContainer { self.prune(epoch); if let Some(item) = self.items.get_mut(&epoch) { - Ok(item.insert(validator_index)) + Ok(item.insert(validator_index, ())) } else { // To avoid re-allocations, try and determine a rough initial capacity for the new item // by obtaining the mean size of all items in earlier epoch. @@ -309,7 +314,7 @@ impl AutoPruningEpochContainer { let initial_capacity = sum.checked_div(count).unwrap_or_else(T::default_capacity); let mut item = T::with_capacity(initial_capacity); - item.insert(validator_index); + item.insert(validator_index, ()); self.items.insert(epoch, item); Ok(false) @@ -333,7 +338,7 @@ impl AutoPruningEpochContainer { let exists = self .items .get(&epoch) - .map_or(false, |item| item.contains(validator_index)); + .map_or(false, |item| item.get(validator_index).is_some()); Ok(exists) } @@ -392,7 +397,7 @@ impl AutoPruningEpochContainer { pub fn index_seen_at_epoch(&self, index: usize, epoch: Epoch) -> bool { self.items .get(&epoch) - .map(|item| item.contains(index)) + .map(|item| item.get(index).is_some()) .unwrap_or(false) } } @@ -405,23 +410,63 @@ impl AutoPruningEpochContainer { /// sync contributions with an epoch prior to `data.slot - 3` will be cleared from the cache. /// /// `V` should be set to a `SyncAggregatorSlotHashSet` or a `SyncContributorSlotHashSet`. -pub struct AutoPruningSlotContainer { +pub struct AutoPruningSlotContainer { lowest_permissible_slot: Slot, items: HashMap, - _phantom: PhantomData, + _phantom_e: PhantomData, + _phantom_s: PhantomData, } -impl Default for AutoPruningSlotContainer { +impl Default for AutoPruningSlotContainer { fn default() -> Self { Self { lowest_permissible_slot: Slot::new(0), items: HashMap::new(), - _phantom: PhantomData, + _phantom_e: PhantomData, + _phantom_s: PhantomData, } } } -impl AutoPruningSlotContainer { +impl, E: EthSpec> + AutoPruningSlotContainer +{ + /// Observes the given `value` for the given `validator_index`. + /// + /// The `override_observation` function is supplied `previous_observation` + /// and `value`. If it returns `true`, then any existing observation will be + /// overridden. + /// + /// This function returns `Some` if: + /// - An observation already existed for the validator, AND, + /// - The `override_observation` function returned `false`. + /// + /// Alternatively, it returns `None` if: + /// - An observation did not already exist for the given validator, OR, + /// - The `override_observation` function returned `true`. + pub fn observe_validator_with_override( + &mut self, + key: K, + validator_index: usize, + value: S, + override_observation: F, + ) -> Result, Error> + where + F: Fn(&S, &S) -> bool, + { + if let Some(prev_observation) = self.observation_for_validator(key, validator_index)? 
{ + if override_observation(&prev_observation, &value) { + self.observe_validator(key, validator_index, value)?; + Ok(None) + } else { + Ok(Some(prev_observation)) + } + } else { + self.observe_validator(key, validator_index, value)?; + Ok(None) + } + } + /// Observe that `validator_index` has produced a sync committee message. Returns `Ok(true)` if /// the sync committee message has previously been observed for `validator_index`. /// @@ -429,14 +474,19 @@ impl AutoPruningSlotContainer Result { + pub fn observe_validator( + &mut self, + key: K, + validator_index: usize, + value: S, + ) -> Result { let slot = key.get_slot(); self.sanitize_request(slot, validator_index)?; self.prune(slot); if let Some(item) = self.items.get_mut(&key) { - Ok(item.insert(validator_index)) + Ok(item.insert(validator_index, value)) } else { // To avoid re-allocations, try and determine a rough initial capacity for the new item // by obtaining the mean size of all items in earlier slot. @@ -452,32 +502,45 @@ impl AutoPruningSlotContainer Result { + self.observation_for_validator(key, validator_index) + .map(|observation| observation.is_some()) + } + + /// Returns `Ok(Some)` if the `validator_index` has already produced a + /// conflicting sync committee message. /// /// ## Errors /// /// - `validator_index` is higher than `VALIDATOR_REGISTRY_LIMIT`. /// - `key.slot` is earlier than `self.lowest_permissible_slot`. - pub fn validator_has_been_observed( + pub fn observation_for_validator( &self, key: K, validator_index: usize, - ) -> Result { + ) -> Result, Error> { self.sanitize_request(key.get_slot(), validator_index)?; - let exists = self + let observation = self .items .get(&key) - .map_or(false, |item| item.contains(validator_index)); + .and_then(|item| item.get(validator_index)); - Ok(exists) + Ok(observation) } /// Returns the number of validators that have been observed at the given `slot`. Returns @@ -561,6 +624,116 @@ mod tests { type E = types::MainnetEthSpec; + #[test] + fn value_storage() { + type Container = AutoPruningSlotContainer, E>; + + let mut store: Container = <_>::default(); + let key = Slot::new(0); + let validator_index = 0; + let value = Hash256::zero(); + + // Assert there is no entry. + assert!(store + .observation_for_validator(key, validator_index) + .unwrap() + .is_none()); + assert!(!store + .validator_has_been_observed(key, validator_index) + .unwrap()); + + // Add an entry. + assert!(!store + .observe_validator(key, validator_index, value) + .unwrap()); + + // Assert there is a correct entry. + assert_eq!( + store + .observation_for_validator(key, validator_index) + .unwrap(), + Some(value) + ); + assert!(store + .validator_has_been_observed(key, validator_index) + .unwrap()); + + let alternate_value = Hash256::from_low_u64_be(1); + + // Assert that override false does not override. + assert_eq!( + store + .observe_validator_with_override(key, validator_index, alternate_value, |_, _| { + false + }) + .unwrap(), + Some(value) + ); + + // Assert that override true overrides and acts as if there was never an + // entry there. + assert_eq!( + store + .observe_validator_with_override(key, validator_index, alternate_value, |_, _| { + true + }) + .unwrap(), + None + ); + assert_eq!( + store + .observation_for_validator(key, validator_index) + .unwrap(), + Some(alternate_value) + ); + + // Reset the store. 
+ let mut store: Container = <_>::default(); + + // Asset that a new entry with override = false is inserted + assert_eq!( + store + .observation_for_validator(key, validator_index) + .unwrap(), + None + ); + assert_eq!( + store + .observe_validator_with_override(key, validator_index, value, |_, _| { false }) + .unwrap(), + None, + ); + assert_eq!( + store + .observation_for_validator(key, validator_index) + .unwrap(), + Some(value) + ); + + // Reset the store. + let mut store: Container = <_>::default(); + + // Asset that a new entry with override = true is inserted + assert_eq!( + store + .observation_for_validator(key, validator_index) + .unwrap(), + None + ); + assert_eq!( + store + .observe_validator_with_override(key, validator_index, value, |_, _| { true }) + .unwrap(), + None, + ); + assert_eq!( + store + .observation_for_validator(key, validator_index) + .unwrap(), + Some(value) + ); + } + macro_rules! test_suite_epoch { ($mod_name: ident, $type: ident) => { #[cfg(test)] @@ -722,7 +895,7 @@ mod tests { test_suite_epoch!(observed_aggregators, ObservedAggregators); macro_rules! test_suite_slot { - ($mod_name: ident, $type: ident) => { + ($mod_name: ident, $type: ident, $value: expr) => { #[cfg(test)] mod $mod_name { use super::*; @@ -737,7 +910,7 @@ mod tests { "should indicate an unknown item is unknown" ); assert_eq!( - store.observe_validator(key, i), + store.observe_validator(key, i, $value), Ok(false), "should observe new item" ); @@ -750,7 +923,7 @@ mod tests { "should indicate a known item is known" ); assert_eq!( - store.observe_validator(key, i), + store.observe_validator(key, i, $value), Ok(true), "should acknowledge an existing item" ); @@ -997,6 +1170,10 @@ mod tests { } }; } - test_suite_slot!(observed_sync_contributors, ObservedSyncContributors); - test_suite_slot!(observed_sync_aggregators, ObservedSyncAggregators); + test_suite_slot!( + observed_sync_contributors, + ObservedSyncContributors, + Hash256::zero() + ); + test_suite_slot!(observed_sync_aggregators, ObservedSyncAggregators, ()); } diff --git a/beacon_node/beacon_chain/src/sync_committee_verification.rs b/beacon_node/beacon_chain/src/sync_committee_verification.rs index 4b4228e71d2..14cdc2400d8 100644 --- a/beacon_node/beacon_chain/src/sync_committee_verification.rs +++ b/beacon_node/beacon_chain/src/sync_committee_verification.rs @@ -153,7 +153,21 @@ pub enum Error { /// It's unclear if this sync message is valid, however we have already observed a /// signature from this validator for this slot and should not observe /// another. - PriorSyncCommitteeMessageKnown { validator_index: u64, slot: Slot }, + PriorSyncCommitteeMessageKnown { + validator_index: u64, + slot: Slot, + prev_root: Hash256, + new_root: Hash256, + }, + /// We have already observed a contribution for the aggregator and refuse to + /// process another. + /// + /// ## Peer scoring + /// + /// It's unclear if this sync message is valid, however we have already observed a + /// signature from this validator for this slot and should not observe + /// another. + PriorSyncContributionMessageKnown { validator_index: u64, slot: Slot }, /// The sync committee message was received on an invalid sync committee message subnet. /// /// ## Peer scoring @@ -378,10 +392,10 @@ impl VerifiedSyncContribution { if chain .observed_sync_aggregators .write() - .observe_validator(observed_key, aggregator_index as usize) + .observe_validator(observed_key, aggregator_index as usize, ()) .map_err(BeaconChainError::from)? 
{ - return Err(Error::PriorSyncCommitteeMessageKnown { + return Err(Error::PriorSyncContributionMessageKnown { validator_index: aggregator_index, slot: contribution.slot, }); @@ -450,19 +464,40 @@ impl VerifiedSyncCommitteeMessage { // The sync committee message is the first valid message received for the participating validator // for the slot, sync_message.slot. let validator_index = sync_message.validator_index; - if chain + let head_root = chain.canonical_head.cached_head().head_block_root(); + let new_root = sync_message.beacon_block_root; + let should_override_prev = |prev_root: &Hash256, new_root: &Hash256| { + let roots_differ = new_root != prev_root; + let new_elects_head = new_root == &head_root; + + if roots_differ { + // Track sync committee messages that differ from each other. + metrics::inc_counter(&metrics::SYNC_MESSAGE_EQUIVOCATIONS); + if new_elects_head { + // Track sync committee messages that swap from an old block to a new block. + metrics::inc_counter(&metrics::SYNC_MESSAGE_EQUIVOCATIONS_TO_HEAD); + } + } + + roots_differ && new_elects_head + }; + if let Some(prev_root) = chain .observed_sync_contributors .read() - .validator_has_been_observed( + .observation_for_validator( SlotSubcommitteeIndex::new(sync_message.slot, subnet_id.into()), validator_index as usize, ) .map_err(BeaconChainError::from)? { - return Err(Error::PriorSyncCommitteeMessageKnown { - validator_index, - slot: sync_message.slot, - }); + if !should_override_prev(&prev_root, &new_root) { + return Err(Error::PriorSyncCommitteeMessageKnown { + validator_index, + slot: sync_message.slot, + prev_root, + new_root, + }); + } } // The aggregate signature of the sync committee message is valid. @@ -474,18 +509,22 @@ impl VerifiedSyncCommitteeMessage { // It's important to double check that the sync committee message still hasn't been observed, since // there can be a race-condition if we receive two sync committee messages at the same time and // process them in different threads. - if chain + if let Some(prev_root) = chain .observed_sync_contributors .write() - .observe_validator( + .observe_validator_with_override( SlotSubcommitteeIndex::new(sync_message.slot, subnet_id.into()), validator_index as usize, + sync_message.beacon_block_root, + should_override_prev, ) .map_err(BeaconChainError::from)? 
{ return Err(Error::PriorSyncCommitteeMessageKnown { validator_index, slot: sync_message.slot, + prev_root, + new_root, }); } diff --git a/beacon_node/beacon_chain/tests/sync_committee_verification.rs b/beacon_node/beacon_chain/tests/sync_committee_verification.rs index 239f55e7d38..4204a51212a 100644 --- a/beacon_node/beacon_chain/tests/sync_committee_verification.rs +++ b/beacon_node/beacon_chain/tests/sync_committee_verification.rs @@ -5,12 +5,16 @@ use beacon_chain::test_utils::{BeaconChainHarness, EphemeralHarnessType, Relativ use int_to_bytes::int_to_bytes32; use lazy_static::lazy_static; use safe_arith::SafeArith; +use state_processing::{ + per_block_processing::{altair::sync_committee::process_sync_aggregate, VerifySignatures}, + state_advance::complete_state_advance, +}; use store::{SignedContributionAndProof, SyncCommitteeMessage}; use tree_hash::TreeHash; use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT; use types::{ AggregateSignature, Epoch, EthSpec, Hash256, Keypair, MainnetEthSpec, SecretKey, Slot, - SyncSelectionProof, SyncSubnetId, Unsigned, + SyncContributionData, SyncSelectionProof, SyncSubnetId, Unsigned, }; pub type E = MainnetEthSpec; @@ -47,10 +51,29 @@ fn get_valid_sync_committee_message( relative_sync_committee: RelativeSyncCommittee, message_index: usize, ) -> (SyncCommitteeMessage, usize, SecretKey, SyncSubnetId) { - let head_state = harness.chain.head_beacon_state_cloned(); let head_block_root = harness.chain.head_snapshot().beacon_block_root; + get_valid_sync_committee_message_for_block( + harness, + slot, + relative_sync_committee, + message_index, + head_block_root, + ) +} + +/// Returns a sync message that is valid for some slot in the given `chain`. +/// +/// Also returns some info about who created it. +fn get_valid_sync_committee_message_for_block( + harness: &BeaconChainHarness>, + slot: Slot, + relative_sync_committee: RelativeSyncCommittee, + message_index: usize, + block_root: Hash256, +) -> (SyncCommitteeMessage, usize, SecretKey, SyncSubnetId) { + let head_state = harness.chain.head_beacon_state_cloned(); let (signature, _) = harness - .make_sync_committee_messages(&head_state, head_block_root, slot, relative_sync_committee) + .make_sync_committee_messages(&head_state, block_root, slot, relative_sync_committee) .get(0) .expect("sync messages should exist") .get(message_index) @@ -119,7 +142,7 @@ fn get_non_aggregator( subcommittee.iter().find_map(|pubkey| { let validator_index = harness .chain - .validator_index(&pubkey) + .validator_index(pubkey) .expect("should get validator index") .expect("pubkey should exist in beacon chain"); @@ -376,7 +399,7 @@ async fn aggregated_gossip_verification() { SyncCommitteeError::AggregatorNotInCommittee { aggregator_index } - if aggregator_index == valid_aggregate.message.aggregator_index as u64 + if aggregator_index == valid_aggregate.message.aggregator_index ); /* @@ -472,7 +495,7 @@ async fn aggregated_gossip_verification() { assert_invalid!( "sync contribution created with incorrect sync committee", - next_valid_contribution.clone(), + next_valid_contribution, SyncCommitteeError::InvalidSignature | SyncCommitteeError::AggregatorNotInCommittee { .. 
} ); } @@ -496,6 +519,30 @@ async fn unaggregated_gossip_verification() { let (valid_sync_committee_message, expected_validator_index, validator_sk, subnet_id) = get_valid_sync_committee_message(&harness, current_slot, RelativeSyncCommittee::Current, 0); + let parent_root = harness.chain.head_snapshot().beacon_block.parent_root(); + let (valid_sync_committee_message_to_parent, _, _, _) = + get_valid_sync_committee_message_for_block( + &harness, + current_slot, + RelativeSyncCommittee::Current, + 0, + parent_root, + ); + + assert_eq!( + valid_sync_committee_message.slot, valid_sync_committee_message_to_parent.slot, + "test pre-condition: same slot" + ); + assert_eq!( + valid_sync_committee_message.validator_index, + valid_sync_committee_message_to_parent.validator_index, + "test pre-condition: same validator index" + ); + assert!( + valid_sync_committee_message.beacon_block_root + != valid_sync_committee_message_to_parent.beacon_block_root, + "test pre-condition: differing roots" + ); macro_rules! assert_invalid { ($desc: tt, $attn_getter: expr, $subnet_getter: expr, $($error: pat_param) |+ $( if $guard: expr )?) => { @@ -602,28 +649,130 @@ async fn unaggregated_gossip_verification() { SyncCommitteeError::InvalidSignature ); + let head_root = valid_sync_committee_message.beacon_block_root; + let parent_root = valid_sync_committee_message_to_parent.beacon_block_root; + + let verifed_message_to_parent = harness + .chain + .verify_sync_committee_message_for_gossip( + valid_sync_committee_message_to_parent.clone(), + subnet_id, + ) + .expect("valid sync message to parent should be verified"); + // Add the aggregate to the pool. harness + .chain + .add_to_naive_sync_aggregation_pool(verifed_message_to_parent) + .unwrap(); + + /* + * The following test ensures that: + * + * A sync committee message from the same validator to the same block will + * be rejected. + */ + assert_invalid!( + "sync message to parent block that has already been seen", + valid_sync_committee_message_to_parent.clone(), + subnet_id, + SyncCommitteeError::PriorSyncCommitteeMessageKnown { + validator_index, + slot, + prev_root, + new_root + } + if validator_index == expected_validator_index as u64 && slot == current_slot && prev_root == parent_root && new_root == parent_root + ); + + let verified_message_to_head = harness .chain .verify_sync_committee_message_for_gossip(valid_sync_committee_message.clone(), subnet_id) - .expect("valid sync message should be verified"); + .expect("valid sync message to the head should be verified"); + // Add the aggregate to the pool. + harness + .chain + .add_to_naive_sync_aggregation_pool(verified_message_to_head) + .unwrap(); + + /* + * The following test ensures that: + * + * A sync committee message from the same validator to the same block will + * be rejected. + */ + assert_invalid!( + "sync message to the head that has already been seen", + valid_sync_committee_message.clone(), + subnet_id, + SyncCommitteeError::PriorSyncCommitteeMessageKnown { + validator_index, + slot, + prev_root, + new_root + } + if validator_index == expected_validator_index as u64 && slot == current_slot && prev_root == head_root && new_root == head_root + ); /* * The following test ensures that: * - * There has been no other valid sync committee message for the declared slot for the - * validator referenced by sync_committee_message.validator_index. + * A sync committee message from the same validator to a non-head block will + * be rejected. 
*/ assert_invalid!( - "sync message that has already been seen", - valid_sync_committee_message, + "sync message to parent after message to head has already been seen", + valid_sync_committee_message_to_parent.clone(), subnet_id, SyncCommitteeError::PriorSyncCommitteeMessageKnown { validator_index, slot, + prev_root, + new_root } - if validator_index == expected_validator_index as u64 && slot == current_slot + if validator_index == expected_validator_index as u64 && slot == current_slot && prev_root == head_root && new_root == parent_root ); + // Ensure that the sync aggregates in the op pool for both the parent block and head block are valid. + let chain = &harness.chain; + let check_sync_aggregate = |root: Hash256| async move { + // Generate an aggregate sync message from the naive aggregation pool. + let aggregate = chain + .get_aggregated_sync_committee_contribution(&SyncContributionData { + // It's a test pre-condition that both sync messages have the same slot. + slot: valid_sync_committee_message.slot, + beacon_block_root: root, + subcommittee_index: subnet_id.into(), + }) + .unwrap() + .unwrap(); + + // Insert the aggregate into the op pool. + chain.op_pool.insert_sync_contribution(aggregate).unwrap(); + + // Load the block and state for the given root. + let block = chain.get_block(&root).await.unwrap().unwrap(); + let mut state = chain.get_state(&block.state_root(), None).unwrap().unwrap(); + + // Advance the state to simulate a pre-state for block production. + let slot = valid_sync_committee_message.slot + 1; + complete_state_advance(&mut state, Some(block.state_root()), slot, &chain.spec).unwrap(); + + // Get an aggregate that would be included in a block. + let aggregate_for_inclusion = chain.op_pool.get_sync_aggregate(&state).unwrap().unwrap(); + + // Validate the retrieved aggregate against the state. + process_sync_aggregate( + &mut state, + &aggregate_for_inclusion, + 0, + VerifySignatures::True, + &chain.spec, + ) + .unwrap(); + }; + check_sync_aggregate(valid_sync_committee_message.beacon_block_root).await; + check_sync_aggregate(valid_sync_committee_message_to_parent.beacon_block_root).await; + /* * The following test ensures that: * @@ -649,7 +798,7 @@ async fn unaggregated_gossip_verification() { assert_invalid!( "sync message on incorrect subnet", - next_valid_sync_committee_message.clone(), + next_valid_sync_committee_message, next_subnet_id, SyncCommitteeError::InvalidSubnetId { received, diff --git a/beacon_node/http_api/src/sync_committees.rs b/beacon_node/http_api/src/sync_committees.rs index a6acf308fa2..c728fbeb14e 100644 --- a/beacon_node/http_api/src/sync_committees.rs +++ b/beacon_node/http_api/src/sync_committees.rs @@ -199,10 +199,14 @@ pub fn process_sync_committee_signatures( Err(SyncVerificationError::PriorSyncCommitteeMessageKnown { validator_index, slot, + prev_root, + new_root, }) => { debug!( log, "Ignoring already-known sync message"; + "new_root" => ?new_root, + "prev_root" => ?prev_root, "slot" => slot, "validator_index" => validator_index, ); diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 9d85bc545e6..cb4533f5aee 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -2343,6 +2343,25 @@ impl Worker { "peer_id" => %peer_id, "type" => ?message_type, ); + + // Do not penalize the peer. 
+ + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + + return; + } + SyncCommitteeError::PriorSyncContributionMessageKnown { .. } => { + /* + * We have already seen a sync contribution message from this validator for this epoch. + * + * The peer is not necessarily faulty. + */ + debug!( + self.log, + "Prior sync contribution message known"; + "peer_id" => %peer_id, + "type" => ?message_type, + ); // We still penalize the peer slightly. We don't want this to be a recurring // behaviour. self.gossip_penalize_peer(