Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

limits: limits implementation for loki #948

Merged
merged 4 commits into from
Sep 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion cmd/loki/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/grafana/loki/pkg/util/validation"
)

func init() {
Expand Down
98 changes: 86 additions & 12 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,34 @@ import (
"context"
"flag"
"net/http"
"sync"
"sync/atomic"
"time"

cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"

"github.com/go-kit/kit/log/level"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"golang.org/x/time/rate"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

const metricName = "logs"
const (
metricName = "logs"
bytesInMB = 1048576
)

var readinessProbeSuccess = []byte("Ready")
var (
Expand Down Expand Up @@ -55,10 +62,13 @@ var (
type Config struct {
// For testing.
factory func(addr string) (grpc_health_v1.HealthClient, error)

LimiterReloadPeriod time.Duration `yaml:"limiter_reload_period"`
}

// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.LimiterReloadPeriod, "distributor.limiter-reload-period", 5*time.Minute, "Period at which to reload user ingestion limits.")
}

// Distributor coordinates replicates and distribution of log streams.
Expand All @@ -68,6 +78,10 @@ type Distributor struct {
ring ring.ReadRing
overrides *validation.Overrides
pool *cortex_client.Pool

ingestLimitersMtx sync.RWMutex
ingestLimiters map[string]*rate.Limiter
quit chan struct{}
}

// New a distributor creates.
Expand All @@ -79,13 +93,44 @@ func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, overrides *val
}
}

return &Distributor{
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger),
}, nil
d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger),
ingestLimiters: map[string]*rate.Limiter{},
quit: make(chan struct{}),
}

go d.loop()

return &d, nil
}

func (d *Distributor) loop() {
if d.cfg.LimiterReloadPeriod == 0 {
return
}

ticker := time.NewTicker(d.cfg.LimiterReloadPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
d.ingestLimitersMtx.Lock()
d.ingestLimiters = make(map[string]*rate.Limiter, len(d.ingestLimiters))
d.ingestLimitersMtx.Unlock()

case <-d.quit:
return
}
}
}

func (d *Distributor) Stop() {
close(d.quit)
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
Expand Down Expand Up @@ -145,6 +190,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
streams := make([]streamTracker, 0, len(req.Streams))
keys := make([]uint32, 0, len(req.Streams))
var validationErr error
validatedSamplesSize := 0

for _, stream := range req.Streams {
if err := d.validateLabels(userID, stream.Labels); err != nil {
validationErr = err
Expand All @@ -153,13 +200,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

entries := make([]logproto.Entry, 0, len(stream.Entries))
for _, entry := range stream.Entries {
if err := validation.ValidateSample(d.overrides, userID, metricName, cortex_client.Sample{
if err := cortex_validation.ValidateSample(d.overrides, userID, metricName, cortex_client.Sample{
TimestampMs: entry.Timestamp.UnixNano() / int64(time.Millisecond),
}); err != nil {
validationErr = err
continue
}
entries = append(entries, entry)
validatedSamplesSize += len(entry.Line)
}

if len(entries) == 0 {
Expand All @@ -176,6 +224,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

limiter := d.getOrCreateIngestLimiter(userID)
if !limiter.AllowN(time.Now(), validatedSamplesSize) {
// Return a 4xx here to have the client discard the data and not retry. If a client
// is sending too much data consistently we will unlikely ever catch up otherwise.
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%d) exceeded while adding %d lines", int(limiter.Limit()), lineCount)
}

replicationSets, err := d.ring.BatchGet(keys, ring.Write)
if err != nil {
return nil, err
Expand Down Expand Up @@ -226,7 +282,7 @@ func (d *Distributor) validateLabels(userID, labels string) error {
}

// everything in `ValidateLabels` returns `httpgrpc.Errorf` errors, no sugaring needed
return validation.ValidateLabels(d.overrides, userID, ls)
return cortex_validation.ValidateLabels(d.overrides, userID, ls)
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
Expand Down Expand Up @@ -287,3 +343,21 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.Ingester
func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter {
d.ingestLimitersMtx.RLock()
limiter, ok := d.ingestLimiters[userID]
d.ingestLimitersMtx.RUnlock()

if ok {
return limiter
}

limiter = rate.NewLimiter(rate.Limit(int64(d.overrides.IngestionRate(userID)*bytesInMB)), int(d.overrides.IngestionBurstSize(userID)*bytesInMB))

d.ingestLimitersMtx.Lock()
d.ingestLimiters[userID] = limiter
d.ingestLimitersMtx.Unlock()

return limiter
}
55 changes: 49 additions & 6 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,72 @@ package distributor
import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
)

const (
numIngesters = 5
ingestionRateLimit = 0.000096 // 100 Bytes/s limit
)

const numIngesters = 5
var (
success = &logproto.PushResponse{}
ctx = user.InjectOrgID(context.Background(), "test")
)

func TestDistributor(t *testing.T) {
for i, tc := range []struct {
samples int
expectedResponse *logproto.PushResponse
expectedError error
}{
{
samples: 10,
expectedResponse: success,
},
{
samples: 100,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (100) exceeded while adding 100 lines"),
},
} {
t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.samples), func(t *testing.T) {
d := prepare(t)

request := makeWriteRequest(tc.samples)
response, err := d.Push(ctx, request)
assert.Equal(t, tc.expectedResponse, response)
assert.Equal(t, tc.expectedError, err)
})
}
}

func prepare(t *testing.T) *Distributor {
var (
distributorConfig Config
defaultLimits validation.Limits
clientConfig client.Config
)
flagext.DefaultValues(&distributorConfig, &defaultLimits, &clientConfig)
defaultLimits.EnforceMetricName = false
defaultLimits.IngestionRate = ingestionRateLimit
defaultLimits.IngestionBurstSize = ingestionRateLimit

limits, err := validation.NewOverrides(defaultLimits)
require.NoError(t, err)
Expand All @@ -54,22 +94,25 @@ func TestDistributor(t *testing.T) {
d, err := New(distributorConfig, clientConfig, r, limits)
require.NoError(t, err)

return d
}

func makeWriteRequest(samples int) *logproto.PushRequest {
req := logproto.PushRequest{
Streams: []*logproto.Stream{
{
Labels: `{foo="bar"}`,
},
},
}
for i := 0; i < 10; i++ {

for i := 0; i < samples; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
}

_, err = d.Push(user.InjectOrgID(context.Background(), "test"), &req)
require.NoError(t, err)
return &req
}

type mockIngester struct {
Expand Down
Loading