From 3bd402b99b2ad8ec77f061f76d5f831cacafed54 Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Sat, 1 Jan 2022 21:10:41 +0800 Subject: [PATCH 1/4] fix: resolve conflict descendants pending transactions --- test/src/main.rs | 1 + test/src/specs/tx_pool/collision.rs | 54 +++++++++- tx-pool/src/component/pending.rs | 137 ++++++++++++++++-------- tx-pool/src/component/proposed.rs | 8 +- tx-pool/src/component/tests/pending.rs | 39 +++++-- tx-pool/src/component/tests/proposed.rs | 11 +- tx-pool/src/pool.rs | 22 ++-- tx-pool/src/process.rs | 19 +--- 8 files changed, 193 insertions(+), 98 deletions(-) diff --git a/test/src/main.rs b/test/src/main.rs index 65ee69b4c0..4a3e8cf78a 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -493,6 +493,7 @@ fn all_specs() -> Vec> { Box::new(ConflictInPending), Box::new(ConflictInGap), Box::new(ConflictInProposed), + Box::new(RemoveConflictFromPending), Box::new(SubmitConflict), Box::new(DAOVerify), Box::new(AvoidDuplicatedProposalsWithUncles), diff --git a/test/src/specs/tx_pool/collision.rs b/test/src/specs/tx_pool/collision.rs index 467aaf677e..ba857acecd 100644 --- a/test/src/specs/tx_pool/collision.rs +++ b/test/src/specs/tx_pool/collision.rs @@ -1,8 +1,11 @@ -use crate::util::{check::is_transaction_committed, mining::mine}; +use crate::util::{ + check::{is_transaction_committed, is_transaction_pending, is_transaction_unknown}, + mining::mine, +}; use crate::utils::{assert_send_transaction_fail, blank, commit, propose}; use crate::{Node, Spec}; use ckb_types::bytes::Bytes; -use ckb_types::core::{Capacity, TransactionView}; +use ckb_types::core::{capacity_bytes, Capacity, TransactionView}; use ckb_types::prelude::*; // Convention: @@ -145,14 +148,49 @@ impl Spec for SubmitConflict { } } -fn conflict_transactions(node: &Node) -> (TransactionView, TransactionView) { +pub struct RemoveConflictFromPending; + +impl Spec for RemoveConflictFromPending { + fn run(&self, nodes: &mut Vec) { + let node = &nodes[0]; + let window = node.consensus().tx_proposal_window(); + mine(node, window.farthest() + 2); + + let (txa, txb) = + conflict_transactions_with_capacity(node, Bytes::new(), capacity_bytes!(1000)); + let txc = node.new_transaction_with_since_capacity(txb.hash(), 0, capacity_bytes!(100)); + node.submit_transaction(&txa); + node.submit_transaction(&txb); + node.submit_transaction(&txc); + + assert!(is_transaction_pending(node, &txa)); + assert!(is_transaction_pending(node, &txb)); + assert!(is_transaction_pending(node, &txc)); + + node.submit_block(&propose(node, &[&txa])); + (0..window.closest()).for_each(|_| { + node.submit_block(&blank(node)); + }); + node.submit_block(&commit(node, &[&txa])); + node.wait_for_tx_pool(); + + assert!(is_transaction_committed(node, &txa)); + assert!(is_transaction_unknown(node, &txb)); + assert!(is_transaction_unknown(node, &txc)); + } +} + +fn conflict_transactions_with_capacity( + node: &Node, + output_data: Bytes, + cap: Capacity, +) -> (TransactionView, TransactionView) { let txa = node.new_transaction_spend_tip_cellbase(); - let output_data = Bytes::from(b"b0b".to_vec()); let output = txa .output(0) .unwrap() .as_builder() - .build_exact_capacity(Capacity::bytes(output_data.len()).unwrap()) + .build_exact_capacity(cap) .unwrap(); let txb = txa .as_advanced_builder() @@ -163,6 +201,12 @@ fn conflict_transactions(node: &Node) -> (TransactionView, TransactionView) { (txa, txb) } +fn conflict_transactions(node: &Node) -> (TransactionView, TransactionView) { + let output_data = Bytes::from(b"b0b".to_vec()); + let cap = Capacity::bytes(output_data.len()).unwrap(); + conflict_transactions_with_capacity(node, output_data, cap) +} + fn cousin_txs_with_same_hash_different_witness_hash( node: &Node, ) -> (TransactionView, TransactionView) { diff --git a/tx-pool/src/component/pending.rs b/tx-pool/src/component/pending.rs index aa9aff1c31..722bedaca7 100644 --- a/tx-pool/src/component/pending.rs +++ b/tx-pool/src/component/pending.rs @@ -19,10 +19,12 @@ pub(crate) struct PendingQueue { pub(crate) inner: LinkedHashMap, /// dep-set map represent in-pool tx's deps pub(crate) deps: HashMap>, - /// input-txid map represent in-pool tx's inputs - pub(crate) inputs: HashMap, + /// input-set map represent in-pool tx's inputs + pub(crate) inputs: HashMap>, /// dep-set map represent in-pool tx's header deps pub(crate) header_deps: HashMap>, + // /// output-op map represent in-pool tx's outputs + pub(crate) outputs: HashMap>, } impl PendingQueue { @@ -32,6 +34,7 @@ impl PendingQueue { deps: Default::default(), inputs: Default::default(), header_deps: Default::default(), + outputs: Default::default(), } } @@ -42,13 +45,22 @@ impl PendingQueue { pub(crate) fn add_entry(&mut self, entry: TxEntry) -> bool { let inputs = entry.transaction().input_pts_iter(); let tx_short_id = entry.proposal_short_id(); + let outputs = entry.transaction().output_pts(); if self.inner.contains_key(&tx_short_id) { return false; } for i in inputs { - self.inputs.insert(i.to_owned(), tx_short_id.clone()); + self.inputs + .entry(i.to_owned()) + .or_default() + .insert(tx_short_id.clone()); + + self.outputs + .entry(i.to_owned()) + .or_default() + .insert(tx_short_id.clone()); } // record dep-txid @@ -57,6 +69,16 @@ impl PendingQueue { .entry(d.to_owned()) .or_default() .insert(tx_short_id.clone()); + + self.outputs + .entry(d.to_owned()) + .or_default() + .insert(tx_short_id.clone()); + } + + // record tx unconsumed output + for o in outputs { + self.outputs.insert(o, HashSet::new()); } // record header_deps @@ -75,24 +97,27 @@ impl PendingQueue { let mut conflicts = Vec::new(); for i in inputs { - if let Some(id) = self.inputs.remove(&i) { - if let Some(entry) = self.remove_entry(&id) { - let reject = Reject::Resolve(OutPointError::Dead(i.clone())); - conflicts.push((entry, reject)); + if let Some(ids) = self.inputs.remove(&i) { + for id in ids { + let entries = self.remove_entry_and_descendants(&id); + for entry in entries { + let reject = Reject::Resolve(OutPointError::Dead(i.clone())); + conflicts.push((entry, reject)); + } } } // deps consumed - if let Some(x) = self.deps.remove(&i) { - for id in x { - if let Some(entry) = self.remove_entry(&id) { + if let Some(ids) = self.deps.remove(&i) { + for id in ids { + let entries = self.remove_entry_and_descendants(&id); + for entry in entries { let reject = Reject::Resolve(OutPointError::Dead(i.clone())); conflicts.push((entry, reject)); } } } } - conflicts } @@ -114,8 +139,9 @@ impl PendingQueue { } for (blk_hash, id) in ids { - if let Some(entry) = self.remove_entry(&id) { - let reject = Reject::Resolve(OutPointError::InvalidHeader(blk_hash)); + let entries = self.remove_entry_and_descendants(&id); + for entry in entries { + let reject = Reject::Resolve(OutPointError::InvalidHeader(blk_hash.to_owned())); conflicts.push((entry, reject)); } } @@ -138,54 +164,71 @@ impl PendingQueue { self.inner.get(id).map(|entry| entry.transaction()) } - pub(crate) fn remove_committed_tx( - &mut self, - tx: &TransactionView, - related_out_points: &[OutPoint], - ) -> Option { - let inputs = tx.input_pts_iter(); - let id = tx.proposal_short_id(); + pub(crate) fn remove_entry(&mut self, id: &ProposalShortId) -> Option { + let removed = self.inner.remove(id); - if let Some(entry) = self.inner.remove(&id) { - for i in inputs { - self.inputs.remove(&i); - } + if let Some(ref entry) = removed { + self.remove_entry_relation(entry); + } - for d in related_out_points { - let mut empty = false; - if let Some(x) = self.deps.get_mut(d) { - x.remove(&id); - empty = x.is_empty(); - } + removed + } - if empty { - self.deps.remove(d); + pub(crate) fn remove_entry_and_descendants(&mut self, id: &ProposalShortId) -> Vec { + let mut removed = Vec::new(); + if let Some(entry) = self.inner.remove(id) { + let descendants = self.get_descendants(&entry); + self.remove_entry_relation(&entry); + removed.push(entry); + for id in descendants { + if let Some(entry) = self.remove_entry(&id) { + removed.push(entry); } } - - self.header_deps.remove(&id); - - return Some(entry); } - None + removed } - pub(crate) fn remove_entry(&mut self, id: &ProposalShortId) -> Option { - let removed = self.inner.remove(id); + pub(crate) fn get_descendants(&self, entry: &TxEntry) -> Vec { + use std::collections::VecDeque; - if let Some(ref entry) = removed { - self.remove_entry_relation(entry); - } + let mut entries: VecDeque<&TxEntry> = VecDeque::new(); + entries.push_back(entry); - removed + let mut descendants = Vec::new(); + while let Some(entry) = entries.pop_front() { + let outputs = entry.transaction().output_pts(); + + for output in outputs { + if let Some(ids) = self.outputs.get(&output) { + descendants.extend(ids.iter().cloned()); + for id in ids { + if let Some(entry) = self.inner.get(id) { + entries.push_back(entry); + } + } + } + } + } + descendants } pub(crate) fn remove_entry_relation(&mut self, entry: &TxEntry) { let inputs = entry.transaction().input_pts_iter(); let tx_short_id = entry.proposal_short_id(); + let outputs = entry.transaction().output_pts(); for i in inputs { - self.inputs.remove(&i); + let mut empty = false; + + if let Some(ids) = self.inputs.get_mut(&i) { + ids.remove(&tx_short_id); + empty = ids.is_empty(); + } + + if empty { + self.inputs.remove(&i); + } } // remove dep @@ -200,6 +243,11 @@ impl PendingQueue { self.deps.remove(d); } } + + for o in outputs { + self.outputs.remove(&o); + } + self.header_deps.remove(&tx_short_id); } @@ -252,6 +300,7 @@ impl PendingQueue { self.deps.clear(); self.inputs.clear(); self.header_deps.clear(); + self.outputs.clear(); txs } } diff --git a/tx-pool/src/component/proposed.rs b/tx-pool/src/component/proposed.rs index 3f484b9447..1d0ee793a0 100644 --- a/tx-pool/src/component/proposed.rs +++ b/tx-pool/src/component/proposed.rs @@ -208,11 +208,7 @@ impl ProposedPool { removed_entries } - pub(crate) fn remove_committed_tx( - &mut self, - tx: &TransactionView, - related_out_points: &[OutPoint], - ) -> Option { + pub(crate) fn remove_committed_tx(&mut self, tx: &TransactionView) -> Option { let outputs = tx.output_pts(); let inputs = tx.input_pts_iter(); let id = tx.proposal_short_id(); @@ -232,7 +228,7 @@ impl ProposedPool { self.edges.remove_input(&i); } - for d in related_out_points { + for d in entry.related_dep_out_points() { self.edges.delete_txid_by_dep(d, &id); } diff --git a/tx-pool/src/component/tests/pending.rs b/tx-pool/src/component/tests/pending.rs index 689302014b..cea3f84627 100644 --- a/tx-pool/src/component/tests/pending.rs +++ b/tx-pool/src/component/tests/pending.rs @@ -28,6 +28,8 @@ fn test_basic() { assert!(queue.inner.is_empty()); assert!(queue.deps.is_empty()); assert!(queue.inputs.is_empty()); + assert!(queue.header_deps.is_empty()); + assert!(queue.outputs.is_empty()); assert_eq!(txs, vec![tx1, tx2]); } @@ -70,6 +72,29 @@ fn test_resolve_conflict() { ); } +#[test] +fn test_resolve_conflict_descendants() { + let mut queue = PendingQueue::new(); + let tx1 = build_tx(vec![(&Byte32::zero(), 1)], 1); + let tx3 = build_tx(vec![(&tx1.hash(), 0)], 2); + let tx4 = build_tx(vec![(&tx3.hash(), 0)], 1); + + let tx2 = build_tx(vec![(&tx1.hash(), 0)], 1); + + let entry1 = TxEntry::dummy_resolve(tx1, MOCK_CYCLES, MOCK_FEE, MOCK_SIZE); + let entry3 = TxEntry::dummy_resolve(tx3, MOCK_CYCLES, MOCK_FEE, MOCK_SIZE); + let entry4 = TxEntry::dummy_resolve(tx4, MOCK_CYCLES, MOCK_FEE, MOCK_SIZE); + assert!(queue.add_entry(entry1)); + assert!(queue.add_entry(entry3.clone())); + assert!(queue.add_entry(entry4.clone())); + + let conflicts = queue.resolve_conflict(&tx2); + assert_eq!( + conflicts.into_iter().map(|i| i.0).collect::>(), + HashSet::from_iter(vec![entry3, entry4]) + ); +} + #[test] fn test_resolve_conflict_header_dep() { let mut queue = PendingQueue::new(); @@ -80,9 +105,12 @@ fn test_resolve_conflict_header_dep() { vec![header.clone()], 1, ); + let tx1 = build_tx(vec![(&tx.hash(), 0)], 1); let entry = TxEntry::dummy_resolve(tx, MOCK_CYCLES, MOCK_FEE, MOCK_SIZE); + let entry1 = TxEntry::dummy_resolve(tx1, MOCK_CYCLES, MOCK_FEE, MOCK_SIZE); assert!(queue.add_entry(entry.clone())); + assert!(queue.add_entry(entry1.clone())); let mut headers = HashSet::new(); headers.insert(header); @@ -90,12 +118,12 @@ fn test_resolve_conflict_header_dep() { let conflicts = queue.resolve_conflict_header_dep(&headers); assert_eq!( conflicts.into_iter().map(|i| i.0).collect::>(), - HashSet::from_iter(vec![entry]) + HashSet::from_iter(vec![entry, entry1]) ); } #[test] -fn test_remove_committed_tx() { +fn test_remove_entry() { let mut queue = PendingQueue::new(); let tx1 = build_tx(vec![(&Byte32::zero(), 1), (&h256!("0x1").pack(), 1)], 1); let header: Byte32 = h256!("0x1").pack(); @@ -106,12 +134,9 @@ fn test_remove_committed_tx() { assert!(queue.add_entry(entry1.clone())); assert!(queue.add_entry(entry2.clone())); - let related_dep1: Vec<_> = entry1.related_dep_out_points().cloned().collect(); - let related_dep2: Vec<_> = entry2.related_dep_out_points().cloned().collect(); - - let removed = queue.remove_committed_tx(&tx1, &related_dep1); + let removed = queue.remove_entry(&tx1.proposal_short_id()); assert_eq!(removed, Some(entry1)); - let removed = queue.remove_committed_tx(&tx2, &related_dep2); + let removed = queue.remove_entry(&tx2.proposal_short_id()); assert_eq!(removed, Some(entry2)); assert!(queue.inner.is_empty()); assert!(queue.deps.is_empty()); diff --git a/tx-pool/src/component/tests/proposed.rs b/tx-pool/src/component/tests/proposed.rs index ad573bc061..f143e33b05 100644 --- a/tx-pool/src/component/tests/proposed.rs +++ b/tx-pool/src/component/tests/proposed.rs @@ -69,7 +69,7 @@ fn test_add_entry() { assert_eq!(pool.edges.outputs_len(), 2); assert_eq!(pool.edges.inputs_len(), 2); - pool.remove_committed_tx(&tx1, &get_related_dep_out_points(&tx1, |_| None).unwrap()); + pool.remove_committed_tx(&tx1); assert_eq!(pool.edges.outputs_len(), 1); assert_eq!(pool.edges.inputs_len(), 1); } @@ -102,7 +102,7 @@ fn test_add_roots() { assert_eq!(pool.edges.outputs_len(), 4); assert_eq!(pool.edges.inputs_len(), 4); - pool.remove_committed_tx(&tx1, &get_related_dep_out_points(&tx1, |_| None).unwrap()); + pool.remove_committed_tx(&tx1); assert_eq!(pool.edges.outputs_len(), 3); assert_eq!(pool.edges.inputs_len(), 2); @@ -163,7 +163,7 @@ fn test_add_no_roots() { assert_eq!(pool.edges.outputs_len(), 13); assert_eq!(pool.edges.inputs_len(), 2); - pool.remove_committed_tx(&tx1, &get_related_dep_out_points(&tx1, |_| None).unwrap()); + pool.remove_committed_tx(&tx1); assert_eq!(pool.edges.outputs_len(), 10); assert_eq!(pool.edges.inputs_len(), 4); @@ -465,10 +465,7 @@ fn test_dep_group() { assert_eq!(get_deps_len(&pool, &tx2_out_point), 1); assert_eq!(get_deps_len(&pool, &tx3_out_point), 0); - pool.remove_committed_tx( - &tx3, - &get_related_dep_out_points(&tx3, &get_cell_data).unwrap(), - ); + pool.remove_committed_tx(&tx3); assert_eq!(get_deps_len(&pool, &tx1_out_point), 0); assert_eq!(get_deps_len(&pool, &tx2_out_point), 0); assert_eq!(get_deps_len(&pool, &tx3_out_point), 0); diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 54b7785bf2..14e16344d0 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -20,7 +20,7 @@ use ckb_types::{ tx_pool::{TxPoolEntryInfo, TxPoolIds}, Cycle, TransactionView, }, - packed::{Byte32, OutPoint, ProposalShortId}, + packed::{Byte32, ProposalShortId}, }; use ckb_verification::{cache::CacheEntry, TxVerifyEnv}; use faketime::unix_time_as_millis; @@ -252,12 +252,12 @@ impl TxPool { pub(crate) fn remove_committed_txs<'a>( &mut self, - txs: impl Iterator)>, + txs: impl Iterator, callbacks: &Callbacks, detached_headers: &HashSet, ) { - for (tx, related_out_points) in txs { - self.remove_committed_tx(tx, &related_out_points, callbacks); + for tx in txs { + self.remove_committed_tx(tx, callbacks); self.committed_txs_hash_cache .put(tx.proposal_short_id(), tx.hash()); @@ -284,17 +284,13 @@ impl TxPool { } } - pub(crate) fn remove_committed_tx( - &mut self, - tx: &TransactionView, - related_out_points: &[OutPoint], - callbacks: &Callbacks, - ) { + pub(crate) fn remove_committed_tx(&mut self, tx: &TransactionView, callbacks: &Callbacks) { let hash = tx.hash(); + let short_id = tx.proposal_short_id(); trace!("committed {}", hash); // try remove committed tx from proposed // proposed tx should not contain conflict, if exists just skip resolve conflict - if let Some(entry) = self.proposed.remove_committed_tx(tx, related_out_points) { + if let Some(entry) = self.proposed.remove_committed_tx(tx) { callbacks.call_committed(self, &entry) } else { let conflicts = self.proposed.resolve_conflict(tx); @@ -305,7 +301,7 @@ impl TxPool { } // pending and gap should resolve conflict no matter exists or not - if let Some(entry) = self.gap.remove_committed_tx(tx, related_out_points) { + if let Some(entry) = self.gap.remove_entry(&short_id) { callbacks.call_committed(self, &entry) } { @@ -316,7 +312,7 @@ impl TxPool { } } - if let Some(entry) = self.pending.remove_committed_tx(tx, related_out_points) { + if let Some(entry) = self.pending.remove_entry(&short_id) { callbacks.call_committed(self, &entry) } { diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index ec2eb659ed..77fe616b49 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -24,14 +24,11 @@ use ckb_snapshot::Snapshot; use ckb_store::ChainStore; use ckb_types::{ core::{ - cell::{ - get_related_dep_out_points, OverlayCellChecker, ResolveOptions, ResolvedTransaction, - TransactionsChecker, - }, + cell::{OverlayCellChecker, ResolveOptions, ResolvedTransaction, TransactionsChecker}, BlockView, Capacity, Cycle, EpochExt, HeaderView, ScriptHashType, TransactionView, UncleBlockView, Version, }, - packed::{Byte32, Bytes, CellbaseWitness, OutPoint, ProposalShortId, Script}, + packed::{Byte32, Bytes, CellbaseWitness, ProposalShortId, Script}, prelude::*, }; use ckb_util::LinkedHashSet; @@ -1383,22 +1380,12 @@ fn _update_tx_pool_for_reorg( ) { tx_pool.snapshot = Arc::clone(&snapshot); - let txs_iter = attached.iter().map(|tx| { - let get_cell_data = |out_point: &OutPoint| { - snapshot - .get_cell_data(out_point) - .map(|(data, _data_hash)| data) - }; - let related_out_points = - get_related_dep_out_points(tx, get_cell_data).expect("Get dep out points failed"); - (tx, related_out_points) - }); // NOTE: `remove_expired` will try to re-put the given expired/detached proposals into // pending-pool if they can be found within txpool. As for a transaction // which is both expired and committed at the one time(commit at its end of commit-window), // we should treat it as a committed and not re-put into pending-pool. So we should ensure // that involves `remove_committed_txs` before `remove_expired`. - tx_pool.remove_committed_txs(txs_iter, callbacks, detached_headers); + tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers); tx_pool.remove_expired(detached_proposal_id.iter()); let mut entries = Vec::new(); From c25306c390d070ca8d9b249ef3e6fc47b3397488 Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Sat, 1 Jan 2022 21:26:26 +0800 Subject: [PATCH 2/4] feat: remove expired pending --- rpc/src/error.rs | 3 + tx-pool/src/pool.rs | 23 +++++- tx-pool/src/process.rs | 96 ++++++++++++++------------ util/app-config/src/configs/tx_pool.rs | 2 + util/app-config/src/legacy/tx_pool.rs | 11 +++ util/jsonrpc-types/src/pool.rs | 4 ++ util/types/src/core/tx_pool.rs | 4 ++ 7 files changed, 98 insertions(+), 45 deletions(-) diff --git a/rpc/src/error.rs b/rpc/src/error.rs index e0ff9e17e7..fc57860bcc 100644 --- a/rpc/src/error.rs +++ b/rpc/src/error.rs @@ -108,6 +108,8 @@ pub enum RPCError { /// /// For example, a cellbase transaction is not allowed in `send_transaction` RPC. PoolRejectedMalformedTransaction = -1108, + /// (-1109): The transaction is expired from tx-pool after `expiry_hours`. + TransactionExpired = -1109, } impl RPCError { @@ -167,6 +169,7 @@ impl RPCError { Reject::DeclaredWrongCycles(..) => RPCError::PoolRejectedMalformedTransaction, Reject::Resolve(_) => RPCError::TransactionFailedToResolve, Reject::Verification(_) => RPCError::TransactionFailedToVerify, + Reject::Expiry(_) => RPCError::TransactionExpired, }; RPCError::custom_with_error(code, reject) } diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 14e16344d0..44a999707a 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -54,6 +54,8 @@ pub struct TxPool { pub(crate) snapshot: Arc, /// record recent reject pub recent_reject: Option, + // expiration milliseconds, + pub(crate) expiry: u64, } /// Transaction pool information. @@ -96,6 +98,7 @@ impl TxPool { last_txs_updated_at: Arc, ) -> TxPool { let recent_reject = build_recent_reject(&config); + let expiry = config.expiry_hours as u64 * 60 * 60 * 1000; TxPool { pending: PendingQueue::new(), gap: PendingQueue::new(), @@ -107,6 +110,7 @@ impl TxPool { config, snapshot, recent_reject, + expiry, } } @@ -324,7 +328,24 @@ impl TxPool { } } - pub(crate) fn remove_expired<'a>(&mut self, ids: impl Iterator) { + pub(crate) fn remove_expired(&mut self, callbacks: &Callbacks) { + let now_ms = faketime::unix_time_as_millis(); + let removed = self + .pending + .remove_entries_by_filter(|_id, tx_entry| now_ms > self.expiry + tx_entry.timestamp); + + for entry in removed { + let reject = Reject::Expiry(entry.timestamp); + callbacks.call_reject(self, &entry, reject); + } + } + + // remove transaction with detached proposal from gap and proposed + // try re-put to pending + pub(crate) fn remove_by_detached_proposal<'a>( + &mut self, + ids: impl Iterator, + ) { for id in ids { if let Some(entry) = self.gap.remove_entry(id) { self.add_pending(entry); diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 77fe616b49..eb6c28f1e9 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -1082,6 +1082,7 @@ impl TxPoolService { detached_proposal_id: HashSet, snapshot: Arc, ) { + let mine_mode = self.block_assembler.is_some(); let mut detached = LinkedHashSet::default(); let mut attached = LinkedHashSet::default(); @@ -1138,6 +1139,7 @@ impl TxPoolService { detached_proposal_id, snapshot, &self.callbacks, + mine_mode, ); // Updates network fork switch if required. @@ -1377,6 +1379,7 @@ fn _update_tx_pool_for_reorg( detached_proposal_id: HashSet, snapshot: Arc, callbacks: &Callbacks, + mine_mode: bool, ) { tx_pool.snapshot = Arc::clone(&snapshot); @@ -1386,62 +1389,67 @@ fn _update_tx_pool_for_reorg( // we should treat it as a committed and not re-put into pending-pool. So we should ensure // that involves `remove_committed_txs` before `remove_expired`. tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers); - tx_pool.remove_expired(detached_proposal_id.iter()); + tx_pool.remove_by_detached_proposal(detached_proposal_id.iter()); let mut entries = Vec::new(); let mut gaps = Vec::new(); + // mine mode: // pending ---> gap ----> proposed // try move gap to proposed + if mine_mode { + tx_pool.gap.remove_entries_by_filter(|id, tx_entry| { + if snapshot.proposals().contains_proposed(id) { + entries.push(( + Some(CacheEntry::completed(tx_entry.cycles, tx_entry.fee)), + tx_entry.clone(), + )); + true + } else { + false + } + }); - tx_pool.gap.remove_entries_by_filter(|id, tx_entry| { - if snapshot.proposals().contains_proposed(id) { - entries.push(( - Some(CacheEntry::completed(tx_entry.cycles, tx_entry.fee)), - tx_entry.clone(), - )); - true - } else { - false - } - }); - - tx_pool.pending.remove_entries_by_filter(|id, tx_entry| { - if snapshot.proposals().contains_proposed(id) { - entries.push(( - Some(CacheEntry::completed(tx_entry.cycles, tx_entry.fee)), - tx_entry.clone(), - )); - true - } else if snapshot.proposals().contains_gap(id) { - gaps.push(( - Some(CacheEntry::completed(tx_entry.cycles, tx_entry.fee)), - tx_entry.clone(), - )); - true - } else { - false - } - }); + tx_pool.pending.remove_entries_by_filter(|id, tx_entry| { + if snapshot.proposals().contains_proposed(id) { + entries.push(( + Some(CacheEntry::completed(tx_entry.cycles, tx_entry.fee)), + tx_entry.clone(), + )); + true + } else if snapshot.proposals().contains_gap(id) { + gaps.push(( + Some(CacheEntry::completed(tx_entry.cycles, tx_entry.fee)), + tx_entry.clone(), + )); + true + } else { + false + } + }); - for (cycles, entry) in entries { - let tx_hash = entry.transaction().hash(); - if let Err(e) = tx_pool.proposed_rtx(cycles, entry.size, entry.rtx.clone()) { - debug!("Failed to add proposed tx {}, reason: {}", tx_hash, e); - callbacks.call_reject(tx_pool, &entry, e.clone()); - } else { - callbacks.call_proposed(tx_pool, &entry, false); + for (cycles, entry) in entries { + let tx_hash = entry.transaction().hash(); + if let Err(e) = tx_pool.proposed_rtx(cycles, entry.size, entry.rtx.clone()) { + debug!("Failed to add proposed tx {}, reason: {}", tx_hash, e); + callbacks.call_reject(tx_pool, &entry, e.clone()); + } else { + callbacks.call_proposed(tx_pool, &entry, false); + } } - } - for (cycles, entry) in gaps { - debug!("tx proposed, add to gap {}", entry.transaction().hash()); - let tx_hash = entry.transaction().hash(); - if let Err(e) = tx_pool.gap_rtx(cycles, entry.size, entry.rtx.clone()) { - debug!("Failed to add tx to gap {}, reason: {}", tx_hash, e); - callbacks.call_reject(tx_pool, &entry, e.clone()); + for (cycles, entry) in gaps { + debug!("tx proposed, add to gap {}", entry.transaction().hash()); + let tx_hash = entry.transaction().hash(); + if let Err(e) = tx_pool.gap_rtx(cycles, entry.size, entry.rtx.clone()) { + debug!("Failed to add tx to gap {}, reason: {}", tx_hash, e); + callbacks.call_reject(tx_pool, &entry, e.clone()); + } } } + + // remove expired transaction from pending + tx_pool.remove_expired(callbacks); } pub fn all_inputs_is_unknown(snapshot: &Snapshot, tx: &TransactionView) -> bool { diff --git a/util/app-config/src/configs/tx_pool.rs b/util/app-config/src/configs/tx_pool.rs index 9fe6420c9d..546a4f4624 100644 --- a/util/app-config/src/configs/tx_pool.rs +++ b/util/app-config/src/configs/tx_pool.rs @@ -33,6 +33,8 @@ pub struct TxPoolConfig { /// By default, it is a subdirectory of 'tx-pool' subdirectory under the data directory. #[serde(default)] pub recent_reject: PathBuf, + /// The expiration time for pool transactions in hours + pub expiry_hours: u8, } /// Block assembler config options. diff --git a/util/app-config/src/legacy/tx_pool.rs b/util/app-config/src/legacy/tx_pool.rs index 48dae04c27..c255cffc8e 100644 --- a/util/app-config/src/legacy/tx_pool.rs +++ b/util/app-config/src/legacy/tx_pool.rs @@ -10,6 +10,8 @@ const DEFAULT_MIN_FEE_RATE: FeeRate = FeeRate::from_u64(1000); const DEFAULT_MAX_TX_VERIFY_CYCLES: Cycle = TWO_IN_TWO_OUT_CYCLES * 20; // default max ancestors count const DEFAULT_MAX_ANCESTORS_COUNT: usize = 25; +// Default expiration time for pool transactions in hours +const DEFAULT_EXPIRY_HOURS: u8 = 24; #[derive(Clone, Debug, Deserialize)] #[serde(deny_unknown_fields)] @@ -31,6 +33,8 @@ pub(crate) struct TxPoolConfig { persisted_data: PathBuf, #[serde(default)] recent_reject: PathBuf, + #[serde(default = "default_expiry_hours")] + expiry_hours: u8, } fn default_keep_rejected_tx_hashes_days() -> u8 { @@ -41,6 +45,10 @@ fn default_keep_rejected_tx_hashes_count() -> u64 { 10_000_000 } +fn default_expiry_hours() -> u8 { + DEFAULT_EXPIRY_HOURS +} + impl Default for crate::TxPoolConfig { fn default() -> Self { TxPoolConfig::default().into() @@ -62,6 +70,7 @@ impl Default for TxPoolConfig { max_ancestors_count: DEFAULT_MAX_ANCESTORS_COUNT, persisted_data: Default::default(), recent_reject: Default::default(), + expiry_hours: DEFAULT_EXPIRY_HOURS, } } } @@ -81,6 +90,7 @@ impl From for crate::TxPoolConfig { max_ancestors_count, persisted_data, recent_reject, + expiry_hours, } = input; Self { max_mem_size, @@ -92,6 +102,7 @@ impl From for crate::TxPoolConfig { keep_rejected_tx_hashes_count, persisted_data, recent_reject, + expiry_hours, } } } diff --git a/util/jsonrpc-types/src/pool.rs b/util/jsonrpc-types/src/pool.rs index 9fa52ea892..68723de000 100644 --- a/util/jsonrpc-types/src/pool.rs +++ b/util/jsonrpc-types/src/pool.rs @@ -207,6 +207,9 @@ pub enum PoolTransactionReject { /// Verification failed Verification(String), + + /// Transaction expired + Expiry(String), } impl From for PoolTransactionReject { @@ -222,6 +225,7 @@ impl From for PoolTransactionReject { Reject::DeclaredWrongCycles(..) => Self::DeclaredWrongCycles(format!("{}", reject)), Reject::Resolve(_) => Self::Resolve(format!("{}", reject)), Reject::Verification(_) => Self::Verification(format!("{}", reject)), + Reject::Expiry(_) => Self::Expiry(format!("{}", reject)), } } } diff --git a/util/types/src/core/tx_pool.rs b/util/types/src/core/tx_pool.rs index 97f57b5b3a..b8cedc7848 100644 --- a/util/types/src/core/tx_pool.rs +++ b/util/types/src/core/tx_pool.rs @@ -43,6 +43,10 @@ pub enum Reject { /// Verification failed #[error("Verification failed {0}")] Verification(Error), + + /// Expired + #[error("Expiry transaction, timestamp {0}")] + Expiry(u64), } fn is_malformed_from_verification(error: &Error) -> bool { From 3557180439fffa271889dbb287dda81c710c0570 Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Sun, 2 Jan 2022 01:39:18 +0800 Subject: [PATCH 3/4] fix: rpc enum type docs --- devtools/doc/rpc.py | 7 ++++--- rpc/README.md | 23 ++++++++++++++--------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/devtools/doc/rpc.py b/devtools/doc/rpc.py index a7f3bad8dd..2c1c6f3898 100755 --- a/devtools/doc/rpc.py +++ b/devtools/doc/rpc.py @@ -510,7 +510,7 @@ def handle_starttag(self, tag, attrs): if self.next_variant is None: if tag == 'div': attrs_dict = dict(attrs) - if 'id' in attrs_dict and attrs_dict['id'].startswith('variant.'): + if 'id' in attrs_dict and attrs_dict['id'].startswith('variant.') and ('class', 'variant small-section-header') in attrs: self.next_variant = camel_to_snake( attrs_dict['id'].split('.')[1]) elif self.variant_parser is None: @@ -521,13 +521,14 @@ def handle_starttag(self, tag, attrs): self.variant_parser.handle_starttag(tag, attrs) def handle_endtag(self, tag): - if self.variant_parser is not None: + if self.next_variant is not None and self.variant_parser is not None: self.variant_parser.handle_endtag(tag) if self.variant_parser.completed(): if self.next_variant not in [v[0] for v in self.variants]: self.variants.append((self.next_variant, self.variant_parser)) + self.variant_parser = None self.next_variant = None - self.variant_parser = None + def handle_data(self, data): if self.variant_parser is not None: diff --git a/rpc/README.md b/rpc/README.md index 562a7664ba..d0b965250e 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -3116,6 +3116,10 @@ Pool rejects a large package of chained transactions to avoid certain kinds of D For example, a cellbase transaction is not allowed in `send_transaction` RPC. +### Error `TransactionExpired` + +(-1109): The transaction is expired from tx-pool after `expiry_hours`. + ## RPC Types @@ -3725,7 +3729,7 @@ An enum to represent the two kinds of dao withdrawal amount calculation option. `DaoWithdrawingCalculationKind` is equivalent to `"withdrawing_header_hash" | "withdrawing_out_point"`. * the assumed reference block hash for withdrawing phase 1 transaction -* Returns a copy of the value. [Read more](#method-clone) +* the out point of the withdrawing phase 1 transaction ### Type `DepType` @@ -4192,19 +4196,20 @@ TX reject message `PoolTransactionReject` is a JSON object with following fields. -* `type`: `"LowFeeRate" | "ExceededMaximumAncestorsCount" | "Full" | "Duplicated" | "Malformed" | "DeclaredWrongCycles" | "Resolve" | "Verification"` - Reject type. +* `type`: `"LowFeeRate" | "ExceededMaximumAncestorsCount" | "Full" | "Duplicated" | "Malformed" | "DeclaredWrongCycles" | "Resolve" | "Verification" | "Expiry"` - Reject type. * `description`: `string` - Detailed description about why the transaction is rejected. Different reject types: * `LowFeeRate`: Transaction fee lower than config -* `ExceededMaximumAncestorsCount`: Transaction pool exceeded maximum size or cycles limit, -* `Full`: Transaction already exist in transaction_pool -* `Duplicated`: Malformed transaction -* `Malformed`: Declared wrong cycles -* `DeclaredWrongCycles`: Resolve failed -* `Resolve`: Verification failed -* `Verification`: Returns a copy of the value. [Read more](#method-clone) +* `ExceededMaximumAncestorsCount`: Transaction exceeded maximum ancestors count limit +* `Full`: Transaction pool exceeded maximum size or cycles limit, +* `Duplicated`: Transaction already exist in transaction_pool +* `Malformed`: Malformed transaction +* `DeclaredWrongCycles`: Declared wrong cycles +* `Resolve`: Resolve failed +* `Verification`: Verification failed +* `Expiry`: Transaction expired ### Type `ProposalShortId` From f382e8a7b4ce5e2927e113ad45059b551b88627f Mon Sep 17 00:00:00 2001 From: zhangsoledad <787953403@qq.com> Date: Tue, 4 Jan 2022 16:53:08 +0800 Subject: [PATCH 4/4] refactor(tx-pool): remove from nested collection --- tx-pool/src/component/pending.rs | 41 +++++++++++++++---------------- tx-pool/src/component/proposed.rs | 26 ++++++++++---------- tx-pool/src/pool.rs | 8 +++--- tx-pool/src/process.rs | 33 ++++++++++--------------- 4 files changed, 50 insertions(+), 58 deletions(-) diff --git a/tx-pool/src/component/pending.rs b/tx-pool/src/component/pending.rs index 722bedaca7..24d3aa5144 100644 --- a/tx-pool/src/component/pending.rs +++ b/tx-pool/src/component/pending.rs @@ -10,7 +10,7 @@ use ckb_types::{ prelude::*, }; use ckb_util::{LinkedHashMap, LinkedHashMapEntries}; -use std::collections::{HashMap, HashSet}; +use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque}; type ConflictEntry = (TxEntry, Reject); @@ -190,8 +190,6 @@ impl PendingQueue { } pub(crate) fn get_descendants(&self, entry: &TxEntry) -> Vec { - use std::collections::VecDeque; - let mut entries: VecDeque<&TxEntry> = VecDeque::new(); entries.push_back(entry); @@ -219,28 +217,29 @@ impl PendingQueue { let outputs = entry.transaction().output_pts(); for i in inputs { - let mut empty = false; - - if let Some(ids) = self.inputs.get_mut(&i) { - ids.remove(&tx_short_id); - empty = ids.is_empty(); - } - - if empty { - self.inputs.remove(&i); + if let Entry::Occupied(mut occupied) = self.inputs.entry(i) { + let empty = { + let ids = occupied.get_mut(); + ids.remove(&tx_short_id); + ids.is_empty() + }; + if empty { + occupied.remove(); + } } } // remove dep - for d in entry.related_dep_out_points() { - let mut empty = false; - if let Some(x) = self.deps.get_mut(d) { - x.remove(&tx_short_id); - empty = x.is_empty(); - } - - if empty { - self.deps.remove(d); + for d in entry.related_dep_out_points().cloned() { + if let Entry::Occupied(mut occupied) = self.deps.entry(d) { + let empty = { + let ids = occupied.get_mut(); + ids.remove(&tx_short_id); + ids.is_empty() + }; + if empty { + occupied.remove(); + } } } diff --git a/tx-pool/src/component/proposed.rs b/tx-pool/src/component/proposed.rs index 1d0ee793a0..7271987da6 100644 --- a/tx-pool/src/component/proposed.rs +++ b/tx-pool/src/component/proposed.rs @@ -11,7 +11,7 @@ use ckb_types::{ packed::{Byte32, CellOutput, OutPoint, ProposalShortId}, prelude::*, }; -use std::collections::{HashMap, HashSet}; +use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::iter; type ConflictEntry = (TxEntry, Reject); @@ -78,16 +78,16 @@ impl Edges { self.deps.entry(out_point).or_default().insert(txid); } - pub(crate) fn delete_txid_by_dep(&mut self, out_point: &OutPoint, txid: &ProposalShortId) { - let mut empty = false; - - if let Some(x) = self.deps.get_mut(out_point) { - x.remove(txid); - empty = x.is_empty(); - } - - if empty { - self.deps.remove(out_point); + pub(crate) fn delete_txid_by_dep(&mut self, out_point: OutPoint, txid: &ProposalShortId) { + if let Entry::Occupied(mut occupied) = self.deps.entry(out_point) { + let empty = { + let ids = occupied.get_mut(); + ids.remove(txid); + ids.is_empty() + }; + if empty { + occupied.remove(); + } } } @@ -194,7 +194,7 @@ impl ProposedPool { } } - for d in entry.related_dep_out_points() { + for d in entry.related_dep_out_points().cloned() { self.edges.delete_txid_by_dep(d, id); } @@ -228,7 +228,7 @@ impl ProposedPool { self.edges.remove_input(&i); } - for d in entry.related_dep_out_points() { + for d in entry.related_dep_out_points().cloned() { self.edges.delete_txid_by_dep(d, &id); } diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 44a999707a..54ef7f96c7 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -454,7 +454,7 @@ impl TxPool { pub(crate) fn gap_rtx( &mut self, - cache_entry: Option, + cache_entry: CacheEntry, size: usize, rtx: ResolvedTransaction, ) -> Result { @@ -471,7 +471,7 @@ impl TxPool { self.check_rtx_from_pending_and_proposed(&rtx, resolve_opts)?; let max_cycles = snapshot.consensus().max_block_cycles(); - let verified = verify_rtx(snapshot, &rtx, &tx_env, &cache_entry, max_cycles)?; + let verified = verify_rtx(snapshot, &rtx, &tx_env, &Some(cache_entry), max_cycles)?; let entry = TxEntry::new(rtx, verified.cycles, verified.fee, size); let tx_hash = entry.transaction().hash(); @@ -484,7 +484,7 @@ impl TxPool { pub(crate) fn proposed_rtx( &mut self, - cache_entry: Option, + cache_entry: CacheEntry, size: usize, rtx: ResolvedTransaction, ) -> Result { @@ -501,7 +501,7 @@ impl TxPool { self.check_rtx_from_proposed(&rtx, resolve_opts)?; let max_cycles = snapshot.consensus().max_block_cycles(); - let verified = verify_rtx(snapshot, &rtx, &tx_env, &cache_entry, max_cycles)?; + let verified = verify_rtx(snapshot, &rtx, &tx_env, &Some(cache_entry), max_cycles)?; let entry = TxEntry::new(rtx, verified.cycles, verified.fee, size); let tx_hash = entry.transaction().hash(); diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index eb6c28f1e9..9a9ce6e732 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -1383,7 +1383,7 @@ fn _update_tx_pool_for_reorg( ) { tx_pool.snapshot = Arc::clone(&snapshot); - // NOTE: `remove_expired` will try to re-put the given expired/detached proposals into + // NOTE: `remove_by_detached_proposal` will try to re-put the given expired/detached proposals into // pending-pool if they can be found within txpool. As for a transaction // which is both expired and committed at the one time(commit at its end of commit-window), // we should treat it as a committed and not re-put into pending-pool. So we should ensure @@ -1391,19 +1391,16 @@ fn _update_tx_pool_for_reorg( tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers); tx_pool.remove_by_detached_proposal(detached_proposal_id.iter()); - let mut entries = Vec::new(); - let mut gaps = Vec::new(); - // mine mode: // pending ---> gap ----> proposed // try move gap to proposed if mine_mode { + let mut entries = Vec::new(); + let mut gaps = Vec::new(); + tx_pool.gap.remove_entries_by_filter(|id, tx_entry| { if snapshot.proposals().contains_proposed(id) { - entries.push(( - Some(CacheEntry::completed(tx_entry.cycles, tx_entry.fee)), - tx_entry.clone(), - )); + entries.push(tx_entry.clone()); true } else { false @@ -1412,25 +1409,20 @@ fn _update_tx_pool_for_reorg( tx_pool.pending.remove_entries_by_filter(|id, tx_entry| { if snapshot.proposals().contains_proposed(id) { - entries.push(( - Some(CacheEntry::completed(tx_entry.cycles, tx_entry.fee)), - tx_entry.clone(), - )); + entries.push(tx_entry.clone()); true } else if snapshot.proposals().contains_gap(id) { - gaps.push(( - Some(CacheEntry::completed(tx_entry.cycles, tx_entry.fee)), - tx_entry.clone(), - )); + gaps.push(tx_entry.clone()); true } else { false } }); - for (cycles, entry) in entries { + for entry in entries { + let cached = CacheEntry::completed(entry.cycles, entry.fee); let tx_hash = entry.transaction().hash(); - if let Err(e) = tx_pool.proposed_rtx(cycles, entry.size, entry.rtx.clone()) { + if let Err(e) = tx_pool.proposed_rtx(cached, entry.size, entry.rtx.clone()) { debug!("Failed to add proposed tx {}, reason: {}", tx_hash, e); callbacks.call_reject(tx_pool, &entry, e.clone()); } else { @@ -1438,10 +1430,11 @@ fn _update_tx_pool_for_reorg( } } - for (cycles, entry) in gaps { + for entry in gaps { debug!("tx proposed, add to gap {}", entry.transaction().hash()); let tx_hash = entry.transaction().hash(); - if let Err(e) = tx_pool.gap_rtx(cycles, entry.size, entry.rtx.clone()) { + let cached = CacheEntry::completed(entry.cycles, entry.fee); + if let Err(e) = tx_pool.gap_rtx(cached, entry.size, entry.rtx.clone()) { debug!("Failed to add tx to gap {}, reason: {}", tx_hash, e); callbacks.call_reject(tx_pool, &entry, e.clone()); }