Skip to content

Commit

Permalink
actually fix the issue
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Sep 3, 2024
1 parent 7ef5914 commit 648b18c
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 18 deletions.
138 changes: 121 additions & 17 deletions iroh-blobs/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ use bao_tree::io::{
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};

use genawaiter::rc::{Co, Gen};
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_io::AsyncSliceReader;
use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
Expand All @@ -103,6 +104,7 @@ use crate::{
tables::BaoFilePart,
util::{overwrite_and_sync, read_and_remove},
},
GcMarkEvent, GcSweepEvent,
},
util::{
compute_outboard,
Expand All @@ -123,7 +125,7 @@ use self::test_support::EntryData;
use super::{
bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
ExportProgressCb, ImportMode, ImportProgress, Map, TempCounterMap,
ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
};

/// Location of the data.
Expand Down Expand Up @@ -628,6 +630,11 @@ pub(crate) enum ActorMessage {
hashes: Vec<Hash>,
tx: oneshot::Sender<ActorResult<()>>,
},
/// Modification method: try to delete the data for a number of hashes
GcDelete {
hashes: Vec<Hash>,
tx: oneshot::Sender<ActorResult<()>>,
},
/// Sync the entire database to disk.
///
/// This just makes sure that there is no write transaction open.
Expand Down Expand Up @@ -671,7 +678,8 @@ impl ActorMessage {
| Self::SetTag { .. }
| Self::CreateTag { .. }
| Self::SetFullEntryState { .. }
| Self::Delete { .. } => MessageCategory::ReadWrite,
| Self::Delete { .. }
| Self::GcDelete { .. } => MessageCategory::ReadWrite,
Self::UpdateInlineOptions { .. }
| Self::Sync { .. }
| Self::Shutdown { .. }
Expand Down Expand Up @@ -888,6 +896,12 @@ impl StoreInner {
Ok(rx.await??)
}

async fn gc_delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
let (tx, rx) = oneshot::channel();
self.tx.send(ActorMessage::GcDelete { hashes, tx }).await?;
Ok(rx.await??)
}

async fn gc_start(&self) -> OuterResult<()> {
let (tx, rx) = oneshot::channel();
self.tx.send(ActorMessage::GcStart { tx }).await?;
Expand Down Expand Up @@ -1380,20 +1394,71 @@ impl super::Store for Store {
G: Fn() -> Gut,
Gut: Future<Output = BTreeSet<Hash>> + Send,
{
let inner = self.0.clone();
super::gc_run_loop(
self,
config,
move || {
let inner = inner.clone();
async move {
inner.gc_start().await?;
Ok(())
tracing::info!("Starting GC task with interval {:?}", config.period);
let mut live = BTreeSet::new();
'outer: loop {
if let Err(cause) = self.0.gc_start().await {
tracing::debug!(
"unable to notify the db of GC start: {cause}. Shutting down GC loop."
);
break;
}
// do delay before the two phases of GC
tokio::time::sleep(config.period).await;
tracing::debug!("Starting GC");
live.clear();

let p = protected_cb().await;
live.extend(p);

tracing::debug!("Starting GC mark phase");
let live_ref = &mut live;
let mut stream = Gen::new(|co| async move {
if let Err(e) = super::gc_mark_task(self, live_ref, &co).await {
co.yield_(GcMarkEvent::Error(e)).await;
}
},
protected_cb,
)
.await
});
while let Some(item) = stream.next().await {
match item {
GcMarkEvent::CustomDebug(text) => {
tracing::debug!("{}", text);
}
GcMarkEvent::CustomWarning(text, _) => {
tracing::warn!("{}", text);
}
GcMarkEvent::Error(err) => {
tracing::error!("Fatal error during GC mark {}", err);
continue 'outer;
}
}
}
drop(stream);

tracing::debug!("Starting GC sweep phase");
let live_ref = &live;
let mut stream = Gen::new(|co| async move {
if let Err(e) = gc_sweep_task(self, live_ref, &co).await {
co.yield_(GcSweepEvent::Error(e)).await;
}
});
while let Some(item) = stream.next().await {
match item {
GcSweepEvent::CustomDebug(text) => {
tracing::debug!("{}", text);
}
GcSweepEvent::CustomWarning(text, _) => {
tracing::warn!("{}", text);
}
GcSweepEvent::Error(err) => {
tracing::error!("Fatal error during GC mark {}", err);
continue 'outer;
}
}
}
if let Some(ref cb) = config.done_callback {
cb();
}
}
}

fn temp_tag(&self, value: HashAndFormat) -> TempTag {
Expand All @@ -1409,6 +1474,36 @@ impl super::Store for Store {
}
}

pub(super) async fn gc_sweep_task<'a>(
store: &'a Store,
live: &BTreeSet<Hash>,
co: &Co<GcSweepEvent>,
) -> anyhow::Result<()> {
let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
let mut count = 0;
let mut batch = Vec::new();
for hash in blobs {
let hash = hash?;
if !live.contains(&hash) {
batch.push(hash);
count += 1;
}
if batch.len() >= 100 {
store.0.gc_delete(batch.clone()).await?;
batch.clear();
}
}
if !batch.is_empty() {
store.0.gc_delete(batch).await?;
}
co.yield_(GcSweepEvent::CustomDebug(format!(
"deleted {} blobs",
count
)))
.await;
Ok(())
}

impl Actor {
fn new(
path: &Path,
Expand Down Expand Up @@ -2050,11 +2145,16 @@ impl ActorState {
Ok(())
}

fn delete(&mut self, tables: &mut Tables, hashes: Vec<Hash>) -> ActorResult<()> {
fn delete(&mut self, tables: &mut Tables, hashes: Vec<Hash>, force: bool) -> ActorResult<()> {
for hash in hashes {
if self.temp.as_ref().read().unwrap().contains(&hash) {
continue;
}
if !force && self.protected.contains(&hash) {
tracing::debug!("protected hash, continuing {}", &hash.to_hex()[..8]);
continue;
}

tracing::debug!("deleting {}", &hash.to_hex()[..8]);
let handle = self.handles.remove(&hash);
let entry = tables.blobs.remove(hash)?;
Expand Down Expand Up @@ -2271,7 +2371,11 @@ impl ActorState {
tx.send(res).ok();
}
ActorMessage::Delete { hashes, tx } => {
let res = self.delete(tables, hashes);
let res = self.delete(tables, hashes, true);
tx.send(res).ok();
}
ActorMessage::GcDelete { hashes, tx } => {
let res = self.delete(tables, hashes, false);
tx.send(res).ok();
}
ActorMessage::OnComplete { handle } => {
Expand Down
2 changes: 1 addition & 1 deletion iroh-blobs/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ pub(super) async fn gc_run_loop<S, F, Fut, G, Gut>(
}

/// Implementation of the gc method.
async fn gc_mark_task<'a>(
pub(super) async fn gc_mark_task<'a>(
store: &'a impl Store,
live: &'a mut BTreeSet<Hash>,
co: &Co<GcMarkEvent>,
Expand Down

0 comments on commit 648b18c

Please sign in to comment.