From e5bd85a6c484e0befc84cbbf9968e11c860cc6ef Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Mon, 25 Feb 2019 17:06:39 +0000 Subject: [PATCH 01/22] reworked the dandelion rewrite (dandelion++) --- p2p/src/peer.rs | 7 + p2p/src/peers.rs | 55 ------- pool/src/lib.rs | 4 +- pool/src/pool.rs | 19 +-- pool/src/transaction_pool.rs | 30 ++-- pool/src/types.rs | 51 +++---- servers/src/common/adapters.rs | 56 ++++++-- servers/src/common/types.rs | 73 +++++++++- servers/src/grin/dandelion_monitor.rs | 199 ++++++++------------------ servers/src/grin/seed.rs | 17 --- servers/src/grin/server.rs | 5 +- 11 files changed, 216 insertions(+), 300 deletions(-) diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 1b50bf59ad..e57d25a84a 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::util::{Mutex, RwLock}; +use std::fmt; use std::fs::File; use std::net::{Shutdown, TcpStream}; use std::sync::Arc; @@ -54,6 +55,12 @@ pub struct Peer { connection: Option>, } +impl fmt::Debug for Peer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Peer({:?})", &self.info) + } +} + impl Peer { // Only accept and connect can be externally used to build a peer fn new(info: PeerInfo, adapter: Arc) -> Peer { diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 19713b7d85..da1a4067d4 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -37,7 +37,6 @@ pub struct Peers { pub adapter: Arc, store: PeerStore, peers: RwLock>>, - dandelion_relay: RwLock)>>, config: P2PConfig, } @@ -48,7 +47,6 @@ impl Peers { store, config, peers: RwLock::new(HashMap::new()), - dandelion_relay: RwLock::new(None), } } @@ -87,39 +85,6 @@ impl Peers { self.save_peer(&peer_data) } - // Update the dandelion relay - pub fn update_dandelion_relay(&self) { - let peers = self.outgoing_connected_peers(); - - let peer = &self - .config - .dandelion_peer - .and_then(|ip| peers.iter().find(|x| x.info.addr == ip)) - .or(thread_rng().choose(&peers)); - - match peer { - Some(peer) => self.set_dandelion_relay(peer), - None => debug!("Could not update dandelion relay"), - } - } - - fn set_dandelion_relay(&self, peer: &Arc) { - // Clear the map and add new relay - let dandelion_relay = &self.dandelion_relay; - dandelion_relay - .write() - .replace((Utc::now().timestamp(), peer.clone())); - debug!( - "Successfully updated Dandelion relay to: {}", - peer.info.addr - ); - } - - // Get the dandelion relay - pub fn get_dandelion_relay(&self) -> Option<(i64, Arc)> { - self.dandelion_relay.read().clone() - } - pub fn is_known(&self, addr: PeerAddr) -> bool { self.peers.read().contains_key(&addr) } @@ -335,26 +300,6 @@ impl Peers { ); } - /// Relays the provided stem transaction to our single stem peer. - pub fn relay_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { - self.get_dandelion_relay() - .or_else(|| { - debug!("No dandelion relay, updating."); - self.update_dandelion_relay(); - self.get_dandelion_relay() - }) - // If still return an error, let the caller handle this as they see fit. - // The caller will "fluff" at this point as the stem phase is finished. - .ok_or(Error::NoDandelionRelay) - .map(|(_, relay)| { - if relay.is_connected() { - if let Err(e) = relay.send_stem_transaction(tx) { - debug!("Error sending stem transaction to peer relay: {:?}", e); - } - } - }) - } - /// Broadcasts the provided transaction to PEER_PREFERRED_COUNT of our /// peers. 
We may be connected to PEER_MAX_COUNT peers so we only /// want to broadcast to a random subset of peers. diff --git a/pool/src/lib.rs b/pool/src/lib.rs index b6ccc56c79..c506deaec6 100644 --- a/pool/src/lib.rs +++ b/pool/src/lib.rs @@ -35,6 +35,4 @@ pub mod transaction_pool; pub mod types; pub use crate::transaction_pool::TransactionPool; -pub use crate::types::{ - BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntryState, PoolError, TxSource, -}; +pub use crate::types::{BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolError, TxSource}; diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 5f2e944205..11c9091cba 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -23,7 +23,7 @@ use self::core::core::{ Block, BlockHeader, BlockSums, Committed, Transaction, TxKernel, Weighting, }; use self::util::RwLock; -use crate::types::{BlockChain, PoolEntry, PoolEntryState, PoolError}; +use crate::types::{BlockChain, PoolEntry, PoolError}; use grin_core as core; use grin_util as util; use std::collections::{HashMap, HashSet}; @@ -177,23 +177,6 @@ impl Pool { Ok(valid_txs) } - pub fn get_transactions_in_state(&self, state: PoolEntryState) -> Vec { - self.entries - .iter() - .filter(|x| x.state == state) - .map(|x| x.tx.clone()) - .collect::>() - } - - // Transition the specified pool entries to the new state. - pub fn transition_to_state(&mut self, txs: &[Transaction], state: PoolEntryState) { - for x in &mut self.entries { - if txs.contains(&x.tx) { - x.state = state; - } - } - } - // Aggregate this new tx with all existing txs in the pool. // If we can validate the aggregated tx against the current chain state // then we can safely add the tx to the pool. diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index becd7a7884..2cd81b0758 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -23,9 +23,7 @@ use self::core::core::verifier_cache::VerifierCache; use self::core::core::{transaction, Block, BlockHeader, Transaction, Weighting}; use self::util::RwLock; use crate::pool::Pool; -use crate::types::{ - BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource, -}; +use crate::types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource}; use chrono::prelude::*; use grin_core as core; use grin_util as util; @@ -79,10 +77,8 @@ impl TransactionPool { fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> { // Add tx to stempool (passing in all txs from txpool to validate against). self.stempool - .add_to_pool(entry, self.txpool.all_transactions(), header)?; - - // Note: we do not notify the adapter here, - // we let the dandelion monitor handle this. + .add_to_pool(entry.clone(), self.txpool.all_transactions(), header)?; + self.adapter.stem_tx_accepted(&entry.tx); Ok(()) } @@ -159,28 +155,18 @@ impl TransactionPool { self.blockchain.verify_coinbase_maturity(&tx)?; let entry = PoolEntry { - state: PoolEntryState::Fresh, src, tx_at: Utc::now(), tx, }; - // If we are in "stem" mode then check if this is a new tx or if we have seen it before. - // If new tx - add it to our stempool. - // If we have seen any of the kernels before then fallback to fluff, - // adding directly to txpool. 
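// ---- editor aside: illustrative sketch, not part of the diff ----
// The check removed just below deduplicated stem txs by kernel before
// stemming: a tx was only stemmed when none of its kernels were already
// present in the stempool. Reduced std-only form (u64 stands in for a
// kernel hash; names here are stand-ins, not the pool API):
use std::collections::HashSet;

fn seen_before(stempool_kernels: &HashSet<u64>, tx_kernels: &[u64]) -> bool {
	tx_kernels.iter().any(|k| stempool_kernels.contains(k))
}
// After this patch the branch is simply `if stem { stempool } else { txpool }`.
// ---- end aside ----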
- if stem - && self - .stempool - .find_matching_transactions(entry.tx.kernels()) - .is_empty() - { - self.add_to_stempool(entry, header)?; - return Ok(()); + if stem { + self.add_to_stempool(entry.clone(), header)?; + } else { + self.add_to_txpool(entry.clone(), header)?; + self.add_to_reorg_cache(entry); } - self.add_to_txpool(entry.clone(), header)?; - self.add_to_reorg_cache(entry); Ok(()) } diff --git a/pool/src/types.rs b/pool/src/types.rs index 04f1a1ee27..d5084d2c99 100644 --- a/pool/src/types.rs +++ b/pool/src/types.rs @@ -27,6 +27,7 @@ use failure::Fail; use grin_core as core; use grin_keychain as keychain; +/// TODO - This now maps to the "epoch length"? /// Dandelion relay timer const DANDELION_RELAY_SECS: u64 = 600; @@ -37,7 +38,7 @@ const DANDELION_EMBARGO_SECS: u64 = 180; const DANDELION_PATIENCE_SECS: u64 = 10; /// Dandelion stem probability (stem 90% of the time, fluff 10%). -const DANDELION_STEM_PROBABILITY: usize = 90; +const DANDELION_STEM_PROBABILITY: u64 = 90; /// Configuration for "Dandelion". /// Note: shared between p2p and pool. @@ -56,7 +57,18 @@ pub struct DandelionConfig { pub patience_secs: Option, /// Dandelion stem probability (stem 90% of the time, fluff 10% etc.) #[serde = "default_dandelion_stem_probability"] - pub stem_probability: Option, + pub stem_probability: Option, +} + +impl DandelionConfig { + pub fn stem_probability(&self) -> u64 { + self.stem_probability.unwrap_or(DANDELION_STEM_PROBABILITY) + } + + // TODO - Cleanup config for Dandelion++ + pub fn epoch_secs(&self) -> u64 { + self.relay_secs.unwrap_or(DANDELION_RELAY_SECS) + } } impl Default for DandelionConfig { @@ -82,7 +94,7 @@ fn default_dandelion_patience_secs() -> Option { Some(DANDELION_PATIENCE_SECS) } -fn default_dandelion_stem_probability() -> Option { +fn default_dandelion_stem_probability() -> Option { Some(DANDELION_STEM_PROBABILITY) } @@ -138,8 +150,6 @@ fn default_mineable_max_weight() -> usize { /// A single (possibly aggregated) transaction. #[derive(Clone, Debug)] pub struct PoolEntry { - /// The state of the pool entry. - pub state: PoolEntryState, /// Info on where this tx originated from. pub src: TxSource, /// Timestamp of when this tx was originally added to the pool. @@ -148,21 +158,6 @@ pub struct PoolEntry { pub tx: Transaction, } -/// The possible states a pool entry can be in. -#[derive(Clone, Copy, Debug, PartialEq)] -pub enum PoolEntryState { - /// A new entry, not yet processed. - Fresh, - /// Tx to be included in the next "stem" run. - ToStem, - /// Tx previously "stemmed" and propagated. - Stemmed, - /// Tx to be included in the next "fluff" run. - ToFluff, - /// Tx previously "fluffed" and broadcast. - Fluffed, -} - /// Placeholder: the data representing where we heard about a tx from. /// /// Used to make decisions based on transaction acceptance priority from @@ -267,13 +262,10 @@ pub trait BlockChain: Sync + Send { /// downstream processing of valid transactions by the rest of the system, most /// importantly the broadcasting of transactions to our peers. pub trait PoolAdapter: Send + Sync { - /// The transaction pool has accepted this transactions as valid and added - /// it to its internal cache. + /// The transaction pool has accepted this transaction as valid. fn tx_accepted(&self, tx: &transaction::Transaction); - /// The stem transaction pool has accepted this transactions as valid and - /// added it to its internal cache, we have waited for the "patience" timer - /// to fire and we now want to propagate the tx to the next Dandelion relay. 
- fn stem_tx_accepted(&self, tx: &transaction::Transaction) -> Result<(), PoolError>; + /// The stem transaction pool has accepted this transactions as valid. + fn stem_tx_accepted(&self, tx: &transaction::Transaction); } /// Dummy adapter used as a placeholder for real implementations @@ -281,9 +273,6 @@ pub trait PoolAdapter: Send + Sync { pub struct NoopAdapter {} impl PoolAdapter for NoopAdapter { - fn tx_accepted(&self, _: &transaction::Transaction) {} - - fn stem_tx_accepted(&self, _: &transaction::Transaction) -> Result<(), PoolError> { - Ok(()) - } + fn tx_accepted(&self, _tx: &transaction::Transaction) {} + fn stem_tx_accepted(&self, _tx: &transaction::Transaction) {} } diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index d684fb31fa..46b490b32f 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -23,7 +23,9 @@ use std::time::Instant; use crate::chain::{self, BlockStatus, ChainAdapter, Options}; use crate::common::hooks::{ChainEvents, NetEvents}; -use crate::common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus}; +use crate::common::types::{ + self, ChainValidationMode, DandelionEpoch, ServerConfig, SyncState, SyncStatus, +}; use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::transaction::Transaction; use crate::core::core::verifier_cache::VerifierCache; @@ -33,6 +35,7 @@ use crate::core::{core, global}; use crate::p2p; use crate::p2p::types::PeerAddr; use crate::pool; +use crate::pool::types::DandelionConfig; use crate::util::OneTime; use chrono::prelude::*; use chrono::Duration; @@ -685,26 +688,63 @@ impl ChainToPoolAndNetAdapter { /// transactions that have been accepted. pub struct PoolToNetAdapter { peers: OneTime>, + dandelion_epoch: Arc>, } -impl pool::PoolAdapter for PoolToNetAdapter { - fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> { - self.peers() - .relay_stem_transaction(tx) - .map_err(|_| pool::PoolError::DandelionError)?; - Ok(()) +pub trait DandelionAdapter: Send + Sync { + fn is_stem(&self) -> bool; + + fn is_expired(&self) -> bool; + + fn next_epoch(&self); +} + +impl DandelionAdapter for PoolToNetAdapter { + fn is_stem(&self) -> bool { + self.dandelion_epoch.read().is_stem() } + fn is_expired(&self) -> bool { + self.dandelion_epoch.read().is_expired() + } + + fn next_epoch(&self) { + self.dandelion_epoch.write().next_epoch(&self.peers()); + } +} + +impl pool::PoolAdapter for PoolToNetAdapter { fn tx_accepted(&self, tx: &core::Transaction) { self.peers().broadcast_transaction(tx); } + + fn stem_tx_accepted(&self, tx: &core::Transaction) { + let mut epoch = self.dandelion_epoch.write(); + if epoch.is_expired() { + warn!("epoch expired, setting up next epoch"); + epoch.next_epoch(&self.peers()); + } + + if epoch.is_stem() { + if let Some(peer) = epoch.relay_peer(&self.peers()) { + warn!("Stemming this epoch, relaying to next peer."); + peer.send_stem_transaction(tx); + } else { + // TODO - no relay peer, no available outgoing peers, do we fluff here? + error!("What to do here? We have no relay peer?"); + } + } else { + warn!("Not forwarding stem tx. Collecting, aggregating and fluffing txs this epoch. 
(Ok and expected)."); + } + } } impl PoolToNetAdapter { /// Create a new pool to net adapter - pub fn new() -> PoolToNetAdapter { + pub fn new(config: DandelionConfig) -> PoolToNetAdapter { PoolToNetAdapter { peers: OneTime::new(), + dandelion_epoch: Arc::new(RwLock::new(DandelionEpoch::new(config))), } } diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index 6b9ed73cb8..e76511cff2 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -13,18 +13,20 @@ // limitations under the License. //! Server types -use crate::util::RwLock; use std::convert::From; use std::sync::Arc; +use chrono::prelude::{DateTime, Utc}; +use rand::prelude::*; + use crate::api; use crate::chain; use crate::core::global::ChainTypes; use crate::core::{core, pow}; use crate::p2p; use crate::pool; +use crate::pool::types::DandelionConfig; use crate::store; -use chrono::prelude::{DateTime, Utc}; /// Error type wrapping underlying module errors. #[derive(Debug)] @@ -421,3 +423,70 @@ impl chain::TxHashsetWriteStatus for SyncState { self.update(SyncStatus::TxHashsetDone); } } + +/// A node is either "stem" of "fluff" for the duration of a single epoch. +/// A node also maintains an outbound relay peer for the epoch. +#[derive(Debug)] +pub struct DandelionEpoch { + config: DandelionConfig, + // When did this epoch start? + start_time: Option, + // Are we in "stem" mode or "fluff" mode for this epoch? + is_stem: bool, + // Our current Dandelion relay peer (effective for this epoch). + relay_peer: Option>, +} + +impl DandelionEpoch { + pub fn new(config: DandelionConfig) -> DandelionEpoch { + DandelionEpoch { + config, + start_time: None, + is_stem: false, + relay_peer: None, + } + } + + pub fn is_expired(&self) -> bool { + let expired = if let Some(start_time) = self.start_time { + Utc::now().timestamp().saturating_sub(start_time) > self.config.epoch_secs() as i64 + } else { + true + }; + error!("DandelionEpoch: is_expired: {}", expired); + expired + } + + pub fn next_epoch(&mut self, peers: &Arc) { + self.start_time = Some(Utc::now().timestamp()); + self.relay_peer = peers.outgoing_connected_peers().first().cloned(); + + // If stem_probability == 90 then we stem 90% of the time. + let mut rng = rand::thread_rng(); + self.is_stem = rng.gen_range(0, 101) < self.config.stem_probability(); + + error!("DandelionEpoch: next_epoch: {:?}", self); + } + + // Are we stemming transactions in this epoch? + pub fn is_stem(&self) -> bool { + self.is_stem + } + + pub fn relay_peer(&mut self, peers: &Arc) -> Option> { + let mut update_relay = false; + if let Some(peer) = &self.relay_peer { + if !peer.is_connected() { + update_relay = true; + } + } else { + update_relay = true; + } + + if update_relay { + self.relay_peer = peers.outgoing_connected_peers().first().cloned(); + } + + self.relay_peer.clone() + } +} diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index 57fcbb3081..48ffb78674 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -12,17 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. 
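// ---- editor aside: illustrative sketch, not part of the diff ----
// The epoch state machine this patch introduces in servers/src/common/types.rs
// (DandelionEpoch) can be summarized independently of the node plumbing.
// A minimal std-only sketch; the real code uses chrono timestamps,
// rand::thread_rng() and the DandelionConfig accessors shown above:
use std::time::{Duration, Instant};

struct EpochSketch {
	started: Option<Instant>,
	epoch_len: Duration,   // e.g. 600s, the configured epoch length
	stem_probability: u64, // e.g. 90 (percent)
	is_stem: bool,
}

impl EpochSketch {
	// Expired once older than epoch_len; no start time forces an initial epoch.
	fn is_expired(&self) -> bool {
		self.started.map_or(true, |t| t.elapsed() > self.epoch_len)
	}

	// `coin` is a uniform draw in 0..=100 supplied by the caller.
	fn next_epoch(&mut self, coin: u64) {
		self.started = Some(Instant::now());
		self.is_stem = coin < self.stem_probability;
	}
}
// ---- end aside ----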
-use crate::util::{Mutex, RwLock, StopState}; use chrono::prelude::Utc; use rand::{thread_rng, Rng}; use std::sync::Arc; use std::thread; use std::time::Duration; +use crate::common::adapters::DandelionAdapter; use crate::core::core::hash::Hashed; use crate::core::core::transaction; use crate::core::core::verifier_cache::VerifierCache; -use crate::pool::{DandelionConfig, PoolEntryState, PoolError, TransactionPool, TxSource}; +use crate::pool::{DandelionConfig, PoolError, TransactionPool, TxSource}; +use crate::util::{Mutex, RwLock, StopState}; /// A process to monitor transactions in the stempool. /// With Dandelion, transaction can be broadcasted in stem or fluff phase. @@ -35,6 +36,7 @@ use crate::pool::{DandelionConfig, PoolEntryState, PoolError, TransactionPool, T pub fn monitor_transactions( dandelion_config: DandelionConfig, tx_pool: Arc>, + adapter: Arc, verifier_cache: Arc>, stop_state: Arc>, ) { @@ -48,51 +50,43 @@ pub fn monitor_transactions( break; } + // TODO - may be preferable to loop more often and check for expired patience time? // This is the patience timer, we loop every n secs. let patience_secs = dandelion_config.patience_secs.unwrap(); thread::sleep(Duration::from_secs(patience_secs)); - // Step 1: find all "ToStem" entries in stempool from last run. - // Aggregate them up to give a single (valid) aggregated tx and propagate it - // to the next Dandelion relay along the stem. - if process_stem_phase(tx_pool.clone(), verifier_cache.clone()).is_err() { - error!("dand_mon: Problem with stem phase."); + // Our adapter hooks us into the current Dandelion "epoch". + // From this we can determine if we should fluff txs in stempool. + if adapter.is_expired() { + adapter.next_epoch(); } - // Step 2: find all "ToFluff" entries in stempool from last run. - // Aggregate them up to give a single (valid) aggregated tx and (re)add it - // to our pool with stem=false (which will then broadcast it). - if process_fluff_phase(tx_pool.clone(), verifier_cache.clone()).is_err() { - error!("dand_mon: Problem with fluff phase."); - } + // Vastly simplified - + // check if we are is_stem() via the adapter (current epoch) + // * if we are stem then do nothing (nothing to aggregate here) + // * if fluff then aggregate and add to txpool - // Step 3: now find all "Fresh" entries in stempool since last run. - // Coin flip for each (90/10) and label them as either "ToStem" or "ToFluff". - // We will process these in the next run (waiting patience secs). - if process_fresh_entries(dandelion_config.clone(), tx_pool.clone()).is_err() { - error!("dand_mon: Problem processing fresh pool entries."); + if !adapter.is_stem() { + if process_fluff_phase(&tx_pool, &verifier_cache).is_err() { + error!("dand_mon: Problem processing fresh pool entries."); + } } - // Step 4: now find all expired entries based on embargo timer. - if process_expired_entries(dandelion_config.clone(), tx_pool.clone()).is_err() { + // Now find all expired entries based on embargo timer. 
+ if process_expired_entries(&dandelion_config, &tx_pool).is_err() { error!("dand_mon: Problem processing fresh pool entries."); } } }); } -fn process_stem_phase( - tx_pool: Arc>, - verifier_cache: Arc>, +fn process_fluff_phase( + tx_pool: &Arc>, + verifier_cache: &Arc>, ) -> Result<(), PoolError> { let mut tx_pool = tx_pool.write(); - let header = tx_pool.chain_head()?; - - let stem_txs = tx_pool - .stempool - .get_transactions_in_state(PoolEntryState::ToStem); - + let stem_txs = tx_pool.stempool.all_transactions(); if stem_txs.is_empty() { return Ok(()); } @@ -100,118 +94,35 @@ fn process_stem_phase( // Get the aggregate tx representing the entire txpool. let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; - let stem_txs = tx_pool - .stempool - .select_valid_transactions(stem_txs, txpool_tx, &header)?; - tx_pool - .stempool - .transition_to_state(&stem_txs, PoolEntryState::Stemmed); - - if stem_txs.len() > 0 { - debug!("dand_mon: Found {} txs for stemming.", stem_txs.len()); - - let agg_tx = transaction::aggregate(stem_txs)?; - agg_tx.validate( - transaction::Weighting::AsTransaction, - verifier_cache.clone(), - )?; - - let res = tx_pool.adapter.stem_tx_accepted(&agg_tx); - if res.is_err() { - debug!("dand_mon: Unable to propagate stem tx. No relay, fluffing instead."); - - let src = TxSource { - debug_name: "no_relay".to_string(), - identifier: "?.?.?.?".to_string(), - }; - - tx_pool.add_to_pool(src, agg_tx, false, &header)?; - } - } - Ok(()) -} - -fn process_fluff_phase( - tx_pool: Arc>, - verifier_cache: Arc>, -) -> Result<(), PoolError> { - let mut tx_pool = tx_pool.write(); - let header = tx_pool.chain_head()?; - let stem_txs = tx_pool .stempool - .get_transactions_in_state(PoolEntryState::ToFluff); + .select_valid_transactions(stem_txs, txpool_tx, &header)?; if stem_txs.is_empty() { return Ok(()); } - // Get the aggregate tx representing the entire txpool. 
- let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; + debug!("dand_mon: Found {} txs for fluffing.", stem_txs.len()); - let stem_txs = tx_pool - .stempool - .select_valid_transactions(stem_txs, txpool_tx, &header)?; - tx_pool - .stempool - .transition_to_state(&stem_txs, PoolEntryState::Fluffed); - - if stem_txs.len() > 0 { - debug!("dand_mon: Found {} txs for fluffing.", stem_txs.len()); + let agg_tx = transaction::aggregate(stem_txs)?; + agg_tx.validate( + transaction::Weighting::AsTransaction, + verifier_cache.clone(), + )?; - let agg_tx = transaction::aggregate(stem_txs)?; - agg_tx.validate( - transaction::Weighting::AsTransaction, - verifier_cache.clone(), - )?; + let src = TxSource { + debug_name: "fluff".to_string(), + identifier: "?.?.?.?".to_string(), + }; - let src = TxSource { - debug_name: "fluff".to_string(), - identifier: "?.?.?.?".to_string(), - }; - - tx_pool.add_to_pool(src, agg_tx, false, &header)?; - } - Ok(()) -} - -fn process_fresh_entries( - dandelion_config: DandelionConfig, - tx_pool: Arc>, -) -> Result<(), PoolError> { - let mut tx_pool = tx_pool.write(); - - let mut rng = thread_rng(); - - let fresh_entries = &mut tx_pool - .stempool - .entries - .iter_mut() - .filter(|x| x.state == PoolEntryState::Fresh) - .collect::>(); - - if fresh_entries.len() > 0 { - debug!( - "dand_mon: Found {} fresh entries in stempool.", - fresh_entries.len() - ); - - for x in &mut fresh_entries.iter_mut() { - let random = rng.gen_range(0, 101); - if random <= dandelion_config.stem_probability.unwrap() { - x.state = PoolEntryState::ToStem; - } else { - x.state = PoolEntryState::ToFluff; - } - } - } + tx_pool.add_to_pool(src, agg_tx, false, &header)?; Ok(()) } fn process_expired_entries( - dandelion_config: DandelionConfig, - tx_pool: Arc>, + dandelion_config: &DandelionConfig, + tx_pool: &Arc>, ) -> Result<(), PoolError> { let now = Utc::now().timestamp(); let embargo_sec = dandelion_config.embargo_secs.unwrap() + thread_rng().gen_range(0, 31); @@ -231,24 +142,26 @@ fn process_expired_entries( } } - if expired_entries.len() > 0 { - debug!("dand_mon: Found {} expired txs.", expired_entries.len()); + if expired_entries.is_empty() { + return Ok(()); + } - { - let mut tx_pool = tx_pool.write(); - let header = tx_pool.chain_head()?; - - for entry in expired_entries { - let src = TxSource { - debug_name: "embargo_expired".to_string(), - identifier: "?.?.?.?".to_string(), - }; - match tx_pool.add_to_pool(src, entry.tx, false, &header) { - Ok(_) => debug!("dand_mon: embargo expired, fluffed tx successfully."), - Err(e) => debug!("dand_mon: Failed to fluff expired tx - {:?}", e), - }; - } - } + debug!("dand_mon: Found {} expired txs.", expired_entries.len()); + + let mut tx_pool = tx_pool.write(); + let header = tx_pool.chain_head()?; + + let src = TxSource { + debug_name: "embargo_expired".to_string(), + identifier: "?.?.?.?".to_string(), + }; + + for entry in expired_entries { + match tx_pool.add_to_pool(src.clone(), entry.tx, false, &header) { + Ok(_) => debug!("dand_mon: embargo expired, fluffed tx successfully."), + Err(e) => debug!("dand_mon: Failed to fluff expired tx - {:?}", e), + }; } + Ok(()) } diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index dd6f2d1348..7cc8a6fb55 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -119,8 +119,6 @@ pub fn connect_and_monitor( preferred_peers.clone(), ); - update_dandelion_relay(peers.clone(), dandelion_config.clone()); - prev = Utc::now(); start_attempt = cmp::min(6, start_attempt + 1); } @@ 
-244,21 +242,6 @@ fn monitor_peers( } } -fn update_dandelion_relay(peers: Arc, dandelion_config: DandelionConfig) { - // Dandelion Relay Updater - let dandelion_relay = peers.get_dandelion_relay(); - if let Some((last_added, _)) = dandelion_relay { - let dandelion_interval = Utc::now().timestamp() - last_added; - if dandelion_interval >= dandelion_config.relay_secs.unwrap() as i64 { - debug!("monitor_peers: updating expired dandelion relay"); - peers.update_dandelion_relay(); - } - } else { - debug!("monitor_peers: no dandelion relay updating"); - peers.update_dandelion_relay(); - } -} - // Check if we have any pre-existing peer in db. If so, start with those, // otherwise use the seeds provided. fn connect_to_seeds_and_preferred_peers( diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 5c6cb3a4a9..48b65141a3 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -154,7 +154,7 @@ impl Server { let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new())); let pool_adapter = Arc::new(PoolToChainAdapter::new()); - let pool_net_adapter = Arc::new(PoolToNetAdapter::new()); + let pool_net_adapter = Arc::new(PoolToNetAdapter::new(config.dandelion_config.clone())); let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new( config.pool_config.clone(), pool_adapter.clone(), @@ -207,6 +207,8 @@ impl Server { genesis.hash(), stop_state.clone(), )?); + + // Initialize various adapters with our dynamic set of connected peers. chain_adapter.init(p2p_server.peers.clone()); pool_net_adapter.init(p2p_server.peers.clone()); net_adapter.init(p2p_server.peers.clone()); @@ -281,6 +283,7 @@ impl Server { dandelion_monitor::monitor_transactions( config.dandelion_config.clone(), tx_pool.clone(), + pool_net_adapter.clone(), verifier_cache.clone(), stop_state.clone(), ); From 9c7e3ece3d9c8fe00a611a90ecb14eece6ce65bb Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Mon, 25 Feb 2019 17:37:28 +0000 Subject: [PATCH 02/22] fallback to fluff/broadcast if we cannot stem the tx for any reason --- servers/src/common/adapters.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 46b490b32f..3c5d499d9c 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -721,20 +721,28 @@ impl pool::PoolAdapter for PoolToNetAdapter { fn stem_tx_accepted(&self, tx: &core::Transaction) { let mut epoch = self.dandelion_epoch.write(); if epoch.is_expired() { - warn!("epoch expired, setting up next epoch"); + debug!("Epoch expired, setting up next epoch."); epoch.next_epoch(&self.peers()); } + // If "stem" epoch attempt to relay the tx to the next Dandelion relay. + // Fallback to immediately fluffing the tx if we cannot stem for any reason. + // If "fluff" epoch then nothing to do right now (fluff when epoch expires). if epoch.is_stem() { if let Some(peer) = epoch.relay_peer(&self.peers()) { - warn!("Stemming this epoch, relaying to next peer."); - peer.send_stem_transaction(tx); + match peer.send_stem_transaction(tx) { + Ok(_) => info!("Stemming this epoch, relaying to next peer."), + Err(e) => { + error!("Stemming tx failed. Fluffing. {:?}", e); + self.peers().broadcast_transaction(tx); + } + } } else { - // TODO - no relay peer, no available outgoing peers, do we fluff here? - error!("What to do here? We have no relay peer?"); + error!("No relay peer. 
Fluffing."); + self.peers().broadcast_transaction(tx); } } else { - warn!("Not forwarding stem tx. Collecting, aggregating and fluffing txs this epoch. (Ok and expected)."); + info!("Fluff epoch. Aggregating stem tx(s). Fluffing when epoch expires."); } } } From d66e9f833510217468ab7a7b620e80fb56c75c46 Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Wed, 27 Feb 2019 12:57:42 +0000 Subject: [PATCH 03/22] rework stem vs fluff logic during accepting tx --- pool/src/transaction_pool.rs | 18 +++++++++++++----- pool/src/types.rs | 7 +++++-- servers/src/common/adapters.rs | 12 ++++++++---- 3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index 2cd81b0758..4bec36a2e4 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -78,7 +78,6 @@ impl TransactionPool { // Add tx to stempool (passing in all txs from txpool to validate against). self.stempool .add_to_pool(entry.clone(), self.txpool.all_transactions(), header)?; - self.adapter.stem_tx_accepted(&entry.tx); Ok(()) } @@ -120,8 +119,6 @@ impl TransactionPool { let txpool_tx = self.txpool.all_transactions_aggregate()?; self.stempool.reconcile(txpool_tx, header)?; } - - self.adapter.tx_accepted(&entry.tx); Ok(()) } @@ -160,11 +157,22 @@ impl TransactionPool { tx, }; - if stem { + // If this is a stem tx then attempt to stem. + // Any problems fallback to fluff later. + // If not stem then we are fluff. + let fluff = if stem { self.add_to_stempool(entry.clone(), header)?; + self.adapter.stem_tx_accepted(&entry.tx).is_err() } else { + true + }; + + // Fluff (broadcast) the tx if tagged as fluff or if + // stemming failed for any reason. + if fluff { self.add_to_txpool(entry.clone(), header)?; - self.add_to_reorg_cache(entry); + self.add_to_reorg_cache(entry.clone()); + self.adapter.tx_accepted(&entry.tx); } Ok(()) diff --git a/pool/src/types.rs b/pool/src/types.rs index d5084d2c99..ce15d0e5d5 100644 --- a/pool/src/types.rs +++ b/pool/src/types.rs @@ -264,8 +264,9 @@ pub trait BlockChain: Sync + Send { pub trait PoolAdapter: Send + Sync { /// The transaction pool has accepted this transaction as valid. fn tx_accepted(&self, tx: &transaction::Transaction); + /// The stem transaction pool has accepted this transactions as valid. 
- fn stem_tx_accepted(&self, tx: &transaction::Transaction); + fn stem_tx_accepted(&self, tx: &transaction::Transaction) -> Result<(), PoolError>; } /// Dummy adapter used as a placeholder for real implementations @@ -274,5 +275,7 @@ pub struct NoopAdapter {} impl PoolAdapter for NoopAdapter { fn tx_accepted(&self, _tx: &transaction::Transaction) {} - fn stem_tx_accepted(&self, _tx: &transaction::Transaction) {} + fn stem_tx_accepted(&self, _tx: &transaction::Transaction) -> Result<(), PoolError> { + Ok(()) + } } diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 3c5d499d9c..d85720042a 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -718,7 +718,7 @@ impl pool::PoolAdapter for PoolToNetAdapter { self.peers().broadcast_transaction(tx); } - fn stem_tx_accepted(&self, tx: &core::Transaction) { + fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> { let mut epoch = self.dandelion_epoch.write(); if epoch.is_expired() { debug!("Epoch expired, setting up next epoch."); @@ -731,18 +731,22 @@ impl pool::PoolAdapter for PoolToNetAdapter { if epoch.is_stem() { if let Some(peer) = epoch.relay_peer(&self.peers()) { match peer.send_stem_transaction(tx) { - Ok(_) => info!("Stemming this epoch, relaying to next peer."), + Ok(_) => { + info!("Stemming this epoch, relaying to next peer."); + Ok(()) + } Err(e) => { error!("Stemming tx failed. Fluffing. {:?}", e); - self.peers().broadcast_transaction(tx); + Err(pool::PoolError::DandelionError) } } } else { error!("No relay peer. Fluffing."); - self.peers().broadcast_transaction(tx); + Err(pool::PoolError::DandelionError) } } else { info!("Fluff epoch. Aggregating stem tx(s). Fluffing when epoch expires."); + Ok(()) } } } From 9daf0de6175ef1a00fa4b3d86677853b2767b631 Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Thu, 28 Feb 2019 12:51:20 +0000 Subject: [PATCH 04/22] cleanup docs --- pool/src/transaction_pool.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index 4bec36a2e4..b6b1ef3178 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -161,7 +161,10 @@ impl TransactionPool { // Any problems fallback to fluff later. // If not stem then we are fluff. let fluff = if stem { + // First attempt to add to stempool and return an error if this fails. self.add_to_stempool(entry.clone(), header)?; + // Then attempt to relay it to the next stem peer, + // falling back to fluffing locally if this fails. 
self.adapter.stem_tx_accepted(&entry.tx).is_err() } else { true From 6c332905f68721f4b9111a0d9c89157d732f80fb Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Thu, 28 Feb 2019 22:01:35 +0000 Subject: [PATCH 05/22] add is_stem to logging --- servers/src/common/types.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index e76511cff2..a1aa6bd5a4 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -453,7 +453,10 @@ impl DandelionEpoch { } else { true }; - error!("DandelionEpoch: is_expired: {}", expired); + error!( + "DandelionEpoch: is_expired: {}, is_stem: {}", + expired, self.is_stem + ); expired } From 8db03433ab5f1e0fad17f9c2a2da2e6778df65f8 Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Tue, 5 Mar 2019 11:00:59 +0000 Subject: [PATCH 06/22] cleanup --- pool/src/pool.rs | 2 +- servers/src/common/adapters.rs | 4 ++-- servers/src/common/types.rs | 10 ++++----- servers/src/grin/dandelion_monitor.rs | 32 ++++++++++++++++++--------- 4 files changed, 29 insertions(+), 19 deletions(-) diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 11c9091cba..8f4b1c6853 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -169,10 +169,10 @@ impl Pool { pub fn select_valid_transactions( &self, - txs: Vec, extra_tx: Option, header: &BlockHeader, ) -> Result, PoolError> { + let txs = self.all_transactions(); let valid_txs = self.validate_raw_txs(txs, extra_tx, header, Weighting::NoLimit)?; Ok(valid_txs) } diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index d85720042a..26e3e20f7d 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -727,7 +727,7 @@ impl pool::PoolAdapter for PoolToNetAdapter { // If "stem" epoch attempt to relay the tx to the next Dandelion relay. // Fallback to immediately fluffing the tx if we cannot stem for any reason. - // If "fluff" epoch then nothing to do right now (fluff when epoch expires). + // If "fluff" epoch then nothing to do right now (fluff via Dandelion monitor). if epoch.is_stem() { if let Some(peer) = epoch.relay_peer(&self.peers()) { match peer.send_stem_transaction(tx) { @@ -745,7 +745,7 @@ impl pool::PoolAdapter for PoolToNetAdapter { Err(pool::PoolError::DandelionError) } } else { - info!("Fluff epoch. Aggregating stem tx(s). Fluffing when epoch expires."); + info!("Fluff epoch. Aggregating stem tx(s). 
Will fluff via Dandelion monitor."); Ok(()) } } diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index a1aa6bd5a4..ac71c71d57 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -442,7 +442,7 @@ impl DandelionEpoch { DandelionEpoch { config, start_time: None, - is_stem: false, + is_stem: true, relay_peer: None, } } @@ -451,12 +451,9 @@ impl DandelionEpoch { let expired = if let Some(start_time) = self.start_time { Utc::now().timestamp().saturating_sub(start_time) > self.config.epoch_secs() as i64 } else { + info!("DandelionEpoch: expired, is_stem: {}", self.is_stem); true }; - error!( - "DandelionEpoch: is_expired: {}, is_stem: {}", - expired, self.is_stem - ); expired } @@ -468,7 +465,8 @@ impl DandelionEpoch { let mut rng = rand::thread_rng(); self.is_stem = rng.gen_range(0, 101) < self.config.stem_probability(); - error!("DandelionEpoch: next_epoch: {:?}", self); + let addr = self.relay_peer.clone().map(|p| p.info.addr); + info!("DandelionEpoch: next_epoch: is_stem: {}, relay: {:?}", self.is_stem, addr); } // Are we stemming transactions in this epoch? diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index 48ffb78674..f88f86d19d 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -61,11 +61,6 @@ pub fn monitor_transactions( adapter.next_epoch(); } - // Vastly simplified - - // check if we are is_stem() via the adapter (current epoch) - // * if we are stem then do nothing (nothing to aggregate here) - // * if fluff then aggregate and add to txpool - if !adapter.is_stem() { if process_fluff_phase(&tx_pool, &verifier_cache).is_err() { error!("dand_mon: Problem processing fresh pool entries."); @@ -86,18 +81,35 @@ fn process_fluff_phase( ) -> Result<(), PoolError> { let mut tx_pool = tx_pool.write(); - let stem_txs = tx_pool.stempool.all_transactions(); - if stem_txs.is_empty() { + // Get the aggregate tx representing the entire txpool. + let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; + + // Nothing to process? Then we are done. + if txpool_tx.is_none() { return Ok(()); } - // Get the aggregate tx representing the entire txpool. - let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; + // TODO - If this runs every 10s (patience time?) we don't have much time to aggregate txs. + // It would be good if we could let txs build up over a longer period of time here. + // This would complicate the selection and aggregation rules quite significantly though. + // We want to leave txs in here but we also want to maximize the chance of aggregation, + // so sometimes it is desirable to aggregate more recent txs along with them. + // + // Something like - + // * If everything is recent then leave them in there. + // * If anything is old enough to fluff then fluff everything (even recent txs). + // + // Or maybe the rule is as simple as - + // * If multiple txs then aggregate and fluff + // * If single tx then wait until old enough before fluffing + // + // Note: We do not want to simply wait for epoch to expire as that could be 10 mins. + // We likely want to aggregate and fluff after 60s or so. 
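// ---- editor aside: illustrative sketch, not part of the diff ----
// PATCH 09 below settles this TODO with a simple age cutoff
// (select_txs_cutoff): only entries that have waited longer than the
// cutoff are fluffed. Reduced std-only form, with `Entry`/`tx_at` as
// stand-ins for pool::PoolEntry:
use std::time::{Duration, Instant};

struct Entry {
	tx_at: Instant,
}

// Keep only entries that have sat in the stempool longer than the cutoff.
fn older_than(entries: &[Entry], cutoff: Duration) -> Vec<&Entry> {
	entries.iter().filter(|e| e.tx_at.elapsed() > cutoff).collect()
}
// ---- end aside ----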
let header = tx_pool.chain_head()?; let stem_txs = tx_pool .stempool - .select_valid_transactions(stem_txs, txpool_tx, &header)?; + .select_valid_transactions(txpool_tx, &header)?; if stem_txs.is_empty() { return Ok(()); From 8ed4f100ab1260058ef0c0cbeeb7b96e16ae0b90 Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Tue, 5 Mar 2019 11:01:34 +0000 Subject: [PATCH 07/22] rustfmt --- servers/src/common/types.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index ac71c71d57..e50e661c5a 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -466,7 +466,10 @@ impl DandelionEpoch { self.is_stem = rng.gen_range(0, 101) < self.config.stem_probability(); let addr = self.relay_peer.clone().map(|p| p.info.addr); - info!("DandelionEpoch: next_epoch: is_stem: {}, relay: {:?}", self.is_stem, addr); + info!( + "DandelionEpoch: next_epoch: is_stem: {}, relay: {:?}", + self.is_stem, addr + ); } // Are we stemming transactions in this epoch? From c42c4f67a0fa2b94b6003862a02795d009fec0c0 Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Thu, 7 Mar 2019 11:47:49 +0000 Subject: [PATCH 08/22] cleanup monitor and logging --- servers/src/common/adapters.rs | 4 ++++ servers/src/common/types.rs | 30 +++++++++++++++++++++++---- servers/src/grin/dandelion_monitor.rs | 11 +++++----- servers/src/grin/seed.rs | 2 -- servers/src/grin/server.rs | 1 - 5 files changed, 35 insertions(+), 13 deletions(-) diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 26e3e20f7d..c8e67b38dd 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -691,11 +691,15 @@ pub struct PoolToNetAdapter { dandelion_epoch: Arc>, } +/// Adapter between the Dandelion monitor and the current Dandelion "epoch". pub trait DandelionAdapter: Send + Sync { + /// Is the node stemming (or fluffing) transactions in the current epoch? fn is_stem(&self) -> bool; + /// Is the current Dandelion epoch expired? fn is_expired(&self) -> bool; + /// Transition to the next Dandelion epoch (new stem/fluff state, select new relay peer). fn next_epoch(&self); } diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index e50e661c5a..a5de887dcb 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -438,6 +438,7 @@ pub struct DandelionEpoch { } impl DandelionEpoch { + /// Create a new Dandelion epoch, defaulting to "stem" and no outbound relay peer. pub fn new(config: DandelionConfig) -> DandelionEpoch { DandelionEpoch { config, @@ -447,16 +448,25 @@ impl DandelionEpoch { } } + /// Is the current Dandelion epoch expired? + /// It is expired if start_time is older than the configured epoch_secs. pub fn is_expired(&self) -> bool { let expired = if let Some(start_time) = self.start_time { Utc::now().timestamp().saturating_sub(start_time) > self.config.epoch_secs() as i64 } else { - info!("DandelionEpoch: expired, is_stem: {}", self.is_stem); + let addr = self.relay_peer.clone().map(|p| p.info.addr); + info!( + "DandelionEpoch: epoch expired, is_stem: {}, relay: {:?}", + self.is_stem, addr + ); true }; expired } + /// Transition to next Dandelion epoch. + /// Select stem/fluff based on configured stem_probability. + /// Choose a new outbound stem relay peer. 
pub fn next_epoch(&mut self, peers: &Arc) { self.start_time = Some(Utc::now().timestamp()); self.relay_peer = peers.outgoing_connected_peers().first().cloned(); @@ -467,20 +477,28 @@ impl DandelionEpoch { let addr = self.relay_peer.clone().map(|p| p.info.addr); info!( - "DandelionEpoch: next_epoch: is_stem: {}, relay: {:?}", - self.is_stem, addr + "DandelionEpoch: next_epoch: is_stem: {} ({}%), relay: {:?}", + self.is_stem, + self.config.stem_probability(), + addr ); } - // Are we stemming transactions in this epoch? + /// Are we stemming (or fluffing) transactions in this epoch? pub fn is_stem(&self) -> bool { self.is_stem } + /// What is out current relay peer? + /// If it is not connected then choose a new one. pub fn relay_peer(&mut self, peers: &Arc) -> Option> { let mut update_relay = false; if let Some(peer) = &self.relay_peer { if !peer.is_connected() { + info!( + "DandelionEpoch: relay_peer: {:?} not connected, choosing a new one.", + peer.info.addr + ); update_relay = true; } } else { @@ -489,6 +507,10 @@ impl DandelionEpoch { if update_relay { self.relay_peer = peers.outgoing_connected_peers().first().cloned(); + info!( + "DandelionEpoch: relay_peer: new peer chosen: {:?}", + self.relay_peer.clone().map(|p| p.info.addr) + ); } self.relay_peer.clone() diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index f88f86d19d..f1fcf19e7b 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -79,16 +79,12 @@ fn process_fluff_phase( tx_pool: &Arc>, verifier_cache: &Arc>, ) -> Result<(), PoolError> { + // Take a write lock on the txpool for the duration of this processing. let mut tx_pool = tx_pool.write(); // Get the aggregate tx representing the entire txpool. let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; - // Nothing to process? Then we are done. - if txpool_tx.is_none() { - return Ok(()); - } - // TODO - If this runs every 10s (patience time?) we don't have much time to aggregate txs. // It would be good if we could let txs build up over a longer period of time here. // This would complicate the selection and aggregation rules quite significantly though. 
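// ---- editor aside: illustrative sketch, not part of the diff ----
// The relay_peer maintenance added above reduces to "reuse the relay while
// it is still connected, otherwise pick a fresh outbound peer". With
// stand-in types (the real code holds Arc<Peer> and asks the Peers set):
struct Peer {
	connected: bool,
}

fn maintain_relay(current: Option<Peer>, pick_new: impl Fn() -> Option<Peer>) -> Option<Peer> {
	match current {
		Some(p) if p.connected => Some(p),
		_ => pick_new(),
	}
}
// ---- end aside ----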
@@ -115,7 +111,10 @@ fn process_fluff_phase( return Ok(()); } - debug!("dand_mon: Found {} txs for fluffing.", stem_txs.len()); + debug!( + "dand_mon: Found {} txs in local stempool to fluff", + stem_txs.len() + ); let agg_tx = transaction::aggregate(stem_txs)?; agg_tx.validate( diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 7cc8a6fb55..68886586cf 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -29,7 +29,6 @@ use crate::core::global; use crate::p2p; use crate::p2p::types::PeerAddr; use crate::p2p::ChainAdapter; -use crate::pool::DandelionConfig; use crate::util::{Mutex, StopState}; // DNS Seeds with contact email associated @@ -52,7 +51,6 @@ const FLOONET_DNS_SEEDS: &'static [&'static str] = &[ pub fn connect_and_monitor( p2p_server: Arc, capabilities: p2p::Capabilities, - dandelion_config: DandelionConfig, seed_list: Box Vec + Send>, preferred_peers: Option>, stop_state: Arc>, diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 48b65141a3..167201c258 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -229,7 +229,6 @@ impl Server { seed::connect_and_monitor( p2p_server.clone(), config.p2p_config.capabilities, - config.dandelion_config.clone(), seeder, config.p2p_config.peers_preferred.clone(), stop_state.clone(), From 88bb6b84033aa1e614416b9c591b88d13d15f4ef Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Thu, 7 Mar 2019 14:05:17 +0000 Subject: [PATCH 09/22] rework dandelion monitor to use simple cutoff for aggregation --- pool/src/lib.rs | 5 +- pool/src/pool.rs | 12 +-- pool/src/types.rs | 55 +++++-------- servers/src/common/types.rs | 13 ++-- servers/src/grin/dandelion_monitor.rs | 106 +++++++++++++------------- 5 files changed, 86 insertions(+), 105 deletions(-) diff --git a/pool/src/lib.rs b/pool/src/lib.rs index c506deaec6..635e00f532 100644 --- a/pool/src/lib.rs +++ b/pool/src/lib.rs @@ -34,5 +34,8 @@ mod pool; pub mod transaction_pool; pub mod types; +pub use crate::pool::Pool; pub use crate::transaction_pool::TransactionPool; -pub use crate::types::{BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolError, TxSource}; +pub use crate::types::{ + BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource, +}; diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 8f4b1c6853..56251f9db0 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -167,16 +167,6 @@ impl Pool { Ok(Some(tx)) } - pub fn select_valid_transactions( - &self, - extra_tx: Option, - header: &BlockHeader, - ) -> Result, PoolError> { - let txs = self.all_transactions(); - let valid_txs = self.validate_raw_txs(txs, extra_tx, header, Weighting::NoLimit)?; - Ok(valid_txs) - } - // Aggregate this new tx with all existing txs in the pool. // If we can validate the aggregated tx against the current chain state // then we can safely add the tx to the pool. @@ -250,7 +240,7 @@ impl Pool { Ok(new_sums) } - fn validate_raw_txs( + pub fn validate_raw_txs( &self, txs: Vec, extra_tx: Option, diff --git a/pool/src/types.rs b/pool/src/types.rs index ce15d0e5d5..3ece2dfcf3 100644 --- a/pool/src/types.rs +++ b/pool/src/types.rs @@ -27,15 +27,14 @@ use failure::Fail; use grin_core as core; use grin_keychain as keychain; -/// TODO - This now maps to the "epoch length"? -/// Dandelion relay timer -const DANDELION_RELAY_SECS: u64 = 600; +/// Dandelion "epoch" length. +const DANDELION_EPOCH_SECS: u64 = 600; -/// Dandelion embargo timer +/// Dandelion embargo timer. 
const DANDELION_EMBARGO_SECS: u64 = 180; -/// Dandelion patience timer -const DANDELION_PATIENCE_SECS: u64 = 10; +/// Dandelion aggregation timer. +const DANDELION_AGGREGATION_SECS: u64 = 30; /// Dandelion stem probability (stem 90% of the time, fluff 10%). const DANDELION_STEM_PROBABILITY: u64 = 90; @@ -44,54 +43,42 @@ const DANDELION_STEM_PROBABILITY: u64 = 90; /// Note: shared between p2p and pool. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct DandelionConfig { - /// Choose new Dandelion relay peer every n secs. - #[serde = "default_dandelion_relay_secs"] - pub relay_secs: Option, - /// Dandelion embargo, fluff and broadcast tx if not seen on network before - /// embargo expires. - #[serde = "default_dandelion_embargo_secs"] + /// Length of each "epoch". + #[serde(default = "default_dandelion_epoch_secs")] + pub epoch_secs: Option, + /// Dandelion embargo timer. Fluff and broadcast individual txs if not seen + /// on network before embargo expires. + #[serde(default = "default_dandelion_embargo_secs")] pub embargo_secs: Option, - /// Dandelion patience timer, fluff/stem processing runs every n secs. - /// Tx aggregation happens on stem txs received within this window. - #[serde = "default_dandelion_patience_secs"] - pub patience_secs: Option, + /// Dandelion aggregation timer. + #[serde(default = "default_dandelion_aggregation_secs")] + pub aggregation_secs: Option, /// Dandelion stem probability (stem 90% of the time, fluff 10% etc.) - #[serde = "default_dandelion_stem_probability"] + #[serde(default = "default_dandelion_stem_probability")] pub stem_probability: Option, } -impl DandelionConfig { - pub fn stem_probability(&self) -> u64 { - self.stem_probability.unwrap_or(DANDELION_STEM_PROBABILITY) - } - - // TODO - Cleanup config for Dandelion++ - pub fn epoch_secs(&self) -> u64 { - self.relay_secs.unwrap_or(DANDELION_RELAY_SECS) - } -} - impl Default for DandelionConfig { fn default() -> DandelionConfig { DandelionConfig { - relay_secs: default_dandelion_relay_secs(), + epoch_secs: default_dandelion_epoch_secs(), embargo_secs: default_dandelion_embargo_secs(), - patience_secs: default_dandelion_patience_secs(), + aggregation_secs: default_dandelion_aggregation_secs(), stem_probability: default_dandelion_stem_probability(), } } } -fn default_dandelion_relay_secs() -> Option { - Some(DANDELION_RELAY_SECS) +fn default_dandelion_epoch_secs() -> Option { + Some(DANDELION_EPOCH_SECS) } fn default_dandelion_embargo_secs() -> Option { Some(DANDELION_EMBARGO_SECS) } -fn default_dandelion_patience_secs() -> Option { - Some(DANDELION_PATIENCE_SECS) +fn default_dandelion_aggregation_secs() -> Option { + Some(DANDELION_AGGREGATION_SECS) } fn default_dandelion_stem_probability() -> Option { diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index a5de887dcb..dd0ed4fd66 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -452,7 +452,8 @@ impl DandelionEpoch { /// It is expired if start_time is older than the configured epoch_secs. pub fn is_expired(&self) -> bool { let expired = if let Some(start_time) = self.start_time { - Utc::now().timestamp().saturating_sub(start_time) > self.config.epoch_secs() as i64 + let epoch_secs = self.config.epoch_secs.expect("epoch_secs config missing") as i64; + Utc::now().timestamp().saturating_sub(start_time) > epoch_secs } else { let addr = self.relay_peer.clone().map(|p| p.info.addr); info!( @@ -473,14 +474,16 @@ impl DandelionEpoch { // If stem_probability == 90 then we stem 90% of the time. 
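// ---- editor aside: not part of the diff ----
// Nit on the comment just above: gen_range(0, 101) draws uniformly from
// 0..=100 (101 values), so `< 90` fires with probability 90/101 ≈ 89.1%,
// not exactly 90%; gen_range(0, 100) would give exactly 90/100.
// Quick std-only check of that boundary:
fn main() {
	let stem_draws = (0u64..101).filter(|d| *d < 90).count();
	assert_eq!(stem_draws, 90); // 90 of the 101 equally likely draws
}
// ---- end aside ----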
let mut rng = rand::thread_rng(); - self.is_stem = rng.gen_range(0, 101) < self.config.stem_probability(); + let stem_probability = self + .config + .stem_probability + .expect("stem_probability config missing"); + self.is_stem = rng.gen_range(0, 101) < stem_probability; let addr = self.relay_peer.clone().map(|p| p.info.addr); info!( "DandelionEpoch: next_epoch: is_stem: {} ({}%), relay: {:?}", - self.is_stem, - self.config.stem_probability(), - addr + self.is_stem, stem_probability, addr ); } diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index f1fcf19e7b..e75b8823a2 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -22,7 +22,7 @@ use crate::common::adapters::DandelionAdapter; use crate::core::core::hash::Hashed; use crate::core::core::transaction; use crate::core::core::verifier_cache::VerifierCache; -use crate::pool::{DandelionConfig, PoolError, TransactionPool, TxSource}; +use crate::pool::{DandelionConfig, Pool, PoolEntry, PoolError, TransactionPool, TxSource}; use crate::util::{Mutex, RwLock, StopState}; /// A process to monitor transactions in the stempool. @@ -46,15 +46,11 @@ pub fn monitor_transactions( .name("dandelion".to_string()) .spawn(move || { loop { + // Halt Dandelion monitor if we have been notified that we are stopping. if stop_state.lock().is_stopped() { break; } - // TODO - may be preferable to loop more often and check for expired patience time? - // This is the patience timer, we loop every n secs. - let patience_secs = dandelion_config.patience_secs.unwrap(); - thread::sleep(Duration::from_secs(patience_secs)); - // Our adapter hooks us into the current Dandelion "epoch". // From this we can determine if we should fluff txs in stempool. if adapter.is_expired() { @@ -62,7 +58,7 @@ pub fn monitor_transactions( } if !adapter.is_stem() { - if process_fluff_phase(&tx_pool, &verifier_cache).is_err() { + if process_fluff_phase(&dandelion_config, &tx_pool, &verifier_cache).is_err() { error!("dand_mon: Problem processing fresh pool entries."); } } @@ -71,52 +67,60 @@ pub fn monitor_transactions( if process_expired_entries(&dandelion_config, &tx_pool).is_err() { error!("dand_mon: Problem processing fresh pool entries."); } + + // Monitor loops every 10s. + thread::sleep(Duration::from_secs(10)); } }); } +// Query the pool for transactions older than the cutoff. +// Used for both periodic fluffing and handling expired embargo timer. +fn select_txs_cutoff(pool: &Pool, cutoff_secs: u64) -> Vec { + let cutoff = Utc::now().timestamp() - cutoff_secs as i64; + pool.entries + .iter() + .filter(|x| x.tx_at.timestamp() < cutoff) + .cloned() + .collect() +} + fn process_fluff_phase( + dandelion_config: &DandelionConfig, tx_pool: &Arc>, verifier_cache: &Arc>, ) -> Result<(), PoolError> { // Take a write lock on the txpool for the duration of this processing. let mut tx_pool = tx_pool.write(); - // Get the aggregate tx representing the entire txpool. - let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; - - // TODO - If this runs every 10s (patience time?) we don't have much time to aggregate txs. - // It would be good if we could let txs build up over a longer period of time here. - // This would complicate the selection and aggregation rules quite significantly though. - // We want to leave txs in here but we also want to maximize the chance of aggregation, - // so sometimes it is desirable to aggregate more recent txs along with them. 
- // - // Something like - - // * If everything is recent then leave them in there. - // * If anything is old enough to fluff then fluff everything (even recent txs). - // - // Or maybe the rule is as simple as - - // * If multiple txs then aggregate and fluff - // * If single tx then wait until old enough before fluffing - // - // Note: We do not want to simply wait for epoch to expire as that could be 10 mins. - // We likely want to aggregate and fluff after 60s or so. - - let header = tx_pool.chain_head()?; - let stem_txs = tx_pool - .stempool - .select_valid_transactions(txpool_tx, &header)?; + let cutoff_secs = dandelion_config + .aggregation_secs + .expect("aggregation secs config missing"); + let fluffable_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs); - if stem_txs.is_empty() { + if fluffable_entries.is_empty() { return Ok(()); } + let header = tx_pool.chain_head()?; + + let fluffable_txs = { + let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; + let txs: Vec<_> = fluffable_entries.iter().map(|x| x.tx.clone()).collect(); + tx_pool.stempool.validate_raw_txs( + txs, + txpool_tx, + &header, + transaction::Weighting::NoLimit, + )? + }; + debug!( "dand_mon: Found {} txs in local stempool to fluff", - stem_txs.len() + fluffable_txs.len() ); - let agg_tx = transaction::aggregate(stem_txs)?; + let agg_tx = transaction::aggregate(fluffable_txs)?; agg_tx.validate( transaction::Weighting::AsTransaction, verifier_cache.clone(), @@ -135,23 +139,14 @@ fn process_expired_entries( dandelion_config: &DandelionConfig, tx_pool: &Arc>, ) -> Result<(), PoolError> { - let now = Utc::now().timestamp(); - let embargo_sec = dandelion_config.embargo_secs.unwrap() + thread_rng().gen_range(0, 31); - let cutoff = now - embargo_sec as i64; - - let mut expired_entries = vec![]; - { - let tx_pool = tx_pool.read(); - for entry in tx_pool - .stempool - .entries - .iter() - .filter(|x| x.tx_at.timestamp() < cutoff) - { - debug!("dand_mon: Embargo timer expired for {:?}", entry.tx.hash()); - expired_entries.push(entry.clone()); - } - } + // Take a write lock on the txpool for the duration of this processing. 
+	let mut tx_pool = tx_pool.write();
+
+	let embargo_secs = dandelion_config
+		.embargo_secs
+		.expect("embargo_secs config missing")
+		+ thread_rng().gen_range(0, 31);
+	let expired_entries = select_txs_cutoff(&tx_pool.stempool, embargo_secs);

 	if expired_entries.is_empty() {
 		return Ok(());
@@ -159,7 +154,6 @@

 	debug!("dand_mon: Found {} expired txs.", expired_entries.len());

-	let mut tx_pool = tx_pool.write();
 	let header = tx_pool.chain_head()?;

 	let src = TxSource {
@@ -168,9 +162,13 @@

 	for entry in expired_entries {
+		let txhash = entry.tx.hash();
 		match tx_pool.add_to_pool(src.clone(), entry.tx, false, &header) {
-			Ok(_) => debug!("dand_mon: embargo expired, fluffed tx successfully."),
-			Err(e) => debug!("dand_mon: Failed to fluff expired tx - {:?}", e),
+			Ok(_) => info!(
+				"dand_mon: embargo expired for {}, fluffed successfully.",
+				txhash
+			),
+			Err(e) => warn!("dand_mon: failed to fluff expired tx {}, {:?}", txhash, e),
 		};
 	}

From a7fe963052adad5ca527b39f84e5d125f6d9d691 Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Mon, 11 Mar 2019 11:09:08 +0000
Subject: [PATCH 10/22] transition to next epoch *after* processing tx so we fluff final outstanding txs

---
 servers/src/common/adapters.rs        |  6 ++----
 servers/src/common/types.rs           |  7 +------
 servers/src/grin/dandelion_monitor.rs | 13 +++++++------
 3 files changed, 10 insertions(+), 16 deletions(-)

diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs
index c8e67b38dd..f6e6f71c18 100644
--- a/servers/src/common/adapters.rs
+++ b/servers/src/common/adapters.rs
@@ -723,11 +723,9 @@ impl pool::PoolAdapter for PoolToNetAdapter {
 	}

 	fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> {
+		// Take write lock on the current epoch.
+		// We need to be able to update the current relay peer if not currently connected.
 		let mut epoch = self.dandelion_epoch.write();
-		if epoch.is_expired() {
-			debug!("Epoch expired, setting up next epoch.");
-			epoch.next_epoch(&self.peers());
-		}

 		// If "stem" epoch attempt to relay the tx to the next Dandelion relay.
 		// Fallback to immediately fluffing the tx if we cannot stem for any reason.
diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs
index dd0ed4fd66..8c05189999 100644
--- a/servers/src/common/types.rs
+++ b/servers/src/common/types.rs
@@ -455,11 +455,6 @@ impl DandelionEpoch {
 			let epoch_secs = self.config.epoch_secs.expect("epoch_secs config missing") as i64;
 			Utc::now().timestamp().saturating_sub(start_time) > epoch_secs
 		} else {
-			let addr = self.relay_peer.clone().map(|p| p.info.addr);
-			info!(
-				"DandelionEpoch: epoch expired, is_stem: {}, relay: {:?}",
-				self.is_stem, addr
-			);
 			true
 		};
 		expired
@@ -492,7 +487,7 @@
 		self.is_stem
 	}

-	/// What is out current relay peer?
+	/// What is our current relay peer?
 	/// If it is not connected then choose a new one.
 	pub fn relay_peer(&mut self, peers: &Arc<p2p::Peers>) -> Option<Arc<p2p::Peer>> {
 		let mut update_relay = false;
diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs
index e75b8823a2..772475bd40 100644
--- a/servers/src/grin/dandelion_monitor.rs
+++ b/servers/src/grin/dandelion_monitor.rs
@@ -51,12 +51,6 @@ pub fn monitor_transactions(
 					break;
 				}

-				// Our adapter hooks us into the current Dandelion "epoch".
-				// From this we can determine if we should fluff txs in stempool.
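// ---------------------------------------------------------------------------
// Editorial aside -- an illustrative sketch, not part of the patch series.
// The embargo handling in process_expired_entries above adds 0-30s of
// per-node random jitter to the configured embargo, so that the many nodes
// holding the same stem tx do not all fluff it at the same instant if the
// embargo fires -- synchronized fluffing would make the stem path easier to
// observe. Standalone version of that computation (function name
// hypothetical; two-argument `gen_range` as used by the series):
use rand::{thread_rng, Rng};

fn effective_embargo_secs(embargo_secs: u64) -> u64 {
    // The upper bound is exclusive, so the jitter is uniform over 0..=30.
    embargo_secs + thread_rng().gen_range(0u64, 31u64)
}
// ---------------------------------------------------------------------------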
-				if adapter.is_expired() {
-					adapter.next_epoch();
-				}
-
 				if !adapter.is_stem() {
 					if process_fluff_phase(&dandelion_config, &tx_pool, &verifier_cache).is_err() {
 						error!("dand_mon: Problem processing fresh pool entries.");
@@ -68,6 +62,13 @@
 					error!("dand_mon: Problem processing fresh pool entries.");
 				}

+				// Handle the tx above *before* we transition to next epoch.
+				// This gives us an opportunity to do the final "fluff" before we start
+				// stemming on the subsequent epoch.
+				if adapter.is_expired() {
+					adapter.next_epoch();
+				}
+
 				// Monitor loops every 10s.
 				thread::sleep(Duration::from_secs(10));
 			}

From fbd158e927fc2ca5ed4e40bafb9ee72336119705 Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Mon, 11 Mar 2019 12:45:27 +0000
Subject: [PATCH 11/22] fluff all txs in stempool if any are older than 30s

aggressively aggregate when we can
---
 servers/src/grin/dandelion_monitor.rs | 31 ++++++++++++++++++++++-----
 1 file changed, 26 insertions(+), 5 deletions(-)

diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs
index 772475bd40..f15244e19b 100644
--- a/servers/src/grin/dandelion_monitor.rs
+++ b/servers/src/grin/dandelion_monitor.rs
@@ -52,7 +52,9 @@ pub fn monitor_transactions(
 				}

 				if !adapter.is_stem() {
-					if process_fluff_phase(&dandelion_config, &tx_pool, &verifier_cache).is_err() {
+					if process_fluff_phase(&dandelion_config, &tx_pool, &adapter, &verifier_cache)
+						.is_err()
+					{
 						error!("dand_mon: Problem processing fresh pool entries.");
 					}

@@ -89,17 +91,36 @@ fn select_txs_cutoff(pool: &Pool, cutoff_secs: u64) -> Vec<PoolEntry> {

 fn process_fluff_phase(
 	dandelion_config: &DandelionConfig,
 	tx_pool: &Arc<RwLock<TransactionPool>>,
+	adapter: &Arc<DandelionAdapter>,
 	verifier_cache: &Arc<RwLock<dyn VerifierCache>>,
 ) -> Result<(), PoolError> {
 	// Take a write lock on the txpool for the duration of this processing.
 	let mut tx_pool = tx_pool.write();

+	// If we are expired then look for all txs.
+	// If not expired then -
+	//   if multiple txs then look for all txs
+	//   if a single tx then wait 30s
+
+	let all_entries = select_txs_cutoff(&tx_pool.stempool, 0);
+	if all_entries.is_empty() {
+		return Ok(());
+	}
+
 	let cutoff_secs = dandelion_config
 		.aggregation_secs
 		.expect("aggregation secs config missing");
-	let fluffable_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs);
-
-	if fluffable_entries.is_empty() {
+	let cutoff_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs);
+
+	if adapter.is_expired() {
+		// If epoch is expired, fluff *all* outstanding entries in stempool.
+	} else if cutoff_entries.len() > 0 {
+		// If *any* entry older than aggregation_secs (30s) then fluff *all* entries.
+		// If we only have a single tx in the stempool and it is older
+		// than aggregation_secs we will fluff it without aggregation,
+		// but we must broadcast the tx in a timely manner.
+	} else {
+		// Give any fresh txs time to aggregate, do nothing this time.
 		return Ok(());
 	}

@@ -107,7 +128,7 @@ fn process_fluff_phase(
 	let fluffable_txs = {
 		let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?;
-		let txs: Vec<_> = fluffable_entries.iter().map(|x| x.tx.clone()).collect();
+		let txs: Vec<_> = all_entries.iter().map(|x| x.tx.clone()).collect();
 		tx_pool.stempool.validate_raw_txs(
 			txs,
 			txpool_tx,
 			&header,
 			transaction::Weighting::NoLimit,

From 69ad669b65771f7c33cc0842a1312d494a4413a0 Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Mon, 11 Mar 2019 13:15:54 +0000
Subject: [PATCH 12/22] fix rebase onto 1.1.0

---
 servers/src/common/types.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs
index 8c05189999..3514cf6422 100644
--- a/servers/src/common/types.rs
+++ b/servers/src/common/types.rs
@@ -27,6 +27,7 @@ use crate::p2p;
 use crate::pool;
 use crate::pool::types::DandelionConfig;
 use crate::store;
+use crate::util::RwLock;

 /// Error type wrapping underlying module errors.
 #[derive(Debug)]

From 89c2f35329d5715f16ab39f6a2749c33923a94a9 Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Mon, 11 Mar 2019 14:11:08 +0000
Subject: [PATCH 13/22] default config comments for Dandelion

---
 config/src/comments.rs | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/config/src/comments.rs b/config/src/comments.rs
index 1849b656f3..b5e3f6d970 100644
--- a/config/src/comments.rs
+++ b/config/src/comments.rs
@@ -141,28 +141,29 @@ fn comments() -> HashMap<String, String> {
 	);

 	retval.insert(
-		"relay_secs".to_string(),
+		"epoch_secs".to_string(),
 		"
-#dandelion relay time (choose new relay peer every n secs)
+#dandelion epoch duration in secs
 "
 		.to_string(),
 	);

 	retval.insert(
-		"embargo_secs".to_string(),
+		"aggregation_secs".to_string(),
 		"
-#fluff and broadcast after embargo expires if tx not seen on network
+#dandelion aggregation period in secs
 "
 		.to_string(),
 	);

 	retval.insert(
-		"patience_secs".to_string(),
+		"embargo_secs".to_string(),
 		"
-#run dandelion stem/fluff processing every n secs (stem tx aggregation in this window)
+#fluff and broadcast after embargo expires if tx not seen on network
 "
 		.to_string(),
 	);

+
 	retval.insert(
 		"stem_probability".to_string(),
 		"

From bd46732f59fbf50a0a65c73818206e8c8f9cedb1 Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Mon, 11 Mar 2019 14:26:13 +0000
Subject: [PATCH 14/22] fix code to reflect our tests - fallback to txpool on stempool error

---
 pool/src/transaction_pool.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs
index b6b1ef3178..531ed65978 100644
--- a/pool/src/transaction_pool.rs
+++ b/pool/src/transaction_pool.rs
@@ -161,11 +161,11 @@ impl TransactionPool {
 		// Any problems fallback to fluff later.
 		// If not stem then we are fluff.
 		let fluff = if stem {
-			// First attempt to add to stempool and return an error if this fails.
-			self.add_to_stempool(entry.clone(), header)?;
-			// Then attempt to relay it to the next stem peer,
-			// falling back to fluffing locally if this fails.
-			self.adapter.stem_tx_accepted(&entry.tx).is_err()
+			// Attempt to add to stempool, notifying adapter to relay to outbound peer.
+			// Fall back to fluffing (via txpool) if stempool interaction fails.
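// ---------------------------------------------------------------------------
// Editorial aside -- an illustrative sketch, not part of the patch series.
// PATCH 14 collapses stempool handling into a single fallible chain: a tx is
// fluffed unless we are stemming AND both the stempool insert and the adapter
// relay notification succeed. Unlike the earlier `?` version, a stempool
// failure no longer aborts the call -- it simply falls back to fluff.
//
//   stem  | add_to_stempool | stem_tx_accepted | fluff?
//   ------+-----------------+------------------+-------
//   false |    (skipped)    |    (skipped)     |  yes
//   true  |       Err       |    (skipped)     |  yes
//   true  |       Ok        |       Err        |  yes
//   true  |       Ok        |       Ok         |  no
//
// Standalone version of the control flow (names and error type simplified):
fn should_fluff<E>(
    stem: bool,
    add_to_stempool: impl FnOnce() -> Result<(), E>,
    stem_tx_accepted: impl FnOnce() -> Result<(), E>,
) -> bool {
    // Short-circuits: neither closure runs when we are not stemming.
    !stem || add_to_stempool().and_then(|_| stem_tx_accepted()).is_err()
}
// ---------------------------------------------------------------------------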
+			self.add_to_stempool(entry.clone(), header)
+				.and_then(|_| self.adapter.stem_tx_accepted(&entry.tx))
+				.is_err()
 		} else {
 			true
 		};

From 9df229fd3566ab61e688065662804c666e1449c4 Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Tue, 12 Mar 2019 13:05:19 +0000
Subject: [PATCH 15/22] log fluff and expire errors in dandelion monitor

---
 servers/src/grin/dandelion_monitor.rs | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs
index f15244e19b..dc38e75d8c 100644
--- a/servers/src/grin/dandelion_monitor.rs
+++ b/servers/src/grin/dandelion_monitor.rs
@@ -52,17 +52,17 @@ pub fn monitor_transactions(
 				}

 				if !adapter.is_stem() {
-					if process_fluff_phase(&dandelion_config, &tx_pool, &adapter, &verifier_cache)
-						.is_err()
-					{
-						error!("dand_mon: Problem processing fresh pool entries.");
-					}
+					let _ =
+						process_fluff_phase(&dandelion_config, &tx_pool, &adapter, &verifier_cache)
+							.map_err(|e| {
+								error!("dand_mon: Problem processing fluff phase. {:?}", e);
+							});
 				}

 				// Now find all expired entries based on embargo timer.
-				if process_expired_entries(&dandelion_config, &tx_pool).is_err() {
-					error!("dand_mon: Problem processing fresh pool entries.");
-				}
+				let _ = process_expired_entries(&dandelion_config, &tx_pool).map_err(|e| {
+					error!("dand_mon: Problem processing expired entries. {:?}", e);
+				});

 				// Handle the tx above *before* we transition to next epoch.
 				// This gives us an opportunity to do the final "fluff" before we start

From 5e64a838372c349b38e475fd89d5d768e3fc224e Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Thu, 14 Mar 2019 12:17:28 +0000
Subject: [PATCH 16/22] cleanup

---
 pool/src/pool.rs                      |  6 +++---
 servers/src/grin/dandelion_monitor.rs | 22 ++++++----------------
 2 files changed, 9 insertions(+), 19 deletions(-)

diff --git a/pool/src/pool.rs b/pool/src/pool.rs
index 56251f9db0..a7d355f928 100644
--- a/pool/src/pool.rs
+++ b/pool/src/pool.rs
@@ -139,7 +139,7 @@ impl Pool {
 		// Verify these txs produce an aggregated tx below max tx weight.
 		// Return a vec of all the valid txs.
 		let txs = self.validate_raw_txs(
-			tx_buckets,
+			&tx_buckets,
 			None,
 			&header,
 			Weighting::AsLimitedTransaction { max_weight },
@@ -242,7 +242,7 @@ impl Pool {
 	pub fn validate_raw_txs(
 		&self,
-		txs: Vec<Transaction>,
+		txs: &[Transaction],
 		extra_tx: Option<Transaction>,
 		header: &BlockHeader,
 		weighting: Weighting,
@@ -262,7 +262,7 @@
 			// We know the tx is valid if the entire aggregate tx is valid.
 			if self.validate_raw_tx(&agg_tx, header, weighting).is_ok() {
-				valid_txs.push(tx);
+				valid_txs.push(tx.clone());
 			}
 		}

diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs
index dc38e75d8c..2837eb3f79 100644
--- a/servers/src/grin/dandelion_monitor.rs
+++ b/servers/src/grin/dandelion_monitor.rs
@@ -97,11 +97,6 @@ fn process_fluff_phase(
 	// Take a write lock on the txpool for the duration of this processing.
 	let mut tx_pool = tx_pool.write();

-	// If we are expired then look for all txs.
-	// If not expired then -
-	//   if multiple txs then look for all txs
-	//   if a single tx then wait 30s
-
 	let all_entries = select_txs_cutoff(&tx_pool.stempool, 0);
 	if all_entries.is_empty() {
 		return Ok(());
@@ -112,15 +107,10 @@
 		.expect("aggregation secs config missing");
 	let cutoff_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs);

-	if adapter.is_expired() {
-		// If epoch is expired, fluff *all* outstanding entries in stempool.
-	} else if cutoff_entries.len() > 0 {
-		// If *any* entry older than aggregation_secs (30s) then fluff *all* entries.
-		// If we only have a single tx in the stempool and it is older
-		// than aggregation_secs we will fluff it without aggregation,
-		// but we must broadcast the tx in a timely manner.
-	} else {
-		// Give any fresh txs time to aggregate, do nothing this time.
+	// If epoch is expired, fluff *all* outstanding entries in stempool.
+	// If *any* entry older than aggregation_secs (30s) then fluff *all* entries.
+	// Otherwise we are done for now and we can give txs more time to aggregate.
+	if !adapter.is_expired() && cutoff_entries.is_empty() {
 		return Ok(());
 	}

@@ -128,9 +118,9 @@

 	let fluffable_txs = {
 		let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?;
-		let txs: Vec<_> = all_entries.iter().map(|x| x.tx.clone()).collect();
+		let txs: Vec<_> = all_entries.into_iter().map(|x| x.tx).collect();
 		tx_pool.stempool.validate_raw_txs(
-			txs,
+			&txs,
 			txpool_tx,
 			&header,
 			transaction::Weighting::NoLimit,

From 323992e60f73035bd16377c6bcab2e4bdb9c90be Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Thu, 14 Mar 2019 12:27:48 +0000
Subject: [PATCH 17/22] fix off by one

---
 servers/src/common/types.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs
index 3514cf6422..3190e465cf 100644
--- a/servers/src/common/types.rs
+++ b/servers/src/common/types.rs
@@ -474,7 +474,7 @@
 			.config
 			.stem_probability
 			.expect("stem_probability config missing");
-		self.is_stem = rng.gen_range(0, 101) < stem_probability;
+		self.is_stem = rng.gen_range(0, 100) < stem_probability;

 		let addr = self.relay_peer.clone().map(|p| p.info.addr);
 		info!(

From 63c2a9ba3ecf469156c040ec9185489653bf4aa3 Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Thu, 14 Mar 2019 12:41:16 +0000
Subject: [PATCH 18/22] cleanup

---
 pool/src/transaction_pool.rs | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs
index 531ed65978..f835162dea 100644
--- a/pool/src/transaction_pool.rs
+++ b/pool/src/transaction_pool.rs
@@ -157,18 +157,12 @@ impl TransactionPool {
 			tx,
 		};

+		// If not stem then we are fluff.
 		// If this is a stem tx then attempt to stem.
 		// Any problems fallback to fluff later.
-		// If not stem then we are fluff.
-		let fluff = if stem {
-			// Attempt to add to stempool, notifying adapter to relay to outbound peer.
-			// Fall back to fluffing (via txpool) if stempool interaction fails.
-			self.add_to_stempool(entry.clone(), header)
-				.and_then(|_| self.adapter.stem_tx_accepted(&entry.tx))
-				.is_err()
-		} else {
-			true
-		};
+		let fluff = !stem || self.add_to_stempool(entry.clone(), header)
+			.and_then(|_| self.adapter.stem_tx_accepted(&entry.tx))
+			.is_err();

 		// Fluff (broadcast) the tx if tagged as fluff or if
 		// stemming failed for any reason.
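// ---------------------------------------------------------------------------
// Editorial aside -- an illustrative sketch, not part of the patch series.
// After PATCH 16 the fluff phase reduces to a single early-return guard:
// fluff the *entire* stempool when the epoch has expired, or when any entry
// has waited longer than aggregation_secs; otherwise do nothing and give
// fresh txs more time to aggregate. The same rule as a standalone predicate
// over entry ages (hypothetical helper, simplified types):
fn should_fluff_all(epoch_expired: bool, entry_ages_secs: &[u64], aggregation_secs: u64) -> bool {
    epoch_expired || entry_ages_secs.iter().any(|age| *age > aggregation_secs)
}
// For example, should_fluff_all(false, &[5, 40], 30) == true: the 40s-old
// entry drags the fresh 5s-old one along with it, maximizing aggregation.
// ---------------------------------------------------------------------------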
From 0d27c0d922a71beb66587d64b93407199db84dca Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Thu, 14 Mar 2019 12:45:16 +0000
Subject: [PATCH 19/22] cleanup

---
 pool/src/transaction_pool.rs | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs
index f835162dea..66a41a2b58 100644
--- a/pool/src/transaction_pool.rs
+++ b/pool/src/transaction_pool.rs
@@ -159,14 +159,13 @@ impl TransactionPool {
 		// If not stem then we are fluff.
 		// If this is a stem tx then attempt to stem.
-		// Any problems fallback to fluff later.
-		let fluff = !stem || self.add_to_stempool(entry.clone(), header)
-			.and_then(|_| self.adapter.stem_tx_accepted(&entry.tx))
-			.is_err();
-
-		// Fluff (broadcast) the tx if tagged as fluff or if
-		// stemming failed for any reason.
-		if fluff {
+		// Any problems during stem, fallback to fluff.
+		if !stem
+			|| self
+				.add_to_stempool(entry.clone(), header)
+				.and_then(|_| self.adapter.stem_tx_accepted(&entry.tx))
+				.is_err()
+		{
 			self.add_to_txpool(entry.clone(), header)?;
 			self.add_to_reorg_cache(entry.clone());
 			self.adapter.tx_accepted(&entry.tx);

From 5dff54d8f1578691a848f5dd1b3f7705dbf325a7 Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Tue, 19 Mar 2019 16:47:11 +0000
Subject: [PATCH 20/22] various fixes

---
 pool/src/types.rs                     | 24 ++++++++++++------------
 servers/src/grin/dandelion_monitor.rs |  4 ++--
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/pool/src/types.rs b/pool/src/types.rs
index 3ece2dfcf3..20397a514e 100644
--- a/pool/src/types.rs
+++ b/pool/src/types.rs
@@ -28,16 +28,16 @@ use grin_core as core;
 use grin_keychain as keychain;

 /// Dandelion "epoch" length.
-const DANDELION_EPOCH_SECS: u64 = 600;
+const DANDELION_EPOCH_SECS: u16 = 600;

 /// Dandelion embargo timer.
-const DANDELION_EMBARGO_SECS: u64 = 180;
+const DANDELION_EMBARGO_SECS: u16 = 180;

 /// Dandelion aggregation timer.
-const DANDELION_AGGREGATION_SECS: u64 = 30;
+const DANDELION_AGGREGATION_SECS: u16 = 30;

 /// Dandelion stem probability (stem 90% of the time, fluff 10%).
-const DANDELION_STEM_PROBABILITY: u64 = 90;
+const DANDELION_STEM_PROBABILITY: u8 = 90;

 /// Configuration for "Dandelion".
 /// Note: shared between p2p and pool.
@@ -45,17 +45,17 @@ const DANDELION_STEM_PROBABILITY: u8 = 90;
 pub struct DandelionConfig {
 	/// Length of each "epoch".
 	#[serde(default = "default_dandelion_epoch_secs")]
-	pub epoch_secs: Option<u64>,
+	pub epoch_secs: Option<u16>,
 	/// Dandelion embargo timer. Fluff and broadcast individual txs if not seen
 	/// on network before embargo expires.
 	#[serde(default = "default_dandelion_embargo_secs")]
-	pub embargo_secs: Option<u64>,
+	pub embargo_secs: Option<u16>,
 	/// Dandelion aggregation timer.
 	#[serde(default = "default_dandelion_aggregation_secs")]
-	pub aggregation_secs: Option<u64>,
+	pub aggregation_secs: Option<u16>,
 	/// Dandelion stem probability (stem 90% of the time, fluff 10% etc.)
 	#[serde(default = "default_dandelion_stem_probability")]
-	pub stem_probability: Option<u64>,
+	pub stem_probability: Option<u8>,
 }

 impl Default for DandelionConfig {
@@ -69,19 +69,19 @@ impl Default for DandelionConfig {
 	}
 }

-fn default_dandelion_epoch_secs() -> Option<u64> {
+fn default_dandelion_epoch_secs() -> Option<u16> {
 	Some(DANDELION_EPOCH_SECS)
 }

-fn default_dandelion_embargo_secs() -> Option<u64> {
+fn default_dandelion_embargo_secs() -> Option<u16> {
 	Some(DANDELION_EMBARGO_SECS)
 }

-fn default_dandelion_aggregation_secs() -> Option<u64> {
+fn default_dandelion_aggregation_secs() -> Option<u16> {
 	Some(DANDELION_AGGREGATION_SECS)
 }

-fn default_dandelion_stem_probability() -> Option<u64> {
+fn default_dandelion_stem_probability() -> Option<u8> {
 	Some(DANDELION_STEM_PROBABILITY)
 }

diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs
index 2837eb3f79..665874298a 100644
--- a/servers/src/grin/dandelion_monitor.rs
+++ b/servers/src/grin/dandelion_monitor.rs
@@ -79,7 +79,7 @@ pub fn monitor_transactions(

 // Query the pool for transactions older than the cutoff.
 // Used for both periodic fluffing and handling expired embargo timer.
-fn select_txs_cutoff(pool: &Pool, cutoff_secs: u64) -> Vec<PoolEntry> {
+fn select_txs_cutoff(pool: &Pool, cutoff_secs: u16) -> Vec<PoolEntry> {
 	let cutoff = Utc::now().timestamp() - cutoff_secs as i64;
 	pool.entries
 		.iter()
@@ -97,7 +97,7 @@ fn process_fluff_phase(
 	// Take a write lock on the txpool for the duration of this processing.
 	let mut tx_pool = tx_pool.write();

-	let all_entries = select_txs_cutoff(&tx_pool.stempool, 0);
+	let all_entries = tx_pool.stempool.entries.clone();
 	if all_entries.is_empty() {
 		return Ok(());
 	}

From d8d7d996040576dedaaac5e6af243037f311b5ad Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Tue, 19 Mar 2019 16:49:45 +0000
Subject: [PATCH 21/22] one less clone

---
 pool/src/transaction_pool.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs
index 66a41a2b58..f0e0d81de4 100644
--- a/pool/src/transaction_pool.rs
+++ b/pool/src/transaction_pool.rs
@@ -74,10 +74,10 @@ impl TransactionPool {
 		self.blockchain.chain_head()
 	}

+	// Add tx to stempool (passing in all txs from txpool to validate against).
 	fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> {
-		// Add tx to stempool (passing in all txs from txpool to validate against).
 		self.stempool
-			.add_to_pool(entry.clone(), self.txpool.all_transactions(), header)?;
+			.add_to_pool(entry, self.txpool.all_transactions(), header)?;
 		Ok(())
 	}

From d49bc62a68a390e1018b4ec3db647f6694b154e0 Mon Sep 17 00:00:00 2001
From: antiochp <30642645+antiochp@users.noreply.github.com>
Date: Tue, 19 Mar 2019 16:59:05 +0000
Subject: [PATCH 22/22] cleanup

---
 servers/src/common/types.rs | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs
index cd559a94fe..374bddef29 100644
--- a/servers/src/common/types.rs
+++ b/servers/src/common/types.rs
@@ -468,13 +468,13 @@ impl DandelionEpoch {

 	/// Is the current Dandelion epoch expired?
 	/// It is expired if start_time is older than the configured epoch_secs.
 	pub fn is_expired(&self) -> bool {
-		let expired = if let Some(start_time) = self.start_time {
-			let epoch_secs = self.config.epoch_secs.expect("epoch_secs config missing") as i64;
-			Utc::now().timestamp().saturating_sub(start_time) > epoch_secs
-		} else {
-			true
-		};
-		expired
+		match self.start_time {
+			None => true,
+			Some(start_time) => {
+				let epoch_secs = self.config.epoch_secs.expect("epoch_secs config missing") as i64;
+				Utc::now().timestamp().saturating_sub(start_time) > epoch_secs
+			}
+		}
 	}

 	/// Transition to next Dandelion epoch.
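// ---------------------------------------------------------------------------
// Editorial aside -- an illustrative sketch, not part of the patch series.
// The final form of `is_expired` above treats a missing start_time (a node
// that has never run next_epoch) as already expired, which guarantees the
// first caller triggers an epoch transition. A standalone equivalent of that
// logic, with the config lookup reduced to a plain parameter:
use chrono::Utc;

fn is_expired(start_time: Option<i64>, epoch_secs: i64) -> bool {
    match start_time {
        // No epoch has ever started: force the caller to start one.
        None => true,
        // Otherwise compare elapsed wall-clock seconds against the epoch length.
        Some(start) => Utc::now().timestamp().saturating_sub(start) > epoch_secs,
    }
}
// ---------------------------------------------------------------------------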