Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔁 Add task to resend messages #420

Merged
merged 6 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/mailroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
_ "github.com/nyaruka/mailroom/core/tasks/groups"
_ "github.com/nyaruka/mailroom/core/tasks/interrupts"
_ "github.com/nyaruka/mailroom/core/tasks/ivr"
_ "github.com/nyaruka/mailroom/core/tasks/msgs"
_ "github.com/nyaruka/mailroom/core/tasks/schedules"
_ "github.com/nyaruka/mailroom/core/tasks/starts"
_ "github.com/nyaruka/mailroom/core/tasks/stats"
Expand Down
124 changes: 124 additions & 0 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/gsm7"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
Expand Down Expand Up @@ -118,6 +119,7 @@ type Msg struct {
ContactURNID *URNID `db:"contact_urn_id" json:"contact_urn_id"`
ResponseToID MsgID `db:"response_to_id" json:"response_to_id"`
ResponseToExternalID null.String ` json:"response_to_external_id"`
IsResend bool ` json:"is_resend,omitempty"`
URN urns.URN ` json:"urn"`
URNAuth null.String ` json:"urn_auth,omitempty"`
OrgID OrgID `db:"org_id" json:"org_id"`
Expand Down Expand Up @@ -164,6 +166,7 @@ func (m *Msg) OrgID() OrgID { return m.m.OrgID }
func (m *Msg) TopupID() TopupID { return m.m.TopupID }
func (m *Msg) ContactID() ContactID { return m.m.ContactID }
func (m *Msg) ContactURNID() *URNID { return m.m.ContactURNID }
func (m *Msg) IsResend() bool { return m.m.IsResend }

func (m *Msg) SetTopup(topupID TopupID) { m.m.TopupID = topupID }
func (m *Msg) SetChannelID(channelID ChannelID) { m.m.ChannelID = channelID }
Expand Down Expand Up @@ -382,6 +385,60 @@ func NewIncomingMsg(orgID OrgID, channel *Channel, contactID ContactID, in *flow
return msg
}

var loadMessagesSQL = `
SELECT
id,
broadcast_id,
uuid,
text,
created_on,
direction,
status,
visibility,
msg_count,
error_count,
next_attempt,
external_id,
attachments,
metadata,
channel_id,
connection_id,
contact_id,
contact_urn_id,
response_to_id,
org_id,
topup_id
FROM
msgs_msg
WHERE
org_id = $1 AND
direction = $2 AND
id = ANY($3)
ORDER BY
id ASC`

// LoadMessages loads the given messages for the passed in org
func LoadMessages(ctx context.Context, db Queryer, orgID OrgID, direction MsgDirection, msgIDs []MsgID) ([]*Msg, error) {
rows, err := db.QueryxContext(ctx, loadMessagesSQL, orgID, direction, pq.Array(msgIDs))
if err != nil {
return nil, errors.Wrapf(err, "error querying msgs for org: %d", orgID)
}
defer rows.Close()

msgs := make([]*Msg, 0)
for rows.Next() {
msg := &Msg{}
err = rows.StructScan(&msg.m)
if err != nil {
return nil, errors.Wrap(err, "error scanning msg row")
}

msgs = append(msgs, msg)
}

return msgs, nil
}

// NormalizeAttachment will turn any relative URL in the passed in attachment and normalize it to
// include the full host for attachment domains
func NormalizeAttachment(attachment utils.Attachment) utils.Attachment {
Expand Down Expand Up @@ -949,6 +1006,73 @@ func CreateBroadcastMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa
return msgs, nil
}

const updateMsgForResendingSQL = `
UPDATE
msgs_msg m
SET
channel_id = r.channel_id::int,
topup_id = r.topup_id::int,
status = 'P',
error_count = 0,
queued_on = r.queued_on::timestamp with time zone,
sent_on = NULL,
modified_on = NOW()
FROM (
VALUES(:id, :channel_id, :topup_id, :queued_on)
) AS
r(id, channel_id, topup_id, queued_on)
WHERE
m.id = r.id::bigint
`

// ResendMessages prepares messages for resending by reselecting a channel and marking them as PENDING
func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAssets, msgs []*Msg) error {
channels := oa.SessionAssets().Channels()
resends := make([]interface{}, len(msgs))

for i, msg := range msgs {
// reselect channel for this message's URN
urn, err := URNForID(ctx, db, oa, *msg.ContactURNID())
if err != nil {
return errors.Wrap(err, "error loading URN")
}
contactURN, err := flows.ParseRawURN(channels, urn, assets.IgnoreMissing)
if err != nil {
return errors.Wrap(err, "error parsing URN")
}
ch := channels.GetForURN(contactURN, assets.ChannelRoleSend)
if ch != nil {
channel := oa.ChannelByUUID(ch.UUID())
msg.m.ChannelID = channel.ID()
} else {
msg.m.ChannelID = NilChannelID
}

// allocate a new topup for this message if org uses topups
msg.m.TopupID, err = AllocateTopups(ctx, db, rp, oa.Org(), 1)
if err != nil {
return errors.Wrapf(err, "error allocating topup for message resending")
}

// mark message as being a resend so it will be queued to courier as such
msg.m.Status = MsgStatusPending
msg.m.QueuedOn = dates.Now()
msg.m.SentOn = dates.ZeroDateTime
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure about this.. I think it'll put a zero time in the json queued to courier.. but that doesn't matter?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does omit_empty leave it out if it is a zero value? Feels risky / weird having a 0 value there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently not but aren't we always queuing sent_on="0001-01-01T00:00:00Z" to courier? I need to verify but I can't see how the regular queuing code would be any different.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok now I've checked and we always queue to courier with sent_on="0001-01-01T00:00:00Z" and next_attempt="0001-01-01T00:00:00Z" because those are non-null in mailroom's model

msg.m.ErrorCount = 0
msg.m.IsResend = true

resends[i] = msg.m
}

// update the messages in the database
err := BulkQuery(ctx, "updating messages for resending", db, updateMsgForResendingSQL, resends)
if err != nil {
return errors.Wrapf(err, "error updating messages for resending")
}

return nil
}

// MarkBroadcastSent marks the passed in broadcast as sent
func MarkBroadcastSent(ctx context.Context, db Queryer, id BroadcastID) error {
// noop if it is a nil id
Expand Down
69 changes: 69 additions & 0 deletions core/models/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/flows"
Expand Down Expand Up @@ -147,6 +148,74 @@ func TestGetMessageIDFromUUID(t *testing.T) {
assert.Equal(t, models.MsgID(msgIn.ID()), msgID)
}

func TestLoadMessages(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()
msgIn1 := testdata.InsertIncomingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "in 1")
msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 1", []utils.Attachment{"image/jpeg:hi.jpg"})
msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 2", nil)
msgOut3 := testdata.InsertOutgoingMsg(t, db, models.Org2, models.Org2FredID, models.Org2FredURN, models.Org2FredURNID, "out 3", nil)
testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "hi 3", nil)

ids := []models.MsgID{models.MsgID(msgIn1.ID()), models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID()), models.MsgID(msgOut3.ID())}

msgs, err := models.LoadMessages(ctx, db, models.Org1, models.DirectionOut, ids)

// should only return the outgoing messages for this org
require.NoError(t, err)
assert.Equal(t, 2, len(msgs))
assert.Equal(t, "out 1", msgs[0].Text())
assert.Equal(t, []utils.Attachment{"image/jpeg:hi.jpg"}, msgs[0].Attachments())
assert.Equal(t, "out 2", msgs[1].Text())

msgs, err = models.LoadMessages(ctx, db, models.Org1, models.DirectionIn, ids)

// should only return the incoming message for this org
require.NoError(t, err)
assert.Equal(t, 1, len(msgs))
assert.Equal(t, "in 1", msgs[0].Text())
}

func TestResendMessages(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()
rp := testsuite.RP()

db.MustExec(`DELETE FROM msgs_msg`)

oa, err := models.GetOrgAssets(ctx, db, models.Org1)
require.NoError(t, err)

msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 1", nil)
msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.BobID, models.BobURN, models.BobURNID, "out 2", nil)
testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 3", nil)

// make them look like failed messages
db.MustExec(`UPDATE msgs_msg SET status = 'F', sent_on = NOW(), error_count = 3`)

// give Bob's URN an affinity for the Vonage channel
db.MustExec(`UPDATE contacts_contacturn SET channel_id = $1 WHERE id = $2`, models.VonageChannelID, models.BobURNID)

msgs, err := models.LoadMessages(ctx, db, models.Org1, models.DirectionOut, []models.MsgID{models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID())})
require.NoError(t, err)

now := dates.Now()

// resend both msgs
err = models.ResendMessages(ctx, db, rp, oa, msgs)
require.NoError(t, err)

// both messages should now have a channel, a topup and be marked for resending
assert.True(t, msgs[0].IsResend())
assert.Equal(t, models.TwilioChannelID, msgs[0].ChannelID())
assert.Equal(t, models.TopupID(1), msgs[0].TopupID())
assert.True(t, msgs[1].IsResend())
assert.Equal(t, models.VonageChannelID, msgs[1].ChannelID())
assert.Equal(t, models.TopupID(1), msgs[1].TopupID())

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P' AND queued_on > $1 AND sent_on IS NULL`, []interface{}{now}, 2)
}

func TestNormalizeAttachment(t *testing.T) {
config.Mailroom.AttachmentDomain = "foo.bar.com"
defer func() { config.Mailroom.AttachmentDomain = "" }()
Expand Down
53 changes: 53 additions & 0 deletions core/tasks/msgs/resend_msgs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package msgs

import (
"context"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/msgio"
"github.com/nyaruka/mailroom/core/tasks"

"github.com/pkg/errors"
)

// TypeResendMsgs is the type of the resend messages task
const TypeResendMsgs = "resend_msgs"

func init() {
tasks.RegisterType(TypeResendMsgs, func() tasks.Task { return &ResendMsgsTask{} })
}

// ResendMsgsTask is our task for resending messages
type ResendMsgsTask struct {
MsgIDs []models.MsgID `json:"msg_ids"`
}

// Timeout is the maximum amount of time the task can run for
func (t *ResendMsgsTask) Timeout() time.Duration {
return time.Minute * 5
}

func (t *ResendMsgsTask) Perform(ctx context.Context, mr *mailroom.Mailroom, orgID models.OrgID) error {
db := mr.DB
rp := mr.RP

oa, err := models.GetOrgAssets(ctx, mr.DB, orgID)
if err != nil {
return errors.Wrap(err, "unable to load org")
}

msgs, err := models.LoadMessages(ctx, db, orgID, models.DirectionOut, t.MsgIDs)
if err != nil {
return errors.Wrap(err, "error loading messages to resend")
}

err = models.ResendMessages(ctx, db, rp, oa, msgs)
if err != nil {
return errors.Wrap(err, "error cloning messages")
}

msgio.SendMessages(ctx, db, rp, nil, msgs)
return nil
}
43 changes: 43 additions & 0 deletions core/tasks/msgs/resend_msgs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package msgs_test

import (
"testing"

"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks/msgs"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/stretchr/testify/require"
)

func TestResendMsgs(t *testing.T) {
testsuite.Reset()
ctx := testsuite.CTX()
db := testsuite.DB()
mr := &mailroom.Mailroom{Config: config.Mailroom, DB: db, RP: testsuite.RP(), ElasticClient: nil}

msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 1", []utils.Attachment{"image/jpeg:hi.jpg"})
msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.BobID, models.BobURN, models.BobURNID, "out 2", nil)
testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 3", nil)

db.MustExec(`UPDATE msgs_msg SET metadata = '{"topic":"cool-stuff"}' WHERE id = $1`, msgOut1.ID())

// create our task
task := &msgs.ResendMsgsTask{
MsgIDs: []models.MsgID{models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID())},
}

// execute it
err := task.Perform(ctx, mr, models.Org1)
require.NoError(t, err)

// the two resent messages should now be pending and have a channel set
testsuite.AssertQueryCount(t, db,
`SELECT count(*) FROM msgs_msg WHERE status = 'P' AND channel_id IS NOT NULL AND id IN ($1, $2)`,
[]interface{}{msgOut1.ID(), msgOut2.ID()}, 2,
)
}
16 changes: 16 additions & 0 deletions testsuite/testdata/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package testdata
import (
"testing"

"github.com/lib/pq"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom/core/models"

"github.com/jmoiron/sqlx"
Expand All @@ -25,3 +27,17 @@ func InsertIncomingMsg(t *testing.T, db *sqlx.DB, orgID models.OrgID, contactID
msg.SetID(id)
return msg
}

// InsertOutgoingMsg inserts an outgoing message
func InsertOutgoingMsg(t *testing.T, db *sqlx.DB, orgID models.OrgID, contactID models.ContactID, urn urns.URN, urnID models.URNID, text string, attachments []utils.Attachment) *flows.MsgOut {
msg := flows.NewMsgOut(urn, nil, text, nil, nil, nil, flows.NilMsgTopic)

var id flows.MsgID
err := db.Get(&id,
`INSERT INTO msgs_msg(uuid, text, attachments, created_on, direction, status, visibility, msg_count, error_count, next_attempt, contact_id, contact_urn_id, org_id)
VALUES($1, $2, $3, NOW(), 'O', 'P', 'V', 1, 0, NOW(), $4, $5, $6) RETURNING id`, msg.UUID(), text, pq.Array(attachments), contactID, urnID, orgID)
require.NoError(t, err)

msg.SetID(id)
return msg
}