diff --git a/models/contacts.go b/models/contacts.go index 7062794d9..aa5df722e 100644 --- a/models/contacts.go +++ b/models/contacts.go @@ -205,18 +205,26 @@ func ContactIDsFromReferences(ctx context.Context, tx Queryer, orgID OrgID, refs // ContactIDsFromUUIDs queries the contacts for the passed in org, returning the contact ids for the UUIDs func ContactIDsFromUUIDs(ctx context.Context, tx Queryer, orgID OrgID, uuids []flows.ContactUUID) ([]ContactID, error) { - ids := make([]ContactID, 0, len(uuids)) - rows, err := tx.QueryxContext(ctx, `SELECT id FROM contacts_contact WHERE org_id = $1 AND uuid = ANY($2) AND is_active = TRUE`, orgID, pq.Array(uuids)) + ids, err := queryContactIDs(ctx, tx, `SELECT id FROM contacts_contact WHERE org_id = $1 AND uuid = ANY($2) AND is_active = TRUE`, orgID, pq.Array(uuids)) if err != nil { return nil, errors.Wrapf(err, "error selecting contact ids by UUID") } + return ids, nil +} + +func queryContactIDs(ctx context.Context, tx Queryer, query string, args ...interface{}) ([]ContactID, error) { + ids := make([]ContactID, 0, 10) + rows, err := tx.QueryxContext(ctx, query, args...) + if err != nil { + return nil, err + } defer rows.Close() var id ContactID for rows.Next() { err := rows.Scan(&id) if err != nil { - return nil, errors.Wrapf(err, "error scanning contact id") + return nil, err } ids = append(ids, id) } @@ -568,9 +576,9 @@ WHERE ) r; ` -// ContactIDsFromURNs will fetch or create the contacts for the passed in URNs, returning a map the same length as +// GetOrCreateContactIDsFromURNs will fetch or create the contacts for the passed in URNs, returning a map the same length as // the passed in URNs with the ids of the contacts. -func ContactIDsFromURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, us []urns.URN) (map[urns.URN]ContactID, error) { +func GetOrCreateContactIDsFromURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, us []urns.URN) (map[urns.URN]ContactID, error) { // build a map of our urns to contact id urnMap := make(map[urns.URN]ContactID, len(us)) @@ -641,8 +649,8 @@ func ContactIDsFromURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, us []u } // CreateContact creates a new contact for the passed in org with the passed in URNs -func CreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID UserID, name string, language envs.Language, urnz []urns.URN) (*Contact, *flows.Contact, error) { - contactID, err := insertContactAndURNs(ctx, db, org, userID, name, language, urnz) +func CreateContact(ctx context.Context, db *sqlx.DB, oa *OrgAssets, userID UserID, name string, language envs.Language, urnz []urns.URN) (*Contact, *flows.Contact, error) { + contactID, err := tryInsertContactAndURNs(ctx, db, oa.OrgID(), userID, name, language, urnz) if err != nil { if dbutil.IsUniqueViolation(err) { return nil, nil, errors.New("URNs in use by other contacts") @@ -651,18 +659,18 @@ func CreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID User } // load a full contact so that we can calculate dynamic groups - contacts, err := LoadContacts(ctx, db, org, []ContactID{contactID}) + contacts, err := LoadContacts(ctx, db, oa, []ContactID{contactID}) if err != nil { return nil, nil, errors.Wrapf(err, "error loading new contact") } contact := contacts[0] - flowContact, err := contact.FlowContact(org) + flowContact, err := contact.FlowContact(oa) if err != nil { return nil, nil, errors.Wrapf(err, "error creating flow contact") } - err = CalculateDynamicGroups(ctx, db, org, flowContact) + err = CalculateDynamicGroups(ctx, db, oa, flowContact) if err != nil { return nil, nil, errors.Wrapf(err, "error calculating dynamic groups") } @@ -670,18 +678,31 @@ func CreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID User return contact, flowContact, nil } -// GetOrCreateContact creates a new contact for the passed in org with the passed in URNs -func GetOrCreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, urn urns.URN) (*Contact, *flows.Contact, error) { +// GetOrCreateContact fetches or creates a new contact for the passed in org with the passed in URNs +func GetOrCreateContact(ctx context.Context, db *sqlx.DB, oa *OrgAssets, urnz []urns.URN) (*Contact, *flows.Contact, error) { created := true - contactID, err := insertContactAndURNs(ctx, db, org, UserID(1), "", envs.NilLanguage, []urns.URN{urn}) + contactID, err := tryInsertContactAndURNs(ctx, db, oa.OrgID(), UserID(1), "", envs.NilLanguage, urnz) if err != nil { if dbutil.IsUniqueViolation(err) { - // if this was a duplicate URN, we should be able to fetch this contact instead - err := db.GetContext(ctx, &contactID, `SELECT contact_id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2`, org.OrgID(), urn.Identity()) + // if we blew up because URNs are already taken by other contacts, find who owns them + identities := make([]string, len(urnz)) + for i := range urnz { + identities[i] = string(urnz[i].Identity()) + } + + contactIDs, err := queryContactIDs(ctx, db, `SELECT DISTINCT contact_id FROM contacts_contacturn WHERE org_id = $1 AND identity = ANY($2) AND contact_id IS NOT NULL`, oa.OrgID(), pq.Array(identities)) if err != nil { - return nil, nil, errors.Wrapf(err, "unable to load contact") + return nil, nil, errors.Wrapf(err, "error querying contacts with URNs") } + + // if we have a single contact, we can return that + if len(contactIDs) == 1 { + contactID = contactIDs[0] + } else { + return nil, nil, errors.New("error because URNs belong to different contacts") + } + created = false } else { return nil, nil, err @@ -689,20 +710,20 @@ func GetOrCreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, urn ur } // load a full contact so that we can calculate dynamic groups - contacts, err := LoadContacts(ctx, db, org, []ContactID{contactID}) + contacts, err := LoadContacts(ctx, db, oa, []ContactID{contactID}) if err != nil { return nil, nil, errors.Wrapf(err, "error loading new contact") } contact := contacts[0] - flowContact, err := contact.FlowContact(org) + flowContact, err := contact.FlowContact(oa) if err != nil { return nil, nil, errors.Wrapf(err, "error creating flow contact") } // calculate dynamic groups if contact was created if created { - err := CalculateDynamicGroups(ctx, db, org, flowContact) + err := CalculateDynamicGroups(ctx, db, oa, flowContact) if err != nil { return nil, nil, errors.Wrapf(err, "error calculating dynamic groups") } @@ -712,29 +733,44 @@ func GetOrCreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, urn ur } // tries to create a new contact for the passed in org with the passed in URNs -func insertContactAndURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) { +func tryInsertContactAndURNs(ctx context.Context, db *sqlx.DB, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) { if userID == NilUserID { userID = UserID(1) } tx, err := db.BeginTxx(ctx, nil) if err != nil { - return NilContactID, errors.Wrapf(err, "unable to start transaction") + return NilContactID, errors.Wrapf(err, "error beginning transaction") } + contactID, err := insertContactAndURNs(ctx, tx, orgID, userID, name, language, urnz) + if err != nil { + tx.Rollback() + return NilContactID, err + } + + err = tx.Commit() + if err != nil { + tx.Rollback() + return NilContactID, errors.Wrapf(err, "error committing transaction") + } + + return contactID, nil +} + +func insertContactAndURNs(ctx context.Context, tx *sqlx.Tx, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) { // first insert our contact var contactID ContactID - err = tx.GetContext(ctx, &contactID, + err := tx.GetContext(ctx, &contactID, `INSERT INTO contacts_contact (org_id, is_active, status, uuid, name, language, created_on, modified_on, created_by_id, modified_by_id) VALUES ($1, TRUE, 'A', $2, $3, $4, $5, $5, $6, $6) RETURNING id`, - org.OrgID(), uuids.New(), null.String(name), null.String(string(language)), dates.Now(), userID, + orgID, uuids.New(), null.String(name), null.String(string(language)), dates.Now(), userID, ) if err != nil { - tx.Rollback() return NilContactID, errors.Wrapf(err, "error inserting new contact") } @@ -744,7 +780,7 @@ func insertContactAndURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, user for _, urn := range urnz { // look for a URN with this identity that already exists but doesn't have a contact so could be attached var orphanURNID URNID - err = tx.GetContext(ctx, &orphanURNID, `SELECT id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2 AND contact_id IS NULL`, org.OrgID(), urn.Identity()) + err = tx.GetContext(ctx, &orphanURNID, `SELECT id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2 AND contact_id IS NULL`, orgID, urn.Identity()) if err != nil && err != sql.ErrNoRows { return NilContactID, err } @@ -760,31 +796,22 @@ func insertContactAndURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, user (org_id, identity, path, scheme, display, auth, priority, channel_id, contact_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`, - org.OrgID(), urn.Identity(), urn.Path(), urn.Scheme(), urn.Display(), GetURNAuth(urn), topURNPriority, nil, contactID, + orgID, urn.Identity(), urn.Path(), urn.Scheme(), urn.Display(), GetURNAuth(urn), topURNPriority, nil, contactID, ) if err != nil { - tx.Rollback() return NilContactID, err } } // attach URNs if len(urnsToAttach) > 0 { - _, err := tx.ExecContext(ctx, `UPDATE contacts_contacturn SET contact_id = $3 WHERE org_id = $1 AND id = ANY($2)`, org.OrgID(), pq.Array(urnsToAttach), contactID) + _, err := tx.ExecContext(ctx, `UPDATE contacts_contacturn SET contact_id = $3 WHERE org_id = $1 AND id = ANY($2)`, orgID, pq.Array(urnsToAttach), contactID) if err != nil { - tx.Rollback() return NilContactID, errors.Wrapf(err, "error attaching existing URNs to new contact") } } - // try to commit - err = tx.Commit() - if err != nil { - tx.Rollback() - return NilContactID, err - } - return contactID, nil } @@ -1257,23 +1284,9 @@ func UpdateContactURNs(ctx context.Context, tx Queryer, org *OrgAssets, changes if len(inserts) > 0 { // find the unique ids of the contacts that may be affected by our URN inserts - rows, err := tx.QueryxContext(ctx, - `SELECT contact_id FROM contacts_contacturn WHERE identity = ANY($1) AND org_id = $2 AND contact_id IS NOT NULL`, - pq.Array(identities), org.OrgID(), - ) + orphanedIDs, err := queryContactIDs(ctx, tx, `SELECT contact_id FROM contacts_contacturn WHERE identity = ANY($1) AND org_id = $2 AND contact_id IS NOT NULL`, pq.Array(identities), org.OrgID()) if err != nil { - return errors.Wrapf(err, "error finding contacts for urns") - } - defer rows.Close() - - orphanedIDs := make([]ContactID, 0, len(inserts)) - for rows.Next() { - var contactID ContactID - err := rows.Scan(&contactID) - if err != nil { - return errors.Wrapf(err, "error reading orphaned contacts") - } - orphanedIDs = append(orphanedIDs, contactID) + return errors.Wrapf(err, "error finding contacts for URNs") } // then insert new urns, we do these one by one since we have to deal with conflicts diff --git a/models/contacts_test.go b/models/contacts_test.go index 04d2f9048..2a316f909 100644 --- a/models/contacts_test.go +++ b/models/contacts_test.go @@ -515,7 +515,7 @@ func TestContacts(t *testing.T) { assert.Equal(t, "whatsapp:250788373373?id=20121&priority=998", bob.URNs()[2].String()) } -func TestContactsFromURN(t *testing.T) { +func TestGetOrCreateContactIDsFromURNs(t *testing.T) { ctx := testsuite.CTX() db := testsuite.DB() testsuite.Reset() @@ -538,7 +538,7 @@ func TestContactsFromURN(t *testing.T) { assert.NoError(t, err) for i, tc := range tcs { - ids, err := ContactIDsFromURNs(ctx, db, org, []urns.URN{tc.URN}) + ids, err := GetOrCreateContactIDsFromURNs(ctx, db, org, []urns.URN{tc.URN}) assert.NoError(t, err, "%d: error getting contact ids", i) if len(ids) != 1 { @@ -559,20 +559,22 @@ func TestGetOrCreateContact(t *testing.T) { tcs := []struct { OrgID OrgID - URN urns.URN + URNs []urns.URN ContactID ContactID }{ - {Org1, CathyURN, CathyID}, - {Org1, urns.URN(CathyURN.String() + "?foo=bar"), CathyID}, - {Org1, urns.URN("telegram:12345678"), ContactID(maxContactID + 3)}, - {Org1, urns.URN("telegram:12345678"), ContactID(maxContactID + 3)}, + {Org1, []urns.URN{CathyURN}, CathyID}, + {Org1, []urns.URN{urns.URN(CathyURN.String() + "?foo=bar")}, CathyID}, // only URN identity is considered + {Org1, []urns.URN{urns.URN("telegram:100001")}, ContactID(maxContactID + 3)}, // creates new contact + {Org1, []urns.URN{urns.URN("telegram:100001")}, ContactID(maxContactID + 3)}, // returns the same created contact + {Org1, []urns.URN{urns.URN("telegram:100001"), urns.URN("telegram:100002")}, ContactID(maxContactID + 3)}, // same again as other URNs don't exist + {Org1, []urns.URN{urns.URN("telegram:100002"), urns.URN("telegram:100001")}, ContactID(maxContactID + 3)}, // same again as other URNs don't exist } org, err := GetOrgAssets(ctx, db, Org1) assert.NoError(t, err) for i, tc := range tcs { - contact, _, err := GetOrCreateContact(ctx, db, org, tc.URN) + contact, _, err := GetOrCreateContact(ctx, db, org, tc.URNs) assert.NoError(t, err, "%d: error creating contact", i) assert.Equal(t, tc.ContactID, contact.ID(), "%d: mismatch in contact id", i) } diff --git a/models/imports.go b/models/imports.go index 05fad17ab..0ae31a49c 100644 --- a/models/imports.go +++ b/models/imports.go @@ -160,9 +160,7 @@ func (b *ContactImportBatch) getOrCreateContacts(ctx context.Context, db *sqlx.D addModifier(modifiers.NewURNs(spec.URNs, modifiers.URNsAppend)) } else { - // TODO get or create by multiple URNs - - imp.contact, imp.flowContact, err = GetOrCreateContact(ctx, db, oa, spec.URNs[0]) + imp.contact, imp.flowContact, err = GetOrCreateContact(ctx, db, oa, spec.URNs) if err != nil { addError("Unable to get or create contact with URN '%s'", spec.URNs[0]) continue diff --git a/tasks/broadcasts/worker.go b/tasks/broadcasts/worker.go index 19fc73e30..15fa93688 100644 --- a/tasks/broadcasts/worker.go +++ b/tasks/broadcasts/worker.go @@ -62,7 +62,7 @@ func CreateBroadcastBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, bc } // get the contact ids for our URNs - urnMap, err := models.ContactIDsFromURNs(ctx, db, oa, bcast.URNs()) + urnMap, err := models.GetOrCreateContactIDsFromURNs(ctx, db, oa, bcast.URNs()) if err != nil { return errors.Wrapf(err, "error getting contact ids for urns") } diff --git a/tasks/starts/worker.go b/tasks/starts/worker.go index caab3bd62..d822e67dd 100644 --- a/tasks/starts/worker.go +++ b/tasks/starts/worker.go @@ -73,7 +73,7 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela // look up any contacts by URN if len(start.URNs()) > 0 { - urnContactIDs, err := models.ContactIDsFromURNs(ctx, db, oa, start.URNs()) + urnContactIDs, err := models.GetOrCreateContactIDsFromURNs(ctx, db, oa, start.URNs()) if err != nil { return errors.Wrapf(err, "error getting contact ids from urns") } diff --git a/web/ivr/ivr.go b/web/ivr/ivr.go index 900a248c5..b5f9d5819 100644 --- a/web/ivr/ivr.go +++ b/web/ivr/ivr.go @@ -102,7 +102,7 @@ func handleIncomingCall(ctx context.Context, s *web.Server, r *http.Request, w h } // get the contact for this URN - contact, _, err := models.GetOrCreateContact(ctx, s.DB, oa, urn) + contact, _, err := models.GetOrCreateContact(ctx, s.DB, oa, []urns.URN{urn}) if err != nil { return channel, nil, client.WriteErrorResponse(w, errors.Wrapf(err, "unable to get contact by urn")) } diff --git a/web/surveyor/surveyor.go b/web/surveyor/surveyor.go index 0ed1a323e..c9f8f3d5b 100644 --- a/web/surveyor/surveyor.go +++ b/web/surveyor/surveyor.go @@ -5,6 +5,7 @@ import ( "encoding/json" "net/http" + "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/envs" "github.com/nyaruka/goflow/flows" @@ -95,7 +96,7 @@ func handleSubmit(ctx context.Context, s *web.Server, r *http.Request) (interfac // create / fetch our contact based on the highest priority URN urn := fs.Contact().URNs()[0].URN() - _, flowContact, err = models.GetOrCreateContact(ctx, s.DB, oa, urn) + _, flowContact, err = models.GetOrCreateContact(ctx, s.DB, oa, []urns.URN{urn}) if err != nil { return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to look up contact") }