From df6daffab7bb15f5632cbe7eef5ff1f73a600633 Mon Sep 17 00:00:00 2001 From: Nick Date: Mon, 20 May 2024 14:38:56 +0300 Subject: [PATCH 1/2] fix(core-manager): add range check + clippy fixes --- .github/workflows/e2e.yml | 3 ++- Cargo.lock | 13 +++++++------ Cargo.toml | 6 +++--- crates/core-manager/Cargo.toml | 3 ++- crates/core-manager/src/core_range.rs | 10 ++++++++++ crates/core-manager/src/dev.rs | 28 ++++++++++++++++++++++++--- crates/core-manager/src/errors.rs | 2 ++ crates/core-manager/src/strict.rs | 26 +++++++++++++++++++++++-- crates/workers/src/workers.rs | 15 +++++++++++++- 9 files changed, 89 insertions(+), 17 deletions(-) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 5bc46f7af9..b2ce304939 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -89,9 +89,10 @@ jobs: js-client: needs: - nox-snapshot - uses: fluencelabs/js-client/.github/workflows/tests.yml@main + uses: fluencelabs/js-client/.github/workflows/tests.yml@timeout with: nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}" + ref: "timeout" aqua: needs: diff --git a/Cargo.lock b/Cargo.lock index aa7ee3e285..b64705336d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1251,9 +1251,9 @@ dependencies = [ [[package]] name = "ccp-rpc-client" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dcb551e7eaad87b0447b2a65e2bb2ff1c2ea8184c6b9f6ee69f753556fa9980" +checksum = "2888e0529487437677d06c31dfc581bb8bea06d0fc007db0a8484307be9d7de5" dependencies = [ "ccp-shared", "hex", @@ -1263,9 +1263,9 @@ dependencies = [ [[package]] name = "ccp-shared" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b87301eb83acefe8c2b241c8aed467ddad7931e63aec1a2d438bba324a18b76d" +checksum = "7ccecad3bb44270ec5497788954232de0c55564ceadc2eca4f0a45da0b3a77e2" dependencies = [ "hex", "newtype_derive", @@ -1681,6 +1681,7 @@ dependencies = [ "hex-utils", "multimap 0.10.0", "newtype_derive", + "nonempty", "num_cpus", "parking_lot", "rand 0.8.5", @@ -1715,9 +1716,9 @@ dependencies = [ [[package]] name = "cpu-utils" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1214c39c3a71f470eab128fdd2495995cc13401d5a76191f9200ea1020917d29" +checksum = "8efb8938379cbf7f471035078b25e10e6b12bd8e0eeacba4dbbf92e1be39fedb" dependencies = [ "ccp-shared", "ccp_core_affinity", diff --git a/Cargo.toml b/Cargo.toml index 12767540a8..3dd984a166 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -174,9 +174,9 @@ enum_dispatch = "0.3.12" serde_with = "3.7.0" mockito = "1.2.0" clarity = "1.3.0" -cpu-utils = "0.8.0" -ccp-shared = "0.8.0" -ccp-rpc-client = "0.8.0" +cpu-utils = "0.9.0" +ccp-shared = "0.9.0" +ccp-rpc-client = "0.9.0" alloy-sol-types = "0.6.4" alloy-primitives = "0.6.4" alloy_serde_macro = "0.1.2" diff --git a/crates/core-manager/Cargo.toml b/crates/core-manager/Cargo.toml index d6bb6288f2..d1f5994fcc 100644 --- a/crates/core-manager/Cargo.toml +++ b/crates/core-manager/Cargo.toml @@ -14,6 +14,7 @@ multimap = { version = "0.10.0", features = ["serde"] } bimap = { version = "0.6.3", features = ["serde"] } toml = "0.8.12" newtype_derive = "0.1.6" +nonempty = "0.9.0" tokio = { workspace = true, features = ["fs", "rt", "sync", "macros", "tracing"] } async-trait.workspace = true @@ -25,7 +26,7 @@ serde = { workspace = true, features = ["derive"] } tracing.workspace = true tokio-stream.workspace = true futures.workspace = true -rand = "0.8.5" +rand = { workspace = true } hex.workspace = true serde_with = { workspace = true } hex-utils = { workspace = true, features = ["serde_with"] } diff --git a/crates/core-manager/src/core_range.rs b/crates/core-manager/src/core_range.rs index 240a403e77..3984c3ca75 100644 --- a/crates/core-manager/src/core_range.rs +++ b/crates/core-manager/src/core_range.rs @@ -1,3 +1,5 @@ +use ccp_shared::types::PhysicalCoreId; +use nonempty::NonEmpty; use range_set_blaze::RangeSetBlaze; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt::{Debug, Display, Formatter}; @@ -6,6 +8,14 @@ use thiserror::Error; #[derive(Clone, PartialEq)] pub struct CoreRange(pub(crate) RangeSetBlaze); +impl CoreRange { + pub fn is_subset(&self, cores: &NonEmpty) -> bool { + let range: RangeSetBlaze = + RangeSetBlaze::from_iter(cores.into_iter().map(|core| ::from(*core))); + + self.0.is_subset(&range) + } +} impl Debug for CoreRange { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { diff --git a/crates/core-manager/src/dev.rs b/crates/core-manager/src/dev.rs index 2dc2d3ee85..242a081bc2 100644 --- a/crates/core-manager/src/dev.rs +++ b/crates/core-manager/src/dev.rs @@ -105,6 +105,10 @@ impl DevCoreManager { .physical_cores() .map_err(|err| CreateError::CollectCoresData { err })?; + if !core_range.is_subset(&physical_cores) { + return Err(CreateError::WrongCpuRange); + } + let mut cores_mapping: MultiMap = MultiMap::with_capacity_and_hasher(available_core_count, FxBuildHasher::default()); @@ -301,7 +305,7 @@ impl CoreManagerFunctions for DevCoreManager { fn release(&self, unit_ids: &[CUID]) { let mut lock = self.state.write(); for unit_id in unit_ids { - if let Some(physical_core_id) = lock.unit_id_core_mapping.remove(&unit_id) { + if let Some(physical_core_id) = lock.unit_id_core_mapping.remove(unit_id) { let mapping = lock.core_unit_id_mapping.get_vec_mut(&physical_core_id); if let Some(mapping) = mapping { let index = mapping.iter().position(|x| x == unit_id).unwrap(); @@ -310,7 +314,7 @@ impl CoreManagerFunctions for DevCoreManager { lock.core_unit_id_mapping.remove(&physical_core_id); } } - lock.work_type_mapping.remove(&unit_id); + lock.work_type_mapping.remove(unit_id); } } } @@ -354,10 +358,11 @@ mod tests { use ccp_shared::types::CUID; use hex::FromHex; use rand::RngCore; + use std::str::FromStr; use crate::manager::CoreManagerFunctions; use crate::types::{AcquireRequest, WorkType}; - use crate::{CoreRange, DevCoreManager}; + use crate::{CoreRange, DevCoreManager, StrictCoreManager}; fn cores_exists() -> bool { num_cpus::get_physical() >= 4 @@ -539,4 +544,21 @@ mod tests { assert_eq!(after_assignment_type_mapping.len(), assignment_count * 2); } } + + #[test] + fn test_wrong_range() { + if cores_exists() { + let temp_dir = tempfile::tempdir().expect("Failed to create temp dir"); + + let range = CoreRange::from_str("0-16384").unwrap(); + + let result = StrictCoreManager::from_path(temp_dir.path().join("test.toml"), 2, range); + + assert!(result.is_err()); + assert_eq!( + result.err().map(|err| err.to_string()), + Some("The specified CPU range exceeds the available CPU count".to_string()) + ); + } + } } diff --git a/crates/core-manager/src/errors.rs b/crates/core-manager/src/errors.rs index 732196627c..dc54268cbe 100644 --- a/crates/core-manager/src/errors.rs +++ b/crates/core-manager/src/errors.rs @@ -14,6 +14,8 @@ pub enum CreateError { CreateTopology { err: CPUTopologyError }, #[error("Failed to collect cores data from OS {err:?}")] CollectCoresData { err: CPUTopologyError }, + #[error("The specified CPU range exceeds the available CPU count")] + WrongCpuRange, } #[derive(Debug, Error)] diff --git a/crates/core-manager/src/strict.rs b/crates/core-manager/src/strict.rs index f08cf110c8..6a1c6c14c5 100644 --- a/crates/core-manager/src/strict.rs +++ b/crates/core-manager/src/strict.rs @@ -102,6 +102,10 @@ impl StrictCoreManager { .physical_cores() .map_err(|err| CreateError::CollectCoresData { err })?; + if !core_range.is_subset(&physical_cores) { + return Err(CreateError::WrongCpuRange); + } + let mut cores_mapping: MultiMap = MultiMap::with_capacity_and_hasher(available_core_count, FxBuildHasher::default()); @@ -302,9 +306,9 @@ impl CoreManagerFunctions for StrictCoreManager { fn release(&self, unit_ids: &[CUID]) { let mut lock = self.state.write(); for unit_id in unit_ids { - if let Some((physical_core_id, _)) = lock.unit_id_mapping.remove_by_right(&unit_id) { + if let Some((physical_core_id, _)) = lock.unit_id_mapping.remove_by_right(unit_id) { lock.available_cores.insert(physical_core_id); - lock.work_type_mapping.remove(&unit_id); + lock.work_type_mapping.remove(unit_id); } } } @@ -348,6 +352,7 @@ mod tests { use ccp_shared::types::{LogicalCoreId, PhysicalCoreId, CUID}; use hex::FromHex; use std::collections::BTreeSet; + use std::str::FromStr; use crate::manager::CoreManagerFunctions; use crate::persistence::PersistentCoreManagerState; @@ -536,4 +541,21 @@ mod tests { assert_eq!(expected, result.unwrap_err().to_string()); } } + + #[test] + fn test_wrong_range() { + if cores_exists() { + let temp_dir = tempfile::tempdir().expect("Failed to create temp dir"); + + let range = CoreRange::from_str("0-16384").unwrap(); + + let result = StrictCoreManager::from_path(temp_dir.path().join("test.toml"), 2, range); + + assert!(result.is_err()); + assert_eq!( + result.err().map(|err| err.to_string()), + Some("The specified CPU range exceeds the available CPU count".to_string()) + ); + } + } } diff --git a/crates/workers/src/workers.rs b/crates/workers/src/workers.rs index c450410f42..ecec58718d 100644 --- a/crates/workers/src/workers.rs +++ b/crates/workers/src/workers.rs @@ -238,8 +238,21 @@ impl Workers { .await .map_err(|_err| WorkersError::FailedToNotifySubsystem { worker_id }); match result { - Ok(_) => Ok(()), + Ok(_) => { + tracing::info!( + target = "worker-registry", + worker_id = worker_id.to_string(), + "Worker created {worker_id}" + ); + Ok(()) + } Err(err) => { + tracing::error!( + target = "worker-registry", + worker_id = worker_id.to_string(), + "Failed to notify subsystem for {worker_id}: {}", + err + ); let mut worker_ids = self.worker_ids.write(); let mut worker_infos = self.worker_infos.write(); let mut runtimes = self.runtimes.write(); From 1d0c0b867e451ce44dd908d710d4afbf5400910e Mon Sep 17 00:00:00 2001 From: Nick Date: Mon, 20 May 2024 14:50:52 +0300 Subject: [PATCH 2/2] fix branches --- .github/workflows/e2e.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index b2ce304939..659535da9b 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -82,17 +82,18 @@ jobs: cli: needs: - nox-snapshot - uses: fluencelabs/cli/.github/workflows/tests.yml@main + uses: fluencelabs/cli/.github/workflows/tests.yml@timeout with: nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}" + ref: "timeout" + js-client: needs: - nox-snapshot - uses: fluencelabs/js-client/.github/workflows/tests.yml@timeout + uses: fluencelabs/js-client/.github/workflows/tests.yml@main with: nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}" - ref: "timeout" aqua: needs: