From 6bf366c47dc782d062b004244f99730778173694 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Sun, 27 Nov 2016 08:33:52 -0500 Subject: [PATCH] do metrics the new way --- api/api.go | 2 +- api/dataprocessor_test.go | 2 +- cassandra/metrics.go | 2 +- idx/cassandra/cassandra.go | 2 +- idx/elasticsearch/elasticsearch.go | 2 +- idx/idx.go | 2 +- idx/memory/memory.go | 2 +- input/carbon/carbon.go | 2 +- input/input.go | 2 +- input/kafkamdm/kafkamdm.go | 2 +- mdata/chunk/chunk.go | 12 ++-- mdata/init.go | 2 +- mdata/notifier.go | 2 +- mdata/notifierKafka/cfg.go | 2 +- mdata/notifierKafka/notifierKafka.go | 2 +- mdata/notifierNsq/cfg.go | 2 +- mdata/notifierNsq/notifierNsq.go | 2 +- mdata/store_cassandra.go | 7 +- metrictank.go | 96 ++-------------------------- 19 files changed, 34 insertions(+), 113 deletions(-) diff --git a/api/api.go b/api/api.go index ec31aa9885..87b4513b20 100644 --- a/api/api.go +++ b/api/api.go @@ -9,7 +9,7 @@ import ( _ "net/http/pprof" - "github.com/raintank/met" + "github.com/raintank/metrictank/stats" "github.com/raintank/metrictank/idx" "github.com/raintank/metrictank/mdata" "github.com/raintank/worldping-api/pkg/log" diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go index dec20ec801..5d7974b401 100644 --- a/api/dataprocessor_test.go +++ b/api/dataprocessor_test.go @@ -2,11 +2,11 @@ package api import ( "fmt" - "github.com/raintank/met/helper" "github.com/raintank/metrictank/api/models" "github.com/raintank/metrictank/cluster" "github.com/raintank/metrictank/consolidation" "github.com/raintank/metrictank/mdata" + "github.com/raintank/stats" "gopkg.in/raintank/schema.v1" "math" "math/rand" diff --git a/cassandra/metrics.go b/cassandra/metrics.go index 618807fbf5..3163d3df71 100644 --- a/cassandra/metrics.go +++ b/cassandra/metrics.go @@ -5,7 +5,7 @@ import ( "strings" "github.com/gocql/gocql" - "github.com/raintank/met" + "github.com/raintank/metrictank/stats" ) type Metrics struct { diff --git a/idx/cassandra/cassandra.go b/idx/cassandra/cassandra.go index d122f3dc24..8866227e7d 100644 --- a/idx/cassandra/cassandra.go +++ b/idx/cassandra/cassandra.go @@ -9,7 +9,7 @@ import ( "time" "github.com/gocql/gocql" - "github.com/raintank/met" + "github.com/raintank/metrictank/stats" "github.com/raintank/metrictank/cassandra" "github.com/raintank/metrictank/cluster" "github.com/raintank/metrictank/idx" diff --git a/idx/elasticsearch/elasticsearch.go b/idx/elasticsearch/elasticsearch.go index 8ccdf7de0b..2624958663 100644 --- a/idx/elasticsearch/elasticsearch.go +++ b/idx/elasticsearch/elasticsearch.go @@ -11,10 +11,10 @@ import ( "time" "github.com/mattbaird/elastigo/lib" - "github.com/raintank/met" "github.com/raintank/metrictank/cluster" "github.com/raintank/metrictank/idx" "github.com/raintank/metrictank/idx/memory" + "github.com/raintank/metrictank/stats" "github.com/raintank/worldping-api/pkg/log" "github.com/rakyll/globalconf" "gopkg.in/raintank/schema.v1" diff --git a/idx/idx.go b/idx/idx.go index ef40642e89..ea55ec3386 100644 --- a/idx/idx.go +++ b/idx/idx.go @@ -6,7 +6,7 @@ import ( "errors" "time" - "github.com/raintank/met" + "github.com/raintank/metrictank/stats" "gopkg.in/raintank/schema.v1" ) diff --git a/idx/memory/memory.go b/idx/memory/memory.go index 5272e47cf0..85624c10f5 100644 --- a/idx/memory/memory.go +++ b/idx/memory/memory.go @@ -8,7 +8,7 @@ import ( "sync" "time" - "github.com/raintank/met" + "github.com/raintank/metrictank/stats" "github.com/raintank/metrictank/idx" "github.com/raintank/worldping-api/pkg/log" "github.com/rakyll/globalconf" diff --git a/input/carbon/carbon.go b/input/carbon/carbon.go index 0b807c4010..b2137f51f9 100644 --- a/input/carbon/carbon.go +++ b/input/carbon/carbon.go @@ -11,11 +11,11 @@ import ( "github.com/lomik/go-carbon/persister" "github.com/metrics20/go-metrics20/carbon20" - "github.com/raintank/met" "github.com/raintank/metrictank/cluster" "github.com/raintank/metrictank/idx" "github.com/raintank/metrictank/input" "github.com/raintank/metrictank/mdata" + "github.com/raintank/metrictank/stats" "github.com/raintank/metrictank/usage" "github.com/raintank/worldping-api/pkg/log" "github.com/rakyll/globalconf" diff --git a/input/input.go b/input/input.go index 1ed0ef357d..cb0022a79f 100644 --- a/input/input.go +++ b/input/input.go @@ -5,7 +5,7 @@ package input import ( "fmt" - "github.com/raintank/met" + "github.com/raintank/metrictank/stats" "github.com/raintank/metrictank/idx" "github.com/raintank/metrictank/mdata" "github.com/raintank/metrictank/usage" diff --git a/input/kafkamdm/kafkamdm.go b/input/kafkamdm/kafkamdm.go index 8a406e3c90..beca963116 100644 --- a/input/kafkamdm/kafkamdm.go +++ b/input/kafkamdm/kafkamdm.go @@ -11,12 +11,12 @@ import ( "github.com/raintank/worldping-api/pkg/log" "github.com/rakyll/globalconf" - "github.com/raintank/met" "github.com/raintank/metrictank/cluster" "github.com/raintank/metrictank/idx" "github.com/raintank/metrictank/input" "github.com/raintank/metrictank/kafka" "github.com/raintank/metrictank/mdata" + "github.com/raintank/metrictank/stats" "github.com/raintank/metrictank/usage" "gopkg.in/raintank/schema.v1" ) diff --git a/mdata/chunk/chunk.go b/mdata/chunk/chunk.go index 41e9365b4d..c038c43585 100644 --- a/mdata/chunk/chunk.go +++ b/mdata/chunk/chunk.go @@ -6,9 +6,10 @@ import ( "time" "github.com/dgryski/go-tsz" + "github.com/raintank/metrictank/stats" ) -var totalPoints uint64 +var totalPoints = stats.NewGauge64("total_points") // Chunk is a chunk of data. not concurrency safe. type Chunk struct { @@ -38,13 +39,10 @@ func (c *Chunk) Push(t uint32, v float64) error { c.NumPoints += 1 c.LastTs = t c.LastWrite = uint32(time.Now().Unix()) - atomic.AddUint64(&totalPoints, 1) + totalPoints.Inc() return nil } -func (c *Chunk) Clear() { - atomic.AddUint64(&totalPoints, ^uint64(c.NumPoints-1)) -} -func TotalPoints() uint64 { - return atomic.LoadUint64(&totalPoints) +func (c *Chunk) Clear() { + totalPoints.Dec64(c.NumPoints) } diff --git a/mdata/init.go b/mdata/init.go index 20450b0432..d3b744139b 100644 --- a/mdata/init.go +++ b/mdata/init.go @@ -3,7 +3,7 @@ // save states over the network package mdata -import "github.com/raintank/met" +import "github.com/raintank/metrictank/stats" var ( LogLevel int diff --git a/mdata/notifier.go b/mdata/notifier.go index ebd3308e01..e9e1585129 100644 --- a/mdata/notifier.go +++ b/mdata/notifier.go @@ -3,7 +3,7 @@ package mdata import ( "encoding/json" - "github.com/raintank/met" + "github.com/raintank/metrictank/stats" "github.com/raintank/worldping-api/pkg/log" ) diff --git a/mdata/notifierKafka/cfg.go b/mdata/notifierKafka/cfg.go index b000572a64..ed29aeed0e 100644 --- a/mdata/notifierKafka/cfg.go +++ b/mdata/notifierKafka/cfg.go @@ -7,7 +7,7 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/raintank/met" + "github.com/raintank/metrictank/stats" "github.com/rakyll/globalconf" ) diff --git a/mdata/notifierKafka/notifierKafka.go b/mdata/notifierKafka/notifierKafka.go index 18ccf119c8..5c433c501f 100644 --- a/mdata/notifierKafka/notifierKafka.go +++ b/mdata/notifierKafka/notifierKafka.go @@ -8,7 +8,7 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/raintank/met" + "github.com/raintank/metrictank/stats" "github.com/raintank/metrictank/kafka" "github.com/raintank/metrictank/mdata" "github.com/raintank/worldping-api/pkg/log" diff --git a/mdata/notifierNsq/cfg.go b/mdata/notifierNsq/cfg.go index f52ea27e62..dc243b903a 100644 --- a/mdata/notifierNsq/cfg.go +++ b/mdata/notifierNsq/cfg.go @@ -12,7 +12,7 @@ import ( "strings" "github.com/nsqio/go-nsq" - "github.com/raintank/met" + "github.com/raintank/metrictank/stats" "github.com/raintank/misc/app" "github.com/rakyll/globalconf" ) diff --git a/mdata/notifierNsq/notifierNsq.go b/mdata/notifierNsq/notifierNsq.go index 1b48680d55..4295adb92a 100644 --- a/mdata/notifierNsq/notifierNsq.go +++ b/mdata/notifierNsq/notifierNsq.go @@ -8,7 +8,7 @@ import ( "github.com/bitly/go-hostpool" "github.com/nsqio/go-nsq" - "github.com/raintank/met" + "github.com/raintank/metrictank/stats" "github.com/raintank/metrictank/mdata" "github.com/raintank/misc/instrumented_nsq" "github.com/raintank/worldping-api/pkg/log" diff --git a/mdata/store_cassandra.go b/mdata/store_cassandra.go index 67365847f0..a85bb40374 100644 --- a/mdata/store_cassandra.go +++ b/mdata/store_cassandra.go @@ -12,10 +12,10 @@ import ( "github.com/dgryski/go-tsz" "github.com/gocql/gocql" "github.com/hailocab/go-hostpool" - "github.com/raintank/met" "github.com/raintank/metrictank/cassandra" "github.com/raintank/metrictank/iter" "github.com/raintank/metrictank/mdata/chunk" + "github.com/raintank/metrictank/stats" "github.com/raintank/worldping-api/pkg/log" ) @@ -38,6 +38,11 @@ var ( errUnknownChunkFormat = errors.New("unrecognized chunk format in cassandra") errStartBeforeEnd = errors.New("start must be before end.") + cassWriteQueueSize met.Gauge32 + cassWriters met.Gauge32 + cassWriteQueueSize = stats.NewGauge("cassandra.write_queue.size", int64(*cassandraWriteQueueSize)) + cassWriters = stats.NewGauge("cassandra.num_writers", int64(*cassandraWriteConcurrency)) + cassGetExecDuration met.Timer cassGetWaitDuration met.Timer cassPutExecDuration met.Timer diff --git a/metrictank.go b/metrictank.go index 4c730dbc28..0f374b1287 100644 --- a/metrictank.go +++ b/metrictank.go @@ -19,7 +19,6 @@ import ( "github.com/Shopify/sarama" "github.com/benbjohnson/clock" "github.com/raintank/dur" - "github.com/raintank/met" "github.com/raintank/met/helper" "github.com/raintank/metrictank/api" "github.com/raintank/metrictank/cluster" @@ -34,6 +33,7 @@ import ( "github.com/raintank/metrictank/mdata/chunk" "github.com/raintank/metrictank/mdata/notifierKafka" "github.com/raintank/metrictank/mdata/notifierNsq" + "github.com/raintank/metrictank/stats" "github.com/raintank/metrictank/usage" "github.com/raintank/metrictank/util" "github.com/raintank/worldping-api/pkg/log" @@ -100,33 +100,6 @@ var ( proftrigFreqStr = flag.String("proftrigger-freq", "60s", "inspect status frequency. set to 0 to disable") proftrigMinDiffStr = flag.String("proftrigger-min-diff", "1h", "minimum time between triggered profiles") proftrigHeapThresh = flag.Int("proftrigger-heap-thresh", 25000000000, "if this many bytes allocated, trigger a profile") - - cassWriteQueueSize met.Gauge - cassWriters met.Gauge - points met.Gauge - - // metric bytes_alloc.not_freed is a gauge of currently allocated (within the runtime) memory. - // it does not include freed data so it drops at every GC run. - alloc met.Gauge - // metric bytes_alloc.incl_freed is a counter of total amount of bytes allocated during process lifetime. (incl freed data) - totalAlloc met.Gauge - // metric bytes_sys is the amount of bytes currently obtained from the system by the process. This is what the profiletrigger looks at. - sysBytes met.Gauge - clusterPrimary met.Gauge - - // metric cluster.promotion_wait is how long a candidate (secondary node) has to wait until it can become a primary - // When the timer becomes 0 it means the in-memory buffer has been able to fully populate so that if you stop a primary - // and it was able to save its complete chunks, this node will be able to take over without dataloss. - // You can upgrade a candidate to primary while the timer is not 0 yet, it just means it may have missing data in the chunks that it will save. - clusterPromoWait met.Gauge - gcNum met.Gauge // go GC - gcDur met.Gauge // go GC - gcCpuFraction met.Gauge // go GC - - // metric gc.heap_objects is how many objects are allocated on the heap, it's a key indicator for GC workload - heapObjects met.Gauge - - promotionReadyAtChan chan uint32 ) func init() { @@ -237,8 +210,6 @@ func main() { sec := dur.MustParseUNsec("warm-up-period", *warmUpPeriodStr) warmupPeriod = time.Duration(sec) * time.Second - promotionReadyAtChan = make(chan uint32) - chunkSpan := dur.MustParseUNsec("chunkspan", *chunkSpanStr) numChunks := uint32(*numChunksInt) chunkMaxStale := dur.MustParseUNsec("chunk-max-stale", *chunkMaxStaleStr) @@ -308,6 +279,7 @@ func main() { configure stats ***********************************/ if *statsEnabled { + stats.NewMemoryReporter() stats.NewGraphite(*statsAddr) } else { log.Warn("running metrictank without instrumentation.") @@ -412,7 +384,11 @@ func main() { plugin.Start(metrics, metricIndex, usg) } - promotionReadyAtChan <- (uint32(time.Now().Unix())/highestChunkSpan + 1) * highestChunkSpan + // metric cluster.promotion_wait is how long a candidate (secondary node) has to wait until it can become a primary + // When the timer becomes 0 it means the in-memory buffer has been able to fully populate so that if you stop a primary + // and it was able to save its complete chunks, this node will be able to take over without dataloss. + // You can upgrade a candidate to primary while the timer is not 0 yet, it just means it may have missing data in the chunks that it will save. + stats.NewTimeDiffReporter("cluster.promotion_wait", (uint32(time.Now().Unix())/highestChunkSpan+1)*highestChunkSpan) /*********************************** Initialize our API server @@ -479,61 +455,3 @@ func main() { log.Close() } - -func initMetrics(stats met.Backend) { - cassWriteQueueSize = stats.NewGauge("cassandra.write_queue.size", int64(*cassandraWriteQueueSize)) - cassWriters = stats.NewGauge("cassandra.num_writers", int64(*cassandraWriteConcurrency)) - points = stats.NewGauge("total_points", 0) - alloc = stats.NewGauge("bytes_alloc.not_freed", 0) - totalAlloc = stats.NewGauge("bytes_alloc.incl_freed", 0) - sysBytes = stats.NewGauge("bytes_sys", 0) - clusterPrimary = stats.NewGauge("cluster.primary", 0) - clusterPromoWait = stats.NewGauge("cluster.promotion_wait", 1) - gcNum = stats.NewGauge("gc.num", 0) - gcDur = stats.NewGauge("gc.dur", 0) // in nanoseconds. last known duration. - gcCpuFraction = stats.NewGauge("gc.cpufraction", 0) // reported as pro-mille - heapObjects = stats.NewGauge("gc.heap_objects", 0) - - // run a collector for some global stats - go func() { - var m runtime.MemStats - var promotionReadyAtTs uint32 - - ticker := time.Tick(time.Duration(1) * time.Second) - for { - select { - case now := <-ticker: - points.Value(int64(chunk.TotalPoints())) - runtime.ReadMemStats(&m) - alloc.Value(int64(m.Alloc)) - totalAlloc.Value(int64(m.TotalAlloc)) - sysBytes.Value(int64(m.Sys)) - gcNum.Value(int64(m.NumGC)) - gcDur.Value(int64(m.PauseNs[(m.NumGC+255)%256])) - gcCpuFraction.Value(int64(1000 * m.GCCPUFraction)) - heapObjects.Value(int64(m.HeapObjects)) - var px int64 - if cluster.ThisNode.IsPrimary() { - px = 1 - } else { - px = 0 - } - clusterPrimary.Value(px) - cassWriters.Value(int64(*cassandraWriteConcurrency)) - cassWriteQueueSize.Value(int64(*cassandraWriteQueueSize)) - unix := uint32(now.Unix()) - if unix >= promotionReadyAtTs { - if promotionReadyAtTs == 0 { - // not set yet. operator should hold off - clusterPromoWait.Value(1) - } else { - clusterPromoWait.Value(0) - } - } else { - clusterPromoWait.Value(int64(promotionReadyAtTs - unix)) - } - case promotionReadyAtTs = <-promotionReadyAtChan: - } - } - }() -}