Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new fields to HTTPLog and save for webhook called events #500

Merged
merged 3 commits into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/handlers/airtime_transferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool,
transfer.AddLog(models.NewAirtimeTransferredLog(
oa.OrgID(),
httpLog.URL,
httpLog.StatusCode,
httpLog.Request,
httpLog.Response,
httpLog.Status != flows.CallStatusSuccess,
time.Duration(httpLog.ElapsedMS)*time.Millisecond,
httpLog.Retries,
httpLog.CreatedOn,
))
}
Expand Down
4 changes: 4 additions & 0 deletions core/handlers/service_called.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,25 @@ func handleServiceCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *m
oa.OrgID(),
classifier.ID(),
httpLog.URL,
httpLog.StatusCode,
httpLog.Request,
httpLog.Response,
httpLog.Status != flows.CallStatusSuccess,
time.Duration(httpLog.ElapsedMS)*time.Millisecond,
httpLog.Retries,
httpLog.CreatedOn,
)
} else if event.Service == "ticketer" {
log = models.NewTicketerCalledLog(
oa.OrgID(),
ticketer.ID(),
httpLog.URL,
httpLog.StatusCode,
httpLog.Request,
httpLog.Response,
httpLog.Status != flows.CallStatusSuccess,
time.Duration(httpLog.ElapsedMS)*time.Millisecond,
httpLog.Retries,
httpLog.CreatedOn,
)
}
Expand Down
21 changes: 18 additions & 3 deletions core/handlers/webhook_called.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,28 @@ func handleWebhookCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *m
scene.AppendToEventPreCommitHook(hooks.UnsubscribeResthookHook, unsub)
}

// if this is a connection error, use that as our response
run, _ := scene.Session().FindStep(e.StepUUID())
flow, _ := oa.Flow(run.FlowReference().UUID)

// create an HTTP log
if flow != nil {
httpLog := models.NewWebhookCalledLog(
oa.OrgID(),
flow.(*models.Flow).ID(),
event.URL, event.StatusCode, event.Request, event.Response,
event.Status != flows.CallStatusSuccess,
time.Millisecond*time.Duration(event.ElapsedMS),
event.Retries,
event.CreatedOn(),
)
scene.AppendToEventPreCommitHook(hooks.InsertHTTPLogsHook, httpLog)
}

// TODO drop this once RP UI is switched to using HTTP logs
response := event.Response
if event.Status == flows.CallStatusConnectionError {
response = "connection error"
}

// create a result for this call
result := models.NewWebhookResult(
oa.OrgID(), scene.ContactID(),
event.URL, event.Request,
Expand Down
19 changes: 15 additions & 4 deletions core/handlers/webhook_called_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,16 @@ func TestWebhookCalled(t *testing.T) {
{
Actions: handlers.ContactActionMap{
testdata.Cathy: []flows.Action{
actions.NewCallResthook(handlers.NewActionUUID(), "foo", "foo"),
actions.NewCallResthook(handlers.NewActionUUID(), "foo", "foo"), // calls both subscribers
},
testdata.George: []flows.Action{
actions.NewCallResthook(handlers.NewActionUUID(), "foo", "foo"),
actions.NewCallResthook(handlers.NewActionUUID(), "foo", "foo"), // calls both subscribers
actions.NewCallWebhook(handlers.NewActionUUID(), "GET", "http://rapidpro.io/?unsub=1", nil, "", ""),
},
},
SQLAssertions: []handlers.SQLAssertion{
{
SQL: "select count(*) from api_resthooksubscriber where is_active = FALSE",
Args: nil,
Count: 1,
},
{
Expand All @@ -63,9 +62,16 @@ func TestWebhookCalled(t *testing.T) {
},
{
SQL: "select count(*) from api_resthooksubscriber where is_active = TRUE",
Args: nil,
Count: 2,
},
{
SQL: "select count(*) from request_logs_httplog where log_type = 'webhook_called' AND flow_id IS NOT NULL AND status_code = 200",
Count: 2,
},
{
SQL: "select count(*) from request_logs_httplog where log_type = 'webhook_called' AND flow_id IS NOT NULL AND status_code = 410",
Count: 3,
},
{
SQL: "select count(*) from api_webhookresult where contact_id = $1 AND status_code = 200",
Args: []interface{}{testdata.Cathy.ID},
Expand All @@ -81,6 +87,11 @@ func TestWebhookCalled(t *testing.T) {
Args: []interface{}{testdata.George.ID},
Count: 3,
},
{
SQL: "select count(*) from api_webhookresult where contact_id = $1",
Args: []interface{}{testdata.George.ID},
Count: 3,
},
{
SQL: "select count(*) from api_webhookevent where org_id = $1",
Args: []interface{}{testdata.Org1.ID},
Expand Down
85 changes: 50 additions & 35 deletions core/models/http_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type HTTPLogID null.Int
type HTTPLogType string

const (
// LogTypeWebhookCalled is our type for when a flow calls a webhook
LogTypeWebhookCalled = "webhook_called"

// LogTypeIntentsSynced is our type for when we sync intents
LogTypeIntentsSynced = "intents_synced"

Expand All @@ -31,62 +34,72 @@ const (

// HTTPLog is our type for a HTTPLog
type HTTPLog struct {
h struct {
ID HTTPLogID `db:"id"`
LogType HTTPLogType `db:"log_type"`
ClassifierID ClassifierID `db:"classifier_id"`
TicketerID TicketerID `db:"ticketer_id"`
AirtimeTransferID AirtimeTransferID `db:"airtime_transfer_id"`
URL string `db:"url"`
Request string `db:"request"`
Response null.String `db:"response"`
IsError bool `db:"is_error"`
RequestTime int `db:"request_time"`
CreatedOn time.Time `db:"created_on"`
OrgID OrgID `db:"org_id"`
ID HTTPLogID `db:"id"`
OrgID OrgID `db:"org_id"`
LogType HTTPLogType `db:"log_type"`
URL string `db:"url"`
StatusCode int `db:"status_code"`
Request string `db:"request"`
Response null.String `db:"response"`
IsError bool `db:"is_error"`
RequestTime int `db:"request_time"`
NumRetries int `db:"num_retries"`
CreatedOn time.Time `db:"created_on"`
FlowID FlowID `db:"flow_id"`
ClassifierID ClassifierID `db:"classifier_id"`
TicketerID TicketerID `db:"ticketer_id"`
AirtimeTransferID AirtimeTransferID `db:"airtime_transfer_id"`
}

func newHTTPLog(orgID OrgID, logType HTTPLogType, url string, statusCode int, request, response string, isError bool, elapsed time.Duration, retries int, createdOn time.Time) *HTTPLog {
return &HTTPLog{
OrgID: orgID,
LogType: logType,
URL: url,
StatusCode: statusCode,
Request: request,
Response: null.String(response),
IsError: isError,
RequestTime: int(elapsed / time.Millisecond),
NumRetries: retries,
CreatedOn: createdOn,
}
}

func newHTTPLog(orgID OrgID, logType HTTPLogType, url string, request string, response string, isError bool, elapsed time.Duration, createdOn time.Time) *HTTPLog {
h := &HTTPLog{}
h.h.LogType = logType
h.h.OrgID = orgID
h.h.URL = url
h.h.Request = request
h.h.Response = null.String(response)
h.h.IsError = isError
h.h.RequestTime = int(elapsed / time.Millisecond)
h.h.CreatedOn = createdOn
// NewWebhookCalledLog creates a new HTTP log for an in-flow webhook call
func NewWebhookCalledLog(orgID OrgID, fid FlowID, url string, statusCode int, request, response string, isError bool, elapsed time.Duration, retries int, createdOn time.Time) *HTTPLog {
h := newHTTPLog(orgID, LogTypeWebhookCalled, url, statusCode, request, response, isError, elapsed, retries, createdOn)
h.FlowID = fid
return h
}

// NewClassifierCalledLog creates a new HTTP log for a classifier call
func NewClassifierCalledLog(orgID OrgID, cid ClassifierID, url string, request string, response string, isError bool, elapsed time.Duration, createdOn time.Time) *HTTPLog {
h := newHTTPLog(orgID, LogTypeClassifierCalled, url, request, response, isError, elapsed, createdOn)
h.h.ClassifierID = cid
func NewClassifierCalledLog(orgID OrgID, cid ClassifierID, url string, statusCode int, request, response string, isError bool, elapsed time.Duration, retries int, createdOn time.Time) *HTTPLog {
h := newHTTPLog(orgID, LogTypeClassifierCalled, url, statusCode, request, response, isError, elapsed, retries, createdOn)
h.ClassifierID = cid
return h
}

// NewTicketerCalledLog creates a new HTTP log for a ticketer call
func NewTicketerCalledLog(orgID OrgID, tid TicketerID, url string, request string, response string, isError bool, elapsed time.Duration, createdOn time.Time) *HTTPLog {
h := newHTTPLog(orgID, LogTypeTicketerCalled, url, request, response, isError, elapsed, createdOn)
h.h.TicketerID = tid
func NewTicketerCalledLog(orgID OrgID, tid TicketerID, url string, statusCode int, request, response string, isError bool, elapsed time.Duration, retries int, createdOn time.Time) *HTTPLog {
h := newHTTPLog(orgID, LogTypeTicketerCalled, url, statusCode, request, response, isError, elapsed, retries, createdOn)
h.TicketerID = tid
return h
}

// NewAirtimeTransferredLog creates a new HTTP log for an airtime transfer
func NewAirtimeTransferredLog(orgID OrgID, url string, request string, response string, isError bool, elapsed time.Duration, createdOn time.Time) *HTTPLog {
return newHTTPLog(orgID, LogTypeAirtimeTransferred, url, request, response, isError, elapsed, createdOn)
func NewAirtimeTransferredLog(orgID OrgID, url string, statusCode int, request, response string, isError bool, elapsed time.Duration, retries int, createdOn time.Time) *HTTPLog {
return newHTTPLog(orgID, LogTypeAirtimeTransferred, url, statusCode, request, response, isError, elapsed, retries, createdOn)
}

// SetAirtimeTransferID called to set the transfer ID on a log after the transfer has been created
func (h *HTTPLog) SetAirtimeTransferID(tid AirtimeTransferID) {
h.h.AirtimeTransferID = tid
h.AirtimeTransferID = tid
}

const insertHTTPLogsSQL = `
INSERT INTO request_logs_httplog( log_type, org_id, classifier_id, ticketer_id, airtime_transfer_id, url, request, response, is_error, request_time, created_on)
VALUES(:log_type, :org_id, :classifier_id, :ticketer_id, :airtime_transfer_id, :url, :request, :response, :is_error, :request_time, :created_on)
INSERT INTO request_logs_httplog( log_type, org_id, url, status_code, flow_id, classifier_id, ticketer_id, airtime_transfer_id, request, response, is_error, request_time, num_retries, created_on)
VALUES(:log_type, :org_id, :url, :status_code, :flow_id, :classifier_id, :ticketer_id, :airtime_transfer_id, :request, :response, :is_error, :request_time, :num_retries, :created_on)
RETURNING id
`

Expand All @@ -98,7 +111,7 @@ func InsertHTTPLogs(ctx context.Context, tx Queryer, logs []*HTTPLog) error {

ls := make([]interface{}, len(logs))
for i := range logs {
ls[i] = &logs[i].h
ls[i] = &logs[i]
}

return BulkQuery(ctx, "inserted http logs", tx, insertHTTPLogsSQL, ls)
Expand Down Expand Up @@ -136,10 +149,12 @@ func (h *HTTPLogger) Ticketer(t *Ticketer) flows.HTTPLogCallback {
t.OrgID(),
t.ID(),
l.URL,
l.StatusCode,
l.Request,
l.Response,
l.Status != flows.CallStatusSuccess,
time.Duration(l.ElapsedMS)*time.Millisecond,
l.Retries,
l.CreatedOn,
))
}
Expand Down
19 changes: 14 additions & 5 deletions core/models/http_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,28 @@ import (
func TestHTTPLogs(t *testing.T) {
ctx, _, db, _ := testsuite.Get()

// insert a log
log := models.NewClassifierCalledLog(testdata.Org1.ID, testdata.Wit.ID, "http://foo.bar", "GET /", "STATUS 200", false, time.Second, time.Now())
defer func() { db.MustExec(`DELETE FROM request_logs_httplog`) }()

// insert a classifier log
log := models.NewClassifierCalledLog(testdata.Org1.ID, testdata.Wit.ID, "http://foo.bar", 200, "GET /", "STATUS 200", false, time.Second, 0, time.Now())
err := models.InsertHTTPLogs(ctx, db, []*models.HTTPLog{log})
assert.Nil(t, err)

testsuite.AssertQuery(t, db, `SELECT count(*) from request_logs_httplog WHERE org_id = $1 AND classifier_id = $2 AND is_error = FALSE`, testdata.Org1.ID, testdata.Wit.ID).Returns(1)
testsuite.AssertQuery(t, db, `SELECT count(*) from request_logs_httplog WHERE org_id = $1 AND status_code = 200 AND classifier_id = $2 AND is_error = FALSE`, testdata.Org1.ID, testdata.Wit.ID).Returns(1)

// insert a log with nil response
log = models.NewClassifierCalledLog(testdata.Org1.ID, testdata.Wit.ID, "http://foo.bar", "GET /", "", true, time.Second, time.Now())
log = models.NewClassifierCalledLog(testdata.Org1.ID, testdata.Wit.ID, "http://foo.bar", 0, "GET /", "", true, time.Second, 0, time.Now())
err = models.InsertHTTPLogs(ctx, db, []*models.HTTPLog{log})
assert.Nil(t, err)

testsuite.AssertQuery(t, db, `SELECT count(*) from request_logs_httplog WHERE org_id = $1 AND status_code = 0 AND classifier_id = $2 AND is_error = TRUE AND response IS NULL`, testdata.Org1.ID, testdata.Wit.ID).Returns(1)

// insert a webhook log
log = models.NewWebhookCalledLog(testdata.Org1.ID, testdata.Favorites.ID, "http://foo.bar", 400, "GET /", "HTTP 200", false, time.Second, 2, time.Now())
err = models.InsertHTTPLogs(ctx, db, []*models.HTTPLog{log})
assert.Nil(t, err)

testsuite.AssertQuery(t, db, `SELECT count(*) from request_logs_httplog WHERE org_id = $1 AND classifier_id = $2 AND is_error = TRUE AND response IS NULL`, testdata.Org1.ID, testdata.Wit.ID).Returns(1)
testsuite.AssertQuery(t, db, `SELECT count(*) from request_logs_httplog WHERE org_id = $1 AND status_code = 400 AND flow_id = $2 AND num_retries = 2`, testdata.Org1.ID, testdata.Favorites.ID).Returns(1)
}

func TestHTTPLogger(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions core/models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ type Session struct {

// we also keep around a reference to the wait (if any)
wait flows.ActivatedWait

findStep func(flows.StepUUID) (flows.FlowRun, flows.Step)
}

func (s *Session) ID() SessionID { return s.s.ID }
Expand Down Expand Up @@ -234,6 +236,11 @@ func (s *Session) Wait() flows.ActivatedWait {
return s.wait
}

// FindStep finds the run and step with the given UUID
func (s *Session) FindStep(uuid flows.StepUUID) (flows.FlowRun, flows.Step) {
return s.findStep(uuid)
}

// Timeout returns the amount of time after our last message sends that we should timeout
func (s *Session) Timeout() *time.Duration {
return s.timeout
Expand Down Expand Up @@ -384,6 +391,7 @@ func NewSession(ctx context.Context, tx *sqlx.Tx, org *OrgAssets, fs flows.Sessi

session.sprint = sprint
session.wait = fs.Wait()
session.findStep = fs.FindStep

// now build up our runs
for _, r := range fs.Runs() {
Expand Down
2 changes: 1 addition & 1 deletion core/models/tickets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestTickets(t *testing.T) {
testdata.Alexandria.ID,
testdata.Zendesk.ID,
"EX6677",
models.NilTopicID,
testdata.SupportTopic.ID,
"Where are my pants?",
testdata.Org2Admin.ID,
nil,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/mattn/go-sqlite3 v1.10.0 // indirect
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.14.0
github.com/nyaruka/goflow v0.135.0
github.com/nyaruka/goflow v0.136.1
github.com/nyaruka/librato v1.0.0
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0=
github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw=
github.com/nyaruka/gocommon v1.14.0 h1:sZ0rsKy52GAHfEB6H74thj0cf7VIW9uRYpuFBe1iJ9Q=
github.com/nyaruka/gocommon v1.14.0/go.mod h1:J0BvHSsj8gjMp0oPW+PEb4x25oStupkNpHm7Y5OfNPo=
github.com/nyaruka/goflow v0.135.0 h1:CPrhJh1e2hLKJu4q9ae3oHJUtWi53hnheLC0bC7FBZA=
github.com/nyaruka/goflow v0.135.0/go.mod h1:kqvs8GzFkLjogLqLmNJmpuvEHFvlKsALlDWgC25AtGk=
github.com/nyaruka/goflow v0.136.1 h1:0ReTkOEYDTkDocNeyrzqr0lWKk8gpdeKrfz/XBGnMsM=
github.com/nyaruka/goflow v0.136.1/go.mod h1:kqvs8GzFkLjogLqLmNJmpuvEHFvlKsALlDWgC25AtGk=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down
Binary file modified mailroom_test.dump
Binary file not shown.