diff --git a/src/cmd/services/m3coordinator/downsample/downsampler.go b/src/cmd/services/m3coordinator/downsample/downsampler.go index 9a635758cc..0e5fc69689 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler.go @@ -127,15 +127,16 @@ func defaultMetricsAppenderOptions(opts DownsamplerOptions, agg agg) metricsAppe } return metricsAppenderOptions{ - agg: agg.aggregator, - clientRemote: agg.clientRemote, - clockOpts: agg.clockOpts, - tagEncoderPool: agg.pools.tagEncoderPool, - matcher: agg.matcher, - metricTagsIteratorPool: agg.pools.metricTagsIteratorPool, - debugLogging: debugLogging, - logger: logger, - augmentM3Tags: agg.augmentM3Tags, + agg: agg.aggregator, + clientRemote: agg.clientRemote, + clockOpts: agg.clockOpts, + tagEncoderPool: agg.pools.tagEncoderPool, + matcher: agg.matcher, + metricTagsIteratorPool: agg.pools.metricTagsIteratorPool, + debugLogging: debugLogging, + logger: logger, + augmentM3Tags: agg.augmentM3Tags, + includeRollupsOnDefaultRuleFiltering: agg.includeRollupsOnDefaultRuleFiltering, } } diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 25241157f3..43ea1fe50c 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -124,6 +124,86 @@ func TestDownsamplerAggregationWithAutoMappingRulesFromNamespacesWatcher(t *test testDownsamplerAggregation(t, testDownsampler) } +func TestDownsamplerAggregationDownsamplesRawMetricWithRollupRule(t *testing.T) { + t.Parallel() + + gaugeMetrics, _ := testGaugeMetrics(testGaugeMetricsOptions{}) + require.Equal(t, 1, len(gaugeMetrics)) + + gaugeMetric := gaugeMetrics[0] + numSamples := len(gaugeMetric.samples) + + testDownsampler := newTestDownsampler(t, testDownsamplerOptions{ + rulesConfig: &RulesConfiguration{ + RollupRules: []RollupRuleConfiguration{ + { + Filter: fmt.Sprintf( + "%s:gauge0", + nameTag), + Transforms: []TransformConfiguration{ + { + Transform: &TransformOperationConfiguration{ + Type: transformation.Increase, + }, + }, + { + Rollup: &RollupOperationConfiguration{ + MetricName: "rolled_up_metric", + GroupBy: []string{"app"}, + Aggregations: []aggregation.Type{aggregation.Sum}, + }, + }, + { + Transform: &TransformOperationConfiguration{ + Type: transformation.Add, + }, + }, + }, + StoragePolicies: []StoragePolicyConfiguration{ + { + Resolution: 2 * time.Second, + Retention: 24 * time.Hour, + }, + }, + }, + }, + }, + ingest: &testDownsamplerOptionsIngest{ + gaugeMetrics: gaugeMetrics, + }, + expect: &testDownsamplerOptionsExpect{ + writes: []testExpectedWrite{ + // Raw aggregated metric + { + tags: gaugeMetric.tags, + // NB(nate): Automapping rules generated from cluster namespaces currently + // hardcode 'Last' as the aggregation type. As such, expect value to be the last value + // in the sample. + values: []expectedValue{{value: gaugeMetric.samples[numSamples-1]}}, + }, + }, + }, + }) + + // Setup auto-mapping rules. + require.False(t, testDownsampler.downsampler.Enabled()) + origStagedMetadata := originalStagedMetadata(t, testDownsampler) + ctrl := xtest.NewController(t) + defer ctrl.Finish() + session := dbclient.NewMockSession(ctrl) + setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{ + NamespaceID: ident.StringID("2s:1d"), + Resolution: 2 * time.Second, + Retention: 24 * time.Hour, + Session: session, + }) + waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata) + require.True(t, testDownsampler.downsampler.Enabled()) + + // Test expected output + testDownsamplerAggregation(t, testDownsampler) +} + func TestDownsamplerAggregationToggleEnabled(t *testing.T) { t.Parallel() @@ -1008,6 +1088,21 @@ func TestDownsamplerAggregationWithRulesConfigRollupRulesPerSecondSum(t *testing }, }) + // Setup auto-mapping rules. + require.False(t, testDownsampler.downsampler.Enabled()) + origStagedMetadata := originalStagedMetadata(t, testDownsampler) + ctrl := xtest.NewController(t) + defer ctrl.Finish() + session := dbclient.NewMockSession(ctrl) + setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{ + NamespaceID: ident.StringID("2s:1d"), + Resolution: 2 * time.Second, + Retention: 24 * time.Hour, + Session: session, + }) + waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata) + require.True(t, testDownsampler.downsampler.Enabled()) + // Test expected output testDownsamplerAggregation(t, testDownsampler) } diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender.go b/src/cmd/services/m3coordinator/downsample/metrics_appender.go index d073e813ee..54dc8d8aa5 100644 --- a/src/cmd/services/m3coordinator/downsample/metrics_appender.go +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender.go @@ -98,11 +98,12 @@ type metricsAppenderOptions struct { agg aggregator.Aggregator clientRemote client.Client - defaultStagedMetadatasProtos []metricpb.StagedMetadatas - matcher matcher.Matcher - tagEncoderPool serialize.TagEncoderPool - metricTagsIteratorPool serialize.MetricTagsIteratorPool - augmentM3Tags bool + defaultStagedMetadatasProtos []metricpb.StagedMetadatas + matcher matcher.Matcher + tagEncoderPool serialize.TagEncoderPool + metricTagsIteratorPool serialize.MetricTagsIteratorPool + augmentM3Tags bool + includeRollupsOnDefaultRuleFiltering bool clockOpts clock.Options debugLogging bool @@ -230,23 +231,37 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp // name and tags (i.e. overwriting each other). a.mappingRuleStoragePolicies = a.mappingRuleStoragePolicies[:0] - mappingRuleStagedMetadatas := matchResult.ForExistingIDAt(nowNanos) - if !mappingRuleStagedMetadatas.IsDefault() && len(mappingRuleStagedMetadatas) != 0 { - a.debugLogMatch("downsampler applying matched mapping rule", - debugLogMatchOptions{Meta: mappingRuleStagedMetadatas}) + ruleStagedMetadatas := matchResult.ForExistingIDAt(nowNanos) + if !ruleStagedMetadatas.IsDefault() && len(ruleStagedMetadatas) != 0 { + a.debugLogMatch("downsampler applying matched rule", + debugLogMatchOptions{Meta: ruleStagedMetadatas}) - // Collect all the current active mapping rules - for _, stagedMetadata := range mappingRuleStagedMetadatas { + // Collect storage policies for all the current active mapping rules. + // TODO: we should convert this to iterate over pointers + // nolint:gocritic + for _, stagedMetadata := range ruleStagedMetadatas { for _, pipe := range stagedMetadata.Pipelines { - for _, sp := range pipe.StoragePolicies { - a.mappingRuleStoragePolicies = - append(a.mappingRuleStoragePolicies, sp) + // Skip rollup rules unless configured otherwise. + // We only want to consider mapping rules here, + // as we still want to apply default mapping rules to + // metrics that are rolled up to ensure the underlying metric + // gets written to aggregated namespaces. + if a.includeRollupsOnDefaultRuleFiltering || !pipe.IsRollupRule() { + for _, sp := range pipe.StoragePolicies { + a.mappingRuleStoragePolicies = + append(a.mappingRuleStoragePolicies, sp) + } + } else { + a.debugLogMatch( + "skipping rollup rule in populating active mapping rule policies", + debugLogMatchOptions{}, + ) } } } // Only sample if going to actually aggregate - pipelines := mappingRuleStagedMetadatas[len(mappingRuleStagedMetadatas)-1] + pipelines := ruleStagedMetadatas[len(ruleStagedMetadatas)-1] a.curr.Pipelines = append(a.curr.Pipelines, pipelines.Pipelines...) } diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index d6886cf959..1fa70f6555 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -225,10 +225,11 @@ type agg struct { aggregator aggregator.Aggregator clientRemote client.Client - clockOpts clock.Options - matcher matcher.Matcher - pools aggPools - augmentM3Tags bool + clockOpts clock.Options + matcher matcher.Matcher + pools aggPools + augmentM3Tags bool + includeRollupsOnDefaultRuleFiltering bool } // Configuration configurates a downsampler. @@ -270,6 +271,14 @@ type Configuration struct { // Furthermore, the option is automatically enabled if static rules are // used and any filter contain an __m3_type__ tag. AugmentM3Tags bool `yaml:"augmentM3Tags"` + + // IncludeRollupsOnDefaultRuleFiltering will include rollup rules + // when deciding if the downsampler should ignore the default auto mapping rules + // based on the storage policies applied on a given rule. + // This is usually not what you want to do, as it means the raw metric + // that is being rolled up by your rule will not be forward into aggregated namespaces, + // and will only be written to the unaggregated namespace. + IncludeRollupsOnDefaultRuleFiltering bool `yaml:"includeRollupsOnDefaultRuleFiltering"` } // MatcherConfiguration is the configuration for the rule matcher. @@ -788,10 +797,11 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { } return agg{ - clientRemote: client, - matcher: matcher, - pools: pools, - augmentM3Tags: augmentM3Tags, + clientRemote: client, + matcher: matcher, + pools: pools, + augmentM3Tags: augmentM3Tags, + includeRollupsOnDefaultRuleFiltering: cfg.IncludeRollupsOnDefaultRuleFiltering, }, nil } @@ -953,10 +963,11 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { } return agg{ - aggregator: aggregatorInstance, - matcher: matcher, - pools: pools, - augmentM3Tags: augmentM3Tags, + aggregator: aggregatorInstance, + matcher: matcher, + pools: pools, + augmentM3Tags: augmentM3Tags, + includeRollupsOnDefaultRuleFiltering: cfg.IncludeRollupsOnDefaultRuleFiltering, }, nil } diff --git a/src/metrics/metadata/metadata.go b/src/metrics/metadata/metadata.go index 8898930153..534ebd1f91 100644 --- a/src/metrics/metadata/metadata.go +++ b/src/metrics/metadata/metadata.go @@ -106,6 +106,12 @@ func (m PipelineMetadata) IsDefault() bool { m.DropPolicy.IsDefault() } +// IsRollupRule returns whether this is a rollup rule pipeline metadata. +// nolint:gocritic +func (m PipelineMetadata) IsRollupRule() bool { + return !m.Pipeline.IsEmpty() +} + // IsDropPolicyApplied returns whether this is the default standard pipeline // but with the drop policy applied. func (m PipelineMetadata) IsDropPolicyApplied() bool {