diff --git a/pkg/ingester/stream_rate_calculator.go b/pkg/ingester/stream_rate_calculator.go index 7b2bc9e5e15f..6312cbe1aeeb 100644 --- a/pkg/ingester/stream_rate_calculator.go +++ b/pkg/ingester/stream_rate_calculator.go @@ -10,7 +10,7 @@ import ( const ( // defaultStripeSize is the default number of entries to allocate in the // stripeSeries list. - defaultStripeSize = 1 << 15 + defaultStripeSize = 1 << 10 // The intent is for a per-second rate so this is hard coded updateInterval = time.Second @@ -25,8 +25,7 @@ type stripeLock struct { type StreamRateCalculator struct { size int - samples []map[string]logproto.StreamRate - rates []map[string]logproto.StreamRate + samples []map[string]map[uint64]logproto.StreamRate locks []stripeLock stopchan chan struct{} @@ -37,15 +36,14 @@ type StreamRateCalculator struct { func NewStreamRateCalculator() *StreamRateCalculator { calc := &StreamRateCalculator{ size: defaultStripeSize, - samples: make([]map[string]logproto.StreamRate, defaultStripeSize), - rates: make([]map[string]logproto.StreamRate, defaultStripeSize), + // Lookup pattern: tenant -> fingerprint -> rate + samples: make([]map[string]map[uint64]logproto.StreamRate, defaultStripeSize), locks: make([]stripeLock, defaultStripeSize), stopchan: make(chan struct{}), } for i := 0; i < defaultStripeSize; i++ { - calc.rates[i] = make(map[string]logproto.StreamRate) - calc.samples[i] = make(map[string]logproto.StreamRate) + calc.samples[i] = make(map[string]map[uint64]logproto.StreamRate) } go calc.updateLoop() @@ -72,18 +70,20 @@ func (c *StreamRateCalculator) updateRates() { for i := 0; i < c.size; i++ { c.locks[i].Lock() - c.rates[i] = c.samples[i] - c.samples[i] = make(map[string]logproto.StreamRate) - - sr := c.rates[i] - for _, v := range sr { - rates = append(rates, logproto.StreamRate{ - Tenant: v.Tenant, - StreamHash: v.StreamHash, - StreamHashNoShard: v.StreamHashNoShard, - Rate: v.Rate, - }) + + tenantRates := c.samples[i] + for _, tenant := range tenantRates { + for _, streamRate := range tenant { + rates = append(rates, logproto.StreamRate{ + Tenant: streamRate.Tenant, + StreamHash: streamRate.StreamHash, + StreamHashNoShard: streamRate.StreamHashNoShard, + Rate: streamRate.Rate, + }) + } } + + c.samples[i] = make(map[string]map[uint64]logproto.StreamRate) c.locks[i].Unlock() } @@ -106,13 +106,22 @@ func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoSha c.locks[i].Lock() defer c.locks[i].Unlock() - streamRate := c.samples[i][tenant] + tenantMap := c.getTenant(i, tenant) + streamRate := tenantMap[streamHash] streamRate.StreamHash = streamHash streamRate.StreamHashNoShard = streamHashNoShard streamRate.Tenant = tenant streamRate.Rate += int64(bytes) + tenantMap[streamHash] = streamRate - c.samples[i][tenant] = streamRate + c.samples[i][tenant] = tenantMap +} + +func (c *StreamRateCalculator) getTenant(idx uint64, tenant string) map[uint64]logproto.StreamRate { + if t, ok := c.samples[idx][tenant]; ok { + return t + } + return make(map[uint64]logproto.StreamRate) } func (c *StreamRateCalculator) Stop() {