diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index 918a9ae679..71d1491ea4 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -84,6 +84,7 @@ jobs:
- nox-snapshot
uses: fluencelabs/cli/.github/workflows/tests.yml@main
with:
+ ref: up-clients
nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}"
js-client:
diff --git a/Cargo.lock b/Cargo.lock
index 0057d613aa..e6381f95df 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1479,6 +1479,7 @@ dependencies = [
"alloy-sol-types",
"alloy_serde_macro",
"backoff",
+ "bs58",
"ccp-rpc-client",
"ccp-shared",
"chain-connector",
@@ -2207,9 +2208,9 @@ dependencies = [
[[package]]
name = "decider-distro"
-version = "0.7.1"
+version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1f87fc5b72d3931ea4017ee61d7c99a7f98ec6f337b877f4e78fe14a708cb1ec"
+checksum = "828361c3930c4a44400ac0c4003b9cc5754894b052b55a9781d3bfd4455435ef"
dependencies = [
"built 0.7.1",
"fluence-spell-dtos",
diff --git a/crates/chain-connector/src/builtins.rs b/crates/chain-connector/src/builtins.rs
index 5fc34139ff..2d79aa3e72 100644
--- a/crates/chain-connector/src/builtins.rs
+++ b/crates/chain-connector/src/builtins.rs
@@ -16,9 +16,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-use crate::types::{SubnetResolveResult, TxReceiptResult, TxStatus, Worker};
+use crate::types::{OnChainWorkerId, SubnetResolveResult, SubnetWorker, TxReceiptResult, TxStatus};
use crate::{ChainConnector, HttpChainConnector};
-use ccp_shared::types::CUID;
use futures::FutureExt;
use particle_args::{Args, JError};
use particle_builtins::{wrap, CustomService};
@@ -111,14 +110,14 @@ async fn register_worker_builtin(
let mut args = args.function_args.into_iter();
let deal_id: DealId = Args::next("deal_id", &mut args)?;
let worker_id: WorkerId = Args::next("worker_id", &mut args)?;
- let cu_ids: Vec = Args::next("cu_id", &mut args)?;
+ let onchain_worker_id: OnChainWorkerId = Args::next("onchain_worker_id", &mut args)?;
- if cu_ids.len() != 1 {
- return Err(JError::new("Only one cu_id is allowed"));
+ if onchain_worker_id.is_empty() {
+ return Err(JError::new("Invalid onchain_worker_id: empty"));
}
let tx_hash = connector
- .register_worker(&deal_id, worker_id, cu_ids[0])
+ .register_worker(&deal_id, worker_id, onchain_worker_id)
.await
.map_err(|err| JError::new(format!("Failed to register worker: {err}")))?;
Ok(json!(tx_hash))
@@ -164,7 +163,7 @@ async fn resolve_subnet_builtin(
let deal_id: String = Args::next("deal_id", &mut args.function_args.into_iter())?;
let deal_id = DealId::from(deal_id);
- let workers: eyre::Result> = try {
+ let workers: eyre::Result> = try {
if !deal_id.is_valid() {
Err(eyre::eyre!(
"Invalid deal id '{}': invalid length",
@@ -172,10 +171,10 @@ async fn resolve_subnet_builtin(
))?;
}
- let units = connector.get_deal_compute_units(&deal_id).await?;
- let workers: Result, _> = units
+ let workers = connector.get_deal_workers(&deal_id).await?;
+ let workers: Result, _> = workers
.into_iter()
- .map(|unit| Worker::try_from(unit))
+ .map(|worker| SubnetWorker::try_from(worker))
.collect();
workers?
};
diff --git a/crates/chain-connector/src/connector.rs b/crates/chain-connector/src/connector.rs
index b5499011be..1370e888ac 100644
--- a/crates/chain-connector/src/connector.rs
+++ b/crates/chain-connector/src/connector.rs
@@ -42,7 +42,7 @@ use crate::types::*;
use crate::ConnectorError::{FieldNotFound, InvalidU256, ResponseParseError};
use crate::Deal::CIDV1;
use crate::Offer::{ComputePeer, ComputeUnit};
-use crate::{CCStatus, Capacity, CommitmentId, Core, Deal, Offer};
+use crate::{CCStatus, Capacity, CommitmentId, Core, Deal, Offer, OnChainWorkerID};
use chain_data::{peer_id_to_bytes, BlockHeader};
use fluence_libp2p::PeerId;
use hex_utils::{decode_hex, encode_hex_0x};
@@ -72,7 +72,11 @@ pub trait ChainConnector: Send + Sync {
async fn get_deal_statuses(&self, deal_ids: Vec) -> Result>>;
- async fn exit_deal(&self, cu_id: &CUID) -> Result;
+ async fn exit_deal(
+ &self,
+ deal_id: &DealId,
+ on_chain_worker_id: OnChainWorkerID,
+ ) -> Result;
async fn get_tx_statuses(&self, tx_hashes: Vec) -> Result>>>;
@@ -151,7 +155,7 @@ impl HttpChainConnector {
pub(crate) async fn get_deals(&self) -> eyre::Result> {
let units = self.get_compute_units().await?;
tracing::debug!(target: "chain-connector", "get_deals: Got {} compute units", units.len());
- let mut deals: BTreeMap> = BTreeMap::new();
+ let mut deals: BTreeMap>> = BTreeMap::new();
units
.iter()
@@ -160,27 +164,47 @@ impl HttpChainConnector {
deals
.entry(unit.deal.to_string().into())
.or_default()
+ .entry(unit.onchainWorkerId.as_slice().into())
+ .or_default()
.push(CUID::new(unit.id.into()));
});
+ // For now, we forbid multiple workers for one deal on the peer!
+ deals.retain(|deal_id, worker| {
+ if worker.keys().len() > 1 {
+ tracing::error!(target: "chain-connector", "Deal {deal_id} has more then one worker for the peer which is forbiden at the moment");
+ false
+ } else {
+ true
+ }
+ });
+
if deals.is_empty() {
return Ok(Vec::new());
}
+
tracing::debug!(target: "chain-connector", "get_deals: Got {} deals: {:?}", deals.len(), deals);
let infos = self.get_deal_infos(deals.keys()).await?;
tracing::debug!(target: "chain-connector", "get_deals: Got {} deals infos: {:?}", infos.len(), infos);
let deals = infos
.into_iter()
.zip(deals)
- .map(|(details, (deal_id, unit_ids))| match details {
- Ok((status, app_cid)) => DealResult::new(
- deal_id,
- DealInfo {
- unit_ids,
- status,
- app_cid,
- },
- ),
+ .map(|(details, (deal_id, mut workers))| match details {
+ Ok((status, app_cid)) => {
+ if let Some((onchain_worker_id, cu_ids)) = workers.pop_first() {
+ DealResult::new(
+ deal_id,
+ DealInfo {
+ cu_ids,
+ status,
+ app_cid,
+ onchain_worker_id,
+ },
+ )
+ } else {
+ DealResult::with_error(deal_id, "No CUs are found for the deal".into())
+ }
+ }
Err(err) => DealResult::with_error(deal_id, err.to_string()),
})
.collect::<_>();
@@ -348,14 +372,14 @@ impl HttpChainConnector {
&self,
deal_id: &DealId,
worker_id: WorkerId,
- cu_id: CUID,
+ onchain_worker_id: OnChainWorkerId,
) -> Result {
- let data = Deal::setWorkerCall {
- computeUnitId: cu_id.as_ref().into(),
- workerId: peer_id_to_bytes(worker_id.into()).into(),
+ let data = Deal::activateWorkerCall {
+ onchainId: FixedBytes::from_slice(&onchain_worker_id),
+ offchainId: peer_id_to_bytes(worker_id.into()).into(),
}
.abi_encode();
- tracing::debug!(target: "chain-connector", "Registering worker {worker_id} for deal {deal_id} with cu_id {cu_id}");
+ tracing::debug!(target: "chain-connector", "Registering worker {worker_id} for deal {deal_id} with onchain_id {}", encode_hex_0x(onchain_worker_id));
self.send_tx(data, &deal_id.to_address()).await
}
@@ -365,8 +389,8 @@ impl HttpChainConnector {
self.make_latest_diamond_rpc_params(data)
}
- pub async fn get_deal_compute_units(&self, deal_id: &DealId) -> Result> {
- let data = Deal::getComputeUnitsCall {}.abi_encode();
+ pub async fn get_deal_workers(&self, deal_id: &DealId) -> Result> {
+ let data = Deal::getWorkersCall {}.abi_encode();
let resp: String = process_response(
self.client
.request(
@@ -376,9 +400,9 @@ impl HttpChainConnector {
.await,
)?;
let bytes = decode_hex(&resp)?;
- let compute_units = as SolType>::abi_decode(&bytes, true)?;
+ let workers = as SolType>::abi_decode(&bytes, true)?;
- Ok(compute_units)
+ Ok(workers)
}
fn init_timestamp_params(&self) -> ArrayParams {
@@ -580,14 +604,17 @@ impl ChainConnector for HttpChainConnector {
Ok(statuses)
}
- async fn exit_deal(&self, cu_id: &CUID) -> Result {
- let data = Offer::returnComputeUnitFromDealCall {
- unitId: cu_id.as_ref().into(),
+ async fn exit_deal(
+ &self,
+ deal_id: &DealId,
+ onchain_worker_id: OnChainWorkerID,
+ ) -> Result {
+ let data = Deal::removeWorkerCall {
+ onchainId: onchain_worker_id,
}
.abi_encode();
- self.send_tx(data, &self.config.diamond_contract_address)
- .await
+ self.send_tx(data, &deal_id.to_address()).await
}
async fn get_tx_statuses(&self, tx_hashes: Vec) -> Result>>> {
@@ -640,21 +667,21 @@ mod tests {
use alloy_primitives::uint;
use alloy_primitives::U256;
- use std::assert_matches::assert_matches;
- use std::str::FromStr;
- use std::sync::Arc;
-
+ use alloy_sol_types::sol_data::Array;
+ use alloy_sol_types::SolType;
use ccp_shared::types::{Difficulty, GlobalNonce, LocalNonce, ResultHash, CUID};
+ use chain_data::peer_id_from_hex;
use clarity::PrivateKey;
+ use fluence_libp2p::RandomPeerId;
use hex::FromHex;
+ use hex_utils::{decode_hex, encode_hex_0x};
+ use log_utils::{enable_logs_for, LogSpec};
use mockito::Matcher;
use serde::Deserialize;
use serde_json::json;
-
- use chain_data::peer_id_from_hex;
- use fluence_libp2p::RandomPeerId;
- use hex_utils::decode_hex;
- use log_utils::{enable_logs_for, LogSpec};
+ use std::assert_matches::assert_matches;
+ use std::str::FromStr;
+ use std::sync::Arc;
use crate::connector::TxStatus;
use crate::Deal::Status::ACTIVE;
@@ -686,7 +713,30 @@ mod tests {
#[tokio::test]
async fn test_get_compute_units() {
- let expected_data = "0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000025d204dcc21f59c2a2098a277e48879207f614583e066654ad6736d36815ebb9e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000450e2f2a5bdb528895e9005f67e70fe213b9b822122e96fd85d2238cae55b6f900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000";
+ let cu_1 = crate::Offer::ComputeUnit {
+ id: alloy_primitives::hex!(
+ "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5"
+ )
+ .into(),
+ deal: Default::default(),
+ startEpoch: Default::default(),
+ onchainWorkerId: Default::default(),
+ };
+
+ let cu_2 = crate::Offer::ComputeUnit {
+ id: alloy_primitives::hex!(
+ "ba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d1"
+ )
+ .into(),
+ deal: Default::default(),
+ startEpoch: Default::default(),
+ onchainWorkerId: Default::default(),
+ };
+
+ let expected_data = encode_hex_0x(Array::::abi_encode(&vec![
+ cu_1.clone(),
+ cu_2.clone(),
+ ]));
let expected_response =
format!("{{\"jsonrpc\":\"2.0\",\"result\":\"{expected_data}\",\"id\":0}}");
@@ -797,7 +847,6 @@ mod tests {
#[tokio::test]
async fn test_batch_init_request() {
- use hex::FromHex;
let expected_response = r#"[
{
"jsonrpc": "2.0",
@@ -1084,13 +1133,32 @@ mod tests {
#[tokio::test]
async fn test_get_deals() {
enable_logs_for(LogSpec::new(vec!["chain-connector=debug".parse().unwrap()]));
- let expected_deal_id_1 = "5e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c";
- let expected_cuid_1 = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5";
- let expected_deal_id_2 = "0x6e3d0fde6f793b3115a9e7f5ebc195bbeed35d6d";
- let expected_cuid_2 = "ba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d1";
+ let cu_1 = crate::Offer::ComputeUnit {
+ id: alloy_primitives::hex!(
+ "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5"
+ )
+ .into(),
+ deal: alloy_primitives::hex!("5e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c").into(),
+ startEpoch: Default::default(),
+ onchainWorkerId: Default::default(),
+ };
+
+ let cu_2 = crate::Offer::ComputeUnit {
+ id: alloy_primitives::hex!(
+ "ba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d1"
+ )
+ .into(),
+ deal: alloy_primitives::hex!("6e3d0fde6f793b3115a9e7f5ebc195bbeed35d6d").into(),
+ startEpoch: Default::default(),
+ onchainWorkerId: Default::default(),
+ };
- let compute_units_response = "00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000002aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d50000000000000000000000005e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c00000000000000000000000000000000000000000000000000000000000fffbcba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d10000000000000000000000006e3d0fde6f793b3115a9e7f5ebc195bbeed35d6d00000000000000000000000000000000000000000000000000000000000fffba";
+ let compute_units_response =
+ encode_hex_0x(Array::::abi_encode(&vec![
+ cu_1.clone(),
+ cu_2.clone(),
+ ]));
let compute_units_response = json!({
"jsonrpc": "2.0",
"id": 0,
@@ -1152,30 +1220,30 @@ mod tests {
assert_eq!(deals.len(), 2, "there should be only two deals: {deals:?}");
assert!(deals[0].success, "failed to get a deal: {deals:?}");
- assert_eq!(deals[0].deal_id, expected_deal_id_1);
+ assert_eq!(deals[0].deal_id, cu_1.deal.to_string());
let deal_info = &deals[0].deal_info[0];
assert_eq!(deal_info.status, ACTIVE);
assert_eq!(
- deal_info.unit_ids.len(),
+ deal_info.cu_ids.len(),
1,
"there should be only one unit id: {deals:?}"
);
- assert_eq!(deal_info.unit_ids[0].to_string(), expected_cuid_1);
+ assert_eq!(deal_info.cu_ids[0], CUID::new(cu_1.id.0));
assert_eq!(deal_info.app_cid, expected_app_cid);
// Second deal
assert!(deals[1].success, "failed to get a deal: {deals:?}");
- assert_eq!(deals[1].deal_id, expected_deal_id_2);
+ assert_eq!(deals[1].deal_id, cu_2.deal.to_string());
let deal_info = &deals[1].deal_info[0];
assert_eq!(deal_info.status, ACTIVE);
assert_eq!(
- deal_info.unit_ids.len(),
+ deal_info.cu_ids.len(),
1,
"there should be only one unit id: {deals:?}"
);
- assert_eq!(deal_info.unit_ids[0].to_string(), expected_cuid_2);
+ assert_eq!(deal_info.cu_ids[0], CUID::new(cu_2.id.0));
assert_eq!(deal_info.app_cid, expected_app_cid);
mock.assert();
@@ -1183,7 +1251,31 @@ mod tests {
#[tokio::test]
async fn test_get_deals_no_deals() {
- let compute_units_response = "0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000002097ab88db9d8e48a1d55cd0205bd74721eb2ba70897d255f350062dec8fec7bb4000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002988e97756c550265b7a570b41bb4d8dbf056a1ca0435265ad175c5ecced2ef600000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000d4af56c3a07423c4b976f6d1a43a96f4843bdc5479b01bf3bd73f69c1062738e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e9ab8214c37d615708ef89559407e1a015a0f7fbdaf3cd23a1f4c5ed2ae1c8cd00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a717255e798920c9a9bbaae45492ff9020cdc8db3d8a44099015fbe950927368000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000df891a54a5479aa7ee3c019615ac55a18534aa81dcf3b2fbf7577d54cadf0c300000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000fbbd2b72d216685d97c09c56ca6ba16e7d2e35ff72bb69c72584ad8d9365610200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000ccff78d7a0c365cd3ba8842707a2f96f26ac7ea4dbd7aac04ae2f0958ef2252400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000db5fba69f5d2b2d04a5bf63b7abe1a2781470e77101e3afe6ec66794aec80c0a000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005b96d1d71cf1b77421645ae4031558195d9df60abd73ea716c3df67d3e7832da00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e5cf187d01e552b0bd38ea89b9013e7b98915891cafb44fc7cf6937223290bcf00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000fe00f5915737e75aa5c7799dcf326020ab499a65c9d10e1e4951be8371aee6f2000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c073f4fe89ebe1a354f052de5e371dc39e8d2b8f9ccbf0fc9e26e107217fe40000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008510f943f7080e58846bc5044afa0c91480dbeca0ff5dcbef7c522a43531cffc000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000007d3ea3803085980a5b4e3bd93c32d8dae3b8060db9aab800e0922b7b18d865fc000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002be7bd75150701c7ef8521d322fbcbb228cf907224d80e433550a3034bbcbb8b000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e1469c97990a40da4465cb4e749faa2905adfc4b1109f7dc8b66ef8b1ed0a0c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008d20e7f50737ea4236c6d92b150e569aa2fc482699cc0165722210b4bcffdc4c00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000eae742035300aed3774579cab0c75416002438809f3cb3f62fb3c34dd0ab16790000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000029e9cc0f707e8e02578285b8e1d50c207fab5ec11b0b1e6f97834e1154abbe4b000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005be58fc7ae8eb4e577b1be2a911a11fd17b90c8b6754aa71859ad6285bcc3b00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008f7ba4e1f4bccd2aaf1f8791df5b73a6727b0720aba8fcf0afb6e1f07303c1cc0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000053f668a26f5978f4f252145d68d2f0a627116c197a342a2e42aa269f46dcccf000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000db7ddcd38d772f1b043572bab7d890ea65723cf4805ee963f46b7a9f81a454f20000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000018fa36dd54f48c4c4ae40ceefca274460d4c83026f48fdcc4e201a1a0423561400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000a0b6e3c818a1ecb540a289c81d0835dc41755f091ba70f02e5134203c195b80900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e77ef1585872eabfa78c01915cf0d43c3bd3bd63ae40565c62254a95bba901530000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000059810dcb510b17ca28693caa6d6164b6db28925290f345d8ef0ac8ef5751014f0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000d6b992345e2ed6287ff2df0943d10f972ae7f63789d11df22f3fd9a4199f5c000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005e58c2b5ba2b1a7d861de440a962fe0ffdcef1e240e3e50690f6f1f527bf7d66000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000ca0be7865c187df9331bce42cd6ea3498d858a3d8b37fec18e9654ef320daa7000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003938f3a12b19ed989431771df29f64ef011eec4da79e2ec30d86f28dae35f10a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000";
+ let cu_1 = crate::Offer::ComputeUnit {
+ id: alloy_primitives::hex!(
+ "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5"
+ )
+ .into(),
+ deal: Default::default(),
+ startEpoch: Default::default(),
+ onchainWorkerId: Default::default(),
+ };
+
+ let cu_2 = crate::Offer::ComputeUnit {
+ id: alloy_primitives::hex!(
+ "ba3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d1"
+ )
+ .into(),
+ deal: Default::default(),
+ startEpoch: Default::default(),
+ onchainWorkerId: Default::default(),
+ };
+
+ let compute_units_response =
+ encode_hex_0x(Array::::abi_encode(&vec![
+ cu_1.clone(),
+ cu_2.clone(),
+ ]));
let compute_units_response =
format!("{{\"jsonrpc\":\"2.0\",\"result\":\"{compute_units_response}\",\"id\":0}}");
let mut server = mockito::Server::new_async().await;
@@ -1210,7 +1302,7 @@ mod tests {
}
#[tokio::test]
- async fn test_register_worker() {
+ async fn test_activate_worker() {
let get_block_by_number_response = r#"{"jsonrpc":"2.0","id":0,"result":{"hash":"0xcbe8d90665392babc8098738ec78009193c99d3cc872a6657e306cfe8824bef9","parentHash":"0x15e767118a3e2d7545fee290b545faccd4a9eff849ac1057ce82cab7100c0c52","sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","miner":"0x0000000000000000000000000000000000000000","stateRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","difficulty":"0x0","number":"0xa2","gasLimit":"0x1c9c380","gasUsed":"0x0","timestamp":"0x65d88f76","extraData":"0x","mixHash":"0x0000000000000000000000000000000000000000000000000000000000000000","nonce":"0x0000000000000000","baseFeePerGas":"0x7","totalDifficulty":"0x0","uncles":[],"transactions":[],"size":"0x220"}}"#;
let estimate_gas_response = r#"{"jsonrpc":"2.0","id": 1,"result": "0x5208"}"#;
let max_priority_fee_response = r#"{"jsonrpc":"2.0","id": 2,"result": "0x5208"}"#;
@@ -1227,9 +1319,8 @@ mod tests {
let deal_id = "5e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c";
let deal_id = types::DealId::from(deal_id);
let worker_id = types::peer_scope::WorkerId::from(RandomPeerId::random());
- let cuid =
- CUID::from_hex("aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5")
- .unwrap();
+ let onchain_worker_id =
+ decode_hex("aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5").unwrap();
let mut server = mockito::Server::new_async().await;
let url = server.url();
@@ -1256,7 +1347,7 @@ mod tests {
.create();
let result = get_connector(&url)
- .register_worker(&deal_id, worker_id, cuid)
+ .register_worker(&deal_id, worker_id, onchain_worker_id)
.await
.unwrap();
assert_eq!(result, expected_tx_hash);
diff --git a/crates/chain-connector/src/function/deal.rs b/crates/chain-connector/src/function/deal.rs
index 792f7cb283..0227e3725b 100644
--- a/crates/chain-connector/src/function/deal.rs
+++ b/crates/chain-connector/src/function/deal.rs
@@ -16,7 +16,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-
+use alloy_primitives::FixedBytes;
use alloy_sol_types::{sol, SolType};
use hex_utils::decode_hex;
use serde::{Deserialize, Serialize};
@@ -44,12 +44,13 @@ sol! {
SMALL_BALANCE
}
- struct ComputeUnit {
- bytes32 id;
- bytes32 workerId;
+ struct Worker {
+ bytes32 offchainId;
+ bytes32 onchainId;
bytes32 peerId;
address provider;
uint256 joinedEpoch;
+ bytes32[] computeUnitIds;
}
/// @dev Returns the status of the deal
@@ -58,14 +59,18 @@ sol! {
/// @dev Returns the app CID
function appCID() external view returns (CIDV1 memory);
- /// @dev Set worker ID for a compute unit. Compute unit can have only one worker ID
- function setWorker(bytes32 computeUnitId, bytes32 workerId) external;
+ /// @dev Set offchain worker ID for a corresponding onchain worker for a deal
+ function activateWorker(bytes32 onchainId, bytes32 offchainId);
- /// @dev Returns the compute units info by provider
- function getComputeUnits() public view returns (ComputeUnit[] memory);
+ /// @dev Removes worker from the deal
+ function removeWorker(bytes32 onchainId) external;
+ /// @dev Returns workers
+ function getWorkers() external view returns (Worker[] memory);
}
}
+pub type OnChainWorkerID = FixedBytes<32>;
+
impl CIDV1 {
pub fn from_hex(hex: &str) -> Result {
let bytes = decode_hex(hex)?;
diff --git a/crates/chain-connector/src/function/offer.rs b/crates/chain-connector/src/function/offer.rs
index 6b43b64c3c..605ccadf22 100644
--- a/crates/chain-connector/src/function/offer.rs
+++ b/crates/chain-connector/src/function/offer.rs
@@ -36,15 +36,13 @@ sol! {
bytes32 id;
address deal;
uint256 startEpoch;
+ bytes32 onchainWorkerId;
}
/// @dev Returns the compute peer info
function getComputePeer(bytes32 peerId) external view returns (ComputePeer memory);
/// @dev Returns the compute units info of a peer
function getComputeUnits(bytes32 peerId) external view returns (ComputeUnit[] memory);
-
- /// @dev Return the compute unit from a deal
- function returnComputeUnitFromDeal(bytes32 unitId) external;
}
}
@@ -75,12 +73,12 @@ impl From for PendingUnit {
mod tests {
use crate::Offer::ComputePeer;
use alloy_primitives::{hex, U256};
- use alloy_sol_types::SolType;
+ use alloy_sol_types::SolValue;
use hex_utils::decode_hex;
- #[tokio::test]
- async fn decode_compute_unit() {
- let data = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d50000000000000000000000005e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c00000000000000000000000000000000000000000000000000000000000003e8";
+ #[test]
+ fn decode_compute_unit() {
+ let data = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d50000000000000000000000005e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c00000000000000000000000000000000000000000000000000000000000003e8bb3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633dd";
let compute_unit = super::ComputeUnit::abi_decode(&decode_hex(data).unwrap(), true);
assert!(compute_unit.is_ok());
let compute_unit = compute_unit.unwrap();
@@ -95,11 +93,15 @@ mod tests {
"0x5e3d0fde6f793b3115a9e7f5ebc195bbeed35d6c"
);
assert_eq!(compute_unit.startEpoch, U256::from(1000));
+ assert_eq!(
+ compute_unit.onchainWorkerId,
+ hex!("bb3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633dd")
+ )
}
- #[tokio::test]
- async fn decode_compute_unit_no_deal() {
- let data = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003e8";
+ #[test]
+ fn decode_compute_unit_no_deal_no_worker() {
+ let data = "aa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003e80000000000000000000000000000000000000000000000000000000000000000";
let compute_unit = super::ComputeUnit::abi_decode(&decode_hex(data).unwrap(), true);
assert!(compute_unit.is_ok());
let compute_unit = compute_unit.unwrap();
@@ -109,10 +111,11 @@ mod tests {
);
assert!(compute_unit.deal.is_zero());
assert_eq!(compute_unit.startEpoch, U256::from(1000));
+ assert!(compute_unit.onchainWorkerId.is_zero())
}
- #[tokio::test]
- async fn decode_compute_peer_no_commitment() {
+ #[test]
+ fn decode_compute_peer_no_commitment() {
let data = "0xaa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000020000000000000000000000005b73c5498c1e3b4dba84de0f1833c4a029d90519";
let compute_peer = ComputePeer::abi_decode(&decode_hex(data).unwrap(), true);
assert!(compute_peer.is_ok());
@@ -129,8 +132,8 @@ mod tests {
);
}
- #[tokio::test]
- async fn decode_compute_peer() {
+ #[test]
+ fn decode_compute_peer() {
let data = "0xaa3046a12a1aac6e840625e6329d70b427328fec36dc8d273e5e6454b85633d5aa3046a12a1aac6e840625e6329d70b427328feceedc8d273e5e6454b85633b5000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000005b73c5498c1e3b4dba84de0f1833c4a029d90519";
let compute_peer = ComputePeer::abi_decode(&decode_hex(data).unwrap(), true);
assert!(compute_peer.is_ok());
diff --git a/crates/chain-connector/src/types.rs b/crates/chain-connector/src/types.rs
index fd973adf96..48e6af46de 100644
--- a/crates/chain-connector/src/types.rs
+++ b/crates/chain-connector/src/types.rs
@@ -18,7 +18,6 @@
*/
use crate::error::ConnectorError;
use crate::function::Deal;
-use crate::Deal::ComputeUnit;
use alloy_primitives::U256;
use ccp_shared::types::{Difficulty, GlobalNonce, CUID};
use chain_data::parse_peer_id;
@@ -56,11 +55,14 @@ impl DealResult {
}
}
+pub type OnChainWorkerId = Vec;
+
#[derive(Debug, Serialize, Deserialize)]
pub struct DealInfo {
pub status: Deal::Status,
- pub unit_ids: Vec,
+ pub cu_ids: Vec,
pub app_cid: String,
+ pub onchain_worker_id: OnChainWorkerId,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -142,28 +144,32 @@ impl RawTxReceipt {
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
-pub struct Worker {
+pub struct SubnetWorker {
pub cu_ids: Vec,
pub host_id: String,
pub worker_id: Vec,
}
-impl TryFrom for Worker {
+impl TryFrom for SubnetWorker {
type Error = Report;
- fn try_from(unit: ComputeUnit) -> eyre::Result {
+ fn try_from(deal_worker: Deal::Worker) -> eyre::Result {
let mut worker_id = vec![];
- if !unit.workerId.is_zero() {
- let w_id = parse_peer_id(&unit.workerId.0)
- .map_err(|err| eyre!("Failed to parse unit.workerId: {err}"))?
+ if !deal_worker.offchainId.is_zero() {
+ let w_id = parse_peer_id(&deal_worker.offchainId.0)
+ .map_err(|err| eyre!("Failed to parse worker.offchainId: {err}"))?
.to_base58();
worker_id.push(w_id)
}
- let cu_id = unit.id.to_string();
- let peer_id = parse_peer_id(&unit.peerId.0)
- .map_err(|err| eyre!("Failed to parse unit.peerId: {err}"))?;
+ let cu_ids = deal_worker
+ .computeUnitIds
+ .into_iter()
+ .map(|id| id.to_string())
+ .collect();
+ let peer_id = parse_peer_id(&deal_worker.peerId.0)
+ .map_err(|err| eyre!("Failed to parse worker.peerId: {err}"))?;
Ok(Self {
- cu_ids: vec![cu_id],
+ cu_ids,
host_id: peer_id.to_base58(),
worker_id,
})
@@ -173,7 +179,7 @@ impl TryFrom for Worker {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct SubnetResolveResult {
pub success: bool,
- pub workers: Vec,
+ pub workers: Vec,
pub error: Vec,
}
diff --git a/crates/chain-listener/Cargo.toml b/crates/chain-listener/Cargo.toml
index 54a56f547f..c312248e02 100644
--- a/crates/chain-listener/Cargo.toml
+++ b/crates/chain-listener/Cargo.toml
@@ -44,4 +44,4 @@ serde_with = { workspace = true }
[dev-dependencies]
jsonrpsee = { workspace = true, features = ["server"] }
tempfile = { workspace = true }
-
+bs58 = { workspace = true }
diff --git a/crates/chain-listener/src/event/compute_unit_matched.rs b/crates/chain-listener/src/event/compute_unit_matched.rs
index 14202abd53..937a91f8d4 100644
--- a/crates/chain-listener/src/event/compute_unit_matched.rs
+++ b/crates/chain-listener/src/event/compute_unit_matched.rs
@@ -25,28 +25,31 @@ sol! {
bytes32 hash;
}
- event ComputeUnitMatched(
+ event ComputeUnitsMatched(
bytes32 indexed peerId,
address deal,
- bytes32 unitId,
- uint256 dealCreationBlock,
+ bytes32 onchainWorkerId,
+ bytes32[] cuIds,
CIDV1 appCID
);
}
#[cfg(test)]
mod tests {
- use crate::event::compute_unit_matched::ComputeUnitMatched;
- use alloy_primitives::Uint;
+ use crate::event::compute_unit_matched::{ComputeUnitsMatched, CIDV1};
+ use alloy_primitives::Address;
use alloy_sol_types::SolEvent;
- use chain_data::{parse_log, parse_peer_id, Log};
- use hex_utils::decode_hex;
+ use chain_data::{parse_log, parse_peer_id, peer_id_to_bytes, Log};
+ use fluence_libp2p::RandomPeerId;
+ use hex_utils::{decode_hex, encode_hex_0x};
+ use libipld::Cid;
+ use std::str::FromStr;
#[test]
fn topic() {
assert_eq!(
- ComputeUnitMatched::SIGNATURE_HASH.to_string(),
- String::from("0xb1c5a9179c3104a43de668491f14c45778f00ec34d5deee023af204820483bdb")
+ ComputeUnitsMatched::SIGNATURE_HASH.to_string(),
+ String::from("0x6e5629a2cfaa82d1ea7ad51936794f666271fbd5068017020eaaafdbb017e615")
);
}
@@ -73,72 +76,76 @@ mod tests {
#[test]
fn parse() {
- let data1 = "000000000000000000000000ffa0611a099ab68ad7c3c67b4ca5bbbee7a58b9900000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000500155122000000000000000000000000000000000000000000000000000000000a146af49df31c99c79a30ec4ae2abb2445d8c5d202ea58fa9ea9cbff45d4152e".to_string();
- let data2 = "00000000000000000000000067b2ad3866429282e16e55b715d12a77f85b7ce800000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000560155122000000000000000000000000000000000000000000000000000000000a146af49df31c99c79a30ec4ae2abb2445d8c5d202ea58fa9ea9cbff45d4152e".to_string();
+ let peer_id1 = RandomPeerId::random();
+ let cu_id1 = [1u8; 32];
+ let deal1 = "0xFfA0611a099AB68AD7C3C67B4cA5bbBEE7a58B99";
+ let cid = "bafkreifbi2xutxzrzgohtiyoysxcvozeixmmluqc5jmpvhvjzp7ulvavfy";
+ let cid_bytes = Cid::from_str(cid).unwrap().to_bytes();
+ let encoded_cid = CIDV1 {
+ prefixes: cid_bytes[0..4].as_chunks::<4>().0[0].into(),
+ hash: cid_bytes[4..36].as_chunks::<32>().0[0].into(),
+ };
+ let event1 = ComputeUnitsMatched {
+ peerId: peer_id_to_bytes(peer_id1).into(),
+ deal: Address::from_slice(&decode_hex(deal1).unwrap()),
+ onchainWorkerId: Default::default(),
+ cuIds: vec![cu_id1.into()],
+ appCID: encoded_cid.clone(),
+ };
+
+ let peer_id2 = RandomPeerId::random();
+ let cu_id2 = [2u8; 32];
+ let cu_id3 = [3u8; 32];
+ let deal2 = "0x67b2AD3866429282e16e55B715d12A77F85B7CE8";
+ let event2 = ComputeUnitsMatched {
+ peerId: peer_id_to_bytes(peer_id2).into(),
+ deal: Address::from_slice(&decode_hex(deal2).unwrap()),
+ onchainWorkerId: Default::default(),
+ cuIds: vec![cu_id2.into(), cu_id3.into()],
+ appCID: encoded_cid,
+ };
let log1 = Log {
- data: data1,
+ data: encode_hex_0x(&event1.encode_data()),
block_number: "0x0".to_string(),
removed: false,
topics: vec![
- ComputeUnitMatched::SIGNATURE_HASH.to_string(),
- "0x7a82a5feefcaad4a89c689412031e5f87c02b29e3fced583be5f05c7077354b7".to_string(),
+ ComputeUnitsMatched::SIGNATURE_HASH.to_string(),
+ encode_hex_0x(&peer_id_to_bytes(peer_id1)),
],
};
let log2 = Log {
- data: data2,
+ data: encode_hex_0x(&event2.encode_data()),
block_number: "0x1".to_string(),
removed: false,
topics: vec![
- ComputeUnitMatched::SIGNATURE_HASH.to_string(),
- "0x7a82a5feefcaad4a89c689412031e5f87c02b29e3fced583be5f05c7077354b7".to_string(),
+ ComputeUnitsMatched::SIGNATURE_HASH.to_string(),
+ encode_hex_0x(&peer_id_to_bytes(peer_id2)),
],
};
- let m = parse_log::(log1).expect("error parsing Match from log");
- assert_eq!(
- parse_peer_id(m.peerId.as_slice()).unwrap().to_string(),
- "12D3KooWJ4bTHirdTFNZpCS72TAzwtdmavTBkkEXtzo6wHL25CtE"
- );
- assert_eq!(
- m.deal.to_string(),
- "0xFfA0611a099AB68AD7C3C67B4cA5bbBEE7a58B99"
- );
- assert_eq!(
- m.unitId.to_string(),
- "0x00000000000000000000000000000000000000000000000000000000000000a0"
- );
- assert_eq!(m.dealCreationBlock, Uint::from(80));
+ let m = parse_log::(log1).expect("error parsing Match from log");
+ assert_eq!(parse_peer_id(m.peerId.as_slice()).unwrap(), peer_id1);
+ assert_eq!(m.deal.to_string(), deal1);
+ assert_eq!(m.cuIds.len(), 1);
+ assert_eq!(m.cuIds[0], cu_id1);
let cid_bytes = [m.appCID.prefixes.to_vec(), m.appCID.hash.to_vec()].concat();
let app_cid = libipld::Cid::read_bytes(cid_bytes.as_slice())
.unwrap()
.to_string();
- assert_eq!(
- app_cid,
- "bafkreifbi2xutxzrzgohtiyoysxcvozeixmmluqc5jmpvhvjzp7ulvavfy"
- );
+ assert_eq!(app_cid, cid);
+
+ let m = parse_log::(log2).expect("error parsing Match from log");
+ assert_eq!(parse_peer_id(m.peerId.as_slice()).unwrap(), peer_id2);
+ assert_eq!(m.deal.to_string(), deal2);
+ assert_eq!(m.cuIds.len(), 2);
+ assert_eq!(m.cuIds[0], cu_id2);
+ assert_eq!(m.cuIds[1], cu_id3);
- let m = parse_log::(log2).expect("error parsing Match from log");
- assert_eq!(
- parse_peer_id(m.peerId.as_slice()).unwrap().to_string(),
- "12D3KooWJ4bTHirdTFNZpCS72TAzwtdmavTBkkEXtzo6wHL25CtE"
- );
- assert_eq!(
- m.deal.to_string(),
- "0x67b2AD3866429282e16e55B715d12A77F85B7CE8"
- );
- assert_eq!(
- m.unitId.to_string(),
- "0x00000000000000000000000000000000000000000000000000000000000000a0"
- );
- assert_eq!(m.dealCreationBlock, Uint::from(86));
let cid_bytes = [m.appCID.prefixes.to_vec(), m.appCID.hash.to_vec()].concat();
let app_cid = libipld::Cid::read_bytes(cid_bytes.as_slice())
.unwrap()
.to_string();
- assert_eq!(
- app_cid,
- "bafkreifbi2xutxzrzgohtiyoysxcvozeixmmluqc5jmpvhvjzp7ulvavfy"
- );
+ assert_eq!(app_cid, cid);
}
}
diff --git a/crates/chain-listener/src/event/mod.rs b/crates/chain-listener/src/event/mod.rs
index 67fe5926e8..c3a0ecafa6 100644
--- a/crates/chain-listener/src/event/mod.rs
+++ b/crates/chain-listener/src/event/mod.rs
@@ -23,6 +23,6 @@ mod unit_activated;
mod unit_deactivated;
pub use cc_activated::CommitmentActivated;
-pub use compute_unit_matched::{ComputeUnitMatched, CIDV1};
+pub use compute_unit_matched::{ComputeUnitsMatched, CIDV1};
pub use unit_activated::UnitActivated;
pub use unit_deactivated::UnitDeactivated;
diff --git a/crates/chain-listener/src/lib.rs b/crates/chain-listener/src/lib.rs
index 4a3e2a161b..259a4d3d45 100644
--- a/crates/chain-listener/src/lib.rs
+++ b/crates/chain-listener/src/lib.rs
@@ -22,13 +22,10 @@
#![feature(extract_if)]
#![feature(btree_extract_if)]
#![feature(result_option_inspect)]
+#![feature(slice_as_chunks)]
extern crate core;
-pub use event::CommitmentActivated;
-pub use event::ComputeUnitMatched;
-pub use event::UnitActivated;
-pub use event::UnitDeactivated;
-pub use event::CIDV1;
+pub use event::{CommitmentActivated, ComputeUnitsMatched, UnitActivated, UnitDeactivated, CIDV1};
pub use listener::ChainListener;
mod event;
diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs
index 545511fb0b..24ca716316 100644
--- a/crates/chain-listener/src/listener.rs
+++ b/crates/chain-listener/src/listener.rs
@@ -22,6 +22,7 @@ use alloy_sol_types::SolEvent;
use backoff::Error::Permanent;
use std::cmp::min;
use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::fmt::{Debug, Formatter};
use std::future::{pending, Future};
use std::path::PathBuf;
use std::process::exit;
@@ -54,7 +55,7 @@ use tracing::Instrument;
use chain_connector::Offer::ComputeUnit;
use chain_connector::{
is_commitment_not_active, is_too_many_proofs, CCStatus, ChainConnector, CommitmentId,
- ConnectorError, Deal, PEER_NOT_EXISTS,
+ ConnectorError, Deal, OnChainWorkerID, PEER_NOT_EXISTS,
};
use chain_data::{parse_log, peer_id_to_hex, BlockHeader, Log};
use core_distributor::errors::AcquireError;
@@ -65,12 +66,32 @@ use server_config::{ChainConfig, ChainListenerConfig};
use types::DealId;
use crate::event::CommitmentActivated;
-use crate::event::{ComputeUnitMatched, UnitActivated, UnitDeactivated};
+use crate::event::{ComputeUnitsMatched, UnitActivated, UnitDeactivated};
use crate::proof_tracker::ProofTracker;
use crate::types::{CUGroups, PhysicalCoreGroups};
const PROOF_POLL_LIMIT: usize = 50;
+#[derive(Clone)]
+struct OnChainWorker {
+ id: OnChainWorkerID,
+ cu_ids: Vec,
+}
+
+impl Debug for OnChainWorker {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "(chain_worker_id = {}, cu_ids = {:?})",
+ self.id,
+ self.cu_ids
+ .iter()
+ .map(|cu| cu.to_string())
+ .collect::>()
+ )
+ }
+}
+
pub struct ChainListener {
config: ChainConfig,
listener_config: ChainListenerConfig,
@@ -103,7 +124,7 @@ pub struct ChainListener {
// the compute units that are in the commitment and not in deals
cc_compute_units: BTreeMap,
// the compute units that are in deals and not in commitment
- active_deals: BTreeMap,
+ active_deals: BTreeMap,
proof_tracker: ProofTracker,
pending_proof_txs: Vec<(String, Vec)>,
@@ -115,7 +136,7 @@ pub struct ChainListener {
// Subscriptions that are polled always
heads: Option>,
commitment_activated: Option>,
- unit_matched: Option>,
+ units_matched: Option>,
metrics: Option,
}
@@ -170,7 +191,7 @@ impl ChainListener {
unit_deactivated: None,
heads: None,
commitment_activated: None,
- unit_matched: None,
+ units_matched: None,
active_deals: BTreeMap::new(),
metrics,
proof_tracker: ProofTracker::new(persisted_proof_id_dir),
@@ -250,9 +271,9 @@ impl ChainListener {
}
}
},
- event = poll_subscription(&mut self.unit_matched) => {
- if let Err(err) = self.process_unit_matched(event) {
- self.handle_subscription_error("ComputeUnitMatched", err).await;
+ event = poll_subscription(&mut self.units_matched) => {
+ if let Err(err) = self.process_compute_units_matched(event) {
+ self.handle_subscription_error("ComputeUnitsMatched", err).await;
}
},
_ = timer.next() => {
@@ -491,7 +512,8 @@ impl ChainListener {
self.heads = Some(self.subscribe("newHeads", rpc_params!["newHeads"]).await?);
self.commitment_activated =
Some(self.subscribe("logs", self.cc_activated_params()).await?);
- self.unit_matched = Some(self.subscribe("logs", self.unit_matched_params()).await?);
+ self.units_matched =
+ Some(self.subscribe("logs", self.unit_matched_params()).await?);
if let Some(commitment_id) = self.current_commitment.clone() {
self.subscribe_unit_events(&commitment_id).await?;
}
@@ -530,6 +552,17 @@ impl ChainListener {
.map(|unit| (CUID::new(unit.id.0), unit))
.collect();
+ self.active_deals.clear();
+ for cu in &in_deal {
+ let cu_id = CUID::new(cu.id.0);
+ let deal_id = cu.deal.to_string().into();
+ let onchain_worker = self.active_deals.entry(deal_id).or_insert(OnChainWorker {
+ id: cu.onchainWorkerId,
+ cu_ids: vec![],
+ });
+ onchain_worker.cu_ids.push(cu_id);
+ }
+
let active = self
.cc_compute_units
.values()
@@ -540,13 +573,6 @@ impl ChainListener {
.values()
.filter(|unit| unit.startEpoch > self.current_epoch);
- for cu in &in_deal {
- let cu_id = CUID::new(cu.id.0);
- // TODO: in the future it should be BTreeMap>, because deal will be able
- // to use multiple CUs from one peer
- self.active_deals.insert(cu.deal.to_string().into(), cu_id);
- }
-
tracing::info!(target: "chain-listener",
"Compute units mapping: in cc {}/[{} pending], in deal {}",
self.cc_compute_units.len(),
@@ -567,14 +593,13 @@ impl ChainListener {
tracing::info!(target: "chain-listener",
"In deal compute units: {:?}",
self.active_deals.values()
- .map(CUID::to_string)
.collect::>()
);
// NOTE: cores are released after all the logs to simplify debug on failure
- for cu_id in self.active_deals.values() {
- self.core_distributor.release_worker_cores(&[*cu_id]);
- self.acquire_core_for_deal(*cu_id)?;
+ for worker in self.active_deals.values() {
+ self.core_distributor.release_worker_cores(&worker.cu_ids);
+ self.acquire_core_for_deal(worker.cu_ids.clone())?;
}
Ok(())
@@ -629,7 +654,7 @@ impl ChainListener {
fn unit_matched_params(&self) -> ArrayParams {
let topics = vec![
- ComputeUnitMatched::SIGNATURE_HASH.to_string(),
+ ComputeUnitsMatched::SIGNATURE_HASH.to_string(),
peer_id_to_hex(self.host_id),
];
rpc_params![
@@ -729,6 +754,7 @@ impl ChainListener {
id,
deal: Address::ZERO,
startEpoch: cc_event.startEpoch,
+ onchainWorkerId: FixedBytes::<32>::ZERO,
},
)
})
@@ -763,6 +789,7 @@ impl ChainListener {
id: unit_event.unitId,
deal: Address::ZERO,
startEpoch: unit_event.startEpoch,
+ onchainWorkerId: FixedBytes::<32>::ZERO,
},
);
@@ -788,11 +815,11 @@ impl ChainListener {
);
self.cc_compute_units.remove(&unit_id);
self.refresh_commitment().await?;
- self.acquire_core_for_deal(unit_id)?;
+ self.acquire_core_for_deal(vec![unit_id])?;
Ok(())
}
- fn process_unit_matched(
+ fn process_compute_units_matched(
&mut self,
event: Option>,
) -> eyre::Result<()> {
@@ -801,15 +828,24 @@ impl ChainListener {
tracing::error!(target: "chain-listener", "Failed to parse DealMatched event: {err}, data: {event}");
err
})?;
- let deal_event = parse_log::(log)?;
+ let deal_event = parse_log::(log)?;
tracing::info!(target: "chain-listener",
"Received DealMatched event for deal: {}",
deal_event.deal
);
+ let cu_ids = deal_event
+ .cuIds
+ .into_iter()
+ .map(|cu| CUID::new(cu.0))
+ .collect();
+
self.active_deals.insert(
deal_event.deal.to_string().into(),
- CUID::new(deal_event.unitId.0),
+ OnChainWorker {
+ id: deal_event.onchainWorkerId,
+ cu_ids,
+ },
);
Ok(())
}
@@ -1018,9 +1054,9 @@ impl ChainListener {
}
}
- fn acquire_core_for_deal(&self, unit_id: CUID) -> eyre::Result<()> {
+ fn acquire_core_for_deal(&self, cu_ids: Vec) -> eyre::Result<()> {
self.core_distributor
- .acquire_worker_cores(AcquireRequest::new(vec![unit_id], WorkType::Deal))?;
+ .acquire_worker_cores(AcquireRequest::new(cu_ids, WorkType::Deal))?;
Ok(())
}
@@ -1244,7 +1280,7 @@ impl ChainListener {
})
.await?;
- for (status, (deal_id, cu_id)) in statuses
+ for (status, (deal_id, worker)) in statuses
.into_iter()
.zip(self.active_deals.clone().into_iter())
{
@@ -1252,7 +1288,7 @@ impl ChainListener {
Ok(status) => match status {
Deal::Status::INSUFFICIENT_FUNDS | Deal::Status::ENDED => {
tracing::info!(target: "chain-listener", "Deal {deal_id} status: {status:?}; Exiting...");
- self.exit_deal(&deal_id, cu_id).await?;
+ self.exit_deal(&deal_id, worker.id).await?;
tracing::info!(target: "chain-listener", "Exited deal {deal_id} successfully");
}
_ => {}
@@ -1266,14 +1302,18 @@ impl ChainListener {
Ok(())
}
- async fn exit_deal(&mut self, deal_id: &DealId, cu_id: CUID) -> eyre::Result<()> {
+ async fn exit_deal(
+ &mut self,
+ deal_id: &DealId,
+ onchain_worker_id: OnChainWorkerID,
+ ) -> eyre::Result<()> {
let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(3)),
..ExponentialBackoff::default()
};
retry(backoff, || async {
- self.chain_connector.exit_deal(&cu_id).await.map_err(|err| {
+ self.chain_connector.exit_deal(deal_id, onchain_worker_id).await.map_err(|err| {
tracing::warn!(target: "chain-listener", "Failed to exit deal {deal_id}: {err}");
eyre!("Failed to exit deal {deal_id}: {err}; Retrying...")
})?;
diff --git a/crates/nox-tests/tests/builtin.rs b/crates/nox-tests/tests/builtin.rs
index ebe42f80c7..7885e4912d 100644
--- a/crates/nox-tests/tests/builtin.rs
+++ b/crates/nox-tests/tests/builtin.rs
@@ -2377,24 +2377,33 @@ async fn aliases_restart() {
#[ignore]
#[tokio::test]
async fn subnet_resolve() {
- let cu_1 = chain_connector::Deal::ComputeUnit {
- id: hex!("0000000000000000000000000000000000000000000000000000000000000001").into(),
- workerId: peer_id_to_bytes(RandomPeerId::random()).into(),
+ let worker1 = chain_connector::Deal::Worker {
+ offchainId: peer_id_to_bytes(RandomPeerId::random()).into(),
peerId: peer_id_to_bytes(RandomPeerId::random()).into(),
provider: Default::default(),
joinedEpoch: Default::default(),
+ onchainId: Default::default(),
+ computeUnitIds: vec![hex!(
+ "0000000000000000000000000000000000000000000000000000000000000001"
+ )
+ .into()],
};
- let cu_2 = chain_connector::Deal::ComputeUnit {
- id: hex!("0000000000000000000000000000000000000000000000000000000000000002").into(),
- workerId: Default::default(),
+ let worker2 = chain_connector::Deal::Worker {
+ offchainId: Default::default(),
peerId: peer_id_to_bytes(RandomPeerId::random()).into(),
provider: Default::default(),
joinedEpoch: Default::default(),
+ onchainId: Default::default(),
+ computeUnitIds: vec![hex!(
+ "0000000000000000000000000000000000000000000000000000000000000002"
+ )
+ .into()],
};
- let resolve_result = encode_hex_0x(Array::::abi_encode(
- &vec![cu_1.clone(), cu_2.clone()],
- ));
+ let resolve_result = encode_hex_0x(Array::::abi_encode(&vec![
+ worker1.clone(),
+ worker2.clone(),
+ ]));
// Create a mock
let mut server = mockito::Server::new_async().await;
@@ -2485,13 +2494,13 @@ async fn subnet_resolve() {
workers,
vec![
(
- vec![encode_hex_0x(cu_1.id.0).to_string()],
- parse_peer_id(&cu_1.peerId.0).unwrap().to_base58(),
- vec![parse_peer_id(&cu_1.workerId.0).unwrap().to_base58()],
+ vec![encode_hex_0x(worker1.computeUnitIds[0].0).to_string()],
+ parse_peer_id(&worker1.peerId.0).unwrap().to_base58(),
+ vec![parse_peer_id(&worker1.offchainId.0).unwrap().to_base58()],
),
(
- vec![encode_hex_0x(cu_2.id.0).to_string()],
- parse_peer_id(&cu_2.peerId.0).unwrap().to_base58(),
+ vec![encode_hex_0x(worker2.computeUnitIds[0].0).to_string()],
+ parse_peer_id(&worker2.peerId.0).unwrap().to_base58(),
vec![]
)
]
diff --git a/crates/nox-tests/tests/chain_listener/chain_server.rs b/crates/nox-tests/tests/chain_listener/chain_server.rs
index 473568568d..0c47ad4936 100644
--- a/crates/nox-tests/tests/chain_listener/chain_server.rs
+++ b/crates/nox-tests/tests/chain_listener/chain_server.rs
@@ -26,10 +26,10 @@ use chain_connector::Capacity::CapacityCalls;
use chain_connector::Core::CoreCalls;
use chain_connector::Deal::{DealCalls, Status};
use chain_connector::Offer::{ComputeUnit, OfferCalls};
-use chain_connector::{CCStatus, CommitmentId, Offer};
+use chain_connector::{CCStatus, CommitmentId, Deal};
use chain_data::{parse_peer_id, peer_id_to_bytes};
use chain_listener::{
- CommitmentActivated, ComputeUnitMatched, UnitActivated, UnitDeactivated, CIDV1,
+ CommitmentActivated, ComputeUnitsMatched, UnitActivated, UnitDeactivated, CIDV1,
};
use clarity::Transaction;
use eyre::{eyre, OptionExt};
@@ -40,6 +40,7 @@ use jsonrpsee::server::ServerHandle;
use jsonrpsee::types::{ErrorObject, Params};
use jsonrpsee::{RpcModule, SubscriptionMessage};
use parking_lot::{Mutex, MutexGuard};
+use rand::Rng;
use serde_json::{json, Value};
use std::collections::BTreeSet;
use std::str::FromStr;
@@ -189,9 +190,6 @@ impl ChainServer {
let data = Array::::abi_encode(&state.units);
Ok(encode_hex_0x(data))
}
- OfferCalls::returnComputeUnitFromDeal(_params) => {
- todo!()
- }
}
} else if let Ok(res) = DealCalls::abi_decode(data.as_slice(), true) {
if !deal_addresses.contains(&to) {
@@ -216,10 +214,13 @@ impl ChainServer {
DealCalls::appCID(_) => {
todo!()
}
- DealCalls::setWorker(_) => {
+ DealCalls::activateWorker(_) => {
+ todo!()
+ }
+ DealCalls::getWorkers(_) => {
todo!()
}
- DealCalls::getComputeUnits(_) => {
+ DealCalls::removeWorker(_) => {
todo!()
}
}
@@ -424,6 +425,12 @@ impl ChainServer {
pub async fn create_deal(&self, params: CreateDealParams) {
let mut state = self.chain_state.lock();
let block_number = state.block_number;
+ let on_chain_worker_id = FixedBytes::<32>::from(rand::thread_rng().gen::<[u8; 32]>());
+
+ state
+ .workers
+ .insert(on_chain_worker_id, vec![params.unit.id]);
+
state.deal_statuses.insert(params.deal_id, Status::ACTIVE);
let peer_state = state.peer_states.get_mut(¶ms.peer_id).unwrap();
if let Some(state_unit) = peer_state
@@ -432,6 +439,7 @@ impl ChainServer {
.find(|state_unit| state_unit.id == params.unit.id)
{
state_unit.deal = params.deal_id;
+ state_unit.onchainWorkerId = on_chain_worker_id;
}
{
@@ -467,21 +475,21 @@ impl ChainServer {
}
{
- let event = ComputeUnitMatched {
+ let event = ComputeUnitsMatched {
peerId: FixedBytes::new(peer_id_to_bytes(params.peer_id)),
deal: params.deal_id,
- unitId: params.unit.id,
- dealCreationBlock: block_number,
+ onchainWorkerId: Default::default(),
appCID: CIDV1 {
prefixes: Default::default(),
hash: Default::default(),
},
+ cuIds: vec![params.unit.id],
};
let data = event.encode_data();
let data = encode_hex_0x(data);
let message = SubscriptionMessage::from_json(&json!({
"topics": vec![
- ComputeUnitMatched::SIGNATURE_HASH.to_string(),
+ ComputeUnitsMatched::SIGNATURE_HASH.to_string(),
encode_hex_0x(peer_id_to_bytes(params.peer_id)),
],
"data": data,
@@ -492,7 +500,7 @@ impl ChainServer {
let message = LogsParams {
address: self.diamond_contract_address.clone(),
topics: vec![
- ComputeUnitMatched::SIGNATURE_HASH.to_string(),
+ ComputeUnitsMatched::SIGNATURE_HASH.to_string(),
encode_hex_0x(peer_id_to_bytes(params.peer_id)),
],
message,
@@ -520,74 +528,94 @@ impl ChainServer {
let transaction = Transaction::decode_from_rlp(&mut transaction.as_slice())
.map_err(|_| ErrorObject::owned(500, "", None::))?;
+ let state = ctx.chain_state.lock();
+
+ let deal_addresses = state
+ .deal_statuses
+ .keys()
+ .map(|addr| addr.to_checksum(None))
+ .collect::>();
+
match transaction {
Transaction::Legacy { .. } => {}
Transaction::Eip2930 { .. } => {}
Transaction::Eip1559 { to, data, .. } => {
let to = to.to_string();
- if to == contract_address {
- let call =
- Offer::returnComputeUnitFromDealCall::abi_decode(data.as_slice(), true)
- .map_err(|_| {
- ErrorObject::owned(
- 500,
- "expected ABI for function returnComputeUnitFromDealCall",
- None::,
- )
- })?;
+ if deal_addresses.contains(&to) {
+ let call = Deal::removeWorkerCall::abi_decode(data.as_slice(), true).map_err(
+ |_| {
+ ErrorObject::owned(
+ 500,
+ "expected ABI for function returnComputeUnitFromDealCall",
+ None::,
+ )
+ },
+ )?;
- let state = ctx.chain_state.lock();
- let unit_state =
+ let worker_cus =
state
- .unit_state
- .get(&call.unitId)
+ .workers
+ .get(&call.onchainId)
.ok_or(ErrorObject::owned(
500,
- format!("no such unitId {}", call.unitId),
+ &format!("no such worker {}", call.onchainId),
None::,
))?;
- let commitment_id =
- unit_state.commitment_id.clone().ok_or(ErrorObject::owned(
- 500,
- &format!("compute unit {} doesn't have commitment id", call.unitId),
- None::,
- ))?;
-
- let event = UnitActivated {
- commitmentId: FixedBytes::new(commitment_id.0),
- unitId: call.unitId,
- startEpoch: state.current_epoch + U256::from(100),
- };
-
- let data = event.encode_data();
- let data = encode_hex_0x(data);
-
- let message = SubscriptionMessage::from_json(&json!({
- "topics": vec![
- UnitActivated::SIGNATURE_HASH.to_string(),
- commitment_id.to_string(),
- encode_hex_0x(call.unitId),
- ],
- "data": data,
- "blockNumber": state.block_number,
- "removed": false
- }))
- .unwrap();
-
- let message = LogsParams {
- address: contract_address.clone(),
- topics: vec![
+ for unit_id in worker_cus {
+ let unit_state =
+ state.unit_state.get(unit_id).ok_or(ErrorObject::owned(
+ 500,
+ format!("no such unitId {}", unit_id),
+ None::,
+ ))?;
+
+ let commitment_id =
+ unit_state.commitment_id.clone().ok_or(ErrorObject::owned(
+ 500,
+ &format!("compute unit {} doesn't have commitment id", unit_id),
+ None::,
+ ))?;
+
+ let event = UnitActivated {
+ commitmentId: FixedBytes::new(commitment_id.0),
+ unitId: unit_id.clone(),
+ startEpoch: state.current_epoch + U256::from(100),
+ };
+
+ let data = event.encode_data();
+ let data = encode_hex_0x(data);
+
+ let message = SubscriptionMessage::from_json(&json!({
+ "topics": vec![
UnitActivated::SIGNATURE_HASH.to_string(),
commitment_id.to_string(),
- encode_hex_0x(call.unitId),
- ],
- message,
- };
+ encode_hex_0x(unit_id),
+ ],
+ "data": data,
+ "blockNumber": state.block_number,
+ "removed": false
+ }))
+ .unwrap();
+
+ let message = LogsParams {
+ address: contract_address.clone(),
+ topics: vec![
+ UnitActivated::SIGNATURE_HASH.to_string(),
+ commitment_id.to_string(),
+ encode_hex_0x(unit_id.0),
+ ],
+ message,
+ };
- ctx.logs_sender.send(message).unwrap();
+ ctx.logs_sender.send(message).unwrap();
+ }
} else {
- return Err(ErrorObject::owned(500, &format!("Only TXs with returnComputeUnitFromDealCall sent to diamond address {} are supported", contract_address), None::));
+ return Err(ErrorObject::owned(
+ 500,
+ &format!("Only TXs with removeWorker sent to deal addresses are supported",),
+ None::,
+ ));
}
}
}
diff --git a/crates/nox-tests/tests/chain_listener/tests.rs b/crates/nox-tests/tests/chain_listener/tests.rs
index 823d37cb57..4991d13b77 100644
--- a/crates/nox-tests/tests/chain_listener/tests.rs
+++ b/crates/nox-tests/tests/chain_listener/tests.rs
@@ -157,6 +157,7 @@ async fn test_cc_activation_flow() {
id: compute_unit_id_1,
deal: Address::ZERO,
startEpoch: U256::from(2),
+ onchainWorkerId: FixedBytes::<32>::ZERO,
};
let compute_peer_1 = ComputePeer {
@@ -257,18 +258,21 @@ async fn test_deal_insufficient_funds_flow() {
id: compute_unit_id_1,
deal: Address::ZERO,
startEpoch: U256::from(2),
+ onchainWorkerId: FixedBytes::<32>::ZERO,
};
let compute_unit_2 = ComputeUnit {
id: compute_unit_id_2,
deal: Address::ZERO,
startEpoch: U256::from(2),
+ onchainWorkerId: FixedBytes::<32>::ZERO,
};
let compute_unit_3 = ComputeUnit {
id: compute_unit_id_3,
deal: Address::ZERO,
startEpoch: U256::from(2),
+ onchainWorkerId: FixedBytes::<32>::ZERO,
};
let compute_peer_1 = ComputePeer {
diff --git a/crates/nox-tests/tests/chain_listener/types.rs b/crates/nox-tests/tests/chain_listener/types.rs
index 832fa6126a..31c59f0282 100644
--- a/crates/nox-tests/tests/chain_listener/types.rs
+++ b/crates/nox-tests/tests/chain_listener/types.rs
@@ -22,7 +22,7 @@ use alloy_primitives::{Address, FixedBytes, U256};
use ccp_shared::types::{Difficulty, GlobalNonce, PhysicalCoreId, CUID};
use chain_connector::Deal::Status;
use chain_connector::Offer::{ComputePeer, ComputeUnit};
-use chain_connector::{CCStatus, CommitmentId};
+use chain_connector::{CCStatus, CommitmentId, OnChainWorkerID};
use fluence_libp2p::PeerId;
use hex::FromHex;
use jsonrpsee::SubscriptionMessage;
@@ -30,6 +30,7 @@ use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
+type ChainCUID = FixedBytes<32>;
pub struct ChainState {
pub(crate) block_number: U256,
pub(crate) block_duration: U256,
@@ -44,7 +45,8 @@ pub struct ChainState {
pub(crate) commitment_activation_at: HashMap,
pub(crate) peer_states: HashMap,
pub(crate) deal_statuses: HashMap,
- pub(crate) unit_state: HashMap, UnitState>,
+ pub(crate) unit_state: HashMap,
+ pub(crate) workers: HashMap>,
}
impl Default for ChainState {
@@ -64,6 +66,7 @@ impl Default for ChainState {
deal_statuses: Default::default(),
commitment_activation_at: Default::default(),
unit_state: Default::default(),
+ workers: Default::default(),
}
}
}
diff --git a/crates/system-services/Cargo.toml b/crates/system-services/Cargo.toml
index 8ebfc62084..d86c9ec5a4 100644
--- a/crates/system-services/Cargo.toml
+++ b/crates/system-services/Cargo.toml
@@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
aqua-ipfs-distro = "=0.6.0"
-decider-distro = "=0.7.1"
+decider-distro = "=0.7.2"
registry-distro = "=0.9.4"
trust-graph-distro = "=0.4.11"