Skip to content

Commit

Permalink
Merge pull request #364 from nyaruka/resolve_endpoint
Browse files Browse the repository at this point in the history
🔬 Add contact/resolve endpoint to assist with channel events still handled in RP
  • Loading branch information
rowanseymour authored Oct 2, 2020
2 parents 0fa60d4 + 653d8f2 commit cbccc73
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 35 deletions.
21 changes: 10 additions & 11 deletions models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func CreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, userID
return nil, nil, errors.New("URNs in use by other contacts")
}

contactID, err := tryInsertContactAndURNs(ctx, db, oa.OrgID(), userID, name, language, urnz)
contactID, err := tryInsertContactAndURNs(ctx, db, oa.OrgID(), userID, name, language, urnz, NilChannelID)
if err != nil {
// always possible that another thread created a contact with these URNs after we checked above
if dbutil.IsUniqueViolation(err) {
Expand Down Expand Up @@ -615,13 +615,13 @@ func CreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, userID
// * If URNs exists and belongs to a single contact it returns that contact (other URNs are not assigned to the contact).
// * If URNs exists and belongs to multiple contacts it will return an error.
//
func GetOrCreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, urnz []urns.URN) (*Contact, *flows.Contact, error) {
func GetOrCreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, urnz []urns.URN, channelID ChannelID) (*Contact, *flows.Contact, error) {
// ensure all URNs are normalized
for i, urn := range urnz {
urnz[i] = urn.Normalize(string(oa.Env().DefaultCountry()))
}

contactID, created, err := getOrCreateContact(ctx, db, oa.OrgID(), urnz)
contactID, created, err := getOrCreateContact(ctx, db, oa.OrgID(), urnz, channelID)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -649,7 +649,7 @@ func GetOrCreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, ur
return contact, flowContact, nil
}

func getOrCreateContact(ctx context.Context, db QueryerWithTx, orgID OrgID, urnz []urns.URN) (ContactID, bool, error) {
func getOrCreateContact(ctx context.Context, db QueryerWithTx, orgID OrgID, urnz []urns.URN, channelID ChannelID) (ContactID, bool, error) {
// find current owners of these URNs
owners, err := contactIDsFromURNs(ctx, db, orgID, urnz)
if err != nil {
Expand All @@ -663,7 +663,7 @@ func getOrCreateContact(ctx context.Context, db QueryerWithTx, orgID OrgID, urnz
return uniqueOwners[0], false, nil
}

contactID, err := tryInsertContactAndURNs(ctx, db, orgID, UserID(1), "", envs.NilLanguage, urnz)
contactID, err := tryInsertContactAndURNs(ctx, db, orgID, UserID(1), "", envs.NilLanguage, urnz, channelID)
if err == nil {
return contactID, true, nil
}
Expand Down Expand Up @@ -704,13 +704,13 @@ func uniqueContactIDs(urnMap map[urns.URN]ContactID) []ContactID {

// Tries to create a new contact for the passed in org with the passed in URNs. Returned error can be tested with `dbutil.IsUniqueViolation` to
// determine if problem was one or more of the URNs already exist and are assigned to other contacts.
func tryInsertContactAndURNs(ctx context.Context, db QueryerWithTx, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) {
func tryInsertContactAndURNs(ctx context.Context, db QueryerWithTx, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN, channelID ChannelID) (ContactID, error) {
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return NilContactID, errors.Wrapf(err, "error beginning transaction")
}

contactID, err := insertContactAndURNs(ctx, tx, orgID, userID, name, language, urnz)
contactID, err := insertContactAndURNs(ctx, tx, orgID, userID, name, language, urnz, channelID)
if err != nil {
tx.Rollback()
return NilContactID, err
Expand All @@ -725,7 +725,7 @@ func tryInsertContactAndURNs(ctx context.Context, db QueryerWithTx, orgID OrgID,
return contactID, nil
}

func insertContactAndURNs(ctx context.Context, db Queryer, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN) (ContactID, error) {
func insertContactAndURNs(ctx context.Context, db Queryer, orgID OrgID, userID UserID, name string, language envs.Language, urnz []urns.URN, channelID ChannelID) (ContactID, error) {
if userID == NilUserID {
userID = UserID(1)
}
Expand Down Expand Up @@ -757,11 +757,10 @@ func insertContactAndURNs(ctx context.Context, db Queryer, orgID OrgID, userID U
return NilContactID, errors.Wrapf(err, "error attaching existing URN to new contact")
}
} else {
_, err := db.ExecContext(
ctx,
_, err := db.ExecContext(ctx,
`INSERT INTO contacts_contacturn(org_id, identity, path, scheme, display, auth, priority, channel_id, contact_id)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
orgID, urn.Identity(), urn.Path(), urn.Scheme(), urn.Display(), GetURNAuth(urn), priority, nil, contactID,
orgID, urn.Identity(), urn.Path(), urn.Scheme(), urn.Display(), GetURNAuth(urn), priority, channelID, contactID,
)
if err != nil {
return NilContactID, err
Expand Down
24 changes: 17 additions & 7 deletions models/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,65 +194,75 @@ func TestGetOrCreateContact(t *testing.T) {
URNs []urns.URN
ContactID models.ContactID
ContactURNs []urns.URN
ChannelID models.ChannelID
}{
{
models.Org1,
[]urns.URN{models.CathyURN},
models.CathyID,
[]urns.URN{"tel:+16055741111?id=10000&priority=1000"},
models.NilChannelID,
},
{
models.Org1,
[]urns.URN{urns.URN(models.CathyURN.String() + "?foo=bar")},
models.CathyID, // only URN identity is considered
[]urns.URN{"tel:+16055741111?id=10000&priority=1000"},
models.NilChannelID,
},
{
models.Org1,
[]urns.URN{urns.URN("telegram:100001")},
newContact(), // creates new contact
[]urns.URN{"telegram:100001?id=20123&priority=1000"},
[]urns.URN{"telegram:100001?channel=74729f45-7f29-4868-9dc4-90e491e3c7d8&id=20123&priority=1000"},
models.TwilioChannelID,
},
{
models.Org1,
[]urns.URN{urns.URN("telegram:100001")},
prevContact(), // returns the same created contact
[]urns.URN{"telegram:100001?id=20123&priority=1000"},
[]urns.URN{"telegram:100001?channel=74729f45-7f29-4868-9dc4-90e491e3c7d8&id=20123&priority=1000"},
models.NilChannelID,
},
{
models.Org1,
[]urns.URN{urns.URN("telegram:100001"), urns.URN("telegram:100002")},
prevContact(), // same again as other URNs don't exist
[]urns.URN{"telegram:100001?id=20123&priority=1000"},
[]urns.URN{"telegram:100001?channel=74729f45-7f29-4868-9dc4-90e491e3c7d8&id=20123&priority=1000"},
models.NilChannelID,
},
{
models.Org1,
[]urns.URN{urns.URN("telegram:100002"), urns.URN("telegram:100001")},
prevContact(), // same again as other URNs don't exist
[]urns.URN{"telegram:100001?id=20123&priority=1000"},
[]urns.URN{"telegram:100001?channel=74729f45-7f29-4868-9dc4-90e491e3c7d8&id=20123&priority=1000"},
models.NilChannelID,
},
{
models.Org1,
[]urns.URN{urns.URN("telegram:200001"), urns.URN("telegram:100001")},
prevContact(), // same again as other URNs are orphaned
[]urns.URN{"telegram:100001?id=20123&priority=1000"},
[]urns.URN{"telegram:100001?channel=74729f45-7f29-4868-9dc4-90e491e3c7d8&id=20123&priority=1000"},
models.NilChannelID,
},
{
models.Org1,
[]urns.URN{urns.URN("telegram:100003"), urns.URN("telegram:100004")}, // 2 new URNs
newContact(),
[]urns.URN{"telegram:100003?id=20124&priority=1000", "telegram:100004?id=20125&priority=999"},
models.NilChannelID,
},
{
models.Org1,
[]urns.URN{urns.URN("telegram:100005"), urns.URN("telegram:200002")}, // 1 new, 1 orphaned
newContact(),
[]urns.URN{"telegram:100005?id=20126&priority=1000", "telegram:200002?id=20122&priority=999"},
models.NilChannelID,
},
}

for i, tc := range tcs {
contact, flowContact, err := models.GetOrCreateContact(ctx, db, oa, tc.URNs)
contact, flowContact, err := models.GetOrCreateContact(ctx, db, oa, tc.URNs, tc.ChannelID)
assert.NoError(t, err, "%d: error creating contact", i)

assert.Equal(t, tc.ContactID, contact.ID(), "%d: contact id mismatch", i)
Expand Down Expand Up @@ -282,7 +292,7 @@ func TestGetOrCreateContactRace(t *testing.T) {

getOrCreate := func(i int) {
defer wg.Done()
contacts[i], _, errs[i] = models.GetOrCreateContact(ctx, mdb, oa, []urns.URN{urns.URN("telegram:100007")})
contacts[i], _, errs[i] = models.GetOrCreateContact(ctx, mdb, oa, []urns.URN{urns.URN("telegram:100007")}, models.NilChannelID)
}

wg.Add(2)
Expand Down
2 changes: 1 addition & 1 deletion models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func HandleAndCommitEvents(ctx context.Context, db QueryerWithTx, rp *redis.Pool
scenes = append(scenes, scene)
}

// begin the transaction for handling and pre-commit hooks
// begin the transaction for pre-commit hooks
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return errors.Wrapf(err, "error beginning transaction")
Expand Down
2 changes: 1 addition & 1 deletion models/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (b *ContactImportBatch) getOrCreateContacts(ctx context.Context, db Queryer
}

} else {
imp.contact, imp.flowContact, err = GetOrCreateContact(ctx, db, oa, spec.URNs)
imp.contact, imp.flowContact, err = GetOrCreateContact(ctx, db, oa, spec.URNs, NilChannelID)
if err != nil {
urnStrs := make([]string, len(spec.URNs))
for i := range spec.URNs {
Expand Down
22 changes: 16 additions & 6 deletions testsuite/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,37 @@ package testsuite
import (
"context"
"database/sql"
"sync"

"github.com/jmoiron/sqlx"
)

// MockDB is a mockable proxy to a real sqlx.DB that implements models.Queryer
type MockDB struct {
real *sqlx.DB
callCounts map[string]int
shouldErr func(funcName string, call int) error
real *sqlx.DB
callCounts map[string]int
callCountsMutex *sync.Mutex

// invoked before each queryer method call giving tests a chance to define an error return, sleep etc
shouldErr func(funcName string, call int) error
}

// NewMockDB creates a new mock over the given database connection
func NewMockDB(db *sqlx.DB, shouldErr func(funcName string, call int) error) *MockDB {
return &MockDB{
real: db,
callCounts: make(map[string]int),
shouldErr: shouldErr,
real: db,
callCounts: make(map[string]int),
callCountsMutex: &sync.Mutex{},
shouldErr: shouldErr,
}
}

func (d *MockDB) check(funcName string) error {
d.callCountsMutex.Lock()
call := d.callCounts[funcName]
d.callCounts[funcName]++
d.callCountsMutex.Unlock()

return d.shouldErr(funcName, call)
}

Expand Down
60 changes: 53 additions & 7 deletions web/contact/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"net/http"

"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom/goflow"
Expand All @@ -17,6 +18,7 @@ import (
func init() {
web.RegisterJSONRoute(http.MethodPost, "/mr/contact/create", web.RequireAuthToken(handleCreate))
web.RegisterJSONRoute(http.MethodPost, "/mr/contact/modify", web.RequireAuthToken(handleModify))
web.RegisterJSONRoute(http.MethodPost, "/mr/contact/resolve", web.RequireAuthToken(handleResolve))
}

// Request to create a new contact.
Expand All @@ -39,7 +41,7 @@ type createRequest struct {
Contact *models.ContactSpec `json:"contact" validate:"required"`
}

// handles a request to create the given contacts
// handles a request to create the given contact
func handleCreate(ctx context.Context, s *web.Server, r *http.Request) (interface{}, int, error) {
request := &createRequest{}
if err := utils.UnmarshalAndValidateWithLimit(r.Body, request, web.MaxRequestBytes); err != nil {
Expand Down Expand Up @@ -128,12 +130,6 @@ func handleModify(ctx context.Context, s *web.Server, r *http.Request) (interfac
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets")
}

// clone it as we will modify flows
oa, err = oa.Clone(s.CTX, s.DB)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to clone orgs")
}

// read the modifiers from the request
mods, err := goflow.ReadModifiers(oa.SessionAssets(), request.Modifiers, goflow.ErrorOnMissing)
if err != nil {
Expand Down Expand Up @@ -173,3 +169,53 @@ func handleModify(ctx context.Context, s *web.Server, r *http.Request) (interfac

return results, http.StatusOK, nil
}

// Request to resolve a contact based on a channel and URN
//
// {
// "org_id": 1,
// "channel_id": 234,
// "urn": "tel:+250788123123"
// }
//
type resolveRequest struct {
OrgID models.OrgID `json:"org_id" validate:"required"`
ChannelID models.ChannelID `json:"channel_id" validate:"required"`
URN urns.URN `json:"urn" validate:"required"`
}

// handles a request to resolve a contact
func handleResolve(ctx context.Context, s *web.Server, r *http.Request) (interface{}, int, error) {
request := &resolveRequest{}
if err := utils.UnmarshalAndValidateWithLimit(r.Body, request, web.MaxRequestBytes); err != nil {
return errors.Wrapf(err, "request failed validation"), http.StatusBadRequest, nil
}

// grab our org
oa, err := models.GetOrgAssets(s.CTX, s.DB, request.OrgID)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets")
}

_, contact, err := models.GetOrCreateContact(ctx, s.DB, oa, []urns.URN{request.URN}, request.ChannelID)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error getting or creating contact")
}

// find the URN on the contact
urn := request.URN.Normalize(string(oa.Env().DefaultCountry()))
for _, u := range contact.URNs() {
if urn.Identity() == u.URN().Identity() {
urn = u.URN()
break
}
}

return map[string]interface{}{
"contact": contact,
"urn": map[string]interface{}{
"id": models.GetURNInt(urn, "id"),
"identity": urn.Identity(),
},
}, http.StatusOK, nil
}
12 changes: 12 additions & 0 deletions web/contact/contact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,15 @@ func TestModifyContacts(t *testing.T) {

models.FlushCache()
}

func TestResolveContacts(t *testing.T) {
testsuite.Reset()
db := testsuite.DB()

// detach Cathy's tel URN
db.MustExec(`UPDATE contacts_contacturn SET contact_id = NULL WHERE contact_id = $1`, models.CathyID)

db.MustExec(`ALTER SEQUENCE contacts_contact_id_seq RESTART WITH 30000`)

web.RunWebTests(t, "testdata/resolve.json")
}
Loading

0 comments on commit cbccc73

Please sign in to comment.