Skip to content

Commit

Permalink
Start cleaning up use of include vs exclude properties of starts
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Jun 9, 2022
1 parent b5ff945 commit 92911c3
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 220 deletions.
2 changes: 1 addition & 1 deletion core/hooks/insert_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (h *insertStartHook) Apply(ctx context.Context, rt *runtime.Runtime, tx *sq
}

// create our start
start := models.NewFlowStart(oa.OrgID(), models.StartTypeFlowAction, flow.FlowType(), flow.ID(), true).
start := models.NewFlowStart(oa.OrgID(), models.StartTypeFlowAction, flow.FlowType(), flow.ID()).
WithGroupIDs(groupIDs).
WithContactIDs(contactIDs).
WithURNs(event.URNs).
Expand Down
37 changes: 21 additions & 16 deletions core/models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,16 @@ type FlowStartBatch struct {
}
}

func (b *FlowStartBatch) StartID() StartID { return b.b.StartID }
func (b *FlowStartBatch) StartType() StartType { return b.b.StartType }
func (b *FlowStartBatch) OrgID() OrgID { return b.b.OrgID }
func (b *FlowStartBatch) CreatedByID() UserID { return b.b.CreatedByID }
func (b *FlowStartBatch) FlowID() FlowID { return b.b.FlowID }
func (b *FlowStartBatch) ContactIDs() []ContactID { return b.b.ContactIDs }
func (b *FlowStartBatch) RestartParticipants() bool { return b.b.RestartParticipants }
func (b *FlowStartBatch) IncludeActive() bool { return b.b.IncludeActive }
func (b *FlowStartBatch) IsLast() bool { return b.b.IsLast }
func (b *FlowStartBatch) TotalContacts() int { return b.b.TotalContacts }
func (b *FlowStartBatch) StartID() StartID { return b.b.StartID }
func (b *FlowStartBatch) StartType() StartType { return b.b.StartType }
func (b *FlowStartBatch) OrgID() OrgID { return b.b.OrgID }
func (b *FlowStartBatch) CreatedByID() UserID { return b.b.CreatedByID }
func (b *FlowStartBatch) FlowID() FlowID { return b.b.FlowID }
func (b *FlowStartBatch) ContactIDs() []ContactID { return b.b.ContactIDs }
func (b *FlowStartBatch) ExcludeStartedPreviously() bool { return !b.b.RestartParticipants }
func (b *FlowStartBatch) ExcludeInAFlow() bool { return !b.b.IncludeActive }
func (b *FlowStartBatch) IsLast() bool { return b.b.IsLast }
func (b *FlowStartBatch) TotalContacts() int { return b.b.TotalContacts }

func (b *FlowStartBatch) ParentSummary() json.RawMessage { return json.RawMessage(b.b.ParentSummary) }
func (b *FlowStartBatch) SessionHistory() json.RawMessage { return json.RawMessage(b.b.SessionHistory) }
Expand Down Expand Up @@ -193,8 +193,13 @@ func (s *FlowStart) WithQuery(query string) *FlowStart {
return s
}

func (s *FlowStart) RestartParticipants() bool { return s.s.RestartParticipants }
func (s *FlowStart) IncludeActive() bool { return s.s.IncludeActive }
func (s *FlowStart) ExcludeStartedPreviously() bool { return !s.s.RestartParticipants }
func (s *FlowStart) WithExcludeStartedPreviously(exclude bool) *FlowStart {
s.s.RestartParticipants = !exclude
return s
}

func (s *FlowStart) ExcludeInAFlow() bool { return !s.s.IncludeActive }
func (s *FlowStart) WithExcludeInAFlow(exclude bool) *FlowStart {
s.s.IncludeActive = !exclude
return s
Expand Down Expand Up @@ -238,14 +243,14 @@ func GetFlowStartAttributes(ctx context.Context, db Queryer, startID StartID) (*
}

// NewFlowStart creates a new flow start objects for the passed in parameters
func NewFlowStart(orgID OrgID, startType StartType, flowType FlowType, flowID FlowID, restartParticipants bool) *FlowStart {
func NewFlowStart(orgID OrgID, startType StartType, flowType FlowType, flowID FlowID) *FlowStart {
s := &FlowStart{}
s.s.UUID = uuids.New()
s.s.OrgID = orgID
s.s.StartType = startType
s.s.FlowType = flowType
s.s.FlowID = flowID
s.s.RestartParticipants = restartParticipants
s.s.RestartParticipants = true
s.s.IncludeActive = true
return s
}
Expand Down Expand Up @@ -338,8 +343,8 @@ func (s *FlowStart) CreateBatch(contactIDs []ContactID, last bool, totalContacts
b.b.FlowID = s.FlowID()
b.b.FlowType = s.FlowType()
b.b.ContactIDs = contactIDs
b.b.RestartParticipants = s.RestartParticipants()
b.b.IncludeActive = s.IncludeActive()
b.b.RestartParticipants = s.s.RestartParticipants
b.b.IncludeActive = s.s.IncludeActive
b.b.ParentSummary = null.JSON(s.ParentSummary())
b.b.SessionHistory = null.JSON(s.SessionHistory())
b.b.Extra = null.JSON(s.Extra())
Expand Down
10 changes: 5 additions & 5 deletions core/models/starts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func TestStarts(t *testing.T) {
assert.Equal(t, testdata.SingleMessage.ID, start.FlowID())
assert.Equal(t, models.FlowTypeMessaging, start.FlowType())
assert.Equal(t, "", start.Query())
assert.True(t, start.RestartParticipants())
assert.True(t, start.IncludeActive())
assert.False(t, start.ExcludeStartedPreviously())
assert.False(t, start.ExcludeInAFlow())
assert.Equal(t, []models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, start.ContactIDs())
assert.Equal(t, []models.GroupID{testdata.DoctorsGroup.ID}, start.GroupIDs())
assert.Equal(t, []models.GroupID{testdata.TestersGroup.ID}, start.ExcludeGroupIDs())
Expand All @@ -73,8 +73,8 @@ func TestStarts(t *testing.T) {
assert.Equal(t, models.StartTypeManual, batch.StartType())
assert.Equal(t, testdata.SingleMessage.ID, batch.FlowID())
assert.Equal(t, []models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, batch.ContactIDs())
assert.True(t, batch.RestartParticipants())
assert.True(t, batch.IncludeActive())
assert.False(t, batch.ExcludeStartedPreviously())
assert.False(t, batch.ExcludeInAFlow())
assert.Equal(t, testdata.Admin.ID, batch.CreatedByID())
assert.False(t, batch.IsLast())
assert.Equal(t, 3, batch.TotalContacts())
Expand All @@ -100,7 +100,7 @@ func TestStartsBuilding(t *testing.T) {
uuids.SetGenerator(uuids.NewSeededGenerator(12345))
defer uuids.SetGenerator(uuids.DefaultGenerator)

start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, models.FlowTypeMessaging, testdata.Favorites.ID, true).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, models.FlowTypeMessaging, testdata.Favorites.ID).
WithGroupIDs([]models.GroupID{testdata.DoctorsGroup.ID}).
WithExcludeGroupIDs([]models.GroupID{testdata.TestersGroup.ID}).
WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}).
Expand Down
36 changes: 18 additions & 18 deletions core/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ var startTypeToOrigin = map[models.StartType]string{

// StartOptions define the various parameters that can be used when starting a flow
type StartOptions struct {
// ExcludeWaiting excludes contacts with waiting sessions which would otherwise have to be interrupted
ExcludeWaiting bool
// ExcludeInAFlow excludes contacts with waiting sessions which would otherwise have to be interrupted
ExcludeInAFlow bool

// ExcludeReruns excludes contacts who have been in this flow previously (at least as long as we have runs for)
ExcludeReruns bool
// ExcludeStartedPreviously excludes contacts who have been in this flow previously (at least as long as we have runs for)
ExcludeStartedPreviously bool

// Interrupt should be true if we want to interrupt the flows runs for any contact started in this flow
Interrupt bool
Expand All @@ -52,9 +52,9 @@ type StartOptions struct {
// NewStartOptions creates and returns the default start options to be used for flow starts
func NewStartOptions() *StartOptions {
return &StartOptions{
ExcludeWaiting: false,
ExcludeReruns: false,
Interrupt: true,
ExcludeInAFlow: false,
ExcludeStartedPreviously: false,
Interrupt: true,
}
}

Expand Down Expand Up @@ -231,8 +231,8 @@ func StartFlowBatch(

// options for our flow start
options := NewStartOptions()
options.ExcludeReruns = !batch.RestartParticipants()
options.ExcludeWaiting = !batch.IncludeActive()
options.ExcludeStartedPreviously = batch.ExcludeStartedPreviously()
options.ExcludeInAFlow = batch.ExcludeInAFlow()
options.Interrupt = flow.FlowType().Interrupts()
options.TriggerBuilder = triggerBuilder
options.CommitHook = updateStartID
Expand Down Expand Up @@ -306,16 +306,16 @@ func FireCampaignEvents(
options := NewStartOptions()
switch dbEvent.StartMode() {
case models.StartModeInterrupt:
options.ExcludeWaiting = false
options.ExcludeReruns = false
options.ExcludeInAFlow = false
options.ExcludeStartedPreviously = false
options.Interrupt = true
case models.StartModePassive:
options.ExcludeWaiting = false
options.ExcludeReruns = false
options.ExcludeInAFlow = false
options.ExcludeStartedPreviously = false
options.Interrupt = false
case models.StartModeSkip:
options.ExcludeWaiting = true
options.ExcludeReruns = false
options.ExcludeInAFlow = true
options.ExcludeStartedPreviously = false
options.Interrupt = true
default:
return nil, errors.Errorf("unknown start mode: %s", dbEvent.StartMode())
Expand Down Expand Up @@ -417,7 +417,7 @@ func StartFlow(
exclude := make(map[models.ContactID]bool, 5)

// filter out anybody who has has a flow run in this flow if appropriate
if options.ExcludeReruns {
if options.ExcludeStartedPreviously {
// find all participants that have been in this flow
started, err := models.FindFlowStartedOverlap(ctx, rt.DB, flow.ID(), contactIDs)
if err != nil {
Expand All @@ -429,7 +429,7 @@ func StartFlow(
}

// filter out our list of contacts to only include those that should be started
if options.ExcludeWaiting {
if options.ExcludeInAFlow {
// find all participants active in any flow
active, err := models.FilterByWaitingSession(ctx, rt.DB, contactIDs)
if err != nil {
Expand Down Expand Up @@ -719,7 +719,7 @@ func TriggerIVRFlow(ctx context.Context, rt *runtime.Runtime, orgID models.OrgID
tx, _ := rt.DB.BeginTxx(ctx, nil)

// create our start
start := models.NewFlowStart(orgID, models.StartTypeTrigger, models.FlowTypeVoice, flowID, true).
start := models.NewFlowStart(orgID, models.StartTypeTrigger, models.FlowTypeVoice, flowID).
WithContactIDs(contactIDs)

// insert it
Expand Down
43 changes: 22 additions & 21 deletions core/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,35 +192,36 @@ func TestBatchStart(t *testing.T) {
contactIDs := []models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}

tcs := []struct {
Flow models.FlowID
Restart bool
ExcludeInAFlow bool
Extra json.RawMessage
Msg string
Count int
TotalCount int
Flow models.FlowID
ExcludeStartedPreviously bool
ExcludeInAFlow bool
Extra json.RawMessage
Msg string
Count int
TotalCount int
}{
{testdata.SingleMessage.ID, true, false, nil, "Hey, how are you?", 2, 2},
{testdata.SingleMessage.ID, false, false, nil, "Hey, how are you?", 0, 2},
{testdata.SingleMessage.ID, false, true, nil, "Hey, how are you?", 0, 2},
{testdata.SingleMessage.ID, true, true, nil, "Hey, how are you?", 2, 4},
{testdata.SingleMessage.ID, false, false, nil, "Hey, how are you?", 2, 2},
{testdata.SingleMessage.ID, true, false, nil, "Hey, how are you?", 0, 2},
{testdata.SingleMessage.ID, true, true, nil, "Hey, how are you?", 0, 2},
{testdata.SingleMessage.ID, false, true, nil, "Hey, how are you?", 2, 4},
{
Flow: testdata.IncomingExtraFlow.ID,
Restart: true,
ExcludeInAFlow: true,
Extra: json.RawMessage([]byte(`{"name":"Fred", "age":33}`)),
Msg: "Great to meet you Fred. Your age is 33.",
Count: 2,
TotalCount: 2,
Flow: testdata.IncomingExtraFlow.ID,
ExcludeStartedPreviously: false,
ExcludeInAFlow: true,
Extra: json.RawMessage([]byte(`{"name":"Fred", "age":33}`)),
Msg: "Great to meet you Fred. Your age is 33.",
Count: 2,
TotalCount: 2,
},
}

last := time.Now()

for i, tc := range tcs {
start := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, models.FlowTypeMessaging, tc.Flow, tc.Restart).
start := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, models.FlowTypeMessaging, tc.Flow).
WithContactIDs(contactIDs).
WithExcludeInAFlow(tc.ExcludeInAFlow).
WithExcludeStartedPreviously(tc.ExcludeStartedPreviously).
WithExtra(tc.Extra)
batch := start.CreateBatch(contactIDs, true, len(contactIDs))

Expand Down Expand Up @@ -345,8 +346,8 @@ func TestStartFlowConcurrency(t *testing.T) {
}

options := &runner.StartOptions{
ExcludeReruns: false,
ExcludeWaiting: false,
ExcludeStartedPreviously: false,
ExcludeInAFlow: false,
TriggerBuilder: func(contact *flows.Contact) flows.Trigger {
return triggers.NewBuilder(oa.Env(), flowRef, contact).Manual().Build()
},
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/ivr/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestRetries(t *testing.T) {
db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)

// create a flow start for cathy
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, true).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID})

// call our master starter
Expand Down
4 changes: 2 additions & 2 deletions core/tasks/ivr/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func HandleFlowStartBatch(bg context.Context, rt *runtime.Runtime, batch *models
exclude := make(map[models.ContactID]bool, 5)

// filter out anybody who has has a flow run in this flow if appropriate
if !batch.RestartParticipants() {
if batch.ExcludeStartedPreviously() {
// find all participants that have been in this flow
started, err := models.FindFlowStartedOverlap(ctx, rt.DB, batch.FlowID(), batch.ContactIDs())
if err != nil {
Expand All @@ -53,7 +53,7 @@ func HandleFlowStartBatch(bg context.Context, rt *runtime.Runtime, batch *models
}

// filter out our list of contacts to only include those that should be started
if !batch.IncludeActive() {
if batch.ExcludeInAFlow() {
// find all participants active in other sessions
active, err := models.FilterByWaitingSession(ctx, rt.DB, batch.ContactIDs())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/ivr/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestIVR(t *testing.T) {
db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)

// create a flow start for cathy
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, true).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID})

// call our master starter
Expand Down
Loading

0 comments on commit 92911c3

Please sign in to comment.