diff --git a/script/src/syscalls/pause.rs b/script/src/syscalls/pause.rs index 48044fe013..e6a81e44af 100644 --- a/script/src/syscalls/pause.rs +++ b/script/src/syscalls/pause.rs @@ -28,6 +28,8 @@ impl Syscalls for Pause { if self.skip.load(Ordering::SeqCst) { return Ok(true); } - Err(VMError::CyclesExceeded) + // FIXME(yukang): currently we change to use VMInternalError::CyclesExceeded | VMInternalError::Pause + // as a flag to Suspend, only for compatibility with old tests, maybe we need cleanup this later. + Err(VMError::Pause) } } diff --git a/script/src/syscalls/spawn.rs b/script/src/syscalls/spawn.rs index c5a8eb7548..15301bc199 100644 --- a/script/src/syscalls/spawn.rs +++ b/script/src/syscalls/spawn.rs @@ -235,7 +235,7 @@ where Ok(true) } Err(err) => { - if matches!(err, VMError::CyclesExceeded | VMError::Pause) { + if matches!(err, VMError::Pause | VMError::CyclesExceeded) { let mut context = self.context.lock().map_err(|e| { VMError::Unexpected(format!("Failed to acquire lock: {}", e)) })?; diff --git a/script/src/verify.rs b/script/src/verify.rs index 880a40b53a..0d2dff5c56 100644 --- a/script/src/verify.rs +++ b/script/src/verify.rs @@ -1353,7 +1353,7 @@ fn run_vms( } } Err(error) => match error { - VMInternalError::CyclesExceeded => { + VMInternalError::CyclesExceeded | VMInternalError::Pause => { let mut new_suspended_machines: Vec<_> = { let mut context = context.lock().map_err(|e| { ScriptError::Other(format!("Failed to acquire lock: {}", e)) @@ -1396,9 +1396,13 @@ async fn run_vms_with_signal( )); } - let pause = machines[0].pause(); + let mut pause = machines[0].pause(); let (finished_send, mut finished_recv) = mpsc::unbounded_channel::<(Result, u64)>(); + + // send initial `Resume` command to child + // it's maybe useful to set initial command to `signal.borrow().to_owned()` + // so that we can control the initial state of child, which is useful for testing purpose let (child_sender, child_recv) = watch::channel(ChunkCommand::Resume); let jh = tokio::spawn( @@ -1406,6 +1410,7 @@ async fn run_vms_with_signal( ); loop { + //eprintln!("parent wait to receive: {:?}", signal.borrow().to_owned()); tokio::select! { _ = signal.changed() => { match signal.borrow().to_owned() { @@ -1413,7 +1418,8 @@ async fn run_vms_with_signal( pause.interrupt(); } ChunkCommand::Resume => { - child_sender.send(ChunkCommand::Resume).unwrap(); + pause.free(); + let _res = child_sender.send(ChunkCommand::Resume); } } } @@ -1451,6 +1457,8 @@ async fn run_vms_child( ) { let (mut exit_code, mut cycles, mut spawn_data) = (0, 0, None); child_recv.mark_changed(); + // mark changed to make sure we can receive initial command + // and start to run immediately loop { select! { _ = child_recv.changed() => { @@ -1501,7 +1509,6 @@ async fn run_vms_child( new_suspended_machines.reverse(); machines.push(machine); machines.append(&mut new_suspended_machines); - eprintln!("suspend here: {:?}", machines.len()); // break to wait for Resume command to begin next loop iteration break; } diff --git a/script/src/verify/tests/ckb_latest/features_since_v2021.rs b/script/src/verify/tests/ckb_latest/features_since_v2021.rs index 819d247443..a9b8176bfd 100644 --- a/script/src/verify/tests/ckb_latest/features_since_v2021.rs +++ b/script/src/verify/tests/ckb_latest/features_since_v2021.rs @@ -408,6 +408,42 @@ fn check_exec_wrong_callee_format() { assert!(result.is_err()); } +#[tokio::test] +async fn async_check_exec_wrong_callee_format() { + let script_version = SCRIPT_VERSION; + + let (exec_caller_cell, exec_caller_data_hash) = + load_cell_from_path("testdata/exec_caller_from_cell_data"); + let (exec_callee_cell, _exec_caller_data_hash) = + load_cell_from_slice(&[0x00, 0x01, 0x02, 0x03]); + + let exec_caller_script = Script::new_builder() + .hash_type(script_version.data_hash_type().into()) + .code_hash(exec_caller_data_hash) + .build(); + let output = CellOutputBuilder::default() + .capacity(capacity_bytes!(100).pack()) + .lock(exec_caller_script) + .build(); + let input = CellInput::new(OutPoint::null(), 0); + + let transaction = TransactionBuilder::default().input(input).build(); + let dummy_cell = create_dummy_cell(output); + + let rtx = ResolvedTransaction { + transaction, + resolved_cell_deps: vec![exec_caller_cell, exec_callee_cell], + resolved_inputs: vec![dummy_cell], + resolved_dep_groups: vec![], + }; + + let verifier = TransactionScriptsVerifierWithEnv::new(); + let result = verifier + .verify_without_limit_async(script_version, &rtx) + .await; + assert!(result.is_err()); +} + #[test] fn check_exec_big_offset_length() { let script_version = SCRIPT_VERSION; @@ -518,7 +554,7 @@ fn _check_type_id_one_in_one_out_resume(step_cycles: Cycle) -> Result<(), TestCa } } Err(error) => match error { - VMInternalError::CyclesExceeded => { + VMInternalError::CyclesExceeded | VMInternalError::Pause => { tmp = Some(vm); limit += step_cycles; continue; @@ -718,7 +754,7 @@ fn _check_typical_secp256k1_blake160_2_in_2_out_tx_with_chunk(step_cycles: Cycle } } Err(error) => match error { - VMInternalError::CyclesExceeded => { + VMInternalError::CyclesExceeded | VMInternalError::Pause => { tmp = Some(vm); limit += step_cycles; continue; diff --git a/script/src/verify/tests/ckb_latest/features_since_v2023.rs b/script/src/verify/tests/ckb_latest/features_since_v2023.rs index aafed8439d..c0dbb80c71 100644 --- a/script/src/verify/tests/ckb_latest/features_since_v2023.rs +++ b/script/src/verify/tests/ckb_latest/features_since_v2023.rs @@ -647,6 +647,88 @@ fn check_spawn_snapshot() { assert!(chunks_count > 1); } +#[tokio::test] +async fn check_spawn_async() { + let script_version = SCRIPT_VERSION; + if script_version <= ScriptVersion::V1 { + return; + } + + let (spawn_caller_cell, spawn_caller_data_hash) = + load_cell_from_path("testdata/spawn_caller_exec"); + let (snapshot_cell, _) = load_cell_from_path("testdata/current_cycles_with_snapshot"); + + let spawn_caller_script = Script::new_builder() + .hash_type(script_version.data_hash_type().into()) + .code_hash(spawn_caller_data_hash) + .build(); + let output = CellOutputBuilder::default() + .capacity(capacity_bytes!(100).pack()) + .lock(spawn_caller_script) + .build(); + let input = CellInput::new(OutPoint::null(), 0); + + let transaction = TransactionBuilder::default().input(input).build(); + let dummy_cell = create_dummy_cell(output); + + let rtx = ResolvedTransaction { + transaction, + resolved_cell_deps: vec![spawn_caller_cell, snapshot_cell], + resolved_inputs: vec![dummy_cell], + resolved_dep_groups: vec![], + }; + let verifier = TransactionScriptsVerifierWithEnv::new(); + let result = verifier.verify_without_pause(script_version, &rtx, Cycle::MAX); + let cycles_once = result.unwrap(); + + // we use debug pause to test context resume + // `current_cycles_with_snapshot` will try to pause verifier + // here we use `channel` to send Resume to verifier until it completes + let (command_tx, mut command_rx) = watch::channel(ChunkCommand::Resume); + let _jt = tokio::spawn(async move { + loop { + let res = command_tx.send(ChunkCommand::Resume); + if res.is_err() { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; + } + }); + let cycles = verifier + .verify_complete_async(script_version, &rtx, &mut command_rx, false) + .await + .unwrap(); + assert_eq!(cycles, cycles_once); + + // we send Resume/Suspend to command_rx in a loop, make sure cycles is still the same + let (command_tx, mut command_rx) = watch::channel(ChunkCommand::Resume); + let _jt = tokio::spawn(async move { + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + let res = command_tx.send(ChunkCommand::Suspend); + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + if res.is_err() { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + let res = command_tx.send(ChunkCommand::Resume); + if res.is_err() { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; + + let _res = command_tx.send(ChunkCommand::Suspend); + tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; + } + }); + + let cycles = verifier + .verify_complete_async(script_version, &rtx, &mut command_rx, true) + .await + .unwrap(); + assert_eq!(cycles, cycles_once); +} + #[test] fn check_spawn_state() { let script_version = SCRIPT_VERSION; diff --git a/script/src/verify/tests/utils.rs b/script/src/verify/tests/utils.rs index 8eaffcd64d..c355523047 100644 --- a/script/src/verify/tests/utils.rs +++ b/script/src/verify/tests/utils.rs @@ -173,6 +173,76 @@ impl TransactionScriptsVerifierWithEnv { self.verify(version, rtx, u64::MAX) } + pub(crate) async fn verify_without_limit_async( + &self, + version: ScriptVersion, + rtx: &ResolvedTransaction, + ) -> Result { + let data_loader = self.store.as_data_loader(); + let epoch = match version { + ScriptVersion::V0 => EpochNumberWithFraction::new(0, 0, 1), + ScriptVersion::V1 => EpochNumberWithFraction::new(self.version_1_enabled_at, 0, 1), + ScriptVersion::V2 => EpochNumberWithFraction::new(self.version_2_enabled_at, 0, 1), + }; + let header = HeaderView::new_advanced_builder() + .epoch(epoch.pack()) + .build(); + let tx_env = Arc::new(TxVerifyEnv::new_commit(&header)); + let verifier = TransactionScriptsVerifier::new( + Arc::new(rtx.clone()), + data_loader, + Arc::clone(&self.consensus), + tx_env, + ); + + let (_command_tx, mut command_rx) = tokio::sync::watch::channel(ChunkCommand::Resume); + let res = verifier + .resumable_verify_with_signal(u64::MAX, &mut command_rx) + .await; + match res { + Ok(VerifyResult::Completed(cycle)) => Ok(cycle), + Ok(VerifyResult::Suspended(_)) => unreachable!(), + Err(err) => Err(err), + } + } + + pub(crate) async fn verify_complete_async( + &self, + version: ScriptVersion, + rtx: &ResolvedTransaction, + command_rx: &mut tokio::sync::watch::Receiver, + skip_debug_pause: bool, + ) -> Result { + let data_loader = self.store.as_data_loader(); + let epoch = match version { + ScriptVersion::V0 => EpochNumberWithFraction::new(0, 0, 1), + ScriptVersion::V1 => EpochNumberWithFraction::new(self.version_1_enabled_at, 0, 1), + ScriptVersion::V2 => EpochNumberWithFraction::new(self.version_2_enabled_at, 0, 1), + }; + let header = HeaderView::new_advanced_builder() + .epoch(epoch.pack()) + .build(); + let tx_env = Arc::new(TxVerifyEnv::new_commit(&header)); + let verifier = TransactionScriptsVerifier::new( + Arc::new(rtx.clone()), + data_loader, + Arc::clone(&self.consensus), + tx_env, + ); + + if skip_debug_pause { + verifier.set_skip_pause(true); + } + let res = verifier + .resumable_verify_with_signal(Cycle::MAX, command_rx) + .await; + match res { + Ok(VerifyResult::Completed(cycle)) => Ok(cycle), + Ok(VerifyResult::Suspended(_)) => unreachable!(), + Err(err) => Err(err), + } + } + // If the max cycles is meaningless, please use `verify_without_limit`, // so reviewers or developers can understand the intentions easier. pub(crate) fn verify( diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 342931fb56..eeff0c8221 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -303,7 +303,6 @@ impl TxPoolService { } if let Some((ret, snapshot)) = self._resumeble_process_tx(tx.clone(), remote).await { - eprintln!("resumeble_process_tx: ret = {:?}", ret); match ret { Ok(processed) => { if let ProcessResult::Completed(completed) = processed {