Implementing multi level index cache #5451

Merged · 4 commits · Jul 12, 2023

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -39,6 +39,7 @@
* [ENHANCEMENT] Store Gateway: Add config `estimated_max_series_size_bytes` and `estimated_max_chunk_size_bytes` to address data overfetch. #5401
* [ENHANCEMENT] Distributor/Ingester: Add experimental `-distributor.sign_write_requests` flag to sign the write requests. #5430
* [ENHANCEMENT] Store Gateway/Querier/Compactor: Handling CMK Access Denied errors. #5420 #5442 #5446
* [ENHANCEMENT] Store Gateway: Implementing multi level index cache. #5451
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293
5 changes: 3 additions & 2 deletions docs/blocks-storage/querier.md
@@ -519,8 +519,9 @@ blocks_storage:
[consistency_delay: <duration> | default = 0s]

index_cache:
# The index cache backend type. Supported values: inmemory, memcached,
# redis.
# The index cache backend type. Multiple cache backends can be provided as
# a comma-separated ordered list to enable the implementation of a cache
# hierarchy. Supported values: inmemory, memcached, redis.
# CLI flag: -blocks-storage.bucket-store.index-cache.backend
[backend: <string> | default = "inmemory"]

5 changes: 3 additions & 2 deletions docs/blocks-storage/store-gateway.md
@@ -606,8 +606,9 @@ blocks_storage:
[consistency_delay: <duration> | default = 0s]

index_cache:
# The index cache backend type. Supported values: inmemory, memcached,
# redis.
# The index cache backend type. Multiple cache backends can be provided as
# a comma-separated ordered list to enable the implementation of a cache
# hierarchy. Supported values: inmemory, memcached, redis.
# CLI flag: -blocks-storage.bucket-store.index-cache.backend
[backend: <string> | default = "inmemory"]

5 changes: 3 additions & 2 deletions docs/configuration/config-file-reference.md
@@ -1045,8 +1045,9 @@ bucket_store:
[consistency_delay: <duration> | default = 0s]

index_cache:
# The index cache backend type. Supported values: inmemory, memcached,
# redis.
# The index cache backend type. Multiple cache backends can be provided as a
# comma-separated ordered list to enable the implementation of a cache
# hierarchy. Supported values: inmemory, memcached, redis.
# CLI flag: -blocks-storage.bucket-store.index-cache.backend
[backend: <string> | default = "inmemory"]

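A minimal configuration sketch of the new comma-separated backend list, assuming the usual `blocks_storage.bucket_store` nesting shown above; the memcached address is a placeholder and the per-backend blocks keep their existing options:

```yaml
blocks_storage:
  bucket_store:
    index_cache:
      # The first backend is the fastest level (L0); later backends are only
      # consulted on a miss at the previous level.
      backend: inmemory,memcached
      memcached:
        addresses: dns+memcached-index.example.svc.cluster.local:11211
```
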
4 changes: 2 additions & 2 deletions go.mod
@@ -51,9 +51,9 @@ require (
github.com/sony/gobreaker v0.5.0
github.com/spf13/afero v1.9.5
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a
github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea
github.com/thanos-io/thanos v0.31.1-0.20230711160112-df3a5f808726
github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d
go.etcd.io/etcd/api/v3 v3.5.8
8 changes: 4 additions & 4 deletions go.sum
@@ -1160,12 +1160,12 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/tencentyun/cos-go-sdk-v5 v0.7.40 h1:W6vDGKCHe4wBACI1d2UgE6+50sJFhRWU4O8IB2ozzxM=
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng=
github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM=
github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a h1:tXcVeuval1nzdHn1JXqaBmyjuEUcpDI9huPrUF04nR4=
github.com/thanos-io/objstore v0.0.0-20230629211010-ff1b35b9841a/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM=
github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca h1:JRF7i58HovirZQVJGwCClQsMK6CCmK2fvialXjeoSpI=
github.com/thanos-io/objstore v0.0.0-20230710163637-47c0118da0ca/go.mod h1:5V7lzXuaxwt6XFQoA/zJrhdnQrxq1+r0bwQ1iYOq3gM=
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea h1:kzK8sBn2+mo3NAxP+XjAjAqr1hwfxxFUy5CybaBkjAI=
github.com/thanos-io/promql-engine v0.0.0-20230526105742-791d78b260ea/go.mod h1:eIgPaXWgOhNAv6CPPrgu09r0AtT7byBTZy+7WkX0D18=
github.com/thanos-io/thanos v0.31.1-0.20230711160112-df3a5f808726 h1:DcjKUBKKMckA48Eua9H37+lOs13xDUx1PxixIs9hHHo=
github.com/thanos-io/thanos v0.31.1-0.20230711160112-df3a5f808726/go.mod h1:bDBl+vJEBXNkMvedh10vjDbvYkPyI6r2JJYJG0lLZTo=
github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054 h1:kBuXA0B+jXX89JAJTymw7g/v/4jyjCSgfPcWQeFUOoM=
github.com/thanos-io/thanos v0.31.1-0.20230712154708-a395c5dbd054/go.mod h1:C0Cdk0kFFEDS3qkTgScF9ONSjrPxqnScGPoIgah3NJY=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
84 changes: 63 additions & 21 deletions pkg/storage/tsdb/index_cache.go
@@ -36,6 +36,7 @@
supportedIndexCacheBackends = []string{IndexCacheBackendInMemory, IndexCacheBackendMemcached, IndexCacheBackendRedis}

errUnsupportedIndexCacheBackend = errors.New("unsupported index cache backend")
errDuplicatedIndexCacheBackend = errors.New("duplicated index cache backend")
errNoIndexCacheAddresses = errors.New("no index cache backend addresses")
)

@@ -51,7 +52,10 @@ func (cfg *IndexCacheConfig) RegisterFlags(f *flag.FlagSet) {
}

func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.StringVar(&cfg.Backend, prefix+"backend", IndexCacheBackendDefault, fmt.Sprintf("The index cache backend type. Supported values: %s.", strings.Join(supportedIndexCacheBackends, ", ")))
f.StringVar(&cfg.Backend, prefix+"backend", IndexCacheBackendDefault, fmt.Sprintf("The index cache backend type. "+
"Multiple cache backends can be provided as a comma-separated ordered list to enable the implementation of a cache hierarchy. "+
"Supported values: %s.",
strings.Join(supportedIndexCacheBackends, ", ")))

cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")
cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
@@ -60,18 +64,30 @@ func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix str

// Validate the config.
func (cfg *IndexCacheConfig) Validate() error {
if !util.StringsContain(supportedIndexCacheBackends, cfg.Backend) {
return errUnsupportedIndexCacheBackend
}

if cfg.Backend == IndexCacheBackendMemcached {
if err := cfg.Memcached.Validate(); err != nil {
return err
splitBackends := strings.Split(cfg.Backend, ",")
configuredBackends := map[string]struct{}{}

for _, backend := range splitBackends {
if !util.StringsContain(supportedIndexCacheBackends, backend) {
return errUnsupportedIndexCacheBackend
}

if _, ok := configuredBackends[backend]; ok {
return errors.WithMessagef(errDuplicatedIndexCacheBackend, "duplicated backend: %v", backend)
}
} else if cfg.Backend == IndexCacheBackendRedis {
if err := cfg.Redis.Validate(); err != nil {
return err

if backend == IndexCacheBackendMemcached {
if err := cfg.Memcached.Validate(); err != nil {
return err
}
} else if backend == IndexCacheBackendRedis {
if err := cfg.Redis.Validate(); err != nil {
return err
}
}

configuredBackends[backend] = struct{}{}
}

return nil
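
A quick sketch of how the reworked validation treats different backend strings; the expectations follow the loop above (the import path and sample values are assumptions for illustration, not code from this PR):

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	for _, backend := range []string{
		"inmemory",           // single level, valid
		"inmemory,memcached", // valid ordering, but Memcached.Validate still requires addresses to be set
		"inmemory,inmemory",  // rejected: duplicated index cache backend
		"inmemory,bogus",     // rejected: unsupported index cache backend
	} {
		cfg := tsdb.IndexCacheConfig{Backend: backend}
		fmt.Printf("%-20s -> %v\n", backend, cfg.Validate())
	}
}
```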
@@ -87,16 +103,42 @@ func (cfg *InMemoryIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, pr

// NewIndexCache creates a new index cache based on the input configuration.
func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
switch cfg.Backend {
case IndexCacheBackendInMemory:
return newInMemoryIndexCache(cfg.InMemory, logger, registerer)
case IndexCacheBackendMemcached:
return newMemcachedIndexCache(cfg.Memcached, logger, registerer)
case IndexCacheBackendRedis:
return newRedisIndexCache(cfg.Redis, logger, registerer)
default:
return nil, errUnsupportedIndexCacheBackend
splitBackends := strings.Split(cfg.Backend, ",")
var caches []storecache.IndexCache

for i, backend := range splitBackends {
iReg := registerer

// Create the level label if we have more than one cache
if len(splitBackends) > 1 {
iReg = prometheus.WrapRegistererWith(prometheus.Labels{"level": fmt.Sprintf("L%v", i)}, registerer)
}

switch backend {
case IndexCacheBackendInMemory:
c, err := newInMemoryIndexCache(cfg.InMemory, logger, iReg)
if err != nil {
return c, err
}
caches = append(caches, c)
case IndexCacheBackendMemcached:
c, err := newMemcachedIndexCache(cfg.Memcached, logger, iReg)
if err != nil {
return c, err
}
caches = append(caches, c)
case IndexCacheBackendRedis:
c, err := newRedisIndexCache(cfg.Redis, logger, iReg)
if err != nil {
return c, err
}
caches = append(caches, c)
default:
return nil, errUnsupportedIndexCacheBackend
}
}

return newMultiLevelCache(caches...), nil
}

func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
@@ -108,7 +150,7 @@ func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, regi
maxItemSize = maxCacheSize
}

return storecache.NewInMemoryIndexCacheWithConfig(logger, registerer, storecache.InMemoryIndexCacheConfig{
return storecache.NewInMemoryIndexCacheWithConfig(logger, nil, registerer, storecache.InMemoryIndexCacheConfig{
MaxSize: maxCacheSize,
MaxItemSize: maxItemSize,
})
@@ -129,5 +171,5 @@ func newRedisIndexCache(cfg RedisClientConfig, logger log.Logger, registerer pro
return nil, errors.Wrapf(err, "create index cache redis client")
}

return storecache.NewRemoteIndexCache(logger, client, registerer)
return storecache.NewRemoteIndexCache(logger, client, nil, registerer)
}
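
As a usage sketch, `NewIndexCache` can now be handed a comma-separated backend list and returns a single `storecache.IndexCache` wrapping one sub-cache per level, with each level's metrics labelled `level="L0"`, `level="L1"`, and so on. Import paths, the placeholder memcached address, and the `Addresses` field name are assumptions for illustration:

```go
package main

import (
	"log"

	gokitlog "github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	cfg := tsdb.IndexCacheConfig{
		Backend: "inmemory,memcached", // L0 = inmemory, L1 = memcached
	}
	// Placeholder address; assumes the existing memcached client config exposes Addresses.
	cfg.Memcached.Addresses = "dns+memcached-index.example.svc.cluster.local:11211"

	if err := cfg.Validate(); err != nil {
		log.Fatalf("invalid index cache config: %v", err)
	}

	// With more than one backend the returned cache is the multi-level wrapper;
	// with a single backend the underlying cache is returned directly.
	cache, err := tsdb.NewIndexCache(cfg, gokitlog.NewNopLogger(), prometheus.NewRegistry())
	if err != nil {
		log.Fatalf("creating index cache: %v", err)
	}
	_ = cache // handed to the store-gateway's bucket store
}
```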
109 changes: 109 additions & 0 deletions pkg/storage/tsdb/multilevel_cache.go
@@ -0,0 +1,109 @@
package tsdb

import (
"context"
"sync"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

type multiLevelCache struct {
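// Ordered from the fastest level (L0) to the slowest; reads walk the levels in order, writes fan out to all of them.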
caches []storecache.IndexCache
}

func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
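// Store the entry in every configured level concurrently and wait for all levels to finish.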
wg := sync.WaitGroup{}
wg.Add(len(m.caches))
for _, c := range m.caches {
cache := c
go func() {
defer wg.Done()
cache.StorePostings(blockID, l, v)
}()
}
wg.Wait()
}

func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
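// Look the keys up level by level: hits from each level are accumulated, only the remaining
// misses are forwarded to the next (slower) level, and the walk stops early once nothing is missing.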
misses = keys
hits = map[labels.Label][]byte{}
for _, c := range m.caches {
h, m := c.FetchMultiPostings(ctx, blockID, misses)
misses = m

for label, bytes := range h {
hits[label] = bytes
}
if len(misses) == 0 {
break
}
}

return hits, misses
}

func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) {
wg := sync.WaitGroup{}
wg.Add(len(m.caches))
for _, c := range m.caches {
cache := c
go func() {
defer wg.Done()
cache.StoreExpandedPostings(blockID, matchers, v)
}()
}
wg.Wait()
}

func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) {
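// Return the entry from the first (fastest) level that has it; lower levels are not consulted after a hit.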
for _, c := range m.caches {
if d, h := c.FetchExpandedPostings(ctx, blockID, matchers); h {
return d, h
}
}

return []byte{}, false
}

func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
wg := sync.WaitGroup{}
wg.Add(len(m.caches))
for _, c := range m.caches {
cache := c
go func() {
defer wg.Done()
cache.StoreSeries(blockID, id, v)
}()
}
wg.Wait()
}

func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
misses = ids
hits = map[storage.SeriesRef][]byte{}
for _, c := range m.caches {
h, m := c.FetchMultiSeries(ctx, blockID, misses)
misses = m

for label, bytes := range h {
hits[label] = bytes
}
if len(misses) == 0 {
break
}
}

return hits, misses
}

func newMultiLevelCache(c ...storecache.IndexCache) storecache.IndexCache {
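// A single backend needs no hierarchy, so return it directly and skip the wrapper.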
if len(c) == 1 {
return c[0]
}
return &multiLevelCache{
caches: c,
}
}