Skip to content

Commit

Permalink
Improve error message for stream rate limit. (#4207)
Browse files Browse the repository at this point in the history
* Improve error message for stream rate limit.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Move stream rate limit error message to validation package.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Use ByteSize for number values in stream rate limit error so we can
print human readable rate limits and log line byte lengths.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix lint issues.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Don't need to type assert twice.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Compare errors contents instead of doing error equals comparison
directly since the rate limit error is an RPC error.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Don't call Limit on hot path just to log the value in errors.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Make coment about rate limit clearer.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
  • Loading branch information
cstyan authored Sep 30, 2021
1 parent de0c469 commit 2516347
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 10 deletions.
16 changes: 11 additions & 5 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util/flagext"
"github.com/grafana/loki/pkg/validation"
)

Expand Down Expand Up @@ -49,8 +50,7 @@ var (
)

var (
ErrEntriesExist = errors.New("duplicate push - entries already exist")
ErrStreamRateLimit = errors.New("stream rate limit exceeded")
ErrEntriesExist = errors.New("duplicate push - entries already exist")
)

func init() {
Expand Down Expand Up @@ -218,6 +218,11 @@ func (s *stream) Push(
}
}()

// This call uses a mutex under the hood, cache the result since we're checking the limit
// on each entry in the push (hot path) and we only use this value when logging entries
// over the rate limit.
limit := s.limiter.lim.Limit()

// Don't fail on the first append error - if samples are sent out of order,
// we still want to append the later ones.
for i := range entries {
Expand All @@ -240,7 +245,7 @@ func (s *stream) Push(
// Check if this this should be rate limited.
now := time.Now()
if !s.limiter.AllowN(now, len(entries[i].Line)) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], ErrStreamRateLimit})
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[i].Line))}})
rateLimitedSamples++
rateLimitedBytes += len(entries[i].Line)
continue
Expand Down Expand Up @@ -318,14 +323,15 @@ func (s *stream) Push(

if len(failedEntriesWithError) > 0 {
lastEntryWithErr := failedEntriesWithError[len(failedEntriesWithError)-1]
if lastEntryWithErr.e != chunkenc.ErrOutOfOrder && lastEntryWithErr.e != ErrStreamRateLimit {
_, ok := lastEntryWithErr.e.(*validation.ErrStreamRateLimit)
if lastEntryWithErr.e != chunkenc.ErrOutOfOrder && !ok {
return bytesAdded, lastEntryWithErr.e
}
var statusCode int
if lastEntryWithErr.e == chunkenc.ErrOutOfOrder {
statusCode = http.StatusBadRequest
}
if lastEntryWithErr.e == ErrStreamRateLimit {
if ok {
statusCode = http.StatusTooManyRequests
}
// Return a http status 4xx request response with all failed entries.
Expand Down
11 changes: 6 additions & 5 deletions pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/flagext"
"github.com/grafana/loki/pkg/validation"
)

Expand Down Expand Up @@ -329,13 +330,13 @@ func TestPushRateLimit(t *testing.T) {
NilMetrics,
)

// Counter should be 2 now since the first line will be deduped.
_, err = s.Push(context.Background(), []logproto.Entry{
entries := []logproto.Entry{
{Timestamp: time.Unix(1, 0), Line: "aaaaaaaaaa"},
{Timestamp: time.Unix(1, 0), Line: "aaaaaaaaab"},
}, recordPool.GetRecord(), 0)
require.Contains(t, err.Error(), ErrStreamRateLimit.Error())
require.Contains(t, err.Error(), "total ignored: 1 out of 2")
}
// Counter should be 2 now since the first line will be deduped.
_, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0)
require.Contains(t, err.Error(), (&validation.ErrStreamRateLimit{RateLimit: l.PerStreamRateLimit, Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[1].Line))}).Error())
}

func iterEq(t *testing.T, exp []logproto.Entry, got iter.EntryIterator) {
Expand Down
17 changes: 17 additions & 0 deletions pkg/validation/validate.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package validation

import (
"fmt"

"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/util/flagext"
)

const (
Expand Down Expand Up @@ -47,6 +51,19 @@ const (
DuplicateLabelNamesErrorMsg = "stream '%s' has duplicate label name: '%s'"
)

type ErrStreamRateLimit struct {
RateLimit flagext.ByteSize
Labels string
Bytes flagext.ByteSize
}

func (e *ErrStreamRateLimit) Error() string {
return fmt.Sprintf("Per stream rate limit exceeded (limit: %s/sec) while attempting to ingest for stream '%s' totaling %s, consider splitting a stream via additional labels or contact your Loki administrator to see if the limt can be increased",
e.RateLimit.String(),
e.Labels,
e.Bytes.String())
}

// MutatedSamples is a metric of the total number of lines mutated, by reason.
var MutatedSamples = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down

0 comments on commit 2516347

Please sign in to comment.