Add missing bytes field to backend response log (#494)
* Add missing bytes field to backend response log

* (docs) remove unknown utf8 chars and reformat tables

* (docs) add response.bytes field to backend log documentation

* Add content-length fallback

* rm obsolete key

* more checks in test

* handle timeout cancel select case

* fmt

* Fix missing token request within custom log context

* Fix possible timer deadlock

golang/go#27169

* cancel while reading results

* fire collected backend logStack at the end

endpoint handler has too many exits, even with panic recovery

* Add changelog entry

Co-authored-by: Alex Schneider <alex.schneider@avenga.com>
Marcel Ludwig and Alex Schneider authored May 9, 2022
1 parent 27bce19 commit 502ef91
Showing 15 changed files with 398 additions and 115 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ Unreleased changes are available as `avenga/couper:edge` container.
* Log malformed duration settings ([#487](https://github.com/avenga/couper/pull/487))
* `url` attribute could make use of our wildcard pattern `/**` and relative urls in combination with a backend reference ([#480](https://github.com/avenga/couper/pull/480))
* Error handling for `backend`, `backend_openapi_validation` and `backend_timeout` error types ([#490](https://github.com/avenga/couper/pull/490))
* `response.bytes` log-field to backend logs if read from body, fallback is the `Content-Length` header ([#494](https://github.com/avenga/couper/pull/494))

* **Changed**
* Permission handling: ([#477](https://github.com/avenga/couper/pull/477))
1 change: 1 addition & 0 deletions config/request/context_key.go
@@ -6,6 +6,7 @@ const (
ContextType ContextKey = iota
APIName
AccessControls
BackendBytes
BackendName
BackendParams
BackendTokenRequest
175 changes: 88 additions & 87 deletions docs/LOGS.md

Large diffs are not rendered by default.

18 changes: 15 additions & 3 deletions eval/sync.go
@@ -5,8 +5,12 @@ import (
"sync"

"github.com/zclconf/go-cty/cty"

"github.com/avenga/couper/config/request"
)

const TokenRequestPrefix = "_tr_"

type SyncedVariables struct {
items sync.Map
}
@@ -20,11 +24,18 @@ func NewSyncedVariables() *SyncedVariables {
type syncPair struct {
name string
bereq, beresp cty.Value
tokenRequest bool
}

// Set finalized cty req/resp pair.
func (sv *SyncedVariables) Set(beresp *http.Response) {
name, bereqV, berespV := newBerespValues(beresp.Request.Context(), true, beresp)
ctx := beresp.Request.Context()
name, bereqV, berespV := newBerespValues(ctx, true, beresp)

if tr, ok := ctx.Value(request.TokenRequest).(string); ok && tr != "" {
name = TokenRequestPrefix + name
}

sv.items.Store(name, &syncPair{
name: name,
bereq: bereqV,
@@ -46,12 +57,13 @@ func (sv *SyncedVariables) Sync(variables map[string]cty.Value) {
if bereqs == nil {
bereqs = make(map[string]cty.Value)
}
bereqs[p.name] = p.bereq
name := key.(string)
bereqs[name] = p.bereq

if beresps == nil {
beresps = make(map[string]cty.Value)
}
beresps[p.name] = p.beresp
beresps[name] = p.beresp

return true
})
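A minimal, self-contained sketch of the key-naming convention introduced above; `tokenRequestPrefix` and `storeResult` are illustrative stand-ins, not Couper's API. Token-request results get a `_tr_`-prefixed key so they cannot shadow a regular backend request stored under the same name.

```go
package main

import (
	"fmt"
	"sync"
)

// Illustrative mirror of the naming scheme in eval/sync.go: token-request
// results are stored under a "_tr_"-prefixed key so they never collide with
// regular backend request names.
const tokenRequestPrefix = "_tr_"

func storeResult(items *sync.Map, name string, isTokenRequest bool, value interface{}) {
	if isTokenRequest {
		name = tokenRequestPrefix + name
	}
	items.Store(name, value)
}

func main() {
	var items sync.Map
	storeResult(&items, "default", false, "regular backend response")
	storeResult(&items, "default", true, "token request response")

	// Both entries survive side by side: "default" and "_tr_default".
	items.Range(func(key, value interface{}) bool {
		fmt.Printf("%v => %v\n", key, value)
		return true
	})
}
```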
6 changes: 4 additions & 2 deletions handler/endpoint.go
@@ -225,7 +225,7 @@ func (e *Endpoint) produce(req *http.Request) (producer.ResultMap, error) {
close(tripCh)

for resultCh := range tripCh {
e.readResults(resultCh, results)
e.readResults(req.Context(), resultCh, results)
}

var err error // TODO: prefer default resp err
@@ -240,10 +240,12 @@ func (e *Endpoint) produce(req *http.Request) (producer.ResultMap, error) {
return results, err
}

func (e *Endpoint) readResults(requestResults producer.Results, beresps producer.ResultMap) {
func (e *Endpoint) readResults(ctx context.Context, requestResults producer.Results, beresps producer.ResultMap) {
i := 0
for {
select {
case <-ctx.Done():
return
case r, more := <-requestResults:
if !more {
return
23 changes: 13 additions & 10 deletions handler/transport/backend.go
@@ -187,6 +187,7 @@ func (b *Backend) RoundTrip(req *http.Request) (*http.Response, error) {
}

if !eval.IsUpgradeResponse(req, beresp) {
beresp.Body = logging.NewBytesCountReader(beresp)
if err = setGzipReader(beresp); err != nil {
b.upstreamLog.LogEntry().WithContext(req.Context()).WithError(err).Error()
}
@@ -373,18 +374,20 @@ func (b *Backend) withTimeout(req *http.Request, conf *Config) <-chan error {
return
}

go func(done <-chan struct{}) {
go func(c context.Context, timeoutCh chan time.Time) {
ttfbTimer.Reset(conf.TTFBTimeout)
select {
case <-done:
if !ttfbTimer.Stop() {
<-ttfbTimer.C
case <-c.Done():
ttfbTimer.Stop()
select {
case <-ttfbTimer.C:
default:
}
return
case ttfbTimeout <- <-ttfbTimer.C:
case t := <-ttfbTimer.C:
// buffered, no select done required
timeoutCh <- t
}

}(req.Context().Done())
}(ctx, ttfbTimeout)
},
GotFirstResponseByte: func() {
if downstreamTrace != nil && downstreamTrace.GotFirstResponseByte != nil {
@@ -396,7 +399,7 @@ func (b *Backend) withTimeout(req *http.Request, conf *Config) <-chan error {

*req = *req.WithContext(httptrace.WithClientTrace(ctx, ctxTrace))

go func(cancelFn func(), c context.Context, ec chan error) {
go func(c context.Context, cancelFn func(), ec chan error) {
defer cancelFn()
deadline := make(<-chan time.Time)
if timeout > 0 {
Expand All @@ -415,7 +418,7 @@ func (b *Backend) withTimeout(req *http.Request, conf *Config) <-chan error {
case <-c.Done():
return
}
}(cancel, ctx, errCh)
}(ctx, cancel, errCh)
return errCh
}

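A minimal, runnable sketch of the reworked timer handling above, assuming a caller-provided buffered timeout channel (`watchTTFB` and its parameter names are illustrative, not Couper's API): the timer is stopped on context cancellation and its channel drained with a non-blocking select, which avoids the kind of deadlock referenced in golang/go#27169.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watchTTFB waits for either the first response byte (context cancellation by
// the caller) or the TTFB timeout, and never blocks on a timer that already fired.
func watchTTFB(ctx context.Context, ttfb time.Duration, timeoutCh chan<- time.Time) {
	timer := time.NewTimer(ttfb)
	select {
	case <-ctx.Done():
		timer.Stop()
		// Drain without blocking: the timer may have fired concurrently.
		select {
		case <-timer.C:
		default:
		}
	case t := <-timer.C:
		// timeoutCh is buffered by the caller, so this send cannot block.
		timeoutCh <- t
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	timeoutCh := make(chan time.Time, 1) // buffered, matching the comment above
	go watchTTFB(ctx, 50*time.Millisecond, timeoutCh)

	select {
	case t := <-timeoutCh:
		fmt.Println("TTFB timeout fired at", t)
	case <-time.After(100 * time.Millisecond):
		fmt.Println("request finished before the TTFB timeout")
	}
}
```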
40 changes: 40 additions & 0 deletions logging/bytes_reader.go
@@ -0,0 +1,40 @@
package logging

import (
"context"
"io"
"net/http"
"sync/atomic"

"github.com/avenga/couper/config/request"
)

var _ io.ReadCloser = &BytesCountReader{}

type BytesCountReader struct {
c context.Context
n int64
r io.ReadCloser
}

// NewBytesCountReader just counts the raw read bytes from given response body for logging purposes.
func NewBytesCountReader(beresp *http.Response) io.ReadCloser {
return &BytesCountReader{
c: beresp.Request.Context(),
r: beresp.Body,
}
}

func (b *BytesCountReader) Read(p []byte) (n int, err error) {
n, err = b.r.Read(p)
b.n += int64(n)
return n, err
}

func (b *BytesCountReader) Close() error {
bytesPtr, ok := b.c.Value(request.BackendBytes).(*int64)
if ok {
atomic.StoreInt64(bytesPtr, b.n)
}
return b.r.Close()
}
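
A self-contained sketch of how such a counting reader is wired; the local `countingReader` type and context key stand in for `BytesCountReader` and `request.BackendBytes`. The reader counts the raw bytes read from the backend response body and publishes the total through an `*int64` placed in the request context, where a log hook can later pick it up.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync/atomic"
)

// Illustrative stand-in for the context key; the real one lives in config/request.
type ctxKey int

const backendBytesKey ctxKey = 0

// countingReader counts raw bytes read from the body and stores the total in
// an *int64 taken from the context when the body is closed.
type countingReader struct {
	ctx context.Context
	n   int64
	r   io.ReadCloser
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.n += int64(n)
	return n, err
}

func (c *countingReader) Close() error {
	if ptr, ok := c.ctx.Value(backendBytesKey).(*int64); ok {
		atomic.StoreInt64(ptr, c.n)
	}
	return c.r.Close()
}

func main() {
	var berespBytes int64
	ctx := context.WithValue(context.Background(), backendBytesKey, &berespBytes)

	req := httptest.NewRequest(http.MethodGet, "http://example.com", nil).WithContext(ctx)
	beresp := &http.Response{Request: req, Body: io.NopCloser(strings.NewReader("hello couper"))}

	// Wrap the body, consume it, then close it to publish the count.
	beresp.Body = &countingReader{ctx: beresp.Request.Context(), r: beresp.Body}
	_, _ = io.Copy(io.Discard, beresp.Body)
	_ = beresp.Body.Close()

	fmt.Println("response bytes:", atomic.LoadInt64(&berespBytes)) // 12
}
```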
14 changes: 14 additions & 0 deletions logging/hooks/context.go
@@ -1,9 +1,12 @@
package hooks

import (
"sync/atomic"

"github.com/sirupsen/logrus"

"github.com/avenga/couper/config/request"
"github.com/avenga/couper/logging"
)

var _ logrus.Hook = &Context{}
@@ -21,5 +24,16 @@ func (c *Context) Fire(entry *logrus.Entry) error {
entry.Data["uid"] = uid
}
}

if field, ok := entry.Data["type"]; ok && field == beTypeField {
if bytes, i := entry.Context.Value(request.BackendBytes).(*int64); i {
response, r := entry.Data["response"].(logging.Fields)
b := atomic.LoadInt64(bytes)
if r && b > 0 {
response["bytes"] = b
}
}
}

return nil
}
19 changes: 17 additions & 2 deletions logging/hooks/custom_logs.go
@@ -102,9 +102,21 @@ func fireUpstream(entry *logrus.Entry) {
// syncedUpstreamContext prepares the local backend variable.
func syncedUpstreamContext(evalCtx *eval.Context, entry *logrus.Entry) *hcl.EvalContext {
ctx := evalCtx.HCLContextSync()

tr, _ := entry.Context.Value(request.TokenRequest).(string)
rtName, _ := entry.Context.Value(request.RoundTripName).(string)
isTr := tr != ""

if rtName == "" {
return ctx
}

if _, ok := ctx.Variables[eval.BackendRequests]; ok {
for k, v := range ctx.Variables[eval.BackendRequests].AsValueMap() {
if k == entry.Context.Value(request.RoundTripName) {
if isTr && k == eval.TokenRequestPrefix+rtName {
ctx.Variables[eval.BackendRequest] = v
break
} else if k == rtName {
ctx.Variables[eval.BackendRequest] = v
break
}
@@ -113,7 +125,10 @@ func syncedUpstreamContext(evalCtx *eval.Context, entry *logrus.Entry) *hcl.EvalContext {

if _, ok := ctx.Variables[eval.BackendResponses]; ok {
for k, v := range ctx.Variables[eval.BackendResponses].AsValueMap() {
if k == entry.Context.Value(request.RoundTripName) {
if isTr && k == eval.TokenRequestPrefix+rtName {
ctx.Variables[eval.BackendResponse] = v
break
} else if k == rtName {
ctx.Variables[eval.BackendResponse] = v
break
}
55 changes: 55 additions & 0 deletions logging/stack.go
@@ -0,0 +1,55 @@
package logging

import (
"context"
"sync"

"github.com/sirupsen/logrus"
)

type entry struct {
logEntry *logrus.Entry
}

func (e *entry) Level(lvl logrus.Level) {
e.logEntry.Level = lvl
}

type Level interface {
Level(level logrus.Level)
}

type Stack struct {
entries []*entry
mu sync.Mutex
}

const logStack = "logStack"

func NewStack(ctx context.Context) (context.Context, *Stack) {
s := &Stack{}
return context.WithValue(ctx, logStack, s), s
}

func (s *Stack) Push(e *logrus.Entry) Level {
s.mu.Lock()
defer s.mu.Unlock()

item := &entry{logEntry: e}
s.entries = append(s.entries, item)
return item
}

func (s *Stack) Fire() {
s.mu.Lock()
defer s.mu.Unlock()

for _, item := range s.entries {
item.logEntry.Log(item.logEntry.Level)
}
}

func FromContext(ctx context.Context) (*Stack, bool) {
s, exist := ctx.Value(logStack).(*Stack)
return s, exist
}
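
A short usage sketch, assuming the behaviour implied by this file and the commit message ("fire collected backend logStack at the end"): upstream log entries are pushed onto a per-request stack during the round trip and emitted once, at the endpoint handler's single exit point.

```go
package main

import (
	"context"

	"github.com/sirupsen/logrus"

	"github.com/avenga/couper/logging"
)

func main() {
	logger := logrus.New()

	// Collect upstream log entries on a per-request stack instead of writing
	// them immediately.
	ctx, stack := logging.NewStack(context.Background())

	// During the backend round trip: push the entry and decide its level later.
	lvl := stack.Push(logger.WithContext(ctx).WithField("type", "couper_backend"))
	lvl.Level(logrus.InfoLevel)

	// At the single exit point of the endpoint handler (even after a panic
	// recovery): emit everything that was collected.
	stack.Fire()
}
```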
30 changes: 25 additions & 5 deletions logging/upstream_log.go
@@ -74,8 +74,10 @@ func (u *UpstreamLog) RoundTrip(req *http.Request) (*http.Response, error) {

fields["request"] = requestFields

berespBytes := int64(0)
logCtxCh := make(chan hcl.Body, 10)
outctx := context.WithValue(req.Context(), request.LogCustomUpstream, logCtxCh)
outctx = context.WithValue(outctx, request.BackendBytes, &berespBytes)
oCtx, openAPIContext := validation.NewWithContext(outctx)
outreq := req.WithContext(httptrace.WithClientTrace(oCtx, clientTrace))

@@ -116,15 +118,20 @@ func (u *UpstreamLog) RoundTrip(req *http.Request) (*http.Response, error) {
if tr, ok := outreq.Context().Value(request.TokenRequest).(string); ok && tr != "" {
fields["token_request"] = tr

if retries, ok := outreq.Context().Value(request.TokenRequestRetries).(uint8); ok && retries > 0 {
if retries, exist := outreq.Context().Value(request.TokenRequestRetries).(uint8); exist && retries > 0 {
fields["token_request_retry"] = retries
}
}

fields["status"] = 0
if beresp != nil {
fields["status"] = beresp.StatusCode
cl := int64(0)
if beresp.ContentLength > 0 {
cl = beresp.ContentLength
}
responseFields := Fields{
"bytes": cl,
"headers": filterHeader(u.config.ResponseHeaders, beresp.Header),
"status": beresp.StatusCode,
}
@@ -146,16 +153,29 @@ func (u *UpstreamLog) RoundTrip(req *http.Request) (*http.Response, error) {
fields["timings"] = timingResults
//timings["ttlb"] = roundMS(rtDone.Sub(timeTTFB)) // TODO: depends on stream or buffer

entry := u.log.WithFields(logrus.Fields(fields)).WithContext(outreq.Context())
entry.Time = startTime
entry := u.log.
WithFields(logrus.Fields(fields)).
WithContext(outreq.Context()).
WithTime(startTime)

stack, stacked := FromContext(outreq.Context())

if err != nil {
if _, ok := err.(errors.GoError); !ok {
err = errors.Backend.With(err)
}
entry.WithError(err).Error()
entry = entry.WithError(err)
if stacked {
stack.Push(entry).Level(logrus.ErrorLevel)
} else {
entry.Error()
}
} else {
entry.Info()
if stacked {
stack.Push(entry).Level(logrus.InfoLevel)
} else {
entry.Info()
}
}

return beresp, err