From 28c0b310c75b1de44f0489199af604f6a48b9d48 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 15 Jan 2021 18:48:06 +0100 Subject: [PATCH] Logs PushRequest data. (#3178) * Logs PushRequest data. This allows finding information about the received size and total entries per tenant. Example of a log from my dev testing: ``` level=debug ts=2021-01-15T11:16:21.735663076Z caller=http.go:67 org_id=3927 traceID=11c4774c6ec4bbf4 msg="push request parsed" path=/loki/api/v1/push content-type=application/x-protobuf body-size="11 kB" streams=5 entries=298 streamLabelsSize="1.9 kB" entriesSize="45 kB" totalSize="47 kB" ``` Of course this means we can use LogQL on this. Signed-off-by: Cyril Tovena * Move metrics to avoid traversing the request twice. Signed-off-by: Cyril Tovena * Fixes tests. Signed-off-by: Cyril Tovena --- pkg/distributor/distributor.go | 23 ------------ pkg/distributor/http.go | 68 +++++++++++++++++++++++++++++++--- pkg/util/reader.go | 31 ++++++++++++++++ 3 files changed, 93 insertions(+), 29 deletions(-) create mode 100644 pkg/util/reader.go diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index bc73889e5ce6..43006cd939dc 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -42,17 +42,6 @@ var ( Help: "The total number of failed batch appends sent to ingesters.", }, []string{"ingester"}) - bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "loki", - Name: "distributor_bytes_received_total", - Help: "The total number of uncompressed bytes received per tenant", - }, []string{"tenant"}) - linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "loki", - Name: "distributor_lines_received_total", - Help: "The total number of lines received per tenant", - }, []string{"tenant"}) - maxLabelCacheSize = 100000 ) @@ -193,18 +182,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log return nil, err } - // Track metrics. 
- bytesCount := 0 - lineCount := 0 - for _, stream := range req.Streams { - for _, entry := range stream.Entries { - bytesCount += len(entry.Line) - lineCount++ - } - } - bytesIngested.WithLabelValues(userID).Add(float64(bytesCount)) - linesIngested.WithLabelValues(userID).Add(float64(lineCount)) - // First we flatten out the request into a list of samples. // We use the heuristic of 1 sample per TS to size the array. // We also work out the hash value at the same time. diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index b25e3961bf0f..8935bd657134 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -4,7 +4,12 @@ import ( "math" "net/http" + "github.com/dustin/go-humanize" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/util" @@ -12,15 +17,28 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/unmarshal" unmarshal_legacy "github.com/grafana/loki/pkg/logql/unmarshal/legacy" + lokiutil "github.com/grafana/loki/pkg/util" ) -var contentType = http.CanonicalHeaderKey("Content-Type") +var ( + contentType = http.CanonicalHeaderKey("Content-Type") + + bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", + Name: "distributor_bytes_received_total", + Help: "The total number of uncompressed bytes received per tenant", + }, []string{"tenant"}) + linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", + Name: "distributor_lines_received_total", + Help: "The total number of lines received per tenant", + }, []string{"tenant"}) +) const applicationJSON = "application/json" // PushHandler reads a snappy-compressed proto from the HTTP body. 
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { - req, err := ParseRequest(r) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -42,16 +60,54 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { } func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { + userID, _ := user.ExtractOrgID(r.Context()) + logger := util.WithContext(r.Context(), util.Logger) + body := lokiutil.NewSizeReader(r.Body) + contentType := r.Header.Get(contentType) var req logproto.PushRequest - switch r.Header.Get(contentType) { + defer func() { + var ( + entriesSize int64 + streamLabelsSize int64 + totalEntries int64 + ) + + for _, s := range req.Streams { + streamLabelsSize += int64(len(s.Labels)) + for _, e := range s.Entries { + totalEntries++ + entriesSize += int64(len(e.Line)) + } + } + + // incrementing tenant metrics if we have a tenant. + if totalEntries != 0 && userID != "" { + bytesIngested.WithLabelValues(userID).Add(float64(entriesSize)) + linesIngested.WithLabelValues(userID).Add(float64(totalEntries)) + } + + level.Debug(logger).Log( + "msg", "push request parsed", + "path", r.URL.Path, + "contentType", contentType, + "bodySize", humanize.Bytes(uint64(body.Size())), + "streams", len(req.Streams), + "entries", totalEntries, + "streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)), + "entriesSize", humanize.Bytes(uint64(entriesSize)), + "totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)), + ) + }() + + switch contentType { case applicationJSON: var err error if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 { - err = unmarshal.DecodePushRequest(r.Body, &req) + err = unmarshal.DecodePushRequest(body, &req) } else { - err = unmarshal_legacy.DecodePushRequest(r.Body, &req) + err = unmarshal_legacy.DecodePushRequest(body, &req) } if err != nil { @@ -59,7 +115,7 @@ func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { } default: - if err := 
util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil { + if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil { return nil, err } } diff --git a/pkg/util/reader.go b/pkg/util/reader.go new file mode 100644 index 000000000000..47a5764143bd --- /dev/null +++ b/pkg/util/reader.go @@ -0,0 +1,31 @@ +package util + +import ( + "io" +) + +type sizeReader struct { + size int64 + r io.Reader +} + +type SizeReader interface { + io.Reader + Size() int64 +} + +// NewSizeReader returns an io.Reader that will have the number of bytes +// read from r available. +func NewSizeReader(r io.Reader) SizeReader { + return &sizeReader{r: r} +} + +func (v *sizeReader) Read(p []byte) (int, error) { + n, err := v.r.Read(p) + v.size += int64(n) + return n, err +} + +func (v *sizeReader) Size() int64 { + return v.size +}