Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Sep 24, 2020
1 parent 7659543 commit 2f1e7fe
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 77 deletions.
2 changes: 1 addition & 1 deletion hooks/session_triggered.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (h *InsertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool
}

// load our contacts by uuid
contactIDs, err := models.ContactIDsFromReferences(ctx, tx, oa, event.Contacts)
contactIDs, err := models.ContactIDsFromReferences(ctx, tx, oa.OrgID(), event.Contacts)
if err != nil {
return errors.Wrapf(err, "error loading contacts by reference")
}
Expand Down
27 changes: 19 additions & 8 deletions models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,15 @@ func LoadContacts(ctx context.Context, db Queryer, org *OrgAssets, ids []Contact
return contacts, nil
}

// LoadContactsByUUID loads a set of contacts for the passed in UUIDs
func LoadContactsByUUID(ctx context.Context, db Queryer, oa *OrgAssets, uuids []flows.ContactUUID) ([]*Contact, error) {
ids, err := ContactIDsFromUUIDs(ctx, db, oa.OrgID(), uuids)
if err != nil {
return nil, err
}
return LoadContacts(ctx, db, oa, ids)
}

// GetNewestContactModifiedOn returns the newest modified_on for a contact in the passed in org
func GetNewestContactModifiedOn(ctx context.Context, db *sqlx.DB, org *OrgAssets) (*time.Time, error) {
rows, err := db.QueryxContext(ctx, "SELECT modified_on FROM contacts_contact WHERE org_id = $1 ORDER BY modified_on DESC LIMIT 1", org.OrgID())
Expand All @@ -184,20 +193,22 @@ func GetNewestContactModifiedOn(ctx context.Context, db *sqlx.DB, org *OrgAssets
}

// ContactIDsFromReferences queries the contacts for the passed in org, returning the contact ids for the references
func ContactIDsFromReferences(ctx context.Context, tx Queryer, org *OrgAssets, refs []*flows.ContactReference) ([]ContactID, error) {
func ContactIDsFromReferences(ctx context.Context, tx Queryer, orgID OrgID, refs []*flows.ContactReference) ([]ContactID, error) {
// build our list of UUIDs
uuids := make([]interface{}, len(refs))
uuids := make([]flows.ContactUUID, len(refs))
for i := range refs {
uuids[i] = refs[i].UUID
}

ids := make([]ContactID, 0, len(refs))
rows, err := tx.QueryxContext(ctx,
`SELECT id FROM contacts_contact WHERE org_id = $1 AND uuid = ANY($2) AND is_active = TRUE`,
org.OrgID(), pq.Array(uuids),
)
return ContactIDsFromUUIDs(ctx, tx, orgID, uuids)
}

// ContactIDsFromUUIDs queries the contacts for the passed in org, returning the contact ids for the UUIDs
func ContactIDsFromUUIDs(ctx context.Context, tx Queryer, orgID OrgID, uuids []flows.ContactUUID) ([]ContactID, error) {
ids := make([]ContactID, 0, len(uuids))
rows, err := tx.QueryxContext(ctx, `SELECT id FROM contacts_contact WHERE org_id = $1 AND uuid = ANY($2) AND is_active = TRUE`, orgID, pq.Array(uuids))
if err != nil {
return nil, errors.Wrapf(err, "error selecting contact ids by uuid")
return nil, errors.Wrapf(err, "error selecting contact ids by UUID")
}
defer rows.Close()

Expand Down
24 changes: 24 additions & 0 deletions models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,27 @@ func HandleAndCommitEvents(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa
}
return nil
}

// ApplyModifiers modifies contacts by applying modifiers and handling the resultant events
func ApplyModifiers(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *OrgAssets, modifiersByContact map[*flows.Contact][]flows.Modifier) (map[*flows.Contact][]flows.Event, error) {
// create an environment instance with location support
env := flows.NewEnvironment(oa.Env(), oa.SessionAssets().Locations())

eventsByContact := make(map[*flows.Contact][]flows.Event, len(modifiersByContact))

// apply the modifiers to get the events for each contact
for contact, mods := range modifiersByContact {
events := make([]flows.Event, 0)
for _, mod := range mods {
mod.Apply(env, oa.SessionAssets(), contact, func(e flows.Event) { events = append(events, e) })
}
eventsByContact[contact] = events
}

err := HandleAndCommitEvents(ctx, db, rp, oa, eventsByContact)
if err != nil {
return nil, errors.Wrap(err, "error commiting events")
}

return eventsByContact, nil
}
206 changes: 175 additions & 31 deletions models/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package models
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/envs"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/modifiers"
"github.com/pkg/errors"

"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -52,55 +55,214 @@ type ContactImportBatch struct {
}

// Import does the actual import of this batch
func (b *ContactImportBatch) Import(ctx context.Context, db *sqlx.DB) error {
func (b *ContactImportBatch) Import(ctx context.Context, db *sqlx.DB, orgID OrgID) error {
// if any error occurs this batch should be marked as failed
if err := b.tryImport(ctx, db); err != nil {
if err := b.tryImport(ctx, db, orgID); err != nil {
b.markFailed(ctx, db)
return err
}
return nil
}

func (b *ContactImportBatch) tryImport(ctx context.Context, db *sqlx.DB) error {
// holds work data for import of a single contact
type importContact struct {
record int
spec *ContactSpec
contact *Contact
created bool
flowContact *flows.Contact
mods []flows.Modifier
errors []string
}

func (b *ContactImportBatch) tryImport(ctx context.Context, db *sqlx.DB, orgID OrgID) error {
if err := b.markProcessing(ctx, db); err != nil {
return errors.Wrap(err, "unable to mark as processing")
return errors.Wrap(err, "error marking as processing")
}

// grab our org assets
oa, err := GetOrgAssets(ctx, db, orgID)
if err != nil {
return errors.Wrap(err, "error loading org assets")
}

// unmarshal this batch's specs
var specs []*ContactSpec
if err := jsonx.Unmarshal(b.Specs, &specs); err != nil {
return errors.New("unable to unmarsal specs")
return errors.Wrap(err, "error unmarsaling specs")
}

// create our work data for each contact being created or updated
imports := make([]*importContact, len(specs))
for i := range imports {
// ensure all URNs are normalized
for j, urn := range specs[i].URNs {
specs[i].URNs[j] = urn.Normalize(string(oa.Env().DefaultCountry()))
}

imports[i] = &importContact{record: b.RecordStart + i, spec: specs[i]}
}

if err := b.getOrCreateContacts(ctx, db, oa, imports); err != nil {
return errors.Wrap(err, "error getting and creating contacts")
}

// gather up contacts and modifiers
modifiersByContact := make(map[*flows.Contact][]flows.Modifier, len(imports))
for _, imp := range imports {
// ignore errored imports which couldn't get/create a contact
if imp.contact != nil {
modifiersByContact[imp.flowContact] = imp.mods
}
}

result, err := importContactSpecs(ctx, db, specs)
// and apply in bulk
_, err = ApplyModifiers(ctx, db, nil, oa, modifiersByContact)
if err != nil {
return errors.Wrap(err, "unable to import specs")
return errors.Wrap(err, "error applying modifiers")
}

if err := b.markComplete(ctx, db, result); err != nil {
if err := b.markComplete(ctx, db, imports); err != nil {
return errors.Wrap(err, "unable to mark as complete")
}

return nil
}

// for each import, fetches or creates the contact, creates the modifiers needed to set fields etc
func (b *ContactImportBatch) getOrCreateContacts(ctx context.Context, db *sqlx.DB, oa *OrgAssets, imports []*importContact) error {
sa := oa.SessionAssets()

// build map of UUIDs to contacts
contactsByUUID, err := b.loadContactsByUUID(ctx, db, oa, imports)
if err != nil {
return errors.Wrap(err, "error loading contacts by UUID")
}

for _, imp := range imports {
addModifier := func(m flows.Modifier) { imp.mods = append(imp.mods, m) }
addError := func(s string, args ...interface{}) { imp.errors = append(imp.errors, fmt.Sprintf(s, args...)) }
spec := imp.spec

uuid := spec.UUID
if uuid != "" {
imp.contact = contactsByUUID[uuid]
if imp.contact == nil {
addError("Unable to find contact with UUID '%s'", uuid)
continue
}

imp.flowContact, err = imp.contact.FlowContact(oa)
if err != nil {
return errors.Wrapf(err, "error creating flow contact for %d", imp.contact.ID())
}

addModifier(modifiers.NewURNs(spec.URNs, modifiers.URNsAppend))

} else {
// TODO get or create by multiple URNs

imp.contact, imp.flowContact, err = GetOrCreateContact(ctx, db, oa, spec.URNs[0])
if err != nil {
addError("Unable to get or create contact with URN '%s'", spec.URNs[0])
continue
}
}

if spec.Name != nil {
addModifier(modifiers.NewName(*spec.Name))
}
if spec.Language != nil {
lang, err := envs.ParseLanguage(*spec.Language)
if err != nil {
addError("'%s' is not a valid language code", *spec.Language)
} else {
addModifier(modifiers.NewLanguage(lang))
}
}

for key, value := range spec.Fields {
field := sa.Fields().Get(key)
if field == nil {
addError("'%s' is not a valid contact field key", key)
} else {
addModifier(modifiers.NewField(field, value))
}
}

if len(spec.Groups) > 0 {
groups := make([]*flows.Group, 0, len(spec.Groups))
for _, uuid := range spec.Groups {
group := sa.Groups().Get(uuid)
if group == nil {
addError("'%s' is not a valid contact group UUID", uuid)
} else {
groups = append(groups, group)
}
}
addModifier(modifiers.NewGroups(groups, modifiers.GroupsAdd))
}
}

return nil
}

// loads any import contacts for which we have UUIDs
func (b *ContactImportBatch) loadContactsByUUID(ctx context.Context, db *sqlx.DB, oa *OrgAssets, imports []*importContact) (map[flows.ContactUUID]*Contact, error) {
uuids := make([]flows.ContactUUID, 0, 50)
for _, imp := range imports {
if imp.spec.UUID != "" {
uuids = append(uuids, imp.spec.UUID)
}
}

// build map of UUIDs to contacts
contacts, err := LoadContactsByUUID(ctx, db, oa, uuids)
if err != nil {
return nil, err
}

contactsByUUID := make(map[flows.ContactUUID]*Contact, len(contacts))
for _, c := range contacts {
contactsByUUID[c.UUID()] = c
}
return contactsByUUID, nil
}

func (b *ContactImportBatch) markProcessing(ctx context.Context, db *sqlx.DB) error {
b.Status = ContactImportStatusProcessing
_, err := db.ExecContext(ctx, `UPDATE contacts_contactimportbatch SET status = $2 WHERE id = $1`, b.ID, b.Status)
return err
}

func (b *ContactImportBatch) markComplete(ctx context.Context, db *sqlx.DB, r importResult) error {
errorsJSON, err := jsonx.Marshal(r.errors)
func (b *ContactImportBatch) markComplete(ctx context.Context, db *sqlx.DB, imports []*importContact) error {
numCreated := 0
numUpdated := 0
numErrored := 0
importErrors := make([]importError, 0, 10)
for _, imp := range imports {
if imp.contact == nil {
numErrored++
} else if imp.created {
numCreated++
} else {
numUpdated++
}
for _, e := range imp.errors {
importErrors = append(importErrors, importError{Record: imp.record, Message: e})
}
}

errorsJSON, err := jsonx.Marshal(importErrors)
if err != nil {
return errors.Wrap(err, "error marshaling errors")
}

now := dates.Now()
b.Status = ContactImportStatusComplete
b.NumCreated = r.numCreated
b.NumUpdated = r.numUpdated
b.NumErrored = r.numErrored
b.NumCreated = numCreated
b.NumUpdated = numUpdated
b.NumErrored = numErrored
b.Errors = errorsJSON
b.FinishedOn = &now
_, err = db.ExecContext(ctx,
Expand Down Expand Up @@ -156,21 +318,3 @@ type importError struct {
Record int `json:"record"`
Message string `json:"message"`
}

// holds the result of importing a set of contact specs
type importResult struct {
numCreated int
numUpdated int
numErrored int
errors []importError
}

func importContactSpecs(ctx context.Context, db *sqlx.DB, specs []*ContactSpec) (importResult, error) {
res := importResult{
errors: make([]importError, 0),
}

// TODO

return res, nil
}
11 changes: 7 additions & 4 deletions models/imports_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package models_test

import (
"encoding/json"
"testing"

"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/flows"
_ "github.com/nyaruka/mailroom/hooks"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
Expand All @@ -21,18 +21,21 @@ func TestContactImportBatch(t *testing.T) {
db := testsuite.DB()

importID := testdata.InsertContactImport(t, db, models.Org1)
batchID := testdata.InsertContactImportBatch(t, db, importID, `[{"name": "Bob"},{"name": "Jim"}]`)
batchID := testdata.InsertContactImportBatch(t, db, importID, `[
{"name": "Norbert", "language": "eng", "urns": ["tel:+16055740001"]},
{"name": "Leah", "urns": ["tel:+16055740002"]}
]`)

batch, err := models.LoadContactImportBatch(ctx, db, batchID)
require.NoError(t, err)

assert.Equal(t, importID, batch.ImportID)
assert.Equal(t, models.ContactImportStatus("P"), batch.Status)
assert.Equal(t, json.RawMessage(`[{"name": "Bob"}, {"name": "Jim"}]`), batch.Specs)
assert.NotNil(t, batch.Specs)
assert.Equal(t, 10, batch.RecordStart)
assert.Equal(t, 12, batch.RecordEnd)

err = batch.Import(ctx, db)
err = batch.Import(ctx, db, models.Org1)
require.NoError(t, err)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contactimportbatch WHERE status = 'C' AND finished_on IS NOT NULL`, []interface{}{}, 1)
Expand Down
2 changes: 1 addition & 1 deletion models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ func NewBroadcastFromEvent(ctx context.Context, tx Queryer, org *OrgAssets, even
}

// resolve our contact references
contactIDs, err := ContactIDsFromReferences(ctx, tx, org, event.Contacts)
contactIDs, err := ContactIDsFromReferences(ctx, tx, org.OrgID(), event.Contacts)
if err != nil {
return nil, errors.Wrapf(err, "error resolving contact references")
}
Expand Down
Loading

0 comments on commit 2f1e7fe

Please sign in to comment.