From 975b5c375263e43a4a5db0b584f18ff61bf859a8 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 30 Sep 2024 14:55:56 +0900 Subject: [PATCH] Add in-memory chunk cache Signed-off-by: SungJin1212 --- CHANGELOG.md | 3 +- docs/blocks-storage/querier.md | 9 ++- docs/blocks-storage/store-gateway.md | 9 ++- docs/configuration/config-file-reference.md | 9 ++- integration/querier_test.go | 13 ++++ pkg/storage/tsdb/caching_bucket.go | 71 ++++++++++++++++--- pkg/storage/tsdb/inmemory_chunk_cache.go | 64 +++++++++++++++++ .../tsdb/inmemory_chunk_cache_config.go | 15 ++++ 8 files changed, 179 insertions(+), 14 deletions(-) create mode 100644 pkg/storage/tsdb/inmemory_chunk_cache.go create mode 100644 pkg/storage/tsdb/inmemory_chunk_cache_config.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 4305f4ed27..096200888b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,11 @@ ## master / unreleased -* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527 * [CHANGE] Enable Compactor and Alertmanager in target all. #6204 +* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527 * [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151 * [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129 +* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245 * [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232 * [ENHANCEMENT] Query Frontend: Add info field to query response. 
#6207 * [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 00605f60e4..ad797ea23c 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -788,10 +788,17 @@ blocks_storage: [max_backfill_items: | default = 10000] chunks_cache: - # Backend for chunks cache, if not empty. Supported values: memcached. + # Backend for chunks cache, if not empty. Supported values: memcached, + # redis or inmemory. # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend [backend: | default = ""] + inmemory: + # Maximum size in bytes of in-memory chunk cache used to speed up chunk + # lookups (shared between all tenants). + # CLI flag: -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes + [max_size_bytes: | default = 1073741824] + memcached: # Comma separated list of memcached addresses. Supported prefixes are: # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 3303ec9cb9..34f776b095 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -903,10 +903,17 @@ blocks_storage: [max_backfill_items: | default = 10000] chunks_cache: - # Backend for chunks cache, if not empty. Supported values: memcached. + # Backend for chunks cache, if not empty. Supported values: memcached, + # redis or inmemory. # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend [backend: | default = ""] + inmemory: + # Maximum size in bytes of in-memory chunk cache used to speed up chunk + # lookups (shared between all tenants). + # CLI flag: -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes + [max_size_bytes: | default = 1073741824] + memcached: # Comma separated list of memcached addresses. 
Supported prefixes are: # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 3146f68c3c..866671670c 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1339,10 +1339,17 @@ bucket_store: [max_backfill_items: | default = 10000] chunks_cache: - # Backend for chunks cache, if not empty. Supported values: memcached. + # Backend for chunks cache, if not empty. Supported values: memcached, redis + # or inmemory. # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend [backend: | default = ""] + inmemory: + # Maximum size in bytes of in-memory chunk cache used to speed up chunk + # lookups (shared between all tenants). + # CLI flag: -blocks-storage.bucket-store.chunks-cache.inmemory.max-size-bytes + [max_size_bytes: | default = 1073741824] + memcached: # Comma separated list of memcached addresses. Supported prefixes are: # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, diff --git a/integration/querier_test.go b/integration/querier_test.go index a261a6d2c6..ae442bcb16 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -97,6 +97,19 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { chunkCacheBackend: tsdb.CacheBackendRedis, bucketIndexEnabled: true, }, + "blocks default sharding, in-memory chunk cache": { + blocksShardingStrategy: "default", + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: tsdb.CacheBackendInMemory, + bucketIndexEnabled: true, + }, + "blocks shuffle sharding, in-memory chunk cache": { + blocksShardingStrategy: "shuffle-sharding", + tenantShardSize: 1, + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: tsdb.CacheBackendInMemory, + bucketIndexEnabled: true, + }, } for testName, testCfg := range tests { diff --git a/pkg/storage/tsdb/caching_bucket.go 
b/pkg/storage/tsdb/caching_bucket.go index cbd9efee59..205ecd348c 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -24,16 +24,17 @@ import ( const ( CacheBackendMemcached = "memcached" CacheBackendRedis = "redis" + CacheBackendInMemory = "inmemory" ) -type CacheBackend struct { +type MetadataCacheBackend struct { Backend string `yaml:"backend"` Memcached MemcachedClientConfig `yaml:"memcached"` Redis RedisClientConfig `yaml:"redis"` } // Validate the config. -func (cfg *CacheBackend) Validate() error { +func (cfg *MetadataCacheBackend) Validate() error { switch cfg.Backend { case CacheBackendMemcached: return cfg.Memcached.Validate() @@ -46,8 +47,31 @@ func (cfg *CacheBackend) Validate() error { return nil } +type ChunkCacheBackend struct { + Backend string `yaml:"backend"` + InMemory InMemoryChunkCacheConfig `yaml:"inmemory"` + Memcached MemcachedClientConfig `yaml:"memcached"` + Redis RedisClientConfig `yaml:"redis"` +} + +// Validate the config. +func (cfg *ChunkCacheBackend) Validate() error { + switch cfg.Backend { + case CacheBackendMemcached: + return cfg.Memcached.Validate() + case CacheBackendRedis: + return cfg.Redis.Validate() + case CacheBackendInMemory: + return nil + case "": + default: + return fmt.Errorf("unsupported cache backend: %s", cfg.Backend) + } + return nil +} + type ChunksCacheConfig struct { - CacheBackend `yaml:",inline"` + ChunkCacheBackend `yaml:",inline"` SubrangeSize int64 `yaml:"subrange_size"` MaxGetRangeRequests int `yaml:"max_get_range_requests"` @@ -56,10 +80,11 @@ type ChunksCacheConfig struct { } func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { - f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s.", CacheBackendMemcached)) + f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. 
Supported values: %s, %s or %s.", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory)) cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.") cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.") + cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.") f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.") f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests.") @@ -68,11 +93,11 @@ func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix st } func (cfg *ChunksCacheConfig) Validate() error { - return cfg.CacheBackend.Validate() + return cfg.ChunkCacheBackend.Validate() } type MetadataCacheConfig struct { - CacheBackend `yaml:",inline"` + MetadataCacheBackend `yaml:",inline"` TenantsListTTL time.Duration `yaml:"tenants_list_ttl"` TenantBlocksListTTL time.Duration `yaml:"tenant_blocks_list_ttl"` @@ -107,14 +132,14 @@ func (cfg *MetadataCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix } func (cfg *MetadataCacheConfig) Validate() error { - return cfg.CacheBackend.Validate() + return cfg.MetadataCacheBackend.Validate() } func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { cfg := cache.NewCachingBucketConfig() cachingConfigured := false - chunksCache, err := createCache("chunks-cache", &chunksConfig.CacheBackend, logger, reg) + chunksCache, err := createChunkCache("chunks-cache", &chunksConfig.ChunkCacheBackend, logger, reg) if err != nil { return nil, errors.Wrapf(err, "chunks-cache") } @@ -124,7 +149,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, 
metadataConfig Metadata cfg.CacheGetRange("chunks", chunksCache, matchers.GetChunksMatcher(), chunksConfig.SubrangeSize, chunksConfig.AttributesTTL, chunksConfig.SubrangeTTL, chunksConfig.MaxGetRangeRequests) } - metadataCache, err := createCache("metadata-cache", &metadataConfig.CacheBackend, logger, reg) + metadataCache, err := createMetadataCache("metadata-cache", &metadataConfig.MetadataCacheBackend, logger, reg) if err != nil { return nil, errors.Wrapf(err, "metadata-cache") } @@ -152,12 +177,38 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata return storecache.NewCachingBucket(bkt, cfg, logger, reg) } -func createCache(cacheName string, cacheBackend *CacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) { +func createMetadataCache(cacheName string, cacheBackend *MetadataCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) { switch cacheBackend.Backend { case "": // No caching. return nil, nil + case CacheBackendMemcached: + var client cacheutil.MemcachedClient + client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg) + if err != nil { + return nil, errors.Wrapf(err, "failed to create memcached client") + } + return cache.NewMemcachedCache(cacheName, logger, client, reg), nil + + case CacheBackendRedis: + redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg) + if err != nil { + return nil, errors.Wrapf(err, "failed to create redis client") + } + return cache.NewRedisCache(cacheName, logger, redisCache, reg), nil + default: + return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, cacheBackend.Backend) + } +} + +func createChunkCache(cacheName string, cacheBackend *ChunkCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) { + switch cacheBackend.Backend { + case "": + // No caching. 
+ return nil, nil + case CacheBackendInMemory: + return newInMemoryChunkCache(cacheName, cacheBackend.InMemory, logger, reg) case CacheBackendMemcached: var client cacheutil.MemcachedClient client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg) diff --git a/pkg/storage/tsdb/inmemory_chunk_cache.go b/pkg/storage/tsdb/inmemory_chunk_cache.go new file mode 100644 index 0000000000..1c8483580e --- /dev/null +++ b/pkg/storage/tsdb/inmemory_chunk_cache.go @@ -0,0 +1,64 @@ +package tsdb + +import ( + "context" + "time" + + "github.com/go-kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/cache" + "github.com/thanos-io/thanos/pkg/model" + + "github.com/VictoriaMetrics/fastcache" +) + +type InMemoryChunkCache struct { + cache *cache.InMemoryCache + fcache *fastcache.Cache + name string +} + +func newInMemoryChunkCache(name string, cfg InMemoryChunkCacheConfig, logger log.Logger, reg prometheus.Registerer) (*InMemoryChunkCache, error) { + maxCacheSize := model.Bytes(cfg.MaxSizeBytes) + + // Calculate the max item size. + maxItemSize := defaultMaxItemSize + if maxItemSize > maxCacheSize { + maxItemSize = maxCacheSize + } + + config := cache.InMemoryCacheConfig{ + MaxSize: maxCacheSize, + MaxItemSize: maxItemSize, + } + + inMemoryCache, err := cache.NewInMemoryCacheWithConfig(name, logger, reg, config) + if err != nil { + return nil, errors.Wrap(err, "create in-memory chunk cache") + } + + fcache := fastcache.New(int(config.MaxSize)) + + inMemoryChunkCache := &InMemoryChunkCache{ + cache: inMemoryCache, + fcache: fcache, + name: name, + } + + return inMemoryChunkCache, nil +} + +func (c *InMemoryChunkCache) Store(data map[string][]byte, ttl time.Duration) { + c.cache.Store(data, ttl) +} + +// Fetch fetches multiple keys and returns a map containing cache hits +// In case of error, it logs and returns an empty cache hits map.
+func (c *InMemoryChunkCache) Fetch(ctx context.Context, keys []string) map[string][]byte { + return c.cache.Fetch(ctx, keys) +} + +func (c *InMemoryChunkCache) Name() string { + return c.name +} diff --git a/pkg/storage/tsdb/inmemory_chunk_cache_config.go b/pkg/storage/tsdb/inmemory_chunk_cache_config.go new file mode 100644 index 0000000000..e3dd45906c --- /dev/null +++ b/pkg/storage/tsdb/inmemory_chunk_cache_config.go @@ -0,0 +1,15 @@ +package tsdb + +import ( + "flag" + + "github.com/alecthomas/units" +) + +type InMemoryChunkCacheConfig struct { + MaxSizeBytes uint64 `yaml:"max_size_bytes"` +} + +func (cfg *InMemoryChunkCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.Uint64Var(&cfg.MaxSizeBytes, prefix+"max-size-bytes", uint64(1*units.Gibibyte), "Maximum size in bytes of in-memory chunk cache used to speed up chunk lookups (shared between all tenants).") +}