cumulativetodeltaprocessor: Reopening #4444 Update cumulative to delta (#5772)

* Add metric tracking library to replace awsmetrics dependency.

* Update configuration of cumulativetodelta processor.

The configuration has been updated (see the config sketch after this list) so that:
* An empty list of metrics defaults to converting all cumulative metrics
  to delta.
* The state TTL (MaxStale) is now configurable.
* There is a configuration option to enable/disable conversion of
  non-monotonic cumulative metrics. Non-monotonic cumulative metrics are
  normally treated as gauges, so their aggregation temporality should not
  be converted. By default, only monotonic values are converted.
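
A minimal sketch of the resulting configuration surface (metric names are placeholders in the README's own style, the `10s` staleness value is illustrative, and per later commits in this PR the `metrics` list must be non-empty):

```yaml
processors:
  cumulativetodelta:
    metrics:
      - <metric_1_name>
      - <metric_2_name>
    max_staleness: 10s
```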

* Update cumulativetodelta processor logic.

* Uses the internal tracker library instead of AWS metrics library. This
  enables separation of timeseries by resource and instrumentation
  library.
* Metric data points that are invalid (the first in a series of
  non-monotonic cumulative values) are now removed from the dataset
  (a simplified sketch follows this list).
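
A simplified sketch of the tracking idea (illustrative names and types, not the actual `tracking` package API): remember the last cumulative value per timeseries identity, emit the difference, and drop the first point of each series because it has no reference. The real tracker also keys on resource and instrumentation library and expires entries after `max_staleness`.

```go
package main

import (
	"fmt"
	"time"
)

// identity distinguishes one timeseries from another. The real tracker
// also folds in resource and instrumentation-library information.
type identity struct {
	metricName string
	attrs      string // flattened attribute set
}

type point struct {
	value float64
	ts    time.Time
}

// tracker remembers the last observed cumulative value per timeseries.
type tracker struct {
	prev map[identity]point
}

// convert returns the delta since the previous observation. The first
// point of a series has no reference, so ok is false and the caller is
// expected to drop that data point.
func (t *tracker) convert(id identity, p point) (delta float64, start time.Time, ok bool) {
	last, seen := t.prev[id]
	t.prev[id] = p
	if !seen {
		return 0, time.Time{}, false
	}
	return p.value - last.value, last.ts, true
}

func main() {
	tr := &tracker{prev: make(map[identity]point)}
	id := identity{metricName: "requests_total", attrs: "host=a"}
	for _, v := range []float64{10, 15, 30} {
		if d, _, ok := tr.convert(id, point{value: v, ts: time.Now()}); ok {
			fmt.Println("delta:", d) // prints 5, then 15
		}
	}
}
```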

* Update processor default test case.

By default, the cumulative to delta processor now converts all metrics.
Previously, the processor did not convert any metrics unless listed in
the configuration.

* Remove extra validate function call.

* Add processor benchmark.

* Remove aws metrics library.

* Update readme.

* Convert to using attributes rather than labels map.

* Fix usage of deprecated api.

* Retain NaN values.

* Exported field comments should start with field name.

Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>

* Remove monotonic configuration option.

* Fixes after merge

* Rename MaxStale to MaxStaleness

* Update README to reflect removal of monotonic_only setting

* Change processor to only convert metrics explicitly specified in the config

* Reintroduce test for invalid config with no metric names

* Rename max_stale to max_staleness

* Fix README

* List of metric names can no longer be nil or empty

Co-authored-by: Allan Feldman <afeldman@newrelic.com>
Co-authored-by: Allan Feldman <6374032+a-feld@users.noreply.github.com>
Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
4 people authored Oct 19, 2021
1 parent 500ae00 commit 97fc0e2
Showing 15 changed files with 931 additions and 83 deletions.
11 changes: 9 additions & 2 deletions processor/cumulativetodeltaprocessor/README.md
@@ -9,7 +9,14 @@ The cumulative to delta processor (`cumulativetodeltaprocessor`) converts cumulative…
 
 ## Configuration
 
-Configuration is specified through a list of metrics. The processor uses metric names to identify a set of cumulative sum metrics and converts them to cumulative delta.
+Configuration is specified through a list of metrics. The processor uses metric names to identify a set of cumulative metrics and converts them from cumulative to delta.
+
+The following settings can be optionally configured:
+
+- `metrics`: The processor uses metric names to identify a set of cumulative metrics and converts them to delta.
+- `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0
+
+#### Example
 
 ```yaml
 processors:
@@ -23,4 +30,4 @@ processors:
 .
 .
 - <metric_n_name>
-```
\ No newline at end of file
+```
8 changes: 7 additions & 1 deletion processor/cumulativetodeltaprocessor/config.go
@@ -16,6 +16,7 @@ package cumulativetodeltaprocessor
 
 import (
 	"fmt"
+	"time"
 
 	"go.opentelemetry.io/collector/config"
 )
@@ -24,10 +25,15 @@ import (
 type Config struct {
 	config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
 
-	// List of cumulative sum metrics to convert to delta
+	// List of cumulative metrics to convert to delta.
 	Metrics []string `mapstructure:"metrics"`
+
+	// MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely.
+	MaxStaleness time.Duration `mapstructure:"max_staleness"`
 }
 
+var _ config.Processor = (*Config)(nil)
+
 // Validate checks whether the input configuration has all of the required fields for the processor.
 // An error is returned if there are any invalid inputs.
 func (config *Config) Validate() error {
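
Because `MaxStaleness` is declared as a `time.Duration` behind a `mapstructure` tag, duration strings such as `10s` in YAML decode directly into the field. A standalone sketch of that decoding using github.com/mitchellh/mapstructure directly (the collector's config loader wires up the equivalent hook itself; this is not the processor's code):

```go
package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

// config mirrors the processor Config fields relevant to decoding.
type config struct {
	Metrics      []string      `mapstructure:"metrics"`
	MaxStaleness time.Duration `mapstructure:"max_staleness"`
}

func main() {
	// raw stands in for the parsed YAML map.
	raw := map[string]interface{}{
		"metrics":       []string{"metric1", "metric2"},
		"max_staleness": "10s",
	}
	var cfg config
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		// Converts "10s" into 10 * time.Second during decoding.
		DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
		Result:     &cfg,
	})
	if err != nil {
		panic(err)
	}
	if err := dec.Decode(raw); err != nil {
		panic(err)
	}
	fmt.Println(cfg.MaxStaleness) // 10s
}
```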
32 changes: 17 additions & 15 deletions processor/cumulativetodeltaprocessor/config_test.go
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"path"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -26,35 +27,37 @@ import (
 	"go.opentelemetry.io/collector/config/configtest"
 )
 
+const configFile = "config.yaml"
+
 func TestLoadingFullConfig(t *testing.T) {
 
+	factories, err := componenttest.NopFactories()
+	assert.NoError(t, err)
+
+	factory := NewFactory()
+	factories.Processors[typeStr] = factory
+	cfg, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", configFile), factories)
+	assert.NoError(t, err)
+	require.NotNil(t, cfg)
+
 	tests := []struct {
-		configFile string
-		expCfg     *Config
+		expCfg *Config
 	}{
 		{
-			configFile: "config_full.yaml",
 			expCfg: &Config{
 				ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
 				Metrics: []string{
 					"metric1",
 					"metric2",
 				},
+				MaxStaleness: 10 * time.Second,
 			},
 		},
 	}
 
 	for _, test := range tests {
		t.Run(test.expCfg.ID().String(), func(t *testing.T) {
-			factories, err := componenttest.NopFactories()
-			assert.NoError(t, err)
-
-			factory := NewFactory()
-			factories.Processors[typeStr] = factory
-			config, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", test.configFile), factories)
-			assert.NoError(t, err)
-			require.NotNil(t, config)
-
-			cfg := config.Processors[test.expCfg.ID()]
+			cfg := cfg.Processors[test.expCfg.ID()]
 			assert.Equal(t, test.expCfg, cfg)
 		})
@@ -67,7 +70,7 @@ func TestValidateConfig(t *testing.T) {
 		errorMessage string
 	}{
 		{
-			configName: "config_full.yaml",
+			configName: "config.yaml",
 			succeed:    true,
 		},
 		{
@@ -92,6 +95,5 @@ func TestValidateConfig(t *testing.T) {
 			assert.EqualError(t, err, fmt.Sprintf("processor %q has invalid configuration: %s", typeStr, test.errorMessage))
 		}
 	})
-
 	}
 }
3 changes: 1 addition & 2 deletions processor/cumulativetodeltaprocessor/factory.go
@@ -46,7 +46,7 @@ func createDefaultConfig() config.Processor {
 }
 
 func createMetricsProcessor(
-	ctx context.Context,
+	_ context.Context,
 	params component.ProcessorCreateSettings,
 	cfg config.Processor,
 	nextConsumer consumer.Metrics,
@@ -56,7 +56,6 @@ func createMetricsProcessor(
 		return nil, fmt.Errorf("configuration parsing error")
 	}
 
-	processorConfig.Validate()
 	metricsProcessor := newCumulativeToDeltaProcessor(processorConfig, params.Logger)
 
 	return processorhelper.NewMetricsProcessor(
2 changes: 1 addition & 1 deletion processor/cumulativetodeltaprocessor/factory_test.go
@@ -49,7 +49,7 @@ func TestCreateProcessors(t *testing.T) {
 		errorMessage string
 	}{
 		{
-			configName: "config_full.yaml",
+			configName: "config.yaml",
 			succeed:    true,
 		},
 	}
3 changes: 0 additions & 3 deletions processor/cumulativetodeltaprocessor/go.mod
@@ -3,7 +3,6 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor
 go 1.17
 
 require (
-	github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics v0.37.1
 	github.com/stretchr/testify v1.7.0
 	go.opentelemetry.io/collector v0.37.1-0.20211015233822-bd87fb628058
 	go.opentelemetry.io/collector/model v0.37.1-0.20211015233822-bd87fb628058
@@ -41,5 +40,3 @@ require (
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
 )
-
-replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics => ./../../internal/aws/metrics
150 changes: 91 additions & 59 deletions processor/cumulativetodeltaprocessor/processor.go
@@ -17,32 +17,35 @@ package cumulativetodeltaprocessor
 import (
 	"context"
 	"math"
-	"time"
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/model/pdata"
 	"go.uber.org/zap"
 
-	awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/tracking"
 )
 
 type cumulativeToDeltaProcessor struct {
-	metrics         map[string]bool
+	metrics         map[string]struct{}
 	logger          *zap.Logger
-	deltaCalculator awsmetrics.MetricCalculator
+	deltaCalculator tracking.MetricTracker
+	cancelFunc      context.CancelFunc
 }
 
 func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor {
-	inputMetricSet := make(map[string]bool, len(config.Metrics))
-	for _, name := range config.Metrics {
-		inputMetricSet[name] = true
-	}
-
-	return &cumulativeToDeltaProcessor{
-		metrics:         inputMetricSet,
+	ctx, cancel := context.WithCancel(context.Background())
+	p := &cumulativeToDeltaProcessor{
 		logger:          logger,
-		deltaCalculator: newDeltaCalculator(),
+		deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness),
+		cancelFunc:      cancel,
 	}
+	if len(config.Metrics) > 0 {
+		p.metrics = make(map[string]struct{}, len(config.Metrics))
+		for _, m := range config.Metrics {
+			p.metrics[m] = struct{}{}
+		}
+	}
+	return p
 }
 
 // Start is invoked during service startup.
@@ -53,64 +56,93 @@ func (ctdp *cumulativeToDeltaProcessor) Start(context.Context, component.Host) error {
 // processMetrics implements the ProcessMetricsFunc type.
 func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
 	resourceMetricsSlice := md.ResourceMetrics()
-	for i := 0; i < resourceMetricsSlice.Len(); i++ {
-		rm := resourceMetricsSlice.At(i)
+	resourceMetricsSlice.RemoveIf(func(rm pdata.ResourceMetrics) bool {
 		ilms := rm.InstrumentationLibraryMetrics()
-		for j := 0; j < ilms.Len(); j++ {
-			ilm := ilms.At(j)
-			metricSlice := ilm.Metrics()
-			for k := 0; k < metricSlice.Len(); k++ {
-				metric := metricSlice.At(k)
-				if ctdp.metrics[metric.Name()] {
-					if metric.DataType() == pdata.MetricDataTypeSum && metric.Sum().AggregationTemporality() == pdata.MetricAggregationTemporalityCumulative {
-						dataPoints := metric.Sum().DataPoints()
-
-						for l := 0; l < dataPoints.Len(); l++ {
-							fromDataPoint := dataPoints.At(l)
-							labelMap := make(map[string]string)
-
-							fromDataPoint.Attributes().Range(func(k string, v pdata.AttributeValue) bool {
-								labelMap[k] = v.AsString()
-								return true
-							})
-							datapointValue := fromDataPoint.DoubleVal()
-							if math.IsNaN(datapointValue) {
-								continue
-							}
-							result, _ := ctdp.deltaCalculator.Calculate(metric.Name(), labelMap, datapointValue, fromDataPoint.Timestamp().AsTime())
-
-							fromDataPoint.SetDoubleVal(result.(delta).value)
-							fromDataPoint.SetStartTimestamp(pdata.NewTimestampFromTime(result.(delta).prevTimestamp))
-						}
-						metric.Sum().SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta)
-					}
-				}
-			}
-		}
-	}
+		ilms.RemoveIf(func(ilm pdata.InstrumentationLibraryMetrics) bool {
+			ms := ilm.Metrics()
+			ms.RemoveIf(func(m pdata.Metric) bool {
+				if _, ok := ctdp.metrics[m.Name()]; !ok {
+					return false
+				}
+				baseIdentity := tracking.MetricIdentity{
+					Resource:               rm.Resource(),
+					InstrumentationLibrary: ilm.InstrumentationLibrary(),
+					MetricDataType:         m.DataType(),
+					MetricName:             m.Name(),
+					MetricUnit:             m.Unit(),
+				}
+				switch m.DataType() {
+				case pdata.MetricDataTypeSum:
+					ms := m.Sum()
+					if ms.AggregationTemporality() != pdata.MetricAggregationTemporalityCumulative {
+						return false
+					}
+
+					// Ignore any metrics that aren't monotonic
+					if !ms.IsMonotonic() {
+						return false
+					}
+
+					baseIdentity.MetricIsMonotonic = ms.IsMonotonic()
+					ctdp.convertDataPoints(ms.DataPoints(), baseIdentity)
+					ms.SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta)
+					return ms.DataPoints().Len() == 0
+				default:
+					return false
+				}
+			})
+			return ilm.Metrics().Len() == 0
+		})
+		return rm.InstrumentationLibraryMetrics().Len() == 0
+	})
 	return md, nil
 }
 
 // Shutdown is invoked during service shutdown.
 func (ctdp *cumulativeToDeltaProcessor) Shutdown(context.Context) error {
+	ctdp.cancelFunc()
 	return nil
 }
 
-func newDeltaCalculator() awsmetrics.MetricCalculator {
-	return awsmetrics.NewMetricCalculator(func(prev *awsmetrics.MetricValue, val interface{}, timestamp time.Time) (interface{}, bool) {
-		result := delta{value: val.(float64), prevTimestamp: timestamp}
-
-		if prev != nil {
-			deltaValue := val.(float64) - prev.RawValue.(float64)
-			result.value = deltaValue
-			result.prevTimestamp = prev.Timestamp
-			return result, true
-		}
-		return result, false
-	})
-}
-
-type delta struct {
-	value         float64
-	prevTimestamp time.Time
+func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) {
+	switch dps := in.(type) {
+	case pdata.NumberDataPointSlice:
+		dps.RemoveIf(func(dp pdata.NumberDataPoint) bool {
+			id := baseIdentity
+			id.StartTimestamp = dp.StartTimestamp()
+			id.Attributes = dp.Attributes()
+			id.MetricValueType = dp.Type()
+			point := tracking.ValuePoint{
+				ObservedTimestamp: dp.Timestamp(),
+			}
+			if id.IsFloatVal() {
+				// Do not attempt to transform NaN values
+				if math.IsNaN(dp.DoubleVal()) {
+					return false
+				}
+				point.FloatValue = dp.DoubleVal()
+			} else {
+				point.IntValue = dp.IntVal()
+			}
+			trackingPoint := tracking.MetricPoint{
+				Identity: id,
+				Value:    point,
+			}
+			delta, valid := ctdp.deltaCalculator.Convert(trackingPoint)
+
+			// When converting non-monotonic cumulative counters,
+			// the first data point is omitted since the initial
+			// reference is not assumed to be zero
+			if !valid {
+				return true
+			}
+			dp.SetStartTimestamp(delta.StartTimestamp)
+			if id.IsFloatVal() {
+				dp.SetDoubleVal(delta.FloatValue)
+			} else {
+				dp.SetIntVal(delta.IntValue)
+			}
+			return false
+		})
+	}
 }
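
The nested `RemoveIf` calls prune in both directions: data points that merely seed the tracker are dropped, and any metric, instrumentation library, or resource left empty as a result is removed from the batch. A toy sketch of that inside-out pruning pattern, using plain slices instead of pdata (illustrative only):

```go
package main

import "fmt"

// removeIf deletes, in place, every element for which f returns true,
// mirroring the semantics of pdata's RemoveIf.
func removeIf(s *[]int, f func(int) bool) {
	kept := (*s)[:0]
	for _, v := range *s {
		if !f(v) {
			kept = append(kept, v)
		}
	}
	*s = kept
}

func main() {
	// Each inner slice stands in for one metric's data points.
	metrics := [][]int{{1, 3}, {2, 4, 5}}

	// Inner pass: drop odd values (think: first-seen points with no delta
	// reference). Outer pass: drop metrics left with no points at all.
	kept := metrics[:0]
	for _, points := range metrics {
		points := points
		removeIf(&points, func(v int) bool { return v%2 != 0 })
		if len(points) > 0 {
			kept = append(kept, points)
		}
	}
	fmt.Println(kept) // [[2 4]]
}
```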
