Skip to content

Commit

Permalink
Merge pull request rapidpro#303 from nyaruka/keep_going
Browse files Browse the repository at this point in the history
⚙️ Handling as normal for suspended orgs
  • Loading branch information
rowanseymour authored Jun 18, 2020
2 parents e4d65a1 + f16d0c4 commit 4f527e6
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 63 deletions.
6 changes: 2 additions & 4 deletions hooks/ivr_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@ func (h *CommitIVRHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool,
}

// find the topup we will assign
rc := rp.Get()
topup, err := models.AllocateTopups(ctx, tx, rc, oa.Org(), len(msgs))
rc.Close()
topup, err := models.AllocateTopups(ctx, tx, rp, oa.Org(), len(msgs))
if err != nil {
return errors.Wrapf(err, "error finding active topup")
return errors.Wrapf(err, "error allocating topup for outgoing IVR message")
}

// if we have an active topup, assign it to our messages
Expand Down
12 changes: 5 additions & 7 deletions hooks/msg_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func (h *SendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Poo

// for each scene gather all our messages
for s, args := range scenes {
// walk through our messages, separate by whether they have a topup
// walk through our messages, separate by whether they're android or not
courierMsgs := make([]*models.Msg, 0, len(args))

for _, m := range args {
msg := m.(*models.Msg)
channel := msg.Channel()
if msg.TopupID() != models.NilTopupID && channel != nil {
if channel != nil {
if channel.Type() == models.ChannelTypeAndroid {
androidChannels[channel] = true
} else {
Expand Down Expand Up @@ -149,12 +149,10 @@ func (h *CommitMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.P
}
}

// find the topup we will assign
rc := rp.Get()
topup, err := models.AllocateTopups(ctx, tx, rc, oa.Org(), len(msgs))
rc.Close()
// allocate a topup for this message if org uses topups
topup, err := models.AllocateTopups(ctx, tx, rp, oa.Org(), len(msgs))
if err != nil {
return errors.Wrapf(err, "error finding active topup")
return errors.Wrapf(err, "error allocating topup for outgoing message")
}

// if we have an active topup, assign it to our messages
Expand Down
2 changes: 1 addition & 1 deletion hooks/msg_created_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestNoTopup(t *testing.T) {
},
SQLAssertions: []SQLAssertion{
SQLAssertion{
SQL: "SELECT COUNT(*) FROM msgs_msg WHERE text='No Topup' AND contact_id = $1 AND status = 'P'",
SQL: "SELECT COUNT(*) FROM msgs_msg WHERE text='No Topup' AND contact_id = $1 AND status = 'Q'",
Args: []interface{}{models.CathyID},
Count: 1,
},
Expand Down
14 changes: 4 additions & 10 deletions ivr/ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,18 +520,12 @@ func ResumeIVRFlow(
// create an incoming message
msg := models.NewIncomingIVR(oa.OrgID(), conn, msgIn, time.Now())

// find a topup
rc := rp.Get()
topupID, err := models.AllocateTopups(ctx, db, rc, oa.Org(), 1)
rc.Close()

// error or no topup, that's an end of call
// allocate a topup for this message if org uses topups)
topupID, err := models.AllocateTopups(ctx, db, rp, oa.Org(), 1)
if err != nil {
return errors.Wrapf(err, "unable to look up topup")
}
if topupID == models.NilTopupID {
return client.WriteEmptyResponse(w, "no topups for org, exiting call")
return errors.Wrapf(err, "error allocating topup for incoming IVR message")
}

msg.SetTopup(topupID)

// commit it
Expand Down
8 changes: 3 additions & 5 deletions models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,12 +913,10 @@ func CreateBroadcastMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa
}
}

// get a topup to assign to our messages
rc := rp.Get()
topup, err := AllocateTopups(ctx, db, rc, oa.Org(), len(msgs))
rc.Close()
// allocate a topup for these message if org uses topups
topup, err := AllocateTopups(ctx, db, rp, oa.Org(), len(msgs))
if err != nil {
return nil, errors.Wrapf(err, "error finding active topup")
return nil, errors.Wrapf(err, "error allocating topup for broadcast messages")
}

// if we have an active topup, assign it to our messages
Expand Down
5 changes: 4 additions & 1 deletion models/topups.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ const (

// AllocateTopups allocates topups for the given number of messages if topups are used by the org.
// If topups are allocated it will return the ID of the topup to assign to those messages.
func AllocateTopups(ctx context.Context, db Queryer, rc redis.Conn, org *Org, amount int) (TopupID, error) {
func AllocateTopups(ctx context.Context, db Queryer, rp *redis.Pool, org *Org, amount int) (TopupID, error) {
rc := rp.Get()
defer rc.Close()

// if org doesn't use topups, do nothing
if !org.UsesTopups() {
return NilTopupID, nil
Expand Down
7 changes: 3 additions & 4 deletions models/topups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import (
func TestTopups(t *testing.T) {
ctx := testsuite.CTX()
db := testsuite.DB()
rc := testsuite.RC()
defer rc.Close()
rp := testsuite.RP()

tx, err := db.BeginTxx(ctx, nil)
assert.NoError(t, err)
Expand Down Expand Up @@ -57,7 +56,7 @@ func TestTopups(t *testing.T) {
org, err := loadOrg(ctx, tx, tc.OrgID)
assert.NoError(t, err)

topup, err := AllocateTopups(ctx, tx, rc, org, 1)
topup, err := AllocateTopups(ctx, tx, rp, org, 1)
assert.NoError(t, err)
assert.Equal(t, tc.TopupID, topup)
tx.MustExec(`INSERT INTO orgs_topupcredits(is_squashed, used, topup_id) VALUES(TRUE, 1, $1)`, tc.OrgID)
Expand All @@ -66,7 +65,7 @@ func TestTopups(t *testing.T) {
// topups can be disabled for orgs
tx.MustExec(`UPDATE orgs_org SET uses_topups = FALSE WHERE id = $1`, Org1)
org, err := loadOrg(ctx, tx, Org1)
topup, err := AllocateTopups(ctx, tx, rc, org, 1)
topup, err := AllocateTopups(ctx, tx, rp, org, 1)
assert.NoError(t, err)
assert.Equal(t, NilTopupID, topup)
}
17 changes: 0 additions & 17 deletions tasks/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,23 +217,6 @@ func TestMsgEvents(t *testing.T) {

db.Get(&text, `SELECT text FROM msgs_msg WHERE contact_id = $1 AND direction = 'O' AND created_on > $2 ORDER BY id DESC LIMIT 1`, models.Org2FredID, previous)
assert.Equal(t, "Hey, how are you?", text)

// restore flow
db.MustExec(`UPDATE flows_flow SET is_active = TRUE where id = $1`, models.Org2FavoritesFlowID)
models.FlushCache()

// suspend our org
db.MustExec(`UPDATE orgs_org SET is_suspended = TRUE WHERE id = $1`, models.Org2)

// message should be handled as an inbox message.. no new session
task = makeMsgTask(models.Org2, models.Org2ChannelID, models.Org2FredID, models.Org2FredURN, models.Org2FredURNID, "start")
AddHandleTask(rc, models.Org2FredID, task)
task, _ = queue.PopNextTask(rc, queue.HandlerQueue)
err = handleContactEvent(ctx, db, rp, task)
assert.NoError(t, err)

testsuite.AssertQueryCount(t, db, `SELECT count(*) from msgs_msg WHERE status = 'H' AND msg_type = 'I'`, nil, 1)
testsuite.AssertQueryCount(t, db, `SELECT count(*) from flows_flowsession where contact_id = $1`, []interface{}{models.Org2FredID}, 7)
}

func TestChannelEvents(t *testing.T) {
Expand Down
17 changes: 3 additions & 14 deletions tasks/handler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,21 +463,10 @@ func handleMsgEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, event *Msg
return errors.Wrapf(err, "error loading org")
}

// if org is suspended, just send message to inbox
if oa.Org().Suspended() {
err := models.UpdateMessage(ctx, db, event.MsgID, models.MsgStatusHandled, models.VisibilityVisible, models.TypeInbox, models.NilTopupID)
if err != nil {
return errors.Wrapf(err, "error updating message for suspended org")
}
return nil
}

// find the topup for this message
rc := rp.Get()
topupID, err := models.AllocateTopups(ctx, db, rc, oa.Org(), 1)
rc.Close()
// allocate a topup for this message if org uses topups
topupID, err := models.AllocateTopups(ctx, db, rp, oa.Org(), 1)
if err != nil {
return errors.Wrapf(err, "error calculating topup for msg")
return errors.Wrapf(err, "error allocating topup for incoming message")
}

// load our contact
Expand Down

0 comments on commit 4f527e6

Please sign in to comment.