Skip to content

Commit

Permalink
fix(cc): Continue CCP calculation if cpu_cores < cu_number [fixes NET…
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu authored Apr 10, 2024
1 parent 41faf0b commit d3d5f27
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 121 deletions.
219 changes: 142 additions & 77 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ use chain_connector::{
ConnectorError, Deal, PEER_NOT_EXISTS,
};
use chain_data::{parse_log, peer_id_to_hex, Log};
use core_manager::types::{AcquireRequest, WorkType};
use core_manager::errors::AcquireError;
use core_manager::types::{AcquireRequest, Assignment, WorkType};
use core_manager::{CoreManager, CoreManagerFunctions, CUID};
use peer_metrics::ChainListenerMetrics;
use server_config::{ChainConfig, ChainListenerConfig};
Expand Down Expand Up @@ -813,46 +814,31 @@ impl ChainListener {
Ok(())
}

/// Collect the compute units of the capacity commitment that still need proofs urgently.
///
/// A unit is returned when it has already started (`startEpoch <= current_epoch`) and its
/// confirmed proof count in the current epoch is below `min_proofs_per_epoch`.
/// A unit with no entry in `proof_counter` is also returned.
fn get_priority_units(&self) -> Vec<CUID> {
    let mut urgent = Vec::new();
    for (cuid, cu) in self.cc_compute_units.iter() {
        // Units that have not started yet cannot produce proofs in this epoch.
        if cu.startEpoch > self.current_epoch {
            continue;
        }
        let below_minimum = match self.proof_counter.get(cuid) {
            Some(count) => *count < self.min_proofs_per_epoch,
            // No counter recorded yet: the unit is considered priority.
            None => true,
        };
        if below_minimum {
            urgent.push(*cuid);
        }
    }
    urgent
}

/// Return already started units involved in CC and found at least MIN_PROOFS_PER_EPOCH proofs,
/// but less that MAX_PROOFS_PER_EPOCH proofs in the current epoch
fn get_non_priority_units(&self) -> Vec<CUID> {
self.cc_compute_units
.iter()
.filter(|(_, cu)| cu.startEpoch <= self.current_epoch) // CU is already started
.filter(|(cuid, _)| {
self.proof_counter
.get(cuid)
.map(|count| {
*count >= self.min_proofs_per_epoch && *count < self.max_proofs_per_epoch
})
.unwrap_or(true)
})
.map(|(cuid, _)| *cuid)
.collect()
}

/// Return units in CC that are not active yet and can't produce proofs in the current epoch
fn get_pending_units(&self) -> Vec<CUID> {
self.cc_compute_units
.values()
.filter(|cu| cu.startEpoch > self.current_epoch) // CU hasn't yet started
.map(|cu| CUID::new(cu.id.0))
.collect()
/// Partition every compute unit of the capacity commitment into exactly one group.
///
/// - pending: the unit starts in a later epoch (`startEpoch > current_epoch`);
/// - priority: started, confirmed proofs below `min_proofs_per_epoch`;
/// - finished: started, confirmed proofs at or above `max_proofs_per_epoch`;
/// - non-priority: started, everything in between.
///
/// A unit with no entry in `proof_counter` is counted as having zero proofs.
fn get_cu_groups(&self) -> CUGroups {
    let mut groups = CUGroups {
        priority_units: Vec::new(),
        non_priority_units: Vec::new(),
        pending_units: Vec::new(),
        finished_units: Vec::new(),
    };

    for (cuid, cu) in &self.cc_compute_units {
        // Pick the destination group first, then push once.
        let bucket = if cu.startEpoch > self.current_epoch {
            &mut groups.pending_units
        } else {
            let proofs = self.proof_counter.get(cuid).unwrap_or(&U256::ZERO);
            if *proofs < self.min_proofs_per_epoch {
                &mut groups.priority_units
            } else if *proofs >= self.max_proofs_per_epoch {
                &mut groups.finished_units
            } else {
                &mut groups.non_priority_units
            }
        };
        bucket.push(*cuid);
    }

    groups
}

/// Send GlobalNonce, Difficulty and Core<>CUID mapping (full commitment info) to CCP
Expand All @@ -878,44 +864,52 @@ impl ChainListener {
None => return Ok(()),
};

let priority_units = self.get_priority_units();
let non_priority_units = self.get_non_priority_units();
let pending_units = self.get_pending_units();
let cu_groups = self.get_cu_groups();

let cc_cores = self.acquire_cores_for_cc(&cu_groups)?;

let mut cu_allocation = HashMap::new();
let priority_cores = self.acquire_cores_for_cc(&priority_units)?;
let non_priority_cores = self.acquire_cores_for_cc(&non_priority_units)?;
let pending_cores = self.acquire_cores_for_cc(&pending_units)?;
let mut cu_allocation: HashMap<PhysicalCoreId, CUID> = HashMap::new();

if all_min_proofs_found(&priority_units) {
if cu_groups.all_min_proofs_found() {
tracing::info!(target: "chain-listener", "All CUs found minimal number of proofs {} in current epoch {}", self.min_proofs_per_epoch, self.current_epoch);
if all_max_proofs_found(&non_priority_units) {
if cu_groups.all_max_proofs_found() {
tracing::info!(target: "chain-listener", "All CUs found max number of proofs {} in current epoch {}", self.max_proofs_per_epoch ,self.current_epoch);
self.stop_commitment().await?;
return Ok(());
} else {
// All CUs were proven, now let's work on submitting proofs for every CU until MAX_PROOF_COUNT is reached
cu_allocation.extend(non_priority_cores.iter().zip(non_priority_units.iter()));
let non_priority_cores_mapping = cc_cores
.non_priority_cores
.iter()
.cloned()
.zip(cu_groups.non_priority_units.iter().cloned());
cu_allocation.extend(non_priority_cores_mapping);

let mut units = non_priority_units.iter().cycle();
// Assign "pending cores" to help generate proofs for "non priority units"
pending_cores.iter().for_each(|core| {
if let Some(unit) = units.next() {
cu_allocation.insert(*core, *unit);
let mut non_priority_units = cu_groups.non_priority_units.iter().cycle();
cc_cores.pending_cores.iter().for_each(|core| {
if let Some(non_priority_unit) = non_priority_units.next() {
cu_allocation.insert(*core, *non_priority_unit);
}
});
}
} else {
// Use assigned cores to calculate proofs for CUs who haven't reached MIN_PROOF_COUNT yet
cu_allocation.extend(priority_cores.iter().zip(priority_units.iter()));
let priority_cores_mapping = cc_cores
.priority_cores
.iter()
.zip(cu_groups.priority_units.iter());
cu_allocation.extend(priority_cores_mapping);

// Use all spare cores to help CUs to reach MIN_PROOF_COUNT
let spare_cores: BTreeSet<_> = non_priority_cores
let spare_cores: BTreeSet<_> = cc_cores
.non_priority_cores
.into_iter()
.chain(pending_cores.into_iter())
.chain(cc_cores.pending_cores.into_iter())
.chain(cc_cores.finished_cores.into_iter())
.collect();

let mut units = priority_units.iter().cycle();
let mut units = cu_groups.priority_units.iter().cycle();
spare_cores.iter().for_each(|core| {
if let Some(unit) = units.next() {
cu_allocation.insert(*core, *unit);
Expand Down Expand Up @@ -949,19 +943,70 @@ impl ChainListener {
Ok(())
}

fn acquire_cores_for_cc(&self, units: &[CUID]) -> eyre::Result<BTreeSet<PhysicalCoreId>> {
let cores = self
.core_manager
.acquire_worker_core(AcquireRequest::new(
units.to_vec(),
WorkType::CapacityCommitment,
))
.map_err(|err| {
tracing::error!(target: "chain-listener", "Failed to acquire cores for active units: {err}");
eyre::eyre!("Failed to acquire cores for active units: {err}")
})?;
/// Acquire physical cores for the capacity-commitment units and group them like `cu_groups`.
///
/// All units are requested from the core manager in one call, ordered priority,
/// non-priority, pending, finished. If the core manager reports that it has fewer cores
/// than units (`AcquireError::NotFoundAvailableCores`), every unit is released and only the
/// first `available` units in that order are requested again, so priority units are served
/// first.
///
/// Each returned group holds the cores assigned to the units of the matching group in
/// `cu_groups`; units that received no core are simply absent from the result.
///
/// # Errors
/// Returns an error if the second, reduced acquire request fails.
fn acquire_cores_for_cc(&self, cu_groups: &CUGroups) -> eyre::Result<PhysicalCoreGroups> {
    /// Map units to their assigned physical cores, skipping units without an assignment.
    fn cores_of(units: &[CUID], assignment: &Assignment) -> Vec<PhysicalCoreId> {
        units
            .iter()
            .filter_map(|cuid| {
                assignment
                    .cuid_cores
                    .get(cuid)
                    .map(|data| data.physical_core_id)
            })
            .collect()
    }

    // The order defines which units keep a core when there are not enough cores.
    let units: Vec<CUID> = cu_groups
        .priority_units
        .iter()
        .chain(&cu_groups.non_priority_units)
        .chain(&cu_groups.pending_units)
        .chain(&cu_groups.finished_units)
        .copied()
        .collect();

    let first_attempt = self.core_manager.acquire_worker_core(AcquireRequest::new(
        units.to_vec(),
        WorkType::CapacityCommitment,
    ));

    let assignment = match first_attempt {
        Ok(assignment) => assignment,
        Err(AcquireError::NotFoundAvailableCores {
            required,
            available,
            ..
        }) => {
            tracing::warn!(target: "chain-listener", "Found {required} CUs in the Capacity Commitment, but Nox has only {available} Cores available for CC");
            let assign_units = units.iter().take(available).cloned().collect();
            // Release all units to allow the core manager to assign them again
            self.core_manager.release(units);
            self.core_manager.acquire_worker_core(AcquireRequest::new(
                assign_units,
                WorkType::CapacityCommitment,
            ))?
        }
    };

    Ok(PhysicalCoreGroups {
        priority_cores: cores_of(&cu_groups.priority_units, &assignment),
        non_priority_cores: cores_of(&cu_groups.non_priority_units, &assignment),
        pending_cores: cores_of(&cu_groups.pending_units, &assignment),
        finished_cores: cores_of(&cu_groups.finished_units, &assignment),
    })
}

fn acquire_core_for_deal(&self, unit_id: CUID) -> eyre::Result<()> {
Expand Down Expand Up @@ -1036,7 +1081,7 @@ impl ChainListener {
.filter(|p| p.id.global_nonce == self.global_nonce)
.collect();

if proofs.len() > 0 {
if !proofs.is_empty() {
tracing::info!(target: "chain-listener", "Found {} proofs from polling", proofs.len());
}

Expand Down Expand Up @@ -1258,7 +1303,7 @@ impl ChainListener {
}

if stats_updated {
tracing::info!(target: "chain-listener", "Confirmed proofs count: {:?}", self.proof_counter.iter().map(|(cu, count)| format!("{}: {}", cu, count.to_string())).collect::<Vec<_>>());
tracing::info!(target: "chain-listener", "Confirmed proofs count: {:?}", self.proof_counter.iter().map(|(cu, count)| format!("{}: {}", cu, count)).collect::<Vec<_>>());
}

Ok(())
Expand All @@ -1274,12 +1319,32 @@ impl ChainListener {
}
}

fn all_min_proofs_found(priority_units: &[CUID]) -> bool {
priority_units.is_empty()
/// Compute units of the capacity commitment, split by their proof progress in the current epoch.
struct CUGroups {
/// Already started units involved in CC that have found less than MIN_PROOFS_PER_EPOCH proofs in the current epoch
pub priority_units: Vec<CUID>,
/// Already started units involved in CC that have found at least MIN_PROOFS_PER_EPOCH proofs,
/// but less than MAX_PROOFS_PER_EPOCH proofs in the current epoch
pub non_priority_units: Vec<CUID>,
/// Units in CC that are not active yet and can't produce proofs in the current epoch
pub pending_units: Vec<CUID>,
/// Already started units involved in CC that have found at least MAX_PROOFS_PER_EPOCH proofs in the current epoch
pub finished_units: Vec<CUID>,
}

fn all_max_proofs_found(non_priority_units: &[CUID]) -> bool {
non_priority_units.is_empty()
impl CUGroups {
/// Returns `true` when there are no priority units left.
fn all_min_proofs_found(&self) -> bool {
self.priority_units.is_empty()
}

/// Returns `true` when there are no non-priority units left.
///
/// This only inspects `non_priority_units`, so it is meaningful after
/// `all_min_proofs_found` has already returned `true`.
fn all_max_proofs_found(&self) -> bool {
self.non_priority_units.is_empty()
}
}
/// Physical cores acquired for capacity commitment, grouped the same way as the units in `CUGroups`.
///
/// A group lists only the cores of units that actually received an assignment, so it
/// may be shorter than the corresponding unit list.
struct PhysicalCoreGroups {
/// Cores assigned to the priority units
pub priority_cores: Vec<PhysicalCoreId>,
/// Cores assigned to the non-priority units
pub non_priority_cores: Vec<PhysicalCoreId>,
/// Cores assigned to the pending units
pub pending_cores: Vec<PhysicalCoreId>,
/// Cores assigned to the finished units
pub finished_cores: Vec<PhysicalCoreId>,
}

// measure the request execution time and store it in the metrics
Expand Down
Loading

0 comments on commit d3d5f27

Please sign in to comment.