diff --git a/cmd/mailroom/main.go b/cmd/mailroom/main.go index a063f4cfa..f61055628 100644 --- a/cmd/mailroom/main.go +++ b/cmd/mailroom/main.go @@ -23,6 +23,7 @@ import ( _ "github.com/nyaruka/mailroom/core/tasks/groups" _ "github.com/nyaruka/mailroom/core/tasks/interrupts" _ "github.com/nyaruka/mailroom/core/tasks/ivr" + _ "github.com/nyaruka/mailroom/core/tasks/msgs" _ "github.com/nyaruka/mailroom/core/tasks/schedules" _ "github.com/nyaruka/mailroom/core/tasks/starts" _ "github.com/nyaruka/mailroom/core/tasks/stats" diff --git a/core/models/msgs.go b/core/models/msgs.go index d5eea2cd5..b94e7f49a 100644 --- a/core/models/msgs.go +++ b/core/models/msgs.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/nyaruka/gocommon/dates" "github.com/nyaruka/gocommon/gsm7" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/assets" @@ -118,6 +119,7 @@ type Msg struct { ContactURNID *URNID `db:"contact_urn_id" json:"contact_urn_id"` ResponseToID MsgID `db:"response_to_id" json:"response_to_id"` ResponseToExternalID null.String ` json:"response_to_external_id"` + IsResend bool ` json:"is_resend,omitempty"` URN urns.URN ` json:"urn"` URNAuth null.String ` json:"urn_auth,omitempty"` OrgID OrgID `db:"org_id" json:"org_id"` @@ -164,6 +166,7 @@ func (m *Msg) OrgID() OrgID { return m.m.OrgID } func (m *Msg) TopupID() TopupID { return m.m.TopupID } func (m *Msg) ContactID() ContactID { return m.m.ContactID } func (m *Msg) ContactURNID() *URNID { return m.m.ContactURNID } +func (m *Msg) IsResend() bool { return m.m.IsResend } func (m *Msg) SetTopup(topupID TopupID) { m.m.TopupID = topupID } func (m *Msg) SetChannelID(channelID ChannelID) { m.m.ChannelID = channelID } @@ -382,6 +385,60 @@ func NewIncomingMsg(orgID OrgID, channel *Channel, contactID ContactID, in *flow return msg } +var loadMessagesSQL = ` +SELECT + id, + broadcast_id, + uuid, + text, + created_on, + direction, + status, + visibility, + msg_count, + error_count, + next_attempt, + external_id, + attachments, + metadata, + channel_id, + connection_id, + contact_id, + contact_urn_id, + response_to_id, + org_id, + topup_id +FROM + msgs_msg +WHERE + org_id = $1 AND + direction = $2 AND + id = ANY($3) +ORDER BY + id ASC` + +// LoadMessages loads the given messages for the passed in org +func LoadMessages(ctx context.Context, db Queryer, orgID OrgID, direction MsgDirection, msgIDs []MsgID) ([]*Msg, error) { + rows, err := db.QueryxContext(ctx, loadMessagesSQL, orgID, direction, pq.Array(msgIDs)) + if err != nil { + return nil, errors.Wrapf(err, "error querying msgs for org: %d", orgID) + } + defer rows.Close() + + msgs := make([]*Msg, 0) + for rows.Next() { + msg := &Msg{} + err = rows.StructScan(&msg.m) + if err != nil { + return nil, errors.Wrap(err, "error scanning msg row") + } + + msgs = append(msgs, msg) + } + + return msgs, nil +} + // NormalizeAttachment will turn any relative URL in the passed in attachment and normalize it to // include the full host for attachment domains func NormalizeAttachment(attachment utils.Attachment) utils.Attachment { @@ -949,6 +1006,73 @@ func CreateBroadcastMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa return msgs, nil } +const updateMsgForResendingSQL = ` + UPDATE + msgs_msg m + SET + channel_id = r.channel_id::int, + topup_id = r.topup_id::int, + status = 'P', + error_count = 0, + queued_on = r.queued_on::timestamp with time zone, + sent_on = NULL, + modified_on = NOW() + FROM ( + VALUES(:id, :channel_id, :topup_id, :queued_on) + ) AS + r(id, channel_id, topup_id, queued_on) + WHERE + m.id = r.id::bigint +` + +// ResendMessages prepares messages for resending by reselecting a channel and marking them as PENDING +func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAssets, msgs []*Msg) error { + channels := oa.SessionAssets().Channels() + resends := make([]interface{}, len(msgs)) + + for i, msg := range msgs { + // reselect channel for this message's URN + urn, err := URNForID(ctx, db, oa, *msg.ContactURNID()) + if err != nil { + return errors.Wrap(err, "error loading URN") + } + contactURN, err := flows.ParseRawURN(channels, urn, assets.IgnoreMissing) + if err != nil { + return errors.Wrap(err, "error parsing URN") + } + ch := channels.GetForURN(contactURN, assets.ChannelRoleSend) + if ch != nil { + channel := oa.ChannelByUUID(ch.UUID()) + msg.m.ChannelID = channel.ID() + } else { + msg.m.ChannelID = NilChannelID + } + + // allocate a new topup for this message if org uses topups + msg.m.TopupID, err = AllocateTopups(ctx, db, rp, oa.Org(), 1) + if err != nil { + return errors.Wrapf(err, "error allocating topup for message resending") + } + + // mark message as being a resend so it will be queued to courier as such + msg.m.Status = MsgStatusPending + msg.m.QueuedOn = dates.Now() + msg.m.SentOn = dates.ZeroDateTime + msg.m.ErrorCount = 0 + msg.m.IsResend = true + + resends[i] = msg.m + } + + // update the messages in the database + err := BulkQuery(ctx, "updating messages for resending", db, updateMsgForResendingSQL, resends) + if err != nil { + return errors.Wrapf(err, "error updating messages for resending") + } + + return nil +} + // MarkBroadcastSent marks the passed in broadcast as sent func MarkBroadcastSent(ctx context.Context, db Queryer, id BroadcastID) error { // noop if it is a nil id diff --git a/core/models/msgs_test.go b/core/models/msgs_test.go index 6f0478c51..ab32a1396 100644 --- a/core/models/msgs_test.go +++ b/core/models/msgs_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/nyaruka/gocommon/dates" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/flows" @@ -147,6 +148,74 @@ func TestGetMessageIDFromUUID(t *testing.T) { assert.Equal(t, models.MsgID(msgIn.ID()), msgID) } +func TestLoadMessages(t *testing.T) { + ctx := testsuite.CTX() + db := testsuite.DB() + msgIn1 := testdata.InsertIncomingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "in 1") + msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 1", []utils.Attachment{"image/jpeg:hi.jpg"}) + msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 2", nil) + msgOut3 := testdata.InsertOutgoingMsg(t, db, models.Org2, models.Org2FredID, models.Org2FredURN, models.Org2FredURNID, "out 3", nil) + testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "hi 3", nil) + + ids := []models.MsgID{models.MsgID(msgIn1.ID()), models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID()), models.MsgID(msgOut3.ID())} + + msgs, err := models.LoadMessages(ctx, db, models.Org1, models.DirectionOut, ids) + + // should only return the outgoing messages for this org + require.NoError(t, err) + assert.Equal(t, 2, len(msgs)) + assert.Equal(t, "out 1", msgs[0].Text()) + assert.Equal(t, []utils.Attachment{"image/jpeg:hi.jpg"}, msgs[0].Attachments()) + assert.Equal(t, "out 2", msgs[1].Text()) + + msgs, err = models.LoadMessages(ctx, db, models.Org1, models.DirectionIn, ids) + + // should only return the incoming message for this org + require.NoError(t, err) + assert.Equal(t, 1, len(msgs)) + assert.Equal(t, "in 1", msgs[0].Text()) +} + +func TestResendMessages(t *testing.T) { + ctx := testsuite.CTX() + db := testsuite.DB() + rp := testsuite.RP() + + db.MustExec(`DELETE FROM msgs_msg`) + + oa, err := models.GetOrgAssets(ctx, db, models.Org1) + require.NoError(t, err) + + msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 1", nil) + msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.BobID, models.BobURN, models.BobURNID, "out 2", nil) + testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 3", nil) + + // make them look like failed messages + db.MustExec(`UPDATE msgs_msg SET status = 'F', sent_on = NOW(), error_count = 3`) + + // give Bob's URN an affinity for the Vonage channel + db.MustExec(`UPDATE contacts_contacturn SET channel_id = $1 WHERE id = $2`, models.VonageChannelID, models.BobURNID) + + msgs, err := models.LoadMessages(ctx, db, models.Org1, models.DirectionOut, []models.MsgID{models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID())}) + require.NoError(t, err) + + now := dates.Now() + + // resend both msgs + err = models.ResendMessages(ctx, db, rp, oa, msgs) + require.NoError(t, err) + + // both messages should now have a channel, a topup and be marked for resending + assert.True(t, msgs[0].IsResend()) + assert.Equal(t, models.TwilioChannelID, msgs[0].ChannelID()) + assert.Equal(t, models.TopupID(1), msgs[0].TopupID()) + assert.True(t, msgs[1].IsResend()) + assert.Equal(t, models.VonageChannelID, msgs[1].ChannelID()) + assert.Equal(t, models.TopupID(1), msgs[1].TopupID()) + + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P' AND queued_on > $1 AND sent_on IS NULL`, []interface{}{now}, 2) +} + func TestNormalizeAttachment(t *testing.T) { config.Mailroom.AttachmentDomain = "foo.bar.com" defer func() { config.Mailroom.AttachmentDomain = "" }() diff --git a/core/tasks/msgs/resend_msgs.go b/core/tasks/msgs/resend_msgs.go new file mode 100644 index 000000000..2a705ef44 --- /dev/null +++ b/core/tasks/msgs/resend_msgs.go @@ -0,0 +1,53 @@ +package msgs + +import ( + "context" + "time" + + "github.com/nyaruka/mailroom" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/msgio" + "github.com/nyaruka/mailroom/core/tasks" + + "github.com/pkg/errors" +) + +// TypeResendMsgs is the type of the resend messages task +const TypeResendMsgs = "resend_msgs" + +func init() { + tasks.RegisterType(TypeResendMsgs, func() tasks.Task { return &ResendMsgsTask{} }) +} + +// ResendMsgsTask is our task for resending messages +type ResendMsgsTask struct { + MsgIDs []models.MsgID `json:"msg_ids"` +} + +// Timeout is the maximum amount of time the task can run for +func (t *ResendMsgsTask) Timeout() time.Duration { + return time.Minute * 5 +} + +func (t *ResendMsgsTask) Perform(ctx context.Context, mr *mailroom.Mailroom, orgID models.OrgID) error { + db := mr.DB + rp := mr.RP + + oa, err := models.GetOrgAssets(ctx, mr.DB, orgID) + if err != nil { + return errors.Wrap(err, "unable to load org") + } + + msgs, err := models.LoadMessages(ctx, db, orgID, models.DirectionOut, t.MsgIDs) + if err != nil { + return errors.Wrap(err, "error loading messages to resend") + } + + err = models.ResendMessages(ctx, db, rp, oa, msgs) + if err != nil { + return errors.Wrap(err, "error cloning messages") + } + + msgio.SendMessages(ctx, db, rp, nil, msgs) + return nil +} diff --git a/core/tasks/msgs/resend_msgs_test.go b/core/tasks/msgs/resend_msgs_test.go new file mode 100644 index 000000000..30fc7990b --- /dev/null +++ b/core/tasks/msgs/resend_msgs_test.go @@ -0,0 +1,43 @@ +package msgs_test + +import ( + "testing" + + "github.com/nyaruka/goflow/utils" + "github.com/nyaruka/mailroom" + "github.com/nyaruka/mailroom/config" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/tasks/msgs" + "github.com/nyaruka/mailroom/testsuite" + "github.com/nyaruka/mailroom/testsuite/testdata" + + "github.com/stretchr/testify/require" +) + +func TestResendMsgs(t *testing.T) { + testsuite.Reset() + ctx := testsuite.CTX() + db := testsuite.DB() + mr := &mailroom.Mailroom{Config: config.Mailroom, DB: db, RP: testsuite.RP(), ElasticClient: nil} + + msgOut1 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 1", []utils.Attachment{"image/jpeg:hi.jpg"}) + msgOut2 := testdata.InsertOutgoingMsg(t, db, models.Org1, models.BobID, models.BobURN, models.BobURNID, "out 2", nil) + testdata.InsertOutgoingMsg(t, db, models.Org1, models.CathyID, models.CathyURN, models.CathyURNID, "out 3", nil) + + db.MustExec(`UPDATE msgs_msg SET metadata = '{"topic":"cool-stuff"}' WHERE id = $1`, msgOut1.ID()) + + // create our task + task := &msgs.ResendMsgsTask{ + MsgIDs: []models.MsgID{models.MsgID(msgOut1.ID()), models.MsgID(msgOut2.ID())}, + } + + // execute it + err := task.Perform(ctx, mr, models.Org1) + require.NoError(t, err) + + // the two resent messages should now be pending and have a channel set + testsuite.AssertQueryCount(t, db, + `SELECT count(*) FROM msgs_msg WHERE status = 'P' AND channel_id IS NOT NULL AND id IN ($1, $2)`, + []interface{}{msgOut1.ID(), msgOut2.ID()}, 2, + ) +} diff --git a/testsuite/testdata/msgs.go b/testsuite/testdata/msgs.go index f4fc62964..beb9e9112 100644 --- a/testsuite/testdata/msgs.go +++ b/testsuite/testdata/msgs.go @@ -3,9 +3,11 @@ package testdata import ( "testing" + "github.com/lib/pq" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/gocommon/uuids" "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/utils" "github.com/nyaruka/mailroom/core/models" "github.com/jmoiron/sqlx" @@ -25,3 +27,17 @@ func InsertIncomingMsg(t *testing.T, db *sqlx.DB, orgID models.OrgID, contactID msg.SetID(id) return msg } + +// InsertOutgoingMsg inserts an outgoing message +func InsertOutgoingMsg(t *testing.T, db *sqlx.DB, orgID models.OrgID, contactID models.ContactID, urn urns.URN, urnID models.URNID, text string, attachments []utils.Attachment) *flows.MsgOut { + msg := flows.NewMsgOut(urn, nil, text, nil, nil, nil, flows.NilMsgTopic) + + var id flows.MsgID + err := db.Get(&id, + `INSERT INTO msgs_msg(uuid, text, attachments, created_on, direction, status, visibility, msg_count, error_count, next_attempt, contact_id, contact_urn_id, org_id) + VALUES($1, $2, $3, NOW(), 'O', 'P', 'V', 1, 0, NOW(), $4, $5, $6) RETURNING id`, msg.UUID(), text, pq.Array(attachments), contactID, urnID, orgID) + require.NoError(t, err) + + msg.SetID(id) + return msg +}