Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Sep 8, 2021
1 parent 0b67cac commit 5297467
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 3 deletions.
56 changes: 56 additions & 0 deletions core/models/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,62 @@ const (
ContactImportStatusFailed ContactImportStatus = "F"
)

type ContactImport struct {
ID ContactImportID `db:"id"`
OrgID OrgID `db:"org_id"`
Status ContactImportStatus `db:"status"`
CreatedByID UserID `db:"created_by_id"`
FinishedOn *time.Time `db:"finished_on"`

BatchStatuses []string `db:"batch_statuses"`
}

var loadContactImportSQL = `
SELECT
i.id AS "id",
i.org_id AS "org_id",
i.status AS "status",
i.created_by_id AS "created_by_id",
i.finished_on AS "finished_on"
array_agg(DISTINCT b.status) AS "batch_statuses"
FROM
contacts_contactimport i
INNER JOIN
contacts_contactimportbatch b ON b.contact_import_id = i.id
WHERE
id = $1
GROUP BY
c.id`

// LoadContactImport loads a contact import by ID
func LoadContactImport(ctx context.Context, db Queryer, id ContactImportID) (*ContactImport, error) {
i := &ContactImport{}
err := db.GetContext(ctx, i, loadContactImportSQL, id)
if err != nil {
return nil, errors.Wrapf(err, "error loading contact import id=%d", id)
}
return i, nil
}

var markContactImportFinishedSQL = `
UPDATE
contacts_contactimport
SET
status = $2,
finished_on = $3
WHERE
id = $1
`

func (i *ContactImport) MarkFinished(ctx context.Context, db Queryer, status ContactImportStatus) error {
now := dates.Now()
i.Status = status
i.FinishedOn = &now

_, err := db.ExecContext(ctx, markContactImportFinishedSQL, i.ID, i.Status, i.FinishedOn)
return errors.Wrap(err, "error marking import as finished")
}

// ContactImportBatch is a batch of contacts within a larger import
type ContactImportBatch struct {
ID ContactImportBatchID `db:"id"`
Expand Down
13 changes: 13 additions & 0 deletions core/models/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package models

import (
"context"
"fmt"
"time"

"github.com/nyaruka/mailroom/utils/dbutil"
Expand Down Expand Up @@ -34,6 +35,18 @@ type Notification struct {
ContactImportID ContactImportID `db:"contact_import_id"`
}

// NotifyImportFinished logs the the finishing of a contact import
func NotifyImportFinished(ctx context.Context, db Queryer, imp *ContactImport) error {
n := &Notification{
OrgID: imp.OrgID,
Type: NotificationTypeImportFinished,
Scope: fmt.Sprintf("contact:%d", imp.ID),
UserID: imp.CreatedByID,
}

return insertNotifications(ctx, db, []*Notification{n})
}

var ticketAssignableToles = []UserRole{UserRoleAdministrator, UserRoleEditor, UserRoleAgent}

// NotificationsFromTicketEvents logs the opening of new tickets and notifies all assignable users if tickets is not already assigned
Expand Down
34 changes: 31 additions & 3 deletions core/tasks/contacts/import_contact_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package contacts

import (
"context"
"fmt"
"time"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/runtime"
Expand Down Expand Up @@ -34,9 +36,35 @@ func (t *ImportContactBatchTask) Perform(ctx context.Context, rt *runtime.Runtim
return errors.Wrapf(err, "unable to load contact import batch with id %d", t.ContactImportBatchID)
}

if err := batch.Import(ctx, rt.DB, orgID); err != nil {
return errors.Wrapf(err, "unable to import contact import batch %d", t.ContactImportBatchID)
batchErr := batch.Import(ctx, rt.DB, orgID)

// decrement the redis key that holds remaining batches to see if the overall import is now finished
rc := rt.RP.Get()
defer rc.Close()
remaining, _ := redis.Int(rc.Do("decr", fmt.Sprintf("contact_import_batches_remaining:%d", batch.ImportID)))
if remaining == 0 {
imp, err := models.LoadContactImport(ctx, rt.DB, batch.ImportID)
if err != nil {
return errors.Wrap(err, "error loading contact import")
}

// if any batch failed, then import is considered failed
status := models.ContactImportStatusComplete
for _, s := range imp.BatchStatuses {
if models.ContactImportStatus(s) == models.ContactImportStatusFailed {
status = models.ContactImportStatusFailed
break
}
}

if err := imp.MarkFinished(ctx, rt.DB, status); err != nil {
return errors.Wrap(err, "error marking import as finished")
}

if err := models.NotifyImportFinished(ctx, rt.DB, imp); err != nil {
return errors.Wrap(err, "error creating import finished notification")
}
}

return nil
return errors.Wrapf(batchErr, "unable to import contact import batch %d", t.ContactImportBatchID)
}

0 comments on commit 5297467

Please sign in to comment.