Skip to content

Commit

Permalink
chain import: don't walk to genesis
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Apr 5, 2023
1 parent f0ec716 commit 921a3a0
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 22 deletions.
2 changes: 1 addition & 1 deletion chain/store/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestIndexSeeks(t *testing.T) {
cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

_, err = cs.Import(ctx, bytes.NewReader(gencar))
_, _, err = cs.Import(ctx, bytes.NewReader(gencar))
if err != nil {
t.Fatal(err)
}
Expand Down
39 changes: 30 additions & 9 deletions chain/store/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
})
}

func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, error) {
func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (head *types.TipSet, tail *types.BlockHeader, err error) {
// TODO: writing only to the state blockstore is incorrect.
// At this time, both the state and chain blockstores are backed by the
// universal store. When we physically segregate the stores, we will need
Expand All @@ -69,7 +69,7 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e

br, err := carv2.NewBlockReader(r)
if err != nil {
return nil, xerrors.Errorf("loadcar failed: %w", err)
return nil, nil, xerrors.Errorf("loadcar failed: %w", err)
}

s := cs.StateBlockstore()
Expand All @@ -80,27 +80,44 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
putThrottle <- nil
}

nextTailCid := br.Roots[0]
var tailBlock types.BlockHeader
tailBlock.Height = abi.ChainEpoch(-1)

var buf []blocks.Block
for {
blk, err := br.Next()
if err != nil {

// we're at the end
if err == io.EOF {
if len(buf) > 0 {
if err := s.PutMany(ctx, buf); err != nil {
return nil, err
return nil, nil, err
}
}

break
}
return nil, err
return nil, nil, err
}

// check for header block, looking for genesis
if blk.Cid() == nextTailCid && tailBlock.Height != 0 {
if err := tailBlock.UnmarshalCBOR(bytes.NewReader(blk.RawData())); err != nil {
return nil, nil, xerrors.Errorf("failed to unmarshal tail block: %w", err)
}
if len(tailBlock.Parents) > 0 {
nextTailCid = tailBlock.Parents[0]
}
}

// append to batch
buf = append(buf, blk)

if len(buf) > 1000 {
if lastErr := <-putThrottle; lastErr != nil { // consume one error to have the right to add one
return nil, lastErr
return nil, nil, lastErr
}

go func(buf []blocks.Block) {
Expand All @@ -113,20 +130,24 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
// check errors
for i := 0; i < parallelPuts; i++ {
if lastErr := <-putThrottle; lastErr != nil {
return nil, lastErr
return nil, nil, lastErr
}
}

if tailBlock.Height != 0 {
return nil, nil, xerrors.Errorf("expected tail block to have height 0 (genesis), got %d: %s", tailBlock.Height, tailBlock.Cid())
}

root, err := cs.LoadTipSet(ctx, types.NewTipSetKey(br.Roots...))
if err != nil {
return nil, xerrors.Errorf("failed to load root tipset from chainfile: %w", err)
return nil, nil, xerrors.Errorf("failed to load root tipset from chainfile: %w", err)
}

ts := root
for i := 0; i < int(TipsetkeyBackfillRange); i++ {
err = cs.PersistTipset(ctx, ts)
if err != nil {
return nil, err
return nil, nil, err
}
parentTsKey := ts.Parents()
ts, err = cs.LoadTipSet(ctx, parentTsKey)
Expand All @@ -136,7 +157,7 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
}
}

return root, nil
return root, &tailBlock, nil
}

type walkSchedTaskType int
Expand Down
6 changes: 3 additions & 3 deletions chain/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestChainExportImport(t *testing.T) {
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

root, err := cs.Import(context.TODO(), buf)
root, _, err := cs.Import(context.TODO(), buf)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestChainImportTipsetKeyCid(t *testing.T) {
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

root, err := cs.Import(ctx, buf)
root, _, err := cs.Import(ctx, buf)
require.NoError(t, err)

require.Truef(t, root.Equals(last), "imported chain differed from exported chain")
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestChainExportImportFull(t *testing.T) {
cs := store.NewChainStore(nbs, nbs, ds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck

root, err := cs.Import(context.TODO(), buf)
root, _, err := cs.Import(context.TODO(), buf)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-bench/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ var importBenchCmd = &cli.Command{
return fmt.Errorf("no CAR file provided for import")
}

head, err = cs.Import(cctx.Context, carFile)
head, _, err = cs.Import(cctx.Context, carFile)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/genesis-verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var genesisVerifyCmd = &cli.Command{
return xerrors.Errorf("opening the car file: %w", err)
}

ts, err := cs.Import(cctx.Context, f)
ts, _, err := cs.Import(cctx.Context, f)
if err != nil {
return err
}
Expand Down
10 changes: 3 additions & 7 deletions cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
}

bar.Start()
ts, err := cst.Import(ctx, ir)
ts, gen, err := cst.Import(ctx, ir)
bar.Finish()

if err != nil {
Expand All @@ -545,12 +545,8 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
return xerrors.Errorf("flushing validation cache failed: %w", err)
}

gb, err := cst.GetTipsetByHeight(ctx, 0, ts, true)
if err != nil {
return err
}

err = cst.SetGenesis(ctx, gb.Blocks()[0])
log.Infof("setting genesis")
err = cst.SetGenesis(ctx, gen)
if err != nil {
return err
}
Expand Down

0 comments on commit 921a3a0

Please sign in to comment.