Skip to content

Commit

Permalink
remove new expire_ivr_call and recover old expire_calls
Browse files Browse the repository at this point in the history
  • Loading branch information
rasoro committed Feb 29, 2024
1 parent 74dae91 commit 615c0e7
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 1 deletion.
57 changes: 57 additions & 0 deletions core/models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,60 @@ WHERE
contact_id = ANY($1) AND
flow_id = $2
`

// ExpireRunsAndSessions expires all the passed in runs and sessions. Note this should only be called
// for runs that have no parents or no way of continuing
func ExpireRunsAndSessions(ctx context.Context, db *sqlx.DB, runIDs []FlowRunID, sessionIDs []SessionID) error {
if len(runIDs) == 0 {
return nil
}

tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return errors.Wrapf(err, "error starting transaction to expire sessions")
}

err = Exec(ctx, "expiring runs", tx, expireRunsSQL, pq.Array(runIDs))
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error expiring runs")
}

if len(sessionIDs) > 0 {
err = Exec(ctx, "expiring sessions", tx, expireSessionsSQL, pq.Array(sessionIDs))
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error expiring sessions")
}
}

err = tx.Commit()
if err != nil {
return errors.Wrapf(err, "error committing expiration of runs and sessions")
}
return nil
}

const expireSessionsSQL = `
UPDATE
flows_flowsession s
SET
timeout_on = NULL,
ended_on = NOW(),
status = 'X'
WHERE
id = ANY($1)
`

const expireRunsSQL = `
UPDATE
flows_flowrun fr
SET
is_active = FALSE,
exited_on = NOW(),
exit_type = 'E',
status = 'E',
modified_on = NOW()
WHERE
id = ANY($1)
`
1 change: 0 additions & 1 deletion core/tasks/expirations/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ var expirationsMarker = redisx.NewIntervalSet("run_expirations", time.Hour*24, 2

func init() {
mailroom.RegisterCron("run_expirations", time.Minute, false, HandleWaitExpirations)
mailroom.RegisterCron("expire_ivr_calls", time.Minute, false, ExpireVoiceSessions)
}

// HandleWaitExpirations handles waiting messaging sessions whose waits have expired, resuming those that can be resumed,
Expand Down
57 changes: 57 additions & 0 deletions core/tasks/ivr/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func init() {
return nil
})

mailroom.RegisterCron(expireIVRLock, time.Minute, false, ExpireCalls)

}

// retryCallsInWorkerPoll looks for calls that need to be retried and retries then
Expand Down Expand Up @@ -282,6 +284,61 @@ func ChangeMaxConnectionsConfig(ctx context.Context, rt *runtime.Runtime, channe
return nil
}

// expireCalls looks for calls that should be expired and ends them
func ExpireCalls(ctx context.Context, rt *runtime.Runtime) error {
log := logrus.WithField("comp", "ivr_cron_expirer")
start := time.Now()

ctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

// select our expired runs
rows, err := rt.DB.QueryxContext(ctx, selectExpiredRunsSQL)
if err != nil {
return errors.Wrapf(err, "error querying for expired runs")
}
defer rows.Close()

expiredRuns := make([]models.FlowRunID, 0, 100)
expiredSessions := make([]models.SessionID, 0, 100)

for rows.Next() {
exp := &RunExpiration{}
err := rows.StructScan(exp)
if err != nil {
return errors.Wrapf(err, "error scanning expired run")
}

// add the run and session to those we need to expire
expiredRuns = append(expiredRuns, exp.RunID)
expiredSessions = append(expiredSessions, exp.SessionID)

// load our connection
conn, err := models.SelectChannelConnection(ctx, rt.DB, exp.ConnectionID)
if err != nil {
log.WithError(err).WithField("connection_id", exp.ConnectionID).Error("unable to load connection")
continue
}

// hang up our call
err = ivr.HangupCall(ctx, rt, conn)
if err != nil {
log.WithError(err).WithField("connection_id", conn.ID()).Error("error hanging up call")
}
}

// now expire our runs and sessions
if len(expiredRuns) > 0 {
err := models.ExpireRunsAndSessions(ctx, rt.DB, expiredRuns, expiredSessions)
if err != nil {
log.WithError(err).Error("error expiring runs and sessions for expired calls")
}
log.WithField("count", len(expiredRuns)).WithField("elapsed", time.Since(start)).Info("expired and hung up on channel connections")
}

return nil
}

const selectIVRTWTypeChannelsSQL = `
SELECT ROW_TO_JSON(r) FROM (
SELECT
Expand Down

0 comments on commit 615c0e7

Please sign in to comment.