Skip to content

Commit

Permalink
Merge pull request rapidpro#500 from nyaruka/webhooks_as_httplogs_1
Browse files Browse the repository at this point in the history
Add new fields to HTTPLog and save for webhook called events
  • Loading branch information
rowanseymour authored Sep 21, 2021
2 parents 80c4c90 + 3e3eab5 commit 805fd03
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 51 deletions.
2 changes: 2 additions & 0 deletions core/handlers/airtime_transferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ func handleAirtimeTransferred(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool,
transfer.AddLog(models.NewAirtimeTransferredLog(
oa.OrgID(),
httpLog.URL,
httpLog.StatusCode,
httpLog.Request,
httpLog.Response,
httpLog.Status != flows.CallStatusSuccess,
time.Duration(httpLog.ElapsedMS)*time.Millisecond,
httpLog.Retries,
httpLog.CreatedOn,
))
}
Expand Down
4 changes: 4 additions & 0 deletions core/handlers/service_called.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,25 @@ func handleServiceCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *m
oa.OrgID(),
classifier.ID(),
httpLog.URL,
httpLog.StatusCode,
httpLog.Request,
httpLog.Response,
httpLog.Status != flows.CallStatusSuccess,
time.Duration(httpLog.ElapsedMS)*time.Millisecond,
httpLog.Retries,
httpLog.CreatedOn,
)
} else if event.Service == "ticketer" {
log = models.NewTicketerCalledLog(
oa.OrgID(),
ticketer.ID(),
httpLog.URL,
httpLog.StatusCode,
httpLog.Request,
httpLog.Response,
httpLog.Status != flows.CallStatusSuccess,
time.Duration(httpLog.ElapsedMS)*time.Millisecond,
httpLog.Retries,
httpLog.CreatedOn,
)
}
Expand Down
21 changes: 18 additions & 3 deletions core/handlers/webhook_called.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,28 @@ func handleWebhookCalled(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *m
scene.AppendToEventPreCommitHook(hooks.UnsubscribeResthookHook, unsub)
}

// if this is a connection error, use that as our response
run, _ := scene.Session().FindStep(e.StepUUID())
flow, _ := oa.Flow(run.FlowReference().UUID)

// create an HTTP log
if flow != nil {
httpLog := models.NewWebhookCalledLog(
oa.OrgID(),
flow.(*models.Flow).ID(),
event.URL, event.StatusCode, event.Request, event.Response,
event.Status != flows.CallStatusSuccess,
time.Millisecond*time.Duration(event.ElapsedMS),
event.Retries,
event.CreatedOn(),
)
scene.AppendToEventPreCommitHook(hooks.InsertHTTPLogsHook, httpLog)
}

// TODO drop this once RP UI is switched to using HTTP logs
response := event.Response
if event.Status == flows.CallStatusConnectionError {
response = "connection error"
}

// create a result for this call
result := models.NewWebhookResult(
oa.OrgID(), scene.ContactID(),
event.URL, event.Request,
Expand Down
19 changes: 15 additions & 4 deletions core/handlers/webhook_called_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,16 @@ func TestWebhookCalled(t *testing.T) {
{
Actions: handlers.ContactActionMap{
testdata.Cathy: []flows.Action{
actions.NewCallResthook(handlers.NewActionUUID(), "foo", "foo"),
actions.NewCallResthook(handlers.NewActionUUID(), "foo", "foo"), // calls both subscribers
},
testdata.George: []flows.Action{
actions.NewCallResthook(handlers.NewActionUUID(), "foo", "foo"),
actions.NewCallResthook(handlers.NewActionUUID(), "foo", "foo"), // calls both subscribers
actions.NewCallWebhook(handlers.NewActionUUID(), "GET", "http://rapidpro.io/?unsub=1", nil, "", ""),
},
},
SQLAssertions: []handlers.SQLAssertion{
{
SQL: "select count(*) from api_resthooksubscriber where is_active = FALSE",
Args: nil,
Count: 1,
},
{
Expand All @@ -63,9 +62,16 @@ func TestWebhookCalled(t *testing.T) {
},
{
SQL: "select count(*) from api_resthooksubscriber where is_active = TRUE",
Args: nil,
Count: 2,
},
{
SQL: "select count(*) from request_logs_httplog where log_type = 'webhook_called' AND flow_id IS NOT NULL AND status_code = 200",
Count: 2,
},
{
SQL: "select count(*) from request_logs_httplog where log_type = 'webhook_called' AND flow_id IS NOT NULL AND status_code = 410",
Count: 3,
},
{
SQL: "select count(*) from api_webhookresult where contact_id = $1 AND status_code = 200",
Args: []interface{}{testdata.Cathy.ID},
Expand All @@ -81,6 +87,11 @@ func TestWebhookCalled(t *testing.T) {
Args: []interface{}{testdata.George.ID},
Count: 3,
},
{
SQL: "select count(*) from api_webhookresult where contact_id = $1",
Args: []interface{}{testdata.George.ID},
Count: 3,
},
{
SQL: "select count(*) from api_webhookevent where org_id = $1",
Args: []interface{}{testdata.Org1.ID},
Expand Down
85 changes: 50 additions & 35 deletions core/models/http_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type HTTPLogID null.Int
type HTTPLogType string

const (
// LogTypeWebhookCalled is our type for when a flow calls a webhook
LogTypeWebhookCalled = "webhook_called"

// LogTypeIntentsSynced is our type for when we sync intents
LogTypeIntentsSynced = "intents_synced"

Expand All @@ -31,62 +34,72 @@ const (

// HTTPLog is our type for a HTTPLog
type HTTPLog struct {
h struct {
ID HTTPLogID `db:"id"`
LogType HTTPLogType `db:"log_type"`
ClassifierID ClassifierID `db:"classifier_id"`
TicketerID TicketerID `db:"ticketer_id"`
AirtimeTransferID AirtimeTransferID `db:"airtime_transfer_id"`
URL string `db:"url"`
Request string `db:"request"`
Response null.String `db:"response"`
IsError bool `db:"is_error"`
RequestTime int `db:"request_time"`
CreatedOn time.Time `db:"created_on"`
OrgID OrgID `db:"org_id"`
ID HTTPLogID `db:"id"`
OrgID OrgID `db:"org_id"`
LogType HTTPLogType `db:"log_type"`
URL string `db:"url"`
StatusCode int `db:"status_code"`
Request string `db:"request"`
Response null.String `db:"response"`
IsError bool `db:"is_error"`
RequestTime int `db:"request_time"`
NumRetries int `db:"num_retries"`
CreatedOn time.Time `db:"created_on"`
FlowID FlowID `db:"flow_id"`
ClassifierID ClassifierID `db:"classifier_id"`
TicketerID TicketerID `db:"ticketer_id"`
AirtimeTransferID AirtimeTransferID `db:"airtime_transfer_id"`
}

func newHTTPLog(orgID OrgID, logType HTTPLogType, url string, statusCode int, request, response string, isError bool, elapsed time.Duration, retries int, createdOn time.Time) *HTTPLog {
return &HTTPLog{
OrgID: orgID,
LogType: logType,
URL: url,
StatusCode: statusCode,
Request: request,
Response: null.String(response),
IsError: isError,
RequestTime: int(elapsed / time.Millisecond),
NumRetries: retries,
CreatedOn: createdOn,
}
}

func newHTTPLog(orgID OrgID, logType HTTPLogType, url string, request string, response string, isError bool, elapsed time.Duration, createdOn time.Time) *HTTPLog {
h := &HTTPLog{}
h.h.LogType = logType
h.h.OrgID = orgID
h.h.URL = url
h.h.Request = request
h.h.Response = null.String(response)
h.h.IsError = isError
h.h.RequestTime = int(elapsed / time.Millisecond)
h.h.CreatedOn = createdOn
// NewWebhookCalledLog creates a new HTTP log for an in-flow webhook call
func NewWebhookCalledLog(orgID OrgID, fid FlowID, url string, statusCode int, request, response string, isError bool, elapsed time.Duration, retries int, createdOn time.Time) *HTTPLog {
h := newHTTPLog(orgID, LogTypeWebhookCalled, url, statusCode, request, response, isError, elapsed, retries, createdOn)
h.FlowID = fid
return h
}

// NewClassifierCalledLog creates a new HTTP log for a classifier call
func NewClassifierCalledLog(orgID OrgID, cid ClassifierID, url string, request string, response string, isError bool, elapsed time.Duration, createdOn time.Time) *HTTPLog {
h := newHTTPLog(orgID, LogTypeClassifierCalled, url, request, response, isError, elapsed, createdOn)
h.h.ClassifierID = cid
func NewClassifierCalledLog(orgID OrgID, cid ClassifierID, url string, statusCode int, request, response string, isError bool, elapsed time.Duration, retries int, createdOn time.Time) *HTTPLog {
h := newHTTPLog(orgID, LogTypeClassifierCalled, url, statusCode, request, response, isError, elapsed, retries, createdOn)
h.ClassifierID = cid
return h
}

// NewTicketerCalledLog creates a new HTTP log for a ticketer call
func NewTicketerCalledLog(orgID OrgID, tid TicketerID, url string, request string, response string, isError bool, elapsed time.Duration, createdOn time.Time) *HTTPLog {
h := newHTTPLog(orgID, LogTypeTicketerCalled, url, request, response, isError, elapsed, createdOn)
h.h.TicketerID = tid
func NewTicketerCalledLog(orgID OrgID, tid TicketerID, url string, statusCode int, request, response string, isError bool, elapsed time.Duration, retries int, createdOn time.Time) *HTTPLog {
h := newHTTPLog(orgID, LogTypeTicketerCalled, url, statusCode, request, response, isError, elapsed, retries, createdOn)
h.TicketerID = tid
return h
}

// NewAirtimeTransferredLog creates a new HTTP log for an airtime transfer
func NewAirtimeTransferredLog(orgID OrgID, url string, request string, response string, isError bool, elapsed time.Duration, createdOn time.Time) *HTTPLog {
return newHTTPLog(orgID, LogTypeAirtimeTransferred, url, request, response, isError, elapsed, createdOn)
func NewAirtimeTransferredLog(orgID OrgID, url string, statusCode int, request, response string, isError bool, elapsed time.Duration, retries int, createdOn time.Time) *HTTPLog {
return newHTTPLog(orgID, LogTypeAirtimeTransferred, url, statusCode, request, response, isError, elapsed, retries, createdOn)
}

// SetAirtimeTransferID called to set the transfer ID on a log after the transfer has been created
func (h *HTTPLog) SetAirtimeTransferID(tid AirtimeTransferID) {
h.h.AirtimeTransferID = tid
h.AirtimeTransferID = tid
}

const insertHTTPLogsSQL = `
INSERT INTO request_logs_httplog( log_type, org_id, classifier_id, ticketer_id, airtime_transfer_id, url, request, response, is_error, request_time, created_on)
VALUES(:log_type, :org_id, :classifier_id, :ticketer_id, :airtime_transfer_id, :url, :request, :response, :is_error, :request_time, :created_on)
INSERT INTO request_logs_httplog( log_type, org_id, url, status_code, flow_id, classifier_id, ticketer_id, airtime_transfer_id, request, response, is_error, request_time, num_retries, created_on)
VALUES(:log_type, :org_id, :url, :status_code, :flow_id, :classifier_id, :ticketer_id, :airtime_transfer_id, :request, :response, :is_error, :request_time, :num_retries, :created_on)
RETURNING id
`

Expand All @@ -98,7 +111,7 @@ func InsertHTTPLogs(ctx context.Context, tx Queryer, logs []*HTTPLog) error {

ls := make([]interface{}, len(logs))
for i := range logs {
ls[i] = &logs[i].h
ls[i] = &logs[i]
}

return BulkQuery(ctx, "inserted http logs", tx, insertHTTPLogsSQL, ls)
Expand Down Expand Up @@ -136,10 +149,12 @@ func (h *HTTPLogger) Ticketer(t *Ticketer) flows.HTTPLogCallback {
t.OrgID(),
t.ID(),
l.URL,
l.StatusCode,
l.Request,
l.Response,
l.Status != flows.CallStatusSuccess,
time.Duration(l.ElapsedMS)*time.Millisecond,
l.Retries,
l.CreatedOn,
))
}
Expand Down
19 changes: 14 additions & 5 deletions core/models/http_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,28 @@ import (
func TestHTTPLogs(t *testing.T) {
ctx, _, db, _ := testsuite.Get()

// insert a log
log := models.NewClassifierCalledLog(testdata.Org1.ID, testdata.Wit.ID, "http://foo.bar", "GET /", "STATUS 200", false, time.Second, time.Now())
defer func() { db.MustExec(`DELETE FROM request_logs_httplog`) }()

// insert a classifier log
log := models.NewClassifierCalledLog(testdata.Org1.ID, testdata.Wit.ID, "http://foo.bar", 200, "GET /", "STATUS 200", false, time.Second, 0, time.Now())
err := models.InsertHTTPLogs(ctx, db, []*models.HTTPLog{log})
assert.Nil(t, err)

testsuite.AssertQuery(t, db, `SELECT count(*) from request_logs_httplog WHERE org_id = $1 AND classifier_id = $2 AND is_error = FALSE`, testdata.Org1.ID, testdata.Wit.ID).Returns(1)
testsuite.AssertQuery(t, db, `SELECT count(*) from request_logs_httplog WHERE org_id = $1 AND status_code = 200 AND classifier_id = $2 AND is_error = FALSE`, testdata.Org1.ID, testdata.Wit.ID).Returns(1)

// insert a log with nil response
log = models.NewClassifierCalledLog(testdata.Org1.ID, testdata.Wit.ID, "http://foo.bar", "GET /", "", true, time.Second, time.Now())
log = models.NewClassifierCalledLog(testdata.Org1.ID, testdata.Wit.ID, "http://foo.bar", 0, "GET /", "", true, time.Second, 0, time.Now())
err = models.InsertHTTPLogs(ctx, db, []*models.HTTPLog{log})
assert.Nil(t, err)

testsuite.AssertQuery(t, db, `SELECT count(*) from request_logs_httplog WHERE org_id = $1 AND status_code = 0 AND classifier_id = $2 AND is_error = TRUE AND response IS NULL`, testdata.Org1.ID, testdata.Wit.ID).Returns(1)

// insert a webhook log
log = models.NewWebhookCalledLog(testdata.Org1.ID, testdata.Favorites.ID, "http://foo.bar", 400, "GET /", "HTTP 200", false, time.Second, 2, time.Now())
err = models.InsertHTTPLogs(ctx, db, []*models.HTTPLog{log})
assert.Nil(t, err)

testsuite.AssertQuery(t, db, `SELECT count(*) from request_logs_httplog WHERE org_id = $1 AND classifier_id = $2 AND is_error = TRUE AND response IS NULL`, testdata.Org1.ID, testdata.Wit.ID).Returns(1)
testsuite.AssertQuery(t, db, `SELECT count(*) from request_logs_httplog WHERE org_id = $1 AND status_code = 400 AND flow_id = $2 AND num_retries = 2`, testdata.Org1.ID, testdata.Favorites.ID).Returns(1)
}

func TestHTTPLogger(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions core/models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ type Session struct {

// we also keep around a reference to the wait (if any)
wait flows.ActivatedWait

findStep func(flows.StepUUID) (flows.FlowRun, flows.Step)
}

func (s *Session) ID() SessionID { return s.s.ID }
Expand Down Expand Up @@ -234,6 +236,11 @@ func (s *Session) Wait() flows.ActivatedWait {
return s.wait
}

// FindStep finds the run and step with the given UUID
func (s *Session) FindStep(uuid flows.StepUUID) (flows.FlowRun, flows.Step) {
return s.findStep(uuid)
}

// Timeout returns the amount of time after our last message sends that we should timeout
func (s *Session) Timeout() *time.Duration {
return s.timeout
Expand Down Expand Up @@ -384,6 +391,7 @@ func NewSession(ctx context.Context, tx *sqlx.Tx, org *OrgAssets, fs flows.Sessi

session.sprint = sprint
session.wait = fs.Wait()
session.findStep = fs.FindStep

// now build up our runs
for _, r := range fs.Runs() {
Expand Down
2 changes: 1 addition & 1 deletion core/models/tickets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestTickets(t *testing.T) {
testdata.Alexandria.ID,
testdata.Zendesk.ID,
"EX6677",
models.NilTopicID,
testdata.SupportTopic.ID,
"Where are my pants?",
testdata.Org2Admin.ID,
nil,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/mattn/go-sqlite3 v1.10.0 // indirect
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.14.0
github.com/nyaruka/goflow v0.135.0
github.com/nyaruka/goflow v0.136.1
github.com/nyaruka/librato v1.0.0
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0=
github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw=
github.com/nyaruka/gocommon v1.14.0 h1:sZ0rsKy52GAHfEB6H74thj0cf7VIW9uRYpuFBe1iJ9Q=
github.com/nyaruka/gocommon v1.14.0/go.mod h1:J0BvHSsj8gjMp0oPW+PEb4x25oStupkNpHm7Y5OfNPo=
github.com/nyaruka/goflow v0.135.0 h1:CPrhJh1e2hLKJu4q9ae3oHJUtWi53hnheLC0bC7FBZA=
github.com/nyaruka/goflow v0.135.0/go.mod h1:kqvs8GzFkLjogLqLmNJmpuvEHFvlKsALlDWgC25AtGk=
github.com/nyaruka/goflow v0.136.1 h1:0ReTkOEYDTkDocNeyrzqr0lWKk8gpdeKrfz/XBGnMsM=
github.com/nyaruka/goflow v0.136.1/go.mod h1:kqvs8GzFkLjogLqLmNJmpuvEHFvlKsALlDWgC25AtGk=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down
Binary file modified mailroom_test.dump
Binary file not shown.

0 comments on commit 805fd03

Please sign in to comment.