From 5bb19adf72736882f6300dbce5c0b30eeb359fb7 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 16:31:56 -0800 Subject: [PATCH 1/6] Adjust performance options --- storage/badger_storage.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/badger_storage.go b/storage/badger_storage.go index 01da3b0b..2ccee235 100644 --- a/storage/badger_storage.go +++ b/storage/badger_storage.go @@ -171,7 +171,7 @@ func PerformanceBadgerOptions(dir string) badger.Options { // This option will have a significant effect the memory. If the level is kept // in-memory, read are faster but the tables will be kept in memory. By default, // this is set to false. - opts.KeepL0InMemory = false + opts.KeepL0InMemory = true // We don't compact L0 on close as this can greatly delay shutdown time. opts.CompactL0OnClose = false @@ -179,7 +179,7 @@ func PerformanceBadgerOptions(dir string) badger.Options { // LoadBloomsOnOpen=false will improve the db startup speed. This is also // a waste to enable with a limited index cache size (as many of the loaded bloom // filters will be immediately discarded from the cache). - opts.LoadBloomsOnOpen = false + opts.LoadBloomsOnOpen = true return opts } From 6b89802d831b6653376b5c9aae73d10a83426778 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 16:44:09 -0800 Subject: [PATCH 2/6] update Database interface --- storage/badger_storage.go | 75 +++++++++++++++++++++++++-------------- storage/storage.go | 5 ++- 2 files changed, 53 insertions(+), 27 deletions(-) diff --git a/storage/badger_storage.go b/storage/badger_storage.go index 2ccee235..f0710358 100644 --- a/storage/badger_storage.go +++ b/storage/badger_storage.go @@ -80,7 +80,7 @@ type BadgerStorage struct { encoder *Encoder compress bool - writer sync.Mutex + writer *utils.MutexMap // Track the closed status to ensure we exit garbage // collection when the db closes. @@ -292,7 +292,8 @@ type BadgerTransaction struct { txn *badger.Txn rwLock sync.RWMutex - holdsLock bool + holdGlobal bool + identifier string // We MUST wait to reclaim any memory until after // the transaction is committed or discarded. @@ -305,31 +306,58 @@ type BadgerTransaction struct { buffersToReclaim []*bytes.Buffer } -// NewDatabaseTransaction creates a new BadgerTransaction. -// If the transaction will not modify any values, pass -// in false for the write parameter (this allows for -// optimization within the Badger DB). -func (b *BadgerStorage) NewDatabaseTransaction( +// GTransaction creates a new exclusive write BadgerTransaction. +func (b *BadgerStorage) GTransaction( ctx context.Context, - write bool, ) DatabaseTransaction { - if write { - // To avoid database commit conflicts, - // we need to lock the writer. - // - // Because we process blocks serially, - // this doesn't lead to much lock contention. - b.writer.Lock() + b.writer.GLock() + + return &BadgerTransaction{ + db: b, + txn: b.db.NewTransaction(true), + holdGlobal: true, + buffersToReclaim: []*bytes.Buffer{}, + } +} + +// RTransaction creates a new read BadgerTransaction. +func (b *BadgerStorage) RTransaction( + ctx context.Context, +) DatabaseTransaction { + return &BadgerTransaction{ + db: b, + txn: b.db.NewTransaction(false), + buffersToReclaim: []*bytes.Buffer{}, } +} + +// WTransaction creates a new write BadgerTransaction. +func (b *BadgerStorage) WTransaction( + ctx context.Context, + identifier string, + priority bool, +) DatabaseTransaction { + b.writer.Lock(identifier, priority) return &BadgerTransaction{ db: b, - txn: b.db.NewTransaction(write), - holdsLock: write, + txn: b.db.NewTransaction(true), + identifier: identifier, buffersToReclaim: []*bytes.Buffer{}, } } +func (b *BadgerTransaction) releaseLocks() { + if b.holdGlobal { + b.holdGlobal = false + b.db.writer.GUnlock() + } + if len(b.identifier) > 0 { + b.db.writer.Unlock(b.identifier) + b.identifier = "" + } +} + // Commit attempts to commit and discard the transaction. func (b *BadgerTransaction) Commit(context.Context) error { err := b.txn.Commit() @@ -346,10 +374,7 @@ func (b *BadgerTransaction) Commit(context.Context) error { // It is possible that we may accidentally call commit twice. // In this case, we only unlock if we hold the lock to avoid a panic. - if b.holdsLock { - b.holdsLock = false - b.db.writer.Unlock() - } + b.releaseLocks() if err != nil { return fmt.Errorf("%w: %v", ErrCommitFailed, err) @@ -373,9 +398,7 @@ func (b *BadgerTransaction) Discard(context.Context) { b.buffersToReclaim = nil b.reclaimLock.Unlock() - if b.holdsLock { - b.db.writer.Unlock() - } + b.releaseLocks() } // Set changes the value of the key to the value within a transaction. @@ -548,7 +571,7 @@ func recompress( onDiskSize := float64(0) newSize := float64(0) - txn := badgerDb.NewDatabaseTransaction(ctx, false) + txn := badgerDb.RTransaction(ctx) defer txn.Discard(ctx) _, err := txn.Scan( ctx, @@ -622,7 +645,7 @@ func BadgerTrain( totalUncompressedSize := float64(0) totalDiskSize := float64(0) entriesSeen := 0 - txn := badgerDb.NewDatabaseTransaction(ctx, false) + txn := badgerDb.RTransaction(ctx) defer txn.Discard(ctx) _, err = txn.Scan( ctx, diff --git a/storage/storage.go b/storage/storage.go index 30aaffd5..ede577c6 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -21,7 +21,10 @@ import ( // Database is an interface that provides transactional // access to a KV store. type Database interface { - NewDatabaseTransaction(context.Context, bool) DatabaseTransaction + GTransaction(context.Context) DatabaseTransaction + RTransaction(context.Context) DatabaseTransaction + WTransaction(ctx context.Context, identifier string, priority bool) DatabaseTransaction + Close(context.Context) error Encoder() *Encoder } From 7e944d7f24b49b1871e551d110663b369cdee57c Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 25 Nov 2020 09:25:54 -0800 Subject: [PATCH 3/6] cleanup comments --- storage/badger_storage.go | 17 +++++++++-------- storage/storage.go | 25 ++++++++++++++++++++++--- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/storage/badger_storage.go b/storage/badger_storage.go index f0710358..c0f1f584 100644 --- a/storage/badger_storage.go +++ b/storage/badger_storage.go @@ -306,8 +306,8 @@ type BadgerTransaction struct { buffersToReclaim []*bytes.Buffer } -// GTransaction creates a new exclusive write BadgerTransaction. -func (b *BadgerStorage) GTransaction( +// Transaction creates a new exclusive write BadgerTransaction. +func (b *BadgerStorage) Transaction( ctx context.Context, ) DatabaseTransaction { b.writer.GLock() @@ -320,8 +320,8 @@ func (b *BadgerStorage) GTransaction( } } -// RTransaction creates a new read BadgerTransaction. -func (b *BadgerStorage) RTransaction( +// ReadTransaction creates a new read BadgerTransaction. +func (b *BadgerStorage) ReadTransaction( ctx context.Context, ) DatabaseTransaction { return &BadgerTransaction{ @@ -331,8 +331,9 @@ func (b *BadgerStorage) RTransaction( } } -// WTransaction creates a new write BadgerTransaction. -func (b *BadgerStorage) WTransaction( +// WriteTransaction creates a new write BadgerTransaction +// for a particular identifier. +func (b *BadgerStorage) WriteTransaction( ctx context.Context, identifier string, priority bool, @@ -571,7 +572,7 @@ func recompress( onDiskSize := float64(0) newSize := float64(0) - txn := badgerDb.RTransaction(ctx) + txn := badgerDb.ReadTransaction(ctx) defer txn.Discard(ctx) _, err := txn.Scan( ctx, @@ -645,7 +646,7 @@ func BadgerTrain( totalUncompressedSize := float64(0) totalDiskSize := float64(0) entriesSeen := 0 - txn := badgerDb.RTransaction(ctx) + txn := badgerDb.ReadTransaction(ctx) defer txn.Discard(ctx) _, err = txn.Scan( ctx, diff --git a/storage/storage.go b/storage/storage.go index ede577c6..92a9ad36 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -21,11 +21,30 @@ import ( // Database is an interface that provides transactional // access to a KV store. type Database interface { - GTransaction(context.Context) DatabaseTransaction - RTransaction(context.Context) DatabaseTransaction - WTransaction(ctx context.Context, identifier string, priority bool) DatabaseTransaction + // Transaction acquires an exclusive write lock on the database. + // This ensures all other calls to Transaction and WriteTransaction + // will block until the returned DatabaseTransaction is committed or + // discarded. This is useful for making changes across + // multiple prefixes but incurs a large performance overhead. + Transaction(context.Context) DatabaseTransaction + // ReadTransaction allows for consistent, read-only access + // to the database. This does not acquire any lock + // on the database. + ReadTransaction(context.Context) DatabaseTransaction + + // WriteTransaction acquires a granular write lock for a particular + // identifier. All subsequent calls to WriteTransaction with the same + // identifier will block until the DatabaseTransaction returned is either + // committed or discarded. + WriteTransaction(ctx context.Context, identifier string, priority bool) DatabaseTransaction + + // Close shuts down the database. Close(context.Context) error + + // Encoder returns the *Encoder used to store/read data + // in the database. This *Encoder often performs some + // form of compression on data. Encoder() *Encoder } From c439d620db3a6dd14c3544a0d1474e628b4efbdf Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 25 Nov 2020 09:30:01 -0800 Subject: [PATCH 4/6] replace interface calls --- constructor/coordinator/coordinator_test.go | 50 +++++++++--------- constructor/worker/worker_test.go | 6 +-- storage/badger_storage_test.go | 32 ++++++------ storage/balance_storage.go | 14 ++--- storage/balance_storage_test.go | 58 ++++++++++----------- storage/block_storage.go | 24 ++++----- storage/block_storage_test.go | 8 +-- storage/broadcast_storage.go | 8 +-- storage/broadcast_storage_test.go | 40 +++++++------- storage/coin_storage.go | 6 +-- storage/coin_storage_test.go | 18 +++---- storage/counter_storage.go | 4 +- storage/job_storage.go | 10 ++-- storage/job_storage_test.go | 14 ++--- storage/key_storage.go | 6 +-- 15 files changed, 149 insertions(+), 149 deletions(-) diff --git a/constructor/coordinator/coordinator_test.go b/constructor/coordinator/coordinator_test.go index 4ec64baa..f67cbef4 100644 --- a/constructor/coordinator/coordinator_test.go +++ b/constructor/coordinator/coordinator_test.go @@ -244,7 +244,7 @@ func TestProcess(t *testing.T) { // all responses from the database and "write" transactions require a // lock. While it would be possible to orchestrate these locks in this // test, it is simpler to just use a "read" transaction. - dbTxFail := db.NewDatabaseTransaction(ctx, false) + dbTxFail := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTxFail).Once() jobStorage.On("Ready", ctx, dbTxFail).Return([]*job.Job{}, nil).Once() jobStorage.On("Processing", ctx, dbTxFail, "transfer").Return([]*job.Job{}, nil).Once() @@ -261,7 +261,7 @@ func TestProcess(t *testing.T) { // Determine account must be created helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx := db.NewDatabaseTransaction(ctx, false) + dbTx := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx).Once() jobStorage.On("Ready", ctx, dbTx).Return([]*job.Job{}, nil).Once() jobStorage.On("Broadcasting", ctx, dbTx).Return([]*job.Job{}, nil).Once() @@ -292,7 +292,7 @@ func TestProcess(t *testing.T) { // Attempt to run transfer again (but determine funds are needed) helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTxFail2 := db.NewDatabaseTransaction(ctx, false) + dbTxFail2 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTxFail2).Once() jobStorage.On("Ready", ctx, dbTxFail2).Return([]*job.Job{}, nil).Once() jobStorage.On("Processing", ctx, dbTxFail2, "transfer").Return([]*job.Job{}, nil).Once() @@ -323,7 +323,7 @@ func TestProcess(t *testing.T) { // Attempt funds request helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx2 := db.NewDatabaseTransaction(ctx, false) + dbTx2 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx2).Once() jobStorage.On("Ready", ctx, dbTx2).Return([]*job.Job{}, nil).Once() jobStorage.On("Broadcasting", ctx, dbTx2).Return([]*job.Job{}, nil).Once() @@ -370,7 +370,7 @@ func TestProcess(t *testing.T) { // Load funds helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTxExtra := db.NewDatabaseTransaction(ctx, false) + dbTxExtra := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTxExtra).Once() jobStorage.On("Ready", ctx, dbTxExtra).Return([]*job.Job{&jobExtra}, nil).Once() helper.On("AllAccounts", ctx, dbTxExtra).Return([]*types.AccountIdentifier{ @@ -403,7 +403,7 @@ func TestProcess(t *testing.T) { // Attempt to transfer again helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTxFail3 := db.NewDatabaseTransaction(ctx, false) + dbTxFail3 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTxFail3).Once() jobStorage.On("Ready", ctx, dbTxFail3).Return([]*job.Job{}, nil).Once() jobStorage.On("Processing", ctx, dbTxFail3, "transfer").Return([]*job.Job{}, nil).Once() @@ -437,7 +437,7 @@ func TestProcess(t *testing.T) { // Attempt to create recipient helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx3 := db.NewDatabaseTransaction(ctx, false) + dbTx3 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx3).Once() jobStorage.On("Ready", ctx, dbTx3).Return([]*job.Job{}, nil).Once() jobStorage.On("Broadcasting", ctx, dbTx3).Return([]*job.Job{}, nil).Once() @@ -468,7 +468,7 @@ func TestProcess(t *testing.T) { // Attempt to create transfer helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx4 := db.NewDatabaseTransaction(ctx, false) + dbTx4 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx4).Once() jobStorage.On("Ready", ctx, dbTx4).Return([]*job.Job{}, nil).Once() jobStorage.On("Processing", ctx, dbTx4, "transfer").Return([]*job.Job{}, nil).Once() @@ -676,7 +676,7 @@ func TestProcess(t *testing.T) { // Wait for transfer to complete helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx5 := db.NewDatabaseTransaction(ctx, false) + dbTx5 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx5).Once() jobStorage.On("Ready", ctx, dbTx5).Return([]*job.Job{}, nil).Once() jobStorage.On("Processing", ctx, dbTx5, "transfer").Return([]*job.Job{&job4}, nil).Once() @@ -720,7 +720,7 @@ func TestProcess(t *testing.T) { } go func() { <-markConfirmed - dbTx6 := db.NewDatabaseTransaction(ctx, false) + dbTx6 := db.ReadTransaction(ctx) jobStorage.On("Get", ctx, dbTx6, "job4").Return(&job4, nil).Once() jobStorage.On( "Update", @@ -745,7 +745,7 @@ func TestProcess(t *testing.T) { }() helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx7 := db.NewDatabaseTransaction(ctx, false) + dbTx7 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx7).Once() jobStorage.On("Ready", ctx, dbTx7).Return([]*job.Job{&job4}, nil).Once() jobStorage.On( @@ -880,7 +880,7 @@ func TestProcess_Failed(t *testing.T) { // Attempt to create transfer helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx := db.NewDatabaseTransaction(ctx, false) + dbTx := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx).Once() jobStorage.On("Ready", ctx, dbTx).Return([]*job.Job{}, nil).Once() jobStorage.On("Processing", ctx, dbTx, "transfer").Return([]*job.Job{}, nil).Once() @@ -1147,7 +1147,7 @@ func TestProcess_Failed(t *testing.T) { // Wait for transfer to complete helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx2 := db.NewDatabaseTransaction(ctx, false) + dbTx2 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx2).Once() jobStorage.On("Ready", ctx, dbTx2).Return([]*job.Job{}, nil).Once() jobStorage.On("Processing", ctx, dbTx2, "transfer").Return([]*job.Job{&j}, nil).Once() @@ -1161,7 +1161,7 @@ func TestProcess_Failed(t *testing.T) { go func() { <-markConfirmed - dbTx3 := db.NewDatabaseTransaction(ctx, false) + dbTx3 := db.ReadTransaction(ctx) jobStorage.On("Get", ctx, dbTx3, jobIdentifier).Return(&j, nil).Once() jobStorage.On( "Update", @@ -1398,7 +1398,7 @@ func TestProcess_DryRun(t *testing.T) { helper.On("HeadBlockExists", ctx).Return(true).Once() // Attempt to transfer - dbTx := db.NewDatabaseTransaction(ctx, false) + dbTx := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx).Once() jobStorage.On("Ready", ctx, dbTx).Return([]*job.Job{}, nil).Once() jobStorage.On("Processing", ctx, dbTx, "transfer").Return([]*job.Job{}, nil).Once() @@ -1509,7 +1509,7 @@ func TestProcess_DryRun(t *testing.T) { // Process second scenario helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx2 := db.NewDatabaseTransaction(ctx, false) + dbTx2 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx2).Once() jobStorage.On("Ready", ctx, dbTx2).Return([]*job.Job{&j}, nil).Once() jobStorage.On("Update", ctx, dbTx2, mock.Anything).Run(func(args mock.Arguments) { @@ -1683,7 +1683,7 @@ func TestReturnFunds_NoBalance(t *testing.T) { helper.On("HeadBlockExists", ctx).Return(true).Once() // Attempt to transfer - dbTxFail := db.NewDatabaseTransaction(ctx, false) + dbTxFail := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTxFail).Once() jobStorage.On("Ready", ctx, dbTxFail).Return([]*job.Job{}, nil).Once() jobStorage.On( @@ -1748,7 +1748,7 @@ func TestReturnFunds_NoBalance(t *testing.T) { // Will exit this round because we've tried all workflows. helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx2 := db.NewDatabaseTransaction(ctx, false) + dbTx2 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx2).Once() jobStorage.On("Ready", ctx, dbTx2).Return([]*job.Job{}, nil).Once() jobStorage.On("Broadcasting", ctx, dbTx2).Return([]*job.Job{}, nil).Once() @@ -2015,7 +2015,7 @@ func TestReturnFunds(t *testing.T) { helper.On("HeadBlockExists", ctx).Return(true).Once() // Attempt to transfer - dbTxFail := db.NewDatabaseTransaction(ctx, false) + dbTxFail := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTxFail).Once() jobStorage.On("Ready", ctx, dbTxFail).Return([]*job.Job{}, nil).Once() jobStorage.On( @@ -2214,7 +2214,7 @@ func TestReturnFunds(t *testing.T) { // Wait for transfer to complete helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx := db.NewDatabaseTransaction(ctx, false) + dbTx := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx).Once() jobStorage.On("Ready", ctx, dbTx).Return([]*job.Job{}, nil).Once() jobStorage.On( @@ -2266,7 +2266,7 @@ func TestReturnFunds(t *testing.T) { } go func() { <-markConfirmed - dbTx2 := db.NewDatabaseTransaction(ctx, false) + dbTx2 := db.ReadTransaction(ctx) jobStorage.On("Get", ctx, dbTx2, jobIdentifier).Return(&j, nil).Once() jobStorage.On( "Update", @@ -2291,7 +2291,7 @@ func TestReturnFunds(t *testing.T) { // No balance remaining helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx3 := db.NewDatabaseTransaction(ctx, false) + dbTx3 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx3).Once() jobStorage.On("Ready", ctx, dbTx3).Return([]*job.Job{}, nil).Once() jobStorage.On( @@ -2329,7 +2329,7 @@ func TestReturnFunds(t *testing.T) { // Will exit this round because we've tried all workflows. helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx4 := db.NewDatabaseTransaction(ctx, false) + dbTx4 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx4).Once() jobStorage.On("Ready", ctx, dbTx4).Return([]*job.Job{}, nil).Once() jobStorage.On("Broadcasting", ctx, dbTx4).Return([]*job.Job{}, nil).Once() @@ -2444,7 +2444,7 @@ func TestNoReservedWorkflows(t *testing.T) { // all responses from the database and "write" transactions require a // lock. While it would be possible to orchestrate these locks in this // test, it is simpler to just use a "read" transaction. - dbTxFail := db.NewDatabaseTransaction(ctx, false) + dbTxFail := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTxFail).Once() jobStorage.On("Ready", ctx, dbTxFail).Return([]*job.Job{}, nil).Once() jobStorage.On("Processing", ctx, dbTxFail, "transfer").Return([]*job.Job{}, nil).Once() @@ -2458,7 +2458,7 @@ func TestNoReservedWorkflows(t *testing.T) { }() helper.On("HeadBlockExists", ctx).Return(true).Once() - dbTx2 := db.NewDatabaseTransaction(ctx, false) + dbTx2 := db.ReadTransaction(ctx) helper.On("DatabaseTransaction", ctx).Return(dbTx2).Once() jobStorage.On("Ready", ctx, dbTx2).Return([]*job.Job{}, nil).Once() jobStorage.On("Broadcasting", ctx, dbTx2).Return([]*job.Job{}, nil).Once() diff --git a/constructor/worker/worker_test.go b/constructor/worker/worker_test.go index 54a769fe..b63e0dac 100644 --- a/constructor/worker/worker_test.go +++ b/constructor/worker/worker_test.go @@ -963,7 +963,7 @@ func TestFindBalanceWorker(t *testing.T) { assert.NotNil(t, db) defer db.Close(ctx) - dbTx := db.NewDatabaseTransaction(ctx, true) + dbTx := db.Transaction(ctx) defer dbTx.Discard(ctx) worker := New(test.mockHelper) @@ -1125,7 +1125,7 @@ func TestJob_ComplicatedTransfer(t *testing.T) { assert.NotNil(t, db) defer db.Close(ctx) - dbTx := db.NewDatabaseTransaction(ctx, true) + dbTx := db.Transaction(ctx) network := &types.NetworkIdentifier{ Blockchain: "Bitcoin", @@ -1643,7 +1643,7 @@ func TestJob_Failures(t *testing.T) { assert.NotNil(t, db) defer db.Close(ctx) - dbTx := db.NewDatabaseTransaction(ctx, true) + dbTx := db.Transaction(ctx) assert.False(t, j.CheckComplete()) diff --git a/storage/badger_storage_test.go b/storage/badger_storage_test.go index 740f69de..83347949 100644 --- a/storage/badger_storage_test.go +++ b/storage/badger_storage_test.go @@ -59,7 +59,7 @@ func TestDatabase(t *testing.T) { defer database.Close(ctx) t.Run("No key exists", func(t *testing.T) { - txn := database.NewDatabaseTransaction(ctx, false) + txn := database.ReadTransaction(ctx) exists, value, err := txn.Get(ctx, []byte("hello")) assert.False(t, exists) assert.Nil(t, value) @@ -68,14 +68,14 @@ func TestDatabase(t *testing.T) { }) t.Run("Set key", func(t *testing.T) { - txn := database.NewDatabaseTransaction(ctx, true) + txn := database.Transaction(ctx) err := txn.Set(ctx, []byte("hello"), []byte("hola"), true) assert.NoError(t, err) assert.NoError(t, txn.Commit(ctx)) }) t.Run("Get key", func(t *testing.T) { - txn := database.NewDatabaseTransaction(ctx, false) + txn := database.ReadTransaction(ctx) exists, value, err := txn.Get(ctx, []byte("hello")) assert.True(t, exists) assert.Equal(t, []byte("hola"), value) @@ -85,7 +85,7 @@ func TestDatabase(t *testing.T) { t.Run("Many key set/get", func(t *testing.T) { for i := 0; i < 1000; i++ { - txn := database.NewDatabaseTransaction(ctx, true) + txn := database.Transaction(ctx) k := []byte(fmt.Sprintf("blah/%d", i)) v := []byte(fmt.Sprintf("%d", i)) err := txn.Set(ctx, k, v, true) @@ -93,7 +93,7 @@ func TestDatabase(t *testing.T) { assert.NoError(t, txn.Commit(ctx)) for j := 0; j <= i; j++ { - txn := database.NewDatabaseTransaction(ctx, false) + txn := database.ReadTransaction(ctx) jk := []byte(fmt.Sprintf("blah/%d", j)) jv := []byte(fmt.Sprintf("%d", j)) exists, value, err := txn.Get(ctx, jk) @@ -106,7 +106,7 @@ func TestDatabase(t *testing.T) { }) t.Run("Scan", func(t *testing.T) { - txn := database.NewDatabaseTransaction(ctx, true) + txn := database.Transaction(ctx) type scanItem struct { Key []byte Value []byte @@ -175,11 +175,11 @@ func TestDatabaseTransaction(t *testing.T) { defer database.Close(ctx) t.Run("Set and get within a transaction", func(t *testing.T) { - txn := database.NewDatabaseTransaction(ctx, true) + txn := database.Transaction(ctx) assert.NoError(t, txn.Set(ctx, []byte("hello"), []byte("hola"), true)) // Ensure tx does not affect db - txn2 := database.NewDatabaseTransaction(ctx, false) + txn2 := database.ReadTransaction(ctx) exists, value, err := txn2.Get(ctx, []byte("hello")) assert.False(t, exists) assert.Nil(t, value) @@ -188,7 +188,7 @@ func TestDatabaseTransaction(t *testing.T) { assert.NoError(t, txn.Commit(ctx)) - txn3 := database.NewDatabaseTransaction(ctx, false) + txn3 := database.ReadTransaction(ctx) exists, value, err = txn3.Get(ctx, []byte("hello")) assert.True(t, exists) assert.Equal(t, []byte("hola"), value) @@ -197,11 +197,11 @@ func TestDatabaseTransaction(t *testing.T) { }) t.Run("Discard transaction", func(t *testing.T) { - txn := database.NewDatabaseTransaction(ctx, true) + txn := database.Transaction(ctx) assert.NoError(t, txn.Set(ctx, []byte("hello"), []byte("world"), true)) txn.Discard(ctx) - txn2 := database.NewDatabaseTransaction(ctx, false) + txn2 := database.ReadTransaction(ctx) exists, value, err := txn2.Get(ctx, []byte("hello")) txn2.Discard(ctx) assert.True(t, exists) @@ -210,11 +210,11 @@ func TestDatabaseTransaction(t *testing.T) { }) t.Run("Delete within a transaction", func(t *testing.T) { - txn := database.NewDatabaseTransaction(ctx, true) + txn := database.Transaction(ctx) assert.NoError(t, txn.Delete(ctx, []byte("hello"))) assert.NoError(t, txn.Commit(ctx)) - txn2 := database.NewDatabaseTransaction(ctx, false) + txn2 := database.ReadTransaction(ctx) exists, value, err := txn2.Get(ctx, []byte("hello")) assert.False(t, exists) assert.Nil(t, value) @@ -239,7 +239,7 @@ func TestBadgerTrain_NoLimit(t *testing.T) { // Load storage with entries in namespace namespace := "bogus" - txn := database.NewDatabaseTransaction(ctx, true) + txn := database.Transaction(ctx) for i := 0; i < 10000; i++ { entry := &BogusEntry{ Index: i, @@ -282,7 +282,7 @@ func TestBadgerTrain_Limit(t *testing.T) { // Load storage with entries in namespace namespace := "bogus" - txn := database.NewDatabaseTransaction(ctx, true) + txn := database.Transaction(ctx) for i := 0; i < 10000; i++ { output, err := reggen.Generate(`[a-z]+`, 50) assert.NoError(t, err) @@ -334,7 +334,7 @@ func TestBadgerTrain_Limit(t *testing.T) { ) assert.NoError(t, err) - txn2 := database2.NewDatabaseTransaction(ctx, true) + txn2 := database2.Transaction(ctx) for i := 0; i < 10000; i++ { output, err := reggen.Generate(`[a-z]+`, 50) assert.NoError(t, err) diff --git a/storage/balance_storage.go b/storage/balance_storage.go index b06cff46..252891d9 100644 --- a/storage/balance_storage.go +++ b/storage/balance_storage.go @@ -271,7 +271,7 @@ func (b *BalanceStorage) Reconciled( currency *types.Currency, block *types.BlockIdentifier, ) error { - dbTx := b.db.NewDatabaseTransaction(ctx, true) + dbTx := b.db.Transaction(ctx) defer dbTx.Discard(ctx) err := b.updateAccountEntry( @@ -496,7 +496,7 @@ func (b *BalanceStorage) PruneBalances( currency *types.Currency, index int64, ) error { - dbTx := b.db.NewDatabaseTransaction(ctx, true) + dbTx := b.db.Transaction(ctx) defer dbTx.Discard(ctx) err := b.removeHistoricalBalances( @@ -636,7 +636,7 @@ func (b *BalanceStorage) GetBalance( currency *types.Currency, index int64, ) (*types.Amount, error) { - dbTx := b.db.NewDatabaseTransaction(ctx, false) + dbTx := b.db.ReadTransaction(ctx) defer dbTx.Discard(ctx) amount, err := b.GetBalanceTransactional( @@ -749,7 +749,7 @@ func (b *BalanceStorage) GetOrSetBalance( currency *types.Currency, block *types.BlockIdentifier, ) (*types.Amount, error) { - dbTx := b.db.NewDatabaseTransaction(ctx, true) + dbTx := b.db.Transaction(ctx) defer dbTx.Discard(ctx) amount, err := b.GetOrSetBalanceTransactional( @@ -832,7 +832,7 @@ func (b *BalanceStorage) BootstrapBalances( } // Update balances in database - dbTransaction := b.db.NewDatabaseTransaction(ctx, true) + dbTransaction := b.db.Transaction(ctx) defer dbTransaction.Discard(ctx) for _, balance := range balances { @@ -881,7 +881,7 @@ func (b *BalanceStorage) getAllAccountEntries( ctx context.Context, handler func(accountEntry), ) error { - txn := b.db.NewDatabaseTransaction(ctx, false) + txn := b.db.ReadTransaction(ctx) defer txn.Discard(ctx) _, err := txn.Scan( ctx, @@ -948,7 +948,7 @@ func (b *BalanceStorage) SetBalanceImported( accountBalances []*utils.AccountBalance, ) error { // Update balances in database - transaction := b.db.NewDatabaseTransaction(ctx, true) + transaction := b.db.Transaction(ctx) defer transaction.Discard(ctx) for _, accountBalance := range accountBalances { diff --git a/storage/balance_storage_test.go b/storage/balance_storage_test.go index 65c086b3..1a267438 100644 --- a/storage/balance_storage_test.go +++ b/storage/balance_storage_test.go @@ -201,7 +201,7 @@ func TestBalance(t *testing.T) { }) t.Run("Set and get genesis balance", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err := storage.SetBalance( ctx, txn, @@ -224,7 +224,7 @@ func TestBalance(t *testing.T) { }) t.Run("Set and get balance", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err := storage.SetBalance( ctx, txn, @@ -246,7 +246,7 @@ func TestBalance(t *testing.T) { t.Run("Set and get balance with storage helper", func(t *testing.T) { mockHelper.AccountBalanceAmount = "10" - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err := storage.UpdateBalance( ctx, txn, @@ -269,7 +269,7 @@ func TestBalance(t *testing.T) { }) t.Run("Set balance with nil currency", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err := storage.UpdateBalance( ctx, txn, @@ -290,7 +290,7 @@ func TestBalance(t *testing.T) { }) t.Run("Modify existing balance", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err = storage.UpdateBalance( ctx, txn, @@ -311,7 +311,7 @@ func TestBalance(t *testing.T) { }) t.Run("Discard transaction", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err := storage.UpdateBalance( ctx, txn, @@ -326,7 +326,7 @@ func TestBalance(t *testing.T) { assert.NoError(t, err) // Get balance during transaction - readTx := storage.db.NewDatabaseTransaction(ctx, false) + readTx := storage.db.ReadTransaction(ctx) defer readTx.Discard(ctx) retrievedAmount, err := storage.GetBalanceTransactional( ctx, @@ -342,7 +342,7 @@ func TestBalance(t *testing.T) { }) t.Run("Attempt modification to push balance negative on existing account", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err := storage.UpdateBalance( ctx, txn, @@ -359,7 +359,7 @@ func TestBalance(t *testing.T) { }) t.Run("Attempt modification to push balance negative on new acct", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err := storage.UpdateBalance( ctx, txn, @@ -377,7 +377,7 @@ func TestBalance(t *testing.T) { }) t.Run("sub account set and get balance", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err := storage.UpdateBalance( ctx, txn, @@ -403,7 +403,7 @@ func TestBalance(t *testing.T) { }) t.Run("sub account metadata set and get balance", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err := storage.UpdateBalance( ctx, txn, @@ -429,7 +429,7 @@ func TestBalance(t *testing.T) { }) t.Run("sub account unique metadata set and get balance", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err := storage.UpdateBalance( ctx, txn, @@ -455,7 +455,7 @@ func TestBalance(t *testing.T) { }) t.Run("balance exemption update", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err := storage.SetBalance( ctx, txn, @@ -471,7 +471,7 @@ func TestBalance(t *testing.T) { // Successful (balance > computed and negative intermediate value) mockHelper.AccountBalanceAmount = "150" - txn = storage.db.NewDatabaseTransaction(ctx, true) + txn = storage.db.Transaction(ctx) err = storage.UpdateBalance( ctx, txn, @@ -497,7 +497,7 @@ func TestBalance(t *testing.T) { // Successful (balance == computed) mockHelper.AccountBalanceAmount = "200" - txn = storage.db.NewDatabaseTransaction(ctx, true) + txn = storage.db.Transaction(ctx) err = storage.UpdateBalance( ctx, txn, @@ -523,7 +523,7 @@ func TestBalance(t *testing.T) { // Unsuccessful (balance < computed) mockHelper.AccountBalanceAmount = "10" - txn = storage.db.NewDatabaseTransaction(ctx, true) + txn = storage.db.Transaction(ctx) err = storage.UpdateBalance( ctx, txn, @@ -629,7 +629,7 @@ func TestBalance(t *testing.T) { }) t.Run("update existing balance", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) orphanValue, _ := new(big.Int).SetString(largeDeduction.Value, 10) err := storage.UpdateBalance( ctx, @@ -683,7 +683,7 @@ func TestBalance(t *testing.T) { Currency: largeDeduction.Currency, }, retrievedAmount) - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err = storage.OrphanBalance( ctx, txn, @@ -938,7 +938,7 @@ func TestBootstrapBalances(t *testing.T) { assert.NoError(t, err) // Attempt to update balance - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err = storage.UpdateBalance( ctx, txn, @@ -1105,7 +1105,7 @@ func TestBalanceReconciliation(t *testing.T) { }) t.Run("set balance", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err := storage.UpdateBalance( ctx, txn, @@ -1129,7 +1129,7 @@ func TestBalanceReconciliation(t *testing.T) { err := storage.Reconciled(ctx, account, currency, genesisBlock) assert.NoError(t, err) - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err = storage.UpdateBalance( ctx, txn, @@ -1181,7 +1181,7 @@ func TestBalanceReconciliation(t *testing.T) { }) t.Run("add unreconciled", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) err = storage.UpdateBalance( ctx, txn, @@ -1380,7 +1380,7 @@ func TestBlockSyncing(t *testing.T) { } t.Run("add genesis block", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) _, err = storage.AddingBlock(ctx, b0, dbTx) assert.NoError(t, err) assert.NoError(t, dbTx.Commit(ctx)) @@ -1394,7 +1394,7 @@ func TestBlockSyncing(t *testing.T) { }) t.Run("add block 1", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) _, err = storage.AddingBlock(ctx, b1, dbTx) assert.NoError(t, err) assert.NoError(t, dbTx.Commit(ctx)) @@ -1417,7 +1417,7 @@ func TestBlockSyncing(t *testing.T) { }) t.Run("add block 2", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) _, err = storage.AddingBlock(ctx, b2, dbTx) assert.NoError(t, err) assert.NoError(t, dbTx.Commit(ctx)) @@ -1455,7 +1455,7 @@ func TestBlockSyncing(t *testing.T) { }) t.Run("orphan block 2", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) _, err = storage.RemovingBlock(ctx, b2, dbTx) assert.NoError(t, err) assert.NoError(t, dbTx.Commit(ctx)) @@ -1487,7 +1487,7 @@ func TestBlockSyncing(t *testing.T) { }) t.Run("orphan block 1", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) _, err = storage.RemovingBlock(ctx, b1, dbTx) assert.NoError(t, err) assert.NoError(t, dbTx.Commit(ctx)) @@ -1513,7 +1513,7 @@ func TestBlockSyncing(t *testing.T) { }) t.Run("add block 1", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) _, err = storage.AddingBlock(ctx, b1, dbTx) assert.NoError(t, err) assert.NoError(t, dbTx.Commit(ctx)) @@ -1545,7 +1545,7 @@ func TestBlockSyncing(t *testing.T) { }) t.Run("add block 2a", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) _, err = storage.AddingBlock(ctx, b2a, dbTx) assert.NoError(t, err) assert.NoError(t, dbTx.Commit(ctx)) diff --git a/storage/block_storage.go b/storage/block_storage.go index 6ba80460..70f4db9e 100644 --- a/storage/block_storage.go +++ b/storage/block_storage.go @@ -166,7 +166,7 @@ func (b *BlockStorage) GetOldestBlockIndexTransactional( func (b *BlockStorage) GetOldestBlockIndex( ctx context.Context, ) (int64, error) { - dbTx := b.db.NewDatabaseTransaction(ctx, false) + dbTx := b.db.ReadTransaction(ctx) defer dbTx.Discard(ctx) return b.GetOldestBlockIndexTransactional(ctx, dbTx) @@ -183,7 +183,7 @@ func (b *BlockStorage) pruneBlock( // we don't hit the database tx size maximum. As a result, it is possible // that we prune a collection of blocks, encounter an error, and cannot // rollback the pruning operations. - dbTx := b.db.NewDatabaseTransaction(ctx, true) + dbTx := b.db.Transaction(ctx) defer dbTx.Discard(ctx) oldestIndex, err := b.GetOldestBlockIndexTransactional(ctx, dbTx) @@ -290,7 +290,7 @@ func (b *BlockStorage) Prune( func (b *BlockStorage) GetHeadBlockIdentifier( ctx context.Context, ) (*types.BlockIdentifier, error) { - transaction := b.db.NewDatabaseTransaction(ctx, false) + transaction := b.db.ReadTransaction(ctx) defer transaction.Discard(ctx) return b.GetHeadBlockIdentifierTransactional(ctx, transaction) @@ -406,7 +406,7 @@ func (b *BlockStorage) GetBlockLazy( ctx context.Context, blockIdentifier *types.PartialBlockIdentifier, ) (*types.BlockResponse, error) { - transaction := b.db.NewDatabaseTransaction(ctx, false) + transaction := b.db.ReadTransaction(ctx) defer transaction.Discard(ctx) return b.GetBlockLazyTransactional(ctx, blockIdentifier, transaction) @@ -420,7 +420,7 @@ func (b *BlockStorage) CanonicalBlock( ctx context.Context, blockIdentifier *types.BlockIdentifier, ) (bool, error) { - dbTx := b.db.NewDatabaseTransaction(ctx, false) + dbTx := b.db.ReadTransaction(ctx) defer dbTx.Discard(ctx) return b.CanonicalBlockTransactional(ctx, blockIdentifier, dbTx) @@ -507,7 +507,7 @@ func (b *BlockStorage) GetBlock( ctx context.Context, blockIdentifier *types.PartialBlockIdentifier, ) (*types.Block, error) { - transaction := b.db.NewDatabaseTransaction(ctx, false) + transaction := b.db.ReadTransaction(ctx) defer transaction.Discard(ctx) return b.GetBlockTransactional(ctx, transaction, blockIdentifier) @@ -555,7 +555,7 @@ func (b *BlockStorage) AddBlock( ctx context.Context, block *types.Block, ) error { - transaction := b.db.NewDatabaseTransaction(ctx, true) + transaction := b.db.Transaction(ctx) defer transaction.Discard(ctx) // Store all transactions in order and check for duplicates @@ -666,7 +666,7 @@ func (b *BlockStorage) RemoveBlock( ctx context.Context, blockIdentifier *types.BlockIdentifier, ) error { - transaction := b.db.NewDatabaseTransaction(ctx, true) + transaction := b.db.Transaction(ctx) defer transaction.Discard(ctx) block, err := b.GetBlockTransactional( @@ -770,7 +770,7 @@ func (b *BlockStorage) SetNewStartIndex( // Ensure we do not set a new start index less // than the oldest block. - dbTx := b.db.NewDatabaseTransaction(ctx, false) + dbTx := b.db.ReadTransaction(ctx) oldestIndex, err := b.GetOldestBlockIndexTransactional(ctx, dbTx) dbTx.Discard(ctx) if err != nil { @@ -1035,7 +1035,7 @@ func (b *BlockStorage) GetBlockTransaction( blockIdentifier *types.BlockIdentifier, transactionIdentifier *types.TransactionIdentifier, ) (*types.Transaction, error) { - transaction := b.db.NewDatabaseTransaction(ctx, false) + transaction := b.db.ReadTransaction(ctx) defer transaction.Discard(ctx) return b.findBlockTransaction(ctx, blockIdentifier, transactionIdentifier, transaction) @@ -1073,7 +1073,7 @@ func (b *BlockStorage) AtTip( ctx context.Context, tipDelay int64, ) (bool, *types.BlockIdentifier, error) { - transaction := b.db.NewDatabaseTransaction(ctx, false) + transaction := b.db.ReadTransaction(ctx) defer transaction.Discard(ctx) return b.AtTipTransactional(ctx, tipDelay, transaction) @@ -1089,7 +1089,7 @@ func (b *BlockStorage) IndexAtTip( tipDelay int64, index int64, ) (bool, error) { - transaction := b.db.NewDatabaseTransaction(ctx, false) + transaction := b.db.ReadTransaction(ctx) defer transaction.Discard(ctx) headBlockResponse, err := b.GetBlockLazyTransactional(ctx, nil, transaction) if errors.Is(err, ErrHeadBlockNotFound) { diff --git a/storage/block_storage_test.go b/storage/block_storage_test.go index cece21f5..af18d821 100644 --- a/storage/block_storage_test.go +++ b/storage/block_storage_test.go @@ -61,7 +61,7 @@ func TestHeadBlockIdentifier(t *testing.T) { }) t.Run("Set and get head block", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) assert.NoError(t, storage.StoreHeadBlockIdentifier(ctx, txn, newBlockIdentifier)) assert.NoError(t, txn.Commit(ctx)) @@ -71,7 +71,7 @@ func TestHeadBlockIdentifier(t *testing.T) { }) t.Run("Discard head block update", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) assert.NoError(t, storage.StoreHeadBlockIdentifier(ctx, txn, &types.BlockIdentifier{ Hash: "no blah", @@ -86,7 +86,7 @@ func TestHeadBlockIdentifier(t *testing.T) { }) t.Run("Multiple updates to head block", func(t *testing.T) { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) assert.NoError(t, storage.StoreHeadBlockIdentifier(ctx, txn, newBlockIdentifier2)) assert.NoError(t, txn.Commit(ctx)) @@ -288,7 +288,7 @@ func findTransactionWithDbTransaction( storage *BlockStorage, transactionIdentifier *types.TransactionIdentifier, ) (*types.BlockIdentifier, *types.Transaction, error) { - txn := storage.db.NewDatabaseTransaction(ctx, false) + txn := storage.db.ReadTransaction(ctx) defer txn.Discard(ctx) return storage.FindTransaction( diff --git a/storage/broadcast_storage.go b/storage/broadcast_storage.go index 5ec1214d..431a627d 100644 --- a/storage/broadcast_storage.go +++ b/storage/broadcast_storage.go @@ -382,7 +382,7 @@ func (b *BroadcastStorage) getAllBroadcasts( // GetAllBroadcasts returns all currently in-process broadcasts. func (b *BroadcastStorage) GetAllBroadcasts(ctx context.Context) ([]*Broadcast, error) { - dbTx := b.db.NewDatabaseTransaction(ctx, false) + dbTx := b.db.ReadTransaction(ctx) defer dbTx.Discard(ctx) return b.getAllBroadcasts(ctx, dbTx) @@ -399,7 +399,7 @@ func (b *BroadcastStorage) performBroadcast( return fmt.Errorf("%w: %v", ErrBroadcastEncodeFailed, err) } - txn := b.db.NewDatabaseTransaction(ctx, true) + txn := b.db.Transaction(ctx) defer txn.Discard(ctx) if err := txn.Set(ctx, key, bytes, true); err != nil { @@ -485,7 +485,7 @@ func (b *BroadcastStorage) BroadcastAll(ctx context.Context, onlyEligible bool) } if broadcast.Broadcasts >= b.broadcastLimit { - txn := b.db.NewDatabaseTransaction(ctx, true) + txn := b.db.Transaction(ctx) defer txn.Discard(ctx) _, key := getBroadcastKey(broadcast.TransactionIdentifier) @@ -573,7 +573,7 @@ func (b *BroadcastStorage) ClearBroadcasts(ctx context.Context) ([]*Broadcast, e return nil, fmt.Errorf("%w: %v", ErrBroadcastGetAllFailed, err) } - txn := b.db.NewDatabaseTransaction(ctx, true) + txn := b.db.Transaction(ctx) for _, broadcast := range broadcasts { _, key := getBroadcastKey(broadcast.TransactionIdentifier) if err := txn.Delete(ctx, key); err != nil { diff --git a/storage/broadcast_storage_test.go b/storage/broadcast_storage_test.go index 3064e691..2225ae64 100644 --- a/storage/broadcast_storage_test.go +++ b/storage/broadcast_storage_test.go @@ -107,7 +107,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { mockHelper.AtSyncTip = true t.Run("broadcast send 1 before block exists", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) defer dbTx.Discard(ctx) err := storage.Broadcast( @@ -150,7 +150,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { t.Run("add block 0", func(t *testing.T) { block := blocks[0] - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) commitWorker, err := storage.AddingBlock(ctx, block, txn) assert.NoError(t, err) err = txn.Commit(ctx) @@ -189,7 +189,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { t.Run("add block 1", func(t *testing.T) { block := blocks[1] - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) commitWorker, err := storage.AddingBlock(ctx, block, txn) assert.NoError(t, err) err = txn.Commit(ctx) @@ -226,7 +226,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { }) t.Run("broadcast send 2 after adding a block", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) defer dbTx.Discard(ctx) err := storage.Broadcast( @@ -282,7 +282,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { t.Run("add block 2", func(t *testing.T) { block := blocks[2] - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) commitWorker, err := storage.AddingBlock(ctx, block, txn) assert.NoError(t, err) @@ -356,7 +356,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { block := blocks[3] block.Transactions = []*types.Transaction{tx1} - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) commitWorker, err := storage.AddingBlock(ctx, block, txn) assert.NoError(t, err) err = txn.Commit(ctx) @@ -412,7 +412,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { block := blocks[4] block.Transactions = []*types.Transaction{tx2} - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) commitWorker, err := storage.AddingBlock(ctx, block, txn) assert.NoError(t, err) @@ -462,7 +462,7 @@ func TestBroadcastStorageBroadcastSuccess(t *testing.T) { t.Run("add block 5", func(t *testing.T) { block := blocks[5] - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) commitWorker, err := storage.AddingBlock(ctx, block, txn) assert.NoError(t, err) @@ -527,7 +527,7 @@ func TestBroadcastStorageBroadcastFailure(t *testing.T) { storage.Initialize(mockHelper, mockHandler) t.Run("locked addresses with no broadcasts", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, false) + dbTx := database.ReadTransaction(ctx) defer dbTx.Discard(ctx) accounts, err := storage.LockedAccounts(ctx, dbTx) @@ -549,7 +549,7 @@ func TestBroadcastStorageBroadcastFailure(t *testing.T) { send2 := opFiller("addr 2", 13) network := &types.NetworkIdentifier{Blockchain: "Bitcoin", Network: "Testnet3"} t.Run("broadcast", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) defer dbTx.Discard(ctx) err := storage.Broadcast( @@ -619,7 +619,7 @@ func TestBroadcastStorageBroadcastFailure(t *testing.T) { blocks := blockFiller(0, 10) mockHelper.AtSyncTip = true for _, block := range blocks { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) commitWorker, err := storage.AddingBlock(ctx, block, txn) assert.NoError(t, err) err = txn.Commit(ctx) @@ -630,7 +630,7 @@ func TestBroadcastStorageBroadcastFailure(t *testing.T) { assert.NoError(t, err) } - dbTx := database.NewDatabaseTransaction(ctx, false) + dbTx := database.ReadTransaction(ctx) defer dbTx.Discard(ctx) accounts, err := storage.LockedAccounts(ctx, dbTx) @@ -693,7 +693,7 @@ func TestBroadcastStorageBehindTip(t *testing.T) { send2 := opFiller("addr 2", 1) network := &types.NetworkIdentifier{Blockchain: "Bitcoin", Network: "Testnet3"} t.Run("broadcast", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) defer dbTx.Discard(ctx) err := storage.Broadcast( @@ -762,7 +762,7 @@ func TestBroadcastStorageBehindTip(t *testing.T) { t.Run("add blocks behind tip", func(t *testing.T) { for _, block := range blocks[:60] { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) commitWorker, err := storage.AddingBlock(ctx, block, txn) assert.NoError(t, err) err = txn.Commit(ctx) @@ -773,7 +773,7 @@ func TestBroadcastStorageBehindTip(t *testing.T) { assert.NoError(t, err) } - dbTx := database.NewDatabaseTransaction(ctx, false) + dbTx := database.ReadTransaction(ctx) defer dbTx.Discard(ctx) accounts, err := storage.LockedAccounts(ctx, dbTx) @@ -812,7 +812,7 @@ func TestBroadcastStorageBehindTip(t *testing.T) { mockHelper.AtSyncTip = true t.Run("add blocks close to tip", func(t *testing.T) { for _, block := range blocks[60:71] { - txn := storage.db.NewDatabaseTransaction(ctx, true) + txn := storage.db.Transaction(ctx) commitWorker, err := storage.AddingBlock(ctx, block, txn) assert.NoError(t, err) err = txn.Commit(ctx) @@ -823,7 +823,7 @@ func TestBroadcastStorageBehindTip(t *testing.T) { assert.NoError(t, err) } - dbTx := database.NewDatabaseTransaction(ctx, false) + dbTx := database.ReadTransaction(ctx) defer dbTx.Discard(ctx) accounts, err := storage.LockedAccounts(ctx, dbTx) @@ -889,7 +889,7 @@ func TestBroadcastStorageClearBroadcasts(t *testing.T) { network := &types.NetworkIdentifier{Blockchain: "Bitcoin", Network: "Testnet3"} t.Run("locked addresses with no broadcasts", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, false) + dbTx := database.ReadTransaction(ctx) defer dbTx.Discard(ctx) accounts, err := storage.LockedAccounts(ctx, dbTx) @@ -901,7 +901,7 @@ func TestBroadcastStorageClearBroadcasts(t *testing.T) { send1 := opFiller("addr 1", 11) send2 := opFiller("addr 2", 13) t.Run("broadcast", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) defer dbTx.Discard(ctx) err := storage.Broadcast( @@ -983,7 +983,7 @@ func TestBroadcastStorageClearBroadcasts(t *testing.T) { }, }, broadcasts) - dbTx := database.NewDatabaseTransaction(ctx, false) + dbTx := database.ReadTransaction(ctx) defer dbTx.Discard(ctx) accounts, err := storage.LockedAccounts(ctx, dbTx) diff --git a/storage/coin_storage.go b/storage/coin_storage.go index 72981fb8..a81acbd1 100644 --- a/storage/coin_storage.go +++ b/storage/coin_storage.go @@ -127,7 +127,7 @@ func (c *CoinStorage) AddCoins( ctx context.Context, accountCoins []*AccountCoin, ) error { - dbTransaction := c.db.NewDatabaseTransaction(ctx, true) + dbTransaction := c.db.Transaction(ctx) defer dbTransaction.Discard(ctx) for _, accountCoin := range accountCoins { @@ -419,7 +419,7 @@ func (c *CoinStorage) GetCoins( ctx context.Context, accountIdentifier *types.AccountIdentifier, ) ([]*types.Coin, *types.BlockIdentifier, error) { - dbTx := c.db.NewDatabaseTransaction(ctx, false) + dbTx := c.db.ReadTransaction(ctx) defer dbTx.Discard(ctx) return c.GetCoinsTransactional(ctx, dbTx, accountIdentifier) @@ -449,7 +449,7 @@ func (c *CoinStorage) GetCoin( ctx context.Context, coinIdentifier *types.CoinIdentifier, ) (*types.Coin, *types.AccountIdentifier, error) { - dbTx := c.db.NewDatabaseTransaction(ctx, false) + dbTx := c.db.ReadTransaction(ctx) defer dbTx.Discard(ctx) return c.GetCoinTransactional(ctx, dbTx, coinIdentifier) diff --git a/storage/coin_storage_test.go b/storage/coin_storage_test.go index 66a49470..6ab319a6 100644 --- a/storage/coin_storage_test.go +++ b/storage/coin_storage_test.go @@ -495,7 +495,7 @@ func TestCoinStorage(t *testing.T) { }) t.Run("add block", func(t *testing.T) { - tx := c.db.NewDatabaseTransaction(ctx, true) + tx := c.db.Transaction(ctx) commitFunc, err := c.AddingBlock(ctx, coinBlock, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) @@ -508,7 +508,7 @@ func TestCoinStorage(t *testing.T) { }) t.Run("add duplicate coin", func(t *testing.T) { - tx := c.db.NewDatabaseTransaction(ctx, true) + tx := c.db.Transaction(ctx) commitFunc, err := c.AddingBlock(ctx, coinBlock, tx) assert.Nil(t, commitFunc) assert.Error(t, err) @@ -521,7 +521,7 @@ func TestCoinStorage(t *testing.T) { }) t.Run("add duplicate coin in same block", func(t *testing.T) { - tx := c.db.NewDatabaseTransaction(ctx, true) + tx := c.db.Transaction(ctx) commitFunc, err := c.AddingBlock(ctx, coinBlockRepeat, tx) assert.Nil(t, commitFunc) assert.Error(t, err) @@ -534,7 +534,7 @@ func TestCoinStorage(t *testing.T) { }) t.Run("remove block", func(t *testing.T) { - tx := c.db.NewDatabaseTransaction(ctx, true) + tx := c.db.Transaction(ctx) commitFunc, err := c.RemovingBlock(ctx, coinBlock, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) @@ -552,7 +552,7 @@ func TestCoinStorage(t *testing.T) { }) t.Run("spend coin", func(t *testing.T) { - tx := c.db.NewDatabaseTransaction(ctx, true) + tx := c.db.Transaction(ctx) commitFunc, err := c.AddingBlock(ctx, coinBlock, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) @@ -563,7 +563,7 @@ func TestCoinStorage(t *testing.T) { assert.Equal(t, accountCoins, coins) assert.Equal(t, blockIdentifier, block) - tx = c.db.NewDatabaseTransaction(ctx, true) + tx = c.db.Transaction(ctx) commitFunc, err = c.AddingBlock(ctx, coinBlock2, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) @@ -581,7 +581,7 @@ func TestCoinStorage(t *testing.T) { }) t.Run("add block with multiple outputs for 1 account", func(t *testing.T) { - tx := c.db.NewDatabaseTransaction(ctx, true) + tx := c.db.Transaction(ctx) commitFunc, err := c.AddingBlock(ctx, coinBlock3, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) @@ -611,13 +611,13 @@ func TestCoinStorage(t *testing.T) { }) t.Run("remove block that creates and spends single coin", func(t *testing.T) { - tx := c.db.NewDatabaseTransaction(ctx, true) + tx := c.db.Transaction(ctx) commitFunc, err := c.RemovingBlock(ctx, coinBlock3, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) assert.NoError(t, tx.Commit(ctx)) - tx = c.db.NewDatabaseTransaction(ctx, true) + tx = c.db.Transaction(ctx) commitFunc, err = c.AddingBlock(ctx, coinBlock3, tx) assert.Nil(t, commitFunc) assert.NoError(t, err) diff --git a/storage/counter_storage.go b/storage/counter_storage.go index 0b007e98..7a765f50 100644 --- a/storage/counter_storage.go +++ b/storage/counter_storage.go @@ -144,7 +144,7 @@ func (c *CounterStorage) Update( counter string, amount *big.Int, ) (*big.Int, error) { - dbTx := c.db.NewDatabaseTransaction(ctx, true) + dbTx := c.db.Transaction(ctx) defer dbTx.Discard(ctx) newVal, err := c.UpdateTransactional(ctx, dbTx, counter, amount) @@ -161,7 +161,7 @@ func (c *CounterStorage) Update( // Get returns the current value of a counter. func (c *CounterStorage) Get(ctx context.Context, counter string) (*big.Int, error) { - transaction := c.db.NewDatabaseTransaction(ctx, false) + transaction := c.db.ReadTransaction(ctx) defer transaction.Discard(ctx) return transactionalGet(ctx, counter, transaction) diff --git a/storage/job_storage.go b/storage/job_storage.go index 68d1ea60..31fcb78d 100644 --- a/storage/job_storage.go +++ b/storage/job_storage.go @@ -125,7 +125,7 @@ func (j *JobStorage) Processing( // AllProcessing gets all processing *job.Jobs. func (j *JobStorage) AllProcessing(ctx context.Context) ([]*job.Job, error) { - dbTx := j.db.NewDatabaseTransaction(ctx, false) + dbTx := j.db.ReadTransaction(ctx) defer dbTx.Discard(ctx) return j.getAllJobs(ctx, dbTx, getJobMetadataKey(processingKey)) @@ -133,7 +133,7 @@ func (j *JobStorage) AllProcessing(ctx context.Context) ([]*job.Job, error) { // Failed returns all failed *job.Job of a certain workflow. func (j *JobStorage) Failed(ctx context.Context, workflow string) ([]*job.Job, error) { - dbTx := j.db.NewDatabaseTransaction(ctx, false) + dbTx := j.db.ReadTransaction(ctx) defer dbTx.Discard(ctx) return j.getAllJobs(ctx, dbTx, getJobMetadataKey(getJobFailedKey(workflow))) @@ -141,7 +141,7 @@ func (j *JobStorage) Failed(ctx context.Context, workflow string) ([]*job.Job, e // AllFailed returns all failed *job.Jobs. func (j *JobStorage) AllFailed(ctx context.Context) ([]*job.Job, error) { - dbTx := j.db.NewDatabaseTransaction(ctx, false) + dbTx := j.db.ReadTransaction(ctx) defer dbTx.Discard(ctx) return j.getAllJobs(ctx, dbTx, getJobMetadataKey(failedKey)) @@ -149,7 +149,7 @@ func (j *JobStorage) AllFailed(ctx context.Context) ([]*job.Job, error) { // Completed gets all successfully completed *job.Job of a certain workflow. func (j *JobStorage) Completed(ctx context.Context, workflow string) ([]*job.Job, error) { - dbTx := j.db.NewDatabaseTransaction(ctx, false) + dbTx := j.db.ReadTransaction(ctx) defer dbTx.Discard(ctx) return j.getAllJobs(ctx, dbTx, getJobMetadataKey(getJobCompletedKey(workflow))) @@ -157,7 +157,7 @@ func (j *JobStorage) Completed(ctx context.Context, workflow string) ([]*job.Job // AllCompleted gets all successfully completed *job.Jobs. func (j *JobStorage) AllCompleted(ctx context.Context) ([]*job.Job, error) { - dbTx := j.db.NewDatabaseTransaction(ctx, false) + dbTx := j.db.ReadTransaction(ctx) defer dbTx.Discard(ctx) return j.getAllJobs(ctx, dbTx, getJobMetadataKey(completedKey)) diff --git a/storage/job_storage_test.go b/storage/job_storage_test.go index d4dee3f7..bf87699d 100644 --- a/storage/job_storage_test.go +++ b/storage/job_storage_test.go @@ -38,7 +38,7 @@ func TestJobStorage(t *testing.T) { storage := NewJobStorage(database) t.Run("get non-existent job", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, false) + dbTx := database.ReadTransaction(ctx) defer dbTx.Discard(ctx) job, err := storage.Get(ctx, dbTx, "job1") @@ -71,7 +71,7 @@ func TestJobStorage(t *testing.T) { Status: job.Broadcasting, } t.Run("add job", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) defer dbTx.Discard(ctx) jobIdentifier, err := storage.Update(ctx, dbTx, newJob) @@ -118,7 +118,7 @@ func TestJobStorage(t *testing.T) { Status: job.Ready, } t.Run("add another job", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) defer dbTx.Discard(ctx) jobIdentifier, err := storage.Update(ctx, dbTx, newJob2) @@ -165,7 +165,7 @@ func TestJobStorage(t *testing.T) { Status: job.Completed, } t.Run("add another job", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) defer dbTx.Discard(ctx) jobIdentifier, err := storage.Update(ctx, dbTx, newJob3) @@ -212,7 +212,7 @@ func TestJobStorage(t *testing.T) { }) t.Run("update job 1", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) defer dbTx.Discard(ctx) newJob.Status = job.Completed @@ -262,7 +262,7 @@ func TestJobStorage(t *testing.T) { }) t.Run("fail job 2", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) defer dbTx.Discard(ctx) newJob2.Status = job.Failed @@ -320,7 +320,7 @@ func TestJobStorage(t *testing.T) { }) t.Run("attempt to update job 2", func(t *testing.T) { - dbTx := database.NewDatabaseTransaction(ctx, true) + dbTx := database.Transaction(ctx) defer dbTx.Discard(ctx) newJob2.Status = job.Completed diff --git a/storage/key_storage.go b/storage/key_storage.go index a4348a8e..f6157c8f 100644 --- a/storage/key_storage.go +++ b/storage/key_storage.go @@ -112,7 +112,7 @@ func (k *KeyStorage) Store( account *types.AccountIdentifier, keyPair *keys.KeyPair, ) error { - dbTx := k.db.NewDatabaseTransaction(ctx, true) + dbTx := k.db.Transaction(ctx) defer dbTx.Discard(ctx) if err := k.StoreTransactional(ctx, account, keyPair, dbTx); err != nil { @@ -155,7 +155,7 @@ func (k *KeyStorage) Get( ctx context.Context, account *types.AccountIdentifier, ) (*keys.KeyPair, error) { - transaction := k.db.NewDatabaseTransaction(ctx, false) + transaction := k.db.ReadTransaction(ctx) defer transaction.Discard(ctx) return k.GetTransactional(ctx, transaction, account) @@ -193,7 +193,7 @@ func (k *KeyStorage) GetAllAccountsTransactional( // GetAllAccounts returns all AccountIdentifiers in key storage. func (k *KeyStorage) GetAllAccounts(ctx context.Context) ([]*types.AccountIdentifier, error) { - dbTx := k.db.NewDatabaseTransaction(ctx, false) + dbTx := k.db.ReadTransaction(ctx) defer dbTx.Discard(ctx) return k.GetAllAccountsTransactional(ctx, dbTx) From 61557acaa678de1f06414b6a9e45f15cc2610179 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 25 Nov 2020 09:39:21 -0800 Subject: [PATCH 5/6] Initialize MutexMap --- storage/badger_storage.go | 1 + 1 file changed, 1 insertion(+) diff --git a/storage/badger_storage.go b/storage/badger_storage.go index c0f1f584..e2b7da1d 100644 --- a/storage/badger_storage.go +++ b/storage/badger_storage.go @@ -197,6 +197,7 @@ func NewBadgerStorage( closed: make(chan struct{}), pool: NewBufferPool(), compress: true, + writer: utils.NewMutexMap(utils.DefaultShards), } for _, opt := range storageOptions { opt(b) From 5002d777a55cd7e35d998046cfcfca9c887e6383 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 25 Nov 2020 09:47:57 -0800 Subject: [PATCH 6/6] Make MutexMap shards configurable --- storage/badger_storage.go | 9 +++++++-- storage/badger_storage_configuration.go | 10 ++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/storage/badger_storage.go b/storage/badger_storage.go index e2b7da1d..933b75af 100644 --- a/storage/badger_storage.go +++ b/storage/badger_storage.go @@ -80,7 +80,8 @@ type BadgerStorage struct { encoder *Encoder compress bool - writer *utils.MutexMap + writer *utils.MutexMap + writerShards int // Track the closed status to ensure we exit garbage // collection when the db closes. @@ -197,12 +198,16 @@ func NewBadgerStorage( closed: make(chan struct{}), pool: NewBufferPool(), compress: true, - writer: utils.NewMutexMap(utils.DefaultShards), + writerShards: utils.DefaultShards, } for _, opt := range storageOptions { opt(b) } + // Initialize utis.MutexMap used to track granular + // write transactions. + b.writer = utils.NewMutexMap(b.writerShards) + db, err := badger.Open(b.badgerOptions) if err != nil { return nil, fmt.Errorf("%w: %v", ErrDatabaseOpenFailed, err) diff --git a/storage/badger_storage_configuration.go b/storage/badger_storage_configuration.go index 645a7919..e7b894bd 100644 --- a/storage/badger_storage_configuration.go +++ b/storage/badger_storage_configuration.go @@ -56,3 +56,13 @@ func WithCustomSettings(settings badger.Options) BadgerOption { b.badgerOptions = settings } } + +// WithWriterShards overrides the default shards used +// in the writer utils.MutexMap. It is recommended +// to set this value to your write concurrency to prevent +// lock contention. +func WithWriterShards(shards int) BadgerOption { + return func(b *BadgerStorage) { + b.writerShards = shards + } +}