Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new stream count limiter #13006

Merged
merged 5 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2974,6 +2974,11 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# CLI flag: -validation.discover-log-levels
[discover_log_levels: <boolean> | default = true]

# When true, the ingester takes into account only the streams that belongs to
# this instance according to the ring while applying the stream limit.
# CLI flag: -ingester.use-owned-stream-count
[use_owned_stream_count: <boolean> | default = false]

# Maximum number of active streams per user, per ingester. 0 to disable.
# CLI flag: -ingester.max-streams-per-user
[max_streams_per_user: <int> | default = 0]
Expand Down
75 changes: 45 additions & 30 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ type instance struct {
tailers map[uint32]*tailer
tailerMtx sync.RWMutex

limiter *Limiter
limiter *Limiter
streamCountLimiter *streamCountLimiter
ownedStreamsSvc *ownedStreamService

configs *runtime.TenantConfigs

wal WAL
Expand Down Expand Up @@ -147,21 +150,24 @@ func newInstance(
if err != nil {
return nil, err
}

streams := newStreamsMap()
ownedStreamsSvc := newOwnedStreamService(instanceID, limiter)
c := config.SchemaConfig{Configs: periodConfigs}
i := &instance{
cfg: cfg,
streams: newStreamsMap(),
streams: streams,
buf: make([]byte, 0, 1024),
index: invertedIndex,
instanceID: instanceID,

streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),

tailers: map[uint32]*tailer{},
limiter: limiter,
configs: configs,
tailers: map[uint32]*tailer{},
limiter: limiter,
streamCountLimiter: newStreamCountLimiter(instanceID, streams.Len, limiter, ownedStreamsSvc),
ownedStreamsSvc: ownedStreamsSvc,
configs: configs,

wal: wal,
metrics: metrics,
Expand Down Expand Up @@ -286,29 +292,11 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
}

if record != nil {
err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, i.streams.Len())
err = i.streamCountLimiter.AssertNewStreamAllowed(i.instanceID)
}

if err != nil {
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "failed to create stream, exceeded limit",
"org_id", i.instanceID,
"err", err,
"stream", pushReqStream.Labels,
)
}

validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
if i.customStreamsTracker != nil {
i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes))
}
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, labels, i.instanceID)
return i.onStreamCreationError(ctx, pushReqStream, err, labels)
}

fp := i.getHashForLabels(labels)
Expand All @@ -333,21 +321,47 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
i.metrics.recoveredStreamsTotal.Inc()
}

i.onStreamCreated(s)

return s, nil
}

func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logproto.Stream, err error, labels labels.Labels) (*stream, error) {
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "failed to create stream, exceeded limit",
"org_id", i.instanceID,
"err", err,
"stream", pushReqStream.Labels,
)
}

validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
if i.customStreamsTracker != nil {
i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes))
}
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, labels, i.instanceID)
}

func (i *instance) onStreamCreated(s *stream) {
memoryStreams.WithLabelValues(i.instanceID).Inc()
memoryStreamsLabelsBytes.Add(float64(len(s.labels.String())))
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(s)
streamsCountStats.Add(1)

i.ownedStreamsSvc.incOwnedStreamCount()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this go in the LogStreamCreation conditional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, we need to do it all the time... btw, even if UseOwnedStreamCount is set to false, because this option is in runtime config and can be turned on at any moment, the service should always know the count of the streams.

if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "successfully created stream",
"org_id", i.instanceID,
"stream", pushReqStream.Labels,
"stream", s.labels.String(),
)
}

return s, nil
}

func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*stream, error) {
Expand Down Expand Up @@ -407,6 +421,7 @@ func (i *instance) removeStream(s *stream) {
memoryStreams.WithLabelValues(i.instanceID).Dec()
memoryStreamsLabelsBytes.Sub(float64(len(s.labels.String())))
streamsCountStats.Add(-1)
i.ownedStreamsSvc.decOwnedStreamCount()
}
}

Expand Down
81 changes: 58 additions & 23 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type RingCount interface {

type Limits interface {
UnorderedWrites(userID string) bool
UseOwnedStreamCount(userID string) bool
MaxLocalStreamsPerUser(userID string) int
MaxGlobalStreamsPerUser(userID string) int
PerStreamRateLimit(userID string) validation.RateLimit
Expand Down Expand Up @@ -76,46 +77,39 @@ func (l *Limiter) UnorderedWrites(userID string) bool {
return l.limits.UnorderedWrites(userID)
}

// AssertMaxStreamsPerUser ensures limit has not been reached compared to the current
// number of streams in input and returns an error if so.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
// Until the limiter actually starts, all accesses are successful.
// This is used to disable limits while recovering from the WAL.
l.mtx.RLock()
defer l.mtx.RUnlock()
if l.disabled {
return nil
}

func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int) {
// Start by setting the local limit either from override or default
localLimit := l.limits.MaxLocalStreamsPerUser(userID)
localLimit = l.limits.MaxLocalStreamsPerUser(tenantID)

// We can assume that streams are evenly distributed across ingesters
// so we do convert the global limit into a local limit
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
adjustedGlobalLimit := l.convertGlobalToLocalLimit(globalLimit)
globalLimit = l.limits.MaxGlobalStreamsPerUser(tenantID)
adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit)

// Set the calculated limit to the lesser of the local limit or the new calculated global limit
calculatedLimit := l.minNonZero(localLimit, adjustedGlobalLimit)
calculatedLimit = l.minNonZero(localLimit, adjustedGlobalLimit)

// If both the local and global limits are disabled, we just
// use the largest int value
if calculatedLimit == 0 {
calculatedLimit = math.MaxInt32
}
return
}

if streams < calculatedLimit {
return nil
func (l *Limiter) minNonZero(first, second int) int {
if first == 0 || (second != 0 && first > second) {
return second
}

return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, userID, streams, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
return first
}

func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
if globalLimit == 0 {
return 0
}

// todo: change to healthyInstancesInZoneCount() once
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Given we don't need a super accurate count (ie. when the ingesters
// topology changes) and we prefer to always be in favor of the tenant,
// we can use a per-ingester limit equal to:
Expand All @@ -131,12 +125,53 @@ func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
return 0
}

func (l *Limiter) minNonZero(first, second int) int {
if first == 0 || (second != 0 && first > second) {
return second
type supplier[T any] func() T

type streamCountLimiter struct {
tenantID string
limiter *Limiter
defaultStreamCountSupplier supplier[int]
ownedStreamSvc *ownedStreamService
}

var noopFixedLimitSupplier = func() int {
return 0
}

func newStreamCountLimiter(tenantID string, defaultStreamCountSupplier supplier[int], limiter *Limiter, service *ownedStreamService) *streamCountLimiter {
return &streamCountLimiter{
tenantID: tenantID,
limiter: limiter,
defaultStreamCountSupplier: defaultStreamCountSupplier,
ownedStreamSvc: service,
}
}

return first
func (l *streamCountLimiter) AssertNewStreamAllowed(tenantID string) error {
streamCountSupplier, fixedLimitSupplier := l.getSuppliers(tenantID)
calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit := l.getCurrentLimit(tenantID, fixedLimitSupplier)
actualStreamsCount := streamCountSupplier()
if actualStreamsCount < calculatedLimit {
return nil
}

return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, tenantID, actualStreamsCount, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
}

func (l *streamCountLimiter) getCurrentLimit(tenantID string, fixedLimitSupplier supplier[int]) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int) {
calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit = l.limiter.GetStreamCountLimit(tenantID)
fixedLimit := fixedLimitSupplier()
if fixedLimit > calculatedLimit {
calculatedLimit = fixedLimit
}
return
}

func (l *streamCountLimiter) getSuppliers(tenant string) (streamCountSupplier, fixedLimitSupplier supplier[int]) {
if l.limiter.limits.UseOwnedStreamCount(tenant) {
return l.ownedStreamSvc.getOwnedStreamCount, l.ownedStreamSvc.getFixedLimit
}
return l.defaultStreamCountSupplier, noopFixedLimitSupplier
}

type RateLimiterStrategy interface {
Expand Down
47 changes: 45 additions & 2 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,23 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/time/rate"

"github.com/grafana/loki/v3/pkg/validation"
)

func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
tests := map[string]struct {
maxLocalStreamsPerUser int
maxGlobalStreamsPerUser int
ringReplicationFactor int
ringIngesterCount int
streams int
expected error
useOwnedStreamService bool
fixedLimit int32
ownedStreamCount int64
}{
"both local and global limit are disabled": {
maxLocalStreamsPerUser: 0,
Expand Down Expand Up @@ -94,6 +98,36 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300),
},
"actual limit must be used if it's greater than fixed limit": {
maxLocalStreamsPerUser: 500,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
useOwnedStreamService: true,
fixedLimit: 20,
ownedStreamCount: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300),
},
"fixed limit must be used if it's greater than actual limit": {
maxLocalStreamsPerUser: 500,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
useOwnedStreamService: true,
fixedLimit: 2000,
ownedStreamCount: 2001,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 2001, 2000, 500, 1000, 300),
},
"fixed limit must not be used if both limits are disabled": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 3,
ringIngesterCount: 10,
useOwnedStreamService: true,
fixedLimit: 2000,
ownedStreamCount: 2001,
expected: nil,
},
}

for testName, testData := range tests {
Expand All @@ -107,11 +141,20 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{
MaxLocalStreamsPerUser: testData.maxLocalStreamsPerUser,
MaxGlobalStreamsPerUser: testData.maxGlobalStreamsPerUser,
UseOwnedStreamCount: testData.useOwnedStreamService,
}, nil)
require.NoError(t, err)

ownedStreamSvc := &ownedStreamService{
fixedLimit: atomic.NewInt32(testData.fixedLimit),
ownedStreamCount: atomic.NewInt64(testData.ownedStreamCount),
}
limiter := NewLimiter(limits, NilMetrics, ring, testData.ringReplicationFactor)
actual := limiter.AssertMaxStreamsPerUser("test", testData.streams)
defaultCountSupplier := func() int {
return testData.streams
}
streamCountLimiter := newStreamCountLimiter("test", defaultCountSupplier, limiter, ownedStreamSvc)
actual := streamCountLimiter.AssertNewStreamAllowed("test")

assert.Equal(t, testData.expected, actual)
})
Expand Down
44 changes: 44 additions & 0 deletions pkg/ingester/owned_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package ingester

import "go.uber.org/atomic"

type ownedStreamService struct {
tenantID string
limiter *Limiter
fixedLimit *atomic.Int32

//todo: implement job to recalculate it
ownedStreamCount *atomic.Int64
}

func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService {
svc := &ownedStreamService{
tenantID: tenantID,
limiter: limiter,
ownedStreamCount: atomic.NewInt64(0),
fixedLimit: atomic.NewInt32(0),
}
svc.updateFixedLimit()
return svc
}

func (s *ownedStreamService) getOwnedStreamCount() int {
return int(s.ownedStreamCount.Load())
}

func (s *ownedStreamService) updateFixedLimit() {
limit, _, _, _ := s.limiter.GetStreamCountLimit(s.tenantID)
s.fixedLimit.Store(int32(limit))
}

func (s *ownedStreamService) getFixedLimit() int {
return int(s.fixedLimit.Load())
}

func (s *ownedStreamService) incOwnedStreamCount() {
s.ownedStreamCount.Inc()
}

func (s *ownedStreamService) decOwnedStreamCount() {
s.ownedStreamCount.Dec()
}
Loading
Loading