Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
fix deleting from mixed leaf/branch nodes from index
Browse files Browse the repository at this point in the history
- the index delete code was still assuming that nodes could
only be brancheds or leafs and not both.
- add unit tests to verify correct behaviour
  • Loading branch information
woodsaj committed Feb 27, 2017
1 parent 6f62aac commit 7724b99
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 10 deletions.
22 changes: 12 additions & 10 deletions idx/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (m *MemoryIdx) Delete(orgId int, pattern string) ([]schema.MetricDefinition

for _, f := range found {
deleted := m.delete(orgId, f)
statMetricsActive.Dec()
statMetricsActive.DecUint32(uint32(len(deleted)))
deletedDefs = append(deletedDefs, deleted...)
}
statDeleteDuration.Value(time.Since(pre))
Expand All @@ -501,10 +501,10 @@ func (m *MemoryIdx) Delete(orgId int, pattern string) ([]schema.MetricDefinition

func (m *MemoryIdx) delete(orgId int, n *Node) []schema.MetricDefinition {
tree := m.Tree[orgId]
deletedDefs := make([]schema.MetricDefinition, 0)
if n.HasChildren() {
log.Debug("memory-idx: deleting branch %s", n.Path)
// walk up the tree to find all leaf nodes and delete them.
deletedDefs := make([]schema.MetricDefinition, 0)
for _, child := range n.Children {
node, ok := tree.Items[n.Path+"."+child]
if !ok {
Expand All @@ -515,17 +515,16 @@ func (m *MemoryIdx) delete(orgId int, n *Node) []schema.MetricDefinition {
deleted := m.delete(orgId, node)
deletedDefs = append(deletedDefs, deleted...)
}
return deletedDefs
}
deletedDefs := make([]schema.MetricDefinition, len(n.Defs))

// delete the metricDefs
for i, id := range n.Defs {
for _, id := range n.Defs {
log.Debug("memory-idx: deleting %s from index", id)
deletedDefs[i] = *m.DefById[id]
deletedDefs = append(deletedDefs, *m.DefById[id])
delete(m.DefById, id)
}

// delete the leaf.
// delete the node.
delete(tree.Items, n.Path)

// delete from the branches
Expand Down Expand Up @@ -558,10 +557,13 @@ func (m *MemoryIdx) delete(orgId int, n *Node) []schema.MetricDefinition {
log.Error(3, "memory-idx: %s not in children list for branch %s. Index is corrupt", nodes[i], branch)
break
}
if !bNode.Leaf() {
log.Debug("memory-idx: branch %s has no children and is not a leaf node, deleting it.", branch)
delete(tree.Items, branch)
bNode.Children = make([]string, 0)
if bNode.Leaf() {
log.Debug("memory-idx: branch %s is also a leaf node, keeping it.", branch)
break
}
log.Debug("memory-idx: branch %s has no children and is not a leaf node, deleting it.", branch)
delete(tree.Items, branch)
}

return deletedDefs
Expand Down
83 changes: 83 additions & 0 deletions idx/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,89 @@ func TestMixedBranchLeaf(t *testing.T) {
})
}

func TestMixedBranchLeafDelete(t *testing.T) {
ix := New()
ix.Init()
series := []*schema.MetricData{
{
Name: fmt.Sprintf("a.b.c"),
Metric: fmt.Sprintf("a.b.c"),
OrgId: 1,
Interval: 10,
},
{
Name: fmt.Sprintf("a.b.c.d"),
Metric: fmt.Sprintf("a.b.c.d"),
OrgId: 1,
Interval: 10,
},
{
Name: fmt.Sprintf("a.b.c2"),
Metric: fmt.Sprintf("a.b.c2"),
OrgId: 1,
Interval: 10,
},
{
Name: fmt.Sprintf("a.b.c2.d.e"),
Metric: fmt.Sprintf("a.b.c2.d.e"),
OrgId: 1,
Interval: 10,
},
{
Name: fmt.Sprintf("a.b.c2.d2.e"),
Metric: fmt.Sprintf("a.b.c2.d2.e"),
OrgId: 1,
Interval: 10,
},
}
for _, s := range series {
s.SetId()
ix.AddOrUpdate(s, 1)
}
Convey("when deleting mixed leaf/branch", t, func() {
defs, err := ix.Delete(1, "a.b.c")
So(err, ShouldBeNil)
So(defs, ShouldHaveLength, 2)
deletedIds := make([]string, len(defs))
for i, d := range defs {
deletedIds[i] = d.Id
}
So(series[0].Id, ShouldBeIn, deletedIds)
So(series[1].Id, ShouldBeIn, deletedIds)
Convey("series should not be present in the metricDef index", func() {
_, ok := ix.Get(series[0].Id)
So(ok, ShouldEqual, false)
Convey("series should not be present in searches", func() {
found, err := ix.Find(1, "a.b.c", 0)
So(err, ShouldBeNil)
So(found, ShouldHaveLength, 0)
found, err = ix.Find(1, "a.b.c.d", 0)
So(err, ShouldBeNil)
So(found, ShouldHaveLength, 0)
})
})
})
Convey("when deleting from branch", t, func() {
defs, err := ix.Delete(1, "a.b.c2.d.*")
So(err, ShouldBeNil)
So(defs, ShouldHaveLength, 1)
So(defs[0].Id, ShouldEqual, series[3].Id)

Convey("deleted series should not be present in the metricDef index", func() {
_, ok := ix.Get(series[3].Id)
So(ok, ShouldEqual, false)
Convey("deleted series should not be present in searches", func() {
found, err := ix.Find(1, "a.b.c2.*", 0)
So(err, ShouldBeNil)
So(found, ShouldHaveLength, 1)
found, err = ix.Find(1, "a.b.c2.d", 0)
So(err, ShouldBeNil)
So(found, ShouldHaveLength, 0)
})
})
})
}

func TestPrune(t *testing.T) {
ix := New()
ix.Init()
Expand Down

0 comments on commit 7724b99

Please sign in to comment.