Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: Added missing matcher logic for negative matchers. #839

Merged
merged 1 commit into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 109 additions & 62 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) {
return s, nil
}

// Postings returns postings in expanded list instead of index.Postings.
// ExpandedPostings returns postings in expanded list instead of index.Postings.
// This is because we need to have them buffered anyway to perform efficient lookup
// on object storage.
// Found posting IDs (ps) are not strictly required to point to a valid Series, e.g. during
Expand All @@ -1178,32 +1178,33 @@ func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) {
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
func (r *bucketIndexReader) ExpandedPostings(ms []labels.Matcher) ([]uint64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - Fix comment above Postings > ExpandedPostings

var postingsToIntersect []index.Postings
var postingGroups []*postingGroup

// NOTE: Derived from tsdb.PostingsForMatchers.
for _, m := range ms {
matching, err := matchingLabels(r.LabelValues, m)
if err != nil {
return nil, errors.Wrap(err, "match labels")
}
if len(matching) == 0 {
matchingGroup := toPostingGroup(r.LabelValues, m)
if matchingGroup == nil {
continue
}

// We need to load all matching postings to tell what postings are intersecting with what.
postings, err := r.fetchPostings(matching)
if err != nil {
return nil, errors.Wrap(err, "get postings")
}

postingsToIntersect = append(postingsToIntersect, postings)
// Each group is separate to tell later what postings are intersecting with what.
postingGroups = append(postingGroups, matchingGroup)
}

if len(postingsToIntersect) == 0 {
if len(postingGroups) == 0 {
return nil, nil
}

ps, err := index.ExpandPostings(index.Intersect(postingsToIntersect...))
if err := r.fetchPostings(postingGroups); err != nil {
return nil, errors.Wrap(err, "get postings")
}

var postings []index.Postings
for _, g := range postingGroups {
postings = append(postings, g.Postings())
}

ps, err := index.ExpandPostings(index.Intersect(postings...))
if err != nil {
return nil, errors.Wrap(err, "expand")
}
Expand All @@ -1219,70 +1220,120 @@ func (r *bucketIndexReader) ExpandedPostings(ms []labels.Matcher) ([]uint64, err
return ps, nil
}

// postingGroup keeps the postings fetched for a set of label keys produced by
// a single matcher, together with the aggregation used to combine them into
// one postings stream (e.g. merge for OR semantics, allWithout for negation).
type postingGroup struct {
	// keys are the label name/value pairs whose postings belong to this group.
	keys labels.Labels
	// postings holds one entry per key; filled lazily via Fill, index-aligned with keys.
	postings []index.Postings

	// aggregate combines the per-key postings into the group's final postings.
	aggregate func(postings []index.Postings) index.Postings
}

// newPostingGroup constructs a postingGroup for the given label keys.
// The aggr function defines how the per-key postings are later combined
// by Postings (e.g. merge or allWithout).
func newPostingGroup(keys labels.Labels, aggr func(postings []index.Postings) index.Postings) *postingGroup {
	g := &postingGroup{
		keys:      keys,
		aggregate: aggr,
	}
	// One slot per key; slots are populated later via Fill.
	g.postings = make([]index.Postings, len(keys))
	return g
}

// Fill stores the fetched postings for the key at index i of the group.
func (p *postingGroup) Fill(i int, ps index.Postings) {
	p.postings[i] = ps
}

// Postings combines the group's per-key postings into a single postings
// stream using the aggregation chosen at construction time.
// All slots must have been populated via Fill before calling this.
func (p *postingGroup) Postings() index.Postings {
	return p.aggregate(p.postings)
}

// merge unions all given postings lists into one (OR semantics).
func merge(ps []index.Postings) index.Postings {
	return index.Merge(ps...)
}

// allWithout subtracts the union of p[1:] from p[0], i.e. it keeps entries of
// the first postings list that appear in none of the remaining ones.
// Used for negative matchers, where p[0] is the all-postings list.
func allWithout(p []index.Postings) index.Postings {
	excluded := index.Merge(p[1:]...)
	return index.Without(p[0], excluded)
}

// NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication.
func matchingLabels(lvalsFn func(name string) []string, m labels.Matcher) (labels.Labels, error) {
func toPostingGroup(lvalsFn func(name string) []string, m labels.Matcher) *postingGroup {
var matchingLabels labels.Labels

// If the matcher selects an empty value, it selects all the series which don't
// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
if m.Matches("") {
// We don't support tsdb.postingsForUnsetLabelMatcher.
// This is because it requires fetching all postings for index.
// This requires additional logic to avoid fetching big bytes range (todo: how big?). See https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
// to what it blocks.
return nil, errors.Errorf("support for <> != <val> matcher is not implemented; empty matcher for label name %s", m.Name())
allName, allValue := index.AllPostingsKey()

matchingLabels = append(matchingLabels, labels.Label{Name: allName, Value: allValue})
for _, val := range lvalsFn(m.Name()) {
if !m.Matches(val) {
matchingLabels = append(matchingLabels, labels.Label{Name: m.Name(), Value: val})
}
}

if len(matchingLabels) == 1 {
// This is known hack to return all series.
// Ask for x != <not existing value>. Allow for that as Prometheus does,
// even though it is expensive.
return newPostingGroup(matchingLabels, merge)
}

return newPostingGroup(matchingLabels, allWithout)
}

// Fast-path for equal matching.
if em, ok := m.(*labels.EqualMatcher); ok {
return labels.Labels{{Name: em.Name(), Value: em.Value()}}, nil
return newPostingGroup(labels.Labels{{Name: em.Name(), Value: em.Value()}}, merge)
}

var matchingLabels labels.Labels
for _, val := range lvalsFn(m.Name()) {
if m.Matches(val) {
matchingLabels = append(matchingLabels, labels.Label{Name: m.Name(), Value: val})
}
}

return matchingLabels, nil
}
if len(matchingLabels) == 0 {
return nil
}

type postingPtr struct {
key labels.Label
ptr index.Range
return newPostingGroup(matchingLabels, merge)
}

// fetchPostings returns sorted slice of postings that match the selected labels.
func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) {
var (
ptrs []postingPtr
postings = make([]index.Postings, 0, len(keys))
)

// TODO(bwplotka): sort postings?
// postingPtr remembers where a cache-missed posting list lives in the
// object-store index and which group/key slot it must be filled into
// once fetched.
type postingPtr struct {
	// groupID indexes into the postingGroup slice passed to fetchPostings.
	groupID int
	// keyID indexes the key (and postings slot) within that group.
	keyID int
	// ptr is the byte range of the posting list within the block's index file.
	ptr index.Range
}

// fetchPostings fill postings requested by posting groups.
func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
var ptrs []postingPtr

// Iterate over all groups and fetch posting from cache.
// If we have a miss, mark key to be fetched in `ptrs` slice.
// Overlaps are well handled by partitioner, so we don't need to deduplicate keys.
for i, g := range groups {
for j, key := range g.keys {
// Get postings for the given key from cache first.
if b, ok := r.cache.postings(r.block.meta.ULID, key); ok {
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)

for _, k := range keys {
// Get postings for given key from cache first.
if b, ok := r.cache.postings(r.block.meta.ULID, k); ok {
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)
_, l, err := r.dec.Postings(b)
if err != nil {
return errors.Wrap(err, "decode postings")
}
g.Fill(j, l)
continue
}

_, l, err := r.dec.Postings(b)
if err != nil {
return nil, errors.Wrap(err, "decode postings")
// Cache miss; save pointer for actual posting in index stored in object store.
ptr, ok := r.block.postings[key]
if !ok {
// This block does not have any posting for given key.
g.Fill(j, index.EmptyPostings())
continue
}
postings = append(postings, l)
continue
}

// Cache miss; save pointer for actual posting in index stored in object store.
ptr, ok := r.block.postings[k]
if !ok {
// Index malformed? Should not happen.
continue
ptrs = append(ptrs, postingPtr{ptr: ptr, groupID: i, keyID: j})
}

ptrs = append(ptrs, postingPtr{ptr: ptr, key: k})
}

sort.Slice(ptrs, func(i, j int) bool {
Expand Down Expand Up @@ -1331,8 +1382,8 @@ func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, e
}

// Return postings and fill LRU cache.
postings = append(postings, fetchedPostings)
r.cache.setPostings(r.block.meta.ULID, p.key, c)
groups[p.groupID].Fill(p.keyID, fetchedPostings)
r.cache.setPostings(r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
Expand All @@ -1346,11 +1397,7 @@ func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, e
})
}

if err := g.Run(); err != nil {
return nil, err
}

return index.Merge(postings...), nil
return g.Run()
}

func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
Expand Down
102 changes: 102 additions & 0 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,74 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_RE, Name: "a", Value: "1"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NRE, Name: "a", Value: "2"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NRE, Name: "a", Value: "not_existing"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NRE, Name: "not_existing", Value: "1"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
Expand Down Expand Up @@ -191,6 +259,40 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MaxTime: maxt,
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NEQ, Name: "a", Value: "2"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NEQ, Name: "a", Value: "not_existing"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
} {
t.Log("Run ", i)

Expand Down