From 0ad3b2ee2639df8f7b3dfd53d46bcfb05733b271 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 19 Apr 2022 17:42:14 -0500 Subject: [PATCH 1/2] Add endpoint to generate a flow start preview (WIP) --- core/models/groups_test.go | 52 +----- core/models/search.go | 2 +- core/models/search_test.go | 159 ++---------------- .../contacts/populate_dynamic_group_test.go | 43 +---- core/tasks/starts/worker_test.go | 54 ++---- testsuite/elastic.go | 55 +++++- web/contact/search_test.go | 64 ++----- web/flow/start.go | 151 +++++++++++++++++ web/flow/start_test.go | 25 +++ web/flow/testdata/preview_start.json | 150 +++++++++++++++++ 10 files changed, 436 insertions(+), 319 deletions(-) create mode 100644 web/flow/start.go create mode 100644 web/flow/start_test.go create mode 100644 web/flow/testdata/preview_start.json diff --git a/core/models/groups_test.go b/core/models/groups_test.go index e881dac9c..0cadd675a 100644 --- a/core/models/groups_test.go +++ b/core/models/groups_test.go @@ -12,7 +12,6 @@ import ( "github.com/nyaruka/mailroom/testsuite/testdata" "github.com/lib/pq" - "github.com/olivere/elastic/v7" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -60,7 +59,7 @@ func TestLoadGroups(t *testing.T) { } } -func TestDynamicGroups(t *testing.T) { +func TestSmartGroups(t *testing.T) { ctx, rt, db, _ := testsuite.Get() defer testsuite.Reset(testsuite.ResetAll) @@ -82,68 +81,32 @@ func TestDynamicGroups(t *testing.T) { oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshCampaigns|models.RefreshGroups) assert.NoError(t, err) - esServer := testsuite.NewMockElasticServer() - defer esServer.Close() + mockES := testsuite.NewMockElasticServer() + defer mockES.Close() - es, err := elastic.NewClient( - elastic.SetURL(esServer.URL()), - elastic.SetHealthcheck(false), - elastic.SetSniff(false), - ) - assert.NoError(t, err) - - contactHit := `{ - "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==", - "took": 2, - "timed_out": false, - "_shards": { - "total": 1, - "successful": 1, - "skipped": 0, - "failed": 0 - }, - "hits": { - "total": 1, - "max_score": null, - "hits": [ - { - "_index": "contacts", - "_type": "_doc", - "_id": "%d", - "_score": null, - "_routing": "1", - "sort": [ - 15124352 - ] - } - ] - } - }` + es := mockES.Client() - cathyHit := fmt.Sprintf(contactHit, testdata.Cathy.ID) - bobHit := fmt.Sprintf(contactHit, testdata.Bob.ID) + mockES.AddResponse(testdata.Cathy.ID) + mockES.AddResponse(testdata.Bob.ID) + mockES.AddResponse(testdata.Bob.ID) tcs := []struct { Query string - ESResponse string ContactIDs []models.ContactID EventContactIDs []models.ContactID }{ { "cathy", - cathyHit, []models.ContactID{testdata.Cathy.ID}, []models.ContactID{}, }, { "bob", - bobHit, []models.ContactID{testdata.Bob.ID}, []models.ContactID{testdata.Bob.ID}, }, { "unchanged", - bobHit, []models.ContactID{testdata.Bob.ID}, []models.ContactID{testdata.Bob.ID}, }, @@ -153,7 +116,6 @@ func TestDynamicGroups(t *testing.T) { err := models.UpdateGroupStatus(ctx, db, testdata.DoctorsGroup.ID, models.GroupStatusInitializing) assert.NoError(t, err) - esServer.NextResponse = tc.ESResponse count, err := models.PopulateDynamicGroup(ctx, db, es, oa, testdata.DoctorsGroup.ID, tc.Query) assert.NoError(t, err, "error populating dynamic group for: %s", tc.Query) diff --git a/core/models/search.go b/core/models/search.go index c54a7b7e1..f56929a26 100644 --- a/core/models/search.go +++ b/core/models/search.go @@ -111,7 +111,7 @@ func GetContactIDsForQueryPage(ctx context.Context, client *elastic.Client, oa * return nil, nil, 0, err } - logrus.WithFields(logrus.Fields{"org_id": oa.OrgID(), "query": query, "elapsed": time.Since(start), "page_count": len(ids), "total_count": results.Hits.TotalHits}).Debug("paged contact query complete") + logrus.WithFields(logrus.Fields{"org_id": oa.OrgID(), "query": query, "elapsed": time.Since(start), "page_count": len(ids), "total_count": results.Hits.TotalHits.Value}).Debug("paged contact query complete") return parsed, ids, results.Hits.TotalHits.Value, nil } diff --git a/core/models/search_test.go b/core/models/search_test.go index 92a63320b..4ed19c040 100644 --- a/core/models/search_test.go +++ b/core/models/search_test.go @@ -1,7 +1,6 @@ package models_test import ( - "fmt" "testing" "github.com/nyaruka/goflow/test" @@ -17,15 +16,13 @@ import ( func TestGetContactIDsForQueryPage(t *testing.T) { ctx, rt, _, _ := testsuite.Get() - es := testsuite.NewMockElasticServer() - defer es.Close() + mockES := testsuite.NewMockElasticServer() + defer mockES.Close() - client, err := elastic.NewClient( - elastic.SetURL(es.URL()), - elastic.SetHealthcheck(false), - elastic.SetSniff(false), - ) - require.NoError(t, err) + mockES.AddResponse(testdata.George.ID) + mockES.AddResponse(testdata.George.ID) + + es := mockES.Client() oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID) require.NoError(t, err) @@ -36,7 +33,6 @@ func TestGetContactIDsForQueryPage(t *testing.T) { Query string Sort string ExpectedESRequest string - MockedESResponse string ExpectedContacts []models.ContactID ExpectedTotal int64 ExpectedError string @@ -85,33 +81,6 @@ func TestGetContactIDsForQueryPage(t *testing.T) { ], "track_total_hits": true }`, - MockedESResponse: fmt.Sprintf(`{ - "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==", - "took": 2, - "timed_out": false, - "_shards": { - "total": 1, - "successful": 1, - "skipped": 0, - "failed": 0 - }, - "hits": { - "total": 1, - "max_score": null, - "hits": [ - { - "_index": "contacts", - "_type": "_doc", - "_id": "%d", - "_score": null, - "_routing": "1", - "sort": [ - 15124352 - ] - } - ] - } - }`, testdata.George.ID), ExpectedContacts: []models.ContactID{testdata.George.ID}, ExpectedTotal: 1, }, @@ -197,33 +166,6 @@ func TestGetContactIDsForQueryPage(t *testing.T) { ], "track_total_hits": true }`, - MockedESResponse: fmt.Sprintf(`{ - "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==", - "took": 2, - "timed_out": false, - "_shards": { - "total": 1, - "successful": 1, - "skipped": 0, - "failed": 0 - }, - "hits": { - "total": 1, - "max_score": null, - "hits": [ - { - "_index": "contacts", - "_type": "_doc", - "_id": "%d", - "_score": null, - "_routing": "1", - "sort": [ - 15124352 - ] - } - ] - } - }`, testdata.George.ID), ExpectedContacts: []models.ContactID{testdata.George.ID}, ExpectedTotal: 1, }, @@ -235,11 +177,9 @@ func TestGetContactIDsForQueryPage(t *testing.T) { } for i, tc := range tcs { - es.NextResponse = tc.MockedESResponse - group := oa.GroupByID(tc.Group.ID) - _, ids, total, err := models.GetContactIDsForQueryPage(ctx, client, oa, group, tc.ExcludeIDs, tc.Query, tc.Sort, 0, 50) + _, ids, total, err := models.GetContactIDsForQueryPage(ctx, es, oa, group, tc.ExcludeIDs, tc.Query, tc.Sort, 0, 50) if tc.ExpectedError != "" { assert.EqualError(t, err, tc.ExpectedError) @@ -248,7 +188,7 @@ func TestGetContactIDsForQueryPage(t *testing.T) { assert.Equal(t, tc.ExpectedContacts, ids, "%d: ids mismatch", i) assert.Equal(t, tc.ExpectedTotal, total, "%d: total mismatch", i) - test.AssertEqualJSON(t, []byte(tc.ExpectedESRequest), []byte(es.LastRequestBody), "%d: ES request mismatch", i) + test.AssertEqualJSON(t, []byte(tc.ExpectedESRequest), []byte(mockES.LastRequestBody), "%d: ES request mismatch", i) } } } @@ -256,14 +196,14 @@ func TestGetContactIDsForQueryPage(t *testing.T) { func TestGetContactIDsForQuery(t *testing.T) { ctx, rt, _, _ := testsuite.Get() - es := testsuite.NewMockElasticServer() - defer es.Close() + mockES := testsuite.NewMockElasticServer() + defer mockES.Close() - client, err := elastic.NewClient( - elastic.SetURL(es.URL()), - elastic.SetHealthcheck(false), - elastic.SetSniff(false), - ) + mockES.AddResponse(testdata.George.ID) + mockES.AddResponse() + mockES.AddResponse(testdata.George.ID) + + es, err := elastic.NewClient(elastic.SetURL(mockES.URL()), elastic.SetHealthcheck(false), elastic.SetSniff(false)) require.NoError(t, err) oa, err := models.GetOrgAssets(ctx, rt, 1) @@ -314,33 +254,6 @@ func TestGetContactIDsForQuery(t *testing.T) { }, "sort":["_doc"] }`, - mockedESResponse: fmt.Sprintf(`{ - "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==", - "took": 2, - "timed_out": false, - "_shards": { - "total": 1, - "successful": 1, - "skipped": 0, - "failed": 0 - }, - "hits": { - "total": 1, - "max_score": null, - "hits": [ - { - "_index": "contacts", - "_type": "_doc", - "_id": "%d", - "_score": null, - "_routing": "1", - "sort": [ - 15124352 - ] - } - ] - } - }`, testdata.George.ID), expectedContacts: []models.ContactID{testdata.George.ID}, }, { query: "nobody", @@ -378,22 +291,6 @@ func TestGetContactIDsForQuery(t *testing.T) { }, "sort":["_doc"] }`, - mockedESResponse: `{ - "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==", - "took": 2, - "timed_out": false, - "_shards": { - "total": 1, - "successful": 1, - "skipped": 0, - "failed": 0 - }, - "hits": { - "total": 0, - "max_score": null, - "hits": [] - } - }`, expectedContacts: []models.ContactID{}, }, { @@ -433,24 +330,6 @@ func TestGetContactIDsForQuery(t *testing.T) { }, "size": 1 }`, - mockedESResponse: fmt.Sprintf(`{ - "hits": { - "total": 1, - "max_score": null, - "hits": [ - { - "_index": "contacts", - "_type": "_doc", - "_id": "%d", - "_score": null, - "_routing": "1", - "sort": [ - 15124352 - ] - } - ] - } - }`, testdata.George.ID), expectedContacts: []models.ContactID{testdata.George.ID}, }, { @@ -461,9 +340,7 @@ func TestGetContactIDsForQuery(t *testing.T) { } for i, tc := range tcs { - es.NextResponse = tc.mockedESResponse - - ids, err := models.GetContactIDsForQuery(ctx, client, oa, tc.query, tc.limit) + ids, err := models.GetContactIDsForQuery(ctx, es, oa, tc.query, tc.limit) if tc.expectedError != "" { assert.EqualError(t, err, tc.expectedError) @@ -471,8 +348,8 @@ func TestGetContactIDsForQuery(t *testing.T) { assert.NoError(t, err, "%d: error encountered performing query", i) assert.Equal(t, tc.expectedContacts, ids, "%d: ids mismatch", i) - assert.Equal(t, tc.expectedRequestURL, es.LastRequestURL, "%d: request URL mismatch", i) - test.AssertEqualJSON(t, []byte(tc.expectedRequestBody), []byte(es.LastRequestBody), "%d: request body mismatch", i) + assert.Equal(t, tc.expectedRequestURL, mockES.LastRequestURL, "%d: request URL mismatch", i) + test.AssertEqualJSON(t, []byte(tc.expectedRequestBody), []byte(mockES.LastRequestBody), "%d: request body mismatch", i) } } } diff --git a/core/tasks/contacts/populate_dynamic_group_test.go b/core/tasks/contacts/populate_dynamic_group_test.go index 5398f0167..caefa03b0 100644 --- a/core/tasks/contacts/populate_dynamic_group_test.go +++ b/core/tasks/contacts/populate_dynamic_group_test.go @@ -1,7 +1,6 @@ package contacts_test import ( - "fmt" "testing" "github.com/nyaruka/gocommon/dates" @@ -10,7 +9,6 @@ import ( "github.com/nyaruka/mailroom/testsuite" "github.com/nyaruka/mailroom/testsuite/testdata" - "github.com/olivere/elastic/v7" "github.com/stretchr/testify/require" ) @@ -19,41 +17,12 @@ func TestPopulateTask(t *testing.T) { defer testsuite.Reset(testsuite.ResetAll) - mes := testsuite.NewMockElasticServer() - defer mes.Close() - es, err := elastic.NewClient( - elastic.SetURL(mes.URL()), - elastic.SetHealthcheck(false), - elastic.SetSniff(false), - ) - require.NoError(t, err) - rt.ES = es + mockES := testsuite.NewMockElasticServer() + defer mockES.Close() + + mockES.AddResponse(testdata.Cathy.ID) - mes.NextResponse = fmt.Sprintf(`{ - "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==", - "took": 2, - "timed_out": false, - "_shards": { - "total": 1, - "successful": 1, - "skipped": 0, - "failed": 0 - }, - "hits": { - "total": 1, - "max_score": null, - "hits": [ - { - "_index": "contacts", - "_type": "_doc", - "_id": "%d", - "_score": null, - "_routing": "1", - "sort": [15124352] - } - ] - } - }`, testdata.Cathy.ID) + rt.ES = mockES.Client() group := testdata.InsertContactGroup(db, testdata.Org1, "e52fee05-2f95-4445-aef6-2fe7dac2fd56", "Women", "gender = F") start := dates.Now() @@ -62,7 +31,7 @@ func TestPopulateTask(t *testing.T) { GroupID: group.ID, Query: "gender = F", } - err = task.Perform(ctx, rt, testdata.Org1.ID) + err := task.Perform(ctx, rt, testdata.Org1.ID) require.NoError(t, err) assertdb.Query(t, db, `SELECT count(*) FROM contacts_contactgroup_contacts WHERE contactgroup_id = $1`, group.ID).Returns(1) diff --git a/core/tasks/starts/worker_test.go b/core/tasks/starts/worker_test.go index 81d3e37cd..1308353c7 100644 --- a/core/tasks/starts/worker_test.go +++ b/core/tasks/starts/worker_test.go @@ -2,7 +2,6 @@ package starts import ( "encoding/json" - "fmt" "testing" "github.com/nyaruka/gocommon/dbutil/assertdb" @@ -14,7 +13,6 @@ import ( "github.com/nyaruka/mailroom/testsuite" "github.com/nyaruka/mailroom/testsuite/testdata" - "github.com/olivere/elastic/v7" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -26,16 +24,10 @@ func TestStarts(t *testing.T) { defer testsuite.Reset(testsuite.ResetAll) - mes := testsuite.NewMockElasticServer() - defer mes.Close() + mockES := testsuite.NewMockElasticServer() + defer mockES.Close() - es, err := elastic.NewClient( - elastic.SetURL(mes.URL()), - elastic.SetHealthcheck(false), - elastic.SetSniff(false), - ) - require.NoError(t, err) - rt.ES = es + rt.ES = mockES.Client() // convert our single message flow to an actual background flow that shouldn't interrupt db.MustExec(`UPDATE flows_flow SET flow_type = 'B' WHERE id = $1`, testdata.SingleMessage.ID) @@ -54,7 +46,7 @@ func TestStarts(t *testing.T) { contactIDs []models.ContactID createContact bool query string - queryResponse string + queryResult []models.ContactID restartParticipants bool includeActive bool queue string @@ -159,36 +151,10 @@ func TestStarts(t *testing.T) { expectedActiveRuns: map[models.FlowID]int{testdata.Favorites.ID: 123, testdata.PickANumber.ID: 0, testdata.SingleMessage.ID: 0}, }, { - label: "Query start", - flowID: testdata.Favorites.ID, - query: "bob", - queryResponse: fmt.Sprintf(`{ - "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==", - "took": 2, - "timed_out": false, - "_shards": { - "total": 1, - "successful": 1, - "skipped": 0, - "failed": 0 - }, - "hits": { - "total": 1, - "max_score": null, - "hits": [ - { - "_index": "contacts", - "_type": "_doc", - "_id": "%d", - "_score": null, - "_routing": "1", - "sort": [ - 15124352 - ] - } - ] - } - }`, testdata.Bob.ID), + label: "Query start", + flowID: testdata.Favorites.ID, + query: "bob", + queryResult: []models.ContactID{testdata.Bob.ID}, restartParticipants: true, includeActive: true, queue: queue.HandlerQueue, @@ -263,7 +229,9 @@ func TestStarts(t *testing.T) { } for _, tc := range tcs { - mes.NextResponse = tc.queryResponse + if tc.queryResult != nil { + mockES.AddResponse(tc.queryResult...) + } // handle our start task start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeManual, models.FlowTypeMessaging, tc.flowID, tc.restartParticipants, tc.includeActive). diff --git a/testsuite/elastic.go b/testsuite/elastic.go index 0683fa2bc..d87e9357d 100644 --- a/testsuite/elastic.go +++ b/testsuite/elastic.go @@ -1,9 +1,14 @@ package testsuite import ( + "fmt" "io" "net/http" "net/http/httptest" + + "github.com/nyaruka/gocommon/jsonx" + "github.com/nyaruka/mailroom/core/models" + "github.com/olivere/elastic/v7" ) // MockElasticServer is a mock HTTP server/endpoint that can be used to test elastic queries @@ -11,7 +16,7 @@ type MockElasticServer struct { Server *httptest.Server LastRequestURL string LastRequestBody string - NextResponse string + Responses [][]byte } // NewMockElasticServer creates a new mock elastic server @@ -48,13 +53,24 @@ func NewMockElasticServer() *MockElasticServer { body, _ := io.ReadAll(r.Body) m.LastRequestBody = string(body) + if len(m.Responses) == 0 { + panic("mock elastic server has no more queued responses") + } + + var response []byte + response, m.Responses = m.Responses[0], m.Responses[1:] + w.WriteHeader(200) - w.Write([]byte(m.NextResponse)) - m.NextResponse = "" + w.Write(response) })) return m } +func (m *MockElasticServer) Client() *elastic.Client { + c, _ := elastic.NewClient(elastic.SetURL(m.URL()), elastic.SetHealthcheck(false), elastic.SetSniff(false)) + return c +} + // Close closes our HTTP server func (m *MockElasticServer) Close() { m.Server.Close() @@ -64,3 +80,36 @@ func (m *MockElasticServer) Close() { func (m *MockElasticServer) URL() string { return m.Server.URL } + +// AddResponse adds a mock response to the server's queue +func (m *MockElasticServer) AddResponse(ids ...models.ContactID) { + hits := make([]map[string]interface{}, len(ids)) + for i := range ids { + hits[i] = map[string]interface{}{ + "_index": "contacts", + "_type": "_doc", + "_id": fmt.Sprintf("%d", ids[i]), + "_score": nil, + "_routing": "1", + "sort": []int{15124352}, + } + } + + response := jsonx.MustMarshal(map[string]interface{}{ + "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==", + "took": 2, + "timed_out": false, + "_shards": map[string]interface{}{ + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0, + }, + "hits": map[string]interface{}{ + "total": len(ids), + "max_score": nil, + "hits": hits, + }, + }) + m.Responses = append(m.Responses, response) +} diff --git a/web/contact/search_test.go b/web/contact/search_test.go index a377ec20f..1713b0dfe 100644 --- a/web/contact/search_test.go +++ b/web/contact/search_test.go @@ -18,26 +18,18 @@ import ( "github.com/nyaruka/mailroom/testsuite/testdata" "github.com/nyaruka/mailroom/web" - "github.com/olivere/elastic/v7" "github.com/stretchr/testify/assert" ) -func TestSearch(t *testing.T) { +func TestContactSearch(t *testing.T) { ctx, rt, _, _ := testsuite.Get() wg := &sync.WaitGroup{} - es := testsuite.NewMockElasticServer() - defer es.Close() + mockES := testsuite.NewMockElasticServer() + defer mockES.Close() - client, err := elastic.NewClient( - elastic.SetURL(es.URL()), - elastic.SetHealthcheck(false), - elastic.SetSniff(false), - ) - assert.NoError(t, err) - - rt.ES = client + rt.ES = mockES.Client() server := web.NewServer(ctx, rt, wg) server.Start() @@ -47,39 +39,11 @@ func TestSearch(t *testing.T) { defer server.Stop() - singleESResponse := fmt.Sprintf(`{ - "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==", - "took": 2, - "timed_out": false, - "_shards": { - "total": 1, - "successful": 1, - "skipped": 0, - "failed": 0 - }, - "hits": { - "total": 1, - "max_score": null, - "hits": [ - { - "_index": "contacts", - "_type": "_doc", - "_id": "%d", - "_score": null, - "_routing": "1", - "sort": [ - 15124352 - ] - } - ] - } - }`, testdata.Cathy.ID) - tcs := []struct { method string url string body string - esResponse string + mockResult []models.ContactID expectedStatus int expectedError string expectedHits []models.ContactID @@ -114,7 +78,7 @@ func TestSearch(t *testing.T) { method: "POST", url: "/mr/contact/search", body: fmt.Sprintf(`{"org_id": 1, "query": "Cathy", "group_uuid": "%s"}`, testdata.ActiveGroup.UUID), - esResponse: singleESResponse, + mockResult: []models.ContactID{testdata.Cathy.ID}, expectedStatus: 200, expectedHits: []models.ContactID{testdata.Cathy.ID}, expectedQuery: `name ~ "Cathy"`, @@ -127,9 +91,9 @@ func TestSearch(t *testing.T) { method: "POST", url: "/mr/contact/search", body: fmt.Sprintf(`{"org_id": 1, "query": "Cathy", "group_uuid": "%s", "exclude_ids": [%d, %d]}`, testdata.ActiveGroup.UUID, testdata.Bob.ID, testdata.George.ID), - esResponse: singleESResponse, + mockResult: []models.ContactID{testdata.George.ID}, expectedStatus: 200, - expectedHits: []models.ContactID{testdata.Cathy.ID}, + expectedHits: []models.ContactID{testdata.George.ID}, expectedQuery: `name ~ "Cathy"`, expectedAttributes: []string{"name"}, expectedFields: []*assets.FieldReference{}, @@ -189,7 +153,7 @@ func TestSearch(t *testing.T) { method: "POST", url: "/mr/contact/search", body: fmt.Sprintf(`{"org_id": 1, "query": "AGE = 10 and gender = M", "group_uuid": "%s"}`, testdata.ActiveGroup.UUID), - esResponse: singleESResponse, + mockResult: []models.ContactID{testdata.Cathy.ID}, expectedStatus: 200, expectedHits: []models.ContactID{testdata.Cathy.ID}, expectedQuery: `age = 10 AND gender = "M"`, @@ -205,7 +169,7 @@ func TestSearch(t *testing.T) { method: "POST", url: "/mr/contact/search", body: fmt.Sprintf(`{"org_id": 1, "query": "", "group_uuid": "%s"}`, testdata.ActiveGroup.UUID), - esResponse: singleESResponse, + mockResult: []models.ContactID{testdata.Cathy.ID}, expectedStatus: 200, expectedHits: []models.ContactID{testdata.Cathy.ID}, expectedQuery: ``, @@ -217,9 +181,11 @@ func TestSearch(t *testing.T) { } for i, tc := range tcs { - var body io.Reader - es.NextResponse = tc.esResponse + if tc.mockResult != nil { + mockES.AddResponse(tc.mockResult...) + } + var body io.Reader if tc.body != "" { body = bytes.NewReader([]byte(tc.body)) } @@ -251,7 +217,7 @@ func TestSearch(t *testing.T) { } if tc.expectedESRequest != "" { - test.AssertEqualJSON(t, []byte(tc.expectedESRequest), []byte(es.LastRequestBody), "elastic request mismatch") + test.AssertEqualJSON(t, []byte(tc.expectedESRequest), []byte(mockES.LastRequestBody), "elastic request mismatch") } } else { r := &web.ErrorResponse{} diff --git a/web/flow/start.go b/web/flow/start.go new file mode 100644 index 000000000..193b13f2e --- /dev/null +++ b/web/flow/start.go @@ -0,0 +1,151 @@ +package flow + +import ( + "context" + "fmt" + "net/http" + "strings" + "time" + + "github.com/nyaruka/gocommon/dates" + "github.com/nyaruka/gocommon/urns" + "github.com/nyaruka/goflow/utils" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/runtime" + "github.com/nyaruka/mailroom/web" + "github.com/pkg/errors" +) + +func init() { + web.RegisterJSONRoute(http.MethodPost, "/mr/flow/preview_start", web.RequireAuthToken(handlePreviewStart)) +} + +// Generates a preview of which contacts will be started in the given flow. +// +// { +// "org_id": 1, +// "flow_id": 2, +// "contact_ids": [12, 34], +// "group_ids": [123, 345], +// "urns": ["tel:+1234567890"], +// "user_query": "", +// "exclusions": { +// "non_active": false, +// "in_a_flow": false, +// "started_previously": true, +// "not_seen_recently": false +// } +// } +// +// { +// "query": "(id = 12 OR id = 34 OR group = "No Age" OR group = "No Name" OR tel = "+1234567890") AND history != \"Registration\"", +// "count": 567, +// "sample": [12, 34, 56, 67, 78] +// } +// +type previewStartRequest struct { + OrgID models.OrgID `json:"org_id" validate:"required"` + FlowID models.FlowID `json:"flow_id" validate:"required"` + ContactIDs []models.ContactID `json:"contact_ids"` + GroupIDs []models.GroupID `json:"group_ids"` + URNs []urns.URN `json:"urns"` + Query string `json:"query"` + Exclusions Exclusions `json:"exclusions"` +} + +type previewStartResponse struct { + Query string `json:"query"` + Count int `json:"count"` + Sample []models.ContactID `json:"sample"` +} + +func handlePreviewStart(ctx context.Context, rt *runtime.Runtime, r *http.Request) (interface{}, int, error) { + request := &previewStartRequest{} + if err := utils.UnmarshalAndValidateWithLimit(r.Body, request, web.MaxRequestBytes); err != nil { + return errors.Wrapf(err, "request failed validation"), http.StatusBadRequest, nil + } + + oa, err := models.GetOrgAssets(ctx, rt, request.OrgID) + if err != nil { + return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets") + } + + flow, err := oa.FlowByID(request.FlowID) + if err != nil { + return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load flow") + } + + query := BuildStartQuery(oa, flow, request.Query, request.GroupIDs, request.ContactIDs, request.URNs, request.Exclusions) + if query == "" { + return &previewStartResponse{Query: "", Count: 0, Sample: []models.ContactID{}}, http.StatusOK, nil + } + + parsedQuery, sampleIDs, count, err := models.GetContactIDsForQueryPage(ctx, rt.ES, oa, nil, nil, query, "", 0, 5) + if err != nil { + return nil, http.StatusInternalServerError, errors.Wrapf(err, "error querying preview") + } + + return &previewStartResponse{Query: parsedQuery.String(), Count: int(count), Sample: sampleIDs}, http.StatusOK, nil +} + +//////////////// Query building support ////////////// + +// Exclusions are preset exclusion conditions +type Exclusions struct { + NonActive bool `json:"non_active"` // contacts who are blocked, stopped or archived + InAFlow bool `json:"in_a_flow"` // contacts who are currently in a flow (including this one) + StartedPreviously bool `json:"started_previously"` // contacts who have been in this flow in the last 90 days + NotSeenRecently bool `json:"not_seen_recently"` // contacts who have not been seen for more than 90 days +} + +// BuildStartQuery builds a start query for the given flow and start options +func BuildStartQuery(oa *models.OrgAssets, flow *models.Flow, userQuery string, groupIDs []models.GroupID, contactIDs []models.ContactID, urnz []urns.URN, excs Exclusions) string { + inclusions := make([]string, 0, 10) + if userQuery != "" { + inclusions = append(inclusions, fmt.Sprintf("(%s)", userQuery)) + } + for _, groupID := range groupIDs { + group := oa.GroupByID(groupID) + if group != nil { + inclusions = append(inclusions, fmt.Sprintf("group = \"%s\"", group.Name())) + } + } + for _, contactID := range contactIDs { + inclusions = append(inclusions, fmt.Sprintf("id = %d", contactID)) + } + for _, urn := range urnz { + scheme, path, _, _ := urn.ToParts() + inclusions = append(inclusions, fmt.Sprintf("%s = \"%s\"", scheme, path)) + } + + exclusions := make([]string, 0, 10) + if excs.NonActive { + exclusions = append(exclusions, "status = \"A\"") + } + if excs.InAFlow { + exclusions = append(exclusions, "flow = \"\"") + } + if excs.StartedPreviously { + exclusions = append(exclusions, fmt.Sprintf("history != \"%s\"", flow.Name())) + } + if excs.NotSeenRecently { + seenSince := dates.Now().Add(-time.Hour * 24 * 90) + exclusions = append(exclusions, fmt.Sprintf("last_seen_on > %s", formatQueryDate(oa, seenSince))) + } + + conditions := make([]string, 0, 10) + if len(inclusions) == 1 { + conditions = append(conditions, inclusions[0]) + } else if len(inclusions) > 1 { + conditions = append(conditions, fmt.Sprintf("(%s)", strings.Join(inclusions, " OR "))) + } + conditions = append(conditions, exclusions...) + + return strings.Join(conditions, " AND ") +} + +func formatQueryDate(oa *models.OrgAssets, t time.Time) string { + d := dates.ExtractDate(t.In(oa.Env().Timezone())) + s, _ := d.Format(string(oa.Env().DateFormat()), oa.Env().DefaultLocale().ToBCP47()) + return s +} diff --git a/web/flow/start_test.go b/web/flow/start_test.go new file mode 100644 index 000000000..6e2fac073 --- /dev/null +++ b/web/flow/start_test.go @@ -0,0 +1,25 @@ +package flow_test + +import ( + "testing" + + "github.com/nyaruka/mailroom/testsuite" + "github.com/nyaruka/mailroom/testsuite/testdata" + "github.com/nyaruka/mailroom/web" +) + +func TestPreviewStart(t *testing.T) { + ctx, rt, _, _ := testsuite.Get() + + mockES := testsuite.NewMockElasticServer() + defer mockES.Close() + + rt.ES = mockES.Client() + + mockES.AddResponse(testdata.Cathy.ID) + mockES.AddResponse(testdata.Bob.ID) + mockES.AddResponse(testdata.George.ID) + mockES.AddResponse(testdata.Alexandria.ID) + + web.RunWebTests(t, ctx, rt, "testdata/preview_start.json", nil) +} diff --git a/web/flow/testdata/preview_start.json b/web/flow/testdata/preview_start.json new file mode 100644 index 000000000..98f861d06 --- /dev/null +++ b/web/flow/testdata/preview_start.json @@ -0,0 +1,150 @@ +[ + { + "label": "illegal method", + "method": "GET", + "path": "/mr/flow/preview_start", + "status": 405, + "response": { + "error": "illegal method: GET" + } + }, + { + "label": "missing org or flow id", + "method": "POST", + "path": "/mr/flow/preview_start", + "body": {}, + "status": 400, + "response": { + "error": "request failed validation: field 'org_id' is required, field 'flow_id' is required" + } + }, + { + "label": "no inclusions or exclusions", + "method": "POST", + "path": "/mr/flow/preview_start", + "body": { + "org_id": 1, + "flow_id": 10001 + }, + "status": 200, + "response": { + "query": "", + "count": 0, + "sample": [] + } + }, + { + "label": "manual inclusions, no exclusions", + "method": "POST", + "path": "/mr/flow/preview_start", + "body": { + "org_id": 1, + "flow_id": 10001, + "group_ids": [ + 10000, + 10001 + ], + "contact_ids": [ + 1234, + 3456 + ], + "urns": [ + "tel:+1234567890", + "facebook:9876543210" + ], + "query": "" + }, + "status": 200, + "response": { + "query": "group = \"Doctors\" OR group = \"Testers\" OR id = 1234 OR id = 3456 OR tel = \"+1234567890\" OR facebook = 9876543210", + "count": 1, + "sample": [ + 10000 + ] + } + }, + { + "label": "query inclusion, no exclusions", + "method": "POST", + "path": "/mr/flow/preview_start", + "body": { + "org_id": 1, + "flow_id": 10001, + "group_ids": [], + "contact_ids": [], + "urns": [], + "query": "gender = M" + }, + "status": 200, + "response": { + "query": "gender = \"M\"", + "count": 1, + "sample": [ + 10001 + ] + } + }, + { + "label": "manual inclusions, all exclusions", + "method": "POST", + "path": "/mr/flow/preview_start", + "body": { + "org_id": 1, + "flow_id": 10001, + "group_ids": [ + 10000, + 10001 + ], + "contact_ids": [ + 1234, + 3456 + ], + "urns": [ + "tel:+1234567890", + "facebook:9876543210" + ], + "query": "", + "exclusions": { + "non_active": true, + "in_a_flow": true, + "started_previously": true, + "not_seen_recently": true + } + }, + "status": 200, + "response": { + "query": "(group = \"Doctors\" OR group = \"Testers\" OR id = 1234 OR id = 3456 OR tel = \"+1234567890\" OR facebook = 9876543210) AND status = \"A\" AND flow = \"\" AND history != \"Pick a Number\" AND last_seen_on > \"07-04-2018\"", + "count": 1, + "sample": [ + 10002 + ] + } + }, + { + "label": "query inclusion, all exclusions", + "method": "POST", + "path": "/mr/flow/preview_start", + "body": { + "org_id": 1, + "flow_id": 10001, + "group_ids": [], + "contact_ids": [], + "urns": [], + "query": "gender = M", + "exclusions": { + "non_active": true, + "in_a_flow": true, + "started_previously": true, + "not_seen_recently": true + } + }, + "status": 200, + "response": { + "query": "gender = \"M\" AND status = \"A\" AND flow = \"\" AND history != \"Pick a Number\" AND last_seen_on > \"07-04-2018\"", + "count": 1, + "sample": [ + 10003 + ] + } + } +] \ No newline at end of file From 57a1381c9cf8055ca8e16e46aa8f85c32d70bc55 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Tue, 19 Apr 2022 18:38:56 -0500 Subject: [PATCH 2/2] Update to latest goflow --- go.mod | 2 +- go.sum | 4 ++-- web/flow/start.go | 2 +- web/flow/testdata/preview_start.json | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 1e96ab1aa..0ba683103 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/lib/pq v1.10.4 github.com/nyaruka/ezconf v0.2.1 github.com/nyaruka/gocommon v1.18.0 - github.com/nyaruka/goflow v0.157.0 + github.com/nyaruka/goflow v0.158.0 github.com/nyaruka/librato v1.0.0 github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d github.com/nyaruka/null v1.2.0 diff --git a/go.sum b/go.sum index c87783649..3fe6e046f 100644 --- a/go.sum +++ b/go.sum @@ -137,8 +137,8 @@ github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDY github.com/nyaruka/gocommon v1.5.3/go.mod h1:2ZeBZF9yt20IaAJ4aC1ujojAsFhJBk2IuDvSl7KuQDw= github.com/nyaruka/gocommon v1.18.0 h1:pRSV63w449FVHrjeXJNNIyoD0k47ruqPq3fROuHBjD0= github.com/nyaruka/gocommon v1.18.0/go.mod h1:+jVWR2FB6XTqFz7fjvVIZMFwp2B6uoaCk7NVLGnfPFM= -github.com/nyaruka/goflow v0.157.0 h1:t8ilxL1Fi38Y/fiYBTB8o+jknxeRwKprgxaXkIhjfqM= -github.com/nyaruka/goflow v0.157.0/go.mod h1:J+FJ0iw1cjivEziBGpVPtTl9fuOz+ib558MCBdKLC8M= +github.com/nyaruka/goflow v0.158.0 h1:LQbZBUnW6AiAm3WB56pEEOPUfwggDxkluh5zjovfLf8= +github.com/nyaruka/goflow v0.158.0/go.mod h1:J+FJ0iw1cjivEziBGpVPtTl9fuOz+ib558MCBdKLC8M= github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0= github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg= github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc= diff --git a/web/flow/start.go b/web/flow/start.go index 193b13f2e..043a93a4c 100644 --- a/web/flow/start.go +++ b/web/flow/start.go @@ -120,7 +120,7 @@ func BuildStartQuery(oa *models.OrgAssets, flow *models.Flow, userQuery string, exclusions := make([]string, 0, 10) if excs.NonActive { - exclusions = append(exclusions, "status = \"A\"") + exclusions = append(exclusions, "status = \"active\"") } if excs.InAFlow { exclusions = append(exclusions, "flow = \"\"") diff --git a/web/flow/testdata/preview_start.json b/web/flow/testdata/preview_start.json index 98f861d06..ff5dde7ca 100644 --- a/web/flow/testdata/preview_start.json +++ b/web/flow/testdata/preview_start.json @@ -113,7 +113,7 @@ }, "status": 200, "response": { - "query": "(group = \"Doctors\" OR group = \"Testers\" OR id = 1234 OR id = 3456 OR tel = \"+1234567890\" OR facebook = 9876543210) AND status = \"A\" AND flow = \"\" AND history != \"Pick a Number\" AND last_seen_on > \"07-04-2018\"", + "query": "(group = \"Doctors\" OR group = \"Testers\" OR id = 1234 OR id = 3456 OR tel = \"+1234567890\" OR facebook = 9876543210) AND status = \"active\" AND flow = \"\" AND history != \"Pick a Number\" AND last_seen_on > \"07-04-2018\"", "count": 1, "sample": [ 10002 @@ -140,7 +140,7 @@ }, "status": 200, "response": { - "query": "gender = \"M\" AND status = \"A\" AND flow = \"\" AND history != \"Pick a Number\" AND last_seen_on > \"07-04-2018\"", + "query": "gender = \"M\" AND status = \"active\" AND flow = \"\" AND history != \"Pick a Number\" AND last_seen_on > \"07-04-2018\"", "count": 1, "sample": [ 10003