From c7e52c809db4eb9df387b46505c0a7aceb57018d Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 29 Jun 2023 11:25:49 +0300 Subject: [PATCH] Limit reconstruction concurrency on farmer to avoid excessive memory usage --- crates/subspace-farmer-components/src/plotting.rs | 14 ++++++++++++++ .../src/segment_reconstruction.rs | 1 + 2 files changed, 15 insertions(+) diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index 63cbf92715c..8361c972598 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -26,6 +26,7 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_proof_of_space::{Quality, Table}; use thiserror::Error; +use tokio::sync::Semaphore; use tracing::{debug, warn}; fn default_backoff() -> ExponentialBackoff { @@ -408,6 +409,10 @@ async fn download_sector( piece_indexes: &[PieceIndex], piece_memory_cache: PieceMemoryCache, ) -> Result<(), PlottingError> { + // TODO: Make configurable, likely allowing user to specify RAM usage expectations and inferring + // concurrency from there + let recovery_semaphore = Semaphore::new(1); + let mut pieces_receiving_futures = piece_indexes .iter() .map(|piece_index| async { @@ -428,6 +433,15 @@ async fn download_sector( // all retries failed if !succeeded { + let _permit = match recovery_semaphore.acquire().await { + Ok(permit) => permit, + Err(error) => { + return ( + piece_index, + Err(format!("Recovery semaphore was closed: {error}").into()), + ); + } + }; let recovered_piece = recover_missing_piece(piece_getter, kzg.clone(), piece_index).await; diff --git a/crates/subspace-farmer-components/src/segment_reconstruction.rs b/crates/subspace-farmer-components/src/segment_reconstruction.rs index 37a2fb54f54..b02433ec08a 100644 --- a/crates/subspace-farmer-components/src/segment_reconstruction.rs +++ b/crates/subspace-farmer-components/src/segment_reconstruction.rs @@ -9,6 +9,7 @@ use thiserror::Error; use tokio::sync::Semaphore; use tracing::{debug, error, info, trace, warn}; +// TODO: Probably should be made configurable const PARALLELISM_LEVEL: usize = 20; #[derive(Debug, Error)]