From 945df1bd8eacebbd87aa63d9ed8f20f090c2a9e8 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 30 Sep 2022 16:09:30 -0500 Subject: [PATCH 1/2] Remove topups --- core/hooks/commit_ivr.go | 16 +-- core/hooks/commit_messages.go | 16 +-- core/ivr/ivr.go | 11 +- core/models/msgs.go | 55 ++------ core/models/msgs_test.go | 4 +- core/models/topups.go | 170 ------------------------- core/models/topups_test.go | 74 ----------- core/runner/runner_test.go | 2 +- core/tasks/handler/worker.go | 24 ++-- core/tasks/msgs/send_broadcast_test.go | 4 +- 10 files changed, 24 insertions(+), 352 deletions(-) delete mode 100644 core/models/topups.go delete mode 100644 core/models/topups_test.go diff --git a/core/hooks/commit_ivr.go b/core/hooks/commit_ivr.go index 62a18c877..24ff0da83 100644 --- a/core/hooks/commit_ivr.go +++ b/core/hooks/commit_ivr.go @@ -24,22 +24,8 @@ func (h *commitIVRHook) Apply(ctx context.Context, rt *runtime.Runtime, tx *sqlx } } - // find the topup we will assign - topup, err := models.AllocateTopups(ctx, tx, rt.RP, oa.Org(), len(msgs)) - if err != nil { - return errors.Wrapf(err, "error allocating topup for outgoing IVR message") - } - - // if we have an active topup, assign it to our messages - if topup != models.NilTopupID { - for _, m := range msgs { - m.SetTopup(topup) - } - } - // insert all our messages - err = models.InsertMessages(ctx, tx, msgs) - if err != nil { + if err := models.InsertMessages(ctx, tx, msgs); err != nil { return errors.Wrapf(err, "error writing messages") } diff --git a/core/hooks/commit_messages.go b/core/hooks/commit_messages.go index 5e058b6d2..323a82153 100644 --- a/core/hooks/commit_messages.go +++ b/core/hooks/commit_messages.go @@ -24,22 +24,8 @@ func (h *commitMessagesHook) Apply(ctx context.Context, rt *runtime.Runtime, tx } } - // allocate a topup for this message if org uses topups - topup, err := models.AllocateTopups(ctx, tx, rt.RP, oa.Org(), len(msgs)) - if err != nil { - return errors.Wrapf(err, "error allocating topup for outgoing message") - } - - // if we have an active topup, assign it to our messages - if topup != models.NilTopupID { - for _, m := range msgs { - m.SetTopup(topup) - } - } - // insert all our messages - err = models.InsertMessages(ctx, tx, msgs) - if err != nil { + if err := models.InsertMessages(ctx, tx, msgs); err != nil { return errors.Wrapf(err, "error writing messages") } diff --git a/core/ivr/ivr.go b/core/ivr/ivr.go index 829f16be4..b88daafac 100644 --- a/core/ivr/ivr.go +++ b/core/ivr/ivr.go @@ -581,17 +581,8 @@ func buildMsgResume( // create an incoming message msg := models.NewIncomingIVR(rt.Config, oa.OrgID(), call, msgIn, time.Now()) - // allocate a topup for this message if org uses topups) - topupID, err := models.AllocateTopups(ctx, rt.DB, rt.RP, oa.Org(), 1) - if err != nil { - return nil, nil, errors.Wrapf(err, "error allocating topup for incoming IVR message") - } - - msg.SetTopup(topupID) - // commit it - err = models.InsertMessages(ctx, rt.DB, []*models.Msg{msg}) - if err != nil { + if err := models.InsertMessages(ctx, rt.DB, []*models.Msg{msg}); err != nil { return nil, nil, errors.Wrapf(err, "error committing new message") } diff --git a/core/models/msgs.go b/core/models/msgs.go index 15c96fb29..741e1b1f4 100644 --- a/core/models/msgs.go +++ b/core/models/msgs.go @@ -140,7 +140,6 @@ type Msg struct { URN urns.URN `db:"urn_urn" json:"urn"` URNAuth null.String `db:"urn_auth" json:"urn_auth,omitempty"` OrgID OrgID `db:"org_id" json:"org_id"` - TopupID TopupID `db:"topup_id" json:"-"` FlowID FlowID `db:"flow_id" json:"-"` // extra data from handling added to the courier payload @@ -183,14 +182,11 @@ func (m *Msg) ChannelUUID() assets.ChannelUUID { return m.m.ChannelUUID } func (m *Msg) URN() urns.URN { return m.m.URN } func (m *Msg) URNAuth() null.String { return m.m.URNAuth } func (m *Msg) OrgID() OrgID { return m.m.OrgID } -func (m *Msg) TopupID() TopupID { return m.m.TopupID } func (m *Msg) FlowID() FlowID { return m.m.FlowID } func (m *Msg) ContactID() ContactID { return m.m.ContactID } func (m *Msg) ContactURNID() *URNID { return m.m.ContactURNID } func (m *Msg) IsResend() bool { return m.m.IsResend } -func (m *Msg) SetTopup(topupID TopupID) { m.m.TopupID = topupID } - func (m *Msg) SetChannel(channel *Channel) { m.channel = channel if channel != nil { @@ -254,7 +250,6 @@ func NewIncomingIVR(cfg *runtime.Config, orgID OrgID, call *Call, in *flows.MsgI m.ChannelID = call.ChannelID() m.OrgID = orgID - m.TopupID = NilTopupID m.CreatedOn = createdOn // add any attachments @@ -287,7 +282,6 @@ func NewOutgoingIVR(cfg *runtime.Config, orgID OrgID, call *Call, out *flows.Msg m.URN = out.URN() m.OrgID = orgID - m.TopupID = NilTopupID m.CreatedOn = createdOn m.SentOn = &createdOn @@ -353,7 +347,6 @@ func newOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *fl m.OrgID = org.ID() m.ContactID = ContactID(contact.ID()) m.BroadcastID = broadcastID - m.TopupID = NilTopupID m.Text = out.Text() m.HighPriority = false m.Direction = DirectionOut @@ -450,7 +443,6 @@ func NewIncomingMsg(cfg *runtime.Config, orgID OrgID, channel *Channel, contactI m.MsgType = MsgTypeFlow m.ContactID = contactID m.OrgID = orgID - m.TopupID = NilTopupID m.CreatedOn = createdOn // add any attachments @@ -482,8 +474,7 @@ SELECT channel_id, contact_id, contact_urn_id, - org_id, - topup_id + org_id FROM msgs_msg WHERE @@ -520,7 +511,6 @@ SELECT m.contact_id, m.contact_urn_id, m.org_id, - m.topup_id, u.identity AS "urn_urn", u.auth AS "urn_auth" FROM @@ -621,10 +611,10 @@ const insertMsgSQL = ` INSERT INTO msgs_msg(uuid, text, high_priority, created_on, modified_on, queued_on, sent_on, direction, status, attachments, metadata, visibility, msg_type, msg_count, error_count, next_attempt, failed_reason, channel_id, - contact_id, contact_urn_id, org_id, topup_id, flow_id, broadcast_id) + contact_id, contact_urn_id, org_id, flow_id, broadcast_id) VALUES(:uuid, :text, :high_priority, :created_on, now(), now(), :sent_on, :direction, :status, :attachments, :metadata, :visibility, :msg_type, :msg_count, :error_count, :next_attempt, :failed_reason, :channel_id, - :contact_id, :contact_urn_id, :org_id, :topup_id, :flow_id, :broadcast_id) + :contact_id, :contact_urn_id, :org_id, :flow_id, :broadcast_id) RETURNING id as id, now() as modified_on, @@ -632,20 +622,11 @@ RETURNING ` // UpdateMessage updates a message after handling -func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status MsgStatus, visibility MsgVisibility, msgType MsgType, flow FlowID, topup TopupID) error { +func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status MsgStatus, visibility MsgVisibility, msgType MsgType, flow FlowID) error { _, err := tx.ExecContext(ctx, - `UPDATE - msgs_msg - SET - status = $2, - visibility = $3, - msg_type = $4, - flow_id = $5, - topup_id = $6 - WHERE - id = $1`, - msgID, status, visibility, msgType, flow, topup) - + `UPDATE msgs_msg SET status = $2, visibility = $3, msg_type = $4, flow_id = $5 WHERE id = $1`, + msgID, status, visibility, msgType, flow, + ) if err != nil { return errors.Wrapf(err, "error updating msg: %d", msgID) } @@ -1114,19 +1095,6 @@ func (b *BroadcastBatch) CreateMessages(ctx context.Context, rt *runtime.Runtime } } - // allocate a topup for these message if org uses topups - topup, err := AllocateTopups(ctx, rt.DB, rt.RP, oa.Org(), len(msgs)) - if err != nil { - return nil, errors.Wrapf(err, "error allocating topup for broadcast messages") - } - - // if we have an active topup, assign it to our messages - if topup != NilTopupID { - for _, m := range msgs { - m.SetTopup(topup) - } - } - // insert them in a single request err = InsertMessages(ctx, rt.DB, msgs) if err != nil { @@ -1177,14 +1145,13 @@ func (b *BroadcastBatch) updateTicket(ctx context.Context, db Queryer, oa *OrgAs const sqlUpdateMsgForResending = ` UPDATE msgs_msg m SET channel_id = r.channel_id::int, - topup_id = r.topup_id::int, status = 'P', error_count = 0, failed_reason = NULL, queued_on = r.queued_on::timestamp with time zone, sent_on = NULL, modified_on = NOW() - FROM (VALUES(:id, :channel_id, :topup_id, :queued_on)) AS r(id, channel_id, topup_id, queued_on) + FROM (VALUES(:id, :channel_id, :queued_on)) AS r(id, channel_id, queued_on) WHERE m.id = r.id::bigint` const sqlUpdateMsgResendFailed = ` @@ -1204,7 +1171,6 @@ func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAsse for _, msg := range msgs { var ch *flows.Channel - var err error urnID := msg.ContactURNID() if urnID != nil { @@ -1236,11 +1202,6 @@ func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAsse msg.m.FailedReason = "" msg.m.IsResend = true // mark message as being a resend so it will be queued to courier as such - // allocate a new topup for this message if org uses topups - msg.m.TopupID, err = AllocateTopups(ctx, db, rp, oa.Org(), 1) - if err != nil { - return nil, errors.Wrapf(err, "error allocating topup for message resending") - } resends = append(resends, msg.m) resent = append(resent, msg) } else { diff --git a/core/models/msgs_test.go b/core/models/msgs_test.go index 4355774db..50625a7e8 100644 --- a/core/models/msgs_test.go +++ b/core/models/msgs_test.go @@ -472,13 +472,11 @@ func TestResendMessages(t *testing.T) { assert.Len(t, resent, 3) // only #1, #2 and #3 can be resent - // both messages should now have a channel, a topup and be marked for resending + // both messages should now have a channel and be marked for resending assert.True(t, resent[0].IsResend()) assert.Equal(t, testdata.TwilioChannel.ID, resent[0].ChannelID()) - assert.Equal(t, models.TopupID(1), resent[0].TopupID()) assert.True(t, resent[1].IsResend()) assert.Equal(t, testdata.VonageChannel.ID, resent[1].ChannelID()) // channel changed - assert.Equal(t, models.TopupID(1), resent[1].TopupID()) assert.True(t, resent[2].IsResend()) assert.Equal(t, testdata.TwilioChannel.ID, resent[2].ChannelID()) // channel added diff --git a/core/models/topups.go b/core/models/topups.go deleted file mode 100644 index 46f6773a1..000000000 --- a/core/models/topups.go +++ /dev/null @@ -1,170 +0,0 @@ -package models - -import ( - "context" - "database/sql/driver" - "fmt" - "time" - - "github.com/gomodule/redigo/redis" - "github.com/nyaruka/null" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -// TopupID is our type for topup ids, which can be null -type TopupID null.Int - -// NilTopupID is our nil value for topup id -var NilTopupID = TopupID(0) - -const ( - // also check lua script if modifying these - redisOrgCredistsUsedKey = `org:%d:cache:credits_used` - redisActiveTopupKey = `org:%d:cache:active_topup` - redisCreditsRemainingKey = `org:%d:cache:credits_remaining:%d` -) - -// AllocateTopups allocates topups for the given number of messages if topups are used by the org. -// If topups are allocated it will return the ID of the topup to assign to those messages. -func AllocateTopups(ctx context.Context, db Queryer, rp *redis.Pool, org *Org, amount int) (TopupID, error) { - rc := rp.Get() - defer rc.Close() - - // if org doesn't use topups, do nothing - if !org.UsesTopups() { - return NilTopupID, nil - } - - // no matter what we decrement our org credit - topups, err := redis.Ints(decrementCreditLua.Do(rc, org.ID(), amount)) - if err != nil { - return NilTopupID, err - } - - // we found an active topup, return it - if topups[0] > 0 { - return TopupID(topups[0]), nil - } - - // no active topup found, lets calculate it - topup, err := CalculateActiveTopup(ctx, db, org.ID()) - if err != nil { - return NilTopupID, err - } - - // no topup found, oh well - if topup == nil { - return NilTopupID, nil - } - - // got one? then cache it - expireSeconds := -int(time.Since(topup.Expiration) / time.Second) - if expireSeconds > 0 && topup.Remaining-amount > 0 { - rc.Send("SETEX", fmt.Sprintf(redisActiveTopupKey, org.ID()), expireSeconds, topup.ID) - _, err := rc.Do("SETEX", fmt.Sprintf(redisCreditsRemainingKey, org.ID(), topup.ID), expireSeconds, topup.Remaining-amount) - if err != nil { - // an error here isn't the end of the world, log it and move on - logrus.WithError(err).Errorf("error setting active topup in redis for org: %d", org.ID()) - } - } - - return topup.ID, nil -} - -var decrementCreditLua = redis.NewScript(2, `-- KEYS: [OrgID] [Amount] --- first check whether we have an org level cache of credits used, and if so decrement it -local ttl = redis.call('ttl', 'org:' .. KEYS[1] .. ':cache:credits_used') -if ttl > 0 then - redis.call('incrby', 'org:' .. KEYS[1] .. ':cache:credits_used', KEYS[2]) -end - --- look up our active topup -local orgKey = 'org:' .. KEYS[1] .. ':cache:active_topup' -local activeTopup = redis.call('get', orgKey) -local remaining = -1 - --- found an active topup, try do decrement its credits -if activeTopup then - local topupKey = 'org:' .. KEYS[1] .. ':cache:credits_remaining:' .. tonumber(activeTopup) - remaining = redis.call('decrby', topupKey, KEYS[2]) - if remaining <= 0 then - redis.call('del', topupKey, orgKey) - end --- no active topup cached -else - activeTopup = -1 -end - -return {activeTopup, remaining} -`) - -// CalculateActiveTopup loads the active topup for the passed in org -func CalculateActiveTopup(ctx context.Context, db Queryer, orgID OrgID) (*Topup, error) { - topup := &Topup{} - rows, err := db.QueryxContext(ctx, selectActiveTopup, orgID) - if err != nil { - return nil, errors.Wrapf(err, "error loading active topup for org: %d", orgID) - } - defer rows.Close() - - if !rows.Next() { - return nil, nil - } - - err = rows.StructScan(topup) - if err != nil { - return nil, errors.Wrapf(err, "error scanning topup") - } - - return topup, nil -} - -// Topup is our internal struct representing an org's topup and expiration date -type Topup struct { - ID TopupID `db:"id"` - Remaining int `db:"remaining"` - Expiration time.Time `db:"expires_on"` -} - -const selectActiveTopup = ` -SELECT - t.id as id, - t.credits - COALESCE(SUM(tc.used), 0) as remaining, - t.expires_on as expires_on -FROM - orgs_topup t - LEFT OUTER JOIN orgs_topupcredits tc ON (t.id = tc.topup_id) -WHERE - t.org_id = $1 AND - t.expires_on >= NOW() AND - t.is_active = TRUE AND - t.credits > 0 -GROUP BY - t.id -HAVING - COALESCE(SUM(tc.used), 0) < (t.credits) -ORDER BY - t.expires_on ASC, t.id ASC -LIMIT 1 -` - -// MarshalJSON marshals into JSON. 0 values will become null -func (i TopupID) MarshalJSON() ([]byte, error) { - return null.Int(i).MarshalJSON() -} - -// UnmarshalJSON unmarshals from JSON. null values become 0 -func (i *TopupID) UnmarshalJSON(b []byte) error { - return null.UnmarshalInt(b, (*null.Int)(i)) -} - -// Value returns the db value, null is returned for 0 -func (i TopupID) Value() (driver.Value, error) { - return null.Int(i).Value() -} - -// Scan scans from the db value. null values become 0 -func (i *TopupID) Scan(value interface{}) error { - return null.ScanInt(value, (*null.Int)(i)) -} diff --git a/core/models/topups_test.go b/core/models/topups_test.go deleted file mode 100644 index c126149e0..000000000 --- a/core/models/topups_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package models_test - -import ( - "testing" - - "github.com/nyaruka/mailroom/core/models" - "github.com/nyaruka/mailroom/testsuite" - "github.com/nyaruka/mailroom/testsuite/testdata" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestTopups(t *testing.T) { - ctx, rt, db, rp := testsuite.Get() - - tx, err := db.BeginTxx(ctx, nil) - assert.NoError(t, err) - defer tx.Rollback() - - tx.MustExec(`INSERT INTO orgs_topupcredits(is_squashed, used, topup_id) - VALUES(TRUE, 1000000, 1),(TRUE, 99000, 2),(TRUE, 998, 2)`) - - tcs := []struct { - OrgID models.OrgID - TopupID models.TopupID - Remaining int - }{ - {testdata.Org1.ID, models.NilTopupID, 0}, - {testdata.Org2.ID, models.TopupID(2), 2}, - } - - for _, tc := range tcs { - topup, err := models.CalculateActiveTopup(ctx, tx, tc.OrgID) - assert.NoError(t, err) - - if tc.TopupID == models.NilTopupID { - assert.Nil(t, topup) - } else { - assert.NotNil(t, topup) - assert.Equal(t, tc.TopupID, topup.ID) - assert.Equal(t, tc.Remaining, topup.Remaining) - } - } - - tc2s := []struct { - OrgID models.OrgID - TopupID models.TopupID - }{ - {testdata.Org1.ID, models.NilTopupID}, - {testdata.Org2.ID, models.TopupID(2)}, - {testdata.Org2.ID, models.TopupID(2)}, - {testdata.Org2.ID, models.NilTopupID}, - } - - for _, tc := range tc2s { - org, err := models.LoadOrg(ctx, rt.Config, tx, tc.OrgID) - assert.NoError(t, err) - - topup, err := models.AllocateTopups(ctx, tx, rp, org, 1) - assert.NoError(t, err) - assert.Equal(t, tc.TopupID, topup) - tx.MustExec(`INSERT INTO orgs_topupcredits(is_squashed, used, topup_id) VALUES(TRUE, 1, $1)`, tc.OrgID) - } - - // topups can be disabled for orgs - tx.MustExec(`UPDATE orgs_org SET uses_topups = FALSE WHERE id = $1`, testdata.Org1.ID) - org, err := models.LoadOrg(ctx, rt.Config, tx, testdata.Org1.ID) - require.NoError(t, err) - - topup, err := models.AllocateTopups(ctx, tx, rp, org, 1) - assert.NoError(t, err) - assert.Equal(t, models.NilTopupID, topup) -} diff --git a/core/runner/runner_test.go b/core/runner/runner_test.go index 0085d6ec3..9844ac14c 100644 --- a/core/runner/runner_test.go +++ b/core/runner/runner_test.go @@ -242,7 +242,7 @@ func TestBatchStart(t *testing.T) { assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE contact_id = ANY($1) AND text = $2 AND org_id = 1 AND status = 'Q' - AND queued_on IS NOT NULL AND direction = 'O' AND topup_id IS NOT NULL AND msg_type = 'F' AND channel_id = $3`, + AND queued_on IS NOT NULL AND direction = 'O' AND msg_type = 'F' AND channel_id = $3`, pq.Array(contactIDs), tc.Msg, testdata.TwilioChannel.ID). Returns(tc.TotalCount, "%d: unexpected number of messages", i) diff --git a/core/tasks/handler/worker.go b/core/tasks/handler/worker.go index beebd4154..457e69869 100644 --- a/core/tasks/handler/worker.go +++ b/core/tasks/handler/worker.go @@ -480,12 +480,6 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e return errors.Wrapf(err, "error loading org") } - // allocate a topup for this message if org uses topups - topupID, err := models.AllocateTopups(ctx, rt.DB, rt.RP, oa.Org(), 1) - if err != nil { - return errors.Wrapf(err, "error allocating topup for incoming message") - } - // load our contact contacts, err := models.LoadContacts(ctx, rt.ReadonlyDB, oa, []models.ContactID{event.ContactID}) if err != nil { @@ -494,7 +488,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e // contact has been deleted, ignore this message but mark it as handled if len(contacts) == 0 { - err := models.UpdateMessage(ctx, rt.DB, event.MsgID, models.MsgStatusHandled, models.VisibilityArchived, models.MsgTypeInbox, models.NilFlowID, topupID) + err := models.UpdateMessage(ctx, rt.DB, event.MsgID, models.MsgStatusHandled, models.VisibilityArchived, models.MsgTypeInbox, models.NilFlowID) if err != nil { return errors.Wrapf(err, "error updating message for deleted contact") } @@ -516,7 +510,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e // if this channel is no longer active or this contact is blocked, ignore this message (mark it as handled) if channel == nil || modelContact.Status() == models.ContactStatusBlocked { - err := models.UpdateMessage(ctx, rt.DB, event.MsgID, models.MsgStatusHandled, models.VisibilityArchived, models.MsgTypeInbox, models.NilFlowID, topupID) + err := models.UpdateMessage(ctx, rt.DB, event.MsgID, models.MsgStatusHandled, models.VisibilityArchived, models.MsgTypeInbox, models.NilFlowID) if err != nil { return errors.Wrapf(err, "error marking blocked or nil channel message as handled") } @@ -594,7 +588,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e } sessions[0].SetIncomingMsg(event.MsgID, event.MsgExternalID) - return markMsgHandled(ctx, tx, contact, msgIn, flow, topupID, tickets) + return markMsgHandled(ctx, tx, contact, msgIn, flow, tickets) } // we found a trigger and their session is nil or doesn't ignore keywords @@ -611,7 +605,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e // if this is an IVR flow, we need to trigger that start (which happens in a different queue) if flow.FlowType() == models.FlowTypeVoice { ivrMsgHook := func(ctx context.Context, tx *sqlx.Tx) error { - return markMsgHandled(ctx, tx, contact, msgIn, flow, topupID, tickets) + return markMsgHandled(ctx, tx, contact, msgIn, flow, tickets) } err = runner.TriggerIVRFlow(ctx, rt, oa.OrgID(), flow.ID(), []models.ContactID{modelContact.ID()}, ivrMsgHook) if err != nil { @@ -641,7 +635,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e } // this message didn't trigger and new sessions or resume any existing ones, so handle as inbox - err = handleAsInbox(ctx, rt, oa, contact, msgIn, topupID, tickets) + err = handleAsInbox(ctx, rt, oa, contact, msgIn, tickets) if err != nil { return errors.Wrapf(err, "error handling inbox message") } @@ -745,7 +739,7 @@ func handleTicketEvent(ctx context.Context, rt *runtime.Runtime, event *models.T } // handles a message as an inbox message -func handleAsInbox(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, contact *flows.Contact, msg *flows.MsgIn, topupID models.TopupID, tickets []*models.Ticket) error { +func handleAsInbox(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, contact *flows.Contact, msg *flows.MsgIn, tickets []*models.Ticket) error { // usually last_seen_on is updated by handling the msg_received event in the engine sprint, but since this is an inbox // message we manually create that event and handle it msgEvent := events.NewMsgReceived(msg) @@ -757,11 +751,11 @@ func handleAsInbox(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAsset return errors.Wrap(err, "error handling inbox message events") } - return markMsgHandled(ctx, rt.DB, contact, msg, nil, topupID, tickets) + return markMsgHandled(ctx, rt.DB, contact, msg, nil, tickets) } // utility to mark as message as handled and update any open contact tickets -func markMsgHandled(ctx context.Context, db models.Queryer, contact *flows.Contact, msg *flows.MsgIn, flow *models.Flow, topupID models.TopupID, tickets []*models.Ticket) error { +func markMsgHandled(ctx context.Context, db models.Queryer, contact *flows.Contact, msg *flows.MsgIn, flow *models.Flow, tickets []*models.Ticket) error { msgType := models.MsgTypeInbox flowID := models.NilFlowID if flow != nil { @@ -769,7 +763,7 @@ func markMsgHandled(ctx context.Context, db models.Queryer, contact *flows.Conta flowID = flow.ID() } - err := models.UpdateMessage(ctx, db, msg.ID(), models.MsgStatusHandled, models.VisibilityVisible, msgType, flowID, topupID) + err := models.UpdateMessage(ctx, db, msg.ID(), models.MsgStatusHandled, models.VisibilityVisible, msgType, flowID) if err != nil { return errors.Wrapf(err, "error marking message as handled") } diff --git a/core/tasks/msgs/send_broadcast_test.go b/core/tasks/msgs/send_broadcast_test.go index 116f390ec..b5d9c769c 100644 --- a/core/tasks/msgs/send_broadcast_test.go +++ b/core/tasks/msgs/send_broadcast_test.go @@ -113,7 +113,7 @@ func TestBroadcastEvents(t *testing.T) { assert.Equal(t, tc.BatchCount, count, "%d: unexpected batch count", i) // assert our count of total msgs created - assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE org_id = 1 AND created_on > $1 AND topup_id IS NOT NULL AND text = $2`, lastNow, tc.MsgText). + assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE org_id = 1 AND created_on > $1 AND text = $2`, lastNow, tc.MsgText). Returns(tc.MsgCount, "%d: unexpected msg count", i) lastNow = time.Now() @@ -263,7 +263,7 @@ func TestBroadcastTask(t *testing.T) { assert.Equal(t, tc.BatchCount, count, "%d: unexpected batch count", i) // assert our count of total msgs created - assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE org_id = 1 AND created_on > $1 AND topup_id IS NOT NULL AND text = $2`, lastNow, tc.MsgText). + assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE org_id = 1 AND created_on > $1 AND text = $2`, lastNow, tc.MsgText). Returns(tc.MsgCount, "%d: unexpected msg count", i) // make sure our broadcast is marked as sent From 221fcbbfae939037c4c5f7edaf516d63841a91ac Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Wed, 12 Oct 2022 12:05:20 -0500 Subject: [PATCH 2/2] Remove Org.uses_topups --- core/models/orgs.go | 11 +++-------- core/models/orgs_test.go | 1 - 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/core/models/orgs.go b/core/models/orgs.go index f89259900..8b062e7f1 100644 --- a/core/models/orgs.go +++ b/core/models/orgs.go @@ -68,10 +68,9 @@ const ( // Org is mailroom's type for RapidPro orgs. It also implements the envs.Environment interface for GoFlow type Org struct { o struct { - ID OrgID `json:"id"` - Suspended bool `json:"is_suspended"` - UsesTopups bool `json:"uses_topups"` - Config null.Map `json:"config"` + ID OrgID `json:"id"` + Suspended bool `json:"is_suspended"` + Config null.Map `json:"config"` } env envs.Environment } @@ -82,9 +81,6 @@ func (o *Org) ID() OrgID { return o.o.ID } // Suspended returns whether the org has been suspended func (o *Org) Suspended() bool { return o.o.Suspended } -// UsesTopups returns whether the org uses topups -func (o *Org) UsesTopups() bool { return o.o.UsesTopups } - // DateFormat returns the date format for this org func (o *Org) DateFormat() envs.DateFormat { return o.env.DateFormat() } @@ -252,7 +248,6 @@ const selectOrgByID = ` SELECT ROW_TO_JSON(o) FROM (SELECT id, is_suspended, - uses_topups, COALESCE(o.config::json,'{}'::json) AS config, (SELECT CASE date_format WHEN 'D' THEN 'DD-MM-YYYY' WHEN 'M' THEN 'MM-DD-YYYY' END) AS date_format, 'tt:mm' AS time_format, diff --git a/core/models/orgs_test.go b/core/models/orgs_test.go index 11befb0a4..e2286d76b 100644 --- a/core/models/orgs_test.go +++ b/core/models/orgs_test.go @@ -36,7 +36,6 @@ func TestOrgs(t *testing.T) { assert.Equal(t, models.OrgID(1), org.ID()) assert.False(t, org.Suspended()) - assert.True(t, org.UsesTopups()) assert.Equal(t, envs.DateFormatDayMonthYear, org.DateFormat()) assert.Equal(t, envs.TimeFormatHourMinute, org.TimeFormat()) assert.Equal(t, envs.RedactionPolicyNone, org.RedactionPolicy())