Skip to content

Commit

Permalink
IsRollupRule -> IsMappingRule
Browse files Browse the repository at this point in the history
  • Loading branch information
wesleyk committed Dec 23, 2020
1 parent 554f042 commit 93cf1c3
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 46 deletions.
86 changes: 44 additions & 42 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,59 +127,76 @@ func TestDownsamplerAggregationWithAutoMappingRulesFromNamespacesWatcher(t *test
func TestDownsamplerAggregationDownsamplesRawMetricWithRollupRule(t *testing.T) {
t.Parallel()

gaugeMetrics, _ := testGaugeMetrics(testGaugeMetricsOptions{})
require.Equal(t, 1, len(gaugeMetrics))

gaugeMetric := gaugeMetrics[0]
numSamples := len(gaugeMetric.samples)

gaugeMetric := testGaugeMetric{
tags: map[string]string{
nameTag: "http_requests",
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
"not_rolled_up": "not_rolled_up_value",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 42},
{value: 64, offset: 1 * time.Second},
},
}
res := 1 * time.Second
ret := 30 * 24 * time.Hour
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
rulesConfig: &RulesConfiguration{
RollupRules: []RollupRuleConfiguration{
{
Filter: fmt.Sprintf(
"%s:gauge0",
"%s:http_requests app:* status_code:* endpoint:*",
nameTag),
Transforms: []TransformConfiguration{
{
Transform: &TransformOperationConfiguration{
Type: transformation.Increase,
Type: transformation.PerSecond,
},
},
{
Rollup: &RollupOperationConfiguration{
MetricName: "rolled_up_metric",
GroupBy: []string{"app"},
MetricName: "http_requests_by_status_code",
GroupBy: []string{"app", "status_code", "endpoint"},
Aggregations: []aggregation.Type{aggregation.Sum},
},
},
{
Transform: &TransformOperationConfiguration{
Type: transformation.Add,
},
},
},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
Resolution: res,
Retention: ret,
},
},
},
},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: gaugeMetrics,
gaugeMetrics: []testGaugeMetric{gaugeMetric},
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
// Raw aggregated metric
// aggregated rollup metric
{
tags: gaugeMetric.tags,
// NB(nate): Automapping rules generated from cluster namespaces currently
// hardcode 'Last' as the aggregation type. As such, expect value to be the last value
// in the sample.
values: []expectedValue{{value: gaugeMetric.samples[numSamples-1]}},
tags: map[string]string{
nameTag: "http_requests_by_status_code",
string(rollupTagName): string(rollupTagValue),
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
},
values: []expectedValue{{value: 22}},
attributes: &storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Resolution: res,
Retention: ret,
},
},
// raw aggregated metric
{
tags: gaugeMetric.tags,
values: []expectedValue{{value: 42}, {value: 64}},
},
},
},
Expand All @@ -192,9 +209,9 @@ func TestDownsamplerAggregationDownsamplesRawMetricWithRollupRule(t *testing.T)
defer ctrl.Finish()
session := dbclient.NewMockSession(ctrl)
setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{
NamespaceID: ident.StringID("2s:1d"),
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
NamespaceID: ident.StringID("1s:30d"),
Resolution: res,
Retention: ret,
Session: session,
})
waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata)
Expand Down Expand Up @@ -1088,21 +1105,6 @@ func TestDownsamplerAggregationWithRulesConfigRollupRulesPerSecondSum(t *testing
},
})

// Setup auto-mapping rules.
require.False(t, testDownsampler.downsampler.Enabled())
origStagedMetadata := originalStagedMetadata(t, testDownsampler)
ctrl := xtest.NewController(t)
defer ctrl.Finish()
session := dbclient.NewMockSession(ctrl)
setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{
NamespaceID: ident.StringID("2s:1d"),
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
Session: session,
})
waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata)
require.True(t, testDownsampler.downsampler.Enabled())

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
// as we still want to apply default mapping rules to
// metrics that are rolled up to ensure the underlying metric
// gets written to aggregated namespaces.
if a.includeRollupsOnDefaultRuleFiltering || !pipe.IsRollupRule() {
if a.includeRollupsOnDefaultRuleFiltering || pipe.IsMappingRule() {
for _, sp := range pipe.StoragePolicies {
a.mappingRuleStoragePolicies =
append(a.mappingRuleStoragePolicies, sp)
Expand Down
6 changes: 3 additions & 3 deletions src/metrics/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ func (m PipelineMetadata) IsDefault() bool {
m.DropPolicy.IsDefault()
}

// IsRollupRule returns whether this is a rollup rule pipeline metadata.
// IsMappingRule returns whether this is a rollup rule pipeline metadata.
// nolint:gocritic
func (m PipelineMetadata) IsRollupRule() bool {
return !m.Pipeline.IsEmpty()
func (m PipelineMetadata) IsMappingRule() bool {
return m.Pipeline.IsEmpty()
}

// IsDropPolicyApplied returns whether this is the default standard pipeline
Expand Down

0 comments on commit 93cf1c3

Please sign in to comment.