From 37665c06e3c3e151be2207ec914bb922c54b3a59 Mon Sep 17 00:00:00 2001 From: Gediminas Guoba Date: Wed, 9 Dec 2020 15:19:22 +0200 Subject: [PATCH] [aggregator] keep metric type during the aggregation (#2941) --- .../m3_stack/m3coordinator-aggregator.yml | 2 + .../m3_stack/m3coordinator-standard.yml | 2 + .../aggregator/m3coordinator.yml | 2 + .../aggregator/test.sh | 52 ++++++++ src/aggregator/aggregation/counter_test.go | 2 + src/aggregator/aggregator/aggregator.go | 1 + src/aggregator/aggregator/counter_elem_gen.go | 7 +- src/aggregator/aggregator/elem_base_test.go | 4 +- src/aggregator/aggregator/elem_test.go | 7 +- src/aggregator/aggregator/flush.go | 2 + src/aggregator/aggregator/gauge_elem_gen.go | 7 +- src/aggregator/aggregator/generic_elem.go | 6 +- .../aggregator/handler/writer/protobuf.go | 1 + src/aggregator/aggregator/list.go | 4 + src/aggregator/aggregator/list_test.go | 2 + src/aggregator/aggregator/timer_elem_gen.go | 7 +- src/aggregator/generated-source-files.mk | 1 + .../m3coordinator/ingest/m3msg/config.go | 5 +- .../m3coordinator/ingest/m3msg/ingest.go | 116 ++++++++++++------ .../m3coordinator/ingest/m3msg/ingest_test.go | 12 +- .../services/m3coordinator/ingest/write.go | 27 ++-- .../server/m3msg/protobuf_handler.go | 2 +- .../server/m3msg/protobuf_handler_test.go | 2 + .../m3coordinator/server/m3msg/types.go | 2 + src/metrics/aggregation/type.go | 2 +- .../encoding/protobuf/aggregated_decoder.go | 11 ++ src/metrics/metric/aggregated/types.go | 1 + .../api/v1/handler/prometheus/remote/write.go | 24 +++- .../handler/prometheus/remote/write_test.go | 83 +++++++++++++ src/query/server/query.go | 7 +- src/query/server/query_test.go | 4 +- src/query/storage/converter.go | 14 +-- src/query/storage/converter_test.go | 8 +- src/x/headers/headers.go | 5 + 34 files changed, 351 insertions(+), 83 deletions(-) diff --git a/scripts/development/m3_stack/m3coordinator-aggregator.yml b/scripts/development/m3_stack/m3coordinator-aggregator.yml index 
b622668a4a..2591de57d2 100644 --- a/scripts/development/m3_stack/m3coordinator-aggregator.yml +++ b/scripts/development/m3_stack/m3coordinator-aggregator.yml @@ -80,3 +80,5 @@ carbon: tagOptions: idScheme: quoted + +storeMetricsType: true diff --git a/scripts/development/m3_stack/m3coordinator-standard.yml b/scripts/development/m3_stack/m3coordinator-standard.yml index 16137c65b4..5da1d01b5b 100644 --- a/scripts/development/m3_stack/m3coordinator-standard.yml +++ b/scripts/development/m3_stack/m3coordinator-standard.yml @@ -36,3 +36,5 @@ carbon: tagOptions: idScheme: quoted + +storeMetricsType: true diff --git a/scripts/docker-integration-tests/aggregator/m3coordinator.yml b/scripts/docker-integration-tests/aggregator/m3coordinator.yml index 10ba278c91..59319cfde7 100644 --- a/scripts/docker-integration-tests/aggregator/m3coordinator.yml +++ b/scripts/docker-integration-tests/aggregator/m3coordinator.yml @@ -77,3 +77,5 @@ ingest: retry: maxBackoff: 10s jitter: true + +storeMetricsType: true diff --git a/scripts/docker-integration-tests/aggregator/test.sh b/scripts/docker-integration-tests/aggregator/test.sh index 8d287df7e0..a0453ef12a 100755 --- a/scripts/docker-integration-tests/aggregator/test.sh +++ b/scripts/docker-integration-tests/aggregator/test.sh @@ -170,12 +170,14 @@ function prometheus_remote_write { local label1_value=${label1_value:-label1} local label2_name=${label2_name:-label2} local label2_value=${label2_value:-label2} + local metric_type=${metric_type:counter} network_name="aggregator" network=$(docker network ls | fgrep $network_name | tr -s ' ' | cut -f 1 -d ' ' | tail -n 1) out=$((docker run -it --rm --network $network \ $PROMREMOTECLI_IMAGE \ -u http://m3coordinator01:7202/api/v1/prom/remote/write \ + -h M3-Prom-Type:${metric_type} \ -t __name__:${metric_name} \ -t ${label0_name}:${label0_value} \ -t ${label1_name}:${label1_value} \ @@ -217,6 +219,22 @@ function prometheus_query_native { return $? 
} +function dbnode_fetch { + local namespace=${namespace} + local id=${id} + local rangeStart=${rangeStart} + local rangeEnd=${rangeEnd} + local jq_path=${jq_path:-} + local expected_value=${expected_value:-} + + result=$(curl -s \ + "0.0.0.0:9002/fetch" \ + "-d" \ + "{\"namespace\": \"${namespace}\", \"id\": \"${id}\", \"rangeStart\": ${rangeStart}, \"rangeEnd\": ${rangeEnd}}" | jq -r "${jq_path}") + test "$result" = "$expected_value" + return $? +} + function test_aggregated_rollup_rule { resolution_seconds="10" now=$(date +"%s") @@ -234,6 +252,7 @@ function test_aggregated_rollup_rule { label0_name="app" label0_value="nginx_edge" \ label1_name="status_code" label1_value="500" \ label2_name="endpoint" label2_value="/foo/bar" \ + metric_type="counter" \ prometheus_remote_write \ http_requests $write_at $value \ true "Expected request to succeed" \ @@ -251,6 +270,7 @@ function test_aggregated_rollup_rule { label0_name="app" label0_value="nginx_edge" \ label1_name="status_code" label1_value="500" \ label2_name="endpoint" label2_value="/foo/baz" \ + metric_type="gauge" \ prometheus_remote_write \ http_requests $write_at $value \ true "Expected request to succeed" \ @@ -284,6 +304,38 @@ function test_aggregated_rollup_rule { retry_with_backoff prometheus_query_native } +function test_metric_type_survives_aggregation { + now=$(date +"%s") + + echo "Test metric type should be kept after aggregation" + + # Emit values for endpoint /foo/bar (to ensure right values aggregated) + write_at="$now_truncated" + value="42" + + metric_type="counter" \ + prometheus_remote_write \ + metric_type_test $now $value \ + true "Expected request to succeed" \ + 200 "Expected request to return status code 200" + + start=$(( $now - 3600 )) + end=$(( $now + 3600 )) + jq_path=".datapoints[0].annotation" + + echo "Test query metric type" + + # Test by metric types are stored in aggregated namespace + ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \ + namespace="agg" \ + 
id='{__name__=\"metric_type_test\",label0=\"label0\",label1=\"label1\",label2=\"label2\"}' \ + rangeStart=${start} \ + rangeEnd=${end} \ + jq_path="$jq_path" expected_value="CAEQAQ==" \ + retry_with_backoff dbnode_fetch +} + echo "Run tests" test_aggregated_graphite_metric test_aggregated_rollup_rule +test_metric_type_survives_aggregation diff --git a/src/aggregator/aggregation/counter_test.go b/src/aggregator/aggregation/counter_test.go index 47492459fe..57610479aa 100644 --- a/src/aggregator/aggregation/counter_test.go +++ b/src/aggregator/aggregation/counter_test.go @@ -70,6 +70,8 @@ func TestCounterCustomAggregationType(t *testing.T) { require.Equal(t, float64(338350), v) case aggregation.Stdev: require.InDelta(t, 29.01149, v, 0.001) + case aggregation.Last: + require.Equal(t, 0.0, v) default: require.Equal(t, float64(0), v) require.False(t, aggType.IsValidForCounter()) diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index 5e0df44e9c..2e30c8b4e5 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -287,6 +287,7 @@ func (agg *aggregator) AddPassthrough( ChunkedID: id.ChunkedID{ Data: []byte(metric.ID), }, + Type: metric.Type, TimeNanos: metric.TimeNanos, Value: metric.Value, }, diff --git a/src/aggregator/aggregator/counter_elem_gen.go b/src/aggregator/aggregator/counter_elem_gen.go index 625ce7de2d..040a988321 100644 --- a/src/aggregator/aggregator/counter_elem_gen.go +++ b/src/aggregator/aggregator/counter_elem_gen.go @@ -31,6 +31,7 @@ import ( "time" maggregation "github.com/m3db/m3/src/metrics/aggregation" + "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/metrics/pipeline/applied" @@ -480,10 +481,10 @@ func (e *CounterElem) processValueWithAggregationLock( for _, point := range toFlush { switch e.idPrefixSuffixType { case NoPrefixNoSuffix: - 
flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) + flushLocalFn(nil, e.id, metric.CounterType, nil, point.TimeNanos, point.Value, e.sp) case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), - point.TimeNanos, point.Value, e.sp) + flushLocalFn(e.FullPrefix(e.opts), e.id, metric.CounterType, + e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp) } } } else { diff --git a/src/aggregator/aggregator/elem_base_test.go b/src/aggregator/aggregator/elem_base_test.go index a277432a51..a13ab372bc 100644 --- a/src/aggregator/aggregator/elem_base_test.go +++ b/src/aggregator/aggregator/elem_base_test.go @@ -203,9 +203,9 @@ func TestCounterElemBaseResetSetData(t *testing.T) { func TestCounterElemBaseResetSetDataInvalidTypes(t *testing.T) { e := counterElemBase{} - err := e.ResetSetData(nil, maggregation.Types{maggregation.Last}, false) + err := e.ResetSetData(nil, maggregation.Types{maggregation.P10}, false) require.Error(t, err) - require.True(t, strings.Contains(err.Error(), "invalid aggregation types Last for counter")) + require.True(t, strings.Contains(err.Error(), "invalid aggregation types P10 for counter")) } func TestTimerElemBase(t *testing.T) { diff --git a/src/aggregator/aggregator/elem_test.go b/src/aggregator/aggregator/elem_test.go index 52c89da25b..a87130d10c 100644 --- a/src/aggregator/aggregator/elem_test.go +++ b/src/aggregator/aggregator/elem_test.go @@ -158,8 +158,10 @@ func TestCounterResetSetData(t *testing.T) { func TestCounterResetSetDataInvalidAggregationType(t *testing.T) { opts := NewOptions() - ce := MustNewCounterElem(nil, policy.EmptyStoragePolicy, maggregation.DefaultTypes, applied.DefaultPipeline, testNumForwardedTimes, NoPrefixNoSuffix, opts) - err := ce.ResetSetData(testCounterID, testStoragePolicy, maggregation.Types{maggregation.Last}, applied.DefaultPipeline, 0, NoPrefixNoSuffix) + ce := MustNewCounterElem(nil, policy.EmptyStoragePolicy, 
maggregation.DefaultTypes, + applied.DefaultPipeline, testNumForwardedTimes, NoPrefixNoSuffix, opts) + err := ce.ResetSetData(testCounterID, testStoragePolicy, maggregation.Types{maggregation.P10}, + applied.DefaultPipeline, 0, NoPrefixNoSuffix) require.Error(t, err) } @@ -1810,6 +1812,7 @@ func testFlushLocalMetricFn() ( return func( idPrefix []byte, id id.RawID, + metricType metric.Type, idSuffix []byte, timeNanos int64, value float64, diff --git a/src/aggregator/aggregator/flush.go b/src/aggregator/aggregator/flush.go index e5d41e037e..1f93fa1621 100644 --- a/src/aggregator/aggregator/flush.go +++ b/src/aggregator/aggregator/flush.go @@ -23,6 +23,7 @@ package aggregator import ( "time" + "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/policy" ) @@ -83,6 +84,7 @@ const ( type flushLocalMetricFn func( idPrefix []byte, id id.RawID, + metricType metric.Type, idSuffix []byte, timeNanos int64, value float64, diff --git a/src/aggregator/aggregator/gauge_elem_gen.go b/src/aggregator/aggregator/gauge_elem_gen.go index 20efbc03fa..a845a296a2 100644 --- a/src/aggregator/aggregator/gauge_elem_gen.go +++ b/src/aggregator/aggregator/gauge_elem_gen.go @@ -31,6 +31,7 @@ import ( "time" maggregation "github.com/m3db/m3/src/metrics/aggregation" + "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/metrics/pipeline/applied" @@ -480,10 +481,10 @@ func (e *GaugeElem) processValueWithAggregationLock( for _, point := range toFlush { switch e.idPrefixSuffixType { case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) + flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp) case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), - point.TimeNanos, point.Value, e.sp) + 
flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType, + e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp) } } } else { diff --git a/src/aggregator/aggregator/generic_elem.go b/src/aggregator/aggregator/generic_elem.go index eab77d6b34..4a1370c83c 100644 --- a/src/aggregator/aggregator/generic_elem.go +++ b/src/aggregator/aggregator/generic_elem.go @@ -537,10 +537,10 @@ func (e *GenericElem) processValueWithAggregationLock( for _, point := range toFlush { switch e.idPrefixSuffixType { case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) + flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp) case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), - point.TimeNanos, point.Value, e.sp) + flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType, + e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp) } } } else { diff --git a/src/aggregator/aggregator/handler/writer/protobuf.go b/src/aggregator/aggregator/handler/writer/protobuf.go index f00f8d3564..abe84608f3 100644 --- a/src/aggregator/aggregator/handler/writer/protobuf.go +++ b/src/aggregator/aggregator/handler/writer/protobuf.go @@ -133,6 +133,7 @@ func (w *protobufWriter) prepare(mp aggregated.ChunkedMetricWithStoragePolicy) ( w.m.ID = append(w.m.ID, mp.Suffix...) 
w.m.Metric.TimeNanos = mp.TimeNanos w.m.Metric.Value = mp.Value + w.m.Metric.Type = mp.Type w.m.StoragePolicy = mp.StoragePolicy shard := w.shardFn(w.m.ID, w.numShards) return w.m, shard diff --git a/src/aggregator/aggregator/list.go b/src/aggregator/aggregator/list.go index ea38acfcc3..20f51b6d2c 100644 --- a/src/aggregator/aggregator/list.go +++ b/src/aggregator/aggregator/list.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/aggregator/aggregator/handler" "github.com/m3db/m3/src/aggregator/aggregator/handler/writer" + "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/aggregated" metricid "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/policy" @@ -434,6 +435,7 @@ func (l *baseMetricList) flushBefore(beforeNanos int64, flushType flushType) { func (l *baseMetricList) consumeLocalMetric( idPrefix []byte, id metricid.RawID, + metricType metric.Type, idSuffix []byte, timeNanos int64, value float64, @@ -447,6 +449,7 @@ func (l *baseMetricList) consumeLocalMetric( chunkedMetricWithPolicy := aggregated.ChunkedMetricWithStoragePolicy{ ChunkedMetric: aggregated.ChunkedMetric{ ChunkedID: chunkedID, + Type: metricType, TimeNanos: timeNanos, Value: value, }, @@ -463,6 +466,7 @@ func (l *baseMetricList) consumeLocalMetric( func (l *baseMetricList) discardLocalMetric( idPrefix []byte, id metricid.RawID, + metricType metric.Type, idSuffix []byte, timeNanos int64, value float64, diff --git a/src/aggregator/aggregator/list_test.go b/src/aggregator/aggregator/list_test.go index 67e61f1c4b..9a523521d3 100644 --- a/src/aggregator/aggregator/list_test.go +++ b/src/aggregator/aggregator/list_test.go @@ -604,6 +604,7 @@ func TestTimedMetricListFlushConsumingAndCollectingTimedMetrics(t *testing.T) { ChunkedID: id.ChunkedID{ Data: ep.metric.ID, }, + Type: ep.metric.Type, TimeNanos: alignedStart, Value: ep.metric.Value, }, @@ -1056,6 +1057,7 @@ func TestForwardedMetricListLastStepLocalFlush(t *testing.T) { Prefix: 
ep.expectedPrefix, Data: ep.metric.ID, }, + Type: ep.metric.Type, TimeNanos: alignedStart, Value: ep.metric.Values[0], }, diff --git a/src/aggregator/aggregator/timer_elem_gen.go b/src/aggregator/aggregator/timer_elem_gen.go index 52e0d2be88..cf46acc790 100644 --- a/src/aggregator/aggregator/timer_elem_gen.go +++ b/src/aggregator/aggregator/timer_elem_gen.go @@ -31,6 +31,7 @@ import ( "time" maggregation "github.com/m3db/m3/src/metrics/aggregation" + "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/metrics/pipeline/applied" @@ -480,10 +481,10 @@ func (e *TimerElem) processValueWithAggregationLock( for _, point := range toFlush { switch e.idPrefixSuffixType { case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) + flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp) case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), - point.TimeNanos, point.Value, e.sp) + flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType, + e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp) } } } else { diff --git a/src/aggregator/generated-source-files.mk b/src/aggregator/generated-source-files.mk index df9293c791..d95dbd85d3 100644 --- a/src/aggregator/generated-source-files.mk +++ b/src/aggregator/generated-source-files.mk @@ -13,6 +13,7 @@ genny-all: genny-aggregator-counter-elem genny-aggregator-timer-elem genny-aggre genny-aggregator-counter-elem: cat $(m3db_package_path)/src/aggregator/aggregator/generic_elem.go \ | awk '/^package/{i++}i' \ + | sed 's/metric.GaugeType/metric.CounterType/' \ | genny -out=$(m3db_package_path)/src/aggregator/aggregator/counter_elem_gen.go -pkg=aggregator gen \ "timedAggregation=timedCounter lockedAggregation=lockedCounterAggregation typeSpecificAggregation=counterAggregation 
typeSpecificElemBase=counterElemBase genericElemPool=CounterElemPool GenericElem=CounterElem" diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/config.go b/src/cmd/services/m3coordinator/ingest/m3msg/config.go index 9d7c17cce2..7ca43135d1 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/config.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/config.go @@ -46,8 +46,9 @@ func (cfg Configuration) NewIngester( appender storage.Appender, tagOptions models.TagOptions, instrumentOptions instrument.Options, + storeMetricsType bool, ) (*Ingester, error) { - opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions) + opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions, storeMetricsType) if err != nil { return nil, err } @@ -58,6 +59,7 @@ func (cfg Configuration) newOptions( appender storage.Appender, tagOptions models.TagOptions, instrumentOptions instrument.Options, + storeMetricsType bool, ) (Options, error) { scope := instrumentOptions.MetricsScope().Tagged( map[string]string{"component": "ingester"}, @@ -98,5 +100,6 @@ func (cfg Configuration) newOptions( RetryOptions: cfg.Retry.NewOptions(scope), Sampler: sampler, InstrumentOptions: instrumentOptions, + StoreMetricsType: storeMetricsType, }, nil } diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go index 8105bd9b3c..ca2920197f 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go @@ -56,6 +56,7 @@ type Options struct { Sampler *sampler.Sampler InstrumentOptions instrument.Options TagOptions models.TagOptions + StoreMetricsType bool } type ingestMetrics struct { @@ -99,14 +100,15 @@ func NewIngester( // pooled, but currently this is the only way to get tag decoder. 
tagDecoder := opts.TagDecoderPool.Get() op := ingestOp{ - s: opts.Appender, - r: retrier, - it: serialize.NewMetricTagsIterator(tagDecoder, nil), - tagOpts: tagOpts, - p: p, - m: m, - logger: opts.InstrumentOptions.Logger(), - sampler: opts.Sampler, + storageAppender: opts.Appender, + retrier: retrier, + iter: serialize.NewMetricTagsIterator(tagDecoder, nil), + tagOpts: tagOpts, + pool: p, + metrics: m, + logger: opts.InstrumentOptions.Logger(), + sampler: opts.Sampler, + storeMetricsType: opts.StoreMetricsType, } op.attemptFn = op.attempt op.ingestFn = op.ingest @@ -123,14 +125,16 @@ func NewIngester( func (i *Ingester) Ingest( ctx context.Context, id []byte, + metricType ts.PromMetricType, metricNanos, encodeNanos int64, value float64, sp policy.StoragePolicy, callback m3msg.Callbackable, ) { op := i.p.Get().(*ingestOp) - op.c = ctx + op.ctx = ctx op.id = id + op.metricType = metricType op.metricNanos = metricNanos op.value = value op.sp = sp @@ -139,26 +143,28 @@ func (i *Ingester) Ingest( } type ingestOp struct { - s storage.Appender - r retry.Retrier - it id.SortedTagIterator - tagOpts models.TagOptions - p pool.ObjectPool - m ingestMetrics - logger *zap.Logger - sampler *sampler.Sampler - attemptFn retry.Fn - ingestFn func() + storageAppender storage.Appender + retrier retry.Retrier + iter id.SortedTagIterator + tagOpts models.TagOptions + pool pool.ObjectPool + metrics ingestMetrics + logger *zap.Logger + sampler *sampler.Sampler + attemptFn retry.Fn + ingestFn func() + storeMetricsType bool - c context.Context + ctx context.Context id []byte + metricType ts.PromMetricType metricNanos int64 value float64 sp policy.StoragePolicy callback m3msg.Callbackable tags models.Tags datapoints ts.Datapoints - q storage.WriteQuery + writeQuery storage.WriteQuery } func (op *ingestOp) sample() bool { @@ -170,22 +176,22 @@ func (op *ingestOp) sample() bool { func (op *ingestOp) ingest() { if err := op.resetWriteQuery(); err != nil { - op.m.ingestInternalError.Inc(1) + 
op.metrics.ingestInternalError.Inc(1) op.callback.Callback(m3msg.OnRetriableError) - op.p.Put(op) + op.pool.Put(op) if op.sample() { op.logger.Error("could not reset ingest op", zap.Error(err)) } return } - if err := op.r.Attempt(op.attemptFn); err != nil { + if err := op.retrier.Attempt(op.attemptFn); err != nil { nonRetryableErr := xerrors.IsNonRetryableError(err) if nonRetryableErr { op.callback.Callback(m3msg.OnNonRetriableError) - op.m.ingestNonRetryableError.Inc(1) + op.metrics.ingestNonRetryableError.Inc(1) } else { op.callback.Callback(m3msg.OnRetriableError) - op.m.ingestInternalError.Inc(1) + op.metrics.ingestInternalError.Inc(1) } // NB(r): Always log non-retriable errors since they are usually @@ -197,16 +203,16 @@ func (op *ingestOp) ingest() { zap.Bool("retryableError", !nonRetryableErr)) } - op.p.Put(op) + op.pool.Put(op) return } - op.m.ingestSuccess.Inc(1) + op.metrics.ingestSuccess.Inc(1) op.callback.Callback(m3msg.OnSuccess) - op.p.Put(op) + op.pool.Put(op) } func (op *ingestOp) attempt() error { - return op.s.Write(op.c, &op.q) + return op.storageAppender.Write(op.ctx, &op.writeQuery) } func (op *ingestOp) resetWriteQuery() error { @@ -214,7 +220,8 @@ func (op *ingestOp) resetWriteQuery() error { return err } op.resetDataPoints() - return op.q.Reset(storage.WriteQueryOptions{ + + wq := storage.WriteQueryOptions{ Tags: op.tags, Datapoints: op.datapoints, Unit: convert.UnitForM3DB(op.sp.Resolution().Precision), @@ -223,15 +230,50 @@ func (op *ingestOp) resetWriteQuery() error { Resolution: op.sp.Resolution().Window, Retention: op.sp.Retention().Duration(), }, - }) + } + + if op.storeMetricsType { + var err error + wq.Annotation, err = op.convertTypeToAnnotation(op.metricType) + if err != nil { + return err + } + } + + return op.writeQuery.Reset(wq) +} + +func (op *ingestOp) convertTypeToAnnotation(tp ts.PromMetricType) ([]byte, error) { + if tp == ts.PromMetricTypeUnknown { + return nil, nil + } + + handleValueResets := false + if tp == 
ts.PromMetricTypeCounter { + handleValueResets = true + } + + annotationPayload, err := storage.SeriesAttributesToAnnotationPayload(tp, handleValueResets) + if err != nil { + return nil, err + } + annot, err := annotationPayload.Marshal() + if err != nil { + return nil, err + } + + if len(annot) == 0 { + annot = nil + } + return annot, nil } func (op *ingestOp) resetTags() error { - op.it.Reset(op.id) + op.iter.Reset(op.id) op.tags.Tags = op.tags.Tags[:0] op.tags.Opts = op.tagOpts - for op.it.Next() { - name, value := op.it.Current() + for op.iter.Next() { + name, value := op.iter.Current() // TODO_FIX_GRAPHITE_TAGGING: Using this string constant to track // all places worth fixing this hack. There is at least one @@ -241,7 +283,7 @@ func (op *ingestOp) resetTags() error { if bytes.Equal(value, downsample.GraphiteIDSchemeTagValue) && op.tags.Opts.IDSchemeType() != models.TypeGraphite { // Restart iteration with graphite tag options parsing - op.it.Reset(op.id) + op.iter.Reset(op.id) op.tags.Tags = op.tags.Tags[:0] op.tags.Opts = op.tags.Opts.SetIDSchemeType(models.TypeGraphite) } @@ -256,7 +298,7 @@ func (op *ingestOp) resetTags() error { }.Clone()) } op.tags.Normalize() - return op.it.Err() + return op.iter.Err() } func (op *ingestOp) resetDataPoints() { diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go index 22babfd397..2e9336065f 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go @@ -59,7 +59,7 @@ func TestIngest(t *testing.T) { } appender := &mockAppender{} ingester, err := cfg.NewIngester(appender, models.NewTagOptions(), - instrument.NewOptions()) + instrument.NewOptions(), true) require.NoError(t, err) id := newTestID(t, "__name__", "foo", "app", "bar") @@ -72,14 +72,14 @@ func TestIngest(t *testing.T) { callback := m3msg.NewProtobufCallback(m, protobuf.NewAggregatedDecoder(nil), &wg) 
m.EXPECT().Ack() - ingester.Ingest(context.TODO(), id, metricNanos, 0, val, sp, callback) + ingester.Ingest(context.TODO(), id, ts.PromMetricTypeGauge, metricNanos, 0, val, sp, callback) for appender.cnt() != 1 { time.Sleep(100 * time.Millisecond) } expected, err := storage.NewWriteQuery(storage.WriteQueryOptions{ - Annotation: nil, + Annotation: []byte{8, 2}, Attributes: storagemetadata.Attributes{ MetricsType: storagemetadata.AggregatedMetricsType, Resolution: time.Minute, @@ -93,7 +93,7 @@ func TestIngest(t *testing.T) { }, Tags: models.NewTags(2, nil).AddTags( []models.Tag{ - models.Tag{ + { Name: []byte("__name__"), Value: []byte("foo"), }, @@ -131,7 +131,7 @@ func TestIngestNonRetryableError(t *testing.T) { nonRetryableError := xerrors.NewNonRetryableError(errors.New("bad request error")) appender := &mockAppender{expectErr: nonRetryableError} ingester, err := cfg.NewIngester(appender, models.NewTagOptions(), - instrumentOpts) + instrumentOpts, true) require.NoError(t, err) id := newTestID(t, "__name__", "foo", "app", "bar") @@ -144,7 +144,7 @@ func TestIngestNonRetryableError(t *testing.T) { callback := m3msg.NewProtobufCallback(m, protobuf.NewAggregatedDecoder(nil), &wg) m.EXPECT().Ack() - ingester.Ingest(context.TODO(), id, metricNanos, 0, val, sp, callback) + ingester.Ingest(context.TODO(), id, ts.PromMetricTypeGauge, metricNanos, 0, val, sp, callback) for appender.cntErr() != 1 { time.Sleep(100 * time.Millisecond) diff --git a/src/cmd/services/m3coordinator/ingest/write.go b/src/cmd/services/m3coordinator/ingest/write.go index 27cd4b477a..6d131f8e15 100644 --- a/src/cmd/services/m3coordinator/ingest/write.go +++ b/src/cmd/services/m3coordinator/ingest/write.go @@ -22,6 +22,7 @@ package ingest import ( "context" + "fmt" "sync" "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" @@ -495,14 +496,26 @@ func (d *downsamplerAndWriter) writeAggregatedBatch( } for _, dp := range value.Datapoints { - switch value.Attributes.M3Type { - case 
ts.M3MetricTypeGauge: - err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value) - case ts.M3MetricTypeCounter: - err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value)) - case ts.M3MetricTypeTimer: - err = result.SamplesAppender.AppendTimerTimedSample(dp.Timestamp, dp.Value) + if value.Attributes.PromType != ts.PromMetricTypeUnknown { + switch value.Attributes.PromType { + case ts.PromMetricTypeCounter: + err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value)) + default: + err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value) + } + } else { + switch value.Attributes.M3Type { + case ts.M3MetricTypeGauge: + err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value) + case ts.M3MetricTypeCounter: + err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value)) + case ts.M3MetricTypeTimer: + err = result.SamplesAppender.AppendTimerTimedSample(dp.Timestamp, dp.Value) + default: + err = fmt.Errorf("unknown m3type '%v'", value.Attributes.M3Type) + } } + if err != nil { // If we see an error break out so we can try processing the // next datapoint. 
diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go index ffca0e0c1d..65f2d6b6c5 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go @@ -122,7 +122,7 @@ func (h *pbHandler) Process(msg consumer.Message) { } } - h.writeFn(h.ctx, dec.ID(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r) + h.writeFn(h.ctx, dec.ID(), dec.Type(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r) } func (h *pbHandler) Close() { h.wg.Wait() } diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go index 02e411749a..1770a2b757 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go @@ -35,6 +35,7 @@ import ( "github.com/m3db/m3/src/msg/consumer" "github.com/m3db/m3/src/msg/generated/proto/msgpb" "github.com/m3db/m3/src/msg/protocol/proto" + "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/server" xtime "github.com/m3db/m3/src/x/time" @@ -233,6 +234,7 @@ type mockWriter struct { func (m *mockWriter) write( ctx context.Context, name []byte, + metricType ts.PromMetricType, metricNanos, encodeNanos int64, value float64, sp policy.StoragePolicy, diff --git a/src/cmd/services/m3coordinator/server/m3msg/types.go b/src/cmd/services/m3coordinator/server/m3msg/types.go index 8d3a5f96be..20c20c37fe 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/types.go +++ b/src/cmd/services/m3coordinator/server/m3msg/types.go @@ -24,12 +24,14 @@ import ( "context" "github.com/m3db/m3/src/metrics/policy" + "github.com/m3db/m3/src/query/ts" ) // WriteFn is the function that writes a metric. 
type WriteFn func( ctx context.Context, id []byte, + metricType ts.PromMetricType, metricNanos, encodeNanos int64, value float64, sp policy.StoragePolicy, diff --git a/src/metrics/aggregation/type.go b/src/metrics/aggregation/type.go index db10e274ae..9716dd560d 100644 --- a/src/metrics/aggregation/type.go +++ b/src/metrics/aggregation/type.go @@ -164,7 +164,7 @@ func (a Type) IsValidForGauge() bool { // IsValidForCounter if an Type is valid for Counter. func (a Type) IsValidForCounter() bool { switch a { - case Min, Max, Mean, Count, Sum, SumSq, Stdev: + case Min, Max, Mean, Count, Sum, SumSq, Stdev, Last: return true default: return false diff --git a/src/metrics/encoding/protobuf/aggregated_decoder.go b/src/metrics/encoding/protobuf/aggregated_decoder.go index ca9052c44c..a9ee58e9b8 100644 --- a/src/metrics/encoding/protobuf/aggregated_decoder.go +++ b/src/metrics/encoding/protobuf/aggregated_decoder.go @@ -23,6 +23,7 @@ package protobuf import ( "github.com/m3db/m3/src/metrics/generated/proto/metricpb" "github.com/m3db/m3/src/metrics/policy" + "github.com/m3db/m3/src/query/ts" ) // AggregatedDecoder is a decoder for decoding aggregated metrics. @@ -52,6 +53,16 @@ func (d AggregatedDecoder) ID() []byte { return d.pb.Metric.TimedMetric.Id } +// Type returns the type of the metric. +func (d *AggregatedDecoder) Type() ts.PromMetricType { + switch d.pb.Metric.TimedMetric.Type { + case metricpb.MetricType_COUNTER: + return ts.PromMetricTypeCounter + default: + return ts.PromMetricTypeGauge + } +} + // TimeNanos returns the decoded timestamp. func (d AggregatedDecoder) TimeNanos() int64 { return d.pb.Metric.TimedMetric.TimeNanos diff --git a/src/metrics/metric/aggregated/types.go b/src/metrics/metric/aggregated/types.go index 4415dfa039..e4503a0c60 100644 --- a/src/metrics/metric/aggregated/types.go +++ b/src/metrics/metric/aggregated/types.go @@ -81,6 +81,7 @@ func (m Metric) String() string { // ChunkedMetric is a metric with a chunked ID. 
type ChunkedMetric struct { id.ChunkedID + Type metric.Type TimeNanos int64 Value float64 } diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index e5a7ff2736..6850e4e8eb 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -93,6 +93,16 @@ var ( Attributes: ts.DefaultSeriesAttributes(), Metadata: ts.Metadata{}, } + + headerToMetricType = map[string]prompb.MetricType{ + "counter": prompb.MetricType_COUNTER, + "gauge": prompb.MetricType_GAUGE, + "gauge-histogram": prompb.MetricType_GAUGE_HISTOGRAM, + "histogram": prompb.MetricType_HISTOGRAM, + "info": prompb.MetricType_INFO, + "stateset": prompb.MetricType_STATESET, + "summary": prompb.MetricType_SUMMARY, + } ) // PromWriteHandler represents a handler for prometheus write endpoint. @@ -502,6 +512,16 @@ func (h *PromWriteHandler) parseRequest( } } + if promType := r.Header.Get(headers.PromTypeHeader); promType != "" { + tp, ok := headerToMetricType[strings.ToLower(promType)] + if !ok { + return parseRequestResult{}, fmt.Errorf("unknown prom metric type %s", promType) + } + for i := range req.Timeseries { + req.Timeseries[i].Type = tp + } + } + return parseRequestResult{ Request: &req, Options: opts, @@ -642,7 +662,9 @@ func (i *promTSIter) Next() bool { return true } - annotationPayload, err := storage.SeriesAttributesToAnnotationPayload(i.attributes[i.idx]) + annotationPayload, err := storage.SeriesAttributesToAnnotationPayload( + i.attributes[i.idx].PromType, + i.attributes[i.idx].HandleValueResets) if err != nil { i.err = err return false diff --git a/src/query/api/v1/handler/prometheus/remote/write_test.go b/src/query/api/v1/handler/prometheus/remote/write_test.go index 3f71305f35..58b2a8d419 100644 --- a/src/query/api/v1/handler/prometheus/remote/write_test.go +++ b/src/query/api/v1/handler/prometheus/remote/write_test.go @@ -86,6 +86,89 @@ func TestPromWriteParsing(t 
*testing.T) { require.Equal(t, ingest.WriteOptions{}, r.Options) } +func TestMetricTypeHeader(t *testing.T) { + tests := []struct { + headerValue string + expectedType prompb.MetricType + }{ + { + expectedType: prompb.MetricType_UNKNOWN, + }, + { + headerValue: "counter", + expectedType: prompb.MetricType_COUNTER, + }, + { + headerValue: "Counter", + expectedType: prompb.MetricType_COUNTER, + }, + { + headerValue: "gauge", + expectedType: prompb.MetricType_GAUGE, + }, + { + headerValue: "histogram", + expectedType: prompb.MetricType_HISTOGRAM, + }, + { + headerValue: "gauge-histogram", + expectedType: prompb.MetricType_GAUGE_HISTOGRAM, + }, + { + headerValue: "summary", + expectedType: prompb.MetricType_SUMMARY, + }, + { + headerValue: "info", + expectedType: prompb.MetricType_INFO, + }, + { + headerValue: "stateset", + expectedType: prompb.MetricType_STATESET, + }, + } + + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) + handlerOpts := makeOptions(mockDownsamplerAndWriter) + handler, err := NewPromWriteHandler(handlerOpts) + require.NoError(t, err) + + for _, testCase := range tests { + t.Run(testCase.headerValue, func(tt *testing.T) { + tc := testCase // nolint + promReq := test.GeneratePromWriteRequest() + promReqBody := test.GeneratePromWriteRequestBody(tt, promReq) + req := httptest.NewRequest(PromWriteHTTPMethod, PromWriteURL, promReqBody) + if tc.headerValue > "" { + req.Header.Add(headers.PromTypeHeader, tc.headerValue) + } + r, err := handler.(*PromWriteHandler).parseRequest(req) + require.NoError(tt, err) + require.Equal(tt, tc.expectedType, r.Request.Timeseries[0].Type) + }) + } +} + +func TestInvalidMetricTypeHeader(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) + handlerOpts := makeOptions(mockDownsamplerAndWriter) + handler, err := NewPromWriteHandler(handlerOpts) + 
require.NoError(t, err) + + promReq := test.GeneratePromWriteRequest() + promReqBody := test.GeneratePromWriteRequestBody(t, promReq) + req := httptest.NewRequest(PromWriteHTTPMethod, PromWriteURL, promReqBody) + req.Header.Add(headers.PromTypeHeader, "random") + _, err = handler.(*PromWriteHandler).parseRequest(req) + require.Error(t, err) +} + func TestPromWrite(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() diff --git a/src/query/server/query.go b/src/query/server/query.go index e82d31232d..3b45fe5eaa 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -545,10 +545,15 @@ func Run(runOpts RunOptions) { }() if cfg.Ingest != nil { + storeMetricsType := false + if cfg.StoreMetricsType != nil { + storeMetricsType = *cfg.StoreMetricsType + } + logger.Info("starting m3msg server", zap.String("address", cfg.Ingest.M3Msg.Server.ListenAddress)) ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, - tagOptions, instrumentOptions) + tagOptions, instrumentOptions, storeMetricsType) if err != nil { logger.Fatal("unable to create ingester", zap.Error(err)) } diff --git a/src/query/server/query_test.go b/src/query/server/query_test.go index d1f95e0171..f79ccf05cb 100644 --- a/src/query/server/query_test.go +++ b/src/query/server/query_test.go @@ -116,6 +116,8 @@ writeWorkerPoolPolicy: size: 100 shards: 100 killProbability: 0.3 + +storeMetricsType: true ` func TestWrite(t *testing.T) { @@ -253,7 +255,7 @@ func TestIngest(t *testing.T) { gomock.Any(), 42.0, gomock.Any(), - nil). + []byte{8, 2}). 
Do(func(_, _, _, _, _, _, _ interface{}) { numWrites.Add(1) }) diff --git a/src/query/storage/converter.go b/src/query/storage/converter.go index 9cd4ecbfcd..9302266006 100644 --- a/src/query/storage/converter.go +++ b/src/query/storage/converter.go @@ -149,12 +149,12 @@ func PromTimeSeriesToSeriesAttributes(series prompb.TimeSeries) (ts.SeriesAttrib }, nil } -// SeriesAttributesToAnnotationPayload converts ts.SeriesAttributes into an annotation.Payload. -func SeriesAttributesToAnnotationPayload(seriesAttributes ts.SeriesAttributes) (annotation.Payload, error) { +// SeriesAttributesToAnnotationPayload converts passed arguments into an annotation.Payload. +func SeriesAttributesToAnnotationPayload( + promType ts.PromMetricType, + handleValueResets bool) (annotation.Payload, error) { var metricType annotation.MetricType - - switch seriesAttributes.PromType { - + switch promType { case ts.PromMetricTypeUnknown: metricType = annotation.MetricType_UNKNOWN @@ -180,12 +180,12 @@ func SeriesAttributesToAnnotationPayload(seriesAttributes ts.SeriesAttributes) ( metricType = annotation.MetricType_STATESET default: - return annotation.Payload{}, fmt.Errorf("invalid Prometheus metric type %v", seriesAttributes.PromType) + return annotation.Payload{}, fmt.Errorf("invalid Prometheus metric type %v", promType) } return annotation.Payload{ MetricType: metricType, - HandleValueResets: seriesAttributes.HandleValueResets, + HandleValueResets: handleValueResets, }, nil } diff --git a/src/query/storage/converter_test.go b/src/query/storage/converter_test.go index ca808ab4f5..88a68ef72e 100644 --- a/src/query/storage/converter_test.go +++ b/src/query/storage/converter_test.go @@ -385,19 +385,19 @@ func TestSeriesAttributesToAnnotationPayload(t *testing.T) { } for promType, expected := range mapping { - payload, err := SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{PromType: promType}) + payload, err := SeriesAttributesToAnnotationPayload(promType, false) require.NoError(t, err) 
assert.Equal(t, expected, payload.MetricType) } - _, err := SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{PromType: math.MaxUint8}) + _, err := SeriesAttributesToAnnotationPayload(math.MaxUint8, false) require.Error(t, err) - payload, err := SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{HandleValueResets: true}) + payload, err := SeriesAttributesToAnnotationPayload(0, true) require.NoError(t, err) assert.True(t, payload.HandleValueResets) - payload, err = SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{HandleValueResets: false}) + payload, err = SeriesAttributesToAnnotationPayload(0, false) require.NoError(t, err) assert.False(t, payload.HandleValueResets) } diff --git a/src/x/headers/headers.go b/src/x/headers/headers.go index 65a4a74fb8..579ac13080 100644 --- a/src/x/headers/headers.go +++ b/src/x/headers/headers.go @@ -43,6 +43,11 @@ const ( // Valid values are "unaggregated" or "aggregated". MetricsTypeHeader = M3HeaderPrefix + "Metrics-Type" + // PromTypeHeader sets the Prometheus metric type. Valid values are + // "counter", "gauge", etc. (see src/query/api/v1/handler/prometheus/remote/write.go + // field `headerToMetricType`). + PromTypeHeader = M3HeaderPrefix + "Prom-Type" + // WriteTypeHeader is a header that controls if default // writes should be written to both unaggregated and aggregated // namespaces, or if unaggregated values are skipped and