diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go
index 9f1db601bb72d..402768988bb52 100644
--- a/pkg/ingester/checkpoint_test.go
+++ b/pkg/ingester/checkpoint_test.go
@@ -458,7 +458,8 @@ func Test_SeriesIterator(t *testing.T) {
 	limits, err := validation.NewOverrides(l, nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	for i := 0; i < 3; i++ {
 		inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
@@ -505,7 +506,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(b, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	for i := range instances {
 		inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 1f3a39415fa3f..2fd07b0a10202 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -388,10 +388,6 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
 		i.lifecyclerWatcher.WatchService(i.partitionReader)
 	}
 
-	// Now that the lifecycler has been created, we can create the limiter
-	// which depends on it.
-	i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
-
 	i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
 
 	i.setupAutoForget()
@@ -408,12 +404,18 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
 		i.SetExtractorWrapper(i.cfg.SampleExtractorWrapper)
 	}
 
+	var limiterStrategy limiterRingStrategy
 	var ownedStreamsStrategy ownershipStrategy
 	if i.cfg.KafkaIngestion.Enabled {
+		limiterStrategy = newPartitionRingLimiterStrategy(partitionRingWatcher, limits.IngestionPartitionsTenantShardSize)
 		ownedStreamsStrategy = newOwnedStreamsPartitionStrategy(i.ingestPartitionID, partitionRingWatcher, limits.IngestionPartitionsTenantShardSize, util_log.Logger)
 	} else {
+		limiterStrategy = newIngesterRingLimiterStrategy(i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
 		ownedStreamsStrategy = newOwnedStreamsIngesterStrategy(i.lifecycler.ID, i.readRing, util_log.Logger)
 	}
+	// Now that the ring strategy has been created, we can create the limiter,
+	// which depends on it.
+	i.limiter = NewLimiter(limits, metrics, limiterStrategy)
 
 	i.recalculateOwnedStreams = newRecalculateOwnedStreamsSvc(i.getInstances, ownedStreamsStrategy, cfg.OwnedStreamsCheckInterval, util_log.Logger)
 
 	return i, nil
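
NewLimiter no longer takes a concrete ring plus replication factor; it takes the one-method limiterRingStrategy interface, so the Kafka (partition-ring) and classic (ingester-ring) paths share one Limiter, and tests can plug in a trivial implementation. A minimal, self-contained sketch of that seam — the interface shape matches the diff, but the stub and names here are illustrative only:

    // Sketch: the Limiter only needs "global limit -> local share" behavior,
    // so any one-method implementation can be injected (the diff's
    // fixedStrategy test stub works the same way).
    package main

    import "fmt"

    type limiterRingStrategy interface {
        convertGlobalToLocalLimit(globalLimit int, tenantID string) int
    }

    // stubStrategy is a hypothetical stand-in for a ring-backed strategy.
    type stubStrategy struct{ local int }

    func (s stubStrategy) convertGlobalToLocalLimit(_ int, _ string) int { return s.local }

    func main() {
        var strategy limiterRingStrategy = stubStrategy{local: 300}
        fmt.Println(strategy.convertGlobalToLocalLimit(1000, "tenant-a")) // 300
    }
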
diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go
index 4086831bb33fb..819577540f664 100644
--- a/pkg/ingester/instance_test.go
+++ b/pkg/ingester/instance_test.go
@@ -78,7 +78,7 @@ var NilMetrics = newIngesterMetrics(nil, constants.Loki)
 func TestLabelsCollisions(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
 	require.Nil(t, err)
@@ -106,7 +106,7 @@ func TestLabelsCollisions(t *testing.T) {
 func TestConcurrentPushes(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
 	require.Nil(t, err)
@@ -158,7 +158,7 @@ func TestConcurrentPushes(t *testing.T) {
 func TestGetStreamRates(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
 	require.NoError(t, err)
@@ -245,7 +245,7 @@ func labelHashNoShard(l labels.Labels) uint64 {
 func TestSyncPeriod(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	const (
 		syncPeriod = 1 * time.Minute
@@ -290,7 +290,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
 	t.Helper()
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 	indexShards := 2
 
 	// just some random values
@@ -507,7 +507,7 @@ func makeRandomLabels() labels.Labels {
 func Benchmark_PushInstance(b *testing.B) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(b, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
 	ctx := context.Background()
@@ -549,7 +549,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
 	l.MaxLocalStreamsPerUser = 100000
 	limits, err := validation.NewOverrides(l, nil)
 	require.NoError(b, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	ctx := context.Background()
 
@@ -1089,7 +1089,7 @@ func TestStreamShardingUsage(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), limitsDefinition)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	defaultShardStreamsCfg := limiter.limits.ShardStreams("fake")
 	tenantShardStreamsCfg := limiter.limits.ShardStreams(customTenant1)
@@ -1454,7 +1454,7 @@ func defaultInstance(t *testing.T) *instance {
 		&ingesterConfig,
 		defaultPeriodConfigs,
 		"fake",
-		NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1),
+		NewLimiter(overrides, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1)),
 		loki_runtime.DefaultTenantConfigs(),
 		noopWAL{},
 		NilMetrics,
diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go
index a9ddd2ba3ba3c..fd3a5c7c1b9bb 100644
--- a/pkg/ingester/limiter.go
+++ b/pkg/ingester/limiter.go
@@ -6,6 +6,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/grafana/dskit/ring"
 	"golang.org/x/time/rate"
 
 	"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
@@ -13,7 +14,7 @@ import (
 )
 
 const (
-	errMaxStreamsPerUserLimitExceeded = "tenant '%v' per-user streams limit exceeded, streams: %d exceeds calculated limit: %d (local limit: %d, global limit: %d, global/ingesters: %d)"
+	errMaxStreamsPerUserLimitExceeded = "tenant '%v' per-user streams limit exceeded, streams: %d exceeds calculated limit: %d (local limit: %d, global limit: %d, local share: %d)"
 )
 
 // RingCount is the interface exposed by a ring implementation which allows
@@ -37,10 +38,9 @@ type Limits interface {
 // Limiter implements primitives to get the maximum number of streams
 // an ingester can handle for a specific tenant
 type Limiter struct {
-	limits            Limits
-	ring              RingCount
-	replicationFactor int
-	metrics           *ingesterMetrics
+	limits       Limits
+	ringStrategy limiterRingStrategy
+	metrics      *ingesterMetrics
 
 	mtx      sync.RWMutex
 	disabled bool
@@ -60,13 +60,16 @@ func (l *Limiter) Enable() {
 	l.metrics.limiterEnabled.Set(1)
 }
 
+type limiterRingStrategy interface {
+	convertGlobalToLocalLimit(int, string) int
+}
+
 // NewLimiter makes a new limiter
-func NewLimiter(limits Limits, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter {
+func NewLimiter(limits Limits, metrics *ingesterMetrics, ingesterRingLimiterStrategy limiterRingStrategy) *Limiter {
 	return &Limiter{
-		limits:            limits,
-		ring:              ring,
-		replicationFactor: replicationFactor,
-		metrics:           metrics,
+		limits:       limits,
+		ringStrategy: ingesterRingLimiterStrategy,
+		metrics:      metrics,
 	}
 }
 
@@ -87,7 +90,7 @@ func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLi
 	// We can assume that streams are evenly distributed across ingesters
 	// so we do convert the global limit into a local limit
 	globalLimit = l.limits.MaxGlobalStreamsPerUser(tenantID)
-	adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit)
+	adjustedGlobalLimit = l.ringStrategy.convertGlobalToLocalLimit(globalLimit, tenantID)
 
 	// Set the calculated limit to the lesser of the local limit or the new calculated global limit
 	calculatedLimit = l.minNonZero(localLimit, adjustedGlobalLimit)
@@ -108,20 +111,32 @@ func (l *Limiter) minNonZero(first, second int) int {
 	return first
 }
 
-func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
+type ingesterRingLimiterStrategy struct {
+	ring              RingCount
+	replicationFactor int
+}
+
+func newIngesterRingLimiterStrategy(ring RingCount, replicationFactor int) *ingesterRingLimiterStrategy {
+	return &ingesterRingLimiterStrategy{
+		ring:              ring,
+		replicationFactor: replicationFactor,
+	}
+}
+
+func (l *ingesterRingLimiterStrategy) convertGlobalToLocalLimit(globalLimit int, _ string) int {
 	if globalLimit == 0 || l.replicationFactor == 0 {
 		return 0
 	}
 
 	zonesCount := l.ring.ZonesCount()
 	if zonesCount <= 1 {
-		return calculateLimitForSingleZone(globalLimit, l)
+		return l.calculateLimitForSingleZone(globalLimit)
 	}
 
-	return calculateLimitForMultipleZones(globalLimit, zonesCount, l)
+	return l.calculateLimitForMultipleZones(globalLimit, zonesCount)
 }
 
-func calculateLimitForSingleZone(globalLimit int, l *Limiter) int {
+func (l *ingesterRingLimiterStrategy) calculateLimitForSingleZone(globalLimit int) int {
 	numIngesters := l.ring.HealthyInstancesCount()
 	if numIngesters > 0 {
 		return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
@@ -129,7 +144,7 @@ func calculateLimitForSingleZone(globalLimit int, l *Limiter) int {
 	return 0
 }
 
-func calculateLimitForMultipleZones(globalLimit, zonesCount int, l *Limiter) int {
+func (l *ingesterRingLimiterStrategy) calculateLimitForMultipleZones(globalLimit, zonesCount int) int {
 	ingestersInZone := l.ring.HealthyInstancesInZoneCount()
 	if ingestersInZone > 0 {
 		return int((float64(globalLimit) * float64(l.replicationFactor)) / float64(zonesCount) / float64(ingestersInZone))
@@ -137,6 +152,34 @@ func calculateLimitForMultipleZones(globalLimit, zonesCount int, l *Limiter) int
 	return 0
 }
 
+type partitionRingLimiterStrategy struct {
+	ring                  ring.PartitionRingReader
+	getPartitionShardSize func(user string) int
+}
+
+func newPartitionRingLimiterStrategy(ring ring.PartitionRingReader, getPartitionShardSize func(user string) int) *partitionRingLimiterStrategy {
+	return &partitionRingLimiterStrategy{
+		ring:                  ring,
+		getPartitionShardSize: getPartitionShardSize,
+	}
+}
+
+func (l *partitionRingLimiterStrategy) convertGlobalToLocalLimit(globalLimit int, tenantID string) int {
+	if globalLimit == 0 {
+		return 0
+	}
+
+	userShardSize := l.getPartitionShardSize(tenantID)
+
+	// ShuffleShardSize correctly handles the cases where the user has no shard config
+	// or has more shards than there are active partitions in the ring.
+	activePartitionsForUser := l.ring.PartitionRing().ShuffleShardSize(userShardSize)
+
+	if activePartitionsForUser == 0 {
+		return 0
+	}
+	return int(float64(globalLimit) / float64(activePartitionsForUser))
+}
+
 type supplier[T any] func() T
 
 type streamCountLimiter struct {
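
For reference, the arithmetic the two ring strategies above implement, worked with made-up numbers (a sketch, not part of the diff): a single-zone ring divides the global limit across healthy ingesters and scales by the replication factor, since every stream lands on RF ingesters; a multi-zone ring divides the replicated total by the zone count and by the healthy ingesters per zone.

    // Worked example of the two conversions above, with illustrative numbers.
    package main

    import "fmt"

    func singleZone(global, healthyIngesters, rf int) int {
        if healthyIngesters == 0 {
            return 0
        }
        return int((float64(global) / float64(healthyIngesters)) * float64(rf))
    }

    func multiZone(global, zones, healthyInZone, rf int) int {
        if healthyInZone == 0 {
            return 0
        }
        return int((float64(global) * float64(rf)) / float64(zones) / float64(healthyInZone))
    }

    func main() {
        // Global limit 1000, 10 healthy ingesters, RF 3 -> each ingester may hold 300.
        fmt.Println(singleZone(1000, 10, 3)) // 300
        // Same cluster split into 3 zones with 4 healthy ingesters per zone:
        // 1000*3 replicas / 3 zones / 4 per zone -> 250 per ingester.
        fmt.Println(multiZone(1000, 3, 4, 3)) // 250
    }
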
diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go
index 0d0055d0a0afb..bb87efe9555e5 100644
--- a/pkg/ingester/limiter_test.go
+++ b/pkg/ingester/limiter_test.go
@@ -6,6 +6,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/grafana/dskit/ring"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
@@ -14,12 +15,18 @@ import (
 	"github.com/grafana/loki/v3/pkg/validation"
 )
+
+type fixedStrategy struct {
+	localLimit int
+}
+
+func (strategy *fixedStrategy) convertGlobalToLocalLimit(_ int, _ string) int {
+	return strategy.localLimit
+}
 
 func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
 	tests := map[string]struct {
 		maxLocalStreamsPerUser  int
 		maxGlobalStreamsPerUser int
-		ringReplicationFactor   int
-		ringIngesterCount       int
+		calculatedLocalLimit    int
 		streams                 int
 		expected                error
 		useOwnedStreamService   bool
@@ -29,80 +36,63 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
 		"both local and global limit are disabled": {
 			maxLocalStreamsPerUser:  0,
 			maxGlobalStreamsPerUser: 0,
-			ringReplicationFactor:   1,
-			ringIngesterCount:       1,
+			calculatedLocalLimit:    0,
 			streams:                 100,
 			expected:                nil,
 		},
 		"current number of streams is below the limit": {
 			maxLocalStreamsPerUser:  0,
 			maxGlobalStreamsPerUser: 1000,
-			ringReplicationFactor:   3,
-			ringIngesterCount:       10,
+			calculatedLocalLimit:    300,
 			streams:                 299,
 			expected:                nil,
 		},
 		"current number of streams is above the limit": {
 			maxLocalStreamsPerUser:  0,
 			maxGlobalStreamsPerUser: 1000,
-			ringReplicationFactor:   3,
-			ringIngesterCount:       10,
+			calculatedLocalLimit:    300,
 			streams:                 300,
 			expected:                fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 300, 300, 0, 1000, 300),
 		},
 		"both local and global limits are disabled": {
 			maxLocalStreamsPerUser:  0,
 			maxGlobalStreamsPerUser: 0,
-			ringReplicationFactor:   1,
-			ringIngesterCount:       1,
+			calculatedLocalLimit:    0,
 			streams:                 math.MaxInt32 - 1,
 			expected:                nil,
 		},
 		"only local limit is enabled": {
 			maxLocalStreamsPerUser:  1000,
 			maxGlobalStreamsPerUser: 0,
-			ringReplicationFactor:   1,
-			ringIngesterCount:       1,
+			calculatedLocalLimit:    1000,
 			streams:                 3000,
-			expected:                fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 1000, 1000, 0, 0),
+			expected:                fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 1000, 1000, 0, 1000),
 		},
-		"only global limit is enabled with replication-factor=1": {
+		"only global limit is enabled": {
 			maxLocalStreamsPerUser:  0,
 			maxGlobalStreamsPerUser: 1000,
-			ringReplicationFactor:   1,
-			ringIngesterCount:       10,
+			calculatedLocalLimit:    100,
 			streams:                 3000,
 			expected:                fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 100, 0, 1000, 100),
 		},
-		"only global limit is enabled with replication-factor=3": {
-			maxLocalStreamsPerUser:  0,
-			maxGlobalStreamsPerUser: 1000,
-			ringReplicationFactor:   3,
-			ringIngesterCount:       10,
-			streams:                 3000,
-			expected:                fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 0, 1000, 300),
-		},
 		"both local and global limits are set with local limit < global limit": {
 			maxLocalStreamsPerUser:  150,
 			maxGlobalStreamsPerUser: 1000,
-			ringReplicationFactor:   3,
-			ringIngesterCount:       10,
+			calculatedLocalLimit:    150,
 			streams:                 3000,
-			expected:                fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 150, 150, 1000, 300),
+			expected:                fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 150, 150, 1000, 150),
 		},
 		"both local and global limits are set with local limit > global limit": {
 			maxLocalStreamsPerUser:  500,
 			maxGlobalStreamsPerUser: 1000,
-			ringReplicationFactor:   3,
-			ringIngesterCount:       10,
+			calculatedLocalLimit:    300,
 			streams:                 3000,
 			expected:                fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300),
 		},
 		"actual limit must be used if it's greater than fixed limit": {
 			maxLocalStreamsPerUser:  500,
 			maxGlobalStreamsPerUser: 1000,
-			ringReplicationFactor:   3,
-			ringIngesterCount:       10,
+			calculatedLocalLimit:    300,
 			useOwnedStreamService:   true,
 			fixedLimit:              20,
 			ownedStreamCount:        3000,
@@ -111,18 +101,16 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
 		"fixed limit must be used if it's greater than actual limit": {
 			maxLocalStreamsPerUser:  500,
 			maxGlobalStreamsPerUser: 1000,
-			ringReplicationFactor:   3,
-			ringIngesterCount:       10,
+			calculatedLocalLimit:    500,
 			useOwnedStreamService:   true,
 			fixedLimit:              2000,
 			ownedStreamCount:        2001,
-			expected:                fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 2001, 2000, 500, 1000, 300),
+			expected:                fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 2001, 2000, 500, 1000, 500),
 		},
 		"fixed limit must not be used if both limits are disabled": {
 			maxLocalStreamsPerUser:  0,
 			maxGlobalStreamsPerUser: 0,
-			ringReplicationFactor:   3,
-			ringIngesterCount:       10,
+			calculatedLocalLimit:    0,
 			useOwnedStreamService:   true,
 			fixedLimit:              2000,
 			ownedStreamCount:        2001,
@@ -134,9 +122,6 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
 		testData := testData
 		t.Run(testName, func(t *testing.T) {
-			// Mock the ring
-			ring := &ringCountMock{count: testData.ringIngesterCount}
-
 			// Mock limits
 			limits, err := validation.NewOverrides(validation.Limits{
 				MaxLocalStreamsPerUser:  testData.maxLocalStreamsPerUser,
@@ -149,7 +134,8 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
 				fixedLimit:       atomic.NewInt32(testData.fixedLimit),
 				ownedStreamCount: testData.ownedStreamCount,
 			}
-			limiter := NewLimiter(limits, NilMetrics, ring, testData.ringReplicationFactor)
+			strategy := &fixedStrategy{localLimit: testData.calculatedLocalLimit}
+			limiter := NewLimiter(limits, NilMetrics, strategy)
 
 			defaultCountSupplier := func() int {
 				return testData.streams
@@ -200,7 +186,7 @@ func TestLimiter_minNonZero(t *testing.T) {
 		testData := testData
 		t.Run(testName, func(t *testing.T) {
-			limiter := NewLimiter(nil, NilMetrics, nil, 0)
+			limiter := NewLimiter(nil, NilMetrics, nil)
 			assert.Equal(t, testData.expected, limiter.minNonZero(testData.first, testData.second))
 		})
 	}
@@ -281,7 +267,7 @@ func (m *MockRing) HealthyInstancesInZoneCount() int {
 	return m.healthyInstancesInZoneCount
 }
 
-func TestConvertGlobalToLocalLimit(t *testing.T) {
+func TestConvertGlobalToLocalLimit_IngesterRing(t *testing.T) {
 	tests := []struct {
 		name        string
 		globalLimit int
@@ -299,19 +285,86 @@ func TestConvertGlobalToLocalLimit(t *testing.T) {
 	}
 
 	for _, tc := range tests {
-		t.Run(tc.name, func(t *testing.T) {
+		t.Run(tc.name+"_ingesterStrategy", func(t *testing.T) {
 			mockRing := &MockRing{
 				zonesCount:                  tc.zonesCount,
 				healthyInstancesCount:       tc.healthyInstancesCount,
 				healthyInstancesInZoneCount: tc.healthyInstancesInZoneCount,
 			}
 
-			limiter := &Limiter{
-				ring:              mockRing,
-				replicationFactor: tc.replicationFactor,
+			strategy := newIngesterRingLimiterStrategy(mockRing, tc.replicationFactor)
+
+			localLimit := strategy.convertGlobalToLocalLimit(tc.globalLimit, "test")
+			if localLimit != tc.expectedLocalLimit {
+				t.Errorf("expected %d, got %d", tc.expectedLocalLimit, localLimit)
+			}
+		})
+	}
+}
+
+func newMockPartitionRingWithPartitions(activeCount int, inactiveCount int) *ring.PartitionRing {
+	partitionRing := ring.PartitionRingDesc{
+		Partitions: map[int32]ring.PartitionDesc{},
+		Owners:     map[string]ring.OwnerDesc{},
+	}
+
+	for i := 0; i < activeCount; i++ {
+		id := int32(i)
+
+		partitionRing.Partitions[id] = ring.PartitionDesc{
+			Id:     id,
+			Tokens: []uint32{uint32(id)},
+			State:  ring.PartitionActive,
+		}
+		partitionRing.Owners[fmt.Sprintf("test%d", id)] = ring.OwnerDesc{
+			OwnedPartition: id,
+			State:          ring.OwnerActive,
+		}
+	}
+	for i := activeCount; i < activeCount+inactiveCount; i++ {
+		id := int32(i)
+
+		partitionRing.Partitions[id] = ring.PartitionDesc{
+			Id:     id,
+			Tokens: []uint32{uint32(id)},
+			State:  ring.PartitionInactive,
+		}
+	}
+	return ring.NewPartitionRing(partitionRing)
+}
+
+func TestConvertGlobalToLocalLimit_PartitionRing(t *testing.T) {
+	tests := []struct {
+		name               string
+		globalLimit        int
+		activePartitions   int
+		inactivePartitions int
+		shardsPerUser      int
+		expectedLocalLimit int
+	}{
+		{"GlobalLimitZero", 0, 1, 0, 0, 0},
+		{"SinglePartition", 100, 1, 0, 0, 100},
+		{"MultiplePartitions", 200, 3, 0, 0, 66},
+		{"NoActivePartitions", 200, 0, 3, 0, 0},
+		{"PartialActivePartitions", 60, 3, 3, 0, 20},
+		{"LimitLessThanActivePartitions", 3, 10, 0, 0, 0},
+		{"MultiplePartitionsWithLimitedShardsPerUser", 200, 3, 0, 2, 100},
+		{"MultiplePartitionsWithMoreShardsPerUserThanPartitions", 200, 3, 0, 10, 66},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name+"_partitionStrategy", func(t *testing.T) {
+			ringReader := &mockPartitionRingReader{
+				ring: newMockPartitionRingWithPartitions(tc.activePartitions, tc.inactivePartitions),
+			}
+
+			getPartitionsForUser := func(_ string) int {
+				return tc.shardsPerUser
 			}
 
-			localLimit := limiter.convertGlobalToLocalLimit(tc.globalLimit)
+			strategy := newPartitionRingLimiterStrategy(ringReader, getPartitionsForUser)
+
+			localLimit := strategy.convertGlobalToLocalLimit(tc.globalLimit, "test")
 			if localLimit != tc.expectedLocalLimit {
 				t.Errorf("expected %d, got %d", tc.expectedLocalLimit, localLimit)
 			}
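
The partition-ring expectations in the table above all reduce to integer truncation of globalLimit / activePartitionsForUser, where the shuffle-shard size caps how many active partitions a tenant actually uses (per the diff's comment on ShuffleShardSize). A quick sketch reproducing those numbers — the clamping here is a simplified stand-in for dskit's actual shuffle-sharding:

    // Reproduces the partition-ring expectations from the test table above.
    package main

    import "fmt"

    // localLimit mirrors partitionRingLimiterStrategy.convertGlobalToLocalLimit,
    // with ShuffleShardSize reduced to its clamping behavior: a shard size of 0
    // (no per-tenant shard config) or one larger than the ring uses all active
    // partitions.
    func localLimit(global, activePartitions, shardSize int) int {
        if global == 0 {
            return 0
        }
        partitionsForUser := activePartitions
        if shardSize > 0 && shardSize < activePartitions {
            partitionsForUser = shardSize
        }
        if partitionsForUser == 0 {
            return 0
        }
        return int(float64(global) / float64(partitionsForUser))
    }

    func main() {
        fmt.Println(localLimit(200, 3, 0))  // 66: 200/3 truncates
        fmt.Println(localLimit(200, 3, 2))  // 100: shard size caps the tenant at 2 partitions
        fmt.Println(localLimit(200, 3, 10)) // 66: shard size above the ring size uses all 3
        fmt.Println(localLimit(3, 10, 0))   // 0: global limit below the partition count truncates to zero
    }
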
diff --git a/pkg/ingester/owned_streams_test.go b/pkg/ingester/owned_streams_test.go
index 7f114922fa447..373d37a5f62e6 100644
--- a/pkg/ingester/owned_streams_test.go
+++ b/pkg/ingester/owned_streams_test.go
@@ -17,7 +17,7 @@ func Test_OwnedStreamService(t *testing.T) {
 	require.NoError(t, err)
 	// Mock the ring
 	ring := &ringCountMock{count: 30}
-	limiter := NewLimiter(limits, NilMetrics, ring, 3)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(ring, 3))
 
 	service := newOwnedStreamService("test", limiter)
 	require.Equal(t, 0, service.getOwnedStreamCount())
diff --git a/pkg/ingester/recalculate_owned_streams_test.go b/pkg/ingester/recalculate_owned_streams_test.go
index d5dce8599287b..d2d3583095b02 100644
--- a/pkg/ingester/recalculate_owned_streams_test.go
+++ b/pkg/ingester/recalculate_owned_streams_test.go
@@ -70,7 +70,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
 				UseOwnedStreamCount: testData.featureEnabled,
 			}, nil)
 			require.NoError(t, err)
-			limiter := NewLimiter(limits, NilMetrics, mockRing, 1)
+			limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(mockRing, 1))
 
 			tenant, err := newInstance(
 				defaultConfig(),
diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go
index 03e0ca976628f..fcad6558b21de 100644
--- a/pkg/ingester/stream_test.go
+++ b/pkg/ingester/stream_test.go
@@ -56,7 +56,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	for _, tc := range tt {
 		t.Run(tc.name, func(t *testing.T) {
@@ -114,7 +114,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
 func TestPushDeduplication(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	chunkfmt, headfmt := defaultChunkFormat(t)
 
@@ -150,7 +150,7 @@ func TestPushDeduplication(t *testing.T) {
 func TestPushDeduplicationExtraMetrics(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	chunkfmt, headfmt := defaultChunkFormat(t)
 
@@ -220,7 +220,7 @@ func TestPushDeduplicationExtraMetrics(t *testing.T) {
 func TestPushRejectOldCounter(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	chunkfmt, headfmt := defaultChunkFormat(t)
 
@@ -328,7 +328,7 @@ func TestEntryErrorCorrectlyReported(t *testing.T) {
 	}
 	limits, err := validation.NewOverrides(l, nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	chunkfmt, headfmt := defaultChunkFormat(t)
 
@@ -367,7 +367,7 @@ func TestUnorderedPush(t *testing.T) {
 	cfg.MaxChunkAge = 10 * time.Second
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	chunkfmt, headfmt := defaultChunkFormat(t)
 
@@ -470,7 +470,7 @@ func TestPushRateLimit(t *testing.T) {
 	}
 	limits, err := validation.NewOverrides(l, nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	chunkfmt, headfmt := defaultChunkFormat(t)
 
@@ -510,7 +510,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
 	}
 	limits, err := validation.NewOverrides(l, nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	cfg := defaultConfig()
 	chunkfmt, headfmt := defaultChunkFormat(t)
 
@@ -549,7 +549,7 @@ func TestPushRateLimitAllOrNothing(t *testing.T) {
 func TestReplayAppendIgnoresValidityWindow(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	cfg := defaultConfig()
 	cfg.MaxChunkAge = time.Minute
 
@@ -617,7 +617,7 @@ func Benchmark_PushStream(b *testing.B) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(b, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 	chunkfmt, headfmt := defaultChunkFormat(b)
 
 	s := newStream(chunkfmt, headfmt, &Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NewStreamRateCalculator(), NilMetrics, nil, nil)
diff --git a/pkg/ingester/streams_map_test.go b/pkg/ingester/streams_map_test.go
index b14b3e07e497f..87e8332eddd45 100644
--- a/pkg/ingester/streams_map_test.go
+++ b/pkg/ingester/streams_map_test.go
@@ -13,7 +13,7 @@ import (
 func TestStreamsMap(t *testing.T) {
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
-	limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
+	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
 
 	chunkfmt, headfmt := defaultChunkFormat(t)
 
 	ss := []*stream{