diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index acf11102be511..c0e4bdeeca4d2 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -2978,6 +2978,11 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v # CLI flag: -validation.discover-log-levels [discover_log_levels: | default = true] +# When true an ingester takes into account only the streams that it owns +# according to the ring while applying the stream limit. +# CLI flag: -ingester.use-owned-stream-count +[use_owned_stream_count: | default = false] + # Maximum number of active streams per user, per ingester. 0 to disable. # CLI flag: -ingester.max-streams-per-user [max_streams_per_user: | default = 0] diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index a4436b9d41915..7f1ec78601fff 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -104,7 +104,10 @@ type instance struct { tailers map[uint32]*tailer tailerMtx sync.RWMutex - limiter *Limiter + limiter *Limiter + streamCountLimiter *streamCountLimiter + ownedStreamsSvc *ownedStreamService + configs *runtime.TenantConfigs wal WAL @@ -147,11 +150,12 @@ func newInstance( if err != nil { return nil, err } - + streams := newStreamsMap() + ownedStreamsSvc := newOwnedStreamService(instanceID, limiter) c := config.SchemaConfig{Configs: periodConfigs} i := &instance{ cfg: cfg, - streams: newStreamsMap(), + streams: streams, buf: make([]byte, 0, 1024), index: invertedIndex, instanceID: instanceID, @@ -159,9 +163,11 @@ func newInstance( streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID), streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID), - tailers: map[uint32]*tailer{}, - limiter: limiter, - configs: configs, + tailers: map[uint32]*tailer{}, + limiter: limiter, + streamCountLimiter: newStreamCountLimiter(instanceID, streams.Len, limiter, ownedStreamsSvc), + ownedStreamsSvc: ownedStreamsSvc, + configs: configs, wal: wal, metrics: metrics, @@ -286,29 +292,11 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre } if record != nil { - err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, i.streams.Len()) + err = i.streamCountLimiter.AssertNewStreamAllowed(i.instanceID) } if err != nil { - if i.configs.LogStreamCreation(i.instanceID) { - level.Debug(util_log.Logger).Log( - "msg", "failed to create stream, exceeded limit", - "org_id", i.instanceID, - "err", err, - "stream", pushReqStream.Labels, - ) - } - - validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries))) - bytes := 0 - for _, e := range pushReqStream.Entries { - bytes += len(e.Line) - } - validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes)) - if i.customStreamsTracker != nil { - i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes)) - } - return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, labels, i.instanceID) + return i.onStreamCreationError(ctx, pushReqStream, err, labels) } fp := i.getHashForLabels(labels) @@ -333,21 +321,47 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre i.metrics.recoveredStreamsTotal.Inc() } + i.onStreamCreated(s) + + return s, nil +} + +func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logproto.Stream, err error, labels labels.Labels) (*stream, error) { + if i.configs.LogStreamCreation(i.instanceID) { + level.Debug(util_log.Logger).Log( + "msg", "failed to create stream, exceeded limit", + "org_id", i.instanceID, + "err", err, + "stream", pushReqStream.Labels, + ) + } + + validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries))) + bytes := 0 + for _, e := range pushReqStream.Entries { + bytes += len(e.Line) + } + validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes)) + if i.customStreamsTracker != nil { + i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes)) + } + return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, labels, i.instanceID) +} + +func (i *instance) onStreamCreated(s *stream) { memoryStreams.WithLabelValues(i.instanceID).Inc() memoryStreamsLabelsBytes.Add(float64(len(s.labels.String()))) i.streamsCreatedTotal.Inc() i.addTailersToNewStream(s) streamsCountStats.Add(1) - + i.ownedStreamsSvc.incOwnedStreamCount() if i.configs.LogStreamCreation(i.instanceID) { level.Debug(util_log.Logger).Log( "msg", "successfully created stream", "org_id", i.instanceID, - "stream", pushReqStream.Labels, + "stream", s.labels.String(), ) } - - return s, nil } func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*stream, error) { @@ -407,6 +421,7 @@ func (i *instance) removeStream(s *stream) { memoryStreams.WithLabelValues(i.instanceID).Dec() memoryStreamsLabelsBytes.Sub(float64(len(s.labels.String()))) streamsCountStats.Add(-1) + i.ownedStreamsSvc.decOwnedStreamCount() } } diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 94c77a30be7e3..daa1fe7aec8da 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -24,6 +24,7 @@ type RingCount interface { type Limits interface { UnorderedWrites(userID string) bool + UseOwnedStreamCount(userID string) bool MaxLocalStreamsPerUser(userID string) int MaxGlobalStreamsPerUser(userID string) int PerStreamRateLimit(userID string) validation.RateLimit @@ -76,46 +77,39 @@ func (l *Limiter) UnorderedWrites(userID string) bool { return l.limits.UnorderedWrites(userID) } -// AssertMaxStreamsPerUser ensures limit has not been reached compared to the current -// number of streams in input and returns an error if so. -func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error { - // Until the limiter actually starts, all accesses are successful. - // This is used to disable limits while recovering from the WAL. - l.mtx.RLock() - defer l.mtx.RUnlock() - if l.disabled { - return nil - } - +func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int) { // Start by setting the local limit either from override or default - localLimit := l.limits.MaxLocalStreamsPerUser(userID) + localLimit = l.limits.MaxLocalStreamsPerUser(tenantID) // We can assume that streams are evenly distributed across ingesters // so we do convert the global limit into a local limit - globalLimit := l.limits.MaxGlobalStreamsPerUser(userID) - adjustedGlobalLimit := l.convertGlobalToLocalLimit(globalLimit) + globalLimit = l.limits.MaxGlobalStreamsPerUser(tenantID) + adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit) // Set the calculated limit to the lesser of the local limit or the new calculated global limit - calculatedLimit := l.minNonZero(localLimit, adjustedGlobalLimit) + calculatedLimit = l.minNonZero(localLimit, adjustedGlobalLimit) // If both the local and global limits are disabled, we just // use the largest int value if calculatedLimit == 0 { calculatedLimit = math.MaxInt32 } + return +} - if streams < calculatedLimit { - return nil +func (l *Limiter) minNonZero(first, second int) int { + if first == 0 || (second != 0 && first > second) { + return second } - return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, userID, streams, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit) + return first } func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int { if globalLimit == 0 { return 0 } - + // todo: change to healthyInstancesInZoneCount() once // Given we don't need a super accurate count (ie. when the ingesters // topology changes) and we prefer to always be in favor of the tenant, // we can use a per-ingester limit equal to: @@ -131,12 +125,53 @@ func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int { return 0 } -func (l *Limiter) minNonZero(first, second int) int { - if first == 0 || (second != 0 && first > second) { - return second +type supplier[T any] func() T + +type streamCountLimiter struct { + tenantID string + limiter *Limiter + defaultStreamCountSupplier supplier[int] + ownedStreamSvc *ownedStreamService +} + +var noopFixedLimitSupplier = func() int { + return 0 +} + +func newStreamCountLimiter(tenantID string, defaultStreamCountSupplier supplier[int], limiter *Limiter, service *ownedStreamService) *streamCountLimiter { + return &streamCountLimiter{ + tenantID: tenantID, + limiter: limiter, + defaultStreamCountSupplier: defaultStreamCountSupplier, + ownedStreamSvc: service, } +} - return first +func (l *streamCountLimiter) AssertNewStreamAllowed(tenantID string) error { + streamCountSupplier, fixedLimitSupplier := l.getSuppliers(tenantID) + calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit := l.getCurrentLimit(tenantID, fixedLimitSupplier) + actualStreamsCount := streamCountSupplier() + if actualStreamsCount < calculatedLimit { + return nil + } + + return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, tenantID, actualStreamsCount, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit) +} + +func (l *streamCountLimiter) getCurrentLimit(tenantID string, fixedLimitSupplier supplier[int]) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int) { + calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit = l.limiter.GetStreamCountLimit(tenantID) + fixedLimit := fixedLimitSupplier() + if fixedLimit > calculatedLimit { + calculatedLimit = fixedLimit + } + return +} + +func (l *streamCountLimiter) getSuppliers(tenant string) (streamCountSupplier, fixedLimitSupplier supplier[int]) { + if l.limiter.limits.UseOwnedStreamCount(tenant) { + return l.ownedStreamSvc.getOwnedStreamCount, l.ownedStreamSvc.getFixedLimit + } + return l.defaultStreamCountSupplier, noopFixedLimitSupplier } type RateLimiterStrategy interface { diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index 6186e910663e0..9d4d3b3037c6f 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -8,12 +8,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "golang.org/x/time/rate" "github.com/grafana/loki/v3/pkg/validation" ) -func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) { +func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) { tests := map[string]struct { maxLocalStreamsPerUser int maxGlobalStreamsPerUser int @@ -21,6 +22,9 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) { ringIngesterCount int streams int expected error + useOwnedStreamService bool + fixedLimit int32 + ownedStreamCount int64 }{ "both local and global limit are disabled": { maxLocalStreamsPerUser: 0, @@ -94,6 +98,36 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) { streams: 3000, expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300), }, + "actual limit must be used if it's greater than fixed limit": { + maxLocalStreamsPerUser: 500, + maxGlobalStreamsPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + useOwnedStreamService: true, + fixedLimit: 20, + ownedStreamCount: 3000, + expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300), + }, + "fixed limit must be used if it's greater than actual limit": { + maxLocalStreamsPerUser: 500, + maxGlobalStreamsPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + useOwnedStreamService: true, + fixedLimit: 2000, + ownedStreamCount: 2001, + expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 2001, 2000, 500, 1000, 300), + }, + "fixed limit must not be used if both limits are disabled": { + maxLocalStreamsPerUser: 0, + maxGlobalStreamsPerUser: 0, + ringReplicationFactor: 3, + ringIngesterCount: 10, + useOwnedStreamService: true, + fixedLimit: 2000, + ownedStreamCount: 2001, + expected: nil, + }, } for testName, testData := range tests { @@ -107,11 +141,20 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) { limits, err := validation.NewOverrides(validation.Limits{ MaxLocalStreamsPerUser: testData.maxLocalStreamsPerUser, MaxGlobalStreamsPerUser: testData.maxGlobalStreamsPerUser, + UseOwnedStreamCount: testData.useOwnedStreamService, }, nil) require.NoError(t, err) + ownedStreamSvc := &ownedStreamService{ + fixedLimit: atomic.NewInt32(testData.fixedLimit), + ownedStreamCount: atomic.NewInt64(testData.ownedStreamCount), + } limiter := NewLimiter(limits, NilMetrics, ring, testData.ringReplicationFactor) - actual := limiter.AssertMaxStreamsPerUser("test", testData.streams) + defaultCountSupplier := func() int { + return testData.streams + } + streamCountLimiter := newStreamCountLimiter("test", defaultCountSupplier, limiter, ownedStreamSvc) + actual := streamCountLimiter.AssertNewStreamAllowed("test") assert.Equal(t, testData.expected, actual) }) diff --git a/pkg/ingester/owned_streams.go b/pkg/ingester/owned_streams.go new file mode 100644 index 0000000000000..01cb8235f9b1a --- /dev/null +++ b/pkg/ingester/owned_streams.go @@ -0,0 +1,44 @@ +package ingester + +import "go.uber.org/atomic" + +type ownedStreamService struct { + tenantID string + limiter *Limiter + fixedLimit *atomic.Int32 + + //todo: implement job to recalculate it + ownedStreamCount *atomic.Int64 +} + +func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService { + svc := &ownedStreamService{ + tenantID: tenantID, + limiter: limiter, + ownedStreamCount: atomic.NewInt64(0), + fixedLimit: atomic.NewInt32(0), + } + svc.updateFixedLimit() + return svc +} + +func (s *ownedStreamService) getOwnedStreamCount() int { + return int(s.ownedStreamCount.Load()) +} + +func (s *ownedStreamService) updateFixedLimit() { + limit, _, _, _ := s.limiter.GetStreamCountLimit(s.tenantID) + s.fixedLimit.Store(int32(limit)) +} + +func (s *ownedStreamService) getFixedLimit() int { + return int(s.fixedLimit.Load()) +} + +func (s *ownedStreamService) incOwnedStreamCount() { + s.ownedStreamCount.Inc() +} + +func (s *ownedStreamService) decOwnedStreamCount() { + s.ownedStreamCount.Dec() +} diff --git a/pkg/ingester/owned_streams_test.go b/pkg/ingester/owned_streams_test.go new file mode 100644 index 0000000000000..c7ddd9d87f29d --- /dev/null +++ b/pkg/ingester/owned_streams_test.go @@ -0,0 +1,36 @@ +package ingester + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/validation" +) + +func Test_OwnedStreamService(t *testing.T) { + limits, err := validation.NewOverrides(validation.Limits{ + MaxGlobalStreamsPerUser: 100, + }, nil) + require.NoError(t, err) + // Mock the ring + ring := &ringCountMock{count: 30} + limiter := NewLimiter(limits, NilMetrics, ring, 3) + + service := newOwnedStreamService("test", limiter) + require.Equal(t, 0, service.getOwnedStreamCount()) + require.Equal(t, 10, service.getFixedLimit(), "fixed limit must be initialised during the instantiation") + + limits.DefaultLimits().MaxGlobalStreamsPerUser = 1000 + require.Equal(t, 10, service.getFixedLimit(), "fixed list must not be changed until update is triggered") + + service.updateFixedLimit() + require.Equal(t, 100, service.getFixedLimit()) + + service.incOwnedStreamCount() + service.incOwnedStreamCount() + require.Equal(t, 2, service.getOwnedStreamCount()) + + service.decOwnedStreamCount() + require.Equal(t, 1, service.getOwnedStreamCount()) +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 148205b306c19..7fde990e43656 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -85,6 +85,7 @@ type Limits struct { DiscoverLogLevels bool `yaml:"discover_log_levels" json:"discover_log_levels"` // Ingester enforced limits. + UseOwnedStreamCount bool `yaml:"use_owned_stream_count" json:"use_owned_stream_count"` MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"` MaxGlobalStreamsPerUser int `yaml:"max_global_streams_per_user" json:"max_global_streams_per_user"` UnorderedWrites bool `yaml:"unordered_writes" json:"unordered_writes"` @@ -270,6 +271,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var(&l.CreationGracePeriod, "validation.create-grace-period", "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.") f.IntVar(&l.MaxEntriesLimitPerQuery, "validation.max-entries-limit", 5000, "Maximum number of log entries that will be returned for a query.") + f.BoolVar(&l.UseOwnedStreamCount, "ingester.use-owned-stream-count", false, "When true an ingester takes into account only the streams that it owns according to the ring while applying the stream limit.") f.IntVar(&l.MaxLocalStreamsPerUser, "ingester.max-streams-per-user", 0, "Maximum number of active streams per user, per ingester. 0 to disable.") f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 5000, "Maximum number of active streams per user, across the cluster. 0 to disable. When the global limit is enabled, each ingester is configured with a dynamic local limit based on the replication factor and the current number of healthy ingesters, and is kept updated whenever the number of ingesters change.") @@ -588,6 +590,10 @@ func (o *Overrides) CreationGracePeriod(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).CreationGracePeriod) } +func (o *Overrides) UseOwnedStreamCount(userID string) bool { + return o.getOverridesForUser(userID).UseOwnedStreamCount +} + // MaxLocalStreamsPerUser returns the maximum number of streams a user is allowed to store // in a single ingester. func (o *Overrides) MaxLocalStreamsPerUser(userID string) int {