Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Add mt-parrot command to generate deterministic artificial metrics fo…
Browse files Browse the repository at this point in the history
…r each metrictank partition.
  • Loading branch information
fitzoh committed Feb 18, 2020
1 parent bbe0c21 commit 2ca8897
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
/cmd/mt-kafka-mdm-sniff/mt-kafka-mdm-sniff
/cmd/mt-kafka-persist-sniff/mt-kafka-persist-sniff
/cmd/mt-keygen/mt-keygen
/cmd/mt-parrot/mt-parrot
/cmd/mt-schemas-explain/mt-schemas-explain
/cmd/mt-split-metrics-by-ttl/mt-split-metrics-by-ttl
/cmd/mt-store-cat/mt-store-cat
Expand Down
62 changes: 62 additions & 0 deletions cmd/mt-parrot/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"fmt"
log "github.com/sirupsen/logrus"
"time"

"github.com/grafana/metrictank/schema"
)

func produceArtificialMetrics(schemas []*schema.MetricData) {
for tick := range time.NewTicker(artificialMetricsInterval).C {
for _, metric := range schemas {
metric.Time = tick.Unix()
metric.Value = float64(tick.Unix())
}
gateway.Flush(schemas)
log.Infof("flushed schemas for ts %d", tick.Unix())
}
}

//generateSchemas generates a MetricData that hashes to each of numPartitions partitions
func generateSchemas(numPartitions int32) []*schema.MetricData {
var metrics []*schema.MetricData
for i := int32(0); i < numPartitions; i++ {
metrics = append(metrics, generateSchema(i))
}
return metrics
}

//generateSchema generates a single MetricData that hashes to the given partition
func generateSchema(desiredPartition int32) *schema.MetricData {
metric := schema.MetricData{
OrgId: orgId,
Unit: "partyparrots",
Mtype: "gauge",
Interval: int(artificialMetricsInterval.Seconds()),
}

for i := 1; true; i++ {
metric.Name = fmt.Sprintf("parrot.testdata.%d.generated.%s", desiredPartition, generatePartitionSuffix(i))
id, err := metric.PartitionID(partitionMethod, partitionCount)
if err != nil {
log.Fatal(err)
}
if id == desiredPartition {
log.Infof("metric for partition %d: %s", desiredPartition, metric.Name)
return &metric
}
}
return nil
}

var alphabet = []rune("abcdefghijklmnopqrstuvwxyz")

//generatePartitionSuffix deterministically generates a suffix for partition by brute force
func generatePartitionSuffix(i int) string {
if i == 0 {
return ""
}
return generatePartitionSuffix(i/26) + string(alphabet[i%26])
}
91 changes: 91 additions & 0 deletions cmd/mt-parrot/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package main

import (
"github.com/grafana/metrictank/cmd/mt-fakemetrics/out"
"github.com/grafana/metrictank/cmd/mt-fakemetrics/out/gnet"
"github.com/grafana/metrictank/logger"
"github.com/grafana/metrictank/schema"
"github.com/raintank/met/statsd"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"time"
)

var (
gatewayAddress string
gatewayKey string
orgId int
partitionCount int32
partitionMethodString string
artificialMetricsInterval time.Duration
queryInterval time.Duration
logLevel string

partitionMethod schema.PartitionByMethod
gateway out.Out
)

func init() {
parrotCmd.Flags().StringVar(&gatewayAddress, "gateway-address", "http://localhost:6059/metrics", "the url of the metrics gateway to publish to")
parrotCmd.Flags().StringVar(&gatewayKey, "gateway-key", "", "the bearer token to include with gateway requests")
parrotCmd.Flags().IntVar(&orgId, "org-id", 1, "org id to publish parrot metrics to")
parrotCmd.Flags().Int32Var(&partitionCount, "partition-count", 8, "number of partitions to publish parrot metrics to")
parrotCmd.Flags().StringVar(&partitionMethodString, "partition-method", "bySeries", "the partition method to use, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv")
parrotCmd.Flags().DurationVar(&artificialMetricsInterval, "artificial-metrics-interval", 5*time.Second, "interval to send metrics")
parrotCmd.Flags().DurationVar(&queryInterval, "query-interval", 10*time.Second, "interval to query to validate metrics")

parrotCmd.Flags().StringVar(&logLevel, "log-level", "info", "log level. panic|fatal|error|warning|info|debug")

formatter := &logger.TextFormatter{}
formatter.TimestampFormat = "2006-01-02 15:04:05.000"
log.SetFormatter(formatter)
}

func main() {
err := parrotCmd.Execute()
if err != nil {
log.Fatal(err)
}
}

var parrotCmd = &cobra.Command{
Use: "parrot",
Short: "generate deterministic metrics for each metrictank partition",
Run: func(cmd *cobra.Command, args []string) {
lvl, err := log.ParseLevel(logLevel)
if err != nil {
log.Fatalf("failed to parse log-level, %s", err.Error())
}
log.SetLevel(lvl)
parsePartitionMethod()
initGateway()

schemas := generateSchemas(partitionCount)
go produceArtificialMetrics(schemas)

monitor()
},
}

//parsePartitionMethod parses the partitionScheme cli flag,
//exiting if an invalid partition schema is entered or if org based partitioning is used (not currently allowed by parrot).
func parsePartitionMethod() {
var err error
partitionMethod, err = schema.PartitonMethodFromString(partitionMethodString)
if err != nil {
log.Fatal(err)
}
if partitionMethod == schema.PartitionByOrg {
log.Fatal("byOrg not supported")
}
}

func initGateway() {
var err error
backend, _ := statsd.New(false, "", "")
gateway, err = gnet.New(gatewayAddress, gatewayKey, backend)
if err != nil {
log.Fatal(err)
}
log.Info("gateway initialized")
}
81 changes: 81 additions & 0 deletions cmd/mt-parrot/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package main

import (
"fmt"
"github.com/grafana/metrictank/stacktest/graphite"
log "github.com/sirupsen/logrus"
"math"
"net/http"
"strconv"
"time"
)

type seriesStats struct {
lastTs uint32
//the partition currently being checked
partition int32
//the number of nans present in the time series
nans int32
//the sum of abs(value - ts) across the time series
deltaSum float64
//the number of timestamps where value != ts
numNonMatching int32

//tracks the last seen non-NaN time stamp (useful for lag
lastSeen uint32
}

func monitor() {
for range time.NewTicker(queryInterval).C {

query := graphite.ExecuteRenderQuery(buildRequest())

for _, s := range query.Decoded {
log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts)

stats := seriesStats{}
stats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts

for _, dp := range s.Datapoints {

if math.IsNaN(dp.Val) {
stats.nans += 1
continue
}
if diff := dp.Val - float64(dp.Ts); diff != 0 {
stats.lastSeen = dp.Ts
stats.deltaSum += diff
stats.numNonMatching += 1
}
}

//TODO create/emit metrics for each partition

//number of missing values for each series
fmt.Printf("parrot.monitoring.nanCount;partition=%d; %d\n", stats.partition, stats.nans)
//time since the last value was recorded
fmt.Printf("parrot.monitoring.lag;partition=%d; %d\n", stats.partition, stats.lastTs-stats.lastSeen)
//total amount of drift between expected value and actual values
fmt.Printf("parrot.monitoring.deltaSum;partition=%d; %f\n", stats.partition, stats.deltaSum)
//total number of entries where drift occurred
fmt.Printf("parrot.monitoring.nonMatching;partition=%d; %d\n", stats.partition, stats.numNonMatching)
fmt.Println()
}
}
}

func buildRequest() *http.Request {
req, _ := http.NewRequest("GET", fmt.Sprintf("%s/render", gatewayAddress), nil)
q := req.URL.Query()
q.Set("target", "parrot.testdata.*.generated.*")
//TODO parameterize this
q.Set("from", "-5min")
q.Set("until", "now")
q.Set("format", "json")
q.Set("X-Org-Id", strconv.Itoa(orgId))
req.URL.RawQuery = q.Encode()
if len(gatewayKey) != 0 {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", gatewayKey))
}
return req
}
2 changes: 1 addition & 1 deletion publish/kafka/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ func (m *mtPublisher) Publish(metrics []*schema.MetricData) error {
if m.autoInterval {
_, s := m.schemas.Match(metric.Name, 0)
metric.Interval = s.Retentions.Rets[0].SecondsPerPoint
metric.SetId()
} else {
log.Error("interval is 0 but can't deduce interval automatically. this should never happen")
return errors.New("need to deduce interval but cannot")
}
}
metric.SetId()

isMD := false
isMP := false
Expand Down

0 comments on commit 2ca8897

Please sign in to comment.