diff --git a/Cargo.lock b/Cargo.lock index de2ae82..a5b6340 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2263,6 +2263,7 @@ dependencies = [ "log", "once_cell", "otx-format", + "otx-plugin-protocol", "rand 0.8.5", "reqwest", "secp256k1", diff --git a/integration-test/Cargo.lock b/integration-test/Cargo.lock index bf4dce1..59a0497 100644 --- a/integration-test/Cargo.lock +++ b/integration-test/Cargo.lock @@ -2351,6 +2351,7 @@ dependencies = [ "log", "once_cell", "otx-format", + "otx-plugin-protocol", "rand 0.8.5", "reqwest", "secp256k1 0.24.3", diff --git a/integration-test/README.md b/integration-test/README.md index 934abc1..2ee0ea1 100644 --- a/integration-test/README.md +++ b/integration-test/README.md @@ -13,28 +13,28 @@ - create tables and indexes ```bash -cd integration +cd integration-test psql -h localhost -U postgres -d mercury-otx-dev -f devtools/create_table.sql ``` ### Init CKB ```bash -cd integration +cd integration-test rm -rf ./dev_chain/dev/data ./free-space ``` ### Run integration tests ```bash -cd integration +cd integration-test cargo run ``` or ```bash -cd integration +cd integration-test cargo run -- -t test_service_rpc ``` @@ -151,7 +151,7 @@ If you need to deploy contract scripts on the dev chain, you need to do the foll - run integration tests ```bash - cd integration + cd integration-test cargo run ``` diff --git a/integration-test/src/help.rs b/integration-test/src/help.rs index 4432f9f..90dbfe1 100644 --- a/integration-test/src/help.rs +++ b/integration-test/src/help.rs @@ -127,14 +127,16 @@ pub(crate) fn start_mercury(ckb: Child) -> (Child, Child) { panic!("Setup test environment failed"); } -pub(crate) fn start_otx_pool(address: Address, pk: H256) { +pub(crate) fn start_otx_pool(address: Address, pk: Option) { let mut lock = CURRENT_OTX_POOL_SERVICE_PROCESS.lock().unwrap(); if let Some(child) = lock.as_mut() { child.kill().unwrap(); } - env::set_var("PRIVKEY", pk.to_string()); env::set_var("DEFAUT_ADDRESS", address.to_string()); + if let Some(pk) = pk { + env::set_var("PRIVKEY", pk.to_string()); + } let service = run_command_spawn( "cargo", diff --git a/integration-test/src/tests/mod.rs b/integration-test/src/tests/mod.rs index 64ba95a..22a6af4 100644 --- a/integration-test/src/tests/mod.rs +++ b/integration-test/src/tests/mod.rs @@ -1,6 +1,7 @@ mod helper; mod payment; mod rpc; +mod signer; mod swap; #[derive(Debug)] diff --git a/integration-test/src/tests/payment/dust_collector.rs b/integration-test/src/tests/payment/dust_collector.rs index b6e23fa..ba1ae1d 100644 --- a/integration-test/src/tests/payment/dust_collector.rs +++ b/integration-test/src/tests/payment/dust_collector.rs @@ -30,7 +30,7 @@ fn test_payment_dust_collect_ckb() { let (dust_collector_address, pk) = generate_rand_secp_address_pk_pair(); prepare_ckb_capacity(&dust_collector_address, 200_0000_0000u64).unwrap(); prepare_udt_1(200u128, &dust_collector_address).unwrap(); - start_otx_pool(dust_collector_address.clone(), pk); + start_otx_pool(dust_collector_address.clone(), Some(pk)); // check dust collector assets let payload = GetBalancePayload { @@ -107,7 +107,7 @@ fn test_payment_dust_collect_ckb() { assert_eq!(200u128, response.balances[1].free.into()); } -fn build_pay_ckb_otx( +pub(crate) fn build_pay_ckb_otx( payer: &str, prepare_capacity: usize, remain_capacity: usize, diff --git a/integration-test/src/tests/payment/mod.rs b/integration-test/src/tests/payment/mod.rs index af9ee20..350ee00 100644 --- a/integration-test/src/tests/payment/mod.rs +++ b/integration-test/src/tests/payment/mod.rs @@ -1 +1 @@ -mod dust_collector; +pub mod dust_collector; diff --git a/integration-test/src/tests/rpc.rs b/integration-test/src/tests/rpc.rs index 4ce2e9f..cce68df 100644 --- a/integration-test/src/tests/rpc.rs +++ b/integration-test/src/tests/rpc.rs @@ -17,7 +17,7 @@ inventory::submit!(IntegrationTest { }); fn test_service_rpc() { let (address, pk) = generate_rand_secp_address_pk_pair(); - start_otx_pool(address, pk); + start_otx_pool(address, Some(pk)); let service_client = OtxPoolRpcClient::new(OTX_POOL_URI.to_string()); let ret = service_client.submit_otx(JsonBytes::default()); @@ -32,7 +32,7 @@ inventory::submit!(IntegrationTest { }); fn test_service_rpc_submit_otx() { let (address, pk) = generate_rand_secp_address_pk_pair(); - start_otx_pool(address, pk); + start_otx_pool(address, Some(pk)); let tx_info = build_pay_ckb_signed_otx("alice", 151, 100, 51).unwrap(); let tx_view = tx_info.tx; @@ -47,6 +47,8 @@ fn test_service_rpc_submit_otx() { let status = service_client.query_otx_status_by_id(id).unwrap().unwrap(); assert_eq!(status, OpenTxStatus::Pending); - let ret = service_client.query_otx_status_by_id(H256::default()).unwrap(); + let ret = service_client + .query_otx_status_by_id(H256::default()) + .unwrap(); assert!(ret.is_none()); } diff --git a/integration-test/src/tests/signer/mod.rs b/integration-test/src/tests/signer/mod.rs new file mode 100644 index 0000000..aef0333 --- /dev/null +++ b/integration-test/src/tests/signer/mod.rs @@ -0,0 +1,131 @@ +use crate::const_definition::{CKB_URI, MERCURY_URI, OTX_POOL_URI}; +use crate::help::start_otx_pool; +use crate::tests::payment::dust_collector::build_pay_ckb_otx; +use crate::utils::client::mercury_client::MercuryRpcClient; +use crate::utils::instruction::ckb::aggregate_transactions_into_blocks; +use crate::utils::instruction::mercury::{prepare_ckb_capacity, prepare_udt_1}; +use crate::utils::lock::secp::generate_rand_secp_address_pk_pair; +use crate::IntegrationTest; + +use otx_format::jsonrpc_types::tx_view::tx_view_to_basic_otx; +use otx_format::types::OpenTxStatus; +use utils::aggregator::SignInfo; +use utils::client::otx_pool_client::OtxPoolRpcClient; + +use ckb_jsonrpc_types::JsonBytes; +use ckb_types::prelude::Entity; +use core_rpc_types::{GetBalancePayload, JsonItem}; +use utils::config::CkbConfig; + +use std::collections::HashSet; +use std::thread::sleep; +use std::time::Duration; + +inventory::submit!(IntegrationTest { + name: "test_plugin_rpc_get_plugin_info", + test_fn: test_plugin_rpc_get_plugin_info +}); +fn test_plugin_rpc_get_plugin_info() { + let (address, pk) = generate_rand_secp_address_pk_pair(); + start_otx_pool(address, Some(pk)); + + let service_client = OtxPoolRpcClient::new(OTX_POOL_URI.to_string()); + let plugin_info = service_client.get_signer_info().unwrap(); + assert_eq!(plugin_info.name, "singer"); +} + +inventory::submit!(IntegrationTest { + name: "test_plugin_rpc_get_pending_sign_otxs", + test_fn: test_plugin_rpc_get_pending_sign_otxs +}); +fn test_plugin_rpc_get_pending_sign_otxs() { + let (address, pk) = generate_rand_secp_address_pk_pair(); + start_otx_pool(address.clone(), Some(pk)); + + let service_client = OtxPoolRpcClient::new(OTX_POOL_URI.to_string()); + let otxs = service_client + .get_pending_sign_otxs(address.to_string()) + .unwrap(); + assert_eq!(otxs.len(), 0); +} + +inventory::submit!(IntegrationTest { + name: "test_plugin_rpc_get_pending_sign_otxs_with_one_otx", + test_fn: test_plugin_rpc_get_pending_sign_otxs_with_one_otx +}); +fn test_plugin_rpc_get_pending_sign_otxs_with_one_otx() { + // run otx pool + let (dust_collector_address, pk) = generate_rand_secp_address_pk_pair(); + prepare_ckb_capacity(&dust_collector_address, 200_0000_0000u64).unwrap(); + prepare_udt_1(200u128, &dust_collector_address).unwrap(); + start_otx_pool(dust_collector_address.clone(), None); + + // check dust collector assets + let payload = GetBalancePayload { + item: JsonItem::Address(dust_collector_address.to_string()), + asset_infos: HashSet::new(), + extra: None, + tip_block_number: None, + }; + let mercury_client = MercuryRpcClient::new(MERCURY_URI.to_string()); + let response = mercury_client.get_balance(payload).unwrap(); + assert_eq!(response.balances.len(), 2); + assert_eq!(200_0000_0000u128, response.balances[0].free.into()); + assert_eq!(142_0000_0000u128, response.balances[0].occupied.into()); + assert_eq!(200u128, response.balances[1].free.into()); + + // build otxs + let alice_otx = build_pay_ckb_otx("alice", 151, 100, 51).unwrap(); + let bob_otx = build_pay_ckb_otx("bob", 202, 200, 2).unwrap(); + + // submit otxs + let service_client = OtxPoolRpcClient::new(OTX_POOL_URI.to_string()); + let _alice_otx_id = service_client + .submit_otx(JsonBytes::from_bytes(alice_otx.as_bytes())) + .unwrap(); + let _bob_otx_id = service_client + .submit_otx(JsonBytes::from_bytes(bob_otx.as_bytes())) + .unwrap(); + + // query otx after a few secs + sleep(Duration::from_secs(12)); + aggregate_transactions_into_blocks().unwrap(); + + let otxs = service_client + .get_pending_sign_otxs(dust_collector_address.to_string()) + .unwrap(); + assert_eq!(otxs.len(), 1); + + // sign + let ckb_tx = if let Ok(tx) = otxs[0].clone().try_into() { + tx + } else { + log::error!("open tx converts to Ckb tx failed."); + return; + }; + let sign_info = SignInfo::new( + &dust_collector_address, + &pk, + CkbConfig::new("ckb_dev", CKB_URI), + ); + let tx_view = sign_info.sign_ckb_tx(ckb_tx).unwrap(); + let otx = tx_view_to_basic_otx(tx_view).unwrap(); + + // send signed tx to otx pool + let ret = service_client.send_signed_otx(otx.clone()); + println!("ret: {:?}", ret); + assert!(ret.is_ok()); + + // query otx after a few secs + sleep(Duration::from_secs(12)); + let otxs = service_client + .get_pending_sign_otxs(dust_collector_address.to_string()) + .unwrap(); + assert_eq!(otxs.len(), 0); + + let status = service_client + .query_otx_status_by_id(otx.get_tx_hash().unwrap()) + .unwrap() + .unwrap(); + assert!(matches!(status, OpenTxStatus::Committed(_))); +} diff --git a/integration-test/src/tests/swap/atomic_swap.rs b/integration-test/src/tests/swap/atomic_swap.rs index 908d30a..62e966c 100644 --- a/integration-test/src/tests/swap/atomic_swap.rs +++ b/integration-test/src/tests/swap/atomic_swap.rs @@ -44,7 +44,7 @@ fn test_swap_udt_to_udt() { // run otx pool let (address, pk) = generate_rand_secp_address_pk_pair(); prepare_ckb_capacity(&address, 200_0000_0000u64).unwrap(); - start_otx_pool(address, pk); + start_otx_pool(address, Some(pk)); // alice build otxs // pay 10 UDT-1, get 10 UDT-2, pay fee 1 CKB diff --git a/otx-format/src/constant/extra_keys.rs b/otx-format/src/constant/extra_keys.rs index 55599ce..5045f7c 100644 --- a/otx-format/src/constant/extra_keys.rs +++ b/otx-format/src/constant/extra_keys.rs @@ -38,3 +38,6 @@ pub const OTX_LOCATING_INPUT_TYPE_HASH_TYPE: u32 = 0x10055; pub const OTX_LOCATING_INPUT_TYPE_ARGS: u32 = 0x10056; pub const OTX_LOCATING_INPUT_TYPE_SCRIPT_HASH: u32 = 0x10057; pub const OTX_LOCATING_INPUT_DATA_HASH: u32 = 0x10058; + +/// Signing (0x10060) +pub const OTX_SIGNING_WITNESS_SIGHASH_ALL_SCRIPT: u32 = 0x10060; diff --git a/otx-format/src/error.rs b/otx-format/src/error.rs index e99017c..94f6bc0 100644 --- a/otx-format/src/error.rs +++ b/otx-format/src/error.rs @@ -25,6 +25,9 @@ pub enum OtxFormatError { #[display(fmt = "locate input cell failed: {}", _0)] LocateInputFailed(String), + + #[display(fmt = "witness index error: {}", _0)] + WitnessIndexError(usize), } impl OtxError for OtxFormatError { @@ -35,6 +38,7 @@ impl OtxError for OtxFormatError { OtxFormatError::OtxMapParseMissingField(_) => -13012, OtxFormatError::OtxMapParseFailed(_) => -13013, OtxFormatError::LocateInputFailed(_) => -13014, + OtxFormatError::WitnessIndexError(_) => -13015, } } diff --git a/otx-format/src/jsonrpc_types/opentx.rs b/otx-format/src/jsonrpc_types/opentx.rs index 64759a0..32f50fe 100644 --- a/otx-format/src/jsonrpc_types/opentx.rs +++ b/otx-format/src/jsonrpc_types/opentx.rs @@ -11,7 +11,7 @@ use crate::constant::extra_keys::{ OTX_ACCOUNTING_META_INPUT_CKB, OTX_ACCOUNTING_META_INPUT_SUDT, OTX_ACCOUNTING_META_INPUT_XUDT, OTX_ACCOUNTING_META_OUTPUT_CKB, OTX_ACCOUNTING_META_OUTPUT_SUDT, OTX_ACCOUNTING_META_OUTPUT_XUDT, OTX_IDENTIFYING_META_AGGREGATE_COUNT, - OTX_IDENTIFYING_META_TX_HASH, + OTX_IDENTIFYING_META_TX_HASH, OTX_SIGNING_WITNESS_SIGHASH_ALL_SCRIPT, }; use crate::error::OtxFormatError; use crate::types::packed::{self, OpenTransactionBuilder, OtxMapBuilder, OtxMapVecBuilder}; @@ -23,7 +23,9 @@ use ckb_jsonrpc_types::{ use ckb_types::bytes::Bytes; use ckb_types::constants::TX_VERSION; use ckb_types::core::{self, ScriptHashType, TransactionBuilder}; -use ckb_types::packed::{Byte32, OutPointBuilder, Uint128, Uint64, WitnessArgs}; +use ckb_types::packed::{ + Byte32, OutPointBuilder, Script as PackedScript, Uint128, Uint64, WitnessArgs, +}; use ckb_types::{self, prelude::*, H256}; use serde::{Deserialize, Serialize}; @@ -76,6 +78,20 @@ impl From for OtxKeyPair { #[derive(Clone, Default, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)] pub struct OtxMapVec(Vec); +impl OtxMapVec { + pub fn push(&mut self, map: OtxMap) { + self.0.push(map) + } + + pub fn len(&self) -> usize { + self.0.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + impl IntoIterator for OtxMapVec { type Item = OtxMap; type IntoIter = std::vec::IntoIter; @@ -287,6 +303,38 @@ impl OpenTransaction { s_udt_amount, }) } + + pub fn add_pending_signature_script( + &mut self, + index: usize, + script: PackedScript, + ) -> Result<(), OtxFormatError> { + let witness = OtxKeyPair::new( + OTX_SIGNING_WITNESS_SIGHASH_ALL_SCRIPT.into(), + None, + JsonBytes::from_bytes(script.as_bytes()), + ); + if self.witnesses.len() != index { + return Err(OtxFormatError::WitnessIndexError(index)); + } + let witness: OtxMap = vec![witness].into(); + self.witnesses.push(witness); + Ok(()) + } + + pub fn get_pending_signature_locks(&self) -> Vec<(usize, PackedScript)> { + self.witnesses + .0 + .iter() + .enumerate() + .filter_map(|(index, witness)| { + witness + .get_value(OTX_SIGNING_WITNESS_SIGHASH_ALL_SCRIPT.into(), None) + .and_then(|value: JsonBytes| PackedScript::from_slice(value.as_bytes()).ok()) + .map(|script| (index, script)) + }) + .collect() + } } fn get_value_by_first_element( @@ -424,6 +472,13 @@ impl OtxMap { pub fn push(&mut self, key_pair: OtxKeyPair) { self.0.push(key_pair) } + + pub fn get_value(&self, key_type: Uint32, key_data: Option) -> Option { + self.0 + .iter() + .find(|k| k.key_type == key_type && k.key_data == key_data) + .map(|k| k.value_data.clone()) + } } impl From> for OtxMap { diff --git a/otx-plugin-protocol/src/lib.rs b/otx-plugin-protocol/src/lib.rs index 239d990..343136a 100644 --- a/otx-plugin-protocol/src/lib.rs +++ b/otx-plugin-protocol/src/lib.rs @@ -16,7 +16,8 @@ pub enum MessageFromHost { NewInterval(u64), OtxPoolStart, OtxPoolStop, - CommitOtx(Vec), + CommitOtx((H256, Vec)), + SendTx(OpenTransaction), // Request GetPluginInfo, @@ -33,7 +34,8 @@ impl MessageFromHost { | Self::NewInterval(_) | Self::OtxPoolStart | Self::OtxPoolStop - | Self::CommitOtx(_) => MessageType::Notify, + | Self::CommitOtx(_) + | Self::SendTx(_) => MessageType::Notify, Self::GetPluginInfo | Self::Ok | Self::Error(_) => MessageType::Request, } } diff --git a/otx-pool/src/built_in_plugin/atomic_swap.rs b/otx-pool/src/built_in_plugin/atomic_swap.rs index 63866ad..c864b49 100644 --- a/otx-pool/src/built_in_plugin/atomic_swap.rs +++ b/otx-pool/src/built_in_plugin/atomic_swap.rs @@ -20,8 +20,6 @@ use std::sync::Arc; use std::thread; use std::thread::JoinHandle; -pub const EVERY_INTERVALS: usize = 10; - #[derive(Clone)] struct Context { plugin_name: String, @@ -167,8 +165,7 @@ impl AtomicSwap { match msg { Ok(msg) => { match msg { - (_, MessageFromHost::NewInterval(elapsed)) => { - on_new_intervel(context.clone(), elapsed); + (_, MessageFromHost::NewInterval(_)) => { } (_, MessageFromHost::NewOtx(otx)) => { log::info!("{} receivers msg NewOtx hash: {:?}", @@ -176,8 +173,8 @@ impl AtomicSwap { otx.get_tx_hash().expect("get otx tx hash").to_string()); on_new_open_tx(context.clone(), otx); } - (_, MessageFromHost::CommitOtx(otx_hashes)) => { - on_commit_open_tx(context.clone(), otx_hashes); + (_, MessageFromHost::CommitOtx((tx_hash, otx_hashes))) => { + on_commit_open_tx(context.clone(), tx_hash, otx_hashes); } _ => unreachable!(), } @@ -207,7 +204,9 @@ impl AtomicSwap { } fn on_new_open_tx(context: Context, otx: OpenTransaction) { - log::info!("on_new_open_tx, index otxs count: {:?}", context.otxs.len()); + if !otx.get_pending_signature_locks().is_empty() { + return; + } if let Ok(aggregate_count) = otx.get_aggregate_count() { log::info!("aggregate count: {:?}", aggregate_count); if aggregate_count > 1 { @@ -282,16 +281,11 @@ fn on_new_open_tx(context: Context, otx: OpenTransaction) { vec![pair_tx_hash.to_owned(), otx_hash], tx_hash, )); - if let Some(MessageFromHost::Ok) = Request::call(&context.service_handler, message) { - context.otxs.remove(pair_tx_hash); - context.orders.retain(|_, hashes| { - hashes.remove(pair_tx_hash); - !hashes.is_empty() - }); - } + Request::call(&context.service_handler, message); } } else { context.otxs.insert(otx_hash.clone(), otx); + log::info!("on_new_open_tx, index otxs count: {:?}", context.otxs.len()); context .orders .entry(order_key) @@ -300,7 +294,7 @@ fn on_new_open_tx(context: Context, otx: OpenTransaction) { } } -fn on_commit_open_tx(context: Context, otx_hashes: Vec) { +fn on_commit_open_tx(context: Context, _tx_hash: H256, otx_hashes: Vec) { log::info!( "{} on commit open tx remove committed otx: {:?}", context.plugin_name, @@ -311,17 +305,9 @@ fn on_commit_open_tx(context: Context, otx_hashes: Vec) { ); otx_hashes.iter().for_each(|otx_hash| { context.otxs.remove(otx_hash); + context.orders.retain(|_, hashes| { + hashes.remove(otx_hash); + !hashes.is_empty() + }); }) } - -fn on_new_intervel(context: Context, elapsed: u64) { - if elapsed % EVERY_INTERVALS as u64 != 0 || context.otxs.len() <= 1 { - return; - } - - log::info!( - "on new {} intervals otx set len: {:?}", - EVERY_INTERVALS, - context.otxs.len() - ); -} diff --git a/otx-pool/src/built_in_plugin/dust_collector.rs b/otx-pool/src/built_in_plugin/dust_collector.rs index a146b51..791ad4c 100644 --- a/otx-pool/src/built_in_plugin/dust_collector.rs +++ b/otx-pool/src/built_in_plugin/dust_collector.rs @@ -166,8 +166,8 @@ impl DustCollector { otx.get_tx_hash().expect("get tx hash")); on_new_open_tx(context.clone(), otx); } - (_, MessageFromHost::CommitOtx(otx_hashes)) => { - on_commit_open_tx(context.clone(), otx_hashes); + (_, MessageFromHost::CommitOtx((tx_hash, otx_hashes))) => { + on_commit_open_tx(context.clone(), tx_hash, otx_hashes); } _ => unreachable!(), } @@ -197,7 +197,9 @@ impl DustCollector { } fn on_new_open_tx(context: Context, otx: OpenTransaction) { - log::info!("on_new_open_tx, index otxs count: {:?}", context.otxs.len()); + if !otx.get_pending_signature_locks().is_empty() { + return; + } if let Ok(aggregate_count) = otx.get_aggregate_count() { log::info!("aggregate count: {:?}", aggregate_count); if aggregate_count > 1 { @@ -217,9 +219,10 @@ fn on_new_open_tx(context: Context, otx: OpenTransaction) { }; let otx_hash = otx.get_tx_hash().expect("get tx hash"); context.otxs.insert(otx_hash, otx); + log::info!("on_new_open_tx, index otxs count: {:?}", context.otxs.len()); } -fn on_commit_open_tx(context: Context, otx_hashes: Vec) { +fn on_commit_open_tx(context: Context, _tx_hash: H256, otx_hashes: Vec) { log::info!( "{} on commit open tx remove committed otx: {:?}", context.plugin_name, diff --git a/otx-pool/src/built_in_plugin/mod.rs b/otx-pool/src/built_in_plugin/mod.rs index 983f9bb..d70741c 100644 --- a/otx-pool/src/built_in_plugin/mod.rs +++ b/otx-pool/src/built_in_plugin/mod.rs @@ -4,4 +4,5 @@ mod signer; pub use atomic_swap::AtomicSwap; pub use dust_collector::DustCollector; +pub use signer::rpc::SignerRpc; pub use signer::Signer; diff --git a/otx-pool/src/built_in_plugin/signer.rs b/otx-pool/src/built_in_plugin/signer/mod.rs similarity index 61% rename from otx-pool/src/built_in_plugin/signer.rs rename to otx-pool/src/built_in_plugin/signer/mod.rs index 829c265..a9aaff0 100644 --- a/otx-pool/src/built_in_plugin/signer.rs +++ b/otx-pool/src/built_in_plugin/signer/mod.rs @@ -1,3 +1,5 @@ +pub mod rpc; + use crate::plugin::host_service::ServiceHandler; use crate::plugin::plugin_proxy::{MsgHandler, PluginState, RequestHandler}; use crate::plugin::Plugin; @@ -8,12 +10,14 @@ use utils::aggregator::{Committer, SignInfo}; use utils::config::{built_in_plugins::SignerConfig, CkbConfig, ScriptConfig}; use anyhow::{anyhow, Result}; -use ckb_sdk_open_tx::types::Address; +use ckb_jsonrpc_types::Script; +use ckb_sdk::Address; use ckb_types::core::service::Request; -use ckb_types::H256; +use ckb_types::{packed, H256}; use crossbeam_channel::{bounded, select, unbounded}; use dashmap::DashMap; +use std::collections::HashSet; use std::env; use std::path::PathBuf; use std::sync::Arc; @@ -23,8 +27,9 @@ use std::thread::JoinHandle; #[derive(Clone)] struct Context { plugin_name: String, - otxs: Arc>, - sign_info: SignInfo, + otxs: DashMap, + indexed_otxs_by_lock: DashMap>, + sign_info: Option, ckb_config: CkbConfig, _script_config: ScriptConfig, service_handler: ServiceHandler, @@ -33,14 +38,15 @@ struct Context { impl Context { fn new( plugin_name: &str, - sign_info: SignInfo, + sign_info: Option, ckb_config: CkbConfig, script_config: ScriptConfig, service_handler: ServiceHandler, ) -> Self { Context { plugin_name: plugin_name.to_owned(), - otxs: Arc::new(DashMap::new()), + otxs: DashMap::new(), + indexed_otxs_by_lock: DashMap::new(), sign_info, ckb_config, _script_config: script_config, @@ -52,6 +58,7 @@ impl Context { pub struct Signer { state: PluginState, info: PluginInfo, + context: Arc, /// Send request to plugin thread, and expect a response. request_handler: RequestHandler, @@ -62,7 +69,7 @@ pub struct Signer { _thread: JoinHandle<()>, } -impl Plugin for Signer { +impl Plugin for Arc { fn get_name(&self) -> String { self.info.name.clone() } @@ -98,21 +105,33 @@ impl Signer { "This plugin indexes OTXs that are waiting to be signed and enables them to be signed using a hosted private key.", "1.0", ); - let key = env::var(config.get_env_key_name())?.parse::()?; - let address = env::var(config.get_env_default_address())? - .parse::
() - .map_err(|e| anyhow!(e))?; - let (msg_handler, request_handler, thread) = Signer::start_process(Context::new( + let address = env::var(config.get_env_default_address()); + let key = env::var(config.get_env_key_name()); + let sign_info = if address.is_ok() && key.is_ok() { + Some(SignInfo::new( + &address? + .parse::() + .map_err(|e| anyhow!(e))?, + &key?.parse::()?, + ckb_config.clone(), + )) + } else { + None + }; + + let context = Arc::new(Context::new( name, - SignInfo::new(&address, &key, ckb_config.clone()), + sign_info, ckb_config, script_config, service_handler, - ))?; + )); + let (msg_handler, request_handler, thread) = Signer::start_process(context.clone())?; Ok(Signer { state, info, + context, msg_handler, request_handler, _thread: thread, @@ -121,7 +140,22 @@ impl Signer { } impl Signer { - fn start_process(context: Context) -> Result<(MsgHandler, RequestHandler, JoinHandle<()>)> { + fn get_index_sign_otxs(&self, address: Address) -> Vec { + let script: packed::Script = (&address).into(); + if let Some(otx_hashes) = self.context.indexed_otxs_by_lock.get(&script.into()) { + otx_hashes + .iter() + .filter_map(|hash| self.context.otxs.get(hash)) + .map(|otx| otx.value().clone()) + .collect() + } else { + vec![] + } + } + + fn start_process( + context: Arc, + ) -> Result<(MsgHandler, RequestHandler, JoinHandle<()>)> { // the host request channel receives request from host to plugin let (host_request_sender, host_request_receiver) = bounded(1); // the channel sends notifications or responses from the host to plugin @@ -159,8 +193,11 @@ impl Signer { otx.get_tx_hash().expect("get tx hash")); on_new_open_tx(context.clone(), otx); } - (_, MessageFromHost::CommitOtx(otx_hashes)) => { - on_commit_open_tx(context.clone(), otx_hashes); + (_, MessageFromHost::CommitOtx((tx_hash, otx_hashes))) => { + on_commit_open_tx(context.clone(), tx_hash, otx_hashes); + } + (_, MessageFromHost::SendTx(otx)) => { + on_send_tx(context.clone(), otx); } _ => unreachable!(), } @@ -189,31 +226,50 @@ impl Signer { } } -fn on_new_open_tx(context: Context, otx: OpenTransaction) { - log::info!("on_new_open_tx, index otxs count: {:?}", context.otxs.len()); - if let Ok(aggregate_count) = otx.get_aggregate_count() { - log::info!("aggregate count: {:?}", aggregate_count); - if aggregate_count == 1 { - return; - } +fn on_new_open_tx(context: Arc, otx: OpenTransaction) { + let lock_scripts = otx.get_pending_signature_locks(); + if lock_scripts.is_empty() { + return; } + let otx_hash = otx.get_tx_hash().expect("get tx hash"); - context.otxs.insert(otx_hash, otx.clone()); + // index pending signature otx + // when the hosted private key cannot be signed + if context.sign_info.is_none() + || lock_scripts + .iter() + .any(|(_, script)| script != &context.sign_info.clone().unwrap().lock_script()) + { + // index otx + context.otxs.insert(otx_hash.clone(), otx); + log::info!("on_new_open_tx, index otxs count: {:?}", context.otxs.len()); + + // index lock scripts + lock_scripts.into_iter().for_each(|(_, script)| { + context + .indexed_otxs_by_lock + .entry(script.into()) + .or_insert_with(HashSet::new) + .insert(otx_hash.clone()); + }); + return; + } + + // signing with a hosted private key let ckb_tx = if let Ok(tx) = otx.try_into() { tx } else { log::error!("open tx converts to Ckb tx failed."); return; }; - - // sign - let signer = SignInfo::new( - context.sign_info.secp_address(), - context.sign_info.privkey(), - context.ckb_config.clone(), - ); - let signed_ckb_tx = signer.sign_ckb_tx(ckb_tx).unwrap(); + let signed_ckb_tx = + if let Ok(signed_ckb_tx) = context.sign_info.clone().unwrap().sign_ckb_tx(ckb_tx) { + signed_ckb_tx + } else { + log::error!("sign open tx failed."); + return; + }; // send_ckb let committer = Committer::new(context.ckb_config.get_ckb_uri()); @@ -227,21 +283,41 @@ fn on_new_open_tx(context: Context, otx: OpenTransaction) { // call host service to notify the host that the final tx has been sent let message = MessageFromPlugin::SentToCkb(tx_hash); - if let Some(MessageFromHost::Ok) = Request::call(&context.service_handler, message) { - context.otxs.clear(); - } + Request::call(&context.service_handler, message); } -fn on_commit_open_tx(context: Context, otx_hashes: Vec) { +fn on_commit_open_tx(context: Arc, tx_hash: H256, _otx_hashes: Vec) { log::info!( - "{} on commit open tx remove committed otx: {:?}", + "{} on commit open tx remove committed tx: {:?}", context.plugin_name, - otx_hashes - .iter() - .map(|hash| hash.to_string()) - .collect::>() + tx_hash ); - otx_hashes.iter().for_each(|otx_hash| { - context.otxs.remove(otx_hash); - }) + context.indexed_otxs_by_lock.retain(|_, hashes| { + hashes.remove(&tx_hash); + !hashes.is_empty() + }); + context.otxs.remove(&tx_hash); +} + +fn on_send_tx(context: Arc, otx: OpenTransaction) { + let ckb_tx = if let Ok(tx) = otx.try_into() { + tx + } else { + log::error!("open tx converts to Ckb tx failed."); + return; + }; + + // send_ckb + let committer = Committer::new(context.ckb_config.get_ckb_uri()); + let tx_hash = if let Ok(tx_hash) = committer.send_tx(ckb_tx) { + tx_hash + } else { + log::error!("failed to send final tx."); + return; + }; + log::info!("commit final Ckb tx: {:?}", tx_hash.to_string()); + + // call host service to notify the host that the final tx has been sent + let message = MessageFromPlugin::SentToCkb(tx_hash); + Request::call(&context.service_handler, message); } diff --git a/otx-pool/src/built_in_plugin/signer/rpc.rs b/otx-pool/src/built_in_plugin/signer/rpc.rs new file mode 100644 index 0000000..639cb1e --- /dev/null +++ b/otx-pool/src/built_in_plugin/signer/rpc.rs @@ -0,0 +1,43 @@ +use super::Signer; +use crate::error::{OtxPoolError, OtxRpcError}; +use crate::plugin::Plugin; + +use otx_format::jsonrpc_types::OpenTransaction; +use otx_plugin_protocol::{MessageFromHost, PluginInfo}; + +use ckb_sdk::Address; +use jsonrpc_core::Result as RpcResult; +use jsonrpc_derive::rpc; + +use std::{str::FromStr, sync::Arc}; + +#[rpc(server)] +pub trait SignerRpc { + #[rpc(name = "get_signer_info")] + fn get_signer_info(&self) -> RpcResult; + + #[rpc(name = "get_pending_sign_otxs")] + fn get_pending_sign_otxs(&self, address: String) -> RpcResult>; + + #[rpc(name = "send_signed_otx")] + fn send_signed_otx(&self, otx: OpenTransaction) -> RpcResult<()>; +} + +impl SignerRpc for Arc { + fn get_signer_info(&self) -> RpcResult { + let plugin_info = self.get_info(); + Ok(plugin_info) + } + + fn get_pending_sign_otxs(&self, address: String) -> RpcResult> { + let address = Address::from_str(&address) + .map_err(OtxPoolError::RpcParamParseError) + .map_err(Into::::into)?; + Ok(self.get_index_sign_otxs(address)) + } + + fn send_signed_otx(&self, otx: OpenTransaction) -> RpcResult<()> { + let _ = self.msg_handler.send((0, MessageFromHost::SendTx(otx))); + Ok(()) + } +} diff --git a/otx-pool/src/error.rs b/otx-pool/src/error.rs index c4c305b..7d185f3 100644 --- a/otx-pool/src/error.rs +++ b/otx-pool/src/error.rs @@ -1,3 +1,4 @@ +use ckb_types::H256; use otx_format::error::{OtxError, OtxFormatError}; use anyhow::Result; @@ -35,16 +36,30 @@ impl From for OtxRpcError { } } +impl From for OtxRpcError { + fn from(err: OtxPoolError) -> Self { + OtxRpcError(Box::new(err)) + } +} + #[derive(Serialize, Deserialize, Clone, Debug, Display, Hash, PartialEq, Eq)] pub enum OtxPoolError { #[display(fmt = "Otx already exists")] OtxAlreadyExists, + + #[display(fmt = "RPC parameter parsing error: {}", _0)] + RpcParamParseError(String), + + #[display(fmt = "RPC parameter parsing error: {}", _0)] + TxNotFound(H256), } impl OtxError for OtxPoolError { fn err_code(&self) -> i64 { match self { OtxPoolError::OtxAlreadyExists => -13100, + OtxPoolError::RpcParamParseError(_) => -13101, + OtxPoolError::TxNotFound(_) => -13102, } } diff --git a/otx-pool/src/notify.rs b/otx-pool/src/notify.rs index 558a158..c3dcbcb 100644 --- a/otx-pool/src/notify.rs +++ b/otx-pool/src/notify.rs @@ -44,8 +44,8 @@ pub struct NotifyController { stop: StopHandler<()>, new_open_tx_register: NotifyRegister, new_open_tx_notifier: Sender, - commit_open_tx_register: NotifyRegister>, - commit_open_tx_notifier: Sender>, + commit_open_tx_register: NotifyRegister<(H256, Vec)>, + commit_open_tx_notifier: Sender<(H256, Vec)>, interval_register: NotifyRegister, interval_notifier: Sender, start_register: NotifyRegister<()>, @@ -63,7 +63,7 @@ impl Drop for NotifyController { pub struct NotifyService { new_open_tx_subscribers: HashMap>, - commit_open_tx_subscribers: HashMap>>, + commit_open_tx_subscribers: HashMap)>>, interval_subscribers: HashMap>, start_subscribers: HashMap>, stop_subscribers: HashMap>, @@ -169,7 +169,10 @@ impl NotifyService { } } - fn handle_register_commit_open_tx(&mut self, msg: Request>>) { + fn handle_register_commit_open_tx( + &mut self, + msg: Request)>>, + ) { let Request { responder, arguments: name, @@ -180,11 +183,11 @@ impl NotifyService { let _ = responder.send(receiver); } - async fn handle_notify_commit_open_tx(&mut self, otx_hashes: Vec) { - log::trace!("event commit open tx {:?}", otx_hashes); + async fn handle_notify_commit_open_tx(&mut self, otxs: (H256, Vec)) { + log::trace!("event commit open tx {:?}", otxs); // notify all subscribers for subscriber in self.commit_open_tx_subscribers.values() { - let _ = subscriber.send(otx_hashes.clone()).await; + let _ = subscriber.send(otxs.clone()).await; } } @@ -260,16 +263,19 @@ impl NotifyController { }); } - pub async fn subscribe_commit_open_tx(&self, name: S) -> Receiver> { + pub async fn subscribe_commit_open_tx( + &self, + name: S, + ) -> Receiver<(H256, Vec)> { Request::call(&self.commit_open_tx_register, name.to_string()) .await .expect("Subscribe commit open tx should be OK") } - pub fn notify_commit_open_tx(&self, otx_hashes: Vec) { + pub fn notify_commit_open_tx(&self, tx_hash: H256, otx_hashes: Vec) { let commit_open_tx_notifier = self.commit_open_tx_notifier.clone(); self.handle.spawn(async move { - let _ = commit_open_tx_notifier.send(otx_hashes).await; + let _ = commit_open_tx_notifier.send((tx_hash, otx_hashes)).await; }); } diff --git a/otx-pool/src/plugin/host_service.rs b/otx-pool/src/plugin/host_service.rs index c5bfca9..4097484 100644 --- a/otx-pool/src/plugin/host_service.rs +++ b/otx-pool/src/plugin/host_service.rs @@ -164,7 +164,7 @@ impl HostServiceProvider { &final_otx_hash, OpenTxStatus::Committed(final_otx_hash.clone()), ); - notify_ctrl.notify_commit_open_tx(otx_hashes.clone()); + notify_ctrl.notify_commit_open_tx(final_otx_hash.clone(), otx_hashes.clone()); otx_pool.insert_sent_tx(final_otx_hash, otx_hashes); } @@ -186,7 +186,7 @@ impl HostServiceProvider { for otx_hash in otx_hashes.iter() { otx_pool.update_otx_status(otx_hash, OpenTxStatus::Committed(tx_hash.clone())); } - notify_ctrl.notify_commit_open_tx(otx_hashes.clone()); + notify_ctrl.notify_commit_open_tx(tx_hash.clone(), otx_hashes.clone()); otx_pool.insert_sent_tx(tx_hash, otx_hashes); } } diff --git a/src/main.rs b/src/main.rs index af1beeb..7ff7bf8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use otx_pool::{ - built_in_plugin::{AtomicSwap, DustCollector, Signer}, + built_in_plugin::{AtomicSwap, DustCollector, Signer, SignerRpc}, cli::print_logo, notify::{NotifyController, NotifyService}, plugin::host_service::HostServiceProvider, @@ -87,12 +87,21 @@ pub fn start(config: AppConfig) -> Result<()> { // otx pool let otx_pool = Arc::new(OtxPool::new(notify_ctrl.clone())); - // init host service + // init host service for plugins let service_provider = HostServiceProvider::start(notify_ctrl.clone(), otx_pool.clone()) .map_err(|err| anyhow!(err))?; + // init rpc io handler + let mut io_handler = IoHandler::new(); + // init plugins - let plugin_manager = init_plugins(&service_provider, &config, &runtime_handle, ¬ify_ctrl)?; + let plugin_manager = init_plugins( + &service_provider, + &config, + &runtime_handle, + ¬ify_ctrl, + &mut io_handler, + )?; // display all names of plugins let plugins = plugin_manager.plugin_configs(); @@ -101,9 +110,8 @@ pub fn start(config: AppConfig) -> Result<()> { .iter() .for_each(|(_, plugin)| log::info!("plugin name: {:?}", plugin.1.name)); - // init otx pool rpc + // new otx pool rpc let rpc_impl = OtxPoolRpcImpl::new(otx_pool); - let mut io_handler = IoHandler::new(); io_handler.extend_with(rpc_impl.to_delegate()); // start rpc server @@ -142,6 +150,7 @@ fn init_plugins( config: &AppConfig, runtime_handle: &Handle, notify_ctrl: &NotifyController, + io_handler: &mut IoHandler, ) -> Result { // create plugin manager let mut plugin_manager = @@ -172,13 +181,16 @@ fn init_plugins( // init built-in plugins if config.get_signer_config().is_enabled() { - let signer = Signer::new( - service_provider.handler(), - config.get_signer_config(), - config.get_ckb_config(), - config.get_script_config(), - ) - .map_err(|err| anyhow!(err))?; + let signer = Arc::new( + Signer::new( + service_provider.handler(), + config.get_signer_config(), + config.get_ckb_config(), + config.get_script_config(), + ) + .map_err(|err| anyhow!(err))?, + ); + io_handler.extend_with(signer.clone().to_delegate()); plugin_manager.register_built_in_plugins(Box::new(signer)); } diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 53ca14b..19c4899 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -24,3 +24,4 @@ serde_json = "1.0" toml = "0.5" otx-format = { path = "../otx-format" } +otx-plugin-protocol = { path = "../otx-plugin-protocol" } diff --git a/utils/src/aggregator.rs b/utils/src/aggregator.rs index fc41d83..953c80d 100644 --- a/utils/src/aggregator.rs +++ b/utils/src/aggregator.rs @@ -63,26 +63,30 @@ impl OtxAggregator { let ckb_tx = open_tx .try_into() .map_err(|_| anyhow!("open tx convert to ckb tx"))?; - let tx_info = self.tx_builder.add_input( + let tx_view = self.tx_builder.add_input( ckb_tx, input.tx_hash, std::convert::Into::::into(input.index) as usize, )?; + let input_index = tx_view.inner.inputs.len() - 1; let ckb_tx = self.tx_builder.add_output( - tx_info, + tx_view, output_address, output_amout.capacity, output_amout.udt_amount, udt_issuer_script, )?; - tx_view_to_otx( + let mut otx = tx_view_to_otx( ckb_tx, self.script_config.get_xudt_rce_code_hash(), self.script_config.get_sudt_code_hash(), aggregate_count, self.ckb_config.get_ckb_uri(), ) - .map_err(|err| anyhow!(err.to_string())) + .map_err(|err| anyhow!(err.to_string()))?; + otx.add_pending_signature_script(input_index, output_address.into()) + .map_err(|err| anyhow!(err.to_string()))?; + Ok(otx) } pub fn merge_otxs(&self, otx_list: Vec) -> Result { @@ -211,6 +215,10 @@ impl SignInfo { &self.secp_address } + pub fn lock_script(&self) -> Script { + self.secp_address.payload().into() + } + pub fn privkey(&self) -> &H256 { &self.pk } diff --git a/utils/src/client/mod.rs b/utils/src/client/mod.rs index 72309bc..36088f0 100644 --- a/utils/src/client/mod.rs +++ b/utils/src/client/mod.rs @@ -41,7 +41,6 @@ pub fn request( ) -> Result { let request = build_request(method, params)?; let response = client.rpc_exec(&request)?; - println!("{:?}", response); handle_response(response) } diff --git a/utils/src/client/otx_pool_client.rs b/utils/src/client/otx_pool_client.rs index 9b772a0..4b40c38 100644 --- a/utils/src/client/otx_pool_client.rs +++ b/utils/src/client/otx_pool_client.rs @@ -1,6 +1,8 @@ use super::{request, RpcClient}; +use otx_format::jsonrpc_types::OpenTransaction; use otx_format::types::OpenTxStatus; +use otx_plugin_protocol::PluginInfo; use anyhow::Result; use ckb_jsonrpc_types::JsonBytes; @@ -23,4 +25,16 @@ impl OtxPoolRpcClient { pub fn query_otx_status_by_id(&self, otx: H256) -> Result> { request(&self.client, "query_otx_status_by_id", vec![otx]) } + + pub fn get_signer_info(&self) -> Result { + request(&self.client, "get_signer_info", ()) + } + + pub fn get_pending_sign_otxs(&self, address: String) -> Result> { + request(&self.client, "get_pending_sign_otxs", vec![address]) + } + + pub fn send_signed_otx(&self, otx: OpenTransaction) -> Result<()> { + request(&self.client, "send_signed_otx", vec![otx]) + } }