Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Sep 23, 2020
1 parent ea6d738 commit 7659543
Show file tree
Hide file tree
Showing 10 changed files with 370 additions and 30 deletions.
1 change: 1 addition & 0 deletions cmd/mailroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
_ "github.com/nyaruka/mailroom/services/tickets/zendesk"
_ "github.com/nyaruka/mailroom/tasks/broadcasts"
_ "github.com/nyaruka/mailroom/tasks/campaigns"
_ "github.com/nyaruka/mailroom/tasks/contacts"
_ "github.com/nyaruka/mailroom/tasks/expirations"
_ "github.com/nyaruka/mailroom/tasks/groups"
_ "github.com/nyaruka/mailroom/tasks/interrupts"
Expand Down
Binary file modified mailroom_test.dump
Binary file not shown.
176 changes: 176 additions & 0 deletions models/imports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package models

import (
"context"
"encoding/json"
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/flows"
"github.com/pkg/errors"

"github.com/jmoiron/sqlx"
)

// ContactImportID is the type for contact import IDs
type ContactImportID int64

// ContactImportBatchID is the type for contact import batch IDs
type ContactImportBatchID int64

// ContactImportStatus is the status of an import
type ContactImportStatus string

// import status constants
const (
ContactImportStatusPending ContactImportStatus = "P"
ContactImportStatusProcessing ContactImportStatus = "O"
ContactImportStatusComplete ContactImportStatus = "C"
ContactImportStatusFailed ContactImportStatus = "F"
)

// ContactImportBatch is a batch of contacts within a larger import
type ContactImportBatch struct {
ID ContactImportBatchID `db:"id"`
ImportID ContactImportID `db:"contact_import_id"`
Status ContactImportStatus `db:"status"`
Specs json.RawMessage `db:"specs"`

// the range of records from the entire import contained in this batch
RecordStart int `db:"record_start"`
RecordEnd int `db:"record_end"`

// results written after processing this batch
NumCreated int `db:"num_created"`
NumUpdated int `db:"num_updated"`
NumErrored int `db:"num_errored"`
Errors json.RawMessage `db:"errors"`
FinishedOn *time.Time `db:"finished_on"`
}

// Import does the actual import of this batch
func (b *ContactImportBatch) Import(ctx context.Context, db *sqlx.DB) error {
// if any error occurs this batch should be marked as failed
if err := b.tryImport(ctx, db); err != nil {
b.markFailed(ctx, db)
return err
}
return nil
}

func (b *ContactImportBatch) tryImport(ctx context.Context, db *sqlx.DB) error {
if err := b.markProcessing(ctx, db); err != nil {
return errors.Wrap(err, "unable to mark as processing")
}

// unmarshal this batch's specs
var specs []*ContactSpec
if err := jsonx.Unmarshal(b.Specs, &specs); err != nil {
return errors.New("unable to unmarsal specs")
}

result, err := importContactSpecs(ctx, db, specs)
if err != nil {
return errors.Wrap(err, "unable to import specs")
}

if err := b.markComplete(ctx, db, result); err != nil {
return errors.Wrap(err, "unable to mark as complete")
}

return nil
}

func (b *ContactImportBatch) markProcessing(ctx context.Context, db *sqlx.DB) error {
b.Status = ContactImportStatusProcessing
_, err := db.ExecContext(ctx, `UPDATE contacts_contactimportbatch SET status = $2 WHERE id = $1`, b.ID, b.Status)
return err
}

func (b *ContactImportBatch) markComplete(ctx context.Context, db *sqlx.DB, r importResult) error {
errorsJSON, err := jsonx.Marshal(r.errors)
if err != nil {
return errors.Wrap(err, "error marshaling errors")
}

now := dates.Now()
b.Status = ContactImportStatusComplete
b.NumCreated = r.numCreated
b.NumUpdated = r.numUpdated
b.NumErrored = r.numErrored
b.Errors = errorsJSON
b.FinishedOn = &now
_, err = db.ExecContext(ctx,
`UPDATE contacts_contactimportbatch SET status = $2, num_created = $3, num_updated = $4, num_errored = $5, errors = $6, finished_on = $7 WHERE id = $1`,
b.ID, b.Status, b.NumCreated, b.NumUpdated, b.NumErrored, b.Errors, b.FinishedOn,
)
return err
}

func (b *ContactImportBatch) markFailed(ctx context.Context, db *sqlx.DB) error {
now := dates.Now()
b.Status = ContactImportStatusFailed
b.FinishedOn = &now
_, err := db.ExecContext(ctx, `UPDATE contacts_contactimportbatch SET status = $2, finished_on = $3 WHERE id = $1`, b.ID, b.Status, b.FinishedOn)
return err
}

var loadContactImportBatchSQL = `
SELECT
id,
contact_import_id,
status,
specs,
record_start,
record_end
FROM
contacts_contactimportbatch
WHERE
id = $1`

// LoadContactImportBatch loads a contact import batch by ID
func LoadContactImportBatch(ctx context.Context, db Queryer, id ContactImportBatchID) (*ContactImportBatch, error) {
b := &ContactImportBatch{}
err := db.GetContext(ctx, b, loadContactImportBatchSQL, id)
if err != nil {
return nil, err
}
return b, nil
}

// ContactSpec describes a contact to be updated or created
type ContactSpec struct {
UUID flows.ContactUUID `json:"uuid"`
Name *string `json:"name"`
Language *string `json:"language"`
URNs []urns.URN `json:"urns"`
Fields map[string]string `json:"fields"`
Groups []assets.GroupUUID `json:"groups"`
}

// an error message associated with a particular record
type importError struct {
Record int `json:"record"`
Message string `json:"message"`
}

// holds the result of importing a set of contact specs
type importResult struct {
numCreated int
numUpdated int
numErrored int
errors []importError
}

func importContactSpecs(ctx context.Context, db *sqlx.DB, specs []*ContactSpec) (importResult, error) {
res := importResult{
errors: make([]importError, 0),
}

// TODO

return res, nil
}
68 changes: 68 additions & 0 deletions models/imports_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package models_test

import (
"encoding/json"
"testing"

"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestContactImportBatch(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()

importID := testdata.InsertContactImport(t, db, models.Org1)
batchID := testdata.InsertContactImportBatch(t, db, importID, `[{"name": "Bob"},{"name": "Jim"}]`)

batch, err := models.LoadContactImportBatch(ctx, db, batchID)
require.NoError(t, err)

assert.Equal(t, importID, batch.ImportID)
assert.Equal(t, models.ContactImportStatus("P"), batch.Status)
assert.Equal(t, json.RawMessage(`[{"name": "Bob"}, {"name": "Jim"}]`), batch.Specs)
assert.Equal(t, 10, batch.RecordStart)
assert.Equal(t, 12, batch.RecordEnd)

err = batch.Import(ctx, db)
require.NoError(t, err)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contactimportbatch WHERE status = 'C' AND finished_on IS NOT NULL`, []interface{}{}, 1)
}

func TestContactSpecUnmarshal(t *testing.T) {
s := &models.ContactSpec{}
jsonx.Unmarshal([]byte(`{}`), s)

assert.Equal(t, flows.ContactUUID(""), s.UUID)
assert.Nil(t, s.Name)
assert.Nil(t, s.Language)
assert.Nil(t, s.URNs)
assert.Nil(t, s.Fields)
assert.Nil(t, s.Groups)

s = &models.ContactSpec{}
jsonx.Unmarshal([]byte(`{
"uuid": "8e879527-7e6d-4bff-abc8-b1d41cd4f702",
"name": "Bob",
"language": "spa",
"urns": ["tel:+1234567890"],
"fields": {"age": "39"},
"groups": ["3972dcc2-6749-4761-a896-7880d6165f2c"]
}`), s)

assert.Equal(t, flows.ContactUUID("8e879527-7e6d-4bff-abc8-b1d41cd4f702"), s.UUID)
assert.Equal(t, "Bob", *s.Name)
assert.Equal(t, "spa", *s.Language)
assert.Equal(t, []urns.URN{"tel:+1234567890"}, s.URNs)
assert.Equal(t, map[string]string{"age": "39"}, s.Fields)
assert.Equal(t, []assets.GroupUUID{"3972dcc2-6749-4761-a896-7880d6165f2c"}, s.Groups)
}
42 changes: 42 additions & 0 deletions tasks/contacts/import_contact_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package contacts

import (
"context"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/tasks"
"github.com/pkg/errors"
)

// TypeImportContactBatch is the type of the import contact batch task
const TypeImportContactBatch = "import_contact_batch"

func init() {
tasks.RegisterType(TypeImportContactBatch, func() tasks.Task { return &ImportContactBatchTask{} })
}

// ImportContactBatchTask is our task to import a batch of contacts
type ImportContactBatchTask struct {
ContactImportBatchID models.ContactImportBatchID `json:"contact_import_batch_id"`
}

// Timeout is the maximum amount of time the task can run for
func (t *ImportContactBatchTask) Timeout() time.Duration {
return time.Minute * 10
}

// Perform figures out the membership for a query based group then repopulates it
func (t *ImportContactBatchTask) Perform(ctx context.Context, mr *mailroom.Mailroom, orgID models.OrgID) error {
batch, err := models.LoadContactImportBatch(ctx, mr.DB, t.ContactImportBatchID)
if err != nil {
return errors.Wrapf(err, "unable to load contact import batch with id %d", t.ContactImportBatchID)
}

if err := batch.Import(ctx, mr.DB); err != nil {
return errors.Wrapf(err, "unable to import contact import batch %d", t.ContactImportBatchID)
}

return nil
}
29 changes: 29 additions & 0 deletions tasks/contacts/import_contact_batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package contacts_test

import (
"testing"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/tasks/contacts"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/stretchr/testify/require"
)

func TestImportContactBatch(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()

mr := &mailroom.Mailroom{Config: config.Mailroom, DB: db, RP: testsuite.RP(), ElasticClient: nil}

importID := testdata.InsertContactImport(t, db, models.Org1)
batchID := testdata.InsertContactImportBatch(t, db, importID, `[{"name": "Bob"},{"name": "Jim"}]`)

task := &contacts.ImportContactBatchTask{ContactImportBatchID: batchID}

err := task.Perform(ctx, mr, models.Org1)
require.NoError(t, err)
}
29 changes: 29 additions & 0 deletions testsuite/testdata/imports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package testdata

import (
"testing"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/mailroom/models"

"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/require"
)

// InsertContactImport inserts a contact import
func InsertContactImport(t *testing.T, db *sqlx.DB, orgID models.OrgID) models.ContactImportID {
var importID models.ContactImportID
err := db.Get(&importID, `INSERT INTO contacts_contactimport(org_id, file, original_filename, headers, mappings, num_records, group_id, started_on, created_on, created_by_id, modified_on, modified_by_id, is_active)
VALUES($1, 'contact_imports/1234.xlsx', 'contacts.xlsx', '{"Name", "URN:Tel"}', '{}', 30, NULL, $2, $2, 1, $2, 1, TRUE) RETURNING id`, models.Org1, dates.Now())
require.NoError(t, err)
return importID
}

// InsertContactImportBatch inserts a contact import batch
func InsertContactImportBatch(t *testing.T, db *sqlx.DB, importID models.ContactImportID, specs string) models.ContactImportBatchID {
var batchID models.ContactImportBatchID
err := db.Get(&batchID, `INSERT INTO contacts_contactimportbatch(contact_import_id, status, specs, record_start, record_end, num_created, num_updated, num_errored, errors, finished_on)
VALUES($1, 'P', $2, 10, 12, 0, 0, 0, '[]', NULL) RETURNING id`, importID, specs)
require.NoError(t, err)
return batchID
}
8 changes: 4 additions & 4 deletions web/contact/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func init() {
// }
//
type createRequest struct {
OrgID models.OrgID `json:"org_id" validate:"required"`
UserID models.UserID `json:"user_id"`
Contact *Spec `json:"contact" validate:"required"`
OrgID models.OrgID `json:"org_id" validate:"required"`
UserID models.UserID `json:"user_id"`
Contact *models.ContactSpec `json:"contact" validate:"required"`
}

// handles a request to create the given contacts
Expand All @@ -52,7 +52,7 @@ func handleCreate(ctx context.Context, s *web.Server, r *http.Request) (interfac
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets")
}

c, err := request.Contact.Validate(oa.Env(), oa.SessionAssets())
c, err := SpecToCreation(request.Contact, oa.Env(), oa.SessionAssets())
if err != nil {
return err, http.StatusBadRequest, nil
}
Expand Down
Loading

0 comments on commit 7659543

Please sign in to comment.