From d57fd61a51412e37dd527d273e1f8f9138905b3f Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Wed, 10 Nov 2021 11:30:11 -0500 Subject: [PATCH] Make LUA script to queue messages to courier easier to understand --- core/msgio/courier.go | 95 ++++++++++++++++++++------------------ core/msgio/courier_test.go | 72 +++++++++++++++++++++++++++++ core/msgio/send_test.go | 32 +++++++++++-- 3 files changed, 149 insertions(+), 50 deletions(-) diff --git a/core/msgio/courier.go b/core/msgio/courier.go index f4d603d0c..ef72470cf 100644 --- a/core/msgio/courier.go +++ b/core/msgio/courier.go @@ -1,10 +1,11 @@ package msgio import ( - "encoding/json" "strconv" "time" + "github.com/nyaruka/gocommon/dates" + "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/mailroom/core/models" @@ -14,20 +15,61 @@ import ( ) const ( - highPriority = 1 - defaultPriority = 0 + bulkPriority = 0 + highPriority = 1 ) +var queuePushScript = redis.NewScript(6, ` +-- KEYS: [QueueType, QueueName, TPS, Priority, Items, EpochSecs] +local queueType, queueName, tps, priority, items, epochSecs = KEYS[1], KEYS[2], tonumber(KEYS[3]), KEYS[4], KEYS[5], KEYS[6] + +-- first construct the base key for this queue from the type + name + tps, e.g. "msgs:0a77a158-1dcb-4c06-9aee-e15bdf64653e|10" +local queueKey = queueType .. ":" .. queueName .. "|" .. tps + +-- each queue than has two sorted sets for bulk and high priority items, e.g. "msgs:0a77..653e|10/0" vs msgs:0a77..653e|10/1" +local priorityQueueKey = queueKey .. "/" .. priority + +-- add the items to the sorted set using the full timestamp (e.g. 1636556789.123456) as the score +redis.call("ZADD", priorityQueueKey, epochSecs, items) + +-- if we have a TPS limit, check the transaction counter for this epoch second to see if have already reached it +local curr = -1 +if tps > 0 then + local tpsKey = queueKey .. ":tps:" .. math.floor(epochSecs) -- e.g. 
"msgs:0a77..4653e|10:tps:1636556789" + curr = tonumber(redis.call("GET", tpsKey)) +end + +-- if we haven't hit the limit, add this queue to set of active queues +if not curr or curr < tps then + redis.call("ZINCRBY", queueType .. ":active", 0, queueKey) + return 1 +else + return 0 +end +`) + +// PushCourierBatch pushes a batch of messages for a single contact and channel onto the appropriate courier queue +func PushCourierBatch(rc redis.Conn, ch *models.Channel, batch []*models.Msg, timestamp string) error { + priority := bulkPriority + if batch[0].HighPriority() { + priority = highPriority + } + batchJSON := jsonx.MustMarshal(batch) + + _, err := queuePushScript.Do(rc, "msgs", ch.UUID(), ch.TPS(), priority, batchJSON, timestamp) + return err +} + // QueueCourierMessages queues messages for a single contact to Courier func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*models.Msg) error { if len(msgs) == 0 { return nil } - now := time.Now() - epochMS := strconv.FormatFloat(float64(now.UnixNano()/int64(time.Microsecond))/float64(1000000), 'f', 6, 64) - - priority := defaultPriority + // get the time in seconds since the epoch as a floating point number + // e.g. 
2021-11-10T15:13:25.123456+00:00 => "1636557205.123456" + now := dates.Now() + epochSeconds := strconv.FormatFloat(float64(now.UnixNano()/int64(time.Microsecond))/float64(1000000), 'f', 6, 64) // we batch msgs by channel uuid batch := make([]*models.Msg, 0, len(msgs)) @@ -36,17 +78,8 @@ func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*mod // commits our batch to redis commitBatch := func() error { if len(batch) > 0 { - priority = defaultPriority - if batch[0].HighPriority() { - priority = highPriority - } - - batchJSON, err := json.Marshal(batch) - if err != nil { - return err - } start := time.Now() - _, err = queueMsg.Do(rc, epochMS, "msgs", currentChannel.UUID(), currentChannel.TPS(), priority, batchJSON) + err := PushCourierBatch(rc, currentChannel, batch, epochSeconds) if err != nil { return err } @@ -101,31 +134,3 @@ func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*mod // any remaining in our batch, queue it up return commitBatch() } - -var queueMsg = redis.NewScript(6, ` --- KEYS: [EpochMS, QueueType, QueueName, TPS, Priority, Value] - --- first push onto our specific queue --- our queue name is built from the type, name and tps, usually something like: "msgs:uuid1-uuid2-uuid3-uuid4|tps" -local queueKey = KEYS[2] .. ":" .. KEYS[3] .. "|" .. KEYS[4] - --- our priority queue name also includes the priority of the message (we have one queue for default and one for bulk) -local priorityQueueKey = queueKey .. "/" .. KEYS[5] -redis.call("zadd", priorityQueueKey, KEYS[1], KEYS[6]) -local tps = tonumber(KEYS[4]) - --- if we have a TPS, check whether we are currently throttled -local curr = -1 -if tps > 0 then - local tpsKey = queueKey .. ":tps:" .. math.floor(KEYS[1]) - curr = tonumber(redis.call("get", tpsKey)) -end - --- if we aren't then add to our active -if not curr or curr < tps then -redis.call("zincrby", KEYS[2] .. 
":active", 0, queueKey) - return 1 -else - return 0 -end -`) diff --git a/core/msgio/courier_test.go b/core/msgio/courier_test.go index 2605f062e..f938c8927 100644 --- a/core/msgio/courier_test.go +++ b/core/msgio/courier_test.go @@ -1,8 +1,11 @@ package msgio_test import ( + "encoding/json" "testing" + "github.com/gomodule/redigo/redis" + "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/core/msgio" "github.com/nyaruka/mailroom/testsuite" @@ -98,3 +101,72 @@ func TestQueueCourierMessages(t *testing.T) { msgio.QueueCourierMessages(rc, testdata.Cathy.ID, []*models.Msg{ms.createMsg(t, rt, oa)}) }) } + +func TestPushCourierBatch(t *testing.T) { + ctx, rt, _, rp := testsuite.Get() + rc := rp.Get() + defer rc.Close() + + defer testsuite.Reset(testsuite.ResetData | testsuite.ResetRedis) + + oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshChannels) + require.NoError(t, err) + + channel := oa.ChannelByID(testdata.TwilioChannel.ID) + + msg1 := (&msgSpec{ChannelID: testdata.TwilioChannel.ID, ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID}).createMsg(t, rt, oa) + msg2 := (&msgSpec{ChannelID: testdata.TwilioChannel.ID, ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID}).createMsg(t, rt, oa) + + err = msgio.PushCourierBatch(rc, channel, []*models.Msg{msg1, msg2}, "1636557205.123456") + require.NoError(t, err) + + // check that channel has been added to active list + msgsActive, err := redis.Strings(rc.Do("ZRANGE", "msgs:active", 0, -1)) + assert.NoError(t, err) + assert.Equal(t, []string{"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10"}, msgsActive) + + // and that msgs were added as single batch to bulk priority (0) queue + queued, err := redis.ByteSlices(rc.Do("ZRANGE", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0", 0, -1)) + assert.NoError(t, err) + assert.Equal(t, 1, len(queued)) + + unmarshaled, err := jsonx.DecodeGeneric(queued[0]) + assert.NoError(t, err) + 
assert.Equal(t, 2, len(unmarshaled.([]interface{}))) + + item1ID, _ := unmarshaled.([]interface{})[0].(map[string]interface{})["id"].(json.Number).Int64() + item2ID, _ := unmarshaled.([]interface{})[1].(map[string]interface{})["id"].(json.Number).Int64() + assert.Equal(t, int64(msg1.ID()), item1ID) + assert.Equal(t, int64(msg2.ID()), item2ID) + + // push another batch in the same epoch second with transaction counter still below limit + rc.Do("SET", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10:tps:1636557205", "5") + + msg3 := (&msgSpec{ChannelID: testdata.TwilioChannel.ID, ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID}).createMsg(t, rt, oa) + + err = msgio.PushCourierBatch(rc, channel, []*models.Msg{msg3}, "1636557205.234567") + require.NoError(t, err) + + queued, err = redis.ByteSlices(rc.Do("ZRANGE", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0", 0, -1)) + assert.NoError(t, err) + assert.Equal(t, 2, len(queued)) + + // simulate channel having been throttled + rc.Do("ZREM", "msgs:active", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10") + rc.Do("SET", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10:tps:1636557205", "11") + + msg4 := (&msgSpec{ChannelID: testdata.TwilioChannel.ID, ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID}).createMsg(t, rt, oa) + + err = msgio.PushCourierBatch(rc, channel, []*models.Msg{msg4}, "1636557205.345678") + require.NoError(t, err) + + // check that channel has *not* been added to active list + msgsActive, err = redis.Strings(rc.Do("ZRANGE", "msgs:active", 0, -1)) + assert.NoError(t, err) + assert.Equal(t, []string{}, msgsActive) + + // but msg was still added to queue + queued, err = redis.ByteSlices(rc.Do("ZRANGE", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0", 0, -1)) + assert.NoError(t, err) + assert.Equal(t, 3, len(queued)) +} diff --git a/core/msgio/send_test.go b/core/msgio/send_test.go index 0b4eaf029..3fad09268 100644 --- a/core/msgio/send_test.go +++ b/core/msgio/send_test.go @@ -14,16 +14,18 @@ 
import ( "github.com/nyaruka/mailroom/runtime" "github.com/nyaruka/mailroom/testsuite" "github.com/nyaruka/mailroom/testsuite/testdata" + "github.com/nyaruka/null" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) type msgSpec struct { - ChannelID models.ChannelID - ContactID models.ContactID - URNID models.URNID - Failed bool + ChannelID models.ChannelID + ContactID models.ContactID + URNID models.URNID + Failed bool + HighPriority bool } func (m *msgSpec) createMsg(t *testing.T, rt *runtime.Runtime, oa *models.OrgAssets) *models.Msg { @@ -47,6 +49,10 @@ func (m *msgSpec) createMsg(t *testing.T, rt *runtime.Runtime, oa *models.OrgAss msg, err := models.NewOutgoingMsg(rt.Config, oaOrg.Org(), channel, m.ContactID, flowMsg, time.Now()) require.NoError(t, err) + if m.HighPriority { + msg.SetResponseTo(models.NilMsgID, null.String("1234")) + } + models.InsertMessages(ctx, rt.DB, []*models.Msg{msg}) require.NoError(t, err) @@ -58,6 +64,8 @@ func TestSendMessages(t *testing.T) { rc := rp.Get() defer rc.Close() + defer testsuite.Reset(testsuite.ResetData) + mockFCM := newMockFCMEndpoint("FCMID3") defer mockFCM.Stop() @@ -78,6 +86,13 @@ func TestSendMessages(t *testing.T) { FCMTokensSynced []string PendingMsgs int }{ + { + Description: "no messages", + Msgs: []msgSpec{}, + QueueSizes: map[string][]int{}, + FCMTokensSynced: []string{}, + PendingMsgs: 0, + }, { Description: "2 messages for Courier, and 1 Android", Msgs: []msgSpec{ @@ -96,9 +111,16 @@ func TestSendMessages(t *testing.T) { ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID, }, + { + ChannelID: testdata.TwilioChannel.ID, + ContactID: testdata.Bob.ID, + URNID: testdata.Bob.URNID, + HighPriority: true, + }, }, QueueSizes: map[string][]int{ - "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {2}, + "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {2}, // 2 default priority messages for Cathy + "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/1": {1}, // 1 high priority message for 
Bob }, FCMTokensSynced: []string{"FCMID1"}, PendingMsgs: 0,