Skip to content

Commit

Permalink
Update to latest goflow
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Jul 30, 2021
1 parent f28855d commit 4071ce0
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 65 deletions.
5 changes: 1 addition & 4 deletions core/models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,10 +917,7 @@ func URNForID(ctx context.Context, db Queryer, org *OrgAssets, urnID URNID) (urn
// CalculateDynamicGroups recalculates all the dynamic groups for the passed in contact, recalculating
// campaigns as necessary based on those group changes.
func CalculateDynamicGroups(ctx context.Context, db Queryer, org *OrgAssets, contact *flows.Contact) error {
added, removed, errs := contact.ReevaluateQueryBasedGroups(org.Env())
if len(errs) > 0 {
return errors.Wrapf(errs[0], "error calculating dynamic groups")
}
added, removed := contact.ReevaluateQueryBasedGroups(org.Env())

campaigns := make(map[CampaignID]*Campaign)

Expand Down
67 changes: 22 additions & 45 deletions core/models/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
)

// BuildElasticQuery turns the passed in contact ql query into an elastic query
func BuildElasticQuery(oa *OrgAssets, group assets.GroupUUID, status ContactStatus, excludeIDs []ContactID, query *contactql.ContactQuery) (elastic.Query, error) {
func BuildElasticQuery(org *OrgAssets, group assets.GroupUUID, status ContactStatus, excludeIDs []ContactID, query *contactql.ContactQuery) elastic.Query {
// filter by org and active contacts
eq := elastic.NewBoolQuery().Must(
elastic.NewTermQuery("org_id", oa.OrgID()),
elastic.NewTermQuery("org_id", org.OrgID()),
elastic.NewTermQuery("is_active", true),
)

Expand All @@ -45,46 +45,39 @@ func BuildElasticQuery(oa *OrgAssets, group assets.GroupUUID, status ContactStat

// and by our query if present
if query != nil {
q, err := es.ToElasticQuery(oa.Env(), oa.SessionAssets(), query)
if err != nil {
return nil, errors.Wrap(err, "error translating query to elastic")
}

q := es.ToElasticQuery(org.Env(), query)
eq = eq.Must(q)
}

return eq, nil
return eq
}

// ContactIDsForQueryPage returns the ids of the contacts for the passed in query page
func ContactIDsForQueryPage(ctx context.Context, client *elastic.Client, oa *OrgAssets, group assets.GroupUUID, excludeIDs []ContactID, query string, sort string, offset int, pageSize int) (*contactql.ContactQuery, []ContactID, int64, error) {
if client == nil {
return nil, nil, 0, errors.Errorf("no elastic client available, check your configuration")
}

func ContactIDsForQueryPage(ctx context.Context, client *elastic.Client, org *OrgAssets, group assets.GroupUUID, excludeIDs []ContactID, query string, sort string, offset int, pageSize int) (*contactql.ContactQuery, []ContactID, int64, error) {
env := org.Env()
start := time.Now()
var parsed *contactql.ContactQuery
var err error

if client == nil {
return nil, nil, 0, errors.Errorf("no elastic client available, check your configuration")
}

if query != "" {
parsed, err = parseAndValidateQuery(oa, query)
parsed, err = contactql.ParseQuery(env, query, org.SessionAssets())
if err != nil {
return nil, nil, 0, errors.Wrapf(err, "error parsing query: %s", query)
}
}

// turn into elastic query
eq, err := BuildElasticQuery(oa, group, NilContactStatus, excludeIDs, parsed)
if err != nil {
return nil, nil, 0, errors.Wrapf(err, "error building elastic query: %s", query)
}
eq := BuildElasticQuery(org, group, NilContactStatus, excludeIDs, parsed)

fieldSort, err := es.ToElasticFieldSort(sort, oa.SessionAssets())
fieldSort, err := es.ToElasticFieldSort(sort, org.SessionAssets())
if err != nil {
return nil, nil, 0, errors.Wrapf(err, "error parsing sort")
}

s := client.Search("contacts").TrackTotalHits(true).Routing(strconv.FormatInt(int64(oa.OrgID()), 10))
s := client.Search("contacts").TrackTotalHits(true).Routing(strconv.FormatInt(int64(org.OrgID()), 10))
s = s.Size(pageSize).From(offset).Query(eq).SortBy(fieldSort).FetchSource(false)

results, err := s.Do(ctx)
Expand All @@ -108,7 +101,7 @@ func ContactIDsForQueryPage(ctx context.Context, client *elastic.Client, oa *Org
}

logrus.WithFields(logrus.Fields{
"org_id": oa.OrgID(),
"org_id": org.OrgID(),
"parsed": parsed,
"group_uuid": group,
"query": query,
Expand All @@ -121,34 +114,32 @@ func ContactIDsForQueryPage(ctx context.Context, client *elastic.Client, oa *Org
}

// ContactIDsForQuery returns the ids of all the contacts that match the passed in query
func ContactIDsForQuery(ctx context.Context, client *elastic.Client, oa *OrgAssets, query string) ([]ContactID, error) {
func ContactIDsForQuery(ctx context.Context, client *elastic.Client, org *OrgAssets, query string) ([]ContactID, error) {
env := org.Env()
start := time.Now()

if client == nil {
return nil, errors.Errorf("no elastic client available, check your configuration")
}

parsed, err := parseAndValidateQuery(oa, query)
// turn into elastic query
parsed, err := contactql.ParseQuery(env, query, org.SessionAssets())
if err != nil {
return nil, errors.Wrapf(err, "error parsing query: %s", query)
}

// turn into elastic query
eq, err := BuildElasticQuery(oa, "", ContactStatusActive, nil, parsed)
if err != nil {
return nil, errors.Wrapf(err, "error building elastic query: %s", query)
}
eq := BuildElasticQuery(org, "", ContactStatusActive, nil, parsed)

ids := make([]ContactID, 0, 100)

// iterate across our results, building up our contact ids
scroll := client.Scroll("contacts").Routing(strconv.FormatInt(int64(oa.OrgID()), 10))
scroll := client.Scroll("contacts").Routing(strconv.FormatInt(int64(org.OrgID()), 10))
scroll = scroll.KeepAlive("15m").Size(10000).Query(eq).FetchSource(false)
for {
results, err := scroll.Do(ctx)
if err == io.EOF {
logrus.WithFields(logrus.Fields{
"org_id": oa.OrgID(),
"org_id": org.OrgID(),
"query": query,
"elapsed": time.Since(start),
"match_count": len(ids),
Expand All @@ -170,17 +161,3 @@ func ContactIDsForQuery(ctx context.Context, client *elastic.Client, oa *OrgAsse
}
}
}

func parseAndValidateQuery(oa *OrgAssets, query string) (*contactql.ContactQuery, error) {
parsed, err := contactql.ParseQuery(oa.Env(), query)
if err != nil {
return nil, errors.Wrapf(err, "error parsing query: %s", query)
}

err = parsed.Validate(oa.Env(), oa.SessionAssets())
if err != nil {
return nil, err
}

return parsed, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/mattn/go-sqlite3 v1.10.0 // indirect
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.13.0
github.com/nyaruka/goflow v0.129.0
github.com/nyaruka/goflow v0.130.1
github.com/nyaruka/librato v1.0.0
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0=
github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw=
github.com/nyaruka/gocommon v1.13.0 h1:WPL//ekajA30KinYRr6IrdP1igNZpcUAfABleHCuxPQ=
github.com/nyaruka/gocommon v1.13.0/go.mod h1:Jn7UIE8zwIr4JaviDf4PZrrQlN8r6QGVhOuaF/JoKus=
github.com/nyaruka/goflow v0.129.0 h1:1I6SxdDOBF9wCeAb5yfG/3xWI4IHyPZz7t/pXJrsI8Q=
github.com/nyaruka/goflow v0.129.0/go.mod h1:Xp1p21TyYiMM/fVQNWQRok/fZ1ZeNoeQGUd//LvYxq4=
github.com/nyaruka/goflow v0.130.1 h1:ai84idJkjgn2XJdv735/4qz72L0AuOYtEX8+GpL4xs0=
github.com/nyaruka/goflow v0.130.1/go.mod h1:Xp1p21TyYiMM/fVQNWQRok/fZ1ZeNoeQGUd//LvYxq4=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down
13 changes: 5 additions & 8 deletions web/contact/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,12 @@ func handleParseQuery(ctx context.Context, rt *runtime.Runtime, r *http.Request)
}

env := oa.Env()
parsed, err := contactql.ParseQuery(env, request.Query)
if err == nil && !request.ParseOnly {
err = parsed.Validate(env, oa.SessionAssets())
var resolver contactql.Resolver
if !request.ParseOnly {
resolver = oa.SessionAssets()
}

parsed, err := contactql.ParseQuery(env, request.Query, resolver)
if err != nil {
isQueryError, qerr := contactql.IsQueryError(err)
if isQueryError {
Expand All @@ -191,11 +192,7 @@ func handleParseQuery(ctx context.Context, rt *runtime.Runtime, r *http.Request)

var elasticSource interface{}
if !request.ParseOnly {
eq, err := models.BuildElasticQuery(oa, request.GroupUUID, models.NilContactStatus, nil, parsed)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "error building elastic query")
}

eq := models.BuildElasticQuery(oa, request.GroupUUID, models.NilContactStatus, nil, parsed)
elasticSource, err = eq.Source()
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "error getting elastic source")
Expand Down
2 changes: 1 addition & 1 deletion web/contact/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestSearch(t *testing.T) {
}
}

func TestParse(t *testing.T) {
func TestParseQuery(t *testing.T) {
testsuite.Reset()

web.RunWebTests(t, "testdata/parse_query.json", nil)
Expand Down
7 changes: 4 additions & 3 deletions web/contact/testdata/parse_query.json
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
"fields": [
{
"key": "age",
"name": ""
"name": "Age"
}
],
"groups": [],
Expand Down Expand Up @@ -204,7 +204,7 @@
"fields": [
{
"key": "age",
"name": ""
"name": "Age"
}
],
"groups": [],
Expand Down Expand Up @@ -252,7 +252,8 @@
"fields": [],
"groups": [
{
"name_match": "Testers"
"uuid": "5e9d8fab-5e7e-4f51-b533-261af5dea70d",
"name": "Testers"
}
],
"allow_as_group": false
Expand Down
2 changes: 1 addition & 1 deletion web/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestErrorResponse(t *testing.T) {
assert.JSONEq(t, `{"error": "I'm an error!"}`, string(er1JSON))

// create a rich error
_, err = contactql.ParseQuery(envs.NewBuilder().Build(), "$$")
_, err = contactql.ParseQuery(envs.NewBuilder().Build(), "$$", nil)

er2 := web.NewErrorResponse(err)
assert.Equal(t, "mismatched input '$' expecting {'(', TEXT, STRING}", er2.Error)
Expand Down

0 comments on commit 4071ce0

Please sign in to comment.