Skip to content

Commit

Permalink
ivr retry workers tweak and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rasoro committed Jan 24, 2023
1 parent bcda628 commit 7b8c267
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 21 deletions.
5 changes: 1 addition & 4 deletions core/tasks/ivr/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error
func() error {
currentHour := time.Now().In(location).Hour()
if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*time.Duration(rt.Config.IVRRetryTimeout))
defer cancel()
return retryCallsInWorkerPool(ctx, rt)
}
Expand Down Expand Up @@ -112,9 +112,6 @@ func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error {
log := logrus.WithField("comp", "ivr_cron_retryer")
start := time.Now()

ctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, rt.Config.IVRConnRetryLimit)
if err != nil {
return errors.Wrapf(err, "error loading connections to retry")
Expand Down
58 changes: 58 additions & 0 deletions core/tasks/ivr/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,64 @@ func TestRetries(t *testing.T) {
testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1)
}

// TestRetryCallsInWorkerPool is an integration test for retryCallsInWorkerPool:
// it wires an IVR call through a mock channel service, flips the connection to
// errored, and verifies the worker-pool retryer first re-wires it and then
// fails it once the channel has been deactivated.
func TestRetryCallsInWorkerPool(t *testing.T) {
	ctx, rt, db, rp := testsuite.Get()
	rc := rp.Get()
	defer rc.Close()

	// restore DB/Redis state for subsequent tests
	defer testsuite.Reset(testsuite.ResetAll)

	// register our mock client
	ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)

	// update our twilio channel to be of type 'ZZ' and set max_concurrent_events to 1
	db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)

	// create a flow start for cathy
	start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
		WithContactIDs([]models.ContactID{testdata.Cathy.ID})

	// call our master starter
	err := starts.CreateFlowBatches(ctx, rt, start)
	assert.NoError(t, err)

	// should have one task in our ivr queue
	task, err := queue.PopNextTask(rc, queue.HandlerQueue)
	assert.NoError(t, err)
	batch := &models.FlowStartBatch{}
	err = json.Unmarshal(task.Task, batch)
	assert.NoError(t, err)

	// configure the mock to answer successfully, then place the call
	client.callError = nil
	client.callID = ivr.CallID("call1")
	err = HandleFlowStartBatch(ctx, rt, batch)
	assert.NoError(t, err)
	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
		testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)

	// change our call to be errored instead of wired
	db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)

	// first retry pass: the channel is still active, so the call is re-placed
	err = retryCallsInWorkerPool(ctx, rt)
	assert.NoError(t, err)

	// should now be in wired state
	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
		testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)

	// back to retry and make the channel inactive
	db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)
	db.MustExec(`UPDATE channels_channel SET is_active = FALSE WHERE id = $1`, testdata.TwilioChannel.ID)

	// flush cached org assets so the retryer sees the deactivated channel
	models.FlushCache()
	err = retryCallsInWorkerPool(ctx, rt)
	assert.NoError(t, err)

	// this time should be failed
	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
		testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1)
}

func TestClearConnections(t *testing.T) {
ctx, rt, db, rp := testsuite.Get()
rc := rp.Get()
Expand Down
29 changes: 14 additions & 15 deletions core/tasks/ivr/retryworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package ivr

import (
"context"
"fmt"
"math"
"sync"
"time"

"github.com/nyaruka/mailroom/core/ivr"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand All @@ -30,46 +30,45 @@ func handleWork(id int, rt *runtime.Runtime, wg *sync.WaitGroup, jobChannel <-ch
for job := range jobChannel {
timeUntilNextExecution := -(time.Since(lastExecutionTime) - minimumTimeBetweenEachExecution)
if timeUntilNextExecution > 0 {
fmt.Printf("Worker #%d backing off for %s\n", id, timeUntilNextExecution)
logrus.Infof("Worker #%d backing off for %s\n", id, timeUntilNextExecution)
time.Sleep(timeUntilNextExecution)
} else {
fmt.Printf("Worker #%d not backing off \n", id)
logrus.Infof("Worker #%d not backing off \n", id)
}
lastExecutionTime = time.Now()
retryCall(id, rt, job.conn)
err := retryCall(id, rt, job.conn)
if err != nil {
logrus.Error(err)
}
}
}

func retryCall(workerId int, rt *runtime.Runtime, conn *models.ChannelConnection) JobResult {
log := logrus.WithField("connection_id", conn.ID())
func retryCall(workerId int, rt *runtime.Runtime, conn *models.ChannelConnection) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
oa, err := models.GetOrgAssets(ctx, rt, conn.OrgID())
if err != nil {
log.WithError(err).WithField("org_id", conn.OrgID()).Error("erroro loading org")
return JobResult{Output: "Fail"}
return errors.Wrapf(err, "error loading org with id %v", conn.OrgID())
}

channel := oa.ChannelByID(conn.ChannelID())
if channel == nil {
err = models.UpdateChannelConnectionStatuses(ctx, rt.DB, []models.ConnectionID{conn.ID()}, models.ConnectionStatusFailed)
if err != nil {
log.WithError(err).WithField("channel_id", conn.ChannelID()).Error("error marking call as failed due to missing channel")
return errors.Wrapf(err, "error marking call as failed due to missing channel with id %v", conn.ChannelID())
}
return JobResult{Output: "Fail"}
return err
}

urn, err := models.URNForID(ctx, rt.DB, oa, conn.ContactURNID())
if err != nil {
log.WithError(err).WithField("urn_id", conn.ContactURNID()).Error("unable to load contact urn")
return JobResult{Output: "Fail"}
return errors.Wrapf(err, "unable to load contact urn for urn_id %v", conn.ContactURNID())
}

err = ivr.RequestCallStartForConnection(ctx, rt, channel, urn, conn)
if err != nil {
log.WithError(err).Error(err)
return JobResult{Output: "Fail"}
return err
}

return JobResult{Output: "Success"}
return nil
}
97 changes: 97 additions & 0 deletions core/tasks/ivr/retryworker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package ivr

import (
"encoding/json"
"sync"
"testing"

"github.com/nyaruka/mailroom/core/ivr"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

"github.com/stretchr/testify/assert"
)

// TestHandleWork exercises retryCall directly and then the handleWork worker
// loop end to end: a call is wired through a mock channel service, flipped to
// errored, retried successfully via retryCall, and finally failed by a worker
// once the channel has been deactivated.
func TestHandleWork(t *testing.T) {
	ctx, rt, db, rp := testsuite.Get()
	rc := rp.Get()
	defer rc.Close()

	// restore DB/Redis state for subsequent tests
	defer testsuite.Reset(testsuite.ResetAll)

	// register our mock client
	ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)

	// update our twilio channel to be of type 'ZZ' and set max_concurrent_events to 1
	db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)

	// create a flow start for cathy
	start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
		WithContactIDs([]models.ContactID{testdata.Cathy.ID})

	// call our master starter
	err := starts.CreateFlowBatches(ctx, rt, start)
	assert.NoError(t, err)

	// should have one task in our ivr queue
	task, err := queue.PopNextTask(rc, queue.HandlerQueue)
	assert.NoError(t, err)
	batch := &models.FlowStartBatch{}
	err = json.Unmarshal(task.Task, batch)
	assert.NoError(t, err)

	// configure the mock to answer successfully, then place the call
	client.callError = nil
	client.callID = ivr.CallID("call1")
	err = HandleFlowStartBatch(ctx, rt, batch)
	assert.NoError(t, err)
	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
		testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)

	// change our call to be errored instead of wired
	db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)

	conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, 1)
	assert.NoError(t, err)
	// assert.Len gives (expected, actual) semantics and a clearer failure
	// message than assert.Equal with swapped arguments
	assert.Len(t, conns, 1)

	// retry the call directly; the channel is active so it should succeed
	err = retryCall(0, rt, conns[0])
	assert.NoError(t, err)

	// should now be in wired state
	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
		testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)

	// back to retry and make the channel inactive
	db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)
	db.MustExec(`UPDATE channels_channel SET is_active = FALSE WHERE id = $1`, testdata.TwilioChannel.ID)

	// flush cached org assets so the worker sees the deactivated channel
	models.FlushCache()

	conns, err = models.LoadChannelConnectionsToRetry(ctx, rt.DB, 1)
	assert.NoError(t, err)
	assert.Len(t, conns, 1)

	jobs := []Job{{Id: 1, conn: conns[0]}}

	var (
		wg         sync.WaitGroup
		jobChannel = make(chan Job)
	)
	wg.Add(1)

	// run a single worker; it exits when jobChannel is closed
	go handleWork(0, rt, &wg, jobChannel)

	for _, job := range jobs {
		jobChannel <- job
	}

	close(jobChannel)
	wg.Wait()

	// this time should be failed
	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
		testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1)
}
2 changes: 1 addition & 1 deletion core/tasks/ivr/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func HandleFlowStartBatch(bg context.Context, rt *runtime.Runtime, batch *models
if nextExecutionTime > 0 {
logrus.WithFields(logrus.Fields{
"elapsed": time.Since(time.Now()),
}).Info("backing off call start for %s", nextExecutionTime)
}).Infof("backing off call start for %v", nextExecutionTime)
time.Sleep(nextExecutionTime)
}

Expand Down
2 changes: 1 addition & 1 deletion runtime/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func NewDefaultConfig() *Config {
IVRStopHour: 21,
IVRTimeZone: "Asia/Kolkata",
IVRCancelCronStartHour: 22,
IVRFlowStartBatchTimeout: 10,
IVRFlowStartBatchTimeout: 15,
IVRFlowStartBatchExecutionsPerSecond: 50,

IVRConnRetryLimit: 500,
Expand Down

0 comments on commit 7b8c267

Please sign in to comment.