Skip to content

Commit

Permalink
[aggregator] keep metric type during the aggregation (#2941)
Browse files Browse the repository at this point in the history
  • Loading branch information
gediminasgu authored Dec 9, 2020
1 parent 066e956 commit 37665c0
Show file tree
Hide file tree
Showing 34 changed files with 351 additions and 83 deletions.
2 changes: 2 additions & 0 deletions scripts/development/m3_stack/m3coordinator-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,5 @@ carbon:

tagOptions:
idScheme: quoted

storeMetricsType: true
2 changes: 2 additions & 0 deletions scripts/development/m3_stack/m3coordinator-standard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ carbon:

tagOptions:
idScheme: quoted

storeMetricsType: true
2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/aggregator/m3coordinator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,5 @@ ingest:
retry:
maxBackoff: 10s
jitter: true

storeMetricsType: true
52 changes: 52 additions & 0 deletions scripts/docker-integration-tests/aggregator/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ function prometheus_remote_write {
local label1_value=${label1_value:-label1}
local label2_name=${label2_name:-label2}
local label2_value=${label2_value:-label2}
local metric_type=${metric_type:-counter}

network_name="aggregator"
network=$(docker network ls | fgrep $network_name | tr -s ' ' | cut -f 1 -d ' ' | tail -n 1)
out=$((docker run -it --rm --network $network \
$PROMREMOTECLI_IMAGE \
-u http://m3coordinator01:7202/api/v1/prom/remote/write \
-h M3-Prom-Type:${metric_type} \
-t __name__:${metric_name} \
-t ${label0_name}:${label0_value} \
-t ${label1_name}:${label1_value} \
Expand Down Expand Up @@ -217,6 +219,22 @@ function prometheus_query_native {
return $?
}
# dbnode_fetch queries a single series straight from the dbnode /fetch
# endpoint and compares one jq-extracted field against an expected value.
#
# Inputs (taken from the caller's environment):
#   namespace, id       - series identity to fetch
#   rangeStart/rangeEnd - query window, unix seconds
#   jq_path             - jq expression selecting the field under test
#   expected_value      - value the selected field must equal
# Returns 0 on match, non-zero otherwise (suitable for retry_with_backoff).
function dbnode_fetch {
  local namespace=${namespace}
  local id=${id}
  local rangeStart=${rangeStart}
  local rangeEnd=${rangeEnd}
  local jq_path=${jq_path:-}
  local expected_value=${expected_value:-}

  # Assemble the JSON request body once, then pull out the field under test.
  local request_body="{\"namespace\": \"${namespace}\", \"id\": \"${id}\", \"rangeStart\": ${rangeStart}, \"rangeEnd\": ${rangeEnd}}"
  result=$(curl -s "0.0.0.0:9002/fetch" "-d" "${request_body}" | jq -r "${jq_path}")

  test "$result" = "$expected_value"
  return $?
}
function test_aggregated_rollup_rule {
resolution_seconds="10"
now=$(date +"%s")
Expand All @@ -234,6 +252,7 @@ function test_aggregated_rollup_rule {
label0_name="app" label0_value="nginx_edge" \
label1_name="status_code" label1_value="500" \
label2_name="endpoint" label2_value="/foo/bar" \
metric_type="counter" \
prometheus_remote_write \
http_requests $write_at $value \
true "Expected request to succeed" \
Expand All @@ -251,6 +270,7 @@ function test_aggregated_rollup_rule {
label0_name="app" label0_value="nginx_edge" \
label1_name="status_code" label1_value="500" \
label2_name="endpoint" label2_value="/foo/baz" \
metric_type="gauge" \
prometheus_remote_write \
http_requests $write_at $value \
true "Expected request to succeed" \
Expand Down Expand Up @@ -284,6 +304,38 @@ function test_aggregated_rollup_rule {
retry_with_backoff prometheus_query_native
}

# Verify that the Prometheus metric type (sent via the M3-Prom-Type header)
# survives the aggregation path and is persisted with the datapoint in the
# aggregated namespace as an annotation.
function test_metric_type_survives_aggregation {
  now=$(date +"%s")

  echo "Test metric type should be kept after aggregation"

  # Write a single counter sample at the current time.
  # NOTE: this previously read "$now_truncated", which is never defined in
  # this function (it belongs to the rollup-rule test) and expanded empty.
  write_at="$now"
  value="42"

  metric_type="counter" \
    prometheus_remote_write \
    metric_type_test $write_at $value \
    true "Expected request to succeed" \
    200 "Expected request to return status code 200"

  # Query a generous window around the write time.
  start=$(( $now - 3600 ))
  end=$(( $now + 3600 ))
  jq_path=".datapoints[0].annotation"

  echo "Test query metric type"

  # Fetch straight from the dbnode and check the stored annotation in the
  # aggregated namespace. "CAEQAQ==" is the expected base64-encoded
  # annotation payload for a counter-typed datapoint.
  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
    namespace="agg" \
    id='{__name__=\"metric_type_test\",label0=\"label0\",label1=\"label1\",label2=\"label2\"}' \
    rangeStart=${start} \
    rangeEnd=${end} \
    jq_path="$jq_path" expected_value="CAEQAQ==" \
    retry_with_backoff dbnode_fetch
}

# Run each aggregator integration scenario in sequence.
echo "Run tests"
test_aggregated_graphite_metric
test_aggregated_rollup_rule
test_metric_type_survives_aggregation
2 changes: 2 additions & 0 deletions src/aggregator/aggregation/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func TestCounterCustomAggregationType(t *testing.T) {
require.Equal(t, float64(338350), v)
case aggregation.Stdev:
require.InDelta(t, 29.01149, v, 0.001)
case aggregation.Last:
require.Equal(t, 0.0, v)
default:
require.Equal(t, float64(0), v)
require.False(t, aggType.IsValidForCounter())
Expand Down
1 change: 1 addition & 0 deletions src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func (agg *aggregator) AddPassthrough(
ChunkedID: id.ChunkedID{
Data: []byte(metric.ID),
},
Type: metric.Type,
TimeNanos: metric.TimeNanos,
Value: metric.Value,
},
Expand Down
7 changes: 4 additions & 3 deletions src/aggregator/aggregator/counter_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/aggregator/aggregator/elem_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ func TestCounterElemBaseResetSetData(t *testing.T) {

func TestCounterElemBaseResetSetDataInvalidTypes(t *testing.T) {
e := counterElemBase{}
err := e.ResetSetData(nil, maggregation.Types{maggregation.Last}, false)
err := e.ResetSetData(nil, maggregation.Types{maggregation.P10}, false)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "invalid aggregation types Last for counter"))
require.True(t, strings.Contains(err.Error(), "invalid aggregation types P10 for counter"))
}

func TestTimerElemBase(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions src/aggregator/aggregator/elem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,10 @@ func TestCounterResetSetData(t *testing.T) {

func TestCounterResetSetDataInvalidAggregationType(t *testing.T) {
opts := NewOptions()
ce := MustNewCounterElem(nil, policy.EmptyStoragePolicy, maggregation.DefaultTypes, applied.DefaultPipeline, testNumForwardedTimes, NoPrefixNoSuffix, opts)
err := ce.ResetSetData(testCounterID, testStoragePolicy, maggregation.Types{maggregation.Last}, applied.DefaultPipeline, 0, NoPrefixNoSuffix)
ce := MustNewCounterElem(nil, policy.EmptyStoragePolicy, maggregation.DefaultTypes,
applied.DefaultPipeline, testNumForwardedTimes, NoPrefixNoSuffix, opts)
err := ce.ResetSetData(testCounterID, testStoragePolicy, maggregation.Types{maggregation.P10},
applied.DefaultPipeline, 0, NoPrefixNoSuffix)
require.Error(t, err)
}

Expand Down Expand Up @@ -1810,6 +1812,7 @@ func testFlushLocalMetricFn() (
return func(
idPrefix []byte,
id id.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
Expand Down
2 changes: 2 additions & 0 deletions src/aggregator/aggregator/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package aggregator
import (
"time"

"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/policy"
)
Expand Down Expand Up @@ -83,6 +84,7 @@ const (
type flushLocalMetricFn func(
idPrefix []byte,
id id.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
Expand Down
7 changes: 4 additions & 3 deletions src/aggregator/aggregator/gauge_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions src/aggregator/aggregator/generic_elem.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,10 +537,10 @@ func (e *GenericElem) processValueWithAggregationLock(
for _, point := range toFlush {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp)
flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType),
point.TimeNanos, point.Value, e.sp)
flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType,
e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp)
}
}
} else {
Expand Down
1 change: 1 addition & 0 deletions src/aggregator/aggregator/handler/writer/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (w *protobufWriter) prepare(mp aggregated.ChunkedMetricWithStoragePolicy) (
w.m.ID = append(w.m.ID, mp.Suffix...)
w.m.Metric.TimeNanos = mp.TimeNanos
w.m.Metric.Value = mp.Value
w.m.Metric.Type = mp.Type
w.m.StoragePolicy = mp.StoragePolicy
shard := w.shardFn(w.m.ID, w.numShards)
return w.m, shard
Expand Down
4 changes: 4 additions & 0 deletions src/aggregator/aggregator/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/m3db/m3/src/aggregator/aggregator/handler"
"github.com/m3db/m3/src/aggregator/aggregator/handler/writer"
"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/metrics/metric/aggregated"
metricid "github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/policy"
Expand Down Expand Up @@ -434,6 +435,7 @@ func (l *baseMetricList) flushBefore(beforeNanos int64, flushType flushType) {
func (l *baseMetricList) consumeLocalMetric(
idPrefix []byte,
id metricid.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
Expand All @@ -447,6 +449,7 @@ func (l *baseMetricList) consumeLocalMetric(
chunkedMetricWithPolicy := aggregated.ChunkedMetricWithStoragePolicy{
ChunkedMetric: aggregated.ChunkedMetric{
ChunkedID: chunkedID,
Type: metricType,
TimeNanos: timeNanos,
Value: value,
},
Expand All @@ -463,6 +466,7 @@ func (l *baseMetricList) consumeLocalMetric(
func (l *baseMetricList) discardLocalMetric(
idPrefix []byte,
id metricid.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
Expand Down
2 changes: 2 additions & 0 deletions src/aggregator/aggregator/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ func TestTimedMetricListFlushConsumingAndCollectingTimedMetrics(t *testing.T) {
ChunkedID: id.ChunkedID{
Data: ep.metric.ID,
},
Type: ep.metric.Type,
TimeNanos: alignedStart,
Value: ep.metric.Value,
},
Expand Down Expand Up @@ -1056,6 +1057,7 @@ func TestForwardedMetricListLastStepLocalFlush(t *testing.T) {
Prefix: ep.expectedPrefix,
Data: ep.metric.ID,
},
Type: ep.metric.Type,
TimeNanos: alignedStart,
Value: ep.metric.Values[0],
},
Expand Down
7 changes: 4 additions & 3 deletions src/aggregator/aggregator/timer_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/aggregator/generated-source-files.mk
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ genny-all: genny-aggregator-counter-elem genny-aggregator-timer-elem genny-aggre
genny-aggregator-counter-elem:
cat $(m3db_package_path)/src/aggregator/aggregator/generic_elem.go \
| awk '/^package/{i++}i' \
| sed 's/metric.GaugeType/metric.CounterType/' \
| genny -out=$(m3db_package_path)/src/aggregator/aggregator/counter_elem_gen.go -pkg=aggregator gen \
"timedAggregation=timedCounter lockedAggregation=lockedCounterAggregation typeSpecificAggregation=counterAggregation typeSpecificElemBase=counterElemBase genericElemPool=CounterElemPool GenericElem=CounterElem"

Expand Down
5 changes: 4 additions & 1 deletion src/cmd/services/m3coordinator/ingest/m3msg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func (cfg Configuration) NewIngester(
appender storage.Appender,
tagOptions models.TagOptions,
instrumentOptions instrument.Options,
storeMetricsType bool,
) (*Ingester, error) {
opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions)
opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions, storeMetricsType)
if err != nil {
return nil, err
}
Expand All @@ -58,6 +59,7 @@ func (cfg Configuration) newOptions(
appender storage.Appender,
tagOptions models.TagOptions,
instrumentOptions instrument.Options,
storeMetricsType bool,
) (Options, error) {
scope := instrumentOptions.MetricsScope().Tagged(
map[string]string{"component": "ingester"},
Expand Down Expand Up @@ -98,5 +100,6 @@ func (cfg Configuration) newOptions(
RetryOptions: cfg.Retry.NewOptions(scope),
Sampler: sampler,
InstrumentOptions: instrumentOptions,
StoreMetricsType: storeMetricsType,
}, nil
}
Loading

0 comments on commit 37665c0

Please sign in to comment.