
Commit: Improve async processor handling enabled items, optimize code further (#5686)

* improve async processor handling enabled items, optimize code further

Signed-off-by: Ben Ye <benye@amazon.com>

* update docs

Signed-off-by: Ben Ye <benye@amazon.com>

* fix mod

Signed-off-by: Ben Ye <benye@amazon.com>

* avoid enqueuing empty values

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
yeya24 authored Nov 30, 2023
1 parent e85a331 commit aa5aca5
Showing 8 changed files with 243 additions and 55 deletions.
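The heart of the change is the backfill path of the multi-level index cache: instead of enqueuing one async operation per cache item, each backfill batch is now enqueued as a single operation, empty batches are skipped, and the new `max_backfill_items` knob caps how many items one operation may write before the remainder is counted as dropped. Below is a minimal, runnable sketch of that batching pattern under simplified assumptions — `asyncProcessor`, `enqueueAsync`, and `backfillBatch` are stand-ins invented here, not the actual `cacheutil.AsyncOperationProcessor` API:

package main

import (
    "errors"
    "fmt"
    "time"
)

var errAsyncBufferFull = errors.New("the async buffer is full")

// asyncProcessor is a stand-in for cacheutil.AsyncOperationProcessor:
// a bounded queue of closures drained by a fixed pool of workers.
type asyncProcessor struct{ ops chan func() }

func newAsyncProcessor(bufferSize, concurrency int) *asyncProcessor {
    p := &asyncProcessor{ops: make(chan func(), bufferSize)}
    for i := 0; i < concurrency; i++ {
        go func() {
            for op := range p.ops {
                op()
            }
        }()
    }
    return p
}

// enqueueAsync never blocks; it fails fast when the buffer is full.
func (p *asyncProcessor) enqueueAsync(op func()) error {
    select {
    case p.ops <- op:
        return nil
    default:
        return errAsyncBufferFull
    }
}

// backfillBatch enqueues ONE operation per batch (the old code enqueued one
// per item) and stops after maxItems, counting the remainder as dropped.
func backfillBatch(p *asyncProcessor, batch map[string][]byte, maxItems int, store func(string, []byte)) {
    if len(batch) == 0 {
        return // avoid enqueuing empty values
    }
    if err := p.enqueueAsync(func() {
        cnt := 0
        for k, v := range batch {
            store(k, v)
            cnt++
            if cnt == maxItems {
                fmt.Println("dropped:", len(batch)-cnt) // a counter metric in the real code
                return
            }
        }
    }); errors.Is(err, errAsyncBufferFull) {
        fmt.Println("dropped:", len(batch)) // whole batch dropped when the queue is full
    }
}

func main() {
    p := newAsyncProcessor(10000, 50)
    batch := map[string][]byte{"a": nil, "b": nil, "c": nil}
    backfillBatch(p, batch, 2, func(k string, _ []byte) { fmt.Println("stored:", k) })
    time.Sleep(100 * time.Millisecond) // crude wait for the async op in this demo
}

One enqueue per batch keeps queue pressure proportional to the number of fetches rather than the number of items, which is why the buffer-full drop accounting also moves from per-item to per-batch.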
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
 * [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
 * [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
 * [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
+* [ENHANCEMENT] Index Cache: Multi level cache adds config `max_backfill_items` to cap max items to backfill per async operation. #5686

 ## 1.16.0 2023-11-20
4 changes: 4 additions & 0 deletions docs/blocks-storage/querier.md
@@ -719,6 +719,10 @@ blocks_storage:
         # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
         [max_async_buffer_size: <int> | default = 10000]

+        # The maximum number of items to backfill per asynchronous operation.
+        # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-backfill-items
+        [max_backfill_items: <int> | default = 10000]
+
     chunks_cache:
       # Backend for chunks cache, if not empty. Supported values: memcached.
       # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
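For context, the new option only matters when more than one cache level is configured. An illustrative (not prescriptive) snippet wiring it into a two-level setup — the comma-separated backend list follows the documented index-cache syntax, while the memcached address and the values shown are hypothetical examples:

blocks_storage:
  bucket_store:
    index_cache:
      backend: inmemory,memcached    # levels are ordered; in-memory is consulted first
      memcached:
        addresses: memcached.example.svc.cluster.local:11211    # hypothetical address
      multilevel:
        max_async_concurrency: 50
        max_async_buffer_size: 10000
        max_backfill_items: 10000    # new in this change; caps items per backfill operation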
4 changes: 4 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -834,6 +834,10 @@ blocks_storage:
         # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
         [max_async_buffer_size: <int> | default = 10000]

+        # The maximum number of items to backfill per asynchronous operation.
+        # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-backfill-items
+        [max_backfill_items: <int> | default = 10000]
+
     chunks_cache:
       # Backend for chunks cache, if not empty. Supported values: memcached.
      # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -1268,6 +1268,10 @@ bucket_store:
       # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
       [max_async_buffer_size: <int> | default = 10000]

+      # The maximum number of items to backfill per asynchronous operation.
+      # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-backfill-items
+      [max_backfill_items: <int> | default = 10000]
+
   chunks_cache:
     # Backend for chunks cache, if not empty. Supported values: memcached.
     # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
2 changes: 1 addition & 1 deletion go.mod
@@ -80,6 +80,7 @@ require (
     github.com/VictoriaMetrics/fastcache v1.12.1
     github.com/cespare/xxhash/v2 v2.2.0
     github.com/google/go-cmp v0.6.0
+    golang.org/x/exp v0.0.0-20231006140011-7918f672742d
     google.golang.org/protobuf v1.31.0
 )

@@ -217,7 +218,6 @@ require (
     go4.org/intern v0.0.0-20230525184215-6c62f75575cb // indirect
     go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect
     golang.org/x/crypto v0.15.0 // indirect
-    golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
     golang.org/x/mod v0.13.0 // indirect
     golang.org/x/oauth2 v0.14.0 // indirect
     golang.org/x/sys v0.14.0 // indirect
21 changes: 11 additions & 10 deletions pkg/storage/tsdb/index_cache.go
@@ -44,6 +44,7 @@ var (
     errNoIndexCacheAddresses      = errors.New("no index cache backend addresses")
     errInvalidMaxAsyncConcurrency = errors.New("invalid max_async_concurrency, must greater than 0")
     errInvalidMaxAsyncBufferSize  = errors.New("invalid max_async_buffer_size, must greater than 0")
+    errInvalidMaxBackfillItems    = errors.New("invalid max_backfill_items, must greater than 0")
 )

 type IndexCacheConfig struct {
@@ -114,6 +115,7 @@ func (cfg *IndexCacheConfig) Validate() error {
 type MultiLevelIndexCacheConfig struct {
     MaxAsyncConcurrency int `yaml:"max_async_concurrency"`
     MaxAsyncBufferSize  int `yaml:"max_async_buffer_size"`
+    MaxBackfillItems    int `yaml:"max_backfill_items"`
 }

 func (cfg *MultiLevelIndexCacheConfig) Validate() error {
@@ -123,12 +125,16 @@ func (cfg *MultiLevelIndexCacheConfig) Validate() error {
     if cfg.MaxAsyncConcurrency <= 0 {
         return errInvalidMaxAsyncConcurrency
     }
+    if cfg.MaxBackfillItems <= 0 {
+        return errInvalidMaxBackfillItems
+    }
     return nil
 }

 func (cfg *MultiLevelIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
     f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 50, "The maximum number of concurrent asynchronous operations can occur when backfilling cache items.")
     f.IntVar(&cfg.MaxAsyncBufferSize, prefix+"max-async-buffer-size", 10000, "The maximum number of enqueued asynchronous operations allowed when backfilling cache items.")
+    f.IntVar(&cfg.MaxBackfillItems, prefix+"max-backfill-items", 10000, "The maximum number of items to backfill per asynchronous operation.")
 }

 type InMemoryIndexCacheConfig struct {
@@ -187,7 +193,7 @@ func NewIndexCache
     splitBackends := strings.Split(cfg.Backend, ",")
     var (
         caches       []storecache.IndexCache
-        enabledItems []string
+        enabledItems [][]string
     )

     for i, backend := range splitBackends {
@@ -205,7 +211,7 @@ func NewIndexCache
                 return c, err
             }
             caches = append(caches, c)
-            enabledItems = cfg.InMemory.EnabledItems
+            enabledItems = append(enabledItems, cfg.InMemory.EnabledItems)
         case IndexCacheBackendMemcached:
             c, err := newMemcachedIndexCacheClient(cfg.Memcached.ClientConfig, logger, registerer)
             if err != nil {
@@ -217,7 +223,7 @@ func NewIndexCache
                 return nil, err
             }
             caches = append(caches, cache)
-            enabledItems = cfg.Memcached.EnabledItems
+            enabledItems = append(enabledItems, cfg.Memcached.EnabledItems)
         case IndexCacheBackendRedis:
             c, err := newRedisIndexCacheClient(cfg.Redis.ClientConfig, logger, iReg)
             if err != nil {
@@ -229,18 +235,13 @@ func NewIndexCache
                 return nil, err
             }
             caches = append(caches, cache)
-            enabledItems = cfg.Redis.EnabledItems
+            enabledItems = append(enabledItems, cfg.Redis.EnabledItems)
         default:
             return nil, errUnsupportedIndexCacheBackend
         }
-        if len(enabledItems) > 0 {
-            latestCache := caches[len(caches)-1]
-            cache := storecache.NewFilteredIndexCache(latestCache, enabledItems)
-            caches[len(caches)-1] = cache
-        }
     }

-    return newMultiLevelCache(registerer, cfg.MultiLevel, caches...), nil
+    return newMultiLevelCache(registerer, cfg.MultiLevel, enabledItems, caches...), nil
 }

 func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
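A compact illustration of the validation rule added above — a self-contained mirror of the check, not the actual Cortex config types:

package main

import (
    "errors"
    "fmt"
)

// Mirrors the new check in MultiLevelIndexCacheConfig.Validate: the knob must be positive.
var errInvalidMaxBackfillItems = errors.New("invalid max_backfill_items, must greater than 0")

type multiLevelConfig struct{ maxBackfillItems int }

func (c multiLevelConfig) validate() error {
    if c.maxBackfillItems <= 0 {
        return errInvalidMaxBackfillItems
    }
    return nil
}

func main() {
    fmt.Println(multiLevelConfig{0}.validate())     // rejected at startup
    fmt.Println(multiLevelConfig{10000}.validate()) // <nil> (the default passes)
}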
126 changes: 84 additions & 42 deletions pkg/storage/tsdb/multilevel_cache.go
@@ -12,6 +12,7 @@ import (
     "github.com/prometheus/prometheus/storage"
     "github.com/thanos-io/thanos/pkg/cacheutil"
     storecache "github.com/thanos-io/thanos/pkg/store/cache"
+    "golang.org/x/exp/slices"
 )

@@ -21,18 +22,22 @@ const (
 )

 type multiLevelCache struct {
-    caches []storecache.IndexCache
+    postingsCaches, seriesCaches, expandedPostingCaches []storecache.IndexCache

-    fetchLatency         *prometheus.HistogramVec
-    backFillLatency      *prometheus.HistogramVec
-    backfillProcessor    *cacheutil.AsyncOperationProcessor
-    backfillDroppedItems *prometheus.CounterVec
+    fetchLatency                    *prometheus.HistogramVec
+    backFillLatency                 *prometheus.HistogramVec
+    backfillProcessor               *cacheutil.AsyncOperationProcessor
+    backfillDroppedPostings         prometheus.Counter
+    backfillDroppedSeries           prometheus.Counter
+    backfillDroppedExpandedPostings prometheus.Counter
+
+    maxBackfillItems int
 }

 func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
     wg := sync.WaitGroup{}
-    wg.Add(len(m.caches))
-    for _, c := range m.caches {
+    wg.Add(len(m.postingsCaches))
+    for _, c := range m.postingsCaches {
         cache := c
         go func() {
             defer wg.Done()
@@ -48,9 +53,9 @@ func (m *multiLevelCache) FetchMultiPostings

     misses = keys
     hits = map[labels.Label][]byte{}
-    backfillItems := make([]map[labels.Label][]byte, len(m.caches)-1)
-    for i, c := range m.caches {
-        if i < len(m.caches)-1 {
+    backfillItems := make([]map[labels.Label][]byte, len(m.postingsCaches)-1)
+    for i, c := range m.postingsCaches {
+        if i < len(m.postingsCaches)-1 {
             backfillItems[i] = map[labels.Label][]byte{}
         }
         if ctx.Err() != nil {
@@ -76,14 +81,23 @@ func (m *multiLevelCache) FetchMultiPostings
         backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypePostings))
         defer backFillTimer.ObserveDuration()
         for i, values := range backfillItems {
-            for lbl, b := range values {
-                lbl := lbl
-                b := b
-                if err := m.backfillProcessor.EnqueueAsync(func() {
-                    m.caches[i].StorePostings(blockID, lbl, b, tenant)
-                }); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
-                    m.backfillDroppedItems.WithLabelValues(cacheTypePostings).Inc()
+            i := i
+            values := values
+            if len(values) == 0 {
+                continue
+            }
+            if err := m.backfillProcessor.EnqueueAsync(func() {
+                cnt := 0
+                for lbl, b := range values {
+                    m.postingsCaches[i].StorePostings(blockID, lbl, b, tenant)
+                    cnt++
+                    if cnt == m.maxBackfillItems {
+                        m.backfillDroppedPostings.Add(float64(len(values) - cnt))
+                        return
+                    }
                 }
+            }); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
+                m.backfillDroppedPostings.Add(float64(len(values)))
             }
         }
     }()
@@ -93,8 +107,8 @@ func (m *multiLevelCache) FetchMultiPostings

 func (m *multiLevelCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
     wg := sync.WaitGroup{}
-    wg.Add(len(m.caches))
-    for _, c := range m.caches {
+    wg.Add(len(m.expandedPostingCaches))
+    for _, c := range m.expandedPostingCaches {
         cache := c
         go func() {
             defer wg.Done()
@@ -108,17 +122,17 @@ func (m *multiLevelCache) FetchExpandedPostings
     timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues(cacheTypeExpandedPostings))
     defer timer.ObserveDuration()

-    for i, c := range m.caches {
+    for i, c := range m.expandedPostingCaches {
         if ctx.Err() != nil {
             return nil, false
         }
         if d, h := c.FetchExpandedPostings(ctx, blockID, matchers, tenant); h {
             if i > 0 {
                 backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeExpandedPostings))
                 if err := m.backfillProcessor.EnqueueAsync(func() {
-                    m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
+                    m.expandedPostingCaches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
                 }); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
-                    m.backfillDroppedItems.WithLabelValues(cacheTypeExpandedPostings).Inc()
+                    m.backfillDroppedExpandedPostings.Inc()
                 }
                 backFillTimer.ObserveDuration()
             }
@@ -131,8 +145,8 @@ func (m *multiLevelCache) FetchExpandedPostings

 func (m *multiLevelCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
     wg := sync.WaitGroup{}
-    wg.Add(len(m.caches))
-    for _, c := range m.caches {
+    wg.Add(len(m.seriesCaches))
+    for _, c := range m.seriesCaches {
         cache := c
         go func() {
             defer wg.Done()
@@ -148,10 +162,10 @@ func (m *multiLevelCache) FetchMultiSeries

     misses = ids
     hits = map[storage.SeriesRef][]byte{}
-    backfillItems := make([]map[storage.SeriesRef][]byte, len(m.caches)-1)
+    backfillItems := make([]map[storage.SeriesRef][]byte, len(m.seriesCaches)-1)

-    for i, c := range m.caches {
-        if i < len(m.caches)-1 {
+    for i, c := range m.seriesCaches {
+        if i < len(m.seriesCaches)-1 {
             backfillItems[i] = map[storage.SeriesRef][]byte{}
         }
         if ctx.Err() != nil {
@@ -177,29 +191,57 @@ func (m *multiLevelCache) FetchMultiSeries
         backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeSeries))
         defer backFillTimer.ObserveDuration()
         for i, values := range backfillItems {
-            for ref, b := range values {
-                ref := ref
-                b := b
-                if err := m.backfillProcessor.EnqueueAsync(func() {
-                    m.caches[i].StoreSeries(blockID, ref, b, tenant)
-                }); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
-                    m.backfillDroppedItems.WithLabelValues(cacheTypeSeries).Inc()
+            i := i
+            values := values
+            if len(values) == 0 {
+                continue
+            }
+            if err := m.backfillProcessor.EnqueueAsync(func() {
+                cnt := 0
+                for ref, b := range values {
+                    m.seriesCaches[i].StoreSeries(blockID, ref, b, tenant)
+                    cnt++
+                    if cnt == m.maxBackfillItems {
+                        m.backfillDroppedSeries.Add(float64(len(values) - cnt))
+                        return
+                    }
                 }
+            }); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
+                m.backfillDroppedSeries.Add(float64(len(values)))
             }
         }
     }()

     return hits, misses
 }

-func newMultiLevelCache(reg prometheus.Registerer, cfg MultiLevelIndexCacheConfig, c ...storecache.IndexCache) storecache.IndexCache {
+func filterCachesByItem(enabledItems [][]string, cachedItem string, c ...storecache.IndexCache) []storecache.IndexCache {
+    filteredCaches := make([]storecache.IndexCache, 0, len(c))
+    for i := range enabledItems {
+        if len(enabledItems[i]) == 0 || slices.Contains(enabledItems[i], cachedItem) {
+            filteredCaches = append(filteredCaches, c[i])
+        }
+    }
+    return filteredCaches
+}
+
+func newMultiLevelCache(reg prometheus.Registerer, cfg MultiLevelIndexCacheConfig, enabledItems [][]string, c ...storecache.IndexCache) storecache.IndexCache {
     if len(c) == 1 {
-        return c[0]
+        if len(enabledItems[0]) == 0 {
+            return c[0]
+        }
+        return storecache.NewFilteredIndexCache(c[0], enabledItems[0])
     }

+    backfillDroppedItems := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+        Name: "cortex_store_multilevel_index_cache_backfill_dropped_items_total",
+        Help: "Total number of items dropped due to async buffer full when backfilling multilevel cache ",
+    }, []string{"item_type"})
     return &multiLevelCache{
-        caches:            c,
-        backfillProcessor: cacheutil.NewAsyncOperationProcessor(cfg.MaxAsyncBufferSize, cfg.MaxAsyncConcurrency),
+        postingsCaches:        filterCachesByItem(enabledItems, cacheTypePostings, c...),
+        seriesCaches:          filterCachesByItem(enabledItems, cacheTypeSeries, c...),
+        expandedPostingCaches: filterCachesByItem(enabledItems, cacheTypeExpandedPostings, c...),
+        backfillProcessor:     cacheutil.NewAsyncOperationProcessor(cfg.MaxAsyncBufferSize, cfg.MaxAsyncConcurrency),
        fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
            Name: "cortex_store_multilevel_index_cache_fetch_duration_seconds",
            Help: "Histogram to track latency to fetch items from multi level index cache",
@@ -210,9 +252,9 @@ func newMultiLevelCache
            Help:    "Histogram to track latency to backfill items from multi level index cache",
            Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
        }, []string{"item_type"}),
-        backfillDroppedItems: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-            Name: "cortex_store_multilevel_index_cache_backfill_dropped_items_total",
-            Help: "Total number of items dropped due to async buffer full when backfilling multilevel cache ",
-        }, []string{"item_type"}),
+        backfillDroppedPostings:         backfillDroppedItems.WithLabelValues(cacheTypePostings),
+        backfillDroppedSeries:           backfillDroppedItems.WithLabelValues(cacheTypeSeries),
+        backfillDroppedExpandedPostings: backfillDroppedItems.WithLabelValues(cacheTypeExpandedPostings),
+        maxBackfillItems:                cfg.MaxBackfillItems,
     }
 }
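To see what filterCachesByItem buys: each configured backend now carries its own enabled-items list, and the wrapper keeps, per item type, only the levels allowed to store that type — previously a single shared list was overwritten by each backend in turn. A runnable toy version of the same filtering logic (cache levels modeled as plain strings; it needs the golang.org/x/exp module the commit adds, and the item-type strings here are assumptions mirroring the constants above):

package main

import (
    "fmt"

    "golang.org/x/exp/slices"
)

// filterByItem keeps level i when its enabled-items list is empty
// (meaning "everything enabled") or explicitly contains item.
func filterByItem(enabledItems [][]string, item string, levels ...string) []string {
    filtered := make([]string, 0, len(levels))
    for i := range enabledItems {
        if len(enabledItems[i]) == 0 || slices.Contains(enabledItems[i], item) {
            filtered = append(filtered, levels[i])
        }
    }
    return filtered
}

func main() {
    // Level 0 (in-memory) stores everything; level 1 (memcached) only postings.
    enabled := [][]string{{}, {"Postings"}}
    fmt.Println(filterByItem(enabled, "Postings", "inmemory", "memcached")) // [inmemory memcached]
    fmt.Println(filterByItem(enabled, "Series", "inmemory", "memcached"))   // [inmemory]
}

Precomputing the three filtered views once at construction also means the hot fetch/store paths no longer consult the enabled-items configuration at all.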