Disables the stream limiter until wal has recovered (#3114)
* disables the stream limiter until wal has recovered

* limiter disable/enable methods
owen-d authored Jan 5, 2021
1 parent 3edcda7 commit 5898755
Showing 3 changed files with 125 additions and 0 deletions.
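
In short, WAL replay now wraps itself in a disable/enable pair on the ingester's stream limiter, and the per-user stream limit check short-circuits while the limiter is disabled. A condensed sketch of the pattern (simplified, not the literal code — the real hunks follow below):

// Sketch only; the actual change is in pkg/ingester/ingester.go and pkg/ingester/limiter.go below.
func (i *Ingester) starting(ctx context.Context) error {
	if i.cfg.WAL.Recover {
		i.limiter.Disable()      // limit checks return nil from here on
		defer i.limiter.Enable() // limits apply again once starting() returns
		// ... replay WAL checkpoints and segments ...
	}
	return nil
}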
97 changes: 97 additions & 0 deletions pkg/ingester/checkpoint_test.go
@@ -129,6 +129,103 @@ func TestIngesterWAL(t *testing.T) {

}

func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
	walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
	require.Nil(t, err)
	defer os.RemoveAll(walDir)

	ingesterConfig := defaultIngesterTestConfig(t)
	ingesterConfig.MaxTransferRetries = 0
	ingesterConfig.WAL = WALConfig{
		Enabled:            true,
		Dir:                walDir,
		Recover:            true,
		CheckpointDuration: time.Second,
	}
	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
	require.NoError(t, err)

	newStore := func() *mockStore {
		return &mockStore{
			chunks: map[string][]chunk.Chunk{},
		}
	}

	i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
	require.NoError(t, err)
	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

	req := logproto.PushRequest{
		Streams: []logproto.Stream{
			{
				Labels: `{foo="bar",bar="baz1"}`,
			},
			{
				Labels: `{foo="bar",bar="baz2"}`,
			},
		},
	}

	start := time.Now()
	steps := 10
	end := start.Add(time.Second * time.Duration(steps))

	for i := 0; i < steps; i++ {
		req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
			Timestamp: start.Add(time.Duration(i) * time.Second),
			Line:      fmt.Sprintf("line %d", i),
		})
		req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
			Timestamp: start.Add(time.Duration(i) * time.Second),
			Line:      fmt.Sprintf("line %d", i),
		})
	}

	ctx := user.InjectOrgID(context.Background(), "test")
	_, err = i.Push(ctx, &req)
	require.NoError(t, err)

	ensureIngesterData(ctx, t, start, end, i)

	require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

	// Limit all streams except those written during WAL recovery.
	limitCfg := defaultLimitsTestConfig()
	limitCfg.MaxLocalStreamsPerUser = -1
	limits, err = validation.NewOverrides(limitCfg, nil)
	require.NoError(t, err)

	// restart the ingester
	i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
	require.NoError(t, err)
	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
	require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))

	// ensure we've recovered data from wal segments
	ensureIngesterData(ctx, t, start, end, i)

	req = logproto.PushRequest{
		Streams: []logproto.Stream{
			{
				Labels: `{foo="new"}`,
				Entries: []logproto.Entry{
					{
						Timestamp: start,
						Line:      "hi",
					},
				},
			},
		},
	}

	ctx = user.InjectOrgID(context.Background(), "test")
	_, err = i.Push(ctx, &req)
	// Ensure regular pushes error due to stream limits.
	require.Error(t, err)

}

func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) {
fs, err := ioutil.ReadDir(walDir)
require.Nil(t, err)
4 changes: 4 additions & 0 deletions pkg/ingester/ingester.go
@@ -210,6 +210,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid

func (i *Ingester) starting(ctx context.Context) error {
	if i.cfg.WAL.Recover {
		// Disable the in process stream limit checks while replaying the WAL
		i.limiter.Disable()
		defer i.limiter.Enable()

		recoverer := newIngesterRecoverer(i)
		defer recoverer.Close()

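One subtlety worth noting: deferred calls run in LIFO order, so when starting() returns, recoverer.Close() runs before i.limiter.Enable(), meaning limits only come back after the recoverer has fully wound down. A tiny standalone illustration of that ordering (assumed names, not Loki code):

package main

import "fmt"

func main() {
	fmt.Println("limiter.Disable()")
	defer fmt.Println("limiter.Enable()   // registered first, runs last")
	defer fmt.Println("recoverer.Close()  // registered second, runs first")
	fmt.Println("replaying WAL ...")
}

// Output:
// limiter.Disable()
// replaying WAL ...
// recoverer.Close()  // registered second, runs first
// limiter.Enable()   // registered first, runs last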
24 changes: 24 additions & 0 deletions pkg/ingester/limiter.go
@@ -3,6 +3,7 @@ package ingester
import (
"fmt"
"math"
"sync"

"github.com/grafana/loki/pkg/util/validation"
)
@@ -23,6 +24,21 @@ type Limiter struct {
	limits            *validation.Overrides
	ring              RingCount
	replicationFactor int

	mtx      sync.RWMutex
	disabled bool
}

func (l *Limiter) Disable() {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	l.disabled = true
}

func (l *Limiter) Enable() {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	l.disabled = false
}

// NewLimiter makes a new limiter
@@ -37,6 +53,14 @@ func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor
// AssertMaxStreamsPerUser ensures limit has not been reached compared to the current
// number of streams in input and returns an error if so.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
	// Until the limiter actually starts, all accesses are successful.
	// This is used to disable limits while recovering from the WAL.
	l.mtx.RLock()
	defer l.mtx.RUnlock()
	if l.disabled {
		return nil
	}

	// Start by setting the local limit either from override or default
	localLimit := l.limits.MaxLocalStreamsPerUser(userID)

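For reference, here is a minimal, self-contained sketch of the same disable/enable pattern (toy types, not Loki's), showing that the check passes unconditionally while disabled and enforces the limit again after Enable():

package main

import (
	"errors"
	"fmt"
	"sync"
)

// toyLimiter mirrors the pattern above: an RWMutex-guarded flag that
// short-circuits the per-user stream limit check while set.
type toyLimiter struct {
	mtx        sync.RWMutex
	disabled   bool
	maxStreams int
}

func (l *toyLimiter) Disable() {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	l.disabled = true
}

func (l *toyLimiter) Enable() {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	l.disabled = false
}

func (l *toyLimiter) AssertMaxStreams(streams int) error {
	l.mtx.RLock()
	defer l.mtx.RUnlock()
	if l.disabled {
		return nil // e.g. while replaying the WAL
	}
	if streams >= l.maxStreams {
		return errors.New("per-user streams limit exceeded")
	}
	return nil
}

func main() {
	l := &toyLimiter{maxStreams: 1}

	l.Disable()                          // before WAL replay
	fmt.Println(l.AssertMaxStreams(100)) // <nil>: replay ignores limits

	l.Enable()                           // replay finished
	fmt.Println(l.AssertMaxStreams(100)) // error: limit enforced again
}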
