diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 1b52eb548d8..f61219e7288 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -396,6 +396,9 @@ func (b *Blockstore) doCopy(from, to *badger.DB) error { if workers < 2 { workers = 2 } + if workers > 8 { + workers = 8 + } stream := from.NewStream() stream.NumGo = workers @@ -441,7 +444,7 @@ func (b *Blockstore) deleteDB(path string) { } } -func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error { +func (b *Blockstore) onlineGC(ctx context.Context, threshold float64, checkFreq time.Duration, check func() error) error { b.lockDB() defer b.unlockDB() @@ -458,11 +461,15 @@ func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error { if err != nil { return err } - + checkTick := time.NewTimer(checkFreq) + defer checkTick.Stop() for err == nil { select { case <-ctx.Done(): err = ctx.Err() + case <-checkTick.C: + check() + checkTick.Reset(checkFreq) default: err = b.db.RunValueLogGC(threshold) } @@ -499,7 +506,17 @@ func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.Bloc if threshold == 0 { threshold = defaultGCThreshold } - return b.onlineGC(ctx, threshold) + checkFreq := options.CheckFreq + if checkFreq < 30*time.Second { // disallow checking more frequently than block time + checkFreq = 30 * time.Second + } + check := options.Check + if check == nil { + check = func() error { + return nil + } + } + return b.onlineGC(ctx, threshold, checkFreq, check) } // GCOnce runs garbage collection on the value log; diff --git a/blockstore/blockstore.go b/blockstore/blockstore.go index 8d16bbd509b..195e991e1f4 100644 --- a/blockstore/blockstore.go +++ b/blockstore/blockstore.go @@ -2,6 +2,7 @@ package blockstore import ( "context" + "time" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" @@ -57,6 +58,10 @@ type BlockstoreGCOptions struct { FullGC bool // fraction of garbage in badger vlog before its worth processing in online GC Threshold float64 + // how often to call the check function + CheckFreq time.Duration + // function to call periodically to pause or early terminate GC + Check func() error } func WithFullGC(fullgc bool) BlockstoreGCOption { @@ -73,6 +78,20 @@ func WithThreshold(threshold float64) BlockstoreGCOption { } } +func WithCheckFreq(f time.Duration) BlockstoreGCOption { + return func(opts *BlockstoreGCOptions) error { + opts.CheckFreq = f + return nil + } +} + +func WithCheck(check func() error) BlockstoreGCOption { + return func(opts *BlockstoreGCOptions) error { + opts.Check = check + return nil + } +} + // BlockstoreSize is a trait for on-disk blockstores that can report their size type BlockstoreSize interface { Size() (int64, error) diff --git a/blockstore/splitstore/splitstore_gc.go b/blockstore/splitstore/splitstore_gc.go index 486633e1585..8b154f574c3 100644 --- a/blockstore/splitstore/splitstore_gc.go +++ b/blockstore/splitstore/splitstore_gc.go @@ -72,6 +72,8 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG log.Info("garbage collecting blockstore") startGC := time.Now() + opts = append(opts, bstore.WithCheckFreq(90*time.Second)) + opts = append(opts, bstore.WithCheck(s.checkYield)) if err := gc.CollectGarbage(s.ctx, opts...); err != nil { return err }