Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move search into its own package and add more tests #620

Merged
merged 2 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/models/globals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
func TestLoadGlobals(t *testing.T) {
ctx, rt, db, _ := testsuite.Get()

defer func() {
db.MustExec(`UPDATE globals_global SET value = 'Nyaruka' WHERE org_id = $1 AND key = $2`, testdata.Org1.ID, "org_name")
}()

// set one of our global values to empty
db.MustExec(`UPDATE globals_global SET value = '' WHERE org_id = $1 AND key = $2`, testdata.Org1.ID, "org_name")

Expand Down
91 changes: 0 additions & 91 deletions core/models/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/olivere/elastic/v7"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -338,93 +337,3 @@ func AddContactsToGroupAndCampaigns(ctx context.Context, db *sqlx.DB, oa *OrgAss

return nil
}

// PopulateDynamicGroup calculates which members should be part of a group and populates the contacts
// for that group by performing the minimum number of inserts / deletes.
func PopulateDynamicGroup(ctx context.Context, db *sqlx.DB, es *elastic.Client, oa *OrgAssets, groupID GroupID, query string) (int, error) {
err := UpdateGroupStatus(ctx, db, groupID, GroupStatusEvaluating)
if err != nil {
return 0, errors.Wrapf(err, "error marking dynamic group as evaluating")
}

start := time.Now()

// we have a bit of a race with the indexer process.. we want to make sure that any contacts that changed
// before this group was updated but after the last index are included, so if a contact was modified
// more recently than 10 seconds ago, we wait that long before starting in populating our group
newest, err := GetNewestContactModifiedOn(ctx, db, oa)
if err != nil {
return 0, errors.Wrapf(err, "error getting most recent contact modified_on for org: %d", oa.OrgID())
}
if newest != nil {
n := *newest

// if it was more recent than 10 seconds ago, sleep until it has been 10 seconds
if n.Add(time.Second * 10).After(start) {
sleep := n.Add(time.Second * 10).Sub(start)
logrus.WithField("sleep", sleep).Info("sleeping before evaluating dynamic group")
time.Sleep(sleep)
}
}

// get current set of contacts in our group
ids, err := ContactIDsForGroupIDs(ctx, db, []GroupID{groupID})
if err != nil {
return 0, errors.Wrapf(err, "unable to look up contact ids for group: %d", groupID)
}
present := make(map[ContactID]bool, len(ids))
for _, i := range ids {
present[i] = true
}

// calculate new set of ids
new, err := GetContactIDsForQuery(ctx, es, oa, query, -1)
if err != nil {
return 0, errors.Wrapf(err, "error performing query: %s for group: %d", query, groupID)
}

// find which contacts need to be added or removed
adds := make([]ContactID, 0, 100)
for _, id := range new {
if !present[id] {
adds = append(adds, id)
}
delete(present, id)
}

// build our list of removals
removals := make([]ContactID, 0, len(present))
for id := range present {
removals = append(removals, id)
}

// first remove all the contacts
err = RemoveContactsFromGroupAndCampaigns(ctx, db, oa, groupID, removals)
if err != nil {
return 0, errors.Wrapf(err, "error removing contacts from group: %d", groupID)
}

// then add them all
err = AddContactsToGroupAndCampaigns(ctx, db, oa, groupID, adds)
if err != nil {
return 0, errors.Wrapf(err, "error adding contacts to group: %d", groupID)
}

// mark our group as no longer evaluating
err = UpdateGroupStatus(ctx, db, groupID, GroupStatusReady)
if err != nil {
return 0, errors.Wrapf(err, "error marking dynamic group as ready")
}

// finally update modified_on for all affected contacts to ensure these changes are seen by rp-indexer
changed := make([]ContactID, 0, len(adds))
changed = append(changed, adds...)
changed = append(changed, removals...)

err = UpdateContactModifiedOn(ctx, db, changed)
if err != nil {
return 0, errors.Wrapf(err, "error updating contact modified_on after group population")
}

return len(new), nil
}
81 changes: 0 additions & 81 deletions core/models/groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package models_test

import (
"errors"
"fmt"
"testing"

"github.com/nyaruka/gocommon/dbutil/assertdb"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/lib/pq"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -58,81 +55,3 @@ func TestLoadGroups(t *testing.T) {
assert.Equal(t, tc.query, group.Query())
}
}

func TestSmartGroups(t *testing.T) {
ctx, rt, db, _ := testsuite.Get()

defer testsuite.Reset(testsuite.ResetAll)

// insert an event on our campaign
newEvent := testdata.InsertCampaignFlowEvent(db, testdata.RemindersCampaign, testdata.Favorites, testdata.JoinedField, 1000, "W")

// clear Cathy's value
db.MustExec(
`update contacts_contact set fields = fields - $2
WHERE id = $1`, testdata.Cathy.ID, testdata.JoinedField.UUID)

// and populate Bob's
db.MustExec(
fmt.Sprintf(`update contacts_contact set fields = fields ||
'{"%s": { "text": "2029-09-15T12:00:00+00:00", "datetime": "2029-09-15T12:00:00+00:00" }}'::jsonb
WHERE id = $1`, testdata.JoinedField.UUID), testdata.Bob.ID)

oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshCampaigns|models.RefreshGroups)
assert.NoError(t, err)

mockES := testsuite.NewMockElasticServer()
defer mockES.Close()

es := mockES.Client()

mockES.AddResponse(testdata.Cathy.ID)
mockES.AddResponse(testdata.Bob.ID)
mockES.AddResponse(testdata.Bob.ID)

tcs := []struct {
Query string
ContactIDs []models.ContactID
EventContactIDs []models.ContactID
}{
{
"cathy",
[]models.ContactID{testdata.Cathy.ID},
[]models.ContactID{},
},
{
"bob",
[]models.ContactID{testdata.Bob.ID},
[]models.ContactID{testdata.Bob.ID},
},
{
"unchanged",
[]models.ContactID{testdata.Bob.ID},
[]models.ContactID{testdata.Bob.ID},
},
}

for _, tc := range tcs {
err := models.UpdateGroupStatus(ctx, db, testdata.DoctorsGroup.ID, models.GroupStatusInitializing)
assert.NoError(t, err)

count, err := models.PopulateDynamicGroup(ctx, db, es, oa, testdata.DoctorsGroup.ID, tc.Query)
assert.NoError(t, err, "error populating dynamic group for: %s", tc.Query)

assert.Equal(t, count, len(tc.ContactIDs))

// assert the current group membership
contactIDs, err := models.ContactIDsForGroupIDs(ctx, db, []models.GroupID{testdata.DoctorsGroup.ID})
assert.NoError(t, err)
assert.Equal(t, tc.ContactIDs, contactIDs)

assertdb.Query(t, db, `SELECT count(*) from contacts_contactgroup WHERE id = $1 AND status = 'R'`, testdata.DoctorsGroup.ID).
Returns(1, "wrong number of contacts in group for query: %s", tc.Query)

assertdb.Query(t, db, `SELECT count(*) from campaigns_eventfire WHERE event_id = $1`, newEvent.ID).
Returns(len(tc.EventContactIDs), "wrong number of contacts with events for query: %s", tc.Query)

assertdb.Query(t, db, `SELECT count(*) from campaigns_eventfire WHERE event_id = $1 AND contact_id = ANY($2)`, newEvent.ID, pq.Array(tc.EventContactIDs)).
Returns(len(tc.EventContactIDs), "wrong contacts with events for query: %s", tc.Query)
}
}
2 changes: 2 additions & 0 deletions core/models/http_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func TestHTTPLogs(t *testing.T) {
func TestHTTPLogger(t *testing.T) {
ctx, _, db, _ := testsuite.Get()

defer func() { db.MustExec(`DELETE FROM request_logs_httplog`) }()

defer httpx.SetRequestor(httpx.DefaultRequestor)
httpx.SetRequestor(httpx.NewMockRequestor(map[string][]httpx.MockResponse{
"https://temba.io": {
Expand Down
2 changes: 1 addition & 1 deletion core/models/imports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestContactImports(t *testing.T) {
}{}
jsonx.MustUnmarshal(testJSON, &tcs)

oa, err := models.GetOrgAssets(ctx, rt, 1)
oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshOrg|models.RefreshChannels|models.RefreshGroups)
require.NoError(t, err)

uuids.SetGenerator(uuids.NewSeededGenerator(12345))
Expand Down
102 changes: 102 additions & 0 deletions core/search/groups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package search

import (
"context"
"time"

"github.com/jmoiron/sqlx"
"github.com/nyaruka/mailroom/core/models"
"github.com/olivere/elastic/v7"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// PopulateSmartGroup calculates which members should be part of a group and populates the contacts
// for that group by performing the minimum number of inserts / deletes.
func PopulateSmartGroup(ctx context.Context, db *sqlx.DB, es *elastic.Client, oa *models.OrgAssets, groupID models.GroupID, query string) (int, error) {
err := models.UpdateGroupStatus(ctx, db, groupID, models.GroupStatusEvaluating)
if err != nil {
return 0, errors.Wrapf(err, "error marking dynamic group as evaluating")
}

start := time.Now()

// we have a bit of a race with the indexer process.. we want to make sure that any contacts that changed
// before this group was updated but after the last index are included, so if a contact was modified
// more recently than 10 seconds ago, we wait that long before starting in populating our group
newest, err := models.GetNewestContactModifiedOn(ctx, db, oa)
if err != nil {
return 0, errors.Wrapf(err, "error getting most recent contact modified_on for org: %d", oa.OrgID())
}
if newest != nil {
n := *newest

// if it was more recent than 10 seconds ago, sleep until it has been 10 seconds
if n.Add(time.Second * 10).After(start) {
sleep := n.Add(time.Second * 10).Sub(start)
logrus.WithField("sleep", sleep).Info("sleeping before evaluating dynamic group")
time.Sleep(sleep)
}
}

// get current set of contacts in our group
ids, err := models.ContactIDsForGroupIDs(ctx, db, []models.GroupID{groupID})
if err != nil {
return 0, errors.Wrapf(err, "unable to look up contact ids for group: %d", groupID)
}
present := make(map[models.ContactID]bool, len(ids))
for _, i := range ids {
present[i] = true
}

// calculate new set of ids
new, err := GetContactIDsForQuery(ctx, es, oa, query, -1)
if err != nil {
return 0, errors.Wrapf(err, "error performing query: %s for group: %d", query, groupID)
}

// find which contacts need to be added or removed
adds := make([]models.ContactID, 0, 100)
for _, id := range new {
if !present[id] {
adds = append(adds, id)
}
delete(present, id)
}

// build our list of removals
removals := make([]models.ContactID, 0, len(present))
for id := range present {
removals = append(removals, id)
}

// first remove all the contacts
err = models.RemoveContactsFromGroupAndCampaigns(ctx, db, oa, groupID, removals)
if err != nil {
return 0, errors.Wrapf(err, "error removing contacts from group: %d", groupID)
}

// then add them all
err = models.AddContactsToGroupAndCampaigns(ctx, db, oa, groupID, adds)
if err != nil {
return 0, errors.Wrapf(err, "error adding contacts to group: %d", groupID)
}

// mark our group as no longer evaluating
err = models.UpdateGroupStatus(ctx, db, groupID, models.GroupStatusReady)
if err != nil {
return 0, errors.Wrapf(err, "error marking dynamic group as ready")
}

// finally update modified_on for all affected contacts to ensure these changes are seen by rp-indexer
changed := make([]models.ContactID, 0, len(adds))
changed = append(changed, adds...)
changed = append(changed, removals...)

err = models.UpdateContactModifiedOn(ctx, db, changed)
if err != nil {
return 0, errors.Wrapf(err, "error updating contact modified_on after group population")
}

return len(new), nil
}
Loading