From 7724b9934d771b79f8ed55cc5dbf1b4be3c6b01d Mon Sep 17 00:00:00 2001 From: woodsaj Date: Mon, 27 Feb 2017 12:40:16 +0800 Subject: [PATCH] fix deleting from mixed leaf/branch nodes from index - the index delete code was still assuming that nodes could only be brancheds or leafs and not both. - add unit tests to verify correct behaviour --- idx/memory/memory.go | 22 ++++++----- idx/memory/memory_test.go | 83 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 10 deletions(-) diff --git a/idx/memory/memory.go b/idx/memory/memory.go index 3c1a484242..76c7f50e94 100644 --- a/idx/memory/memory.go +++ b/idx/memory/memory.go @@ -492,7 +492,7 @@ func (m *MemoryIdx) Delete(orgId int, pattern string) ([]schema.MetricDefinition for _, f := range found { deleted := m.delete(orgId, f) - statMetricsActive.Dec() + statMetricsActive.DecUint32(uint32(len(deleted))) deletedDefs = append(deletedDefs, deleted...) } statDeleteDuration.Value(time.Since(pre)) @@ -501,10 +501,10 @@ func (m *MemoryIdx) Delete(orgId int, pattern string) ([]schema.MetricDefinition func (m *MemoryIdx) delete(orgId int, n *Node) []schema.MetricDefinition { tree := m.Tree[orgId] + deletedDefs := make([]schema.MetricDefinition, 0) if n.HasChildren() { log.Debug("memory-idx: deleting branch %s", n.Path) // walk up the tree to find all leaf nodes and delete them. - deletedDefs := make([]schema.MetricDefinition, 0) for _, child := range n.Children { node, ok := tree.Items[n.Path+"."+child] if !ok { @@ -515,17 +515,16 @@ func (m *MemoryIdx) delete(orgId int, n *Node) []schema.MetricDefinition { deleted := m.delete(orgId, node) deletedDefs = append(deletedDefs, deleted...) } - return deletedDefs } - deletedDefs := make([]schema.MetricDefinition, len(n.Defs)) + // delete the metricDefs - for i, id := range n.Defs { + for _, id := range n.Defs { log.Debug("memory-idx: deleting %s from index", id) - deletedDefs[i] = *m.DefById[id] + deletedDefs = append(deletedDefs, *m.DefById[id]) delete(m.DefById, id) } - // delete the leaf. + // delete the node. delete(tree.Items, n.Path) // delete from the branches @@ -558,10 +557,13 @@ func (m *MemoryIdx) delete(orgId int, n *Node) []schema.MetricDefinition { log.Error(3, "memory-idx: %s not in children list for branch %s. Index is corrupt", nodes[i], branch) break } - if !bNode.Leaf() { - log.Debug("memory-idx: branch %s has no children and is not a leaf node, deleting it.", branch) - delete(tree.Items, branch) + bNode.Children = make([]string, 0) + if bNode.Leaf() { + log.Debug("memory-idx: branch %s is also a leaf node, keeping it.", branch) + break } + log.Debug("memory-idx: branch %s has no children and is not a leaf node, deleting it.", branch) + delete(tree.Items, branch) } return deletedDefs diff --git a/idx/memory/memory_test.go b/idx/memory/memory_test.go index ca7c291105..c0d1426f10 100644 --- a/idx/memory/memory_test.go +++ b/idx/memory/memory_test.go @@ -330,6 +330,89 @@ func TestMixedBranchLeaf(t *testing.T) { }) } +func TestMixedBranchLeafDelete(t *testing.T) { + ix := New() + ix.Init() + series := []*schema.MetricData{ + { + Name: fmt.Sprintf("a.b.c"), + Metric: fmt.Sprintf("a.b.c"), + OrgId: 1, + Interval: 10, + }, + { + Name: fmt.Sprintf("a.b.c.d"), + Metric: fmt.Sprintf("a.b.c.d"), + OrgId: 1, + Interval: 10, + }, + { + Name: fmt.Sprintf("a.b.c2"), + Metric: fmt.Sprintf("a.b.c2"), + OrgId: 1, + Interval: 10, + }, + { + Name: fmt.Sprintf("a.b.c2.d.e"), + Metric: fmt.Sprintf("a.b.c2.d.e"), + OrgId: 1, + Interval: 10, + }, + { + Name: fmt.Sprintf("a.b.c2.d2.e"), + Metric: fmt.Sprintf("a.b.c2.d2.e"), + OrgId: 1, + Interval: 10, + }, + } + for _, s := range series { + s.SetId() + ix.AddOrUpdate(s, 1) + } + Convey("when deleting mixed leaf/branch", t, func() { + defs, err := ix.Delete(1, "a.b.c") + So(err, ShouldBeNil) + So(defs, ShouldHaveLength, 2) + deletedIds := make([]string, len(defs)) + for i, d := range defs { + deletedIds[i] = d.Id + } + So(series[0].Id, ShouldBeIn, deletedIds) + So(series[1].Id, ShouldBeIn, deletedIds) + Convey("series should not be present in the metricDef index", func() { + _, ok := ix.Get(series[0].Id) + So(ok, ShouldEqual, false) + Convey("series should not be present in searches", func() { + found, err := ix.Find(1, "a.b.c", 0) + So(err, ShouldBeNil) + So(found, ShouldHaveLength, 0) + found, err = ix.Find(1, "a.b.c.d", 0) + So(err, ShouldBeNil) + So(found, ShouldHaveLength, 0) + }) + }) + }) + Convey("when deleting from branch", t, func() { + defs, err := ix.Delete(1, "a.b.c2.d.*") + So(err, ShouldBeNil) + So(defs, ShouldHaveLength, 1) + So(defs[0].Id, ShouldEqual, series[3].Id) + + Convey("deleted series should not be present in the metricDef index", func() { + _, ok := ix.Get(series[3].Id) + So(ok, ShouldEqual, false) + Convey("deleted series should not be present in searches", func() { + found, err := ix.Find(1, "a.b.c2.*", 0) + So(err, ShouldBeNil) + So(found, ShouldHaveLength, 1) + found, err = ix.Find(1, "a.b.c2.d", 0) + So(err, ShouldBeNil) + So(found, ShouldHaveLength, 0) + }) + }) + }) +} + func TestPrune(t *testing.T) { ix := New() ix.Init()