Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove topups #668

Merged
merged 4 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions core/hooks/commit_ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,8 @@ func (h *commitIVRHook) Apply(ctx context.Context, rt *runtime.Runtime, tx *sqlx
}
}

// find the topup we will assign
topup, err := models.AllocateTopups(ctx, tx, rt.RP, oa.Org(), len(msgs))
if err != nil {
return errors.Wrapf(err, "error allocating topup for outgoing IVR message")
}

// if we have an active topup, assign it to our messages
if topup != models.NilTopupID {
for _, m := range msgs {
m.SetTopup(topup)
}
}

// insert all our messages
err = models.InsertMessages(ctx, tx, msgs)
if err != nil {
if err := models.InsertMessages(ctx, tx, msgs); err != nil {
return errors.Wrapf(err, "error writing messages")
}

Expand Down
16 changes: 1 addition & 15 deletions core/hooks/commit_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,8 @@ func (h *commitMessagesHook) Apply(ctx context.Context, rt *runtime.Runtime, tx
}
}

// allocate a topup for this message if org uses topups
topup, err := models.AllocateTopups(ctx, tx, rt.RP, oa.Org(), len(msgs))
if err != nil {
return errors.Wrapf(err, "error allocating topup for outgoing message")
}

// if we have an active topup, assign it to our messages
if topup != models.NilTopupID {
for _, m := range msgs {
m.SetTopup(topup)
}
}

// insert all our messages
err = models.InsertMessages(ctx, tx, msgs)
if err != nil {
if err := models.InsertMessages(ctx, tx, msgs); err != nil {
return errors.Wrapf(err, "error writing messages")
}

Expand Down
11 changes: 1 addition & 10 deletions core/ivr/ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,17 +580,8 @@ func buildMsgResume(
// create an incoming message
msg := models.NewIncomingIVR(rt.Config, oa.OrgID(), call, msgIn, time.Now())

// allocate a topup for this message if org uses topups)
topupID, err := models.AllocateTopups(ctx, rt.DB, rt.RP, oa.Org(), 1)
if err != nil {
return nil, nil, errors.Wrapf(err, "error allocating topup for incoming IVR message")
}

msg.SetTopup(topupID)

// commit it
err = models.InsertMessages(ctx, rt.DB, []*models.Msg{msg})
if err != nil {
if err := models.InsertMessages(ctx, rt.DB, []*models.Msg{msg}); err != nil {
return nil, nil, errors.Wrapf(err, "error committing new message")
}

Expand Down
47 changes: 8 additions & 39 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ type Msg struct {
URN urns.URN `db:"urn_urn" json:"urn"`
URNAuth null.String `db:"urn_auth" json:"urn_auth,omitempty"`
OrgID OrgID `db:"org_id" json:"org_id"`
TopupID TopupID `db:"topup_id" json:"-"`
FlowID FlowID `db:"flow_id" json:"-"`

// extra data from handling added to the courier payload
Expand Down Expand Up @@ -183,14 +182,11 @@ func (m *Msg) ChannelUUID() assets.ChannelUUID { return m.m.ChannelUUID }
func (m *Msg) URN() urns.URN { return m.m.URN }
func (m *Msg) URNAuth() null.String { return m.m.URNAuth }
func (m *Msg) OrgID() OrgID { return m.m.OrgID }
func (m *Msg) TopupID() TopupID { return m.m.TopupID }
func (m *Msg) FlowID() FlowID { return m.m.FlowID }
func (m *Msg) ContactID() ContactID { return m.m.ContactID }
func (m *Msg) ContactURNID() *URNID { return m.m.ContactURNID }
func (m *Msg) IsResend() bool { return m.m.IsResend }

func (m *Msg) SetTopup(topupID TopupID) { m.m.TopupID = topupID }

func (m *Msg) SetChannel(channel *Channel) {
m.channel = channel
if channel != nil {
Expand Down Expand Up @@ -254,7 +250,6 @@ func NewIncomingIVR(cfg *runtime.Config, orgID OrgID, call *Call, in *flows.MsgI
m.ChannelID = call.ChannelID()

m.OrgID = orgID
m.TopupID = NilTopupID
m.CreatedOn = createdOn

// add any attachments
Expand Down Expand Up @@ -287,7 +282,6 @@ func NewOutgoingIVR(cfg *runtime.Config, orgID OrgID, call *Call, out *flows.Msg
m.URN = out.URN()

m.OrgID = orgID
m.TopupID = NilTopupID
m.CreatedOn = createdOn
m.SentOn = &createdOn

Expand Down Expand Up @@ -353,7 +347,6 @@ func newOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *fl
m.OrgID = org.ID()
m.ContactID = ContactID(contact.ID())
m.BroadcastID = broadcastID
m.TopupID = NilTopupID
m.Text = out.Text()
m.HighPriority = false
m.Direction = DirectionOut
Expand Down Expand Up @@ -450,7 +443,6 @@ func NewIncomingMsg(cfg *runtime.Config, orgID OrgID, channel *Channel, contactI
m.MsgType = MsgTypeFlow
m.ContactID = contactID
m.OrgID = orgID
m.TopupID = NilTopupID
m.CreatedOn = createdOn

// add any attachments
Expand Down Expand Up @@ -482,8 +474,7 @@ SELECT
channel_id,
contact_id,
contact_urn_id,
org_id,
topup_id
org_id
FROM
msgs_msg
WHERE
Expand Down Expand Up @@ -520,7 +511,6 @@ SELECT
m.contact_id,
m.contact_urn_id,
m.org_id,
m.topup_id,
u.identity AS "urn_urn",
u.auth AS "urn_auth"
FROM
Expand Down Expand Up @@ -625,18 +615,18 @@ const insertMsgSQL = `
INSERT INTO
msgs_msg(uuid, text, high_priority, created_on, modified_on, queued_on, sent_on, direction, status, attachments, metadata,
visibility, msg_type, msg_count, error_count, next_attempt, failed_reason, channel_id,
contact_id, contact_urn_id, org_id, topup_id, flow_id, broadcast_id)
contact_id, contact_urn_id, org_id, flow_id, broadcast_id)
VALUES(:uuid, :text, :high_priority, :created_on, now(), now(), :sent_on, :direction, :status, :attachments, :metadata,
:visibility, :msg_type, :msg_count, :error_count, :next_attempt, :failed_reason, :channel_id,
:contact_id, :contact_urn_id, :org_id, :topup_id, :flow_id, :broadcast_id)
:contact_id, :contact_urn_id, :org_id, :flow_id, :broadcast_id)
RETURNING
id as id,
now() as modified_on,
now() as queued_on
`

// UpdateMessage updates a message after handling
func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status MsgStatus, visibility MsgVisibility, msgType MsgType, flow FlowID, topup TopupID, attachments []utils.Attachment, logUUIDs []ChannelLogUUID) error {
func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status MsgStatus, visibility MsgVisibility, msgType MsgType, flow FlowID, attachments []utils.Attachment, logUUIDs []ChannelLogUUID) error {
_, err := tx.ExecContext(ctx,
`UPDATE
msgs_msg
Expand All @@ -645,12 +635,11 @@ func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status Ms
visibility = $3,
msg_type = $4,
flow_id = $5,
topup_id = $6,
attachments = $7,
log_uuids = array_cat(log_uuids, $8)
attachments = $6,
log_uuids = array_cat(log_uuids, $7)
WHERE
id = $1`,
msgID, status, visibility, msgType, flow, topup, pq.Array(attachments), pq.Array(logUUIDs))
msgID, status, visibility, msgType, flow, pq.Array(attachments), pq.Array(logUUIDs))

if err != nil {
return errors.Wrapf(err, "error updating msg: %d", msgID)
Expand Down Expand Up @@ -1120,19 +1109,6 @@ func (b *BroadcastBatch) CreateMessages(ctx context.Context, rt *runtime.Runtime
}
}

// allocate a topup for these message if org uses topups
topup, err := AllocateTopups(ctx, rt.DB, rt.RP, oa.Org(), len(msgs))
if err != nil {
return nil, errors.Wrapf(err, "error allocating topup for broadcast messages")
}

// if we have an active topup, assign it to our messages
if topup != NilTopupID {
for _, m := range msgs {
m.SetTopup(topup)
}
}

// insert them in a single request
err = InsertMessages(ctx, rt.DB, msgs)
if err != nil {
Expand Down Expand Up @@ -1183,14 +1159,13 @@ func (b *BroadcastBatch) updateTicket(ctx context.Context, db Queryer, oa *OrgAs
const sqlUpdateMsgForResending = `
UPDATE msgs_msg m
SET channel_id = r.channel_id::int,
topup_id = r.topup_id::int,
status = 'P',
error_count = 0,
failed_reason = NULL,
queued_on = r.queued_on::timestamp with time zone,
sent_on = NULL,
modified_on = NOW()
FROM (VALUES(:id, :channel_id, :topup_id, :queued_on)) AS r(id, channel_id, topup_id, queued_on)
FROM (VALUES(:id, :channel_id, :queued_on)) AS r(id, channel_id, queued_on)
WHERE m.id = r.id::bigint`

const sqlUpdateMsgResendFailed = `
Expand All @@ -1210,7 +1185,6 @@ func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAsse

for _, msg := range msgs {
var ch *flows.Channel
var err error
urnID := msg.ContactURNID()

if urnID != nil {
Expand Down Expand Up @@ -1242,11 +1216,6 @@ func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAsse
msg.m.FailedReason = ""
msg.m.IsResend = true // mark message as being a resend so it will be queued to courier as such

// allocate a new topup for this message if org uses topups
msg.m.TopupID, err = AllocateTopups(ctx, db, rp, oa.Org(), 1)
if err != nil {
return nil, errors.Wrapf(err, "error allocating topup for message resending")
}
resends = append(resends, msg.m)
resent = append(resent, msg)
} else {
Expand Down
4 changes: 1 addition & 3 deletions core/models/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,13 +472,11 @@ func TestResendMessages(t *testing.T) {

assert.Len(t, resent, 3) // only #1, #2 and #3 can be resent

// both messages should now have a channel, a topup and be marked for resending
// both messages should now have a channel and be marked for resending
assert.True(t, resent[0].IsResend())
assert.Equal(t, testdata.TwilioChannel.ID, resent[0].ChannelID())
assert.Equal(t, models.TopupID(1), resent[0].TopupID())
assert.True(t, resent[1].IsResend())
assert.Equal(t, testdata.VonageChannel.ID, resent[1].ChannelID()) // channel changed
assert.Equal(t, models.TopupID(1), resent[1].TopupID())
assert.True(t, resent[2].IsResend())
assert.Equal(t, testdata.TwilioChannel.ID, resent[2].ChannelID()) // channel added

Expand Down
11 changes: 3 additions & 8 deletions core/models/orgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ const (
// Org is mailroom's type for RapidPro orgs. It also implements the envs.Environment interface for GoFlow
type Org struct {
o struct {
ID OrgID `json:"id"`
Suspended bool `json:"is_suspended"`
UsesTopups bool `json:"uses_topups"`
Config null.Map `json:"config"`
ID OrgID `json:"id"`
Suspended bool `json:"is_suspended"`
Config null.Map `json:"config"`
}
env envs.Environment
}
Expand All @@ -82,9 +81,6 @@ func (o *Org) ID() OrgID { return o.o.ID }
// Suspended returns whether the org has been suspended
func (o *Org) Suspended() bool { return o.o.Suspended }

// UsesTopups returns whether the org uses topups
func (o *Org) UsesTopups() bool { return o.o.UsesTopups }

// DateFormat returns the date format for this org
func (o *Org) DateFormat() envs.DateFormat { return o.env.DateFormat() }

Expand Down Expand Up @@ -252,7 +248,6 @@ const selectOrgByID = `
SELECT ROW_TO_JSON(o) FROM (SELECT
id,
is_suspended,
uses_topups,
COALESCE(o.config::json,'{}'::json) AS config,
(SELECT CASE date_format WHEN 'D' THEN 'DD-MM-YYYY' WHEN 'M' THEN 'MM-DD-YYYY' END) AS date_format,
'tt:mm' AS time_format,
Expand Down
1 change: 0 additions & 1 deletion core/models/orgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func TestOrgs(t *testing.T) {

assert.Equal(t, models.OrgID(1), org.ID())
assert.False(t, org.Suspended())
assert.True(t, org.UsesTopups())
assert.Equal(t, envs.DateFormatDayMonthYear, org.DateFormat())
assert.Equal(t, envs.TimeFormatHourMinute, org.TimeFormat())
assert.Equal(t, envs.RedactionPolicyNone, org.RedactionPolicy())
Expand Down
Loading