From 201e83495f1dc79d0aaa5fc4d38da5e750d846cc Mon Sep 17 00:00:00 2001 From: strahe Date: Wed, 19 Jan 2022 13:19:54 +0800 Subject: [PATCH] fix: Separate chain and state stores. --- blockstore/splitstore/splitstore.go | 8 ++++ chain/store/messages.go | 4 +- chain/store/store.go | 4 ++ node/builder_chain.go | 2 + node/modules/blockstore.go | 23 +++++++++++ node/modules/genesis.go | 63 ++++++++++++++++++++--------- 6 files changed, 83 insertions(+), 21 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index f6715ea333c..64306f9bb02 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -611,3 +611,11 @@ func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error { s.baseEpoch = epoch return s.ds.Put(s.ctx, baseEpochKey, epochToBytes(epoch)) } + +func (s *SplitStore) HotBlockStore() bstore.Blockstore { + return s.hot.(bstore.Blockstore) +} + +func (s *SplitStore) ColdBlockStore() bstore.Blockstore { + return s.cold.(bstore.Blockstore) +} diff --git a/chain/store/messages.go b/chain/store/messages.go index 4dd3bfc1d9f..193dfe77d79 100644 --- a/chain/store/messages.go +++ b/chain/store/messages.go @@ -73,7 +73,7 @@ func (cs *ChainStore) GetSignedMessage(ctx context.Context, c cid.Cid) (*types.S func (cs *ChainStore) readAMTCids(root cid.Cid) ([]cid.Cid, error) { ctx := context.TODO() // block headers use adt0, for now. - a, err := blockadt.AsArray(cs.ActorStore(ctx), root) + a, err := blockadt.AsArray(cs.ChainAdtStore(ctx), root) if err != nil { return nil, xerrors.Errorf("amt load: %w", err) } @@ -258,7 +258,7 @@ func (cs *ChainStore) MessagesForBlock(ctx context.Context, b *types.BlockHeader func (cs *ChainStore) GetParentReceipt(ctx context.Context, b *types.BlockHeader, i int) (*types.MessageReceipt, error) { // block headers use adt0, for now. - a, err := blockadt.AsArray(cs.ActorStore(ctx), b.ParentMessageReceipts) + a, err := blockadt.AsArray(cs.ChainAdtStore(ctx), b.ParentMessageReceipts) if err != nil { return nil, xerrors.Errorf("amt load: %w", err) } diff --git a/chain/store/store.go b/chain/store/store.go index d86a66f1285..9bce4c01634 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -1108,6 +1108,10 @@ func (cs *ChainStore) ActorStore(ctx context.Context) adt.Store { return ActorStore(ctx, cs.stateBlockstore) } +func (cs *ChainStore) ChainAdtStore(ctx context.Context) adt.Store { + return adt.WrapStore(ctx, cbor.NewCborStore(cs.chainLocalBlockstore)) +} + func (cs *ChainStore) TryFillTipSet(ctx context.Context, ts *types.TipSet) (*FullTipSet, error) { var out []*types.FullBlock diff --git a/node/builder_chain.go b/node/builder_chain.go index 11283ec3a09..0f678c1f47e 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -198,6 +198,8 @@ func ConfigFullNode(c interface{}) Option { Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))), Override(new(dtypes.GCReferenceProtector), modules.NoopGCReferenceProtector), ), + If(os.Getenv("LOTUS_SEPARATE_CHAINSTORE_STATESTORE") == "1", + Override(new(dtypes.BasicStateBlockstore), modules.BadgerStateBlockstore)), Override(new(dtypes.ChainBlockstore), From(new(dtypes.BasicChainBlockstore))), Override(new(dtypes.StateBlockstore), From(new(dtypes.BasicStateBlockstore))), diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go index 2486b9744d5..ba695b3c71f 100644 --- a/node/modules/blockstore.go +++ b/node/modules/blockstore.go @@ -70,6 +70,29 @@ func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlocksto return bs, nil } +func BadgerStateBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.BasicStateBlockstore, error) { + path := filepath.Join(r.Path(), "datastore", "state") + if err := os.MkdirAll(path, 0755); err != nil { + return nil, err + } + + opts := badgerbs.DefaultOptions(path) + opts.ReadOnly = r.Readonly() + + bs, err := badgerbs.Open(opts) + if err != nil { + return nil, err + } + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return bs.Close() + }}) + + return bs, nil +} + + func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { path, err := r.SplitstorePath() diff --git a/node/modules/genesis.go b/node/modules/genesis.go index 03b4e2907ef..1f5c0e1a8d3 100644 --- a/node/modules/genesis.go +++ b/node/modules/genesis.go @@ -2,6 +2,9 @@ package modules import ( "bytes" + "context" + "github.com/filecoin-project/lotus/blockstore" + "golang.org/x/sync/errgroup" "os" "go.uber.org/fx" @@ -22,31 +25,53 @@ func ErrorGenesis() Genesis { } } -func LoadGenesis(genBytes []byte) func(fx.Lifecycle, helpers.MetricsCtx, dtypes.ChainBlockstore) Genesis { - return func(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.ChainBlockstore) Genesis { +func LoadGenesis(genBytes []byte) func(fx.Lifecycle, helpers.MetricsCtx, dtypes.ChainBlockstore, dtypes.StateBlockstore) Genesis { + return func(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.ChainBlockstore, ss dtypes.StateBlockstore) Genesis { return func() (header *types.BlockHeader, e error) { ctx := helpers.LifecycleCtx(mctx, lc) - c, err := car.LoadCar(ctx, bs, bytes.NewReader(genBytes)) - if err != nil { - return nil, xerrors.Errorf("loading genesis car file failed: %w", err) - } - if len(c.Roots) != 1 { - return nil, xerrors.New("expected genesis file to have one root") - } - root, err := bs.Get(ctx, c.Roots[0]) - if err != nil { - return nil, err - } - - h, err := types.DecodeBlock(root.RawData()) - if err != nil { - return nil, xerrors.Errorf("decoding block failed: %w", err) - } - return h, nil + genBytes2 := make([]byte, len(genBytes)) + copy(genBytes2, genBytes) + grp, _ := errgroup.WithContext(context.TODO()) + grp.Go(func() error { + h, err := loadGenesis(ctx, bs, genBytes) + if err == nil { + header = h + } + return err + }) + grp.Go(func() error { + h, err := loadGenesis(ctx, ss, genBytes2) + if err == nil { + header = h + } + return err + }) + e = grp.Wait() + return } } } +func loadGenesis(ctx context.Context, bs blockstore.Blockstore, genBytes []byte) (header *types.BlockHeader, e error) { + c, err := car.LoadCar(ctx, bs, bytes.NewReader(genBytes)) + if err != nil { + return nil, xerrors.Errorf("loading genesis car file failed: %w", err) + } + if len(c.Roots) != 1 { + return nil, xerrors.New("expected genesis file to have one root") + } + root, err := bs.Get(ctx, c.Roots[0]) + if err != nil { + return nil, err + } + + h, err := types.DecodeBlock(root.RawData()) + if err != nil { + return nil, xerrors.Errorf("decoding block failed: %w", err) + } + return h, nil +} + func DoSetGenesis(_ dtypes.AfterGenesisSet) {} func SetGenesis(lc fx.Lifecycle, mctx helpers.MetricsCtx, cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) {