diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs index 2530417f90..af40afaa42 100644 --- a/crates/types/src/consensus.rs +++ b/crates/types/src/consensus.rs @@ -25,7 +25,7 @@ use crate::{ error::HotShotError, event::{HotShotAction, LeafInfo}, message::Proposal, - simple_certificate::{DaCertificate2, NextEpochQuorumCertificate2, QuorumCertificate2}, + simple_certificate::{DaCertificate2, QuorumCertificate2}, traits::{ block_contents::BuilderFee, metrics::{Counter, Gauge, Histogram, Metrics, NoMetrics}, @@ -34,8 +34,7 @@ use crate::{ BlockPayload, ValidatedState, }, utils::{ - epoch_from_block_number, is_last_block_in_epoch, BuilderCommitment, LeafCommitment, - StateAndDelta, Terminator, + epoch_from_block_number, BuilderCommitment, LeafCommitment, StateAndDelta, Terminator, }, vid::VidCommitment, vote::{Certificate, HasViewNumber}, @@ -318,14 +317,14 @@ pub struct Consensus { /// the highqc per spec high_qc: QuorumCertificate2, - /// The high QC for the next epoch - next_epoch_high_qc: Option>, - /// A reference to the metrics trait pub metrics: Arc, /// Number of blocks in an epoch, zero means there are no epochs pub epoch_height: u64, + + /// Vote tracker to prevent double voting + vote_tracker: VoteTracker, } /// Contains several `ConsensusMetrics` that we're interested in from the consensus interfaces @@ -416,7 +415,6 @@ impl Consensus { saved_leaves: CommitmentMap>, saved_payloads: BTreeMap>, high_qc: QuorumCertificate2, - next_epoch_high_qc: Option>, metrics: Arc, epoch_height: u64, ) -> Self { @@ -433,9 +431,9 @@ impl Consensus { saved_leaves, saved_payloads, high_qc, - next_epoch_high_qc, metrics, epoch_height, + vote_tracker: VoteTracker::new(), } } @@ -464,11 +462,6 @@ impl Consensus { &self.high_qc } - /// Get the next epoch high QC. - pub fn next_epoch_high_qc(&self) -> Option<&NextEpochQuorumCertificate2> { - self.next_epoch_high_qc.as_ref() - } - /// Get the validated state map. pub fn validated_state_map(&self) -> &BTreeMap> { &self.validated_state_map @@ -560,27 +553,91 @@ impl Consensus { /// /// Returns true if the action is for a newer view than the last action of that type pub fn update_action(&mut self, action: HotShotAction, view: TYPES::View) -> bool { - let old_view = match action { - HotShotAction::Vote => &mut self.last_actions.voted, - HotShotAction::Propose => &mut self.last_actions.proposed, - HotShotAction::DaPropose => &mut self.last_actions.da_proposed, + match action { HotShotAction::DaVote => { - if view > self.last_actions.da_vote { - self.last_actions.da_vote = view; + let voter_key = Arc::new(self.public_key().clone()); + + match self.vote_tracker.record_vote(view, voter_key, self.cur_epoch) { + Ok(_) => { + // Update last action view if the new view is higher + if view > self.last_actions.da_vote { + self.last_actions.da_vote = view; + } + + // Log successful vote + tracing::debug!( + "Vote recorded successfully for view {} in epoch {}", + view.as_u64(), + self.cur_epoch.as_u64() + ); + + // Perform periodic cleanup of old vote records + if let Err(e) = self.vote_tracker.cleanup_old_views(view) { + tracing::warn!("Vote cleanup failed: {}", e); + } + + // Log detailed metrics + if let Some(metrics) = self.vote_tracker.get_metrics() { + tracing::debug!( + "Vote metrics - Total: {}, Rejected: {}, Out of Order: {}, Pending: {}", + metrics.total_votes, + metrics.rejected_votes, + metrics.out_of_order_votes, + metrics.pending_votes + ); + } + + true + } + Err(e) => { + match e { + VoteError::MissingPreviousVote(prev_view) => { + tracing::warn!( + "Missing vote in view {} before voting in view {}", + prev_view, + view + ); + } + VoteError::NonSequentialVote => { + tracing::debug!( + "Non-sequential vote detected for view {} (will be queued)", + view + ); + } + VoteError::ViewJumpTooLarge => { + tracing::warn!( + "View jump too large from last view to view {}", + view + ); + } + VoteError::ViewTooOld => { + tracing::warn!( + "View {} is too old (minimum valid view is higher)", + view + ); + } + _ => { + tracing::warn!("Vote validation failed for view {}: {}", view, e); + } + } + false + } } - // TODO Add logic to prevent double voting. For now the simple check if - // the last voted view is less than the view we are trying to vote doesn't work - // because the leader of view n + 1 may propose to the DA (and we would vote) - // before the leader of view n. - return true; } + HotShotAction::Vote => &mut self.last_actions.voted, + HotShotAction::Propose => &mut self.last_actions.proposed, + HotShotAction::DaPropose => &mut self.last_actions.da_proposed, _ => return true, - }; - if view > *old_view { - *old_view = view; - return true; } - false + .map(|old_view| { + if view > *old_view { + *old_view = view; + true + } else { + false + } + }) + .unwrap_or(true) } /// reset last actions to genesis so we can resend events in tests @@ -751,28 +808,6 @@ impl Consensus { Ok(()) } - /// Update the next epoch high QC if given a newer one. - /// # Errors - /// Can return an error when the provided high_qc is not newer than the existing entry. - /// # Panics - /// It can't actually panic. If the option is None, we will not call unwrap on it. - pub fn update_next_epoch_high_qc( - &mut self, - high_qc: NextEpochQuorumCertificate2, - ) -> Result<()> { - if let Some(next_epoch_high_qc) = self.next_epoch_high_qc() { - ensure!( - high_qc.view_number > next_epoch_high_qc.view_number - || high_qc == *next_epoch_high_qc, - debug!("Next epoch high QC with an equal or higher view exists.") - ); - } - tracing::debug!("Updating next epoch high QC"); - self.next_epoch_high_qc = Some(high_qc); - - Ok(()) - } - /// Add a new entry to the vid_shares map. pub fn update_vid_shares( &mut self, @@ -938,7 +973,7 @@ impl Consensus { pub async fn calculate_and_update_vid( consensus: OuterConsensus, view: ::View, - membership: Arc>, + membership: Arc, private_key: &::PrivateKey, ) -> Option<()> { let txns = Arc::clone(consensus.read().await.saved_payloads().get(&view)?); @@ -949,8 +984,7 @@ impl Consensus { .get(&view)? .view_inner .epoch()?; - let vid = - VidDisperse::calculate_vid_disperse(txns, &membership, view, epoch, epoch, None).await; + let vid = VidDisperse::calculate_vid_disperse(txns, &membership, view, epoch, None).await; let shares = VidDisperseShare2::from_vid_disperse(vid); let mut consensus_writer = consensus.write().await; for share in shares { @@ -1044,17 +1078,25 @@ impl Consensus { return false; }; let block_height = leaf.height(); - is_last_block_in_epoch(block_height, self.epoch_height) + if block_height == 0 || self.epoch_height == 0 { + false + } else { + block_height % self.epoch_height == 0 + } } - /// Returns true if our high QC is for the last block in the epoch + /// Returns true if our high qc is for the last block in the epoch pub fn is_high_qc_for_last_block(&self) -> bool { let Some(leaf) = self.saved_leaves.get(&self.high_qc().data.leaf_commit) else { tracing::trace!("We don't have a leaf corresponding to the high QC"); return false; }; let block_height = leaf.height(); - is_last_block_in_epoch(block_height, self.epoch_height) + if block_height == 0 || self.epoch_height == 0 { + false + } else { + block_height % self.epoch_height == 0 + } } /// Returns true if the `parent_leaf` formed an eQC for the previous epoch to the `proposed_leaf` @@ -1068,21 +1110,3 @@ impl Consensus { new_epoch - 1 == old_epoch && self.is_leaf_extended(parent_leaf.commit()) } } - -/// Alias for the block payload commitment and the associated metadata. The primary data -/// needed in order to submit a proposal. -#[derive(Eq, Hash, PartialEq, Debug, Clone)] -pub struct CommitmentAndMetadata { - /// Vid Commitment - pub commitment: VidCommitment, - /// Builder Commitment - pub builder_commitment: BuilderCommitment, - /// Metadata for the block payload - pub metadata: >::Metadata, - /// Builder fee data - pub fees: Vec1>, - /// View number this block is for - pub block_view: TYPES::View, - /// auction result that the block was produced from, if any - pub auction_result: Option, -} diff --git a/crates/types/src/traits/vote_tracker.rs b/crates/types/src/traits/vote_tracker.rs new file mode 100644 index 0000000000..ff140a1526 --- /dev/null +++ b/crates/types/src/traits/vote_tracker.rs @@ -0,0 +1,501 @@ +// Copyright (c) 2021-2024 Espresso Systems (espressosys.com) +// This file is part of the HotShot repository. + +// You should have received a copy of the MIT License +// along with the HotShot repository. If not, see . + +//! Vote tracking implementation for preventing double voting + +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; +use std::num::NonZeroU64; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use crate::traits::node_implementation::NodeType; +use crate::traits::signature_key::SignatureKey; +use thiserror::Error; + +/// Tracks votes to prevent double voting in HotShot consensus +#[derive(Debug)] +pub struct VoteTracker { + /// Maps view numbers to sets of voter public keys + view_votes: HashMap>>, + /// Maps voter public keys to their last voted view + last_vote_views: HashMap, TYPES::View>, + /// Track view transitions for each voter + view_transitions: HashMap, VecDeque>, + /// Track view sequence and pending votes + view_sequence: ViewSequence, + /// Minimum valid view number + min_valid_view: TYPES::View, + /// Window size for vote tracking (number of views to keep) + window_size: NonZeroU64, + /// Maximum allowed view jump to prevent large view number attacks + max_view_jump: NonZeroU64, + /// Maximum number of pending transitions + max_pending_transitions: usize, + /// Metrics for monitoring vote operations + metrics: VoteMetrics, +} + +/// Tracks view sequence and manages pending votes +#[derive(Debug, Default)] +struct ViewSequence { + /// Track all views that have been voted on (ordered) + voted_views: BTreeSet, + /// Track pending votes that are waiting for previous views + pending_votes: BTreeMap>>, + /// Track the minimum required view for each voter + min_required_views: HashMap, TYPES::View>, + /// Track complete view sequences for each voter + voter_sequences: HashMap, BTreeSet>, +} + +/// Metrics for monitoring vote operations +#[derive(Debug, Default)] +struct VoteMetrics { + /// Total number of votes processed + total_votes: u64, + /// Number of rejected votes + rejected_votes: u64, + /// Last cleanup timestamp + last_cleanup: SystemTime, + /// Number of out-of-order votes detected + out_of_order_votes: u64, + /// Number of pending votes + pending_votes: u64, +} + +#[derive(Debug, Error)] +pub enum VoteError { + #[error("View number is too old")] + ViewTooOld, + #[error("Already voted in this or later view")] + AlreadyVoted, + #[error("Invalid view jump detected")] + ViewJumpTooLarge, + #[error("Invalid epoch")] + InvalidEpoch, + #[error("Invalid cleanup operation")] + InvalidCleanup, + #[error("View number overflow detected")] + ViewNumberOverflow, + #[error("Missing vote in previous view {0}")] + MissingPreviousVote(u64), + #[error("Non-sequential vote detected")] + NonSequentialVote, + #[error("View transition violation")] + InvalidViewTransition, + #[error("View sequence violation")] + InvalidViewSequence, +} + +impl ViewSequence { + /// Creates a new ViewSequence + fn new() -> Self { + Self::default() + } + + /// Checks if a vote follows the sequential order + fn is_sequential(&self, view: TYPES::View, voter_key: &Arc) -> Result { + let voter_seq = self.voter_sequences.get(voter_key).unwrap_or(&BTreeSet::new()); + + // If this is the first vote, it's sequential + if voter_seq.is_empty() { + return Ok(true); + } + + // Get the last voted view for this voter + if let Some(&last_view) = voter_seq.iter().last() { + // Check if there are any gaps + let expected_view = last_view + TYPES::View::from(1); + if view != expected_view { + tracing::debug!( + "Non-sequential vote detected: expected view {}, got {}", + expected_view.as_u64(), + view.as_u64() + ); + return Ok(false); + } + } + + Ok(true) + } + + /// Queues a vote for later processing + fn queue_vote(&mut self, view: TYPES::View, voter_key: Arc) -> Result<(), VoteError> { + let pending = self.pending_votes.entry(view).or_default(); + pending.insert(voter_key); + Ok(()) + } + + /// Records a successful vote + fn record_successful_vote(&mut self, view: TYPES::View, voter_key: &Arc) -> Result<(), VoteError> { + // Update voted views + self.voted_views.insert(view); + + // Update voter sequence + let voter_seq = self.voter_sequences.entry(voter_key.clone()).or_default(); + voter_seq.insert(view); + + // Update minimum required view + self.min_required_views.insert(voter_key.clone(), view + TYPES::View::from(1)); + + Ok(()) + } + + /// Processes any pending votes that can now be applied + fn process_pending_votes(&mut self, voter_key: &Arc) -> Result, VoteError> { + let mut processed_views = Vec::new(); + let mut views_to_process: Vec<_> = self.pending_votes.keys().cloned().collect(); + views_to_process.sort(); + + for view in views_to_process { + if let Some(voters) = self.pending_votes.get_mut(&view) { + if voters.contains(voter_key) && self.is_sequential(view, voter_key)? { + voters.remove(voter_key); + self.record_successful_vote(view, voter_key)?; + processed_views.push(view); + + if voters.is_empty() { + self.pending_votes.remove(&view); + } + } + } + } + + Ok(processed_views) + } + + /// Gets the minimum required view for a voter + fn get_min_required_view(&self, voter_key: &Arc) -> TYPES::View { + self.min_required_views + .get(voter_key) + .cloned() + .unwrap_or_else(|| TYPES::View::genesis()) + } + + /// Cleans up old view data + fn cleanup(&mut self, min_valid_view: TYPES::View) { + // Clean up voted views + self.voted_views.retain(|&view| view >= min_valid_view); + + // Clean up pending votes + self.pending_votes.retain(|&view, _| view >= min_valid_view); + + // Clean up voter sequences + for seq in self.voter_sequences.values_mut() { + seq.retain(|&view| view >= min_valid_view); + } + + // Update minimum required views + for min_view in self.min_required_views.values_mut() { + if *min_view < min_valid_view { + *min_view = min_valid_view; + } + } + } +} + +impl VoteTracker { + /// Creates a new VoteTracker instance + pub fn new(window_size: NonZeroU64, max_view_jump: NonZeroU64) -> Self { + Self { + view_votes: HashMap::new(), + last_vote_views: HashMap::new(), + view_transitions: HashMap::new(), + view_sequence: ViewSequence::new(), + min_valid_view: TYPES::View::genesis(), + window_size, + max_view_jump, + max_pending_transitions: 10, + metrics: VoteMetrics::default(), + } + } + + /// Records a vote for a specific view and voter with strict ordering checks + pub fn record_vote( + &mut self, + view: TYPES::View, + voter_key: Arc, + current_epoch: TYPES::Epoch, + ) -> Result { + // Check for integer overflow + if view.as_u64() == u64::MAX { + self.metrics.rejected_votes += 1; + return Err(VoteError::ViewNumberOverflow); + } + + // Validate view number against minimum valid view + if view < self.min_valid_view { + self.metrics.rejected_votes += 1; + return Err(VoteError::ViewTooOld); + } + + // Check for previous votes and validate view sequence + if let Some(last_view) = self.last_vote_views.get(&voter_key) { + if view <= *last_view { + self.metrics.rejected_votes += 1; + return Err(VoteError::AlreadyVoted); + } + + // Validate view transition + self.validate_view_transition(view, &voter_key)?; + } + + // Check for sequential voting + if !self.view_sequence.is_sequential(view, &voter_key)? { + self.metrics.out_of_order_votes += 1; + + // Queue the vote for later processing + self.view_sequence.queue_vote(view, voter_key.clone())?; + self.metrics.pending_votes += 1; + + return Err(VoteError::NonSequentialVote); + } + + // Record the vote in the view sequence + self.view_sequence.record_successful_vote(view, &voter_key)?; + + // Process any pending votes that can now be applied + let processed_views = self.view_sequence.process_pending_votes(&voter_key)?; + self.metrics.pending_votes = self.metrics.pending_votes.saturating_sub(processed_views.len() as u64); + + // Update vote tracking state + self.last_vote_views.insert(voter_key.clone(), view); + let votes = self.view_votes.entry(view).or_default(); + votes.insert(voter_key.clone()); + + // Update view transition history + let transitions = self.view_transitions.entry(voter_key).or_default(); + transitions.push_back(view); + while transitions.len() > self.max_pending_transitions { + transitions.pop_front(); + } + + self.metrics.total_votes += 1; + Ok(true) + } + + /// Validates the transition between views for a voter + fn validate_view_transition( + &mut self, + view: TYPES::View, + voter_key: &Arc, + ) -> Result<(), VoteError> { + let transitions = self.view_transitions.entry(voter_key.clone()).or_default(); + + if let Some(&last_view) = transitions.back() { + // Check for sequential voting + if view <= last_view { + return Err(VoteError::NonSequentialVote); + } + + // Check for too large jumps + let view_diff = view.as_u64().saturating_sub(last_view.as_u64()); + if view_diff > self.max_view_jump.get() { + return Err(VoteError::ViewJumpTooLarge); + } + } + + Ok(()) + } + + /// Checks if a voter has voted in a specific view + fn has_voted_in_view(&self, voter_key: &Arc, view: &TYPES::View) -> bool { + self.view_votes + .get(view) + .map_or(false, |votes| votes.contains(voter_key)) + } + + /// Cleans up old vote records with safety checks + pub fn cleanup_old_views(&mut self, current_view: TYPES::View) -> Result<(), VoteError> { + let new_min_view = current_view.saturating_sub(TYPES::View::from(self.window_size.get())); + if new_min_view < self.min_valid_view { + return Err(VoteError::InvalidCleanup); + } + + self.min_valid_view = new_min_view; + + // Clean up old records while maintaining vote history integrity + self.view_votes.retain(|view, _| *view >= self.min_valid_view); + self.last_vote_views.retain(|_, last_view| *last_view >= self.min_valid_view); + + // Clean up view transitions + for transitions in self.view_transitions.values_mut() { + while let Some(front) = transitions.front() { + if *front < new_min_view { + transitions.pop_front(); + } else { + break; + } + } + } + + // Clean up view sequence data + self.view_sequence.cleanup(new_min_view); + + self.metrics.last_cleanup = SystemTime::now(); + Ok(()) + } + + /// Get current metrics for monitoring + pub fn get_metrics(&self) -> &VoteMetrics { + &self.metrics + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::simple_vote::SimpleVote; + + #[test] + fn test_sequential_voting() { + let mut tracker = VoteTracker::::new( + NonZeroU64::new(10).unwrap(), + NonZeroU64::new(5).unwrap() + ); + let key = Arc::new(SimpleVote::SignatureKey::default()); + + // First vote should succeed + assert!(tracker.record_vote( + SimpleVote::View::from(1), + key.clone(), + SimpleVote::Epoch::default() + ).is_ok()); + + // Non-sequential vote should fail + assert!(tracker.record_vote( + SimpleVote::View::from(3), + key.clone(), + SimpleVote::Epoch::default() + ).is_err()); + + // Sequential vote should succeed + assert!(tracker.record_vote( + SimpleVote::View::from(2), + key.clone(), + SimpleVote::Epoch::default() + ).is_ok()); + } + + #[test] + fn test_pending_votes() { + let mut tracker = VoteTracker::::new( + NonZeroU64::new(10).unwrap(), + NonZeroU64::new(5).unwrap() + ); + let key = Arc::new(SimpleVote::SignatureKey::default()); + + // Vote for view 2 should be queued + let result = tracker.record_vote( + SimpleVote::View::from(2), + key.clone(), + SimpleVote::Epoch::default() + ); + assert!(result.is_err()); + assert_eq!(tracker.metrics.pending_votes, 1); + + // Vote for view 1 should succeed and process pending vote + assert!(tracker.record_vote( + SimpleVote::View::from(1), + key.clone(), + SimpleVote::Epoch::default() + ).is_ok()); + + // Verify metrics + assert_eq!(tracker.metrics.pending_votes, 0); + assert_eq!(tracker.metrics.total_votes, 2); + } + + #[test] + fn test_view_transition_limits() { + let mut tracker = VoteTracker::::new( + NonZeroU64::new(10).unwrap(), + NonZeroU64::new(5).unwrap() + ); + let key = Arc::new(SimpleVote::SignatureKey::default()); + + // Sequential votes within limit should succeed + for i in 1..=5 { + assert!(tracker.record_vote( + SimpleVote::View::from(i), + key.clone(), + SimpleVote::Epoch::default() + ).is_ok()); + } + + // Vote with too large jump should fail + assert!(tracker.record_vote( + SimpleVote::View::from(11), + key.clone(), + SimpleVote::Epoch::default() + ).is_err()); + } + + #[test] + fn test_cross_epoch_voting() { + let mut tracker = VoteTracker::::new( + NonZeroU64::new(10).unwrap(), + NonZeroU64::new(5).unwrap() + ); + let key = Arc::new(SimpleVote::SignatureKey::default()); + + // Vote in first epoch + assert!(tracker.record_vote( + SimpleVote::View::from(1), + key.clone(), + SimpleVote::Epoch::from(1) + ).is_ok()); + + // Vote in next epoch should reset view sequence + assert!(tracker.record_vote( + SimpleVote::View::from(1), + key.clone(), + SimpleVote::Epoch::from(2) + ).is_ok()); + + // Continue voting in new epoch + assert!(tracker.record_vote( + SimpleVote::View::from(2), + key.clone(), + SimpleVote::Epoch::from(2) + ).is_ok()); + } + + #[test] + fn test_recovery_from_nonsequential() { + let mut tracker = VoteTracker::::new( + NonZeroU64::new(10).unwrap(), + NonZeroU64::new(5).unwrap() + ); + let key = Arc::new(SimpleVote::SignatureKey::default()); + + // Vote 1 succeeds + assert!(tracker.record_vote( + SimpleVote::View::from(1), + key.clone(), + SimpleVote::Epoch::default() + ).is_ok()); + + // Vote 3 is queued + let result = tracker.record_vote( + SimpleVote::View::from(3), + key.clone(), + SimpleVote::Epoch::default() + ); + assert!(result.is_err()); + assert_eq!(tracker.metrics.pending_votes, 1); + + // Vote 2 arrives and triggers processing of vote 3 + assert!(tracker.record_vote( + SimpleVote::View::from(2), + key.clone(), + SimpleVote::Epoch::default() + ).is_ok()); + + assert_eq!(tracker.metrics.pending_votes, 0); + assert_eq!(tracker.metrics.total_votes, 3); + } +}