Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dandelion++ Rewrite #2628

Merged
merged 23 commits into from
Mar 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions config/src/comments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,28 +141,29 @@ fn comments() -> HashMap<String, String> {
);

retval.insert(
"relay_secs".to_string(),
"epoch_secs".to_string(),
"
#dandelion relay time (choose new relay peer every n secs)
#dandelion epoch duration
"
.to_string(),
);

retval.insert(
"embargo_secs".to_string(),
"aggregation_secs".to_string(),
"
#fluff and broadcast after embargo expires if tx not seen on network
#dandelion aggregation period in secs
"
.to_string(),
);

retval.insert(
"patience_secs".to_string(),
"embargo_secs".to_string(),
"
#run dandelion stem/fluff processing every n secs (stem tx aggregation in this window)
#fluff and broadcast after embargo expires if tx not seen on network
"
.to_string(),
);

retval.insert(
"stem_probability".to_string(),
"
Expand Down
7 changes: 7 additions & 0 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use crate::util::{Mutex, RwLock};
use std::fmt;
use std::fs::File;
use std::net::{Shutdown, TcpStream};
use std::sync::Arc;
Expand Down Expand Up @@ -54,6 +55,12 @@ pub struct Peer {
connection: Option<Mutex<conn::Tracker>>,
}

impl fmt::Debug for Peer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Peer({:?})", &self.info)
}
}

impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, adapter: Arc<dyn NetAdapter>) -> Peer {
Expand Down
55 changes: 0 additions & 55 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pub struct Peers {
pub adapter: Arc<dyn ChainAdapter>,
store: PeerStore,
peers: RwLock<HashMap<PeerAddr, Arc<Peer>>>,
dandelion_relay: RwLock<Option<(i64, Arc<Peer>)>>,
config: P2PConfig,
}

Expand All @@ -48,7 +47,6 @@ impl Peers {
store,
config,
peers: RwLock::new(HashMap::new()),
dandelion_relay: RwLock::new(None),
}
}

Expand Down Expand Up @@ -87,39 +85,6 @@ impl Peers {
self.save_peer(&peer_data)
}

// Update the dandelion relay
pub fn update_dandelion_relay(&self) {
let peers = self.outgoing_connected_peers();

let peer = &self
.config
.dandelion_peer
.and_then(|ip| peers.iter().find(|x| x.info.addr == ip))
.or(thread_rng().choose(&peers));

match peer {
Some(peer) => self.set_dandelion_relay(peer),
None => debug!("Could not update dandelion relay"),
}
}

fn set_dandelion_relay(&self, peer: &Arc<Peer>) {
// Clear the map and add new relay
let dandelion_relay = &self.dandelion_relay;
dandelion_relay
.write()
.replace((Utc::now().timestamp(), peer.clone()));
debug!(
"Successfully updated Dandelion relay to: {}",
peer.info.addr
);
}

// Get the dandelion relay
pub fn get_dandelion_relay(&self) -> Option<(i64, Arc<Peer>)> {
self.dandelion_relay.read().clone()
}

pub fn is_known(&self, addr: PeerAddr) -> bool {
self.peers.read().contains_key(&addr)
}
Expand Down Expand Up @@ -335,26 +300,6 @@ impl Peers {
);
}

/// Relays the provided stem transaction to our single stem peer.
pub fn relay_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
self.get_dandelion_relay()
.or_else(|| {
debug!("No dandelion relay, updating.");
self.update_dandelion_relay();
self.get_dandelion_relay()
})
// If still return an error, let the caller handle this as they see fit.
// The caller will "fluff" at this point as the stem phase is finished.
.ok_or(Error::NoDandelionRelay)
.map(|(_, relay)| {
if relay.is_connected() {
if let Err(e) = relay.send_stem_transaction(tx) {
debug!("Error sending stem transaction to peer relay: {:?}", e);
}
}
})
}

/// Broadcasts the provided transaction to PEER_PREFERRED_COUNT of our
/// peers. We may be connected to PEER_MAX_COUNT peers so we only
/// want to broadcast to a random subset of peers.
Expand Down
3 changes: 2 additions & 1 deletion pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ mod pool;
pub mod transaction_pool;
pub mod types;

pub use crate::pool::Pool;
pub use crate::transaction_pool::TransactionPool;
pub use crate::types::{
BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntryState, PoolError, TxSource,
BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource,
};
37 changes: 5 additions & 32 deletions pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use self::core::core::{
Block, BlockHeader, BlockSums, Committed, Transaction, TxKernel, Weighting,
};
use self::util::RwLock;
use crate::types::{BlockChain, PoolEntry, PoolEntryState, PoolError};
use crate::types::{BlockChain, PoolEntry, PoolError};
use grin_core as core;
use grin_util as util;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -139,7 +139,7 @@ impl Pool {
// Verify these txs produce an aggregated tx below max tx weight.
// Return a vec of all the valid txs.
let txs = self.validate_raw_txs(
tx_buckets,
&tx_buckets,
None,
&header,
Weighting::AsLimitedTransaction { max_weight },
Expand Down Expand Up @@ -167,33 +167,6 @@ impl Pool {
Ok(Some(tx))
}

pub fn select_valid_transactions(
&self,
txs: Vec<Transaction>,
extra_tx: Option<Transaction>,
header: &BlockHeader,
) -> Result<Vec<Transaction>, PoolError> {
let valid_txs = self.validate_raw_txs(txs, extra_tx, header, Weighting::NoLimit)?;
Ok(valid_txs)
}

pub fn get_transactions_in_state(&self, state: PoolEntryState) -> Vec<Transaction> {
self.entries
.iter()
.filter(|x| x.state == state)
.map(|x| x.tx.clone())
.collect::<Vec<_>>()
}

// Transition the specified pool entries to the new state.
pub fn transition_to_state(&mut self, txs: &[Transaction], state: PoolEntryState) {
for x in &mut self.entries {
if txs.contains(&x.tx) {
x.state = state;
}
}
}

// Aggregate this new tx with all existing txs in the pool.
// If we can validate the aggregated tx against the current chain state
// then we can safely add the tx to the pool.
Expand Down Expand Up @@ -267,9 +240,9 @@ impl Pool {
Ok(new_sums)
}

fn validate_raw_txs(
pub fn validate_raw_txs(
&self,
txs: Vec<Transaction>,
txs: &[Transaction],
extra_tx: Option<Transaction>,
header: &BlockHeader,
weighting: Weighting,
Expand All @@ -289,7 +262,7 @@ impl Pool {

// We know the tx is valid if the entire aggregate tx is valid.
if self.validate_raw_tx(&agg_tx, header, weighting).is_ok() {
valid_txs.push(tx);
valid_txs.push(tx.clone());
}
}

Expand Down
36 changes: 13 additions & 23 deletions pool/src/transaction_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use self::core::core::verifier_cache::VerifierCache;
use self::core::core::{transaction, Block, BlockHeader, Transaction, Weighting};
use self::util::RwLock;
use crate::pool::Pool;
use crate::types::{
BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource,
};
use crate::types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource};
use chrono::prelude::*;
use grin_core as core;
use grin_util as util;
Expand Down Expand Up @@ -76,13 +74,10 @@ impl TransactionPool {
self.blockchain.chain_head()
}

// Add tx to stempool (passing in all txs from txpool to validate against).
fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> {
// Add tx to stempool (passing in all txs from txpool to validate against).
self.stempool
.add_to_pool(entry, self.txpool.all_transactions(), header)?;

// Note: we do not notify the adapter here,
// we let the dandelion monitor handle this.
Ok(())
}

Expand Down Expand Up @@ -124,8 +119,6 @@ impl TransactionPool {
let txpool_tx = self.txpool.all_transactions_aggregate()?;
self.stempool.reconcile(txpool_tx, header)?;
}

self.adapter.tx_accepted(&entry.tx);
Ok(())
}

Expand Down Expand Up @@ -159,28 +152,25 @@ impl TransactionPool {
self.blockchain.verify_coinbase_maturity(&tx)?;

let entry = PoolEntry {
state: PoolEntryState::Fresh,
src,
tx_at: Utc::now(),
tx,
};

// If we are in "stem" mode then check if this is a new tx or if we have seen it before.
// If new tx - add it to our stempool.
// If we have seen any of the kernels before then fallback to fluff,
// adding directly to txpool.
if stem
&& self
.stempool
.find_matching_transactions(entry.tx.kernels())
.is_empty()
// If not stem then we are fluff.
// If this is a stem tx then attempt to stem.
// Any problems during stem, fallback to fluff.
if !stem
|| self
.add_to_stempool(entry.clone(), header)
.and_then(|_| self.adapter.stem_tx_accepted(&entry.tx))
.is_err()
quentinlesceller marked this conversation as resolved.
Show resolved Hide resolved
{
self.add_to_stempool(entry, header)?;
return Ok(());
self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry.clone());
self.adapter.tx_accepted(&entry.tx);
}

self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry);
Ok(())
}

Expand Down
Loading