From ace73e851c6ee784b09eddbdca3342249ac1e6ac Mon Sep 17 00:00:00 2001 From: HuangYi Date: Tue, 16 May 2023 20:53:56 +0800 Subject: [PATCH 1/3] Problem: async commit queue size not configurable Solution: - change the bool option to int rename WaitAsyncCommit --- app/config/config.go | 5 ++- app/config/toml.go | 5 ++- app/memiavl.go | 8 ++-- memiavl/db.go | 91 ++++++++++++++++++++++++---------------- store/rootmulti/store.go | 22 ++++++---- 5 files changed, 79 insertions(+), 52 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index b7f80ad06b..09df36c74d 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -14,8 +14,9 @@ type MemIAVLConfig struct { // ZeroCopy defines if the memiavl should return slices pointing to mmap-ed buffers directly (zero-copy), // the zero-copied slices must not be retained beyond current block's execution. ZeroCopy bool `mapstructure:"zero-copy"` - // AsyncCommit defines if the memiavl should commit asynchronously, this greatly improve block catching-up performance. - AsyncCommit bool `mapstructure:"async-commit"` + // AsyncCommitBuffer defines the size of asynchronous commit queue, this greatly improve block catching-up + // performance, -1 means synchronous commit. + AsyncCommitBuffer int `mapstructure:"async-commit-buffer"` } func DefaultMemIAVLConfig() MemIAVLConfig { diff --git a/app/config/toml.go b/app/config/toml.go index ebd3dceab4..10f3aed05a 100644 --- a/app/config/toml.go +++ b/app/config/toml.go @@ -15,6 +15,7 @@ enable = {{ .MemIAVL.Enable }} # the zero-copied slices must not be retained beyond current block's execution. zero-copy = {{ .MemIAVL.ZeroCopy }} -# AsyncCommit defines if the memiavl should commit asynchronously, this greatly improve block catching-up performance. -async-commit = {{ .MemIAVL.AsyncCommit }} +# AsyncCommitBuffer defines the size of asynchronous commit queue, this greatly improve block catching-up +# performance, -1 means synchronous commit. +async-commit-buffer = {{ .MemIAVL.AsyncCommitBuffer }} ` diff --git a/app/memiavl.go b/app/memiavl.go index 195bf797c0..5533e8b381 100644 --- a/app/memiavl.go +++ b/app/memiavl.go @@ -13,9 +13,9 @@ import ( ) const ( - FlagMemIAVL = "memiavl.enable" - FlagAsyncCommit = "memiavl.async-commit" - FlagZeroCopy = "memiavl.zero-copy" + FlagMemIAVL = "memiavl.enable" + FlagAsyncCommitBuffer = "memiavl.async-commit-buffer" + FlagZeroCopy = "memiavl.zero-copy" ) func SetupMemIAVL(logger log.Logger, homePath string, appOpts servertypes.AppOptions, baseAppOptions []func(*baseapp.BaseApp)) []func(*baseapp.BaseApp) { @@ -23,7 +23,7 @@ func SetupMemIAVL(logger log.Logger, homePath string, appOpts servertypes.AppOpt // cms must be overridden before the other options, because they may use the cms, // make sure the cms aren't be overridden by the other options later on. cms := rootmulti.NewStore(filepath.Join(homePath, "data", "memiavl.db"), logger) - cms.SetAsyncCommit(cast.ToBool(appOpts.Get(FlagAsyncCommit))) + cms.SetAsyncCommitBuffer(cast.ToInt(appOpts.Get(FlagAsyncCommitBuffer))) cms.SetZeroCopy(cast.ToBool(appOpts.Get(FlagZeroCopy))) baseAppOptions = append([]func(*baseapp.BaseApp){setCMS(cms)}, baseAppOptions...) } diff --git a/memiavl/db.go b/memiavl/db.go index e33fc3b577..b600560096 100644 --- a/memiavl/db.go +++ b/memiavl/db.go @@ -38,9 +38,10 @@ type DB struct { pruneSnapshotLock sync.Mutex // invariant: the LastIndex always match the current version of MultiTree - wal *wal.Log - walChan chan *walEntry - walQuit chan error + wal *wal.Log + walChanSize int + walChan chan *walEntry + walQuit chan error // pending store upgrades, will be written into WAL in next Commit call pendingUpgrades []*TreeNameUpgrade @@ -63,8 +64,8 @@ type Options struct { SnapshotKeepRecent uint32 // load the target version instead of latest version TargetVersion uint32 - // Write WAL asynchronously, it's ok in blockchain case because we can always replay the raw blocks. - AsyncCommit bool + // Buffer size for the asynchronous commit queue, -1 means synchronous commit + AsyncCommitBuffer int // ZeroCopy if true, the get and iterator methods could return a slice pointing to mmaped blob files. ZeroCopy bool } @@ -97,36 +98,11 @@ func Load(dir string, opts Options) (*DB, error) { return nil, err } - var ( - walChan chan *walEntry - walQuit chan error - ) - if opts.AsyncCommit { - walChan = make(chan *walEntry, 100) - walQuit = make(chan error) - go func() { - defer close(walQuit) - - for entry := range walChan { - bz, err := entry.data.Marshal() - if err != nil { - walQuit <- err - return - } - if err := wal.Write(entry.index, bz); err != nil { - walQuit <- err - return - } - } - }() - } - db := &DB{ MultiTree: *mtree, dir: dir, wal: wal, - walChan: walChan, - walQuit: walQuit, + walChanSize: opts.AsyncCommitBuffer, snapshotKeepRecent: opts.SnapshotKeepRecent, } @@ -179,13 +155,13 @@ func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error { // checkAsyncTasks checks the status of background tasks non-blocking-ly and process the result func (db *DB) checkAsyncTasks() error { return errors.Join( - db.checkAsyncWAL(), + db.checkAsyncCommit(), db.checkBackgroundSnapshotRewrite(), ) } -// checkAsyncWAL check the quit signal of async wal writing -func (db *DB) checkAsyncWAL() error { +// checkAsyncCommit check the quit signal of async wal writing +func (db *DB) checkAsyncCommit() error { select { case err := <-db.walQuit: // async wal writing failed, we need to abort the state machine @@ -269,7 +245,11 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) { Changesets: changeSets, Upgrades: db.pendingUpgrades, }} - if db.walChan != nil { + if db.walChanSize >= 0 { + if db.walChan == nil { + db.initAsyncCommit() + } + // async wal writing db.walChan <- &entry } else { @@ -288,6 +268,47 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) { return hash, v, nil } +func (db *DB) initAsyncCommit() { + walChan := make(chan *walEntry, db.walChanSize) + walQuit := make(chan error) + + go func() { + defer close(walQuit) + + for entry := range walChan { + bz, err := entry.data.Marshal() + if err != nil { + walQuit <- err + return + } + if err := db.wal.Write(entry.index, bz); err != nil { + walQuit <- err + return + } + } + }() + + db.walChan = walChan + db.walQuit = walQuit +} + +// WaitAsyncCommit waits for the completion of async commit +func (db *DB) WaitAsyncCommit() error { + db.mtx.Lock() + defer db.mtx.Unlock() + + if db.walChan == nil { + return nil + } + + close(db.walChan) + err := <-db.walQuit + + db.walChan = nil + db.walQuit = nil + return err +} + func (db *DB) Copy() *DB { db.mtx.Lock() defer db.mtx.Unlock() diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index 80786a8c8f..c7ca247dfe 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -50,8 +50,8 @@ type Store struct { interBlockCache types.MultiStorePersistentCache - asyncCommit bool - zeroCopy bool + asyncCommitBuffer int + zeroCopy bool } func NewStore(dir string, logger log.Logger) *Store { @@ -92,6 +92,10 @@ func (rs *Store) Commit() types.CommitID { return rs.lastCommitInfo.CommitID() } +func (rs *Store) WaitAsyncCommit() error { + return rs.db.WaitAsyncCommit() +} + // Implements interface Committer func (rs *Store) LastCommitID() types.CommitID { return rs.lastCommitInfo.CommitID() @@ -266,11 +270,11 @@ func (rs *Store) LoadVersionAndUpgrade(version int64, upgrades *types.StoreUpgra } } db, err := memiavl.Load(rs.dir, memiavl.Options{ - CreateIfMissing: true, - InitialStores: initialStores, - TargetVersion: uint32(version), - AsyncCommit: rs.asyncCommit, - ZeroCopy: rs.zeroCopy, + CreateIfMissing: true, + InitialStores: initialStores, + TargetVersion: uint32(version), + AsyncCommitBuffer: rs.asyncCommitBuffer, + ZeroCopy: rs.zeroCopy, }) if err != nil { return errors.Wrapf(err, "fail to load memiavl at %s", rs.dir) @@ -384,8 +388,8 @@ func (rs *Store) SetIAVLDisableFastNode(disable bool) { func (rs *Store) SetLazyLoading(lazyLoading bool) { } -func (rs *Store) SetAsyncCommit(async bool) { - rs.asyncCommit = async +func (rs *Store) SetAsyncCommitBuffer(size int) { + rs.asyncCommitBuffer = size } func (rs *Store) SetZeroCopy(zeroCopy bool) { From a0986f450ba1ade53687b4edfa224edb6aa6e6bd Mon Sep 17 00:00:00 2001 From: HuangYi Date: Wed, 17 May 2023 09:16:51 +0800 Subject: [PATCH 2/3] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4060f3c49c..252e590304 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ - [#998](https://github.com/crypto-org-chain/cronos/pull/998) Bump grocksdb to v1.7.16 and rocksdb to v7.10.2 - [#1028](https://github.com/crypto-org-chain/cronos/pull/1028) Add memiavl configs into app.toml - [#1027](https://github.com/crypto-org-chain/cronos/pull/1027) Integrate local state-sync commands. +- [#1029](https://github.com/crypto-org-chain/cronos/pull/1029) Change config `async-commit` to `async-commit-buffer`, make the channel size configurable. *April 13, 2023* From d80aaba6845b26d23a3cfac62c7a85062e5e9d3c Mon Sep 17 00:00:00 2001 From: HuangYi Date: Wed, 17 May 2023 11:36:14 +0800 Subject: [PATCH 3/3] fix unit test --- memiavl/snapshot_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/memiavl/snapshot_test.go b/memiavl/snapshot_test.go index be8456e3d5..024cc9f67b 100644 --- a/memiavl/snapshot_test.go +++ b/memiavl/snapshot_test.go @@ -132,8 +132,9 @@ func TestSnapshotImportExport(t *testing.T) { func TestDBSnapshotRestore(t *testing.T) { db, err := Load(t.TempDir(), Options{ - CreateIfMissing: true, - InitialStores: []string{"test"}, + CreateIfMissing: true, + InitialStores: []string{"test"}, + AsyncCommitBuffer: -1, }) require.NoError(t, err)