From c9b9131459573cfa84093550fe1d4d56f305fa56 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Tue, 21 May 2024 17:17:45 +0300 Subject: [PATCH 1/5] implemented a new stream count limiter that uses the values from ownedStreamService if the feature flag is enabled. Signed-off-by: Vladyslav Diachenko --- pkg/ingester/instance.go | 75 ++++++++++++++++----------- pkg/ingester/limiter.go | 81 +++++++++++++++++++++--------- pkg/ingester/limiter_test.go | 47 ++++++++++++++++- pkg/ingester/owned_streams.go | 44 ++++++++++++++++ pkg/ingester/owned_streams_test.go | 35 +++++++++++++ pkg/validation/limits.go | 6 +++ 6 files changed, 233 insertions(+), 55 deletions(-) create mode 100644 pkg/ingester/owned_streams.go create mode 100644 pkg/ingester/owned_streams_test.go diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index a4436b9d41915..7f1ec78601fff 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -104,7 +104,10 @@ type instance struct { tailers map[uint32]*tailer tailerMtx sync.RWMutex - limiter *Limiter + limiter *Limiter + streamCountLimiter *streamCountLimiter + ownedStreamsSvc *ownedStreamService + configs *runtime.TenantConfigs wal WAL @@ -147,11 +150,12 @@ func newInstance( if err != nil { return nil, err } - + streams := newStreamsMap() + ownedStreamsSvc := newOwnedStreamService(instanceID, limiter) c := config.SchemaConfig{Configs: periodConfigs} i := &instance{ cfg: cfg, - streams: newStreamsMap(), + streams: streams, buf: make([]byte, 0, 1024), index: invertedIndex, instanceID: instanceID, @@ -159,9 +163,11 @@ func newInstance( streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID), streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID), - tailers: map[uint32]*tailer{}, - limiter: limiter, - configs: configs, + tailers: map[uint32]*tailer{}, + limiter: limiter, + streamCountLimiter: newStreamCountLimiter(instanceID, streams.Len, limiter, ownedStreamsSvc), + ownedStreamsSvc: ownedStreamsSvc, + configs: configs, wal: wal, metrics: metrics, @@ -286,29 +292,11 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre } if record != nil { - err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, i.streams.Len()) + err = i.streamCountLimiter.AssertNewStreamAllowed(i.instanceID) } if err != nil { - if i.configs.LogStreamCreation(i.instanceID) { - level.Debug(util_log.Logger).Log( - "msg", "failed to create stream, exceeded limit", - "org_id", i.instanceID, - "err", err, - "stream", pushReqStream.Labels, - ) - } - - validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries))) - bytes := 0 - for _, e := range pushReqStream.Entries { - bytes += len(e.Line) - } - validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes)) - if i.customStreamsTracker != nil { - i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes)) - } - return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, labels, i.instanceID) + return i.onStreamCreationError(ctx, pushReqStream, err, labels) } fp := i.getHashForLabels(labels) @@ -333,21 +321,47 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre i.metrics.recoveredStreamsTotal.Inc() } + i.onStreamCreated(s) + + return s, nil +} + +func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logproto.Stream, err error, labels labels.Labels) (*stream, error) { + if i.configs.LogStreamCreation(i.instanceID) { + level.Debug(util_log.Logger).Log( + "msg", "failed to create stream, exceeded limit", + "org_id", i.instanceID, + "err", err, + "stream", pushReqStream.Labels, + ) + } + + validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries))) + bytes := 0 + for _, e := range pushReqStream.Entries { + bytes += len(e.Line) + } + validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes)) + if i.customStreamsTracker != nil { + i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes)) + } + return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, labels, i.instanceID) +} + +func (i *instance) onStreamCreated(s *stream) { memoryStreams.WithLabelValues(i.instanceID).Inc() memoryStreamsLabelsBytes.Add(float64(len(s.labels.String()))) i.streamsCreatedTotal.Inc() i.addTailersToNewStream(s) streamsCountStats.Add(1) - + i.ownedStreamsSvc.incOwnedStreamCount() if i.configs.LogStreamCreation(i.instanceID) { level.Debug(util_log.Logger).Log( "msg", "successfully created stream", "org_id", i.instanceID, - "stream", pushReqStream.Labels, + "stream", s.labels.String(), ) } - - return s, nil } func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*stream, error) { @@ -407,6 +421,7 @@ func (i *instance) removeStream(s *stream) { memoryStreams.WithLabelValues(i.instanceID).Dec() memoryStreamsLabelsBytes.Sub(float64(len(s.labels.String()))) streamsCountStats.Add(-1) + i.ownedStreamsSvc.decOwnedStreamCount() } } diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 94c77a30be7e3..daa1fe7aec8da 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -24,6 +24,7 @@ type RingCount interface { type Limits interface { UnorderedWrites(userID string) bool + UseOwnedStreamCount(userID string) bool MaxLocalStreamsPerUser(userID string) int MaxGlobalStreamsPerUser(userID string) int PerStreamRateLimit(userID string) validation.RateLimit @@ -76,46 +77,39 @@ func (l *Limiter) UnorderedWrites(userID string) bool { return l.limits.UnorderedWrites(userID) } -// AssertMaxStreamsPerUser ensures limit has not been reached compared to the current -// number of streams in input and returns an error if so. -func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error { - // Until the limiter actually starts, all accesses are successful. - // This is used to disable limits while recovering from the WAL. - l.mtx.RLock() - defer l.mtx.RUnlock() - if l.disabled { - return nil - } - +func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int) { // Start by setting the local limit either from override or default - localLimit := l.limits.MaxLocalStreamsPerUser(userID) + localLimit = l.limits.MaxLocalStreamsPerUser(tenantID) // We can assume that streams are evenly distributed across ingesters // so we do convert the global limit into a local limit - globalLimit := l.limits.MaxGlobalStreamsPerUser(userID) - adjustedGlobalLimit := l.convertGlobalToLocalLimit(globalLimit) + globalLimit = l.limits.MaxGlobalStreamsPerUser(tenantID) + adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit) // Set the calculated limit to the lesser of the local limit or the new calculated global limit - calculatedLimit := l.minNonZero(localLimit, adjustedGlobalLimit) + calculatedLimit = l.minNonZero(localLimit, adjustedGlobalLimit) // If both the local and global limits are disabled, we just // use the largest int value if calculatedLimit == 0 { calculatedLimit = math.MaxInt32 } + return +} - if streams < calculatedLimit { - return nil +func (l *Limiter) minNonZero(first, second int) int { + if first == 0 || (second != 0 && first > second) { + return second } - return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, userID, streams, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit) + return first } func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int { if globalLimit == 0 { return 0 } - + // todo: change to healthyInstancesInZoneCount() once // Given we don't need a super accurate count (ie. when the ingesters // topology changes) and we prefer to always be in favor of the tenant, // we can use a per-ingester limit equal to: @@ -131,12 +125,53 @@ func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int { return 0 } -func (l *Limiter) minNonZero(first, second int) int { - if first == 0 || (second != 0 && first > second) { - return second +type supplier[T any] func() T + +type streamCountLimiter struct { + tenantID string + limiter *Limiter + defaultStreamCountSupplier supplier[int] + ownedStreamSvc *ownedStreamService +} + +var noopFixedLimitSupplier = func() int { + return 0 +} + +func newStreamCountLimiter(tenantID string, defaultStreamCountSupplier supplier[int], limiter *Limiter, service *ownedStreamService) *streamCountLimiter { + return &streamCountLimiter{ + tenantID: tenantID, + limiter: limiter, + defaultStreamCountSupplier: defaultStreamCountSupplier, + ownedStreamSvc: service, } +} - return first +func (l *streamCountLimiter) AssertNewStreamAllowed(tenantID string) error { + streamCountSupplier, fixedLimitSupplier := l.getSuppliers(tenantID) + calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit := l.getCurrentLimit(tenantID, fixedLimitSupplier) + actualStreamsCount := streamCountSupplier() + if actualStreamsCount < calculatedLimit { + return nil + } + + return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, tenantID, actualStreamsCount, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit) +} + +func (l *streamCountLimiter) getCurrentLimit(tenantID string, fixedLimitSupplier supplier[int]) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int) { + calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit = l.limiter.GetStreamCountLimit(tenantID) + fixedLimit := fixedLimitSupplier() + if fixedLimit > calculatedLimit { + calculatedLimit = fixedLimit + } + return +} + +func (l *streamCountLimiter) getSuppliers(tenant string) (streamCountSupplier, fixedLimitSupplier supplier[int]) { + if l.limiter.limits.UseOwnedStreamCount(tenant) { + return l.ownedStreamSvc.getOwnedStreamCount, l.ownedStreamSvc.getFixedLimit + } + return l.defaultStreamCountSupplier, noopFixedLimitSupplier } type RateLimiterStrategy interface { diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index 6186e910663e0..9d4d3b3037c6f 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -8,12 +8,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "golang.org/x/time/rate" "github.com/grafana/loki/v3/pkg/validation" ) -func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) { +func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) { tests := map[string]struct { maxLocalStreamsPerUser int maxGlobalStreamsPerUser int @@ -21,6 +22,9 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) { ringIngesterCount int streams int expected error + useOwnedStreamService bool + fixedLimit int32 + ownedStreamCount int64 }{ "both local and global limit are disabled": { maxLocalStreamsPerUser: 0, @@ -94,6 +98,36 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) { streams: 3000, expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300), }, + "actual limit must be used if it's greater than fixed limit": { + maxLocalStreamsPerUser: 500, + maxGlobalStreamsPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + useOwnedStreamService: true, + fixedLimit: 20, + ownedStreamCount: 3000, + expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300), + }, + "fixed limit must be used if it's greater than actual limit": { + maxLocalStreamsPerUser: 500, + maxGlobalStreamsPerUser: 1000, + ringReplicationFactor: 3, + ringIngesterCount: 10, + useOwnedStreamService: true, + fixedLimit: 2000, + ownedStreamCount: 2001, + expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 2001, 2000, 500, 1000, 300), + }, + "fixed limit must not be used if both limits are disabled": { + maxLocalStreamsPerUser: 0, + maxGlobalStreamsPerUser: 0, + ringReplicationFactor: 3, + ringIngesterCount: 10, + useOwnedStreamService: true, + fixedLimit: 2000, + ownedStreamCount: 2001, + expected: nil, + }, } for testName, testData := range tests { @@ -107,11 +141,20 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) { limits, err := validation.NewOverrides(validation.Limits{ MaxLocalStreamsPerUser: testData.maxLocalStreamsPerUser, MaxGlobalStreamsPerUser: testData.maxGlobalStreamsPerUser, + UseOwnedStreamCount: testData.useOwnedStreamService, }, nil) require.NoError(t, err) + ownedStreamSvc := &ownedStreamService{ + fixedLimit: atomic.NewInt32(testData.fixedLimit), + ownedStreamCount: atomic.NewInt64(testData.ownedStreamCount), + } limiter := NewLimiter(limits, NilMetrics, ring, testData.ringReplicationFactor) - actual := limiter.AssertMaxStreamsPerUser("test", testData.streams) + defaultCountSupplier := func() int { + return testData.streams + } + streamCountLimiter := newStreamCountLimiter("test", defaultCountSupplier, limiter, ownedStreamSvc) + actual := streamCountLimiter.AssertNewStreamAllowed("test") assert.Equal(t, testData.expected, actual) }) diff --git a/pkg/ingester/owned_streams.go b/pkg/ingester/owned_streams.go new file mode 100644 index 0000000000000..01cb8235f9b1a --- /dev/null +++ b/pkg/ingester/owned_streams.go @@ -0,0 +1,44 @@ +package ingester + +import "go.uber.org/atomic" + +type ownedStreamService struct { + tenantID string + limiter *Limiter + fixedLimit *atomic.Int32 + + //todo: implement job to recalculate it + ownedStreamCount *atomic.Int64 +} + +func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService { + svc := &ownedStreamService{ + tenantID: tenantID, + limiter: limiter, + ownedStreamCount: atomic.NewInt64(0), + fixedLimit: atomic.NewInt32(0), + } + svc.updateFixedLimit() + return svc +} + +func (s *ownedStreamService) getOwnedStreamCount() int { + return int(s.ownedStreamCount.Load()) +} + +func (s *ownedStreamService) updateFixedLimit() { + limit, _, _, _ := s.limiter.GetStreamCountLimit(s.tenantID) + s.fixedLimit.Store(int32(limit)) +} + +func (s *ownedStreamService) getFixedLimit() int { + return int(s.fixedLimit.Load()) +} + +func (s *ownedStreamService) incOwnedStreamCount() { + s.ownedStreamCount.Inc() +} + +func (s *ownedStreamService) decOwnedStreamCount() { + s.ownedStreamCount.Dec() +} diff --git a/pkg/ingester/owned_streams_test.go b/pkg/ingester/owned_streams_test.go new file mode 100644 index 0000000000000..d801be7fbf43d --- /dev/null +++ b/pkg/ingester/owned_streams_test.go @@ -0,0 +1,35 @@ +package ingester + +import ( + "testing" + + "github.com/grafana/loki/v3/pkg/validation" + "github.com/stretchr/testify/require" +) + +func Test_OwnedStreamService(t *testing.T) { + limits, err := validation.NewOverrides(validation.Limits{ + MaxGlobalStreamsPerUser: 100, + }, nil) + require.NoError(t, err) + // Mock the ring + ring := &ringCountMock{count: 30} + limiter := NewLimiter(limits, NilMetrics, ring, 3) + + service := newOwnedStreamService("test", limiter) + require.Equal(t, 0, service.getOwnedStreamCount()) + require.Equal(t, 10, service.getFixedLimit(), "fixed limit must be initialised during the instantiation") + + limits.DefaultLimits().MaxGlobalStreamsPerUser = 1000 + require.Equal(t, 10, service.getFixedLimit(), "fixed list must not be changed until update is triggered") + + service.updateFixedLimit() + require.Equal(t, 100, service.getFixedLimit()) + + service.incOwnedStreamCount() + service.incOwnedStreamCount() + require.Equal(t, 2, service.getOwnedStreamCount()) + + service.decOwnedStreamCount() + require.Equal(t, 1, service.getOwnedStreamCount()) +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index b0660686f5c1b..e9eda23d6be9f 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -85,6 +85,7 @@ type Limits struct { DiscoverLogLevels bool `yaml:"discover_log_levels" json:"discover_log_levels"` // Ingester enforced limits. + UseOwnedStreamCount bool `yaml:"use_owned_stream_count" json:"use_owned_stream_count"` MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"` MaxGlobalStreamsPerUser int `yaml:"max_global_streams_per_user" json:"max_global_streams_per_user"` UnorderedWrites bool `yaml:"unordered_writes" json:"unordered_writes"` @@ -269,6 +270,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var(&l.CreationGracePeriod, "validation.create-grace-period", "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.") f.IntVar(&l.MaxEntriesLimitPerQuery, "validation.max-entries-limit", 5000, "Maximum number of log entries that will be returned for a query.") + f.BoolVar(&l.UseOwnedStreamCount, "ingester.use-owned-stream-count", false, "When true, the ingester takes into account only the streams that belongs to this instance according to the ring while applying the stream limit.") f.IntVar(&l.MaxLocalStreamsPerUser, "ingester.max-streams-per-user", 0, "Maximum number of active streams per user, per ingester. 0 to disable.") f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 5000, "Maximum number of active streams per user, across the cluster. 0 to disable. When the global limit is enabled, each ingester is configured with a dynamic local limit based on the replication factor and the current number of healthy ingesters, and is kept updated whenever the number of ingesters change.") @@ -586,6 +588,10 @@ func (o *Overrides) CreationGracePeriod(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).CreationGracePeriod) } +func (o *Overrides) UseOwnedStreamCount(userID string) bool { + return o.getOverridesForUser(userID).UseOwnedStreamCount +} + // MaxLocalStreamsPerUser returns the maximum number of streams a user is allowed to store // in a single ingester. func (o *Overrides) MaxLocalStreamsPerUser(userID string) int { From 7e984f100c7feb29f5010a757b710f9d3ba7614e Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Tue, 21 May 2024 17:24:42 +0300 Subject: [PATCH 2/5] updated the docs Signed-off-by: Vladyslav Diachenko --- docs/sources/shared/configuration.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index d30dce2f7775b..9b56a9e3f9ade 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -2974,6 +2974,11 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v # CLI flag: -validation.discover-log-levels [discover_log_levels: | default = true] +# When true, the ingester takes into account only the streams that belongs to +# this instance according to the ring while applying the stream limit. +# CLI flag: -ingester.use-owned-stream-count +[use_owned_stream_count: | default = false] + # Maximum number of active streams per user, per ingester. 0 to disable. # CLI flag: -ingester.max-streams-per-user [max_streams_per_user: | default = 0] From 9e9e1b88bf3ab8999537d01d6fa7b637f1c53c6a Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Tue, 21 May 2024 19:44:04 +0300 Subject: [PATCH 3/5] fixed imports Signed-off-by: Vladyslav Diachenko --- pkg/ingester/owned_streams_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/owned_streams_test.go b/pkg/ingester/owned_streams_test.go index d801be7fbf43d..c7ddd9d87f29d 100644 --- a/pkg/ingester/owned_streams_test.go +++ b/pkg/ingester/owned_streams_test.go @@ -3,8 +3,9 @@ package ingester import ( "testing" - "github.com/grafana/loki/v3/pkg/validation" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/validation" ) func Test_OwnedStreamService(t *testing.T) { From 8dd26fb00b96f59a339ac5a281f50fb3af83048b Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko <82767850+vlad-diachenko@users.noreply.github.com> Date: Wed, 22 May 2024 14:03:28 +0300 Subject: [PATCH 4/5] Update pkg/validation/limits.go Co-authored-by: JordanRushing --- pkg/validation/limits.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index e9eda23d6be9f..93884bfc4fdda 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -270,7 +270,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var(&l.CreationGracePeriod, "validation.create-grace-period", "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.") f.IntVar(&l.MaxEntriesLimitPerQuery, "validation.max-entries-limit", 5000, "Maximum number of log entries that will be returned for a query.") - f.BoolVar(&l.UseOwnedStreamCount, "ingester.use-owned-stream-count", false, "When true, the ingester takes into account only the streams that belongs to this instance according to the ring while applying the stream limit.") + f.BoolVar(&l.UseOwnedStreamCount, "ingester.use-owned-stream-count", false, "When true an ingester takes into account only the streams that it owns according to the ring while applying the stream limit.") f.IntVar(&l.MaxLocalStreamsPerUser, "ingester.max-streams-per-user", 0, "Maximum number of active streams per user, per ingester. 0 to disable.") f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 5000, "Maximum number of active streams per user, across the cluster. 0 to disable. When the global limit is enabled, each ingester is configured with a dynamic local limit based on the replication factor and the current number of healthy ingesters, and is kept updated whenever the number of ingesters change.") From a8503842248c98ef3dd48c7d10fe703f413b68a4 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Wed, 22 May 2024 14:16:39 +0300 Subject: [PATCH 5/5] updated the docs Signed-off-by: Vladyslav Diachenko --- docs/sources/shared/configuration.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 9b56a9e3f9ade..d94cb125b1f88 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -2974,8 +2974,8 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v # CLI flag: -validation.discover-log-levels [discover_log_levels: | default = true] -# When true, the ingester takes into account only the streams that belongs to -# this instance according to the ring while applying the stream limit. +# When true an ingester takes into account only the streams that it owns +# according to the ring while applying the stream limit. # CLI flag: -ingester.use-owned-stream-count [use_owned_stream_count: | default = false]