From 58987553a1da705b90c7d2562bfb1fb081f2c10a Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 5 Jan 2021 05:50:07 -0500 Subject: [PATCH] Disables the stream limiter until wal has recovered (#3114) * disables the stream limiter until wal has recovered * limiter disable/enable methods --- pkg/ingester/checkpoint_test.go | 97 +++++++++++++++++++++++++++++++++ pkg/ingester/ingester.go | 4 ++ pkg/ingester/limiter.go | 24 ++++++++ 3 files changed, 125 insertions(+) diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 85a99b147b81..24509c708825 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -129,6 +129,103 @@ func TestIngesterWAL(t *testing.T) { } +func TestIngesterWALIgnoresStreamLimits(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") + require.Nil(t, err) + defer os.RemoveAll(walDir) + + ingesterConfig := defaultIngesterTestConfig(t) + ingesterConfig.MaxTransferRetries = 0 + ingesterConfig.WAL = WALConfig{ + Enabled: true, + Dir: walDir, + Recover: true, + CheckpointDuration: time.Second, + } + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + newStore := func() *mockStore { + return &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + } + + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar",bar="baz1"}`, + }, + { + Labels: `{foo="bar",bar="baz2"}`, + }, + }, + } + + start := time.Now() + steps := 10 + end := start.Add(time.Second * time.Duration(steps)) + + for i := 0; i < steps; i++ { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: start.Add(time.Duration(i) * time.Second), + Line: fmt.Sprintf("line %d", i), + }) + req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{ + Timestamp: start.Add(time.Duration(i) * time.Second), + Line: fmt.Sprintf("line %d", i), + }) + } + + ctx := user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + ensureIngesterData(ctx, t, start, end, i) + + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // Limit all streams except those written during WAL recovery. + limitCfg := defaultLimitsTestConfig() + limitCfg.MaxLocalStreamsPerUser = -1 + limits, err = validation.NewOverrides(limitCfg, nil) + require.NoError(t, err) + + // restart the ingester + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + + // ensure we've recovered data from wal segments + ensureIngesterData(ctx, t, start, end, i) + + req = logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="new"}`, + Entries: []logproto.Entry{ + { + Timestamp: start, + Line: "hi", + }, + }, + }, + }, + } + + ctx = user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, &req) + // Ensure regular pushes error due to stream limits. + require.Error(t, err) + +} + func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) { fs, err := ioutil.ReadDir(walDir) require.Nil(t, err) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index bd8b7cea1bd7..be635014373d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -210,6 +210,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid func (i *Ingester) starting(ctx context.Context) error { if i.cfg.WAL.Recover { + // Disable the in process stream limit checks while replaying the WAL + i.limiter.Disable() + defer i.limiter.Enable() + recoverer := newIngesterRecoverer(i) defer recoverer.Close() diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 382c1c0be70b..7bf56690e2f5 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -3,6 +3,7 @@ package ingester import ( "fmt" "math" + "sync" "github.com/grafana/loki/pkg/util/validation" ) @@ -23,6 +24,21 @@ type Limiter struct { limits *validation.Overrides ring RingCount replicationFactor int + + mtx sync.RWMutex + disabled bool +} + +func (l *Limiter) Disable() { + l.mtx.Lock() + defer l.mtx.Unlock() + l.disabled = true +} + +func (l *Limiter) Enable() { + l.mtx.Lock() + defer l.mtx.Unlock() + l.disabled = false } // NewLimiter makes a new limiter @@ -37,6 +53,14 @@ func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor // AssertMaxStreamsPerUser ensures limit has not been reached compared to the current // number of streams in input and returns an error if so. func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error { + // Until the limiter actually starts, all accesses are successful. + // This is used to disable limits while recovering from the WAL. + l.mtx.RLock() + defer l.mtx.RUnlock() + if l.disabled { + return nil + } + // Start by setting the local limit either from override or default localLimit := l.limits.MaxLocalStreamsPerUser(userID)