Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Increase eviction threshold
Browse files Browse the repository at this point in the history
If the ingest load is slow and we are ingesting a large number of series,
then the eviction timestamp will get older (it takes more time to go through a
full round of series).
We need to bump the threshold to cover that case.

Also covering an edge case of evictor starvation.
  • Loading branch information
niksajakovljevic committed Dec 13, 2022
1 parent 6c160d0 commit 94e68c9
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 5 deletions.
4 changes: 2 additions & 2 deletions pkg/clockcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (cache *Cache) evict() (insertPtr *element) {
for i := 0; i < 2; i++ {
for next := range postStart {
elem := &postStart[next]
old := atomic.SwapUint32(&elem.usedWithTs, setUsed(false, elem.usedWithTs))
old := atomic.SwapUint32(&elem.usedWithTs, setUsed(false, atomic.LoadUint32(&elem.usedWithTs)))
if !extractUsed(old) {
insertPtr = elem
}
Expand All @@ -188,7 +188,7 @@ func (cache *Cache) evict() (insertPtr *element) {
}
for next := range preStart {
elem := &preStart[next]
old := atomic.SwapUint32(&elem.usedWithTs, setUsed(false, elem.usedWithTs))
old := atomic.SwapUint32(&elem.usedWithTs, setUsed(false, atomic.LoadUint32(&elem.usedWithTs)))
if !extractUsed(old) {
insertPtr = elem
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/clockcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,49 @@ func TestEviction(t *testing.T) {
}
}

// TestEvictionStarvation checks that the evictor is not starved by a lot of
// concurrent gets: with every element marked as used and several goroutines
// continuously calling Get, inserting over capacity must still succeed.
func TestEvictionStarvation(t *testing.T) {
	t.Parallel()
	cache := WithMax(1000000)
	// prepopulate cache
	for i := 0; i < cache.Cap(); i++ {
		cache.Insert(i, i+1, uint64(16))
	}
	// mark all as used
	for i := 0; i < cache.Cap(); i++ {
		cache.Get(i)
	}

	const (
		parallelReq   = 10
		keysPerWorker = 100000
	)
	// Closing the channel (rather than sending a single value) wakes up every
	// worker, and the deferred close also runs when t.Fatal aborts the test,
	// so no getter goroutine is left spinning after the test returns.
	stopCh := make(chan struct{})
	defer close(stopCh)

	for i := 0; i < parallelReq; i++ {
		// simulate a lot of parallel gets
		go func(worker int) {
			start := worker * keysPerWorker
			end := start + keysPerWorker
			for {
				select {
				case <-stopCh:
					return
				default:
					for key := start; key < end; key++ {
						cache.Get(key)
					}
					time.Sleep(time.Millisecond)
				}
			}
		}(i)
	}
	// give the getters time to ramp up before forcing evictions
	time.Sleep(time.Second)
	// inserting over capacity will trigger evictions
	for i := cache.Cap(); i < 1200000; i++ {
		_, inCache := cache.Insert(i, i+1, uint64(16))
		if !inCache {
			t.Fatal("failed to insert due to starved evictor")
		}
	}
}

func TestCacheGetRandomly(t *testing.T) {
t.Parallel()

Expand Down
14 changes: 13 additions & 1 deletion pkg/clockcache/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (pm *perfMetrics) createAndRegister(r prometheus.Registerer, name, module s
Buckets: prometheus.LinearBuckets(1, 500, 10),
}, []string{"method"},
)

r.MustRegister(pm.hitsTotal, pm.queriesTotal, pm.queriesLatency)
}

Expand Down Expand Up @@ -143,5 +144,16 @@ func registerMetrics(cacheName, moduleType string, c *Cache) {
return float64(c.Evictions())
},
)
r.MustRegister(enabled, count, size, capacity, evictions)
maxEvictionTs := prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "promscale",
Subsystem: "cache",
Name: "evicted_element_min_age_seconds",
Help: "Minimum age in seconds between element last usage and eviction",
ConstLabels: map[string]string{"type": moduleType, "name": cacheName},
}, func() float64 {
return float64(time.Now().Unix() - int64(c.MaxEvictionTs()))
},
)
r.MustRegister(enabled, count, size, capacity, evictions, maxEvictionTs)
}
11 changes: 9 additions & 2 deletions pkg/pgmodel/cache/series_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const DefaultSeriesCacheSize = 1000000

const growCheckDuration = time.Second * 5 // check whether to grow the series cache this often
const growFactor = float64(2.0) // multiply cache size by this factor when growing the cache
var evictionMaxAge = time.Minute * 2 // grow cache if we are evicting elements younger than `now - evictionMaxAge`
var evictionMaxAge = time.Minute * 5 // grow cache if we are evicting elements younger than `now - evictionMaxAge`

// SeriesCache is a cache of model.Series entries.
type SeriesCache interface {
Expand Down Expand Up @@ -134,7 +134,14 @@ func (t *SeriesCacheImpl) loadSeries(str string) (l *model.Series) {
// the event of multiple goroutines setting labels concurrently).
func (t *SeriesCacheImpl) setSeries(str string, lset *model.Series) *model.Series {
// str is not counted twice in the size since the key and lset.str will point to the same thing.
// NOTE(review): the next two Insert lines appear to be the removed/added pair
// of this diff hunk; presumably only the second one exists after the commit.
val, _ := t.cache.Insert(str, lset, lset.FinalSizeBytes())
val, inCache := t.cache.Insert(str, lset, lset.FinalSizeBytes())
if !inCache {
// The cache appears to have been full and eviction failed to free an
// element, presumably because the evictor was starved by a lot of
// concurrent gets. Treat this as a signal to grow the cache.
// NOTE(review): lset is not re-inserted after growing — confirm that
// returning val without it being cached is intended.
log.Info("growing cache because of eviction starvation")
t.grow()
}
// The value returned by Insert is asserted to be a *model.Series.
return val.(*model.Series)
}

Expand Down

0 comments on commit 94e68c9

Please sign in to comment.