Skip to content

Commit

Permalink
Merge pull request rapidpro#614 from nyaruka/es_asset_ids
Browse files Browse the repository at this point in the history
Resolve flows and groups to ids for elastic queries
  • Loading branch information
rowanseymour authored Apr 6, 2022
2 parents 4d8b5a3 + 86f32ec commit f9ee767
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 46 deletions.
1 change: 1 addition & 0 deletions core/handlers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func createTestFlow(t *testing.T, uuid assets.FlowUUID, tc TestCase) flows.Flow
definition.NewLocalization(),
nodes,
nil,
nil,
)
require.NoError(t, err)

Expand Down
9 changes: 6 additions & 3 deletions core/models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,12 @@ func (c *Contact) UpdatePreferredURN(ctx context.Context, db Queryer, oa *OrgAss
// FlowContact converts our mailroom contact into a flow contact for use in the engine
func (c *Contact) FlowContact(oa *OrgAssets) (*flows.Contact, error) {
// convert our groups to a list of references
groups := make([]*assets.GroupReference, len(c.groups))
for i, g := range c.groups {
groups[i] = assets.NewGroupReference(g.UUID(), g.Name())
groups := make([]*assets.GroupReference, 0, len(c.groups))
for _, g := range c.groups {
// exclude the db-trigger based status groups for now
if g.Type() == GroupTypeManual || g.Type() == GroupTypeSmart {
groups = append(groups, assets.NewGroupReference(g.UUID(), g.Name()))
}
}

// convert our tickets to flow tickets
Expand Down
32 changes: 24 additions & 8 deletions core/models/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/sirupsen/logrus"
)

// GroupID is our type for group ids
type GroupID int

// GroupStatus is the current status of the passed in group
type GroupStatus string

Expand All @@ -24,16 +27,23 @@ const (
GroupStatusReady = GroupStatus("R")
)

// GroupID is our type for group ids
type GroupID int
// GroupType is the the type of a group
type GroupType string

const (
GroupTypeManual = GroupType("M")
GroupTypeSmart = GroupType("Q")
)

// Group is our mailroom type for contact groups
type Group struct {
g struct {
ID GroupID `json:"id"`
UUID assets.GroupUUID `json:"uuid"`
Name string `json:"name"`
Query string `json:"query"`
ID GroupID `json:"id"`
UUID assets.GroupUUID `json:"uuid"`
Name string `json:"name"`
Query string `json:"query"`
Status GroupStatus `json:"status"`
Type GroupType `json:"group_type"`
}
}

Expand All @@ -49,6 +59,12 @@ func (g *Group) Name() string { return g.g.Name }
// Query returns the query string (if any) for this group
func (g *Group) Query() string { return g.g.Query }

// Status returns the status of this group
func (g *Group) Status() GroupStatus { return g.g.Status }

// Type returns the type of this group
func (g *Group) Type() GroupType { return g.g.Type }

// LoadGroups loads the groups for the passed in org
func LoadGroups(ctx context.Context, db Queryer, orgID OrgID) ([]assets.Group, error) {
start := time.Now()
Expand Down Expand Up @@ -77,9 +93,9 @@ func LoadGroups(ctx context.Context, db Queryer, orgID OrgID) ([]assets.Group, e

const selectGroupsSQL = `
SELECT ROW_TO_JSON(r) FROM (
SELECT id, uuid, name, query
SELECT id, uuid, name, query, status, group_type
FROM contacts_contactgroup
WHERE org_id = $1 AND group_type IN ('M', 'Q') AND is_active = TRUE
WHERE org_id = $1 AND is_active = TRUE
ORDER BY name ASC
) r;`

Expand Down
7 changes: 6 additions & 1 deletion core/models/groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,17 @@ func TestLoadGroups(t *testing.T) {
name string
query string
}{
{testdata.ActiveGroup.ID, testdata.ActiveGroup.UUID, "Active", ""},
{testdata.ArchivedGroup.ID, testdata.ArchivedGroup.UUID, "Archived", ""},
{testdata.BlockedGroup.ID, testdata.BlockedGroup.UUID, "Blocked", ""},
{testdata.DoctorsGroup.ID, testdata.DoctorsGroup.UUID, "Doctors", ""},
{testdata.OpenTicketsGroup.ID, testdata.OpenTicketsGroup.UUID, "Open Tickets", "tickets > 0"},
{testdata.StoppedGroup.ID, testdata.StoppedGroup.UUID, "Stopped", ""},
{testdata.TestersGroup.ID, testdata.TestersGroup.UUID, "Testers", ""},
}

assert.Equal(t, 3, len(groups))
assert.Equal(t, 7, len(groups))

for i, tc := range tcs {
group := groups[i].(*models.Group)
assert.Equal(t, tc.uuid, group.UUID())
Expand Down
36 changes: 21 additions & 15 deletions core/models/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,31 @@ import (
"github.com/sirupsen/logrus"
)

// AssetMapper maps resolved assets in queries to how we identify them in ES which in the case
// of flows and groups is their ids. We can do this by just type cracking them to their models.
type AssetMapper struct{}

func (m *AssetMapper) Flow(f assets.Flow) int64 {
return int64(f.(*Flow).ID())
}

func (m *AssetMapper) Group(g assets.Group) int64 {
return int64(g.(*Group).ID())
}

var assetMapper = &AssetMapper{}

// BuildElasticQuery turns the passed in contact ql query into an elastic query
func BuildElasticQuery(oa *OrgAssets, group assets.GroupUUID, status ContactStatus, excludeIDs []ContactID, query *contactql.ContactQuery) elastic.Query {
func BuildElasticQuery(oa *OrgAssets, group *Group, status ContactStatus, excludeIDs []ContactID, query *contactql.ContactQuery) elastic.Query {
// filter by org and active contacts
eq := elastic.NewBoolQuery().Must(
elastic.NewTermQuery("org_id", oa.OrgID()),
elastic.NewTermQuery("is_active", true),
)

// our group if present
if group != "" {
eq = eq.Must(elastic.NewTermQuery("groups", group))
if group != nil {
eq = eq.Must(elastic.NewTermQuery("group_ids", group.ID()))
}

// our status is present
Expand All @@ -45,15 +59,15 @@ func BuildElasticQuery(oa *OrgAssets, group assets.GroupUUID, status ContactStat

// and by our query if present
if query != nil {
q := es.ToElasticQuery(oa.Env(), query)
q := es.ToElasticQuery(oa.Env(), assetMapper, query)
eq = eq.Must(q)
}

return eq
}

// GetContactIDsForQueryPage returns a page of contact ids for the given query and sort
func GetContactIDsForQueryPage(ctx context.Context, client *elastic.Client, oa *OrgAssets, group assets.GroupUUID, excludeIDs []ContactID, query string, sort string, offset int, pageSize int) (*contactql.ContactQuery, []ContactID, int64, error) {
func GetContactIDsForQueryPage(ctx context.Context, client *elastic.Client, oa *OrgAssets, group *Group, excludeIDs []ContactID, query string, sort string, offset int, pageSize int) (*contactql.ContactQuery, []ContactID, int64, error) {
env := oa.Env()
start := time.Now()
var parsed *contactql.ContactQuery
Expand Down Expand Up @@ -97,15 +111,7 @@ func GetContactIDsForQueryPage(ctx context.Context, client *elastic.Client, oa *
return nil, nil, 0, err
}

logrus.WithFields(logrus.Fields{
"org_id": oa.OrgID(),
"parsed": parsed,
"group_uuid": group,
"query": query,
"elapsed": time.Since(start),
"page_count": len(ids),
"total_count": results.Hits.TotalHits,
}).Debug("paged contact query complete")
logrus.WithFields(logrus.Fields{"org_id": oa.OrgID(), "query": query, "elapsed": time.Since(start), "page_count": len(ids), "total_count": results.Hits.TotalHits}).Debug("paged contact query complete")

return parsed, ids, results.Hits.TotalHits.Value, nil
}
Expand All @@ -126,7 +132,7 @@ func GetContactIDsForQuery(ctx context.Context, client *elastic.Client, oa *OrgA
}

routing := strconv.FormatInt(int64(oa.OrgID()), 10)
eq := BuildElasticQuery(oa, "", ContactStatusActive, nil, parsed)
eq := BuildElasticQuery(oa, nil, ContactStatusActive, nil, parsed)
ids := make([]ContactID, 0, 100)

// if limit provided that can be done with regular search, do that
Expand Down
16 changes: 9 additions & 7 deletions core/models/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"testing"

"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/test"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/testsuite"
Expand Down Expand Up @@ -32,7 +31,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
require.NoError(t, err)

tcs := []struct {
Group assets.GroupUUID
Group *testdata.Group
ExcludeIDs []models.ContactID
Query string
Sort string
Expand All @@ -43,7 +42,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
ExpectedError string
}{
{
Group: testdata.ActiveGroup.UUID,
Group: testdata.ActiveGroup,
Query: "george",
ExpectedESRequest: `{
"_source": false,
Expand All @@ -63,7 +62,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
},
{
"term": {
"groups": "b97f69f7-5edf-45c7-9fda-d37066eae91d"
"group_ids": 1
}
},
{
Expand Down Expand Up @@ -117,7 +116,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
ExpectedTotal: 1,
},
{
Group: testdata.BlockedGroup.UUID,
Group: testdata.BlockedGroup,
ExcludeIDs: []models.ContactID{testdata.Bob.ID, testdata.Cathy.ID},
Query: "age > 32",
Sort: "-age",
Expand All @@ -139,7 +138,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
},
{
"term": {
"groups": "14f6ea01-456b-4417-b0b8-35e942f549f1"
"group_ids": 2
}
},
{
Expand Down Expand Up @@ -229,6 +228,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
ExpectedTotal: 1,
},
{
Group: testdata.ActiveGroup,
Query: "goats > 2", // no such contact field
ExpectedError: "error parsing query: goats > 2: can't resolve 'goats' to attribute, scheme or field",
},
Expand All @@ -237,7 +237,9 @@ func TestGetContactIDsForQueryPage(t *testing.T) {
for i, tc := range tcs {
es.NextResponse = tc.MockedESResponse

_, ids, total, err := models.GetContactIDsForQueryPage(ctx, client, oa, tc.Group, tc.ExcludeIDs, tc.Query, tc.Sort, 0, 50)
group := oa.GroupByID(tc.Group.ID)

_, ids, total, err := models.GetContactIDsForQueryPage(ctx, client, oa, group, tc.ExcludeIDs, tc.Query, tc.Sort, 0, 50)

if tc.ExpectedError != "" {
assert.EqualError(t, err, tc.ExpectedError)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/lib/pq v1.10.4
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.17.1
github.com/nyaruka/goflow v0.154.0
github.com/nyaruka/goflow v0.156.1
github.com/nyaruka/librato v1.0.0
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDY
github.com/nyaruka/gocommon v1.5.3/go.mod h1:2ZeBZF9yt20IaAJ4aC1ujojAsFhJBk2IuDvSl7KuQDw=
github.com/nyaruka/gocommon v1.17.1 h1:4bbNp+0/BIbne4VDiKOxh3kcbdvEu/WsrsZiG/VyRZ8=
github.com/nyaruka/gocommon v1.17.1/go.mod h1:nmYyb7MZDM0iW4DYJKiBzfKuE9nbnx+xSHZasuIBOT0=
github.com/nyaruka/goflow v0.154.0 h1:tcUVs+sDFyjWdLvyk1kf2SOkQwSGInMbzuG+trE7ZNc=
github.com/nyaruka/goflow v0.154.0/go.mod h1:HhK+wn4aRji8qJgJR8l48hPiZxnwVDdWa0Ogy5ifnSQ=
github.com/nyaruka/goflow v0.156.1 h1:bRVNuuMkbbmkKsphyLI9+F57kVGBfFfu2rjLZ+0er/U=
github.com/nyaruka/goflow v0.156.1/go.mod h1:HhK+wn4aRji8qJgJR8l48hPiZxnwVDdWa0Ogy5ifnSQ=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down
25 changes: 21 additions & 4 deletions web/contact/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ func init() {
//
// {
// "org_id": 1,
// "group_id": 234,
// "group_uuid": "985a83fe-2e9f-478d-a3ec-fa602d5e7ddd",
// "query": "age > 10",
// "sort": "-age"
// }
//
type searchRequest struct {
OrgID models.OrgID `json:"org_id" validate:"required"`
GroupUUID assets.GroupUUID `json:"group_uuid" validate:"required"`
GroupID models.GroupID `json:"group_id"`
GroupUUID assets.GroupUUID `json:"group_uuid"`
ExcludeIDs []models.ContactID `json:"exclude_ids"`
Query string `json:"query"`
PageSize int `json:"page_size"`
Expand Down Expand Up @@ -78,9 +80,15 @@ func handleSearch(ctx context.Context, rt *runtime.Runtime, r *http.Request) (in
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets")
}

var group *models.Group
if request.GroupID != 0 {
group = oa.GroupByID(request.GroupID)
} else if request.GroupUUID != "" {
group = oa.GroupByUUID(request.GroupUUID)
}

// perform our search
parsed, hits, total, err := models.GetContactIDsForQueryPage(ctx, rt.ES, oa,
request.GroupUUID, request.ExcludeIDs, request.Query, request.Sort, request.Offset, request.PageSize)
parsed, hits, total, err := models.GetContactIDsForQueryPage(ctx, rt.ES, oa, group, request.ExcludeIDs, request.Query, request.Sort, request.Offset, request.PageSize)

if err != nil {
isQueryError, qerr := contactql.IsQueryError(err)
Expand Down Expand Up @@ -117,13 +125,15 @@ func handleSearch(ctx context.Context, rt *runtime.Runtime, r *http.Request) (in
// {
// "org_id": 1,
// "query": "age > 10",
// "group_id": 234,
// "group_uuid": "123123-123-123-"
// }
//
type parseRequest struct {
OrgID models.OrgID `json:"org_id" validate:"required"`
Query string `json:"query" validate:"required"`
ParseOnly bool `json:"parse_only"`
GroupID models.GroupID `json:"group_id"`
GroupUUID assets.GroupUUID `json:"group_uuid"`
}

Expand Down Expand Up @@ -158,6 +168,13 @@ func handleParseQuery(ctx context.Context, rt *runtime.Runtime, r *http.Request)
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets")
}

var group *models.Group
if request.GroupID != 0 {
group = oa.GroupByID(request.GroupID)
} else if request.GroupUUID != "" {
group = oa.GroupByUUID(request.GroupUUID)
}

env := oa.Env()
var resolver contactql.Resolver
if !request.ParseOnly {
Expand All @@ -179,7 +196,7 @@ func handleParseQuery(ctx context.Context, rt *runtime.Runtime, r *http.Request)

var elasticSource interface{}
if !request.ParseOnly {
eq := models.BuildElasticQuery(oa, request.GroupUUID, models.NilContactStatus, nil, parsed)
eq := models.BuildElasticQuery(oa, group, models.NilContactStatus, nil, parsed)
elasticSource, err = eq.Source()
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "error getting elastic source")
Expand Down
2 changes: 1 addition & 1 deletion web/contact/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestSearch(t *testing.T) {
},
{
"term": {
"groups": "b97f69f7-5edf-45c7-9fda-d37066eae91d"
"group_ids": 1
}
},
{
Expand Down
Loading

0 comments on commit f9ee767

Please sign in to comment.