Skip to content

Commit

Permalink
Merge pull request #251 from coinbase/patrick/storage-overhaul
Browse files Browse the repository at this point in the history
[storage] Interface Overhaul
  • Loading branch information
patrick-ogrady authored Nov 25, 2020
2 parents 7df574c + 5002d77 commit 20bc316
Show file tree
Hide file tree
Showing 18 changed files with 240 additions and 178 deletions.
50 changes: 25 additions & 25 deletions constructor/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestProcess(t *testing.T) {
// all responses from the database and "write" transactions require a
// lock. While it would be possible to orchestrate these locks in this
// test, it is simpler to just use a "read" transaction.
dbTxFail := db.NewDatabaseTransaction(ctx, false)
dbTxFail := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTxFail).Once()
jobStorage.On("Ready", ctx, dbTxFail).Return([]*job.Job{}, nil).Once()
jobStorage.On("Processing", ctx, dbTxFail, "transfer").Return([]*job.Job{}, nil).Once()
Expand All @@ -261,7 +261,7 @@ func TestProcess(t *testing.T) {
// Determine account must be created
helper.On("HeadBlockExists", ctx).Return(true).Once()

dbTx := db.NewDatabaseTransaction(ctx, false)
dbTx := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx).Once()
jobStorage.On("Ready", ctx, dbTx).Return([]*job.Job{}, nil).Once()
jobStorage.On("Broadcasting", ctx, dbTx).Return([]*job.Job{}, nil).Once()
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestProcess(t *testing.T) {

// Attempt to run transfer again (but determine funds are needed)
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTxFail2 := db.NewDatabaseTransaction(ctx, false)
dbTxFail2 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTxFail2).Once()
jobStorage.On("Ready", ctx, dbTxFail2).Return([]*job.Job{}, nil).Once()
jobStorage.On("Processing", ctx, dbTxFail2, "transfer").Return([]*job.Job{}, nil).Once()
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestProcess(t *testing.T) {
// Attempt funds request
helper.On("HeadBlockExists", ctx).Return(true).Once()

dbTx2 := db.NewDatabaseTransaction(ctx, false)
dbTx2 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx2).Once()
jobStorage.On("Ready", ctx, dbTx2).Return([]*job.Job{}, nil).Once()
jobStorage.On("Broadcasting", ctx, dbTx2).Return([]*job.Job{}, nil).Once()
Expand Down Expand Up @@ -370,7 +370,7 @@ func TestProcess(t *testing.T) {

// Load funds
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTxExtra := db.NewDatabaseTransaction(ctx, false)
dbTxExtra := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTxExtra).Once()
jobStorage.On("Ready", ctx, dbTxExtra).Return([]*job.Job{&jobExtra}, nil).Once()
helper.On("AllAccounts", ctx, dbTxExtra).Return([]*types.AccountIdentifier{
Expand Down Expand Up @@ -403,7 +403,7 @@ func TestProcess(t *testing.T) {

// Attempt to transfer again
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTxFail3 := db.NewDatabaseTransaction(ctx, false)
dbTxFail3 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTxFail3).Once()
jobStorage.On("Ready", ctx, dbTxFail3).Return([]*job.Job{}, nil).Once()
jobStorage.On("Processing", ctx, dbTxFail3, "transfer").Return([]*job.Job{}, nil).Once()
Expand Down Expand Up @@ -437,7 +437,7 @@ func TestProcess(t *testing.T) {

// Attempt to create recipient
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx3 := db.NewDatabaseTransaction(ctx, false)
dbTx3 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx3).Once()
jobStorage.On("Ready", ctx, dbTx3).Return([]*job.Job{}, nil).Once()
jobStorage.On("Broadcasting", ctx, dbTx3).Return([]*job.Job{}, nil).Once()
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestProcess(t *testing.T) {

// Attempt to create transfer
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx4 := db.NewDatabaseTransaction(ctx, false)
dbTx4 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx4).Once()
jobStorage.On("Ready", ctx, dbTx4).Return([]*job.Job{}, nil).Once()
jobStorage.On("Processing", ctx, dbTx4, "transfer").Return([]*job.Job{}, nil).Once()
Expand Down Expand Up @@ -676,7 +676,7 @@ func TestProcess(t *testing.T) {

// Wait for transfer to complete
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx5 := db.NewDatabaseTransaction(ctx, false)
dbTx5 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx5).Once()
jobStorage.On("Ready", ctx, dbTx5).Return([]*job.Job{}, nil).Once()
jobStorage.On("Processing", ctx, dbTx5, "transfer").Return([]*job.Job{&job4}, nil).Once()
Expand Down Expand Up @@ -720,7 +720,7 @@ func TestProcess(t *testing.T) {
}
go func() {
<-markConfirmed
dbTx6 := db.NewDatabaseTransaction(ctx, false)
dbTx6 := db.ReadTransaction(ctx)
jobStorage.On("Get", ctx, dbTx6, "job4").Return(&job4, nil).Once()
jobStorage.On(
"Update",
Expand All @@ -745,7 +745,7 @@ func TestProcess(t *testing.T) {
}()

helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx7 := db.NewDatabaseTransaction(ctx, false)
dbTx7 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx7).Once()
jobStorage.On("Ready", ctx, dbTx7).Return([]*job.Job{&job4}, nil).Once()
jobStorage.On(
Expand Down Expand Up @@ -880,7 +880,7 @@ func TestProcess_Failed(t *testing.T) {

// Attempt to create transfer
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx := db.NewDatabaseTransaction(ctx, false)
dbTx := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx).Once()
jobStorage.On("Ready", ctx, dbTx).Return([]*job.Job{}, nil).Once()
jobStorage.On("Processing", ctx, dbTx, "transfer").Return([]*job.Job{}, nil).Once()
Expand Down Expand Up @@ -1147,7 +1147,7 @@ func TestProcess_Failed(t *testing.T) {

// Wait for transfer to complete
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx2 := db.NewDatabaseTransaction(ctx, false)
dbTx2 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx2).Once()
jobStorage.On("Ready", ctx, dbTx2).Return([]*job.Job{}, nil).Once()
jobStorage.On("Processing", ctx, dbTx2, "transfer").Return([]*job.Job{&j}, nil).Once()
Expand All @@ -1161,7 +1161,7 @@ func TestProcess_Failed(t *testing.T) {

go func() {
<-markConfirmed
dbTx3 := db.NewDatabaseTransaction(ctx, false)
dbTx3 := db.ReadTransaction(ctx)
jobStorage.On("Get", ctx, dbTx3, jobIdentifier).Return(&j, nil).Once()
jobStorage.On(
"Update",
Expand Down Expand Up @@ -1398,7 +1398,7 @@ func TestProcess_DryRun(t *testing.T) {
helper.On("HeadBlockExists", ctx).Return(true).Once()

// Attempt to transfer
dbTx := db.NewDatabaseTransaction(ctx, false)
dbTx := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx).Once()
jobStorage.On("Ready", ctx, dbTx).Return([]*job.Job{}, nil).Once()
jobStorage.On("Processing", ctx, dbTx, "transfer").Return([]*job.Job{}, nil).Once()
Expand Down Expand Up @@ -1509,7 +1509,7 @@ func TestProcess_DryRun(t *testing.T) {

// Process second scenario
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx2 := db.NewDatabaseTransaction(ctx, false)
dbTx2 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx2).Once()
jobStorage.On("Ready", ctx, dbTx2).Return([]*job.Job{&j}, nil).Once()
jobStorage.On("Update", ctx, dbTx2, mock.Anything).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -1683,7 +1683,7 @@ func TestReturnFunds_NoBalance(t *testing.T) {
helper.On("HeadBlockExists", ctx).Return(true).Once()

// Attempt to transfer
dbTxFail := db.NewDatabaseTransaction(ctx, false)
dbTxFail := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTxFail).Once()
jobStorage.On("Ready", ctx, dbTxFail).Return([]*job.Job{}, nil).Once()
jobStorage.On(
Expand Down Expand Up @@ -1748,7 +1748,7 @@ func TestReturnFunds_NoBalance(t *testing.T) {

// Will exit this round because we've tried all workflows.
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx2 := db.NewDatabaseTransaction(ctx, false)
dbTx2 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx2).Once()
jobStorage.On("Ready", ctx, dbTx2).Return([]*job.Job{}, nil).Once()
jobStorage.On("Broadcasting", ctx, dbTx2).Return([]*job.Job{}, nil).Once()
Expand Down Expand Up @@ -2015,7 +2015,7 @@ func TestReturnFunds(t *testing.T) {
helper.On("HeadBlockExists", ctx).Return(true).Once()

// Attempt to transfer
dbTxFail := db.NewDatabaseTransaction(ctx, false)
dbTxFail := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTxFail).Once()
jobStorage.On("Ready", ctx, dbTxFail).Return([]*job.Job{}, nil).Once()
jobStorage.On(
Expand Down Expand Up @@ -2214,7 +2214,7 @@ func TestReturnFunds(t *testing.T) {

// Wait for transfer to complete
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx := db.NewDatabaseTransaction(ctx, false)
dbTx := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx).Once()
jobStorage.On("Ready", ctx, dbTx).Return([]*job.Job{}, nil).Once()
jobStorage.On(
Expand Down Expand Up @@ -2266,7 +2266,7 @@ func TestReturnFunds(t *testing.T) {
}
go func() {
<-markConfirmed
dbTx2 := db.NewDatabaseTransaction(ctx, false)
dbTx2 := db.ReadTransaction(ctx)
jobStorage.On("Get", ctx, dbTx2, jobIdentifier).Return(&j, nil).Once()
jobStorage.On(
"Update",
Expand All @@ -2291,7 +2291,7 @@ func TestReturnFunds(t *testing.T) {

// No balance remaining
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx3 := db.NewDatabaseTransaction(ctx, false)
dbTx3 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx3).Once()
jobStorage.On("Ready", ctx, dbTx3).Return([]*job.Job{}, nil).Once()
jobStorage.On(
Expand Down Expand Up @@ -2329,7 +2329,7 @@ func TestReturnFunds(t *testing.T) {

// Will exit this round because we've tried all workflows.
helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx4 := db.NewDatabaseTransaction(ctx, false)
dbTx4 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx4).Once()
jobStorage.On("Ready", ctx, dbTx4).Return([]*job.Job{}, nil).Once()
jobStorage.On("Broadcasting", ctx, dbTx4).Return([]*job.Job{}, nil).Once()
Expand Down Expand Up @@ -2444,7 +2444,7 @@ func TestNoReservedWorkflows(t *testing.T) {
// all responses from the database and "write" transactions require a
// lock. While it would be possible to orchestrate these locks in this
// test, it is simpler to just use a "read" transaction.
dbTxFail := db.NewDatabaseTransaction(ctx, false)
dbTxFail := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTxFail).Once()
jobStorage.On("Ready", ctx, dbTxFail).Return([]*job.Job{}, nil).Once()
jobStorage.On("Processing", ctx, dbTxFail, "transfer").Return([]*job.Job{}, nil).Once()
Expand All @@ -2458,7 +2458,7 @@ func TestNoReservedWorkflows(t *testing.T) {
}()

helper.On("HeadBlockExists", ctx).Return(true).Once()
dbTx2 := db.NewDatabaseTransaction(ctx, false)
dbTx2 := db.ReadTransaction(ctx)
helper.On("DatabaseTransaction", ctx).Return(dbTx2).Once()
jobStorage.On("Ready", ctx, dbTx2).Return([]*job.Job{}, nil).Once()
jobStorage.On("Broadcasting", ctx, dbTx2).Return([]*job.Job{}, nil).Once()
Expand Down
6 changes: 3 additions & 3 deletions constructor/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ func TestFindBalanceWorker(t *testing.T) {
assert.NotNil(t, db)
defer db.Close(ctx)

dbTx := db.NewDatabaseTransaction(ctx, true)
dbTx := db.Transaction(ctx)
defer dbTx.Discard(ctx)

worker := New(test.mockHelper)
Expand Down Expand Up @@ -1125,7 +1125,7 @@ func TestJob_ComplicatedTransfer(t *testing.T) {
assert.NotNil(t, db)
defer db.Close(ctx)

dbTx := db.NewDatabaseTransaction(ctx, true)
dbTx := db.Transaction(ctx)

network := &types.NetworkIdentifier{
Blockchain: "Bitcoin",
Expand Down Expand Up @@ -1643,7 +1643,7 @@ func TestJob_Failures(t *testing.T) {
assert.NotNil(t, db)
defer db.Close(ctx)

dbTx := db.NewDatabaseTransaction(ctx, true)
dbTx := db.Transaction(ctx)

assert.False(t, j.CheckComplete())

Expand Down
Loading

0 comments on commit 20bc316

Please sign in to comment.