Skip to content

Commit

Permalink
Reduce number of locks and partition by tenant and stream hash (#7583)
Browse files Browse the repository at this point in the history
This PR reduces the number of locks in the rate calculator and allows it
to hold an arbitrary number of rates by partitioning by tenant and
stream hash

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
MasslessParticle and owen-d authored Nov 3, 2022
1 parent 7353a6a commit 198c81f
Showing 1 changed file with 29 additions and 20 deletions.
49 changes: 29 additions & 20 deletions pkg/ingester/stream_rate_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
const (
// defaultStripeSize is the default number of entries to allocate in the
// stripeSeries list.
defaultStripeSize = 1 << 15
defaultStripeSize = 1 << 10

// The intent is for a per-second rate so this is hard coded
updateInterval = time.Second
Expand All @@ -25,8 +25,7 @@ type stripeLock struct {

type StreamRateCalculator struct {
size int
samples []map[string]logproto.StreamRate
rates []map[string]logproto.StreamRate
samples []map[string]map[uint64]logproto.StreamRate
locks []stripeLock
stopchan chan struct{}

Expand All @@ -37,15 +36,14 @@ type StreamRateCalculator struct {
func NewStreamRateCalculator() *StreamRateCalculator {
calc := &StreamRateCalculator{
size: defaultStripeSize,
samples: make([]map[string]logproto.StreamRate, defaultStripeSize),
rates: make([]map[string]logproto.StreamRate, defaultStripeSize),
// Lookup pattern: tenant -> fingerprint -> rate
samples: make([]map[string]map[uint64]logproto.StreamRate, defaultStripeSize),
locks: make([]stripeLock, defaultStripeSize),
stopchan: make(chan struct{}),
}

for i := 0; i < defaultStripeSize; i++ {
calc.rates[i] = make(map[string]logproto.StreamRate)
calc.samples[i] = make(map[string]logproto.StreamRate)
calc.samples[i] = make(map[string]map[uint64]logproto.StreamRate)
}

go calc.updateLoop()
Expand All @@ -72,18 +70,20 @@ func (c *StreamRateCalculator) updateRates() {

for i := 0; i < c.size; i++ {
c.locks[i].Lock()
c.rates[i] = c.samples[i]
c.samples[i] = make(map[string]logproto.StreamRate)

sr := c.rates[i]
for _, v := range sr {
rates = append(rates, logproto.StreamRate{
Tenant: v.Tenant,
StreamHash: v.StreamHash,
StreamHashNoShard: v.StreamHashNoShard,
Rate: v.Rate,
})

tenantRates := c.samples[i]
for _, tenant := range tenantRates {
for _, streamRate := range tenant {
rates = append(rates, logproto.StreamRate{
Tenant: streamRate.Tenant,
StreamHash: streamRate.StreamHash,
StreamHashNoShard: streamRate.StreamHashNoShard,
Rate: streamRate.Rate,
})
}
}

c.samples[i] = make(map[string]map[uint64]logproto.StreamRate)
c.locks[i].Unlock()
}

Expand All @@ -106,13 +106,22 @@ func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoSha
c.locks[i].Lock()
defer c.locks[i].Unlock()

streamRate := c.samples[i][tenant]
tenantMap := c.getTenant(i, tenant)
streamRate := tenantMap[streamHash]
streamRate.StreamHash = streamHash
streamRate.StreamHashNoShard = streamHashNoShard
streamRate.Tenant = tenant
streamRate.Rate += int64(bytes)
tenantMap[streamHash] = streamRate

c.samples[i][tenant] = streamRate
c.samples[i][tenant] = tenantMap
}

func (c *StreamRateCalculator) getTenant(idx uint64, tenant string) map[uint64]logproto.StreamRate {
if t, ok := c.samples[idx][tenant]; ok {
return t
}
return make(map[uint64]logproto.StreamRate)
}

func (c *StreamRateCalculator) Stop() {
Expand Down

0 comments on commit 198c81f

Please sign in to comment.