Skip to content

Commit

Permalink
Merge pull request #620 from nyaruka/search_cleanup
Browse files Browse the repository at this point in the history
Move search into its own package and add more tests
  • Loading branch information
rowanseymour authored Apr 20, 2022
2 parents 3314713 + 241c617 commit 6f2118f
Show file tree
Hide file tree
Showing 15 changed files with 397 additions and 275 deletions.
4 changes: 4 additions & 0 deletions core/models/globals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
func TestLoadGlobals(t *testing.T) {
ctx, rt, db, _ := testsuite.Get()

defer func() {
db.MustExec(`UPDATE globals_global SET value = 'Nyaruka' WHERE org_id = $1 AND key = $2`, testdata.Org1.ID, "org_name")
}()

// set one of our global values to empty
db.MustExec(`UPDATE globals_global SET value = '' WHERE org_id = $1 AND key = $2`, testdata.Org1.ID, "org_name")

Expand Down
91 changes: 0 additions & 91 deletions core/models/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/olivere/elastic/v7"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -338,93 +337,3 @@ func AddContactsToGroupAndCampaigns(ctx context.Context, db *sqlx.DB, oa *OrgAss

return nil
}

// PopulateDynamicGroup calculates which members should be part of a group and populates the contacts
// for that group by performing the minimum number of inserts / deletes.
func PopulateDynamicGroup(ctx context.Context, db *sqlx.DB, es *elastic.Client, oa *OrgAssets, groupID GroupID, query string) (int, error) {
err := UpdateGroupStatus(ctx, db, groupID, GroupStatusEvaluating)
if err != nil {
return 0, errors.Wrapf(err, "error marking dynamic group as evaluating")
}

start := time.Now()

// we have a bit of a race with the indexer process.. we want to make sure that any contacts that changed
// before this group was updated but after the last index are included, so if a contact was modified
// more recently than 10 seconds ago, we wait that long before starting in populating our group
newest, err := GetNewestContactModifiedOn(ctx, db, oa)
if err != nil {
return 0, errors.Wrapf(err, "error getting most recent contact modified_on for org: %d", oa.OrgID())
}
if newest != nil {
n := *newest

// if it was more recent than 10 seconds ago, sleep until it has been 10 seconds
if n.Add(time.Second * 10).After(start) {
sleep := n.Add(time.Second * 10).Sub(start)
logrus.WithField("sleep", sleep).Info("sleeping before evaluating dynamic group")
time.Sleep(sleep)
}
}

// get current set of contacts in our group
ids, err := ContactIDsForGroupIDs(ctx, db, []GroupID{groupID})
if err != nil {
return 0, errors.Wrapf(err, "unable to look up contact ids for group: %d", groupID)
}
present := make(map[ContactID]bool, len(ids))
for _, i := range ids {
present[i] = true
}

// calculate new set of ids
new, err := GetContactIDsForQuery(ctx, es, oa, query, -1)
if err != nil {
return 0, errors.Wrapf(err, "error performing query: %s for group: %d", query, groupID)
}

// find which contacts need to be added or removed
adds := make([]ContactID, 0, 100)
for _, id := range new {
if !present[id] {
adds = append(adds, id)
}
delete(present, id)
}

// build our list of removals
removals := make([]ContactID, 0, len(present))
for id := range present {
removals = append(removals, id)
}

// first remove all the contacts
err = RemoveContactsFromGroupAndCampaigns(ctx, db, oa, groupID, removals)
if err != nil {
return 0, errors.Wrapf(err, "error removing contacts from group: %d", groupID)
}

// then add them all
err = AddContactsToGroupAndCampaigns(ctx, db, oa, groupID, adds)
if err != nil {
return 0, errors.Wrapf(err, "error adding contacts to group: %d", groupID)
}

// mark our group as no longer evaluating
err = UpdateGroupStatus(ctx, db, groupID, GroupStatusReady)
if err != nil {
return 0, errors.Wrapf(err, "error marking dynamic group as ready")
}

// finally update modified_on for all affected contacts to ensure these changes are seen by rp-indexer
changed := make([]ContactID, 0, len(adds))
changed = append(changed, adds...)
changed = append(changed, removals...)

err = UpdateContactModifiedOn(ctx, db, changed)
if err != nil {
return 0, errors.Wrapf(err, "error updating contact modified_on after group population")
}

return len(new), nil
}
81 changes: 0 additions & 81 deletions core/models/groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package models_test

import (
"errors"
"fmt"
"testing"

"github.com/nyaruka/gocommon/dbutil/assertdb"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/lib/pq"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -58,81 +55,3 @@ func TestLoadGroups(t *testing.T) {
assert.Equal(t, tc.query, group.Query())
}
}

func TestSmartGroups(t *testing.T) {
ctx, rt, db, _ := testsuite.Get()

defer testsuite.Reset(testsuite.ResetAll)

// insert an event on our campaign
newEvent := testdata.InsertCampaignFlowEvent(db, testdata.RemindersCampaign, testdata.Favorites, testdata.JoinedField, 1000, "W")

// clear Cathy's value
db.MustExec(
`update contacts_contact set fields = fields - $2
WHERE id = $1`, testdata.Cathy.ID, testdata.JoinedField.UUID)

// and populate Bob's
db.MustExec(
fmt.Sprintf(`update contacts_contact set fields = fields ||
'{"%s": { "text": "2029-09-15T12:00:00+00:00", "datetime": "2029-09-15T12:00:00+00:00" }}'::jsonb
WHERE id = $1`, testdata.JoinedField.UUID), testdata.Bob.ID)

oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshCampaigns|models.RefreshGroups)
assert.NoError(t, err)

mockES := testsuite.NewMockElasticServer()
defer mockES.Close()

es := mockES.Client()

mockES.AddResponse(testdata.Cathy.ID)
mockES.AddResponse(testdata.Bob.ID)
mockES.AddResponse(testdata.Bob.ID)

tcs := []struct {
Query string
ContactIDs []models.ContactID
EventContactIDs []models.ContactID
}{
{
"cathy",
[]models.ContactID{testdata.Cathy.ID},
[]models.ContactID{},
},
{
"bob",
[]models.ContactID{testdata.Bob.ID},
[]models.ContactID{testdata.Bob.ID},
},
{
"unchanged",
[]models.ContactID{testdata.Bob.ID},
[]models.ContactID{testdata.Bob.ID},
},
}

for _, tc := range tcs {
err := models.UpdateGroupStatus(ctx, db, testdata.DoctorsGroup.ID, models.GroupStatusInitializing)
assert.NoError(t, err)

count, err := models.PopulateDynamicGroup(ctx, db, es, oa, testdata.DoctorsGroup.ID, tc.Query)
assert.NoError(t, err, "error populating dynamic group for: %s", tc.Query)

assert.Equal(t, count, len(tc.ContactIDs))

// assert the current group membership
contactIDs, err := models.ContactIDsForGroupIDs(ctx, db, []models.GroupID{testdata.DoctorsGroup.ID})
assert.NoError(t, err)
assert.Equal(t, tc.ContactIDs, contactIDs)

assertdb.Query(t, db, `SELECT count(*) from contacts_contactgroup WHERE id = $1 AND status = 'R'`, testdata.DoctorsGroup.ID).
Returns(1, "wrong number of contacts in group for query: %s", tc.Query)

assertdb.Query(t, db, `SELECT count(*) from campaigns_eventfire WHERE event_id = $1`, newEvent.ID).
Returns(len(tc.EventContactIDs), "wrong number of contacts with events for query: %s", tc.Query)

assertdb.Query(t, db, `SELECT count(*) from campaigns_eventfire WHERE event_id = $1 AND contact_id = ANY($2)`, newEvent.ID, pq.Array(tc.EventContactIDs)).
Returns(len(tc.EventContactIDs), "wrong contacts with events for query: %s", tc.Query)
}
}
2 changes: 2 additions & 0 deletions core/models/http_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func TestHTTPLogs(t *testing.T) {
func TestHTTPLogger(t *testing.T) {
ctx, _, db, _ := testsuite.Get()

defer func() { db.MustExec(`DELETE FROM request_logs_httplog`) }()

defer httpx.SetRequestor(httpx.DefaultRequestor)
httpx.SetRequestor(httpx.NewMockRequestor(map[string][]httpx.MockResponse{
"https://temba.io": {
Expand Down
2 changes: 1 addition & 1 deletion core/models/imports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestContactImports(t *testing.T) {
}{}
jsonx.MustUnmarshal(testJSON, &tcs)

oa, err := models.GetOrgAssets(ctx, rt, 1)
oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshOrg|models.RefreshChannels|models.RefreshGroups)
require.NoError(t, err)

uuids.SetGenerator(uuids.NewSeededGenerator(12345))
Expand Down
102 changes: 102 additions & 0 deletions core/search/groups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package search

import (
"context"
"time"

"github.com/jmoiron/sqlx"
"github.com/nyaruka/mailroom/core/models"
"github.com/olivere/elastic/v7"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// PopulateSmartGroup calculates which members should be part of a group and populates the contacts
// for that group by performing the minimum number of inserts / deletes.
func PopulateSmartGroup(ctx context.Context, db *sqlx.DB, es *elastic.Client, oa *models.OrgAssets, groupID models.GroupID, query string) (int, error) {
err := models.UpdateGroupStatus(ctx, db, groupID, models.GroupStatusEvaluating)
if err != nil {
return 0, errors.Wrapf(err, "error marking dynamic group as evaluating")
}

start := time.Now()

// we have a bit of a race with the indexer process.. we want to make sure that any contacts that changed
// before this group was updated but after the last index are included, so if a contact was modified
// more recently than 10 seconds ago, we wait that long before starting in populating our group
newest, err := models.GetNewestContactModifiedOn(ctx, db, oa)
if err != nil {
return 0, errors.Wrapf(err, "error getting most recent contact modified_on for org: %d", oa.OrgID())
}
if newest != nil {
n := *newest

// if it was more recent than 10 seconds ago, sleep until it has been 10 seconds
if n.Add(time.Second * 10).After(start) {
sleep := n.Add(time.Second * 10).Sub(start)
logrus.WithField("sleep", sleep).Info("sleeping before evaluating dynamic group")
time.Sleep(sleep)
}
}

// get current set of contacts in our group
ids, err := models.ContactIDsForGroupIDs(ctx, db, []models.GroupID{groupID})
if err != nil {
return 0, errors.Wrapf(err, "unable to look up contact ids for group: %d", groupID)
}
present := make(map[models.ContactID]bool, len(ids))
for _, i := range ids {
present[i] = true
}

// calculate new set of ids
new, err := GetContactIDsForQuery(ctx, es, oa, query, -1)
if err != nil {
return 0, errors.Wrapf(err, "error performing query: %s for group: %d", query, groupID)
}

// find which contacts need to be added or removed
adds := make([]models.ContactID, 0, 100)
for _, id := range new {
if !present[id] {
adds = append(adds, id)
}
delete(present, id)
}

// build our list of removals
removals := make([]models.ContactID, 0, len(present))
for id := range present {
removals = append(removals, id)
}

// first remove all the contacts
err = models.RemoveContactsFromGroupAndCampaigns(ctx, db, oa, groupID, removals)
if err != nil {
return 0, errors.Wrapf(err, "error removing contacts from group: %d", groupID)
}

// then add them all
err = models.AddContactsToGroupAndCampaigns(ctx, db, oa, groupID, adds)
if err != nil {
return 0, errors.Wrapf(err, "error adding contacts to group: %d", groupID)
}

// mark our group as no longer evaluating
err = models.UpdateGroupStatus(ctx, db, groupID, models.GroupStatusReady)
if err != nil {
return 0, errors.Wrapf(err, "error marking dynamic group as ready")
}

// finally update modified_on for all affected contacts to ensure these changes are seen by rp-indexer
changed := make([]models.ContactID, 0, len(adds))
changed = append(changed, adds...)
changed = append(changed, removals...)

err = models.UpdateContactModifiedOn(ctx, db, changed)
if err != nil {
return 0, errors.Wrapf(err, "error updating contact modified_on after group population")
}

return len(new), nil
}
Loading

0 comments on commit 6f2118f

Please sign in to comment.