Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use synchronous answering machine detection on Twilio channels #485

Merged
merged 4 commits into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions core/ivr/ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/storage"
"github.com/nyaruka/gocommon/urns"
Expand Down Expand Up @@ -40,9 +41,6 @@ const (
ErrorMessage = "An error has occurred, please try again later."
)

// CallEndedError is our constant error for when a call has ended
var CallEndedError = fmt.Errorf("call ended")

// our map of client constructors
var constructors = make(map[models.ChannelType]ClientConstructor)

Expand Down Expand Up @@ -82,6 +80,9 @@ type Client interface {
// and if available, the current call duration
StatusForRequest(r *http.Request) (models.ConnectionStatus, models.ConnectionError, int)

// CheckStartRequest checks the start request from the provider is as we expect and if not returns an error reason
CheckStartRequest(r *http.Request) models.ConnectionError

PreprocessResume(ctx context.Context, db *sqlx.DB, rp *redis.Pool, conn *models.ChannelConnection, r *http.Request) ([]byte, error)

PreprocessStatus(ctx context.Context, db *sqlx.DB, rp *redis.Pool, r *http.Request) ([]byte, error)
Expand Down Expand Up @@ -287,11 +288,11 @@ func RequestCallStartForConnection(ctx context.Context, config *config.Config, d
return nil
}

// WriteErrorResponse marks the passed in connection as errored and writes the appropriate error response to our writer
func WriteErrorResponse(ctx context.Context, db *sqlx.DB, client Client, conn *models.ChannelConnection, w http.ResponseWriter, rootErr error) error {
// HandleAsFailure marks the passed in connection as errored and writes the appropriate error response to our writer
func HandleAsFailure(ctx context.Context, db *sqlx.DB, client Client, conn *models.ChannelConnection, w http.ResponseWriter, rootErr error) error {
err := conn.MarkFailed(ctx, db, time.Now())
if err != nil {
logrus.WithError(err).Error("error when trying to mark connection as errored")
logrus.WithError(err).Error("error marking connection as failed")
}
return client.WriteErrorResponse(w, rootErr)
}
Expand All @@ -302,22 +303,36 @@ func StartIVRFlow(
channel *models.Channel, conn *models.ChannelConnection, c *models.Contact, urn urns.URN, startID models.StartID,
r *http.Request, w http.ResponseWriter) error {

// connection isn't in a wired status, that's an error
// connection isn't in a wired or in-progress status then we shouldn't be here
if conn.Status() != models.ConnectionStatusWired && conn.Status() != models.ConnectionStatusInProgress {
return WriteErrorResponse(ctx, rt.DB, client, conn, w, errors.Errorf("connection in invalid state: %s", conn.Status()))
return HandleAsFailure(ctx, rt.DB, client, conn, w, errors.Errorf("connection in invalid state: %s", conn.Status()))
}

// get the flow for our start
start, err := models.GetFlowStartAttributes(ctx, rt.DB, startID)
if err != nil {
return errors.Wrapf(err, "unable to load start: %d", startID)
}

flow, err := oa.FlowByID(start.FlowID())
if err != nil {
return errors.Wrapf(err, "unable to load flow: %d", startID)
}

// check that call on provider side is in the state we need to continue
if errorReason := client.CheckStartRequest(r); errorReason != "" {
err := conn.MarkErrored(ctx, rt.DB, dates.Now(), flow.IVRRetryWait(), errorReason)
if err != nil {
return errors.Wrap(err, "unable to mark connection as errored")
}

errMsg := fmt.Sprintf("status updated: %s", conn.Status())
if conn.Status() == models.ConnectionStatusErrored {
errMsg = fmt.Sprintf("%s, next_attempt: %s", errMsg, conn.NextAttempt())
}

return client.WriteErrorResponse(w, errors.New(errMsg))
}

// our flow contact
contact, err := c.FlowContact(oa)
if err != nil {
Expand Down Expand Up @@ -408,14 +423,14 @@ func ResumeIVRFlow(
}

if session == nil {
return WriteErrorResponse(ctx, rt.DB, client, conn, w, errors.Errorf("no active IVR session for contact"))
return HandleAsFailure(ctx, rt.DB, client, conn, w, errors.Errorf("no active IVR session for contact"))
}

if session.ConnectionID() == nil {
return WriteErrorResponse(ctx, rt.DB, client, conn, w, errors.Errorf("active session: %d has no connection", session.ID()))
return HandleAsFailure(ctx, rt.DB, client, conn, w, errors.Errorf("active session: %d has no connection", session.ID()))
}
if *session.ConnectionID() != conn.ID() {
return WriteErrorResponse(ctx, rt.DB, client, conn, w, errors.Errorf("active session: %d does not match connection: %d", session.ID(), *session.ConnectionID()))
return HandleAsFailure(ctx, rt.DB, client, conn, w, errors.Errorf("active session: %d does not match connection: %d", session.ID(), *session.ConnectionID()))
}

// check if connection has been marked as errored - it maybe have been updated by status callback
Expand Down Expand Up @@ -462,12 +477,7 @@ func ResumeIVRFlow(
// get the input of our request
ivrResume, err := client.ResumeForRequest(r)
if err != nil {
// call has ended, so will our session
if err == CallEndedError {
WriteErrorResponse(ctx, rt.DB, client, conn, w, errors.Wrapf(err, "call already ended"))
}

return WriteErrorResponse(ctx, rt.DB, client, conn, w, errors.Wrapf(err, "error finding input for request"))
return HandleAsFailure(ctx, rt.DB, client, conn, w, errors.Wrapf(err, "error finding input for request"))
}

var resume flows.Resume
Expand Down Expand Up @@ -620,10 +630,10 @@ func HandleIVRStatus(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAss
return errors.Wrapf(err, "unable to load flow: %d", start.FlowID())
}

conn.MarkErrored(ctx, rt.DB, time.Now(), flow.IVRRetryWait(), errorReason)
conn.MarkErrored(ctx, rt.DB, dates.Now(), flow.IVRRetryWait(), errorReason)

if conn.Status() == models.ConnectionStatusErrored {
return client.WriteEmptyResponse(w, fmt.Sprintf("status updated: %s next_attempt: %s", conn.Status(), conn.NextAttempt()))
return client.WriteEmptyResponse(w, fmt.Sprintf("status updated: %s, next_attempt: %s", conn.Status(), conn.NextAttempt()))
}

} else if status == models.ConnectionStatusFailed {
Expand Down
36 changes: 14 additions & 22 deletions core/ivr/twiml/twiml.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"encoding/json"
"encoding/xml"
"fmt"
"net/http"
Expand Down Expand Up @@ -149,6 +148,15 @@ func (c *client) DownloadMedia(url string) (*http.Response, error) {
return http.Get(url)
}

func (c *client) CheckStartRequest(r *http.Request) models.ConnectionError {
r.ParseForm()
answeredBy := r.Form.Get("AnsweredBy")
if answeredBy == "machine_start" || answeredBy == "fax" {
return models.ConnectionErrorMachine
}
return ""
}

func (c *client) PreprocessStatus(ctx context.Context, db *sqlx.DB, rp *redis.Pool, r *http.Request) ([]byte, error) {
return nil, nil
}
Expand Down Expand Up @@ -177,7 +185,7 @@ func (c *client) URNForRequest(r *http.Request) (urns.URN, error) {

// CallResponse is our struct for a Twilio call response
type CallResponse struct {
SID string `json:"sid"`
SID string `json:"sid" validate:"required"`
Status string `json:"status"`
}

Expand All @@ -191,8 +199,6 @@ func (c *client) RequestCall(number urns.URN, callbackURL string, statusURL stri

if machineDetection {
form.Set("MachineDetection", "Enable")
form.Set("AsyncAmd", "true")
form.Set("AsyncAmdStatusCallback", statusURL)
}

sendURL := c.baseURL + strings.Replace(callPath, "{AccountSID}", c.accountSID, -1)
Expand All @@ -206,13 +212,11 @@ func (c *client) RequestCall(number urns.URN, callbackURL string, statusURL stri
return ivr.NilCallID, trace, errors.Errorf("received non 201 status for call start: %d", trace.Response.StatusCode)
}

// parse out our call sid
// parse the response from Twilio
call := &CallResponse{}
err = json.Unmarshal(trace.ResponseBody, call)
if err != nil || call.SID == "" {
return ivr.NilCallID, trace, errors.Errorf("unable to read call id")
if err := utils.UnmarshalAndValidate(trace.ResponseBody, call); err != nil {
return ivr.NilCallID, trace, errors.Wrap(err, "unable parse Twilio response")
}

if call.Status == statusFailed {
return ivr.NilCallID, trace, errors.Errorf("call status returned as failed")
}
Expand Down Expand Up @@ -293,25 +297,13 @@ func (c *client) ResumeForRequest(r *http.Request) (ivr.Resume, error) {
// StatusForRequest returns the call status for the passed in request, and if it's an error the reason,
// and if available, the current call duration
func (c *client) StatusForRequest(r *http.Request) (models.ConnectionStatus, models.ConnectionError, int) {
// we re-use our status callback for AMD results which will have an AnsweredBy field but no CallStatus field
answeredBy := r.Form.Get("AnsweredBy")
if answeredBy != "" {
switch answeredBy {
case "machine_start", "fax":
return models.ConnectionStatusErrored, models.ConnectionErrorMachine, 0
}
return models.ConnectionStatusInProgress, "", 0
}

status := r.Form.Get("CallStatus")
switch status {

case "queued", "ringing":
return models.ConnectionStatusWired, "", 0

case "in-progress", "initiated":
return models.ConnectionStatusInProgress, "", 0

case "completed":
duration, _ := strconv.Atoi(r.Form.Get("CallDuration"))
return models.ConnectionStatusCompleted, "", duration
Expand All @@ -321,7 +313,7 @@ func (c *client) StatusForRequest(r *http.Request) (models.ConnectionStatus, mod
case "no-answer":
return models.ConnectionStatusErrored, models.ConnectionErrorNoAnswer, 0
case "canceled", "failed":
return models.ConnectionStatusErrored, "", 0
return models.ConnectionStatusErrored, models.ConnectionErrorProvider, 0

default:
logrus.WithField("call_status", status).Error("unknown call status in status callback")
Expand Down
4 changes: 4 additions & 0 deletions core/ivr/vonage/vonage.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ func (c *client) DownloadMedia(url string) (*http.Response, error) {
return http.DefaultClient.Do(req)
}

func (c *client) CheckStartRequest(r *http.Request) models.ConnectionError {
return ""
}

func (c *client) PreprocessStatus(ctx context.Context, db *sqlx.DB, rp *redis.Pool, r *http.Request) ([]byte, error) {
// parse out the call status, we are looking for a leg of one of our conferences ending in the "forward" case
// get our recording url out
Expand Down
4 changes: 4 additions & 0 deletions core/tasks/ivr/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (c *MockClient) StatusForRequest(r *http.Request) (models.ConnectionStatus,
return models.ConnectionStatusFailed, models.ConnectionErrorProvider, 10
}

func (c *MockClient) CheckStartRequest(r *http.Request) models.ConnectionError {
return ""
}

func (c *MockClient) PreprocessResume(ctx context.Context, db *sqlx.DB, rp *redis.Pool, conn *models.ChannelConnection, r *http.Request) ([]byte, error) {
return nil, nil
}
Expand Down
Loading