Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
do metrics the new way
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Nov 27, 2016
1 parent 23a1488 commit 6bf366c
Show file tree
Hide file tree
Showing 19 changed files with 34 additions and 113 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

_ "net/http/pprof"

"github.com/raintank/met"
"github.com/raintank/metrictank/stats"
"github.com/raintank/metrictank/idx"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/worldping-api/pkg/log"
Expand Down
2 changes: 1 addition & 1 deletion api/dataprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package api

import (
"fmt"
"github.com/raintank/met/helper"
"github.com/raintank/metrictank/api/models"
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/consolidation"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/stats"
"gopkg.in/raintank/schema.v1"
"math"
"math/rand"
Expand Down
2 changes: 1 addition & 1 deletion cassandra/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"

"github.com/gocql/gocql"
"github.com/raintank/met"
"github.com/raintank/metrictank/stats"
)

type Metrics struct {
Expand Down
2 changes: 1 addition & 1 deletion idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"github.com/gocql/gocql"
"github.com/raintank/met"
"github.com/raintank/metrictank/stats"
"github.com/raintank/metrictank/cassandra"
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/idx"
Expand Down
2 changes: 1 addition & 1 deletion idx/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"time"

"github.com/mattbaird/elastigo/lib"
"github.com/raintank/met"
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/idx"
"github.com/raintank/metrictank/idx/memory"
"github.com/raintank/metrictank/stats"
"github.com/raintank/worldping-api/pkg/log"
"github.com/rakyll/globalconf"
"gopkg.in/raintank/schema.v1"
Expand Down
2 changes: 1 addition & 1 deletion idx/idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"errors"
"time"

"github.com/raintank/met"
"github.com/raintank/metrictank/stats"
"gopkg.in/raintank/schema.v1"
)

Expand Down
2 changes: 1 addition & 1 deletion idx/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"sync"
"time"

"github.com/raintank/met"
"github.com/raintank/metrictank/stats"
"github.com/raintank/metrictank/idx"
"github.com/raintank/worldping-api/pkg/log"
"github.com/rakyll/globalconf"
Expand Down
2 changes: 1 addition & 1 deletion input/carbon/carbon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (

"github.com/lomik/go-carbon/persister"
"github.com/metrics20/go-metrics20/carbon20"
"github.com/raintank/met"
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/idx"
"github.com/raintank/metrictank/input"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/stats"
"github.com/raintank/metrictank/usage"
"github.com/raintank/worldping-api/pkg/log"
"github.com/rakyll/globalconf"
Expand Down
2 changes: 1 addition & 1 deletion input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package input
import (
"fmt"

"github.com/raintank/met"
"github.com/raintank/metrictank/stats"
"github.com/raintank/metrictank/idx"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/usage"
Expand Down
2 changes: 1 addition & 1 deletion input/kafkamdm/kafkamdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
"github.com/raintank/worldping-api/pkg/log"
"github.com/rakyll/globalconf"

"github.com/raintank/met"
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/idx"
"github.com/raintank/metrictank/input"
"github.com/raintank/metrictank/kafka"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/stats"
"github.com/raintank/metrictank/usage"
"gopkg.in/raintank/schema.v1"
)
Expand Down
12 changes: 5 additions & 7 deletions mdata/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"time"

"github.com/dgryski/go-tsz"
"github.com/raintank/metrictank/stats"
)

var totalPoints uint64
var totalPoints = stats.NewGauge64("total_points")

// Chunk is a chunk of data. not concurrency safe.
type Chunk struct {
Expand Down Expand Up @@ -38,13 +39,10 @@ func (c *Chunk) Push(t uint32, v float64) error {
c.NumPoints += 1
c.LastTs = t
c.LastWrite = uint32(time.Now().Unix())
atomic.AddUint64(&totalPoints, 1)
totalPoints.Inc()
return nil
}
func (c *Chunk) Clear() {
atomic.AddUint64(&totalPoints, ^uint64(c.NumPoints-1))
}

func TotalPoints() uint64 {
return atomic.LoadUint64(&totalPoints)
func (c *Chunk) Clear() {
totalPoints.Dec64(c.NumPoints)
}
2 changes: 1 addition & 1 deletion mdata/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// save states over the network
package mdata

import "github.com/raintank/met"
import "github.com/raintank/metrictank/stats"

var (
LogLevel int
Expand Down
2 changes: 1 addition & 1 deletion mdata/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package mdata
import (
"encoding/json"

"github.com/raintank/met"
"github.com/raintank/metrictank/stats"
"github.com/raintank/worldping-api/pkg/log"
)

Expand Down
2 changes: 1 addition & 1 deletion mdata/notifierKafka/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/raintank/met"
"github.com/raintank/metrictank/stats"
"github.com/rakyll/globalconf"
)

Expand Down
2 changes: 1 addition & 1 deletion mdata/notifierKafka/notifierKafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/raintank/met"
"github.com/raintank/metrictank/stats"
"github.com/raintank/metrictank/kafka"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/worldping-api/pkg/log"
Expand Down
2 changes: 1 addition & 1 deletion mdata/notifierNsq/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"strings"

"github.com/nsqio/go-nsq"
"github.com/raintank/met"
"github.com/raintank/metrictank/stats"
"github.com/raintank/misc/app"
"github.com/rakyll/globalconf"
)
Expand Down
2 changes: 1 addition & 1 deletion mdata/notifierNsq/notifierNsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/bitly/go-hostpool"
"github.com/nsqio/go-nsq"
"github.com/raintank/met"
"github.com/raintank/metrictank/stats"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/misc/instrumented_nsq"
"github.com/raintank/worldping-api/pkg/log"
Expand Down
7 changes: 6 additions & 1 deletion mdata/store_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
"github.com/dgryski/go-tsz"
"github.com/gocql/gocql"
"github.com/hailocab/go-hostpool"
"github.com/raintank/met"
"github.com/raintank/metrictank/cassandra"
"github.com/raintank/metrictank/iter"
"github.com/raintank/metrictank/mdata/chunk"
"github.com/raintank/metrictank/stats"
"github.com/raintank/worldping-api/pkg/log"
)

Expand All @@ -38,6 +38,11 @@ var (
errUnknownChunkFormat = errors.New("unrecognized chunk format in cassandra")
errStartBeforeEnd = errors.New("start must be before end.")

cassWriteQueueSize met.Gauge32
cassWriters met.Gauge32
cassWriteQueueSize = stats.NewGauge("cassandra.write_queue.size", int64(*cassandraWriteQueueSize))
cassWriters = stats.NewGauge("cassandra.num_writers", int64(*cassandraWriteConcurrency))

cassGetExecDuration met.Timer
cassGetWaitDuration met.Timer
cassPutExecDuration met.Timer
Expand Down
96 changes: 7 additions & 89 deletions metrictank.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/Shopify/sarama"
"github.com/benbjohnson/clock"
"github.com/raintank/dur"
"github.com/raintank/met"
"github.com/raintank/met/helper"
"github.com/raintank/metrictank/api"
"github.com/raintank/metrictank/cluster"
Expand All @@ -34,6 +33,7 @@ import (
"github.com/raintank/metrictank/mdata/chunk"
"github.com/raintank/metrictank/mdata/notifierKafka"
"github.com/raintank/metrictank/mdata/notifierNsq"
"github.com/raintank/metrictank/stats"
"github.com/raintank/metrictank/usage"
"github.com/raintank/metrictank/util"
"github.com/raintank/worldping-api/pkg/log"
Expand Down Expand Up @@ -100,33 +100,6 @@ var (
proftrigFreqStr = flag.String("proftrigger-freq", "60s", "inspect status frequency. set to 0 to disable")
proftrigMinDiffStr = flag.String("proftrigger-min-diff", "1h", "minimum time between triggered profiles")
proftrigHeapThresh = flag.Int("proftrigger-heap-thresh", 25000000000, "if this many bytes allocated, trigger a profile")

cassWriteQueueSize met.Gauge
cassWriters met.Gauge
points met.Gauge

// metric bytes_alloc.not_freed is a gauge of currently allocated (within the runtime) memory.
// it does not include freed data so it drops at every GC run.
alloc met.Gauge
// metric bytes_alloc.incl_freed is a counter of total amount of bytes allocated during process lifetime. (incl freed data)
totalAlloc met.Gauge
// metric bytes_sys is the amount of bytes currently obtained from the system by the process. This is what the profiletrigger looks at.
sysBytes met.Gauge
clusterPrimary met.Gauge

// metric cluster.promotion_wait is how long a candidate (secondary node) has to wait until it can become a primary.
// When the timer reaches 0, the in-memory buffer has been fully populated, so that if you stop a primary
// and it was able to save its complete chunks, this node can take over without data loss.
// You can upgrade a candidate to primary while the timer is not 0 yet; it just means it may have missing data in the chunks that it will save.
clusterPromoWait met.Gauge
gcNum met.Gauge // go GC
gcDur met.Gauge // go GC
gcCpuFraction met.Gauge // go GC

// metric gc.heap_objects is how many objects are allocated on the heap, it's a key indicator for GC workload
heapObjects met.Gauge

promotionReadyAtChan chan uint32
)

func init() {
Expand Down Expand Up @@ -237,8 +210,6 @@ func main() {
sec := dur.MustParseUNsec("warm-up-period", *warmUpPeriodStr)
warmupPeriod = time.Duration(sec) * time.Second

promotionReadyAtChan = make(chan uint32)

chunkSpan := dur.MustParseUNsec("chunkspan", *chunkSpanStr)
numChunks := uint32(*numChunksInt)
chunkMaxStale := dur.MustParseUNsec("chunk-max-stale", *chunkMaxStaleStr)
Expand Down Expand Up @@ -308,6 +279,7 @@ func main() {
configure stats
***********************************/
if *statsEnabled {
stats.NewMemoryReporter()
stats.NewGraphite(*statsAddr)
} else {
log.Warn("running metrictank without instrumentation.")
Expand Down Expand Up @@ -412,7 +384,11 @@ func main() {
plugin.Start(metrics, metricIndex, usg)
}

promotionReadyAtChan <- (uint32(time.Now().Unix())/highestChunkSpan + 1) * highestChunkSpan
// metric cluster.promotion_wait is how long a candidate (secondary node) has to wait until it can become a primary.
// When the timer reaches 0, the in-memory buffer has been fully populated, so that if you stop a primary
// and it was able to save its complete chunks, this node can take over without data loss.
// You can upgrade a candidate to primary while the timer is not 0 yet; it just means it may have missing data in the chunks that it will save.
stats.NewTimeDiffReporter("cluster.promotion_wait", (uint32(time.Now().Unix())/highestChunkSpan+1)*highestChunkSpan)

/***********************************
Initialize our API server
Expand Down Expand Up @@ -479,61 +455,3 @@ func main() {
log.Close()

}

// initMetrics creates the process-wide gauges on the given backend and launches
// a goroutine that refreshes them on every tick of a one-second ticker.
//
// The goroutine never returns. Besides the ticker it also receives from
// promotionReadyAtChan; the most recently received value is the unix timestamp
// that the cluster.promotion_wait gauge counts down towards.
func initMetrics(stats met.Backend) {
	// Register every gauge up front, with its initial value.
	cassWriteQueueSize = stats.NewGauge("cassandra.write_queue.size", int64(*cassandraWriteQueueSize))
	cassWriters = stats.NewGauge("cassandra.num_writers", int64(*cassandraWriteConcurrency))
	points = stats.NewGauge("total_points", 0)
	alloc = stats.NewGauge("bytes_alloc.not_freed", 0)
	totalAlloc = stats.NewGauge("bytes_alloc.incl_freed", 0)
	sysBytes = stats.NewGauge("bytes_sys", 0)
	clusterPrimary = stats.NewGauge("cluster.primary", 0)
	clusterPromoWait = stats.NewGauge("cluster.promotion_wait", 1)
	gcNum = stats.NewGauge("gc.num", 0)
	gcDur = stats.NewGauge("gc.dur", 0)               // in nanoseconds. last known duration.
	gcCpuFraction = stats.NewGauge("gc.cpufraction", 0) // reported as pro-mille
	heapObjects = stats.NewGauge("gc.heap_objects", 0)

	// Background collector for the global stats.
	go func() {
		var (
			mem     runtime.MemStats
			readyAt uint32 // stays 0 until a value arrives on promotionReadyAtChan
		)

		tick := time.Tick(time.Second)
		for {
			select {
			case readyAt = <-promotionReadyAtChan:
				// Only remember the new timestamp; gauges are refreshed on the next tick.
			case t := <-tick:
				points.Value(int64(chunk.TotalPoints()))

				runtime.ReadMemStats(&mem)
				alloc.Value(int64(mem.Alloc))
				totalAlloc.Value(int64(mem.TotalAlloc))
				sysBytes.Value(int64(mem.Sys))
				gcNum.Value(int64(mem.NumGC))
				// PauseNs is a circular buffer; this index selects the most recent pause.
				gcDur.Value(int64(mem.PauseNs[(mem.NumGC+255)%256]))
				gcCpuFraction.Value(int64(1000 * mem.GCCPUFraction))
				heapObjects.Value(int64(mem.HeapObjects))

				// Report primary status as 1 (primary) or 0 (not primary).
				var primary int64
				if cluster.ThisNode.IsPrimary() {
					primary = 1
				}
				clusterPrimary.Value(primary)

				cassWriters.Value(int64(*cassandraWriteConcurrency))
				cassWriteQueueSize.Value(int64(*cassandraWriteQueueSize))

				nowTs := uint32(t.Unix())
				var wait int64
				switch {
				case nowTs < readyAt:
					// Still counting down: seconds left until the ready timestamp.
					wait = int64(readyAt - nowTs)
				case readyAt == 0:
					// not set yet. operator should hold off
					wait = 1
				default:
					wait = 0
				}
				clusterPromoWait.Value(wait)
			}
		}
	}()
}

0 comments on commit 6bf366c

Please sign in to comment.