Skip to content

Commit

Permalink
Merge pull request rapidpro#493 from nyaruka/import_notifications
Browse files Browse the repository at this point in the history
Contact Import Notifications
  • Loading branch information
rowanseymour authored Sep 9, 2021
2 parents 2baf4a0 + 27cfe18 commit 98b1852
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 17 deletions.
57 changes: 57 additions & 0 deletions core/models/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,63 @@ const (
ContactImportStatusFailed ContactImportStatus = "F"
)

type ContactImport struct {
ID ContactImportID `db:"id"`
OrgID OrgID `db:"org_id"`
Status ContactImportStatus `db:"status"`
CreatedByID UserID `db:"created_by_id"`
FinishedOn *time.Time `db:"finished_on"`

// we fetch unique batch statuses concatenated as a string, see https://github.com/jmoiron/sqlx/issues/168
BatchStatuses string `db:"batch_statuses"`
}

var loadContactImportSQL = `
SELECT
i.id AS "id",
i.org_id AS "org_id",
i.status AS "status",
i.created_by_id AS "created_by_id",
i.finished_on AS "finished_on",
array_to_string(array_agg(DISTINCT b.status), '') AS "batch_statuses"
FROM
contacts_contactimport i
LEFT OUTER JOIN
contacts_contactimportbatch b ON b.contact_import_id = i.id
WHERE
i.id = $1
GROUP BY
i.id`

// LoadContactImport loads a contact import by ID
func LoadContactImport(ctx context.Context, db Queryer, id ContactImportID) (*ContactImport, error) {
i := &ContactImport{}
err := db.GetContext(ctx, i, loadContactImportSQL, id)
if err != nil {
return nil, errors.Wrapf(err, "error loading contact import id=%d", id)
}
return i, nil
}

var markContactImportFinishedSQL = `
UPDATE
contacts_contactimport
SET
status = $2,
finished_on = $3
WHERE
id = $1
`

func (i *ContactImport) MarkFinished(ctx context.Context, db Queryer, status ContactImportStatus) error {
now := dates.Now()
i.Status = status
i.FinishedOn = &now

_, err := db.ExecContext(ctx, markContactImportFinishedSQL, i.ID, i.Status, i.FinishedOn)
return errors.Wrap(err, "error marking import as finished")
}

// ContactImportBatch is a batch of contacts within a larger import
type ContactImportBatch struct {
ID ContactImportBatchID `db:"id"`
Expand Down
40 changes: 31 additions & 9 deletions core/models/imports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,28 +146,50 @@ func TestContactImports(t *testing.T) {
}
}

func TestContactImportBatch(t *testing.T) {
func TestLoadContactImport(t *testing.T) {
ctx, _, db, _ := testsuite.Get()

defer testdata.ResetContactData(db)

importID := testdata.InsertContactImport(db, testdata.Org1, testdata.Admin)
batchID := testdata.InsertContactImportBatch(db, importID, []byte(`[
batch1ID := testdata.InsertContactImportBatch(db, importID, []byte(`[
{"name": "Norbert", "language": "eng", "urns": ["tel:+16055740001"]},
{"name": "Leah", "urns": ["tel:+16055740002"]}
]`))
testdata.InsertContactImportBatch(db, importID, []byte(`[
{"name": "Rowan", "language": "spa", "urns": ["tel:+16055740003"]}
]`))

imp, err := models.LoadContactImport(ctx, db, importID)
require.NoError(t, err)

batch, err := models.LoadContactImportBatch(ctx, db, batchID)
assert.Equal(t, testdata.Org1.ID, imp.OrgID)
assert.Equal(t, testdata.Admin.ID, imp.CreatedByID)
assert.Equal(t, models.ContactImportStatusProcessing, imp.Status)
assert.Nil(t, imp.FinishedOn)
assert.Equal(t, "P", imp.BatchStatuses)

batch1, err := models.LoadContactImportBatch(ctx, db, batch1ID)
require.NoError(t, err)

assert.Equal(t, importID, batch.ImportID)
assert.Equal(t, models.ContactImportStatus("P"), batch.Status)
assert.NotNil(t, batch.Specs)
assert.Equal(t, 0, batch.RecordStart)
assert.Equal(t, 2, batch.RecordEnd)
assert.Equal(t, importID, batch1.ImportID)
assert.Equal(t, models.ContactImportStatusPending, batch1.Status)
assert.NotNil(t, batch1.Specs)
assert.Equal(t, 0, batch1.RecordStart)
assert.Equal(t, 2, batch1.RecordEnd)

err = batch1.Import(ctx, db, testdata.Org1.ID)
require.NoError(t, err)

err = batch.Import(ctx, db, testdata.Org1.ID)
imp, err = models.LoadContactImport(ctx, db, importID)
require.NoError(t, err)

batchStatuses := strings.Split(imp.BatchStatuses, "")
sort.Strings(batchStatuses)
assert.Equal(t, []string{"C", "P"}, batchStatuses)

testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contactimportbatch WHERE status = 'C' AND finished_on IS NOT NULL`).Returns(1)
testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contactimportbatch WHERE status = 'P' AND finished_on IS NULL`).Returns(1)
}

func TestContactSpecUnmarshal(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions core/models/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package models

import (
"context"
"fmt"
"time"

"github.com/nyaruka/mailroom/utils/dbutil"
Expand Down Expand Up @@ -34,6 +35,19 @@ type Notification struct {
ContactImportID ContactImportID `db:"contact_import_id"`
}

// NotifyImportFinished logs the the finishing of a contact import
func NotifyImportFinished(ctx context.Context, db Queryer, imp *ContactImport) error {
n := &Notification{
OrgID: imp.OrgID,
Type: NotificationTypeImportFinished,
Scope: fmt.Sprintf("contact:%d", imp.ID),
UserID: imp.CreatedByID,
ContactImportID: imp.ID,
}

return insertNotifications(ctx, db, []*Notification{n})
}

var ticketAssignableToles = []UserRole{UserRoleAdministrator, UserRoleEditor, UserRoleAgent}

// NotificationsFromTicketEvents logs the opening of new tickets and notifies all assignable users if tickets is not already assigned
Expand Down
24 changes: 23 additions & 1 deletion core/models/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestTicketNotifications(t *testing.T) {
ctx, _, db, _ := testsuite.Get()

defer deleteTickets(db)
defer testdata.ResetContactData(db)

oa, err := models.GetOrgAssets(ctx, db, testdata.Org1.ID)
require.NoError(t, err)
Expand Down Expand Up @@ -105,6 +105,28 @@ func TestTicketNotifications(t *testing.T) {
assertNotifications(t, ctx, db, t5, map[*testdata.User][]models.NotificationType{})
}

func TestImportNotifications(t *testing.T) {
ctx, _, db, _ := testsuite.Get()

defer testdata.ResetContactData(db)

importID := testdata.InsertContactImport(db, testdata.Org1, testdata.Editor)
imp, err := models.LoadContactImport(ctx, db, importID)
require.NoError(t, err)

err = imp.MarkFinished(ctx, db, models.ContactImportStatusComplete)
require.NoError(t, err)

t0 := time.Now()

err = models.NotifyImportFinished(ctx, db, imp)
require.NoError(t, err)

assertNotifications(t, ctx, db, t0, map[*testdata.User][]models.NotificationType{
testdata.Editor: {models.NotificationTypeImportFinished},
})
}

func assertNotifications(t *testing.T, ctx context.Context, db *sqlx.DB, after time.Time, expected map[*testdata.User][]models.NotificationType) {
// check last log
var notifications []*models.Notification
Expand Down
34 changes: 31 additions & 3 deletions core/tasks/contacts/import_contact_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package contacts

import (
"context"
"fmt"
"time"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/runtime"
Expand Down Expand Up @@ -34,9 +36,35 @@ func (t *ImportContactBatchTask) Perform(ctx context.Context, rt *runtime.Runtim
return errors.Wrapf(err, "unable to load contact import batch with id %d", t.ContactImportBatchID)
}

if err := batch.Import(ctx, rt.DB, orgID); err != nil {
return errors.Wrapf(err, "unable to import contact import batch %d", t.ContactImportBatchID)
batchErr := batch.Import(ctx, rt.DB, orgID)

// decrement the redis key that holds remaining batches to see if the overall import is now finished
rc := rt.RP.Get()
defer rc.Close()
remaining, _ := redis.Int(rc.Do("decr", fmt.Sprintf("contact_import_batches_remaining:%d", batch.ImportID)))
if remaining == 0 {
imp, err := models.LoadContactImport(ctx, rt.DB, batch.ImportID)
if err != nil {
return errors.Wrap(err, "error loading contact import")
}

// if any batch failed, then import is considered failed
status := models.ContactImportStatusComplete
for _, s := range imp.BatchStatuses {
if models.ContactImportStatus(s) == models.ContactImportStatusFailed {
status = models.ContactImportStatusFailed
break
}
}

if err := imp.MarkFinished(ctx, rt.DB, status); err != nil {
return errors.Wrap(err, "error marking import as finished")
}

if err := models.NotifyImportFinished(ctx, rt.DB, imp); err != nil {
return errors.Wrap(err, "error creating import finished notification")
}
}

return nil
return errors.Wrapf(batchErr, "unable to import contact import batch %d", t.ContactImportBatchID)
}
38 changes: 34 additions & 4 deletions core/tasks/contacts/import_contact_batch_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package contacts_test

import (
"fmt"
"testing"

_ "github.com/nyaruka/mailroom/core/handlers"
Expand All @@ -12,19 +13,48 @@ import (
)

func TestImportContactBatch(t *testing.T) {
ctx, rt, db, _ := testsuite.Get()
ctx, rt, db, rp := testsuite.Get()
rc := rp.Get()
defer rc.Close()

defer testdata.ResetContactData(db)

importID := testdata.InsertContactImport(db, testdata.Org1, testdata.Admin)
batchID := testdata.InsertContactImportBatch(db, importID, []byte(`[
batch1ID := testdata.InsertContactImportBatch(db, importID, []byte(`[
{"name": "Norbert", "language": "eng", "urns": ["tel:+16055740001"]},
{"name": "Leah", "urns": ["tel:+16055740002"]}
]`))
batch2ID := testdata.InsertContactImportBatch(db, importID, []byte(`[
{"name": "Rowan", "language": "spa", "urns": ["tel:+16055740003"]}
]`))

rc.Do("setex", fmt.Sprintf("contact_import_batches_remaining:%d", importID), 10, 2)

// perform first batch task...
task1 := &contacts.ImportContactBatchTask{ContactImportBatchID: batch1ID}
err := task1.Perform(ctx, rt, testdata.Org1.ID)
require.NoError(t, err)

task := &contacts.ImportContactBatchTask{ContactImportBatchID: batchID}
// import is still in progress
testsuite.AssertQuery(t, db, `SELECT status FROM contacts_contactimport WHERE id = $1`, importID).Columns(map[string]interface{}{"status": "O"})

err := task.Perform(ctx, rt, testdata.Org1.ID)
// perform second batch task...
task2 := &contacts.ImportContactBatchTask{ContactImportBatchID: batch2ID}
err = task2.Perform(ctx, rt, testdata.Org1.ID)
require.NoError(t, err)

testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contact WHERE id >= 30000`).Returns(3)
testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Norbert' AND language = 'eng'`).Returns(1)
testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Leah' AND language IS NULL`).Returns(1)
testsuite.AssertQuery(t, db, `SELECT count(*) FROM contacts_contact WHERE name = 'Rowan' AND language = 'spa'`).Returns(1)

// import is now complete and there is a notification for the creator
testsuite.AssertQuery(t, db, `SELECT status FROM contacts_contactimport WHERE id = $1`, importID).Columns(map[string]interface{}{"status": "C"})
testsuite.AssertQuery(t, db, `SELECT org_id, notification_type, scope, user_id FROM notifications_notification WHERE contact_import_id = $1`, importID).
Columns(map[string]interface{}{
"org_id": int64(testdata.Org1.ID),
"notification_type": "import:finished",
"scope": fmt.Sprintf("contact:%d", importID),
"user_id": int64(testdata.Admin.ID),
})
}
3 changes: 3 additions & 0 deletions testsuite/testdata/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,13 @@ func InsertContactURN(db *sqlx.DB, org *Org, contact *Contact, urn urns.URN, pri
// ResetContactData removes contact data not in the test database dump. Note that this function can't
// undo changes made to the contact data in the test database dump.
func ResetContactData(db *sqlx.DB) {
db.MustExec(`DELETE FROM notifications_notification`)
db.MustExec(`DELETE FROM tickets_ticketevent`)
db.MustExec(`DELETE FROM tickets_ticket`)
db.MustExec(`DELETE FROM channels_channelcount`)
db.MustExec(`DELETE FROM msgs_msg`)
db.MustExec(`DELETE FROM contacts_contactimportbatch`)
db.MustExec(`DELETE FROM contacts_contactimport`)
db.MustExec(`DELETE FROM contacts_contacturn WHERE id >= 30000`)
db.MustExec(`DELETE FROM contacts_contactgroup_contacts WHERE contact_id >= 30000`)
db.MustExec(`DELETE FROM contacts_contact WHERE id >= 30000`)
Expand Down

0 comments on commit 98b1852

Please sign in to comment.