From 93cf1c3a0f7d77c4b9e95cc2a5bcbc521d6a61ff Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Wed, 23 Dec 2020 17:21:15 -0500 Subject: [PATCH] IsRollupRule -> IsMappingRule --- .../downsample/downsampler_test.go | 86 ++++++++++--------- .../downsample/metrics_appender.go | 2 +- src/metrics/metadata/metadata.go | 6 +- 3 files changed, 48 insertions(+), 46 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 43ea1fe50c..72dc1aebb5 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -127,59 +127,76 @@ func TestDownsamplerAggregationWithAutoMappingRulesFromNamespacesWatcher(t *test func TestDownsamplerAggregationDownsamplesRawMetricWithRollupRule(t *testing.T) { t.Parallel() - gaugeMetrics, _ := testGaugeMetrics(testGaugeMetricsOptions{}) - require.Equal(t, 1, len(gaugeMetrics)) - - gaugeMetric := gaugeMetrics[0] - numSamples := len(gaugeMetric.samples) - + gaugeMetric := testGaugeMetric{ + tags: map[string]string{ + nameTag: "http_requests", + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + "not_rolled_up": "not_rolled_up_value", + }, + timedSamples: []testGaugeMetricTimedSample{ + {value: 42}, + {value: 64, offset: 1 * time.Second}, + }, + } + res := 1 * time.Second + ret := 30 * 24 * time.Hour testDownsampler := newTestDownsampler(t, testDownsamplerOptions{ rulesConfig: &RulesConfiguration{ RollupRules: []RollupRuleConfiguration{ { Filter: fmt.Sprintf( - "%s:gauge0", + "%s:http_requests app:* status_code:* endpoint:*", nameTag), Transforms: []TransformConfiguration{ { Transform: &TransformOperationConfiguration{ - Type: transformation.Increase, + Type: transformation.PerSecond, }, }, { Rollup: &RollupOperationConfiguration{ - MetricName: "rolled_up_metric", - GroupBy: []string{"app"}, + MetricName: "http_requests_by_status_code", + GroupBy: []string{"app", "status_code", "endpoint"}, Aggregations: []aggregation.Type{aggregation.Sum}, }, }, - { - Transform: &TransformOperationConfiguration{ - Type: transformation.Add, - }, - }, }, StoragePolicies: []StoragePolicyConfiguration{ { - Resolution: 2 * time.Second, - Retention: 24 * time.Hour, + Resolution: res, + Retention: ret, }, }, }, }, }, ingest: &testDownsamplerOptionsIngest{ - gaugeMetrics: gaugeMetrics, + gaugeMetrics: []testGaugeMetric{gaugeMetric}, }, expect: &testDownsamplerOptionsExpect{ writes: []testExpectedWrite{ - // Raw aggregated metric + // aggregated rollup metric { - tags: gaugeMetric.tags, - // NB(nate): Automapping rules generated from cluster namespaces currently - // hardcode 'Last' as the aggregation type. As such, expect value to be the last value - // in the sample. - values: []expectedValue{{value: gaugeMetric.samples[numSamples-1]}}, + tags: map[string]string{ + nameTag: "http_requests_by_status_code", + string(rollupTagName): string(rollupTagValue), + "app": "nginx_edge", + "status_code": "500", + "endpoint": "/foo/bar", + }, + values: []expectedValue{{value: 22}}, + attributes: &storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Resolution: res, + Retention: ret, + }, + }, + // raw aggregated metric + { + tags: gaugeMetric.tags, + values: []expectedValue{{value: 42}, {value: 64}}, }, }, }, @@ -192,9 +209,9 @@ func TestDownsamplerAggregationDownsamplesRawMetricWithRollupRule(t *testing.T) defer ctrl.Finish() session := dbclient.NewMockSession(ctrl) setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{ - NamespaceID: ident.StringID("2s:1d"), - Resolution: 2 * time.Second, - Retention: 24 * time.Hour, + NamespaceID: ident.StringID("1s:30d"), + Resolution: res, + Retention: ret, Session: session, }) waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata) @@ -1088,21 +1105,6 @@ func TestDownsamplerAggregationWithRulesConfigRollupRulesPerSecondSum(t *testing }, }) - // Setup auto-mapping rules. - require.False(t, testDownsampler.downsampler.Enabled()) - origStagedMetadata := originalStagedMetadata(t, testDownsampler) - ctrl := xtest.NewController(t) - defer ctrl.Finish() - session := dbclient.NewMockSession(ctrl) - setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{ - NamespaceID: ident.StringID("2s:1d"), - Resolution: 2 * time.Second, - Retention: 24 * time.Hour, - Session: session, - }) - waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata) - require.True(t, testDownsampler.downsampler.Enabled()) - // Test expected output testDownsamplerAggregation(t, testDownsampler) } diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender.go b/src/cmd/services/m3coordinator/downsample/metrics_appender.go index 54dc8d8aa5..46e26c0a04 100644 --- a/src/cmd/services/m3coordinator/downsample/metrics_appender.go +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender.go @@ -246,7 +246,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp // as we still want to apply default mapping rules to // metrics that are rolled up to ensure the underlying metric // gets written to aggregated namespaces. - if a.includeRollupsOnDefaultRuleFiltering || !pipe.IsRollupRule() { + if a.includeRollupsOnDefaultRuleFiltering || pipe.IsMappingRule() { for _, sp := range pipe.StoragePolicies { a.mappingRuleStoragePolicies = append(a.mappingRuleStoragePolicies, sp) diff --git a/src/metrics/metadata/metadata.go b/src/metrics/metadata/metadata.go index 534ebd1f91..8c0dc2dbd8 100644 --- a/src/metrics/metadata/metadata.go +++ b/src/metrics/metadata/metadata.go @@ -106,10 +106,10 @@ func (m PipelineMetadata) IsDefault() bool { m.DropPolicy.IsDefault() } -// IsRollupRule returns whether this is a rollup rule pipeline metadata. +// IsMappingRule returns whether this is a rollup rule pipeline metadata. // nolint:gocritic -func (m PipelineMetadata) IsRollupRule() bool { - return !m.Pipeline.IsEmpty() +func (m PipelineMetadata) IsMappingRule() bool { + return m.Pipeline.IsEmpty() } // IsDropPolicyApplied returns whether this is the default standard pipeline