Skip to content

Commit

Permalink
fix ivr tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rasoro committed Jul 28, 2023
1 parent 5a72126 commit f1ba44f
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 50 deletions.
26 changes: 13 additions & 13 deletions core/tasks/ivr/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ func init() {
if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*time.Duration(rt.Config.IVRRetryTimeout))
defer cancel()
return retryCallsInWorkerPool(ctx, rt)
return RetryCallsInWorkerPool(ctx, rt)
}
return nil
})

mailroom.RegisterCron(clearIVRLock, time.Hour, false, clearStuckChannelConnections)
mailroom.RegisterCron(clearIVRLock, time.Hour, false, ClearStuckChannelConnections)

mailroom.RegisterCron(changeMaxConnNightLock, time.Minute*10, false, func(ctx context.Context, rt *runtime.Runtime) error {
currentHour := time.Now().In(location).Hour()
if currentHour >= rt.Config.IVRStopHour || currentHour < rt.Config.IVRStartHour {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
return changeMaxConnectionsConfig(ctx, rt, "TW", 0)
return ChangeMaxConnectionsConfig(ctx, rt, "TW", 0)
}
return nil
})
Expand All @@ -61,7 +61,7 @@ func init() {
if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
return changeMaxConnectionsConfig(ctx, rt, "TW", rt.Config.MaxConcurrentEvents)
return ChangeMaxConnectionsConfig(ctx, rt, "TW", rt.Config.MaxConcurrentEvents)
}
return nil
})
Expand All @@ -71,15 +71,15 @@ func init() {
if currentHour == rt.Config.IVRCancelCronStartHour {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20)
defer cancel()
return cancelCalls(ctx, rt)
return CancelCalls(ctx, rt)
}
return nil
})

}

// retryCallsInWorkerPoll looks for calls that need to be retried and retries then
func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error {
func RetryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error {
log := logrus.WithField("comp", "ivr_cron_retryer")
start := time.Now()

Expand All @@ -90,7 +90,7 @@ func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error {

var jobs []Job
for i := 0; i < len(conns); i++ {
jobs = append(jobs, Job{Id: i, conn: conns[i]})
jobs = append(jobs, Job{Id: i, Conn: conns[i]})
}

var (
Expand All @@ -101,7 +101,7 @@ func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error {
wg.Add(rt.Config.IVRRetryWorkers)

for i := 0; i < rt.Config.IVRRetryWorkers; i++ {
go handleWork(i, rt, &wg, jobChannel)
go HandleWork(i, rt, &wg, jobChannel)
}

for _, job := range jobs {
Expand All @@ -116,8 +116,8 @@ func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error {
return nil
}

// retryCalls looks for calls that need to be retried and retries them
func retryCalls(ctx context.Context, rt *runtime.Runtime) error {
// RetryCalls looks for calls that need to be retried and retries them
func RetryCalls(ctx context.Context, rt *runtime.Runtime) error {
log := logrus.WithField("comp", "ivr_cron_retryer")
start := time.Now()

Expand Down Expand Up @@ -183,7 +183,7 @@ func retryCalls(ctx context.Context, rt *runtime.Runtime) error {
return nil
}

func clearStuckChannelConnections(ctx context.Context, rt *runtime.Runtime) error {
func ClearStuckChannelConnections(ctx context.Context, rt *runtime.Runtime) error {
log := logrus.WithField("comp", "ivr_cron_cleaner")
start := time.Now()

Expand All @@ -205,7 +205,7 @@ func clearStuckChannelConnections(ctx context.Context, rt *runtime.Runtime) erro
return nil
}

func cancelCalls(ctx context.Context, rt *runtime.Runtime) error {
func CancelCalls(ctx context.Context, rt *runtime.Runtime) error {
log := logrus.WithField("comp", "ivr_cron_canceler")
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
Expand All @@ -226,7 +226,7 @@ func cancelCalls(ctx context.Context, rt *runtime.Runtime) error {
return nil
}

func changeMaxConnectionsConfig(ctx context.Context, rt *runtime.Runtime, channelType string, maxConcurrentEventsToSet int) error {
func ChangeMaxConnectionsConfig(ctx context.Context, rt *runtime.Runtime, channelType string, maxConcurrentEventsToSet int) error {
log := logrus.WithField("comp", "ivr_cron_change_max_connections")
start := time.Now()

Expand Down
44 changes: 22 additions & 22 deletions core/tasks/ivr/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ func TestRetryCallsInWorkerPool(t *testing.T) {
defer testsuite.Reset(testsuite.ResetAll)

// register our mock client
ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)
ivr.RegisterServiceType(models.ChannelType("ZZ"), NewMockProvider)

// update our twilio channel to be of type 'ZZ' and set max_concurrent_events to 1
db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)

// create a flow start for cathy
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID})

// call our master starter
Expand All @@ -104,31 +104,31 @@ func TestRetryCallsInWorkerPool(t *testing.T) {

client.callError = nil
client.callID = ivr.CallID("call1")
err = HandleFlowStartBatch(ctx, rt, batch)
err = ivrtasks.HandleFlowStartBatch(ctx, rt, batch)
assert.NoError(t, err)
testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)

// change our call to be errored instead of wired
db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)

err = retryCallsInWorkerPool(ctx, rt)
err = ivrtasks.RetryCallsInWorkerPool(ctx, rt)
assert.NoError(t, err)

// should now be in wired state
testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)

// back to retry and make the channel inactive
db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)
db.MustExec(`UPDATE channels_channel SET is_active = FALSE WHERE id = $1`, testdata.TwilioChannel.ID)

models.FlushCache()
err = retryCallsInWorkerPool(ctx, rt)
err = ivrtasks.RetryCallsInWorkerPool(ctx, rt)
assert.NoError(t, err)

// this time should be failed
testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1)
}

Expand All @@ -139,11 +139,11 @@ func TestClearConnections(t *testing.T) {

defer testsuite.Reset(testsuite.ResetAll)

ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)
ivr.RegisterServiceType(models.ChannelType("ZZ"), NewMockProvider)

db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)

start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID})

// call our master starter
Expand All @@ -158,9 +158,9 @@ func TestClearConnections(t *testing.T) {

client.callError = nil
client.callID = ivr.CallID("call1")
err = HandleFlowStartBatch(ctx, rt, batch)
err = ivrtasks.HandleFlowStartBatch(ctx, rt, batch)
assert.NoError(t, err)
testsuite.AssertQuery(t, db,
assertdb.Query(t, db,
`SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.ConnectionStatusWired, "call1",
).Returns(1)
Expand All @@ -171,11 +171,11 @@ func TestClearConnections(t *testing.T) {
)

// cleaning
err = clearStuckedChannelConnections(ctx, rt, "cleaner_test")
err = ivrtasks.ClearStuckChannelConnections(ctx, rt)
assert.NoError(t, err)

// status should be Failed
testsuite.AssertQuery(t, db,
assertdb.Query(t, db,
`SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.ConnectionStatusFailed, "call1",
).Returns(1)
Expand All @@ -189,13 +189,13 @@ func TestUpdateMaxChannelsConnection(t *testing.T) {
defer testsuite.Reset(testsuite.ResetAll)

// register our mock client
ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)
ivr.RegisterServiceType(models.ChannelType("ZZ"), NewMockProvider)

//set max_concurrent_events to 1
db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)

//set max_concurrent_events to 0
err := changeMaxConnectionsConfig(ctx, rt, "change_max_connections", "ZZ", 0)
err := ivrtasks.ChangeMaxConnectionsConfig(ctx, rt, "ZZ", 0)
assert.NoError(t, err)
var confStr string
err = db.QueryRowx("SELECT config FROM channels_channel WHERE id = $1", testdata.TwilioChannel.ID).Scan(&confStr)
Expand All @@ -206,7 +206,7 @@ func TestUpdateMaxChannelsConnection(t *testing.T) {
assert.Equal(t, 0, int(conf["max_concurrent_events"].(float64)))

// create a flow start for cathy
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID})
// call our master starter
err = starts.CreateFlowBatches(ctx, rt, start)
Expand All @@ -221,13 +221,13 @@ func TestUpdateMaxChannelsConnection(t *testing.T) {

client.callError = nil
client.callID = ivr.CallID("call1")
err = HandleFlowStartBatch(ctx, rt, batch)
err = ivrtasks.HandleFlowStartBatch(ctx, rt, batch)
assert.NoError(t, err)
testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2`,
assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2`,
testdata.Cathy.ID, models.ConnectionStatusQueued).Returns(1)

//set max_concurrent_events to 500
err = changeMaxConnectionsConfig(ctx, rt, "change_max_connections", "ZZ", 500)
err = ivrtasks.ChangeMaxConnectionsConfig(ctx, rt, "ZZ", 500)
assert.NoError(t, err)
err = db.QueryRowx("SELECT config FROM channels_channel WHERE id = $1", testdata.TwilioChannel.ID).Scan(&confStr)
assert.NoError(t, err)
Expand All @@ -242,9 +242,9 @@ func TestUpdateMaxChannelsConnection(t *testing.T) {

db.MustExec("SELECT pg_sleep(5)")

err = retryCalls(ctx, rt)
err = ivrtasks.RetryCalls(ctx, rt)
assert.NoError(t, err)

testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2`,
assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2`,
testdata.Cathy.ID, models.ConnectionStatusWired).Returns(1)
}
8 changes: 4 additions & 4 deletions core/tasks/ivr/retryworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (

type Job struct {
Id int
conn *models.ChannelConnection
Conn *models.ChannelConnection
}

type JobResult struct {
Output string
}

func handleWork(id int, rt *runtime.Runtime, wg *sync.WaitGroup, jobChannel <-chan Job) {
func HandleWork(id int, rt *runtime.Runtime, wg *sync.WaitGroup, jobChannel <-chan Job) {
defer wg.Done()
lastExecutionTime := time.Now()
minimumTimeBetweenEachExecution := time.Duration(math.Ceil(1e9 / float64(rt.Config.IVRRetryMaximumExecutionsPerSecond)))
Expand All @@ -36,14 +36,14 @@ func handleWork(id int, rt *runtime.Runtime, wg *sync.WaitGroup, jobChannel <-ch
logrus.Infof("Worker #%d not backing off \n", id)
}
lastExecutionTime = time.Now()
err := retryCall(id, rt, job.conn)
err := RetryCall(id, rt, job.Conn)
if err != nil {
logrus.Error(err)
}
}
}

func retryCall(workerId int, rt *runtime.Runtime, conn *models.ChannelConnection) error {
func RetryCall(workerId int, rt *runtime.Runtime, conn *models.ChannelConnection) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
oa, err := models.GetOrgAssets(ctx, rt, conn.OrgID())
Expand Down
24 changes: 13 additions & 11 deletions core/tasks/ivr/retryworker_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package ivr
package ivr_test

import (
"encoding/json"
"sync"
"testing"

"github.com/nyaruka/gocommon/dbutil/assertdb"
"github.com/nyaruka/mailroom/core/ivr"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
ivrtasks "github.com/nyaruka/mailroom/core/tasks/ivr"
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
Expand All @@ -23,13 +25,13 @@ func TestHandleWork(t *testing.T) {
defer testsuite.Reset(testsuite.ResetAll)

// register our mock client
ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)
ivr.RegisterServiceType(models.ChannelType("ZZ"), NewMockProvider)

// update our twilio channel to be of type 'ZZ' and set max_concurrent_events to 1
db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)

// create a flow start for cathy
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID})

// call our master starter
Expand All @@ -45,9 +47,9 @@ func TestHandleWork(t *testing.T) {

client.callError = nil
client.callID = ivr.CallID("call1")
err = HandleFlowStartBatch(ctx, rt, batch)
err = ivrtasks.HandleFlowStartBatch(ctx, rt, batch)
assert.NoError(t, err)
testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)

// change our call to be errored instead of wired
Expand All @@ -57,11 +59,11 @@ func TestHandleWork(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, len(conns), 1)

err = retryCall(0, rt, conns[0])
err = ivrtasks.RetryCall(0, rt, conns[0])
assert.NoError(t, err)

// should now be in wired state
testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)

// back to retry and make the channel inactive
Expand All @@ -74,15 +76,15 @@ func TestHandleWork(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, len(conns), 1)

jobs := []Job{{Id: 1, conn: conns[0]}}
jobs := []ivrtasks.Job{{Id: 1, Conn: conns[0]}}

var (
wg sync.WaitGroup
jobChannel = make(chan Job)
jobChannel = make(chan ivrtasks.Job)
)
wg.Add(1)

go handleWork(0, rt, &wg, jobChannel)
go ivrtasks.HandleWork(0, rt, &wg, jobChannel)

for _, job := range jobs {
jobChannel <- job
Expand All @@ -92,6 +94,6 @@ func TestHandleWork(t *testing.T) {
wg.Wait()

// this time should be failed
testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1)
}

0 comments on commit f1ba44f

Please sign in to comment.