Skip to content

Commit

Permalink
switch to use modifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Feb 17, 2020
1 parent 2ce43a3 commit a2eb9ed
Show file tree
Hide file tree
Showing 26 changed files with 774 additions and 580 deletions.
23 changes: 14 additions & 9 deletions hooks/airtime_transferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ type InsertAirtimeTransfersHook struct{}
var insertAirtimeTransfersHook = &InsertAirtimeTransfersHook{}

// Apply inserts all the airtime transfers that were created
func (h *InsertAirtimeTransfersHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error {
func (h *InsertAirtimeTransfersHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
// gather all our transfers
transfers := make([]*models.AirtimeTransfer, 0, len(sessions))
transfers := make([]*models.AirtimeTransfer, 0, len(scenes))

for _, ts := range sessions {
for _, ts := range scenes {
for _, t := range ts {
transfer := t.(*models.AirtimeTransfer)
transfers = append(transfers, transfer)
Expand All @@ -43,7 +43,7 @@ func (h *InsertAirtimeTransfersHook) Apply(ctx context.Context, tx *sqlx.Tx, rp
}

// gather all our logs and set the newly inserted transfer IDs on them
logs := make([]*models.HTTPLog, 0, len(sessions))
logs := make([]*models.HTTPLog, 0, len(scenes))

for _, t := range transfers {
for _, l := range t.Logs {
Expand All @@ -62,9 +62,14 @@ func (h *InsertAirtimeTransfersHook) Apply(ctx context.Context, tx *sqlx.Tx, rp
}

// handleAirtimeTransferred is called for each airtime transferred event
func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error {
func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error {
event := e.(*events.AirtimeTransferredEvent)

// must be in a session
if scene.Session() == nil {
return errors.Errorf("cannot handle airtime transferred event without session")
}

status := models.AirtimeTransferStatusSuccess
if event.ActualAmount == decimal.Zero {
status = models.AirtimeTransferStatusFailed
Expand All @@ -73,7 +78,7 @@ func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool,
transfer := models.NewAirtimeTransfer(
org.OrgID(),
status,
session.ContactID(),
scene.ContactID(),
event.Sender,
event.Recipient,
event.Currency,
Expand All @@ -83,8 +88,8 @@ func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool,
)

logrus.WithFields(logrus.Fields{
"contact_uuid": session.ContactUUID(),
"session_id": session.ID(),
"contact_uuid": scene.ContactUUID(),
"session_id": scene.ID(),
"sender": string(event.Sender),
"recipient": string(event.Recipient),
"currency": event.Currency,
Expand All @@ -105,7 +110,7 @@ func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool,
))
}

session.AddPreCommitEvent(insertAirtimeTransfersHook, transfer)
scene.AddPreCommitEvent(insertAirtimeTransfersHook, transfer)

return nil
}
29 changes: 17 additions & 12 deletions hooks/broadcast_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,23 @@ func init() {
models.RegisterEventHook(events.TypeBroadcastCreated, handleBroadcastCreated)
}

// StartBroadcastsHook is our hook for starting the broadcasts created in these sessions
// StartBroadcastsHook is our hook for starting the broadcasts created in these scene
type StartBroadcastsHook struct{}

var startBroadcastsHook = &StartBroadcastsHook{}

// Apply queues up our broadcasts for sending
func (h *StartBroadcastsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error {
func (h *StartBroadcastsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error {
rc := rp.Get()
defer rc.Close()

// for each of our sessions
for s, es := range sessions {
// for each of our scene
for s, es := range scene {
for _, e := range es {
event := e.(*events.BroadcastCreatedEvent)

// we skip over any session starts that involve groups if we are in a batch start
if len(sessions) > 1 && len(event.Groups) > 0 {
// we skip over any scene starts that involve groups if we are in a batch start
if len(scene) > 1 && len(event.Groups) > 0 {
logrus.WithField("session_id", s.ID).Error("ignoring broadcast on group in batch")
continue
}
Expand Down Expand Up @@ -62,17 +62,22 @@ func (h *StartBroadcastsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.
return nil
}

// handleBroadcastCreated is called for each broadcast created event across our sessions
func handleBroadcastCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error {
// handleBroadcastCreated is called for each broadcast created event across our scene
func handleBroadcastCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error {
// must be in a session
if scene.Session() == nil {
return errors.Errorf("cannot handle broadcast created event without session")
}

event := e.(*events.BroadcastCreatedEvent)
logrus.WithFields(logrus.Fields{
"contact_uuid": session.ContactUUID(),
"session_id": session.ID(),
"contact_uuid": scene.ContactUUID(),
"session_id": scene.ID(),
"translations": event.Translations[event.BaseLanguage],
}).Debug("broadcast created")

// schedule this for being started after our sessions are committed
session.AddPostCommitEvent(startBroadcastsHook, event)
// schedule this for being started after our scene are committed
scene.AddPostCommitEvent(startBroadcastsHook, event)

return nil
}
6 changes: 3 additions & 3 deletions hooks/campaigns.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ type UpdateCampaignEventsHook struct{}

var updateCampaignEventsHook = &UpdateCampaignEventsHook{}

// Apply will update all the campaigns for the passed in sessions, minimizing the number of queries to do so
func (h *UpdateCampaignEventsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error {
// Apply will update all the campaigns for the passed in scene, minimizing the number of queries to do so
func (h *UpdateCampaignEventsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error {
// these are all the events we need to delete unfired fires for
deletes := make([]*models.FireDelete, 0, 5)

// these are all the new events we need to insert
inserts := make([]*models.FireAdd, 0, 5)

for s, es := range sessions {
for s, es := range scene {
groupAdds := make(map[models.GroupID]bool)
groupRemoves := make(map[models.GroupID]bool)
fieldChanges := make(map[models.FieldID]bool)
Expand Down
14 changes: 7 additions & 7 deletions hooks/classifier_called.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ type InsertHTTPLogsHook struct{}
var insertHTTPLogsHook = &InsertHTTPLogsHook{}

// Apply inserts all the classifier logs that were created
func (h *InsertHTTPLogsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error {
func (h *InsertHTTPLogsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error {
// gather all our logs
logs := make([]*models.HTTPLog, 0, len(sessions))
for _, ls := range sessions {
logs := make([]*models.HTTPLog, 0, len(scene))
for _, ls := range scene {
for _, l := range ls {
logs = append(logs, l.(*models.HTTPLog))
}
Expand All @@ -42,7 +42,7 @@ func (h *InsertHTTPLogsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.P
}

// handleClassifierCalled is called for each classifier called event
func handleClassifierCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error {
func handleClassifierCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error {
event := e.(*events.ClassifierCalledEvent)

classifier := org.ClassifierByUUID(event.Classifier.UUID)
Expand All @@ -53,8 +53,8 @@ func handleClassifierCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, or
// create a log for each HTTP call
for _, httpLog := range event.HTTPLogs {
logrus.WithFields(logrus.Fields{
"contact_uuid": session.ContactUUID(),
"session_id": session.ID(),
"contact_uuid": scene.ContactUUID(),
"session_id": scene.ID(),
"url": httpLog.URL,
"status": httpLog.Status,
"elapsed_ms": httpLog.ElapsedMS,
Expand All @@ -73,7 +73,7 @@ func handleClassifierCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, or
httpLog.CreatedOn,
)

session.AddPreCommitEvent(insertHTTPLogsHook, log)
scene.AddPreCommitEvent(insertHTTPLogsHook, log)
}

return nil
Expand Down
22 changes: 11 additions & 11 deletions hooks/contact_field_changed.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ type CommitFieldChangesHook struct{}
var commitFieldChangesHook = &CommitFieldChangesHook{}

// Apply squashes and writes all the field updates for the contacts
func (h *CommitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error {
func (h *CommitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error {
// our list of updates
fieldUpdates := make([]interface{}, 0, len(sessions))
fieldUpdates := make([]interface{}, 0, len(scene))
fieldDeletes := make(map[assets.FieldUUID][]interface{})
for session, es := range sessions {
for scene, es := range scene {
updates := make(map[assets.FieldUUID]*flows.Value, len(es))
for _, e := range es {
event := e.(*events.ContactFieldChangedEvent)
Expand All @@ -37,7 +37,7 @@ func (h *CommitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *red
logrus.WithFields(logrus.Fields{
"field_key": event.Field.Key,
"field_name": event.Field.Name,
"session_id": session.ID(),
"session_id": scene.ID(),
}).Debug("unable to find field with key, ignoring")
continue
}
Expand All @@ -50,7 +50,7 @@ func (h *CommitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *red
if v == nil || v.Text.Native() == "" {
delete(updates, k)
fieldDeletes[k] = append(fieldDeletes[k], &FieldDelete{
ContactID: session.ContactID(),
ContactID: scene.ContactID(),
FieldUUID: k,
})
}
Expand All @@ -64,7 +64,7 @@ func (h *CommitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *red

// and queue them up for our update
fieldUpdates = append(fieldUpdates, &FieldUpdate{
ContactID: session.ContactID(),
ContactID: scene.ContactID(),
Updates: string(fieldJSON),
})
}
Expand All @@ -90,18 +90,18 @@ func (h *CommitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *red
}

// handleContactFieldChanged is called when a contact field changes
func handleContactFieldChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error {
func handleContactFieldChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error {
event := e.(*events.ContactFieldChangedEvent)
logrus.WithFields(logrus.Fields{
"contact_uuid": session.ContactUUID(),
"session_id": session.ID(),
"contact_uuid": scene.ContactUUID(),
"session_id": scene.ID(),
"field_key": event.Field.Key,
"value": event.Value,
}).Debug("contact field changed")

// add our callback
session.AddPreCommitEvent(commitFieldChangesHook, event)
session.AddPreCommitEvent(updateCampaignEventsHook, event)
scene.AddPreCommitEvent(commitFieldChangesHook, event)
scene.AddPreCommitEvent(updateCampaignEventsHook, event)

return nil
}
Expand Down
36 changes: 18 additions & 18 deletions hooks/contact_groups_changed.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ type CommitGroupChangesHook struct{}
var commitGroupChangesHook = &CommitGroupChangesHook{}

// Apply squashes and adds or removes all our contact groups
func (h *CommitGroupChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error {
func (h *CommitGroupChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error {
// build up our list of all adds and removes
adds := make([]*models.GroupAdd, 0, len(sessions))
removes := make([]*models.GroupRemove, 0, len(sessions))
changed := make(map[models.ContactID]bool, len(sessions))
adds := make([]*models.GroupAdd, 0, len(scene))
removes := make([]*models.GroupRemove, 0, len(scene))
changed := make(map[models.ContactID]bool, len(scene))

// we remove from our groups at once, build up our list
for _, events := range sessions {
for _, events := range scene {
// we use these sets to track what our final add or remove should be
seenAdds := make(map[models.GroupID]*models.GroupAdd)
seenRemoves := make(map[models.GroupID]*models.GroupRemove)
Expand Down Expand Up @@ -84,11 +84,11 @@ func (h *CommitGroupChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *red
}

// handleContactGroupsChanged is called when a group is added or removed from our contact
func handleContactGroupsChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error {
func handleContactGroupsChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error {
event := e.(*events.ContactGroupsChangedEvent)
logrus.WithFields(logrus.Fields{
"contact_uuid": session.ContactUUID(),
"session_id": session.ID(),
"contact_uuid": scene.ContactUUID(),
"session_id": scene.ID(),
"groups_removed": len(event.GroupsRemoved),
"groups_added": len(event.GroupsAdded),
}).Debug("changing contact groups")
Expand All @@ -99,21 +99,21 @@ func handleContactGroupsChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool
group := org.GroupByUUID(g.UUID)
if group == nil {
logrus.WithFields(logrus.Fields{
"contact_uuid": session.ContactUUID(),
"contact_uuid": scene.ContactUUID(),
"group_uuid": g.UUID,
}).Warn("unable to find group to remove, skipping")
continue
}

hookEvent := &models.GroupRemove{
ContactID: session.ContactID(),
ContactID: scene.ContactID(),
GroupID: group.ID(),
}

// add our add event
session.AddPreCommitEvent(commitGroupChangesHook, hookEvent)
session.AddPreCommitEvent(updateCampaignEventsHook, hookEvent)
session.AddPreCommitEvent(contactModifiedHook, session.Contact().ID())
scene.AddPreCommitEvent(commitGroupChangesHook, hookEvent)
scene.AddPreCommitEvent(updateCampaignEventsHook, hookEvent)
scene.AddPreCommitEvent(contactModifiedHook, scene.Contact().ID())
}

// add each of our groups
Expand All @@ -122,21 +122,21 @@ func handleContactGroupsChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool
group := org.GroupByUUID(g.UUID)
if group == nil {
logrus.WithFields(logrus.Fields{
"contact_uuid": session.ContactUUID(),
"contact_uuid": scene.ContactUUID(),
"group_uuid": g.UUID,
}).Warn("unable to find group to add, skipping")
continue
}

// add our add event
hookEvent := &models.GroupAdd{
ContactID: session.ContactID(),
ContactID: scene.ContactID(),
GroupID: group.ID(),
}

session.AddPreCommitEvent(commitGroupChangesHook, hookEvent)
session.AddPreCommitEvent(updateCampaignEventsHook, hookEvent)
session.AddPreCommitEvent(contactModifiedHook, session.Contact().ID())
scene.AddPreCommitEvent(commitGroupChangesHook, hookEvent)
scene.AddPreCommitEvent(updateCampaignEventsHook, hookEvent)
scene.AddPreCommitEvent(contactModifiedHook, scene.Contact().ID())
}

return nil
Expand Down
14 changes: 7 additions & 7 deletions hooks/contact_language_changed.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type CommitLanguageChangesHook struct{}
var commitLanguageChangesHook = &CommitLanguageChangesHook{}

// Apply applies our contact language change before our commit
func (h *CommitLanguageChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error {
func (h *CommitLanguageChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error {
// build up our list of pairs of contact id and language name
updates := make([]interface{}, 0, len(sessions))
for s, e := range sessions {
updates := make([]interface{}, 0, len(scene))
for s, e := range scene {
// we only care about the last name change
event := e[len(e)-1].(*events.ContactLanguageChangedEvent)
updates = append(updates, &languageUpdate{s.ContactID(), event.Language})
Expand All @@ -35,15 +35,15 @@ func (h *CommitLanguageChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *
}

// handleContactLanguageChanged is called when we process a contact language change
func handleContactLanguageChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, session *models.Session, e flows.Event) error {
func handleContactLanguageChanged(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene *models.Scene, e flows.Event) error {
event := e.(*events.ContactLanguageChangedEvent)
logrus.WithFields(logrus.Fields{
"contact_uuid": session.ContactUUID(),
"session_id": session.ID(),
"contact_uuid": scene.ContactUUID(),
"session_id": scene.ID(),
"language": event.Language,
}).Debug("changing contact language")

session.AddPreCommitEvent(commitLanguageChangesHook, event)
scene.AddPreCommitEvent(commitLanguageChangesHook, event)
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions hooks/contact_modified.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ type ContactModifiedHook struct{}
var contactModifiedHook = &ContactModifiedHook{}

// Apply squashes and updates modified_on on all the contacts passed in
func (h *ContactModifiedHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, sessions map[*models.Session][]interface{}) error {
func (h *ContactModifiedHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *models.OrgAssets, scene map[*models.Scene][]interface{}) error {
// our list of contact ids
contactIDs := make([]models.ContactID, 0, len(sessions))
for session := range sessions {
contactIDs = append(contactIDs, session.ContactID())
contactIDs := make([]models.ContactID, 0, len(scene))
for scene := range scene {
contactIDs = append(contactIDs, scene.ContactID())
}

err := models.UpdateContactModifiedOn(ctx, tx, contactIDs)
Expand Down
Loading

0 comments on commit a2eb9ed

Please sign in to comment.