diff --git a/hooks/airtime_transferred.go b/hooks/airtime_transferred.go index 08a7e5719..0cd047127 100644 --- a/hooks/airtime_transferred.go +++ b/hooks/airtime_transferred.go @@ -25,11 +25,11 @@ type InsertAirtimeTransfersHook struct{} var insertAirtimeTransfersHook = &InsertAirtimeTransfersHook{} // Apply inserts all the airtime transfers that were created -func (h *InsertAirtimeTransfersHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *InsertAirtimeTransfersHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // gather all our transfers - transfers := make([]*models.AirtimeTransfer, 0, len(sessions)) + transfers := make([]*models.AirtimeTransfer, 0, len(scenes)) - for _, ts := range sessions { + for _, ts := range scenes { for _, t := range ts { transfer := t.(*models.AirtimeTransfer) transfers = append(transfers, transfer) @@ -43,7 +43,7 @@ func (h *InsertAirtimeTransfersHook) Apply(ctx context.Context, tx *sqlx.Tx, rp } // gather all our logs and set the newly inserted transfer IDs on them - logs := make([]*models.HTTPLog, 0, len(sessions)) + logs := make([]*models.HTTPLog, 0, len(scenes)) for _, t := range transfers { for _, l := range t.Logs { @@ -62,9 +62,14 @@ func (h *InsertAirtimeTransfersHook) Apply(ctx context.Context, tx *sqlx.Tx, rp } // handleAirtimeTransferred is called for each airtime transferred event -func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.AirtimeTransferredEvent) + // must be in a session + if scene.Session() == nil { + return errors.Errorf("cannot handle airtime transferred event without session") + } + status := models.AirtimeTransferStatusSuccess if event.ActualAmount == decimal.Zero { status = models.AirtimeTransferStatusFailed @@ -73,7 +78,7 @@ func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, transfer := models.NewAirtimeTransfer( org.OrgID(), status, - session.ContactID(), + scene.ContactID(), event.Sender, event.Recipient, event.Currency, @@ -83,8 +88,8 @@ func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, ) logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "sender": string(event.Sender), "recipient": string(event.Recipient), "currency": event.Currency, @@ -105,7 +110,7 @@ func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, )) } - session.AddPreCommitEvent(insertAirtimeTransfersHook, transfer) + scene.AddPreCommitEvent(insertAirtimeTransfersHook, transfer) return nil } diff --git a/hooks/broadcast_created.go b/hooks/broadcast_created.go index eb1d350c7..f05ae6c4a 100644 --- a/hooks/broadcast_created.go +++ b/hooks/broadcast_created.go @@ -17,23 +17,23 @@ func init() { models.RegisterEventHook(events.TypeBroadcastCreated, handleBroadcastCreated) } -// StartBroadcastsHook is our hook for starting the broadcasts created in these sessions +// StartBroadcastsHook is our hook for starting the broadcasts created in these scene type StartBroadcastsHook struct{} var startBroadcastsHook = &StartBroadcastsHook{} // Apply queues up our broadcasts for sending -func (h *StartBroadcastsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *StartBroadcastsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { rc := rp.Get() defer rc.Close() - // for each of our sessions - for s, es := range sessions { + // for each of our scene + for s, es := range scene { for _, e := range es { event := e.(*events.BroadcastCreatedEvent) - // we skip over any session starts that involve groups if we are in a batch start - if len(sessions) > 1 && len(event.Groups) > 0 { + // we skip over any scene starts that involve groups if we are in a batch start + if len(scene) > 1 && len(event.Groups) > 0 { logrus.WithField("session_id", s.ID).Error("ignoring broadcast on group in batch") continue } @@ -62,17 +62,22 @@ func (h *StartBroadcastsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis. return nil } -// handleBroadcastCreated is called for each broadcast created event across our sessions -func handleBroadcastCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +// handleBroadcastCreated is called for each broadcast created event across our scene +func handleBroadcastCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { + // must be in a session + if scene.Session() == nil { + return errors.Errorf("cannot handle broadcast created event without session") + } + event := e.(*events.BroadcastCreatedEvent) logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "translations": event.Translations[event.BaseLanguage], }).Debug("broadcast created") - // schedule this for being started after our sessions are committed - session.AddPostCommitEvent(startBroadcastsHook, event) + // schedule this for being started after our scene are committed + scene.AddPostCommitEvent(startBroadcastsHook, event) return nil } diff --git a/hooks/campaigns.go b/hooks/campaigns.go index 19a490a00..8fa9fc48a 100644 --- a/hooks/campaigns.go +++ b/hooks/campaigns.go @@ -17,15 +17,15 @@ type UpdateCampaignEventsHook struct{} var updateCampaignEventsHook = &UpdateCampaignEventsHook{} -// Apply will update all the campaigns for the passed in sessions, minimizing the number of queries to do so -func (h *UpdateCampaignEventsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +// Apply will update all the campaigns for the passed in scene, minimizing the number of queries to do so +func (h *UpdateCampaignEventsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { // these are all the events we need to delete unfired fires for deletes := make([]*models.FireDelete, 0, 5) // these are all the new events we need to insert inserts := make([]*models.FireAdd, 0, 5) - for s, es := range sessions { + for s, es := range scene { groupAdds := make(map[models.GroupID]bool) groupRemoves := make(map[models.GroupID]bool) fieldChanges := make(map[models.FieldID]bool) diff --git a/hooks/classifier_called.go b/hooks/classifier_called.go index db9ea6729..3ad8adb10 100644 --- a/hooks/classifier_called.go +++ b/hooks/classifier_called.go @@ -24,10 +24,10 @@ type InsertHTTPLogsHook struct{} var insertHTTPLogsHook = &InsertHTTPLogsHook{} // Apply inserts all the classifier logs that were created -func (h *InsertHTTPLogsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *InsertHTTPLogsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { // gather all our logs - logs := make([]*models.HTTPLog, 0, len(sessions)) - for _, ls := range sessions { + logs := make([]*models.HTTPLog, 0, len(scene)) + for _, ls := range scene { for _, l := range ls { logs = append(logs, l.(*models.HTTPLog)) } @@ -42,7 +42,7 @@ func (h *InsertHTTPLogsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.P } // handleClassifierCalled is called for each classifier called event -func handleClassifierCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +func handleClassifierCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.ClassifierCalledEvent) classifier := org.ClassifierByUUID(event.Classifier.UUID) @@ -53,8 +53,8 @@ func handleClassifierCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, or // create a log for each HTTP call for _, httpLog := range event.HTTPLogs { logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "url": httpLog.URL, "status": httpLog.Status, "elapsed_ms": httpLog.ElapsedMS, @@ -73,7 +73,7 @@ func handleClassifierCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, or httpLog.CreatedOn, ) - session.AddPreCommitEvent(insertHTTPLogsHook, log) + scene.AddPreCommitEvent(insertHTTPLogsHook, log) } return nil diff --git a/hooks/contact_field_changed.go b/hooks/contact_field_changed.go index a34dfb20a..e85f36351 100644 --- a/hooks/contact_field_changed.go +++ b/hooks/contact_field_changed.go @@ -24,11 +24,11 @@ type CommitFieldChangesHook struct{} var commitFieldChangesHook = &CommitFieldChangesHook{} // Apply squashes and writes all the field updates for the contacts -func (h *CommitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *CommitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { // our list of updates - fieldUpdates := make([]interface{}, 0, len(sessions)) + fieldUpdates := make([]interface{}, 0, len(scene)) fieldDeletes := make(map[assets.FieldUUID][]interface{}) - for session, es := range sessions { + for scene, es := range scene { updates := make(map[assets.FieldUUID]*flows.Value, len(es)) for _, e := range es { event := e.(*events.ContactFieldChangedEvent) @@ -37,7 +37,7 @@ func (h *CommitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *red logrus.WithFields(logrus.Fields{ "field_key": event.Field.Key, "field_name": event.Field.Name, - "session_id": session.ID(), + "session_id": scene.ID(), }).Debug("unable to find field with key, ignoring") continue } @@ -50,7 +50,7 @@ func (h *CommitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *red if v == nil || v.Text.Native() == "" { delete(updates, k) fieldDeletes[k] = append(fieldDeletes[k], &FieldDelete{ - ContactID: session.ContactID(), + ContactID: scene.ContactID(), FieldUUID: k, }) } @@ -64,7 +64,7 @@ func (h *CommitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *red // and queue them up for our update fieldUpdates = append(fieldUpdates, &FieldUpdate{ - ContactID: session.ContactID(), + ContactID: scene.ContactID(), Updates: string(fieldJSON), }) } @@ -90,18 +90,18 @@ func (h *CommitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *red } // handleContactFieldChanged is called when a contact field changes -func handleContactFieldChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +func handleContactFieldChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.ContactFieldChangedEvent) logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "field_key": event.Field.Key, "value": event.Value, }).Debug("contact field changed") // add our callback - session.AddPreCommitEvent(commitFieldChangesHook, event) - session.AddPreCommitEvent(updateCampaignEventsHook, event) + scene.AddPreCommitEvent(commitFieldChangesHook, event) + scene.AddPreCommitEvent(updateCampaignEventsHook, event) return nil } diff --git a/hooks/contact_groups_changed.go b/hooks/contact_groups_changed.go index 21249559d..d60cceb64 100644 --- a/hooks/contact_groups_changed.go +++ b/hooks/contact_groups_changed.go @@ -23,14 +23,14 @@ type CommitGroupChangesHook struct{} var commitGroupChangesHook = &CommitGroupChangesHook{} // Apply squashes and adds or removes all our contact groups -func (h *CommitGroupChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *CommitGroupChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { // build up our list of all adds and removes - adds := make([]*models.GroupAdd, 0, len(sessions)) - removes := make([]*models.GroupRemove, 0, len(sessions)) - changed := make(map[models.ContactID]bool, len(sessions)) + adds := make([]*models.GroupAdd, 0, len(scene)) + removes := make([]*models.GroupRemove, 0, len(scene)) + changed := make(map[models.ContactID]bool, len(scene)) // we remove from our groups at once, build up our list - for _, events := range sessions { + for _, events := range scene { // we use these sets to track what our final add or remove should be seenAdds := make(map[models.GroupID]*models.GroupAdd) seenRemoves := make(map[models.GroupID]*models.GroupRemove) @@ -84,11 +84,11 @@ func (h *CommitGroupChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *red } // handleContactGroupsChanged is called when a group is added or removed from our contact -func handleContactGroupsChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +func handleContactGroupsChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.ContactGroupsChangedEvent) logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "groups_removed": len(event.GroupsRemoved), "groups_added": len(event.GroupsAdded), }).Debug("changing contact groups") @@ -99,21 +99,21 @@ func handleContactGroupsChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool group := org.GroupByUUID(g.UUID) if group == nil { logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), + "contact_uuid": scene.ContactUUID(), "group_uuid": g.UUID, }).Warn("unable to find group to remove, skipping") continue } hookEvent := &models.GroupRemove{ - ContactID: session.ContactID(), + ContactID: scene.ContactID(), GroupID: group.ID(), } // add our add event - session.AddPreCommitEvent(commitGroupChangesHook, hookEvent) - session.AddPreCommitEvent(updateCampaignEventsHook, hookEvent) - session.AddPreCommitEvent(contactModifiedHook, session.Contact().ID()) + scene.AddPreCommitEvent(commitGroupChangesHook, hookEvent) + scene.AddPreCommitEvent(updateCampaignEventsHook, hookEvent) + scene.AddPreCommitEvent(contactModifiedHook, scene.Contact().ID()) } // add each of our groups @@ -122,7 +122,7 @@ func handleContactGroupsChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool group := org.GroupByUUID(g.UUID) if group == nil { logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), + "contact_uuid": scene.ContactUUID(), "group_uuid": g.UUID, }).Warn("unable to find group to add, skipping") continue @@ -130,13 +130,13 @@ func handleContactGroupsChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool // add our add event hookEvent := &models.GroupAdd{ - ContactID: session.ContactID(), + ContactID: scene.ContactID(), GroupID: group.ID(), } - session.AddPreCommitEvent(commitGroupChangesHook, hookEvent) - session.AddPreCommitEvent(updateCampaignEventsHook, hookEvent) - session.AddPreCommitEvent(contactModifiedHook, session.Contact().ID()) + scene.AddPreCommitEvent(commitGroupChangesHook, hookEvent) + scene.AddPreCommitEvent(updateCampaignEventsHook, hookEvent) + scene.AddPreCommitEvent(contactModifiedHook, scene.Contact().ID()) } return nil diff --git a/hooks/contact_language_changed.go b/hooks/contact_language_changed.go index 365af2694..6c81c4703 100644 --- a/hooks/contact_language_changed.go +++ b/hooks/contact_language_changed.go @@ -21,10 +21,10 @@ type CommitLanguageChangesHook struct{} var commitLanguageChangesHook = &CommitLanguageChangesHook{} // Apply applies our contact language change before our commit -func (h *CommitLanguageChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *CommitLanguageChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { // build up our list of pairs of contact id and language name - updates := make([]interface{}, 0, len(sessions)) - for s, e := range sessions { + updates := make([]interface{}, 0, len(scene)) + for s, e := range scene { // we only care about the last name change event := e[len(e)-1].(*events.ContactLanguageChangedEvent) updates = append(updates, &languageUpdate{s.ContactID(), event.Language}) @@ -35,15 +35,15 @@ func (h *CommitLanguageChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp * } // handleContactLanguageChanged is called when we process a contact language change -func handleContactLanguageChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +func handleContactLanguageChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.ContactLanguageChangedEvent) logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "language": event.Language, }).Debug("changing contact language") - session.AddPreCommitEvent(commitLanguageChangesHook, event) + scene.AddPreCommitEvent(commitLanguageChangesHook, event) return nil } diff --git a/hooks/contact_modified.go b/hooks/contact_modified.go index 4e26ed8f4..0f2853ac1 100644 --- a/hooks/contact_modified.go +++ b/hooks/contact_modified.go @@ -15,11 +15,11 @@ type ContactModifiedHook struct{} var contactModifiedHook = &ContactModifiedHook{} // Apply squashes and updates modified_on on all the contacts passed in -func (h *ContactModifiedHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *ContactModifiedHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { // our list of contact ids - contactIDs := make([]models.ContactID, 0, len(sessions)) - for session := range sessions { - contactIDs = append(contactIDs, session.ContactID()) + contactIDs := make([]models.ContactID, 0, len(scene)) + for scene := range scene { + contactIDs = append(contactIDs, scene.ContactID()) } err := models.UpdateContactModifiedOn(ctx, tx, contactIDs) diff --git a/hooks/contact_name_changed.go b/hooks/contact_name_changed.go index 241a0278b..086a627f3 100644 --- a/hooks/contact_name_changed.go +++ b/hooks/contact_name_changed.go @@ -21,11 +21,11 @@ type CommitNameChangesHook struct{} var commitNameChangesHook = &CommitNameChangesHook{} -// Apply commits our contact name changes as a bulk update for the passed in map of sessions -func (h *CommitNameChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +// Apply commits our contact name changes as a bulk update for the passed in map of scene +func (h *CommitNameChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { // build up our list of pairs of contact id and contact name - updates := make([]interface{}, 0, len(sessions)) - for s, e := range sessions { + updates := make([]interface{}, 0, len(scene)) + for s, e := range scene { // we only care about the last name change event := e[len(e)-1].(*events.ContactNameChangedEvent) updates = append(updates, &nameUpdate{s.ContactID(), fmt.Sprintf("%.128s", event.Name)}) @@ -36,15 +36,15 @@ func (h *CommitNameChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redi } // handleContactNameChanged changes the name of the contact -func handleContactNameChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +func handleContactNameChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.ContactNameChangedEvent) logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "name": event.Name, }).Debug("changing contact name") - session.AddPreCommitEvent(commitNameChangesHook, event) + scene.AddPreCommitEvent(commitNameChangesHook, event) return nil } diff --git a/hooks/contact_urns_changed.go b/hooks/contact_urns_changed.go index d6c876ad1..62466d1f4 100644 --- a/hooks/contact_urns_changed.go +++ b/hooks/contact_urns_changed.go @@ -22,10 +22,10 @@ type CommitURNChangesHook struct{} var commitURNChangesHook = &CommitURNChangesHook{} // Apply adds all our URNS in a batch -func (h *CommitURNChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { - // gather all our urn changes, we only care about the last change for each session - changes := make([]*models.ContactURNsChanged, 0, len(sessions)) - for _, sessionChanges := range sessions { +func (h *CommitURNChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { + // gather all our urn changes, we only care about the last change for each scene + changes := make([]*models.ContactURNsChanged, 0, len(scene)) + for _, sessionChanges := range scene { changes = append(changes, sessionChanges[len(sessionChanges)-1].(*models.ContactURNsChanged)) } @@ -38,24 +38,24 @@ func (h *CommitURNChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis } // handleContactURNsChanged is called for each contact urn changed event that is encountered -func handleContactURNsChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +func handleContactURNsChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.ContactURNsChangedEvent) logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "urns": event.URNs, }).Debug("contact urns changed") // create our URN changed event change := &models.ContactURNsChanged{ - ContactID: session.ContactID(), + ContactID: scene.ContactID(), OrgID: org.OrgID(), URNs: event.URNs, } // add our callback - session.AddPreCommitEvent(commitURNChangesHook, change) - session.AddPreCommitEvent(contactModifiedHook, session.Contact().ID()) + scene.AddPreCommitEvent(commitURNChangesHook, change) + scene.AddPreCommitEvent(contactModifiedHook, scene.Contact().ID()) return nil } diff --git a/hooks/email_sent.go b/hooks/email_sent.go index 73cb4e69f..5a8215898 100644 --- a/hooks/email_sent.go +++ b/hooks/email_sent.go @@ -17,12 +17,12 @@ func init() { } // goflow now sends email so this just logs the event -func handleEmailSent(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +func handleEmailSent(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.EmailSentEvent) logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "body": event.Body, "to": event.To, }).Debug("email sent") diff --git a/hooks/hooks_test.go b/hooks/hooks_test.go index a7cdfc754..99887e0e6 100644 --- a/hooks/hooks_test.go +++ b/hooks/hooks_test.go @@ -191,8 +191,8 @@ func RunActionTestCases(t *testing.T, tcs []HookTestCase) { assert.NoError(t, err) options := runner.NewStartOptions() - options.CommitHook = func(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions []*models.Session) error { - for _, s := range sessions { + options.CommitHook = func(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session []*models.Session) error { + for _, s := range session { msg := tc.Msgs[s.ContactID()] if msg != nil { s.SetIncomingMsg(msg.ID(), "") diff --git a/hooks/input_labels_added.go b/hooks/input_labels_added.go index 0b27a2a93..822a5d0f8 100644 --- a/hooks/input_labels_added.go +++ b/hooks/input_labels_added.go @@ -23,12 +23,12 @@ type CommitAddedLabelsHook struct{} var commitAddedLabelsHook = &CommitAddedLabelsHook{} // Apply applies our input labels added, committing them in a single batch -func (h *CommitAddedLabelsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *CommitAddedLabelsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { // build our list of msg label adds, we dedupe these so we never double add in the same transaction seen := make(map[string]bool) - adds := make([]*models.MsgLabelAdd, 0, len(sessions)) + adds := make([]*models.MsgLabelAdd, 0, len(scene)) - for _, as := range sessions { + for _, as := range scene { for _, a := range as { add := a.(*models.MsgLabelAdd) key := fmt.Sprintf("%d:%d", add.LabelID, add.MsgID) @@ -43,12 +43,12 @@ func (h *CommitAddedLabelsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redi return models.AddMsgLabels(ctx, tx, adds) } -// handleInputLabelsAdded is called for each input labels added event in a session -func handleInputLabelsAdded(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +// handleInputLabelsAdded is called for each input labels added event in a scene +func handleInputLabelsAdded(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.InputLabelsAddedEvent) logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "labels": event.Labels, }).Debug("input labels added") @@ -59,12 +59,16 @@ func handleInputLabelsAdded(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, or return errors.Errorf("unable to find label with UUID: %s", l.UUID) } - if session.IncomingMsgID() == models.NilMsgID { - return errors.Errorf("cannot add label, no incoming message for session: %d", session.ID()) + if scene.Session() == nil { + return errors.Errorf("cannot add label, not in a session") } - session.AddPreCommitEvent(commitAddedLabelsHook, &models.MsgLabelAdd{ - MsgID: session.IncomingMsgID(), + if scene.Session().IncomingMsgID() == models.NilMsgID { + return errors.Errorf("cannot add label, no incoming message for scene: %d", scene.ID()) + } + + scene.AddPreCommitEvent(commitAddedLabelsHook, &models.MsgLabelAdd{ + MsgID: scene.Session().IncomingMsgID(), LabelID: label.ID(), }) } diff --git a/hooks/ivr_created.go b/hooks/ivr_created.go index 36491caea..a20cacfb1 100644 --- a/hooks/ivr_created.go +++ b/hooks/ivr_created.go @@ -16,15 +16,15 @@ func init() { models.RegisterEventHook(events.TypeIVRCreated, handleIVRCreated) } -// CommitIVRHook is our hook for comitting session messages / say commands +// CommitIVRHook is our hook for comitting scene messages / say commands type CommitIVRHook struct{} var commitIVRHook = &CommitIVRHook{} -// Apply takes care of inserting all the messages in the passed in sessions assigning topups to them as needed. -func (h *CommitIVRHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { - msgs := make([]*models.Msg, 0, len(sessions)) - for _, s := range sessions { +// Apply takes care of inserting all the messages in the passed in scene assigning topups to them as needed. +func (h *CommitIVRHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { + msgs := make([]*models.Msg, 0, len(scene)) + for _, s := range scene { for _, m := range s { msgs = append(msgs, m.(*models.Msg)) } @@ -55,19 +55,24 @@ func (h *CommitIVRHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, } // handleIVRCreated creates the db msg for the passed in event -func handleIVRCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +func handleIVRCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.IVRCreatedEvent) + // must be in a session + if scene.Session() == nil { + return errors.Errorf("cannot apply ivr created event, not in a session") + } + logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "text": event.Msg.Text(), }).Debug("ivr say") // get our channel connection - conn := session.ChannelConnection() + conn := scene.Session().ChannelConnection() if conn == nil { - return errors.Errorf("ivr sessions must have a channel connection set") + return errors.Errorf("ivr session must have a channel connection set") } // if our call is no longer in progress, return @@ -81,7 +86,7 @@ func handleIVRCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *mod } // register to have this message committed - session.AddPreCommitEvent(commitIVRHook, msg) + scene.AddPreCommitEvent(commitIVRHook, msg) return nil } diff --git a/hooks/msg_created.go b/hooks/msg_created.go index 3ffaea833..15f3907be 100644 --- a/hooks/msg_created.go +++ b/hooks/msg_created.go @@ -24,13 +24,13 @@ func init() { models.RegisterEventHook(events.TypeMsgCreated, handleMsgCreated) } -// SendMessagesHook is our hook for sending session messages +// SendMessagesHook is our hook for sending scene messages type SendMessagesHook struct{} var sendMessagesHook = &SendMessagesHook{} // Apply sends all non-android messages to courier -func (h *SendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *SendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { rc := rp.Get() defer rc.Close() @@ -40,8 +40,8 @@ func (h *SendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Poo // android channels that need to be notified to sync androidChannels := make(map[*models.Channel]bool) - // for each session gather all our messages - for s, args := range sessions { + // for each scene gather all our messages + for s, args := range scene { // walk through our messages, separate by whether they have a topup courierMsgs := make([]*models.Msg, 0, len(args)) @@ -61,12 +61,12 @@ func (h *SendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Poo // if there are courier messages to send, do so if len(courierMsgs) > 0 { - // if our session has a timeout, set it on our last message - if s.Timeout() != nil && s.WaitStartedOn() != nil { - courierMsgs[len(courierMsgs)-1].SetTimeout(s.ID(), *s.WaitStartedOn(), *s.Timeout()) + // if our scene has a timeout, set it on our last message + if s.Session().Timeout() != nil && s.Session().WaitStartedOn() != nil { + courierMsgs[len(courierMsgs)-1].SetTimeout(s.ID(), *s.Session().WaitStartedOn(), *s.Session().Timeout()) } - log := log.WithField("messages", courierMsgs).WithField("session", s.ID) + log := log.WithField("messages", courierMsgs).WithField("scene", s.ID) err := courier.QueueMessages(rc, courierMsgs) @@ -135,15 +135,15 @@ func (h *SendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Poo return nil } -// CommitMessagesHook is our hook for comitting session messages +// CommitMessagesHook is our hook for comitting scene messages type CommitMessagesHook struct{} var commitMessagesHook = &CommitMessagesHook{} -// Apply takes care of inserting all the messages in the passed in sessions assigning topups to them as needed. -func (h *CommitMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { - msgs := make([]*models.Msg, 0, len(sessions)) - for _, s := range sessions { +// Apply takes care of inserting all the messages in the passed in scene assigning topups to them as needed. +func (h *CommitMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { + msgs := make([]*models.Msg, 0, len(scene)) + for _, s := range scene { for _, m := range s { msgs = append(msgs, m.(*models.Msg)) } @@ -174,27 +174,32 @@ func (h *CommitMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.P } // handleMsgCreated creates the db msg for the passed in event -func handleMsgCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +func handleMsgCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.MsgCreatedEvent) + // must be in a session + if scene.Session() == nil { + return errors.Errorf("cannot handle msg created event without session") + } + logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "text": event.Msg.Text(), "urn": event.Msg.URN(), }).Debug("msg created event") // ignore events that don't have a channel or URN set // TODO: maybe we should create these messages in a failed state? - if session.SessionType() == models.MessagingFlow && (event.Msg.URN() == urns.NilURN || event.Msg.Channel() == nil) { + if scene.Session().SessionType() == models.MessagingFlow && (event.Msg.URN() == urns.NilURN || event.Msg.Channel() == nil) { return nil } // messages in messaging flows must have urn id set on them, if not, go look it up - if session.SessionType() == models.MessagingFlow { + if scene.Session().SessionType() == models.MessagingFlow { urn := event.Msg.URN() if models.GetURNInt(urn, "id") == 0 { - urn, err := models.GetOrCreateURN(ctx, tx, org, session.ContactID(), event.Msg.URN()) + urn, err := models.GetOrCreateURN(ctx, tx, org, scene.ContactID(), event.Msg.URN()) if err != nil { return errors.Wrapf(err, "unable to get or create URN: %s", event.Msg.URN()) } @@ -212,31 +217,36 @@ func handleMsgCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *mod } } - msg, err := models.NewOutgoingMsg(org.OrgID(), channel, session.ContactID(), event.Msg, event.CreatedOn()) + msg, err := models.NewOutgoingMsg(org.OrgID(), channel, scene.ContactID(), event.Msg, event.CreatedOn()) if err != nil { return errors.Wrapf(err, "error creating outgoing message to %s", event.Msg.URN()) } // set our reply to as well (will be noop in cases when there is no incoming message) - msg.SetResponseTo(session.IncomingMsgID(), session.IncomingMsgExternalID()) + msg.SetResponseTo(scene.Session().IncomingMsgID(), scene.Session().IncomingMsgExternalID()) // register to have this message committed - session.AddPreCommitEvent(commitMessagesHook, msg) + scene.AddPreCommitEvent(commitMessagesHook, msg) // don't send messages for surveyor flows - if session.SessionType() != models.SurveyorFlow { - session.AddPostCommitEvent(sendMessagesHook, msg) + if scene.Session().SessionType() != models.SurveyorFlow { + scene.AddPostCommitEvent(sendMessagesHook, msg) } return nil } -// handlePreMsgCreated clears our timeout on our session so that courier can send it when the message is sent, that will be set by courier when sent -func handlePreMsgCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +// handlePreMsgCreated clears our timeout on our scene so that courier can send it when the message is sent, that will be set by courier when sent +func handlePreMsgCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.MsgCreatedEvent) + // must be in a session + if scene.Session() == nil { + return errors.Errorf("cannot handle msg created event without session") + } + // we only clear timeouts on messaging flows - if session.SessionType() != models.MessagingFlow { + if scene.Session().SessionType() != models.MessagingFlow { return nil } @@ -261,7 +271,7 @@ func handlePreMsgCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org * } // everybody else gets their timeout cleared, will be set by courier - session.ClearTimeoutOn() + scene.Session().ClearTimeoutOn() return nil } diff --git a/hooks/msg_received.go b/hooks/msg_received.go index 4b6df49b9..60fc07fd3 100644 --- a/hooks/msg_received.go +++ b/hooks/msg_received.go @@ -8,6 +8,7 @@ import ( "github.com/nyaruka/goflow/flows" "github.com/nyaruka/goflow/flows/events" "github.com/nyaruka/mailroom/models" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -16,24 +17,29 @@ func init() { } // handleMsgReceived takes care of creating the incoming message for surveyor flows, it is a noop for all other flows -func handleMsgReceived(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +func handleMsgReceived(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.MsgReceivedEvent) + // must be in a session + if scene.Session() == nil { + return errors.Errorf("cannot handle msg received event without session") + } + // we only care about msg received events when dealing with surveyor flows - if session.SessionType() != models.SurveyorFlow { + if scene.Session().SessionType() != models.SurveyorFlow { return nil } logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "text": event.Msg.Text(), "urn": event.Msg.URN(), }).Debug("msg received event") - msg := models.NewIncomingMsg(org.OrgID(), nil, session.ContactID(), &event.Msg, event.CreatedOn()) + msg := models.NewIncomingMsg(org.OrgID(), nil, scene.ContactID(), &event.Msg, event.CreatedOn()) // we'll commit this message with all the others - session.AddPreCommitEvent(commitMessagesHook, msg) + scene.AddPreCommitEvent(commitMessagesHook, msg) return nil } diff --git a/hooks/noop.go b/hooks/noop.go index cfe71ff81..cf159a1e9 100644 --- a/hooks/noop.go +++ b/hooks/noop.go @@ -22,6 +22,6 @@ func init() { } // NoopHandler is our hook for events we ignore in a run -func NoopHandler(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, event flows.Event) error { +func NoopHandler(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, event flows.Event) error { return nil } diff --git a/hooks/resthook_called.go b/hooks/resthook_called.go index d2bcad5dc..14033fa5b 100644 --- a/hooks/resthook_called.go +++ b/hooks/resthook_called.go @@ -22,9 +22,9 @@ type InsertWebhookEventHook struct{} var insertWebhookEventHook = &InsertWebhookEventHook{} // Apply inserts all the webook events that were created -func (h *InsertWebhookEventHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { - events := make([]*models.WebhookEvent, 0, len(sessions)) - for _, rs := range sessions { +func (h *InsertWebhookEventHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { + events := make([]*models.WebhookEvent, 0, len(scene)) + for _, rs := range scene { for _, r := range rs { events = append(events, r.(*models.WebhookEvent)) } @@ -38,12 +38,12 @@ func (h *InsertWebhookEventHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *red return nil } -// handleResthookCalled is called for each resthook call in a session -func handleResthookCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +// handleResthookCalled is called for each resthook call in a scene +func handleResthookCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.ResthookCalledEvent) logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "resthook": event.Resthook, }).Debug("resthook called") @@ -61,7 +61,7 @@ func handleResthookCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org string(event.Payload), event.CreatedOn(), ) - session.AddPreCommitEvent(insertWebhookEventHook, re) + scene.AddPreCommitEvent(insertWebhookEventHook, re) return nil } diff --git a/hooks/session_triggered.go b/hooks/session_triggered.go index de4e08851..720268e6b 100644 --- a/hooks/session_triggered.go +++ b/hooks/session_triggered.go @@ -17,7 +17,7 @@ func init() { models.RegisterEventHook(events.TypeSessionTriggered, handleSessionTriggered) } -// StartStartHook is our hook to fire our session starts +// StartStartHook is our hook to fire our scene starts type StartStartHook struct{} var startStartHook = &StartStartHook{} @@ -28,12 +28,12 @@ type InsertStartHook struct{} var insertStartHook = &InsertStartHook{} // Apply queues up our flow starts -func (h *StartStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *StartStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { rc := rp.Get() defer rc.Close() - // for each of our sessions - for _, es := range sessions { + // for each of our scene + for _, es := range scene { for _, e := range es { start := e.(*models.FlowStart) @@ -57,20 +57,20 @@ func (h *StartStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, } // Apply inserts our starts -func (h *InsertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *InsertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { rc := rp.Get() defer rc.Close() - starts := make([]*models.FlowStart, 0, len(sessions)) + starts := make([]*models.FlowStart, 0, len(scene)) - // for each of our sessions - for s, es := range sessions { + // for each of our scene + for s, es := range scene { for _, e := range es { event := e.(*events.SessionTriggeredEvent) - // we skip over any session starts that involve groups if we are in a batch start - if len(sessions) > 1 && (len(event.Groups) > 0 || event.ContactQuery != "") { - logrus.WithField("session_id", s.ID).Error("ignoring session trigger on group or query in batch") + // we skip over any scene starts that involve groups if we are in a batch start + if len(scene) > 1 && (len(event.Groups) > 0 || event.ContactQuery != "") { + logrus.WithField("session_id", s.ID).Error("ignoring scene trigger on group or query in batch") continue } @@ -115,23 +115,29 @@ func (h *InsertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool // insert all our starts err := models.InsertFlowStarts(ctx, tx, starts) if err != nil { - return errors.Wrapf(err, "error inserting flow starts for session triggers") + return errors.Wrapf(err, "error inserting flow starts for scene triggers") } return nil } -// handleSessionTriggered queues this event for being started after our sessions are committed -func handleSessionTriggered(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +// handleSessionTriggered queues this event for being started after our scene are committed +func handleSessionTriggered(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.SessionTriggeredEvent) + + // must be in a session + if scene.Session() == nil { + return errors.Errorf("cannot handle session triggered event without session") + } + logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "flow": event.Flow.Name, "flow_uuid": event.Flow.UUID, - }).Debug("session triggered") + }).Debug("scene triggered") - session.AddPreCommitEvent(insertStartHook, event) + scene.AddPreCommitEvent(insertStartHook, event) return nil } diff --git a/hooks/webhook_called.go b/hooks/webhook_called.go index 0b85fa23d..c4c43d927 100644 --- a/hooks/webhook_called.go +++ b/hooks/webhook_called.go @@ -23,10 +23,10 @@ type UnsubscribeResthookHook struct{} var unsubscribeResthookHook = &UnsubscribeResthookHook{} // Apply squashes and applies all our resthook unsubscriptions -func (h *UnsubscribeResthookHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *UnsubscribeResthookHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { // gather all our unsubscribes - unsubs := make([]*models.ResthookUnsubscribe, 0, len(sessions)) - for _, us := range sessions { + unsubs := make([]*models.ResthookUnsubscribe, 0, len(scene)) + for _, us := range scene { for _, u := range us { unsubs = append(unsubs, u.(*models.ResthookUnsubscribe)) } @@ -46,10 +46,10 @@ type InsertWebhookResultHook struct{} var insertWebhookResultHook = &InsertWebhookResultHook{} // Apply inserts all the webook results that were created -func (h *InsertWebhookResultHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error { +func (h *InsertWebhookResultHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error { // gather all our results - results := make([]*models.WebhookResult, 0, len(sessions)) - for _, rs := range sessions { + results := make([]*models.WebhookResult, 0, len(scene)) + for _, rs := range scene { for _, r := range rs { results = append(results, r.(*models.WebhookResult)) } @@ -63,12 +63,12 @@ func (h *InsertWebhookResultHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *re return nil } -// handleWebhookCalled is called for each webhook call in a session -func handleWebhookCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error { +// handleWebhookCalled is called for each webhook call in a scene +func handleWebhookCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.WebhookCalledEvent) logrus.WithFields(logrus.Fields{ - "contact_uuid": session.ContactUUID(), - "session_id": session.ID(), + "contact_uuid": scene.ContactUUID(), + "session_id": scene.ID(), "url": event.URL, "status": event.Status, "elapsed_ms": event.ElapsedMS, @@ -83,7 +83,7 @@ func handleWebhookCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org * URL: event.URL, } - session.AddPreCommitEvent(unsubscribeResthookHook, unsub) + scene.AddPreCommitEvent(unsubscribeResthookHook, unsub) } // if this is a connection error, use that as our response @@ -94,12 +94,12 @@ func handleWebhookCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org * // create a result for this call result := models.NewWebhookResult( - org.OrgID(), session.ContactID(), + org.OrgID(), scene.ContactID(), event.URL, event.Request, event.StatusCode, response, time.Millisecond*time.Duration(event.ElapsedMS), event.CreatedOn(), ) - session.AddPreCommitEvent(insertWebhookResultHook, result) + scene.AddPreCommitEvent(insertWebhookResultHook, result) return nil } diff --git a/models/events.go b/models/events.go index 3d058cb66..b07f59a98 100644 --- a/models/events.go +++ b/models/events.go @@ -9,20 +9,73 @@ import ( "github.com/pkg/errors" ) +// Scene represents the context that events are occurring in +type Scene struct { + contact *flows.Contact + session *Session + + preCommits map[EventCommitHook][]interface{} + postCommits map[EventCommitHook][]interface{} +} + +func NewSceneForSession(session *Session) *Scene { + s := &Scene{ + contact: session.Contact(), + session: session, + + preCommits: make(map[EventCommitHook][]interface{}), + postCommits: make(map[EventCommitHook][]interface{}), + } + return s +} + +func NewSceneForContact(contact *flows.Contact) *Scene { + s := &Scene{ + contact: contact, + + preCommits: make(map[EventCommitHook][]interface{}), + postCommits: make(map[EventCommitHook][]interface{}), + } + return s +} + +func (s *Scene) ID() SessionID { + if s.session == nil { + return SessionID(0) + } + return s.session.ID() +} + +func (s *Scene) Contact() *flows.Contact { return s.contact } +func (s *Scene) ContactID() ContactID { return ContactID(s.contact.ID()) } +func (s *Scene) ContactUUID() flows.ContactUUID { return s.contact.UUID() } + +func (s *Scene) Session() *Session { return s.session } + +// AddPreCommitEvent adds a new event to be handled by a pre commit hook +func (s *Scene) AddPreCommitEvent(hook EventCommitHook, event interface{}) { + s.preCommits[hook] = append(s.preCommits[hook], event) +} + +// AddPostCommitEvent adds a new event to be handled by a post commit hook +func (s *Scene) AddPostCommitEvent(hook EventCommitHook, event interface{}) { + s.postCommits[hook] = append(s.postCommits[hook], event) +} + // EventCommitHook defines a callback that will accept a certain type of events across session, either before or after committing type EventCommitHook interface { - Apply(context.Context, *sqlx.Tx, *redis.Pool, *OrgAssets, map[*Session][]interface{}) error + Apply(context.Context, *sqlx.Tx, *redis.Pool, *OrgAssets, map[*Scene][]interface{}) error } // ApplyPreEventHooks runs through all the pre event hooks for the passed in sessions and applies their events -func ApplyPreEventHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAssets, sessions []*Session) error { +func ApplyPreEventHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAssets, scenes []*Scene) error { // gather all our hook events together across our sessions - preHooks := make(map[EventCommitHook]map[*Session][]interface{}) - for _, s := range sessions { + preHooks := make(map[EventCommitHook]map[*Scene][]interface{}) + for _, s := range scenes { for hook, args := range s.preCommits { sessionMap, found := preHooks[hook] if !found { - sessionMap = make(map[*Session][]interface{}, len(sessions)) + sessionMap = make(map[*Scene][]interface{}, len(scenes)) preHooks[hook] = sessionMap } sessionMap[s] = args @@ -41,17 +94,17 @@ func ApplyPreEventHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *O } // ApplyPostEventHooks runs through all the post event hooks for the passed in sessions and applies their events -func ApplyPostEventHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAssets, sessions []*Session) error { +func ApplyPostEventHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAssets, scenes []*Scene) error { // gather all our hook events together across our sessions - postHooks := make(map[EventCommitHook]map[*Session][]interface{}) - for _, s := range sessions { + postHooks := make(map[EventCommitHook]map[*Scene][]interface{}) + for _, s := range scenes { for hook, args := range s.postCommits { - sessionMap, found := postHooks[hook] + sprintMap, found := postHooks[hook] if !found { - sessionMap = make(map[*Session][]interface{}, len(sessions)) - postHooks[hook] = sessionMap + sprintMap = make(map[*Scene][]interface{}, len(scenes)) + postHooks[hook] = sprintMap } - sessionMap[s] = args + sprintMap[s] = args } } @@ -67,7 +120,7 @@ func ApplyPostEventHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org * } // EventHandler defines a call for handling events that occur in a flow -type EventHandler func(context.Context, *sqlx.Tx, *redis.Pool, *OrgAssets, *Session, flows.Event) error +type EventHandler func(context.Context, *sqlx.Tx, *redis.Pool, *OrgAssets, *Scene, flows.Event) error // RegisterEventHook registers the passed in handler as being interested in the passed in type func RegisterEventHook(eventType string, handler EventHandler) { @@ -89,35 +142,32 @@ func RegisterPreWriteHook(eventType string, handler EventHandler) { preHandlers[eventType] = handler } -// ApplyEvent applies the passed in event, IE, creates the db objects required etc.. -func ApplyEvent(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAssets, session *Session, e flows.Event) error { - // if this session failed, don't apply any hooks - if session.Status() == SessionStatusFailed { - return nil - } +// ApplyEvents applies the passed in event, IE, creates the db objects required etc.. +func ApplyEvents(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAssets, scene *Scene, events []flows.Event) error { + for _, e := range events { - handler, found := eventHandlers[e.Type()] - if !found { - return errors.Errorf("unable to find handler for event type: %s", e.Type()) - } + handler, found := eventHandlers[e.Type()] + if !found { + return errors.Errorf("unable to find handler for event type: %s", e.Type()) + } - return handler(ctx, tx, rp, org, session, e) + err := handler(ctx, tx, rp, org, scene, e) + if err != nil { + return err + } + } + return nil } // ApplyPreWriteEvent applies the passed in event before insertion or update, unlike normal event handlers it is not a requirement // that all types have a handler. -func ApplyPreWriteEvent(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAssets, session *Session, e flows.Event) error { - // if this session failed, don't apply any hooks - if session.Status() == SessionStatusFailed { - return nil - } - +func ApplyPreWriteEvent(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAssets, scene *Scene, e flows.Event) error { handler, found := preHandlers[e.Type()] if !found { return nil } - return handler(ctx, tx, rp, org, session, e) + return handler(ctx, tx, rp, org, scene, e) } // our map of event type to internal handlers diff --git a/models/runs.go b/models/runs.go index 7f7c0eddc..81606656c 100644 --- a/models/runs.go +++ b/models/runs.go @@ -94,20 +94,20 @@ var keptEvents = map[string]bool{ // Session is the mailroom type for a FlowSession type Session struct { s struct { - ID SessionID `db:"id"` - UUID null.String `db:"uuid"` // TODO remove nullable once backfilled - SessionType FlowType `db:"session_type"` - Status SessionStatus `db:"status"` - Responded bool `db:"responded"` - Output string `db:"output"` - ContactID ContactID `db:"contact_id"` - OrgID OrgID `db:"org_id"` - CreatedOn time.Time `db:"created_on"` - EndedOn *time.Time `db:"ended_on"` - TimeoutOn *time.Time `db:"timeout_on"` - WaitStartedOn *time.Time `db:"wait_started_on"` - CurrentFlowID FlowID `db:"current_flow_id"` - ConnectionID *ConnectionID `db:"connection_id"` + ID SessionID `db:"id"` + UUID flows.SessionUUID `db:"uuid"` + SessionType FlowType `db:"session_type"` + Status SessionStatus `db:"status"` + Responded bool `db:"responded"` + Output string `db:"output"` + ContactID ContactID `db:"contact_id"` + OrgID OrgID `db:"org_id"` + CreatedOn time.Time `db:"created_on"` + EndedOn *time.Time `db:"ended_on"` + TimeoutOn *time.Time `db:"timeout_on"` + WaitStartedOn *time.Time `db:"wait_started_on"` + CurrentFlowID FlowID `db:"current_flow_id"` + ConnectionID *ConnectionID `db:"connection_id"` } incomingMsgID MsgID @@ -122,13 +122,14 @@ type Session struct { contact *flows.Contact runs []*FlowRun - seenRuns map[flows.RunUUID]time.Time - preCommits map[EventCommitHook][]interface{} - postCommits map[EventCommitHook][]interface{} + seenRuns map[flows.RunUUID]time.Time // we keep around a reference to the sprint associated with this session sprint flows.Sprint + // the scene for our event hooks + scene *Scene + // we also keep around a reference to the wait (if any) wait flows.ActivatedWait } @@ -150,6 +151,7 @@ func (s *Session) CurrentFlowID() FlowID { return s.s.CurrentFlowID func (s *Session) ConnectionID() *ConnectionID { return s.s.ConnectionID } func (s *Session) IncomingMsgID() MsgID { return s.incomingMsgID } func (s *Session) IncomingMsgExternalID() null.String { return s.incomingExternalID } +func (s *Session) Scene() *Scene { return s.scene } // ContactUUID returns the UUID of our contact func (s *Session) ContactUUID() flows.ContactUUID { @@ -186,16 +188,6 @@ func (s *Session) OutputMD5() string { return fmt.Sprintf("%x", md5.Sum([]byte(s.s.Output))) } -// AddPreCommitEvent adds a new event to be handled by a pre commit hook -func (s *Session) AddPreCommitEvent(hook EventCommitHook, event interface{}) { - s.preCommits[hook] = append(s.preCommits[hook], event) -} - -// AddPostCommitEvent adds a new event to be handled by a post commit hook -func (s *Session) AddPostCommitEvent(hook EventCommitHook, event interface{}) { - s.postCommits[hook] = append(s.postCommits[hook], event) -} - // SetIncomingMsg set the incoming message that this session should be associated with in this sprint func (s *Session) SetIncomingMsg(id flows.MsgID, externalID null.String) { s.incomingMsgID = MsgID(id) @@ -322,7 +314,7 @@ func NewSession(ctx context.Context, tx *sqlx.Tx, org *OrgAssets, fs flows.Sessi // create our session object session := &Session{} s := &session.s - s.UUID = null.String(uuid) + s.UUID = uuid s.Status = sessionStatus s.SessionType = sessionType s.Responded = false @@ -332,8 +324,7 @@ func NewSession(ctx context.Context, tx *sqlx.Tx, org *OrgAssets, fs flows.Sessi s.CreatedOn = fs.Runs()[0].CreatedOn() session.contact = fs.Contact() - session.preCommits = make(map[EventCommitHook][]interface{}) - session.postCommits = make(map[EventCommitHook][]interface{}) + session.scene = NewSceneForSession(session) session.sprint = sprint session.wait = fs.Wait() @@ -379,10 +370,9 @@ func ActiveSessionForContact(ctx context.Context, db *sqlx.DB, org *OrgAssets, s // scan in our session session := &Session{ - contact: contact, - preCommits: make(map[EventCommitHook][]interface{}), - postCommits: make(map[EventCommitHook][]interface{}), + contact: contact, } + session.scene = NewSceneForSession(session) err = rows.StructScan(&session.s) if err != nil { return nil, errors.Wrapf(err, "error scanning session") @@ -530,7 +520,7 @@ func (s *Session) WriteUpdatedSession(ctx context.Context, tx *sqlx.Tx, rp *redi // apply all our pre write events for _, e := range sprint.Events() { - err := ApplyPreWriteEvent(ctx, tx, rp, org, s, e) + err := ApplyPreWriteEvent(ctx, tx, rp, org, s.scene, e) if err != nil { return errors.Wrapf(err, "error applying event: %v", e) } @@ -590,15 +580,15 @@ func (s *Session) WriteUpdatedSession(ctx context.Context, tx *sqlx.Tx, rp *redi } // apply all our events - for _, e := range sprint.Events() { - err := ApplyEvent(ctx, tx, rp, org, s, e) + if s.Status() != SessionStatusFailed { + err = ApplyEvents(ctx, tx, rp, org, s.scene, sprint.Events()) if err != nil { - return errors.Wrapf(err, "error applying event: %v", e) + return errors.Wrapf(err, "error applying events: %d", s.ID()) } } // gather all our pre commit events, group them by hook and apply them - err = ApplyPreEventHooks(ctx, tx, rp, org, []*Session{s}) + err = ApplyPreEventHooks(ctx, tx, rp, org, []*Scene{s.scene}) if err != nil { return errors.Wrapf(err, "error applying pre commit hook: %T", hook) } @@ -676,7 +666,7 @@ func WriteSessions(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAss // apply all our pre write events for i := range ss { for _, e := range sprints[i].Events() { - err := ApplyPreWriteEvent(ctx, tx, rp, org, sessions[i], e) + err := ApplyPreWriteEvent(ctx, tx, rp, org, sessions[i].scene, e) if err != nil { return nil, errors.Wrapf(err, "error applying event: %v", e) } @@ -727,17 +717,23 @@ func WriteSessions(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAss } // apply our all events for the session - for i := range ss { - for _, e := range sprints[i].Events() { - err := ApplyEvent(ctx, tx, rp, org, sessions[i], e) - if err != nil { - return nil, errors.Wrapf(err, "error applying event: %v", e) - } + scenes := make([]*Scene, 0, len(ss)) + for i := range sessions { + if ss[i].Status() == SessionStatusFailed { + continue } + + err = ApplyEvents(ctx, tx, rp, org, sessions[i].Scene(), sprints[i].Events()) + if err != nil { + return nil, errors.Wrapf(err, "error applying events for session: %d", sessions[i].ID()) + } + + scene := sessions[i].Scene() + scenes = append(scenes, scene) } // gather all our pre commit events, group them by hook - err = ApplyPreEventHooks(ctx, tx, rp, org, sessions) + err = ApplyPreEventHooks(ctx, tx, rp, org, scenes) if err != nil { return nil, errors.Wrapf(err, "error applying pre commit hook: %T", hook) } diff --git a/runner/runner.go b/runner/runner.go index 49a71ce47..d132a053a 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -119,7 +119,7 @@ func ResumeFlow(ctx context.Context, db *sqlx.DB, rp *redis.Pool, org *models.Or return nil, errors.Wrapf(err, "error starting transaction for post commit hooks") } - err = models.ApplyPostEventHooks(txCTX, tx, rp, org, []*models.Session{session}) + err = models.ApplyPostEventHooks(txCTX, tx, rp, org, []*models.Scene{session.Scene()}) if err == nil { err = tx.Commit() } @@ -636,7 +636,12 @@ func StartFlowForContacts( return nil, errors.Wrapf(err, "error starting transaction for post commit hooks") } - err = models.ApplyPostEventHooks(txCTX, tx, rp, org, dbSessions) + scenes := make([]*models.Scene, 0, len(triggers)) + for _, s := range dbSessions { + scenes = append(scenes, s.Scene()) + } + + err = models.ApplyPostEventHooks(txCTX, tx, rp, org, scenes) if err == nil { err = tx.Commit() } @@ -658,7 +663,7 @@ func StartFlowForContacts( continue } - err = models.ApplyPostEventHooks(ctx, tx, rp, org, []*models.Session{session}) + err = models.ApplyPostEventHooks(ctx, tx, rp, org, []*models.Scene{session.Scene()}) if err != nil { tx.Rollback() log.WithError(err).Errorf("error applying post commit hook") diff --git a/web/contact/contact.go b/web/contact/contact.go index cbcfa804d..8535f78a2 100644 --- a/web/contact/contact.go +++ b/web/contact/contact.go @@ -3,18 +3,12 @@ package contact import ( "context" "encoding/json" - "math" "net/http" "github.com/nyaruka/goflow/assets" - "github.com/nyaruka/goflow/envs" "github.com/nyaruka/goflow/flows" - "github.com/nyaruka/goflow/flows/actions" - "github.com/nyaruka/goflow/flows/definition" - "github.com/nyaruka/goflow/flows/triggers" + "github.com/nyaruka/goflow/flows/actions/modifiers" "github.com/nyaruka/goflow/utils" - "github.com/nyaruka/goflow/utils/uuids" - "github.com/nyaruka/mailroom/goflow" "github.com/nyaruka/mailroom/models" "github.com/nyaruka/mailroom/search" "github.com/nyaruka/mailroom/web" @@ -24,7 +18,7 @@ import ( func init() { web.RegisterJSONRoute(http.MethodPost, "/mr/contact/search", web.RequireAuthToken(handleSearch)) web.RegisterJSONRoute(http.MethodPost, "/mr/contact/parse_query", web.RequireAuthToken(handleParseQuery)) - web.RegisterJSONRoute(http.MethodPost, "/mr/contact/apply_actions", web.RequireAuthToken(handleApplyActions)) + web.RegisterJSONRoute(http.MethodPost, "/mr/contact/modify", web.RequireAuthToken(handleModify)) } // Searches the contacts for an org @@ -188,61 +182,51 @@ func handleParseQuery(ctx context.Context, s *web.Server, r *http.Request) (inte return response, http.StatusOK, nil } -// Request update a contact. Clients should only pass in the fields they want updated. +// Request that a set of contacts is modified. // // { // "org_id": 1, -// "contact_uuid": "559d4cf7-8ed3-43db-9bbb-2be85345f87e", -// "name": "Joe", -// "fields": { -// "age": "124" -// }, -// "add_groups": [], -// "remove_groups": [] +// "contact_ids": [15,235], +// "modifiers": [{ +// "type": "groups", +// "modification": "add", +// "groups": [{ +// "uuid": "a8e8efdb-78ee-46e7-9eb0-6a578da3b02d", +// "name": "Doctors" +// }] +// }] // } // -type applyActionsRequest struct { - OrgID models.OrgID `json:"org_id" validate:"required"` - ContactID models.ContactID `json:"contact_id" validate:"required"` - Actions []json.RawMessage `json:"actions" validate:"required"` +type modifyRequest struct { + OrgID models.OrgID `json:"org_id" validate:"required"` + ContactIDs []models.ContactID `json:"contact_ids" validate:"required"` + Modifiers []json.RawMessage `json:"modifiers" validate:"required"` } // Response for a contact update. Will return the full contact state and any errors // // { -// "contact": { -// "id": 123, -// "contact_uuid": "559d4cf7-8ed3-43db-9bbb-2be85345f87e", -// "name": "Joe", -// "language": "eng", -// "created_on": ".." -// "urns": [ .. ], -// "fields": { -// } -// "groups": [ .. ], -// } +// "1000": { +// "contact": { +// "id": 123, +// "contact_uuid": "559d4cf7-8ed3-43db-9bbb-2be85345f87e", +// "name": "Joe", +// "language": "eng", +// ... +// }], +// "events": [{ +// .... +// }] +// }, ... // } -type applyActionsResponse struct { +type modifyResult struct { Contact *flows.Contact `json:"contact"` Events []flows.Event `json:"events"` } -// the types of actions our apply_actions endpoind supports -var supportedTypes map[string]bool = map[string]bool{ - actions.TypeAddContactGroups: true, - actions.TypeAddContactURN: true, - // actions.TypeRemoveContactURN <-- missing - actions.TypeRemoveContactGroups: true, - actions.TypeSetContactChannel: true, - actions.TypeSetContactLanguage: true, - actions.TypeSetContactName: true, - actions.TypeSetContactTimezone: true, - actions.TypeSetContactField: true, -} - // handles a request to apply the passed in actions -func handleApplyActions(ctx context.Context, s *web.Server, r *http.Request) (interface{}, int, error) { - request := &applyActionsRequest{} +func handleModify(ctx context.Context, s *web.Server, r *http.Request) (interface{}, int, error) { + request := &modifyRequest{} if err := utils.UnmarshalAndValidateWithLimit(r.Body, request, web.MaxRequestBytes); err != nil { return errors.Wrapf(err, "request failed validation"), http.StatusBadRequest, nil } @@ -259,93 +243,64 @@ func handleApplyActions(ctx context.Context, s *web.Server, r *http.Request) (in return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to clone orgs") } - // load our contact - contact, err := models.LoadContact(ctx, s.DB, org, request.ContactID) - if err != nil { - return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load contact") - } - if contact == nil { - return errors.Errorf("unable to find contact widh id: %d", request.ContactID), http.StatusBadRequest, nil - } - - // build up our actions - as := make([]flows.Action, len(request.Actions)) - for i, a := range request.Actions { - action, err := actions.ReadAction(a) + // build up our modifiers + mods := make([]flows.Modifier, len(request.Modifiers)) + for i, m := range request.Modifiers { + mod, err := modifiers.ReadModifier(org.SessionAssets(), m, nil) if err != nil { - return errors.Wrapf(err, "error in action: %s", string(a)), http.StatusBadRequest, nil - } - if !supportedTypes[action.Type()] { - return errors.Errorf("unsupported action type: %s", action.Type()), http.StatusBadRequest, nil + return errors.Wrapf(err, "error in modifier: %s", string(m)), http.StatusBadRequest, nil } - - as[i] = action + mods[i] = mod } - // create a minimal node with these actions - entry := definition.NewNode( - flows.NodeUUID(uuids.New()), - as, - nil, - []flows.Exit{definition.NewExit(flows.ExitUUID(uuids.New()), "")}, - ) - - // we have our nodes, lets create our flow - flowUUID := assets.FlowUUID(uuids.New()) - flowDef, err := definition.NewFlow( - flowUUID, - "Contact Update Flow", - envs.Language("eng"), - flows.FlowTypeMessaging, - 1, - 300, - definition.NewLocalization(), - []flows.Node{entry}, - nil, - ) + // load our contacts + contacts, err := models.LoadContacts(ctx, s.DB, org, request.ContactIDs) if err != nil { - return nil, http.StatusInternalServerError, errors.Wrapf(err, "error building contact flow") + return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load contact") } - flowJSON, err := json.Marshal(flowDef) - if err != nil { - return nil, http.StatusInternalServerError, errors.Wrapf(err, "error marshalling contact flow") - } + results := make(map[models.ContactID]modifyResult) + + // create scenes for our contacts + scenes := make([]*models.Scene, 0, len(contacts)) + for _, contact := range contacts { + flowContact, err := contact.FlowContact(org) + if err != nil { + return nil, http.StatusInternalServerError, errors.Wrapf(err, "error creating flow contact for contact: %d", contact.ID()) + } - flow := org.SetFlow(math.MaxInt32, flowUUID, flowDef.Name(), flowJSON) + result := modifyResult{ + Contact: flowContact, + Events: make([]flows.Event, 0, len(mods)), + } - flowContact, err := contact.FlowContact(org) - if err != nil { - return nil, http.StatusInternalServerError, errors.Wrapf(err, "error converting to flow contact") - } + scene := models.NewSceneForContact(flowContact) - // build our trigger - trigger := triggers.NewManual(org.Env(), flow.FlowReference(), flowContact, nil) - flowSession, flowSprint, err := goflow.Engine().NewSession(org.SessionAssets(), trigger) - if err != nil { - return nil, http.StatusInternalServerError, errors.Wrapf(err, "error running contact flow") + // apply our modifiers + for _, mod := range mods { + mod.Apply(org.Env(), org.SessionAssets(), flowContact, func(e flows.Event) { result.Events = append(result.Events, e) }) + } + + results[contact.ID()] = result + scenes = append(scenes, scene) } + // ok, commit all our events tx, err := s.DB.BeginTxx(ctx, nil) if err != nil { return nil, http.StatusInternalServerError, errors.Wrapf(err, "error starting transaction") } - session, err := models.NewSession(ctx, tx, org, flowSession, flowSprint) - if err != nil { - return nil, http.StatusInternalServerError, errors.Wrapf(err, "error creating session object") - } - // apply our events - for _, e := range flowSprint.Events() { - err := models.ApplyEvent(ctx, tx, s.RP, org, session, e) + for _, scene := range scenes { + err := models.ApplyEvents(ctx, tx, s.RP, org, scene, results[scene.ContactID()].Events) if err != nil { - return nil, http.StatusInternalServerError, errors.Wrapf(err, "error applying event: %v", e) + return nil, http.StatusInternalServerError, errors.Wrapf(err, "error applying events") } } // gather all our pre commit events, group them by hook and apply them - err = models.ApplyPreEventHooks(ctx, tx, s.RP, org, []*models.Session{session}) + err = models.ApplyPreEventHooks(ctx, tx, s.RP, org, scenes) if err != nil { return nil, http.StatusInternalServerError, errors.Wrapf(err, "error applying pre commit hooks") } @@ -362,7 +317,7 @@ func handleApplyActions(ctx context.Context, s *web.Server, r *http.Request) (in } // then apply our post commit hooks - err = models.ApplyPostEventHooks(ctx, tx, s.RP, org, []*models.Session{session}) + err = models.ApplyPostEventHooks(ctx, tx, s.RP, org, scenes) if err != nil { return nil, http.StatusInternalServerError, errors.Wrapf(err, "error applying pre commit hooks") } @@ -372,11 +327,5 @@ func handleApplyActions(ctx context.Context, s *web.Server, r *http.Request) (in return nil, http.StatusInternalServerError, errors.Wrapf(err, "error committing pre commit hooks") } - // all done! build our response, including our updated contact and events - response := &applyActionsResponse{ - Contact: flowSession.Contact(), - Events: flowSprint.Events(), - } - - return response, http.StatusOK, nil + return results, http.StatusOK, nil } diff --git a/web/contact/testdata/apply_actions.json b/web/contact/testdata/apply_actions.json index 15ebb5e0e..e2ed8b062 100644 --- a/web/contact/testdata/apply_actions.json +++ b/web/contact/testdata/apply_actions.json @@ -2,22 +2,26 @@ { "label": "noop", "method": "POST", - "path": "/mr/contact/apply_actions", + "path": "/mr/contact/modify", "body": { "org_id": 1, - "contact_id": 10000, - "actions": [] + "contact_ids": [ + 10000 + ], + "modifiers": [] }, "status": 200, "response": { - "contact": { - "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", - "id": 10000, - "name": "Cathy", - "timezone": "America/Los_Angeles", - "created_on": "2018-07-06T12:30:00.123457Z" - }, - "events": [] + "10000": { + "contact": { + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "id": 10000, + "name": "Cathy", + "timezone": "America/Los_Angeles", + "created_on": "2018-07-06T12:30:00.123457Z" + }, + "events": [] + } }, "db_assertions": [ { @@ -29,35 +33,37 @@ { "label": "set name", "method": "POST", - "path": "/mr/contact/apply_actions", + "path": "/mr/contact/modify", "body": { "org_id": 1, - "contact_id": 10000, - "actions": [ + "contact_ids": [ + 10000 + ], + "modifiers": [ { - "uuid": "8eebd020-1af5-431c-b943-aa670fc74da9", - "type": "set_contact_name", + "type": "name", "name": "Kathy" } ] }, "status": 200, "response": { - "contact": { - "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", - "id": 10000, - "name": "Kathy", - "timezone": "America/Los_Angeles", - "created_on": "2018-07-06T12:30:00.123457Z" - }, - "events": [ - { - "type": "contact_name_changed", - "created_on": "2018-07-06T12:30:12.123456789Z", - "step_uuid": "efcd6cea-84b6-425e-a07b-e063716f96dd", - "name": "Kathy" - } - ] + "10000": { + "contact": { + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "id": 10000, + "name": "Kathy", + "timezone": "America/Los_Angeles", + "created_on": "2018-07-06T12:30:00.123457Z" + }, + "events": [ + { + "type": "contact_name_changed", + "created_on": "2018-07-06T12:30:01.123456789Z", + "name": "Kathy" + } + ] + } }, "db_assertions": [ { @@ -69,34 +75,36 @@ { "label": "clear name", "method": "POST", - "path": "/mr/contact/apply_actions", + "path": "/mr/contact/modify", "body": { "org_id": 1, - "contact_id": 10000, - "actions": [ + "contact_ids": [ + 10000 + ], + "modifiers": [ { - "uuid": "8eebd020-1af5-431c-b943-aa670fc74da9", - "type": "set_contact_name", + "type": "name", "name": "" } ] }, "status": 200, "response": { - "contact": { - "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", - "id": 10000, - "timezone": "America/Los_Angeles", - "created_on": "2018-07-06T12:30:00.123457Z" - }, - "events": [ - { - "type": "contact_name_changed", - "created_on": "2018-07-06T12:30:20.123456789Z", - "step_uuid": "6ac2c4fe-4a72-4053-9a11-9b9d4f515140", - "name": "" - } - ] + "10000": { + "contact": { + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "id": 10000, + "timezone": "America/Los_Angeles", + "created_on": "2018-07-06T12:30:00.123457Z" + }, + "events": [ + { + "type": "contact_name_changed", + "created_on": "2018-07-06T12:30:02.123456789Z", + "name": "" + } + ] + } }, "db_assertions": [ { @@ -108,41 +116,15 @@ { "label": "set valid numeric field", "method": "POST", - "path": "/mr/contact/apply_actions", + "path": "/mr/contact/modify", "body": { "org_id": 1, - "contact_id": 10000, - "actions": [ + "contact_ids": [ + 10000 + ], + "modifiers": [ { - "uuid": "8eebd020-1af5-431c-b943-aa670fc74da9", - "type": "set_contact_field", - "field": { - "key": "age", - "name": "Gender" - }, - "value": "24" - } - ] - }, - "status": 200, - "response": { - "contact": { - "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", - "id": 10000, - "timezone": "America/Los_Angeles", - "created_on": "2018-07-06T12:30:00.123457Z", - "fields": { - "age": { - "text": "24", - "number": 24 - } - } - }, - "events": [ - { - "type": "contact_field_changed", - "created_on": "2018-07-06T12:30:29.123456789Z", - "step_uuid": "0a30f257-f811-4130-a78a-c04586c1e3c9", + "type": "field", "field": { "key": "age", "name": "Age" @@ -154,6 +136,37 @@ } ] }, + "status": 200, + "response": { + "10000": { + "contact": { + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "id": 10000, + "timezone": "America/Los_Angeles", + "created_on": "2018-07-06T12:30:00.123457Z", + "fields": { + "age": { + "text": "24", + "number": 24 + } + } + }, + "events": [ + { + "type": "contact_field_changed", + "created_on": "2018-07-06T12:30:03.123456789Z", + "field": { + "key": "age", + "name": "Age" + }, + "value": { + "text": "24", + "number": 24 + } + } + ] + } + }, "db_assertions": [ { "query": "SELECT count(*) FROM contacts_contact WHERE id = 10000 AND fields = '{\"903f51da-2717-47c7-a0d3-f2f32877013d\": {\"text\": \"24\", \"number\": 24}}'", @@ -164,42 +177,44 @@ { "label": "clear field", "method": "POST", - "path": "/mr/contact/apply_actions", + "path": "/mr/contact/modify", "body": { "org_id": 1, - "contact_id": 10000, - "actions": [ + "contact_ids": [ + 10000 + ], + "modifiers": [ { - "uuid": "8eebd020-1af5-431c-b943-aa670fc74da9", - "type": "set_contact_field", + "type": "field", "field": { "key": "age", - "name": "Gender" + "name": "Age" }, - "value": "" + "value": null } ] }, "status": 200, "response": { - "contact": { - "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", - "id": 10000, - "timezone": "America/Los_Angeles", - "created_on": "2018-07-06T12:30:00.123457Z" - }, - "events": [ - { - "type": "contact_field_changed", - "created_on": "2018-07-06T12:30:37.123456789Z", - "step_uuid": "5f832708-1430-4867-b5d0-6711b238001c", - "field": { - "key": "age", - "name": "Age" - }, - "value": null - } - ] + "10000": { + "contact": { + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "id": 10000, + "timezone": "America/Los_Angeles", + "created_on": "2018-07-06T12:30:00.123457Z" + }, + "events": [ + { + "type": "contact_field_changed", + "created_on": "2018-07-06T12:30:04.123456789Z", + "field": { + "key": "age", + "name": "Age" + }, + "value": null + } + ] + } }, "db_assertions": [ { @@ -211,14 +226,16 @@ { "label": "add group", "method": "POST", - "path": "/mr/contact/apply_actions", + "path": "/mr/contact/modify", "body": { "org_id": 1, - "contact_id": 10000, - "actions": [ + "contact_ids": [ + 10000 + ], + "modifiers": [ { - "uuid": "8eebd020-1af5-431c-b943-aa670fc74da9", - "type": "add_contact_groups", + "type": "groups", + "modification": "add", "groups": [ { "name": "Doctors", @@ -230,32 +247,80 @@ }, "status": 200, "response": { - "contact": { - "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", - "id": 10000, - "timezone": "America/Los_Angeles", - "created_on": "2018-07-06T12:30:00.123457Z", - "groups": [ + "10000": { + "contact": { + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "id": 10000, + "timezone": "America/Los_Angeles", + "created_on": "2018-07-06T12:30:00.123457Z", + "groups": [ + { + "uuid": "c153e265-f7c9-4539-9dbc-9b358714b638", + "name": "Doctors" + } + ] + }, + "events": [ { - "uuid": "c153e265-f7c9-4539-9dbc-9b358714b638", - "name": "Doctors" + "type": "contact_groups_changed", + "created_on": "2018-07-06T12:30:05.123456789Z", + "groups_added": [ + { + "uuid": "c153e265-f7c9-4539-9dbc-9b358714b638", + "name": "Doctors" + } + ] } ] - }, - "events": [ + } + }, + "db_assertions": [ + { + "query": "SELECT count(*) FROM contacts_contactgroup_contacts WHERE contact_id = 10000 AND contactgroup_id = 10000", + "count": 1 + } + ] + }, + { + "label": "add group contact already in", + "method": "POST", + "path": "/mr/contact/modify", + "body": { + "org_id": 1, + "contact_ids": [ + 10000 + ], + "modifiers": [ { - "type": "contact_groups_changed", - "created_on": "2018-07-06T12:30:45.123456789Z", - "step_uuid": "fdb474aa-fe8a-4d3e-8515-dd4650cf5117", - "groups_added": [ + "type": "groups", + "modification": "add", + "groups": [ { - "uuid": "c153e265-f7c9-4539-9dbc-9b358714b638", - "name": "Doctors" + "name": "Doctors", + "uuid": "c153e265-f7c9-4539-9dbc-9b358714b638" } ] } ] }, + "status": 200, + "response": { + "10000": { + "contact": { + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "id": 10000, + "timezone": "America/Los_Angeles", + "created_on": "2018-07-06T12:30:00.123457Z", + "groups": [ + { + "uuid": "c153e265-f7c9-4539-9dbc-9b358714b638", + "name": "Doctors" + } + ] + }, + "events": [] + } + }, "db_assertions": [ { "query": "SELECT count(*) FROM contacts_contactgroup_contacts WHERE contact_id = 10000 AND contactgroup_id = 10000", @@ -266,14 +331,16 @@ { "label": "remove group", "method": "POST", - "path": "/mr/contact/apply_actions", + "path": "/mr/contact/modify", "body": { "org_id": 1, - "contact_id": 10000, - "actions": [ + "contact_ids": [ + 10000 + ], + "modifiers": [ { - "uuid": "8eebd020-1af5-431c-b943-aa670fc74da9", - "type": "remove_contact_groups", + "type": "groups", + "modification": "remove", "groups": [ { "name": "Doctors", @@ -285,26 +352,68 @@ }, "status": 200, "response": { - "contact": { - "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", - "id": 10000, - "timezone": "America/Los_Angeles", - "created_on": "2018-07-06T12:30:00.123457Z" - }, - "events": [ + "10000": { + "contact": { + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "id": 10000, + "timezone": "America/Los_Angeles", + "created_on": "2018-07-06T12:30:00.123457Z" + }, + "events": [ + { + "type": "contact_groups_changed", + "created_on": "2018-07-06T12:30:06.123456789Z", + "groups_removed": [ + { + "uuid": "c153e265-f7c9-4539-9dbc-9b358714b638", + "name": "Doctors" + } + ] + } + ] + } + }, + "db_assertions": [ + { + "query": "SELECT count(*) FROM contacts_contactgroup_contacts WHERE contact_id = 10000 AND contactgroup_id = 10000", + "count": 0 + } + ] + }, + { + "label": "remove group contact isn't part of", + "method": "POST", + "path": "/mr/contact/modify", + "body": { + "org_id": 1, + "contact_ids": [ + 10000 + ], + "modifiers": [ { - "type": "contact_groups_changed", - "created_on": "2018-07-06T12:30:53.123456789Z", - "step_uuid": "190e3236-23a8-48d1-aae1-769f40a26631", - "groups_removed": [ + "type": "groups", + "modification": "remove", + "groups": [ { - "uuid": "c153e265-f7c9-4539-9dbc-9b358714b638", - "name": "Doctors" + "name": "Doctors", + "uuid": "c153e265-f7c9-4539-9dbc-9b358714b638" } ] } ] }, + "status": 200, + "response": { + "10000": { + "contact": { + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "id": 10000, + "timezone": "America/Los_Angeles", + "created_on": "2018-07-06T12:30:00.123457Z" + }, + "events": [] + } + }, "db_assertions": [ { "query": "SELECT count(*) FROM contacts_contactgroup_contacts WHERE contact_id = 10000 AND contactgroup_id = 10000", @@ -315,35 +424,37 @@ { "label": "set contact language", "method": "POST", - "path": "/mr/contact/apply_actions", + "path": "/mr/contact/modify", "body": { "org_id": 1, - "contact_id": 10000, - "actions": [ + "contact_ids": [ + 10000 + ], + "modifiers": [ { - "uuid": "8eebd020-1af5-431c-b943-aa670fc74da9", - "type": "set_contact_language", + "type": "language", "language": "fra" } ] }, "status": 200, "response": { - "contact": { - "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", - "id": 10000, - "language": "fra", - "timezone": "America/Los_Angeles", - "created_on": "2018-07-06T12:30:00.123457Z" - }, - "events": [ - { - "type": "contact_language_changed", - "created_on": "2018-07-06T12:31:01.123456789Z", - "step_uuid": "a8665c42-341d-4a4a-9aea-595ab1895f96", - "language": "fra" - } - ] + "10000": { + "contact": { + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "id": 10000, + "language": "fra", + "timezone": "America/Los_Angeles", + "created_on": "2018-07-06T12:30:00.123457Z" + }, + "events": [ + { + "type": "contact_language_changed", + "created_on": "2018-07-06T12:30:07.123456789Z", + "language": "fra" + } + ] + } }, "db_assertions": [ { @@ -355,45 +466,87 @@ { "label": "add URN", "method": "POST", - "path": "/mr/contact/apply_actions", + "path": "/mr/contact/modify", "body": { "org_id": 1, - "contact_id": 10000, - "actions": [ + "contact_ids": [ + 10000 + ], + "modifiers": [ { - "uuid": "8eebd020-1af5-431c-b943-aa670fc74da9", - "type": "add_contact_urn", - "scheme": "tel", - "path": "0788555111" + "type": "urn", + "modification": "append", + "urn": "tel:+255788555111" } ] }, "status": 200, "response": { - "contact": { - "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", - "id": 10000, - "language": "fra", - "timezone": "America/Los_Angeles", - "created_on": "2018-07-06T12:30:00.123457Z", - "urns": [ - "tel:0788555111" - ] - }, - "events": [ - { - "type": "contact_urns_changed", - "created_on": "2018-07-06T12:31:09.123456789Z", - "step_uuid": "60ab6284-71a4-4083-8383-661801bb0bfd", + "10000": { + "contact": { + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "id": 10000, + "language": "fra", + "timezone": "America/Los_Angeles", + "created_on": "2018-07-06T12:30:00.123457Z", "urns": [ - "tel:0788555111" + "tel:+255788555111" ] + }, + "events": [ + { + "type": "contact_urns_changed", + "created_on": "2018-07-06T12:30:08.123456789Z", + "urns": [ + "tel:+255788555111" + ] + } + ] + } + }, + "db_assertions": [ + { + "query": "SELECT count(*) FROM contacts_contacturn WHERE contact_id = 10000 AND path = '+255788555111'", + "count": 1 + } + ] + }, + { + "label": "add existing URN", + "method": "POST", + "path": "/mr/contact/modify", + "body": { + "org_id": 1, + "contact_ids": [ + 10000 + ], + "modifiers": [ + { + "type": "urn", + "modification": "append", + "urn": "tel:+255788555111" } ] }, + "status": 200, + "response": { + "10000": { + "contact": { + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "id": 10000, + "language": "fra", + "timezone": "America/Los_Angeles", + "created_on": "2018-07-06T12:30:00.123457Z", + "urns": [ + "tel:+255788555111?id=20121\u0026priority=1000" + ] + }, + "events": [] + } + }, "db_assertions": [ { - "query": "SELECT count(*) FROM contacts_contacturn WHERE contact_id = 10000 AND path = '0788555111'", + "query": "SELECT count(*) FROM contacts_contacturn WHERE contact_id = 10000 AND path = '+255788555111'", "count": 1 } ] diff --git a/web/surveyor/surveyor.go b/web/surveyor/surveyor.go index cd7f024e8..757f83e84 100644 --- a/web/surveyor/surveyor.go +++ b/web/surveyor/surveyor.go @@ -171,7 +171,7 @@ func handleSubmit(ctx context.Context, s *web.Server, r *http.Request) (interfac } // write our post commit hooks - err = models.ApplyPostEventHooks(ctx, tx, s.RP, org, sessions) + err = models.ApplyPostEventHooks(ctx, tx, s.RP, org, []*models.Scene{sessions[0].Scene()}) if err != nil { tx.Rollback() return nil, http.StatusInternalServerError, errors.Wrapf(err, "error applying post commit hooks")