Skip to content

Commit

Permalink
[coordinator] Ignore rollup rule storage policies when deciding to ut…
Browse files Browse the repository at this point in the history
…iilize auto-mapping rules
  • Loading branch information
wesleyk committed Dec 23, 2020
1 parent d5122ee commit 554f042
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 36 deletions.
19 changes: 10 additions & 9 deletions src/cmd/services/m3coordinator/downsample/downsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,16 @@ func defaultMetricsAppenderOptions(opts DownsamplerOptions, agg agg) metricsAppe
}

return metricsAppenderOptions{
agg: agg.aggregator,
clientRemote: agg.clientRemote,
clockOpts: agg.clockOpts,
tagEncoderPool: agg.pools.tagEncoderPool,
matcher: agg.matcher,
metricTagsIteratorPool: agg.pools.metricTagsIteratorPool,
debugLogging: debugLogging,
logger: logger,
augmentM3Tags: agg.augmentM3Tags,
agg: agg.aggregator,
clientRemote: agg.clientRemote,
clockOpts: agg.clockOpts,
tagEncoderPool: agg.pools.tagEncoderPool,
matcher: agg.matcher,
metricTagsIteratorPool: agg.pools.metricTagsIteratorPool,
debugLogging: debugLogging,
logger: logger,
augmentM3Tags: agg.augmentM3Tags,
includeRollupsOnDefaultRuleFiltering: agg.includeRollupsOnDefaultRuleFiltering,
}
}

Expand Down
95 changes: 95 additions & 0 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,86 @@ func TestDownsamplerAggregationWithAutoMappingRulesFromNamespacesWatcher(t *test
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationDownsamplesRawMetricWithRollupRule(t *testing.T) {
t.Parallel()

gaugeMetrics, _ := testGaugeMetrics(testGaugeMetricsOptions{})
require.Equal(t, 1, len(gaugeMetrics))

gaugeMetric := gaugeMetrics[0]
numSamples := len(gaugeMetric.samples)

testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
rulesConfig: &RulesConfiguration{
RollupRules: []RollupRuleConfiguration{
{
Filter: fmt.Sprintf(
"%s:gauge0",
nameTag),
Transforms: []TransformConfiguration{
{
Transform: &TransformOperationConfiguration{
Type: transformation.Increase,
},
},
{
Rollup: &RollupOperationConfiguration{
MetricName: "rolled_up_metric",
GroupBy: []string{"app"},
Aggregations: []aggregation.Type{aggregation.Sum},
},
},
{
Transform: &TransformOperationConfiguration{
Type: transformation.Add,
},
},
},
StoragePolicies: []StoragePolicyConfiguration{
{
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
},
},
},
},
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: gaugeMetrics,
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
// Raw aggregated metric
{
tags: gaugeMetric.tags,
// NB(nate): Automapping rules generated from cluster namespaces currently
// hardcode 'Last' as the aggregation type. As such, expect value to be the last value
// in the sample.
values: []expectedValue{{value: gaugeMetric.samples[numSamples-1]}},
},
},
},
})

// Setup auto-mapping rules.
require.False(t, testDownsampler.downsampler.Enabled())
origStagedMetadata := originalStagedMetadata(t, testDownsampler)
ctrl := xtest.NewController(t)
defer ctrl.Finish()
session := dbclient.NewMockSession(ctrl)
setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{
NamespaceID: ident.StringID("2s:1d"),
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
Session: session,
})
waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata)
require.True(t, testDownsampler.downsampler.Enabled())

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationToggleEnabled(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1008,6 +1088,21 @@ func TestDownsamplerAggregationWithRulesConfigRollupRulesPerSecondSum(t *testing
},
})

// Setup auto-mapping rules.
require.False(t, testDownsampler.downsampler.Enabled())
origStagedMetadata := originalStagedMetadata(t, testDownsampler)
ctrl := xtest.NewController(t)
defer ctrl.Finish()
session := dbclient.NewMockSession(ctrl)
setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{
NamespaceID: ident.StringID("2s:1d"),
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
Session: session,
})
waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata)
require.True(t, testDownsampler.downsampler.Enabled())

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}
Expand Down
45 changes: 30 additions & 15 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,12 @@ type metricsAppenderOptions struct {
agg aggregator.Aggregator
clientRemote client.Client

defaultStagedMetadatasProtos []metricpb.StagedMetadatas
matcher matcher.Matcher
tagEncoderPool serialize.TagEncoderPool
metricTagsIteratorPool serialize.MetricTagsIteratorPool
augmentM3Tags bool
defaultStagedMetadatasProtos []metricpb.StagedMetadatas
matcher matcher.Matcher
tagEncoderPool serialize.TagEncoderPool
metricTagsIteratorPool serialize.MetricTagsIteratorPool
augmentM3Tags bool
includeRollupsOnDefaultRuleFiltering bool

clockOpts clock.Options
debugLogging bool
Expand Down Expand Up @@ -230,23 +231,37 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
// name and tags (i.e. overwriting each other).
a.mappingRuleStoragePolicies = a.mappingRuleStoragePolicies[:0]

mappingRuleStagedMetadatas := matchResult.ForExistingIDAt(nowNanos)
if !mappingRuleStagedMetadatas.IsDefault() && len(mappingRuleStagedMetadatas) != 0 {
a.debugLogMatch("downsampler applying matched mapping rule",
debugLogMatchOptions{Meta: mappingRuleStagedMetadatas})
ruleStagedMetadatas := matchResult.ForExistingIDAt(nowNanos)
if !ruleStagedMetadatas.IsDefault() && len(ruleStagedMetadatas) != 0 {
a.debugLogMatch("downsampler applying matched rule",
debugLogMatchOptions{Meta: ruleStagedMetadatas})

// Collect all the current active mapping rules
for _, stagedMetadata := range mappingRuleStagedMetadatas {
// Collect storage policies for all the current active mapping rules.
// TODO: we should convert this to iterate over pointers
// nolint:gocritic
for _, stagedMetadata := range ruleStagedMetadatas {
for _, pipe := range stagedMetadata.Pipelines {
for _, sp := range pipe.StoragePolicies {
a.mappingRuleStoragePolicies =
append(a.mappingRuleStoragePolicies, sp)
// Skip rollup rules unless configured otherwise.
// We only want to consider mapping rules here,
// as we still want to apply default mapping rules to
// metrics that are rolled up to ensure the underlying metric
// gets written to aggregated namespaces.
if a.includeRollupsOnDefaultRuleFiltering || !pipe.IsRollupRule() {
for _, sp := range pipe.StoragePolicies {
a.mappingRuleStoragePolicies =
append(a.mappingRuleStoragePolicies, sp)
}
} else {
a.debugLogMatch(
"skipping rollup rule in populating active mapping rule policies",
debugLogMatchOptions{},
)
}
}
}

// Only sample if going to actually aggregate
pipelines := mappingRuleStagedMetadatas[len(mappingRuleStagedMetadatas)-1]
pipelines := ruleStagedMetadatas[len(ruleStagedMetadatas)-1]
a.curr.Pipelines =
append(a.curr.Pipelines, pipelines.Pipelines...)
}
Expand Down
35 changes: 23 additions & 12 deletions src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,11 @@ type agg struct {
aggregator aggregator.Aggregator
clientRemote client.Client

clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
augmentM3Tags bool
clockOpts clock.Options
matcher matcher.Matcher
pools aggPools
augmentM3Tags bool
includeRollupsOnDefaultRuleFiltering bool
}

// Configuration configurates a downsampler.
Expand Down Expand Up @@ -270,6 +271,14 @@ type Configuration struct {
// Furthermore, the option is automatically enabled if static rules are
// used and any filter contain an __m3_type__ tag.
AugmentM3Tags bool `yaml:"augmentM3Tags"`

// IncludeRollupsOnDefaultRuleFiltering will include rollup rules
// when deciding if the downsampler should ignore the default auto mapping rules
// based on the storage policies applied on a given rule.
// This is usually not what you want to do, as it means the raw metric
// that is being rolled up by your rule will not be forward into aggregated namespaces,
// and will only be written to the unaggregated namespace.
IncludeRollupsOnDefaultRuleFiltering bool `yaml:"includeRollupsOnDefaultRuleFiltering"`
}

// MatcherConfiguration is the configuration for the rule matcher.
Expand Down Expand Up @@ -788,10 +797,11 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

return agg{
clientRemote: client,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
clientRemote: client,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
includeRollupsOnDefaultRuleFiltering: cfg.IncludeRollupsOnDefaultRuleFiltering,
}, nil
}

Expand Down Expand Up @@ -953,10 +963,11 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
}

return agg{
aggregator: aggregatorInstance,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
aggregator: aggregatorInstance,
matcher: matcher,
pools: pools,
augmentM3Tags: augmentM3Tags,
includeRollupsOnDefaultRuleFiltering: cfg.IncludeRollupsOnDefaultRuleFiltering,
}, nil
}

Expand Down
6 changes: 6 additions & 0 deletions src/metrics/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ func (m PipelineMetadata) IsDefault() bool {
m.DropPolicy.IsDefault()
}

// IsRollupRule returns whether this is a rollup rule pipeline metadata.
// nolint:gocritic
func (m PipelineMetadata) IsRollupRule() bool {
return !m.Pipeline.IsEmpty()
}

// IsDropPolicyApplied returns whether this is the default standard pipeline
// but with the drop policy applied.
func (m PipelineMetadata) IsDropPolicyApplied() bool {
Expand Down

0 comments on commit 554f042

Please sign in to comment.