Skip to content

Commit

Permalink
Reduce number of locks and partition by tenant and stream hash (grafa…
Browse files Browse the repository at this point in the history
…na#7583)

This PR reduces the number of locks in the rate calculator and allows it
to hold an arbitrary number of rates by partitioning by tenant and
stream hash

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
  • Loading branch information
2 people authored and lxwzy committed Nov 7, 2022
1 parent e200262 commit 8eb8d33
Showing 1 changed file with 29 additions and 20 deletions.
49 changes: 29 additions & 20 deletions pkg/ingester/stream_rate_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
const (
// defaultStripeSize is the default number of entries to allocate in the
// stripeSeries list.
defaultStripeSize = 1 << 15
defaultStripeSize = 1 << 10

// The intent is for a per-second rate so this is hard coded
updateInterval = time.Second
Expand All @@ -25,8 +25,7 @@ type stripeLock struct {

type StreamRateCalculator struct {
size int
samples []map[string]logproto.StreamRate
rates []map[string]logproto.StreamRate
samples []map[string]map[uint64]logproto.StreamRate
locks []stripeLock
stopchan chan struct{}

Expand All @@ -37,15 +36,14 @@ type StreamRateCalculator struct {
func NewStreamRateCalculator() *StreamRateCalculator {
calc := &StreamRateCalculator{
size: defaultStripeSize,
samples: make([]map[string]logproto.StreamRate, defaultStripeSize),
rates: make([]map[string]logproto.StreamRate, defaultStripeSize),
// Lookup pattern: tenant -> fingerprint -> rate
samples: make([]map[string]map[uint64]logproto.StreamRate, defaultStripeSize),
locks: make([]stripeLock, defaultStripeSize),
stopchan: make(chan struct{}),
}

for i := 0; i < defaultStripeSize; i++ {
calc.rates[i] = make(map[string]logproto.StreamRate)
calc.samples[i] = make(map[string]logproto.StreamRate)
calc.samples[i] = make(map[string]map[uint64]logproto.StreamRate)
}

go calc.updateLoop()
Expand All @@ -72,18 +70,20 @@ func (c *StreamRateCalculator) updateRates() {

for i := 0; i < c.size; i++ {
c.locks[i].Lock()
c.rates[i] = c.samples[i]
c.samples[i] = make(map[string]logproto.StreamRate)

sr := c.rates[i]
for _, v := range sr {
rates = append(rates, logproto.StreamRate{
Tenant: v.Tenant,
StreamHash: v.StreamHash,
StreamHashNoShard: v.StreamHashNoShard,
Rate: v.Rate,
})

tenantRates := c.samples[i]
for _, tenant := range tenantRates {
for _, streamRate := range tenant {
rates = append(rates, logproto.StreamRate{
Tenant: streamRate.Tenant,
StreamHash: streamRate.StreamHash,
StreamHashNoShard: streamRate.StreamHashNoShard,
Rate: streamRate.Rate,
})
}
}

c.samples[i] = make(map[string]map[uint64]logproto.StreamRate)
c.locks[i].Unlock()
}

Expand All @@ -106,13 +106,22 @@ func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoSha
c.locks[i].Lock()
defer c.locks[i].Unlock()

streamRate := c.samples[i][tenant]
tenantMap := c.getTenant(i, tenant)
streamRate := tenantMap[streamHash]
streamRate.StreamHash = streamHash
streamRate.StreamHashNoShard = streamHashNoShard
streamRate.Tenant = tenant
streamRate.Rate += int64(bytes)
tenantMap[streamHash] = streamRate

c.samples[i][tenant] = streamRate
c.samples[i][tenant] = tenantMap
}

func (c *StreamRateCalculator) getTenant(idx uint64, tenant string) map[uint64]logproto.StreamRate {
if t, ok := c.samples[idx][tenant]; ok {
return t
}
return make(map[uint64]logproto.StreamRate)
}

func (c *StreamRateCalculator) Stop() {
Expand Down

0 comments on commit 8eb8d33

Please sign in to comment.