diff --git a/core/tasks/ivr/cron.go b/core/tasks/ivr/cron.go
index 1a4769ba7..f8858acfc 100644
--- a/core/tasks/ivr/cron.go
+++ b/core/tasks/ivr/cron.go
@@ -44,7 +44,7 @@ func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error
 		func() error {
 			currentHour := time.Now().In(location).Hour()
 			if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour {
-				ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
+				ctx, cancel := context.WithTimeout(context.Background(), time.Minute*time.Duration(rt.Config.IVRRetryTimeout))
 				defer cancel()
 				return retryCallsInWorkerPool(ctx, rt)
 			}
@@ -112,9 +112,6 @@ func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error {
 	log := logrus.WithField("comp", "ivr_cron_retryer")
 	start := time.Now()
 
-	ctx, cancel := context.WithTimeout(ctx, time.Minute*10)
-	defer cancel()
-
 	conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, rt.Config.IVRConnRetryLimit)
 	if err != nil {
 		return errors.Wrapf(err, "error loading connections to retry")
diff --git a/core/tasks/ivr/cron_test.go b/core/tasks/ivr/cron_test.go
index add1ac1b3..ec515e5f0 100644
--- a/core/tasks/ivr/cron_test.go
+++ b/core/tasks/ivr/cron_test.go
@@ -73,6 +73,64 @@ func TestRetries(t *testing.T) {
 		testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1)
 }
 
+func TestRetryCallsInWorkerPool(t *testing.T) {
+	ctx, rt, db, rp := testsuite.Get()
+	rc := rp.Get()
+	defer rc.Close()
+
+	defer testsuite.Reset(testsuite.ResetAll)
+
+	// register our mock client
+	ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)
+
+	// update our twilio channel to be of type 'ZZ' and set max_concurrent_events to 1
+	db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)
+
+	// create a flow start for cathy
+	start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
+		WithContactIDs([]models.ContactID{testdata.Cathy.ID})
+
+	// call our master starter
+	err := starts.CreateFlowBatches(ctx, rt, start)
+	assert.NoError(t, err)
+
+	// should have one task in our ivr queue
+	task, err := queue.PopNextTask(rc, queue.HandlerQueue)
+	assert.NoError(t, err)
+	batch := &models.FlowStartBatch{}
+	err = json.Unmarshal(task.Task, batch)
+	assert.NoError(t, err)
+
+	client.callError = nil
+	client.callID = ivr.CallID("call1")
+	err = HandleFlowStartBatch(ctx, rt, batch)
+	assert.NoError(t, err)
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)
+
+	// change our call to be errored instead of wired
+	db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)
+
+	err = retryCallsInWorkerPool(ctx, rt)
+	assert.NoError(t, err)
+
+	// should now be in wired state
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)
+
+	// back to retry and make the channel inactive
+	db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)
+	db.MustExec(`UPDATE channels_channel SET is_active = FALSE WHERE id = $1`, testdata.TwilioChannel.ID)
+
+	models.FlushCache()
+	err = retryCallsInWorkerPool(ctx, rt)
+	assert.NoError(t, err)
+
+	// this time should be failed
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1)
+}
+
 func TestClearConnections(t *testing.T) {
 	ctx, rt, db, rp := testsuite.Get()
 	rc := rp.Get()
diff --git a/core/tasks/ivr/retryworker.go b/core/tasks/ivr/retryworker.go
index b2bb449e0..fa4c49217 100644
--- a/core/tasks/ivr/retryworker.go
+++ b/core/tasks/ivr/retryworker.go
@@ -2,7 +2,6 @@ package ivr
 
 import (
 	"context"
-	"fmt"
 	"math"
 	"sync"
 	"time"
@@ -10,6 +9,7 @@ import (
 	"github.com/nyaruka/mailroom/core/ivr"
 	"github.com/nyaruka/mailroom/core/models"
 	"github.com/nyaruka/mailroom/runtime"
+	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 )
 
@@ -30,46 +30,45 @@ func handleWork(id int, rt *runtime.Runtime, wg *sync.WaitGroup, jobChannel <-ch
 	for job := range jobChannel {
 		timeUntilNextExecution := -(time.Since(lastExecutionTime) - minimumTimeBetweenEachExecution)
 		if timeUntilNextExecution > 0 {
-			fmt.Printf("Worker #%d backing off for %s\n", id, timeUntilNextExecution)
+			logrus.Infof("Worker #%d backing off for %s", id, timeUntilNextExecution)
 			time.Sleep(timeUntilNextExecution)
 		} else {
-			fmt.Printf("Worker #%d not backing off \n", id)
+			logrus.Infof("Worker #%d not backing off", id)
 		}
 		lastExecutionTime = time.Now()
-		retryCall(id, rt, job.conn)
+		err := retryCall(id, rt, job.conn)
+		if err != nil {
+			logrus.Error(err)
+		}
 	}
 }
-func retryCall(workerId int, rt *runtime.Runtime, conn *models.ChannelConnection) JobResult {
-	log := logrus.WithField("connection_id", conn.ID())
+func retryCall(workerId int, rt *runtime.Runtime, conn *models.ChannelConnection) error {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
 	defer cancel()
 
 	oa, err := models.GetOrgAssets(ctx, rt, conn.OrgID())
 	if err != nil {
-		log.WithError(err).WithField("org_id", conn.OrgID()).Error("erroro loading org")
-		return JobResult{Output: "Fail"}
+		return errors.Wrapf(err, "error loading org with id %v", conn.OrgID())
 	}
 
 	channel := oa.ChannelByID(conn.ChannelID())
 	if channel == nil {
 		err = models.UpdateChannelConnectionStatuses(ctx, rt.DB, []models.ConnectionID{conn.ID()}, models.ConnectionStatusFailed)
 		if err != nil {
-			log.WithError(err).WithField("channel_id", conn.ChannelID()).Error("error marking call as failed due to missing channel")
+			return errors.Wrapf(err, "error marking call as failed due to missing channel with id %v", conn.ChannelID())
 		}
-		return JobResult{Output: "Fail"}
+		return nil
 	}
 
 	urn, err := models.URNForID(ctx, rt.DB, oa, conn.ContactURNID())
 	if err != nil {
-		log.WithError(err).WithField("urn_id", conn.ContactURNID()).Error("unable to load contact urn")
-		return JobResult{Output: "Fail"}
+		return errors.Wrapf(err, "unable to load contact urn for urn_id %v", conn.ContactURNID())
 	}
 
 	err = ivr.RequestCallStartForConnection(ctx, rt, channel, urn, conn)
 	if err != nil {
-		log.WithError(err).Error(err)
-		return JobResult{Output: "Fail"}
+		return err
 	}
 
-	return JobResult{Output: "Success"}
+	return nil
 }
diff --git a/core/tasks/ivr/retryworker_test.go b/core/tasks/ivr/retryworker_test.go
new file mode 100644
index 000000000..fa0a94672
--- /dev/null
+++ b/core/tasks/ivr/retryworker_test.go
@@ -0,0 +1,97 @@
+package ivr
+
+import (
+	"encoding/json"
+	"sync"
+	"testing"
+
+	"github.com/nyaruka/mailroom/core/ivr"
+	"github.com/nyaruka/mailroom/core/models"
+	"github.com/nyaruka/mailroom/core/queue"
+	"github.com/nyaruka/mailroom/core/tasks/starts"
+	"github.com/nyaruka/mailroom/testsuite"
+	"github.com/nyaruka/mailroom/testsuite/testdata"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestHandleWork(t *testing.T) {
+	ctx, rt, db, rp := testsuite.Get()
+	rc := rp.Get()
+	defer rc.Close()
+
+	defer testsuite.Reset(testsuite.ResetAll)
+
+	// register our mock client
+	ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)
+
+	// update our twilio channel to be of type 'ZZ' and set max_concurrent_events to 1
+	db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)
+
+	// create a flow start for cathy
+	start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
+		WithContactIDs([]models.ContactID{testdata.Cathy.ID})
+
+	// call our master starter
+	err := starts.CreateFlowBatches(ctx, rt, start)
+	assert.NoError(t, err)
+
+	// should have one task in our ivr queue
+	task, err := queue.PopNextTask(rc, queue.HandlerQueue)
+	assert.NoError(t, err)
+	batch := &models.FlowStartBatch{}
+	err = json.Unmarshal(task.Task, batch)
+	assert.NoError(t, err)
+
+	client.callError = nil
+	client.callID = ivr.CallID("call1")
+	err = HandleFlowStartBatch(ctx, rt, batch)
+	assert.NoError(t, err)
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)
+
+	// change our call to be errored instead of wired
+	db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)
+
+	conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, 1)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(conns))
+
+	err = retryCall(0, rt, conns[0])
+	assert.NoError(t, err)
+
+	// should now be in wired state
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)
+
+	// back to retry and make the channel inactive
+	db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)
+	db.MustExec(`UPDATE channels_channel SET is_active = FALSE WHERE id = $1`, testdata.TwilioChannel.ID)
+
+	models.FlushCache()
+
+	conns, err = models.LoadChannelConnectionsToRetry(ctx, rt.DB, 1)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(conns))
+
+	jobs := []Job{{Id: 1, conn: conns[0]}}
+
+	var (
+		wg         sync.WaitGroup
+		jobChannel = make(chan Job)
+	)
+	wg.Add(1)
+
+	go handleWork(0, rt, &wg, jobChannel)
+
+	for _, job := range jobs {
+		jobChannel <- job
+	}
+
+	close(jobChannel)
+	wg.Wait()
+
+	// this time should be failed
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1)
+}
diff --git a/core/tasks/ivr/worker.go b/core/tasks/ivr/worker.go
index e45030959..b1b8a17ad 100644
--- a/core/tasks/ivr/worker.go
+++ b/core/tasks/ivr/worker.go
@@ -94,7 +94,7 @@ func HandleFlowStartBatch(bg context.Context, rt *runtime.Runtime, batch *models
 	if nextExecutionTime > 0 {
 		logrus.WithFields(logrus.Fields{
 			"elapsed": time.Since(time.Now()),
-		}).Info("backing off call start for %s", nextExecutionTime)
+		}).Infof("backing off call start for %v", nextExecutionTime)
 		time.Sleep(nextExecutionTime)
 	}
diff --git a/runtime/config.go b/runtime/config.go
index 2c68f55ad..f83b31d5d 100644
--- a/runtime/config.go
+++ b/runtime/config.go
@@ -140,7 +140,7 @@ func NewDefaultConfig() *Config {
 		IVRStopHour:                          21,
 		IVRTimeZone:                          "Asia/Kolkata",
 		IVRCancelCronStartHour:               22,
-		IVRFlowStartBatchTimeout:             10,
+		IVRFlowStartBatchTimeout:             15,
 		IVRFlowStartBatchExecutionsPerSecond: 50,
 		IVRConnRetryLimit:                    500,