Skip to content

Commit

Permalink
Merge branch 'main' into fix_amd_retry
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Aug 16, 2021
2 parents daaa2a1 + 6d91f5e commit 7b2edc9
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 132 deletions.
49 changes: 17 additions & 32 deletions core/ivr/ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,9 @@ type Client interface {

ResumeForRequest(r *http.Request) (Resume, error)

// StatusForRequest returns the current call status for the passed in request and also:
// - duration of call if known
// - whether we should hangup - i.e. the call is still in progress on the provider side, but we shouldn't continue with it
StatusForRequest(r *http.Request) (models.ConnectionStatus, int, bool)
// StatusForRequest returns the call status for the passed in request, and if it's an error the reason,
// and if available, the current call duration
StatusForRequest(r *http.Request) (models.ConnectionStatus, models.ConnectionError, int)

PreprocessResume(ctx context.Context, db *sqlx.DB, rp *redis.Pool, conn *models.ChannelConnection, r *http.Request) ([]byte, error)

Expand Down Expand Up @@ -415,14 +414,18 @@ func ResumeIVRFlow(
if session.ConnectionID() == nil {
return WriteErrorResponse(ctx, rt.DB, client, conn, w, errors.Errorf("active session: %d has no connection", session.ID()))
}

if *session.ConnectionID() != conn.ID() {
return WriteErrorResponse(ctx, rt.DB, client, conn, w, errors.Errorf("active session: %d does not match connection: %d", session.ID(), *session.ConnectionID()))
}

// check connection is still marked as in progress
if conn.Status() != models.ConnectionStatusInProgress {
return WriteErrorResponse(ctx, rt.DB, client, conn, w, errors.Errorf("connection in invalid state: %s", conn.Status()))
// check if connection has been marked as errored - it maybe have been updated by status callback
if conn.Status() == models.ConnectionStatusErrored || conn.Status() == models.ConnectionStatusFailed {
err = models.ExitSessions(ctx, rt.DB, []models.SessionID{session.ID()}, models.ExitInterrupted, time.Now())
if err != nil {
logrus.WithError(err).Error("error interrupting session")
}

return client.WriteErrorResponse(w, fmt.Errorf("call ended due to previous status callback"))
}

// preprocess this request
Expand Down Expand Up @@ -591,30 +594,10 @@ func buildMsgResume(
// ended for some reason and update the state of the call and session if so
func HandleIVRStatus(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, client Client, conn *models.ChannelConnection, r *http.Request, w http.ResponseWriter) error {
// read our status and duration from our client
status, duration, hangup := client.StatusForRequest(r)

if hangup {
err := HangupCall(ctx, rt.Config, rt.DB, conn)
if err != nil {
return errors.Wrapf(err, "error hanging up call")
}
}
status, errorReason, duration := client.StatusForRequest(r)

// if we errored schedule a retry if appropriate
if status == models.ConnectionStatusErrored {
// get session associated with this connection
sessionID, sessionStatus, err := models.SessionForConnection(ctx, rt.DB, conn)
if err != nil {
return errors.Wrapf(err, "error fetching session for connection")
}

// if session is still active we interrupt it
if sessionStatus == models.SessionStatusWaiting {
err = models.ExitSessions(ctx, rt.DB, []models.SessionID{sessionID}, models.ExitInterrupted, time.Now())
if err != nil {
logrus.WithError(err).Error("error interrupting session for connection")
}
}

// no associated start? this is a permanent failure
if conn.StartID() == models.NilStartID {
conn.MarkFailed(ctx, rt.DB, time.Now())
Expand All @@ -632,9 +615,11 @@ func HandleIVRStatus(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAss
return errors.Wrapf(err, "unable to load flow: %d", start.FlowID())
}

conn.MarkErrored(ctx, rt.DB, time.Now(), flow.IVRRetryWait())
conn.MarkErrored(ctx, rt.DB, time.Now(), flow.IVRRetryWait(), errorReason)

return client.WriteEmptyResponse(w, fmt.Sprintf("status updated: %s next_attempt: %s", conn.Status(), conn.NextAttempt()))
if conn.Status() == models.ConnectionStatusErrored {
return client.WriteEmptyResponse(w, fmt.Sprintf("status updated: %s next_attempt: %s", conn.Status(), conn.NextAttempt()))
}

} else if status == models.ConnectionStatusFailed {
conn.MarkFailed(ctx, rt.DB, time.Now())
Expand Down
27 changes: 15 additions & 12 deletions core/ivr/twiml/twiml.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,39 +287,42 @@ func (c *client) ResumeForRequest(r *http.Request) (ivr.Resume, error) {
}
}

// StatusForRequest returns the current call status for the passed in request and also:
// - duration of call if known
// - whether we should hangup - i.e. the call is still in progress on the provider side, but we shouldn't continue with it
func (c *client) StatusForRequest(r *http.Request) (models.ConnectionStatus, int, bool) {
// StatusForRequest returns the call status for the passed in request, and if it's an error the reason,
// and if available, the current call duration
func (c *client) StatusForRequest(r *http.Request) (models.ConnectionStatus, models.ConnectionError, int) {
// we re-use our status callback for AMD results which will have an AnsweredBy field but no CallStatus field
answeredBy := r.Form.Get("AnsweredBy")
if answeredBy != "" {
switch answeredBy {
case "machine_start", "fax":
return models.ConnectionStatusErrored, 0, true
return models.ConnectionStatusErrored, models.ConnectionErrorMachine, 0
}
return models.ConnectionStatusInProgress, 0, false
return models.ConnectionStatusInProgress, "", 0
}

status := r.Form.Get("CallStatus")
switch status {

case "queued", "ringing":
return models.ConnectionStatusWired, 0, false
return models.ConnectionStatusWired, "", 0

case "in-progress", "initiated":
return models.ConnectionStatusInProgress, 0, false
return models.ConnectionStatusInProgress, "", 0

case "completed":
duration, _ := strconv.Atoi(r.Form.Get("CallDuration"))
return models.ConnectionStatusCompleted, duration, false
return models.ConnectionStatusCompleted, "", duration

case "busy", "no-answer", "canceled", "failed":
return models.ConnectionStatusErrored, 0, false
case "busy":
return models.ConnectionStatusErrored, models.ConnectionErrorBusy, 0
case "no-answer":
return models.ConnectionStatusErrored, models.ConnectionErrorNoAnswer, 0
case "canceled", "failed":
return models.ConnectionStatusErrored, "", 0

default:
logrus.WithField("call_status", status).Error("unknown call status in status callback")
return models.ConnectionStatusFailed, 0, false
return models.ConnectionStatusFailed, models.ConnectionErrorProvider, 0
}
}

Expand Down
35 changes: 20 additions & 15 deletions core/ivr/vonage/vonage.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,50 +542,55 @@ type StatusRequest struct {
Duration string `json:"duration"`
}

// StatusForRequest returns the current call status for the passed in request and also:
// - duration of call if known
// - whether we should hangup - i.e. the call is still in progress on the provider side, but we shouldn't continue with it
func (c *client) StatusForRequest(r *http.Request) (models.ConnectionStatus, int, bool) {
// StatusForRequest returns the current call status for the passed in status (and optional duration if known)
func (c *client) StatusForRequest(r *http.Request) (models.ConnectionStatus, models.ConnectionError, int) {
// this is a resume, call is in progress, no need to look at the body
if r.Form.Get("action") == "resume" {
return models.ConnectionStatusInProgress, 0, false
return models.ConnectionStatusInProgress, "", 0
}

status := &StatusRequest{}
bb, err := readBody(r)
if err != nil {
logrus.WithError(err).Error("error reading status request body")
return models.ConnectionStatusErrored, 0, false
return models.ConnectionStatusErrored, models.ConnectionErrorProvider, 0
}

status := &StatusRequest{}
err = json.Unmarshal(bb, status)
if err != nil {
logrus.WithError(err).WithField("body", string(bb)).Error("error unmarshalling ncco status")
return models.ConnectionStatusErrored, 0, false
return models.ConnectionStatusErrored, models.ConnectionErrorProvider, 0
}

// transfer status callbacks have no status, safe to ignore them
if status.Status == "" {
return models.ConnectionStatusInProgress, 0, false
return models.ConnectionStatusInProgress, "", 0
}

switch status.Status {

case "started", "ringing":
return models.ConnectionStatusWired, 0, false
return models.ConnectionStatusWired, "", 0

case "answered":
return models.ConnectionStatusInProgress, 0, false
return models.ConnectionStatusInProgress, "", 0

case "completed":
duration, _ := strconv.Atoi(status.Duration)
return models.ConnectionStatusCompleted, duration, false
return models.ConnectionStatusCompleted, "", duration

case "rejected", "busy", "unanswered", "timeout", "failed", "machine":
return models.ConnectionStatusErrored, 0, false
case "busy":
return models.ConnectionStatusErrored, models.ConnectionErrorBusy, 0
case "rejected", "unanswered", "timeout":
return models.ConnectionStatusErrored, models.ConnectionErrorNoAnswer, 0
case "machine":
return models.ConnectionStatusErrored, models.ConnectionErrorMachine, 0
case "failed":
return models.ConnectionStatusErrored, models.ConnectionErrorProvider, 0

default:
logrus.WithField("status", status.Status).Error("unknown call status in ncco callback")
return models.ConnectionStatusFailed, 0, false
return models.ConnectionStatusFailed, models.ConnectionErrorProvider, 0
}
}

Expand Down
69 changes: 49 additions & 20 deletions core/models/channel_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ const NilConnectionID = ConnectionID(0)
// ConnectionStatus is the type for the status of a connection
type ConnectionStatus string

// ConnectionError is the type for the reason of an errored connection
type ConnectionError null.String

// ConnectionDirection is the type for the direction of a connection
type ConnectionDirection string

Expand All @@ -39,13 +42,18 @@ const (

// connection status constants
const (
ConnectionStatusPending = ConnectionStatus("P")
ConnectionStatusQueued = ConnectionStatus("Q")
ConnectionStatusWired = ConnectionStatus("W")
ConnectionStatusInProgress = ConnectionStatus("I")
ConnectionStatusFailed = ConnectionStatus("F")
ConnectionStatusErrored = ConnectionStatus("E")
ConnectionStatusCompleted = ConnectionStatus("D")
ConnectionStatusPending = ConnectionStatus("P") // used for initial creation in database
ConnectionStatusQueued = ConnectionStatus("Q") // call can't be wired yet and is queued locally
ConnectionStatusWired = ConnectionStatus("W") // call has been requested on the IVR provider
ConnectionStatusInProgress = ConnectionStatus("I") // call was answered and is in progress
ConnectionStatusCompleted = ConnectionStatus("D") // call was completed successfully
ConnectionStatusErrored = ConnectionStatus("E") // temporary failure (will be retried)
ConnectionStatusFailed = ConnectionStatus("F") // permanent failure

ConnectionErrorProvider = ConnectionError("P")
ConnectionErrorBusy = ConnectionError("B")
ConnectionErrorNoAnswer = ConnectionError("N")
ConnectionErrorMachine = ConnectionError("M")

ConnectionMaxRetries = 3

Expand All @@ -69,7 +77,9 @@ type ChannelConnection struct {
EndedOn *time.Time `json:"ended_on" db:"ended_on"`
ConnectionType ConnectionType `json:"connection_type" db:"connection_type"`
Duration int `json:"duration" db:"duration"`
RetryCount int `json:"retry_count" db:"retry_count"`
RetryCount int `json:"retry_count" db:"retry_count"` // TODO replace use with error_count
ErrorReason null.String `json:"error_reason" db:"error_reason"`
ErrorCount int `json:"error_count" db:"error_count"`
NextAttempt *time.Time `json:"next_attempt" db:"next_attempt"`
ChannelID ChannelID `json:"channel_id" db:"channel_id"`
ContactID ContactID `json:"contact_id" db:"contact_id"`
Expand All @@ -85,13 +95,16 @@ func (c *ChannelConnection) ID() ConnectionID { return c.c.ID }
// Status returns the status of this connection
func (c *ChannelConnection) Status() ConnectionStatus { return c.c.Status }

func (c *ChannelConnection) NextAttempt() *time.Time { return c.c.NextAttempt }
func (c *ChannelConnection) ExternalID() string { return c.c.ExternalID }
func (c *ChannelConnection) OrgID() OrgID { return c.c.OrgID }
func (c *ChannelConnection) ContactID() ContactID { return c.c.ContactID }
func (c *ChannelConnection) ContactURNID() URNID { return c.c.ContactURNID }
func (c *ChannelConnection) ChannelID() ChannelID { return c.c.ChannelID }
func (c *ChannelConnection) StartID() StartID { return c.c.StartID }
func (c *ChannelConnection) ExternalID() string { return c.c.ExternalID }
func (c *ChannelConnection) OrgID() OrgID { return c.c.OrgID }
func (c *ChannelConnection) ContactID() ContactID { return c.c.ContactID }
func (c *ChannelConnection) ContactURNID() URNID { return c.c.ContactURNID }
func (c *ChannelConnection) ChannelID() ChannelID { return c.c.ChannelID }
func (c *ChannelConnection) StartID() StartID { return c.c.StartID }

func (c *ChannelConnection) ErrorReason() ConnectionError { return ConnectionError(c.c.ErrorReason) }
func (c *ChannelConnection) ErrorCount() int { return c.c.ErrorCount }
func (c *ChannelConnection) NextAttempt() *time.Time { return c.c.NextAttempt }

const insertConnectionSQL = `
INSERT INTO
Expand All @@ -108,6 +121,7 @@ INSERT INTO
channel_id,
contact_id,
contact_urn_id,
error_count,
retry_count
)
Expand All @@ -123,6 +137,7 @@ VALUES(
:channel_id,
:contact_id,
:contact_urn_id,
0,
0
)
RETURNING
Expand Down Expand Up @@ -193,6 +208,8 @@ SELECT
cc.connection_type as connection_type,
cc.duration as duration,
cc.retry_count as retry_count,
cc.error_reason as error_reason,
cc.error_count as error_count,
cc.next_attempt as next_attempt,
cc.channel_id as channel_id,
cc.contact_id as contact_id,
Expand Down Expand Up @@ -229,6 +246,8 @@ SELECT
cc.connection_type as connection_type,
cc.duration as duration,
cc.retry_count as retry_count,
cc.error_reason as error_reason,
cc.error_count as error_count,
cc.next_attempt as next_attempt,
cc.channel_id as channel_id,
cc.contact_id as contact_id,
Expand Down Expand Up @@ -270,6 +289,8 @@ SELECT
cc.connection_type as connection_type,
cc.duration as duration,
cc.retry_count as retry_count,
cc.error_reason as error_reason,
cc.error_count as error_count,
cc.next_attempt as next_attempt,
cc.channel_id as channel_id,
cc.contact_id as contact_id,
Expand All @@ -281,7 +302,7 @@ FROM
LEFT OUTER JOIN flows_flowstart_connections fsc ON cc.id = fsc.channelconnection_id
WHERE
cc.connection_type = 'V' AND
cc.status IN ('Q', 'N', 'B', 'E') AND
cc.status IN ('Q', 'E') AND
next_attempt < NOW()
ORDER BY
cc.next_attempt ASC
Expand Down Expand Up @@ -343,12 +364,20 @@ func (c *ChannelConnection) MarkStarted(ctx context.Context, db Queryer, now tim
}

// MarkErrored updates the status for this connection to errored and schedules a retry if appropriate
func (c *ChannelConnection) MarkErrored(ctx context.Context, db Queryer, now time.Time, retryWait *time.Duration) error {
func (c *ChannelConnection) MarkErrored(ctx context.Context, db Queryer, now time.Time, retryWait *time.Duration, errorReason ConnectionError) error {
c.c.Status = ConnectionStatusErrored
c.c.ErrorReason = null.String(errorReason)
c.c.EndedOn = &now

if c.c.RetryCount < ConnectionMaxRetries && retryWait != nil {
// TODO remove this once we only use error_count
errorCount := c.c.ErrorCount
if c.c.RetryCount > errorCount {
errorCount = c.c.RetryCount
}

if errorCount < ConnectionMaxRetries && retryWait != nil {
c.c.RetryCount++
c.c.ErrorCount++
next := now.Add(*retryWait)
c.c.NextAttempt = &next
} else {
Expand All @@ -357,8 +386,8 @@ func (c *ChannelConnection) MarkErrored(ctx context.Context, db Queryer, now tim
}

_, err := db.ExecContext(ctx,
`UPDATE channels_channelconnection SET status = $2, ended_on = $3, retry_count = $4, next_attempt = $5, modified_on = NOW() WHERE id = $1`,
c.c.ID, c.c.Status, c.c.EndedOn, c.c.RetryCount, c.c.NextAttempt,
`UPDATE channels_channelconnection SET status = $2, ended_on = $3, retry_count = $4, error_reason = $5, error_count = $6, next_attempt = $7, modified_on = NOW() WHERE id = $1`,
c.c.ID, c.c.Status, c.c.EndedOn, c.c.RetryCount, c.c.ErrorReason, c.c.ErrorCount, c.c.NextAttempt,
)

if err != nil {
Expand Down
14 changes: 0 additions & 14 deletions core/models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ type SessionCommitHook func(context.Context, *sqlx.Tx, *redis.Pool, *OrgAssets,
type SessionID int64
type SessionStatus string

const NilSessionID = SessionID(0)

type FlowRunID int64

const NilFlowRunID = FlowRunID(0)
Expand Down Expand Up @@ -989,18 +987,6 @@ WHERE
fs.contact_id = ANY($2)
`

func SessionForConnection(ctx context.Context, db *sqlx.DB, conn *ChannelConnection) (SessionID, SessionStatus, error) {
res := struct {
ID SessionID `db:"id"`
Status SessionStatus `db:"status"`
}{}
err := db.GetContext(ctx, &res, `SELECT id, status FROM flows_flowsession WHERE connection_id = $1`, conn.ID())
if err == sql.ErrNoRows {
return NilSessionID, SessionStatusFailed, nil
}
return res.ID, res.Status, err
}

// RunExpiration looks up the run expiration for the passed in run, can return nil if the run is no longer active
func RunExpiration(ctx context.Context, db *sqlx.DB, runID FlowRunID) (*time.Time, error) {
var expiration time.Time
Expand Down
Loading

0 comments on commit 7b2edc9

Please sign in to comment.