diff --git a/hooks/session_triggered.go b/hooks/session_triggered.go index 1a816f0a3..c191db297 100644 --- a/hooks/session_triggered.go +++ b/hooks/session_triggered.go @@ -86,7 +86,7 @@ func (h *InsertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool } // load our contacts by uuid - contactIDs, err := models.ContactIDsFromReferences(ctx, tx, oa, event.Contacts) + contactIDs, err := models.ContactIDsFromReferences(ctx, tx, oa.OrgID(), event.Contacts) if err != nil { return errors.Wrapf(err, "error loading contacts by reference") } diff --git a/models/contacts.go b/models/contacts.go index fcb3151f1..7062794d9 100644 --- a/models/contacts.go +++ b/models/contacts.go @@ -162,6 +162,15 @@ func LoadContacts(ctx context.Context, db Queryer, org *OrgAssets, ids []Contact return contacts, nil } +// LoadContactsByUUID loads a set of contacts for the passed in UUIDs +func LoadContactsByUUID(ctx context.Context, db Queryer, oa *OrgAssets, uuids []flows.ContactUUID) ([]*Contact, error) { + ids, err := ContactIDsFromUUIDs(ctx, db, oa.OrgID(), uuids) + if err != nil { + return nil, err + } + return LoadContacts(ctx, db, oa, ids) +} + // GetNewestContactModifiedOn returns the newest modified_on for a contact in the passed in org func GetNewestContactModifiedOn(ctx context.Context, db *sqlx.DB, org *OrgAssets) (*time.Time, error) { rows, err := db.QueryxContext(ctx, "SELECT modified_on FROM contacts_contact WHERE org_id = $1 ORDER BY modified_on DESC LIMIT 1", org.OrgID()) @@ -184,20 +193,22 @@ func GetNewestContactModifiedOn(ctx context.Context, db *sqlx.DB, org *OrgAssets } // ContactIDsFromReferences queries the contacts for the passed in org, returning the contact ids for the references -func ContactIDsFromReferences(ctx context.Context, tx Queryer, org *OrgAssets, refs []*flows.ContactReference) ([]ContactID, error) { +func ContactIDsFromReferences(ctx context.Context, tx Queryer, orgID OrgID, refs []*flows.ContactReference) ([]ContactID, error) { // build our list of UUIDs - uuids := make([]interface{}, len(refs)) + uuids := make([]flows.ContactUUID, len(refs)) for i := range refs { uuids[i] = refs[i].UUID } - ids := make([]ContactID, 0, len(refs)) - rows, err := tx.QueryxContext(ctx, - `SELECT id FROM contacts_contact WHERE org_id = $1 AND uuid = ANY($2) AND is_active = TRUE`, - org.OrgID(), pq.Array(uuids), - ) + return ContactIDsFromUUIDs(ctx, tx, orgID, uuids) +} + +// ContactIDsFromUUIDs queries the contacts for the passed in org, returning the contact ids for the UUIDs +func ContactIDsFromUUIDs(ctx context.Context, tx Queryer, orgID OrgID, uuids []flows.ContactUUID) ([]ContactID, error) { + ids := make([]ContactID, 0, len(uuids)) + rows, err := tx.QueryxContext(ctx, `SELECT id FROM contacts_contact WHERE org_id = $1 AND uuid = ANY($2) AND is_active = TRUE`, orgID, pq.Array(uuids)) if err != nil { - return nil, errors.Wrapf(err, "error selecting contact ids by uuid") + return nil, errors.Wrapf(err, "error selecting contact ids by UUID") } defer rows.Close() diff --git a/models/events.go b/models/events.go index cc54b5656..f21250a0d 100644 --- a/models/events.go +++ b/models/events.go @@ -235,3 +235,27 @@ func HandleAndCommitEvents(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa } return nil } + +// ApplyModifiers modifies contacts by applying modifiers and handling the resultant events +func ApplyModifiers(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *OrgAssets, modifiersByContact map[*flows.Contact][]flows.Modifier) (map[*flows.Contact][]flows.Event, error) { + // create an environment instance with location support + env := flows.NewEnvironment(oa.Env(), oa.SessionAssets().Locations()) + + eventsByContact := make(map[*flows.Contact][]flows.Event, len(modifiersByContact)) + + // apply the modifiers to get the events for each contact + for contact, mods := range modifiersByContact { + events := make([]flows.Event, 0) + for _, mod := range mods { + mod.Apply(env, oa.SessionAssets(), contact, func(e flows.Event) { events = append(events, e) }) + } + eventsByContact[contact] = events + } + + err := HandleAndCommitEvents(ctx, db, rp, oa, eventsByContact) + if err != nil { + return nil, errors.Wrap(err, "error commiting events") + } + + return eventsByContact, nil +} diff --git a/models/imports.go b/models/imports.go index 06f8dc84d..05fad17ab 100644 --- a/models/imports.go +++ b/models/imports.go @@ -3,13 +3,16 @@ package models import ( "context" "encoding/json" + "fmt" "time" "github.com/nyaruka/gocommon/dates" "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/assets" + "github.com/nyaruka/goflow/envs" "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/flows/modifiers" "github.com/pkg/errors" "github.com/jmoiron/sqlx" @@ -52,55 +55,214 @@ type ContactImportBatch struct { } // Import does the actual import of this batch -func (b *ContactImportBatch) Import(ctx context.Context, db *sqlx.DB) error { +func (b *ContactImportBatch) Import(ctx context.Context, db *sqlx.DB, orgID OrgID) error { // if any error occurs this batch should be marked as failed - if err := b.tryImport(ctx, db); err != nil { + if err := b.tryImport(ctx, db, orgID); err != nil { b.markFailed(ctx, db) return err } return nil } -func (b *ContactImportBatch) tryImport(ctx context.Context, db *sqlx.DB) error { +// holds work data for import of a single contact +type importContact struct { + record int + spec *ContactSpec + contact *Contact + created bool + flowContact *flows.Contact + mods []flows.Modifier + errors []string +} + +func (b *ContactImportBatch) tryImport(ctx context.Context, db *sqlx.DB, orgID OrgID) error { if err := b.markProcessing(ctx, db); err != nil { - return errors.Wrap(err, "unable to mark as processing") + return errors.Wrap(err, "error marking as processing") + } + + // grab our org assets + oa, err := GetOrgAssets(ctx, db, orgID) + if err != nil { + return errors.Wrap(err, "error loading org assets") } // unmarshal this batch's specs var specs []*ContactSpec if err := jsonx.Unmarshal(b.Specs, &specs); err != nil { - return errors.New("unable to unmarsal specs") + return errors.Wrap(err, "error unmarsaling specs") + } + + // create our work data for each contact being created or updated + imports := make([]*importContact, len(specs)) + for i := range imports { + // ensure all URNs are normalized + for j, urn := range specs[i].URNs { + specs[i].URNs[j] = urn.Normalize(string(oa.Env().DefaultCountry())) + } + + imports[i] = &importContact{record: b.RecordStart + i, spec: specs[i]} + } + + if err := b.getOrCreateContacts(ctx, db, oa, imports); err != nil { + return errors.Wrap(err, "error getting and creating contacts") + } + + // gather up contacts and modifiers + modifiersByContact := make(map[*flows.Contact][]flows.Modifier, len(imports)) + for _, imp := range imports { + // ignore errored imports which couldn't get/create a contact + if imp.contact != nil { + modifiersByContact[imp.flowContact] = imp.mods + } } - result, err := importContactSpecs(ctx, db, specs) + // and apply in bulk + _, err = ApplyModifiers(ctx, db, nil, oa, modifiersByContact) if err != nil { - return errors.Wrap(err, "unable to import specs") + return errors.Wrap(err, "error applying modifiers") } - if err := b.markComplete(ctx, db, result); err != nil { + if err := b.markComplete(ctx, db, imports); err != nil { return errors.Wrap(err, "unable to mark as complete") } return nil } +// for each import, fetches or creates the contact, creates the modifiers needed to set fields etc +func (b *ContactImportBatch) getOrCreateContacts(ctx context.Context, db *sqlx.DB, oa *OrgAssets, imports []*importContact) error { + sa := oa.SessionAssets() + + // build map of UUIDs to contacts + contactsByUUID, err := b.loadContactsByUUID(ctx, db, oa, imports) + if err != nil { + return errors.Wrap(err, "error loading contacts by UUID") + } + + for _, imp := range imports { + addModifier := func(m flows.Modifier) { imp.mods = append(imp.mods, m) } + addError := func(s string, args ...interface{}) { imp.errors = append(imp.errors, fmt.Sprintf(s, args...)) } + spec := imp.spec + + uuid := spec.UUID + if uuid != "" { + imp.contact = contactsByUUID[uuid] + if imp.contact == nil { + addError("Unable to find contact with UUID '%s'", uuid) + continue + } + + imp.flowContact, err = imp.contact.FlowContact(oa) + if err != nil { + return errors.Wrapf(err, "error creating flow contact for %d", imp.contact.ID()) + } + + addModifier(modifiers.NewURNs(spec.URNs, modifiers.URNsAppend)) + + } else { + // TODO get or create by multiple URNs + + imp.contact, imp.flowContact, err = GetOrCreateContact(ctx, db, oa, spec.URNs[0]) + if err != nil { + addError("Unable to get or create contact with URN '%s'", spec.URNs[0]) + continue + } + } + + if spec.Name != nil { + addModifier(modifiers.NewName(*spec.Name)) + } + if spec.Language != nil { + lang, err := envs.ParseLanguage(*spec.Language) + if err != nil { + addError("'%s' is not a valid language code", *spec.Language) + } else { + addModifier(modifiers.NewLanguage(lang)) + } + } + + for key, value := range spec.Fields { + field := sa.Fields().Get(key) + if field == nil { + addError("'%s' is not a valid contact field key", key) + } else { + addModifier(modifiers.NewField(field, value)) + } + } + + if len(spec.Groups) > 0 { + groups := make([]*flows.Group, 0, len(spec.Groups)) + for _, uuid := range spec.Groups { + group := sa.Groups().Get(uuid) + if group == nil { + addError("'%s' is not a valid contact group UUID", uuid) + } else { + groups = append(groups, group) + } + } + addModifier(modifiers.NewGroups(groups, modifiers.GroupsAdd)) + } + } + + return nil +} + +// loads any import contacts for which we have UUIDs +func (b *ContactImportBatch) loadContactsByUUID(ctx context.Context, db *sqlx.DB, oa *OrgAssets, imports []*importContact) (map[flows.ContactUUID]*Contact, error) { + uuids := make([]flows.ContactUUID, 0, 50) + for _, imp := range imports { + if imp.spec.UUID != "" { + uuids = append(uuids, imp.spec.UUID) + } + } + + // build map of UUIDs to contacts + contacts, err := LoadContactsByUUID(ctx, db, oa, uuids) + if err != nil { + return nil, err + } + + contactsByUUID := make(map[flows.ContactUUID]*Contact, len(contacts)) + for _, c := range contacts { + contactsByUUID[c.UUID()] = c + } + return contactsByUUID, nil +} + func (b *ContactImportBatch) markProcessing(ctx context.Context, db *sqlx.DB) error { b.Status = ContactImportStatusProcessing _, err := db.ExecContext(ctx, `UPDATE contacts_contactimportbatch SET status = $2 WHERE id = $1`, b.ID, b.Status) return err } -func (b *ContactImportBatch) markComplete(ctx context.Context, db *sqlx.DB, r importResult) error { - errorsJSON, err := jsonx.Marshal(r.errors) +func (b *ContactImportBatch) markComplete(ctx context.Context, db *sqlx.DB, imports []*importContact) error { + numCreated := 0 + numUpdated := 0 + numErrored := 0 + importErrors := make([]importError, 0, 10) + for _, imp := range imports { + if imp.contact == nil { + numErrored++ + } else if imp.created { + numCreated++ + } else { + numUpdated++ + } + for _, e := range imp.errors { + importErrors = append(importErrors, importError{Record: imp.record, Message: e}) + } + } + + errorsJSON, err := jsonx.Marshal(importErrors) if err != nil { return errors.Wrap(err, "error marshaling errors") } now := dates.Now() b.Status = ContactImportStatusComplete - b.NumCreated = r.numCreated - b.NumUpdated = r.numUpdated - b.NumErrored = r.numErrored + b.NumCreated = numCreated + b.NumUpdated = numUpdated + b.NumErrored = numErrored b.Errors = errorsJSON b.FinishedOn = &now _, err = db.ExecContext(ctx, @@ -156,21 +318,3 @@ type importError struct { Record int `json:"record"` Message string `json:"message"` } - -// holds the result of importing a set of contact specs -type importResult struct { - numCreated int - numUpdated int - numErrored int - errors []importError -} - -func importContactSpecs(ctx context.Context, db *sqlx.DB, specs []*ContactSpec) (importResult, error) { - res := importResult{ - errors: make([]importError, 0), - } - - // TODO - - return res, nil -} diff --git a/models/imports_test.go b/models/imports_test.go index fca19c5e7..4e4cecdc5 100644 --- a/models/imports_test.go +++ b/models/imports_test.go @@ -1,13 +1,13 @@ package models_test import ( - "encoding/json" "testing" "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/flows" + _ "github.com/nyaruka/mailroom/hooks" "github.com/nyaruka/mailroom/models" "github.com/nyaruka/mailroom/testsuite" "github.com/nyaruka/mailroom/testsuite/testdata" @@ -21,18 +21,21 @@ func TestContactImportBatch(t *testing.T) { db := testsuite.DB() importID := testdata.InsertContactImport(t, db, models.Org1) - batchID := testdata.InsertContactImportBatch(t, db, importID, `[{"name": "Bob"},{"name": "Jim"}]`) + batchID := testdata.InsertContactImportBatch(t, db, importID, `[ + {"name": "Norbert", "language": "eng", "urns": ["tel:+16055740001"]}, + {"name": "Leah", "urns": ["tel:+16055740002"]} + ]`) batch, err := models.LoadContactImportBatch(ctx, db, batchID) require.NoError(t, err) assert.Equal(t, importID, batch.ImportID) assert.Equal(t, models.ContactImportStatus("P"), batch.Status) - assert.Equal(t, json.RawMessage(`[{"name": "Bob"}, {"name": "Jim"}]`), batch.Specs) + assert.NotNil(t, batch.Specs) assert.Equal(t, 10, batch.RecordStart) assert.Equal(t, 12, batch.RecordEnd) - err = batch.Import(ctx, db) + err = batch.Import(ctx, db, models.Org1) require.NoError(t, err) testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contactimportbatch WHERE status = 'C' AND finished_on IS NOT NULL`, []interface{}{}, 1) diff --git a/models/msgs.go b/models/msgs.go index d4b3d89d1..30be94e07 100644 --- a/models/msgs.go +++ b/models/msgs.go @@ -683,7 +683,7 @@ func NewBroadcastFromEvent(ctx context.Context, tx Queryer, org *OrgAssets, even } // resolve our contact references - contactIDs, err := ContactIDsFromReferences(ctx, tx, org, event.Contacts) + contactIDs, err := ContactIDsFromReferences(ctx, tx, org.OrgID(), event.Contacts) if err != nil { return nil, errors.Wrapf(err, "error resolving contact references") } diff --git a/tasks/contacts/import_contact_batch.go b/tasks/contacts/import_contact_batch.go index 207065ef8..38e037728 100644 --- a/tasks/contacts/import_contact_batch.go +++ b/tasks/contacts/import_contact_batch.go @@ -34,7 +34,7 @@ func (t *ImportContactBatchTask) Perform(ctx context.Context, mr *mailroom.Mailr return errors.Wrapf(err, "unable to load contact import batch with id %d", t.ContactImportBatchID) } - if err := batch.Import(ctx, mr.DB); err != nil { + if err := batch.Import(ctx, mr.DB, orgID); err != nil { return errors.Wrapf(err, "unable to import contact import batch %d", t.ContactImportBatchID) } diff --git a/tasks/contacts/import_contact_batch_test.go b/tasks/contacts/import_contact_batch_test.go index 8387b8a15..cd760b1d3 100644 --- a/tasks/contacts/import_contact_batch_test.go +++ b/tasks/contacts/import_contact_batch_test.go @@ -5,6 +5,7 @@ import ( "github.com/nyaruka/mailroom" "github.com/nyaruka/mailroom/config" + _ "github.com/nyaruka/mailroom/hooks" "github.com/nyaruka/mailroom/models" "github.com/nyaruka/mailroom/tasks/contacts" "github.com/nyaruka/mailroom/testsuite" @@ -20,10 +21,16 @@ func TestImportContactBatch(t *testing.T) { mr := &mailroom.Mailroom{Config: config.Mailroom, DB: db, RP: testsuite.RP(), ElasticClient: nil} importID := testdata.InsertContactImport(t, db, models.Org1) - batchID := testdata.InsertContactImportBatch(t, db, importID, `[{"name": "Bob"},{"name": "Jim"}]`) + batchID := testdata.InsertContactImportBatch(t, db, importID, `[ + {"name": "Norbert", "language": "eng", "urns": ["tel:+16055740001"]}, + {"name": "Leah", "urns": ["tel:+16055740002"]} + ]`) task := &contacts.ImportContactBatchTask{ContactImportBatchID: batchID} err := task.Perform(ctx, mr, models.Org1) require.NoError(t, err) + + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Norbert' AND language = 'eng'`, nil, 1) + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Leah' AND language IS NULL`, nil, 1) } diff --git a/web/contact/contact.go b/web/contact/contact.go index ede25528d..628f255b5 100644 --- a/web/contact/contact.go +++ b/web/contact/contact.go @@ -63,7 +63,7 @@ func handleCreate(ctx context.Context, s *web.Server, r *http.Request) (interfac } modifiersByContact := map[*flows.Contact][]flows.Modifier{contact: c.Mods} - _, err = ModifyContacts(ctx, s.DB, s.RP, oa, modifiersByContact) + _, err = models.ApplyModifiers(ctx, s.DB, s.RP, oa, modifiersByContact) if err != nil { return nil, http.StatusInternalServerError, errors.Wrap(err, "error modifying new contact") } @@ -157,7 +157,7 @@ func handleModify(ctx context.Context, s *web.Server, r *http.Request) (interfac modifiersByContact[flowContact] = mods } - eventsByContact, err := ModifyContacts(ctx, s.DB, s.RP, oa, modifiersByContact) + eventsByContact, err := models.ApplyModifiers(ctx, s.DB, s.RP, oa, modifiersByContact) if err != nil { return nil, http.StatusBadRequest, err } diff --git a/web/contact/utils.go b/web/contact/utils.go index 57ccd3ae6..e3a768e69 100644 --- a/web/contact/utils.go +++ b/web/contact/utils.go @@ -1,16 +1,12 @@ package contact import ( - "context" - "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/envs" "github.com/nyaruka/goflow/flows" "github.com/nyaruka/goflow/flows/modifiers" "github.com/nyaruka/mailroom/models" - "github.com/gomodule/redigo/redis" - "github.com/jmoiron/sqlx" "github.com/pkg/errors" ) @@ -74,27 +70,3 @@ func SpecToCreation(s *models.ContactSpec, env envs.Environment, sa flows.Sessio return validated, nil } - -// ModifyContacts modifies contacts by applying modifiers and handling the resultant events -func ModifyContacts(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *models.OrgAssets, modifiersByContact map[*flows.Contact][]flows.Modifier) (map[*flows.Contact][]flows.Event, error) { - // create an environment instance with location support - env := flows.NewEnvironment(oa.Env(), oa.SessionAssets().Locations()) - - eventsByContact := make(map[*flows.Contact][]flows.Event, len(modifiersByContact)) - - // apply the modifiers to get the events for each contact - for contact, mods := range modifiersByContact { - events := make([]flows.Event, 0) - for _, mod := range mods { - mod.Apply(env, oa.SessionAssets(), contact, func(e flows.Event) { events = append(events, e) }) - } - eventsByContact[contact] = events - } - - err := models.HandleAndCommitEvents(ctx, db, rp, oa, eventsByContact) - if err != nil { - return nil, errors.Wrap(err, "error commiting events") - } - - return eventsByContact, nil -}