diff --git a/core/models/imports.go b/core/models/imports.go index 0f6aab119..1f7df13ae 100644 --- a/core/models/imports.go +++ b/core/models/imports.go @@ -43,6 +43,63 @@ const ( ContactImportStatusFailed ContactImportStatus = "F" ) +type ContactImport struct { + ID ContactImportID `db:"id"` + OrgID OrgID `db:"org_id"` + Status ContactImportStatus `db:"status"` + CreatedByID UserID `db:"created_by_id"` + FinishedOn *time.Time `db:"finished_on"` + + // we fetch unique batch statuses concatenated as a string, see https://github.com/jmoiron/sqlx/issues/168 + BatchStatuses string `db:"batch_statuses"` +} + +var loadContactImportSQL = ` +SELECT + i.id AS "id", + i.org_id AS "org_id", + i.status AS "status", + i.created_by_id AS "created_by_id", + i.finished_on AS "finished_on", + array_to_string(array_agg(DISTINCT b.status), '') AS "batch_statuses" +FROM + contacts_contactimport i +LEFT OUTER JOIN + contacts_contactimportbatch b ON b.contact_import_id = i.id +WHERE + i.id = $1 +GROUP BY + i.id` + +// LoadContactImport loads a contact import by ID +func LoadContactImport(ctx context.Context, db Queryer, id ContactImportID) (*ContactImport, error) { + i := &ContactImport{} + err := db.GetContext(ctx, i, loadContactImportSQL, id) + if err != nil { + return nil, errors.Wrapf(err, "error loading contact import id=%d", id) + } + return i, nil +} + +var markContactImportFinishedSQL = ` +UPDATE + contacts_contactimport +SET + status = $2, + finished_on = $3 +WHERE + id = $1 +` + +func (i *ContactImport) MarkFinished(ctx context.Context, db Queryer, status ContactImportStatus) error { + now := dates.Now() + i.Status = status + i.FinishedOn = &now + + _, err := db.ExecContext(ctx, markContactImportFinishedSQL, i.ID, i.Status, i.FinishedOn) + return errors.Wrap(err, "error marking import as finished") +} + // ContactImportBatch is a batch of contacts within a larger import type ContactImportBatch struct { ID ContactImportBatchID `db:"id"` diff --git a/core/models/imports_test.go b/core/models/imports_test.go index 1f7974a6c..5083341cc 100644 --- a/core/models/imports_test.go +++ b/core/models/imports_test.go @@ -146,28 +146,50 @@ func TestContactImports(t *testing.T) { } } -func TestContactImportBatch(t *testing.T) { +func TestLoadContactImport(t *testing.T) { ctx, _, db, _ := testsuite.Get() + defer testdata.ResetContactData(db) + importID := testdata.InsertContactImport(db, testdata.Org1, testdata.Admin) - batchID := testdata.InsertContactImportBatch(db, importID, []byte(`[ + batch1ID := testdata.InsertContactImportBatch(db, importID, []byte(`[ {"name": "Norbert", "language": "eng", "urns": ["tel:+16055740001"]}, {"name": "Leah", "urns": ["tel:+16055740002"]} ]`)) + testdata.InsertContactImportBatch(db, importID, []byte(`[ + {"name": "Rowan", "language": "spa", "urns": ["tel:+16055740003"]} + ]`)) + + imp, err := models.LoadContactImport(ctx, db, importID) + require.NoError(t, err) - batch, err := models.LoadContactImportBatch(ctx, db, batchID) + assert.Equal(t, testdata.Org1.ID, imp.OrgID) + assert.Equal(t, testdata.Admin.ID, imp.CreatedByID) + assert.Equal(t, models.ContactImportStatusProcessing, imp.Status) + assert.Nil(t, imp.FinishedOn) + assert.Equal(t, "P", imp.BatchStatuses) + + batch1, err := models.LoadContactImportBatch(ctx, db, batch1ID) require.NoError(t, err) - assert.Equal(t, importID, batch.ImportID) - assert.Equal(t, models.ContactImportStatus("P"), batch.Status) - assert.NotNil(t, batch.Specs) - assert.Equal(t, 0, batch.RecordStart) - assert.Equal(t, 2, batch.RecordEnd) + assert.Equal(t, importID, batch1.ImportID) + assert.Equal(t, models.ContactImportStatusPending, batch1.Status) + assert.NotNil(t, batch1.Specs) + assert.Equal(t, 0, batch1.RecordStart) + assert.Equal(t, 2, batch1.RecordEnd) + + err = batch1.Import(ctx, db, testdata.Org1.ID) + require.NoError(t, err) - err = batch.Import(ctx, db, testdata.Org1.ID) + imp, err = models.LoadContactImport(ctx, db, importID) require.NoError(t, err) + batchStatuses := strings.Split(imp.BatchStatuses, "") + sort.Strings(batchStatuses) + assert.Equal(t, []string{"C", "P"}, batchStatuses) + testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contactimportbatch WHERE status = 'C' AND finished_on IS NOT NULL`).Returns(1) + testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contactimportbatch WHERE status = 'P' AND finished_on IS NULL`).Returns(1) } func TestContactSpecUnmarshal(t *testing.T) { diff --git a/core/models/notifications.go b/core/models/notifications.go index 1e2f27589..b78282a3a 100644 --- a/core/models/notifications.go +++ b/core/models/notifications.go @@ -2,6 +2,7 @@ package models import ( "context" + "fmt" "time" "github.com/nyaruka/mailroom/utils/dbutil" @@ -34,6 +35,19 @@ type Notification struct { ContactImportID ContactImportID `db:"contact_import_id"` } +// NotifyImportFinished logs the the finishing of a contact import +func NotifyImportFinished(ctx context.Context, db Queryer, imp *ContactImport) error { + n := &Notification{ + OrgID: imp.OrgID, + Type: NotificationTypeImportFinished, + Scope: fmt.Sprintf("contact:%d", imp.ID), + UserID: imp.CreatedByID, + ContactImportID: imp.ID, + } + + return insertNotifications(ctx, db, []*Notification{n}) +} + var ticketAssignableToles = []UserRole{UserRoleAdministrator, UserRoleEditor, UserRoleAgent} // NotificationsFromTicketEvents logs the opening of new tickets and notifies all assignable users if tickets is not already assigned diff --git a/core/models/notifications_test.go b/core/models/notifications_test.go index fe333c0f8..a4b28d5ae 100644 --- a/core/models/notifications_test.go +++ b/core/models/notifications_test.go @@ -16,7 +16,7 @@ import ( func TestTicketNotifications(t *testing.T) { ctx, _, db, _ := testsuite.Get() - defer deleteTickets(db) + defer testdata.ResetContactData(db) oa, err := models.GetOrgAssets(ctx, db, testdata.Org1.ID) require.NoError(t, err) @@ -105,6 +105,28 @@ func TestTicketNotifications(t *testing.T) { assertNotifications(t, ctx, db, t5, map[*testdata.User][]models.NotificationType{}) } +func TestImportNotifications(t *testing.T) { + ctx, _, db, _ := testsuite.Get() + + defer testdata.ResetContactData(db) + + importID := testdata.InsertContactImport(db, testdata.Org1, testdata.Editor) + imp, err := models.LoadContactImport(ctx, db, importID) + require.NoError(t, err) + + err = imp.MarkFinished(ctx, db, models.ContactImportStatusComplete) + require.NoError(t, err) + + t0 := time.Now() + + err = models.NotifyImportFinished(ctx, db, imp) + require.NoError(t, err) + + assertNotifications(t, ctx, db, t0, map[*testdata.User][]models.NotificationType{ + testdata.Editor: {models.NotificationTypeImportFinished}, + }) +} + func assertNotifications(t *testing.T, ctx context.Context, db *sqlx.DB, after time.Time, expected map[*testdata.User][]models.NotificationType) { // check last log var notifications []*models.Notification diff --git a/core/tasks/contacts/import_contact_batch.go b/core/tasks/contacts/import_contact_batch.go index 16c114a8b..dc0542a74 100644 --- a/core/tasks/contacts/import_contact_batch.go +++ b/core/tasks/contacts/import_contact_batch.go @@ -2,8 +2,10 @@ package contacts import ( "context" + "fmt" "time" + "github.com/gomodule/redigo/redis" "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/core/tasks" "github.com/nyaruka/mailroom/runtime" @@ -34,9 +36,35 @@ func (t *ImportContactBatchTask) Perform(ctx context.Context, rt *runtime.Runtim return errors.Wrapf(err, "unable to load contact import batch with id %d", t.ContactImportBatchID) } - if err := batch.Import(ctx, rt.DB, orgID); err != nil { - return errors.Wrapf(err, "unable to import contact import batch %d", t.ContactImportBatchID) + batchErr := batch.Import(ctx, rt.DB, orgID) + + // decrement the redis key that holds remaining batches to see if the overall import is now finished + rc := rt.RP.Get() + defer rc.Close() + remaining, _ := redis.Int(rc.Do("decr", fmt.Sprintf("contact_import_batches_remaining:%d", batch.ImportID))) + if remaining == 0 { + imp, err := models.LoadContactImport(ctx, rt.DB, batch.ImportID) + if err != nil { + return errors.Wrap(err, "error loading contact import") + } + + // if any batch failed, then import is considered failed + status := models.ContactImportStatusComplete + for _, s := range imp.BatchStatuses { + if models.ContactImportStatus(s) == models.ContactImportStatusFailed { + status = models.ContactImportStatusFailed + break + } + } + + if err := imp.MarkFinished(ctx, rt.DB, status); err != nil { + return errors.Wrap(err, "error marking import as finished") + } + + if err := models.NotifyImportFinished(ctx, rt.DB, imp); err != nil { + return errors.Wrap(err, "error creating import finished notification") + } } - return nil + return errors.Wrapf(batchErr, "unable to import contact import batch %d", t.ContactImportBatchID) } diff --git a/core/tasks/contacts/import_contact_batch_test.go b/core/tasks/contacts/import_contact_batch_test.go index d7e969ab4..04aae0e14 100644 --- a/core/tasks/contacts/import_contact_batch_test.go +++ b/core/tasks/contacts/import_contact_batch_test.go @@ -1,6 +1,7 @@ package contacts_test import ( + "fmt" "testing" _ "github.com/nyaruka/mailroom/core/handlers" @@ -12,19 +13,48 @@ import ( ) func TestImportContactBatch(t *testing.T) { - ctx, rt, db, _ := testsuite.Get() + ctx, rt, db, rp := testsuite.Get() + rc := rp.Get() + defer rc.Close() + + defer testdata.ResetContactData(db) importID := testdata.InsertContactImport(db, testdata.Org1, testdata.Admin) - batchID := testdata.InsertContactImportBatch(db, importID, []byte(`[ + batch1ID := testdata.InsertContactImportBatch(db, importID, []byte(`[ {"name": "Norbert", "language": "eng", "urns": ["tel:+16055740001"]}, {"name": "Leah", "urns": ["tel:+16055740002"]} ]`)) + batch2ID := testdata.InsertContactImportBatch(db, importID, []byte(`[ + {"name": "Rowan", "language": "spa", "urns": ["tel:+16055740003"]} + ]`)) + + rc.Do("setex", fmt.Sprintf("contact_import_batches_remaining:%d", importID), 10, 2) + + // perform first batch task... + task1 := &contacts.ImportContactBatchTask{ContactImportBatchID: batch1ID} + err := task1.Perform(ctx, rt, testdata.Org1.ID) + require.NoError(t, err) - task := &contacts.ImportContactBatchTask{ContactImportBatchID: batchID} + // import is still in progress + testsuite.AssertQuery(t, db, `SELECT status FROM contacts_contactimport WHERE id = $1`, importID).Columns(map[string]interface{}{"status": "O"}) - err := task.Perform(ctx, rt, testdata.Org1.ID) + // perform second batch task... + task2 := &contacts.ImportContactBatchTask{ContactImportBatchID: batch2ID} + err = task2.Perform(ctx, rt, testdata.Org1.ID) require.NoError(t, err) + testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contact WHERE id >= 30000`).Returns(3) testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Norbert' AND language = 'eng'`).Returns(1) testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Leah' AND language IS NULL`).Returns(1) + testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Rowan' AND language = 'spa'`).Returns(1) + + // import is now complete and there is a notification for the creator + testsuite.AssertQuery(t, db, `SELECT status FROM contacts_contactimport WHERE id = $1`, importID).Columns(map[string]interface{}{"status": "C"}) + testsuite.AssertQuery(t, db, `SELECT org_id, notification_type, scope, user_id FROM notifications_notification WHERE contact_import_id = $1`, importID). + Columns(map[string]interface{}{ + "org_id": int64(testdata.Org1.ID), + "notification_type": "import:finished", + "scope": fmt.Sprintf("contact:%d", importID), + "user_id": int64(testdata.Admin.ID), + }) } diff --git a/testsuite/testdata/contacts.go b/testsuite/testdata/contacts.go index 6faae8455..08578859b 100644 --- a/testsuite/testdata/contacts.go +++ b/testsuite/testdata/contacts.go @@ -86,10 +86,13 @@ func InsertContactURN(db *sqlx.DB, org *Org, contact *Contact, urn urns.URN, pri // ResetContactData removes contact data not in the test database dump. Note that this function can't // undo changes made to the contact data in the test database dump. func ResetContactData(db *sqlx.DB) { + db.MustExec(`DELETE FROM notifications_notification`) db.MustExec(`DELETE FROM tickets_ticketevent`) db.MustExec(`DELETE FROM tickets_ticket`) db.MustExec(`DELETE FROM channels_channelcount`) db.MustExec(`DELETE FROM msgs_msg`) + db.MustExec(`DELETE FROM contacts_contactimportbatch`) + db.MustExec(`DELETE FROM contacts_contactimport`) db.MustExec(`DELETE FROM contacts_contacturn WHERE id >= 30000`) db.MustExec(`DELETE FROM contacts_contactgroup_contacts WHERE contact_id >= 30000`) db.MustExec(`DELETE FROM contacts_contact WHERE id >= 30000`)