diff --git a/Cargo.lock b/Cargo.lock index 475d473b5d..f501861f25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -828,6 +828,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bnum" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29ed1ec45f6ef6e8d1125cc2c2fec8f8fe7d4fa5b262f15885fdccb9e26f0f15" +dependencies = [ + "num-traits", +] + [[package]] name = "bs58" version = "0.5.0" @@ -984,14 +993,36 @@ dependencies = [ "libc", ] +[[package]] +name = "ccp-randomx-types" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3769b7222a8e4cf35252235fe7d7be66328b5440687275ee592dbdb04b7efd7" +dependencies = [ + "hex", + "serde", +] + +[[package]] +name = "ccp-rpc-client" +version = "0.1.0" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=main#5de05462da4444f7fd3633fecaecd82c854a1618" +dependencies = [ + "ccp-shared", + "hex", + "jsonrpsee", + "serde", +] + [[package]] name = "ccp-shared" version = "0.1.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=main#86b55795bfb1bdea85cd312606eaadd0b2a4cfcd" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=main#5de05462da4444f7fd3633fecaecd82c854a1618" dependencies = [ + "ccp-randomx-types", "cpu-utils", "hex", - "randomx-rust-wrapper", + "rand 0.8.5", "serde", ] @@ -1025,11 +1056,37 @@ dependencies = [ "zeroize", ] +[[package]] +name = "chain-connector" +version = "0.1.0" +dependencies = [ + "ccp-shared", + "chain-data", + "chain-types", + "clarity", + "ethabi", + "eyre", + "fluence-libp2p", + "futures", + "hex", + "hex-utils", + "jsonrpsee", + "mockito", + "particle-args", + "particle-builtins", + "particle-execution", + "serde_json", + "server-config", + "thiserror", + "tokio", +] + [[package]] name = "chain-data" version = "0.1.0" dependencies = [ "ethabi", + "eyre", "hex", "hex-utils", "libp2p-identity", @@ -1042,11 +1099,18 @@ dependencies = [ name = "chain-listener" version = "0.1.0" dependencies = [ + "ccp-rpc-client", + "ccp-shared", + "chain-connector", "chain-data", + "chain-types", + "core-manager", + "cpu-utils", "ethabi", "eyre", "fluence-libp2p", "fs-utils", + "futures", "hex", "hex-utils", "jsonrpsee", @@ -1059,10 +1123,24 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-stream", "tracing", "types", ] +[[package]] +name = "chain-types" +version = "0.1.0" +dependencies = [ + "ccp-shared", + "chain-data", + "ethabi", + "eyre", + "hex", + "serde", + "tokio", +] + [[package]] name = "chrono" version = "0.4.33" @@ -1183,12 +1261,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" [[package]] -name = "cmake" -version = "0.1.50" +name = "clarity" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +checksum = "273c656822c0367587c0fc6ae5a0bf6e19a10d700cd49168a084975b7f84dcfe" dependencies = [ - "cc", + "num-traits", + "num256", + "secp256k1", + "serde", + "serde_derive", + "sha3", ] [[package]] @@ -1417,7 +1500,7 @@ dependencies = [ [[package]] name = "cpu-utils" version = "0.1.0" -source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=main#86b55795bfb1bdea85cd312606eaadd0b2a4cfcd" +source = "git+https://github.com/fluencelabs/capacity-commitment-prover/?branch=main#5de05462da4444f7fd3633fecaecd82c854a1618" dependencies = [ "core_affinity", "hwlocality", @@ -1770,9 +1853,9 @@ dependencies = [ [[package]] name = "decider-distro" -version = "0.6.7" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03a8de76f4a45b6c3199a96f81ea6e6cb84fa814b2dd3dbed2cdf5583b3f53ab" +checksum = "67ef66cee9edb0c9a9b7c1a0bc2646d3e72ac6d884cc889a2a77fae9f3b478d5" dependencies = [ "built 0.7.1", "fluence-spell-dtos", @@ -2476,6 +2559,10 @@ name = "futures-timer" version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" +dependencies = [ + "gloo-timers", + "send_wrapper", +] [[package]] name = "futures-util" @@ -2582,6 +2669,52 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-net" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43aaa242d1239a8822c15c645f02166398da4f8b5c4bae795c1f5b44e9eee173" +dependencies = [ + "futures-channel", + "futures-core", + "futures-sink", + "gloo-utils", + "http 0.2.11", + "js-sys", + "pin-project", + "serde", + "serde_json", + "thiserror", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "gloo-utils" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5555354113b18c547c1d3a98fbf7fb32a9ff4f6fa112ce823a21641a0ba3aa" +dependencies = [ + "js-sys", + "serde", + "serde_json", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "h2" version = "0.3.22" @@ -3399,11 +3532,13 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9579d0ca9fb30da026bac2f0f7d9576ec93489aeb7cd4971dd5b4617d82c79b2" dependencies = [ + "jsonrpsee-client-transport", "jsonrpsee-core", "jsonrpsee-http-client", "jsonrpsee-proc-macros", "jsonrpsee-server", "jsonrpsee-types", + "jsonrpsee-wasm-client", "jsonrpsee-ws-client", "tokio", "tracing", @@ -3415,7 +3550,9 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9f9ed46590a8d5681975f126e22531698211b926129a40a2db47cbca429220" dependencies = [ + "futures-channel", "futures-util", + "gloo-net", "http 0.2.11", "jsonrpsee-core", "pin-project", @@ -3428,6 +3565,7 @@ dependencies = [ "tokio-util", "tracing", "url", + "webpki-roots 0.26.1", ] [[package]] @@ -3454,6 +3592,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "wasm-bindgen-futures", ] [[package]] @@ -3526,6 +3665,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "jsonrpsee-wasm-client" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30f36d27503d0efc0355c1630b74ecfb367050847bf7241a0ed75fab6dfa96c0" +dependencies = [ + "jsonrpsee-client-transport", + "jsonrpsee-core", + "jsonrpsee-types", +] + [[package]] name = "jsonrpsee-ws-client" version = "0.21.0" @@ -4096,7 +4246,7 @@ dependencies = [ "soketto", "tracing", "url", - "webpki-roots", + "webpki-roots 0.25.2", ] [[package]] @@ -5210,6 +5360,7 @@ dependencies = [ "base64 0.21.7", "blake3", "bs58", + "chain-connector", "chain-listener", "config", "config-utils", @@ -5227,6 +5378,7 @@ dependencies = [ "health", "humantime-serde", "itertools 0.12.1", + "jsonrpsee", "kademlia", "libp2p", "libp2p-connection-limits", @@ -5275,6 +5427,7 @@ dependencies = [ "base64 0.21.7", "blake3", "bs58", + "clarity", "connected-client", "connection-pool", "control-macro", @@ -5368,6 +5521,17 @@ dependencies = [ "libm", ] +[[package]] +name = "num256" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27dd9df2c73f8724f4f6930882c127f042656bc84e6000fe9f6026bb43fe1a57" +dependencies = [ + "bnum", + "num-traits", + "serde", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -6394,20 +6558,6 @@ dependencies = [ "rand_core 0.6.4", ] -[[package]] -name = "randomx-rust-wrapper" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cf0b5600b179645ca4f5b3d12c4872064ceb0eaeff5dd36e8a843aa7ff7e901" -dependencies = [ - "bitflags 2.4.1", - "cmake", - "hex", - "libc", - "serde", - "thiserror", -] - [[package]] name = "range-set-blaze" version = "0.1.14" @@ -6934,6 +7084,24 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "secp256k1" +version = "0.28.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d24b59d129cdadea20aea4fb2352fa053712e5d713eee47d700cd4b2bc002f10" +dependencies = [ + "secp256k1-sys", +] + +[[package]] +name = "secp256k1-sys" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d1746aae42c19d583c3c1a8c646bfad910498e2051c551a7f2e3c0c9fbb7eb" +dependencies = [ + "cc", +] + [[package]] name = "security-framework" version = "2.9.2" @@ -6972,6 +7140,12 @@ dependencies = [ "serde", ] +[[package]] +name = "send_wrapper" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" + [[package]] name = "serde" version = "1.0.196" @@ -7111,6 +7285,7 @@ dependencies = [ "bs58", "bytesize", "clap 4.4.18", + "clarity", "config", "config-utils", "core-manager", @@ -9047,6 +9222,15 @@ version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" +[[package]] +name = "webpki-roots" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "widestring" version = "1.0.2" diff --git a/Cargo.toml b/Cargo.toml index 8dc067b34c..26a9957ce5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,10 +44,13 @@ members = [ "particle-execution", "crates/system-services", "crates/chain-listener", + "crates/chain-connector", "crates/hex-utils", "crates/chain-data", + "crates/chain-types", "crates/types", - "crates/core-manager"] + "crates/core-manager" +] exclude = [ "nox/tests/tetraplets", ] @@ -97,6 +100,8 @@ subnet-resolver = { path = "crates/subnet-resolver" } hex-utils = { path = "crates/hex-utils" } chain-data = { path = "crates/chain-data" } chain-listener = { path = "crates/chain-listener" } +chain-connector = { path = "crates/chain-connector" } +chain-types = { path = "crates/chain-types" } types = { path = "crates/types" } core-manager = { path = "crates/core-manager" } @@ -165,6 +170,12 @@ futures-util = "0.3.30" num_cpus = "1.16.0" enum_dispatch = "0.3.12" serde_with = "3.6.0" +mockito = "1.2.0" +clarity = "1.3.0" +cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "main" } +ccp-shared = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "main" } +ccp-rpc-client = { git = "https://github.com/fluencelabs/capacity-commitment-prover.git", branch = "main" } + # Enable a small amount of optimization in debug mode [profile.dev] diff --git a/crates/chain-connector/Cargo.toml b/crates/chain-connector/Cargo.toml new file mode 100644 index 0000000000..24ab06dcc8 --- /dev/null +++ b/crates/chain-connector/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "chain-connector" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +particle-builtins = { workspace = true } +particle-execution = { workspace = true } +particle-args = { workspace = true } +chain-data = { workspace = true } +chain-types = { workspace = true } +ethabi = { workspace = true } +jsonrpsee = { workspace = true, features = ["macros", "server", "client"] } +eyre = { workspace = true } +fluence-libp2p = { workspace = true } +serde_json = { workspace = true } +hex = { workspace = true } +server-config = { workspace = true } +clarity = { workspace = true } +tokio = { workspace = true, features = ["rt", "macros"] } +hex-utils = { workspace = true } +futures = { workspace = true } +ccp-shared = { workspace = true } +thiserror = { workspace = true } + +[dev-dependencies] +mockito = { workspace = true } diff --git a/crates/chain-connector/src/connector.rs b/crates/chain-connector/src/connector.rs new file mode 100644 index 0000000000..5bf7e52c5b --- /dev/null +++ b/crates/chain-connector/src/connector.rs @@ -0,0 +1,765 @@ +use crate::error::{process_response, ConnectorError}; +use crate::function::{GetCommitmentFunction, GetStatusFunction, SubmitProofFunction}; +use crate::ConnectorError::InvalidBaseFeePerGas; +use crate::{ + CurrentEpochFunction, DifficultyFunction, EpochDurationFunction, GetComputePeerFunction, + GetComputeUnitsFunction, GetGlobalNonceFunction, InitTimestampFunction, +}; +use ccp_shared::proof::CCProof; +use ccp_shared::types::{Difficulty, GlobalNonce}; +use chain_data::ChainDataError::InvalidTokenSize; +use chain_data::{next_opt, parse_chain_data, peer_id_to_bytes, ChainFunction}; +use chain_types::{Commitment, CommitmentId, CommitmentStatus, ComputePeer, ComputeUnit}; +use clarity::Transaction; +use ethabi::ethereum_types::U256; +use ethabi::Token; +use eyre::eyre; +use fluence_libp2p::PeerId; +use futures::FutureExt; +use jsonrpsee::core::client::{BatchResponse, ClientT}; +use jsonrpsee::core::params::{ArrayParams, BatchRequestBuilder}; +use jsonrpsee::http_client::HttpClientBuilder; +use jsonrpsee::rpc_params; +use particle_args::{Args, JError}; +use particle_builtins::{wrap, CustomService}; +use particle_execution::{ParticleParams, ServiceFunction}; +use serde_json::Value as JValue; +use serde_json::{json, Value}; +use server_config::ChainConfig; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::Mutex; + +const BASE_FEE_MULTIPLIER: f64 = 0.125; +pub struct ChainConnector { + client: Arc, + config: ChainConfig, + tx_nonce_mutex: Arc>, + host_id: PeerId, +} + +pub struct CCInitParams { + pub difficulty: Difficulty, + pub init_timestamp: U256, + pub global_nonce: GlobalNonce, + pub current_epoch: U256, + pub epoch_duration: U256, +} + +impl ChainConnector { + pub fn new( + config: ChainConfig, + host_id: PeerId, + ) -> eyre::Result<(Arc, HashMap)> { + let connector = Arc::new(Self { + client: Arc::new(HttpClientBuilder::default().build(&config.http_endpoint)?), + config, + tx_nonce_mutex: Arc::new(Default::default()), + host_id, + }); + + let builtins = Self::make_connector_builtins(connector.clone()); + Ok((connector, builtins)) + } + + fn make_connector_builtins(connector: Arc) -> HashMap { + let mut builtins = HashMap::new(); + builtins.insert( + "connector".to_string(), + CustomService::new( + vec![("send_tx", Self::make_send_tx_closure(connector.clone()))], + None, + ), + ); + builtins + } + + fn make_send_tx_closure(connector: Arc) -> ServiceFunction { + ServiceFunction::Immut(Box::new(move |args, params| { + let connector = connector.clone(); + async move { wrap(connector.send_tx_builtin(args, params).await) }.boxed() + })) + } + + async fn send_tx_builtin(&self, args: Args, params: ParticleParams) -> Result { + if params.init_peer_id != self.host_id { + return Err(JError::new("Only the root worker can send transactions")); + } + + let mut args = args.function_args.into_iter(); + let data: Vec = Args::next("data", &mut args)?; + let to: String = Args::next("to", &mut args)?; + let tx_hash = self + .send_tx(data, &to) + .await + .map_err(|err| JError::new(format!("Failed to send tx: {err}")))?; + Ok(json!(tx_hash)) + } + + async fn get_base_fee_per_gas(&self) -> Result { + let block: Value = process_response( + self.client + .request("eth_getBlockByNumber", rpc_params!["pending", false]) + .await, + )?; + + let fee = block + .as_object() + .and_then(|o| o.get("baseFeePerGas")) + .and_then(Value::as_str) + .ok_or(InvalidBaseFeePerGas(block.to_string()))? + .to_string(); + + let base_fee_per_gas = + U256::from_str_radix(&fee, 16).map_err(|_| InvalidBaseFeePerGas(fee))?; + + Ok(base_fee_per_gas) + } + + async fn get_tx_nonce(&self) -> Result { + let address = self.config.wallet_key.to_address().to_string(); + let resp: String = process_response( + self.client + .request("eth_getTransactionCount", rpc_params![address, "pending"]) + .await, + )?; + + let nonce = + U256::from_str_radix(&resp, 16).map_err(|_| ConnectorError::InvalidNonce(resp))?; + Ok(nonce) + } + + async fn max_priority_fee_per_gas(&self) -> Result { + let resp: String = process_response( + self.client + .request("eth_maxPriorityFeePerGas", rpc_params![]) + .await, + )?; + let max_priority_fee_per_gas = + U256::from_str_radix(&resp, 16).map_err(|_| ConnectorError::InvalidGasLimit(resp))?; + Ok(max_priority_fee_per_gas) + } + + async fn estimate_gas_limit(&self, data: &[u8], to: &str) -> Result { + let resp: String = process_response( + self.client + .request( + "eth_estimateGas", + rpc_params![json!({ + "from": self.config.wallet_key.to_address().to_string(), + "to": to, + "data": format!("0x{}", hex::encode(data)), + })], + ) + .await, + )?; + let limit = + U256::from_str_radix(&resp, 16).map_err(|_| ConnectorError::InvalidGasLimit(resp))?; + Ok(limit) + } + + pub async fn send_tx(&self, data: Vec, to: &str) -> Result { + let base_fee_per_gas = self.get_base_fee_per_gas().await?; + let gas_limit = self.estimate_gas_limit(&data, to).await?; + let max_priority_fee_per_gas = self.max_priority_fee_per_gas().await?; + + let increase = (base_fee_per_gas.as_u64() as f64 * BASE_FEE_MULTIPLIER) as u128; + let base_fee = base_fee_per_gas + .checked_add(increase.into()) + .ok_or(InvalidBaseFeePerGas("AAAA".to_string()))?; + // (base fee + priority fee). + let max_fee_per_gas = base_fee + max_priority_fee_per_gas; + + // We use this lock no ensure that we don't send two transactions with the same nonce + let _lock = self.tx_nonce_mutex.lock().await; + let nonce = self.get_tx_nonce().await?; + + // Create a new transaction + let tx = Transaction::Eip1559 { + chain_id: self.config.network_id.into(), + nonce: nonce.as_u128().into(), + max_priority_fee_per_gas: max_priority_fee_per_gas.as_u128().into(), + gas_limit: gas_limit.as_u128().into(), + to: to.parse()?, + value: 0u32.into(), + data, + signature: None, // Not signed. Yet. + max_fee_per_gas: max_fee_per_gas.as_u128().into(), + access_list: vec![], + }; + + let tx = tx + .sign(&self.config.wallet_key, Some(self.config.network_id)) + .to_bytes(); + let tx = hex::encode(tx); + + let resp: String = process_response( + self.client + .request("eth_sendRawTransaction", rpc_params![format!("0x{}", tx)]) + .await, + )?; + Ok(resp) + } + + pub async fn get_current_commitment_id(&self) -> Result, ConnectorError> { + let peer_id = Token::FixedBytes(peer_id_to_bytes(self.host_id)); + let data = GetComputePeerFunction::data(&[peer_id])?; + let resp: String = process_response( + self.client + .request( + "eth_call", + rpc_params![json!({ + "data": data, + "to": self.config.market_contract_address, + })], + ) + .await, + )?; + Ok(ComputePeer::from(&resp)?.commitment_id) + } + + pub async fn get_commitment_status( + &self, + commitment_id: CommitmentId, + ) -> Result { + let data = GetStatusFunction::data(&[Token::FixedBytes(commitment_id.0)])?; + let resp: String = process_response( + self.client + .request( + "eth_call", + rpc_params![json!({ + "data": data, + "to": self.config.cc_contract_address, + })], + ) + .await, + )?; + Ok(CommitmentStatus::from(&resp)?) + } + + pub async fn get_commitment( + &self, + commitment_id: CommitmentId, + ) -> Result { + let data = GetCommitmentFunction::data(&[Token::FixedBytes(commitment_id.0)])?; + let resp: String = process_response( + self.client + .request( + "eth_call", + rpc_params![json!({ + "data": data, + "to": self.config.cc_contract_address, + })], + ) + .await, + )?; + Ok(Commitment::from(&resp)?) + } + + pub async fn get_global_nonce(&self) -> Result { + let data = GetGlobalNonceFunction::data(&[])?; + let resp: String = process_response( + self.client + .request( + "eth_call", + rpc_params![json!({ + "data": data, + "to": self.config.cc_contract_address + })], + ) + .await, + )?; + + let bytes = GetGlobalNonceFunction::decode_fixed_bytes(&resp)?; + Ok(GlobalNonce::new( + bytes.try_into().map_err(|_| InvalidTokenSize)?, + )) + } + + pub async fn submit_proof(&self, proof: CCProof) -> Result { + let data = SubmitProofFunction::data_bytes(&[ + Token::FixedBytes(proof.cu_id.as_ref().to_vec()), + Token::FixedBytes(proof.local_nonce.as_ref().to_vec()), + Token::FixedBytes(proof.result_hash.as_ref().to_vec()), + ])?; + + self.send_tx(data, &self.config.cc_contract_address).await + } + + pub async fn get_compute_units(&self) -> eyre::Result> { + let data = + GetComputeUnitsFunction::data(&[Token::FixedBytes(peer_id_to_bytes(self.host_id))])?; + let resp: String = process_response( + self.client + .request( + "eth_call", + rpc_params![json!({ + "data": data, + "to": self.config.market_contract_address, + })], + ) + .await, + )?; + let mut tokens = + parse_chain_data(&resp, &GetComputeUnitsFunction::signature())?.into_iter(); + let units = next_opt(&mut tokens, "units", Token::into_array)?.into_iter(); + let compute_units = units + .map(ComputeUnit::from_token) + .collect::, _>>()?; + + Ok(compute_units) + } + + pub async fn get_cc_init_params(&self) -> eyre::Result { + let mut batch = BatchRequestBuilder::new(); + + batch.insert("eth_call", self.difficulty_params()?)?; + batch.insert("eth_call", self.init_timestamp_params()?)?; + batch.insert("eth_call", self.global_nonce_params()?)?; + batch.insert("eth_call", self.current_epoch_params()?)?; + batch.insert("eth_call", self.epoch_duration_params()?)?; + + let resp: BatchResponse = self.client.batch_request(batch).await?; + let mut results = resp + .into_ok() + .map_err(|err| eyre!("Some request failed in a batch {err:?}"))?; + + let difficulty = DifficultyFunction::decode_fixed_bytes( + &results.next().ok_or(eyre!("No response for difficulty"))?, + )?; + let init_timestamp = InitTimestampFunction::decode_uint( + &results + .next() + .ok_or(eyre!("No response for init_timestamp"))?, + )?; + let global_nonce = GetGlobalNonceFunction::decode_fixed_bytes( + &results + .next() + .ok_or(eyre!("No response for global_nonce"))?, + )?; + let current_epoch = CurrentEpochFunction::decode_uint( + &results + .next() + .ok_or(eyre!("No response for current_epoch"))?, + )?; + let epoch_duration = EpochDurationFunction::decode_uint( + &results + .next() + .ok_or(eyre!("No response for epoch_duration"))?, + )?; + + Ok(CCInitParams { + difficulty: Difficulty::new( + difficulty + .try_into() + .map_err(|_| eyre!("Failed to convert difficulty"))?, + ), + init_timestamp, + global_nonce: GlobalNonce::new( + global_nonce + .try_into() + .map_err(|_| eyre!("Failed to convert global_nonce"))?, + ), + current_epoch, + epoch_duration, + }) + } + + fn difficulty_params(&self) -> eyre::Result { + let data = DifficultyFunction::data(&[])?; + Ok(rpc_params![ + json!({"data": data, "to": self.config.cc_contract_address}) + ]) + } + + fn init_timestamp_params(&self) -> eyre::Result { + let data = InitTimestampFunction::data(&[])?; + Ok(rpc_params![ + json!({"data": data, "to": self.config.core_contract_address}) + ]) + } + fn global_nonce_params(&self) -> eyre::Result { + let data = GetGlobalNonceFunction::data(&[])?; + Ok(rpc_params![ + json!({"data": data, "to": self.config.cc_contract_address}) + ]) + } + fn current_epoch_params(&self) -> eyre::Result { + let data = CurrentEpochFunction::data(&[])?; + Ok(rpc_params![ + json!({"data": data, "to": self.config.core_contract_address}) + ]) + } + fn epoch_duration_params(&self) -> eyre::Result { + let data = EpochDurationFunction::data(&[])?; + Ok(rpc_params![ + json!({"data": data, "to": self.config.core_contract_address}) + ]) + } +} + +#[cfg(test)] +mod tests { + use crate::{ChainConnector, ConnectorError}; + use ccp_shared::proof::{CCProof, CCProofId, ProofIdx}; + use ccp_shared::types::{Difficulty, GlobalNonce, LocalNonce, ResultHash, CUID}; + use chain_data::peer_id_from_hex; + use chain_types::{CommitmentId, COMMITMENT_IS_NOT_ACTIVE}; + use clarity::PrivateKey; + use hex::FromHex; + use mockito::Matcher; + use serde_json::json; + use std::assert_matches::assert_matches; + use std::str::FromStr; + use std::sync::Arc; + + fn get_connector(url: &str) -> Arc { + let (connector, _) = ChainConnector::new( + server_config::ChainConfig { + http_endpoint: url.to_string(), + cc_contract_address: "0x8dc7d48492b9fD2519b65A54816be03758742c60".to_string(), + core_contract_address: "0x0B306BF915C4d645ff596e518fAf3F9669b97016".to_string(), + market_contract_address: "0x68B1D87F95878fE05B998F19b66F4baba5De1aed".to_string(), + network_id: 3525067388221321, + wallet_key: PrivateKey::from_str( + "0x97a2456e78c4894c62eef6031972d1ca296ed40bf311ab54c231f13db59fc428", + ) + .unwrap(), + }, + peer_id_from_hex("0x6497db93b32e4cdd979ada46a23249f444da1efb186cd74b9666bd03f710028b") + .unwrap(), + ) + .unwrap(); + + connector + } + #[tokio::test] + async fn test_get_compute_units() { + let expected_data = "0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000025d204dcc21f59c2a2098a277e48879207f614583e066654ad6736d36815ebb9e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000450e2f2a5bdb528895e9005f67e70fe213b9b822122e96fd85d2238cae55b6f900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"; + let expected_response = + format!("{{\"jsonrpc\":\"2.0\",\"result\":\"{expected_data}\",\"id\":0}}"); + + let mut server = mockito::Server::new(); + let url = server.url(); + let mock = server + .mock("POST", "/") + // expect exactly 1 POST request + .expect(1) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(expected_response) + .create(); + + let units = get_connector(&url).get_compute_units().await.unwrap(); + + mock.assert(); + assert_eq!(units.len(), 2); + assert_eq!(units[0].start_epoch, 0.into()); + assert!(units[0].deal.is_none()); + assert_eq!(units[1].start_epoch, 0.into()); + assert!(units[1].deal.is_none()); + } + + #[tokio::test] + async fn test_get_current_commitment_id_none() { + let expected_data = "0xaa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000020000000000000000000000005b73c5498c1e3b4dba84de0f1833c4a029d90519"; + let expected_response = + format!("{{\"jsonrpc\":\"2.0\",\"result\":\"{expected_data}\",\"id\":0}}"); + + let mut server = mockito::Server::new(); + let url = server.url(); + let mock = server + .mock("POST", "/") + // expect exactly 1 POST request + .expect(1) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(expected_response) + .create(); + let commitment_id = get_connector(&url) + .get_current_commitment_id() + .await + .unwrap(); + + mock.assert(); + assert!(commitment_id.is_none()); + } + + #[tokio::test] + async fn test_get_current_commitment_id_some() { + let expected_data = "0xaa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5aa3046a12a1aac6e840625e6329d70b427328feceedc8d273e5e6454b85633b5000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000005b73c5498c1e3b4dba84de0f1833c4a029d90519"; + let expected_response = + format!("{{\"jsonrpc\":\"2.0\",\"result\":\"{expected_data}\",\"id\":0}}"); + + let mut server = mockito::Server::new(); + let url = server.url(); + let mock = server + .mock("POST", "/") + // expect exactly 1 POST request + .expect(1) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(expected_response) + .create(); + let commitment_id = get_connector(&url) + .get_current_commitment_id() + .await + .unwrap(); + + mock.assert(); + assert!(commitment_id.is_some()); + assert_eq!( + hex::encode(commitment_id.unwrap().0), + "aa3046a12a1aac6e840625e6329d70b427328feceedc8d273e5e6454b85633b5" + ); + } + + #[tokio::test] + async fn test_get_commitment() { + let commitment_id = "0xa98dc43600773b162bcdb8175eadc037412cd7ad83555fafa507702011a53992"; + + let expected_data = "0x00000000000000000000000000000000000000000000000000000000000000016497db93b32e4cdd979ada46a23249f444da1efb186cd74b9666bd03f710028b000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000012c00000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"; + let expected_response = + format!("{{\"jsonrpc\":\"2.0\",\"result\":\"{expected_data}\",\"id\":0}}"); + let mut server = mockito::Server::new(); + let url = server.url(); + let mock = server + .mock("POST", "/") + // expect exactly 1 POST request + .expect(1) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(expected_response) + .create(); + let commitment_id = CommitmentId(hex::decode(&commitment_id[2..]).unwrap()); + let commitment = get_connector(&url) + .get_commitment(commitment_id) + .await + .unwrap(); + + mock.assert(); + assert_eq!( + commitment.status, + chain_types::CommitmentStatus::WaitDelegation + ); + assert_eq!(commitment.start_epoch, 0.into()); + assert_eq!(commitment.end_epoch, 300.into()); + } + + #[tokio::test] + async fn get_commitment_status() { + let commitment_id = "0xa98dc43600773b162bcdb8175eadc037412cd7ad83555fafa507702011a53992"; + + let expected_data = "0x0000000000000000000000000000000000000000000000000000000000000001"; + let expected_response = + format!("{{\"jsonrpc\":\"2.0\",\"result\":\"{expected_data}\",\"id\":0}}"); + let mut server = mockito::Server::new(); + let url = server.url(); + let mock = server + .mock("POST", "/") + // expect exactly 1 POST request + .expect(1) + .with_status(200) + .with_header("content-type", "application/json") + .match_body(Matcher::PartialJson(json!({ + "method": "eth_call", + }))) + .with_body(expected_response) + .create(); + let commitment_id = CommitmentId(hex::decode(&commitment_id[2..]).unwrap()); + let status = get_connector(&url) + .get_commitment_status(commitment_id) + .await + .unwrap(); + + mock.assert(); + assert_eq!(status, chain_types::CommitmentStatus::WaitDelegation); + } + + #[tokio::test] + async fn test_batch_init_request() { + let expected_response = r#"[ + { + "jsonrpc": "2.0", + "result": "0x76889c92f61b9c5df216e048df56eb8f4eb02f172ab0d5b04edb9190ab9c9eec", + "id": 0 + }, + { + "jsonrpc": "2.0", + "result": "0x0000000000000000000000000000000000000000000000000000000065ca5a01", + "id": 1 + }, + { + "jsonrpc": "2.0", + "result": "0x0000000000000000000000000000000000000000000000000000000000000005", + "id": 2 + }, + { + "jsonrpc": "2.0", + "result": "0x00000000000000000000000000000000000000000000000000000000000016be", + "id": 3 + }, + { + "jsonrpc": "2.0", + "result": "0x000000000000000000000000000000000000000000000000000000000000000f", + "id": 4 + } + ]"#; + let mut server = mockito::Server::new(); + let url = server.url(); + let mock = server + .mock("POST", "/") + // expect exactly 1 POST request + .expect(1) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(expected_response) + .create(); + + let init_params = get_connector(&url).get_cc_init_params().await.unwrap(); + + mock.assert(); + assert_eq!( + init_params.difficulty, + ::from_hex( + "76889c92f61b9c5df216e048df56eb8f4eb02f172ab0d5b04edb9190ab9c9eec" + ) + .unwrap() + ); + assert_eq!(init_params.init_timestamp, 1707760129.into()); + assert_eq!( + init_params.global_nonce, + ::from_hex( + "0000000000000000000000000000000000000000000000000000000000000005" + ) + .unwrap() + ); + assert_eq!( + init_params.current_epoch, + 0x00000000000000000000000000000000000000000000000000000000000016be.into() + ); + assert_eq!( + init_params.epoch_duration, + 0x000000000000000000000000000000000000000000000000000000000000000f.into() + ); + } + + #[tokio::test] + async fn submit_proof_not_active() { + let get_block_by_number = r#"{"jsonrpc":"2.0","id":0,"result":{"hash":"0xcbe8d90665392babc8098738ec78009193c99d3cc872a6657e306cfe8824bef9","parentHash":"0x15e767118a3e2d7545fee290b545faccd4a9eff849ac1057ce82cab7100c0c52","sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","miner":"0x0000000000000000000000000000000000000000","stateRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","difficulty":"0x0","number":"0xa2","gasLimit":"0x1c9c380","gasUsed":"0x0","timestamp":"0x65d88f76","extraData":"0x","mixHash":"0x0000000000000000000000000000000000000000000000000000000000000000","nonce":"0x0000000000000000","baseFeePerGas":"0x7","totalDifficulty":"0x0","uncles":[],"transactions":[],"size":"0x220"}}"#; + let estimate_gas = r#" + { + "jsonrpc": "2.0", + "id": 1, + "error": { + "code": -32000, + "message": "execution reverted: revert: Capacity commitment is not active", + "data": "0x08c379a000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000021436170616369747920636f6d6d69746d656e74206973206e6f742061637469766500000000000000000000000000000000000000000000000000000000000000" + } + } + "#; + + let mut server = mockito::Server::new(); + let url = server.url(); + server + .mock("POST", "/") + // expect exactly 4 POST request + .expect(1) + .with_status(200) + .with_header("content-type", "application/json") + .with_body_from_request(move |req| { + let body = req.body().expect("mock: get request body"); + let body: serde_json::Value = + serde_json::from_slice(body).expect("mock: parse request body"); + let method = body.get("method").expect("get method"); + let method = method.as_str().expect("as str").trim_matches(|c| c == '\"'); + + match method { + "eth_getBlockByNumber" => get_block_by_number.into(), + "eth_estimateGas" => estimate_gas.into(), + method => format!("'{}' not supported", method).into(), + } + }) + .create(); + + let proof = CCProof::new( + CCProofId::new( + GlobalNonce::new([0u8; 32].into()), + Difficulty::new([0u8; 32].into()), + ProofIdx::zero(), + ), + LocalNonce::new([0u8; 32].into()), + CUID::new([0u8; 32].into()), + ResultHash::from_slice([0u8; 32].into()), + ); + let result = get_connector(&url).submit_proof(proof).await; + + assert!(result.is_err()); + + assert_matches!( + result.unwrap_err(), + ConnectorError::RpcCallError { + code: _, + message: _, + data, + } if data.contains(COMMITMENT_IS_NOT_ACTIVE) + ); + } + + #[tokio::test] + async fn submit_proof() { + let get_block_by_number = r#"{"jsonrpc":"2.0","id":0,"result":{"hash":"0xcbe8d90665392babc8098738ec78009193c99d3cc872a6657e306cfe8824bef9","parentHash":"0x15e767118a3e2d7545fee290b545faccd4a9eff849ac1057ce82cab7100c0c52","sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","miner":"0x0000000000000000000000000000000000000000","stateRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","difficulty":"0x0","number":"0xa2","gasLimit":"0x1c9c380","gasUsed":"0x0","timestamp":"0x65d88f76","extraData":"0x","mixHash":"0x0000000000000000000000000000000000000000000000000000000000000000","nonce":"0x0000000000000000","baseFeePerGas":"0x7","totalDifficulty":"0x0","uncles":[],"transactions":[],"size":"0x220"}}"#; + let estimate_gas = r#"{"jsonrpc":"2.0","id": 1,"result": "0x5208"}"#; + let max_priority_fee = r#"{"jsonrpc":"2.0","id": 2,"result": "0x5208"}"#; + let nonce = r#"{"jsonrpc":"2.0","id":3,"result":"0x20"}"#; + let send_tx_response = r#" + { + "jsonrpc": "2.0", + "id": 4, + "result": "0x55bfec4a4400ca0b09e075e2b517041cd78b10021c51726cb73bcba52213fa05" + } + "#; + let mut server = mockito::Server::new(); + let url = server.url(); + server + .mock("POST", "/") + // expect exactly 4 POST request + .expect(1) + .with_status(200) + .with_header("content-type", "application/json") + .with_body_from_request(move |req| { + let body = req.body().expect("mock: get request body"); + let body: serde_json::Value = + serde_json::from_slice(body).expect("mock: parse request body"); + let method = body.get("method").expect("get method"); + let method = method.as_str().expect("as str").trim_matches(|c| c == '\"'); + + match method { + "eth_getBlockByNumber" => get_block_by_number.into(), + "eth_estimateGas" => estimate_gas.into(), + "eth_maxPriorityFeePerGas" => max_priority_fee.into(), + "eth_getTransactionCount" => nonce.into(), + "eth_sendRawTransaction" => send_tx_response.into(), + method => format!("'{}' not supported", method).into(), + } + }) + .create(); + + let proof = CCProof::new( + CCProofId::new( + GlobalNonce::new([0u8; 32].into()), + Difficulty::new([0u8; 32].into()), + ProofIdx::zero(), + ), + LocalNonce::new([0u8; 32].into()), + CUID::new([0u8; 32].into()), + ResultHash::from_slice([0u8; 32].into()), + ); + let result = get_connector(&url).submit_proof(proof).await.unwrap(); + + assert_eq!( + result, + "0x55bfec4a4400ca0b09e075e2b517041cd78b10021c51726cb73bcba52213fa05" + ); + } +} diff --git a/crates/chain-connector/src/error.rs b/crates/chain-connector/src/error.rs new file mode 100644 index 0000000000..aac4e9684b --- /dev/null +++ b/crates/chain-connector/src/error.rs @@ -0,0 +1,58 @@ +use chain_data::ChainDataError; +use jsonrpsee::core::client::{Error as RPCError, Error}; +use std::string::FromUtf8Error; + +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ConnectorError { + #[error("RPC error: {0}")] + RpcError(#[from] RPCError), + #[error("RPC call error: code: {code}, message: {message}, data: {data}")] + RpcCallError { + /// Code + code: i32, + /// Message + message: String, + /// Optional data + data: String, + }, + #[error("Failed to parse chain data: {0}")] + ParseChainDataFailed(#[from] ChainDataError), + #[error("Failed to parse address: {0}")] + AddressParseError(#[from] clarity::Error), + #[error("data is not a valid hex: '{0}'")] + DecodeHex(#[from] hex::FromHexError), + #[error("data is not a valid string: '{0}'")] + DecodeData(#[from] FromUtf8Error), + #[error("Failed to parse baseFeePerGas: {0}")] + InvalidBaseFeePerGas(String), + #[error("Invalid transaction nonce: {0}")] + InvalidNonce(String), + #[error("Invalid gas limit: {0}")] + InvalidGasLimit(String), + #[error("Parse error: {0}")] + ParseError(#[from] serde_json::Error), +} + +pub fn process_response(response: Result) -> Result { + match response { + Ok(data) => Ok(data), + Err(err) => match err { + Error::Call(e) => { + let code = e.code(); + let message = e.message().to_string(); + let data = match e.data() { + Some(data) => serde_json::from_str(data.get())?, + None => "".to_string(), + }; + Err(ConnectorError::RpcCallError { + code, + message, + data, + }) + } + _ => Err(ConnectorError::RpcError(err)), + }, + } +} diff --git a/crates/chain-connector/src/function/current_epoch.rs b/crates/chain-connector/src/function/current_epoch.rs new file mode 100644 index 0000000000..936ce91eed --- /dev/null +++ b/crates/chain-connector/src/function/current_epoch.rs @@ -0,0 +1,23 @@ +use chain_data::ChainFunction; +use ethabi::{Function, ParamType, StateMutability}; + +/// @dev Returns current epoch +/// @return current epoch number +/// function currentEpoch() external view returns (uint256); +pub struct CurrentEpochFunction; +impl ChainFunction for CurrentEpochFunction { + fn function() -> Function { + #[allow(deprecated)] + Function { + name: "currentEpoch".to_string(), + inputs: vec![], + outputs: vec![], + constant: None, + state_mutability: StateMutability::View, + } + } + + fn signature() -> Vec { + vec![ParamType::Uint(256)] + } +} diff --git a/crates/chain-connector/src/function/difficulty.rs b/crates/chain-connector/src/function/difficulty.rs new file mode 100644 index 0000000000..bfb364457d --- /dev/null +++ b/crates/chain-connector/src/function/difficulty.rs @@ -0,0 +1,21 @@ +use chain_data::ChainFunction; +use ethabi::{Function, ParamType, StateMutability}; + +/// function difficulty() external view returns (bytes32); +pub struct DifficultyFunction; +impl ChainFunction for DifficultyFunction { + fn function() -> Function { + #[allow(deprecated)] + Function { + name: "difficulty".to_string(), + inputs: vec![], + outputs: vec![], + constant: None, + state_mutability: StateMutability::View, + } + } + + fn signature() -> Vec { + vec![ParamType::FixedBytes(32)] + } +} diff --git a/crates/chain-connector/src/function/epoch_duration.rs b/crates/chain-connector/src/function/epoch_duration.rs new file mode 100644 index 0000000000..5fb519c733 --- /dev/null +++ b/crates/chain-connector/src/function/epoch_duration.rs @@ -0,0 +1,24 @@ +use chain_data::ChainFunction; +use ethabi::{Function, ParamType, StateMutability}; + +/// @dev Returns epoch duration +/// @return epochDuration in seconds +/// function epochDuration() external view returns (uint256); +pub struct EpochDurationFunction; + +impl ChainFunction for EpochDurationFunction { + fn function() -> Function { + #[allow(deprecated)] + Function { + name: "epochDuration".to_string(), + inputs: vec![], + outputs: vec![], + constant: None, + state_mutability: StateMutability::View, + } + } + + fn signature() -> Vec { + vec![ParamType::Uint(256)] + } +} diff --git a/crates/chain-connector/src/function/get_commitment.rs b/crates/chain-connector/src/function/get_commitment.rs new file mode 100644 index 0000000000..a2d1230330 --- /dev/null +++ b/crates/chain-connector/src/function/get_commitment.rs @@ -0,0 +1,28 @@ +use chain_data::ChainFunction; +use chain_types::Commitment; + +/// @dev Returns the commitment info +/// @param commitmentId Commitment id +/// @return info commitment info +/// function getCommitment(bytes32 commitmentId) external view returns (CommitmentView memory); +pub struct GetCommitmentFunction; + +impl ChainFunction for GetCommitmentFunction { + fn function() -> ethabi::Function { + #[allow(deprecated)] + ethabi::Function { + name: "getCommitment".to_string(), + inputs: vec![ethabi::Param { + name: "commitmentId".to_string(), + kind: ethabi::ParamType::FixedBytes(32), + internal_type: None, + }], + outputs: vec![], + constant: None, + state_mutability: ethabi::StateMutability::View, + } + } + fn signature() -> Vec { + Commitment::signature() + } +} diff --git a/crates/chain-connector/src/function/get_commitment_status.rs b/crates/chain-connector/src/function/get_commitment_status.rs new file mode 100644 index 0000000000..e4c791e953 --- /dev/null +++ b/crates/chain-connector/src/function/get_commitment_status.rs @@ -0,0 +1,29 @@ +use chain_data::ChainFunction; + +/// @dev Returns the commitment status +/// @param commitmentId Commitment id +/// @return status commitment status +/// function getStatus(bytes32 commitmentId) external view returns (CCStatus); +pub struct GetStatusFunction; + +impl ChainFunction for GetStatusFunction { + fn function() -> ethabi::Function { + #[allow(deprecated)] + let function = ethabi::Function { + name: "getStatus".to_string(), + inputs: vec![ethabi::Param { + name: "commitmentId".to_string(), + kind: ethabi::ParamType::FixedBytes(32), + internal_type: None, + }], + outputs: vec![], + constant: None, + state_mutability: ethabi::StateMutability::View, + }; + function + } + + fn signature() -> Vec { + vec![ethabi::ParamType::FixedBytes(32)] + } +} diff --git a/crates/chain-connector/src/function/get_compute_peer.rs b/crates/chain-connector/src/function/get_compute_peer.rs new file mode 100644 index 0000000000..f29ebfc1a1 --- /dev/null +++ b/crates/chain-connector/src/function/get_compute_peer.rs @@ -0,0 +1,53 @@ +use chain_data::ChainFunction; +use chain_types::ComputePeer; +use ethabi::{Function, Param, ParamType, StateMutability}; + +/// struct ComputePeer { +/// bytes32 offerId; +/// bytes32 commitmentId; +/// uint256 unitCount; +/// address owner; +/// } +/// @dev Returns the compute peer info +/// function getComputePeer(bytes32 peerId) external view returns (ComputePeer memory); +pub struct GetComputePeerFunction; + +impl ChainFunction for GetComputePeerFunction { + fn function() -> Function { + #[allow(deprecated)] + Function { + name: "getComputePeer".to_string(), + inputs: vec![Param { + name: String::from("peerId"), + kind: ParamType::FixedBytes(32), + internal_type: None, + }], + outputs: vec![], + constant: None, + state_mutability: StateMutability::View, + } + } + + fn signature() -> Vec { + ComputePeer::signature() + } +} + +#[cfg(test)] +mod tests { + use crate::GetComputePeerFunction; + use chain_data::ChainFunction; + + #[tokio::test] + async fn test_data() { + let peer_id = "0x6497db93b32e4cdd979ada46a23249f444da1efb186cd74b9666bd03f710028b"; + let data = GetComputePeerFunction::data(&[ethabi::Token::FixedBytes( + hex::decode(&peer_id[2..]).unwrap(), + )]) + .unwrap(); + assert_eq!( + data, + "0x86e682546497db93b32e4cdd979ada46a23249f444da1efb186cd74b9666bd03f710028b" + ); + } +} diff --git a/crates/chain-connector/src/function/get_compute_units.rs b/crates/chain-connector/src/function/get_compute_units.rs new file mode 100644 index 0000000000..04d70c41fa --- /dev/null +++ b/crates/chain-connector/src/function/get_compute_units.rs @@ -0,0 +1,48 @@ +use chain_data::ChainFunction; +use chain_types::ComputeUnit; +use ethabi::ParamType; + +/// @dev Returns the compute units info of a peer +/// function getComputeUnits(bytes32 peerId) external view returns (ComputeUnitView[] memory); +pub struct GetComputeUnitsFunction; + +impl ChainFunction for GetComputeUnitsFunction { + fn function() -> ethabi::Function { + #[allow(deprecated)] + ethabi::Function { + name: "getComputeUnits".to_string(), + inputs: vec![ethabi::Param { + name: "peerId".to_string(), + kind: ParamType::FixedBytes(32), + internal_type: None, + }], + outputs: vec![], + constant: None, + state_mutability: ethabi::StateMutability::View, + } + } + fn signature() -> Vec { + vec![ParamType::Array(Box::new(ParamType::Tuple( + ComputeUnit::signature(), + )))] + } +} + +#[cfg(test)] +mod tests { + use crate::GetComputeUnitsFunction; + use chain_data::ChainFunction; + + #[tokio::test] + async fn test_data() { + let peer_id = "0x6497db93b32e4cdd979ada46a23249f444da1efb186cd74b9666bd03f710028b"; + let data = GetComputeUnitsFunction::data(&[ethabi::Token::FixedBytes( + hex::decode(&peer_id[2..]).unwrap(), + )]) + .unwrap(); + assert_eq!( + data, + "0xb6015c6e6497db93b32e4cdd979ada46a23249f444da1efb186cd74b9666bd03f710028b" + ); + } +} diff --git a/crates/chain-connector/src/function/global_nonce.rs b/crates/chain-connector/src/function/global_nonce.rs new file mode 100644 index 0000000000..0f4932e111 --- /dev/null +++ b/crates/chain-connector/src/function/global_nonce.rs @@ -0,0 +1,23 @@ +use chain_data::ChainFunction; +use ethabi::{Function, ParamType, StateMutability}; + +/// function getGlobalNonce() external view returns (bytes32); + +pub struct GetGlobalNonceFunction; + +impl ChainFunction for GetGlobalNonceFunction { + fn function() -> Function { + #[allow(deprecated)] + Function { + name: "getGlobalNonce".to_string(), + inputs: vec![], + outputs: vec![], + constant: None, + state_mutability: StateMutability::View, + } + } + + fn signature() -> Vec { + vec![ParamType::FixedBytes(32)] + } +} diff --git a/crates/chain-connector/src/function/init_timestamp.rs b/crates/chain-connector/src/function/init_timestamp.rs new file mode 100644 index 0000000000..0b757846d7 --- /dev/null +++ b/crates/chain-connector/src/function/init_timestamp.rs @@ -0,0 +1,26 @@ +use chain_data::ChainFunction; +use ethabi::{Function, ParamType, StateMutability}; + +/// @dev Returns epoch init timestamp +/// @return initTimestamp in seconds +/// function initTimestamp() external view returns (uint256); +/// + +pub struct InitTimestampFunction; + +impl ChainFunction for InitTimestampFunction { + fn function() -> Function { + #[allow(deprecated)] + Function { + name: "initTimestamp".to_string(), + inputs: vec![], + outputs: vec![], + constant: None, + state_mutability: StateMutability::View, + } + } + + fn signature() -> Vec { + vec![ParamType::Uint(256)] + } +} diff --git a/crates/chain-connector/src/function/mod.rs b/crates/chain-connector/src/function/mod.rs new file mode 100644 index 0000000000..c116865021 --- /dev/null +++ b/crates/chain-connector/src/function/mod.rs @@ -0,0 +1,21 @@ +mod current_epoch; +mod difficulty; +mod epoch_duration; +mod get_commitment; +mod get_commitment_status; +mod get_compute_peer; +mod get_compute_units; +mod global_nonce; +mod init_timestamp; +mod submit_proof; + +pub use current_epoch::CurrentEpochFunction; +pub use difficulty::DifficultyFunction; +pub use epoch_duration::EpochDurationFunction; +pub use get_commitment::GetCommitmentFunction; +pub use get_commitment_status::GetStatusFunction; +pub use get_compute_peer::GetComputePeerFunction; +pub use get_compute_units::GetComputeUnitsFunction; +pub use global_nonce::GetGlobalNonceFunction; +pub use init_timestamp::InitTimestampFunction; +pub use submit_proof::SubmitProofFunction; diff --git a/crates/chain-connector/src/function/submit_proof.rs b/crates/chain-connector/src/function/submit_proof.rs new file mode 100644 index 0000000000..4f547d744e --- /dev/null +++ b/crates/chain-connector/src/function/submit_proof.rs @@ -0,0 +1,48 @@ +use chain_data::ChainFunction; + +/// @dev Submits a proof for the commitment +/// @param unitId Compute unit id which provied the proof +/// @param globalUnitNonce The global nonce of the unit for calculating the target hash +/// @param localUnitNonce The local nonce of the unit for calculating the target hash. It's the proof +/// @param targetHash The target hash of this proof +/// function submitProof(bytes32 unitId, bytes32 globalUnitNonce, bytes32 localUnitNonce, bytes32 targetHash) external; + +pub struct SubmitProofFunction; + +impl ChainFunction for SubmitProofFunction { + fn function() -> ethabi::Function { + #[allow(deprecated)] + let function = ethabi::Function { + name: "submitProof".to_string(), + inputs: vec![ + ethabi::Param { + name: "unitId".to_string(), + kind: ethabi::ParamType::FixedBytes(32), + internal_type: None, + }, + ethabi::Param { + name: "localUnitNonce".to_string(), + kind: ethabi::ParamType::FixedBytes(32), + internal_type: None, + }, + ethabi::Param { + name: "targetHash".to_string(), + kind: ethabi::ParamType::FixedBytes(32), + internal_type: None, + }, + ], + outputs: vec![], + constant: None, + state_mutability: ethabi::StateMutability::NonPayable, + }; + function + } + + fn signature() -> Vec { + vec![ + ethabi::ParamType::FixedBytes(32), + ethabi::ParamType::FixedBytes(32), + ethabi::ParamType::FixedBytes(32), + ] + } +} diff --git a/crates/chain-connector/src/lib.rs b/crates/chain-connector/src/lib.rs new file mode 100644 index 0000000000..4ea2e96cf6 --- /dev/null +++ b/crates/chain-connector/src/lib.rs @@ -0,0 +1,10 @@ +#![feature(assert_matches)] + +mod connector; +mod error; +mod function; + +pub use connector::CCInitParams; +pub use connector::ChainConnector; +pub use error::ConnectorError; +pub use function::*; diff --git a/crates/chain-data/Cargo.toml b/crates/chain-data/Cargo.toml index 19ae1af62e..1a0c3734d2 100644 --- a/crates/chain-data/Cargo.toml +++ b/crates/chain-data/Cargo.toml @@ -13,3 +13,4 @@ hex-utils = { workspace = true } hex = { workspace = true } log = { workspace = true } thiserror = { workspace = true } +eyre = { workspace = true } diff --git a/crates/chain-data/src/error.rs b/crates/chain-data/src/error.rs index 9de46fef3d..d2c77edf3c 100644 --- a/crates/chain-data/src/error.rs +++ b/crates/chain-data/src/error.rs @@ -13,4 +13,6 @@ pub enum ChainDataError { DecodeHex(#[source] hex::FromHexError), #[error(transparent)] EthError(#[from] ethabi::Error), + #[error("Invalid token size")] + InvalidTokenSize, } diff --git a/crates/chain-data/src/function.rs b/crates/chain-data/src/function.rs new file mode 100644 index 0000000000..2c4fbe906a --- /dev/null +++ b/crates/chain-data/src/function.rs @@ -0,0 +1,33 @@ +use crate::{next_opt, ChainDataError}; +use ethabi::ethereum_types::U256; +use ethabi::Token; + +pub trait ChainFunction { + fn function() -> ethabi::Function; + fn signature() -> Vec; + + fn data(inputs: &[Token]) -> Result { + let function = Self::function(); + let data = function.encode_input(inputs)?; + Ok(format!("0x{}", hex::encode(data))) + } + + fn data_bytes(inputs: &[Token]) -> Result, ChainDataError> { + let function = Self::function(); + Ok(function.encode_input(inputs)?) + } + + fn decode_uint(data: &str) -> Result { + let mut tokens = crate::parse_chain_data(data, &Self::signature())?.into_iter(); + next_opt(&mut tokens, "uint", Token::into_uint) + } + + fn decode_fixed_bytes(data: &str) -> Result, ChainDataError> { + let mut tokens = crate::parse_chain_data(data, &Self::signature())?.into_iter(); + next_opt(&mut tokens, "bytes", Token::into_fixed_bytes) + } + + fn decode_tuple(data: &str) -> Result, ChainDataError> { + crate::parse_chain_data(data, &Self::signature()) + } +} diff --git a/crates/chain-data/src/lib.rs b/crates/chain-data/src/lib.rs index 0fdea916e9..456a55c168 100644 --- a/crates/chain-data/src/lib.rs +++ b/crates/chain-data/src/lib.rs @@ -2,13 +2,15 @@ mod chain_data; mod data_tokens; mod error; +mod function; mod log; mod u256; mod utils; -pub use chain_data::{ChainData, ChainEvent, EventField}; +pub use chain_data::{parse_chain_data, ChainData, ChainEvent, EventField}; pub use data_tokens::{next, next_opt}; pub use error::ChainDataError; +pub use function::ChainFunction; pub use log::{parse_log, Log, LogParseError}; pub use u256::U256; -pub use utils::{parse_peer_id, peer_id_to_hex}; +pub use utils::{parse_peer_id, peer_id_from_hex, peer_id_to_bytes, peer_id_to_hex}; diff --git a/crates/chain-data/src/utils.rs b/crates/chain-data/src/utils.rs index 83f09629d7..a4b7758781 100644 --- a/crates/chain-data/src/utils.rs +++ b/crates/chain-data/src/utils.rs @@ -1,3 +1,4 @@ +use hex_utils::decode_hex; use libp2p_identity::{ParseError, PeerId}; /// Static prefix of the PeerId. Protobuf encoding + multihash::identity + length and so on. @@ -9,9 +10,16 @@ pub fn parse_peer_id(bytes: Vec) -> Result { PeerId::from_bytes(&peer_id) } -pub fn peer_id_to_hex(peer_id: PeerId) -> String { +pub fn peer_id_to_bytes(peer_id: PeerId) -> Vec { let peer_id = peer_id.to_bytes(); - format!("0x{:0>64}", hex::encode(&peer_id[PEER_ID_PREFIX.len()..])) + peer_id[PEER_ID_PREFIX.len()..].to_vec() +} +pub fn peer_id_to_hex(peer_id: PeerId) -> String { + format!("0x{:0>64}", hex::encode(peer_id_to_bytes(peer_id))) +} + +pub fn peer_id_from_hex(hex: &str) -> eyre::Result { + Ok(parse_peer_id(decode_hex(hex)?)?) } #[cfg(test)] diff --git a/crates/chain-listener/Cargo.toml b/crates/chain-listener/Cargo.toml index 5d0af35b88..9ca4a07211 100644 --- a/crates/chain-listener/Cargo.toml +++ b/crates/chain-listener/Cargo.toml @@ -9,6 +9,8 @@ edition = "2021" ethabi = { workspace = true } thiserror = { workspace = true } chain-data = { workspace = true } +chain-types = { workspace = true } +chain-connector = { workspace = true } jsonrpsee = { workspace = true, features = ["ws-client", "macros", "server"] } serde = { workspace = true } @@ -19,7 +21,7 @@ log = { workspace = true } tracing = { workspace = true } eyre = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["rt"]} server-config = { workspace = true } types = { workspace = true } @@ -27,3 +29,9 @@ fluence-libp2p = { workspace = true } tempfile = { workspace = true } fs-utils = { workspace = true } log-utils = { workspace = true } +futures = { workspace = true } +core-manager = { workspace = true } +ccp-rpc-client = { workspace = true } +cpu-utils = { workspace = true } +ccp-shared = { workspace = true } +tokio-stream = { workspace = true } diff --git a/crates/chain-listener/src/event/cc_activated.rs b/crates/chain-listener/src/event/cc_activated.rs index 999a8b31eb..7edcab4c78 100644 --- a/crates/chain-listener/src/event/cc_activated.rs +++ b/crates/chain-listener/src/event/cc_activated.rs @@ -1,59 +1,61 @@ +use chain_data::ChainDataError::InvalidTokenSize; use chain_data::EventField::{Indexed, NotIndexed}; -use chain_data::{ - next_opt, parse_peer_id, ChainData, ChainDataError, ChainEvent, EventField, U256, -}; +use chain_data::{next_opt, parse_peer_id, ChainData, ChainDataError, ChainEvent, EventField}; +use chain_types::CommitmentId; +use core_manager::CUID; +use ethabi::ethereum_types::U256; use ethabi::param_type::ParamType; use ethabi::Token; use libp2p_identity::PeerId; use serde::{Deserialize, Serialize}; use types::peer_id; -/// Corresponding Solidity type: -/// ```solidity -///event CapacityCommitmentActivated( -/// bytes32 indexed peerId, -/// bytes32 indexed commitmentId, -/// uint256 endEpoch, -/// bytes32[] unitIds, -///); -/// ``` - -#[derive(Debug, Serialize, Deserialize)] -pub struct CommitmentId(pub Vec); -#[derive(Debug, Serialize, Deserialize)] -pub struct UnitId(pub Vec); - +/// @dev Emitted when a commitment is activated. Commitment can be activated only if delegator deposited collateral. +/// @param peerId Peer id which linked to the commitment +/// @param commitmentId Commitment id which activated +/// @param startEpoch The start epoch of the commitment +/// @param endEpoch The end epoch of the commitment +/// @param unitIds Compute unit ids which linked to the commitment +/// event CommitmentActivated( +/// bytes32 indexed peerId, +/// bytes32 indexed commitmentId, +/// uint256 startEpoch, +/// uint256 endEpoch, +/// bytes32[] unitIds +/// ); #[derive(Debug, Serialize, Deserialize)] -pub struct CCActivatedData { +pub struct CommitmentActivatedData { #[serde( serialize_with = "peer_id::serde::serialize", deserialize_with = "peer_id::serde::deserialize" )] pub peer_id: PeerId, pub commitment_id: CommitmentId, + pub start_epoch: U256, pub end_epoch: U256, - pub unit_ids: Vec, + pub unit_ids: Vec, } #[derive(Debug, Serialize, Deserialize)] -pub struct CCActivated { +pub struct CommitmentActivated { pub block_number: String, - pub info: CCActivatedData, + pub info: CommitmentActivatedData, } -impl CCActivated { - pub const EVENT_NAME: &'static str = "CapacityCommitmentActivated"; +impl CommitmentActivated { + pub const EVENT_NAME: &'static str = "CommitmentActivated"; } -impl ChainData for CCActivatedData { +impl ChainData for CommitmentActivatedData { fn event_name() -> &'static str { - CCActivated::EVENT_NAME + CommitmentActivated::EVENT_NAME } fn signature() -> Vec { vec![ Indexed(ParamType::FixedBytes(32)), // peerId Indexed(ParamType::FixedBytes(32)), // commitmentId + NotIndexed(ParamType::Uint(256)), // startEpoch NotIndexed(ParamType::Uint(256)), // endEpoch NotIndexed(ParamType::Array(Box::new(ParamType::FixedBytes(32)))), // unitIds ] @@ -70,27 +72,33 @@ impl ChainData for CCActivatedData { Token::into_fixed_bytes, )?); - let end_epoch = next_opt(data_tokens, "end_epoch", U256::from_token)?; + let start_epoch = next_opt(data_tokens, "start_epoch", Token::into_uint)?; + let end_epoch = next_opt(data_tokens, "end_epoch", Token::into_uint)?; - let unit_ids: Vec> = next_opt(data_tokens, "unit_ids", |t| { + let units: Vec> = next_opt(data_tokens, "unit_ids", |t| { t.into_array()? .into_iter() .map(Token::into_fixed_bytes) .collect() })?; - let unit_ids = unit_ids.into_iter().map(UnitId).collect(); - Ok(CCActivatedData { + let mut unit_ids = vec![]; + for cu in units { + unit_ids.push(CUID::new(cu.try_into().map_err(|_| InvalidTokenSize)?)); + } + + Ok(CommitmentActivatedData { peer_id, commitment_id, + start_epoch, end_epoch, unit_ids, }) } } -impl ChainEvent for CCActivated { - fn new(block_number: String, info: CCActivatedData) -> Self { +impl ChainEvent for CommitmentActivated { + fn new(block_number: String, info: CommitmentActivatedData) -> Self { Self { block_number, info } } } @@ -98,49 +106,54 @@ impl ChainEvent for CCActivated { #[cfg(test)] mod test { - use super::{CCActivated, CCActivatedData}; + use super::{CommitmentActivated, CommitmentActivatedData}; use chain_data::{parse_log, ChainData, Log}; + use core_manager::CUID; use hex; + use hex::FromHex; #[tokio::test] async fn test_cc_activated_topic() { assert_eq!( - CCActivatedData::topic(), - "0xcd92fc03744bba25ad966bdc1127f8996e70c551d1ee4a88ce7fb0e596069649" + CommitmentActivatedData::topic(), + "0x0b0a4688a90d1b24732d05ddf4925af69f02cd7d9a921b1cdcd4a7c2b6d57d68" ); } #[tokio::test] async fn test_chain_parsing_ok() { - let data = "0x00000000000000000000000000000000000000000000000000000000009896800000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000a4c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc9".to_string(); + let data = "0x000000000000000000000000000000000000000000000000000000000000007b00000000000000000000000000000000000000000000000000000000000001c800000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000001c04d94f1e85788b245471c87490f42149b09503fe3af46733e4b5adf94583105".to_string(); let log = Log { data, block_number: "0x0".to_string(), removed: false, topics: vec![ - CCActivatedData::topic(), - "0x246cd65bc58db104674f76c9b1340eb16881d9ef90e33d4b1086ebd334f4002d".to_string(), - "0xd6996a1d0950671fa4ae2642e9bfdb7e4c7832a35c640cdb47ecb8b8002b77b5".to_string(), + CommitmentActivatedData::topic(), + "0xc586dcbfc973643dc5f885bf1a38e054d2675b03fe283a5b7337d70dda9f7171".to_string(), + "0x27e42c090aa007a4f2545547425aaa8ea3566e1f18560803ac48f8e98cb3b0c9".to_string(), ], }; - let result = parse_log::(log); + let result = parse_log::(log); assert!(result.is_ok(), "can't parse data: {:?}", result); let result = result.unwrap().info; assert_eq!( result.peer_id.to_string(), - "12D3KooWCGZ6t8by5ag5YMQW4k3HoPLaKdN5rB9DhAmDUeG8dj1N" + "12D3KooWP7RkvkBhbe7ATd451zxTifzF6Gm1uzCDadqQueET7EMe" // it's also the second topic ); assert_eq!( hex::encode(result.commitment_id.0), - "d6996a1d0950671fa4ae2642e9bfdb7e4c7832a35c640cdb47ecb8b8002b77b5" - ); - assert_eq!( - result.end_epoch.to_eth(), - ethabi::ethereum_types::U256::exp10(7) + "27e42c090aa007a4f2545547425aaa8ea3566e1f18560803ac48f8e98cb3b0c9" // it's the third topic ); + assert_eq!(result.start_epoch, 123.into()); + assert_eq!(result.end_epoch, 456.into()); - assert_eq!(result.unit_ids.len(), 10); + assert_eq!(result.unit_ids.len(), 1); + assert_eq!( + result.unit_ids[0], + ::from_hex("c04d94f1e85788b245471c87490f42149b09503fe3af46733e4b5adf94583105") + .unwrap() + ) } } diff --git a/crates/chain-listener/src/event/mod.rs b/crates/chain-listener/src/event/mod.rs index 824d9bb6db..9868c2de52 100644 --- a/crates/chain-listener/src/event/mod.rs +++ b/crates/chain-listener/src/event/mod.rs @@ -1,3 +1,7 @@ pub mod cc_activated; +mod unit_activated; +mod unit_deactivated; -pub use cc_activated::CCActivatedData; +pub use cc_activated::CommitmentActivatedData; +pub use unit_activated::{UnitActivated, UnitActivatedData}; +pub use unit_deactivated::{UnitDeactivated, UnitDeactivatedData}; diff --git a/crates/chain-listener/src/event/unit_activated.rs b/crates/chain-listener/src/event/unit_activated.rs new file mode 100644 index 0000000000..1eb08bab0e --- /dev/null +++ b/crates/chain-listener/src/event/unit_activated.rs @@ -0,0 +1,132 @@ +use chain_data::ChainDataError::InvalidTokenSize; +use chain_data::EventField::{Indexed, NotIndexed}; +use chain_data::{next_opt, ChainData, ChainDataError, ChainEvent, EventField}; +use chain_types::{CommitmentId, ComputeUnit}; +use core_manager::CUID; +use ethabi::ethereum_types::U256; +use ethabi::param_type::ParamType; +use ethabi::Token; +use serde::{Deserialize, Serialize}; + +/// @dev Emitted when a unit activated. Unit is activated when it returned from deal +/// @param commitmentId Commitment id +/// @param unitId Compute unit id which activated +/// event UnitActivated( +/// bytes32 indexed commitmentId, +/// bytes32 indexed unitId, +/// uint256 startEpoch +/// ); + +#[derive(Debug, Serialize, Deserialize)] +pub struct UnitActivatedData { + pub commitment_id: CommitmentId, + pub unit_id: CUID, + pub start_epoch: U256, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct UnitActivated { + pub block_number: String, + pub info: UnitActivatedData, +} + +impl UnitActivated { + pub const EVENT_NAME: &'static str = "UnitActivated"; +} + +impl ChainData for UnitActivatedData { + fn event_name() -> &'static str { + UnitActivated::EVENT_NAME + } + + fn signature() -> Vec { + vec![ + Indexed(ParamType::FixedBytes(32)), // commitmentId + Indexed(ParamType::FixedBytes(32)), // unitId + NotIndexed(ParamType::Uint(256)), // startEpoch + ] + } + + /// Parse data from chain. Accepts data with and without "0x" prefix. + fn parse(data_tokens: &mut impl Iterator) -> Result { + let commitment_id = CommitmentId(next_opt( + data_tokens, + "commitment_id", + Token::into_fixed_bytes, + )?); + + let unit_id = next_opt(data_tokens, "unit_id", Token::into_fixed_bytes)?; + + let start_epoch = next_opt(data_tokens, "start_epoch", Token::into_uint)?; + + Ok(UnitActivatedData { + commitment_id, + unit_id: CUID::new(unit_id.try_into().map_err(|_| InvalidTokenSize)?), + start_epoch, + }) + } +} + +impl ChainEvent for UnitActivated { + fn new(block_number: String, info: UnitActivatedData) -> Self { + Self { block_number, info } + } +} + +impl From for ComputeUnit { + fn from(data: UnitActivatedData) -> Self { + ComputeUnit { + id: data.unit_id, + deal: None, + start_epoch: data.start_epoch, + } + } +} +#[cfg(test)] +mod test { + + use super::UnitActivated; + use crate::event::UnitActivatedData; + use chain_data::{parse_log, ChainData, Log}; + use core_manager::CUID; + use hex; + use hex::FromHex; + + #[tokio::test] + async fn test_unit_activated_topic() { + assert_eq!( + UnitActivatedData::topic(), + "0x8e4b27eeb3194deef0b3140997e6b82f53eb7350daceb9355268009b92f70add" + ); + } + + #[tokio::test] + async fn test_chain_parsing_ok() { + let data = "0x000000000000000000000000000000000000000000000000000000000000007b00000000000000000000000000000000000000000000000000000000000001c800000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000001c04d94f1e85788b245471c87490f42149b09503fe3af46733e4b5adf94583105".to_string(); + let log = Log { + data, + block_number: "0x0".to_string(), + removed: false, + topics: vec![ + UnitActivatedData::topic(), + "0x431688393bc518ef01e11420af290b92f3668dca24fc171eeb11dd15bcefad72".to_string(), + "0xd33bc101f018e42351fbe2adc8682770d164e27e2e4c6454e0faaf5b8b63b90e".to_string(), + ], + }; + let result = parse_log::(log); + + assert!(result.is_ok(), "can't parse data: {:?}", result); + let result = result.unwrap().info; + assert_eq!( + hex::encode(result.commitment_id.0), + "431688393bc518ef01e11420af290b92f3668dca24fc171eeb11dd15bcefad72" // it's the second topic + ); + assert_eq!( + result.unit_id, + ::from_hex("d33bc101f018e42351fbe2adc8682770d164e27e2e4c6454e0faaf5b8b63b90e") + .unwrap() // it's also the third topic + ); + + assert_eq!(result.start_epoch, 123.into()); + } +} diff --git a/crates/chain-listener/src/event/unit_deactivated.rs b/crates/chain-listener/src/event/unit_deactivated.rs new file mode 100644 index 0000000000..836a4d0f9f --- /dev/null +++ b/crates/chain-listener/src/event/unit_deactivated.rs @@ -0,0 +1,114 @@ +use chain_data::ChainDataError::InvalidTokenSize; +use chain_data::EventField::Indexed; +use chain_data::{next_opt, ChainData, ChainDataError, ChainEvent, EventField}; +use chain_types::CommitmentId; +use core_manager::CUID; +use ethabi::param_type::ParamType; +use ethabi::Token; +use serde::{Deserialize, Serialize}; + +/// @dev Emitted when a unit deactivated. Unit is deactivated when it moved to deal +/// @param commitmentId Commitment id +/// @param unitId Compute unit id which deactivated +/// event UnitDeactivated( +/// bytes32 indexed commitmentId, +/// bytes32 indexed unitId +/// ); + +#[derive(Debug, Serialize, Deserialize)] +pub struct UnitDeactivatedData { + pub commitment_id: CommitmentId, + pub unit_id: CUID, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct UnitDeactivated { + pub block_number: String, + pub info: UnitDeactivatedData, +} + +impl UnitDeactivated { + pub const EVENT_NAME: &'static str = "UnitDeactivated"; +} + +impl ChainData for UnitDeactivatedData { + fn event_name() -> &'static str { + UnitDeactivated::EVENT_NAME + } + + fn signature() -> Vec { + vec![ + Indexed(ParamType::FixedBytes(32)), // commitmentId + Indexed(ParamType::FixedBytes(32)), // unitId + ] + } + + /// Parse data from chain. Accepts data with and without "0x" prefix. + fn parse(data_tokens: &mut impl Iterator) -> Result { + let commitment_id = CommitmentId(next_opt( + data_tokens, + "commitment_id", + Token::into_fixed_bytes, + )?); + + let unit_id = next_opt(data_tokens, "unit_id", Token::into_fixed_bytes)?; + + Ok(UnitDeactivatedData { + commitment_id, + unit_id: CUID::new(unit_id.try_into().map_err(|_| InvalidTokenSize)?), + }) + } +} + +impl ChainEvent for UnitDeactivated { + fn new(block_number: String, info: UnitDeactivatedData) -> Self { + Self { block_number, info } + } +} + +#[cfg(test)] +mod test { + + use crate::event::unit_deactivated::UnitDeactivated; + use crate::event::UnitDeactivatedData; + use chain_data::{parse_log, ChainData, Log}; + use core_manager::CUID; + use hex; + use hex::FromHex; + + #[tokio::test] + async fn test_unit_activated_topic() { + assert_eq!( + UnitDeactivatedData::topic(), + "0xbd9cde1bbc961036d34368ae328c38917036a98eacfb025a1ff6d2c6235d0a14" + ); + } + + #[tokio::test] + async fn test_chain_parsing_ok() { + let data = "0x".to_string(); + let log = Log { + data, + block_number: "0x0".to_string(), + removed: false, + topics: vec![ + UnitDeactivatedData::topic(), + "0x91cfcc4a139573b08646960be31b278152ef3480710ab15d9b39262be37038a1".to_string(), + "0xf3660ca1eaf461cbbb5e1d06ade6ba4a9a503c0d680ba825e09cddd3f9b45fc6".to_string(), + ], + }; + let result = parse_log::(log); + + assert!(result.is_ok(), "can't parse data: {:?}", result); + let result = result.unwrap().info; + assert_eq!( + hex::encode(result.commitment_id.0), + "91cfcc4a139573b08646960be31b278152ef3480710ab15d9b39262be37038a1" // it's the second topic + ); + assert_eq!( + result.unit_id, + ::from_hex("f3660ca1eaf461cbbb5e1d06ade6ba4a9a503c0d680ba825e09cddd3f9b45fc6") + .unwrap() // it's also the third topic + ); + } +} diff --git a/crates/chain-listener/src/lib.rs b/crates/chain-listener/src/lib.rs index 4977a2375d..65f4ccc1b5 100644 --- a/crates/chain-listener/src/lib.rs +++ b/crates/chain-listener/src/lib.rs @@ -1,5 +1,8 @@ #![feature(assert_matches)] #![feature(try_blocks)] +#![feature(extract_if)] +#![feature(hash_extract_if)] + mod event; mod listener; diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 9633ebe0fe..f302cae841 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -1,61 +1,183 @@ -use crate::event::cc_activated::CCActivated; -use crate::event::CCActivatedData; +use crate::event::cc_activated::CommitmentActivated; +use crate::event::{ + CommitmentActivatedData, UnitActivated, UnitActivatedData, UnitDeactivated, UnitDeactivatedData, +}; +use ccp_rpc_client::OrHex; +use ccp_shared::proof::{CCProof, CCProofId, ProofIdx}; +use ccp_shared::types::{Difficulty, GlobalNonce, LocalNonce, ResultHash}; +use chain_connector::{CCInitParams, ChainConnector, ConnectorError}; use chain_data::{parse_log, peer_id_to_hex, ChainData, Log}; -use jsonrpsee::core::client::{Client, Subscription, SubscriptionClientT}; -use jsonrpsee::core::JsonValue; +use chain_types::{ + CommitmentId, CommitmentStatus, ComputeUnit, COMMITMENT_IS_NOT_ACTIVE, TOO_MANY_PROOFS, +}; +use core_manager::manager::{CoreManager, CoreManagerFunctions}; +use core_manager::types::{AcquireRequest, WorkType}; +use core_manager::CUID; +use cpu_utils::PhysicalCoreId; +use ethabi::ethereum_types::U256; +use jsonrpsee::core::client::{Client as WsClient, Subscription, SubscriptionClientT}; +use jsonrpsee::core::{client, JsonValue}; use jsonrpsee::rpc_params; -use jsonrpsee::ws_client::WsClientBuilder; use libp2p_identity::PeerId; -use serde_json::json; -use server_config::ChainListenerConfig; +use serde_json::{json, Value}; +use server_config::{ChainConfig, ChainListenerConfig}; +use std::collections::{HashMap, HashSet}; use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; use tokio::task::JoinHandle; +use tokio::time::interval; +use tokio_stream::wrappers::IntervalStream; +use tokio_stream::StreamExt; pub struct ChainListener { - config: ChainListenerConfig, - cc_events_dir: PathBuf, + config: ChainConfig, + + chain_connector: Arc, + ws_client: WsClient, + // ccp_client: Option, + core_manager: Arc, + + timer_resolution: Duration, + _cc_events_dir: PathBuf, host_id: PeerId, + + difficulty: Difficulty, + init_timestamp: U256, + global_nonce: GlobalNonce, + current_epoch: U256, + epoch_duration: U256, + + // proof_counter: HashMap, + current_commitment: Option, + + active_compute_units: HashSet, + pending_compute_units: HashSet, +} + +async fn poll_subscription( + s: &mut Option>, +) -> Option> { + match s { + Some(ref mut s) => s.next().await, + None => None, + } } impl ChainListener { - pub fn new(config: ChainListenerConfig, cc_events_dir: PathBuf, host_id: PeerId) -> Self { + pub async fn new( + chain_config: ChainConfig, + listener_config: ChainListenerConfig, + cc_events_dir: PathBuf, + host_id: PeerId, + chain_connector: Arc, + core_manager: Arc, + init_params: CCInitParams, + ws_client: WsClient, + ) -> Self { + // // We will use the first physical core for utility tasks + // let _utility_core = core_manager + // .get_system_cpu_assignment() + // .physical_core_ids + // .first() + // .cloned() + // .ok_or(eyre::eyre!("No utility core id"))?; + + // let ccp_client = + // CCPRpcHttpClient::new(listener_config.ccp_endpoint.clone(), utility_core).await?; Self { - config, - cc_events_dir, + chain_connector, + ws_client, + // ccp_client, + config: chain_config, host_id, + difficulty: init_params.difficulty, + init_timestamp: init_params.init_timestamp, + global_nonce: init_params.global_nonce, + current_epoch: init_params.current_epoch, + epoch_duration: init_params.epoch_duration, + current_commitment: None, + active_compute_units: HashSet::new(), + pending_compute_units: HashSet::new(), + core_manager, + _cc_events_dir: cc_events_dir, + timer_resolution: listener_config.proof_poll_period, + } + } + + async fn refresh_compute_units(&mut self) -> eyre::Result<()> { + let (active, pending) = self.get_compute_units().await?; + self.current_commitment = self.chain_connector.get_current_commitment_id().await?; + + if let Some(status) = self.get_commitment_status().await? { + // log status + match status { + CommitmentStatus::Active => { + self.active_compute_units.extend(active); + self.pending_compute_units.extend(pending); + self.refresh_commitment().await?; + } + _ => { + self.stop_commitment().await?; + } + } } + + Ok(()) } - pub fn start(self) -> JoinHandle<()> { + + pub fn start(mut self) -> JoinHandle<()> { let result = tokio::task::Builder::new() .name("ChainListener") .spawn(async move { - let client = WsClientBuilder::default() - .build(&self.config.ws_endpoint) - .await - .unwrap_or_else(|_| panic!("Could not connect to chain {}", self.config.ws_endpoint)); + let setup: eyre::Result<()> = try { + self.refresh_compute_units().await?; + }; + if let Err(err) = setup { + log::error!("ChainListener: compute units refresh error: {err}"); + panic!("ChainListener startup error: {err}"); + } + + let mut heads = self.subscribe_new_heads().await.expect("Could not subscribe to new heads"); + let mut cc_events = self.subscribe_cc_activated().await.expect("Could not subscribe to cc events"); + + let mut unit_activated: Option>= None; + let mut unit_deactivated: Option> = None; - let mut _heads = self.subscribe_new_heads(&client).await.expect("Could not subscribe to new heads"); - let mut cc_events = self.subscribe_cc_activated(&client).await.expect("Could not subscribe to cc events"); + let mut timer = IntervalStream::new(interval(self.timer_resolution)); loop { tokio::select! { - _ = _heads.next() => {}, + Some(header) = heads.next() => { + if let Err(err) = self.process_new_header(header).await { + log::error!("newHeads event processing error: {err}"); + } + }, Some(cc) = cc_events.next() => { - match cc { - Ok(event) => { - let res: eyre::Result<()> = try { - let cc_event = parse_log::(event)?; - self.save_cc_event(cc_event).await?; - }; - - if let Err(err) = res { - log::error!("CC event processing error: {err}"); - } + match self.process_commitment_activated(cc).await { + Err(err) => log::error!("CommitmentActivated event processing error: {err}"), + Ok((activated, deactivated)) => { + unit_activated = Some(activated); + unit_deactivated = Some(deactivated); } - Err(err) => { - log::error!("CC events subscription error: {err}"); - } - }; + } + }, + Some(event) = poll_subscription(&mut unit_activated) => { + if let Err(err) = self.process_unit_activated(event).await { + log::error!("UnitActivated event processing error: {err}"); + } + }, + Some(event) = poll_subscription(&mut unit_deactivated) => { + if let Err(err) = self.process_unit_deactivated(event).await { + log::error!("UnitDeactivated event processing error: {err}"); + } + }, + _ = timer.next() => { + if let Err(err) = self.submit_mocked_proofs().await { + log::error!("Failed to submit mocked proofs: {err}"); + } + // TODO: poll proofs from CCP + // self.poll_proofs().await?; } } } @@ -65,34 +187,299 @@ impl ChainListener { result } - pub async fn subscribe_new_heads( - &self, - client: &Client, - ) -> eyre::Result> { - let subs = client + async fn get_commitment_status(&self) -> eyre::Result> { + if let Some(commitment_id) = self.current_commitment.clone() { + let status = self + .chain_connector + .get_commitment_status(commitment_id) + .await?; + Ok(Some(status)) + } else { + Ok(None) + } + } + + /// Returns active and pending compute units + async fn get_compute_units(&self) -> eyre::Result<(Vec, Vec)> { + let units = self.chain_connector.get_compute_units().await?; + + // print info about all units + let (active, pending): (Vec, Vec) = units + .into_iter() + .filter(|cu| cu.deal.is_none()) + .partition(|unit| unit.start_epoch <= self.current_epoch); + + let active = active.into_iter().map(|unit| unit.id).collect(); + + Ok((active, pending)) + } + + async fn subscribe_new_heads(&self) -> eyre::Result> { + let subs = self + .ws_client .subscribe("eth_subscribe", rpc_params!["newHeads"], "eth_unsubscribe") .await?; Ok(subs) } - pub async fn subscribe_cc_activated(&self, client: &Client) -> eyre::Result> { - let topics = vec![CCActivatedData::topic(), peer_id_to_hex(self.host_id)]; + async fn subscribe_cc_activated(&self) -> eyre::Result> { + let topics = vec![ + CommitmentActivatedData::topic(), + peer_id_to_hex(self.host_id), + ]; let params = rpc_params![ "logs", json!({"address": self.config.cc_contract_address, "topics": topics}) ]; - let subs = client + let subs = self + .ws_client + .subscribe("eth_subscribe", params, "eth_unsubscribe") + .await?; + + Ok(subs) + } + + async fn subscribe_unit_activated( + &self, + commitment_id: &CommitmentId, + ) -> eyre::Result> { + let params = rpc_params![ + "logs", + json!({"address": self.config.cc_contract_address, "topics": vec![UnitActivatedData::topic(), hex::encode(&commitment_id.0)]}) + ]; + let subs = self + .ws_client .subscribe("eth_subscribe", params, "eth_unsubscribe") .await?; Ok(subs) } - pub async fn save_cc_event(&self, event: CCActivated) -> eyre::Result<()> { - let file_name = format!("{}.json", hex::encode(&event.info.commitment_id.0)); - let file_path = self.cc_events_dir.join(file_name); - tokio::fs::write(file_path, serde_json::to_string(&event)?).await?; + async fn subscribe_unit_deactivated( + &self, + commitment_id: &CommitmentId, + ) -> eyre::Result> { + let params = rpc_params![ + "logs", + json!({"address": self.config.cc_contract_address, "topics": vec![UnitDeactivatedData::topic(), hex::encode(&commitment_id.0)]}) + ]; + let subs = self + .ws_client + .subscribe("eth_subscribe", params, "eth_unsubscribe") + .await?; + + Ok(subs) + } + + async fn process_new_header( + &mut self, + header: Result, + ) -> eyre::Result<()> { + let block_timestamp = Self::parse_timestamp(header?)?; + + // `epoch_number = 1 + (block_timestamp - init_timestamp) / epoch_duration` + let epoch_number = + U256::from(1) + (block_timestamp - self.init_timestamp) / self.epoch_duration; + let epoch_changed = epoch_number > self.current_epoch; + + if epoch_changed { + self.current_epoch = epoch_number; + // nonce changes every epoch + self.global_nonce = self.chain_connector.get_global_nonce().await?; + + if let Some(status) = self.get_commitment_status().await? { + match status { + CommitmentStatus::Active => { + self.activate_pending_units().await?; + } + CommitmentStatus::Inactive + | CommitmentStatus::Failed + | CommitmentStatus::Removed => { + self.stop_commitment().await?; + } + CommitmentStatus::WaitDelegation => {} // log commitment is not active} + CommitmentStatus::WaitStart => {} // log commitment wait start} + } + } + } + + Ok(()) + } + + async fn process_commitment_activated( + &mut self, + event: Result, + ) -> eyre::Result<(Subscription, Subscription)> { + // add logs about activation + + let cc_event = parse_log::(event?)?; + let unit_ids = cc_event.info.unit_ids; + + let unit_activated = self + .subscribe_unit_activated(&cc_event.info.commitment_id) + .await?; + let unit_deactivated = self + .subscribe_unit_deactivated(&cc_event.info.commitment_id) + .await?; + + let is_cc_active = cc_event.info.start_epoch <= self.current_epoch; + + if is_cc_active { + self.active_compute_units = unit_ids.into_iter().collect(); + self.refresh_commitment().await?; + } else { + self.pending_compute_units = unit_ids + .into_iter() + .map(|id| ComputeUnit::new(id, cc_event.info.start_epoch)) + .collect(); + self.stop_commitment().await?; + } + + Ok((unit_activated, unit_deactivated)) + } + + async fn process_unit_activated( + &mut self, + event: Result, + ) -> eyre::Result<()> { + let unit_event = parse_log::(event?)?; + if self.current_epoch >= unit_event.info.start_epoch { + self.active_compute_units.insert(unit_event.info.unit_id); + self.refresh_commitment().await?; + } else { + // Will be activated on the `start_epoch` + self.pending_compute_units.insert(unit_event.info.into()); + } + Ok(()) + } + + /// Unit goes to Deal + async fn process_unit_deactivated( + &mut self, + event: Result, + ) -> eyre::Result<()> { + let unit_event = parse_log::(event?)?; + // add logs + self.active_compute_units.remove(&unit_event.info.unit_id); + self.pending_compute_units + .retain(|cu| cu.id == unit_event.info.unit_id); + self.refresh_commitment().await?; + self.acquire_deal_core(unit_event.info.unit_id)?; + Ok(()) + } + + /// Send GlobalNonce, Difficulty and Core<>CUID mapping (full commitment info) to CCP + async fn refresh_commitment(&self) -> eyre::Result<()> { + let _cores = self.acquire_active_units()?; + // self.ccp_client + // .on_active_commitment(self.global_nonce, self.difficulty, cores) + // .await?; + Ok(()) + } + + fn acquire_active_units(&self) -> eyre::Result>> { + let cores = self.core_manager.acquire_worker_core(AcquireRequest::new( + self.active_compute_units.clone().into_iter().collect(), + WorkType::CapacityCommitment, + ))?; + + Ok(cores + .physical_core_ids + .into_iter() + .zip( + self.active_compute_units + .clone() + .into_iter() + .map(OrHex::Data), + ) + .collect()) + } + + fn acquire_deal_core(&self, unit_id: CUID) -> eyre::Result<()> { + self.core_manager + .acquire_worker_core(AcquireRequest::new(vec![unit_id], WorkType::Deal))?; + Ok(()) + } + + async fn stop_commitment(&mut self) -> eyre::Result<()> { + // log stop commitment + self.active_compute_units.clear(); + self.pending_compute_units.clear(); + self.current_commitment = None; + // self.ccp_client.on_no_active_commitment().await?; Ok(()) } + + async fn activate_pending_units(&mut self) -> eyre::Result<()> { + let to_activate = self + .pending_compute_units + .extract_if(|unit| unit.start_epoch <= self.current_epoch) + .map(|cu| cu.id); + + self.active_compute_units.extend(to_activate); + self.refresh_commitment().await?; + Ok(()) + } + + /// Submit Mocked Proofs for all active compute units. + /// Mocked Proof has result_hash == difficulty and random local_nonce + async fn submit_mocked_proofs(&mut self) -> eyre::Result<()> { + let result_hash = ResultHash::from_slice(*self.difficulty.as_ref()); + + // proof_id is used only by CCP and is not sent to chain + let proof_id = CCProofId::new(self.global_nonce, self.difficulty, ProofIdx::zero()); + for unit in self.active_compute_units.clone().into_iter() { + let local_nonce = LocalNonce::random(); + self.submit_proof(CCProof::new(proof_id, local_nonce, unit, result_hash)) + .await?; + } + + Ok(()) + } + + async fn submit_proof(&mut self, proof: CCProof) -> eyre::Result<()> { + match self.chain_connector.submit_proof(proof).await { + Ok(_) => Ok(()), + Err(err) => { + match err { + ConnectorError::RpcCallError { ref data, .. } => { + // TODO: track proofs count per epoch and stop at maxProofsPerEpoch + if data.contains(TOO_MANY_PROOFS) { + // we stop unit until the next epoch if "TooManyProofs" error received + // TODO: acquire core for other units to help with proofs calculation + self.active_compute_units.remove(&proof.cu_id); + self.pending_compute_units + .insert(ComputeUnit::new(proof.cu_id, self.current_epoch + 1)); + self.refresh_commitment().await?; + // log about stopping + Ok(()) + } else if data.contains(COMMITMENT_IS_NOT_ACTIVE) { + // log about commitment is not active + self.stop_commitment().await?; + Ok(()) + } else { + log::error!("Failed to submit proof: {err}"); + Err(err.into()) + } + } + _ => { + log::error!("Failed to submit proof: {err}"); + Err(err.into()) + } + } + } + } + } + + fn parse_timestamp(header: Value) -> eyre::Result { + let timestamp = header + .as_object() + .and_then(|o| o.get("timestamp")) + .and_then(Value::as_str) + .ok_or(eyre::eyre!("newHeads: timestamp field not found"))? + .to_string(); + + Ok(U256::from_str_radix(×tamp, 16)?) + } } diff --git a/crates/chain-types/Cargo.toml b/crates/chain-types/Cargo.toml new file mode 100644 index 0000000000..4daf6d3b8f --- /dev/null +++ b/crates/chain-types/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "chain-types" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = { workspace = true } +ethabi = { workspace = true } +eyre = { workspace = true } +chain-data = { workspace = true } +hex = { workspace = true } +ccp-shared = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["rt", "macros"] } diff --git a/crates/chain-types/src/commitment.rs b/crates/chain-types/src/commitment.rs new file mode 100644 index 0000000000..3bd605e1bc --- /dev/null +++ b/crates/chain-types/src/commitment.rs @@ -0,0 +1,72 @@ +use crate::commitment_status::CommitmentStatus; +use chain_data::{next_opt, parse_chain_data, ChainDataError}; +use ethabi::ethereum_types::U256; +use ethabi::{ParamType, Token}; + +/// struct CommitmentView { +/// CCStatus status; +/// bytes32 peerId; +/// uint256 collateralPerUnit; +/// uint256 unitCount; +/// uint256 startEpoch; +/// uint256 endEpoch; +/// uint256 rewardDelegatorRate; +/// address delegator; +/// uint256 totalCUFailCount; +/// uint256 failedEpoch; +/// uint256 exitedUnitCount; +/// } +pub struct Commitment { + pub status: CommitmentStatus, + pub start_epoch: U256, + pub end_epoch: U256, +} + +impl Commitment { + pub fn signature() -> Vec { + vec![ + ParamType::Uint(8), // CCStatus status + ParamType::FixedBytes(32), // bytes32 peerId + ParamType::Uint(256), // uint256 collateralPerUnit + ParamType::Uint(256), // uint256 unitCount + ParamType::Uint(256), // uint256 startEpoch + ParamType::Uint(256), // uint256 endEpoch + ParamType::Uint(256), // uint256 rewardDelegatorRate + ParamType::Address, // address delegator + ParamType::Uint(256), // uint256 totalCUFailCount + ParamType::Uint(256), // uint256 failedEpoch + ParamType::Uint(256), // uint256 exitedUnitCount + ] + } + + pub fn from(data: &str) -> Result { + let mut tokens = parse_chain_data(data, &Self::signature())?.into_iter(); + let status = next_opt( + &mut tokens, + "commitment_status", + CommitmentStatus::from_token, + )?; + let mut tokens = tokens.skip(3); + let start_epoch = next_opt(&mut tokens, "start_epoch", Token::into_uint)?; + let end_epoch = next_opt(&mut tokens, "end_epoch", Token::into_uint)?; + Ok(Commitment { + status, + start_epoch, + end_epoch, + }) + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn decode_commitment() { + let data = "0x00000000000000000000000000000000000000000000000000000000000000016497db93b32e4cdd979ada46a23249f444da1efb186cd74b9666bd03f710028b000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000012c00000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"; + let commitment = super::Commitment::from(data); + assert!(commitment.is_ok()); + let commitment = commitment.unwrap(); + assert_eq!(commitment.status, super::CommitmentStatus::WaitDelegation); + assert_eq!(commitment.start_epoch, 0.into()); + assert_eq!(commitment.end_epoch, 300.into()); + } +} diff --git a/crates/chain-types/src/commitment_status.rs b/crates/chain-types/src/commitment_status.rs new file mode 100644 index 0000000000..d5af88d328 --- /dev/null +++ b/crates/chain-types/src/commitment_status.rs @@ -0,0 +1,62 @@ +use chain_data::{next_opt, parse_chain_data, ChainDataError}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CommitmentStatus { + Active = 0, + // WaitDelegation - before collateral is deposited. + WaitDelegation, + // Status is WaitStart - means collateral deposited, and epoch should be proceed before Active. + WaitStart, + Inactive, + Failed, + Removed, +} + +impl CommitmentStatus { + pub fn signature() -> Vec { + vec![ethabi::ParamType::Uint(8)] + } + pub fn from_num(num: u8) -> Option { + match num { + 0 => Some(CommitmentStatus::Active), + 1 => Some(CommitmentStatus::WaitDelegation), + 2 => Some(CommitmentStatus::WaitStart), + 3 => Some(CommitmentStatus::Inactive), + 4 => Some(CommitmentStatus::Failed), + 5 => Some(CommitmentStatus::Removed), + _ => None, + } + } + + pub fn from_token(token: ethabi::Token) -> Option { + token + .into_uint() + .and_then(|u| Self::from_num(u.as_u64() as u8)) + } + + pub fn from(data: &str) -> Result { + let mut tokens = parse_chain_data(data, &Self::signature())?.into_iter(); + next_opt(&mut tokens, "commitment_status", Self::from_token) + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn decode_commitment_status() { + let data = "0x0000000000000000000000000000000000000000000000000000000000000001"; + let status = super::CommitmentStatus::from(data); + assert!(status.is_ok()); + let status = status.unwrap(); + assert_eq!(status, super::CommitmentStatus::WaitDelegation); + } + + #[tokio::test] + async fn decode_commitment_status_removed() { + let data = "0x0000000000000000000000000000000000000000000000000000000000000005"; + let status = super::CommitmentStatus::from(data); + assert!(status.is_ok()); + let status = status.unwrap(); + assert_eq!(status, super::CommitmentStatus::Removed); + } +} diff --git a/crates/chain-types/src/compute_peer.rs b/crates/chain-types/src/compute_peer.rs new file mode 100644 index 0000000000..18998671e8 --- /dev/null +++ b/crates/chain-types/src/compute_peer.rs @@ -0,0 +1,92 @@ +use crate::types::CommitmentId; +use chain_data::{next_opt, parse_chain_data, ChainDataError}; +use ethabi::ethereum_types::U256; +use ethabi::Token; + +/// struct ComputePeer { +/// bytes32 offerId; +/// bytes32 commitmentId; +/// uint256 unitCount; +/// address owner; +/// } +pub struct ComputePeer { + pub offer_id: Vec, + pub commitment_id: Option, + pub unit_count: U256, + pub owner: String, +} + +impl ComputePeer { + pub fn signature() -> Vec { + vec![ + ethabi::ParamType::FixedBytes(32), + ethabi::ParamType::FixedBytes(32), + ethabi::ParamType::Uint(256), + ethabi::ParamType::Address, + ] + } + pub fn from(data: &str) -> Result { + let mut tokens = parse_chain_data(data, &Self::signature())?.into_iter(); + let offer_id = next_opt(&mut tokens, "offer_id", Token::into_fixed_bytes)?; + let commitment_id = next_opt(&mut tokens, "commitment_id", Token::into_fixed_bytes)?; + + let commitment_id = if commitment_id.iter().all(|&x| x == 0) { + None + } else { + Some(CommitmentId(commitment_id)) + }; + + let unit_count = next_opt(&mut tokens, "unit_count", Token::into_uint)?; + let owner = next_opt(&mut tokens, "owner", Token::into_address)?; + + Ok(Self { + offer_id, + commitment_id, + unit_count, + owner: format!("{owner:#x}"), + }) + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn decode_compute_peer_no_commitment() { + let data = "0xaa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000020000000000000000000000005b73c5498c1e3b4dba84de0f1833c4a029d90519"; + let compute_peer = super::ComputePeer::from(data); + assert!(compute_peer.is_ok()); + let compute_peer = compute_peer.unwrap(); + assert_eq!( + hex::encode(compute_peer.offer_id), + "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5" + ); + assert_eq!(compute_peer.commitment_id, None); + assert_eq!(compute_peer.unit_count, 2.into()); + assert_eq!( + compute_peer.owner, + "0x5b73c5498c1e3b4dba84de0f1833c4a029d90519" + ); + } + + #[tokio::test] + async fn decode_compute_peer() { + let data = "0xaa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5aa3046a12a1aac6e840625e6329d70b427328feceedc8d273e5e6454b85633b5000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000005b73c5498c1e3b4dba84de0f1833c4a029d90519"; + let compute_peer = super::ComputePeer::from(data); + assert!(compute_peer.is_ok()); + let compute_peer = compute_peer.unwrap(); + assert_eq!( + hex::encode(compute_peer.offer_id), + "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5" + ); + assert!(compute_peer.commitment_id.is_some()); + assert_eq!( + hex::encode(compute_peer.commitment_id.unwrap().0), + "aa3046a12a1aac6e840625e6329d70b427328feceedc8d273e5e6454b85633b5" + ); + assert_eq!(compute_peer.unit_count, 10.into()); + assert_eq!( + compute_peer.owner, + "0x5b73c5498c1e3b4dba84de0f1833c4a029d90519" + ); + } +} diff --git a/crates/chain-types/src/compute_unit.rs b/crates/chain-types/src/compute_unit.rs new file mode 100644 index 0000000000..ae26eb4b51 --- /dev/null +++ b/crates/chain-types/src/compute_unit.rs @@ -0,0 +1,109 @@ +use ccp_shared::types::CUID; +use chain_data::{next_opt, parse_chain_data}; +use ethabi::ethereum_types::U256; +use ethabi::Token; + +/// struct ComputeUnitView { +/// bytes32 id; +/// address deal; +/// uint256 startEpoch; +/// } +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct ComputeUnit { + pub id: CUID, + /// if deal is zero-address, it means the unit is not assigned to any deal + pub deal: Option, + pub start_epoch: U256, +} + +impl ComputeUnit { + pub fn new(id: CUID, start_epoch: U256) -> Self { + Self { + id, + deal: None, + start_epoch, + } + } + pub fn signature() -> Vec { + vec![ + ethabi::ParamType::FixedBytes(32), + ethabi::ParamType::Address, + ethabi::ParamType::Uint(256), + ] + } + + pub fn from(data: &str) -> eyre::Result { + let mut tokens = parse_chain_data(data, &Self::signature())?.into_iter(); + Self::from_tokens(&mut tokens) + } + + pub fn from_token(token: Token) -> eyre::Result { + let mut tokens = next_opt( + &mut std::iter::once(token), + "compute_unit", + Token::into_tuple, + )? + .into_iter(); + Self::from_tokens(&mut tokens) + } + + pub fn from_tokens(data_tokens: &mut impl Iterator) -> eyre::Result { + let id = next_opt(data_tokens, "id", Token::into_fixed_bytes)?; + let deal = next_opt(data_tokens, "deal", Token::into_address)?; + + // if deal is zero-address, it means the unit is not assigned to any deal + let deal = if deal.is_zero() { + None + } else { + Some(format!("{deal:#x}")) + }; + + let start_epoch = next_opt(data_tokens, "start_epoch", Token::into_uint)?; + Ok(ComputeUnit { + id: CUID::new(id.as_slice().try_into()?), + deal, + start_epoch, + }) + } +} + +#[cfg(test)] +mod tests { + use ccp_shared::types::CUID; + use hex::FromHex; + + #[tokio::test] + async fn decode_compute_unit() { + let data = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d50000000000000000000000005e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c00000000000000000000000000000000000000000000000000000000000003e8"; + let compute_unit = super::ComputeUnit::from(data); + assert!(compute_unit.is_ok()); + let compute_unit = compute_unit.unwrap(); + + assert_eq!( + compute_unit.id, + ::from_hex("aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5") + .unwrap() + ); + assert!(compute_unit.deal.is_some()); + assert_eq!( + compute_unit.deal.unwrap(), + "0x5e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c" + ); + assert_eq!(compute_unit.start_epoch, 1000.into()); + } + + #[tokio::test] + async fn decode_compute_unit_no_deal() { + let data = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003e8"; + let compute_unit = super::ComputeUnit::from(data); + assert!(compute_unit.is_ok()); + let compute_unit = compute_unit.unwrap(); + assert_eq!( + compute_unit.id, + ::from_hex("aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5") + .unwrap() + ); + assert!(compute_unit.deal.is_none()); + assert_eq!(compute_unit.start_epoch, 1000.into()); + } +} diff --git a/crates/chain-types/src/errors.rs b/crates/chain-types/src/errors.rs new file mode 100644 index 0000000000..4b04e7c6c9 --- /dev/null +++ b/crates/chain-types/src/errors.rs @@ -0,0 +1,2 @@ +pub const COMMITMENT_IS_NOT_ACTIVE: &str = "0x08c379a000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000021436170616369747920636f6d6d69746d656e74206973206e6f742061637469766500000000000000000000000000000000000000000000000000000000000000"; +pub const TOO_MANY_PROOFS: &str = "0xe5d50da7"; diff --git a/crates/chain-types/src/lib.rs b/crates/chain-types/src/lib.rs new file mode 100644 index 0000000000..76fc647311 --- /dev/null +++ b/crates/chain-types/src/lib.rs @@ -0,0 +1,13 @@ +mod commitment; +mod commitment_status; +mod compute_peer; +mod compute_unit; +mod errors; +mod types; + +pub use commitment::Commitment; +pub use commitment_status::CommitmentStatus; +pub use compute_peer::ComputePeer; +pub use compute_unit::ComputeUnit; +pub use errors::*; +pub use types::*; diff --git a/crates/chain-types/src/types.rs b/crates/chain-types/src/types.rs new file mode 100644 index 0000000000..7e3cde6b7b --- /dev/null +++ b/crates/chain-types/src/types.rs @@ -0,0 +1,4 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +pub struct CommitmentId(pub Vec); diff --git a/crates/core-manager/Cargo.toml b/crates/core-manager/Cargo.toml index 53793ce8f9..17d8c2af99 100644 --- a/crates/core-manager/Cargo.toml +++ b/crates/core-manager/Cargo.toml @@ -8,8 +8,8 @@ edition = "2021" [dependencies] fxhash = "0.2.1" range-set-blaze = "0.1.14" -cpu-utils = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "main" } -ccp-shared = { git = "https://github.com/fluencelabs/capacity-commitment-prover/", branch = "main" } +cpu-utils = { workspace = true } +ccp-shared = { workspace = true } multimap = { version = "0.10.0", features = ["serde"] } bimap = { version = "0.6.3", features = ["serde"] } toml = { workspace = true } diff --git a/crates/created-swarm/src/lib.rs b/crates/created-swarm/src/lib.rs index 6e63a0b567..c1b7c8832f 100644 --- a/crates/created-swarm/src/lib.rs +++ b/crates/created-swarm/src/lib.rs @@ -34,6 +34,7 @@ pub use crate::services::*; pub use crate::swarm::*; pub use server_config::system_services_config; +pub use server_config::ChainConfig; pub use fluence_app_service; pub use fluence_spell_dtos; diff --git a/crates/created-swarm/src/swarm.rs b/crates/created-swarm/src/swarm.rs index 4654b7aa54..dca7cc8c0e 100644 --- a/crates/created-swarm/src/swarm.rs +++ b/crates/created-swarm/src/swarm.rs @@ -40,7 +40,7 @@ use futures::stream::iter; use nox::{Connectivity, Node}; use particle_protocol::ProtocolConfig; use server_config::{ - persistent_dir, system_services_config, BootstrapConfig, ChainListenerConfig, ResolvedConfig, + persistent_dir, system_services_config, BootstrapConfig, ChainConfig, ResolvedConfig, UnresolvedConfig, }; use tempfile::TempDir; @@ -272,7 +272,7 @@ pub struct SwarmConfig { pub override_system_services_config: Option, pub http_port: u16, pub connector_api_endpoint: Option, - pub chain_listener: Option, + pub chain_config: Option, pub cc_events_dir: Option, } @@ -300,7 +300,7 @@ impl SwarmConfig { override_system_services_config: None, http_port: 0, connector_api_endpoint: None, - chain_listener: None, + chain_config: None, cc_events_dir: None, } } @@ -405,6 +405,7 @@ pub async fn create_swarm_with_runtime( if let Some(config) = config.override_system_services_config.clone() { resolved.system_services = config; } + // `enable_system_services` has higher priority then `enable` field of the SystemServicesConfig resolved.system_services.enable = config .enabled_system_services @@ -424,7 +425,7 @@ pub async fn create_swarm_with_runtime( .public() .to_peer_id(); resolved.node_config.management_peer_id = management_peer_id; - resolved.chain_listener_config = config.chain_listener.clone(); + resolved.chain_config = config.chain_config.clone(); let vm_config = vm_config(BaseVmConfig { peer_id, diff --git a/crates/nox-tests/Cargo.toml b/crates/nox-tests/Cargo.toml index 97ed16ef6f..ca3b0c5a58 100644 --- a/crates/nox-tests/Cargo.toml +++ b/crates/nox-tests/Cargo.toml @@ -58,7 +58,8 @@ base64 = { workspace = true } tokio = { workspace = true } log = { workspace = true } tracing = { workspace = true } -mockito = "1.2.0" +mockito = { workspace = true } tempfile = { workspace = true } jsonrpsee = { workspace = true, features = ["server"] } hex = { workspace = true } +clarity = { workspace = true } diff --git a/crates/nox-tests/tests/chain_listener.rs b/crates/nox-tests/tests/chain_listener.rs index 3774d883a4..057be3dff8 100644 --- a/crates/nox-tests/tests/chain_listener.rs +++ b/crates/nox-tests/tests/chain_listener.rs @@ -1,104 +1,110 @@ -use created_swarm::make_swarms_with_cfg; -use fs_utils::list_files; -use jsonrpsee::core::JsonValue; -use jsonrpsee::server::Server; -use jsonrpsee::{RpcModule, SubscriptionMessage}; -use server_config::ChainListenerConfig; -use tempfile::TempDir; -use tokio::task::JoinHandle; +// use jsonrpsee::core::JsonValue; +// use jsonrpsee::server::Server; +// use jsonrpsee::{RpcModule, SubscriptionMessage}; +// use tokio::task::JoinHandle; -async fn run_server() -> eyre::Result<(String, JoinHandle<()>)> { - let server = Server::builder() - .set_message_buffer_capacity(10) - .build("127.0.0.1:0") - .await?; - let mut module = RpcModule::new(()); - module - .register_subscription( - "eth_subscribe", - "eth_subscription", - "eth_unsubscribe", - |params, pending, _ctx| async move { - let cc_event_log: JsonValue = serde_json::from_str(r#" - { - "address": "0xdc64a140aa3e981100a9beca4e685f962f0cf6c9", - "topics": [ - "0xcd92fc03744bba25ad966bdc1127f8996e70c551d1ee4a88ce7fb0e596069649", - "0x246cd65bc58db104674f76c9b1340eb16881d9ef90e33d4b1086ebd334f4002d", - "0xd6996a1d0950671fa4ae2642e9bfdb7e4c7832a35c640cdb47ecb8b8002b77b5" - ], - "data": "0x00000000000000000000000000000000000000000000000000000000009896800000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000a4c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc9", - "blockHash": "0x85f298013ad4ea98d279ef543d8efc3aac0563aabbb623567f7146e9a24aa000", - "blockNumber": "0x6", - "transactionHash": "0xcad9a9e67e1503513973b2c73052bb64b2709ea4578b04e0464624d7a2f92e61", - "transactionIndex": "0x0", - "logIndex": "0x0", - "transactionLogIndex": "0x0", - "removed": false - }"#).unwrap(); - - let new_heads: JsonValue = serde_json::from_str( r#" - { - "difficulty": "0x15d9223a23aa", - "extraData": "0xd983010305844765746887676f312e342e328777696e646f7773", - "gasLimit": "0x47e7c4", - "gasUsed": "0x38658", - "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", - "miner": "0xf8b483dba2c3b7176a3da549ad41a48bb3121069", - "nonce": "0x084149998194cc5f", - "number": "0x1348c9", - "parentHash": "0x7736fab79e05dc611604d22470dadad26f56fe494421b5b333de816ce1f25701", - "receiptRoot": "0x2fab35823ad00c7bb388595cb46652fe7886e00660a01e867824d3dceb1c8d36", - "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", - "stateRoot": "0xb3346685172db67de536d8765c43c31009d0eb3bd9c501c9be3229203f15f378", - "timestamp": "0x56ffeff8", - "transactionsRoot": "0x0167ffa60e3ebc0b080cdb95f7c0087dd6c0e61413140e39d94d3468d7c9689f" - }"#).unwrap(); - if let Ok(sub) = params.sequence().next::() { - let sink = pending.accept().await.unwrap(); - - if sub == "newHeads" { - sink.send(SubscriptionMessage::from_json(&new_heads).unwrap()) - .await.unwrap(); - } - - if sub == "logs" { - sink.send(SubscriptionMessage::from_json(&cc_event_log).unwrap()) - .await.unwrap(); - } - } - }, - ) - .unwrap(); - - let addr = server.local_addr()?; - let handle = server.start(module); - - let handle = tokio::spawn(handle.stopped()); - Ok((addr.to_string(), handle)) -} - -#[tokio::test] -async fn test_chain_listener_cc() { - let (addr, server) = run_server().await.unwrap(); - let url = format!("ws://{}", addr); - let events_dir = TempDir::new().unwrap(); - let cc_events_dir = events_dir.path().to_path_buf(); - let _swarm = make_swarms_with_cfg(1, move |mut cfg| { - cfg.chain_listener = Some(ChainListenerConfig { - ws_endpoint: url.clone(), - cc_contract_address: "".to_string(), - }); - - cfg.cc_events_dir = Some(cc_events_dir.clone()); - cfg - }) - .await; - - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - - let event_files = list_files(events_dir.path()).unwrap().collect::>(); - assert_eq!(event_files.len(), 1); - - server.abort(); -} +// async fn run_server() -> eyre::Result<(String, JoinHandle<()>)> { +// let server = Server::builder() +// .set_message_buffer_capacity(10) +// .build("127.0.0.1:0") +// .await?; +// let mut module = RpcModule::new(()); +// module +// .register_subscription( +// "eth_subscribe", +// "eth_subscription", +// "eth_unsubscribe", +// |params, pending, _ctx| async move { +// let cc_event_log: JsonValue = serde_json::from_str(r#" +// { +// "address": "0xdc64a140aa3e981100a9beca4e685f962f0cf6c9", +// "topics": [ +// "0xcd92fc03744bba25ad966bdc1127f8996e70c551d1ee4a88ce7fb0e596069649", +// "0x246cd65bc58db104674f76c9b1340eb16881d9ef90e33d4b1086ebd334f4002d", +// "0xd6996a1d0950671fa4ae2642e9bfdb7e4c7832a35c640cdb47ecb8b8002b77b5" +// ], +// "data": "0x00000000000000000000000000000000000000000000000000000000009896800000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000a4c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc94c5951c05568dac529f66b98217d93627dd10f3070a928b427ad82cdadc72cc9", +// "blockHash": "0x85f298013ad4ea98d279ef543d8efc3aac0563aabbb623567f7146e9a24aa000", +// "blockNumber": "0x6", +// "transactionHash": "0xcad9a9e67e1503513973b2c73052bb64b2709ea4578b04e0464624d7a2f92e61", +// "transactionIndex": "0x0", +// "logIndex": "0x0", +// "transactionLogIndex": "0x0", +// "removed": false +// }"#).unwrap(); +// +// let new_heads: JsonValue = serde_json::from_str( r#" +// { +// "difficulty": "0x15d9223a23aa", +// "extraData": "0xd983010305844765746887676f312e342e328777696e646f7773", +// "gasLimit": "0x47e7c4", +// "gasUsed": "0x38658", +// "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", +// "miner": "0xf8b483dba2c3b7176a3da549ad41a48bb3121069", +// "nonce": "0x084149998194cc5f", +// "number": "0x1348c9", +// "parentHash": "0x7736fab79e05dc611604d22470dadad26f56fe494421b5b333de816ce1f25701", +// "receiptRoot": "0x2fab35823ad00c7bb388595cb46652fe7886e00660a01e867824d3dceb1c8d36", +// "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", +// "stateRoot": "0xb3346685172db67de536d8765c43c31009d0eb3bd9c501c9be3229203f15f378", +// "timestamp": "0x56ffeff8", +// "transactionsRoot": "0x0167ffa60e3ebc0b080cdb95f7c0087dd6c0e61413140e39d94d3468d7c9689f" +// }"#).unwrap(); +// if let Ok(sub) = params.sequence().next::() { +// let sink = pending.accept().await.unwrap(); +// +// if sub == "newHeads" { +// sink.send(SubscriptionMessage::from_json(&new_heads).unwrap()) +// .await.unwrap(); +// } +// +// if sub == "logs" { +// sink.send(SubscriptionMessage::from_json(&cc_event_log).unwrap()) +// .await.unwrap(); +// } +// } +// }, +// ) +// .unwrap(); +// +// let addr = server.local_addr()?; +// let handle = server.start(module); +// +// let handle = tokio::spawn(handle.stopped()); +// Ok((addr.to_string(), handle)) +// } +// +// #[tokio::test] +// async fn test_chain_listener_cc() { +// let (addr, server) = run_server().await.unwrap(); +// let url = format!("ws://{}", addr); +// let events_dir = TempDir::new().unwrap(); +// let cc_events_dir = events_dir.path().to_path_buf(); +// let _swarm = make_swarms_with_cfg(1, move |mut cfg| { +// cfg.chain_listener = Some(ChainConfig { +// ws_endpoint: url.clone(), +// http_endpoint: "".to_string(), +// cc_contract_address: "".to_string(), +// core_contract_address: "".to_string(), +// market_contract_address: "".to_string(), +// network_id: 0, +// wallet_key: PrivateKey::from_str( +// "0xfdc4ba94809c7930fe4676b7d845cbf8fa5c1beae8744d959530e5073004cf3f", +// ) +// .unwrap(), +// ccp_endpoint: "".to_string(), +// timer_resolution: Default::default(), +// }); +// +// cfg.cc_events_dir = Some(cc_events_dir.clone()); +// cfg +// }) +// .await; +// +// tokio::time::sleep(std::time::Duration::from_millis(200)).await; +// +// let event_files = list_files(events_dir.path()).unwrap().collect::>(); +// assert_eq!(event_files.len(), 1); +// +// server.abort(); +// } diff --git a/crates/server-config/Cargo.toml b/crates/server-config/Cargo.toml index f8d042e3f0..9090ee5da3 100644 --- a/crates/server-config/Cargo.toml +++ b/crates/server-config/Cargo.toml @@ -34,6 +34,7 @@ derivative = { workspace = true } bytesize = { version = "1.3.0", features = ["serde"] } serde_with = { workspace = true } config = { version = "0.13.4", default-features = false, features = ["toml"] } +clarity = { workspace = true } [dev-dependencies] temp-env = "0.3.6" diff --git a/crates/server-config/src/lib.rs b/crates/server-config/src/lib.rs index 0078c2b111..c99b19d906 100644 --- a/crates/server-config/src/lib.rs +++ b/crates/server-config/src/lib.rs @@ -48,7 +48,7 @@ pub use resolved_config::ConfigData; pub use bootstrap_config::BootstrapConfig; pub use kademlia_config::KademliaConfig; pub use network_config::NetworkConfig; -pub use node_config::{ChainListenerConfig, NodeConfig, TransportConfig}; +pub use node_config::{ChainConfig, ChainListenerConfig, NodeConfig, TransportConfig}; pub use resolved_config::ConsoleConfig; pub use resolved_config::LogConfig; pub use resolved_config::LogFormat; diff --git a/crates/server-config/src/node_config.rs b/crates/server-config/src/node_config.rs index 3ed180fa0b..9743115b9d 100644 --- a/crates/server-config/src/node_config.rs +++ b/crates/server-config/src/node_config.rs @@ -5,6 +5,7 @@ use std::path::{Path, PathBuf}; use std::time::Duration; use base64::{engine::general_purpose::STANDARD as base64, Engine}; +use clarity::PrivateKey; use core_manager::CoreRange; use derivative::Derivative; use eyre::eyre; @@ -135,7 +136,8 @@ pub struct UnresolvedNodeConfig { #[serde(default)] pub system_services: SystemServicesConfig, - #[serde(flatten)] + pub chain_config: Option, + pub chain_listener_config: Option, } @@ -195,6 +197,7 @@ impl UnresolvedNodeConfig { allowed_binaries, system_services: self.system_services, http_config: self.http_config, + chain_config: self.chain_config, chain_listener_config: self.chain_listener_config, }; @@ -366,6 +369,8 @@ pub struct NodeConfig { pub http_config: Option, + pub chain_config: Option, + pub chain_listener_config: Option, } @@ -527,9 +532,23 @@ impl KeypairConfig { } } +#[derive(Clone, Deserialize, Serialize, Derivative)] +#[derivative(Debug)] +pub struct ChainConfig { + pub http_endpoint: String, + // TODO get all addresses from Core contract + pub core_contract_address: String, + pub cc_contract_address: String, + pub market_contract_address: String, + pub network_id: u64, + pub wallet_key: PrivateKey, +} + #[derive(Clone, Deserialize, Serialize, Derivative)] #[derivative(Debug)] pub struct ChainListenerConfig { pub ws_endpoint: String, - pub cc_contract_address: String, + pub ccp_endpoint: Option, + /// How often to poll proofs + pub proof_poll_period: Duration, } diff --git a/crates/spell-event-bus/src/api.rs b/crates/spell-event-bus/src/api.rs index 286ef1a734..bbb0b88021 100644 --- a/crates/spell-event-bus/src/api.rs +++ b/crates/spell-event-bus/src/api.rs @@ -97,7 +97,7 @@ impl From for TriggerInfoAqua { impl From for TriggerInfo { fn from(i: TriggerInfoAqua) -> Self { - match (i.timer.get(0), i.peer.get(0)) { + match (i.timer.first(), i.peer.first()) { (Some(t), None) => Self::Timer(t.clone()), (None, Some(p)) => Self::Peer(p.clone()), _ => unreachable!("TriggerInfoAqua should always have either timer or peer event"), diff --git a/crates/system-services/Cargo.toml b/crates/system-services/Cargo.toml index 1312238a26..0cc65d7836 100644 --- a/crates/system-services/Cargo.toml +++ b/crates/system-services/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] aqua-ipfs-distro = "=0.6.0" -decider-distro = "=0.6.7" +decider-distro = "=0.6.8" registry-distro = "=0.9.4" trust-graph-distro = "=0.4.11" diff --git a/nox/Cargo.toml b/nox/Cargo.toml index 3eba1adcb4..099f720e6e 100644 --- a/nox/Cargo.toml +++ b/nox/Cargo.toml @@ -32,6 +32,7 @@ workers = { workspace = true } system-services = { workspace = true } spell-service-api = { workspace = true } chain-listener = { workspace = true } +chain-connector = { workspace = true } fluence-keypair = { workspace = true } @@ -70,6 +71,7 @@ opentelemetry-stdout = { version = "0.2.0", features = ["trace"] } once_cell = { workspace = true } config = "0.13.4" tonic = "0.9.2" +jsonrpsee = { workspace = true, features = ["ws-client", "macros"] } [dev-dependencies] parking_lot = { workspace = true } diff --git a/nox/src/node.rs b/nox/src/node.rs index 6ef0bd7abf..13ffef654e 100644 --- a/nox/src/node.rs +++ b/nox/src/node.rs @@ -38,6 +38,7 @@ use aquamarine::{ AquaRuntime, AquamarineApi, AquamarineApiError, AquamarineBackend, DataStoreConfig, RemoteRoutingEffects, VmPoolConfig, }; +use chain_connector::ChainConnector; use chain_listener::ChainListener; use config_utils::to_peer_id; use connection_pool::ConnectionPoolT; @@ -45,6 +46,7 @@ use core_manager::manager::CoreManager; use fluence_keypair::KeyPair; use fluence_libp2p::build_transport; use health::HealthCheckRegistry; +use jsonrpsee::ws_client::WsClientBuilder; use particle_builtins::{Builtins, CustomService, NodeInfo}; use particle_execution::ParticleFunctionStatic; use particle_protocol::ExtendedParticle; @@ -325,6 +327,17 @@ impl Node { let services = builtins.services.clone(); let modules = builtins.modules.clone(); + let connector = if let Some(chain_config) = config.chain_config.clone() { + let host_id = scopes.get_host_peer_id(); + let (chain_connector, chain_builtins) = + ChainConnector::new(chain_config.clone(), host_id)?; + custom_service_functions.extend(chain_builtins.into_iter()); + Some(chain_connector) + } else { + // TODO: log warning, exit with error if decider in on + None + }; + custom_service_functions.into_iter().for_each( move |( service_id, @@ -360,10 +373,29 @@ impl Node { system_services_deployer.versions(), ); - let chain_listener = if let Some(chain_config) = config.chain_listener_config.clone() { + let chain_listener = if let (Some(connector), Some(chain_config), Some(listener_config)) = ( + connector, + config.chain_config.clone(), + config.chain_listener_config.clone(), + ) { let cc_events_dir = config.dir_config.cc_events_dir.clone(); let host_id = scopes.get_host_peer_id(); - let chain_listener = ChainListener::new(chain_config, cc_events_dir, host_id); + // print init params + let init_params = connector.get_cc_init_params().await?; + let ws_client = WsClientBuilder::default() + .build(&listener_config.ws_endpoint) + .await?; // todo write error msg + let chain_listener = ChainListener::new( + chain_config, + listener_config, + cc_events_dir, + host_id, + connector, + core_manager.clone(), + init_params, + ws_client, + ) + .await; Some(chain_listener) } else { None diff --git a/particle-builtins/src/builtins.rs b/particle-builtins/src/builtins.rs index 735b3141f4..250e65cb67 100644 --- a/particle-builtins/src/builtins.rs +++ b/particle-builtins/src/builtins.rs @@ -912,7 +912,7 @@ where let result: Result = try { let data: Vec = Args::next("data", &mut args)?; - let tetraplet = tetraplets.get(0).map(|v| v.as_slice()); + let tetraplet = tetraplets.first().map(|v| v.as_slice()); if let Some([t]) = tetraplet { if self.scopes.scope(PeerId::from_str(&t.peer_pk)?).is_err() { return Err(JError::new(format!(