Skip to content

Commit

Permalink
latest goflow, add classifier reading, updated mailroom db
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Oct 10, 2019
1 parent cca1bb5 commit f355298
Show file tree
Hide file tree
Showing 15 changed files with 223 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/mattn/go-sqlite3 v1.10.0 // indirect
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.1.1
github.com/nyaruka/goflow v0.50.4
github.com/nyaruka/goflow v0.51.0
github.com/nyaruka/librato v0.0.0-20180827155909-cacc769357b8
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ github.com/nyaruka/gocommon v1.1.1 h1:RnQ+kMzN1lA+W0NpkBDd0mGU3UqadJygR3SMpITMYT
github.com/nyaruka/gocommon v1.1.1/go.mod h1:QbdU2J9WBsqBmeZRuwndf2f6O7rD7mkC0bGn5UNnwjI=
github.com/nyaruka/goflow v0.50.4 h1:71sGdTg6Ia7T/1uPPMUVo20clyW5rA8/f0JRXjTDRP0=
github.com/nyaruka/goflow v0.50.4/go.mod h1:tPppKrlURChn6oRUpCVTzM39f6kH3exSSsFjTXzIGTw=
github.com/nyaruka/goflow v0.51.0 h1:F4aVgeo5oA2N7iC+bwtJp9vx8RMaW2zKeBIOWhfxeuw=
github.com/nyaruka/goflow v0.51.0/go.mod h1:wI+5qZRajxxZvfYXqbmWauTWAJ0GOTTG3TZDkixU/E0=
github.com/nyaruka/librato v0.0.0-20180827155909-cacc769357b8 h1:TOvxy0u6LNTWP3gwbdNVCiByXJupr9ATFdzBnBJ2TY8=
github.com/nyaruka/librato v0.0.0-20180827155909-cacc769357b8/go.mod h1:huVocfMEHkttMHD4hSr/wjWNyTx/YMzwwajVzV2bq+0=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down
6 changes: 4 additions & 2 deletions goflow/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/engine"
"github.com/nyaruka/goflow/providers/webhooks"
"github.com/nyaruka/goflow/services/webhooks"
"github.com/nyaruka/mailroom/config"
)

Expand All @@ -31,7 +31,9 @@ func Engine() flows.Engine {
engInit.Do(func() {
eng = engine.NewBuilder().
WithHTTPClient(httpClient).
WithWebhookService(webhooks.NewService("RapidProMailroom/"+config.Mailroom.Version, 10000)).
WithWebhookServiceFactory(func(flows.Session) flows.WebhookService {
return webhooks.NewService("RapidProMailroom/"+config.Mailroom.Version, 10000)
}).
WithMaxStepsPerSprint(config.Mailroom.MaxStepsPerSprint).
Build()
})
Expand Down
Binary file modified mailroom_test.dump
Binary file not shown.
21 changes: 21 additions & 0 deletions models/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type OrgAssets struct {
channelsByID map[ChannelID]*Channel
channelsByUUID map[assets.ChannelUUID]*Channel

classifiers []assets.Classifier
classifiersByUUID map[assets.ClassifierUUID]*Classifier

campaigns []*Campaign
campaignEventsByField map[FieldID][]*CampaignEvent
campaignEventsByID map[CampaignEventID]*CampaignEvent
Expand Down Expand Up @@ -87,6 +90,8 @@ func NewOrgAssets(ctx context.Context, db *sqlx.DB, orgID OrgID, prev *OrgAssets
channelsByID: make(map[ChannelID]*Channel),
channelsByUUID: make(map[assets.ChannelUUID]*Channel),

classifiersByUUID: make(map[assets.ClassifierUUID]*Classifier),

fieldsByUUID: make(map[assets.FieldUUID]*Field),
fieldsByKey: make(map[string]*Field),

Expand Down Expand Up @@ -121,6 +126,14 @@ func NewOrgAssets(ctx context.Context, db *sqlx.DB, orgID OrgID, prev *OrgAssets
o.channelsByUUID[channel.UUID()] = channel
}

o.classifiers, err = loadClassifiers(ctx, db, orgID)
if err != nil {
return nil, errors.Wrapf(err, "error loading classifier assets for org %d", orgID)
}
for _, c := range o.classifiers {
o.classifiersByUUID[c.UUID()] = c.(*Classifier)
}

o.fields, err = loadFields(ctx, db, orgID)
if err != nil {
return nil, errors.Wrapf(err, "error loading field assets for org %d", orgID)
Expand Down Expand Up @@ -273,6 +286,14 @@ func (a *OrgAssets) AddTestChannel(channel assets.Channel) {
// we don't populate our maps for uuid or id, shouldn't be used in any hook anyways
}

func (a *OrgAssets) Classifiers() ([]assets.Classifier, error) {
return a.classifiers, nil
}

func (a *OrgAssets) ClassifierByUUID(classifierUUID assets.ClassifierUUID) *Classifier {
return a.classifiersByUUID[classifierUUID]
}

func (a *OrgAssets) Fields() ([]assets.Field, error) {
return a.fields, nil
}
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions models/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (

type ChannelID null.Int

var NilChannelID = ChannelID(0)

type ChannelType string

const (
NilChannelID = ChannelID(0)

ChannelTypeAndroid = ChannelType("A")

ChannelConfigCallbackDomain = "callback_domain"
Expand Down
132 changes: 132 additions & 0 deletions models/classifiers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package models

import (
"context"
"database/sql/driver"
"time"

"github.com/jmoiron/sqlx"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/null"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// ClassifierID is our type for classifier ids
type ClassifierID null.Int

// NilClassifierID is our const for a nil classifier ID
const NilClassifierID = ClassifierID(0)

// Classifier is our type for a Classifier
type Classifier struct {
c struct {
ID ClassifierID `json:"id"`
UUID assets.ClassifierUUID `json:"uuid"`
Type string `json:"classifier_type"`
Name string `json:"name"`
Config map[string]interface{} `json:"config"`
Intents []struct {
Name string `json:"name"`
ExternalID string `json:"external_id"`
} `json:"intents"`

intentNames []string
}
}

// ID returns the ID of this classifier
func (c *Classifier) ID() ClassifierID { return c.c.ID }

// UUID returns our UUID
func (c *Classifier) UUID() assets.ClassifierUUID { return c.c.UUID }

// Name return our Name
func (c *Classifier) Name() string { return c.c.Name }

// Intents returns a list of our intent names
func (c *Classifier) Intents() []string { return c.c.intentNames }

// Type returns the type of this classifier
func (c *Classifier) Type() string { return c.c.Type }

// loadClassifiers loads all the classifiers for the passed in org
func loadClassifiers(ctx context.Context, db sqlx.Queryer, orgID OrgID) ([]assets.Classifier, error) {
start := time.Now()

rows, err := db.Queryx(selectClassifiersSQL, orgID)
if err != nil {
return nil, errors.Wrapf(err, "error querying classifiers for org: %d", orgID)
}
defer rows.Close()

classifiers := make([]assets.Classifier, 0, 2)
for rows.Next() {
classifier := &Classifier{}
err := readJSONRow(rows, &classifier.c)
if err != nil {
return nil, errors.Wrapf(err, "error unmarshalling classifier")
}

// populate our intent names
classifier.c.intentNames = make([]string, len(classifier.c.Intents))
for i, intent := range classifier.c.Intents {
classifier.c.intentNames[i] = intent.Name
}

classifiers = append(classifiers, classifier)
}

logrus.WithField("elapsed", time.Since(start)).WithField("org_id", orgID).WithField("count", len(classifiers)).Debug("loaded classifiers")

return classifiers, nil
}

const selectClassifiersSQL = `
SELECT ROW_TO_JSON(r) FROM (SELECT
c.id as id,
c.uuid as uuid,
c.name as name,
c.classifier_type as classifier_type,
c.config as config,
(SELECT ARRAY_AGG(ci) FROM (
SELECT
ci.name as name,
ci.external_id as external_id
FROM
classifiers_intent ci
WHERE
ci.classifier_id = c.id AND
ci.is_active = TRUE
ORDER BY
ci.created_on ASC
) ci) as intents
FROM
classifiers_classifier c
WHERE
c.org_id = $1 AND
c.is_active = TRUE
ORDER BY
c.created_on ASC
) r;
`

// MarshalJSON marshals into JSON. 0 values will become null
func (i ClassifierID) MarshalJSON() ([]byte, error) {
return null.Int(i).MarshalJSON()
}

// UnmarshalJSON unmarshals from JSON. null values become 0
func (i *ClassifierID) UnmarshalJSON(b []byte) error {
return null.UnmarshalInt(b, (*null.Int)(i))
}

// Value returns the db value, null is returned for 0
func (i ClassifierID) Value() (driver.Value, error) {
return null.Int(i).Value()
}

// Scan scans from the db value. null values become 0
func (i *ClassifierID) Scan(value interface{}) error {
return null.ScanInt(value, (*null.Int)(i))
}
37 changes: 37 additions & 0 deletions models/classifiers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package models

import (
"testing"

"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/mailroom/testsuite"
"github.com/stretchr/testify/assert"
)

func TestClassifiers(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()

classifiers, err := loadClassifiers(ctx, db, 1)
assert.NoError(t, err)

tcs := []struct {
ID ClassifierID
UUID assets.ClassifierUUID
Name string
Intents []string
}{
{LuisID, LuisUUID, "LUIS", []string{"book_flight", "book_car"}},
{WitID, WitUUID, "Wit.ai", []string{"register"}},
}

assert.Equal(t, len(tcs), len(classifiers))
for i, tc := range tcs {
c := classifiers[i].(*Classifier)
assert.Equal(t, tc.UUID, c.UUID())
assert.Equal(t, tc.ID, c.ID())
assert.Equal(t, tc.Name, c.Name())
assert.Equal(t, tc.Intents, c.Intents())
}

}
6 changes: 6 additions & 0 deletions models/test_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ var ReportingLabelUUID = assets.LabelUUID("ebc4dedc-91c4-4ed4-9dd6-daa05ea82698"
var TestingLabelID = LabelID(10001)
var TestingLabelUUID = assets.LabelUUID("a6338cdc-7938-4437-8b05-2d5d785e3a08")

var LuisID = ClassifierID(1)
var LuisUUID = assets.ClassifierUUID("097e026c-ae79-4740-af67-656dbedf0263")

var WitID = ClassifierID(2)
var WitUUID = assets.ClassifierUUID("ff2a817c-040a-4eb2-8404-7d92e8b79dd0")

// constants for org 2, just a few here

var Org2 = OrgID(2)
Expand Down
4 changes: 2 additions & 2 deletions runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func TestCampaignStarts(t *testing.T) {
db.MustExec(`INSERT INTO campaigns_eventfire(event_id, scheduled, contact_id) VALUES($1, $2, $3),($1, $2, $4),($1, $2, $5);`, models.RemindersEvent2ID, now, models.CathyID, models.BobID, models.AlexandriaID)

// create an active session for Alexandria to test skipping
db.MustExec(`INSERT INTO flows_flowsession(session_type, org_id, contact_id, status, responded, created_on, current_flow_id) VALUES('M', $1, $2, 'W', FALSE, NOW(), $3);`, models.Org1, models.AlexandriaID, models.FavoritesFlowID)
db.MustExec(`INSERT INTO flows_flowsession(uuid, session_type, org_id, contact_id, status, responded, created_on, current_flow_id) VALUES($1, 'M', $2, $3, 'W', FALSE, NOW(), $4);`, uuids.New(), models.Org1, models.AlexandriaID, models.FavoritesFlowID)

// create an active voice call for Cathy to make sure it doesn't get interrupted or cause skipping
db.MustExec(`INSERT INTO flows_flowsession(session_type, org_id, contact_id, status, responded, created_on, current_flow_id) VALUES('V', $1, $2, 'W', FALSE, NOW(), $3);`, models.Org1, models.CathyID, models.IVRFlowID)
db.MustExec(`INSERT INTO flows_flowsession(uuid, session_type, org_id, contact_id, status, responded, created_on, current_flow_id) VALUES($1, 'V', $2, $3, 'W', FALSE, NOW(), $4);`, uuids.New(), models.Org1, models.CathyID, models.IVRFlowID)

// set our event to skip
db.MustExec(`UPDATE campaigns_campaignevent SET start_mode = 'S' WHERE id= $1`, models.RemindersEvent2ID)
Expand Down
13 changes: 7 additions & 6 deletions tasks/expirations/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"testing"
"time"

"github.com/nyaruka/mailroom/tasks/handler"
"github.com/nyaruka/goflow/utils/uuids"
_ "github.com/nyaruka/mailroom/hooks"
"github.com/nyaruka/mailroom/marker"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/queue"
"github.com/nyaruka/mailroom/tasks/handler"
"github.com/nyaruka/mailroom/testsuite"
"github.com/stretchr/testify/assert"
)
Expand All @@ -34,24 +35,24 @@ func TestExpirations(t *testing.T) {

// create a few sessions
var s1, s2 models.SessionID
err = db.Get(&s1, `INSERT INTO flows_flowsession(org_id, status, responded, contact_id, created_on) VALUES (1, 'W', TRUE, $1, NOW()) RETURNING id;`, models.CathyID)
err = db.Get(&s1, `INSERT INTO flows_flowsession(uuid, org_id, status, responded, contact_id, created_on) VALUES ($1, 1, 'W', TRUE, $2, NOW()) RETURNING id;`, uuids.New(), models.CathyID)
assert.NoError(t, err)

err = db.Get(&s2, `INSERT INTO flows_flowsession(org_id, status, responded, contact_id, created_on) VALUES (1, 'W', TRUE, $1, NOW()) RETURNING id;`, models.GeorgeID)
err = db.Get(&s2, `INSERT INTO flows_flowsession(uuid, org_id, status, responded, contact_id, created_on) VALUES ($1, 1, 'W', TRUE, $2, NOW()) RETURNING id;`, uuids.New(), models.GeorgeID)
assert.NoError(t, err)

var r1, r2, r3 models.FlowRunID

// simple run, no parent
err = db.Get(&r1, `INSERT INTO flows_flowrun(session_id, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, 'f240ab19-ed5d-4b51-b934-f2fbb9f8e5ad', TRUE, NOW(), NOW(), TRUE, $2, $3, 1, NOW()) RETURNING id;`, s1, models.CathyID, models.FavoritesFlowID)
err = db.Get(&r1, `INSERT INTO flows_flowrun(session_id, status, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, $2, 'f240ab19-ed5d-4b51-b934-f2fbb9f8e5ad', TRUE, NOW(), NOW(), TRUE, $3, $4, 1, NOW()) RETURNING id;`, s1, models.RunStatusWaiting, models.CathyID, models.FavoritesFlowID)
assert.NoError(t, err)

// parent run
err = db.Get(&r2, `INSERT INTO flows_flowrun(session_id, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, 'c4126b59-7a61-4ed5-a2da-c7857580355b', TRUE, NOW(), NOW(), TRUE, $2, $3, 1, NOW() + interval '1' day) RETURNING id;`, s2, models.GeorgeID, models.FavoritesFlowID)
err = db.Get(&r2, `INSERT INTO flows_flowrun(session_id, status, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, $2, 'c4126b59-7a61-4ed5-a2da-c7857580355b', TRUE, NOW(), NOW(), TRUE, $3, $4, 1, NOW() + interval '1' day) RETURNING id;`, s2, models.RunStatusWaiting, models.GeorgeID, models.FavoritesFlowID)
assert.NoError(t, err)

// child run
err = db.Get(&r3, `INSERT INTO flows_flowrun(session_id, parent_uuid, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, 'c4126b59-7a61-4ed5-a2da-c7857580355b', 'a87b7079-5a3c-4e5f-8a6a-62084807c522', TRUE, NOW(), NOW(), TRUE, $2, $3, 1, NOW()) RETURNING id;`, s2, models.GeorgeID, models.FavoritesFlowID)
err = db.Get(&r3, `INSERT INTO flows_flowrun(session_id, status, parent_uuid, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, $2, 'c4126b59-7a61-4ed5-a2da-c7857580355b', 'a87b7079-5a3c-4e5f-8a6a-62084807c522', TRUE, NOW(), NOW(), TRUE, $3, $4, 1, NOW()) RETURNING id;`, s2, models.RunStatusWaiting, models.GeorgeID, models.FavoritesFlowID)
assert.NoError(t, err)

time.Sleep(10 * time.Millisecond)
Expand Down
6 changes: 3 additions & 3 deletions tasks/interrupts/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ func TestInterrupts(t *testing.T) {
insertSession := func(orgID models.OrgID, contactID models.ContactID, connectionID models.ConnectionID, currentFlowID models.FlowID) models.SessionID {
var sessionID models.SessionID
err := db.Get(&sessionID,
`INSERT INTO flows_flowsession(status, responded, created_on, org_id, contact_id, connection_id, current_flow_id)
VALUES('W', false, NOW(), $1, $2, $3, $4) RETURNING id`,
orgID, contactID, connectionID, currentFlowID)
`INSERT INTO flows_flowsession(uuid, status, responded, created_on, org_id, contact_id, connection_id, current_flow_id)
VALUES($1, 'W', false, NOW(), $2, $3, $4, $5) RETURNING id`,
uuids.New(), orgID, contactID, connectionID, currentFlowID)
assert.NoError(t, err)

// give session one active run too
Expand Down
4 changes: 2 additions & 2 deletions tasks/starts/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func TestStarts(t *testing.T) {
// insert a flow run for one of our contacts
// TODO: can be replaced with a normal flow start of another flow once we support flows with waits
db.MustExec(
`INSERT INTO flows_flowrun(uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id)
VALUES($1, TRUE, now(), now(), FALSE, $2, $3, 1);`, uuids.New(), models.GeorgeID, models.SingleMessageFlowID)
`INSERT INTO flows_flowrun(uuid, status, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id)
VALUES($1, 'W', TRUE, now(), now(), FALSE, $2, $3, 1);`, uuids.New(), models.GeorgeID, models.SingleMessageFlowID)

tcs := []struct {
Label string
Expand Down
7 changes: 4 additions & 3 deletions tasks/timeouts/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"testing"
"time"

"github.com/nyaruka/mailroom/tasks/handler"
"github.com/nyaruka/goflow/utils/uuids"
_ "github.com/nyaruka/mailroom/hooks"
"github.com/nyaruka/mailroom/marker"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/queue"
"github.com/nyaruka/mailroom/tasks/handler"
"github.com/nyaruka/mailroom/testsuite"
"github.com/stretchr/testify/assert"
)
Expand All @@ -26,8 +27,8 @@ func TestTimeouts(t *testing.T) {

// need to create a session that has an expired timeout
db := testsuite.DB()
db.MustExec(`INSERT INTO flows_flowsession(org_id, status, responded, contact_id, created_on, timeout_on) VALUES (1, 'W', TRUE, $1, NOW(), NOW());`, models.CathyID)
db.MustExec(`INSERT INTO flows_flowsession(org_id, status, responded, contact_id, created_on, timeout_on) VALUES (1, 'W', TRUE, $1, NOW(), NOW()+ interval '1' day);`, models.GeorgeID)
db.MustExec(`INSERT INTO flows_flowsession(uuid, org_id, status, responded, contact_id, created_on, timeout_on) VALUES ($1, 1, 'W', TRUE, $2, NOW(), NOW());`, uuids.New(), models.CathyID)
db.MustExec(`INSERT INTO flows_flowsession(uuid, org_id, status, responded, contact_id, created_on, timeout_on) VALUES ($1, 1, 'W', TRUE, $2, NOW(), NOW()+ interval '1' day);`, uuids.New(), models.GeorgeID)
time.Sleep(10 * time.Millisecond)

// schedule our timeouts
Expand Down

0 comments on commit f355298

Please sign in to comment.