diff --git a/core/ivr/ivr.go b/core/ivr/ivr.go index 78686b70c..248e0290e 100644 --- a/core/ivr/ivr.go +++ b/core/ivr/ivr.go @@ -78,10 +78,9 @@ type Client interface { ResumeForRequest(r *http.Request) (Resume, error) - // StatusForRequest returns the current call status for the passed in request and also: - // - duration of call if known - // - whether we should hangup - i.e. the call is still in progress on the provider side, but we shouldn't continue with it - StatusForRequest(r *http.Request) (models.ConnectionStatus, int, bool) + // StatusForRequest returns the call status for the passed in request, and if it's an error the reason, + // and if available, the current call duration + StatusForRequest(r *http.Request) (models.ConnectionStatus, models.ConnectionError, int) PreprocessResume(ctx context.Context, db *sqlx.DB, rp *redis.Pool, conn *models.ChannelConnection, r *http.Request) ([]byte, error) @@ -415,14 +414,18 @@ func ResumeIVRFlow( if session.ConnectionID() == nil { return WriteErrorResponse(ctx, rt.DB, client, conn, w, errors.Errorf("active session: %d has no connection", session.ID())) } - if *session.ConnectionID() != conn.ID() { return WriteErrorResponse(ctx, rt.DB, client, conn, w, errors.Errorf("active session: %d does not match connection: %d", session.ID(), *session.ConnectionID())) } - // check connection is still marked as in progress - if conn.Status() != models.ConnectionStatusInProgress { - return WriteErrorResponse(ctx, rt.DB, client, conn, w, errors.Errorf("connection in invalid state: %s", conn.Status())) + // check if connection has been marked as errored - it maybe have been updated by status callback + if conn.Status() == models.ConnectionStatusErrored || conn.Status() == models.ConnectionStatusFailed { + err = models.ExitSessions(ctx, rt.DB, []models.SessionID{session.ID()}, models.ExitInterrupted, time.Now()) + if err != nil { + logrus.WithError(err).Error("error interrupting session") + } + + return client.WriteErrorResponse(w, fmt.Errorf("call ended due to previous status callback")) } // preprocess this request @@ -591,30 +594,10 @@ func buildMsgResume( // ended for some reason and update the state of the call and session if so func HandleIVRStatus(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, client Client, conn *models.ChannelConnection, r *http.Request, w http.ResponseWriter) error { // read our status and duration from our client - status, duration, hangup := client.StatusForRequest(r) - - if hangup { - err := HangupCall(ctx, rt.Config, rt.DB, conn) - if err != nil { - return errors.Wrapf(err, "error hanging up call") - } - } + status, errorReason, duration := client.StatusForRequest(r) + // if we errored schedule a retry if appropriate if status == models.ConnectionStatusErrored { - // get session associated with this connection - sessionID, sessionStatus, err := models.SessionForConnection(ctx, rt.DB, conn) - if err != nil { - return errors.Wrapf(err, "error fetching session for connection") - } - - // if session is still active we interrupt it - if sessionStatus == models.SessionStatusWaiting { - err = models.ExitSessions(ctx, rt.DB, []models.SessionID{sessionID}, models.ExitInterrupted, time.Now()) - if err != nil { - logrus.WithError(err).Error("error interrupting session for connection") - } - } - // no associated start? this is a permanent failure if conn.StartID() == models.NilStartID { conn.MarkFailed(ctx, rt.DB, time.Now()) @@ -632,9 +615,11 @@ func HandleIVRStatus(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAss return errors.Wrapf(err, "unable to load flow: %d", start.FlowID()) } - conn.MarkErrored(ctx, rt.DB, time.Now(), flow.IVRRetryWait()) + conn.MarkErrored(ctx, rt.DB, time.Now(), flow.IVRRetryWait(), errorReason) - return client.WriteEmptyResponse(w, fmt.Sprintf("status updated: %s next_attempt: %s", conn.Status(), conn.NextAttempt())) + if conn.Status() == models.ConnectionStatusErrored { + return client.WriteEmptyResponse(w, fmt.Sprintf("status updated: %s next_attempt: %s", conn.Status(), conn.NextAttempt())) + } } else if status == models.ConnectionStatusFailed { conn.MarkFailed(ctx, rt.DB, time.Now()) diff --git a/core/ivr/twiml/twiml.go b/core/ivr/twiml/twiml.go index d97cb3557..3bdc2c707 100644 --- a/core/ivr/twiml/twiml.go +++ b/core/ivr/twiml/twiml.go @@ -287,39 +287,42 @@ func (c *client) ResumeForRequest(r *http.Request) (ivr.Resume, error) { } } -// StatusForRequest returns the current call status for the passed in request and also: -// - duration of call if known -// - whether we should hangup - i.e. the call is still in progress on the provider side, but we shouldn't continue with it -func (c *client) StatusForRequest(r *http.Request) (models.ConnectionStatus, int, bool) { +// StatusForRequest returns the call status for the passed in request, and if it's an error the reason, +// and if available, the current call duration +func (c *client) StatusForRequest(r *http.Request) (models.ConnectionStatus, models.ConnectionError, int) { // we re-use our status callback for AMD results which will have an AnsweredBy field but no CallStatus field answeredBy := r.Form.Get("AnsweredBy") if answeredBy != "" { switch answeredBy { case "machine_start", "fax": - return models.ConnectionStatusErrored, 0, true + return models.ConnectionStatusErrored, models.ConnectionErrorMachine, 0 } - return models.ConnectionStatusInProgress, 0, false + return models.ConnectionStatusInProgress, "", 0 } status := r.Form.Get("CallStatus") switch status { case "queued", "ringing": - return models.ConnectionStatusWired, 0, false + return models.ConnectionStatusWired, "", 0 case "in-progress", "initiated": - return models.ConnectionStatusInProgress, 0, false + return models.ConnectionStatusInProgress, "", 0 case "completed": duration, _ := strconv.Atoi(r.Form.Get("CallDuration")) - return models.ConnectionStatusCompleted, duration, false + return models.ConnectionStatusCompleted, "", duration - case "busy", "no-answer", "canceled", "failed": - return models.ConnectionStatusErrored, 0, false + case "busy": + return models.ConnectionStatusErrored, models.ConnectionErrorBusy, 0 + case "no-answer": + return models.ConnectionStatusErrored, models.ConnectionErrorNoAnswer, 0 + case "canceled", "failed": + return models.ConnectionStatusErrored, "", 0 default: logrus.WithField("call_status", status).Error("unknown call status in status callback") - return models.ConnectionStatusFailed, 0, false + return models.ConnectionStatusFailed, models.ConnectionErrorProvider, 0 } } diff --git a/core/ivr/vonage/vonage.go b/core/ivr/vonage/vonage.go index aefb89e3d..a05b628dd 100644 --- a/core/ivr/vonage/vonage.go +++ b/core/ivr/vonage/vonage.go @@ -542,50 +542,55 @@ type StatusRequest struct { Duration string `json:"duration"` } -// StatusForRequest returns the current call status for the passed in request and also: -// - duration of call if known -// - whether we should hangup - i.e. the call is still in progress on the provider side, but we shouldn't continue with it -func (c *client) StatusForRequest(r *http.Request) (models.ConnectionStatus, int, bool) { +// StatusForRequest returns the current call status for the passed in status (and optional duration if known) +func (c *client) StatusForRequest(r *http.Request) (models.ConnectionStatus, models.ConnectionError, int) { // this is a resume, call is in progress, no need to look at the body if r.Form.Get("action") == "resume" { - return models.ConnectionStatusInProgress, 0, false + return models.ConnectionStatusInProgress, "", 0 } - status := &StatusRequest{} bb, err := readBody(r) if err != nil { logrus.WithError(err).Error("error reading status request body") - return models.ConnectionStatusErrored, 0, false + return models.ConnectionStatusErrored, models.ConnectionErrorProvider, 0 } + + status := &StatusRequest{} err = json.Unmarshal(bb, status) if err != nil { logrus.WithError(err).WithField("body", string(bb)).Error("error unmarshalling ncco status") - return models.ConnectionStatusErrored, 0, false + return models.ConnectionStatusErrored, models.ConnectionErrorProvider, 0 } // transfer status callbacks have no status, safe to ignore them if status.Status == "" { - return models.ConnectionStatusInProgress, 0, false + return models.ConnectionStatusInProgress, "", 0 } switch status.Status { case "started", "ringing": - return models.ConnectionStatusWired, 0, false + return models.ConnectionStatusWired, "", 0 case "answered": - return models.ConnectionStatusInProgress, 0, false + return models.ConnectionStatusInProgress, "", 0 case "completed": duration, _ := strconv.Atoi(status.Duration) - return models.ConnectionStatusCompleted, duration, false + return models.ConnectionStatusCompleted, "", duration - case "rejected", "busy", "unanswered", "timeout", "failed", "machine": - return models.ConnectionStatusErrored, 0, false + case "busy": + return models.ConnectionStatusErrored, models.ConnectionErrorBusy, 0 + case "rejected", "unanswered", "timeout": + return models.ConnectionStatusErrored, models.ConnectionErrorNoAnswer, 0 + case "machine": + return models.ConnectionStatusErrored, models.ConnectionErrorMachine, 0 + case "failed": + return models.ConnectionStatusErrored, models.ConnectionErrorProvider, 0 default: logrus.WithField("status", status.Status).Error("unknown call status in ncco callback") - return models.ConnectionStatusFailed, 0, false + return models.ConnectionStatusFailed, models.ConnectionErrorProvider, 0 } } diff --git a/core/models/channel_connection.go b/core/models/channel_connection.go index 84d9d63fc..b6953c7dc 100644 --- a/core/models/channel_connection.go +++ b/core/models/channel_connection.go @@ -20,6 +20,9 @@ const NilConnectionID = ConnectionID(0) // ConnectionStatus is the type for the status of a connection type ConnectionStatus string +// ConnectionError is the type for the reason of an errored connection +type ConnectionError null.String + // ConnectionDirection is the type for the direction of a connection type ConnectionDirection string @@ -39,13 +42,18 @@ const ( // connection status constants const ( - ConnectionStatusPending = ConnectionStatus("P") - ConnectionStatusQueued = ConnectionStatus("Q") - ConnectionStatusWired = ConnectionStatus("W") - ConnectionStatusInProgress = ConnectionStatus("I") - ConnectionStatusFailed = ConnectionStatus("F") - ConnectionStatusErrored = ConnectionStatus("E") - ConnectionStatusCompleted = ConnectionStatus("D") + ConnectionStatusPending = ConnectionStatus("P") // used for initial creation in database + ConnectionStatusQueued = ConnectionStatus("Q") // call can't be wired yet and is queued locally + ConnectionStatusWired = ConnectionStatus("W") // call has been requested on the IVR provider + ConnectionStatusInProgress = ConnectionStatus("I") // call was answered and is in progress + ConnectionStatusCompleted = ConnectionStatus("D") // call was completed successfully + ConnectionStatusErrored = ConnectionStatus("E") // temporary failure (will be retried) + ConnectionStatusFailed = ConnectionStatus("F") // permanent failure + + ConnectionErrorProvider = ConnectionError("P") + ConnectionErrorBusy = ConnectionError("B") + ConnectionErrorNoAnswer = ConnectionError("N") + ConnectionErrorMachine = ConnectionError("M") ConnectionMaxRetries = 3 @@ -69,7 +77,9 @@ type ChannelConnection struct { EndedOn *time.Time `json:"ended_on" db:"ended_on"` ConnectionType ConnectionType `json:"connection_type" db:"connection_type"` Duration int `json:"duration" db:"duration"` - RetryCount int `json:"retry_count" db:"retry_count"` + RetryCount int `json:"retry_count" db:"retry_count"` // TODO replace use with error_count + ErrorReason null.String `json:"error_reason" db:"error_reason"` + ErrorCount int `json:"error_count" db:"error_count"` NextAttempt *time.Time `json:"next_attempt" db:"next_attempt"` ChannelID ChannelID `json:"channel_id" db:"channel_id"` ContactID ContactID `json:"contact_id" db:"contact_id"` @@ -85,13 +95,16 @@ func (c *ChannelConnection) ID() ConnectionID { return c.c.ID } // Status returns the status of this connection func (c *ChannelConnection) Status() ConnectionStatus { return c.c.Status } -func (c *ChannelConnection) NextAttempt() *time.Time { return c.c.NextAttempt } -func (c *ChannelConnection) ExternalID() string { return c.c.ExternalID } -func (c *ChannelConnection) OrgID() OrgID { return c.c.OrgID } -func (c *ChannelConnection) ContactID() ContactID { return c.c.ContactID } -func (c *ChannelConnection) ContactURNID() URNID { return c.c.ContactURNID } -func (c *ChannelConnection) ChannelID() ChannelID { return c.c.ChannelID } -func (c *ChannelConnection) StartID() StartID { return c.c.StartID } +func (c *ChannelConnection) ExternalID() string { return c.c.ExternalID } +func (c *ChannelConnection) OrgID() OrgID { return c.c.OrgID } +func (c *ChannelConnection) ContactID() ContactID { return c.c.ContactID } +func (c *ChannelConnection) ContactURNID() URNID { return c.c.ContactURNID } +func (c *ChannelConnection) ChannelID() ChannelID { return c.c.ChannelID } +func (c *ChannelConnection) StartID() StartID { return c.c.StartID } + +func (c *ChannelConnection) ErrorReason() ConnectionError { return ConnectionError(c.c.ErrorReason) } +func (c *ChannelConnection) ErrorCount() int { return c.c.ErrorCount } +func (c *ChannelConnection) NextAttempt() *time.Time { return c.c.NextAttempt } const insertConnectionSQL = ` INSERT INTO @@ -108,6 +121,7 @@ INSERT INTO channel_id, contact_id, contact_urn_id, + error_count, retry_count ) @@ -123,6 +137,7 @@ VALUES( :channel_id, :contact_id, :contact_urn_id, + 0, 0 ) RETURNING @@ -193,6 +208,8 @@ SELECT cc.connection_type as connection_type, cc.duration as duration, cc.retry_count as retry_count, + cc.error_reason as error_reason, + cc.error_count as error_count, cc.next_attempt as next_attempt, cc.channel_id as channel_id, cc.contact_id as contact_id, @@ -229,6 +246,8 @@ SELECT cc.connection_type as connection_type, cc.duration as duration, cc.retry_count as retry_count, + cc.error_reason as error_reason, + cc.error_count as error_count, cc.next_attempt as next_attempt, cc.channel_id as channel_id, cc.contact_id as contact_id, @@ -270,6 +289,8 @@ SELECT cc.connection_type as connection_type, cc.duration as duration, cc.retry_count as retry_count, + cc.error_reason as error_reason, + cc.error_count as error_count, cc.next_attempt as next_attempt, cc.channel_id as channel_id, cc.contact_id as contact_id, @@ -281,7 +302,7 @@ FROM LEFT OUTER JOIN flows_flowstart_connections fsc ON cc.id = fsc.channelconnection_id WHERE cc.connection_type = 'V' AND - cc.status IN ('Q', 'N', 'B', 'E') AND + cc.status IN ('Q', 'E') AND next_attempt < NOW() ORDER BY cc.next_attempt ASC @@ -343,12 +364,20 @@ func (c *ChannelConnection) MarkStarted(ctx context.Context, db Queryer, now tim } // MarkErrored updates the status for this connection to errored and schedules a retry if appropriate -func (c *ChannelConnection) MarkErrored(ctx context.Context, db Queryer, now time.Time, retryWait *time.Duration) error { +func (c *ChannelConnection) MarkErrored(ctx context.Context, db Queryer, now time.Time, retryWait *time.Duration, errorReason ConnectionError) error { c.c.Status = ConnectionStatusErrored + c.c.ErrorReason = null.String(errorReason) c.c.EndedOn = &now - if c.c.RetryCount < ConnectionMaxRetries && retryWait != nil { + // TODO remove this once we only use error_count + errorCount := c.c.ErrorCount + if c.c.RetryCount > errorCount { + errorCount = c.c.RetryCount + } + + if errorCount < ConnectionMaxRetries && retryWait != nil { c.c.RetryCount++ + c.c.ErrorCount++ next := now.Add(*retryWait) c.c.NextAttempt = &next } else { @@ -357,8 +386,8 @@ func (c *ChannelConnection) MarkErrored(ctx context.Context, db Queryer, now tim } _, err := db.ExecContext(ctx, - `UPDATE channels_channelconnection SET status = $2, ended_on = $3, retry_count = $4, next_attempt = $5, modified_on = NOW() WHERE id = $1`, - c.c.ID, c.c.Status, c.c.EndedOn, c.c.RetryCount, c.c.NextAttempt, + `UPDATE channels_channelconnection SET status = $2, ended_on = $3, retry_count = $4, error_reason = $5, error_count = $6, next_attempt = $7, modified_on = NOW() WHERE id = $1`, + c.c.ID, c.c.Status, c.c.EndedOn, c.c.RetryCount, c.c.ErrorReason, c.c.ErrorCount, c.c.NextAttempt, ) if err != nil { diff --git a/core/models/runs.go b/core/models/runs.go index 8634d8ca1..f4b327387 100644 --- a/core/models/runs.go +++ b/core/models/runs.go @@ -33,8 +33,6 @@ type SessionCommitHook func(context.Context, *sqlx.Tx, *redis.Pool, *OrgAssets, type SessionID int64 type SessionStatus string -const NilSessionID = SessionID(0) - type FlowRunID int64 const NilFlowRunID = FlowRunID(0) @@ -989,18 +987,6 @@ WHERE fs.contact_id = ANY($2) ` -func SessionForConnection(ctx context.Context, db *sqlx.DB, conn *ChannelConnection) (SessionID, SessionStatus, error) { - res := struct { - ID SessionID `db:"id"` - Status SessionStatus `db:"status"` - }{} - err := db.GetContext(ctx, &res, `SELECT id, status FROM flows_flowsession WHERE connection_id = $1`, conn.ID()) - if err == sql.ErrNoRows { - return NilSessionID, SessionStatusFailed, nil - } - return res.ID, res.Status, err -} - // RunExpiration looks up the run expiration for the passed in run, can return nil if the run is no longer active func RunExpiration(ctx context.Context, db *sqlx.DB, runID FlowRunID) (*time.Time, error) { var expiration time.Time diff --git a/core/tasks/ivr/worker_test.go b/core/tasks/ivr/worker_test.go index a3bebe16d..05671071e 100644 --- a/core/tasks/ivr/worker_test.go +++ b/core/tasks/ivr/worker_test.go @@ -102,8 +102,8 @@ func (c *MockClient) ResumeForRequest(r *http.Request) (ivr.Resume, error) { return nil, nil } -func (c *MockClient) StatusForRequest(r *http.Request) (models.ConnectionStatus, int, bool) { - return models.ConnectionStatusFailed, 10, false +func (c *MockClient) StatusForRequest(r *http.Request) (models.ConnectionStatus, models.ConnectionError, int) { + return models.ConnectionStatusFailed, models.ConnectionErrorProvider, 10 } func (c *MockClient) PreprocessResume(ctx context.Context, db *sqlx.DB, rp *redis.Pool, conn *models.ChannelConnection, r *http.Request) ([]byte, error) { diff --git a/mailroom_test.dump b/mailroom_test.dump index f85d1aabc..a05e40b3b 100644 Binary files a/mailroom_test.dump and b/mailroom_test.dump differ diff --git a/web/ivr/ivr_test.go b/web/ivr/ivr_test.go index ba7fa3d8c..1ebc8c8d4 100644 --- a/web/ivr/ivr_test.go +++ b/web/ivr/ivr_test.go @@ -11,6 +11,7 @@ import ( "sync" "testing" + "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/goflow/test" "github.com/nyaruka/mailroom/config" "github.com/nyaruka/mailroom/core/models" @@ -30,6 +31,30 @@ import ( ivr_tasks "github.com/nyaruka/mailroom/core/tasks/ivr" ) +// mocks the Twilio API +func mockTwilioHandler(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + logrus.WithField("method", r.Method).WithField("url", r.URL.String()).WithField("form", r.Form).Info("test server called") + if strings.HasSuffix(r.URL.String(), "Calls.json") { + to := r.Form.Get("To") + if to == "+16055741111" { + w.WriteHeader(http.StatusCreated) + w.Write([]byte(`{"sid": "Call1"}`)) + } else if to == "+16055742222" { + w.WriteHeader(http.StatusCreated) + w.Write([]byte(`{"sid": "Call2"}`)) + } else if to == "+16055743333" { + w.WriteHeader(http.StatusCreated) + w.Write([]byte(`{"sid": "Call3"}`)) + } else { + w.WriteHeader(http.StatusNotFound) + } + } + if strings.HasSuffix(r.URL.String(), "recording.mp3") { + w.WriteHeader(http.StatusOK) + } +} + func TestTwilioIVR(t *testing.T) { ctx, _, db, rp := testsuite.Get() rc := rp.Get() @@ -41,25 +66,7 @@ func TestTwilioIVR(t *testing.T) { }() // start test server - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - r.ParseForm() - logrus.WithField("method", r.Method).WithField("url", r.URL.String()).WithField("form", r.Form).Info("test server called") - if strings.HasSuffix(r.URL.String(), "Calls.json") { - to := r.Form.Get("To") - if to == "+16055741111" { - w.WriteHeader(http.StatusCreated) - w.Write([]byte(`{"sid": "Call1"}`)) - } else if to == "+16055743333" { - w.WriteHeader(http.StatusCreated) - w.Write([]byte(`{"sid": "Call2"}`)) - } else { - w.WriteHeader(http.StatusNotFound) - } - } - if strings.HasSuffix(r.URL.String(), "recording.mp3") { - w.WriteHeader(http.StatusOK) - } - })) + ts := httptest.NewServer(http.HandlerFunc(mockTwilioHandler)) defer ts.Close() twiml.BaseURL = ts.URL @@ -73,37 +80,51 @@ func TestTwilioIVR(t *testing.T) { // add auth tokens db.MustExec(`UPDATE channels_channel SET config = '{"auth_token": "token", "account_sid": "sid", "callback_domain": "localhost:8090"}' WHERE id = $1`, testdata.TwilioChannel.ID) - // create a flow start for cathy and george - parentSummary := json.RawMessage(`{"flow": {"name": "IVR Flow", "uuid": "2f81d0ea-4d75-4843-9371-3f7465311cce"}, "uuid": "8bc73097-ac57-47fb-82e5-184f8ec6dbef", "status": "active", "contact": {"id": 10000, "name": "Cathy", "urns": ["tel:+16055741111?id=10000&priority=50"], "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", "fields": {"gender": {"text": "F"}}, "groups": [{"name": "Doctors", "uuid": "c153e265-f7c9-4539-9dbc-9b358714b638"}], "timezone": "America/Los_Angeles", "created_on": "2019-07-23T09:35:01.439614-07:00"}, "results": {}}`) - + // create a flow start for cathy bob, and george + parentSummary := json.RawMessage(`{ + "flow": {"name": "IVR Flow", "uuid": "2f81d0ea-4d75-4843-9371-3f7465311cce"}, + "uuid": "8bc73097-ac57-47fb-82e5-184f8ec6dbef", + "status": "active", + "contact": { + "id": 10000, + "name": "Cathy", + "urns": ["tel:+16055741111?id=10000&priority=50"], + "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", + "fields": {"gender": {"text": "F"}}, + "groups": [{"name": "Doctors", "uuid": "c153e265-f7c9-4539-9dbc-9b358714b638"}], + "timezone": "America/Los_Angeles", + "created_on": "2019-07-23T09:35:01.439614-07:00" + }, + "results": {} + }`) start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID, models.DoRestartParticipants, models.DoIncludeActive). - WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.George.ID}). + WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID, testdata.George.ID}). WithParentSummary(parentSummary) err := models.InsertFlowStarts(ctx, db, []*models.FlowStart{start}) - assert.NoError(t, err) + require.NoError(t, err) // call our master starter err = starts.CreateFlowBatches(ctx, db, rp, nil, start) - assert.NoError(t, err) + require.NoError(t, err) // start our task - task, err := queue.PopNextTask(rc, queue.HandlerQueue) - assert.NoError(t, err) + task, err := queue.PopNextTask(rc, queue.BatchQueue) + require.NoError(t, err) batch := &models.FlowStartBatch{} - err = json.Unmarshal(task.Task, batch) - assert.NoError(t, err) + jsonx.MustUnmarshal(task.Task, batch) - // request our call to start + // request our calls to start err = ivr_tasks.HandleFlowStartBatch(ctx, config.Mailroom, db, rp, batch) - assert.NoError(t, err) + require.NoError(t, err) + // check our 3 contacts have 3 wired calls testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.ConnectionStatusWired, "Call1").Returns(1) - - testsuite.AssertQuery(t, db, - `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, - testdata.George.ID, models.ConnectionStatusWired, "Call2").Returns(1) + testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + testdata.Bob.ID, models.ConnectionStatusWired, "Call2").Returns(1) + testsuite.AssertQuery(t, db, `SELECT COUNT(*) FROM channels_channelconnection WHERE contact_id = $1 AND status = $2 AND external_id = $3`, + testdata.George.ID, models.ConnectionStatusWired, "Call3").Returns(1) tcs := []struct { label string @@ -240,6 +261,25 @@ func TestTwilioIVR(t *testing.T) { expectedStatus: 200, expectedResponse: "", }, + { + label: "call3 started", + url: fmt.Sprintf("/ivr/c/%s/handle?action=start&connection=3", testdata.TwilioChannel.UUID), + form: nil, + expectedStatus: 200, + contains: []string{`Hello there. Please enter one or two. This flow was triggered by Cathy`}, + }, + { + label: "answer machine detection sent to tell us we're talking to a voicemail", + url: fmt.Sprintf("/ivr/c/%s/status", testdata.TwilioChannel.UUID), + form: url.Values{ + "CallSid": []string{"Call3"}, + "AccountSid": []string{"sid"}, + "AnsweredBy": []string{"machine_start"}, + "MachineDetectionDuration": []string{"2000"}, + }, + expectedStatus: 200, + contains: []string{"