Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cumulativetodeltaprocessor: Reopening #4444 Update cumulative to delta #5772

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2791c68
Add metric tracking library to replace awsmetrics dependency.
a-feld Aug 6, 2021
bbe3cbc
Update configuration of cumulativetodelta processor.
a-feld Aug 6, 2021
d4d3c49
Update cumulativetodelta processor logic.
a-feld Aug 6, 2021
bcbc722
Update processor default test case.
a-feld Aug 6, 2021
86840f1
Remove extra validate function call.
a-feld Aug 6, 2021
8617d90
Add processor benchmark.
a-feld Aug 6, 2021
6464c55
Remove aws metrics library.
a-feld Aug 6, 2021
5e59ad4
Update readme.
a-feld Aug 16, 2021
ebefb02
Convert to using attributes rather than labels map.
a-feld Aug 18, 2021
4ca2171
Fix usage of deprecated api.
a-feld Sep 17, 2021
36c7d2a
Retain NaN values.
a-feld Sep 17, 2021
88fa329
Exported field comments should start with field name.
a-feld Sep 24, 2021
007837d
Remove monotonic configuration option.
a-feld Sep 24, 2021
5f928c2
Merge branch 'main' into update-cumulative-to-delta
alanwest Oct 15, 2021
db462d0
Fixes after merge
alanwest Oct 15, 2021
f5cc398
Rename MaxStale to MaxStaleness
alanwest Oct 15, 2021
ee86cfd
Update README to reflect removal of monotonic_only setting
alanwest Oct 15, 2021
e63e77c
Change processor to only convert metrics explicitly specified in the …
alanwest Oct 15, 2021
c032dd8
Merge branch 'main' into update-cumulative-to-delta
alanwest Oct 15, 2021
789f07f
Reintroduce test for invalid config with no metric names
alanwest Oct 16, 2021
58a72c1
Rename max_stale to max_staleness
alanwest Oct 16, 2021
c84a17a
Fix README
alanwest Oct 16, 2021
466187c
List of metric names can no longer be nil or empty
alanwest Oct 16, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions processor/cumulativetodeltaprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ The cumulative to delta processor (`cumulativetodeltaprocessor`) converts cumula

## Configuration

Configuration is specified through a list of metrics. The processor uses metric names to identify a set of cumulative sum metrics and converts them to cumulative delta.
Configuration is specified through a list of metrics. The processor uses metric names to identify a set of cumulative metrics and converts them from cumulative to delta.

The following settings can be configured:

- `metrics`: [Required] The processor uses metric names to identify a set of cumulative metrics and converts them to delta.
- `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0

#### Example

```yaml
processors:
Expand All @@ -23,4 +30,4 @@ processors:
.
.
- <metric_n_name>
```
```
8 changes: 7 additions & 1 deletion processor/cumulativetodeltaprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cumulativetodeltaprocessor

import (
"fmt"
"time"

"go.opentelemetry.io/collector/config"
)
Expand All @@ -24,10 +25,15 @@ import (
type Config struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// List of cumulative sum metrics to convert to delta
// List of cumulative metrics to convert to delta.
Metrics []string `mapstructure:"metrics"`

// MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely.
MaxStaleness time.Duration `mapstructure:"max_staleness"`
}

var _ config.Processor = (*Config)(nil)

// Validate checks whether the input configuration has all of the required fields for the processor.
// An error is returned if there are any invalid inputs.
func (config *Config) Validate() error {
Expand Down
32 changes: 17 additions & 15 deletions processor/cumulativetodeltaprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -26,35 +27,37 @@ import (
"go.opentelemetry.io/collector/config/configtest"
)

const configFile = "config.yaml"

func TestLoadingFullConfig(t *testing.T) {

factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Processors[typeStr] = factory
cfg, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", configFile), factories)
assert.NoError(t, err)
require.NotNil(t, cfg)

tests := []struct {
configFile string
expCfg *Config
expCfg *Config
}{
{
configFile: "config_full.yaml",
expCfg: &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Metrics: []string{
"metric1",
"metric2",
},
MaxStaleness: 10 * time.Second,
},
},
}

for _, test := range tests {
t.Run(test.expCfg.ID().String(), func(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Processors[typeStr] = factory
config, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", test.configFile), factories)
assert.NoError(t, err)
require.NotNil(t, config)

cfg := config.Processors[test.expCfg.ID()]
cfg := cfg.Processors[test.expCfg.ID()]
assert.Equal(t, test.expCfg, cfg)
})
}
Expand All @@ -67,7 +70,7 @@ func TestValidateConfig(t *testing.T) {
errorMessage string
}{
{
configName: "config_full.yaml",
configName: "config.yaml",
succeed: true,
},
{
Expand All @@ -92,6 +95,5 @@ func TestValidateConfig(t *testing.T) {
assert.EqualError(t, err, fmt.Sprintf("processor %q has invalid configuration: %s", typeStr, test.errorMessage))
}
})

}
}
3 changes: 1 addition & 2 deletions processor/cumulativetodeltaprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func createDefaultConfig() config.Processor {
}

func createMetricsProcessor(
ctx context.Context,
_ context.Context,
params component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Metrics,
Expand All @@ -56,7 +56,6 @@ func createMetricsProcessor(
return nil, fmt.Errorf("configuration parsing error")
}

processorConfig.Validate()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call.

metricsProcessor := newCumulativeToDeltaProcessor(processorConfig, params.Logger)

return processorhelper.NewMetricsProcessor(
Expand Down
2 changes: 1 addition & 1 deletion processor/cumulativetodeltaprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestCreateProcessors(t *testing.T) {
errorMessage string
}{
{
configName: "config_full.yaml",
configName: "config.yaml",
succeed: true,
},
}
Expand Down
3 changes: 0 additions & 3 deletions processor/cumulativetodeltaprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumul
go 1.17

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics v0.37.1
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.37.0
go.opentelemetry.io/collector/model v0.37.0
Expand Down Expand Up @@ -41,5 +40,3 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics => ./../../internal/aws/metrics
150 changes: 91 additions & 59 deletions processor/cumulativetodeltaprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,35 @@ package cumulativetodeltaprocessor
import (
"context"
"math"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"

awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/tracking"
)

type cumulativeToDeltaProcessor struct {
metrics map[string]bool
metrics map[string]struct{}
logger *zap.Logger
deltaCalculator awsmetrics.MetricCalculator
deltaCalculator tracking.MetricTracker
cancelFunc context.CancelFunc
}

func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor {
inputMetricSet := make(map[string]bool, len(config.Metrics))
for _, name := range config.Metrics {
inputMetricSet[name] = true
}

return &cumulativeToDeltaProcessor{
metrics: inputMetricSet,
ctx, cancel := context.WithCancel(context.Background())
p := &cumulativeToDeltaProcessor{
logger: logger,
deltaCalculator: newDeltaCalculator(),
deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness),
cancelFunc: cancel,
}
if len(config.Metrics) > 0 {
p.metrics = make(map[string]struct{}, len(config.Metrics))
for _, m := range config.Metrics {
p.metrics[m] = struct{}{}
}
}
return p
}

// Start is invoked during service startup.
Expand All @@ -53,64 +56,93 @@ func (ctdp *cumulativeToDeltaProcessor) Start(context.Context, component.Host) e
// processMetrics implements the ProcessMetricsFunc type.
func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
resourceMetricsSlice := md.ResourceMetrics()
for i := 0; i < resourceMetricsSlice.Len(); i++ {
rm := resourceMetricsSlice.At(i)
resourceMetricsSlice.RemoveIf(func(rm pdata.ResourceMetrics) bool {
ilms := rm.InstrumentationLibraryMetrics()
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
metricSlice := ilm.Metrics()
for k := 0; k < metricSlice.Len(); k++ {
metric := metricSlice.At(k)
if ctdp.metrics[metric.Name()] {
if metric.DataType() == pdata.MetricDataTypeSum && metric.Sum().AggregationTemporality() == pdata.MetricAggregationTemporalityCumulative {
dataPoints := metric.Sum().DataPoints()

for l := 0; l < dataPoints.Len(); l++ {
fromDataPoint := dataPoints.At(l)
labelMap := make(map[string]string)

fromDataPoint.Attributes().Range(func(k string, v pdata.AttributeValue) bool {
labelMap[k] = v.AsString()
return true
})
datapointValue := fromDataPoint.DoubleVal()
if math.IsNaN(datapointValue) {
continue
}
result, _ := ctdp.deltaCalculator.Calculate(metric.Name(), labelMap, datapointValue, fromDataPoint.Timestamp().AsTime())
ilms.RemoveIf(func(ilm pdata.InstrumentationLibraryMetrics) bool {
ms := ilm.Metrics()
ms.RemoveIf(func(m pdata.Metric) bool {
if _, ok := ctdp.metrics[m.Name()]; !ok {
return false
}
baseIdentity := tracking.MetricIdentity{
Resource: rm.Resource(),
InstrumentationLibrary: ilm.InstrumentationLibrary(),
MetricDataType: m.DataType(),
MetricName: m.Name(),
MetricUnit: m.Unit(),
}
switch m.DataType() {
case pdata.MetricDataTypeSum:
ms := m.Sum()
if ms.AggregationTemporality() != pdata.MetricAggregationTemporalityCumulative {
return false
}

fromDataPoint.SetDoubleVal(result.(delta).value)
fromDataPoint.SetStartTimestamp(pdata.NewTimestampFromTime(result.(delta).prevTimestamp))
}
metric.Sum().SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta)
// Ignore any metrics that aren't monotonic
if !ms.IsMonotonic() {
return false
}

baseIdentity.MetricIsMonotonic = ms.IsMonotonic()
ctdp.convertDataPoints(ms.DataPoints(), baseIdentity)
ms.SetAggregationTemporality(pdata.MetricAggregationTemporalityDelta)
return ms.DataPoints().Len() == 0
default:
return false
}
}
}
}
})
return ilm.Metrics().Len() == 0
})
return rm.InstrumentationLibraryMetrics().Len() == 0
})
return md, nil
}

// Shutdown is invoked during service shutdown.
func (ctdp *cumulativeToDeltaProcessor) Shutdown(context.Context) error {
ctdp.cancelFunc()
return nil
}

func newDeltaCalculator() awsmetrics.MetricCalculator {
return awsmetrics.NewMetricCalculator(func(prev *awsmetrics.MetricValue, val interface{}, timestamp time.Time) (interface{}, bool) {
result := delta{value: val.(float64), prevTimestamp: timestamp}

if prev != nil {
deltaValue := val.(float64) - prev.RawValue.(float64)
result.value = deltaValue
result.prevTimestamp = prev.Timestamp
return result, true
}
return result, false
})
}
func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) {
switch dps := in.(type) {
case pdata.NumberDataPointSlice:
dps.RemoveIf(func(dp pdata.NumberDataPoint) bool {
id := baseIdentity
id.StartTimestamp = dp.StartTimestamp()
id.Attributes = dp.Attributes()
id.MetricValueType = dp.Type()
point := tracking.ValuePoint{
ObservedTimestamp: dp.Timestamp(),
}
if id.IsFloatVal() {
// Do not attempt to transform NaN values
if math.IsNaN(dp.DoubleVal()) {
return false
}
point.FloatValue = dp.DoubleVal()
} else {
point.IntValue = dp.IntVal()
}
trackingPoint := tracking.MetricPoint{
Identity: id,
Value: point,
}
delta, valid := ctdp.deltaCalculator.Convert(trackingPoint)

type delta struct {
value float64
prevTimestamp time.Time
// When converting non-monotonic cumulative counters,
// the first data point is omitted since the initial
// reference is not assumed to be zero
if !valid {
return true
}
dp.SetStartTimestamp(delta.StartTimestamp)
if id.IsFloatVal() {
dp.SetDoubleVal(delta.FloatValue)
} else {
dp.SetIntVal(delta.IntValue)
}
return false
})
}
}
Loading