Skip to content

Commit

Permalink
fluff all txs in stempool if any are older than 30s
Browse files Browse the repository at this point in the history
aggressively aggregate when we can
  • Loading branch information
antiochp committed Mar 11, 2019
1 parent e7c9a59 commit f5d1dad
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions servers/src/grin/dandelion_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ pub fn monitor_transactions(
}

if !adapter.is_stem() {
if process_fluff_phase(&dandelion_config, &tx_pool, &verifier_cache).is_err() {
if process_fluff_phase(&dandelion_config, &tx_pool, &adapter, &verifier_cache)
.is_err()
{
error!("dand_mon: Problem processing fresh pool entries.");
}
}
Expand Down Expand Up @@ -89,25 +91,44 @@ fn select_txs_cutoff(pool: &Pool, cutoff_secs: u64) -> Vec<PoolEntry> {
fn process_fluff_phase(
dandelion_config: &DandelionConfig,
tx_pool: &Arc<RwLock<TransactionPool>>,
adapter: &Arc<DandelionAdapter>,
verifier_cache: &Arc<RwLock<dyn VerifierCache>>,
) -> Result<(), PoolError> {
// Take a write lock on the txpool for the duration of this processing.
let mut tx_pool = tx_pool.write();

// if we are expired the look for all txs.
// if not expired then -
// if multiple look for all txs
// if single then wait 30s

let all_entries = select_txs_cutoff(&tx_pool.stempool, 0);
if all_entries.is_empty() {
return Ok(());
}

let cutoff_secs = dandelion_config
.aggregation_secs
.expect("aggregation secs config missing");
let fluffable_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs);

if fluffable_entries.is_empty() {
let cutoff_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs);

if adapter.is_expired() {
// If epoch is expired, fluff *all* outstanding entries in stempool.
} else if cutoff_entries.len() > 0 {
// If *any* entry older than aggregation_secs (30s) then fluff *all* entries.
// If we only have a single tx in the stempool and it is older
// than aggregation_secs we will fluff it without aggregation,
// but we must broadcast the tx in a timely manner.
} else {
// Give any fresh txs time to aggregate, do nothing this time.
return Ok(());
}

let header = tx_pool.chain_head()?;

let fluffable_txs = {
let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?;
let txs: Vec<_> = fluffable_entries.iter().map(|x| x.tx.clone()).collect();
let txs: Vec<_> = all_entries.iter().map(|x| x.tx.clone()).collect();
tx_pool.stempool.validate_raw_txs(
txs,
txpool_tx,
Expand Down

0 comments on commit f5d1dad

Please sign in to comment.