From 648b18cef1c081059b0c9dd6805b02c4d4ac6887 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Tue, 3 Sep 2024 15:25:16 +0200 Subject: [PATCH] actually fix the issue --- iroh-blobs/src/store/fs.rs | 138 +++++++++++++++++++++++++++++---- iroh-blobs/src/store/traits.rs | 2 +- 2 files changed, 122 insertions(+), 18 deletions(-) diff --git a/iroh-blobs/src/store/fs.rs b/iroh-blobs/src/store/fs.rs index 72b128ef47..ec4ba15ee0 100644 --- a/iroh-blobs/src/store/fs.rs +++ b/iroh-blobs/src/store/fs.rs @@ -79,6 +79,7 @@ use bao_tree::io::{ use bytes::Bytes; use futures_lite::{Stream, StreamExt}; +use genawaiter::rc::{Co, Gen}; use iroh_base::hash::{BlobFormat, Hash, HashAndFormat}; use iroh_io::AsyncSliceReader; use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError}; @@ -103,6 +104,7 @@ use crate::{ tables::BaoFilePart, util::{overwrite_and_sync, read_and_remove}, }, + GcMarkEvent, GcSweepEvent, }, util::{ compute_outboard, @@ -123,7 +125,7 @@ use self::test_support::EntryData; use super::{ bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb}, temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode, - ExportProgressCb, ImportMode, ImportProgress, Map, TempCounterMap, + ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap, }; /// Location of the data. @@ -628,6 +630,11 @@ pub(crate) enum ActorMessage { hashes: Vec, tx: oneshot::Sender>, }, + /// Modification method: try to delete the data for a number of hashes + GcDelete { + hashes: Vec, + tx: oneshot::Sender>, + }, /// Sync the entire database to disk. /// /// This just makes sure that there is no write transaction open. @@ -671,7 +678,8 @@ impl ActorMessage { | Self::SetTag { .. } | Self::CreateTag { .. } | Self::SetFullEntryState { .. } - | Self::Delete { .. } => MessageCategory::ReadWrite, + | Self::Delete { .. } + | Self::GcDelete { .. } => MessageCategory::ReadWrite, Self::UpdateInlineOptions { .. } | Self::Sync { .. } | Self::Shutdown { .. } @@ -888,6 +896,12 @@ impl StoreInner { Ok(rx.await??) } + async fn gc_delete(&self, hashes: Vec) -> OuterResult<()> { + let (tx, rx) = oneshot::channel(); + self.tx.send(ActorMessage::GcDelete { hashes, tx }).await?; + Ok(rx.await??) + } + async fn gc_start(&self) -> OuterResult<()> { let (tx, rx) = oneshot::channel(); self.tx.send(ActorMessage::GcStart { tx }).await?; @@ -1380,20 +1394,71 @@ impl super::Store for Store { G: Fn() -> Gut, Gut: Future> + Send, { - let inner = self.0.clone(); - super::gc_run_loop( - self, - config, - move || { - let inner = inner.clone(); - async move { - inner.gc_start().await?; - Ok(()) + tracing::info!("Starting GC task with interval {:?}", config.period); + let mut live = BTreeSet::new(); + 'outer: loop { + if let Err(cause) = self.0.gc_start().await { + tracing::debug!( + "unable to notify the db of GC start: {cause}. Shutting down GC loop." + ); + break; + } + // do delay before the two phases of GC + tokio::time::sleep(config.period).await; + tracing::debug!("Starting GC"); + live.clear(); + + let p = protected_cb().await; + live.extend(p); + + tracing::debug!("Starting GC mark phase"); + let live_ref = &mut live; + let mut stream = Gen::new(|co| async move { + if let Err(e) = super::gc_mark_task(self, live_ref, &co).await { + co.yield_(GcMarkEvent::Error(e)).await; } - }, - protected_cb, - ) - .await + }); + while let Some(item) = stream.next().await { + match item { + GcMarkEvent::CustomDebug(text) => { + tracing::debug!("{}", text); + } + GcMarkEvent::CustomWarning(text, _) => { + tracing::warn!("{}", text); + } + GcMarkEvent::Error(err) => { + tracing::error!("Fatal error during GC mark {}", err); + continue 'outer; + } + } + } + drop(stream); + + tracing::debug!("Starting GC sweep phase"); + let live_ref = &live; + let mut stream = Gen::new(|co| async move { + if let Err(e) = gc_sweep_task(self, live_ref, &co).await { + co.yield_(GcSweepEvent::Error(e)).await; + } + }); + while let Some(item) = stream.next().await { + match item { + GcSweepEvent::CustomDebug(text) => { + tracing::debug!("{}", text); + } + GcSweepEvent::CustomWarning(text, _) => { + tracing::warn!("{}", text); + } + GcSweepEvent::Error(err) => { + tracing::error!("Fatal error during GC mark {}", err); + continue 'outer; + } + } + } + if let Some(ref cb) = config.done_callback { + cb(); + } + } } fn temp_tag(&self, value: HashAndFormat) -> TempTag { @@ -1409,6 +1474,36 @@ impl super::Store for Store { } } +pub(super) async fn gc_sweep_task<'a>( + store: &'a Store, + live: &BTreeSet, + co: &Co, +) -> anyhow::Result<()> { + let blobs = store.blobs().await?.chain(store.partial_blobs().await?); + let mut count = 0; + let mut batch = Vec::new(); + for hash in blobs { + let hash = hash?; + if !live.contains(&hash) { + batch.push(hash); + count += 1; + } + if batch.len() >= 100 { + store.0.gc_delete(batch.clone()).await?; + batch.clear(); + } + } + if !batch.is_empty() { + store.0.gc_delete(batch).await?; + } + co.yield_(GcSweepEvent::CustomDebug(format!( + "deleted {} blobs", + count + ))) + .await; + Ok(()) +} + impl Actor { fn new( path: &Path, @@ -2050,11 +2145,16 @@ impl ActorState { Ok(()) } - fn delete(&mut self, tables: &mut Tables, hashes: Vec) -> ActorResult<()> { + fn delete(&mut self, tables: &mut Tables, hashes: Vec, force: bool) -> ActorResult<()> { for hash in hashes { if self.temp.as_ref().read().unwrap().contains(&hash) { continue; } + if !force && self.protected.contains(&hash) { + tracing::debug!("protected hash, continuing {}", &hash.to_hex()[..8]); + continue; + } + tracing::debug!("deleting {}", &hash.to_hex()[..8]); let handle = self.handles.remove(&hash); let entry = tables.blobs.remove(hash)?; @@ -2271,7 +2371,11 @@ impl ActorState { tx.send(res).ok(); } ActorMessage::Delete { hashes, tx } => { - let res = self.delete(tables, hashes); + let res = self.delete(tables, hashes, true); + tx.send(res).ok(); + } + ActorMessage::GcDelete { hashes, tx } => { + let res = self.delete(tables, hashes, false); tx.send(res).ok(); } ActorMessage::OnComplete { handle } => { diff --git a/iroh-blobs/src/store/traits.rs b/iroh-blobs/src/store/traits.rs index 300961b8b8..85f617468c 100644 --- a/iroh-blobs/src/store/traits.rs +++ b/iroh-blobs/src/store/traits.rs @@ -608,7 +608,7 @@ pub(super) async fn gc_run_loop( } /// Implementation of the gc method. -async fn gc_mark_task<'a>( +pub(super) async fn gc_mark_task<'a>( store: &'a impl Store, live: &'a mut BTreeSet, co: &Co,