Skip to content

Commit

Permalink
protect docs
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Sep 3, 2024
1 parent e08c08e commit 7ef5914
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 24 deletions.
26 changes: 18 additions & 8 deletions iroh-blobs/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
//! errors when communicating with the actor.
use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
io,
path::{Path, PathBuf},
sync::{Arc, RwLock},
Expand Down Expand Up @@ -1374,15 +1375,24 @@ impl super::Store for Store {
Ok(self.0.delete(hashes).await?)
}

async fn gc_run(&self, config: super::GcConfig) {
async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
where
G: Fn() -> Gut,
Gut: Future<Output = BTreeSet<Hash>> + Send,
{
let inner = self.0.clone();
super::gc_run_loop(self, config, move || {
let inner = inner.clone();
async move {
inner.gc_start().await?;
Ok(())
}
})
super::gc_run_loop(
self,
config,
move || {
let inner = inner.clone();
async move {
inner.gc_start().await?;
Ok(())
}
},
protected_cb,
)
.await
}

Expand Down
11 changes: 8 additions & 3 deletions iroh-blobs/src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use futures_lite::{Stream, StreamExt};
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_io::AsyncSliceReader;
use std::{
collections::BTreeMap,
collections::{BTreeMap, BTreeSet},
future::Future,
io,
path::PathBuf,
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
Expand Down Expand Up @@ -228,8 +229,12 @@ impl super::Store for Store {
self.inner.temp_tag(tag)
}

async fn gc_run(&self, config: super::GcConfig) {
super::gc_run_loop(self, config, move || async { Ok(()) }).await
async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
where
G: Fn() -> Gut,
Gut: Future<Output = BTreeSet<Hash>> + Send,
{
super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await
}

async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
Expand Down
11 changes: 8 additions & 3 deletions iroh-blobs/src/store/readonly_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
//!
//! Main entry point is [Store].
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, BTreeSet, HashMap},
future::Future,
io,
path::PathBuf,
sync::Arc,
Expand Down Expand Up @@ -324,8 +325,12 @@ impl super::Store for Store {
TempTag::new(inner, None)
}

async fn gc_run(&self, config: super::GcConfig) {
super::gc_run_loop(self, config, move || async { Ok(()) }).await
async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
where
G: Fn() -> Gut,
Gut: Future<Output = BTreeSet<Hash>> + Send,
{
super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await
}

async fn delete(&self, _hashes: Vec<Hash>) -> io::Result<()> {
Expand Down
20 changes: 16 additions & 4 deletions iroh-blobs/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,10 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
/// Start the GC loop
///
/// The gc task will shut down, when dropping the returned future.
fn gc_run(&self, config: GcConfig) -> impl Future<Output = ()>;
fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G) -> impl Future<Output = ()>
where
G: Fn() -> Gut,
Gut: Future<Output = BTreeSet<Hash>> + Send;

/// physically delete the given hashes from the store.
fn delete(&self, hashes: Vec<Hash>) -> impl Future<Output = io::Result<()>> + Send;
Expand Down Expand Up @@ -522,16 +525,22 @@ pub struct GcConfig {
/// The period at which to execute the GC.
pub period: Duration,
/// An optional callback called everytime a GC round finishes.
#[debug("callback")]
#[debug("done_callback")]
pub done_callback: Option<Box<dyn Fn() + Send>>,
}

/// Implementation of the gc loop.
pub(super) async fn gc_run_loop<S, F, Fut>(store: &S, config: GcConfig, start_cb: F)
where
pub(super) async fn gc_run_loop<S, F, Fut, G, Gut>(
store: &S,
config: GcConfig,
start_cb: F,
protected_cb: G,
) where
S: Store,
F: Fn() -> Fut,
Fut: Future<Output = io::Result<()>> + Send + 'static,
G: Fn() -> Gut,
Gut: Future<Output = BTreeSet<Hash>> + Send,
{
tracing::info!("Starting GC task with interval {:?}", config.period);
let mut live = BTreeSet::new();
Expand All @@ -545,6 +554,9 @@ where
tracing::debug!("Starting GC");
live.clear();

let p = protected_cb().await;
live.extend(p);

tracing::debug!("Starting GC mark phase");
let live_ref = &mut live;
let mut stream = Gen::new(|co| async move {
Expand Down
39 changes: 33 additions & 6 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,14 +346,41 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
if let GcPolicy::Interval(gc_period) = gc_policy {
let inner = self.clone();
let handle = local_pool.spawn(move || async move {
// TODO: add protection for hashes from documents.

let inner2 = inner.clone();
inner
.db
.gc_run(iroh_blobs::store::GcConfig {
period: gc_period,
done_callback: gc_done_callback,
})
.gc_run(
iroh_blobs::store::GcConfig {
period: gc_period,
done_callback: gc_done_callback,
},
move || {
let inner2 = inner2.clone();
async move {
let mut live = BTreeSet::default();
if let Some(docs) = &inner2.docs {
let doc_hashes = match docs.sync.content_hashes().await {
Ok(hashes) => hashes,
Err(err) => {
tracing::warn!("Error getting doc hashes: {}", err);
return live;
}
};
for hash in doc_hashes {
match hash {
Ok(hash) => {
live.insert(hash);
}
Err(err) => {
tracing::error!("Error getting doc hash: {}", err);
}
}
}
}
live
}
},
)
.await;
});
// We cannot spawn tasks that run on the local pool directly into the join set,
Expand Down

0 comments on commit 7ef5914

Please sign in to comment.