diff --git a/core/ivr/ivr.go b/core/ivr/ivr.go
index 0ef5ab086..4174c646b 100644
--- a/core/ivr/ivr.go
+++ b/core/ivr/ivr.go
@@ -213,22 +213,19 @@ func RequestCallStartForConnection(ctx context.Context, rt *runtime.Runtime, cha
 	if maxCalls != "" {
 		maxCalls, _ := strconv.Atoi(maxCalls)
 
-		// max calls is set, lets see how many are currently active on this channel
-		if maxCalls > 0 {
-			count, err := models.ActiveChannelConnectionCount(ctx, rt.DB, channel.ID())
-			if err != nil {
-				return errors.Wrapf(err, "error finding number of active channel connections")
-			}
+		count, err := models.ActiveChannelConnectionCount(ctx, rt.DB, channel.ID())
+		if err != nil {
+			return errors.Wrapf(err, "error finding number of active channel connections")
+		}
 
-			// we are at max calls, do not move on
-			if count >= maxCalls {
-				logrus.WithField("channel_id", channel.ID()).Info("call being queued, max concurrent reached")
-				err := conn.MarkThrottled(ctx, rt.DB, time.Now())
-				if err != nil {
-					return errors.Wrapf(err, "error marking connection as throttled")
-				}
-				return nil
+		// we are at max calls, do not move on
+		if count >= maxCalls {
+			logrus.WithField("channel_id", channel.ID()).Info("call being queued, max concurrent reached")
+			err := conn.MarkThrottled(ctx, rt.DB, time.Now())
+			if err != nil {
+				return errors.Wrapf(err, "error marking connection as throttled")
 			}
+			return nil
 		}
 	}
 
diff --git a/core/tasks/ivr/cron.go b/core/tasks/ivr/cron.go
index 3f628e966..d46783270 100644
--- a/core/tasks/ivr/cron.go
+++ b/core/tasks/ivr/cron.go
@@ -2,22 +2,158 @@ package ivr
 
 import (
 	"context"
+	"encoding/json"
+	"strconv"
+	"sync"
 	"time"
 
+	"github.com/nyaruka/gocommon/dbutil"
+	"github.com/nyaruka/goflow/flows"
 	"github.com/nyaruka/mailroom"
 	"github.com/nyaruka/mailroom/core/ivr"
 	"github.com/nyaruka/mailroom/core/models"
 	"github.com/nyaruka/mailroom/runtime"
 
+	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 )
 
+// names under which the crons of this package are registered
+const (
+	retryIVRLock           = "retry_ivr_calls"
+	expireIVRLock          = "expire_ivr_calls"
+	clearIVRLock           = "clear_ivr_connections"
+	changeMaxConnNightLock = "change_ivr_max_conn_night"
+	changeMaxConnDayLock   = "change_ivr_max_conn_day"
+	cancelIVRCallsLock     = "cancel_ivr_calls"
+)
+
+// currentIVRHour returns the current hour of the day, from 0 to 23, in the time zone configured as
+// IVRTimeZone. The zone is resolved on every call so that no cron depends on another one having run first.
+func currentIVRHour(rt *runtime.Runtime) (int, error) {
+	loc, err := time.LoadLocation(rt.Config.IVRTimeZone)
+	if err != nil {
+		return 0, errors.Wrapf(err, "error loading ivr time zone %s", rt.Config.IVRTimeZone)
+	}
+	return time.Now().In(loc).Hour(), nil
+}
+
 func init() {
-	mailroom.RegisterCron("retry_ivr_calls", time.Minute, false, RetryCalls)
+	// retry calls every minute, but only between IVRStartHour (inclusive) and IVRStopHour (exclusive)
+	mailroom.RegisterCron(retryIVRLock, time.Minute, false, func(ctx context.Context, rt *runtime.Runtime) error {
+		currentHour, err := currentIVRHour(rt)
+		if err != nil {
+			return err
+		}
+		if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour {
+			ctx, cancel := context.WithTimeout(context.Background(), time.Minute*time.Duration(rt.Config.IVRRetryTimeout))
+			defer cancel()
+			return retryCallsInWorkerPool(ctx, rt)
+		}
+		return nil
+	})
+
+	// once an hour, fail the connections that have been stuck for too long
+	mailroom.RegisterCron(clearIVRLock, time.Hour, false, clearStuckChannelConnections)
+
+	// outside of the start/stop hours, set max_concurrent_events of the TW channels to 0
+	mailroom.RegisterCron(changeMaxConnNightLock, time.Minute*10, false, func(ctx context.Context, rt *runtime.Runtime) error {
+		currentHour, err := currentIVRHour(rt)
+		if err != nil {
+			return err
+		}
+		if currentHour >= rt.Config.IVRStopHour || currentHour < rt.Config.IVRStartHour {
+			ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
+			defer cancel()
+			return changeMaxConnectionsConfig(ctx, rt, "TW", 0)
+		}
+		return nil
+	})
+
+	// between the start/stop hours, set max_concurrent_events of the TW channels to MaxConcurrentEvents
+	mailroom.RegisterCron(changeMaxConnDayLock, time.Minute*10, false, func(ctx context.Context, rt *runtime.Runtime) error {
+		currentHour, err := currentIVRHour(rt)
+		if err != nil {
+			return err
+		}
+		if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour {
+			ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
+			defer cancel()
+			return changeMaxConnectionsConfig(ctx, rt, "TW", rt.Config.MaxConcurrentEvents)
+		}
+		return nil
+	})
+
+	// during the hour configured as IVRCancelCronStartHour, cancel the calls that are still pending
+	mailroom.RegisterCron(cancelIVRCallsLock, time.Hour*1, false, func(ctx context.Context, rt *runtime.Runtime) error {
+		currentHour, err := currentIVRHour(rt)
+		if err != nil {
+			return err
+		}
+		if currentHour == rt.Config.IVRCancelCronStartHour {
+			ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20)
+			defer cancel()
+			return cancelCalls(ctx, rt)
+		}
+		return nil
+	})
 }
 
-// RetryCalls looks for calls that need to be retried and retries them
-func RetryCalls(ctx context.Context, rt *runtime.Runtime) error {
+// retryCallsInWorkerPool loads the connections that are due for a retry and hands them to a pool of
+// IVRRetryWorkers goroutines running handleWork, waiting for all of them to finish. If ctx is done
+// before every connection could be handed to a worker, the remaining ones are skipped and an error
+// is returned.
+func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error {
+	log := logrus.WithField("comp", "ivr_cron_retryer")
+	start := time.Now()
+
+	conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, rt.Config.IVRConnRetryLimit)
+	if err != nil {
+		return errors.Wrapf(err, "error loading connections to retry")
+	}
+
+	// always start at least one worker, without a receiver the sends below would block forever
+	workers := rt.Config.IVRRetryWorkers
+	if workers < 1 {
+		workers = 1
+	}
+
+	var wg sync.WaitGroup
+	jobChannel := make(chan Job)
+
+	wg.Add(workers)
+	for i := 0; i < workers; i++ {
+		go handleWork(i, rt, &wg, jobChannel)
+	}
+
+	queued := dispatchRetryJobs(ctx, conns, jobChannel)
+
+	close(jobChannel)
+	wg.Wait()
+
+	log.WithField("count", queued).WithField("elapsed", time.Since(start)).Info("retried errored calls")
+
+	if queued < len(conns) {
+		return errors.Wrapf(ctx.Err(), "gave up after queueing %d of %d connections to retry", queued, len(conns))
+	}
+	return nil
+}
+
+// dispatchRetryJobs sends one job per connection on jobs and returns how many were received by a
+// worker. It stops early, leaving the remaining connections unsent, once ctx is done.
+func dispatchRetryJobs(ctx context.Context, conns []*models.ChannelConnection, jobs chan<- Job) int {
+	for i, conn := range conns {
+		select {
+		case jobs <- Job{Id: i, conn: conn}:
+		case <-ctx.Done():
+			return i
+		}
+	}
+	return len(conns)
+}
+
+// retryCalls looks for calls that need to be retried and retries them
+func retryCalls(ctx context.Context, rt *runtime.Runtime) error {
 	log := logrus.WithField("comp", "ivr_cron_retryer")
 	start := time.Now()
 
@@ -25,7 +161,7 @@ func RetryCalls(ctx context.Context, rt *runtime.Runtime) error {
 	ctx, cancel := context.WithTimeout(ctx, time.Minute*10)
 	defer cancel()
 
-	conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, 200)
+	conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, rt.Config.IVRConnRetryLimit)
 	if err != nil {
 		return errors.Wrapf(err, "error loading connections to retry")
 	}
@@ -82,3 +218,223 @@ func RetryCalls(ctx context.Context, rt *runtime.Runtime) error {
 
 	return nil
 }
+
+// clearStuckChannelConnections marks connections as failed when they are still in status W, R or I
+// and were last modified more than two days ago. At most 100 connections are updated per run.
+func clearStuckChannelConnections(ctx context.Context, rt *runtime.Runtime) error {
+	log := logrus.WithField("comp", "ivr_cron_cleaner")
+	start := time.Now()
+
+	ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
+	defer cancel()
+
+	result, err := rt.DB.ExecContext(ctx, clearStuckedChanelConnectionsSQL)
+	if err != nil {
+		return errors.Wrapf(err, "error cleaning stucked connections")
+	}
+
+	rowsAffected, err := result.RowsAffected()
+	if err != nil {
+		return errors.Wrapf(err, "error getting rows affected on cleaning stucked connections")
+	}
+	if rowsAffected > 0 {
+		log.WithField("count", rowsAffected).WithField("elapsed", time.Since(start)).Info("stucked channel connections")
+	}
+	return nil
+}
+
+// cancelCalls marks every connection that is still in status Q, E or P as failed.
+func cancelCalls(ctx context.Context, rt *runtime.Runtime) error {
+	log := logrus.WithField("comp", "ivr_cron_canceler")
+	start := time.Now()
+
+	ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
+	defer cancel()
+
+	result, err := rt.DB.ExecContext(ctx, cancelQueuedChannelConnectionsSQL)
+	if err != nil {
+		return errors.Wrapf(err, "error canceling remaining connection calls")
+	}
+
+	rowsAffected, err := result.RowsAffected()
+	if err != nil {
+		return errors.Wrapf(err, "error getting rows affected on canceling connection calls")
+	}
+	if rowsAffected > 0 {
+		log.WithField("count", rowsAffected).WithField("elapsed", time.Since(start)).Info("canceled channel connections")
+	}
+	return nil
+}
+
+// changeMaxConnectionsConfig sets the max_concurrent_events config value of every active channel of
+// the given type to maxConcurrentEventsToSet. Channels that already hold that value are skipped, and
+// the first error aborts the run.
+func changeMaxConnectionsConfig(ctx context.Context, rt *runtime.Runtime, channelType string, maxConcurrentEventsToSet int) error {
+	log := logrus.WithField("comp", "ivr_cron_change_max_connections")
+	start := time.Now()
+
+	ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
+	defer cancel()
+
+	rows, err := rt.DB.QueryxContext(ctx, selectIVRTWTypeChannelsSQL, channelType)
+	if err != nil {
+		return errors.Wrapf(err, "error querying for channels")
+	}
+	defer rows.Close()
+
+	ivrChannels := make([]Channel, 0)
+
+	for rows.Next() {
+		ch := Channel{}
+		err := dbutil.ScanJSON(rows, &ch)
+		if err != nil {
+			return errors.Wrapf(err, "error scanning channel")
+		}
+
+		ivrChannels = append(ivrChannels, ch)
+	}
+	if err := rows.Err(); err != nil {
+		return errors.Wrapf(err, "error iterating over channels")
+	}
+
+	updated := 0
+	for _, ch := range ivrChannels {
+		// only skip this channel when it is up to date, the remaining ones may still need the change
+		if configValueEquals(ch.Config["max_concurrent_events"], maxConcurrentEventsToSet) {
+			continue
+		}
+
+		if ch.Config == nil {
+			ch.Config = make(map[string]interface{}, 1)
+		}
+		ch.Config["max_concurrent_events"] = maxConcurrentEventsToSet
+
+		configJSON, err := json.Marshal(ch.Config)
+		if err != nil {
+			return errors.Wrapf(err, "error marshalling channels config")
+		}
+
+		_, err = rt.DB.ExecContext(ctx, updateIVRChannelConfigSQL, string(configJSON), ch.ID)
+		if err != nil {
+			return errors.Wrapf(err, "error updating channels config")
+		}
+		updated++
+	}
+
+	if updated > 0 {
+		log.WithField("count", updated).WithField("elapsed", time.Since(start)).Info("channels that have max_concurrent_events updated")
+	}
+
+	return nil
+}
+
+// configValueEquals reports whether a decoded config value stands for the integer want. encoding/json
+// stores JSON numbers in an interface{} as float64, so comparing such a value with an int using ==
+// is never true. Integers and numeric strings are accepted as well.
+func configValueEquals(value interface{}, want int) bool {
+	switch v := value.(type) {
+	case float64:
+		return v == float64(want)
+	case int:
+		return v == want
+	case string:
+		n, err := strconv.Atoi(v)
+		return err == nil && n == want
+	}
+	return false
+}
+
+// selects, as one JSON object per row, the active channels of the channel type passed as $1
+const selectIVRTWTypeChannelsSQL = `
+	SELECT ROW_TO_JSON(r) FROM (
+		SELECT
+			c.id,
+			c.uuid,
+			c.channel_type,
+			COALESCE(c.config, '{}')::json as config,
+			c.is_active
+		FROM
+			channels_channel as c
+		WHERE
+			c.channel_type = $1 AND
+			c.is_active = TRUE ) r;
+`
+
+// replaces the config ($1) of the channel with id $2
+const updateIVRChannelConfigSQL = `
+	UPDATE channels_channel
+	SET config = $1
+	WHERE id = $2
+`
+
+// fails every connection that is in status Q, E or P
+const cancelQueuedChannelConnectionsSQL = `
+	UPDATE channels_channelconnection
+	SET status = 'F'
+	WHERE id in (
+		SELECT id
+		FROM channels_channelconnection
+		WHERE
+			(status = 'Q' OR status = 'E' OR status = 'P')
+	)
+`
+
+// fails up to 100 connections in status W, R or I that were not modified during the last two days
+const clearStuckedChanelConnectionsSQL = `
+	UPDATE channels_channelconnection
+	SET status = 'F'
+	WHERE id in (
+		SELECT id
+		FROM channels_channelconnection
+		WHERE
+			(status = 'W' OR status = 'R' OR status = 'I') AND
+			modified_on < NOW() - INTERVAL '2 DAYS'
+		LIMIT 100
+	)
+`
+
+// selects up to 100 active, expired runs that belong to a connection of type V, oldest expiry first
+const selectExpiredRunsSQL = `
+	SELECT
+		fr.id as run_id,
+		fr.org_id as org_id,
+		fr.flow_id as flow_id,
+		fr.contact_id as contact_id,
+		fr.session_id as session_id,
+		fr.expires_on as expires_on,
+		cc.id as connection_id
+	FROM
+		flows_flowrun fr
+		JOIN orgs_org o ON fr.org_id = o.id
+		JOIN channels_channelconnection cc ON fr.connection_id = cc.id
+	WHERE
+		fr.is_active = TRUE AND
+		fr.expires_on < NOW() AND
+		fr.connection_id IS NOT NULL AND
+		fr.session_id IS NOT NULL AND
+		cc.connection_type = 'V'
+	ORDER BY
+		expires_on ASC
+	LIMIT 100
+`
+
+// RunExpiration maps the columns selected by selectExpiredRunsSQL
+type RunExpiration struct {
+	OrgID        models.OrgID        `db:"org_id"`
+	FlowID       models.FlowID       `db:"flow_id"`
+	ContactID    flows.ContactID     `db:"contact_id"`
+	RunID        models.FlowRunID    `db:"run_id"`
+	SessionID    models.SessionID    `db:"session_id"`
+	ExpiresOn    time.Time           `db:"expires_on"`
+	ConnectionID models.ConnectionID `db:"connection_id"`
+}
+
+// Channel holds the channels_channel columns selected by selectIVRTWTypeChannelsSQL, Config being
+// the decoded JSON config of the channel
+type Channel struct {
+	ID          int                    `db:"id" json:"id,omitempty"`
+	UUID        string                 `db:"uuid" json:"uuid,omitempty"`
+	ChannelType string                 `db:"channel_type" json:"channel_type,omitempty"`
+	Config      map[string]interface{} `db:"config" json:"config,omitempty"`
+	IsActive    bool                   `db:"is_active" json:"is_active,omitempty"`
+}
diff --git a/core/tasks/ivr/cron_test.go b/core/tasks/ivr/cron_test.go
index 2dde012e0..7df47eec5 100644
--- a/core/tasks/ivr/cron_test.go
+++ b/core/tasks/ivr/cron_test.go
@@ -73,3 +73,178 @@ func TestRetries(t *testing.T) {
 	assertdb.Query(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
 		testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1)
 }
+
+func TestRetryCallsInWorkerPool(t *testing.T) {
+	ctx, rt, db, rp := testsuite.Get()
+	rc := rp.Get()
+	defer rc.Close()
+
+	defer testsuite.Reset(testsuite.ResetAll)
+
+	// register our mock client
+	ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)
+
+	// update our twilio channel to be of type 'ZZ' and set max_concurrent_events to 1
+	db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)
+
+	// create a flow start for cathy
+	start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
+		WithContactIDs([]models.ContactID{testdata.Cathy.ID})
+
+	// call our master starter
+	err := starts.CreateFlowBatches(ctx, rt, start)
+	assert.NoError(t, err)
+
+	// should have one task in our ivr queue
+	task, err := queue.PopNextTask(rc, queue.HandlerQueue)
+	assert.NoError(t, err)
+	batch := &models.FlowStartBatch{}
+	err = json.Unmarshal(task.Task, batch)
+	assert.NoError(t, err)
+
+	client.callError = nil
+	client.callID = ivr.CallID("call1")
+	err = HandleFlowStartBatch(ctx, rt, batch)
+	assert.NoError(t, err)
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)
+
+	// change our call to be errored instead of wired
+	db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)
+
+	err = retryCallsInWorkerPool(ctx, rt)
+	assert.NoError(t, err)
+
+	// should now be in wired state
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)
+
+	// back to retry and make the channel inactive
+	db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)
+	db.MustExec(`UPDATE channels_channel SET is_active = FALSE WHERE id = $1`, testdata.TwilioChannel.ID)
+
+	models.FlushCache()
+	err = retryCallsInWorkerPool(ctx, rt)
+	assert.NoError(t, err)
+
+	// this time should be failed
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1)
+}
+
+func TestClearConnections(t *testing.T) {
+	ctx, rt, db, rp := testsuite.Get()
+	rc := rp.Get()
+	defer rc.Close()
+
+	defer testsuite.Reset(testsuite.ResetAll)
+
+	ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)
+
+	db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)
+
+	start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
+		WithContactIDs([]models.ContactID{testdata.Cathy.ID})
+
+	// call our master starter
+	err := starts.CreateFlowBatches(ctx, rt, start)
+	assert.NoError(t, err)
+
+	task, err := queue.PopNextTask(rc, queue.HandlerQueue)
+	assert.NoError(t, err)
+	batch := &models.FlowStartBatch{}
+	err = json.Unmarshal(task.Task, batch)
+	assert.NoError(t, err)
+
+	client.callError = nil
+	client.callID = ivr.CallID("call1")
+	err = HandleFlowStartBatch(ctx, rt, batch)
+	assert.NoError(t, err)
+	testsuite.AssertQuery(t, db,
+		`SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusWired, "call1",
+	).Returns(1)
+
+	// update channel connection to be modified_on 2 days ago
+	db.MustExec(`UPDATE channels_channelconnection SET modified_on = NOW() - INTERVAL '2 DAY' WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusWired, "call1",
+	)
+
+	// cleaning
+	err = clearStuckChannelConnections(ctx, rt)
+	assert.NoError(t, err)
+
+	// status should be Failed
+	testsuite.AssertQuery(t, db,
+		`SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusFailed, "call1",
+	).Returns(1)
+}
+
+func TestUpdateMaxChannelsConnection(t *testing.T) {
+	ctx, rt, db, rp := testsuite.Get()
+	rc := rp.Get()
+	defer rc.Close()
+
+	defer testsuite.Reset(testsuite.ResetAll)
+
+	// register our mock client
+	ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)
+
+	//set max_concurrent_events to 1
+	db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)
+
+	//set max_concurrent_events to 0
+	err := changeMaxConnectionsConfig(ctx, rt, "ZZ", 0)
+	assert.NoError(t, err)
+	var confStr string
+	err = db.QueryRowx("SELECT config FROM channels_channel WHERE id = $1", testdata.TwilioChannel.ID).Scan(&confStr)
+	assert.NoError(t, err)
+	conf := make(map[string]interface{})
+	err = json.Unmarshal([]byte(confStr), &conf)
+	assert.NoError(t, err)
+	assert.Equal(t, 0, int(conf["max_concurrent_events"].(float64)))
+
+	// create a flow start for cathy
+	start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
+		WithContactIDs([]models.ContactID{testdata.Cathy.ID})
+	// call our master starter
+	err = starts.CreateFlowBatches(ctx, rt, start)
+	assert.NoError(t, err)
+
+	// should have one task in our ivr queue
+	task, err := queue.PopNextTask(rc, queue.HandlerQueue)
+	assert.NoError(t, err)
+	batch := &models.FlowStartBatch{}
+	err = json.Unmarshal(task.Task, batch)
+	assert.NoError(t, err)
+
+	client.callError = nil
+	client.callID = ivr.CallID("call1")
+	err = HandleFlowStartBatch(ctx, rt, batch)
+	assert.NoError(t, err)
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2`,
+		testdata.Cathy.ID, models.ConnectionStatusQueued).Returns(1)
+
+	//set max_concurrent_events to 500
+	err = changeMaxConnectionsConfig(ctx, rt, "ZZ", 500)
+	assert.NoError(t, err)
+	err = db.QueryRowx("SELECT config FROM channels_channel WHERE id = $1", testdata.TwilioChannel.ID).Scan(&confStr)
+	assert.NoError(t, err)
+	conf2 := make(map[string]interface{})
+	err = json.Unmarshal([]byte(confStr), &conf2)
+	assert.NoError(t, err)
+	assert.Equal(t, 500, int(conf2["max_concurrent_events"].(float64)))
+
+	// change our call to next attempt to be now minus 1 minute
+	db.MustExec(`UPDATE channels_channelconnection SET next_attempt = NOW() - INTERVAL '1 MINUTE' WHERE contact_id = $1;`, testdata.Cathy.ID)
+	assert.NoError(t, err)
+
+	db.MustExec("SELECT pg_sleep(5)")
+
+	err = retryCalls(ctx, rt)
+	assert.NoError(t, err)
+
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2`,
+		testdata.Cathy.ID, models.ConnectionStatusWired).Returns(1)
+}
diff --git a/core/tasks/ivr/retryworker.go b/core/tasks/ivr/retryworker.go
new file mode 100644
index 000000000..fa4c49217
--- /dev/null
+++ b/core/tasks/ivr/retryworker.go
@@ -0,0 +1,86 @@
+package ivr
+
+import (
+	"context"
+	"math"
+	"sync"
+	"time"
+
+	"github.com/nyaruka/mailroom/core/ivr"
+	"github.com/nyaruka/mailroom/core/models"
+	"github.com/nyaruka/mailroom/runtime"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+// Job is one connection to retry, as handed to a worker by retryCallsInWorkerPool
+type Job struct {
+	Id   int
+	conn *models.ChannelConnection
+}
+
+// JobResult carries the textual output of a job, nothing in this file uses it yet
+type JobResult struct {
+	Output string
+}
+
+// handleWork is the body of one retry worker: it retries the connection of every job it receives on
+// jobChannel until that channel is closed, then marks itself as done on wg. The worker starts at most
+// IVRRetryMaximumExecutionsPerSecond retries per second, a setting that isn't positive disables that
+// pacing. Errors of individual retries are logged, they don't stop the worker.
+func handleWork(id int, rt *runtime.Runtime, wg *sync.WaitGroup, jobChannel <-chan Job) {
+	defer wg.Done()
+
+	// guard the division, a rate of zero gives an infinite interval that can't be held by a duration
+	var minimumTimeBetweenEachExecution time.Duration
+	if rt.Config.IVRRetryMaximumExecutionsPerSecond > 0 {
+		minimumTimeBetweenEachExecution = time.Duration(math.Ceil(1e9 / float64(rt.Config.IVRRetryMaximumExecutionsPerSecond)))
+	}
+	lastExecutionTime := time.Now()
+
+	for job := range jobChannel {
+		timeUntilNextExecution := minimumTimeBetweenEachExecution - time.Since(lastExecutionTime)
+		if timeUntilNextExecution > 0 {
+			logrus.Infof("Worker #%d backing off for %s\n", id, timeUntilNextExecution)
+			time.Sleep(timeUntilNextExecution)
+		} else {
+			logrus.Infof("Worker #%d not backing off \n", id)
+		}
+		lastExecutionTime = time.Now()
+		err := retryCall(id, rt, job.conn)
+		if err != nil {
+			logrus.Error(err)
+		}
+	}
+}
+
+// retryCall requests a new call start for the given connection. If the channel of the connection
+// can't be found in the assets of its org, the connection is marked as failed instead and nil is
+// returned. workerId is currently unused.
+func retryCall(workerId int, rt *runtime.Runtime, conn *models.ChannelConnection) error {
+	// NOTE(review): this single second covers loading the org assets, looking up the URN and
+	// requesting the call start - confirm that it is long enough for the IVR provider
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
+	defer cancel()
+	oa, err := models.GetOrgAssets(ctx, rt, conn.OrgID())
+	if err != nil {
+		return errors.Wrapf(err, "error loading org with id %v", conn.OrgID())
+	}
+
+	channel := oa.ChannelByID(conn.ChannelID())
+	if channel == nil {
+		// without a channel there is nothing to call with, so this connection can't be retried
+		err = models.UpdateChannelConnectionStatuses(ctx, rt.DB, []models.ConnectionID{conn.ID()}, models.ConnectionStatusFailed)
+		if err != nil {
+			return errors.Wrapf(err, "error marking call as failed due to missing channel with id %v", conn.ChannelID())
+		}
+		return nil
+	}
+
+	urn, err := models.URNForID(ctx, rt.DB, oa, conn.ContactURNID())
+	if err != nil {
+		return errors.Wrapf(err, "unable to load contact urn for urn_id %v", conn.ContactURNID())
+	}
+
+	return ivr.RequestCallStartForConnection(ctx, rt, channel, urn, conn)
+}
diff --git a/core/tasks/ivr/retryworker_test.go b/core/tasks/ivr/retryworker_test.go
new file mode 100644
index 000000000..fa0a94672
--- /dev/null
+++ b/core/tasks/ivr/retryworker_test.go
@@ -0,0 +1,97 @@
+package ivr
+
+import (
+	"encoding/json"
+	"sync"
+	"testing"
+
+	"github.com/nyaruka/mailroom/core/ivr"
+	"github.com/nyaruka/mailroom/core/models"
+	"github.com/nyaruka/mailroom/core/queue"
+	"github.com/nyaruka/mailroom/core/tasks/starts"
+	"github.com/nyaruka/mailroom/testsuite"
+	"github.com/nyaruka/mailroom/testsuite/testdata"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestHandleWork(t *testing.T) {
+	ctx, rt, db, rp := testsuite.Get()
+	rc := rp.Get()
+	defer rc.Close()
+
+	defer testsuite.Reset(testsuite.ResetAll)
+
+	// register our mock client
+	ivr.RegisterServiceType(models.ChannelType("ZZ"), newMockProvider)
+
+	// update our twilio channel to be of type 'ZZ' and set max_concurrent_events to 1
+	db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, testdata.TwilioChannel.ID)
+
+	// create a flow start for cathy
+	start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive).
+		WithContactIDs([]models.ContactID{testdata.Cathy.ID})
+
+	// call our master starter
+	err := starts.CreateFlowBatches(ctx, rt, start)
+	assert.NoError(t, err)
+
+	// should have one task in our ivr queue
+	task, err := queue.PopNextTask(rc, queue.HandlerQueue)
+	assert.NoError(t, err)
+	batch := &models.FlowStartBatch{}
+	err = json.Unmarshal(task.Task, batch)
+	assert.NoError(t, err)
+
+	client.callError = nil
+	client.callID = ivr.CallID("call1")
+	err = HandleFlowStartBatch(ctx, rt, batch)
+	assert.NoError(t, err)
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)
+
+	// change our call to be errored instead of wired
+	db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)
+
+	conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, 1)
+	assert.NoError(t, err)
+	assert.Equal(t, len(conns), 1)
+
+	err = retryCall(0, rt, conns[0])
+	assert.NoError(t, err)
+
+	// should now be in wired state
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusWired, "call1").Returns(1)
+
+	// back to retry and make the channel inactive
+	db.MustExec(`UPDATE channels_channelconnection SET status = 'E', next_attempt = NOW() WHERE external_id = 'call1';`)
+	db.MustExec(`UPDATE channels_channel SET is_active = FALSE WHERE id = $1`, testdata.TwilioChannel.ID)
+
+	models.FlushCache()
+
+	conns, err = models.LoadChannelConnectionsToRetry(ctx, rt.DB, 1)
+	assert.NoError(t, err)
+	assert.Equal(t, len(conns), 1)
+
+	jobs := []Job{{Id: 1, conn: conns[0]}}
+
+	var (
+		wg         sync.WaitGroup
+		jobChannel = make(chan Job)
+	)
+	wg.Add(1)
+
+	go handleWork(0, rt, &wg, jobChannel)
+
+	for _, job := range jobs {
+		jobChannel <- job
+	}
+
+	close(jobChannel)
+	wg.Wait()
+
+	// this time should be failed
+	testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
+		testdata.Cathy.ID, models.ConnectionStatusFailed, "call1").Returns(1)
+}
diff --git a/core/tasks/ivr/worker.go b/core/tasks/ivr/worker.go
index 03fcc19d1..c6870a427 100644
--- a/core/tasks/ivr/worker.go
+++ b/core/tasks/ivr/worker.go
@@ -3,6 +3,7 @@ package ivr
 import (
 	"context"
 	"encoding/json"
+	"math"
 	"time"
 
 	"github.com/nyaruka/mailroom"
@@ -34,7 +35,7 @@ func handleFlowStartTask(ctx context.Context, rt *runtime.Runtime, task *queue.T
 
 // HandleFlowStartBatch starts a batch of contacts in an IVR flow
 func HandleFlowStartBatch(bg context.Context, rt *runtime.Runtime, batch *models.FlowStartBatch) error {
-	ctx, cancel := context.WithTimeout(bg, time.Minute*5)
+	ctx, cancel := context.WithTimeout(bg, time.Minute*time.Duration(rt.Config.IVRFlowStartBatchTimeout))
 	defer cancel()
 
 	// contacts we will exclude either because they are in a flow or have already been in this one
@@ -84,8 +85,25 @@ func HandleFlowStartBatch(bg context.Context, rt *runtime.Runtime, batch *models
 		return errors.Wrapf(err, "error loading contacts")
 	}
 
+	// pace the call starts so that at most IVRFlowStartBatchExecutionsPerSecond are requested per
+	// second, a setting that isn't positive disables the pacing instead of dividing by zero
+	var minimumTimeBetweenEachExecution time.Duration
+	if rt.Config.IVRFlowStartBatchExecutionsPerSecond > 0 {
+		minimumTimeBetweenEachExecution = time.Duration(math.Ceil(1e9 / float64(rt.Config.IVRFlowStartBatchExecutionsPerSecond)))
+	}
+	lastExecutionTime := time.Now()
+
 	// for each contacts, request a call start
 	for _, contact := range contacts {
+		nextExecutionTime := minimumTimeBetweenEachExecution - time.Since(lastExecutionTime)
+		if nextExecutionTime > 0 {
+			logrus.WithFields(logrus.Fields{
+				"elapsed": time.Since(lastExecutionTime),
+			}).Infof("backing off call start for %v", nextExecutionTime)
+			time.Sleep(nextExecutionTime)
+		}
+		lastExecutionTime = time.Now()
+
 		start := time.Now()
 
 		ctx, cancel := context.WithTimeout(bg, time.Minute)
diff --git a/runtime/config.go b/runtime/config.go
index 9a1f66c90..5360d2e53 100644
--- a/runtime/config.go
+++ b/runtime/config.go
@@ -68,14 +68,27 @@ type Config struct {
 	FCMKey            string `help:"the FCM API key used to notify Android relayers to sync"`
 	MailgunSigningKey string `help:"the signing key used to validate requests from mailgun"`
 
-	InstanceName        string `help:"the unique name of this instance used for analytics"`
-	LogLevel            string `help:"the logging level courier should use"`
-	UUIDSeed            int    `help:"seed to use for UUID generation in a testing environment"`
-	Version             string `help:"the version of this mailroom install"`
-	TimeoutTime         int    `help:"the amount of time to between every timeout queued"`
-	WenichatsServiceURL string `help:"wenichats external api url for ticketer service integration"`
-
-	FlowStartBatchTimeout int `help:"timeout config for flow start batch"`
+	InstanceName string `help:"the unique name of this instance used for analytics"`
+	LogLevel     string `help:"the logging level courier should use"`
+	UUIDSeed     int    `help:"seed to use for UUID generation in a testing environment"`
+	Version      string `help:"the version of this mailroom install"`
+
+	TimeoutTime           int    `help:"the amount of time to between every timeout queued"`
+	WenichatsServiceURL   string `help:"wenichats external api url for ticketer service integration"`
+	FlowStartBatchTimeout int    `help:"timeout config for flow start batch"`
+
+	MaxConcurrentEvents                  int    `help:"ivr max concurrent events limit to set when on day time period of activity of ivr channels"`
+	IVRStartHour                         int    `help:"ivr start hour"`
+	IVRStopHour                          int    `help:"ivr stop hour"`
+	IVRTimeZone                          string `help:"ivr time zone"`
+	IVRCancelCronStartHour               int    `help:"the cron hour to start cancel ivr calls queued"`
+	IVRFlowStartBatchTimeout             int    `help:"timeout of flow start batch"`
+	IVRFlowStartBatchExecutionsPerSecond int    `help:"executions per second of flow start batch calls"`
+
+	IVRConnRetryLimit                  int `help:"limit connection that be retryed"`
+	IVRRetryWorkers                    int `help:"the number of goroutines that will be used to handle each connection retry"`
+	IVRRetryTimeout                    int `help:"timeout to run a retry ivr connections"`
+	IVRRetryMaximumExecutionsPerSecond int `help:"maximum executions per second of retry calls"`
 }
 
 // NewDefaultConfig returns a new default configuration object
@@ -122,14 +135,27 @@ func NewDefaultConfig() *Config {
 		AWSAccessKeyID:     "",
 		AWSSecretAccessKey: "",
 
-		InstanceName:        hostname,
-		LogLevel:            "error",
-		UUIDSeed:            0,
-		Version:             "Dev",
-		TimeoutTime:         15,
-		WenichatsServiceURL: "https://chats-engine.dev.cloud.weni.ai/v1/external",
+		InstanceName:        hostname,
+		LogLevel:            "error",
+		UUIDSeed:            0,
+		Version:             "Dev",
+		TimeoutTime:         60,
+		WenichatsServiceURL: "https://chats-engine.dev.cloud.weni.ai/v1/external",
 
 		FlowStartBatchTimeout: 15,
+
+		MaxConcurrentEvents:                  1500,
+		IVRStartHour:                         8,
+		IVRStopHour:                          21,
+		IVRTimeZone:                          "Asia/Kolkata",
+		IVRCancelCronStartHour:               22,
+		IVRFlowStartBatchTimeout:             15,
+		IVRFlowStartBatchExecutionsPerSecond: 50,
+
+		IVRConnRetryLimit:                  500,
+		IVRRetryWorkers:                    5,
+		IVRRetryTimeout:                    10,
+		IVRRetryMaximumExecutionsPerSecond: 10,
 	}
 }
 