Skip to content

Commit

Permalink
Merge pull request rapidpro#501 from nyaruka/ticket_group_fix
Browse files Browse the repository at this point in the history
Recalculate dynamic groups after closing and reopening tickets
  • Loading branch information
rowanseymour authored Sep 23, 2021
2 parents d067aa1 + 66e07f9 commit a4a5cef
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 70 deletions.
6 changes: 3 additions & 3 deletions core/models/campaigns.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,9 @@ type FireDelete struct {
EventID CampaignEventID `db:"event_id"`
}

// DeleteUnfiredContactEvents deletes all unfired event fires for the passed in contact
func DeleteUnfiredContactEvents(ctx context.Context, tx Queryer, contactID ContactID) error {
_, err := tx.ExecContext(ctx, `DELETE FROM campaigns_eventfire WHERE contact_id = $1 AND fired IS NULL`, contactID)
// DeleteUnfiredContactEvents deletes all unfired event fires for the passed in contacts
func DeleteUnfiredContactEvents(ctx context.Context, tx Queryer, contactIDs []ContactID) error {
_, err := tx.ExecContext(ctx, `DELETE FROM campaigns_eventfire WHERE contact_id = ANY($1) AND fired IS NULL`, pq.Array(contactIDs))
if err != nil {
return errors.Wrapf(err, "error deleting unfired contact events")
}
Expand Down
100 changes: 54 additions & 46 deletions core/models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func CreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, userID
return nil, nil, errors.Wrapf(err, "error creating flow contact")
}

err = CalculateDynamicGroups(ctx, db, oa, flowContact)
err = CalculateDynamicGroups(ctx, db, oa, []*flows.Contact{flowContact})
if err != nil {
return nil, nil, errors.Wrapf(err, "error calculating dynamic groups")
}
Expand Down Expand Up @@ -632,7 +632,7 @@ func GetOrCreateContact(ctx context.Context, db QueryerWithTx, oa *OrgAssets, ur

// calculate dynamic groups if contact was created
if created {
err := CalculateDynamicGroups(ctx, db, oa, flowContact)
err := CalculateDynamicGroups(ctx, db, oa, []*flows.Contact{flowContact})
if err != nil {
return nil, nil, false, errors.Wrapf(err, "error calculating dynamic groups")
}
Expand Down Expand Up @@ -918,71 +918,79 @@ func URNForID(ctx context.Context, db Queryer, org *OrgAssets, urnID URNID) (urn

// CalculateDynamicGroups recalculates all the dynamic groups for the passed in contact, recalculating
// campaigns as necessary based on those group changes.
func CalculateDynamicGroups(ctx context.Context, db Queryer, org *OrgAssets, contact *flows.Contact) error {
added, removed := contact.ReevaluateQueryBasedGroups(org.Env())

campaigns := make(map[CampaignID]*Campaign)
func CalculateDynamicGroups(ctx context.Context, db Queryer, org *OrgAssets, contacts []*flows.Contact) error {
contactIDs := make([]ContactID, len(contacts))
groupAdds := make([]*GroupAdd, 0, 2*len(contacts))
groupRemoves := make([]*GroupRemove, 0, 2*len(contacts))
checkCampaigns := make(map[*Campaign][]*flows.Contact)

for i, contact := range contacts {
contactIDs[i] = ContactID(contact.ID())
added, removed := contact.ReevaluateQueryBasedGroups(org.Env())

for _, a := range added {
group := org.GroupByUUID(a.UUID())
if group != nil {
groupAdds = append(groupAdds, &GroupAdd{
ContactID: ContactID(contact.ID()),
GroupID: group.ID(),
})
}

groupAdds := make([]*GroupAdd, 0, 1)
for _, a := range added {
group := org.GroupByUUID(a.UUID())
if group == nil {
return errors.Errorf("added to unknown group: %s", a.UUID())
// add in any campaigns we may qualify for
for _, campaign := range org.CampaignByGroupID(group.ID()) {
checkCampaigns[campaign] = append(checkCampaigns[campaign], contact)
}
}
groupAdds = append(groupAdds, &GroupAdd{
ContactID: ContactID(contact.ID()),
GroupID: group.ID(),
})

// add in any campaigns we may qualify for
for _, c := range org.CampaignByGroupID(group.ID()) {
campaigns[c.ID()] = c

for _, r := range removed {
group := org.GroupByUUID(r.UUID())
if group != nil {
groupRemoves = append(groupRemoves, &GroupRemove{
ContactID: ContactID(contact.ID()),
GroupID: group.ID(),
})
}

}
}

err := AddContactsToGroups(ctx, db, groupAdds)
if err != nil {
return errors.Wrapf(err, "error adding contact to groups")
}

groupRemoves := make([]*GroupRemove, 0, 1)
for _, r := range removed {
group := org.GroupByUUID(r.UUID())
if group == nil {
return errors.Wrapf(err, "removed from an unknown group: %s", r.UUID())
}
groupRemoves = append(groupRemoves, &GroupRemove{
ContactID: ContactID(contact.ID()),
GroupID: group.ID(),
})
}
err = RemoveContactsFromGroups(ctx, db, groupRemoves)
if err != nil {
return errors.Wrapf(err, "error removing contact from group")
}

// clear any unfired campaign events for this contact
err = DeleteUnfiredContactEvents(ctx, db, ContactID(contact.ID()))
err = DeleteUnfiredContactEvents(ctx, db, contactIDs)
if err != nil {
return errors.Wrapf(err, "error deleting unfired events for contact")
return errors.Wrapf(err, "error deleting unfired events")
}

// for each campaign figure out if we need to be added to any events
fireAdds := make([]*FireAdd, 0, 2)
fireAdds := make([]*FireAdd, 0, 2*len(contacts))
tz := org.Env().Timezone()
now := time.Now()
for _, c := range campaigns {
for _, ce := range c.Events() {
scheduled, err := ce.ScheduleForContact(tz, now, contact)
if err != nil {
return errors.Wrapf(err, "error calculating schedule for event: %d", ce.ID())
}

if scheduled != nil {
fireAdds = append(fireAdds, &FireAdd{
ContactID: ContactID(contact.ID()),
EventID: ce.ID(),
Scheduled: *scheduled,
})
for campaign, eligibleContacts := range checkCampaigns {
for _, ce := range campaign.Events() {

for _, contact := range eligibleContacts {
scheduled, err := ce.ScheduleForContact(tz, now, contact)
if err != nil {
return errors.Wrapf(err, "error calculating schedule for event: %d", ce.ID())
}

if scheduled != nil {
fireAdds = append(fireAdds, &FireAdd{
ContactID: ContactID(contact.ID()),
EventID: ce.ID(),
Scheduled: *scheduled,
})
}
}
}
}
Expand Down
42 changes: 38 additions & 4 deletions core/models/tickets.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ func CloseTickets(ctx context.Context, db Queryer, oa *OrgAssets, userID UserID,
ids := make([]TicketID, 0, len(tickets))
events := make([]*TicketEvent, 0, len(tickets))
eventsByTicket := make(map[*Ticket]*TicketEvent, len(tickets))
contactIDs := make(map[ContactID]bool, len(tickets))
now := dates.Now()

for _, ticket := range tickets {
Expand All @@ -537,6 +538,7 @@ func CloseTickets(ctx context.Context, db Queryer, oa *OrgAssets, userID UserID,
e := NewTicketClosedEvent(ticket, userID)
events = append(events, e)
eventsByTicket[ticket] = e
contactIDs[ticket.ContactID()] = true
}
}

Expand All @@ -563,11 +565,14 @@ func CloseTickets(ctx context.Context, db Queryer, oa *OrgAssets, userID UserID,
return nil, errors.Wrapf(err, "error updating tickets")
}

err = InsertTicketEvents(ctx, db, events)
if err != nil {
if err := InsertTicketEvents(ctx, db, events); err != nil {
return nil, errors.Wrapf(err, "error inserting ticket events")
}

if err := recalcGroupsForTicketChanges(ctx, db, oa, contactIDs); err != nil {
return nil, errors.Wrapf(err, "error recalculting groups")
}

return eventsByTicket, nil
}

Expand All @@ -584,11 +589,12 @@ WHERE
`

// ReopenTickets reopens the passed in tickets
func ReopenTickets(ctx context.Context, db Queryer, org *OrgAssets, userID UserID, tickets []*Ticket, externally bool, logger *HTTPLogger) (map[*Ticket]*TicketEvent, error) {
func ReopenTickets(ctx context.Context, db Queryer, oa *OrgAssets, userID UserID, tickets []*Ticket, externally bool, logger *HTTPLogger) (map[*Ticket]*TicketEvent, error) {
byTicketer := make(map[TicketerID][]*Ticket)
ids := make([]TicketID, 0, len(tickets))
events := make([]*TicketEvent, 0, len(tickets))
eventsByTicket := make(map[*Ticket]*TicketEvent, len(tickets))
contactIDs := make(map[ContactID]bool, len(tickets))
now := dates.Now()

for _, ticket := range tickets {
Expand All @@ -604,12 +610,13 @@ func ReopenTickets(ctx context.Context, db Queryer, org *OrgAssets, userID UserI
e := NewTicketReopenedEvent(ticket, userID)
events = append(events, e)
eventsByTicket[ticket] = e
contactIDs[ticket.ContactID()] = true
}
}

if externally {
for ticketerID, ticketerTickets := range byTicketer {
ticketer := org.TicketerByID(ticketerID)
ticketer := oa.TicketerByID(ticketerID)
if ticketer != nil {
service, err := ticketer.AsService(config.Mailroom, flows.NewTicketer(ticketer))
if err != nil {
Expand All @@ -635,9 +642,36 @@ func ReopenTickets(ctx context.Context, db Queryer, org *OrgAssets, userID UserI
return nil, errors.Wrapf(err, "error inserting ticket events")
}

if err := recalcGroupsForTicketChanges(ctx, db, oa, contactIDs); err != nil {
return nil, errors.Wrapf(err, "error recalculting groups")
}

return eventsByTicket, nil
}

// because groups can be based on "tickets" need to recalculate after closing/reopening tickets
func recalcGroupsForTicketChanges(ctx context.Context, db Queryer, oa *OrgAssets, contactIDs map[ContactID]bool) error {
ids := make([]ContactID, 0, len(contactIDs))
for cid := range contactIDs {
ids = append(ids, cid)
}

contacts, err := LoadContacts(ctx, db, oa, ids)
if err != nil {
return errors.Wrap(err, "error loading contacts with ticket changes")
}

flowContacts := make([]*flows.Contact, len(contacts))
for i, contact := range contacts {
flowContacts[i], err = contact.FlowContact(oa)
if err != nil {
return errors.Wrap(err, "error loading flow contact")
}
}

return CalculateDynamicGroups(ctx, db, oa, flowContacts)
}

// Ticketer is our type for a ticketer asset
type Ticketer struct {
t struct {
Expand Down
29 changes: 26 additions & 3 deletions core/models/tickets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,9 @@ func TestCloseTickets(t *testing.T) {
},
}))

oa, err := models.GetOrgAssetsWithRefresh(ctx, db, testdata.Org1.ID, models.RefreshTicketers)
testdata.InsertContactGroup(db, testdata.Org1, "94c816d7-cc87-42db-a577-ce072ceaab80", "Tickets", "tickets > 0")

oa, err := models.GetOrgAssetsWithRefresh(ctx, db, testdata.Org1.ID, models.RefreshTicketers|models.RefreshGroups)
require.NoError(t, err)

ticket1 := testdata.InsertOpenTicket(db, testdata.Org1, testdata.Cathy, testdata.Mailgun, testdata.DefaultTopic, "Where my shoes", "123", nil)
Expand All @@ -290,6 +292,14 @@ func TestCloseTickets(t *testing.T) {
ticket2 := testdata.InsertClosedTicket(db, testdata.Org1, testdata.Cathy, testdata.Zendesk, testdata.DefaultTopic, "Where my pants", "234", nil)
modelTicket2 := ticket2.Load(db)

_, cathy := testdata.Cathy.Load(db, oa)

err = models.CalculateDynamicGroups(ctx, db, oa, []*flows.Contact{cathy})
require.NoError(t, err)

assert.Equal(t, "Doctors", cathy.Groups().All()[0].Name())
assert.Equal(t, "Tickets", cathy.Groups().All()[1].Name())

logger := &models.HTTPLogger{}
evts, err := models.CloseTickets(ctx, db, oa, testdata.Admin.ID, []*models.Ticket{modelTicket1, modelTicket2}, true, false, logger)
require.NoError(t, err)
Expand All @@ -308,7 +318,12 @@ func TestCloseTickets(t *testing.T) {

testsuite.AssertQuery(t, db, `SELECT count(*) FROM request_logs_httplog WHERE ticketer_id = $1`, testdata.Mailgun.ID).Returns(1)

// but no events for ticket #2 which waas already closed
// reload Cathy and check they're no longer in the tickets group
_, cathy = testdata.Cathy.Load(db, oa)
assert.Equal(t, 1, len(cathy.Groups().All()))
assert.Equal(t, "Doctors", cathy.Groups().All()[0].Name())

// but no events for ticket #2 which was already closed
testsuite.AssertQuery(t, db, `SELECT count(*) FROM tickets_ticketevent WHERE ticket_id = $1 AND event_type = 'C'`, ticket2.ID).Returns(0)

// can close tickets without a user
Expand Down Expand Up @@ -338,7 +353,9 @@ func TestReopenTickets(t *testing.T) {
},
}))

oa, err := models.GetOrgAssetsWithRefresh(ctx, db, testdata.Org1.ID, models.RefreshTicketers)
testdata.InsertContactGroup(db, testdata.Org1, "94c816d7-cc87-42db-a577-ce072ceaab80", "Two Tickets", "tickets = 2")

oa, err := models.GetOrgAssetsWithRefresh(ctx, db, testdata.Org1.ID, models.RefreshTicketers|models.RefreshGroups)
require.NoError(t, err)

ticket1 := testdata.InsertClosedTicket(db, testdata.Org1, testdata.Cathy, testdata.Mailgun, testdata.DefaultTopic, "Where my shoes", "123", nil)
Expand Down Expand Up @@ -366,4 +383,10 @@ func TestReopenTickets(t *testing.T) {

// but no events for ticket #2 which waas already open
testsuite.AssertQuery(t, db, `SELECT count(*) FROM tickets_ticketevent WHERE ticket_id = $1 AND event_type = 'R'`, ticket2.ID).Returns(0)

// check Cathy is now in the two tickets group
_, cathy := testdata.Cathy.Load(db, oa)
assert.Equal(t, 2, len(cathy.Groups().All()))
assert.Equal(t, "Doctors", cathy.Groups().All()[0].Name())
assert.Equal(t, "Two Tickets", cathy.Groups().All()[1].Name())
}
4 changes: 2 additions & 2 deletions core/tasks/handler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func HandleChannelEvent(ctx context.Context, rt *runtime.Runtime, eventType mode
}

if event.IsNewContact() {
err = models.CalculateDynamicGroups(ctx, rt.DB, oa, contact)
err = models.CalculateDynamicGroups(ctx, rt.DB, oa, []*flows.Contact{contact})
if err != nil {
return nil, errors.Wrapf(err, "unable to initialize new contact")
}
Expand Down Expand Up @@ -546,7 +546,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e

// if this is a new contact, we need to calculate dynamic groups and campaigns
if newContact {
err = models.CalculateDynamicGroups(ctx, rt.DB, oa, contact)
err = models.CalculateDynamicGroups(ctx, rt.DB, oa, []*flows.Contact{contact})
if err != nil {
return errors.Wrapf(err, "unable to initialize new contact")
}
Expand Down
4 changes: 2 additions & 2 deletions services/tickets/mailgun/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func handleReceive(ctx context.Context, rt *runtime.Runtime, r *http.Request, l

// check if reply is actually a command
if strings.ToLower(strings.TrimSpace(request.StrippedText)) == "close" {
err = tickets.CloseTicket(ctx, rt, oa, ticket, true, l)
err = tickets.Close(ctx, rt, oa, ticket, true, l)
if err != nil {
return errors.Wrapf(err, "error closing ticket: %s", ticket.UUID()), http.StatusInternalServerError, nil
}
Expand All @@ -127,7 +127,7 @@ func handleReceive(ctx context.Context, rt *runtime.Runtime, r *http.Request, l

// reopen ticket if necessary
if ticket.Status() != models.TicketStatusOpen {
err = tickets.ReopenTicket(ctx, rt, oa, ticket, false, nil)
err = tickets.Reopen(ctx, rt, oa, ticket, false, nil)
if err != nil {
return errors.Wrapf(err, "error reopening ticket: %s", ticket.UUID()), http.StatusInternalServerError, nil
}
Expand Down
7 changes: 6 additions & 1 deletion services/tickets/rocketchat/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func handleEventCallback(ctx context.Context, rt *runtime.Runtime, r *http.Reque
return errors.Errorf("no such ticket %s", request.TicketID), http.StatusNotFound, nil
}

oa, err := models.GetOrgAssets(ctx, rt.DB, ticket.OrgID())
if err != nil {
return err, http.StatusBadRequest, nil
}

// handle event callback
switch request.Type {

Expand Down Expand Up @@ -93,7 +98,7 @@ func handleEventCallback(ctx context.Context, rt *runtime.Runtime, r *http.Reque
_, err = tickets.SendReply(ctx, rt, ticket, data.Text, files)

case "close-room":
err = tickets.CloseTicket(ctx, rt, nil, ticket, false, l)
err = tickets.Close(ctx, rt, oa, ticket, false, l)

default:
err = errors.New("invalid event type")
Expand Down
8 changes: 4 additions & 4 deletions services/tickets/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func FetchFile(url string, headers map[string]string) (*File, error) {
return &File{URL: url, ContentType: contentType, Body: io.NopCloser(bytes.NewReader(trace.ResponseBody))}, nil
}

// CloseTicket closes the given ticket, and creates and queues a closed event
func CloseTicket(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, ticket *models.Ticket, externally bool, l *models.HTTPLogger) error {
// Close closes the given ticket, and creates and queues a closed event
func Close(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, ticket *models.Ticket, externally bool, l *models.HTTPLogger) error {
events, err := models.CloseTickets(ctx, rt.DB, oa, models.NilUserID, []*models.Ticket{ticket}, externally, false, l)
if err != nil {
return errors.Wrap(err, "error closing ticket")
Expand All @@ -160,8 +160,8 @@ func CloseTicket(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets,
return nil
}

// ReopenTicket reopens the given ticket
func ReopenTicket(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, ticket *models.Ticket, externally bool, l *models.HTTPLogger) error {
// Reopen reopens the given ticket
func Reopen(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, ticket *models.Ticket, externally bool, l *models.HTTPLogger) error {
_, err := models.ReopenTickets(ctx, rt.DB, oa, models.NilUserID, []*models.Ticket{ticket}, externally, l)
return err
}
Loading

0 comments on commit a4a5cef

Please sign in to comment.