From 10644eba374b92836c68b041bf8e570e20c7852c Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 29 Jun 2023 11:25:49 +0300 Subject: [PATCH] Limit reconstruction concurrency on farmer to avoid excessive memory usage --- .../subspace-farmer-components/src/plotting.rs | 16 ++++++++++++++++ .../src/segment_reconstruction.rs | 1 + 2 files changed, 17 insertions(+) diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index 63cbf92715..515bb69a41 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -26,8 +26,11 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_proof_of_space::{Quality, Table}; use thiserror::Error; +use tokio::sync::Semaphore; use tracing::{debug, warn}; +const RECONSTRUCTION_CONCURRENCY_LIMIT: usize = 1; + fn default_backoff() -> ExponentialBackoff { ExponentialBackoff { initial_interval: Duration::from_secs(15), @@ -408,6 +411,10 @@ async fn download_sector( piece_indexes: &[PieceIndex], piece_memory_cache: PieceMemoryCache, ) -> Result<(), PlottingError> { + // TODO: Make configurable, likely allowing user to specify RAM usage expectations and inferring + // concurrency from there + let recovery_semaphore = Semaphore::new(RECONSTRUCTION_CONCURRENCY_LIMIT); + let mut pieces_receiving_futures = piece_indexes .iter() .map(|piece_index| async { @@ -428,6 +435,15 @@ async fn download_sector( // all retries failed if !succeeded { + let _permit = match recovery_semaphore.acquire().await { + Ok(permit) => permit, + Err(error) => { + return ( + piece_index, + Err(format!("Recovery semaphore was closed: {error}").into()), + ); + } + }; let recovered_piece = recover_missing_piece(piece_getter, kzg.clone(), piece_index).await; diff --git a/crates/subspace-farmer-components/src/segment_reconstruction.rs b/crates/subspace-farmer-components/src/segment_reconstruction.rs index 37a2fb54f5..b02433ec08 100644 --- a/crates/subspace-farmer-components/src/segment_reconstruction.rs +++ b/crates/subspace-farmer-components/src/segment_reconstruction.rs @@ -9,6 +9,7 @@ use thiserror::Error; use tokio::sync::Semaphore; use tracing::{debug, error, info, trace, warn}; +// TODO: Probably should be made configurable const PARALLELISM_LEVEL: usize = 20; #[derive(Debug, Error)]