Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔁 Add task to resend messages #420

Merged
merged 6 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 128 additions & 6 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"strings"
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/gsm7"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/envs"
"github.com/nyaruka/goflow/excellent"
Expand Down Expand Up @@ -382,6 +384,60 @@ func NewIncomingMsg(orgID OrgID, channel *Channel, contactID ContactID, in *flow
return msg
}

var loadMessagesSQL = `
SELECT
id,
broadcast_id,
uuid,
text,
created_on,
direction,
status,
visibility,
msg_count,
error_count,
next_attempt,
external_id,
attachments,
metadata,
channel_id,
connection_id,
contact_id,
contact_urn_id,
response_to_id,
org_id,
topup_id
FROM
msgs_msg
WHERE
org_id = $1 AND
direction = $2 AND
id = ANY($3)
ORDER BY
id ASC`

// LoadMessages loads the given messages for the passed in org
func LoadMessages(ctx context.Context, db Queryer, orgID OrgID, direction MsgDirection, msgIDs []MsgID) ([]*Msg, error) {
rows, err := db.QueryxContext(ctx, loadMessagesSQL, orgID, direction, pq.Array(msgIDs))
if err != nil {
return nil, errors.Wrapf(err, "error querying msgs for org: %d", orgID)
}
defer rows.Close()

msgs := make([]*Msg, 0)
for rows.Next() {
msg := &Msg{}
err = rows.StructScan(&msg.m)
if err != nil {
return nil, errors.Wrap(err, "error scanning msg row")
}

msgs = append(msgs, msg)
}

return msgs, nil
}

// NormalizeAttachment will turn any relative URL in the passed in attachment and normalize it to
// include the full host for attachment domains
func NormalizeAttachment(attachment utils.Attachment) utils.Attachment {
Expand Down Expand Up @@ -458,12 +514,8 @@ func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status Ms
return nil
}

// MarkMessagesPending marks the passed in messages as pending
func MarkMessagesPending(ctx context.Context, db Queryer, msgs []*Msg) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure it's worth having a separate function for each status

return updateMessageStatus(ctx, db, msgs, MsgStatusPending)
}

func updateMessageStatus(ctx context.Context, db Queryer, msgs []*Msg, status MsgStatus) error {
// UpdateMessageStatus updates status for the given messages
func UpdateMessageStatus(ctx context.Context, db Queryer, msgs []*Msg, status MsgStatus) error {
is := make([]interface{}, len(msgs))
for i, msg := range msgs {
m := &msg.m
Expand Down Expand Up @@ -949,6 +1001,76 @@ func CreateBroadcastMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa
return msgs, nil
}

// CloneMessages clones the given messages for resending
func CloneMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAssets, msgs []*Msg) ([]*Msg, error) {
channels := oa.SessionAssets().Channels()
clones := make([]*Msg, len(msgs))

for i, msg := range msgs {
clone := &Msg{}
channelID := NilChannelID

urn, err := URNForID(ctx, db, oa, *msg.ContactURNID())
if err != nil {
return nil, errors.Wrap(err, "errr loading URN for cloned message")
}
contactURN, err := flows.ParseRawURN(channels, urn, assets.IgnoreMissing)
if err != nil {
return nil, errors.Wrap(err, "errr parsing URN for cloned message")
}
ch := channels.GetForURN(contactURN, assets.ChannelRoleSend)
if ch != nil {
channel := oa.ChannelByUUID(ch.UUID())
channelID = channel.ID()
}

// we fail messages for suspended orgs right away
status := MsgStatusQueued
if oa.Org().Suspended() {
status = MsgStatusFailed
}

m := &clone.m
m.UUID = flows.MsgUUID(uuids.New())
m.OrgID = oa.OrgID()
m.ContactID = msg.ContactID()
m.ContactURNID = msg.ContactURNID()
m.ChannelID = channelID
m.Text = msg.Text()
m.Attachments = msg.m.Attachments
m.Metadata = msg.m.Metadata
m.HighPriority = false
m.Direction = DirectionOut
m.Status = status
m.Visibility = VisibilityVisible
m.MsgType = msg.MsgType()
m.CreatedOn = dates.Now()

clones[i] = clone
}

// allocate a topup for these messages if org uses topups
topup, err := AllocateTopups(ctx, db, rp, oa.Org(), len(clones))
if err != nil {
return nil, errors.Wrapf(err, "error allocating topup for cloned messages")
}

// if we have an active topup, assign it to our messages
if topup != NilTopupID {
for _, m := range clones {
m.SetTopup(topup)
}
}

// insert them in a single request
err = InsertMessages(ctx, db, clones)
if err != nil {
return nil, errors.Wrapf(err, "error inserting cloned messages")
}

return clones, nil
}

// MarkBroadcastSent marks the passed in broadcast as sent
func MarkBroadcastSent(ctx context.Context, db Queryer, id BroadcastID) error {
// noop if it is a nil id
Expand Down
34 changes: 31 additions & 3 deletions core/models/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,34 @@ func TestGetMessageIDFromUUID(t *testing.T) {
assert.Equal(t, models.MsgID(msgIn.ID()), msgID)
}

func TestLoadMessages(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()
msgIn1 := testdata.InsertIncomingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "in 1")
msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 1", []utils.Attachment{"image/jpeg:hi.jpg"})
msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 2", nil)
msgOut3 := testdata.InsertOutgoingMsg(t, db, models.Org2, models.Org2FredID, models.Org2FredURN, models.Org2FredURNID, "out 3", nil)
testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "hi 3", nil)

ids := []models.MsgID{models.MsgID(msgIn1.ID()), models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID()), models.MsgID(msgOut3.ID())}

msgs, err := models.LoadMessages(ctx, db, models.Org1, models.DirectionOut, ids)

// should only return the outgoing messages for this org
require.NoError(t, err)
assert.Equal(t, 2, len(msgs))
assert.Equal(t, "out 1", msgs[0].Text())
assert.Equal(t, []utils.Attachment{"image/jpeg:hi.jpg"}, msgs[0].Attachments())
assert.Equal(t, "out 2", msgs[1].Text())

msgs, err = models.LoadMessages(ctx, db, models.Org1, models.DirectionIn, ids)

// should only return the incoming message for this org
require.NoError(t, err)
assert.Equal(t, 1, len(msgs))
assert.Equal(t, "in 1", msgs[0].Text())
}

func TestNormalizeAttachment(t *testing.T) {
config.Mailroom.AttachmentDomain = "foo.bar.com"
defer func() { config.Mailroom.AttachmentDomain = "" }()
Expand All @@ -167,7 +195,7 @@ func TestNormalizeAttachment(t *testing.T) {
}
}

func TestMarkMessages(t *testing.T) {
func TestUpdateMessageStatus(t *testing.T) {
ctx, db, _ := testsuite.Reset()
defer testsuite.Reset()

Expand All @@ -192,7 +220,7 @@ func TestMarkMessages(t *testing.T) {
msg2 := insertMsg("Hola")
insertMsg("Howdy")

models.MarkMessagesPending(ctx, db, []*models.Msg{msg1, msg2})
models.UpdateMessageStatus(ctx, db, []*models.Msg{msg1, msg2}, models.MsgStatusPending)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P'`, nil, 2)

Expand All @@ -205,7 +233,7 @@ func TestMarkMessages(t *testing.T) {
msg4 := insertMsg("Big messages!")
assert.Equal(t, flows.MsgID(3000000000), msg4.ID())

err = models.MarkMessagesPending(ctx, db, []*models.Msg{msg4})
err = models.UpdateMessageStatus(ctx, db, []*models.Msg{msg4}, models.MsgStatusPending)
assert.NoError(t, err)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P'`, nil, 3)
Expand Down
2 changes: 1 addition & 1 deletion core/msgio/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func SendMessages(ctx context.Context, db models.Queryer, rp *redis.Pool, fc *fc
// any messages that didn't get sent should be moved back to pending (they are queued at creation to save an
// update in the common case)
if len(pending) > 0 {
err := models.MarkMessagesPending(ctx, db, pending)
err := models.UpdateMessageStatus(ctx, db, pending, models.MsgStatusPending)
if err != nil {
log.WithError(err).Error("error marking message as pending")
}
Expand Down
59 changes: 59 additions & 0 deletions core/tasks/msgs/resend_msgs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package msgs

import (
"context"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/msgio"
"github.com/nyaruka/mailroom/core/tasks"

"github.com/pkg/errors"
)

// TypeResendMsgs is the type of the resend messages task
const TypeResendMsgs = "resend_msgs"

func init() {
tasks.RegisterType(TypeResendMsgs, func() tasks.Task { return &ResendMsgsTask{} })
}

// ResendMsgsTask is our task for resending messages
type ResendMsgsTask struct {
MsgIDs []models.MsgID `json:"msg_ids"`
}

// Timeout is the maximum amount of time the task can run for
func (t *ResendMsgsTask) Timeout() time.Duration {
return time.Minute * 5
}

func (t *ResendMsgsTask) Perform(ctx context.Context, mr *mailroom.Mailroom, orgID models.OrgID) error {
db := mr.DB
rp := mr.RP

oa, err := models.GetOrgAssets(ctx, mr.DB, orgID)
if err != nil {
return errors.Wrap(err, "unable to load org")
}

msgs, err := models.LoadMessages(ctx, db, orgID, models.DirectionOut, t.MsgIDs)
if err != nil {
return errors.Wrap(err, "error loading messages to resend")
}

clones, err := models.CloneMessages(ctx, db, rp, oa, msgs)
if err != nil {
return errors.Wrap(err, "error cloning messages")
}

// update existing messages as RESENT
err = models.UpdateMessageStatus(ctx, db, msgs, models.MsgStatusResent)
if err != nil {
return errors.Wrap(err, "error updating message status")
}

msgio.SendMessages(ctx, db, rp, nil, clones)
return nil
}
52 changes: 52 additions & 0 deletions core/tasks/msgs/resend_msgs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package msgs_test

import (
"testing"

"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks/msgs"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/stretchr/testify/require"
)

func TestResendMsgs(t *testing.T) {
testsuite.Reset()
ctx := testsuite.CTX()
db := testsuite.DB()
mr := &mailroom.Mailroom{Config: config.Mailroom, DB: db, RP: testsuite.RP(), ElasticClient: nil}

msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 1", []utils.Attachment{"image/jpeg:hi.jpg"})
msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.BobID, models.BobURN, models.BobURNID, "out 2", nil)
msgOut3 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 3", nil)

db.MustExec(`UPDATE msgs_msg SET metadata = '{"topic":"cool-stuff"}' WHERE id = $1`, msgOut1.ID())

// create our task
task := &msgs.ResendMsgsTask{
MsgIDs: []models.MsgID{models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID())},
}

// execute it
err := task.Perform(ctx, mr, models.Org1)
require.NoError(t, err)

// there should be 2 new pending outgoing messages in the database, with channel set
testsuite.AssertQueryCount(t, db,
`SELECT count(*) FROM msgs_msg WHERE direction = 'O' AND status = 'P' AND channel_id IS NOT NULL AND id > $1`,
[]interface{}{msgOut3.ID()}, 2,
)

// cloning will have cloned message text, attachments and metadata
testsuite.AssertQueryCount(t, db,
`SELECT count(*) FROM msgs_msg WHERE text = 'out 1' AND attachments = '{"image/jpeg:hi.jpg"}' AND metadata = '{"topic":"cool-stuff"}' AND id != $1`,
[]interface{}{msgOut1.ID()}, 1,
)

// the resent messages should have had their status updated
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'R'`, nil, 2)
}
16 changes: 16 additions & 0 deletions testsuite/testdata/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package testdata
import (
"testing"

"github.com/lib/pq"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom/core/models"

"github.com/jmoiron/sqlx"
Expand All @@ -25,3 +27,17 @@ func InsertIncomingMsg(t *testing.T, db *sqlx.DB, orgID models.OrgID, contactID
msg.SetID(id)
return msg
}

// InsertOutgoingMsg inserts an outgoing message
func InsertOutgoingMsg(t *testing.T, db *sqlx.DB, orgID models.OrgID, contactID models.ContactID, urn urns.URN, urnID models.URNID, text string, attachments []utils.Attachment) *flows.MsgOut {
msg := flows.NewMsgOut(urn, nil, text, nil, nil, nil, flows.NilMsgTopic)

var id flows.MsgID
err := db.Get(&id,
`INSERT INTO msgs_msg(uuid, text, attachments, created_on, direction, status, visibility, msg_count, error_count, next_attempt, contact_id, contact_urn_id, org_id)
VALUES($1, $2, $3, NOW(), 'O', 'P', 'V', 1, 0, NOW(), $4, $5, $6) RETURNING id`, msg.UUID(), text, pq.Array(attachments), contactID, urnID, orgID)
require.NoError(t, err)

msg.SetID(id)
return msg
}