This repository has been archived by the owner on Aug 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 105
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add mt-parrot command to generate deterministic artificial metrics fo…
…r each metrictank partition.
- Loading branch information
Showing
7 changed files
with
258 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
log "github.com/sirupsen/logrus" | ||
"time" | ||
|
||
"github.com/grafana/metrictank/schema" | ||
) | ||
|
||
func produceArtificialMetrics(schemas []*schema.MetricData) { | ||
for tick := range time.NewTicker(artificialMetricsInterval).C { | ||
for _, metric := range schemas { | ||
metric.Time = tick.Unix() | ||
metric.Value = float64(tick.Unix()) | ||
} | ||
gateway.Flush(schemas) | ||
log.Infof("flushed schemas for ts %d", tick.Unix()) | ||
} | ||
} | ||
|
||
//generateSchemas generates a MetricData that hashes to each of numPartitions partitions | ||
func generateSchemas(numPartitions int32) []*schema.MetricData { | ||
var metrics []*schema.MetricData | ||
for i := int32(0); i < numPartitions; i++ { | ||
metrics = append(metrics, generateSchema(i)) | ||
} | ||
return metrics | ||
} | ||
|
||
//generateSchema generates a single MetricData that hashes to the given partition | ||
func generateSchema(desiredPartition int32) *schema.MetricData { | ||
metric := schema.MetricData{ | ||
OrgId: orgId, | ||
Unit: "partyparrots", | ||
Mtype: "gauge", | ||
Interval: int(artificialMetricsInterval.Seconds()), | ||
} | ||
|
||
for i := 1; true; i++ { | ||
metric.Name = fmt.Sprintf("parrot.testdata.%d.generated.%s", desiredPartition, generatePartitionSuffix(i)) | ||
id, err := metric.PartitionID(partitionMethod, partitionCount) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
if id == desiredPartition { | ||
log.Infof("metric for partition %d: %s", desiredPartition, metric.Name) | ||
return &metric | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
var alphabet = []rune("abcdefghijklmnopqrstuvwxyz") | ||
|
||
//generatePartitionSuffix deterministically generates a suffix for partition by brute force | ||
func generatePartitionSuffix(i int) string { | ||
if i == 0 { | ||
return "" | ||
} | ||
return generatePartitionSuffix(i/26) + string(alphabet[i%26]) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package main | ||
|
||
import ( | ||
"github.com/grafana/metrictank/cmd/mt-fakemetrics/out" | ||
"github.com/grafana/metrictank/cmd/mt-fakemetrics/out/gnet" | ||
"github.com/grafana/metrictank/logger" | ||
"github.com/grafana/metrictank/schema" | ||
"github.com/raintank/met/statsd" | ||
log "github.com/sirupsen/logrus" | ||
"github.com/spf13/cobra" | ||
"time" | ||
) | ||
|
||
var ( | ||
gatewayAddress string | ||
gatewayKey string | ||
orgId int | ||
partitionCount int32 | ||
partitionMethodString string | ||
artificialMetricsInterval time.Duration | ||
queryInterval time.Duration | ||
logLevel string | ||
|
||
partitionMethod schema.PartitionByMethod | ||
gateway out.Out | ||
) | ||
|
||
func init() { | ||
parrotCmd.Flags().StringVar(&gatewayAddress, "gateway-address", "http://localhost:6059/metrics", "the url of the metrics gateway to publish to") | ||
parrotCmd.Flags().StringVar(&gatewayKey, "gateway-key", "", "the bearer token to include with gateway requests") | ||
parrotCmd.Flags().IntVar(&orgId, "org-id", 1, "org id to publish parrot metrics to") | ||
parrotCmd.Flags().Int32Var(&partitionCount, "partition-count", 8, "number of partitions to publish parrot metrics to") | ||
parrotCmd.Flags().StringVar(&partitionMethodString, "partition-method", "bySeries", "the partition method to use, must be one of bySeries|bySeriesWithTags|bySeriesWithTagsFnv") | ||
parrotCmd.Flags().DurationVar(&artificialMetricsInterval, "artificial-metrics-interval", 5*time.Second, "interval to send metrics") | ||
parrotCmd.Flags().DurationVar(&queryInterval, "query-interval", 10*time.Second, "interval to query to validate metrics") | ||
|
||
parrotCmd.Flags().StringVar(&logLevel, "log-level", "info", "log level. panic|fatal|error|warning|info|debug") | ||
|
||
formatter := &logger.TextFormatter{} | ||
formatter.TimestampFormat = "2006-01-02 15:04:05.000" | ||
log.SetFormatter(formatter) | ||
} | ||
|
||
func main() { | ||
err := parrotCmd.Execute() | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
} | ||
|
||
var parrotCmd = &cobra.Command{ | ||
Use: "parrot", | ||
Short: "generate deterministic metrics for each metrictank partition", | ||
Run: func(cmd *cobra.Command, args []string) { | ||
lvl, err := log.ParseLevel(logLevel) | ||
if err != nil { | ||
log.Fatalf("failed to parse log-level, %s", err.Error()) | ||
} | ||
log.SetLevel(lvl) | ||
parsePartitionMethod() | ||
initGateway() | ||
|
||
schemas := generateSchemas(partitionCount) | ||
go produceArtificialMetrics(schemas) | ||
|
||
monitor() | ||
}, | ||
} | ||
|
||
//parsePartitionMethod parses the partitionScheme cli flag, | ||
//exiting if an invalid partition schema is entered or if org based partitioning is used (not currently allowed by parrot). | ||
func parsePartitionMethod() { | ||
var err error | ||
partitionMethod, err = schema.PartitonMethodFromString(partitionMethodString) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
if partitionMethod == schema.PartitionByOrg { | ||
log.Fatal("byOrg not supported") | ||
} | ||
} | ||
|
||
func initGateway() { | ||
var err error | ||
backend, _ := statsd.New(false, "", "") | ||
gateway, err = gnet.New(gatewayAddress, gatewayKey, backend) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
log.Info("gateway initialized") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
"github.com/grafana/metrictank/stacktest/graphite" | ||
log "github.com/sirupsen/logrus" | ||
"math" | ||
"net/http" | ||
"strconv" | ||
"time" | ||
) | ||
|
||
type seriesStats struct { | ||
lastTs uint32 | ||
//the partition currently being checked | ||
partition int32 | ||
//the number of nans present in the time series | ||
nans int32 | ||
//the sum of abs(value - ts) across the time series | ||
deltaSum float64 | ||
//the number of timestamps where value != ts | ||
numNonMatching int32 | ||
|
||
//tracks the last seen non-NaN time stamp (useful for lag | ||
lastSeen uint32 | ||
} | ||
|
||
func monitor() { | ||
for range time.NewTicker(queryInterval).C { | ||
|
||
query := graphite.ExecuteRenderQuery(buildRequest()) | ||
|
||
for _, s := range query.Decoded { | ||
log.Infof("%d - %d", s.Datapoints[0].Ts, s.Datapoints[len(s.Datapoints)-1].Ts) | ||
|
||
stats := seriesStats{} | ||
stats.lastTs = s.Datapoints[len(s.Datapoints)-1].Ts | ||
|
||
for _, dp := range s.Datapoints { | ||
|
||
if math.IsNaN(dp.Val) { | ||
stats.nans += 1 | ||
continue | ||
} | ||
if diff := dp.Val - float64(dp.Ts); diff != 0 { | ||
stats.lastSeen = dp.Ts | ||
stats.deltaSum += diff | ||
stats.numNonMatching += 1 | ||
} | ||
} | ||
|
||
//TODO create/emit metrics for each partition | ||
|
||
//number of missing values for each series | ||
fmt.Printf("parrot.monitoring.nanCount;partition=%d; %d\n", stats.partition, stats.nans) | ||
//time since the last value was recorded | ||
fmt.Printf("parrot.monitoring.lag;partition=%d; %d\n", stats.partition, stats.lastTs-stats.lastSeen) | ||
//total amount of drift between expected value and actual values | ||
fmt.Printf("parrot.monitoring.deltaSum;partition=%d; %f\n", stats.partition, stats.deltaSum) | ||
//total number of entries where drift occurred | ||
fmt.Printf("parrot.monitoring.nonMatching;partition=%d; %d\n", stats.partition, stats.numNonMatching) | ||
fmt.Println() | ||
} | ||
} | ||
} | ||
|
||
func buildRequest() *http.Request { | ||
req, _ := http.NewRequest("GET", fmt.Sprintf("%s/render", gatewayAddress), nil) | ||
q := req.URL.Query() | ||
q.Set("target", "parrot.testdata.*.generated.*") | ||
//TODO parameterize this | ||
q.Set("from", "-5min") | ||
q.Set("until", "now") | ||
q.Set("format", "json") | ||
q.Set("X-Org-Id", strconv.Itoa(orgId)) | ||
req.URL.RawQuery = q.Encode() | ||
if len(gatewayKey) != 0 { | ||
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", gatewayKey)) | ||
} | ||
return req | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters