Skip to content

Commit

Permalink
Allow get or create by multiple URNs (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Sep 25, 2020
1 parent 2f1e7fe commit c159526
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 67 deletions.
117 changes: 65 additions & 52 deletions models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,18 +205,26 @@ func ContactIDsFromReferences(ctx context.Context, tx Queryer, orgID OrgID, refs

// ContactIDsFromUUIDs queries the contacts for the passed in org, returning the contact ids for the UUIDs
func ContactIDsFromUUIDs(ctx context.Context, tx Queryer, orgID OrgID, uuids []flows.ContactUUID) ([]ContactID, error) {
ids := make([]ContactID, 0, len(uuids))
rows, err := tx.QueryxContext(ctx, `SELECT id FROM contacts_contact WHERE org_id = $1 AND uuid = ANY($2) AND is_active = TRUE`, orgID, pq.Array(uuids))
ids, err := queryContactIDs(ctx, tx, `SELECT id FROM contacts_contact WHERE org_id = $1 AND uuid = ANY($2) AND is_active = TRUE`, orgID, pq.Array(uuids))
if err != nil {
return nil, errors.Wrapf(err, "error selecting contact ids by UUID")
}
return ids, nil
}

func queryContactIDs(ctx context.Context, tx Queryer, query string, args ...interface{}) ([]ContactID, error) {
ids := make([]ContactID, 0, 10)
rows, err := tx.QueryxContext(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()

var id ContactID
for rows.Next() {
err := rows.Scan(&id)
if err != nil {
return nil, errors.Wrapf(err, "error scanning contact id")
return nil, err
}
ids = append(ids, id)
}
Expand Down Expand Up @@ -568,9 +576,9 @@ WHERE
) r;
`

// ContactIDsFromURNs will fetch or create the contacts for the passed in URNs, returning a map the same length as
// GetOrCreateContactIDsFromURNs will fetch or create the contacts for the passed in URNs, returning a map the same length as
// the passed in URNs with the ids of the contacts.
func ContactIDsFromURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, us []urns.URN) (map[urns.URN]ContactID, error) {
func GetOrCreateContactIDsFromURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, us []urns.URN) (map[urns.URN]ContactID, error) {
// build a map of our urns to contact id
urnMap := make(map[urns.URN]ContactID, len(us))

Expand Down Expand Up @@ -641,8 +649,8 @@ func ContactIDsFromURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, us []u
}

// CreateContact creates a new contact for the passed in org with the passed in URNs
func CreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID UserID, name string, language envs.Language, urnz []urns.URN) (*Contact, *flows.Contact, error) {
contactID, err := insertContactAndURNs(ctx, db, org, userID, name, language, urnz)
func CreateContact(ctx context.Context, db *sqlx.DB, oa *OrgAssets, userID UserID, name string, language envs.Language, urnz []urns.URN) (*Contact, *flows.Contact, error) {
contactID, err := tryInsertContactAndURNs(ctx, db, oa.OrgID(), userID, name, language, urnz)
if err != nil {
if dbutil.IsUniqueViolation(err) {
return nil, nil, errors.New("URNs in use by other contacts")
Expand All @@ -651,58 +659,71 @@ func CreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID User
}

// load a full contact so that we can calculate dynamic groups
contacts, err := LoadContacts(ctx, db, org, []ContactID{contactID})
contacts, err := LoadContacts(ctx, db, oa, []ContactID{contactID})
if err != nil {
return nil, nil, errors.Wrapf(err, "error loading new contact")
}
contact := contacts[0]

flowContact, err := contact.FlowContact(org)
flowContact, err := contact.FlowContact(oa)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating flow contact")
}

err = CalculateDynamicGroups(ctx, db, org, flowContact)
err = CalculateDynamicGroups(ctx, db, oa, flowContact)
if err != nil {
return nil, nil, errors.Wrapf(err, "error calculating dynamic groups")
}

return contact, flowContact, nil
}

// GetOrCreateContact creates a new contact for the passed in org with the passed in URNs
func GetOrCreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, urn urns.URN) (*Contact, *flows.Contact, error) {
// GetOrCreateContact fetches or creates a new contact for the passed in org with the passed in URNs
func GetOrCreateContact(ctx context.Context, db *sqlx.DB, oa *OrgAssets, urnz []urns.URN) (*Contact, *flows.Contact, error) {
created := true

contactID, err := insertContactAndURNs(ctx, db, org, UserID(1), "", envs.NilLanguage, []urns.URN{urn})
contactID, err := tryInsertContactAndURNs(ctx, db, oa.OrgID(), UserID(1), "", envs.NilLanguage, urnz)
if err != nil {
if dbutil.IsUniqueViolation(err) {
// if this was a duplicate URN, we should be able to fetch this contact instead
err := db.GetContext(ctx, &contactID, `SELECT contact_id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2`, org.OrgID(), urn.Identity())
// if we blew up because URNs are already taken by other contacts, find who owns them
identities := make([]string, len(urnz))
for i := range urnz {
identities[i] = string(urnz[i].Identity())
}

contactIDs, err := queryContactIDs(ctx, db, `SELECT DISTINCT contact_id FROM contacts_contacturn WHERE org_id = $1 AND identity = ANY($2) AND contact_id IS NOT NULL`, oa.OrgID(), pq.Array(identities))
if err != nil {
return nil, nil, errors.Wrapf(err, "unable to load contact")
return nil, nil, errors.Wrapf(err, "error querying contacts with URNs")
}

// if we have a single contact, we can return that
if len(contactIDs) == 1 {
contactID = contactIDs[0]
} else {
return nil, nil, errors.New("error because URNs belong to different contacts")
}

created = false
} else {
return nil, nil, err
}
}

// load a full contact so that we can calculate dynamic groups
contacts, err := LoadContacts(ctx, db, org, []ContactID{contactID})
contacts, err := LoadContacts(ctx, db, oa, []ContactID{contactID})
if err != nil {
return nil, nil, errors.Wrapf(err, "error loading new contact")
}
contact := contacts[0]

flowContact, err := contact.FlowContact(org)
flowContact, err := contact.FlowContact(oa)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating flow contact")
}

// calculate dynamic groups if contact was created
if created {
err := CalculateDynamicGroups(ctx, db, org, flowContact)
err := CalculateDynamicGroups(ctx, db, oa, flowContact)
if err != nil {
return nil, nil, errors.Wrapf(err, "error calculating dynamic groups")
}
Expand All @@ -712,29 +733,44 @@ func GetOrCreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, urn ur
}

// tries to create a new contact for the passed in org with the passed in URNs
func insertContactAndURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) {
func tryInsertContactAndURNs(ctx context.Context, db *sqlx.DB, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) {
if userID == NilUserID {
userID = UserID(1)
}

tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return NilContactID, errors.Wrapf(err, "unable to start transaction")
return NilContactID, errors.Wrapf(err, "error beginning transaction")
}

contactID, err := insertContactAndURNs(ctx, tx, orgID, userID, name, language, urnz)
if err != nil {
tx.Rollback()
return NilContactID, err
}

err = tx.Commit()
if err != nil {
tx.Rollback()
return NilContactID, errors.Wrapf(err, "error committing transaction")
}

return contactID, nil
}

func insertContactAndURNs(ctx context.Context, tx *sqlx.Tx, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) {
// first insert our contact
var contactID ContactID
err = tx.GetContext(ctx, &contactID,
err := tx.GetContext(ctx, &contactID,
`INSERT INTO contacts_contact
(org_id, is_active, status, uuid, name, language, created_on, modified_on, created_by_id, modified_by_id)
VALUES
($1, TRUE, 'A', $2, $3, $4, $5, $5, $6, $6)
RETURNING id`,
org.OrgID(), uuids.New(), null.String(name), null.String(string(language)), dates.Now(), userID,
orgID, uuids.New(), null.String(name), null.String(string(language)), dates.Now(), userID,
)

if err != nil {
tx.Rollback()
return NilContactID, errors.Wrapf(err, "error inserting new contact")
}

Expand All @@ -744,7 +780,7 @@ func insertContactAndURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, user
for _, urn := range urnz {
// look for a URN with this identity that already exists but doesn't have a contact so could be attached
var orphanURNID URNID
err = tx.GetContext(ctx, &orphanURNID, `SELECT id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2 AND contact_id IS NULL`, org.OrgID(), urn.Identity())
err = tx.GetContext(ctx, &orphanURNID, `SELECT id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2 AND contact_id IS NULL`, orgID, urn.Identity())
if err != nil && err != sql.ErrNoRows {
return NilContactID, err
}
Expand All @@ -760,31 +796,22 @@ func insertContactAndURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, user
(org_id, identity, path, scheme, display, auth, priority, channel_id, contact_id)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
org.OrgID(), urn.Identity(), urn.Path(), urn.Scheme(), urn.Display(), GetURNAuth(urn), topURNPriority, nil, contactID,
orgID, urn.Identity(), urn.Path(), urn.Scheme(), urn.Display(), GetURNAuth(urn), topURNPriority, nil, contactID,
)

if err != nil {
tx.Rollback()
return NilContactID, err
}
}

// attach URNs
if len(urnsToAttach) > 0 {
_, err := tx.ExecContext(ctx, `UPDATE contacts_contacturn SET contact_id = $3 WHERE org_id = $1 AND id = ANY($2)`, org.OrgID(), pq.Array(urnsToAttach), contactID)
_, err := tx.ExecContext(ctx, `UPDATE contacts_contacturn SET contact_id = $3 WHERE org_id = $1 AND id = ANY($2)`, orgID, pq.Array(urnsToAttach), contactID)
if err != nil {
tx.Rollback()
return NilContactID, errors.Wrapf(err, "error attaching existing URNs to new contact")
}
}

// try to commit
err = tx.Commit()
if err != nil {
tx.Rollback()
return NilContactID, err
}

return contactID, nil
}

Expand Down Expand Up @@ -1257,23 +1284,9 @@ func UpdateContactURNs(ctx context.Context, tx Queryer, org *OrgAssets, changes

if len(inserts) > 0 {
// find the unique ids of the contacts that may be affected by our URN inserts
rows, err := tx.QueryxContext(ctx,
`SELECT contact_id FROM contacts_contacturn WHERE identity = ANY($1) AND org_id = $2 AND contact_id IS NOT NULL`,
pq.Array(identities), org.OrgID(),
)
orphanedIDs, err := queryContactIDs(ctx, tx, `SELECT contact_id FROM contacts_contacturn WHERE identity = ANY($1) AND org_id = $2 AND contact_id IS NOT NULL`, pq.Array(identities), org.OrgID())
if err != nil {
return errors.Wrapf(err, "error finding contacts for urns")
}
defer rows.Close()

orphanedIDs := make([]ContactID, 0, len(inserts))
for rows.Next() {
var contactID ContactID
err := rows.Scan(&contactID)
if err != nil {
return errors.Wrapf(err, "error reading orphaned contacts")
}
orphanedIDs = append(orphanedIDs, contactID)
return errors.Wrapf(err, "error finding contacts for URNs")
}

// then insert new urns, we do these one by one since we have to deal with conflicts
Expand Down
18 changes: 10 additions & 8 deletions models/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func TestContacts(t *testing.T) {
assert.Equal(t, "whatsapp:250788373373?id=20121&priority=998", bob.URNs()[2].String())
}

func TestContactsFromURN(t *testing.T) {
func TestGetOrCreateContactIDsFromURNs(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()
testsuite.Reset()
Expand All @@ -538,7 +538,7 @@ func TestContactsFromURN(t *testing.T) {
assert.NoError(t, err)

for i, tc := range tcs {
ids, err := ContactIDsFromURNs(ctx, db, org, []urns.URN{tc.URN})
ids, err := GetOrCreateContactIDsFromURNs(ctx, db, org, []urns.URN{tc.URN})
assert.NoError(t, err, "%d: error getting contact ids", i)

if len(ids) != 1 {
Expand All @@ -559,20 +559,22 @@ func TestGetOrCreateContact(t *testing.T) {

tcs := []struct {
OrgID OrgID
URN urns.URN
URNs []urns.URN
ContactID ContactID
}{
{Org1, CathyURN, CathyID},
{Org1, urns.URN(CathyURN.String() + "?foo=bar"), CathyID},
{Org1, urns.URN("telegram:12345678"), ContactID(maxContactID + 3)},
{Org1, urns.URN("telegram:12345678"), ContactID(maxContactID + 3)},
{Org1, []urns.URN{CathyURN}, CathyID},
{Org1, []urns.URN{urns.URN(CathyURN.String() + "?foo=bar")}, CathyID}, // only URN identity is considered
{Org1, []urns.URN{urns.URN("telegram:100001")}, ContactID(maxContactID + 3)}, // creates new contact
{Org1, []urns.URN{urns.URN("telegram:100001")}, ContactID(maxContactID + 3)}, // returns the same created contact
{Org1, []urns.URN{urns.URN("telegram:100001"), urns.URN("telegram:100002")}, ContactID(maxContactID + 3)}, // same again as other URNs don't exist
{Org1, []urns.URN{urns.URN("telegram:100002"), urns.URN("telegram:100001")}, ContactID(maxContactID + 3)}, // same again as other URNs don't exist
}

org, err := GetOrgAssets(ctx, db, Org1)
assert.NoError(t, err)

for i, tc := range tcs {
contact, _, err := GetOrCreateContact(ctx, db, org, tc.URN)
contact, _, err := GetOrCreateContact(ctx, db, org, tc.URNs)
assert.NoError(t, err, "%d: error creating contact", i)
assert.Equal(t, tc.ContactID, contact.ID(), "%d: mismatch in contact id", i)
}
Expand Down
4 changes: 1 addition & 3 deletions models/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,7 @@ func (b *ContactImportBatch) getOrCreateContacts(ctx context.Context, db *sqlx.D
addModifier(modifiers.NewURNs(spec.URNs, modifiers.URNsAppend))

} else {
// TODO get or create by multiple URNs

imp.contact, imp.flowContact, err = GetOrCreateContact(ctx, db, oa, spec.URNs[0])
imp.contact, imp.flowContact, err = GetOrCreateContact(ctx, db, oa, spec.URNs)
if err != nil {
addError("Unable to get or create contact with URN '%s'", spec.URNs[0])
continue
Expand Down
2 changes: 1 addition & 1 deletion tasks/broadcasts/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func CreateBroadcastBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, bc
}

// get the contact ids for our URNs
urnMap, err := models.ContactIDsFromURNs(ctx, db, oa, bcast.URNs())
urnMap, err := models.GetOrCreateContactIDsFromURNs(ctx, db, oa, bcast.URNs())
if err != nil {
return errors.Wrapf(err, "error getting contact ids for urns")
}
Expand Down
2 changes: 1 addition & 1 deletion tasks/starts/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela

// look up any contacts by URN
if len(start.URNs()) > 0 {
urnContactIDs, err := models.ContactIDsFromURNs(ctx, db, oa, start.URNs())
urnContactIDs, err := models.GetOrCreateContactIDsFromURNs(ctx, db, oa, start.URNs())
if err != nil {
return errors.Wrapf(err, "error getting contact ids from urns")
}
Expand Down
2 changes: 1 addition & 1 deletion web/ivr/ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func handleIncomingCall(ctx context.Context, s *web.Server, r *http.Request, w h
}

// get the contact for this URN
contact, _, err := models.GetOrCreateContact(ctx, s.DB, oa, urn)
contact, _, err := models.GetOrCreateContact(ctx, s.DB, oa, []urns.URN{urn})
if err != nil {
return channel, nil, client.WriteErrorResponse(w, errors.Wrapf(err, "unable to get contact by urn"))
}
Expand Down
3 changes: 2 additions & 1 deletion web/surveyor/surveyor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"net/http"

"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/envs"
"github.com/nyaruka/goflow/flows"
Expand Down Expand Up @@ -95,7 +96,7 @@ func handleSubmit(ctx context.Context, s *web.Server, r *http.Request) (interfac
// create / fetch our contact based on the highest priority URN
urn := fs.Contact().URNs()[0].URN()

_, flowContact, err = models.GetOrCreateContact(ctx, s.DB, oa, urn)
_, flowContact, err = models.GetOrCreateContact(ctx, s.DB, oa, []urns.URN{urn})
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to look up contact")
}
Expand Down

0 comments on commit c159526

Please sign in to comment.