diff --git a/iroh-blobs/src/store/fs.rs b/iroh-blobs/src/store/fs.rs index 8f3b6c67ff..72b128ef47 100644 --- a/iroh-blobs/src/store/fs.rs +++ b/iroh-blobs/src/store/fs.rs @@ -65,6 +65,7 @@ //! errors when communicating with the actor. use std::{ collections::{BTreeMap, BTreeSet}, + future::Future, io, path::{Path, PathBuf}, sync::{Arc, RwLock}, @@ -1374,15 +1375,24 @@ impl super::Store for Store { Ok(self.0.delete(hashes).await?) } - async fn gc_run(&self, config: super::GcConfig) { + async fn gc_run(&self, config: super::GcConfig, protected_cb: G) + where + G: Fn() -> Gut, + Gut: Future> + Send, + { let inner = self.0.clone(); - super::gc_run_loop(self, config, move || { - let inner = inner.clone(); - async move { - inner.gc_start().await?; - Ok(()) - } - }) + super::gc_run_loop( + self, + config, + move || { + let inner = inner.clone(); + async move { + inner.gc_start().await?; + Ok(()) + } + }, + protected_cb, + ) .await } diff --git a/iroh-blobs/src/store/mem.rs b/iroh-blobs/src/store/mem.rs index 8b0b109151..299f224772 100644 --- a/iroh-blobs/src/store/mem.rs +++ b/iroh-blobs/src/store/mem.rs @@ -10,7 +10,8 @@ use futures_lite::{Stream, StreamExt}; use iroh_base::hash::{BlobFormat, Hash, HashAndFormat}; use iroh_io::AsyncSliceReader; use std::{ - collections::BTreeMap, + collections::{BTreeMap, BTreeSet}, + future::Future, io, path::PathBuf, sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}, @@ -228,8 +229,12 @@ impl super::Store for Store { self.inner.temp_tag(tag) } - async fn gc_run(&self, config: super::GcConfig) { - super::gc_run_loop(self, config, move || async { Ok(()) }).await + async fn gc_run(&self, config: super::GcConfig, protected_cb: G) + where + G: Fn() -> Gut, + Gut: Future> + Send, + { + super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await } async fn delete(&self, hashes: Vec) -> io::Result<()> { diff --git a/iroh-blobs/src/store/readonly_mem.rs b/iroh-blobs/src/store/readonly_mem.rs index a4648633af..5fba947ab4 100644 --- a/iroh-blobs/src/store/readonly_mem.rs +++ b/iroh-blobs/src/store/readonly_mem.rs @@ -2,7 +2,8 @@ //! //! Main entry point is [Store]. use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap}, + future::Future, io, path::PathBuf, sync::Arc, @@ -324,8 +325,12 @@ impl super::Store for Store { TempTag::new(inner, None) } - async fn gc_run(&self, config: super::GcConfig) { - super::gc_run_loop(self, config, move || async { Ok(()) }).await + async fn gc_run(&self, config: super::GcConfig, protected_cb: G) + where + G: Fn() -> Gut, + Gut: Future> + Send, + { + super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await } async fn delete(&self, _hashes: Vec) -> io::Result<()> { diff --git a/iroh-blobs/src/store/traits.rs b/iroh-blobs/src/store/traits.rs index 185b84c1bd..300961b8b8 100644 --- a/iroh-blobs/src/store/traits.rs +++ b/iroh-blobs/src/store/traits.rs @@ -359,7 +359,10 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug { /// Start the GC loop /// /// The gc task will shut down, when dropping the returned future. - fn gc_run(&self, config: GcConfig) -> impl Future; + fn gc_run(&self, config: super::GcConfig, protected_cb: G) -> impl Future + where + G: Fn() -> Gut, + Gut: Future> + Send; /// physically delete the given hashes from the store. fn delete(&self, hashes: Vec) -> impl Future> + Send; @@ -522,16 +525,22 @@ pub struct GcConfig { /// The period at which to execute the GC. pub period: Duration, /// An optional callback called everytime a GC round finishes. - #[debug("callback")] + #[debug("done_callback")] pub done_callback: Option>, } /// Implementation of the gc loop. -pub(super) async fn gc_run_loop(store: &S, config: GcConfig, start_cb: F) -where +pub(super) async fn gc_run_loop( + store: &S, + config: GcConfig, + start_cb: F, + protected_cb: G, +) where S: Store, F: Fn() -> Fut, Fut: Future> + Send + 'static, + G: Fn() -> Gut, + Gut: Future> + Send, { tracing::info!("Starting GC task with interval {:?}", config.period); let mut live = BTreeSet::new(); @@ -545,6 +554,9 @@ where tracing::debug!("Starting GC"); live.clear(); + let p = protected_cb().await; + live.extend(p); + tracing::debug!("Starting GC mark phase"); let live_ref = &mut live; let mut stream = Gen::new(|co| async move { diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 7702791795..087bd0cac5 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -346,14 +346,41 @@ impl NodeInner { if let GcPolicy::Interval(gc_period) = gc_policy { let inner = self.clone(); let handle = local_pool.spawn(move || async move { - // TODO: add protection for hashes from documents. - + let inner2 = inner.clone(); inner .db - .gc_run(iroh_blobs::store::GcConfig { - period: gc_period, - done_callback: gc_done_callback, - }) + .gc_run( + iroh_blobs::store::GcConfig { + period: gc_period, + done_callback: gc_done_callback, + }, + move || { + let inner2 = inner2.clone(); + async move { + let mut live = BTreeSet::default(); + if let Some(docs) = &inner2.docs { + let doc_hashes = match docs.sync.content_hashes().await { + Ok(hashes) => hashes, + Err(err) => { + tracing::warn!("Error getting doc hashes: {}", err); + return live; + } + }; + for hash in doc_hashes { + match hash { + Ok(hash) => { + live.insert(hash); + } + Err(err) => { + tracing::error!("Error getting doc hash: {}", err); + } + } + } + } + live + } + }, + ) .await; }); // We cannot spawn tasks that run on the local pool directly into the join set,