Skip to content

Commit

Permalink
Improve distributors validation. (#3134)
Browse files Browse the repository at this point in the history
```bash
❯ benchcmp  before.txt after.txt
benchmark             old ns/op     new ns/op     delta
Benchmark_Push-16     18708519      1348876       -92.79%

benchmark             old allocs     new allocs     delta
Benchmark_Push-16     44             42             -4.55%

benchmark             old bytes     new bytes     delta
Benchmark_Push-16     4008391       2079          -99.95%
```

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Jan 7, 2021
1 parent c9b85b3 commit 82d96de
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 34 deletions.
19 changes: 11 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,27 +214,30 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedSamplesSize := 0
validatedSamplesCount := 0

validationContext := d.validator.getValidationContextFor(userID)

for _, stream := range req.Streams {
stream.Labels, err = d.parseStreamLabels(userID, stream.Labels, &stream)
stream.Labels, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
if err != nil {
validationErr = err
continue
}
entries := make([]logproto.Entry, 0, len(stream.Entries))
n := 0
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(userID, stream.Labels, entry); err != nil {
if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil {
validationErr = err
continue
}
entries = append(entries, entry)
stream.Entries[n] = entry
n++
validatedSamplesSize += len(entry.Line)
validatedSamplesCount++
}
stream.Entries = stream.Entries[:n]

if len(entries) == 0 {
if len(stream.Entries) == 0 {
continue
}
stream.Entries = entries
keys = append(keys, util.TokenFor(userID, stream.Labels))
streams = append(streams, streamTracker{
stream: stream,
Expand Down Expand Up @@ -358,7 +361,7 @@ func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckReques
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

func (d *Distributor) parseStreamLabels(userID string, key string, stream *logproto.Stream) (string, error) {
func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (string, error) {
labelVal, ok := d.labelCache.Get(key)
if ok {
return labelVal.(string), nil
Expand All @@ -368,7 +371,7 @@ func (d *Distributor) parseStreamLabels(userID string, key string, stream *logpr
return "", httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
}
// ensure labels are correctly sorted.
if err := d.validator.ValidateLabels(userID, ls, *stream); err != nil {
if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {
return "", err
}
lsVal := ls.String()
Expand Down
34 changes: 32 additions & 2 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package distributor
import (
"context"
"fmt"
"math"
"math/rand"
"net"
"net/http"
Expand Down Expand Up @@ -120,16 +121,45 @@ func Benchmark_SortLabelsOnPush(b *testing.B) {
d := prepare(&testing.T{}, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck
request := makeWriteRequest(10, 10)
vCtx := d.validator.getValidationContextFor("123")
for n := 0; n < b.N; n++ {
stream := request.Streams[0]
stream.Labels = `{buzz="f", a="b"}`
_, err := d.parseStreamLabels("123", stream.Labels, &stream)
_, err := d.parseStreamLabels(vCtx, stream.Labels, &stream)
if err != nil {
panic("parseStreamLabels fail,err:" + err.Error())
}
}
}

// Benchmark_Push measures a full Distributor.Push of one request built by
// makeWriteRequest(100000, 100). Rate, burst, cardinality and line-size limits
// are raised to math.MaxInt32 so they never reject anything, while old-sample
// rejection stays enabled with a 24h window so the timestamp checks are
// exercised for every entry.
func Benchmark_Push(b *testing.B) {
	cfg := &validation.Limits{}
	flagext.DefaultValues(cfg)

	// Limits that must not interfere with the measurement.
	cfg.IngestionRateMB = math.MaxInt32
	cfg.IngestionBurstSizeMB = math.MaxInt32
	cfg.CardinalityLimit = math.MaxInt32
	cfg.MaxLineSize = math.MaxInt32
	cfg.EnforceMetricName = false

	// Timestamp validation stays on.
	cfg.RejectOldSamples = true
	cfg.RejectOldSamplesMaxAge = 24 * time.Hour
	cfg.CreationGracePeriod = 24 * time.Hour

	ing := &mockIngester{}
	factory := func(string) (ring_client.PoolClient, error) { return ing, nil }
	dist := prepare(&testing.T{}, cfg, nil, factory)
	defer services.StopAndAwaitTerminated(context.Background(), dist) //nolint:errcheck

	req := makeWriteRequest(100000, 100)

	// Exclude the setup above from the reported numbers.
	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		// Only reach for the assertion helper on failure so the success path
		// measures Push alone.
		if _, err := dist.Push(ctx, req); err != nil {
			require.NoError(b, err)
		}
	}
}

func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
type testPush struct {
bytes int
Expand Down Expand Up @@ -305,7 +335,7 @@ func makeWriteRequest(lines int, size int) *logproto.PushRequest {
line = line[:size]

req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Timestamp: time.Now().Add(time.Duration(i) * time.Millisecond),
Line: line,
})
}
Expand Down
70 changes: 48 additions & 22 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,58 +24,84 @@ func NewValidator(l Limits) (*Validator, error) {
return &Validator{l}, nil
}

// validationContext is a snapshot of the per-tenant limits consulted while
// validating a push request. It is produced by getValidationContextFor so the
// limits are resolved once up front rather than on every entry or label set.
type validationContext struct {
// rejectOldSample turns on the "entry too old" check in ValidateEntry.
rejectOldSample bool
// rejectOldSampleMaxAge is an absolute cutoff in Unix nanoseconds (not a
// duration): the snapshot time minus the tenant's maximum sample age.
// Entries with an earlier timestamp are rejected when rejectOldSample is set.
rejectOldSampleMaxAge int64
// creationGracePeriod is an absolute cutoff in Unix nanoseconds (not a
// duration): the snapshot time plus the tenant's creation grace period.
// Entries with a later timestamp are rejected as too far in the future.
creationGracePeriod int64
// maxLineSize is the largest accepted len(entry.Line); 0 disables the check.
maxLineSize int

// maxLabelNamesPerSeries is the largest accepted number of labels in a
// stream's label set.
maxLabelNamesPerSeries int
// maxLabelNameLength is the largest accepted length of a single label name.
maxLabelNameLength int
// maxLabelValueLength is the largest accepted length of a single label value.
maxLabelValueLength int

// userID is the tenant these limits were resolved for; it is also used as
// a label value on the discarded-samples/bytes metrics.
userID string
}

// getValidationContextFor resolves the limits used by ValidateEntry and
// ValidateLabels for userID and returns them as a validationContext.
//
// The two timestamp bounds are stored as absolute Unix-nanosecond values
// computed from a single time.Now() reading taken here, so every entry checked
// against the returned context is compared with the same instant.
func (v Validator) getValidationContextFor(userID string) validationContext {
	ref := time.Now()

	var vc validationContext
	vc.userID = userID
	vc.rejectOldSample = v.RejectOldSamples(userID)

	// Oldest timestamp still accepted when old-sample rejection is enabled.
	oldest := ref.Add(-v.RejectOldSamplesMaxAge(userID))
	vc.rejectOldSampleMaxAge = oldest.UnixNano()

	// Newest timestamp still accepted.
	newest := ref.Add(v.CreationGracePeriod(userID))
	vc.creationGracePeriod = newest.UnixNano()

	vc.maxLineSize = v.MaxLineSize(userID)
	vc.maxLabelNamesPerSeries = v.MaxLabelNamesPerSeries(userID)
	vc.maxLabelNameLength = v.MaxLabelNameLength(userID)
	vc.maxLabelValueLength = v.MaxLabelValueLength(userID)

	return vc
}

// ValidateEntry returns an error if the entry is invalid
func (v Validator) ValidateEntry(userID string, labels string, entry logproto.Entry) error {
if v.RejectOldSamples(userID) && entry.Timestamp.UnixNano() < time.Now().Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano() {
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, userID).Add(float64(len(entry.Line)))
func (v Validator) ValidateEntry(ctx validationContext, labels string, entry logproto.Entry) error {
ts := entry.Timestamp.UnixNano()
if ctx.rejectOldSample && ts < ctx.rejectOldSampleMaxAge {
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg(labels, entry.Timestamp))
}

if entry.Timestamp.UnixNano() > time.Now().Add(v.CreationGracePeriod(userID)).UnixNano() {
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, userID).Add(float64(len(entry.Line)))
if ts > ctx.creationGracePeriod {
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg(labels, entry.Timestamp))
}

if maxSize := v.MaxLineSize(userID); maxSize != 0 && len(entry.Line) > maxSize {
if maxSize := ctx.maxLineSize; maxSize != 0 && len(entry.Line) > maxSize {
// I wish we didn't return httpgrpc errors here as it seems
// an orthogonal concept (we need not use ValidateLabels in this context)
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, userID).Add(float64(len(entry.Line)))
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(maxSize, len(entry.Line), labels))
}

return nil
}

// ValidateLabels returns an error if the labels are invalid
func (v Validator) ValidateLabels(userID string, ls labels.Labels, stream logproto.Stream) error {
func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream) error {
numLabelNames := len(ls)
if numLabelNames > v.MaxLabelNamesPerSeries(userID) {
validation.DiscardedSamples.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Inc()
if numLabelNames > ctx.maxLabelNamesPerSeries {
validation.DiscardedSamples.WithLabelValues(validation.MaxLabelNamesPerSeries, ctx.userID).Inc()
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Add(float64(bytes))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(stream.Labels, numLabelNames, v.MaxLabelNamesPerSeries(userID)))
validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, ctx.userID).Add(float64(bytes))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries))
}

maxLabelNameLength := v.MaxLabelNameLength(userID)
maxLabelValueLength := v.MaxLabelValueLength(userID)
lastLabelName := ""
for _, l := range ls {
if len(l.Name) > maxLabelNameLength {
updateMetrics(validation.LabelNameTooLong, userID, stream)
if len(l.Name) > ctx.maxLabelNameLength {
updateMetrics(validation.LabelNameTooLong, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg(stream.Labels, l.Name))
} else if len(l.Value) > maxLabelValueLength {
updateMetrics(validation.LabelValueTooLong, userID, stream)
} else if len(l.Value) > ctx.maxLabelValueLength {
updateMetrics(validation.LabelValueTooLong, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg(stream.Labels, l.Value))
} else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
updateMetrics(validation.DuplicateLabelNames, userID, stream)
updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg(stream.Labels, l.Name))
}
lastLabelName = l.Name
Expand Down
4 changes: 2 additions & 2 deletions pkg/distributor/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
v, err := NewValidator(o)
assert.NoError(t, err)

err = v.ValidateEntry(tt.userID, testStreamLabels, tt.entry)
err = v.ValidateEntry(v.getValidationContextFor(tt.userID), testStreamLabels, tt.entry)
assert.Equal(t, tt.expected, err)
})
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
v, err := NewValidator(o)
assert.NoError(t, err)

err = v.ValidateLabels(tt.userID, mustParseLabels(tt.labels), logproto.Stream{Labels: tt.labels})
err = v.ValidateLabels(v.getValidationContextFor(tt.userID), mustParseLabels(tt.labels), logproto.Stream{Labels: tt.labels})
assert.Equal(t, tt.expected, err)
})
}
Expand Down

0 comments on commit 82d96de

Please sign in to comment.