Skip to content

Commit

Permalink
Make LUA script to queue messages to courier easier to understand
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Nov 10, 2021
1 parent 28bdca9 commit d57fd61
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 50 deletions.
95 changes: 50 additions & 45 deletions core/msgio/courier.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package msgio

import (
"encoding/json"
"strconv"
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/mailroom/core/models"

Expand All @@ -14,20 +15,61 @@ import (
)

const (
highPriority = 1
defaultPriority = 0
bulkPriority = 0
highPriority = 1
)

var queuePushScript = redis.NewScript(6, `
-- KEYS: [QueueType, QueueName, TPS, Priority, Items, EpochSecs]
local queueType, queueName, tps, priority, items, epochSecs = KEYS[1], KEYS[2], tonumber(KEYS[3]), KEYS[4], KEYS[5], KEYS[6]
-- first construct the base key for this queue from the type + name + tps, e.g. "msgs:0a77a158-1dcb-4c06-9aee-e15bdf64653e|10"
local queueKey = queueType .. ":" .. queueName .. "|" .. tps
-- each queue than has two sorted sets for bulk and high priority items, e.g. "msgs:0a77..653e|10/0" vs msgs:0a77..653e|10/1"
local priorityQueueKey = queueKey .. "/" .. priority
-- add the items to the sorted set using the full timestamp (e.g. 1636556789.123456) as the score
redis.call("ZADD", priorityQueueKey, epochSecs, items)
-- if we have a TPS limit, check the transaction counter for this epoch second to see if have already reached it
local curr = -1
if tps > 0 then
local tpsKey = queueKey .. ":tps:" .. math.floor(epochSecs) -- e.g. "msgs:0a77..4653e|10:tps:1636556789"
curr = tonumber(redis.call("GET", tpsKey))
end
-- if we haven't hit the limit, add this queue to set of active queues
if not curr or curr < tps then
redis.call("ZINCRBY", queueType .. ":active", 0, queueKey)
return 1
else
return 0
end
`)

// PushCourierBatch pushes a batch of messages for a single contact and channel onto the appropriate courier queue
func PushCourierBatch(rc redis.Conn, ch *models.Channel, batch []*models.Msg, timestamp string) error {
priority := bulkPriority
if batch[0].HighPriority() {
priority = highPriority
}
batchJSON := jsonx.MustMarshal(batch)

_, err := queuePushScript.Do(rc, "msgs", ch.UUID(), ch.TPS(), priority, batchJSON, timestamp)
return err
}

// QueueCourierMessages queues messages for a single contact to Courier
func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*models.Msg) error {
if len(msgs) == 0 {
return nil
}

now := time.Now()
epochMS := strconv.FormatFloat(float64(now.UnixNano()/int64(time.Microsecond))/float64(1000000), 'f', 6, 64)

priority := defaultPriority
// get the time in seconds since the epoch as a floating point number
// e.g. 2021-11-10T15:10:49.123456+00:00 => "1636557205.123456"
now := dates.Now()
epochSeconds := strconv.FormatFloat(float64(now.UnixNano()/int64(time.Microsecond))/float64(1000000), 'f', 6, 64)

// we batch msgs by channel uuid
batch := make([]*models.Msg, 0, len(msgs))
Expand All @@ -36,17 +78,8 @@ func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*mod
// commits our batch to redis
commitBatch := func() error {
if len(batch) > 0 {
priority = defaultPriority
if batch[0].HighPriority() {
priority = highPriority
}

batchJSON, err := json.Marshal(batch)
if err != nil {
return err
}
start := time.Now()
_, err = queueMsg.Do(rc, epochMS, "msgs", currentChannel.UUID(), currentChannel.TPS(), priority, batchJSON)
err := PushCourierBatch(rc, currentChannel, batch, epochSeconds)
if err != nil {
return err
}
Expand Down Expand Up @@ -101,31 +134,3 @@ func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*mod
// any remaining in our batch, queue it up
return commitBatch()
}

var queueMsg = redis.NewScript(6, `
-- KEYS: [EpochMS, QueueType, QueueName, TPS, Priority, Value]
-- first push onto our specific queue
-- our queue name is built from the type, name and tps, usually something like: "msgs:uuid1-uuid2-uuid3-uuid4|tps"
local queueKey = KEYS[2] .. ":" .. KEYS[3] .. "|" .. KEYS[4]
-- our priority queue name also includes the priority of the message (we have one queue for default and one for bulk)
local priorityQueueKey = queueKey .. "/" .. KEYS[5]
redis.call("zadd", priorityQueueKey, KEYS[1], KEYS[6])
local tps = tonumber(KEYS[4])
-- if we have a TPS, check whether we are currently throttled
local curr = -1
if tps > 0 then
local tpsKey = queueKey .. ":tps:" .. math.floor(KEYS[1])
curr = tonumber(redis.call("get", tpsKey))
end
-- if we aren't then add to our active
if not curr or curr < tps then
redis.call("zincrby", KEYS[2] .. ":active", 0, queueKey)
return 1
else
return 0
end
`)
72 changes: 72 additions & 0 deletions core/msgio/courier_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package msgio_test

import (
"encoding/json"
"testing"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/msgio"
"github.com/nyaruka/mailroom/testsuite"
Expand Down Expand Up @@ -98,3 +101,72 @@ func TestQueueCourierMessages(t *testing.T) {
msgio.QueueCourierMessages(rc, testdata.Cathy.ID, []*models.Msg{ms.createMsg(t, rt, oa)})
})
}

func TestPushCourierBatch(t *testing.T) {
ctx, rt, _, rp := testsuite.Get()
rc := rp.Get()
defer rc.Close()

defer testsuite.Reset(testsuite.ResetData | testsuite.ResetRedis)

oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshChannels)
require.NoError(t, err)

channel := oa.ChannelByID(testdata.TwilioChannel.ID)

msg1 := (&msgSpec{ChannelID: testdata.TwilioChannel.ID, ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID}).createMsg(t, rt, oa)
msg2 := (&msgSpec{ChannelID: testdata.TwilioChannel.ID, ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID}).createMsg(t, rt, oa)

err = msgio.PushCourierBatch(rc, channel, []*models.Msg{msg1, msg2}, "1636557205.123456")
require.NoError(t, err)

// check that channel has been added to active list
msgsActive, err := redis.Strings(rc.Do("ZRANGE", "msgs:active", 0, -1))
assert.NoError(t, err)
assert.Equal(t, []string{"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10"}, msgsActive)

// and that msgs were added as single batch to bulk priority (0) queue
queued, err := redis.ByteSlices(rc.Do("ZRANGE", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0", 0, -1))
assert.NoError(t, err)
assert.Equal(t, 1, len(queued))

unmarshaled, err := jsonx.DecodeGeneric(queued[0])
assert.NoError(t, err)
assert.Equal(t, 2, len(unmarshaled.([]interface{})))

item1ID, _ := unmarshaled.([]interface{})[0].(map[string]interface{})["id"].(json.Number).Int64()
item2ID, _ := unmarshaled.([]interface{})[1].(map[string]interface{})["id"].(json.Number).Int64()
assert.Equal(t, int64(msg1.ID()), item1ID)
assert.Equal(t, int64(msg2.ID()), item2ID)

// push another batch in the same epoch second with transaction counter still below limit
rc.Do("SET", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10:tps:1636557205", "5")

msg3 := (&msgSpec{ChannelID: testdata.TwilioChannel.ID, ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID}).createMsg(t, rt, oa)

err = msgio.PushCourierBatch(rc, channel, []*models.Msg{msg3}, "1636557205.234567")
require.NoError(t, err)

queued, err = redis.ByteSlices(rc.Do("ZRANGE", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0", 0, -1))
assert.NoError(t, err)
assert.Equal(t, 2, len(queued))

// simulate channel having been throttled
rc.Do("ZREM", "msgs:active", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10")
rc.Do("SET", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10:tps:1636557205", "11")

msg4 := (&msgSpec{ChannelID: testdata.TwilioChannel.ID, ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID}).createMsg(t, rt, oa)

err = msgio.PushCourierBatch(rc, channel, []*models.Msg{msg4}, "1636557205.345678")
require.NoError(t, err)

// check that channel has *not* been added to active list
msgsActive, err = redis.Strings(rc.Do("ZRANGE", "msgs:active", 0, -1))
assert.NoError(t, err)
assert.Equal(t, []string{}, msgsActive)

// but msg was still added to queue
queued, err = redis.ByteSlices(rc.Do("ZRANGE", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0", 0, -1))
assert.NoError(t, err)
assert.Equal(t, 3, len(queued))
}
32 changes: 27 additions & 5 deletions core/msgio/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ import (
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/null"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type msgSpec struct {
ChannelID models.ChannelID
ContactID models.ContactID
URNID models.URNID
Failed bool
ChannelID models.ChannelID
ContactID models.ContactID
URNID models.URNID
Failed bool
HighPriority bool
}

func (m *msgSpec) createMsg(t *testing.T, rt *runtime.Runtime, oa *models.OrgAssets) *models.Msg {
Expand All @@ -47,6 +49,10 @@ func (m *msgSpec) createMsg(t *testing.T, rt *runtime.Runtime, oa *models.OrgAss
msg, err := models.NewOutgoingMsg(rt.Config, oaOrg.Org(), channel, m.ContactID, flowMsg, time.Now())
require.NoError(t, err)

if m.HighPriority {
msg.SetResponseTo(models.NilMsgID, null.String("1234"))
}

models.InsertMessages(ctx, rt.DB, []*models.Msg{msg})
require.NoError(t, err)

Expand All @@ -58,6 +64,8 @@ func TestSendMessages(t *testing.T) {
rc := rp.Get()
defer rc.Close()

defer testsuite.Reset(testsuite.ResetData)

mockFCM := newMockFCMEndpoint("FCMID3")
defer mockFCM.Stop()

Expand All @@ -78,6 +86,13 @@ func TestSendMessages(t *testing.T) {
FCMTokensSynced []string
PendingMsgs int
}{
{
Description: "no messages",
Msgs: []msgSpec{},
QueueSizes: map[string][]int{},
FCMTokensSynced: []string{},
PendingMsgs: 0,
},
{
Description: "2 messages for Courier, and 1 Android",
Msgs: []msgSpec{
Expand All @@ -96,9 +111,16 @@ func TestSendMessages(t *testing.T) {
ContactID: testdata.Cathy.ID,
URNID: testdata.Cathy.URNID,
},
{
ChannelID: testdata.TwilioChannel.ID,
ContactID: testdata.Bob.ID,
URNID: testdata.Bob.URNID,
HighPriority: true,
},
},
QueueSizes: map[string][]int{
"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {2},
"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {2}, // 2 default priority messages for Cathy
"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/1": {1}, // 1 high priority message for Bob
},
FCMTokensSynced: []string{"FCMID1"},
PendingMsgs: 0,
Expand Down

0 comments on commit d57fd61

Please sign in to comment.