diff --git a/core/ivr/ivr.go b/core/ivr/ivr.go index ccf5521d6..098f69653 100644 --- a/core/ivr/ivr.go +++ b/core/ivr/ivr.go @@ -96,7 +96,7 @@ type Service interface { } // HangupCall hangs up the passed in call also taking care of updating the status of our call in the process -func HangupCall(ctx context.Context, rt *runtime.Runtime, conn *models.ChannelConnection) (*httpx.Trace, error) { +func HangupCall(ctx context.Context, rt *runtime.Runtime, conn *models.ChannelConnection) (*models.ChannelLog, error) { // no matter what mark our call as failed defer conn.MarkFailed(ctx, rt.DB, time.Now()) @@ -113,14 +113,25 @@ func HangupCall(ctx context.Context, rt *runtime.Runtime, conn *models.ChannelCo } // create the right service - c, err := GetService(channel) + svc, err := GetService(channel) if err != nil { return nil, errors.Wrapf(err, "unable to create IVR service") } + clog := models.NewChannelLog(models.ChannelLogTypeIVRHangup, channel, nil) + clog.SetConnection(conn) + defer clog.End() + // try to request our call hangup - trace, err := c.HangupCall(conn.ExternalID()) - return trace, errors.Wrapf(err, "error hanging call up") + trace, err := svc.HangupCall(conn.ExternalID()) + if trace != nil { + clog.HTTP(trace) + } + if err != nil { + clog.Error(err) + return clog, errors.Wrapf(err, "error hanging call up") + } + return clog, nil } // RequestCallStart creates a new ChannelSession for the passed in flow start and contact, returning the created session @@ -179,10 +190,19 @@ func RequestCallStart(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAs return nil, errors.Wrapf(err, "error creating ivr session") } - return conn, RequestCallStartForConnection(ctx, rt, channel, telURN, conn) + clog, err := RequestCallStartForConnection(ctx, rt, channel, telURN, conn) + + // log any error inserting our channel log, but continue + if clog != nil { + if err := models.InsertChannelLogs(ctx, rt.DB, []*models.ChannelLog{clog}); err != nil { + logrus.WithError(err).Error("error inserting channel log") + } + } + + return conn, err } -func RequestCallStartForConnection(ctx context.Context, rt *runtime.Runtime, channel *models.Channel, telURN urns.URN, conn *models.ChannelConnection) error { +func RequestCallStartForConnection(ctx context.Context, rt *runtime.Runtime, channel *models.Channel, telURN urns.URN, conn *models.ChannelConnection) (*models.ChannelLog, error) { // the domain that will be used for callbacks, can be specific for channels due to white labeling domain := channel.ConfigValue(models.ChannelConfigCallbackDomain, rt.Config.Domain) @@ -195,7 +215,7 @@ func RequestCallStartForConnection(ctx context.Context, rt *runtime.Runtime, cha if maxCalls > 0 { count, err := models.ActiveChannelConnectionCount(ctx, rt.DB, channel.ID()) if err != nil { - return errors.Wrapf(err, "error finding number of active channel connections") + return nil, errors.Wrapf(err, "error finding number of active channel connections") } // we are at max calls, do not move on @@ -203,9 +223,9 @@ func RequestCallStartForConnection(ctx context.Context, rt *runtime.Runtime, cha logrus.WithField("channel_id", channel.ID()).Info("call being queued, max concurrent reached") err := conn.MarkThrottled(ctx, rt.DB, time.Now()) if err != nil { - return errors.Wrapf(err, "error marking connection as throttled") + return nil, errors.Wrapf(err, "error marking connection as throttled") } - return nil + return nil, nil } } } @@ -224,39 +244,36 @@ func RequestCallStartForConnection(ctx context.Context, rt *runtime.Runtime, cha // create the right service c, err := GetService(channel) if err != nil { - return errors.Wrapf(err, "unable to create IVR service") + return nil, errors.Wrapf(err, "unable to create IVR service") } + clog := models.NewChannelLog(models.ChannelLogTypeIVRStart, channel, nil) + clog.SetConnection(conn) + defer clog.End() + // try to request our call start callID, trace, err := c.RequestCall(telURN, resumeURL, statusURL, channel.MachineDetection()) - - /// insert an channel log if we have an HTTP trace if trace != nil { - log := models.NewChannelLog(channel.ID(), conn, models.ChannelLogTypeIVRStart, trace) - err := models.InsertChannelLogs(ctx, rt.DB, []*models.ChannelLog{log}) - - // log any error inserting our channel log, but try to continue - if err != nil { - logrus.WithError(err).Error("error inserting channel log") - } + clog.HTTP(trace) } - if err != nil { + clog.Error(err) + // set our status as errored err := conn.UpdateStatus(ctx, rt.DB, models.ConnectionStatusFailed, 0, time.Now()) if err != nil { - return errors.Wrapf(err, "error setting errored status on session") + return clog, errors.Wrapf(err, "error setting errored status on session") } - return nil + return clog, nil } // update our channel session err = conn.UpdateExternalID(ctx, rt.DB, string(callID)) if err != nil { - return errors.Wrapf(err, "error updating session external id") + return clog, errors.Wrapf(err, "error updating session external id") } - return nil + return clog, nil } // HandleAsFailure marks the passed in connection as errored and writes the appropriate error response to our writer diff --git a/core/models/channel_connection.go b/core/models/channel_connection.go index 2d6d97037..bae38fd12 100644 --- a/core/models/channel_connection.go +++ b/core/models/channel_connection.go @@ -211,13 +211,13 @@ FROM channels_channelconnection as cc LEFT OUTER JOIN flows_flowstart_connections fsc ON cc.id = fsc.channelconnection_id WHERE - cc.id = $1 + cc.org_id = $1 AND cc.id = $2 ` // SelectChannelConnection loads a channel connection by id -func SelectChannelConnection(ctx context.Context, db Queryer, id ConnectionID) (*ChannelConnection, error) { +func SelectChannelConnection(ctx context.Context, db Queryer, orgID OrgID, id ConnectionID) (*ChannelConnection, error) { conn := &ChannelConnection{} - err := db.GetContext(ctx, &conn.c, selectConnectionSQL, id) + err := db.GetContext(ctx, &conn.c, selectConnectionSQL, orgID, id) if err != nil { return nil, errors.Wrapf(err, "unable to load channel connection with id: %d", id) } diff --git a/core/models/channel_connection_test.go b/core/models/channel_connection_test.go index c8a150563..b5251db60 100644 --- a/core/models/channel_connection_test.go +++ b/core/models/channel_connection_test.go @@ -25,7 +25,7 @@ func TestChannelConnections(t *testing.T) { assertdb.Query(t, db, `SELECT count(*) from channels_channelconnection where external_id = 'test1' AND id = $1`, conn.ID()).Returns(1) - conn2, err := models.SelectChannelConnection(ctx, db, conn.ID()) + conn2, err := models.SelectChannelConnection(ctx, db, testdata.Org1.ID, conn.ID()) assert.NoError(t, err) assert.Equal(t, "test1", conn2.ExternalID()) } diff --git a/core/models/channel_logs.go b/core/models/channel_logs.go index f74cc8fdd..047fe4f61 100644 --- a/core/models/channel_logs.go +++ b/core/models/channel_logs.go @@ -5,8 +5,10 @@ import ( "encoding/json" "time" + "github.com/nyaruka/gocommon/dates" "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/jsonx" + "github.com/nyaruka/gocommon/stringsx" "github.com/nyaruka/gocommon/uuids" "github.com/pkg/errors" ) @@ -17,7 +19,7 @@ type ChannelLogID int64 // ChannelLogUUID is our type for a channel log UUID type ChannelLogUUID uuids.UUID -type ChanneLogType string +type ChannelLogType string const ( ChannelLogTypeIVRStart = "ivr_start" @@ -27,53 +29,126 @@ const ( ChannelLogTypeIVRHangup = "ivr_hangup" ) -// ChannelLog is the mailroom struct that represents channel logs +type ChannelError struct { + Message string `json:"message"` + Code string `json:"code"` +} + +func NewChannelError(message, code string) ChannelError { + return ChannelError{Message: message, Code: code} +} + +// ChannelLog stores the HTTP traces and errors generated by an interaction with a channel. type ChannelLog struct { - ID ChannelLogID `db:"id"` - UUID ChannelLogUUID `db:"uuid"` - ChannelID ChannelID `db:"channel_id"` - ConnectionID ConnectionID `db:"connection_id"` - - Type ChanneLogType `db:"log_type"` - HTTPLogs json.RawMessage `db:"http_logs"` - IsError bool `db:"is_error"` - ElapsedMS int `db:"elapsed_ms"` - CreatedOn time.Time `db:"created_on"` + uuid ChannelLogUUID + type_ ChannelLogType + channel *Channel + connection *ChannelConnection + httpLogs []*httpx.Log + errors []ChannelError + createdOn time.Time + elapsed time.Duration + + recorder *httpx.Recorder + redactor stringsx.Redactor } -const sqlInsertChannelLog = ` -INSERT INTO channels_channellog( uuid, channel_id, connection_id, log_type, http_logs, is_error, elapsed_ms, created_on) - VALUES(:uuid, :channel_id, :connection_id, :log_type, :http_logs, :is_error, :elapsed_ms, :created_on) - RETURNING id` +// NewChannelLog creates a new channel log with the given type and channel +func NewChannelLog(t ChannelLogType, ch *Channel, redactVals []string) *ChannelLog { + return newChannelLog(t, ch, nil, redactVals) +} -// NewChannelLog creates a new channel log from the given HTTP trace -func NewChannelLog(channelID ChannelID, conn *ChannelConnection, logType ChanneLogType, trace *httpx.Trace) *ChannelLog { - httpLog := httpx.NewLog(trace, 2048, 50000, nil) +// NewChannelLogForIncoming creates a new channel log for an incoming request +func NewChannelLogForIncoming(t ChannelLogType, ch *Channel, r *httpx.Recorder, redactVals []string) *ChannelLog { + return newChannelLog(t, ch, r, redactVals) +} - isError := false - if trace.Response == nil || trace.Response.StatusCode/100 != 2 { - isError = true - } +func newChannelLog(t ChannelLogType, ch *Channel, r *httpx.Recorder, redactVals []string) *ChannelLog { + return &ChannelLog{ + uuid: ChannelLogUUID(uuids.New()), + type_: t, + channel: ch, + createdOn: dates.Now(), - l := &ChannelLog{ - UUID: ChannelLogUUID(uuids.New()), - ChannelID: channelID, - Type: logType, - HTTPLogs: jsonx.MustMarshal([]*httpx.Log{httpLog}), - IsError: isError, - ElapsedMS: httpLog.ElapsedMS, - CreatedOn: time.Now(), + recorder: r, + redactor: stringsx.NewRedactor("**********", redactVals...), } +} - if conn != nil { - l.ConnectionID = conn.ID() +func (l *ChannelLog) SetConnection(c *ChannelConnection) { + l.connection = c +} + +func (l *ChannelLog) HTTP(t *httpx.Trace) { + l.httpLogs = append(l.httpLogs, l.traceToLog(t)) +} + +func (l *ChannelLog) Error(err error) { + l.errors = append(l.errors, NewChannelError(err.Error(), "")) +} + +func (l *ChannelLog) End() { + if l.recorder != nil { + // prepend so it's the first HTTP request in the log + l.httpLogs = append([]*httpx.Log{l.traceToLog(l.recorder.Trace)}, l.httpLogs...) } - return l + l.elapsed = time.Since(l.createdOn) +} + +func (l *ChannelLog) traceToLog(t *httpx.Trace) *httpx.Log { + return httpx.NewLog(t, 2048, 50000, l.redactor) +} + +const sqlInsertChannelLog = ` +INSERT INTO channels_channellog( uuid, channel_id, connection_id, log_type, http_logs, errors, is_error, elapsed_ms, created_on) + VALUES(:uuid, :channel_id, :connection_id, :log_type, :http_logs, :errors, :is_error, :elapsed_ms, :created_on) + RETURNING id` + +type dbChannelLog struct { + ID ChannelLogID `db:"id"` + UUID ChannelLogUUID `db:"uuid"` + ChannelID ChannelID `db:"channel_id"` + ConnectionID ConnectionID `db:"connection_id"` + Type ChannelLogType `db:"log_type"` + HTTPLogs json.RawMessage `db:"http_logs"` + Errors json.RawMessage `db:"errors"` + IsError bool `db:"is_error"` + ElapsedMS int `db:"elapsed_ms"` + CreatedOn time.Time `db:"created_on"` } // InsertChannelLogs writes the given channel logs to the db func InsertChannelLogs(ctx context.Context, db Queryer, logs []*ChannelLog) error { - err := BulkQuery(ctx, "insert channel log", db, sqlInsertChannelLog, logs) + vs := make([]*dbChannelLog, len(logs)) + for i, l := range logs { + // if we have an error or a non 2XX/3XX http response then this log is marked as an error + isError := len(l.errors) > 0 + if !isError { + for _, l := range l.httpLogs { + if l.StatusCode < 200 || l.StatusCode >= 400 { + isError = true + break + } + } + } + + v := &dbChannelLog{ + UUID: ChannelLogUUID(uuids.New()), + ChannelID: l.channel.ID(), + Type: l.type_, + HTTPLogs: jsonx.MustMarshal(l.httpLogs), + Errors: jsonx.MustMarshal(l.errors), + IsError: isError, + CreatedOn: time.Now(), + ElapsedMS: int(l.elapsed / time.Millisecond), + } + if l.connection != nil { + v.ConnectionID = l.connection.ID() + } + vs[i] = v + } + + err := BulkQuery(ctx, "insert channel log", db, sqlInsertChannelLog, vs) return errors.Wrapf(err, "error inserting channel logs") } diff --git a/core/models/channel_logs_test.go b/core/models/channel_logs_test.go index a85f0d7f8..63facce7e 100644 --- a/core/models/channel_logs_test.go +++ b/core/models/channel_logs_test.go @@ -1,6 +1,7 @@ package models_test import ( + "errors" "net/http" "testing" @@ -30,17 +31,25 @@ func TestChannelLogsOutgoing(t *testing.T) { channel := oa.ChannelByID(testdata.TwilioChannel.ID) require.NotNil(t, channel) + clog1 := models.NewChannelLog(models.ChannelLogTypeIVRStart, channel, nil) + clog2 := models.NewChannelLog(models.ChannelLogTypeIVRHangup, channel, nil) + req1, _ := httpx.NewRequest("GET", "http://ivr.com/start", nil, nil) trace1, err := httpx.DoTrace(http.DefaultClient, req1, nil, nil, -1) require.NoError(t, err) - log1 := models.NewChannelLog(channel.ID(), nil, models.ChannelLogTypeIVRStart, trace1) + + clog1.HTTP(trace1) + clog1.End() req2, _ := httpx.NewRequest("GET", "http://ivr.com/hangup", nil, nil) trace2, err := httpx.DoTrace(http.DefaultClient, req2, nil, nil, -1) require.NoError(t, err) - log2 := models.NewChannelLog(channel.ID(), nil, models.ChannelLogTypeIVRHangup, trace2) - err = models.InsertChannelLogs(ctx, db, []*models.ChannelLog{log1, log2}) + clog2.HTTP(trace2) + clog2.Error(errors.New("oops")) + clog2.End() + + err = models.InsertChannelLogs(ctx, db, []*models.ChannelLog{clog1, clog2}) require.NoError(t, err) assertdb.Query(t, db, `SELECT count(*) FROM channels_channellog`).Returns(2) diff --git a/core/tasks/expirations/cron.go b/core/tasks/expirations/cron.go index 45846714c..f64f63cb1 100644 --- a/core/tasks/expirations/cron.go +++ b/core/tasks/expirations/cron.go @@ -143,7 +143,7 @@ func ExpireVoiceSessions(ctx context.Context, rt *runtime.Runtime) error { defer rows.Close() expiredSessions := make([]models.SessionID, 0, 100) - channelLogs := make([]*models.ChannelLog, 0, 100) + clogs := make([]*models.ChannelLog, 0, 100) for rows.Next() { expiredWait := &ExpiredVoiceWait{} @@ -156,20 +156,21 @@ func ExpireVoiceSessions(ctx context.Context, rt *runtime.Runtime) error { expiredSessions = append(expiredSessions, expiredWait.SessionID) // load our connection - conn, err := models.SelectChannelConnection(ctx, rt.DB, expiredWait.ConnectionID) + conn, err := models.SelectChannelConnection(ctx, rt.DB, expiredWait.OrgID, expiredWait.ConnectionID) if err != nil { log.WithError(err).WithField("connection_id", expiredWait.ConnectionID).Error("unable to load connection") continue } // hang up our call - trace, err := ivr.HangupCall(ctx, rt, conn) + clog, err := ivr.HangupCall(ctx, rt, conn) if err != nil { // log error but carry on with other calls log.WithError(err).WithField("connection_id", conn.ID()).Error("error hanging up call") } - if trace != nil { - channelLogs = append(channelLogs, models.NewChannelLog(conn.ChannelID(), conn, models.ChannelLogTypeIVRHangup, trace)) + + if clog != nil { + clogs = append(clogs, clog) } } @@ -182,7 +183,7 @@ func ExpireVoiceSessions(ctx context.Context, rt *runtime.Runtime) error { log.WithField("count", len(expiredSessions)).WithField("elapsed", time.Since(start)).Info("expired and hung up on channel connections") } - if err := models.InsertChannelLogs(ctx, rt.DB, channelLogs); err != nil { + if err := models.InsertChannelLogs(ctx, rt.DB, clogs); err != nil { return errors.Wrap(err, "error inserting channel logs") } @@ -190,7 +191,7 @@ func ExpireVoiceSessions(ctx context.Context, rt *runtime.Runtime) error { } const sqlSelectExpiredVoiceWaits = ` - SELECT id, connection_id, wait_expires_on + SELECT id, org_id, connection_id, wait_expires_on FROM flows_flowsession WHERE session_type = 'V' AND status = 'W' AND wait_expires_on <= NOW() ORDER BY wait_expires_on ASC @@ -198,6 +199,7 @@ ORDER BY wait_expires_on ASC type ExpiredVoiceWait struct { SessionID models.SessionID `db:"id"` + OrgID models.OrgID `db:"org_id"` ConnectionID models.ConnectionID `db:"connection_id"` ExpiresOn time.Time `db:"wait_expires_on"` } diff --git a/core/tasks/ivr/cron.go b/core/tasks/ivr/cron.go index 57742da5f..fd5f948c7 100644 --- a/core/tasks/ivr/cron.go +++ b/core/tasks/ivr/cron.go @@ -31,6 +31,7 @@ func RetryCalls(ctx context.Context, rt *runtime.Runtime) error { } throttledChannels := make(map[models.ChannelID]bool) + clogs := make([]*models.ChannelLog, 0, len(conns)) // schedules calls for each connection for _, conn := range conns { @@ -68,7 +69,10 @@ func RetryCalls(ctx context.Context, rt *runtime.Runtime) error { continue } - err = ivr.RequestCallStartForConnection(ctx, rt, channel, urn, conn) + clog, err := ivr.RequestCallStartForConnection(ctx, rt, channel, urn, conn) + if clog != nil { + clogs = append(clogs, clog) + } if err != nil { log.WithError(err).Error(err) continue @@ -78,6 +82,11 @@ func RetryCalls(ctx context.Context, rt *runtime.Runtime) error { throttledChannels[conn.ChannelID()] = true } + // log any error inserting our channel logs, but continue + if err := models.InsertChannelLogs(ctx, rt.DB, clogs); err != nil { + logrus.WithError(err).Error("error inserting channel logs") + } + log.WithField("count", len(conns)).WithField("elapsed", time.Since(start)).Info("retried errored calls") return nil diff --git a/web/ivr/ivr.go b/web/ivr/ivr.go index 586642708..a00df8cd4 100644 --- a/web/ivr/ivr.go +++ b/web/ivr/ivr.go @@ -29,105 +29,106 @@ func init() { web.RegisterRoute(http.MethodPost, "/mr/ivr/c/{uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}}/incoming", newIVRHandler(handleIncoming, models.ChannelLogTypeIVRIncoming)) } -type ivrHandlerFn func(ctx context.Context, rt *runtime.Runtime, r *http.Request, w http.ResponseWriter) (*models.Channel, *models.ChannelConnection, error) +type ivrHandlerFn func(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, ch *models.Channel, svc ivr.Service, r *http.Request, w http.ResponseWriter) (*models.ChannelConnection, error) -func newIVRHandler(handler ivrHandlerFn, logType models.ChanneLogType) web.Handler { +func newIVRHandler(handler ivrHandlerFn, logType models.ChannelLogType) web.Handler { return func(ctx context.Context, rt *runtime.Runtime, r *http.Request, w http.ResponseWriter) error { - recorder, err := httpx.NewRecorder(r, w, true) + channelUUID := assets.ChannelUUID(chi.URLParam(r, "uuid")) + + // load the org id for this UUID (we could load the entire channel here but we want to take the same paths through everything else) + orgID, err := models.OrgIDForChannelUUID(ctx, rt.DB, channelUUID) if err != nil { - return errors.Wrapf(err, "error reading request body") + return writeGenericErrorResponse(w, err) } - channel, connection, rerr := handler(ctx, rt, r, recorder.ResponseWriter) + // load our org assets + oa, err := models.GetOrgAssets(ctx, rt, orgID) + if err != nil { + return writeGenericErrorResponse(w, errors.Wrapf(err, "error loading org assets")) + } - if channel != nil { - err := recorder.End() - if err != nil { - logrus.WithError(err).WithField("http_request", r).Error("error recording IVR request") - } + // and our channel + ch := oa.ChannelByUUID(channelUUID) + if ch == nil { + return writeGenericErrorResponse(w, errors.Wrapf(err, "no active channel with uuid: %s", channelUUID)) + } - log := models.NewChannelLog(channel.ID(), connection, logType, recorder.Trace) - err = models.InsertChannelLogs(ctx, rt.DB, []*models.ChannelLog{log}) - if err != nil { - logrus.WithError(err).WithField("http_request", r).Error("error writing ivr channel log") - } + // get the IVR service for this channel + svc, err := ivr.GetService(ch) + if svc == nil { + return writeGenericErrorResponse(w, errors.Wrapf(err, "unable to get service for channel: %s", ch.UUID())) } - return rerr - } -} + // validate this request's signature + err = svc.ValidateRequestSignature(r) + if err != nil { + return svc.WriteErrorResponse(w, errors.Wrapf(err, "request failed signature validation")) + } -func handleIncoming(ctx context.Context, rt *runtime.Runtime, r *http.Request, w http.ResponseWriter) (*models.Channel, *models.ChannelConnection, error) { - channelUUID := assets.ChannelUUID(chi.URLParam(r, "uuid")) + recorder, err := httpx.NewRecorder(r, w, true) + if err != nil { + return errors.Wrapf(err, "error reading request body") + } - // load the org id for this UUID (we could load the entire channel here but we want to take the same paths through everything else) - orgID, err := models.OrgIDForChannelUUID(ctx, rt.DB, channelUUID) - if err != nil { - return nil, nil, writeGenericErrorResponse(w, err) - } + clog := models.NewChannelLogForIncoming(logType, ch, recorder, nil) - // load our org assets - oa, err := models.GetOrgAssets(ctx, rt, orgID) - if err != nil { - return nil, nil, writeGenericErrorResponse(w, errors.Wrapf(err, "error loading org assets")) - } + connection, rerr := handler(ctx, rt, oa, ch, svc, r, recorder.ResponseWriter) + clog.SetConnection(connection) - // and our channel - channel := oa.ChannelByUUID(channelUUID) - if channel == nil { - return nil, nil, writeGenericErrorResponse(w, errors.Wrapf(err, "no active channel with uuid: %s", channelUUID)) - } + if err := recorder.End(); err != nil { + logrus.WithError(err).WithField("http_request", r).Error("error recording IVR request") + } - // get the right kind of provider - provider, err := ivr.GetService(channel) - if provider == nil { - return channel, nil, writeGenericErrorResponse(w, errors.Wrapf(err, "unable to load client for channel: %s", channelUUID)) - } + clog.End() - // validate this request's signature - err = provider.ValidateRequestSignature(r) - if err != nil { - return channel, nil, provider.WriteErrorResponse(w, errors.Wrapf(err, "request failed signature validation")) + err = models.InsertChannelLogs(ctx, rt.DB, []*models.ChannelLog{clog}) + if err != nil { + logrus.WithError(err).WithField("http_request", r).Error("error writing ivr channel log") + } + + return rerr } +} +func handleIncoming(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, ch *models.Channel, svc ivr.Service, r *http.Request, w http.ResponseWriter) (*models.ChannelConnection, error) { // lookup the URN of the caller - urn, err := provider.URNForRequest(r) + urn, err := svc.URNForRequest(r) if err != nil { - return channel, nil, provider.WriteErrorResponse(w, errors.Wrapf(err, "unable to find URN in request")) + return nil, svc.WriteErrorResponse(w, errors.Wrapf(err, "unable to find URN in request")) } // get the contact for this URN - contact, _, _, err := models.GetOrCreateContact(ctx, rt.DB, oa, []urns.URN{urn}, channel.ID()) + contact, _, _, err := models.GetOrCreateContact(ctx, rt.DB, oa, []urns.URN{urn}, ch.ID()) if err != nil { - return channel, nil, provider.WriteErrorResponse(w, errors.Wrapf(err, "unable to get contact by urn")) + return nil, svc.WriteErrorResponse(w, errors.Wrapf(err, "unable to get contact by urn")) } urn, err = models.URNForURN(ctx, rt.DB, oa, urn) if err != nil { - return channel, nil, provider.WriteErrorResponse(w, errors.Wrapf(err, "unable to load urn")) + return nil, svc.WriteErrorResponse(w, errors.Wrapf(err, "unable to load urn")) } // urn ID urnID := models.GetURNID(urn) if urnID == models.NilURNID { - return channel, nil, provider.WriteErrorResponse(w, errors.Wrapf(err, "unable to get id for URN")) + return nil, svc.WriteErrorResponse(w, errors.Wrapf(err, "unable to get id for URN")) } // we first create an incoming call channel event and see if that matches - event := models.NewChannelEvent(models.MOCallEventType, oa.OrgID(), channel.ID(), contact.ID(), urnID, nil, false) + event := models.NewChannelEvent(models.MOCallEventType, oa.OrgID(), ch.ID(), contact.ID(), urnID, nil, false) - externalID, err := provider.CallIDForRequest(r) + externalID, err := svc.CallIDForRequest(r) if err != nil { - return channel, nil, provider.WriteErrorResponse(w, errors.Wrapf(err, "unable to get external id from request")) + return nil, svc.WriteErrorResponse(w, errors.Wrapf(err, "unable to get external id from request")) } // create our connection conn, err := models.InsertIVRConnection( - ctx, rt.DB, oa.OrgID(), channel.ID(), models.NilStartID, contact.ID(), urnID, + ctx, rt.DB, oa.OrgID(), ch.ID(), models.NilStartID, contact.ID(), urnID, models.ConnectionDirectionIn, models.ConnectionStatusInProgress, externalID, ) if err != nil { - return channel, nil, provider.WriteErrorResponse(w, errors.Wrapf(err, "error creating ivr connection")) + return nil, svc.WriteErrorResponse(w, errors.Wrapf(err, "error creating ivr connection")) } // try to handle this event @@ -135,40 +136,40 @@ func handleIncoming(ctx context.Context, rt *runtime.Runtime, r *http.Request, w if err != nil { logrus.WithError(err).WithField("http_request", r).Error("error handling incoming call") - return channel, conn, provider.WriteErrorResponse(w, errors.Wrapf(err, "error handling incoming call")) + return conn, svc.WriteErrorResponse(w, errors.Wrapf(err, "error handling incoming call")) } // we got a session back so we have an active call trigger if session != nil { // build our resume URL - resumeURL := buildResumeURL(rt.Config, channel, conn, urn) + resumeURL := buildResumeURL(rt.Config, ch, conn, urn) // have our client output our session status - err = provider.WriteSessionResponse(ctx, rt, channel, conn, session, urn, resumeURL, r, w) + err = svc.WriteSessionResponse(ctx, rt, ch, conn, session, urn, resumeURL, r, w) if err != nil { - return channel, conn, errors.Wrapf(err, "error writing ivr response for start") + return conn, errors.Wrapf(err, "error writing ivr response for start") } - return channel, conn, nil + return conn, nil } // no session means no trigger, create a missed call event instead // we first create an incoming call channel event and see if that matches - event = models.NewChannelEvent(models.MOMissEventType, oa.OrgID(), channel.ID(), contact.ID(), urnID, nil, false) + event = models.NewChannelEvent(models.MOMissEventType, oa.OrgID(), ch.ID(), contact.ID(), urnID, nil, false) err = event.Insert(ctx, rt.DB) if err != nil { - return channel, conn, provider.WriteErrorResponse(w, errors.Wrapf(err, "error inserting channel event")) + return conn, svc.WriteErrorResponse(w, errors.Wrapf(err, "error inserting channel event")) } // try to handle it, this time looking for a missed call event _, err = handler.HandleChannelEvent(ctx, rt, models.MOMissEventType, event, nil) if err != nil { logrus.WithError(err).WithField("http_request", r).Error("error handling missed call") - return channel, conn, provider.WriteErrorResponse(w, errors.Wrapf(err, "error handling missed call")) + return conn, svc.WriteErrorResponse(w, errors.Wrapf(err, "error handling missed call")) } // write our empty response - return channel, conn, provider.WriteEmptyResponse(w, "missed call handled") + return conn, svc.WriteEmptyResponse(w, "missed call handled") } const ( @@ -203,61 +204,37 @@ func buildResumeURL(cfg *runtime.Config, channel *models.Channel, conn *models.C } // handles all incoming IVR requests related to a flow (status is handled elsewhere) -func handleCallback(ctx context.Context, rt *runtime.Runtime, r *http.Request, w http.ResponseWriter) (*models.Channel, *models.ChannelConnection, error) { +func handleCallback(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, ch *models.Channel, svc ivr.Service, r *http.Request, w http.ResponseWriter) (*models.ChannelConnection, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*55) defer cancel() request := &IVRRequest{} if err := web.DecodeAndValidateForm(request, r); err != nil { - return nil, nil, errors.Wrapf(err, "request failed validation") + return nil, errors.Wrapf(err, "request failed validation") } // load our connection - conn, err := models.SelectChannelConnection(ctx, rt.DB, request.ConnectionID) - if err != nil { - return nil, nil, errors.Wrapf(err, "unable to load channel connection with id: %d", request.ConnectionID) - } - - // load our org assets - oa, err := models.GetOrgAssets(ctx, rt, conn.OrgID()) - if err != nil { - return nil, nil, writeGenericErrorResponse(w, errors.Wrapf(err, "error loading org assets")) - } - - // and our channel - channel := oa.ChannelByID(conn.ChannelID()) - if channel == nil { - return nil, nil, writeGenericErrorResponse(w, errors.Errorf("no active channel with id: %d", conn.ChannelID())) - } - - // get the right kind of provider - provider, err := ivr.GetService(channel) - if provider == nil { - return channel, conn, writeGenericErrorResponse(w, errors.Wrapf(err, "unable to load client for channel: %d", conn.ChannelID())) - } - - // validate this request's signature if relevant - err = provider.ValidateRequestSignature(r) + conn, err := models.SelectChannelConnection(ctx, rt.DB, oa.OrgID(), request.ConnectionID) if err != nil { - return channel, conn, writeGenericErrorResponse(w, errors.Wrapf(err, "request failed signature validation")) + return nil, errors.Wrapf(err, "unable to load channel connection with id: %d", request.ConnectionID) } // load our contact contacts, err := models.LoadContacts(ctx, rt.ReadonlyDB, oa, []models.ContactID{conn.ContactID()}) if err != nil { - return channel, conn, provider.WriteErrorResponse(w, errors.Wrapf(err, "no such contact")) + return conn, svc.WriteErrorResponse(w, errors.Wrapf(err, "no such contact")) } if len(contacts) == 0 { - return channel, conn, provider.WriteErrorResponse(w, errors.Errorf("no contact with id: %d", conn.ContactID())) + return conn, svc.WriteErrorResponse(w, errors.Errorf("no contact with id: %d", conn.ContactID())) } if contacts[0].Status() != models.ContactStatusActive { - return channel, conn, provider.WriteErrorResponse(w, errors.Errorf("no contact with id: %d", conn.ContactID())) + return conn, svc.WriteErrorResponse(w, errors.Errorf("no contact with id: %d", conn.ContactID())) } // load the URN for this connection urn, err := models.URNForID(ctx, rt.DB, oa, conn.ContactURNID()) if err != nil { - return channel, conn, provider.WriteErrorResponse(w, errors.Errorf("unable to find connection urn: %d", conn.ContactURNID())) + return conn, svc.WriteErrorResponse(w, errors.Errorf("unable to find connection urn: %d", conn.ContactURNID())) } // make sure our URN is indeed present on our contact, no funny business @@ -268,117 +245,72 @@ func handleCallback(ctx context.Context, rt *runtime.Runtime, r *http.Request, w } } if !found { - return channel, conn, provider.WriteErrorResponse(w, errors.Errorf("unable to find URN: %s on contact: %d", urn, conn.ContactID())) + return conn, svc.WriteErrorResponse(w, errors.Errorf("unable to find URN: %s on contact: %d", urn, conn.ContactID())) } - resumeURL := buildResumeURL(rt.Config, channel, conn, urn) + resumeURL := buildResumeURL(rt.Config, ch, conn, urn) // if this a start, start our contact switch request.Action { case actionStart: - err = ivr.StartIVRFlow( - ctx, rt, provider, resumeURL, - oa, channel, conn, contacts[0], urn, conn.StartID(), - r, w, - ) - + err = ivr.StartIVRFlow(ctx, rt, svc, resumeURL, oa, ch, conn, contacts[0], urn, conn.StartID(), r, w) case actionResume: - err = ivr.ResumeIVRFlow( - ctx, rt, resumeURL, provider, - oa, channel, conn, contacts[0], urn, - r, w, - ) - + err = ivr.ResumeIVRFlow(ctx, rt, resumeURL, svc, oa, ch, conn, contacts[0], urn, r, w) case actionStatus: - err = ivr.HandleIVRStatus( - ctx, rt, oa, provider, conn, - r, w, - ) + err = ivr.HandleIVRStatus(ctx, rt, oa, svc, conn, r, w) default: - err = provider.WriteErrorResponse(w, errors.Errorf("unknown action: %s", request.Action)) + err = svc.WriteErrorResponse(w, errors.Errorf("unknown action: %s", request.Action)) } // had an error? mark our connection as errored and log it if err != nil { logrus.WithError(err).WithField("http_request", r).Error("error while handling IVR") - return channel, conn, ivr.HandleAsFailure(ctx, rt.DB, provider, conn, w, err) + return conn, ivr.HandleAsFailure(ctx, rt.DB, svc, conn, w, err) } - return channel, conn, nil + return conn, nil } // handleStatus handles all incoming IVR events / status updates -func handleStatus(ctx context.Context, rt *runtime.Runtime, r *http.Request, w http.ResponseWriter) (*models.Channel, *models.ChannelConnection, error) { +func handleStatus(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, ch *models.Channel, svc ivr.Service, r *http.Request, w http.ResponseWriter) (*models.ChannelConnection, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*55) defer cancel() - channelUUID := assets.ChannelUUID(chi.URLParam(r, "uuid")) - - // load the org id for this UUID (we could load the entire channel here but we want to take the same paths through everything else) - orgID, err := models.OrgIDForChannelUUID(ctx, rt.DB, channelUUID) - if err != nil { - return nil, nil, writeGenericErrorResponse(w, err) - } - - // load our org assets - oa, err := models.GetOrgAssets(ctx, rt, orgID) - if err != nil { - return nil, nil, writeGenericErrorResponse(w, errors.Wrapf(err, "error loading org assets")) - } - - // and our channel - channel := oa.ChannelByUUID(channelUUID) - if channel == nil { - return nil, nil, writeGenericErrorResponse(w, errors.Wrapf(err, "no active channel with uuid: %s", channelUUID)) - } - - // get the right kind of provider - provider, err := ivr.GetService(channel) - if provider == nil { - return channel, nil, writeGenericErrorResponse(w, errors.Wrapf(err, "unable to load client for channel: %s", channelUUID)) - } - - // validate this request's signature if relevant - err = provider.ValidateRequestSignature(r) - if err != nil { - return channel, nil, writeGenericErrorResponse(w, errors.Wrapf(err, "request failed signature validation")) - } - // preprocess this status - body, err := provider.PreprocessStatus(ctx, rt, r) + body, err := svc.PreprocessStatus(ctx, rt, r) if err != nil { - return channel, nil, provider.WriteErrorResponse(w, errors.Wrapf(err, "error while preprocessing status")) + return nil, svc.WriteErrorResponse(w, errors.Wrapf(err, "error while preprocessing status")) } if len(body) > 0 { contentType := httpx.DetectContentType(body) w.Header().Set("Content-Type", contentType) _, err := w.Write(body) - return channel, nil, err + return nil, err } // get our external id - externalID, err := provider.CallIDForRequest(r) + externalID, err := svc.CallIDForRequest(r) if err != nil { - return channel, nil, provider.WriteErrorResponse(w, errors.Wrapf(err, "unable to get call id for request")) + return nil, svc.WriteErrorResponse(w, errors.Wrapf(err, "unable to get call id for request")) } // load our connection - conn, err := models.SelectChannelConnectionByExternalID(ctx, rt.DB, channel.ID(), models.ConnectionTypeIVR, externalID) + conn, err := models.SelectChannelConnectionByExternalID(ctx, rt.DB, ch.ID(), models.ConnectionTypeIVR, externalID) if errors.Cause(err) == sql.ErrNoRows { - return channel, nil, provider.WriteEmptyResponse(w, "unknown connection, ignoring") + return nil, svc.WriteEmptyResponse(w, "unknown connection, ignoring") } if err != nil { - return channel, nil, provider.WriteErrorResponse(w, errors.Wrapf(err, "unable to load channel connection with id: %s", externalID)) + return nil, svc.WriteErrorResponse(w, errors.Wrapf(err, "unable to load channel connection with id: %s", externalID)) } - err = ivr.HandleIVRStatus(ctx, rt, oa, provider, conn, r, w) + err = ivr.HandleIVRStatus(ctx, rt, oa, svc, conn, r, w) // had an error? mark our connection as errored and log it if err != nil { logrus.WithError(err).WithField("http_request", r).Error("error while handling status") - return channel, conn, ivr.HandleAsFailure(ctx, rt.DB, provider, conn, w, err) + return conn, ivr.HandleAsFailure(ctx, rt.DB, svc, conn, w, err) } - return channel, conn, nil + return conn, nil }