Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: async commit queue size not configurable #1029

Merged
merged 3 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
- [#998](https://github.com/crypto-org-chain/cronos/pull/998) Bump grocksdb to v1.7.16 and rocksdb to v7.10.2
- [#1028](https://github.com/crypto-org-chain/cronos/pull/1028) Add memiavl configs into app.toml
- [#1027](https://github.com/crypto-org-chain/cronos/pull/1027) Integrate local state-sync commands.
- [#1029](https://github.com/crypto-org-chain/cronos/pull/1029) Change config `async-commit` to `async-commit-buffer`, make the channel size configurable.

*April 13, 2023*

Expand Down
5 changes: 3 additions & 2 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ type MemIAVLConfig struct {
// ZeroCopy defines if the memiavl should return slices pointing to mmap-ed buffers directly (zero-copy),
// the zero-copied slices must not be retained beyond current block's execution.
ZeroCopy bool `mapstructure:"zero-copy"`
// AsyncCommit defines if the memiavl should commit asynchronously, this greatly improve block catching-up performance.
AsyncCommit bool `mapstructure:"async-commit"`
// AsyncCommitBuffer defines the size of asynchronous commit queue, this greatly improve block catching-up
// performance, -1 means synchronous commit.
AsyncCommitBuffer int `mapstructure:"async-commit-buffer"`
}

func DefaultMemIAVLConfig() MemIAVLConfig {
Expand Down
5 changes: 3 additions & 2 deletions app/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ enable = {{ .MemIAVL.Enable }}
# the zero-copied slices must not be retained beyond current block's execution.
zero-copy = {{ .MemIAVL.ZeroCopy }}

# AsyncCommit defines if the memiavl should commit asynchronously, this greatly improve block catching-up performance.
async-commit = {{ .MemIAVL.AsyncCommit }}
# AsyncCommitBuffer defines the size of asynchronous commit queue, this greatly improve block catching-up
# performance, -1 means synchronous commit.
async-commit-buffer = {{ .MemIAVL.AsyncCommitBuffer }}
`
8 changes: 4 additions & 4 deletions app/memiavl.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ import (
)

const (
FlagMemIAVL = "memiavl.enable"
FlagAsyncCommit = "memiavl.async-commit"
FlagZeroCopy = "memiavl.zero-copy"
FlagMemIAVL = "memiavl.enable"
FlagAsyncCommitBuffer = "memiavl.async-commit-buffer"
FlagZeroCopy = "memiavl.zero-copy"
)

func SetupMemIAVL(logger log.Logger, homePath string, appOpts servertypes.AppOptions, baseAppOptions []func(*baseapp.BaseApp)) []func(*baseapp.BaseApp) {
if cast.ToBool(appOpts.Get(FlagMemIAVL)) {
// cms must be overridden before the other options, because they may use the cms,
// make sure the cms aren't be overridden by the other options later on.
cms := rootmulti.NewStore(filepath.Join(homePath, "data", "memiavl.db"), logger)
cms.SetAsyncCommit(cast.ToBool(appOpts.Get(FlagAsyncCommit)))
cms.SetAsyncCommitBuffer(cast.ToInt(appOpts.Get(FlagAsyncCommitBuffer)))
cms.SetZeroCopy(cast.ToBool(appOpts.Get(FlagZeroCopy)))
baseAppOptions = append([]func(*baseapp.BaseApp){setCMS(cms)}, baseAppOptions...)
}
Expand Down
91 changes: 56 additions & 35 deletions memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ type DB struct {
pruneSnapshotLock sync.Mutex

// invariant: the LastIndex always match the current version of MultiTree
wal *wal.Log
walChan chan *walEntry
walQuit chan error
wal *wal.Log
walChanSize int
walChan chan *walEntry
walQuit chan error

// pending store upgrades, will be written into WAL in next Commit call
pendingUpgrades []*TreeNameUpgrade
Expand All @@ -63,8 +64,8 @@ type Options struct {
SnapshotKeepRecent uint32
// load the target version instead of latest version
TargetVersion uint32
// Write WAL asynchronously, it's ok in blockchain case because we can always replay the raw blocks.
AsyncCommit bool
// Buffer size for the asynchronous commit queue, -1 means synchronous commit
AsyncCommitBuffer int
// ZeroCopy if true, the get and iterator methods could return a slice pointing to mmaped blob files.
ZeroCopy bool
}
Expand Down Expand Up @@ -97,36 +98,11 @@ func Load(dir string, opts Options) (*DB, error) {
return nil, err
}

var (
walChan chan *walEntry
walQuit chan error
)
if opts.AsyncCommit {
walChan = make(chan *walEntry, 100)
walQuit = make(chan error)
go func() {
defer close(walQuit)

for entry := range walChan {
bz, err := entry.data.Marshal()
if err != nil {
walQuit <- err
return
}
if err := wal.Write(entry.index, bz); err != nil {
walQuit <- err
return
}
}
}()
}

db := &DB{
MultiTree: *mtree,
dir: dir,
wal: wal,
walChan: walChan,
walQuit: walQuit,
walChanSize: opts.AsyncCommitBuffer,
snapshotKeepRecent: opts.SnapshotKeepRecent,
}

Expand Down Expand Up @@ -179,13 +155,13 @@ func (db *DB) ApplyUpgrades(upgrades []*TreeNameUpgrade) error {
// checkAsyncTasks checks the status of background tasks non-blocking-ly and process the result
func (db *DB) checkAsyncTasks() error {
return errors.Join(
db.checkAsyncWAL(),
db.checkAsyncCommit(),
db.checkBackgroundSnapshotRewrite(),
)
}

// checkAsyncWAL check the quit signal of async wal writing
func (db *DB) checkAsyncWAL() error {
// checkAsyncCommit check the quit signal of async wal writing
func (db *DB) checkAsyncCommit() error {
select {
case err := <-db.walQuit:
// async wal writing failed, we need to abort the state machine
Expand Down Expand Up @@ -269,7 +245,11 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {
Changesets: changeSets,
Upgrades: db.pendingUpgrades,
}}
if db.walChan != nil {
if db.walChanSize >= 0 {
if db.walChan == nil {
db.initAsyncCommit()
}

// async wal writing
db.walChan <- &entry
} else {
Expand All @@ -288,6 +268,47 @@ func (db *DB) Commit(changeSets []*NamedChangeSet) ([]byte, int64, error) {
return hash, v, nil
}

func (db *DB) initAsyncCommit() {
walChan := make(chan *walEntry, db.walChanSize)
walQuit := make(chan error)

go func() {
defer close(walQuit)

for entry := range walChan {
bz, err := entry.data.Marshal()
if err != nil {
walQuit <- err
return
}
if err := db.wal.Write(entry.index, bz); err != nil {
walQuit <- err
return
}
}
}()
Comment on lines +275 to +289

Check notice

Code scanning / CodeQL

Spawning a Go routine

Spawning a Go routine may be a possible source of non-determinism

db.walChan = walChan
db.walQuit = walQuit
}

// WaitAsyncCommit waits for the completion of async commit
func (db *DB) WaitAsyncCommit() error {
db.mtx.Lock()
defer db.mtx.Unlock()

if db.walChan == nil {
return nil
}

close(db.walChan)
err := <-db.walQuit

db.walChan = nil
db.walQuit = nil
return err
}

func (db *DB) Copy() *DB {
db.mtx.Lock()
defer db.mtx.Unlock()
Expand Down
5 changes: 3 additions & 2 deletions memiavl/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ func TestSnapshotImportExport(t *testing.T) {

func TestDBSnapshotRestore(t *testing.T) {
db, err := Load(t.TempDir(), Options{
CreateIfMissing: true,
InitialStores: []string{"test"},
CreateIfMissing: true,
InitialStores: []string{"test"},
AsyncCommitBuffer: -1,
})
require.NoError(t, err)

Expand Down
22 changes: 13 additions & 9 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type Store struct {

interBlockCache types.MultiStorePersistentCache

asyncCommit bool
zeroCopy bool
asyncCommitBuffer int
zeroCopy bool
}

func NewStore(dir string, logger log.Logger) *Store {
Expand Down Expand Up @@ -92,6 +92,10 @@ func (rs *Store) Commit() types.CommitID {
return rs.lastCommitInfo.CommitID()
}

func (rs *Store) WaitAsyncCommit() error {
return rs.db.WaitAsyncCommit()
}

// Implements interface Committer
func (rs *Store) LastCommitID() types.CommitID {
return rs.lastCommitInfo.CommitID()
Expand Down Expand Up @@ -266,11 +270,11 @@ func (rs *Store) LoadVersionAndUpgrade(version int64, upgrades *types.StoreUpgra
}
}
db, err := memiavl.Load(rs.dir, memiavl.Options{
CreateIfMissing: true,
InitialStores: initialStores,
TargetVersion: uint32(version),
AsyncCommit: rs.asyncCommit,
ZeroCopy: rs.zeroCopy,
CreateIfMissing: true,
InitialStores: initialStores,
TargetVersion: uint32(version),

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
AsyncCommitBuffer: rs.asyncCommitBuffer,
ZeroCopy: rs.zeroCopy,
})
if err != nil {
return errors.Wrapf(err, "fail to load memiavl at %s", rs.dir)
Expand Down Expand Up @@ -384,8 +388,8 @@ func (rs *Store) SetIAVLDisableFastNode(disable bool) {
func (rs *Store) SetLazyLoading(lazyLoading bool) {
}

func (rs *Store) SetAsyncCommit(async bool) {
rs.asyncCommit = async
func (rs *Store) SetAsyncCommitBuffer(size int) {
rs.asyncCommitBuffer = size
}

func (rs *Store) SetZeroCopy(zeroCopy bool) {
Expand Down