Skip to content

Commit

Permalink
Merge pull request rapidpro#366 from nyaruka/bulk_contact_get_or_crea…
Browse files Browse the repository at this point in the history
…te_fix

Fix race condition in GetOrCreateContactIDsFromURNs
  • Loading branch information
rowanseymour authored Oct 5, 2020
2 parents 896ba60 + a59c8db commit 81cfacf
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 115 deletions.
116 changes: 58 additions & 58 deletions models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,64 +504,6 @@ WHERE
) r;
`

// GetOrCreateContactIDsFromURNs will fetch or create the contacts for the passed in URNs, returning a map the same length as
// the passed in URNs with the ids of the contacts.
func GetOrCreateContactIDsFromURNs(ctx context.Context, db QueryerWithTx, oa *OrgAssets, urnz []urns.URN) (map[urns.URN]ContactID, error) {
// ensure all URNs are normalized
for i, urn := range urnz {
urnz[i] = urn.Normalize(string(oa.Env().DefaultCountry()))
}

// find current owners of these URNs
owners, err := contactIDsFromURNs(ctx, db, oa.OrgID(), urnz)
if err != nil {
return nil, errors.Wrapf(err, "error looking up contacts for URNs")
}

// create any contacts that are missing
for urn, contactID := range owners {
if contactID == NilContactID {
contact, _, err := CreateContact(ctx, db, oa, NilUserID, "", envs.NilLanguage, []urns.URN{urn})
if err != nil {
return nil, errors.Wrapf(err, "error creating contact")
}
owners[urn] = contact.ID()
}
}
return owners, nil
}

// looks up the contacts who own the given urns (which should be normalized by the caller) and returns that information as a map
func contactIDsFromURNs(ctx context.Context, db Queryer, orgID OrgID, urnz []urns.URN) (map[urns.URN]ContactID, error) {
identityToOriginal := make(map[urns.URN]urns.URN, len(urnz))
identities := make([]urns.URN, len(urnz))
owners := make(map[urns.URN]ContactID, len(urnz))

for i, urn := range urnz {
identity := urn.Identity()
identityToOriginal[identity] = urn
identities[i] = identity
owners[urn] = NilContactID
}

rows, err := db.QueryxContext(ctx, `SELECT contact_id, identity FROM contacts_contacturn WHERE org_id = $1 AND identity = ANY($2)`, orgID, pq.Array(identities))
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrapf(err, "error querying contact URNs")
}
defer rows.Close()

for rows.Next() {
var urn urns.URN
var id ContactID
if err := rows.Scan(&id, &urn); err != nil {
return nil, errors.Wrapf(err, "error scanning URN result")
}
owners[identityToOriginal[urn]] = id
}

return owners, nil
}

// CreateContact creates a new contact for the passed in org with the passed in URNs
func CreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, userID UserID, name string, language envs.Language, urnz []urns.URN) (*Contact, *flows.Contact, error) {
// ensure all URNs are normalized
Expand Down Expand Up @@ -649,6 +591,64 @@ func GetOrCreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, ur
return contact, flowContact, nil
}

// GetOrCreateContactIDsFromURNs will fetch or create the contacts for the passed in URNs, returning a map the same length as
// the passed in URNs with the ids of the contacts.
func GetOrCreateContactIDsFromURNs(ctx context.Context, db QueryerWithTx, oa *OrgAssets, urnz []urns.URN) (map[urns.URN]ContactID, error) {
// ensure all URNs are normalized
for i, urn := range urnz {
urnz[i] = urn.Normalize(string(oa.Env().DefaultCountry()))
}

// find current owners of these URNs
owners, err := contactIDsFromURNs(ctx, db, oa.OrgID(), urnz)
if err != nil {
return nil, errors.Wrapf(err, "error looking up contacts for URNs")
}

// create any contacts that are missing
for urn, contactID := range owners {
if contactID == NilContactID {
contact, _, err := GetOrCreateContact(ctx, db, oa, []urns.URN{urn}, NilChannelID)
if err != nil {
return nil, errors.Wrapf(err, "error creating contact")
}
owners[urn] = contact.ID()
}
}
return owners, nil
}

// looks up the contacts who own the given urns (which should be normalized by the caller) and returns that information as a map
func contactIDsFromURNs(ctx context.Context, db Queryer, orgID OrgID, urnz []urns.URN) (map[urns.URN]ContactID, error) {
identityToOriginal := make(map[urns.URN]urns.URN, len(urnz))
identities := make([]urns.URN, len(urnz))
owners := make(map[urns.URN]ContactID, len(urnz))

for i, urn := range urnz {
identity := urn.Identity()
identityToOriginal[identity] = urn
identities[i] = identity
owners[urn] = NilContactID
}

rows, err := db.QueryxContext(ctx, `SELECT contact_id, identity FROM contacts_contacturn WHERE org_id = $1 AND identity = ANY($2)`, orgID, pq.Array(identities))
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrapf(err, "error querying contact URNs")
}
defer rows.Close()

for rows.Next() {
var urn urns.URN
var id ContactID
if err := rows.Scan(&id, &urn); err != nil {
return nil, errors.Wrapf(err, "error scanning URN result")
}
owners[identityToOriginal[urn]] = id
}

return owners, nil
}

func getOrCreateContact(ctx context.Context, db QueryerWithTx, orgID OrgID, urnz []urns.URN, channelID ChannelID) (ContactID, bool, error) {
// find current owners of these URNs
owners, err := contactIDsFromURNs(ctx, db, orgID, urnz)
Expand Down
Loading

0 comments on commit 81cfacf

Please sign in to comment.