Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: Fixed binary header bug that was causing all postings to be kept in memory instead of 1/32 as we meant. #2390

Merged
merged 4 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2208](https://github.com/thanos-io/thanos/pull/2208) Query and Rule: fix handling of `web.route-prefix` to correctly handle `/` and prefixes that do not begin with a `/`.
- [#2311](https://github.com/thanos-io/thanos/pull/2311) Receive: ensure receive component serves TLS when TLS configuration is provided.
- [#2319](https://github.com/thanos-io/thanos/pull/2319) Query: fixed inconsistent naming of metrics.
- [#2390](https://github.com/thanos-io/thanos/pull/2390) Store: Fixed a bug that caused all posting offsets to be kept in memory instead of only 1/32 of them, as intended.
Added a hidden flag to control this behavior.

### Added

Expand Down
9 changes: 9 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package main

import (
"context"
"fmt"
"path"
"time"

Expand Down Expand Up @@ -86,6 +87,11 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
disableIndexHeader := cmd.Flag("store.disable-index-header", "If specified, Store Gateway will use index-cache.json for each block instead of recreating binary index-header").
Hidden().Default("false").Bool()

postingOffsetsInMemSampling := cmd.Flag("store.index-header-posting-offsets-in-mem-sampling", "Controls what is the ratio of postings offsets store will hold in memory. "+
"Larger value will keep less offsets, which will increase CPU cycles needed for query touching those postings. It's meant for setups that want low baseline memory pressure and where less traffic is expected. "+
"On the contrary, smaller value will increase baseline memory usage, but improve latency slightly. 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance. This works only when --store.disable-index-header is NOT specified.").
Hidden().Default(fmt.Sprintf("%v", store.DefaultPostingOffsetInMemorySampling)).Int()

enablePostingsCompression := cmd.Flag("experimental.enable-index-cache-postings-compression", "If true, Store Gateway will reencode and compress postings before storing them into cache. Compressed postings take about 10% of the original size.").
Hidden().Default("false").Bool()

Expand Down Expand Up @@ -142,6 +148,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
time.Duration(*ignoreDeletionMarksDelay),
*webExternalPrefix,
*webPrefixHeaderName,
*postingOffsetsInMemSampling,
)
}
}
Expand Down Expand Up @@ -171,6 +178,7 @@ func runStore(
consistencyDelay time.Duration,
ignoreDeletionMarksDelay time.Duration,
externalPrefix, prefixHeader string,
postingOffsetsInMemSampling int,
) error {
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
Expand Down Expand Up @@ -275,6 +283,7 @@ func runStore(
advertiseCompatibilityLabel,
!disableIndexHeader,
enablePostingsCompression,
postingOffsetsInMemSampling,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
Expand Down
46 changes: 26 additions & 20 deletions pkg/block/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ const (
// MagicIndex are 4 bytes at the head of an index-header file.
MagicIndex = 0xBAAAD792

symbolFactor = 32

postingLengthFieldSize = 4
)

Expand Down Expand Up @@ -428,11 +426,12 @@ type BinaryReader struct {
c io.Closer

// Map of LabelName to a list of some LabelValues's position in the offset table.
// The first and last values for each name are always present.
// The first and last values for each name are always present, we keep only 1/postingOffsetsInMemSampling of the rest.
postings map[string]*postingValueOffsets
// For the v1 format, labelname -> labelvalue -> offset.
postingsV1 map[string]map[string]index.Range

// Symbols struct that keeps only 1/postingOffsetsInMemSampling in the memory, then looks up the rest via mmap.
symbols *index.Symbols
nameSymbols map[uint32]string // Cache of the label name symbol lookups,
// as there are not many and they are half of all lookups.
Expand All @@ -442,12 +441,14 @@ type BinaryReader struct {
version int
indexVersion int
indexLastPostingEnd int64

postingOffsetsInMemSampling int
}

// NewBinaryReader loads or builds new index-header if not present on disk.
func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*BinaryReader, error) {
func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (*BinaryReader, error) {
binfn := filepath.Join(dir, id.String(), block.IndexHeaderFilename)
br, err := newFileBinaryReader(binfn)
br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling)
if err == nil {
return br, nil
}
Expand All @@ -460,11 +461,10 @@ func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.Bucket
}

level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start))

return newFileBinaryReader(binfn)
return newFileBinaryReader(binfn, postingOffsetsInMemSampling)
}

func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *BinaryReader, err error) {
f, err := fileutil.OpenMmapFile(path)
if err != nil {
return nil, err
Expand All @@ -476,9 +476,10 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
}()

r := &BinaryReader{
b: realByteSlice(f.Bytes()),
c: f,
postings: map[string]*postingValueOffsets{},
b: realByteSlice(f.Bytes()),
c: f,
postings: map[string]*postingValueOffsets{},
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
}

// Verify header.
Expand All @@ -502,6 +503,7 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
return nil, errors.Wrap(err, "read index header TOC")
}

// TODO(bwplotka): Consider contributing to Prometheus to allow specifying custom number for symbolsFactor.
r.symbols, err = index.NewSymbols(r.b, r.indexVersion, int(r.toc.Symbols))
if err != nil {
return nil, errors.Wrap(err, "read symbols")
Expand Down Expand Up @@ -551,11 +553,12 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
}

if _, ok := r.postings[key[0]]; !ok {
// Next label name.
// Not seen before label name.
r.postings[key[0]] = &postingValueOffsets{}
if lastKey != nil {
if valueCount%symbolFactor != 0 {
// Always include last value for each label name.
// Always include last value for each label name, unless it was just added in previous iteration based
// on valueCount.
if (valueCount-1)%postingOffsetsInMemSampling != 0 {
r.postings[lastKey[0]].offsets = append(r.postings[lastKey[0]].offsets, postingOffset{value: lastKey[1], tableOff: lastTableOff})
}
r.postings[lastKey[0]].lastValOffset = int64(off - crc32.Size)
Expand All @@ -565,21 +568,24 @@ func newFileBinaryReader(path string) (bw *BinaryReader, err error) {
}

lastKey = key
if valueCount%symbolFactor == 0 {
lastTableOff = tableOff
valueCount++

if (valueCount-1)%postingOffsetsInMemSampling == 0 {
r.postings[key[0]].offsets = append(r.postings[key[0]].offsets, postingOffset{value: key[1], tableOff: tableOff})
return nil
}

lastTableOff = tableOff
valueCount++
return nil
}); err != nil {
return nil, errors.Wrap(err, "read postings table")
}
if lastKey != nil {
if valueCount%symbolFactor != 0 {
if (valueCount-1)%postingOffsetsInMemSampling != 0 {
// Always include last value for each label name if not included already based on valueCount.
r.postings[lastKey[0]].offsets = append(r.postings[lastKey[0]].offsets, postingOffset{value: lastKey[1], tableOff: lastTableOff})
}
// In any case lastValOffset is unknown as don't have next posting anymore. Guess from TOC table.
// In worst case we will overfetch a few bytes.
r.postings[lastKey[0]].lastValOffset = r.indexLastPostingEnd - crc32.Size
}
// Trim any extra space in the slices.
Expand Down Expand Up @@ -787,7 +793,7 @@ func (r BinaryReader) LabelValues(name string) ([]string, error) {
if len(e.offsets) == 0 {
return nil, nil
}
values := make([]string, 0, len(e.offsets)*symbolFactor)
values := make([]string, 0, len(e.offsets)*r.postingOffsetsInMemSampling)

d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsOffsetTable), nil)
d.Skip(e.offsets[0].tableOff)
Expand Down
46 changes: 38 additions & 8 deletions pkg/block/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ func TestReaders(t *testing.T) {
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "a", Value: "5"}},
{{Name: "a", Value: "6"}},
{{Name: "a", Value: "7"}},
{{Name: "a", Value: "8"}},
{{Name: "a", Value: "9"}},
// Missing 10 on purpose.
{{Name: "a", Value: "11"}},
{{Name: "a", Value: "12"}},
{{Name: "a", Value: "13"}},
{{Name: "a", Value: "1"}, {Name: "longer-string", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124)
testutil.Ok(t, err)
Expand Down Expand Up @@ -92,18 +101,37 @@ func TestReaders(t *testing.T) {
fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename)
testutil.Ok(t, WriteBinary(ctx, bkt, id, fn))

br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id)
br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3)
testutil.Ok(t, err)

defer func() { testutil.Ok(t, br.Close()) }()

if id == id1 {
testutil.Equals(t, 1, br.version)
testutil.Equals(t, 2, br.indexVersion)
testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 50}, br.toc)
testutil.Equals(t, int64(330), br.indexLastPostingEnd)
testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 69}, br.toc)
testutil.Equals(t, int64(666), br.indexLastPostingEnd)
testutil.Equals(t, 8, br.symbols.Size())
testutil.Equals(t, 3, len(br.postings))
testutil.Equals(t, map[string]*postingValueOffsets{
"": {
offsets: []postingOffset{{value: "", tableOff: 4}},
lastValOffset: 416,
},
"a": {
offsets: []postingOffset{
{value: "1", tableOff: 9},
{value: "13", tableOff: 32},
{value: "4", tableOff: 54},
{value: "7", tableOff: 75},
{value: "9", tableOff: 89},
},
lastValOffset: 612,
},
"longer-string": {
offsets: []postingOffset{{value: "1", tableOff: 96}},
lastValOffset: 662,
},
}, br.postings)
testutil.Equals(t, 0, len(br.postingsV1))
testutil.Equals(t, 2, len(br.nameSymbols))
}
Expand All @@ -121,9 +149,9 @@ func TestReaders(t *testing.T) {
defer func() { testutil.Ok(t, jr.Close()) }()

if id == id1 {
testutil.Equals(t, 6, len(jr.symbols))
testutil.Equals(t, 14, len(jr.symbols))
testutil.Equals(t, 2, len(jr.lvals))
testutil.Equals(t, 6, len(jr.postings))
testutil.Equals(t, 14, len(jr.postings))
}

compareIndexToHeader(t, b, jr)
Expand Down Expand Up @@ -224,12 +252,14 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].Start, ptr.Start)
testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].End, ptr.End)

// Check not existing.
vals, err := indexReader.LabelValues("not-existing")
testutil.Ok(t, err)
testutil.Equals(t, []string(nil), vals)

_, err = headerReader.PostingsOffset("not-existing", "1")
testutil.NotOk(t, err)
_, err = headerReader.PostingsOffset("a", "10")
testutil.NotOk(t, err)
}

func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *metadata.Meta {
Expand Down Expand Up @@ -392,7 +422,7 @@ func BenchmarkBinaryReader(t *testing.B) {

t.ResetTimer()
for i := 0; i < t.N; i++ {
br, err := newFileBinaryReader(fn)
br, err := newFileBinaryReader(fn, 32)
testutil.Ok(t, err)
testutil.Ok(t, br.Close())
}
Expand Down
28 changes: 17 additions & 11 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,8 @@ const (
// because you barely get any improvements in compression when the number of samples is beyond this.
// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
maxSamplesPerChunk = 120

maxChunkSize = 16000

maxSeriesSize = 64 * 1024
maxChunkSize = 16000
maxSeriesSize = 64 * 1024

// CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility
// with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels.
Expand All @@ -76,6 +74,11 @@ const (
// TODO(bwplotka): Remove it at some point.
CompatibilityTypeLabelName = "@thanos_compatibility_store_type"

// DefaultPostingOffsetInMemorySampling represents default value for --store.index-header-posting-offsets-in-mem-sampling.
// 32 value is chosen as it's a good balance for common setups. Sampling that is not too large (too many CPU cycles) and
// not too small (too much memory).
DefaultPostingOffsetInMemorySampling = 32

partitionerMaxGapSize = 512 * 1024
)

Expand Down Expand Up @@ -252,7 +255,8 @@ type BucketStore struct {
// Reencode postings using diff+varint+snappy when storing to cache.
// This makes them smaller, but takes extra CPU and memory.
// When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller.
enablePostingsCompression bool
enablePostingsCompression bool
postingOffsetsInMemSampling int
}

// NewBucketStore creates a new bucket backed store that implements the store API against
Expand All @@ -273,6 +277,7 @@ func NewBucketStore(
enableCompatibilityLabel bool,
enableIndexHeader bool,
enablePostingsCompression bool,
postingOffsetsInMemSampling int,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand Down Expand Up @@ -304,11 +309,12 @@ func NewBucketStore(
maxConcurrent,
extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg),
),
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enableIndexHeader: enableIndexHeader,
enablePostingsCompression: enablePostingsCompression,
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
enableIndexHeader: enableIndexHeader,
enablePostingsCompression: enablePostingsCompression,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
}
s.metrics = metrics

Expand Down Expand Up @@ -463,7 +469,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er

var indexHeaderReader indexheader.Reader
if s.enableIndexHeader {
indexHeaderReader, err = indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID)
indexHeaderReader, err = indexheader.NewBinaryReader(ctx, s.logger, s.bkt, s.dir, meta.ULID, s.postingOffsetsInMemSampling)
if err != nil {
return errors.Wrap(err, "create index header reader")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
true,
true,
true,
DefaultPostingOffsetInMemorySampling,
)
testutil.Ok(t, err)
s.store = store
Expand Down
Loading