Skip to content

Commit

Permalink
topdown: Pass metrics to built-ins and record http.send latency
Browse files Browse the repository at this point in the history
Previously the built-in functions had no way to record metrics for
performance monitoring or other purposes. With this change, built-in
functions can manipulate the evaluation metrics. Initially, only the
http.send function has been updated to report latency.

Fixes open-policy-agent#2034

Signed-off-by: Torin Sandall <torinsandall@gmail.com>
  • Loading branch information
tsandall committed Aug 13, 2020
1 parent e8806c1 commit 1c90afa
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 20 deletions.
2 changes: 2 additions & 0 deletions topdown/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"

"github.com/open-policy-agent/opa/metrics"
"github.com/open-policy-agent/opa/topdown/cache"

"github.com/open-policy-agent/opa/ast"
Expand All @@ -32,6 +33,7 @@ type (
// built-in functions.
BuiltinContext struct {
Context context.Context // request context that was passed when query started
Metrics metrics.Metrics // metrics registry for recording built-in specific metrics
Seed io.Reader // randomization seed
Time *ast.Term // wall clock time
Cancel Cancel // atomic value that signals evaluation to halt
Expand Down
6 changes: 4 additions & 2 deletions topdown/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"strconv"
"strings"

"github.com/open-policy-agent/opa/topdown/cache"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/metrics"
"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/topdown/builtins"
"github.com/open-policy-agent/opa/topdown/cache"
"github.com/open-policy-agent/opa/topdown/copypropagation"
)

Expand All @@ -33,6 +33,7 @@ func (f *queryIDFactory) Next() uint64 {

type eval struct {
ctx context.Context
metrics metrics.Metrics
seed io.Reader
time *ast.Term
queryID uint64
Expand Down Expand Up @@ -621,6 +622,7 @@ func (e *eval) evalCall(terms []*ast.Term, iter unifyIterator) error {

bctx := BuiltinContext{
Context: e.ctx,
Metrics: e.metrics,
Seed: e.seed,
Time: e.time,
Cancel: e.cancel,
Expand Down
11 changes: 9 additions & 2 deletions topdown/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ var allowedKeyNames = [...]string{
"timeout",
"cache",
}
var allowedKeys = ast.NewSet()

var requiredKeys = ast.NewSet(ast.StringTerm("method"), ast.StringTerm("url"))
var (
allowedKeys = ast.NewSet()
requiredKeys = ast.NewSet(ast.StringTerm("method"), ast.StringTerm("url"))
httpSendLatencyMetricKey = "rego_builtin_" + ast.HTTPSend.Name
)

type httpSendKey string

Expand All @@ -64,6 +67,8 @@ const httpSendBuiltinCacheKey httpSendKey = "HTTP_SEND_CACHE_KEY"

func builtinHTTPSend(bctx BuiltinContext, args []*ast.Term, iter func(*ast.Term) error) error {

bctx.Metrics.Timer(httpSendLatencyMetricKey).Start()

req, err := validateHTTPRequestOperand(args[0], 1)
if err != nil {
return handleBuiltinErr(ast.HTTPSend.Name, bctx.Location, err)
Expand Down Expand Up @@ -93,6 +98,8 @@ func builtinHTTPSend(bctx BuiltinContext, args []*ast.Term, iter func(*ast.Term)
}
}

bctx.Metrics.Timer(httpSendLatencyMetricKey).Stop()

return iter(ast.NewTerm(resp))
}

Expand Down
24 changes: 24 additions & 0 deletions topdown/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package topdown

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand All @@ -22,6 +23,7 @@ import (
"time"

"github.com/open-policy-agent/opa/internal/version"
"github.com/open-policy-agent/opa/metrics"
"github.com/open-policy-agent/opa/topdown/builtins"

"github.com/open-policy-agent/opa/ast"
Expand Down Expand Up @@ -1841,6 +1843,28 @@ func TestHTTPSNoClientCerts(t *testing.T) {
})
}

func TestHTTPSendMetrics(t *testing.T) {

// run test server
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))

defer ts.Close()

// Execute query and verify http.send latency shows up in metrics registry.
m := metrics.New()
q := NewQuery(ast.MustParseBody(fmt.Sprintf(`http.send({"method": "get", "url": %q})`, ts.URL))).WithMetrics(m)
_, err := q.Run(context.Background())
if err != nil {
t.Fatal(err)
}

if m.Timer(httpSendLatencyMetricKey).Int64() == 0 {
t.Fatal("expected non-zero value for http.send latency metric")
}
}

var httpSendHelperRules = []string{
`clean_headers(resp) = cleaned {
cleaned = json.remove(resp, ["headers/date"])
Expand Down
28 changes: 12 additions & 16 deletions topdown/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,14 @@ func (q *Query) PartialRun(ctx context.Context) (partials []ast.Body, support []
if !q.time.IsZero() {
q.time = time.Now()
}
if q.metrics == nil {
q.metrics = metrics.New()
}
f := &queryIDFactory{}
b := newBindings(0, q.instr)
e := &eval{
ctx: ctx,
metrics: q.metrics,
seed: q.seed,
time: ast.NumberTerm(int64ToJSONNumber(q.time.UnixNano())),
cancel: q.cancel,
Expand Down Expand Up @@ -288,8 +292,8 @@ func (q *Query) PartialRun(ctx context.Context) (partials []ast.Body, support []
}

e.caller = e
q.startTimer(metrics.RegoPartialEval)
defer q.stopTimer(metrics.RegoPartialEval)
q.metrics.Timer(metrics.RegoPartialEval).Start()
defer q.metrics.Timer(metrics.RegoPartialEval).Stop()

livevars := ast.NewVarSet()

Expand Down Expand Up @@ -360,9 +364,13 @@ func (q *Query) Iter(ctx context.Context, iter func(QueryResult) error) error {
if q.time.IsZero() {
q.time = time.Now()
}
if q.metrics == nil {
q.metrics = metrics.New()
}
f := &queryIDFactory{}
e := &eval{
ctx: ctx,
metrics: q.metrics,
seed: q.seed,
time: ast.NumberTerm(int64ToJSONNumber(q.time.UnixNano())),
cancel: q.cancel,
Expand Down Expand Up @@ -391,7 +399,7 @@ func (q *Query) Iter(ctx context.Context, iter func(QueryResult) error) error {
indexing: q.indexing,
}
e.caller = e
q.startTimer(metrics.RegoQueryEval)
q.metrics.Timer(metrics.RegoQueryEval).Start()
err := e.Run(func(e *eval) error {
qr := QueryResult{}
e.bindings.Iter(nil, func(k, v *ast.Term) error {
Expand All @@ -400,18 +408,6 @@ func (q *Query) Iter(ctx context.Context, iter func(QueryResult) error) error {
})
return iter(qr)
})
q.stopTimer(metrics.RegoQueryEval)
q.metrics.Timer(metrics.RegoQueryEval).Stop()
return err
}

func (q *Query) startTimer(name string) {
if q.metrics != nil {
q.metrics.Timer(name).Start()
}
}

func (q *Query) stopTimer(name string) {
if q.metrics != nil {
q.metrics.Timer(name).Stop()
}
}

0 comments on commit 1c90afa

Please sign in to comment.