Add in-memory chunk cache
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
SungJin1212 committed Sep 30, 2024
1 parent fbe118b commit 975b5c3
Showing 8 changed files with 179 additions and 14 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -2,10 +2,11 @@

## master / unreleased

* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
* [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188
9 changes: 8 additions & 1 deletion docs/blocks-storage/querier.md
@@ -788,10 +788,17 @@ blocks_storage:
[max_backfill_items: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# Backend for chunks cache, if not empty. Supported values: memcached,
# redis or inmemory.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
[backend: <string> | default = ""]

inmemory:
# Maximum size in bytes of in-memory chunk cache used to speed up chunk
# lookups (shared between all tenants).
# CLI flag: -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes
[max_size_bytes: <int> | default = 1073741824]

memcached:
# Comma separated list of memcached addresses. Supported prefixes are:
# dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV
9 changes: 8 additions & 1 deletion docs/blocks-storage/store-gateway.md
@@ -903,10 +903,17 @@ blocks_storage:
[max_backfill_items: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# Backend for chunks cache, if not empty. Supported values: memcached,
# redis or inmemory.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
[backend: <string> | default = ""]

inmemory:
# Maximum size in bytes of in-memory chunk cache used to speed up chunk
# lookups (shared between all tenants).
# CLI flag: -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes
[max_size_bytes: <int> | default = 1073741824]

memcached:
# Comma separated list of memcached addresses. Supported prefixes are:
# dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV
9 changes: 8 additions & 1 deletion docs/configuration/config-file-reference.md
@@ -1339,10 +1339,17 @@ bucket_store:
[max_backfill_items: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# Backend for chunks cache, if not empty. Supported values: memcached, redis
# or inmemory.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
[backend: <string> | default = ""]

inmemory:
# Maximum size in bytes of in-memory chunk cache used to speed up chunk
# lookups (shared between all tenants).
# CLI flag: -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes
[max_size_bytes: <int> | default = 1073741824]

memcached:
# Comma separated list of memcached addresses. Supported prefixes are:
# dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query,
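The three documentation files above are generated from the same flag definitions, so they describe an identical chunks_cache block. For reference, here is a minimal, hedged Go sketch (not part of this commit) that builds the same configuration programmatically and validates it; the import path github.com/cortexproject/cortex/pkg/storage/tsdb and the 512 MiB size are assumptions for illustration, while the types and the inmemory constant come from the code changed below.

package main

import (
	"fmt"

	// Assumed import path for the package modified in this commit.
	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	cfg := tsdb.ChunksCacheConfig{
		ChunkCacheBackend: tsdb.ChunkCacheBackend{
			// Equivalent to `backend: inmemory` in the YAML above.
			Backend: tsdb.CacheBackendInMemory,
			// Equivalent to `max_size_bytes: 536870912` (512 MiB);
			// the documented default is 1073741824 bytes (1 GiB).
			InMemory: tsdb.InMemoryChunkCacheConfig{MaxSizeBytes: 512 * 1024 * 1024},
		},
	}

	// The in-memory backend needs no client settings, so validation passes.
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid chunks cache config:", err)
		return
	}
	fmt.Println("chunks cache backend:", cfg.Backend)
}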
13 changes: 13 additions & 0 deletions integration/querier_test.go
@@ -97,6 +97,19 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
chunkCacheBackend: tsdb.CacheBackendRedis,
bucketIndexEnabled: true,
},
"blocks default sharding, in-memory chunk cache": {
blocksShardingStrategy: "default",
indexCacheBackend: tsdb.IndexCacheBackendRedis,
chunkCacheBackend: tsdb.CacheBackendInMemory,
bucketIndexEnabled: true,
},
"blocks shuffle sharding, in-memory chunk cache": {
blocksShardingStrategy: "shuffle-sharding",
tenantShardSize: 1,
indexCacheBackend: tsdb.IndexCacheBackendRedis,
chunkCacheBackend: tsdb.CacheBackendInMemory,
bucketIndexEnabled: true,
},
}

for testName, testCfg := range tests {
71 changes: 61 additions & 10 deletions pkg/storage/tsdb/caching_bucket.go
@@ -24,16 +24,17 @@ import (
const (
CacheBackendMemcached = "memcached"
CacheBackendRedis = "redis"
CacheBackendInMemory = "inmemory"
)

type CacheBackend struct {
type MetadataCacheBackend struct {
Backend string `yaml:"backend"`
Memcached MemcachedClientConfig `yaml:"memcached"`
Redis RedisClientConfig `yaml:"redis"`
}

// Validate the config.
func (cfg *CacheBackend) Validate() error {
func (cfg *MetadataCacheBackend) Validate() error {
switch cfg.Backend {
case CacheBackendMemcached:
return cfg.Memcached.Validate()
@@ -46,8 +47,31 @@ func (cfg *CacheBackend) Validate() error {
return nil
}

type ChunkCacheBackend struct {
Backend string `yaml:"backend"`
InMemory InMemoryChunkCacheConfig `yaml:"inmemory"`
Memcached MemcachedClientConfig `yaml:"memcached"`
Redis RedisClientConfig `yaml:"redis"`
}

// Validate the config.
func (cfg *ChunkCacheBackend) Validate() error {
switch cfg.Backend {
case CacheBackendMemcached:
return cfg.Memcached.Validate()
case CacheBackendRedis:
return cfg.Redis.Validate()
case CacheBackendInMemory:
return nil
case "":
default:
return fmt.Errorf("unsupported cache backend: %s", cfg.Backend)
}
return nil
}

type ChunksCacheConfig struct {
CacheBackend `yaml:",inline"`
ChunkCacheBackend `yaml:",inline"`

SubrangeSize int64 `yaml:"subrange_size"`
MaxGetRangeRequests int `yaml:"max_get_range_requests"`
@@ -56,10 +80,11 @@ type ChunksCacheConfig struct {
}

func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s.", CacheBackendMemcached))
f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s, %s or %s.", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory))

cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")

f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.")
f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests.")
@@ -68,11 +93,11 @@ func (cfg *ChunksCacheConfig) Validate() error {
}

func (cfg *ChunksCacheConfig) Validate() error {
return cfg.CacheBackend.Validate()
return cfg.ChunkCacheBackend.Validate()
}

type MetadataCacheConfig struct {
CacheBackend `yaml:",inline"`
MetadataCacheBackend `yaml:",inline"`

TenantsListTTL time.Duration `yaml:"tenants_list_ttl"`
TenantBlocksListTTL time.Duration `yaml:"tenant_blocks_list_ttl"`
@@ -107,14 +132,14 @@ func (cfg *MetadataCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix
}

func (cfg *MetadataCacheConfig) Validate() error {
return cfg.CacheBackend.Validate()
return cfg.MetadataCacheBackend.Validate()
}

func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
cfg := cache.NewCachingBucketConfig()
cachingConfigured := false

chunksCache, err := createCache("chunks-cache", &chunksConfig.CacheBackend, logger, reg)
chunksCache, err := createChunkCache("chunks-cache", &chunksConfig.ChunkCacheBackend, logger, reg)
if err != nil {
return nil, errors.Wrapf(err, "chunks-cache")
}
@@ -124,7 +149,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
cfg.CacheGetRange("chunks", chunksCache, matchers.GetChunksMatcher(), chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests)
}

metadataCache, err := createCache("metadata-cache", &metadataConfig.CacheBackend, logger, reg)
metadataCache, err := createMetadataCache("metadata-cache", &metadataConfig.MetadataCacheBackend, logger, reg)
if err != nil {
return nil, errors.Wrapf(err, "metadata-cache")
}
@@ -152,12 +177,38 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
return storecache.NewCachingBucket(bkt, cfg, logger, reg)
}

func createCache(cacheName string, cacheBackend *CacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
func createMetadataCache(cacheName string, cacheBackend *MetadataCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
switch cacheBackend.Backend {
case "":
// No caching.
return nil, nil
case CacheBackendMemcached:
var client cacheutil.MemcachedClient
client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg)
if err != nil {
return nil, errors.Wrapf(err, "failed to create memcached client")
}
return cache.NewMemcachedCache(cacheName, logger, client, reg), nil

case CacheBackendRedis:
redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg)
if err != nil {
return nil, errors.Wrapf(err, "failed to create redis client")
}
return cache.NewRedisCache(cacheName, logger, redisCache, reg), nil

default:
return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, cacheBackend.Backend)
}
}

func createChunkCache(cacheName string, cacheBackend *ChunkCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
switch cacheBackend.Backend {
case "":
// No caching.
return nil, nil
case CacheBackendInMemory:
return newInMemoryChunkCache(cacheName, cacheBackend.InMemory, logger, reg)
case CacheBackendMemcached:
var client cacheutil.MemcachedClient
client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg)
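The split of the old CacheBackend into MetadataCacheBackend and ChunkCacheBackend exists so that inmemory can be accepted for chunks only. A short illustrative sketch of that difference (again assuming the github.com/cortexproject/cortex/pkg/storage/tsdb import path; the program itself is not part of the commit):

package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	chunks := tsdb.ChunkCacheBackend{Backend: tsdb.CacheBackendInMemory}
	metadata := tsdb.MetadataCacheBackend{Backend: tsdb.CacheBackendInMemory}

	// Accepted: the chunk cache has an inmemory branch that needs no
	// further client configuration.
	fmt.Println("chunks:", chunks.Validate())

	// Rejected: the metadata cache still supports only memcached and redis,
	// so "inmemory" falls through to the unsupported-backend error.
	fmt.Println("metadata:", metadata.Validate())
}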
64 changes: 64 additions & 0 deletions pkg/storage/tsdb/inmemory_chunk_cache.go
@@ -0,0 +1,64 @@
package tsdb

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/cache"
"github.com/thanos-io/thanos/pkg/model"

"github.com/VictoriaMetrics/fastcache"
)

type InMemoryChunkCache struct {
cache *cache.InMemoryCache
fcache *fastcache.Cache
name string
}

func newInMemoryChunkCache(name string, cfg InMemoryChunkCacheConfig, logger log.Logger, reg prometheus.Registerer) (*InMemoryChunkCache, error) {
maxCacheSize := model.Bytes(cfg.MaxSizeBytes)

// Calculate the max item size.
maxItemSize := defaultMaxItemSize
if maxItemSize > maxCacheSize {
maxItemSize = maxCacheSize
}

config := cache.InMemoryCacheConfig{
MaxSize: maxCacheSize,
MaxItemSize: maxItemSize,
}

inMemoryCache, err := cache.NewInMemoryCacheWithConfig(name, logger, reg, config)
if err != nil {
return nil, errors.Wrap(err, "create in-memory chunk cache")
}

fcache := fastcache.New(int(config.MaxSize))

inMemoryChunkCache := &InMemoryChunkCache{
cache: inMemoryCache,
fcache: fcache,
name: name,
}

return inMemoryChunkCache, nil
}

func (c *InMemoryChunkCache) Store(data map[string][]byte, ttl time.Duration) {
c.cache.Store(data, ttl)
}

// Fetch fetches multiple keys and returns a map containing cache hits.
// In case of error, it logs and returns an empty cache hits map.
func (c *InMemoryChunkCache) Fetch(ctx context.Context, keys []string) map[string][]byte {
return c.cache.Fetch(ctx, keys)
}

func (c *InMemoryChunkCache) Name() string {
return c.name
}
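A hypothetical in-package test sketch (the test below is not part of this commit) showing the round trip through the new wrapper: Store and Fetch delegate to the embedded Thanos InMemoryCache, while the fastcache instance is held alongside it.

package tsdb

import (
	"context"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/require"
)

func TestInMemoryChunkCache_StoreAndFetch(t *testing.T) {
	c, err := newInMemoryChunkCache("chunks-cache",
		InMemoryChunkCacheConfig{MaxSizeBytes: 16 * 1024 * 1024},
		log.NewNopLogger(), prometheus.NewRegistry())
	require.NoError(t, err)

	// Store delegates to the wrapped Thanos in-memory cache.
	c.Store(map[string][]byte{"chunk-1": []byte("payload")}, time.Minute)

	// Fetch returns only the keys that were found.
	hits := c.Fetch(context.Background(), []string{"chunk-1", "missing"})
	require.Equal(t, []byte("payload"), hits["chunk-1"])
	require.NotContains(t, hits, "missing")
}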
15 changes: 15 additions & 0 deletions pkg/storage/tsdb/inmemory_chunk_cache_config.go
@@ -0,0 +1,15 @@
package tsdb

import (
"flag"

"github.com/alecthomas/units"
)

type InMemoryChunkCacheConfig struct {
MaxSizeBytes uint64 `yaml:"max_size_bytes"`
}

func (cfg *InMemoryChunkCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.Uint64Var(&cfg.MaxSizeBytes, prefix+"max-size-bytes", uint64(1*units.Gibibyte), "Maximum size in bytes of in-memory chunk cache used to speed up chunk lookups (shared between all tenants).")
}
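
Finally, a brief flag-wiring sketch (illustrative, not from the commit): registering ChunksCacheConfig under the documented bucket-store prefix produces the -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes flag with its 1073741824-byte (1 GiB) default, which can then be overridden on the command line. The import path is assumed as above.

package main

import (
	"flag"
	"fmt"

	// Assumed import path for the package modified in this commit.
	"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	var cfg tsdb.ChunksCacheConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix(fs, "blocks-storage.bucket-store.chunks-cache.")

	// Pick the new backend and shrink the cache from the 1 GiB default
	// down to 256 MiB.
	if err := fs.Parse([]string{
		"-blocks-storage.bucket-store.chunks-cache.backend=inmemory",
		"-blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes=268435456",
	}); err != nil {
		panic(err)
	}

	fmt.Println(cfg.Backend, cfg.InMemory.MaxSizeBytes)
}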
