Skip to content

Commit

Permalink
Updates from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Sep 30, 2020
1 parent d439565 commit 81e279b
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 17 deletions.
24 changes: 12 additions & 12 deletions models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,16 +222,16 @@ func getContactIDsFromUUIDs(ctx context.Context, tx Queryer, orgID OrgID, uuids
func queryContactIDs(ctx context.Context, tx Queryer, query string, args ...interface{}) ([]ContactID, error) {
ids := make([]ContactID, 0, 10)
rows, err := tx.QueryxContext(ctx, query, args...)
if err != nil {
return nil, err
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrapf(err, "error querying contact ids")
}
defer rows.Close()

var id ContactID
for rows.Next() {
err := rows.Scan(&id)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "error scanning contact id")
}
ids = append(ids, id)
}
Expand Down Expand Up @@ -611,7 +611,7 @@ func GetOrCreateContactIDsFromURNs(ctx context.Context, db *sqlx.DB, oa *OrgAsse
}

// looks up the contacts who own the given urns (which should be normalized by the caller) and returns that information as a map
func contactIDsFromURNs(ctx context.Context, db *sqlx.DB, orgID OrgID, urnz []urns.URN) (map[urns.URN]ContactID, error) {
func contactIDsFromURNs(ctx context.Context, db Queryer, orgID OrgID, urnz []urns.URN) (map[urns.URN]ContactID, error) {
identityToOriginal := make(map[urns.URN]urns.URN, len(urnz))
identities := make([]urns.URN, len(urnz))
owners := make(map[urns.URN]ContactID, len(urnz))
Expand Down Expand Up @@ -694,7 +694,7 @@ func CreateContact(ctx context.Context, db *sqlx.DB, oa *OrgAssets, userID UserI
// * If URNs exists and belongs to a single contact it returns that contact (other URNs are not assigned to the contact).
// * If URNs exists and belongs to multiple contacts it will return an error.
//
func GetOrCreateContact(ctx context.Context, db *sqlx.DB, oa *OrgAssets, urnz []urns.URN) (*Contact, *flows.Contact, error) {
func GetOrCreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, urnz []urns.URN) (*Contact, *flows.Contact, error) {
// ensure all URNs are normalized
for i, urn := range urnz {
urnz[i] = urn.Normalize(string(oa.Env().DefaultCountry()))
Expand Down Expand Up @@ -728,7 +728,7 @@ func GetOrCreateContact(ctx context.Context, db *sqlx.DB, oa *OrgAssets, urnz []
return contact, flowContact, nil
}

func getOrCreateContact(ctx context.Context, db *sqlx.DB, orgID OrgID, urnz []urns.URN) (ContactID, bool, error) {
func getOrCreateContact(ctx context.Context, db QueryerWithTx, orgID OrgID, urnz []urns.URN) (ContactID, bool, error) {
// find current owners of these URNs
owners, err := contactIDsFromURNs(ctx, db, orgID, urnz)
if err != nil {
Expand Down Expand Up @@ -783,7 +783,7 @@ func uniqueContactIDs(urnMap map[urns.URN]ContactID) []ContactID {

// Tries to create a new contact for the passed in org with the passed in URNs. Returned error can be tested with `dbutil.IsUniqueViolation` to
// determine if problem was one or more of the URNs already exist and are assigned to other contacts.
func tryInsertContactAndURNs(ctx context.Context, db *sqlx.DB, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) {
func tryInsertContactAndURNs(ctx context.Context, db QueryerWithTx, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) {
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return NilContactID, errors.Wrapf(err, "error beginning transaction")
Expand All @@ -804,14 +804,14 @@ func tryInsertContactAndURNs(ctx context.Context, db *sqlx.DB, orgID OrgID, user
return contactID, nil
}

func insertContactAndURNs(ctx context.Context, tx *sqlx.Tx, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) {
func insertContactAndURNs(ctx context.Context, db Queryer, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) {
if userID == NilUserID {
userID = UserID(1)
}

// first insert our contact
var contactID ContactID
err := tx.GetContext(ctx, &contactID,
err := db.GetContext(ctx, &contactID,
`INSERT INTO contacts_contact (org_id, is_active, status, uuid, name, language, created_on, modified_on, created_by_id, modified_by_id)
VALUES($1, TRUE, 'A', $2, $3, $4, $5, $5, $6, $6)
RETURNING id`,
Expand All @@ -826,17 +826,17 @@ func insertContactAndURNs(ctx context.Context, tx *sqlx.Tx, orgID OrgID, userID
for _, urn := range urnz {
// look for a URN with this identity that already exists but doesn't have a contact so could be attached
var orphanURNID URNID
err = tx.GetContext(ctx, &orphanURNID, `SELECT id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2 AND contact_id IS NULL`, orgID, urn.Identity())
err = db.GetContext(ctx, &orphanURNID, `SELECT id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2 AND contact_id IS NULL`, orgID, urn.Identity())
if err != nil && err != sql.ErrNoRows {
return NilContactID, err
}
if orphanURNID != NilURNID {
_, err := tx.ExecContext(ctx, `UPDATE contacts_contacturn SET contact_id = $2, priority = $3 WHERE id = $1`, orphanURNID, contactID, priority)
_, err := db.ExecContext(ctx, `UPDATE contacts_contacturn SET contact_id = $2, priority = $3 WHERE id = $1`, orphanURNID, contactID, priority)
if err != nil {
return NilContactID, errors.Wrapf(err, "error attaching existing URN to new contact")
}
} else {
_, err := tx.ExecContext(
_, err := db.ExecContext(
ctx,
`INSERT INTO contacts_contacturn(org_id, identity, path, scheme, display, auth, priority, channel_id, contact_id)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
Expand Down
42 changes: 40 additions & 2 deletions models/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package models_test

import (
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -585,7 +586,7 @@ func TestGetOrCreateContact(t *testing.T) {
newContact := func() models.ContactID { maxContactID++; return maxContactID }
prevContact := func() models.ContactID { return maxContactID }

org, err := models.GetOrgAssets(ctx, db, models.Org1)
oa, err := models.GetOrgAssets(ctx, db, models.Org1)
assert.NoError(t, err)

tcs := []struct {
Expand Down Expand Up @@ -651,14 +652,51 @@ func TestGetOrCreateContact(t *testing.T) {
}

for i, tc := range tcs {
contact, flowContact, err := models.GetOrCreateContact(ctx, db, org, tc.URNs)
contact, flowContact, err := models.GetOrCreateContact(ctx, db, oa, tc.URNs)
assert.NoError(t, err, "%d: error creating contact", i)

assert.Equal(t, tc.ContactID, contact.ID(), "%d: contact id mismatch", i)
assert.Equal(t, tc.ContactURNs, flowContact.URNs().RawURNs(), "%d: URNs mismatch", i)
}
}

func TestGetOrCreateContactRace(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()

oa, err := models.GetOrgAssets(ctx, db, models.Org1)
assert.NoError(t, err)

mdb := testsuite.NewMockDB(db, func(funcName string, call int) error {
// Make beginning a transaction take a while to create race condition. All threads will fetch
// URN owners and decide nobody owns the URN, so all threads will try to create a new contact.
if funcName == "BeginTxx" {
time.Sleep(100 * time.Millisecond)
}
return nil
})

var contacts [2]*models.Contact
var errs [2]error
wg := &sync.WaitGroup{}

getOrCreate := func(i int) {
defer wg.Done()
contacts[i], _, errs[i] = models.GetOrCreateContact(ctx, mdb, oa, []urns.URN{urns.URN("telegram:100007")})
}

wg.Add(2)
go getOrCreate(0)
go getOrCreate(1)

// let both finish
wg.Wait()

require.NoError(t, errs[0])
require.NoError(t, errs[1])
assert.Equal(t, contacts[0].ID(), contacts[1].ID())
}

func TestGetContactIDsFromReferences(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()
Expand Down
16 changes: 13 additions & 3 deletions models/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,19 @@ func (b *ContactImportBatch) markComplete(ctx context.Context, db *sqlx.DB, impo
b.NumErrored = numErrored
b.Errors = errorsJSON
b.FinishedOn = &now
_, err = db.ExecContext(ctx,
`UPDATE contacts_contactimportbatch SET status = $2, num_created = $3, num_updated = $4, num_errored = $5, errors = $6, finished_on = $7 WHERE id = $1`,
b.ID, b.Status, b.NumCreated, b.NumUpdated, b.NumErrored, b.Errors, b.FinishedOn,
_, err = db.NamedExecContext(ctx,
`UPDATE
contacts_contactimportbatch
SET
status = :status,
num_created = :num_created,
num_updated = :num_updated,
num_errored = :num_errored,
errors = :errors,
finished_on = :finished_on
WHERE
id = :id`,
b,
)
return err
}
Expand Down
7 changes: 7 additions & 0 deletions models/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/jmoiron/sqlx"
"github.com/nyaruka/mailroom/utils/dbutil"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand All @@ -19,6 +20,12 @@ type Queryer interface {
GetContext(ctx context.Context, value interface{}, query string, args ...interface{}) error
}

type QueryerWithTx interface {
Queryer

BeginTxx(ctx context.Context, opts *sql.TxOptions) (*sqlx.Tx, error)
}

// Exec calls ExecContext on the passed in Queryer, logging time taken if any rows were affected
func Exec(ctx context.Context, label string, tx Queryer, sql string, args ...interface{}) error {
start := time.Now()
Expand Down
7 changes: 7 additions & 0 deletions testsuite/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,10 @@ func (d *MockDB) GetContext(ctx context.Context, value interface{}, query string
}
return d.real.GetContext(ctx, value, query, args...)
}

func (d *MockDB) BeginTxx(ctx context.Context, opts *sql.TxOptions) (*sqlx.Tx, error) {
if err := d.check("BeginTxx"); err != nil {
return nil, err
}
return d.real.BeginTxx(ctx, opts)
}

0 comments on commit 81e279b

Please sign in to comment.