Skip to content

Commit

Permalink
Problem: versiondb don't support restore from local snapshot
Browse files Browse the repository at this point in the history
to setup a pruned versiondb node, we need to restore versiondb from local snapshot.

Solution:
- implement a new subcommand for that.
  • Loading branch information
yihuang committed Jul 25, 2023
1 parent ed242b2 commit e64b167
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- [#1083](https://github.com/crypto-org-chain/cronos/pull/1083) memiavl support both sdk 46 and 47 root hash rules.
- [#1091](https://github.com/crypto-org-chain/cronos/pull/1091) memiavl support rollback.
- [#1100](https://github.com/crypto-org-chain/cronos/pull/1100) memiavl support read-only mode, and grab exclusive lock for write mode.
- [#]() versiondb support restore from local snapshot.

### Improvements

Expand Down
1 change: 1 addition & 0 deletions versiondb/client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func ChangeSetGroupCmd(opts Options) *cobra.Command {
IngestVersionDBSSTCmd(),
ChangeSetToVersionDBCmd(),
RestoreAppDBCmd(opts),
RestoreVersionDBCmd(),
)
return cmd
}
121 changes: 121 additions & 0 deletions versiondb/client/restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package client

import (
"fmt"
"io"
"math"
"path/filepath"
"strconv"

"cosmossdk.io/errors"
protoio "github.com/gogo/protobuf/io"
"github.com/spf13/cobra"

"github.com/cosmos/cosmos-sdk/server"
"github.com/cosmos/cosmos-sdk/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"

"github.com/crypto-org-chain/cronos/versiondb"
"github.com/crypto-org-chain/cronos/versiondb/tsrocksdb"
)

// RestoreVersionDBCmd returns a command to restore a versiondb from local snapshot
func RestoreVersionDBCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "restore <height> <format>",
Short: "Restore initial versiondb from local snapshot",
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := server.GetServerContextFromCmd(cmd)

height, err := strconv.ParseUint(args[0], 10, 63)
if err != nil {
return err
}
format, err := strconv.ParseUint(args[1], 10, 32)
if err != nil {
return err
}

store, err := server.GetSnapshotStore(ctx.Viper)
if err != nil {
return err
}

snapshot, chChunks, err := store.Load(height, uint32(format))
if err != nil {
return err
}

if snapshot == nil {
return fmt.Errorf("snapshot doesn't exist, height: %d, format: %d", height, format)
}

streamReader, err := snapshots.NewStreamReader(chChunks)
if err != nil {
return err
}
defer streamReader.Close()

home := ctx.Config.RootDir
versionDB, err := tsrocksdb.NewStore(filepath.Join(home, "data", "versiondb"))
if err != nil {
return err
}

ch := make(chan versiondb.ImportEntry, 128)

go func() {
defer close(ch)

if err := readSnapshotEntries(streamReader, ch); err != nil {
ctx.Logger.Error("failed to read snapshot entries", "err", err)
}
}()

return versionDB.Import(int64(height), ch)
},
}
return cmd
}

// readSnapshotEntries reads key-value entries from protobuf reader and feed to the channel
func readSnapshotEntries(protoReader protoio.Reader, ch chan<- versiondb.ImportEntry) error {
var (
snapshotItem snapshottypes.SnapshotItem
storeKey string
)

loop:
for {
snapshotItem = snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(&snapshotItem)
if err == io.EOF {
break
} else if err != nil {
return errors.Wrap(err, "invalid protobuf message")
}

switch item := snapshotItem.Item.(type) {
case *snapshottypes.SnapshotItem_Store:
storeKey = item.Store.Name
case *snapshottypes.SnapshotItem_IAVL:
if storeKey == "" {
return errors.Wrap(err, "invalid protobuf message, store name is empty")
}
if item.IAVL.Height > math.MaxInt8 {
return fmt.Errorf("node height %v cannot exceed %v",
item.IAVL.Height, math.MaxInt8)
}
ch <- versiondb.ImportEntry{
StoreKey: storeKey,
Key: item.IAVL.Key,
Value: item.IAVL.Value,
}
default:
break loop
}
}

return nil
}
33 changes: 33 additions & 0 deletions versiondb/tsrocksdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const (

StorePrefixTpl = "s/k:%s/"
latestVersionKey = "s/latest"

ImportCommitBatchSize = 10000
)

var (
Expand Down Expand Up @@ -167,6 +169,37 @@ func (s Store) FeedChangeSet(version int64, store string, changeSet *iavl.Change
return s.db.Write(defaultWriteOpts, batch)
}

// Import loads the initial version of the state
func (s Store) Import(version int64, ch <-chan versiondb.ImportEntry) error {
batch := grocksdb.NewWriteBatch()
defer batch.Destroy()

var ts [TimestampSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))

var counter int
for entry := range ch {
key := cloneAppend(storePrefix(entry.StoreKey), entry.Key)
batch.PutCFWithTS(s.cfHandle, key, ts[:], entry.Value)

counter++
if counter%ImportCommitBatchSize == 0 {
if err := s.db.Write(defaultWriteOpts, batch); err != nil {
return err
}
batch.Clear()
}
}

if batch.Count() > 0 {
if err := s.db.Write(defaultWriteOpts, batch); err != nil {
return err
}
}

return s.SetLatestVersion(version)
}

func newTSReadOptions(version *int64) *grocksdb.ReadOptions {
var ver uint64
if version == nil {
Expand Down
9 changes: 9 additions & 0 deletions versiondb/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,13 @@ type VersionStore interface {
// the `changeSet` should be ordered by (storeKey, key),
// the version should be latest version plus one.
PutAtVersion(version int64, changeSet []types.StoreKVPair) error

// Import the initial state of the store
Import(version int64, ch <-chan ImportEntry) error
}

type ImportEntry struct {
StoreKey string
Key []byte
Value []byte
}

0 comments on commit e64b167

Please sign in to comment.