Skip to content

Commit

Permalink
Merge pull request rapidpro#444 from nyaruka/scheduled_trigger_excludes
Browse files Browse the repository at this point in the history
⏲️ Add support for exclusion groups on scheduled triggers
  • Loading branch information
rowanseymour authored Jun 21, 2021
2 parents dfacffa + cb74f37 commit 2c0f1dd
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 12 deletions.
10 changes: 9 additions & 1 deletion core/models/schedules.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,15 @@ SELECT ROW_TO_JSON(s) FROM (SELECT
triggers_trigger_groups tg
WHERE
tg.trigger_id = t.id
) tg) as group_ids
) tg) as group_ids,
(SELECT ARRAY_AGG(tg.contactgroup_id) FROM (
SELECT
tg.contactgroup_id
FROM
triggers_trigger_exclude_groups tg
WHERE
tg.trigger_id = t.id
) tg) as exclude_group_ids
FROM
triggers_trigger t JOIN
flows_flow f on t.flow_id = f.id
Expand Down
17 changes: 11 additions & 6 deletions core/models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ type FlowStart struct {
FlowID FlowID `json:"flow_id" db:"flow_id"`
FlowType FlowType `json:"flow_type"`

GroupIDs []GroupID `json:"group_ids,omitempty"`
ContactIDs []ContactID `json:"contact_ids,omitempty"`
URNs []urns.URN `json:"urns,omitempty"`
Query null.String `json:"query,omitempty" db:"query"`

CreateContact bool `json:"create_contact"`
URNs []urns.URN `json:"urns,omitempty"`
ContactIDs []ContactID `json:"contact_ids,omitempty"`
GroupIDs []GroupID `json:"group_ids,omitempty"`
ExcludeGroupIDs []GroupID `json:"exclude_group_ids,omitempty"` // used when loading scheduled triggers as flow starts
Query null.String `json:"query,omitempty" db:"query"`
CreateContact bool `json:"create_contact"`

RestartParticipants RestartParticipants `json:"restart_participants" db:"restart_participants"`
IncludeActive IncludeActive `json:"include_active" db:"include_active"`
Expand All @@ -179,6 +179,11 @@ func (s *FlowStart) WithGroupIDs(groupIDs []GroupID) *FlowStart {
s.s.GroupIDs = groupIDs
return s
}
func (s *FlowStart) ExcludeGroupIDs() []GroupID { return s.s.ExcludeGroupIDs }
func (s *FlowStart) WithExcludeGroupIDs(groupIDs []GroupID) *FlowStart {
s.s.ExcludeGroupIDs = groupIDs
return s
}

func (s *FlowStart) ContactIDs() []ContactID { return s.s.ContactIDs }
func (s *FlowStart) WithContactIDs(contactIDs []ContactID) *FlowStart {
Expand Down
43 changes: 41 additions & 2 deletions core/models/starts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"fmt"
"testing"

"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/test"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
Expand All @@ -28,15 +31,16 @@ func TestStarts(t *testing.T) {
"flow_id": %d,
"flow_type": "M",
"contact_ids": [%d, %d],
"group_ids": [6789],
"group_ids": [%d],
"exclude_group_ids": [%d],
"urns": ["tel:+12025550199"],
"query": null,
"restart_participants": true,
"include_active": true,
"parent_summary": {"uuid": "b65b1a22-db6d-4f5a-9b3d-7302368a82e6"},
"session_history": {"parent_uuid": "532a3899-492f-4ffe-aed7-e75ad524efab", "ancestors": 3, "ancestors_since_input": 1},
"extra": {"foo": "bar"}
}`, startID, testdata.Org1.ID, testdata.SingleMessage.ID, testdata.Cathy.ID, testdata.Bob.ID))
}`, startID, testdata.Org1.ID, testdata.SingleMessage.ID, testdata.Cathy.ID, testdata.Bob.ID, testdata.DoctorsGroup.ID, testdata.TestersGroup.ID))

start := &models.FlowStart{}
err := json.Unmarshal(startJSON, start)
Expand All @@ -49,6 +53,9 @@ func TestStarts(t *testing.T) {
assert.Equal(t, "", start.Query())
assert.Equal(t, models.DoRestartParticipants, start.RestartParticipants())
assert.Equal(t, models.DoIncludeActive, start.IncludeActive())
assert.Equal(t, []models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, start.ContactIDs())
assert.Equal(t, []models.GroupID{testdata.DoctorsGroup.ID}, start.GroupIDs())
assert.Equal(t, []models.GroupID{testdata.TestersGroup.ID}, start.ExcludeGroupIDs())

assert.Equal(t, json.RawMessage(`{"uuid": "b65b1a22-db6d-4f5a-9b3d-7302368a82e6"}`), start.ParentSummary())
assert.Equal(t, json.RawMessage(`{"parent_uuid": "532a3899-492f-4ffe-aed7-e75ad524efab", "ancestors": 3, "ancestors_since_input": 1}`), start.SessionHistory())
Expand Down Expand Up @@ -87,3 +94,35 @@ func TestStarts(t *testing.T) {

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowstart WHERE id = $1 AND status = 'C'`, []interface{}{startID}, 1)
}

func TestStartsBuilding(t *testing.T) {
uuids.SetGenerator(uuids.NewSeededGenerator(12345))
defer uuids.SetGenerator(uuids.DefaultGenerator)

start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, models.FlowTypeMessaging, testdata.Favorites.ID, models.DoRestartParticipants, models.DoIncludeActive).
WithGroupIDs([]models.GroupID{testdata.DoctorsGroup.ID}).
WithExcludeGroupIDs([]models.GroupID{testdata.TestersGroup.ID}).
WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}).
WithQuery(`language != ""`).
WithCreateContact(true)

marshalled, err := jsonx.Marshal(start)
require.NoError(t, err)

test.AssertEqualJSON(t, []byte(fmt.Sprintf(`{
"UUID": "1ae96956-4b34-433e-8d1a-f05fe6923d6d",
"contact_ids": [%d, %d],
"create_contact": true,
"created_by": "",
"exclude_group_ids": [%d],
"flow_id": %d,
"flow_type": "M",
"group_ids": [%d],
"include_active": true,
"org_id": 1,
"query": "language != \"\"",
"restart_participants": true,
"start_id": null,
"start_type": "M"
}`, testdata.Cathy.ID, testdata.Bob.ID, testdata.TestersGroup.ID, testdata.Favorites.ID, testdata.DoctorsGroup.ID)), marshalled)
}
24 changes: 21 additions & 3 deletions core/tasks/starts/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela
createdContactIDs = append(createdContactIDs, contact.ID())
}

// now add all the ids for our groups
// if we have inclusion groups, add all the contact ids from those groups
if len(start.GroupIDs()) > 0 {
rows, err := db.QueryxContext(ctx, `SELECT contact_id FROM contacts_contactgroup_contacts WHERE contactgroup_id = ANY($1)`, pq.Array(start.GroupIDs()))
if err != nil {
return errors.Wrapf(err, "error selecting contacts for groups")
return errors.Wrapf(err, "error querying contacts from inclusion groups")
}
defer rows.Close()

Expand All @@ -116,7 +116,7 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela
}
}

// finally, if we have a query, add the contacts that match that as well
// if we have a query, add the contacts that match that as well
if start.Query() != "" {
matches, err := models.ContactIDsForQuery(ctx, ec, oa, start.Query())
if err != nil {
Expand All @@ -128,6 +128,24 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela
}
}

// finally, if we have exclusion groups, remove all the contact ids from those groups
if len(start.ExcludeGroupIDs()) > 0 {
rows, err := db.QueryxContext(ctx, `SELECT contact_id FROM contacts_contactgroup_contacts WHERE contactgroup_id = ANY($1)`, pq.Array(start.ExcludeGroupIDs()))
if err != nil {
return errors.Wrapf(err, "error querying contacts from exclusion groups")
}
defer rows.Close()

var contactID models.ContactID
for rows.Next() {
err := rows.Scan(&contactID)
if err != nil {
return errors.Wrapf(err, "error scanning contact id")
}
delete(contactIDs, contactID)
}
}

rc := rp.Get()
defer rc.Close()

Expand Down
16 changes: 16 additions & 0 deletions core/tasks/starts/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestStarts(t *testing.T) {
label string
flowID models.FlowID
groupIDs []models.GroupID
excludeGroupIDs []models.GroupID
contactIDs []models.ContactID
createContact bool
query string
Expand Down Expand Up @@ -245,6 +246,20 @@ func TestStarts(t *testing.T) {
expectedStatus: models.StartStatusComplete,
expectedActiveRuns: map[models.FlowID]int{testdata.Favorites.ID: 123, testdata.PickANumber.ID: 1, testdata.SingleMessage.ID: 0},
},
{
label: "Exclude group",
flowID: testdata.Favorites.ID,
contactIDs: []models.ContactID{testdata.Cathy.ID, testdata.Bob.ID},
excludeGroupIDs: []models.GroupID{testdata.DoctorsGroup.ID}, // should exclude Cathy
restartParticipants: true,
includeActive: true,
queue: queue.HandlerQueue,
expectedContactCount: 1,
expectedBatchCount: 1,
expectedTotalCount: 1,
expectedStatus: models.StartStatusComplete,
expectedActiveRuns: map[models.FlowID]int{testdata.Favorites.ID: 124, testdata.PickANumber.ID: 0, testdata.SingleMessage.ID: 0},
},
}

for _, tc := range tcs {
Expand All @@ -253,6 +268,7 @@ func TestStarts(t *testing.T) {
// handle our start task
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, models.FlowTypeMessaging, tc.flowID, tc.restartParticipants, tc.includeActive).
WithGroupIDs(tc.groupIDs).
WithExcludeGroupIDs(tc.excludeGroupIDs).
WithContactIDs(tc.contactIDs).
WithQuery(tc.query).
WithCreateContact(tc.createContact)
Expand Down

0 comments on commit 2c0f1dd

Please sign in to comment.