From 115cef43350fed27ffeff9997da6f9961b05104a Mon Sep 17 00:00:00 2001 From: Rafael Soares Date: Mon, 25 Jul 2022 18:49:23 -0300 Subject: [PATCH 1/8] add ivr crons adaptations and limitations --- core/ivr/ivr.go | 25 +++-- core/tasks/ivr/cron.go | 178 ++++++++++++++++++++++++++++++++++-- core/tasks/ivr/cron_test.go | 117 ++++++++++++++++++++++++ 3 files changed, 300 insertions(+), 20 deletions(-) diff --git a/core/ivr/ivr.go b/core/ivr/ivr.go index 2cb3ee09a..c8ed8c240 100644 --- a/core/ivr/ivr.go +++ b/core/ivr/ivr.go @@ -213,22 +213,19 @@ func RequestCallStartForConnection(ctx context.Context, rt *runtime.Runtime, cha if maxCalls != "" { maxCalls, _ := strconv.Atoi(maxCalls) - // max calls is set, lets see how many are currently active on this channel - if maxCalls > 0 { - count, err := models.ActiveChannelConnectionCount(ctx, rt.DB, channel.ID()) - if err != nil { - return errors.Wrapf(err, "error finding number of active channel connections") - } + count, err := models.ActiveChannelConnectionCount(ctx, rt.DB, channel.ID()) + if err != nil { + return errors.Wrapf(err, "error finding number of active channel connections") + } - // we are at max calls, do not move on - if count >= maxCalls { - logrus.WithField("channel_id", channel.ID()).Info("call being queued, max concurrent reached") - err := conn.MarkThrottled(ctx, rt.DB, time.Now()) - if err != nil { - return errors.Wrapf(err, "error marking connection as throttled") - } - return nil + // we are at max calls, do not move on + if count >= maxCalls { + logrus.WithField("channel_id", channel.ID()).Info("call being queued, max concurrent reached") + err := conn.MarkThrottled(ctx, rt.DB, time.Now()) + if err != nil { + return errors.Wrapf(err, "error marking connection as throttled") } + return nil } } diff --git a/core/tasks/ivr/cron.go b/core/tasks/ivr/cron.go index 09dd38e99..cbedb8677 100644 --- a/core/tasks/ivr/cron.go +++ b/core/tasks/ivr/cron.go @@ -2,6 +2,7 @@ package ivr import ( "context" + "encoding/json" "sync" "time" @@ -11,14 +12,24 @@ import ( "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/runtime" "github.com/nyaruka/mailroom/utils/cron" + "github.com/nyaruka/mailroom/utils/dbutil" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) const ( - retryIVRLock = "retry_ivr_calls" - expireIVRLock = "expire_ivr_calls" + retryIVRLock = "retry_ivr_calls" + expireIVRLock = "expire_ivr_calls" + clearIVRLock = "clear_ivr_connections" + changeMaxConnNightLock = "change_ivr_max_conn_night" + changeMaxConnDayLock = "change_ivr_max_conn_day" + clearQueuedPendingLock = "clear_ivr_queued_pending" +) + +const ( + startHour = 6 + finishHour = 22 ) func init() { @@ -27,22 +38,64 @@ func init() { // StartIVRCron starts our cron job of retrying errored calls func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error { + + location, err := time.LoadLocation("Asia/Kolkata") + if err != nil { + return err + } + cron.Start(quit, rt, retryIVRLock, time.Minute, false, func() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) - defer cancel() - return retryCalls(ctx, rt) + currentHour := time.Now().In(location).Hour() + if currentHour >= startHour && currentHour < finishHour { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + defer cancel() + return retryCalls(ctx, rt) + } + return nil }, ) cron.Start(quit, rt, expireIVRLock, time.Minute, false, func() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) defer cancel() return expireCalls(ctx, rt) }, ) + cron.Start(quit, rt, clearIVRLock, time.Hour, false, + func() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + defer cancel() + return clearStuckedChannelConnections(ctx, rt, clearIVRLock) + }, + ) + + cron.Start(quit, rt, changeMaxConnNightLock, time.Minute*10, false, + func() error { + currentHour := time.Now().In(location).Hour() + if currentHour >= 22 || currentHour < 6 { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + defer cancel() + return changeMaxConnectionsConfig(ctx, rt, changeMaxConnNightLock, "TW", 0) + } + return nil + }, + ) + + cron.Start(quit, rt, changeMaxConnDayLock, time.Minute*10, false, + func() error { + currentHour := time.Now().In(location).Hour() + if currentHour >= 6 && currentHour < 22 { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + defer cancel() + return changeMaxConnectionsConfig(ctx, rt, changeMaxConnDayLock, "TW", 500) + } + return nil + }, + ) + return nil } @@ -168,6 +221,111 @@ func expireCalls(ctx context.Context, rt *runtime.Runtime) error { return nil } +func clearStuckedChannelConnections(ctx context.Context, rt *runtime.Runtime, lockName string) error { + log := logrus.WithField("comp", "ivr_cron_cleaner") + start := time.Now() + + ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + defer cancel() + + result, err := rt.DB.ExecContext(ctx, clearStuckedChanelConnectionsSQL) + if err != nil { + return errors.Wrapf(err, "error cleaning stucked connections") + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return errors.Wrapf(err, "error getting rows affected on cleaning stucked connections") + } + if rowsAffected > 0 { + log.WithField("count", rowsAffected).WithField("elapsed", time.Since(start)).Info("stucked channel connections") + } + return nil +} + +func changeMaxConnectionsConfig(ctx context.Context, rt *runtime.Runtime, lockName string, channelType string, maxConcurrentEventsToSet int) error { + log := logrus.WithField("comp", "ivr_cron_change_max_connections") + start := time.Now() + + ctx, cancel := context.WithTimeout(ctx, time.Minute*5) + defer cancel() + + rows, err := rt.DB.QueryxContext(ctx, selectIVRTWTypeChannelsSQL, channelType) + if err != nil { + return errors.Wrapf(err, "error querying for channels") + } + defer rows.Close() + + ivrChannels := make([]Channel, 0) + + for rows.Next() { + ch := Channel{} + err := dbutil.ReadJSONRow(rows, &ch) + if err != nil { + return errors.Wrapf(err, "error scanning channel") + } + + ivrChannels = append(ivrChannels, ch) + } + + for _, ch := range ivrChannels { + + if ch.Config["max_concurrent_events"] == maxConcurrentEventsToSet { + return nil + } + + ch.Config["max_concurrent_events"] = maxConcurrentEventsToSet + + configJSON, err := json.Marshal(ch.Config) + if err != nil { + return errors.Wrapf(err, "error marshalling channels config") + } + + _, err = rt.DB.ExecContext(ctx, updateIVRChannelConfigSQL, string(configJSON), ch.ID) + if err != nil { + return errors.Wrapf(err, "error updating channels config") + } + } + + log.WithField("count", len(ivrChannels)).WithField("elapsed", time.Since(start)).Info("channels that have max_concurrent_events updated") + + return nil +} + +const selectIVRTWTypeChannelsSQL = ` + SELECT ROW_TO_JSON(r) FROM ( + SELECT + c.id, + c.uuid, + c.channel_type, + COALESCE(c.config, '{}')::json as config, + c.is_active + FROM + channels_channel as c + WHERE + c.channel_type = $1 AND + c.is_active = TRUE ) r; +` + +const updateIVRChannelConfigSQL = ` + UPDATE channels_channel + SET config = $1 + WHERE id = $2 +` + +const clearStuckedChanelConnectionsSQL = ` + UPDATE channels_channelconnection + SET status = 'F' + WHERE id in ( + SELECT id + FROM channels_channelconnection + WHERE + (status = 'W' OR status = 'R' OR status = 'I') AND + modified_on < NOW() - INTERVAL '2 DAYS' + LIMIT 100 + ) +` + const selectExpiredRunsSQL = ` SELECT fr.id as run_id, @@ -201,3 +359,11 @@ type RunExpiration struct { ExpiresOn time.Time `db:"expires_on"` ConnectionID models.ConnectionID `db:"connection_id"` } + +type Channel struct { + ID int `db:"id" json:"id,omitempty"` + UUID string `db:"uuid" json:"uuid,omitempty"` + ChannelType string `db:"channel_type" json:"channel_type,omitempty"` + Config map[string]interface{} `db:"config" json:"config,omitempty"` + IsActive bool `db:"is_active" json:"is_active,omitempty"` +} diff --git a/core/tasks/ivr/cron_test.go b/core/tasks/ivr/cron_test.go index 614f0ae83..add1ac1b3 100644 --- a/core/tasks/ivr/cron_test.go +++ b/core/tasks/ivr/cron_test.go @@ -72,3 +72,120 @@ func TestRetries(t *testing.T) { testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1) } + +func TestClearConnections(t *testing.T) { + ctx, rt, db, rp := testsuite.Get() + rc := rp.Get() + defer rc.Close() + + defer testsuite.Reset(testsuite.ResetAll) + + ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider) + + db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID) + + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive). + WithContactIDs([]models.ContactID{testdata.Cathy.ID}) + + // call our master starter + err := starts.CreateFlowBatches(ctx, rt, start) + assert.NoError(t, err) + + task, err := queue.PopNextTask(rc, queue.HandlerQueue) + assert.NoError(t, err) + batch := &models.FlowStartBatch{} + err = json.Unmarshal(task.Task, batch) + assert.NoError(t, err) + + client.callError = nil + client.callID = ivr.CallID("call1") + err = HandleFlowStartBatch(ctx, rt, batch) + assert.NoError(t, err) + testsuite.AssertQuery(t, db, + `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + testdata.Cathy.ID, models.ConnectionStatusWired, "call1", + ).Returns(1) + + // update channel connection to be modified_on 2 days ago + db.MustExec(`UPDATE channels_channelconnection SET modified_on = NOW() - INTERVAL '2 DAY' WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + testdata.Cathy.ID, models.ConnectionStatusWired, "call1", + ) + + // cleaning + err = clearStuckedChannelConnections(ctx, rt, "cleaner_test") + assert.NoError(t, err) + + // status should be Failed + testsuite.AssertQuery(t, db, + `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + testdata.Cathy.ID, models.ConnectionStatusFailed, "call1", + ).Returns(1) +} + +func TestUpdateMaxChannelsConnection(t *testing.T) { + ctx, rt, db, rp := testsuite.Get() + rc := rp.Get() + defer rc.Close() + + defer testsuite.Reset(testsuite.ResetAll) + + // register our mock client + ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider) + + //set max_concurrent_events to 1 + db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID) + + //set max_concurrent_events to 0 + err := changeMaxConnectionsConfig(ctx, rt, "change_max_connections", "ZZ", 0) + assert.NoError(t, err) + var confStr string + err = db.QueryRowx("SELECT config FROM channels_channel WHERE id = $1", testdata.TwilioChannel.ID).Scan(&confStr) + assert.NoError(t, err) + conf := make(map[string]interface{}) + err = json.Unmarshal([]byte(confStr), &conf) + assert.NoError(t, err) + assert.Equal(t, 0, int(conf["max_concurrent_events"].(float64))) + + // create a flow start for cathy + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive). + WithContactIDs([]models.ContactID{testdata.Cathy.ID}) + // call our master starter + err = starts.CreateFlowBatches(ctx, rt, start) + assert.NoError(t, err) + + // should have one task in our ivr queue + task, err := queue.PopNextTask(rc, queue.HandlerQueue) + assert.NoError(t, err) + batch := &models.FlowStartBatch{} + err = json.Unmarshal(task.Task, batch) + assert.NoError(t, err) + + client.callError = nil + client.callID = ivr.CallID("call1") + err = HandleFlowStartBatch(ctx, rt, batch) + assert.NoError(t, err) + testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2`, + testdata.Cathy.ID, models.ConnectionStatusQueued).Returns(1) + + //set max_concurrent_events to 500 + err = changeMaxConnectionsConfig(ctx, rt, "change_max_connections", "ZZ", 500) + assert.NoError(t, err) + err = db.QueryRowx("SELECT config FROM channels_channel WHERE id = $1", testdata.TwilioChannel.ID).Scan(&confStr) + assert.NoError(t, err) + conf2 := make(map[string]interface{}) + err = json.Unmarshal([]byte(confStr), &conf2) + assert.NoError(t, err) + assert.Equal(t, 500, int(conf2["max_concurrent_events"].(float64))) + + // change our call to next attempt to be now minus 1 minute + db.MustExec(`UPDATE channels_channelconnection SET next_attempt = NOW() - INTERVAL '1 MINUTE' WHERE contact_id = $1;`, testdata.Cathy.ID) + assert.NoError(t, err) + + db.MustExec("SELECT pg_sleep(5)") + + err = retryCalls(ctx, rt) + assert.NoError(t, err) + + testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2`, + testdata.Cathy.ID, models.ConnectionStatusWired).Returns(1) +} From 74d09efe6de8db1bed89471c82aa06823289055c Mon Sep 17 00:00:00 2001 From: Rafael Soares Date: Tue, 20 Sep 2022 18:36:47 -0300 Subject: [PATCH 2/8] add config from envvar for ivr max concurrent events --- core/tasks/ivr/cron.go | 2 +- runtime/config.go | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/tasks/ivr/cron.go b/core/tasks/ivr/cron.go index cbedb8677..ff8efea8f 100644 --- a/core/tasks/ivr/cron.go +++ b/core/tasks/ivr/cron.go @@ -90,7 +90,7 @@ func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error if currentHour >= 6 && currentHour < 22 { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) defer cancel() - return changeMaxConnectionsConfig(ctx, rt, changeMaxConnDayLock, "TW", 500) + return changeMaxConnectionsConfig(ctx, rt, changeMaxConnDayLock, "TW", rt.Config.MaxConcurrentEvents) } return nil }, diff --git a/runtime/config.go b/runtime/config.go index 65f9e037e..1d78e000c 100644 --- a/runtime/config.go +++ b/runtime/config.go @@ -67,10 +67,11 @@ type Config struct { FCMKey string `help:"the FCM API key used to notify Android relayers to sync"` MailgunSigningKey string `help:"the signing key used to validate requests from mailgun"` - InstanceName string `help:"the unique name of this instance used for analytics"` - LogLevel string `help:"the logging level courier should use"` - UUIDSeed int `help:"seed to use for UUID generation in a testing environment"` - Version string `help:"the version of this mailroom install"` + InstanceName string `help:"the unique name of this instance used for analytics"` + LogLevel string `help:"the logging level courier should use"` + UUIDSeed int `help:"seed to use for UUID generation in a testing environment"` + Version string `help:"the version of this mailroom install"` + MaxConcurrentEvents int `help:"ivr max concurrent events limit to set when on day time period of activity of ivr channels"` } // NewDefaultConfig returns a new default configuration object @@ -121,6 +122,8 @@ func NewDefaultConfig() *Config { LogLevel: "error", UUIDSeed: 0, Version: "Dev", + + MaxConcurrentEvents: 1500, } } From 32da61c5185885f0ec59296ffbe4420837f2fe25 Mon Sep 17 00:00:00 2001 From: Rafael Soares Date: Fri, 6 Jan 2023 10:52:16 -0300 Subject: [PATCH 3/8] move ivr cron constants to configs --- core/tasks/ivr/cron.go | 13 ++++--------- runtime/config.go | 15 +++++++++++---- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/core/tasks/ivr/cron.go b/core/tasks/ivr/cron.go index ff8efea8f..cccd3c484 100644 --- a/core/tasks/ivr/cron.go +++ b/core/tasks/ivr/cron.go @@ -27,11 +27,6 @@ const ( clearQueuedPendingLock = "clear_ivr_queued_pending" ) -const ( - startHour = 6 - finishHour = 22 -) - func init() { mailroom.AddInitFunction(StartIVRCron) } @@ -39,7 +34,7 @@ func init() { // StartIVRCron starts our cron job of retrying errored calls func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error { - location, err := time.LoadLocation("Asia/Kolkata") + location, err := time.LoadLocation(rt.Config.IVRTimeZone) if err != nil { return err } @@ -47,7 +42,7 @@ func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error cron.Start(quit, rt, retryIVRLock, time.Minute, false, func() error { currentHour := time.Now().In(location).Hour() - if currentHour >= startHour && currentHour < finishHour { + if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) defer cancel() return retryCalls(ctx, rt) @@ -75,7 +70,7 @@ func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error cron.Start(quit, rt, changeMaxConnNightLock, time.Minute*10, false, func() error { currentHour := time.Now().In(location).Hour() - if currentHour >= 22 || currentHour < 6 { + if currentHour >= rt.Config.IVRStopHour || currentHour < rt.Config.IVRStartHour { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) defer cancel() return changeMaxConnectionsConfig(ctx, rt, changeMaxConnNightLock, "TW", 0) @@ -87,7 +82,7 @@ func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error cron.Start(quit, rt, changeMaxConnDayLock, time.Minute*10, false, func() error { currentHour := time.Now().In(location).Hour() - if currentHour >= 6 && currentHour < 22 { + if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) defer cancel() return changeMaxConnectionsConfig(ctx, rt, changeMaxConnDayLock, "TW", rt.Config.MaxConcurrentEvents) diff --git a/runtime/config.go b/runtime/config.go index 1d78e000c..ba153bf22 100644 --- a/runtime/config.go +++ b/runtime/config.go @@ -67,11 +67,15 @@ type Config struct { FCMKey string `help:"the FCM API key used to notify Android relayers to sync"` MailgunSigningKey string `help:"the signing key used to validate requests from mailgun"` - InstanceName string `help:"the unique name of this instance used for analytics"` - LogLevel string `help:"the logging level courier should use"` - UUIDSeed int `help:"seed to use for UUID generation in a testing environment"` - Version string `help:"the version of this mailroom install"` + InstanceName string `help:"the unique name of this instance used for analytics"` + LogLevel string `help:"the logging level courier should use"` + UUIDSeed int `help:"seed to use for UUID generation in a testing environment"` + Version string `help:"the version of this mailroom install"` + MaxConcurrentEvents int `help:"ivr max concurrent events limit to set when on day time period of activity of ivr channels"` + IVRStartHour int `help:"ivr start hour"` + IVRStopHour int `help:"ivr stop hour"` + IVRTimeZone string `help:"ivr time zone"` } // NewDefaultConfig returns a new default configuration object @@ -124,6 +128,9 @@ func NewDefaultConfig() *Config { Version: "Dev", MaxConcurrentEvents: 1500, + IVRStartHour: 8, + IVRStopHour: 21, + IVRTimeZone: "Asia/Kolkata", } } From 1dbe0535cf31e5ba246223a90c4d84c50fe6ce67 Mon Sep 17 00:00:00 2001 From: Rafael Soares Date: Wed, 11 Jan 2023 09:44:32 -0300 Subject: [PATCH 4/8] add ivr cron for cancel queued ivr calls --- core/tasks/ivr/cron.go | 45 ++++++++++++++++++++++++++++++++++++++++++ runtime/config.go | 18 +++++++++-------- 2 files changed, 55 insertions(+), 8 deletions(-) diff --git a/core/tasks/ivr/cron.go b/core/tasks/ivr/cron.go index cccd3c484..cb0679f29 100644 --- a/core/tasks/ivr/cron.go +++ b/core/tasks/ivr/cron.go @@ -25,6 +25,7 @@ const ( changeMaxConnNightLock = "change_ivr_max_conn_night" changeMaxConnDayLock = "change_ivr_max_conn_day" clearQueuedPendingLock = "clear_ivr_queued_pending" + cancelIVRCallsLock = "cancel_ivr_calls" ) func init() { @@ -91,6 +92,18 @@ func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error }, ) + cron.Start(quit, rt, cancelIVRCallsLock, time.Hour*1, false, + func() error { + currentHour := time.Now().In(location).Hour() + if currentHour == rt.Config.IVRCancelCronStartHour { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20) + defer cancel() + return cancelCalls(ctx, rt, cancelIVRCallsLock) + } + return nil + }, + ) + return nil } @@ -238,6 +251,27 @@ func clearStuckedChannelConnections(ctx context.Context, rt *runtime.Runtime, lo return nil } +func cancelCalls(ctx context.Context, rt *runtime.Runtime, lockName string) error { + log := logrus.WithField("comp", "ivr_cron_canceler") + start := time.Now() + ctx, cancel := context.WithTimeout(ctx, time.Minute*15) + defer cancel() + + result, err := rt.DB.ExecContext(ctx, cancelQueuedChannelConnectionsSQL) + if err != nil { + return errors.Wrapf(err, "error canceling remaining connection calls") + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return errors.Wrapf(err, "error getting rows affected on cleaning stucked connections") + } + if rowsAffected > 0 { + log.WithField("count", rowsAffected).WithField("elapsed", time.Since(start)).Info("stucked channel connections") + } + return nil +} + func changeMaxConnectionsConfig(ctx context.Context, rt *runtime.Runtime, lockName string, channelType string, maxConcurrentEventsToSet int) error { log := logrus.WithField("comp", "ivr_cron_change_max_connections") start := time.Now() @@ -308,6 +342,17 @@ const updateIVRChannelConfigSQL = ` WHERE id = $2 ` +const cancelQueuedChannelConnectionsSQL = ` + UPDATE channels_channelconnection + SET status = 'F' + WHERE id in ( + SELECT id + FROM channels_channelconnection + WHERE + (status = 'Q' OR status = 'E' OR status = 'P') + ) +` + const clearStuckedChanelConnectionsSQL = ` UPDATE channels_channelconnection SET status = 'F' diff --git a/runtime/config.go b/runtime/config.go index ba153bf22..a5d78d39a 100644 --- a/runtime/config.go +++ b/runtime/config.go @@ -72,10 +72,11 @@ type Config struct { UUIDSeed int `help:"seed to use for UUID generation in a testing environment"` Version string `help:"the version of this mailroom install"` - MaxConcurrentEvents int `help:"ivr max concurrent events limit to set when on day time period of activity of ivr channels"` - IVRStartHour int `help:"ivr start hour"` - IVRStopHour int `help:"ivr stop hour"` - IVRTimeZone string `help:"ivr time zone"` + MaxConcurrentEvents int `help:"ivr max concurrent events limit to set when on day time period of activity of ivr channels"` + IVRStartHour int `help:"ivr start hour"` + IVRStopHour int `help:"ivr stop hour"` + IVRTimeZone string `help:"ivr time zone"` + IVRCancelCronStartHour int `help:"the cron hour to start cancel ivr calls queued"` } // NewDefaultConfig returns a new default configuration object @@ -127,10 +128,11 @@ func NewDefaultConfig() *Config { UUIDSeed: 0, Version: "Dev", - MaxConcurrentEvents: 1500, - IVRStartHour: 8, - IVRStopHour: 21, - IVRTimeZone: "Asia/Kolkata", + MaxConcurrentEvents: 1500, + IVRStartHour: 8, + IVRStopHour: 21, + IVRTimeZone: "Asia/Kolkata", + IVRCancelCronStartHour: 22, } } From 456d56a957a2441c85ae97abbe5a8b09202e7d20 Mon Sep 17 00:00:00 2001 From: Rafael Soares Date: Wed, 18 Jan 2023 15:48:27 -0300 Subject: [PATCH 5/8] add config limit for ivr connections amount in retry cron --- core/tasks/ivr/cron.go | 2 +- runtime/config.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/core/tasks/ivr/cron.go b/core/tasks/ivr/cron.go index cb0679f29..043ff0898 100644 --- a/core/tasks/ivr/cron.go +++ b/core/tasks/ivr/cron.go @@ -116,7 +116,7 @@ func retryCalls(ctx context.Context, rt *runtime.Runtime) error { ctx, cancel := context.WithTimeout(ctx, time.Minute*10) defer cancel() - conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, 200) + conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, rt.Config.IVRConnRetryLimit) if err != nil { return errors.Wrapf(err, "error loading connections to retry") } diff --git a/runtime/config.go b/runtime/config.go index a5d78d39a..5a7815957 100644 --- a/runtime/config.go +++ b/runtime/config.go @@ -77,6 +77,7 @@ type Config struct { IVRStopHour int `help:"ivr stop hour"` IVRTimeZone string `help:"ivr time zone"` IVRCancelCronStartHour int `help:"the cron hour to start cancel ivr calls queued"` + IVRConnRetryLimit int `help:"limit connection that be retryed"` } // NewDefaultConfig returns a new default configuration object @@ -133,6 +134,7 @@ func NewDefaultConfig() *Config { IVRStopHour: 21, IVRTimeZone: "Asia/Kolkata", IVRCancelCronStartHour: 22, + IVRConnRetryLimit: 500, } } From 48b5fb9abe41fc9646716c72f67a0251488f63d9 Mon Sep 17 00:00:00 2001 From: Rafael Soares Date: Wed, 18 Jan 2023 19:47:30 -0300 Subject: [PATCH 6/8] add throttling in ivr call start batch --- core/tasks/ivr/worker.go | 14 +++++++++++++- runtime/config.go | 28 ++++++++++++++++------------ 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/core/tasks/ivr/worker.go b/core/tasks/ivr/worker.go index 57e118b26..121d7a754 100644 --- a/core/tasks/ivr/worker.go +++ b/core/tasks/ivr/worker.go @@ -3,6 +3,7 @@ package ivr import ( "context" "encoding/json" + "math" "time" "github.com/nyaruka/mailroom" @@ -34,7 +35,7 @@ func handleFlowStartTask(ctx context.Context, rt *runtime.Runtime, task *queue.T // HandleFlowStartBatch starts a batch of contacts in an IVR flow func HandleFlowStartBatch(bg context.Context, rt *runtime.Runtime, batch *models.FlowStartBatch) error { - ctx, cancel := context.WithTimeout(bg, time.Minute*5) + ctx, cancel := context.WithTimeout(bg, time.Minute*time.Duration(rt.Config.IVRFLowStartBatchTimeout)) defer cancel() // contacts we will exclude either because they are in a flow or have already been in this one @@ -84,8 +85,19 @@ func HandleFlowStartBatch(bg context.Context, rt *runtime.Runtime, batch *models return errors.Wrapf(err, "error loading contacts") } + lastExecutionTime := time.Now() + minimumTimeBetweenEachExecution := time.Duration(math.Ceil(1e9 / float64(rt.Config.IVRFlowStartBatchExecutionsPerSecond))) + // for each contacts, request a call start for _, contact := range contacts { + nextExecutionTime := -(time.Since(lastExecutionTime) - minimumTimeBetweenEachExecution) + if nextExecutionTime > 0 { + logrus.WithFields(logrus.Fields{ + "elapsed": time.Since(time.Now()), + }).Info("backing off call start for %s", nextExecutionTime) + time.Sleep(nextExecutionTime) + } + start := time.Now() ctx, cancel := context.WithTimeout(bg, time.Minute) diff --git a/runtime/config.go b/runtime/config.go index 5a7815957..8d6278c20 100644 --- a/runtime/config.go +++ b/runtime/config.go @@ -72,12 +72,14 @@ type Config struct { UUIDSeed int `help:"seed to use for UUID generation in a testing environment"` Version string `help:"the version of this mailroom install"` - MaxConcurrentEvents int `help:"ivr max concurrent events limit to set when on day time period of activity of ivr channels"` - IVRStartHour int `help:"ivr start hour"` - IVRStopHour int `help:"ivr stop hour"` - IVRTimeZone string `help:"ivr time zone"` - IVRCancelCronStartHour int `help:"the cron hour to start cancel ivr calls queued"` - IVRConnRetryLimit int `help:"limit connection that be retryed"` + MaxConcurrentEvents int `help:"ivr max concurrent events limit to set when on day time period of activity of ivr channels"` + IVRStartHour int `help:"ivr start hour"` + IVRStopHour int `help:"ivr stop hour"` + IVRTimeZone string `help:"ivr time zone"` + IVRCancelCronStartHour int `help:"the cron hour to start cancel ivr calls queued"` + IVRConnRetryLimit int `help:"limit connection that be retryed"` + IVRFLowStartBatchTimeout int `help:"timeout of flow start batch"` + IVRFlowStartBatchExecutionsPerSecond int `help:"executions per second of flow start batch calls"` } // NewDefaultConfig returns a new default configuration object @@ -129,12 +131,14 @@ func NewDefaultConfig() *Config { UUIDSeed: 0, Version: "Dev", - MaxConcurrentEvents: 1500, - IVRStartHour: 8, - IVRStopHour: 21, - IVRTimeZone: "Asia/Kolkata", - IVRCancelCronStartHour: 22, - IVRConnRetryLimit: 500, + MaxConcurrentEvents: 1500, + IVRStartHour: 8, + IVRStopHour: 21, + IVRTimeZone: "Asia/Kolkata", + IVRCancelCronStartHour: 22, + IVRConnRetryLimit: 500, + IVRFLowStartBatchTimeout: 10, + IVRFlowStartBatchExecutionsPerSecond: 50, } } From bcda6281d3ce6218497a868b553e3ecd49e364fa Mon Sep 17 00:00:00 2001 From: Rafael Soares Date: Fri, 20 Jan 2023 20:30:46 -0300 Subject: [PATCH 7/8] add ivr retry worker pool and throttling --- core/tasks/ivr/cron.go | 43 +++++++++++++++++++- core/tasks/ivr/retryworker.go | 75 +++++++++++++++++++++++++++++++++++ core/tasks/ivr/worker.go | 2 +- runtime/config.go | 16 ++++++-- 4 files changed, 130 insertions(+), 6 deletions(-) create mode 100644 core/tasks/ivr/retryworker.go diff --git a/core/tasks/ivr/cron.go b/core/tasks/ivr/cron.go index 043ff0898..1a4769ba7 100644 --- a/core/tasks/ivr/cron.go +++ b/core/tasks/ivr/cron.go @@ -46,7 +46,7 @@ func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) defer cancel() - return retryCalls(ctx, rt) + return retryCallsInWorkerPool(ctx, rt) } return nil }, @@ -107,6 +107,47 @@ func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error return nil } +// retryCallsInWorkerPoll looks for calls that need to be retried and retries then +func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error { + log := logrus.WithField("comp", "ivr_cron_retryer") + start := time.Now() + + ctx, cancel := context.WithTimeout(ctx, time.Minute*10) + defer cancel() + + conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, rt.Config.IVRConnRetryLimit) + if err != nil { + return errors.Wrapf(err, "error loading connections to retry") + } + + var jobs []Job + for i := 0; i < len(conns); i++ { + jobs = append(jobs, Job{Id: i, conn: conns[i]}) + } + + var ( + wg sync.WaitGroup + jobChannel = make(chan Job) + ) + + wg.Add(rt.Config.IVRRetryWorkers) + + for i := 0; i < rt.Config.IVRRetryWorkers; i++ { + go handleWork(i, rt, &wg, jobChannel) + } + + for _, job := range jobs { + jobChannel <- job + } + + close(jobChannel) + wg.Wait() + + log.WithField("count", len(conns)).WithField("elapsed", time.Since(start)).Info("retried errored calls") + + return nil +} + // retryCalls looks for calls that need to be retried and retries them func retryCalls(ctx context.Context, rt *runtime.Runtime) error { log := logrus.WithField("comp", "ivr_cron_retryer") diff --git a/core/tasks/ivr/retryworker.go b/core/tasks/ivr/retryworker.go new file mode 100644 index 000000000..b2bb449e0 --- /dev/null +++ b/core/tasks/ivr/retryworker.go @@ -0,0 +1,75 @@ +package ivr + +import ( + "context" + "fmt" + "math" + "sync" + "time" + + "github.com/nyaruka/mailroom/core/ivr" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/runtime" + "github.com/sirupsen/logrus" +) + +type Job struct { + Id int + conn *models.ChannelConnection +} + +type JobResult struct { + Output string +} + +func handleWork(id int, rt *runtime.Runtime, wg *sync.WaitGroup, jobChannel <-chan Job) { + defer wg.Done() + lastExecutionTime := time.Now() + minimumTimeBetweenEachExecution := time.Duration(math.Ceil(1e9 / float64(rt.Config.IVRRetryMaximumExecutionsPerSecond))) + + for job := range jobChannel { + timeUntilNextExecution := -(time.Since(lastExecutionTime) - minimumTimeBetweenEachExecution) + if timeUntilNextExecution > 0 { + fmt.Printf("Worker #%d backing off for %s\n", id, timeUntilNextExecution) + time.Sleep(timeUntilNextExecution) + } else { + fmt.Printf("Worker #%d not backing off \n", id) + } + lastExecutionTime = time.Now() + retryCall(id, rt, job.conn) + } +} + +func retryCall(workerId int, rt *runtime.Runtime, conn *models.ChannelConnection) JobResult { + log := logrus.WithField("connection_id", conn.ID()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + defer cancel() + oa, err := models.GetOrgAssets(ctx, rt, conn.OrgID()) + if err != nil { + log.WithError(err).WithField("org_id", conn.OrgID()).Error("erroro loading org") + return JobResult{Output: "Fail"} + } + + channel := oa.ChannelByID(conn.ChannelID()) + if channel == nil { + err = models.UpdateChannelConnectionStatuses(ctx, rt.DB, []models.ConnectionID{conn.ID()}, models.ConnectionStatusFailed) + if err != nil { + log.WithError(err).WithField("channel_id", conn.ChannelID()).Error("error marking call as failed due to missing channel") + } + return JobResult{Output: "Fail"} + } + + urn, err := models.URNForID(ctx, rt.DB, oa, conn.ContactURNID()) + if err != nil { + log.WithError(err).WithField("urn_id", conn.ContactURNID()).Error("unable to load contact urn") + return JobResult{Output: "Fail"} + } + + err = ivr.RequestCallStartForConnection(ctx, rt, channel, urn, conn) + if err != nil { + log.WithError(err).Error(err) + return JobResult{Output: "Fail"} + } + + return JobResult{Output: "Success"} +} diff --git a/core/tasks/ivr/worker.go b/core/tasks/ivr/worker.go index 121d7a754..e45030959 100644 --- a/core/tasks/ivr/worker.go +++ b/core/tasks/ivr/worker.go @@ -35,7 +35,7 @@ func handleFlowStartTask(ctx context.Context, rt *runtime.Runtime, task *queue.T // HandleFlowStartBatch starts a batch of contacts in an IVR flow func HandleFlowStartBatch(bg context.Context, rt *runtime.Runtime, batch *models.FlowStartBatch) error { - ctx, cancel := context.WithTimeout(bg, time.Minute*time.Duration(rt.Config.IVRFLowStartBatchTimeout)) + ctx, cancel := context.WithTimeout(bg, time.Minute*time.Duration(rt.Config.IVRFlowStartBatchTimeout)) defer cancel() // contacts we will exclude either because they are in a flow or have already been in this one diff --git a/runtime/config.go b/runtime/config.go index 8d6278c20..2c68f55ad 100644 --- a/runtime/config.go +++ b/runtime/config.go @@ -77,9 +77,13 @@ type Config struct { IVRStopHour int `help:"ivr stop hour"` IVRTimeZone string `help:"ivr time zone"` IVRCancelCronStartHour int `help:"the cron hour to start cancel ivr calls queued"` - IVRConnRetryLimit int `help:"limit connection that be retryed"` - IVRFLowStartBatchTimeout int `help:"timeout of flow start batch"` + IVRFlowStartBatchTimeout int `help:"timeout of flow start batch"` IVRFlowStartBatchExecutionsPerSecond int `help:"executions per second of flow start batch calls"` + + IVRConnRetryLimit int `help:"limit connection that be retryed"` + IVRRetryWorkers int `help:"the number of goroutines that will be used to handle each connection retry"` + IVRRetryTimeout int `help:"timeout to run a retry ivr connections"` + IVRRetryMaximumExecutionsPerSecond int `help:"maximum executions per second of retry calls"` } // NewDefaultConfig returns a new default configuration object @@ -136,9 +140,13 @@ func NewDefaultConfig() *Config { IVRStopHour: 21, IVRTimeZone: "Asia/Kolkata", IVRCancelCronStartHour: 22, - IVRConnRetryLimit: 500, - IVRFLowStartBatchTimeout: 10, + IVRFlowStartBatchTimeout: 10, IVRFlowStartBatchExecutionsPerSecond: 50, + + IVRConnRetryLimit: 500, + IVRRetryWorkers: 5, + IVRRetryTimeout: 10, + IVRRetryMaximumExecutionsPerSecond: 10, } } From 7b8c267305aafe846ad0cdc125a9bb67939ae1d4 Mon Sep 17 00:00:00 2001 From: Rafael Soares Date: Tue, 24 Jan 2023 17:21:43 -0300 Subject: [PATCH 8/8] ivr retry workers tweak and tests --- core/tasks/ivr/cron.go | 5 +- core/tasks/ivr/cron_test.go | 58 ++++++++++++++++++ core/tasks/ivr/retryworker.go | 29 +++++---- core/tasks/ivr/retryworker_test.go | 97 ++++++++++++++++++++++++++++++ core/tasks/ivr/worker.go | 2 +- runtime/config.go | 2 +- 6 files changed, 172 insertions(+), 21 deletions(-) create mode 100644 core/tasks/ivr/retryworker_test.go diff --git a/core/tasks/ivr/cron.go b/core/tasks/ivr/cron.go index 1a4769ba7..f8858acfc 100644 --- a/core/tasks/ivr/cron.go +++ b/core/tasks/ivr/cron.go @@ -44,7 +44,7 @@ func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error func() error { currentHour := time.Now().In(location).Hour() if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*time.Duration(rt.Config.IVRRetryTimeout)) defer cancel() return retryCallsInWorkerPool(ctx, rt) } @@ -112,9 +112,6 @@ func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error { log := logrus.WithField("comp", "ivr_cron_retryer") start := time.Now() - ctx, cancel := context.WithTimeout(ctx, time.Minute*10) - defer cancel() - conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, rt.Config.IVRConnRetryLimit) if err != nil { return errors.Wrapf(err, "error loading connections to retry") diff --git a/core/tasks/ivr/cron_test.go b/core/tasks/ivr/cron_test.go index add1ac1b3..ec515e5f0 100644 --- a/core/tasks/ivr/cron_test.go +++ b/core/tasks/ivr/cron_test.go @@ -73,6 +73,64 @@ func TestRetries(t *testing.T) { testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1) } +func TestRetryCallsInWorkerPool(t *testing.T) { + ctx, rt, db, rp := testsuite.Get() + rc := rp.Get() + defer rc.Close() + + defer testsuite.Reset(testsuite.ResetAll) + + // register our mock client + ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider) + + // update our twilio channel to be of type 'ZZ' and set max_concurrent_events to 1 + db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID) + + // create a flow start for cathy + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive). + WithContactIDs([]models.ContactID{testdata.Cathy.ID}) + + // call our master starter + err := starts.CreateFlowBatches(ctx, rt, start) + assert.NoError(t, err) + + // should have one task in our ivr queue + task, err := queue.PopNextTask(rc, queue.HandlerQueue) + assert.NoError(t, err) + batch := &models.FlowStartBatch{} + err = json.Unmarshal(task.Task, batch) + assert.NoError(t, err) + + client.callError = nil + client.callID = ivr.CallID("call1") + err = HandleFlowStartBatch(ctx, rt, batch) + assert.NoError(t, err) + testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1) + + // change our call to be errored instead of wired + db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`) + + err = retryCallsInWorkerPool(ctx, rt) + assert.NoError(t, err) + + // should now be in wired state + testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1) + + // back to retry and make the channel inactive + db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`) + db.MustExec(`UPDATE channels_channel SET is_active = FALSE WHERE id = $1`, testdata.TwilioChannel.ID) + + models.FlushCache() + err = retryCallsInWorkerPool(ctx, rt) + assert.NoError(t, err) + + // this time should be failed + testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1) +} + func TestClearConnections(t *testing.T) { ctx, rt, db, rp := testsuite.Get() rc := rp.Get() diff --git a/core/tasks/ivr/retryworker.go b/core/tasks/ivr/retryworker.go index b2bb449e0..fa4c49217 100644 --- a/core/tasks/ivr/retryworker.go +++ b/core/tasks/ivr/retryworker.go @@ -2,7 +2,6 @@ package ivr import ( "context" - "fmt" "math" "sync" "time" @@ -10,6 +9,7 @@ import ( "github.com/nyaruka/mailroom/core/ivr" "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/runtime" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -30,46 +30,45 @@ func handleWork(id int, rt *runtime.Runtime, wg *sync.WaitGroup, jobChannel <-ch for job := range jobChannel { timeUntilNextExecution := -(time.Since(lastExecutionTime) - minimumTimeBetweenEachExecution) if timeUntilNextExecution > 0 { - fmt.Printf("Worker #%d backing off for %s\n", id, timeUntilNextExecution) + logrus.Infof("Worker #%d backing off for %s\n", id, timeUntilNextExecution) time.Sleep(timeUntilNextExecution) } else { - fmt.Printf("Worker #%d not backing off \n", id) + logrus.Infof("Worker #%d not backing off \n", id) } lastExecutionTime = time.Now() - retryCall(id, rt, job.conn) + err := retryCall(id, rt, job.conn) + if err != nil { + logrus.Error(err) + } } } -func retryCall(workerId int, rt *runtime.Runtime, conn *models.ChannelConnection) JobResult { - log := logrus.WithField("connection_id", conn.ID()) +func retryCall(workerId int, rt *runtime.Runtime, conn *models.ChannelConnection) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() oa, err := models.GetOrgAssets(ctx, rt, conn.OrgID()) if err != nil { - log.WithError(err).WithField("org_id", conn.OrgID()).Error("erroro loading org") - return JobResult{Output: "Fail"} + return errors.Wrapf(err, "error loading org with id %v", conn.OrgID()) } channel := oa.ChannelByID(conn.ChannelID()) if channel == nil { err = models.UpdateChannelConnectionStatuses(ctx, rt.DB, []models.ConnectionID{conn.ID()}, models.ConnectionStatusFailed) if err != nil { - log.WithError(err).WithField("channel_id", conn.ChannelID()).Error("error marking call as failed due to missing channel") + return errors.Wrapf(err, "error marking call as failed due to missing channel with id %v", conn.ChannelID()) } - return JobResult{Output: "Fail"} + return err } urn, err := models.URNForID(ctx, rt.DB, oa, conn.ContactURNID()) if err != nil { - log.WithError(err).WithField("urn_id", conn.ContactURNID()).Error("unable to load contact urn") - return JobResult{Output: "Fail"} + return errors.Wrapf(err, "unable to load contact urn for urn_id %v", conn.ContactURNID()) } err = ivr.RequestCallStartForConnection(ctx, rt, channel, urn, conn) if err != nil { - log.WithError(err).Error(err) - return JobResult{Output: "Fail"} + return err } - return JobResult{Output: "Success"} + return nil } diff --git a/core/tasks/ivr/retryworker_test.go b/core/tasks/ivr/retryworker_test.go new file mode 100644 index 000000000..fa0a94672 --- /dev/null +++ b/core/tasks/ivr/retryworker_test.go @@ -0,0 +1,97 @@ +package ivr + +import ( + "encoding/json" + "sync" + "testing" + + "github.com/nyaruka/mailroom/core/ivr" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/queue" + "github.com/nyaruka/mailroom/core/tasks/starts" + "github.com/nyaruka/mailroom/testsuite" + "github.com/nyaruka/mailroom/testsuite/testdata" + + "github.com/stretchr/testify/assert" +) + +func TestHandleWork(t *testing.T) { + ctx, rt, db, rp := testsuite.Get() + rc := rp.Get() + defer rc.Close() + + defer testsuite.Reset(testsuite.ResetAll) + + // register our mock client + ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider) + + // update our twilio channel to be of type 'ZZ' and set max_concurrent_events to 1 + db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID) + + // create a flow start for cathy + start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive). + WithContactIDs([]models.ContactID{testdata.Cathy.ID}) + + // call our master starter + err := starts.CreateFlowBatches(ctx, rt, start) + assert.NoError(t, err) + + // should have one task in our ivr queue + task, err := queue.PopNextTask(rc, queue.HandlerQueue) + assert.NoError(t, err) + batch := &models.FlowStartBatch{} + err = json.Unmarshal(task.Task, batch) + assert.NoError(t, err) + + client.callError = nil + client.callID = ivr.CallID("call1") + err = HandleFlowStartBatch(ctx, rt, batch) + assert.NoError(t, err) + testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1) + + // change our call to be errored instead of wired + db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`) + + conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, 1) + assert.NoError(t, err) + assert.Equal(t, len(conns), 1) + + err = retryCall(0, rt, conns[0]) + assert.NoError(t, err) + + // should now be in wired state + testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1) + + // back to retry and make the channel inactive + db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`) + db.MustExec(`UPDATE channels_channel SET is_active = FALSE WHERE id = $1`, testdata.TwilioChannel.ID) + + models.FlushCache() + + conns, err = models.LoadChannelConnectionsToRetry(ctx, rt.DB, 1) + assert.NoError(t, err) + assert.Equal(t, len(conns), 1) + + jobs := []Job{{Id: 1, conn: conns[0]}} + + var ( + wg sync.WaitGroup + jobChannel = make(chan Job) + ) + wg.Add(1) + + go handleWork(0, rt, &wg, jobChannel) + + for _, job := range jobs { + jobChannel <- job + } + + close(jobChannel) + wg.Wait() + + // this time should be failed + testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1) +} diff --git a/core/tasks/ivr/worker.go b/core/tasks/ivr/worker.go index e45030959..b1b8a17ad 100644 --- a/core/tasks/ivr/worker.go +++ b/core/tasks/ivr/worker.go @@ -94,7 +94,7 @@ func HandleFlowStartBatch(bg context.Context, rt *runtime.Runtime, batch *models if nextExecutionTime > 0 { logrus.WithFields(logrus.Fields{ "elapsed": time.Since(time.Now()), - }).Info("backing off call start for %s", nextExecutionTime) + }).Infof("backing off call start for %v", nextExecutionTime) time.Sleep(nextExecutionTime) } diff --git a/runtime/config.go b/runtime/config.go index 2c68f55ad..f83b31d5d 100644 --- a/runtime/config.go +++ b/runtime/config.go @@ -140,7 +140,7 @@ func NewDefaultConfig() *Config { IVRStopHour: 21, IVRTimeZone: "Asia/Kolkata", IVRCancelCronStartHour: 22, - IVRFlowStartBatchTimeout: 10, + IVRFlowStartBatchTimeout: 15, IVRFlowStartBatchExecutionsPerSecond: 50, IVRConnRetryLimit: 500,