diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8c192bfe7b..a53ec90773 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
 * [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
 * [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
 * [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
+* [ENHANCEMENT] Index Cache: Multi level cache adds config `max_backfill_items` to cap max items to backfill per async operation. #5686
 
 ## 1.16.0 2023-11-20
 
diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md
index 524a2699fa..4c303613ca 100644
--- a/docs/blocks-storage/querier.md
+++ b/docs/blocks-storage/querier.md
@@ -719,6 +719,10 @@ blocks_storage:
         # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
         [max_async_buffer_size: <int> | default = 10000]
 
+        # The maximum number of items to backfill per asynchronous operation.
+        # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-backfill-items
+        [max_backfill_items: <int> | default = 10000]
+
     chunks_cache:
       # Backend for chunks cache, if not empty. Supported values: memcached.
       # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md
index 5a83eecbb3..2d1d89bf3e 100644
--- a/docs/blocks-storage/store-gateway.md
+++ b/docs/blocks-storage/store-gateway.md
@@ -834,6 +834,10 @@ blocks_storage:
         # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
         [max_async_buffer_size: <int> | default = 10000]
 
+        # The maximum number of items to backfill per asynchronous operation.
+        # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-backfill-items
+        [max_backfill_items: <int> | default = 10000]
+
     chunks_cache:
      # Backend for chunks cache, if not empty. Supported values: memcached.
      # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index b5ab659830..c409a276ec 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -1268,6 +1268,10 @@ bucket_store:
       # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
       [max_async_buffer_size: <int> | default = 10000]
 
+      # The maximum number of items to backfill per asynchronous operation.
+      # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-backfill-items
+      [max_backfill_items: <int> | default = 10000]
+
   chunks_cache:
     # Backend for chunks cache, if not empty. Supported values: memcached.
    # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
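Annotation (not part of the patch): the three `multilevel` settings documented above work together. `max_async_concurrency` and `max_async_buffer_size` bound backfill *operations*, while the new `max_backfill_items` bounds the *items* one operation may write. A hedged sketch against the exported config type this PR extends; the import path is assumed from the repository layout, and the values are the flag defaults:

```go
package main

import (
	"log"

	tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" // assumed import path
)

func main() {
	cfg := tsdb.MultiLevelIndexCacheConfig{
		MaxAsyncConcurrency: 50,    // goroutines draining the backfill queue
		MaxAsyncBufferSize:  10000, // queued backfill operations before drops
		MaxBackfillItems:    10000, // new: cap on items written per operation
	}
	if err := cfg.Validate(); err != nil { // any value <= 0 is rejected
		log.Fatal(err)
	}
}
```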
diff --git a/go.mod b/go.mod
index fe9889d96d..5bfbe71179 100644
--- a/go.mod
+++ b/go.mod
@@ -80,6 +80,7 @@ require (
 	github.com/VictoriaMetrics/fastcache v1.12.1
 	github.com/cespare/xxhash/v2 v2.2.0
 	github.com/google/go-cmp v0.6.0
+	golang.org/x/exp v0.0.0-20231006140011-7918f672742d
 	google.golang.org/protobuf v1.31.0
 )
 
@@ -217,7 +218,6 @@ require (
 	go4.org/intern v0.0.0-20230525184215-6c62f75575cb // indirect
 	go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect
 	golang.org/x/crypto v0.15.0 // indirect
-	golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
 	golang.org/x/mod v0.13.0 // indirect
 	golang.org/x/oauth2 v0.14.0 // indirect
 	golang.org/x/sys v0.14.0 // indirect
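Annotation: `golang.org/x/exp` moves from the indirect block to the direct `require` block because `multilevel_cache.go` below now imports `golang.org/x/exp/slices`. A minimal, runnable sketch of the one helper the patch relies on; the item-type strings here are illustrative stand-ins:

```go
package main

import (
	"fmt"

	"golang.org/x/exp/slices"
)

func main() {
	// filterCachesByItem (added below) treats an empty enabled-items list
	// as "everything enabled"; otherwise it checks membership like this.
	enabled := []string{"Postings", "Series"} // hypothetical enabled-items list
	fmt.Println(slices.Contains(enabled, "Postings"))         // true
	fmt.Println(slices.Contains(enabled, "ExpandedPostings")) // false
}
```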
diff --git a/pkg/storage/tsdb/index_cache.go b/pkg/storage/tsdb/index_cache.go
index b52532bce6..74ba5a257d 100644
--- a/pkg/storage/tsdb/index_cache.go
+++ b/pkg/storage/tsdb/index_cache.go
@@ -44,6 +44,7 @@ var (
 	errNoIndexCacheAddresses      = errors.New("no index cache backend addresses")
 	errInvalidMaxAsyncConcurrency = errors.New("invalid max_async_concurrency, must greater than 0")
 	errInvalidMaxAsyncBufferSize  = errors.New("invalid max_async_buffer_size, must greater than 0")
+	errInvalidMaxBackfillItems    = errors.New("invalid max_backfill_items, must greater than 0")
 )
 
 type IndexCacheConfig struct {
@@ -114,6 +115,7 @@ func (cfg *IndexCacheConfig) Validate() error {
 type MultiLevelIndexCacheConfig struct {
 	MaxAsyncConcurrency int `yaml:"max_async_concurrency"`
 	MaxAsyncBufferSize  int `yaml:"max_async_buffer_size"`
+	MaxBackfillItems    int `yaml:"max_backfill_items"`
 }
 
 func (cfg *MultiLevelIndexCacheConfig) Validate() error {
@@ -123,12 +125,16 @@ func (cfg *MultiLevelIndexCacheConfig) Validate() error {
 	if cfg.MaxAsyncConcurrency <= 0 {
 		return errInvalidMaxAsyncConcurrency
 	}
+	if cfg.MaxBackfillItems <= 0 {
+		return errInvalidMaxBackfillItems
+	}
 	return nil
 }
 
 func (cfg *MultiLevelIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
 	f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 50, "The maximum number of concurrent asynchronous operations can occur when backfilling cache items.")
 	f.IntVar(&cfg.MaxAsyncBufferSize, prefix+"max-async-buffer-size", 10000, "The maximum number of enqueued asynchronous operations allowed when backfilling cache items.")
+	f.IntVar(&cfg.MaxBackfillItems, prefix+"max-backfill-items", 10000, "The maximum number of items to backfill per asynchronous operation.")
 }
 
 type InMemoryIndexCacheConfig struct {
@@ -187,7 +193,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
 	splitBackends := strings.Split(cfg.Backend, ",")
 	var (
 		caches       []storecache.IndexCache
-		enabledItems []string
+		enabledItems [][]string
 	)
 
 	for i, backend := range splitBackends {
@@ -205,7 +211,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
 				return c, err
 			}
 			caches = append(caches, c)
-			enabledItems = cfg.InMemory.EnabledItems
+			enabledItems = append(enabledItems, cfg.InMemory.EnabledItems)
 		case IndexCacheBackendMemcached:
 			c, err := newMemcachedIndexCacheClient(cfg.Memcached.ClientConfig, logger, registerer)
 			if err != nil {
@@ -217,7 +223,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
 				return nil, err
 			}
 			caches = append(caches, cache)
-			enabledItems = cfg.Memcached.EnabledItems
+			enabledItems = append(enabledItems, cfg.Memcached.EnabledItems)
 		case IndexCacheBackendRedis:
 			c, err := newRedisIndexCacheClient(cfg.Redis.ClientConfig, logger, iReg)
 			if err != nil {
@@ -229,18 +235,13 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
 				return nil, err
 			}
 			caches = append(caches, cache)
-			enabledItems = cfg.Redis.EnabledItems
+			enabledItems = append(enabledItems, cfg.Redis.EnabledItems)
 		default:
 			return nil, errUnsupportedIndexCacheBackend
 		}
-		if len(enabledItems) > 0 {
-			latestCache := caches[len(caches)-1]
-			cache := storecache.NewFilteredIndexCache(latestCache, enabledItems)
-			caches[len(caches)-1] = cache
-		}
 	}
 
-	return newMultiLevelCache(registerer, cfg.MultiLevel, caches...), nil
+	return newMultiLevelCache(registerer, cfg.MultiLevel, enabledItems, caches...), nil
 }
 
 func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
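Annotation: the key shape change in `NewIndexCache` is that `enabledItems` becomes a `[][]string` aligned index-for-index with `caches`, and the per-backend `NewFilteredIndexCache` wrapping moves into `newMultiLevelCache`. A runnable stand-in sketch of that invariant; strings replace the real `storecache.IndexCache` values and the item names are illustrative:

```go
package main

import (
	"fmt"

	"golang.org/x/exp/slices"
)

// filterCachesByItem mirrors the helper added in multilevel_cache.go,
// with string stand-ins for the cache levels.
func filterCachesByItem(enabledItems [][]string, cachedItem string, c ...string) []string {
	filtered := make([]string, 0, len(c))
	for i := range enabledItems {
		// An empty list means "all item types enabled" for level i.
		if len(enabledItems[i]) == 0 || slices.Contains(enabledItems[i], cachedItem) {
			filtered = append(filtered, c[i])
		}
	}
	return filtered
}

func main() {
	// enabledItems[i] describes caches[i]: level 1 caches only series,
	// level 2 caches everything (values illustrative).
	enabledItems := [][]string{{"Series"}, {}}
	fmt.Println(filterCachesByItem(enabledItems, "Postings", "m1", "m2")) // [m2]
	fmt.Println(filterCachesByItem(enabledItems, "Series", "m1", "m2"))   // [m1 m2]
}
```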
diff --git a/pkg/storage/tsdb/multilevel_cache.go b/pkg/storage/tsdb/multilevel_cache.go
index 2511b4259d..7e7a35eadc 100644
--- a/pkg/storage/tsdb/multilevel_cache.go
+++ b/pkg/storage/tsdb/multilevel_cache.go
@@ -12,6 +12,7 @@ import (
 	"github.com/prometheus/prometheus/storage"
 	"github.com/thanos-io/thanos/pkg/cacheutil"
 	storecache "github.com/thanos-io/thanos/pkg/store/cache"
+	"golang.org/x/exp/slices"
 )
 
 const (
@@ -21,18 +22,22 @@ const (
 )
 
 type multiLevelCache struct {
-	caches []storecache.IndexCache
+	postingsCaches, seriesCaches, expandedPostingCaches []storecache.IndexCache
 
-	fetchLatency         *prometheus.HistogramVec
-	backFillLatency      *prometheus.HistogramVec
-	backfillProcessor    *cacheutil.AsyncOperationProcessor
-	backfillDroppedItems *prometheus.CounterVec
+	fetchLatency                    *prometheus.HistogramVec
+	backFillLatency                 *prometheus.HistogramVec
+	backfillProcessor               *cacheutil.AsyncOperationProcessor
+	backfillDroppedPostings         prometheus.Counter
+	backfillDroppedSeries           prometheus.Counter
+	backfillDroppedExpandedPostings prometheus.Counter
+
+	maxBackfillItems int
 }
 
 func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
 	wg := sync.WaitGroup{}
-	wg.Add(len(m.caches))
-	for _, c := range m.caches {
+	wg.Add(len(m.postingsCaches))
+	for _, c := range m.postingsCaches {
 		cache := c
 		go func() {
 			defer wg.Done()
@@ -48,9 +53,9 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U
 	misses = keys
 	hits = map[labels.Label][]byte{}
-	backfillItems := make([]map[labels.Label][]byte, len(m.caches)-1)
-	for i, c := range m.caches {
-		if i < len(m.caches)-1 {
+	backfillItems := make([]map[labels.Label][]byte, len(m.postingsCaches)-1)
+	for i, c := range m.postingsCaches {
+		if i < len(m.postingsCaches)-1 {
 			backfillItems[i] = map[labels.Label][]byte{}
 		}
 		if ctx.Err() != nil {
@@ -76,14 +81,23 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U
 		backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypePostings))
 		defer backFillTimer.ObserveDuration()
 		for i, values := range backfillItems {
-			for lbl, b := range values {
-				lbl := lbl
-				b := b
-				if err := m.backfillProcessor.EnqueueAsync(func() {
-					m.caches[i].StorePostings(blockID, lbl, b, tenant)
-				}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
-					m.backfillDroppedItems.WithLabelValues(cacheTypePostings).Inc()
+			i := i
+			values := values
+			if len(values) == 0 {
+				continue
+			}
+			if err := m.backfillProcessor.EnqueueAsync(func() {
+				cnt := 0
+				for lbl, b := range values {
+					m.postingsCaches[i].StorePostings(blockID, lbl, b, tenant)
+					cnt++
+					if cnt == m.maxBackfillItems {
+						m.backfillDroppedPostings.Add(float64(len(values) - cnt))
+						return
+					}
 				}
+			}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
+				m.backfillDroppedPostings.Add(float64(len(values)))
 			}
 		}
 	}()
@@ -93,8 +107,8 @@ func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
 	wg := sync.WaitGroup{}
-	wg.Add(len(m.caches))
-	for _, c := range m.caches {
+	wg.Add(len(m.expandedPostingCaches))
+	for _, c := range m.expandedPostingCaches {
 		cache := c
 		go func() {
 			defer wg.Done()
@@ -108,7 +122,7 @@ func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID uli
 	timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeExpandedPostings))
 	defer timer.ObserveDuration()
 
-	for i, c := range m.caches {
+	for i, c := range m.expandedPostingCaches {
 		if ctx.Err() != nil {
 			return nil, false
 		}
@@ -116,9 +130,9 @@ func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID uli
 			if i > 0 {
 				backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeExpandedPostings))
 				if err := m.backfillProcessor.EnqueueAsync(func() {
-					m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
+					m.expandedPostingCaches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
 				}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
-					m.backfillDroppedItems.WithLabelValues(cacheTypeExpandedPostings).Inc()
+					m.backfillDroppedExpandedPostings.Inc()
 				}
 				backFillTimer.ObserveDuration()
 			}
@@ -131,8 +145,8 @@ func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
 	wg := sync.WaitGroup{}
-	wg.Add(len(m.caches))
-	for _, c := range m.caches {
+	wg.Add(len(m.seriesCaches))
+	for _, c := range m.seriesCaches {
 		cache := c
 		go func() {
 			defer wg.Done()
@@ -148,10 +162,10 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
 	misses = ids
 	hits = map[storage.SeriesRef][]byte{}
-	backfillItems := make([]map[storage.SeriesRef][]byte, len(m.caches)-1)
-
-	for i, c := range m.caches {
-		if i < len(m.caches)-1 {
+	backfillItems := make([]map[storage.SeriesRef][]byte, len(m.seriesCaches)-1)
+
+	for i, c := range m.seriesCaches {
+		if i < len(m.seriesCaches)-1 {
 			backfillItems[i] = map[storage.SeriesRef][]byte{}
 		}
 		if ctx.Err() != nil {
@@ -177,14 +191,23 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
 		backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeSeries))
 		defer backFillTimer.ObserveDuration()
 		for i, values := range backfillItems {
-			for ref, b := range values {
-				ref := ref
-				b := b
-				if err := m.backfillProcessor.EnqueueAsync(func() {
-					m.caches[i].StoreSeries(blockID, ref, b, tenant)
-				}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
-					m.backfillDroppedItems.WithLabelValues(cacheTypeSeries).Inc()
+			i := i
+			values := values
+			if len(values) == 0 {
+				continue
+			}
+			if err := m.backfillProcessor.EnqueueAsync(func() {
+				cnt := 0
+				for ref, b := range values {
+					m.seriesCaches[i].StoreSeries(blockID, ref, b, tenant)
+					cnt++
+					if cnt == m.maxBackfillItems {
+						m.backfillDroppedSeries.Add(float64(len(values) - cnt))
+						return
+					}
 				}
+			}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
+				m.backfillDroppedSeries.Add(float64(len(values)))
 			}
 		}
 	}()
@@ -192,14 +215,33 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
 	return hits, misses
 }
 
-func newMultiLevelCache(reg prometheus.Registerer, cfg MultiLevelIndexCacheConfig, c ...storecache.IndexCache) storecache.IndexCache {
+func filterCachesByItem(enabledItems [][]string, cachedItem string, c ...storecache.IndexCache) []storecache.IndexCache {
+	filteredCaches := make([]storecache.IndexCache, 0, len(c))
+	for i := range enabledItems {
+		if len(enabledItems[i]) == 0 || slices.Contains(enabledItems[i], cachedItem) {
+			filteredCaches = append(filteredCaches, c[i])
+		}
+	}
+	return filteredCaches
+}
+
+func newMultiLevelCache(reg prometheus.Registerer, cfg MultiLevelIndexCacheConfig, enabledItems [][]string, c ...storecache.IndexCache) storecache.IndexCache {
 	if len(c) == 1 {
-		return c[0]
+		if len(enabledItems[0]) == 0 {
+			return c[0]
+		}
+		return storecache.NewFilteredIndexCache(c[0], enabledItems[0])
 	}
+	backfillDroppedItems := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+		Name: "cortex_store_multilevel_index_cache_backfill_dropped_items_total",
+		Help: "Total number of items dropped due to async buffer full when backfilling multilevel cache ",
+	}, []string{"item_type"})
 	return &multiLevelCache{
-		caches:            c,
-		backfillProcessor: cacheutil.NewAsyncOperationProcessor(cfg.MaxAsyncBufferSize, cfg.MaxAsyncConcurrency),
+		postingsCaches:        filterCachesByItem(enabledItems, cacheTypePostings, c...),
+		seriesCaches:          filterCachesByItem(enabledItems, cacheTypeSeries, c...),
+		expandedPostingCaches: filterCachesByItem(enabledItems, cacheTypeExpandedPostings, c...),
+		backfillProcessor:     cacheutil.NewAsyncOperationProcessor(cfg.MaxAsyncBufferSize, cfg.MaxAsyncConcurrency),
 		fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
 			Name: "cortex_store_multilevel_index_cache_fetch_duration_seconds",
 			Help: "Histogram to track latency to fetch items from multi level index cache",
@@ -210,9 +252,9 @@ func newMultiLevelCache(reg prometheus.Registerer, cfg MultiLevelIndexCacheConfi
 			Help:    "Histogram to track latency to backfill items from multi level index cache",
 			Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
 		}, []string{"item_type"}),
-		backfillDroppedItems: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-			Name: "cortex_store_multilevel_index_cache_backfill_dropped_items_total",
-			Help: "Total number of items dropped due to async buffer full when backfilling multilevel cache ",
-		}, []string{"item_type"}),
+		backfillDroppedPostings:         backfillDroppedItems.WithLabelValues(cacheTypePostings),
+		backfillDroppedSeries:           backfillDroppedItems.WithLabelValues(cacheTypeSeries),
+		backfillDroppedExpandedPostings: backfillDroppedItems.WithLabelValues(cacheTypeExpandedPostings),
+		maxBackfillItems:                cfg.MaxBackfillItems,
 	}
 }
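Annotation: the hunks above are the heart of the PR. Instead of enqueueing one async operation per cache item, each lower level now gets a single operation that writes at most `maxBackfillItems` entries; the capped remainder, or the whole batch if the buffer is full, is counted as dropped. A runnable skeleton of the pattern, with local stand-ins for the processor and the store calls:

```go
package main

import (
	"errors"
	"fmt"
)

// errAsyncBufferFull stands in for cacheutil.ErrAsyncBufferFull.
var errAsyncBufferFull = errors.New("the async buffer is full")

func main() {
	const maxBackfillItems = 3 // illustrative cap
	values := map[string][]byte{"a": nil, "b": nil, "c": nil, "d": nil, "e": nil}
	var dropped float64
	stored := 0

	// enqueue stands in for m.backfillProcessor.EnqueueAsync: here it runs
	// the operation inline and pretends the buffer has room.
	enqueue := func(op func()) error { op(); return nil }

	if err := enqueue(func() {
		cnt := 0
		for range values {
			stored++ // stands in for StorePostings / StoreSeries
			cnt++
			if cnt == maxBackfillItems {
				dropped += float64(len(values) - cnt) // capped remainder
				return
			}
		}
	}); errors.Is(err, errAsyncBufferFull) {
		dropped += float64(len(values)) // whole batch dropped in one shot
	}

	fmt.Println(stored, dropped) // 3 2: five items, cap of three, two dropped
}
```

With the default cap of 10000, a backfill batch of 15000 postings writes 10000 and counts the remaining 5000 as dropped, rather than enqueueing 15000 separate operations as before.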
diff --git a/pkg/storage/tsdb/multilevel_cache_test.go b/pkg/storage/tsdb/multilevel_cache_test.go
index 263735cbde..d66390a1c6 100644
--- a/pkg/storage/tsdb/multilevel_cache_test.go
+++ b/pkg/storage/tsdb/multilevel_cache_test.go
@@ -22,6 +22,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) {
 	multiLevelCfg := MultiLevelIndexCacheConfig{
 		MaxAsyncBufferSize:  1,
 		MaxAsyncConcurrency: 1,
+		MaxBackfillItems:    1,
 	}
 	s, err := miniredis.Run()
 	if err != nil {
@@ -73,6 +74,42 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) {
 			expectedType:            &storecache.InMemoryIndexCache{},
 			expectedValidationError: errDuplicatedIndexCacheBackend,
 		},
+		"validate multilevel cache max MaxAsyncBufferSize": {
+			cfg: IndexCacheConfig{
+				Backend: "inmemory,memcached",
+				MultiLevel: MultiLevelIndexCacheConfig{
+					MaxAsyncBufferSize:  0,
+					MaxAsyncConcurrency: 1,
+					MaxBackfillItems:    1,
+				},
+			},
+			expectedType:            &storecache.InMemoryIndexCache{},
+			expectedValidationError: errInvalidMaxAsyncBufferSize,
+		},
+		"validate multilevel cache max MaxAsyncConcurrency": {
+			cfg: IndexCacheConfig{
+				Backend: "inmemory,memcached",
+				MultiLevel: MultiLevelIndexCacheConfig{
+					MaxAsyncBufferSize:  1,
+					MaxAsyncConcurrency: 0,
+					MaxBackfillItems:    1,
+				},
+			},
+			expectedType:            &storecache.InMemoryIndexCache{},
+			expectedValidationError: errInvalidMaxAsyncConcurrency,
+		},
+		"validate multilevel cache max MaxBackfillItems": {
+			cfg: IndexCacheConfig{
+				Backend: "inmemory,memcached",
+				MultiLevel: MultiLevelIndexCacheConfig{
+					MaxAsyncBufferSize:  1,
+					MaxAsyncConcurrency: 1,
+					MaxBackfillItems:    0,
+				},
+			},
+			expectedType:            &storecache.InMemoryIndexCache{},
+			expectedValidationError: errInvalidMaxBackfillItems,
+		},
 	}
 	for name, tc := range testCases {
 		t.Run(name, func(t *testing.T) {
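Annotation: these table cases only become assertable in the next hunk, which compares against `tc.expectedValidationError` instead of the previously hard-coded `errDuplicatedIndexCacheBackend`. In isolation, the failing path they exercise looks like this (import path assumed; the printed string is the error message added in `index_cache.go`):

```go
package main

import (
	"fmt"

	tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" // assumed import path
)

func main() {
	// A zero MaxBackfillItems is what the new table case exercises.
	cfg := tsdb.MultiLevelIndexCacheConfig{
		MaxAsyncBufferSize:  1,
		MaxAsyncConcurrency: 1,
		MaxBackfillItems:    0,
	}
	fmt.Println(cfg.Validate()) // invalid max_backfill_items, must greater than 0
}
```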
@@ -86,7 +123,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) {
 				_, err = cacheutil.NewMemcachedClientWithConfig(log.NewNopLogger(), "test", cacheutil.MemcachedClientConfig{MaxAsyncConcurrency: 2, Addresses: []string{s.Addr()}, DNSProviderUpdateInterval: 1}, reg)
 				require.NoError(t, err)
 			} else {
-				require.ErrorIs(t, tc.cfg.Validate(), errDuplicatedIndexCacheBackend)
+				require.ErrorIs(t, tc.cfg.Validate(), tc.expectedValidationError)
 			}
 		})
 	}
@@ -96,6 +133,7 @@ func Test_MultiLevelCache(t *testing.T) {
 	cfg := MultiLevelIndexCacheConfig{
 		MaxAsyncConcurrency: 10,
 		MaxAsyncBufferSize:  100000,
+		MaxBackfillItems:    10000,
 	}
 	bID, _ := ulid.Parse("01D78XZ44G0000000000000000")
 	ctx := context.Background()
@@ -125,6 +163,7 @@ func Test_MultiLevelCache(t *testing.T) {
 		m2ExpectedCalls map[string][][]interface{}
 		m1MockedCalls   map[string][]interface{}
 		m2MockedCalls   map[string][]interface{}
+		enabledItems    [][]string
 		call            func(storecache.IndexCache)
 	}{
 		"[StorePostings] Should store on all caches": {
@@ -138,6 +177,19 @@ func Test_MultiLevelCache(t *testing.T) {
 				cache.StorePostings(bID, l1, v, "")
 			},
 		},
+		"[StorePostings] Should store on m2 only": {
+			m1ExpectedCalls: map[string][][]interface{}{},
+			m2ExpectedCalls: map[string][][]interface{}{
+				"StorePostings": {{bID, l1, v}},
+			},
+			enabledItems: [][]string{
+				{cacheTypeSeries},
+				{},
+			},
+			call: func(cache storecache.IndexCache) {
+				cache.StorePostings(bID, l1, v, "")
+			},
+		},
 		"[StoreSeries] Should store on all caches": {
 			m1ExpectedCalls: map[string][][]interface{}{
 				"StoreSeries": {{bID, storage.SeriesRef(1), v}},
@@ -149,6 +201,19 @@ func Test_MultiLevelCache(t *testing.T) {
 				cache.StoreSeries(bID, 1, v, "")
 			},
 		},
+		"[StoreSeries] Should store on m2 only": {
+			m1ExpectedCalls: map[string][][]interface{}{},
+			m2ExpectedCalls: map[string][][]interface{}{
+				"StoreSeries": {{bID, storage.SeriesRef(1), v}},
+			},
+			enabledItems: [][]string{
+				{cacheTypePostings},
+				{},
+			},
+			call: func(cache storecache.IndexCache) {
+				cache.StoreSeries(bID, 1, v, "")
+			},
+		},
 		"[StoreExpandedPostings] Should store on all caches": {
 			m1ExpectedCalls: map[string][][]interface{}{
 				"StoreExpandedPostings": {{bID, []*labels.Matcher{matcher}, v}},
@@ -160,6 +225,19 @@ func Test_MultiLevelCache(t *testing.T) {
 				cache.StoreExpandedPostings(bID, []*labels.Matcher{matcher}, v, "")
 			},
 		},
+		"[StoreExpandedPostings] Should store on m2 only": {
+			m1ExpectedCalls: map[string][][]interface{}{},
+			m2ExpectedCalls: map[string][][]interface{}{
+				"StoreExpandedPostings": {{bID, []*labels.Matcher{matcher}, v}},
+			},
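Annotation (see also the remaining hunks below): the new "Should store on m2 only" cases pin down the filtering semantics. A level whose enabled list omits an item type drops out of that type's cache chain entirely, so its Store method is never called. A runnable stand-in for the fan-out those cases observe, with a plain struct in place of the mock caches:

```go
package main

import (
	"fmt"
	"sync"
)

// level is a stand-in for a storecache.IndexCache backend.
type level struct{ stored int }

func (l *level) StorePostings() { l.stored++ }

func main() {
	m2 := &level{}
	// m1 enables only series, so filtering leaves just m2 for postings.
	postingsCaches := []*level{m2}

	var wg sync.WaitGroup
	wg.Add(len(postingsCaches))
	for _, c := range postingsCaches {
		cache := c // pin the loop variable, as the pre-Go-1.22 code above does
		go func() {
			defer wg.Done()
			cache.StorePostings()
		}()
	}
	wg.Wait()
	fmt.Println(m2.stored) // 1; an unfiltered m1 would have been written too
}
```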
+			enabledItems: [][]string{
+				{cacheTypePostings},
+				{},
+			},
+			call: func(cache storecache.IndexCache) {
+				cache.StoreExpandedPostings(bID, []*labels.Matcher{matcher}, v, "")
+			},
+		},
 		"[FetchMultiPostings] Should fallback when all misses": {
 			m1ExpectedCalls: map[string][][]interface{}{
 				"FetchMultiPostings": {{bID, []labels.Label{l1, l2}}},
@@ -207,6 +285,23 @@ func Test_MultiLevelCache(t *testing.T) {
 				cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2, l3}, "")
 			},
 		},
+		"[FetchMultiPostings] m1 doesn't enable postings": {
+			m1ExpectedCalls: map[string][][]interface{}{},
+			m2ExpectedCalls: map[string][][]interface{}{
+				"FetchMultiPostings": {{bID, []labels.Label{l1, l2, l3}}},
+			},
+			m1MockedCalls: map[string][]interface{}{},
+			m2MockedCalls: map[string][]interface{}{
+				"FetchMultiPostings": {map[labels.Label][]byte{l1: v, l2: v, l3: v2}, []labels.Label{}},
+			},
+			enabledItems: [][]string{
+				{cacheTypeSeries},
+				{},
+			},
+			call: func(cache storecache.IndexCache) {
+				cache.FetchMultiPostings(ctx, bID, []labels.Label{l1, l2, l3}, "")
+			},
+		},
 		"[FetchMultiPostings] should not fallback when all hit on l1": {
 			m1ExpectedCalls: map[string][][]interface{}{
 				"FetchMultiPostings": {{bID, []labels.Label{l1, l2}}},
@@ -269,6 +364,23 @@ func Test_MultiLevelCache(t *testing.T) {
 				cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2, 3}, "")
 			},
 		},
+		"[FetchMultiSeries] m1 doesn't enable series": {
+			m1ExpectedCalls: map[string][][]interface{}{},
+			m2ExpectedCalls: map[string][][]interface{}{
+				"FetchMultiSeries": {{bID, []storage.SeriesRef{1, 2, 3}}},
+			},
+			m1MockedCalls: map[string][]interface{}{},
+			m2MockedCalls: map[string][]interface{}{
+				"FetchMultiSeries": {map[storage.SeriesRef][]byte{1: v, 2: v, 3: v2}, []storage.SeriesRef{}},
+			},
+			enabledItems: [][]string{
+				{cacheTypePostings},
+				{},
+			},
+			call: func(cache storecache.IndexCache) {
+				cache.FetchMultiSeries(ctx, bID, []storage.SeriesRef{1, 2, 3}, "")
+			},
+		},
 		"[FetchMultiSeries] should not fallback when all hit on l1": {
 			m1ExpectedCalls: map[string][][]interface{}{
 				"FetchMultiSeries": {{bID, []storage.SeriesRef{1, 2}}},
@@ -308,14 +420,34 @@ func Test_MultiLevelCache(t *testing.T) {
 				cache.FetchExpandedPostings(ctx, bID, []*labels.Matcher{matcher}, "")
 			},
 		},
+		"[FetchExpandedPostings] m1 doesn't enable expanded postings": {
+			m1ExpectedCalls: map[string][][]interface{}{},
+			m2ExpectedCalls: map[string][][]interface{}{
+				"FetchExpandedPostings": {{bID, []*labels.Matcher{matcher}}},
+			},
+			m1MockedCalls: map[string][]interface{}{},
+			m2MockedCalls: map[string][]interface{}{
+				"FetchExpandedPostings": {[]byte{}, true},
+			},
+			enabledItems: [][]string{
+				{cacheTypePostings},
+				{},
+			},
+			call: func(cache storecache.IndexCache) {
+				cache.FetchExpandedPostings(ctx, bID, []*labels.Matcher{matcher}, "")
+			},
+		},
 	}
 
 	for name, tc := range testCases {
 		t.Run(name, func(t *testing.T) {
+			if tc.enabledItems == nil {
+				tc.enabledItems = [][]string{{}, {}}
+			}
 			m1 := newMockIndexCache(tc.m1MockedCalls)
 			m2 := newMockIndexCache(tc.m2MockedCalls)
 			reg := prometheus.NewRegistry()
-			c := newMultiLevelCache(reg, cfg, m1, m2)
+			c := newMultiLevelCache(reg, cfg, tc.enabledItems, m1, m2)
 			tc.call(c)
 			mlc := c.(*multiLevelCache)
 			// Wait until async operation finishes.
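Annotation: one last detail worth calling out from `newMultiLevelCache` above: the dropped-items metric keeps its name and `item_type` label, but the label values are now bound once at construction, so the hot backfill path increments a plain `prometheus.Counter` instead of doing a `WithLabelValues` lookup per drop. A runnable illustration; the label value assumes the `cacheTypePostings` constant is the string "Postings":

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()
	// Same metric name and label as the patch (Help text shortened here).
	droppedVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_store_multilevel_index_cache_backfill_dropped_items_total",
		Help: "Total number of items dropped when backfilling multilevel cache",
	}, []string{"item_type"})
	droppedPostings := droppedVec.WithLabelValues("Postings") // bound once

	droppedPostings.Add(42) // no label lookup on the hot path
	fmt.Println(testutil.ToFloat64(droppedPostings)) // 42
}
```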