Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

If a flow start task creates new contacts, save those back to the start #362

Merged
merged 1 commit into from
Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,29 @@ func MarkStartComplete(ctx context.Context, db Queryer, startID StartID) error {
}

// MarkStartStarted sets the status for the passed in flow start to S and updates the contact count on it
func MarkStartStarted(ctx context.Context, db Queryer, startID StartID, contactCount int) error {
func MarkStartStarted(ctx context.Context, db Queryer, startID StartID, contactCount int, createdContactIDs []ContactID) error {
_, err := db.ExecContext(ctx, "UPDATE flows_flowstart SET status = 'S', contact_count = $2, modified_on = NOW() WHERE id = $1", startID, contactCount)
if err != nil {
return errors.Wrapf(err, "error setting start as started")
}

// if we created contacts, add them to the start for logging
if len(createdContactIDs) > 0 {
type startContact struct {
StartID StartID `db:"flowstart_id"`
ContactID ContactID `db:"contact_id"`
}

args := make([]interface{}, len(createdContactIDs))
for i, id := range createdContactIDs {
args[i] = &startContact{StartID: startID, ContactID: id}
}
return BulkQuery(
ctx, "adding created contacts to flow start", db,
`INSERT INTO flows_flowstart_contacts(flowstart_id, contact_id) VALUES(:flowstart_id, :contact_id) ON CONFLICT DO NOTHING`,
args,
)
}
return nil
}

Expand Down
50 changes: 36 additions & 14 deletions models/starts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,49 @@ package models_test

import (
"encoding/json"
"fmt"
"testing"

"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStarts(t *testing.T) {
startJSON := []byte(`{
"start_id": 123,
ctx := testsuite.CTX()
db := testsuite.DB()

startID := testdata.InsertFlowStart(t, db, models.Org1, models.SingleMessageFlowID, []models.ContactID{models.CathyID, models.BobID})

startJSON := []byte(fmt.Sprintf(`{
"start_id": %d,
"start_type": "M",
"org_id": 12,
"org_id": %d,
"created_by": "rowan@nyaruka.com",
"flow_id": 234,
"flow_id": %d,
"flow_type": "M",
"contact_ids": [4567, 5678],
"contact_ids": [%d, %d],
"group_ids": [6789],
"urns": ["tel:+12025550199"],
"query": null,
"restart_participants": true,
"include_active": true,
"parent_summary": {"uuid": "b65b1a22-db6d-4f5a-9b3d-7302368a82e6"},
"session_history": {"parent_uuid": "532a3899-492f-4ffe-aed7-e75ad524efab", "ancestors": 3, "ancestors_since_input": 1},
"extra": {"foo": "bar"}
}`)
}`, startID, models.Org1, models.SingleMessageFlowID, models.CathyID, models.BobID))

start := &models.FlowStart{}
err := json.Unmarshal(startJSON, start)

assert.NoError(t, err)
assert.Equal(t, models.StartID(123), start.ID())
assert.Equal(t, models.OrgID(12), start.OrgID())
assert.Equal(t, models.FlowID(234), start.FlowID())
require.NoError(t, err)
assert.Equal(t, startID, start.ID())
assert.Equal(t, models.Org1, start.OrgID())
assert.Equal(t, models.SingleMessageFlowID, start.FlowID())
assert.Equal(t, models.MessagingFlow, start.FlowType())
assert.Equal(t, "", start.Query())
assert.Equal(t, models.DoRestartParticipants, start.RestartParticipants())
Expand All @@ -43,11 +54,17 @@ func TestStarts(t *testing.T) {
assert.Equal(t, json.RawMessage(`{"parent_uuid": "532a3899-492f-4ffe-aed7-e75ad524efab", "ancestors": 3, "ancestors_since_input": 1}`), start.SessionHistory())
assert.Equal(t, json.RawMessage(`{"foo": "bar"}`), start.Extra())

batch := start.CreateBatch([]models.ContactID{4567, 5678}, false, 3)
assert.Equal(t, models.StartID(123), batch.StartID())
err = models.MarkStartStarted(ctx, db, startID, 2, []models.ContactID{models.GeorgeID})
require.NoError(t, err)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowstart WHERE id = $1 AND status = 'S' AND contact_count = 2`, []interface{}{startID}, 1)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowstart_contacts WHERE flowstart_id = $1`, []interface{}{startID}, 3)

batch := start.CreateBatch([]models.ContactID{models.CathyID, models.BobID}, false, 3)
assert.Equal(t, startID, batch.StartID())
assert.Equal(t, models.StartTypeManual, batch.StartType())
assert.Equal(t, models.FlowID(234), batch.FlowID())
assert.Equal(t, []models.ContactID{4567, 5678}, batch.ContactIDs())
assert.Equal(t, models.SingleMessageFlowID, batch.FlowID())
assert.Equal(t, []models.ContactID{models.CathyID, models.BobID}, batch.ContactIDs())
assert.Equal(t, models.DoRestartParticipants, batch.RestartParticipants())
assert.Equal(t, models.DoIncludeActive, batch.IncludeActive())
assert.Equal(t, "rowan@nyaruka.com", batch.CreatedBy())
Expand All @@ -64,4 +81,9 @@ func TestStarts(t *testing.T) {

history, err = models.ReadSessionHistory([]byte(`{`))
assert.EqualError(t, err, "unexpected end of JSON input")

err = models.MarkStartComplete(ctx, db, startID)
require.NoError(t, err)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowstart WHERE id = $1 AND status = 'C'`, []interface{}{startID}, 1)
}
8 changes: 4 additions & 4 deletions runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import (
"testing"
"time"

"github.com/lib/pq"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/resumes"
"github.com/nyaruka/goflow/flows/triggers"
_ "github.com/nyaruka/mailroom/hooks"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/lib/pq"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -103,9 +105,7 @@ func TestBatchStart(t *testing.T) {
rp := testsuite.RP()

// create a start object
db.MustExec(
`INSERT INTO flows_flowstart(uuid, org_id, flow_id, start_type, created_on, modified_on, restart_participants, include_active, contact_count, status, created_by_id)
VALUES($1, $2, $3, 'M', NOW(), NOW(), TRUE, TRUE, 2, 'P', 1)`, uuids.New(), models.Org1, models.SingleMessageFlowID)
testdata.InsertFlowStart(t, db, models.Org1, models.SingleMessageFlowID, nil)

// and our batch object
contactIDs := []models.ContactID{models.CathyID, models.BobID}
Expand Down
10 changes: 8 additions & 2 deletions tasks/starts/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ func handleFlowStart(ctx context.Context, mr *mailroom.Mailroom, task *queue.Tas

// CreateFlowBatches takes our master flow start and creates batches of flow starts for all the unique contacts
func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *elastic.Client, start *models.FlowStart) error {
// we are building a set of contact ids, start with the explicit ones
contactIDs := make(map[models.ContactID]bool)
createdContactIDs := make([]models.ContactID, 0)

// we are building a set of contact ids, start with the explicit ones
for _, id := range start.ContactIDs() {
contactIDs[id] = true
}
Expand All @@ -78,6 +80,9 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela
return errors.Wrapf(err, "error getting contact ids from urns")
}
for _, id := range urnContactIDs {
if !contactIDs[id] {
createdContactIDs = append(createdContactIDs, id)
}
contactIDs[id] = true
}
}
Expand All @@ -89,6 +94,7 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela
return errors.Wrapf(err, "error creating new contact")
}
contactIDs[contact.ID()] = true
createdContactIDs = append(createdContactIDs, contact.ID())
}

// now add all the ids for our groups
Expand Down Expand Up @@ -125,7 +131,7 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela
defer rc.Close()

// mark our start as starting, last task will mark as complete
err = models.MarkStartStarted(ctx, db, start.ID(), len(contactIDs))
err = models.MarkStartStarted(ctx, db, start.ID(), len(contactIDs), createdContactIDs)
if err != nil {
return errors.Wrapf(err, "error marking start as started")
}
Expand Down
27 changes: 27 additions & 0 deletions testsuite/testdata/flows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package testdata

import (
"testing"

"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/mailroom/models"

"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/require"
)

// InsertFlowStart inserts a flow start
func InsertFlowStart(t *testing.T, db *sqlx.DB, orgID models.OrgID, flowID models.FlowID, contactIDs []models.ContactID) models.StartID {
var id models.StartID
err := db.Get(&id,
`INSERT INTO flows_flowstart(uuid, org_id, flow_id, start_type, created_on, modified_on, restart_participants, include_active, contact_count, status, created_by_id)
VALUES($1, $2, $3, 'M', NOW(), NOW(), TRUE, TRUE, 2, 'P', 1) RETURNING id`, uuids.New(), orgID, flowID,
)
require.NoError(t, err)

for i := range contactIDs {
db.MustExec(`INSERT INTO flows_flowstart_contacts(flowstart_id, contact_id) VALUES($1, $2)`, id, contactIDs[i])
}

return id
}