From 66866b1ed658be80065d2657cbe2d31d6698258f Mon Sep 17 00:00:00 2001 From: maximilien-hyle Date: Wed, 29 Jan 2025 18:01:18 +0100 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Refacto=20Storage's=20Lane?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/mempool.rs | 557 +++++---- src/mempool/storage.rs | 1981 +++++++++------------------------ src/mempool/storage_fjall.rs | 150 +++ src/mempool/storage_memory.rs | 103 ++ src/tests/autobahn_testing.rs | 6 +- 5 files changed, 1070 insertions(+), 1727 deletions(-) create mode 100644 src/mempool/storage_fjall.rs create mode 100644 src/mempool/storage_memory.rs diff --git a/src/mempool.rs b/src/mempool.rs index 009737b0b..63feacc49 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -4,7 +4,6 @@ use crate::{ bus::{command_response::Query, BusClientSender, BusMessage}, consensus::{CommittedConsensusProposal, ConsensusEvent}, genesis::GenesisEvent, - mempool::storage::Storage, model::*, module_handle_messages, node_state::module::NodeStateEvent, @@ -33,7 +32,10 @@ use std::{ path::PathBuf, sync::Arc, }; -use storage::{DataProposalVerdict, LaneBytesSize, LaneEntry}; +use storage::{DataProposalVerdict, LaneEntry, Storage}; +// Pick one of the two implementations +// use storage_memory::LanesStorage; +use storage_fjall::LanesStorage; use strum_macros::IntoStaticStr; use tracing::{debug, error, info, trace, warn}; @@ -42,6 +44,8 @@ use verifiers::{verify_proof, verify_recursive_proof}; pub mod api; pub mod metrics; pub mod storage; +pub mod storage_fjall; +pub mod storage_memory; pub mod verifiers; #[derive(Debug, Clone)] @@ -90,7 +94,6 @@ struct MempoolBusClient { #[derive(Default, Encode, Decode)] pub struct MempoolStore { - storage: Storage, buffered_proposals: BTreeMap>, pending_txs: Vec, last_ccp: Option, @@ -106,6 +109,7 @@ pub struct Mempool { conf: SharedConf, crypto: SharedBlstCrypto, metrics: MempoolMetrics, + lanes: LanesStorage, inner: MempoolStore, } @@ -169,26 +173,23 @@ impl Module for Mempool { } } - let lanes_tip = Self::load_from_disk::>( + let attributes = Self::load_from_disk::( ctx.common .config .data_directory - .join("mempool_lanes_tip.bin") + .join("mempool.bin") .as_path(), ) .unwrap_or_default(); - let attributes = Self::load_from_disk::( + let lanes_tip = Self::load_from_disk::>( ctx.common .config .data_directory - .join("mempool.bin") + .join("mempool_lanes_tip.bin") .as_path(), ) - .unwrap_or(MempoolStore { - storage: Storage::new(ctx.node.crypto.validator_pubkey().clone(), lanes_tip), - ..MempoolStore::default() - }); + .unwrap_or_default(); // Register the Hyle contract to be able to handle registrations. #[allow(clippy::expect_used, reason = "not held across await")] @@ -206,6 +207,11 @@ impl Module for Mempool { conf: ctx.common.config.clone(), metrics, crypto: Arc::clone(&ctx.node.crypto), + lanes: LanesStorage::new( + &ctx.common.config.data_directory, + ctx.node.crypto.validator_pubkey().clone(), + lanes_tip, + )?, inner: attributes, }) } @@ -253,7 +259,7 @@ impl Mempool { } } command_response staking => { - Ok(self.handle_querynewcut(staking)) + self.handle_querynewcut(staking) } _ = interval.tick() => { let _ = self.handle_data_proposal_management() @@ -267,7 +273,7 @@ impl Mempool { } if let Err(e) = Self::save_on_disk( file.join("mempool_lanes_tip.bin").as_path(), - &self.storage.lanes_tip, + &self.lanes.lanes_tip, ) { warn!("Failed to save mempool storage on disk: {}", e); } @@ -304,9 +310,14 @@ impl Mempool { } /// Creates a cut with local material on QueryNewCut message reception (from consensus) - fn handle_querynewcut(&mut self, staking: &mut QueryNewCut) -> Cut { + fn handle_querynewcut(&mut self, staking: &mut QueryNewCut) -> Result { self.metrics.add_new_cut(staking); - self.storage.new_cut(&staking.0) + let previous_cut = self + .last_ccp + .as_ref() + .map(|ccp| ccp.consensus_proposal.cut.clone()) + .unwrap_or_default(); + self.lanes.new_cut(&staking.0, &previous_cut) } fn handle_api_message(&mut self, command: RestApiMessage) -> Result<()> { @@ -342,60 +353,67 @@ impl Mempool { // Create new DataProposal with pending txs let crypto = self.crypto.clone(); let new_txs = std::mem::take(&mut self.pending_txs); - self.storage.new_data_proposal(&crypto, new_txs); // TODO: copy crypto in storage + self.lanes.new_data_proposal(&crypto, new_txs)?; // TODO: copy crypto in storage + let last_cut = self + .last_ccp + .as_ref() + .map(|ccp| ccp.consensus_proposal.cut.clone()); // Check for each pending DataProposal if it has enough signatures - if let Some(entries) = self.storage.get_lane_pending_entries(&self.storage.id) { - for lane_entry in entries { - // If there's only 1 signature (=own signature), broadcast it to everyone - if lane_entry.signatures.len() == 1 && self.staking.bonded().len() > 1 { - debug!( - "🚗 Broadcast DataProposal {} ({} validators, {} txs)", - lane_entry.data_proposal.hash(), - self.staking.bonded().len(), - lane_entry.data_proposal.txs.len() - ); - self.metrics.add_data_proposal(&lane_entry.data_proposal); - self.metrics.add_proposed_txs(&lane_entry.data_proposal); - self.broadcast_net_message(MempoolNetMessage::DataProposal( - lane_entry.data_proposal.clone(), - ))?; - } else { - // If None, rebroadcast it to every validator that has not yet signed it - let validator_that_has_signed: HashSet<&ValidatorPublicKey> = lane_entry - .signatures - .iter() - .map(|s| &s.signature.validator) - .collect(); - - // No PoA means we rebroadcast the DataProposal for non present voters - let only_for: HashSet = self - .staking - .bonded() - .iter() - .filter(|pubkey| !validator_that_has_signed.contains(pubkey)) - .cloned() - .collect(); - - if only_for.is_empty() { - continue; - } + let entries = self + .lanes + .get_lane_pending_entries(&self.lanes.id, last_cut)?; - self.metrics.add_data_proposal(&lane_entry.data_proposal); - self.metrics.add_proposed_txs(&lane_entry.data_proposal); - debug!( - "🚗 Broadcast DataProposal {} (only for {} validators, {} txs)", - &lane_entry.data_proposal.hash(), - only_for.len(), - &lane_entry.data_proposal.txs.len() - ); - self.broadcast_only_for_net_message( - only_for, - MempoolNetMessage::DataProposal(lane_entry.data_proposal.clone()), - )?; + for lane_entry in entries { + // If there's only 1 signature (=own signature), broadcast it to everyone + if lane_entry.signatures.len() == 1 && self.staking.bonded().len() > 1 { + debug!( + "🚗 Broadcast DataProposal {} ({} validators, {} txs)", + lane_entry.data_proposal.hash(), + self.staking.bonded().len(), + lane_entry.data_proposal.txs.len() + ); + self.metrics.add_data_proposal(&lane_entry.data_proposal); + self.metrics.add_proposed_txs(&lane_entry.data_proposal); + self.broadcast_net_message(MempoolNetMessage::DataProposal( + lane_entry.data_proposal.clone(), + ))?; + } else { + // If None, rebroadcast it to every validator that has not yet signed it + let validator_that_has_signed: HashSet<&ValidatorPublicKey> = lane_entry + .signatures + .iter() + .map(|s| &s.signature.validator) + .collect(); + + // No PoA means we rebroadcast the DataProposal for non present voters + let only_for: HashSet = self + .staking + .bonded() + .iter() + .filter(|pubkey| !validator_that_has_signed.contains(pubkey)) + .cloned() + .collect(); + + if only_for.is_empty() { + continue; } + + self.metrics.add_data_proposal(&lane_entry.data_proposal); + self.metrics.add_proposed_txs(&lane_entry.data_proposal); + debug!( + "🚗 Broadcast DataProposal {} (only for {} validators, {} txs)", + &lane_entry.data_proposal.hash(), + only_for.len(), + &lane_entry.data_proposal.txs.len() + ); + self.broadcast_only_for_net_message( + only_for, + MempoolNetMessage::DataProposal(lane_entry.data_proposal.clone()), + )?; } } + Ok(()) } @@ -418,7 +436,7 @@ impl Mempool { .map(|el| &el.1); let entries = self - .storage + .lanes .get_lane_entries_between_hashes( validator, // get start hash for validator from_hash, @@ -553,11 +571,6 @@ impl Mempool { // Fetch in advance data proposals self.fetch_unknown_data_proposals(&cut)?; - // Update all lanes with the new cut - self.storage.update_lanes_with_commited_cut(&cut); - - self.storage.try_update_lanes_tip(&cut); - Ok(()) } } @@ -566,20 +579,14 @@ impl Mempool { fn fetch_unknown_data_proposals(&mut self, cut: &Cut) -> Result<()> { // Detect all unknown data proposals for (validator, data_proposal_hash, _, _) in cut.iter() { - if !self - .storage - .lane_has_data_proposal(validator, data_proposal_hash) - { - let latest_data_proposal = self - .storage - .get_lane_latest_data_proposal_hash(validator) - .cloned(); + if !self.lanes.contains(validator, data_proposal_hash) { + let latest_data_proposal = self.lanes.lanes_tip.get(validator).cloned(); // Send SyncRequest for all unknown data proposals self.send_sync_request( validator, - latest_data_proposal.as_ref(), Some(data_proposal_hash), + latest_data_proposal.as_ref(), ) .context("Fetching unknown data")?; } @@ -613,7 +620,7 @@ impl Mempool { self.on_data_vote(&msg, data_proposal_hash, data_proposal_size)?; } MempoolNetMessage::PoDAUpdate(data_proposal_hash, signatures) => { - self.on_poda_update(validator, &data_proposal_hash, signatures); + self.on_poda_update(validator, &data_proposal_hash, signatures)? } MempoolNetMessage::SyncRequest(from_data_proposal_hash, to_data_proposal_hash) => { self.on_sync_request( @@ -634,7 +641,7 @@ impl Mempool { validator: &ValidatorPublicKey, mut missing_lane_entries: Vec, ) -> Result<()> { - info!("{} SyncReply from validator {validator}", self.storage.id); + info!("{} SyncReply from validator {validator}", self.lanes.id); // Discard any lane entry that wasn't signed by the validator. missing_lane_entries.retain(|lane_entry| { @@ -661,11 +668,11 @@ impl Mempool { debug!( "{} adding {} missing lane entries to lane of {validator}", - self.storage.id, + self.lanes.id, missing_lane_entries.len() ); - self.storage + self.lanes .add_missing_lane_entries(validator, missing_lane_entries)?; let mut waiting_proposals = match self.buffered_proposals.get_mut(validator) { @@ -691,17 +698,17 @@ impl Mempool { ) -> Result<()> { info!( "{} SyncRequest received from validator {validator} for last_data_proposal_hash {:?}", - self.storage.id, to_data_proposal_hash + self.lanes.id, to_data_proposal_hash ); - let missing_lane_entries = self.storage.get_lane_entries_between_hashes( - &self.storage.id, + let missing_lane_entries = self.lanes.get_lane_entries_between_hashes( + &self.lanes.id, from_data_proposal_hash, to_data_proposal_hash, ); match missing_lane_entries { - Err(e) => info!("Can't send sync reply as there are no missing data proposals found between {:?} and {:?} for {}: {}", to_data_proposal_hash, from_data_proposal_hash, self.storage.id, e), + Err(e) => info!("Can't send sync reply as there are no missing data proposals found between {:?} and {:?} for {}: {}", to_data_proposal_hash, from_data_proposal_hash, self.lanes.id, e), Ok(lane_entries) => { debug!( "Missing data proposals on {} are {:?}", @@ -720,9 +727,9 @@ impl Mempool { new_lane_size: LaneBytesSize, ) -> Result<()> { let validator = &msg.signature.validator; - debug!("{} Vote from {}", self.storage.id, validator); + debug!("{} Vote from {}", self.lanes.id, validator); let (data_proposal_hash, signatures) = - self.storage + self.lanes .on_data_vote(msg, data_proposal_hash, new_lane_size)?; // Compute voting power of all signers to check if the DataProposal received enough votes @@ -759,15 +766,15 @@ impl Mempool { validator: &ValidatorPublicKey, data_proposal_hash: &DataProposalHash, signatures: Vec>, - ) { + ) -> Result<()> { debug!( "Received {} signatures for DataProposal {} of validator {}", signatures.len(), data_proposal_hash, validator ); - self.storage - .on_poda_update(validator, data_proposal_hash, signatures); + self.lanes + .on_poda_update(validator, data_proposal_hash, signatures) } fn on_data_proposal( @@ -783,7 +790,7 @@ impl Mempool { data_proposal.estimate_size() ); let data_proposal_hash = data_proposal.hash(); - let (verdict, lane_size) = self.storage.on_data_proposal(validator, &data_proposal); + let (verdict, lane_size) = self.lanes.on_data_proposal(validator, &data_proposal)?; match verdict { DataProposalVerdict::Empty => { warn!( @@ -804,7 +811,7 @@ impl Mempool { let sender = sender.clone(); let validator = validator.clone(); tokio::task::spawn_blocking(move || { - let decision = Storage::process_data_proposal(&mut data_proposal, kc); + let decision = LanesStorage::process_data_proposal(&mut data_proposal, kc); sender.send(InternalMempoolEvent::OnProcessedDataProposal(( validator, decision, @@ -822,8 +829,8 @@ impl Mempool { // We dont have the parent, so we craft a sync demand debug!( - "Emitting sync request with local state {} last_known_data_proposal_hash {:?}", - self.storage, last_known_data_proposal_hash + "Emitting sync request to {}, last_known_data_proposal_hash {:?}", + validator, last_known_data_proposal_hash ); if let Some(parent) = data_proposal_parent_hash { @@ -868,8 +875,8 @@ impl Mempool { trace!("Send vote for DataProposal"); let crypto = self.crypto.clone(); let size = self - .storage - .store_data_proposal(&crypto, &validator, data_proposal); + .lanes + .store_data_proposal(&crypto, &validator, data_proposal)?; self.send_vote(&validator, data_proposal_hash, size)?; } DataProposalVerdict::Refuse => { @@ -1133,7 +1140,8 @@ pub mod test { impl MempoolTestCtx { pub async fn build_mempool(shared_bus: &SharedMessageBus, crypto: BlstCrypto) -> Mempool { let pubkey = crypto.validator_pubkey(); - let storage = Storage::new(pubkey.clone(), HashMap::default()); + let tmp_dir = tempfile::tempdir().unwrap().into_path(); + let lanes = LanesStorage::new(&tmp_dir, pubkey.clone(), HashMap::default()).unwrap(); let bus = MempoolBusClient::new_from_bus(shared_bus.new_handle()).await; // Initialize Mempool @@ -1143,10 +1151,8 @@ pub mod test { conf: SharedConf::default(), crypto: Arc::new(crypto), metrics: MempoolMetrics::global("id".to_string()), - inner: MempoolStore { - storage, - ..MempoolStore::default() - }, + lanes, + inner: MempoolStore::default(), } } @@ -1204,6 +1210,7 @@ pub mod test { pub fn gen_cut(&mut self, staking: &Staking) -> Cut { self.mempool .handle_querynewcut(&mut QueryNewCut(staking.clone())) + .unwrap() } pub fn make_data_proposal_with_pending_txs(&mut self) -> Result<()> { @@ -1343,119 +1350,98 @@ pub mod test { } pub fn current_hash(&self) -> Option { - let lane = self - .mempool - .storage - .lanes - .get(self.validator_pubkey()) - .expect("Could not get own lane"); - lane.current_hash().cloned() - } - pub fn current_size(&self) -> LaneBytesSize { - let lane = self - .mempool - .storage + self.mempool .lanes + .lanes_tip .get(self.validator_pubkey()) - .expect("Could not get own lane"); - lane.get_lane_size() + .cloned() } - pub fn data_proposal( + pub fn last_validator_lane_entry( &self, - height: usize, - ) -> (DataProposal, DataProposalHash, LaneBytesSize) { - let le = self + validator: &ValidatorPublicKey, + ) -> (LaneEntry, DataProposalHash) { + let last_dp_hash = self.mempool.lanes.lanes_tip.get(validator).unwrap(); + let last_dp = self .mempool - .storage .lanes - .get(self.validator_pubkey()) - .unwrap() - .data_proposals - .get_index(height) + .get_by_hash(validator, last_dp_hash) .unwrap() - .1; - let dp_orig = le.data_proposal.clone(); + .unwrap(); - (dp_orig.clone(), dp_orig.hash(), le.cumul_size) + (last_dp, last_dp_hash.clone()) } - pub fn last_validator_data_proposal( - &self, - validator: &ValidatorPublicKey, - ) -> (DataProposal, DataProposalHash) { - let dp_orig = self - .mempool - .storage - .lanes - .get(validator) - .unwrap() - .data_proposals - .last() - .unwrap() - .1 - .data_proposal - .clone(); - (dp_orig.clone(), dp_orig.hash()) + pub fn current_size(&self) -> LaneBytesSize { + let validator = self.validator_pubkey(); + self.mempool.lanes.get_lane_size(validator).unwrap() } + pub fn pop_data_proposal(&mut self) -> (DataProposal, DataProposalHash, LaneBytesSize) { let pub_key = self.validator_pubkey().clone(); - let le = self - .mempool - .storage - .lanes - .get_mut(&pub_key) - .unwrap() - .data_proposals - .pop() - .unwrap() - .1; - let dp_orig = le.data_proposal.clone(); - - (dp_orig.clone(), dp_orig.hash(), le.cumul_size) + self.pop_validator_data_proposal(&pub_key) } + pub fn pop_validator_data_proposal( &mut self, validator: &ValidatorPublicKey, - ) -> (DataProposal, DataProposalHash) { - let dp_orig = self + ) -> (DataProposal, DataProposalHash, LaneBytesSize) { + // Get the latest lane entry + let latest_data_proposal_hash = self .mempool - .storage .lanes - .get_mut(validator) - .unwrap() - .data_proposals - .pop() + .lanes_tip + .get(validator) + .cloned() + .unwrap(); + let latest_lane_entry = self + .mempool + .lanes + .get_by_hash(validator, &latest_data_proposal_hash) .unwrap() - .1 + .unwrap(); + + // update the tip + if let Some(parent_dp_hash) = latest_lane_entry .data_proposal - .clone(); + .parent_data_proposal_hash + .as_ref() + { + self.mempool + .lanes + .lanes_tip + .insert(validator.clone(), parent_dp_hash.clone()); + } else { + self.mempool.lanes.lanes_tip.remove(validator); + } - (dp_orig.clone(), dp_orig.hash()) + // Remove the lane entry from db + let key = &format!("{}:{}", validator, &latest_data_proposal_hash); + self.mempool.lanes.by_hash.remove(key).unwrap(); + + ( + latest_lane_entry.data_proposal, + latest_data_proposal_hash, + latest_lane_entry.cumul_size, + ) } pub fn push_data_proposal(&mut self, dp: DataProposal) { let key = self.validator_pubkey().clone(); - let proposals = &mut self - .mempool - .storage + + let lane_size = self.mempool.lanes.get_lane_size(&key).unwrap(); + let size = lane_size + dp.estimate_size(); + self.mempool .lanes - .get_mut(&key) - .unwrap() - .data_proposals; - let size = proposals - .last() - .map(|(_, v)| v.cumul_size) - .unwrap_or_default() - + dp.estimate_size(); - proposals.insert( - dp.hash(), - LaneEntry { - data_proposal: dp, - cumul_size: size, - signatures: vec![], - }, - ); + .put( + key, + LaneEntry { + data_proposal: dp, + cumul_size: size, + signatures: vec![], + }, + ) + .unwrap(); } pub fn submit_contract_tx(&mut self, contract_name: &'static str) { @@ -1490,21 +1476,14 @@ pub mod test { ctx.submit_tx(®ister_tx); ctx.mempool.handle_data_proposal_management()?; + let ( + LaneEntry { + data_proposal: dp, .. + }, + _, + ) = ctx.last_validator_lane_entry(ctx.validator_pubkey()); - assert_eq!( - ctx.mempool - .storage - .lanes - .get(ctx.validator_pubkey()) - .unwrap() - .data_proposals - .first() - .unwrap() - .1 - .data_proposal - .txs, - vec![register_tx.clone()] - ); + assert_eq!(dp.txs, vec![register_tx.clone()]); // Assert that pending_tx has been flushed assert!(ctx.mempool.pending_txs.is_empty()); @@ -1700,18 +1679,14 @@ pub mod test { .expect("should handle net message"); // Assert that we added the vote to the signatures - assert_eq!( - ctx.mempool - .storage - .lanes - .get(ctx.mempool.crypto.validator_pubkey()) - .unwrap() - .get_last_proposal() - .unwrap() - .signatures - .len(), - 2 - ); + let ( + LaneEntry { + signatures: sig, .. + }, + _, + ) = ctx.last_validator_lane_entry(ctx.validator_pubkey()); + + assert_eq!(sig.len(), 2); Ok(()) } @@ -1752,18 +1727,8 @@ pub mod test { // Since mempool is alone, no broadcast - let data_proposal = ctx - .mempool - .storage - .lanes - .get(ctx.validator_pubkey()) - .unwrap() - .data_proposals - .first() - .unwrap() - .1 - .data_proposal - .clone(); + let (LaneEntry { data_proposal, .. }, _) = + ctx.last_validator_lane_entry(ctx.validator_pubkey()); // Add new validator let crypto2 = BlstCrypto::new("2".into()).unwrap(); @@ -1802,18 +1767,14 @@ pub mod test { // Since mempool is alone, no broadcast. // Take this as an example data proposal. - let le = ctx - .mempool - .storage - .lanes - .get(ctx.validator_pubkey()) - .unwrap() - .data_proposals - .first() - .unwrap() - .1; - let data_proposal = le.data_proposal.clone(); - let cumul_size = le.cumul_size; + let ( + LaneEntry { + data_proposal, + cumul_size, + .. + }, + _, + ) = ctx.last_validator_lane_entry(ctx.validator_pubkey()); // Add new validator let crypto2 = BlstCrypto::new("2".into()).unwrap(); @@ -1912,17 +1873,14 @@ pub mod test { assert_ok!(handle, "Should handle net message"); // Assert that the lane entry was added - let lane = ctx - .mempool - .storage - .lanes - .get(crypto2.validator_pubkey()) - .expect("Could not get lane"); - assert_eq!(lane.data_proposals.len(), 1); - assert_eq!( - lane.get_last_proposal().unwrap().data_proposal, - data_proposal - ); + let ( + LaneEntry { + data_proposal: saved_dp, + .. + }, + _, + ) = ctx.last_validator_lane_entry(crypto2.validator_pubkey()); + assert_eq!(saved_dp, data_proposal); // Process it again ctx.mempool @@ -1945,20 +1903,25 @@ pub mod test { ctx.mempool.handle_data_proposal_management()?; - let (dp_orig, dp_hash, l_size) = ctx.data_proposal(0); - let key = ctx.validator_pubkey().clone(); + let ( + LaneEntry { + data_proposal: dp_orig, + cumul_size, + .. + }, + dp_hash, + ) = ctx.last_validator_lane_entry(&key); let cut = vec![( key.clone(), - dp_hash.clone(), - l_size, + dp_hash, + cumul_size, AggregateSignature::default(), )]; ctx.add_trusted_validator(&key); - let _ = ctx - .mempool + ctx.mempool .handle_consensus_event(ConsensusEvent::CommitConsensusProposal( CommittedConsensusProposal { staking: ctx.mempool.staking.clone(), @@ -1973,7 +1936,7 @@ pub mod test { }, certificate: AggregateSignature::default(), }, - )); + ))?; assert_chanmsg_matches!( ctx.mempool_event_receiver, @@ -2007,18 +1970,16 @@ pub mod test { ctx.mempool.handle_data_proposal_management()?; ctx.add_trusted_validator(&ctx.validator_pubkey().clone()); - let (_, dp_hash, l_size) = ctx.data_proposal(0); - let key = ctx.validator_pubkey(); + let (LaneEntry { cumul_size, .. }, dp_hash) = ctx.last_validator_lane_entry(key); let cut = vec![( key.clone(), - dp_hash.clone(), - l_size, + dp_hash, + cumul_size, AggregateSignature::default(), )]; - let _ = ctx - .mempool + ctx.mempool .handle_consensus_event(ConsensusEvent::CommitConsensusProposal( CommittedConsensusProposal { staking: ctx.mempool.staking.clone(), @@ -2033,23 +1994,29 @@ pub mod test { }, certificate: AggregateSignature::default(), }, - )); + ))?; assert!(ctx.mempool_event_receiver.try_recv().is_err()); ctx.submit_contract_tx("test2"); ctx.mempool.handle_data_proposal_management()?; - let (dp_orig1, dp_hash1, l_size) = ctx.data_proposal(1); let key = ctx.validator_pubkey().clone(); + let ( + LaneEntry { + data_proposal: dp_orig1, + cumul_size, + .. + }, + dp_hash1, + ) = ctx.last_validator_lane_entry(&key); let cut = vec![( key.clone(), - dp_hash1.clone(), - l_size, + dp_hash1, + cumul_size, AggregateSignature::default(), )]; - let _ = ctx - .mempool + ctx.mempool .handle_consensus_event(ConsensusEvent::CommitConsensusProposal( CommittedConsensusProposal { staking: ctx.mempool.staking.clone(), @@ -2064,12 +2031,11 @@ pub mod test { }, certificate: AggregateSignature::default(), }, - )); + ))?; assert!(ctx.mempool_event_receiver.try_recv().is_err()); - let _ = ctx - .mempool + ctx.mempool .handle_consensus_event(ConsensusEvent::CommitConsensusProposal( CommittedConsensusProposal { staking: ctx.mempool.staking.clone(), @@ -2084,7 +2050,7 @@ pub mod test { }, certificate: AggregateSignature::default(), }, - )); + ))?; assert_chanmsg_matches!( ctx.mempool_event_receiver, @@ -2118,18 +2084,23 @@ pub mod test { ctx.mempool.handle_data_proposal_management()?; - let (dp_orig, dp_hash, l_size) = ctx.data_proposal(0); - let key = ctx.validator_pubkey().clone(); + let ( + LaneEntry { + data_proposal: dp_orig, + cumul_size, + .. + }, + dp_hash, + ) = ctx.last_validator_lane_entry(&key); let cut = vec![( key.clone(), - dp_hash.clone(), - l_size, + dp_hash, + cumul_size, AggregateSignature::default(), )]; - let _ = ctx - .mempool + ctx.mempool .handle_consensus_event(ConsensusEvent::CommitConsensusProposal( CommittedConsensusProposal { staking: ctx.mempool.staking.clone(), @@ -2144,7 +2115,7 @@ pub mod test { }, certificate: AggregateSignature::default(), }, - )); + ))?; assert_chanmsg_matches!( ctx.mempool_event_receiver, @@ -2172,17 +2143,16 @@ pub mod test { ctx.mempool.handle_data_proposal_management()?; - let (dp_orig2, dp_hash2, l_size) = ctx.pop_data_proposal(); + let (dp_orig2, dp_hash2, cumul_size2) = ctx.pop_data_proposal(); let cut2 = vec![( key.clone(), - dp_hash2.clone(), - l_size, + dp_hash2, + cumul_size2, AggregateSignature::default(), )]; - let _ = ctx - .mempool + ctx.mempool .handle_consensus_event(ConsensusEvent::CommitConsensusProposal( CommittedConsensusProposal { staking: ctx.mempool.staking.clone(), @@ -2197,7 +2167,7 @@ pub mod test { }, certificate: AggregateSignature::default(), }, - )); + ))?; // No data is available to process correctly the ccp, so no signed block assert!(ctx.mempool_event_receiver.try_recv().is_err()); @@ -2208,18 +2178,23 @@ pub mod test { ctx.submit_contract_tx("test3"); ctx.mempool.handle_data_proposal_management()?; - - let (dp_orig3, dp_hash3, l_size) = ctx.data_proposal(2); + let ( + LaneEntry { + data_proposal: dp_orig3, + cumul_size: cumul_size3, + .. + }, + dp_hash3, + ) = ctx.last_validator_lane_entry(&key); let cut3 = vec![( key.clone(), - dp_hash3.clone(), - l_size, + dp_hash3, + cumul_size3, AggregateSignature::default(), )]; - let _ = ctx - .mempool + ctx.mempool .handle_consensus_event(ConsensusEvent::CommitConsensusProposal( CommittedConsensusProposal { staking: ctx.mempool.staking.clone(), @@ -2234,7 +2209,7 @@ pub mod test { }, certificate: AggregateSignature::default(), }, - )); + ))?; assert_chanmsg_matches!( ctx.mempool_event_receiver, diff --git a/src/mempool/storage.rs b/src/mempool/storage.rs index f83a4a57f..c07fe9589 100644 --- a/src/mempool/storage.rs +++ b/src/mempool/storage.rs @@ -1,13 +1,12 @@ -use anyhow::{bail, Context, Result}; +use anyhow::{bail, Result}; use bincode::{Decode, Encode}; use hyle_model::{ ContractName, DataSized, ProgramId, RegisterContractAction, Signed, StructuredBlobData, ValidatorSignature, Verifier, }; -use indexmap::IndexMap; use serde::{Deserialize, Serialize}; use staking::state::Staking; -use std::{collections::HashMap, fmt::Display, sync::Arc, vec}; +use std::{collections::HashMap, path::Path, sync::Arc, vec}; use tracing::{debug, error, warn}; use crate::{ @@ -15,7 +14,7 @@ use crate::{ BlobProofOutput, Cut, DataProposal, DataProposalHash, Hashable, PoDA, SignedByValidator, Transaction, TransactionData, ValidatorPublicKey, }, - utils::crypto::BlstCrypto, + utils::{crypto::BlstCrypto, logger::LogMe}, }; use super::verifiers::{verify_proof, verify_recursive_proof}; @@ -30,6 +29,8 @@ pub enum DataProposalVerdict { Refuse, } +pub use hyle_model::LaneBytesSize; + #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq, Serialize, Deserialize)] pub struct LaneEntry { pub data_proposal: DataProposal, @@ -37,126 +38,53 @@ pub struct LaneEntry { pub signatures: Vec>, } -#[derive(Debug, Default, Clone, Encode, Decode)] -pub struct Lane { - pub last_cut: Option<(PoDA, DataProposalHash)>, - #[bincode(with_serde)] - pub data_proposals: IndexMap, -} - -pub use hyle_model::LaneBytesSize; - -#[derive(Debug, Default, Clone, Encode, Decode)] -pub struct Storage { - pub id: ValidatorPublicKey, - pub lanes: HashMap, - pub lanes_tip: HashMap, -} - -impl Display for Storage { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Replica {}", self.id)?; - for (i, lane) in self.lanes.iter() { - write!(f, "\n - OL {}: {}", i, lane)?; - } - - Ok(()) - } -} - -impl Storage { - pub fn new( +pub trait Storage { + fn new( + path: &Path, id: ValidatorPublicKey, lanes_tip: HashMap, - ) -> Storage { - let mut lanes = HashMap::new(); - lanes.insert(id.clone(), Lane::default()); - Storage { - id, - lanes, - lanes_tip, - } - } + ) -> Result + where + Self: std::marker::Sized; + fn id(&self) -> &ValidatorPublicKey; + fn contains(&self, validator_key: &ValidatorPublicKey, dp_hash: &DataProposalHash) -> bool; + fn get_by_hash( + &self, + validator_key: &ValidatorPublicKey, + dp_hash: &DataProposalHash, + ) -> Result>; + fn put(&mut self, validator_key: ValidatorPublicKey, lane_entry: LaneEntry) -> Result<()>; + fn update(&mut self, validator_key: ValidatorPublicKey, lane_entry: LaneEntry) -> Result<()>; + fn persist(&self) -> Result<()>; + fn get_lane_tip(&self, validator: &ValidatorPublicKey) -> Option<&DataProposalHash>; + fn update_lane_tip( + &mut self, + validator: ValidatorPublicKey, + dp_hash: DataProposalHash, + ) -> Option; + + fn new_cut(&self, staking: &Staking, previous_cut: &Cut) -> Result { + let validator_last_cut: HashMap = + previous_cut + .iter() + .map(|(v, dp, _, poda)| (v.clone(), (dp.clone(), poda.clone()))) + .collect(); - pub fn new_cut(&mut self, staking: &Staking) -> Cut { // For each validator, we get the last validated car and put it in the cut let mut cut: Cut = vec![]; let bonded_validators = staking.bonded(); for validator in bonded_validators.iter() { - // Get lane of the validator. Create a new empty one is it does not exist - let lane = self.lanes.entry(validator.clone()).or_default(); - - // Iterate their lane starting from the most recent DataProposal until we find one with enough signatures - for ( - data_proposal_hash, - LaneEntry { - data_proposal: _, - cumul_size: size, - signatures, - }, - ) in lane.iter_reverse() + if let Some((dp_hash, cumul_size, poda)) = + self.get_latest_car(validator, staking, validator_last_cut.get(validator))? { - // Only cut on DataProposal that have not been cut yet - if lane.last_cut.as_ref().map(|lc| &lc.1) == Some(data_proposal_hash) { - #[allow(clippy::unwrap_used, reason = "we know the value is Some")] - let poda = lane.last_cut.as_ref().map(|lc| lc.0.clone()).unwrap(); - cut.push((validator.clone(), data_proposal_hash.clone(), *size, poda)); - break; - } - // Filter signatures on DataProposal to only keep the ones from the current validators - let filtered_signatures: Vec> = signatures - .iter() - .filter(|signed_msg| { - bonded_validators.contains(&signed_msg.signature.validator) - }) - .cloned() - .collect(); - - // Collect all filtered validators that signed the DataProposal - let filtered_validators: Vec = filtered_signatures - .iter() - .map(|s| s.signature.validator.clone()) - .collect(); - - // Compute their voting power to check if the DataProposal received enough votes - let voting_power = staking.compute_voting_power(filtered_validators.as_slice()); - let f = staking.compute_f(); - if voting_power < f + 1 { - // Check if previous DataProposals received enough votes - continue; - } - - // Aggregate the signatures in a PoDA - let poda = match BlstCrypto::aggregate( - MempoolNetMessage::DataVote(data_proposal_hash.clone(), *size), - &filtered_signatures.iter().collect::>(), - ) { - Ok(poda) => poda, - Err(e) => { - error!( - "Could not aggregate signatures for validator {} and data proposal hash {}: {}", - validator, data_proposal_hash, e - ); - break; - } - }; - - // Add the DataProposal to the cut for this validator - cut.push(( - validator.clone(), - data_proposal_hash.clone(), - *size, - poda.signature, - )); - break; + cut.push((validator.clone(), dp_hash, cumul_size, poda)); } } - - cut + Ok(cut) } // Called by the initial proposal validator to aggregate votes - pub fn on_data_vote( + fn on_data_vote( &mut self, msg: &SignedByValidator, data_proposal_hash: &DataProposalHash, @@ -165,88 +93,87 @@ impl Storage { DataProposalHash, Vec>, )> { - let lane = match self.lanes.get_mut(&self.id) { - Some(lane) => lane, - None => bail!( - "Received vote from unkown validator {}", - msg.signature.validator, - ), - }; - match lane.get_proposal_mut(data_proposal_hash) { - Some(lane_entry) => { + let id = self.id().clone(); + match self.get_by_hash(&id, data_proposal_hash)? { + Some(mut lane_entry) => { if lane_entry.cumul_size != new_lane_size { - debug!("Received size {:?} does no match the actual size of the DataProposal ({:?})", new_lane_size, lane_entry.cumul_size); bail!("Received size {new_lane_size} does no match the actual size of the DataProposal ({})", lane_entry.cumul_size); } - // Adding the vote to the DataProposal lane_entry.signatures.push(msg.clone()); lane_entry.signatures.dedup(); - Ok((data_proposal_hash.clone(), lane_entry.signatures.clone())) - } - None => { - bail!("Received vote from validator {} for unknown DataProposal ({data_proposal_hash})", msg.signature.validator); + + // Update LaneEntry for self + self.update(self.id().clone(), lane_entry.clone())?; + + Ok((data_proposal_hash.clone(), lane_entry.signatures)) } + None => bail!( + "Received vote from unknown validator {}", + msg.signature.validator, + ), } } - pub fn on_poda_update( + fn on_poda_update( &mut self, validator: &ValidatorPublicKey, data_proposal_hash: &DataProposalHash, signatures: Vec>, - ) { - if let Some(data_proposal) = self - .lanes - .entry(validator.clone()) - .or_default() - .get_proposal_mut(data_proposal_hash) + ) -> Result<()> { + match self + .get_by_hash(validator, data_proposal_hash) + .log_warn("Received PoDA update for unknown DataProposal")? { - // Adding the votes to the DataProposal - data_proposal.signatures.extend(signatures); - data_proposal.signatures.dedup(); - } else { - warn!("Received PoDA update from validator {validator} for unknown DataProposal ({data_proposal_hash}). Not storing it"); + Some(mut lane_entry) => { + lane_entry.signatures.extend(signatures); + lane_entry.signatures.dedup(); + self.put(validator.clone(), lane_entry) + } + None => bail!( + "Received poda update for unknown DP {} for validator {}", + data_proposal_hash, + validator + ), } } - pub fn on_data_proposal( + fn on_data_proposal( &mut self, validator: &ValidatorPublicKey, data_proposal: &DataProposal, - ) -> (DataProposalVerdict, Option) { + ) -> Result<(DataProposalVerdict, Option)> { // Check that data_proposal is not empty if data_proposal.txs.is_empty() { - return (DataProposalVerdict::Empty, None); + return Ok((DataProposalVerdict::Empty, None)); } - let validator_lane = self.lanes.entry(validator.clone()).or_default(); - let validator_lane_size = validator_lane.get_lane_size(); + let validator_lane_size = self.get_lane_size(validator)?; // ALREADY STORED // Already present data proposal (just resend a vote) - if validator_lane.has_proposal(&data_proposal.hash()) { - return (DataProposalVerdict::Vote, Some(validator_lane_size)); + if self.contains(validator, &data_proposal.hash()) { + return Ok((DataProposalVerdict::Vote, Some(validator_lane_size))); } // LEGIT DATA PROPOSAL // data proposal parent hash needs to match the lane tip of that validator - let last_known_hash = self.lanes_tip.get(validator); + let last_known_hash = self.get_lane_tip(validator); let valid_next_data_proposal = data_proposal.parent_data_proposal_hash.as_ref() == last_known_hash; if valid_next_data_proposal { - return (DataProposalVerdict::Process, None); + return Ok((DataProposalVerdict::Process, None)); } // UNKNOWN PARENT // DataProposal's parent is unknown if let Some(ref dp_parent_hash) = data_proposal.parent_data_proposal_hash { - if !validator_lane.has_proposal(dp_parent_hash) { + if !self.contains(validator, dp_parent_hash) { // Get the last known parent hash in order to get all the next ones - return (DataProposalVerdict::Wait(last_known_hash.cloned()), None); + return Ok((DataProposalVerdict::Wait(last_known_hash.cloned()), None)); } } @@ -261,10 +188,76 @@ impl Storage { data_proposal.clone() ); - (DataProposalVerdict::Refuse, None) + Ok((DataProposalVerdict::Refuse, None)) + } + + fn get_latest_car( + &self, + validator: &ValidatorPublicKey, + staking: &Staking, + previous_committed_car: Option<&(DataProposalHash, PoDA)>, + ) -> Result> { + let bonded_validators = staking.bonded(); + // We start from the tip of the lane, and go backup until we find a DP with enough signatures + if let Some(tip_dp_hash) = self.get_lane_tip(validator) { + let mut dp_hash = tip_dp_hash.clone(); + while let Some(le) = self.get_by_hash(validator, &dp_hash)? { + if let Some((hash, poda)) = previous_committed_car { + if &dp_hash == hash { + // Latest car has already been committed + return Ok(Some((hash.clone(), le.cumul_size, poda.clone()))); + } + } + // Filter signatures on DataProposal to only keep the ones from the current validators + let filtered_signatures: Vec> = le + .signatures + .iter() + .filter(|signed_msg| { + bonded_validators.contains(&signed_msg.signature.validator) + }) + .cloned() + .collect(); + + // Collect all filtered validators that signed the DataProposal + let filtered_validators: Vec = filtered_signatures + .iter() + .map(|s| s.signature.validator.clone()) + .collect(); + + // Compute their voting power to check if the DataProposal received enough votes + let voting_power = staking.compute_voting_power(filtered_validators.as_slice()); + let f = staking.compute_f(); + if voting_power < f + 1 { + // Check if previous DataProposals received enough votes + if let Some(parent_dp_hash) = le.data_proposal.parent_data_proposal_hash { + dp_hash = parent_dp_hash.clone(); + continue; + } + return Ok(None); + } + + // Aggregate the signatures in a PoDA + let poda = match BlstCrypto::aggregate( + MempoolNetMessage::DataVote(dp_hash.clone(), le.cumul_size), + &filtered_signatures.iter().collect::>(), + ) { + Ok(poda) => poda, + Err(e) => { + error!( + "Could not aggregate signatures for validator {} and data proposal hash {}: {}", + validator, dp_hash, e + ); + break; + } + }; + return Ok(Some((dp_hash.clone(), le.cumul_size, poda.signature))); + } + } + + Ok(None) } - pub fn process_data_proposal( + fn process_data_proposal( data_proposal: &mut DataProposal, known_contracts: Arc>, ) -> DataProposalVerdict { @@ -374,36 +367,36 @@ impl Storage { } // Remove proofs from transactions - remove_proofs(data_proposal); + Self::remove_proofs(data_proposal); DataProposalVerdict::Vote } - pub fn store_data_proposal( + fn store_data_proposal( &mut self, crypto: &BlstCrypto, validator: &ValidatorPublicKey, data_proposal: DataProposal, - ) -> LaneBytesSize { - // Updating validator's lane tip - if let Some(validator_tip) = self.lanes_tip.get(validator) { - // If validator already has a lane, we only update the tip if DP-chain is respected - if let Some(parent_dp_hash) = &data_proposal.parent_data_proposal_hash { - if validator_tip == parent_dp_hash { - self.lanes_tip - .insert(validator.clone(), data_proposal.hash()); - } - } - } else { - self.lanes_tip - .insert(validator.clone(), data_proposal.hash()); - } - + ) -> Result { // Add DataProposal to validator's lane - self.lanes - .entry(validator.clone()) - .or_default() - .add_new_proposal(crypto, data_proposal) + let data_proposal_hash = data_proposal.hash(); + + let dp_size = data_proposal.estimate_size(); + let lane_size = self.get_lane_size(validator)?; + let cumul_size = lane_size + dp_size; + + let msg = MempoolNetMessage::DataVote(data_proposal_hash.clone(), cumul_size); + let signatures = vec![crypto.sign(msg)?]; + + self.put( + validator.clone(), + LaneEntry { + data_proposal, + cumul_size, + signatures, + }, + )?; + Ok(cumul_size) } // Find the verifier and program_id for a contract name in the current lane, optimistically. @@ -447,1403 +440,525 @@ impl Storage { }) } - pub fn lane_has_data_proposal( - &self, - validator: &ValidatorPublicKey, - data_proposal_hash: &DataProposalHash, - ) -> bool { - self.lanes - .get(validator) - .is_some_and(|lane| lane.has_proposal(data_proposal_hash)) - } - - pub fn get_lane_entries_between_hashes( + fn get_lane_entries_between_hashes( &self, validator: &ValidatorPublicKey, from_data_proposal_hash: Option<&DataProposalHash>, to_data_proposal_hash: Option<&DataProposalHash>, ) -> Result> { - if let Some(lane) = self.lanes.get(validator) { - return lane - .get_lane_entries_between_hashes(from_data_proposal_hash, to_data_proposal_hash); + // If no dp hash is provided, we use the tip of the lane + let mut dp_hash: DataProposalHash = match to_data_proposal_hash { + Some(hash) => hash.clone(), + None => match self.get_lane_tip(validator) { + Some(dp_hash) => dp_hash.clone(), + None => { + return Ok(vec![]); + } + }, + }; + let mut entries = vec![]; + while Some(&dp_hash) != from_data_proposal_hash { + let lane_entry = self.get_by_hash(validator, &dp_hash)?; + match lane_entry { + Some(lane_entry) => { + entries.insert(0, lane_entry.clone()); + if let Some(parent_dp_hash) = lane_entry.data_proposal.parent_data_proposal_hash + { + dp_hash = parent_dp_hash; + } else { + break; + } + } + None => { + bail!("Local lane is incomplete: could not find DP {}", dp_hash); + } + } + } + + Ok(entries) + } + + fn get_lane_size(&self, validator: &ValidatorPublicKey) -> Result { + if let Some(latest_data_proposal_hash) = self.get_lane_tip(validator) { + self.get_by_hash(validator, latest_data_proposal_hash)? + .map_or_else( + || Ok(LaneBytesSize::default()), + |entry| Ok(entry.cumul_size), + ) + } else { + Ok(LaneBytesSize::default()) } - bail!("Validator not found"); } // Add lane entries to validator"s lane - pub fn add_missing_lane_entries( + fn add_missing_lane_entries( &mut self, validator: &ValidatorPublicKey, lane_entries: Vec, ) -> Result<()> { - let lane = self.lanes.entry(validator.clone()).or_default(); - - debug!( - "Trying to add missing lane entries on lane \n {}\n {:?}", - lane, lane_entries - ); - - lane.add_missing_lane_entries(lane_entries.clone())?; - - // WARNING: This is not a proper way to do it. This *will* be properly done after refactoring storage to HashMap - // Updating validator's lane tip - if let Some(validator_tip) = self.lanes_tip.get(validator).cloned() { - for le in lane_entries.iter() { - // If validator already has a lane, we only update the tip if DP-chain is respected - if let Some(parent_dp_hash) = &le.data_proposal.parent_data_proposal_hash { - if &validator_tip == parent_dp_hash { - self.lanes_tip - .insert(validator.clone(), le.data_proposal.hash()); - } - } - } - } else { - self.lanes_tip.insert( - validator.clone(), - #[allow(clippy::unwrap_used, reason = "must exist because of previous checks")] - lane_entries.last().unwrap().data_proposal.hash(), - ); + debug!("Trying to add missing lane entries on validator ({validator})'s lane"); + for lane_entry in lane_entries { + self.put(validator.clone(), lane_entry)?; } Ok(()) } /// Creates and saves a new DataProposal if there are pending transactions - pub fn new_data_proposal(&mut self, crypto: &BlstCrypto, txs: Vec) { + fn new_data_proposal(&mut self, crypto: &BlstCrypto, txs: Vec) -> Result<()> { if txs.is_empty() { - return; + return Ok(()); } - // Get last DataProposal of own lane - let data_proposal = if let Some(LaneEntry { - data_proposal: parent_data_proposal, - cumul_size: _, - signatures: _, - }) = self - .lanes - .entry(self.id.clone()) - .or_default() - .get_last_proposal() - { - // Create new data proposal - DataProposal { - parent_data_proposal_hash: Some(parent_data_proposal.hash()), - txs, - } - } else { - // Own lane has no parent DataProposal yet - DataProposal { - parent_data_proposal_hash: None, - txs, - } + let validator_key = self.id().clone(); + + // Create new data proposal + let data_proposal = DataProposal { + parent_data_proposal_hash: self.get_lane_tip(&validator_key).cloned(), + txs, }; debug!( "Creating new DataProposal in local lane ({}) with {} transactions", - self.id, + self.id(), data_proposal.txs.len() ); - self.store_data_proposal(crypto, &self.id.clone(), data_proposal); - } - - pub fn get_lane_latest_entry(&self, validator: &ValidatorPublicKey) -> Option<&LaneEntry> { - self.lanes - .get(validator) - .and_then(|lane| lane.get_last_proposal()) - } + let v_id = self.id().clone(); + self.store_data_proposal(crypto, &v_id, data_proposal)?; - pub fn get_lane_pending_entries( - &self, - validator: &ValidatorPublicKey, - ) -> Option> { - self.lanes - .get(validator) - .map(|lane| lane.get_pending_entries()) + Ok(()) } - pub fn get_lane_latest_data_proposal_hash( + fn get_lane_pending_entries( &self, validator: &ValidatorPublicKey, - ) -> Option<&DataProposalHash> { - self.lanes - .get(validator) - .and_then(|lane| lane.get_last_proposal_hash()) - } - - pub fn update_lanes_with_commited_cut(&mut self, committed_cut: &Cut) { - for (validator, data_proposal_hash, _, poda) in committed_cut.iter() { - if let Some(lane) = self.lanes.get_mut(validator) { - lane.last_cut = Some((poda.clone(), data_proposal_hash.clone())); - } - } - } + last_cut: Option, + ) -> Result> { + let lane_tip = self.get_lane_tip(validator); - pub fn try_update_lanes_tip(&mut self, cut: &Cut) { - for (validator, data_proposal_hash, _, _) in cut.iter() { - // If we know the hash, we don't change the tip (because either it is already at the tip, or we have advanced DPs) - if self.lane_has_data_proposal(validator, data_proposal_hash) { - continue; - } - // If we do not know the hash, 2 options: - // 1) Cut's DP is ahead of our lane tip: we update the tip - self.lanes_tip - .insert(validator.clone(), data_proposal_hash.clone()); - // FIXME: - // 2) Cut's DP is on a fork of our lane... - } + let last_committed_dp_hash = match last_cut { + Some(cut) => cut + .iter() + .find(|(v, _, _, _)| v == validator) + .map(|(_, dp, _, _)| dp.clone()), + None => None, + }; + self.get_lane_entries_between_hashes(validator, last_committed_dp_hash.as_ref(), lane_tip) } -} - -/// Remove proofs from all transactions in the DataProposal -fn remove_proofs(dp: &mut DataProposal) { - dp.txs.iter_mut().for_each(|tx| { - match &mut tx.transaction_data { - TransactionData::VerifiedProof(proof_tx) => { - proof_tx.proof = None; - } - TransactionData::Proof(_) => { - // This can never happen. - // A DataProposal that has been processed has turned all TransactionData::Proof into TransactionData::VerifiedProof - unreachable!(); - } - TransactionData::Blob(_) => {} - } - }); -} -impl Display for Lane { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - for ( - _, - LaneEntry { - data_proposal, - cumul_size: size, - signatures: _, - }, - ) in self.data_proposals.iter() - { - match &data_proposal.parent_data_proposal_hash { - None => { - let _ = write!(f, "{} ({})", data_proposal, size); + /// Remove proofs from all transactions in the DataProposal + fn remove_proofs(dp: &mut DataProposal) { + dp.txs.iter_mut().for_each(|tx| { + match &mut tx.transaction_data { + TransactionData::VerifiedProof(proof_tx) => { + proof_tx.proof = None; } - Some(_) => { - let _ = write!(f, " <- {} ({})", data_proposal, size); + TransactionData::Proof(_) => { + // This can never happen. + // A DataProposal that has been processed has turned all TransactionData::Proof into TransactionData::VerifiedProof + unreachable!(); } + TransactionData::Blob(_) => {} } - } - - Ok(()) - } -} - -impl Lane { - pub fn has_proposal(&self, data_proposal_hash: &DataProposalHash) -> bool { - self.data_proposals.contains_key(data_proposal_hash) - } - - pub fn get_proposal(&self, hash: &DataProposalHash) -> Option<&LaneEntry> { - self.data_proposals.get(hash) - } - - pub fn get_proposal_mut(&mut self, hash: &DataProposalHash) -> Option<&mut LaneEntry> { - self.data_proposals.get_mut(hash) - } - - pub fn get_pending_entries(&self) -> Vec { - let last_comitted_dp_hash = self.last_cut.as_ref().map(|(_, dp_hash)| dp_hash); - let last_dp_hash = self.get_last_proposal_hash(); - - self.get_lane_entries_between_hashes(last_comitted_dp_hash, last_dp_hash) - .unwrap_or_default() - } - - pub fn get_last_proposal(&self) -> Option<&LaneEntry> { - self.data_proposals.iter().last().map(|(_, entry)| entry) - } - - pub fn get_last_proposal_hash(&self) -> Option<&DataProposalHash> { - self.data_proposals - .iter() - .last() - .map(|(data_proposal_hash, _)| data_proposal_hash) - } - - pub fn get_lane_size(&self) -> LaneBytesSize { - self.data_proposals - .iter() - .last() - .map(|(_, entry)| entry.cumul_size) - .unwrap_or_default() - } - - pub fn add_new_proposal( - &mut self, - crypto: &BlstCrypto, - data_proposal: DataProposal, - ) -> LaneBytesSize { - if let Some(le) = self.data_proposals.get(&data_proposal.hash()) { - warn!( - "DataProposal {} already exists in lane, size: {}", - data_proposal.hash(), - le.cumul_size - ); - return le.cumul_size; - } - let data_proposal_hash = data_proposal.hash(); - let dp_size = data_proposal.estimate_size(); - let lane_size = self.get_lane_size(); - let tx_len = data_proposal.txs.len(); - let msg = MempoolNetMessage::DataVote(data_proposal_hash.clone(), lane_size + dp_size); - let signatures = match crypto.sign(msg) { - Ok(s) => vec![s], - Err(_) => vec![], - }; - self.data_proposals.insert( - data_proposal_hash.clone(), - LaneEntry { - data_proposal, - cumul_size: lane_size + dp_size, - signatures, - }, - ); - tracing::info!( - "Added new DataProposal {} to lane, txs: {}, size: {}", - data_proposal_hash, - tx_len, - lane_size + dp_size - ); - - lane_size + dp_size - } - - pub fn add_proposal(&mut self, hash: DataProposalHash, lane_entry: LaneEntry) { - self.data_proposals.insert(hash, lane_entry); - } - - pub fn iter_reverse(&self) -> impl Iterator { - self.data_proposals.iter().rev() - } - - pub fn current(&self) -> Option<(&DataProposalHash, &LaneEntry)> { - self.data_proposals.last() - } - - pub fn current_hash(&self) -> Option<&DataProposalHash> { - self.current() - .map(|(data_proposal_hash, _)| data_proposal_hash) - } - - fn get_index_or_bail(&self, hash: &DataProposalHash) -> Result { - self.data_proposals.get_index_of(hash).context( - "Won't return any LaneEntry as aimed DataProposal {hash} does not exist on Lane", - ) - } - - fn collect_entries(&self, start: usize, end: usize) -> Vec { - self.data_proposals - .values() - .skip(start) - .take(end - start) - .cloned() - .collect() - } - - fn get_lane_entries_between_hashes( - &self, - from_data_proposal_hash: Option<&DataProposalHash>, - to_data_proposal_hash: Option<&DataProposalHash>, - ) -> Result> { - let from_index = match from_data_proposal_hash { - Some(hash) => { - self.get_index_or_bail(hash) - .context("getting 'from' index")? - + 1 - } - None => 0, - }; - let to_index = match to_data_proposal_hash { - Some(hash) => self.get_index_or_bail(hash).context("getting 'to' index")?, - None => self.data_proposals.len(), - }; - - Ok(self.collect_entries(from_index, to_index + 1)) - } - - pub fn add_missing_lane_entries(&mut self, lane_entries: Vec) -> Result<()> { - let lane_entries_len = lane_entries.len(); - let mut ordered_lane_entries = lane_entries; - ordered_lane_entries.dedup(); - - for lane_entry in ordered_lane_entries.into_iter() { - if Some(&lane_entry.data_proposal.hash()) == self.current_hash() { - debug!("Skipping already known LaneEntry"); - continue; - } - if lane_entry.data_proposal.parent_data_proposal_hash != self.current_hash().cloned() { - bail!("Incorrect parent hash while adding missing LaneEntry"); - } - let current_size = self.get_lane_size(); - let expected_size = current_size + lane_entry.data_proposal.estimate_size(); - if lane_entry.cumul_size != expected_size { - bail!( - "Incorrect size while adding missing LaneEntry. Expected: {}, Got: {}", - expected_size, - lane_entry.cumul_size - ); - } - self.add_proposal(lane_entry.data_proposal.hash(), lane_entry); - } - debug!( - "Nb data proposals after adding {} missing entries: {}", - lane_entries_len, - self.data_proposals.len() - ); - Ok(()) + }); } } #[cfg(test)] mod tests { - #![allow(clippy::indexing_slicing)] - use std::{ - collections::HashMap, - sync::{Arc, RwLock}, - vec, - }; - + use super::*; use crate::{ - mempool::{ - storage::{DataProposalHash, DataProposalVerdict, LaneEntry, Storage}, - test::make_register_contract_tx, - KnownContracts, MempoolNetMessage, - }, - model::{ - Blob, BlobData, BlobProofOutput, BlobTransaction, ContractName, Hashable, ProofData, - ProofTransaction, Transaction, TransactionData, ValidatorPublicKey, - VerifiedProofTransaction, - }, + mempool::{storage::DataProposalVerdict, storage_memory::LanesStorage, MempoolNetMessage}, utils::crypto::{self, BlstCrypto}, }; - use hyle_contract_sdk::{BlobIndex, HyleOutput, Identity, ProgramId, StateDigest, TxHash}; - use hyle_model::DataSized; + use hyle_model::{ + DataProposal, DataSized, Signature, SignedByValidator, Transaction, ValidatorSignature, + }; use staking::state::Staking; + use std::collections::HashMap; - use super::{DataProposal, Lane, LaneBytesSize}; - - fn get_hyle_output() -> HyleOutput { - HyleOutput { - version: 1, - initial_state: StateDigest(vec![0, 1, 2, 3]), - next_state: StateDigest(vec![4, 5, 6]), - identity: Identity::new("test"), - tx_hash: TxHash::new(""), - index: BlobIndex(0), - blobs: vec![], - success: true, - tx_ctx: None, - registered_contracts: vec![], - program_outputs: vec![], - } - } - - fn make_proof_tx(contract_name: ContractName) -> ProofTransaction { - let hyle_output = get_hyle_output(); - ProofTransaction { - contract_name: contract_name.clone(), - proof: ProofData( - bincode::encode_to_vec(vec![hyle_output.clone()], bincode::config::standard()) - .unwrap(), - ), - } - } - - fn make_unverified_proof_tx(contract_name: ContractName) -> Transaction { - Transaction { - version: 1, - transaction_data: TransactionData::Proof(make_proof_tx(contract_name)), - } - } - - fn make_verified_proof_tx(contract_name: ContractName) -> Transaction { - let hyle_output = get_hyle_output(); - let proof = ProofData( - bincode::encode_to_vec(vec![hyle_output.clone()], bincode::config::standard()).unwrap(), - ); - Transaction { - version: 1, - transaction_data: TransactionData::VerifiedProof(VerifiedProofTransaction { - contract_name: contract_name.clone(), - proof_hash: proof.hash(), - proven_blobs: vec![BlobProofOutput { - program_id: ProgramId(vec![]), - blob_tx_hash: TxHash::default(), - hyle_output, - original_proof_hash: proof.hash(), - }], - proof: Some(proof), - is_recursive: false, - }), - } - } - - fn make_empty_verified_proof_tx(contract_name: ContractName) -> Transaction { - let hyle_output = get_hyle_output(); - let proof = ProofData( - bincode::encode_to_vec(vec![hyle_output.clone()], bincode::config::standard()).unwrap(), - ); - Transaction { - version: 1, - transaction_data: TransactionData::VerifiedProof(VerifiedProofTransaction { - contract_name: contract_name.clone(), - proof_hash: proof.hash(), - proven_blobs: vec![BlobProofOutput { - program_id: ProgramId(vec![]), - blob_tx_hash: TxHash::default(), - hyle_output, - original_proof_hash: proof.hash(), - }], - proof: None, - is_recursive: false, - }), - } - } - - fn make_blob_tx(inner_tx: &'static str) -> Transaction { - Transaction { - version: 1, - transaction_data: TransactionData::Blob(BlobTransaction { - identity: Identity::new("id.c1"), - blobs: vec![Blob { - contract_name: ContractName::new("c1"), - data: BlobData(inner_tx.as_bytes().to_vec()), - }], - }), - } - } - - fn handle_data_proposal( - store: &mut Storage, - crypto: &BlstCrypto, - pubkey: &ValidatorPublicKey, - mut data_proposal: DataProposal, - known_contracts: Arc>, - ) -> (DataProposalVerdict, Option) { - let (verdict, size) = store.on_data_proposal(pubkey, &data_proposal); - let verdict = match verdict { - DataProposalVerdict::Process => { - Storage::process_data_proposal(&mut data_proposal, known_contracts) - } - verdict => verdict, - }; - match verdict { - DataProposalVerdict::Vote => { - let size = store.store_data_proposal(crypto, pubkey, data_proposal); - (verdict, Some(size)) - } - verdict => (verdict, size), - } + fn setup_storage(pubkey: &ValidatorPublicKey) -> LanesStorage { + let tmp_dir = tempfile::tempdir().unwrap().into_path(); + LanesStorage::new(&tmp_dir, pubkey.clone(), HashMap::default()).unwrap() } - #[test_log::test] - fn test_data_proposal_hash_with_verified_proof() { - let contract_name = ContractName::new("test"); - - let proof_tx_with_proof = make_verified_proof_tx(contract_name.clone()); - let proof_tx_without_proof = make_empty_verified_proof_tx(contract_name.clone()); + #[test_log::test(tokio::test)] + async fn test_put_contains_get() { + let crypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); + let pubkey = crypto.validator_pubkey(); + let mut storage = setup_storage(pubkey); - let data_proposal_with_proof = DataProposal { + let data_proposal = DataProposal { parent_data_proposal_hash: None, - txs: vec![proof_tx_with_proof], + txs: vec![], }; + let cumul_size: LaneBytesSize = LaneBytesSize(data_proposal.estimate_size() as u64); - let data_proposal_without_proof = DataProposal { - parent_data_proposal_hash: None, - txs: vec![proof_tx_without_proof], + let entry = LaneEntry { + data_proposal, + cumul_size, + signatures: vec![], }; - - let hash_with_proof = data_proposal_with_proof.hash(); - let hash_without_proof = data_proposal_without_proof.hash(); - - assert_eq!(hash_with_proof, hash_without_proof); + let dp_hash = entry.data_proposal.hash(); + storage.put(pubkey.clone(), entry.clone()).unwrap(); + assert!(storage.contains(pubkey, &dp_hash)); + assert_eq!( + storage.get_by_hash(pubkey, &dp_hash).unwrap().unwrap(), + entry + ); } - #[test_log::test] - fn test_add_missing_lane_entries() { - let crypto1 = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let pubkey1 = crypto1.validator_pubkey(); - let mut store = Storage::new(pubkey1.clone(), HashMap::default()); - - let data_proposal1 = DataProposal { + #[test_log::test(tokio::test)] + async fn test_update() { + let crypto: BlstCrypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); + let pubkey = crypto.validator_pubkey(); + let mut storage = setup_storage(pubkey); + let data_proposal = DataProposal { parent_data_proposal_hash: None, txs: vec![], }; - let data_proposal1_hash = data_proposal1.hash(); - let l_dp1_size = LaneBytesSize(data_proposal1.estimate_size() as u64); - - let data_proposal2 = DataProposal { - parent_data_proposal_hash: Some(data_proposal1_hash.clone()), - txs: vec![], - }; - let data_proposal2_hash = data_proposal2.hash(); - let l_dp2_size = l_dp1_size + data_proposal2.estimate_size(); - - let lane_entry1 = LaneEntry { - data_proposal: data_proposal1, - cumul_size: l_dp1_size, + let cumul_size: LaneBytesSize = LaneBytesSize(data_proposal.estimate_size() as u64); + let mut entry = LaneEntry { + data_proposal, + cumul_size, signatures: vec![], }; + let dp_hash = entry.data_proposal.hash(); + storage.put(pubkey.clone(), entry.clone()).unwrap(); + entry.signatures.push(SignedByValidator { + msg: MempoolNetMessage::DataVote(dp_hash.clone(), cumul_size), + signature: ValidatorSignature { + validator: pubkey.clone(), + signature: Signature::default(), + }, + }); + storage.update(pubkey.clone(), entry.clone()).unwrap(); + let updated = storage.get_by_hash(pubkey, &dp_hash).unwrap().unwrap(); + assert_eq!(1, updated.signatures.len()); + } - let lane_entry2 = LaneEntry { - data_proposal: data_proposal2, - cumul_size: l_dp2_size, - signatures: vec![], - }; + #[test_log::test(tokio::test)] + async fn test_on_data_proposal() { + let crypto: BlstCrypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); + let pubkey = crypto.validator_pubkey(); + + let crypto2: BlstCrypto = crypto::BlstCrypto::new("2".to_owned()).unwrap(); + let pubkey2 = crypto2.validator_pubkey(); - store - .add_missing_lane_entries(pubkey1, vec![lane_entry1.clone(), lane_entry2.clone()]) - .expect("Failed to add missing lane entries"); - - let lane = store.lanes.get(pubkey1).expect("Lane not found"); - assert!(lane.has_proposal(&data_proposal1_hash)); - assert!(lane.has_proposal(&data_proposal2_hash)); - - // Ensure the lane entries are in the correct order - let lane_entries = lane - .data_proposals - .values() - .cloned() - .collect::>(); - assert_eq!(lane_entries, vec![lane_entry1.clone(), lane_entry2.clone()]); - - // Adding an incorrect data proposal should fail - let data_proposal3 = DataProposal { - parent_data_proposal_hash: Some(DataProposalHash("non_existent".to_string())), + let mut storage = setup_storage(pubkey); + let dp = DataProposal { + parent_data_proposal_hash: None, txs: vec![], }; - let data_proposal3_hash = data_proposal3.hash(); - let l_dp3_size = l_dp2_size + data_proposal3.estimate_size(); + // 2 send a DP to 1 + let (verdict, _) = storage.on_data_proposal(pubkey2, &dp).unwrap(); + assert_eq!(verdict, DataProposalVerdict::Empty); - let lane_entry3 = LaneEntry { - data_proposal: data_proposal3, - cumul_size: l_dp3_size, - signatures: vec![], + let dp = DataProposal { + parent_data_proposal_hash: None, + txs: vec![Transaction::default()], }; + let (verdict, _) = storage.on_data_proposal(pubkey2, &dp).unwrap(); + assert_eq!(verdict, DataProposalVerdict::Process); - assert!(store - .add_missing_lane_entries(pubkey1, vec![lane_entry3.clone()]) - .is_err()); - let lane = store.lanes.get(pubkey1).expect("Lane not found"); - // Ensure incorrect data proposal is not in the lane entries - assert!(!lane.has_proposal(&data_proposal3_hash)); + let dp_unknown_parent = DataProposal { + parent_data_proposal_hash: Some(DataProposalHash::default()), + txs: vec![Transaction::default()], + }; + // 2 send a DP to 1 + let (verdict, _) = storage + .on_data_proposal(pubkey2, &dp_unknown_parent) + .unwrap(); + assert_eq!(verdict, DataProposalVerdict::Wait(None)); } - #[test_log::test] - fn test_get_lane_entries_between_hashes() { - let crypto1 = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let pubkey1 = crypto1.validator_pubkey(); - let mut store = Storage::new(pubkey1.clone(), HashMap::default()); - - let data_proposal1 = DataProposal { - parent_data_proposal_hash: None, - txs: vec![], - }; - let data_proposal1_hash = data_proposal1.hash(); - let l_dp1_size = LaneBytesSize(data_proposal1.estimate_size() as u64); + #[test_log::test(tokio::test)] + async fn test_on_data_proposal_fork() { + let crypto: BlstCrypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); + let pubkey = crypto.validator_pubkey(); - let data_proposal2 = DataProposal { - parent_data_proposal_hash: Some(data_proposal1_hash.clone()), - txs: vec![], - }; - let data_proposal2_hash = data_proposal2.hash(); - let l_dp2_size = l_dp1_size + data_proposal2.estimate_size(); + let crypto2: BlstCrypto = crypto::BlstCrypto::new("2".to_owned()).unwrap(); + let pubkey2 = crypto2.validator_pubkey(); - let data_proposal3 = DataProposal { - parent_data_proposal_hash: Some(data_proposal2_hash.clone()), - txs: vec![], + let mut storage = setup_storage(pubkey); + let dp = DataProposal { + parent_data_proposal_hash: None, + txs: vec![Transaction::default()], }; - let data_proposal3_hash = data_proposal3.hash(); - let l_dp3_size = l_dp2_size + data_proposal3.estimate_size(); - - let lane_entry1 = LaneEntry { - data_proposal: data_proposal1, - cumul_size: l_dp1_size, - signatures: vec![], + let dp2 = DataProposal { + parent_data_proposal_hash: Some(dp.hash()), + txs: vec![Transaction::default()], }; - let lane_entry2 = LaneEntry { - data_proposal: data_proposal2, - cumul_size: l_dp2_size, - signatures: vec![], - }; + storage + .store_data_proposal(&crypto, pubkey2, dp.clone()) + .unwrap(); + storage.store_data_proposal(&crypto, pubkey2, dp2).unwrap(); - let lane_entry3 = LaneEntry { - data_proposal: data_proposal3, - cumul_size: l_dp3_size, - signatures: vec![], + let dp2_fork = DataProposal { + parent_data_proposal_hash: Some(dp.hash()), + txs: vec![Transaction::default(), Transaction::default()], }; - store - .add_missing_lane_entries( - pubkey1, - vec![ - lane_entry1.clone(), - lane_entry2.clone(), - lane_entry3.clone(), - ], - ) - .expect("Failed to add missing lane entries"); - - let lane = store.lanes.get(pubkey1).expect("Lane not found"); - - // Test getting all entries from the beginning to the second proposal - let entries = lane - .get_lane_entries_between_hashes(None, Some(&data_proposal2_hash)) - .expect("Failed to get lane entries"); - assert_eq!(entries.len(), 2); - assert_eq!(entries[0], lane_entry1); - assert_eq!(entries[1], lane_entry2); - - // Test getting entries between the first and second proposal - let entries = lane - .get_lane_entries_between_hashes(Some(&data_proposal1_hash), Some(&data_proposal2_hash)) - .expect("Failed to get lane entries"); - assert_eq!(entries.len(), 1); - assert_eq!(entries[0], lane_entry2); - - // Test getting entries between the first, second and third proposals - let entries = lane - .get_lane_entries_between_hashes(Some(&data_proposal1_hash), Some(&data_proposal3_hash)) - .expect("Failed to get lane entries"); - assert_eq!(entries.len(), 2); - assert_eq!(entries[0], lane_entry2); - assert_eq!(entries[1], lane_entry3); - - // Test getting entries between the first, and the first proposals (empty) - let entries = lane - .get_lane_entries_between_hashes(Some(&data_proposal1_hash), Some(&data_proposal1_hash)) - .expect("Failed to get lane entries"); - assert!(entries.is_empty()); - - // Test getting entries with a non-existent starting hash - let non_existent_hash = DataProposalHash("non_existent".to_string()); - let entries = lane - .get_lane_entries_between_hashes(Some(&non_existent_hash), Some(&data_proposal2_hash)); - assert!(entries.is_err()); - - // Test getting entries with a non-existent ending hash - let entries = lane - .get_lane_entries_between_hashes(Some(&data_proposal1_hash), Some(&non_existent_hash)); - assert!(entries.is_err()); + let (verdict, _) = storage.on_data_proposal(pubkey2, &dp2_fork).unwrap(); + assert_eq!(verdict, DataProposalVerdict::Refuse); } - fn lane<'a>(store: &'a mut Storage, id: &ValidatorPublicKey) -> &'a mut Lane { - store.lanes.entry(id.clone()).or_default() - } + #[test_log::test(tokio::test)] + async fn test_on_data_vote() { + let crypto: BlstCrypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); + let pubkey = crypto.validator_pubkey(); + let mut storage = setup_storage(pubkey); - #[test_log::test] - fn test_on_poa_update() { - let crypto1 = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let crypto2 = crypto::BlstCrypto::new("2".to_owned()).unwrap(); - let pubkey1 = crypto1.validator_pubkey(); - let pubkey2 = crypto2.validator_pubkey(); - let mut store1 = Storage::new(pubkey1.clone(), HashMap::default()); + let crypto2: BlstCrypto = crypto::BlstCrypto::new("2".to_owned()).unwrap(); let data_proposal = DataProposal { parent_data_proposal_hash: None, txs: vec![], }; - let data_proposal_hash = data_proposal.hash(); - let l_dp_size = LaneBytesSize(data_proposal.estimate_size() as u64); - - lane(&mut store1, pubkey1).add_new_proposal(&crypto1, data_proposal); - - let signatures = vec![ - crypto1 - .sign(MempoolNetMessage::DataVote( - data_proposal_hash.clone(), - l_dp_size, - )) - .expect("Failed to sign message"), - crypto2 - .sign(MempoolNetMessage::DataVote( - data_proposal_hash.clone(), - l_dp_size, - )) - .expect("Failed to sign message"), - ]; - - store1.on_poda_update(pubkey1, &data_proposal_hash, signatures); - - let lane = store1.lanes.get(pubkey1).expect("Lane not found"); - let lane_entry = lane - .get_proposal(&data_proposal_hash) - .expect("Data proposal not found"); - - assert_eq!(lane_entry.signatures.len(), 2); - assert!(lane_entry - .signatures - .iter() - .any(|s| &s.signature.validator == pubkey1)); - assert!(lane_entry - .signatures - .iter() - .any(|s| &s.signature.validator == pubkey2)); - } - - #[test_log::test] - fn test_workflow() { - let crypto1 = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let crypto2 = crypto::BlstCrypto::new("2".to_owned()).unwrap(); - let pubkey1 = crypto1.validator_pubkey(); - let pubkey2 = crypto2.validator_pubkey(); - let mut store1 = Storage::new(pubkey1.clone(), HashMap::default()); - let mut store2 = Storage::new(pubkey2.clone(), HashMap::default()); - let known_contracts = Arc::new(RwLock::new(KnownContracts::default())); - - // First data proposal - let tx1 = make_blob_tx("test1"); - store1.new_data_proposal(&crypto1, vec![tx1]); - - let data_proposal1 = store1 - .get_lane_latest_entry(pubkey1) - .unwrap() - .data_proposal - .clone(); - let data_proposal1_hash = data_proposal1.hash(); - let l_dp1_size = LaneBytesSize(data_proposal1.estimate_size() as u64); - - assert_eq!( - handle_data_proposal( - &mut store2, - &crypto2, - pubkey1, - data_proposal1, - known_contracts.clone() - ), - (DataProposalVerdict::Vote, Some(l_dp1_size)) - ); - - let msg1 = crypto2 - .sign(MempoolNetMessage::DataVote( - data_proposal1_hash.clone(), - l_dp1_size, - )) - .expect("Could not sign DataVote message"); - - store1 - .on_data_vote(&msg1, &data_proposal1_hash, l_dp1_size) - .expect("Expect vote success"); - - // Second data proposal - let tx2 = make_blob_tx("test2"); - store1.new_data_proposal(&crypto1, vec![tx2]); - - let data_proposal2 = store1 - .get_lane_latest_entry(pubkey1) - .unwrap() - .data_proposal - .clone(); - let data_proposal2_hash = data_proposal2.hash(); - let l_dp2_size = l_dp1_size + data_proposal2.estimate_size(); - - assert_eq!( - handle_data_proposal( - &mut store2, - &crypto2, - pubkey1, - data_proposal2, - known_contracts.clone() - ), - (DataProposalVerdict::Vote, Some(l_dp2_size)) - ); - let msg2 = crypto2 - .sign(MempoolNetMessage::DataVote( - data_proposal2_hash.clone(), - l_dp2_size, - )) - .expect("Could not sign DataVote message"); - - store1 - .on_data_vote(&msg2, &data_proposal2_hash, l_dp2_size) - .expect("vote success"); - - // Third data proposal - let tx3 = make_blob_tx("test3"); - store1.new_data_proposal(&crypto1, vec![tx3]); - - let data_proposal3 = store1 - .get_lane_latest_entry(pubkey1) - .unwrap() - .data_proposal - .clone(); - let data_proposal3_hash = data_proposal3.hash(); - let l_dp3_size = l_dp2_size + data_proposal3.estimate_size(); - - assert_eq!( - handle_data_proposal( - &mut store2, - &crypto2, - pubkey1, - data_proposal3, - known_contracts.clone() - ), - (DataProposalVerdict::Vote, Some(l_dp3_size)) - ); - let msg3 = crypto2 - .sign(MempoolNetMessage::DataVote( - data_proposal3_hash.clone(), - l_dp3_size, - )) - .expect("Could not sign DataVote message"); - - store1 - .on_data_vote(&msg3, &data_proposal3_hash, l_dp3_size) - .expect("vote success"); - - // Fourth data proposal - let tx4 = make_blob_tx("test4"); - store1.new_data_proposal(&crypto1, vec![tx4]); - - let data_proposal4 = store1 - .get_lane_latest_entry(pubkey1) - .unwrap() - .data_proposal - .clone(); - let data_proposal4_hash = data_proposal4.hash(); - let l_dp4_size = l_dp3_size + data_proposal4.estimate_size(); - - assert_eq!( - handle_data_proposal( - &mut store2, - &crypto2, - pubkey1, - data_proposal4, - known_contracts.clone() - ), - (DataProposalVerdict::Vote, Some(l_dp4_size)) - ); - let msg4 = crypto2 - .sign(MempoolNetMessage::DataVote( - data_proposal4_hash.clone(), - l_dp4_size, - )) - .expect("Could not sign DataVote message"); - - store1 - .on_data_vote(&msg4, &data_proposal4_hash, l_dp4_size) - .expect("vote success"); - - // Verifications - assert_eq!(store2.lanes.len(), 2); - assert!(store2.lanes.contains_key(pubkey1)); - - let store_own_lane = store1.lanes.get(pubkey1).expect("lane"); - let (_, first_data_proposal_entry) = store_own_lane - .data_proposals - .first() - .expect("first data proposal"); - assert_eq!( - first_data_proposal_entry - .data_proposal - .parent_data_proposal_hash, - None - ); - assert_eq!( - first_data_proposal_entry.data_proposal.txs, - vec![make_blob_tx("test1")] - ); + let dp_hash = data_proposal.hash(); + // 1 creates a DP + let cumul_size = storage + .store_data_proposal(&crypto, pubkey, data_proposal) + .unwrap(); - let validators_that_signed = first_data_proposal_entry - .signatures - .iter() - .map(|s| s.signature.validator.clone()) - .collect::>(); - assert!(validators_that_signed.contains(pubkey2)); - - let store2_own_lane = store2.lanes.get(pubkey1).expect("lane"); - let (_, first_data_proposal_entry) = store2_own_lane - .data_proposals - .first() - .expect("first data proposal"); - assert_eq!( - first_data_proposal_entry - .data_proposal - .parent_data_proposal_hash, - None - ); - assert_eq!( - first_data_proposal_entry.data_proposal.txs, - vec![make_blob_tx("test1")] - ); + let lane_entry = storage.get_by_hash(pubkey, &dp_hash).unwrap().unwrap(); + assert_eq!(1, lane_entry.signatures.len()); - let lane2_entries = store2 - .get_lane_entries_between_hashes(pubkey1, None, Some(&data_proposal4_hash)) - .expect("Could not load own lane entries"); + // 2 votes on this DP + let vote_msg = MempoolNetMessage::DataVote(dp_hash.clone(), cumul_size); + let signed_msg = crypto2.sign(vote_msg).expect("Failed to sign message"); - assert_eq!(lane2_entries.len(), 4); + let result = storage + .on_data_vote(&signed_msg, &dp_hash, cumul_size) + .unwrap(); + assert_eq!(result.0, dp_hash); + assert_eq!(2, result.1.len()); } - #[test_log::test] - fn test_vote() { - let crypto1 = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let crypto2 = crypto::BlstCrypto::new("2".to_owned()).unwrap(); - let crypto3 = crypto::BlstCrypto::new("3".to_owned()).unwrap(); - let pubkey1 = crypto1.validator_pubkey(); - let pubkey2 = crypto2.validator_pubkey(); - let pubkey3 = crypto3.validator_pubkey(); - let mut store2 = Storage::new(pubkey2.clone(), HashMap::default()); - let mut store3 = Storage::new(pubkey3.clone(), HashMap::default()); - let known_contracts2 = Arc::new(RwLock::new(KnownContracts::default())); - - let txs = vec![ - make_blob_tx("test1"), - make_blob_tx("test2"), - make_blob_tx("test3"), - make_blob_tx("test4"), - ]; - - store3.new_data_proposal(&crypto3, txs); - let data_proposal = store3 - .get_lane_latest_entry(pubkey3) - .unwrap() - .data_proposal - .clone(); - let size = store3.get_lane_latest_entry(pubkey3).unwrap().cumul_size; - let data_proposal_hash = data_proposal.hash(); - assert_eq!(store3.lanes.get(pubkey3).unwrap().data_proposals.len(), 1); + #[test_log::test(tokio::test)] + async fn test_on_poda_update() { + let crypto: BlstCrypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); + let pubkey = crypto.validator_pubkey(); + let mut storage = setup_storage(pubkey); - assert_eq!( - handle_data_proposal( - &mut store2, - &crypto2, - pubkey3, - data_proposal.clone(), - known_contracts2.clone() - ), - (DataProposalVerdict::Vote, Some(size)) - ); - // Assert we can vote multiple times - assert_eq!( - handle_data_proposal( - &mut store2, - &crypto2, - pubkey3, - data_proposal, - known_contracts2 - ), - (DataProposalVerdict::Vote, Some(size)) - ); + let crypto2: BlstCrypto = crypto::BlstCrypto::new("2".to_owned()).unwrap(); + let pubkey2 = crypto2.validator_pubkey(); + let crypto3: BlstCrypto = crypto::BlstCrypto::new("3".to_owned()).unwrap(); - let msg1 = crypto1 - .sign(MempoolNetMessage::DataVote( - data_proposal_hash.clone(), - size, - )) - .expect("Could not sign DataVote message"); + let dp = DataProposal { + parent_data_proposal_hash: None, + txs: vec![], + }; + let dp_hash = dp.hash(); - store3 - .on_data_vote(&msg1, &data_proposal_hash, size) - .expect("success"); + // 1 stores DP in 2's lane + let cumul_size = storage.store_data_proposal(&crypto, pubkey2, dp).unwrap(); - let msg2 = crypto2 - .sign(MempoolNetMessage::DataVote( - data_proposal_hash.clone(), - size, - )) - .expect("Could not sign DataVote message"); + // 3 votes on this DP + let vote_msg = MempoolNetMessage::DataVote(dp_hash.clone(), cumul_size); + let signed_msg = crypto3.sign(vote_msg).expect("Failed to sign message"); - store3 - .on_data_vote(&msg2, &data_proposal_hash, size) - .expect("success"); + // 1 updates its lane with all signatures + storage + .on_poda_update(pubkey2, &dp_hash, vec![signed_msg]) + .unwrap(); + let lane_entry = storage.get_by_hash(pubkey2, &dp_hash).unwrap().unwrap(); assert_eq!( - store3 - .lanes - .get(pubkey3) - .unwrap() - .data_proposals - .get(&data_proposal_hash) - .unwrap() - .signatures - .len(), - 3 + 2, + lane_entry.signatures.len(), + "{pubkey2}'s lane entry: {:?}", + lane_entry ); - - let (_, first_data_proposal_entry) = store3 - .lanes - .get(pubkey3) - .expect("lane") - .data_proposals - .first() - .expect("first data proposal"); - let validators_that_signed = first_data_proposal_entry - .signatures - .iter() - .map(|s| s.signature.validator.clone()) - .collect::>(); - assert!(validators_that_signed.contains(pubkey1)); - assert!(validators_that_signed.contains(pubkey2)); } - #[test_log::test] - fn test_update_lane_with_unverified_proof_transaction() { - let crypto1 = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let crypto2 = crypto::BlstCrypto::new("2".to_owned()).unwrap(); - let pubkey1 = crypto1.validator_pubkey(); - let pubkey2 = crypto2.validator_pubkey(); - - let mut store1 = Storage::new(pubkey1.clone(), HashMap::default()); - let known_contracts = Arc::new(RwLock::new(KnownContracts::default())); - - let contract_name = ContractName::new("test"); - let register_tx = make_register_contract_tx(contract_name.clone()); - - let proof_tx = make_unverified_proof_tx(contract_name.clone()); - - let data_proposal = DataProposal { + #[test_log::test(tokio::test)] + async fn test_get_lane_entries_between_hashes() { + let crypto: BlstCrypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); + let pubkey = crypto.validator_pubkey(); + let mut storage = setup_storage(pubkey); + let dp1 = DataProposal { parent_data_proposal_hash: None, - txs: vec![register_tx, proof_tx], + txs: vec![], + }; + let dp2 = DataProposal { + parent_data_proposal_hash: Some(dp1.hash()), + txs: vec![], + }; + let dp3 = DataProposal { + parent_data_proposal_hash: Some(dp2.hash()), + txs: vec![], }; - let data_proposal_hash = data_proposal.hash(); - - let (verdict, _) = handle_data_proposal( - &mut store1, - &crypto1, - pubkey2, - data_proposal, - known_contracts, - ); - assert_eq!(verdict, DataProposalVerdict::Refuse); - - // Ensure the lane was not updated with the unverified proof transaction - assert!(!store1.lane_has_data_proposal(pubkey2, &data_proposal_hash)); - } - - #[test_log::test] - fn test_update_lane_with_verified_proof_transaction() { - let crypto1 = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let pubkey1 = crypto1.validator_pubkey(); - - let mut store1 = Storage::new(pubkey1.clone(), HashMap::default()); - let known_contracts = Arc::new(RwLock::new(KnownContracts::default())); - - let contract_name = ContractName::new("test"); - let register_tx = make_register_contract_tx(contract_name.clone()); - - let proof_tx = make_verified_proof_tx(contract_name); + storage + .store_data_proposal(&crypto, pubkey, dp1.clone()) + .unwrap(); + storage + .store_data_proposal(&crypto, pubkey, dp2.clone()) + .unwrap(); + storage + .store_data_proposal(&crypto, pubkey, dp3.clone()) + .unwrap(); + + tracing::error!("dp1hash: {}", dp1.hash()); + tracing::error!("dp2hash: {}", dp2.hash()); + tracing::error!("dp3hash: {}", dp3.hash()); + + // [start, end] == [1, 2, 3] + let all_entries = storage + .get_lane_entries_between_hashes(pubkey, None, None) + .unwrap(); + assert_eq!(3, all_entries.len()); + + // ]1, end] == [2, 3] + let entries_from_1_to_end = storage + .get_lane_entries_between_hashes(pubkey, Some(&dp1.hash()), None) + .unwrap(); + assert_eq!(2, entries_from_1_to_end.len()); + assert_eq!(dp2, entries_from_1_to_end.first().unwrap().data_proposal); + assert_eq!(dp3, entries_from_1_to_end.last().unwrap().data_proposal); + + // [start, 2] == [1, 2] + let entries_from_start_to_2 = storage + .get_lane_entries_between_hashes(pubkey, None, Some(&dp2.hash())) + .unwrap(); + assert_eq!(2, entries_from_start_to_2.len()); + assert_eq!(dp1, entries_from_start_to_2.first().unwrap().data_proposal); + assert_eq!(dp2, entries_from_start_to_2.last().unwrap().data_proposal); + + // ]1, 2] == [2] + let entries_from_1_to_2 = storage + .get_lane_entries_between_hashes(pubkey, Some(&dp1.hash()), Some(&dp2.hash())) + .unwrap(); + assert_eq!(1, entries_from_1_to_2.len()); + assert_eq!(dp2, entries_from_1_to_2.first().unwrap().data_proposal); + + // ]1, 3] == [2, 3] + let entries_from_1_to_3 = storage + .get_lane_entries_between_hashes(pubkey, Some(&dp1.hash()), None) + .unwrap(); + assert_eq!(2, entries_from_1_to_3.len()); + assert_eq!(dp2, entries_from_1_to_3.first().unwrap().data_proposal); + assert_eq!(dp3, entries_from_1_to_3.last().unwrap().data_proposal); + + // ]1, 1[ == [] + let entries_from_1_to_1 = storage + .get_lane_entries_between_hashes(pubkey, Some(&dp1.hash()), Some(&dp1.hash())) + .unwrap(); + assert_eq!(0, entries_from_1_to_1.len()); + } + + #[test_log::test(tokio::test)] + async fn test_add_missing_lane_entries() { + let crypto: BlstCrypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); + let pubkey = crypto.validator_pubkey(); + let mut storage = setup_storage(pubkey); let data_proposal = DataProposal { parent_data_proposal_hash: None, - txs: vec![proof_tx.clone()], + txs: vec![], }; - - let (verdict, _) = handle_data_proposal( - &mut store1, - &crypto1, - pubkey1, + let data_proposal_hash = data_proposal.hash(); + let cumul_size: LaneBytesSize = LaneBytesSize(data_proposal.estimate_size() as u64); + let entry = LaneEntry { data_proposal, - known_contracts.clone(), - ); - assert_eq!(verdict, DataProposalVerdict::Refuse); // refused because contract not found - - let data_proposal = DataProposal { - parent_data_proposal_hash: None, - txs: vec![register_tx, proof_tx], + cumul_size, + signatures: vec![], }; - let (verdict, _) = handle_data_proposal( - &mut store1, - &crypto1, - pubkey1, - data_proposal, - known_contracts, - ); - assert_eq!(verdict, DataProposalVerdict::Vote); + storage + .add_missing_lane_entries(pubkey, vec![entry.clone()]) + .unwrap(); + assert!(storage.contains(pubkey, &data_proposal_hash)); } - #[test_log::test] - // This test currently panics as we no longer optimistically register contracts - #[should_panic] - fn test_new_data_proposal_with_register_tx_in_previous_uncommitted_car() { - let crypto1 = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let pubkey1 = crypto1.validator_pubkey(); - - let mut store1 = Storage::new(pubkey1.clone(), HashMap::default()); - let known_contracts = Arc::new(RwLock::new(KnownContracts::default())); - - let contract_name = ContractName::new("test"); - let register_tx = make_register_contract_tx(contract_name.clone()); - - let proof_tx = make_verified_proof_tx(contract_name.clone()); + #[test_log::test(tokio::test)] + async fn test_new_data_proposal() { + let crypto: BlstCrypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); + let pubkey = crypto.validator_pubkey(); + let mut storage = setup_storage(pubkey); - let data_proposal1 = DataProposal { - parent_data_proposal_hash: None, - txs: vec![register_tx], - }; - let data_proposal1_hash = data_proposal1.hash(); - - lane(&mut store1, pubkey1).add_new_proposal(&crypto1, data_proposal1); - - let data_proposal = DataProposal { - parent_data_proposal_hash: Some(data_proposal1_hash.clone()), - txs: vec![proof_tx], - }; + let txs = vec![Transaction::default()]; - let (verdict, _) = handle_data_proposal( - &mut store1, - &crypto1, - pubkey1, - data_proposal, - known_contracts, - ); - assert_eq!(verdict, DataProposalVerdict::Vote); + storage.new_data_proposal(&crypto, txs).unwrap(); - // Ensure the lane was updated with the DataProposal - let empty_verified_proof_tx = make_empty_verified_proof_tx(contract_name.clone()); - let saved_data_proposal = DataProposal { - parent_data_proposal_hash: Some(data_proposal1_hash), - txs: vec![empty_verified_proof_tx.clone()], - }; - assert!(store1.lane_has_data_proposal(pubkey1, &saved_data_proposal.hash())); + let tip = storage.lanes_tip.get(pubkey); + assert!(tip.is_some()); } #[test_log::test] - fn test_register_contract_and_proof_tx_in_same_car() { - let crypto1 = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let pubkey1 = crypto1.validator_pubkey(); + fn test_lane_size() { + let crypto: BlstCrypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); + let pubkey = crypto.validator_pubkey(); + let mut storage = setup_storage(pubkey); - let mut store1 = Storage::new(pubkey1.clone(), HashMap::default()); - let known_contracts = Arc::new(RwLock::new(KnownContracts::default())); + let dp1 = DataProposal { + parent_data_proposal_hash: None, + txs: vec![Transaction::default()], + }; - let contract_name = ContractName::new("test"); - let register_tx = make_register_contract_tx(contract_name.clone()); - let proof_tx = make_verified_proof_tx(contract_name.clone()); + let size = storage + .store_data_proposal(&crypto, pubkey, dp1.clone()) + .unwrap(); + assert_eq!(size, storage.get_lane_size(pubkey).unwrap()); + assert_eq!(size.0, dp1.estimate_size() as u64); + // Adding a new DP + let dp2 = DataProposal { + parent_data_proposal_hash: Some(dp1.hash()), + txs: vec![Transaction::default()], + }; + let size = storage + .store_data_proposal(&crypto, pubkey, dp2.clone()) + .unwrap(); + assert_eq!(size, storage.get_lane_size(pubkey).unwrap()); + assert_eq!(size.0, (dp1.estimate_size() + dp2.estimate_size()) as u64); + } + + #[test_log::test(tokio::test)] + async fn test_get_lane_pending_entries() { + let crypto: BlstCrypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); + let pubkey = crypto.validator_pubkey(); + let mut storage = setup_storage(pubkey); let data_proposal = DataProposal { parent_data_proposal_hash: None, - txs: vec![register_tx.clone(), proof_tx], + txs: vec![], }; - - let (verdict, _) = handle_data_proposal( - &mut store1, - &crypto1, - pubkey1, + let cumul_size: LaneBytesSize = LaneBytesSize(data_proposal.estimate_size() as u64); + let entry = LaneEntry { data_proposal, - known_contracts, - ); - assert_eq!(verdict, DataProposalVerdict::Vote); - - // Ensure the lane was updated with the DataProposal - let empty_verified_proof_tx = make_empty_verified_proof_tx(contract_name.clone()); - let saved_data_proposal = DataProposal { - parent_data_proposal_hash: None, - txs: vec![register_tx, empty_verified_proof_tx.clone()], + cumul_size, + signatures: vec![], }; - assert!(store1.lane_has_data_proposal(pubkey1, &saved_data_proposal.hash())); + storage.put(pubkey.clone(), entry).unwrap(); + let pending = storage.get_lane_pending_entries(pubkey, None).unwrap(); + assert_eq!(1, pending.len()); } - #[test_log::test] - fn test_register_contract_and_proof_tx_in_same_car_wrong_order() { - let crypto1 = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let crypto2 = crypto::BlstCrypto::new("2".to_owned()).unwrap(); - let pubkey1 = crypto1.validator_pubkey(); - let pubkey2 = crypto2.validator_pubkey(); - - let mut store1 = Storage::new(pubkey2.clone(), HashMap::default()); - let known_contracts = Arc::new(RwLock::new(KnownContracts::default())); - - let contract_name = ContractName::new("test"); - let register_tx = make_register_contract_tx(contract_name.clone()); - let proof_tx = make_verified_proof_tx(contract_name); - + #[test_log::test(tokio::test)] + async fn test_get_latest_car_and_new_cut() { + let crypto: BlstCrypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); + let pubkey = crypto.validator_pubkey(); + let mut storage = setup_storage(pubkey); + let staking = Staking::new(); let data_proposal = DataProposal { parent_data_proposal_hash: None, - txs: vec![proof_tx, register_tx], + txs: vec![], }; - let data_proposal_hash = data_proposal.hash(); - - let (verdict, _) = handle_data_proposal( - &mut store1, - &crypto1, - pubkey1, + let cumul_size: LaneBytesSize = LaneBytesSize(data_proposal.estimate_size() as u64); + let entry = LaneEntry { data_proposal, - known_contracts, - ); - assert_eq!(verdict, DataProposalVerdict::Refuse); - - // Ensure the lane was not updated with the DataProposal - assert!(!store1.lane_has_data_proposal(pubkey1, &data_proposal_hash)); - } - - #[test_log::test] - fn test_new_cut() { - let crypto1 = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let crypto2 = crypto::BlstCrypto::new("2".to_owned()).unwrap(); - let pubkey1 = crypto1.validator_pubkey(); - let pubkey2 = crypto2.validator_pubkey(); - - let mut store1 = Storage::new(pubkey1.clone(), HashMap::default()); - let mut store2 = Storage::new(pubkey2.clone(), HashMap::default()); - let known_contracts1 = Arc::new(RwLock::new(KnownContracts::default())); - let known_contracts2 = Arc::new(RwLock::new(KnownContracts::default())); - let mut staking = Staking::default(); - staking.stake("pk1".into(), 100).expect("could not stake"); - staking - .delegate_to("pk1".into(), pubkey1.clone()) - .expect("could not delegate"); - staking.stake("pk2".into(), 100).expect("could not stake"); - staking - .delegate_to("pk2".into(), pubkey2.clone()) - .expect("could not delegate"); - staking - .bond(pubkey1.clone()) - .expect("Could not bond pubkey1"); - staking - .bond(pubkey2.clone()) - .expect("Could not bond pubkey2"); - - let tx1 = make_blob_tx("test1"); - store1.new_data_proposal(&crypto1, vec![tx1]); - let data_proposal1 = store1 - .get_lane_latest_entry(pubkey1) - .unwrap() - .data_proposal - .clone(); - let size = store1.get_lane_latest_entry(pubkey1).unwrap().cumul_size; - - assert_eq!( - handle_data_proposal( - &mut store2, - &crypto2, - pubkey1, - data_proposal1.clone(), - known_contracts2.clone() - ), - (DataProposalVerdict::Vote, Some(size)) - ); - - let tx2 = make_blob_tx("tx2"); - store2.new_data_proposal(&crypto2, vec![tx2]); - let data_proposal2 = store2 - .get_lane_latest_entry(pubkey2) - .unwrap() - .data_proposal - .clone(); - let size = store2.get_lane_latest_entry(pubkey2).unwrap().cumul_size; - - assert_eq!( - handle_data_proposal( - &mut store1, - &crypto1, - pubkey2, - data_proposal2.clone(), - known_contracts1 - ), - (DataProposalVerdict::Vote, Some(size)) - ); - - let cut1 = store1.new_cut(&staking); - assert_eq!(cut1.len(), 2); - assert_eq!(cut1[0].0, pubkey1.clone()); - assert_eq!(cut1[0].1, data_proposal1.hash()); - assert_eq!(cut1[1].0, pubkey2.clone()); - assert_eq!(cut1[1].1, data_proposal2.hash()); - } - - #[test_log::test] - fn test_poda() { - let crypto1 = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let crypto2 = crypto::BlstCrypto::new("2".to_owned()).unwrap(); - - let pubkey1 = crypto1.validator_pubkey(); - let pubkey2 = crypto2.validator_pubkey(); - - let mut store1 = Storage::new(pubkey1.clone(), HashMap::default()); - let mut staking = Staking::default(); - - staking.stake("pk1".into(), 100).expect("Staking failed"); - staking - .delegate_to("pk1".into(), pubkey1.clone()) - .expect("Delegation failed"); - staking.stake("pk2".into(), 100).expect("Staking failed"); - staking - .delegate_to("pk2".into(), pubkey2.clone()) - .expect("Delegation failed"); - - staking - .bond(pubkey1.clone()) - .expect("Could not bond pubkey1"); - staking - .bond(pubkey2.clone()) - .expect("Could not bond pubkey2"); - - let tx1 = make_blob_tx("test1"); - store1.new_data_proposal(&crypto1, vec![tx1]); - - let data_proposal = store1 - .get_lane_latest_entry(pubkey1) - .unwrap() - .data_proposal - .clone(); - let data_proposal_hash = data_proposal.hash(); - let size = store1.get_lane_latest_entry(pubkey1).unwrap().cumul_size; - - let msg2 = crypto2 - .sign(MempoolNetMessage::DataVote( - data_proposal_hash.clone(), - size, - )) - .expect("Could not sign DataVote message"); - - store1 - .on_data_vote(&msg2, &data_proposal_hash, size) - .expect("Expect vote success"); - - let cut = store1.new_cut(&staking); - let poda = cut[0].3.clone(); - - assert!(poda.validators.contains(pubkey1)); - assert!(poda.validators.contains(pubkey2)); - assert_eq!(cut.len(), 1); - } - - #[test_log::test] - fn test_add_new_proposal() { - let crypto = crypto::BlstCrypto::new("1".to_owned()).unwrap(); - let mut lane = Lane::default(); - - let tx = make_verified_proof_tx("testContract".into()); - - let data_proposal1 = DataProposal { - parent_data_proposal_hash: None, - txs: vec![tx], + cumul_size, + signatures: vec![], }; + storage.put(pubkey.clone(), entry).unwrap(); + let latest = storage.get_latest_car(pubkey, &staking, None).unwrap(); + assert!(latest.is_none()); - let size = lane.add_new_proposal(&crypto, data_proposal1.clone()); - assert_eq!(size, lane.get_lane_size()); - assert_eq!(size.0, data_proposal1.estimate_size() as u64); - - // Test adding the same proposal again - let size_again = lane.add_new_proposal(&crypto, data_proposal1.clone()); - assert_eq!(size, size_again); - - // Adding a new DP - let tx2 = make_register_contract_tx("testContract2".into()); - let data_proposal2 = DataProposal { - parent_data_proposal_hash: Some(data_proposal1.hash()), - txs: vec![tx2], - }; - let size = lane.add_new_proposal(&crypto, data_proposal2.clone()); - assert_eq!(size, lane.get_lane_size()); - assert_eq!( - size.0, - (data_proposal1.estimate_size() + data_proposal2.estimate_size()) as u64 - ); + // Force some signature for f+1 check if needed: + // This requires more advanced stubbing of Staking if you want a real test. - // Test adding the same proposal again - let size_again = lane.add_new_proposal(&crypto, data_proposal2.clone()); - assert_eq!(size, size_again); - let size = lane.add_new_proposal(&crypto, data_proposal1.clone()); - assert_eq!(size.0, data_proposal1.estimate_size() as u64); + let cut = storage.new_cut(&staking, &vec![]).unwrap(); + assert_eq!(0, cut.len()); } } diff --git a/src/mempool/storage_fjall.rs b/src/mempool/storage_fjall.rs new file mode 100644 index 000000000..fedff7a5c --- /dev/null +++ b/src/mempool/storage_fjall.rs @@ -0,0 +1,150 @@ +use std::{collections::HashMap, path::Path, sync::Arc}; + +use anyhow::{bail, Result}; +use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle, Slice}; +use tracing::info; + +use crate::{ + model::{DataProposalHash, Hashable, ValidatorPublicKey}, + utils::logger::LogMe, +}; + +use super::storage::{LaneEntry, Storage}; + +pub use hyle_model::LaneBytesSize; + +pub struct LanesStorage { + pub id: ValidatorPublicKey, + pub lanes_tip: HashMap, + db: Keyspace, + pub by_hash: PartitionHandle, +} + +impl Storage for LanesStorage { + fn new( + path: &Path, + id: ValidatorPublicKey, + lanes_tip: HashMap, + ) -> Result { + let db = Config::new(path) + .blob_cache(Arc::new(fjall::BlobCache::with_capacity_bytes( + 5 * 1024 * 1024 * 1024, // 5Go cache + ))) + .block_cache(Arc::new(fjall::BlockCache::with_capacity_bytes( + 5 * 1024 * 1024 * 1024, // 5Go cache + ))) + .open()?; + let by_hash = db.open_partition( + "dp", + PartitionCreateOptions::default() + .block_size(56 * 1024) + .manual_journal_persist(true) + .max_memtable_size(128 * 1024 * 1024), + )?; + + info!("{} DP(s) available", by_hash.len()?); + + Ok(LanesStorage { + id, + lanes_tip, + db, + by_hash, + }) + } + + fn id(&self) -> &ValidatorPublicKey { + &self.id + } + + fn persist(&self) -> Result<()> { + self.db + .persist(fjall::PersistMode::Buffer) + .map_err(Into::into) + } + + fn contains(&self, validator_key: &ValidatorPublicKey, dp_hash: &DataProposalHash) -> bool { + self.by_hash + .contains_key(format!("{}:{}", validator_key, dp_hash)) + .unwrap_or(false) + } + + fn get_by_hash( + &self, + validator_key: &ValidatorPublicKey, + dp_hash: &DataProposalHash, + ) -> Result> { + let item = self + .by_hash + .get(format!("{}:{}", validator_key, dp_hash)) + .log_warn(format!( + "Can't find DP {} for validator {}", + dp_hash, validator_key + ))?; + item.map(decode_from_item).transpose() + } + + fn put(&mut self, validator_key: ValidatorPublicKey, lane_entry: LaneEntry) -> Result<()> { + let dp_hash = lane_entry.data_proposal.hash(); + self.by_hash.insert( + format!("{}:{}", validator_key, dp_hash), + encode_to_item(lane_entry.clone())?, + )?; + tracing::info!( + "Added new DataProposal {} to lane, size: {}", + dp_hash, + lane_entry.cumul_size + ); + + // Updating validator's lane tip + if let Some(validator_tip) = self.lanes_tip.get(&validator_key) { + // If validator already has a lane, we only update the tip if DP-chain is respected + if let Some(parent_dp_hash) = &lane_entry.data_proposal.parent_data_proposal_hash { + if validator_tip == parent_dp_hash { + self.update_lane_tip(validator_key, lane_entry.data_proposal.hash()); + } + } + } else { + self.update_lane_tip(validator_key, lane_entry.data_proposal.hash()); + } + + Ok(()) + } + + fn update(&mut self, validator_key: ValidatorPublicKey, lane_entry: LaneEntry) -> Result<()> { + let dp_hash = lane_entry.data_proposal.hash(); + + if !self.contains(&validator_key, &dp_hash) { + bail!("LaneEntry does not exist"); + } + self.by_hash.insert( + format!("{}:{}", validator_key, dp_hash), + encode_to_item(lane_entry.clone())?, + )?; + + Ok(()) + } + + fn get_lane_tip(&self, validator: &ValidatorPublicKey) -> Option<&DataProposalHash> { + self.lanes_tip.get(validator) + } + + fn update_lane_tip( + &mut self, + validator: ValidatorPublicKey, + dp_hash: DataProposalHash, + ) -> Option { + self.lanes_tip.insert(validator, dp_hash) + } +} + +fn decode_from_item(item: Slice) -> Result { + bincode::decode_from_slice(&item, bincode::config::standard()) + .map(|(b, _)| b) + .map_err(Into::into) +} + +fn encode_to_item(lane_entry: LaneEntry) -> Result { + bincode::encode_to_vec(lane_entry, bincode::config::standard()) + .map(Slice::from) + .map_err(Into::into) +} diff --git a/src/mempool/storage_memory.rs b/src/mempool/storage_memory.rs new file mode 100644 index 000000000..06321f35d --- /dev/null +++ b/src/mempool/storage_memory.rs @@ -0,0 +1,103 @@ +use std::{collections::HashMap, path::Path}; + +use anyhow::{bail, Result}; +use tracing::info; + +use super::storage::{LaneEntry, Storage}; +use crate::model::{DataProposalHash, Hashable, ValidatorPublicKey}; + +pub struct LanesStorage { + pub id: ValidatorPublicKey, + pub lanes_tip: HashMap, + pub by_hash: HashMap, +} + +impl Storage for LanesStorage { + fn new( + _path: &Path, + id: ValidatorPublicKey, + lanes_tip: HashMap, + ) -> Result { + // FIXME: load from disk + let by_hash = HashMap::default(); + + info!("{} DP(s) available", by_hash.len()); + + Ok(LanesStorage { + id, + lanes_tip, + by_hash, + }) + } + + fn id(&self) -> &ValidatorPublicKey { + &self.id + } + + fn contains(&self, validator_key: &ValidatorPublicKey, dp_hash: &DataProposalHash) -> bool { + self.by_hash + .contains_key(&format!("{}:{}", validator_key, dp_hash)) + } + + fn get_by_hash( + &self, + validator_key: &ValidatorPublicKey, + dp_hash: &DataProposalHash, + ) -> Result> { + let item = self.by_hash.get(&format!("{}:{}", validator_key, dp_hash)); + Ok(item.cloned()) + } + + fn put(&mut self, validator_key: ValidatorPublicKey, lane_entry: LaneEntry) -> Result<()> { + let dp_hash = lane_entry.data_proposal.hash(); + self.by_hash + .insert(format!("{}:{}", validator_key, dp_hash), lane_entry.clone()); + tracing::info!( + "Added new DataProposal {} to lane, size: {}", + dp_hash, + lane_entry.cumul_size + ); + + // Updating validator's lane tip + if let Some(validator_tip) = self.get_lane_tip(&validator_key) { + // If validator already has a lane, we only update the tip if DP-chain is respected + if let Some(parent_dp_hash) = &lane_entry.data_proposal.parent_data_proposal_hash { + if validator_tip == parent_dp_hash { + self.update_lane_tip(validator_key, lane_entry.data_proposal.hash()); + } + } + } else { + self.update_lane_tip(validator_key, lane_entry.data_proposal.hash()); + } + + Ok(()) + } + + fn update(&mut self, validator_key: ValidatorPublicKey, lane_entry: LaneEntry) -> Result<()> { + let dp_hash = lane_entry.data_proposal.hash(); + + if !self.contains(&validator_key, &dp_hash) { + bail!("LaneEntry does not exist"); + } + self.by_hash + .insert(format!("{}:{}", validator_key, dp_hash), lane_entry.clone()); + + Ok(()) + } + + fn persist(&self) -> Result<()> { + Ok(()) + } + + fn get_lane_tip(&self, validator: &ValidatorPublicKey) -> Option<&DataProposalHash> { + self.lanes_tip.get(validator) + } + + fn update_lane_tip( + &mut self, + validator: ValidatorPublicKey, + dp_hash: DataProposalHash, + ) -> Option { + self.lanes_tip.insert(validator, dp_hash) + } +} diff --git a/src/tests/autobahn_testing.rs b/src/tests/autobahn_testing.rs index 68eea91b1..a5a102ebe 100644 --- a/src/tests/autobahn_testing.rs +++ b/src/tests/autobahn_testing.rs @@ -767,21 +767,21 @@ async fn mempool_fail_to_vote_on_fork() { assert_ne!( node2 .mempool_ctx - .last_validator_data_proposal(node1.mempool_ctx.validator_pubkey()) + .last_validator_lane_entry(node1.mempool_ctx.validator_pubkey()) .1, dp_fork_3.hash() ); assert_ne!( node3 .mempool_ctx - .last_validator_data_proposal(node1.mempool_ctx.validator_pubkey()) + .last_validator_lane_entry(node1.mempool_ctx.validator_pubkey()) .1, dp_fork_3.hash() ); assert_ne!( node4 .mempool_ctx - .last_validator_data_proposal(node1.mempool_ctx.validator_pubkey()) + .last_validator_lane_entry(node1.mempool_ctx.validator_pubkey()) .1, dp_fork_3.hash() );