diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 93e090ac876..ef717dc75d1 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "io/ioutil" "sync/atomic" "time" @@ -138,12 +139,20 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) { return nil, xerrors.Errorf("failed to get metadata datastore: %w", err) } - bds, err := lr.Datastore("/chain") + bs, err := lr.Blockstore(repo.BlockstoreChain) if err != nil { - return nil, xerrors.Errorf("failed to get blocks datastore: %w", err) + return nil, err } - bs := mybs{blockstore.NewBlockstore(bds)} + defer func() { + if c, ok := bs.(io.Closer); ok { + if err := c.Close(); err != nil { + log.Warnf("failed to close blockstore: %s", err) + } + } + }() + + bs = mybs{bs} ks, err := lr.KeyStore() if err != nil { diff --git a/chain/store/store.go b/chain/store/store.go index f9df20d4f24..e58669c14d6 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -44,10 +44,10 @@ import ( "github.com/ipfs/go-datastore/query" cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log/v2" - car "github.com/ipld/go-car" + "github.com/ipld/go-car" carutil "github.com/ipld/go-car/util" cbg "github.com/whyrusleeping/cbor-gen" - pubsub "github.com/whyrusleeping/pubsub" + "github.com/whyrusleeping/pubsub" "golang.org/x/xerrors" ) @@ -108,6 +108,8 @@ type ChainStore struct { localbs bstore.Blockstore ds dstore.Batching + localviewer bstore.Viewer + heaviestLk sync.Mutex heaviest *types.TipSet @@ -150,6 +152,10 @@ func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Ba journal: j, } + if v, ok := localbs.(bstore.Viewer); ok { + cs.localviewer = v + } + cs.evtTypes = [1]journal.EventType{ evtTypeHeadChange: j.RegisterEventType("sync", "head_change"), } @@ -365,6 +371,26 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS return nil } +// ForceHeadSilent forces a chain head tipset without triggering a reorg +// operation. +// +// CAUTION: Use it only for testing, such as to teleport the chain to a +// particular tipset to carry out a benchmark, verification, etc. on a chain +// segment. +func (cs *ChainStore) ForceHeadSilent(_ context.Context, ts *types.TipSet) error { + log.Warnf("(!!!) forcing a new head silently; only use this only for testing; new head: %s", ts) + + cs.heaviestLk.Lock() + defer cs.heaviestLk.Unlock() + cs.heaviest = ts + + err := cs.writeHead(ts) + if err != nil { + err = xerrors.Errorf("failed to write chain head: %s", err) + } + return err +} + type reorg struct { old *types.TipSet new *types.TipSet @@ -525,12 +551,20 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) { // GetBlock fetches a BlockHeader with the supplied CID. It returns // blockstore.ErrNotFound if the block was not found in the BlockStore. func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) { - sb, err := cs.localbs.Get(c) - if err != nil { - return nil, err + if cs.localviewer == nil { + sb, err := cs.localbs.Get(c) + if err != nil { + return nil, err + } + return types.DecodeBlock(sb.RawData()) } - return types.DecodeBlock(sb.RawData()) + var blk *types.BlockHeader + err := cs.localviewer.View(c, func(b []byte) (err error) { + blk, err = types.DecodeBlock(b) + return err + }) + return blk, err } func (cs *ChainStore) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) { @@ -775,12 +809,7 @@ func (cs *ChainStore) GetGenesis() (*types.BlockHeader, error) { return nil, err } - genb, err := cs.bs.Get(c) - if err != nil { - return nil, err - } - - return types.DecodeBlock(genb.RawData()) + return cs.GetBlock(c) } func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) { @@ -796,23 +825,39 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) { } func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) { - sb, err := cs.localbs.Get(c) - if err != nil { - log.Errorf("get message get failed: %s: %s", c, err) - return nil, err + if cs.localviewer == nil { + sb, err := cs.localbs.Get(c) + if err != nil { + log.Errorf("get message get failed: %s: %s", c, err) + return nil, err + } + return types.DecodeMessage(sb.RawData()) } - return types.DecodeMessage(sb.RawData()) + var msg *types.Message + err := cs.localviewer.View(c, func(b []byte) (err error) { + msg, err = types.DecodeMessage(b) + return err + }) + return msg, err } func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) { - sb, err := cs.localbs.Get(c) - if err != nil { - log.Errorf("get message get failed: %s: %s", c, err) - return nil, err + if cs.localviewer == nil { + sb, err := cs.localbs.Get(c) + if err != nil { + log.Errorf("get message get failed: %s: %s", c, err) + return nil, err + } + return types.DecodeSignedMessage(sb.RawData()) } - return types.DecodeSignedMessage(sb.RawData()) + var msg *types.SignedMessage + err := cs.localviewer.View(c, func(b []byte) (err error) { + msg, err = types.DecodeSignedMessage(b) + return err + }) + return msg, err } func (cs *ChainStore) readAMTCids(root cid.Cid) ([]cid.Cid, error) { diff --git a/chain/store/store_test.go b/chain/store/store_test.go index 61ff9862052..bef06602403 100644 --- a/chain/store/store_test.go +++ b/chain/store/store_test.go @@ -3,6 +3,7 @@ package store_test import ( "bytes" "context" + "io" "testing" datastore "github.com/ipfs/go-datastore" @@ -51,18 +52,24 @@ func BenchmarkGetRandomness(b *testing.B) { b.Fatal(err) } - bds, err := lr.Datastore("/chain") + bs, err := lr.Blockstore(repo.BlockstoreChain) if err != nil { b.Fatal(err) } + defer func() { + if c, ok := bs.(io.Closer); ok { + if err := c.Close(); err != nil { + b.Logf("WARN: failed to close blockstore: %s", err) + } + } + }() + mds, err := lr.Datastore("/metadata") if err != nil { b.Fatal(err) } - bs := blockstore.NewBlockstore(bds) - cs := store.NewChainStore(bs, bs, mds, nil, nil) b.ResetTimer() diff --git a/chain/vm/vm.go b/chain/vm/vm.go index 8b7f78074a0..fa28ce92847 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -70,6 +70,7 @@ func ResolveToKeyAddr(state types.StateTree, cst cbor.IpldStore, addr address.Ad } var _ cbor.IpldBlockstore = (*gasChargingBlocks)(nil) +var _ blockstore.Viewer = (*gasChargingBlocks)(nil) type gasChargingBlocks struct { chargeGas func(GasCharge) @@ -77,6 +78,24 @@ type gasChargingBlocks struct { under cbor.IpldBlockstore } +func (bs *gasChargingBlocks) View(c cid.Cid, cb func([]byte) error) error { + if v, ok := bs.under.(blockstore.Viewer); ok { + bs.chargeGas(bs.pricelist.OnIpldGet()) + return v.View(c, func(b []byte) error { + // we have successfully retrieved the value; charge for it, even if the user-provided function fails. + bs.chargeGas(newGasCharge("OnIpldViewEnd", 0, 0).WithExtra(len(b))) + bs.chargeGas(gasOnActorExec) + return cb(b) + }) + } + // the underlying blockstore doesn't implement the viewer interface, fall back to normal Get behaviour. + blk, err := bs.Get(c) + if err == nil && blk != nil { + return cb(blk.RawData()) + } + return err +} + func (bs *gasChargingBlocks) Get(c cid.Cid) (block.Block, error) { bs.chargeGas(bs.pricelist.OnIpldGet()) blk, err := bs.under.Get(c) @@ -130,10 +149,10 @@ func (vm *VM) makeRuntime(ctx context.Context, msg *types.Message, parent *Runti rt.Abortf(exitcode.SysErrForbidden, "message execution exceeds call depth") } - rt.cst = &cbor.BasicIpldStore{ - Blocks: &gasChargingBlocks{rt.chargeGasFunc(2), rt.pricelist, vm.cst.Blocks}, - Atlas: vm.cst.Atlas, - } + cbb := &gasChargingBlocks{rt.chargeGasFunc(2), rt.pricelist, vm.cst.Blocks} + cst := cbor.NewCborStore(cbb) + cst.Atlas = vm.cst.Atlas // associate the atlas. + rt.cst = cst vmm := *msg resF, ok := rt.ResolveAddress(msg.From) diff --git a/cmd/lotus-bench/import.go b/cmd/lotus-bench/import.go index bb7baf2b80f..c99bed15896 100644 --- a/cmd/lotus-bench/import.go +++ b/cmd/lotus-bench/import.go @@ -16,21 +16,30 @@ import ( "sort" "time" + ocprom "contrib.go.opencensus.io/exporter/prometheus" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/bloom" + "github.com/ipfs/go-cid" + metricsi "github.com/ipfs/go-metrics-interface" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" + lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/blockstore" + badgerbs "github.com/filecoin-project/lotus/lib/blockstore/badger" _ "github.com/filecoin-project/lotus/lib/sigs/bls" _ "github.com/filecoin-project/lotus/lib/sigs/secp" + "github.com/filecoin-project/lotus/node/repo" + + "github.com/filecoin-project/go-state-types/abi" metricsprometheus "github.com/ipfs/go-metrics-prometheus" "github.com/ipld/go-car" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" bdg "github.com/dgraph-io/badger/v2" @@ -56,9 +65,25 @@ var importBenchCmd = &cli.Command{ importAnalyzeCmd, }, Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "start-tipset", + Usage: "start validation at the given tipset key; in format cid1,cid2,cid3...", + }, + &cli.StringFlag{ + Name: "end-tipset", + Usage: "halt validation at the given tipset key; in format cid1,cid2,cid3...", + }, + &cli.StringFlag{ + Name: "genesis-tipset", + Usage: "genesis tipset key; in format cid1,cid2,cid3...", + }, + &cli.Int64Flag{ + Name: "start-height", + Usage: "start validation at given height; beware that chain traversal by height is very slow", + }, &cli.Int64Flag{ - Name: "height", - Usage: "halt validation after given height", + Name: "end-height", + Usage: "halt validation after given height; beware that chain traversal by height is very slow", }, &cli.IntFlag{ Name: "batch-seal-verify-threads", @@ -86,32 +111,52 @@ var importBenchCmd = &cli.Command{ Name: "global-profile", Value: true, }, - &cli.Int64Flag{ - Name: "start-at", - }, &cli.BoolFlag{ Name: "only-import", }, &cli.BoolFlag{ Name: "use-pebble", }, + &cli.BoolFlag{ + Name: "use-native-badger", + }, + &cli.StringFlag{ + Name: "car", + Usage: "path to CAR file; required for import; on validation, either " + + "a CAR path or the --head flag are required", + }, + &cli.StringFlag{ + Name: "head", + Usage: "tipset key of the head, useful when benchmarking validation " + + "on an existing chain store, where a CAR is not available; " + + "if both --car and --head are provided, --head takes precedence " + + "over the CAR root; the format is cid1,cid2,cid3...", + }, }, Action: func(cctx *cli.Context) error { metricsprometheus.Inject() //nolint:errcheck vm.BatchSealVerifyParallelism = cctx.Int("batch-seal-verify-threads") - if !cctx.Args().Present() { - fmt.Println("must pass car file of chain to benchmark importing") - return nil - } - - cfi, err := os.Open(cctx.Args().First()) - if err != nil { - return err - } - defer cfi.Close() //nolint:errcheck // read only file go func() { - http.Handle("/debug/metrics/prometheus", promhttp.Handler()) + // Prometheus globals are exposed as interfaces, but the prometheus + // OpenCensus exporter expects a concrete *Registry. The concrete type of + // the globals are actually *Registry, so we downcast them, staying + // defensive in case things change under the hood. + registry, ok := prometheus.DefaultRegisterer.(*prometheus.Registry) + if !ok { + log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", prometheus.DefaultRegisterer) + return + } + exporter, err := ocprom.NewExporter(ocprom.Options{ + Registry: registry, + Namespace: "lotus", + }) + if err != nil { + log.Fatalf("could not create the prometheus stats exporter: %v", err) + } + + http.Handle("/debug/metrics", exporter) + http.ListenAndServe("localhost:6060", nil) //nolint:errcheck }() @@ -126,17 +171,17 @@ var importBenchCmd = &cli.Command{ tdir = tmp } - bdgOpt := badger.DefaultOptions - bdgOpt.GcInterval = 0 - bdgOpt.Options = bdg.DefaultOptions("") - bdgOpt.Options.SyncWrites = false - bdgOpt.Options.Truncate = true - bdgOpt.Options.DetectConflicts = false + var ( + ds datastore.Batching + bs blockstore.Blockstore + err error + ) - var bds datastore.Batching - if cctx.Bool("use-pebble") { + switch { + case cctx.Bool("use-pebble"): + log.Info("using pebble") cache := 512 - bds, err = pebbleds.NewDatastore(tdir, &pebble.Options{ + ds, err = pebbleds.NewDatastore(tdir, &pebble.Options{ // Pebble has a single combined cache area and the write // buffers are taken from this too. Assign all available // memory allowance for cache. @@ -155,30 +200,53 @@ var importBenchCmd = &cli.Command{ }, Logger: log, }) - } else { - bds, err = badger.NewDatastore(tdir, &bdgOpt) + + case cctx.Bool("use-native-badger"): + log.Info("using native badger") + var opts badgerbs.Options + if opts, err = repo.BadgerBlockstoreOptions(repo.BlockstoreChain, tdir, false); err != nil { + return err + } + opts.SyncWrites = false + bs, err = badgerbs.Open(opts) + + default: // legacy badger via datastore. + log.Info("using legacy badger") + bdgOpt := badger.DefaultOptions + bdgOpt.GcInterval = 0 + bdgOpt.Options = bdg.DefaultOptions("") + bdgOpt.Options.SyncWrites = false + bdgOpt.Options.Truncate = true + bdgOpt.Options.DetectConflicts = false + + ds, err = badger.NewDatastore(tdir, &bdgOpt) } + if err != nil { return err } - defer bds.Close() //nolint:errcheck - bds = measure.New("dsbench", bds) + if ds != nil { + ds = measure.New("dsbench", ds) + defer ds.Close() //nolint:errcheck + bs = blockstore.NewBlockstore(ds) + } - bs := blockstore.NewBlockstore(bds) + if c, ok := bs.(io.Closer); ok { + defer c.Close() //nolint:errcheck + } + + ctx := metricsi.CtxScope(context.Background(), "lotus") cacheOpts := blockstore.DefaultCacheOpts() cacheOpts.HasBloomFilterSize = 0 - - cbs, err := blockstore.CachedBlockstore(context.TODO(), bs, cacheOpts) + bs, err = blockstore.CachedBlockstore(ctx, bs, cacheOpts) if err != nil { return err } - bs = cbs - ds := datastore.NewMapDatastore() var verifier ffiwrapper.Verifier = ffiwrapper.ProofVerifier if cctx.IsSet("syscall-cache") { - scds, err := badger.NewDatastore(cctx.String("syscall-cache"), &bdgOpt) + scds, err := badger.NewDatastore(cctx.String("syscall-cache"), &badger.DefaultOptions) if err != nil { return xerrors.Errorf("opening syscall-cache datastore: %w", err) } @@ -193,29 +261,111 @@ var importBenchCmd = &cli.Command{ return nil } - cs := store.NewChainStore(bs, bs, ds, vm.Syscalls(verifier), nil) + metadataDs := datastore.NewMapDatastore() + cs := store.NewChainStore(bs, bs, metadataDs, vm.Syscalls(verifier), nil) stm := stmgr.NewStateManager(cs) - if cctx.Bool("global-profile") { - prof, err := os.Create("import-bench.prof") + startTime := time.Now() + + // register a gauge that reports how long since the measurable + // operation began. + promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "lotus_bench_time_taken_secs", + }, func() float64 { + return time.Since(startTime).Seconds() + }) + + defer func() { + end := time.Now().Format(time.RFC3339) + + resp, err := http.Get("http://localhost:6060/debug/metrics") if err != nil { - return err + log.Warnf("failed to scape prometheus: %s", err) } - defer prof.Close() //nolint:errcheck - if err := pprof.StartCPUProfile(prof); err != nil { - return err + metricsfi, err := os.Create("bench.metrics") + if err != nil { + log.Warnf("failed to write prometheus data: %s", err) + } + + _, _ = io.Copy(metricsfi, resp.Body) //nolint:errcheck + _ = metricsfi.Close() //nolint:errcheck + + writeProfile := func(name string) { + if file, err := os.Create(fmt.Sprintf("%s.%s.%s.pprof", name, startTime.Format(time.RFC3339), end)); err == nil { + if err := pprof.Lookup(name).WriteTo(file, 0); err != nil { + log.Warnf("failed to write %s pprof: %s", name, err) + } + _ = file.Close() + } else { + log.Warnf("failed to create %s pprof file: %s", name, err) + } + } + + writeProfile("heap") + writeProfile("allocs") + }() + + var carFile *os.File + + // open the CAR file if one is provided. + if path := cctx.String("car"); path != "" { + var err error + if carFile, err = os.Open(path); err != nil { + return xerrors.Errorf("failed to open provided CAR file: %w", err) } } var head *types.TipSet + + // --- IMPORT --- if !cctx.Bool("no-import") { - head, err = cs.Import(cfi) + if cctx.Bool("global-profile") { + prof, err := os.Create("bench.import.pprof") + if err != nil { + return err + } + defer prof.Close() //nolint:errcheck + + if err := pprof.StartCPUProfile(prof); err != nil { + return err + } + } + + // import is NOT suppressed; do it. + if carFile == nil { // a CAR is compulsory for the import. + return fmt.Errorf("no CAR file provided for import") + } + + head, err = cs.Import(carFile) if err != nil { return err } - } else { - cr, err := car.NewCarReader(cfi) + + pprof.StopCPUProfile() + } + + if cctx.Bool("only-import") { + return nil + } + + // --- VALIDATION --- + // + // we are now preparing for the validation benchmark. + // a HEAD needs to be set; --head takes precedence over the root + // of the CAR, if both are provided. + if h := cctx.String("head"); h != "" { + cids, err := lcli.ParseTipSetString(h) + if err != nil { + return xerrors.Errorf("failed to parse head tipset key: %w", err) + } + + head, err = cs.LoadTipSet(types.NewTipSetKey(cids...)) + if err != nil { + return err + } + } else if carFile != nil && head == nil { + cr, err := car.NewCarReader(carFile) if err != nil { return err } @@ -223,59 +373,99 @@ var importBenchCmd = &cli.Command{ if err != nil { return err } + } else if h == "" && carFile == nil { + return xerrors.Errorf("neither --car nor --head flags supplied") } - if cctx.Bool("only-import") { - return nil + log.Infof("chain head is tipset: %s", head.Key()) + + var genesis *types.TipSet + log.Infof("getting genesis block") + if tsk := cctx.String("genesis-tipset"); tsk != "" { + var cids []cid.Cid + if cids, err = lcli.ParseTipSetString(tsk); err != nil { + return xerrors.Errorf("failed to parse genesis tipset key: %w", err) + } + genesis, err = cs.LoadTipSet(types.NewTipSetKey(cids...)) + } else { + log.Warnf("getting genesis by height; this will be slow; pass in the genesis tipset through --genesis-tipset") + // fallback to the slow path of walking the chain. + genesis, err = cs.GetTipsetByHeight(context.TODO(), 0, head, true) } - gb, err := cs.GetTipsetByHeight(context.TODO(), 0, head, true) if err != nil { return err } - err = cs.SetGenesis(gb.Blocks()[0]) - if err != nil { + if err = cs.SetGenesis(genesis.Blocks()[0]); err != nil { return err } - startEpoch := abi.ChainEpoch(1) - if cctx.IsSet("start-at") { - startEpoch = abi.ChainEpoch(cctx.Int64("start-at")) - start, err := cs.GetTipsetByHeight(context.TODO(), abi.ChainEpoch(cctx.Int64("start-at")), head, true) - if err != nil { - return err + // Resolve the end tipset, falling back to head if not provided. + end := head + if tsk := cctx.String("end-tipset"); tsk != "" { + var cids []cid.Cid + if cids, err = lcli.ParseTipSetString(tsk); err != nil { + return xerrors.Errorf("failed to end genesis tipset key: %w", err) } + end, err = cs.LoadTipSet(types.NewTipSetKey(cids...)) + } else if h := cctx.Int64("end-height"); h != 0 { + log.Infof("getting end tipset at height %d...", h) + end, err = cs.GetTipsetByHeight(context.TODO(), abi.ChainEpoch(h), head, true) + } - err = cs.SetHead(start) - if err != nil { - return err + if err != nil { + return err + } + + // Resolve the start tipset, if provided; otherwise, fallback to + // height 1 for a start point. + var ( + startEpoch = abi.ChainEpoch(1) + start *types.TipSet + ) + + if tsk := cctx.String("start-tipset"); tsk != "" { + var cids []cid.Cid + if cids, err = lcli.ParseTipSetString(tsk); err != nil { + return xerrors.Errorf("failed to start genesis tipset key: %w", err) } + start, err = cs.LoadTipSet(types.NewTipSetKey(cids...)) + } else if h := cctx.Int64("start-height"); h != 0 { + log.Infof("getting start tipset at height %d...", h) + // lookback from the end tipset (which falls back to head if not supplied). + start, err = cs.GetTipsetByHeight(context.TODO(), abi.ChainEpoch(h), end, true) } - if h := cctx.Int64("height"); h != 0 { - tsh, err := cs.GetTipsetByHeight(context.TODO(), abi.ChainEpoch(h), head, true) - if err != nil { + if err != nil { + return err + } + + if start != nil { + startEpoch = start.Height() + if err := cs.ForceHeadSilent(context.Background(), start); err != nil { + // if err := cs.SetHead(start); err != nil { return err } - head = tsh } - ts := head - tschain := []*types.TipSet{ts} - for ts.Height() > startEpoch { + inverseChain := append(make([]*types.TipSet, 0, end.Height()), end) + for ts := end; ts.Height() > startEpoch; { + if h := ts.Height(); h%100 == 0 { + log.Infof("walking back the chain; loaded tipset at height %d...", h) + } next, err := cs.LoadTipSet(ts.Parents()) if err != nil { return err } - tschain = append(tschain, next) + inverseChain = append(inverseChain, next) ts = next } var enc *json.Encoder if cctx.Bool("export-traces") { - ibj, err := os.Create("import-bench.json") + ibj, err := os.Create("bench.json") if err != nil { return err } @@ -284,8 +474,20 @@ var importBenchCmd = &cli.Command{ enc = json.NewEncoder(ibj) } - for i := len(tschain) - 1; i >= 1; i-- { - cur := tschain[i] + if cctx.Bool("global-profile") { + prof, err := os.Create("bench.validation.pprof") + if err != nil { + return err + } + defer prof.Close() //nolint:errcheck + + if err := pprof.StartCPUProfile(prof); err != nil { + return err + } + } + + for i := len(inverseChain) - 1; i >= 1; i-- { + cur := inverseChain[i] start := time.Now() log.Infof("computing state (height: %d, ts=%s)", cur.Height(), cur.Cids()) st, trace, err := stm.ExecutionTrace(context.TODO(), cur) @@ -304,7 +506,7 @@ var importBenchCmd = &cli.Command{ return xerrors.Errorf("failed to write out tipsetexec: %w", err) } } - if tschain[i-1].ParentState() != st { + if inverseChain[i-1].ParentState() != st { stripCallers(tse.Trace) lastTrace := tse.Trace d, err := json.MarshalIndent(lastTrace, "", " ") @@ -320,23 +522,7 @@ var importBenchCmd = &cli.Command{ pprof.StopCPUProfile() - if true { - resp, err := http.Get("http://localhost:6060/debug/metrics/prometheus") - if err != nil { - return err - } - - metricsfi, err := os.Create("import-bench.metrics") - if err != nil { - return err - } - - io.Copy(metricsfi, resp.Body) //nolint:errcheck - metricsfi.Close() //nolint:errcheck - } - return nil - }, } diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index 8f3c9574ec7..474dfe68585 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "io" "strconv" "github.com/filecoin-project/lotus/chain/gen/genesis" @@ -10,6 +11,7 @@ import ( _init "github.com/filecoin-project/lotus/chain/actors/builtin/init" "github.com/docker/go-units" + "github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/actors/builtin/multisig" "github.com/filecoin-project/lotus/chain/actors/builtin/power" @@ -24,6 +26,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/state" @@ -33,7 +36,6 @@ import ( "github.com/filecoin-project/lotus/chain/vm" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" - "github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/node/repo" ) @@ -168,18 +170,24 @@ var chainBalanceStateCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - ds, err := lkrepo.Datastore("/chain") + bs, err := lkrepo.Blockstore(repo.BlockstoreChain) if err != nil { - return err + return fmt.Errorf("failed to open blockstore: %w", err) } + defer func() { + if c, ok := bs.(io.Closer); ok { + if err := c.Close(); err != nil { + log.Warnf("failed to close blockstore: %s", err) + } + } + }() + mds, err := lkrepo.Datastore("/metadata") if err != nil { return err } - bs := blockstore.NewBlockstore(ds) - cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) cst := cbor.NewCborStore(bs) @@ -382,18 +390,24 @@ var chainPledgeCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - ds, err := lkrepo.Datastore("/chain") + bs, err := lkrepo.Blockstore(repo.BlockstoreChain) if err != nil { - return err + return xerrors.Errorf("failed to open blockstore: %w", err) } + defer func() { + if c, ok := bs.(io.Closer); ok { + if err := c.Close(); err != nil { + log.Warnf("failed to close blockstore: %s", err) + } + } + }() + mds, err := lkrepo.Datastore("/metadata") if err != nil { return err } - bs := blockstore.NewBlockstore(ds) - cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) cst := cbor.NewCborStore(bs) diff --git a/cmd/lotus-shed/datastore.go b/cmd/lotus-shed/datastore.go index 83422e77b82..8cdc1630cb5 100644 --- a/cmd/lotus-shed/datastore.go +++ b/cmd/lotus-shed/datastore.go @@ -8,10 +8,10 @@ import ( "os" "strings" + "github.com/dgraph-io/badger/v2" "github.com/docker/go-units" "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" - badgerds "github.com/ipfs/go-ds-badger2" logging "github.com/ipfs/go-log" "github.com/mitchellh/go-homedir" "github.com/polydawn/refmt/cbor" @@ -312,30 +312,41 @@ var datastoreRewriteCmd = &cli.Command{ return xerrors.Errorf("cannot get toPath: %w", err) } - opts := repo.ChainBadgerOptions() - opts.Options = opts.Options.WithSyncWrites(false) - to, err := badgerds.NewDatastore(toPath, &opts) + var ( + from *badger.DB + to *badger.DB + ) + + // open the destination (to) store. + opts, err := repo.BadgerBlockstoreOptions(repo.BlockstoreChain, toPath, false) if err != nil { - return xerrors.Errorf("opennig 'to' datastore: %w", err) + return xerrors.Errorf("failed to get badger options: %w", err) + } + opts.SyncWrites = false + if to, err = badger.Open(opts.Options); err != nil { + return xerrors.Errorf("opening 'to' badger store: %w", err) } - opts.Options = opts.Options.WithReadOnly(false) - from, err := badgerds.NewDatastore(fromPath, &opts) + // open the source (from) store. + opts, err = repo.BadgerBlockstoreOptions(repo.BlockstoreChain, fromPath, true) if err != nil { - return xerrors.Errorf("opennig 'from' datastore: %w", err) + return xerrors.Errorf("failed to get badger options: %w", err) + } + if from, err = badger.Open(opts.Options); err != nil { + return xerrors.Errorf("opening 'from' datastore: %w", err) } pr, pw := io.Pipe() errCh := make(chan error) go func() { bw := bufio.NewWriterSize(pw, 64<<20) - _, err := from.DB.Backup(bw, 0) + _, err := from.Backup(bw, 0) _ = bw.Flush() _ = pw.CloseWithError(err) errCh <- err }() go func() { - err := to.DB.Load(pr, 256) + err := to.Load(pr, 256) errCh <- err }() diff --git a/cmd/lotus-shed/export.go b/cmd/lotus-shed/export.go index dcf45e9a8c5..42434f3d25f 100644 --- a/cmd/lotus-shed/export.go +++ b/cmd/lotus-shed/export.go @@ -3,16 +3,17 @@ package main import ( "context" "fmt" + "io" "os" "github.com/urfave/cli/v2" "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" - "github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/node/repo" ) @@ -71,18 +72,24 @@ var exportChainCmd = &cli.Command{ defer fi.Close() //nolint:errcheck - ds, err := lr.Datastore("/chain") + bs, err := lr.Blockstore(repo.BlockstoreChain) if err != nil { - return err + return fmt.Errorf("failed to open blockstore: %w", err) } + defer func() { + if c, ok := bs.(io.Closer); ok { + if err := c.Close(); err != nil { + log.Warnf("failed to close blockstore: %s", err) + } + } + }() + mds, err := lr.Datastore("/metadata") if err != nil { return err } - bs := blockstore.NewBlockstore(ds) - cs := store.NewChainStore(bs, bs, mds, nil, nil) if err := cs.Load(); err != nil { return err diff --git a/cmd/lotus-shed/import-car.go b/cmd/lotus-shed/import-car.go index 9cbff953b16..9fa8537284f 100644 --- a/cmd/lotus-shed/import-car.go +++ b/cmd/lotus-shed/import-car.go @@ -12,7 +12,6 @@ import ( "github.com/urfave/cli/v2" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/node/repo" ) @@ -45,12 +44,18 @@ var importCarCmd = &cli.Command{ return xerrors.Errorf("opening the car file: %w", err) } - ds, err := lr.Datastore("/chain") + bs, err := lr.Blockstore(repo.BlockstoreChain) if err != nil { return err } - bs := blockstore.NewBlockstore(ds) + defer func() { + if c, ok := bs.(io.Closer); ok { + if err := c.Close(); err != nil { + log.Warnf("failed to close blockstore: %s", err) + } + } + }() cr, err := car.NewCarReader(f) if err != nil { @@ -65,7 +70,7 @@ var importCarCmd = &cli.Command{ return err } fmt.Println() - return ds.Close() + return nil default: if err := f.Close(); err != nil { return err @@ -108,12 +113,18 @@ var importObjectCmd = &cli.Command{ } defer lr.Close() //nolint:errcheck - ds, err := lr.Datastore("/chain") + bs, err := lr.Blockstore(repo.BlockstoreChain) if err != nil { - return err + return fmt.Errorf("failed to open blockstore: %w", err) } - bs := blockstore.NewBlockstore(ds) + defer func() { + if c, ok := bs.(io.Closer); ok { + if err := c.Close(); err != nil { + log.Warnf("failed to close blockstore: %s", err) + } + } + }() c, err := cid.Decode(cctx.Args().Get(0)) if err != nil { diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go index f61c8d8eab9..76d283f6b26 100644 --- a/cmd/lotus-shed/pruning.go +++ b/cmd/lotus-shed/pruning.go @@ -3,20 +3,19 @@ package main import ( "context" "fmt" + "io" "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/vm" - "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" - "github.com/filecoin-project/lotus/lib/blockstore" - "github.com/filecoin-project/lotus/node/repo" "github.com/ipfs/bbloom" "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/query" - dshelp "github.com/ipfs/go-ipfs-ds-help" "github.com/urfave/cli/v2" "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + badgerbs "github.com/filecoin-project/lotus/lib/blockstore/badger" + "github.com/filecoin-project/lotus/node/repo" ) type cidSet interface { @@ -132,12 +131,25 @@ var stateTreePruneCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - ds, err := lkrepo.Datastore("/chain") + bs, err := lkrepo.Blockstore(repo.BlockstoreChain) if err != nil { - return err + return fmt.Errorf("failed to open blockstore: %w", err) } - defer ds.Close() //nolint:errcheck + defer func() { + if c, ok := bs.(io.Closer); ok { + if err := c.Close(); err != nil { + log.Warnf("failed to close blockstore: %s", err) + } + } + }() + + // After migrating to native blockstores, this has been made + // database-specific. + badgbs, ok := bs.(*badgerbs.Blockstore) + if !ok { + return fmt.Errorf("only badger blockstores are supported") + } mds, err := lkrepo.Datastore("/metadata") if err != nil { @@ -145,23 +157,18 @@ var stateTreePruneCmd = &cli.Command{ } defer mds.Close() //nolint:errcheck + const DiscardRatio = 0.2 if cctx.Bool("only-ds-gc") { - gcds, ok := ds.(datastore.GCDatastore) - if ok { - fmt.Println("running datastore gc....") - for i := 0; i < cctx.Int("gc-count"); i++ { - if err := gcds.CollectGarbage(); err != nil { - return xerrors.Errorf("datastore GC failed: %w", err) - } + fmt.Println("running datastore gc....") + for i := 0; i < cctx.Int("gc-count"); i++ { + if err := badgbs.DB.RunValueLogGC(DiscardRatio); err != nil { + return xerrors.Errorf("datastore GC failed: %w", err) } - fmt.Println("gc complete!") - return nil } - return fmt.Errorf("datastore doesnt support gc") + fmt.Println("gc complete!") + return nil } - bs := blockstore.NewBlockstore(ds) - cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) if err := cs.Load(); err != nil { return fmt.Errorf("loading chainstore: %w", err) @@ -199,63 +206,30 @@ var stateTreePruneCmd = &cli.Command{ return nil } - var b datastore.Batch - var batchCount int - markForRemoval := func(c cid.Cid) error { - if b == nil { - nb, err := ds.Batch() - if err != nil { - return fmt.Errorf("opening batch: %w", err) - } - - b = nb - } - batchCount++ - - if err := b.Delete(dshelp.MultihashToDsKey(c.Hash())); err != nil { - return err - } + b := badgbs.DB.NewWriteBatch() + defer b.Cancel() - if batchCount > 100 { - if err := b.Commit(); err != nil { - return xerrors.Errorf("failed to commit batch deletes: %w", err) - } - b = nil - batchCount = 0 - } - return nil + markForRemoval := func(c cid.Cid) error { + return b.Delete(badgbs.StorageKey(nil, c)) } - res, err := ds.Query(query.Query{KeysOnly: true}) + keys, err := bs.AllKeysChan(context.Background()) if err != nil { - return xerrors.Errorf("failed to query datastore: %w", err) + return xerrors.Errorf("failed to query blockstore: %w", err) } dupTo := cctx.Int("delete-up-to") var deleteCount int var goodHits int - for { - v, ok := res.NextSync() - if !ok { - break - } - - bk, err := dshelp.BinaryFromDsKey(datastore.RawKey(v.Key[len("/blocks"):])) - if err != nil { - return xerrors.Errorf("failed to parse key: %w", err) - } - - if goodSet.HasRaw(bk) { + for k := range keys { + if goodSet.HasRaw(k.Bytes()) { goodHits++ continue } - nc := cid.NewCidV1(cid.Raw, bk) - - deleteCount++ - if err := markForRemoval(nc); err != nil { - return fmt.Errorf("failed to remove cid %s: %w", nc, err) + if err := markForRemoval(k); err != nil { + return fmt.Errorf("failed to remove cid %s: %w", k, err) } if deleteCount%20 == 0 { @@ -267,22 +241,17 @@ var stateTreePruneCmd = &cli.Command{ } } - if b != nil { - if err := b.Commit(); err != nil { - return xerrors.Errorf("failed to commit final batch delete: %w", err) - } + if err := b.Flush(); err != nil { + return xerrors.Errorf("failed to flush final batch delete: %w", err) } - gcds, ok := ds.(datastore.GCDatastore) - if ok { - fmt.Println("running datastore gc....") - for i := 0; i < cctx.Int("gc-count"); i++ { - if err := gcds.CollectGarbage(); err != nil { - return xerrors.Errorf("datastore GC failed: %w", err) - } + fmt.Println("running datastore gc....") + for i := 0; i < cctx.Int("gc-count"); i++ { + if err := badgbs.DB.RunValueLogGC(DiscardRatio); err != nil { + return xerrors.Errorf("datastore GC failed: %w", err) } - fmt.Println("gc complete!") } + fmt.Println("gc complete!") return nil }, diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 42fee736b7f..f49278a2b83 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -36,7 +36,6 @@ import ( lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/journal" - "github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/lib/peermgr" "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/metrics" @@ -406,18 +405,24 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) { } defer lr.Close() //nolint:errcheck - ds, err := lr.Datastore("/chain") + bs, err := lr.Blockstore(repo.BlockstoreChain) if err != nil { - return err + return xerrors.Errorf("failed to open blockstore: %w", err) } + defer func() { + if c, ok := bs.(io.Closer); ok { + if err := c.Close(); err != nil { + log.Warnf("failed to close blockstore: %s", err) + } + } + }() + mds, err := lr.Datastore("/metadata") if err != nil { return err } - bs := blockstore.NewBlockstore(ds) - j, err := journal.OpenFSJournal(lr, journal.EnvDisabledEvents()) if err != nil { return xerrors.Errorf("failed to open journal: %w", err) diff --git a/go.mod b/go.mod index 84cdd2073b3..c67d6f2edd4 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/ipfs/go-filestore v1.0.0 github.com/ipfs/go-fs-lock v0.0.6 github.com/ipfs/go-graphsync v0.4.2 - github.com/ipfs/go-ipfs-blockstore v1.0.1 + github.com/ipfs/go-ipfs-blockstore v1.0.2 github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-ds-help v1.0.0 github.com/ipfs/go-ipfs-exchange-interface v0.0.1 @@ -76,7 +76,8 @@ require ( github.com/ipfs/go-ipfs-files v0.0.8 github.com/ipfs/go-ipfs-http-client v0.0.5 github.com/ipfs/go-ipfs-routing v0.1.0 - github.com/ipfs/go-ipld-cbor v0.0.5-0.20200428170625-a0bd04d3cbdf + github.com/ipfs/go-ipfs-util v0.0.2 + github.com/ipfs/go-ipld-cbor v0.0.5 github.com/ipfs/go-ipld-format v0.2.0 github.com/ipfs/go-log v1.0.4 github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4 @@ -90,6 +91,7 @@ require ( github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018 github.com/kelseyhightower/envconfig v1.4.0 github.com/lib/pq v1.7.0 + github.com/libp2p/go-buffer-pool v0.0.2 github.com/libp2p/go-eventbus v0.2.1 github.com/libp2p/go-libp2p v0.11.0 github.com/libp2p/go-libp2p-connmgr v0.2.4 diff --git a/go.sum b/go.sum index 15c1bc49ac7..9c505b5b029 100644 --- a/go.sum +++ b/go.sum @@ -563,6 +563,8 @@ github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86 github.com/ipfs/go-ipfs-blockstore v1.0.0/go.mod h1:knLVdhVU9L7CC4T+T4nvGdeUIPAXlnd9zmXfp+9MIjU= github.com/ipfs/go-ipfs-blockstore v1.0.1 h1:fnuVj4XdZp4yExhd0CnUwAiMNJHiPnfInhiuwz4lW1w= github.com/ipfs/go-ipfs-blockstore v1.0.1/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE= +github.com/ipfs/go-ipfs-blockstore v1.0.2 h1:Z8nUlBHK7wVKPKliQCQR9tLgUtz4J2QRbqFcJrqzM+E= +github.com/ipfs/go-ipfs-blockstore v1.0.2/go.mod h1:MGNZlHNEnR4KGgPHM3/k8lBySIOK2Ve+0KjZubKlaOE= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk= github.com/ipfs/go-ipfs-chunker v0.0.1/go.mod h1:tWewYK0we3+rMbOh7pPFGDyypCtvGcBFymgY4rSDLAw= @@ -607,8 +609,8 @@ github.com/ipfs/go-ipld-cbor v0.0.2/go.mod h1:wTBtrQZA3SoFKMVkp6cn6HMRteIB1VsmHA github.com/ipfs/go-ipld-cbor v0.0.3/go.mod h1:wTBtrQZA3SoFKMVkp6cn6HMRteIB1VsmHA0AQFOn7Nc= github.com/ipfs/go-ipld-cbor v0.0.4/go.mod h1:BkCduEx3XBCO6t2Sfo5BaHzuok7hbhdMm9Oh8B2Ftq4= github.com/ipfs/go-ipld-cbor v0.0.5-0.20200204214505-252690b78669/go.mod h1:BkCduEx3XBCO6t2Sfo5BaHzuok7hbhdMm9Oh8B2Ftq4= -github.com/ipfs/go-ipld-cbor v0.0.5-0.20200428170625-a0bd04d3cbdf h1:PRCy+w3GocY77CBEwTprp6hn7PLiEU1YToKe7B+1FVk= -github.com/ipfs/go-ipld-cbor v0.0.5-0.20200428170625-a0bd04d3cbdf/go.mod h1:BkCduEx3XBCO6t2Sfo5BaHzuok7hbhdMm9Oh8B2Ftq4= +github.com/ipfs/go-ipld-cbor v0.0.5 h1:ovz4CHKogtG2KB/h1zUp5U0c/IzZrL435rCh5+K/5G8= +github.com/ipfs/go-ipld-cbor v0.0.5/go.mod h1:BkCduEx3XBCO6t2Sfo5BaHzuok7hbhdMm9Oh8B2Ftq4= github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dCDnkOJhcZkms= github.com/ipfs/go-ipld-format v0.0.2/go.mod h1:4B6+FM2u9OJ9zCV+kSbgFAZlOrv1Hqbf0INGQgiKf9k= github.com/ipfs/go-ipld-format v0.2.0 h1:xGlJKkArkmBvowr+GMCX0FEZtkro71K1AwiKnL37mwA= diff --git a/lib/blockstore/badger/blockstore.go b/lib/blockstore/badger/blockstore.go new file mode 100644 index 00000000000..aa57b5cae73 --- /dev/null +++ b/lib/blockstore/badger/blockstore.go @@ -0,0 +1,423 @@ +package badgerbs + +import ( + "context" + "fmt" + "io" + "sync/atomic" + + "github.com/dgraph-io/badger/v2" + "github.com/dgraph-io/badger/v2/options" + "github.com/multiformats/go-base32" + "go.uber.org/zap" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + logger "github.com/ipfs/go-log/v2" + pool "github.com/libp2p/go-buffer-pool" + + "github.com/filecoin-project/lotus/lib/blockstore" +) + +var ( + // KeyPool is the buffer pool we use to compute storage keys. + KeyPool *pool.BufferPool = pool.GlobalPool +) + +var ( + // ErrBlockstoreClosed is returned from blockstore operations after + // the blockstore has been closed. + ErrBlockstoreClosed = fmt.Errorf("badger blockstore closed") + + log = logger.Logger("badgerbs") +) + +// aliases to mask badger dependencies. +const ( + // FileIO is equivalent to badger/options.FileIO. + FileIO = options.FileIO + // MemoryMap is equivalent to badger/options.MemoryMap. + MemoryMap = options.MemoryMap + // LoadToRAM is equivalent to badger/options.LoadToRAM. + LoadToRAM = options.LoadToRAM +) + +// Options embeds the badger options themselves, and augments them with +// blockstore-specific options. +type Options struct { + badger.Options + + // Prefix is an optional prefix to prepend to keys. Default: "". + Prefix string +} + +func DefaultOptions(path string) Options { + return Options{ + Options: badger.DefaultOptions(path), + Prefix: "", + } +} + +// badgerLogger is a local wrapper for go-log to make the interface +// compatible with badger.Logger (namely, aliasing Warnf to Warningf) +type badgerLogger struct { + *zap.SugaredLogger // skips 1 caller to get useful line info, skipping over badger.Options. + + skip2 *zap.SugaredLogger // skips 2 callers, just like above + this logger. +} + +// Warningf is required by the badger logger APIs. +func (b *badgerLogger) Warningf(format string, args ...interface{}) { + b.skip2.Warnf(format, args...) +} + +const ( + stateOpen int64 = iota + stateClosing + stateClosed +) + +// Blockstore is a badger-backed IPLD blockstore. +// +// NOTE: once Close() is called, methods will try their best to return +// ErrBlockstoreClosed. This will guaranteed to happen for all subsequent +// operation calls after Close() has returned, but it may not happen for +// operations in progress. Those are likely to fail with a different error. +type Blockstore struct { + DB *badger.DB + + // state is guarded by atomic. + state int64 + + prefixing bool + prefix []byte + prefixLen int +} + +var _ blockstore.Blockstore = (*Blockstore)(nil) +var _ blockstore.Viewer = (*Blockstore)(nil) +var _ io.Closer = (*Blockstore)(nil) + +// Open creates a new badger-backed blockstore, with the supplied options. +func Open(opts Options) (*Blockstore, error) { + opts.Logger = &badgerLogger{ + SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(), + skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), + } + + db, err := badger.Open(opts.Options) + if err != nil { + return nil, fmt.Errorf("failed to open badger blockstore: %w", err) + } + + bs := &Blockstore{ + DB: db, + } + + if p := opts.Prefix; p != "" { + bs.prefixing = true + bs.prefix = []byte(p) + bs.prefixLen = len(bs.prefix) + } + + return bs, nil +} + +// Close closes the store. If the store has already been closed, this noops and +// returns an error, even if the first closure resulted in error. +func (b *Blockstore) Close() error { + if !atomic.CompareAndSwapInt64(&b.state, stateOpen, stateClosing) { + return nil + } + + defer atomic.StoreInt64(&b.state, stateClosed) + return b.DB.Close() +} + +// View implements blockstore.Viewer, which leverages zero-copy read-only +// access to values. +func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error { + if atomic.LoadInt64(&b.state) != stateOpen { + return ErrBlockstoreClosed + } + + k, pooled := b.PooledStorageKey(cid) + if pooled { + defer KeyPool.Put(k) + } + + return b.DB.View(func(txn *badger.Txn) error { + switch item, err := txn.Get(k); err { + case nil: + return item.Value(fn) + case badger.ErrKeyNotFound: + return blockstore.ErrNotFound + default: + return fmt.Errorf("failed to view block from badger blockstore: %w", err) + } + }) +} + +// Has implements Blockstore.Has. +func (b *Blockstore) Has(cid cid.Cid) (bool, error) { + if atomic.LoadInt64(&b.state) != stateOpen { + return false, ErrBlockstoreClosed + } + + k, pooled := b.PooledStorageKey(cid) + if pooled { + defer KeyPool.Put(k) + } + + err := b.DB.View(func(txn *badger.Txn) error { + _, err := txn.Get(k) + return err + }) + + switch err { + case badger.ErrKeyNotFound: + return false, nil + case nil: + return true, nil + default: + return false, fmt.Errorf("failed to check if block exists in badger blockstore: %w", err) + } +} + +// Get implements Blockstore.Get. +func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) { + if !cid.Defined() { + return nil, blockstore.ErrNotFound + } + + if atomic.LoadInt64(&b.state) != stateOpen { + return nil, ErrBlockstoreClosed + } + + k, pooled := b.PooledStorageKey(cid) + if pooled { + defer KeyPool.Put(k) + } + + var val []byte + err := b.DB.View(func(txn *badger.Txn) error { + switch item, err := txn.Get(k); err { + case nil: + val, err = item.ValueCopy(nil) + return err + case badger.ErrKeyNotFound: + return blockstore.ErrNotFound + default: + return fmt.Errorf("failed to get block from badger blockstore: %w", err) + } + }) + if err != nil { + return nil, err + } + return blocks.NewBlockWithCid(val, cid) +} + +// GetSize implements Blockstore.GetSize. +func (b *Blockstore) GetSize(cid cid.Cid) (int, error) { + if atomic.LoadInt64(&b.state) != stateOpen { + return -1, ErrBlockstoreClosed + } + + k, pooled := b.PooledStorageKey(cid) + if pooled { + defer KeyPool.Put(k) + } + + var size int + err := b.DB.View(func(txn *badger.Txn) error { + switch item, err := txn.Get(k); err { + case nil: + size = int(item.ValueSize()) + case badger.ErrKeyNotFound: + return blockstore.ErrNotFound + default: + return fmt.Errorf("failed to get block size from badger blockstore: %w", err) + } + return nil + }) + if err != nil { + size = -1 + } + return size, err +} + +// Put implements Blockstore.Put. +func (b *Blockstore) Put(block blocks.Block) error { + if atomic.LoadInt64(&b.state) != stateOpen { + return ErrBlockstoreClosed + } + + k, pooled := b.PooledStorageKey(block.Cid()) + if pooled { + defer KeyPool.Put(k) + } + + err := b.DB.Update(func(txn *badger.Txn) error { + return txn.Set(k, block.RawData()) + }) + if err != nil { + err = fmt.Errorf("failed to put block in badger blockstore: %w", err) + } + return err +} + +// PutMany implements Blockstore.PutMany. +func (b *Blockstore) PutMany(blocks []blocks.Block) error { + if atomic.LoadInt64(&b.state) != stateOpen { + return ErrBlockstoreClosed + } + + batch := b.DB.NewWriteBatch() + defer batch.Cancel() + + // toReturn tracks the byte slices to return to the pool, if we're using key + // prefixing. we can't return each slice to the pool after each Set, because + // badger holds on to the slice. + var toReturn [][]byte + if b.prefixing { + toReturn = make([][]byte, 0, len(blocks)) + defer func() { + for _, b := range toReturn { + KeyPool.Put(b) + } + }() + } + + for _, block := range blocks { + k, pooled := b.PooledStorageKey(block.Cid()) + if pooled { + toReturn = append(toReturn, k) + } + if err := batch.Set(k, block.RawData()); err != nil { + return err + } + } + + err := batch.Flush() + if err != nil { + err = fmt.Errorf("failed to put blocks in badger blockstore: %w", err) + } + return err +} + +// DeleteBlock implements Blockstore.DeleteBlock. +func (b *Blockstore) DeleteBlock(cid cid.Cid) error { + if atomic.LoadInt64(&b.state) != stateOpen { + return ErrBlockstoreClosed + } + + k, pooled := b.PooledStorageKey(cid) + if pooled { + defer KeyPool.Put(k) + } + + return b.DB.Update(func(txn *badger.Txn) error { + return txn.Delete(k) + }) +} + +// AllKeysChan implements Blockstore.AllKeysChan. +func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + if atomic.LoadInt64(&b.state) != stateOpen { + return nil, ErrBlockstoreClosed + } + + txn := b.DB.NewTransaction(false) + opts := badger.IteratorOptions{PrefetchSize: 100} + if b.prefixing { + opts.Prefix = b.prefix + } + iter := txn.NewIterator(opts) + + ch := make(chan cid.Cid) + go func() { + defer close(ch) + defer iter.Close() + + // NewCidV1 makes a copy of the multihash buffer, so we can reuse it to + // contain allocs. + var buf []byte + for iter.Rewind(); iter.Valid(); iter.Next() { + if ctx.Err() != nil { + return // context has fired. + } + if atomic.LoadInt64(&b.state) != stateOpen { + // open iterators will run even after the database is closed... + return // closing, yield. + } + k := iter.Item().Key() + if b.prefixing { + k = k[b.prefixLen:] + } + + if reqlen := base32.RawStdEncoding.DecodedLen(len(k)); len(buf) < reqlen { + buf = make([]byte, reqlen) + } + if n, err := base32.RawStdEncoding.Decode(buf, k); err == nil { + ch <- cid.NewCidV1(cid.Raw, buf[:n]) + } else { + log.Warnf("failed to decode key %s in badger AllKeysChan; err: %s", k, err) + } + } + }() + + return ch, nil +} + +// HashOnRead implements Blockstore.HashOnRead. It is not supported by this +// blockstore. +func (b *Blockstore) HashOnRead(_ bool) { + log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring") +} + +// PooledStorageKey returns the storage key under which this CID is stored. +// +// The key is: prefix + base32_no_padding(cid.Hash) +// +// This method may return pooled byte slice, which MUST be returned to the +// KeyPool if pooled=true, or a leak will occur. +func (b *Blockstore) PooledStorageKey(cid cid.Cid) (key []byte, pooled bool) { + h := cid.Hash() + size := base32.RawStdEncoding.EncodedLen(len(h)) + if !b.prefixing { // optimize for branch prediction. + k := pool.Get(size) + base32.RawStdEncoding.Encode(k, h) + return k, true // slicing upto length unnecessary; the pool has already done this. + } + + size += b.prefixLen + k := pool.Get(size) + copy(k, b.prefix) + base32.RawStdEncoding.Encode(k[b.prefixLen:], h) + return k, true // slicing upto length unnecessary; the pool has already done this. +} + +// Storage acts like PooledStorageKey, but attempts to write the storage key +// into the provided slice. If the slice capacity is insufficient, it allocates +// a new byte slice with enough capacity to accommodate the result. This method +// returns the resulting slice. +func (b *Blockstore) StorageKey(dst []byte, cid cid.Cid) []byte { + h := cid.Hash() + reqsize := base32.RawStdEncoding.EncodedLen(len(h)) + b.prefixLen + if reqsize > cap(dst) { + // passed slice is smaller than required size; create new. + dst = make([]byte, reqsize) + } else if reqsize > len(dst) { + // passed slice has enough capacity, but its length is + // restricted, expand. + dst = dst[:cap(dst)] + } + + if b.prefixing { // optimize for branch prediction. + copy(dst, b.prefix) + base32.RawStdEncoding.Encode(dst[b.prefixLen:], h) + } else { + base32.RawStdEncoding.Encode(dst, h) + } + return dst[:reqsize] +} diff --git a/lib/blockstore/badger/blockstore_test.go b/lib/blockstore/badger/blockstore_test.go new file mode 100644 index 00000000000..e357117e584 --- /dev/null +++ b/lib/blockstore/badger/blockstore_test.go @@ -0,0 +1,90 @@ +package badgerbs + +import ( + "io/ioutil" + "os" + "testing" + + blocks "github.com/ipfs/go-block-format" + blockstore "github.com/ipfs/go-ipfs-blockstore" + "github.com/stretchr/testify/require" +) + +func TestBadgerBlockstore(t *testing.T) { + (&Suite{ + NewBlockstore: newBlockstore(DefaultOptions), + OpenBlockstore: openBlockstore(DefaultOptions), + }).RunTests(t, "non_prefixed") + + prefixed := func(path string) Options { + opts := DefaultOptions(path) + opts.Prefix = "/prefixed/" + return opts + } + + (&Suite{ + NewBlockstore: newBlockstore(prefixed), + OpenBlockstore: openBlockstore(prefixed), + }).RunTests(t, "prefixed") +} + +func TestStorageKey(t *testing.T) { + bs, _ := newBlockstore(DefaultOptions)(t) + bbs := bs.(*Blockstore) + defer bbs.Close() //nolint:errcheck + + cid1 := blocks.NewBlock([]byte("some data")).Cid() + cid2 := blocks.NewBlock([]byte("more data")).Cid() + cid3 := blocks.NewBlock([]byte("a little more data")).Cid() + require.NotEqual(t, cid1, cid2) // sanity check + require.NotEqual(t, cid2, cid3) // sanity check + + // nil slice; let StorageKey allocate for us. + k1 := bbs.StorageKey(nil, cid1) + require.Len(t, k1, 55) + require.True(t, cap(k1) == len(k1)) + + // k1's backing array is reused. + k2 := bbs.StorageKey(k1, cid2) + require.Len(t, k2, 55) + require.True(t, cap(k2) == len(k1)) + + // bring k2 to len=0, and verify that its backing array gets reused + // (i.e. k1 and k2 are overwritten) + k3 := bbs.StorageKey(k2[:0], cid3) + require.Len(t, k3, 55) + require.True(t, cap(k3) == len(k3)) + + // backing array of k1 and k2 has been modified, i.e. memory is shared. + require.Equal(t, k3, k1) + require.Equal(t, k3, k2) +} + +func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (bs blockstore.Blockstore, path string) { + return func(tb testing.TB) (bs blockstore.Blockstore, path string) { + tb.Helper() + + path, err := ioutil.TempDir("", "") + if err != nil { + tb.Fatal(err) + } + + db, err := Open(optsSupplier(path)) + if err != nil { + tb.Fatal(err) + } + + tb.Cleanup(func() { + _ = os.RemoveAll(path) + }) + + return db, path + } +} + +func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) { + return func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) { + tb.Helper() + return Open(optsSupplier(path)) + } +} diff --git a/lib/blockstore/badger/blockstore_test_suite.go b/lib/blockstore/badger/blockstore_test_suite.go new file mode 100644 index 00000000000..e9eed18f591 --- /dev/null +++ b/lib/blockstore/badger/blockstore_test_suite.go @@ -0,0 +1,307 @@ +package badgerbs + +import ( + "context" + "fmt" + "io" + "reflect" + "strings" + "testing" + + "github.com/filecoin-project/lotus/lib/blockstore" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + u "github.com/ipfs/go-ipfs-util" + + "github.com/stretchr/testify/require" +) + +// TODO: move this to go-ipfs-blockstore. +type Suite struct { + NewBlockstore func(tb testing.TB) (bs blockstore.Blockstore, path string) + OpenBlockstore func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) +} + +func (s *Suite) RunTests(t *testing.T, prefix string) { + v := reflect.TypeOf(s) + f := func(t *testing.T) { + for i := 0; i < v.NumMethod(); i++ { + if m := v.Method(i); strings.HasPrefix(m.Name, "Test") { + f := m.Func.Interface().(func(*Suite, *testing.T)) + t.Run(m.Name, func(t *testing.T) { + f(s, t) + }) + } + } + } + + if prefix == "" { + f(t) + } else { + t.Run(prefix, f) + } +} + +func (s *Suite) TestGetWhenKeyNotPresent(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + c := cid.NewCidV0(u.Hash([]byte("stuff"))) + bl, err := bs.Get(c) + require.Nil(t, bl) + require.Equal(t, blockstore.ErrNotFound, err) +} + +func (s *Suite) TestGetWhenKeyIsNil(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + _, err := bs.Get(cid.Undef) + require.Equal(t, blockstore.ErrNotFound, err) +} + +func (s *Suite) TestPutThenGetBlock(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + orig := blocks.NewBlock([]byte("some data")) + + err := bs.Put(orig) + require.NoError(t, err) + + fetched, err := bs.Get(orig.Cid()) + require.NoError(t, err) + require.Equal(t, orig.RawData(), fetched.RawData()) +} + +func (s *Suite) TestHas(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + orig := blocks.NewBlock([]byte("some data")) + + err := bs.Put(orig) + require.NoError(t, err) + + ok, err := bs.Has(orig.Cid()) + require.NoError(t, err) + require.True(t, ok) + + ok, err = bs.Has(blocks.NewBlock([]byte("another thing")).Cid()) + require.NoError(t, err) + require.False(t, ok) +} + +func (s *Suite) TestCidv0v1(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + orig := blocks.NewBlock([]byte("some data")) + + err := bs.Put(orig) + require.NoError(t, err) + + fetched, err := bs.Get(cid.NewCidV1(cid.DagProtobuf, orig.Cid().Hash())) + require.NoError(t, err) + require.Equal(t, orig.RawData(), fetched.RawData()) +} + +func (s *Suite) TestPutThenGetSizeBlock(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + block := blocks.NewBlock([]byte("some data")) + missingBlock := blocks.NewBlock([]byte("missingBlock")) + emptyBlock := blocks.NewBlock([]byte{}) + + err := bs.Put(block) + require.NoError(t, err) + + blockSize, err := bs.GetSize(block.Cid()) + require.NoError(t, err) + require.Len(t, block.RawData(), blockSize) + + err = bs.Put(emptyBlock) + require.NoError(t, err) + + emptySize, err := bs.GetSize(emptyBlock.Cid()) + require.NoError(t, err) + require.Zero(t, emptySize) + + missingSize, err := bs.GetSize(missingBlock.Cid()) + require.Equal(t, blockstore.ErrNotFound, err) + require.Equal(t, -1, missingSize) +} + +func (s *Suite) TestAllKeysSimple(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + keys := insertBlocks(t, bs, 100) + + ctx := context.Background() + ch, err := bs.AllKeysChan(ctx) + require.NoError(t, err) + actual := collect(ch) + + require.ElementsMatch(t, keys, actual) +} + +func (s *Suite) TestAllKeysRespectsContext(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + _ = insertBlocks(t, bs, 100) + + ctx, cancel := context.WithCancel(context.Background()) + ch, err := bs.AllKeysChan(ctx) + require.NoError(t, err) + + // consume 2, then cancel context. + v, ok := <-ch + require.NotEqual(t, cid.Undef, v) + require.True(t, ok) + + v, ok = <-ch + require.NotEqual(t, cid.Undef, v) + require.True(t, ok) + + cancel() + + v, ok = <-ch + require.Equal(t, cid.Undef, v) + require.False(t, ok) +} + +func (s *Suite) TestDoubleClose(t *testing.T) { + bs, _ := s.NewBlockstore(t) + c, ok := bs.(io.Closer) + if !ok { + t.SkipNow() + } + require.NoError(t, c.Close()) + require.NoError(t, c.Close()) +} + +func (s *Suite) TestReopenPutGet(t *testing.T) { + bs, path := s.NewBlockstore(t) + c, ok := bs.(io.Closer) + if !ok { + t.SkipNow() + } + + orig := blocks.NewBlock([]byte("some data")) + err := bs.Put(orig) + require.NoError(t, err) + + err = c.Close() + require.NoError(t, err) + + bs, err = s.OpenBlockstore(t, path) + require.NoError(t, err) + + fetched, err := bs.Get(orig.Cid()) + require.NoError(t, err) + require.Equal(t, orig.RawData(), fetched.RawData()) +} + +func (s *Suite) TestPutMany(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + blks := []blocks.Block{ + blocks.NewBlock([]byte("foo1")), + blocks.NewBlock([]byte("foo2")), + blocks.NewBlock([]byte("foo3")), + } + err := bs.PutMany(blks) + require.NoError(t, err) + + for _, blk := range blks { + fetched, err := bs.Get(blk.Cid()) + require.NoError(t, err) + require.Equal(t, blk.RawData(), fetched.RawData()) + + ok, err := bs.Has(blk.Cid()) + require.NoError(t, err) + require.True(t, ok) + } + + ch, err := bs.AllKeysChan(context.Background()) + require.NoError(t, err) + + cids := collect(ch) + require.Len(t, cids, 3) +} + +func (s *Suite) TestDelete(t *testing.T) { + bs, _ := s.NewBlockstore(t) + if c, ok := bs.(io.Closer); ok { + defer func() { require.NoError(t, c.Close()) }() + } + + blks := []blocks.Block{ + blocks.NewBlock([]byte("foo1")), + blocks.NewBlock([]byte("foo2")), + blocks.NewBlock([]byte("foo3")), + } + err := bs.PutMany(blks) + require.NoError(t, err) + + err = bs.DeleteBlock(blks[1].Cid()) + require.NoError(t, err) + + ch, err := bs.AllKeysChan(context.Background()) + require.NoError(t, err) + + cids := collect(ch) + require.Len(t, cids, 2) + require.ElementsMatch(t, cids, []cid.Cid{ + cid.NewCidV1(cid.Raw, blks[0].Cid().Hash()), + cid.NewCidV1(cid.Raw, blks[2].Cid().Hash()), + }) + + has, err := bs.Has(blks[1].Cid()) + require.NoError(t, err) + require.False(t, has) + +} + +func insertBlocks(t *testing.T, bs blockstore.Blockstore, count int) []cid.Cid { + keys := make([]cid.Cid, count) + for i := 0; i < count; i++ { + block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) + err := bs.Put(block) + require.NoError(t, err) + // NewBlock assigns a CIDv0; we convert it to CIDv1 because that's what + // the store returns. + keys[i] = cid.NewCidV1(cid.Raw, block.Multihash()) + } + return keys +} + +func collect(ch <-chan cid.Cid) []cid.Cid { + var keys []cid.Cid + for k := range ch { + keys = append(keys, k) + } + return keys +} diff --git a/lib/blockstore/blockstore.go b/lib/blockstore/blockstore.go index 99d8491887f..a1293a83c9f 100644 --- a/lib/blockstore/blockstore.go +++ b/lib/blockstore/blockstore.go @@ -44,6 +44,7 @@ func NewBlockstore(dstore ds.Batching) blockstore.Blockstore { // Alias so other packages don't have to import go-ipfs-blockstore type Blockstore = blockstore.Blockstore +type Viewer = blockstore.Viewer type GCBlockstore = blockstore.GCBlockstore type CacheOpts = blockstore.CacheOpts type GCLocker = blockstore.GCLocker diff --git a/lib/blockstore/memstore.go b/lib/blockstore/memstore.go index 9745d6f0395..ac56cf3e657 100644 --- a/lib/blockstore/memstore.go +++ b/lib/blockstore/memstore.go @@ -8,16 +8,27 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" ) +// MemStore is a terminal blockstore that keeps blocks in memory. type MemStore map[cid.Cid]blocks.Block func (m MemStore) DeleteBlock(k cid.Cid) error { delete(m, k) return nil } + func (m MemStore) Has(k cid.Cid) (bool, error) { _, ok := m[k] return ok, nil } + +func (m MemStore) View(k cid.Cid, callback func([]byte) error) error { + b, ok := m[k] + if !ok { + return blockstore.ErrNotFound + } + return callback(b.RawData()) +} + func (m MemStore) Get(k cid.Cid) (blocks.Block, error) { b, ok := m[k] if !ok { diff --git a/lib/blockstore/syncstore.go b/lib/blockstore/syncstore.go index be9f6b5c40c..86786a0c472 100644 --- a/lib/blockstore/syncstore.go +++ b/lib/blockstore/syncstore.go @@ -8,6 +8,8 @@ import ( "github.com/ipfs/go-cid" ) +// SyncStore is a terminal blockstore that is a synchronized version +// of MemStore. type SyncStore struct { mu sync.RWMutex bs MemStore // specifically use a memStore to save indirection overhead. @@ -18,11 +20,20 @@ func (m *SyncStore) DeleteBlock(k cid.Cid) error { defer m.mu.Unlock() return m.bs.DeleteBlock(k) } + func (m *SyncStore) Has(k cid.Cid) (bool, error) { m.mu.RLock() defer m.mu.RUnlock() return m.bs.Has(k) } + +func (m *SyncStore) View(k cid.Cid, callback func([]byte) error) error { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.bs.View(k, callback) +} + func (m *SyncStore) Get(k cid.Cid) (blocks.Block, error) { m.mu.RLock() defer m.mu.RUnlock() diff --git a/lib/bufbstore/buf_bstore.go b/lib/bufbstore/buf_bstore.go index d3c5dc4421f..5b21ace5ba9 100644 --- a/lib/bufbstore/buf_bstore.go +++ b/lib/bufbstore/buf_bstore.go @@ -16,6 +16,9 @@ var log = logging.Logger("bufbs") type BufferedBS struct { read bstore.Blockstore write bstore.Blockstore + + readviewer bstore.Viewer + writeviewer bstore.Viewer } func NewBufferedBstore(base bstore.Blockstore) *BufferedBS { @@ -27,10 +30,20 @@ func NewBufferedBstore(base bstore.Blockstore) *BufferedBS { buf = bstore.NewTemporary() } - return &BufferedBS{ + bs := &BufferedBS{ read: base, write: buf, } + if v, ok := base.(bstore.Viewer); ok { + bs.readviewer = v + } + if v, ok := buf.(bstore.Viewer); ok { + bs.writeviewer = v + } + if (bs.writeviewer == nil) != (bs.readviewer == nil) { + log.Warnf("one of the stores is not viewable; running less efficiently") + } + return bs } func NewTieredBstore(r bstore.Blockstore, w bstore.Blockstore) *BufferedBS { @@ -40,7 +53,8 @@ func NewTieredBstore(r bstore.Blockstore, w bstore.Blockstore) *BufferedBS { } } -var _ (bstore.Blockstore) = &BufferedBS{} +var _ bstore.Blockstore = (*BufferedBS)(nil) +var _ bstore.Viewer = (*BufferedBS)(nil) func (bs *BufferedBS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { a, err := bs.read.AllKeysChan(ctx) @@ -93,6 +107,25 @@ func (bs *BufferedBS) DeleteBlock(c cid.Cid) error { return bs.write.DeleteBlock(c) } +func (bs *BufferedBS) View(c cid.Cid, callback func([]byte) error) error { + if bs.writeviewer == nil || bs.readviewer == nil { + // one of the stores isn't Viewer; fall back to pure Get behaviour. + blk, err := bs.Get(c) + if err != nil { + return err + } + return callback(blk.RawData()) + } + + // both stores are viewable. + if err := bs.writeviewer.View(c, callback); err == bstore.ErrNotFound { + // not found in write blockstore; fall through. + } else { + return err // propagate errors, or nil, i.e. found. + } + return bs.readviewer.View(c, callback) +} + func (bs *BufferedBS) Get(c cid.Cid) (block.Block, error) { if out, err := bs.write.Get(c); err != nil { if err != bstore.ErrNotFound { diff --git a/lib/cachebs/cachebs.go b/lib/cachebs/cachebs.go deleted file mode 100644 index 046f100c011..00000000000 --- a/lib/cachebs/cachebs.go +++ /dev/null @@ -1,89 +0,0 @@ -package cachebs - -import ( - "context" - - lru "github.com/hashicorp/golang-lru" - block "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - - bstore "github.com/filecoin-project/lotus/lib/blockstore" -) - -//nolint:deadcode,varcheck -var log = logging.Logger("cachebs") - -type CacheBS struct { - cache *lru.ARCCache - bs bstore.Blockstore -} - -func NewBufferedBstore(base bstore.Blockstore, size int) bstore.Blockstore { - c, err := lru.NewARC(size) - if err != nil { - panic(err) - } - // Wrap this in an ID blockstore to avoid caching blocks inlined into - // CIDs. - return bstore.WrapIDStore(&CacheBS{ - cache: c, - bs: base, - }) -} - -var _ (bstore.Blockstore) = &CacheBS{} - -func (bs *CacheBS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - return bs.bs.AllKeysChan(ctx) -} - -func (bs *CacheBS) DeleteBlock(c cid.Cid) error { - bs.cache.Remove(c) - - return bs.bs.DeleteBlock(c) -} - -func (bs *CacheBS) Get(c cid.Cid) (block.Block, error) { - v, ok := bs.cache.Get(c) - if ok { - return v.(block.Block), nil - } - - out, err := bs.bs.Get(c) - if err != nil { - return nil, err - } - - bs.cache.Add(c, out) - return out, nil -} - -func (bs *CacheBS) GetSize(c cid.Cid) (int, error) { - return bs.bs.GetSize(c) -} - -func (bs *CacheBS) Put(blk block.Block) error { - bs.cache.Add(blk.Cid(), blk) - - return bs.bs.Put(blk) -} - -func (bs *CacheBS) Has(c cid.Cid) (bool, error) { - if bs.cache.Contains(c) { - return true, nil - } - - return bs.bs.Has(c) -} - -func (bs *CacheBS) HashOnRead(hor bool) { - bs.bs.HashOnRead(hor) -} - -func (bs *CacheBS) PutMany(blks []block.Block) error { - for _, blk := range blks { - bs.cache.Add(blk.Cid(), blk) - } - return bs.bs.PutMany(blks) -} diff --git a/node/modules/chain.go b/node/modules/chain.go index e5a0f741280..8c7606ee3be 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -77,12 +77,12 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds } func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) { - blocks, err := r.Datastore("/chain") + bs, err := r.Blockstore(repo.BlockstoreChain) if err != nil { return nil, err } - bs := blockstore.NewBlockstore(blocks) + // TODO potentially replace this cached blockstore by a CBOR cache. cbs, err := blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, blockstore.DefaultCacheOpts()) if err != nil { return nil, err diff --git a/node/repo/blockstore_opts.go b/node/repo/blockstore_opts.go new file mode 100644 index 00000000000..d8d852d8478 --- /dev/null +++ b/node/repo/blockstore_opts.go @@ -0,0 +1,51 @@ +package repo + +import badgerbs "github.com/filecoin-project/lotus/lib/blockstore/badger" + +// BadgerBlockstoreOptions returns the badger options to apply for the provided +// domain. +func BadgerBlockstoreOptions(domain BlockstoreDomain, path string, readonly bool) (badgerbs.Options, error) { + if domain != BlockstoreChain { + return badgerbs.Options{}, ErrInvalidBlockstoreDomain + } + + opts := badgerbs.DefaultOptions(path) + + // Due to legacy usage of blockstore.Blockstore, over a datastore, all + // blocks are prefixed with this namespace. In the future, this can go away, + // in order to shorten keys, but it'll require a migration. + opts.Prefix = "/blocks/" + + // Blockstore values are immutable; therefore we do not expect any + // conflicts to emerge. + opts.DetectConflicts = false + + // This is to optimize the database on close so it can be opened + // read-only and efficiently queried. We don't do that and hanging on + // stop isn't nice. + opts.CompactL0OnClose = false + + // The alternative is "crash on start and tell the user to fix it". This + // will truncate corrupt and unsynced data, which we don't guarantee to + // persist anyways. + opts.Truncate = true + + // We mmap the index and the value logs; this is important to enable + // zero-copy value access. + opts.ValueLogLoadingMode = badgerbs.MemoryMap + opts.TableLoadingMode = badgerbs.MemoryMap + + // Embed only values < 128 bytes in the LSM tree; larger values are stored + // in value logs. + opts.ValueThreshold = 128 + + // Default table size is already 64MiB. This is here to make it explicit. + opts.MaxTableSize = 64 << 20 + + // NOTE: The chain blockstore doesn't require any GC (blocks are never + // deleted). This will change if we move to a tiered blockstore. + + opts.ReadOnly = readonly + + return opts, nil +} diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index c1b6b5233b1..d652c1e548d 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -14,6 +14,7 @@ import ( "github.com/BurntSushi/toml" "github.com/ipfs/go-datastore" fslock "github.com/ipfs/go-fs-lock" + blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log/v2" "github.com/mitchellh/go-homedir" "github.com/multiformats/go-base32" @@ -22,6 +23,8 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/stores" + lblockstore "github.com/filecoin-project/lotus/lib/blockstore" + badgerbs "github.com/filecoin-project/lotus/lib/blockstore/badger" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/config" @@ -257,6 +260,10 @@ type fsLockedRepo struct { dsErr error dsOnce sync.Once + bs blockstore.Blockstore + bsErr error + bsOnce sync.Once + storageLk sync.Mutex configLk sync.Mutex } @@ -279,11 +286,44 @@ func (fsr *fsLockedRepo) Close() error { } } + // type assertion will return ok=false if fsr.bs is nil altogether. + if c, ok := fsr.bs.(io.Closer); ok && c != nil { + if err := c.Close(); err != nil { + return xerrors.Errorf("could not close blockstore: %w", err) + } + } + err = fsr.closer.Close() fsr.closer = nil return err } +// Blockstore returns a blockstore for the provided data domain. +func (fsr *fsLockedRepo) Blockstore(domain BlockstoreDomain) (blockstore.Blockstore, error) { + if domain != BlockstoreChain { + return nil, ErrInvalidBlockstoreDomain + } + + fsr.bsOnce.Do(func() { + path := fsr.join(filepath.Join(fsDatastore, "chain")) + readonly := fsr.readonly + + opts, err := BadgerBlockstoreOptions(domain, path, readonly) + if err != nil { + fsr.bsErr = err + return + } + + bs, err := badgerbs.Open(opts) + if err != nil { + fsr.bsErr = err + } + fsr.bs = lblockstore.WrapIDStore(bs) + }) + + return fsr.bs, fsr.bsErr +} + // join joins path elements with fsr.path func (fsr *fsLockedRepo) join(paths ...string) string { return filepath.Join(append([]string{fsr.path}, paths...)...) diff --git a/node/repo/fsrepo_ds.go b/node/repo/fsrepo_ds.go index e7746cb8edc..433ddb9b8aa 100644 --- a/node/repo/fsrepo_ds.go +++ b/node/repo/fsrepo_ds.go @@ -16,17 +16,7 @@ import ( type dsCtor func(path string, readonly bool) (datastore.Batching, error) -func ChainBadgerOptions() badger.Options { - opts := badger.DefaultOptions - opts.GcInterval = 0 // disable GC for chain datastore - - opts.Options = dgbadger.DefaultOptions("").WithTruncate(true). - WithValueThreshold(128) - return opts -} - var fsDatastores = map[string]dsCtor{ - "chain": chainBadgerDs, "metadata": levelDs, // Those need to be fast for large writes... but also need a really good GC :c @@ -35,12 +25,6 @@ var fsDatastores = map[string]dsCtor{ "client": badgerDs, // client specific } -func chainBadgerDs(path string, readonly bool) (datastore.Batching, error) { - opts := ChainBadgerOptions() - opts.ReadOnly = readonly - return badger.NewDatastore(path, &opts) -} - func badgerDs(path string, readonly bool) (datastore.Batching, error) { opts := badger.DefaultOptions opts.ReadOnly = readonly diff --git a/node/repo/interface.go b/node/repo/interface.go index c25bcb53485..12f981f01fc 100644 --- a/node/repo/interface.go +++ b/node/repo/interface.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/ipfs/go-datastore" + blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/multiformats/go-multiaddr" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" @@ -12,11 +13,26 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +// BlockstoreDomain represents the domain of a blockstore. +type BlockstoreDomain string + +const ( + // BlockstoreChain represents the blockstore domain for chain data. + // Right now, this includes chain objects (tipsets, blocks, messages), as + // well as state. In the future, they may get segregated into different + // domains. + BlockstoreChain = BlockstoreDomain("chain") +) + var ( ErrNoAPIEndpoint = errors.New("API not running (no endpoint)") ErrNoAPIToken = errors.New("API token not set") ErrRepoAlreadyLocked = errors.New("repo is already locked (lotus daemon already running)") ErrClosedRepo = errors.New("repo is no longer open") + + // ErrInvalidBlockstoreDomain is returned by LockedRepo#Blockstore() when + // an unrecognized domain is requested. + ErrInvalidBlockstoreDomain = errors.New("invalid blockstore domain") ) type Repo interface { @@ -37,6 +53,9 @@ type LockedRepo interface { // Returns datastore defined in this repo. Datastore(namespace string) (datastore.Batching, error) + // Blockstore returns an IPLD blockstore for the requested domain. + Blockstore(domain BlockstoreDomain) (blockstore.Blockstore, error) + // Returns config in this repo Config() (interface{}, error) SetConfig(func(interface{})) error diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index 34e3637ebb0..88d4eccd32f 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -14,10 +14,10 @@ import ( "github.com/multiformats/go-multiaddr" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" - "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/stores" + "github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/node/config" ) @@ -31,8 +31,9 @@ type MemRepo struct { repoLock chan struct{} token *byte - datastore datastore.Datastore - keystore map[string]types.KeyInfo + datastore datastore.Datastore + keystore map[string]types.KeyInfo + blockstore blockstore.Blockstore // given a repo type, produce the default config configF func(t RepoType) interface{} @@ -158,11 +159,11 @@ func NewMemory(opts *MemRepoOptions) *MemRepo { } return &MemRepo{ - repoLock: make(chan struct{}, 1), - - datastore: opts.Ds, - configF: opts.ConfigF, - keystore: opts.KeyStore, + repoLock: make(chan struct{}, 1), + blockstore: blockstore.WrapIDStore(blockstore.NewTemporarySync()), + datastore: opts.Ds, + configF: opts.ConfigF, + keystore: opts.KeyStore, } } @@ -243,6 +244,13 @@ func (lmem *lockedMemRepo) Datastore(ns string) (datastore.Batching, error) { return namespace.Wrap(lmem.mem.datastore, datastore.NewKey(ns)), nil } +func (lmem *lockedMemRepo) Blockstore(domain BlockstoreDomain) (blockstore.Blockstore, error) { + if domain != BlockstoreChain { + return nil, ErrInvalidBlockstoreDomain + } + return lmem.mem.blockstore, nil +} + func (lmem *lockedMemRepo) ListDatastores(ns string) ([]int64, error) { return nil, nil }