Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Initial tag query performance improvements #848

Merged
merged 3 commits into from
Feb 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 14 additions & 38 deletions idx/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ type Tree struct {
Items map[string]*Node // key is the full path of the node.
}

type IdSet map[idx.MetricID]struct{} // set of ids
type IdSet map[string]struct{} // set of ids

func (ids IdSet) String() string {
var res string
for id := range ids {
if len(res) > 0 {
res += " "
}
res += id.String()
res += id
}
return res

Expand All @@ -80,7 +80,7 @@ func (ids IdSet) String() string {
// TagValue maps a single tag value to the set of ids that carry that value.
type TagValue map[string]IdSet // value -> set of ids

// TagIndex maps a tag key to the TagValue holding the values indexed under
// that key, so index[key][value] is the set of ids tagged with key=value.
type TagIndex map[string]TagValue // key -> values (each value -> set of ids)

func (t *TagIndex) addTagId(name, value string, id idx.MetricID) {
func (t *TagIndex) addTagId(name, value string, id string) {
ti := *t
if _, ok := ti[name]; !ok {
ti[name] = make(TagValue)
Expand All @@ -91,7 +91,7 @@ func (t *TagIndex) addTagId(name, value string, id idx.MetricID) {
ti[name][value][id] = struct{}{}
}

func (t *TagIndex) delTagId(name, value string, id idx.MetricID) {
func (t *TagIndex) delTagId(name, value string, id string) {
ti := *t

delete(ti[name][value], id)
Expand Down Expand Up @@ -252,15 +252,6 @@ func (m *MemoryIdx) indexTags(def *schema.MetricDefinition) {
m.tags[def.OrgId] = tags
}

id, err := idx.NewMetricIDFromString(def.Id)
if err != nil {
// should never happen because all IDs in the index must have
// a valid format
invalidId.Inc()
log.Error(3, "memory-idx: ID %q has invalid format", def.Id)
return
}

for _, tag := range def.Tags {
tagSplits := strings.SplitN(tag, "=", 2)
if len(tagSplits) < 2 {
Expand All @@ -273,9 +264,9 @@ func (m *MemoryIdx) indexTags(def *schema.MetricDefinition) {

tagName := tagSplits[0]
tagValue := tagSplits[1]
tags.addTagId(tagName, tagValue, id)
tags.addTagId(tagName, tagValue, def.Id)
}
tags.addTagId("name", def.Name, id)
tags.addTagId("name", def.Name, def.Id)

m.defByTagSet.add(def)
}
Expand All @@ -286,15 +277,6 @@ func (m *MemoryIdx) indexTags(def *schema.MetricDefinition) {
// unsuccessful, "true" means the indexing was at least partially or completely
// successful
func (m *MemoryIdx) deindexTags(tags TagIndex, def *schema.MetricDefinition) bool {
id, err := idx.NewMetricIDFromString(def.Id)
if err != nil {
// should never happen because all IDs in the index must have
// a valid format
invalidId.Inc()
log.Error(3, "memory-idx: ID %q has invalid format", def.Id)
return false
}

for _, tag := range def.Tags {
tagSplits := strings.SplitN(tag, "=", 2)
if len(tagSplits) < 2 {
Expand All @@ -307,10 +289,10 @@ func (m *MemoryIdx) deindexTags(tags TagIndex, def *schema.MetricDefinition) boo

tagName := tagSplits[0]
tagValue := tagSplits[1]
tags.delTagId(tagName, tagValue, id)
tags.delTagId(tagName, tagValue, def.Id)
}

tags.delTagId("name", def.Name, id)
tags.delTagId("name", def.Name, def.Id)

m.defByTagSet.del(def)

Expand Down Expand Up @@ -515,7 +497,7 @@ func (m *MemoryIdx) TagDetails(orgId int, key, filter string, from int64) (map[s
count := uint64(0)
if from > 0 {
for id := range ids {
def, ok := m.defById[id.String()]
def, ok := m.defById[id]
if !ok {
corruptIndex.Inc()
log.Error(3, "memory-idx: corrupt. ID %q is in tag index but not in the byId lookup table", id)
Expand Down Expand Up @@ -670,7 +652,7 @@ func (m *MemoryIdx) FindTagValues(orgId int, tag, prefix string, expressions []s
for id := range ids {
var ok bool
var def *idx.Archive
if def, ok = m.defById[id.String()]; !ok {
if def, ok = m.defById[id]; !ok {
// should never happen because every ID in the tag index
// must be present in the byId lookup table
corruptIndex.Inc()
Expand Down Expand Up @@ -786,7 +768,7 @@ func (m *MemoryIdx) Tags(orgId int, filter string, from int64) ([]string, error)
func (m *MemoryIdx) hasOneMetricFrom(tags TagIndex, tag string, from int64) bool {
for _, ids := range tags[tag] {
for id := range ids {
def, ok := m.defById[id.String()]
def, ok := m.defById[id]
if !ok {
corruptIndex.Inc()
log.Error(3, "memory-idx: corrupt. ID %q is in tag index but not in the byId lookup table", id)
Expand Down Expand Up @@ -820,7 +802,7 @@ func (m *MemoryIdx) FindByTag(orgId int, expressions []string, from int64) ([]id
ids := m.idsByTagQuery(orgId, query)
res := make([]idx.Node, 0, len(ids))
for id := range ids {
def, ok := m.defById[id.String()]
def, ok := m.defById[id]
if !ok {
corruptIndex.Inc()
log.Error(3, "memory-idx: corrupt. ID %q has been given, but it is not in the byId lookup table", id)
Expand Down Expand Up @@ -1071,7 +1053,7 @@ func (m *MemoryIdx) deleteTaggedByIdSet(orgId int, ids IdSet) []idx.Archive {

deletedDefs := make([]idx.Archive, 0, len(ids))
for id := range ids {
idStr := id.String()
idStr := id
def, ok := m.defById[idStr]
if !ok {
// not necessarily a corruption, the id could have been deleted
Expand Down Expand Up @@ -1273,13 +1255,7 @@ DEFS:
}

for def := range defs {
id, err := idx.NewMetricIDFromString(def.Id)
if err != nil {
log.Error(3, "memory-idx: corrupt index. ID format %s seems to be invalid", def.Id)
continue
}

toPruneTagged[def.OrgId][id] = struct{}{}
toPruneTagged[def.OrgId][def.Id] = struct{}{}
}
}
}
Expand Down
5 changes: 0 additions & 5 deletions idx/memory/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ var (
// each time this happens, an error is logged with more details.
corruptIndex = stats.NewCounter32("recovered_errors.idx.memory.corrupt-index")

// metric recovered_errors.idx.memory.invalid-id is how many times
// an invalid metric id is encountered.
// each time this happens, an error is logged with more details.
invalidId = stats.NewCounter32("recovered_errors.idx.memory.invalid-id")

// metric recovered_errors.idx.memory.invalid-tag is how many times
// an invalid tag for a metric is encountered.
// each time this happens, an error is logged with more details.
Expand Down
28 changes: 14 additions & 14 deletions idx/memory/tag_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func NewTagQuery(expressions []string, from int64) (TagQuery, error) {
}

// getInitialByEqual generates the initial resultset by executing the given equal expression
func (q *TagQuery) getInitialByEqual(expr kv, idCh chan idx.MetricID, stopCh chan struct{}) {
func (q *TagQuery) getInitialByEqual(expr kv, idCh chan string, stopCh chan struct{}) {
defer q.wg.Done()

KEYS:
Expand All @@ -349,7 +349,7 @@ KEYS:
}

// getInitialByPrefix generates the initial resultset by executing the given prefix match expression
func (q *TagQuery) getInitialByPrefix(expr kv, idCh chan idx.MetricID, stopCh chan struct{}) {
func (q *TagQuery) getInitialByPrefix(expr kv, idCh chan string, stopCh chan struct{}) {
defer q.wg.Done()

VALUES:
Expand All @@ -371,7 +371,7 @@ VALUES:
}

// getInitialByMatch generates the initial resultset by executing the given match expression
func (q *TagQuery) getInitialByMatch(expr kvRe, idCh chan idx.MetricID, stopCh chan struct{}) {
func (q *TagQuery) getInitialByMatch(expr kvRe, idCh chan string, stopCh chan struct{}) {
defer q.wg.Done()

// shortcut if value == nil.
Expand Down Expand Up @@ -412,7 +412,7 @@ VALUES2:

// getInitialByTagPrefix generates the initial resultset by creating a list of
// metric IDs of which at least one tag starts with the defined prefix
func (q *TagQuery) getInitialByTagPrefix(idCh chan idx.MetricID, stopCh chan struct{}) {
func (q *TagQuery) getInitialByTagPrefix(idCh chan string, stopCh chan struct{}) {
defer q.wg.Done()

TAGS:
Expand All @@ -437,7 +437,7 @@ TAGS:

// getInitialByTagMatch generates the initial resultset by creating a list of
// metric IDs of which at least one tag matches the defined regex
func (q *TagQuery) getInitialByTagMatch(idCh chan idx.MetricID, stopCh chan struct{}) {
func (q *TagQuery) getInitialByTagMatch(idCh chan string, stopCh chan struct{}) {
defer q.wg.Done()

TAGS:
Expand All @@ -461,8 +461,8 @@ TAGS:
// getInitialIds asynchronously collects all IDs of the initial result set. It returns:
// a channel through which the IDs of the initial result set will be sent
// a stop channel, which when closed, will cause it to abort the background worker.
func (q *TagQuery) getInitialIds() (chan idx.MetricID, chan struct{}) {
idCh := make(chan idx.MetricID, 1000)
func (q *TagQuery) getInitialIds() (chan string, chan struct{}) {
idCh := make(chan string, 1000)
stopCh := make(chan struct{})
q.wg.Add(1)

Expand Down Expand Up @@ -492,7 +492,7 @@ func (q *TagQuery) getInitialIds() (chan idx.MetricID, chan struct{}) {
// all required tests in order to decide whether this metric should be part
// of the final result set or not
// in map/reduce terms this is the reduce function
func (q *TagQuery) testByAllExpressions(id idx.MetricID, def *idx.Archive, omitTagFilters bool) bool {
func (q *TagQuery) testByAllExpressions(id string, def *idx.Archive, omitTagFilters bool) bool {
if !q.testByFrom(def) {
return false
}
Expand Down Expand Up @@ -695,7 +695,7 @@ func (q *TagQuery) testByTagPrefix(def *idx.Archive) bool {
}

// testByEqual filters a given metric by the defined "=" expressions
func (q *TagQuery) testByEqual(id idx.MetricID, exprs []kv, not bool) bool {
func (q *TagQuery) testByEqual(id string, exprs []kv, not bool) bool {
for _, e := range exprs {
indexIds := q.index[e.key][e.value]

Expand All @@ -722,12 +722,12 @@ func (q *TagQuery) testByEqual(id idx.MetricID, exprs []kv, not bool) bool {
// required tests to decide whether a metric should be part of the final
// result set or not
// it returns the final result set via the given resCh parameter
func (q *TagQuery) filterIdsFromChan(idCh, resCh chan idx.MetricID) {
func (q *TagQuery) filterIdsFromChan(idCh, resCh chan string) {
for id := range idCh {
var def *idx.Archive
var ok bool

if def, ok = q.byId[id.String()]; !ok {
if def, ok = q.byId[id]; !ok {
// should never happen because every ID in the tag index
// must be present in the byId lookup table
corruptIndex.Inc()
Expand Down Expand Up @@ -779,7 +779,7 @@ func (q *TagQuery) Run(index TagIndex, byId map[string]*idx.Archive) IdSet {
q.sortByCost()

idCh, _ := q.getInitialIds()
resCh := make(chan idx.MetricID)
resCh := make(chan string)

// start the tag query workers. they'll consume the ids on the idCh and
// evaluate for each of them whether it satisfies all the conditions
Expand Down Expand Up @@ -836,7 +836,7 @@ func (q *TagQuery) getMaxTagCount() int {
// according to the criteria associated with this query
// those that pass all the tests will have their relevant tags extracted, which
// are then pushed into the given tag channel
func (q *TagQuery) filterTagsFromChan(idCh chan idx.MetricID, tagCh chan string, stopCh chan struct{}, omitTagFilters bool) {
func (q *TagQuery) filterTagsFromChan(idCh chan string, tagCh chan string, stopCh chan struct{}, omitTagFilters bool) {
// used to prevent this worker thread from pushing the same result into
// the chan twice
resultsCache := make(map[string]struct{})
Expand All @@ -846,7 +846,7 @@ IDS:
var def *idx.Archive
var ok bool

if def, ok = q.byId[id.String()]; !ok {
if def, ok = q.byId[id]; !ok {
// should never happen because every ID in the tag index
// must be present in the byId lookup table
corruptIndex.Inc()
Expand Down
24 changes: 10 additions & 14 deletions idx/memory/tag_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"gopkg.in/raintank/schema.v1"
)

var ids []idx.MetricID
var ids []string

func getTestIDs(t *testing.T) []idx.MetricID {
func getTestIDs(t *testing.T) []string {
if len(ids) > 0 {
return ids
}
Expand All @@ -29,19 +29,15 @@ func getTestIDs(t *testing.T) []idx.MetricID {
"1.72345678901234567890123456789012",
}
for _, idStr := range idStrings {
id, err := idx.NewMetricIDFromString(idStr)
if err != nil {
t.Fatalf("Did not expect an error when converting id string to object: %s", idStr)
}
ids = append(ids, id)
ids = append(ids, idStr)
}

return ids
}

func getTestIndex(t *testing.T) (TagIndex, map[string]*idx.Archive) {
type testCase struct {
id idx.MetricID
id string
lastUpdate int64
tags []string
}
Expand All @@ -63,7 +59,7 @@ func getTestIndex(t *testing.T) (TagIndex, map[string]*idx.Archive) {
byId := make(map[string]*idx.Archive)

for i, d := range data {
idStr := d.id.String()
idStr := d.id
byId[idStr] = &idx.Archive{}
byId[idStr].Name = fmt.Sprintf("metric%d", i)
byId[idStr].Tags = d.tags
Expand Down Expand Up @@ -393,12 +389,12 @@ func TestGetByTag(t *testing.T) {

idString := "1.000000000000000000000000000000%02x"
mds := make([]schema.MetricData, 20)
ids := make([]idx.MetricID, 20)
ids := make([]string, 20)
for i := range mds {
ids[i], _ = idx.NewMetricIDFromString(fmt.Sprintf(idString, i))
ids[i] = fmt.Sprintf(idString, i)
mds[i].Metric = fmt.Sprintf("metric.%d", i)
mds[i].Name = mds[i].Metric
mds[i].Id = ids[i].String()
mds[i].Id = ids[i]
mds[i].OrgId = 1
mds[i].Interval = 1
mds[i].Time = 12345
Expand Down Expand Up @@ -473,9 +469,9 @@ func TestGetByTag(t *testing.T) {

resPaths := make([]string, 0, len(res))
for id := range res {
def, ok := ix.defById[id.String()]
def, ok := ix.defById[id]
if !ok {
t.Fatalf("Tag query returned ID that did not exist in DefByID: %s", id.String())
t.Fatalf("Tag query returned ID that did not exist in DefByID: %s", id)
}
resPaths = append(resPaths, def.NameWithTags())
}
Expand Down