Skip to content

Commit

Permalink
Merge pull request #362 from nyaruka/start_contacts
Browse files Browse the repository at this point in the history
If a flow start task creates new contacts, save those back to the start
  • Loading branch information
rowanseymour authored Oct 1, 2020
2 parents a5787d4 + d671b40 commit e32d6a6
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 21 deletions.
20 changes: 19 additions & 1 deletion models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,29 @@ func MarkStartComplete(ctx context.Context, db Queryer, startID StartID) error {
}

// MarkStartStarted sets the status for the passed in flow start to S and updates the contact count on it
func MarkStartStarted(ctx context.Context, db Queryer, startID StartID, contactCount int) error {
func MarkStartStarted(ctx context.Context, db Queryer, startID StartID, contactCount int, createdContactIDs []ContactID) error {
_, err := db.ExecContext(ctx, "UPDATE flows_flowstart SET status = 'S', contact_count = $2, modified_on = NOW() WHERE id = $1", startID, contactCount)
if err != nil {
return errors.Wrapf(err, "error setting start as started")
}

// if we created contacts, add them to the start for logging
if len(createdContactIDs) > 0 {
type startContact struct {
StartID StartID `db:"flowstart_id"`
ContactID ContactID `db:"contact_id"`
}

args := make([]interface{}, len(createdContactIDs))
for i, id := range createdContactIDs {
args[i] = &startContact{StartID: startID, ContactID: id}
}
return BulkQuery(
ctx, "adding created contacts to flow start", db,
`INSERT INTO flows_flowstart_contacts(flowstart_id, contact_id) VALUES(:flowstart_id, :contact_id) ON CONFLICT DO NOTHING`,
args,
)
}
return nil
}

Expand Down
50 changes: 36 additions & 14 deletions models/starts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,49 @@ package models_test

import (
"encoding/json"
"fmt"
"testing"

"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStarts(t *testing.T) {
startJSON := []byte(`{
"start_id": 123,
ctx := testsuite.CTX()
db := testsuite.DB()

startID := testdata.InsertFlowStart(t, db, models.Org1, models.SingleMessageFlowID, []models.ContactID{models.CathyID, models.BobID})

startJSON := []byte(fmt.Sprintf(`{
"start_id": %d,
"start_type": "M",
"org_id": 12,
"org_id": %d,
"created_by": "rowan@nyaruka.com",
"flow_id": 234,
"flow_id": %d,
"flow_type": "M",
"contact_ids": [4567, 5678],
"contact_ids": [%d, %d],
"group_ids": [6789],
"urns": ["tel:+12025550199"],
"query": null,
"restart_participants": true,
"include_active": true,
"parent_summary": {"uuid": "b65b1a22-db6d-4f5a-9b3d-7302368a82e6"},
"session_history": {"parent_uuid": "532a3899-492f-4ffe-aed7-e75ad524efab", "ancestors": 3, "ancestors_since_input": 1},
"extra": {"foo": "bar"}
}`)
}`, startID, models.Org1, models.SingleMessageFlowID, models.CathyID, models.BobID))

start := &models.FlowStart{}
err := json.Unmarshal(startJSON, start)

assert.NoError(t, err)
assert.Equal(t, models.StartID(123), start.ID())
assert.Equal(t, models.OrgID(12), start.OrgID())
assert.Equal(t, models.FlowID(234), start.FlowID())
require.NoError(t, err)
assert.Equal(t, startID, start.ID())
assert.Equal(t, models.Org1, start.OrgID())
assert.Equal(t, models.SingleMessageFlowID, start.FlowID())
assert.Equal(t, models.MessagingFlow, start.FlowType())
assert.Equal(t, "", start.Query())
assert.Equal(t, models.DoRestartParticipants, start.RestartParticipants())
Expand All @@ -43,11 +54,17 @@ func TestStarts(t *testing.T) {
assert.Equal(t, json.RawMessage(`{"parent_uuid": "532a3899-492f-4ffe-aed7-e75ad524efab", "ancestors": 3, "ancestors_since_input": 1}`), start.SessionHistory())
assert.Equal(t, json.RawMessage(`{"foo": "bar"}`), start.Extra())

batch := start.CreateBatch([]models.ContactID{4567, 5678}, false, 3)
assert.Equal(t, models.StartID(123), batch.StartID())
err = models.MarkStartStarted(ctx, db, startID, 2, []models.ContactID{models.GeorgeID})
require.NoError(t, err)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowstart WHERE id = $1 AND status = 'S' AND contact_count = 2`, []interface{}{startID}, 1)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowstart_contacts WHERE flowstart_id = $1`, []interface{}{startID}, 3)

batch := start.CreateBatch([]models.ContactID{models.CathyID, models.BobID}, false, 3)
assert.Equal(t, startID, batch.StartID())
assert.Equal(t, models.StartTypeManual, batch.StartType())
assert.Equal(t, models.FlowID(234), batch.FlowID())
assert.Equal(t, []models.ContactID{4567, 5678}, batch.ContactIDs())
assert.Equal(t, models.SingleMessageFlowID, batch.FlowID())
assert.Equal(t, []models.ContactID{models.CathyID, models.BobID}, batch.ContactIDs())
assert.Equal(t, models.DoRestartParticipants, batch.RestartParticipants())
assert.Equal(t, models.DoIncludeActive, batch.IncludeActive())
assert.Equal(t, "rowan@nyaruka.com", batch.CreatedBy())
Expand All @@ -64,4 +81,9 @@ func TestStarts(t *testing.T) {

history, err = models.ReadSessionHistory([]byte(`{`))
assert.EqualError(t, err, "unexpected end of JSON input")

err = models.MarkStartComplete(ctx, db, startID)
require.NoError(t, err)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowstart WHERE id = $1 AND status = 'C'`, []interface{}{startID}, 1)
}
8 changes: 4 additions & 4 deletions runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import (
"testing"
"time"

"github.com/lib/pq"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/resumes"
"github.com/nyaruka/goflow/flows/triggers"
_ "github.com/nyaruka/mailroom/hooks"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/lib/pq"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -103,9 +105,7 @@ func TestBatchStart(t *testing.T) {
rp := testsuite.RP()

// create a start object
db.MustExec(
`INSERT INTO flows_flowstart(uuid, org_id, flow_id, start_type, created_on, modified_on, restart_participants, include_active, contact_count, status, created_by_id)
VALUES($1, $2, $3, 'M', NOW(), NOW(), TRUE, TRUE, 2, 'P', 1)`, uuids.New(), models.Org1, models.SingleMessageFlowID)
testdata.InsertFlowStart(t, db, models.Org1, models.SingleMessageFlowID, nil)

// and our batch object
contactIDs := []models.ContactID{models.CathyID, models.BobID}
Expand Down
10 changes: 8 additions & 2 deletions tasks/starts/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ func handleFlowStart(ctx context.Context, mr *mailroom.Mailroom, task *queue.Tas

// CreateFlowBatches takes our master flow start and creates batches of flow starts for all the unique contacts
func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *elastic.Client, start *models.FlowStart) error {
// we are building a set of contact ids, start with the explicit ones
contactIDs := make(map[models.ContactID]bool)
createdContactIDs := make([]models.ContactID, 0)

// we are building a set of contact ids, start with the explicit ones
for _, id := range start.ContactIDs() {
contactIDs[id] = true
}
Expand All @@ -78,6 +80,9 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela
return errors.Wrapf(err, "error getting contact ids from urns")
}
for _, id := range urnContactIDs {
if !contactIDs[id] {
createdContactIDs = append(createdContactIDs, id)
}
contactIDs[id] = true
}
}
Expand All @@ -89,6 +94,7 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela
return errors.Wrapf(err, "error creating new contact")
}
contactIDs[contact.ID()] = true
createdContactIDs = append(createdContactIDs, contact.ID())
}

// now add all the ids for our groups
Expand Down Expand Up @@ -125,7 +131,7 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela
defer rc.Close()

// mark our start as starting, last task will mark as complete
err = models.MarkStartStarted(ctx, db, start.ID(), len(contactIDs))
err = models.MarkStartStarted(ctx, db, start.ID(), len(contactIDs), createdContactIDs)
if err != nil {
return errors.Wrapf(err, "error marking start as started")
}
Expand Down
27 changes: 27 additions & 0 deletions testsuite/testdata/flows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package testdata

import (
"testing"

"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/mailroom/models"

"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/require"
)

// InsertFlowStart inserts a flow start
func InsertFlowStart(t *testing.T, db *sqlx.DB, orgID models.OrgID, flowID models.FlowID, contactIDs []models.ContactID) models.StartID {
var id models.StartID
err := db.Get(&id,
`INSERT INTO flows_flowstart(uuid, org_id, flow_id, start_type, created_on, modified_on, restart_participants, include_active, contact_count, status, created_by_id)
VALUES($1, $2, $3, 'M', NOW(), NOW(), TRUE, TRUE, 2, 'P', 1) RETURNING id`, uuids.New(), orgID, flowID,
)
require.NoError(t, err)

for i := range contactIDs {
db.MustExec(`INSERT INTO flows_flowstart_contacts(flowstart_id, contact_id) VALUES($1, $2)`, id, contactIDs[i])
}

return id
}

0 comments on commit e32d6a6

Please sign in to comment.