From 5297467ac5da19a1c51cc6e91d9f01c4780680af Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Wed, 8 Sep 2021 17:32:08 -0500 Subject: [PATCH 1/3] WIP --- core/models/imports.go | 56 +++++++++++++++++++++ core/models/notifications.go | 13 +++++ core/tasks/contacts/import_contact_batch.go | 34 +++++++++++-- 3 files changed, 100 insertions(+), 3 deletions(-) diff --git a/core/models/imports.go b/core/models/imports.go index 0f6aab119..5241e917a 100644 --- a/core/models/imports.go +++ b/core/models/imports.go @@ -43,6 +43,62 @@ const ( ContactImportStatusFailed ContactImportStatus = "F" ) +type ContactImport struct { + ID ContactImportID `db:"id"` + OrgID OrgID `db:"org_id"` + Status ContactImportStatus `db:"status"` + CreatedByID UserID `db:"created_by_id"` + FinishedOn *time.Time `db:"finished_on"` + + BatchStatuses []string `db:"batch_statuses"` +} + +var loadContactImportSQL = ` +SELECT + i.id AS "id", + i.org_id AS "org_id", + i.status AS "status", + i.created_by_id AS "created_by_id", + i.finished_on AS "finished_on" + array_agg(DISTINCT b.status) AS "batch_statuses" +FROM + contacts_contactimport i +INNER JOIN + contacts_contactimportbatch b ON b.contact_import_id = i.id +WHERE + id = $1 +GROUP BY + c.id` + +// LoadContactImport loads a contact import by ID +func LoadContactImport(ctx context.Context, db Queryer, id ContactImportID) (*ContactImport, error) { + i := &ContactImport{} + err := db.GetContext(ctx, i, loadContactImportSQL, id) + if err != nil { + return nil, errors.Wrapf(err, "error loading contact import id=%d", id) + } + return i, nil +} + +var markContactImportFinishedSQL = ` +UPDATE + contacts_contactimport +SET + status = $2, + finished_on = $3 +WHERE + id = $1 +` + +func (i *ContactImport) MarkFinished(ctx context.Context, db Queryer, status ContactImportStatus) error { + now := dates.Now() + i.Status = status + i.FinishedOn = &now + + _, err := db.ExecContext(ctx, markContactImportFinishedSQL, i.ID, i.Status, i.FinishedOn) + return errors.Wrap(err, "error marking import as finished") +} + // ContactImportBatch is a batch of contacts within a larger import type ContactImportBatch struct { ID ContactImportBatchID `db:"id"` diff --git a/core/models/notifications.go b/core/models/notifications.go index 1e2f27589..181a0d60b 100644 --- a/core/models/notifications.go +++ b/core/models/notifications.go @@ -2,6 +2,7 @@ package models import ( "context" + "fmt" "time" "github.com/nyaruka/mailroom/utils/dbutil" @@ -34,6 +35,18 @@ type Notification struct { ContactImportID ContactImportID `db:"contact_import_id"` } +// NotifyImportFinished logs the the finishing of a contact import +func NotifyImportFinished(ctx context.Context, db Queryer, imp *ContactImport) error { + n := &Notification{ + OrgID: imp.OrgID, + Type: NotificationTypeImportFinished, + Scope: fmt.Sprintf("contact:%d", imp.ID), + UserID: imp.CreatedByID, + } + + return insertNotifications(ctx, db, []*Notification{n}) +} + var ticketAssignableToles = []UserRole{UserRoleAdministrator, UserRoleEditor, UserRoleAgent} // NotificationsFromTicketEvents logs the opening of new tickets and notifies all assignable users if tickets is not already assigned diff --git a/core/tasks/contacts/import_contact_batch.go b/core/tasks/contacts/import_contact_batch.go index 16c114a8b..dc0542a74 100644 --- a/core/tasks/contacts/import_contact_batch.go +++ b/core/tasks/contacts/import_contact_batch.go @@ -2,8 +2,10 @@ package contacts import ( "context" + "fmt" "time" + "github.com/gomodule/redigo/redis" "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/core/tasks" "github.com/nyaruka/mailroom/runtime" @@ -34,9 +36,35 @@ func (t *ImportContactBatchTask) Perform(ctx context.Context, rt *runtime.Runtim return errors.Wrapf(err, "unable to load contact import batch with id %d", t.ContactImportBatchID) } - if err := batch.Import(ctx, rt.DB, orgID); err != nil { - return errors.Wrapf(err, "unable to import contact import batch %d", t.ContactImportBatchID) + batchErr := batch.Import(ctx, rt.DB, orgID) + + // decrement the redis key that holds remaining batches to see if the overall import is now finished + rc := rt.RP.Get() + defer rc.Close() + remaining, _ := redis.Int(rc.Do("decr", fmt.Sprintf("contact_import_batches_remaining:%d", batch.ImportID))) + if remaining == 0 { + imp, err := models.LoadContactImport(ctx, rt.DB, batch.ImportID) + if err != nil { + return errors.Wrap(err, "error loading contact import") + } + + // if any batch failed, then import is considered failed + status := models.ContactImportStatusComplete + for _, s := range imp.BatchStatuses { + if models.ContactImportStatus(s) == models.ContactImportStatusFailed { + status = models.ContactImportStatusFailed + break + } + } + + if err := imp.MarkFinished(ctx, rt.DB, status); err != nil { + return errors.Wrap(err, "error marking import as finished") + } + + if err := models.NotifyImportFinished(ctx, rt.DB, imp); err != nil { + return errors.Wrap(err, "error creating import finished notification") + } } - return nil + return errors.Wrapf(batchErr, "unable to import contact import batch %d", t.ContactImportBatchID) } From 76cd86c408493d96226c5ad7ac3a1263bdb3ab57 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 9 Sep 2021 11:34:46 -0500 Subject: [PATCH 2/3] Fix loading of imports, add tests --- core/models/imports.go | 11 +++-- core/models/notifications.go | 9 ++-- .../contacts/import_contact_batch_test.go | 47 +++++++++++++++++-- testsuite/testdata/imports.go | 6 +-- 4 files changed, 56 insertions(+), 17 deletions(-) diff --git a/core/models/imports.go b/core/models/imports.go index 5241e917a..cb0345e4a 100644 --- a/core/models/imports.go +++ b/core/models/imports.go @@ -50,7 +50,8 @@ type ContactImport struct { CreatedByID UserID `db:"created_by_id"` FinishedOn *time.Time `db:"finished_on"` - BatchStatuses []string `db:"batch_statuses"` + // we fetch unique batch statuses concatenated as a string, see https://github.com/jmoiron/sqlx/issues/168 + BatchStatuses string `db:"batch_statuses"` } var loadContactImportSQL = ` @@ -59,16 +60,16 @@ SELECT i.org_id AS "org_id", i.status AS "status", i.created_by_id AS "created_by_id", - i.finished_on AS "finished_on" - array_agg(DISTINCT b.status) AS "batch_statuses" + i.finished_on AS "finished_on", + array_to_string(array_agg(DISTINCT b.status), '') AS "batch_statuses" FROM contacts_contactimport i INNER JOIN contacts_contactimportbatch b ON b.contact_import_id = i.id WHERE - id = $1 + i.id = $1 GROUP BY - c.id` + i.id` // LoadContactImport loads a contact import by ID func LoadContactImport(ctx context.Context, db Queryer, id ContactImportID) (*ContactImport, error) { diff --git a/core/models/notifications.go b/core/models/notifications.go index 181a0d60b..b78282a3a 100644 --- a/core/models/notifications.go +++ b/core/models/notifications.go @@ -38,10 +38,11 @@ type Notification struct { // NotifyImportFinished logs the the finishing of a contact import func NotifyImportFinished(ctx context.Context, db Queryer, imp *ContactImport) error { n := &Notification{ - OrgID: imp.OrgID, - Type: NotificationTypeImportFinished, - Scope: fmt.Sprintf("contact:%d", imp.ID), - UserID: imp.CreatedByID, + OrgID: imp.OrgID, + Type: NotificationTypeImportFinished, + Scope: fmt.Sprintf("contact:%d", imp.ID), + UserID: imp.CreatedByID, + ContactImportID: imp.ID, } return insertNotifications(ctx, db, []*Notification{n}) diff --git a/core/tasks/contacts/import_contact_batch_test.go b/core/tasks/contacts/import_contact_batch_test.go index 77571371d..295593450 100644 --- a/core/tasks/contacts/import_contact_batch_test.go +++ b/core/tasks/contacts/import_contact_batch_test.go @@ -1,7 +1,9 @@ package contacts_test import ( + "fmt" "testing" + "time" _ "github.com/nyaruka/mailroom/core/handlers" "github.com/nyaruka/mailroom/core/tasks/contacts" @@ -12,19 +14,54 @@ import ( ) func TestImportContactBatch(t *testing.T) { - ctx, rt, db, _ := testsuite.Get() + ctx, rt, db, rp := testsuite.Get() + rc := rp.Get() + defer rc.Close() - importID := testdata.InsertContactImport(db, testdata.Org1) - batchID := testdata.InsertContactImportBatch(db, importID, []byte(`[ + start := time.Now() + + defer func() { + db.MustExec(`DELETE FROM contacts_contactgroup_contacts WHERE contact_id > $1`, testdata.Org2Contact.ID) + db.MustExec(`DELETE FROM contacts_contacturn WHERE id > $1`, testdata.Org2Contact.URNID) + db.MustExec(`DELETE FROM contacts_contact WHERE id > $1`, testdata.Org2Contact.ID) + }() + + importID := testdata.InsertContactImport(db, testdata.Org1, testdata.Admin) + batch1ID := testdata.InsertContactImportBatch(db, importID, []byte(`[ {"name": "Norbert", "language": "eng", "urns": ["tel:+16055740001"]}, {"name": "Leah", "urns": ["tel:+16055740002"]} ]`)) + batch2ID := testdata.InsertContactImportBatch(db, importID, []byte(`[ + {"name": "Rowan", "language": "spa", "urns": ["tel:+16055740003"]} + ]`)) - task := &contacts.ImportContactBatchTask{ContactImportBatchID: batchID} + rc.Do("setex", fmt.Sprintf("contact_import_batches_remaining:%d", importID), 10, 2) - err := task.Perform(ctx, rt, testdata.Org1.ID) + // perform first batch task... + task1 := &contacts.ImportContactBatchTask{ContactImportBatchID: batch1ID} + err := task1.Perform(ctx, rt, testdata.Org1.ID) require.NoError(t, err) + // import is still in progress + testsuite.AssertQuery(t, db, `SELECT status FROM contacts_contactimport WHERE id = $1`, importID).Columns(map[string]interface{}{"status": "O"}) + + // perform second batch task... + task2 := &contacts.ImportContactBatchTask{ContactImportBatchID: batch2ID} + err = task2.Perform(ctx, rt, testdata.Org1.ID) + require.NoError(t, err) + + testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contact WHERE created_on > $1`, start).Returns(3) testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Norbert' AND language = 'eng'`).Returns(1) testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Leah' AND language IS NULL`).Returns(1) + testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Rowan' AND language = 'spa'`).Returns(1) + + // import is now complete and there is a notification for the creator + testsuite.AssertQuery(t, db, `SELECT status FROM contacts_contactimport WHERE id = $1`, importID).Columns(map[string]interface{}{"status": "C"}) + testsuite.AssertQuery(t, db, `SELECT org_id, notification_type, scope, user_id FROM notifications_notification WHERE contact_import_id = $1`, importID). + Columns(map[string]interface{}{ + "org_id": int64(testdata.Org1.ID), + "notification_type": "import:finished", + "scope": fmt.Sprintf("contact:%d", importID), + "user_id": int64(testdata.Admin.ID), + }) } diff --git a/testsuite/testdata/imports.go b/testsuite/testdata/imports.go index dae4bc614..2e8610a33 100644 --- a/testsuite/testdata/imports.go +++ b/testsuite/testdata/imports.go @@ -11,10 +11,10 @@ import ( ) // InsertContactImport inserts a contact import -func InsertContactImport(db *sqlx.DB, org *Org) models.ContactImportID { +func InsertContactImport(db *sqlx.DB, org *Org, createdBy *User) models.ContactImportID { var importID models.ContactImportID - must(db.Get(&importID, `INSERT INTO contacts_contactimport(org_id, file, original_filename, headers, mappings, num_records, group_id, started_on, created_on, created_by_id, modified_on, modified_by_id, is_active) - VALUES($1, 'contact_imports/1234.xlsx', 'contacts.xlsx', '{"Name", "URN:Tel"}', '{}', 30, NULL, $2, $2, 1, $2, 1, TRUE) RETURNING id`, org.ID, dates.Now(), + must(db.Get(&importID, `INSERT INTO contacts_contactimport(org_id, file, original_filename, mappings, num_records, group_id, started_on, status, created_on, created_by_id, modified_on, modified_by_id, is_active) + VALUES($1, 'contact_imports/1234.xlsx', 'contacts.xlsx', '{}', 30, NULL, $2, 'O', $2, $3, $2, $3, TRUE) RETURNING id`, org.ID, dates.Now(), createdBy.ID, )) return importID } From 27cfe18eb2d9db9f0df8f072c447057f5dfdb9e2 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 9 Sep 2021 15:13:03 -0500 Subject: [PATCH 3/3] Coverage --- core/models/imports.go | 2 +- core/models/imports_test.go | 40 ++++++++++++++++++++++++------- core/models/notifications_test.go | 24 ++++++++++++++++++- testsuite/testdata/contacts.go | 3 +++ 4 files changed, 58 insertions(+), 11 deletions(-) diff --git a/core/models/imports.go b/core/models/imports.go index cb0345e4a..1f7df13ae 100644 --- a/core/models/imports.go +++ b/core/models/imports.go @@ -64,7 +64,7 @@ SELECT array_to_string(array_agg(DISTINCT b.status), '') AS "batch_statuses" FROM contacts_contactimport i -INNER JOIN +LEFT OUTER JOIN contacts_contactimportbatch b ON b.contact_import_id = i.id WHERE i.id = $1 diff --git a/core/models/imports_test.go b/core/models/imports_test.go index 1f7974a6c..5083341cc 100644 --- a/core/models/imports_test.go +++ b/core/models/imports_test.go @@ -146,28 +146,50 @@ func TestContactImports(t *testing.T) { } } -func TestContactImportBatch(t *testing.T) { +func TestLoadContactImport(t *testing.T) { ctx, _, db, _ := testsuite.Get() + defer testdata.ResetContactData(db) + importID := testdata.InsertContactImport(db, testdata.Org1, testdata.Admin) - batchID := testdata.InsertContactImportBatch(db, importID, []byte(`[ + batch1ID := testdata.InsertContactImportBatch(db, importID, []byte(`[ {"name": "Norbert", "language": "eng", "urns": ["tel:+16055740001"]}, {"name": "Leah", "urns": ["tel:+16055740002"]} ]`)) + testdata.InsertContactImportBatch(db, importID, []byte(`[ + {"name": "Rowan", "language": "spa", "urns": ["tel:+16055740003"]} + ]`)) + + imp, err := models.LoadContactImport(ctx, db, importID) + require.NoError(t, err) - batch, err := models.LoadContactImportBatch(ctx, db, batchID) + assert.Equal(t, testdata.Org1.ID, imp.OrgID) + assert.Equal(t, testdata.Admin.ID, imp.CreatedByID) + assert.Equal(t, models.ContactImportStatusProcessing, imp.Status) + assert.Nil(t, imp.FinishedOn) + assert.Equal(t, "P", imp.BatchStatuses) + + batch1, err := models.LoadContactImportBatch(ctx, db, batch1ID) require.NoError(t, err) - assert.Equal(t, importID, batch.ImportID) - assert.Equal(t, models.ContactImportStatus("P"), batch.Status) - assert.NotNil(t, batch.Specs) - assert.Equal(t, 0, batch.RecordStart) - assert.Equal(t, 2, batch.RecordEnd) + assert.Equal(t, importID, batch1.ImportID) + assert.Equal(t, models.ContactImportStatusPending, batch1.Status) + assert.NotNil(t, batch1.Specs) + assert.Equal(t, 0, batch1.RecordStart) + assert.Equal(t, 2, batch1.RecordEnd) + + err = batch1.Import(ctx, db, testdata.Org1.ID) + require.NoError(t, err) - err = batch.Import(ctx, db, testdata.Org1.ID) + imp, err = models.LoadContactImport(ctx, db, importID) require.NoError(t, err) + batchStatuses := strings.Split(imp.BatchStatuses, "") + sort.Strings(batchStatuses) + assert.Equal(t, []string{"C", "P"}, batchStatuses) + testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contactimportbatch WHERE status = 'C' AND finished_on IS NOT NULL`).Returns(1) + testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contactimportbatch WHERE status = 'P' AND finished_on IS NULL`).Returns(1) } func TestContactSpecUnmarshal(t *testing.T) { diff --git a/core/models/notifications_test.go b/core/models/notifications_test.go index fe333c0f8..a4b28d5ae 100644 --- a/core/models/notifications_test.go +++ b/core/models/notifications_test.go @@ -16,7 +16,7 @@ import ( func TestTicketNotifications(t *testing.T) { ctx, _, db, _ := testsuite.Get() - defer deleteTickets(db) + defer testdata.ResetContactData(db) oa, err := models.GetOrgAssets(ctx, db, testdata.Org1.ID) require.NoError(t, err) @@ -105,6 +105,28 @@ func TestTicketNotifications(t *testing.T) { assertNotifications(t, ctx, db, t5, map[*testdata.User][]models.NotificationType{}) } +func TestImportNotifications(t *testing.T) { + ctx, _, db, _ := testsuite.Get() + + defer testdata.ResetContactData(db) + + importID := testdata.InsertContactImport(db, testdata.Org1, testdata.Editor) + imp, err := models.LoadContactImport(ctx, db, importID) + require.NoError(t, err) + + err = imp.MarkFinished(ctx, db, models.ContactImportStatusComplete) + require.NoError(t, err) + + t0 := time.Now() + + err = models.NotifyImportFinished(ctx, db, imp) + require.NoError(t, err) + + assertNotifications(t, ctx, db, t0, map[*testdata.User][]models.NotificationType{ + testdata.Editor: {models.NotificationTypeImportFinished}, + }) +} + func assertNotifications(t *testing.T, ctx context.Context, db *sqlx.DB, after time.Time, expected map[*testdata.User][]models.NotificationType) { // check last log var notifications []*models.Notification diff --git a/testsuite/testdata/contacts.go b/testsuite/testdata/contacts.go index 6faae8455..08578859b 100644 --- a/testsuite/testdata/contacts.go +++ b/testsuite/testdata/contacts.go @@ -86,10 +86,13 @@ func InsertContactURN(db *sqlx.DB, org *Org, contact *Contact, urn urns.URN, pri // ResetContactData removes contact data not in the test database dump. Note that this function can't // undo changes made to the contact data in the test database dump. func ResetContactData(db *sqlx.DB) { + db.MustExec(`DELETE FROM notifications_notification`) db.MustExec(`DELETE FROM tickets_ticketevent`) db.MustExec(`DELETE FROM tickets_ticket`) db.MustExec(`DELETE FROM channels_channelcount`) db.MustExec(`DELETE FROM msgs_msg`) + db.MustExec(`DELETE FROM contacts_contactimportbatch`) + db.MustExec(`DELETE FROM contacts_contactimport`) db.MustExec(`DELETE FROM contacts_contacturn WHERE id >= 30000`) db.MustExec(`DELETE FROM contacts_contactgroup_contacts WHERE contact_id >= 30000`) db.MustExec(`DELETE FROM contacts_contact WHERE id >= 30000`)