diff --git a/cmd/mailroom/main.go b/cmd/mailroom/main.go index 28f84ef8b..8b9ecd5e9 100644 --- a/cmd/mailroom/main.go +++ b/cmd/mailroom/main.go @@ -19,6 +19,7 @@ import ( _ "github.com/nyaruka/mailroom/services/tickets/zendesk" _ "github.com/nyaruka/mailroom/tasks/broadcasts" _ "github.com/nyaruka/mailroom/tasks/campaigns" + _ "github.com/nyaruka/mailroom/tasks/contacts" _ "github.com/nyaruka/mailroom/tasks/expirations" _ "github.com/nyaruka/mailroom/tasks/groups" _ "github.com/nyaruka/mailroom/tasks/interrupts" diff --git a/hooks/session_triggered.go b/hooks/session_triggered.go index 1a816f0a3..d25064428 100644 --- a/hooks/session_triggered.go +++ b/hooks/session_triggered.go @@ -86,7 +86,7 @@ func (h *InsertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool } // load our contacts by uuid - contactIDs, err := models.ContactIDsFromReferences(ctx, tx, oa, event.Contacts) + contactIDs, err := models.GetContactIDsFromReferences(ctx, tx, oa.OrgID(), event.Contacts) if err != nil { return errors.Wrapf(err, "error loading contacts by reference") } diff --git a/mailroom_test.dump b/mailroom_test.dump index 6ba4a08e9..fdad8714f 100644 Binary files a/mailroom_test.dump and b/mailroom_test.dump differ diff --git a/models/contacts.go b/models/contacts.go index fcb3151f1..b6f6237d9 100644 --- a/models/contacts.go +++ b/models/contacts.go @@ -35,10 +35,14 @@ type URNID null.Int // ContactID is our type for contact ids, which can be null type ContactID null.Int +// URN priority constants const ( topURNPriority = 1000 defaultURNPriority = 0 +) +// nil versions of ID types +const ( NilURNID = URNID(0) NilContactID = ContactID(0) ) @@ -83,7 +87,8 @@ func LoadContact(ctx context.Context, db Queryer, org *OrgAssets, id ContactID) return contacts[0], nil } -// LoadContacts loads a set of contacts for the passed in ids +// LoadContacts loads a set of contacts for the passed in ids. Note that the order of the returned contacts +// won't necessarily match the order of the ids. func LoadContacts(ctx context.Context, db Queryer, org *OrgAssets, ids []ContactID) ([]*Contact, error) { start := time.Now() @@ -162,6 +167,15 @@ func LoadContacts(ctx context.Context, db Queryer, org *OrgAssets, ids []Contact return contacts, nil } +// LoadContactsByUUID loads a set of contacts for the passed in UUIDs +func LoadContactsByUUID(ctx context.Context, db Queryer, oa *OrgAssets, uuids []flows.ContactUUID) ([]*Contact, error) { + ids, err := getContactIDsFromUUIDs(ctx, db, oa.OrgID(), uuids) + if err != nil { + return nil, err + } + return LoadContacts(ctx, db, oa, ids) +} + // GetNewestContactModifiedOn returns the newest modified_on for a contact in the passed in org func GetNewestContactModifiedOn(ctx context.Context, db *sqlx.DB, org *OrgAssets) (*time.Time, error) { rows, err := db.QueryxContext(ctx, "SELECT modified_on FROM contacts_contact WHERE org_id = $1 ORDER BY modified_on DESC LIMIT 1", org.OrgID()) @@ -183,21 +197,33 @@ func GetNewestContactModifiedOn(ctx context.Context, db *sqlx.DB, org *OrgAssets return nil, nil } -// ContactIDsFromReferences queries the contacts for the passed in org, returning the contact ids for the references -func ContactIDsFromReferences(ctx context.Context, tx Queryer, org *OrgAssets, refs []*flows.ContactReference) ([]ContactID, error) { +// GetContactIDsFromReferences gets the contact ids for the given org and set of references. Note that the order of the returned contacts +// won't necessarily match the order of the references. +func GetContactIDsFromReferences(ctx context.Context, tx Queryer, orgID OrgID, refs []*flows.ContactReference) ([]ContactID, error) { // build our list of UUIDs - uuids := make([]interface{}, len(refs)) + uuids := make([]flows.ContactUUID, len(refs)) for i := range refs { uuids[i] = refs[i].UUID } - ids := make([]ContactID, 0, len(refs)) - rows, err := tx.QueryxContext(ctx, - `SELECT id FROM contacts_contact WHERE org_id = $1 AND uuid = ANY($2) AND is_active = TRUE`, - org.OrgID(), pq.Array(uuids), - ) + return getContactIDsFromUUIDs(ctx, tx, orgID, uuids) +} + +// gets the contact IDs for the passed in org and set of UUIDs +func getContactIDsFromUUIDs(ctx context.Context, tx Queryer, orgID OrgID, uuids []flows.ContactUUID) ([]ContactID, error) { + ids, err := queryContactIDs(ctx, tx, `SELECT id FROM contacts_contact WHERE org_id = $1 AND uuid = ANY($2) AND is_active = TRUE`, orgID, pq.Array(uuids)) if err != nil { - return nil, errors.Wrapf(err, "error selecting contact ids by uuid") + return nil, errors.Wrapf(err, "error selecting contact ids by UUID") + } + return ids, nil +} + +// utility to query contact IDs +func queryContactIDs(ctx context.Context, tx Queryer, query string, args ...interface{}) ([]ContactID, error) { + ids := make([]ContactID, 0, 10) + rows, err := tx.QueryxContext(ctx, query, args...) + if err != nil && err != sql.ErrNoRows { + return nil, errors.Wrapf(err, "error querying contact ids") } defer rows.Close() @@ -557,82 +583,84 @@ WHERE ) r; ` -// ContactIDsFromURNs will fetch or create the contacts for the passed in URNs, returning a map the same length as +// GetOrCreateContactIDsFromURNs will fetch or create the contacts for the passed in URNs, returning a map the same length as // the passed in URNs with the ids of the contacts. -func ContactIDsFromURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, us []urns.URN) (map[urns.URN]ContactID, error) { - // build a map of our urns to contact id - urnMap := make(map[urns.URN]ContactID, len(us)) +func GetOrCreateContactIDsFromURNs(ctx context.Context, db *sqlx.DB, oa *OrgAssets, urnz []urns.URN) (map[urns.URN]ContactID, error) { + // ensure all URNs are normalized + for i, urn := range urnz { + urnz[i] = urn.Normalize(string(oa.Env().DefaultCountry())) + } - // and another map from URN identity to the passed in URN - urnIdentities := make(map[urns.URN]urns.URN, len(us)) - for _, u := range us { - urnIdentities[u.Identity()] = u + // find current owners of these URNs + owners, err := contactIDsFromURNs(ctx, db, oa.OrgID(), urnz) + if err != nil { + return nil, errors.Wrapf(err, "error looking up contacts for URNs") } - // try to select our contact ids - identities := make([]string, len(us)) - for i := range us { - if us[i] == urns.NilURN { - return nil, errors.Errorf("cannot look up contact without URN") + // create any contacts that are missing + for urn, contactID := range owners { + if contactID == NilContactID { + contact, _, err := CreateContact(ctx, db, oa, NilUserID, "", envs.NilLanguage, []urns.URN{urn}) + if err != nil { + return nil, errors.Wrapf(err, "error creating contact") + } + owners[urn] = contact.ID() } - - identities[i] = us[i].Identity().String() } + return owners, nil +} - rows, err := db.QueryxContext(ctx, - `SELECT contact_id, identity FROM contacts_contacturn WHERE org_id = $1 AND identity = ANY($2) AND contact_id IS NOT NULL`, - org.OrgID(), pq.Array(identities), - ) +// looks up the contacts who own the given urns (which should be normalized by the caller) and returns that information as a map +func contactIDsFromURNs(ctx context.Context, db Queryer, orgID OrgID, urnz []urns.URN) (map[urns.URN]ContactID, error) { + identityToOriginal := make(map[urns.URN]urns.URN, len(urnz)) + identities := make([]urns.URN, len(urnz)) + owners := make(map[urns.URN]ContactID, len(urnz)) - if err != nil { - return nil, errors.Wrapf(err, "error querying contact urns") + for i, urn := range urnz { + identity := urn.Identity() + identityToOriginal[identity] = urn + identities[i] = identity + owners[urn] = NilContactID + } + + rows, err := db.QueryxContext(ctx, `SELECT contact_id, identity FROM contacts_contacturn WHERE org_id = $1 AND identity = ANY($2)`, orgID, pq.Array(identities)) + if err != nil && err != sql.ErrNoRows { + return nil, errors.Wrapf(err, "error querying contact URNs") } defer rows.Close() for rows.Next() { var urn urns.URN var id ContactID - - err := rows.Scan(&id, &urn) - if err != nil { - return nil, errors.Wrapf(err, "error scanning urn result") + if err := rows.Scan(&id, &urn); err != nil { + return nil, errors.Wrapf(err, "error scanning URN result") } + owners[identityToOriginal[urn]] = id + } - original, found := urnIdentities[urn] - if !found { - return nil, errors.Wrapf(err, "unable to find original URN from identity") - } + return owners, nil +} - urnMap[original] = id +// CreateContact creates a new contact for the passed in org with the passed in URNs +func CreateContact(ctx context.Context, db *sqlx.DB, oa *OrgAssets, userID UserID, name string, language envs.Language, urnz []urns.URN) (*Contact, *flows.Contact, error) { + // ensure all URNs are normalized + for i, urn := range urnz { + urnz[i] = urn.Normalize(string(oa.Env().DefaultCountry())) } - // if we didn't find some contacts - if len(urnMap) < len(us) { - // create the contacts that are missing - for _, u := range us { - if urnMap[u] == NilContactID { - contact, _, err := CreateContact(ctx, db, org, NilUserID, "", envs.NilLanguage, []urns.URN{u}) - if err != nil { - return nil, errors.Wrapf(err, "error while creating contact") - } - - original, found := urnIdentities[u] - if !found { - return nil, errors.Wrapf(err, "unable to find original URN from identity") - } - urnMap[original] = contact.ID() - } - } + // find current owners of these URNs + owners, err := contactIDsFromURNs(ctx, db, oa.OrgID(), urnz) + if err != nil { + return nil, nil, errors.Wrapf(err, "error looking up contacts for URNs") } - // return our map of urns to ids - return urnMap, nil -} + if len(uniqueContactIDs(owners)) > 0 { + return nil, nil, errors.New("URNs in use by other contacts") + } -// CreateContact creates a new contact for the passed in org with the passed in URNs -func CreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID UserID, name string, language envs.Language, urnz []urns.URN) (*Contact, *flows.Contact, error) { - contactID, err := insertContactAndURNs(ctx, db, org, userID, name, language, urnz) + contactID, err := tryInsertContactAndURNs(ctx, db, oa.OrgID(), userID, name, language, urnz) if err != nil { + // always possible that another thread created a contact with these URNs after we checked above if dbutil.IsUniqueViolation(err) { return nil, nil, errors.New("URNs in use by other contacts") } @@ -640,18 +668,18 @@ func CreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID User } // load a full contact so that we can calculate dynamic groups - contacts, err := LoadContacts(ctx, db, org, []ContactID{contactID}) + contacts, err := LoadContacts(ctx, db, oa, []ContactID{contactID}) if err != nil { return nil, nil, errors.Wrapf(err, "error loading new contact") } contact := contacts[0] - flowContact, err := contact.FlowContact(org) + flowContact, err := contact.FlowContact(oa) if err != nil { return nil, nil, errors.Wrapf(err, "error creating flow contact") } - err = CalculateDynamicGroups(ctx, db, org, flowContact) + err = CalculateDynamicGroups(ctx, db, oa, flowContact) if err != nil { return nil, nil, errors.Wrapf(err, "error calculating dynamic groups") } @@ -659,39 +687,39 @@ func CreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID User return contact, flowContact, nil } -// GetOrCreateContact creates a new contact for the passed in org with the passed in URNs -func GetOrCreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, urn urns.URN) (*Contact, *flows.Contact, error) { - created := true +// GetOrCreateContact fetches or creates a new contact for the passed in org with the passed in URNs. +// +// * If none of the URNs exist, it creates a new contact with those URNs. +// * If URNs exist but are orphaned it creates a new contact and assigns those URNs to them. +// * If URNs exists and belongs to a single contact it returns that contact (other URNs are not assigned to the contact). +// * If URNs exists and belongs to multiple contacts it will return an error. +// +func GetOrCreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, urnz []urns.URN) (*Contact, *flows.Contact, error) { + // ensure all URNs are normalized + for i, urn := range urnz { + urnz[i] = urn.Normalize(string(oa.Env().DefaultCountry())) + } - contactID, err := insertContactAndURNs(ctx, db, org, UserID(1), "", envs.NilLanguage, []urns.URN{urn}) + contactID, created, err := getOrCreateContact(ctx, db, oa.OrgID(), urnz) if err != nil { - if dbutil.IsUniqueViolation(err) { - // if this was a duplicate URN, we should be able to fetch this contact instead - err := db.GetContext(ctx, &contactID, `SELECT contact_id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2`, org.OrgID(), urn.Identity()) - if err != nil { - return nil, nil, errors.Wrapf(err, "unable to load contact") - } - created = false - } else { - return nil, nil, err - } + return nil, nil, err } // load a full contact so that we can calculate dynamic groups - contacts, err := LoadContacts(ctx, db, org, []ContactID{contactID}) + contacts, err := LoadContacts(ctx, db, oa, []ContactID{contactID}) if err != nil { return nil, nil, errors.Wrapf(err, "error loading new contact") } contact := contacts[0] - flowContact, err := contact.FlowContact(org) + flowContact, err := contact.FlowContact(oa) if err != nil { return nil, nil, errors.Wrapf(err, "error creating flow contact") } // calculate dynamic groups if contact was created if created { - err := CalculateDynamicGroups(ctx, db, org, flowContact) + err := CalculateDynamicGroups(ctx, db, oa, flowContact) if err != nil { return nil, nil, errors.Wrapf(err, "error calculating dynamic groups") } @@ -700,78 +728,126 @@ func GetOrCreateContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, urn ur return contact, flowContact, nil } -// tries to create a new contact for the passed in org with the passed in URNs -func insertContactAndURNs(ctx context.Context, db *sqlx.DB, org *OrgAssets, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) { - if userID == NilUserID { - userID = UserID(1) +func getOrCreateContact(ctx context.Context, db QueryerWithTx, orgID OrgID, urnz []urns.URN) (ContactID, bool, error) { + // find current owners of these URNs + owners, err := contactIDsFromURNs(ctx, db, orgID, urnz) + if err != nil { + return NilContactID, false, errors.Wrapf(err, "error looking up contacts for URNs") + } + + uniqueOwners := uniqueContactIDs(owners) + if len(uniqueOwners) > 1 { + return NilContactID, false, errors.New("error because URNs belong to different contacts") + } else if len(uniqueOwners) == 1 { + return uniqueOwners[0], false, nil } + contactID, err := tryInsertContactAndURNs(ctx, db, orgID, UserID(1), "", envs.NilLanguage, urnz) + if err == nil { + return contactID, true, nil + } + + if dbutil.IsUniqueViolation(err) { + // another thread must have created contacts with these URNs in the time between us looking them up and trying to + // create them ourselves, so let's try to fetch that contact + owners, err := contactIDsFromURNs(ctx, db, orgID, urnz) + if err != nil { + return NilContactID, false, errors.Wrapf(err, "error looking up contacts for URNs") + } + + uniqueOwners := uniqueContactIDs(owners) + if len(uniqueOwners) > 1 { + return NilContactID, false, errors.New("error because URNs belong to different contacts") + } else if len(uniqueOwners) == 1 { + return uniqueOwners[0], false, nil + } else { + return NilContactID, false, errors.New("lookup of URNs after failed insert returned zero contacts") + } + } + + return NilContactID, false, err +} + +// utility to extract non-nil unique contact IDs from the given URN map +func uniqueContactIDs(urnMap map[urns.URN]ContactID) []ContactID { + unique := make([]ContactID, 0, len(urnMap)) + seen := make(map[ContactID]bool) + for _, id := range urnMap { + if id != NilContactID && !seen[id] { + unique = append(unique, id) + seen[id] = true + } + } + return unique +} + +// Tries to create a new contact for the passed in org with the passed in URNs. Returned error can be tested with `dbutil.IsUniqueViolation` to +// determine if problem was one or more of the URNs already exist and are assigned to other contacts. +func tryInsertContactAndURNs(ctx context.Context, db QueryerWithTx, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) { tx, err := db.BeginTxx(ctx, nil) if err != nil { - return NilContactID, errors.Wrapf(err, "unable to start transaction") + return NilContactID, errors.Wrapf(err, "error beginning transaction") + } + + contactID, err := insertContactAndURNs(ctx, tx, orgID, userID, name, language, urnz) + if err != nil { + tx.Rollback() + return NilContactID, err + } + + err = tx.Commit() + if err != nil { + tx.Rollback() + return NilContactID, errors.Wrapf(err, "error committing transaction") + } + + return contactID, nil +} + +func insertContactAndURNs(ctx context.Context, db Queryer, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) { + if userID == NilUserID { + userID = UserID(1) } // first insert our contact var contactID ContactID - err = tx.GetContext(ctx, &contactID, - `INSERT INTO contacts_contact - (org_id, is_active, status, uuid, name, language, created_on, modified_on, created_by_id, modified_by_id) - VALUES - ($1, TRUE, 'A', $2, $3, $4, $5, $5, $6, $6) + err := db.GetContext(ctx, &contactID, + `INSERT INTO contacts_contact (org_id, is_active, status, uuid, name, language, created_on, modified_on, created_by_id, modified_by_id) + VALUES($1, TRUE, 'A', $2, $3, $4, $5, $5, $6, $6) RETURNING id`, - org.OrgID(), uuids.New(), null.String(name), null.String(string(language)), dates.Now(), userID, + orgID, uuids.New(), null.String(name), null.String(string(language)), dates.Now(), userID, ) - if err != nil { - tx.Rollback() return NilContactID, errors.Wrapf(err, "error inserting new contact") } - var urnsToAttach []URNID + priority := topURNPriority - // now try to insert the URNs for _, urn := range urnz { // look for a URN with this identity that already exists but doesn't have a contact so could be attached var orphanURNID URNID - err = tx.GetContext(ctx, &orphanURNID, `SELECT id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2 AND contact_id IS NULL`, org.OrgID(), urn.Identity()) + err = db.GetContext(ctx, &orphanURNID, `SELECT id FROM contacts_contacturn WHERE org_id = $1 AND identity = $2 AND contact_id IS NULL`, orgID, urn.Identity()) if err != nil && err != sql.ErrNoRows { return NilContactID, err } - if orphanURNID > 0 { - urnsToAttach = append(urnsToAttach, orphanURNID) - continue - } - - _, err := tx.ExecContext( - ctx, - `INSERT INTO - contacts_contacturn - (org_id, identity, path, scheme, display, auth, priority, channel_id, contact_id) - VALUES - ($1, $2, $3, $4, $5, $6, $7, $8, $9)`, - org.OrgID(), urn.Identity(), urn.Path(), urn.Scheme(), urn.Display(), GetURNAuth(urn), topURNPriority, nil, contactID, - ) - - if err != nil { - tx.Rollback() - return NilContactID, err - } - } - - // attach URNs - if len(urnsToAttach) > 0 { - _, err := tx.ExecContext(ctx, `UPDATE contacts_contacturn SET contact_id = $3 WHERE org_id = $1 AND id = ANY($2)`, org.OrgID(), pq.Array(urnsToAttach), contactID) - if err != nil { - tx.Rollback() - return NilContactID, errors.Wrapf(err, "error attaching existing URNs to new contact") + if orphanURNID != NilURNID { + _, err := db.ExecContext(ctx, `UPDATE contacts_contacturn SET contact_id = $2, priority = $3 WHERE id = $1`, orphanURNID, contactID, priority) + if err != nil { + return NilContactID, errors.Wrapf(err, "error attaching existing URN to new contact") + } + } else { + _, err := db.ExecContext( + ctx, + `INSERT INTO contacts_contacturn(org_id, identity, path, scheme, display, auth, priority, channel_id, contact_id) + VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)`, + orgID, urn.Identity(), urn.Path(), urn.Scheme(), urn.Display(), GetURNAuth(urn), priority, nil, contactID, + ) + if err != nil { + return NilContactID, err + } } - } - // try to commit - err = tx.Commit() - if err != nil { - tx.Rollback() - return NilContactID, err + priority-- } return contactID, nil @@ -1246,23 +1322,9 @@ func UpdateContactURNs(ctx context.Context, tx Queryer, org *OrgAssets, changes if len(inserts) > 0 { // find the unique ids of the contacts that may be affected by our URN inserts - rows, err := tx.QueryxContext(ctx, - `SELECT contact_id FROM contacts_contacturn WHERE identity = ANY($1) AND org_id = $2 AND contact_id IS NOT NULL`, - pq.Array(identities), org.OrgID(), - ) + orphanedIDs, err := queryContactIDs(ctx, tx, `SELECT contact_id FROM contacts_contacturn WHERE identity = ANY($1) AND org_id = $2 AND contact_id IS NOT NULL`, pq.Array(identities), org.OrgID()) if err != nil { - return errors.Wrapf(err, "error finding contacts for urns") - } - defer rows.Close() - - orphanedIDs := make([]ContactID, 0, len(inserts)) - for rows.Next() { - var contactID ContactID - err := rows.Scan(&contactID) - if err != nil { - return errors.Wrapf(err, "error reading orphaned contacts") - } - orphanedIDs = append(orphanedIDs, contactID) + return errors.Wrapf(err, "error finding contacts for URNs") } // then insert new urns, we do these one by one since we have to deal with conflicts diff --git a/models/contacts_test.go b/models/contacts_test.go index 8cdbcb070..c9e12a23e 100644 --- a/models/contacts_test.go +++ b/models/contacts_test.go @@ -2,6 +2,7 @@ package models_test import ( "fmt" + "sync" "testing" "time" @@ -513,37 +514,61 @@ func TestContacts(t *testing.T) { assert.Equal(t, "whatsapp:250788373373?id=20121&priority=998", bob.URNs()[2].String()) } -func TestContactsFromURN(t *testing.T) { +func TestGetOrCreateContactIDsFromURNs(t *testing.T) { ctx := testsuite.CTX() db := testsuite.DB() testsuite.Reset() - var maxContactID int64 + // add an orphaned URN + testdata.InsertContactURN(t, db, models.Org1, models.NilContactID, urns.URN("telegram:200001"), 100) + + var maxContactID models.ContactID db.Get(&maxContactID, `SELECT max(id) FROM contacts_contact`) + newContact := func() models.ContactID { maxContactID++; return maxContactID } + prevContact := func() models.ContactID { return maxContactID } + + org, err := models.GetOrgAssets(ctx, db, models.Org1) + assert.NoError(t, err) tcs := []struct { - OrgID models.OrgID - URN urns.URN - ContactID models.ContactID + OrgID models.OrgID + URNs []urns.URN + ContactIDs map[urns.URN]models.ContactID }{ - {models.Org1, models.CathyURN, models.CathyID}, - {models.Org1, urns.URN(models.CathyURN.String() + "?foo=bar"), models.CathyID}, - {models.Org1, urns.URN("telegram:12345678"), models.ContactID(maxContactID + 1)}, - {models.Org1, urns.URN("telegram:12345678"), models.ContactID(maxContactID + 1)}, + { + models.Org1, + []urns.URN{models.CathyURN}, + map[urns.URN]models.ContactID{models.CathyURN: models.CathyID}, + }, + { + models.Org1, + []urns.URN{urns.URN(models.CathyURN.String() + "?foo=bar")}, + map[urns.URN]models.ContactID{urns.URN(models.CathyURN.String() + "?foo=bar"): models.CathyID}, + }, + { + models.Org1, + []urns.URN{models.CathyURN, urns.URN("telegram:100001")}, + map[urns.URN]models.ContactID{ + models.CathyURN: models.CathyID, + urns.URN("telegram:100001"): newContact(), + }, + }, + { + models.Org1, + []urns.URN{urns.URN("telegram:100001")}, + map[urns.URN]models.ContactID{urns.URN("telegram:100001"): prevContact()}, + }, + { + models.Org1, + []urns.URN{urns.URN("telegram:200001")}, + map[urns.URN]models.ContactID{urns.URN("telegram:200001"): newContact()}, // new contact assigned orphaned URN + }, } - org, err := models.GetOrgAssets(ctx, db, models.Org1) - assert.NoError(t, err) - for i, tc := range tcs { - ids, err := models.ContactIDsFromURNs(ctx, db, org, []urns.URN{tc.URN}) + ids, err := models.GetOrCreateContactIDsFromURNs(ctx, db, org, tc.URNs) assert.NoError(t, err, "%d: error getting contact ids", i) - - if len(ids) != 1 { - assert.Fail(t, "%d: unexpected number of ids returned", i) - continue - } - assert.Equal(t, tc.ContactID, ids[tc.URN], "%d: mismatch in contact ids", i) + assert.Equal(t, tc.ContactIDs, ids, "%d: mismatch in contact ids", i) } } @@ -552,28 +577,136 @@ func TestGetOrCreateContact(t *testing.T) { db := testsuite.DB() testsuite.Reset() - var maxContactID int64 + // add some orphaned URNs + testdata.InsertContactURN(t, db, models.Org1, models.NilContactID, urns.URN("telegram:200001"), 100) + testdata.InsertContactURN(t, db, models.Org1, models.NilContactID, urns.URN("telegram:200002"), 100) + + var maxContactID models.ContactID db.Get(&maxContactID, `SELECT max(id) FROM contacts_contact`) + newContact := func() models.ContactID { maxContactID++; return maxContactID } + prevContact := func() models.ContactID { return maxContactID } + + oa, err := models.GetOrgAssets(ctx, db, models.Org1) + assert.NoError(t, err) tcs := []struct { - OrgID models.OrgID - URN urns.URN - ContactID models.ContactID + OrgID models.OrgID + URNs []urns.URN + ContactID models.ContactID + ContactURNs []urns.URN }{ - {models.Org1, models.CathyURN, models.CathyID}, - {models.Org1, urns.URN(models.CathyURN.String() + "?foo=bar"), models.CathyID}, - {models.Org1, urns.URN("telegram:12345678"), models.ContactID(maxContactID + 3)}, - {models.Org1, urns.URN("telegram:12345678"), models.ContactID(maxContactID + 3)}, + { + models.Org1, + []urns.URN{models.CathyURN}, + models.CathyID, + []urns.URN{"tel:+16055741111?id=10000&priority=1000"}, + }, + { + models.Org1, + []urns.URN{urns.URN(models.CathyURN.String() + "?foo=bar")}, + models.CathyID, // only URN identity is considered + []urns.URN{"tel:+16055741111?id=10000&priority=1000"}, + }, + { + models.Org1, + []urns.URN{urns.URN("telegram:100001")}, + newContact(), // creates new contact + []urns.URN{"telegram:100001?id=20123&priority=1000"}, + }, + { + models.Org1, + []urns.URN{urns.URN("telegram:100001")}, + prevContact(), // returns the same created contact + []urns.URN{"telegram:100001?id=20123&priority=1000"}, + }, + { + models.Org1, + []urns.URN{urns.URN("telegram:100001"), urns.URN("telegram:100002")}, + prevContact(), // same again as other URNs don't exist + []urns.URN{"telegram:100001?id=20123&priority=1000"}, + }, + { + models.Org1, + []urns.URN{urns.URN("telegram:100002"), urns.URN("telegram:100001")}, + prevContact(), // same again as other URNs don't exist + []urns.URN{"telegram:100001?id=20123&priority=1000"}, + }, + { + models.Org1, + []urns.URN{urns.URN("telegram:200001"), urns.URN("telegram:100001")}, + prevContact(), // same again as other URNs are orphaned + []urns.URN{"telegram:100001?id=20123&priority=1000"}, + }, + { + models.Org1, + []urns.URN{urns.URN("telegram:100003"), urns.URN("telegram:100004")}, // 2 new URNs + newContact(), + []urns.URN{"telegram:100003?id=20124&priority=1000", "telegram:100004?id=20125&priority=999"}, + }, + { + models.Org1, + []urns.URN{urns.URN("telegram:100005"), urns.URN("telegram:200002")}, // 1 new, 1 orphaned + newContact(), + []urns.URN{"telegram:100005?id=20126&priority=1000", "telegram:200002?id=20122&priority=999"}, + }, } - org, err := models.GetOrgAssets(ctx, db, models.Org1) - assert.NoError(t, err) - for i, tc := range tcs { - contact, _, err := models.GetOrCreateContact(ctx, db, org, tc.URN) + contact, flowContact, err := models.GetOrCreateContact(ctx, db, oa, tc.URNs) assert.NoError(t, err, "%d: error creating contact", i) - assert.Equal(t, tc.ContactID, contact.ID(), "%d: mismatch in contact id", i) + + assert.Equal(t, tc.ContactID, contact.ID(), "%d: contact id mismatch", i) + assert.Equal(t, tc.ContactURNs, flowContact.URNs().RawURNs(), "%d: URNs mismatch", i) + } +} + +func TestGetOrCreateContactRace(t *testing.T) { + ctx := testsuite.CTX() + db := testsuite.DB() + + oa, err := models.GetOrgAssets(ctx, db, models.Org1) + assert.NoError(t, err) + + mdb := testsuite.NewMockDB(db, func(funcName string, call int) error { + // Make beginning a transaction take a while to create race condition. All threads will fetch + // URN owners and decide nobody owns the URN, so all threads will try to create a new contact. + if funcName == "BeginTxx" { + time.Sleep(100 * time.Millisecond) + } + return nil + }) + + var contacts [2]*models.Contact + var errs [2]error + wg := &sync.WaitGroup{} + + getOrCreate := func(i int) { + defer wg.Done() + contacts[i], _, errs[i] = models.GetOrCreateContact(ctx, mdb, oa, []urns.URN{urns.URN("telegram:100007")}) } + + wg.Add(2) + go getOrCreate(0) + go getOrCreate(1) + + // let both finish + wg.Wait() + + require.NoError(t, errs[0]) + require.NoError(t, errs[1]) + assert.Equal(t, contacts[0].ID(), contacts[1].ID()) +} + +func TestGetContactIDsFromReferences(t *testing.T) { + ctx := testsuite.CTX() + db := testsuite.DB() + + ids, err := models.GetContactIDsFromReferences(ctx, db, models.Org1, []*flows.ContactReference{ + flows.NewContactReference(models.CathyUUID, "Cathy"), + flows.NewContactReference(models.BobUUID, "Bob"), + }) + require.NoError(t, err) + assert.ElementsMatch(t, []models.ContactID{models.CathyID, models.BobID}, ids) } func TestStopContact(t *testing.T) { diff --git a/models/events.go b/models/events.go index cc54b5656..f21250a0d 100644 --- a/models/events.go +++ b/models/events.go @@ -235,3 +235,27 @@ func HandleAndCommitEvents(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa } return nil } + +// ApplyModifiers modifies contacts by applying modifiers and handling the resultant events +func ApplyModifiers(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *OrgAssets, modifiersByContact map[*flows.Contact][]flows.Modifier) (map[*flows.Contact][]flows.Event, error) { + // create an environment instance with location support + env := flows.NewEnvironment(oa.Env(), oa.SessionAssets().Locations()) + + eventsByContact := make(map[*flows.Contact][]flows.Event, len(modifiersByContact)) + + // apply the modifiers to get the events for each contact + for contact, mods := range modifiersByContact { + events := make([]flows.Event, 0) + for _, mod := range mods { + mod.Apply(env, oa.SessionAssets(), contact, func(e flows.Event) { events = append(events, e) }) + } + eventsByContact[contact] = events + } + + err := HandleAndCommitEvents(ctx, db, rp, oa, eventsByContact) + if err != nil { + return nil, errors.Wrap(err, "error commiting events") + } + + return eventsByContact, nil +} diff --git a/models/imports.go b/models/imports.go new file mode 100644 index 000000000..6921d7a44 --- /dev/null +++ b/models/imports.go @@ -0,0 +1,329 @@ +package models + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/nyaruka/gocommon/dates" + "github.com/nyaruka/gocommon/jsonx" + "github.com/nyaruka/gocommon/urns" + "github.com/nyaruka/goflow/assets" + "github.com/nyaruka/goflow/envs" + "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/flows/modifiers" + "github.com/pkg/errors" + + "github.com/jmoiron/sqlx" +) + +// ContactImportID is the type for contact import IDs +type ContactImportID int64 + +// ContactImportBatchID is the type for contact import batch IDs +type ContactImportBatchID int64 + +// ContactImportStatus is the status of an import +type ContactImportStatus string + +// import status constants +const ( + ContactImportStatusPending ContactImportStatus = "P" + ContactImportStatusProcessing ContactImportStatus = "O" + ContactImportStatusComplete ContactImportStatus = "C" + ContactImportStatusFailed ContactImportStatus = "F" +) + +// ContactImportBatch is a batch of contacts within a larger import +type ContactImportBatch struct { + ID ContactImportBatchID `db:"id"` + ImportID ContactImportID `db:"contact_import_id"` + Status ContactImportStatus `db:"status"` + Specs json.RawMessage `db:"specs"` + + // the range of records from the entire import contained in this batch + RecordStart int `db:"record_start"` + RecordEnd int `db:"record_end"` + + // results written after processing this batch + NumCreated int `db:"num_created"` + NumUpdated int `db:"num_updated"` + NumErrored int `db:"num_errored"` + Errors json.RawMessage `db:"errors"` + FinishedOn *time.Time `db:"finished_on"` +} + +// Import does the actual import of this batch +func (b *ContactImportBatch) Import(ctx context.Context, db *sqlx.DB, orgID OrgID) error { + // if any error occurs this batch should be marked as failed + if err := b.tryImport(ctx, db, orgID); err != nil { + b.markFailed(ctx, db) + return err + } + return nil +} + +// holds work data for import of a single contact +type importContact struct { + record int + spec *ContactSpec + contact *Contact + created bool + flowContact *flows.Contact + mods []flows.Modifier + errors []string +} + +func (b *ContactImportBatch) tryImport(ctx context.Context, db *sqlx.DB, orgID OrgID) error { + if err := b.markProcessing(ctx, db); err != nil { + return errors.Wrap(err, "error marking as processing") + } + + // grab our org assets + oa, err := GetOrgAssets(ctx, db, orgID) + if err != nil { + return errors.Wrap(err, "error loading org assets") + } + + // unmarshal this batch's specs + var specs []*ContactSpec + if err := jsonx.Unmarshal(b.Specs, &specs); err != nil { + return errors.Wrap(err, "error unmarsaling specs") + } + + // create our work data for each contact being created or updated + imports := make([]*importContact, len(specs)) + for i := range imports { + imports[i] = &importContact{record: b.RecordStart + i, spec: specs[i]} + } + + if err := b.getOrCreateContacts(ctx, db, oa, imports); err != nil { + return errors.Wrap(err, "error getting and creating contacts") + } + + // gather up contacts and modifiers + modifiersByContact := make(map[*flows.Contact][]flows.Modifier, len(imports)) + for _, imp := range imports { + // ignore errored imports which couldn't get/create a contact + if imp.contact != nil { + modifiersByContact[imp.flowContact] = imp.mods + } + } + + // and apply in bulk + _, err = ApplyModifiers(ctx, db, nil, oa, modifiersByContact) + if err != nil { + return errors.Wrap(err, "error applying modifiers") + } + + if err := b.markComplete(ctx, db, imports); err != nil { + return errors.Wrap(err, "unable to mark as complete") + } + + return nil +} + +// for each import, fetches or creates the contact, creates the modifiers needed to set fields etc +func (b *ContactImportBatch) getOrCreateContacts(ctx context.Context, db *sqlx.DB, oa *OrgAssets, imports []*importContact) error { + sa := oa.SessionAssets() + + // build map of UUIDs to contacts + contactsByUUID, err := b.loadContactsByUUID(ctx, db, oa, imports) + if err != nil { + return errors.Wrap(err, "error loading contacts by UUID") + } + + for _, imp := range imports { + addModifier := func(m flows.Modifier) { imp.mods = append(imp.mods, m) } + addError := func(s string, args ...interface{}) { imp.errors = append(imp.errors, fmt.Sprintf(s, args...)) } + spec := imp.spec + + uuid := spec.UUID + if uuid != "" { + imp.contact = contactsByUUID[uuid] + if imp.contact == nil { + addError("Unable to find contact with UUID '%s'", uuid) + continue + } + + imp.flowContact, err = imp.contact.FlowContact(oa) + if err != nil { + return errors.Wrapf(err, "error creating flow contact for %d", imp.contact.ID()) + } + + } else { + imp.contact, imp.flowContact, err = GetOrCreateContact(ctx, db, oa, spec.URNs) + if err != nil { + urnStrs := make([]string, len(spec.URNs)) + for i := range spec.URNs { + urnStrs[i] = string(spec.URNs[i].Identity()) + } + + addError("Unable to find or create contact with URNs %s", strings.Join(urnStrs, ", ")) + continue + } + } + + addModifier(modifiers.NewURNs(spec.URNs, modifiers.URNsAppend)) + + if spec.Name != nil { + addModifier(modifiers.NewName(*spec.Name)) + } + if spec.Language != nil { + lang, err := envs.ParseLanguage(*spec.Language) + if err != nil { + addError("'%s' is not a valid language code", *spec.Language) + } else { + addModifier(modifiers.NewLanguage(lang)) + } + } + + for key, value := range spec.Fields { + field := sa.Fields().Get(key) + if field == nil { + addError("'%s' is not a valid contact field key", key) + } else { + addModifier(modifiers.NewField(field, value)) + } + } + + if len(spec.Groups) > 0 { + groups := make([]*flows.Group, 0, len(spec.Groups)) + for _, uuid := range spec.Groups { + group := sa.Groups().Get(uuid) + if group == nil { + addError("'%s' is not a valid contact group UUID", uuid) + } else { + groups = append(groups, group) + } + } + addModifier(modifiers.NewGroups(groups, modifiers.GroupsAdd)) + } + } + + return nil +} + +// loads any import contacts for which we have UUIDs +func (b *ContactImportBatch) loadContactsByUUID(ctx context.Context, db *sqlx.DB, oa *OrgAssets, imports []*importContact) (map[flows.ContactUUID]*Contact, error) { + uuids := make([]flows.ContactUUID, 0, 50) + for _, imp := range imports { + if imp.spec.UUID != "" { + uuids = append(uuids, imp.spec.UUID) + } + } + + // build map of UUIDs to contacts + contacts, err := LoadContactsByUUID(ctx, db, oa, uuids) + if err != nil { + return nil, err + } + + contactsByUUID := make(map[flows.ContactUUID]*Contact, len(contacts)) + for _, c := range contacts { + contactsByUUID[c.UUID()] = c + } + return contactsByUUID, nil +} + +func (b *ContactImportBatch) markProcessing(ctx context.Context, db *sqlx.DB) error { + b.Status = ContactImportStatusProcessing + _, err := db.ExecContext(ctx, `UPDATE contacts_contactimportbatch SET status = $2 WHERE id = $1`, b.ID, b.Status) + return err +} + +func (b *ContactImportBatch) markComplete(ctx context.Context, db *sqlx.DB, imports []*importContact) error { + numCreated := 0 + numUpdated := 0 + numErrored := 0 + importErrors := make([]importError, 0, 10) + for _, imp := range imports { + if imp.contact == nil { + numErrored++ + } else if imp.created { + numCreated++ + } else { + numUpdated++ + } + for _, e := range imp.errors { + importErrors = append(importErrors, importError{Record: imp.record, Message: e}) + } + } + + errorsJSON, err := jsonx.Marshal(importErrors) + if err != nil { + return errors.Wrap(err, "error marshaling errors") + } + + now := dates.Now() + b.Status = ContactImportStatusComplete + b.NumCreated = numCreated + b.NumUpdated = numUpdated + b.NumErrored = numErrored + b.Errors = errorsJSON + b.FinishedOn = &now + _, err = db.NamedExecContext(ctx, + `UPDATE + contacts_contactimportbatch + SET + status = :status, + num_created = :num_created, + num_updated = :num_updated, + num_errored = :num_errored, + errors = :errors, + finished_on = :finished_on + WHERE + id = :id`, + b, + ) + return err +} + +func (b *ContactImportBatch) markFailed(ctx context.Context, db *sqlx.DB) error { + now := dates.Now() + b.Status = ContactImportStatusFailed + b.FinishedOn = &now + _, err := db.ExecContext(ctx, `UPDATE contacts_contactimportbatch SET status = $2, finished_on = $3 WHERE id = $1`, b.ID, b.Status, b.FinishedOn) + return err +} + +var loadContactImportBatchSQL = ` +SELECT + id, + contact_import_id, + status, + specs, + record_start, + record_end +FROM + contacts_contactimportbatch +WHERE + id = $1` + +// LoadContactImportBatch loads a contact import batch by ID +func LoadContactImportBatch(ctx context.Context, db Queryer, id ContactImportBatchID) (*ContactImportBatch, error) { + b := &ContactImportBatch{} + err := db.GetContext(ctx, b, loadContactImportBatchSQL, id) + if err != nil { + return nil, err + } + return b, nil +} + +// ContactSpec describes a contact to be updated or created +type ContactSpec struct { + UUID flows.ContactUUID `json:"uuid"` + Name *string `json:"name"` + Language *string `json:"language"` + URNs []urns.URN `json:"urns"` + Fields map[string]string `json:"fields"` + Groups []assets.GroupUUID `json:"groups"` +} + +// an error message associated with a particular record +type importError struct { + Record int `json:"record"` + Message string `json:"message"` +} diff --git a/models/imports_test.go b/models/imports_test.go new file mode 100644 index 000000000..ddcb8a77f --- /dev/null +++ b/models/imports_test.go @@ -0,0 +1,223 @@ +package models_test + +import ( + "context" + "encoding/json" + "io/ioutil" + "sort" + "strings" + "testing" + + "github.com/jmoiron/sqlx" + "github.com/nyaruka/gocommon/jsonx" + "github.com/nyaruka/gocommon/urns" + "github.com/nyaruka/gocommon/uuids" + "github.com/nyaruka/goflow/assets" + "github.com/nyaruka/goflow/excellent/types" + "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/test" + _ "github.com/nyaruka/mailroom/hooks" + "github.com/nyaruka/mailroom/models" + "github.com/nyaruka/mailroom/testsuite" + "github.com/nyaruka/mailroom/testsuite/testdata" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestContactImports(t *testing.T) { + ctx := testsuite.CTX() + db := testsuite.DB() + testsuite.Reset() + defer testsuite.Reset() + + models.FlushCache() + + testdata.DeleteContactsAndURNs(t, db) + + // add contact in other org to make sure we can't update it + testdata.InsertContact(t, db, models.Org2, "f7a8016d-69a6-434b-aae7-5142ce4a98ba", "Xavier", "spa") + + // add dynamic group to test imported contacts are added to it + testdata.InsertContactGroup(t, db, models.Org1, "fc32f928-ad37-477c-a88e-003d30fd7406", "Adults", "age >= 40") + + testJSON, err := ioutil.ReadFile("testdata/imports.json") + require.NoError(t, err) + + tcs := []struct { + Description string `json:"description"` + Specs json.RawMessage `json:"specs"` + NumCreated int `json:"num_created"` + NumUpdated int `json:"num_updated"` + NumErrored int `json:"num_errored"` + Errors json.RawMessage `json:"errors"` + Contacts []*models.ContactSpec `json:"contacts"` + }{} + err = jsonx.Unmarshal(testJSON, &tcs) + require.NoError(t, err) + + oa, err := models.GetOrgAssets(ctx, db, 1) + require.NoError(t, err) + + uuids.SetGenerator(uuids.NewSeededGenerator(12345)) + defer uuids.SetGenerator(uuids.DefaultGenerator) + + for i, tc := range tcs { + importID := testdata.InsertContactImport(t, db, models.Org1) + batchID := testdata.InsertContactImportBatch(t, db, importID, tc.Specs) + + batch, err := models.LoadContactImportBatch(ctx, db, batchID) + require.NoError(t, err) + + err = batch.Import(ctx, db, models.Org1) + require.NoError(t, err) + + results := &struct { + NumCreated int `db:"num_created"` + NumUpdated int `db:"num_updated"` + NumErrored int `db:"num_errored"` + Errors json.RawMessage `db:"errors"` + }{} + err = db.Get(results, `SELECT num_created, num_updated, num_errored, errors FROM contacts_contactimportbatch WHERE id = $1`, batchID) + require.NoError(t, err) + + // load all contacts and convert to specs + contacts := loadAllContacts(t, db, oa) + specs := make([]*models.ContactSpec, len(contacts)) + for i, contact := range contacts { + name := contact.Name() + lang := string(contact.Language()) + groupUUIDs := make([]assets.GroupUUID, len(contact.Groups().All())) + for j, group := range contact.Groups().All() { + groupUUIDs[j] = group.UUID() + } + sort.Slice(groupUUIDs, func(i, j int) bool { return strings.Compare(string(groupUUIDs[i]), string(groupUUIDs[j])) < 0 }) + + fields := make(map[string]string) + for key, fv := range contact.Fields() { + val := types.Render(fv.ToXValue(oa.Env())) + if val != "" { + fields[key] = val + } + } + specs[i] = &models.ContactSpec{ + UUID: contact.UUID(), + Name: &name, + Language: &lang, + URNs: contact.URNs().RawURNs(), + Fields: fields, + Groups: groupUUIDs, + } + } + + actual := tc + actual.NumCreated = results.NumCreated + actual.NumUpdated = results.NumUpdated + actual.NumErrored = results.NumErrored + actual.Errors = results.Errors + actual.Contacts = specs + + if !test.UpdateSnapshots { + assert.Equal(t, tc.NumCreated, actual.NumCreated, "created contacts mismatch in '%s'", tc.Description) + assert.Equal(t, tc.NumUpdated, actual.NumUpdated, "updated contacts mismatch in '%s'", tc.Description) + assert.Equal(t, tc.NumErrored, actual.NumErrored, "errored contacts mismatch in '%s'", tc.Description) + + test.AssertEqualJSON(t, tc.Errors, actual.Errors, "errors mismatch in '%s'", tc.Description) + + actualJSON, _ := jsonx.Marshal(actual.Contacts) + expectedJSON, _ := jsonx.Marshal(tc.Contacts) + test.AssertEqualJSON(t, expectedJSON, actualJSON, "imported contacts mismatch in '%s'", tc.Description) + } else { + tcs[i] = actual + } + } + + if test.UpdateSnapshots { + testJSON, err = jsonx.MarshalPretty(tcs) + require.NoError(t, err) + + err = ioutil.WriteFile("testdata/imports.json", testJSON, 0600) + require.NoError(t, err) + } +} + +func TestContactImportBatch(t *testing.T) { + ctx := testsuite.CTX() + db := testsuite.DB() + + importID := testdata.InsertContactImport(t, db, models.Org1) + batchID := testdata.InsertContactImportBatch(t, db, importID, []byte(`[ + {"name": "Norbert", "language": "eng", "urns": ["tel:+16055740001"]}, + {"name": "Leah", "urns": ["tel:+16055740002"]} + ]`)) + + batch, err := models.LoadContactImportBatch(ctx, db, batchID) + require.NoError(t, err) + + assert.Equal(t, importID, batch.ImportID) + assert.Equal(t, models.ContactImportStatus("P"), batch.Status) + assert.NotNil(t, batch.Specs) + assert.Equal(t, 0, batch.RecordStart) + assert.Equal(t, 2, batch.RecordEnd) + + err = batch.Import(ctx, db, models.Org1) + require.NoError(t, err) + + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contactimportbatch WHERE status = 'C' AND finished_on IS NOT NULL`, []interface{}{}, 1) +} + +func TestContactSpecUnmarshal(t *testing.T) { + s := &models.ContactSpec{} + jsonx.Unmarshal([]byte(`{}`), s) + + assert.Equal(t, flows.ContactUUID(""), s.UUID) + assert.Nil(t, s.Name) + assert.Nil(t, s.Language) + assert.Nil(t, s.URNs) + assert.Nil(t, s.Fields) + assert.Nil(t, s.Groups) + + s = &models.ContactSpec{} + jsonx.Unmarshal([]byte(`{ + "uuid": "8e879527-7e6d-4bff-abc8-b1d41cd4f702", + "name": "Bob", + "language": "spa", + "urns": ["tel:+1234567890"], + "fields": {"age": "39"}, + "groups": ["3972dcc2-6749-4761-a896-7880d6165f2c"] + }`), s) + + assert.Equal(t, flows.ContactUUID("8e879527-7e6d-4bff-abc8-b1d41cd4f702"), s.UUID) + assert.Equal(t, "Bob", *s.Name) + assert.Equal(t, "spa", *s.Language) + assert.Equal(t, []urns.URN{"tel:+1234567890"}, s.URNs) + assert.Equal(t, map[string]string{"age": "39"}, s.Fields) + assert.Equal(t, []assets.GroupUUID{"3972dcc2-6749-4761-a896-7880d6165f2c"}, s.Groups) +} + +// utility to load all contacts for the given org and return as slice sorted by ID +func loadAllContacts(t *testing.T, db *sqlx.DB, oa *models.OrgAssets) []*flows.Contact { + rows, err := db.Queryx(`SELECT id FROM contacts_contact WHERE org_id = $1`, oa.OrgID()) + require.NoError(t, err) + defer rows.Close() + + var allIDs []models.ContactID + var id models.ContactID + for rows.Next() { + rows.Scan(&id) + allIDs = append(allIDs, id) + } + + contacts, err := models.LoadContacts(context.Background(), db, oa, allIDs) + require.NoError(t, err) + + sort.Slice(contacts, func(i, j int) bool { return contacts[i].ID() < contacts[j].ID() }) + + flowContacts := make([]*flows.Contact, len(contacts)) + for i := range contacts { + flowContacts[i], err = contacts[i].FlowContact(oa) + require.NoError(t, err) + } + + return flowContacts +} diff --git a/models/msgs.go b/models/msgs.go index d4b3d89d1..2a52d7278 100644 --- a/models/msgs.go +++ b/models/msgs.go @@ -683,7 +683,7 @@ func NewBroadcastFromEvent(ctx context.Context, tx Queryer, org *OrgAssets, even } // resolve our contact references - contactIDs, err := ContactIDsFromReferences(ctx, tx, org, event.Contacts) + contactIDs, err := GetContactIDsFromReferences(ctx, tx, org.OrgID(), event.Contacts) if err != nil { return nil, errors.Wrapf(err, "error resolving contact references") } diff --git a/models/testdata/imports.json b/models/testdata/imports.json new file mode 100644 index 000000000..47fda5389 --- /dev/null +++ b/models/testdata/imports.json @@ -0,0 +1,457 @@ +[ + { + "description": "2 new contacts with name and phone", + "specs": [ + { + "name": "Ann", + "urns": [ + "tel:+1 605 570 0001" + ] + }, + { + "name": "Bob", + "urns": [ + "tel:+16055700002" + ] + } + ], + "num_created": 0, + "num_updated": 2, + "num_errored": 0, + "errors": [], + "contacts": [ + { + "uuid": "1ae96956-4b34-433e-8d1a-f05fe6923d6d", + "name": "Ann", + "language": "", + "urns": [ + "tel:+16055700001?id=10000&priority=1000" + ], + "fields": {}, + "groups": [] + }, + { + "uuid": "e7187099-7d38-4f60-955c-325957214c42", + "name": "Bob", + "language": "", + "urns": [ + "tel:+16055700002?id=10001&priority=1000" + ], + "fields": {}, + "groups": [] + } + ] + }, + { + "description": "1 new contact with all fields", + "specs": [ + { + "name": "Cat", + "language": "spa", + "urns": [ + "tel:+16055700003", + "tel:+593979000001" + ], + "fields": { + "age": "40" + }, + "groups": [ + "c153e265-f7c9-4539-9dbc-9b358714b638", + "5e9d8fab-5e7e-4f51-b533-261af5dea70d" + ] + } + ], + "num_created": 0, + "num_updated": 1, + "num_errored": 0, + "errors": [], + "contacts": [ + { + "uuid": "1ae96956-4b34-433e-8d1a-f05fe6923d6d", + "name": "Ann", + "language": "", + "urns": [ + "tel:+16055700001?id=10000&priority=1000" + ], + "fields": {}, + "groups": [] + }, + { + "uuid": "e7187099-7d38-4f60-955c-325957214c42", + "name": "Bob", + "language": "", + "urns": [ + "tel:+16055700002?id=10001&priority=1000" + ], + "fields": {}, + "groups": [] + }, + { + "uuid": "59d74b86-3e2f-4a93-aece-b05d2fdcde0c", + "name": "Cat", + "language": "spa", + "urns": [ + "tel:+16055700003?id=10002&priority=1000", + "tel:+593979000001?id=10003&priority=999" + ], + "fields": { + "age": "40" + }, + "groups": [ + "5e9d8fab-5e7e-4f51-b533-261af5dea70d", + "c153e265-f7c9-4539-9dbc-9b358714b638", + "fc32f928-ad37-477c-a88e-003d30fd7406" + ] + } + ] + }, + { + "description": "updating contacts with single URN", + "specs": [ + { + "name": "Bob", + "urns": [ + "tel:+16055700002", + "tel:+593 979 000002" + ], + "fields": { + "age": "28" + } + }, + { + "name": "Cat", + "urns": [ + "tel:+16055700003" + ], + "fields": { + "age": "39" + } + } + ], + "num_created": 0, + "num_updated": 2, + "num_errored": 0, + "errors": [], + "contacts": [ + { + "uuid": "1ae96956-4b34-433e-8d1a-f05fe6923d6d", + "name": "Ann", + "language": "", + "urns": [ + "tel:+16055700001?id=10000&priority=1000" + ], + "fields": {}, + "groups": [] + }, + { + "uuid": "e7187099-7d38-4f60-955c-325957214c42", + "name": "Bob", + "language": "", + "urns": [ + "tel:+16055700002?id=10001&priority=1000", + "tel:+593979000002?id=10004&priority=999" + ], + "fields": { + "age": "28" + }, + "groups": [] + }, + { + "uuid": "59d74b86-3e2f-4a93-aece-b05d2fdcde0c", + "name": "Cat", + "language": "spa", + "urns": [ + "tel:+16055700003?id=10002&priority=1000", + "tel:+593979000001?id=10003&priority=999" + ], + "fields": { + "age": "39" + }, + "groups": [ + "5e9d8fab-5e7e-4f51-b533-261af5dea70d", + "c153e265-f7c9-4539-9dbc-9b358714b638" + ] + } + ] + }, + { + "description": "updating contacts with multiple URNs (first contact has URNs from 2 different contacts so errors)", + "specs": [ + { + "name": "Ann", + "urns": [ + "tel:+16055700001", + "tel:+593979000001" + ], + "fields": { + "joined": "2020-01-01T10:45:30Z" + } + }, + { + "name": "Bob", + "urns": [ + "tel:+16055700002", + "tel:+593979000002" + ], + "fields": { + "joined": "2020-01-01T10:45:30Z" + } + }, + { + "name": "Cat", + "urns": [ + "tel:+593979000001", + "tel:+16055700003" + ], + "fields": { + "joined": "2020-02-01T17:15:30Z" + } + } + ], + "num_created": 0, + "num_updated": 2, + "num_errored": 1, + "errors": [ + { + "record": 0, + "message": "Unable to find or create contact with URNs tel:+16055700001, tel:+593979000001" + } + ], + "contacts": [ + { + "uuid": "1ae96956-4b34-433e-8d1a-f05fe6923d6d", + "name": "Ann", + "language": "", + "urns": [ + "tel:+16055700001?id=10000&priority=1000" + ], + "fields": {}, + "groups": [] + }, + { + "uuid": "e7187099-7d38-4f60-955c-325957214c42", + "name": "Bob", + "language": "", + "urns": [ + "tel:+16055700002?id=10001&priority=1000", + "tel:+593979000002?id=10004&priority=999" + ], + "fields": { + "age": "28", + "joined": "2020-01-01T10:45:30.000000Z" + }, + "groups": [] + }, + { + "uuid": "59d74b86-3e2f-4a93-aece-b05d2fdcde0c", + "name": "Cat", + "language": "spa", + "urns": [ + "tel:+16055700003?id=10002&priority=1000", + "tel:+593979000001?id=10003&priority=999" + ], + "fields": { + "age": "39", + "joined": "2020-02-01T17:15:30.000000Z" + }, + "groups": [ + "5e9d8fab-5e7e-4f51-b533-261af5dea70d", + "c153e265-f7c9-4539-9dbc-9b358714b638" + ] + } + ] + }, + { + "description": "updating contacts by UUID (last UUID doesn't exist)", + "specs": [ + { + "uuid": "1ae96956-4b34-433e-8d1a-f05fe6923d6d", + "name": "Anne" + }, + { + "uuid": "e7187099-7d38-4f60-955c-325957214c42", + "language": "kin", + "urns": [ + "tel:+16055700002" + ] + }, + { + "uuid": "68dc10e7-19ce-4052-b202-7c1b49e69ba0", + "name": "Jim" + } + ], + "num_created": 0, + "num_updated": 2, + "num_errored": 1, + "errors": [ + { + "record": 2, + "message": "Unable to find contact with UUID '68dc10e7-19ce-4052-b202-7c1b49e69ba0'" + } + ], + "contacts": [ + { + "uuid": "1ae96956-4b34-433e-8d1a-f05fe6923d6d", + "name": "Anne", + "language": "", + "urns": [ + "tel:+16055700001?id=10000&priority=1000" + ], + "fields": {}, + "groups": [] + }, + { + "uuid": "e7187099-7d38-4f60-955c-325957214c42", + "name": "Bob", + "language": "kin", + "urns": [ + "tel:+16055700002?id=10001&priority=1000", + "tel:+593979000002?id=10004&priority=999" + ], + "fields": { + "age": "28", + "joined": "2020-01-01T10:45:30.000000Z" + }, + "groups": [] + }, + { + "uuid": "59d74b86-3e2f-4a93-aece-b05d2fdcde0c", + "name": "Cat", + "language": "spa", + "urns": [ + "tel:+16055700003?id=10002&priority=1000", + "tel:+593979000001?id=10003&priority=999" + ], + "fields": { + "age": "39", + "joined": "2020-02-01T17:15:30.000000Z" + }, + "groups": [ + "5e9d8fab-5e7e-4f51-b533-261af5dea70d", + "c153e265-f7c9-4539-9dbc-9b358714b638" + ] + } + ] + }, + { + "description": "stealing URNs permitted with UUIDs", + "specs": [ + { + "uuid": "1ae96956-4b34-433e-8d1a-f05fe6923d6d", + "name": "Anne", + "urns": [ + "tel:+16055700002" + ] + } + ], + "num_created": 0, + "num_updated": 1, + "num_errored": 0, + "errors": [], + "contacts": [ + { + "uuid": "1ae96956-4b34-433e-8d1a-f05fe6923d6d", + "name": "Anne", + "language": "", + "urns": [ + "tel:+16055700001?id=10000&priority=1000", + "tel:+16055700002?id=10001&priority=999" + ], + "fields": {}, + "groups": [] + }, + { + "uuid": "e7187099-7d38-4f60-955c-325957214c42", + "name": "Bob", + "language": "kin", + "urns": [ + "tel:+593979000002?id=10004&priority=999" + ], + "fields": { + "age": "28", + "joined": "2020-01-01T10:45:30.000000Z" + }, + "groups": [] + }, + { + "uuid": "59d74b86-3e2f-4a93-aece-b05d2fdcde0c", + "name": "Cat", + "language": "spa", + "urns": [ + "tel:+16055700003?id=10002&priority=1000", + "tel:+593979000001?id=10003&priority=999" + ], + "fields": { + "age": "39", + "joined": "2020-02-01T17:15:30.000000Z" + }, + "groups": [ + "5e9d8fab-5e7e-4f51-b533-261af5dea70d", + "c153e265-f7c9-4539-9dbc-9b358714b638" + ] + } + ] + }, + { + "description": "can't reference contact in other org by UUID", + "specs": [ + { + "uuid": "f7a8016d-69a6-434b-aae7-5142ce4a98ba", + "name": "Xavier", + "language": "eng" + } + ], + "num_created": 0, + "num_updated": 0, + "num_errored": 1, + "errors": [ + { + "record": 0, + "message": "Unable to find contact with UUID 'f7a8016d-69a6-434b-aae7-5142ce4a98ba'" + } + ], + "contacts": [ + { + "uuid": "1ae96956-4b34-433e-8d1a-f05fe6923d6d", + "name": "Anne", + "language": "", + "urns": [ + "tel:+16055700001?id=10000&priority=1000", + "tel:+16055700002?id=10001&priority=999" + ], + "fields": {}, + "groups": [] + }, + { + "uuid": "e7187099-7d38-4f60-955c-325957214c42", + "name": "Bob", + "language": "kin", + "urns": [ + "tel:+593979000002?id=10004&priority=999" + ], + "fields": { + "age": "28", + "joined": "2020-01-01T10:45:30.000000Z" + }, + "groups": [] + }, + { + "uuid": "59d74b86-3e2f-4a93-aece-b05d2fdcde0c", + "name": "Cat", + "language": "spa", + "urns": [ + "tel:+16055700003?id=10002&priority=1000", + "tel:+593979000001?id=10003&priority=999" + ], + "fields": { + "age": "39", + "joined": "2020-02-01T17:15:30.000000Z" + }, + "groups": [ + "5e9d8fab-5e7e-4f51-b533-261af5dea70d", + "c153e265-f7c9-4539-9dbc-9b358714b638" + ] + } + ] + } +] \ No newline at end of file diff --git a/models/utils.go b/models/utils.go index 4ed3bb324..e3270313f 100644 --- a/models/utils.go +++ b/models/utils.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/jmoiron/sqlx" "github.com/nyaruka/mailroom/utils/dbutil" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -19,6 +20,12 @@ type Queryer interface { GetContext(ctx context.Context, value interface{}, query string, args ...interface{}) error } +type QueryerWithTx interface { + Queryer + + BeginTxx(ctx context.Context, opts *sql.TxOptions) (*sqlx.Tx, error) +} + // Exec calls ExecContext on the passed in Queryer, logging time taken if any rows were affected func Exec(ctx context.Context, label string, tx Queryer, sql string, args ...interface{}) error { start := time.Now() diff --git a/tasks/broadcasts/worker.go b/tasks/broadcasts/worker.go index 19fc73e30..15fa93688 100644 --- a/tasks/broadcasts/worker.go +++ b/tasks/broadcasts/worker.go @@ -62,7 +62,7 @@ func CreateBroadcastBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, bc } // get the contact ids for our URNs - urnMap, err := models.ContactIDsFromURNs(ctx, db, oa, bcast.URNs()) + urnMap, err := models.GetOrCreateContactIDsFromURNs(ctx, db, oa, bcast.URNs()) if err != nil { return errors.Wrapf(err, "error getting contact ids for urns") } diff --git a/tasks/contacts/import_contact_batch.go b/tasks/contacts/import_contact_batch.go new file mode 100644 index 000000000..38e037728 --- /dev/null +++ b/tasks/contacts/import_contact_batch.go @@ -0,0 +1,42 @@ +package contacts + +import ( + "context" + "time" + + "github.com/nyaruka/mailroom" + "github.com/nyaruka/mailroom/models" + "github.com/nyaruka/mailroom/tasks" + "github.com/pkg/errors" +) + +// TypeImportContactBatch is the type of the import contact batch task +const TypeImportContactBatch = "import_contact_batch" + +func init() { + tasks.RegisterType(TypeImportContactBatch, func() tasks.Task { return &ImportContactBatchTask{} }) +} + +// ImportContactBatchTask is our task to import a batch of contacts +type ImportContactBatchTask struct { + ContactImportBatchID models.ContactImportBatchID `json:"contact_import_batch_id"` +} + +// Timeout is the maximum amount of time the task can run for +func (t *ImportContactBatchTask) Timeout() time.Duration { + return time.Minute * 10 +} + +// Perform figures out the membership for a query based group then repopulates it +func (t *ImportContactBatchTask) Perform(ctx context.Context, mr *mailroom.Mailroom, orgID models.OrgID) error { + batch, err := models.LoadContactImportBatch(ctx, mr.DB, t.ContactImportBatchID) + if err != nil { + return errors.Wrapf(err, "unable to load contact import batch with id %d", t.ContactImportBatchID) + } + + if err := batch.Import(ctx, mr.DB, orgID); err != nil { + return errors.Wrapf(err, "unable to import contact import batch %d", t.ContactImportBatchID) + } + + return nil +} diff --git a/tasks/contacts/import_contact_batch_test.go b/tasks/contacts/import_contact_batch_test.go new file mode 100644 index 000000000..82b26548b --- /dev/null +++ b/tasks/contacts/import_contact_batch_test.go @@ -0,0 +1,36 @@ +package contacts_test + +import ( + "testing" + + "github.com/nyaruka/mailroom" + "github.com/nyaruka/mailroom/config" + _ "github.com/nyaruka/mailroom/hooks" + "github.com/nyaruka/mailroom/models" + "github.com/nyaruka/mailroom/tasks/contacts" + "github.com/nyaruka/mailroom/testsuite" + "github.com/nyaruka/mailroom/testsuite/testdata" + + "github.com/stretchr/testify/require" +) + +func TestImportContactBatch(t *testing.T) { + ctx := testsuite.CTX() + db := testsuite.DB() + + mr := &mailroom.Mailroom{Config: config.Mailroom, DB: db, RP: testsuite.RP(), ElasticClient: nil} + + importID := testdata.InsertContactImport(t, db, models.Org1) + batchID := testdata.InsertContactImportBatch(t, db, importID, []byte(`[ + {"name": "Norbert", "language": "eng", "urns": ["tel:+16055740001"]}, + {"name": "Leah", "urns": ["tel:+16055740002"]} + ]`)) + + task := &contacts.ImportContactBatchTask{ContactImportBatchID: batchID} + + err := task.Perform(ctx, mr, models.Org1) + require.NoError(t, err) + + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Norbert' AND language = 'eng'`, nil, 1) + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Leah' AND language IS NULL`, nil, 1) +} diff --git a/tasks/groups/populate_dynamic_group_test.go b/tasks/groups/populate_dynamic_group_test.go index 93db7930f..92657e7a3 100644 --- a/tasks/groups/populate_dynamic_group_test.go +++ b/tasks/groups/populate_dynamic_group_test.go @@ -58,7 +58,7 @@ func TestPopulateTask(t *testing.T) { } }`, models.CathyID) - groupID := testdata.InsertContactGroup(t, db, models.Org1, "Women", "gender = F") + groupID := testdata.InsertContactGroup(t, db, models.Org1, "e52fee05-2f95-4445-aef6-2fe7dac2fd56", "Women", "gender = F") task := &groups.PopulateDynamicGroupTask{ GroupID: groupID, diff --git a/tasks/starts/worker.go b/tasks/starts/worker.go index caab3bd62..d822e67dd 100644 --- a/tasks/starts/worker.go +++ b/tasks/starts/worker.go @@ -73,7 +73,7 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela // look up any contacts by URN if len(start.URNs()) > 0 { - urnContactIDs, err := models.ContactIDsFromURNs(ctx, db, oa, start.URNs()) + urnContactIDs, err := models.GetOrCreateContactIDsFromURNs(ctx, db, oa, start.URNs()) if err != nil { return errors.Wrapf(err, "error getting contact ids from urns") } diff --git a/testsuite/db.go b/testsuite/db.go index 351c90380..f4d5be28e 100644 --- a/testsuite/db.go +++ b/testsuite/db.go @@ -58,3 +58,10 @@ func (d *MockDB) GetContext(ctx context.Context, value interface{}, query string } return d.real.GetContext(ctx, value, query, args...) } + +func (d *MockDB) BeginTxx(ctx context.Context, opts *sql.TxOptions) (*sqlx.Tx, error) { + if err := d.check("BeginTxx"); err != nil { + return nil, err + } + return d.real.BeginTxx(ctx, opts) +} diff --git a/testsuite/testdata/contacts.go b/testsuite/testdata/contacts.go index 58fb303d0..7e20a29c5 100644 --- a/testsuite/testdata/contacts.go +++ b/testsuite/testdata/contacts.go @@ -4,7 +4,9 @@ import ( "testing" "github.com/nyaruka/gocommon/urns" - "github.com/nyaruka/gocommon/uuids" + "github.com/nyaruka/goflow/assets" + "github.com/nyaruka/goflow/envs" + "github.com/nyaruka/goflow/flows" "github.com/nyaruka/mailroom/models" "github.com/nyaruka/null" @@ -12,12 +14,23 @@ import ( "github.com/stretchr/testify/require" ) +// InsertContact inserts a contact +func InsertContact(t *testing.T, db *sqlx.DB, orgID models.OrgID, uuid flows.ContactUUID, name string, language envs.Language) models.ContactID { + var id models.ContactID + err := db.Get(&id, + `INSERT INTO contacts_contact (org_id, is_active, status, uuid, name, language, created_on, modified_on, created_by_id, modified_by_id) + VALUES($1, TRUE, 'A', $2, $3, $4, NOW(), NOW(), 1, 1) RETURNING id`, orgID, uuid, name, language, + ) + require.NoError(t, err) + return id +} + // InsertContactGroup inserts a contact group -func InsertContactGroup(t *testing.T, db *sqlx.DB, orgID models.OrgID, name, query string) models.GroupID { +func InsertContactGroup(t *testing.T, db *sqlx.DB, orgID models.OrgID, uuid assets.GroupUUID, name, query string) models.GroupID { var id models.GroupID err := db.Get(&id, `INSERT INTO contacts_contactgroup(uuid, org_id, group_type, name, query, status, is_active, created_by_id, created_on, modified_by_id, modified_on) - VALUES($1, $2, 'U', $3, $4, 'R', TRUE, 1, NOW(), 1, NOW()) RETURNING id`, uuids.New(), models.Org1, name, null.String(query), + VALUES($1, $2, 'U', $3, $4, 'R', TRUE, 1, NOW(), 1, NOW()) RETURNING id`, uuid, models.Org1, name, null.String(query), ) require.NoError(t, err) return id @@ -35,3 +48,14 @@ func InsertContactURN(t *testing.T, db *sqlx.DB, orgID models.OrgID, contactID m require.NoError(t, err) return id } + +// DeleteContactsAndURNs deletes all contacts and URNs +func DeleteContactsAndURNs(t *testing.T, db *sqlx.DB) { + db.MustExec(`DELETE FROM contacts_contacturn`) + db.MustExec(`DELETE FROM contacts_contactgroup_contacts`) + db.MustExec(`DELETE FROM contacts_contact`) + + // reset id sequences back to a known number + db.MustExec(`ALTER SEQUENCE contacts_contact_id_seq RESTART WITH 10000`) + db.MustExec(`ALTER SEQUENCE contacts_contacturn_id_seq RESTART WITH 10000`) +} diff --git a/testsuite/testdata/imports.go b/testsuite/testdata/imports.go new file mode 100644 index 000000000..3cb991df5 --- /dev/null +++ b/testsuite/testdata/imports.go @@ -0,0 +1,35 @@ +package testdata + +import ( + "encoding/json" + "testing" + + "github.com/nyaruka/gocommon/dates" + "github.com/nyaruka/gocommon/jsonx" + "github.com/nyaruka/mailroom/models" + + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/require" +) + +// InsertContactImport inserts a contact import +func InsertContactImport(t *testing.T, db *sqlx.DB, orgID models.OrgID) models.ContactImportID { + var importID models.ContactImportID + err := db.Get(&importID, `INSERT INTO contacts_contactimport(org_id, file, original_filename, headers, mappings, num_records, group_id, started_on, created_on, created_by_id, modified_on, modified_by_id, is_active) + VALUES($1, 'contact_imports/1234.xlsx', 'contacts.xlsx', '{"Name", "URN:Tel"}', '{}', 30, NULL, $2, $2, 1, $2, 1, TRUE) RETURNING id`, models.Org1, dates.Now()) + require.NoError(t, err) + return importID +} + +// InsertContactImportBatch inserts a contact import batch +func InsertContactImportBatch(t *testing.T, db *sqlx.DB, importID models.ContactImportID, specs json.RawMessage) models.ContactImportBatchID { + var splitSpecs []json.RawMessage + err := jsonx.Unmarshal(specs, &splitSpecs) + require.NoError(t, err) + + var batchID models.ContactImportBatchID + err = db.Get(&batchID, `INSERT INTO contacts_contactimportbatch(contact_import_id, status, specs, record_start, record_end, num_created, num_updated, num_errored, errors, finished_on) + VALUES($1, 'P', $2, 0, $3, 0, 0, 0, '[]', NULL) RETURNING id`, importID, specs, len(splitSpecs)) + require.NoError(t, err) + return batchID +} diff --git a/web/contact/contact.go b/web/contact/contact.go index f06fefa87..628f255b5 100644 --- a/web/contact/contact.go +++ b/web/contact/contact.go @@ -34,9 +34,9 @@ func init() { // } // type createRequest struct { - OrgID models.OrgID `json:"org_id" validate:"required"` - UserID models.UserID `json:"user_id"` - Contact *Spec `json:"contact" validate:"required"` + OrgID models.OrgID `json:"org_id" validate:"required"` + UserID models.UserID `json:"user_id"` + Contact *models.ContactSpec `json:"contact" validate:"required"` } // handles a request to create the given contacts @@ -52,7 +52,7 @@ func handleCreate(ctx context.Context, s *web.Server, r *http.Request) (interfac return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets") } - c, err := request.Contact.Validate(oa.Env(), oa.SessionAssets()) + c, err := SpecToCreation(request.Contact, oa.Env(), oa.SessionAssets()) if err != nil { return err, http.StatusBadRequest, nil } @@ -63,7 +63,7 @@ func handleCreate(ctx context.Context, s *web.Server, r *http.Request) (interfac } modifiersByContact := map[*flows.Contact][]flows.Modifier{contact: c.Mods} - _, err = ModifyContacts(ctx, s.DB, s.RP, oa, modifiersByContact) + _, err = models.ApplyModifiers(ctx, s.DB, s.RP, oa, modifiersByContact) if err != nil { return nil, http.StatusInternalServerError, errors.Wrap(err, "error modifying new contact") } @@ -157,7 +157,7 @@ func handleModify(ctx context.Context, s *web.Server, r *http.Request) (interfac modifiersByContact[flowContact] = mods } - eventsByContact, err := ModifyContacts(ctx, s.DB, s.RP, oa, modifiersByContact) + eventsByContact, err := models.ApplyModifiers(ctx, s.DB, s.RP, oa, modifiersByContact) if err != nil { return nil, http.StatusBadRequest, err } diff --git a/web/contact/testdata/create.json b/web/contact/testdata/create.json index 3ba819678..b105fa054 100644 --- a/web/contact/testdata/create.json +++ b/web/contact/testdata/create.json @@ -150,8 +150,8 @@ "status": 200, "response": { "contact": { - "uuid": "c34b6c7d-fa06-4563-92a3-d648ab64bccb", - "id": 30003, + "uuid": "8720f157-ca1c-432f-9c0b-2014ddc77094", + "id": 30002, "name": "MarĂ­a", "status": "active", "timezone": "America/Los_Angeles", diff --git a/web/contact/utils.go b/web/contact/utils.go index 14a76f70d..e3a768e69 100644 --- a/web/contact/utils.go +++ b/web/contact/utils.go @@ -1,17 +1,12 @@ package contact import ( - "context" - "github.com/nyaruka/gocommon/urns" - "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/envs" "github.com/nyaruka/goflow/flows" "github.com/nyaruka/goflow/flows/modifiers" "github.com/nyaruka/mailroom/models" - "github.com/gomodule/redigo/redis" - "github.com/jmoiron/sqlx" "github.com/pkg/errors" ) @@ -23,23 +18,18 @@ type Creation struct { Mods []flows.Modifier } -// Spec describes a contact to be created -type Spec struct { - Name string `json:"name"` - Language string `json:"language"` - URNs []urns.URN `json:"urns"` - Fields map[string]string `json:"fields"` - Groups []assets.GroupUUID `json:"groups"` -} - -// Validate validates that the spec is valid for the given assets -func (s *Spec) Validate(env envs.Environment, sa flows.SessionAssets) (*Creation, error) { +// SpecToCreation validates that the spec is valid for the given assets +func SpecToCreation(s *models.ContactSpec, env envs.Environment, sa flows.SessionAssets) (*Creation, error) { country := string(env.DefaultCountry()) var err error - validated := &Creation{Name: s.Name} + validated := &Creation{} - if s.Language != "" { - validated.Language, err = envs.ParseLanguage(s.Language) + if s.Name != nil { + validated.Name = *s.Name + } + + if s.Language != nil && *s.Language != "" { + validated.Language, err = envs.ParseLanguage(*s.Language) if err != nil { return nil, errors.Wrap(err, "invalid language") } @@ -80,27 +70,3 @@ func (s *Spec) Validate(env envs.Environment, sa flows.SessionAssets) (*Creation return validated, nil } - -// ModifyContacts modifies contacts by applying modifiers and handling the resultant events -func ModifyContacts(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *models.OrgAssets, modifiersByContact map[*flows.Contact][]flows.Modifier) (map[*flows.Contact][]flows.Event, error) { - // create an environment instance with location support - env := flows.NewEnvironment(oa.Env(), oa.SessionAssets().Locations()) - - eventsByContact := make(map[*flows.Contact][]flows.Event, len(modifiersByContact)) - - // apply the modifiers to get the events for each contact - for contact, mods := range modifiersByContact { - events := make([]flows.Event, 0) - for _, mod := range mods { - mod.Apply(env, oa.SessionAssets(), contact, func(e flows.Event) { events = append(events, e) }) - } - eventsByContact[contact] = events - } - - err := models.HandleAndCommitEvents(ctx, db, rp, oa, eventsByContact) - if err != nil { - return nil, errors.Wrap(err, "error commiting events") - } - - return eventsByContact, nil -} diff --git a/web/contact/utils_test.go b/web/contact/utils_test.go index 3cb478d70..949ae9121 100644 --- a/web/contact/utils_test.go +++ b/web/contact/utils_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestValidateSpec(t *testing.T) { +func TestSpecToCreation(t *testing.T) { testsuite.Reset() db := testsuite.DB() ctx := testsuite.CTX() @@ -25,8 +25,8 @@ func TestValidateSpec(t *testing.T) { env := envs.NewBuilder().Build() // empty spec is valid - s := &contact.Spec{} - c, err := s.Validate(env, sa) + s := &models.ContactSpec{} + c, err := contact.SpecToCreation(s, env, sa) assert.NoError(t, err) assert.Equal(t, "", c.Name) assert.Equal(t, envs.NilLanguage, c.Language) @@ -34,22 +34,23 @@ func TestValidateSpec(t *testing.T) { assert.Equal(t, 0, len(c.Mods)) // try to set invalid language - s = &contact.Spec{Language: "xyzd"} - _, err = s.Validate(env, sa) + lang := "xyzd" + s = &models.ContactSpec{Language: &lang} + _, err = contact.SpecToCreation(s, env, sa) assert.EqualError(t, err, "invalid language: iso-639-3 codes must be 3 characters, got: xyzd") // try to set non-existent contact field - s = &contact.Spec{Fields: map[string]string{"goats": "7"}} - _, err = s.Validate(env, sa) + s = &models.ContactSpec{Fields: map[string]string{"goats": "7"}} + _, err = contact.SpecToCreation(s, env, sa) assert.EqualError(t, err, "unknown contact field 'goats'") // try to add to non-existent group - s = &contact.Spec{Groups: []assets.GroupUUID{"52f6c50e-f9a8-4f24-bb80-5c9f144ed27f"}} - _, err = s.Validate(env, sa) + s = &models.ContactSpec{Groups: []assets.GroupUUID{"52f6c50e-f9a8-4f24-bb80-5c9f144ed27f"}} + _, err = contact.SpecToCreation(s, env, sa) assert.EqualError(t, err, "unknown contact group '52f6c50e-f9a8-4f24-bb80-5c9f144ed27f'") // try to add to dynamic group - s = &contact.Spec{Groups: []assets.GroupUUID{"52f6c50e-f9a8-4f24-bb80-5c9f144ed27f"}} - _, err = s.Validate(env, sa) + s = &models.ContactSpec{Groups: []assets.GroupUUID{"52f6c50e-f9a8-4f24-bb80-5c9f144ed27f"}} + _, err = contact.SpecToCreation(s, env, sa) assert.EqualError(t, err, "unknown contact group '52f6c50e-f9a8-4f24-bb80-5c9f144ed27f'") } diff --git a/web/ivr/ivr.go b/web/ivr/ivr.go index 900a248c5..b5f9d5819 100644 --- a/web/ivr/ivr.go +++ b/web/ivr/ivr.go @@ -102,7 +102,7 @@ func handleIncomingCall(ctx context.Context, s *web.Server, r *http.Request, w h } // get the contact for this URN - contact, _, err := models.GetOrCreateContact(ctx, s.DB, oa, urn) + contact, _, err := models.GetOrCreateContact(ctx, s.DB, oa, []urns.URN{urn}) if err != nil { return channel, nil, client.WriteErrorResponse(w, errors.Wrapf(err, "unable to get contact by urn")) } diff --git a/web/surveyor/surveyor.go b/web/surveyor/surveyor.go index 0ed1a323e..c9f8f3d5b 100644 --- a/web/surveyor/surveyor.go +++ b/web/surveyor/surveyor.go @@ -5,6 +5,7 @@ import ( "encoding/json" "net/http" + "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/envs" "github.com/nyaruka/goflow/flows" @@ -95,7 +96,7 @@ func handleSubmit(ctx context.Context, s *web.Server, r *http.Request) (interfac // create / fetch our contact based on the highest priority URN urn := fs.Contact().URNs()[0].URN() - _, flowContact, err = models.GetOrCreateContact(ctx, s.DB, oa, urn) + _, flowContact, err = models.GetOrCreateContact(ctx, s.DB, oa, []urns.URN{urn}) if err != nil { return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to look up contact") }