Skip to content

Commit

Permalink
Merge pull request #597 from nyaruka/resend_fix
Browse files Browse the repository at this point in the history
Rework resending to fail messages with no destination
  • Loading branch information
rowanseymour authored Mar 10, 2022
2 parents 395971c + 26ccfe3 commit 1e65bee
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 68 deletions.
127 changes: 75 additions & 52 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,80 +1146,103 @@ func CreateBroadcastMessages(ctx context.Context, rt *runtime.Runtime, oa *OrgAs
return msgs, nil
}

const updateMsgForResendingSQL = `
UPDATE
msgs_msg m
SET
channel_id = r.channel_id::int,
topup_id = r.topup_id::int,
status = 'P',
error_count = 0,
failed_reason = NULL,
queued_on = r.queued_on::timestamp with time zone,
sent_on = NULL,
modified_on = NOW()
FROM (
VALUES(:id, :channel_id, :topup_id, :queued_on)
) AS
r(id, channel_id, topup_id, queued_on)
WHERE
m.id = r.id::bigint
`
const sqlUpdateMsgForResending = `
UPDATE msgs_msg m
SET channel_id = r.channel_id::int,
topup_id = r.topup_id::int,
status = 'P',
error_count = 0,
failed_reason = NULL,
queued_on = r.queued_on::timestamp with time zone,
sent_on = NULL,
modified_on = NOW()
FROM (VALUES(:id, :channel_id, :topup_id, :queued_on)) AS r(id, channel_id, topup_id, queued_on)
WHERE m.id = r.id::bigint`

const sqlUpdateMsgResendFailed = `
UPDATE msgs_msg m
SET channel_id = NULL, status = 'F', error_count = 0, failed_reason = 'D', sent_on = NULL, modified_on = NOW()
WHERE id = ANY($1)`

// ResendMessages prepares messages for resending by reselecting a channel and marking them as PENDING
func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAssets, msgs []*Msg) error {
func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAssets, msgs []*Msg) ([]*Msg, error) {
channels := oa.SessionAssets().Channels()
resends := make([]interface{}, len(msgs))

for i, msg := range msgs {
// reselect channel for this message's URN
urn, err := URNForID(ctx, db, oa, *msg.ContactURNID())
if err != nil {
return errors.Wrap(err, "error loading URN")
}
msg.m.URN = urn // needs to be set for queueing to courier
// for the bulk db updates
resends := make([]interface{}, 0, len(msgs))
refails := make([]MsgID, 0, len(msgs))

contactURN, err := flows.ParseRawURN(channels, urn, assets.IgnoreMissing)
if err != nil {
return errors.Wrap(err, "error parsing URN")
resent := make([]*Msg, 0, len(msgs))

for _, msg := range msgs {
var ch *flows.Channel
var err error
urnID := msg.ContactURNID()

if urnID != nil {
// reselect channel for this message's URN
urn, err := URNForID(ctx, db, oa, *urnID)
if err != nil {
return nil, errors.Wrap(err, "error loading URN")
}
msg.m.URN = urn // needs to be set for queueing to courier

contactURN, err := flows.ParseRawURN(channels, urn, assets.IgnoreMissing)
if err != nil {
return nil, errors.Wrap(err, "error parsing URN")
}

ch = channels.GetForURN(contactURN, assets.ChannelRoleSend)
}

ch := channels.GetForURN(contactURN, assets.ChannelRoleSend)
if ch != nil {
channel := oa.ChannelByUUID(ch.UUID())
msg.channel = channel

msg.m.ChannelID = channel.ID()
msg.m.ChannelUUID = channel.UUID()
msg.channel = channel
msg.m.Status = MsgStatusPending
msg.m.QueuedOn = dates.Now()
msg.m.SentOn = nil
msg.m.ErrorCount = 0
msg.m.FailedReason = ""
msg.m.IsResend = true // mark message as being a resend so it will be queued to courier as such

// allocate a new topup for this message if org uses topups
msg.m.TopupID, err = AllocateTopups(ctx, db, rp, oa.Org(), 1)
if err != nil {
return nil, errors.Wrapf(err, "error allocating topup for message resending")
}
resends = append(resends, msg.m)
resent = append(resent, msg)
} else {
// if we don't have channel or a URN, fail again
msg.channel = nil
msg.m.ChannelID = NilChannelID
msg.m.ChannelUUID = assets.ChannelUUID("")
msg.channel = nil
}
msg.m.Status = MsgStatusFailed
msg.m.QueuedOn = dates.Now()
msg.m.SentOn = nil
msg.m.ErrorCount = 0
msg.m.FailedReason = MsgFailedNoDestination

// allocate a new topup for this message if org uses topups
msg.m.TopupID, err = AllocateTopups(ctx, db, rp, oa.Org(), 1)
if err != nil {
return errors.Wrapf(err, "error allocating topup for message resending")
refails = append(refails, MsgID(msg.m.ID))
}
}

// mark message as being a resend so it will be queued to courier as such
msg.m.Status = MsgStatusPending
msg.m.QueuedOn = dates.Now()
msg.m.SentOn = nil
msg.m.ErrorCount = 0
msg.m.FailedReason = ""
msg.m.IsResend = true

resends[i] = msg.m
// update the messages that can be resent
err := BulkQuery(ctx, "updating messages for resending", db, sqlUpdateMsgForResending, resends)
if err != nil {
return nil, errors.Wrapf(err, "error updating messages for resending")
}

// update the messages in the database
err := BulkQuery(ctx, "updating messages for resending", db, updateMsgForResendingSQL, resends)
// and update the messages that can't be
_, err = db.ExecContext(ctx, sqlUpdateMsgResendFailed, pq.Array(refails))
if err != nil {
return errors.Wrapf(err, "error updating messages for resending")
return nil, errors.Wrapf(err, "error updating non-resendable messages")
}

return nil
return resent, nil
}

// MarkBroadcastSent marks the passed in broadcast as sent
Expand Down
48 changes: 35 additions & 13 deletions core/models/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,31 +416,53 @@ func TestResendMessages(t *testing.T) {
oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID)
require.NoError(t, err)

msgOut1 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "out 1", nil, models.MsgStatusFailed, false)
msgOut2 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Bob, "out 2", nil, models.MsgStatusFailed, false)
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "out 3", nil, models.MsgStatusFailed, false)
out1 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "hi", nil, models.MsgStatusFailed, false)
out2 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Bob, "hi", nil, models.MsgStatusFailed, false)

// failed message with no channel
out3 := testdata.InsertOutgoingMsg(db, testdata.Org1, nil, testdata.Cathy, "hi", nil, models.MsgStatusFailed, false)

// failed message with no URN
out4 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "hi", nil, models.MsgStatusFailed, false)
db.MustExec(`UPDATE msgs_msg SET contact_urn_id = NULL, failed_reason = 'D' WHERE id = $1`, out4.ID())

// failed message with URN which we no longer have a channel for
out5 := testdata.InsertOutgoingMsg(db, testdata.Org1, nil, testdata.George, "hi", nil, models.MsgStatusFailed, false)
db.MustExec(`UPDATE msgs_msg SET failed_reason = 'E' WHERE id = $1`, out5.ID())
db.MustExec(`UPDATE contacts_contacturn SET scheme = 'viber', path = '1234', identity = 'viber:1234' WHERE id = $1`, testdata.George.URNID)

// other failed message not included in set to resend
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "hi", nil, models.MsgStatusFailed, false)

// give Bob's URN an affinity for the Vonage channel
db.MustExec(`UPDATE contacts_contacturn SET channel_id = $1 WHERE id = $2`, testdata.VonageChannel.ID, testdata.Bob.URNID)

msgs, err := models.GetMessagesByID(ctx, db, testdata.Org1.ID, models.DirectionOut, []models.MsgID{models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID())})
ids := []models.MsgID{models.MsgID(out1.ID()), models.MsgID(out2.ID()), models.MsgID(out3.ID()), models.MsgID(out4.ID()), models.MsgID(out5.ID())}
msgs, err := models.GetMessagesByID(ctx, db, testdata.Org1.ID, models.DirectionOut, ids)
require.NoError(t, err)

now := dates.Now()

// resend both msgs
err = models.ResendMessages(ctx, db, rp, oa, msgs)
resent, err := models.ResendMessages(ctx, db, rp, oa, msgs)
require.NoError(t, err)

assert.Len(t, resent, 3) // only #1, #2 and #3 can be resent

// both messages should now have a channel, a topup and be marked for resending
assert.True(t, msgs[0].IsResend())
assert.Equal(t, testdata.TwilioChannel.ID, msgs[0].ChannelID())
assert.Equal(t, models.TopupID(1), msgs[0].TopupID())
assert.True(t, msgs[1].IsResend())
assert.Equal(t, testdata.VonageChannel.ID, msgs[1].ChannelID())
assert.Equal(t, models.TopupID(1), msgs[1].TopupID())

assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P' AND queued_on > $1 AND sent_on IS NULL`, now).Returns(2)
assert.True(t, resent[0].IsResend())
assert.Equal(t, testdata.TwilioChannel.ID, resent[0].ChannelID())
assert.Equal(t, models.TopupID(1), resent[0].TopupID())
assert.True(t, resent[1].IsResend())
assert.Equal(t, testdata.VonageChannel.ID, resent[1].ChannelID()) // channel changed
assert.Equal(t, models.TopupID(1), resent[1].TopupID())
assert.True(t, resent[2].IsResend())
assert.Equal(t, testdata.TwilioChannel.ID, resent[2].ChannelID()) // channel added

assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P' AND queued_on > $1 AND sent_on IS NULL`, now).Returns(3)

assertdb.Query(t, db, `SELECT status, failed_reason FROM msgs_msg WHERE id = $1`, out4.ID()).Columns(map[string]interface{}{"status": "F", "failed_reason": "D"})
assertdb.Query(t, db, `SELECT status, failed_reason FROM msgs_msg WHERE id = $1`, out5.ID()).Columns(map[string]interface{}{"status": "F", "failed_reason": "D"})
}

func TestGetMsgRepetitions(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions web/msg/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ func handleResend(ctx context.Context, rt *runtime.Runtime, r *http.Request) (in
return nil, http.StatusInternalServerError, errors.Wrap(err, "error loading messages to resend")
}

err = models.ResendMessages(ctx, rt.DB, rt.RP, oa, msgs)
resends, err := models.ResendMessages(ctx, rt.DB, rt.RP, oa, msgs)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "error resending messages")
}

msgio.SendMessages(ctx, rt, rt.DB, nil, msgs)
msgio.SendMessages(ctx, rt, rt.DB, nil, resends)

// response is the ids of the messages that were actually resent
resentMsgIDs := make([]flows.MsgID, len(msgs))
resentMsgIDs := make([]flows.MsgID, len(resends))
for i, m := range msgs {
resentMsgIDs[i] = m.ID()
}
Expand Down

0 comments on commit 1e65bee

Please sign in to comment.