From 910b7268763c74849f1574cbc305c7a53b358b49 Mon Sep 17 00:00:00 2001 From: Patrick Hemmer Date: Thu, 7 Jan 2021 11:39:12 -0500 Subject: [PATCH] Optimize SeriesGrouper & aggregators.merge (#8391) The previous implementation of SeriesGrouper required breaking a metric object apart into its constituents, converting tags and keys into unoptimized maps, only to have it put them back together into another metric object. This resulted in a significant performance overhead. This overhead was further compounded when the number of fields was large. This change adds a new AddMetric method to SeriesGrouper which preserves the metric object and removes the back-and-forth conversion. Additionlly the method used for calculating the metric's hash was switched to use maphash, which is optimized for this case. ---- Benchmarks Before: BenchmarkMergeOne-16 106012 11790 ns/op BenchmarkMergeTwo-16 48529 24819 ns/op BenchmarkGroupID-16 780018 1608 ns/op After: BenchmarkMergeOne-16 907093 1173 ns/op BenchmarkMergeTwo-16 508321 2168 ns/op BenchmarkGroupID-16 11217788 99.4 ns/op --- metric/series_grouper.go | 71 +++++++++++++++++-------- metric/series_grouper_test.go | 37 +++++++++++++ plugins/aggregators/merge/merge.go | 8 +-- plugins/aggregators/merge/merge_test.go | 69 +++++++++++++++++++++++- 4 files changed, 154 insertions(+), 31 deletions(-) create mode 100644 metric/series_grouper_test.go diff --git a/metric/series_grouper.go b/metric/series_grouper.go index 5dc66e11b8e00..c6ba23793d478 100644 --- a/metric/series_grouper.go +++ b/metric/series_grouper.go @@ -1,10 +1,9 @@ package metric import ( - "hash/fnv" - "io" + "encoding/binary" + "hash/maphash" "sort" - "strconv" "time" "github.com/influxdata/telegraf" @@ -23,14 +22,17 @@ import ( // + cpu,host=localhost idle_time=42,usage_time=42 func NewSeriesGrouper() *SeriesGrouper { return &SeriesGrouper{ - metrics: make(map[uint64]telegraf.Metric), - ordered: []telegraf.Metric{}, + metrics: make(map[uint64]telegraf.Metric), + ordered: []telegraf.Metric{}, + hashSeed: maphash.MakeSeed(), } } type SeriesGrouper struct { metrics map[uint64]telegraf.Metric ordered []telegraf.Metric + + hashSeed maphash.Seed } // Add adds a field key and value to the series. @@ -41,8 +43,15 @@ func (g *SeriesGrouper) Add( field string, fieldValue interface{}, ) error { + taglist := make([]*telegraf.Tag, 0, len(tags)) + for k, v := range tags { + taglist = append(taglist, + &telegraf.Tag{Key: k, Value: v}) + } + sort.Slice(taglist, func(i, j int) bool { return taglist[i].Key < taglist[j].Key }) + var err error - id := groupID(measurement, tags, tm) + id := groupID(g.hashSeed, measurement, taglist, tm) metric := g.metrics[id] if metric == nil { metric, err = New(measurement, tags, map[string]interface{}{field: fieldValue}, tm) @@ -57,30 +66,46 @@ func (g *SeriesGrouper) Add( return nil } +// AddMetric adds a metric to the series, merging with any previous matching metrics. +func (g *SeriesGrouper) AddMetric( + metric telegraf.Metric, +) { + id := groupID(g.hashSeed, metric.Name(), metric.TagList(), metric.Time()) + m := g.metrics[id] + if m == nil { + m = metric.Copy() + g.metrics[id] = m + g.ordered = append(g.ordered, m) + } else { + for _, f := range metric.FieldList() { + m.AddField(f.Key, f.Value) + } + } +} + // Metrics returns the metrics grouped by series and time. func (g *SeriesGrouper) Metrics() []telegraf.Metric { return g.ordered } -func groupID(measurement string, tags map[string]string, tm time.Time) uint64 { - h := fnv.New64a() - h.Write([]byte(measurement)) - h.Write([]byte("\n")) +func groupID(seed maphash.Seed, measurement string, taglist []*telegraf.Tag, tm time.Time) uint64 { + var mh maphash.Hash + mh.SetSeed(seed) + + mh.WriteString(measurement) + mh.WriteByte(0) - taglist := make([]*telegraf.Tag, 0, len(tags)) - for k, v := range tags { - taglist = append(taglist, - &telegraf.Tag{Key: k, Value: v}) - } - sort.Slice(taglist, func(i, j int) bool { return taglist[i].Key < taglist[j].Key }) for _, tag := range taglist { - h.Write([]byte(tag.Key)) - h.Write([]byte("\n")) - h.Write([]byte(tag.Value)) - h.Write([]byte("\n")) + mh.WriteString(tag.Key) + mh.WriteByte(0) + mh.WriteString(tag.Value) + mh.WriteByte(0) } - h.Write([]byte("\n")) + mh.WriteByte(0) + + var tsBuf [8]byte + binary.BigEndian.PutUint64(tsBuf[:], uint64(tm.UnixNano())) + mh.Write(tsBuf[:]) - io.WriteString(h, strconv.FormatInt(tm.UnixNano(), 10)) - return h.Sum64() + return mh.Sum64() } diff --git a/metric/series_grouper_test.go b/metric/series_grouper_test.go new file mode 100644 index 0000000000000..32fbecb6e41b2 --- /dev/null +++ b/metric/series_grouper_test.go @@ -0,0 +1,37 @@ +package metric + +import ( + "hash/maphash" + "testing" + "time" +) + +var m, _ = New( + "mymetric", + map[string]string{ + "host": "host.example.com", + "mykey": "myvalue", + "another key": "another value", + }, + map[string]interface{}{ + "f1": 1, + "f2": 2, + "f3": 3, + "f4": 4, + "f5": 5, + "f6": 6, + "f7": 7, + "f8": 8, + }, + time.Now(), +) + +var result uint64 + +var hashSeed = maphash.MakeSeed() + +func BenchmarkGroupID(b *testing.B) { + for n := 0; n < b.N; n++ { + result = groupID(hashSeed, m.Name(), m.TagList(), m.Time()) + } +} diff --git a/plugins/aggregators/merge/merge.go b/plugins/aggregators/merge/merge.go index 083c8fd3e6b0a..35be286d3bc01 100644 --- a/plugins/aggregators/merge/merge.go +++ b/plugins/aggregators/merge/merge.go @@ -36,13 +36,7 @@ func (a *Merge) SampleConfig() string { } func (a *Merge) Add(m telegraf.Metric) { - tags := m.Tags() - for _, field := range m.FieldList() { - err := a.grouper.Add(m.Name(), tags, m.Time(), field.Key, field.Value) - if err != nil { - a.log.Errorf("Error adding metric: %v", err) - } - } + a.grouper.AddMetric(m) } func (a *Merge) Push(acc telegraf.Accumulator) { diff --git a/plugins/aggregators/merge/merge_test.go b/plugins/aggregators/merge/merge_test.go index 2f2703c8f4b7c..552c8618e3482 100644 --- a/plugins/aggregators/merge/merge_test.go +++ b/plugins/aggregators/merge/merge_test.go @@ -4,9 +4,11 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) func TestSimple(t *testing.T) { @@ -184,3 +186,68 @@ func TestReset(t *testing.T) { testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) } + +var m1, _ = metric.New( + "mymetric", + map[string]string{ + "host": "host.example.com", + "mykey": "myvalue", + "another key": "another value", + }, + map[string]interface{}{ + "f1": 1, + "f2": 2, + "f3": 3, + "f4": 4, + "f5": 5, + "f6": 6, + "f7": 7, + "f8": 8, + }, + time.Now(), +) +var m2, _ = metric.New( + "mymetric", + map[string]string{ + "host": "host.example.com", + "mykey": "myvalue", + "another key": "another value", + }, + map[string]interface{}{ + "f8": 8, + "f9": 9, + "f10": 10, + "f11": 11, + "f12": 12, + "f13": 13, + "f14": 14, + "f15": 15, + "f16": 16, + }, + m1.Time(), +) + +func BenchmarkMergeOne(b *testing.B) { + var merger Merge + merger.Init() + var acc testutil.NopAccumulator + + for n := 0; n < b.N; n++ { + merger.Reset() + merger.Add(m1) + merger.Push(&acc) + } +} + +func BenchmarkMergeTwo(b *testing.B) { + var merger Merge + merger.Init() + var acc testutil.NopAccumulator + + for n := 0; n < b.N; n++ { + merger.Reset() + merger.Add(m1) + merger.Add(m2) + merger.Push(&acc) + } +}