Skip to content

Commit

Permalink
Merge pull request #10387 from filecoin-project/feat/lotus-badger-gc
Browse files Browse the repository at this point in the history
feat:splitstore:Badger GC of hotstore command
  • Loading branch information
magik6k committed Mar 7, 2023
2 parents b88ecb7 + a2d3315 commit 64b9b53
Show file tree
Hide file tree
Showing 17 changed files with 286 additions and 17 deletions.
12 changes: 11 additions & 1 deletion api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,14 @@ type FullNode interface {
// nodes.
ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) error //perm:admin

// ChainPrune prunes the stored chain state and garbage collects; only supported if you
// ChainPrune forces compaction on cold store and garbage collects; only supported if you
// are using the splitstore
ChainPrune(ctx context.Context, opts PruneOpts) error //perm:admin

// ChainHotGC does online (badger) GC on the hot store; only supported if you are using
// the splitstore
ChainHotGC(ctx context.Context, opts HotGCOpts) error //perm:admin

// ChainCheckBlockstore performs an (asynchronous) health check on the chain/state blockstore
// if supported by the underlying implementation.
ChainCheckBlockstore(context.Context) error //perm:admin
Expand Down Expand Up @@ -1354,6 +1358,12 @@ type PruneOpts struct {
RetainState int64
}

type HotGCOpts struct {
Threshold float64
Periodic bool
Moving bool
}

type EthTxReceipt struct {
TransactionHash ethtypes.EthHash `json:"transactionHash"`
TransactionIndex ethtypes.EthUint64 `json:"transactionIndex"`
Expand Down
14 changes: 14 additions & 0 deletions api/mocks/mock_full.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 55 additions & 5 deletions blockstore/badger/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
pool "github.com/libp2p/go-buffer-pool"
"github.com/multiformats/go-base32"
"go.uber.org/zap"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/blockstore"
)
Expand All @@ -44,7 +45,8 @@ const (
// MemoryMap is equivalent to badger/options.MemoryMap.
MemoryMap = options.MemoryMap
// LoadToRAM is equivalent to badger/options.LoadToRAM.
LoadToRAM = options.LoadToRAM
LoadToRAM = options.LoadToRAM
defaultGCThreshold = 0.125
)

// Options embeds the badger options themselves, and augments them with
Expand Down Expand Up @@ -439,7 +441,7 @@ func (b *Blockstore) deleteDB(path string) {
}
}

func (b *Blockstore) onlineGC() error {
func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error {
b.lockDB()
defer b.unlockDB()

Expand All @@ -448,14 +450,22 @@ func (b *Blockstore) onlineGC() error {
if nworkers < 2 {
nworkers = 2
}
if nworkers > 7 { // max out at 1 goroutine per badger level
nworkers = 7
}

err := b.db.Flatten(nworkers)
if err != nil {
return err
}

for err == nil {
err = b.db.RunValueLogGC(0.125)
select {
case <-ctx.Done():
err = ctx.Err()
default:
err = b.db.RunValueLogGC(threshold)
}
}

if err == badger.ErrNoRewrite {
Expand All @@ -468,7 +478,7 @@ func (b *Blockstore) onlineGC() error {

// CollectGarbage compacts and runs garbage collection on the value log;
// implements the BlockstoreGC trait
func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error {
func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error {
if err := b.access(); err != nil {
return err
}
Expand All @@ -485,8 +495,48 @@ func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error
if options.FullGC {
return b.movingGC()
}
threshold := options.Threshold
if threshold == 0 {
threshold = defaultGCThreshold
}
return b.onlineGC(ctx, threshold)
}

// GCOnce runs garbage collection on the value log;
// implements BlockstoreGCOnce trait
func (b *Blockstore) GCOnce(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error {
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()

return b.onlineGC()
var options blockstore.BlockstoreGCOptions
for _, opt := range opts {
err := opt(&options)
if err != nil {
return err
}
}
if options.FullGC {
return xerrors.Errorf("FullGC option specified for GCOnce but full GC is non incremental")
}

threshold := options.Threshold
if threshold == 0 {
threshold = defaultGCThreshold
}

b.lockDB()
defer b.unlockDB()

// Note no compaction needed before single GC as we will hit at most one vlog anyway
err := b.db.RunValueLogGC(threshold)
if err == badger.ErrNoRewrite {
// not really an error in this case, it signals the end of GC
return nil
}

return err
}

// Size returns the aggregate size of the blockstore
Expand Down
4 changes: 2 additions & 2 deletions blockstore/badger/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
return nil
})
g.Go(func() error {
return db.CollectGarbage(blockstore.WithFullGC(true))
return db.CollectGarbage(ctx, blockstore.WithFullGC(true))
})

err = g.Wait()
Expand Down Expand Up @@ -230,7 +230,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
checkPath()

// now do another FullGC to test the double move and following of symlinks
if err := db.CollectGarbage(blockstore.WithFullGC(true)); err != nil {
if err := db.CollectGarbage(ctx, blockstore.WithFullGC(true)); err != nil {
t.Fatal(err)
}

Expand Down
16 changes: 15 additions & 1 deletion blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ type BlockstoreIterator interface {

// BlockstoreGC is a trait for blockstores that support online garbage collection
type BlockstoreGC interface {
CollectGarbage(options ...BlockstoreGCOption) error
CollectGarbage(ctx context.Context, options ...BlockstoreGCOption) error
}

// BlockstoreGCOnce is a trait for a blockstore that supports incremental online garbage collection
type BlockstoreGCOnce interface {
GCOnce(ctx context.Context, options ...BlockstoreGCOption) error
}

// BlockstoreGCOption is a functional interface for controlling blockstore GC options
Expand All @@ -45,6 +50,8 @@ type BlockstoreGCOption = func(*BlockstoreGCOptions) error
// BlockstoreGCOptions is a struct with GC options
type BlockstoreGCOptions struct {
FullGC bool
// fraction of garbage in badger vlog before its worth processing in online GC
Threshold float64
}

func WithFullGC(fullgc bool) BlockstoreGCOption {
Expand All @@ -54,6 +61,13 @@ func WithFullGC(fullgc bool) BlockstoreGCOption {
}
}

func WithThreshold(threshold float64) BlockstoreGCOption {
return func(opts *BlockstoreGCOptions) error {
opts.Threshold = threshold
return nil
}
}

// BlockstoreSize is a trait for on-disk blockstores that can report their size
type BlockstoreSize interface {
Size() (int64, error)
Expand Down
2 changes: 1 addition & 1 deletion blockstore/splitstore/splitstore_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {

// we are done; do some housekeeping
s.endTxnProtect()
s.gcHotstore()
s.gcHotAfterCompaction()

err = s.setBaseEpoch(boundaryEpoch)
if err != nil {
Expand Down
20 changes: 18 additions & 2 deletions blockstore/splitstore/splitstore_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
bstore "github.com/filecoin-project/lotus/blockstore"
)

func (s *SplitStore) gcHotstore() {
func (s *SplitStore) gcHotAfterCompaction() {
var opts []bstore.BlockstoreGCOption
if s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0 {
opts = append(opts, bstore.WithFullGC(true))
Expand All @@ -23,7 +23,7 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG
log.Info("garbage collecting blockstore")
startGC := time.Now()

if err := gc.CollectGarbage(opts...); err != nil {
if err := gc.CollectGarbage(s.ctx, opts...); err != nil {
return err
}

Expand All @@ -33,3 +33,19 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG

return fmt.Errorf("blockstore doesn't support garbage collection: %T", b)
}

func (s *SplitStore) gcBlockstoreOnce(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {
if gc, ok := b.(bstore.BlockstoreGCOnce); ok {
log.Debug("gc blockstore once")
startGC := time.Now()

if err := gc.GCOnce(s.ctx, opts...); err != nil {
return err
}

log.Debugw("gc blockstore once done", "took", time.Since(startGC))
return nil
}

return fmt.Errorf("blockstore doesn't support gc once: %T", b)
}
17 changes: 17 additions & 0 deletions blockstore/splitstore/splitstore_prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,23 @@ var (
PruneThreshold = 7 * build.Finality
)

// GCHotstore runs online GC on the chain state in the hotstore according the to options specified
func (s *SplitStore) GCHotStore(opts api.HotGCOpts) error {
if opts.Moving {
gcOpts := []bstore.BlockstoreGCOption{bstore.WithFullGC(true)}
return s.gcBlockstore(s.hot, gcOpts)
}

gcOpts := []bstore.BlockstoreGCOption{bstore.WithThreshold(opts.Threshold)}
var err error
if opts.Periodic {
err = s.gcBlockstore(s.hot, gcOpts)
} else {
err = s.gcBlockstoreOnce(s.hot, gcOpts)
}
return err
}

// PruneChain instructs the SplitStore to prune chain state in the coldstore, according to the
// options specified.
func (s *SplitStore) PruneChain(opts api.PruneOpts) error {
Expand Down
Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/gateway.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
59 changes: 58 additions & 1 deletion cli/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1591,7 +1591,64 @@ func createExportFile(app *cli.App, path string) (io.WriteCloser, error) {

var ChainPruneCmd = &cli.Command{
Name: "prune",
Usage: "prune the stored chain state and perform garbage collection",
Usage: "splitstore gc",
Subcommands: []*cli.Command{
chainPruneColdCmd,
chainPruneHotGCCmd,
chainPruneHotMovingGCCmd,
},
}

var chainPruneHotGCCmd = &cli.Command{
Name: "hot",
Usage: "run online (badger vlog) garbage collection on hotstore",
Flags: []cli.Flag{
&cli.Float64Flag{Name: "threshold", Value: 0.01, Usage: "Threshold of vlog garbage for gc"},
&cli.BoolFlag{Name: "periodic", Value: false, Usage: "Run periodic gc over multiple vlogs. Otherwise run gc once"},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
opts := lapi.HotGCOpts{}
opts.Periodic = cctx.Bool("periodic")
opts.Threshold = cctx.Float64("threshold")

gcStart := time.Now()
err = api.ChainHotGC(ctx, opts)
gcTime := time.Since(gcStart)
fmt.Printf("Online GC took %v (periodic <%t> threshold <%f>)", gcTime, opts.Periodic, opts.Threshold)
return err
},
}

var chainPruneHotMovingGCCmd = &cli.Command{
Name: "hot-moving",
Usage: "run moving gc on hotstore",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
opts := lapi.HotGCOpts{}
opts.Moving = true

gcStart := time.Now()
err = api.ChainHotGC(ctx, opts)
gcTime := time.Since(gcStart)
fmt.Printf("Moving GC took %v", gcTime)
return err
},
}

var chainPruneColdCmd = &cli.Command{
Name: "compact-cold",
Usage: "force splitstore compaction on cold store state and run gc",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "online-gc",
Expand Down
Loading

0 comments on commit 64b9b53

Please sign in to comment.