diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 918a9ae679..71d1491ea4 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -84,6 +84,7 @@ jobs: - nox-snapshot uses: fluencelabs/cli/.github/workflows/tests.yml@main with: + ref: up-clients nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}" js-client: diff --git a/Cargo.lock b/Cargo.lock index 0057d613aa..e6381f95df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1479,6 +1479,7 @@ dependencies = [ "alloy-sol-types", "alloy_serde_macro", "backoff", + "bs58", "ccp-rpc-client", "ccp-shared", "chain-connector", @@ -2207,9 +2208,9 @@ dependencies = [ [[package]] name = "decider-distro" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f87fc5b72d3931ea4017ee61d7c99a7f98ec6f337b877f4e78fe14a708cb1ec" +checksum = "828361c3930c4a44400ac0c4003b9cc5754894b052b55a9781d3bfd4455435ef" dependencies = [ "built 0.7.1", "fluence-spell-dtos", diff --git a/crates/chain-connector/src/builtins.rs b/crates/chain-connector/src/builtins.rs index 5fc34139ff..2d79aa3e72 100644 --- a/crates/chain-connector/src/builtins.rs +++ b/crates/chain-connector/src/builtins.rs @@ -16,9 +16,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -use crate::types::{SubnetResolveResult, TxReceiptResult, TxStatus, Worker}; +use crate::types::{OnChainWorkerId, SubnetResolveResult, SubnetWorker, TxReceiptResult, TxStatus}; use crate::{ChainConnector, HttpChainConnector}; -use ccp_shared::types::CUID; use futures::FutureExt; use particle_args::{Args, JError}; use particle_builtins::{wrap, CustomService}; @@ -111,14 +110,14 @@ async fn register_worker_builtin( let mut args = args.function_args.into_iter(); let deal_id: DealId = Args::next("deal_id", &mut args)?; let worker_id: WorkerId = Args::next("worker_id", &mut args)?; - let cu_ids: Vec = Args::next("cu_id", &mut args)?; + let onchain_worker_id: OnChainWorkerId = Args::next("onchain_worker_id", &mut args)?; - if cu_ids.len() != 1 { - return Err(JError::new("Only one cu_id is allowed")); + if onchain_worker_id.is_empty() { + return Err(JError::new("Invalid onchain_worker_id: empty")); } let tx_hash = connector - .register_worker(&deal_id, worker_id, cu_ids[0]) + .register_worker(&deal_id, worker_id, onchain_worker_id) .await .map_err(|err| JError::new(format!("Failed to register worker: {err}")))?; Ok(json!(tx_hash)) @@ -164,7 +163,7 @@ async fn resolve_subnet_builtin( let deal_id: String = Args::next("deal_id", &mut args.function_args.into_iter())?; let deal_id = DealId::from(deal_id); - let workers: eyre::Result> = try { + let workers: eyre::Result> = try { if !deal_id.is_valid() { Err(eyre::eyre!( "Invalid deal id '{}': invalid length", @@ -172,10 +171,10 @@ async fn resolve_subnet_builtin( ))?; } - let units = connector.get_deal_compute_units(&deal_id).await?; - let workers: Result, _> = units + let workers = connector.get_deal_workers(&deal_id).await?; + let workers: Result, _> = workers .into_iter() - .map(|unit| Worker::try_from(unit)) + .map(|worker| SubnetWorker::try_from(worker)) .collect(); workers? }; diff --git a/crates/chain-connector/src/connector.rs b/crates/chain-connector/src/connector.rs index b5499011be..1370e888ac 100644 --- a/crates/chain-connector/src/connector.rs +++ b/crates/chain-connector/src/connector.rs @@ -42,7 +42,7 @@ use crate::types::*; use crate::ConnectorError::{FieldNotFound, InvalidU256, ResponseParseError}; use crate::Deal::CIDV1; use crate::Offer::{ComputePeer, ComputeUnit}; -use crate::{CCStatus, Capacity, CommitmentId, Core, Deal, Offer}; +use crate::{CCStatus, Capacity, CommitmentId, Core, Deal, Offer, OnChainWorkerID}; use chain_data::{peer_id_to_bytes, BlockHeader}; use fluence_libp2p::PeerId; use hex_utils::{decode_hex, encode_hex_0x}; @@ -72,7 +72,11 @@ pub trait ChainConnector: Send + Sync { async fn get_deal_statuses(&self, deal_ids: Vec) -> Result>>; - async fn exit_deal(&self, cu_id: &CUID) -> Result; + async fn exit_deal( + &self, + deal_id: &DealId, + on_chain_worker_id: OnChainWorkerID, + ) -> Result; async fn get_tx_statuses(&self, tx_hashes: Vec) -> Result>>>; @@ -151,7 +155,7 @@ impl HttpChainConnector { pub(crate) async fn get_deals(&self) -> eyre::Result> { let units = self.get_compute_units().await?; tracing::debug!(target: "chain-connector", "get_deals: Got {} compute units", units.len()); - let mut deals: BTreeMap> = BTreeMap::new(); + let mut deals: BTreeMap>> = BTreeMap::new(); units .iter() @@ -160,27 +164,47 @@ impl HttpChainConnector { deals .entry(unit.deal.to_string().into()) .or_default() + .entry(unit.onchainWorkerId.as_slice().into()) + .or_default() .push(CUID::new(unit.id.into())); }); + // For now, we forbid multiple workers for one deal on the peer! + deals.retain(|deal_id, worker| { + if worker.keys().len() > 1 { + tracing::error!(target: "chain-connector", "Deal {deal_id} has more then one worker for the peer which is forbiden at the moment"); + false + } else { + true + } + }); + if deals.is_empty() { return Ok(Vec::new()); } + tracing::debug!(target: "chain-connector", "get_deals: Got {} deals: {:?}", deals.len(), deals); let infos = self.get_deal_infos(deals.keys()).await?; tracing::debug!(target: "chain-connector", "get_deals: Got {} deals infos: {:?}", infos.len(), infos); let deals = infos .into_iter() .zip(deals) - .map(|(details, (deal_id, unit_ids))| match details { - Ok((status, app_cid)) => DealResult::new( - deal_id, - DealInfo { - unit_ids, - status, - app_cid, - }, - ), + .map(|(details, (deal_id, mut workers))| match details { + Ok((status, app_cid)) => { + if let Some((onchain_worker_id, cu_ids)) = workers.pop_first() { + DealResult::new( + deal_id, + DealInfo { + cu_ids, + status, + app_cid, + onchain_worker_id, + }, + ) + } else { + DealResult::with_error(deal_id, "No CUs are found for the deal".into()) + } + } Err(err) => DealResult::with_error(deal_id, err.to_string()), }) .collect::<_>(); @@ -348,14 +372,14 @@ impl HttpChainConnector { &self, deal_id: &DealId, worker_id: WorkerId, - cu_id: CUID, + onchain_worker_id: OnChainWorkerId, ) -> Result { - let data = Deal::setWorkerCall { - computeUnitId: cu_id.as_ref().into(), - workerId: peer_id_to_bytes(worker_id.into()).into(), + let data = Deal::activateWorkerCall { + onchainId: FixedBytes::from_slice(&onchain_worker_id), + offchainId: peer_id_to_bytes(worker_id.into()).into(), } .abi_encode(); - tracing::debug!(target: "chain-connector", "Registering worker {worker_id} for deal {deal_id} with cu_id {cu_id}"); + tracing::debug!(target: "chain-connector", "Registering worker {worker_id} for deal {deal_id} with onchain_id {}", encode_hex_0x(onchain_worker_id)); self.send_tx(data, &deal_id.to_address()).await } @@ -365,8 +389,8 @@ impl HttpChainConnector { self.make_latest_diamond_rpc_params(data) } - pub async fn get_deal_compute_units(&self, deal_id: &DealId) -> Result> { - let data = Deal::getComputeUnitsCall {}.abi_encode(); + pub async fn get_deal_workers(&self, deal_id: &DealId) -> Result> { + let data = Deal::getWorkersCall {}.abi_encode(); let resp: String = process_response( self.client .request( @@ -376,9 +400,9 @@ impl HttpChainConnector { .await, )?; let bytes = decode_hex(&resp)?; - let compute_units = as SolType>::abi_decode(&bytes, true)?; + let workers = as SolType>::abi_decode(&bytes, true)?; - Ok(compute_units) + Ok(workers) } fn init_timestamp_params(&self) -> ArrayParams { @@ -580,14 +604,17 @@ impl ChainConnector for HttpChainConnector { Ok(statuses) } - async fn exit_deal(&self, cu_id: &CUID) -> Result { - let data = Offer::returnComputeUnitFromDealCall { - unitId: cu_id.as_ref().into(), + async fn exit_deal( + &self, + deal_id: &DealId, + onchain_worker_id: OnChainWorkerID, + ) -> Result { + let data = Deal::removeWorkerCall { + onchainId: onchain_worker_id, } .abi_encode(); - self.send_tx(data, &self.config.diamond_contract_address) - .await + self.send_tx(data, &deal_id.to_address()).await } async fn get_tx_statuses(&self, tx_hashes: Vec) -> Result>>> { @@ -640,21 +667,21 @@ mod tests { use alloy_primitives::uint; use alloy_primitives::U256; - use std::assert_matches::assert_matches; - use std::str::FromStr; - use std::sync::Arc; - + use alloy_sol_types::sol_data::Array; + use alloy_sol_types::SolType; use ccp_shared::types::{Difficulty, GlobalNonce, LocalNonce, ResultHash, CUID}; + use chain_data::peer_id_from_hex; use clarity::PrivateKey; + use fluence_libp2p::RandomPeerId; use hex::FromHex; + use hex_utils::{decode_hex, encode_hex_0x}; + use log_utils::{enable_logs_for, LogSpec}; use mockito::Matcher; use serde::Deserialize; use serde_json::json; - - use chain_data::peer_id_from_hex; - use fluence_libp2p::RandomPeerId; - use hex_utils::decode_hex; - use log_utils::{enable_logs_for, LogSpec}; + use std::assert_matches::assert_matches; + use std::str::FromStr; + use std::sync::Arc; use crate::connector::TxStatus; use crate::Deal::Status::ACTIVE; @@ -686,7 +713,30 @@ mod tests { #[tokio::test] async fn test_get_compute_units() { - let expected_data = "0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000025d204dcc21f59c2a2098a277e48879207f614583e066654ad6736d36815ebb9e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000450e2f2a5bdb528895e9005f67e70fe213b9b822122e96fd85d2238cae55b6f900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"; + let cu_1 = crate::Offer::ComputeUnit { + id: alloy_primitives::hex!( + "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5" + ) + .into(), + deal: Default::default(), + startEpoch: Default::default(), + onchainWorkerId: Default::default(), + }; + + let cu_2 = crate::Offer::ComputeUnit { + id: alloy_primitives::hex!( + "ba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d1" + ) + .into(), + deal: Default::default(), + startEpoch: Default::default(), + onchainWorkerId: Default::default(), + }; + + let expected_data = encode_hex_0x(Array::::abi_encode(&vec![ + cu_1.clone(), + cu_2.clone(), + ])); let expected_response = format!("{{\"jsonrpc\":\"2.0\",\"result\":\"{expected_data}\",\"id\":0}}"); @@ -797,7 +847,6 @@ mod tests { #[tokio::test] async fn test_batch_init_request() { - use hex::FromHex; let expected_response = r#"[ { "jsonrpc": "2.0", @@ -1084,13 +1133,32 @@ mod tests { #[tokio::test] async fn test_get_deals() { enable_logs_for(LogSpec::new(vec!["chain-connector=debug".parse().unwrap()])); - let expected_deal_id_1 = "5e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c"; - let expected_cuid_1 = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5"; - let expected_deal_id_2 = "0x6e3d0fde6f793b3115a9e7f5ebc195bbeed35d6d"; - let expected_cuid_2 = "ba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d1"; + let cu_1 = crate::Offer::ComputeUnit { + id: alloy_primitives::hex!( + "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5" + ) + .into(), + deal: alloy_primitives::hex!("5e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c").into(), + startEpoch: Default::default(), + onchainWorkerId: Default::default(), + }; + + let cu_2 = crate::Offer::ComputeUnit { + id: alloy_primitives::hex!( + "ba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d1" + ) + .into(), + deal: alloy_primitives::hex!("6e3d0fde6f793b3115a9e7f5ebc195bbeed35d6d").into(), + startEpoch: Default::default(), + onchainWorkerId: Default::default(), + }; - let compute_units_response = "00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000002aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d50000000000000000000000005e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c00000000000000000000000000000000000000000000000000000000000fffbcba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d10000000000000000000000006e3d0fde6f793b3115a9e7f5ebc195bbeed35d6d00000000000000000000000000000000000000000000000000000000000fffba"; + let compute_units_response = + encode_hex_0x(Array::::abi_encode(&vec![ + cu_1.clone(), + cu_2.clone(), + ])); let compute_units_response = json!({ "jsonrpc": "2.0", "id": 0, @@ -1152,30 +1220,30 @@ mod tests { assert_eq!(deals.len(), 2, "there should be only two deals: {deals:?}"); assert!(deals[0].success, "failed to get a deal: {deals:?}"); - assert_eq!(deals[0].deal_id, expected_deal_id_1); + assert_eq!(deals[0].deal_id, cu_1.deal.to_string()); let deal_info = &deals[0].deal_info[0]; assert_eq!(deal_info.status, ACTIVE); assert_eq!( - deal_info.unit_ids.len(), + deal_info.cu_ids.len(), 1, "there should be only one unit id: {deals:?}" ); - assert_eq!(deal_info.unit_ids[0].to_string(), expected_cuid_1); + assert_eq!(deal_info.cu_ids[0], CUID::new(cu_1.id.0)); assert_eq!(deal_info.app_cid, expected_app_cid); // Second deal assert!(deals[1].success, "failed to get a deal: {deals:?}"); - assert_eq!(deals[1].deal_id, expected_deal_id_2); + assert_eq!(deals[1].deal_id, cu_2.deal.to_string()); let deal_info = &deals[1].deal_info[0]; assert_eq!(deal_info.status, ACTIVE); assert_eq!( - deal_info.unit_ids.len(), + deal_info.cu_ids.len(), 1, "there should be only one unit id: {deals:?}" ); - assert_eq!(deal_info.unit_ids[0].to_string(), expected_cuid_2); + assert_eq!(deal_info.cu_ids[0], CUID::new(cu_2.id.0)); assert_eq!(deal_info.app_cid, expected_app_cid); mock.assert(); @@ -1183,7 +1251,31 @@ mod tests { #[tokio::test] async fn test_get_deals_no_deals() { - let compute_units_response = "0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000002097ab88db9d8e48a1d55cd0205bd74721eb2ba70897d255f350062dec8fec7bb4000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002988e97756c550265b7a570b41bb4d8dbf056a1ca0435265ad175c5ecced2ef600000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000d4af56c3a07423c4b976f6d1a43a96f4843bdc5479b01bf3bd73f69c1062738e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e9ab8214c37d615708ef89559407e1a015a0f7fbdaf3cd23a1f4c5ed2ae1c8cd00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a717255e798920c9a9bbaae45492ff9020cdc8db3d8a44099015fbe950927368000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000df891a54a5479aa7ee3c019615ac55a18534aa81dcf3b2fbf7577d54cadf0c300000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000fbbd2b72d216685d97c09c56ca6ba16e7d2e35ff72bb69c72584ad8d9365610200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000ccff78d7a0c365cd3ba8842707a2f96f26ac7ea4dbd7aac04ae2f0958ef2252400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000db5fba69f5d2b2d04a5bf63b7abe1a2781470e77101e3afe6ec66794aec80c0a000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005b96d1d71cf1b77421645ae4031558195d9df60abd73ea716c3df67d3e7832da00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e5cf187d01e552b0bd38ea89b9013e7b98915891cafb44fc7cf6937223290bcf00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000fe00f5915737e75aa5c7799dcf326020ab499a65c9d10e1e4951be8371aee6f2000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c073f4fe89ebe1a354f052de5e371dc39e8d2b8f9ccbf0fc9e26e107217fe40000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008510f943f7080e58846bc5044afa0c91480dbeca0ff5dcbef7c522a43531cffc000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000007d3ea3803085980a5b4e3bd93c32d8dae3b8060db9aab800e0922b7b18d865fc000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002be7bd75150701c7ef8521d322fbcbb228cf907224d80e433550a3034bbcbb8b000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e1469c97990a40da4465cb4e749faa2905adfc4b1109f7dc8b66ef8b1ed0a0c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008d20e7f50737ea4236c6d92b150e569aa2fc482699cc0165722210b4bcffdc4c00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000eae742035300aed3774579cab0c75416002438809f3cb3f62fb3c34dd0ab16790000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000029e9cc0f707e8e02578285b8e1d50c207fab5ec11b0b1e6f97834e1154abbe4b000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005be58fc7ae8eb4e577b1be2a911a11fd17b90c8b6754aa71859ad6285bcc3b00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008f7ba4e1f4bccd2aaf1f8791df5b73a6727b0720aba8fcf0afb6e1f07303c1cc0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000053f668a26f5978f4f252145d68d2f0a627116c197a342a2e42aa269f46dcccf000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000db7ddcd38d772f1b043572bab7d890ea65723cf4805ee963f46b7a9f81a454f20000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000018fa36dd54f48c4c4ae40ceefca274460d4c83026f48fdcc4e201a1a0423561400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a0b6e3c818a1ecb540a289c81d0835dc41755f091ba70f02e5134203c195b80900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e77ef1585872eabfa78c01915cf0d43c3bd3bd63ae40565c62254a95bba901530000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000059810dcb510b17ca28693caa6d6164b6db28925290f345d8ef0ac8ef5751014f0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000d6b992345e2ed6287ff2df0943d10f972ae7f63789d11df22f3fd9a4199f5c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005e58c2b5ba2b1a7d861de440a962fe0ffdcef1e240e3e50690f6f1f527bf7d66000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000ca0be7865c187df9331bce42cd6ea3498d858a3d8b37fec18e9654ef320daa7000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003938f3a12b19ed989431771df29f64ef011eec4da79e2ec30d86f28dae35f10a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"; + let cu_1 = crate::Offer::ComputeUnit { + id: alloy_primitives::hex!( + "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5" + ) + .into(), + deal: Default::default(), + startEpoch: Default::default(), + onchainWorkerId: Default::default(), + }; + + let cu_2 = crate::Offer::ComputeUnit { + id: alloy_primitives::hex!( + "ba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d1" + ) + .into(), + deal: Default::default(), + startEpoch: Default::default(), + onchainWorkerId: Default::default(), + }; + + let compute_units_response = + encode_hex_0x(Array::::abi_encode(&vec![ + cu_1.clone(), + cu_2.clone(), + ])); let compute_units_response = format!("{{\"jsonrpc\":\"2.0\",\"result\":\"{compute_units_response}\",\"id\":0}}"); let mut server = mockito::Server::new_async().await; @@ -1210,7 +1302,7 @@ mod tests { } #[tokio::test] - async fn test_register_worker() { + async fn test_activate_worker() { let get_block_by_number_response = r#"{"jsonrpc":"2.0","id":0,"result":{"hash":"0xcbe8d90665392babc8098738ec78009193c99d3cc872a6657e306cfe8824bef9","parentHash":"0x15e767118a3e2d7545fee290b545faccd4a9eff849ac1057ce82cab7100c0c52","sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","miner":"0x0000000000000000000000000000000000000000","stateRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","difficulty":"0x0","number":"0xa2","gasLimit":"0x1c9c380","gasUsed":"0x0","timestamp":"0x65d88f76","extraData":"0x","mixHash":"0x0000000000000000000000000000000000000000000000000000000000000000","nonce":"0x0000000000000000","baseFeePerGas":"0x7","totalDifficulty":"0x0","uncles":[],"transactions":[],"size":"0x220"}}"#; let estimate_gas_response = r#"{"jsonrpc":"2.0","id": 1,"result": "0x5208"}"#; let max_priority_fee_response = r#"{"jsonrpc":"2.0","id": 2,"result": "0x5208"}"#; @@ -1227,9 +1319,8 @@ mod tests { let deal_id = "5e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c"; let deal_id = types::DealId::from(deal_id); let worker_id = types::peer_scope::WorkerId::from(RandomPeerId::random()); - let cuid = - CUID::from_hex("aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5") - .unwrap(); + let onchain_worker_id = + decode_hex("aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5").unwrap(); let mut server = mockito::Server::new_async().await; let url = server.url(); @@ -1256,7 +1347,7 @@ mod tests { .create(); let result = get_connector(&url) - .register_worker(&deal_id, worker_id, cuid) + .register_worker(&deal_id, worker_id, onchain_worker_id) .await .unwrap(); assert_eq!(result, expected_tx_hash); diff --git a/crates/chain-connector/src/function/deal.rs b/crates/chain-connector/src/function/deal.rs index 792f7cb283..0227e3725b 100644 --- a/crates/chain-connector/src/function/deal.rs +++ b/crates/chain-connector/src/function/deal.rs @@ -16,7 +16,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - +use alloy_primitives::FixedBytes; use alloy_sol_types::{sol, SolType}; use hex_utils::decode_hex; use serde::{Deserialize, Serialize}; @@ -44,12 +44,13 @@ sol! { SMALL_BALANCE } - struct ComputeUnit { - bytes32 id; - bytes32 workerId; + struct Worker { + bytes32 offchainId; + bytes32 onchainId; bytes32 peerId; address provider; uint256 joinedEpoch; + bytes32[] computeUnitIds; } /// @dev Returns the status of the deal @@ -58,14 +59,18 @@ sol! { /// @dev Returns the app CID function appCID() external view returns (CIDV1 memory); - /// @dev Set worker ID for a compute unit. Compute unit can have only one worker ID - function setWorker(bytes32 computeUnitId, bytes32 workerId) external; + /// @dev Set offchain worker ID for a corresponding onchain worker for a deal + function activateWorker(bytes32 onchainId, bytes32 offchainId); - /// @dev Returns the compute units info by provider - function getComputeUnits() public view returns (ComputeUnit[] memory); + /// @dev Removes worker from the deal + function removeWorker(bytes32 onchainId) external; + /// @dev Returns workers + function getWorkers() external view returns (Worker[] memory); } } +pub type OnChainWorkerID = FixedBytes<32>; + impl CIDV1 { pub fn from_hex(hex: &str) -> Result { let bytes = decode_hex(hex)?; diff --git a/crates/chain-connector/src/function/offer.rs b/crates/chain-connector/src/function/offer.rs index 6b43b64c3c..605ccadf22 100644 --- a/crates/chain-connector/src/function/offer.rs +++ b/crates/chain-connector/src/function/offer.rs @@ -36,15 +36,13 @@ sol! { bytes32 id; address deal; uint256 startEpoch; + bytes32 onchainWorkerId; } /// @dev Returns the compute peer info function getComputePeer(bytes32 peerId) external view returns (ComputePeer memory); /// @dev Returns the compute units info of a peer function getComputeUnits(bytes32 peerId) external view returns (ComputeUnit[] memory); - - /// @dev Return the compute unit from a deal - function returnComputeUnitFromDeal(bytes32 unitId) external; } } @@ -75,12 +73,12 @@ impl From for PendingUnit { mod tests { use crate::Offer::ComputePeer; use alloy_primitives::{hex, U256}; - use alloy_sol_types::SolType; + use alloy_sol_types::SolValue; use hex_utils::decode_hex; - #[tokio::test] - async fn decode_compute_unit() { - let data = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d50000000000000000000000005e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c00000000000000000000000000000000000000000000000000000000000003e8"; + #[test] + fn decode_compute_unit() { + let data = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d50000000000000000000000005e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c00000000000000000000000000000000000000000000000000000000000003e8bb3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633dd"; let compute_unit = super::ComputeUnit::abi_decode(&decode_hex(data).unwrap(), true); assert!(compute_unit.is_ok()); let compute_unit = compute_unit.unwrap(); @@ -95,11 +93,15 @@ mod tests { "0x5e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c" ); assert_eq!(compute_unit.startEpoch, U256::from(1000)); + assert_eq!( + compute_unit.onchainWorkerId, + hex!("bb3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633dd") + ) } - #[tokio::test] - async fn decode_compute_unit_no_deal() { - let data = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003e8"; + #[test] + fn decode_compute_unit_no_deal_no_worker() { + let data = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003e80000000000000000000000000000000000000000000000000000000000000000"; let compute_unit = super::ComputeUnit::abi_decode(&decode_hex(data).unwrap(), true); assert!(compute_unit.is_ok()); let compute_unit = compute_unit.unwrap(); @@ -109,10 +111,11 @@ mod tests { ); assert!(compute_unit.deal.is_zero()); assert_eq!(compute_unit.startEpoch, U256::from(1000)); + assert!(compute_unit.onchainWorkerId.is_zero()) } - #[tokio::test] - async fn decode_compute_peer_no_commitment() { + #[test] + fn decode_compute_peer_no_commitment() { let data = "0xaa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000020000000000000000000000005b73c5498c1e3b4dba84de0f1833c4a029d90519"; let compute_peer = ComputePeer::abi_decode(&decode_hex(data).unwrap(), true); assert!(compute_peer.is_ok()); @@ -129,8 +132,8 @@ mod tests { ); } - #[tokio::test] - async fn decode_compute_peer() { + #[test] + fn decode_compute_peer() { let data = "0xaa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5aa3046a12a1aac6e840625e6329d70b427328feceedc8d273e5e6454b85633b5000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000005b73c5498c1e3b4dba84de0f1833c4a029d90519"; let compute_peer = ComputePeer::abi_decode(&decode_hex(data).unwrap(), true); assert!(compute_peer.is_ok()); diff --git a/crates/chain-connector/src/types.rs b/crates/chain-connector/src/types.rs index fd973adf96..48e6af46de 100644 --- a/crates/chain-connector/src/types.rs +++ b/crates/chain-connector/src/types.rs @@ -18,7 +18,6 @@ */ use crate::error::ConnectorError; use crate::function::Deal; -use crate::Deal::ComputeUnit; use alloy_primitives::U256; use ccp_shared::types::{Difficulty, GlobalNonce, CUID}; use chain_data::parse_peer_id; @@ -56,11 +55,14 @@ impl DealResult { } } +pub type OnChainWorkerId = Vec; + #[derive(Debug, Serialize, Deserialize)] pub struct DealInfo { pub status: Deal::Status, - pub unit_ids: Vec, + pub cu_ids: Vec, pub app_cid: String, + pub onchain_worker_id: OnChainWorkerId, } #[derive(Debug, Serialize, Deserialize)] @@ -142,28 +144,32 @@ impl RawTxReceipt { } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub struct Worker { +pub struct SubnetWorker { pub cu_ids: Vec, pub host_id: String, pub worker_id: Vec, } -impl TryFrom for Worker { +impl TryFrom for SubnetWorker { type Error = Report; - fn try_from(unit: ComputeUnit) -> eyre::Result { + fn try_from(deal_worker: Deal::Worker) -> eyre::Result { let mut worker_id = vec![]; - if !unit.workerId.is_zero() { - let w_id = parse_peer_id(&unit.workerId.0) - .map_err(|err| eyre!("Failed to parse unit.workerId: {err}"))? + if !deal_worker.offchainId.is_zero() { + let w_id = parse_peer_id(&deal_worker.offchainId.0) + .map_err(|err| eyre!("Failed to parse worker.offchainId: {err}"))? .to_base58(); worker_id.push(w_id) } - let cu_id = unit.id.to_string(); - let peer_id = parse_peer_id(&unit.peerId.0) - .map_err(|err| eyre!("Failed to parse unit.peerId: {err}"))?; + let cu_ids = deal_worker + .computeUnitIds + .into_iter() + .map(|id| id.to_string()) + .collect(); + let peer_id = parse_peer_id(&deal_worker.peerId.0) + .map_err(|err| eyre!("Failed to parse worker.peerId: {err}"))?; Ok(Self { - cu_ids: vec![cu_id], + cu_ids, host_id: peer_id.to_base58(), worker_id, }) @@ -173,7 +179,7 @@ impl TryFrom for Worker { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct SubnetResolveResult { pub success: bool, - pub workers: Vec, + pub workers: Vec, pub error: Vec, } diff --git a/crates/chain-listener/Cargo.toml b/crates/chain-listener/Cargo.toml index 54a56f547f..c312248e02 100644 --- a/crates/chain-listener/Cargo.toml +++ b/crates/chain-listener/Cargo.toml @@ -44,4 +44,4 @@ serde_with = { workspace = true } [dev-dependencies] jsonrpsee = { workspace = true, features = ["server"] } tempfile = { workspace = true } - +bs58 = { workspace = true } diff --git a/crates/chain-listener/src/event/compute_unit_matched.rs b/crates/chain-listener/src/event/compute_unit_matched.rs index 14202abd53..937a91f8d4 100644 --- a/crates/chain-listener/src/event/compute_unit_matched.rs +++ b/crates/chain-listener/src/event/compute_unit_matched.rs @@ -25,28 +25,31 @@ sol! { bytes32 hash; } - event ComputeUnitMatched( + event ComputeUnitsMatched( bytes32 indexed peerId, address deal, - bytes32 unitId, - uint256 dealCreationBlock, + bytes32 onchainWorkerId, + bytes32[] cuIds, CIDV1 appCID ); } #[cfg(test)] mod tests { - use crate::event::compute_unit_matched::ComputeUnitMatched; - use alloy_primitives::Uint; + use crate::event::compute_unit_matched::{ComputeUnitsMatched, CIDV1}; + use alloy_primitives::Address; use alloy_sol_types::SolEvent; - use chain_data::{parse_log, parse_peer_id, Log}; - use hex_utils::decode_hex; + use chain_data::{parse_log, parse_peer_id, peer_id_to_bytes, Log}; + use fluence_libp2p::RandomPeerId; + use hex_utils::{decode_hex, encode_hex_0x}; + use libipld::Cid; + use std::str::FromStr; #[test] fn topic() { assert_eq!( - ComputeUnitMatched::SIGNATURE_HASH.to_string(), - String::from("0xb1c5a9179c3104a43de668491f14c45778f00ec34d5deee023af204820483bdb") + ComputeUnitsMatched::SIGNATURE_HASH.to_string(), + String::from("0x6e5629a2cfaa82d1ea7ad51936794f666271fbd5068017020eaaafdbb017e615") ); } @@ -73,72 +76,76 @@ mod tests { #[test] fn parse() { - let data1 = "000000000000000000000000ffa0611a099ab68ad7c3c67b4ca5bbbee7a58b9900000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000500155122000000000000000000000000000000000000000000000000000000000a146af49df31c99c79a30ec4ae2abb2445d8c5d202ea58fa9ea9cbff45d4152e".to_string(); - let data2 = "00000000000000000000000067b2ad3866429282e16e55b715d12a77f85b7ce800000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000560155122000000000000000000000000000000000000000000000000000000000a146af49df31c99c79a30ec4ae2abb2445d8c5d202ea58fa9ea9cbff45d4152e".to_string(); + let peer_id1 = RandomPeerId::random(); + let cu_id1 = [1u8; 32]; + let deal1 = "0xFfA0611a099AB68AD7C3C67B4cA5bbBEE7a58B99"; + let cid = "bafkreifbi2xutxzrzgohtiyoysxcvozeixmmluqc5jmpvhvjzp7ulvavfy"; + let cid_bytes = Cid::from_str(cid).unwrap().to_bytes(); + let encoded_cid = CIDV1 { + prefixes: cid_bytes[0..4].as_chunks::<4>().0[0].into(), + hash: cid_bytes[4..36].as_chunks::<32>().0[0].into(), + }; + let event1 = ComputeUnitsMatched { + peerId: peer_id_to_bytes(peer_id1).into(), + deal: Address::from_slice(&decode_hex(deal1).unwrap()), + onchainWorkerId: Default::default(), + cuIds: vec![cu_id1.into()], + appCID: encoded_cid.clone(), + }; + + let peer_id2 = RandomPeerId::random(); + let cu_id2 = [2u8; 32]; + let cu_id3 = [3u8; 32]; + let deal2 = "0x67b2AD3866429282e16e55B715d12A77F85B7CE8"; + let event2 = ComputeUnitsMatched { + peerId: peer_id_to_bytes(peer_id2).into(), + deal: Address::from_slice(&decode_hex(deal2).unwrap()), + onchainWorkerId: Default::default(), + cuIds: vec![cu_id2.into(), cu_id3.into()], + appCID: encoded_cid, + }; let log1 = Log { - data: data1, + data: encode_hex_0x(&event1.encode_data()), block_number: "0x0".to_string(), removed: false, topics: vec![ - ComputeUnitMatched::SIGNATURE_HASH.to_string(), - "0x7a82a5feefcaad4a89c689412031e5f87c02b29e3fced583be5f05c7077354b7".to_string(), + ComputeUnitsMatched::SIGNATURE_HASH.to_string(), + encode_hex_0x(&peer_id_to_bytes(peer_id1)), ], }; let log2 = Log { - data: data2, + data: encode_hex_0x(&event2.encode_data()), block_number: "0x1".to_string(), removed: false, topics: vec![ - ComputeUnitMatched::SIGNATURE_HASH.to_string(), - "0x7a82a5feefcaad4a89c689412031e5f87c02b29e3fced583be5f05c7077354b7".to_string(), + ComputeUnitsMatched::SIGNATURE_HASH.to_string(), + encode_hex_0x(&peer_id_to_bytes(peer_id2)), ], }; - let m = parse_log::(log1).expect("error parsing Match from log"); - assert_eq!( - parse_peer_id(m.peerId.as_slice()).unwrap().to_string(), - "12D3KooWJ4bTHirdTFNZpCS72TAzwtdmavTBkkEXtzo6wHL25CtE" - ); - assert_eq!( - m.deal.to_string(), - "0xFfA0611a099AB68AD7C3C67B4cA5bbBEE7a58B99" - ); - assert_eq!( - m.unitId.to_string(), - "0x00000000000000000000000000000000000000000000000000000000000000a0" - ); - assert_eq!(m.dealCreationBlock, Uint::from(80)); + let m = parse_log::(log1).expect("error parsing Match from log"); + assert_eq!(parse_peer_id(m.peerId.as_slice()).unwrap(), peer_id1); + assert_eq!(m.deal.to_string(), deal1); + assert_eq!(m.cuIds.len(), 1); + assert_eq!(m.cuIds[0], cu_id1); let cid_bytes = [m.appCID.prefixes.to_vec(), m.appCID.hash.to_vec()].concat(); let app_cid = libipld::Cid::read_bytes(cid_bytes.as_slice()) .unwrap() .to_string(); - assert_eq!( - app_cid, - "bafkreifbi2xutxzrzgohtiyoysxcvozeixmmluqc5jmpvhvjzp7ulvavfy" - ); + assert_eq!(app_cid, cid); + + let m = parse_log::(log2).expect("error parsing Match from log"); + assert_eq!(parse_peer_id(m.peerId.as_slice()).unwrap(), peer_id2); + assert_eq!(m.deal.to_string(), deal2); + assert_eq!(m.cuIds.len(), 2); + assert_eq!(m.cuIds[0], cu_id2); + assert_eq!(m.cuIds[1], cu_id3); - let m = parse_log::(log2).expect("error parsing Match from log"); - assert_eq!( - parse_peer_id(m.peerId.as_slice()).unwrap().to_string(), - "12D3KooWJ4bTHirdTFNZpCS72TAzwtdmavTBkkEXtzo6wHL25CtE" - ); - assert_eq!( - m.deal.to_string(), - "0x67b2AD3866429282e16e55B715d12A77F85B7CE8" - ); - assert_eq!( - m.unitId.to_string(), - "0x00000000000000000000000000000000000000000000000000000000000000a0" - ); - assert_eq!(m.dealCreationBlock, Uint::from(86)); let cid_bytes = [m.appCID.prefixes.to_vec(), m.appCID.hash.to_vec()].concat(); let app_cid = libipld::Cid::read_bytes(cid_bytes.as_slice()) .unwrap() .to_string(); - assert_eq!( - app_cid, - "bafkreifbi2xutxzrzgohtiyoysxcvozeixmmluqc5jmpvhvjzp7ulvavfy" - ); + assert_eq!(app_cid, cid); } } diff --git a/crates/chain-listener/src/event/mod.rs b/crates/chain-listener/src/event/mod.rs index 67fe5926e8..c3a0ecafa6 100644 --- a/crates/chain-listener/src/event/mod.rs +++ b/crates/chain-listener/src/event/mod.rs @@ -23,6 +23,6 @@ mod unit_activated; mod unit_deactivated; pub use cc_activated::CommitmentActivated; -pub use compute_unit_matched::{ComputeUnitMatched, CIDV1}; +pub use compute_unit_matched::{ComputeUnitsMatched, CIDV1}; pub use unit_activated::UnitActivated; pub use unit_deactivated::UnitDeactivated; diff --git a/crates/chain-listener/src/lib.rs b/crates/chain-listener/src/lib.rs index 4a3e2a161b..259a4d3d45 100644 --- a/crates/chain-listener/src/lib.rs +++ b/crates/chain-listener/src/lib.rs @@ -22,13 +22,10 @@ #![feature(extract_if)] #![feature(btree_extract_if)] #![feature(result_option_inspect)] +#![feature(slice_as_chunks)] extern crate core; -pub use event::CommitmentActivated; -pub use event::ComputeUnitMatched; -pub use event::UnitActivated; -pub use event::UnitDeactivated; -pub use event::CIDV1; +pub use event::{CommitmentActivated, ComputeUnitsMatched, UnitActivated, UnitDeactivated, CIDV1}; pub use listener::ChainListener; mod event; diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index 545511fb0b..24ca716316 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -22,6 +22,7 @@ use alloy_sol_types::SolEvent; use backoff::Error::Permanent; use std::cmp::min; use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::fmt::{Debug, Formatter}; use std::future::{pending, Future}; use std::path::PathBuf; use std::process::exit; @@ -54,7 +55,7 @@ use tracing::Instrument; use chain_connector::Offer::ComputeUnit; use chain_connector::{ is_commitment_not_active, is_too_many_proofs, CCStatus, ChainConnector, CommitmentId, - ConnectorError, Deal, PEER_NOT_EXISTS, + ConnectorError, Deal, OnChainWorkerID, PEER_NOT_EXISTS, }; use chain_data::{parse_log, peer_id_to_hex, BlockHeader, Log}; use core_distributor::errors::AcquireError; @@ -65,12 +66,32 @@ use server_config::{ChainConfig, ChainListenerConfig}; use types::DealId; use crate::event::CommitmentActivated; -use crate::event::{ComputeUnitMatched, UnitActivated, UnitDeactivated}; +use crate::event::{ComputeUnitsMatched, UnitActivated, UnitDeactivated}; use crate::proof_tracker::ProofTracker; use crate::types::{CUGroups, PhysicalCoreGroups}; const PROOF_POLL_LIMIT: usize = 50; +#[derive(Clone)] +struct OnChainWorker { + id: OnChainWorkerID, + cu_ids: Vec, +} + +impl Debug for OnChainWorker { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "(chain_worker_id = {}, cu_ids = {:?})", + self.id, + self.cu_ids + .iter() + .map(|cu| cu.to_string()) + .collect::>() + ) + } +} + pub struct ChainListener { config: ChainConfig, listener_config: ChainListenerConfig, @@ -103,7 +124,7 @@ pub struct ChainListener { // the compute units that are in the commitment and not in deals cc_compute_units: BTreeMap, // the compute units that are in deals and not in commitment - active_deals: BTreeMap, + active_deals: BTreeMap, proof_tracker: ProofTracker, pending_proof_txs: Vec<(String, Vec)>, @@ -115,7 +136,7 @@ pub struct ChainListener { // Subscriptions that are polled always heads: Option>, commitment_activated: Option>, - unit_matched: Option>, + units_matched: Option>, metrics: Option, } @@ -170,7 +191,7 @@ impl ChainListener { unit_deactivated: None, heads: None, commitment_activated: None, - unit_matched: None, + units_matched: None, active_deals: BTreeMap::new(), metrics, proof_tracker: ProofTracker::new(persisted_proof_id_dir), @@ -250,9 +271,9 @@ impl ChainListener { } } }, - event = poll_subscription(&mut self.unit_matched) => { - if let Err(err) = self.process_unit_matched(event) { - self.handle_subscription_error("ComputeUnitMatched", err).await; + event = poll_subscription(&mut self.units_matched) => { + if let Err(err) = self.process_compute_units_matched(event) { + self.handle_subscription_error("ComputeUnitsMatched", err).await; } }, _ = timer.next() => { @@ -491,7 +512,8 @@ impl ChainListener { self.heads = Some(self.subscribe("newHeads", rpc_params!["newHeads"]).await?); self.commitment_activated = Some(self.subscribe("logs", self.cc_activated_params()).await?); - self.unit_matched = Some(self.subscribe("logs", self.unit_matched_params()).await?); + self.units_matched = + Some(self.subscribe("logs", self.unit_matched_params()).await?); if let Some(commitment_id) = self.current_commitment.clone() { self.subscribe_unit_events(&commitment_id).await?; } @@ -530,6 +552,17 @@ impl ChainListener { .map(|unit| (CUID::new(unit.id.0), unit)) .collect(); + self.active_deals.clear(); + for cu in &in_deal { + let cu_id = CUID::new(cu.id.0); + let deal_id = cu.deal.to_string().into(); + let onchain_worker = self.active_deals.entry(deal_id).or_insert(OnChainWorker { + id: cu.onchainWorkerId, + cu_ids: vec![], + }); + onchain_worker.cu_ids.push(cu_id); + } + let active = self .cc_compute_units .values() @@ -540,13 +573,6 @@ impl ChainListener { .values() .filter(|unit| unit.startEpoch > self.current_epoch); - for cu in &in_deal { - let cu_id = CUID::new(cu.id.0); - // TODO: in the future it should be BTreeMap>, because deal will be able - // to use multiple CUs from one peer - self.active_deals.insert(cu.deal.to_string().into(), cu_id); - } - tracing::info!(target: "chain-listener", "Compute units mapping: in cc {}/[{} pending], in deal {}", self.cc_compute_units.len(), @@ -567,14 +593,13 @@ impl ChainListener { tracing::info!(target: "chain-listener", "In deal compute units: {:?}", self.active_deals.values() - .map(CUID::to_string) .collect::>() ); // NOTE: cores are released after all the logs to simplify debug on failure - for cu_id in self.active_deals.values() { - self.core_distributor.release_worker_cores(&[*cu_id]); - self.acquire_core_for_deal(*cu_id)?; + for worker in self.active_deals.values() { + self.core_distributor.release_worker_cores(&worker.cu_ids); + self.acquire_core_for_deal(worker.cu_ids.clone())?; } Ok(()) @@ -629,7 +654,7 @@ impl ChainListener { fn unit_matched_params(&self) -> ArrayParams { let topics = vec![ - ComputeUnitMatched::SIGNATURE_HASH.to_string(), + ComputeUnitsMatched::SIGNATURE_HASH.to_string(), peer_id_to_hex(self.host_id), ]; rpc_params![ @@ -729,6 +754,7 @@ impl ChainListener { id, deal: Address::ZERO, startEpoch: cc_event.startEpoch, + onchainWorkerId: FixedBytes::<32>::ZERO, }, ) }) @@ -763,6 +789,7 @@ impl ChainListener { id: unit_event.unitId, deal: Address::ZERO, startEpoch: unit_event.startEpoch, + onchainWorkerId: FixedBytes::<32>::ZERO, }, ); @@ -788,11 +815,11 @@ impl ChainListener { ); self.cc_compute_units.remove(&unit_id); self.refresh_commitment().await?; - self.acquire_core_for_deal(unit_id)?; + self.acquire_core_for_deal(vec![unit_id])?; Ok(()) } - fn process_unit_matched( + fn process_compute_units_matched( &mut self, event: Option>, ) -> eyre::Result<()> { @@ -801,15 +828,24 @@ impl ChainListener { tracing::error!(target: "chain-listener", "Failed to parse DealMatched event: {err}, data: {event}"); err })?; - let deal_event = parse_log::(log)?; + let deal_event = parse_log::(log)?; tracing::info!(target: "chain-listener", "Received DealMatched event for deal: {}", deal_event.deal ); + let cu_ids = deal_event + .cuIds + .into_iter() + .map(|cu| CUID::new(cu.0)) + .collect(); + self.active_deals.insert( deal_event.deal.to_string().into(), - CUID::new(deal_event.unitId.0), + OnChainWorker { + id: deal_event.onchainWorkerId, + cu_ids, + }, ); Ok(()) } @@ -1018,9 +1054,9 @@ impl ChainListener { } } - fn acquire_core_for_deal(&self, unit_id: CUID) -> eyre::Result<()> { + fn acquire_core_for_deal(&self, cu_ids: Vec) -> eyre::Result<()> { self.core_distributor - .acquire_worker_cores(AcquireRequest::new(vec![unit_id], WorkType::Deal))?; + .acquire_worker_cores(AcquireRequest::new(cu_ids, WorkType::Deal))?; Ok(()) } @@ -1244,7 +1280,7 @@ impl ChainListener { }) .await?; - for (status, (deal_id, cu_id)) in statuses + for (status, (deal_id, worker)) in statuses .into_iter() .zip(self.active_deals.clone().into_iter()) { @@ -1252,7 +1288,7 @@ impl ChainListener { Ok(status) => match status { Deal::Status::INSUFFICIENT_FUNDS | Deal::Status::ENDED => { tracing::info!(target: "chain-listener", "Deal {deal_id} status: {status:?}; Exiting..."); - self.exit_deal(&deal_id, cu_id).await?; + self.exit_deal(&deal_id, worker.id).await?; tracing::info!(target: "chain-listener", "Exited deal {deal_id} successfully"); } _ => {} @@ -1266,14 +1302,18 @@ impl ChainListener { Ok(()) } - async fn exit_deal(&mut self, deal_id: &DealId, cu_id: CUID) -> eyre::Result<()> { + async fn exit_deal( + &mut self, + deal_id: &DealId, + onchain_worker_id: OnChainWorkerID, + ) -> eyre::Result<()> { let backoff = ExponentialBackoff { max_elapsed_time: Some(Duration::from_secs(3)), ..ExponentialBackoff::default() }; retry(backoff, || async { - self.chain_connector.exit_deal(&cu_id).await.map_err(|err| { + self.chain_connector.exit_deal(deal_id, onchain_worker_id).await.map_err(|err| { tracing::warn!(target: "chain-listener", "Failed to exit deal {deal_id}: {err}"); eyre!("Failed to exit deal {deal_id}: {err}; Retrying...") })?; diff --git a/crates/nox-tests/tests/builtin.rs b/crates/nox-tests/tests/builtin.rs index ebe42f80c7..7885e4912d 100644 --- a/crates/nox-tests/tests/builtin.rs +++ b/crates/nox-tests/tests/builtin.rs @@ -2377,24 +2377,33 @@ async fn aliases_restart() { #[ignore] #[tokio::test] async fn subnet_resolve() { - let cu_1 = chain_connector::Deal::ComputeUnit { - id: hex!("0000000000000000000000000000000000000000000000000000000000000001").into(), - workerId: peer_id_to_bytes(RandomPeerId::random()).into(), + let worker1 = chain_connector::Deal::Worker { + offchainId: peer_id_to_bytes(RandomPeerId::random()).into(), peerId: peer_id_to_bytes(RandomPeerId::random()).into(), provider: Default::default(), joinedEpoch: Default::default(), + onchainId: Default::default(), + computeUnitIds: vec![hex!( + "0000000000000000000000000000000000000000000000000000000000000001" + ) + .into()], }; - let cu_2 = chain_connector::Deal::ComputeUnit { - id: hex!("0000000000000000000000000000000000000000000000000000000000000002").into(), - workerId: Default::default(), + let worker2 = chain_connector::Deal::Worker { + offchainId: Default::default(), peerId: peer_id_to_bytes(RandomPeerId::random()).into(), provider: Default::default(), joinedEpoch: Default::default(), + onchainId: Default::default(), + computeUnitIds: vec![hex!( + "0000000000000000000000000000000000000000000000000000000000000002" + ) + .into()], }; - let resolve_result = encode_hex_0x(Array::::abi_encode( - &vec![cu_1.clone(), cu_2.clone()], - )); + let resolve_result = encode_hex_0x(Array::::abi_encode(&vec![ + worker1.clone(), + worker2.clone(), + ])); // Create a mock let mut server = mockito::Server::new_async().await; @@ -2485,13 +2494,13 @@ async fn subnet_resolve() { workers, vec![ ( - vec![encode_hex_0x(cu_1.id.0).to_string()], - parse_peer_id(&cu_1.peerId.0).unwrap().to_base58(), - vec![parse_peer_id(&cu_1.workerId.0).unwrap().to_base58()], + vec![encode_hex_0x(worker1.computeUnitIds[0].0).to_string()], + parse_peer_id(&worker1.peerId.0).unwrap().to_base58(), + vec![parse_peer_id(&worker1.offchainId.0).unwrap().to_base58()], ), ( - vec![encode_hex_0x(cu_2.id.0).to_string()], - parse_peer_id(&cu_2.peerId.0).unwrap().to_base58(), + vec![encode_hex_0x(worker2.computeUnitIds[0].0).to_string()], + parse_peer_id(&worker2.peerId.0).unwrap().to_base58(), vec![] ) ] diff --git a/crates/nox-tests/tests/chain_listener/chain_server.rs b/crates/nox-tests/tests/chain_listener/chain_server.rs index 473568568d..0c47ad4936 100644 --- a/crates/nox-tests/tests/chain_listener/chain_server.rs +++ b/crates/nox-tests/tests/chain_listener/chain_server.rs @@ -26,10 +26,10 @@ use chain_connector::Capacity::CapacityCalls; use chain_connector::Core::CoreCalls; use chain_connector::Deal::{DealCalls, Status}; use chain_connector::Offer::{ComputeUnit, OfferCalls}; -use chain_connector::{CCStatus, CommitmentId, Offer}; +use chain_connector::{CCStatus, CommitmentId, Deal}; use chain_data::{parse_peer_id, peer_id_to_bytes}; use chain_listener::{ - CommitmentActivated, ComputeUnitMatched, UnitActivated, UnitDeactivated, CIDV1, + CommitmentActivated, ComputeUnitsMatched, UnitActivated, UnitDeactivated, CIDV1, }; use clarity::Transaction; use eyre::{eyre, OptionExt}; @@ -40,6 +40,7 @@ use jsonrpsee::server::ServerHandle; use jsonrpsee::types::{ErrorObject, Params}; use jsonrpsee::{RpcModule, SubscriptionMessage}; use parking_lot::{Mutex, MutexGuard}; +use rand::Rng; use serde_json::{json, Value}; use std::collections::BTreeSet; use std::str::FromStr; @@ -189,9 +190,6 @@ impl ChainServer { let data = Array::::abi_encode(&state.units); Ok(encode_hex_0x(data)) } - OfferCalls::returnComputeUnitFromDeal(_params) => { - todo!() - } } } else if let Ok(res) = DealCalls::abi_decode(data.as_slice(), true) { if !deal_addresses.contains(&to) { @@ -216,10 +214,13 @@ impl ChainServer { DealCalls::appCID(_) => { todo!() } - DealCalls::setWorker(_) => { + DealCalls::activateWorker(_) => { + todo!() + } + DealCalls::getWorkers(_) => { todo!() } - DealCalls::getComputeUnits(_) => { + DealCalls::removeWorker(_) => { todo!() } } @@ -424,6 +425,12 @@ impl ChainServer { pub async fn create_deal(&self, params: CreateDealParams) { let mut state = self.chain_state.lock(); let block_number = state.block_number; + let on_chain_worker_id = FixedBytes::<32>::from(rand::thread_rng().gen::<[u8; 32]>()); + + state + .workers + .insert(on_chain_worker_id, vec![params.unit.id]); + state.deal_statuses.insert(params.deal_id, Status::ACTIVE); let peer_state = state.peer_states.get_mut(¶ms.peer_id).unwrap(); if let Some(state_unit) = peer_state @@ -432,6 +439,7 @@ impl ChainServer { .find(|state_unit| state_unit.id == params.unit.id) { state_unit.deal = params.deal_id; + state_unit.onchainWorkerId = on_chain_worker_id; } { @@ -467,21 +475,21 @@ impl ChainServer { } { - let event = ComputeUnitMatched { + let event = ComputeUnitsMatched { peerId: FixedBytes::new(peer_id_to_bytes(params.peer_id)), deal: params.deal_id, - unitId: params.unit.id, - dealCreationBlock: block_number, + onchainWorkerId: Default::default(), appCID: CIDV1 { prefixes: Default::default(), hash: Default::default(), }, + cuIds: vec![params.unit.id], }; let data = event.encode_data(); let data = encode_hex_0x(data); let message = SubscriptionMessage::from_json(&json!({ "topics": vec![ - ComputeUnitMatched::SIGNATURE_HASH.to_string(), + ComputeUnitsMatched::SIGNATURE_HASH.to_string(), encode_hex_0x(peer_id_to_bytes(params.peer_id)), ], "data": data, @@ -492,7 +500,7 @@ impl ChainServer { let message = LogsParams { address: self.diamond_contract_address.clone(), topics: vec![ - ComputeUnitMatched::SIGNATURE_HASH.to_string(), + ComputeUnitsMatched::SIGNATURE_HASH.to_string(), encode_hex_0x(peer_id_to_bytes(params.peer_id)), ], message, @@ -520,74 +528,94 @@ impl ChainServer { let transaction = Transaction::decode_from_rlp(&mut transaction.as_slice()) .map_err(|_| ErrorObject::owned(500, "", None::))?; + let state = ctx.chain_state.lock(); + + let deal_addresses = state + .deal_statuses + .keys() + .map(|addr| addr.to_checksum(None)) + .collect::>(); + match transaction { Transaction::Legacy { .. } => {} Transaction::Eip2930 { .. } => {} Transaction::Eip1559 { to, data, .. } => { let to = to.to_string(); - if to == contract_address { - let call = - Offer::returnComputeUnitFromDealCall::abi_decode(data.as_slice(), true) - .map_err(|_| { - ErrorObject::owned( - 500, - "expected ABI for function returnComputeUnitFromDealCall", - None::, - ) - })?; + if deal_addresses.contains(&to) { + let call = Deal::removeWorkerCall::abi_decode(data.as_slice(), true).map_err( + |_| { + ErrorObject::owned( + 500, + "expected ABI for function returnComputeUnitFromDealCall", + None::, + ) + }, + )?; - let state = ctx.chain_state.lock(); - let unit_state = + let worker_cus = state - .unit_state - .get(&call.unitId) + .workers + .get(&call.onchainId) .ok_or(ErrorObject::owned( 500, - format!("no such unitId {}", call.unitId), + &format!("no such worker {}", call.onchainId), None::, ))?; - let commitment_id = - unit_state.commitment_id.clone().ok_or(ErrorObject::owned( - 500, - &format!("compute unit {} doesn't have commitment id", call.unitId), - None::, - ))?; - - let event = UnitActivated { - commitmentId: FixedBytes::new(commitment_id.0), - unitId: call.unitId, - startEpoch: state.current_epoch + U256::from(100), - }; - - let data = event.encode_data(); - let data = encode_hex_0x(data); - - let message = SubscriptionMessage::from_json(&json!({ - "topics": vec![ - UnitActivated::SIGNATURE_HASH.to_string(), - commitment_id.to_string(), - encode_hex_0x(call.unitId), - ], - "data": data, - "blockNumber": state.block_number, - "removed": false - })) - .unwrap(); - - let message = LogsParams { - address: contract_address.clone(), - topics: vec![ + for unit_id in worker_cus { + let unit_state = + state.unit_state.get(unit_id).ok_or(ErrorObject::owned( + 500, + format!("no such unitId {}", unit_id), + None::, + ))?; + + let commitment_id = + unit_state.commitment_id.clone().ok_or(ErrorObject::owned( + 500, + &format!("compute unit {} doesn't have commitment id", unit_id), + None::, + ))?; + + let event = UnitActivated { + commitmentId: FixedBytes::new(commitment_id.0), + unitId: unit_id.clone(), + startEpoch: state.current_epoch + U256::from(100), + }; + + let data = event.encode_data(); + let data = encode_hex_0x(data); + + let message = SubscriptionMessage::from_json(&json!({ + "topics": vec![ UnitActivated::SIGNATURE_HASH.to_string(), commitment_id.to_string(), - encode_hex_0x(call.unitId), - ], - message, - }; + encode_hex_0x(unit_id), + ], + "data": data, + "blockNumber": state.block_number, + "removed": false + })) + .unwrap(); + + let message = LogsParams { + address: contract_address.clone(), + topics: vec![ + UnitActivated::SIGNATURE_HASH.to_string(), + commitment_id.to_string(), + encode_hex_0x(unit_id.0), + ], + message, + }; - ctx.logs_sender.send(message).unwrap(); + ctx.logs_sender.send(message).unwrap(); + } } else { - return Err(ErrorObject::owned(500, &format!("Only TXs with returnComputeUnitFromDealCall sent to diamond address {} are supported", contract_address), None::)); + return Err(ErrorObject::owned( + 500, + &format!("Only TXs with removeWorker sent to deal addresses are supported",), + None::, + )); } } } diff --git a/crates/nox-tests/tests/chain_listener/tests.rs b/crates/nox-tests/tests/chain_listener/tests.rs index 823d37cb57..4991d13b77 100644 --- a/crates/nox-tests/tests/chain_listener/tests.rs +++ b/crates/nox-tests/tests/chain_listener/tests.rs @@ -157,6 +157,7 @@ async fn test_cc_activation_flow() { id: compute_unit_id_1, deal: Address::ZERO, startEpoch: U256::from(2), + onchainWorkerId: FixedBytes::<32>::ZERO, }; let compute_peer_1 = ComputePeer { @@ -257,18 +258,21 @@ async fn test_deal_insufficient_funds_flow() { id: compute_unit_id_1, deal: Address::ZERO, startEpoch: U256::from(2), + onchainWorkerId: FixedBytes::<32>::ZERO, }; let compute_unit_2 = ComputeUnit { id: compute_unit_id_2, deal: Address::ZERO, startEpoch: U256::from(2), + onchainWorkerId: FixedBytes::<32>::ZERO, }; let compute_unit_3 = ComputeUnit { id: compute_unit_id_3, deal: Address::ZERO, startEpoch: U256::from(2), + onchainWorkerId: FixedBytes::<32>::ZERO, }; let compute_peer_1 = ComputePeer { diff --git a/crates/nox-tests/tests/chain_listener/types.rs b/crates/nox-tests/tests/chain_listener/types.rs index 832fa6126a..31c59f0282 100644 --- a/crates/nox-tests/tests/chain_listener/types.rs +++ b/crates/nox-tests/tests/chain_listener/types.rs @@ -22,7 +22,7 @@ use alloy_primitives::{Address, FixedBytes, U256}; use ccp_shared::types::{Difficulty, GlobalNonce, PhysicalCoreId, CUID}; use chain_connector::Deal::Status; use chain_connector::Offer::{ComputePeer, ComputeUnit}; -use chain_connector::{CCStatus, CommitmentId}; +use chain_connector::{CCStatus, CommitmentId, OnChainWorkerID}; use fluence_libp2p::PeerId; use hex::FromHex; use jsonrpsee::SubscriptionMessage; @@ -30,6 +30,7 @@ use parking_lot::Mutex; use std::collections::HashMap; use std::sync::Arc; +type ChainCUID = FixedBytes<32>; pub struct ChainState { pub(crate) block_number: U256, pub(crate) block_duration: U256, @@ -44,7 +45,8 @@ pub struct ChainState { pub(crate) commitment_activation_at: HashMap, pub(crate) peer_states: HashMap, pub(crate) deal_statuses: HashMap, - pub(crate) unit_state: HashMap, UnitState>, + pub(crate) unit_state: HashMap, + pub(crate) workers: HashMap>, } impl Default for ChainState { @@ -64,6 +66,7 @@ impl Default for ChainState { deal_statuses: Default::default(), commitment_activation_at: Default::default(), unit_state: Default::default(), + workers: Default::default(), } } } diff --git a/crates/system-services/Cargo.toml b/crates/system-services/Cargo.toml index 8ebfc62084..d86c9ec5a4 100644 --- a/crates/system-services/Cargo.toml +++ b/crates/system-services/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] aqua-ipfs-distro = "=0.6.0" -decider-distro = "=0.7.1" +decider-distro = "=0.7.2" registry-distro = "=0.9.4" trust-graph-distro = "=0.4.11"