Skip to content

Commit

Permalink
[coordinator] Ignore rollup rule storage policies when deciding to ut…
Browse files Browse the repository at this point in the history
…ilize auto-mapping rules (#3044)

* [coordinator] Ignore rollup rule storage policies when deciding to utiilize auto-mapping rules

* IsRollupRule -> IsMappingRule
  • Loading branch information
wesleyk authored Dec 23, 2020
1 parent d5122ee commit 3ad9380
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 36 deletions.
19 changes: 10 additions & 9 deletions src/cmd/services/m3coordinator/downsample/downsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,16 @@ func defaultMetricsAppenderOptions(opts DownsamplerOptions, agg agg) metricsAppe
}

return metricsAppenderOptions{
agg: agg.aggregator,
clientRemote: agg.clientRemote,
clockOpts: agg.clockOpts,
tagEncoderPool: agg.pools.tagEncoderPool,
matcher: agg.matcher,
metricTagsIteratorPool: agg.pools.metricTagsIteratorPool,
debugLogging: debugLogging,
logger: logger,
augmentM3Tags: agg.augmentM3Tags,
agg: agg.aggregator,
clientRemote: agg.clientRemote,
clockOpts: agg.clockOpts,
tagEncoderPool: agg.pools.tagEncoderPool,
matcher: agg.matcher,
metricTagsIteratorPool: agg.pools.metricTagsIteratorPool,
debugLogging: debugLogging,
logger: logger,
augmentM3Tags: agg.augmentM3Tags,
includeRollupsOnDefaultRuleFiltering: agg.includeRollupsOnDefaultRuleFiltering,
}
}

Expand Down
97 changes: 97 additions & 0 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,103 @@ func TestDownsamplerAggregationWithAutoMappingRulesFromNamespacesWatcher(t *test
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationDownsamplesRawMetricWithRollupRule(t *testing.T) {
t.Parallel()

gaugeMetric := testGaugeMetric{
tags: map[string]string{
nameTag: "http_requests",
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
"not_rolled_up": "not_rolled_up_value",
},
timedSamples: []testGaugeMetricTimedSample{
{value: 42},
{value: 64, offset: 1 * time.Second},
},
}
res := 1 * time.Second
ret := 30 * 24 * time.Hour
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
rulesConfig: &RulesConfiguration{
RollupRules: []RollupRuleConfiguration{
{
Filter: fmt.Sprintf(
"%s:http_requests app:* status_code:* endpoint:*",
nameTag),
Transforms: []TransformConfiguration{
{
Transform: &TransformOperationConfiguration{
Type: transformation.PerSecond,
},
},
{
Rollup: &RollupOperationConfiguration{
MetricName: "http_requests_by_status_code",
GroupBy: []string{"app", "status_code", "endpoint"},
Aggregations: []aggregation.Type{aggregation.Sum},
},
},
},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: res,
Retention: ret,
},
},
},
},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
// aggregated rollup metric
{
tags: map[string]string{
nameTag: "http_requests_by_status_code",
string(rollupTagName): string(rollupTagValue),
"app": "nginx_edge",
"status_code": "500",
"endpoint": "/foo/bar",
},
values: []expectedValue{{value: 22}},
attributes: &storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Resolution: res,
Retention: ret,
},
},
// raw aggregated metric
{
tags: gaugeMetric.tags,
values: []expectedValue{{value: 42}, {value: 64}},
},
},
},
})

// Setup auto-mapping rules.
require.False(t, testDownsampler.downsampler.Enabled())
origStagedMetadata := originalStagedMetadata(t, testDownsampler)
ctrl := xtest.NewController(t)
defer ctrl.Finish()
session := dbclient.NewMockSession(ctrl)
setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{
NamespaceID: ident.StringID("1s:30d"),
Resolution: res,
Retention: ret,
Session: session,
})
waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata)
require.True(t, testDownsampler.downsampler.Enabled())

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationToggleEnabled(t *testing.T) {
t.Parallel()

Expand Down
45 changes: 30 additions & 15 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ type metricsAppenderOptions struct {
agg aggregator.Aggregator
clientRemote client.Client

defaultStagedMetadatasProtos []metricpb.StagedMetadatas
matcher matcher.Matcher
tagEncoderPool serialize.TagEncoderPool
metricTagsIteratorPool serialize.MetricTagsIteratorPool
augmentM3Tags bool
defaultStagedMetadatasProtos []metricpb.StagedMetadatas
matcher matcher.Matcher
tagEncoderPool serialize.TagEncoderPool
metricTagsIteratorPool serialize.MetricTagsIteratorPool
augmentM3Tags bool
includeRollupsOnDefaultRuleFiltering bool

clockOpts clock.Options
debugLogging bool
Expand Down Expand Up @@ -230,23 +231,37 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
// name and tags (i.e. overwriting each other).
a.mappingRuleStoragePolicies = a.mappingRuleStoragePolicies[:0]

mappingRuleStagedMetadatas := matchResult.ForExistingIDAt(nowNanos)
if !mappingRuleStagedMetadatas.IsDefault() && len(mappingRuleStagedMetadatas) != 0 {
a.debugLogMatch("downsampler applying matched mapping rule",
debugLogMatchOptions{Meta: mappingRuleStagedMetadatas})
ruleStagedMetadatas := matchResult.ForExistingIDAt(nowNanos)
if !ruleStagedMetadatas.IsDefault() && len(ruleStagedMetadatas) != 0 {
a.debugLogMatch("downsampler applying matched rule",
debugLogMatchOptions{Meta: ruleStagedMetadatas})

// Collect all the current active mapping rules
for _, stagedMetadata := range mappingRuleStagedMetadatas {
// Collect storage policies for all the current active mapping rules.
// TODO: we should convert this to iterate over pointers
// nolint:gocritic
for _, stagedMetadata := range ruleStagedMetadatas {
for _, pipe := range stagedMetadata.Pipelines {
for _, sp := range pipe.StoragePolicies {
a.mappingRuleStoragePolicies =
append(a.mappingRuleStoragePolicies, sp)
// Skip rollup rules unless configured otherwise.
// We only want to consider mapping rules here,
// as we still want to apply default mapping rules to
// metrics that are rolled up to ensure the underlying metric
// gets written to aggregated namespaces.
if a.includeRollupsOnDefaultRuleFiltering || pipe.IsMappingRule() {
for _, sp := range pipe.StoragePolicies {
a.mappingRuleStoragePolicies =
append(a.mappingRuleStoragePolicies, sp)
}
} else {
a.debugLogMatch(
"skipping rollup rule in populating active mapping rule policies",
debugLogMatchOptions{},
)
}
}
}

// Only sample if going to actually aggregate
pipelines := mappingRuleStagedMetadatas[len(mappingRuleStagedMetadatas)-1]
pipelines := ruleStagedMetadatas[len(ruleStagedMetadatas)-1]
a.curr.Pipelines =
append(a.curr.Pipelines, pipelines.Pipelines...)
}
Expand Down
35 changes: 23 additions & 12 deletions src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,11 @@ type agg struct {
aggregator aggregator.Aggregator
clientRemote client.Client

clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
augmentM3Tags bool
clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
augmentM3Tags bool
includeRollupsOnDefaultRuleFiltering bool
}

// Configuration configurates a downsampler.
Expand Down Expand Up @@ -270,6 +271,14 @@ type Configuration struct {
// Furthermore, the option is automatically enabled if static rules are
// used and any filter contain an __m3_type__ tag.
AugmentM3Tags bool `yaml:"augmentM3Tags"`

// IncludeRollupsOnDefaultRuleFiltering will include rollup rules
// when deciding if the downsampler should ignore the default auto mapping rules
// based on the storage policies applied on a given rule.
// This is usually not what you want to do, as it means the raw metric
// that is being rolled up by your rule will not be forward into aggregated namespaces,
// and will only be written to the unaggregated namespace.
IncludeRollupsOnDefaultRuleFiltering bool `yaml:"includeRollupsOnDefaultRuleFiltering"`
}

// MatcherConfiguration is the configuration for the rule matcher.
Expand Down Expand Up @@ -788,10 +797,11 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

return agg{
clientRemote: client,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
clientRemote: client,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
includeRollupsOnDefaultRuleFiltering: cfg.IncludeRollupsOnDefaultRuleFiltering,
}, nil
}

Expand Down Expand Up @@ -953,10 +963,11 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

return agg{
aggregator: aggregatorInstance,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
aggregator: aggregatorInstance,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
includeRollupsOnDefaultRuleFiltering: cfg.IncludeRollupsOnDefaultRuleFiltering,
}, nil
}

Expand Down
6 changes: 6 additions & 0 deletions src/metrics/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ func (m PipelineMetadata) IsDefault() bool {
m.DropPolicy.IsDefault()
}

// IsMappingRule returns whether this is a rollup rule pipeline metadata.
// nolint:gocritic
func (m PipelineMetadata) IsMappingRule() bool {
return m.Pipeline.IsEmpty()
}

// IsDropPolicyApplied returns whether this is the default standard pipeline
// but with the drop policy applied.
func (m PipelineMetadata) IsDropPolicyApplied() bool {
Expand Down

0 comments on commit 3ad9380

Please sign in to comment.