Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
back schemas and aggs matching with a cache
Browse files Browse the repository at this point in the history
Before this, regex matching dominated the CPU profile.
With this, CPU usage is reduced by easily 5x.
Though we still have:
      flat                     cumulative
70ms  0.86% 69.79%      770ms  9.49%
github.com/raintank/metrictank/mdata/matchcache.(*Cache).Get

due to the map locking

We could probably optimize this further by changing the
idx.AddOrUpdate signature to return SchemaI and AggI, instead
of requiring them as input, as @replay suggested.
This way we only have to match if it wasn't in the index already.
However this requires more intensive changes to the index than
I'm comfortable with right now (DefById only has the metricdef, not the
properties, we could add them but then we need to adjust how we work
with DefById everywhere and do we still need to store the properties in the tree, etc)
I'd rather revisit this when the need is clearer and we have time to
give it the attention it deserves.
  • Loading branch information
Dieterbe committed Mar 9, 2017
1 parent 486d645 commit 0842aa1
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 3 deletions.
7 changes: 7 additions & 0 deletions mdata/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"io/ioutil"
"log"
"regexp"
"time"

"github.com/lomik/go-carbon/persister"
whisper "github.com/lomik/go-whisper"
"github.com/raintank/metrictank/mdata/matchcache"
"github.com/raintank/metrictank/stats"
)

Expand Down Expand Up @@ -49,6 +51,8 @@ var (
// set either via ConfigProcess or from the unit tests. other code should not touch
Schemas persister.WhisperSchemas
Aggregations persister.WhisperAggregation
schemasCache *matchcache.Cache
aggsCache *matchcache.Cache

schemasFile = "/etc/metrictank/storage-schemas.conf"
aggFile = "/etc/metrictank/storage-aggregation.conf"
Expand Down Expand Up @@ -93,4 +97,7 @@ func ConfigProcess() {
log.Fatalf("can't read storage-aggregation file %q: %s", aggFile, err.Error())
}

schemasCache = matchcache.New(time.Hour, time.Hour)
aggsCache = matchcache.New(time.Hour, time.Hour)

}
63 changes: 63 additions & 0 deletions mdata/matchcache/matchcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package matchcache

import (
"sync"
"time"
)

// Cache caches key to uint16 lookups (for schemas and aggregations).
// When it cleans the cache it locks up the entire cache.
// This is a tradeoff we can make for simplicity, since this sits on the ingestion
// path, where occasional stalls are ok.
type Cache struct {
	sync.Mutex
	data map[string]Item // guarded by the embedded Mutex

	cleanInterval time.Duration // how often maintain scans for expired entries
	expireAfter   time.Duration // entries not requested for longer than this are removed
}

// Item is a cached value together with the unix timestamp (in seconds)
// of the most recent Get that touched it.
type Item struct {
	val  uint16
	seen int64
}

// New returns an empty Cache and starts a background goroutine that, every
// cleanInterval, removes the entries that have not been requested for more
// than expireAfter. That goroutine is never stopped.
//
// A non-positive cleanInterval disables the background cleaning, so entries
// never expire; time.NewTicker would panic on such an interval.
func New(cleanInterval, expireAfter time.Duration) *Cache {
	m := &Cache{
		data:          make(map[string]Item),
		cleanInterval: cleanInterval,
		expireAfter:   expireAfter,
	}
	if cleanInterval > 0 {
		go m.maintain()
	}
	return m
}

// AddFunc computes the value to store for a key that is not in the cache yet.
type AddFunc func(key string) uint16

// Get returns the value cached for key and refreshes its last-seen time.
// If key is not in the cache, the value is atomically added using fn.
//
// fn runs while the cache is locked, so it must not call back into the same
// Cache. If fn panics, nothing is stored for key and the lock is released.
func (m *Cache) Get(key string, fn AddFunc) uint16 {
	m.Lock()
	// Deferred so that a panic inside fn cannot leave the cache locked forever.
	defer m.Unlock()

	item, ok := m.data[key]
	if !ok {
		item.val = fn(key)
	}
	item.seen = time.Now().Unix()
	m.data[key] = item
	return item.val
}

// maintain periodically deletes the entries whose last-seen time is more than
// expireAfter in the past. It holds the lock for the duration of each scan
// and never returns.
func (m *Cache) maintain() {
	ticker := time.NewTicker(m.cleanInterval)
	// expireAfter is compared at whole-second resolution; the conversion truncates.
	diff := int64(m.expireAfter.Seconds())
	for now := range ticker.C {
		nowUnix := now.Unix()
		m.Lock()
		for key, item := range m.data {
			if nowUnix-item.seen > diff {
				delete(m.data, key)
			}
		}
		m.Unlock()
	}
}
13 changes: 10 additions & 3 deletions mdata/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,21 @@ import (
// MatchSchema returns the schema for the given metric key, and the index of the schema (to efficiently reference it).
// It will always find the schema because we made sure there is a catchall '.*' pattern.
// The index is looked up through schemasCache, which only runs Schemas.Match for keys
// it does not currently hold; the schema itself is then read from Schemas by that index.
func MatchSchema(key string) (uint16, persister.Schema) {
// NOTE(review): the next two lines look like the pre-change body left over from the
// diff rendering; the cached lookup below appears to supersede them — confirm.
i, schema, _ := Schemas.Match(key)
return i, schema
i := schemasCache.Get(key, func(key string) uint16 {
// only the index is cached; the other return values of Match are discarded
i, _, _ := Schemas.Match(key)
return i
})
return i, Schemas[i]
}

// MatchAgg returns the aggregation definition for the given metric key, and the index of it (to efficiently reference it).
// i may be 1 more than the last defined by user, in which case it's the default.
// The index is looked up through aggsCache, which only runs Aggregations.Match for keys
// it does not currently hold; the definition is then obtained with GetAgg(i).
func MatchAgg(key string) (uint16, persister.WhisperAggregationItem) {
// NOTE(review): the next line looks like the pre-change body left over from the
// diff rendering; the cached lookup below appears to supersede it — confirm.
return Aggregations.Match(key)
i := aggsCache.Get(key, func(key string) uint16 {
// only the index is cached; the matched item returned by Match is discarded
i, _ := Aggregations.Match(key)
return i
})
return i, GetAgg(i)
}

// caller must assure i is valid
Expand Down

0 comments on commit 0842aa1

Please sign in to comment.