Skip to content

Commit

Permalink
Merge pull request #656 from nyaruka/clogs
Browse files Browse the repository at this point in the history
Restructure channel logs like how they are in courier
  • Loading branch information
rowanseymour authored Sep 12, 2022
2 parents 1f85830 + 92d893c commit acd61cf
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 235 deletions.
65 changes: 41 additions & 24 deletions core/ivr/ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Service interface {
}

// HangupCall hangs up the passed in call also taking care of updating the status of our call in the process
func HangupCall(ctx context.Context, rt *runtime.Runtime, conn *models.ChannelConnection) (*httpx.Trace, error) {
func HangupCall(ctx context.Context, rt *runtime.Runtime, conn *models.ChannelConnection) (*models.ChannelLog, error) {
// no matter what mark our call as failed
defer conn.MarkFailed(ctx, rt.DB, time.Now())

Expand All @@ -113,14 +113,25 @@ func HangupCall(ctx context.Context, rt *runtime.Runtime, conn *models.ChannelCo
}

// create the right service
c, err := GetService(channel)
svc, err := GetService(channel)
if err != nil {
return nil, errors.Wrapf(err, "unable to create IVR service")
}

clog := models.NewChannelLog(models.ChannelLogTypeIVRHangup, channel, nil)
clog.SetConnection(conn)
defer clog.End()

// try to request our call hangup
trace, err := c.HangupCall(conn.ExternalID())
return trace, errors.Wrapf(err, "error hanging call up")
trace, err := svc.HangupCall(conn.ExternalID())
if trace != nil {
clog.HTTP(trace)
}
if err != nil {
clog.Error(err)
return clog, errors.Wrapf(err, "error hanging call up")
}
return clog, nil
}

// RequestCallStart creates a new ChannelSession for the passed in flow start and contact, returning the created session
Expand Down Expand Up @@ -179,10 +190,19 @@ func RequestCallStart(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAs
return nil, errors.Wrapf(err, "error creating ivr session")
}

return conn, RequestCallStartForConnection(ctx, rt, channel, telURN, conn)
clog, err := RequestCallStartForConnection(ctx, rt, channel, telURN, conn)

// log any error inserting our channel log, but continue
if clog != nil {
if err := models.InsertChannelLogs(ctx, rt.DB, []*models.ChannelLog{clog}); err != nil {
logrus.WithError(err).Error("error inserting channel log")
}
}

return conn, err
}

func RequestCallStartForConnection(ctx context.Context, rt *runtime.Runtime, channel *models.Channel, telURN urns.URN, conn *models.ChannelConnection) error {
func RequestCallStartForConnection(ctx context.Context, rt *runtime.Runtime, channel *models.Channel, telURN urns.URN, conn *models.ChannelConnection) (*models.ChannelLog, error) {
// the domain that will be used for callbacks, can be specific for channels due to white labeling
domain := channel.ConfigValue(models.ChannelConfigCallbackDomain, rt.Config.Domain)

Expand All @@ -195,17 +215,17 @@ func RequestCallStartForConnection(ctx context.Context, rt *runtime.Runtime, cha
if maxCalls > 0 {
count, err := models.ActiveChannelConnectionCount(ctx, rt.DB, channel.ID())
if err != nil {
return errors.Wrapf(err, "error finding number of active channel connections")
return nil, errors.Wrapf(err, "error finding number of active channel connections")
}

// we are at max calls, do not move on
if count >= maxCalls {
logrus.WithField("channel_id", channel.ID()).Info("call being queued, max concurrent reached")
err := conn.MarkThrottled(ctx, rt.DB, time.Now())
if err != nil {
return errors.Wrapf(err, "error marking connection as throttled")
return nil, errors.Wrapf(err, "error marking connection as throttled")
}
return nil
return nil, nil
}
}
}
Expand All @@ -224,39 +244,36 @@ func RequestCallStartForConnection(ctx context.Context, rt *runtime.Runtime, cha
// create the right service
c, err := GetService(channel)
if err != nil {
return errors.Wrapf(err, "unable to create IVR service")
return nil, errors.Wrapf(err, "unable to create IVR service")
}

clog := models.NewChannelLog(models.ChannelLogTypeIVRStart, channel, nil)
clog.SetConnection(conn)
defer clog.End()

// try to request our call start
callID, trace, err := c.RequestCall(telURN, resumeURL, statusURL, channel.MachineDetection())

/// insert an channel log if we have an HTTP trace
if trace != nil {
log := models.NewChannelLog(channel.ID(), conn, models.ChannelLogTypeIVRStart, trace)
err := models.InsertChannelLogs(ctx, rt.DB, []*models.ChannelLog{log})

// log any error inserting our channel log, but try to continue
if err != nil {
logrus.WithError(err).Error("error inserting channel log")
}
clog.HTTP(trace)
}

if err != nil {
clog.Error(err)

// set our status as errored
err := conn.UpdateStatus(ctx, rt.DB, models.ConnectionStatusFailed, 0, time.Now())
if err != nil {
return errors.Wrapf(err, "error setting errored status on session")
return clog, errors.Wrapf(err, "error setting errored status on session")
}
return nil
return clog, nil
}

// update our channel session
err = conn.UpdateExternalID(ctx, rt.DB, string(callID))
if err != nil {
return errors.Wrapf(err, "error updating session external id")
return clog, errors.Wrapf(err, "error updating session external id")
}

return nil
return clog, nil
}

// HandleAsFailure marks the passed in connection as errored and writes the appropriate error response to our writer
Expand Down
6 changes: 3 additions & 3 deletions core/models/channel_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,13 @@ FROM
channels_channelconnection as cc
LEFT OUTER JOIN flows_flowstart_connections fsc ON cc.id = fsc.channelconnection_id
WHERE
cc.id = $1
cc.org_id = $1 AND cc.id = $2
`

// SelectChannelConnection loads a channel connection by id
func SelectChannelConnection(ctx context.Context, db Queryer, id ConnectionID) (*ChannelConnection, error) {
func SelectChannelConnection(ctx context.Context, db Queryer, orgID OrgID, id ConnectionID) (*ChannelConnection, error) {
conn := &ChannelConnection{}
err := db.GetContext(ctx, &conn.c, selectConnectionSQL, id)
err := db.GetContext(ctx, &conn.c, selectConnectionSQL, orgID, id)
if err != nil {
return nil, errors.Wrapf(err, "unable to load channel connection with id: %d", id)
}
Expand Down
2 changes: 1 addition & 1 deletion core/models/channel_connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestChannelConnections(t *testing.T) {

assertdb.Query(t, db, `SELECT count(*) from channels_channelconnection where external_id = 'test1' AND id = $1`, conn.ID()).Returns(1)

conn2, err := models.SelectChannelConnection(ctx, db, conn.ID())
conn2, err := models.SelectChannelConnection(ctx, db, testdata.Org1.ID, conn.ID())
assert.NoError(t, err)
assert.Equal(t, "test1", conn2.ExternalID())
}
145 changes: 110 additions & 35 deletions core/models/channel_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"encoding/json"
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/stringsx"
"github.com/nyaruka/gocommon/uuids"
"github.com/pkg/errors"
)
Expand All @@ -17,7 +19,7 @@ type ChannelLogID int64
// ChannelLogUUID is our type for a channel log UUID
type ChannelLogUUID uuids.UUID

type ChanneLogType string
type ChannelLogType string

const (
ChannelLogTypeIVRStart = "ivr_start"
Expand All @@ -27,53 +29,126 @@ const (
ChannelLogTypeIVRHangup = "ivr_hangup"
)

// ChannelLog is the mailroom struct that represents channel logs
type ChannelError struct {
Message string `json:"message"`
Code string `json:"code"`
}

func NewChannelError(message, code string) ChannelError {
return ChannelError{Message: message, Code: code}
}

// ChannelLog stores the HTTP traces and errors generated by an interaction with a channel.
type ChannelLog struct {
ID ChannelLogID `db:"id"`
UUID ChannelLogUUID `db:"uuid"`
ChannelID ChannelID `db:"channel_id"`
ConnectionID ConnectionID `db:"connection_id"`

Type ChanneLogType `db:"log_type"`
HTTPLogs json.RawMessage `db:"http_logs"`
IsError bool `db:"is_error"`
ElapsedMS int `db:"elapsed_ms"`
CreatedOn time.Time `db:"created_on"`
uuid ChannelLogUUID
type_ ChannelLogType
channel *Channel
connection *ChannelConnection
httpLogs []*httpx.Log
errors []ChannelError
createdOn time.Time
elapsed time.Duration

recorder *httpx.Recorder
redactor stringsx.Redactor
}

const sqlInsertChannelLog = `
INSERT INTO channels_channellog( uuid, channel_id, connection_id, log_type, http_logs, is_error, elapsed_ms, created_on)
VALUES(:uuid, :channel_id, :connection_id, :log_type, :http_logs, :is_error, :elapsed_ms, :created_on)
RETURNING id`
// NewChannelLog creates a new channel log with the given type and channel
func NewChannelLog(t ChannelLogType, ch *Channel, redactVals []string) *ChannelLog {
return newChannelLog(t, ch, nil, redactVals)
}

// NewChannelLog creates a new channel log from the given HTTP trace
func NewChannelLog(channelID ChannelID, conn *ChannelConnection, logType ChanneLogType, trace *httpx.Trace) *ChannelLog {
httpLog := httpx.NewLog(trace, 2048, 50000, nil)
// NewChannelLogForIncoming creates a new channel log for an incoming request
func NewChannelLogForIncoming(t ChannelLogType, ch *Channel, r *httpx.Recorder, redactVals []string) *ChannelLog {
return newChannelLog(t, ch, r, redactVals)
}

isError := false
if trace.Response == nil || trace.Response.StatusCode/100 != 2 {
isError = true
}
func newChannelLog(t ChannelLogType, ch *Channel, r *httpx.Recorder, redactVals []string) *ChannelLog {
return &ChannelLog{
uuid: ChannelLogUUID(uuids.New()),
type_: t,
channel: ch,
createdOn: dates.Now(),

l := &ChannelLog{
UUID: ChannelLogUUID(uuids.New()),
ChannelID: channelID,
Type: logType,
HTTPLogs: jsonx.MustMarshal([]*httpx.Log{httpLog}),
IsError: isError,
ElapsedMS: httpLog.ElapsedMS,
CreatedOn: time.Now(),
recorder: r,
redactor: stringsx.NewRedactor("**********", redactVals...),
}
}

if conn != nil {
l.ConnectionID = conn.ID()
func (l *ChannelLog) SetConnection(c *ChannelConnection) {
l.connection = c
}

func (l *ChannelLog) HTTP(t *httpx.Trace) {
l.httpLogs = append(l.httpLogs, l.traceToLog(t))
}

func (l *ChannelLog) Error(err error) {
l.errors = append(l.errors, NewChannelError(err.Error(), ""))
}

func (l *ChannelLog) End() {
if l.recorder != nil {
// prepend so it's the first HTTP request in the log
l.httpLogs = append([]*httpx.Log{l.traceToLog(l.recorder.Trace)}, l.httpLogs...)
}

return l
l.elapsed = time.Since(l.createdOn)
}

func (l *ChannelLog) traceToLog(t *httpx.Trace) *httpx.Log {
return httpx.NewLog(t, 2048, 50000, l.redactor)
}

const sqlInsertChannelLog = `
INSERT INTO channels_channellog( uuid, channel_id, connection_id, log_type, http_logs, errors, is_error, elapsed_ms, created_on)
VALUES(:uuid, :channel_id, :connection_id, :log_type, :http_logs, :errors, :is_error, :elapsed_ms, :created_on)
RETURNING id`

type dbChannelLog struct {
ID ChannelLogID `db:"id"`
UUID ChannelLogUUID `db:"uuid"`
ChannelID ChannelID `db:"channel_id"`
ConnectionID ConnectionID `db:"connection_id"`
Type ChannelLogType `db:"log_type"`
HTTPLogs json.RawMessage `db:"http_logs"`
Errors json.RawMessage `db:"errors"`
IsError bool `db:"is_error"`
ElapsedMS int `db:"elapsed_ms"`
CreatedOn time.Time `db:"created_on"`
}

// InsertChannelLogs writes the given channel logs to the db
func InsertChannelLogs(ctx context.Context, db Queryer, logs []*ChannelLog) error {
err := BulkQuery(ctx, "insert channel log", db, sqlInsertChannelLog, logs)
vs := make([]*dbChannelLog, len(logs))
for i, l := range logs {
// if we have an error or a non 2XX/3XX http response then this log is marked as an error
isError := len(l.errors) > 0
if !isError {
for _, l := range l.httpLogs {
if l.StatusCode < 200 || l.StatusCode >= 400 {
isError = true
break
}
}
}

v := &dbChannelLog{
UUID: ChannelLogUUID(uuids.New()),
ChannelID: l.channel.ID(),
Type: l.type_,
HTTPLogs: jsonx.MustMarshal(l.httpLogs),
Errors: jsonx.MustMarshal(l.errors),
IsError: isError,
CreatedOn: time.Now(),
ElapsedMS: int(l.elapsed / time.Millisecond),
}
if l.connection != nil {
v.ConnectionID = l.connection.ID()
}
vs[i] = v
}

err := BulkQuery(ctx, "insert channel log", db, sqlInsertChannelLog, vs)
return errors.Wrapf(err, "error inserting channel logs")
}
15 changes: 12 additions & 3 deletions core/models/channel_logs_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models_test

import (
"errors"
"net/http"
"testing"

Expand Down Expand Up @@ -30,17 +31,25 @@ func TestChannelLogsOutgoing(t *testing.T) {
channel := oa.ChannelByID(testdata.TwilioChannel.ID)
require.NotNil(t, channel)

clog1 := models.NewChannelLog(models.ChannelLogTypeIVRStart, channel, nil)
clog2 := models.NewChannelLog(models.ChannelLogTypeIVRHangup, channel, nil)

req1, _ := httpx.NewRequest("GET", "http://ivr.com/start", nil, nil)
trace1, err := httpx.DoTrace(http.DefaultClient, req1, nil, nil, -1)
require.NoError(t, err)
log1 := models.NewChannelLog(channel.ID(), nil, models.ChannelLogTypeIVRStart, trace1)

clog1.HTTP(trace1)
clog1.End()

req2, _ := httpx.NewRequest("GET", "http://ivr.com/hangup", nil, nil)
trace2, err := httpx.DoTrace(http.DefaultClient, req2, nil, nil, -1)
require.NoError(t, err)
log2 := models.NewChannelLog(channel.ID(), nil, models.ChannelLogTypeIVRHangup, trace2)

err = models.InsertChannelLogs(ctx, db, []*models.ChannelLog{log1, log2})
clog2.HTTP(trace2)
clog2.Error(errors.New("oops"))
clog2.End()

err = models.InsertChannelLogs(ctx, db, []*models.ChannelLog{clog1, clog2})
require.NoError(t, err)

assertdb.Query(t, db, `SELECT count(*) FROM channels_channellog`).Returns(2)
Expand Down
Loading

0 comments on commit acd61cf

Please sign in to comment.