diff --git a/core/tasks/ivr/cron.go b/core/tasks/ivr/cron.go index d46783270..ac2376ed7 100644 --- a/core/tasks/ivr/cron.go +++ b/core/tasks/ivr/cron.go @@ -39,19 +39,19 @@ func init() { if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*time.Duration(rt.Config.IVRRetryTimeout)) defer cancel() - return retryCallsInWorkerPool(ctx, rt) + return RetryCallsInWorkerPool(ctx, rt) } return nil }) - mailroom.RegisterCron(clearIVRLock, time.Hour, false, clearStuckChannelConnections) + mailroom.RegisterCron(clearIVRLock, time.Hour, false, ClearStuckChannelConnections) mailroom.RegisterCron(changeMaxConnNightLock, time.Minute*10, false, func(ctx context.Context, rt *runtime.Runtime) error { currentHour := time.Now().In(location).Hour() if currentHour >= rt.Config.IVRStopHour || currentHour < rt.Config.IVRStartHour { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) defer cancel() - return changeMaxConnectionsConfig(ctx, rt, "TW", 0) + return ChangeMaxConnectionsConfig(ctx, rt, "TW", 0) } return nil }) @@ -61,7 +61,7 @@ func init() { if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) defer cancel() - return changeMaxConnectionsConfig(ctx, rt, "TW", rt.Config.MaxConcurrentEvents) + return ChangeMaxConnectionsConfig(ctx, rt, "TW", rt.Config.MaxConcurrentEvents) } return nil }) @@ -71,7 +71,7 @@ func init() { if currentHour == rt.Config.IVRCancelCronStartHour { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20) defer cancel() - return cancelCalls(ctx, rt) + return CancelCalls(ctx, rt) } return nil }) @@ -79,7 +79,7 @@ func init() { } // retryCallsInWorkerPoll looks for calls that need to be retried and retries then -func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error { +func RetryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error { log := logrus.WithField("comp", "ivr_cron_retryer") start := time.Now() @@ -90,7 +90,7 @@ func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error { var jobs []Job for i := 0; i < len(conns); i++ { - jobs = append(jobs, Job{Id: i, conn: conns[i]}) + jobs = append(jobs, Job{Id: i, Conn: conns[i]}) } var ( @@ -101,7 +101,7 @@ func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error { wg.Add(rt.Config.IVRRetryWorkers) for i := 0; i < rt.Config.IVRRetryWorkers; i++ { - go handleWork(i, rt, &wg, jobChannel) + go HandleWork(i, rt, &wg, jobChannel) } for _, job := range jobs { @@ -116,8 +116,8 @@ func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error { return nil } -// retryCalls looks for calls that need to be retried and retries them -func retryCalls(ctx context.Context, rt *runtime.Runtime) error { +// RetryCalls looks for calls that need to be retried and retries them +func RetryCalls(ctx context.Context, rt *runtime.Runtime) error { log := logrus.WithField("comp", "ivr_cron_retryer") start := time.Now() @@ -183,7 +183,7 @@ func retryCalls(ctx context.Context, rt *runtime.Runtime) error { return nil } -func clearStuckChannelConnections(ctx context.Context, rt *runtime.Runtime) error { +func ClearStuckChannelConnections(ctx context.Context, rt *runtime.Runtime) error { log := logrus.WithField("comp", "ivr_cron_cleaner") start := time.Now() @@ -205,7 +205,7 @@ func clearStuckChannelConnections(ctx context.Context, rt *runtime.Runtime) erro return nil } -func cancelCalls(ctx context.Context, rt *runtime.Runtime) error { +func CancelCalls(ctx context.Context, rt *runtime.Runtime) error { log := logrus.WithField("comp", "ivr_cron_canceler") start := time.Now() ctx, cancel := context.WithTimeout(ctx, time.Minute*15) @@ -226,7 +226,7 @@ func cancelCalls(ctx context.Context, rt *runtime.Runtime) error { return nil } -func changeMaxConnectionsConfig(ctx context.Context, rt *runtime.Runtime, channelType string, maxConcurrentEventsToSet int) error { +func ChangeMaxConnectionsConfig(ctx context.Context, rt *runtime.Runtime, channelType string, maxConcurrentEventsToSet int) error { log := logrus.WithField("comp", "ivr_cron_change_max_connections") start := time.Now() diff --git a/core/tasks/ivr/cron_test.go b/core/tasks/ivr/cron_test.go index 7df47eec5..f700b9cba 100644 --- a/core/tasks/ivr/cron_test.go +++ b/core/tasks/ivr/cron_test.go @@ -82,13 +82,13 @@ func TestRetryCallsInWorkerPool(t *testing.T) { defer testsuite.Reset(testsuite.ResetAll) // register our mock client - ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider) + ivr.RegisterServiceType(models.ChannelType("ZZ"), NewMockProvider) // update our twilio channel to be of type 'ZZ' and set max_concurrent_events to 1 db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID) // create a flow start for cathy - start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive). + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID). WithContactIDs([]models.ContactID{testdata.Cathy.ID}) // call our master starter @@ -104,19 +104,19 @@ func TestRetryCallsInWorkerPool(t *testing.T) { client.callError = nil client.callID = ivr.CallID("call1") - err = HandleFlowStartBatch(ctx, rt, batch) + err = ivrtasks.HandleFlowStartBatch(ctx, rt, batch) assert.NoError(t, err) - testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1) // change our call to be errored instead of wired db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`) - err = retryCallsInWorkerPool(ctx, rt) + err = ivrtasks.RetryCallsInWorkerPool(ctx, rt) assert.NoError(t, err) // should now be in wired state - testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1) // back to retry and make the channel inactive @@ -124,11 +124,11 @@ func TestRetryCallsInWorkerPool(t *testing.T) { db.MustExec(`UPDATE channels_channel SET is_active = FALSE WHERE id = $1`, testdata.TwilioChannel.ID) models.FlushCache() - err = retryCallsInWorkerPool(ctx, rt) + err = ivrtasks.RetryCallsInWorkerPool(ctx, rt) assert.NoError(t, err) // this time should be failed - testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1) } @@ -139,11 +139,11 @@ func TestClearConnections(t *testing.T) { defer testsuite.Reset(testsuite.ResetAll) - ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider) + ivr.RegisterServiceType(models.ChannelType("ZZ"), NewMockProvider) db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID) - start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive). + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID). WithContactIDs([]models.ContactID{testdata.Cathy.ID}) // call our master starter @@ -158,9 +158,9 @@ func TestClearConnections(t *testing.T) { client.callError = nil client.callID = ivr.CallID("call1") - err = HandleFlowStartBatch(ctx, rt, batch) + err = ivrtasks.HandleFlowStartBatch(ctx, rt, batch) assert.NoError(t, err) - testsuite.AssertQuery(t, db, + assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.ConnectionStatusWired, "call1", ).Returns(1) @@ -171,11 +171,11 @@ func TestClearConnections(t *testing.T) { ) // cleaning - err = clearStuckedChannelConnections(ctx, rt, "cleaner_test") + err = ivrtasks.ClearStuckChannelConnections(ctx, rt) assert.NoError(t, err) // status should be Failed - testsuite.AssertQuery(t, db, + assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.ConnectionStatusFailed, "call1", ).Returns(1) @@ -189,13 +189,13 @@ func TestUpdateMaxChannelsConnection(t *testing.T) { defer testsuite.Reset(testsuite.ResetAll) // register our mock client - ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider) + ivr.RegisterServiceType(models.ChannelType("ZZ"), NewMockProvider) //set max_concurrent_events to 1 db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID) //set max_concurrent_events to 0 - err := changeMaxConnectionsConfig(ctx, rt, "change_max_connections", "ZZ", 0) + err := ivrtasks.ChangeMaxConnectionsConfig(ctx, rt, "ZZ", 0) assert.NoError(t, err) var confStr string err = db.QueryRowx("SELECT config FROM channels_channel WHERE id = $1", testdata.TwilioChannel.ID).Scan(&confStr) @@ -206,7 +206,7 @@ func TestUpdateMaxChannelsConnection(t *testing.T) { assert.Equal(t, 0, int(conf["max_concurrent_events"].(float64))) // create a flow start for cathy - start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive). + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID). WithContactIDs([]models.ContactID{testdata.Cathy.ID}) // call our master starter err = starts.CreateFlowBatches(ctx, rt, start) @@ -221,13 +221,13 @@ func TestUpdateMaxChannelsConnection(t *testing.T) { client.callError = nil client.callID = ivr.CallID("call1") - err = HandleFlowStartBatch(ctx, rt, batch) + err = ivrtasks.HandleFlowStartBatch(ctx, rt, batch) assert.NoError(t, err) - testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2`, + assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2`, testdata.Cathy.ID, models.ConnectionStatusQueued).Returns(1) //set max_concurrent_events to 500 - err = changeMaxConnectionsConfig(ctx, rt, "change_max_connections", "ZZ", 500) + err = ivrtasks.ChangeMaxConnectionsConfig(ctx, rt, "ZZ", 500) assert.NoError(t, err) err = db.QueryRowx("SELECT config FROM channels_channel WHERE id = $1", testdata.TwilioChannel.ID).Scan(&confStr) assert.NoError(t, err) @@ -242,9 +242,9 @@ func TestUpdateMaxChannelsConnection(t *testing.T) { db.MustExec("SELECT pg_sleep(5)") - err = retryCalls(ctx, rt) + err = ivrtasks.RetryCalls(ctx, rt) assert.NoError(t, err) - testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2`, + assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2`, testdata.Cathy.ID, models.ConnectionStatusWired).Returns(1) } diff --git a/core/tasks/ivr/retryworker.go b/core/tasks/ivr/retryworker.go index fa4c49217..57ba94329 100644 --- a/core/tasks/ivr/retryworker.go +++ b/core/tasks/ivr/retryworker.go @@ -15,14 +15,14 @@ import ( type Job struct { Id int - conn *models.ChannelConnection + Conn *models.ChannelConnection } type JobResult struct { Output string } -func handleWork(id int, rt *runtime.Runtime, wg *sync.WaitGroup, jobChannel <-chan Job) { +func HandleWork(id int, rt *runtime.Runtime, wg *sync.WaitGroup, jobChannel <-chan Job) { defer wg.Done() lastExecutionTime := time.Now() minimumTimeBetweenEachExecution := time.Duration(math.Ceil(1e9 / float64(rt.Config.IVRRetryMaximumExecutionsPerSecond))) @@ -36,14 +36,14 @@ func handleWork(id int, rt *runtime.Runtime, wg *sync.WaitGroup, jobChannel <-ch logrus.Infof("Worker #%d not backing off \n", id) } lastExecutionTime = time.Now() - err := retryCall(id, rt, job.conn) + err := RetryCall(id, rt, job.Conn) if err != nil { logrus.Error(err) } } } -func retryCall(workerId int, rt *runtime.Runtime, conn *models.ChannelConnection) error { +func RetryCall(workerId int, rt *runtime.Runtime, conn *models.ChannelConnection) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() oa, err := models.GetOrgAssets(ctx, rt, conn.OrgID()) diff --git a/core/tasks/ivr/retryworker_test.go b/core/tasks/ivr/retryworker_test.go index fa0a94672..aee146de9 100644 --- a/core/tasks/ivr/retryworker_test.go +++ b/core/tasks/ivr/retryworker_test.go @@ -1,13 +1,15 @@ -package ivr +package ivr_test import ( "encoding/json" "sync" "testing" + "github.com/nyaruka/gocommon/dbutil/assertdb" "github.com/nyaruka/mailroom/core/ivr" "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/core/queue" + ivrtasks "github.com/nyaruka/mailroom/core/tasks/ivr" "github.com/nyaruka/mailroom/core/tasks/starts" "github.com/nyaruka/mailroom/testsuite" "github.com/nyaruka/mailroom/testsuite/testdata" @@ -23,13 +25,13 @@ func TestHandleWork(t *testing.T) { defer testsuite.Reset(testsuite.ResetAll) // register our mock client - ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider) + ivr.RegisterServiceType(models.ChannelType("ZZ"), NewMockProvider) // update our twilio channel to be of type 'ZZ' and set max_concurrent_events to 1 db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID) // create a flow start for cathy - start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive). + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID). WithContactIDs([]models.ContactID{testdata.Cathy.ID}) // call our master starter @@ -45,9 +47,9 @@ func TestHandleWork(t *testing.T) { client.callError = nil client.callID = ivr.CallID("call1") - err = HandleFlowStartBatch(ctx, rt, batch) + err = ivrtasks.HandleFlowStartBatch(ctx, rt, batch) assert.NoError(t, err) - testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1) // change our call to be errored instead of wired @@ -57,11 +59,11 @@ func TestHandleWork(t *testing.T) { assert.NoError(t, err) assert.Equal(t, len(conns), 1) - err = retryCall(0, rt, conns[0]) + err = ivrtasks.RetryCall(0, rt, conns[0]) assert.NoError(t, err) // should now be in wired state - testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1) // back to retry and make the channel inactive @@ -74,15 +76,15 @@ func TestHandleWork(t *testing.T) { assert.NoError(t, err) assert.Equal(t, len(conns), 1) - jobs := []Job{{Id: 1, conn: conns[0]}} + jobs := []ivrtasks.Job{{Id: 1, Conn: conns[0]}} var ( wg sync.WaitGroup - jobChannel = make(chan Job) + jobChannel = make(chan ivrtasks.Job) ) wg.Add(1) - go handleWork(0, rt, &wg, jobChannel) + go ivrtasks.HandleWork(0, rt, &wg, jobChannel) for _, job := range jobs { jobChannel <- job @@ -92,6 +94,6 @@ func TestHandleWork(t *testing.T) { wg.Wait() // this time should be failed - testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1) }