From e40a6a068dd12afa6ae51ac71d4aa1d04d8b12f2 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 20 Feb 2017 18:12:32 +0100 Subject: [PATCH] use storage-schemas.conf and storage-aggregation.conf to configure numchunks, chunkspans, ttls and aggregations. For carbon also to find out the raw interval. For other input plugins we only honor interval specified in the data --- api/dataprocessor_test.go | 2 +- api/graphite.go | 2 +- api/query_engine.go | 10 ++- docker/docker-cluster/metrictank.ini | 25 ------ .../docker-compose.yml | 2 + .../metrictank.ini | 25 ------ docs/config.md | 23 ------ input/carbon/carbon.go | 31 +------ input/input.go | 2 +- mdata/aggmetric.go | 15 ++-- mdata/aggmetric_test.go | 3 +- mdata/aggmetrics.go | 22 ++--- mdata/aggregator.go | 15 ++-- mdata/aggsettings.go | 82 ------------------- mdata/ifaces.go | 11 ++- mdata/init.go | 39 ++++++++- mdata/notifier.go | 7 +- mdata/notifierKafka/notifierKafka.go | 1 + mdata/notifierNsq/notifierNsq.go | 12 ++- mdata/schema.go | 39 +++++++++ mdata/store_cassandra.go | 3 +- metrictank-sample.ini | 25 ------ metrictank.go | 36 ++------ scripts/config/metrictank-docker.ini | 3 - scripts/config/metrictank-package.ini | 3 - usage/usage.go | 2 +- usage/usage_test.go | 2 +- 27 files changed, 153 insertions(+), 289 deletions(-) delete mode 100644 mdata/aggsettings.go create mode 100644 mdata/schema.go diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go index c90e6b40d1..e21f14051c 100644 --- a/api/dataprocessor_test.go +++ b/api/dataprocessor_test.go @@ -566,7 +566,7 @@ func TestGetSeriesFixed(t *testing.T) { for to := uint32(31); to <= 40; to++ { // should always yield result with last point at 30 (because to is exclusive) name := fmt.Sprintf("case.data.offset.%d.query:%d-%d", offset, from, to) - metric := metrics.GetOrCreate(name) + metric := metrics.GetOrCreate(name, name) metric.Add(offset, 10) // this point will always be quantized to 10 metric.Add(10+offset, 20) // this point will always be quantized to 20, so it should be selected metric.Add(20+offset, 30) // this point will always be quantized to 30, so it should be selected diff --git a/api/graphite.go b/api/graphite.go index f942622c77..0f5de45cb0 100644 --- a/api/graphite.go +++ b/api/graphite.go @@ -279,7 +279,7 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR ctx.Req.Method, from, to, len(request.Targets), request.MaxDataPoints) } - reqs, err = alignRequests(uint32(time.Now().Unix()), reqs, s.MemoryStore.AggSettings()) + reqs, err = alignRequests(uint32(time.Now().Unix()), reqs, s.MemoryStore.Schemas()) if err != nil { log.Error(3, "HTTP Render alignReq error: %s", err) response.Write(ctx, response.WrapError(err)) diff --git a/api/query_engine.go b/api/query_engine.go index 5e9e15c8f2..ca518a8d37 100644 --- a/api/query_engine.go +++ b/api/query_engine.go @@ -1,8 +1,8 @@ package api import ( + "github.com/lomik/go-carbon/persister" "github.com/raintank/metrictank/api/models" - "github.com/raintank/metrictank/mdata" "github.com/raintank/metrictank/stats" "github.com/raintank/metrictank/util" ) @@ -18,10 +18,10 @@ var ( ) // alignRequests updates the requests with all details for fetching, making sure all metrics are in the same, optimal interval -// luckily, all metrics still use the same aggSettings, making this a bit simpler // note: it is assumed that all requests have the same from & to. 
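// Editor's note, not part of this patch: a hedged sketch of how the new schemas
// argument could be consumed here. persister.WhisperSchemas (from lomik/go-carbon)
// is the parsed storage-schemas.conf: an ordered list of entries, each holding a
// regex pattern and its retentions, looked up per metric name, roughly:
//
//	if schema, ok := s.Match(reqs[0].Target); ok {
//		rawInterval := uint32(schema.Retentions[0].SecondsPerPoint())
//		_ = rawInterval // e.g. use it to pick the archive/interval for this request
//	}
//
// The Target field name and the selection logic are assumptions; the patch itself
// still carries a TODO for wiring schemas into this function.
//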
// also takes a "now" value which we compare the TTL against -func alignRequests(now uint32, reqs []models.Req, s mdata.AggSettings) ([]models.Req, error) { +func alignRequests(now uint32, reqs []models.Req, s persister.WhisperSchemas) ([]models.Req, error) { + // TODO figure out how to use schemas here tsRange := (reqs[0].To - reqs[0].From) @@ -65,6 +65,10 @@ func alignRequests(now uint32, reqs []models.Req, s mdata.AggSettings) ([]models interval = s.Aggs[archive-1].Span } + // then, make sure all series are at the same step, runtime consolidating as necessary + // note: we could have been able to just fetch a rollup to get the correct data. + // maybe for v2 of this. + /* we now just need to update the following properties for each req: archive int // 0 means original data, 1 means first agg level, 2 means 2nd, etc. archInterval uint32 // the interval corresponding to the archive we'll fetch diff --git a/docker/docker-cluster/metrictank.ini b/docker/docker-cluster/metrictank.ini index fb37e1a05e..a5270a5c4f 100644 --- a/docker/docker-cluster/metrictank.ini +++ b/docker/docker-cluster/metrictank.ini @@ -10,16 +10,6 @@ accounting-period = 5min # see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for more details -# duration of raw chunks. e.g. 10min, 30min, 1h, 90min... -# must be valid value as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans -chunkspan = 10min -# number of raw chunks to keep in in-memory ring buffer -# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache -# (settings further down) which may be a more effective method to cache data and alleviate workload for cassandra. -numchunks = 7 -# minimum wait before raw metrics are removed from storage -ttl = 35d - # max age for a chunk before to be considered stale and to be persisted to Cassandra chunk-max-stale = 1h # max age for a metric before to be considered stale and to be purged from memory @@ -32,18 +22,6 @@ gc-interval = 1h # in clusters, best to assure the primary has saved all the data that a newly warmup instance will need to query, to prevent gaps in charts warm-up-period = 1h -# settings for rollups (aggregation for archives) -# comma-separated list of archive specifications. -# archive specification is of the form: aggSpan:chunkSpan:numChunks:TTL[:ready as bool. default true] -# with these aggregation rules: 5min:1h:2:3mon,1h:6h:2:1y:false you get: -# - 5 min of data, store in a chunk that lasts 1hour, keep 2 chunks in in-memory ring buffer, keep for 3months in cassandra -# - 1hr worth of data, in chunks of 6 hours, 2 chunks in in-memory ring buffer, keep for 1 year, but this series is not ready yet for querying. -# When running a cluster of metrictank instances, all instances should have the same agg-settings. -# Note: -# * chunk spans must be valid values as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans -# * numchunks -like the global setting- has nuanced use compared to chunk cache. see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md -agg-settings = - ## metric data storage in cassandra ## # see https://github.com/raintank/metrictank/blob/master/docs/cassandra.md for more details @@ -159,9 +137,6 @@ enabled = true addr = :2003 # represents the "partition" of your data if you decide to partition your data. 
partition = 0 -# needed to know your raw resolution for your metrics. see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf -# NOTE: does NOT use aggregation and retention settings from this file. We use agg-settings and ttl for that. -schemas-file = /etc/raintank/storage-schemas.conf ### kafka-mdm input (optional, recommended) [kafka-mdm-in] diff --git a/docker/docker-dev-custom-cfg-kafka/docker-compose.yml b/docker/docker-dev-custom-cfg-kafka/docker-compose.yml index 79332f2c59..46b1b01d33 100644 --- a/docker/docker-dev-custom-cfg-kafka/docker-compose.yml +++ b/docker/docker-dev-custom-cfg-kafka/docker-compose.yml @@ -10,6 +10,8 @@ services: volumes: - ../../build/metrictank:/usr/bin/metrictank - ./metrictank.ini:/etc/raintank/metrictank.ini + - ./storage-schemas.conf:/etc/raintank/storage-schemas.conf + - ./storage-aggregation.conf:/etc/raintank/storage-aggregation.conf environment: WAIT_HOSTS: cassandra:9042,kafka:9092 WAIT_TIMEOUT: 60 diff --git a/docker/docker-dev-custom-cfg-kafka/metrictank.ini b/docker/docker-dev-custom-cfg-kafka/metrictank.ini index 280b4a8949..45ebe3f061 100644 --- a/docker/docker-dev-custom-cfg-kafka/metrictank.ini +++ b/docker/docker-dev-custom-cfg-kafka/metrictank.ini @@ -10,16 +10,6 @@ accounting-period = 5min # see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for more details -# duration of raw chunks. e.g. 10min, 30min, 1h, 90min... -# must be valid value as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans -chunkspan = 2min -# number of raw chunks to keep in in-memory ring buffer -# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache -# (settings further down) which may be a more effective method to cache data and alleviate workload for cassandra. -numchunks = 2 -# minimum wait before raw metrics are removed from storage -ttl = 35d - # max age for a chunk before to be considered stale and to be persisted to Cassandra chunk-max-stale = 1h # max age for a metric before to be considered stale and to be purged from memory @@ -32,18 +22,6 @@ gc-interval = 1h # in clusters, best to assure the primary has saved all the data that a newly warmup instance will need to query, to prevent gaps in charts warm-up-period = 1h -# settings for rollups (aggregation for archives) -# comma-separated list of archive specifications. -# archive specification is of the form: aggSpan:chunkSpan:numChunks:TTL[:ready as bool. default true] -# with these aggregation rules: 5min:1h:2:3mon,1h:6h:2:1y:false you get: -# - 5 min of data, store in a chunk that lasts 1hour, keep 2 chunks in in-memory ring buffer, keep for 3months in cassandra -# - 1hr worth of data, in chunks of 6 hours, 2 chunks in in-memory ring buffer, keep for 1 year, but this series is not ready yet for querying. -# When running a cluster of metrictank instances, all instances should have the same agg-settings. -# Note: -# * chunk spans must be valid values as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans -# * numchunks -like the global setting- has nuanced use compared to chunk cache. 
see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md -agg-settings = - ## metric data storage in cassandra ## # see https://github.com/raintank/metrictank/blob/master/docs/cassandra.md for more details @@ -159,9 +137,6 @@ enabled = true addr = :2003 # represents the "partition" of your data if you decide to partition your data. partition = 0 -# needed to know your raw resolution for your metrics. see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf -# NOTE: does NOT use aggregation and retention settings from this file. We use agg-settings and ttl for that. -schemas-file = /etc/raintank/storage-schemas.conf ### kafka-mdm input (optional, recommended) [kafka-mdm-in] diff --git a/docs/config.md b/docs/config.md index a5cc96b15a..a2416f0009 100644 --- a/docs/config.md +++ b/docs/config.md @@ -35,15 +35,6 @@ accounting-period = 5min ``` # see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for more details -# duration of raw chunks. e.g. 10min, 30min, 1h, 90min... -# must be valid value as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans -chunkspan = 10min -# number of raw chunks to keep in in-memory ring buffer -# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache -# (settings further down) which may be a more effective method to cache data and alleviate workload for cassandra. -numchunks = 7 -# minimum wait before raw metrics are removed from storage -ttl = 35d # max age for a chunk before to be considered stale and to be persisted to Cassandra chunk-max-stale = 1h # max age for a metric before to be considered stale and to be purged from in-memory ring buffer. @@ -54,17 +45,6 @@ gc-interval = 1h # shorter warmup means metrictank will need to query cassandra more if it doesn't have requested data yet. # in clusters, best to assure the primary has saved all the data that a newly warmup instance will need to query, to prevent gaps in charts warm-up-period = 1h -# settings for rollups (aggregation for archives) -# comma-separated list of archive specifications. -# archive specification is of the form: aggSpan:chunkSpan:numChunks:TTL[:ready as bool. default true] -# with these aggregation rules: 5min:1h:2:3mon,1h:6h:2:1y:false you get: -# - 5 min of data, store in a chunk that lasts 1hour, keep 2 chunks in in-memory ring buffer, keep for 3months in cassandra -# - 1hr worth of data, in chunks of 6 hours, 2 chunks in in-memory ring buffer, keep for 1 year, but this series is not ready yet for querying. -# When running a cluster of metrictank instances, all instances should have the same agg-settings. -# Note: -# * chunk spans must be valid values as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans -# * numchunks -like the global setting- has nuanced use compared to chunk cache. see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md -agg-settings = ``` ## metric data storage in cassandra ## @@ -189,9 +169,6 @@ enabled = false addr = :2003 # represents the "partition" of your data if you decide to partition your data. partition = 0 -# needed to know your raw resolution for your metrics. see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf -# NOTE: does NOT use aggregation and retention settings from this file. We use agg-settings and ttl for that. 
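# Editor's note (illustrative only, not part of this patch): carbon-in no longer has
# its own schemas-file; the raw interval, chunk spans, numchunks and TTLs now come
# from a global storage-schemas.conf read at startup by mdata.ConfigProcess().
# A minimal graphite-style file, assuming the stock syntax, looks like:
#
#   [default]
#   pattern = .*
#   retentions = 10s:35d
#
# Since the patch also reads ChunkSpan and NumChunks per retention, the file is
# presumably parsed with extended per-retention fields; their exact syntax is not
# shown in this patch and is not assumed here.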
-schemas-file = /path/to/your/schemas-file ``` ### kafka-mdm input (optional, recommended) diff --git a/input/carbon/carbon.go b/input/carbon/carbon.go index 3107c931f4..8c369984df 100644 --- a/input/carbon/carbon.go +++ b/input/carbon/carbon.go @@ -9,10 +9,10 @@ import ( "net" "sync" - "github.com/lomik/go-carbon/persister" "github.com/metrics20/go-metrics20/carbon20" "github.com/raintank/metrictank/cluster" "github.com/raintank/metrictank/input" + "github.com/raintank/metrictank/mdata" "github.com/raintank/metrictank/stats" "github.com/raintank/worldping-api/pkg/log" "github.com/rakyll/globalconf" @@ -29,7 +29,6 @@ type Carbon struct { input.Handler addrStr string addr *net.TCPAddr - schemas persister.WhisperSchemas listener *net.TCPListener handlerWaitGroup sync.WaitGroup quit chan struct{} @@ -73,8 +72,6 @@ func (c *Carbon) Name() string { var Enabled bool var addr string -var schemasFile string -var schemas persister.WhisperSchemas var partitionId int func ConfigSetup() { @@ -82,7 +79,6 @@ func ConfigSetup() { inCarbon.BoolVar(&Enabled, "enabled", false, "") inCarbon.StringVar(&addr, "addr", ":2003", "tcp listen address") inCarbon.IntVar(&partitionId, "partition", 0, "partition Id.") - inCarbon.StringVar(&schemasFile, "schemas-file", "/path/to/your/schemas-file", "see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf") globalconf.Register("carbon-in", inCarbon) } @@ -90,25 +86,6 @@ func ConfigProcess() { if !Enabled { return } - var err error - schemas, err = persister.ReadWhisperSchemas(schemasFile) - if err != nil { - log.Fatal(4, "carbon-in: can't read schemas file %q: %s", schemasFile, err.Error()) - } - var defaultFound bool - for _, schema := range schemas { - if schema.Pattern.String() == ".*" { - defaultFound = true - } - if len(schema.Retentions) == 0 { - log.Fatal(4, "carbon-in: retention setting cannot be empty") - } - } - if !defaultFound { - // good graphite health (not sure what graphite does if there's no .*) - // but we definitely need to always be able to determine which interval to use - log.Fatal(4, "carbon-in: storage-conf does not have a default '.*' pattern") - } cluster.Manager.SetPartitions([]int32{int32(partitionId)}) } @@ -120,7 +97,6 @@ func New() *Carbon { return &Carbon{ addrStr: addr, addr: addrT, - schemas: schemas, connTrack: NewConnTrack(), } } @@ -196,10 +172,7 @@ func (c *Carbon) handle(conn net.Conn) { continue } name := string(key) - s, ok := c.schemas.Match(name) - if !ok { - log.Fatal(4, "carbon-in: couldn't find a schema for %q - this is impossible since we asserted there was a default with patt .*", name) - } + s := mdata.Match(name) interval := s.Retentions[0].SecondsPerPoint() md := &schema.MetricData{ Name: name, diff --git a/input/input.go b/input/input.go index 0359fdfdb5..7a1de699b2 100644 --- a/input/input.go +++ b/input/input.go @@ -75,7 +75,7 @@ func (in DefaultHandler) Process(metric *schema.MetricData, partition int32) { } pre = time.Now() - m := in.metrics.GetOrCreate(metric.Id) + m := in.metrics.GetOrCreate(metric.Id, metric.Name) m.Add(uint32(metric.Time), metric.Value) if in.usage != nil { in.usage.Add(metric.OrgId, metric.Id) diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index 4e89167499..988a1c5b98 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -6,6 +6,7 @@ import ( "sync" "time" + whisper "github.com/lomik/go-whisper" "github.com/raintank/metrictank/cluster" "github.com/raintank/metrictank/consolidation" "github.com/raintank/metrictank/mdata/cache" @@ -39,21 +40,21 @@ type 
AggMetric struct { // NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long // it optionally also creates aggregations with the given settings -func NewAggMetric(store Store, cachePusher cache.CachePusher, key string, chunkSpan, numChunks uint32, ttl uint32, aggsetting ...AggSetting) *AggMetric { +func NewAggMetric(store Store, cachePusher cache.CachePusher, key string, retentions whisper.Retentions) *AggMetric { m := AggMetric{ cachePusher: cachePusher, store: store, Key: key, - ChunkSpan: chunkSpan, - NumChunks: numChunks, - Chunks: make([]*chunk.Chunk, 0, numChunks), - ttl: ttl, + ChunkSpan: retentions[0].ChunkSpan, + NumChunks: retentions[0].NumChunks, + Chunks: make([]*chunk.Chunk, 0, retentions[0].NumChunks), // note: during parsing of retentions, we assure there's at least 1. + ttl: uint32(retentions[0].MaxRetention()), // we set LastWrite here to make sure a new Chunk doesn't get immediately // garbage collected right after creating it, before we can push to it. lastWrite: uint32(time.Now().Unix()), } - for _, as := range aggsetting { - m.aggregators = append(m.aggregators, NewAggregator(store, cachePusher, key, as.Span, as.ChunkSpan, as.NumChunks, as.TTL)) + for _, ret := range retentions[1:] { + m.aggregators = append(m.aggregators, NewAggregator(store, cachePusher, key, *ret)) } return &m diff --git a/mdata/aggmetric_test.go b/mdata/aggmetric_test.go index 5d8de75471..06d43ff34e 100644 --- a/mdata/aggmetric_test.go +++ b/mdata/aggmetric_test.go @@ -231,7 +231,8 @@ func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) { maxT := 3600 * 24 * uint32(b.N) // b.N in days for t := uint32(1); t < maxT; t += 10 { for metricI := 0; metricI < 1000; metricI++ { - m := metrics.GetOrCreate(keys[metricI]) + k := keys[metricI] + m := metrics.GetOrCreate(k, k) m.Add(t, float64(t)) } } diff --git a/mdata/aggmetrics.go b/mdata/aggmetrics.go index e411736376..937e6ebfdc 100644 --- a/mdata/aggmetrics.go +++ b/mdata/aggmetrics.go @@ -4,6 +4,7 @@ import ( "sync" "time" + "github.com/lomik/go-carbon/persister" "github.com/raintank/metrictank/mdata/cache" "github.com/raintank/worldping-api/pkg/log" ) @@ -13,22 +14,16 @@ type AggMetrics struct { cachePusher cache.CachePusher sync.RWMutex Metrics map[string]*AggMetric - chunkSpan uint32 - numChunks uint32 - aggSettings AggSettings // for now we apply the same settings to all AggMetrics. later we may want to have different settings. 
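	// Editor's note, not part of this patch: the process-wide chunkSpan/numChunks/
	// aggSettings fields are gone; each AggMetric now derives its chunk spans,
	// numchunks and TTLs from the schema matched against its name in GetOrCreate below.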
chunkMaxStale uint32 metricMaxStale uint32 gcInterval time.Duration } -func NewAggMetrics(store Store, cachePusher cache.CachePusher, chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, ttl uint32, gcInterval time.Duration, aggSettings []AggSetting) *AggMetrics { +func NewAggMetrics(store Store, cachePusher cache.CachePusher, chunkMaxStale, metricMaxStale uint32, gcInterval time.Duration) *AggMetrics { ms := AggMetrics{ store: store, cachePusher: cachePusher, Metrics: make(map[string]*AggMetric), - chunkSpan: chunkSpan, - numChunks: numChunks, - aggSettings: AggSettings{ttl, aggSettings}, chunkMaxStale: chunkMaxStale, metricMaxStale: metricMaxStale, gcInterval: gcInterval, @@ -49,8 +44,8 @@ func (ms *AggMetrics) GC() { time.Sleep(diff + time.Minute) log.Info("checking for stale chunks that need persisting.") now := uint32(time.Now().Unix()) - chunkMinTs := now - (now % ms.chunkSpan) - uint32(ms.chunkMaxStale) - metricMinTs := now - (now % ms.chunkSpan) - uint32(ms.metricMaxStale) + chunkMinTs := now - uint32(ms.chunkMaxStale) + metricMinTs := now - uint32(ms.metricMaxStale) // as this is the only goroutine that can delete from ms.Metrics // we only need to lock long enough to get the list of actives metrics. @@ -85,11 +80,12 @@ func (ms *AggMetrics) Get(key string) (Metric, bool) { return m, ok } -func (ms *AggMetrics) GetOrCreate(key string) Metric { +func (ms *AggMetrics) GetOrCreate(key, name string) Metric { ms.Lock() m, ok := ms.Metrics[key] if !ok { - m = NewAggMetric(ms.store, ms.cachePusher, key, ms.chunkSpan, ms.numChunks, ms.aggSettings.RawTTL, ms.aggSettings.Aggs...) + s := Match(name) + m = NewAggMetric(ms.store, ms.cachePusher, key, s.Retentions) ms.Metrics[key] = m metricsActive.Set(len(ms.Metrics)) } @@ -97,6 +93,6 @@ func (ms *AggMetrics) GetOrCreate(key string) Metric { return m } -func (ms *AggMetrics) AggSettings() AggSettings { - return ms.aggSettings +func (ms *AggMetrics) Schemas() persister.WhisperSchemas { + return schemas } diff --git a/mdata/aggregator.go b/mdata/aggregator.go index c686fe38e4..f398c0ba72 100644 --- a/mdata/aggregator.go +++ b/mdata/aggregator.go @@ -2,6 +2,8 @@ package mdata import ( "fmt" + + whisper "github.com/lomik/go-whisper" "github.com/raintank/metrictank/mdata/cache" ) @@ -28,15 +30,16 @@ type Aggregator struct { cntMetric *AggMetric } -func NewAggregator(store Store, cachePusher cache.CachePusher, key string, aggSpan, aggChunkSpan, aggNumChunks uint32, ttl uint32) *Aggregator { +func NewAggregator(store Store, cachePusher cache.CachePusher, key string, ret whisper.Retention) *Aggregator { + span := uint32(ret.SecondsPerPoint()) return &Aggregator{ key: key, - span: aggSpan, + span: span, agg: NewAggregation(), - minMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_min_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl), - maxMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_max_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl), - sumMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_sum_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl), - cntMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_cnt_%d", key, aggSpan), aggChunkSpan, aggNumChunks, ttl), + minMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_min_%d", key, span), whisper.Retentions{&ret}), + maxMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_max_%d", key, span), whisper.Retentions{&ret}), + sumMetric: NewAggMetric(store, cachePusher, fmt.Sprintf("%s_sum_%d", key, span), whisper.Retentions{&ret}), + cntMetric: 
NewAggMetric(store, cachePusher, fmt.Sprintf("%s_cnt_%d", key, span), whisper.Retentions{&ret}), } } func (agg *Aggregator) flush() { diff --git a/mdata/aggsettings.go b/mdata/aggsettings.go deleted file mode 100644 index d8f5c3ca93..0000000000 --- a/mdata/aggsettings.go +++ /dev/null @@ -1,82 +0,0 @@ -package mdata - -import ( - "errors" - "fmt" - "sort" - "strconv" - "strings" - - "github.com/raintank/dur" - "github.com/raintank/metrictank/mdata/chunk" -) - -type AggSetting struct { - Span uint32 // in seconds, controls how many input points go into an aggregated point. - ChunkSpan uint32 // duration of chunk of aggregated metric for storage, controls how many aggregated points go into 1 chunk - NumChunks uint32 // number of chunks to keep in memory. remember, for a query from now until 3 months ago, we will end up querying the memory server as well. - TTL uint32 // how many seconds to keep the chunk in cassandra - Ready bool // ready for reads? -} - -type AggSettings struct { - RawTTL uint32 // TTL for raw data - Aggs []AggSetting // aggregations -} - -func NewAggSetting(span, chunkSpan, numChunks, ttl uint32, ready bool) AggSetting { - return AggSetting{ - Span: span, - ChunkSpan: chunkSpan, - NumChunks: numChunks, - TTL: ttl, - Ready: ready, - } -} - -type AggSettingsSpanAsc []AggSetting - -func (a AggSettingsSpanAsc) Len() int { return len(a) } -func (a AggSettingsSpanAsc) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a AggSettingsSpanAsc) Less(i, j int) bool { return a[i].Span < a[j].Span } - -func ParseAggSettings(input string) ([]AggSetting, error) { - set := strings.Split(input, ",") - settings := make([]AggSetting, 0) - for _, v := range set { - if v == "" { - continue - } - fields := strings.Split(v, ":") - if len(fields) < 4 { - return nil, errors.New("bad agg settings format") - } - aggSpan := dur.MustParseUNsec("aggsettings", fields[0]) - aggChunkSpan := dur.MustParseUNsec("aggsettings", fields[1]) - aggNumChunks := dur.MustParseUNsec("aggsettings", fields[2]) - aggTTL := dur.MustParseUNsec("aggsettings", fields[3]) - if (Month_sec % aggChunkSpan) != 0 { - return nil, errors.New("aggChunkSpan must fit without remainders into month_sec (28*24*60*60)") - } - _, ok := chunk.RevChunkSpans[aggChunkSpan] - if !ok { - return nil, fmt.Errorf("aggChunkSpan %s is not a valid value (https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans)", fields[1]) - } - ready := true - var err error - if len(fields) == 5 { - ready, err = strconv.ParseBool(fields[4]) - if err != nil { - return nil, fmt.Errorf("aggsettings ready: %s", err) - } - } - settings = append(settings, NewAggSetting(aggSpan, aggChunkSpan, aggNumChunks, aggTTL, ready)) - } - - spanAsc := AggSettingsSpanAsc(settings) - if !sort.IsSorted(spanAsc) { - return nil, errors.New("aggregation settings need to be ordered by span, in ascending order") - } - - return settings, nil -} diff --git a/mdata/ifaces.go b/mdata/ifaces.go index 6abbb2aa28..d55b1a5818 100644 --- a/mdata/ifaces.go +++ b/mdata/ifaces.go @@ -1,12 +1,15 @@ package mdata -import "github.com/raintank/metrictank/consolidation" -import "github.com/raintank/metrictank/mdata/chunk" +import ( + "github.com/lomik/go-carbon/persister" + "github.com/raintank/metrictank/consolidation" + "github.com/raintank/metrictank/mdata/chunk" +) type Metrics interface { Get(key string) (Metric, bool) - GetOrCreate(key string) Metric - AggSettings() AggSettings + GetOrCreate(key, name string) Metric + Schemas() persister.WhisperSchemas } type Metric 
interface { diff --git a/mdata/init.go b/mdata/init.go index b8d67b1fb7..c37184fd7e 100644 --- a/mdata/init.go +++ b/mdata/init.go @@ -3,7 +3,12 @@ // save states over the network package mdata -import "github.com/raintank/metrictank/stats" +import ( + "log" + + "github.com/lomik/go-carbon/persister" + "github.com/raintank/metrictank/stats" +) var ( LogLevel int @@ -37,4 +42,36 @@ var ( // metric tank.gc_metric is the number of times the metrics GC is about to inspect a metric (series) gcMetric = stats.NewCounter32("tank.gc_metric") + + schemas persister.WhisperSchemas + aggregations *persister.WhisperAggregation + + schemasFile = "/etc/raintank/storage-schemas.conf" + aggFile = "/etc/raintank/storage-aggregation.conf" ) + +func ConfigProcess() { + var err error + schemas, err = persister.ReadWhisperSchemas(schemasFile) + if err != nil { + log.Fatal(4, "can't read schemas file %q: %s", schemasFile, err.Error()) + } + var defaultFound bool + for _, schema := range schemas { + if schema.Pattern.String() == ".*" { + defaultFound = true + } + if len(schema.Retentions) == 0 { + log.Fatal(4, "retention setting cannot be empty") + } + } + if !defaultFound { + // good graphite health (not sure what graphite does if there's no .*) + // but we definitely need to always be able to determine which interval to use + log.Fatal(4, "storage-conf does not have a default '.*' pattern") + } + aggregations, err = persister.ReadWhisperAggregation(aggFile) + if err != nil { + log.Fatal(4, "can't read storage-aggregation file %q: %s", aggFile, err.Error()) + } +} diff --git a/mdata/notifier.go b/mdata/notifier.go index 610312d857..08e23e33b5 100644 --- a/mdata/notifier.go +++ b/mdata/notifier.go @@ -28,6 +28,7 @@ const PersistMessageBatchV1 = 1 type PersistMessage struct { Instance string `json:"instance"` Key string `json:"key"` + Name string `json:"name"` T0 uint32 `json:"t0"` } @@ -39,6 +40,8 @@ type PersistMessageBatch struct { type SavedChunk struct { Key string `json:"key"` T0 uint32 `json:"t0"` + + Name string `json:"name"` // filled in when handler does index lookup } func SendPersistMessage(key string, t0 uint32) { @@ -82,7 +85,7 @@ func (cl Notifier) Handle(data []byte) { } } if cl.CreateMissingMetrics { - agg := cl.Metrics.GetOrCreate(key[0]) + agg := cl.Metrics.GetOrCreate(key[0], c.Name) if len(key) == 3 { agg.(*AggMetric).SyncAggregatedChunkSaveState(c.T0, consolidator, uint32(aggSpan)) } else { @@ -107,7 +110,7 @@ func (cl Notifier) Handle(data []byte) { messagesReceived.Add(1) // get metric if cl.CreateMissingMetrics { - agg := cl.Metrics.GetOrCreate(ms.Key) + agg := cl.Metrics.GetOrCreate(ms.Key, ms.Name) agg.(*AggMetric).SyncChunkSaveState(ms.T0) } else if agg, ok := cl.Metrics.Get(ms.Key); ok { agg.(*AggMetric).SyncChunkSaveState(ms.T0) diff --git a/mdata/notifierKafka/notifierKafka.go b/mdata/notifierKafka/notifierKafka.go index a5b4dbec27..bfbf409279 100644 --- a/mdata/notifierKafka/notifierKafka.go +++ b/mdata/notifierKafka/notifierKafka.go @@ -238,6 +238,7 @@ func (c *NotifierKafka) flush() { buf := bytes.NewBuffer(c.bPool.Get()) binary.Write(buf, binary.LittleEndian, uint8(mdata.PersistMessageBatchV1)) encoder := json.NewEncoder(buf) + c.buf[i].Name = def.Name pMsg = mdata.PersistMessageBatch{Instance: c.instance, SavedChunks: c.buf[i : i+1]} err := encoder.Encode(&pMsg) if err != nil { diff --git a/mdata/notifierNsq/notifierNsq.go b/mdata/notifierNsq/notifierNsq.go index edf98dc521..5f02106e06 100644 --- a/mdata/notifierNsq/notifierNsq.go +++ b/mdata/notifierNsq/notifierNsq.go @@ -4,10 
+4,12 @@ import ( "bytes" "encoding/binary" "encoding/json" + "strings" "time" "github.com/bitly/go-hostpool" "github.com/nsqio/go-nsq" + "github.com/raintank/metrictank/idx" "github.com/raintank/metrictank/mdata" "github.com/raintank/metrictank/mdata/notifierNsq/instrumented_nsq" "github.com/raintank/metrictank/stats" @@ -24,9 +26,10 @@ type NotifierNSQ struct { buf []mdata.SavedChunk instance string mdata.Notifier + idx idx.MetricIndex } -func New(instance string, metrics mdata.Metrics) *NotifierNSQ { +func New(instance string, metrics mdata.Metrics, idx idx.MetricIndex) *NotifierNSQ { // metric cluster.notifier.nsq.messages-published is a counter of messages published to the nsq cluster notifier messagesPublished = stats.NewCounter32("cluster.notifier.nsq.messages-published") // metric cluster.notifier.nsq.message_size is the sizes seen of messages through the nsq cluster notifier @@ -55,6 +58,7 @@ func New(instance string, metrics mdata.Metrics) *NotifierNSQ { Instance: instance, Metrics: metrics, }, + idx: idx, } consumer.AddConcurrentHandlers(c, 2) @@ -78,6 +82,12 @@ func (c *NotifierNSQ) HandleMessage(m *nsq.Message) error { } func (c *NotifierNSQ) Send(sc mdata.SavedChunk) { + def, ok := c.idx.Get(strings.SplitN(sc.Key, "_", 2)[0]) + if !ok { + log.Error(3, "nsq-cluster: failed to lookup metricDef with id %s", sc.Key) + return + } + sc.Name = def.Name c.in <- sc } diff --git a/mdata/schema.go b/mdata/schema.go new file mode 100644 index 0000000000..0e0d98f995 --- /dev/null +++ b/mdata/schema.go @@ -0,0 +1,39 @@ +package mdata + +import ( + "github.com/lomik/go-carbon/persister" + "github.com/raintank/metrictank/util" +) + +// Match returns the schema for the given metric key. +// it will always find the schema because we made sure there is a catchall '.*' pattern +func Match(key string) persister.Schema { + schema, _ := schemas.Match(key) + return schema +} + +// TTLs returns a slice of all TTL's seen amongst all archives of all schemas +func TTLs() []uint32 { + ttls := make(map[uint32]struct{}) + for _, s := range schemas { + for _, r := range s.Retentions { + ttls[uint32(r.MaxRetention())] = struct{}{} + } + } + var ttlSlice []uint32 + for ttl := range ttls { + ttlSlice = append(ttlSlice, ttl) + } + return ttlSlice +} + +// MaxChunkSpan returns the largest chunkspan seen amongst all archives of all schemas +func MaxChunkSpan() uint32 { + max := uint32(0) + for _, s := range schemas { + for _, r := range s.Retentions { + max = util.Max(max, r.ChunkSpan) + } + } + return max +} diff --git a/mdata/store_cassandra.go b/mdata/store_cassandra.go index b1b9b4f1f8..8c59a0f8f7 100644 --- a/mdata/store_cassandra.go +++ b/mdata/store_cassandra.go @@ -137,7 +137,8 @@ func getTTLTables(ttls []uint32, windowFactor int, nameFormat string) ttlTables return tables } -func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, hostSelectionPolicy string, timeout, readers, writers, readqsize, writeqsize, retries, protoVer, windowFactor int, ssl, auth, hostVerification bool, ttls []uint32) (*cassandraStore, error) { +func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, hostSelectionPolicy string, timeout, readers, writers, readqsize, writeqsize, retries, protoVer, windowFactor int, ssl, auth, hostVerification bool) (*cassandraStore, error) { + ttls := TTLs() stats.NewGauge32("store.cassandra.write_queue.size").Set(writeqsize) stats.NewGauge32("store.cassandra.num_writers").Set(writers) diff --git a/metrictank-sample.ini b/metrictank-sample.ini index 
2bfc5ddecb..19baebca4a 100644 --- a/metrictank-sample.ini +++ b/metrictank-sample.ini @@ -13,16 +13,6 @@ accounting-period = 5min # see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for more details -# duration of raw chunks. e.g. 10min, 30min, 1h, 90min... -# must be valid value as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans -chunkspan = 10min -# number of raw chunks to keep in in-memory ring buffer -# See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache -# (settings further down) which may be a more effective method to cache data and alleviate workload for cassandra. -numchunks = 7 -# minimum wait before raw metrics are removed from storage -ttl = 35d - # max age for a chunk before to be considered stale and to be persisted to Cassandra chunk-max-stale = 1h # max age for a metric before to be considered stale and to be purged from in-memory ring buffer. @@ -35,18 +25,6 @@ gc-interval = 1h # in clusters, best to assure the primary has saved all the data that a newly warmup instance will need to query, to prevent gaps in charts warm-up-period = 1h -# settings for rollups (aggregation for archives) -# comma-separated list of archive specifications. -# archive specification is of the form: aggSpan:chunkSpan:numChunks:TTL[:ready as bool. default true] -# with these aggregation rules: 5min:1h:2:3mon,1h:6h:2:1y:false you get: -# - 5 min of data, store in a chunk that lasts 1hour, keep 2 chunks in in-memory ring buffer, keep for 3months in cassandra -# - 1hr worth of data, in chunks of 6 hours, 2 chunks in in-memory ring buffer, keep for 1 year, but this series is not ready yet for querying. -# When running a cluster of metrictank instances, all instances should have the same agg-settings. -# Note: -# * chunk spans must be valid values as described here https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans -# * numchunks -like the global setting- has nuanced use compared to chunk cache. see https://github.com/raintank/metrictank/blob/master/docs/memory-server.md -agg-settings = - ## metric data storage in cassandra ## # see https://github.com/raintank/metrictank/blob/master/docs/cassandra.md for more details @@ -162,9 +140,6 @@ enabled = false addr = :2003 # represents the "partition" of your data if you decide to partition your data. partition = 0 -# needed to know your raw resolution for your metrics. see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf -# NOTE: does NOT use aggregation and retention settings from this file. We use agg-settings and ttl for that. 
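# Editor's note (illustrative only, not part of this patch): aggregation behaviour is
# now configured via storage-aggregation.conf, which mdata.ConfigProcess() reads at
# startup. A minimal graphite-style example:
#
#   [default]
#   pattern = .*
#   xFilesFactor = 0.5
#   aggregationMethod = average
#
# How metrictank maps aggregationMethod onto its min/max/sum/cnt rollups is not part
# of this patch's visible changes, so it is left as an assumption here.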
-schemas-file = /path/to/your/schemas-file ### kafka-mdm input (optional, recommended) [kafka-mdm-in] diff --git a/metrictank.go b/metrictank.go index 492c9574d6..daf12a4c9c 100644 --- a/metrictank.go +++ b/metrictank.go @@ -29,13 +29,11 @@ import ( inKafkaMdm "github.com/raintank/metrictank/input/kafkamdm" "github.com/raintank/metrictank/mdata" "github.com/raintank/metrictank/mdata/cache" - "github.com/raintank/metrictank/mdata/chunk" "github.com/raintank/metrictank/mdata/notifierKafka" "github.com/raintank/metrictank/mdata/notifierNsq" "github.com/raintank/metrictank/stats" statsConfig "github.com/raintank/metrictank/stats/config" "github.com/raintank/metrictank/usage" - "github.com/raintank/metrictank/util" "github.com/raintank/worldping-api/pkg/log" "github.com/rakyll/globalconf" ) @@ -57,17 +55,11 @@ var ( accountingPeriodStr = flag.String("accounting-period", "5min", "accounting period to track per-org usage metrics") // Data: - chunkSpanStr = flag.String("chunkspan", "10min", "duration of raw chunks") - numChunksInt = flag.Int("numchunks", 7, "number of raw chunks to keep in in-memory ring buffer. See https://github.com/raintank/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache") - ttlStr = flag.String("ttl", "35d", "minimum wait before metrics are removed from storage") - chunkMaxStaleStr = flag.String("chunk-max-stale", "1h", "max age for a chunk before to be considered stale and to be persisted to Cassandra.") metricMaxStaleStr = flag.String("metric-max-stale", "6h", "max age for a metric before to be considered stale and to be purged from memory.") gcIntervalStr = flag.String("gc-interval", "1h", "Interval to run garbage collection job.") warmUpPeriodStr = flag.String("warm-up-period", "1h", "duration before secondary nodes start serving requests") - aggSettingsStr = flag.String("agg-settings", "", "aggregation settings: :::[:] (may be given multiple times as comma-separated list)") - // Cassandra: cassandraAddrs = flag.String("cassandra-addrs", "localhost", "cassandra host (may be given multiple times as comma-separated list)") cassandraKeyspace = flag.String("cassandra-keyspace", "metrictank", "cassandra keyspace to use for storing the metric data table") @@ -211,6 +203,7 @@ func main() { notifierNsq.ConfigProcess() notifierKafka.ConfigProcess(*instance) statsConfig.ConfigProcess(*instance) + mdata.ConfigProcess() if !inCarbon.Enabled && !inKafkaMdm.Enabled { log.Fatal(4, "you should enable at least 1 input plugin") @@ -219,19 +212,9 @@ func main() { sec := dur.MustParseUNsec("warm-up-period", *warmUpPeriodStr) warmupPeriod = time.Duration(sec) * time.Second - chunkSpan := dur.MustParseUNsec("chunkspan", *chunkSpanStr) - numChunks := uint32(*numChunksInt) chunkMaxStale := dur.MustParseUNsec("chunk-max-stale", *chunkMaxStaleStr) metricMaxStale := dur.MustParseUNsec("metric-max-stale", *metricMaxStaleStr) gcInterval := time.Duration(dur.MustParseUNsec("gc-interval", *gcIntervalStr)) * time.Second - ttl := dur.MustParseUNsec("ttl", *ttlStr) - if (mdata.Month_sec % chunkSpan) != 0 { - log.Fatal(4, "chunkSpan must fit without remainders into month_sec (28*24*60*60)") - } - _, ok := chunk.RevChunkSpans[chunkSpan] - if !ok { - log.Fatal(4, "chunkSpan %s is not a valid value (https://github.com/raintank/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans)", *chunkSpanStr) - } aggSettings, err := mdata.ParseAggSettings(*aggSettingsStr) if err != nil { @@ -273,11 +256,7 @@ func main() { 
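	// Editor's note, not part of this patch: a hedged sketch of the startup ordering
	// implied by these changes. mdata.ConfigProcess() (called earlier in main) parses
	// /etc/raintank/storage-schemas.conf and storage-aggregation.conf; after that the
	// stores can derive everything they need from the parsed schemas:
	//
	//	mdata.ConfigProcess()        // parse schema + aggregation files (must run first)
	//	ttls := mdata.TTLs()         // distinct TTLs -> cassandra tables (done inside NewCassandraStore)
	//	span := mdata.MaxChunkSpan() // largest chunkspan, used below for promotion_wait
	//	metrics := mdata.NewAggMetrics(store, ccache, chunkMaxStale, metricMaxStale, gcInterval)
	//
	// Since NewCassandraStore now calls TTLs() itself, ConfigProcess must run before
	// the store is created.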
/*********************************** Initialize our backendStore ***********************************/ - ttls := []uint32{ttl} - for _, agg := range aggSettings { - ttls = append(ttls, agg.TTL) - } - store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraWriteConcurrency, *cassandraReadQueueSize, *cassandraWriteQueueSize, *cassandraRetries, *cqlProtocolVersion, *cassandraWindowFactor, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, ttls) + store, err := mdata.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraWriteConcurrency, *cassandraReadQueueSize, *cassandraWriteQueueSize, *cassandraRetries, *cqlProtocolVersion, *cassandraWindowFactor, *cassandraSSL, *cassandraAuth, *cassandraHostVerification) if err != nil { log.Fatal(4, "failed to initialize cassandra. %s", err) } @@ -290,7 +269,7 @@ func main() { /*********************************** Initialize our MemoryStore ***********************************/ - metrics = mdata.NewAggMetrics(store, ccache, chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, gcInterval, aggSettings) + metrics = mdata.NewAggMetrics(store, ccache, chunkMaxStale, metricMaxStale, gcInterval) /*********************************** Initialize our Inputs @@ -371,7 +350,7 @@ func main() { } if notifierNsq.Enabled { - handlers = append(handlers, notifierNsq.New(*instance, metrics)) + handlers = append(handlers, notifierNsq.New(*instance, metrics, metricIndex)) } mdata.InitPersistNotifier(handlers...) @@ -392,11 +371,8 @@ func main() { // When the timer becomes 0 it means the in-memory buffer has been able to fully populate so that if you stop a primary // and it was able to save its complete chunks, this node will be able to take over without dataloss. // You can upgrade a candidate to primary while the timer is not 0 yet, it just means it may have missing data in the chunks that it will save. - highestChunkSpan := chunkSpan - for _, agg := range aggSettings { - highestChunkSpan = util.Max(chunkSpan, agg.ChunkSpan) - } - stats.NewTimeDiffReporter32("cluster.self.promotion_wait", (uint32(time.Now().Unix())/highestChunkSpan+1)*highestChunkSpan) + maxChunkSpan := mdata.MaxChunkSpan() + stats.NewTimeDiffReporter32("cluster.self.promotion_wait", (uint32(time.Now().Unix())/maxChunkSpan+1)*maxChunkSpan) /*********************************** Set our status so we can accept diff --git a/scripts/config/metrictank-docker.ini b/scripts/config/metrictank-docker.ini index be4d5147c4..2d78ac79a0 100644 --- a/scripts/config/metrictank-docker.ini +++ b/scripts/config/metrictank-docker.ini @@ -159,9 +159,6 @@ enabled = true addr = :2003 # represents the "partition" of your data if you decide to partition your data. partition = 0 -# needed to know your raw resolution for your metrics. see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf -# NOTE: does NOT use aggregation and retention settings from this file. We use agg-settings and ttl for that. 
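# Editor's note, not part of this patch: mdata/init.go currently hard-codes
# /etc/raintank/storage-schemas.conf and /etc/raintank/storage-aggregation.conf,
# which is why the docker-compose file earlier in this patch mounts both files
# into /etc/raintank/.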
-schemas-file = /etc/raintank/storage-schemas.conf ### kafka-mdm input (optional, recommended) [kafka-mdm-in] diff --git a/scripts/config/metrictank-package.ini b/scripts/config/metrictank-package.ini index 57eed8d866..1b8278ee82 100644 --- a/scripts/config/metrictank-package.ini +++ b/scripts/config/metrictank-package.ini @@ -159,9 +159,6 @@ enabled = true addr = :2003 # represents the "partition" of your data if you decide to partition your data. partition = 0 -# needed to know your raw resolution for your metrics. see http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf -# NOTE: does NOT use aggregation and retention settings from this file. We use agg-settings and ttl for that. -schemas-file = /etc/raintank/storage-schemas.conf ### kafka-mdm input (optional, recommended) [kafka-mdm-in] diff --git a/usage/usage.go b/usage/usage.go index 4d3ac4f90a..bd06a3fdde 100644 --- a/usage/usage.go +++ b/usage/usage.go @@ -101,7 +101,7 @@ func (u *Usage) Report() { met.Value = val met.SetId() - m := metrics.GetOrCreate(met.Id) + m := metrics.GetOrCreate(met.Id, met.Metric) m.Add(uint32(met.Time), met.Value) //TODO: how to set the partition of the metric? We probably just need to publish the metric to our Input Plugin metricIndex.AddOrUpdate(met, 0) diff --git a/usage/usage_test.go b/usage/usage_test.go index 722632b83b..b16a09a8d4 100644 --- a/usage/usage_test.go +++ b/usage/usage_test.go @@ -30,7 +30,7 @@ func (f *FakeAggMetrics) Get(key string) (mdata.Metric, bool) { f.Unlock() return m, ok } -func (f *FakeAggMetrics) GetOrCreate(key string) mdata.Metric { +func (f *FakeAggMetrics) GetOrCreate(key, name string) mdata.Metric { f.Lock() m, ok := f.Metrics[key] if !ok {