From eb53a7653a14f2adb34cf3184f3d2c0d6f4866e7 Mon Sep 17 00:00:00 2001 From: cool-developer <51834436+cool-develope@users.noreply.github.com> Date: Tue, 9 Apr 2024 16:48:48 -0500 Subject: [PATCH] feat: deferred pruning (#925) Co-authored-by: Marko --- CHANGELOG.md | 6 ++ migrate_test.go | 13 ---- mutable_tree.go | 11 ++++ nodedb.go | 156 +++++++++++++++++++++++++++++++++++++----------- options.go | 10 ++++ prune_test.go | 69 +++++++++++++++++++++ 6 files changed, 217 insertions(+), 48 deletions(-) create mode 100644 prune_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f2f67922..a181df85a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +### Improvements + +- [#925](https://github.com/cosmos/iavl/pull/925) Add the `AsyncPruning` option to the `MutableTree` constructor to enable async pruning. + ## v1.1.2 April 8, 2024 ### Bug Fixes diff --git a/migrate_test.go b/migrate_test.go index adc238dd1..717118d5d 100644 --- a/migrate_test.go +++ b/migrate_test.go @@ -8,7 +8,6 @@ import ( "os/exec" "path" "testing" - "time" "cosmossdk.io/log" "github.com/stretchr/testify/require" @@ -255,18 +254,6 @@ func TestPruning(t *testing.T) { } } - // Wait for pruning to finish - for i := 0; i < 100; i++ { - _, _, err := tree.SaveVersion() - require.NoError(t, err) - isLeacy, err := tree.ndb.hasLegacyVersion(int64(legacyVersion)) - require.NoError(t, err) - if !isLeacy { - break - } - // Simulate the consensus state update - time.Sleep(500 * time.Millisecond) - } // Reload the tree tree = NewMutableTree(db, 0, false, log.NewNopLogger()) versions := tree.AvailableVersions() diff --git a/mutable_tree.go b/mutable_tree.go index 3ec57eb87..c794efd43 100644 --- a/mutable_tree.go +++ b/mutable_tree.go @@ -688,6 +688,17 @@ func (tree *MutableTree) GetVersioned(key []byte, version int64) ([]byte, error) return nil, nil } +// SetCommitting sets a flag to indicate that the tree is in the process of being saved. 
+// This is used to prevent parallel writing from async pruning. +func (tree *MutableTree) SetCommitting() { + tree.ndb.SetCommitting() +} + +// UnsetCommitting unsets the flag to indicate that the tree is no longer in the process of being saved. +func (tree *MutableTree) UnsetCommitting() { + tree.ndb.UnsetCommitting() +} + // SaveVersion saves a new tree version to disk, based on the current state of // the tree. Returns the hash and new version number. func (tree *MutableTree) SaveVersion() ([]byte, int64, error) { diff --git a/nodedb.go b/nodedb.go index b00aa6fac..1d4ae21f2 100644 --- a/nodedb.go +++ b/nodedb.go @@ -37,10 +37,6 @@ const ( defaultStorageVersionValue = "1.0.0" fastStorageVersionValue = "1.1.0" fastNodeCacheSize = 100000 - - // This is used to avoid the case which pruning blocks the main process. - deleteBatchCount = 1000 - deletePauseDuration = 100 * time.Millisecond ) var ( @@ -87,9 +83,12 @@ type nodeDB struct { storageVersion string // Storage version firstVersion int64 // First version of nodeDB. latestVersion int64 // Latest version of nodeDB. + pruneVersion int64 // Version to prune up to. legacyLatestVersion int64 // Latest version of nodeDB in legacy format. nodeCache cache.Cache // Cache for nodes in the regular tree that consists of key-value pairs at any version. fastNodeCache cache.Cache // Cache for nodes in the fast index that represents only key-value pairs at the latest version. + isCommitting bool // Flag to indicate that the nodeDB is committing. + chCommitting chan struct{} // Channel to signal that the committing is done. 
} func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg log.Logger) *nodeDB { @@ -99,7 +98,7 @@ func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg log.Logger) *nodeDB { storeVersion = []byte(defaultStorageVersionValue) } - return &nodeDB{ + ndb := &nodeDB{ logger: lg, db: db, batch: NewBatchWithFlusher(db, opts.FlushThreshold), @@ -107,11 +106,19 @@ func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg log.Logger) *nodeDB { firstVersion: 0, latestVersion: 0, // initially invalid legacyLatestVersion: 0, + pruneVersion: 0, nodeCache: cache.New(cacheSize), fastNodeCache: cache.New(fastNodeCacheSize), versionReaders: make(map[int64]uint32, 8), storageVersion: string(storeVersion), + chCommitting: make(chan struct{}, 1), + } + + if opts.AsyncPruning { + go ndb.startPruning() } + + return ndb } // GetNode gets a node from memory or disk. If it is an inner node, it does not @@ -244,6 +251,33 @@ func (ndb *nodeDB) SaveFastNodeNoCache(node *fastnode.Node) error { return ndb.saveFastNodeUnlocked(node, false) } +// SetCommitting sets the committing flag to true. +// This is used to let the pruning process know that the nodeDB is committing. +func (ndb *nodeDB) SetCommitting() { + for len(ndb.chCommitting) > 0 { + <-ndb.chCommitting + } + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + ndb.isCommitting = true +} + +// UnsetCommitting sets the committing flag to false. +// This is used to let the pruning process know that the nodeDB is done committing. +func (ndb *nodeDB) UnsetCommitting() { + ndb.mtx.Lock() + ndb.isCommitting = false + ndb.mtx.Unlock() + ndb.chCommitting <- struct{}{} +} + +// IsCommitting returns true if the nodeDB is committing, false otherwise. +func (ndb *nodeDB) IsCommitting() bool { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + return ndb.isCommitting +} + // SetFastStorageVersionToBatch sets storage version to fast where the version is // 1.1.0-<version of the current live state>. Returns error if storage version is incorrect or on // db error, nil otherwise. 
Requires changes to be committed after to be persisted. @@ -331,6 +365,37 @@ func (ndb *nodeDB) Has(nk []byte) (bool, error) { return ndb.db.Has(ndb.nodeKey(nk)) } +// deleteFromPruning deletes key via the batch on behalf of the pruning process, waiting out any in-progress commit first. +func (ndb *nodeDB) deleteFromPruning(key []byte) error { + if ndb.IsCommitting() { + // if the nodeDB is committing, the pruning process will be done after the committing. + <-ndb.chCommitting + } + + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + return ndb.batch.Delete(key) +} + +// saveNodeFromPruning writes node to the batch on behalf of the pruning process, waiting out any in-progress commit first. +func (ndb *nodeDB) saveNodeFromPruning(node *Node) error { + if ndb.IsCommitting() { + // if the nodeDB is committing, the pruning process will be done after the committing. + <-ndb.chCommitting + } + + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + + // Save node bytes to db. + var buf bytes.Buffer + buf.Grow(node.encodedSize()) + if err := node.writeBytes(&buf); err != nil { + return err + } + return ndb.batch.Set(ndb.nodeKey(node.GetKey()), buf.Bytes()) +} + // deleteVersion deletes a tree version from disk. // deletes orphans func (ndb *nodeDB) deleteVersion(version int64) error { @@ -343,7 +408,7 @@ func (ndb *nodeDB) deleteVersion(version int64) error { if orphan.nodeKey.nonce == 0 && !orphan.isLegacy { // if the orphan is a reformatted root, it can be a legacy root // so it should be removed from the pruning process. 
- if err := ndb.batch.Delete(ndb.legacyNodeKey(orphan.hash)); err != nil { + if err := ndb.deleteFromPruning(ndb.legacyNodeKey(orphan.hash)); err != nil { return err } } @@ -355,9 +420,9 @@ func (ndb *nodeDB) deleteVersion(version int64) error { } nk := orphan.GetKey() if orphan.isLegacy { - return ndb.batch.Delete(ndb.legacyNodeKey(nk)) + return ndb.deleteFromPruning(ndb.legacyNodeKey(nk)) } - return ndb.batch.Delete(ndb.nodeKey(nk)) + return ndb.deleteFromPruning(ndb.nodeKey(nk)) }); err != nil { return err } @@ -366,7 +431,7 @@ func (ndb *nodeDB) deleteVersion(version int64) error { if rootKey == nil || !bytes.Equal(rootKey, literalRootKey) { // if the root key is not matched with the literal root key, it means the given root // is a reference root to the previous version. - if err := ndb.batch.Delete(ndb.nodeKey(literalRootKey)); err != nil { + if err := ndb.deleteFromPruning(ndb.nodeKey(literalRootKey)); err != nil { return err } } @@ -382,12 +447,12 @@ func (ndb *nodeDB) deleteVersion(version int64) error { return err } // ensure that the given version is not included in the root search - if err := ndb.batch.Delete(ndb.nodeKey(literalRootKey)); err != nil { + if err := ndb.deleteFromPruning(ndb.nodeKey(literalRootKey)); err != nil { return err } // instead, the root should be reformatted to (version, 0) root.nodeKey.nonce = 0 - if err := ndb.SaveNode(root); err != nil { + if err := ndb.saveNodeFromPruning(root); err != nil { return err } } @@ -421,35 +486,22 @@ func (ndb *nodeDB) deleteLegacyNodes(version int64, nk []byte) error { // deleteLegacyVersions deletes all legacy versions from disk. 
func (ndb *nodeDB) deleteLegacyVersions(legacyLatestVersion int64) error { - count := 0 - - checkDeletePause := func() { - count++ - if count%deleteBatchCount == 0 { - time.Sleep(deletePauseDuration) - count = 0 - } - } - // Delete the last version for the legacyLastVersion if err := ndb.traverseOrphans(legacyLatestVersion, legacyLatestVersion+1, func(orphan *Node) error { - checkDeletePause() - return ndb.batch.Delete(ndb.legacyNodeKey(orphan.hash)) + return ndb.deleteFromPruning(ndb.legacyNodeKey(orphan.hash)) }); err != nil { return err } // Delete orphans for all legacy versions if err := ndb.traversePrefix(legacyOrphanKeyFormat.Key(), func(key, value []byte) error { - checkDeletePause() - if err := ndb.batch.Delete(key); err != nil { + if err := ndb.deleteFromPruning(key); err != nil { return err } var fromVersion, toVersion int64 legacyOrphanKeyFormat.Scan(key, &toVersion, &fromVersion) if (fromVersion <= legacyLatestVersion && toVersion < legacyLatestVersion) || fromVersion > legacyLatestVersion { - checkDeletePause() - return ndb.batch.Delete(ndb.legacyNodeKey(value)) + return ndb.deleteFromPruning(ndb.legacyNodeKey(value)) } return nil }); err != nil { @@ -457,8 +509,7 @@ func (ndb *nodeDB) deleteLegacyVersions(legacyLatestVersion int64) error { } // Delete all legacy roots if err := ndb.traversePrefix(legacyRootKeyFormat.Key(), func(key, value []byte) error { - checkDeletePause() - return ndb.batch.Delete(key) + return ndb.deleteFromPruning(key) }); err != nil { return err } @@ -526,8 +577,45 @@ func (ndb *nodeDB) DeleteVersionsFrom(fromVersion int64) error { return nil } +// startPruning starts the pruning process. 
+func (ndb *nodeDB) startPruning() { + for { + ndb.mtx.Lock() + toVersion := ndb.pruneVersion + ndb.mtx.Unlock() + + if toVersion == 0 { + time.Sleep(100 * time.Millisecond) + continue + } + + if err := ndb.deleteVersionsTo(toVersion); err != nil { + ndb.logger.Error("Error while pruning", "err", err) + time.Sleep(1 * time.Second) + continue + } + + ndb.mtx.Lock() + if ndb.pruneVersion <= toVersion { + ndb.pruneVersion = 0 + } + ndb.mtx.Unlock() + } +} + // DeleteVersionsTo deletes the oldest versions up to the given version from disk. func (ndb *nodeDB) DeleteVersionsTo(toVersion int64) error { + if !ndb.opts.AsyncPruning { + return ndb.deleteVersionsTo(toVersion) + } + + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + ndb.pruneVersion = toVersion + return nil +} + +func (ndb *nodeDB) deleteVersionsTo(toVersion int64) error { legacyLatestVersion, err := ndb.getLegacyLatestVersion() if err != nil { return err @@ -564,14 +652,12 @@ func (ndb *nodeDB) DeleteVersionsTo(toVersion int64) error { // Delete the legacy versions if legacyLatestVersion >= first { + if err := ndb.deleteLegacyVersions(legacyLatestVersion); err != nil { + ndb.logger.Error("Error deleting legacy versions", "err", err) + } + first = legacyLatestVersion + 1 // reset the legacy latest version forcibly to avoid multiple calls ndb.resetLegacyLatestVersion(-1) - go func() { - if err := ndb.deleteLegacyVersions(legacyLatestVersion); err != nil { - ndb.logger.Error("Error deleting legacy versions", "err", err) - } - }() - first = legacyLatestVersion + 1 } for version := first; version <= toVersion; version++ { diff --git a/options.go b/options.go index c679cfca1..520c2170f 100644 --- a/options.go +++ b/options.go @@ -84,6 +84,9 @@ type Options struct { // Ethereum has found that commit of 100KB is optimal, ref ethereum/go-ethereum#15115 FlushThreshold int + + // AsyncPruning is a flag to enable async pruning + AsyncPruning bool } // DefaultOptions returns the default options for IAVL. 
@@ -118,3 +121,10 @@ func FlushThresholdOption(ft int) Option { opts.FlushThreshold = ft } } + +// AsyncPruningOption sets the AsyncPruning for the tree. +func AsyncPruningOption(asyncPruning bool) Option { + return func(opts *Options) { + opts.AsyncPruning = asyncPruning + } +} diff --git a/prune_test.go b/prune_test.go new file mode 100644 index 000000000..31c738fd7 --- /dev/null +++ b/prune_test.go @@ -0,0 +1,69 @@ +package iavl + +import ( + "fmt" + "testing" + "time" + + "cosmossdk.io/log" + dbm "github.com/cosmos/iavl/db" + "github.com/stretchr/testify/require" +) + +func TestAsyncPruning(t *testing.T) { + db, err := dbm.NewDB("test", "goleveldb", t.TempDir()) + require.NoError(t, err) + defer db.Close() + + tree := NewMutableTree(db, 0, false, log.NewNopLogger(), AsyncPruningOption(true), FlushThresholdOption(1000)) + + toVersion := 10000 + keyCount := 10 + pruneInterval := int64(100) + keepRecent := int64(300) + for i := 0; i < toVersion; i++ { + for j := 0; j < keyCount; j++ { + _, err := tree.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j))) + require.NoError(t, err) + } + + tree.SetCommitting() + _, v, err := tree.SaveVersion() + require.NoError(t, err) + tree.UnsetCommitting() + + if v%pruneInterval == 0 && v > keepRecent { + ti := time.Now() + require.NoError(t, tree.DeleteVersionsTo(v-keepRecent)) + t.Logf("Pruning %d versions took %v\n", keepRecent, time.Since(ti)) + } + } + + // wait for async pruning to finish + for i := 0; i < 100; i++ { + tree.SetCommitting() + _, _, err := tree.SaveVersion() + require.NoError(t, err) + tree.UnsetCommitting() + + firstVersion, err := tree.ndb.getFirstVersion() + require.NoError(t, err) + t.Logf("Iteration: %d First version: %d\n", i, firstVersion) + if firstVersion == int64(toVersion)-keepRecent+1 { + break + } + // simulate the consensus process + time.Sleep(500 * time.Millisecond) + } + + // Reload the tree + tree = NewMutableTree(db, 0, false, log.NewNopLogger()) + _, err = 
tree.LoadVersion(int64(toVersion) - keepRecent) + require.Error(t, err) + versions := tree.AvailableVersions() + require.Equal(t, versions[0], toVersion-int(keepRecent)+1) + for _, v := range versions { + _, err := tree.LoadVersion(int64(v)) + require.NoError(t, err) + } +}