Skip to content

Commit

Permalink
Rework message events to use channel UUID and include channel type
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Sep 29, 2022
1 parent 800fe9d commit f87ec37
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 56 deletions.
29 changes: 15 additions & 14 deletions core/tasks/handler/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,22 +105,23 @@ func RetryPendingMsgs(ctx context.Context, rt *runtime.Runtime) error {

const unhandledMsgsQuery = `
SELECT org_id, contact_id, msg_id, ROW_TO_JSON(r) FROM (SELECT
m.contact_id as contact_id,
m.org_id as org_id,
m.channel_id as channel_id,
m.id as msg_id,
m.uuid as msg_uuid,
m.external_id as msg_external_id,
u.identity as urn,
m.contact_urn_id as urn_id,
m.text as text,
m.attachments as attachments
m.contact_id AS contact_id,
m.org_id AS org_id,
c.id AS channel_id,
c.uuid AS channel_uuid,
c.channel_type AS channel_type,
m.id AS msg_id,
m.uuid AS msg_uuid,
m.external_id AS msg_external_id,
u.identity AS urn,
m.contact_urn_id AS urn_id,
m.text AS text,
m.attachments AS attachments
FROM
msgs_msg m
JOIN contacts_contacturn as u ON m.contact_urn_id = u.id
INNER JOIN channels_channel c ON c.id = m.channel_id
INNER JOIN contacts_contacturn u ON u.id = m.contact_urn_id
WHERE
m.direction = 'I' AND
m.status = 'P' AND
m.created_on < now() - INTERVAL '5 min'
m.direction = 'I' AND m.status = 'P' AND m.created_on < now() - INTERVAL '5 min'
) r;
`
70 changes: 36 additions & 34 deletions core/tasks/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,15 @@ func TestMsgEvents(t *testing.T) {

makeMsgTask := func(org *testdata.Org, channel *testdata.Channel, contact *testdata.Contact, text string) *queue.Task {
return &queue.Task{Type: handler.MsgEventType, OrgID: int(org.ID), Task: jsonx.MustMarshal(&handler.MsgEvent{
ContactID: contact.ID,
OrgID: org.ID,
ChannelID: channel.ID,
MsgID: dbMsg.ID(),
MsgUUID: dbMsg.UUID(),
URN: contact.URN,
URNID: contact.URNID,
Text: text,
ContactID: contact.ID,
OrgID: org.ID,
ChannelUUID: channel.UUID,
ChannelType: channel.Type,
MsgID: dbMsg.ID(),
MsgUUID: dbMsg.UUID(),
URN: contact.URN,
URNID: contact.URNID,
Text: text,
})}
}

Expand Down Expand Up @@ -587,47 +588,47 @@ func TestTimedEvents(t *testing.T) {
Contact *testdata.Contact
Message string
Response string
ChannelID models.ChannelID
OrgID models.OrgID
Channel *testdata.Channel
Org *testdata.Org
}{
// 0: start the flow
{handler.MsgEventType, testdata.Cathy, "start", "What is your favorite color?", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.MsgEventType, testdata.Cathy, "start", "What is your favorite color?", testdata.TwitterChannel, testdata.Org1},

// 1: this expiration does nothing because the times don't match
{handler.ExpirationEventType, testdata.Cathy, "bad", "", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.ExpirationEventType, testdata.Cathy, "bad", "", testdata.TwitterChannel, testdata.Org1},

// 2: this checks that the flow wasn't expired
{handler.MsgEventType, testdata.Cathy, "red", "Good choice, I like Red too! What is your favorite beer?", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.MsgEventType, testdata.Cathy, "red", "Good choice, I like Red too! What is your favorite beer?", testdata.TwitterChannel, testdata.Org1},

// 3: this expiration will actually take
{handler.ExpirationEventType, testdata.Cathy, "good", "", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.ExpirationEventType, testdata.Cathy, "good", "", testdata.TwitterChannel, testdata.Org1},

// 4: we won't get a response as we will be out of the flow
{handler.MsgEventType, testdata.Cathy, "mutzig", "", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.MsgEventType, testdata.Cathy, "mutzig", "", testdata.TwitterChannel, testdata.Org1},

// 5: start the parent expiration flow
{handler.MsgEventType, testdata.Cathy, "parent", "Child", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.MsgEventType, testdata.Cathy, "parent", "Child", testdata.TwitterChannel, testdata.Org1},

// 6: respond, should bring us out
{handler.MsgEventType, testdata.Cathy, "hi", "Completed", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.MsgEventType, testdata.Cathy, "hi", "Completed", testdata.TwitterChannel, testdata.Org1},

// 7: expiring our child should be a no op
{handler.ExpirationEventType, testdata.Cathy, "child", "", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.ExpirationEventType, testdata.Cathy, "child", "", testdata.TwitterChannel, testdata.Org1},

// 8: respond one last time, should be done
{handler.MsgEventType, testdata.Cathy, "done", "Ended", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.MsgEventType, testdata.Cathy, "done", "Ended", testdata.TwitterChannel, testdata.Org1},

// 9: start our favorite flow again
{handler.MsgEventType, testdata.Cathy, "start", "What is your favorite color?", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.MsgEventType, testdata.Cathy, "start", "What is your favorite color?", testdata.TwitterChannel, testdata.Org1},

// 10: timeout on the color question
{handler.TimeoutEventType, testdata.Cathy, "", "Sorry you can't participate right now, I'll try again later.", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.TimeoutEventType, testdata.Cathy, "", "Sorry you can't participate right now, I'll try again later.", testdata.TwitterChannel, testdata.Org1},

// 11: start the pick a number flow
{handler.MsgEventType, testdata.Cathy, "pick", "Pick a number between 1-10.", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.MsgEventType, testdata.Cathy, "pick", "Pick a number between 1-10.", testdata.TwitterChannel, testdata.Org1},

// 12: try to resume with timeout even tho flow doesn't have one set
{handler.TimeoutEventType, testdata.Cathy, "", "", testdata.TwitterChannel.ID, testdata.Org1.ID},
{handler.TimeoutEventType, testdata.Cathy, "", "", testdata.TwitterChannel, testdata.Org1},
}

last := time.Now()
Expand All @@ -642,16 +643,17 @@ func TestTimedEvents(t *testing.T) {
if tc.EventType == handler.MsgEventType {
task = &queue.Task{
Type: tc.EventType,
OrgID: int(tc.OrgID),
OrgID: int(tc.Org.ID),
Task: jsonx.MustMarshal(&handler.MsgEvent{
ContactID: tc.Contact.ID,
OrgID: tc.OrgID,
ChannelID: tc.ChannelID,
MsgID: flows.MsgID(1),
MsgUUID: flows.MsgUUID(uuids.New()),
URN: tc.Contact.URN,
URNID: tc.Contact.URNID,
Text: tc.Message,
ContactID: tc.Contact.ID,
OrgID: tc.Org.ID,
ChannelUUID: tc.Channel.UUID,
ChannelType: tc.Channel.Type,
MsgID: flows.MsgID(1),
MsgUUID: flows.MsgUUID(uuids.New()),
URN: tc.Contact.URN,
URNID: tc.Contact.URNID,
Text: tc.Message,
}),
}
} else if tc.EventType == handler.ExpirationEventType {
Expand All @@ -666,15 +668,15 @@ func TestTimedEvents(t *testing.T) {
expiration = time.Now().Add(time.Hour * 24)
}

task = handler.NewExpirationTask(tc.OrgID, tc.Contact.ID, sessionID, expiration)
task = handler.NewExpirationTask(tc.Org.ID, tc.Contact.ID, sessionID, expiration)

} else if tc.EventType == handler.TimeoutEventType {
timeoutOn := time.Now().Round(time.Millisecond) // so that there's no difference between this and what we read from the db

// usually courier will set timeout_on after sending the last message
db.MustExec(`UPDATE flows_flowsession SET timeout_on = $2 WHERE id = $1`, sessionID, timeoutOn)

task = handler.NewTimeoutTask(tc.OrgID, tc.Contact.ID, sessionID, timeoutOn)
task = handler.NewTimeoutTask(tc.Org.ID, tc.Contact.ID, sessionID, timeoutOn)
}

err := handler.QueueHandleTask(rc, tc.Contact.ID, task)
Expand Down
6 changes: 4 additions & 2 deletions core/tasks/handler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/excellent/types"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/engine"
Expand Down Expand Up @@ -504,7 +505,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e
modelContact := contacts[0]

// load the channel for this message
channel := oa.ChannelByID(event.ChannelID)
channel := oa.ChannelByUUID(event.ChannelUUID)

// if we have URNs make sure the message URN is our highest priority (this is usually a noop)
if len(modelContact.URNs()) > 0 {
Expand Down Expand Up @@ -798,7 +799,8 @@ type TimedEvent struct {
type MsgEvent struct {
ContactID models.ContactID `json:"contact_id"`
OrgID models.OrgID `json:"org_id"`
ChannelID models.ChannelID `json:"channel_id"`
ChannelUUID assets.ChannelUUID `json:"channel_uuid"`
ChannelType models.ChannelType `json:"channel_type"`
MsgID flows.MsgID `json:"msg_id"`
MsgUUID flows.MsgUUID `json:"msg_uuid"`
MsgExternalID null.String `json:"msg_external_id"`
Expand Down
5 changes: 3 additions & 2 deletions testsuite/testdata/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@ import (
type Channel struct {
ID models.ChannelID
UUID assets.ChannelUUID
Type models.ChannelType
}

// InsertChannel inserts a channel
func InsertChannel(db *sqlx.DB, org *Org, channelType, name string, schemes []string, role string, config map[string]interface{}) *Channel {
func InsertChannel(db *sqlx.DB, org *Org, channelType models.ChannelType, name string, schemes []string, role string, config map[string]interface{}) *Channel {
uuid := assets.ChannelUUID(uuids.New())
var id models.ChannelID
must(db.Get(&id,
`INSERT INTO channels_channel(uuid, org_id, channel_type, name, schemes, role, config, last_seen, is_system, is_active, created_on, modified_on, created_by_id, modified_by_id)
VALUES($1, $2, $3, $4, $5, $6, $7, NOW(), FALSE, TRUE, NOW(), NOW(), 1, 1) RETURNING id`, uuid, org.ID, channelType, name, pq.Array(schemes), role, null.NewMap(config),
))
return &Channel{id, uuid}
return &Channel{ID: id, UUID: uuid, Type: channelType}
}

// InsertCall inserts a call
Expand Down
8 changes: 4 additions & 4 deletions testsuite/testdata/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ var Viewer = &User{5, "viewer1@nyaruka.com"}
var Agent = &User{6, "agent1@nyaruka.com"}
var Surveyor = &User{7, "surveyor1@nyaruka.com"}

var TwilioChannel = &Channel{10000, "74729f45-7f29-4868-9dc4-90e491e3c7d8"}
var VonageChannel = &Channel{10001, "19012bfd-3ce3-4cae-9bb9-76cf92c73d49"}
var TwitterChannel = &Channel{10002, "0f661e8b-ea9d-4bd3-9953-d368340acf91"}
var TwilioChannel = &Channel{10000, "74729f45-7f29-4868-9dc4-90e491e3c7d8", "T"}
var VonageChannel = &Channel{10001, "19012bfd-3ce3-4cae-9bb9-76cf92c73d49", "NX"}
var TwitterChannel = &Channel{10002, "0f661e8b-ea9d-4bd3-9953-d368340acf91", "TT"}

var Cathy = &Contact{10000, "6393abc0-283d-4c9b-a1b3-641a035c34bf", "tel:+16055741111", 10000}
var Bob = &Contact{10001, "b699a406-7e44-49be-9f01-1a82893e8a10", "tel:+16055742222", 10001}
Expand Down Expand Up @@ -69,7 +69,7 @@ var RemindersEvent3 = &CampaignEvent{10002, "3e4f06c2-e04f-47ca-a047-f5252b3160e
// secondary org.. only a few things
var Org2 = &Org{2, "3ae7cdeb-fd96-46e5-abc4-a4622f349921"}
var Org2Admin = &User{8, "admin2@nyaruka.com"}
var Org2Channel = &Channel{20000, "a89bc872-3763-4b95-91d9-31d4e56c6651"}
var Org2Channel = &Channel{20000, "a89bc872-3763-4b95-91d9-31d4e56c6651", "T"}
var Org2Contact = &Contact{20000, "26d20b72-f7d8-44dc-87f2-aae046dbff95", "tel:+250700000005", 20000}
var Org2Favorites = &Flow{20000, "f161bd16-3c60-40bd-8c92-228ce815b9cd"}
var Org2SingleMessage = &Flow{20001, "5277916d-6011-41ac-a4a4-f6ac6a4f1dd9"}
Expand Down

0 comments on commit f87ec37

Please sign in to comment.