Skip to content

Commit

Permalink
make smartMerger.Merge merge reports in parallel
Browse files Browse the repository at this point in the history
for reduced latency
  • Loading branch information
rade committed Aug 24, 2016
1 parent 32edfc9 commit 902ba88
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 104 deletions.
99 changes: 18 additions & 81 deletions app/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ package app

import (
"fmt"
"math"
"sort"

"github.com/bluele/gcache"
"github.com/spaolacci/murmur3"

"github.com/weaveworks/scope/report"
Expand Down Expand Up @@ -34,91 +31,31 @@ func (dumbMerger) Merge(reports []report.Report) report.Report {
return rpt
}

// smartMerger is the Merger implementation handed out by NewSmartMerger.
type smartMerger struct {
// cache is a gcache.Cache held by the merger.
cache gcache.Cache
}
// smartMerger is the Merger implementation handed out by NewSmartMerger.
// It has no fields, so it carries no state between Merge calls.
type smartMerger struct{}

// NewSmartMerger makes a Merger which merges together reports as
// a binary tree of reports. The speedup comes from the fact that
// most merges are between small reports.
// NewSmartMerger makes a Merger which merges reports in
// parallel. The speedup comes from the fact that a) most merges are
// between small reports, and b) we take advantage of available cores.
func NewSmartMerger() Merger {
	// No setup is performed: the zero value of smartMerger is returned.
	var merger smartMerger
	return merger
}

// node pairs a report with a numeric id. Slices of *node are ordered
// by that id through the byID sort adapter.
type node struct {
id uint64 // key compared by byID.Less
rpt report.Report // the report carried by this node
}

// byID adapts a slice of *node to sort.Interface, ordering the
// elements by ascending id.
type byID []*node

// Len reports how many nodes the slice holds.
func (b byID) Len() int {
	return len(b)
}

// Swap exchanges the nodes stored at positions i and j.
func (b byID) Swap(i, j int) {
	tmp := b[i]
	b[i] = b[j]
	b[j] = tmp
}

// Less reports whether the node at position i has a strictly smaller
// id than the node at position j.
func (b byID) Less(i, j int) bool {
	left, right := b[i], b[j]
	return left.id < right.id
}

func hash(ids ...string) uint64 {
id := murmur3.New64()
for _, i := range ids {
id.Write([]byte(i))
func (smartMerger) Merge(reports []report.Report) report.Report {
l := len(reports)
switch l {
case 0:
return report.MakeReport()
case 1:
return reports[0]
}
return id.Sum64()
}

func (s smartMerger) Merge(reports []report.Report) report.Report {
// Start with a sorted list of leaves.
// Note we must dedupe reports with the same ID to ensure the
// algorithm below doesn't go into an infinite loop. This is
// fine as reports with the same ID are assumed to be the same.
nodes := []*node{}
seen := map[uint64]struct{}{}
c := make(chan report.Report, l)
for _, r := range reports {
id := hash(r.ID)
if _, ok := seen[id]; ok {
continue
}
seen[id] = struct{}{}
nodes = append(nodes, &node{
id: id,
rpt: r,
})
}
sort.Sort(byID(nodes))

// Define how to merge two nodes together. The result of merging
// two reports is cached.
merge := func(left, right *node) *node {
return &node{
id: hash(left.rpt.ID, right.rpt.ID),
rpt: report.MakeReport().Merge(left.rpt).Merge(right.rpt),
}
c <- r
}

// Define how to reduce n nodes to 1.
// Min and max are both inclusive!
var reduce func(min, max uint64, nodes []*node) *node
reduce = func(min, max uint64, nodes []*node) *node {
switch len(nodes) {
case 0:
return &node{rpt: report.MakeReport()}
case 1:
return nodes[0]
case 2:
return merge(nodes[0], nodes[1])
}

partition := min + ((max - min) / 2)
index := sort.Search(len(nodes), func(i int) bool {
return nodes[i].id > partition
})
if index == len(nodes) {
return reduce(min, partition, nodes)
} else if index == 0 {
return reduce(partition+1, max, nodes)
}
left := reduce(min, partition, nodes[:index])
right := reduce(partition+1, max, nodes[index:])
return merge(left, right)
for ; l > 1; l-- {
go func(left, right report.Report) {
c <- left.Merge(right)
}(<-c, <-c)
}

return reduce(0, math.MaxUint64, nodes).rpt
return <-c
}
23 changes: 0 additions & 23 deletions app/merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,6 @@ func TestMerger(t *testing.T) {
}
}

// TestSmartMerger hands the smart merger three reports that all carry
// the same ID and compares the outcome with a report that contains
// only the "foo" endpoint node.
func TestSmartMerger(t *testing.T) {
	// Each report gets its own endpoint node but the _SAME ID_.
	reports := make([]report.Report, 0, 3)
	for _, nodeID := range []string{"foo", "bar", "baz"} {
		rpt := report.MakeReport()
		rpt.Endpoint.AddNode(report.MakeNode(nodeID))
		rpt.ID = "foo"
		reports = append(reports, rpt)
	}

	want := report.MakeReport()
	want.Endpoint.AddNode(report.MakeNode("foo"))

	have := app.NewSmartMerger().Merge(reports)
	if !reflect.DeepEqual(have, want) {
		t.Errorf("Bad merge: %s", test.Diff(have, want))
	}
}

func BenchmarkSmartMerger(b *testing.B) {
benchmarkMerger(b, app.NewSmartMerger())
}
Expand Down

0 comments on commit 902ba88

Please sign in to comment.