Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
fix delete function
Browse files Browse the repository at this point in the history
- when deleting a branch node, we first walk up the tree to delete
  children. As all siblings and the parents are already being deleted,
  there is no need to walk back down the tree removing the children and
  any empty branches.
  • Loading branch information
woodsaj committed Apr 17, 2017
1 parent d7dc925 commit 8fc67f1
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 4 deletions.
12 changes: 8 additions & 4 deletions idx/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,15 +496,15 @@ func (m *MemoryIdx) Delete(orgId int, pattern string) ([]idx.Archive, error) {
}

for _, f := range found {
deleted := m.delete(orgId, f)
deleted := m.delete(orgId, f, true)
statMetricsActive.DecUint32(uint32(len(deleted)))
deletedDefs = append(deletedDefs, deleted...)
}
statDeleteDuration.Value(time.Since(pre))
return deletedDefs, nil
}

func (m *MemoryIdx) delete(orgId int, n *Node) []idx.Archive {
func (m *MemoryIdx) delete(orgId int, n *Node, deleteEmptyParents bool) []idx.Archive {
tree := m.Tree[orgId]
deletedDefs := make([]idx.Archive, 0)
if n.HasChildren() {
Expand All @@ -517,7 +517,7 @@ func (m *MemoryIdx) delete(orgId int, n *Node) []idx.Archive {
continue
}
log.Debug("memory-idx: deleting child %s from branch %s", node.Path, n.Path)
deleted := m.delete(orgId, node)
deleted := m.delete(orgId, node, false)
deletedDefs = append(deletedDefs, deleted...)
}
}
Expand All @@ -532,6 +532,10 @@ func (m *MemoryIdx) delete(orgId int, n *Node) []idx.Archive {
// delete the node.
delete(tree.Items, n.Path)

if !deleteEmptyParents {
return deletedDefs
}

// delete node from the branches
// e.g. for foo.bar.baz
// branch "foo.bar" -> node "baz"
Expand Down Expand Up @@ -621,7 +625,7 @@ func (m *MemoryIdx) Prune(orgId int, oldest time.Time) ([]idx.Archive, error) {
if staleCount == len(n.Defs) {
log.Debug("memory-idx: series %s for orgId:%d is stale. pruning it.", n.Path, org)
//we need to delete this node.
defs := m.delete(org, n)
defs := m.delete(org, n, true)
statMetricsActive.Dec()
pruned = append(pruned, defs...)
}
Expand Down
47 changes: 47 additions & 0 deletions idx/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,30 @@ func TestDelete(t *testing.T) {
})
}

func TestDeleteWithLotsOfSeries(t *testing.T) {
ix := New()
ix.Init()

var data *schema.MetricData
var key string
for i := 1; i <= 100000; i++ {
key = fmt.Sprintf("some.metric.%d.%d", i, i)
data = &schema.MetricData{
Name: key,
Metric: key,
OrgId: 1,
Interval: 10,
}
data.SetId()
ix.AddOrUpdate(data, 1)
}
Convey("when deleting 1 million series", t, func() {
defs, err := ix.Delete(1, "some.*")
So(err, ShouldBeNil)
So(defs, ShouldHaveLength, 100000)
})
}

func TestMixedBranchLeaf(t *testing.T) {
ix := New()
ix.Init()
Expand Down Expand Up @@ -499,3 +523,26 @@ func BenchmarkIndexing(b *testing.B) {
ix.AddOrUpdate(data, 1)
}
}

func BenchmarkDeletes(b *testing.B) {
ix := New()
ix.Init()

var data *schema.MetricData
var key string
for i := 1; i <= b.N; i++ {
key = fmt.Sprintf("some.metric.%d.%d", i, i)
data = &schema.MetricData{
Name: key,
Metric: key,
OrgId: 1,
Interval: 10,
}
data.SetId()
ix.AddOrUpdate(data, 1)
}
b.ReportAllocs()
b.ResetTimer()

ix.Delete(1, "some.*")
}

0 comments on commit 8fc67f1

Please sign in to comment.