Skip to content

Commit

Permalink
Logs PushRequest data. (#3178)
Browse files Browse the repository at this point in the history
* Logs PushRequest data.

This will allows to find information about received size and total entries per tenant.

Example of a log from my dev testing:

```
level=debug ts=2021-01-15T11:16:21.735663076Z caller=http.go:67 org_id=3927 traceID=11c4774c6ec4bbf4 msg="push request parsed" path=/loki/api/v1/push content-type=application/x-protobuf body-size="11 kB" streams=5 entries=298 streamLabelsSize="1.9 kB" entriesSize="45 kB" totalSize="47 kB"
```

Of course this means we can use LogQL on this.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Move metrics to avoid request traversal twice.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Jan 15, 2021
1 parent c883f02 commit 28c0b31
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 29 deletions.
23 changes: 0 additions & 23 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,6 @@ var (
Help: "The total number of failed batch appends sent to ingesters.",
}, []string{"ingester"})

bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant",
}, []string{"tenant"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_lines_received_total",
Help: "The total number of lines received per tenant",
}, []string{"tenant"})

maxLabelCacheSize = 100000
)

Expand Down Expand Up @@ -193,18 +182,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, err
}

// Track metrics.
bytesCount := 0
lineCount := 0
for _, stream := range req.Streams {
for _, entry := range stream.Entries {
bytesCount += len(entry.Line)
lineCount++
}
}
bytesIngested.WithLabelValues(userID).Add(float64(bytesCount))
linesIngested.WithLabelValues(userID).Add(float64(lineCount))

// First we flatten out the request into a list of samples.
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
Expand Down
68 changes: 62 additions & 6 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,41 @@ import (
"math"
"net/http"

"github.com/dustin/go-humanize"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/util"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/unmarshal"
unmarshal_legacy "github.com/grafana/loki/pkg/logql/unmarshal/legacy"
lokiutil "github.com/grafana/loki/pkg/util"
)

var contentType = http.CanonicalHeaderKey("Content-Type")
var (
contentType = http.CanonicalHeaderKey("Content-Type")

bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant",
}, []string{"tenant"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_lines_received_total",
Help: "The total number of lines received per tenant",
}, []string{"tenant"})
)

const applicationJSON = "application/json"

// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {

req, err := ParseRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -42,24 +60,62 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
}

func ParseRequest(r *http.Request) (*logproto.PushRequest, error) {
userID, _ := user.ExtractOrgID(r.Context())
logger := util.WithContext(r.Context(), util.Logger)
body := lokiutil.NewSizeReader(r.Body)
contentType := r.Header.Get(contentType)
var req logproto.PushRequest

switch r.Header.Get(contentType) {
defer func() {
var (
entriesSize int64
streamLabelsSize int64
totalEntries int64
)

for _, s := range req.Streams {
streamLabelsSize += int64(len(s.Labels))
for _, e := range s.Entries {
totalEntries++
entriesSize += int64(len(e.Line))
}
}

// incrementing tenant metrics if we have a tenant.
if totalEntries != 0 && userID != "" {
bytesIngested.WithLabelValues(userID).Add(float64(entriesSize))
linesIngested.WithLabelValues(userID).Add(float64(totalEntries))
}

level.Debug(logger).Log(
"msg", "push request parsed",
"path", r.URL.Path,
"contentType", contentType,
"bodySize", humanize.Bytes(uint64(body.Size())),
"streams", len(req.Streams),
"entries", totalEntries,
"streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)),
"entriesSize", humanize.Bytes(uint64(entriesSize)),
"totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)),
)
}()

switch contentType {
case applicationJSON:
var err error

if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
err = unmarshal.DecodePushRequest(r.Body, &req)
err = unmarshal.DecodePushRequest(body, &req)
} else {
err = unmarshal_legacy.DecodePushRequest(r.Body, &req)
err = unmarshal_legacy.DecodePushRequest(body, &req)
}

if err != nil {
return nil, err
}

default:
if err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {
if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {
return nil, err
}
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/util/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package util

import (
"io"
)

type sizeReader struct {
size int64
r io.Reader
}

type SizeReader interface {
io.Reader
Size() int64
}

// NewSizeReader returns an io.Reader that will have the number of bytes
// read from r available.
func NewSizeReader(r io.Reader) SizeReader {
return &sizeReader{r: r}
}

func (v *sizeReader) Read(p []byte) (int, error) {
n, err := v.r.Read(p)
v.size += int64(n)
return n, err
}

func (v *sizeReader) Size() int64 {
return v.size
}

0 comments on commit 28c0b31

Please sign in to comment.