Skip to content

Commit

Permalink
Merge branch 'v1.3.3-mailroom-7.1.22-india' of https://github.com/Ilh…
Browse files Browse the repository at this point in the history
…asoft/mailroom into main-in
  • Loading branch information
rasoro committed Jul 27, 2023
2 parents e015da2 + 7b8c267 commit 5a72126
Show file tree
Hide file tree
Showing 7 changed files with 693 additions and 33 deletions.
25 changes: 11 additions & 14 deletions core/ivr/ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,22 +213,19 @@ func RequestCallStartForConnection(ctx context.Context, rt *runtime.Runtime, cha
if maxCalls != "" {
maxCalls, _ := strconv.Atoi(maxCalls)

// max calls is set, lets see how many are currently active on this channel
if maxCalls > 0 {
count, err := models.ActiveChannelConnectionCount(ctx, rt.DB, channel.ID())
if err != nil {
return errors.Wrapf(err, "error finding number of active channel connections")
}
count, err := models.ActiveChannelConnectionCount(ctx, rt.DB, channel.ID())
if err != nil {
return errors.Wrapf(err, "error finding number of active channel connections")
}

// we are at max calls, do not move on
if count >= maxCalls {
logrus.WithField("channel_id", channel.ID()).Info("call being queued, max concurrent reached")
err := conn.MarkThrottled(ctx, rt.DB, time.Now())
if err != nil {
return errors.Wrapf(err, "error marking connection as throttled")
}
return nil
// we are at max calls, do not move on
if count >= maxCalls {
logrus.WithField("channel_id", channel.ID()).Info("call being queued, max concurrent reached")
err := conn.MarkThrottled(ctx, rt.DB, time.Now())
if err != nil {
return errors.Wrapf(err, "error marking connection as throttled")
}
return nil
}
}

Expand Down
287 changes: 283 additions & 4 deletions core/tasks/ivr/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,130 @@ package ivr

import (
"context"
"encoding/json"
"sync"
"time"

"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/ivr"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

const (
retryIVRLock = "retry_ivr_calls"
expireIVRLock = "expire_ivr_calls"
clearIVRLock = "clear_ivr_connections"
changeMaxConnNightLock = "change_ivr_max_conn_night"
changeMaxConnDayLock = "change_ivr_max_conn_day"
cancelIVRCallsLock = "cancel_ivr_calls"
)

var location *time.Location

func init() {
mailroom.RegisterCron("retry_ivr_calls", time.Minute, false, RetryCalls)
mailroom.RegisterCron("retry_ivr_calls", time.Minute, false, func(ctx context.Context, rt *runtime.Runtime) error {
var err error
location, err = time.LoadLocation(rt.Config.IVRTimeZone)
if err != nil {
return err
}
currentHour := time.Now().In(location).Hour()
if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*time.Duration(rt.Config.IVRRetryTimeout))
defer cancel()
return retryCallsInWorkerPool(ctx, rt)
}
return nil
})

mailroom.RegisterCron(clearIVRLock, time.Hour, false, clearStuckChannelConnections)

mailroom.RegisterCron(changeMaxConnNightLock, time.Minute*10, false, func(ctx context.Context, rt *runtime.Runtime) error {
currentHour := time.Now().In(location).Hour()
if currentHour >= rt.Config.IVRStopHour || currentHour < rt.Config.IVRStartHour {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
return changeMaxConnectionsConfig(ctx, rt, "TW", 0)
}
return nil
})

mailroom.RegisterCron(changeMaxConnDayLock, time.Minute*10, false, func(ctx context.Context, rt *runtime.Runtime) error {
currentHour := time.Now().In(location).Hour()
if currentHour >= rt.Config.IVRStartHour && currentHour < rt.Config.IVRStopHour {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
return changeMaxConnectionsConfig(ctx, rt, "TW", rt.Config.MaxConcurrentEvents)
}
return nil
})

mailroom.RegisterCron(cancelIVRCallsLock, time.Hour*1, false, func(ctx context.Context, rt *runtime.Runtime) error {
currentHour := time.Now().In(location).Hour()
if currentHour == rt.Config.IVRCancelCronStartHour {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20)
defer cancel()
return cancelCalls(ctx, rt)
}
return nil
})

}

// RetryCalls looks for calls that need to be retried and retries them
func RetryCalls(ctx context.Context, rt *runtime.Runtime) error {
// retryCallsInWorkerPoll looks for calls that need to be retried and retries then
func retryCallsInWorkerPool(ctx context.Context, rt *runtime.Runtime) error {
log := logrus.WithField("comp", "ivr_cron_retryer")
start := time.Now()

conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, rt.Config.IVRConnRetryLimit)
if err != nil {
return errors.Wrapf(err, "error loading connections to retry")
}

var jobs []Job
for i := 0; i < len(conns); i++ {
jobs = append(jobs, Job{Id: i, conn: conns[i]})
}

var (
wg sync.WaitGroup
jobChannel = make(chan Job)
)

wg.Add(rt.Config.IVRRetryWorkers)

for i := 0; i < rt.Config.IVRRetryWorkers; i++ {
go handleWork(i, rt, &wg, jobChannel)
}

for _, job := range jobs {
jobChannel <- job
}

close(jobChannel)
wg.Wait()

log.WithField("count", len(conns)).WithField("elapsed", time.Since(start)).Info("retried errored calls")

return nil
}

// retryCalls looks for calls that need to be retried and retries them
func retryCalls(ctx context.Context, rt *runtime.Runtime) error {
log := logrus.WithField("comp", "ivr_cron_retryer")
start := time.Now()

// find all calls that need restarting
ctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, 200)
conns, err := models.LoadChannelConnectionsToRetry(ctx, rt.DB, rt.Config.IVRConnRetryLimit)
if err != nil {
return errors.Wrapf(err, "error loading connections to retry")
}
Expand Down Expand Up @@ -82,3 +182,182 @@ func RetryCalls(ctx context.Context, rt *runtime.Runtime) error {

return nil
}

func clearStuckChannelConnections(ctx context.Context, rt *runtime.Runtime) error {
log := logrus.WithField("comp", "ivr_cron_cleaner")
start := time.Now()

ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
defer cancel()

result, err := rt.DB.ExecContext(ctx, clearStuckedChanelConnectionsSQL)
if err != nil {
return errors.Wrapf(err, "error cleaning stucked connections")
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return errors.Wrapf(err, "error getting rows affected on cleaning stucked connections")
}
if rowsAffected > 0 {
log.WithField("count", rowsAffected).WithField("elapsed", time.Since(start)).Info("stucked channel connections")
}
return nil
}

func cancelCalls(ctx context.Context, rt *runtime.Runtime) error {
log := logrus.WithField("comp", "ivr_cron_canceler")
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
defer cancel()

result, err := rt.DB.ExecContext(ctx, cancelQueuedChannelConnectionsSQL)
if err != nil {
return errors.Wrapf(err, "error canceling remaining connection calls")
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return errors.Wrapf(err, "error getting rows affected on cleaning stucked connections")
}
if rowsAffected > 0 {
log.WithField("count", rowsAffected).WithField("elapsed", time.Since(start)).Info("stucked channel connections")
}
return nil
}

func changeMaxConnectionsConfig(ctx context.Context, rt *runtime.Runtime, channelType string, maxConcurrentEventsToSet int) error {
log := logrus.WithField("comp", "ivr_cron_change_max_connections")
start := time.Now()

ctx, cancel := context.WithTimeout(ctx, time.Minute*5)
defer cancel()

rows, err := rt.DB.QueryxContext(ctx, selectIVRTWTypeChannelsSQL, channelType)
if err != nil {
return errors.Wrapf(err, "error querying for channels")
}
defer rows.Close()

ivrChannels := make([]Channel, 0)

for rows.Next() {
ch := Channel{}
err := dbutil.ScanJSON(rows, &ch)
if err != nil {
return errors.Wrapf(err, "error scanning channel")
}

ivrChannels = append(ivrChannels, ch)
}

for _, ch := range ivrChannels {

if ch.Config["max_concurrent_events"] == maxConcurrentEventsToSet {
return nil
}

ch.Config["max_concurrent_events"] = maxConcurrentEventsToSet

configJSON, err := json.Marshal(ch.Config)
if err != nil {
return errors.Wrapf(err, "error marshalling channels config")
}

_, err = rt.DB.ExecContext(ctx, updateIVRChannelConfigSQL, string(configJSON), ch.ID)
if err != nil {
return errors.Wrapf(err, "error updating channels config")
}
}

log.WithField("count", len(ivrChannels)).WithField("elapsed", time.Since(start)).Info("channels that have max_concurrent_events updated")

return nil
}

const selectIVRTWTypeChannelsSQL = `
SELECT ROW_TO_JSON(r) FROM (
SELECT
c.id,
c.uuid,
c.channel_type,
COALESCE(c.config, '{}')::json as config,
c.is_active
FROM
channels_channel as c
WHERE
c.channel_type = $1 AND
c.is_active = TRUE ) r;
`

const updateIVRChannelConfigSQL = `
UPDATE channels_channel
SET config = $1
WHERE id = $2
`

const cancelQueuedChannelConnectionsSQL = `
UPDATE channels_channelconnection
SET status = 'F'
WHERE id in (
SELECT id
FROM channels_channelconnection
WHERE
(status = 'Q' OR status = 'E' OR status = 'P')
)
`

const clearStuckedChanelConnectionsSQL = `
UPDATE channels_channelconnection
SET status = 'F'
WHERE id in (
SELECT id
FROM channels_channelconnection
WHERE
(status = 'W' OR status = 'R' OR status = 'I') AND
modified_on < NOW() - INTERVAL '2 DAYS'
LIMIT 100
)
`

const selectExpiredRunsSQL = `
SELECT
fr.id as run_id,
fr.org_id as org_id,
fr.flow_id as flow_id,
fr.contact_id as contact_id,
fr.session_id as session_id,
fr.expires_on as expires_on,
cc.id as connection_id
FROM
flows_flowrun fr
JOIN orgs_org o ON fr.org_id = o.id
JOIN channels_channelconnection cc ON fr.connection_id = cc.id
WHERE
fr.is_active = TRUE AND
fr.expires_on < NOW() AND
fr.connection_id IS NOT NULL AND
fr.session_id IS NOT NULL AND
cc.connection_type = 'V'
ORDER BY
expires_on ASC
LIMIT 100
`

type RunExpiration struct {
OrgID models.OrgID `db:"org_id"`
FlowID models.FlowID `db:"flow_id"`
ContactID flows.ContactID `db:"contact_id"`
RunID models.FlowRunID `db:"run_id"`
SessionID models.SessionID `db:"session_id"`
ExpiresOn time.Time `db:"expires_on"`
ConnectionID models.ConnectionID `db:"connection_id"`
}

type Channel struct {
ID int `db:"id" json:"id,omitempty"`
UUID string `db:"uuid" json:"uuid,omitempty"`
ChannelType string `db:"channel_type" json:"channel_type,omitempty"`
Config map[string]interface{} `db:"config" json:"config,omitempty"`
IsActive bool `db:"is_active" json:"is_active,omitempty"`
}
Loading

0 comments on commit 5a72126

Please sign in to comment.