Skip to content

Commit

Permalink
Update to goflow v0.87.0
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Jun 3, 2020
1 parent 89091fe commit e903901
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 32 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/mattn/go-sqlite3 v1.10.0 // indirect
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.2.0
github.com/nyaruka/goflow v0.86.2
github.com/nyaruka/goflow v0.87.0
github.com/nyaruka/librato v1.0.0
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0=
github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw=
github.com/nyaruka/gocommon v1.2.0 h1:gCmVCXYZFwKDMqQj8R1jNlK+7a06khKFq3zX8fBBbzw=
github.com/nyaruka/gocommon v1.2.0/go.mod h1:9Y21Fd6iZXDLHWTRiZAc6b4LQSCi6HEEQK4SB45Yav4=
github.com/nyaruka/goflow v0.86.2 h1:VU2ZJLIZv9IXWz75BPz55SZfak+fUh69FR6tsRa6Rbg=
github.com/nyaruka/goflow v0.86.2/go.mod h1:HBoTXbhrjhZENbCUlDvh8ZCR16ZvlgY4aTE/ABMS8uE=
github.com/nyaruka/goflow v0.87.0 h1:JwClty7iwg4iitzitDQulqjkbRMANa0Z//yd2wfqfBQ=
github.com/nyaruka/goflow v0.87.0/go.mod h1:HBoTXbhrjhZENbCUlDvh8ZCR16ZvlgY4aTE/ABMS8uE=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down
8 changes: 1 addition & 7 deletions hooks/broadcast_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,10 @@ func (h *StartBroadcastsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.
defer rc.Close()

// for each of our scene
for s, es := range scenes {
for _, es := range scenes {
for _, e := range es {
event := e.(*events.BroadcastCreatedEvent)

// we skip over any scene starts that involve groups if we are in a batch start
if len(scenes) > 1 && len(event.Groups) > 0 {
logrus.WithField("session_id", s.SessionID).Error("ignoring broadcast on group in batch")
continue
}

bcast, err := models.NewBroadcastFromEvent(ctx, tx, org, event)
if err != nil {
return errors.Wrapf(err, "error creating broadcast")
Expand Down
2 changes: 1 addition & 1 deletion hooks/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func RunHookTestCases(t *testing.T, tcs []HookTestCase) {
options.TriggerBuilder = func(contact *flows.Contact) (flows.Trigger, error) {
msg := tc.Msgs[models.ContactID(contact.ID())]
if msg == nil {
return triggers.NewManual(org.Env(), flow.FlowReference(), contact, nil), nil
return triggers.NewManual(org.Env(), flow.FlowReference(), contact, false, nil), nil
}
return triggers.NewMsg(org.Env(), flow.FlowReference(), contact, msg, nil), nil
}
Expand Down
6 changes: 0 additions & 6 deletions hooks/session_triggered.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ func (h *InsertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool
for _, e := range es {
event := e.(*events.SessionTriggeredEvent)

// we skip over any scene starts that involve groups if we are in a batch start
if len(scenes) > 1 && (len(event.Groups) > 0 || event.ContactQuery != "") {
logrus.WithField("session_id", s.SessionID).Error("ignoring scene trigger on group or query in batch")
continue
}

// look up our flow
f, err := org.Flow(event.Flow.UUID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions ivr/ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,12 @@ func StartIVRFlow(

var trigger flows.Trigger
if len(start.ParentSummary()) > 0 {
trigger, err = triggers.NewFlowActionVoice(org.Env(), flowRef, contact, connRef, start.ParentSummary())
trigger, err = triggers.NewFlowActionVoice(org.Env(), flowRef, contact, connRef, start.ParentSummary(), false)
if err != nil {
return errors.Wrap(err, "unable to create flow action trigger")
}
} else {
trigger = triggers.NewManualVoice(org.Env(), flowRef, contact, connRef, params)
trigger = triggers.NewManualVoice(org.Env(), flowRef, contact, connRef, false, params)
}

// mark our connection as started
Expand Down
9 changes: 6 additions & 3 deletions models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ type FlowStartBatch struct {
RestartParticipants RestartParticipants `json:"restart_participants"`
IncludeActive IncludeActive `json:"include_active"`

IsLast bool `json:"is_last,omitempty"`
IsLast bool `json:"is_last,omitempty"`
TotalContacts int `json:"total_contacts"`
}
}

Expand All @@ -84,7 +85,7 @@ func (b *FlowStartBatch) ContactIDs() []ContactID { return b.b.
func (b *FlowStartBatch) RestartParticipants() RestartParticipants { return b.b.RestartParticipants }
func (b *FlowStartBatch) IncludeActive() IncludeActive { return b.b.IncludeActive }
func (b *FlowStartBatch) IsLast() bool { return b.b.IsLast }
func (b *FlowStartBatch) SetIsLast(last bool) { b.b.IsLast = last }
func (b *FlowStartBatch) TotalContacts() int { return b.b.TotalContacts }

func (b *FlowStartBatch) ParentSummary() json.RawMessage { return json.RawMessage(b.b.ParentSummary) }
func (b *FlowStartBatch) Extra() json.RawMessage { return json.RawMessage(b.b.Extra) }
Expand Down Expand Up @@ -280,7 +281,7 @@ INSERT INTO
`

// CreateBatch creates a batch for this start using the passed in contact ids
func (s *FlowStart) CreateBatch(contactIDs []ContactID) *FlowStartBatch {
func (s *FlowStart) CreateBatch(contactIDs []ContactID, last bool, totalContacts int) *FlowStartBatch {
b := &FlowStartBatch{}
b.b.StartID = s.ID()
b.b.OrgID = s.OrgID()
Expand All @@ -291,6 +292,8 @@ func (s *FlowStart) CreateBatch(contactIDs []ContactID) *FlowStartBatch {
b.b.IncludeActive = s.IncludeActive()
b.b.ParentSummary = null.JSON(s.ParentSummary())
b.b.Extra = null.JSON(s.Extra())
b.b.IsLast = last
b.b.TotalContacts = totalContacts
return b
}

Expand Down
12 changes: 7 additions & 5 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,22 @@ func StartFlowBatch(
}
}

// whether engine allows some functions is based on whether there is more than one contact being started
batchStart := batch.TotalContacts() > 1

// this will build our trigger for each contact started
triggerBuilder := func(contact *flows.Contact) (flows.Trigger, error) {
if batch.ParentSummary() != nil {
trigger, err := triggers.NewFlowAction(org.Env(), flow.FlowReference(), contact, batch.ParentSummary())
trigger, err := triggers.NewFlowAction(org.Env(), flow.FlowReference(), contact, batch.ParentSummary(), batchStart)
if err != nil {
return nil, errors.Wrap(err, "unable to create flow action trigger")
}
return trigger, nil
}
if batch.Extra() != nil {
return triggers.NewManual(org.Env(), flow.FlowReference(), contact, params), nil
return triggers.NewManual(org.Env(), flow.FlowReference(), contact, batchStart, params), nil
}
return triggers.NewManual(org.Env(), flow.FlowReference(), contact, nil), nil
return triggers.NewManual(org.Env(), flow.FlowReference(), contact, batchStart, nil), nil
}

// before committing our runs we want to set the start they are associated with
Expand Down Expand Up @@ -720,8 +723,7 @@ func TriggerIVRFlow(ctx context.Context, db *sqlx.DB, rp *redis.Pool, orgID mode
}

// create our batch of all our contacts
task := start.CreateBatch(contactIDs)
task.SetIsLast(true)
task := start.CreateBatch(contactIDs, true, len(contactIDs))

// queue this to our ivr starter, it will take care of creating the connections then calling back in
rc := rp.Get()
Expand Down
5 changes: 2 additions & 3 deletions runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ func TestBatchStart(t *testing.T) {
start := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, models.MessagingFlow, tc.Flow, tc.Restart, tc.IncludeActive).
WithContactIDs(contactIDs).
WithExtra(tc.Extra)
batch := start.CreateBatch(contactIDs)
batch.SetIsLast(true)
batch := start.CreateBatch(contactIDs, true, len(contactIDs))

sessions, err := StartFlowBatch(ctx, db, rp, batch)
assert.NoError(t, err)
Expand Down Expand Up @@ -197,7 +196,7 @@ func TestContactRuns(t *testing.T) {
contact, err := contacts[0].FlowContact(org)
assert.NoError(t, err)

trigger := triggers.NewManual(org.Env(), flow.FlowReference(), contact, nil)
trigger := triggers.NewManual(org.Env(), flow.FlowReference(), contact, false, nil)
sessions, err := StartFlowForContacts(ctx, db, rp, org, flow, []flows.Trigger{trigger}, nil, true)
assert.NoError(t, err)
assert.NotNil(t, sessions)
Expand Down
3 changes: 1 addition & 2 deletions tasks/starts/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela

contacts := make([]models.ContactID, 0, 100)
queueBatch := func(last bool) {
batch := start.CreateBatch(contacts)
batch.SetIsLast(last)
batch := start.CreateBatch(contacts, last, len(contactIDs))
err = queue.AddTask(rc, q, taskType, int(start.OrgID()), batch, queue.DefaultPriority)
if err != nil {
// TODO: is continuing the right thing here? what do we do if redis is down? (panic!)
Expand Down

0 comments on commit e903901

Please sign in to comment.