From d336f03557632bcd224508804f43f72d2047477f Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 4 Jan 2021 09:24:07 -0500 Subject: [PATCH 1/2] disables the stream limiter until wal has recovered --- pkg/ingester/checkpoint_test.go | 97 +++++++++++++++++++++++++++++++++ pkg/ingester/ingester.go | 3 + pkg/ingester/limiter.go | 19 +++++++ 3 files changed, 119 insertions(+) diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 85a99b147b81..24509c708825 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -129,6 +129,103 @@ func TestIngesterWAL(t *testing.T) { } +func TestIngesterWALIgnoresStreamLimits(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal") + require.Nil(t, err) + defer os.RemoveAll(walDir) + + ingesterConfig := defaultIngesterTestConfig(t) + ingesterConfig.MaxTransferRetries = 0 + ingesterConfig.WAL = WALConfig{ + Enabled: true, + Dir: walDir, + Recover: true, + CheckpointDuration: time.Second, + } + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + newStore := func() *mockStore { + return &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + } + + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar",bar="baz1"}`, + }, + { + Labels: `{foo="bar",bar="baz2"}`, + }, + }, + } + + start := time.Now() + steps := 10 + end := start.Add(time.Second * time.Duration(steps)) + + for i := 0; i < steps; i++ { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: start.Add(time.Duration(i) * time.Second), + Line: fmt.Sprintf("line %d", i), + }) + req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{ + Timestamp: start.Add(time.Duration(i) * time.Second), + Line: fmt.Sprintf("line %d", i), + }) + } + + ctx := user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + ensureIngesterData(ctx, t, start, end, i) + + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // Limit all streams except those written during WAL recovery. + limitCfg := defaultLimitsTestConfig() + limitCfg.MaxLocalStreamsPerUser = -1 + limits, err = validation.NewOverrides(limitCfg, nil) + require.NoError(t, err) + + // restart the ingester + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + + // ensure we've recovered data from wal segments + ensureIngesterData(ctx, t, start, end, i) + + req = logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="new"}`, + Entries: []logproto.Entry{ + { + Timestamp: start, + Line: "hi", + }, + }, + }, + }, + } + + ctx = user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, &req) + // Ensure regular pushes error due to stream limits. + require.Error(t, err) + +} + func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) { fs, err := ioutil.ReadDir(walDir) require.Nil(t, err) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index bd8b7cea1bd7..4faf6e16bfab 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -249,6 +249,9 @@ func (i *Ingester) starting(ctx context.Context) error { } + // Once the WAL has replayed, signal the stream limiter to start. + i.limiter.Begin() + i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) for j := 0; j < i.cfg.ConcurrentFlushes; j++ { i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength) diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 382c1c0be70b..d1a1c9d3ffbe 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -3,6 +3,7 @@ package ingester import ( "fmt" "math" + "sync" "github.com/grafana/loki/pkg/util/validation" ) @@ -23,6 +24,16 @@ type Limiter struct { limits *validation.Overrides ring RingCount replicationFactor int + + mtx sync.RWMutex + started bool +} + +// Begins Begin +func (l *Limiter) Begin() { + l.mtx.Lock() + defer l.mtx.Unlock() + l.started = true } // NewLimiter makes a new limiter @@ -37,6 +48,14 @@ func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor // AssertMaxStreamsPerUser ensures limit has not been reached compared to the current // number of streams in input and returns an error if so. func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error { + // Until the limiter actually starts, all accesses are successful. + // This is used to disable limits while recovering from the WAL. + l.mtx.RLock() + defer l.mtx.RUnlock() + if !l.started { + return nil + } + // Start by setting the local limit either from override or default localLimit := l.limits.MaxLocalStreamsPerUser(userID) From 31279d4e0541b8a222a9128cfff0f4909ade0400 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 4 Jan 2021 09:42:04 -0500 Subject: [PATCH 2/2] limiter disable/enable methods --- pkg/ingester/ingester.go | 7 ++++--- pkg/ingester/limiter.go | 17 +++++++++++------ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 4faf6e16bfab..be635014373d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -210,6 +210,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid func (i *Ingester) starting(ctx context.Context) error { if i.cfg.WAL.Recover { + // Disable the in process stream limit checks while replaying the WAL + i.limiter.Disable() + defer i.limiter.Enable() + recoverer := newIngesterRecoverer(i) defer recoverer.Close() @@ -249,9 +253,6 @@ func (i *Ingester) starting(ctx context.Context) error { } - // Once the WAL has replayed, signal the stream limiter to start. - i.limiter.Begin() - i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) for j := 0; j < i.cfg.ConcurrentFlushes; j++ { i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength) diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index d1a1c9d3ffbe..7bf56690e2f5 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -25,15 +25,20 @@ type Limiter struct { ring RingCount replicationFactor int - mtx sync.RWMutex - started bool + mtx sync.RWMutex + disabled bool } -// Begins Begin -func (l *Limiter) Begin() { +func (l *Limiter) Disable() { l.mtx.Lock() defer l.mtx.Unlock() - l.started = true + l.disabled = true +} + +func (l *Limiter) Enable() { + l.mtx.Lock() + defer l.mtx.Unlock() + l.disabled = false } // NewLimiter makes a new limiter @@ -52,7 +57,7 @@ func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error { // This is used to disable limits while recovering from the WAL. l.mtx.RLock() defer l.mtx.RUnlock() - if !l.started { + if l.disabled { return nil }