Skip to content

Commit

Permalink
add elastic tests, add query field to flowstart
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Jul 26, 2019
1 parent 358626d commit 5c286e0
Show file tree
Hide file tree
Showing 13 changed files with 256 additions and 25 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ github.com/nyaruka/gocommon v1.0.1 h1:pLKU6sK31PqmSkLiZKlap8XIqUe2JHLtteagSup3cr
github.com/nyaruka/gocommon v1.0.1/go.mod h1:QbdU2J9WBsqBmeZRuwndf2f6O7rD7mkC0bGn5UNnwjI=
github.com/nyaruka/goflow v0.43.2 h1:ZZ3F3aT9QU8aEvYdbWPdEV2S9/MedIolybLM0Avs+nw=
github.com/nyaruka/goflow v0.43.2/go.mod h1:EZtf7FNY9Gys6wpqrLWpw1Q2ob1Xh3pXFHdnnOPZPNg=
github.com/nyaruka/goflow v0.44.2 h1:+IdTxImKqaEz5wWEYRcbRVL5wgzWKnEJgk5PK4a8Tvk=
github.com/nyaruka/librato v0.0.0-20180827155909-cacc769357b8 h1:TOvxy0u6LNTWP3gwbdNVCiByXJupr9ATFdzBnBJ2TY8=
github.com/nyaruka/librato v0.0.0-20180827155909-cacc769357b8/go.mod h1:huVocfMEHkttMHD4hSr/wjWNyTx/YMzwwajVzV2bq+0=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down
2 changes: 1 addition & 1 deletion hooks/session_triggered.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (h *InsertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool
// create our start
start := models.NewFlowStart(
org.OrgID(), flow.FlowType(), flow.ID(),
groupIDs, contactIDs, event.URNs, event.CreateContact,
groupIDs, contactIDs, event.URNs, "", event.CreateContact,
true, true,
event.RunSummary, nil,
)
Expand Down
2 changes: 1 addition & 1 deletion ivr/ivr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestIVR(t *testing.T) {
db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, models.TwilioChannelID)

// create a flow start for cathy
start := models.NewFlowStart(models.Org1, models.IVRFlow, models.IVRFlowID, nil, []models.ContactID{models.CathyID}, nil, false, true, true, nil, nil)
start := models.NewFlowStart(models.Org1, models.IVRFlow, models.IVRFlowID, nil, []models.ContactID{models.CathyID}, nil, "", false, true, true, nil, nil)

// call our master starter
err := starts.CreateFlowBatches(ctx, db, rp, nil, start)
Expand Down
Binary file modified mailroom_test.dump
Binary file not shown.
4 changes: 2 additions & 2 deletions models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ func ContactIDsForQuery(ctx context.Context, client *elastic.Client, org *OrgAss
ids := make([]ContactID, 0, 100)

// iterate across our results, building up our contact ids
scroll := client.Scroll("contacts").Index("contacts").Routing(strconv.FormatInt(int64(org.OrgID()), 10))
scroll = scroll.KeepAlive("15m").Size(100000).Query(eq).FetchSource(false)
scroll := client.Scroll("contacts").Routing(strconv.FormatInt(int64(org.OrgID()), 10))
scroll = scroll.KeepAlive("15m").Size(10000).Query(eq).FetchSource(false)
for {
results, err := scroll.Do(ctx)
if err == io.EOF {
Expand Down
120 changes: 120 additions & 0 deletions models/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,131 @@ import (
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/engine"
"github.com/nyaruka/mailroom/search"
"github.com/nyaruka/mailroom/testsuite"
"github.com/olivere/elastic"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"

"fmt"
)

func TestElasticContacts(t *testing.T) {
testsuite.Reset()
ctx := testsuite.CTX()
db := testsuite.DB()

es := search.NewMockElasticServer()
defer es.Close()

client, err := elastic.NewClient(
elastic.SetURL(es.URL()),
elastic.SetHealthcheck(false),
elastic.SetSniff(false),
)
assert.NoError(t, err)

org, err := GetOrgAssets(ctx, db, 1)
assert.NoError(t, err)

tcs := []struct {
Query string
Request string
Response string
Contacts []ContactID
}{
{
Query: "george",
Request: `{
"_source":false,
"query":{
"bool":{
"must":[
{"match":{"name":{"query":"george"}}},
{"term":{"org_id":1}},
{"term":{"is_active":true}},
{"term":{"is_blocked":false}},
{"term":{"is_stopped":false}}
]
}
},
"sort":["_doc"]
}`,
Response: fmt.Sprintf(`{
"_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==",
"took": 2,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1,
"max_score": null,
"hits": [
{
"_index": "contacts",
"_type": "_doc",
"_id": "%d",
"_score": null,
"_routing": "1",
"sort": [
15124352
]
}
]
}
}`, GeorgeID),
Contacts: []ContactID{GeorgeID},
}, {
Query: "nobody",
Request: `{
"_source":false,
"query":{
"bool":{
"must":[
{"match":{"name":{"query":"nobody"}}},
{"term":{"org_id":1}},
{"term":{"is_active":true}},
{"term":{"is_blocked":false}},
{"term":{"is_stopped":false}}
]
}
},
"sort":["_doc"]
}`,
Response: `{
"_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==",
"took": 2,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 0,
"max_score": null,
"hits": []
}
}`,
Contacts: []ContactID{},
},
}

for i, tc := range tcs {
es.NextResponse = tc.Response

ids, err := ContactIDsForQuery(ctx, client, org, tc.Query)
assert.NoError(t, err, "%d: error encountered performing query", i)
assert.JSONEq(t, tc.Request, es.LastBody, "%d: request mismatch", i)
assert.Equal(t, tc.Contacts, ids, "%d: ids mismatch", i)
}
}

func TestContacts(t *testing.T) {
testsuite.Reset()
ctx := testsuite.CTX()
Expand Down
8 changes: 4 additions & 4 deletions models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type FlowStartBatch struct {
FlowID FlowID `json:"flow_id"`
FlowType FlowType `json:"flow_type"`
ContactIDs []ContactID `json:"contact_ids"`
Search string `json:"search"`

ParentSummary null.JSON `json:"parent_summary,omitempty"`
Extra null.JSON `json:"extra,omitempty"`
Expand Down Expand Up @@ -83,8 +82,8 @@ type FlowStart struct {

GroupIDs []GroupID `json:"group_ids,omitempty"`
ContactIDs []ContactID `json:"contact_ids,omitempty"`
Query string `json:"query,omitempty"`
URNs []urns.URN `json:"urns,omitempty"`
Query null.String `json:"query,omitempty" db:"query"`

CreateContact bool `json:"create_contact"`

Expand All @@ -103,8 +102,8 @@ func (s *FlowStart) FlowType() FlowType { return s.s.FlowType }
func (s *FlowStart) GroupIDs() []GroupID { return s.s.GroupIDs }
func (s *FlowStart) ContactIDs() []ContactID { return s.s.ContactIDs }
func (s *FlowStart) URNs() []urns.URN { return s.s.URNs }
func (s *FlowStart) Query() string { return string(s.s.Query) }
func (s *FlowStart) CreateContact() bool { return s.s.CreateContact }
func (s *FlowStart) Query() string { return s.s.Query }
func (s *FlowStart) RestartParticipants() bool { return s.s.RestartParticipants }
func (s *FlowStart) IncludeActive() bool { return s.s.IncludeActive }

Expand All @@ -127,7 +126,7 @@ func GetFlowStartAttributes(ctx context.Context, db Queryer, orgID OrgID, startI
// NewFlowStart creates a new flow start objects for the passed in parameters
func NewFlowStart(
orgID OrgID, flowType FlowType, flowID FlowID,
groupIDs []GroupID, contactIDs []ContactID, urns []urns.URN, createContact bool,
groupIDs []GroupID, contactIDs []ContactID, urns []urns.URN, query string, createContact bool,
restartParticipants bool, includeActive bool, parent json.RawMessage, extra json.RawMessage) *FlowStart {

s := &FlowStart{}
Expand All @@ -138,6 +137,7 @@ func NewFlowStart(
s.s.GroupIDs = groupIDs
s.s.ContactIDs = contactIDs
s.s.URNs = urns
s.s.Query = null.String(query)
s.s.CreateContact = createContact
s.s.RestartParticipants = restartParticipants
s.s.IncludeActive = includeActive
Expand Down
2 changes: 1 addition & 1 deletion runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func TriggerIVRFlow(ctx context.Context, db *sqlx.DB, rp *redis.Pool, orgID mode
tx, _ := db.BeginTxx(ctx, nil)

// create our start
start := models.NewFlowStart(orgID, models.IVRFlow, flowID, nil, contactIDs, nil, false, true, true, nil, nil)
start := models.NewFlowStart(orgID, models.IVRFlow, flowID, nil, contactIDs, nil, "", false, true, true, nil, nil)

// insert it
err := models.InsertFlowStarts(ctx, tx, []*models.FlowStart{start})
Expand Down
2 changes: 1 addition & 1 deletion runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestBatchStart(t *testing.T) {
for i, tc := range tcs {
start := models.NewFlowStart(
models.OrgID(1), models.MessagingFlow, tc.Flow,
nil, contactIDs, nil, false, tc.Restart, tc.IncludeActive,
nil, contactIDs, nil, "", false, tc.Restart, tc.IncludeActive,
nil, tc.Extra,
)
batch := start.CreateBatch(contactIDs)
Expand Down
67 changes: 67 additions & 0 deletions search/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package search

import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
)

// MockElasticServer is a mock HTTP server/endpoint that can be used to test elastic queries
type MockElasticServer struct {
Server *httptest.Server
LastBody string
NextResponse string
}

// NewMockElasticServer creates a new mock elastic server
func NewMockElasticServer() *MockElasticServer {
mock := &MockElasticServer{}
mock.Server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// scrolling of results, we are always one page, so return empty hits
if r.URL.String() == "/_search/scroll" {
w.WriteHeader(200)
w.Write([]byte(`
{
"_scroll_id": "anything==",
"took": 7,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1000,
"max_score": null,
"hits": []
}
}
`))
return
}

// otherwise read our next body and return our next response
body, _ := ioutil.ReadAll(r.Body)
mock.LastBody = string(body)

fmt.Println(r.URL)
fmt.Println(mock.LastBody)

w.WriteHeader(200)
w.Write([]byte(mock.NextResponse))
mock.NextResponse = ""
}))
return mock
}

// Close closes our HTTP server
func (m *MockElasticServer) Close() {
m.Server.Close()
}

// URL returns the URL to call this server
func (m *MockElasticServer) URL() string {
return m.Server.URL
}
4 changes: 2 additions & 2 deletions search/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/nyaruka/goflow/contactql"
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/goflow/envs"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -72,7 +72,7 @@ func TestElasticQuery(t *testing.T) {
assert.NoError(t, err)

ny, _ := time.LoadLocation("America/New_York")
env := utils.NewEnvironmentBuilder().WithTimezone(ny).Build()
env := envs.NewEnvironmentBuilder().WithTimezone(ny).Build()

for _, tc := range tcs {
registry.IsAnon = tc.IsAnon
Expand Down
Loading

0 comments on commit 5c286e0

Please sign in to comment.