From 98c09fa07cfaa96f81e567c7ddc0e1af1a34edb2 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 23 Apr 2021 15:13:11 -0500 Subject: [PATCH 1/6] Add task to resend messages --- core/models/msgs.go | 133 +++++++++++++++++++++++++++++++-- core/models/msgs_test.go | 21 +++++- core/msgio/send.go | 2 +- core/tasks/msgs/resend_msgs.go | 59 +++++++++++++++ testsuite/testdata/msgs.go | 14 ++++ 5 files changed, 219 insertions(+), 10 deletions(-) create mode 100644 core/tasks/msgs/resend_msgs.go diff --git a/core/models/msgs.go b/core/models/msgs.go index d5eea2cd5..e9e011d61 100644 --- a/core/models/msgs.go +++ b/core/models/msgs.go @@ -9,8 +9,10 @@ import ( "strings" "time" + "github.com/nyaruka/gocommon/dates" "github.com/nyaruka/gocommon/gsm7" "github.com/nyaruka/gocommon/urns" + "github.com/nyaruka/gocommon/uuids" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/envs" "github.com/nyaruka/goflow/excellent" @@ -382,6 +384,59 @@ func NewIncomingMsg(orgID OrgID, channel *Channel, contactID ContactID, in *flow return msg } +var loadMessagesSQL = ` +SELECT + id, + broadcast_id, + uuid, + text, + created_on, + direction, + status, + visibility, + msg_count, + error_count, + next_attempt, + external_id, + attachments, + metadata, + channel_id, + connection_id, + contact_id, + contact_urn_id, + response_to_id, + org_id, + topup_id +FROM + msgs_msg +WHERE + org_id = $1 AND + id = ANY($2) +ORDER BY + id ASC` + +// LoadMessages loads the given messages for the passed in org +func LoadMessages(ctx context.Context, db Queryer, orgID OrgID, msgIDs []MsgID) ([]*Msg, error) { + rows, err := db.QueryxContext(ctx, loadMessagesSQL, orgID, pq.Array(msgIDs)) + if err != nil { + return nil, errors.Wrapf(err, "error querying msgs for org: %d", orgID) + } + defer rows.Close() + + msgs := make([]*Msg, 0) + for rows.Next() { + msg := &Msg{} + err = rows.StructScan(&msg.m) + if err != nil { + return nil, errors.Wrap(err, "error scanning msg row") + } + + msgs = append(msgs, msg) + } + + return msgs, nil +} + // NormalizeAttachment will turn any relative URL in the passed in attachment and normalize it to // include the full host for attachment domains func NormalizeAttachment(attachment utils.Attachment) utils.Attachment { @@ -458,12 +513,8 @@ func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status Ms return nil } -// MarkMessagesPending marks the passed in messages as pending -func MarkMessagesPending(ctx context.Context, db Queryer, msgs []*Msg) error { - return updateMessageStatus(ctx, db, msgs, MsgStatusPending) -} - -func updateMessageStatus(ctx context.Context, db Queryer, msgs []*Msg, status MsgStatus) error { +// UpdateMessageStatus updates status for the given messages +func UpdateMessageStatus(ctx context.Context, db Queryer, msgs []*Msg, status MsgStatus) error { is := make([]interface{}, len(msgs)) for i, msg := range msgs { m := &msg.m @@ -949,6 +1000,76 @@ func CreateBroadcastMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa return msgs, nil } +// CloneMessages clones the given messages for resending +func CloneMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAssets, msgs []*Msg) ([]*Msg, error) { + channels := oa.SessionAssets().Channels() + clones := make([]*Msg, len(msgs)) + + for i, msg := range msgs { + clone := &Msg{} + channelID := NilChannelID + + urn, err := URNForID(ctx, db, oa, *msg.ContactURNID()) + if err != nil { + return nil, errors.Wrap(err, "errr loading URN for cloned message") + } + contactURN, err := flows.ParseRawURN(channels, urn, assets.IgnoreMissing) + if err != nil { + return nil, errors.Wrap(err, "errr parsing URN for cloned message") + } + ch := channels.GetForURN(contactURN, assets.ChannelRoleSend) + if ch != nil { + channel := oa.ChannelByUUID(ch.UUID()) + channelID = channel.ID() + } + + // we fail messages for suspended orgs right away + status := MsgStatusQueued + if oa.Org().Suspended() { + status = MsgStatusFailed + } + + m := &clone.m + m.UUID = flows.MsgUUID(uuids.New()) + m.OrgID = oa.OrgID() + m.ContactID = msg.ContactID() + m.ContactURNID = msg.ContactURNID() + m.ChannelID = channelID + m.Text = msg.Text() + m.Attachments = msg.m.Attachments + m.Metadata = msg.m.Metadata + m.HighPriority = false + m.Direction = DirectionOut + m.Status = status + m.Visibility = VisibilityVisible + m.MsgType = msg.MsgType() + m.CreatedOn = dates.Now() + + clones[i] = clone + } + + // allocate a topup for these messages if org uses topups + topup, err := AllocateTopups(ctx, db, rp, oa.Org(), len(clones)) + if err != nil { + return nil, errors.Wrapf(err, "error allocating topup for cloned messages") + } + + // if we have an active topup, assign it to our messages + if topup != NilTopupID { + for _, m := range clones { + m.SetTopup(topup) + } + } + + // insert them in a single request + err = InsertMessages(ctx, db, clones) + if err != nil { + return nil, errors.Wrapf(err, "error inserting cloned messages") + } + + return clones, nil +} + // MarkBroadcastSent marks the passed in broadcast as sent func MarkBroadcastSent(ctx context.Context, db Queryer, id BroadcastID) error { // noop if it is a nil id diff --git a/core/models/msgs_test.go b/core/models/msgs_test.go index 6f0478c51..a08f880ee 100644 --- a/core/models/msgs_test.go +++ b/core/models/msgs_test.go @@ -147,6 +147,21 @@ func TestGetMessageIDFromUUID(t *testing.T) { assert.Equal(t, models.MsgID(msgIn.ID()), msgID) } +func TestLoadMessages(t *testing.T) { + ctx := testsuite.CTX() + db := testsuite.DB() + msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "hi 1") + msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "hi 2") + testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "hi 3") + + msgs, err := models.LoadMessages(ctx, db, models.Org1, []models.MsgID{models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID())}) + + require.NoError(t, err) + assert.Equal(t, 2, len(msgs)) + assert.Equal(t, "hi 1", msgs[0].Text()) + assert.Equal(t, "hi 2", msgs[1].Text()) +} + func TestNormalizeAttachment(t *testing.T) { config.Mailroom.AttachmentDomain = "foo.bar.com" defer func() { config.Mailroom.AttachmentDomain = "" }() @@ -167,7 +182,7 @@ func TestNormalizeAttachment(t *testing.T) { } } -func TestMarkMessages(t *testing.T) { +func TestUpdateMessageStatus(t *testing.T) { ctx, db, _ := testsuite.Reset() defer testsuite.Reset() @@ -192,7 +207,7 @@ func TestMarkMessages(t *testing.T) { msg2 := insertMsg("Hola") insertMsg("Howdy") - models.MarkMessagesPending(ctx, db, []*models.Msg{msg1, msg2}) + models.UpdateMessageStatus(ctx, db, []*models.Msg{msg1, msg2}, models.MsgStatusPending) testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P'`, nil, 2) @@ -205,7 +220,7 @@ func TestMarkMessages(t *testing.T) { msg4 := insertMsg("Big messages!") assert.Equal(t, flows.MsgID(3000000000), msg4.ID()) - err = models.MarkMessagesPending(ctx, db, []*models.Msg{msg4}) + err = models.UpdateMessageStatus(ctx, db, []*models.Msg{msg4}, models.MsgStatusPending) assert.NoError(t, err) testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P'`, nil, 3) diff --git a/core/msgio/send.go b/core/msgio/send.go index 850886890..607e9e9cc 100644 --- a/core/msgio/send.go +++ b/core/msgio/send.go @@ -71,7 +71,7 @@ func SendMessages(ctx context.Context, db models.Queryer, rp *redis.Pool, fc *fc // any messages that didn't get sent should be moved back to pending (they are queued at creation to save an // update in the common case) if len(pending) > 0 { - err := models.MarkMessagesPending(ctx, db, pending) + err := models.UpdateMessageStatus(ctx, db, pending, models.MsgStatusPending) if err != nil { log.WithError(err).Error("error marking message as pending") } diff --git a/core/tasks/msgs/resend_msgs.go b/core/tasks/msgs/resend_msgs.go new file mode 100644 index 000000000..8b124beab --- /dev/null +++ b/core/tasks/msgs/resend_msgs.go @@ -0,0 +1,59 @@ +package msgs + +import ( + "context" + "time" + + "github.com/nyaruka/mailroom" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/msgio" + "github.com/nyaruka/mailroom/core/tasks" + + "github.com/pkg/errors" +) + +// TypeResendMsgs is the type of the resend messages task +const TypeResendMsgs = "resend_msgs" + +func init() { + tasks.RegisterType(TypeResendMsgs, func() tasks.Task { return &ResendMsgsTask{} }) +} + +// ResendMsgsTask is our task for resending messages +type ResendMsgsTask struct { + MsgIDs []models.MsgID `json:"msg_ids"` +} + +// Timeout is the maximum amount of time the task can run for +func (t *ResendMsgsTask) Timeout() time.Duration { + return time.Minute * 5 +} + +func (t *ResendMsgsTask) Perform(ctx context.Context, mr *mailroom.Mailroom, orgID models.OrgID) error { + db := mr.DB + rp := mr.RP + + oa, err := models.GetOrgAssets(ctx, mr.DB, orgID) + if err != nil { + return errors.Wrap(err, "unable to load org") + } + + msgs, err := models.LoadMessages(ctx, db, orgID, t.MsgIDs) + if err != nil { + return errors.Wrap(err, "error loading messages to resend") + } + + clones, err := models.CloneMessages(ctx, db, rp, oa, msgs) + if err != nil { + return errors.Wrap(err, "error cloning messages") + } + + // update existing messages as RESENT + err = models.UpdateMessageStatus(ctx, db, msgs, models.MsgStatusResent) + if err != nil { + return errors.Wrap(err, "error updating message status") + } + + msgio.SendMessages(ctx, db, rp, nil, clones) + return nil +} diff --git a/testsuite/testdata/msgs.go b/testsuite/testdata/msgs.go index f4fc62964..3b7f1408e 100644 --- a/testsuite/testdata/msgs.go +++ b/testsuite/testdata/msgs.go @@ -25,3 +25,17 @@ func InsertIncomingMsg(t *testing.T, db *sqlx.DB, orgID models.OrgID, contactID msg.SetID(id) return msg } + +// InsertOutgoingMsg inserts an outgoing message +func InsertOutgoingMsg(t *testing.T, db *sqlx.DB, orgID models.OrgID, contactID models.ContactID, urn urns.URN, urnID models.URNID, text string) *flows.MsgOut { + msg := flows.NewMsgOut(urn, nil, text, nil, nil, nil, flows.NilMsgTopic) + + var id flows.MsgID + err := db.Get(&id, + `INSERT INTO msgs_msg(uuid, text, created_on, direction, status, visibility, msg_count, error_count, next_attempt, contact_id, contact_urn_id, org_id) + VALUES($1, $2, NOW(), 'O', 'P', 'V', 1, 0, NOW(), $3, $4, $5) RETURNING id`, msg.UUID(), text, contactID, urnID, orgID) + require.NoError(t, err) + + msg.SetID(id) + return msg +} From 74e2f687ac9a812d61c61eebd68cad79ed8fe65c Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Mon, 26 Apr 2021 12:47:37 -0500 Subject: [PATCH 2/6] Add tests --- core/models/msgs.go | 7 ++-- core/models/msgs_test.go | 25 ++++++++++---- core/tasks/msgs/resend_msgs.go | 2 +- core/tasks/msgs/resend_msgs_test.go | 52 +++++++++++++++++++++++++++++ testsuite/testdata/msgs.go | 8 +++-- 5 files changed, 81 insertions(+), 13 deletions(-) create mode 100644 core/tasks/msgs/resend_msgs_test.go diff --git a/core/models/msgs.go b/core/models/msgs.go index e9e011d61..630ef11cf 100644 --- a/core/models/msgs.go +++ b/core/models/msgs.go @@ -411,13 +411,14 @@ FROM msgs_msg WHERE org_id = $1 AND - id = ANY($2) + direction = $2 AND + id = ANY($3) ORDER BY id ASC` // LoadMessages loads the given messages for the passed in org -func LoadMessages(ctx context.Context, db Queryer, orgID OrgID, msgIDs []MsgID) ([]*Msg, error) { - rows, err := db.QueryxContext(ctx, loadMessagesSQL, orgID, pq.Array(msgIDs)) +func LoadMessages(ctx context.Context, db Queryer, orgID OrgID, direction MsgDirection, msgIDs []MsgID) ([]*Msg, error) { + rows, err := db.QueryxContext(ctx, loadMessagesSQL, orgID, direction, pq.Array(msgIDs)) if err != nil { return nil, errors.Wrapf(err, "error querying msgs for org: %d", orgID) } diff --git a/core/models/msgs_test.go b/core/models/msgs_test.go index a08f880ee..a4a6145b1 100644 --- a/core/models/msgs_test.go +++ b/core/models/msgs_test.go @@ -150,16 +150,29 @@ func TestGetMessageIDFromUUID(t *testing.T) { func TestLoadMessages(t *testing.T) { ctx := testsuite.CTX() db := testsuite.DB() - msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "hi 1") - msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "hi 2") - testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "hi 3") + msgIn1 := testdata.InsertIncomingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "in 1") + msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 1", []utils.Attachment{"image/jpeg:hi.jpg"}) + msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 2", nil) + msgOut3 := testdata.InsertOutgoingMsg(t, db, models.Org2, models.Org2FredID, models.Org2FredURN, models.Org2FredURNID, "out 3", nil) + testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "hi 3", nil) - msgs, err := models.LoadMessages(ctx, db, models.Org1, []models.MsgID{models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID())}) + ids := []models.MsgID{models.MsgID(msgIn1.ID()), models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID()), models.MsgID(msgOut3.ID())} + msgs, err := models.LoadMessages(ctx, db, models.Org1, models.DirectionOut, ids) + + // should only return the outgoing messages for this org require.NoError(t, err) assert.Equal(t, 2, len(msgs)) - assert.Equal(t, "hi 1", msgs[0].Text()) - assert.Equal(t, "hi 2", msgs[1].Text()) + assert.Equal(t, "out 1", msgs[0].Text()) + assert.Equal(t, []utils.Attachment{"image/jpeg:hi.jpg"}, msgs[0].Attachments()) + assert.Equal(t, "out 2", msgs[1].Text()) + + msgs, err = models.LoadMessages(ctx, db, models.Org1, models.DirectionIn, ids) + + // should only return the incoming message for this org + require.NoError(t, err) + assert.Equal(t, 1, len(msgs)) + assert.Equal(t, "in 1", msgs[0].Text()) } func TestNormalizeAttachment(t *testing.T) { diff --git a/core/tasks/msgs/resend_msgs.go b/core/tasks/msgs/resend_msgs.go index 8b124beab..f245da2a9 100644 --- a/core/tasks/msgs/resend_msgs.go +++ b/core/tasks/msgs/resend_msgs.go @@ -38,7 +38,7 @@ func (t *ResendMsgsTask) Perform(ctx context.Context, mr *mailroom.Mailroom, org return errors.Wrap(err, "unable to load org") } - msgs, err := models.LoadMessages(ctx, db, orgID, t.MsgIDs) + msgs, err := models.LoadMessages(ctx, db, orgID, models.DirectionOut, t.MsgIDs) if err != nil { return errors.Wrap(err, "error loading messages to resend") } diff --git a/core/tasks/msgs/resend_msgs_test.go b/core/tasks/msgs/resend_msgs_test.go new file mode 100644 index 000000000..4b89374f1 --- /dev/null +++ b/core/tasks/msgs/resend_msgs_test.go @@ -0,0 +1,52 @@ +package msgs_test + +import ( + "testing" + + "github.com/nyaruka/goflow/utils" + "github.com/nyaruka/mailroom" + "github.com/nyaruka/mailroom/config" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/tasks/msgs" + "github.com/nyaruka/mailroom/testsuite" + "github.com/nyaruka/mailroom/testsuite/testdata" + + "github.com/stretchr/testify/require" +) + +func TestResendMsgs(t *testing.T) { + testsuite.Reset() + ctx := testsuite.CTX() + db := testsuite.DB() + mr := &mailroom.Mailroom{Config: config.Mailroom, DB: db, RP: testsuite.RP(), ElasticClient: nil} + + msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 1", []utils.Attachment{"image/jpeg:hi.jpg"}) + msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.BobID, models.BobURN, models.BobURNID, "out 2", nil) + msgOut3 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 3", nil) + + db.MustExec(`UPDATE msgs_msg SET metadata = '{"topic":"cool-stuff"}' WHERE id = $1`, msgOut1.ID()) + + // create our task + task := &msgs.ResendMsgsTask{ + MsgIDs: []models.MsgID{models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID())}, + } + + // execute it + err := task.Perform(ctx, mr, models.Org1) + require.NoError(t, err) + + // there should be 2 new pending outgoing messages in the database, with channel set + testsuite.AssertQueryCount(t, db, + `SELECT count(*) FROM msgs_msg WHERE direction = 'O' AND status = 'P' AND channel_id IS NOT NULL AND id > $1`, + []interface{}{msgOut3.ID()}, 2, + ) + + // cloning will have cloned message text, attachments and metadata + testsuite.AssertQueryCount(t, db, + `SELECT count(*) FROM msgs_msg WHERE text = 'out 1' AND attachments = '{"image/jpeg:hi.jpg"}' AND metadata = '{"topic":"cool-stuff"}' AND id != $1`, + []interface{}{msgOut1.ID()}, 1, + ) + + // the resent messages should have had their status updated + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'R'`, nil, 2) +} diff --git a/testsuite/testdata/msgs.go b/testsuite/testdata/msgs.go index 3b7f1408e..beb9e9112 100644 --- a/testsuite/testdata/msgs.go +++ b/testsuite/testdata/msgs.go @@ -3,9 +3,11 @@ package testdata import ( "testing" + "github.com/lib/pq" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/gocommon/uuids" "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/utils" "github.com/nyaruka/mailroom/core/models" "github.com/jmoiron/sqlx" @@ -27,13 +29,13 @@ func InsertIncomingMsg(t *testing.T, db *sqlx.DB, orgID models.OrgID, contactID } // InsertOutgoingMsg inserts an outgoing message -func InsertOutgoingMsg(t *testing.T, db *sqlx.DB, orgID models.OrgID, contactID models.ContactID, urn urns.URN, urnID models.URNID, text string) *flows.MsgOut { +func InsertOutgoingMsg(t *testing.T, db *sqlx.DB, orgID models.OrgID, contactID models.ContactID, urn urns.URN, urnID models.URNID, text string, attachments []utils.Attachment) *flows.MsgOut { msg := flows.NewMsgOut(urn, nil, text, nil, nil, nil, flows.NilMsgTopic) var id flows.MsgID err := db.Get(&id, - `INSERT INTO msgs_msg(uuid, text, created_on, direction, status, visibility, msg_count, error_count, next_attempt, contact_id, contact_urn_id, org_id) - VALUES($1, $2, NOW(), 'O', 'P', 'V', 1, 0, NOW(), $3, $4, $5) RETURNING id`, msg.UUID(), text, contactID, urnID, orgID) + `INSERT INTO msgs_msg(uuid, text, attachments, created_on, direction, status, visibility, msg_count, error_count, next_attempt, contact_id, contact_urn_id, org_id) + VALUES($1, $2, $3, NOW(), 'O', 'P', 'V', 1, 0, NOW(), $4, $5, $6) RETURNING id`, msg.UUID(), text, pq.Array(attachments), contactID, urnID, orgID) require.NoError(t, err) msg.SetID(id) From dc3998360d5543311469441b7b328ebbe6135851 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Mon, 26 Apr 2021 12:49:39 -0500 Subject: [PATCH 3/6] Import msgs task package in mailroom cmd --- cmd/mailroom/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/mailroom/main.go b/cmd/mailroom/main.go index a063f4cfa..f61055628 100644 --- a/cmd/mailroom/main.go +++ b/cmd/mailroom/main.go @@ -23,6 +23,7 @@ import ( _ "github.com/nyaruka/mailroom/core/tasks/groups" _ "github.com/nyaruka/mailroom/core/tasks/interrupts" _ "github.com/nyaruka/mailroom/core/tasks/ivr" + _ "github.com/nyaruka/mailroom/core/tasks/msgs" _ "github.com/nyaruka/mailroom/core/tasks/schedules" _ "github.com/nyaruka/mailroom/core/tasks/starts" _ "github.com/nyaruka/mailroom/core/tasks/stats" From 910e8f8806f91bbe56c41035275e8fb60db649c7 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 6 May 2021 13:30:09 -0500 Subject: [PATCH 4/6] Rather than clone messages for resending, reset their state --- core/models/msgs.go | 100 +++++++++++++++++---------------- core/models/msgs_test.go | 42 +++++++++++++- core/msgio/send.go | 2 +- core/tasks/msgs/resend_msgs.go | 10 +--- 4 files changed, 93 insertions(+), 61 deletions(-) diff --git a/core/models/msgs.go b/core/models/msgs.go index 630ef11cf..f4d1fd706 100644 --- a/core/models/msgs.go +++ b/core/models/msgs.go @@ -12,7 +12,6 @@ import ( "github.com/nyaruka/gocommon/dates" "github.com/nyaruka/gocommon/gsm7" "github.com/nyaruka/gocommon/urns" - "github.com/nyaruka/gocommon/uuids" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/envs" "github.com/nyaruka/goflow/excellent" @@ -120,6 +119,7 @@ type Msg struct { ContactURNID *URNID `db:"contact_urn_id" json:"contact_urn_id"` ResponseToID MsgID `db:"response_to_id" json:"response_to_id"` ResponseToExternalID null.String ` json:"response_to_external_id"` + IsResend bool ` json:"is_resend,omitempty"` URN urns.URN ` json:"urn"` URNAuth null.String ` json:"urn_auth,omitempty"` OrgID OrgID `db:"org_id" json:"org_id"` @@ -166,6 +166,7 @@ func (m *Msg) OrgID() OrgID { return m.m.OrgID } func (m *Msg) TopupID() TopupID { return m.m.TopupID } func (m *Msg) ContactID() ContactID { return m.m.ContactID } func (m *Msg) ContactURNID() *URNID { return m.m.ContactURNID } +func (m *Msg) IsResend() bool { return m.m.IsResend } func (m *Msg) SetTopup(topupID TopupID) { m.m.TopupID = topupID } func (m *Msg) SetChannelID(channelID ChannelID) { m.m.ChannelID = channelID } @@ -514,8 +515,12 @@ func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status Ms return nil } -// UpdateMessageStatus updates status for the given messages -func UpdateMessageStatus(ctx context.Context, db Queryer, msgs []*Msg, status MsgStatus) error { +// MarkMessagesPending marks the passed in messages as pending +func MarkMessagesPending(ctx context.Context, db Queryer, msgs []*Msg) error { + return updateMessageStatus(ctx, db, msgs, MsgStatusPending) +} + +func updateMessageStatus(ctx context.Context, db Queryer, msgs []*Msg, status MsgStatus) error { is := make([]interface{}, len(msgs)) for i, msg := range msgs { m := &msg.m @@ -1001,74 +1006,71 @@ func CreateBroadcastMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa return msgs, nil } -// CloneMessages clones the given messages for resending -func CloneMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAssets, msgs []*Msg) ([]*Msg, error) { +const updateMsgForResendingSQL = ` + UPDATE + msgs_msg m + SET + channel_id = r.channel_id::int, + topup_id = r.topup_id::int, + status = 'P', + error_count = 0, + queued_on = NULL, + sent_on = NULL, + modified_on = NOW() + FROM ( + VALUES(:id, :channel_id, :topup_id) + ) AS + r(id, channel_id, topup_id) + WHERE + m.id = r.id::bigint +` + +// ResendMessages prepares messages for resending by reselecting a channel and marking them as PENDING +func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAssets, msgs []*Msg) error { channels := oa.SessionAssets().Channels() - clones := make([]*Msg, len(msgs)) + resends := make([]interface{}, len(msgs)) for i, msg := range msgs { - clone := &Msg{} - channelID := NilChannelID - + // reselect channel for this message's URN urn, err := URNForID(ctx, db, oa, *msg.ContactURNID()) if err != nil { - return nil, errors.Wrap(err, "errr loading URN for cloned message") + return errors.Wrap(err, "error loading URN") } contactURN, err := flows.ParseRawURN(channels, urn, assets.IgnoreMissing) if err != nil { - return nil, errors.Wrap(err, "errr parsing URN for cloned message") + return errors.Wrap(err, "error parsing URN") } ch := channels.GetForURN(contactURN, assets.ChannelRoleSend) if ch != nil { channel := oa.ChannelByUUID(ch.UUID()) - channelID = channel.ID() + msg.m.ChannelID = channel.ID() + } else { + msg.m.ChannelID = NilChannelID } - // we fail messages for suspended orgs right away - status := MsgStatusQueued - if oa.Org().Suspended() { - status = MsgStatusFailed + // allocate a new topup for this message if org uses topups + msg.m.TopupID, err = AllocateTopups(ctx, db, rp, oa.Org(), 1) + if err != nil { + return errors.Wrapf(err, "error allocating topup for message resending") } - m := &clone.m - m.UUID = flows.MsgUUID(uuids.New()) - m.OrgID = oa.OrgID() - m.ContactID = msg.ContactID() - m.ContactURNID = msg.ContactURNID() - m.ChannelID = channelID - m.Text = msg.Text() - m.Attachments = msg.m.Attachments - m.Metadata = msg.m.Metadata - m.HighPriority = false - m.Direction = DirectionOut - m.Status = status - m.Visibility = VisibilityVisible - m.MsgType = msg.MsgType() - m.CreatedOn = dates.Now() + // mark message as being a resend so it will be queued to courier as such + msg.m.Status = MsgStatusPending + msg.m.QueuedOn = dates.ZeroDateTime + msg.m.SentOn = dates.ZeroDateTime + msg.m.ErrorCount = 0 + msg.m.IsResend = true - clones[i] = clone + resends[i] = msg.m } - // allocate a topup for these messages if org uses topups - topup, err := AllocateTopups(ctx, db, rp, oa.Org(), len(clones)) + // update the messages in the database + err := BulkQuery(ctx, "updating messages for resending", db, updateMsgForResendingSQL, resends) if err != nil { - return nil, errors.Wrapf(err, "error allocating topup for cloned messages") + return errors.Wrapf(err, "error updating messages for resending") } - // if we have an active topup, assign it to our messages - if topup != NilTopupID { - for _, m := range clones { - m.SetTopup(topup) - } - } - - // insert them in a single request - err = InsertMessages(ctx, db, clones) - if err != nil { - return nil, errors.Wrapf(err, "error inserting cloned messages") - } - - return clones, nil + return nil } // MarkBroadcastSent marks the passed in broadcast as sent diff --git a/core/models/msgs_test.go b/core/models/msgs_test.go index a4a6145b1..d01a1ddec 100644 --- a/core/models/msgs_test.go +++ b/core/models/msgs_test.go @@ -175,6 +175,42 @@ func TestLoadMessages(t *testing.T) { assert.Equal(t, "in 1", msgs[0].Text()) } +func TestResendMessages(t *testing.T) { + ctx := testsuite.CTX() + db := testsuite.DB() + rp := testsuite.RP() + + oa, err := models.GetOrgAssets(ctx, db, models.Org1) + require.NoError(t, err) + + msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 1", nil) + msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.BobID, models.BobURN, models.BobURNID, "out 2", nil) + testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 3", nil) + + // make them look like failed messages + db.MustExec(`UPDATE msgs_msg SET status = 'F', sent_on = NOW(), error_count = 3`) + + // give Bob's URN an affinity for the Vonage channel + db.MustExec(`UPDATE contacts_contacturn SET channel_id = $1 WHERE id = $2`, models.VonageChannelID, models.BobURNID) + + msgs, err := models.LoadMessages(ctx, db, models.Org1, models.DirectionOut, []models.MsgID{models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID())}) + require.NoError(t, err) + + // resend both msgs + err = models.ResendMessages(ctx, db, rp, oa, msgs) + require.NoError(t, err) + + // both messages should now have a channel, a topup and be marked for resending + assert.True(t, msgs[0].IsResend()) + assert.Equal(t, models.TwilioChannelID, msgs[0].ChannelID()) + assert.Equal(t, models.TopupID(1), msgs[0].TopupID()) + assert.True(t, msgs[1].IsResend()) + assert.Equal(t, models.VonageChannelID, msgs[1].ChannelID()) + assert.Equal(t, models.TopupID(1), msgs[1].TopupID()) + + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P' AND sent_on IS NULL`, nil, 2) +} + func TestNormalizeAttachment(t *testing.T) { config.Mailroom.AttachmentDomain = "foo.bar.com" defer func() { config.Mailroom.AttachmentDomain = "" }() @@ -195,7 +231,7 @@ func TestNormalizeAttachment(t *testing.T) { } } -func TestUpdateMessageStatus(t *testing.T) { +func TestMarkMessages(t *testing.T) { ctx, db, _ := testsuite.Reset() defer testsuite.Reset() @@ -220,7 +256,7 @@ func TestUpdateMessageStatus(t *testing.T) { msg2 := insertMsg("Hola") insertMsg("Howdy") - models.UpdateMessageStatus(ctx, db, []*models.Msg{msg1, msg2}, models.MsgStatusPending) + models.MarkMessagesPending(ctx, db, []*models.Msg{msg1, msg2}) testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P'`, nil, 2) @@ -233,7 +269,7 @@ func TestUpdateMessageStatus(t *testing.T) { msg4 := insertMsg("Big messages!") assert.Equal(t, flows.MsgID(3000000000), msg4.ID()) - err = models.UpdateMessageStatus(ctx, db, []*models.Msg{msg4}, models.MsgStatusPending) + err = models.MarkMessagesPending(ctx, db, []*models.Msg{msg4}) assert.NoError(t, err) testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P'`, nil, 3) diff --git a/core/msgio/send.go b/core/msgio/send.go index 607e9e9cc..850886890 100644 --- a/core/msgio/send.go +++ b/core/msgio/send.go @@ -71,7 +71,7 @@ func SendMessages(ctx context.Context, db models.Queryer, rp *redis.Pool, fc *fc // any messages that didn't get sent should be moved back to pending (they are queued at creation to save an // update in the common case) if len(pending) > 0 { - err := models.UpdateMessageStatus(ctx, db, pending, models.MsgStatusPending) + err := models.MarkMessagesPending(ctx, db, pending) if err != nil { log.WithError(err).Error("error marking message as pending") } diff --git a/core/tasks/msgs/resend_msgs.go b/core/tasks/msgs/resend_msgs.go index f245da2a9..2a705ef44 100644 --- a/core/tasks/msgs/resend_msgs.go +++ b/core/tasks/msgs/resend_msgs.go @@ -43,17 +43,11 @@ func (t *ResendMsgsTask) Perform(ctx context.Context, mr *mailroom.Mailroom, org return errors.Wrap(err, "error loading messages to resend") } - clones, err := models.CloneMessages(ctx, db, rp, oa, msgs) + err = models.ResendMessages(ctx, db, rp, oa, msgs) if err != nil { return errors.Wrap(err, "error cloning messages") } - // update existing messages as RESENT - err = models.UpdateMessageStatus(ctx, db, msgs, models.MsgStatusResent) - if err != nil { - return errors.Wrap(err, "error updating message status") - } - - msgio.SendMessages(ctx, db, rp, nil, clones) + msgio.SendMessages(ctx, db, rp, nil, msgs) return nil } From ecb4f52f96dfa61d9b02024e4604d68bbb447660 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 6 May 2021 13:50:32 -0500 Subject: [PATCH 5/6] Fix test --- core/tasks/msgs/resend_msgs_test.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/core/tasks/msgs/resend_msgs_test.go b/core/tasks/msgs/resend_msgs_test.go index 4b89374f1..30fc7990b 100644 --- a/core/tasks/msgs/resend_msgs_test.go +++ b/core/tasks/msgs/resend_msgs_test.go @@ -22,7 +22,7 @@ func TestResendMsgs(t *testing.T) { msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 1", []utils.Attachment{"image/jpeg:hi.jpg"}) msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.BobID, models.BobURN, models.BobURNID, "out 2", nil) - msgOut3 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 3", nil) + testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 3", nil) db.MustExec(`UPDATE msgs_msg SET metadata = '{"topic":"cool-stuff"}' WHERE id = $1`, msgOut1.ID()) @@ -35,18 +35,9 @@ func TestResendMsgs(t *testing.T) { err := task.Perform(ctx, mr, models.Org1) require.NoError(t, err) - // there should be 2 new pending outgoing messages in the database, with channel set + // the two resent messages should now be pending and have a channel set testsuite.AssertQueryCount(t, db, - `SELECT count(*) FROM msgs_msg WHERE direction = 'O' AND status = 'P' AND channel_id IS NOT NULL AND id > $1`, - []interface{}{msgOut3.ID()}, 2, + `SELECT count(*) FROM msgs_msg WHERE status = 'P' AND channel_id IS NOT NULL AND id IN ($1, $2)`, + []interface{}{msgOut1.ID(), msgOut2.ID()}, 2, ) - - // cloning will have cloned message text, attachments and metadata - testsuite.AssertQueryCount(t, db, - `SELECT count(*) FROM msgs_msg WHERE text = 'out 1' AND attachments = '{"image/jpeg:hi.jpg"}' AND metadata = '{"topic":"cool-stuff"}' AND id != $1`, - []interface{}{msgOut1.ID()}, 1, - ) - - // the resent messages should have had their status updated - testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'R'`, nil, 2) } From a74599ead84ff6c5124e7e5b539591d4f3db7612 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 6 May 2021 13:57:46 -0500 Subject: [PATCH 6/6] Set queued_on to now() when resending messages --- core/models/msgs.go | 8 ++++---- core/models/msgs_test.go | 7 ++++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/core/models/msgs.go b/core/models/msgs.go index f4d1fd706..b94e7f49a 100644 --- a/core/models/msgs.go +++ b/core/models/msgs.go @@ -1014,13 +1014,13 @@ const updateMsgForResendingSQL = ` topup_id = r.topup_id::int, status = 'P', error_count = 0, - queued_on = NULL, + queued_on = r.queued_on::timestamp with time zone, sent_on = NULL, modified_on = NOW() FROM ( - VALUES(:id, :channel_id, :topup_id) + VALUES(:id, :channel_id, :topup_id, :queued_on) ) AS - r(id, channel_id, topup_id) + r(id, channel_id, topup_id, queued_on) WHERE m.id = r.id::bigint ` @@ -1056,7 +1056,7 @@ func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAsse // mark message as being a resend so it will be queued to courier as such msg.m.Status = MsgStatusPending - msg.m.QueuedOn = dates.ZeroDateTime + msg.m.QueuedOn = dates.Now() msg.m.SentOn = dates.ZeroDateTime msg.m.ErrorCount = 0 msg.m.IsResend = true diff --git a/core/models/msgs_test.go b/core/models/msgs_test.go index d01a1ddec..ab32a1396 100644 --- a/core/models/msgs_test.go +++ b/core/models/msgs_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/nyaruka/gocommon/dates" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/flows" @@ -180,6 +181,8 @@ func TestResendMessages(t *testing.T) { db := testsuite.DB() rp := testsuite.RP() + db.MustExec(`DELETE FROM msgs_msg`) + oa, err := models.GetOrgAssets(ctx, db, models.Org1) require.NoError(t, err) @@ -196,6 +199,8 @@ func TestResendMessages(t *testing.T) { msgs, err := models.LoadMessages(ctx, db, models.Org1, models.DirectionOut, []models.MsgID{models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID())}) require.NoError(t, err) + now := dates.Now() + // resend both msgs err = models.ResendMessages(ctx, db, rp, oa, msgs) require.NoError(t, err) @@ -208,7 +213,7 @@ func TestResendMessages(t *testing.T) { assert.Equal(t, models.VonageChannelID, msgs[1].ChannelID()) assert.Equal(t, models.TopupID(1), msgs[1].TopupID()) - testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P' AND sent_on IS NULL`, nil, 2) + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P' AND queued_on > $1 AND sent_on IS NULL`, []interface{}{now}, 2) } func TestNormalizeAttachment(t *testing.T) {