diff --git a/CHANGELOG.md b/CHANGELOG.md index 3de810c69b4..ee22ef45ca4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6163](https://github.com/thanos-io/thanos/pull/6163) Receiver: Add hidden flag `--receive-forward-max-backoff` to configure the max backoff for forwarding requests. - [#5777](https://github.com/thanos-io/thanos/pull/5777) Receive: Allow specifying tenant-specific external labels in Router Ingestor. - [#6352](https://github.com/thanos-io/thanos/pull/6352) Store: Expose store gateway query stats in series response hints. +- [#6420](https://github.com/thanos-io/thanos/pull/6420) Index Cache: Cache expanded postings. ### Fixed - [#6427](https://github.com/thanos-io/thanos/pull/6427) Receive: increasing log level for failed uploads to error diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 559f94f0bca..ec284fc8eec 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -379,6 +379,11 @@ func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []label return map[labels.Label][]byte{}, keys } +func (noopCache) StoreExpandedPostings(_ ulid.ULID, _ []*labels.Matcher, _ []byte) {} +func (noopCache) FetchExpandedPostings(_ context.Context, _ ulid.ULID, _ []*labels.Matcher) ([]byte, bool) { + return []byte{}, false +} + func (noopCache) StoreSeries(ulid.ULID, storage.SeriesRef, []byte) {} func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []storage.SeriesRef) (map[storage.SeriesRef][]byte, []storage.SeriesRef) { return map[storage.SeriesRef][]byte{}, ids @@ -2185,6 +2190,23 @@ func (r *bucketIndexReader) reset() { // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) { + // Sort matchers to make sure we generate the same cache key. + sort.Slice(ms, func(i, j int) bool { + if ms[i].Type == ms[j].Type { + if ms[i].Name == ms[j].Name { + return ms[i].Value < ms[j].Value + } + return ms[i].Name < ms[j].Name + } + return ms[i].Type < ms[j].Type + }) + hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter) + if err != nil { + return nil, err + } + if hit { + return postings, nil + } var ( postingGroups []*postingGroup allRequested = false @@ -2280,18 +2302,29 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M return nil, errors.Wrap(err, "expand") } - // As of version two all series entries are 16 byte padded. All references - // we get have to account for that to get the correct offset. - version, err := r.block.indexHeaderReader.IndexVersion() - if err != nil { - return nil, errors.Wrap(err, "get index version") - } - if version >= 2 { - for i, id := range ps { - ps[i] = id * 16 + // Encode postings to cache. We compress and cache postings before adding + // 16 bytes padding in order to make compressed size smaller. 
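Side note on the cache-key preparation above: the matchers are sorted by type, then name, then value, so that logically identical matcher sets always produce the same expanded-postings cache key regardless of the order the query engine passes them in. A minimal standalone sketch of that ordering (the `sortMatchers` helper name is illustrative, not part of the patch):

```go
package main

import (
	"fmt"
	"sort"

	"github.com/prometheus/prometheus/model/labels"
)

// sortMatchers mirrors the ordering used before the expanded-postings cache
// lookup: by matcher type, then label name, then value, so that semantically
// identical matcher sets always serialize to the same cache key.
func sortMatchers(ms []*labels.Matcher) {
	sort.Slice(ms, func(i, j int) bool {
		if ms[i].Type != ms[j].Type {
			return ms[i].Type < ms[j].Type
		}
		if ms[i].Name != ms[j].Name {
			return ms[i].Name < ms[j].Name
		}
		return ms[i].Value < ms[j].Value
	})
}

func main() {
	ms := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "job", "thanos"),
		labels.MustNewMatcher(labels.MatchEqual, "cluster", "us"),
	}
	sortMatchers(ms)
	fmt.Println(ms) // [cluster="us" job="thanos"] — same key no matter the input order.
}
```

Without this step, `{a="1", b="2"}` and `{b="2", a="1"}` would hash to different keys and the cache hit rate would suffer.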
+ dataToCache, compressionDuration, compressionErrors, compressedSize := r.encodePostingsToCache(index.NewListPostings(ps), len(ps)) + r.stats.cachedPostingsCompressions++ + r.stats.cachedPostingsCompressionErrors += compressionErrors + r.stats.CachedPostingsCompressionTimeSum += compressionDuration + r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize) + r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(ps) * 4) // Estimate the posting list size. + r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache) + + if len(ps) > 0 { + // As of version two all series entries are 16 byte padded. All references + // we get have to account for that to get the correct offset. + version, err := r.block.indexHeaderReader.IndexVersion() + if err != nil { + return nil, errors.Wrap(err, "get index version") + } + if version >= 2 { + for i, id := range ps { + ps[i] = id * 16 + } } } - return ps, nil } @@ -2408,6 +2441,51 @@ type postingPtr struct { ptr index.Range } +func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) (bool, []storage.SeriesRef, error) { + dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms) + if !hit { + return false, nil, nil + } + if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil { + return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err) + } + r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache)) + r.stats.postingsTouched++ + r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(dataFromCache)) + p, closeFns, err := r.decodeCachedPostings(dataFromCache) + defer func() { + for _, closeFn := range closeFns { + closeFn() + } + }() + // If failed to decode or expand cached postings, return and expand postings again. + if err != nil { + level.Error(r.block.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String()) + return false, nil, nil + } + + ps, err := index.ExpandPostings(p) + if err != nil { + level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String()) + return false, nil, nil + } + + if len(ps) > 0 { + // As of version two all series entries are 16 byte padded. All references + // we get have to account for that to get the correct offset. + version, err := r.block.indexHeaderReader.IndexVersion() + if err != nil { + return false, nil, errors.Wrap(err, "get index version") + } + if version >= 2 { + for i, id := range ps { + ps[i] = id * 16 + } + } + } + return true, ps, nil +} + // fetchPostings fill postings requested by posting groups. // It returns one posting for each key, in the same order. // If postings for given key is not fetched, entry at given index will be nil. @@ -2439,32 +2517,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab r.stats.postingsTouched++ r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(b)) - // Even if this instance is not using compression, there may be compressed - // entries in the cache written by other stores. 
- var ( - l index.Postings - err error - ) - if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) { - s := time.Now() - clPostings, err := decodePostings(b) - r.stats.cachedPostingsDecompressions += 1 - r.stats.CachedPostingsDecompressionTimeSum += time.Since(s) - if err != nil { - r.stats.cachedPostingsDecompressionErrors += 1 - } else { - closeFns = append(closeFns, clPostings.close) - l = clPostings - } - } else { - _, l, err = r.dec.Postings(b) - } - + l, closer, err := r.decodeCachedPostings(b) if err != nil { return nil, closeFns, errors.Wrap(err, "decode postings") } - output[ix] = l + closeFns = append(closeFns, closer...) continue } @@ -2536,27 +2594,12 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab return err } - dataToCache := pBytes - - compressionTime := time.Duration(0) - compressions, compressionErrors, compressedSize := 0, 0, 0 - // Reencode postings before storing to cache. If that fails, we store original bytes. // This can only fail, if postings data was somehow corrupted, // and there is nothing we can do about it. // Errors from corrupted postings will be reported when postings are used. - compressions++ - s := time.Now() bep := newBigEndianPostings(pBytes[4:]) - data, err := diffVarintSnappyStreamedEncode(bep, bep.length()) - compressionTime = time.Since(s) - if err == nil { - dataToCache = data - compressedSize = len(data) - } else { - compressionErrors = 1 - } - + dataToCache, compressionTime, compressionErrors, compressedSize := r.encodePostingsToCache(bep, bep.length()) r.mtx.Lock() // Return postings and fill LRU cache. // Truncate first 4 bytes which are length of posting. @@ -2567,7 +2610,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab // If we just fetched it we still have to update the stats for touched postings. r.stats.postingsTouched++ r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes)) - r.stats.cachedPostingsCompressions += compressions + r.stats.cachedPostingsCompressions += 1 r.stats.cachedPostingsCompressionErrors += compressionErrors r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes)) r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize) @@ -2581,6 +2624,47 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab return output, closeFns, g.Wait() } +func (r *bucketIndexReader) decodeCachedPostings(b []byte) (index.Postings, []func(), error) { + // Even if this instance is not using compression, there may be compressed + // entries in the cache written by other stores. 
+ var ( + l index.Postings + err error + closeFns []func() + ) + if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) { + s := time.Now() + clPostings, err := decodePostings(b) + r.stats.cachedPostingsDecompressions += 1 + r.stats.CachedPostingsDecompressionTimeSum += time.Since(s) + if err != nil { + r.stats.cachedPostingsDecompressionErrors += 1 + } else { + closeFns = append(closeFns, clPostings.close) + l = clPostings + } + } else { + _, l, err = r.dec.Postings(b) + } + return l, closeFns, err +} + +func (r *bucketIndexReader) encodePostingsToCache(p index.Postings, length int) ([]byte, time.Duration, int, int) { + var dataToCache []byte + compressionTime := time.Duration(0) + compressionErrors, compressedSize := 0, 0 + s := time.Now() + data, err := diffVarintSnappyStreamedEncode(p, length) + compressionTime = time.Since(s) + if err == nil { + dataToCache = data + compressedSize = len(data) + } else { + compressionErrors = 1 + } + return dataToCache, compressionTime, compressionErrors, compressedSize +} + func resizePostings(b []byte) ([]byte, error) { d := encoding.Decbuf{B: b} n := d.Be32int() diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 9e1fd17ca44..886536fae9e 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -66,6 +66,14 @@ func (c *swappableCache) FetchMultiPostings(ctx context.Context, blockID ulid.UL return c.ptr.FetchMultiPostings(ctx, blockID, keys) } +func (c *swappableCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) { + c.ptr.StoreExpandedPostings(blockID, matchers, v) +} + +func (c *swappableCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { + return c.ptr.FetchExpandedPostings(ctx, blockID, matchers) +} + func (c *swappableCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { c.ptr.StoreSeries(blockID, id, v) } diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index c849073c74e..82bf30625ef 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -7,6 +7,7 @@ import ( "context" "encoding/base64" "strconv" + "strings" "github.com/oklog/ulid" "github.com/prometheus/prometheus/model/labels" @@ -15,8 +16,9 @@ import ( ) const ( - cacheTypePostings string = "Postings" - cacheTypeSeries string = "Series" + cacheTypePostings string = "Postings" + cacheTypeExpandedPostings string = "ExpandedPostings" + cacheTypeSeries string = "Series" sliceHeaderSize = 16 ) @@ -38,6 +40,12 @@ type IndexCache interface { // and returns a map containing cache hits, along with a list of missing keys. FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) + // StoreExpandedPostings stores expanded postings for a set of label matchers. + StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) + + // FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. + FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) + // StoreSeries stores a single series. 
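The `encodePostingsToCache`/`decodeCachedPostings` helpers introduced above wrap the existing `diffVarintSnappyStreamedEncode`/`decodePostings` codec. The sketch below is a simplified illustration of why that codec suits this cache — sorted postings are delta-encoded as varints and then snappy-compressed — and of why the patch caches the unpadded (pre-×16) series references: small, regular gaps compress far better than 16-byte-scaled offsets. It is not the actual Thanos wire format (the real codec adds an encoding header and streams the compression):

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/golang/snappy"
)

// encodeDeltaVarint is a toy version of the idea behind
// diffVarintSnappyStreamedEncode: sorted postings are delta-encoded, every
// delta is written as a uvarint, and the byte stream is snappy-compressed.
func encodeDeltaVarint(postings []uint64) []byte {
	buf := make([]byte, 0, len(postings))
	tmp := make([]byte, binary.MaxVarintLen64)
	var prev uint64
	for _, p := range postings {
		n := binary.PutUvarint(tmp, p-prev) // small gaps -> 1-byte varints
		buf = append(buf, tmp[:n]...)
		prev = p
	}
	return snappy.Encode(nil, buf)
}

func main() {
	// Unpadded series references with small, regular gaps (as cached by the
	// patch) compress dramatically better than the same refs multiplied by 16.
	refs := make([]uint64, 1000)
	for i := range refs {
		refs[i] = uint64(i) * 3
	}
	fmt.Println("raw:", len(refs)*8, "bytes, encoded:", len(encodeDeltaVarint(refs)), "bytes")
}
```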
StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) @@ -59,6 +67,8 @@ func (c cacheKey) keyType() string { return cacheTypePostings case cacheKeySeries: return cacheTypeSeries + case cacheKeyExpandedPostings: + return cacheTypeExpandedPostings } return "" } @@ -68,6 +78,8 @@ func (c cacheKey) size() uint64 { case cacheKeyPostings: // ULID + 2 slice headers + number of chars in value and name. return ulidSize + 2*sliceHeaderSize + uint64(len(k.Value)+len(k.Name)) + case cacheKeyExpandedPostings: + return ulidSize + sliceHeaderSize + uint64(len(k)) case cacheKeySeries: return ulidSize + 8 // ULID + uint64. } @@ -86,6 +98,16 @@ func (c cacheKey) string() string { key += ":" + c.compression } return key + case cacheKeyExpandedPostings: + // Use cryptographically hash functions to avoid hash collisions + // which would end up in wrong query results. + matchers := c.key.(cacheKeyExpandedPostings) + matchersHash := blake2b.Sum256([]byte(matchers)) + key := "EP:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(matchersHash[0:]) + if len(c.compression) > 0 { + key += ":" + c.compression + } + return key case cacheKeySeries: return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10) default: @@ -93,5 +115,17 @@ func (c cacheKey) string() string { } } +func labelMatchersToString(matchers []*labels.Matcher) string { + sb := strings.Builder{} + for i, lbl := range matchers { + sb.WriteString(lbl.String()) + if i < len(matchers)-1 { + sb.WriteRune(';') + } + } + return sb.String() +} + type cacheKeyPostings labels.Label +type cacheKeyExpandedPostings string // We don't use []*labels.Matcher because it is not a hashable type so fail at inmemory cache. type cacheKeySeries uint64 diff --git a/pkg/store/cache/cache_test.go b/pkg/store/cache/cache_test.go index 542c5084c34..59adf2b8c7a 100644 --- a/pkg/store/cache/cache_test.go +++ b/pkg/store/cache/cache_test.go @@ -27,6 +27,8 @@ func TestCacheKey_string(t *testing.T) { uid := ulid.MustNew(1, nil) ulidString := uid.String() + matcher := labels.MustNewMatcher(labels.MatchRegexp, "aaa", "bbb") + matcher2 := labels.MustNewMatcher(labels.MatchNotEqual, "foo", "bar") tests := map[string]struct { key cacheKey @@ -41,10 +43,46 @@ func TestCacheKey_string(t *testing.T) { return fmt.Sprintf("P:%s:%s", uid.String(), encodedHash) }(), }, + "postings cache key includes compression scheme": { + key: cacheKey{ulidString, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"}), compressionSchemeStreamedSnappy}, + expected: func() string { + hash := blake2b.Sum256([]byte("foo:bar")) + encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) + + return fmt.Sprintf("P:%s:%s:%s", uid.String(), encodedHash, compressionSchemeStreamedSnappy) + }(), + }, "should stringify series cache key": { key: cacheKey{ulidString, cacheKeySeries(12345), ""}, expected: fmt.Sprintf("S:%s:12345", uid.String()), }, + "should stringify expanded postings cache key": { + key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher})), ""}, + expected: func() string { + hash := blake2b.Sum256([]byte(matcher.String())) + encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) + + return fmt.Sprintf("EP:%s:%s", uid.String(), encodedHash) + }(), + }, + "should stringify expanded postings cache key when multiple matchers": { + key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher, matcher2})), ""}, + expected: func() string { + hash := 
blake2b.Sum256([]byte(fmt.Sprintf("%s;%s", matcher.String(), matcher2.String()))) + encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) + + return fmt.Sprintf("EP:%s:%s", uid.String(), encodedHash) + }(), + }, + "expanded postings cache key includes compression scheme": { + key: cacheKey{ulidString, cacheKeyExpandedPostings(labelMatchersToString([]*labels.Matcher{matcher})), compressionSchemeStreamedSnappy}, + expected: func() string { + hash := blake2b.Sum256([]byte(matcher.String())) + encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:]) + + return fmt.Sprintf("EP:%s:%s:%s", uid.String(), encodedHash, compressionSchemeStreamedSnappy) + }(), + }, } for testName, testData := range tests { @@ -78,6 +116,21 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) { {ulidString, cacheKeySeries(math.MaxUint64), ""}, }, }, + "should guarantee reasonably short key length for expanded postings": { + expectedLen: 73, + keys: []cacheKey{ + {ulidString, func() interface{} { + matchers := make([]*labels.Matcher, 0, 100) + name := strings.Repeat("a", 100) + value := strings.Repeat("a", 1000) + for i := 0; i < 100; i++ { + t := labels.MatchType(i % 4) + matchers = append(matchers, labels.MustNewMatcher(t, name, value)) + } + return cacheKeyExpandedPostings(labelMatchersToString(matchers)) + }(), ""}, + }, + }, } for testName, testData := range tests { diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index d7ecc608148..8e35f4dca39 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -100,6 +100,7 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, reg prometheus.Registere }, []string{"item_type"}) c.evicted.WithLabelValues(cacheTypePostings) c.evicted.WithLabelValues(cacheTypeSeries) + c.evicted.WithLabelValues(cacheTypeExpandedPostings) c.added = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_items_added_total", @@ -107,6 +108,7 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, reg prometheus.Registere }, []string{"item_type"}) c.added.WithLabelValues(cacheTypePostings) c.added.WithLabelValues(cacheTypeSeries) + c.added.WithLabelValues(cacheTypeExpandedPostings) c.requests = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_requests_total", @@ -114,6 +116,7 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, reg prometheus.Registere }, []string{"item_type"}) c.requests.WithLabelValues(cacheTypePostings) c.requests.WithLabelValues(cacheTypeSeries) + c.requests.WithLabelValues(cacheTypeExpandedPostings) c.overflow = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_items_overflowed_total", @@ -121,6 +124,7 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, reg prometheus.Registere }, []string{"item_type"}) c.overflow.WithLabelValues(cacheTypePostings) c.overflow.WithLabelValues(cacheTypeSeries) + c.overflow.WithLabelValues(cacheTypeExpandedPostings) c.hits = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_hits_total", @@ -128,6 +132,7 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, reg prometheus.Registere }, []string{"item_type"}) c.hits.WithLabelValues(cacheTypePostings) c.hits.WithLabelValues(cacheTypeSeries) + c.hits.WithLabelValues(cacheTypeExpandedPostings) c.current = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "thanos_store_index_cache_items", @@ -135,6 +140,7 @@ func 
NewInMemoryIndexCacheWithConfig(logger log.Logger, reg prometheus.Registere }, []string{"item_type"}) c.current.WithLabelValues(cacheTypePostings) c.current.WithLabelValues(cacheTypeSeries) + c.current.WithLabelValues(cacheTypeExpandedPostings) c.currentSize = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "thanos_store_index_cache_items_size_bytes", @@ -142,6 +148,7 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, reg prometheus.Registere }, []string{"item_type"}) c.currentSize.WithLabelValues(cacheTypePostings) c.currentSize.WithLabelValues(cacheTypeSeries) + c.currentSize.WithLabelValues(cacheTypeExpandedPostings) c.totalCurrentSize = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "thanos_store_index_cache_total_size_bytes", @@ -149,6 +156,7 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, reg prometheus.Registere }, []string{"item_type"}) c.totalCurrentSize.WithLabelValues(cacheTypePostings) c.totalCurrentSize.WithLabelValues(cacheTypeSeries) + c.totalCurrentSize.WithLabelValues(cacheTypeExpandedPostings) _ = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ Name: "thanos_store_index_cache_max_size_bytes", @@ -184,10 +192,10 @@ func (c *InMemoryIndexCache) onEvict(key, val interface{}) { k := key.(cacheKey).keyType() entrySize := sliceHeaderSize + uint64(len(val.([]byte))) - c.evicted.WithLabelValues(string(k)).Inc() - c.current.WithLabelValues(string(k)).Dec() - c.currentSize.WithLabelValues(string(k)).Sub(float64(entrySize)) - c.totalCurrentSize.WithLabelValues(string(k)).Sub(float64(entrySize + key.(cacheKey).size())) + c.evicted.WithLabelValues(k).Inc() + c.current.WithLabelValues(k).Dec() + c.currentSize.WithLabelValues(k).Sub(float64(entrySize)) + c.totalCurrentSize.WithLabelValues(k).Sub(float64(entrySize + key.(cacheKey).size())) c.curSize -= entrySize } @@ -311,6 +319,19 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid. return hits, misses } +// StoreExpandedPostings stores expanded postings for a set of label matchers. +func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) { + c.set(cacheTypeExpandedPostings, cacheKey{block: blockID.String(), key: cacheKeyExpandedPostings(labelMatchersToString(matchers))}, v) +} + +// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. +func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { + if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers)), ""}); ok { + return b, true + } + return nil, false +} + // StoreSeries sets the series identified by the ulid and id to the value v, // if the series already exists in the cache it is not mutated. 
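Each metric vector in `NewInMemoryIndexCacheWithConfig` now touches a child for the new `ExpandedPostings` item type (alongside `Postings` and `Series`), so those series are exported with a value of 0 from startup instead of appearing only after the first expanded-postings request. A small sketch of that Prometheus pattern, using an illustrative metric name rather than the real `thanos_store_index_cache_requests_total`:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()
	requests := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "example_index_cache_requests_total", // illustrative name only
		Help: "Total number of items requested from the cache.",
	}, []string{"item_type"})

	// Touching the child up front creates it with value 0, so the series shows
	// up in /metrics (and in dashboards/alerts) before the first request.
	requests.WithLabelValues("ExpandedPostings")

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), "children:", len(mf.GetMetric())) // 1 child, value 0
	}
}
```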
func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { diff --git a/pkg/store/cache/inmemory_test.go b/pkg/store/cache/inmemory_test.go index d97d125cf80..205a639be16 100644 --- a/pkg/store/cache/inmemory_test.go +++ b/pkg/store/cache/inmemory_test.go @@ -122,6 +122,7 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) { uid := func(id storage.SeriesRef) ulid.ULID { return ulid.MustNew(uint64(id), nil) } lbl := labels.Label{Name: "foo", Value: "bar"} + matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar") ctx := context.Background() for _, tt := range []struct { @@ -149,6 +150,15 @@ func TestInMemoryIndexCache_UpdateItem(t *testing.T) { return b, ok }, }, + { + typ: cacheTypeExpandedPostings, + set: func(id storage.SeriesRef, b []byte) { + cache.StoreExpandedPostings(uid(id), []*labels.Matcher{matcher}, b) + }, + get: func(id storage.SeriesRef) ([]byte, bool) { + return cache.FetchExpandedPostings(ctx, uid(id), []*labels.Matcher{matcher}) + }, + }, } { t.Run(tt.typ, func(t *testing.T) { defer func() { errorLogs = nil }() diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index 16a5b92cec8..b80d2d98946 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -34,10 +34,12 @@ type RemoteIndexCache struct { compressionScheme string // Metrics. - postingRequests prometheus.Counter - seriesRequests prometheus.Counter - postingHits prometheus.Counter - seriesHits prometheus.Counter + postingRequests prometheus.Counter + seriesRequests prometheus.Counter + expandedPostingRequests prometheus.Counter + postingHits prometheus.Counter + seriesHits prometheus.Counter + expandedPostingHits prometheus.Counter } // NewRemoteIndexCache makes a new RemoteIndexCache. @@ -54,6 +56,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli }, []string{"item_type"}) c.postingRequests = requests.WithLabelValues(cacheTypePostings) c.seriesRequests = requests.WithLabelValues(cacheTypeSeries) + c.expandedPostingRequests = requests.WithLabelValues(cacheTypeExpandedPostings) hits := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_store_index_cache_hits_total", @@ -61,6 +64,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli }, []string{"item_type"}) c.postingHits = hits.WithLabelValues(cacheTypePostings) c.seriesHits = hits.WithLabelValues(cacheTypeSeries) + c.expandedPostingHits = hits.WithLabelValues(cacheTypeExpandedPostings) level.Info(logger).Log("msg", "created index cache") @@ -115,6 +119,36 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. return hits, misses } +// StoreExpandedPostings sets the postings identified by the ulid and label to the value v. +// The function enqueues the request and returns immediately: the entry will be +// asynchronously stored in the cache. +func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte) { + key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys)), c.compressionScheme}.string() + + if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { + level.Error(c.logger).Log("msg", "failed to cache expanded postings in memcached", "err", err) + } +} + +// FetchExpandedPostings fetches multiple postings - each identified by a label - +// and returns a map containing cache hits, along with a list of missing keys. 
+// In case of error, it logs and return an empty cache hits map. +func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, lbls []*labels.Matcher) ([]byte, bool) { + key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(lbls)), c.compressionScheme}.string() + + // Fetch the keys from memcached in a single request. + c.expandedPostingRequests.Add(1) + results := c.memcached.GetMulti(ctx, []string{key}) + if len(results) == 0 { + return nil, false + } + if res, ok := results[key]; ok { + c.expandedPostingHits.Add(1) + return res, true + } + return nil, false +} + // StoreSeries sets the series identified by the ulid and id to the value v. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. diff --git a/pkg/store/cache/memcached_test.go b/pkg/store/cache/memcached_test.go index 4911f77845a..47249aa5e90 100644 --- a/pkg/store/cache/memcached_test.go +++ b/pkg/store/cache/memcached_test.go @@ -109,6 +109,93 @@ func TestMemcachedIndexCache_FetchMultiPostings(t *testing.T) { } } +func TestMemcachedIndexCache_FetchExpandedPostings(t *testing.T) { + t.Parallel() + + // Init some data to conveniently define test cases later one. + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + matcher1 := labels.MustNewMatcher(labels.MatchEqual, "cluster", "us") + matcher2 := labels.MustNewMatcher(labels.MatchEqual, "job", "thanos") + matcher3 := labels.MustNewMatcher(labels.MatchRegexp, "__name__", "up") + value1 := []byte{1} + value2 := []byte{2} + + tests := map[string]struct { + setup []mockedExpandedPostings + mockedErr error + fetchBlockID ulid.ULID + fetchMatchers []*labels.Matcher + expectedHit bool + expectedValue []byte + }{ + "should return no hits on empty cache": { + setup: []mockedExpandedPostings{}, + fetchBlockID: block1, + fetchMatchers: []*labels.Matcher{matcher1, matcher2}, + expectedHit: false, + }, + "should return no misses on 100% hit ratio": { + setup: []mockedExpandedPostings{ + {block: block1, matchers: []*labels.Matcher{matcher1}, value: value1}, + }, + fetchBlockID: block1, + fetchMatchers: []*labels.Matcher{matcher1}, + expectedHit: true, + expectedValue: value1, + }, + "Cache miss when matchers key doesn't match": { + setup: []mockedExpandedPostings{ + {block: block1, matchers: []*labels.Matcher{matcher1}, value: value1}, + {block: block2, matchers: []*labels.Matcher{matcher2}, value: value2}, + }, + fetchBlockID: block1, + fetchMatchers: []*labels.Matcher{matcher1, matcher2}, + expectedHit: false, + }, + "should return no hits on memcached error": { + setup: []mockedExpandedPostings{ + {block: block1, matchers: []*labels.Matcher{matcher3}, value: value1}, + }, + mockedErr: errors.New("mocked error"), + fetchBlockID: block1, + fetchMatchers: []*labels.Matcher{matcher3}, + expectedHit: false, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + memcached := newMockedMemcachedClient(testData.mockedErr) + c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil) + testutil.Ok(t, err) + + // Store the postings expected before running the test. + ctx := context.Background() + for _, p := range testData.setup { + c.StoreExpandedPostings(p.block, p.matchers, p.value) + } + + // Fetch postings from cached and assert on it. 
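For the remote cache, `FetchExpandedPostings` issues a single-key `GetMulti` and treats an absent result as a miss; client errors are logged inside the cache client, so the caller only ever sees hit or miss. A minimal sketch of that pattern with a hypothetical client interface (`remoteCache`, `mapCache`, and `fetchSingle` are illustrative, not the patch's types):

```go
package main

import (
	"context"
	"fmt"
)

// remoteCache is a hypothetical stand-in for the remote cache client: the real
// one batches keys and logs failures internally, so a failed lookup surfaces
// to the caller as nothing more than a missing key.
type remoteCache interface {
	GetMulti(ctx context.Context, keys []string) map[string][]byte
}

type mapCache map[string][]byte

func (m mapCache) GetMulti(_ context.Context, keys []string) map[string][]byte {
	out := map[string][]byte{}
	for _, k := range keys {
		if v, ok := m[k]; ok {
			out[k] = v
		}
	}
	return out
}

// fetchSingle mirrors the expanded-postings fetch path: one key in, and the
// boolean distinguishes a hit from a miss (or a swallowed client error).
func fetchSingle(ctx context.Context, c remoteCache, key string) ([]byte, bool) {
	v, ok := c.GetMulti(ctx, []string{key})[key]
	return v, ok
}

func main() {
	c := mapCache{"EP:block1:abc": {0x01}}
	if v, ok := fetchSingle(context.Background(), c, "EP:block1:abc"); ok {
		fmt.Println("hit:", v)
	}
	if _, ok := fetchSingle(context.Background(), c, "EP:block1:missing"); !ok {
		fmt.Println("miss")
	}
}
```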
+ val, hit := c.FetchExpandedPostings(ctx, testData.fetchBlockID, testData.fetchMatchers) + testutil.Equals(t, testData.expectedHit, hit) + if hit { + testutil.Equals(t, testData.expectedValue, val) + } + + // Assert on metrics. + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.expandedPostingRequests)) + if testData.expectedHit { + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.expandedPostingHits)) + } + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.postingRequests)) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.postingHits)) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.seriesRequests)) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.seriesHits)) + }) + } +} + func TestMemcachedIndexCache_FetchMultiSeries(t *testing.T) { t.Parallel() @@ -204,6 +291,12 @@ type mockedPostings struct { value []byte } +type mockedExpandedPostings struct { + block ulid.ULID + matchers []*labels.Matcher + value []byte +} + type mockedSeries struct { block ulid.ULID id storage.SeriesRef diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index a4d8bb4c4f2..63b6830a90a 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -458,7 +458,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) { } // Crank down the deletion mark delay since deduplication can miss blocks in the presence of replica labels it doesn't know about. - str := e2ethanos.NewStoreGW(e, "1", bktConfig, "", []string{"--ignore-deletion-marks-delay=2s"}) + str := e2ethanos.NewStoreGW(e, "1", bktConfig, "", "", []string{"--ignore-deletion-marks-delay=2s"}) testutil.Ok(t, e2e.StartAndWaitReady(str)) testutil.Ok(t, str.WaitSumMetrics(e2emon.Equals(float64(len(rawBlockIDs)+8)), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total")) diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 2053421ef6f..9dfcf7f95ef 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -846,7 +846,7 @@ receivers: })), "http") } -func NewStoreGW(e e2e.Environment, name string, bucketConfig client.BucketConfig, cacheConfig string, extArgs []string, relabelConfig ...relabel.Config) *e2emon.InstrumentedRunnable { +func NewStoreGW(e e2e.Environment, name string, bucketConfig client.BucketConfig, cacheConfig, indexCacheConfig string, extArgs []string, relabelConfig ...relabel.Config) *e2emon.InstrumentedRunnable { f := e.Runnable(fmt.Sprintf("store-gw-%v", name)). WithPorts(map[string]int{"http": 8080, "grpc": 9091}). 
Future() @@ -885,6 +885,10 @@ func NewStoreGW(e e2e.Environment, name string, bucketConfig client.BucketConfig args = append(args, "--store.caching-bucket.config", cacheConfig) } + if indexCacheConfig != "" { + args = append(args, "--index-cache.config", indexCacheConfig) + } + return e2emon.AsInstrumented(f.Init(wrapWithDefaults(e2e.StartOptions{ Image: DefaultImage(), Command: e2e.NewCommand("store", args...), diff --git a/test/e2e/info_api_test.go b/test/e2e/info_api_test.go index aa546feb0ea..ef36ed23afa 100644 --- a/test/e2e/info_api_test.go +++ b/test/e2e/info_api_test.go @@ -48,6 +48,7 @@ func TestInfo(t *testing.T) { Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, "", + "", nil, ) testutil.Ok(t, e2e.StartAndWaitReady(store)) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index d74c49cc483..c85cf81b58c 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -645,6 +645,7 @@ func TestQueryStoreMetrics(t *testing.T) { Config: e2ethanos.NewS3Config(bucket, minio.InternalEndpoint("http"), minio.InternalDir()), }, "", + "", nil, ) @@ -779,6 +780,7 @@ func TestSidecarStorePushdown(t *testing.T) { Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, "", + "", nil, ) testutil.Ok(t, e2e.StartAndWaitReady(s1)) @@ -882,6 +884,7 @@ func TestQueryStoreDedup(t *testing.T) { Config: e2ethanos.NewS3Config(bucket, minio.InternalEndpoint("http"), minio.InternalDir()), }, "", + "", nil, ) testutil.Ok(t, e2e.StartAndWaitReady(storeGW)) diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 3baf94abdc8..f409d9a437d 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -72,6 +72,7 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, memcachedConfig, + "", nil, relabel.Config{ Action: relabel.Drop, @@ -340,6 +341,7 @@ func TestStoreGatewayNoCacheFile(t *testing.T) { Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, "", + "", []string{"--no-cache-index-header"}, relabel.Config{ Action: relabel.Drop, @@ -571,6 +573,7 @@ blocks_iter_ttl: 0s`, memcached.InternalEndpoint("memcached")) Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, memcachedConfig, + "", nil, ) testutil.Ok(t, e2e.StartAndWaitReady(s1)) @@ -679,6 +682,7 @@ metafile_content_ttl: 0s` Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, fmt.Sprintf(groupcacheConfig, 1), + "", nil, ) store2 := e2ethanos.NewStoreGW( @@ -689,6 +693,7 @@ metafile_content_ttl: 0s` Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, fmt.Sprintf(groupcacheConfig, 2), + "", nil, ) store3 := e2ethanos.NewStoreGW( @@ -699,6 +704,7 @@ metafile_content_ttl: 0s` Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, fmt.Sprintf(groupcacheConfig, 3), + "", nil, ) testutil.Ok(t, e2e.StartAndWaitReady(store1, store2, store3)) @@ -795,6 +801,7 @@ config: Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, string(cacheCfg), + "", []string{"--store.grpc.downloaded-bytes-limit=1B"}, ) @@ -806,6 +813,7 @@ config: Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, string(cacheCfg), + "", []string{"--store.grpc.downloaded-bytes-limit=100B"}, ) store3 := e2ethanos.NewStoreGW( @@ -816,6 
+824,7 @@ config: Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), }, string(cacheCfg), + "", []string{"--store.grpc.downloaded-bytes-limit=196627B"}, ) @@ -921,3 +930,108 @@ func TestRedisClient_Rueidis(t *testing.T) { testutil.Equals(t, 1, len(returnedVals)) testutil.Equals(t, []byte("bar"), returnedVals["foo"]) } + +func TestStoreGatewayMemcachedIndexCacheExpandedPostings(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("memcached-exp") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + const bucket = "store-gateway-memcached-index-cache-expanded-postings-test" + m := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS()) + testutil.Ok(t, e2e.StartAndWaitReady(m)) + + memcached := e2ethanos.NewMemcached(e, "1") + testutil.Ok(t, e2e.StartAndWaitReady(memcached)) + + indexCacheConfig := fmt.Sprintf(`type: MEMCACHED +config: + addresses: [%s] + max_async_concurrency: 10 + dns_provider_update_interval: 1s + auto_discovery: false`, memcached.InternalEndpoint("memcached")) + + s1 := e2ethanos.NewStoreGW( + e, + "1", + client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), + }, + "", + indexCacheConfig, + nil, + ) + testutil.Ok(t, e2e.StartAndWaitReady(s1)) + + q := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + dir := filepath.Join(e.SharedDir(), "tmp") + testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) + + series := []labels.Labels{labels.FromStrings("a", "1", "b", "2")} + extLset := labels.FromStrings("ext1", "value1", "replica", "1") + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + t.Cleanup(cancel) + + now := time.Now() + id, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc) + testutil.Ok(t, err) + + l := log.NewLogfmtLogger(os.Stdout) + bkt, err := s3.NewBucketWithConfig(l, + e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed") + testutil.Ok(t, err) + + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String())) + + // Wait for store to sync blocks. + // thanos_blocks_meta_synced: 1x loadedMeta 0x labelExcludedMeta 0x TooFreshMeta. 
+ testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total")) + + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_drops_total")) + testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total")) + + t.Run("query with cache miss", func(t *testing.T) { + queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + "replica": "1", + }, + }, + ) + + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{`thanos_store_index_cache_requests_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{`thanos_store_index_cache_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) + }) + + t.Run("query with cache hit", func(t *testing.T) { + queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery }, + time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, + []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + "replica": "1", + }, + }, + ) + + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{`thanos_store_index_cache_requests_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) + testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{`thanos_store_index_cache_hits_total`}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "item_type", "ExpandedPostings")))) + }) +}
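For reference, the expanded-postings cache key built in `pkg/store/cache/cache.go` has the shape `EP:<block ULID>:<base64url(BLAKE2b-256(matcher string))>`, optionally suffixed with the compression scheme. Hashing keeps the key at a fixed, memcached-friendly length — 3 + 26 + 1 + 43 = 73 bytes without the suffix, which is exactly what the key-length test above asserts — and a cryptographic hash makes collisions (and therefore wrong query results) practically impossible. A self-contained sketch of that construction (helper names are illustrative; the block ULID is an arbitrary example):

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strings"

	"github.com/prometheus/prometheus/model/labels"
	"golang.org/x/crypto/blake2b"
)

// matchersToString joins matchers with ';', mirroring labelMatchersToString:
// the joined string is what gets hashed, never stored verbatim.
func matchersToString(ms []*labels.Matcher) string {
	parts := make([]string, 0, len(ms))
	for _, m := range ms {
		parts = append(parts, m.String())
	}
	return strings.Join(parts, ";")
}

// expandedPostingsKey builds an "EP:<block>:<hash>[:<compression>]" key.
// BLAKE2b-256 keeps the hash a fixed 43 base64 characters no matter how many
// or how long the matchers are.
func expandedPostingsKey(blockID string, ms []*labels.Matcher, compression string) string {
	h := blake2b.Sum256([]byte(matchersToString(ms)))
	key := "EP:" + blockID + ":" + base64.RawURLEncoding.EncodeToString(h[:])
	if compression != "" {
		key += ":" + compression
	}
	return key
}

func main() {
	ms := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "cluster", "us"),
		labels.MustNewMatcher(labels.MatchRegexp, "__name__", "up"),
	}
	key := expandedPostingsKey("01ARZ3NDEKTSV4RRFFQ69G5FAV", ms, "")
	fmt.Println(key, len(key)) // 73 bytes for a 26-char ULID and no compression suffix.
}
```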