diff --git a/src/aggregator/aggregator/entry.go b/src/aggregator/aggregator/entry.go index e77e47477c..9ded32a92a 100644 --- a/src/aggregator/aggregator/entry.go +++ b/src/aggregator/aggregator/entry.go @@ -1076,8 +1076,14 @@ type aggregationValue struct { type aggregationValues []aggregationValue func (vals aggregationValues) index(k aggregationKey) int { - for i, val := range vals { - if val.key.Equal(k) { + // keep in sync with aggregationKey.Equal() + // this is >2x slower if not inlined manually. + for i := range vals { + if vals[i].key.aggregationID == k.aggregationID && + vals[i].key.storagePolicy == k.storagePolicy && + vals[i].key.pipeline.Equal(k.pipeline) && + vals[i].key.numForwardedTimes == k.numForwardedTimes && + vals[i].key.idPrefixSuffixType == k.idPrefixSuffixType { return i } } diff --git a/src/aggregator/aggregator/entry_benchmark_test.go b/src/aggregator/aggregator/entry_benchmark_test.go new file mode 100644 index 0000000000..66d163c368 --- /dev/null +++ b/src/aggregator/aggregator/entry_benchmark_test.go @@ -0,0 +1,136 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package aggregator + +import ( + "runtime" + "testing" + "time" + + "github.com/m3db/m3/src/metrics/aggregation" + "github.com/m3db/m3/src/metrics/pipeline" + "github.com/m3db/m3/src/metrics/pipeline/applied" + "github.com/m3db/m3/src/metrics/policy" + "github.com/m3db/m3/src/metrics/transformation" + xtime "github.com/m3db/m3/src/x/time" +) + +func BenchmarkAggregationValues(b *testing.B) { + aggregationKeys := []aggregationKey{ + { + aggregationID: aggregation.DefaultID, + storagePolicy: policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour), + pipeline: applied.NewPipeline([]applied.OpUnion{ + { + Type: pipeline.TransformationOpType, + Transformation: pipeline.TransformationOp{Type: transformation.Absolute}, + }, + { + Type: pipeline.RollupOpType, + Rollup: applied.RollupOp{ + ID: []byte("foo.baz1234"), + AggregationID: aggregation.DefaultID, + }, + }, + }), + }, + { + aggregationID: aggregation.DefaultID, + storagePolicy: policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour), + pipeline: applied.NewPipeline([]applied.OpUnion{ + { + Type: pipeline.TransformationOpType, + Transformation: pipeline.TransformationOp{Type: transformation.Absolute}, + }, + { + Type: pipeline.RollupOpType, + Rollup: applied.RollupOp{ + ID: []byte("foo.baz55"), + AggregationID: aggregation.DefaultID, + }, + }, + }), + }, + { + aggregationID: aggregation.DefaultID, + storagePolicy: policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour), + pipeline: applied.NewPipeline([]applied.OpUnion{ + { + Type: pipeline.TransformationOpType, + Transformation: pipeline.TransformationOp{Type: transformation.Absolute}, + }, + { + Type: pipeline.RollupOpType, + Rollup: applied.RollupOp{ + ID: []byte("foo.baz45"), + AggregationID: aggregation.DefaultID, + }, + }, + }), + }, + { + aggregationID: aggregation.DefaultID, + storagePolicy: policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour), + pipeline: applied.NewPipeline([]applied.OpUnion{ + { + Type: pipeline.TransformationOpType, + Transformation: pipeline.TransformationOp{Type: transformation.Absolute}, + }, + { + Type: pipeline.RollupOpType, + Rollup: applied.RollupOp{ + ID: []byte("foo.baz1234"), + AggregationID: aggregation.DefaultID, + }, + }, + }), + }, + { + aggregationID: aggregation.DefaultID, + storagePolicy: policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour), + pipeline: applied.NewPipeline([]applied.OpUnion{ + { + Type: pipeline.TransformationOpType, + Transformation: pipeline.TransformationOp{Type: transformation.Absolute}, + }, + { + Type: pipeline.RollupOpType, + Rollup: applied.RollupOp{ + ID: []byte("foo.baz42"), + AggregationID: aggregation.DefaultID, + }, + }, + }), + }, + } + + vals := make(aggregationValues, len(aggregationKeys)) + for i := range aggregationKeys { + vals[i] = aggregationValue{key: aggregationKeys[i]} + } + + b.ResetTimer() + var contains bool + for n := 0; n < b.N; n++ { + contains = vals.contains(aggregationKeys[len(aggregationKeys)-1]) + } + runtime.KeepAlive(contains) +} diff --git a/src/aggregator/aggregator/forwarded_writer.go b/src/aggregator/aggregator/forwarded_writer.go index 43a98c2534..f3e7db37d7 100644 --- a/src/aggregator/aggregator/forwarded_writer.go +++ b/src/aggregator/aggregator/forwarded_writer.go @@ -474,8 +474,8 @@ func (agg *forwardedAggregation) onDone(key aggregationKey) error { } func (agg *forwardedAggregation) index(key aggregationKey) int { - for i, k := range agg.byKey { - if k.key.Equal(key) { + for i := range agg.byKey { + if agg.byKey[i].key.Equal(key) { return i } } diff --git a/src/metrics/metadata/metadata_benchmark_test.go b/src/metrics/metadata/metadata_benchmark_test.go new file mode 100644 index 0000000000..8577a448db --- /dev/null +++ b/src/metrics/metadata/metadata_benchmark_test.go @@ -0,0 +1,42 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package metadata + +import ( + "runtime" + "testing" +) + +func isDefault(m StagedMetadatas) bool { + return m.IsDefault() +} + +func BenchmarkMetadata_IsDefault(b *testing.B) { + m := testLargeStagedMetadatas + m = append(m, testLargeStagedMetadatas...) + for i := 0; i < b.N; i++ { + m[0].CutoverNanos = int64(i) + if isDefault(m) { + b.Fail() + } + } + runtime.KeepAlive(m) +} diff --git a/src/metrics/pipeline/applied/type.go b/src/metrics/pipeline/applied/type.go index b25a1abfe9..a6db631164 100644 --- a/src/metrics/pipeline/applied/type.go +++ b/src/metrics/pipeline/applied/type.go @@ -91,25 +91,25 @@ type OpUnion struct { // Equal determines whether two operation unions are equal. func (u OpUnion) Equal(other OpUnion) bool { + // keep in sync with Pipeline.Equal as go is terrible at inlining anything with a loop if u.Type != other.Type { return false } - switch u.Type { - case pipeline.TransformationOpType: - return u.Transformation.Equal(other.Transformation) - case pipeline.RollupOpType: + + if u.Type == pipeline.RollupOpType { return u.Rollup.Equal(other.Rollup) } - return true + + return u.Transformation.Type == other.Transformation.Type } // Clone clones an operation union. func (u OpUnion) Clone() OpUnion { - clone := OpUnion{Type: u.Type} - switch u.Type { - case pipeline.TransformationOpType: - clone.Transformation = u.Transformation.Clone() - case pipeline.RollupOpType: + clone := OpUnion{ + Type: u.Type, + Transformation: u.Transformation, + } + if u.Type == pipeline.RollupOpType { clone.Rollup = u.Rollup.Clone() } return clone @@ -187,22 +187,36 @@ func (p Pipeline) At(i int) OpUnion { return p.operations[i] } // Equal determines whether two pipelines are equal. func (p Pipeline) Equal(other Pipeline) bool { + // keep in sync with OpUnion.Equal as go is terrible at inlining anything with a loop if len(p.operations) != len(other.operations) { return false } + for i := 0; i < len(p.operations); i++ { - if !p.operations[i].Equal(other.operations[i]) { + if p.operations[i].Type != other.operations[i].Type { return false } + //nolint:exhaustive + switch p.operations[i].Type { + case pipeline.RollupOpType: + if !p.operations[i].Rollup.Equal(other.operations[i].Rollup) { + return false + } + case pipeline.TransformationOpType: + if p.operations[i].Transformation.Type != other.operations[i].Transformation.Type { + return false + } + } } + return true } // Clone clones the pipeline. func (p Pipeline) Clone() Pipeline { clone := make([]OpUnion, len(p.operations)) - for i, op := range p.operations { - clone[i] = op.Clone() + for i := range p.operations { + clone[i] = p.operations[i].Clone() } return Pipeline{operations: clone} } diff --git a/src/metrics/pipeline/applied/type_test.go b/src/metrics/pipeline/applied/type_test.go index b9ece8b22b..edd8e26b39 100644 --- a/src/metrics/pipeline/applied/type_test.go +++ b/src/metrics/pipeline/applied/type_test.go @@ -21,6 +21,7 @@ package applied import ( + "fmt" "testing" "github.com/m3db/m3/src/metrics/aggregation" @@ -360,8 +361,27 @@ func TestPipelineEqual(t *testing.T) { } for _, input := range inputs { - require.Equal(t, input.expected, input.p1.Equal(input.p2)) - require.Equal(t, input.expected, input.p2.Equal(input.p1)) + input := input + t.Run(fmt.Sprintf("%v %v", input.p1.String(), input.p2.String()), func(t *testing.T) { + require.Equal(t, input.expected, input.p1.Equal(input.p2)) + require.Equal(t, input.expected, input.p2.Equal(input.p1)) + // assert implementation is equal to OpUnion + if input.expected { + for i, op := range input.p1.operations { + require.True(t, op.Equal(input.p2.operations[i])) + } + for i, op := range input.p2.operations { + require.True(t, op.Equal(input.p1.operations[i])) + } + } else if len(input.p1.operations) == len(input.p2.operations) { + for i, op := range input.p1.operations { + require.False(t, op.Equal(input.p2.operations[i])) + } + for i, op := range input.p2.operations { + require.False(t, op.Equal(input.p1.operations[i])) + } + } + }) } } diff --git a/src/metrics/pipeline/type.go b/src/metrics/pipeline/type.go index d2091f6dc5..9e25da72e8 100644 --- a/src/metrics/pipeline/type.go +++ b/src/metrics/pipeline/type.go @@ -200,10 +200,10 @@ func NewRollupOpFromProto(pb *pipelinepb.RollupOp) (RollupOp, error) { // SameTransform returns true if the two rollup operations have the same rollup transformation // (i.e., same new rollup metric name and same set of rollup tags). func (op RollupOp) SameTransform(other RollupOp) bool { - if !bytes.Equal(op.NewName, other.NewName) { + if len(op.Tags) != len(other.Tags) { return false } - if len(op.Tags) != len(other.Tags) { + if !bytes.Equal(op.NewName, other.NewName) { return false } // Sort the tags and compare.