Skip to content

Commit

Permalink
fix: safe batch write (#838)
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope authored Oct 20, 2023
1 parent 10c9f1e commit fa35c63
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 40 deletions.
2 changes: 1 addition & 1 deletion migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestLazySet(t *testing.T) {
}

func TestLegacyReferenceNode(t *testing.T) {
legacyVersion := 10
legacyVersion := 20
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion)
require.NoError(t, err)
Expand Down
34 changes: 19 additions & 15 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,12 @@ func (tree *MutableTree) enableFastStorageAndCommit() error {
return err
}

if err = tree.ndb.setFastStorageVersionToBatch(); err != nil {
latestVersion, err := tree.ndb.getLatestVersion()
if err != nil {
return err
}

if err = tree.ndb.setFastStorageVersionToBatch(latestVersion); err != nil {
return err
}

Expand Down Expand Up @@ -733,6 +738,13 @@ func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
}

tree.logger.Debug("SAVE TREE", "version", version)

// save new fast nodes
if !tree.skipFastStorageUpgrade {
if err := tree.saveFastNodeVersion(version, isGenesis); err != nil {
return nil, version, err
}
}
// save new nodes
if tree.root == nil {
if err := tree.ndb.SaveEmptyRoot(version); err != nil {
Expand Down Expand Up @@ -760,18 +772,11 @@ func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
}
}

tree.ndb.resetLatestVersion(version)

if !tree.skipFastStorageUpgrade {
if err := tree.saveFastNodeVersion(isGenesis); err != nil {
return nil, version, err
}
}

if err := tree.ndb.Commit(); err != nil {
return nil, version, err
}

tree.ndb.resetLatestVersion(version)
tree.version = version

// set new working tree
Expand All @@ -782,19 +787,17 @@ func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
tree.unsavedFastNodeRemovals = &sync.Map{}
}

hash := tree.Hash()

return hash, version, nil
return tree.Hash(), version, nil
}

func (tree *MutableTree) saveFastNodeVersion(isGenesis bool) error {
func (tree *MutableTree) saveFastNodeVersion(latestVersion int64, isGenesis bool) error {
if err := tree.saveFastNodeAdditions(isGenesis); err != nil {
return err
}
if err := tree.saveFastNodeRemovals(); err != nil {
return err
}
return tree.ndb.setFastStorageVersionToBatch()
return tree.ndb.setFastStorageVersionToBatch(latestVersion)
}

func (tree *MutableTree) getUnsavedFastNodeAdditions() map[string]*fastnode.Node {
Expand Down Expand Up @@ -1039,7 +1042,6 @@ func (tree *MutableTree) saveNewNodes(version int64) error {
version: version,
nonce: nonce,
}
newNodes = append(newNodes, node)

var err error
// the inner nodes should have two children.
Expand All @@ -1055,6 +1057,8 @@ func (tree *MutableTree) saveNewNodes(version int64) error {
}

node._hash(version)
newNodes = append(newNodes, node)

return node.nodeKey.GetKey(), nil
}

Expand Down
7 changes: 1 addition & 6 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (ndb *nodeDB) SaveFastNodeNoCache(node *fastnode.Node) error {
// setFastStorageVersionToBatch sets storage version to fast where the version is
// 1.1.0-<version of the current live state>. Returns error if storage version is incorrect or on
// db error, nil otherwise. Requires changes to be committed after to be persisted.
func (ndb *nodeDB) setFastStorageVersionToBatch() error {
func (ndb *nodeDB) setFastStorageVersionToBatch(latestVersion int64) error {
var newVersion string
if ndb.storageVersion >= fastStorageVersionValue {
// Storage version should be at index 0 and latest fast cache version at index 1
Expand All @@ -262,11 +262,6 @@ func (ndb *nodeDB) setFastStorageVersionToBatch() error {
newVersion = fastStorageVersionValue
}

latestVersion, err := ndb.getLatestVersion()
if err != nil {
return err
}

newVersion += fastStorageVersionDelimiter + strconv.Itoa(int(latestVersion))

if err := ndb.batch.Set(metadataKeyFormat.Key([]byte(storageVersionKey)), []byte(newVersion)); err != nil {
Expand Down
26 changes: 10 additions & 16 deletions nodedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ func TestSetStorageVersion_Success(t *testing.T) {
ndb := newNodeDB(db, 0, DefaultOptions(), log.NewNopLogger())
require.Equal(t, defaultStorageVersionValue, ndb.getStorageVersion())

err := ndb.setFastStorageVersionToBatch()
latestVersion, err := ndb.getLatestVersion()
require.NoError(t, err)

latestVersion, err := ndb.getLatestVersion()
err = ndb.setFastStorageVersionToBatch(latestVersion)
require.NoError(t, err)

require.Equal(t, expectedVersion+fastStorageVersionDelimiter+strconv.Itoa(int(latestVersion)), ndb.getStorageVersion())
require.NoError(t, ndb.batch.Write())
}
Expand All @@ -101,7 +102,6 @@ func TestSetStorageVersion_DBFailure_OldKept(t *testing.T) {
ctrl := gomock.NewController(t)
dbMock := mock.NewMockDB(ctrl)
batchMock := mock.NewMockBatch(ctrl)
rIterMock := mock.NewMockIterator(ctrl)

expectedErrorMsg := "some db error"

Expand All @@ -110,19 +110,13 @@ func TestSetStorageVersion_DBFailure_OldKept(t *testing.T) {
dbMock.EXPECT().Get(gomock.Any()).Return([]byte(defaultStorageVersionValue), nil).Times(1)
dbMock.EXPECT().NewBatchWithSize(gomock.Any()).Return(batchMock).Times(1)

// rIterMock is used to get the latest version from disk. We are mocking that rIterMock returns latestTreeVersion from disk
rIterMock.EXPECT().Valid().Return(true).Times(1)
rIterMock.EXPECT().Key().Return(nodeKeyFormat.Key(GetRootKey(int64(expectedFastCacheVersion)))).Times(1)
rIterMock.EXPECT().Close().Return(nil).Times(1)

dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1)
batchMock.EXPECT().GetByteSize().Return(100, nil).Times(1)
batchMock.EXPECT().Set(metadataKeyFormat.Key([]byte(storageVersionKey)), []byte(fastStorageVersionValue+fastStorageVersionDelimiter+strconv.Itoa(expectedFastCacheVersion))).Return(errors.New(expectedErrorMsg)).Times(1)

ndb := newNodeDB(dbMock, 0, DefaultOptions(), log.NewNopLogger())
require.Equal(t, defaultStorageVersionValue, ndb.getStorageVersion())

err := ndb.setFastStorageVersionToBatch()
err := ndb.setFastStorageVersionToBatch(int64(expectedFastCacheVersion))
require.Error(t, err)
require.Equal(t, expectedErrorMsg, err.Error())
require.Equal(t, defaultStorageVersionValue, ndb.getStorageVersion())
Expand All @@ -143,7 +137,7 @@ func TestSetStorageVersion_InvalidVersionFailure_OldKept(t *testing.T) {
ndb := newNodeDB(dbMock, 0, DefaultOptions(), log.NewNopLogger())
require.Equal(t, invalidStorageVersion, ndb.getStorageVersion())

err := ndb.setFastStorageVersionToBatch()
err := ndb.setFastStorageVersionToBatch(0)
require.Error(t, err)
require.Equal(t, expectedErrorMsg, err)
require.Equal(t, invalidStorageVersion, ndb.getStorageVersion())
Expand All @@ -155,7 +149,7 @@ func TestSetStorageVersion_FastVersionFirst_VersionAppended(t *testing.T) {
ndb.storageVersion = fastStorageVersionValue
ndb.latestVersion = 100

err := ndb.setFastStorageVersionToBatch()
err := ndb.setFastStorageVersionToBatch(ndb.latestVersion)
require.NoError(t, err)
require.Equal(t, fastStorageVersionValue+fastStorageVersionDelimiter+strconv.Itoa(int(ndb.latestVersion)), ndb.storageVersion)
}
Expand All @@ -169,7 +163,7 @@ func TestSetStorageVersion_FastVersionSecond_VersionAppended(t *testing.T) {
storageVersionBytes[len(fastStorageVersionValue)-1]++ // increment last byte
ndb.storageVersion = string(storageVersionBytes)

err := ndb.setFastStorageVersionToBatch()
err := ndb.setFastStorageVersionToBatch(ndb.latestVersion)
require.NoError(t, err)
require.Equal(t, string(storageVersionBytes)+fastStorageVersionDelimiter+strconv.Itoa(int(ndb.latestVersion)), ndb.storageVersion)
}
Expand All @@ -183,12 +177,12 @@ func TestSetStorageVersion_SameVersionTwice(t *testing.T) {
storageVersionBytes[len(fastStorageVersionValue)-1]++ // increment last byte
ndb.storageVersion = string(storageVersionBytes)

err := ndb.setFastStorageVersionToBatch()
err := ndb.setFastStorageVersionToBatch(ndb.latestVersion)
require.NoError(t, err)
newStorageVersion := string(storageVersionBytes) + fastStorageVersionDelimiter + strconv.Itoa(int(ndb.latestVersion))
require.Equal(t, newStorageVersion, ndb.storageVersion)

err = ndb.setFastStorageVersionToBatch()
err = ndb.setFastStorageVersionToBatch(ndb.latestVersion)
require.NoError(t, err)
require.Equal(t, newStorageVersion, ndb.storageVersion)
}
Expand Down Expand Up @@ -408,7 +402,7 @@ func TestDeleteVersionsFromNoDeadlock(t *testing.T) {
ndb := newNodeDB(db, 0, DefaultOptions(), log.NewNopLogger())
require.Equal(t, defaultStorageVersionValue, ndb.getStorageVersion())

err := ndb.setFastStorageVersionToBatch()
err := ndb.setFastStorageVersionToBatch(ndb.latestVersion)
require.NoError(t, err)

latestVersion, err := ndb.getLatestVersion()
Expand Down
4 changes: 2 additions & 2 deletions tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1780,8 +1780,8 @@ func TestNodeCacheStatisic(t *testing.T) {
cacheSize: numKeyVals,
expectFastCacheHitCnt: numKeyVals,
expectFastCacheMissCnt: 0,
expectCacheHitCnt: 0,
expectCacheMissCnt: 1,
expectCacheHitCnt: 1,
expectCacheMissCnt: 0,
},
"without_cache": {
cacheSize: 0,
Expand Down

0 comments on commit fa35c63

Please sign in to comment.