diff --git a/.github/workflows/ci_integration_tests_windows.yaml b/.github/workflows/ci_integration_tests_windows.yaml index c59c12078c..2ee12d3e9e 100644 --- a/.github/workflows/ci_integration_tests_windows.yaml +++ b/.github/workflows/ci_integration_tests_windows.yaml @@ -51,7 +51,7 @@ jobs: ci_integration_tests_windows: name: ci_integration_tests_windows needs: prologue - runs-on: ${{ needs.prologue.outputs.windows_runner_label }} + runs-on: windows-latest timeout-minutes: 140 steps: - uses: actions/checkout@v3 diff --git a/chain/src/init.rs b/chain/src/init.rs index 4dc9d2d919..e352ed43ab 100644 --- a/chain/src/init.rs +++ b/chain/src/init.rs @@ -2,11 +2,11 @@ //! Bootstrap InitLoadUnverified, PreloadUnverifiedBlock, ChainService and ConsumeUnverified threads. use crate::chain_service::ChainService; -use crate::consume_unverified::ConsumeUnverifiedBlocks; use crate::init_load_unverified::InitLoadUnverified; use crate::orphan_broker::OrphanBroker; use crate::preload_unverified_blocks_channel::PreloadUnverifiedBlocksChannel; use crate::utils::orphan_block_pool::OrphanBlockPool; +use crate::verify::ConsumeUnverifiedBlocks; use crate::{chain_controller::ChainController, LonelyBlockHash, UnverifiedBlock}; use ckb_channel::{self as channel, SendError}; use ckb_constant::sync::BLOCK_DOWNLOAD_WINDOW; @@ -37,7 +37,7 @@ pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController { let is_pending_verify: Arc> = Arc::new(DashSet::new()); let consumer_unverified_thread = thread::Builder::new() - .name("consume_unverified_blocks".into()) + .name("verify_blocks".into()) .spawn({ let shared = builder.shared.clone(); let is_pending_verify = Arc::clone(&is_pending_verify); diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 5ffd268222..34b61ef562 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -16,7 +16,6 @@ use std::sync::Arc; mod chain_controller; mod chain_service; -pub mod consume_unverified; mod init; mod init_load_unverified; mod orphan_broker; @@ -24,6 +23,7 @@ mod preload_unverified_blocks_channel; #[cfg(test)] mod tests; mod utils; +pub mod verify; pub use chain_controller::ChainController; use ckb_logger::{error, info}; diff --git a/chain/src/tests/find_fork.rs b/chain/src/tests/find_fork.rs index 93fa67f118..2c39cfd0ca 100644 --- a/chain/src/tests/find_fork.rs +++ b/chain/src/tests/find_fork.rs @@ -1,5 +1,5 @@ -use crate::consume_unverified::ConsumeUnverifiedBlockProcessor; use crate::utils::forkchanges::ForkChanges; +use crate::verify::ConsumeUnverifiedBlockProcessor; use crate::{start_chain_services, UnverifiedBlock}; use ckb_chain_spec::consensus::{Consensus, ProposalWindow}; use ckb_proposal_table::ProposalTable; diff --git a/chain/src/consume_unverified.rs b/chain/src/verify.rs similarity index 99% rename from chain/src/consume_unverified.rs rename to chain/src/verify.rs index cf09abdaba..1b2a007aa1 100644 --- a/chain/src/consume_unverified.rs +++ b/chain/src/verify.rs @@ -103,7 +103,7 @@ impl ConsumeUnverifiedBlocks { }, }, recv(self.stop_rx) -> _ => { - info!("consume_unverified_blocks thread received exit signal, exit now"); + info!("verify_blocks thread received exit signal, exit now"); break; } diff --git a/devtools/ci/check-relaxed.sh b/devtools/ci/check-relaxed.sh new file mode 100755 index 0000000000..8f5cab29c4 --- /dev/null +++ b/devtools/ci/check-relaxed.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +set -euo pipefail + +case "$OSTYPE" in + darwin*) + if ! type gsed &>/dev/null || ! type ggrep &>/dev/null; then + echo "GNU sed and grep not found! You can install via Homebrew" >&2 + echo >&2 + echo " brew install grep gnu-sed" >&2 + exit 1 + fi + + SED=gsed + GREP=ggrep + ;; + *) + SED=sed + GREP=grep + ;; +esac + +function main() { + local res=$(find ./ -not -path '*/target/*' -type f -name "*.rs" | xargs grep -H "Relaxed") + + if [ -z "${res}" ]; then + echo "ok" + exit 0 + else + echo "find use Relaxed on code, please check" + + for file in ${res}; do + printf ${file} + done + + exit 1 + fi +} + +main "$@" diff --git a/devtools/ci/ci_main.sh b/devtools/ci/ci_main.sh index f3ff03a86b..33242b3035 100755 --- a/devtools/ci/ci_main.sh +++ b/devtools/ci/ci_main.sh @@ -60,6 +60,7 @@ case $GITHUB_WORKFLOW in make check-dirty-rpc-doc make check-dirty-hashes-toml devtools/ci/check-cyclic-dependencies.py + devtools/ci/check-relaxed.sh ;; ci_aarch64_build*) echo "ci_aarch64_build" diff --git a/network/src/network.rs b/network/src/network.rs index ea7a135eee..b5814d3482 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -485,7 +485,7 @@ impl NetworkState { /// Network message processing controller, default is true, if false, discard any received messages pub fn is_active(&self) -> bool { - self.active.load(Ordering::Relaxed) + self.active.load(Ordering::Acquire) } } @@ -1368,7 +1368,7 @@ impl NetworkController { /// Change active status, if set false discard any received messages pub fn set_active(&self, active: bool) { - self.network_state.active.store(active, Ordering::Relaxed); + self.network_state.active.store(active, Ordering::Release); } /// Return all connected peers' protocols info diff --git a/script/src/verify/tests/ckb_latest/features_since_v2021.rs b/script/src/verify/tests/ckb_latest/features_since_v2021.rs index 5ffea363ce..cb70fb0b4c 100644 --- a/script/src/verify/tests/ckb_latest/features_since_v2021.rs +++ b/script/src/verify/tests/ckb_latest/features_since_v2021.rs @@ -835,10 +835,12 @@ fn _check_typical_secp256k1_blake160_2_in_2_out_tx_with_state(step_cycles: Cycle let mut cycles = 0; let verifier = TransactionScriptsVerifierWithEnv::new(); let result = verifier.verify_map(script_version, &rtx, |verifier| { + #[allow(unused_assignments)] let mut init_state: Option = None; - if let VerifyResult::Suspended(state) = verifier.resumable_verify(step_cycles).unwrap() { - init_state = Some(state); + match verifier.resumable_verify(step_cycles).unwrap() { + VerifyResult::Suspended(state) => init_state = Some(state), + VerifyResult::Completed(cycle) => return Ok(cycle), } loop { @@ -948,12 +950,12 @@ fn _check_typical_secp256k1_blake160_2_in_2_out_tx_with_snap(step_cycles: Cycle) if script_version == crate::ScriptVersion::V2 { assert!( cycles >= TWO_IN_TWO_OUT_CYCLES - V2_CYCLE_BOUND, - "step_cycles {step_cycles}" + "cycles {cycles} step_cycles {step_cycles}" ); } else { assert!( cycles >= TWO_IN_TWO_OUT_CYCLES - CYCLE_BOUND, - "step_cycles {step_cycles}" + "cycles {cycles} step_cycles {step_cycles}" ); } assert_eq!(cycles, cycles_once, "step_cycles {step_cycles}"); diff --git a/shared/src/shared.rs b/shared/src/shared.rs index a6141e041d..2939230a5f 100644 --- a/shared/src/shared.rs +++ b/shared/src/shared.rs @@ -378,14 +378,14 @@ impl Shared { /// Return whether chain is in initial block download pub fn is_initial_block_download(&self) -> bool { // Once this function has returned false, it must remain false. - if self.ibd_finished.load(Ordering::Relaxed) { + if self.ibd_finished.load(Ordering::Acquire) { false } else if unix_time_as_millis().saturating_sub(self.snapshot().tip_header().timestamp()) > MAX_TIP_AGE { true } else { - self.ibd_finished.store(true, Ordering::Relaxed); + self.ibd_finished.store(true, Ordering::Release); false } } diff --git a/shared/src/types/header_map/kernel_lru.rs b/shared/src/types/header_map/kernel_lru.rs index 46dba8eb35..c82404658e 100644 --- a/shared/src/types/header_map/kernel_lru.rs +++ b/shared/src/types/header_map/kernel_lru.rs @@ -157,7 +157,7 @@ where self.stats().tick_primary_delete(); } // If IBD is not finished, don't shrink memory map - let allow_shrink_to_fit = self.ibd_finished.load(Ordering::Relaxed); + let allow_shrink_to_fit = self.ibd_finished.load(Ordering::Acquire); self.memory.remove(hash, allow_shrink_to_fit); if self.backend.is_empty() { return; @@ -175,7 +175,7 @@ where }); // If IBD is not finished, don't shrink memory map - let allow_shrink_to_fit = self.ibd_finished.load(Ordering::Relaxed); + let allow_shrink_to_fit = self.ibd_finished.load(Ordering::Acquire); self.memory .remove_batch(values.iter().map(|value| value.hash()), allow_shrink_to_fit); } diff --git a/sync/src/tests/types.rs b/sync/src/tests/types.rs index a6f11e9b44..e41c0827d1 100644 --- a/sync/src/tests/types.rs +++ b/sync/src/tests/types.rs @@ -8,7 +8,10 @@ use ckb_types::{ use rand::{thread_rng, Rng}; use std::{ collections::{BTreeMap, HashMap}, - sync::atomic::{AtomicUsize, Ordering::Relaxed}, + sync::atomic::{ + AtomicUsize, + Ordering::{Acquire, SeqCst}, + }, }; use crate::types::{TtlFilter, FILTER_TTL}; @@ -64,7 +67,7 @@ fn test_get_ancestor_use_skip_list() { 0, b, |hash, _| { - count.fetch_add(1, Relaxed); + count.fetch_add(1, SeqCst); header_map.get(hash).cloned() }, |_, _| None, @@ -72,7 +75,7 @@ fn test_get_ancestor_use_skip_list() { .unwrap(); // Search must finished in steps - assert!(count.load(Relaxed) <= limit); + assert!(count.load(Acquire) <= limit); header }; diff --git a/test/src/main.rs b/test/src/main.rs index d6a8187e70..038b947452 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -491,6 +491,7 @@ fn all_specs() -> Vec> { Box::new(RbfReplaceProposedSuccess), Box::new(RbfConcurrency), Box::new(RbfCellDepsCheck), + Box::new(RbfCyclingAttack), Box::new(CompactBlockEmpty), Box::new(CompactBlockEmptyParentUnknown), Box::new(CompactBlockPrefilled), diff --git a/test/src/node.rs b/test/src/node.rs index e7f906a289..ad25d7fe89 100644 --- a/test/src/node.rs +++ b/test/src/node.rs @@ -38,6 +38,16 @@ pub(crate) struct ProcessGuard { pub killed: bool, } +impl ProcessGuard { + pub(crate) fn is_alive(&mut self) -> bool { + let try_wait = self.child.try_wait(); + match try_wait { + Ok(status_op) => status_op.is_none(), + Err(_err) => false, + } + } +} + impl Drop for ProcessGuard { fn drop(&mut self) { if !self.killed { @@ -738,6 +748,15 @@ impl Node { g.take() } + pub(crate) fn is_alive(&mut self) -> bool { + let mut g = self.inner.guard.write().unwrap(); + if let Some(guard) = g.as_mut() { + guard.is_alive() + } else { + false + } + } + pub fn stop(&mut self) { drop(self.take_guard()); } diff --git a/test/src/rpc.rs b/test/src/rpc.rs index 2f91be7e83..bb4b9a81dc 100644 --- a/test/src/rpc.rs +++ b/test/src/rpc.rs @@ -174,13 +174,24 @@ impl RpcClient { } pub fn wait_rpc_ready(&self) { + self.wait_rpc_ready_internal(|| { + panic!("wait rpc ready timeout"); + }); + } + + pub fn wait_rpc_ready_internal(&self, fail: F) -> bool + where + F: Fn(), + { let now = std::time::Instant::now(); while self.inner.get_tip_block_number().is_err() { std::thread::sleep(std::time::Duration::from_millis(100)); if now.elapsed() > std::time::Duration::from_secs(60) { - panic!("wait rpc ready timeout"); + fail(); + return false; } } + true } pub fn get_block_template( diff --git a/test/src/specs/fault_injection/randomly_kill.rs b/test/src/specs/fault_injection/randomly_kill.rs index 92c6101805..e3f42a67d0 100644 --- a/test/src/specs/fault_injection/randomly_kill.rs +++ b/test/src/specs/fault_injection/randomly_kill.rs @@ -11,8 +11,23 @@ impl Spec for RandomlyKill { fn run(&self, nodes: &mut Vec) { let mut rng = thread_rng(); let node = &mut nodes[0]; - for _ in 0..rng.gen_range(10..20) { - node.rpc_client().wait_rpc_ready(); + let max_restart_times = rng.gen_range(10..20); + + let mut node_crash_times = 0; + + let mut randomly_kill_times = 0; + while randomly_kill_times < max_restart_times { + node.rpc_client().wait_rpc_ready_internal(|| {}); + + if !node.is_alive() { + node.start(); + node_crash_times += 1; + + if node_crash_times > 3 { + panic!("Node crash too many times"); + } + } + let n = rng.gen_range(0..10); // TODO: the kill of child process and mining are actually sequential here // We need to find some way to so these two things in parallel. @@ -25,6 +40,7 @@ impl Spec for RandomlyKill { } info!("Stop the node"); node.stop_gracefully(); + randomly_kill_times += 1; info!("Start the node"); node.start(); } diff --git a/test/src/specs/tx_pool/replace.rs b/test/src/specs/tx_pool/replace.rs index 3d5a61c6bc..b6ef175906 100644 --- a/test/src/specs/tx_pool/replace.rs +++ b/test/src/specs/tx_pool/replace.rs @@ -1013,6 +1013,142 @@ impl Spec for RbfCellDepsCheck { } } +pub struct RbfCyclingAttack; +impl Spec for RbfCyclingAttack { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + + let initial_inputs = gen_spendable(node0, 3); + let input_a = &initial_inputs[0]; + let input_b = &initial_inputs[1]; + let input_c = &initial_inputs[2]; + + let input_c: CellInput = CellInput::new_builder() + .previous_output(input_c.out_point.clone()) + .build(); + + // Commit transaction root + let tx_a = { + let tx_a = always_success_transaction(node0, input_a); + node0.submit_transaction(&tx_a); + tx_a + }; + + let tx_b = { + let tx_b = always_success_transaction(node0, input_b); + node0.submit_transaction(&tx_b); + tx_b + }; + + let mut prev = tx_a.clone(); + // Create transaction chain, A0 -> A1 -> A2 + let mut txs_chain_a = vec![tx_a]; + for _i in 0..2 { + let input = + CellMetaBuilder::from_cell_output(prev.output(0).unwrap(), Default::default()) + .out_point(OutPoint::new(prev.hash(), 0)) + .build(); + let cur = always_success_transaction(node0, &input); + txs_chain_a.push(cur.clone()); + let _ = node0.rpc_client().send_transaction(cur.data().into()); + prev = cur.clone(); + } + + // Create transaction chain, B0 -> B1 + let mut txs_chain_b = vec![tx_b.clone()]; + let mut prev = tx_b; + for _i in 0..1 { + let input = + CellMetaBuilder::from_cell_output(prev.output(0).unwrap(), Default::default()) + .out_point(OutPoint::new(prev.hash(), 0)) + .build(); + let cur = always_success_transaction(node0, &input); + txs_chain_b.push(cur.clone()); + let _ = node0.rpc_client().send_transaction(cur.data().into()); + prev = cur.clone(); + } + let tx_b1 = txs_chain_b[1].clone(); + eprintln!("tx_b1 {:?}", tx_b1.proposal_short_id()); + + // Create a child transaction consume B0 and A1 + // A0 ---> A1 ---> A2 + // | + // ----------> B2 + // | + // B0 ---> B1 + let tx_a1 = &txs_chain_a[1]; + let tx_b0 = &txs_chain_b[0]; + + let input_a1: CellInput = CellInput::new_builder() + .previous_output(OutPoint::new(tx_a1.hash(), 0)) + .build(); + let input_b0 = CellInput::new_builder() + .previous_output(OutPoint::new(tx_b0.hash(), 0)) + .build(); + + let tx_b2_output = CellOutputBuilder::default() + .capacity(capacity_bytes!(200).pack()) + .build(); + let tx_b2 = tx_a1 + .as_advanced_builder() + .set_inputs(vec![input_a1, input_b0]) + .set_outputs(vec![tx_b2_output]) + .build(); + let res = node0.rpc_client().send_transaction(tx_b2.data().into()); + eprintln!("tx_b2 {:?}", res); + + // after A2 and B1 is replaced by B2 + // A0 ---> A1 + // | + // ----------> B2 + // | + // B0 + let res = node0.rpc_client().get_transaction(tx_b2.hash()); + assert_eq!(res.tx_status.status, Status::Pending); + let res = node0.rpc_client().get_transaction(txs_chain_a[2].hash()); + assert_eq!(res.tx_status.status, Status::Rejected); + let res = node0.rpc_client().get_transaction(txs_chain_b[1].hash()); + assert_eq!(res.tx_status.status, Status::Rejected); + + // tx_b1 is still rejected + let res = node0.rpc_client().get_transaction(tx_b1.hash()); + assert_eq!(res.tx_status.status, Status::Rejected); + + // Create a new transaction A3 consume A1, it will replace B2 + let input_a1 = CellInput::new_builder() + .previous_output(OutPoint::new(tx_a1.hash(), 0)) + .build(); + let tx_a3_output = CellOutputBuilder::default() + .capacity(capacity_bytes!(100).pack()) + .build(); + let tx_a3 = tx_a1 + .as_advanced_builder() + .set_inputs(vec![input_a1, input_c]) + .set_outputs(vec![tx_a3_output]) + .build(); + let _res = node0.rpc_client().send_transaction(tx_a3.data().into()); + + // now result is: + // A0 ---> A1 -> A3 + // + // B0 -> B1 (B1 is recovered back) + // + let res = node0.rpc_client().get_transaction(tx_a3.hash()); + assert_eq!(res.tx_status.status, Status::Pending); + let res = node0.rpc_client().get_transaction(tx_b2.hash()); + assert_eq!(res.tx_status.status, Status::Rejected); + eprintln!("tx_b1 {:?}", tx_b1.proposal_short_id()); + + // B1 is expected by recovered back + let res = node0.rpc_client().get_transaction(tx_b1.hash()); + assert_eq!(res.tx_status.status, Status::Pending); + } + + fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) { + config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500); + } +} + fn run_spec_send_conflict_relay(nodes: &mut [Node]) { let node0 = &nodes[0]; let node1 = &nodes[1]; diff --git a/test/src/specs/tx_pool/send_large_cycles_tx.rs b/test/src/specs/tx_pool/send_large_cycles_tx.rs index 3ae8eada43..ce4d4ec09e 100644 --- a/test/src/specs/tx_pool/send_large_cycles_tx.rs +++ b/test/src/specs/tx_pool/send_large_cycles_tx.rs @@ -117,7 +117,7 @@ impl Spec for SendLargeCyclesTxToRelay { }); assert!(result, "node0 can't sync with node1"); - let result = wait_until(60, || { + let result = wait_until(120, || { node0 .rpc_client() .get_transaction(tx.hash()) diff --git a/tx-pool/src/pool.rs b/tx-pool/src/pool.rs index 54b2395e4c..da259d413c 100644 --- a/tx-pool/src/pool.rs +++ b/tx-pool/src/pool.rs @@ -28,6 +28,7 @@ use std::sync::Arc; const COMMITTED_HASH_CACHE_SIZE: usize = 100_000; const CONFLICTES_CACHE_SIZE: usize = 10_000; +const CONFLICTES_INPUTS_CACHE_SIZE: usize = 30_000; const MAX_REPLACEMENT_CANDIDATES: usize = 100; /// Tx-pool implementation @@ -44,6 +45,8 @@ pub struct TxPool { pub(crate) expiry: u64, // conflicted transaction cache pub(crate) conflicts_cache: lru::LruCache, + // conflicted transaction outputs cache, input -> tx_short_id + pub(crate) conflicts_outputs_cache: lru::LruCache, } impl TxPool { @@ -59,6 +62,7 @@ impl TxPool { recent_reject, expiry, conflicts_cache: LruCache::new(CONFLICTES_CACHE_SIZE), + conflicts_outputs_cache: lru::LruCache::new(CONFLICTES_INPUTS_CACHE_SIZE), } } @@ -158,6 +162,9 @@ impl TxPool { pub(crate) fn record_conflict(&mut self, tx: TransactionView) { let short_id = tx.proposal_short_id(); + for inputs in tx.input_pts_iter() { + self.conflicts_outputs_cache.put(inputs, short_id.clone()); + } self.conflicts_cache.put(short_id.clone(), tx); debug!( "record_conflict {:?} now cache size: {}", @@ -167,7 +174,11 @@ impl TxPool { } pub(crate) fn remove_conflict(&mut self, short_id: &ProposalShortId) { - self.conflicts_cache.pop(short_id); + if let Some(tx) = self.conflicts_cache.pop(short_id) { + for inputs in tx.input_pts_iter() { + self.conflicts_outputs_cache.pop(&inputs); + } + } debug!( "remove_conflict {:?} now cache size: {}", short_id, @@ -175,6 +186,19 @@ impl TxPool { ); } + pub(crate) fn get_conflicted_txs_from_inputs( + &self, + inputs: impl Iterator, + ) -> Vec { + inputs + .filter_map(|input| { + self.conflicts_outputs_cache + .peek(&input) + .and_then(|id| self.conflicts_cache.peek(id).cloned()) + }) + .collect() + } + /// Returns tx with cycles corresponding to the id. pub(crate) fn get_tx_with_cycles( &self, @@ -493,6 +517,7 @@ impl TxPool { self.snapshot = snapshot; self.committed_txs_hash_cache = LruCache::new(COMMITTED_HASH_CACHE_SIZE); self.conflicts_cache = LruCache::new(CONFLICTES_CACHE_SIZE); + self.conflicts_outputs_cache = lru::LruCache::new(CONFLICTES_INPUTS_CACHE_SIZE); } pub(crate) fn package_proposals( diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 4ca169c2c1..2594901784 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -132,26 +132,11 @@ impl TxPoolService { time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?; } - // try to remove conflicted tx here - for id in conflicts.iter() { - let removed = tx_pool.pool_map.remove_entry_and_descendants(id); - for old in removed { - debug!( - "remove conflict tx {} for RBF by new tx {}", - old.transaction().hash(), - entry.transaction().hash() - ); - let reject = Reject::RBFRejected(format!( - "replaced by tx {}", - entry.transaction().hash() - )); - // RBF replace successfully, put old transactions into conflicts pool - tx_pool.record_conflict(old.transaction().clone()); - // after removing old tx from tx_pool, we call reject callbacks manually - self.callbacks.call_reject(tx_pool, &old, reject); - } - } + let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts); let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?; + + // in a corner case, a tx with lower fee rate may be rejected immediately + // after inserting into pool, return proper reject error here for evict in evicted { let reject = Reject::Invalidated(format!( "invalidated by tx {}", @@ -159,12 +144,23 @@ impl TxPoolService { )); self.callbacks.call_reject(tx_pool, &evict, reject); } + tx_pool.remove_conflict(&entry.proposal_short_id()); - // in a corner case, a tx with lower fee rate may be rejected immediately - // after inserting into pool, return proper reject error here tx_pool .limit_size(&self.callbacks, Some(&entry.proposal_short_id())) .map_or(Ok(()), Err)?; + + if !may_recovered_txs.is_empty() { + let self_clone = self.clone(); + tokio::spawn(async move { + // push the recovered txs back to verify queue, so that they can be verified and submitted again + let mut queue = self_clone.verify_queue.write().await; + for tx in may_recovered_txs { + debug!("recover back: {:?}", tx.proposal_short_id()); + let _ = queue.add_tx(tx, None); + } + }); + } Ok(()) }) .await; @@ -200,6 +196,55 @@ impl TxPoolService { } } + // try to remove conflicted tx here, the returned txs can be re-verified and re-submitted + // since they maybe not conflicted anymore + fn process_rbf( + &self, + tx_pool: &mut TxPool, + entry: &TxEntry, + conflicts: &HashSet, + ) -> Vec { + let mut may_recovered_txs = vec![]; + let mut available_inputs = HashSet::new(); + + if conflicts.is_empty() { + return may_recovered_txs; + } + + let all_removed: Vec<_> = conflicts + .iter() + .flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id)) + .collect(); + + available_inputs.extend( + all_removed + .iter() + .flat_map(|removed| removed.transaction().input_pts_iter()), + ); + + for input in entry.transaction().input_pts_iter() { + available_inputs.remove(&input); + } + + may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter()); + for old in all_removed { + debug!( + "remove conflict tx {} for RBF by new tx {}", + old.transaction().hash(), + entry.transaction().hash() + ); + let reject = + Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash())); + + // RBF replace successfully, put old transactions into conflicts pool + tx_pool.record_conflict(old.transaction().clone()); + // after removing old tx from tx_pool, we call reject callbacks manually + self.callbacks.call_reject(tx_pool, &old, reject); + } + assert!(!may_recovered_txs.contains(entry.transaction())); + may_recovered_txs + } + pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool { let queue = self.verify_queue.read().await; queue.contains_key(&tx.proposal_short_id()) diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index 83621e465e..6645550af8 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -177,13 +177,13 @@ macro_rules! send_notify { impl TxPoolController { /// Return whether tx-pool service is started pub fn service_started(&self) -> bool { - self.started.load(Ordering::Relaxed) + self.started.load(Ordering::Acquire) } /// Set tx-pool service started, should only used for test #[cfg(feature = "internal")] pub fn set_service_started(&self, v: bool) { - self.started.store(v, Ordering::Relaxed); + self.started.store(v, Ordering::Release); } /// Return reference of tokio runtime handle @@ -658,7 +658,7 @@ impl TxPoolServiceBuilder { } } }); - self.started.store(true, Ordering::Relaxed); + self.started.store(true, Ordering::Release); if let Err(err) = self.tx_pool_controller.load_persisted_data(txs) { error!("Failed to import persistent txs, cause: {}", err); } @@ -1117,10 +1117,10 @@ impl TxPoolService { } pub fn after_delay(&self) -> bool { - self.after_delay.load(Ordering::Relaxed) + self.after_delay.load(Ordering::Acquire) } pub fn set_after_delay_true(&self) { - self.after_delay.store(true, Ordering::Relaxed); + self.after_delay.store(true, Ordering::Release); } } diff --git a/util/app-config/src/tests/bats_tests/cli_test.sh b/util/app-config/src/tests/bats_tests/cli_test.sh index 25ee673c26..bd5fc1318c 100755 --- a/util/app-config/src/tests/bats_tests/cli_test.sh +++ b/util/app-config/src/tests/bats_tests/cli_test.sh @@ -9,6 +9,25 @@ function cleanup { rm -rf ${CKB_BATS_TESTBED} } +git_clone_repo_with_retry() { + local branch=$1 + local repo_address=$2 + local dir_name=$3 + local retry_count=5 + local retry_delay=5 + + for i in $(seq 1 $retry_count); do + git clone --depth 1 --branch "$branch" "$repo_address" "$dir_name" && break + echo "Attempt $i failed. Retrying in $retry_delay seconds..." + sleep $retry_delay + done + + if [ $i -eq $retry_count ]; then + echo "Failed to clone repository after $retry_count attempts." + exit 1 + fi +} + trap cleanup EXIT cp target/prod/ckb ${CKB_BATS_TESTBED} @@ -17,24 +36,25 @@ cp -r util/app-config/src/tests/bats_tests/later_bats_job ${CKB_BATS_TESTBED} cp util/app-config/src/tests/bats_tests/*.sh ${CKB_BATS_TESTBED} if [ ! -d "/tmp/ckb_bats_assets/" ]; then - git clone --depth=1 https://github.com/nervosnetwork/ckb-assets /tmp/ckb_bats_assets + git_clone_repo_with_retry "main" "https://github.com/nervosnetwork/ckb-assets" "/tmp/ckb_bats_assets" fi cp /tmp/ckb_bats_assets/cli_bats_env/ckb_mainnet_4000.json ${CKB_BATS_TESTBED} CKB_BATS_CORE_DIR=/tmp/ckb_bats_core if [ ! -d "${CKB_BATS_CORE_DIR}/bats" ]; then - git clone --depth 1 --branch v1.9.0 https://github.com/bats-core/bats-core.git ${CKB_BATS_CORE_DIR}/bats - ${CKB_BATS_CORE_DIR}/bats/install.sh /tmp/ckb_bats_bin/tmp_install + git_clone_repo_with_retry "v1.9.0" "https://github.com/bats-core/bats-core.git" "${CKB_BATS_CORE_DIR}/bats" + ${CKB_BATS_CORE_DIR}/bats/install.sh /tmp/ckb_bats_bin/tmp_install fi if [ ! -d "${CKB_BATS_CORE_DIR}/bats-support" ]; then - git clone --depth 1 --branch v0.3.0 https://github.com/bats-core/bats-support.git ${CKB_BATS_CORE_DIR}/bats-support + git_clone_repo_with_retry "v0.3.0" "https://github.com/bats-core/bats-support.git" "${CKB_BATS_CORE_DIR}/bats-support" fi bash ${CKB_BATS_CORE_DIR}/bats-support/load.bash if [ ! -d "${CKB_BATS_CORE_DIR}/bats-assert" ]; then - git clone --depth 1 --branch v2.1.0 https://github.com/bats-core/bats-assert.git ${CKB_BATS_CORE_DIR}/bats-assert + git_clone_repo_with_retry "v2.1.0" "https://github.com/bats-core/bats-assert.git" "${CKB_BATS_CORE_DIR}/bats-assert" fi + bash ${CKB_BATS_CORE_DIR}/bats-assert/load.bash cd ${CKB_BATS_TESTBED} diff --git a/util/systemtime/src/lib.rs b/util/systemtime/src/lib.rs index 3f845bb11b..aed840fa23 100644 --- a/util/systemtime/src/lib.rs +++ b/util/systemtime/src/lib.rs @@ -58,14 +58,14 @@ impl FaketimeGuard { /// Set faketime #[cfg(feature = "enable_faketime")] pub fn set_faketime(&self, time: u64) { - FAKETIME.store(time, Ordering::Relaxed); + FAKETIME.store(time, Ordering::Release); FAKETIME_ENABLED.store(true, Ordering::SeqCst); } /// Disable faketime #[cfg(feature = "enable_faketime")] pub fn disable_faketime(&self) { - FAKETIME_ENABLED.store(false, Ordering::Relaxed); + FAKETIME_ENABLED.store(false, Ordering::Release); } }